summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIlya Etingof <etingof@gmail.com>2019-07-06 14:04:53 +0200
committerGitHub <noreply@github.com>2019-07-06 14:04:53 +0200
commitb5e2eebe53736eb96f3baf5c17ae953261e09d6c (patch)
treedac2f96248bee477ef0a25a67fb34090bd1eca65
parentba302699d8fc791760829aa9a6b014563eedbf2c (diff)
downloadpyasn1-git-b5e2eebe53736eb96f3baf5c17ae953261e09d6c.tar.gz
Add `SET|SEQUENCE OF ANY` encoding support (#165)
For example: AttributeTypeAndValues ::= SEQUENCE { type OBJECT IDENTIFIER, values SET OF ANY DEFINED BY type } This patch adds support of the above ASN.1 syntax to BER/DER/CER codecs. It appears that to implement this feature properly, `SetOf`/`SequenceOf` pyasn1 types need to have `.componentType` wrapped into something similar to `NamedType` that `Set`/`Sequence` have. That additional layer would then carry the open type meta information. Without it, `Sequence`/`Set` codec needs to signal `SetOf`/`SequenceOf` codec of the open type being processed, which is a slight hack. A other inconvenience is that when `SetOf`/`SequenceOf` deal with an open type component, they should not verify types on component assignment. Without open type property in `SetOf`/`SequenceOf`, the code checks for `Any` component type which is another hack. The above shortcomings should be addressed in the follow up patch.
-rw-r--r--CHANGES.rst1
-rw-r--r--pyasn1/codec/ber/decoder.py56
-rw-r--r--pyasn1/codec/ber/encoder.py88
-rw-r--r--pyasn1/type/univ.py18
-rw-r--r--tests/codec/ber/test_decoder.py169
-rw-r--r--tests/codec/ber/test_encoder.py124
6 files changed, 410 insertions, 46 deletions
diff --git a/CHANGES.rst b/CHANGES.rst
index 448e372..26d4978 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -2,6 +2,7 @@
Revision 0.4.6, released XX-06-2019
-----------------------------------
+- Added previously missing `SET OF ANY` construct encoding/decoding support.
- Added `omitEmptyOptionals` option which is respected by `Sequence`
and `Set` encoders. When `omitEmptyOptionals` is set to `True`, empty
initialized optional components are not encoded. Default is `False`.
diff --git a/pyasn1/codec/ber/decoder.py b/pyasn1/codec/ber/decoder.py
index 8c556fc..655af04 100644
--- a/pyasn1/codec/ber/decoder.py
+++ b/pyasn1/codec/ber/decoder.py
@@ -671,12 +671,28 @@ class UniversalConstructedTypeDecoder(AbstractConstructedDecoder):
LOG('resolved open type %r by governing '
'value %r' % (openType, governingValue))
- component, rest = decodeFun(
- asn1Object.getComponentByPosition(idx).asOctets(),
- asn1Spec=openType
- )
+ containerValue = asn1Object.getComponentByPosition(idx)
+
+ if containerValue.typeId in (
+ univ.SetOf.typeId, univ.SequenceOf.typeId):
+
+ for pos, containerElement in enumerate(
+ containerValue):
+
+ component, rest = decodeFun(
+ containerValue[pos].asOctets(),
+ asn1Spec=openType
+ )
- asn1Object.setComponentByPosition(idx, component)
+ containerValue[pos] = component
+
+ else:
+ component, rest = decodeFun(
+ asn1Object.getComponentByPosition(idx).asOctets(),
+ asn1Spec=openType
+ )
+
+ asn1Object.setComponentByPosition(idx, component)
else:
asn1Object.verifySizeSpec()
@@ -799,7 +815,7 @@ class UniversalConstructedTypeDecoder(AbstractConstructedDecoder):
if not namedTypes.requiredComponents.issubset(seenIndices):
raise error.PyAsn1Error('ASN.1 object %s has uninitialized components' % asn1Object.__class__.__name__)
- if namedTypes.hasOpenTypes:
+ if namedTypes.hasOpenTypes:
openTypes = options.get('openTypes', {})
@@ -837,13 +853,29 @@ class UniversalConstructedTypeDecoder(AbstractConstructedDecoder):
LOG('resolved open type %r by governing '
'value %r' % (openType, governingValue))
- component, rest = decodeFun(
- asn1Object.getComponentByPosition(idx).asOctets(),
- asn1Spec=openType, allowEoo=True
- )
+ containerValue = asn1Object.getComponentByPosition(idx)
- if component is not eoo.endOfOctets:
- asn1Object.setComponentByPosition(idx, component)
+ if containerValue.typeId in (
+ univ.SetOf.typeId, univ.SequenceOf.typeId):
+
+ for pos, containerElement in enumerate(
+ containerValue):
+
+ component, rest = decodeFun(
+ containerValue[pos].asOctets(),
+ asn1Spec=openType, allowEoo=True
+ )
+
+ containerValue[pos] = component
+
+ else:
+ component, rest = decodeFun(
+ asn1Object.getComponentByPosition(idx).asOctets(),
+ asn1Spec=openType, allowEoo=True
+ )
+
+ if component is not eoo.endOfOctets:
+ asn1Object.setComponentByPosition(idx, component)
else:
asn1Object.verifySizeSpec()
diff --git a/pyasn1/codec/ber/encoder.py b/pyasn1/codec/ber/encoder.py
index 325ed46..b17fca8 100644
--- a/pyasn1/codec/ber/encoder.py
+++ b/pyasn1/codec/ber/encoder.py
@@ -89,6 +89,8 @@ class AbstractItemEncoder(object):
defMode = options.get('defMode', True)
+ substrate = null
+
for idx, singleTag in enumerate(tagSet.superTags):
defModeOverride = defMode
@@ -556,18 +558,32 @@ class SequenceEncoder(AbstractItemEncoder):
if omitEmptyOptionals:
options.update(ifNotEmpty=namedType.isOptional)
- chunk = encodeFun(component, asn1Spec, **options)
-
# wrap open type blob if needed
- if namedTypes and namedType.openType:
- wrapType = namedType.asn1Object
- if wrapType.tagSet and not wrapType.isSameTypeWith(component):
- chunk = encodeFun(chunk, wrapType, **options)
+ if (namedTypes and namedType.openType
+ and namedType.asn1Object.tagSet):
- if LOG:
- LOG('wrapped open type with wrap type %r' % (wrapType,))
+ if component.typeId in (
+ univ.SetOf.typeId, univ.SequenceOf.typeId):
+ substrate += encodeFun(
+ component, asn1Spec,
+ **dict(options, openType=True))
+
+ else:
+ chunk = encodeFun(component, asn1Spec, **options)
+
+ wrapType = namedType.asn1Object
- substrate += chunk
+ if wrapType.isSameTypeWith(component):
+ substrate += chunk
+
+ else:
+ substrate += encodeFun(chunk, wrapType, **options)
+
+ if LOG:
+ LOG('wrapped with wrap type %r' % (wrapType,))
+
+ else:
+ substrate += encodeFun(component, asn1Spec, **options)
else:
# bare Python value + ASN.1 schema
@@ -593,18 +609,31 @@ class SequenceEncoder(AbstractItemEncoder):
if omitEmptyOptionals:
options.update(ifNotEmpty=namedType.isOptional)
- chunk = encodeFun(component, asn1Spec[idx], **options)
-
# wrap open type blob if needed
- if namedType.openType:
- wrapType = namedType.asn1Object
- if wrapType.tagSet and not wrapType.isSameTypeWith(component):
- chunk = encodeFun(chunk, wrapType, **options)
+ if namedType.openType and namedType.asn1Object.tagSet:
- if LOG:
- LOG('wrapped open type with wrap type %r' % (wrapType,))
+ if component.typeId in (
+ univ.SetOf.typeId, univ.SequenceOf.typeId):
+ substrate += encodeFun(
+ component, asn1Spec[idx],
+ **dict(options, openType=True))
+
+ else:
+ chunk = encodeFun(component, asn1Spec[idx], **options)
+
+ wrapType = namedType.asn1Object
- substrate += chunk
+ if wrapType.isSameTypeWith(component):
+ substrate += chunk
+
+ else:
+ substrate += encodeFun(chunk, wrapType, **options)
+
+ if LOG:
+ LOG('wrapped with wrap type %r' % (wrapType,))
+
+ else:
+ substrate += encodeFun(component, asn1Spec[idx], **options)
return substrate, True, True
@@ -613,13 +642,32 @@ class SequenceOfEncoder(AbstractItemEncoder):
def encodeValue(self, value, asn1Spec, encodeFun, **options):
if asn1Spec is None:
value.verifySizeSpec()
+
+ wrapType = value.componentType
+
else:
- asn1Spec = asn1Spec.componentType
+ asn1Spec = wrapType = asn1Spec.componentType
+
+ openType = options.pop('openType', False)
substrate = null
for idx, component in enumerate(value):
- substrate += encodeFun(value[idx], asn1Spec, **options)
+ if openType:
+ # do not use asn1Spec even if given because it's a wrapper
+ chunk = encodeFun(component, **options)
+
+ if not wrapType.isSameTypeWith(component):
+ # wrap encoded value with wrapper container (e.g. ANY)
+ chunk = encodeFun(chunk, wrapType, **options)
+
+ if LOG:
+ LOG('wrapped with wrap type %r' % (wrapType,))
+
+ else:
+ chunk = encodeFun(component, asn1Spec, **options)
+
+ substrate += chunk
return substrate, True, True
diff --git a/pyasn1/type/univ.py b/pyasn1/type/univ.py
index 96623e7..5634f94 100644
--- a/pyasn1/type/univ.py
+++ b/pyasn1/type/univ.py
@@ -1854,13 +1854,19 @@ class SequenceOfAndSetOfBase(base.AbstractConstructedAsn1Item):
'Non-ASN.1 value %r and undefined component'
' type at %r' % (value, self))
- elif componentType is not None:
- if self.strictConstraints:
- if not componentType.isSameTypeWith(
- value, matchTags, matchConstraints):
+ elif componentType is not None and (matchTags or matchConstraints):
+ subtypeChecker = (
+ self.strictConstraints and
+ componentType.isSameTypeWith or
+ componentType.isSuperTypeOf)
+
+ if not subtypeChecker(value, matchTags, matchConstraints):
+ # TODO: we should wrap componentType with UnnamedType to carry
+ # additional properties associated with componentType
+ if componentType.typeId != Any.typeId:
raise error.PyAsn1Error(
- 'Component value is tag-incompatible: %r '
- 'vs %r' % (value, componentType))
+ 'Component value is tag-incompatible: %r vs '
+ '%r' % (value, componentType))
else:
if not componentType.isSuperTypeOf(
diff --git a/tests/codec/ber/test_decoder.py b/tests/codec/ber/test_decoder.py
index bff246e..95f6c91 100644
--- a/tests/codec/ber/test_decoder.py
+++ b/tests/codec/ber/test_decoder.py
@@ -972,6 +972,159 @@ class SequenceDecoderWithExplicitlyTaggedOpenTypesTestCase(BaseTestCase):
assert s[1] == univ.OctetString(hexValue='02010C')
+class SequenceDecoderWithUnaggedSetOfOpenTypesTestCase(BaseTestCase):
+ def setUp(self):
+ openType = opentype.OpenType(
+ 'id',
+ {1: univ.Integer(),
+ 2: univ.OctetString()}
+ )
+ self.s = univ.Sequence(
+ componentType=namedtype.NamedTypes(
+ namedtype.NamedType('id', univ.Integer()),
+ namedtype.NamedType('blob', univ.SetOf(componentType=univ.Any()),
+ openType=openType)
+ )
+ )
+
+ def testDecodeOpenTypesChoiceOne(self):
+ s, r = decoder.decode(
+ ints2octs((48, 8, 2, 1, 1, 49, 3, 2, 1, 12)), asn1Spec=self.s,
+ decodeOpenTypes=True
+ )
+ assert not r
+ assert s[0] == 1
+ assert s[1][0] == 12
+
+ def testDecodeOpenTypesChoiceTwo(self):
+ s, r = decoder.decode(
+ ints2octs((48, 18, 2, 1, 2, 49, 13, 4, 11, 113, 117, 105, 99,
+ 107, 32, 98, 114, 111, 119, 110)), asn1Spec=self.s,
+ decodeOpenTypes=True
+ )
+ assert not r
+ assert s[0] == 2
+ assert s[1][0] == univ.OctetString('quick brown')
+
+ def testDecodeOpenTypesUnknownType(self):
+ try:
+ s, r = decoder.decode(
+ ints2octs((48, 6, 2, 1, 2, 6, 1, 39)), asn1Spec=self.s,
+ decodeOpenTypes=True
+ )
+
+ except PyAsn1Error:
+ pass
+
+ else:
+ assert False, 'unknown open type tolerated'
+
+ def testDecodeOpenTypesUnknownId(self):
+ s, r = decoder.decode(
+ ints2octs((48, 8, 2, 1, 3, 49, 3, 2, 1, 12)), asn1Spec=self.s,
+ decodeOpenTypes=True
+ )
+ assert not r
+ assert s[0] == 3
+ assert s[1][0] == univ.OctetString(hexValue='02010c')
+
+ def testDontDecodeOpenTypesChoiceOne(self):
+ s, r = decoder.decode(
+ ints2octs((48, 8, 2, 1, 1, 49, 3, 2, 1, 12)), asn1Spec=self.s
+ )
+ assert not r
+ assert s[0] == 1
+ assert s[1][0] == ints2octs((2, 1, 12))
+
+ def testDontDecodeOpenTypesChoiceTwo(self):
+ s, r = decoder.decode(
+ ints2octs((48, 18, 2, 1, 2, 49, 13, 4, 11, 113, 117, 105, 99,
+ 107, 32, 98, 114, 111, 119, 110)), asn1Spec=self.s
+ )
+ assert not r
+ assert s[0] == 2
+ assert s[1][0] == ints2octs((4, 11, 113, 117, 105, 99, 107, 32, 98, 114,
+ 111, 119, 110))
+
+
+class SequenceDecoderWithImplicitlyTaggedSetOfOpenTypesTestCase(BaseTestCase):
+ def setUp(self):
+ openType = opentype.OpenType(
+ 'id',
+ {1: univ.Integer(),
+ 2: univ.OctetString()}
+ )
+ self.s = univ.Sequence(
+ componentType=namedtype.NamedTypes(
+ namedtype.NamedType('id', univ.Integer()),
+ namedtype.NamedType(
+ 'blob', univ.SetOf(
+ componentType=univ.Any().subtype(
+ implicitTag=tag.Tag(
+ tag.tagClassContext, tag.tagFormatSimple, 3))),
+ openType=openType
+ )
+ )
+ )
+
+ def testDecodeOpenTypesChoiceOne(self):
+ s, r = decoder.decode(
+ ints2octs((48, 10, 2, 1, 1, 49, 5, 131, 3, 2, 1, 12)),
+ asn1Spec=self.s, decodeOpenTypes=True
+ )
+ assert not r
+ assert s[0] == 1
+ assert s[1][0] == 12
+
+ def testDecodeOpenTypesUnknownId(self):
+ s, r = decoder.decode(
+ ints2octs((48, 10, 2, 1, 3, 49, 5, 131, 3, 2, 1, 12)),
+ asn1Spec=self.s, decodeOpenTypes=True
+ )
+ assert not r
+ assert s[0] == 3
+ assert s[1][0] == univ.OctetString(hexValue='02010C')
+
+
+class SequenceDecoderWithExplicitlyTaggedSetOfOpenTypesTestCase(BaseTestCase):
+ def setUp(self):
+ openType = opentype.OpenType(
+ 'id',
+ {1: univ.Integer(),
+ 2: univ.OctetString()}
+ )
+ self.s = univ.Sequence(
+ componentType=namedtype.NamedTypes(
+ namedtype.NamedType('id', univ.Integer()),
+ namedtype.NamedType(
+ 'blob', univ.SetOf(
+ componentType=univ.Any().subtype(
+ explicitTag=tag.Tag(
+ tag.tagClassContext, tag.tagFormatSimple, 3))),
+ openType=openType
+ )
+ )
+ )
+
+ def testDecodeOpenTypesChoiceOne(self):
+ s, r = decoder.decode(
+ ints2octs((48, 10, 2, 1, 1, 49, 5, 131, 3, 2, 1, 12)),
+ asn1Spec=self.s, decodeOpenTypes=True
+ )
+ assert not r
+ assert s[0] == 1
+ assert s[1][0] == 12
+
+ def testDecodeOpenTypesUnknownId(self):
+ s, r = decoder.decode(
+ ints2octs( (48, 10, 2, 1, 3, 49, 5, 131, 3, 2, 1, 12)),
+ asn1Spec=self.s, decodeOpenTypes=True
+ )
+ assert not r
+ assert s[0] == 3
+ assert s[1][0] == univ.OctetString(hexValue='02010C')
+
+
class SetDecoderTestCase(BaseTestCase):
def setUp(self):
BaseTestCase.setUp(self)
@@ -1439,8 +1592,8 @@ class ErrorOnDecodingTestCase(BaseTestCase):
except PyAsn1Error:
exc = sys.exc_info()[1]
- assert (isinstance(exc, PyAsn1Error),
- 'Unexpected exception raised %r' % (exc,))
+ assert isinstance(exc, PyAsn1Error), (
+ 'Unexpected exception raised %r' % (exc,))
else:
assert False, 'Unexpected decoder result %r' % (asn1Object,)
@@ -1453,12 +1606,12 @@ class ErrorOnDecodingTestCase(BaseTestCase):
asn1Object, rest = decode(ints2octs(
(31, 8, 2, 1, 1, 131, 3, 2, 1, 12)))
- assert (isinstance(asn1Object, univ.Any),
- 'Unexpected raw dump type %r' % (asn1Object,))
- assert (asn1Object.asNumbers() == (31, 8, 2, 1, 1),
- 'Unexpected raw dump value %r' % (asn1Object,))
- assert (rest == ints2octs((131, 3, 2, 1, 12)),
- 'Unexpected rest of substrate after raw dump %r' % rest)
+ assert isinstance(asn1Object, univ.Any), (
+ 'Unexpected raw dump type %r' % (asn1Object,))
+ assert asn1Object.asNumbers() == (31, 8, 2, 1, 1), (
+ 'Unexpected raw dump value %r' % (asn1Object,))
+ assert rest == ints2octs((131, 3, 2, 1, 12)), (
+ 'Unexpected rest of substrate after raw dump %r' % rest)
suite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
diff --git a/tests/codec/ber/test_encoder.py b/tests/codec/ber/test_encoder.py
index da14830..26236af 100644
--- a/tests/codec/ber/test_encoder.py
+++ b/tests/codec/ber/test_encoder.py
@@ -854,6 +854,130 @@ class SequenceEncoderWithExplicitlyTaggedOpenTypesTestCase(BaseTestCase):
)
+class SequenceEncoderWithUntaggedSetOfOpenTypesTestCase(BaseTestCase):
+ def setUp(self):
+ BaseTestCase.setUp(self)
+
+ openType = opentype.OpenType(
+ 'id',
+ {1: univ.Integer(),
+ 2: univ.OctetString()}
+ )
+ self.s = univ.Sequence(
+ componentType=namedtype.NamedTypes(
+ namedtype.NamedType('id', univ.Integer()),
+ namedtype.NamedType('blob', univ.SetOf(
+ componentType=univ.Any()), openType=openType)
+ )
+ )
+
+ def testEncodeOpenTypeChoiceOne(self):
+ self.s.clear()
+
+ self.s[0] = 1
+ self.s[1].append(univ.Integer(12))
+
+ assert encoder.encode(self.s, asn1Spec=self.s) == ints2octs(
+ (48, 8, 2, 1, 1, 49, 3, 2, 1, 12)
+ )
+
+ def testEncodeOpenTypeChoiceTwo(self):
+ self.s.clear()
+
+ self.s[0] = 2
+ self.s[1].append(univ.OctetString('quick brown'))
+
+ assert encoder.encode(self.s, asn1Spec=self.s) == ints2octs(
+ (48, 18, 2, 1, 2, 49, 13, 4, 11, 113, 117, 105, 99,
+ 107, 32, 98, 114, 111, 119, 110)
+ )
+
+ def testEncodeOpenTypeUnknownId(self):
+ self.s.clear()
+
+ self.s[0] = 2
+ self.s[1].append(univ.ObjectIdentifier('1.3.6'))
+
+ try:
+ encoder.encode(self.s, asn1Spec=self.s)
+
+ except PyAsn1Error:
+ assert False, 'incompatible open type tolerated'
+
+ def testEncodeOpenTypeIncompatibleType(self):
+ self.s.clear()
+
+ self.s[0] = 2
+ self.s[1].append(univ.ObjectIdentifier('1.3.6'))
+
+ try:
+ encoder.encode(self.s, asn1Spec=self.s)
+
+ except PyAsn1Error:
+ assert False, 'incompatible open type tolerated'
+
+
+class SequenceEncoderWithImplicitlyTaggedSetOfOpenTypesTestCase(BaseTestCase):
+ def setUp(self):
+ BaseTestCase.setUp(self)
+
+ openType = opentype.OpenType(
+ 'id',
+ {1: univ.Integer(),
+ 2: univ.OctetString()}
+ )
+ self.s = univ.Sequence(
+ componentType=namedtype.NamedTypes(
+ namedtype.NamedType('id', univ.Integer()),
+ namedtype.NamedType('blob', univ.SetOf(
+ componentType=univ.Any().subtype(
+ implicitTag=tag.Tag(
+ tag.tagClassContext, tag.tagFormatSimple, 3))),
+ openType=openType)
+ )
+ )
+
+ def testEncodeOpenTypeChoiceOne(self):
+ self.s.clear()
+
+ self.s[0] = 1
+ self.s[1].append(univ.Integer(12))
+
+ assert encoder.encode(self.s, asn1Spec=self.s) == ints2octs(
+ (48, 10, 2, 1, 1, 49, 5, 131, 3, 2, 1, 12)
+ )
+
+
+class SequenceEncoderWithExplicitlyTaggedSetOfOpenTypesTestCase(BaseTestCase):
+ def setUp(self):
+ BaseTestCase.setUp(self)
+
+ openType = opentype.OpenType(
+ 'id',
+ {1: univ.Integer(),
+ 2: univ.OctetString()}
+ )
+ self.s = univ.Sequence(
+ componentType=namedtype.NamedTypes(
+ namedtype.NamedType('id', univ.Integer()),
+ namedtype.NamedType('blob', univ.SetOf(
+ componentType=univ.Any().subtype(
+ explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 3))),
+ openType=openType)
+ )
+ )
+
+ def testEncodeOpenTypeChoiceOne(self):
+ self.s.clear()
+
+ self.s[0] = 1
+ self.s[1].append(univ.Integer(12))
+
+ assert encoder.encode(self.s, asn1Spec=self.s) == ints2octs(
+ (48, 10, 2, 1, 1, 49, 5, 163, 3, 2, 1, 12)
+ )
+
+
class SequenceEncoderWithComponentsSchemaTestCase(BaseTestCase):
def setUp(self):
BaseTestCase.setUp(self)