summaryrefslogtreecommitdiff
path: root/pyasn1/codec/cer/decoder.py
diff options
context:
space:
mode:
Diffstat (limited to 'pyasn1/codec/cer/decoder.py')
-rw-r--r--pyasn1/codec/cer/decoder.py72
1 files changed, 50 insertions, 22 deletions
diff --git a/pyasn1/codec/cer/decoder.py b/pyasn1/codec/cer/decoder.py
index 3e86fd0..0a92b26 100644
--- a/pyasn1/codec/cer/decoder.py
+++ b/pyasn1/codec/cer/decoder.py
@@ -5,60 +5,88 @@
# License: http://snmplabs.com/pyasn1/license.html
#
from pyasn1 import error
+from pyasn1.codec.streaming import readFromStream
from pyasn1.codec.ber import decoder
from pyasn1.compat.octets import oct2int
from pyasn1.type import univ
-__all__ = ['decode']
+__all__ = ['decode', 'StreamingDecoder']
+SubstrateUnderrunError = error.SubstrateUnderrunError
-class BooleanDecoder(decoder.AbstractSimpleDecoder):
+
+class BooleanPayloadDecoder(decoder.AbstractSimplePayloadDecoder):
protoComponent = univ.Boolean(0)
def valueDecoder(self, substrate, asn1Spec,
tagSet=None, length=None, state=None,
decodeFun=None, substrateFun=None,
**options):
- head, tail = substrate[:length], substrate[length:]
- if not head or length != 1:
+
+ if length != 1:
raise error.PyAsn1Error('Not single-octet Boolean payload')
- byte = oct2int(head[0])
+
+ for chunk in readFromStream(substrate, length, options):
+ if isinstance(chunk, SubstrateUnderrunError):
+ yield chunk
+
+ byte = oct2int(chunk[0])
+
# CER/DER specifies encoding of TRUE as 0xFF and FALSE as 0x0, while
# BER allows any non-zero value as TRUE; cf. sections 8.2.2. and 11.1
# in https://www.itu.int/ITU-T/studygroups/com17/languages/X.690-0207.pdf
if byte == 0xff:
value = 1
+
elif byte == 0x00:
value = 0
+
else:
raise error.PyAsn1Error('Unexpected Boolean payload: %s' % byte)
- return self._createComponent(asn1Spec, tagSet, value, **options), tail
+
+ yield self._createComponent(asn1Spec, tagSet, value, **options)
+
# TODO: prohibit non-canonical encoding
-BitStringDecoder = decoder.BitStringDecoder
-OctetStringDecoder = decoder.OctetStringDecoder
-RealDecoder = decoder.RealDecoder
-
-tagMap = decoder.tagMap.copy()
-tagMap.update(
- {univ.Boolean.tagSet: BooleanDecoder(),
- univ.BitString.tagSet: BitStringDecoder(),
- univ.OctetString.tagSet: OctetStringDecoder(),
- univ.Real.tagSet: RealDecoder()}
+BitStringPayloadDecoder = decoder.BitStringPayloadDecoder
+OctetStringPayloadDecoder = decoder.OctetStringPayloadDecoder
+RealPayloadDecoder = decoder.RealPayloadDecoder
+
+TAG_MAP = decoder.TAG_MAP.copy()
+TAG_MAP.update(
+ {univ.Boolean.tagSet: BooleanPayloadDecoder(),
+ univ.BitString.tagSet: BitStringPayloadDecoder(),
+ univ.OctetString.tagSet: OctetStringPayloadDecoder(),
+ univ.Real.tagSet: RealPayloadDecoder()}
)
-typeMap = decoder.typeMap.copy()
+TYPE_MAP = decoder.TYPE_MAP.copy()
# Put in non-ambiguous types for faster codec lookup
-for typeDecoder in tagMap.values():
+for typeDecoder in TAG_MAP.values():
if typeDecoder.protoComponent is not None:
typeId = typeDecoder.protoComponent.__class__.typeId
- if typeId is not None and typeId not in typeMap:
- typeMap[typeId] = typeDecoder
+ if typeId is not None and typeId not in TYPE_MAP:
+ TYPE_MAP[typeId] = typeDecoder
+
+
+class SingleItemDecoder(decoder.SingleItemDecoder):
+ __doc__ = decoder.SingleItemDecoder.__doc__
+
+ TAG_MAP = TAG_MAP
+ TYPE_MAP = TYPE_MAP
+
+
+class StreamingDecoder(decoder.StreamingDecoder):
+ __doc__ = decoder.StreamingDecoder.__doc__
+
+ SINGLE_ITEM_DECODER = SingleItemDecoder
class Decoder(decoder.Decoder):
- pass
+ __doc__ = decoder.Decoder.__doc__
+
+ STREAMING_DECODER = StreamingDecoder
#: Turns CER octet stream into an ASN.1 object.
@@ -111,4 +139,4 @@ class Decoder(decoder.Decoder):
#: SequenceOf:
#: 1 2 3
#:
-decode = Decoder(tagMap, decoder.typeMap)
+decode = Decoder()