diff options
Diffstat (limited to 'tests/codec')
-rw-r--r-- | tests/codec/__main__.py | 3 | ||||
-rw-r--r-- | tests/codec/ber/test_decoder.py | 330 | ||||
-rw-r--r-- | tests/codec/ber/test_encoder.py | 8 | ||||
-rw-r--r-- | tests/codec/cer/test_decoder.py | 1 | ||||
-rw-r--r-- | tests/codec/cer/test_encoder.py | 1 | ||||
-rw-r--r-- | tests/codec/test_streaming.py | 75 |
6 files changed, 363 insertions, 55 deletions
diff --git a/tests/codec/__main__.py b/tests/codec/__main__.py index 8f2aaff..ec81462 100644 --- a/tests/codec/__main__.py +++ b/tests/codec/__main__.py @@ -7,7 +7,8 @@ import unittest suite = unittest.TestLoader().loadTestsFromNames( - ['tests.codec.ber.__main__.suite', + ['tests.codec.test_streaming.suite', + 'tests.codec.ber.__main__.suite', 'tests.codec.cer.__main__.suite', 'tests.codec.der.__main__.suite', 'tests.codec.native.__main__.suite'] diff --git a/tests/codec/ber/test_decoder.py b/tests/codec/ber/test_decoder.py index 639cd54..4e2a984 100644 --- a/tests/codec/ber/test_decoder.py +++ b/tests/codec/ber/test_decoder.py @@ -4,8 +4,13 @@ # Copyright (c) 2005-2019, Ilya Etingof <etingof@gmail.com> # License: http://snmplabs.com/pyasn1/license.html # +import gzip +import io +import os import sys +import tempfile import unittest +import zipfile from tests.base import BaseTestCase @@ -14,10 +19,11 @@ from pyasn1.type import namedtype from pyasn1.type import opentype from pyasn1.type import univ from pyasn1.type import char +from pyasn1.codec import streaming from pyasn1.codec.ber import decoder from pyasn1.codec.ber import eoo from pyasn1.compat.octets import ints2octs, str2octs, null -from pyasn1.error import PyAsn1Error +from pyasn1 import error class LargeTagDecoderTestCase(BaseTestCase): @@ -69,7 +75,7 @@ class IntegerDecoderTestCase(BaseTestCase): decoder.decode( ints2octs((2, 1, 12)), asn1Spec=univ.Null() ) == (12, null) - except PyAsn1Error: + except error.PyAsn1Error: pass else: assert 0, 'wrong asn1Spec worked out' @@ -80,7 +86,7 @@ class IntegerDecoderTestCase(BaseTestCase): def testTagFormat(self): try: decoder.decode(ints2octs((34, 1, 12))) - except PyAsn1Error: + except error.PyAsn1Error: pass else: assert 0, 'wrong tagFormat worked out' @@ -102,7 +108,7 @@ class BooleanDecoderTestCase(BaseTestCase): def testTagFormat(self): try: decoder.decode(ints2octs((33, 1, 1))) - except PyAsn1Error: + except error.PyAsn1Error: pass else: assert 0, 'wrong tagFormat worked out' @@ -132,19 +138,19 @@ class BitStringDecoderTestCase(BaseTestCase): def testDefModeChunkedSubst(self): assert decoder.decode( ints2octs((35, 8, 3, 2, 0, 169, 3, 2, 1, 138)), - substrateFun=lambda a, b, c: (b, b[c:]) + substrateFun=lambda a, b, c, d: streaming.readFromStream(b, c) ) == (ints2octs((3, 2, 0, 169, 3, 2, 1, 138)), str2octs('')) def testIndefModeChunkedSubst(self): assert decoder.decode( ints2octs((35, 128, 3, 2, 0, 169, 3, 2, 1, 138, 0, 0)), - substrateFun=lambda a, b, c: (b, str2octs('')) + substrateFun=lambda a, b, c, d: streaming.readFromStream(b, c) ) == (ints2octs((3, 2, 0, 169, 3, 2, 1, 138, 0, 0)), str2octs('')) def testTypeChecking(self): try: decoder.decode(ints2octs((35, 4, 2, 2, 42, 42))) - except PyAsn1Error: + except error.PyAsn1Error: pass else: assert 0, 'accepted mis-encoded bit-string constructed out of an integer' @@ -176,14 +182,14 @@ class OctetStringDecoderTestCase(BaseTestCase): assert decoder.decode( ints2octs( (36, 23, 4, 4, 81, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 4, 111, 119, 110, 32, 4, 3, 102, 111, 120)), - substrateFun=lambda a, b, c: (b, b[c:]) + substrateFun=lambda a, b, c, d: streaming.readFromStream(b, c) ) == (ints2octs((4, 4, 81, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 4, 111, 119, 110, 32, 4, 3, 102, 111, 120)), str2octs('')) def testIndefModeChunkedSubst(self): assert decoder.decode( ints2octs((36, 128, 4, 4, 81, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 4, 111, 119, 110, 32, 4, 3, 102, 111, 120, 0, 0)), - substrateFun=lambda a, b, c: (b, str2octs('')) + substrateFun=lambda a, b, c, d: streaming.readFromStream(b, c) ) == (ints2octs( (4, 4, 81, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 4, 111, 119, 110, 32, 4, 3, 102, 111, 120, 0, 0)), str2octs('')) @@ -236,7 +242,7 @@ class ExpTaggedOctetStringDecoderTestCase(BaseTestCase): def testDefModeSubst(self): assert decoder.decode( ints2octs((101, 17, 4, 15, 81, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 32, 102, 111, 120)), - substrateFun=lambda a, b, c: (b, b[c:]) + substrateFun=lambda a, b, c, d: streaming.readFromStream(b, c) ) == (ints2octs((4, 15, 81, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 32, 102, 111, 120)), str2octs('')) def testIndefModeSubst(self): @@ -244,7 +250,7 @@ class ExpTaggedOctetStringDecoderTestCase(BaseTestCase): ints2octs(( 101, 128, 36, 128, 4, 15, 81, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 32, 102, 111, 120, 0, 0, 0, 0)), - substrateFun=lambda a, b, c: (b, str2octs('')) + substrateFun=lambda a, b, c, d: streaming.readFromStream(b, c) ) == (ints2octs( (36, 128, 4, 15, 81, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 32, 102, 111, 120, 0, 0, 0, 0)), str2octs('')) @@ -256,7 +262,7 @@ class NullDecoderTestCase(BaseTestCase): def testTagFormat(self): try: decoder.decode(ints2octs((37, 0))) - except PyAsn1Error: + except error.PyAsn1Error: pass else: assert 0, 'wrong tagFormat worked out' @@ -325,7 +331,7 @@ class ObjectIdentifierDecoderTestCase(BaseTestCase): decoder.decode( ints2octs((6, 5, 85, 4, 128, 129, 0)) ) - except PyAsn1Error: + except error.PyAsn1Error: pass else: assert 0, 'Leading 0x80 tolerated' @@ -335,7 +341,7 @@ class ObjectIdentifierDecoderTestCase(BaseTestCase): decoder.decode( ints2octs((6, 7, 1, 0x80, 0x80, 0x80, 0x80, 0x80, 0x7F)) ) - except PyAsn1Error: + except error.PyAsn1Error: pass else: assert 0, 'Leading 0x80 tolerated' @@ -345,7 +351,7 @@ class ObjectIdentifierDecoderTestCase(BaseTestCase): decoder.decode( ints2octs((6, 2, 0x80, 1)) ) - except PyAsn1Error: + except error.PyAsn1Error: pass else: assert 0, 'Leading 0x80 tolerated' @@ -355,7 +361,7 @@ class ObjectIdentifierDecoderTestCase(BaseTestCase): decoder.decode( ints2octs((6, 2, 0x80, 0x7F)) ) - except PyAsn1Error: + except error.PyAsn1Error: pass else: assert 0, 'Leading 0x80 tolerated' @@ -363,7 +369,7 @@ class ObjectIdentifierDecoderTestCase(BaseTestCase): def testTagFormat(self): try: decoder.decode(ints2octs((38, 1, 239))) - except PyAsn1Error: + except error.PyAsn1Error: pass else: assert 0, 'wrong tagFormat worked out' @@ -371,7 +377,7 @@ class ObjectIdentifierDecoderTestCase(BaseTestCase): def testZeroLength(self): try: decoder.decode(ints2octs((6, 0, 0))) - except PyAsn1Error: + except error.PyAsn1Error: pass else: assert 0, 'zero length tolerated' @@ -379,7 +385,7 @@ class ObjectIdentifierDecoderTestCase(BaseTestCase): def testIndefiniteLength(self): try: decoder.decode(ints2octs((6, 128, 0))) - except PyAsn1Error: + except error.PyAsn1Error: pass else: assert 0, 'indefinite length tolerated' @@ -387,7 +393,7 @@ class ObjectIdentifierDecoderTestCase(BaseTestCase): def testReservedLength(self): try: decoder.decode(ints2octs((6, 255, 0))) - except PyAsn1Error: + except error.PyAsn1Error: pass else: assert 0, 'reserved length tolerated' @@ -464,7 +470,7 @@ class RealDecoderTestCase(BaseTestCase): def testTagFormat(self): try: decoder.decode(ints2octs((41, 0))) - except PyAsn1Error: + except error.PyAsn1Error: pass else: assert 0, 'wrong tagFormat worked out' @@ -472,7 +478,7 @@ class RealDecoderTestCase(BaseTestCase): def testShortEncoding(self): try: decoder.decode(ints2octs((9, 1, 131))) - except PyAsn1Error: + except error.PyAsn1Error: pass else: assert 0, 'accepted too-short real' @@ -671,13 +677,13 @@ class SequenceDecoderTestCase(BaseTestCase): def testWithOptionalAndDefaultedDefModeSubst(self): assert decoder.decode( ints2octs((48, 18, 5, 0, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 2, 1, 1)), - substrateFun=lambda a, b, c: (b, b[c:]) + substrateFun=lambda a, b, c, d: streaming.readFromStream(b, c) ) == (ints2octs((5, 0, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 2, 1, 1)), str2octs('')) def testWithOptionalAndDefaultedIndefModeSubst(self): assert decoder.decode( ints2octs((48, 128, 5, 0, 36, 128, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 0, 0, 2, 1, 1, 0, 0)), - substrateFun=lambda a, b, c: (b, str2octs('')) + substrateFun=lambda a, b, c, d: streaming.readFromStream(b, c) ) == (ints2octs( (5, 0, 36, 128, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 0, 0, 2, 1, 1, 0, 0)), str2octs('')) @@ -686,7 +692,7 @@ class SequenceDecoderTestCase(BaseTestCase): decoder.decode( ints2octs((16, 18, 5, 0, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 2, 1, 1)) ) - except PyAsn1Error: + except error.PyAsn1Error: pass else: assert 0, 'wrong tagFormat worked out' @@ -868,7 +874,7 @@ class SequenceDecoderWithUntaggedOpenTypesTestCase(BaseTestCase): decodeOpenTypes=True ) - except PyAsn1Error: + except error.PyAsn1Error: pass else: @@ -1007,7 +1013,7 @@ class SequenceDecoderWithUnaggedSetOfOpenTypesTestCase(BaseTestCase): decodeOpenTypes=True ) - except PyAsn1Error: + except error.PyAsn1Error: pass else: @@ -1157,13 +1163,13 @@ class SetDecoderTestCase(BaseTestCase): def testWithOptionalAndDefaultedDefModeSubst(self): assert decoder.decode( ints2octs((49, 18, 5, 0, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 2, 1, 1)), - substrateFun=lambda a, b, c: (b, b[c:]) + substrateFun=lambda a, b, c, d: streaming.readFromStream(b, c) ) == (ints2octs((5, 0, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 2, 1, 1)), str2octs('')) def testWithOptionalAndDefaultedIndefModeSubst(self): assert decoder.decode( ints2octs((49, 128, 5, 0, 36, 128, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 0, 0, 2, 1, 1, 0, 0)), - substrateFun=lambda a, b, c: (b, str2octs('')) + substrateFun=lambda a, b, c, d: streaming.readFromStream(b, c) ) == (ints2octs( (5, 0, 36, 128, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 0, 0, 2, 1, 1, 0, 0)), str2octs('')) @@ -1172,7 +1178,7 @@ class SetDecoderTestCase(BaseTestCase): decoder.decode( ints2octs((16, 18, 5, 0, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 2, 1, 1)) ) - except PyAsn1Error: + except error.PyAsn1Error: pass else: assert 0, 'wrong tagFormat worked out' @@ -1489,14 +1495,14 @@ class AnyDecoderTestCase(BaseTestCase): assert decoder.decode( ints2octs((4, 3, 102, 111, 120)), asn1Spec=self.s, - substrateFun=lambda a, b, c: (b, b[c:]) + substrateFun=lambda a, b, c, d: streaming.readFromStream(b, c) ) == (ints2octs((4, 3, 102, 111, 120)), str2octs('')) def testTaggedExSubst(self): assert decoder.decode( ints2octs((164, 5, 4, 3, 102, 111, 120)), asn1Spec=self.s, - substrateFun=lambda a, b, c: (b, b[c:]) + substrateFun=lambda a, b, c, d: streaming.readFromStream(b, c) ) == (ints2octs((164, 5, 4, 3, 102, 111, 120)), str2octs('')) @@ -1504,7 +1510,7 @@ class EndOfOctetsTestCase(BaseTestCase): def testUnexpectedEoo(self): try: decoder.decode(ints2octs((0, 0))) - except PyAsn1Error: + except error.PyAsn1Error: pass else: assert 0, 'end-of-contents octets accepted at top level' @@ -1517,7 +1523,7 @@ class EndOfOctetsTestCase(BaseTestCase): def testDefiniteNoEoo(self): try: decoder.decode(ints2octs((0x23, 0x02, 0x00, 0x00))) - except PyAsn1Error: + except error.PyAsn1Error: pass else: assert 0, 'end-of-contents octets accepted inside definite-length encoding' @@ -1529,7 +1535,7 @@ class EndOfOctetsTestCase(BaseTestCase): def testNoLongFormEoo(self): try: decoder.decode(ints2octs((0x23, 0x80, 0x00, 0x81, 0x00))) - except PyAsn1Error: + except error.PyAsn1Error: pass else: assert 0, 'end-of-contents octets accepted with invalid long-form length' @@ -1537,7 +1543,7 @@ class EndOfOctetsTestCase(BaseTestCase): def testNoConstructedEoo(self): try: decoder.decode(ints2octs((0x23, 0x80, 0x20, 0x00))) - except PyAsn1Error: + except error.PyAsn1Error: pass else: assert 0, 'end-of-contents octets accepted with invalid constructed encoding' @@ -1545,7 +1551,7 @@ class EndOfOctetsTestCase(BaseTestCase): def testNoEooData(self): try: decoder.decode(ints2octs((0x23, 0x80, 0x00, 0x01, 0x00))) - except PyAsn1Error: + except error.PyAsn1Error: pass else: assert 0, 'end-of-contents octets accepted with unexpected data' @@ -1568,37 +1574,51 @@ class NonStringDecoderTestCase(BaseTestCase): self.substrate = ints2octs([48, 18, 5, 0, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 2, 1, 1]) def testOctetString(self): - s, _ = decoder.decode(univ.OctetString(self.substrate), asn1Spec=self.s) - assert self.s == s + s = list(decoder.StreamingDecoder( + univ.OctetString(self.substrate), asn1Spec=self.s)) + assert [self.s] == s def testAny(self): - s, _ = decoder.decode(univ.Any(self.substrate), asn1Spec=self.s) - assert self.s == s + s = list(decoder.StreamingDecoder( + univ.Any(self.substrate), asn1Spec=self.s)) + assert [self.s] == s class ErrorOnDecodingTestCase(BaseTestCase): def testErrorCondition(self): - decode = decoder.Decoder(decoder.tagMap, decoder.typeMap) + decode = decoder.SingleItemDecoder( + tagMap=decoder.TAG_MAP, typeMap=decoder.TYPE_MAP) + substrate = ints2octs((00, 1, 2)) + stream = streaming.asSeekableStream(substrate) try: - asn1Object, rest = decode(str2octs('abc')) + asn1Object = next(decode(stream)) - except PyAsn1Error: + except error.PyAsn1Error: exc = sys.exc_info()[1] - assert isinstance(exc, PyAsn1Error), ( + assert isinstance(exc, error.PyAsn1Error), ( 'Unexpected exception raised %r' % (exc,)) else: assert False, 'Unexpected decoder result %r' % (asn1Object,) def testRawDump(self): - decode = decoder.Decoder(decoder.tagMap, decoder.typeMap) + substrate = ints2octs((31, 8, 2, 1, 1, 131, 3, 2, 1, 12)) + stream = streaming.asSeekableStream(substrate) - decode.defaultErrorState = decoder.stDumpRawValue + class SingleItemEncoder(decoder.SingleItemDecoder): + defaultErrorState = decoder.stDumpRawValue - asn1Object, rest = decode(ints2octs( - (31, 8, 2, 1, 1, 131, 3, 2, 1, 12))) + class StreamingDecoder(decoder.StreamingDecoder): + SINGLE_ITEM_DECODER = SingleItemEncoder + + class OneShotDecoder(decoder.Decoder): + STREAMING_DECODER = StreamingDecoder + + d = OneShotDecoder() + + asn1Object, rest = d(stream) assert isinstance(asn1Object, univ.Any), ( 'Unexpected raw dump type %r' % (asn1Object,)) @@ -1608,6 +1628,218 @@ class ErrorOnDecodingTestCase(BaseTestCase): 'Unexpected rest of substrate after raw dump %r' % rest) +class BinaryFileTestCase(BaseTestCase): + """Assure that decode works on open binary files.""" + def testOneObject(self): + _, path = tempfile.mkstemp() + try: + with open(path, "wb") as out: + out.write(ints2octs((2, 1, 12))) + + with open(path, "rb") as source: + values = list(decoder.StreamingDecoder(source)) + + assert values == [12] + finally: + os.remove(path) + + def testMoreObjects(self): + _, path = tempfile.mkstemp() + try: + with open(path, "wb") as out: + out.write(ints2octs((2, 1, 12, 35, 128, 3, 2, 0, 169, 3, 2, 1, 138, 0, 0))) + + with open(path, "rb") as source: + values = list(decoder.StreamingDecoder(source)) + + assert values == [12, (1, 0, 1, 0, 1, 0, 0, 1, 1, 0, 0, 0, 1, 0, 1)] + + finally: + os.remove(path) + + def testInvalidFileContent(self): + _, path = tempfile.mkstemp() + try: + with open(path, "wb") as out: + out.write(ints2octs((2, 1, 12, 35, 128, 3, 2, 0, 169, 3, 2, 1, 138, 0, 0, 7))) + + with open(path, "rb") as source: + list(decoder.StreamingDecoder(source)) + + except error.EndOfStreamError: + pass + + finally: + os.remove(path) + + +class BytesIOTestCase(BaseTestCase): + def testRead(self): + source = ints2octs((2, 1, 12, 35, 128, 3, 2, 0, 169, 3, 2, 1, 138, 0, 0)) + stream = io.BytesIO(source) + values = list(decoder.StreamingDecoder(stream)) + assert values == [12, (1, 0, 1, 0, 1, 0, 0, 1, 1, 0, 0, 0, 1, 0, 1)] + + +class UnicodeTestCase(BaseTestCase): + def testFail(self): + # This ensures that unicode objects in Python 2 & str objects in Python 3.7 cannot be parsed. + source = ints2octs((2, 1, 12, 35, 128, 3, 2, 0, 169, 3, 2, 1, 138, 0, 0)).decode("latin-1") + try: + next(decoder.StreamingDecoder(source)) + + except error.UnsupportedSubstrateError: + pass + + else: + assert False, 'Tolerated parsing broken unicode strings' + + +class RestartableDecoderTestCase(BaseTestCase): + + class NonBlockingStream(io.BytesIO): + block = False + + def read(self, size=-1): + self.block = not self.block + if self.block: + return # this is what non-blocking streams sometimes do + + return io.BytesIO.read(self, size) + + def setUp(self): + BaseTestCase.setUp(self) + + self.s = univ.SequenceOf(componentType=univ.OctetString()) + self.s.setComponentByPosition(0, univ.OctetString('quick brown')) + source = ints2octs( + (48, 26, + 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, + 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110)) + self.stream = self.NonBlockingStream(source) + + def testPartialReadingFromNonBlockingStream(self): + iterator = iter(decoder.StreamingDecoder(self.stream, asn1Spec=self.s)) + + res = next(iterator) + + assert isinstance(res, error.SubstrateUnderrunError) + assert 'asn1Object' not in res.context + + res = next(iterator) + + assert isinstance(res, error.SubstrateUnderrunError) + assert 'asn1Object' not in res.context + + res = next(iterator) + + assert isinstance(res, error.SubstrateUnderrunError) + assert 'asn1Object' in res.context + assert isinstance(res.context['asn1Object'], univ.SequenceOf) + assert res.context['asn1Object'].isValue + assert len(res.context['asn1Object']) == 0 + + res = next(iterator) + + assert isinstance(res, error.SubstrateUnderrunError) + assert 'asn1Object' in res.context + assert isinstance(res.context['asn1Object'], univ.SequenceOf) + assert res.context['asn1Object'].isValue + assert len(res.context['asn1Object']) == 0 + + res = next(iterator) + + assert isinstance(res, error.SubstrateUnderrunError) + assert 'asn1Object' in res.context + assert isinstance(res.context['asn1Object'], univ.SequenceOf) + assert res.context['asn1Object'].isValue + assert len(res.context['asn1Object']) == 0 + + res = next(iterator) + + assert isinstance(res, error.SubstrateUnderrunError) + assert 'asn1Object' in res.context + assert isinstance(res.context['asn1Object'], univ.SequenceOf) + assert res.context['asn1Object'].isValue + assert len(res.context['asn1Object']) == 1 + + res = next(iterator) + + assert isinstance(res, error.SubstrateUnderrunError) + assert 'asn1Object' in res.context + assert isinstance(res.context['asn1Object'], univ.SequenceOf) + assert res.context['asn1Object'].isValue + assert len(res.context['asn1Object']) == 1 + + res = next(iterator) + + assert isinstance(res, error.SubstrateUnderrunError) + assert 'asn1Object' in res.context + assert isinstance(res.context['asn1Object'], univ.SequenceOf) + assert res.context['asn1Object'].isValue + assert len(res.context['asn1Object']) == 1 + + res = next(iterator) + + assert isinstance(res, univ.SequenceOf) + assert res.isValue + assert len(res) == 2 + + try: + next(iterator) + + except StopIteration: + pass + + else: + assert False, 'End of stream not raised' + + +class CompressedFilesTestCase(BaseTestCase): + def testGzip(self): + _, path = tempfile.mkstemp(suffix=".gz") + try: + with gzip.open(path, "wb") as out: + out.write(ints2octs((2, 1, 12, 35, 128, 3, 2, 0, 169, 3, 2, 1, 138, 0, 0))) + + with gzip.open(path, "rb") as source: + values = list(decoder.StreamingDecoder(source)) + + assert values == [12, (1, 0, 1, 0, 1, 0, 0, 1, 1, 0, 0, 0, 1, 0, 1)] + + finally: + os.remove(path) + + def testZipfile(self): + # File from ZIP archive is a good example of non-seekable stream in Python 2.7 + # In Python 3.7, it is a seekable stream. + _, path = tempfile.mkstemp(suffix=".zip") + try: + with zipfile.ZipFile(path, "w") as myzip: + myzip.writestr("data", ints2octs((2, 1, 12, 35, 128, 3, 2, 0, 169, 3, 2, 1, 138, 0, 0))) + + with zipfile.ZipFile(path, "r") as myzip: + with myzip.open("data", "r") as source: + values = list(decoder.StreamingDecoder(source)) + assert values == [12, (1, 0, 1, 0, 1, 0, 0, 1, 1, 0, 0, 0, 1, 0, 1)] + finally: + os.remove(path) + + def testZipfileMany(self): + _, path = tempfile.mkstemp(suffix=".zip") + try: + with zipfile.ZipFile(path, "w") as myzip: + #for i in range(100): + myzip.writestr("data", ints2octs((2, 1, 12, 35, 128, 3, 2, 0, 169, 3, 2, 1, 138, 0, 0)) * 1000) + + with zipfile.ZipFile(path, "r") as myzip: + with myzip.open("data", "r") as source: + values = list(decoder.StreamingDecoder(source)) + assert values == [12, (1, 0, 1, 0, 1, 0, 0, 1, 1, 0, 0, 0, 1, 0, 1)] * 1000 + finally: + os.remove(path) + + suite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) if __name__ == '__main__': diff --git a/tests/codec/ber/test_encoder.py b/tests/codec/ber/test_encoder.py index 3ba658a..49eb91d 100644 --- a/tests/codec/ber/test_encoder.py +++ b/tests/codec/ber/test_encoder.py @@ -377,19 +377,19 @@ class RealEncoderTestCase(BaseTestCase): def testBin3(self): # change binEncBase in the RealEncoder instance => for all further Real - binEncBase, encoder.typeMap[univ.Real.typeId].binEncBase = encoder.typeMap[univ.Real.typeId].binEncBase, 16 + binEncBase, encoder.TYPE_MAP[univ.Real.typeId].binEncBase = encoder.TYPE_MAP[univ.Real.typeId].binEncBase, 16 assert encoder.encode( univ.Real((0.00390625, 2, 0)) # check encbase = 16 ) == ints2octs((9, 3, 160, 254, 1)) - encoder.typeMap[univ.Real.typeId].binEncBase = binEncBase + encoder.TYPE_MAP[univ.Real.typeId].binEncBase = binEncBase def testBin4(self): # choose binEncBase automatically for all further Real (testBin[4-7]) - binEncBase, encoder.typeMap[univ.Real.typeId].binEncBase = encoder.typeMap[univ.Real.typeId].binEncBase, None + binEncBase, encoder.TYPE_MAP[univ.Real.typeId].binEncBase = encoder.TYPE_MAP[univ.Real.typeId].binEncBase, None assert encoder.encode( univ.Real((1, 2, 0)) # check exponent = 0 ) == ints2octs((9, 3, 128, 0, 1)) - encoder.typeMap[univ.Real.typeId].binEncBase = binEncBase + encoder.TYPE_MAP[univ.Real.typeId].binEncBase = binEncBase def testBin5(self): assert encoder.encode( diff --git a/tests/codec/cer/test_decoder.py b/tests/codec/cer/test_decoder.py index 47a13eb..ea435e1 100644 --- a/tests/codec/cer/test_decoder.py +++ b/tests/codec/cer/test_decoder.py @@ -37,6 +37,7 @@ class BooleanDecoderTestCase(BaseTestCase): except PyAsn1Error: pass + class BitStringDecoderTestCase(BaseTestCase): def testShortMode(self): assert decoder.decode( diff --git a/tests/codec/cer/test_encoder.py b/tests/codec/cer/test_encoder.py index 9e003a7..4535927 100644 --- a/tests/codec/cer/test_encoder.py +++ b/tests/codec/cer/test_encoder.py @@ -80,7 +80,6 @@ class GeneralizedTimeEncoderTestCase(BaseTestCase): else: assert 0, 'Missing timezone tolerated' - def testDecimalCommaPoint(self): try: assert encoder.encode( diff --git a/tests/codec/test_streaming.py b/tests/codec/test_streaming.py new file mode 100644 index 0000000..c608b11 --- /dev/null +++ b/tests/codec/test_streaming.py @@ -0,0 +1,75 @@ +# +# This file is part of pyasn1 software. +# +# Copyright (c) 2005-2019, Ilya Etingof <etingof@gmail.com> +# License: http://snmplabs.com/pyasn1/license.html +# +import io +import sys + +try: + import unittest2 as unittest + +except ImportError: + import unittest + +from tests.base import BaseTestCase + +from pyasn1.codec import streaming + + +class CachingStreamWrapperTestCase(BaseTestCase): + def setUp(self): + self.shortText = b"abcdefghij" + self.longText = self.shortText * (io.DEFAULT_BUFFER_SIZE * 5) + self.shortStream = io.BytesIO(self.shortText) + self.longStream = io.BytesIO(self.longText) + + def testReadJustFromCache(self): + wrapper = streaming.CachingStreamWrapper(self.shortStream) + wrapper.read(6) + wrapper.seek(3) + assert wrapper.read(1) == b"d" + assert wrapper.read(1) == b"e" + assert wrapper.tell() == 5 + + def testReadFromCacheAndStream(self): + wrapper = streaming.CachingStreamWrapper(self.shortStream) + wrapper.read(6) + wrapper.seek(3) + assert wrapper.read(4) == b"defg" + assert wrapper.tell() == 7 + + def testReadJustFromStream(self): + wrapper = streaming.CachingStreamWrapper(self.shortStream) + assert wrapper.read(6) == b"abcdef" + assert wrapper.tell() == 6 + + def testPeek(self): + wrapper = streaming.CachingStreamWrapper(self.longStream) + read_bytes = wrapper.peek(io.DEFAULT_BUFFER_SIZE + 73) + assert len(read_bytes) == io.DEFAULT_BUFFER_SIZE + 73 + assert read_bytes.startswith(b"abcdefg") + assert wrapper.tell() == 0 + assert wrapper.read(4) == b"abcd" + + def testMarkedPositionResets(self): + wrapper = streaming.CachingStreamWrapper(self.longStream) + wrapper.read(10) + wrapper.markedPosition = wrapper.tell() + assert wrapper.markedPosition == 10 + + # Reach the maximum capacity of cache + wrapper.read(io.DEFAULT_BUFFER_SIZE) + assert wrapper.tell() == 10 + io.DEFAULT_BUFFER_SIZE + + # The following should clear the cache + wrapper.markedPosition = wrapper.tell() + assert wrapper.markedPosition == 0 + assert len(wrapper._cache.getvalue()) == 0 + + +suite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) + +if __name__ == '__main__': + unittest.TextTestRunner(verbosity=2).run(suite) |