summaryrefslogtreecommitdiff
path: root/src/saml2/mdstore.py
diff options
context:
space:
mode:
authorIvan Kanakarakis <ivan.kanak@gmail.com>2019-12-26 19:08:56 +0200
committerIvan Kanakarakis <ivan.kanak@gmail.com>2019-12-26 20:49:47 +0200
commit55be003ab717423a2d685082482ee7d56897c115 (patch)
treea42961a208fe46d717ed96d7334f97ec5fd57ab5 /src/saml2/mdstore.py
parent7fd1719ffe942e59273971776db1a925255848e9 (diff)
downloadpysaml2-55be003ab717423a2d685082482ee7d56897c115.tar.gz
Reformat and rearrange code
Signed-off-by: Ivan Kanakarakis <ivan.kanak@gmail.com>
Diffstat (limited to 'src/saml2/mdstore.py')
-rw-r--r--src/saml2/mdstore.py114
1 files changed, 64 insertions, 50 deletions
diff --git a/src/saml2/mdstore.py b/src/saml2/mdstore.py
index 44fda723..32778af9 100644
--- a/src/saml2/mdstore.py
+++ b/src/saml2/mdstore.py
@@ -32,7 +32,11 @@ from saml2.s_utils import UnsupportedBinding
from saml2.s_utils import UnknownSystemEntity
from saml2.sigver import split_len
from saml2.validate import valid_instance
-from saml2.time_util import valid, instant, add_duration, before, str_to_time
+from saml2.time_util import valid
+from saml2.time_util import instant
+from saml2.time_util import add_duration
+from saml2.time_util import before
+from saml2.time_util import str_to_time
from saml2.validate import NotValid
from saml2.sigver import security_context
from saml2.extension.mdattr import NAMESPACE as NS_MDATTR
@@ -72,6 +76,11 @@ ENTITY_CATEGORY = "http://macedir.org/entity-category"
ENTITY_CATEGORY_SUPPORT = "http://macedir.org/entity-category-support"
ASSURANCE_CERTIFICATION = "urn:oasis:names:tc:SAML:attribute:assurance-certification"
+SAML_METADATA_CONTENT_TYPE = "application/samlmetadata+xml"
+DEFAULT_FRESHNESS_PERIOD = "P0Y0M0DT12H0M0S"
+
+
+
REQ2SRV = {
# IDP
"authn_request": "single_sign_on_service",
@@ -664,22 +673,21 @@ class InMemoryMetaData(MetaData):
def parse_and_check_signature(self, txt):
self.parse(txt)
- if self.cert:
- if not self.signed():
- return True
-
- node_name = self.node_name \
- or "%s:%s" % (md.EntitiesDescriptor.c_namespace,
- md.EntitiesDescriptor.c_tag)
+ if not self.cert:
+ return True
- if self.security.verify_signature(
- txt, node_name=node_name, cert_file=self.cert):
- return True
- else:
- return False
- else:
+ if not self.signed():
return True
+ fallback_name = "{ns}:{tag}".format(
+ ns=md.EntitiesDescriptor.c_namespace, tag=md.EntitiesDescriptor.c_tag
+ )
+ node_name = self.node_name or fallback_name
+
+ return self.security.verify_signature(
+ txt, node_name=node_name, cert_file=self.cert
+ )
+
class MetaDataFile(InMemoryMetaData):
"""
@@ -805,9 +813,6 @@ class MetaDataMD(InMemoryMetaData):
self.entity[key] = item
-SAML_METADATA_CONTENT_TYPE = 'application/samlmetadata+xml'
-
-
class MetaDataMDX(InMemoryMetaData):
"""
Uses the MDQ protocol to fetch entity information.
@@ -817,8 +822,9 @@ class MetaDataMDX(InMemoryMetaData):
@staticmethod
def sha1_entity_transform(entity_id):
- return "{{sha1}}{}".format(
- hashlib.sha1(entity_id.encode("utf-8")).hexdigest())
+ entity_id_sha1 = hashlib.sha1(entity_id.encode("utf-8")).hexdigest()
+ transform = "{{sha1}}{digest}".format(digest=entity_id_sha1)
+ return transform
def __init__(self, url=None, security=None, cert=None,
entity_transform=None, freshness_period=None, **kwargs):
@@ -847,9 +853,8 @@ class MetaDataMDX(InMemoryMetaData):
self.cert = cert
self.security = security
- self.freshness_period = freshness_period
- if freshness_period:
- self.expiration_date = {}
+ self.freshness_period = freshness_period or DEFAULT_FRESHNESS_PERIOD
+ self.expiration_date = {}
# We assume that the MDQ server will return a single entity
# described by a single <EntityDescriptor> element. The protocol
@@ -857,43 +862,52 @@ class MetaDataMDX(InMemoryMetaData):
# <EntitiesDescriptor> element but we will not currently support
# that use case since it is unlikely to be leveraged for most
# flows.
- self.node_name = "%s:%s" % (md.EntityDescriptor.c_namespace,
- md.EntityDescriptor.c_tag)
+ self.node_name = "{ns}:{tag}".format(
+ ns=md.EntityDescriptor.c_namespace, tag=md.EntityDescriptor.c_tag
+ )
def load(self, *args, **kwargs):
# Do nothing
pass
- def fetch_metadata(self, item):
- mdx_url = "%s/entities/%s" % (self.url, self.entity_transform(item))
- response = requests.get(mdx_url, headers={
- 'Accept': SAML_METADATA_CONTENT_TYPE})
- if response.status_code == 200:
- _txt = response.content
- if self.parse_and_check_signature(_txt):
- if self.freshness_period:
- curr_time = str_to_time(instant())
- self.expiration_date[item] = add_duration(
- curr_time, self.freshness_period)
- return self.entity[item]
- else:
- logger.info("Response status: %s", response.status_code)
- raise KeyError
+ def _fetch_metadata(self, item):
+ mdx_url = "{url}/entities/{id}".format(
+ url=self.url, id=self.entity_transform(item)
+ )
- def _is_fresh(self, item):
- return self.freshness_period and before(self.expiration_date[item])
+ response = requests.get(mdx_url, headers={"Accept": SAML_METADATA_CONTENT_TYPE})
+ if response.status_code != 200:
+ error_msg = "Fething {item}: Got response status {status}".format(
+ item=item, status=response.status_code
+ )
+ logger.info(error_msg)
+ raise KeyError(error_msg)
+
+ _txt = response.content
+ if not self.parse_and_check_signature(_txt):
+ error_msg = "Fething {item}: invalid signature".format(
+ item=item, status=response.status_code
+ )
+ logger.info(error_msg)
+ raise KeyError(error_msg)
+
+ curr_time = str_to_time(instant())
+ self.expiration_date[item] = add_duration(curr_time, self.freshness_period)
+ return self.entity[item]
+
+ def _is_metadata_fresh(self, item):
+ return before(self.expiration_date[item])
def __getitem__(self, item):
- if item in self.entity:
- if self._is_fresh(item):
- entity = self.entity[item]
- else:
- logger.info("Metadata for {} have expired, refreshing "
- "metadata".format(item))
- self.entity.pop(item)
- entity = self.fetch_metadata(item)
+ if item not in self.entity:
+ entity = self._fetch_metadata(item)
+ elif not self._is_metadata_fresh(item):
+ msg = "Metadata for {} have expired; refreshing metadata".format(item)
+ logger.info(msg)
+ old_entity = self.entity.pop(item)
+ entity = self._fetch_metadata(item)
else:
- entity = self.fetch_metadata(item)
+ entity = self.entity[item]
return entity
def single_sign_on_service(self, entity_id, binding=None, typ="idpsso"):