1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168
|
#! /usr/bin/env python
# http://trac.secdev.org/scapy/ticket/162
# scapy.contrib.description = BGP
# scapy.contrib.status = loads
from scapy.packet import *
from scapy.fields import *
from scapy.layers.inet import TCP
class BGPIPField(Field):
"""Represents how bgp dose an ip prefix in (length, prefix)"""
def mask2iplen(self,mask):
"""turn the mask into the length in bytes of the ip field"""
return (mask + 7) // 8
def h2i(self, pkt, h):
"""human x.x.x.x/y to internal"""
ip,mask = re.split( '/', h)
return int(mask), ip
def i2h( self, pkt, i):
mask, ip = i
return ip + '/' + str( mask )
def i2repr( self, pkt, i):
"""make it look nice"""
return self.i2h(pkt,i)
def i2len(self, pkt, i):
"""rely on integer division"""
mask, ip = i
return self.mask2iplen(mask) + 1
def i2m(self, pkt, i):
"""internal (ip as bytes, mask as int) to machine"""
mask, ip = i
ip = inet_aton( ip )
return struct.pack(">B",mask) + ip[:self.mask2iplen(mask)]
def addfield(self, pkt, s, val):
return s+self.i2m(pkt, val)
def getfield(self, pkt, s):
l = self.mask2iplen( struct.unpack(">B",s[0])[0] ) + 1
return s[l:], self.m2i(pkt,s[:l])
def m2i(self,pkt,m):
mask = struct.unpack(">B",m[0])[0]
ip = "".join( [ m[i + 1] if i < self.mask2iplen(mask) else '\x00' for i in range(4)] )
return (mask,inet_ntoa(ip))
class BGPHeader(Packet):
"""The first part of any BGP packet"""
name = "BGP header"
fields_desc = [
XBitField("marker",0xffffffffffffffffffffffffffffffff, 0x80 ),
ShortField("len", None),
ByteEnumField("type", 4, {0:"none", 1:"open",2:"update",3:"notification",4:"keep_alive"}),
]
def post_build(self, p, pay):
if self.len is None and pay:
l = len(p) + len(pay)
p = p[:16]+struct.pack("!H", l)+p[18:]
return p+pay
class BGPOptionalParameter(Packet):
"""Format of optional Parameter for BGP Open"""
name = "BGP Optional Parameters"
fields_desc = [
ByteField("type", 2),
ByteField("len", None),
StrLenField("value", "", length_from = lambda x: x.len),
]
def post_build(self,p,pay):
if self.len is None:
l = len(p) - 2 # 2 is length without value
p = p[:1]+struct.pack("!B", l)+p[2:]
return p+pay
def extract_padding(self, p):
"""any thing after this packet is extracted is padding"""
return "",p
class BGPOpen(Packet):
""" Opens a new BGP session"""
name = "BGP Open Header"
fields_desc = [
ByteField("version", 4),
ShortField("AS", 0),
ShortField("hold_time", 0),
IPField("bgp_id","0.0.0.0"),
ByteField("opt_parm_len", None),
PacketListField("opt_parm",[], BGPOptionalParameter, length_from=lambda p:p.opt_parm_len),
]
def post_build(self, p, pay):
if self.opt_parm_len is None:
l = len(p) - 10 # 10 is regular length with no additional options
p = p[:9] + struct.pack("!B",l) +p[10:]
return p+pay
class BGPAuthenticationData(Packet):
name = "BGP Authentication Data"
fields_desc = [
ByteField("AuthenticationCode", 0),
ByteField("FormMeaning", 0),
FieldLenField("Algorithm", 0),
]
class BGPPathAttribute(Packet):
"the attribute of total path"
name = "BGP Attribute fields"
fields_desc = [
FlagsField("flags", 0x40, 8, ["NA0","NA1","NA2","NA3","Extended-Length","Partial","Transitive","Optional"]), #Extened leght may not work
ByteEnumField("type", 1, {1:"ORIGIN", 2:"AS_PATH", 3:"NEXT_HOP", 4:"MULTI_EXIT_DISC", 5:"LOCAL_PREF", 6:"ATOMIC_AGGREGATE", 7:"AGGREGATOR"}),
ByteField("attr_len", None),
StrLenField("value", "", length_from = lambda p: p.attr_len),
]
def post_build(self, p, pay):
if self.attr_len is None:
l = len(p) - 3 # 3 is regular length with no additional options
p = p[:2] + struct.pack("!B",l) +p[3:]
return p+pay
def extract_padding(self, p):
"""any thing after this packet is extracted is padding"""
return "",p
class BGPUpdate(Packet):
"""Update the routes WithdrawnRoutes = UnfeasiableRoutes"""
name = "BGP Update fields"
fields_desc = [
ShortField("withdrawn_len", None),
FieldListField("withdrawn",[], BGPIPField("","0.0.0.0/0"), length_from=lambda p:p.withdrawn_len),
ShortField("tp_len", None),
PacketListField("total_path", [], BGPPathAttribute, length_from = lambda p: p.tp_len),
FieldListField("nlri",[], BGPIPField("","0.0.0.0/0"), length_from=lambda p:p.underlayer.len - 23 - p.tp_len - p.withdrawn_len), # len should be BGPHeader.len
]
def post_build(self,p,pay):
wl = self.withdrawn_len
subpacklen = lambda p: len ( str( p ))
subfieldlen = lambda p: BGPIPField("", "0.0.0.0/0").i2len(self, p )
if wl is None:
wl = sum ( map ( subfieldlen , self.withdrawn))
p = p[:0]+struct.pack("!H", wl)+p[2:]
if self.tp_len is None:
l = sum ( map ( subpacklen , self.total_path))
p = p[:2+wl]+struct.pack("!H", l)+p[4+wl:]
return p+pay
class BGPNotification(Packet):
name = "BGP Notification fields"
fields_desc = [
ByteEnumField("ErrorCode",0,{1:"Message Header Error",2:"OPEN Message Error",3:"UPDATE Messsage Error",4:"Hold Timer Expired",5:"Finite State Machine",6:"Cease"}),
ByteEnumField("ErrorSubCode",0,{1:"MessageHeader",2:"OPENMessage",3:"UPDATEMessage"}),
LongField("Data", 0),
]
class BGPErrorSubcodes(Packet):
name = "BGP Error Subcodes"
Fields_desc = [
ByteEnumField("MessageHeader",0,{1:"Connection Not Synchronized",2:"Bad Message Length",3:"Bad Messsage Type"}),
ByteEnumField("OPENMessage",0,{1:"Unsupported Version Number",2:"Bad Peer AS",3:"Bad BGP Identifier",4:"Unsupported Optional Parameter",5:"Authentication Failure",6:"Unacceptable Hold Time"}),
ByteEnumField("UPDATEMessage",0,{1:"Malformed Attribute List",2:"Unrecognized Well-Known Attribute",3:"Missing Well-Known Attribute",4:"Attribute Flags Error",5:"Attribute Length Error",6:"Invalid ORIGIN Attribute",7:"AS Routing Loop",8:"Invalid NEXT_HOP Attribute",9:"Optional Attribute Error",10:"Invalid Network Field",11:"Malformed AS_PATH"}),
]
bind_layers( TCP, BGPHeader, dport=179)
bind_layers( TCP, BGPHeader, sport=179)
bind_layers( BGPHeader, BGPOpen, type=1)
bind_layers( BGPHeader, BGPUpdate, type=2)
bind_layers( BGPHeader, BGPHeader, type=4)
if __name__ == "__main__":
interact(mydict=globals(), mybanner="BGP addon .05")
|