From d0f45047b8e3525cfb5064192ab8f9159a085b0d Mon Sep 17 00:00:00 2001 From: Ilya Etingof Date: Fri, 5 Jul 2019 18:36:14 +0200 Subject: Add `SET|SEQUENCE OF ANY` encoding support For example: AttributeTypeAndValues ::= SEQUENCE { type OBJECT IDENTIFIER, values SET OF ANY DEFINED BY type } This patch adds support of the above ASN.1 syntax to BER/DER/CER codecs. It appears that to implement this feature properly, `SetOf`/`SequenceOf` pyasn1 types need to have `.componentType` wrapped into something similar to `NamedType` that `Set`/`Sequence` have. That additional layer would then carry the open type meta information. Without it, `Sequence`/`Set` codec needs to signal `SetOf`/`SequenceOf` codec of the open type being processed, which is a slight hack. A other inconvenience is that when `SetOf`/`SequenceOf` deal with an open type component, they should not verify types on component assignment. Without open type property in `SetOf`/`SequenceOf`, the code checks for `Any` component type which is another hack. The above shortcomings should be addressed in the follow up patch. --- CHANGES.rst | 1 + pyasn1/codec/ber/decoder.py | 56 ++++++++++--- pyasn1/codec/ber/encoder.py | 88 ++++++++++++++++----- pyasn1/type/univ.py | 18 +++-- tests/codec/ber/test_decoder.py | 169 ++++++++++++++++++++++++++++++++++++++-- tests/codec/ber/test_encoder.py | 124 +++++++++++++++++++++++++++++ 6 files changed, 410 insertions(+), 46 deletions(-) diff --git a/CHANGES.rst b/CHANGES.rst index 448e372..26d4978 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -2,6 +2,7 @@ Revision 0.4.6, released XX-06-2019 ----------------------------------- +- Added previously missing `SET OF ANY` construct encoding/decoding support. - Added `omitEmptyOptionals` option which is respected by `Sequence` and `Set` encoders. When `omitEmptyOptionals` is set to `True`, empty initialized optional components are not encoded. Default is `False`. diff --git a/pyasn1/codec/ber/decoder.py b/pyasn1/codec/ber/decoder.py index 8c556fc..655af04 100644 --- a/pyasn1/codec/ber/decoder.py +++ b/pyasn1/codec/ber/decoder.py @@ -671,12 +671,28 @@ class UniversalConstructedTypeDecoder(AbstractConstructedDecoder): LOG('resolved open type %r by governing ' 'value %r' % (openType, governingValue)) - component, rest = decodeFun( - asn1Object.getComponentByPosition(idx).asOctets(), - asn1Spec=openType - ) + containerValue = asn1Object.getComponentByPosition(idx) + + if containerValue.typeId in ( + univ.SetOf.typeId, univ.SequenceOf.typeId): + + for pos, containerElement in enumerate( + containerValue): + + component, rest = decodeFun( + containerValue[pos].asOctets(), + asn1Spec=openType + ) - asn1Object.setComponentByPosition(idx, component) + containerValue[pos] = component + + else: + component, rest = decodeFun( + asn1Object.getComponentByPosition(idx).asOctets(), + asn1Spec=openType + ) + + asn1Object.setComponentByPosition(idx, component) else: asn1Object.verifySizeSpec() @@ -799,7 +815,7 @@ class UniversalConstructedTypeDecoder(AbstractConstructedDecoder): if not namedTypes.requiredComponents.issubset(seenIndices): raise error.PyAsn1Error('ASN.1 object %s has uninitialized components' % asn1Object.__class__.__name__) - if namedTypes.hasOpenTypes: + if namedTypes.hasOpenTypes: openTypes = options.get('openTypes', {}) @@ -837,13 +853,29 @@ class UniversalConstructedTypeDecoder(AbstractConstructedDecoder): LOG('resolved open type %r by governing ' 'value %r' % (openType, governingValue)) - component, rest = decodeFun( - asn1Object.getComponentByPosition(idx).asOctets(), - asn1Spec=openType, allowEoo=True - ) + containerValue = asn1Object.getComponentByPosition(idx) - if component is not eoo.endOfOctets: - asn1Object.setComponentByPosition(idx, component) + if containerValue.typeId in ( + univ.SetOf.typeId, univ.SequenceOf.typeId): + + for pos, containerElement in enumerate( + containerValue): + + component, rest = decodeFun( + containerValue[pos].asOctets(), + asn1Spec=openType, allowEoo=True + ) + + containerValue[pos] = component + + else: + component, rest = decodeFun( + asn1Object.getComponentByPosition(idx).asOctets(), + asn1Spec=openType, allowEoo=True + ) + + if component is not eoo.endOfOctets: + asn1Object.setComponentByPosition(idx, component) else: asn1Object.verifySizeSpec() diff --git a/pyasn1/codec/ber/encoder.py b/pyasn1/codec/ber/encoder.py index 325ed46..b17fca8 100644 --- a/pyasn1/codec/ber/encoder.py +++ b/pyasn1/codec/ber/encoder.py @@ -89,6 +89,8 @@ class AbstractItemEncoder(object): defMode = options.get('defMode', True) + substrate = null + for idx, singleTag in enumerate(tagSet.superTags): defModeOverride = defMode @@ -556,18 +558,32 @@ class SequenceEncoder(AbstractItemEncoder): if omitEmptyOptionals: options.update(ifNotEmpty=namedType.isOptional) - chunk = encodeFun(component, asn1Spec, **options) - # wrap open type blob if needed - if namedTypes and namedType.openType: - wrapType = namedType.asn1Object - if wrapType.tagSet and not wrapType.isSameTypeWith(component): - chunk = encodeFun(chunk, wrapType, **options) + if (namedTypes and namedType.openType + and namedType.asn1Object.tagSet): - if LOG: - LOG('wrapped open type with wrap type %r' % (wrapType,)) + if component.typeId in ( + univ.SetOf.typeId, univ.SequenceOf.typeId): + substrate += encodeFun( + component, asn1Spec, + **dict(options, openType=True)) + + else: + chunk = encodeFun(component, asn1Spec, **options) + + wrapType = namedType.asn1Object - substrate += chunk + if wrapType.isSameTypeWith(component): + substrate += chunk + + else: + substrate += encodeFun(chunk, wrapType, **options) + + if LOG: + LOG('wrapped with wrap type %r' % (wrapType,)) + + else: + substrate += encodeFun(component, asn1Spec, **options) else: # bare Python value + ASN.1 schema @@ -593,18 +609,31 @@ class SequenceEncoder(AbstractItemEncoder): if omitEmptyOptionals: options.update(ifNotEmpty=namedType.isOptional) - chunk = encodeFun(component, asn1Spec[idx], **options) - # wrap open type blob if needed - if namedType.openType: - wrapType = namedType.asn1Object - if wrapType.tagSet and not wrapType.isSameTypeWith(component): - chunk = encodeFun(chunk, wrapType, **options) + if namedType.openType and namedType.asn1Object.tagSet: - if LOG: - LOG('wrapped open type with wrap type %r' % (wrapType,)) + if component.typeId in ( + univ.SetOf.typeId, univ.SequenceOf.typeId): + substrate += encodeFun( + component, asn1Spec[idx], + **dict(options, openType=True)) + + else: + chunk = encodeFun(component, asn1Spec[idx], **options) + + wrapType = namedType.asn1Object - substrate += chunk + if wrapType.isSameTypeWith(component): + substrate += chunk + + else: + substrate += encodeFun(chunk, wrapType, **options) + + if LOG: + LOG('wrapped with wrap type %r' % (wrapType,)) + + else: + substrate += encodeFun(component, asn1Spec[idx], **options) return substrate, True, True @@ -613,13 +642,32 @@ class SequenceOfEncoder(AbstractItemEncoder): def encodeValue(self, value, asn1Spec, encodeFun, **options): if asn1Spec is None: value.verifySizeSpec() + + wrapType = value.componentType + else: - asn1Spec = asn1Spec.componentType + asn1Spec = wrapType = asn1Spec.componentType + + openType = options.pop('openType', False) substrate = null for idx, component in enumerate(value): - substrate += encodeFun(value[idx], asn1Spec, **options) + if openType: + # do not use asn1Spec even if given because it's a wrapper + chunk = encodeFun(component, **options) + + if not wrapType.isSameTypeWith(component): + # wrap encoded value with wrapper container (e.g. ANY) + chunk = encodeFun(chunk, wrapType, **options) + + if LOG: + LOG('wrapped with wrap type %r' % (wrapType,)) + + else: + chunk = encodeFun(component, asn1Spec, **options) + + substrate += chunk return substrate, True, True diff --git a/pyasn1/type/univ.py b/pyasn1/type/univ.py index 96623e7..5634f94 100644 --- a/pyasn1/type/univ.py +++ b/pyasn1/type/univ.py @@ -1854,13 +1854,19 @@ class SequenceOfAndSetOfBase(base.AbstractConstructedAsn1Item): 'Non-ASN.1 value %r and undefined component' ' type at %r' % (value, self)) - elif componentType is not None: - if self.strictConstraints: - if not componentType.isSameTypeWith( - value, matchTags, matchConstraints): + elif componentType is not None and (matchTags or matchConstraints): + subtypeChecker = ( + self.strictConstraints and + componentType.isSameTypeWith or + componentType.isSuperTypeOf) + + if not subtypeChecker(value, matchTags, matchConstraints): + # TODO: we should wrap componentType with UnnamedType to carry + # additional properties associated with componentType + if componentType.typeId != Any.typeId: raise error.PyAsn1Error( - 'Component value is tag-incompatible: %r ' - 'vs %r' % (value, componentType)) + 'Component value is tag-incompatible: %r vs ' + '%r' % (value, componentType)) else: if not componentType.isSuperTypeOf( diff --git a/tests/codec/ber/test_decoder.py b/tests/codec/ber/test_decoder.py index bff246e..95f6c91 100644 --- a/tests/codec/ber/test_decoder.py +++ b/tests/codec/ber/test_decoder.py @@ -972,6 +972,159 @@ class SequenceDecoderWithExplicitlyTaggedOpenTypesTestCase(BaseTestCase): assert s[1] == univ.OctetString(hexValue='02010C') +class SequenceDecoderWithUnaggedSetOfOpenTypesTestCase(BaseTestCase): + def setUp(self): + openType = opentype.OpenType( + 'id', + {1: univ.Integer(), + 2: univ.OctetString()} + ) + self.s = univ.Sequence( + componentType=namedtype.NamedTypes( + namedtype.NamedType('id', univ.Integer()), + namedtype.NamedType('blob', univ.SetOf(componentType=univ.Any()), + openType=openType) + ) + ) + + def testDecodeOpenTypesChoiceOne(self): + s, r = decoder.decode( + ints2octs((48, 8, 2, 1, 1, 49, 3, 2, 1, 12)), asn1Spec=self.s, + decodeOpenTypes=True + ) + assert not r + assert s[0] == 1 + assert s[1][0] == 12 + + def testDecodeOpenTypesChoiceTwo(self): + s, r = decoder.decode( + ints2octs((48, 18, 2, 1, 2, 49, 13, 4, 11, 113, 117, 105, 99, + 107, 32, 98, 114, 111, 119, 110)), asn1Spec=self.s, + decodeOpenTypes=True + ) + assert not r + assert s[0] == 2 + assert s[1][0] == univ.OctetString('quick brown') + + def testDecodeOpenTypesUnknownType(self): + try: + s, r = decoder.decode( + ints2octs((48, 6, 2, 1, 2, 6, 1, 39)), asn1Spec=self.s, + decodeOpenTypes=True + ) + + except PyAsn1Error: + pass + + else: + assert False, 'unknown open type tolerated' + + def testDecodeOpenTypesUnknownId(self): + s, r = decoder.decode( + ints2octs((48, 8, 2, 1, 3, 49, 3, 2, 1, 12)), asn1Spec=self.s, + decodeOpenTypes=True + ) + assert not r + assert s[0] == 3 + assert s[1][0] == univ.OctetString(hexValue='02010c') + + def testDontDecodeOpenTypesChoiceOne(self): + s, r = decoder.decode( + ints2octs((48, 8, 2, 1, 1, 49, 3, 2, 1, 12)), asn1Spec=self.s + ) + assert not r + assert s[0] == 1 + assert s[1][0] == ints2octs((2, 1, 12)) + + def testDontDecodeOpenTypesChoiceTwo(self): + s, r = decoder.decode( + ints2octs((48, 18, 2, 1, 2, 49, 13, 4, 11, 113, 117, 105, 99, + 107, 32, 98, 114, 111, 119, 110)), asn1Spec=self.s + ) + assert not r + assert s[0] == 2 + assert s[1][0] == ints2octs((4, 11, 113, 117, 105, 99, 107, 32, 98, 114, + 111, 119, 110)) + + +class SequenceDecoderWithImplicitlyTaggedSetOfOpenTypesTestCase(BaseTestCase): + def setUp(self): + openType = opentype.OpenType( + 'id', + {1: univ.Integer(), + 2: univ.OctetString()} + ) + self.s = univ.Sequence( + componentType=namedtype.NamedTypes( + namedtype.NamedType('id', univ.Integer()), + namedtype.NamedType( + 'blob', univ.SetOf( + componentType=univ.Any().subtype( + implicitTag=tag.Tag( + tag.tagClassContext, tag.tagFormatSimple, 3))), + openType=openType + ) + ) + ) + + def testDecodeOpenTypesChoiceOne(self): + s, r = decoder.decode( + ints2octs((48, 10, 2, 1, 1, 49, 5, 131, 3, 2, 1, 12)), + asn1Spec=self.s, decodeOpenTypes=True + ) + assert not r + assert s[0] == 1 + assert s[1][0] == 12 + + def testDecodeOpenTypesUnknownId(self): + s, r = decoder.decode( + ints2octs((48, 10, 2, 1, 3, 49, 5, 131, 3, 2, 1, 12)), + asn1Spec=self.s, decodeOpenTypes=True + ) + assert not r + assert s[0] == 3 + assert s[1][0] == univ.OctetString(hexValue='02010C') + + +class SequenceDecoderWithExplicitlyTaggedSetOfOpenTypesTestCase(BaseTestCase): + def setUp(self): + openType = opentype.OpenType( + 'id', + {1: univ.Integer(), + 2: univ.OctetString()} + ) + self.s = univ.Sequence( + componentType=namedtype.NamedTypes( + namedtype.NamedType('id', univ.Integer()), + namedtype.NamedType( + 'blob', univ.SetOf( + componentType=univ.Any().subtype( + explicitTag=tag.Tag( + tag.tagClassContext, tag.tagFormatSimple, 3))), + openType=openType + ) + ) + ) + + def testDecodeOpenTypesChoiceOne(self): + s, r = decoder.decode( + ints2octs((48, 10, 2, 1, 1, 49, 5, 131, 3, 2, 1, 12)), + asn1Spec=self.s, decodeOpenTypes=True + ) + assert not r + assert s[0] == 1 + assert s[1][0] == 12 + + def testDecodeOpenTypesUnknownId(self): + s, r = decoder.decode( + ints2octs( (48, 10, 2, 1, 3, 49, 5, 131, 3, 2, 1, 12)), + asn1Spec=self.s, decodeOpenTypes=True + ) + assert not r + assert s[0] == 3 + assert s[1][0] == univ.OctetString(hexValue='02010C') + + class SetDecoderTestCase(BaseTestCase): def setUp(self): BaseTestCase.setUp(self) @@ -1439,8 +1592,8 @@ class ErrorOnDecodingTestCase(BaseTestCase): except PyAsn1Error: exc = sys.exc_info()[1] - assert (isinstance(exc, PyAsn1Error), - 'Unexpected exception raised %r' % (exc,)) + assert isinstance(exc, PyAsn1Error), ( + 'Unexpected exception raised %r' % (exc,)) else: assert False, 'Unexpected decoder result %r' % (asn1Object,) @@ -1453,12 +1606,12 @@ class ErrorOnDecodingTestCase(BaseTestCase): asn1Object, rest = decode(ints2octs( (31, 8, 2, 1, 1, 131, 3, 2, 1, 12))) - assert (isinstance(asn1Object, univ.Any), - 'Unexpected raw dump type %r' % (asn1Object,)) - assert (asn1Object.asNumbers() == (31, 8, 2, 1, 1), - 'Unexpected raw dump value %r' % (asn1Object,)) - assert (rest == ints2octs((131, 3, 2, 1, 12)), - 'Unexpected rest of substrate after raw dump %r' % rest) + assert isinstance(asn1Object, univ.Any), ( + 'Unexpected raw dump type %r' % (asn1Object,)) + assert asn1Object.asNumbers() == (31, 8, 2, 1, 1), ( + 'Unexpected raw dump value %r' % (asn1Object,)) + assert rest == ints2octs((131, 3, 2, 1, 12)), ( + 'Unexpected rest of substrate after raw dump %r' % rest) suite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) diff --git a/tests/codec/ber/test_encoder.py b/tests/codec/ber/test_encoder.py index da14830..26236af 100644 --- a/tests/codec/ber/test_encoder.py +++ b/tests/codec/ber/test_encoder.py @@ -854,6 +854,130 @@ class SequenceEncoderWithExplicitlyTaggedOpenTypesTestCase(BaseTestCase): ) +class SequenceEncoderWithUntaggedSetOfOpenTypesTestCase(BaseTestCase): + def setUp(self): + BaseTestCase.setUp(self) + + openType = opentype.OpenType( + 'id', + {1: univ.Integer(), + 2: univ.OctetString()} + ) + self.s = univ.Sequence( + componentType=namedtype.NamedTypes( + namedtype.NamedType('id', univ.Integer()), + namedtype.NamedType('blob', univ.SetOf( + componentType=univ.Any()), openType=openType) + ) + ) + + def testEncodeOpenTypeChoiceOne(self): + self.s.clear() + + self.s[0] = 1 + self.s[1].append(univ.Integer(12)) + + assert encoder.encode(self.s, asn1Spec=self.s) == ints2octs( + (48, 8, 2, 1, 1, 49, 3, 2, 1, 12) + ) + + def testEncodeOpenTypeChoiceTwo(self): + self.s.clear() + + self.s[0] = 2 + self.s[1].append(univ.OctetString('quick brown')) + + assert encoder.encode(self.s, asn1Spec=self.s) == ints2octs( + (48, 18, 2, 1, 2, 49, 13, 4, 11, 113, 117, 105, 99, + 107, 32, 98, 114, 111, 119, 110) + ) + + def testEncodeOpenTypeUnknownId(self): + self.s.clear() + + self.s[0] = 2 + self.s[1].append(univ.ObjectIdentifier('1.3.6')) + + try: + encoder.encode(self.s, asn1Spec=self.s) + + except PyAsn1Error: + assert False, 'incompatible open type tolerated' + + def testEncodeOpenTypeIncompatibleType(self): + self.s.clear() + + self.s[0] = 2 + self.s[1].append(univ.ObjectIdentifier('1.3.6')) + + try: + encoder.encode(self.s, asn1Spec=self.s) + + except PyAsn1Error: + assert False, 'incompatible open type tolerated' + + +class SequenceEncoderWithImplicitlyTaggedSetOfOpenTypesTestCase(BaseTestCase): + def setUp(self): + BaseTestCase.setUp(self) + + openType = opentype.OpenType( + 'id', + {1: univ.Integer(), + 2: univ.OctetString()} + ) + self.s = univ.Sequence( + componentType=namedtype.NamedTypes( + namedtype.NamedType('id', univ.Integer()), + namedtype.NamedType('blob', univ.SetOf( + componentType=univ.Any().subtype( + implicitTag=tag.Tag( + tag.tagClassContext, tag.tagFormatSimple, 3))), + openType=openType) + ) + ) + + def testEncodeOpenTypeChoiceOne(self): + self.s.clear() + + self.s[0] = 1 + self.s[1].append(univ.Integer(12)) + + assert encoder.encode(self.s, asn1Spec=self.s) == ints2octs( + (48, 10, 2, 1, 1, 49, 5, 131, 3, 2, 1, 12) + ) + + +class SequenceEncoderWithExplicitlyTaggedSetOfOpenTypesTestCase(BaseTestCase): + def setUp(self): + BaseTestCase.setUp(self) + + openType = opentype.OpenType( + 'id', + {1: univ.Integer(), + 2: univ.OctetString()} + ) + self.s = univ.Sequence( + componentType=namedtype.NamedTypes( + namedtype.NamedType('id', univ.Integer()), + namedtype.NamedType('blob', univ.SetOf( + componentType=univ.Any().subtype( + explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 3))), + openType=openType) + ) + ) + + def testEncodeOpenTypeChoiceOne(self): + self.s.clear() + + self.s[0] = 1 + self.s[1].append(univ.Integer(12)) + + assert encoder.encode(self.s, asn1Spec=self.s) == ints2octs( + (48, 10, 2, 1, 1, 49, 5, 163, 3, 2, 1, 12) + ) + + class SequenceEncoderWithComponentsSchemaTestCase(BaseTestCase): def setUp(self): BaseTestCase.setUp(self) -- cgit v1.2.1