Package dpkt :: Module sctp
[hide private]
[frames | no frames]

Source Code for Module dpkt.sctp

 1  # $Id: sctp.py 271 2006-01-11 16:03:33Z dugsong $ 
 2   
 3  """Stream Control Transmission Protocol.""" 
 4   
 5  import dpkt, crc32c 
 6   
 7  # Stream Control Transmission Protocol 
 8  # http://tools.ietf.org/html/rfc2960 
 9   
# Chunk Types (RFC 2960, section 3.2).
# The identifiers are numbered consecutively from zero, so they are bound
# in a single unpacking assignment; the value of each is noted beside it.
(
    DATA,               # 0
    INIT,               # 1
    INIT_ACK,           # 2
    SACK,               # 3  selective acknowledgement
    HEARTBEAT,          # 4
    HEARTBEAT_ACK,      # 5
    ABORT,              # 6
    SHUTDOWN,           # 7
    SHUTDOWN_ACK,       # 8
    ERROR,              # 9
    COOKIE_ECHO,        # 10
    COOKIE_ACK,         # 11
    ECNE,               # 12 explicit congestion notification echo
    CWR,                # 13 congestion window reduced
    SHUTDOWN_COMPLETE,  # 14
) = range(15)
26   
class SCTP(dpkt.Packet):
    """SCTP packet: a fixed common header followed by a list of chunks.

    Header fields are ``sport``, ``dport``, ``vtag`` and ``sum``.  After
    unpacking, ``data`` and ``chunks`` both refer to the same list of
    ``Chunk`` objects parsed from the payload.
    """
    __hdr__ = (
        ('sport', 'H', 0),
        ('dport', 'H', 0),
        ('vtag', 'I', 0),
        ('sum', 'I', 0)
    )

    def unpack(self, buf):
        """Parse the common header, then split the payload into chunks."""
        dpkt.Packet.unpack(self, buf)
        parsed = []
        # Peel one chunk at a time off the front of the payload until
        # nothing is left; each chunk reports how many bytes it used.
        while self.data:
            parsed.append(Chunk(self.data))
            self.data = self.data[len(parsed[-1]):]
        self.data = self.chunks = parsed

    def __len__(self):
        """Return the header length plus the length of every item in data."""
        total = self.__hdr_len__
        for item in self.data:
            total += len(item)
        return total

    def __str__(self):
        """Serialize header and chunks, filling in the checksum if unset.

        A ``sum`` of 0 is treated as "not computed yet": the CRC32c over
        the header (packed with the zero checksum) and all serialized
        chunks is calculated and stored in ``self.sum`` before the final
        header is packed.
        """
        parts = list(map(str, self.data))
        if self.sum == 0:
            crc = crc32c.add(0xffffffff, self.pack_hdr())
            for part in parts:
                crc = crc32c.add(crc, part)
            self.sum = crc32c.done(crc)
        return self.pack_hdr() + ''.join(parts)
56
class Chunk(dpkt.Packet):
    """A single SCTP chunk: type, flags and length header plus its value.

    The ``len`` field counts the chunk header and the value, but not the
    padding that aligns the chunk to a 4-byte boundary (RFC 2960, section
    3.2).  Padding bytes found while unpacking are kept in ``padding`` so
    that ``len(chunk)`` and ``str(chunk)`` cover every byte the chunk
    occupied in the packet, which lets a caller step to the next chunk
    with ``buf[len(chunk):]``.
    """
    __hdr__ = (
        ('type', 'B', INIT),
        ('flags', 'B', 0),
        ('len', 'H', 0)
    )

    # Alignment bytes that followed the value; stays empty for chunks that
    # were built by hand or whose length is already a multiple of 4.
    padding = ''

    def unpack(self, buf):
        """Parse the chunk header from buf and keep only this chunk's bytes.

        ``data`` is cut down to the value announced by ``len``; up to three
        following alignment bytes are stored in ``padding``.
        """
        dpkt.Packet.unpack(self, buf)
        # A length smaller than the header itself is malformed.  Treat the
        # value as empty instead of letting a negative slice bound silently
        # drop bytes from the end of the buffer.
        value_len = max(self.len - self.__hdr_len__, 0)
        self.data = self.data[:value_len]
        # Offset in buf just past the value (assumes the base unpack left
        # everything after the header in self.data, as the slice above does).
        end = self.__hdr_len__ + len(self.data)
        # -end % 4 is the number of bytes (0..3) up to the next 4-byte
        # boundary; the slice is simply shorter if buf is truncated.
        self.padding = buf[end:end + (-end % 4)]

    def __len__(self):
        """Return the number of bytes the chunk occupies, padding included."""
        return dpkt.Packet.__len__(self) + len(self.padding)

    def __str__(self):
        """Serialize header and value, followed by any preserved padding."""
        return dpkt.Packet.__str__(self) + self.padding
if __name__ == '__main__':
    import unittest

    class SCTPTestCase(unittest.TestCase):
        """Pack/unpack checks against one sample SCTP packet.

        The sample is a 12-byte common header followed by a single INIT
        chunk of length 60, 72 bytes in total.
        """

        s = (
            '\x80\x44\x00\x50\x00\x00\x00\x00'
            '\x30\xba\xef\x54\x01\x00\x00\x3c'
            '\x3b\xb9\x9c\x46\x00\x01\xa0\x00'
            '\x00\x0a\xff\xff\x2b\x2d\x7e\xb2'
            '\x00\x05\x00\x08\x9b\xe6\x18\x9b'
            '\x00\x05\x00\x08\x9b\xe6\x18\x9c'
            '\x00\x0c\x00\x06\x00\x05\x00\x00'
            '\x80\x00\x00\x04\xc0\x00\x00\x04'
            '\xc0\x06\x00\x08\x00\x00\x00\x00'
        )

        def testPack(self):
            """Serializing a parsed packet must reproduce the input bytes."""
            sctp = SCTP(self.s)
            self.assertEqual(self.s, str(sctp))
            # Clearing the checksum makes __str__ recompute it; the result
            # has to match the checksum carried in the sample.
            sctp.sum = 0
            self.assertEqual(self.s, str(sctp))

        def testUnpack(self):
            """Header fields and the single INIT chunk must be decoded."""
            sctp = SCTP(self.s)
            self.assertEqual(sctp.sport, 32836)
            self.assertEqual(sctp.dport, 80)
            self.assertEqual(len(sctp.chunks), 1)
            self.assertEqual(len(sctp), 72)

            chunk = sctp.chunks[0]
            self.assertEqual(chunk.type, INIT)
            self.assertEqual(chunk.len, 60)

    unittest.main()