summaryrefslogtreecommitdiff
path: root/pyasn1/codec/ber/encoder.py
diff options
context:
space:
mode:
Diffstat (limited to 'pyasn1/codec/ber/encoder.py')
-rw-r--r--pyasn1/codec/ber/encoder.py190
1 files changed, 111 insertions, 79 deletions
diff --git a/pyasn1/codec/ber/encoder.py b/pyasn1/codec/ber/encoder.py
index 10527ae..2c5ba14 100644
--- a/pyasn1/codec/ber/encoder.py
+++ b/pyasn1/codec/ber/encoder.py
@@ -4,7 +4,7 @@
# Copyright (c) 2005-2017, Ilya Etingof <etingof@gmail.com>
# License: http://pyasn1.sf.net/license.html
#
-from pyasn1.type import base, tag, univ, char, useful
+from pyasn1.type import tag, univ, char, useful
from pyasn1.codec.ber import eoo
from pyasn1.compat.octets import int2oct, oct2int, ints2octs, null, str2octs
from pyasn1.compat.integer import to_bytes
@@ -23,9 +23,9 @@ class AbstractItemEncoder(object):
if isConstructed:
encodedTag |= tag.tagFormatConstructed
if tagId < 31:
- return (encodedTag | tagId,)
+ return encodedTag | tagId,
else:
- substrate = (tagId & 0x7f,)
+ substrate = tagId & 0x7f,
tagId >>= 7
while tagId:
substrate = (0x80 | (tagId & 0x7f),) + substrate
@@ -36,7 +36,7 @@ class AbstractItemEncoder(object):
if not defMode and self.supportIndefLenMode:
return (0x80,)
if length < 0x80:
- return (length,)
+ return length,
else:
substrate = ()
while length:
@@ -47,65 +47,62 @@ class AbstractItemEncoder(object):
raise error.PyAsn1Error('Length octets overflow (%d)' % substrateLen)
return (0x80 | substrateLen,) + substrate
- def encodeValue(self, encodeFun, value, defMode, maxChunkSize, ifNotEmpty=False):
+ def encodeValue(self, value, encodeFun, **options):
raise error.PyAsn1Error('Not implemented')
- def _encodeEndOfOctets(self, encodeFun, defMode):
- if defMode or not self.supportIndefLenMode:
- return null
- else:
- return encodeFun(eoo.endOfOctets, defMode)
+ def encode(self, value, encodeFun, **options):
- def encode(self, encodeFun, value, defMode, maxChunkSize, ifNotEmpty=False):
- substrate, isConstructed, isOctets = self.encodeValue(
- encodeFun, value, defMode, maxChunkSize, ifNotEmpty=ifNotEmpty
- )
+ tagSet = value.tagSet
- if ifNotEmpty and not substrate:
+ # untagged item?
+ if not tagSet:
+ substrate, isConstructed, isOctets = self.encodeValue(
+ value, encodeFun, **options
+ )
return substrate
- tagSet = value.tagSet
+ defMode = options.get('defMode', True)
+
+ for idx, singleTag in enumerate(tagSet.superTags):
- # tagged value?
- if tagSet:
- if not isConstructed: # primitive form implies definite mode
- defMode = True
- header = self.encodeTag(tagSet[-1], isConstructed)
- header += self.encodeLength(len(substrate), defMode)
+ defModeOverride = defMode
+
+ # base tag?
+ if not idx:
+ substrate, isConstructed, isOctets = self.encodeValue(
+ value, encodeFun, **options
+ )
+
+ if options.get('ifNotEmpty', False) and not substrate:
+ return substrate
+
+ # primitive form implies definite mode
+ if not isConstructed:
+ defModeOverride = True
+
+ header = self.encodeTag(singleTag, isConstructed)
+ header += self.encodeLength(len(substrate), defModeOverride)
if isOctets:
substrate = ints2octs(header) + substrate
else:
substrate = ints2octs(header + substrate)
- eoo = self._encodeEndOfOctets(encodeFun, defMode)
- if eoo:
- substrate += eoo
+ if not defModeOverride:
+ substrate += encodeFun(eoo.endOfOctets, defMode=defModeOverride)
return substrate
class EndOfOctetsEncoder(AbstractItemEncoder):
- def encodeValue(self, encodeFun, value, defMode, maxChunkSize, ifNotEmpty=False):
+ def encodeValue(self, value, encodeFun, **options):
return null, False, True
-class ExplicitlyTaggedItemEncoder(AbstractItemEncoder):
- def encodeValue(self, encodeFun, value, defMode, maxChunkSize, ifNotEmpty=False):
- if isinstance(value, base.AbstractConstructedAsn1Item):
- value = value.clone(tagSet=value.tagSet[:-1], cloneValueFlag=True)
- else:
- value = value.clone(tagSet=value.tagSet[:-1])
- return encodeFun(value, defMode, maxChunkSize, ifNotEmpty=ifNotEmpty), True, True
-
-
-explicitlyTaggedItemEncoder = ExplicitlyTaggedItemEncoder()
-
-
class BooleanEncoder(AbstractItemEncoder):
supportIndefLenMode = False
- def encodeValue(self, encodeFun, value, defMode, maxChunkSize, ifNotEmpty=False):
+ def encodeValue(self, value, encodeFun, **options):
return value and (1,) or (0,), False, False
@@ -113,7 +110,7 @@ class IntegerEncoder(AbstractItemEncoder):
supportIndefLenMode = False
supportCompactZero = False
- def encodeValue(self, encodeFun, value, defMode, maxChunkSize, ifNotEmpty=False):
+ def encodeValue(self, value, encodeFun, **options):
if value == 0:
# de-facto way to encode zero
if self.supportCompactZero:
@@ -125,39 +122,51 @@ class IntegerEncoder(AbstractItemEncoder):
class BitStringEncoder(AbstractItemEncoder):
- def encodeValue(self, encodeFun, value, defMode, maxChunkSize, ifNotEmpty=False):
+ def encodeValue(self, value, encodeFun, **options):
valueLength = len(value)
if valueLength % 8:
alignedValue = value << (8 - valueLength % 8)
else:
alignedValue = value
+ maxChunkSize = options.get('maxChunkSize', 0)
if not maxChunkSize or len(alignedValue) <= maxChunkSize * 8:
substrate = alignedValue.asOctets()
return int2oct(len(substrate) * 8 - valueLength) + substrate, False, True
+ # strip off explicit tags
+ alignedValue = alignedValue.clone(
+ tagSet=tag.TagSet(value.tagSet.baseTag, value.tagSet.baseTag)
+ )
+
stop = 0
substrate = null
while stop < valueLength:
start = stop
stop = min(start + maxChunkSize * 8, valueLength)
- substrate += encodeFun(alignedValue[start:stop], defMode, maxChunkSize, ifNotEmpty=ifNotEmpty)
+ substrate += encodeFun(alignedValue[start:stop], **options)
return substrate, True, True
class OctetStringEncoder(AbstractItemEncoder):
- def encodeValue(self, encodeFun, value, defMode, maxChunkSize, ifNotEmpty=False):
+ def encodeValue(self, value, encodeFun, **options):
+ maxChunkSize = options.get('maxChunkSize', 0)
if not maxChunkSize or len(value) <= maxChunkSize:
return value.asOctets(), False, True
+
else:
+ # will strip off explicit tags
+ baseTagSet = tag.TagSet(value.tagSet.baseTag, value.tagSet.baseTag)
+
pos = 0
substrate = null
while True:
- v = value.clone(value[pos:pos + maxChunkSize])
- if not v:
+ chunk = value.clone(value[pos:pos + maxChunkSize],
+ tagSet=baseTagSet)
+ if not chunk:
break
- substrate += encodeFun(v, defMode, maxChunkSize, ifNotEmpty=ifNotEmpty)
+ substrate += encodeFun(chunk, **options)
pos += maxChunkSize
return substrate, True, True
@@ -166,14 +175,14 @@ class OctetStringEncoder(AbstractItemEncoder):
class NullEncoder(AbstractItemEncoder):
supportIndefLenMode = False
- def encodeValue(self, encodeFun, value, defMode, maxChunkSize, ifNotEmpty=False):
+ def encodeValue(self, value, encodeFun, **options):
return null, False, True
class ObjectIdentifierEncoder(AbstractItemEncoder):
supportIndefLenMode = False
- def encodeValue(self, encodeFun, value, defMode, maxChunkSize, ifNotEmpty=False):
+ def encodeValue(self, value, encodeFun, **options):
oid = value.asTuple()
# Build the first pair
@@ -271,7 +280,7 @@ class RealEncoder(AbstractItemEncoder):
encbase = encBase[i]
return sign, m, encbase, e
- def encodeValue(self, encodeFun, value, defMode, maxChunkSize, ifNotEmpty=False):
+ def encodeValue(self, value, encodeFun, **options):
if value.isPlusInf:
return (0x40,), False, False
if value.isMinusInf:
@@ -342,41 +351,54 @@ class RealEncoder(AbstractItemEncoder):
class SequenceEncoder(AbstractItemEncoder):
- def encodeValue(self, encodeFun, value, defMode, maxChunkSize, ifNotEmpty=False):
+ def encodeValue(self, value, encodeFun, **options):
value.verifySizeSpec()
+
namedTypes = value.componentType
substrate = null
+
idx = len(value)
while idx > 0:
idx -= 1
if namedTypes:
- if namedTypes[idx].isOptional and not value[idx].isValue:
+ namedType = namedTypes[idx]
+ if namedType.isOptional and not value[idx].isValue:
continue
- if namedTypes[idx].isDefaulted and value[idx] == namedTypes[idx].asn1Object:
+ if namedType.isDefaulted and value[idx] == namedType.asn1Object:
continue
- substrate = encodeFun(value[idx], defMode, maxChunkSize) + substrate
+
+ chunk = encodeFun(value[idx], **options)
+
+ # wrap open type blob if needed
+ if namedTypes and namedType.openType:
+ asn1Spec = namedType.asn1Object
+ if asn1Spec.tagSet and not asn1Spec.isSameTypeWith(value[idx]):
+ chunk = encodeFun(asn1Spec.clone(chunk), **options)
+
+ substrate = chunk + substrate
+
return substrate, True, True
class SequenceOfEncoder(AbstractItemEncoder):
- def encodeValue(self, encodeFun, value, defMode, maxChunkSize, ifNotEmpty=False):
+ def encodeValue(self, value, encodeFun, **options):
value.verifySizeSpec()
substrate = null
idx = len(value)
while idx > 0:
idx -= 1
- substrate = encodeFun(value[idx], defMode, maxChunkSize, ifNotEmpty=False) + substrate
+ substrate = encodeFun(value[idx], **options) + substrate
return substrate, True, True
class ChoiceEncoder(AbstractItemEncoder):
- def encodeValue(self, encodeFun, value, defMode, maxChunkSize, ifNotEmpty=False):
- return encodeFun(value.getComponent(), defMode, maxChunkSize, ifNotEmpty=False), True, True
+ def encodeValue(self, value, encodeFun, **options):
+ return encodeFun(value.getComponent(), **options), True, True
class AnyEncoder(OctetStringEncoder):
- def encodeValue(self, encodeFun, value, defMode, maxChunkSize, ifNotEmpty=False):
- return value.asOctets(), defMode == False, True
+ def encodeValue(self, value, encodeFun, **options):
+ return value.asOctets(), not options.get('defMode', True), True
tagMap = {
@@ -448,40 +470,50 @@ typeMap = {
class Encoder(object):
- supportIndefLength = True
+ fixedDefLengthMode = None
+ fixedChunkSize = None
# noinspection PyDefaultArgument
def __init__(self, tagMap, typeMap={}):
self.__tagMap = tagMap
self.__typeMap = typeMap
- def __call__(self, value, defMode=True, maxChunkSize=0, ifNotEmpty=False):
- if not defMode and not self.supportIndefLength:
- raise error.PyAsn1Error('Indefinite length encoding not supported by this codec')
+ def __call__(self, value, **options):
+
if debug.logger & debug.flagEncoder:
logger = debug.logger
else:
logger = None
+
if logger:
- logger('encoder called in %sdef mode, chunk size %s for type %s, value:\n%s' % (not defMode and 'in' or '', maxChunkSize, value.prettyPrintType(), value.prettyPrint()))
+ logger('encoder called in %sdef mode, chunk size %s for type %s, value:\n%s' % (not options.get('defMode', True) and 'in' or '', options.get('maxChunkSize', 0), value.prettyPrintType(), value.prettyPrint()))
+
+ if self.fixedDefLengthMode is not None:
+ options.update(defMode=self.fixedDefLengthMode)
+
+ if self.fixedChunkSize is not None:
+ options.update(maxChunkSize=self.fixedChunkSize)
+
tagSet = value.tagSet
- if len(tagSet) > 1:
- concreteEncoder = explicitlyTaggedItemEncoder
- else:
+
+ try:
+ concreteEncoder = self.__typeMap[value.typeId]
+
+ except KeyError:
+ # use base type for codec lookup to recover untagged types
+ baseTagSet = tag.TagSet(value.tagSet.baseTag, value.tagSet.baseTag)
+
try:
- concreteEncoder = self.__typeMap[value.typeId]
+ concreteEncoder = self.__tagMap[baseTagSet]
+
except KeyError:
- # use base type for codec lookup to recover untagged types
- baseTagSet = tag.TagSet(value.tagSet.baseTag, value.tagSet.baseTag)
- try:
- concreteEncoder = self.__tagMap[baseTagSet]
- except KeyError:
- raise error.PyAsn1Error('No encoder for %s' % (value,))
- if logger:
- logger('using value codec %s chosen by %s' % (concreteEncoder.__class__.__name__, tagSet))
- substrate = concreteEncoder.encode(
- self, value, defMode, maxChunkSize, ifNotEmpty=ifNotEmpty
- )
+ raise error.PyAsn1Error('No encoder for %s' % (value,))
+
+ if logger:
+ logger('using value codec %s chosen by %s' % (concreteEncoder.__class__.__name__, tagSet))
+
+ substrate = concreteEncoder.encode(value, self, **options)
+
if logger:
logger('codec %s built %s octets of substrate: %s\nencoder completed' % (concreteEncoder, len(substrate), debug.hexdump(substrate)))
return substrate