root/scapy/asn1fields.py

Revision 934:b8c1666c4723, 9.3 kB (checked in by Phil <phil@secdev.org>, 3 months ago)

Fixed namespace problem with asn1fields (ticket #140)

Line 
1 ## This file is part of Scapy
2 ## See http://www.secdev.org/projects/scapy for more information
3 ## Copyright (C) Philippe Biondi <phil@secdev.org>
4 ## This program is published under a GPLv2 license
5
6 from asn1.asn1 import *
7 from asn1.ber import *
8 from volatile import *
9 from base_classes import BasePacket
10
11
12 #####################
13 #### ASN1 Fields ####
14 #####################
15
class ASN1F_badsequence(Exception):
    """Raised by field dissection when the input cannot be decoded as the
    expected ASN.1 structure (see ASN1F_SEQUENCE.dissect)."""
18
class ASN1F_element:
    """Common root of every ASN.1 field class defined in this module."""
21
class ASN1F_optionnal(ASN1F_element):
    """Wrapper that makes another ASN.1 field optional.

    Any attribute not defined on the wrapper is looked up on the wrapped
    field.  When dissecting, a decoding failure of the wrapped field is
    not an error: the field value is set to None and the input is handed
    back untouched.  When building, a field whose is_empty() is true
    contributes an empty string.
    """

    def __init__(self, field):
        self._field = field

    def __getattr__(self, attr):
        # __getattr__ only runs when the normal lookup failed.  If "_field"
        # itself is what is missing (an instance that never went through
        # __init__), delegating to self._field would recurse without end;
        # report the attribute as absent instead.
        if attr == "_field":
            raise AttributeError(attr)
        return getattr(self._field, attr)

    def dissect(self, pkt, s):
        """Dissect the wrapped field from s and return the remaining input.

        If the wrapped field raises ASN1F_badsequence or
        BER_Decoding_Error, its value on pkt is set to None and s is
        returned unchanged.
        """
        try:
            return self._field.dissect(pkt, s)
        except (ASN1F_badsequence, BER_Decoding_Error):
            self._field.set_val(pkt, None)
            return s

    def build(self, pkt):
        """Return the wrapped field's encoding, or "" when it is empty."""
        if self._field.is_empty(pkt):
            return ""
        return self._field.build(pkt)
40
class ASN1F_field(ASN1F_element):
    """Base class for a single named ASN.1 field of a packet.

    The field value is stored on the packet under the attribute `name`.
    Encoding and decoding are delegated to the codec returned by
    ASN1_tag.get_codec(pkt.ASN1_codec).  Fields hash, compare and print
    by their name.
    """
    # Flags; subclasses in this module override them with 1.
    holds_packets = 0
    islist = 0

    ASN1_tag = ASN1_Class_UNIVERSAL.ANY
    context = ASN1_Class_UNIVERSAL

    def __init__(self, name, default, context=None):
        self.name = name
        self.default = default
        # Only shadow the class-level context when one is given.
        if context is not None:
            self.context = context

    def i2repr(self, pkt, x):
        """Return repr(x)."""
        return repr(x)

    def i2h(self, pkt, x):
        """Return x unchanged."""
        return x

    def any2i(self, pkt, x):
        """Return x unchanged."""
        return x

    def m2i(self, pkt, x):
        """Decode x with the field's codec (safedec) in the field's context."""
        codec = self.ASN1_tag.get_codec(pkt.ASN1_codec)
        return codec.safedec(x, context=self.context)

    def i2m(self, pkt, x):
        """Encode x with the packet's codec.

        None is encoded as 0.  An ASN1_Object is encoded with its own
        enc() when this field is tagged ANY, when the object is tagged RAW
        or ERROR, or when both tags are equal; otherwise ASN1_Error is
        raised.  Any other value goes through the field's codec.
        """
        if x is None:
            x = 0
        if not isinstance(x, ASN1_Object):
            return self.ASN1_tag.get_codec(pkt.ASN1_codec).enc(x)
        acceptable = (self.ASN1_tag == ASN1_Class_UNIVERSAL.ANY
                      or x.tag == ASN1_Class_UNIVERSAL.RAW
                      or x.tag == ASN1_Class_UNIVERSAL.ERROR
                      or self.ASN1_tag == x.tag)
        if not acceptable:
            raise ASN1_Error("Encoding Error: got %r instead of an %r for field [%s]" % (x, self.ASN1_tag, self.name))
        return x.enc(pkt.ASN1_codec)

    def do_copy(self, x):
        """Return a copy of the field value x.

        Objects with a copy() method are copied through it.  A plain list
        is duplicated, with every BasePacket item copied as well.
        Anything else is returned as is.
        """
        if hasattr(x, "copy"):
            return x.copy()
        if type(x) is not list:
            return x
        return [item.copy() if isinstance(item, BasePacket) else item
                for item in x]

    def build(self, pkt):
        """Encode the value this field currently has on pkt."""
        value = getattr(pkt, self.name)
        return self.i2m(pkt, value)

    def set_val(self, pkt, val):
        """Store val on pkt under the field name."""
        setattr(pkt, self.name, val)

    def is_empty(self, pkt):
        """Tell whether the field's value on pkt is None."""
        return getattr(pkt, self.name) is None

    def dissect(self, pkt, s):
        """Decode one value from s, store it on pkt, return what is left."""
        value, remain = self.m2i(pkt, s)
        self.set_val(pkt, value)
        return remain

    def get_fields_list(self):
        """Return a one-element list holding this field."""
        return [self]

    def __hash__(self):
        return hash(self.name)

    def __str__(self):
        return self.name

    def __eq__(self, other):
        # The name is compared with `other` directly, so a field compares
        # equal to its own name string.
        return self.name == other

    def __repr__(self):
        return self.name

    def randval(self):
        """Return a RandInt() as the random value for this field."""
        return RandInt()
111
112
class ASN1F_INTEGER(ASN1F_field):
    """ASN.1 field tagged ASN1_Class_UNIVERSAL.INTEGER."""
    ASN1_tag = ASN1_Class_UNIVERSAL.INTEGER

    def randval(self):
        """Return a RandNum built with bounds -2**64 and 2**64-1."""
        bound = 2**64
        return RandNum(-bound, bound - 1)
117
class ASN1F_NULL(ASN1F_INTEGER):
    """Field tagged ASN1_Class_UNIVERSAL.NULL; everything else is inherited
    from ASN1F_INTEGER."""
    ASN1_tag = ASN1_Class_UNIVERSAL.NULL
120
class ASN1F_enum_INTEGER(ASN1F_INTEGER):
    """Integer field whose values can be given and shown by name.

    `enum` is either a list (position -> name) or a dict.  If any key of
    the dict is a str the mapping is read as name -> number, otherwise as
    number -> name.  Both directions are kept in `i2s` (number -> name)
    and `s2i` (name -> number).
    """

    def __init__(self, name, default, enum, context=None):
        # `context` is forwarded so this subclass accepts the same optional
        # argument as the ASN1F_field constructor it inherits from.
        ASN1F_INTEGER.__init__(self, name, default, context=context)
        i2s = self.i2s = {}
        s2i = self.s2i = {}
        if type(enum) is list:
            keys = range(len(enum))
        else:
            keys = enum.keys()
        # String keys mean the mapping was given as name -> number: swap
        # the two local aliases so the loop below fills the right tables.
        if any(type(k) is str for k in keys):
            i2s, s2i = s2i, i2s
        for k in keys:
            i2s[k] = enum[k]
            s2i[enum[k]] = k

    def any2i_one(self, pkt, x):
        """Translate a name into its number; pass other values through.

        Raises KeyError for a str that is not a known name.
        """
        if type(x) is str:
            x = self.s2i[x]
        return x

    def i2repr_one(self, pkt, x):
        """Return the name registered for x, or repr(x) if there is none."""
        return self.i2s.get(x, repr(x))

    def any2i(self, pkt, x):
        """Apply any2i_one to x, element by element when x is a list."""
        if type(x) is list:
            return [self.any2i_one(pkt, z) for z in x]
        return self.any2i_one(pkt, x)

    def i2repr(self, pkt, x):
        """Apply i2repr_one to x; a list input yields a list of results."""
        if type(x) is list:
            return [self.i2repr_one(pkt, z) for z in x]
        return self.i2repr_one(pkt, x)
152
class ASN1F_STRING(ASN1F_field):
    """ASN.1 field tagged ASN1_Class_UNIVERSAL.STRING."""
    ASN1_tag = ASN1_Class_UNIVERSAL.STRING

    def randval(self):
        """Return a RandString whose size argument is RandNum(0, 1000)."""
        size = RandNum(0, 1000)
        return RandString(size)
157
class ASN1F_PRINTABLE_STRING(ASN1F_STRING):
    """String field tagged ASN1_Class_UNIVERSAL.PRINTABLE_STRING."""
    ASN1_tag = ASN1_Class_UNIVERSAL.PRINTABLE_STRING
160
class ASN1F_BIT_STRING(ASN1F_STRING):
    """String field tagged ASN1_Class_UNIVERSAL.BIT_STRING."""
    ASN1_tag = ASN1_Class_UNIVERSAL.BIT_STRING
163    
class ASN1F_IPADDRESS(ASN1F_STRING):
    """String field tagged ASN1_Class_UNIVERSAL.IPADDRESS."""
    ASN1_tag = ASN1_Class_UNIVERSAL.IPADDRESS
166
class ASN1F_TIME_TICKS(ASN1F_INTEGER):
    """Integer field tagged ASN1_Class_UNIVERSAL.TIME_TICKS."""
    ASN1_tag = ASN1_Class_UNIVERSAL.TIME_TICKS
169
class ASN1F_UTC_TIME(ASN1F_STRING):
    """String field tagged ASN1_Class_UNIVERSAL.UTC_TIME."""
    ASN1_tag = ASN1_Class_UNIVERSAL.UTC_TIME
172
class ASN1F_OID(ASN1F_field):
    """ASN.1 field tagged ASN1_Class_UNIVERSAL.OID."""
    ASN1_tag = ASN1_Class_UNIVERSAL.OID

    def randval(self):
        """Return a RandOID() as the random value for this field."""
        return RandOID()
177
class ASN1F_SEQUENCE(ASN1F_field):
    """Ordered group of fields encoded together under a single tag.

    The sub-fields are given positionally.  The tag defaults to
    ASN1_Class_UNIVERSAL.SEQUENCE and can be replaced with the `ASN1_tag`
    keyword argument.
    """
    ASN1_tag = ASN1_Class_UNIVERSAL.SEQUENCE

    def __init__(self, *seq, **kargs):
        # NOTE(review): name/default are not set here, while the inherited
        # __hash__/__str__/__eq__ of ASN1F_field read self.name.
        if "ASN1_tag" in kargs:
            self.ASN1_tag = kargs["ASN1_tag"]
        self.seq = seq

    def __repr__(self):
        return "<%s%r>" % (self.__class__.__name__, self.seq,)

    def set_val(self, pkt, val):
        """Assign val to every sub-field."""
        for f in self.seq:
            f.set_val(pkt, val)

    def is_empty(self, pkt):
        """Tell whether every sub-field is empty on pkt."""
        for f in self.seq:
            if not f.is_empty(pkt):
                return False
        return True

    def get_fields_list(self):
        """Return the flattened list of fields held by the sub-fields."""
        fields = []
        for f in self.seq:
            fields.extend(f.get_fields_list())
        return fields

    def build(self, pkt):
        """Encode the concatenated sub-field encodings under this tag."""
        # join() instead of repeated "+" keeps the concatenation linear.
        content = "".join([f.build(pkt) for f in self.seq])
        return self.i2m(pkt, content)

    def dissect(self, pkt, s):
        """Dissect the sequence from s and return the bytes that follow it.

        The codec's check_type_check_len() yields the sequence content and
        the trailing data; each sub-field then consumes its part of the
        content in order.  Content left over once every sub-field has run
        only triggers a warning.

        Raises ASN1F_badsequence when an ASN1_Error occurs.
        """
        codec = self.ASN1_tag.get_codec(pkt.ASN1_codec)
        try:
            _, s, remain = codec.check_type_check_len(s)
            for obj in self.seq:
                s = obj.dissect(pkt, s)
            if s:
                warning("Too many bytes to decode sequence: [%r]" % s) # XXX not reversible!
            return remain
        except ASN1_Error as e:
            raise ASN1F_badsequence(e)
210
class ASN1F_SET(ASN1F_SEQUENCE):
    """Same as ASN1F_SEQUENCE, but tagged ASN1_Class_UNIVERSAL.SET."""
    ASN1_tag = ASN1_Class_UNIVERSAL.SET
213
class ASN1F_SEQUENCE_OF(ASN1F_SEQUENCE):
    """Named field holding a list of packets of one class (`asn1pkt`).

    Although it derives from ASN1F_SEQUENCE it has no sub-fields of its
    own: the value is stored on the packet under `name`, like a plain
    ASN1F_field.
    """
    holds_packets = 1
    islist = 1

    def __init__(self, name, default, asn1pkt, ASN1_tag=0x30):
        self.asn1pkt = asn1pkt
        # NOTE(review): the ASN1_tag argument is only kept as a character
        # in self.tag; build() and dissect() below use the class-level
        # self.ASN1_tag instead.
        self.tag = chr(ASN1_tag)
        self.name = name
        self.default = default

    def i2repr(self, pkt, i):
        """Return i itself, or an empty list when i is None."""
        if i is None:
            return []
        return i

    def get_fields_list(self):
        """Return a one-element list holding this field."""
        return [self]

    def set_val(self, pkt, val):
        """Store val on pkt under the field name."""
        ASN1F_field.set_val(self, pkt, val)

    def is_empty(self, pkt):
        """Tell whether the field's value on pkt is None."""
        return ASN1F_field.is_empty(self, pkt)

    def build(self, pkt):
        """Encode the list held on pkt.

        A RAW-tagged ASN1_Object is handed to i2m() as is, None is encoded
        as empty content, and any other value is treated as an iterable
        whose items are converted with str() and concatenated.
        """
        val = getattr(pkt, self.name)
        if isinstance(val, ASN1_Object) and val.tag == ASN1_Class_UNIVERSAL.RAW:
            s = val
        elif val is None:
            s = ""
        else:
            s = "".join(map(str, val))
        return self.i2m(pkt, s)

    def dissect(self, pkt, s):
        """Dissect a list of `asn1pkt` packets from s.

        Packets are instantiated from the content one after another; the
        packet.Raw payload of each one is the input of the next.  Content
        that fails with ASN1F_badsequence is stored as a packet.Raw and
        ends the list.  Returns the bytes that follow the whole structure.
        """
        codec = self.ASN1_tag.get_codec(pkt.ASN1_codec)
        _, s1, remain = codec.check_type_check_len(s)
        lst = []
        while s1:
            try:
                p = self.asn1pkt(s1)
            except ASN1F_badsequence:
                lst.append(packet.Raw(s1))
                break
            lst.append(p)
            if packet.Raw in p:
                # Whatever the packet did not consume is the next element;
                # detach it so it is not kept as this packet's payload.
                s1 = p[packet.Raw].load
                del(p[packet.Raw].underlayer.payload)
            else:
                break
        self.set_val(pkt, lst)
        return remain

    def randval(self):
        """Return a fuzzed instance of `asn1pkt`."""
        # fuzz is reached through the packet module (imported at the end
        # of this file), like packet.Raw above; no import visible in this
        # file provides a bare `fuzz`.
        return packet.fuzz(self.asn1pkt())

    def __repr__(self):
        return "<%s %s>" % (self.__class__.__name__, self.name)
263
class ASN1F_PACKET(ASN1F_field):
    """Field whose value is a packet of class `cls`."""
    holds_packets = 1

    def __init__(self, name, default, cls):
        # The base class is ASN1F_field; the previous spelling
        # "ASN1_field" is not a name defined in this file.
        ASN1F_field.__init__(self, name, default)
        self.cls = cls

    def i2m(self, pkt, x):
        """Return str(x), with None turned into the empty string."""
        if x is None:
            x = ""
        return str(x)

    def extract_packet(self, cls, x):
        """Instantiate cls from x; return (packet, remaining data).

        If cls raises ASN1F_badsequence, x is wrapped in a packet.Raw
        instead.  When the result has a packet.Padding layer, its load is
        returned as the remaining data and the layer is detached from the
        packet; otherwise the remaining data is "".
        """
        try:
            c = cls(x)
        except ASN1F_badsequence:
            c = packet.Raw(x)
        cpad = c[packet.Padding]
        x = ""
        if cpad is not None:
            x = cpad.load
            del cpad.underlayer.payload
        return c, x

    def m2i(self, pkt, x):
        """Decode x as a packet of class self.cls."""
        return self.extract_packet(self.cls, x)
286
287
class ASN1F_CHOICE(ASN1F_PACKET):
    """Field holding one packet out of several possible packet classes.

    The candidate classes are given after `name` and `default`; each is
    registered under the ASN1_tag of its ASN1_root.  On dissection the
    first byte of the input selects the class.
    """
    ASN1_tag = ASN1_Class_UNIVERSAL.NONE

    def __init__(self, name, default, *args):
        self.name = name
        self.choice = {}
        for p in args:
            self.choice[p.ASN1_root.ASN1_tag] = p
        self.default = default

    def m2i(self, pkt, x):
        """Decode x with the class selected by its first byte.

        Returns (packet, remaining data).  Empty input, or a first byte
        matching no registered class, does not raise: the input is
        returned wrapped in a packet.Raw with nothing remaining.
        """
        if not x:
            return packet.Raw(), ""
        tag = ord(x[0])
        if tag not in self.choice:
            return packet.Raw(x), ""  # XXX return RawASN1 packet ? Raise error
        return ASN1F_PACKET.extract_packet(self, self.choice[tag], x)

    def randval(self):
        """Return a RandChoice over a fuzzed instance of every candidate."""
        # fuzz is reached through the packet module (imported at the end
        # of this file), like packet.Raw above; no import visible in this
        # file provides a bare `fuzz`.
        return RandChoice(*[packet.fuzz(p()) for p in self.choice.values()])
309            
310    
311 # This import must come in last to avoid problems with cyclic dependencies
312 import packet
Note: See TracBrowser for help on using the browser.