diff options
Diffstat (limited to 'pyasn1/codec/ber/encoder.py')
-rw-r--r-- | pyasn1/codec/ber/encoder.py | 190 |
1 files changed, 111 insertions, 79 deletions
diff --git a/pyasn1/codec/ber/encoder.py b/pyasn1/codec/ber/encoder.py index 10527ae..2c5ba14 100644 --- a/pyasn1/codec/ber/encoder.py +++ b/pyasn1/codec/ber/encoder.py @@ -4,7 +4,7 @@ # Copyright (c) 2005-2017, Ilya Etingof <etingof@gmail.com> # License: http://pyasn1.sf.net/license.html # -from pyasn1.type import base, tag, univ, char, useful +from pyasn1.type import tag, univ, char, useful from pyasn1.codec.ber import eoo from pyasn1.compat.octets import int2oct, oct2int, ints2octs, null, str2octs from pyasn1.compat.integer import to_bytes @@ -23,9 +23,9 @@ class AbstractItemEncoder(object): if isConstructed: encodedTag |= tag.tagFormatConstructed if tagId < 31: - return (encodedTag | tagId,) + return encodedTag | tagId, else: - substrate = (tagId & 0x7f,) + substrate = tagId & 0x7f, tagId >>= 7 while tagId: substrate = (0x80 | (tagId & 0x7f),) + substrate @@ -36,7 +36,7 @@ class AbstractItemEncoder(object): if not defMode and self.supportIndefLenMode: return (0x80,) if length < 0x80: - return (length,) + return length, else: substrate = () while length: @@ -47,65 +47,62 @@ class AbstractItemEncoder(object): raise error.PyAsn1Error('Length octets overflow (%d)' % substrateLen) return (0x80 | substrateLen,) + substrate - def encodeValue(self, encodeFun, value, defMode, maxChunkSize, ifNotEmpty=False): + def encodeValue(self, value, encodeFun, **options): raise error.PyAsn1Error('Not implemented') - def _encodeEndOfOctets(self, encodeFun, defMode): - if defMode or not self.supportIndefLenMode: - return null - else: - return encodeFun(eoo.endOfOctets, defMode) + def encode(self, value, encodeFun, **options): - def encode(self, encodeFun, value, defMode, maxChunkSize, ifNotEmpty=False): - substrate, isConstructed, isOctets = self.encodeValue( - encodeFun, value, defMode, maxChunkSize, ifNotEmpty=ifNotEmpty - ) + tagSet = value.tagSet - if ifNotEmpty and not substrate: + # untagged item? + if not tagSet: + substrate, isConstructed, isOctets = self.encodeValue( + value, encodeFun, **options + ) return substrate - tagSet = value.tagSet + defMode = options.get('defMode', True) + + for idx, singleTag in enumerate(tagSet.superTags): - # tagged value? - if tagSet: - if not isConstructed: # primitive form implies definite mode - defMode = True - header = self.encodeTag(tagSet[-1], isConstructed) - header += self.encodeLength(len(substrate), defMode) + defModeOverride = defMode + + # base tag? + if not idx: + substrate, isConstructed, isOctets = self.encodeValue( + value, encodeFun, **options + ) + + if options.get('ifNotEmpty', False) and not substrate: + return substrate + + # primitive form implies definite mode + if not isConstructed: + defModeOverride = True + + header = self.encodeTag(singleTag, isConstructed) + header += self.encodeLength(len(substrate), defModeOverride) if isOctets: substrate = ints2octs(header) + substrate else: substrate = ints2octs(header + substrate) - eoo = self._encodeEndOfOctets(encodeFun, defMode) - if eoo: - substrate += eoo + if not defModeOverride: + substrate += encodeFun(eoo.endOfOctets, defMode=defModeOverride) return substrate class EndOfOctetsEncoder(AbstractItemEncoder): - def encodeValue(self, encodeFun, value, defMode, maxChunkSize, ifNotEmpty=False): + def encodeValue(self, value, encodeFun, **options): return null, False, True -class ExplicitlyTaggedItemEncoder(AbstractItemEncoder): - def encodeValue(self, encodeFun, value, defMode, maxChunkSize, ifNotEmpty=False): - if isinstance(value, base.AbstractConstructedAsn1Item): - value = value.clone(tagSet=value.tagSet[:-1], cloneValueFlag=True) - else: - value = value.clone(tagSet=value.tagSet[:-1]) - return encodeFun(value, defMode, maxChunkSize, ifNotEmpty=ifNotEmpty), True, True - - -explicitlyTaggedItemEncoder = ExplicitlyTaggedItemEncoder() - - class BooleanEncoder(AbstractItemEncoder): supportIndefLenMode = False - def encodeValue(self, encodeFun, value, defMode, maxChunkSize, ifNotEmpty=False): + def encodeValue(self, value, encodeFun, **options): return value and (1,) or (0,), False, False @@ -113,7 +110,7 @@ class IntegerEncoder(AbstractItemEncoder): supportIndefLenMode = False supportCompactZero = False - def encodeValue(self, encodeFun, value, defMode, maxChunkSize, ifNotEmpty=False): + def encodeValue(self, value, encodeFun, **options): if value == 0: # de-facto way to encode zero if self.supportCompactZero: @@ -125,39 +122,51 @@ class IntegerEncoder(AbstractItemEncoder): class BitStringEncoder(AbstractItemEncoder): - def encodeValue(self, encodeFun, value, defMode, maxChunkSize, ifNotEmpty=False): + def encodeValue(self, value, encodeFun, **options): valueLength = len(value) if valueLength % 8: alignedValue = value << (8 - valueLength % 8) else: alignedValue = value + maxChunkSize = options.get('maxChunkSize', 0) if not maxChunkSize or len(alignedValue) <= maxChunkSize * 8: substrate = alignedValue.asOctets() return int2oct(len(substrate) * 8 - valueLength) + substrate, False, True + # strip off explicit tags + alignedValue = alignedValue.clone( + tagSet=tag.TagSet(value.tagSet.baseTag, value.tagSet.baseTag) + ) + stop = 0 substrate = null while stop < valueLength: start = stop stop = min(start + maxChunkSize * 8, valueLength) - substrate += encodeFun(alignedValue[start:stop], defMode, maxChunkSize, ifNotEmpty=ifNotEmpty) + substrate += encodeFun(alignedValue[start:stop], **options) return substrate, True, True class OctetStringEncoder(AbstractItemEncoder): - def encodeValue(self, encodeFun, value, defMode, maxChunkSize, ifNotEmpty=False): + def encodeValue(self, value, encodeFun, **options): + maxChunkSize = options.get('maxChunkSize', 0) if not maxChunkSize or len(value) <= maxChunkSize: return value.asOctets(), False, True + else: + # will strip off explicit tags + baseTagSet = tag.TagSet(value.tagSet.baseTag, value.tagSet.baseTag) + pos = 0 substrate = null while True: - v = value.clone(value[pos:pos + maxChunkSize]) - if not v: + chunk = value.clone(value[pos:pos + maxChunkSize], + tagSet=baseTagSet) + if not chunk: break - substrate += encodeFun(v, defMode, maxChunkSize, ifNotEmpty=ifNotEmpty) + substrate += encodeFun(chunk, **options) pos += maxChunkSize return substrate, True, True @@ -166,14 +175,14 @@ class OctetStringEncoder(AbstractItemEncoder): class NullEncoder(AbstractItemEncoder): supportIndefLenMode = False - def encodeValue(self, encodeFun, value, defMode, maxChunkSize, ifNotEmpty=False): + def encodeValue(self, value, encodeFun, **options): return null, False, True class ObjectIdentifierEncoder(AbstractItemEncoder): supportIndefLenMode = False - def encodeValue(self, encodeFun, value, defMode, maxChunkSize, ifNotEmpty=False): + def encodeValue(self, value, encodeFun, **options): oid = value.asTuple() # Build the first pair @@ -271,7 +280,7 @@ class RealEncoder(AbstractItemEncoder): encbase = encBase[i] return sign, m, encbase, e - def encodeValue(self, encodeFun, value, defMode, maxChunkSize, ifNotEmpty=False): + def encodeValue(self, value, encodeFun, **options): if value.isPlusInf: return (0x40,), False, False if value.isMinusInf: @@ -342,41 +351,54 @@ class RealEncoder(AbstractItemEncoder): class SequenceEncoder(AbstractItemEncoder): - def encodeValue(self, encodeFun, value, defMode, maxChunkSize, ifNotEmpty=False): + def encodeValue(self, value, encodeFun, **options): value.verifySizeSpec() + namedTypes = value.componentType substrate = null + idx = len(value) while idx > 0: idx -= 1 if namedTypes: - if namedTypes[idx].isOptional and not value[idx].isValue: + namedType = namedTypes[idx] + if namedType.isOptional and not value[idx].isValue: continue - if namedTypes[idx].isDefaulted and value[idx] == namedTypes[idx].asn1Object: + if namedType.isDefaulted and value[idx] == namedType.asn1Object: continue - substrate = encodeFun(value[idx], defMode, maxChunkSize) + substrate + + chunk = encodeFun(value[idx], **options) + + # wrap open type blob if needed + if namedTypes and namedType.openType: + asn1Spec = namedType.asn1Object + if asn1Spec.tagSet and not asn1Spec.isSameTypeWith(value[idx]): + chunk = encodeFun(asn1Spec.clone(chunk), **options) + + substrate = chunk + substrate + return substrate, True, True class SequenceOfEncoder(AbstractItemEncoder): - def encodeValue(self, encodeFun, value, defMode, maxChunkSize, ifNotEmpty=False): + def encodeValue(self, value, encodeFun, **options): value.verifySizeSpec() substrate = null idx = len(value) while idx > 0: idx -= 1 - substrate = encodeFun(value[idx], defMode, maxChunkSize, ifNotEmpty=False) + substrate + substrate = encodeFun(value[idx], **options) + substrate return substrate, True, True class ChoiceEncoder(AbstractItemEncoder): - def encodeValue(self, encodeFun, value, defMode, maxChunkSize, ifNotEmpty=False): - return encodeFun(value.getComponent(), defMode, maxChunkSize, ifNotEmpty=False), True, True + def encodeValue(self, value, encodeFun, **options): + return encodeFun(value.getComponent(), **options), True, True class AnyEncoder(OctetStringEncoder): - def encodeValue(self, encodeFun, value, defMode, maxChunkSize, ifNotEmpty=False): - return value.asOctets(), defMode == False, True + def encodeValue(self, value, encodeFun, **options): + return value.asOctets(), not options.get('defMode', True), True tagMap = { @@ -448,40 +470,50 @@ typeMap = { class Encoder(object): - supportIndefLength = True + fixedDefLengthMode = None + fixedChunkSize = None # noinspection PyDefaultArgument def __init__(self, tagMap, typeMap={}): self.__tagMap = tagMap self.__typeMap = typeMap - def __call__(self, value, defMode=True, maxChunkSize=0, ifNotEmpty=False): - if not defMode and not self.supportIndefLength: - raise error.PyAsn1Error('Indefinite length encoding not supported by this codec') + def __call__(self, value, **options): + if debug.logger & debug.flagEncoder: logger = debug.logger else: logger = None + if logger: - logger('encoder called in %sdef mode, chunk size %s for type %s, value:\n%s' % (not defMode and 'in' or '', maxChunkSize, value.prettyPrintType(), value.prettyPrint())) + logger('encoder called in %sdef mode, chunk size %s for type %s, value:\n%s' % (not options.get('defMode', True) and 'in' or '', options.get('maxChunkSize', 0), value.prettyPrintType(), value.prettyPrint())) + + if self.fixedDefLengthMode is not None: + options.update(defMode=self.fixedDefLengthMode) + + if self.fixedChunkSize is not None: + options.update(maxChunkSize=self.fixedChunkSize) + tagSet = value.tagSet - if len(tagSet) > 1: - concreteEncoder = explicitlyTaggedItemEncoder - else: + + try: + concreteEncoder = self.__typeMap[value.typeId] + + except KeyError: + # use base type for codec lookup to recover untagged types + baseTagSet = tag.TagSet(value.tagSet.baseTag, value.tagSet.baseTag) + try: - concreteEncoder = self.__typeMap[value.typeId] + concreteEncoder = self.__tagMap[baseTagSet] + except KeyError: - # use base type for codec lookup to recover untagged types - baseTagSet = tag.TagSet(value.tagSet.baseTag, value.tagSet.baseTag) - try: - concreteEncoder = self.__tagMap[baseTagSet] - except KeyError: - raise error.PyAsn1Error('No encoder for %s' % (value,)) - if logger: - logger('using value codec %s chosen by %s' % (concreteEncoder.__class__.__name__, tagSet)) - substrate = concreteEncoder.encode( - self, value, defMode, maxChunkSize, ifNotEmpty=ifNotEmpty - ) + raise error.PyAsn1Error('No encoder for %s' % (value,)) + + if logger: + logger('using value codec %s chosen by %s' % (concreteEncoder.__class__.__name__, tagSet)) + + substrate = concreteEncoder.encode(value, self, **options) + if logger: logger('codec %s built %s octets of substrate: %s\nencoder completed' % (concreteEncoder, len(substrate), debug.hexdump(substrate))) return substrate |