summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRebecka Gulliksson <rebecka.gulliksson@umu.se>2015-11-12 13:28:44 +0100
committerRebecka Gulliksson <rebecka.gulliksson@umu.se>2015-11-16 11:24:35 +0100
commit122cb9cd8d960011a9ab40dd13aabeabd3ec8627 (patch)
treedee51243139b0253314d034676b8956fdd07ca81
parent0111b9bf1e3e029fe0a60da24dc6bf22e2059772 (diff)
downloadpysaml2-122cb9cd8d960011a9ab40dd13aabeabd3ec8627.tar.gz
Refactor Metadata.certs() and move it to base class.
-rw-r--r--src/saml2/mdstore.py121
-rw-r--r--tests/test_30_mdstore.py54
2 files changed, 70 insertions, 105 deletions
diff --git a/src/saml2/mdstore.py b/src/saml2/mdstore.py
index 687b1288..574a64e6 100644
--- a/src/saml2/mdstore.py
+++ b/src/saml2/mdstore.py
@@ -314,7 +314,42 @@ class MetaData(object):
'''
Returns certificates for the given Entity
'''
- raise NotImplementedError
+ ent = self[entity_id]
+
+ def extract_certs(srvs):
+ res = []
+ for srv in srvs:
+ for key in srv["key_descriptor"]:
+ if "use" in key and key["use"] == use:
+ for dat in key["key_info"]["x509_data"]:
+ cert = repack_cert(
+ dat["x509_certificate"]["text"])
+ if cert not in res:
+ res.append(cert)
+ elif not "use" in key:
+ for dat in key["key_info"]["x509_data"]:
+ cert = repack_cert(
+ dat["x509_certificate"]["text"])
+ if cert not in res:
+ res.append(cert)
+
+ return res
+
+ if descriptor == "any":
+ res = []
+ for descr in ["spsso", "idpsso", "role", "authn_authority",
+ "attribute_authority", "pdp"]:
+ try:
+ srvs = ent["%s_descriptor" % descr]
+ except KeyError:
+ continue
+
+ res.extend(extract_certs(srvs))
+ else:
+ srvs = ent["%s_descriptor" % descriptor]
+ res = extract_certs(srvs)
+
+ return res
class InMemoryMetaData(MetaData):
@@ -511,45 +546,6 @@ class InMemoryMetaData(MetaData):
return res
- def certs(self, entity_id, descriptor, use="signing"):
- ent = self.__getitem__(entity_id)
- if descriptor == "any":
- res = []
- for descr in ["spsso", "idpsso", "role", "authn_authority",
- "attribute_authority", "pdp"]:
- try:
- srvs = ent["%s_descriptor" % descr]
- except KeyError:
- continue
-
- for srv in srvs:
- for key in srv["key_descriptor"]:
- if "use" in key and key["use"] == use:
- for dat in key["key_info"]["x509_data"]:
- cert = repack_cert(
- dat["x509_certificate"]["text"])
- if cert not in res:
- res.append(cert)
- elif not "use" in key:
- for dat in key["key_info"]["x509_data"]:
- cert = repack_cert(
- dat["x509_certificate"]["text"])
- if cert not in res:
- res.append(cert)
- else:
- srvs = ent["%s_descriptor" % descriptor]
-
- res = []
- for srv in srvs:
- for key in srv["key_descriptor"]:
- if "use" in key and key["use"] == use:
- for dat in key["key_info"]["x509_data"]:
- res.append(dat["x509_certificate"]["text"])
- elif not "use" in key:
- for dat in key["key_info"]["x509_data"]:
- res.append(dat["x509_certificate"]["text"])
- return res
-
def signed(self):
if self.entities_descr and self.entities_descr.signature:
return True
@@ -567,8 +563,8 @@ class InMemoryMetaData(MetaData):
return True
node_name = self.node_name \
- or "%s:%s" % (md.EntitiesDescriptor.c_namespace,
- md.EntitiesDescriptor.c_tag)
+ or "%s:%s" % (md.EntitiesDescriptor.c_namespace,
+ md.EntitiesDescriptor.c_tag)
if self.security.verify_signature(
txt, node_name=node_name, cert_file=self.cert):
@@ -752,7 +748,7 @@ class MetaDataMDX(InMemoryMetaData):
raise KeyError
-class MetadataStore(object):
+class MetadataStore(MetaData):
def __init__(self, onts, attrc, config, ca_certs=None,
check_validity=True,
disable_ssl_certificate_validation=False,
@@ -1062,45 +1058,6 @@ class MetadataStore(object):
return name(_md[entity_id], langpref)
return None
- def certs(self, entity_id, descriptor, use="signing"):
- ent = self.__getitem__(entity_id)
- if descriptor == "any":
- res = []
- for descr in ["spsso", "idpsso", "role", "authn_authority",
- "attribute_authority", "pdp"]:
- try:
- srvs = ent["%s_descriptor" % descr]
- except KeyError:
- continue
-
- for srv in srvs:
- for key in srv["key_descriptor"]:
- if "use" in key and key["use"] == use:
- for dat in key["key_info"]["x509_data"]:
- cert = repack_cert(
- dat["x509_certificate"]["text"])
- if cert not in res:
- res.append(cert)
- elif not "use" in key:
- for dat in key["key_info"]["x509_data"]:
- cert = repack_cert(
- dat["x509_certificate"]["text"])
- if cert not in res:
- res.append(cert)
- else:
- srvs = ent["%s_descriptor" % descriptor]
-
- res = []
- for srv in srvs:
- for key in srv["key_descriptor"]:
- if "use" in key and key["use"] == use:
- for dat in key["key_info"]["x509_data"]:
- res.append(dat["x509_certificate"]["text"])
- elif not "use" in key:
- for dat in key["key_info"]["x509_data"]:
- res.append(dat["x509_certificate"]["text"])
- return res
-
def vo_members(self, entity_id):
ad = self.__getitem__(entity_id)["affiliation_descriptor"]
return [m["text"] for m in ad["affiliate_member"]]
diff --git a/tests/test_30_mdstore.py b/tests/test_30_mdstore.py
index ed2b9ebd..74f508a1 100644
--- a/tests/test_30_mdstore.py
+++ b/tests/test_30_mdstore.py
@@ -5,11 +5,9 @@ import re
from six.moves.urllib.parse import quote_plus
from saml2.config import Config
from saml2.httpbase import HTTPBase
-
from saml2.mdstore import MetadataStore, MetaDataMDX
from saml2.mdstore import destinations
from saml2.mdstore import name
-
from saml2 import md
from saml2 import sigver
from saml2 import BINDING_SOAP
@@ -20,7 +18,6 @@ from saml2 import saml
from saml2 import config
from saml2.attribute_converter import ac_factory
from saml2.attribute_converter import d_to_local_name
-
from saml2.extension import mdui
from saml2.extension import idpdisc
from saml2.extension import dri
@@ -29,12 +26,27 @@ from saml2.extension import ui
from saml2.s_utils import UnknownPrincipal
from saml2 import xmldsig
from saml2 import xmlenc
-
from pathutils import full_path
sec_config = config.Config()
# sec_config.xmlsec_binary = sigver.get_xmlsec_binary(["/opt/local/bin"])
+TEST_CERT = """MIICsDCCAhmgAwIBAgIJAJrzqSSwmDY9MA0GCSqGSIb3DQEBBQUAMEUxCzAJBgNV
+BAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBX
+aWRnaXRzIFB0eSBMdGQwHhcNMDkxMDA2MTk0OTQxWhcNMDkxMTA1MTk0OTQxWjBF
+MQswCQYDVQQGEwJBVTETMBEGA1UECBMKU29tZS1TdGF0ZTEhMB8GA1UEChMYSW50
+ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKB
+gQDJg2cms7MqjniT8Fi/XkNHZNPbNVQyMUMXE9tXOdqwYCA1cc8vQdzkihscQMXy
+3iPw2cMggBu6gjMTOSOxECkuvX5ZCclKr8pXAJM5cY6gVOaVO2PdTZcvDBKGbiaN
+efiEw5hnoZomqZGp8wHNLAUkwtH9vjqqvxyS/vclc6k2ewIDAQABo4GnMIGkMB0G
+A1UdDgQWBBRePsKHKYJsiojE78ZWXccK9K4aJTB1BgNVHSMEbjBsgBRePsKHKYJs
+iojE78ZWXccK9K4aJaFJpEcwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgTClNvbWUt
+U3RhdGUxITAfBgNVBAoTGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZIIJAJrzqSSw
+mDY9MAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEFBQADgYEAJSrKOEzHO7TL5cy6
+h3qh+3+JAk8HbGBW+cbX6KBCAw/mzU8flK25vnWwXS3dv2FF3Aod0/S7AWNfKib5
+U/SA9nJaz/mWeF9S0farz9AQFc8/NSzAzaVq7YbM4F6f6N2FRl7GikdXRCed45j6
+mrPzGzk3ECbupFnqyREH3+ZPSdk="""
+
TEST_METADATA_STRING = """
<EntitiesDescriptor
xmlns="urn:oasis:names:tc:SAML:2.0:metadata"
@@ -51,21 +63,8 @@ TEST_METADATA_STRING = """
<ds:KeyInfo>
<ds:X509Data>
<ds:X509Certificate>
- MIICsDCCAhmgAwIBAgIJAJrzqSSwmDY9MA0GCSqGSIb3DQEBBQUAMEUxCzAJBgNV
- BAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBX
- aWRnaXRzIFB0eSBMdGQwHhcNMDkxMDA2MTk0OTQxWhcNMDkxMTA1MTk0OTQxWjBF
- MQswCQYDVQQGEwJBVTETMBEGA1UECBMKU29tZS1TdGF0ZTEhMB8GA1UEChMYSW50
- ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKB
- gQDJg2cms7MqjniT8Fi/XkNHZNPbNVQyMUMXE9tXOdqwYCA1cc8vQdzkihscQMXy
- 3iPw2cMggBu6gjMTOSOxECkuvX5ZCclKr8pXAJM5cY6gVOaVO2PdTZcvDBKGbiaN
- efiEw5hnoZomqZGp8wHNLAUkwtH9vjqqvxyS/vclc6k2ewIDAQABo4GnMIGkMB0G
- A1UdDgQWBBRePsKHKYJsiojE78ZWXccK9K4aJTB1BgNVHSMEbjBsgBRePsKHKYJs
- iojE78ZWXccK9K4aJaFJpEcwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgTClNvbWUt
- U3RhdGUxITAfBgNVBAoTGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZIIJAJrzqSSw
- mDY9MAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEFBQADgYEAJSrKOEzHO7TL5cy6
- h3qh+3+JAk8HbGBW+cbX6KBCAw/mzU8flK25vnWwXS3dv2FF3Aod0/S7AWNfKib5
- U/SA9nJaz/mWeF9S0farz9AQFc8/NSzAzaVq7YbM4F6f6N2FRl7GikdXRCed45j6
- mrPzGzk3ECbupFnqyREH3+ZPSdk=</ds:X509Certificate>
+ {cert_data}
+ </ds:X509Certificate>
</ds:X509Data>
</ds:KeyInfo>
</KeyDescriptor>
@@ -85,7 +84,7 @@ TEST_METADATA_STRING = """
</ContactPerson>
</EntityDescriptor>
</EntitiesDescriptor>
-"""
+""".format(cert_data=TEST_CERT)
ONTS = {
saml.NAMESPACE: saml,
@@ -149,7 +148,7 @@ METADATACONF = {
}],
"11": [{
"class": "saml2.mdstore.InMemoryMetaData",
- "metadata": [(TEST_METADATA_STRING, )]
+ "metadata": [(TEST_METADATA_STRING,)]
}],
}
@@ -372,7 +371,7 @@ def test_load_string():
disable_ssl_certificate_validation=True)
mds.imp(METADATACONF["11"])
- #print(mds)
+ # print(mds)
assert len(mds.keys()) == 1
idps = mds.with_descriptor("idpsso")
@@ -384,5 +383,14 @@ def test_load_string():
assert len(certs) == 1
+def test_get_certs_from_metadata():
+ mds = MetadataStore(ONTS.values(), ATTRCONV, None)
+ mds.imp(METADATACONF["11"])
+ certs1 = mds.certs("http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php", "any")
+ certs2 = mds.certs("http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php", "idpsso")
+
+ assert certs1[0] == certs2[0] == TEST_CERT
+
+
if __name__ == "__main__":
- test_load_local()
+ test_get_certs_from_metadata()