summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohan Lundberg <lundberg@sunet.se>2020-09-30 10:46:45 +0200
committerIvan Kanakarakis <ivan.kanak@gmail.com>2020-10-30 12:55:25 +0200
commitd19febc77caa859193126864486a26055f167250 (patch)
treeebcee758527a914ec8be71b28c9c9fe73b56cc17
parent326705d1e4aa0bb2740ae8d2f5836b7630f58a8f (diff)
downloadpysaml2-d19febc77caa859193126864486a26055f167250.tar.gz
Allow registration authorities in policy
-rw-r--r--docs/howto/config.rst19
-rw-r--r--src/saml2/assertion.py157
-rw-r--r--tests/test_20_assertion.py93
3 files changed, 182 insertions, 87 deletions
diff --git a/docs/howto/config.rst b/docs/howto/config.rst
index e1c2025f..50be1de7 100644
--- a/docs/howto/config.rst
+++ b/docs/howto/config.rst
@@ -530,13 +530,24 @@ An example might be::
"default": {
"lifetime": {"minutes":15},
"attribute_restrictions": None, # means all I have
- "name_form": "urn:oasis:names:tc:SAML:2.0:attrname-format:uri"
+ "name_form": "urn:oasis:names:tc:SAML:2.0:attrname-format:uri",
+ "entity_categories": ["edugain"]
},
"urn:mace:example.com:saml:roland:sp": {
"lifetime": {"minutes": 5},
"attribute_restrictions": {
"givenName": None,
"surName": None,
+ },
+ },
+ "registration_authorities": {
+ "default" {
+ "attribute_restrictions": None
+ },
+ "http://www.swamid.se/": {
+ "attribute_restrictions": {
+ "givenName": None,
+ }
}
}
}
@@ -561,6 +572,12 @@ An example might be::
Using this information, the attribute name in the data source will be mapped to
the friendly name, and the saml attribute name will be taken from the uri/oid
defined in the attribute map.
+*nameid_format*
+ Which nameid format that should be used. Defaults to urn:oasis:names:tc:SAML:2.0:nameid-format:transient.
+*entity_categories*
+ Entity categories to apply.
+*sign*
+ Possible choices: "sign": ["response", "assertion", "on_demand"]
If restrictions on values are deemed necessary, those are represented by
regular expressions.::
diff --git a/src/saml2/assertion.py b/src/saml2/assertion.py
index b47f14b9..dd18affb 100644
--- a/src/saml2/assertion.py
+++ b/src/saml2/assertion.py
@@ -320,37 +320,54 @@ def post_entity_categories(maps, **kwargs):
class Policy(object):
""" handles restrictions on assertions """
- def __init__(self, restrictions=None):
- if restrictions:
- self.compile(restrictions)
- else:
- self._restrictions = None
+ def __init__(self, restrictions=None, config=None):
+ self._config = config
+ self._restrictions = self.setup_restrictions(restrictions)
+ logger.debug("policy restrictions: %s", self._restrictions)
self.acs = []
- def compile(self, restrictions):
+ def setup_restrictions(self, restrictions=None):
+ if restrictions is None:
+ return None
+
+ restrictions = copy.deepcopy(restrictions)
+ # TODO: Split policy config in service_providers and registration_authorities
+ # "policy": {
+ # "service_providers": {
+ # "default": ...,
+ # "urn:mace:example.com:saml:roland:sp": ...,
+ # },
+ # "registration_authorities": {
+ # "default": ...,
+ # "http://www.swamid.se": ...,
+ # },
+ # },
+ registration_authorities = restrictions.pop('registration_authorities', None)
+ restrictions = self.compile(restrictions)
+ if registration_authorities:
+ restrictions['registration_authorities'] = self.compile(registration_authorities)
+ return restrictions
+
+ @staticmethod
+ def compile(restrictions):
""" This is only for IdPs or AAs, and it's about limiting what
is returned to the SP.
In the configuration file, restrictions on which values that
can be returned are specified with the help of regular expressions.
This function goes through and pre-compiles the regular expressions.
- :param restrictions:
+ :param restrictions: policy configuration
:return: The assertion with the string specification replaced with
a compiled regular expression.
"""
-
- self._restrictions = copy.deepcopy(restrictions)
-
- for who, spec in self._restrictions.items():
+ for who, spec in restrictions.items():
if spec is None:
continue
- try:
- items = spec["entity_categories"]
- except KeyError:
- pass
- else:
+
+ entity_categories = spec.get("entity_categories")
+ if entity_categories is not None:
ecs = []
- for cat in items:
+ for cat in entity_categories:
try:
_mod = importlib.import_module(cat)
except ImportError:
@@ -366,25 +383,27 @@ class Policy(object):
_ec[key] = (alist, _only_required)
ecs.append(_ec)
spec["entity_categories"] = ecs
- try:
- restr = spec["attribute_restrictions"]
- except KeyError:
- continue
- if restr is None:
+ attribute_restrictions = spec.get("attribute_restrictions")
+ if attribute_restrictions is None:
continue
- _are = {}
- for key, values in restr.items():
+ _attribute_restrictions = {}
+ for key, values in attribute_restrictions.items():
if not values:
- _are[key.lower()] = None
+ _attribute_restrictions[key.lower()] = None
continue
+ _attribute_restrictions[key.lower()] = [re.compile(value) for value in values]
- _are[key.lower()] = [re.compile(value) for value in values]
- spec["attribute_restrictions"] = _are
- logger.debug("policy restrictions: %s", self._restrictions)
+ spec["attribute_restrictions"] = _attribute_restrictions
- return self._restrictions
+ return restrictions
+
+ def _lookup_registry_authority(self, sp_entity_id):
+ if self._config and self._config.metadata:
+ registration_info = self._config.metadata.registration_info(sp_entity_id)
+ return registration_info.get('registration_authority')
+ return None
def get(self, attribute, sp_entity_id, default=None, post_func=None,
**kwargs):
@@ -399,16 +418,22 @@ class Policy(object):
if not self._restrictions:
return default
- try:
- try:
- val = self._restrictions[sp_entity_id][attribute]
- except KeyError:
- try:
- val = self._restrictions["default"][attribute]
- except KeyError:
- val = None
- except KeyError:
- val = None
+ registration_authority_name = self._lookup_registry_authority(sp_entity_id)
+ registration_authorities = self._restrictions.get("registration_authorities")
+
+ val = None
+ # Specific SP takes precedence
+ if sp_entity_id in self._restrictions:
+ val = self._restrictions[sp_entity_id].get(attribute)
+ # Second choice is if the SP is part of a configured registration authority
+ elif registration_authorities and registration_authority_name in registration_authorities:
+ val = registration_authorities[registration_authority_name].get(attribute)
+ # Third is to try default for registration authorities
+ elif registration_authorities and 'default' in registration_authorities:
+ val = registration_authorities['default'].get(attribute)
+ # Lastly we try default for SPs
+ elif 'default' in self._restrictions:
+ val = self._restrictions.get('default').get(attribute)
if val is None:
return default
@@ -422,8 +447,7 @@ class Policy(object):
:param: The SP entity ID
:retur: The format
"""
- return self.get("nameid_format", sp_entity_id,
- saml.NAMEID_FORMAT_TRANSIENT)
+ return self.get("nameid_format", sp_entity_id, saml.NAMEID_FORMAT_TRANSIENT)
def get_name_form(self, sp_entity_id):
""" Get the NameFormat to used for the entity id
@@ -431,7 +455,7 @@ class Policy(object):
:retur: The format
"""
- return self.get("name_form", sp_entity_id, NAME_FORMAT_URI)
+ return self.get("name_form", sp_entity_id, default=NAME_FORMAT_URI)
def get_lifetime(self, sp_entity_id):
""" The lifetime of the assertion
@@ -458,32 +482,20 @@ class Policy(object):
:return: The restrictions
"""
- return self.get("fail_on_missing_requested", sp_entity_id, True)
-
- def entity_category_attributes(self, ec):
- if not self._restrictions:
- return None
-
- ec_maps = self._restrictions["default"]["entity_categories"]
- for ec_map in ec_maps:
- try:
- return ec_map[ec]
- except KeyError:
- pass
- return []
+ return self.get("fail_on_missing_requested", sp_entity_id, default=True)
def get_entity_categories(self, sp_entity_id, mds, required):
"""
:param sp_entity_id:
:param mds: MetadataStore instance
+ :param required: required attributes
:return: A dictionary with restrictions
"""
kwargs = {"mds": mds, 'required': required}
- return self.get("entity_categories", sp_entity_id, default={},
- post_func=post_entity_categories, **kwargs)
+ return self.get("entity_categories", sp_entity_id, default={}, post_func=post_entity_categories, **kwargs)
def not_on_or_after(self, sp_entity_id):
""" When the assertion stops being valid, should not be
@@ -495,6 +507,17 @@ class Policy(object):
return in_a_while(**self.get_lifetime(sp_entity_id))
+ def get_sign(self, sp_entity_id):
+ """
+ Possible choices
+ "sign": ["response", "assertion", "on_demand"]
+
+ :param sp_entity_id:
+ :return:
+ """
+
+ return self.get("sign", sp_entity_id, default=[])
+
def filter(self, ava, sp_entity_id, mdstore, required=None, optional=None):
""" What attribute and attribute values returns depends on what
the SP has said it wants in the request or in the metadata file and
@@ -568,16 +591,18 @@ class Policy(object):
audience=[factory(saml.Audience,
text=sp_entity_id)])])
- def get_sign(self, sp_entity_id):
- """
- Possible choices
- "sign": ["response", "assertion", "on_demand"]
-
- :param sp_entity_id:
- :return:
- """
+ def entity_category_attributes(self, ec):
+ # TODO: Not used. Remove?
+ if not self._restrictions:
+ return None
- return self.get("sign", sp_entity_id, [])
+ ec_maps = self._restrictions["default"]["entity_categories"]
+ for ec_map in ec_maps:
+ try:
+ return ec_map[ec]
+ except KeyError:
+ pass
+ return []
class EntityCategories(object):
diff --git a/tests/test_20_assertion.py b/tests/test_20_assertion.py
index f617e516..dc501291 100644
--- a/tests/test_20_assertion.py
+++ b/tests/test_20_assertion.py
@@ -1,8 +1,11 @@
# coding=utf-8
+import copy
+
from saml2.argtree import add_path
from saml2.authn_context import pword
from saml2.mdie import to_dict
-from saml2 import md, assertion, create_class_from_xml_string
+from saml2 import md, assertion, create_class_from_xml_string, config
+from saml2.mdstore import MetadataStore
from saml2.saml import Attribute
from saml2.saml import Issuer
from saml2.saml import NAMEID_FORMAT_ENTITY
@@ -33,6 +36,15 @@ from saml2 import xmlenc
from pathutils import full_path
ONTS = [saml, mdui, mdattr, dri, idpdisc, md, xmldsig, xmlenc]
+ATTRCONV = ac_factory(full_path("attributemaps"))
+sec_config = config.Config()
+
+METADATACONF = {
+ "1": [{
+ "class": "saml2.mdstore.MetaDataFile",
+ "metadata": [(full_path("swamid-2.0.xml"),)],
+ }],
+}
def _eq(l1, l2):
@@ -859,25 +871,66 @@ def test_assertion_with_noop_attribute_conv():
assert attr.attribute_value[0].text == "Roland"
-# THis test doesn't work without a MetadataStore instance
-# def test_filter_ava_5():
-# policy = Policy({
-# "default": {
-# "lifetime": {"minutes": 15},
-# #"attribute_restrictions": None # means all I have
-# "entity_categories": ["swamid", "edugain"]
-# }
-# })
-#
-# ava = {"givenName": ["Derek"], "surName": ["Jeter"],
-# "mail": ["derek@nyy.mlb.com", "dj@example.com"]}
-#
-# ava = policy.filter(ava, "urn:mace:example.com:saml:curt:sp", None, [], [])
-#
-# # using entity_categories means there *always* are restrictions
-# # in this case the only allowed attribute is eduPersonTargetedID
-# # which isn't available in the ava hence zip is returned.
-# assert ava == {}
+def test_filter_ava_5():
+ mds = MetadataStore(ATTRCONV, sec_config,
+ disable_ssl_certificate_validation=True)
+ mds.imp(METADATACONF["1"])
+
+ policy = Policy({
+ "default": {
+ "lifetime": {"minutes": 15},
+ "attribute_restrictions": None, # means all I have
+ "entity_categories": ["swamid", "edugain"]
+ }
+ })
+
+ ava = {"givenName": ["Derek"], "surName": ["Jeter"],
+ "mail": ["derek@nyy.mlb.com", "dj@example.com"]}
+
+ ava = policy.filter(ava, "urn:mace:example.com:saml:curt:sp", mdstore=mds, required=[], optional=[])
+
+ # using entity_categories means there *always* are restrictions
+ # in this case the only allowed attribute is eduPersonTargetedID
+ # which isn't available in the ava hence zip is returned.
+ assert ava == {}
+
+
+def test_filter_ava_registration_authority_1():
+ mds = MetadataStore(ATTRCONV, sec_config,
+ disable_ssl_certificate_validation=True)
+ mds.imp(METADATACONF["1"])
+ config.metadata = mds
+
+ policy = Policy({
+ "default": {
+ "lifetime": {"minutes": 15},
+ "attribute_restrictions": None,
+ },
+ "registration_authorities": {
+ "http://rr.aai.switch.ch/": {
+ "attribute_restrictions": {
+ "givenName": None,
+ "surName": None,
+ }
+ }
+ }
+ }, config=config)
+
+ attributes = {"givenName": ["Derek"], "surName": ["Jeter"],
+ "mail": ["derek@nyy.mlb.com", "dj@example.com"]}
+
+ # SP registered with http://rr.aai.switch.ch/
+ ava = policy.filter(attributes, "https://aai-idp.unibe.ch/idp/shibboleth", mdstore=mds, required=[], optional=[])
+ assert _eq(sorted(list(ava.keys())), ["givenName", "surName"])
+ assert ava["givenName"] == ["Derek"]
+ assert ava["surName"] == ["Jeter"]
+
+ # SP not registered with http://rr.aai.switch.ch/
+ ava = policy.filter(attributes, "https://alpha.kib.ki.se/shibboleth", mdstore=mds, required=[], optional=[])
+ assert _eq(sorted(list(ava.keys())), ["givenName", "mail", "surName"])
+ assert ava["givenName"] == ["Derek"]
+ assert ava["surName"] == ["Jeter"]
+ assert ava["mail"] == ["derek@nyy.mlb.com", "dj@example.com"]
def test_assertion_with_zero_attributes():