summaryrefslogtreecommitdiff
path: root/tests/codec/ber/test_decoder.py
diff options
context:
space:
mode:
authorIlya Etingof <etingof@gmail.com>2019-09-14 18:46:08 +0200
committerIlya Etingof <etingof@gmail.com>2019-11-15 19:39:11 +0100
commit93e11a2dfded950827ba3393b5a4562270a766da (patch)
treecddf49a1daa1258d91f348bad30eabc662241bd8 /tests/codec/ber/test_decoder.py
parentfdd0bd66f66e5f7832287ff0994f4219632935a7 (diff)
downloadpyasn1-git-93e11a2dfded950827ba3393b5a4562270a766da.tar.gz
Refactor BER decoder into a suspendable coroutine
The goal of this change is to make the decoder stopping on input data starvation and resuming from where it stopped whenever the caller decides to try again (hopefully making sure that some more input becomes available). This change makes it possible for the decoder to operate on streams of data (meaning that the entire DER blob might not be immediately available on input). On top of that, the decoder yields partially reconstructed ASN.1 object on input starvation making it possible for the caller to inspect what has been decoded so far and possibly consume partial ASN.1 data. All these new feature are natively available through `StreamingDecoder` class. Previously published API is implemented as a thin wrapper on top of that ensuring backward compatibility.
Diffstat (limited to 'tests/codec/ber/test_decoder.py')
-rw-r--r--tests/codec/ber/test_decoder.py424
1 files changed, 241 insertions, 183 deletions
diff --git a/tests/codec/ber/test_decoder.py b/tests/codec/ber/test_decoder.py
index e72e025..2430ff4 100644
--- a/tests/codec/ber/test_decoder.py
+++ b/tests/codec/ber/test_decoder.py
@@ -23,10 +23,11 @@ from pyasn1.type import namedtype
from pyasn1.type import opentype
from pyasn1.type import univ
from pyasn1.type import char
+from pyasn1.codec import streaming
from pyasn1.codec.ber import decoder
from pyasn1.codec.ber import eoo
from pyasn1.compat.octets import ints2octs, str2octs, null
-from pyasn1.error import PyAsn1Error, SubstrateUnderrunError, UnsupportedSubstrateError
+from pyasn1 import error
class LargeTagDecoderTestCase(BaseTestCase):
@@ -78,7 +79,7 @@ class IntegerDecoderTestCase(BaseTestCase):
decoder.decode(
ints2octs((2, 1, 12)), asn1Spec=univ.Null()
) == (12, null)
- except PyAsn1Error:
+ except error.PyAsn1Error:
pass
else:
assert 0, 'wrong asn1Spec worked out'
@@ -89,7 +90,7 @@ class IntegerDecoderTestCase(BaseTestCase):
def testTagFormat(self):
try:
decoder.decode(ints2octs((34, 1, 12)))
- except PyAsn1Error:
+ except error.PyAsn1Error:
pass
else:
assert 0, 'wrong tagFormat worked out'
@@ -111,7 +112,7 @@ class BooleanDecoderTestCase(BaseTestCase):
def testTagFormat(self):
try:
decoder.decode(ints2octs((33, 1, 1)))
- except PyAsn1Error:
+ except error.PyAsn1Error:
pass
else:
assert 0, 'wrong tagFormat worked out'
@@ -138,24 +139,22 @@ class BitStringDecoderTestCase(BaseTestCase):
ints2octs((35, 128, 3, 2, 0, 169, 3, 2, 1, 138, 0, 0))
) == ((1, 0, 1, 0, 1, 0, 0, 1, 1, 0, 0, 0, 1, 0, 1), null)
- # TODO: Not clear how to deal with substrateFun in stream implementation
- # def testDefModeChunkedSubst(self):
- # assert decoder.decode(
- # ints2octs((35, 8, 3, 2, 0, 169, 3, 2, 1, 138)),
- # substrateFun=lambda a, b, c: (b, b[c:])
- # ) == (ints2octs((3, 2, 0, 169, 3, 2, 1, 138)), str2octs(''))
+ def testDefModeChunkedSubst(self):
+ assert decoder.decode(
+ ints2octs((35, 8, 3, 2, 0, 169, 3, 2, 1, 138)),
+ substrateFun=lambda a, b, c, d: streaming.read(b, c)
+ ) == (ints2octs((3, 2, 0, 169, 3, 2, 1, 138)), str2octs(''))
- # TODO: Not clear how to deal with substrateFun in stream implementation
- # def testIndefModeChunkedSubst(self):
- # assert decoder.decode(
- # ints2octs((35, 128, 3, 2, 0, 169, 3, 2, 1, 138, 0, 0)),
- # substrateFun=lambda a, b, c: (b, str2octs(''))
- # ) == (ints2octs((3, 2, 0, 169, 3, 2, 1, 138, 0, 0)), str2octs(''))
+ def testIndefModeChunkedSubst(self):
+ assert decoder.decode(
+ ints2octs((35, 128, 3, 2, 0, 169, 3, 2, 1, 138, 0, 0)),
+ substrateFun=lambda a, b, c, d: streaming.read(b, c)
+ ) == (ints2octs((3, 2, 0, 169, 3, 2, 1, 138, 0, 0)), str2octs(''))
def testTypeChecking(self):
try:
decoder.decode(ints2octs((35, 4, 2, 2, 42, 42)))
- except PyAsn1Error:
+ except error.PyAsn1Error:
pass
else:
assert 0, 'accepted mis-encoded bit-string constructed out of an integer'
@@ -183,22 +182,20 @@ class OctetStringDecoderTestCase(BaseTestCase):
ints2octs((36, 128, 4, 4, 81, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 4, 111, 119, 110, 32, 4, 3, 102, 111, 120, 0, 0))
) == (str2octs('Quick brown fox'), null)
- # TODO: Not clear how to deal with substrateFun in stream implementation
- # def testDefModeChunkedSubst(self):
- # assert decoder.decode(
- # ints2octs(
- # (36, 23, 4, 4, 81, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 4, 111, 119, 110, 32, 4, 3, 102, 111, 120)),
- # substrateFun=lambda a, b, c: (b, b[c:])
- # ) == (ints2octs((4, 4, 81, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 4, 111, 119, 110, 32, 4, 3, 102, 111, 120)), str2octs(''))
+ def testDefModeChunkedSubst(self):
+ assert decoder.decode(
+ ints2octs(
+ (36, 23, 4, 4, 81, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 4, 111, 119, 110, 32, 4, 3, 102, 111, 120)),
+ substrateFun=lambda a, b, c, d: streaming.read(b, c)
+ ) == (ints2octs((4, 4, 81, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 4, 111, 119, 110, 32, 4, 3, 102, 111, 120)), str2octs(''))
- # TODO: Not clear how to deal with substrateFun in stream implementation
- # def testIndefModeChunkedSubst(self):
- # assert decoder.decode(
- # ints2octs((36, 128, 4, 4, 81, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 4, 111, 119, 110, 32, 4, 3, 102, 111,
- # 120, 0, 0)),
- # substrateFun=lambda a, b, c: (b, str2octs(''))
- # ) == (ints2octs(
- # (4, 4, 81, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 4, 111, 119, 110, 32, 4, 3, 102, 111, 120, 0, 0)), str2octs(''))
+ def testIndefModeChunkedSubst(self):
+ assert decoder.decode(
+ ints2octs((36, 128, 4, 4, 81, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 4, 111, 119, 110, 32, 4, 3, 102, 111,
+ 120, 0, 0)),
+ substrateFun=lambda a, b, c, d: streaming.read(b, c)
+ ) == (ints2octs(
+ (4, 4, 81, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 4, 111, 119, 110, 32, 4, 3, 102, 111, 120, 0, 0)), str2octs(''))
class ExpTaggedOctetStringDecoderTestCase(BaseTestCase):
@@ -246,22 +243,20 @@ class ExpTaggedOctetStringDecoderTestCase(BaseTestCase):
assert self.o.tagSet == o.tagSet
assert self.o.isSameTypeWith(o)
- # TODO: Not clear how to deal with substrateFun in stream implementation
- # def testDefModeSubst(self):
- # assert decoder.decode(
- # ints2octs((101, 17, 4, 15, 81, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 32, 102, 111, 120)),
- # substrateFun=lambda a, b, c: (b, b[c:])
- # ) == (ints2octs((4, 15, 81, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 32, 102, 111, 120)), str2octs(''))
+ def testDefModeSubst(self):
+ assert decoder.decode(
+ ints2octs((101, 17, 4, 15, 81, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 32, 102, 111, 120)),
+ substrateFun=lambda a, b, c, d: streaming.read(b, c)
+ ) == (ints2octs((4, 15, 81, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 32, 102, 111, 120)), str2octs(''))
- # TODO: Not clear how to deal with substrateFun in stream implementation
- # def testIndefModeSubst(self):
- # assert decoder.decode(
- # ints2octs((
- # 101, 128, 36, 128, 4, 15, 81, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 32, 102, 111, 120, 0,
- # 0, 0, 0)),
- # substrateFun=lambda a, b, c: (b, str2octs(''))
- # ) == (ints2octs(
- # (36, 128, 4, 15, 81, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 32, 102, 111, 120, 0, 0, 0, 0)), str2octs(''))
+ def testIndefModeSubst(self):
+ assert decoder.decode(
+ ints2octs((
+ 101, 128, 36, 128, 4, 15, 81, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 32, 102, 111, 120, 0,
+ 0, 0, 0)),
+ substrateFun=lambda a, b, c, d: streaming.read(b, c)
+ ) == (ints2octs(
+ (36, 128, 4, 15, 81, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 32, 102, 111, 120, 0, 0, 0, 0)), str2octs(''))
class NullDecoderTestCase(BaseTestCase):
@@ -271,7 +266,7 @@ class NullDecoderTestCase(BaseTestCase):
def testTagFormat(self):
try:
decoder.decode(ints2octs((37, 0)))
- except PyAsn1Error:
+ except error.PyAsn1Error:
pass
else:
assert 0, 'wrong tagFormat worked out'
@@ -340,7 +335,7 @@ class ObjectIdentifierDecoderTestCase(BaseTestCase):
decoder.decode(
ints2octs((6, 5, 85, 4, 128, 129, 0))
)
- except PyAsn1Error:
+ except error.PyAsn1Error:
pass
else:
assert 0, 'Leading 0x80 tolerated'
@@ -350,7 +345,7 @@ class ObjectIdentifierDecoderTestCase(BaseTestCase):
decoder.decode(
ints2octs((6, 7, 1, 0x80, 0x80, 0x80, 0x80, 0x80, 0x7F))
)
- except PyAsn1Error:
+ except error.PyAsn1Error:
pass
else:
assert 0, 'Leading 0x80 tolerated'
@@ -360,7 +355,7 @@ class ObjectIdentifierDecoderTestCase(BaseTestCase):
decoder.decode(
ints2octs((6, 2, 0x80, 1))
)
- except PyAsn1Error:
+ except error.PyAsn1Error:
pass
else:
assert 0, 'Leading 0x80 tolerated'
@@ -370,7 +365,7 @@ class ObjectIdentifierDecoderTestCase(BaseTestCase):
decoder.decode(
ints2octs((6, 2, 0x80, 0x7F))
)
- except PyAsn1Error:
+ except error.PyAsn1Error:
pass
else:
assert 0, 'Leading 0x80 tolerated'
@@ -378,7 +373,7 @@ class ObjectIdentifierDecoderTestCase(BaseTestCase):
def testTagFormat(self):
try:
decoder.decode(ints2octs((38, 1, 239)))
- except PyAsn1Error:
+ except error.PyAsn1Error:
pass
else:
assert 0, 'wrong tagFormat worked out'
@@ -386,7 +381,7 @@ class ObjectIdentifierDecoderTestCase(BaseTestCase):
def testZeroLength(self):
try:
decoder.decode(ints2octs((6, 0, 0)))
- except PyAsn1Error:
+ except error.PyAsn1Error:
pass
else:
assert 0, 'zero length tolerated'
@@ -394,7 +389,7 @@ class ObjectIdentifierDecoderTestCase(BaseTestCase):
def testIndefiniteLength(self):
try:
decoder.decode(ints2octs((6, 128, 0)))
- except PyAsn1Error:
+ except error.PyAsn1Error:
pass
else:
assert 0, 'indefinite length tolerated'
@@ -402,7 +397,7 @@ class ObjectIdentifierDecoderTestCase(BaseTestCase):
def testReservedLength(self):
try:
decoder.decode(ints2octs((6, 255, 0)))
- except PyAsn1Error:
+ except error.PyAsn1Error:
pass
else:
assert 0, 'reserved length tolerated'
@@ -479,7 +474,7 @@ class RealDecoderTestCase(BaseTestCase):
def testTagFormat(self):
try:
decoder.decode(ints2octs((41, 0)))
- except PyAsn1Error:
+ except error.PyAsn1Error:
pass
else:
assert 0, 'wrong tagFormat worked out'
@@ -487,7 +482,7 @@ class RealDecoderTestCase(BaseTestCase):
def testShortEncoding(self):
try:
decoder.decode(ints2octs((9, 1, 131)))
- except PyAsn1Error:
+ except error.PyAsn1Error:
pass
else:
assert 0, 'accepted too-short real'
@@ -684,27 +679,25 @@ class SequenceDecoderTestCase(BaseTestCase):
ints2octs((48, 128, 5, 0, 36, 128, 4, 4, 113, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 3, 111, 119, 110, 0, 0, 2, 1, 1, 0, 0))
) == (self.s, null)
- # TODO: Not clear how to deal with substrateFun in stream implementation
- # def testWithOptionalAndDefaultedDefModeSubst(self):
- # assert decoder.decode(
- # ints2octs((48, 18, 5, 0, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 2, 1, 1)),
- # substrateFun=lambda a, b, c: (b, b[c:])
- # ) == (ints2octs((5, 0, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 2, 1, 1)), str2octs(''))
-
- # TODO: Not clear how to deal with substrateFun in stream implementation
- # def testWithOptionalAndDefaultedIndefModeSubst(self):
- # assert decoder.decode(
- # ints2octs((48, 128, 5, 0, 36, 128, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 0, 0, 2, 1, 1, 0, 0)),
- # substrateFun=lambda a, b, c: (b, str2octs(''))
- # ) == (ints2octs(
- # (5, 0, 36, 128, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 0, 0, 2, 1, 1, 0, 0)), str2octs(''))
+ def testWithOptionalAndDefaultedDefModeSubst(self):
+ assert decoder.decode(
+ ints2octs((48, 18, 5, 0, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 2, 1, 1)),
+ substrateFun=lambda a, b, c, d: streaming.read(b, c)
+ ) == (ints2octs((5, 0, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 2, 1, 1)), str2octs(''))
+
+ def testWithOptionalAndDefaultedIndefModeSubst(self):
+ assert decoder.decode(
+ ints2octs((48, 128, 5, 0, 36, 128, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 0, 0, 2, 1, 1, 0, 0)),
+ substrateFun=lambda a, b, c, d: streaming.read(b, c)
+ ) == (ints2octs(
+ (5, 0, 36, 128, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 0, 0, 2, 1, 1, 0, 0)), str2octs(''))
def testTagFormat(self):
try:
decoder.decode(
ints2octs((16, 18, 5, 0, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 2, 1, 1))
)
- except PyAsn1Error:
+ except error.PyAsn1Error:
pass
else:
assert 0, 'wrong tagFormat worked out'
@@ -886,7 +879,7 @@ class SequenceDecoderWithUntaggedOpenTypesTestCase(BaseTestCase):
decodeOpenTypes=True
)
- except PyAsn1Error:
+ except error.PyAsn1Error:
pass
else:
@@ -1025,7 +1018,7 @@ class SequenceDecoderWithUnaggedSetOfOpenTypesTestCase(BaseTestCase):
decodeOpenTypes=True
)
- except PyAsn1Error:
+ except error.PyAsn1Error:
pass
else:
@@ -1172,27 +1165,25 @@ class SetDecoderTestCase(BaseTestCase):
ints2octs((49, 128, 5, 0, 36, 128, 4, 4, 113, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 3, 111, 119, 110, 0, 0, 2, 1, 1, 0, 0))
) == (self.s, null)
- # TODO: Not clear how to deal with substrateFun in stream implementation
- # def testWithOptionalAndDefaultedDefModeSubst(self):
- # assert decoder.decode(
- # ints2octs((49, 18, 5, 0, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 2, 1, 1)),
- # substrateFun=lambda a, b, c: (b, b[c:])
- # ) == (ints2octs((5, 0, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 2, 1, 1)), str2octs(''))
-
- # TODO: Not clear how to deal with substrateFun in stream implementation
- # def testWithOptionalAndDefaultedIndefModeSubst(self):
- # assert decoder.decode(
- # ints2octs((49, 128, 5, 0, 36, 128, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 0, 0, 2, 1, 1, 0, 0)),
- # substrateFun=lambda a, b, c: (b, str2octs(''))
- # ) == (ints2octs(
- # (5, 0, 36, 128, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 0, 0, 2, 1, 1, 0, 0)), str2octs(''))
+ def testWithOptionalAndDefaultedDefModeSubst(self):
+ assert decoder.decode(
+ ints2octs((49, 18, 5, 0, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 2, 1, 1)),
+ substrateFun=lambda a, b, c, d: streaming.read(b, c)
+ ) == (ints2octs((5, 0, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 2, 1, 1)), str2octs(''))
+
+ def testWithOptionalAndDefaultedIndefModeSubst(self):
+ assert decoder.decode(
+ ints2octs((49, 128, 5, 0, 36, 128, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 0, 0, 2, 1, 1, 0, 0)),
+ substrateFun=lambda a, b, c, d: streaming.read(b, c)
+ ) == (ints2octs(
+ (5, 0, 36, 128, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 0, 0, 2, 1, 1, 0, 0)), str2octs(''))
def testTagFormat(self):
try:
decoder.decode(
ints2octs((16, 18, 5, 0, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 2, 1, 1))
)
- except PyAsn1Error:
+ except error.PyAsn1Error:
pass
else:
assert 0, 'wrong tagFormat worked out'
@@ -1505,28 +1496,26 @@ class AnyDecoderTestCase(BaseTestCase):
s = univ.Any('\004\003fox').subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 4))
assert decoder.decode(ints2octs((164, 128, 4, 3, 102, 111, 120, 0, 0)), asn1Spec=s) == (s, null)
- # TODO: Not clear how to deal with substrateFun in stream implementation
- # def testByUntaggedSubst(self):
- # assert decoder.decode(
- # ints2octs((4, 3, 102, 111, 120)),
- # asn1Spec=self.s,
- # substrateFun=lambda a, b, c: (b, b[c:])
- # ) == (ints2octs((4, 3, 102, 111, 120)), str2octs(''))
+ def testByUntaggedSubst(self):
+ assert decoder.decode(
+ ints2octs((4, 3, 102, 111, 120)),
+ asn1Spec=self.s,
+ substrateFun=lambda a, b, c, d: streaming.read(b, c)
+ ) == (ints2octs((4, 3, 102, 111, 120)), str2octs(''))
- # TODO: Not clear how to deal with substrateFun in stream implementation
- # def testTaggedExSubst(self):
- # assert decoder.decode(
- # ints2octs((164, 5, 4, 3, 102, 111, 120)),
- # asn1Spec=self.s,
- # substrateFun=lambda a, b, c: (b, b[c:])
- # ) == (ints2octs((164, 5, 4, 3, 102, 111, 120)), str2octs(''))
+ def testTaggedExSubst(self):
+ assert decoder.decode(
+ ints2octs((164, 5, 4, 3, 102, 111, 120)),
+ asn1Spec=self.s,
+ substrateFun=lambda a, b, c, d: streaming.read(b, c)
+ ) == (ints2octs((164, 5, 4, 3, 102, 111, 120)), str2octs(''))
class EndOfOctetsTestCase(BaseTestCase):
def testUnexpectedEoo(self):
try:
decoder.decode(ints2octs((0, 0)))
- except PyAsn1Error:
+ except error.PyAsn1Error:
pass
else:
assert 0, 'end-of-contents octets accepted at top level'
@@ -1539,7 +1528,7 @@ class EndOfOctetsTestCase(BaseTestCase):
def testDefiniteNoEoo(self):
try:
decoder.decode(ints2octs((0x23, 0x02, 0x00, 0x00)))
- except PyAsn1Error:
+ except error.PyAsn1Error:
pass
else:
assert 0, 'end-of-contents octets accepted inside definite-length encoding'
@@ -1551,7 +1540,7 @@ class EndOfOctetsTestCase(BaseTestCase):
def testNoLongFormEoo(self):
try:
decoder.decode(ints2octs((0x23, 0x80, 0x00, 0x81, 0x00)))
- except PyAsn1Error:
+ except error.PyAsn1Error:
pass
else:
assert 0, 'end-of-contents octets accepted with invalid long-form length'
@@ -1559,7 +1548,7 @@ class EndOfOctetsTestCase(BaseTestCase):
def testNoConstructedEoo(self):
try:
decoder.decode(ints2octs((0x23, 0x80, 0x20, 0x00)))
- except PyAsn1Error:
+ except error.PyAsn1Error:
pass
else:
assert 0, 'end-of-contents octets accepted with invalid constructed encoding'
@@ -1567,7 +1556,7 @@ class EndOfOctetsTestCase(BaseTestCase):
def testNoEooData(self):
try:
decoder.decode(ints2octs((0x23, 0x80, 0x00, 0x01, 0x00)))
- except PyAsn1Error:
+ except error.PyAsn1Error:
pass
else:
assert 0, 'end-of-contents octets accepted with unexpected data'
@@ -1590,41 +1579,50 @@ class NonStringDecoderTestCase(BaseTestCase):
self.substrate = ints2octs([48, 18, 5, 0, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 2, 1, 1])
def testOctetString(self):
- s = list(decoder.decodeStream(univ.OctetString(self.substrate), asn1Spec=self.s))
+ s = list(decoder.StreamingDecoder(
+ univ.OctetString(self.substrate), asn1Spec=self.s))
assert [self.s] == s
def testAny(self):
- s = list(decoder.decodeStream(univ.Any(self.substrate), asn1Spec=self.s))
+ s = list(decoder.StreamingDecoder(
+ univ.Any(self.substrate), asn1Spec=self.s))
assert [self.s] == s
class ErrorOnDecodingTestCase(BaseTestCase):
def testErrorCondition(self):
- decode = decoder.Decoder(decoder.tagMap, decoder.typeMap)
- substrate = b'abc'
- stream = decoder._asSeekableStream(substrate)
+ decode = decoder.SingleItemDecoder(decoder.TAG_MAP, decoder.TYPE_MAP)
+ substrate = ints2octs((00, 1, 2))
+ stream = streaming.asSeekableStream(substrate)
try:
- asn1Object = decode(stream)
+ asn1Object = next(decode(stream))
- except PyAsn1Error:
+ except error.PyAsn1Error:
exc = sys.exc_info()[1]
- assert isinstance(exc, PyAsn1Error), (
+ assert isinstance(exc, error.PyAsn1Error), (
'Unexpected exception raised %r' % (exc,))
else:
assert False, 'Unexpected decoder result %r' % (asn1Object,)
def testRawDump(self):
- decode = decoder.Decoder(decoder.tagMap, decoder.typeMap)
substrate = ints2octs((31, 8, 2, 1, 1, 131, 3, 2, 1, 12))
- stream = decoder._asSeekableStream(substrate, )
+ stream = streaming.asSeekableStream(substrate)
+
+ class StateMachine(decoder.SingleItemDecoder):
+ defaultErrorState = decoder.stDumpRawValue
- decode.defaultErrorState = decoder.stDumpRawValue
+ class StreamingDecoder(decoder.StreamingDecoder):
+ SINGLE_ITEM_DECODER = StateMachine
- asn1Object = decode(stream)
- rest = stream.read()
+ class OneShotDecoder(decoder.Decoder):
+ STREAMING_DECODER = StreamingDecoder
+
+ d = OneShotDecoder()
+
+ asn1Object, rest = d(stream)
assert isinstance(asn1Object, univ.Any), (
'Unexpected raw dump type %r' % (asn1Object,))
@@ -1643,7 +1641,7 @@ class BinaryFileTestCase(BaseTestCase):
out.write(ints2octs((2, 1, 12)))
with open(path, "rb") as source:
- values = list(decoder.decodeStream(source))
+ values = list(decoder.StreamingDecoder(source))
assert values == [12]
finally:
@@ -1656,9 +1654,10 @@ class BinaryFileTestCase(BaseTestCase):
out.write(ints2octs((2, 1, 12, 35, 128, 3, 2, 0, 169, 3, 2, 1, 138, 0, 0)))
with open(path, "rb") as source:
- values = list(decoder.decodeStream(source))
+ values = list(decoder.StreamingDecoder(source))
assert values == [12, (1, 0, 1, 0, 1, 0, 0, 1, 1, 0, 0, 0, 1, 0, 1)]
+
finally:
os.remove(path)
@@ -1669,8 +1668,11 @@ class BinaryFileTestCase(BaseTestCase):
out.write(ints2octs((2, 1, 12, 35, 128, 3, 2, 0, 169, 3, 2, 1, 138, 0, 0, 7)))
with open(path, "rb") as source:
- with self.assertRaises(SubstrateUnderrunError):
- _ = list(decoder.decodeStream(source))
+ list(decoder.StreamingDecoder(source))
+
+ except error.EndOfStreamError:
+ pass
+
finally:
os.remove(path)
@@ -1679,7 +1681,7 @@ class BytesIOTestCase(BaseTestCase):
def testRead(self):
source = ints2octs((2, 1, 12, 35, 128, 3, 2, 0, 169, 3, 2, 1, 138, 0, 0))
stream = io.BytesIO(source)
- values = list(decoder.decodeStream(stream))
+ values = list(decoder.StreamingDecoder(stream))
assert values == [12, (1, 0, 1, 0, 1, 0, 0, 1, 1, 0, 0, 0, 1, 0, 1)]
@@ -1687,8 +1689,114 @@ class UnicodeTestCase(BaseTestCase):
def testFail(self):
# This ensures that unicode objects in Python 2 & str objects in Python 3.7 cannot be parsed.
source = ints2octs((2, 1, 12, 35, 128, 3, 2, 0, 169, 3, 2, 1, 138, 0, 0)).decode("latin-1")
- with self.assertRaises(UnsupportedSubstrateError):
- _ = next(decoder.decodeStream(source))
+ try:
+ next(decoder.StreamingDecoder(source))
+
+ except error.UnsupportedSubstrateError:
+ pass
+
+ else:
+ assert False, 'Tolerated parsing broken unicode strings'
+
+
+class RestartableDecoderTestCase(BaseTestCase):
+
+ class NonBlockingStream(io.BytesIO):
+ block = False
+
+ def read(self, size=-1):
+ self.block = not self.block
+ if self.block:
+ return # this is what non-blocking streams sometimes do
+
+ return io.BytesIO.read(self, size)
+
+ def setUp(self):
+ BaseTestCase.setUp(self)
+
+ self.s = univ.SequenceOf(componentType=univ.OctetString())
+ self.s.setComponentByPosition(0, univ.OctetString('quick brown'))
+ source = ints2octs(
+ (48, 26,
+ 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110,
+ 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110))
+ self.stream = self.NonBlockingStream(source)
+
+ def testPartialReadingFromNonBlockingStream(self):
+ iterator = iter(decoder.StreamingDecoder(self.stream, asn1Spec=self.s))
+
+ res = next(iterator)
+
+ assert isinstance(res, error.SubstrateUnderrunError)
+ assert 'asn1Object' not in res.context
+
+ res = next(iterator)
+
+ assert isinstance(res, error.SubstrateUnderrunError)
+ assert 'asn1Object' not in res.context
+
+ res = next(iterator)
+
+ assert isinstance(res, error.SubstrateUnderrunError)
+ assert 'asn1Object' in res.context
+ assert isinstance(res.context['asn1Object'], univ.SequenceOf)
+ assert res.context['asn1Object'].isValue
+ assert len(res.context['asn1Object']) == 0
+
+ res = next(iterator)
+
+ assert isinstance(res, error.SubstrateUnderrunError)
+ assert 'asn1Object' in res.context
+ assert isinstance(res.context['asn1Object'], univ.SequenceOf)
+ assert res.context['asn1Object'].isValue
+ assert len(res.context['asn1Object']) == 0
+
+ res = next(iterator)
+
+ assert isinstance(res, error.SubstrateUnderrunError)
+ assert 'asn1Object' in res.context
+ assert isinstance(res.context['asn1Object'], univ.SequenceOf)
+ assert res.context['asn1Object'].isValue
+ assert len(res.context['asn1Object']) == 0
+
+ res = next(iterator)
+
+ assert isinstance(res, error.SubstrateUnderrunError)
+ assert 'asn1Object' in res.context
+ assert isinstance(res.context['asn1Object'], univ.SequenceOf)
+ assert res.context['asn1Object'].isValue
+ assert len(res.context['asn1Object']) == 1
+
+ res = next(iterator)
+
+ assert isinstance(res, error.SubstrateUnderrunError)
+ assert 'asn1Object' in res.context
+ assert isinstance(res.context['asn1Object'], univ.SequenceOf)
+ assert res.context['asn1Object'].isValue
+ assert len(res.context['asn1Object']) == 1
+
+ res = next(iterator)
+
+ assert isinstance(res, error.SubstrateUnderrunError)
+ assert 'asn1Object' in res.context
+ assert isinstance(res.context['asn1Object'], univ.SequenceOf)
+ assert res.context['asn1Object'].isValue
+ assert len(res.context['asn1Object']) == 1
+
+ res = next(iterator)
+
+ assert isinstance(res, univ.SequenceOf)
+ assert res.isValue
+ assert len(res) == 2
+
+ try:
+ next(iterator)
+
+ except StopIteration:
+ pass
+
+ else:
+ assert False, 'End of stream not raised'
class CompressedFilesTestCase(BaseTestCase):
@@ -1699,9 +1807,10 @@ class CompressedFilesTestCase(BaseTestCase):
out.write(ints2octs((2, 1, 12, 35, 128, 3, 2, 0, 169, 3, 2, 1, 138, 0, 0)))
with gzip.open(path, "rb") as source:
- values = list(decoder.decodeStream(source))
+ values = list(decoder.StreamingDecoder(source))
assert values == [12, (1, 0, 1, 0, 1, 0, 0, 1, 1, 0, 0, 0, 1, 0, 1)]
+
finally:
os.remove(path)
@@ -1715,7 +1824,7 @@ class CompressedFilesTestCase(BaseTestCase):
with zipfile.ZipFile(path, "r") as myzip:
with myzip.open("data", "r") as source:
- values = list(decoder.decodeStream(source))
+ values = list(decoder.StreamingDecoder(source))
assert values == [12, (1, 0, 1, 0, 1, 0, 0, 1, 1, 0, 0, 0, 1, 0, 1)]
finally:
os.remove(path)
@@ -1729,63 +1838,12 @@ class CompressedFilesTestCase(BaseTestCase):
with zipfile.ZipFile(path, "r") as myzip:
with myzip.open("data", "r") as source:
- values = list(decoder.decodeStream(source))
+ values = list(decoder.StreamingDecoder(source))
assert values == [12, (1, 0, 1, 0, 1, 0, 0, 1, 1, 0, 0, 0, 1, 0, 1)] * 1000
finally:
os.remove(path)
-class CachingStreamWrapperTestCase(BaseTestCase):
- def setUp(self):
- self.shortText = b"abcdefghij"
- self.longText = self.shortText * (io.DEFAULT_BUFFER_SIZE * 5)
- self.shortStream = io.BytesIO(self.shortText)
- self.longStream = io.BytesIO(self.longText)
-
- def testReadJustFromCache(self):
- wrapper = decoder._CachingStreamWrapper(self.shortStream)
- wrapper.read(6)
- wrapper.seek(3)
- assert wrapper.read(1) == b"d"
- assert wrapper.read(1) == b"e"
- assert wrapper.tell() == 5
-
- def testReadFromCacheAndStream(self):
- wrapper = decoder._CachingStreamWrapper(self.shortStream)
- wrapper.read(6)
- wrapper.seek(3)
- assert wrapper.read(4) == b"defg"
- assert wrapper.tell() == 7
-
- def testReadJustFromStream(self):
- wrapper = decoder._CachingStreamWrapper(self.shortStream)
- assert wrapper.read(6) == b"abcdef"
- assert wrapper.tell() == 6
-
- def testPeek(self):
- wrapper = decoder._CachingStreamWrapper(self.longStream)
- read_bytes = wrapper.peek(io.DEFAULT_BUFFER_SIZE + 73)
- assert len(read_bytes) == io.DEFAULT_BUFFER_SIZE + 73
- assert read_bytes.startswith(b"abcdefg")
- assert wrapper.tell() == 0
- assert wrapper.read(4) == b"abcd"
-
- def testMarkedPositionResets(self):
- wrapper = decoder._CachingStreamWrapper(self.longStream)
- wrapper.read(10)
- wrapper._markedPosition = wrapper.tell()
- assert wrapper._markedPosition == 10
-
- # Reach the maximum capacity of cache
- wrapper.read(io.DEFAULT_BUFFER_SIZE)
- assert wrapper.tell() == 10 + io.DEFAULT_BUFFER_SIZE
-
- # The following should clear the cache
- wrapper._markedPosition = wrapper.tell()
- assert wrapper._markedPosition == 0
- assert len(wrapper._cache.getvalue()) == 0
-
-
suite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
if __name__ == '__main__':