summaryrefslogtreecommitdiff
path: root/pyasn1/codec/cer/decoder.py
diff options
context:
space:
mode:
Diffstat (limited to 'pyasn1/codec/cer/decoder.py')
-rw-r--r--pyasn1/codec/cer/decoder.py89
1 files changed, 47 insertions, 42 deletions
diff --git a/pyasn1/codec/cer/decoder.py b/pyasn1/codec/cer/decoder.py
index b709313..0a92b26 100644
--- a/pyasn1/codec/cer/decoder.py
+++ b/pyasn1/codec/cer/decoder.py
@@ -4,79 +4,89 @@
# Copyright (c) 2005-2019, Ilya Etingof <etingof@gmail.com>
# License: http://snmplabs.com/pyasn1/license.html
#
-from io import BytesIO
-
from pyasn1 import error
+from pyasn1.codec.streaming import readFromStream
from pyasn1.codec.ber import decoder
-from pyasn1.codec.ber.decoder import _asSeekableStream
from pyasn1.compat.octets import oct2int
from pyasn1.type import univ
-__all__ = ['decode', 'decodeStream']
+__all__ = ['decode', 'StreamingDecoder']
+
+SubstrateUnderrunError = error.SubstrateUnderrunError
-class BooleanDecoder(decoder.AbstractSimpleDecoder):
+class BooleanPayloadDecoder(decoder.AbstractSimplePayloadDecoder):
protoComponent = univ.Boolean(0)
def valueDecoder(self, substrate, asn1Spec,
tagSet=None, length=None, state=None,
decodeFun=None, substrateFun=None,
**options):
- head = substrate.read(1)
- if not head or length != 1:
+
+ if length != 1:
raise error.PyAsn1Error('Not single-octet Boolean payload')
- byte = oct2int(head[0])
+
+ for chunk in readFromStream(substrate, length, options):
+ if isinstance(chunk, SubstrateUnderrunError):
+ yield chunk
+
+ byte = oct2int(chunk[0])
+
# CER/DER specifies encoding of TRUE as 0xFF and FALSE as 0x0, while
# BER allows any non-zero value as TRUE; cf. sections 8.2.2. and 11.1
# in https://www.itu.int/ITU-T/studygroups/com17/languages/X.690-0207.pdf
if byte == 0xff:
value = 1
+
elif byte == 0x00:
value = 0
+
else:
raise error.PyAsn1Error('Unexpected Boolean payload: %s' % byte)
- return self._createComponent(asn1Spec, tagSet, value, **options)
+
+ yield self._createComponent(asn1Spec, tagSet, value, **options)
+
# TODO: prohibit non-canonical encoding
-BitStringDecoder = decoder.BitStringDecoder
-OctetStringDecoder = decoder.OctetStringDecoder
-RealDecoder = decoder.RealDecoder
-
-tagMap = decoder.tagMap.copy()
-tagMap.update(
- {univ.Boolean.tagSet: BooleanDecoder(),
- univ.BitString.tagSet: BitStringDecoder(),
- univ.OctetString.tagSet: OctetStringDecoder(),
- univ.Real.tagSet: RealDecoder()}
+BitStringPayloadDecoder = decoder.BitStringPayloadDecoder
+OctetStringPayloadDecoder = decoder.OctetStringPayloadDecoder
+RealPayloadDecoder = decoder.RealPayloadDecoder
+
+TAG_MAP = decoder.TAG_MAP.copy()
+TAG_MAP.update(
+ {univ.Boolean.tagSet: BooleanPayloadDecoder(),
+ univ.BitString.tagSet: BitStringPayloadDecoder(),
+ univ.OctetString.tagSet: OctetStringPayloadDecoder(),
+ univ.Real.tagSet: RealPayloadDecoder()}
)
-typeMap = decoder.typeMap.copy()
+TYPE_MAP = decoder.TYPE_MAP.copy()
# Put in non-ambiguous types for faster codec lookup
-for typeDecoder in tagMap.values():
+for typeDecoder in TAG_MAP.values():
if typeDecoder.protoComponent is not None:
typeId = typeDecoder.protoComponent.__class__.typeId
- if typeId is not None and typeId not in typeMap:
- typeMap[typeId] = typeDecoder
+ if typeId is not None and typeId not in TYPE_MAP:
+ TYPE_MAP[typeId] = typeDecoder
-class Decoder(decoder.Decoder):
- pass
+class SingleItemDecoder(decoder.SingleItemDecoder):
+ __doc__ = decoder.SingleItemDecoder.__doc__
+
+ TAG_MAP = TAG_MAP
+ TYPE_MAP = TYPE_MAP
-_decode = Decoder(tagMap, typeMap)
+class StreamingDecoder(decoder.StreamingDecoder):
+ __doc__ = decoder.StreamingDecoder.__doc__
+ SINGLE_ITEM_DECODER = SingleItemDecoder
+
+
+class Decoder(decoder.Decoder):
+ __doc__ = decoder.Decoder.__doc__
-def decodeStream(substrate, asn1Spec=None, **kwargs):
- """Iterator of objects in a substrate."""
- # TODO: This should become `decode` after API-breaking approved
- substrate = _asSeekableStream(substrate)
- while True:
- result = _decode(substrate, asn1Spec, **kwargs)
- if result is None:
- break
- yield result
- # TODO: Check about eoo.endOfOctets?
+ STREAMING_DECODER = StreamingDecoder
#: Turns CER octet stream into an ASN.1 object.
@@ -129,9 +139,4 @@ def decodeStream(substrate, asn1Spec=None, **kwargs):
#: SequenceOf:
#: 1 2 3
#:
-def decode(substrate, asn1Spec=None, **kwargs):
- # TODO: Temporary solution before merging with upstream
- # It preserves the original API
- substrate = _asSeekableStream(substrate)
- value = _decode(substrate, asn1Spec=asn1Spec, **kwargs)
- return value, substrate.read()
+decode = Decoder()