summaryrefslogtreecommitdiff
path: root/pyasn1
diff options
context:
space:
mode:
authorelie <elie>2012-06-09 07:48:38 +0000
committerelie <elie>2012-06-09 07:48:38 +0000
commitddb52ae4b68d0c75768de8a3230df70280ffadb3 (patch)
tree0d4f229c18a6b85bf3660e39306e387741a98b79 /pyasn1
parent16c058a0f4e8f2e644269bb938005a06574c32cf (diff)
downloadpyasn1-git-ddb52ae4b68d0c75768de8a3230df70280ffadb3.tar.gz
decoder's recursiveFlag feature generalized as a user callback function
which is passed an uninitialized object recovered from substrate and its uninterpreted payload.
Diffstat (limited to 'pyasn1')
-rw-r--r--pyasn1/codec/ber/decoder.py238
-rw-r--r--pyasn1/codec/cer/decoder.py26
2 files changed, 137 insertions, 127 deletions
diff --git a/pyasn1/codec/ber/decoder.py b/pyasn1/codec/ber/decoder.py
index 763d6a8..32a9b09 100644
--- a/pyasn1/codec/ber/decoder.py
+++ b/pyasn1/codec/ber/decoder.py
@@ -7,11 +7,11 @@ from pyasn1 import debug, error
class AbstractDecoder:
protoComponent = None
def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
- length, state, decodeFun):
+ length, state, decodeFun, substrateFun):
raise error.PyAsn1Error('Decoder not implemented for %s' % (tagSet,))
def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
- length, state, decodeFun):
+ length, state, decodeFun, substrateFun):
raise error.PyAsn1Error('Indefinite length mode decoder not implemented for %s' % (tagSet,))
class AbstractSimpleDecoder(AbstractDecoder):
@@ -32,21 +32,29 @@ class AbstractConstructedDecoder(AbstractDecoder):
class EndOfOctetsDecoder(AbstractSimpleDecoder):
def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
- length, state, decodeFun):
+ length, state, decodeFun, substrateFun):
return eoo.endOfOctets, substrate[length:]
class ExplicitTagDecoder(AbstractSimpleDecoder):
protoComponent = univ.Any('')
def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
- length, state, decodeFun):
- substrate, rest = substrate[:length], substrate[length:]
- if not decodeFun:
- return self._createComponent(asn1Spec, tagSet, ''), substrate
- value, _ = decodeFun(substrate, asn1Spec, tagSet, length)
- return value, rest
+ length, state, decodeFun, substrateFun):
+ if substrateFun:
+ return substrateFun(
+ self._createComponent(asn1Spec, tagSet, ''),
+ substrate, length
+ )
+ head, tail = substrate[:length], substrate[length:]
+ value, _ = decodeFun(head, asn1Spec, tagSet, length)
+ return value, tail
def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
- length, state, decodeFun):
+ length, state, decodeFun, substrateFun):
+ if substrateFun:
+ return substrateFun(
+ self._createComponent(asn1Spec, tagSet, ''),
+ substrate, length
+ )
value, substrate = decodeFun(substrate, asn1Spec, tagSet, length)
terminator, substrate = decodeFun(substrate)
if terminator == eoo.endOfOctets:
@@ -77,21 +85,21 @@ class IntegerDecoder(AbstractSimpleDecoder):
}
def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, length,
- state, decodeFun):
- substrate, rest = substrate[:length], substrate[length:]
- if not substrate:
+ state, decodeFun, substrateFun):
+ head, tail = substrate[:length], substrate[length:]
+ if not head:
raise error.PyAsn1Error('Empty substrate')
- if substrate in self.precomputedValues:
- value = self.precomputedValues[substrate]
+ if head in self.precomputedValues:
+ value = self.precomputedValues[head]
else:
- firstOctet = oct2int(substrate[0])
+ firstOctet = oct2int(head[0])
if firstOctet & 0x80:
value = -1
else:
value = 0
- for octet in substrate:
+ for octet in head:
value = value << 8 | oct2int(octet)
- return self._createComponent(asn1Spec, tagSet, value), rest
+ return self._createComponent(asn1Spec, tagSet, value), tail
class BooleanDecoder(IntegerDecoder):
protoComponent = univ.Boolean(0)
@@ -101,41 +109,41 @@ class BooleanDecoder(IntegerDecoder):
class BitStringDecoder(AbstractSimpleDecoder):
protoComponent = univ.BitString(())
def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, length,
- state, decodeFun):
- substrate, rest = substrate[:length], substrate[length:]
+ state, decodeFun, substrateFun):
+ head, tail = substrate[:length], substrate[length:]
if tagSet[0][1] == tag.tagFormatSimple: # XXX what tag to check?
- if not substrate:
- raise error.PyAsn1Error('Missing initial octet')
- trailingBits = oct2int(substrate[0])
+ if not head:
+ raise error.PyAsn1Error('Empty substrate')
+ trailingBits = oct2int(head[0])
if trailingBits > 7:
raise error.PyAsn1Error(
'Trailing bits overflow %s' % trailingBits
)
- substrate = substrate[1:]
- lsb = p = 0; l = len(substrate)-1; b = ()
+ head = head[1:]
+ lsb = p = 0; l = len(head)-1; b = ()
while p <= l:
if p == l:
lsb = trailingBits
j = 7
- o = oct2int(substrate[p])
+ o = oct2int(head[p])
while j >= lsb:
b = b + ((o>>j)&0x01,)
j = j - 1
p = p + 1
- return self._createComponent(asn1Spec, tagSet, b), rest
+ return self._createComponent(asn1Spec, tagSet, b), tail
r = self._createComponent(asn1Spec, tagSet, ())
- if not decodeFun:
- return r, substrate
- while substrate:
- component, substrate = decodeFun(substrate)
+ if substrateFun:
+ return substrateFun(r, substrate, length)
+ while head:
+ component, head = decodeFun(head)
r = r + component
- return r, rest
+ return r, tail
def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
- length, state, decodeFun):
+ length, state, decodeFun, substrateFun):
r = self._createComponent(asn1Spec, tagSet, '')
- if not decodeFun:
- return r, substrate
+ if substrateFun:
+ return substrateFun(r, substrate, length)
while substrate:
component, substrate = decodeFun(substrate)
if component == eoo.endOfOctets:
@@ -150,23 +158,23 @@ class BitStringDecoder(AbstractSimpleDecoder):
class OctetStringDecoder(AbstractSimpleDecoder):
protoComponent = univ.OctetString('')
def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, length,
- state, decodeFun):
- substrate, rest = substrate[:length], substrate[length:]
+ state, decodeFun, substrateFun):
+ head, tail = substrate[:length], substrate[length:]
if tagSet[0][1] == tag.tagFormatSimple: # XXX what tag to check?
- return self._createComponent(asn1Spec, tagSet, substrate), rest
+ return self._createComponent(asn1Spec, tagSet, head), tail
r = self._createComponent(asn1Spec, tagSet, '')
- if not decodeFun:
- return r, substrate
- while substrate:
- component, substrate = decodeFun(substrate)
+ if substrateFun:
+ return substrateFun(r, substrate, length)
+ while head:
+ component, head = decodeFun(head)
r = r + component
- return r, rest
+ return r, tail
def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
- length, state, decodeFun):
+ length, state, decodeFun, substrateFun):
r = self._createComponent(asn1Spec, tagSet, '')
- if not decodeFun:
- return r, substrate
+ if substrateFun:
+ return substrateFun(r, substrate, length)
while substrate:
component, substrate = decodeFun(substrate)
if component == eoo.endOfOctets:
@@ -181,29 +189,29 @@ class OctetStringDecoder(AbstractSimpleDecoder):
class NullDecoder(AbstractSimpleDecoder):
protoComponent = univ.Null('')
def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
- length, state, decodeFun):
- substrate, rest = substrate[:length], substrate[length:]
+ length, state, decodeFun, substrateFun):
+ head, tail = substrate[:length], substrate[length:]
r = self._createComponent(asn1Spec, tagSet)
- if substrate:
+ if head:
raise error.PyAsn1Error('Unexpected %d-octet substrate for Null' % length)
- return r, rest
+ return r, tail
class ObjectIdentifierDecoder(AbstractSimpleDecoder):
protoComponent = univ.ObjectIdentifier(())
def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, length,
- state, decodeFun):
- substrate, rest = substrate[:length], substrate[length:]
- if not substrate:
+ state, decodeFun, substrateFun):
+ head, tail = substrate[:length], substrate[length:]
+ if not head:
raise error.PyAsn1Error('Empty substrate')
# Get the first subid
- subId = oct2int(substrate[0])
+ subId = oct2int(head[0])
oid = divmod(subId, 40)
index = 1
- substrateLen = len(substrate)
+ substrateLen = len(head)
while index < substrateLen:
- subId = oct2int(substrate[index])
+ subId = oct2int(head[index])
index = index + 1
if subId > 127:
# Construct subid from a number of octets
@@ -215,20 +223,20 @@ class ObjectIdentifierDecoder(AbstractSimpleDecoder):
raise error.SubstrateUnderrunError(
'Short substrate for sub-OID past %s' % (oid,)
)
- nextSubId = oct2int(substrate[index])
+ nextSubId = oct2int(head[index])
index = index + 1
subId = (subId << 7) + nextSubId
oid = oid + (subId,)
- return self._createComponent(asn1Spec, tagSet, oid), rest
+ return self._createComponent(asn1Spec, tagSet, oid), tail
class RealDecoder(AbstractSimpleDecoder):
protoComponent = univ.Real()
def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
- length, state, decodeFun):
- substrate, rest = substrate[:length], substrate[length:]
- if not substrate:
+ length, state, decodeFun, substrateFun):
+ head, tail = substrate[:length], substrate[length:]
+ if not head:
raise error.SubstrateUnderrunError('Short substrate for Real')
- fo = oct2int(substrate[0]); substrate = substrate[1:]
+ fo = oct2int(head[0]); head = head[1:]
if fo & 0x40: # infinite value
value = fo & 0x01 and '-inf' or 'inf'
elif fo & 0x80: # binary enoding
@@ -239,9 +247,9 @@ class RealDecoder(AbstractSimpleDecoder):
elif fo & 0x02:
n = 3
else:
- n = oct2int(substrate[0])
- eo, substrate = substrate[:n], substrate[n:]
- if not eo or not substrate:
+ n = oct2int(head[0])
+ eo, head = head[:n], head[n:]
+ if not eo or not head:
raise error.PyAsn1Error('Real exponent screwed')
e = 0
while eo: # exponent
@@ -249,21 +257,21 @@ class RealDecoder(AbstractSimpleDecoder):
e |= oct2int(eo[0])
eo = eo[1:]
p = 0
- while substrate: # value
+ while head: # value
p <<= 8
- p |= oct2int(substrate[0])
- substrate = substrate[1:]
+ p |= oct2int(head[0])
+ head = head[1:]
if fo & 0x40: # sign bit
p = -p
value = (p, 2, e)
elif fo & 0xc0 == 0: # character encoding
try:
if fo & 0x3 == 0x1: # NR1
- value = (int(substrate), 10, 0)
+ value = (int(head), 10, 0)
elif fo & 0x3 == 0x2: # NR2
- value = float(substrate)
+ value = float(head)
elif fo & 0x3 == 0x3: # NR3
- value = float(substrate)
+ value = float(head)
else:
raise error.SubstrateUnderrunError(
'Unknown NR (tag %s)' % fo
@@ -278,7 +286,7 @@ class RealDecoder(AbstractSimpleDecoder):
raise error.SubstrateUnderrunError(
'Unknown encoding (tag %s)' % fo
)
- return self._createComponent(asn1Spec, tagSet, value), rest
+ return self._createComponent(asn1Spec, tagSet, value), tail
class SequenceDecoder(AbstractConstructedDecoder):
protoComponent = univ.Sequence()
@@ -292,17 +300,15 @@ class SequenceDecoder(AbstractConstructedDecoder):
return r.getComponentPositionNearType(t, idx)
def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
- length, state, decodeFun):
- substrate, rest = substrate[:length], substrate[length:]
+ length, state, decodeFun, substrateFun):
+ head, tail = substrate[:length], substrate[length:]
r = self._createComponent(asn1Spec, tagSet)
idx = 0
- if not decodeFun:
- return r, substrate
- while substrate:
+ if substrateFun:
+ return substrateFun(r, substrate, length)
+ while head:
asn1Spec = self._getComponentTagMap(r, idx)
- component, substrate = decodeFun(
- substrate, asn1Spec
- )
+ component, head = decodeFun(head, asn1Spec)
idx = self._getComponentPositionByType(
r, component.getEffectiveTagSet(), idx
)
@@ -310,16 +316,16 @@ class SequenceDecoder(AbstractConstructedDecoder):
idx = idx + 1
r.setDefaultComponents()
r.verifySizeSpec()
- return r, rest
+ return r, tail
def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
- length, state, decodeFun):
+ length, state, decodeFun, substrateFun):
r = self._createComponent(asn1Spec, tagSet)
+ if substrateFun:
+ return substrateFun(r, substrate, length)
idx = 0
while substrate:
asn1Spec = self._getComponentTagMap(r, idx)
- if not decodeFun:
- return r, substrate
component, substrate = decodeFun(substrate, asn1Spec)
if component == eoo.endOfOctets:
break
@@ -339,29 +345,27 @@ class SequenceDecoder(AbstractConstructedDecoder):
class SequenceOfDecoder(AbstractConstructedDecoder):
protoComponent = univ.SequenceOf()
def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
- length, state, decodeFun):
- substrate, rest = substrate[:length], substrate[length:]
+ length, state, decodeFun, substrateFun):
+ head, tail = substrate[:length], substrate[length:]
r = self._createComponent(asn1Spec, tagSet)
+ if substrateFun:
+ return substrateFun(r, substrate, length)
asn1Spec = r.getComponentType()
idx = 0
- if not decodeFun:
- return r, substrate
- while substrate:
- component, substrate = decodeFun(
- substrate, asn1Spec
- )
+ while head:
+ component, head = decodeFun(head, asn1Spec)
r.setComponentByPosition(idx, component, asn1Spec is None)
idx = idx + 1
r.verifySizeSpec()
- return r, rest
+ return r, tail
def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
- length, state, decodeFun):
+ length, state, decodeFun, substrateFun):
r = self._createComponent(asn1Spec, tagSet)
+ if substrateFun:
+ return substrateFun(r, substrate, length)
asn1Spec = r.getComponentType()
idx = 0
- if not decodeFun:
- return r, substrate
while substrate:
component, substrate = decodeFun(substrate, asn1Spec)
if component == eoo.endOfOctets:
@@ -393,42 +397,45 @@ class SetOfDecoder(SequenceOfDecoder):
class ChoiceDecoder(AbstractConstructedDecoder):
protoComponent = univ.Choice()
def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
- length, state, decodeFun):
- substrate, rest = substrate[:length], substrate[length:]
+ length, state, decodeFun, substrateFun):
+ head, tail = substrate[:length], substrate[length:]
r = self._createComponent(asn1Spec, tagSet)
- if not decodeFun:
- return r, substrate
+ if substrateFun:
+ return substrateFun(r, substrate, length)
if r.getTagSet() == tagSet: # explicitly tagged Choice
- component, substrate = decodeFun(
- substrate, r.getComponentTagMap()
+ component, head = decodeFun(
+ head, r.getComponentTagMap()
)
else:
- component, substrate = decodeFun(
- substrate, r.getComponentTagMap(), tagSet, length, state
+ component, head = decodeFun(
+ head, r.getComponentTagMap(), tagSet, length, state
)
if isinstance(component, univ.Choice):
effectiveTagSet = component.getEffectiveTagSet()
else:
effectiveTagSet = component.getTagSet()
r.setComponentByType(effectiveTagSet, component, 0, asn1Spec is None)
- return r, rest
+ return r, tail
indefLenValueDecoder = valueDecoder
class AnyDecoder(AbstractSimpleDecoder):
protoComponent = univ.Any()
def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
- length, state, decodeFun):
+ length, state, decodeFun, substrateFun):
if asn1Spec is None or \
asn1Spec is not None and tagSet != asn1Spec.getTagSet():
# untagged Any container, recover inner header substrate
length = length + len(fullSubstrate) - len(substrate)
substrate = fullSubstrate
- substrate, rest = substrate[:length], substrate[length:]
- return self._createComponent(asn1Spec, tagSet, value=substrate), rest
+ if substrateFun:
+ return substrateFun(self._createComponent(asn1Spec, tagSet),
+ substrate, length)
+ head, tail = substrate[:length], substrate[length:]
+ return self._createComponent(asn1Spec, tagSet, value=head), tail
def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
- length, state, decodeFun):
+ length, state, decodeFun, substrateFun):
if asn1Spec is not None and tagSet == asn1Spec.getTagSet():
# tagged Any type -- consume header substrate
header = ''
@@ -441,8 +448,8 @@ class AnyDecoder(AbstractSimpleDecoder):
# Any components do not inherit initial tag
asn1Spec = self.protoComponent
- if not decodeFun:
- return r, substrate
+ if substrateFun:
+ return substrateFun(r, substrate, length)
while substrate:
component, substrate = decodeFun(substrate, asn1Spec)
if component == eoo.endOfOctets:
@@ -541,7 +548,8 @@ class Decoder:
self.__tagSetCache = {}
def __call__(self, substrate, asn1Spec=None, tagSet=None,
- length=None, state=stDecodeTag, recursiveFlag=1):
+ length=None, state=stDecodeTag, recursiveFlag=1,
+ substrateFun=None):
if debug.logger & debug.flagDecoder:
debug.logger('decoder called at scope %s with state %d, working with up to %d octets of substrate: %s' % (debug.scope, state, len(substrate), debug.hexdump(substrate)))
fullSubstrate = substrate
@@ -736,15 +744,17 @@ class Decoder:
debug.logger and debug.logger & debug.flagDecoder and debug.logger('codec %s chosen, decoding value' % concreteDecoder.__class__.__name__)
state = stDecodeValue
if state == stDecodeValue:
+ if recursiveFlag == 0 and not substrateFun: # legacy
+ substrateFun = lambda a,b,c: (a,b[:c])
if length == -1: # indef length
value, substrate = concreteDecoder.indefLenValueDecoder(
fullSubstrate, substrate, asn1Spec, tagSet, length,
- stGetValueDecoder, recursiveFlag and self or None
+ stGetValueDecoder, self, substrateFun
)
else:
value, substrate = concreteDecoder.valueDecoder(
fullSubstrate, substrate, asn1Spec, tagSet, length,
- stGetValueDecoder, recursiveFlag and self or None
+ stGetValueDecoder, self, substrateFun
)
state = stStop
debug.logger and debug.logger & debug.flagDecoder and debug.logger('codec %s yields type %s, value:\n%s\n...remaining substrate is: %s' % (concreteDecoder.__class__.__name__, value.__class__.__name__, value.prettyPrint(), substrate and debug.hexdump(substrate) or '<none>'))
diff --git a/pyasn1/codec/cer/decoder.py b/pyasn1/codec/cer/decoder.py
index 44a001d..367feac 100644
--- a/pyasn1/codec/cer/decoder.py
+++ b/pyasn1/codec/cer/decoder.py
@@ -7,35 +7,35 @@ from pyasn1 import error
class BooleanDecoder(decoder.AbstractSimpleDecoder):
protoComponent = univ.Boolean(0)
def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, length,
- state, decodeFun):
- substrate, rest = substrate[:length], substrate[length:]
- if not substrate:
+ state, decodeFun, substrateFun):
+ head, tail = substrate[:length], substrate[length:]
+ if not head:
raise error.PyAsn1Error('Empty substrate')
- byte = oct2int(substrate[0])
+ byte = oct2int(head[0])
if byte == 0xff:
value = 1
elif byte == 0x00:
value = 0
else:
raise error.PyAsn1Error('Boolean CER violation: %s' % byte)
- return self._createComponent(asn1Spec, tagSet, value), rest
+ return self._createComponent(asn1Spec, tagSet, value), tail
class ObjectIdentifierDecoder(decoder.AbstractSimpleDecoder):
protoComponent = univ.ObjectIdentifier(())
def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, length,
- state, decodeFun):
- substrate, rest = substrate[:length], substrate[length:]
- if not substrate:
+ state, decodeFun, substrateFun):
+ head, tail = substrate[:length], substrate[length:]
+ if not head:
raise error.PyAsn1Error('Empty substrate')
# Get the first subid
- subId = oct2int(substrate[0])
+ subId = oct2int(head[0])
oid = divmod(subId, 40)
index = 1
- substrateLen = len(substrate)
+ substrateLen = len(head)
while index < substrateLen:
- subId = oct2int(substrate[index])
+ subId = oct2int(head[index])
index = index + 1
if subId == 128:
# ASN.1 spec forbids leading zeros (0x80) in sub-ID OID
@@ -52,11 +52,11 @@ class ObjectIdentifierDecoder(decoder.AbstractSimpleDecoder):
raise error.SubstrateUnderrunError(
'Short substrate for sub-OID past %s' % (oid,)
)
- nextSubId = oct2int(substrate[index])
+ nextSubId = oct2int(head[index])
index = index + 1
subId = (subId << 7) + nextSubId
oid = oid + (subId,)
- return self._createComponent(asn1Spec, tagSet, oid), rest
+ return self._createComponent(asn1Spec, tagSet, oid), tail
tagMap = decoder.tagMap.copy()
tagMap.update({