summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIlya Etingof <etingof@gmail.com>2019-11-16 18:02:12 +0100
committerIlya Etingof <etingof@gmail.com>2019-11-16 18:02:12 +0100
commit317452bd76d711c35a1bbdda54879606dd693268 (patch)
tree2d06cd815b392ca7f075d4c51b76451677a35742
parent8393983359edc25b75cbe07f0d4c13497285aa71 (diff)
downloadpyasn1-git-317452bd76d711c35a1bbdda54879606dd693268.tar.gz
Pass `tagMap` and `typeMap` to decoder instance
This change should simplify decoder specialization by means of parameterization in addition to subclassing.
-rw-r--r--CHANGES.rst3
-rw-r--r--pyasn1/codec/ber/decoder.py25
-rw-r--r--pyasn1/codec/ber/encoder.py12
-rw-r--r--pyasn1/codec/native/decoder.py12
-rw-r--r--pyasn1/codec/native/encoder.py14
-rw-r--r--tests/codec/ber/test_decoder.py3
6 files changed, 37 insertions, 32 deletions
diff --git a/CHANGES.rst b/CHANGES.rst
index 9297c9b..a4f91b8 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -22,6 +22,9 @@ Revision 0.5.0, released XX-09-2019
`StreamingDecoder` class. Previously published API is implemented
as a thin wrapper on top of that ensuring backward compatibility.
+Revision 0.4.8, released XX-09-2019
+-----------------------------------
+
- Added ability of combining `SingleValueConstraint` and
`PermittedAlphabetConstraint` objects into one for proper modeling
`FROM ... EXCEPT ...` ASN.1 clause.
diff --git a/pyasn1/codec/ber/decoder.py b/pyasn1/codec/ber/decoder.py
index 6dc8866..0755adc 100644
--- a/pyasn1/codec/ber/decoder.py
+++ b/pyasn1/codec/ber/decoder.py
@@ -1483,12 +1483,13 @@ class SingleItemDecoder(object):
TAG_MAP = TAG_MAP
TYPE_MAP = TYPE_MAP
- def __init__(self, tagMap=None, typeMap=None):
- self.__tagMap = tagMap or self.TAG_MAP
- self.__typeMap = typeMap or self.TYPE_MAP
+ def __init__(self, **options):
+ self._tagMap = options.get('tagMap', self.TAG_MAP)
+ self._typeMap = options.get('typeMap', self.TYPE_MAP)
+
# Tag & TagSet objects caches
- self.__tagCache = {}
- self.__tagSetCache = {}
+ self._tagCache = {}
+ self._tagSetCache = {}
def __call__(self, substrate, asn1Spec=None,
tagSet=None, length=None, state=stDecodeTag,
@@ -1518,10 +1519,10 @@ class SingleItemDecoder(object):
else:
substrate.seek(-2, os.SEEK_CUR)
- tagMap = self.__tagMap
- typeMap = self.__typeMap
- tagCache = self.__tagCache
- tagSetCache = self.__tagSetCache
+ tagMap = self._tagMap
+ typeMap = self._typeMap
+ tagCache = self._tagCache
+ tagSetCache = self._tagSetCache
value = noValue
@@ -1902,7 +1903,7 @@ class StreamingDecoder(object):
SINGLE_ITEM_DECODER = SingleItemDecoder
def __init__(self, substrate, asn1Spec=None, **options):
- self._singleItemDecoder = self.SINGLE_ITEM_DECODER()
+ self._singleItemDecoder = self.SINGLE_ITEM_DECODER(**options)
self._substrate = asSeekableStream(substrate)
self._asn1Spec = asn1Spec
self._options = options
@@ -1931,7 +1932,7 @@ class Decoder(object):
STREAMING_DECODER = StreamingDecoder
@classmethod
- def __call__(cls, substrate, asn1Spec=None, **kwargs):
+ def __call__(cls, substrate, asn1Spec=None, **options):
"""Turns BER/CER/DER octet stream into an ASN.1 object.
Takes BER/CER/DER octet-stream in form of :py:class:`bytes` (Python 3)
@@ -1992,7 +1993,7 @@ class Decoder(object):
substrate = asSeekableStream(substrate)
streamingDecoder = cls.STREAMING_DECODER(
- substrate, asn1Spec, **kwargs)
+ substrate, asn1Spec, **options)
for asn1Object in streamingDecoder:
if isinstance(asn1Object, SubstrateUnderrunError):
diff --git a/pyasn1/codec/ber/encoder.py b/pyasn1/codec/ber/encoder.py
index 7ee9b47..826ea73 100644
--- a/pyasn1/codec/ber/encoder.py
+++ b/pyasn1/codec/ber/encoder.py
@@ -781,9 +781,9 @@ class SingleItemEncoder(object):
TAG_MAP = TAG_MAP
TYPE_MAP = TYPE_MAP
- def __init__(self, tagMap=None, typeMap=None):
- self.__tagMap = tagMap or self.TAG_MAP
- self.__typeMap = typeMap or self.TYPE_MAP
+ def __init__(self, **options):
+ self._tagMap = options.get('tagMap', self.TAG_MAP)
+ self._typeMap = options.get('typeMap', self.TYPE_MAP)
def __call__(self, value, asn1Spec=None, **options):
try:
@@ -810,7 +810,7 @@ class SingleItemEncoder(object):
options.update(maxChunkSize=self.fixedChunkSize)
try:
- concreteEncoder = self.__typeMap[typeId]
+ concreteEncoder = self._typeMap[typeId]
if LOG:
LOG('using value codec %s chosen by type ID '
@@ -826,7 +826,7 @@ class SingleItemEncoder(object):
baseTagSet = tag.TagSet(tagSet.baseTag, tagSet.baseTag)
try:
- concreteEncoder = self.__tagMap[baseTagSet]
+ concreteEncoder = self._tagMap[baseTagSet]
except KeyError:
raise error.PyAsn1Error('No encoder for %r (%s)' % (value, tagSet))
@@ -849,7 +849,7 @@ class Encoder(object):
SINGLE_ITEM_ENCODER = SingleItemEncoder
def __init__(self, **options):
- self._singleItemEncoder = self.SINGLE_ITEM_ENCODER()
+ self._singleItemEncoder = self.SINGLE_ITEM_ENCODER(**options)
def __call__(self, pyObject, asn1Spec=None, **options):
return self._singleItemEncoder(
diff --git a/pyasn1/codec/native/decoder.py b/pyasn1/codec/native/decoder.py
index db30c71..1838b7d 100644
--- a/pyasn1/codec/native/decoder.py
+++ b/pyasn1/codec/native/decoder.py
@@ -135,9 +135,9 @@ class SingleItemDecoder(object):
TAG_MAP = TAG_MAP
TYPE_MAP = TYPE_MAP
- def __init__(self, tagMap=None, typeMap=None):
- self.__tagMap = tagMap or self.TAG_MAP
- self.__typeMap = typeMap or self.TYPE_MAP
+ def __init__(self, **options):
+ self._tagMap = options.get('tagMap', self.TAG_MAP)
+ self._typeMap = options.get('typeMap', self.TYPE_MAP)
def __call__(self, pyObject, asn1Spec, **options):
@@ -152,14 +152,14 @@ class SingleItemDecoder(object):
'Item, not %s)' % asn1Spec.__class__.__name__)
try:
- valueDecoder = self.__typeMap[asn1Spec.typeId]
+ valueDecoder = self._typeMap[asn1Spec.typeId]
except KeyError:
# use base type for codec lookup to recover untagged types
baseTagSet = tag.TagSet(asn1Spec.tagSet.baseTag, asn1Spec.tagSet.baseTag)
try:
- valueDecoder = self.__tagMap[baseTagSet]
+ valueDecoder = self._tagMap[baseTagSet]
except KeyError:
raise error.PyAsn1Error('Unknown ASN.1 tag %s' % asn1Spec.tagSet)
@@ -184,7 +184,7 @@ class Decoder(object):
SINGLE_ITEM_DECODER = SingleItemDecoder
def __init__(self, **options):
- self._singleItemDecoder = self.SINGLE_ITEM_DECODER()
+ self._singleItemDecoder = self.SINGLE_ITEM_DECODER(**options)
def __call__(self, pyObject, asn1Spec=None, **kwargs):
return self._singleItemDecoder(pyObject, asn1Spec=asn1Spec, **kwargs)
diff --git a/pyasn1/codec/native/encoder.py b/pyasn1/codec/native/encoder.py
index d0d65ec..7c5edc9 100644
--- a/pyasn1/codec/native/encoder.py
+++ b/pyasn1/codec/native/encoder.py
@@ -180,9 +180,9 @@ class SingleItemEncoder(object):
TAG_MAP = TAG_MAP
TYPE_MAP = TYPE_MAP
- def __init__(self, tagMap=None, typeMap=None):
- self.__tagMap = tagMap or self.TAG_MAP
- self.__typeMap = typeMap or self.TYPE_MAP
+ def __init__(self, **options):
+ self._tagMap = options.get('tagMap', self.TAG_MAP)
+ self._typeMap = options.get('typeMap', self.TYPE_MAP)
def __call__(self, value, **options):
if not isinstance(value, base.Asn1Item):
@@ -197,7 +197,7 @@ class SingleItemEncoder(object):
tagSet = value.tagSet
try:
- concreteEncoder = self.__typeMap[value.typeId]
+ concreteEncoder = self._typeMap[value.typeId]
except KeyError:
# use base type for codec lookup to recover untagged types
@@ -205,7 +205,7 @@ class SingleItemEncoder(object):
value.tagSet.baseTag, value.tagSet.baseTag)
try:
- concreteEncoder = self.__tagMap[baseTagSet]
+ concreteEncoder = self._tagMap[baseTagSet]
except KeyError:
raise error.PyAsn1Error('No encoder for %s' % (value,))
@@ -227,8 +227,8 @@ class SingleItemEncoder(object):
class Encoder(object):
SINGLE_ITEM_ENCODER = SingleItemEncoder
- def __init__(self, **kwargs):
- self._singleItemEncoder = self.SINGLE_ITEM_ENCODER()
+ def __init__(self, **options):
+ self._singleItemEncoder = self.SINGLE_ITEM_ENCODER(**options)
def __call__(self, pyObject, asn1Spec=None, **options):
return self._singleItemEncoder(
diff --git a/tests/codec/ber/test_decoder.py b/tests/codec/ber/test_decoder.py
index a559209..17483db 100644
--- a/tests/codec/ber/test_decoder.py
+++ b/tests/codec/ber/test_decoder.py
@@ -1592,7 +1592,8 @@ class NonStringDecoderTestCase(BaseTestCase):
class ErrorOnDecodingTestCase(BaseTestCase):
def testErrorCondition(self):
- decode = decoder.SingleItemDecoder(decoder.TAG_MAP, decoder.TYPE_MAP)
+ decode = decoder.SingleItemDecoder(
+ tagMap=decoder.TAG_MAP, typeMap=decoder.TYPE_MAP)
substrate = ints2octs((00, 1, 2))
stream = streaming.asSeekableStream(substrate)