summaryrefslogtreecommitdiff
path: root/src/saml2/mdstore.py
diff options
context:
space:
mode:
authorRebecka Gulliksson <rebecka.gulliksson@umu.se>2015-11-12 13:28:44 +0100
committerRebecka Gulliksson <rebecka.gulliksson@umu.se>2015-11-16 11:24:35 +0100
commit122cb9cd8d960011a9ab40dd13aabeabd3ec8627 (patch)
treedee51243139b0253314d034676b8956fdd07ca81 /src/saml2/mdstore.py
parent0111b9bf1e3e029fe0a60da24dc6bf22e2059772 (diff)
downloadpysaml2-122cb9cd8d960011a9ab40dd13aabeabd3ec8627.tar.gz
Refactor Metadata.certs() and move it to base class.
Diffstat (limited to 'src/saml2/mdstore.py')
-rw-r--r--src/saml2/mdstore.py121
1 files changed, 39 insertions, 82 deletions
diff --git a/src/saml2/mdstore.py b/src/saml2/mdstore.py
index 687b1288..574a64e6 100644
--- a/src/saml2/mdstore.py
+++ b/src/saml2/mdstore.py
@@ -314,7 +314,42 @@ class MetaData(object):
'''
Returns certificates for the given Entity
'''
- raise NotImplementedError
+ ent = self[entity_id]
+
+ def extract_certs(srvs):
+ res = []
+ for srv in srvs:
+ for key in srv["key_descriptor"]:
+ if "use" in key and key["use"] == use:
+ for dat in key["key_info"]["x509_data"]:
+ cert = repack_cert(
+ dat["x509_certificate"]["text"])
+ if cert not in res:
+ res.append(cert)
+ elif not "use" in key:
+ for dat in key["key_info"]["x509_data"]:
+ cert = repack_cert(
+ dat["x509_certificate"]["text"])
+ if cert not in res:
+ res.append(cert)
+
+ return res
+
+ if descriptor == "any":
+ res = []
+ for descr in ["spsso", "idpsso", "role", "authn_authority",
+ "attribute_authority", "pdp"]:
+ try:
+ srvs = ent["%s_descriptor" % descr]
+ except KeyError:
+ continue
+
+ res.extend(extract_certs(srvs))
+ else:
+ srvs = ent["%s_descriptor" % descriptor]
+ res = extract_certs(srvs)
+
+ return res
class InMemoryMetaData(MetaData):
@@ -511,45 +546,6 @@ class InMemoryMetaData(MetaData):
return res
- def certs(self, entity_id, descriptor, use="signing"):
- ent = self.__getitem__(entity_id)
- if descriptor == "any":
- res = []
- for descr in ["spsso", "idpsso", "role", "authn_authority",
- "attribute_authority", "pdp"]:
- try:
- srvs = ent["%s_descriptor" % descr]
- except KeyError:
- continue
-
- for srv in srvs:
- for key in srv["key_descriptor"]:
- if "use" in key and key["use"] == use:
- for dat in key["key_info"]["x509_data"]:
- cert = repack_cert(
- dat["x509_certificate"]["text"])
- if cert not in res:
- res.append(cert)
- elif not "use" in key:
- for dat in key["key_info"]["x509_data"]:
- cert = repack_cert(
- dat["x509_certificate"]["text"])
- if cert not in res:
- res.append(cert)
- else:
- srvs = ent["%s_descriptor" % descriptor]
-
- res = []
- for srv in srvs:
- for key in srv["key_descriptor"]:
- if "use" in key and key["use"] == use:
- for dat in key["key_info"]["x509_data"]:
- res.append(dat["x509_certificate"]["text"])
- elif not "use" in key:
- for dat in key["key_info"]["x509_data"]:
- res.append(dat["x509_certificate"]["text"])
- return res
-
def signed(self):
if self.entities_descr and self.entities_descr.signature:
return True
@@ -567,8 +563,8 @@ class InMemoryMetaData(MetaData):
return True
node_name = self.node_name \
- or "%s:%s" % (md.EntitiesDescriptor.c_namespace,
- md.EntitiesDescriptor.c_tag)
+ or "%s:%s" % (md.EntitiesDescriptor.c_namespace,
+ md.EntitiesDescriptor.c_tag)
if self.security.verify_signature(
txt, node_name=node_name, cert_file=self.cert):
@@ -752,7 +748,7 @@ class MetaDataMDX(InMemoryMetaData):
raise KeyError
-class MetadataStore(object):
+class MetadataStore(MetaData):
def __init__(self, onts, attrc, config, ca_certs=None,
check_validity=True,
disable_ssl_certificate_validation=False,
@@ -1062,45 +1058,6 @@ class MetadataStore(object):
return name(_md[entity_id], langpref)
return None
- def certs(self, entity_id, descriptor, use="signing"):
- ent = self.__getitem__(entity_id)
- if descriptor == "any":
- res = []
- for descr in ["spsso", "idpsso", "role", "authn_authority",
- "attribute_authority", "pdp"]:
- try:
- srvs = ent["%s_descriptor" % descr]
- except KeyError:
- continue
-
- for srv in srvs:
- for key in srv["key_descriptor"]:
- if "use" in key and key["use"] == use:
- for dat in key["key_info"]["x509_data"]:
- cert = repack_cert(
- dat["x509_certificate"]["text"])
- if cert not in res:
- res.append(cert)
- elif not "use" in key:
- for dat in key["key_info"]["x509_data"]:
- cert = repack_cert(
- dat["x509_certificate"]["text"])
- if cert not in res:
- res.append(cert)
- else:
- srvs = ent["%s_descriptor" % descriptor]
-
- res = []
- for srv in srvs:
- for key in srv["key_descriptor"]:
- if "use" in key and key["use"] == use:
- for dat in key["key_info"]["x509_data"]:
- res.append(dat["x509_certificate"]["text"])
- elif not "use" in key:
- for dat in key["key_info"]["x509_data"]:
- res.append(dat["x509_certificate"]["text"])
- return res
-
def vo_members(self, entity_id):
ad = self.__getitem__(entity_id)["affiliation_descriptor"]
return [m["text"] for m in ad["affiliate_member"]]