summaryrefslogtreecommitdiff
path: root/src/saml2/metadata.py
diff options
context:
space:
mode:
authorRoland Hedberg <roland.hedberg@adm.umu.se>2012-11-19 12:45:40 +0100
committerRoland Hedberg <roland.hedberg@adm.umu.se>2012-11-19 12:45:40 +0100
commitce15badb7d620d2d0b7ef8323125462946160374 (patch)
tree8413d36c8f3210e60ddcaebf4cb81ccc38249f66 /src/saml2/metadata.py
parent411fd23050e1b48d069bb6156cdb591b10e82d7f (diff)
downloadpysaml2-ce15badb7d620d2d0b7ef8323125462946160374.tar.gz
Mapping between protocol request and metadata service description
Diffstat (limited to 'src/saml2/metadata.py')
-rw-r--r--src/saml2/metadata.py241
1 files changed, 196 insertions, 45 deletions
diff --git a/src/saml2/metadata.py b/src/saml2/metadata.py
index 4aabe7e4..6ffca1b0 100644
--- a/src/saml2/metadata.py
+++ b/src/saml2/metadata.py
@@ -59,6 +59,28 @@ from saml2.country_codes import D_COUNTRIES
logger = logging.getLogger(__name__)
+REQ2SRV = {
+ # IDP
+ "authn_request": "single_sign_on_service",
+ "nameid_mapping_request": "name_id_mapping_service",
+ # AuthnAuthority
+ "authn_query": "authn_query_service",
+ # AttributeAuthority
+ "attribute_query": "attribute_service",
+ # PDP
+ "authz_decision_query": "authz_service",
+ # AuthnAuthority + IDP + PDP + AttributeAuthority
+ "assertion_id_request": "assertion_id_request_service",
+ # IDP + SP
+ "logout_query": "single_logout_service",
+ "manage_nameid_query": "manage_name_id_service",
+ "artifact_query": "artifact_resolution_service",
+ # SP
+ "assertion_response": "assertion_consumer_service",
+ "attribute_response": "attribute_consuming_service",
+ }
+
+
def metadata_extension_modules():
_pre = "saml2.extension"
res = []
@@ -106,6 +128,7 @@ class MetaData(object):
self._keys = {}
self._extension_modules = metadata_extension_modules()
self.post_load_process = post_load_process
+ self.entities_descr = {}
def _extensions(self, entity):
if entity.extensions:
@@ -401,6 +424,7 @@ class MetaData(object):
#print >> sys.stderr, "Loading %s" % (source,)
entities_descr = md.entities_descriptor_from_string(xml_str)
+ self.entities_descr[source] = entities_descr
if not entities_descr:
entity_descr = md.entity_descriptor_from_string(xml_str)
if entity_descr:
@@ -466,8 +490,55 @@ class MetaData(object):
loc[sso.binding] = sso.location
return loc
+ def _loc(self, desc, service, binding=None):
+ loc = []
+ for des in desc:
+ endps = getattr(des, service, None)
+ if endps:
+ for endp in endps:
+ if binding:
+ if binding == endp.binding:
+ loc.append(endp.location)
+ else:
+ loc.append((endp.binding, endp.location))
+ return loc
+
@keep_updated
- def single_sign_on_services(self, entity_id,
+ def _service(self, entity_id, service, binding=None, typ=None):
+ """ Get me all single-sign-on services with a specified
+ entity ID that supports the specified version of binding.
+
+ :param entity_id: The EntityId
+ :param binding: A binding identifier
+ :param service: which service that is sought for
+ :param typ: Type of service (idp, attribute_authority, ...)
+ :return: list of single-sign-on service location run by the entity
+ with the specified EntityId. Or if no binding was specified
+ a list of 2-tuples (binding, location)
+ """
+
+ loc = []
+ if typ is None:
+ try:
+ srv = self.entity[entity_id]
+ except KeyError:
+ return loc
+
+ for item in ["idpsso", "attribute_authority", "authn_authority",
+ "pdp", "role", "spsso"]:
+ if item in srv:
+ loc.extend(self._loc(srv[item], service, binding))
+ else:
+ try:
+ desc = self.entity[entity_id][typ]
+ except KeyError:
+ return loc
+
+ loc.extend(self._loc(desc, service, binding))
+
+ return loc
+
+ def single_sign_on_service(self, entity_id,
binding = BINDING_HTTP_REDIRECT):
""" Get me all single-sign-on services with a specified
entity ID that supports the specified version of binding.
@@ -478,24 +549,25 @@ class MetaData(object):
with the specified EntityId.
"""
- loc = []
- try:
- idps = self.entity[entity_id]["idpsso"]
- except KeyError:
- return loc
-
- #print idps
- for idp in idps:
- #print "==",idp.keyswv()
- for sso in idp.single_sign_on_service:
- #print "SSO",sso
- if binding == sso.binding:
- loc.append(sso.location)
- return loc
+ return self._service(entity_id, "single_sign_on_service", binding,
+ "idpsso")
+
+ def name_id_mapping_service(self, entity_id, binding=BINDING_HTTP_REDIRECT):
+ """ Get me all nameID mapping services with a specified
+ entity ID that supports the specified version of binding.
+
+ :param entity_id: The EntityId
+ :param binding: A binding identifier
+ :return: list of single-sign-on service location run by the entity
+ with the specified EntityId.
+ """
+
+ return self._service(entity_id, "name_id_mapping_service", binding,
+ "idpsso")
@keep_updated
def single_sign_on_services_with_uiinfo(self, entity_id,
- binding = BINDING_HTTP_REDIRECT):
+ binding=BINDING_HTTP_REDIRECT):
""" Get me all single-sign-on services with a specified
entity ID that supports the specified version of binding.
@@ -524,35 +596,125 @@ class MetaData(object):
loc.append((sso.location, uiinfo))
return loc
- @keep_updated
- def single_logout_services(self, entity_id, typ,
- binding = BINDING_HTTP_REDIRECT):
+ def single_logout_service(self, entity_id, binding=BINDING_HTTP_REDIRECT,
+ typ=None):
""" Get me all single-logout services that supports the specified
binding version.
:param entity_id: The EntityId
- :param typ: "sp", "idp" or "aa"
:param binding: A binding identifier
+ :param typ: If a specific service is wanted
:return: list of single-logout service location run by the entity
with the specified EntityId.
"""
- # May raise KeyError
- #print >> sys.stderr, "%s" % self.entity[entity_id]
+ return self._service(entity_id, "single_logout_service", binding, typ)
- loc = []
-
- try:
- sss = self.entity[entity_id]["%ssso" % typ]
- except KeyError:
- return loc
+ def authn_query_service(self, entity_id, binding=BINDING_SOAP):
+ """ Get me all authn_query services that supports the specified
+ binding version.
+
+ :param entity_id: The EntityId
+ :param binding: A binding identifier
+ :return: list service locations.
+ """
+
+ return self._service(entity_id, "single_logout_service", binding,
+ typ="authn_authority")
+
+ def attribute_service(self, entity_id, binding=BINDING_HTTP_REDIRECT):
+ """ Get me all authn_query services that supports the specified
+ binding version.
+
+ :param entity_id: The EntityId
+ :param binding: A binding identifier
+ :return: list service locations.
+ """
+
+ return self._service(entity_id, "attribute_service", binding,
+ typ="attribute_authority")
+
+ def authz_service(self, entity_id, binding=BINDING_SOAP, typ="pdp"):
+ """ Get me all authz services that supports the specified
+ binding version.
+
+ :param entity_id: The EntityId
+ :param binding: A binding identifier
+ :return: list service locations.
+ """
+
+ return self._service(entity_id, "authz_service", binding, typ)
+
+ def assertion_id_request_service(self, entity_id,
+ binding=BINDING_SOAP, typ=None):
+ """ Get me all assertion_id_request services that supports the specified
+ binding version.
+
+ :param entity_id: The EntityId
+ :param binding: A binding identifier
+ :param typ: Type of service container (idpsso, .. )
+ :return: list service locations.
+ """
+
+ return self._service(entity_id, "attribute_service", binding,
+ typ)
+
+ def manage_name_id_service(self, entity_id,
+ binding=BINDING_HTTP_REDIRECT, typ=None):
+ """ Get me all manage_name_id services that supports the specified
+ binding version.
+
+ :param entity_id: The EntityId
+ :param binding: A binding identifier
+ :param typ: Type of service container (idpsso, .. )
+ :return: list service locations.
+ """
+
+ return self._service(entity_id, "manage_name_id_service", binding,
+ typ)
+
+ def artifact_resolution_service(self, entity_id,
+ binding=BINDING_HTTP_REDIRECT, typ=None):
+ """ Get me all artifact_resolution services that supports the specified
+ binding version.
+
+ :param entity_id: The EntityId
+ :param binding: A binding identifier
+ :param typ: Type of service container (idpsso, .. )
+ :return: list service locations.
+ """
+
+ return self._service(entity_id, "artifact_resolution_service", binding,
+ typ)
+
+ def assertion_consumer_service(self, entity_id, binding=BINDING_HTTP_POST,
+ typ="spsso"):
+ """ Get me all artifact_resolution services that supports the specified
+ binding version.
+
+ :param entity_id: The EntityId
+ :param binding: A binding identifier
+ :param typ: Type of service container (idpsso, .. )
+ :return: list service locations.
+ """
+
+ return self._service(entity_id, "assertion_consumer_service", binding,
+ typ)
+
+ def attribute_consuming_service(self, entity_id,
+ binding=BINDING_HTTP_REDIRECT, typ="spsso"):
+ """ Get me all artifact_resolution services that supports the specified
+ binding version.
+
+ :param entity_id: The EntityId
+ :param binding: A binding identifier
+ :param typ: Type of service container (idpsso, .. )
+ :return: list service locations.
+ """
+
+ return self._service(entity_id, "attribute_consuming_service", binding,
+ typ)
- for entity in sss:
- for slo in entity.single_logout_service:
- if binding == slo.binding:
- loc.append(slo.location)
- return loc
-
@keep_updated
def attribute_authority(self, entity_id):
try:
@@ -567,17 +729,6 @@ class MetaData(object):
except KeyError:
return []
- def authz_service_endpoints(self, entity_id, binding=BINDING_SOAP):
- try:
- result = []
- for pdp in self.entity[entity_id]["pdp"]:
- for aserv in pdp.authz_service:
- if aserv.binding == binding:
- result.append(aserv.location)
- return result
- except KeyError:
- return []
-
def locations(self):
""" Returns all the locations that are know using this metadata file.