diff options
Diffstat (limited to 'tests/codec/ber/test_decoder.py')
-rw-r--r-- | tests/codec/ber/test_decoder.py | 425 |
1 files changed, 242 insertions, 183 deletions
diff --git a/tests/codec/ber/test_decoder.py b/tests/codec/ber/test_decoder.py index e72e025..17483db 100644 --- a/tests/codec/ber/test_decoder.py +++ b/tests/codec/ber/test_decoder.py @@ -23,10 +23,11 @@ from pyasn1.type import namedtype from pyasn1.type import opentype from pyasn1.type import univ from pyasn1.type import char +from pyasn1.codec import streaming from pyasn1.codec.ber import decoder from pyasn1.codec.ber import eoo from pyasn1.compat.octets import ints2octs, str2octs, null -from pyasn1.error import PyAsn1Error, SubstrateUnderrunError, UnsupportedSubstrateError +from pyasn1 import error class LargeTagDecoderTestCase(BaseTestCase): @@ -78,7 +79,7 @@ class IntegerDecoderTestCase(BaseTestCase): decoder.decode( ints2octs((2, 1, 12)), asn1Spec=univ.Null() ) == (12, null) - except PyAsn1Error: + except error.PyAsn1Error: pass else: assert 0, 'wrong asn1Spec worked out' @@ -89,7 +90,7 @@ class IntegerDecoderTestCase(BaseTestCase): def testTagFormat(self): try: decoder.decode(ints2octs((34, 1, 12))) - except PyAsn1Error: + except error.PyAsn1Error: pass else: assert 0, 'wrong tagFormat worked out' @@ -111,7 +112,7 @@ class BooleanDecoderTestCase(BaseTestCase): def testTagFormat(self): try: decoder.decode(ints2octs((33, 1, 1))) - except PyAsn1Error: + except error.PyAsn1Error: pass else: assert 0, 'wrong tagFormat worked out' @@ -138,24 +139,22 @@ class BitStringDecoderTestCase(BaseTestCase): ints2octs((35, 128, 3, 2, 0, 169, 3, 2, 1, 138, 0, 0)) ) == ((1, 0, 1, 0, 1, 0, 0, 1, 1, 0, 0, 0, 1, 0, 1), null) - # TODO: Not clear how to deal with substrateFun in stream implementation - # def testDefModeChunkedSubst(self): - # assert decoder.decode( - # ints2octs((35, 8, 3, 2, 0, 169, 3, 2, 1, 138)), - # substrateFun=lambda a, b, c: (b, b[c:]) - # ) == (ints2octs((3, 2, 0, 169, 3, 2, 1, 138)), str2octs('')) + def testDefModeChunkedSubst(self): + assert decoder.decode( + ints2octs((35, 8, 3, 2, 0, 169, 3, 2, 1, 138)), + substrateFun=lambda a, b, c, d: streaming.readFromStream(b, c) + ) == (ints2octs((3, 2, 0, 169, 3, 2, 1, 138)), str2octs('')) - # TODO: Not clear how to deal with substrateFun in stream implementation - # def testIndefModeChunkedSubst(self): - # assert decoder.decode( - # ints2octs((35, 128, 3, 2, 0, 169, 3, 2, 1, 138, 0, 0)), - # substrateFun=lambda a, b, c: (b, str2octs('')) - # ) == (ints2octs((3, 2, 0, 169, 3, 2, 1, 138, 0, 0)), str2octs('')) + def testIndefModeChunkedSubst(self): + assert decoder.decode( + ints2octs((35, 128, 3, 2, 0, 169, 3, 2, 1, 138, 0, 0)), + substrateFun=lambda a, b, c, d: streaming.readFromStream(b, c) + ) == (ints2octs((3, 2, 0, 169, 3, 2, 1, 138, 0, 0)), str2octs('')) def testTypeChecking(self): try: decoder.decode(ints2octs((35, 4, 2, 2, 42, 42))) - except PyAsn1Error: + except error.PyAsn1Error: pass else: assert 0, 'accepted mis-encoded bit-string constructed out of an integer' @@ -183,22 +182,20 @@ class OctetStringDecoderTestCase(BaseTestCase): ints2octs((36, 128, 4, 4, 81, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 4, 111, 119, 110, 32, 4, 3, 102, 111, 120, 0, 0)) ) == (str2octs('Quick brown fox'), null) - # TODO: Not clear how to deal with substrateFun in stream implementation - # def testDefModeChunkedSubst(self): - # assert decoder.decode( - # ints2octs( - # (36, 23, 4, 4, 81, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 4, 111, 119, 110, 32, 4, 3, 102, 111, 120)), - # substrateFun=lambda a, b, c: (b, b[c:]) - # ) == (ints2octs((4, 4, 81, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 4, 111, 119, 110, 32, 4, 3, 102, 111, 120)), str2octs('')) + def testDefModeChunkedSubst(self): + assert decoder.decode( + ints2octs( + (36, 23, 4, 4, 81, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 4, 111, 119, 110, 32, 4, 3, 102, 111, 120)), + substrateFun=lambda a, b, c, d: streaming.readFromStream(b, c) + ) == (ints2octs((4, 4, 81, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 4, 111, 119, 110, 32, 4, 3, 102, 111, 120)), str2octs('')) - # TODO: Not clear how to deal with substrateFun in stream implementation - # def testIndefModeChunkedSubst(self): - # assert decoder.decode( - # ints2octs((36, 128, 4, 4, 81, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 4, 111, 119, 110, 32, 4, 3, 102, 111, - # 120, 0, 0)), - # substrateFun=lambda a, b, c: (b, str2octs('')) - # ) == (ints2octs( - # (4, 4, 81, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 4, 111, 119, 110, 32, 4, 3, 102, 111, 120, 0, 0)), str2octs('')) + def testIndefModeChunkedSubst(self): + assert decoder.decode( + ints2octs((36, 128, 4, 4, 81, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 4, 111, 119, 110, 32, 4, 3, 102, 111, + 120, 0, 0)), + substrateFun=lambda a, b, c, d: streaming.readFromStream(b, c) + ) == (ints2octs( + (4, 4, 81, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 4, 111, 119, 110, 32, 4, 3, 102, 111, 120, 0, 0)), str2octs('')) class ExpTaggedOctetStringDecoderTestCase(BaseTestCase): @@ -246,22 +243,20 @@ class ExpTaggedOctetStringDecoderTestCase(BaseTestCase): assert self.o.tagSet == o.tagSet assert self.o.isSameTypeWith(o) - # TODO: Not clear how to deal with substrateFun in stream implementation - # def testDefModeSubst(self): - # assert decoder.decode( - # ints2octs((101, 17, 4, 15, 81, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 32, 102, 111, 120)), - # substrateFun=lambda a, b, c: (b, b[c:]) - # ) == (ints2octs((4, 15, 81, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 32, 102, 111, 120)), str2octs('')) + def testDefModeSubst(self): + assert decoder.decode( + ints2octs((101, 17, 4, 15, 81, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 32, 102, 111, 120)), + substrateFun=lambda a, b, c, d: streaming.readFromStream(b, c) + ) == (ints2octs((4, 15, 81, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 32, 102, 111, 120)), str2octs('')) - # TODO: Not clear how to deal with substrateFun in stream implementation - # def testIndefModeSubst(self): - # assert decoder.decode( - # ints2octs(( - # 101, 128, 36, 128, 4, 15, 81, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 32, 102, 111, 120, 0, - # 0, 0, 0)), - # substrateFun=lambda a, b, c: (b, str2octs('')) - # ) == (ints2octs( - # (36, 128, 4, 15, 81, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 32, 102, 111, 120, 0, 0, 0, 0)), str2octs('')) + def testIndefModeSubst(self): + assert decoder.decode( + ints2octs(( + 101, 128, 36, 128, 4, 15, 81, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 32, 102, 111, 120, 0, + 0, 0, 0)), + substrateFun=lambda a, b, c, d: streaming.readFromStream(b, c) + ) == (ints2octs( + (36, 128, 4, 15, 81, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 32, 102, 111, 120, 0, 0, 0, 0)), str2octs('')) class NullDecoderTestCase(BaseTestCase): @@ -271,7 +266,7 @@ class NullDecoderTestCase(BaseTestCase): def testTagFormat(self): try: decoder.decode(ints2octs((37, 0))) - except PyAsn1Error: + except error.PyAsn1Error: pass else: assert 0, 'wrong tagFormat worked out' @@ -340,7 +335,7 @@ class ObjectIdentifierDecoderTestCase(BaseTestCase): decoder.decode( ints2octs((6, 5, 85, 4, 128, 129, 0)) ) - except PyAsn1Error: + except error.PyAsn1Error: pass else: assert 0, 'Leading 0x80 tolerated' @@ -350,7 +345,7 @@ class ObjectIdentifierDecoderTestCase(BaseTestCase): decoder.decode( ints2octs((6, 7, 1, 0x80, 0x80, 0x80, 0x80, 0x80, 0x7F)) ) - except PyAsn1Error: + except error.PyAsn1Error: pass else: assert 0, 'Leading 0x80 tolerated' @@ -360,7 +355,7 @@ class ObjectIdentifierDecoderTestCase(BaseTestCase): decoder.decode( ints2octs((6, 2, 0x80, 1)) ) - except PyAsn1Error: + except error.PyAsn1Error: pass else: assert 0, 'Leading 0x80 tolerated' @@ -370,7 +365,7 @@ class ObjectIdentifierDecoderTestCase(BaseTestCase): decoder.decode( ints2octs((6, 2, 0x80, 0x7F)) ) - except PyAsn1Error: + except error.PyAsn1Error: pass else: assert 0, 'Leading 0x80 tolerated' @@ -378,7 +373,7 @@ class ObjectIdentifierDecoderTestCase(BaseTestCase): def testTagFormat(self): try: decoder.decode(ints2octs((38, 1, 239))) - except PyAsn1Error: + except error.PyAsn1Error: pass else: assert 0, 'wrong tagFormat worked out' @@ -386,7 +381,7 @@ class ObjectIdentifierDecoderTestCase(BaseTestCase): def testZeroLength(self): try: decoder.decode(ints2octs((6, 0, 0))) - except PyAsn1Error: + except error.PyAsn1Error: pass else: assert 0, 'zero length tolerated' @@ -394,7 +389,7 @@ class ObjectIdentifierDecoderTestCase(BaseTestCase): def testIndefiniteLength(self): try: decoder.decode(ints2octs((6, 128, 0))) - except PyAsn1Error: + except error.PyAsn1Error: pass else: assert 0, 'indefinite length tolerated' @@ -402,7 +397,7 @@ class ObjectIdentifierDecoderTestCase(BaseTestCase): def testReservedLength(self): try: decoder.decode(ints2octs((6, 255, 0))) - except PyAsn1Error: + except error.PyAsn1Error: pass else: assert 0, 'reserved length tolerated' @@ -479,7 +474,7 @@ class RealDecoderTestCase(BaseTestCase): def testTagFormat(self): try: decoder.decode(ints2octs((41, 0))) - except PyAsn1Error: + except error.PyAsn1Error: pass else: assert 0, 'wrong tagFormat worked out' @@ -487,7 +482,7 @@ class RealDecoderTestCase(BaseTestCase): def testShortEncoding(self): try: decoder.decode(ints2octs((9, 1, 131))) - except PyAsn1Error: + except error.PyAsn1Error: pass else: assert 0, 'accepted too-short real' @@ -684,27 +679,25 @@ class SequenceDecoderTestCase(BaseTestCase): ints2octs((48, 128, 5, 0, 36, 128, 4, 4, 113, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 3, 111, 119, 110, 0, 0, 2, 1, 1, 0, 0)) ) == (self.s, null) - # TODO: Not clear how to deal with substrateFun in stream implementation - # def testWithOptionalAndDefaultedDefModeSubst(self): - # assert decoder.decode( - # ints2octs((48, 18, 5, 0, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 2, 1, 1)), - # substrateFun=lambda a, b, c: (b, b[c:]) - # ) == (ints2octs((5, 0, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 2, 1, 1)), str2octs('')) - - # TODO: Not clear how to deal with substrateFun in stream implementation - # def testWithOptionalAndDefaultedIndefModeSubst(self): - # assert decoder.decode( - # ints2octs((48, 128, 5, 0, 36, 128, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 0, 0, 2, 1, 1, 0, 0)), - # substrateFun=lambda a, b, c: (b, str2octs('')) - # ) == (ints2octs( - # (5, 0, 36, 128, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 0, 0, 2, 1, 1, 0, 0)), str2octs('')) + def testWithOptionalAndDefaultedDefModeSubst(self): + assert decoder.decode( + ints2octs((48, 18, 5, 0, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 2, 1, 1)), + substrateFun=lambda a, b, c, d: streaming.readFromStream(b, c) + ) == (ints2octs((5, 0, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 2, 1, 1)), str2octs('')) + + def testWithOptionalAndDefaultedIndefModeSubst(self): + assert decoder.decode( + ints2octs((48, 128, 5, 0, 36, 128, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 0, 0, 2, 1, 1, 0, 0)), + substrateFun=lambda a, b, c, d: streaming.readFromStream(b, c) + ) == (ints2octs( + (5, 0, 36, 128, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 0, 0, 2, 1, 1, 0, 0)), str2octs('')) def testTagFormat(self): try: decoder.decode( ints2octs((16, 18, 5, 0, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 2, 1, 1)) ) - except PyAsn1Error: + except error.PyAsn1Error: pass else: assert 0, 'wrong tagFormat worked out' @@ -886,7 +879,7 @@ class SequenceDecoderWithUntaggedOpenTypesTestCase(BaseTestCase): decodeOpenTypes=True ) - except PyAsn1Error: + except error.PyAsn1Error: pass else: @@ -1025,7 +1018,7 @@ class SequenceDecoderWithUnaggedSetOfOpenTypesTestCase(BaseTestCase): decodeOpenTypes=True ) - except PyAsn1Error: + except error.PyAsn1Error: pass else: @@ -1172,27 +1165,25 @@ class SetDecoderTestCase(BaseTestCase): ints2octs((49, 128, 5, 0, 36, 128, 4, 4, 113, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 3, 111, 119, 110, 0, 0, 2, 1, 1, 0, 0)) ) == (self.s, null) - # TODO: Not clear how to deal with substrateFun in stream implementation - # def testWithOptionalAndDefaultedDefModeSubst(self): - # assert decoder.decode( - # ints2octs((49, 18, 5, 0, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 2, 1, 1)), - # substrateFun=lambda a, b, c: (b, b[c:]) - # ) == (ints2octs((5, 0, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 2, 1, 1)), str2octs('')) - - # TODO: Not clear how to deal with substrateFun in stream implementation - # def testWithOptionalAndDefaultedIndefModeSubst(self): - # assert decoder.decode( - # ints2octs((49, 128, 5, 0, 36, 128, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 0, 0, 2, 1, 1, 0, 0)), - # substrateFun=lambda a, b, c: (b, str2octs('')) - # ) == (ints2octs( - # (5, 0, 36, 128, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 0, 0, 2, 1, 1, 0, 0)), str2octs('')) + def testWithOptionalAndDefaultedDefModeSubst(self): + assert decoder.decode( + ints2octs((49, 18, 5, 0, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 2, 1, 1)), + substrateFun=lambda a, b, c, d: streaming.readFromStream(b, c) + ) == (ints2octs((5, 0, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 2, 1, 1)), str2octs('')) + + def testWithOptionalAndDefaultedIndefModeSubst(self): + assert decoder.decode( + ints2octs((49, 128, 5, 0, 36, 128, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 0, 0, 2, 1, 1, 0, 0)), + substrateFun=lambda a, b, c, d: streaming.readFromStream(b, c) + ) == (ints2octs( + (5, 0, 36, 128, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 0, 0, 2, 1, 1, 0, 0)), str2octs('')) def testTagFormat(self): try: decoder.decode( ints2octs((16, 18, 5, 0, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 2, 1, 1)) ) - except PyAsn1Error: + except error.PyAsn1Error: pass else: assert 0, 'wrong tagFormat worked out' @@ -1505,28 +1496,26 @@ class AnyDecoderTestCase(BaseTestCase): s = univ.Any('\004\003fox').subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 4)) assert decoder.decode(ints2octs((164, 128, 4, 3, 102, 111, 120, 0, 0)), asn1Spec=s) == (s, null) - # TODO: Not clear how to deal with substrateFun in stream implementation - # def testByUntaggedSubst(self): - # assert decoder.decode( - # ints2octs((4, 3, 102, 111, 120)), - # asn1Spec=self.s, - # substrateFun=lambda a, b, c: (b, b[c:]) - # ) == (ints2octs((4, 3, 102, 111, 120)), str2octs('')) + def testByUntaggedSubst(self): + assert decoder.decode( + ints2octs((4, 3, 102, 111, 120)), + asn1Spec=self.s, + substrateFun=lambda a, b, c, d: streaming.readFromStream(b, c) + ) == (ints2octs((4, 3, 102, 111, 120)), str2octs('')) - # TODO: Not clear how to deal with substrateFun in stream implementation - # def testTaggedExSubst(self): - # assert decoder.decode( - # ints2octs((164, 5, 4, 3, 102, 111, 120)), - # asn1Spec=self.s, - # substrateFun=lambda a, b, c: (b, b[c:]) - # ) == (ints2octs((164, 5, 4, 3, 102, 111, 120)), str2octs('')) + def testTaggedExSubst(self): + assert decoder.decode( + ints2octs((164, 5, 4, 3, 102, 111, 120)), + asn1Spec=self.s, + substrateFun=lambda a, b, c, d: streaming.readFromStream(b, c) + ) == (ints2octs((164, 5, 4, 3, 102, 111, 120)), str2octs('')) class EndOfOctetsTestCase(BaseTestCase): def testUnexpectedEoo(self): try: decoder.decode(ints2octs((0, 0))) - except PyAsn1Error: + except error.PyAsn1Error: pass else: assert 0, 'end-of-contents octets accepted at top level' @@ -1539,7 +1528,7 @@ class EndOfOctetsTestCase(BaseTestCase): def testDefiniteNoEoo(self): try: decoder.decode(ints2octs((0x23, 0x02, 0x00, 0x00))) - except PyAsn1Error: + except error.PyAsn1Error: pass else: assert 0, 'end-of-contents octets accepted inside definite-length encoding' @@ -1551,7 +1540,7 @@ class EndOfOctetsTestCase(BaseTestCase): def testNoLongFormEoo(self): try: decoder.decode(ints2octs((0x23, 0x80, 0x00, 0x81, 0x00))) - except PyAsn1Error: + except error.PyAsn1Error: pass else: assert 0, 'end-of-contents octets accepted with invalid long-form length' @@ -1559,7 +1548,7 @@ class EndOfOctetsTestCase(BaseTestCase): def testNoConstructedEoo(self): try: decoder.decode(ints2octs((0x23, 0x80, 0x20, 0x00))) - except PyAsn1Error: + except error.PyAsn1Error: pass else: assert 0, 'end-of-contents octets accepted with invalid constructed encoding' @@ -1567,7 +1556,7 @@ class EndOfOctetsTestCase(BaseTestCase): def testNoEooData(self): try: decoder.decode(ints2octs((0x23, 0x80, 0x00, 0x01, 0x00))) - except PyAsn1Error: + except error.PyAsn1Error: pass else: assert 0, 'end-of-contents octets accepted with unexpected data' @@ -1590,41 +1579,51 @@ class NonStringDecoderTestCase(BaseTestCase): self.substrate = ints2octs([48, 18, 5, 0, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 2, 1, 1]) def testOctetString(self): - s = list(decoder.decodeStream(univ.OctetString(self.substrate), asn1Spec=self.s)) + s = list(decoder.StreamingDecoder( + univ.OctetString(self.substrate), asn1Spec=self.s)) assert [self.s] == s def testAny(self): - s = list(decoder.decodeStream(univ.Any(self.substrate), asn1Spec=self.s)) + s = list(decoder.StreamingDecoder( + univ.Any(self.substrate), asn1Spec=self.s)) assert [self.s] == s class ErrorOnDecodingTestCase(BaseTestCase): def testErrorCondition(self): - decode = decoder.Decoder(decoder.tagMap, decoder.typeMap) - substrate = b'abc' - stream = decoder._asSeekableStream(substrate) + decode = decoder.SingleItemDecoder( + tagMap=decoder.TAG_MAP, typeMap=decoder.TYPE_MAP) + substrate = ints2octs((00, 1, 2)) + stream = streaming.asSeekableStream(substrate) try: - asn1Object = decode(stream) + asn1Object = next(decode(stream)) - except PyAsn1Error: + except error.PyAsn1Error: exc = sys.exc_info()[1] - assert isinstance(exc, PyAsn1Error), ( + assert isinstance(exc, error.PyAsn1Error), ( 'Unexpected exception raised %r' % (exc,)) else: assert False, 'Unexpected decoder result %r' % (asn1Object,) def testRawDump(self): - decode = decoder.Decoder(decoder.tagMap, decoder.typeMap) substrate = ints2octs((31, 8, 2, 1, 1, 131, 3, 2, 1, 12)) - stream = decoder._asSeekableStream(substrate, ) + stream = streaming.asSeekableStream(substrate) + + class SingleItemEncoder(decoder.SingleItemDecoder): + defaultErrorState = decoder.stDumpRawValue - decode.defaultErrorState = decoder.stDumpRawValue + class StreamingDecoder(decoder.StreamingDecoder): + SINGLE_ITEM_DECODER = SingleItemEncoder - asn1Object = decode(stream) - rest = stream.read() + class OneShotDecoder(decoder.Decoder): + STREAMING_DECODER = StreamingDecoder + + d = OneShotDecoder() + + asn1Object, rest = d(stream) assert isinstance(asn1Object, univ.Any), ( 'Unexpected raw dump type %r' % (asn1Object,)) @@ -1643,7 +1642,7 @@ class BinaryFileTestCase(BaseTestCase): out.write(ints2octs((2, 1, 12))) with open(path, "rb") as source: - values = list(decoder.decodeStream(source)) + values = list(decoder.StreamingDecoder(source)) assert values == [12] finally: @@ -1656,9 +1655,10 @@ class BinaryFileTestCase(BaseTestCase): out.write(ints2octs((2, 1, 12, 35, 128, 3, 2, 0, 169, 3, 2, 1, 138, 0, 0))) with open(path, "rb") as source: - values = list(decoder.decodeStream(source)) + values = list(decoder.StreamingDecoder(source)) assert values == [12, (1, 0, 1, 0, 1, 0, 0, 1, 1, 0, 0, 0, 1, 0, 1)] + finally: os.remove(path) @@ -1669,8 +1669,11 @@ class BinaryFileTestCase(BaseTestCase): out.write(ints2octs((2, 1, 12, 35, 128, 3, 2, 0, 169, 3, 2, 1, 138, 0, 0, 7))) with open(path, "rb") as source: - with self.assertRaises(SubstrateUnderrunError): - _ = list(decoder.decodeStream(source)) + list(decoder.StreamingDecoder(source)) + + except error.EndOfStreamError: + pass + finally: os.remove(path) @@ -1679,7 +1682,7 @@ class BytesIOTestCase(BaseTestCase): def testRead(self): source = ints2octs((2, 1, 12, 35, 128, 3, 2, 0, 169, 3, 2, 1, 138, 0, 0)) stream = io.BytesIO(source) - values = list(decoder.decodeStream(stream)) + values = list(decoder.StreamingDecoder(stream)) assert values == [12, (1, 0, 1, 0, 1, 0, 0, 1, 1, 0, 0, 0, 1, 0, 1)] @@ -1687,8 +1690,114 @@ class UnicodeTestCase(BaseTestCase): def testFail(self): # This ensures that unicode objects in Python 2 & str objects in Python 3.7 cannot be parsed. source = ints2octs((2, 1, 12, 35, 128, 3, 2, 0, 169, 3, 2, 1, 138, 0, 0)).decode("latin-1") - with self.assertRaises(UnsupportedSubstrateError): - _ = next(decoder.decodeStream(source)) + try: + next(decoder.StreamingDecoder(source)) + + except error.UnsupportedSubstrateError: + pass + + else: + assert False, 'Tolerated parsing broken unicode strings' + + +class RestartableDecoderTestCase(BaseTestCase): + + class NonBlockingStream(io.BytesIO): + block = False + + def read(self, size=-1): + self.block = not self.block + if self.block: + return # this is what non-blocking streams sometimes do + + return io.BytesIO.read(self, size) + + def setUp(self): + BaseTestCase.setUp(self) + + self.s = univ.SequenceOf(componentType=univ.OctetString()) + self.s.setComponentByPosition(0, univ.OctetString('quick brown')) + source = ints2octs( + (48, 26, + 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, + 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110)) + self.stream = self.NonBlockingStream(source) + + def testPartialReadingFromNonBlockingStream(self): + iterator = iter(decoder.StreamingDecoder(self.stream, asn1Spec=self.s)) + + res = next(iterator) + + assert isinstance(res, error.SubstrateUnderrunError) + assert 'asn1Object' not in res.context + + res = next(iterator) + + assert isinstance(res, error.SubstrateUnderrunError) + assert 'asn1Object' not in res.context + + res = next(iterator) + + assert isinstance(res, error.SubstrateUnderrunError) + assert 'asn1Object' in res.context + assert isinstance(res.context['asn1Object'], univ.SequenceOf) + assert res.context['asn1Object'].isValue + assert len(res.context['asn1Object']) == 0 + + res = next(iterator) + + assert isinstance(res, error.SubstrateUnderrunError) + assert 'asn1Object' in res.context + assert isinstance(res.context['asn1Object'], univ.SequenceOf) + assert res.context['asn1Object'].isValue + assert len(res.context['asn1Object']) == 0 + + res = next(iterator) + + assert isinstance(res, error.SubstrateUnderrunError) + assert 'asn1Object' in res.context + assert isinstance(res.context['asn1Object'], univ.SequenceOf) + assert res.context['asn1Object'].isValue + assert len(res.context['asn1Object']) == 0 + + res = next(iterator) + + assert isinstance(res, error.SubstrateUnderrunError) + assert 'asn1Object' in res.context + assert isinstance(res.context['asn1Object'], univ.SequenceOf) + assert res.context['asn1Object'].isValue + assert len(res.context['asn1Object']) == 1 + + res = next(iterator) + + assert isinstance(res, error.SubstrateUnderrunError) + assert 'asn1Object' in res.context + assert isinstance(res.context['asn1Object'], univ.SequenceOf) + assert res.context['asn1Object'].isValue + assert len(res.context['asn1Object']) == 1 + + res = next(iterator) + + assert isinstance(res, error.SubstrateUnderrunError) + assert 'asn1Object' in res.context + assert isinstance(res.context['asn1Object'], univ.SequenceOf) + assert res.context['asn1Object'].isValue + assert len(res.context['asn1Object']) == 1 + + res = next(iterator) + + assert isinstance(res, univ.SequenceOf) + assert res.isValue + assert len(res) == 2 + + try: + next(iterator) + + except StopIteration: + pass + + else: + assert False, 'End of stream not raised' class CompressedFilesTestCase(BaseTestCase): @@ -1699,9 +1808,10 @@ class CompressedFilesTestCase(BaseTestCase): out.write(ints2octs((2, 1, 12, 35, 128, 3, 2, 0, 169, 3, 2, 1, 138, 0, 0))) with gzip.open(path, "rb") as source: - values = list(decoder.decodeStream(source)) + values = list(decoder.StreamingDecoder(source)) assert values == [12, (1, 0, 1, 0, 1, 0, 0, 1, 1, 0, 0, 0, 1, 0, 1)] + finally: os.remove(path) @@ -1715,7 +1825,7 @@ class CompressedFilesTestCase(BaseTestCase): with zipfile.ZipFile(path, "r") as myzip: with myzip.open("data", "r") as source: - values = list(decoder.decodeStream(source)) + values = list(decoder.StreamingDecoder(source)) assert values == [12, (1, 0, 1, 0, 1, 0, 0, 1, 1, 0, 0, 0, 1, 0, 1)] finally: os.remove(path) @@ -1729,63 +1839,12 @@ class CompressedFilesTestCase(BaseTestCase): with zipfile.ZipFile(path, "r") as myzip: with myzip.open("data", "r") as source: - values = list(decoder.decodeStream(source)) + values = list(decoder.StreamingDecoder(source)) assert values == [12, (1, 0, 1, 0, 1, 0, 0, 1, 1, 0, 0, 0, 1, 0, 1)] * 1000 finally: os.remove(path) -class CachingStreamWrapperTestCase(BaseTestCase): - def setUp(self): - self.shortText = b"abcdefghij" - self.longText = self.shortText * (io.DEFAULT_BUFFER_SIZE * 5) - self.shortStream = io.BytesIO(self.shortText) - self.longStream = io.BytesIO(self.longText) - - def testReadJustFromCache(self): - wrapper = decoder._CachingStreamWrapper(self.shortStream) - wrapper.read(6) - wrapper.seek(3) - assert wrapper.read(1) == b"d" - assert wrapper.read(1) == b"e" - assert wrapper.tell() == 5 - - def testReadFromCacheAndStream(self): - wrapper = decoder._CachingStreamWrapper(self.shortStream) - wrapper.read(6) - wrapper.seek(3) - assert wrapper.read(4) == b"defg" - assert wrapper.tell() == 7 - - def testReadJustFromStream(self): - wrapper = decoder._CachingStreamWrapper(self.shortStream) - assert wrapper.read(6) == b"abcdef" - assert wrapper.tell() == 6 - - def testPeek(self): - wrapper = decoder._CachingStreamWrapper(self.longStream) - read_bytes = wrapper.peek(io.DEFAULT_BUFFER_SIZE + 73) - assert len(read_bytes) == io.DEFAULT_BUFFER_SIZE + 73 - assert read_bytes.startswith(b"abcdefg") - assert wrapper.tell() == 0 - assert wrapper.read(4) == b"abcd" - - def testMarkedPositionResets(self): - wrapper = decoder._CachingStreamWrapper(self.longStream) - wrapper.read(10) - wrapper._markedPosition = wrapper.tell() - assert wrapper._markedPosition == 10 - - # Reach the maximum capacity of cache - wrapper.read(io.DEFAULT_BUFFER_SIZE) - assert wrapper.tell() == 10 + io.DEFAULT_BUFFER_SIZE - - # The following should clear the cache - wrapper._markedPosition = wrapper.tell() - assert wrapper._markedPosition == 0 - assert len(wrapper._cache.getvalue()) == 0 - - suite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) if __name__ == '__main__': |