diff options
-rw-r--r-- | CHANGELOG.md | 32 | ||||
-rw-r--r-- | README.rst | 4 | ||||
-rw-r--r-- | VERSION | 2 | ||||
-rw-r--r-- | docs/howto/config.rst | 3 | ||||
-rw-r--r-- | docs/howto/index.rst | 16 | ||||
-rw-r--r-- | src/saml2/__init__.py | 10 | ||||
-rw-r--r-- | src/saml2/assertion.py | 3 | ||||
-rw-r--r-- | src/saml2/client.py | 191 | ||||
-rw-r--r-- | src/saml2/data/templates/template_enc.xml | 6 | ||||
-rw-r--r-- | src/saml2/entity.py | 60 | ||||
-rw-r--r-- | src/saml2/mdstore.py | 53 | ||||
-rw-r--r-- | src/saml2/response.py | 50 | ||||
-rw-r--r-- | src/saml2/saml.py | 81 | ||||
-rw-r--r-- | src/saml2/sigver.py | 116 | ||||
-rw-r--r-- | src/saml2/time_util.py | 4 | ||||
-rw-r--r-- | src/saml2/xmldsig/__init__.py | 15 | ||||
-rw-r--r-- | tests/idp_uiinfo.xml | 34 | ||||
-rw-r--r-- | tests/test_00_xmldsig.py | 49 | ||||
-rw-r--r-- | tests/test_30_mdstore.py | 50 | ||||
-rw-r--r-- | tests/test_41_response.py | 80 | ||||
-rw-r--r-- | tests/test_42_enc.py | 4 | ||||
-rw-r--r-- | tests/test_60_sp.py | 5 |
22 files changed, 636 insertions, 232 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index 4a793bab..9d530eba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,38 @@ # Changelog +## 7.0.1 (2021-05-20) + +- Preserve order of response bindings on IdP-initiated logout +- Fix use of expected binding on SP logout + + +## 7.0.0 (2021-05-18) + +- **BREAKING** Replace encryption method rsa-1_5 with rsa-oaep-mgf1p +- Add documentation next to the code + + +## 6.5.2 (2021-05-18) + +- Add shibmd_scopes metadata extractor +- Allow the Issuer element on a Response to be missing +- Respect the preferred_binding configuration for the single_logout_service +- Fix logout signature flags for redirect, post and soap requests +- Respect the logout_requests_signed configuration option +- Fix crash when applying policy on RequestedAttribute without a friendlyName +- Correctly validate IssueInstant +- Correctly handle AudienceRestriction elements with no value +- Raise InvalidAssertion exception when assertion requirements are not met +- Raise SAMLError on failure to parse a metadata file +- Raise StatusInvalidAuthnResponseStatement when the AuthnStatement is not valid +- Handle all forms of ACS endpoint specifications +- tests: Always use base64.encodebytes; base64.encodestring has been dropped +- build: Set minimum version needed for xmlschema +- docs: Update Travis CI badge from travis-ci.org to travis-ci.com +- examples: Fix example code + + ## 6.5.1 (2021-01-21) - Fix the parser to take into account both the xs and xsd namespace prefixes @@ -5,8 +5,8 @@ PySAML2 - SAML2 in Python :Version: see VERSION_ :Documentation: https://pysaml2.readthedocs.io/ -.. image:: https://api.travis-ci.org/IdentityPython/pysaml2.png?branch=master - :target: https://travis-ci.org/IdentityPython/pysaml2 +.. image:: https://api.travis-ci.com/IdentityPython/pysaml2.png?branch=master + :target: https://travis-ci.com/IdentityPython/pysaml2 .. image:: https://img.shields.io/pypi/pyversions/pysaml2.svg :target: https://pypi.org/project/pysaml2/ @@ -1 +1 @@ -6.5.1 +7.0.1 diff --git a/docs/howto/config.rst b/docs/howto/config.rst index 556a6ee8..9060ad2c 100644 --- a/docs/howto/config.rst +++ b/docs/howto/config.rst @@ -153,7 +153,7 @@ assurance_certification Example:: - "assurance_specification": [ + "assurance_certification": [ "https://refeds.org/sirtfi", ] @@ -463,6 +463,7 @@ The available services are: * assertion_id_request_service * artifact_resolution_service * attribute_consuming_service +* single_logout_service service diff --git a/docs/howto/index.rst b/docs/howto/index.rst index 52f2d409..711058d0 100644 --- a/docs/howto/index.rst +++ b/docs/howto/index.rst @@ -6,30 +6,30 @@ How to use PySAML2 :Release: |release| :Date: |today| -Before you can use Pysaml2, you'll need to get it installed. +Before you can use Pysaml2, you'll need to get it installed. If you have not done it yet, read the :ref:`install` Well, now you have it installed and you want to do something. -And I'm sorry to tell you this; but there isn't really a lot you can do with +And I'm sorry to tell you this; but there isn't really a lot you can do with this code on its own. -Sure you can send a AuthenticationRequest to an IdentityProvider or a +Sure you can send a AuthenticationRequest to an IdentityProvider or a AttributeQuery to an AttributeAuthority, but in order to get what they return you have to sit behind a Web server. Well that is not really true since the AttributeQuery would be over SOAP and you would get the result over the connection you have to the AttributeAuthority. -But anyway, you may get my point. This is middleware stuff ! +But anyway, you may get my point. This is middleware stuff! -PySAML2 is built to fit into a +PySAML2 is built to fit into a `WSGI <http://www.python.org/dev/peps/pep-0333/>`_ application -But it can be used in a non-WSGI environment too. +But it can be used in a non-WSGI environment too. So you will find descriptions of both cases here. -The configuration is the same regardless of whether you are using PySAML2 in a +The configuration is the same regardless of whether you are using PySAML2 in a WSGI or non-WSGI environment. .. toctree:: @@ -37,4 +37,4 @@ WSGI or non-WSGI environment. config - + diff --git a/src/saml2/__init__.py b/src/saml2/__init__.py index 0fa9e49c..6c11e200 100644 --- a/src/saml2/__init__.py +++ b/src/saml2/__init__.py @@ -60,12 +60,20 @@ DECISION_TYPE_DENY = "Deny" DECISION_TYPE_INDETERMINATE = "Indeterminate" VERSION = "2.0" - +# http://docs.oasis-open.org/security/saml/v2.0/saml-bindings-2.0-os.pdf +# The specification was later updated with errata, and the new version is here: +# http://www.oasis-open.org/committees/download.php/56779/sstc-saml-bindings-errata-2.0-wd-06.pdf +# parse a SOAP header, make a SOAP request, and receive a SOAP response BINDING_SOAP = 'urn:oasis:names:tc:SAML:2.0:bindings:SOAP' +# parse a PAOS header, make a PAOS request, and receive a PAOS response BINDING_PAOS = 'urn:oasis:names:tc:SAML:2.0:bindings:PAOS' +# URI encoded messages BINDING_HTTP_REDIRECT = 'urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect' +# HTML encoded messages BINDING_HTTP_POST = 'urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST' +# sensitive messages are transported over a backchannel BINDING_HTTP_ARTIFACT = 'urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Artifact' +# as uri response encoded message BINDING_URI = 'urn:oasis:names:tc:SAML:2.0:bindings:URI' diff --git a/src/saml2/assertion.py b/src/saml2/assertion.py index cd01463b..4474bf42 100644 --- a/src/saml2/assertion.py +++ b/src/saml2/assertion.py @@ -35,6 +35,9 @@ def _filter_values(vals, vlist=None, must=False): if not vlist: # No value specified equals any value return vals + if vals is None: # cannot iterate over None, return early + return vals + if isinstance(vlist, six.string_types): vlist = [vlist] diff --git a/src/saml2/client.py b/src/saml2/client.py index cf0399f3..532a23c9 100644 --- a/src/saml2/client.py +++ b/src/saml2/client.py @@ -251,86 +251,122 @@ class Saml2Client(Base): not_done = entity_ids[:] responses = {} + bindings_slo_preferred = self.config.preferred_binding["single_logout_service"] + for entity_id in entity_ids: logger.debug("Logout from '%s'", entity_id) - # for all where I can use the SOAP binding, do those first - for binding in [BINDING_SOAP, BINDING_HTTP_POST, BINDING_HTTP_REDIRECT]: - if expected_binding and binding != expected_binding: - continue - - try: - srvs = self.metadata.single_logout_service( - entity_id, binding, "idpsso" - ) - except: - srvs = None - - if not srvs: - logger.debug("No SLO '%s' service", binding) - continue - - destination = next(locations(srvs), None) - logger.info("destination to provider: %s", destination) - - try: - session_info = self.users.get_info_from( - name_id, entity_id, False - ) - session_indexes = [session_info['session_index']] - except KeyError: - session_indexes = None - - sign = sign if sign is not None else self.logout_requests_signed - sign_post = False if binding == BINDING_HTTP_REDIRECT else sign - sign_redirect = False if binding == BINDING_HTTP_POST and sign else sign - - req_id, request = self.create_logout_request( - destination, - entity_id, - name_id=name_id, - reason=reason, - expire=expire, - session_indexes=session_indexes, - sign=sign_post, - sign_alg=sign_alg, - digest_alg=digest_alg, + + bindings_slo_supported = self.metadata.single_logout_service( + entity_id=entity_id, typ="idpsso" + ) + bindings_slo_preferred_and_supported = ( + binding + for binding in bindings_slo_preferred + if binding in bindings_slo_supported + ) + bindings_slo_choices = filter( + lambda x: x, + ( + expected_binding, + *bindings_slo_preferred_and_supported, + *bindings_slo_supported, ) + ) + binding = next(bindings_slo_choices, None) + if not binding: + logger.info( + { + "message": "Entity does not support SLO", + "entity": entity_id, + } + ) + continue - relay_state = self._relay_state(req_id) - http_info = self.apply_binding( - binding, - str(request), - destination, - relay_state, - sign=sign_redirect, - sigalg=sign_alg, + service_info = bindings_slo_supported[binding] + service_location = next(locations(service_info), None) + if not service_location: + logger.info( + { + "message": "Entity SLO service does not have a location", + "entity": entity_id, + "service_location": service_location, + } ) + continue + + session_info = self.users.get_info_from(name_id, entity_id, False) + session_index = session_info.get('session_index') + session_indexes = [session_index] if session_index else None - if binding == BINDING_SOAP: - response = self.send(**http_info) - if response and response.status_code == 200: - not_done.remove(entity_id) - response = response.text - logger.info("Response: %s", response) - res = self.parse_logout_request_response(response, binding) - responses[entity_id] = res - else: - logger.info("NOT OK response from %s", destination) + sign = sign if sign is not None else self.logout_requests_signed + sign_post = sign and ( + binding == BINDING_HTTP_POST or binding == BINDING_SOAP + ) + sign_redirect = sign and binding == BINDING_HTTP_REDIRECT + + log_report = { + "message": "Invoking SLO on entity", + "entity": entity_id, + "binding": binding, + "location": service_location, + "session_indexes": session_indexes, + "sign": sign, + } + logger.info(log_report) + + req_id, request = self.create_logout_request( + service_location, + entity_id, + name_id=name_id, + reason=reason, + expire=expire, + session_indexes=session_indexes, + sign=sign_post, + sign_alg=sign_alg, + digest_alg=digest_alg, + ) + relay_state = self._relay_state(req_id) + http_info = self.apply_binding( + binding, + str(request), + service_location, + relay_state, + sign=sign_redirect, + sigalg=sign_alg, + ) + + if binding == BINDING_SOAP: + response = self.send(**http_info) + if response and response.status_code == 200: + not_done.remove(entity_id) + response_text = response.text + log_report_response = { + **log_report, + "message": "Response from SLO service", + "response_text": response_text, + } + logger.debug(log_report_response) + res = self.parse_logout_request_response(response_text, binding) + responses[entity_id] = res else: - self.state[req_id] = { - "entity_id": entity_id, - "operation": "SLO", - "entity_ids": entity_ids, - "name_id": code(name_id), - "reason": reason, - "not_on_or_after": expire, - "sign": sign, + log_report_response = { + **log_report, + "message": "Bad status_code response from SLO service", + "status_code": (response and response.status_code), } - responses[entity_id] = (binding, http_info) - not_done.remove(entity_id) - - # only try one binding - break + logger.info(log_report_response) + else: + self.state[req_id] = { + "entity_id": entity_id, + "operation": "SLO", + "entity_ids": entity_ids, + "name_id": code(name_id), + "reason": reason, + "not_on_or_after": expire, + "sign": sign, + } + responses[entity_id] = (binding, http_info) + not_done.remove(entity_id) if not_done: # upstream should try later @@ -630,12 +666,11 @@ class Saml2Client(Base): status = status_message_factory("Wrong user", STATUS_UNKNOWN_PRINCIPAL) - if binding == BINDING_SOAP: - response_bindings = [BINDING_SOAP] - elif binding in [BINDING_HTTP_POST, BINDING_HTTP_REDIRECT]: - response_bindings = [BINDING_HTTP_POST, BINDING_HTTP_REDIRECT] - else: - response_bindings = self.config.preferred_binding["single_logout_service"] + response_bindings = { + BINDING_SOAP: [BINDING_SOAP], + BINDING_HTTP_POST: [BINDING_HTTP_POST, BINDING_HTTP_REDIRECT], + BINDING_HTTP_REDIRECT: [BINDING_HTTP_REDIRECT, BINDING_HTTP_POST], + }.get(binding) if sign is None: sign = self.logout_responses_signed diff --git a/src/saml2/data/templates/template_enc.xml b/src/saml2/data/templates/template_enc.xml index 0b962e55..d581485e 100644 --- a/src/saml2/data/templates/template_enc.xml +++ b/src/saml2/data/templates/template_enc.xml @@ -2,12 +2,10 @@ <EncryptedData xmlns="http://www.w3.org/2001/04/xmlenc#" Type="http://www.w3.org/2001/04/xmlenc#Element"> - <EncryptionMethod Algorithm= - "http://www.w3.org/2001/04/xmlenc#tripledes-cbc"/> + <EncryptionMethod Algorithm="http://www.w3.org/2001/04/xmlenc#tripledes-cbc"/> <KeyInfo xmlns="http://www.w3.org/2000/09/xmldsig#"> <EncryptedKey xmlns="http://www.w3.org/2001/04/xmlenc#"> - <EncryptionMethod Algorithm= - "http://www.w3.org/2001/04/xmlenc#rsa-1_5"/> + <EncryptionMethod Algorithm="http://www.w3.org/2001/04/xmlenc#rsa-oaep-mgf1p"/> <KeyInfo xmlns="http://www.w3.org/2000/09/xmldsig#"> <KeyName/> </KeyInfo> diff --git a/src/saml2/entity.py b/src/saml2/entity.py index d2d6ec5c..f6ca396c 100644 --- a/src/saml2/entity.py +++ b/src/saml2/entity.py @@ -65,6 +65,7 @@ from saml2.sigver import security_context from saml2.sigver import SigverError from saml2.sigver import SignatureError from saml2.sigver import make_temp +from saml2.sigver import get_pem_wrapped_unwrapped from saml2.sigver import pre_encryption_part from saml2.sigver import pre_signature_part from saml2.sigver import pre_encrypt_assertion @@ -202,6 +203,39 @@ class Entity(HTTPBase): self.msg_cb = msg_cb + def reload_metadata(self, metadata_conf): + """ + Reload metadata configuration. + + Load a new metadata configuration as defined by metadata_conf (by + passing this to Config.load_metadata) and make this entity (as well as + subordinate objects with own metadata reference) use the new metadata. + + The structure of metadata_conf is the same as the 'metadata' entry in + the configuration passed to saml2.Config. + + param metadata_conf: Metadata configuration as passed to Config.load_metadata + return: True if successfully reloaded + """ + logger.debug("Loading new metadata") + try: + new_metadata = self.config.load_metadata(metadata_conf) + except Exception as ex: + logger.error("Loading metadata failed", exc_info=ex) + return False + + logger.debug("Applying new metadata to main config") + ( self.metadata, self.sec.metadata, self.config.metadata ) = [new_metadata]*3 + policy = getattr(self.config, "_%s_policy" % self.entity_type, None) + if policy and policy.metadata_store: + logger.debug("Applying new metadata to %s policy", self.entity_type) + policy.metadata_store = self.metadata + + logger.debug("Applying new metadata source_id") + self.sourceid = self.metadata.construct_source_id() + + return True + def _issuer(self, entityid=None): """ Return an Issuer instance """ if entityid: @@ -304,7 +338,7 @@ class Entity(HTTPBase): sfunc = getattr(self.metadata, service) - if bindings is None: + if not bindings: if request and request.protocol_binding: bindings = [request.protocol_binding] else: @@ -654,19 +688,19 @@ class Entity(HTTPBase): _certs = self.metadata.certs(sp_entity_id, "any", "encryption") exception = None for _cert in _certs: + wrapped_cert, unwrapped_cert = get_pem_wrapped_unwrapped(_cert) try: - begin_cert = "-----BEGIN CERTIFICATE-----\n" - end_cert = "\n-----END CERTIFICATE-----\n" - if begin_cert not in _cert: - _cert = "%s%s" % (begin_cert, _cert) - if end_cert not in _cert: - _cert = "%s%s" % (_cert, end_cert) - tmp = make_temp(_cert.encode('ascii'), - decode=False, - delete_tmpfiles=self.config.delete_tmpfiles) - response = self.sec.encrypt_assertion(response, tmp.name, - pre_encryption_part(), - node_xpath=node_xpath) + tmp = make_temp( + wrapped_cert.encode('ascii'), + decode=False, + delete_tmpfiles=self.config.delete_tmpfiles, + ) + response = self.sec.encrypt_assertion( + response, + tmp.name, + pre_encryption_part(encrypt_cert=unwrapped_cert), + node_xpath=node_xpath, + ) return response except Exception as ex: exception = ex diff --git a/src/saml2/mdstore.py b/src/saml2/mdstore.py index 44930773..d001999d 100644 --- a/src/saml2/mdstore.py +++ b/src/saml2/mdstore.py @@ -10,6 +10,7 @@ from warnings import warn as _warn from hashlib import sha1 from os.path import isfile from os.path import join +from re import compile as regex_compile import requests @@ -58,6 +59,8 @@ from saml2.extension.mdui import Logo from saml2.extension.mdrpi import NAMESPACE as NS_MDRPI from saml2.extension.mdrpi import RegistrationInfo from saml2.extension.mdrpi import RegistrationPolicy +from saml2.extension.shibmd import NAMESPACE as NS_SHIBMD +from saml2.extension.shibmd import Scope logger = logging.getLogger(__name__) @@ -83,6 +86,7 @@ classnames = { "service_nameid_mapping": "{ns}&{tag}".format(ns=NS_MD, tag=NameIDMappingService.c_tag), "mdrpi_registration_info": "{ns}&{tag}".format(ns=NS_MDRPI, tag=RegistrationInfo.c_tag), "mdrpi_registration_policy": "{ns}&{tag}".format(ns=NS_MDRPI, tag=RegistrationPolicy.c_tag), + "shibmd_scope": "{ns}&{tag}".format(ns=NS_SHIBMD, tag=Scope.c_tag) } ENTITY_CATEGORY = "http://macedir.org/entity-category" @@ -614,7 +618,14 @@ class InMemoryMetaData(MetaData): try: self.entities_descr = md.entities_descriptor_from_string(xmlstr) except Exception as e: - raise SAMLError(f'Failed to parse metadata file: {self.filename}') from e + _md_desc = ( + f'metadata file: {self.filename}' + if isinstance(self,MetaDataFile) + else f'remote metadata: {self.url}' + if isinstance(self, MetaDataExtern) + else 'metadata' + ) + raise SAMLError(f'Failed to parse {_md_desc}') from e if not self.entities_descr: self.entity_descr = md.entity_descriptor_from_string(xmlstr) @@ -1229,8 +1240,6 @@ class MetadataStore(MetaData): # IDP + SP if typ is None: raise AttributeError("Missing type specification") - if binding is None: - binding = BINDING_HTTP_REDIRECT return self.service(entity_id, "%s_descriptor" % typ, "single_logout_service", binding) @@ -1479,6 +1488,41 @@ class MetadataStore(MetaData): ) return elements + def sbibmd_scopes(self, entity_id, typ=None): + try: + md = self[entity_id] + except KeyError: + md = {} + + descriptor_scopes = ( + { + "regexp": is_regexp, + "text": regex_compile(text) if is_regexp else text, + } + for elem in md.get("extensions", {}).get("extension_elements", []) + if elem.get("__class__") == classnames["shibmd_scope"] + for is_regexp, text in [ + (elem.get("regexp", "").lower() == "true", elem.get("text", "")), + ] + ) + + services_of_type = md.get(typ) or [] + services_of_type_scopes = ( + { + "regexp": is_regexp, + "text": regex_compile(text) if is_regexp else text, + } + for srv in services_of_type + for elem in srv.get("extensions", {}).get("extension_elements", []) + if elem.get("__class__") == classnames["shibmd_scope"] + for is_regexp, text in [ + (elem.get("regexp", "").lower() == "true", elem.get("text", "")), + ] + ) + + scopes = chain(descriptor_scopes, services_of_type_scopes) + return scopes + def mdui_uiinfo(self, entity_id): try: data = self[entity_id] @@ -1656,4 +1700,5 @@ class MetadataStore(MetaData): return "%s" % res elif format == "md": - return json.dumps(self.items(), indent=2) + # self.items() returns dictitems(), convert that back into a dict + return json.dumps(dict(self.items()), indent=2) diff --git a/src/saml2/response.py b/src/saml2/response.py index 9d4021ee..fc2071d3 100644 --- a/src/saml2/response.py +++ b/src/saml2/response.py @@ -413,7 +413,7 @@ class StatusResponse(object): self.response.destination and self.response.destination not in self.return_addrs ): - logger.error("%s not in %s", self.response.destination, self.return_addrs) + logger.error("destination '%s' not in return addresses '%s'", self.response.destination, self.return_addrs) return None valid = self.issue_instant_ok() and self.status_ok() @@ -435,7 +435,12 @@ class StatusResponse(object): self.response = mold.response def issuer(self): - return self.response.issuer.text.strip() + issuer_value = ( + self.response.issuer.text + if self.response.issuer is not None + else "" + ).strip() + return issuer_value class LogoutResponse(StatusResponse): @@ -732,6 +737,10 @@ class AuthnResponse(StatusResponse): def get_subject(self): """ The assertion must contain a Subject """ + + if not self.assertion: + raise ValueError("Missing assertion") + if not self.assertion.subject: raise ValueError( "Invalid assertion subject: {subject}".format( @@ -1066,23 +1075,30 @@ class AuthnResponse(StatusResponse): def authn_info(self): res = [] - for astat in self.assertion.authn_statement: - context = astat.authn_context + for statement in self.assertion.authn_statement: try: - authn_instant = astat.authn_instant + authn_instant = statement.authn_instant except AttributeError: authn_instant = "" - if context: - try: - aclass = context.authn_context_class_ref.text - except AttributeError: - aclass = "" - try: - authn_auth = [a.text for a in - context.authenticating_authority] - except AttributeError: - authn_auth = [] - res.append((aclass, authn_auth, authn_instant)) + + context = statement.authn_context + if not context: + continue + + try: + authn_class = ( + context.authn_context_class_ref.text + or context.authn_context_decl_ref.text + ) + except AttributeError: + authn_class = "" + + try: + authn_auth = [a.text for a in context.authenticating_authority] + except AttributeError: + authn_auth = [] + + res.append((authn_class, authn_auth, authn_instant)) return res def authz_decision_info(self): @@ -1116,7 +1132,7 @@ class AuthnResponse(StatusResponse): raise StatusInvalidAuthnResponseStatement( "The Authn Response Statement is not valid" ) - + def __str__(self): return self.xmlstr diff --git a/src/saml2/saml.py b/src/saml2/saml.py index 1b60822b..6ddd913e 100644 --- a/src/saml2/saml.py +++ b/src/saml2/saml.py @@ -3,6 +3,17 @@ # # Generated Mon May 2 14:23:33 2011 by parse_xsd.py version 0.4. # +# A summary of available specifications can be found at: +# https://wiki.oasis-open.org/security/FrontPage +# +# saml core specifications to be found at: +# if any question arise please query the following pdf. +# http://docs.oasis-open.org/security/saml/v2.0/saml-core-2.0-os.pdf +# The specification was later updated with errata, and the new version is here: +# https://www.oasis-open.org/committees/download.php/56776/sstc-saml-core-errata-2.0-wd-07.pdf +# + + import base64 from saml2.validate import valid_ipv4, MustValueError @@ -17,32 +28,53 @@ import six from saml2 import xmldsig as ds from saml2 import xmlenc as xenc +# authentication information fields NAMESPACE = 'urn:oasis:names:tc:SAML:2.0:assertion' -XSI_NAMESPACE = 'http://www.w3.org/2001/XMLSchema-instance' +# xmlschema definition +XSD = "xs" +# xmlschema templates and extensions XS_NAMESPACE = 'http://www.w3.org/2001/XMLSchema' - +# xmlschema-instance, which contains several builtin attributes +XSI_NAMESPACE = 'http://www.w3.org/2001/XMLSchema-instance' +# xml soap namespace +NS_SOAP_ENC = "http://schemas.xmlsoap.org/soap/encoding/" +# type definitions for xmlschemas XSI_TYPE = '{%s}type' % XSI_NAMESPACE +# nil type definition for xmlschemas XSI_NIL = '{%s}nil' % XSI_NAMESPACE +# idp and sp communicate usually about a subject(NameID) +# the format determines the category the subject is in + +# custom subject NAMEID_FORMAT_UNSPECIFIED = ( "urn:oasis:names:tc:SAML:1.1:nameid-format:unspecified") +# subject as email address NAMEID_FORMAT_EMAILADDRESS = ( "urn:oasis:names:tc:SAML:1.1:nameid-format:emailAddress") +# subject as x509 key NAMEID_FORMAT_X509SUBJECTNAME = ( "urn:oasis:names:tc:SAML:1.1:nameid-format:X509SubjectName") +# subject as windows domain name NAMEID_FORMAT_WINDOWSDOMAINQUALIFIEDNAME = ( "urn:oasis:names:tc:SAML:1.1:nameid-format:WindowsDomainQualifiedName") +# subject from a kerberos instance NAMEID_FORMAT_KERBEROS = ( "urn:oasis:names:tc:SAML:2.0:nameid-format:kerberos") +# subject as name NAMEID_FORMAT_ENTITY = ( "urn:oasis:names:tc:SAML:2.0:nameid-format:entity") +# linked subject NAMEID_FORMAT_PERSISTENT = ( "urn:oasis:names:tc:SAML:2.0:nameid-format:persistent") +# annonymous subject NAMEID_FORMAT_TRANSIENT = ( "urn:oasis:names:tc:SAML:2.0:nameid-format:transient") +# subject avaiable in encrypted format NAMEID_FORMAT_ENCRYPTED = ( "urn:oasis:names:tc:SAML:2.0:nameid-format:encrypted") +# dicc for avaiable formats NAMEID_FORMATS_SAML2 = ( ('NAMEID_FORMAT_EMAILADDRESS', NAMEID_FORMAT_EMAILADDRESS), ('NAMEID_FORMAT_ENCRYPTED', NAMEID_FORMAT_ENCRYPTED), @@ -51,41 +83,80 @@ NAMEID_FORMATS_SAML2 = ( ('NAMEID_FORMAT_TRANSIENT', NAMEID_FORMAT_TRANSIENT), ('NAMEID_FORMAT_UNSPECIFIED', NAMEID_FORMAT_UNSPECIFIED), ) + +# a profile outlines a set of rules describing how to embed SAML assertions. +# https://docs.oasis-open.org/security/saml/v2.0/saml-profiles-2.0-os.pdf +# The specification was later updated with errata, and the new version is here: +# https://www.oasis-open.org/committees/download.php/56782/sstc-saml-profiles-errata-2.0-wd-07.pdf + +# XML based values for SAML attributes PROFILE_ATTRIBUTE_BASIC = ( "urn:oasis:names:tc:SAML:2.0:profiles:attribute:basic") +# an AuthnRequest is made to initiate authentication +# authenticate the request with login credentials AUTHN_PASSWORD = "urn:oasis:names:tc:SAML:2.0:ac:classes:Password" +# authenticate the request with login credentials, over tls/https AUTHN_PASSWORD_PROTECTED = \ "urn:oasis:names:tc:SAML:2.0:ac:classes:PasswordProtectedTransport" +# attribute statements is key:value metadata shared with your app + +# custom format NAME_FORMAT_UNSPECIFIED = ( "urn:oasis:names:tc:SAML:2.0:attrname-format:unspecified") +# uri format NAME_FORMAT_URI = "urn:oasis:names:tc:SAML:2.0:attrname-format:uri" +# XML-based format NAME_FORMAT_BASIC = "urn:oasis:names:tc:SAML:2.0:attrname-format:basic" +# dicc for avaiable formats NAME_FORMATS_SAML2 = ( ('NAME_FORMAT_BASIC', NAME_FORMAT_BASIC), ('NAME_FORMAT_URI', NAME_FORMAT_URI), ('NAME_FORMAT_UNSPECIFIED', NAME_FORMAT_UNSPECIFIED), ) + +# the SAML authority's decision can be predetermined by arbitrary context + +# the specified action is permitted DECISION_TYPE_PERMIT = "Permit" +# the specified action is denied DECISION_TYPE_DENY = "Deny" +# the SAML authority cannot determine if the action is permitted or denied DECISION_TYPE_INDETERMINATE = "Indeterminate" + +# consent attributes determine wether consent has been given and under +# what conditions + +# no claim to consent is made CONSENT_UNSPECIFIED = "urn:oasis:names:tc:SAML:2.0:consent:unspecified" +# consent has been obtained CONSENT_OBTAINED = "urn:oasis:names:tc:SAML:2.0:consent:obtained" +# consent has been obtained before the message has been initiated CONSENT_PRIOR = "urn:oasis:names:tc:SAML:2.0:consent:prior" +# consent has been obtained implicitly CONSENT_IMPLICIT = "urn:oasis:names:tc:SAML:2.0:consent:current-implicit" +# consent has been obtained explicitly CONSENT_EXPLICIT = "urn:oasis:names:tc:SAML:2.0:consent:current-explicit" +# no consent has been obtained CONSENT_UNAVAILABLE = "urn:oasis:names:tc:SAML:2.0:consent:unavailable" +# no consent is needed. CONSENT_INAPPLICABLE = "urn:oasis:names:tc:SAML:2.0:consent:inapplicable" + +# Subject confirmation methods(scm), can be issued, besides the subject itself +# by third parties. +# http://docs.oasis-open.org/wss/oasis-wss-saml-token-profile-1.0.pdf + +# the 3rd party is identified on behalf of the subject given private/public key SCM_HOLDER_OF_KEY = "urn:oasis:names:tc:SAML:2.0:cm:holder-of-key" +# the 3rd party is identified by subject confirmation and must include a security header +# signing its content. SCM_SENDER_VOUCHES = "urn:oasis:names:tc:SAML:2.0:cm:sender-vouches" +# a bearer token is issued instead. SCM_BEARER = "urn:oasis:names:tc:SAML:2.0:cm:bearer" -XSD = "xs" -NS_SOAP_ENC = "http://schemas.xmlsoap.org/soap/encoding/" - class AttributeValueBase(SamlBase): def __init__(self, diff --git a/src/saml2/sigver.py b/src/saml2/sigver.py index b950d18d..973d6245 100644 --- a/src/saml2/sigver.py +++ b/src/saml2/sigver.py @@ -7,6 +7,7 @@ import hashlib import itertools import logging import os +import re import six from uuid import uuid4 as gen_random_key from time import mktime @@ -41,6 +42,10 @@ from saml2.s_utils import sid from saml2.s_utils import Unsupported from saml2.time_util import instant from saml2.time_util import str_to_time +from saml2.xmldsig import ALLOWED_CANONICALIZATIONS +from saml2.xmldsig import ALLOWED_TRANSFORMS +from saml2.xmldsig import TRANSFORM_C14N +from saml2.xmldsig import TRANSFORM_ENVELOPED from saml2.xmldsig import SIG_RSA_SHA1 from saml2.xmldsig import SIG_RSA_SHA224 from saml2.xmldsig import SIG_RSA_SHA256 @@ -59,9 +64,10 @@ logger = logging.getLogger(__name__) SIG = '{{{ns}#}}{attribute}'.format(ns=ds.NAMESPACE, attribute='Signature') +# RSA_1_5 is considered deprecated RSA_1_5 = 'http://www.w3.org/2001/04/xmlenc#rsa-1_5' TRIPLE_DES_CBC = 'http://www.w3.org/2001/04/xmlenc#tripledes-cbc' - +RSA_OAEP_MGF1P = "http://www.w3.org/2001/04/xmlenc#rsa-oaep-mgf1p" class SigverError(SAMLError): pass @@ -100,6 +106,14 @@ class CertificateError(SigverError): pass +def get_pem_wrapped_unwrapped(cert): + begin_cert = "-----BEGIN CERTIFICATE-----\n" + end_cert = "\n-----END CERTIFICATE-----\n" + unwrapped_cert = re.sub(f'{begin_cert}|{end_cert}', '', cert) + wrapped_cert = f'{begin_cert}{unwrapped_cert}{end_cert}' + return wrapped_cert, unwrapped_cert + + def read_file(*args, **kwargs): with open(*args, **kwargs) as handler: return handler.read() @@ -1085,10 +1099,8 @@ def encrypt_cert_from_item(item): pass if _encrypt_cert is not None: - if _encrypt_cert.find('-----BEGIN CERTIFICATE-----\n') == -1: - _encrypt_cert = '-----BEGIN CERTIFICATE-----\n' + _encrypt_cert - if _encrypt_cert.find('\n-----END CERTIFICATE-----') == -1: - _encrypt_cert = _encrypt_cert + '\n-----END CERTIFICATE-----' + wrapped_cert, unwrapped_cert = get_pem_wrapped_unwrapped(_encrypt_cert) + _encrypt_cert = wrapped_cert return _encrypt_cert @@ -1495,7 +1507,8 @@ class SecurityContext(object): # * the Reference element must have a URI attribute # * the URI attribute contains an anchor # * the anchor points to the enclosing element's ID attribute - references = item.signature.signed_info.reference + signed_info = item.signature.signed_info + references = signed_info.reference signatures_must_have_a_single_reference_element = len(references) == 1 the_Reference_element_must_have_a_URI_attribute = ( signatures_must_have_a_single_reference_element @@ -1510,6 +1523,46 @@ class SecurityContext(object): the_URI_attribute_contains_an_anchor and references[0].uri == "#{id}".format(id=item.id) ) + + # SAML implementations SHOULD use Exclusive Canonicalization, + # with or without comments + canonicalization_method_is_c14n = ( + signed_info.canonicalization_method.algorithm in ALLOWED_CANONICALIZATIONS + ) + + # Signatures in SAML messages SHOULD NOT contain transforms other than the + # - enveloped signature transform + # (with the identifier http://www.w3.org/2000/09/xmldsig#enveloped-signature) + # - or the exclusive canonicalization transforms + # (with the identifier http://www.w3.org/2001/10/xml-exc-c14n# + # or http://www.w3.org/2001/10/xml-exc-c14n#WithComments). + transform_algos = [ + transform.algorithm + for transform in references[0].transforms.transform + ] + tranform_algos_valid = ALLOWED_TRANSFORMS.intersection(transform_algos) + transform_algos_n = len(transform_algos) + tranform_algos_valid_n = len(tranform_algos_valid) + + the_number_of_transforms_is_one_or_two = ( + signatures_must_have_a_single_reference_element + and 1 <= transform_algos_n <= 2 + ) + all_transform_algs_are_allowed = ( + the_number_of_transforms_is_one_or_two + and transform_algos_n == tranform_algos_valid_n + ) + the_enveloped_signature_transform_is_defined = ( + the_number_of_transforms_is_one_or_two + and TRANSFORM_ENVELOPED in transform_algos + ) + + # The <ds:Object> element is not defined for use with SAML signatures, + # and SHOULD NOT be present. + # Since it can be used in service of an attacker by carrying unsigned data, + # verifiers SHOULD reject signatures that contain a <ds:Object> element. + object_element_is_not_present = not item.signature.object + validators = { "signatures must have a single reference element": ( signatures_must_have_a_single_reference_element @@ -1523,6 +1576,15 @@ class SecurityContext(object): "the anchor points to the enclosing element ID attribute": ( the_anchor_points_to_the_enclosing_element_ID_attribute ), + "canonicalization method is c14n": canonicalization_method_is_c14n, + "the number of transforms is one or two": ( + the_number_of_transforms_is_one_or_two + ), + "all transform algs are allowed": all_transform_algs_are_allowed, + "the enveloped signature transform is defined": ( + the_enveloped_signature_transform_is_defined + ), + "object element is not present": object_element_is_not_present, } if not all(validators.values()): error_context = { @@ -1810,10 +1872,9 @@ def pre_signature_part( sign_alg = ds.DefaultSignature().get_sign_alg() signature_method = ds.SignatureMethod(algorithm=sign_alg) - canonicalization_method = ds.CanonicalizationMethod( - algorithm=ds.ALG_EXC_C14N) - trans0 = ds.Transform(algorithm=ds.TRANSFORM_ENVELOPED) - trans1 = ds.Transform(algorithm=ds.ALG_EXC_C14N) + canonicalization_method = ds.CanonicalizationMethod(algorithm=TRANSFORM_C14N) + trans0 = ds.Transform(algorithm=TRANSFORM_ENVELOPED) + trans1 = ds.Transform(algorithm=TRANSFORM_C14N) transforms = ds.Transforms(transform=[trans0, trans1]) digest_method = ds.DigestMethod(algorithm=digest_alg) @@ -1835,6 +1896,7 @@ def pre_signature_part( if identifier: signature.id = 'Signature{n}'.format(n=identifier) + # XXX remove - do not embed the cert if public_key: x509_data = ds.X509Data( x509_certificate=[ds.X509Certificate(text=public_key)]) @@ -1872,23 +1934,34 @@ def pre_signature_part( # </EncryptedData> -def pre_encryption_part(msg_enc=TRIPLE_DES_CBC, key_enc=RSA_1_5, key_name='my-rsa-key', - encrypted_key_id=None, encrypted_data_id=None): - """ - - :param msg_enc: - :param key_enc: - :param key_name: - :return: - """ +def pre_encryption_part( + *, + msg_enc=TRIPLE_DES_CBC, + key_enc=RSA_OAEP_MGF1P, + key_name='my-rsa-key', + encrypted_key_id=None, + encrypted_data_id=None, + encrypt_cert=None, +): ek_id = encrypted_key_id or "EK_{id}".format(id=gen_random_key()) ed_id = encrypted_data_id or "ED_{id}".format(id=gen_random_key()) msg_encryption_method = EncryptionMethod(algorithm=msg_enc) key_encryption_method = EncryptionMethod(algorithm=key_enc) + + x509_data = ( + ds.X509Data(x509_certificate=ds.X509Certificate(text=encrypt_cert)) + if encrypt_cert + else None + ) + key_info = ds.KeyInfo( + key_name=ds.KeyName(text=key_name), + x509_data=x509_data, + ) + encrypted_key = EncryptedKey( id=ek_id, encryption_method=key_encryption_method, - key_info=ds.KeyInfo(key_name=ds.KeyName(text=key_name)), + key_info=key_info, cipher_data=CipherData(cipher_value=CipherValue(text='')), ) key_info = ds.KeyInfo(encrypted_key=encrypted_key) @@ -1897,7 +1970,8 @@ def pre_encryption_part(msg_enc=TRIPLE_DES_CBC, key_enc=RSA_1_5, key_name='my-rs type='http://www.w3.org/2001/04/xmlenc#Element', encryption_method=msg_encryption_method, key_info=key_info, - cipher_data=CipherData(cipher_value=CipherValue(text=''))) + cipher_data=CipherData(cipher_value=CipherValue(text='')), + ) return encrypted_data diff --git a/src/saml2/time_util.py b/src/saml2/time_util.py index 332d84bb..9eb4cec0 100644 --- a/src/saml2/time_util.py +++ b/src/saml2/time_util.py @@ -269,7 +269,7 @@ def utc_now(): def before(point): - """ True if point datetime specification is before now. + """ True if current time is before point datetime specification. NOTE: If point is specified it is supposed to be in local time. Not UTC/GMT !! This is because that is what gmtime() expects. @@ -286,7 +286,7 @@ def before(point): def after(point): - """ True if point datetime specification is equal or after now """ + """ True if current time is after or equal to point datetime specification.""" if not point: return True else: diff --git a/src/saml2/xmldsig/__init__.py b/src/saml2/xmldsig/__init__.py index 02bac3f4..4177601f 100644 --- a/src/saml2/xmldsig/__init__.py +++ b/src/saml2/xmldsig/__init__.py @@ -53,14 +53,21 @@ SIG_AVAIL_ALG = SIG_ALLOWED_ALG + (('SIG_RSA_MD5', SIG_RSA_MD5),) MAC_SHA1 = 'http://www.w3.org/2000/09/xmldsig#hmac-sha1' -C14N = 'http://www.w3.org/TR/2001/REC-xml-c14n-20010315' -C14N_WITH_C = 'http://www.w3.org/TR/2001/REC-xml-c14n-20010315#WithComments' -ALG_EXC_C14N = 'http://www.w3.org/2001/10/xml-exc-c14n#' - TRANSFORM_XSLT = 'http://www.w3.org/TR/1999/REC-xslt-19991116' TRANSFORM_XPATH = 'http://www.w3.org/TR/1999/REC-xpath-19991116' TRANSFORM_ENVELOPED = 'http://www.w3.org/2000/09/xmldsig#enveloped-signature' +TRANSFORM_C14N = 'http://www.w3.org/2001/10/xml-exc-c14n#' +TRANSFORM_C14N_WITH_COMMENTS = 'http://www.w3.org/2001/10/xml-exc-c14n#WithComments' +ALLOWED_CANONICALIZATIONS = { + TRANSFORM_C14N, + TRANSFORM_C14N_WITH_COMMENTS, +} +ALLOWED_TRANSFORMS = { + TRANSFORM_ENVELOPED, + TRANSFORM_C14N, + TRANSFORM_C14N_WITH_COMMENTS, +} class DefaultSignature(object): class _DefaultSignature(object): diff --git a/tests/idp_uiinfo.xml b/tests/idp_uiinfo.xml index fa37d703..7d531fd1 100644 --- a/tests/idp_uiinfo.xml +++ b/tests/idp_uiinfo.xml @@ -1,17 +1,19 @@ <?xml version='1.0' encoding='UTF-8'?> -<ns0:EntitiesDescriptor xmlns:ns0="urn:oasis:names:tc:SAML:2.0:metadata" xmlns:ns1="urn:mace:shibboleth:metadata:1.0" xmlns:ns2="urn:oasis:names:tc:SAML:metadata:ui" xmlns:ns3="http://www.w3.org/2000/09/xmldsig#"><ns0:EntityDescriptor entityID="http://example.com/saml2/idp.xml"><ns0:IDPSSODescriptor WantAuthnRequestsSigned="false" protocolSupportEnumeration="urn:oasis:names:tc:SAML:2.0:protocol"><ns0:Extensions><ns1:Scope regexp="false">example.org</ns1:Scope><ns2:UIInfo><ns2:Keywords xml:lang="en">foo bar</ns2:Keywords><ns2:Logo height="40" width="30">http://example.com/logo.jpg</ns2:Logo><ns2:InformationURL>http://example.com/saml2/info.html</ns2:InformationURL><ns2:DisplayName>Example Co.</ns2:DisplayName><ns2:Description xml:lang="se">Exempel bolag</ns2:Description><ns2:PrivacyStatementURL>http://example.com/saml2/privacyStatement.html</ns2:PrivacyStatementURL></ns2:UIInfo></ns0:Extensions><ns0:KeyDescriptor><ns3:KeyInfo><ns3:X509Data><ns3:X509Certificate>MIICsDCCAhmgAwIBAgIJAJrzqSSwmDY9MA0GCSqGSIb3DQEBBQUAMEUxCzAJBgNV -BAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBX -aWRnaXRzIFB0eSBMdGQwHhcNMDkxMDA2MTk0OTQxWhcNMDkxMTA1MTk0OTQxWjBF -MQswCQYDVQQGEwJBVTETMBEGA1UECBMKU29tZS1TdGF0ZTEhMB8GA1UEChMYSW50 -ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKB -gQDJg2cms7MqjniT8Fi/XkNHZNPbNVQyMUMXE9tXOdqwYCA1cc8vQdzkihscQMXy -3iPw2cMggBu6gjMTOSOxECkuvX5ZCclKr8pXAJM5cY6gVOaVO2PdTZcvDBKGbiaN -efiEw5hnoZomqZGp8wHNLAUkwtH9vjqqvxyS/vclc6k2ewIDAQABo4GnMIGkMB0G -A1UdDgQWBBRePsKHKYJsiojE78ZWXccK9K4aJTB1BgNVHSMEbjBsgBRePsKHKYJs -iojE78ZWXccK9K4aJaFJpEcwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgTClNvbWUt -U3RhdGUxITAfBgNVBAoTGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZIIJAJrzqSSw -mDY9MAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEFBQADgYEAJSrKOEzHO7TL5cy6 -h3qh+3+JAk8HbGBW+cbX6KBCAw/mzU8flK25vnWwXS3dv2FF3Aod0/S7AWNfKib5 -U/SA9nJaz/mWeF9S0farz9AQFc8/NSzAzaVq7YbM4F6f6N2FRl7GikdXRCed45j6 -mrPzGzk3ECbupFnqyREH3+ZPSdk= -</ns3:X509Certificate></ns3:X509Data></ns3:KeyInfo></ns0:KeyDescriptor><ns0:SingleSignOnService Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect" Location="http://example.com/saml2/" /></ns0:IDPSSODescriptor></ns0:EntityDescriptor></ns0:EntitiesDescriptor> +<ns0:EntitiesDescriptor xmlns:ns0="urn:oasis:names:tc:SAML:2.0:metadata" xmlns:ns1="urn:mace:shibboleth:metadata:1.0" xmlns:ns2="urn:oasis:names:tc:SAML:metadata:ui" xmlns:ns3="http://www.w3.org/2000/09/xmldsig#"> + <ns0:EntityDescriptor entityID="http://example.com/saml2/idp.xml"> + <ns0:Extensions> + <ns1:Scope regexp="false">descriptor-example.org</ns1:Scope> + <ns1:Scope regexp="true">descriptor-example[^0-9]*\.org</ns1:Scope> + </ns0:Extensions> + <ns0:IDPSSODescriptor WantAuthnRequestsSigned="false" protocolSupportEnumeration="urn:oasis:names:tc:SAML:2.0:protocol"> + <ns0:Extensions> + <ns1:Scope regexp="false">idpssodescriptor-example.org</ns1:Scope> + <ns2:UIInfo><ns2:Keywords xml:lang="en">foo bar</ns2:Keywords><ns2:Logo height="40" width="30">http://example.com/logo.jpg</ns2:Logo><ns2:InformationURL>http://example.com/saml2/info.html</ns2:InformationURL><ns2:DisplayName>Example Co.</ns2:DisplayName><ns2:Description xml:lang="se">Exempel bolag</ns2:Description><ns2:PrivacyStatementURL>http://example.com/saml2/privacyStatement.html</ns2:PrivacyStatementURL></ns2:UIInfo> + </ns0:Extensions> + <ns0:KeyDescriptor> + <ns3:KeyInfo><ns3:X509Data><ns3:X509Certificate>MIICsDCCAhmgAwIBAgIJAJrzqSSwmDY9MA0GCSqGSIb3DQEBBQUAMEUxCzAJBgNVBAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQwHhcNMDkxMDA2MTk0OTQxWhcNMDkxMTA1MTk0OTQxWjBFMQswCQYDVQQGEwJBVTETMBEGA1UECBMKU29tZS1TdGF0ZTEhMB8GA1UEChMYSW50ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDJg2cms7MqjniT8Fi/XkNHZNPbNVQyMUMXE9tXOdqwYCA1cc8vQdzkihscQMXy3iPw2cMggBu6gjMTOSOxECkuvX5ZCclKr8pXAJM5cY6gVOaVO2PdTZcvDBKGbiaNefiEw5hnoZomqZGp8wHNLAUkwtH9vjqqvxyS/vclc6k2ewIDAQABo4GnMIGkMB0GA1UdDgQWBBRePsKHKYJsiojE78ZWXccK9K4aJTB1BgNVHSMEbjBsgBRePsKHKYJsiojE78ZWXccK9K4aJaFJpEcwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgTClNvbWUtU3RhdGUxITAfBgNVBAoTGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZIIJAJrzqSSwmDY9MAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEFBQADgYEAJSrKOEzHO7TL5cy6h3qh+3+JAk8HbGBW+cbX6KBCAw/mzU8flK25vnWwXS3dv2FF3Aod0/S7AWNfKib5U/SA9nJaz/mWeF9S0farz9AQFc8/NSzAzaVq7YbM4F6f6N2FRl7GikdXRCed45j6mrPzGzk3ECbupFnqyREH3+ZPSdk=</ns3:X509Certificate></ns3:X509Data></ns3:KeyInfo> + </ns0:KeyDescriptor> + <ns0:SingleSignOnService Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect" Location="http://example.com/saml2/" /> + </ns0:IDPSSODescriptor> + </ns0:EntityDescriptor> +</ns0:EntitiesDescriptor> diff --git a/tests/test_00_xmldsig.py b/tests/test_00_xmldsig.py index 5b800194..80453f9a 100644 --- a/tests/test_00_xmldsig.py +++ b/tests/test_00_xmldsig.py @@ -33,12 +33,10 @@ class TestObject: new_object = ds.object_from_string(ds_data.TEST_OBJECT) assert new_object.id == "object_id" assert new_object.encoding == ds.ENCODING_BASE64 - assert new_object.text.strip() == \ - "V2VkIEp1biAgNCAxMjoxMTowMyBFRFQgMjAwMwo" - + assert new_object.text.strip() == "V2VkIEp1biAgNCAxMjoxMTowMyBFRFQgMjAwMwo" -class TestMgmtData: +class TestMgmtData: def setup_class(self): self.mgmt_data = ds.MgmtData() @@ -156,7 +154,7 @@ class TestX509Data: self.x509_data.x509_certificate = ds.X509Certificate( text="x509 certificate") self.x509_data.x509_crl = ds.X509CRL(text="x509 crl") - + new_x509_data = ds.x509_data_from_string(self.x509_data.to_string()) print(new_x509_data.keyswv()) print(new_x509_data.__dict__.keys()) @@ -231,7 +229,7 @@ class TestTransforms: ds.TRANSFORM_ENVELOPED assert new_transforms.transform[0].x_path[0].text.strip() == "xpath" assert new_transforms.transform[1].x_path[0].text.strip() == "xpath" - + def testUsingTestData(self): """Test for transform_from_string() using test data""" new_transforms = ds.transforms_from_string(ds_data.TEST_TRANSFORMS) @@ -261,7 +259,7 @@ class TestRetrievalMethod: assert new_retrieval_method.uri == "http://www.example.com/URI" assert new_retrieval_method.type == "http://www.example.com/Type" assert isinstance(new_retrieval_method.transforms, ds.Transforms) - + def testUsingTestData(self): """Test for retrieval_method_from_string() using test data""" new_retrieval_method = ds.retrieval_method_from_string( @@ -285,7 +283,7 @@ class TestRSAKeyValue: assert isinstance(new_rsa_key_value.exponent, ds.Exponent) assert new_rsa_key_value.modulus.text.strip() == "modulus" assert new_rsa_key_value.exponent.text.strip() == "exponent" - + def testUsingTestData(self): """Test for rsa_key_value_from_string() using test data""" new_rsa_key_value = ds.rsa_key_value_from_string( @@ -325,7 +323,7 @@ class TestDSAKeyValue: assert new_dsa_key_value.j.text.strip() == "j" assert new_dsa_key_value.seed.text.strip() == "seed" assert new_dsa_key_value.pgen_counter.text.strip() == "pgen counter" - + def testUsingTestData(self): """Test for dsa_key_value_from_string() using test data""" new_dsa_key_value = ds.dsa_key_value_from_string( @@ -362,7 +360,7 @@ class TestKeyValue: ds_data.TEST_RSA_KEY_VALUE) new_key_value = ds.key_value_from_string(self.key_value.to_string()) assert isinstance(new_key_value.rsa_key_value, ds.RSAKeyValue) - + def testUsingTestData(self): """Test for key_value_from_string() using test data""" new_key_value = ds.key_value_from_string(ds_data.TEST_KEY_VALUE1) @@ -384,7 +382,7 @@ class TestKeyName: self.key_name.text = "key name" new_key_name = ds.key_name_from_string(self.key_name.to_string()) assert new_key_name.text.strip() == "key name" - + def testUsingTestData(self): """Test for key_name_from_string() using test data""" new_key_name = ds.key_name_from_string(ds_data.TEST_KEY_NAME) @@ -423,7 +421,7 @@ class TestKeyInfo: assert isinstance(new_key_info.spki_data[0], ds.SPKIData) assert isinstance(new_key_info.mgmt_data[0], ds.MgmtData) assert new_key_info.id == "id" - + def testUsingTestData(self): """Test for key_info_from_string() using test data""" new_key_info = ds.key_info_from_string(ds_data.TEST_KEY_INFO) @@ -436,7 +434,7 @@ class TestKeyInfo: assert isinstance(new_key_info.spki_data[0], ds.SPKIData) assert isinstance(new_key_info.mgmt_data[0], ds.MgmtData) assert new_key_info.id == "id" - + class TestDigestValue: @@ -448,7 +446,7 @@ class TestDigestValue: self.digest_value.text = "digest value" new_digest_value = ds.digest_value_from_string(self.digest_value.to_string()) assert new_digest_value.text.strip() == "digest value" - + def testUsingTestData(self): """Test for digest_value_from_string() using test data""" new_digest_value = ds.digest_value_from_string(ds_data.TEST_DIGEST_VALUE) @@ -466,7 +464,7 @@ class TestDigestMethod: new_digest_method = ds.digest_method_from_string( self.digest_method.to_string()) assert new_digest_method.algorithm == ds.DIGEST_SHA1 - + def testUsingTestData(self): """Test for digest_method_from_string() using test data""" new_digest_method = ds.digest_method_from_string( @@ -497,7 +495,7 @@ class TestReference: assert new_reference.id == "id" assert new_reference.uri == "http://www.example.com/URI" assert new_reference.type == "http://www.example.com/Type" - + def testUsingTestData(self): """Test for reference_from_string() using test data""" new_reference = ds.reference_from_string(ds_data.TEST_REFERENCE) @@ -524,7 +522,7 @@ class TestSignatureMethod: ds.HMACOutputLength) assert new_signature_method.hmac_output_length.text.strip() == "8" assert new_signature_method.algorithm == ds.SIG_RSA_SHA1 - + def testUsingTestData(self): """Test for signature_method_from_string() using test data""" new_signature_method = ds.signature_method_from_string( @@ -542,16 +540,17 @@ class TestCanonicalizationMethod: def testAccessors(self): """Test for CanonicalizationMethod accessors""" - self.canonicalization_method.algorithm = ds.C14N_WITH_C + self.canonicalization_method.algorithm = ds.TRANSFORM_C14N_WITH_COMMENTS new_canonicalization_method = ds.canonicalization_method_from_string( self.canonicalization_method.to_string()) - assert new_canonicalization_method.algorithm == ds.C14N_WITH_C - + assert new_canonicalization_method.algorithm == ds.TRANSFORM_C14N_WITH_COMMENTS + def testUsingTestData(self): """Test for canonicalization_method_from_string() using test data""" new_canonicalization_method = ds.canonicalization_method_from_string( - ds_data.TEST_CANONICALIZATION_METHOD) - assert new_canonicalization_method.algorithm == ds.C14N_WITH_C + ds_data.TEST_CANONICALIZATION_METHOD + ) + assert new_canonicalization_method.algorithm == "http://www.w3.org/TR/2001/REC-xml-c14n-20010315#WithComments" class TestSignedInfo: @@ -574,7 +573,7 @@ class TestSignedInfo: ds.CanonicalizationMethod) assert isinstance(new_si.signature_method, ds.SignatureMethod) assert isinstance(new_si.reference[0], ds.Reference) - + def testUsingTestData(self): """Test for signed_info_from_string() using test data""" new_si = ds.signed_info_from_string(ds_data.TEST_SIGNED_INFO) @@ -597,7 +596,7 @@ class TestSignatureValue: self.signature_value.to_string()) assert new_signature_value.id == "id" assert new_signature_value.text.strip() == "signature value" - + def testUsingTestData(self): """Test for signature_value_from_string() using test data""" new_signature_value = ds.signature_value_from_string( @@ -627,7 +626,7 @@ class TestSignature: assert isinstance(new_signature.signature_value, ds.SignatureValue) assert isinstance(new_signature.key_info, ds.KeyInfo) assert isinstance(new_signature.object[0], ds.Object) - + def testUsingTestData(self): """Test for signature_value_from_string() using test data""" new_signature = ds.signature_from_string(ds_data.TEST_SIGNATURE) diff --git a/tests/test_30_mdstore.py b/tests/test_30_mdstore.py index bfe261dd..bf54594e 100644 --- a/tests/test_30_mdstore.py +++ b/tests/test_30_mdstore.py @@ -3,6 +3,7 @@ import datetime import os import re +from re import compile as regex_compile from collections import OrderedDict from unittest.mock import Mock from unittest.mock import patch @@ -163,6 +164,10 @@ METADATACONF = { "class": "saml2.mdstore.MetaDataFile", "metadata": [(full_path("invalid_metadata_file.xml"),)], }], + "15": [{ + "class": "saml2.mdstore.MetaDataFile", + "metadata": [(full_path("idp_uiinfo.xml"),)], + }], } @@ -608,5 +613,50 @@ def test_extension(): assert mds.extension("entity2", "idpsso_descriptor", "test") +def test_shibmd_scope_no_regex_no_descriptor_type(): + mds = MetadataStore(ATTRCONV, sec_config, disable_ssl_certificate_validation=True) + mds.imp(METADATACONF["15"]) + + scopes = mds.sbibmd_scopes(entity_id='http://example.com/saml2/idp.xml') + all_scopes = list(scopes) + + expected = [ + { + "regexp": False, + "text": "descriptor-example.org", + }, + { + "regexp": True, + "text": regex_compile("descriptor-example[^0-9]*\.org"), + }, + ] + assert len(all_scopes) == 2 + assert all_scopes == expected + + +def test_shibmd_scope_no_regex_all_descriptors(): + mds = MetadataStore(ATTRCONV, sec_config, disable_ssl_certificate_validation=True) + mds.imp(METADATACONF["15"]) + + scopes = mds.sbibmd_scopes(entity_id='http://example.com/saml2/idp.xml', typ="idpsso_descriptor") + all_scopes = list(scopes) + expected = [ + { + "regexp": False, + "text": "descriptor-example.org", + }, + { + "regexp": True, + "text": regex_compile("descriptor-example[^0-9]*\.org"), + }, + { + "regexp": False, + "text": "idpssodescriptor-example.org", + }, + ] + assert len(all_scopes) == 3 + assert all_scopes == expected + + if __name__ == "__main__": test_metadata_extension_algsupport() diff --git a/tests/test_41_response.py b/tests/test_41_response.py index 9f1dab6b..9380ae10 100644 --- a/tests/test_41_response.py +++ b/tests/test_41_response.py @@ -48,28 +48,38 @@ class TestResponse: self._resp_ = server.create_authn_response( IDENTITY, - "id12", # in_response_to - "http://lingon.catalogix.se:8087/", - # consumer_url - "urn:mace:example.com:saml:roland:sp", - # sp_entity_id - name_id=name_id) + in_response_to="id12", + destination="http://lingon.catalogix.se:8087/", + sp_entity_id="urn:mace:example.com:saml:roland:sp", + name_id=name_id, + ) self._sign_resp_ = server.create_authn_response( IDENTITY, - "id12", # in_response_to - "http://lingon.catalogix.se:8087/", # consumer_url - "urn:mace:example.com:saml:roland:sp", # sp_entity_id + in_response_to="id12", + destination="http://lingon.catalogix.se:8087/", + sp_entity_id="urn:mace:example.com:saml:roland:sp", name_id=name_id, - sign_assertion=True) + sign_assertion=True, + ) self._resp_authn = server.create_authn_response( IDENTITY, - "id12", # in_response_to - "http://lingon.catalogix.se:8087/", # consumer_url - "urn:mace:example.com:saml:roland:sp", # sp_entity_id + in_response_to="id12", + destination="http://lingon.catalogix.se:8087/", + sp_entity_id="urn:mace:example.com:saml:roland:sp", + name_id=name_id, + authn=AUTHN, + ) + + self._resp_issuer_none = server.create_authn_response( + IDENTITY, + in_response_to="id12", + destination="http://lingon.catalogix.se:8087/", + sp_entity_id="urn:mace:example.com:saml:roland:sp", name_id=name_id, - authn=AUTHN) + ) + self._resp_issuer_none.issuer = None conf = config.SPConfig() conf.load_file("server_conf") @@ -77,27 +87,45 @@ class TestResponse: def test_1(self): xml_response = ("%s" % (self._resp_,)) - resp = response_factory(xml_response, self.conf, - return_addrs=[ - "http://lingon.catalogix.se:8087/"], - outstanding_queries={ - "id12": "http://localhost:8088/sso"}, - timeslack=TIMESLACK, decode=False) + resp = response_factory( + xml_response, self.conf, + return_addrs=["http://lingon.catalogix.se:8087/"], + outstanding_queries={"id12": "http://localhost:8088/sso"}, + timeslack=TIMESLACK, + decode=False, + ) assert isinstance(resp, StatusResponse) assert isinstance(resp, AuthnResponse) def test_2(self): xml_response = self._sign_resp_ - resp = response_factory(xml_response, self.conf, - return_addrs=[ - "http://lingon.catalogix.se:8087/"], - outstanding_queries={ - "id12": "http://localhost:8088/sso"}, - timeslack=TIMESLACK, decode=False) + resp = response_factory( + xml_response, + self.conf, + return_addrs=["http://lingon.catalogix.se:8087/"], + outstanding_queries={"id12": "http://localhost:8088/sso"}, + timeslack=TIMESLACK, + decode=False, + ) + + assert isinstance(resp, StatusResponse) + assert isinstance(resp, AuthnResponse) + + def test_issuer_none(self): + xml_response = ("%s" % (self._resp_issuer_none,)) + resp = response_factory( + xml_response, + self.conf, + return_addrs=["http://lingon.catalogix.se:8087/"], + outstanding_queries={"id12": "http://localhost:8088/sso"}, + timeslack=TIMESLACK, + decode=False, + ) assert isinstance(resp, StatusResponse) assert isinstance(resp, AuthnResponse) + assert resp.issuer() == "" @mock.patch('saml2.time_util.datetime') def test_false_sign(self, mock_datetime): diff --git a/tests/test_42_enc.py b/tests/test_42_enc.py index 97bdf8ea..c268aa76 100644 --- a/tests/test_42_enc.py +++ b/tests/test_42_enc.py @@ -12,8 +12,8 @@ from pathutils import full_path __author__ = 'roland' -TMPL_NO_HEADER = """<ns0:EncryptedData xmlns:ns0="http://www.w3.org/2001/04/xmlenc#" xmlns:ns1="http://www.w3.org/2000/09/xmldsig#" Id="{ed_id}" Type="http://www.w3.org/2001/04/xmlenc#Element"><ns0:EncryptionMethod Algorithm="http://www.w3.org/2001/04/xmlenc#tripledes-cbc" /><ns1:KeyInfo><ns0:EncryptedKey Id="{ek_id}"><ns0:EncryptionMethod Algorithm="http://www.w3.org/2001/04/xmlenc#rsa-1_5" /><ns1:KeyInfo><ns1:KeyName>my-rsa-key</ns1:KeyName></ns1:KeyInfo><ns0:CipherData><ns0:CipherValue /></ns0:CipherData></ns0:EncryptedKey></ns1:KeyInfo><ns0:CipherData><ns0:CipherValue /></ns0:CipherData></ns0:EncryptedData>""" -TMPL = "<?xml version='1.0' encoding='UTF-8'?>\n%s" % TMPL_NO_HEADER +TMPL_NO_HEADER = """<ns0:EncryptedData xmlns:ns0="http://www.w3.org/2001/04/xmlenc#" xmlns:ns1="http://www.w3.org/2000/09/xmldsig#" Id="{ed_id}" Type="http://www.w3.org/2001/04/xmlenc#Element"><ns0:EncryptionMethod Algorithm="http://www.w3.org/2001/04/xmlenc#tripledes-cbc" /><ns1:KeyInfo><ns0:EncryptedKey Id="{ek_id}"><ns0:EncryptionMethod Algorithm="http://www.w3.org/2001/04/xmlenc#rsa-oaep-mgf1p" /><ns1:KeyInfo><ns1:KeyName>my-rsa-key</ns1:KeyName></ns1:KeyInfo><ns0:CipherData><ns0:CipherValue /></ns0:CipherData></ns0:EncryptedKey></ns1:KeyInfo><ns0:CipherData><ns0:CipherValue /></ns0:CipherData></ns0:EncryptedData>""" +TMPL = f"<?xml version='1.0' encoding='UTF-8'?>\n{TMPL_NO_HEADER}" IDENTITY = {"eduPersonAffiliation": ["staff", "member"], "surName": ["Jeter"], "givenName": ["Derek"], diff --git a/tests/test_60_sp.py b/tests/test_60_sp.py index 78e88400..b1eaa7c6 100644 --- a/tests/test_60_sp.py +++ b/tests/test_60_sp.py @@ -1,7 +1,8 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -import base64 +from base64 import encodebytes as b64encode + import pytest from saml2.authn_context import INTERNETPROTOCOLPASSWORD from saml2.saml import NAMEID_FORMAT_TRANSIENT @@ -73,7 +74,7 @@ class TestSP(): "urn:mace:example.com:saml:roland:sp", trans_name_policy, "foba0001@example.com", authn=AUTHN) - resp_str = base64.encodestring(resp_str.encode('utf-8')) + resp_str = b64encode(resp_str.encode('utf-8')) self.sp.outstanding_queries = {"id1": "http://www.example.com/service"} session_info = self.sp._eval_authn_response( {}, {"SAMLResponse": [resp_str]}) |