diff options
-rw-r--r-- | src/saml2/metadata.py | 241 |
1 files changed, 196 insertions, 45 deletions
diff --git a/src/saml2/metadata.py b/src/saml2/metadata.py index 4aabe7e4..6ffca1b0 100644 --- a/src/saml2/metadata.py +++ b/src/saml2/metadata.py @@ -59,6 +59,28 @@ from saml2.country_codes import D_COUNTRIES logger = logging.getLogger(__name__) +REQ2SRV = { + # IDP + "authn_request": "single_sign_on_service", + "nameid_mapping_request": "name_id_mapping_service", + # AuthnAuthority + "authn_query": "authn_query_service", + # AttributeAuthority + "attribute_query": "attribute_service", + # PDP + "authz_decision_query": "authz_service", + # AuthnAuthority + IDP + PDP + AttributeAuthority + "assertion_id_request": "assertion_id_request_service", + # IDP + SP + "logout_query": "single_logout_service", + "manage_nameid_query": "manage_name_id_service", + "artifact_query": "artifact_resolution_service", + # SP + "assertion_response": "assertion_consumer_service", + "attribute_response": "attribute_consuming_service", + } + + def metadata_extension_modules(): _pre = "saml2.extension" res = [] @@ -106,6 +128,7 @@ class MetaData(object): self._keys = {} self._extension_modules = metadata_extension_modules() self.post_load_process = post_load_process + self.entities_descr = {} def _extensions(self, entity): if entity.extensions: @@ -401,6 +424,7 @@ class MetaData(object): #print >> sys.stderr, "Loading %s" % (source,) entities_descr = md.entities_descriptor_from_string(xml_str) + self.entities_descr[source] = entities_descr if not entities_descr: entity_descr = md.entity_descriptor_from_string(xml_str) if entity_descr: @@ -466,8 +490,55 @@ class MetaData(object): loc[sso.binding] = sso.location return loc + def _loc(self, desc, service, binding=None): + loc = [] + for des in desc: + endps = getattr(des, service, None) + if endps: + for endp in endps: + if binding: + if binding == endp.binding: + loc.append(endp.location) + else: + loc.append((endp.binding, endp.location)) + return loc + @keep_updated - def single_sign_on_services(self, entity_id, + def _service(self, entity_id, service, binding=None, typ=None): + """ Get me all single-sign-on services with a specified + entity ID that supports the specified version of binding. + + :param entity_id: The EntityId + :param binding: A binding identifier + :param service: which service that is sought for + :param typ: Type of service (idp, attribute_authority, ...) + :return: list of single-sign-on service location run by the entity + with the specified EntityId. Or if no binding was specified + a list of 2-tuples (binding, location) + """ + + loc = [] + if typ is None: + try: + srv = self.entity[entity_id] + except KeyError: + return loc + + for item in ["idpsso", "attribute_authority", "authn_authority", + "pdp", "role", "spsso"]: + if item in srv: + loc.extend(self._loc(srv[item], service, binding)) + else: + try: + desc = self.entity[entity_id][typ] + except KeyError: + return loc + + loc.extend(self._loc(desc, service, binding)) + + return loc + + def single_sign_on_service(self, entity_id, binding = BINDING_HTTP_REDIRECT): """ Get me all single-sign-on services with a specified entity ID that supports the specified version of binding. @@ -478,24 +549,25 @@ class MetaData(object): with the specified EntityId. """ - loc = [] - try: - idps = self.entity[entity_id]["idpsso"] - except KeyError: - return loc - - #print idps - for idp in idps: - #print "==",idp.keyswv() - for sso in idp.single_sign_on_service: - #print "SSO",sso - if binding == sso.binding: - loc.append(sso.location) - return loc + return self._service(entity_id, "single_sign_on_service", binding, + "idpsso") + + def name_id_mapping_service(self, entity_id, binding=BINDING_HTTP_REDIRECT): + """ Get me all nameID mapping services with a specified + entity ID that supports the specified version of binding. + + :param entity_id: The EntityId + :param binding: A binding identifier + :return: list of single-sign-on service location run by the entity + with the specified EntityId. + """ + + return self._service(entity_id, "name_id_mapping_service", binding, + "idpsso") @keep_updated def single_sign_on_services_with_uiinfo(self, entity_id, - binding = BINDING_HTTP_REDIRECT): + binding=BINDING_HTTP_REDIRECT): """ Get me all single-sign-on services with a specified entity ID that supports the specified version of binding. @@ -524,35 +596,125 @@ class MetaData(object): loc.append((sso.location, uiinfo)) return loc - @keep_updated - def single_logout_services(self, entity_id, typ, - binding = BINDING_HTTP_REDIRECT): + def single_logout_service(self, entity_id, binding=BINDING_HTTP_REDIRECT, + typ=None): """ Get me all single-logout services that supports the specified binding version. :param entity_id: The EntityId - :param typ: "sp", "idp" or "aa" :param binding: A binding identifier + :param typ: If a specific service is wanted :return: list of single-logout service location run by the entity with the specified EntityId. """ - # May raise KeyError - #print >> sys.stderr, "%s" % self.entity[entity_id] + return self._service(entity_id, "single_logout_service", binding, typ) - loc = [] - - try: - sss = self.entity[entity_id]["%ssso" % typ] - except KeyError: - return loc + def authn_query_service(self, entity_id, binding=BINDING_SOAP): + """ Get me all authn_query services that supports the specified + binding version. + + :param entity_id: The EntityId + :param binding: A binding identifier + :return: list service locations. + """ + + return self._service(entity_id, "single_logout_service", binding, + typ="authn_authority") + + def attribute_service(self, entity_id, binding=BINDING_HTTP_REDIRECT): + """ Get me all authn_query services that supports the specified + binding version. + + :param entity_id: The EntityId + :param binding: A binding identifier + :return: list service locations. + """ + + return self._service(entity_id, "attribute_service", binding, + typ="attribute_authority") + + def authz_service(self, entity_id, binding=BINDING_SOAP, typ="pdp"): + """ Get me all authz services that supports the specified + binding version. + + :param entity_id: The EntityId + :param binding: A binding identifier + :return: list service locations. + """ + + return self._service(entity_id, "authz_service", binding, typ) + + def assertion_id_request_service(self, entity_id, + binding=BINDING_SOAP, typ=None): + """ Get me all assertion_id_request services that supports the specified + binding version. + + :param entity_id: The EntityId + :param binding: A binding identifier + :param typ: Type of service container (idpsso, .. ) + :return: list service locations. + """ + + return self._service(entity_id, "attribute_service", binding, + typ) + + def manage_name_id_service(self, entity_id, + binding=BINDING_HTTP_REDIRECT, typ=None): + """ Get me all manage_name_id services that supports the specified + binding version. + + :param entity_id: The EntityId + :param binding: A binding identifier + :param typ: Type of service container (idpsso, .. ) + :return: list service locations. + """ + + return self._service(entity_id, "manage_name_id_service", binding, + typ) + + def artifact_resolution_service(self, entity_id, + binding=BINDING_HTTP_REDIRECT, typ=None): + """ Get me all artifact_resolution services that supports the specified + binding version. + + :param entity_id: The EntityId + :param binding: A binding identifier + :param typ: Type of service container (idpsso, .. ) + :return: list service locations. + """ + + return self._service(entity_id, "artifact_resolution_service", binding, + typ) + + def assertion_consumer_service(self, entity_id, binding=BINDING_HTTP_POST, + typ="spsso"): + """ Get me all artifact_resolution services that supports the specified + binding version. + + :param entity_id: The EntityId + :param binding: A binding identifier + :param typ: Type of service container (idpsso, .. ) + :return: list service locations. + """ + + return self._service(entity_id, "assertion_consumer_service", binding, + typ) + + def attribute_consuming_service(self, entity_id, + binding=BINDING_HTTP_REDIRECT, typ="spsso"): + """ Get me all artifact_resolution services that supports the specified + binding version. + + :param entity_id: The EntityId + :param binding: A binding identifier + :param typ: Type of service container (idpsso, .. ) + :return: list service locations. + """ + + return self._service(entity_id, "attribute_consuming_service", binding, + typ) - for entity in sss: - for slo in entity.single_logout_service: - if binding == slo.binding: - loc.append(slo.location) - return loc - @keep_updated def attribute_authority(self, entity_id): try: @@ -567,17 +729,6 @@ class MetaData(object): except KeyError: return [] - def authz_service_endpoints(self, entity_id, binding=BINDING_SOAP): - try: - result = [] - for pdp in self.entity[entity_id]["pdp"]: - for aserv in pdp.authz_service: - if aserv.binding == binding: - result.append(aserv.location) - return result - except KeyError: - return [] - def locations(self): """ Returns all the locations that are know using this metadata file. |