diff options
author | Ivan Kanakarakis <ivan.kanak@gmail.com> | 2019-12-26 19:08:56 +0200 |
---|---|---|
committer | Ivan Kanakarakis <ivan.kanak@gmail.com> | 2019-12-26 20:49:47 +0200 |
commit | 55be003ab717423a2d685082482ee7d56897c115 (patch) | |
tree | a42961a208fe46d717ed96d7334f97ec5fd57ab5 /src/saml2/mdstore.py | |
parent | 7fd1719ffe942e59273971776db1a925255848e9 (diff) | |
download | pysaml2-55be003ab717423a2d685082482ee7d56897c115.tar.gz |
Reformat and rearrange code
Signed-off-by: Ivan Kanakarakis <ivan.kanak@gmail.com>
Diffstat (limited to 'src/saml2/mdstore.py')
-rw-r--r-- | src/saml2/mdstore.py | 114 |
1 files changed, 64 insertions, 50 deletions
diff --git a/src/saml2/mdstore.py b/src/saml2/mdstore.py index 44fda723..32778af9 100644 --- a/src/saml2/mdstore.py +++ b/src/saml2/mdstore.py @@ -32,7 +32,11 @@ from saml2.s_utils import UnsupportedBinding from saml2.s_utils import UnknownSystemEntity from saml2.sigver import split_len from saml2.validate import valid_instance -from saml2.time_util import valid, instant, add_duration, before, str_to_time +from saml2.time_util import valid +from saml2.time_util import instant +from saml2.time_util import add_duration +from saml2.time_util import before +from saml2.time_util import str_to_time from saml2.validate import NotValid from saml2.sigver import security_context from saml2.extension.mdattr import NAMESPACE as NS_MDATTR @@ -72,6 +76,11 @@ ENTITY_CATEGORY = "http://macedir.org/entity-category" ENTITY_CATEGORY_SUPPORT = "http://macedir.org/entity-category-support" ASSURANCE_CERTIFICATION = "urn:oasis:names:tc:SAML:attribute:assurance-certification" +SAML_METADATA_CONTENT_TYPE = "application/samlmetadata+xml" +DEFAULT_FRESHNESS_PERIOD = "P0Y0M0DT12H0M0S" + + + REQ2SRV = { # IDP "authn_request": "single_sign_on_service", @@ -664,22 +673,21 @@ class InMemoryMetaData(MetaData): def parse_and_check_signature(self, txt): self.parse(txt) - if self.cert: - if not self.signed(): - return True - - node_name = self.node_name \ - or "%s:%s" % (md.EntitiesDescriptor.c_namespace, - md.EntitiesDescriptor.c_tag) + if not self.cert: + return True - if self.security.verify_signature( - txt, node_name=node_name, cert_file=self.cert): - return True - else: - return False - else: + if not self.signed(): return True + fallback_name = "{ns}:{tag}".format( + ns=md.EntitiesDescriptor.c_namespace, tag=md.EntitiesDescriptor.c_tag + ) + node_name = self.node_name or fallback_name + + return self.security.verify_signature( + txt, node_name=node_name, cert_file=self.cert + ) + class MetaDataFile(InMemoryMetaData): """ @@ -805,9 +813,6 @@ class MetaDataMD(InMemoryMetaData): self.entity[key] = item -SAML_METADATA_CONTENT_TYPE = 'application/samlmetadata+xml' - - class MetaDataMDX(InMemoryMetaData): """ Uses the MDQ protocol to fetch entity information. @@ -817,8 +822,9 @@ class MetaDataMDX(InMemoryMetaData): @staticmethod def sha1_entity_transform(entity_id): - return "{{sha1}}{}".format( - hashlib.sha1(entity_id.encode("utf-8")).hexdigest()) + entity_id_sha1 = hashlib.sha1(entity_id.encode("utf-8")).hexdigest() + transform = "{{sha1}}{digest}".format(digest=entity_id_sha1) + return transform def __init__(self, url=None, security=None, cert=None, entity_transform=None, freshness_period=None, **kwargs): @@ -847,9 +853,8 @@ class MetaDataMDX(InMemoryMetaData): self.cert = cert self.security = security - self.freshness_period = freshness_period - if freshness_period: - self.expiration_date = {} + self.freshness_period = freshness_period or DEFAULT_FRESHNESS_PERIOD + self.expiration_date = {} # We assume that the MDQ server will return a single entity # described by a single <EntityDescriptor> element. The protocol @@ -857,43 +862,52 @@ class MetaDataMDX(InMemoryMetaData): # <EntitiesDescriptor> element but we will not currently support # that use case since it is unlikely to be leveraged for most # flows. - self.node_name = "%s:%s" % (md.EntityDescriptor.c_namespace, - md.EntityDescriptor.c_tag) + self.node_name = "{ns}:{tag}".format( + ns=md.EntityDescriptor.c_namespace, tag=md.EntityDescriptor.c_tag + ) def load(self, *args, **kwargs): # Do nothing pass - def fetch_metadata(self, item): - mdx_url = "%s/entities/%s" % (self.url, self.entity_transform(item)) - response = requests.get(mdx_url, headers={ - 'Accept': SAML_METADATA_CONTENT_TYPE}) - if response.status_code == 200: - _txt = response.content - if self.parse_and_check_signature(_txt): - if self.freshness_period: - curr_time = str_to_time(instant()) - self.expiration_date[item] = add_duration( - curr_time, self.freshness_period) - return self.entity[item] - else: - logger.info("Response status: %s", response.status_code) - raise KeyError + def _fetch_metadata(self, item): + mdx_url = "{url}/entities/{id}".format( + url=self.url, id=self.entity_transform(item) + ) - def _is_fresh(self, item): - return self.freshness_period and before(self.expiration_date[item]) + response = requests.get(mdx_url, headers={"Accept": SAML_METADATA_CONTENT_TYPE}) + if response.status_code != 200: + error_msg = "Fething {item}: Got response status {status}".format( + item=item, status=response.status_code + ) + logger.info(error_msg) + raise KeyError(error_msg) + + _txt = response.content + if not self.parse_and_check_signature(_txt): + error_msg = "Fething {item}: invalid signature".format( + item=item, status=response.status_code + ) + logger.info(error_msg) + raise KeyError(error_msg) + + curr_time = str_to_time(instant()) + self.expiration_date[item] = add_duration(curr_time, self.freshness_period) + return self.entity[item] + + def _is_metadata_fresh(self, item): + return before(self.expiration_date[item]) def __getitem__(self, item): - if item in self.entity: - if self._is_fresh(item): - entity = self.entity[item] - else: - logger.info("Metadata for {} have expired, refreshing " - "metadata".format(item)) - self.entity.pop(item) - entity = self.fetch_metadata(item) + if item not in self.entity: + entity = self._fetch_metadata(item) + elif not self._is_metadata_fresh(item): + msg = "Metadata for {} have expired; refreshing metadata".format(item) + logger.info(msg) + old_entity = self.entity.pop(item) + entity = self._fetch_metadata(item) else: - entity = self.fetch_metadata(item) + entity = self.entity[item] return entity def single_sign_on_service(self, entity_id, binding=None, typ="idpsso"): |