diff options
author | Rebecka Gulliksson <rebecka.gulliksson@umu.se> | 2015-11-12 13:28:44 +0100 |
---|---|---|
committer | Rebecka Gulliksson <rebecka.gulliksson@umu.se> | 2015-11-16 11:24:35 +0100 |
commit | 122cb9cd8d960011a9ab40dd13aabeabd3ec8627 (patch) | |
tree | dee51243139b0253314d034676b8956fdd07ca81 | |
parent | 0111b9bf1e3e029fe0a60da24dc6bf22e2059772 (diff) | |
download | pysaml2-122cb9cd8d960011a9ab40dd13aabeabd3ec8627.tar.gz |
Refactor Metadata.certs() and move it to base class.
-rw-r--r-- | src/saml2/mdstore.py | 121 | ||||
-rw-r--r-- | tests/test_30_mdstore.py | 54 |
2 files changed, 70 insertions, 105 deletions
diff --git a/src/saml2/mdstore.py b/src/saml2/mdstore.py index 687b1288..574a64e6 100644 --- a/src/saml2/mdstore.py +++ b/src/saml2/mdstore.py @@ -314,7 +314,42 @@ class MetaData(object): ''' Returns certificates for the given Entity ''' - raise NotImplementedError + ent = self[entity_id] + + def extract_certs(srvs): + res = [] + for srv in srvs: + for key in srv["key_descriptor"]: + if "use" in key and key["use"] == use: + for dat in key["key_info"]["x509_data"]: + cert = repack_cert( + dat["x509_certificate"]["text"]) + if cert not in res: + res.append(cert) + elif not "use" in key: + for dat in key["key_info"]["x509_data"]: + cert = repack_cert( + dat["x509_certificate"]["text"]) + if cert not in res: + res.append(cert) + + return res + + if descriptor == "any": + res = [] + for descr in ["spsso", "idpsso", "role", "authn_authority", + "attribute_authority", "pdp"]: + try: + srvs = ent["%s_descriptor" % descr] + except KeyError: + continue + + res.extend(extract_certs(srvs)) + else: + srvs = ent["%s_descriptor" % descriptor] + res = extract_certs(srvs) + + return res class InMemoryMetaData(MetaData): @@ -511,45 +546,6 @@ class InMemoryMetaData(MetaData): return res - def certs(self, entity_id, descriptor, use="signing"): - ent = self.__getitem__(entity_id) - if descriptor == "any": - res = [] - for descr in ["spsso", "idpsso", "role", "authn_authority", - "attribute_authority", "pdp"]: - try: - srvs = ent["%s_descriptor" % descr] - except KeyError: - continue - - for srv in srvs: - for key in srv["key_descriptor"]: - if "use" in key and key["use"] == use: - for dat in key["key_info"]["x509_data"]: - cert = repack_cert( - dat["x509_certificate"]["text"]) - if cert not in res: - res.append(cert) - elif not "use" in key: - for dat in key["key_info"]["x509_data"]: - cert = repack_cert( - dat["x509_certificate"]["text"]) - if cert not in res: - res.append(cert) - else: - srvs = ent["%s_descriptor" % descriptor] - - res = [] - for srv in srvs: - for key in srv["key_descriptor"]: - if "use" in key and key["use"] == use: - for dat in key["key_info"]["x509_data"]: - res.append(dat["x509_certificate"]["text"]) - elif not "use" in key: - for dat in key["key_info"]["x509_data"]: - res.append(dat["x509_certificate"]["text"]) - return res - def signed(self): if self.entities_descr and self.entities_descr.signature: return True @@ -567,8 +563,8 @@ class InMemoryMetaData(MetaData): return True node_name = self.node_name \ - or "%s:%s" % (md.EntitiesDescriptor.c_namespace, - md.EntitiesDescriptor.c_tag) + or "%s:%s" % (md.EntitiesDescriptor.c_namespace, + md.EntitiesDescriptor.c_tag) if self.security.verify_signature( txt, node_name=node_name, cert_file=self.cert): @@ -752,7 +748,7 @@ class MetaDataMDX(InMemoryMetaData): raise KeyError -class MetadataStore(object): +class MetadataStore(MetaData): def __init__(self, onts, attrc, config, ca_certs=None, check_validity=True, disable_ssl_certificate_validation=False, @@ -1062,45 +1058,6 @@ class MetadataStore(object): return name(_md[entity_id], langpref) return None - def certs(self, entity_id, descriptor, use="signing"): - ent = self.__getitem__(entity_id) - if descriptor == "any": - res = [] - for descr in ["spsso", "idpsso", "role", "authn_authority", - "attribute_authority", "pdp"]: - try: - srvs = ent["%s_descriptor" % descr] - except KeyError: - continue - - for srv in srvs: - for key in srv["key_descriptor"]: - if "use" in key and key["use"] == use: - for dat in key["key_info"]["x509_data"]: - cert = repack_cert( - dat["x509_certificate"]["text"]) - if cert not in res: - res.append(cert) - elif not "use" in key: - for dat in key["key_info"]["x509_data"]: - cert = repack_cert( - dat["x509_certificate"]["text"]) - if cert not in res: - res.append(cert) - else: - srvs = ent["%s_descriptor" % descriptor] - - res = [] - for srv in srvs: - for key in srv["key_descriptor"]: - if "use" in key and key["use"] == use: - for dat in key["key_info"]["x509_data"]: - res.append(dat["x509_certificate"]["text"]) - elif not "use" in key: - for dat in key["key_info"]["x509_data"]: - res.append(dat["x509_certificate"]["text"]) - return res - def vo_members(self, entity_id): ad = self.__getitem__(entity_id)["affiliation_descriptor"] return [m["text"] for m in ad["affiliate_member"]] diff --git a/tests/test_30_mdstore.py b/tests/test_30_mdstore.py index ed2b9ebd..74f508a1 100644 --- a/tests/test_30_mdstore.py +++ b/tests/test_30_mdstore.py @@ -5,11 +5,9 @@ import re from six.moves.urllib.parse import quote_plus from saml2.config import Config from saml2.httpbase import HTTPBase - from saml2.mdstore import MetadataStore, MetaDataMDX from saml2.mdstore import destinations from saml2.mdstore import name - from saml2 import md from saml2 import sigver from saml2 import BINDING_SOAP @@ -20,7 +18,6 @@ from saml2 import saml from saml2 import config from saml2.attribute_converter import ac_factory from saml2.attribute_converter import d_to_local_name - from saml2.extension import mdui from saml2.extension import idpdisc from saml2.extension import dri @@ -29,12 +26,27 @@ from saml2.extension import ui from saml2.s_utils import UnknownPrincipal from saml2 import xmldsig from saml2 import xmlenc - from pathutils import full_path sec_config = config.Config() # sec_config.xmlsec_binary = sigver.get_xmlsec_binary(["/opt/local/bin"]) +TEST_CERT = """MIICsDCCAhmgAwIBAgIJAJrzqSSwmDY9MA0GCSqGSIb3DQEBBQUAMEUxCzAJBgNV +BAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBX +aWRnaXRzIFB0eSBMdGQwHhcNMDkxMDA2MTk0OTQxWhcNMDkxMTA1MTk0OTQxWjBF +MQswCQYDVQQGEwJBVTETMBEGA1UECBMKU29tZS1TdGF0ZTEhMB8GA1UEChMYSW50 +ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKB +gQDJg2cms7MqjniT8Fi/XkNHZNPbNVQyMUMXE9tXOdqwYCA1cc8vQdzkihscQMXy +3iPw2cMggBu6gjMTOSOxECkuvX5ZCclKr8pXAJM5cY6gVOaVO2PdTZcvDBKGbiaN +efiEw5hnoZomqZGp8wHNLAUkwtH9vjqqvxyS/vclc6k2ewIDAQABo4GnMIGkMB0G +A1UdDgQWBBRePsKHKYJsiojE78ZWXccK9K4aJTB1BgNVHSMEbjBsgBRePsKHKYJs +iojE78ZWXccK9K4aJaFJpEcwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgTClNvbWUt +U3RhdGUxITAfBgNVBAoTGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZIIJAJrzqSSw +mDY9MAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEFBQADgYEAJSrKOEzHO7TL5cy6 +h3qh+3+JAk8HbGBW+cbX6KBCAw/mzU8flK25vnWwXS3dv2FF3Aod0/S7AWNfKib5 +U/SA9nJaz/mWeF9S0farz9AQFc8/NSzAzaVq7YbM4F6f6N2FRl7GikdXRCed45j6 +mrPzGzk3ECbupFnqyREH3+ZPSdk=""" + TEST_METADATA_STRING = """ <EntitiesDescriptor xmlns="urn:oasis:names:tc:SAML:2.0:metadata" @@ -51,21 +63,8 @@ TEST_METADATA_STRING = """ <ds:KeyInfo> <ds:X509Data> <ds:X509Certificate> - MIICsDCCAhmgAwIBAgIJAJrzqSSwmDY9MA0GCSqGSIb3DQEBBQUAMEUxCzAJBgNV - BAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBX - aWRnaXRzIFB0eSBMdGQwHhcNMDkxMDA2MTk0OTQxWhcNMDkxMTA1MTk0OTQxWjBF - MQswCQYDVQQGEwJBVTETMBEGA1UECBMKU29tZS1TdGF0ZTEhMB8GA1UEChMYSW50 - ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKB - gQDJg2cms7MqjniT8Fi/XkNHZNPbNVQyMUMXE9tXOdqwYCA1cc8vQdzkihscQMXy - 3iPw2cMggBu6gjMTOSOxECkuvX5ZCclKr8pXAJM5cY6gVOaVO2PdTZcvDBKGbiaN - efiEw5hnoZomqZGp8wHNLAUkwtH9vjqqvxyS/vclc6k2ewIDAQABo4GnMIGkMB0G - A1UdDgQWBBRePsKHKYJsiojE78ZWXccK9K4aJTB1BgNVHSMEbjBsgBRePsKHKYJs - iojE78ZWXccK9K4aJaFJpEcwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgTClNvbWUt - U3RhdGUxITAfBgNVBAoTGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZIIJAJrzqSSw - mDY9MAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEFBQADgYEAJSrKOEzHO7TL5cy6 - h3qh+3+JAk8HbGBW+cbX6KBCAw/mzU8flK25vnWwXS3dv2FF3Aod0/S7AWNfKib5 - U/SA9nJaz/mWeF9S0farz9AQFc8/NSzAzaVq7YbM4F6f6N2FRl7GikdXRCed45j6 - mrPzGzk3ECbupFnqyREH3+ZPSdk=</ds:X509Certificate> + {cert_data} + </ds:X509Certificate> </ds:X509Data> </ds:KeyInfo> </KeyDescriptor> @@ -85,7 +84,7 @@ TEST_METADATA_STRING = """ </ContactPerson> </EntityDescriptor> </EntitiesDescriptor> -""" +""".format(cert_data=TEST_CERT) ONTS = { saml.NAMESPACE: saml, @@ -149,7 +148,7 @@ METADATACONF = { }], "11": [{ "class": "saml2.mdstore.InMemoryMetaData", - "metadata": [(TEST_METADATA_STRING, )] + "metadata": [(TEST_METADATA_STRING,)] }], } @@ -372,7 +371,7 @@ def test_load_string(): disable_ssl_certificate_validation=True) mds.imp(METADATACONF["11"]) - #print(mds) + # print(mds) assert len(mds.keys()) == 1 idps = mds.with_descriptor("idpsso") @@ -384,5 +383,14 @@ def test_load_string(): assert len(certs) == 1 +def test_get_certs_from_metadata(): + mds = MetadataStore(ONTS.values(), ATTRCONV, None) + mds.imp(METADATACONF["11"]) + certs1 = mds.certs("http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php", "any") + certs2 = mds.certs("http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php", "idpsso") + + assert certs1[0] == certs2[0] == TEST_CERT + + if __name__ == "__main__": - test_load_local() + test_get_certs_from_metadata() |