summaryrefslogtreecommitdiff
path: root/pyasn1/codec/streaming.py
diff options
context:
space:
mode:
authorIlya Etingof <etingof@gmail.com>2019-09-14 18:46:08 +0200
committerIlya Etingof <etingof@gmail.com>2019-11-15 19:39:11 +0100
commit93e11a2dfded950827ba3393b5a4562270a766da (patch)
treecddf49a1daa1258d91f348bad30eabc662241bd8 /pyasn1/codec/streaming.py
parentfdd0bd66f66e5f7832287ff0994f4219632935a7 (diff)
downloadpyasn1-git-93e11a2dfded950827ba3393b5a4562270a766da.tar.gz
Refactor BER decoder into a suspendable coroutine
The goal of this change is to make the decoder stopping on input data starvation and resuming from where it stopped whenever the caller decides to try again (hopefully making sure that some more input becomes available). This change makes it possible for the decoder to operate on streams of data (meaning that the entire DER blob might not be immediately available on input). On top of that, the decoder yields partially reconstructed ASN.1 object on input starvation making it possible for the caller to inspect what has been decoded so far and possibly consume partial ASN.1 data. All these new feature are natively available through `StreamingDecoder` class. Previously published API is implemented as a thin wrapper on top of that ensuring backward compatibility.
Diffstat (limited to 'pyasn1/codec/streaming.py')
-rw-r--r--pyasn1/codec/streaming.py240
1 files changed, 240 insertions, 0 deletions
diff --git a/pyasn1/codec/streaming.py b/pyasn1/codec/streaming.py
new file mode 100644
index 0000000..1889677
--- /dev/null
+++ b/pyasn1/codec/streaming.py
@@ -0,0 +1,240 @@
+#
+# This file is part of pyasn1 software.
+#
+# Copyright (c) 2005-2019, Ilya Etingof <etingof@gmail.com>
+# License: http://snmplabs.com/pyasn1/license.html
+#
+import io
+import os
+import sys
+
+from pyasn1 import error
+from pyasn1.type import univ
+
+_PY2 = sys.version_info < (3,)
+
+
+class CachingStreamWrapper(io.IOBase):
+ """Wrapper around non-seekable streams.
+
+ Note that the implementation is tied to the decoder,
+ not checking for dangerous arguments for the sake
+ of performance.
+
+ The read bytes are kept in an internal cache until
+ setting _markedPosition which may reset the cache.
+ """
+ def __init__(self, raw):
+ self._raw = raw
+ self._cache = io.BytesIO()
+ self._markedPosition = 0
+
+ def peek(self, n):
+ result = self.read(n)
+ self._cache.seek(-len(result), os.SEEK_CUR)
+ return result
+
+ def seekable(self):
+ return True
+
+ def seek(self, n=-1, whence=os.SEEK_SET):
+ # Note that this not safe for seeking forward.
+ return self._cache.seek(n, whence)
+
+ def read(self, n=-1):
+ read_from_cache = self._cache.read(n)
+ if n != -1:
+ n -= len(read_from_cache)
+ if not n: # 0 bytes left to read
+ return read_from_cache
+
+ read_from_raw = self._raw.read(n)
+
+ self._cache.write(read_from_raw)
+
+ return read_from_cache + read_from_raw
+
+ @property
+ def markedPosition(self):
+ """Position where the currently processed element starts.
+
+ This is used for back-tracking in SingleItemDecoder.__call__
+ and (indefLen)ValueDecoder and should not be used for other purposes.
+ The client is not supposed to ever seek before this position.
+ """
+ return self._markedPosition
+
+ @markedPosition.setter
+ def markedPosition(self, value):
+ # By setting the value, we ensure we won't seek back before it.
+ # `value` should be the same as the current position
+ # We don't check for this for performance reasons.
+ self._markedPosition = value
+
+ # Whenever we set _marked_position, we know for sure
+ # that we will not return back, and thus it is
+ # safe to drop all cached data.
+ if self._cache.tell() > io.DEFAULT_BUFFER_SIZE:
+ self._cache = io.BytesIO(self._cache.read())
+ self._markedPosition = 0
+
+ def tell(self):
+ return self._cache.tell()
+
+
+def asSeekableStream(substrate):
+ """Convert object to seekable byte-stream.
+
+ Parameters
+ ----------
+ substrate: :py:class:`bytes` or :py:class:`io.IOBase` or :py:class:`univ.OctetString`
+
+ Returns
+ -------
+ : :py:class:`io.IOBase`
+
+ Raises
+ ------
+ : :py:class:`~pyasn1.error.PyAsn1Error`
+ If the supplied substrate cannot be converted to a seekable stream.
+ """
+ if isinstance(substrate, bytes):
+ return io.BytesIO(substrate)
+
+ elif isinstance(substrate, univ.OctetString):
+ return io.BytesIO(substrate.asOctets())
+
+ try:
+ # Special case: impossible to set attributes on `file` built-in
+ if _PY2 and isinstance(substrate, file):
+ return io.BufferedReader(substrate)
+
+ elif substrate.seekable(): # Will fail for most invalid types
+ return substrate
+
+ else:
+ return CachingStreamWrapper(substrate)
+
+ except AttributeError:
+ raise error.UnsupportedSubstrateError(
+ "Cannot convert " + substrate.__class__.__name__ +
+ " to a seekable bit stream.")
+
+
+def isEndOfStream(substrate):
+ """Check whether we have reached the end of a stream.
+
+ Although it is more effective to read and catch exceptions, this
+ function
+
+ Parameters
+ ----------
+ substrate: :py:class:`IOBase`
+ Stream to check
+
+ Returns
+ -------
+ : :py:class:`bool`
+ """
+ if isinstance(substrate, io.BytesIO):
+ cp = substrate.tell()
+ substrate.seek(0, os.SEEK_END)
+ result = substrate.tell() == cp
+ substrate.seek(cp, os.SEEK_SET)
+ yield result
+
+ else:
+ received = substrate.read(1)
+ if received is None:
+ yield
+
+ if received:
+ substrate.seek(-1, os.SEEK_CUR)
+
+ yield not received
+
+
+def peek(substrate, size=-1):
+ """Peek the stream.
+
+ Parameters
+ ----------
+ substrate: :py:class:`IOBase`
+ Stream to read from.
+
+ size: :py:class:`int`
+ How many bytes to peek (-1 = all available)
+
+ Returns
+ -------
+ : :py:class:`bytes` or :py:class:`str`
+ The return type depends on Python major version
+ """
+ if hasattr(substrate, "peek"):
+ received = substrate.peek(size)
+ if received is None:
+ yield
+
+ while len(received) < size:
+ yield
+
+ yield received
+
+ else:
+ current_position = substrate.tell()
+ try:
+ for chunk in read(substrate, size):
+ yield chunk
+
+ finally:
+ substrate.seek(current_position)
+
+
+def read(substrate, size=-1, context=None):
+ """Read from the stream.
+
+ Parameters
+ ----------
+ substrate: :py:class:`IOBase`
+ Stream to read from.
+
+ Keyword parameters
+ ------------------
+ size: :py:class:`int`
+ How many bytes to read (-1 = all available)
+
+ context: :py:class:`dict`
+ Opaque caller context will be attached to exception objects created
+ by this function.
+
+ Yields
+ ------
+ : :py:class:`bytes` or :py:class:`str` or None
+ Returns read data or :py:class:`~pyasn1.error.SubstrateUnderrunError`
+ object if no `size` bytes is readily available in the stream. The
+ data type depends on Python major version
+
+ Raises
+ ------
+ : :py:class:`~pyasn1.error.EndOfStreamError`
+ Input stream is exhausted
+ """
+ while True:
+ # this will block unless stream is non-blocking
+ received = substrate.read(size)
+ if received is None: # non-blocking stream can do this
+ yield error.SubstrateUnderrunError(context=context)
+
+ elif size != 0 and not received: # end-of-stream
+ raise error.EndOfStreamError(context=context)
+
+ elif len(received) < size:
+ substrate.seek(-len(received), os.SEEK_CUR)
+
+ # behave like a non-blocking stream
+ yield error.SubstrateUnderrunError(context=context)
+
+ else:
+ break
+
+ yield received