Package dpkt :: Module dpkt
[hide private]
[frames] | no frames]

Source Code for Module dpkt.dpkt

  1   
  2  # $Id: dpkt.py 372 2006-06-06 12:19:35Z dugsong $ 
  3   
  4  """Simple packet creation and parsing.""" 
  5   
  6  import copy, itertools, struct     
  7   
8 -class Error(Exception): pass
9 -class UnpackError(Error): pass
10 -class NeedData(UnpackError): pass
11 -class PackError(Error): pass
12
13 -class _MetaPacket(type):
14 - def __new__(cls, clsname, clsbases, clsdict):
15 t = type.__new__(cls, clsname, clsbases, clsdict) 16 st = getattr(t, '__hdr__', None) 17 if st: 18 t.__hdr_fields__ = [ x[0] for x in st ] 19 t.__hdr_fmt__ = getattr(t, '__byte_order__', '>') + \ 20 ''.join([ x[1] for x in st ]) 21 t.__hdr_len__ = struct.calcsize(t.__hdr_fmt__) 22 t.__hdr_defaults__ = dict(zip( 23 t.__hdr_fields__, [ x[2] for x in st ])) 24 return t
25
26 -class Packet(object):
27 """Base packet class, with metaclass magic to generate members from 28 self.__hdr__. 29 30 __hdr__ should be defined as a list of (name, structfmt, default) tuples 31 __byte_order__ can be set to override the default ('>') 32 33 Example:: 34 35 >>> class Foo(Packet): 36 ... __hdr__ = (('foo', 'I', 1), ('bar', 'H', 2), ('baz', '4s', 'quux')) 37 ... 38 >>> foo = Foo(bar=3) 39 >>> foo 40 Foo(bar=3) 41 >>> str(foo) 42 '\x00\x00\x00\x01\x00\x03quux' 43 >>> foo.bar 44 3 45 >>> foo.baz 46 'quux' 47 >>> foo.foo = 7 48 >>> foo.baz = 'whee' 49 >>> foo 50 Foo(baz='whee', foo=7, bar=3) 51 >>> Foo('hello, world!') 52 Foo(baz=' wor', foo=1751477356L, bar=28460, data='ld!') 53 """ 54 __metaclass__ = _MetaPacket 55
56 - def __init__(self, *args, **kwargs):
57 """Packet constructor with ([buf], [field=val,...]) prototype. 58 59 Arguments: 60 61 buf -- optional packet buffer to unpack 62 63 Optional keyword arguments correspond to members to set 64 (matching fields in self.__hdr__, or 'data'). 65 """ 66 self.data = '' 67 if args: 68 try: 69 self.unpack(args[0]) 70 except struct.error: 71 if len(args[0]) < self.__hdr_len__: 72 raise NeedData 73 raise UnpackError('invalid %s: %r' % 74 (self.__class__.__name__, args[0])) 75 else: 76 for k in self.__hdr_fields__: 77 setattr(self, k, copy.copy(self.__hdr_defaults__[k])) 78 for k, v in kwargs.iteritems(): 79 setattr(self, k, v)
80
81 - def __len__(self):
82 return self.__hdr_len__ + len(self.data)
83
84 - def __getitem__(self, k):
85 try: return getattr(self, k) 86 except AttributeError: raise KeyError
87
88 - def __repr__(self):
89 l = [ '%s=%r' % (k, getattr(self, k)) 90 for k in self.__hdr_defaults__ 91 if getattr(self, k) != self.__hdr_defaults__[k] ] 92 if self.data: 93 l.append('data=%r' % self.data) 94 return '%s(%s)' % (self.__class__.__name__, ', '.join(l))
95
96 - def __str__(self):
97 return self.pack_hdr() + str(self.data)
98
99 - def pack_hdr(self):
100 """Return packed header string.""" 101 try: 102 return struct.pack(self.__hdr_fmt__, 103 *[ getattr(self, k) for k in self.__hdr_fields__ ]) 104 except struct.error: 105 vals = [] 106 for k in self.__hdr_fields__: 107 v = getattr(self, k) 108 if isinstance(v, tuple): 109 vals.extend(v) 110 else: 111 vals.append(v) 112 try: 113 return struct.pack(self.__hdr_fmt__, *vals) 114 except struct.error, e: 115 raise PackError(str(e))
116
117 - def pack(self):
118 """Return packed header + self.data string.""" 119 return str(self)
120
121 - def unpack(self, buf):
122 """Unpack packet header fields from buf, and set self.data.""" 123 for k, v in itertools.izip(self.__hdr_fields__, 124 struct.unpack(self.__hdr_fmt__, buf[:self.__hdr_len__])): 125 setattr(self, k, v) 126 self.data = buf[self.__hdr_len__:]
127 128 # XXX - ''.join([(len(`chr(x)`)==3) and chr(x) or '.' for x in range(256)]) 129 __vis_filter = """................................ !"#$%&\'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[.]^_`abcdefghijklmnopqrstuvwxyz{|}~.................................................................................................................................""" 130
131 -def hexdump(buf, length=16):
132 """Return a hexdump output string of the given buffer.""" 133 n = 0 134 res = [] 135 while buf: 136 line, buf = buf[:length], buf[length:] 137 hexa = ' '.join(['%02x' % ord(x) for x in line]) 138 line = line.translate(__vis_filter) 139 res.append(' %04d: %-*s %s' % (n, length * 3, hexa, line)) 140 n += length 141 return '\n'.join(res)
142 143 try: 144 import dnet, socket
145 - def in_cksum_add(s, buf):
146 return dnet.ip_cksum_add(buf, s)
147 - def in_cksum_done(s):
148 return socket.ntohs(dnet.ip_cksum_carry(s)) & 0xffff
149 except ImportError: 150 import array
151 - def in_cksum_add(s, buf):
152 nleft = len(buf) 153 a = array.array('B', buf) 154 i = 0 155 while nleft > 1: 156 s += a[i] * 256 + a[i+1] 157 i += 2 158 nleft -= 2 159 if nleft: 160 s += a[i] * 256 161 return s
162 - def in_cksum_done(s):
163 while (s >> 16): 164 s = (s >> 16) + (s & 0xffff) 165 return (~s & 0xffff)
166
167 -def in_cksum(buf):
168 """Return computed Internet checksum.""" 169 return in_cksum_done(in_cksum_add(0, buf))
170