From f2f35576877b143bfd870f4600359ee21c7d81a8 Mon Sep 17 00:00:00 2001 From: elie Date: Sat, 25 Apr 2015 21:31:57 +0000 Subject: Fix to end-of-octest sentinel handling: * require strict two-zeros sentinel encoding * recognize EOO sentinel only when explicitly requested by caller of the decoder (via allowEoo=True parameter) --- CHANGES | 4 ++++ pyasn1/codec/ber/decoder.py | 53 ++++++++++++++++++++++-------------------- test/codec/ber/test_decoder.py | 52 ++++++++++++++++++++++++++++++++++++++++- 3 files changed, 83 insertions(+), 26 deletions(-) diff --git a/CHANGES b/CHANGES index 57666c1..4b06b4b 100644 --- a/CHANGES +++ b/CHANGES @@ -54,6 +54,10 @@ Revision 0.1.8rc2 - Fix to broken REAL type decoding handling. - Fix to BitString and OctetString decoders dealing with constructed encoding -- it used to be possible to embed other types in substrate. +- Fix to end-of-octest sentinel handling: + * require strict two-zeros sentinel encoding + * recognize EOO sentinel only when explicitly requested by caller + of the decoder (via allowEoo=True parameter) Revision 0.1.7 -------------- diff --git a/pyasn1/codec/ber/decoder.py b/pyasn1/codec/ber/decoder.py index 2da4142..6794f5e 100644 --- a/pyasn1/codec/ber/decoder.py +++ b/pyasn1/codec/ber/decoder.py @@ -36,11 +36,6 @@ class AbstractConstructedDecoder(AbstractDecoder): else: return asn1Spec.clone() -class EndOfOctetsDecoder(AbstractSimpleDecoder): - def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, - length, state, decodeFun, substrateFun): - return eoo.endOfOctets, substrate[length:] - class ExplicitTagDecoder(AbstractSimpleDecoder): protoComponent = univ.Any('') tagFormats = (tag.tagFormatConstructed,) @@ -63,7 +58,7 @@ class ExplicitTagDecoder(AbstractSimpleDecoder): substrate, length ) value, substrate = decodeFun(substrate, asn1Spec, tagSet, length) - terminator, substrate = decodeFun(substrate) + terminator, substrate = decodeFun(substrate, allowEoo=True) if eoo.endOfOctets.isSameTypeWith(terminator) and \ terminator == eoo.endOfOctets: return value, substrate @@ -154,7 +149,8 @@ class BitStringDecoder(AbstractSimpleDecoder): if substrateFun: return substrateFun(r, substrate, length) while substrate: - component, substrate = decodeFun(substrate, self.protoComponent) + component, substrate = decodeFun(substrate, self.protoComponent, + allowEoo=True) if eoo.endOfOctets.isSameTypeWith(component) and \ component == eoo.endOfOctets: break @@ -187,7 +183,8 @@ class OctetStringDecoder(AbstractSimpleDecoder): if substrateFun: return substrateFun(r, substrate, length) while substrate: - component, substrate = decodeFun(substrate, self.protoComponent) + component, substrate = decodeFun(substrate, self.protoComponent, + allowEoo=True) if eoo.endOfOctets.isSameTypeWith(component) and \ component == eoo.endOfOctets: break @@ -360,7 +357,7 @@ class SequenceDecoder(AbstractConstructedDecoder): idx = 0 while substrate: asn1Spec = self._getComponentTagMap(r, idx) - component, substrate = decodeFun(substrate, asn1Spec) + component, substrate = decodeFun(substrate, asn1Spec, allowEoo=True) if eoo.endOfOctets.isSameTypeWith(component) and \ component == eoo.endOfOctets: break @@ -402,7 +399,7 @@ class SequenceOfDecoder(AbstractConstructedDecoder): asn1Spec = r.getComponentType() idx = 0 while substrate: - component, substrate = decodeFun(substrate, asn1Spec) + component, substrate = decodeFun(substrate, asn1Spec, allowEoo=True) if eoo.endOfOctets.isSameTypeWith(component) and \ component == eoo.endOfOctets: break @@ -461,7 +458,8 @@ class ChoiceDecoder(AbstractConstructedDecoder): return substrateFun(r, substrate, length) if r.getTagSet() == tagSet: # explicitly tagged Choice component, substrate = decodeFun(substrate, r.getComponentTagMap()) - eooMarker, substrate = decodeFun(substrate) # eat up EOO marker + # eat up EOO marker + eooMarker, substrate = decodeFun(substrate, allowEoo=True) if not eoo.endOfOctets.isSameTypeWith(eooMarker) or \ eooMarker != eoo.endOfOctets: raise error.PyAsn1Error('No EOO seen before substrate ends') @@ -509,7 +507,7 @@ class AnyDecoder(AbstractSimpleDecoder): if substrateFun: return substrateFun(r, substrate, length) while substrate: - component, substrate = decodeFun(substrate, asn1Spec) + component, substrate = decodeFun(substrate, asn1Spec, allowEoo=True) if eoo.endOfOctets.isSameTypeWith(component) and \ component == eoo.endOfOctets: break @@ -553,7 +551,6 @@ class UTCTimeDecoder(OctetStringDecoder): protoComponent = useful.UTCTime() tagMap = { - eoo.endOfOctets.tagSet: EndOfOctetsDecoder(), univ.Integer.tagSet: IntegerDecoder(), univ.Boolean.tagSet: BooleanDecoder(), univ.BitString.tagSet: BitStringDecoder(), @@ -581,7 +578,7 @@ tagMap = { useful.ObjectDescriptor.tagSet: ObjectDescriptorDecoder(), useful.GeneralizedTime.tagSet: GeneralizedTimeDecoder(), useful.UTCTime.tagSet: UTCTimeDecoder() - } +} # Type-to-codec map for ambiguous ASN.1 types typeMap = { @@ -591,7 +588,7 @@ typeMap = { univ.SequenceOf.typeId: SequenceOfDecoder(), univ.Choice.typeId: ChoiceDecoder(), univ.Any.typeId: AnyDecoder() - } +} ( stDecodeTag, stDecodeLength, stGetValueDecoder, stGetValueDecoderByAsn1Spec, stGetValueDecoderByTag, stTryAsExplicitTag, stDecodeValue, @@ -604,20 +601,18 @@ class Decoder: def __init__(self, tagMap, typeMap={}): self.__tagMap = tagMap self.__typeMap = typeMap - self.__endOfOctetsTagSet = eoo.endOfOctets.getTagSet() # Tag & TagSet objects caches self.__tagCache = {} self.__tagSetCache = {} def __call__(self, substrate, asn1Spec=None, tagSet=None, length=None, state=stDecodeTag, recursiveFlag=1, - substrateFun=None): + substrateFun=None, allowEoo=False): if debug.logger & debug.flagDecoder: debug.logger('decoder called at scope %s with state %d, working with up to %d octets of substrate: %s' % (debug.scope, state, len(substrate), debug.hexdump(substrate))) fullSubstrate = substrate while state != stStop: if state == stDecodeTag: - # Decode tag if not substrate: raise error.SubstrateUnderrunError( 'Short octet stream on tag decoding' @@ -625,13 +620,25 @@ class Decoder: if not isOctetsType(substrate) and \ not isinstance(substrate, univ.OctetString): raise error.PyAsn1Error('Bad octet stream type') - + # Decode tag firstOctet = substrate[0] substrate = substrate[1:] if firstOctet in self.__tagCache: lastTag = self.__tagCache[firstOctet] else: t = oct2int(firstOctet) + # Look for end-of-octets sentinel + if t == 0: + if substrate and oct2int(substrate[0]) == 0: + if allowEoo: + debug.logger and debug.logger & debug.flagDecoder and debug.logger('end-of-octets sentinel found') + value, substrate = eoo.endOfOctets, substrate[1:] + state = stStop + continue + else: + raise error.PyAsn1Error('Unexpected end-of-contents sentinel') + else: + raise error.PyAsn1Error('Zero tag encountered') tagClass = t&0xC0 tagFormat = t&0x20 tagId = t&0x1F @@ -649,7 +656,7 @@ class Decoder: break lastTag = tag.Tag( tagClass=tagClass, tagFormat=tagFormat, tagId=tagId - ) + ) if tagId < 31: # cache short tags self.__tagCache[firstOctet] = lastTag @@ -780,10 +787,6 @@ class Decoder: state = stDecodeValue else: state = stTryAsExplicitTag - elif tagSet == self.__endOfOctetsTagSet: - concreteDecoder = self.__tagMap[tagSet] - state = stDecodeValue - debug.logger and debug.logger & debug.flagDecoder and debug.logger('end-of-octets found') else: concreteDecoder = None state = stTryAsExplicitTag @@ -823,7 +826,7 @@ class Decoder: if state == stErrorCondition: raise error.PyAsn1Error( '%s not in asn1Spec: %s' % (tagSet, asn1Spec) - ) + ) if debug.logger and debug.logger & debug.flagDecoder: debug.scope.pop() debug.logger('decoder left scope %s, call completed' % debug.scope) diff --git a/test/codec/ber/test_decoder.py b/test/codec/ber/test_decoder.py index cde3ff8..2022216 100644 --- a/test/codec/ber/test_decoder.py +++ b/test/codec/ber/test_decoder.py @@ -1,5 +1,5 @@ from pyasn1.type import tag, namedtype, univ -from pyasn1.codec.ber import decoder +from pyasn1.codec.ber import decoder, eoo from pyasn1.compat.octets import ints2octs, str2octs, null from pyasn1.error import PyAsn1Error from sys import version_info @@ -661,4 +661,54 @@ class AnyDecoderTestCase(unittest.TestCase): substrateFun=lambda a,b,c: (b,c) ) == (ints2octs((164, 5, 4, 3, 102, 111, 120)), 7) +class EndOfOctetsTestCase(unittest.TestCase): + def testUnexpectedEoo(self): + try: + decoder.decode(ints2octs((0, 0))) + except PyAsn1Error: + pass + else: + assert 0, 'end-of-contents octets accepted at top level' + + def testExpectedEoo(self): + result, remainder = decoder.decode(ints2octs((0, 0)), allowEoo=True) + assert eoo.endOfOctets.isSameTypeWith(result) and result == eoo.endOfOctets + assert remainder == null + + def testDefiniteNoEoo(self): + try: + decoder.decode(ints2octs((0x23, 0x02, 0x00, 0x00))) + except PyAsn1Error: + pass + else: + assert 0, 'end-of-contents octets accepted inside definite-length encoding' + + def testIndefiniteEoo(self): + result, remainder = decoder.decode(ints2octs((0x23, 0x80, 0x00, 0x00))) + assert result == () and remainder == null, 'incorrect decoding of indefinite length end-of-octets' + + def testNoLongFormEoo(self): + try: + decoder.decode(ints2octs((0x23, 0x80, 0x00, 0x81, 0x00))) + except PyAsn1Error: + pass + else: + assert 0, 'end-of-contents octets accepted with invalid long-form length' + + def testNoConstructedEoo(self): + try: + decoder.decode(ints2octs((0x23, 0x80, 0x20, 0x00))) + except PyAsn1Error: + pass + else: + assert 0, 'end-of-contents octets accepted with invalid constructed encoding' + + def testNoEooData(self): + try: + decoder.decode(ints2octs((0x23, 0x80, 0x00, 0x01, 0x00))) + except PyAsn1Error: + pass + else: + assert 0, 'end-of-contents octets accepted with unexpected data' + if __name__ == '__main__': unittest.main() -- cgit v1.2.1