summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvan Kanakarakis <ivan.kanak@gmail.com>2020-10-25 19:54:58 +0200
committerIvan Kanakarakis <ivan.kanak@gmail.com>2020-10-30 17:24:04 +0200
commit2987a4e53c4bc9e5c6491733058fc14346c5d734 (patch)
tree2406132c8a89536fc3c0633ce4201c83e6f3c7de
parentb315bfb772da1f40f7da2254819f5e4004aeb1d8 (diff)
downloadpysaml2-2987a4e53c4bc9e5c6491733058fc14346c5d734.tar.gz
Refactor Policy to check the registration authority for restrictions
Signed-off-by: Ivan Kanakarakis <ivan.kanak@gmail.com>
-rw-r--r--src/saml2/assertion.py422
-rw-r--r--src/saml2/server.py8
-rw-r--r--tests/test_20_assertion.py91
-rw-r--r--tests/test_37_entity_categories.py215
4 files changed, 387 insertions, 349 deletions
diff --git a/src/saml2/assertion.py b/src/saml2/assertion.py
index dd18affb..30cd90ab 100644
--- a/src/saml2/assertion.py
+++ b/src/saml2/assertion.py
@@ -5,6 +5,7 @@ import importlib
import logging
import re
import six
+from warnings import warn as _warn
from saml2 import saml
from saml2 import xmlenc
@@ -18,6 +19,7 @@ from saml2.saml import NAME_FORMAT_URI
from saml2.time_util import instant
from saml2.time_util import in_a_while
+
logger = logging.getLogger(__name__)
@@ -276,52 +278,54 @@ def restriction_from_attribute_spec(attributes):
return restr
-def post_entity_categories(maps, **kwargs):
- restrictions = {}
- try:
- required = [d['friendly_name'].lower() for d in kwargs['required']]
- except (KeyError, TypeError):
- required = []
+def compile(restrictions):
+ """ This is only for IdPs or AAs, and it's about limiting what
+ is returned to the SP.
+ In the configuration file, restrictions on which values that
+ can be returned are specified with the help of regular expressions.
+ This function goes through and pre-compiles the regular expressions.
- if kwargs["mds"]:
- if "sp_entity_id" in kwargs:
- ecs = kwargs["mds"].entity_categories(kwargs["sp_entity_id"])
- for ec_map in maps:
- for key, (atlist, only_required) in ec_map.items():
- if key == "": # always released
- attrs = atlist
- elif isinstance(key, tuple):
- if only_required:
- attrs = [a for a in atlist if a in required]
- else:
- attrs = atlist
- for _key in key:
- if _key not in ecs:
- attrs = []
- break
- elif key in ecs:
- if only_required:
- attrs = [a for a in atlist if a in required]
- else:
- attrs = atlist
- else:
- attrs = []
+ :param restrictions: policy configuration
+ :return: The assertion with the string specification replaced with
+ a compiled regular expression.
+ """
+ for who, spec in restrictions.items():
+ spec = spec or {}
- for attr in attrs:
- restrictions[attr] = None
- else:
- for ec_map in maps:
- for attr in ec_map[""]:
- restrictions[attr] = None
+ entity_categories = spec.get("entity_categories", [])
+ ecs = []
+ for cat in entity_categories:
+ try:
+ _mod = importlib.import_module(cat)
+ except ImportError:
+ _mod = importlib.import_module("saml2.entity_category.%s" % cat)
+
+ _ec = {}
+ for key, items in _mod.RELEASE.items():
+ alist = [k.lower() for k in items]
+ _only_required = getattr(_mod, "ONLY_REQUIRED", {}).get(key, False)
+ _ec[key] = (alist, _only_required)
+ ecs.append(_ec)
+ spec["entity_categories"] = ecs or None
+
+ attribute_restrictions = spec.get("attribute_restrictions") or {}
+ _attribute_restrictions = {}
+ for key, values in attribute_restrictions.items():
+ lkey = key.lower()
+ values = [] if not values else values
+ _attribute_restrictions[lkey] = (
+ [re.compile(value) for value in values] or None
+ )
+ spec["attribute_restrictions"] = _attribute_restrictions or None
return restrictions
class Policy(object):
- """ handles restrictions on assertions """
+ """Handles restrictions on assertions."""
- def __init__(self, restrictions=None, config=None):
- self._config = config
+ def __init__(self, restrictions=None, mds=None):
+ self.metadata_store = mds
self._restrictions = self.setup_restrictions(restrictions)
logger.debug("policy restrictions: %s", self._restrictions)
self.acs = []
@@ -331,116 +335,50 @@ class Policy(object):
return None
restrictions = copy.deepcopy(restrictions)
- # TODO: Split policy config in service_providers and registration_authorities
- # "policy": {
- # "service_providers": {
- # "default": ...,
- # "urn:mace:example.com:saml:roland:sp": ...,
- # },
- # "registration_authorities": {
- # "default": ...,
- # "http://www.swamid.se": ...,
- # },
- # },
- registration_authorities = restrictions.pop('registration_authorities', None)
- restrictions = self.compile(restrictions)
- if registration_authorities:
- restrictions['registration_authorities'] = self.compile(registration_authorities)
+ restrictions = compile(restrictions)
return restrictions
- @staticmethod
- def compile(restrictions):
- """ This is only for IdPs or AAs, and it's about limiting what
- is returned to the SP.
- In the configuration file, restrictions on which values that
- can be returned are specified with the help of regular expressions.
- This function goes through and pre-compiles the regular expressions.
-
- :param restrictions: policy configuration
- :return: The assertion with the string specification replaced with
- a compiled regular expression.
- """
- for who, spec in restrictions.items():
- if spec is None:
- continue
-
- entity_categories = spec.get("entity_categories")
- if entity_categories is not None:
- ecs = []
- for cat in entity_categories:
- try:
- _mod = importlib.import_module(cat)
- except ImportError:
- _mod = importlib.import_module(
- "saml2.entity_category.%s" % cat)
- _ec = {}
- for key, items in _mod.RELEASE.items():
- alist = [k.lower() for k in items]
- try:
- _only_required = _mod.ONLY_REQUIRED[key]
- except (AttributeError, KeyError):
- _only_required = False
- _ec[key] = (alist, _only_required)
- ecs.append(_ec)
- spec["entity_categories"] = ecs
-
- attribute_restrictions = spec.get("attribute_restrictions")
- if attribute_restrictions is None:
- continue
-
- _attribute_restrictions = {}
- for key, values in attribute_restrictions.items():
- if not values:
- _attribute_restrictions[key.lower()] = None
- continue
- _attribute_restrictions[key.lower()] = [re.compile(value) for value in values]
-
- spec["attribute_restrictions"] = _attribute_restrictions
-
- return restrictions
-
- def _lookup_registry_authority(self, sp_entity_id):
- if self._config and self._config.metadata:
- registration_info = self._config.metadata.registration_info(sp_entity_id)
- return registration_info.get('registration_authority')
- return None
-
- def get(self, attribute, sp_entity_id, default=None, post_func=None,
- **kwargs):
+ def get(self, attribute, sp_entity_id, default=None):
"""
:param attribute:
:param sp_entity_id:
:param default:
- :param post_func:
:return:
"""
if not self._restrictions:
return default
- registration_authority_name = self._lookup_registry_authority(sp_entity_id)
- registration_authorities = self._restrictions.get("registration_authorities")
-
- val = None
- # Specific SP takes precedence
- if sp_entity_id in self._restrictions:
- val = self._restrictions[sp_entity_id].get(attribute)
- # Second choice is if the SP is part of a configured registration authority
- elif registration_authorities and registration_authority_name in registration_authorities:
- val = registration_authorities[registration_authority_name].get(attribute)
- # Third is to try default for registration authorities
- elif registration_authorities and 'default' in registration_authorities:
- val = registration_authorities['default'].get(attribute)
- # Lastly we try default for SPs
- elif 'default' in self._restrictions:
- val = self._restrictions.get('default').get(attribute)
-
- if val is None:
- return default
- elif post_func:
- return post_func(val, sp_entity_id=sp_entity_id, **kwargs)
- else:
- return val
+ ra_info = (
+ self.metadata_store.registration_info(sp_entity_id) or {}
+ if self.metadata_store is not None
+ else {}
+ )
+ ra_entity_id = ra_info.get("registration_authority")
+
+ sp_restrictions = self._restrictions.get(sp_entity_id)
+ ra_restrictions = self._restrictions.get(ra_entity_id)
+ default_restrictions = (
+ self._restrictions.get("default")
+ or self._restrictions.get("")
+ )
+ restrictions = (
+ sp_restrictions
+ if sp_restrictions is not None
+ else ra_restrictions
+ if ra_restrictions is not None
+ else default_restrictions
+ if default_restrictions is not None
+ else {}
+ )
+
+ attribute_restriction = restrictions.get(attribute)
+ restriction = (
+ attribute_restriction
+ if attribute_restriction is not None
+ else default
+ )
+ return restriction
def get_nameid_format(self, sp_entity_id):
""" Get the NameIDFormat to used for the entity id
@@ -484,18 +422,78 @@ class Policy(object):
return self.get("fail_on_missing_requested", sp_entity_id, default=True)
- def get_entity_categories(self, sp_entity_id, mds, required):
+ def get_sign(self, sp_entity_id):
+ """
+ Possible choices
+ "sign": ["response", "assertion", "on_demand"]
+
+ :param sp_entity_id:
+ :return:
+ """
+
+ return self.get("sign", sp_entity_id, default=[])
+
+ def get_entity_categories(self, sp_entity_id, mds=None, required=None):
"""
:param sp_entity_id:
- :param mds: MetadataStore instance
:param required: required attributes
:return: A dictionary with restrictions
"""
- kwargs = {"mds": mds, 'required': required}
+ if mds is not None:
+ warn_msg = (
+ "The mds parameter for saml2.assertion.Policy.get_entity_categories "
+ "is deprecated; "
+ "instead, initialize the Policy object setting the mds param."
+ )
+ logger.warning(warn_msg)
+ _warn(warn_msg, DeprecationWarning)
+
+ def post_entity_categories(maps, sp_entity_id=None, mds=None, required=None):
+ restrictions = {}
+ required = [d['friendly_name'].lower() for d in (required or [])]
+
+ if mds:
+ ecs = mds.entity_categories(sp_entity_id)
+ for ec_map in maps:
+ for key, (atlist, only_required) in ec_map.items():
+ if key == "": # always released
+ attrs = atlist
+ elif isinstance(key, tuple):
+ if only_required:
+ attrs = [a for a in atlist if a in required]
+ else:
+ attrs = atlist
+ for _key in key:
+ if _key not in ecs:
+ attrs = []
+ break
+ elif key in ecs:
+ if only_required:
+ attrs = [a for a in atlist if a in required]
+ else:
+ attrs = atlist
+ else:
+ attrs = []
+
+ for attr in attrs:
+ restrictions[attr] = None
- return self.get("entity_categories", sp_entity_id, default={}, post_func=post_entity_categories, **kwargs)
+ return restrictions
+
+ sentinel = object()
+ result1 = self.get("entity_categories", sp_entity_id, default=sentinel)
+ if result1 is sentinel:
+ return {}
+
+ result2 = post_entity_categories(
+ result1,
+ sp_entity_id=sp_entity_id,
+ mds=(mds or self.metadata_store),
+ required=required,
+ )
+ return result2
def not_on_or_after(self, sp_entity_id):
""" When the assertion stops being valid, should not be
@@ -507,74 +505,84 @@ class Policy(object):
return in_a_while(**self.get_lifetime(sp_entity_id))
- def get_sign(self, sp_entity_id):
- """
- Possible choices
- "sign": ["response", "assertion", "on_demand"]
-
- :param sp_entity_id:
- :return:
- """
-
- return self.get("sign", sp_entity_id, default=[])
-
- def filter(self, ava, sp_entity_id, mdstore, required=None, optional=None):
+ def filter(self, ava, sp_entity_id, mdstore=None, required=None, optional=None):
""" What attribute and attribute values returns depends on what
- the SP has said it wants in the request or in the metadata file and
- what the IdP/AA wants to release. An assumption is that what the SP
+ the SP or the registration authority has said it wants in the request
+ or in the metadata file and what the IdP/AA wants to release.
+ An assumption is that what the SP or the registration authority
asks for overrides whatever is in the metadata. But of course the
IdP never releases anything it doesn't want to.
:param ava: The information about the subject as a dictionary
:param sp_entity_id: The entity ID of the SP
- :param mdstore: A Metadata store
:param required: Attributes that the SP requires in the assertion
:param optional: Attributes that the SP regards as optional
:return: A possibly modified AVA
"""
- _ava = None
-
- if not self.acs: # acs MUST have a value, fall back to default.
+ if mdstore is not None:
+ warn_msg = (
+ "The mdstore parameter for saml2.assertion.Policy.filter "
+ "is deprecated; "
+ "instead, initialize the Policy object setting the mds param."
+ )
+ logger.warning(warn_msg)
+ _warn(warn_msg, DeprecationWarning)
+
+ # acs MUST have a value, fall back to default.
+ if not self.acs:
self.acs = ac_factory()
- _rest = self.get_entity_categories(sp_entity_id, mdstore, required)
- if _rest:
- _ava = filter_attribute_value_assertions(ava.copy(), _rest)
+ subject_ava = ava.copy()
+
+ # entity category restrictions
+ _ent_rest = self.get_entity_categories(sp_entity_id, mds=mdstore, required=required)
+ if _ent_rest:
+ subject_ava = filter_attribute_value_assertions(subject_ava, _ent_rest)
elif required or optional:
logger.debug("required: %s, optional: %s", required, optional)
- _ava = filter_on_attributes(
- ava.copy(), required, optional, self.acs,
- self.get_fail_on_missing_requested(sp_entity_id))
-
- _rest = self.get_attribute_restrictions(sp_entity_id)
- if _rest:
- if _ava is None:
- _ava = ava.copy()
- _ava = filter_attribute_value_assertions(_ava, _rest)
- elif _ava is None:
- _ava = ava.copy()
-
- if _ava is None:
- return {}
- else:
- return _ava
+ subject_ava = filter_on_attributes(
+ subject_ava,
+ required,
+ optional,
+ self.acs,
+ self.get_fail_on_missing_requested(sp_entity_id),
+ )
+
+ # attribute restrictions
+ _attr_rest = self.get_attribute_restrictions(sp_entity_id)
+ subject_ava = filter_attribute_value_assertions(subject_ava, _attr_rest)
+
+ return subject_ava or {}
def restrict(self, ava, sp_entity_id, metadata=None):
- """ Identity attribute names are expected to be expressed in
- the local lingo (== friendlyName)
+ """ Identity attribute names are expected to be expressed as FriendlyNames
:return: A filtered ava according to the IdPs/AAs rules and
the list of required/optional attributes according to the SP.
If the requirements can't be met an exception is raised.
"""
- if metadata:
- spec = metadata.attribute_requirement(sp_entity_id)
- if spec:
- return self.filter(ava, sp_entity_id, metadata,
- spec["required"], spec["optional"])
-
- return self.filter(ava, sp_entity_id, metadata, [], [])
+ if metadata is not None:
+ warn_msg = (
+ "The metadata parameter for saml2.assertion.Policy.restrict "
+ "is deprecated and ignored; "
+ "instead, initialize the Policy object setting the mds param."
+ )
+ logger.warning(warn_msg)
+ _warn(warn_msg, DeprecationWarning)
+
+ metadata_store = metadata or self.metadata_store
+ spec = (
+ metadata_store.attribute_requirement(sp_entity_id) or {}
+ if metadata_store
+ else {}
+ )
+ return self.filter(
+ ava,
+ sp_entity_id,
+ required=spec.get("required"),
+ optional=spec.get("optional"),
+ )
def conditions(self, sp_entity_id):
""" Return a saml.Condition instance
@@ -582,27 +590,18 @@ class Policy(object):
:param sp_entity_id: The SP entity ID
:return: A saml.Condition instance
"""
- return factory(saml.Conditions,
- not_before=instant(),
- # How long might depend on who's getting it
- not_on_or_after=self.not_on_or_after(sp_entity_id),
- audience_restriction=[factory(
- saml.AudienceRestriction,
- audience=[factory(saml.Audience,
- text=sp_entity_id)])])
-
- def entity_category_attributes(self, ec):
- # TODO: Not used. Remove?
- if not self._restrictions:
- return None
-
- ec_maps = self._restrictions["default"]["entity_categories"]
- for ec_map in ec_maps:
- try:
- return ec_map[ec]
- except KeyError:
- pass
- return []
+ return factory(
+ saml.Conditions,
+ not_before=instant(),
+ # How long might depend on who's getting it
+ not_on_or_after=self.not_on_or_after(sp_entity_id),
+ audience_restriction=[
+ factory(
+ saml.AudienceRestriction,
+ audience=[factory(saml.Audience, text=sp_entity_id)],
+ ),
+ ],
+ )
class EntityCategories(object):
@@ -740,12 +739,10 @@ def do_subject_confirmation(policy, sp_entity_id, key_info=None, **treeargs):
def do_subject(policy, sp_entity_id, name_id, **farg):
- #
specs = farg['subject_confirmation']
if isinstance(specs, list):
- res = [do_subject_confirmation(policy, sp_entity_id, **s) for s in
- specs]
+ res = [do_subject_confirmation(policy, sp_entity_id, **s) for s in specs]
else:
res = [do_subject_confirmation(policy, sp_entity_id, **specs)]
@@ -833,17 +830,16 @@ class Assertion(dict):
return _ass
- def apply_policy(self, sp_entity_id, policy, metadata=None):
+ def apply_policy(self, sp_entity_id, policy):
""" Apply policy to the assertion I'm representing
:param sp_entity_id: The SP entity ID
:param policy: The policy
- :param metadata: Metadata to use
:return: The resulting AVA after the policy is applied
"""
policy.acs = self.acs
- ava = policy.restrict(self, sp_entity_id, metadata)
+ ava = policy.restrict(self, sp_entity_id)
for key, val in list(self.items()):
if key in ava:
diff --git a/src/saml2/server.py b/src/saml2/server.py
index e8b1d737..50250c3a 100644
--- a/src/saml2/server.py
+++ b/src/saml2/server.py
@@ -347,9 +347,9 @@ class Server(Entity):
ast = Assertion(identity)
ast.acs = self.config.getattr("attribute_converters")
if policy is None:
- policy = Policy()
+ policy = Policy(mds=self.metadata)
try:
- ast.apply_policy(sp_entity_id, policy, self.metadata)
+ ast.apply_policy(sp_entity_id, policy)
except MissingValue as exc:
if not best_effort:
return self.create_error_response(in_response_to, consumer_url,
@@ -537,9 +537,9 @@ class Server(Entity):
_issuer = self._issuer(issuer)
ast = Assertion(identity)
if policy:
- ast.apply_policy(sp_entity_id, policy, self.metadata)
+ ast.apply_policy(sp_entity_id, policy)
else:
- policy = Policy()
+ policy = Policy(mds=self.metadata)
if attributes:
restr = restriction_from_attribute_spec(attributes)
diff --git a/tests/test_20_assertion.py b/tests/test_20_assertion.py
index dc501291..da7e70fd 100644
--- a/tests/test_20_assertion.py
+++ b/tests/test_20_assertion.py
@@ -218,7 +218,7 @@ def test_ava_filter_1():
"surName": "Jeter",
"mail": "derek@example.com"}
- ava = r.filter(ava, "urn:mace:umu.se:saml:roland:sp", None, None)
+ ava = r.filter(ava, "urn:mace:umu.se:saml:roland:sp")
assert _eq(list(ava.keys()), ["givenName", "surName"])
ava = {"givenName": "Derek",
@@ -247,8 +247,7 @@ def test_ava_filter_2():
ava = {"givenName": "Derek", "sn": "Jeter", "mail": "derek@example.com"}
# mail removed because it doesn't match the regular expression
- _ava = policy.filter(ava, 'urn:mace:umu.se:saml:roland:sp', None, [mail],
- [gn, sn])
+ _ava = policy.filter(ava, 'urn:mace:umu.se:saml:roland:sp', required=[mail], optional=[gn, sn])
assert _eq(sorted(list(_ava.keys())), ["givenName", 'sn'])
@@ -256,8 +255,7 @@ def test_ava_filter_2():
# it wasn't there to begin with
try:
- policy.filter(ava, 'urn:mace:umu.se:saml:roland:sp', None,
- [gn, sn, mail])
+ policy.filter(ava, 'urn:mace:umu.se:saml:roland:sp', required=[gn, sn, mail])
except MissingValue:
pass
@@ -287,8 +285,7 @@ def test_ava_filter_dont_fail():
# mail removed because it doesn't match the regular expression
# So it should fail if the 'fail_on_ ...' flag wasn't set
- _ava = policy.filter(ava, 'urn:mace:umu.se:saml:roland:sp', None,
- [mail], [gn, sn])
+ _ava = policy.filter(ava, 'urn:mace:umu.se:saml:roland:sp', required=[mail], optional=[gn, sn])
assert _ava
@@ -296,8 +293,7 @@ def test_ava_filter_dont_fail():
"surName": "Jeter"}
# it wasn't there to begin with
- _ava = policy.filter(ava, 'urn:mace:umu.se:saml:roland:sp',
- None, [gn, sn, mail])
+ _ava = policy.filter(ava, 'urn:mace:umu.se:saml:roland:sp', required=[gn, sn, mail])
assert _ava
@@ -633,7 +629,7 @@ def test_filter_ava_0():
"mail": ["derek@nyy.mlb.com"]}
# No restrictions apply
- ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp", [], [])
+ ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp")
assert _eq(sorted(list(ava.keys())), ["givenName", "mail", "surName"])
assert ava["givenName"] == ["Derek"]
@@ -660,7 +656,7 @@ def test_filter_ava_1():
"mail": ["derek@nyy.mlb.com"]}
# No restrictions apply
- ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp", [], [])
+ ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp")
assert _eq(sorted(list(ava.keys())), ["givenName", "surName"])
assert ava["givenName"] == ["Derek"]
@@ -685,7 +681,7 @@ def test_filter_ava_2():
"mail": ["derek@nyy.mlb.com"]}
# No restrictions apply
- ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp", [], [])
+ ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp")
assert _eq(list(ava.keys()), ["mail"])
assert ava["mail"] == ["derek@nyy.mlb.com"]
@@ -709,7 +705,7 @@ def test_filter_ava_3():
"mail": ["derek@nyy.mlb.com", "dj@example.com"]}
# No restrictions apply
- ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp", [], [])
+ ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp")
assert _eq(list(ava.keys()), ["mail"])
assert ava["mail"] == ["dj@example.com"]
@@ -733,7 +729,7 @@ def test_filter_ava_4():
"mail": ["derek@nyy.mlb.com", "dj@example.com"]}
# No restrictions apply
- ava = policy.filter(ava, "urn:mace:example.com:saml:curt:sp", [], [])
+ ava = policy.filter(ava, "urn:mace:example.com:saml:curt:sp")
assert _eq(sorted(list(ava.keys())), ['mail', 'givenName', 'surName'])
assert _eq(ava["mail"], ["derek@nyy.mlb.com", "dj@example.com"])
@@ -772,7 +768,7 @@ def test_req_opt():
'uid': 'rohe0002', 'edupersonaffiliation': 'staff'}
sp_entity_id = "urn:mace:example.com:saml:curt:sp"
- fava = policy.filter(ava, sp_entity_id, None, req, opt)
+ fava = policy.filter(ava, sp_entity_id, required=req, optional=opt)
assert fava
@@ -872,22 +868,27 @@ def test_assertion_with_noop_attribute_conv():
def test_filter_ava_5():
- mds = MetadataStore(ATTRCONV, sec_config,
- disable_ssl_certificate_validation=True)
+ mds = MetadataStore(ATTRCONV, sec_config, disable_ssl_certificate_validation=True)
mds.imp(METADATACONF["1"])
- policy = Policy({
+ policy_conf = {
"default": {
"lifetime": {"minutes": 15},
"attribute_restrictions": None, # means all I have
"entity_categories": ["swamid", "edugain"]
}
- })
-
- ava = {"givenName": ["Derek"], "surName": ["Jeter"],
- "mail": ["derek@nyy.mlb.com", "dj@example.com"]}
-
- ava = policy.filter(ava, "urn:mace:example.com:saml:curt:sp", mdstore=mds, required=[], optional=[])
+ }
+ policy = Policy(restrictions=policy_conf, mds=mds)
+
+ ava = {
+ "givenName": ["Derek"],
+ "surName": ["Jeter"],
+ "mail": [
+ "derek@nyy.mlb.com",
+ "dj@example.com",
+ ],
+ }
+ ava = policy.filter(ava, "urn:mace:example.com:saml:curt:sp")
# using entity_categories means there *always* are restrictions
# in this case the only allowed attribute is eduPersonTargetedID
@@ -896,37 +897,40 @@ def test_filter_ava_5():
def test_filter_ava_registration_authority_1():
- mds = MetadataStore(ATTRCONV, sec_config,
- disable_ssl_certificate_validation=True)
+ mds = MetadataStore(ATTRCONV, sec_config, disable_ssl_certificate_validation=True)
mds.imp(METADATACONF["1"])
- config.metadata = mds
- policy = Policy({
+ policy_conf = {
"default": {
"lifetime": {"minutes": 15},
"attribute_restrictions": None,
},
- "registration_authorities": {
- "http://rr.aai.switch.ch/": {
- "attribute_restrictions": {
- "givenName": None,
- "surName": None,
- }
+ "http://rr.aai.switch.ch/": {
+ "attribute_restrictions": {
+ "givenName": None,
+ "surName": None,
}
}
- }, config=config)
-
- attributes = {"givenName": ["Derek"], "surName": ["Jeter"],
- "mail": ["derek@nyy.mlb.com", "dj@example.com"]}
+ }
+ policy = Policy(restrictions=policy_conf, mds=mds)
+
+ attributes = {
+ "givenName": ["Derek"],
+ "surName": ["Jeter"],
+ "mail": [
+ "derek@nyy.mlb.com",
+ "dj@example.com",
+ ],
+ }
# SP registered with http://rr.aai.switch.ch/
- ava = policy.filter(attributes, "https://aai-idp.unibe.ch/idp/shibboleth", mdstore=mds, required=[], optional=[])
+ ava = policy.filter(attributes, "https://aai-idp.unibe.ch/idp/shibboleth")
assert _eq(sorted(list(ava.keys())), ["givenName", "surName"])
assert ava["givenName"] == ["Derek"]
assert ava["surName"] == ["Jeter"]
# SP not registered with http://rr.aai.switch.ch/
- ava = policy.filter(attributes, "https://alpha.kib.ki.se/shibboleth", mdstore=mds, required=[], optional=[])
+ ava = policy.filter(attributes, "https://alpha.kib.ki.se/shibboleth")
assert _eq(sorted(list(ava.keys())), ["givenName", "mail", "surName"])
assert ava["givenName"] == ["Derek"]
assert ava["surName"] == ["Jeter"]
@@ -936,13 +940,16 @@ def test_filter_ava_registration_authority_1():
def test_assertion_with_zero_attributes():
ava = {}
ast = Assertion(ava)
- policy = Policy({
+
+ policy_conf = {
"default": {
"lifetime": {"minutes": 240},
"attribute_restrictions": None, # means all I have
"name_form": NAME_FORMAT_URI
},
- })
+ }
+ policy = Policy(policy_conf)
+
name_id = NameID(format=NAMEID_FORMAT_TRANSIENT, text="foobar")
issuer = Issuer(text="entityid", format=NAMEID_FORMAT_ENTITY)
farg = add_path(
diff --git a/tests/test_37_entity_categories.py b/tests/test_37_entity_categories.py
index 839030fd..a24a4feb 100644
--- a/tests/test_37_entity_categories.py
+++ b/tests/test_37_entity_categories.py
@@ -25,37 +25,47 @@ def _eq(l1, l2):
def test_filter_ava():
- policy = Policy({
+ policy_conf = {
"default": {
"lifetime": {"minutes": 15},
# "attribute_restrictions": None # means all I have
"entity_categories": ["swamid"]
}
- })
+ }
+ policy = Policy(policy_conf, MDS)
- ava = {"givenName": ["Derek"], "sn": ["Jeter"],
- "mail": ["derek@nyy.mlb.com", "dj@example.com"], "c": ["USA"]}
+ ava = {
+ "givenName": ["Derek"],
+ "sn": ["Jeter"],
+ "mail": ["derek@nyy.mlb.com", "dj@example.com"],
+ "c": ["USA"]
+ }
- ava = policy.filter(ava, "https://connect.sunet.se/shibboleth", MDS)
+ ava = policy.filter(ava, "https://connect.sunet.se/shibboleth")
assert _eq(list(ava.keys()), ['mail', 'givenName', 'sn', 'c'])
assert _eq(ava["mail"], ["derek@nyy.mlb.com", "dj@example.com"])
def test_filter_ava2():
- policy = Policy({
+ policy_conf = {
"default": {
"lifetime": {"minutes": 15},
# "attribute_restrictions": None # means all I have
"entity_categories": ["refeds", "edugain"]
}
- })
+ }
+ policy = Policy(policy_conf, MDS)
- ava = {"givenName": ["Derek"], "sn": ["Jeter"],
- "mail": ["derek@nyy.mlb.com"], "c": ["USA"],
- "eduPersonTargetedID": "foo!bar!xyz"}
+ ava = {
+ "givenName": ["Derek"],
+ "sn": ["Jeter"],
+ "mail": ["derek@nyy.mlb.com"],
+ "c": ["USA"],
+ "eduPersonTargetedID": "foo!bar!xyz"
+ }
- ava = policy.filter(ava, "https://connect.sunet.se/shibboleth", MDS)
+ ava = policy.filter(ava, "https://connect.sunet.se/shibboleth")
# Mismatch, policy deals with eduGAIN, metadata says SWAMID
# So only minimum should come out
@@ -63,96 +73,120 @@ def test_filter_ava2():
def test_filter_ava3():
- policy = Policy({
+ mds = MetadataStore(ATTRCONV, sec_config, disable_ssl_certificate_validation=True)
+ mds.imp(
+ [
+ {
+ "class": "saml2.mdstore.MetaDataFile",
+ "metadata": [(full_path("entity_cat_sfs_hei.xml"),)]
+ }
+ ]
+ )
+
+ policy_conf = {
"default": {
"lifetime": {"minutes": 15},
# "attribute_restrictions": None # means all I have
"entity_categories": ["swamid"]
}
- })
+ }
+ policy = Policy(policy_conf, mds)
+
+ ava = {
+ "givenName": ["Derek"],
+ "sn": ["Jeter"],
+ "mail": ["derek@nyy.mlb.com"],
+ "c": ["USA"],
+ "eduPersonTargetedID": "foo!bar!xyz",
+ "norEduPersonNIN": "19800101134"
+ }
+
+ ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp")
+ assert _eq(list(ava.keys()), ['eduPersonTargetedID', "norEduPersonNIN"])
+
+def test_filter_ava4():
mds = MetadataStore(ATTRCONV, sec_config,
disable_ssl_certificate_validation=True)
mds.imp([{"class": "saml2.mdstore.MetaDataFile",
- "metadata": [(full_path("entity_cat_sfs_hei.xml"),)]}])
-
- ava = {"givenName": ["Derek"], "sn": ["Jeter"],
- "mail": ["derek@nyy.mlb.com"], "c": ["USA"],
- "eduPersonTargetedID": "foo!bar!xyz",
- "norEduPersonNIN": "19800101134"}
-
- ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp", mds)
-
- assert _eq(list(ava.keys()), ['eduPersonTargetedID', "norEduPersonNIN"])
-
+ "metadata": [(full_path("entity_cat_re_nren.xml"),)]}])
-def test_filter_ava4():
- policy = Policy({
+ policy_conf = {
"default": {
"lifetime": {"minutes": 15},
# "attribute_restrictions": None # means all I have
"entity_categories": ["swamid"]
}
- })
-
- mds = MetadataStore(ATTRCONV, sec_config,
- disable_ssl_certificate_validation=True)
- mds.imp([{"class": "saml2.mdstore.MetaDataFile",
- "metadata": [(full_path("entity_cat_re_nren.xml"),)]}])
+ }
+ policy = Policy(policy_conf, mds)
- ava = {"givenName": ["Derek"], "sn": ["Jeter"],
- "mail": ["derek@nyy.mlb.com"], "c": ["USA"],
- "eduPersonTargetedID": "foo!bar!xyz",
- "norEduPersonNIN": "19800101134"}
+ ava = {
+ "givenName": ["Derek"],
+ "sn": ["Jeter"],
+ "mail": ["derek@nyy.mlb.com"],
+ "c": ["USA"],
+ "eduPersonTargetedID": "foo!bar!xyz",
+ "norEduPersonNIN": "19800101134"
+ }
- ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp", mds)
-
- assert _eq(list(ava.keys()),
- ['eduPersonTargetedID', "givenName", "c", "mail",
- "sn"])
+ ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp")
+ assert _eq(
+ list(ava.keys()), ['eduPersonTargetedID', "givenName", "c", "mail", "sn"]
+ )
def test_filter_ava5():
+ mds = MetadataStore(ATTRCONV, sec_config,
+ disable_ssl_certificate_validation=True)
+ mds.imp([{"class": "saml2.mdstore.MetaDataFile",
+ "metadata": [(full_path("entity_cat_re.xml"),)]}])
+
policy = Policy({
"default": {
"lifetime": {"minutes": 15},
# "attribute_restrictions": None # means all I have
"entity_categories": ["swamid"]
}
- })
+ }, mds)
- mds = MetadataStore(ATTRCONV, sec_config,
- disable_ssl_certificate_validation=True)
- mds.imp([{"class": "saml2.mdstore.MetaDataFile",
- "metadata": [(full_path("entity_cat_re.xml"),)]}])
-
- ava = {"givenName": ["Derek"], "sn": ["Jeter"],
- "mail": ["derek@nyy.mlb.com"], "c": ["USA"],
- "eduPersonTargetedID": "foo!bar!xyz",
- "norEduPersonNIN": "19800101134"}
+ ava = {
+ "givenName": ["Derek"],
+ "sn": ["Jeter"],
+ "mail": ["derek@nyy.mlb.com"],
+ "c": ["USA"],
+ "eduPersonTargetedID": "foo!bar!xyz",
+ "norEduPersonNIN": "19800101134"
+ }
- ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp", mds)
+ ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp")
assert _eq(list(ava.keys()), ['eduPersonTargetedID'])
def test_idp_policy_filter():
with closing(Server("idp_conf_ec")) as idp:
- ava = {"givenName": ["Derek"], "sn": ["Jeter"],
- "mail": ["derek@nyy.mlb.com"], "c": ["USA"],
- "eduPersonTargetedID": "foo!bar!xyz",
- "norEduPersonNIN": "19800101134"}
+ ava = {
+ "givenName": ["Derek"],
+ "sn": ["Jeter"],
+ "mail": ["derek@nyy.mlb.com"],
+ "c": ["USA"],
+ "eduPersonTargetedID": "foo!bar!xyz",
+ "norEduPersonNIN": "19800101134"
+ }
policy = idp.config.getattr("policy", "idp")
- ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp",
- idp.metadata)
-
- print(ava)
- assert list(ava.keys()) == [
- "eduPersonTargetedID"] # because no entity category
+ ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp")
+ # because no entity category
+ assert list(ava.keys()) == ["eduPersonTargetedID"]
def test_entity_category_import_from_path():
+ mds = MetadataStore(ATTRCONV, sec_config, disable_ssl_certificate_validation=True)
+ # The file entity_cat_rs.xml contains the SAML metadata for an SP
+ # tagged with the REFEDs R&S entity category.
+ mds.imp([{"class": "saml2.mdstore.MetaDataFile",
+ "metadata": [(full_path("entity_cat_rs.xml"),)]}])
+
# The entity category module myentitycategory.py is in the tests
# directory which is on the standard module search path.
# The module uses a custom interpretation of the REFEDs R&S entity category
@@ -162,34 +196,35 @@ def test_entity_category_import_from_path():
"lifetime": {"minutes": 15},
"entity_categories": ["myentitycategory"]
}
- })
-
- mds = MetadataStore(ATTRCONV, sec_config,
- disable_ssl_certificate_validation=True)
-
- # The file entity_cat_rs.xml contains the SAML metadata for an SP
- # tagged with the REFEDs R&S entity category.
- mds.imp([{"class": "saml2.mdstore.MetaDataFile",
- "metadata": [(full_path("entity_cat_rs.xml"),)]}])
-
- ava = {"givenName": ["Derek"], "sn": ["Jeter"],
- "displayName": "Derek Jeter",
- "mail": ["derek@nyy.mlb.com"], "c": ["USA"],
- "eduPersonTargetedID": "foo!bar!xyz",
- "eduPersonUniqueId": "R13ET7UD68K0HGR153KE@my.org",
- "eduPersonScopedAffiliation": "member@my.org",
- "eduPersonPrincipalName": "user01@my.org",
- "norEduPersonNIN": "19800101134"}
-
- ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp", mds)
+ }, mds)
+
+ ava = {
+ "givenName": ["Derek"],
+ "sn": ["Jeter"],
+ "displayName": "Derek Jeter",
+ "mail": ["derek@nyy.mlb.com"],
+ "c": ["USA"],
+ "eduPersonTargetedID": "foo!bar!xyz",
+ "eduPersonUniqueId": "R13ET7UD68K0HGR153KE@my.org",
+ "eduPersonScopedAffiliation": "member@my.org",
+ "eduPersonPrincipalName": "user01@my.org",
+ "norEduPersonNIN": "19800101134"
+ }
+
+ ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp")
# We expect c and norEduPersonNIN to be filtered out since they are not
# part of the custom entity category.
- assert _eq(list(ava.keys()),
- ["eduPersonTargetedID", "eduPersonPrincipalName",
- "eduPersonUniqueId", "displayName", "givenName",
- "eduPersonScopedAffiliation", "mail", "sn"])
-
-
-if __name__ == "__main__":
- test_filter_ava3()
+ assert _eq(
+ list(ava.keys()),
+ [
+ "eduPersonTargetedID",
+ "eduPersonPrincipalName",
+ "eduPersonUniqueId",
+ "displayName",
+ "givenName",
+ "eduPersonScopedAffiliation",
+ "mail",
+ "sn"
+ ]
+ )