diff options
-rw-r--r-- | src/saml2/assertion.py | 422 | ||||
-rw-r--r-- | src/saml2/server.py | 8 | ||||
-rw-r--r-- | tests/test_20_assertion.py | 91 | ||||
-rw-r--r-- | tests/test_37_entity_categories.py | 215 |
4 files changed, 387 insertions, 349 deletions
diff --git a/src/saml2/assertion.py b/src/saml2/assertion.py index dd18affb..30cd90ab 100644 --- a/src/saml2/assertion.py +++ b/src/saml2/assertion.py @@ -5,6 +5,7 @@ import importlib import logging import re import six +from warnings import warn as _warn from saml2 import saml from saml2 import xmlenc @@ -18,6 +19,7 @@ from saml2.saml import NAME_FORMAT_URI from saml2.time_util import instant from saml2.time_util import in_a_while + logger = logging.getLogger(__name__) @@ -276,52 +278,54 @@ def restriction_from_attribute_spec(attributes): return restr -def post_entity_categories(maps, **kwargs): - restrictions = {} - try: - required = [d['friendly_name'].lower() for d in kwargs['required']] - except (KeyError, TypeError): - required = [] +def compile(restrictions): + """ This is only for IdPs or AAs, and it's about limiting what + is returned to the SP. + In the configuration file, restrictions on which values that + can be returned are specified with the help of regular expressions. + This function goes through and pre-compiles the regular expressions. - if kwargs["mds"]: - if "sp_entity_id" in kwargs: - ecs = kwargs["mds"].entity_categories(kwargs["sp_entity_id"]) - for ec_map in maps: - for key, (atlist, only_required) in ec_map.items(): - if key == "": # always released - attrs = atlist - elif isinstance(key, tuple): - if only_required: - attrs = [a for a in atlist if a in required] - else: - attrs = atlist - for _key in key: - if _key not in ecs: - attrs = [] - break - elif key in ecs: - if only_required: - attrs = [a for a in atlist if a in required] - else: - attrs = atlist - else: - attrs = [] + :param restrictions: policy configuration + :return: The assertion with the string specification replaced with + a compiled regular expression. + """ + for who, spec in restrictions.items(): + spec = spec or {} - for attr in attrs: - restrictions[attr] = None - else: - for ec_map in maps: - for attr in ec_map[""]: - restrictions[attr] = None + entity_categories = spec.get("entity_categories", []) + ecs = [] + for cat in entity_categories: + try: + _mod = importlib.import_module(cat) + except ImportError: + _mod = importlib.import_module("saml2.entity_category.%s" % cat) + + _ec = {} + for key, items in _mod.RELEASE.items(): + alist = [k.lower() for k in items] + _only_required = getattr(_mod, "ONLY_REQUIRED", {}).get(key, False) + _ec[key] = (alist, _only_required) + ecs.append(_ec) + spec["entity_categories"] = ecs or None + + attribute_restrictions = spec.get("attribute_restrictions") or {} + _attribute_restrictions = {} + for key, values in attribute_restrictions.items(): + lkey = key.lower() + values = [] if not values else values + _attribute_restrictions[lkey] = ( + [re.compile(value) for value in values] or None + ) + spec["attribute_restrictions"] = _attribute_restrictions or None return restrictions class Policy(object): - """ handles restrictions on assertions """ + """Handles restrictions on assertions.""" - def __init__(self, restrictions=None, config=None): - self._config = config + def __init__(self, restrictions=None, mds=None): + self.metadata_store = mds self._restrictions = self.setup_restrictions(restrictions) logger.debug("policy restrictions: %s", self._restrictions) self.acs = [] @@ -331,116 +335,50 @@ class Policy(object): return None restrictions = copy.deepcopy(restrictions) - # TODO: Split policy config in service_providers and registration_authorities - # "policy": { - # "service_providers": { - # "default": ..., - # "urn:mace:example.com:saml:roland:sp": ..., - # }, - # "registration_authorities": { - # "default": ..., - # "http://www.swamid.se": ..., - # }, - # }, - registration_authorities = restrictions.pop('registration_authorities', None) - restrictions = self.compile(restrictions) - if registration_authorities: - restrictions['registration_authorities'] = self.compile(registration_authorities) + restrictions = compile(restrictions) return restrictions - @staticmethod - def compile(restrictions): - """ This is only for IdPs or AAs, and it's about limiting what - is returned to the SP. - In the configuration file, restrictions on which values that - can be returned are specified with the help of regular expressions. - This function goes through and pre-compiles the regular expressions. - - :param restrictions: policy configuration - :return: The assertion with the string specification replaced with - a compiled regular expression. - """ - for who, spec in restrictions.items(): - if spec is None: - continue - - entity_categories = spec.get("entity_categories") - if entity_categories is not None: - ecs = [] - for cat in entity_categories: - try: - _mod = importlib.import_module(cat) - except ImportError: - _mod = importlib.import_module( - "saml2.entity_category.%s" % cat) - _ec = {} - for key, items in _mod.RELEASE.items(): - alist = [k.lower() for k in items] - try: - _only_required = _mod.ONLY_REQUIRED[key] - except (AttributeError, KeyError): - _only_required = False - _ec[key] = (alist, _only_required) - ecs.append(_ec) - spec["entity_categories"] = ecs - - attribute_restrictions = spec.get("attribute_restrictions") - if attribute_restrictions is None: - continue - - _attribute_restrictions = {} - for key, values in attribute_restrictions.items(): - if not values: - _attribute_restrictions[key.lower()] = None - continue - _attribute_restrictions[key.lower()] = [re.compile(value) for value in values] - - spec["attribute_restrictions"] = _attribute_restrictions - - return restrictions - - def _lookup_registry_authority(self, sp_entity_id): - if self._config and self._config.metadata: - registration_info = self._config.metadata.registration_info(sp_entity_id) - return registration_info.get('registration_authority') - return None - - def get(self, attribute, sp_entity_id, default=None, post_func=None, - **kwargs): + def get(self, attribute, sp_entity_id, default=None): """ :param attribute: :param sp_entity_id: :param default: - :param post_func: :return: """ if not self._restrictions: return default - registration_authority_name = self._lookup_registry_authority(sp_entity_id) - registration_authorities = self._restrictions.get("registration_authorities") - - val = None - # Specific SP takes precedence - if sp_entity_id in self._restrictions: - val = self._restrictions[sp_entity_id].get(attribute) - # Second choice is if the SP is part of a configured registration authority - elif registration_authorities and registration_authority_name in registration_authorities: - val = registration_authorities[registration_authority_name].get(attribute) - # Third is to try default for registration authorities - elif registration_authorities and 'default' in registration_authorities: - val = registration_authorities['default'].get(attribute) - # Lastly we try default for SPs - elif 'default' in self._restrictions: - val = self._restrictions.get('default').get(attribute) - - if val is None: - return default - elif post_func: - return post_func(val, sp_entity_id=sp_entity_id, **kwargs) - else: - return val + ra_info = ( + self.metadata_store.registration_info(sp_entity_id) or {} + if self.metadata_store is not None + else {} + ) + ra_entity_id = ra_info.get("registration_authority") + + sp_restrictions = self._restrictions.get(sp_entity_id) + ra_restrictions = self._restrictions.get(ra_entity_id) + default_restrictions = ( + self._restrictions.get("default") + or self._restrictions.get("") + ) + restrictions = ( + sp_restrictions + if sp_restrictions is not None + else ra_restrictions + if ra_restrictions is not None + else default_restrictions + if default_restrictions is not None + else {} + ) + + attribute_restriction = restrictions.get(attribute) + restriction = ( + attribute_restriction + if attribute_restriction is not None + else default + ) + return restriction def get_nameid_format(self, sp_entity_id): """ Get the NameIDFormat to used for the entity id @@ -484,18 +422,78 @@ class Policy(object): return self.get("fail_on_missing_requested", sp_entity_id, default=True) - def get_entity_categories(self, sp_entity_id, mds, required): + def get_sign(self, sp_entity_id): + """ + Possible choices + "sign": ["response", "assertion", "on_demand"] + + :param sp_entity_id: + :return: + """ + + return self.get("sign", sp_entity_id, default=[]) + + def get_entity_categories(self, sp_entity_id, mds=None, required=None): """ :param sp_entity_id: - :param mds: MetadataStore instance :param required: required attributes :return: A dictionary with restrictions """ - kwargs = {"mds": mds, 'required': required} + if mds is not None: + warn_msg = ( + "The mds parameter for saml2.assertion.Policy.get_entity_categories " + "is deprecated; " + "instead, initialize the Policy object setting the mds param." + ) + logger.warning(warn_msg) + _warn(warn_msg, DeprecationWarning) + + def post_entity_categories(maps, sp_entity_id=None, mds=None, required=None): + restrictions = {} + required = [d['friendly_name'].lower() for d in (required or [])] + + if mds: + ecs = mds.entity_categories(sp_entity_id) + for ec_map in maps: + for key, (atlist, only_required) in ec_map.items(): + if key == "": # always released + attrs = atlist + elif isinstance(key, tuple): + if only_required: + attrs = [a for a in atlist if a in required] + else: + attrs = atlist + for _key in key: + if _key not in ecs: + attrs = [] + break + elif key in ecs: + if only_required: + attrs = [a for a in atlist if a in required] + else: + attrs = atlist + else: + attrs = [] + + for attr in attrs: + restrictions[attr] = None - return self.get("entity_categories", sp_entity_id, default={}, post_func=post_entity_categories, **kwargs) + return restrictions + + sentinel = object() + result1 = self.get("entity_categories", sp_entity_id, default=sentinel) + if result1 is sentinel: + return {} + + result2 = post_entity_categories( + result1, + sp_entity_id=sp_entity_id, + mds=(mds or self.metadata_store), + required=required, + ) + return result2 def not_on_or_after(self, sp_entity_id): """ When the assertion stops being valid, should not be @@ -507,74 +505,84 @@ class Policy(object): return in_a_while(**self.get_lifetime(sp_entity_id)) - def get_sign(self, sp_entity_id): - """ - Possible choices - "sign": ["response", "assertion", "on_demand"] - - :param sp_entity_id: - :return: - """ - - return self.get("sign", sp_entity_id, default=[]) - - def filter(self, ava, sp_entity_id, mdstore, required=None, optional=None): + def filter(self, ava, sp_entity_id, mdstore=None, required=None, optional=None): """ What attribute and attribute values returns depends on what - the SP has said it wants in the request or in the metadata file and - what the IdP/AA wants to release. An assumption is that what the SP + the SP or the registration authority has said it wants in the request + or in the metadata file and what the IdP/AA wants to release. + An assumption is that what the SP or the registration authority asks for overrides whatever is in the metadata. But of course the IdP never releases anything it doesn't want to. :param ava: The information about the subject as a dictionary :param sp_entity_id: The entity ID of the SP - :param mdstore: A Metadata store :param required: Attributes that the SP requires in the assertion :param optional: Attributes that the SP regards as optional :return: A possibly modified AVA """ - _ava = None - - if not self.acs: # acs MUST have a value, fall back to default. + if mdstore is not None: + warn_msg = ( + "The mdstore parameter for saml2.assertion.Policy.filter " + "is deprecated; " + "instead, initialize the Policy object setting the mds param." + ) + logger.warning(warn_msg) + _warn(warn_msg, DeprecationWarning) + + # acs MUST have a value, fall back to default. + if not self.acs: self.acs = ac_factory() - _rest = self.get_entity_categories(sp_entity_id, mdstore, required) - if _rest: - _ava = filter_attribute_value_assertions(ava.copy(), _rest) + subject_ava = ava.copy() + + # entity category restrictions + _ent_rest = self.get_entity_categories(sp_entity_id, mds=mdstore, required=required) + if _ent_rest: + subject_ava = filter_attribute_value_assertions(subject_ava, _ent_rest) elif required or optional: logger.debug("required: %s, optional: %s", required, optional) - _ava = filter_on_attributes( - ava.copy(), required, optional, self.acs, - self.get_fail_on_missing_requested(sp_entity_id)) - - _rest = self.get_attribute_restrictions(sp_entity_id) - if _rest: - if _ava is None: - _ava = ava.copy() - _ava = filter_attribute_value_assertions(_ava, _rest) - elif _ava is None: - _ava = ava.copy() - - if _ava is None: - return {} - else: - return _ava + subject_ava = filter_on_attributes( + subject_ava, + required, + optional, + self.acs, + self.get_fail_on_missing_requested(sp_entity_id), + ) + + # attribute restrictions + _attr_rest = self.get_attribute_restrictions(sp_entity_id) + subject_ava = filter_attribute_value_assertions(subject_ava, _attr_rest) + + return subject_ava or {} def restrict(self, ava, sp_entity_id, metadata=None): - """ Identity attribute names are expected to be expressed in - the local lingo (== friendlyName) + """ Identity attribute names are expected to be expressed as FriendlyNames :return: A filtered ava according to the IdPs/AAs rules and the list of required/optional attributes according to the SP. If the requirements can't be met an exception is raised. """ - if metadata: - spec = metadata.attribute_requirement(sp_entity_id) - if spec: - return self.filter(ava, sp_entity_id, metadata, - spec["required"], spec["optional"]) - - return self.filter(ava, sp_entity_id, metadata, [], []) + if metadata is not None: + warn_msg = ( + "The metadata parameter for saml2.assertion.Policy.restrict " + "is deprecated and ignored; " + "instead, initialize the Policy object setting the mds param." + ) + logger.warning(warn_msg) + _warn(warn_msg, DeprecationWarning) + + metadata_store = metadata or self.metadata_store + spec = ( + metadata_store.attribute_requirement(sp_entity_id) or {} + if metadata_store + else {} + ) + return self.filter( + ava, + sp_entity_id, + required=spec.get("required"), + optional=spec.get("optional"), + ) def conditions(self, sp_entity_id): """ Return a saml.Condition instance @@ -582,27 +590,18 @@ class Policy(object): :param sp_entity_id: The SP entity ID :return: A saml.Condition instance """ - return factory(saml.Conditions, - not_before=instant(), - # How long might depend on who's getting it - not_on_or_after=self.not_on_or_after(sp_entity_id), - audience_restriction=[factory( - saml.AudienceRestriction, - audience=[factory(saml.Audience, - text=sp_entity_id)])]) - - def entity_category_attributes(self, ec): - # TODO: Not used. Remove? - if not self._restrictions: - return None - - ec_maps = self._restrictions["default"]["entity_categories"] - for ec_map in ec_maps: - try: - return ec_map[ec] - except KeyError: - pass - return [] + return factory( + saml.Conditions, + not_before=instant(), + # How long might depend on who's getting it + not_on_or_after=self.not_on_or_after(sp_entity_id), + audience_restriction=[ + factory( + saml.AudienceRestriction, + audience=[factory(saml.Audience, text=sp_entity_id)], + ), + ], + ) class EntityCategories(object): @@ -740,12 +739,10 @@ def do_subject_confirmation(policy, sp_entity_id, key_info=None, **treeargs): def do_subject(policy, sp_entity_id, name_id, **farg): - # specs = farg['subject_confirmation'] if isinstance(specs, list): - res = [do_subject_confirmation(policy, sp_entity_id, **s) for s in - specs] + res = [do_subject_confirmation(policy, sp_entity_id, **s) for s in specs] else: res = [do_subject_confirmation(policy, sp_entity_id, **specs)] @@ -833,17 +830,16 @@ class Assertion(dict): return _ass - def apply_policy(self, sp_entity_id, policy, metadata=None): + def apply_policy(self, sp_entity_id, policy): """ Apply policy to the assertion I'm representing :param sp_entity_id: The SP entity ID :param policy: The policy - :param metadata: Metadata to use :return: The resulting AVA after the policy is applied """ policy.acs = self.acs - ava = policy.restrict(self, sp_entity_id, metadata) + ava = policy.restrict(self, sp_entity_id) for key, val in list(self.items()): if key in ava: diff --git a/src/saml2/server.py b/src/saml2/server.py index e8b1d737..50250c3a 100644 --- a/src/saml2/server.py +++ b/src/saml2/server.py @@ -347,9 +347,9 @@ class Server(Entity): ast = Assertion(identity) ast.acs = self.config.getattr("attribute_converters") if policy is None: - policy = Policy() + policy = Policy(mds=self.metadata) try: - ast.apply_policy(sp_entity_id, policy, self.metadata) + ast.apply_policy(sp_entity_id, policy) except MissingValue as exc: if not best_effort: return self.create_error_response(in_response_to, consumer_url, @@ -537,9 +537,9 @@ class Server(Entity): _issuer = self._issuer(issuer) ast = Assertion(identity) if policy: - ast.apply_policy(sp_entity_id, policy, self.metadata) + ast.apply_policy(sp_entity_id, policy) else: - policy = Policy() + policy = Policy(mds=self.metadata) if attributes: restr = restriction_from_attribute_spec(attributes) diff --git a/tests/test_20_assertion.py b/tests/test_20_assertion.py index dc501291..da7e70fd 100644 --- a/tests/test_20_assertion.py +++ b/tests/test_20_assertion.py @@ -218,7 +218,7 @@ def test_ava_filter_1(): "surName": "Jeter", "mail": "derek@example.com"} - ava = r.filter(ava, "urn:mace:umu.se:saml:roland:sp", None, None) + ava = r.filter(ava, "urn:mace:umu.se:saml:roland:sp") assert _eq(list(ava.keys()), ["givenName", "surName"]) ava = {"givenName": "Derek", @@ -247,8 +247,7 @@ def test_ava_filter_2(): ava = {"givenName": "Derek", "sn": "Jeter", "mail": "derek@example.com"} # mail removed because it doesn't match the regular expression - _ava = policy.filter(ava, 'urn:mace:umu.se:saml:roland:sp', None, [mail], - [gn, sn]) + _ava = policy.filter(ava, 'urn:mace:umu.se:saml:roland:sp', required=[mail], optional=[gn, sn]) assert _eq(sorted(list(_ava.keys())), ["givenName", 'sn']) @@ -256,8 +255,7 @@ def test_ava_filter_2(): # it wasn't there to begin with try: - policy.filter(ava, 'urn:mace:umu.se:saml:roland:sp', None, - [gn, sn, mail]) + policy.filter(ava, 'urn:mace:umu.se:saml:roland:sp', required=[gn, sn, mail]) except MissingValue: pass @@ -287,8 +285,7 @@ def test_ava_filter_dont_fail(): # mail removed because it doesn't match the regular expression # So it should fail if the 'fail_on_ ...' flag wasn't set - _ava = policy.filter(ava, 'urn:mace:umu.se:saml:roland:sp', None, - [mail], [gn, sn]) + _ava = policy.filter(ava, 'urn:mace:umu.se:saml:roland:sp', required=[mail], optional=[gn, sn]) assert _ava @@ -296,8 +293,7 @@ def test_ava_filter_dont_fail(): "surName": "Jeter"} # it wasn't there to begin with - _ava = policy.filter(ava, 'urn:mace:umu.se:saml:roland:sp', - None, [gn, sn, mail]) + _ava = policy.filter(ava, 'urn:mace:umu.se:saml:roland:sp', required=[gn, sn, mail]) assert _ava @@ -633,7 +629,7 @@ def test_filter_ava_0(): "mail": ["derek@nyy.mlb.com"]} # No restrictions apply - ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp", [], []) + ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp") assert _eq(sorted(list(ava.keys())), ["givenName", "mail", "surName"]) assert ava["givenName"] == ["Derek"] @@ -660,7 +656,7 @@ def test_filter_ava_1(): "mail": ["derek@nyy.mlb.com"]} # No restrictions apply - ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp", [], []) + ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp") assert _eq(sorted(list(ava.keys())), ["givenName", "surName"]) assert ava["givenName"] == ["Derek"] @@ -685,7 +681,7 @@ def test_filter_ava_2(): "mail": ["derek@nyy.mlb.com"]} # No restrictions apply - ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp", [], []) + ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp") assert _eq(list(ava.keys()), ["mail"]) assert ava["mail"] == ["derek@nyy.mlb.com"] @@ -709,7 +705,7 @@ def test_filter_ava_3(): "mail": ["derek@nyy.mlb.com", "dj@example.com"]} # No restrictions apply - ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp", [], []) + ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp") assert _eq(list(ava.keys()), ["mail"]) assert ava["mail"] == ["dj@example.com"] @@ -733,7 +729,7 @@ def test_filter_ava_4(): "mail": ["derek@nyy.mlb.com", "dj@example.com"]} # No restrictions apply - ava = policy.filter(ava, "urn:mace:example.com:saml:curt:sp", [], []) + ava = policy.filter(ava, "urn:mace:example.com:saml:curt:sp") assert _eq(sorted(list(ava.keys())), ['mail', 'givenName', 'surName']) assert _eq(ava["mail"], ["derek@nyy.mlb.com", "dj@example.com"]) @@ -772,7 +768,7 @@ def test_req_opt(): 'uid': 'rohe0002', 'edupersonaffiliation': 'staff'} sp_entity_id = "urn:mace:example.com:saml:curt:sp" - fava = policy.filter(ava, sp_entity_id, None, req, opt) + fava = policy.filter(ava, sp_entity_id, required=req, optional=opt) assert fava @@ -872,22 +868,27 @@ def test_assertion_with_noop_attribute_conv(): def test_filter_ava_5(): - mds = MetadataStore(ATTRCONV, sec_config, - disable_ssl_certificate_validation=True) + mds = MetadataStore(ATTRCONV, sec_config, disable_ssl_certificate_validation=True) mds.imp(METADATACONF["1"]) - policy = Policy({ + policy_conf = { "default": { "lifetime": {"minutes": 15}, "attribute_restrictions": None, # means all I have "entity_categories": ["swamid", "edugain"] } - }) - - ava = {"givenName": ["Derek"], "surName": ["Jeter"], - "mail": ["derek@nyy.mlb.com", "dj@example.com"]} - - ava = policy.filter(ava, "urn:mace:example.com:saml:curt:sp", mdstore=mds, required=[], optional=[]) + } + policy = Policy(restrictions=policy_conf, mds=mds) + + ava = { + "givenName": ["Derek"], + "surName": ["Jeter"], + "mail": [ + "derek@nyy.mlb.com", + "dj@example.com", + ], + } + ava = policy.filter(ava, "urn:mace:example.com:saml:curt:sp") # using entity_categories means there *always* are restrictions # in this case the only allowed attribute is eduPersonTargetedID @@ -896,37 +897,40 @@ def test_filter_ava_5(): def test_filter_ava_registration_authority_1(): - mds = MetadataStore(ATTRCONV, sec_config, - disable_ssl_certificate_validation=True) + mds = MetadataStore(ATTRCONV, sec_config, disable_ssl_certificate_validation=True) mds.imp(METADATACONF["1"]) - config.metadata = mds - policy = Policy({ + policy_conf = { "default": { "lifetime": {"minutes": 15}, "attribute_restrictions": None, }, - "registration_authorities": { - "http://rr.aai.switch.ch/": { - "attribute_restrictions": { - "givenName": None, - "surName": None, - } + "http://rr.aai.switch.ch/": { + "attribute_restrictions": { + "givenName": None, + "surName": None, } } - }, config=config) - - attributes = {"givenName": ["Derek"], "surName": ["Jeter"], - "mail": ["derek@nyy.mlb.com", "dj@example.com"]} + } + policy = Policy(restrictions=policy_conf, mds=mds) + + attributes = { + "givenName": ["Derek"], + "surName": ["Jeter"], + "mail": [ + "derek@nyy.mlb.com", + "dj@example.com", + ], + } # SP registered with http://rr.aai.switch.ch/ - ava = policy.filter(attributes, "https://aai-idp.unibe.ch/idp/shibboleth", mdstore=mds, required=[], optional=[]) + ava = policy.filter(attributes, "https://aai-idp.unibe.ch/idp/shibboleth") assert _eq(sorted(list(ava.keys())), ["givenName", "surName"]) assert ava["givenName"] == ["Derek"] assert ava["surName"] == ["Jeter"] # SP not registered with http://rr.aai.switch.ch/ - ava = policy.filter(attributes, "https://alpha.kib.ki.se/shibboleth", mdstore=mds, required=[], optional=[]) + ava = policy.filter(attributes, "https://alpha.kib.ki.se/shibboleth") assert _eq(sorted(list(ava.keys())), ["givenName", "mail", "surName"]) assert ava["givenName"] == ["Derek"] assert ava["surName"] == ["Jeter"] @@ -936,13 +940,16 @@ def test_filter_ava_registration_authority_1(): def test_assertion_with_zero_attributes(): ava = {} ast = Assertion(ava) - policy = Policy({ + + policy_conf = { "default": { "lifetime": {"minutes": 240}, "attribute_restrictions": None, # means all I have "name_form": NAME_FORMAT_URI }, - }) + } + policy = Policy(policy_conf) + name_id = NameID(format=NAMEID_FORMAT_TRANSIENT, text="foobar") issuer = Issuer(text="entityid", format=NAMEID_FORMAT_ENTITY) farg = add_path( diff --git a/tests/test_37_entity_categories.py b/tests/test_37_entity_categories.py index 839030fd..a24a4feb 100644 --- a/tests/test_37_entity_categories.py +++ b/tests/test_37_entity_categories.py @@ -25,37 +25,47 @@ def _eq(l1, l2): def test_filter_ava(): - policy = Policy({ + policy_conf = { "default": { "lifetime": {"minutes": 15}, # "attribute_restrictions": None # means all I have "entity_categories": ["swamid"] } - }) + } + policy = Policy(policy_conf, MDS) - ava = {"givenName": ["Derek"], "sn": ["Jeter"], - "mail": ["derek@nyy.mlb.com", "dj@example.com"], "c": ["USA"]} + ava = { + "givenName": ["Derek"], + "sn": ["Jeter"], + "mail": ["derek@nyy.mlb.com", "dj@example.com"], + "c": ["USA"] + } - ava = policy.filter(ava, "https://connect.sunet.se/shibboleth", MDS) + ava = policy.filter(ava, "https://connect.sunet.se/shibboleth") assert _eq(list(ava.keys()), ['mail', 'givenName', 'sn', 'c']) assert _eq(ava["mail"], ["derek@nyy.mlb.com", "dj@example.com"]) def test_filter_ava2(): - policy = Policy({ + policy_conf = { "default": { "lifetime": {"minutes": 15}, # "attribute_restrictions": None # means all I have "entity_categories": ["refeds", "edugain"] } - }) + } + policy = Policy(policy_conf, MDS) - ava = {"givenName": ["Derek"], "sn": ["Jeter"], - "mail": ["derek@nyy.mlb.com"], "c": ["USA"], - "eduPersonTargetedID": "foo!bar!xyz"} + ava = { + "givenName": ["Derek"], + "sn": ["Jeter"], + "mail": ["derek@nyy.mlb.com"], + "c": ["USA"], + "eduPersonTargetedID": "foo!bar!xyz" + } - ava = policy.filter(ava, "https://connect.sunet.se/shibboleth", MDS) + ava = policy.filter(ava, "https://connect.sunet.se/shibboleth") # Mismatch, policy deals with eduGAIN, metadata says SWAMID # So only minimum should come out @@ -63,96 +73,120 @@ def test_filter_ava2(): def test_filter_ava3(): - policy = Policy({ + mds = MetadataStore(ATTRCONV, sec_config, disable_ssl_certificate_validation=True) + mds.imp( + [ + { + "class": "saml2.mdstore.MetaDataFile", + "metadata": [(full_path("entity_cat_sfs_hei.xml"),)] + } + ] + ) + + policy_conf = { "default": { "lifetime": {"minutes": 15}, # "attribute_restrictions": None # means all I have "entity_categories": ["swamid"] } - }) + } + policy = Policy(policy_conf, mds) + + ava = { + "givenName": ["Derek"], + "sn": ["Jeter"], + "mail": ["derek@nyy.mlb.com"], + "c": ["USA"], + "eduPersonTargetedID": "foo!bar!xyz", + "norEduPersonNIN": "19800101134" + } + + ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp") + assert _eq(list(ava.keys()), ['eduPersonTargetedID', "norEduPersonNIN"]) + +def test_filter_ava4(): mds = MetadataStore(ATTRCONV, sec_config, disable_ssl_certificate_validation=True) mds.imp([{"class": "saml2.mdstore.MetaDataFile", - "metadata": [(full_path("entity_cat_sfs_hei.xml"),)]}]) - - ava = {"givenName": ["Derek"], "sn": ["Jeter"], - "mail": ["derek@nyy.mlb.com"], "c": ["USA"], - "eduPersonTargetedID": "foo!bar!xyz", - "norEduPersonNIN": "19800101134"} - - ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp", mds) - - assert _eq(list(ava.keys()), ['eduPersonTargetedID', "norEduPersonNIN"]) - + "metadata": [(full_path("entity_cat_re_nren.xml"),)]}]) -def test_filter_ava4(): - policy = Policy({ + policy_conf = { "default": { "lifetime": {"minutes": 15}, # "attribute_restrictions": None # means all I have "entity_categories": ["swamid"] } - }) - - mds = MetadataStore(ATTRCONV, sec_config, - disable_ssl_certificate_validation=True) - mds.imp([{"class": "saml2.mdstore.MetaDataFile", - "metadata": [(full_path("entity_cat_re_nren.xml"),)]}]) + } + policy = Policy(policy_conf, mds) - ava = {"givenName": ["Derek"], "sn": ["Jeter"], - "mail": ["derek@nyy.mlb.com"], "c": ["USA"], - "eduPersonTargetedID": "foo!bar!xyz", - "norEduPersonNIN": "19800101134"} + ava = { + "givenName": ["Derek"], + "sn": ["Jeter"], + "mail": ["derek@nyy.mlb.com"], + "c": ["USA"], + "eduPersonTargetedID": "foo!bar!xyz", + "norEduPersonNIN": "19800101134" + } - ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp", mds) - - assert _eq(list(ava.keys()), - ['eduPersonTargetedID', "givenName", "c", "mail", - "sn"]) + ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp") + assert _eq( + list(ava.keys()), ['eduPersonTargetedID', "givenName", "c", "mail", "sn"] + ) def test_filter_ava5(): + mds = MetadataStore(ATTRCONV, sec_config, + disable_ssl_certificate_validation=True) + mds.imp([{"class": "saml2.mdstore.MetaDataFile", + "metadata": [(full_path("entity_cat_re.xml"),)]}]) + policy = Policy({ "default": { "lifetime": {"minutes": 15}, # "attribute_restrictions": None # means all I have "entity_categories": ["swamid"] } - }) + }, mds) - mds = MetadataStore(ATTRCONV, sec_config, - disable_ssl_certificate_validation=True) - mds.imp([{"class": "saml2.mdstore.MetaDataFile", - "metadata": [(full_path("entity_cat_re.xml"),)]}]) - - ava = {"givenName": ["Derek"], "sn": ["Jeter"], - "mail": ["derek@nyy.mlb.com"], "c": ["USA"], - "eduPersonTargetedID": "foo!bar!xyz", - "norEduPersonNIN": "19800101134"} + ava = { + "givenName": ["Derek"], + "sn": ["Jeter"], + "mail": ["derek@nyy.mlb.com"], + "c": ["USA"], + "eduPersonTargetedID": "foo!bar!xyz", + "norEduPersonNIN": "19800101134" + } - ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp", mds) + ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp") assert _eq(list(ava.keys()), ['eduPersonTargetedID']) def test_idp_policy_filter(): with closing(Server("idp_conf_ec")) as idp: - ava = {"givenName": ["Derek"], "sn": ["Jeter"], - "mail": ["derek@nyy.mlb.com"], "c": ["USA"], - "eduPersonTargetedID": "foo!bar!xyz", - "norEduPersonNIN": "19800101134"} + ava = { + "givenName": ["Derek"], + "sn": ["Jeter"], + "mail": ["derek@nyy.mlb.com"], + "c": ["USA"], + "eduPersonTargetedID": "foo!bar!xyz", + "norEduPersonNIN": "19800101134" + } policy = idp.config.getattr("policy", "idp") - ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp", - idp.metadata) - - print(ava) - assert list(ava.keys()) == [ - "eduPersonTargetedID"] # because no entity category + ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp") + # because no entity category + assert list(ava.keys()) == ["eduPersonTargetedID"] def test_entity_category_import_from_path(): + mds = MetadataStore(ATTRCONV, sec_config, disable_ssl_certificate_validation=True) + # The file entity_cat_rs.xml contains the SAML metadata for an SP + # tagged with the REFEDs R&S entity category. + mds.imp([{"class": "saml2.mdstore.MetaDataFile", + "metadata": [(full_path("entity_cat_rs.xml"),)]}]) + # The entity category module myentitycategory.py is in the tests # directory which is on the standard module search path. # The module uses a custom interpretation of the REFEDs R&S entity category @@ -162,34 +196,35 @@ def test_entity_category_import_from_path(): "lifetime": {"minutes": 15}, "entity_categories": ["myentitycategory"] } - }) - - mds = MetadataStore(ATTRCONV, sec_config, - disable_ssl_certificate_validation=True) - - # The file entity_cat_rs.xml contains the SAML metadata for an SP - # tagged with the REFEDs R&S entity category. - mds.imp([{"class": "saml2.mdstore.MetaDataFile", - "metadata": [(full_path("entity_cat_rs.xml"),)]}]) - - ava = {"givenName": ["Derek"], "sn": ["Jeter"], - "displayName": "Derek Jeter", - "mail": ["derek@nyy.mlb.com"], "c": ["USA"], - "eduPersonTargetedID": "foo!bar!xyz", - "eduPersonUniqueId": "R13ET7UD68K0HGR153KE@my.org", - "eduPersonScopedAffiliation": "member@my.org", - "eduPersonPrincipalName": "user01@my.org", - "norEduPersonNIN": "19800101134"} - - ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp", mds) + }, mds) + + ava = { + "givenName": ["Derek"], + "sn": ["Jeter"], + "displayName": "Derek Jeter", + "mail": ["derek@nyy.mlb.com"], + "c": ["USA"], + "eduPersonTargetedID": "foo!bar!xyz", + "eduPersonUniqueId": "R13ET7UD68K0HGR153KE@my.org", + "eduPersonScopedAffiliation": "member@my.org", + "eduPersonPrincipalName": "user01@my.org", + "norEduPersonNIN": "19800101134" + } + + ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp") # We expect c and norEduPersonNIN to be filtered out since they are not # part of the custom entity category. - assert _eq(list(ava.keys()), - ["eduPersonTargetedID", "eduPersonPrincipalName", - "eduPersonUniqueId", "displayName", "givenName", - "eduPersonScopedAffiliation", "mail", "sn"]) - - -if __name__ == "__main__": - test_filter_ava3() + assert _eq( + list(ava.keys()), + [ + "eduPersonTargetedID", + "eduPersonPrincipalName", + "eduPersonUniqueId", + "displayName", + "givenName", + "eduPersonScopedAffiliation", + "mail", + "sn" + ] + ) |