From a462fec429b751fa1cb39da6d5a6781ad9ec0d0d Mon Sep 17 00:00:00 2001 From: Jan Pipek Date: Tue, 10 Sep 2019 13:15:28 +0200 Subject: Update tests with more streams for ber.decoder --- tests/codec/ber/test_decoder.py | 55 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 53 insertions(+), 2 deletions(-) diff --git a/tests/codec/ber/test_decoder.py b/tests/codec/ber/test_decoder.py index aee69a8..0686c6d 100644 --- a/tests/codec/ber/test_decoder.py +++ b/tests/codec/ber/test_decoder.py @@ -4,10 +4,12 @@ # Copyright (c) 2005-2019, Ilya Etingof # License: http://snmplabs.com/pyasn1/license.html # +import gzip import io import os import sys import tempfile +import zipfile try: import unittest2 as unittest @@ -24,7 +26,7 @@ from pyasn1.type import char from pyasn1.codec.ber import decoder from pyasn1.codec.ber import eoo from pyasn1.compat.octets import ints2octs, str2octs, null -from pyasn1.error import PyAsn1Error, SubstrateUnderrunError +from pyasn1.error import PyAsn1Error, SubstrateUnderrunError, UnsupportedSubstrateError class LargeTagDecoderTestCase(BaseTestCase): @@ -1666,7 +1668,6 @@ class BinaryFileTestCase(BaseTestCase): with open(path, "wb") as out: out.write(ints2octs((2, 1, 12, 35, 128, 3, 2, 0, 169, 3, 2, 1, 138, 0, 0, 7))) - with open(path, "rb") as source: with self.assertRaises(SubstrateUnderrunError): _ = list(decoder.decodeStream(source)) @@ -1674,6 +1675,56 @@ class BinaryFileTestCase(BaseTestCase): os.remove(path) +class BytesIOTestCase(BaseTestCase): + def testRead(self): + source = ints2octs((2, 1, 12, 35, 128, 3, 2, 0, 169, 3, 2, 1, 138, 0, 0)) + stream = io.BytesIO(source) + values = list(decoder.decodeStream(stream)) + assert values == [12, (1, 0, 1, 0, 1, 0, 0, 1, 1, 0, 0, 0, 1, 0, 1)] + + +class UnicodeTestCase(BaseTestCase): + def testFail(self): + # This ensures that unicode objects in Python 2 & str objects in Python 3.7 cannot be parsed. + source = ints2octs((2, 1, 12, 35, 128, 3, 2, 0, 169, 3, 2, 1, 138, 0, 0)).decode("latin-1") + with self.assertRaises(UnsupportedSubstrateError): + _ = next(decoder.decodeStream(source)) + + +class CompressedFilesTestCase(BaseTestCase): + def testGzip(self): + _, path = tempfile.mkstemp(suffix=".gz") + try: + with gzip.open(path, "wb") as out: + out.write(ints2octs((2, 1, 12, 35, 128, 3, 2, 0, 169, 3, 2, 1, 138, 0, 0))) + + with gzip.open(path, "rb") as source: + values = list(decoder.decodeStream(source)) + + assert values == [12, (1, 0, 1, 0, 1, 0, 0, 1, 1, 0, 0, 0, 1, 0, 1)] + finally: + os.remove(path) + + def testZipfile(self): + # File from ZIP archive is a good example of non-seekable stream in Python 2.7 + # In Python 3.7, it is a seekable stream. + _, path = tempfile.mkstemp(suffix=".zip") + try: + with zipfile.ZipFile(path, "w") as myzip: + myzip.writestr("data", ints2octs((2, 1, 12, 35, 128, 3, 2, 0, 169, 3, 2, 1, 138, 0, 0))) + + with zipfile.ZipFile(path, "r") as myzip: + with myzip.open("data", "r") as source: + if sys.version_info < (3,): + with self.assertRaises(UnsupportedSubstrateError): + _ = list(decoder.decodeStream(source)) + else: + values = list(decoder.decodeStream(source)) + assert values == [12, (1, 0, 1, 0, 1, 0, 0, 1, 1, 0, 0, 0, 1, 0, 1)] + finally: + os.remove(path) + + suite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) if __name__ == '__main__': -- cgit v1.2.1