summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIlya Etingof <etingof@gmail.com>2017-09-08 21:02:40 +0200
committerIlya Etingof <etingof@gmail.com>2017-09-09 12:17:38 +0200
commit3e8db6efc18ed08d9a98d10c30749186baa03aee (patch)
tree872cba7050faf73ec5ce0a0999a358d352854530
parentf53f48553c5e683a0c5e9e8009ef754410f2b9e0 (diff)
downloadpyasn1-git-3e8db6efc18ed08d9a98d10c30749186baa03aee.tar.gz
codecs signatures unified and pass **options through
WARNING: signatures of some undocumented codecs methods changed
-rw-r--r--CHANGES.rst5
-rw-r--r--pyasn1/__init__.py2
-rw-r--r--pyasn1/codec/ber/decoder.py210
-rw-r--r--pyasn1/codec/ber/encoder.py74
-rw-r--r--pyasn1/codec/cer/decoder.py6
-rw-r--r--pyasn1/codec/cer/encoder.py47
-rw-r--r--pyasn1/codec/der/encoder.py17
-rw-r--r--pyasn1/codec/native/decoder.py22
-rw-r--r--pyasn1/codec/native/encoder.py40
9 files changed, 265 insertions, 158 deletions
diff --git a/CHANGES.rst b/CHANGES.rst
index dfdf3ce..8702ee1 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -1,4 +1,9 @@
+Revision 0.3.5, released XX-09-2017
+-----------------------------------
+
+- Codecs signatures unified and pass **options through the call chain
+
Revision 0.3.4, released 07-09-2017
-----------------------------------
diff --git a/pyasn1/__init__.py b/pyasn1/__init__.py
index 8b713b4..26495d5 100644
--- a/pyasn1/__init__.py
+++ b/pyasn1/__init__.py
@@ -1,7 +1,7 @@
import sys
# http://www.python.org/dev/peps/pep-0396/
-__version__ = '0.3.4'
+__version__ = '0.3.5'
if sys.version_info[:2] < (2, 4):
raise RuntimeError('PyASN1 requires Python 2.4 or later')
diff --git a/pyasn1/codec/ber/decoder.py b/pyasn1/codec/ber/decoder.py
index 4fff81b..5424195 100644
--- a/pyasn1/codec/ber/decoder.py
+++ b/pyasn1/codec/ber/decoder.py
@@ -18,12 +18,16 @@ noValue = base.noValue
class AbstractDecoder(object):
protoComponent = None
- def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
- length, state, decodeFun, substrateFun):
+ def valueDecoder(self, substrate, asn1Spec,
+ tagSet=None, length=None, state=None,
+ decodeFun=None, substrateFun=None,
+ **options):
raise error.PyAsn1Error('Decoder not implemented for %s' % (tagSet,))
- def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
- length, state, decodeFun, substrateFun):
+ def indefLenValueDecoder(self, substrate, asn1Spec,
+ tagSet=None, length=None, state=None,
+ decodeFun=None, substrateFun=None,
+ **options):
raise error.PyAsn1Error('Indefinite length mode decoder not implemented for %s' % (tagSet,))
@@ -44,27 +48,37 @@ class AbstractSimpleDecoder(AbstractDecoder):
class ExplicitTagDecoder(AbstractSimpleDecoder):
protoComponent = univ.Any('')
- def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
- length, state, decodeFun, substrateFun):
+ def valueDecoder(self, substrate, asn1Spec,
+ tagSet=None, length=None, state=None,
+ decodeFun=None, substrateFun=None,
+ **options):
if substrateFun:
return substrateFun(
self._createComponent(asn1Spec, tagSet, ''),
substrate, length
)
+
head, tail = substrate[:length], substrate[length:]
- value, _ = decodeFun(head, asn1Spec, tagSet, length)
+
+ value, _ = decodeFun(head, asn1Spec, tagSet, length, **options)
+
return value, tail
- def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
- length, state, decodeFun, substrateFun):
+ def indefLenValueDecoder(self, substrate, asn1Spec,
+ tagSet=None, length=None, state=None,
+ decodeFun=None, substrateFun=None,
+ **options):
if substrateFun:
return substrateFun(
self._createComponent(asn1Spec, tagSet, ''),
substrate, length
)
- value, substrate = decodeFun(substrate, asn1Spec, tagSet, length)
- terminator, substrate = decodeFun(substrate, allowEoo=True)
- if terminator is eoo.endOfOctets:
+
+ value, substrate = decodeFun(substrate, asn1Spec, tagSet, length, **options)
+
+ eooMarker, substrate = decodeFun(substrate, allowEoo=True, **options)
+
+ if eooMarker is eoo.endOfOctets:
return value, substrate
else:
raise error.PyAsn1Error('Missing end-of-octets terminator')
@@ -76,8 +90,10 @@ explicitTagDecoder = ExplicitTagDecoder()
class IntegerDecoder(AbstractSimpleDecoder):
protoComponent = univ.Integer(0)
- def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, length,
- state, decodeFun, substrateFun):
+ def valueDecoder(self, substrate, asn1Spec,
+ tagSet=None, length=None, state=None,
+ decodeFun=None, substrateFun=None,
+ **options):
if tagSet[0].tagFormat != tag.tagFormatSimple:
raise error.PyAsn1Error('Simple tag format expected')
@@ -103,8 +119,10 @@ class BitStringDecoder(AbstractSimpleDecoder):
protoComponent = univ.BitString(())
supportConstructedForm = True
- def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, length,
- state, decodeFun, substrateFun):
+ def valueDecoder(self, substrate, asn1Spec,
+ tagSet=None, length=None, state=None,
+ decodeFun=None, substrateFun=None,
+ **options):
head, tail = substrate[:length], substrate[length:]
if tagSet[0].tagFormat == tag.tagFormatSimple: # XXX what tag to check?
if not head:
@@ -127,20 +145,23 @@ class BitStringDecoder(AbstractSimpleDecoder):
return substrateFun(bitString, substrate, length)
while head:
- component, head = decodeFun(head, self.protoComponent)
+ component, head = decodeFun(head, self.protoComponent, **options)
bitString += component
return bitString, tail
- def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
- length, state, decodeFun, substrateFun):
+ def indefLenValueDecoder(self, substrate, asn1Spec,
+ tagSet=None, length=None, state=None,
+ decodeFun=None, substrateFun=None,
+ **options):
bitString = self._createComponent(asn1Spec, tagSet)
if substrateFun:
return substrateFun(bitString, substrate, length)
while substrate:
- component, substrate = decodeFun(substrate, self.protoComponent, allowEoo=True)
+ component, substrate = decodeFun(substrate, self.protoComponent,
+ allowEoo=True, **options)
if component is eoo.endOfOctets:
break
@@ -156,8 +177,10 @@ class OctetStringDecoder(AbstractSimpleDecoder):
protoComponent = univ.OctetString('')
supportConstructedForm = True
- def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, length,
- state, decodeFun, substrateFun):
+ def valueDecoder(self, substrate, asn1Spec,
+ tagSet=None, length=None, state=None,
+ decodeFun=None, substrateFun=None,
+ **options):
head, tail = substrate[:length], substrate[length:]
if substrateFun:
@@ -177,13 +200,16 @@ class OctetStringDecoder(AbstractSimpleDecoder):
while head:
component, head = decodeFun(head, self.protoComponent,
- substrateFun=substrateFun)
+ substrateFun=substrateFun,
+ **options)
header += component
return self._createComponent(asn1Spec, tagSet, header), tail
- def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
- length, state, decodeFun, substrateFun):
+ def indefLenValueDecoder(self, substrate, asn1Spec,
+ tagSet=None, length=None, state=None,
+ decodeFun=None, substrateFun=None,
+ **options):
if substrateFun and substrateFun is not self.substrateCollector:
asn1Object = self._createComponent(asn1Spec, tagSet)
return substrateFun(asn1Object, substrate, length)
@@ -197,7 +223,7 @@ class OctetStringDecoder(AbstractSimpleDecoder):
component, substrate = decodeFun(substrate,
self.protoComponent,
substrateFun=substrateFun,
- allowEoo=True)
+ allowEoo=True, **options)
if component is eoo.endOfOctets:
break
header += component
@@ -205,14 +231,17 @@ class OctetStringDecoder(AbstractSimpleDecoder):
raise error.SubstrateUnderrunError(
'No EOO seen before substrate ends'
)
+
return self._createComponent(asn1Spec, tagSet, header), substrate
class NullDecoder(AbstractSimpleDecoder):
protoComponent = univ.Null('')
- def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
- length, state, decodeFun, substrateFun):
+ def valueDecoder(self, substrate, asn1Spec,
+ tagSet=None, length=None, state=None,
+ decodeFun=None, substrateFun=None,
+ **options):
if tagSet[0].tagFormat != tag.tagFormatSimple:
raise error.PyAsn1Error('Simple tag format expected')
@@ -230,9 +259,10 @@ class NullDecoder(AbstractSimpleDecoder):
class ObjectIdentifierDecoder(AbstractSimpleDecoder):
protoComponent = univ.ObjectIdentifier(())
- def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, length,
- state, decodeFun, substrateFun):
-
+ def valueDecoder(self, substrate, asn1Spec,
+ tagSet=None, length=None, state=None,
+ decodeFun=None, substrateFun=None,
+ **options):
if tagSet[0].tagFormat != tag.tagFormatSimple:
raise error.PyAsn1Error('Simple tag format expected')
@@ -286,8 +316,10 @@ class ObjectIdentifierDecoder(AbstractSimpleDecoder):
class RealDecoder(AbstractSimpleDecoder):
protoComponent = univ.Real()
- def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
- length, state, decodeFun, substrateFun):
+ def valueDecoder(self, substrate, asn1Spec,
+ tagSet=None, length=None, state=None,
+ decodeFun=None, substrateFun=None,
+ **options):
if tagSet[0].tagFormat != tag.tagFormatSimple:
raise error.PyAsn1Error('Simple tag format expected')
@@ -371,11 +403,11 @@ class UniversalConstructedTypeDecoder(AbstractConstructedDecoder):
def _getComponentPositionByType(self, asn1Object, tagSet, idx):
raise NotImplementedError()
- def _decodeComponents(self, substrate, decodeFun, tagSet, allowEoo=True):
+ def _decodeComponents(self, substrate, tagSet=None, decodeFun=None, **options):
components = []
componentTypes = set()
while substrate:
- component, substrate = decodeFun(substrate, allowEoo=True)
+ component, substrate = decodeFun(substrate, **options)
if component is eoo.endOfOctets:
break
components.append(component)
@@ -406,8 +438,10 @@ class UniversalConstructedTypeDecoder(AbstractConstructedDecoder):
return asn1Object, substrate
- def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
- length, state, decodeFun, substrateFun):
+ def valueDecoder(self, substrate, asn1Spec,
+ tagSet=None, length=None, state=None,
+ decodeFun=None, substrateFun=None,
+ **options):
if tagSet[0].tagFormat != tag.tagFormatConstructed:
raise error.PyAsn1Error('Constructed tag format expected')
@@ -424,7 +458,9 @@ class UniversalConstructedTypeDecoder(AbstractConstructedDecoder):
return substrateFun(asn1Object, substrate, length)
if asn1Spec is None:
- asn1Object, trailing = self._decodeComponents(head, decodeFun, tagSet)
+ asn1Object, trailing = self._decodeComponents(
+ head, tagSet=tagSet, decodeFun=decodeFun, **options
+ )
if trailing:
raise error.PyAsn1Error('Unused trailing %d octets encountered' % len(trailing))
return asn1Object, tail
@@ -455,10 +491,10 @@ class UniversalConstructedTypeDecoder(AbstractConstructedDecoder):
asn1Spec = namedTypes[idx].asn1Object
except IndexError:
raise error.PyAsn1Error(
- 'Excessive components decoded at %r'(asn1Object,)
+ 'Excessive components decoded at %r' % (asn1Object,)
)
- component, head = decodeFun(head, asn1Spec)
+ component, head = decodeFun(head, asn1Spec, **options)
if not isDeterministic and namedTypes:
if isSetType:
@@ -485,7 +521,7 @@ class UniversalConstructedTypeDecoder(AbstractConstructedDecoder):
asn1Spec = asn1Object.componentType
idx = 0
while head:
- component, head = decodeFun(head, asn1Spec)
+ component, head = decodeFun(head, asn1Spec, **options)
asn1Object.setComponentByPosition(
idx, component,
verifyConstraints=False,
@@ -497,8 +533,10 @@ class UniversalConstructedTypeDecoder(AbstractConstructedDecoder):
return asn1Object, tail
- def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
- length, state, decodeFun, substrateFun):
+ def indefLenValueDecoder(self, substrate, asn1Spec,
+ tagSet=None, length=None, state=None,
+ decodeFun=None, substrateFun=None,
+ **options):
if tagSet[0].tagFormat != tag.tagFormatConstructed:
raise error.PyAsn1Error('Constructed tag format expected')
@@ -513,7 +551,9 @@ class UniversalConstructedTypeDecoder(AbstractConstructedDecoder):
return substrateFun(asn1Object, substrate, length)
if asn1Spec is None:
- return self._decodeComponents(substrate, decodeFun, tagSet, allowEoo=True)
+ return self._decodeComponents(
+ substrate, tagSet=tagSet, decodeFun=decodeFun, allowEoo=True, **options
+ )
asn1Object = asn1Spec.clone()
@@ -544,7 +584,7 @@ class UniversalConstructedTypeDecoder(AbstractConstructedDecoder):
'Excessive components decoded at %r' % (asn1Object,)
)
- component, substrate = decodeFun(substrate, asn1Spec, allowEoo=True)
+ component, substrate = decodeFun(substrate, asn1Spec, allowEoo=True, **options)
if component is eoo.endOfOctets:
break
@@ -578,7 +618,7 @@ class UniversalConstructedTypeDecoder(AbstractConstructedDecoder):
asn1Spec = asn1Object.componentType
idx = 0
while substrate:
- component, substrate = decodeFun(substrate, asn1Spec, allowEoo=True)
+ component, substrate = decodeFun(substrate, asn1Spec, allowEoo=True, **options)
if component is eoo.endOfOctets:
break
asn1Object.setComponentByPosition(
@@ -625,8 +665,10 @@ class SetOfDecoder(SetOrSetOfDecoder):
class ChoiceDecoder(AbstractConstructedDecoder):
protoComponent = univ.Choice()
- def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
- length, state, decodeFun, substrateFun):
+ def valueDecoder(self, substrate, asn1Spec,
+ tagSet=None, length=None, state=None,
+ decodeFun=None, substrateFun=None,
+ **options):
head, tail = substrate[:length], substrate[length:]
if asn1Spec is None:
asn1Object = self.protoComponent.clone(tagSet=tagSet)
@@ -636,11 +678,12 @@ class ChoiceDecoder(AbstractConstructedDecoder):
return substrateFun(asn1Object, substrate, length)
if asn1Object.tagSet == tagSet: # explicitly tagged Choice
component, head = decodeFun(
- head, asn1Object.componentTagMap
+ head, asn1Object.componentTagMap, **options
)
else:
component, head = decodeFun(
- head, asn1Object.componentTagMap, tagSet, length, state
+ head, asn1Object.componentTagMap,
+ tagSet, length, state, **options
)
effectiveTagSet = component.effectiveTagSet
asn1Object.setComponentByType(
@@ -651,8 +694,10 @@ class ChoiceDecoder(AbstractConstructedDecoder):
)
return asn1Object, tail
- def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
- length, state, decodeFun, substrateFun):
+ def indefLenValueDecoder(self, substrate, asn1Spec,
+ tagSet=None, length=None, state=None,
+ decodeFun=None, substrateFun=None,
+ **options):
if asn1Spec is None:
asn1Object = self.protoComponent.clone(tagSet=tagSet)
else:
@@ -660,14 +705,19 @@ class ChoiceDecoder(AbstractConstructedDecoder):
if substrateFun:
return substrateFun(asn1Object, substrate, length)
if asn1Object.tagSet == tagSet: # explicitly tagged Choice
- component, substrate = decodeFun(substrate, asn1Object.componentType.tagMapUnique)
+ component, substrate = decodeFun(
+ substrate, asn1Object.componentType.tagMapUnique, **options
+ )
# eat up EOO marker
- eooMarker, substrate = decodeFun(substrate, allowEoo=True)
+ eooMarker, substrate = decodeFun(
+ substrate, allowEoo=True, **options
+ )
if eooMarker is not eoo.endOfOctets:
raise error.PyAsn1Error('No EOO seen before substrate ends')
else:
component, substrate = decodeFun(
- substrate, asn1Object.componentType.tagMapUnique, tagSet, length, state
+ substrate, asn1Object.componentType.tagMapUnique,
+ tagSet, length, state, **options
)
effectiveTagSet = component.effectiveTagSet
asn1Object.setComponentByType(
@@ -682,24 +732,35 @@ class ChoiceDecoder(AbstractConstructedDecoder):
class AnyDecoder(AbstractSimpleDecoder):
protoComponent = univ.Any()
- def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
- length, state, decodeFun, substrateFun):
+ def valueDecoder(self, substrate, asn1Spec,
+ tagSet=None, length=None, state=None,
+ decodeFun=None, substrateFun=None,
+ **options):
if asn1Spec is None or asn1Spec is not None and tagSet != asn1Spec.tagSet:
+ fullSubstrate=options['fullSubstrate']
+
# untagged Any container, recover inner header substrate
length += len(fullSubstrate) - len(substrate)
substrate = fullSubstrate
+
if substrateFun:
return substrateFun(self._createComponent(asn1Spec, tagSet),
substrate, length)
+
head, tail = substrate[:length], substrate[length:]
+
return self._createComponent(asn1Spec, tagSet, value=head), tail
- def indefLenValueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet,
- length, state, decodeFun, substrateFun):
+ def indefLenValueDecoder(self, substrate, asn1Spec,
+ tagSet=None, length=None, state=None,
+ decodeFun=None, substrateFun=None,
+ **options):
if asn1Spec is not None and tagSet == asn1Spec.tagSet:
# tagged Any type -- consume header substrate
header = null
else:
+ fullSubstrate=options['fullSubstrate']
+
# untagged Any, recover header substrate
header = fullSubstrate[:-len(substrate)]
@@ -716,7 +777,7 @@ class AnyDecoder(AbstractSimpleDecoder):
while substrate:
component, substrate = decodeFun(substrate, asn1Spec,
substrateFun=substrateFun,
- allowEoo=True)
+ allowEoo=True, **options)
if component is eoo.endOfOctets:
break
header += component
@@ -863,9 +924,11 @@ class Decoder(object):
self.__tagSetCache = {}
self.__eooSentinel = ints2octs((0, 0))
- def __call__(self, substrate, asn1Spec=None, tagSet=None,
- length=None, state=stDecodeTag, recursiveFlag=True,
- substrateFun=None, allowEoo=False):
+ def __call__(self, substrate, asn1Spec=None,
+ tagSet=None, length=None, state=stDecodeTag,
+ decodeFun=None, substrateFun=None,
+ **options):
+
if debug.logger & debug.flagDecoder:
logger = debug.logger
else:
@@ -874,6 +937,8 @@ class Decoder(object):
if logger:
logger('decoder called at scope %s with state %d, working with up to %d octets of substrate: %s' % (debug.scope, state, len(substrate), debug.hexdump(substrate)))
+ allowEoo = options.pop('allowEoo', False)
+
# Look for end-of-octets sentinel
if allowEoo and self.supportIndefLength:
if substrate[:2] == self.__eooSentinel:
@@ -1074,19 +1139,24 @@ class Decoder(object):
logger('codec %s chosen by ASN.1 spec, decoding %s' % (state is stDecodeValue and concreteDecoder.__class__.__name__ or "<none>", state is stDecodeValue and 'value' or 'as explicit tag'))
debug.scope.push(chosenSpec is None and '?' or chosenSpec.__class__.__name__)
if state is stDecodeValue:
- if not recursiveFlag and not substrateFun: # deprecate this
- def substrateFun(a, b, c):
- return a, b[:c]
+ if not options.get('recursiveFlag', True) and not substrateFun: # deprecate this
+ substrateFun = lambda a, b, c: (a, b[:c])
+
+ options.update(fullSubstrate=fullSubstrate)
if length == -1: # indef length
value, substrate = concreteDecoder.indefLenValueDecoder(
- fullSubstrate, substrate, asn1Spec, tagSet, length,
- stGetValueDecoder, self, substrateFun
+ substrate, asn1Spec,
+ tagSet, length, stGetValueDecoder,
+ self, substrateFun,
+ **options
)
else:
value, substrate = concreteDecoder.valueDecoder(
- fullSubstrate, substrate, asn1Spec, tagSet, length,
- stGetValueDecoder, self, substrateFun
+ substrate, asn1Spec,
+ tagSet, length, stGetValueDecoder,
+ self, substrateFun,
+ **options
)
if logger:
logger('codec %s yields type %s, value:\n%s\n...remaining substrate is: %s' % (concreteDecoder.__class__.__name__, value.__class__.__name__, value.prettyPrint(), substrate and debug.hexdump(substrate) or '<none>'))
diff --git a/pyasn1/codec/ber/encoder.py b/pyasn1/codec/ber/encoder.py
index 10527ae..758c1a9 100644
--- a/pyasn1/codec/ber/encoder.py
+++ b/pyasn1/codec/ber/encoder.py
@@ -47,29 +47,32 @@ class AbstractItemEncoder(object):
raise error.PyAsn1Error('Length octets overflow (%d)' % substrateLen)
return (0x80 | substrateLen,) + substrate
- def encodeValue(self, encodeFun, value, defMode, maxChunkSize, ifNotEmpty=False):
+ def encodeValue(self, value, encodeFun, **options):
raise error.PyAsn1Error('Not implemented')
def _encodeEndOfOctets(self, encodeFun, defMode):
if defMode or not self.supportIndefLenMode:
return null
else:
- return encodeFun(eoo.endOfOctets, defMode)
+ return encodeFun(eoo.endOfOctets, defMode=defMode)
- def encode(self, encodeFun, value, defMode, maxChunkSize, ifNotEmpty=False):
+ def encode(self, value, encodeFun, **options):
substrate, isConstructed, isOctets = self.encodeValue(
- encodeFun, value, defMode, maxChunkSize, ifNotEmpty=ifNotEmpty
+ value, encodeFun, **options
)
- if ifNotEmpty and not substrate:
+ if options.get('ifNotEmpty', False) and not substrate:
return substrate
tagSet = value.tagSet
# tagged value?
if tagSet:
+ defMode = options.get('defMode', True)
+
if not isConstructed: # primitive form implies definite mode
defMode = True
+
header = self.encodeTag(tagSet[-1], isConstructed)
header += self.encodeLength(len(substrate), defMode)
@@ -86,17 +89,18 @@ class AbstractItemEncoder(object):
class EndOfOctetsEncoder(AbstractItemEncoder):
- def encodeValue(self, encodeFun, value, defMode, maxChunkSize, ifNotEmpty=False):
+ def encodeValue(self, value, encodeFun, **options):
return null, False, True
class ExplicitlyTaggedItemEncoder(AbstractItemEncoder):
- def encodeValue(self, encodeFun, value, defMode, maxChunkSize, ifNotEmpty=False):
+ def encodeValue(self, value, encodeFun, **options):
if isinstance(value, base.AbstractConstructedAsn1Item):
value = value.clone(tagSet=value.tagSet[:-1], cloneValueFlag=True)
else:
value = value.clone(tagSet=value.tagSet[:-1])
- return encodeFun(value, defMode, maxChunkSize, ifNotEmpty=ifNotEmpty), True, True
+
+ return encodeFun(value, **options), True, True
explicitlyTaggedItemEncoder = ExplicitlyTaggedItemEncoder()
@@ -105,7 +109,7 @@ explicitlyTaggedItemEncoder = ExplicitlyTaggedItemEncoder()
class BooleanEncoder(AbstractItemEncoder):
supportIndefLenMode = False
- def encodeValue(self, encodeFun, value, defMode, maxChunkSize, ifNotEmpty=False):
+ def encodeValue(self, value, encodeFun, **options):
return value and (1,) or (0,), False, False
@@ -113,7 +117,7 @@ class IntegerEncoder(AbstractItemEncoder):
supportIndefLenMode = False
supportCompactZero = False
- def encodeValue(self, encodeFun, value, defMode, maxChunkSize, ifNotEmpty=False):
+ def encodeValue(self, value, encodeFun, **options):
if value == 0:
# de-facto way to encode zero
if self.supportCompactZero:
@@ -125,13 +129,14 @@ class IntegerEncoder(AbstractItemEncoder):
class BitStringEncoder(AbstractItemEncoder):
- def encodeValue(self, encodeFun, value, defMode, maxChunkSize, ifNotEmpty=False):
+ def encodeValue(self, value, encodeFun, **options):
valueLength = len(value)
if valueLength % 8:
alignedValue = value << (8 - valueLength % 8)
else:
alignedValue = value
+ maxChunkSize = options.get('maxChunkSize', 0)
if not maxChunkSize or len(alignedValue) <= maxChunkSize * 8:
substrate = alignedValue.asOctets()
return int2oct(len(substrate) * 8 - valueLength) + substrate, False, True
@@ -141,13 +146,14 @@ class BitStringEncoder(AbstractItemEncoder):
while stop < valueLength:
start = stop
stop = min(start + maxChunkSize * 8, valueLength)
- substrate += encodeFun(alignedValue[start:stop], defMode, maxChunkSize, ifNotEmpty=ifNotEmpty)
+ substrate += encodeFun(alignedValue[start:stop], **options)
return substrate, True, True
class OctetStringEncoder(AbstractItemEncoder):
- def encodeValue(self, encodeFun, value, defMode, maxChunkSize, ifNotEmpty=False):
+ def encodeValue(self, value, encodeFun, **options):
+ maxChunkSize = options.get('maxChunkSize', 0)
if not maxChunkSize or len(value) <= maxChunkSize:
return value.asOctets(), False, True
else:
@@ -157,7 +163,7 @@ class OctetStringEncoder(AbstractItemEncoder):
v = value.clone(value[pos:pos + maxChunkSize])
if not v:
break
- substrate += encodeFun(v, defMode, maxChunkSize, ifNotEmpty=ifNotEmpty)
+ substrate += encodeFun(v, **options)
pos += maxChunkSize
return substrate, True, True
@@ -166,14 +172,14 @@ class OctetStringEncoder(AbstractItemEncoder):
class NullEncoder(AbstractItemEncoder):
supportIndefLenMode = False
- def encodeValue(self, encodeFun, value, defMode, maxChunkSize, ifNotEmpty=False):
+ def encodeValue(self, value, encodeFun, **options):
return null, False, True
class ObjectIdentifierEncoder(AbstractItemEncoder):
supportIndefLenMode = False
- def encodeValue(self, encodeFun, value, defMode, maxChunkSize, ifNotEmpty=False):
+ def encodeValue(self, value, encodeFun, **options):
oid = value.asTuple()
# Build the first pair
@@ -271,7 +277,7 @@ class RealEncoder(AbstractItemEncoder):
encbase = encBase[i]
return sign, m, encbase, e
- def encodeValue(self, encodeFun, value, defMode, maxChunkSize, ifNotEmpty=False):
+ def encodeValue(self, value, encodeFun, **options):
if value.isPlusInf:
return (0x40,), False, False
if value.isMinusInf:
@@ -342,10 +348,12 @@ class RealEncoder(AbstractItemEncoder):
class SequenceEncoder(AbstractItemEncoder):
- def encodeValue(self, encodeFun, value, defMode, maxChunkSize, ifNotEmpty=False):
+ def encodeValue(self, value, encodeFun, **options):
value.verifySizeSpec()
+
namedTypes = value.componentType
substrate = null
+
idx = len(value)
while idx > 0:
idx -= 1
@@ -354,29 +362,30 @@ class SequenceEncoder(AbstractItemEncoder):
continue
if namedTypes[idx].isDefaulted and value[idx] == namedTypes[idx].asn1Object:
continue
- substrate = encodeFun(value[idx], defMode, maxChunkSize) + substrate
+ substrate = encodeFun(value[idx], **options) + substrate
+
return substrate, True, True
class SequenceOfEncoder(AbstractItemEncoder):
- def encodeValue(self, encodeFun, value, defMode, maxChunkSize, ifNotEmpty=False):
+ def encodeValue(self, value, encodeFun, **options):
value.verifySizeSpec()
substrate = null
idx = len(value)
while idx > 0:
idx -= 1
- substrate = encodeFun(value[idx], defMode, maxChunkSize, ifNotEmpty=False) + substrate
+ substrate = encodeFun(value[idx], **options) + substrate
return substrate, True, True
class ChoiceEncoder(AbstractItemEncoder):
- def encodeValue(self, encodeFun, value, defMode, maxChunkSize, ifNotEmpty=False):
- return encodeFun(value.getComponent(), defMode, maxChunkSize, ifNotEmpty=False), True, True
+ def encodeValue(self, value, encodeFun, **options):
+ return encodeFun(value.getComponent(), **options), True, True
class AnyEncoder(OctetStringEncoder):
- def encodeValue(self, encodeFun, value, defMode, maxChunkSize, ifNotEmpty=False):
- return value.asOctets(), defMode == False, True
+ def encodeValue(self, value, encodeFun, **options):
+ return value.asOctets(), options.get('defMode', True) == False, True
tagMap = {
@@ -455,16 +464,20 @@ class Encoder(object):
self.__tagMap = tagMap
self.__typeMap = typeMap
- def __call__(self, value, defMode=True, maxChunkSize=0, ifNotEmpty=False):
- if not defMode and not self.supportIndefLength:
+ def __call__(self, value, **options):
+ if not options.get('defMode', True) and not self.supportIndefLength:
raise error.PyAsn1Error('Indefinite length encoding not supported by this codec')
+
if debug.logger & debug.flagEncoder:
logger = debug.logger
else:
logger = None
+
if logger:
logger('encoder called in %sdef mode, chunk size %s for type %s, value:\n%s' % (not defMode and 'in' or '', maxChunkSize, value.prettyPrintType(), value.prettyPrint()))
+
tagSet = value.tagSet
+
if len(tagSet) > 1:
concreteEncoder = explicitlyTaggedItemEncoder
else:
@@ -477,11 +490,12 @@ class Encoder(object):
concreteEncoder = self.__tagMap[baseTagSet]
except KeyError:
raise error.PyAsn1Error('No encoder for %s' % (value,))
+
if logger:
logger('using value codec %s chosen by %s' % (concreteEncoder.__class__.__name__, tagSet))
- substrate = concreteEncoder.encode(
- self, value, defMode, maxChunkSize, ifNotEmpty=ifNotEmpty
- )
+
+ substrate = concreteEncoder.encode(value, self, **options)
+
if logger:
logger('codec %s built %s octets of substrate: %s\nencoder completed' % (concreteEncoder, len(substrate), debug.hexdump(substrate)))
return substrate
diff --git a/pyasn1/codec/cer/decoder.py b/pyasn1/codec/cer/decoder.py
index c7e2174..5e3e8bf 100644
--- a/pyasn1/codec/cer/decoder.py
+++ b/pyasn1/codec/cer/decoder.py
@@ -15,8 +15,10 @@ __all__ = ['decode']
class BooleanDecoder(decoder.AbstractSimpleDecoder):
protoComponent = univ.Boolean(0)
- def valueDecoder(self, fullSubstrate, substrate, asn1Spec, tagSet, length,
- state, decodeFun, substrateFun):
+ def valueDecoder(self, substrate, asn1Spec,
+ tagSet=None, length=None, state=None,
+ decodeFun=None, substrateFun=None,
+ **options):
head, tail = substrate[:length], substrate[length:]
if not head or length != 1:
raise error.PyAsn1Error('Not single-octet Boolean payload')
diff --git a/pyasn1/codec/cer/encoder.py b/pyasn1/codec/cer/encoder.py
index c6f92ca..50cb346 100644
--- a/pyasn1/codec/cer/encoder.py
+++ b/pyasn1/codec/cer/encoder.py
@@ -14,7 +14,7 @@ __all__ = ['encode']
class BooleanEncoder(encoder.IntegerEncoder):
- def encodeValue(self, encodeFun, value, defMode, maxChunkSize, ifNotEmpty=False):
+ def encodeValue(self, value, encodeFun, **options):
if value == 0:
substrate = (0,)
else:
@@ -23,16 +23,18 @@ class BooleanEncoder(encoder.IntegerEncoder):
class BitStringEncoder(encoder.BitStringEncoder):
- def encodeValue(self, encodeFun, value, defMode, maxChunkSize, ifNotEmpty=False):
+ def encodeValue(self, value, encodeFun, **options):
+ options.update(maxChunkSize=1000)
return encoder.BitStringEncoder.encodeValue(
- self, encodeFun, value, defMode, 1000, ifNotEmpty=ifNotEmpty
+ self, value, encodeFun, **options
)
class OctetStringEncoder(encoder.OctetStringEncoder):
- def encodeValue(self, encodeFun, value, defMode, maxChunkSize, ifNotEmpty=False):
+ def encodeValue(self, value, encodeFun, **options):
+ options.update(maxChunkSize=1000)
return encoder.OctetStringEncoder.encodeValue(
- self, encodeFun, value, defMode, 1000, ifNotEmpty=ifNotEmpty
+ self, value, encodeFun, **options
)
@@ -52,7 +54,7 @@ class TimeEncoderMixIn(object):
minLength = 12
maxLength = 19
- def encodeValue(self, encodeFun, value, defMode, maxChunkSize, ifNotEmpty=False):
+ def encodeValue(self, value, encodeFun, **options):
# Encoding constraints:
# - minutes are mandatory, seconds are optional
# - subseconds must NOT be zero
@@ -74,8 +76,10 @@ class TimeEncoderMixIn(object):
if self.commachar in octets:
raise error.PyAsn1Error('Comma in fractions disallowed: %r' % value)
+ options.update(maxChunkSize=1000)
+
return encoder.OctetStringEncoder.encodeValue(
- self, encodeFun, value, defMode, 1000, ifNotEmpty=ifNotEmpty
+ self, value, encodeFun, **options
)
@@ -95,7 +99,7 @@ class SetOfEncoder(encoder.SequenceOfEncoder):
# sort by tags regardless of the Choice value (static sort)
return sorted(components, key=lambda x: isinstance(x, univ.Choice) and x.minTagSet or x.tagSet)
- def encodeValue(self, encodeFun, value, defMode, maxChunkSize, ifNotEmpty=False):
+ def encodeValue(self, value, encodeFun, **options):
value.verifySizeSpec()
substrate = null
idx = len(value)
@@ -115,9 +119,10 @@ class SetOfEncoder(encoder.SequenceOfEncoder):
compsMap[id(value[idx])] = namedTypes and namedTypes[idx].isOptional
for comp in self._sortComponents(comps):
- substrate += encodeFun(comp, defMode, maxChunkSize, ifNotEmpty=compsMap[id(comp)])
+ options.update(ifNotEmpty=compsMap[id(comp)])
+ substrate += encodeFun(comp, **options)
else:
- components = [encodeFun(x, defMode, maxChunkSize) for x in value]
+ components = [encodeFun(x, **options) for x in value]
# sort by serialized and padded components
if len(components) > 1:
@@ -136,10 +141,12 @@ class SetOfEncoder(encoder.SequenceOfEncoder):
class SequenceEncoder(encoder.SequenceEncoder):
- def encodeValue(self, encodeFun, value, defMode, maxChunkSize, ifNotEmpty=False):
+ def encodeValue(self, value, encodeFun, **options):
value.verifySizeSpec()
+
namedTypes = value.componentType
substrate = null
+
idx = len(value)
while idx > 0:
idx -= 1
@@ -149,24 +156,25 @@ class SequenceEncoder(encoder.SequenceEncoder):
if namedTypes[idx].isDefaulted and value[idx] == namedTypes[idx].asn1Object:
continue
- substrate = encodeFun(value[idx], defMode, maxChunkSize,
- namedTypes and namedTypes[idx].isOptional) + substrate
+ options.update(ifNotEmpty=namedTypes and namedTypes[idx].isOptional)
+
+ substrate = encodeFun(value[idx], **options) + substrate
return substrate, True, True
class SequenceOfEncoder(encoder.SequenceOfEncoder):
- def encodeValue(self, encodeFun, value, defMode, maxChunkSize, ifNotEmpty=False):
+ def encodeValue(self, value, encodeFun, **options):
substrate = null
idx = len(value)
- if ifNotEmpty and not idx:
+ if options.get('ifNotEmpty', False) and not idx:
return substrate, True, True
value.verifySizeSpec()
while idx > 0:
idx -= 1
- substrate = encodeFun(value[idx], defMode, maxChunkSize, ifNotEmpty=False) + substrate
+ substrate = encodeFun(value[idx], **options) + substrate
return substrate, True, True
@@ -200,9 +208,12 @@ typeMap.update({
class Encoder(encoder.Encoder):
+ supportIndefLength = True
- def __call__(self, value, defMode=False, maxChunkSize=0, ifNotEmpty=False):
- return encoder.Encoder.__call__(self, value, defMode, maxChunkSize, ifNotEmpty)
+ def __call__(self, value, **options):
+ if 'defMode' not in options:
+ options.update(defMode=False)
+ return encoder.Encoder.__call__(self, value, **options)
#: Turns ASN.1 object into CER octet stream.
#:
diff --git a/pyasn1/codec/der/encoder.py b/pyasn1/codec/der/encoder.py
index 6782819..59dd512 100644
--- a/pyasn1/codec/der/encoder.py
+++ b/pyasn1/codec/der/encoder.py
@@ -6,21 +6,20 @@
#
from pyasn1.type import univ
from pyasn1.codec.cer import encoder
-from pyasn1 import error
__all__ = ['encode']
class BitStringEncoder(encoder.BitStringEncoder):
- def encodeValue(self, encodeFun, value, defMode, maxChunkSize, ifNotEmpty=False):
+ def encodeValue(self, value, encodeFun, **options):
return encoder.BitStringEncoder.encodeValue(
- self, encodeFun, value, defMode, 0, ifNotEmpty=ifNotEmpty
+ self, value, encodeFun, **options
)
class OctetStringEncoder(encoder.OctetStringEncoder):
- def encodeValue(self, encodeFun, value, defMode, maxChunkSize, ifNotEmpty=False):
+ def encodeValue(self, value, encodeFun, **options):
return encoder.OctetStringEncoder.encodeValue(
- self, encodeFun, value, defMode, 0, ifNotEmpty=ifNotEmpty
+ self, value, encodeFun, **options
)
class SetOfEncoder(encoder.SetOfEncoder):
@@ -50,10 +49,10 @@ typeMap.update({
class Encoder(encoder.Encoder):
supportIndefLength = False
- def __call__(self, value, defMode=True, maxChunkSize=0, ifNotEmpty=False):
- if not defMode:
- raise error.PyAsn1Error('DER forbids indefinite length mode')
- return encoder.Encoder.__call__(self, value, defMode, maxChunkSize, ifNotEmpty=ifNotEmpty)
+ def __call__(self, value, **options):
+ if 'defMode' not in options:
+ options.update(defMode=True)
+ return encoder.Encoder.__call__(self, value, **options)
#: Turns ASN.1 object into DER octet stream.
#:
diff --git a/pyasn1/codec/native/decoder.py b/pyasn1/codec/native/decoder.py
index 4c7df95..ba9811b 100644
--- a/pyasn1/codec/native/decoder.py
+++ b/pyasn1/codec/native/decoder.py
@@ -11,47 +11,47 @@ __all__ = ['decode']
class AbstractScalarDecoder(object):
- def __call__(self, pyObject, asn1Spec, decoderFunc=None):
+ def __call__(self, pyObject, asn1Spec, decodeFun=None, **options):
return asn1Spec.clone(pyObject)
class BitStringDecoder(AbstractScalarDecoder):
- def __call__(self, pyObject, asn1Spec, decoderFunc=None):
+ def __call__(self, pyObject, asn1Spec, decodeFun=None, **options):
return asn1Spec.clone(univ.BitString.fromBinaryString(pyObject))
class SequenceOrSetDecoder(object):
- def __call__(self, pyObject, asn1Spec, decoderFunc):
+ def __call__(self, pyObject, asn1Spec, decodeFun=None, **options):
asn1Value = asn1Spec.clone()
componentsTypes = asn1Spec.componentType
for field in asn1Value:
if field in pyObject:
- asn1Value[field] = decoderFunc(pyObject[field], componentsTypes[field].asn1Object)
+ asn1Value[field] = decodeFun(pyObject[field], componentsTypes[field].asn1Object, **options)
return asn1Value
class SequenceOfOrSetOfDecoder(object):
- def __call__(self, pyObject, asn1Spec, decoderFunc):
+ def __call__(self, pyObject, asn1Spec, decodeFun=None, **options):
asn1Value = asn1Spec.clone()
for pyValue in pyObject:
- asn1Value.append(decoderFunc(pyValue, asn1Spec.componentType.asn1Object))
+ asn1Value.append(decodeFun(pyValue, asn1Spec.componentType.asn1Object), **options)
return asn1Value
class ChoiceDecoder(object):
- def __call__(self, pyObject, asn1Spec, decoderFunc):
+ def __call__(self, pyObject, asn1Spec, decodeFun=None, **options):
asn1Value = asn1Spec.clone()
componentsTypes = asn1Spec.componentType
for field in pyObject:
if field in componentsTypes:
- asn1Value[field] = decoderFunc(pyObject[field], componentsTypes[field].asn1Object)
+ asn1Value[field] = decodeFun(pyObject[field], componentsTypes[field].asn1Object, **options)
break
return asn1Value
@@ -130,7 +130,7 @@ class Decoder(object):
self.__tagMap = tagMap
self.__typeMap = typeMap
- def __call__(self, pyObject, asn1Spec):
+ def __call__(self, pyObject, asn1Spec, **options):
if debug.logger & debug.flagDecoder:
logger = debug.logger
else:
@@ -144,9 +144,11 @@ class Decoder(object):
try:
valueDecoder = self.__typeMap[asn1Spec.typeId]
+
except KeyError:
# use base type for codec lookup to recover untagged types
baseTagSet = tag.TagSet(asn1Spec.tagSet.baseTag, asn1Spec.tagSet.baseTag)
+
try:
valueDecoder = self.__tagMap[baseTagSet]
except KeyError:
@@ -155,7 +157,7 @@ class Decoder(object):
if logger:
logger('calling decoder %s on Python type %s <%s>' % (type(valueDecoder).__name__, type(pyObject).__name__, repr(pyObject)))
- value = valueDecoder(pyObject, asn1Spec, self)
+ value = valueDecoder(pyObject, asn1Spec, self, **options)
if logger:
logger('decoder %s produced ASN.1 type %s <%s>' % (type(valueDecoder).__name__, type(value).__name__, repr(value)))
diff --git a/pyasn1/codec/native/encoder.py b/pyasn1/codec/native/encoder.py
index ab4b71a..78fe7c2 100644
--- a/pyasn1/codec/native/encoder.py
+++ b/pyasn1/codec/native/encoder.py
@@ -17,72 +17,76 @@ __all__ = ['encode']
class AbstractItemEncoder(object):
- def encode(self, encodeFun, value):
+ def encode(self, value, encodeFun, **options):
raise error.PyAsn1Error('Not implemented')
class ExplicitlyTaggedItemEncoder(AbstractItemEncoder):
- def encode(self, encodeFun, value):
+ def encode(self, value, encodeFun, **options):
if isinstance(value, base.AbstractConstructedAsn1Item):
value = value.clone(tagSet=value.tagSet[:-1],
cloneValueFlag=1)
else:
value = value.clone(tagSet=value.tagSet[:-1])
- return encodeFun(value)
+
+ return encodeFun(value, **options)
explicitlyTaggedItemEncoder = ExplicitlyTaggedItemEncoder()
class BooleanEncoder(AbstractItemEncoder):
- def encode(self, encodeFun, value):
+ def encode(self, value, encodeFun, **options):
return bool(value)
class IntegerEncoder(AbstractItemEncoder):
- def encode(self, encodeFun, value):
+ def encode(self, value, encodeFun, **options):
return int(value)
class BitStringEncoder(AbstractItemEncoder):
- def encode(self, encodeFun, value):
+ def encode(self, value, encodeFun, **options):
return str(value)
class OctetStringEncoder(AbstractItemEncoder):
- def encode(self, encodeFun, value):
+ def encode(self, value, encodeFun, **options):
return value.asOctets()
class TextStringEncoder(AbstractItemEncoder):
- def encode(self, encodeFun, value):
+ def encode(self, value, encodeFun, **options):
return value.prettyPrint()
class NullEncoder(AbstractItemEncoder):
- def encode(self, encodeFun, value):
+ def encode(self, value, encodeFun, **options):
return None
class ObjectIdentifierEncoder(AbstractItemEncoder):
- def encode(self, encodeFun, value):
+ def encode(self, value, encodeFun, **options):
return str(value)
class RealEncoder(AbstractItemEncoder):
- def encode(self, encodeFun, value):
+ def encode(self, value, encodeFun, **options):
return float(value)
class SetEncoder(AbstractItemEncoder):
protoDict = dict
- def encode(self, encodeFun, value):
+
+ def encode(self, value, encodeFun, **options):
value.verifySizeSpec()
+
namedTypes = value.componentType
substrate = self.protoDict()
+
for idx, (key, subValue) in enumerate(value.items()):
if namedTypes and namedTypes[idx].isOptional and not value[idx].isValue:
continue
- substrate[key] = encodeFun(subValue)
+ substrate[key] = encodeFun(subValue, **options)
return substrate
@@ -91,9 +95,9 @@ class SequenceEncoder(SetEncoder):
class SequenceOfEncoder(AbstractItemEncoder):
- def encode(self, encodeFun, value):
+ def encode(self, value, encodeFun, **options):
value.verifySizeSpec()
- return [encodeFun(x) for x in value]
+ return [encodeFun(x, **options) for x in value]
class ChoiceEncoder(SequenceEncoder):
@@ -101,7 +105,7 @@ class ChoiceEncoder(SequenceEncoder):
class AnyEncoder(AbstractItemEncoder):
- def encode(self, encodeFun, value):
+ def encode(self, value, encodeFun, **options):
return value.asOctets()
@@ -154,7 +158,7 @@ class Encoder(object):
self.__tagMap = tagMap
self.__typeMap = typeMap
- def __call__(self, asn1Value):
+ def __call__(self, asn1Value, **options):
if not isinstance(asn1Value, base.Asn1Item):
raise error.PyAsn1Error('value is not valid (should be an instance of an ASN.1 Item)')
@@ -184,7 +188,7 @@ class Encoder(object):
if logger:
logger('using value codec %s chosen by %s' % (type(concreteEncoder).__name__, tagSet))
- pyObject = concreteEncoder.encode(self, asn1Value)
+ pyObject = concreteEncoder.encode(asn1Value, self, **options)
if logger:
logger('encoder %s produced: %s' % (type(concreteEncoder).__name__, repr(pyObject)))