#!/usr/bin/env python # -*- coding: utf-8 -*- import datetime import os import re from collections import OrderedDict from unittest.mock import Mock from unittest.mock import patch import responses from six.moves.urllib import parse from saml2.config import Config from saml2.mdstore import MetadataStore, MetaDataExtern from saml2.mdstore import MetaDataMDX from saml2.mdstore import SAML_METADATA_CONTENT_TYPE from saml2.mdstore import destinations from saml2.mdstore import name from saml2 import sigver from saml2.httpbase import HTTPBase from saml2 import BINDING_SOAP from saml2 import BINDING_HTTP_REDIRECT from saml2 import BINDING_HTTP_POST from saml2 import BINDING_HTTP_ARTIFACT from saml2 import config from saml2.attribute_converter import ac_factory from saml2.attribute_converter import d_to_local_name from saml2.s_utils import UnknownPrincipal from pathutils import full_path TESTS_DIR = os.path.dirname(__file__) sec_config = config.Config() # sec_config.xmlsec_binary = sigver.get_xmlsec_binary(["/opt/local/bin"]) TEST_CERT = """MIICsDCCAhmgAwIBAgIJAJrzqSSwmDY9MA0GCSqGSIb3DQEBBQUAMEUxCzAJBgNV BAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBX aWRnaXRzIFB0eSBMdGQwHhcNMDkxMDA2MTk0OTQxWhcNMDkxMTA1MTk0OTQxWjBF MQswCQYDVQQGEwJBVTETMBEGA1UECBMKU29tZS1TdGF0ZTEhMB8GA1UEChMYSW50 ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKB gQDJg2cms7MqjniT8Fi/XkNHZNPbNVQyMUMXE9tXOdqwYCA1cc8vQdzkihscQMXy 3iPw2cMggBu6gjMTOSOxECkuvX5ZCclKr8pXAJM5cY6gVOaVO2PdTZcvDBKGbiaN efiEw5hnoZomqZGp8wHNLAUkwtH9vjqqvxyS/vclc6k2ewIDAQABo4GnMIGkMB0G A1UdDgQWBBRePsKHKYJsiojE78ZWXccK9K4aJTB1BgNVHSMEbjBsgBRePsKHKYJs iojE78ZWXccK9K4aJaFJpEcwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgTClNvbWUt U3RhdGUxITAfBgNVBAoTGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZIIJAJrzqSSw mDY9MAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEFBQADgYEAJSrKOEzHO7TL5cy6 h3qh+3+JAk8HbGBW+cbX6KBCAw/mzU8flK25vnWwXS3dv2FF3Aod0/S7AWNfKib5 U/SA9nJaz/mWeF9S0farz9AQFc8/NSzAzaVq7YbM4F6f6N2FRl7GikdXRCed45j6 mrPzGzk3ECbupFnqyREH3+ZPSdk=""" TEST_METADATA_STRING = """ {cert_data} urn:oasis:names:tc:SAML:2.0:nameid-format:transient Catalogix Catalogix http://www.catalogix.se Hedberg datordrift@catalogix.se """.format(cert_data=TEST_CERT) ATTRCONV = ac_factory(full_path("attributemaps")) METADATACONF = { "1": [{ "class": "saml2.mdstore.MetaDataFile", "metadata": [(full_path("swamid-1.0.xml"),)], }], "2": [{ "class": "saml2.mdstore.MetaDataFile", "metadata": [(full_path("InCommon-metadata.xml"),)], }], "3": [{ "class": "saml2.mdstore.MetaDataFile", "metadata": [(full_path("extended.xml"),)], }], # "7": [{ # "class": "saml2.mdstore.MetaDataFile", # "metadata": [(full_path("metadata_sp_1.xml"), ), # (full_path("InCommon-metadata.xml"), )], }, # { # "class": "saml2.mdstore.MetaDataExtern", # "metadata": [ # ("https://kalmar2.org/simplesaml/module.php/aggregator/?id # =kalmarcentral2&set=saml2", # full_path("kalmar2.pem")), ], # }], "4": [{ "class": "saml2.mdstore.MetaDataFile", "metadata": [(full_path("metadata_example.xml"),)], }], "5": [{ "class": "saml2.mdstore.MetaDataFile", "metadata": [(full_path("metadata.aaitest.xml"),)], }], "8": [{ "class": "saml2.mdstore.MetaDataMD", "metadata": [(full_path("swamid.md"),)], }], "9": [{ "class": "saml2.mdstore.MetaDataFile", "metadata": [(full_path("metadata"),)] }], "10": [{ "class": "saml2.mdstore.MetaDataExtern", "metadata": [ ("http://md.incommon.org/InCommon/InCommon-metadata-export.xml", full_path("inc-md-cert.pem"))] }], "11": [{ "class": "saml2.mdstore.InMemoryMetaData", "metadata": [(TEST_METADATA_STRING,)] }], "12": [{ "class": "saml2.mdstore.MetaDataFile", "metadata": [(full_path("uu.xml"),)], }], } def _eq(l1, l2): return set(l1) == set(l2) def _fix_valid_until(xmlstring): new_date = datetime.datetime.now() + datetime.timedelta(days=1) new_date = new_date.strftime("%Y-%m-%dT%H:%M:%SZ") return re.sub(r' validUntil=".*?"', ' validUntil="%s"' % new_date, xmlstring) def test_swami_1(): UMU_IDP = 'https://idp.umu.se/saml2/idp/metadata.php' mds = MetadataStore(ATTRCONV, sec_config, disable_ssl_certificate_validation=True) mds.imp(METADATACONF["1"]) assert len(mds) == 1 # One source idps = mds.with_descriptor("idpsso") assert idps.keys() idpsso = mds.single_sign_on_service(UMU_IDP) assert len(idpsso) == 1 assert destinations(idpsso) == [ 'https://idp.umu.se/saml2/idp/SSOService.php'] _name = name(mds[UMU_IDP]) assert _name == u'UmeƄ University (SAML2)' certs = mds.certs(UMU_IDP, "idpsso", "signing") assert len(certs) == 1 sps = mds.with_descriptor("spsso") assert len(sps) == 108 wants = mds.attribute_requirement('https://connect8.sunet.se/shibboleth') lnamn = [d_to_local_name(mds.attrc, attr) for attr in wants["optional"]] assert _eq(lnamn, ['eduPersonPrincipalName', 'mail', 'givenName', 'sn', 'eduPersonScopedAffiliation']) wants = mds.attribute_requirement('https://beta.lobber.se/shibboleth') assert wants["required"] == [] lnamn = [d_to_local_name(mds.attrc, attr) for attr in wants["optional"]] assert _eq(lnamn, ['eduPersonPrincipalName', 'mail', 'givenName', 'sn', 'eduPersonScopedAffiliation', 'eduPersonEntitlement']) def test_incommon_1(): mds = MetadataStore(ATTRCONV, sec_config, disable_ssl_certificate_validation=True) mds.imp(METADATACONF["2"]) print(mds.entities()) assert mds.entities() > 1700 idps = mds.with_descriptor("idpsso") print(idps.keys()) assert len(idps) > 300 # ~ 18% try: _ = mds.single_sign_on_service('urn:mace:incommon:uiuc.edu') except UnknownPrincipal: pass idpsso = mds.single_sign_on_service('urn:mace:incommon:alaska.edu') assert len(idpsso) == 1 print(idpsso) assert destinations(idpsso) == [ 'https://idp.alaska.edu/idp/profile/SAML2/Redirect/SSO'] sps = mds.with_descriptor("spsso") acs_sp = [] for nam, desc in sps.items(): if "attribute_consuming_service" in desc: acs_sp.append(nam) assert len(acs_sp) == 0 # Look for attribute authorities aas = mds.with_descriptor("attribute_authority") print(aas.keys()) assert len(aas) == 180 def test_ext_2(): mds = MetadataStore(ATTRCONV, sec_config, disable_ssl_certificate_validation=True) mds.imp(METADATACONF["3"]) # No specific binding defined ents = mds.with_descriptor("spsso") for binding in [BINDING_SOAP, BINDING_HTTP_POST, BINDING_HTTP_ARTIFACT, BINDING_HTTP_REDIRECT]: assert mds.single_logout_service(list(ents.keys())[0], binding, "spsso") def test_example(): mds = MetadataStore(ATTRCONV, sec_config, disable_ssl_certificate_validation=True) mds.imp(METADATACONF["4"]) assert len(mds.keys()) == 1 idps = mds.with_descriptor("idpsso") assert list(idps.keys()) == [ 'http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php'] certs = mds.certs( 'http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php', "idpsso", "signing") assert len(certs) == 1 def test_switch_1(): mds = MetadataStore(ATTRCONV, sec_config, disable_ssl_certificate_validation=True) mds.imp(METADATACONF["5"]) assert len(mds.keys()) > 160 idps = mds.with_descriptor("idpsso") print(idps.keys()) idpsso = mds.single_sign_on_service( 'https://aai-demo-idp.switch.ch/idp/shibboleth') assert len(idpsso) == 1 print(idpsso) assert destinations(idpsso) == [ 'https://aai-demo-idp.switch.ch/idp/profile/SAML2/Redirect/SSO'] assert len(idps) > 30 aas = mds.with_descriptor("attribute_authority") print(aas.keys()) aad = aas['https://aai-demo-idp.switch.ch/idp/shibboleth'] print(aad.keys()) assert len(aad["attribute_authority_descriptor"]) == 1 assert len(aad["idpsso_descriptor"]) == 1 sps = mds.with_descriptor("spsso") dual = [eid for eid, ent in idps.items() if eid in sps] print(len(dual)) assert len(dual) == 0 def test_metadata_file(): sec_config.xmlsec_binary = sigver.get_xmlsec_binary(["/opt/local/bin"]) mds = MetadataStore(ATTRCONV, sec_config, disable_ssl_certificate_validation=True) mds.imp(METADATACONF["8"]) print(len(mds.keys())) assert len(mds.keys()) == 560 @responses.activate def test_mdx_service(): entity_id = "http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php" url = "http://mdx.example.com/entities/{}".format( parse.quote_plus(MetaDataMDX.sha1_entity_transform(entity_id))) responses.add(responses.GET, url, body=TEST_METADATA_STRING, status=200, content_type=SAML_METADATA_CONTENT_TYPE) mdx = MetaDataMDX("http://mdx.example.com") sso_loc = mdx.service(entity_id, "idpsso_descriptor", "single_sign_on_service") assert sso_loc[BINDING_HTTP_REDIRECT][0]["location"] == "http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php" certs = mdx.certs(entity_id, "idpsso") assert len(certs) == 1 @responses.activate def test_mdx_single_sign_on_service(): entity_id = "http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php" url = "http://mdx.example.com/entities/{}".format( parse.quote_plus(MetaDataMDX.sha1_entity_transform(entity_id))) responses.add(responses.GET, url, body=TEST_METADATA_STRING, status=200, content_type=SAML_METADATA_CONTENT_TYPE) mdx = MetaDataMDX("http://mdx.example.com") sso_loc = mdx.single_sign_on_service(entity_id, BINDING_HTTP_REDIRECT) assert sso_loc[0]["location"] == "http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php" @responses.activate def test_mdx_metadata_freshness_period_not_expired(): """Ensure that metadata is not refreshed if not expired.""" entity_id = "http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php" url = "http://mdx.example.com/entities/{}".format( parse.quote_plus(MetaDataMDX.sha1_entity_transform(entity_id)) ) responses.add( responses.GET, url, body=TEST_METADATA_STRING, status=200, content_type=SAML_METADATA_CONTENT_TYPE, ) mdx = MetaDataMDX("http://mdx.example.com", freshness_period="P0Y0M0DT0H2M0S") mdx._is_metadata_fresh = Mock(return_value=True) mdx.single_sign_on_service(entity_id, BINDING_HTTP_REDIRECT) assert entity_id in mdx.entity mdx.single_sign_on_service(entity_id, BINDING_HTTP_REDIRECT) assert len(responses.calls) == 1 @responses.activate def test_mdx_metadata_freshness_period_expired(): """Ensure that metadata is not refreshed if not expired.""" entity_id = "http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php" url = "http://mdx.example.com/entities/{}".format( parse.quote_plus(MetaDataMDX.sha1_entity_transform(entity_id)) ) responses.add( responses.GET, url, body=TEST_METADATA_STRING, status=200, content_type=SAML_METADATA_CONTENT_TYPE, ) mdx = MetaDataMDX("http://mdx.example.com", freshness_period="P0Y0M0DT0H2M0S") mdx._is_metadata_fresh = Mock(return_value=False) mdx.single_sign_on_service(entity_id, BINDING_HTTP_REDIRECT) assert entity_id in mdx.entity mdx.single_sign_on_service(entity_id, BINDING_HTTP_REDIRECT) assert len(responses.calls) == 2 # pyff-test not available # def test_mdx_service(): # sec_config.xmlsec_binary = sigver.get_xmlsec_binary(["/opt/local/bin"]) # http = HTTPBase(verify=False, ca_bundle=None) # # mdx = MetaDataMDX(quote_plus, ATTRCONV, # "http://pyff-test.nordu.net", # sec_config, None, http) # foo = mdx.service("https://idp.umu.se/saml2/idp/metadata.php", # "idpsso_descriptor", "single_sign_on_service") # # assert len(foo) == 1 # assert foo.keys()[0] == BINDING_HTTP_REDIRECT # # # def test_mdx_certs(): # sec_config.xmlsec_binary = sigver.get_xmlsec_binary(["/opt/local/bin"]) # http = HTTPBase(verify=False, ca_bundle=None) # # mdx = MetaDataMDX(quote_plus, ATTRCONV, # "http://pyff-test.nordu.net", # sec_config, None, http) # foo = mdx.certs("https://idp.umu.se/saml2/idp/metadata.php", "idpsso") # # assert len(foo) == 1 def test_load_local_dir(): sec_config.xmlsec_binary = sigver.get_xmlsec_binary(["/opt/local/bin"]) mds = MetadataStore(ATTRCONV, sec_config, disable_ssl_certificate_validation=True) mds.imp(METADATACONF["9"]) print(mds) assert len(mds) == 3 # Three sources assert len(mds.keys()) == 4 # number of idps @patch('saml2.httpbase.requests.request') def test_load_extern_incommon(mock_request): filepath = os.path.join(TESTS_DIR, "remote_data/InCommon-metadata-export.xml") with open(filepath) as fd: data = fd.read() mock_request.return_value.ok = True mock_request.return_value.status_code = 200 mock_request.return_value.content = data sec_config.xmlsec_binary = sigver.get_xmlsec_binary(["/opt/local/bin"]) mds = MetadataStore(ATTRCONV, sec_config, disable_ssl_certificate_validation=True) mds.imp(METADATACONF["10"]) print(mds) assert mds assert len(mds.keys()) def test_load_local(): # string representation of XML idp definition with open(full_path("metadata.xml")) as fp: idp_metadata = fp.read() saml_config = Config() config_dict = { "metadata": {"inline": [idp_metadata]} } cfg = saml_config.load(config_dict) assert cfg @patch('saml2.httpbase.requests.request') def test_load_remote_encoding(mock_request): filepath = os.path.join(TESTS_DIR, "remote_data/metadata.aaitest.xml") with open(filepath) as fd: data = fd.read() mock_request.return_value.ok = True mock_request.return_value.status_code = 200 mock_request.return_value.content = data crypto = sigver._get_xmlsec_cryptobackend() sc = sigver.SecurityContext(crypto, key_type="", cert_type="") httpc = HTTPBase() mds = MetaDataExtern(ATTRCONV, 'http://metadata.aai.switch.ch/metadata.aaitest.xml', sc, full_path('SWITCHaaiRootCA.crt.pem'), httpc) mds.load() def test_load_string(): sec_config.xmlsec_binary = sigver.get_xmlsec_binary(["/opt/local/bin"]) mds = MetadataStore(ATTRCONV, sec_config, disable_ssl_certificate_validation=True) mds.imp(METADATACONF["11"]) # print(mds) assert len(mds.keys()) == 1 idps = mds.with_descriptor("idpsso") assert list(idps.keys()) == [ 'http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php'] certs = mds.certs( 'http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php', "idpsso", "signing") assert len(certs) == 1 def test_get_certs_from_metadata(): mds = MetadataStore(ATTRCONV, None) mds.imp(METADATACONF["11"]) certs1 = mds.certs("http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php", "any") certs2 = mds.certs("http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php", "idpsso") assert certs1[0] == certs2[0] == TEST_CERT def test_get_certs_from_metadata_without_keydescriptor(): mds = MetadataStore(ATTRCONV, None) mds.imp([{ "class": "saml2.mdstore.InMemoryMetaData", "metadata": [(""" urn:oasis:names:tc:SAML:2.0:nameid-format:transient Catalogix Catalogix http://www.catalogix.se Hedberg datordrift@catalogix.se """,)] }]) certs = mds.certs("http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php", "idpsso") assert len(certs) == 0 def test_metadata_extension_algsupport(): mds = MetadataStore(ATTRCONV, None) mds.imp(METADATACONF["12"]) mdf = mds.metadata[full_path("uu.xml")] assert mds def test_supported_algorithms(): mds = MetadataStore(ATTRCONV, sec_config, disable_ssl_certificate_validation=True) mds.imp(METADATACONF["11"]) algs = mds.supported_algorithms(entity_id='http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php') assert 'http://www.w3.org/2001/04/xmlenc#sha256' in algs['digest_methods'] assert 'http://www.w3.org/2001/04/xmldsig-more#rsa-sha256' in algs['signing_methods'] def test_extension(): mds = MetadataStore(ATTRCONV, None) # use ordered dict to force expected entity to be last metadata = OrderedDict() metadata["1"] = {"entity1": {}} metadata["2"] = {"entity2": {"idpsso_descriptor": [{"extensions": {"extension_elements": [{"__class__": "test"}]}}]}} mds.metadata = metadata assert mds.extension("entity2", "idpsso_descriptor", "test") if __name__ == "__main__": test_metadata_extension_algsupport()