diff options
author | elie <elie> | 2012-06-09 07:48:38 +0000 |
---|---|---|
committer | elie <elie> | 2012-06-09 07:48:38 +0000 |
commit | ddb52ae4b68d0c75768de8a3230df70280ffadb3 (patch) | |
tree | 0d4f229c18a6b85bf3660e39306e387741a98b79 /pyasn1 | |
parent | 16c058a0f4e8f2e644269bb938005a06574c32cf (diff) | |
download | pyasn1-git-ddb52ae4b68d0c75768de8a3230df70280ffadb3.tar.gz |
decoder's recursiveFlag feature generalized as a user callback function
which is passed an uninitialized object recovered from substrate and
its uninterpreted payload.
Diffstat (limited to 'pyasn1')
-rw-r--r-- | pyasn1/codec/ber/decoder.py | 238 | ||||
-rw-r--r-- | pyasn1/codec/cer/decoder.py | 26 |
2 files changed, 137 insertions, 127 deletions
diff --git a/pyasn1/codec/ber/decoder.py b/pyasn1/codec/ber/decoder.py index 763d6a8..32a9b09 100644 --- a/pyasn1/codec/ber/decoder.py +++ b/pyasn1/codec/ber/decoder.py @@ -7,11 +7,11 @@ from pyasn1 import debug, error class AbstractDecoder: protoComponent = None def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, - length, state, decodeFun): + length, state, decodeFun, substrateFun): raise error.PyAsn1Error('Decoder not implemented for %s' % (tagSet,)) def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, - length, state, decodeFun): + length, state, decodeFun, substrateFun): raise error.PyAsn1Error('Indefinite length mode decoder not implemented for %s' % (tagSet,)) class AbstractSimpleDecoder(AbstractDecoder): @@ -32,21 +32,29 @@ class AbstractConstructedDecoder(AbstractDecoder): class EndOfOctetsDecoder(AbstractSimpleDecoder): def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, - length, state, decodeFun): + length, state, decodeFun, substrateFun): return eoo.endOfOctets, substrate[length:] class ExplicitTagDecoder(AbstractSimpleDecoder): protoComponent = univ.Any('') def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, - length, state, decodeFun): - substrate, rest = substrate[:length], substrate[length:] - if not decodeFun: - return self._createComponent(asn1Spec, tagSet, ''), substrate - value, _ = decodeFun(substrate, asn1Spec, tagSet, length) - return value, rest + length, state, decodeFun, substrateFun): + if substrateFun: + return substrateFun( + self._createComponent(asn1Spec, tagSet, ''), + substrate, length + ) + head, tail = substrate[:length], substrate[length:] + value, _ = decodeFun(head, asn1Spec, tagSet, length) + return value, tail def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, - length, state, decodeFun): + length, state, decodeFun, substrateFun): + if substrateFun: + return substrateFun( + self._createComponent(asn1Spec, tagSet, ''), + substrate, length + ) value, substrate = decodeFun(substrate, asn1Spec, tagSet, length) terminator, substrate = decodeFun(substrate) if terminator == eoo.endOfOctets: @@ -77,21 +85,21 @@ class IntegerDecoder(AbstractSimpleDecoder): } def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, length, - state, decodeFun): - substrate, rest = substrate[:length], substrate[length:] - if not substrate: + state, decodeFun, substrateFun): + head, tail = substrate[:length], substrate[length:] + if not head: raise error.PyAsn1Error('Empty substrate') - if substrate in self.precomputedValues: - value = self.precomputedValues[substrate] + if head in self.precomputedValues: + value = self.precomputedValues[head] else: - firstOctet = oct2int(substrate[0]) + firstOctet = oct2int(head[0]) if firstOctet & 0x80: value = -1 else: value = 0 - for octet in substrate: + for octet in head: value = value << 8 | oct2int(octet) - return self._createComponent(asn1Spec, tagSet, value), rest + return self._createComponent(asn1Spec, tagSet, value), tail class BooleanDecoder(IntegerDecoder): protoComponent = univ.Boolean(0) @@ -101,41 +109,41 @@ class BooleanDecoder(IntegerDecoder): class BitStringDecoder(AbstractSimpleDecoder): protoComponent = univ.BitString(()) def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, length, - state, decodeFun): - substrate, rest = substrate[:length], substrate[length:] + state, decodeFun, substrateFun): + head, tail = substrate[:length], substrate[length:] if tagSet[0][1] == tag.tagFormatSimple: # XXX what tag to check? - if not substrate: - raise error.PyAsn1Error('Missing initial octet') - trailingBits = oct2int(substrate[0]) + if not head: + raise error.PyAsn1Error('Empty substrate') + trailingBits = oct2int(head[0]) if trailingBits > 7: raise error.PyAsn1Error( 'Trailing bits overflow %s' % trailingBits ) - substrate = substrate[1:] - lsb = p = 0; l = len(substrate)-1; b = () + head = head[1:] + lsb = p = 0; l = len(head)-1; b = () while p <= l: if p == l: lsb = trailingBits j = 7 - o = oct2int(substrate[p]) + o = oct2int(head[p]) while j >= lsb: b = b + ((o>>j)&0x01,) j = j - 1 p = p + 1 - return self._createComponent(asn1Spec, tagSet, b), rest + return self._createComponent(asn1Spec, tagSet, b), tail r = self._createComponent(asn1Spec, tagSet, ()) - if not decodeFun: - return r, substrate - while substrate: - component, substrate = decodeFun(substrate) + if substrateFun: + return substrateFun(r, substrate, length) + while head: + component, head = decodeFun(head) r = r + component - return r, rest + return r, tail def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, - length, state, decodeFun): + length, state, decodeFun, substrateFun): r = self._createComponent(asn1Spec, tagSet, '') - if not decodeFun: - return r, substrate + if substrateFun: + return substrateFun(r, substrate, length) while substrate: component, substrate = decodeFun(substrate) if component == eoo.endOfOctets: @@ -150,23 +158,23 @@ class BitStringDecoder(AbstractSimpleDecoder): class OctetStringDecoder(AbstractSimpleDecoder): protoComponent = univ.OctetString('') def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, length, - state, decodeFun): - substrate, rest = substrate[:length], substrate[length:] + state, decodeFun, substrateFun): + head, tail = substrate[:length], substrate[length:] if tagSet[0][1] == tag.tagFormatSimple: # XXX what tag to check? - return self._createComponent(asn1Spec, tagSet, substrate), rest + return self._createComponent(asn1Spec, tagSet, head), tail r = self._createComponent(asn1Spec, tagSet, '') - if not decodeFun: - return r, substrate - while substrate: - component, substrate = decodeFun(substrate) + if substrateFun: + return substrateFun(r, substrate, length) + while head: + component, head = decodeFun(head) r = r + component - return r, rest + return r, tail def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, - length, state, decodeFun): + length, state, decodeFun, substrateFun): r = self._createComponent(asn1Spec, tagSet, '') - if not decodeFun: - return r, substrate + if substrateFun: + return substrateFun(r, substrate, length) while substrate: component, substrate = decodeFun(substrate) if component == eoo.endOfOctets: @@ -181,29 +189,29 @@ class OctetStringDecoder(AbstractSimpleDecoder): class NullDecoder(AbstractSimpleDecoder): protoComponent = univ.Null('') def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, - length, state, decodeFun): - substrate, rest = substrate[:length], substrate[length:] + length, state, decodeFun, substrateFun): + head, tail = substrate[:length], substrate[length:] r = self._createComponent(asn1Spec, tagSet) - if substrate: + if head: raise error.PyAsn1Error('Unexpected %d-octet substrate for Null' % length) - return r, rest + return r, tail class ObjectIdentifierDecoder(AbstractSimpleDecoder): protoComponent = univ.ObjectIdentifier(()) def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, length, - state, decodeFun): - substrate, rest = substrate[:length], substrate[length:] - if not substrate: + state, decodeFun, substrateFun): + head, tail = substrate[:length], substrate[length:] + if not head: raise error.PyAsn1Error('Empty substrate') # Get the first subid - subId = oct2int(substrate[0]) + subId = oct2int(head[0]) oid = divmod(subId, 40) index = 1 - substrateLen = len(substrate) + substrateLen = len(head) while index < substrateLen: - subId = oct2int(substrate[index]) + subId = oct2int(head[index]) index = index + 1 if subId > 127: # Construct subid from a number of octets @@ -215,20 +223,20 @@ class ObjectIdentifierDecoder(AbstractSimpleDecoder): raise error.SubstrateUnderrunError( 'Short substrate for sub-OID past %s' % (oid,) ) - nextSubId = oct2int(substrate[index]) + nextSubId = oct2int(head[index]) index = index + 1 subId = (subId << 7) + nextSubId oid = oid + (subId,) - return self._createComponent(asn1Spec, tagSet, oid), rest + return self._createComponent(asn1Spec, tagSet, oid), tail class RealDecoder(AbstractSimpleDecoder): protoComponent = univ.Real() def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, - length, state, decodeFun): - substrate, rest = substrate[:length], substrate[length:] - if not substrate: + length, state, decodeFun, substrateFun): + head, tail = substrate[:length], substrate[length:] + if not head: raise error.SubstrateUnderrunError('Short substrate for Real') - fo = oct2int(substrate[0]); substrate = substrate[1:] + fo = oct2int(head[0]); head = head[1:] if fo & 0x40: # infinite value value = fo & 0x01 and '-inf' or 'inf' elif fo & 0x80: # binary enoding @@ -239,9 +247,9 @@ class RealDecoder(AbstractSimpleDecoder): elif fo & 0x02: n = 3 else: - n = oct2int(substrate[0]) - eo, substrate = substrate[:n], substrate[n:] - if not eo or not substrate: + n = oct2int(head[0]) + eo, head = head[:n], head[n:] + if not eo or not head: raise error.PyAsn1Error('Real exponent screwed') e = 0 while eo: # exponent @@ -249,21 +257,21 @@ class RealDecoder(AbstractSimpleDecoder): e |= oct2int(eo[0]) eo = eo[1:] p = 0 - while substrate: # value + while head: # value p <<= 8 - p |= oct2int(substrate[0]) - substrate = substrate[1:] + p |= oct2int(head[0]) + head = head[1:] if fo & 0x40: # sign bit p = -p value = (p, 2, e) elif fo & 0xc0 == 0: # character encoding try: if fo & 0x3 == 0x1: # NR1 - value = (int(substrate), 10, 0) + value = (int(head), 10, 0) elif fo & 0x3 == 0x2: # NR2 - value = float(substrate) + value = float(head) elif fo & 0x3 == 0x3: # NR3 - value = float(substrate) + value = float(head) else: raise error.SubstrateUnderrunError( 'Unknown NR (tag %s)' % fo @@ -278,7 +286,7 @@ class RealDecoder(AbstractSimpleDecoder): raise error.SubstrateUnderrunError( 'Unknown encoding (tag %s)' % fo ) - return self._createComponent(asn1Spec, tagSet, value), rest + return self._createComponent(asn1Spec, tagSet, value), tail class SequenceDecoder(AbstractConstructedDecoder): protoComponent = univ.Sequence() @@ -292,17 +300,15 @@ class SequenceDecoder(AbstractConstructedDecoder): return r.getComponentPositionNearType(t, idx) def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, - length, state, decodeFun): - substrate, rest = substrate[:length], substrate[length:] + length, state, decodeFun, substrateFun): + head, tail = substrate[:length], substrate[length:] r = self._createComponent(asn1Spec, tagSet) idx = 0 - if not decodeFun: - return r, substrate - while substrate: + if substrateFun: + return substrateFun(r, substrate, length) + while head: asn1Spec = self._getComponentTagMap(r, idx) - component, substrate = decodeFun( - substrate, asn1Spec - ) + component, head = decodeFun(head, asn1Spec) idx = self._getComponentPositionByType( r, component.getEffectiveTagSet(), idx ) @@ -310,16 +316,16 @@ class SequenceDecoder(AbstractConstructedDecoder): idx = idx + 1 r.setDefaultComponents() r.verifySizeSpec() - return r, rest + return r, tail def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, - length, state, decodeFun): + length, state, decodeFun, substrateFun): r = self._createComponent(asn1Spec, tagSet) + if substrateFun: + return substrateFun(r, substrate, length) idx = 0 while substrate: asn1Spec = self._getComponentTagMap(r, idx) - if not decodeFun: - return r, substrate component, substrate = decodeFun(substrate, asn1Spec) if component == eoo.endOfOctets: break @@ -339,29 +345,27 @@ class SequenceDecoder(AbstractConstructedDecoder): class SequenceOfDecoder(AbstractConstructedDecoder): protoComponent = univ.SequenceOf() def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, - length, state, decodeFun): - substrate, rest = substrate[:length], substrate[length:] + length, state, decodeFun, substrateFun): + head, tail = substrate[:length], substrate[length:] r = self._createComponent(asn1Spec, tagSet) + if substrateFun: + return substrateFun(r, substrate, length) asn1Spec = r.getComponentType() idx = 0 - if not decodeFun: - return r, substrate - while substrate: - component, substrate = decodeFun( - substrate, asn1Spec - ) + while head: + component, head = decodeFun(head, asn1Spec) r.setComponentByPosition(idx, component, asn1Spec is None) idx = idx + 1 r.verifySizeSpec() - return r, rest + return r, tail def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, - length, state, decodeFun): + length, state, decodeFun, substrateFun): r = self._createComponent(asn1Spec, tagSet) + if substrateFun: + return substrateFun(r, substrate, length) asn1Spec = r.getComponentType() idx = 0 - if not decodeFun: - return r, substrate while substrate: component, substrate = decodeFun(substrate, asn1Spec) if component == eoo.endOfOctets: @@ -393,42 +397,45 @@ class SetOfDecoder(SequenceOfDecoder): class ChoiceDecoder(AbstractConstructedDecoder): protoComponent = univ.Choice() def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, - length, state, decodeFun): - substrate, rest = substrate[:length], substrate[length:] + length, state, decodeFun, substrateFun): + head, tail = substrate[:length], substrate[length:] r = self._createComponent(asn1Spec, tagSet) - if not decodeFun: - return r, substrate + if substrateFun: + return substrateFun(r, substrate, length) if r.getTagSet() == tagSet: # explicitly tagged Choice - component, substrate = decodeFun( - substrate, r.getComponentTagMap() + component, head = decodeFun( + head, r.getComponentTagMap() ) else: - component, substrate = decodeFun( - substrate, r.getComponentTagMap(), tagSet, length, state + component, head = decodeFun( + head, r.getComponentTagMap(), tagSet, length, state ) if isinstance(component, univ.Choice): effectiveTagSet = component.getEffectiveTagSet() else: effectiveTagSet = component.getTagSet() r.setComponentByType(effectiveTagSet, component, 0, asn1Spec is None) - return r, rest + return r, tail indefLenValueDecoder = valueDecoder class AnyDecoder(AbstractSimpleDecoder): protoComponent = univ.Any() def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, - length, state, decodeFun): + length, state, decodeFun, substrateFun): if asn1Spec is None or \ asn1Spec is not None and tagSet != asn1Spec.getTagSet(): # untagged Any container, recover inner header substrate length = length + len(fullSubstrate) - len(substrate) substrate = fullSubstrate - substrate, rest = substrate[:length], substrate[length:] - return self._createComponent(asn1Spec, tagSet, value=substrate), rest + if substrateFun: + return substrateFun(self._createComponent(asn1Spec, tagSet), + substrate, length) + head, tail = substrate[:length], substrate[length:] + return self._createComponent(asn1Spec, tagSet, value=head), tail def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, - length, state, decodeFun): + length, state, decodeFun, substrateFun): if asn1Spec is not None and tagSet == asn1Spec.getTagSet(): # tagged Any type -- consume header substrate header = '' @@ -441,8 +448,8 @@ class AnyDecoder(AbstractSimpleDecoder): # Any components do not inherit initial tag asn1Spec = self.protoComponent - if not decodeFun: - return r, substrate + if substrateFun: + return substrateFun(r, substrate, length) while substrate: component, substrate = decodeFun(substrate, asn1Spec) if component == eoo.endOfOctets: @@ -541,7 +548,8 @@ class Decoder: self.__tagSetCache = {} def __call__(self, substrate, asn1Spec=None, tagSet=None, - length=None, state=stDecodeTag, recursiveFlag=1): + length=None, state=stDecodeTag, recursiveFlag=1, + substrateFun=None): if debug.logger & debug.flagDecoder: debug.logger('decoder called at scope %s with state %d, working with up to %d octets of substrate: %s' % (debug.scope, state, len(substrate), debug.hexdump(substrate))) fullSubstrate = substrate @@ -736,15 +744,17 @@ class Decoder: debug.logger and debug.logger & debug.flagDecoder and debug.logger('codec %s chosen, decoding value' % concreteDecoder.__class__.__name__) state = stDecodeValue if state == stDecodeValue: + if recursiveFlag == 0 and not substrateFun: # legacy + substrateFun = lambda a,b,c: (a,b[:c]) if length == -1: # indef length value, substrate = concreteDecoder.indefLenValueDecoder( fullSubstrate, substrate, asn1Spec, tagSet, length, - stGetValueDecoder, recursiveFlag and self or None + stGetValueDecoder, self, substrateFun ) else: value, substrate = concreteDecoder.valueDecoder( fullSubstrate, substrate, asn1Spec, tagSet, length, - stGetValueDecoder, recursiveFlag and self or None + stGetValueDecoder, self, substrateFun ) state = stStop debug.logger and debug.logger & debug.flagDecoder and debug.logger('codec %s yields type %s, value:\n%s\n...remaining substrate is: %s' % (concreteDecoder.__class__.__name__, value.__class__.__name__, value.prettyPrint(), substrate and debug.hexdump(substrate) or '<none>')) diff --git a/pyasn1/codec/cer/decoder.py b/pyasn1/codec/cer/decoder.py index 44a001d..367feac 100644 --- a/pyasn1/codec/cer/decoder.py +++ b/pyasn1/codec/cer/decoder.py @@ -7,35 +7,35 @@ from pyasn1 import error class BooleanDecoder(decoder.AbstractSimpleDecoder): protoComponent = univ.Boolean(0) def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, length, - state, decodeFun): - substrate, rest = substrate[:length], substrate[length:] - if not substrate: + state, decodeFun, substrateFun): + head, tail = substrate[:length], substrate[length:] + if not head: raise error.PyAsn1Error('Empty substrate') - byte = oct2int(substrate[0]) + byte = oct2int(head[0]) if byte == 0xff: value = 1 elif byte == 0x00: value = 0 else: raise error.PyAsn1Error('Boolean CER violation: %s' % byte) - return self._createComponent(asn1Spec, tagSet, value), rest + return self._createComponent(asn1Spec, tagSet, value), tail class ObjectIdentifierDecoder(decoder.AbstractSimpleDecoder): protoComponent = univ.ObjectIdentifier(()) def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, length, - state, decodeFun): - substrate, rest = substrate[:length], substrate[length:] - if not substrate: + state, decodeFun, substrateFun): + head, tail = substrate[:length], substrate[length:] + if not head: raise error.PyAsn1Error('Empty substrate') # Get the first subid - subId = oct2int(substrate[0]) + subId = oct2int(head[0]) oid = divmod(subId, 40) index = 1 - substrateLen = len(substrate) + substrateLen = len(head) while index < substrateLen: - subId = oct2int(substrate[index]) + subId = oct2int(head[index]) index = index + 1 if subId == 128: # ASN.1 spec forbids leading zeros (0x80) in sub-ID OID @@ -52,11 +52,11 @@ class ObjectIdentifierDecoder(decoder.AbstractSimpleDecoder): raise error.SubstrateUnderrunError( 'Short substrate for sub-OID past %s' % (oid,) ) - nextSubId = oct2int(substrate[index]) + nextSubId = oct2int(head[index]) index = index + 1 subId = (subId << 7) + nextSubId oid = oid + (subId,) - return self._createComponent(asn1Spec, tagSet, oid), rest + return self._createComponent(asn1Spec, tagSet, oid), tail tagMap = decoder.tagMap.copy() tagMap.update({ |