You are here: Home > Publications > RIPE Labs > Shane Kerr > PCAP

PCAP

Shane Kerr — 12 May 2015
pcap.py file

Python Source icon pcap.py — Python Source, 4 KB (4305 bytes)

File contents

import struct
import socket
import re

# pcap format is documented here:
#     http://wiki.wireshark.org/Development/LibpcapFileFormat
#
# pcap header:
#   typedef struct pcap_hdr_s {
#        guint32 magic_number;   /* magic number */
#        guint16 version_major;  /* major version number */
#        guint16 version_minor;  /* minor version number */
#        gint32  thiszone;       /* GMT to local correction */
#        guint32 sigfigs;        /* accuracy of timestamps */
#        guint32 snaplen;        /* max length of captured packets, in octets */
#        guint32 network;        /* data link type */
#   } pcap_hdr_t;
#
# record (packet) header:
#   typedef struct pcaprec_hdr_s {
#        guint32 ts_sec;         /* timestamp seconds */
#        guint32 ts_usec;        /* timestamp microseconds */
#        guint32 incl_len;       /* number of octets of packet saved in file */
#        guint32 orig_len;       /* actual length of packet */
#   } pcaprec_hdr_t;

class pcap_error(Exception):
    """Raised when a pcap file is malformed or uses an unsupported format.

    The human-readable description passed to the constructor is kept in
    the ``value`` attribute (and, as for any exception, in ``args``).
    """

    def __init__(self, value):
        # Forward the message to the base class explicitly so that
        # ``args`` / ``str()`` carry it, then keep the legacy attribute.
        Exception.__init__(self, value)
        self.value = value

class pcap_file(object):
    """Sequential reader for a classic libpcap ("pcap") capture file.

    The constructor opens the file, reads the 24-byte global header,
    detects the byte order from the magic number and verifies that the
    format version is 2.4.  Packets are then fetched one at a time with
    next().

    Attributes set by the constructor:
        fname    -- the file name that was passed in
        fp       -- the underlying binary file object
        header   -- the raw 24-byte global header
        intfmt   -- struct byte-order prefix, "<" or ">"
        version  -- format version as a "major.minor" string
        thiszone, sigflags, snaplen, network
                 -- remaining global header fields (``sigflags`` holds
                    the header's ``sigfigs`` field; the attribute name
                    is kept for backward compatibility)

    The object may be used as a context manager; the file is closed on
    exit.  Otherwise call close() when done.

    Raises pcap_error if the header is truncated, has an unknown magic
    number, or declares an unsupported version.
    """

    def __init__(self, fname):
        self.fname = fname
        self.fp = open(fname, "rb")
        try:
            self._read_file_header()
        except Exception:
            # Do not leak the open file when the capture is rejected.
            self.fp.close()
            raise

    def _read_file_header(self):
        """Read and validate the global header, filling in attributes."""
        self.header = self.fp.read(24)
        if len(self.header) != 24:
            raise pcap_error("pcap header is too small")

        # figure out our byte order: the magic number reads correctly
        # in exactly one of the two byte orders
        for intfmt in ("<", ">"):
            (magic_number,) = struct.unpack(intfmt + "I", self.header[0:4])
            if magic_number == 0xa1b2c3d4:
                self.intfmt = intfmt
                break
        else:
            raise pcap_error("pcap header has bad magic number")

        # check the version
        self.version = "%d.%d" % struct.unpack(self.intfmt + "HH",
                                               self.header[4:8])
        if self.version != "2.4":
            raise pcap_error("pcap version %s not understood" % self.version)

        # read the remaining network header details
        (self.thiszone, self.sigflags, self.snaplen, self.network) = \
                struct.unpack(self.intfmt + "iIII", self.header[8:])

    def next(self):
        """Read the next packet record from the file.

        Returns a tuple (timestamp, orig_len, data) where timestamp is
        seconds as a float (seconds + microseconds), orig_len is the
        packet's length on the wire and data is the captured bytes
        (which may be shorter than orig_len).

        Returns (None, None, None) at end of file.

        Raises pcap_error if a record header or its data is truncated.
        """
        pkt_hdr = self.fp.read(16)
        # An empty read means a clean EOF.  Test truthiness rather than
        # comparing with '' so this works for Python 3 bytes as well.
        if not pkt_hdr:
            return (None, None, None)
        if len(pkt_hdr) != 16:
            raise pcap_error("packet header has bad length")
        (ts_sec, ts_usec, incl_len, orig_len) = \
                struct.unpack(self.intfmt + "IIII", pkt_hdr)
        data = self.fp.read(incl_len)
        if len(data) != incl_len:
            raise pcap_error("data in file does not match packet length")
        return (ts_sec + (ts_usec * 0.000001), orig_len, data)

    def close(self):
        """Close the underlying file.  Safe to call more than once."""
        self.fp.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        # never suppress an exception raised inside the with-block
        return False

# get the Ethernet header information
#    http://en.wikipedia.org/wiki/Ethernet
def eth_pkt_parse(pkt):
    """Split an Ethernet frame into its header fields and payload.

    pkt is the raw frame as a byte string.

    Returns a tuple (src, dst, type, payload): the 6-byte source and
    destination addresses as byte strings, the 16-bit big-endian value
    stored at offsets 12-13 as an integer, and everything after the
    first 14 bytes.

    Raises IndexError if the frame is shorter than 14 bytes.
    """
    dst = pkt[0:6]
    src = pkt[6:12]
    # bytearray() yields integers for both Python 2 str and Python 3
    # bytes, unlike ord(pkt[i]) which fails on Python 3 bytes.
    type_field = bytearray(pkt[12:14])
    ethertype = (type_field[0] << 8) | type_field[1]
    return (src, dst, ethertype, pkt[14:])

# get the IP header information
#    http://en.wikipedia.org/wiki/IPv4
#    http://en.wikipedia.org/wiki/IPv6
def ip_pkt_parse(pkt):
    """Extract addressing information and payload from an IP packet.

    pkt is the raw packet as a byte string, starting at the IP header.

    Returns a tuple (version, src, dst, proto, payload).  For IPv4 and
    IPv6, src and dst are the addresses in presentation (text) form,
    proto is the protocol / next-header number and payload is whatever
    follows the header (for IPv4 the header size is taken from the IHL
    field; for IPv6 the fixed 40-byte header is skipped).  For any
    other version number src, dst and proto are None and payload is the
    unmodified packet.

    Raises IndexError if the packet is too short to hold the fields
    that are read as single bytes.
    """
    # Only the first 40 bytes are ever inspected byte-by-byte.  A
    # bytearray yields integers on both Python 2 str and Python 3
    # bytes, unlike ord(pkt[i]) which fails on Python 3 bytes.
    hdr = bytearray(pkt[:40])
    version = (hdr[0] >> 4) & 0xF
    if version == 4:
        proto = hdr[9]
        src = socket.inet_ntoa(pkt[12:16])
        dst = socket.inet_ntoa(pkt[16:20])
        # IHL counts 32-bit words, so the payload starts at IHL * 4
        ihl = hdr[0] & 0xF
        payload = pkt[(ihl * 4):]
    elif version == 6:
        proto = hdr[6]
        src = socket.inet_ntop(socket.AF_INET6, pkt[8:24])
        dst = socket.inet_ntop(socket.AF_INET6, pkt[24:40])
        payload = pkt[40:]
    else:
        src = None
        dst = None
        proto = None
        payload = pkt
    return (version, src, dst, proto, payload)

# get the UDP header information
#     http://en.wikipedia.org/wiki/User_Datagram_Protocol
def udp_pkt_parse(pkt):
    """Extract the ports and payload from a UDP datagram.

    pkt is the raw datagram as a byte string, starting at the UDP
    header.

    Returns a tuple (src_port, dst_port, payload).  The payload runs
    from the end of the 8-byte header up to the offset given by the
    header's length field, so trailing bytes beyond that length are
    dropped.

    Raises IndexError if fewer than 6 bytes are supplied.
    """
    # bytearray() yields integers for both Python 2 str and Python 3
    # bytes, unlike ord(pkt[i]) which fails on Python 3 bytes.
    hdr = bytearray(pkt[:8])
    src_port = (hdr[0] << 8) | hdr[1]
    dst_port = (hdr[2] << 8) | hdr[3]
    # the length field counts the header as well as the data
    length = (hdr[4] << 8) | hdr[5]
    return (src_port, dst_port, pkt[8:length])

# get the DNS opcode from a packet
def dns_opcode(pkt):
    """Return the 4-bit opcode from a DNS message.

    pkt is the raw DNS message as a byte string.  The opcode is taken
    from bits 3-6 (counting from the least significant bit) of the
    byte at offset 2.

    Raises IndexError if the message is shorter than 3 bytes.
    """
    # bytearray() yields an integer for both Python 2 str and Python 3
    # bytes, unlike ord(pkt[2]) which fails on Python 3 bytes.
    flags_hi = bytearray(pkt[2:3])[0]
    return (flags_hi >> 3) & 0xF
    
# convert a string to a MAC address
def str2mac(s):
    """Convert a textual MAC address to its 6-byte binary form.

    s must consist of six two-digit hexadecimal groups separated by
    colons, e.g. "00:1a:2b:3c:4d:5e" (either letter case).

    Returns the address as a 6-byte byte string (str on Python 2,
    bytes on Python 3), so it can be compared directly with address
    slices taken from raw packet data, or None if s is not in the
    expected format.
    """
    if not re.search('^[0-9a-fA-F]{2}(:[0-9a-fA-F]{2}){5}$', s):
        return None
    # Build via bytearray so the result is a byte string on both
    # Python 2 and Python 3 (chr() would give text on Python 3).
    return bytes(bytearray(int(byte_str, 16) for byte_str in s.split(':')))