summaryrefslogtreecommitdiff
path: root/tests/codec/ber/test_decoder.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/codec/ber/test_decoder.py')
-rw-r--r--tests/codec/ber/test_decoder.py424
1 files changed, 241 insertions, 183 deletions
diff --git a/tests/codec/ber/test_decoder.py b/tests/codec/ber/test_decoder.py
index e72e025..2430ff4 100644
--- a/tests/codec/ber/test_decoder.py
+++ b/tests/codec/ber/test_decoder.py
@@ -23,10 +23,11 @@ from pyasn1.type import namedtype
from pyasn1.type import opentype
from pyasn1.type import univ
from pyasn1.type import char
+from pyasn1.codec import streaming
from pyasn1.codec.ber import decoder
from pyasn1.codec.ber import eoo
from pyasn1.compat.octets import ints2octs, str2octs, null
-from pyasn1.error import PyAsn1Error, SubstrateUnderrunError, UnsupportedSubstrateError
+from pyasn1 import error
class LargeTagDecoderTestCase(BaseTestCase):
@@ -78,7 +79,7 @@ class IntegerDecoderTestCase(BaseTestCase):
decoder.decode(
ints2octs((2, 1, 12)), asn1Spec=univ.Null()
) == (12, null)
- except PyAsn1Error:
+ except error.PyAsn1Error:
pass
else:
assert 0, 'wrong asn1Spec worked out'
@@ -89,7 +90,7 @@ class IntegerDecoderTestCase(BaseTestCase):
def testTagFormat(self):
try:
decoder.decode(ints2octs((34, 1, 12)))
- except PyAsn1Error:
+ except error.PyAsn1Error:
pass
else:
assert 0, 'wrong tagFormat worked out'
@@ -111,7 +112,7 @@ class BooleanDecoderTestCase(BaseTestCase):
def testTagFormat(self):
try:
decoder.decode(ints2octs((33, 1, 1)))
- except PyAsn1Error:
+ except error.PyAsn1Error:
pass
else:
assert 0, 'wrong tagFormat worked out'
@@ -138,24 +139,22 @@ class BitStringDecoderTestCase(BaseTestCase):
ints2octs((35, 128, 3, 2, 0, 169, 3, 2, 1, 138, 0, 0))
) == ((1, 0, 1, 0, 1, 0, 0, 1, 1, 0, 0, 0, 1, 0, 1), null)
- # TODO: Not clear how to deal with substrateFun in stream implementation
- # def testDefModeChunkedSubst(self):
- # assert decoder.decode(
- # ints2octs((35, 8, 3, 2, 0, 169, 3, 2, 1, 138)),
- # substrateFun=lambda a, b, c: (b, b[c:])
- # ) == (ints2octs((3, 2, 0, 169, 3, 2, 1, 138)), str2octs(''))
+ def testDefModeChunkedSubst(self):
+ assert decoder.decode(
+ ints2octs((35, 8, 3, 2, 0, 169, 3, 2, 1, 138)),
+ substrateFun=lambda a, b, c, d: streaming.read(b, c)
+ ) == (ints2octs((3, 2, 0, 169, 3, 2, 1, 138)), str2octs(''))
- # TODO: Not clear how to deal with substrateFun in stream implementation
- # def testIndefModeChunkedSubst(self):
- # assert decoder.decode(
- # ints2octs((35, 128, 3, 2, 0, 169, 3, 2, 1, 138, 0, 0)),
- # substrateFun=lambda a, b, c: (b, str2octs(''))
- # ) == (ints2octs((3, 2, 0, 169, 3, 2, 1, 138, 0, 0)), str2octs(''))
+ def testIndefModeChunkedSubst(self):
+ assert decoder.decode(
+ ints2octs((35, 128, 3, 2, 0, 169, 3, 2, 1, 138, 0, 0)),
+ substrateFun=lambda a, b, c, d: streaming.read(b, c)
+ ) == (ints2octs((3, 2, 0, 169, 3, 2, 1, 138, 0, 0)), str2octs(''))
def testTypeChecking(self):
try:
decoder.decode(ints2octs((35, 4, 2, 2, 42, 42)))
- except PyAsn1Error:
+ except error.PyAsn1Error:
pass
else:
assert 0, 'accepted mis-encoded bit-string constructed out of an integer'
@@ -183,22 +182,20 @@ class OctetStringDecoderTestCase(BaseTestCase):
ints2octs((36, 128, 4, 4, 81, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 4, 111, 119, 110, 32, 4, 3, 102, 111, 120, 0, 0))
) == (str2octs('Quick brown fox'), null)
- # TODO: Not clear how to deal with substrateFun in stream implementation
- # def testDefModeChunkedSubst(self):
- # assert decoder.decode(
- # ints2octs(
- # (36, 23, 4, 4, 81, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 4, 111, 119, 110, 32, 4, 3, 102, 111, 120)),
- # substrateFun=lambda a, b, c: (b, b[c:])
- # ) == (ints2octs((4, 4, 81, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 4, 111, 119, 110, 32, 4, 3, 102, 111, 120)), str2octs(''))
+ def testDefModeChunkedSubst(self):
+ assert decoder.decode(
+ ints2octs(
+ (36, 23, 4, 4, 81, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 4, 111, 119, 110, 32, 4, 3, 102, 111, 120)),
+ substrateFun=lambda a, b, c, d: streaming.read(b, c)
+ ) == (ints2octs((4, 4, 81, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 4, 111, 119, 110, 32, 4, 3, 102, 111, 120)), str2octs(''))
- # TODO: Not clear how to deal with substrateFun in stream implementation
- # def testIndefModeChunkedSubst(self):
- # assert decoder.decode(
- # ints2octs((36, 128, 4, 4, 81, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 4, 111, 119, 110, 32, 4, 3, 102, 111,
- # 120, 0, 0)),
- # substrateFun=lambda a, b, c: (b, str2octs(''))
- # ) == (ints2octs(
- # (4, 4, 81, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 4, 111, 119, 110, 32, 4, 3, 102, 111, 120, 0, 0)), str2octs(''))
+ def testIndefModeChunkedSubst(self):
+ assert decoder.decode(
+ ints2octs((36, 128, 4, 4, 81, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 4, 111, 119, 110, 32, 4, 3, 102, 111,
+ 120, 0, 0)),
+ substrateFun=lambda a, b, c, d: streaming.read(b, c)
+ ) == (ints2octs(
+ (4, 4, 81, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 4, 111, 119, 110, 32, 4, 3, 102, 111, 120, 0, 0)), str2octs(''))
class ExpTaggedOctetStringDecoderTestCase(BaseTestCase):
@@ -246,22 +243,20 @@ class ExpTaggedOctetStringDecoderTestCase(BaseTestCase):
assert self.o.tagSet == o.tagSet
assert self.o.isSameTypeWith(o)
- # TODO: Not clear how to deal with substrateFun in stream implementation
- # def testDefModeSubst(self):
- # assert decoder.decode(
- # ints2octs((101, 17, 4, 15, 81, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 32, 102, 111, 120)),
- # substrateFun=lambda a, b, c: (b, b[c:])
- # ) == (ints2octs((4, 15, 81, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 32, 102, 111, 120)), str2octs(''))
+ def testDefModeSubst(self):
+ assert decoder.decode(
+ ints2octs((101, 17, 4, 15, 81, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 32, 102, 111, 120)),
+ substrateFun=lambda a, b, c, d: streaming.read(b, c)
+ ) == (ints2octs((4, 15, 81, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 32, 102, 111, 120)), str2octs(''))
- # TODO: Not clear how to deal with substrateFun in stream implementation
- # def testIndefModeSubst(self):
- # assert decoder.decode(
- # ints2octs((
- # 101, 128, 36, 128, 4, 15, 81, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 32, 102, 111, 120, 0,
- # 0, 0, 0)),
- # substrateFun=lambda a, b, c: (b, str2octs(''))
- # ) == (ints2octs(
- # (36, 128, 4, 15, 81, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 32, 102, 111, 120, 0, 0, 0, 0)), str2octs(''))
+ def testIndefModeSubst(self):
+ assert decoder.decode(
+ ints2octs((
+ 101, 128, 36, 128, 4, 15, 81, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 32, 102, 111, 120, 0,
+ 0, 0, 0)),
+ substrateFun=lambda a, b, c, d: streaming.read(b, c)
+ ) == (ints2octs(
+ (36, 128, 4, 15, 81, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 32, 102, 111, 120, 0, 0, 0, 0)), str2octs(''))
class NullDecoderTestCase(BaseTestCase):
@@ -271,7 +266,7 @@ class NullDecoderTestCase(BaseTestCase):
def testTagFormat(self):
try:
decoder.decode(ints2octs((37, 0)))
- except PyAsn1Error:
+ except error.PyAsn1Error:
pass
else:
assert 0, 'wrong tagFormat worked out'
@@ -340,7 +335,7 @@ class ObjectIdentifierDecoderTestCase(BaseTestCase):
decoder.decode(
ints2octs((6, 5, 85, 4, 128, 129, 0))
)
- except PyAsn1Error:
+ except error.PyAsn1Error:
pass
else:
assert 0, 'Leading 0x80 tolerated'
@@ -350,7 +345,7 @@ class ObjectIdentifierDecoderTestCase(BaseTestCase):
decoder.decode(
ints2octs((6, 7, 1, 0x80, 0x80, 0x80, 0x80, 0x80, 0x7F))
)
- except PyAsn1Error:
+ except error.PyAsn1Error:
pass
else:
assert 0, 'Leading 0x80 tolerated'
@@ -360,7 +355,7 @@ class ObjectIdentifierDecoderTestCase(BaseTestCase):
decoder.decode(
ints2octs((6, 2, 0x80, 1))
)
- except PyAsn1Error:
+ except error.PyAsn1Error:
pass
else:
assert 0, 'Leading 0x80 tolerated'
@@ -370,7 +365,7 @@ class ObjectIdentifierDecoderTestCase(BaseTestCase):
decoder.decode(
ints2octs((6, 2, 0x80, 0x7F))
)
- except PyAsn1Error:
+ except error.PyAsn1Error:
pass
else:
assert 0, 'Leading 0x80 tolerated'
@@ -378,7 +373,7 @@ class ObjectIdentifierDecoderTestCase(BaseTestCase):
def testTagFormat(self):
try:
decoder.decode(ints2octs((38, 1, 239)))
- except PyAsn1Error:
+ except error.PyAsn1Error:
pass
else:
assert 0, 'wrong tagFormat worked out'
@@ -386,7 +381,7 @@ class ObjectIdentifierDecoderTestCase(BaseTestCase):
def testZeroLength(self):
try:
decoder.decode(ints2octs((6, 0, 0)))
- except PyAsn1Error:
+ except error.PyAsn1Error:
pass
else:
assert 0, 'zero length tolerated'
@@ -394,7 +389,7 @@ class ObjectIdentifierDecoderTestCase(BaseTestCase):
def testIndefiniteLength(self):
try:
decoder.decode(ints2octs((6, 128, 0)))
- except PyAsn1Error:
+ except error.PyAsn1Error:
pass
else:
assert 0, 'indefinite length tolerated'
@@ -402,7 +397,7 @@ class ObjectIdentifierDecoderTestCase(BaseTestCase):
def testReservedLength(self):
try:
decoder.decode(ints2octs((6, 255, 0)))
- except PyAsn1Error:
+ except error.PyAsn1Error:
pass
else:
assert 0, 'reserved length tolerated'
@@ -479,7 +474,7 @@ class RealDecoderTestCase(BaseTestCase):
def testTagFormat(self):
try:
decoder.decode(ints2octs((41, 0)))
- except PyAsn1Error:
+ except error.PyAsn1Error:
pass
else:
assert 0, 'wrong tagFormat worked out'
@@ -487,7 +482,7 @@ class RealDecoderTestCase(BaseTestCase):
def testShortEncoding(self):
try:
decoder.decode(ints2octs((9, 1, 131)))
- except PyAsn1Error:
+ except error.PyAsn1Error:
pass
else:
assert 0, 'accepted too-short real'
@@ -684,27 +679,25 @@ class SequenceDecoderTestCase(BaseTestCase):
ints2octs((48, 128, 5, 0, 36, 128, 4, 4, 113, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 3, 111, 119, 110, 0, 0, 2, 1, 1, 0, 0))
) == (self.s, null)
- # TODO: Not clear how to deal with substrateFun in stream implementation
- # def testWithOptionalAndDefaultedDefModeSubst(self):
- # assert decoder.decode(
- # ints2octs((48, 18, 5, 0, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 2, 1, 1)),
- # substrateFun=lambda a, b, c: (b, b[c:])
- # ) == (ints2octs((5, 0, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 2, 1, 1)), str2octs(''))
-
- # TODO: Not clear how to deal with substrateFun in stream implementation
- # def testWithOptionalAndDefaultedIndefModeSubst(self):
- # assert decoder.decode(
- # ints2octs((48, 128, 5, 0, 36, 128, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 0, 0, 2, 1, 1, 0, 0)),
- # substrateFun=lambda a, b, c: (b, str2octs(''))
- # ) == (ints2octs(
- # (5, 0, 36, 128, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 0, 0, 2, 1, 1, 0, 0)), str2octs(''))
+ def testWithOptionalAndDefaultedDefModeSubst(self):
+ assert decoder.decode(
+ ints2octs((48, 18, 5, 0, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 2, 1, 1)),
+ substrateFun=lambda a, b, c, d: streaming.read(b, c)
+ ) == (ints2octs((5, 0, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 2, 1, 1)), str2octs(''))
+
+ def testWithOptionalAndDefaultedIndefModeSubst(self):
+ assert decoder.decode(
+ ints2octs((48, 128, 5, 0, 36, 128, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 0, 0, 2, 1, 1, 0, 0)),
+ substrateFun=lambda a, b, c, d: streaming.read(b, c)
+ ) == (ints2octs(
+ (5, 0, 36, 128, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 0, 0, 2, 1, 1, 0, 0)), str2octs(''))
def testTagFormat(self):
try:
decoder.decode(
ints2octs((16, 18, 5, 0, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 2, 1, 1))
)
- except PyAsn1Error:
+ except error.PyAsn1Error:
pass
else:
assert 0, 'wrong tagFormat worked out'
@@ -886,7 +879,7 @@ class SequenceDecoderWithUntaggedOpenTypesTestCase(BaseTestCase):
decodeOpenTypes=True
)
- except PyAsn1Error:
+ except error.PyAsn1Error:
pass
else:
@@ -1025,7 +1018,7 @@ class SequenceDecoderWithUnaggedSetOfOpenTypesTestCase(BaseTestCase):
decodeOpenTypes=True
)
- except PyAsn1Error:
+ except error.PyAsn1Error:
pass
else:
@@ -1172,27 +1165,25 @@ class SetDecoderTestCase(BaseTestCase):
ints2octs((49, 128, 5, 0, 36, 128, 4, 4, 113, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 3, 111, 119, 110, 0, 0, 2, 1, 1, 0, 0))
) == (self.s, null)
- # TODO: Not clear how to deal with substrateFun in stream implementation
- # def testWithOptionalAndDefaultedDefModeSubst(self):
- # assert decoder.decode(
- # ints2octs((49, 18, 5, 0, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 2, 1, 1)),
- # substrateFun=lambda a, b, c: (b, b[c:])
- # ) == (ints2octs((5, 0, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 2, 1, 1)), str2octs(''))
-
- # TODO: Not clear how to deal with substrateFun in stream implementation
- # def testWithOptionalAndDefaultedIndefModeSubst(self):
- # assert decoder.decode(
- # ints2octs((49, 128, 5, 0, 36, 128, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 0, 0, 2, 1, 1, 0, 0)),
- # substrateFun=lambda a, b, c: (b, str2octs(''))
- # ) == (ints2octs(
- # (5, 0, 36, 128, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 0, 0, 2, 1, 1, 0, 0)), str2octs(''))
+ def testWithOptionalAndDefaultedDefModeSubst(self):
+ assert decoder.decode(
+ ints2octs((49, 18, 5, 0, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 2, 1, 1)),
+ substrateFun=lambda a, b, c, d: streaming.read(b, c)
+ ) == (ints2octs((5, 0, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 2, 1, 1)), str2octs(''))
+
+ def testWithOptionalAndDefaultedIndefModeSubst(self):
+ assert decoder.decode(
+ ints2octs((49, 128, 5, 0, 36, 128, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 0, 0, 2, 1, 1, 0, 0)),
+ substrateFun=lambda a, b, c, d: streaming.read(b, c)
+ ) == (ints2octs(
+ (5, 0, 36, 128, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 0, 0, 2, 1, 1, 0, 0)), str2octs(''))
def testTagFormat(self):
try:
decoder.decode(
ints2octs((16, 18, 5, 0, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 2, 1, 1))
)
- except PyAsn1Error:
+ except error.PyAsn1Error:
pass
else:
assert 0, 'wrong tagFormat worked out'
@@ -1505,28 +1496,26 @@ class AnyDecoderTestCase(BaseTestCase):
s = univ.Any('\004\003fox').subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 4))
assert decoder.decode(ints2octs((164, 128, 4, 3, 102, 111, 120, 0, 0)), asn1Spec=s) == (s, null)
- # TODO: Not clear how to deal with substrateFun in stream implementation
- # def testByUntaggedSubst(self):
- # assert decoder.decode(
- # ints2octs((4, 3, 102, 111, 120)),
- # asn1Spec=self.s,
- # substrateFun=lambda a, b, c: (b, b[c:])
- # ) == (ints2octs((4, 3, 102, 111, 120)), str2octs(''))
+ def testByUntaggedSubst(self):
+ assert decoder.decode(
+ ints2octs((4, 3, 102, 111, 120)),
+ asn1Spec=self.s,
+ substrateFun=lambda a, b, c, d: streaming.read(b, c)
+ ) == (ints2octs((4, 3, 102, 111, 120)), str2octs(''))
- # TODO: Not clear how to deal with substrateFun in stream implementation
- # def testTaggedExSubst(self):
- # assert decoder.decode(
- # ints2octs((164, 5, 4, 3, 102, 111, 120)),
- # asn1Spec=self.s,
- # substrateFun=lambda a, b, c: (b, b[c:])
- # ) == (ints2octs((164, 5, 4, 3, 102, 111, 120)), str2octs(''))
+ def testTaggedExSubst(self):
+ assert decoder.decode(
+ ints2octs((164, 5, 4, 3, 102, 111, 120)),
+ asn1Spec=self.s,
+ substrateFun=lambda a, b, c, d: streaming.read(b, c)
+ ) == (ints2octs((164, 5, 4, 3, 102, 111, 120)), str2octs(''))
class EndOfOctetsTestCase(BaseTestCase):
def testUnexpectedEoo(self):
try:
decoder.decode(ints2octs((0, 0)))
- except PyAsn1Error:
+ except error.PyAsn1Error:
pass
else:
assert 0, 'end-of-contents octets accepted at top level'
@@ -1539,7 +1528,7 @@ class EndOfOctetsTestCase(BaseTestCase):
def testDefiniteNoEoo(self):
try:
decoder.decode(ints2octs((0x23, 0x02, 0x00, 0x00)))
- except PyAsn1Error:
+ except error.PyAsn1Error:
pass
else:
assert 0, 'end-of-contents octets accepted inside definite-length encoding'
@@ -1551,7 +1540,7 @@ class EndOfOctetsTestCase(BaseTestCase):
def testNoLongFormEoo(self):
try:
decoder.decode(ints2octs((0x23, 0x80, 0x00, 0x81, 0x00)))
- except PyAsn1Error:
+ except error.PyAsn1Error:
pass
else:
assert 0, 'end-of-contents octets accepted with invalid long-form length'
@@ -1559,7 +1548,7 @@ class EndOfOctetsTestCase(BaseTestCase):
def testNoConstructedEoo(self):
try:
decoder.decode(ints2octs((0x23, 0x80, 0x20, 0x00)))
- except PyAsn1Error:
+ except error.PyAsn1Error:
pass
else:
assert 0, 'end-of-contents octets accepted with invalid constructed encoding'
@@ -1567,7 +1556,7 @@ class EndOfOctetsTestCase(BaseTestCase):
def testNoEooData(self):
try:
decoder.decode(ints2octs((0x23, 0x80, 0x00, 0x01, 0x00)))
- except PyAsn1Error:
+ except error.PyAsn1Error:
pass
else:
assert 0, 'end-of-contents octets accepted with unexpected data'
@@ -1590,41 +1579,50 @@ class NonStringDecoderTestCase(BaseTestCase):
self.substrate = ints2octs([48, 18, 5, 0, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 2, 1, 1])
def testOctetString(self):
- s = list(decoder.decodeStream(univ.OctetString(self.substrate), asn1Spec=self.s))
+ s = list(decoder.StreamingDecoder(
+ univ.OctetString(self.substrate), asn1Spec=self.s))
assert [self.s] == s
def testAny(self):
- s = list(decoder.decodeStream(univ.Any(self.substrate), asn1Spec=self.s))
+ s = list(decoder.StreamingDecoder(
+ univ.Any(self.substrate), asn1Spec=self.s))
assert [self.s] == s
class ErrorOnDecodingTestCase(BaseTestCase):
def testErrorCondition(self):
- decode = decoder.Decoder(decoder.tagMap, decoder.typeMap)
- substrate = b'abc'
- stream = decoder._asSeekableStream(substrate)
+ decode = decoder.SingleItemDecoder(decoder.TAG_MAP, decoder.TYPE_MAP)
+ substrate = ints2octs((00, 1, 2))
+ stream = streaming.asSeekableStream(substrate)
try:
- asn1Object = decode(stream)
+ asn1Object = next(decode(stream))
- except PyAsn1Error:
+ except error.PyAsn1Error:
exc = sys.exc_info()[1]
- assert isinstance(exc, PyAsn1Error), (
+ assert isinstance(exc, error.PyAsn1Error), (
'Unexpected exception raised %r' % (exc,))
else:
assert False, 'Unexpected decoder result %r' % (asn1Object,)
def testRawDump(self):
- decode = decoder.Decoder(decoder.tagMap, decoder.typeMap)
substrate = ints2octs((31, 8, 2, 1, 1, 131, 3, 2, 1, 12))
- stream = decoder._asSeekableStream(substrate, )
+ stream = streaming.asSeekableStream(substrate)
+
+ class StateMachine(decoder.SingleItemDecoder):
+ defaultErrorState = decoder.stDumpRawValue
- decode.defaultErrorState = decoder.stDumpRawValue
+ class StreamingDecoder(decoder.StreamingDecoder):
+ SINGLE_ITEM_DECODER = StateMachine
- asn1Object = decode(stream)
- rest = stream.read()
+ class OneShotDecoder(decoder.Decoder):
+ STREAMING_DECODER = StreamingDecoder
+
+ d = OneShotDecoder()
+
+ asn1Object, rest = d(stream)
assert isinstance(asn1Object, univ.Any), (
'Unexpected raw dump type %r' % (asn1Object,))
@@ -1643,7 +1641,7 @@ class BinaryFileTestCase(BaseTestCase):
out.write(ints2octs((2, 1, 12)))
with open(path, "rb") as source:
- values = list(decoder.decodeStream(source))
+ values = list(decoder.StreamingDecoder(source))
assert values == [12]
finally:
@@ -1656,9 +1654,10 @@ class BinaryFileTestCase(BaseTestCase):
out.write(ints2octs((2, 1, 12, 35, 128, 3, 2, 0, 169, 3, 2, 1, 138, 0, 0)))
with open(path, "rb") as source:
- values = list(decoder.decodeStream(source))
+ values = list(decoder.StreamingDecoder(source))
assert values == [12, (1, 0, 1, 0, 1, 0, 0, 1, 1, 0, 0, 0, 1, 0, 1)]
+
finally:
os.remove(path)
@@ -1669,8 +1668,11 @@ class BinaryFileTestCase(BaseTestCase):
out.write(ints2octs((2, 1, 12, 35, 128, 3, 2, 0, 169, 3, 2, 1, 138, 0, 0, 7)))
with open(path, "rb") as source:
- with self.assertRaises(SubstrateUnderrunError):
- _ = list(decoder.decodeStream(source))
+ list(decoder.StreamingDecoder(source))
+
+ except error.EndOfStreamError:
+ pass
+
finally:
os.remove(path)
@@ -1679,7 +1681,7 @@ class BytesIOTestCase(BaseTestCase):
def testRead(self):
source = ints2octs((2, 1, 12, 35, 128, 3, 2, 0, 169, 3, 2, 1, 138, 0, 0))
stream = io.BytesIO(source)
- values = list(decoder.decodeStream(stream))
+ values = list(decoder.StreamingDecoder(stream))
assert values == [12, (1, 0, 1, 0, 1, 0, 0, 1, 1, 0, 0, 0, 1, 0, 1)]
@@ -1687,8 +1689,114 @@ class UnicodeTestCase(BaseTestCase):
def testFail(self):
# This ensures that unicode objects in Python 2 & str objects in Python 3.7 cannot be parsed.
source = ints2octs((2, 1, 12, 35, 128, 3, 2, 0, 169, 3, 2, 1, 138, 0, 0)).decode("latin-1")
- with self.assertRaises(UnsupportedSubstrateError):
- _ = next(decoder.decodeStream(source))
+ try:
+ next(decoder.StreamingDecoder(source))
+
+ except error.UnsupportedSubstrateError:
+ pass
+
+ else:
+ assert False, 'Tolerated parsing broken unicode strings'
+
+
+class RestartableDecoderTestCase(BaseTestCase):
+
+ class NonBlockingStream(io.BytesIO):
+ block = False
+
+ def read(self, size=-1):
+ self.block = not self.block
+ if self.block:
+ return # this is what non-blocking streams sometimes do
+
+ return io.BytesIO.read(self, size)
+
+ def setUp(self):
+ BaseTestCase.setUp(self)
+
+ self.s = univ.SequenceOf(componentType=univ.OctetString())
+ self.s.setComponentByPosition(0, univ.OctetString('quick brown'))
+ source = ints2octs(
+ (48, 26,
+ 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110,
+ 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110))
+ self.stream = self.NonBlockingStream(source)
+
+ def testPartialReadingFromNonBlockingStream(self):
+ iterator = iter(decoder.StreamingDecoder(self.stream, asn1Spec=self.s))
+
+ res = next(iterator)
+
+ assert isinstance(res, error.SubstrateUnderrunError)
+ assert 'asn1Object' not in res.context
+
+ res = next(iterator)
+
+ assert isinstance(res, error.SubstrateUnderrunError)
+ assert 'asn1Object' not in res.context
+
+ res = next(iterator)
+
+ assert isinstance(res, error.SubstrateUnderrunError)
+ assert 'asn1Object' in res.context
+ assert isinstance(res.context['asn1Object'], univ.SequenceOf)
+ assert res.context['asn1Object'].isValue
+ assert len(res.context['asn1Object']) == 0
+
+ res = next(iterator)
+
+ assert isinstance(res, error.SubstrateUnderrunError)
+ assert 'asn1Object' in res.context
+ assert isinstance(res.context['asn1Object'], univ.SequenceOf)
+ assert res.context['asn1Object'].isValue
+ assert len(res.context['asn1Object']) == 0
+
+ res = next(iterator)
+
+ assert isinstance(res, error.SubstrateUnderrunError)
+ assert 'asn1Object' in res.context
+ assert isinstance(res.context['asn1Object'], univ.SequenceOf)
+ assert res.context['asn1Object'].isValue
+ assert len(res.context['asn1Object']) == 0
+
+ res = next(iterator)
+
+ assert isinstance(res, error.SubstrateUnderrunError)
+ assert 'asn1Object' in res.context
+ assert isinstance(res.context['asn1Object'], univ.SequenceOf)
+ assert res.context['asn1Object'].isValue
+ assert len(res.context['asn1Object']) == 1
+
+ res = next(iterator)
+
+ assert isinstance(res, error.SubstrateUnderrunError)
+ assert 'asn1Object' in res.context
+ assert isinstance(res.context['asn1Object'], univ.SequenceOf)
+ assert res.context['asn1Object'].isValue
+ assert len(res.context['asn1Object']) == 1
+
+ res = next(iterator)
+
+ assert isinstance(res, error.SubstrateUnderrunError)
+ assert 'asn1Object' in res.context
+ assert isinstance(res.context['asn1Object'], univ.SequenceOf)
+ assert res.context['asn1Object'].isValue
+ assert len(res.context['asn1Object']) == 1
+
+ res = next(iterator)
+
+ assert isinstance(res, univ.SequenceOf)
+ assert res.isValue
+ assert len(res) == 2
+
+ try:
+ next(iterator)
+
+ except StopIteration:
+ pass
+
+ else:
+ assert False, 'End of stream not raised'
class CompressedFilesTestCase(BaseTestCase):
@@ -1699,9 +1807,10 @@ class CompressedFilesTestCase(BaseTestCase):
out.write(ints2octs((2, 1, 12, 35, 128, 3, 2, 0, 169, 3, 2, 1, 138, 0, 0)))
with gzip.open(path, "rb") as source:
- values = list(decoder.decodeStream(source))
+ values = list(decoder.StreamingDecoder(source))
assert values == [12, (1, 0, 1, 0, 1, 0, 0, 1, 1, 0, 0, 0, 1, 0, 1)]
+
finally:
os.remove(path)
@@ -1715,7 +1824,7 @@ class CompressedFilesTestCase(BaseTestCase):
with zipfile.ZipFile(path, "r") as myzip:
with myzip.open("data", "r") as source:
- values = list(decoder.decodeStream(source))
+ values = list(decoder.StreamingDecoder(source))
assert values == [12, (1, 0, 1, 0, 1, 0, 0, 1, 1, 0, 0, 0, 1, 0, 1)]
finally:
os.remove(path)
@@ -1729,63 +1838,12 @@ class CompressedFilesTestCase(BaseTestCase):
with zipfile.ZipFile(path, "r") as myzip:
with myzip.open("data", "r") as source:
- values = list(decoder.decodeStream(source))
+ values = list(decoder.StreamingDecoder(source))
assert values == [12, (1, 0, 1, 0, 1, 0, 0, 1, 1, 0, 0, 0, 1, 0, 1)] * 1000
finally:
os.remove(path)
-class CachingStreamWrapperTestCase(BaseTestCase):
- def setUp(self):
- self.shortText = b"abcdefghij"
- self.longText = self.shortText * (io.DEFAULT_BUFFER_SIZE * 5)
- self.shortStream = io.BytesIO(self.shortText)
- self.longStream = io.BytesIO(self.longText)
-
- def testReadJustFromCache(self):
- wrapper = decoder._CachingStreamWrapper(self.shortStream)
- wrapper.read(6)
- wrapper.seek(3)
- assert wrapper.read(1) == b"d"
- assert wrapper.read(1) == b"e"
- assert wrapper.tell() == 5
-
- def testReadFromCacheAndStream(self):
- wrapper = decoder._CachingStreamWrapper(self.shortStream)
- wrapper.read(6)
- wrapper.seek(3)
- assert wrapper.read(4) == b"defg"
- assert wrapper.tell() == 7
-
- def testReadJustFromStream(self):
- wrapper = decoder._CachingStreamWrapper(self.shortStream)
- assert wrapper.read(6) == b"abcdef"
- assert wrapper.tell() == 6
-
- def testPeek(self):
- wrapper = decoder._CachingStreamWrapper(self.longStream)
- read_bytes = wrapper.peek(io.DEFAULT_BUFFER_SIZE + 73)
- assert len(read_bytes) == io.DEFAULT_BUFFER_SIZE + 73
- assert read_bytes.startswith(b"abcdefg")
- assert wrapper.tell() == 0
- assert wrapper.read(4) == b"abcd"
-
- def testMarkedPositionResets(self):
- wrapper = decoder._CachingStreamWrapper(self.longStream)
- wrapper.read(10)
- wrapper._markedPosition = wrapper.tell()
- assert wrapper._markedPosition == 10
-
- # Reach the maximum capacity of cache
- wrapper.read(io.DEFAULT_BUFFER_SIZE)
- assert wrapper.tell() == 10 + io.DEFAULT_BUFFER_SIZE
-
- # The following should clear the cache
- wrapper._markedPosition = wrapper.tell()
- assert wrapper._markedPosition == 0
- assert len(wrapper._cache.getvalue()) == 0
-
-
suite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
if __name__ == '__main__':