diff options
author | Ilya Etingof <etingof@gmail.com> | 2019-09-14 18:46:08 +0200 |
---|---|---|
committer | Ilya Etingof <etingof@gmail.com> | 2019-09-28 23:05:25 +0200 |
commit | 4f644c59bf3ec34a3a8b9cd045dfd7cd1735259f (patch) | |
tree | 873f47b66d57f88f57d8b2c24f71df366114e560 /pyasn1/codec/cer/decoder.py | |
parent | 4d7d55330522f43472e8637c5f9a01778dea0f3a (diff) | |
download | pyasn1-git-4f644c59bf3ec34a3a8b9cd045dfd7cd1735259f.tar.gz |
Refactor BER decoder into a suspendable coroutine
The goal of this change is to make the decoder stopping on input
data starvation and resuming from where it stopped whenever the
caller decides to try again (hopefully making sure that some more
input becomes available).
This change makes it possible for the decoder to operate on streams
of data (meaning that the entire DER blob might not be immediately
available on input).
On top of that, the decoder yields partially reconstructed ASN.1
object on input starvation making it possible for the caller to
inspect what has been decoded so far and possibly consume partial
ASN.1 data.
All these new feature are natively available through
`StreamingDecoder` class. Previously published API is implemented
as a thin wrapper on top of that ensuring backward compatibility.
Diffstat (limited to 'pyasn1/codec/cer/decoder.py')
-rw-r--r-- | pyasn1/codec/cer/decoder.py | 89 |
1 files changed, 47 insertions, 42 deletions
diff --git a/pyasn1/codec/cer/decoder.py b/pyasn1/codec/cer/decoder.py index b709313..08f9ec8 100644 --- a/pyasn1/codec/cer/decoder.py +++ b/pyasn1/codec/cer/decoder.py @@ -4,79 +4,89 @@ # Copyright (c) 2005-2019, Ilya Etingof <etingof@gmail.com> # License: http://snmplabs.com/pyasn1/license.html # -from io import BytesIO - from pyasn1 import error +from pyasn1.codec import streaming from pyasn1.codec.ber import decoder -from pyasn1.codec.ber.decoder import _asSeekableStream from pyasn1.compat.octets import oct2int from pyasn1.type import univ -__all__ = ['decode', 'decodeStream'] +__all__ = ['decode', 'StreamingDecoder'] + +SubstrateUnderrunError = error.SubstrateUnderrunError -class BooleanDecoder(decoder.AbstractSimpleDecoder): +class BooleanPayloadDecoder(decoder.AbstractSimplePayloadDecoder): protoComponent = univ.Boolean(0) def valueDecoder(self, substrate, asn1Spec, tagSet=None, length=None, state=None, decodeFun=None, substrateFun=None, **options): - head = substrate.read(1) - if not head or length != 1: + + if length != 1: raise error.PyAsn1Error('Not single-octet Boolean payload') - byte = oct2int(head[0]) + + for chunk in streaming.read(substrate, length, options): + if isinstance(chunk, SubstrateUnderrunError): + yield chunk + + byte = oct2int(chunk[0]) + # CER/DER specifies encoding of TRUE as 0xFF and FALSE as 0x0, while # BER allows any non-zero value as TRUE; cf. sections 8.2.2. and 11.1 # in https://www.itu.int/ITU-T/studygroups/com17/languages/X.690-0207.pdf if byte == 0xff: value = 1 + elif byte == 0x00: value = 0 + else: raise error.PyAsn1Error('Unexpected Boolean payload: %s' % byte) - return self._createComponent(asn1Spec, tagSet, value, **options) + + yield self._createComponent(asn1Spec, tagSet, value, **options) + # TODO: prohibit non-canonical encoding -BitStringDecoder = decoder.BitStringDecoder -OctetStringDecoder = decoder.OctetStringDecoder -RealDecoder = decoder.RealDecoder - -tagMap = decoder.tagMap.copy() -tagMap.update( - {univ.Boolean.tagSet: BooleanDecoder(), - univ.BitString.tagSet: BitStringDecoder(), - univ.OctetString.tagSet: OctetStringDecoder(), - univ.Real.tagSet: RealDecoder()} +BitStringPayloadDecoder = decoder.BitStringPayloadDecoder +OctetStringPayloadDecoder = decoder.OctetStringPayloadDecoder +RealPayloadDecoder = decoder.RealPayloadDecoder + +TAG_MAP = decoder.TAG_MAP.copy() +TAG_MAP.update( + {univ.Boolean.tagSet: BooleanPayloadDecoder(), + univ.BitString.tagSet: BitStringPayloadDecoder(), + univ.OctetString.tagSet: OctetStringPayloadDecoder(), + univ.Real.tagSet: RealPayloadDecoder()} ) -typeMap = decoder.typeMap.copy() +TYPE_MAP = decoder.TYPE_MAP.copy() # Put in non-ambiguous types for faster codec lookup -for typeDecoder in tagMap.values(): +for typeDecoder in TAG_MAP.values(): if typeDecoder.protoComponent is not None: typeId = typeDecoder.protoComponent.__class__.typeId - if typeId is not None and typeId not in typeMap: - typeMap[typeId] = typeDecoder + if typeId is not None and typeId not in TYPE_MAP: + TYPE_MAP[typeId] = typeDecoder -class Decoder(decoder.Decoder): - pass +class SingleItemDecoder(decoder.SingleItemDecoder): + __doc__ = decoder.SingleItemDecoder.__doc__ + + TAG_MAP = TAG_MAP + TYPE_MAP = TYPE_MAP -_decode = Decoder(tagMap, typeMap) +class StreamingDecoder(decoder.StreamingDecoder): + __doc__ = decoder.StreamingDecoder.__doc__ + SINGLE_ITEM_DECODER = SingleItemDecoder + + +class Decoder(decoder.Decoder): + __doc__ = decoder.Decoder.__doc__ -def decodeStream(substrate, asn1Spec=None, **kwargs): - """Iterator of objects in a substrate.""" - # TODO: This should become `decode` after API-breaking approved - substrate = _asSeekableStream(substrate) - while True: - result = _decode(substrate, asn1Spec, **kwargs) - if result is None: - break - yield result - # TODO: Check about eoo.endOfOctets? + STREAMING_DECODER = StreamingDecoder #: Turns CER octet stream into an ASN.1 object. @@ -129,9 +139,4 @@ def decodeStream(substrate, asn1Spec=None, **kwargs): #: SequenceOf: #: 1 2 3 #: -def decode(substrate, asn1Spec=None, **kwargs): - # TODO: Temporary solution before merging with upstream - # It preserves the original API - substrate = _asSeekableStream(substrate) - value = _decode(substrate, asn1Spec=asn1Spec, **kwargs) - return value, substrate.read() +decode = Decoder() |