summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvan Kanakarakis <ivan.kanak@gmail.com>2020-10-30 17:43:40 +0200
committerGitHub <noreply@github.com>2020-10-30 17:43:40 +0200
commit59d6fa5df06989525d2d7e5b8762bbfa3485ab42 (patch)
tree3ccc40a56e5ace562257d70002b7f88dab26b902
parent9dae13cd1f0af7c1aec531da6bc9c87c6c71eaaa (diff)
parent264101909354707f613c3ecc2d8d0ad0ddb7fa77 (diff)
downloadpysaml2-59d6fa5df06989525d2d7e5b8762bbfa3485ab42.tar.gz
Merge pull request #729 from johanlundberg/feature_registry_authority_policy
Specify policy configurations based on the registration authority This changeset is backwards compatible, but to get the new features (restrictions based on the registration authority) one needs to properly upgrade, by initializing the Policy object with a metadata store. Usage that involves loading the configuration through the `saml2.config.Config` object get this automatically (this includes, the `saml2.server.Server` (IdP) object and the `saml2.client_base.Base` and `saml2.client.Saml2Client` (SP) objects.)
-rw-r--r--docs/howto/config.rst50
-rw-r--r--src/saml2/assertion.py445
-rw-r--r--src/saml2/config.py129
-rw-r--r--src/saml2/entity_category/swamid.py2
-rw-r--r--src/saml2/mdstore.py44
-rw-r--r--src/saml2/metadata.py2
-rw-r--r--src/saml2/server.py10
-rw-r--r--tests/otest_61_makemeta.py44
-rw-r--r--tests/test_20_assertion.py134
-rw-r--r--tests/test_30_mdstore.py15
-rw-r--r--tests/test_37_entity_categories.py215
-rw-r--r--tests/test_39_metadata.py6
-rw-r--r--tests/test_83_md_extensions.py6
-rwxr-xr-xtools/make_metadata.py2
14 files changed, 633 insertions, 471 deletions
diff --git a/docs/howto/config.rst b/docs/howto/config.rst
index e1c2025f..88b0f6fa 100644
--- a/docs/howto/config.rst
+++ b/docs/howto/config.rst
@@ -517,28 +517,47 @@ policy
""""""
If the server is an IdP and/or an AA, then there might be reasons to do things
-differently depending on who is asking; this is where that is specified.
-The keys are 'default' and SP entity identifiers. Default is used whenever
-there is no entry for a specific SP. The reasoning is also that if there is
-no default and only SP entity identifiers as keys, then the server will only
-accept connections from the specified SPs.
+differently depending on who is asking (which is the requesting service); the
+policy is where this behaviour is specified.
+
+The keys are SP entity identifiers, Registration Authority names, or 'default'.
+First, the policy for the requesting service is looked up using the SP entityID.
+If no such policy is found, and if the SP metadata includes a Registration
+Authority then a policy for the registration authority is looked up using the
+Registration Authority name. If no policy is found, then the 'default' is looked
+up. If there is no default and only SP entity identifiers as keys, then the
+server will only accept connections from the specified SPs.
+
An example might be::
"service": {
"idp": {
"policy": {
- "default": {
- "lifetime": {"minutes":15},
- "attribute_restrictions": None, # means all I have
- "name_form": "urn:oasis:names:tc:SAML:2.0:attrname-format:uri"
- },
+ # a policy for a service
"urn:mace:example.com:saml:roland:sp": {
"lifetime": {"minutes": 5},
"attribute_restrictions": {
"givenName": None,
"surName": None,
- }
- }
+ },
+ },
+
+ # a policy for a registration authority
+ "http://www.swamid.se/": {
+ "attribute_restrictions": {
+ "givenName": None,
+ },
+ },
+
+ # the policy for all other services
+ "default": {
+ "lifetime": {"minutes":15},
+ "attribute_restrictions": None, # means all I have
+ "name_form": "urn:oasis:names:tc:SAML:2.0:attrname-format:uri",
+ "entity_categories": [
+ "edugain",
+ ],
+ },
}
}
}
@@ -561,6 +580,13 @@ An example might be::
Using this information, the attribute name in the data source will be mapped to
the friendly name, and the saml attribute name will be taken from the uri/oid
defined in the attribute map.
+*nameid_format*
+ Which nameid format that should be used. Defaults to
+ `urn:oasis:names:tc:SAML:2.0:nameid-format:transient`.
+*entity_categories*
+ Entity categories to apply.
+*sign*
+ Possible choices: "response", "assertion", "on_demand"
If restrictions on values are deemed necessary, those are represented by
regular expressions.::
diff --git a/src/saml2/assertion.py b/src/saml2/assertion.py
index b47f14b9..3728c7b8 100644
--- a/src/saml2/assertion.py
+++ b/src/saml2/assertion.py
@@ -5,6 +5,7 @@ import importlib
import logging
import re
import six
+from warnings import warn as _warn
from saml2 import saml
from saml2 import xmlenc
@@ -18,6 +19,7 @@ from saml2.saml import NAME_FORMAT_URI
from saml2.time_util import instant
from saml2.time_util import in_a_while
+
logger = logging.getLogger(__name__)
@@ -276,154 +278,114 @@ def restriction_from_attribute_spec(attributes):
return restr
-def post_entity_categories(maps, **kwargs):
- restrictions = {}
- try:
- required = [d['friendly_name'].lower() for d in kwargs['required']]
- except (KeyError, TypeError):
- required = []
+def compile(restrictions):
+ """ This is only for IdPs or AAs, and it's about limiting what
+ is returned to the SP.
+ In the configuration file, restrictions on which values that
+ can be returned are specified with the help of regular expressions.
+ This function goes through and pre-compiles the regular expressions.
- if kwargs["mds"]:
- if "sp_entity_id" in kwargs:
- ecs = kwargs["mds"].entity_categories(kwargs["sp_entity_id"])
- for ec_map in maps:
- for key, (atlist, only_required) in ec_map.items():
- if key == "": # always released
- attrs = atlist
- elif isinstance(key, tuple):
- if only_required:
- attrs = [a for a in atlist if a in required]
- else:
- attrs = atlist
- for _key in key:
- if _key not in ecs:
- attrs = []
- break
- elif key in ecs:
- if only_required:
- attrs = [a for a in atlist if a in required]
- else:
- attrs = atlist
- else:
- attrs = []
+ :param restrictions: policy configuration
+ :return: The assertion with the string specification replaced with
+ a compiled regular expression.
+ """
+ for who, spec in restrictions.items():
+ spec = spec or {}
- for attr in attrs:
- restrictions[attr] = None
- else:
- for ec_map in maps:
- for attr in ec_map[""]:
- restrictions[attr] = None
+ entity_categories = spec.get("entity_categories", [])
+ ecs = []
+ for cat in entity_categories:
+ try:
+ _mod = importlib.import_module(cat)
+ except ImportError:
+ _mod = importlib.import_module("saml2.entity_category.%s" % cat)
+
+ _ec = {}
+ for key, items in _mod.RELEASE.items():
+ alist = [k.lower() for k in items]
+ _only_required = getattr(_mod, "ONLY_REQUIRED", {}).get(key, False)
+ _ec[key] = (alist, _only_required)
+ ecs.append(_ec)
+ spec["entity_categories"] = ecs or None
+
+ attribute_restrictions = spec.get("attribute_restrictions") or {}
+ _attribute_restrictions = {}
+ for key, values in attribute_restrictions.items():
+ lkey = key.lower()
+ values = [] if not values else values
+ _attribute_restrictions[lkey] = (
+ [re.compile(value) for value in values] or None
+ )
+ spec["attribute_restrictions"] = _attribute_restrictions or None
return restrictions
class Policy(object):
- """ handles restrictions on assertions """
+ """Handles restrictions on assertions."""
- def __init__(self, restrictions=None):
- if restrictions:
- self.compile(restrictions)
- else:
- self._restrictions = None
+ def __init__(self, restrictions=None, mds=None):
+ self.metadata_store = mds
+ self._restrictions = self.setup_restrictions(restrictions)
+ logger.debug("policy restrictions: %s", self._restrictions)
self.acs = []
- def compile(self, restrictions):
- """ This is only for IdPs or AAs, and it's about limiting what
- is returned to the SP.
- In the configuration file, restrictions on which values that
- can be returned are specified with the help of regular expressions.
- This function goes through and pre-compiles the regular expressions.
-
- :param restrictions:
- :return: The assertion with the string specification replaced with
- a compiled regular expression.
- """
-
- self._restrictions = copy.deepcopy(restrictions)
-
- for who, spec in self._restrictions.items():
- if spec is None:
- continue
- try:
- items = spec["entity_categories"]
- except KeyError:
- pass
- else:
- ecs = []
- for cat in items:
- try:
- _mod = importlib.import_module(cat)
- except ImportError:
- _mod = importlib.import_module(
- "saml2.entity_category.%s" % cat)
- _ec = {}
- for key, items in _mod.RELEASE.items():
- alist = [k.lower() for k in items]
- try:
- _only_required = _mod.ONLY_REQUIRED[key]
- except (AttributeError, KeyError):
- _only_required = False
- _ec[key] = (alist, _only_required)
- ecs.append(_ec)
- spec["entity_categories"] = ecs
- try:
- restr = spec["attribute_restrictions"]
- except KeyError:
- continue
-
- if restr is None:
- continue
-
- _are = {}
- for key, values in restr.items():
- if not values:
- _are[key.lower()] = None
- continue
-
- _are[key.lower()] = [re.compile(value) for value in values]
- spec["attribute_restrictions"] = _are
- logger.debug("policy restrictions: %s", self._restrictions)
+ def setup_restrictions(self, restrictions=None):
+ if restrictions is None:
+ return None
- return self._restrictions
+ restrictions = copy.deepcopy(restrictions)
+ restrictions = compile(restrictions)
+ return restrictions
- def get(self, attribute, sp_entity_id, default=None, post_func=None,
- **kwargs):
+ def get(self, attribute, sp_entity_id, default=None):
"""
:param attribute:
:param sp_entity_id:
:param default:
- :param post_func:
:return:
"""
if not self._restrictions:
return default
- try:
- try:
- val = self._restrictions[sp_entity_id][attribute]
- except KeyError:
- try:
- val = self._restrictions["default"][attribute]
- except KeyError:
- val = None
- except KeyError:
- val = None
-
- if val is None:
- return default
- elif post_func:
- return post_func(val, sp_entity_id=sp_entity_id, **kwargs)
- else:
- return val
+ ra_info = (
+ self.metadata_store.registration_info(sp_entity_id) or {}
+ if self.metadata_store is not None
+ else {}
+ )
+ ra_entity_id = ra_info.get("registration_authority")
+
+ sp_restrictions = self._restrictions.get(sp_entity_id)
+ ra_restrictions = self._restrictions.get(ra_entity_id)
+ default_restrictions = (
+ self._restrictions.get("default")
+ or self._restrictions.get("")
+ )
+ restrictions = (
+ sp_restrictions
+ if sp_restrictions is not None
+ else ra_restrictions
+ if ra_restrictions is not None
+ else default_restrictions
+ if default_restrictions is not None
+ else {}
+ )
+
+ attribute_restriction = restrictions.get(attribute)
+ restriction = (
+ attribute_restriction
+ if attribute_restriction is not None
+ else default
+ )
+ return restriction
def get_nameid_format(self, sp_entity_id):
""" Get the NameIDFormat to used for the entity id
:param: The SP entity ID
:retur: The format
"""
- return self.get("nameid_format", sp_entity_id,
- saml.NAMEID_FORMAT_TRANSIENT)
+ return self.get("nameid_format", sp_entity_id, saml.NAMEID_FORMAT_TRANSIENT)
def get_name_form(self, sp_entity_id):
""" Get the NameFormat to used for the entity id
@@ -431,7 +393,7 @@ class Policy(object):
:retur: The format
"""
- return self.get("name_form", sp_entity_id, NAME_FORMAT_URI)
+ return self.get("name_form", sp_entity_id, default=NAME_FORMAT_URI)
def get_lifetime(self, sp_entity_id):
""" The lifetime of the assertion
@@ -458,32 +420,80 @@ class Policy(object):
:return: The restrictions
"""
- return self.get("fail_on_missing_requested", sp_entity_id, True)
+ return self.get("fail_on_missing_requested", sp_entity_id, default=True)
- def entity_category_attributes(self, ec):
- if not self._restrictions:
- return None
+ def get_sign(self, sp_entity_id):
+ """
+ Possible choices
+ "sign": ["response", "assertion", "on_demand"]
- ec_maps = self._restrictions["default"]["entity_categories"]
- for ec_map in ec_maps:
- try:
- return ec_map[ec]
- except KeyError:
- pass
- return []
+ :param sp_entity_id:
+ :return:
+ """
+
+ return self.get("sign", sp_entity_id, default=[])
- def get_entity_categories(self, sp_entity_id, mds, required):
+ def get_entity_categories(self, sp_entity_id, mds=None, required=None):
"""
:param sp_entity_id:
- :param mds: MetadataStore instance
+ :param required: required attributes
:return: A dictionary with restrictions
"""
- kwargs = {"mds": mds, 'required': required}
+ if mds is not None:
+ warn_msg = (
+ "The mds parameter for saml2.assertion.Policy.get_entity_categories "
+ "is deprecated; "
+ "instead, initialize the Policy object setting the mds param."
+ )
+ logger.warning(warn_msg)
+ _warn(warn_msg, DeprecationWarning)
+
+ def post_entity_categories(maps, sp_entity_id=None, mds=None, required=None):
+ restrictions = {}
+ required = [d['friendly_name'].lower() for d in (required or [])]
+
+ if mds:
+ ecs = mds.entity_categories(sp_entity_id)
+ for ec_map in maps:
+ for key, (atlist, only_required) in ec_map.items():
+ if key == "": # always released
+ attrs = atlist
+ elif isinstance(key, tuple):
+ if only_required:
+ attrs = [a for a in atlist if a in required]
+ else:
+ attrs = atlist
+ for _key in key:
+ if _key not in ecs:
+ attrs = []
+ break
+ elif key in ecs:
+ if only_required:
+ attrs = [a for a in atlist if a in required]
+ else:
+ attrs = atlist
+ else:
+ attrs = []
+
+ for attr in attrs:
+ restrictions[attr] = None
- return self.get("entity_categories", sp_entity_id, default={},
- post_func=post_entity_categories, **kwargs)
+ return restrictions
+
+ sentinel = object()
+ result1 = self.get("entity_categories", sp_entity_id, default=sentinel)
+ if result1 is sentinel:
+ return {}
+
+ result2 = post_entity_categories(
+ result1,
+ sp_entity_id=sp_entity_id,
+ mds=(mds or self.metadata_store),
+ required=required,
+ )
+ return result2
def not_on_or_after(self, sp_entity_id):
""" When the assertion stops being valid, should not be
@@ -495,63 +505,84 @@ class Policy(object):
return in_a_while(**self.get_lifetime(sp_entity_id))
- def filter(self, ava, sp_entity_id, mdstore, required=None, optional=None):
+ def filter(self, ava, sp_entity_id, mdstore=None, required=None, optional=None):
""" What attribute and attribute values returns depends on what
- the SP has said it wants in the request or in the metadata file and
- what the IdP/AA wants to release. An assumption is that what the SP
+ the SP or the registration authority has said it wants in the request
+ or in the metadata file and what the IdP/AA wants to release.
+ An assumption is that what the SP or the registration authority
asks for overrides whatever is in the metadata. But of course the
IdP never releases anything it doesn't want to.
:param ava: The information about the subject as a dictionary
:param sp_entity_id: The entity ID of the SP
- :param mdstore: A Metadata store
:param required: Attributes that the SP requires in the assertion
:param optional: Attributes that the SP regards as optional
:return: A possibly modified AVA
"""
- _ava = None
-
- if not self.acs: # acs MUST have a value, fall back to default.
+ if mdstore is not None:
+ warn_msg = (
+ "The mdstore parameter for saml2.assertion.Policy.filter "
+ "is deprecated; "
+ "instead, initialize the Policy object setting the mds param."
+ )
+ logger.warning(warn_msg)
+ _warn(warn_msg, DeprecationWarning)
+
+ # acs MUST have a value, fall back to default.
+ if not self.acs:
self.acs = ac_factory()
- _rest = self.get_entity_categories(sp_entity_id, mdstore, required)
- if _rest:
- _ava = filter_attribute_value_assertions(ava.copy(), _rest)
+ subject_ava = ava.copy()
+
+ # entity category restrictions
+ _ent_rest = self.get_entity_categories(sp_entity_id, mds=mdstore, required=required)
+ if _ent_rest:
+ subject_ava = filter_attribute_value_assertions(subject_ava, _ent_rest)
elif required or optional:
logger.debug("required: %s, optional: %s", required, optional)
- _ava = filter_on_attributes(
- ava.copy(), required, optional, self.acs,
- self.get_fail_on_missing_requested(sp_entity_id))
-
- _rest = self.get_attribute_restrictions(sp_entity_id)
- if _rest:
- if _ava is None:
- _ava = ava.copy()
- _ava = filter_attribute_value_assertions(_ava, _rest)
- elif _ava is None:
- _ava = ava.copy()
-
- if _ava is None:
- return {}
- else:
- return _ava
+ subject_ava = filter_on_attributes(
+ subject_ava,
+ required,
+ optional,
+ self.acs,
+ self.get_fail_on_missing_requested(sp_entity_id),
+ )
+
+ # attribute restrictions
+ _attr_rest = self.get_attribute_restrictions(sp_entity_id)
+ subject_ava = filter_attribute_value_assertions(subject_ava, _attr_rest)
+
+ return subject_ava or {}
def restrict(self, ava, sp_entity_id, metadata=None):
- """ Identity attribute names are expected to be expressed in
- the local lingo (== friendlyName)
+ """ Identity attribute names are expected to be expressed as FriendlyNames
:return: A filtered ava according to the IdPs/AAs rules and
the list of required/optional attributes according to the SP.
If the requirements can't be met an exception is raised.
"""
- if metadata:
- spec = metadata.attribute_requirement(sp_entity_id)
- if spec:
- return self.filter(ava, sp_entity_id, metadata,
- spec["required"], spec["optional"])
-
- return self.filter(ava, sp_entity_id, metadata, [], [])
+ if metadata is not None:
+ warn_msg = (
+ "The metadata parameter for saml2.assertion.Policy.restrict "
+ "is deprecated and ignored; "
+ "instead, initialize the Policy object setting the mds param."
+ )
+ logger.warning(warn_msg)
+ _warn(warn_msg, DeprecationWarning)
+
+ metadata_store = metadata or self.metadata_store
+ spec = (
+ metadata_store.attribute_requirement(sp_entity_id) or {}
+ if metadata_store
+ else {}
+ )
+ return self.filter(
+ ava,
+ sp_entity_id,
+ required=spec.get("required"),
+ optional=spec.get("optional"),
+ )
def conditions(self, sp_entity_id):
""" Return a saml.Condition instance
@@ -559,25 +590,18 @@ class Policy(object):
:param sp_entity_id: The SP entity ID
:return: A saml.Condition instance
"""
- return factory(saml.Conditions,
- not_before=instant(),
- # How long might depend on who's getting it
- not_on_or_after=self.not_on_or_after(sp_entity_id),
- audience_restriction=[factory(
- saml.AudienceRestriction,
- audience=[factory(saml.Audience,
- text=sp_entity_id)])])
-
- def get_sign(self, sp_entity_id):
- """
- Possible choices
- "sign": ["response", "assertion", "on_demand"]
-
- :param sp_entity_id:
- :return:
- """
-
- return self.get("sign", sp_entity_id, [])
+ return factory(
+ saml.Conditions,
+ not_before=instant(),
+ # How long might depend on who's getting it
+ not_on_or_after=self.not_on_or_after(sp_entity_id),
+ audience_restriction=[
+ factory(
+ saml.AudienceRestriction,
+ audience=[factory(saml.Audience, text=sp_entity_id)],
+ ),
+ ],
+ )
class EntityCategories(object):
@@ -685,11 +709,10 @@ def authn_statement(authn_class=None, authn_auth=None,
return res
-def do_subject_confirmation(policy, sp_entity_id, key_info=None, **treeargs):
+def do_subject_confirmation(not_on_or_after, key_info=None, **treeargs):
"""
- :param policy: Policy instance
- :param sp_entity_id: The entityid of the SP
+ :param not_on_or_after: not_on_or_after policy
:param subject_confirmation_method: How was the subject confirmed
:param address: The network address/location from which an attesting entity
can present the assertion.
@@ -706,7 +729,7 @@ def do_subject_confirmation(policy, sp_entity_id, key_info=None, **treeargs):
_sc = factory(saml.SubjectConfirmation, **treeargs)
_scd = _sc.subject_confirmation_data
- _scd.not_on_or_after = policy.not_on_or_after(sp_entity_id)
+ _scd.not_on_or_after = not_on_or_after
if _sc.method == saml.SCM_HOLDER_OF_KEY:
_scd.add_extension_element(key_info)
@@ -714,15 +737,13 @@ def do_subject_confirmation(policy, sp_entity_id, key_info=None, **treeargs):
return _sc
-def do_subject(policy, sp_entity_id, name_id, **farg):
- #
+def do_subject(not_on_or_after, name_id, **farg):
specs = farg['subject_confirmation']
if isinstance(specs, list):
- res = [do_subject_confirmation(policy, sp_entity_id, **s) for s in
- specs]
+ res = [do_subject_confirmation(not_on_or_after, **s) for s in specs]
else:
- res = [do_subject_confirmation(policy, sp_entity_id, **specs)]
+ res = [do_subject_confirmation(not_on_or_after, **specs)]
return factory(saml.Subject, name_id=name_id, subject_confirmation=res)
@@ -762,13 +783,11 @@ class Assertion(dict):
:return: An Assertion instance
"""
- if policy:
- _name_format = policy.get_name_form(sp_entity_id)
- else:
- _name_format = NAME_FORMAT_URI
+ _name_format = policy.get_name_form(sp_entity_id)
- attr_statement = saml.AttributeStatement(attribute=from_local(
- attrconvs, self, _name_format))
+ attr_statement = saml.AttributeStatement(
+ attribute=from_local(attrconvs, self, _name_format)
+ )
if encrypt == "attributes":
for attr in attr_statement.attribute:
@@ -794,11 +813,10 @@ class Assertion(dict):
else:
_authn_statement = None
- subject = do_subject(policy, sp_entity_id, name_id,
- **farg['subject'])
-
- _ass = assertion_factory(issuer=issuer, conditions=conds,
- subject=subject)
+ subject = do_subject(
+ policy.not_on_or_after(sp_entity_id), name_id, **farg['subject']
+ )
+ _ass = assertion_factory(issuer=issuer, conditions=conds, subject=subject)
if _authn_statement:
_ass.authn_statement = [_authn_statement]
@@ -808,17 +826,16 @@ class Assertion(dict):
return _ass
- def apply_policy(self, sp_entity_id, policy, metadata=None):
+ def apply_policy(self, sp_entity_id, policy):
""" Apply policy to the assertion I'm representing
:param sp_entity_id: The SP entity ID
:param policy: The policy
- :param metadata: Metadata to use
:return: The resulting AVA after the policy is applied
"""
policy.acs = self.acs
- ava = policy.restrict(self, sp_entity_id, metadata)
+ ava = policy.restrict(self, sp_entity_id)
for key, val in list(self.items()):
if key in ava:
diff --git a/src/saml2/config.py b/src/saml2/config.py
index 25218788..f28d7748 100644
--- a/src/saml2/config.py
+++ b/src/saml2/config.py
@@ -1,5 +1,3 @@
-#!/usr/bin/env python
-
import copy
import importlib
import logging
@@ -74,6 +72,10 @@ COMMON_ARGS = [
"allow_unknown_attributes",
"crypto_backend",
"delete_tmpfiles",
+ "endpoints",
+ "metadata",
+ "ui_info",
+ "name_id_format",
]
SP_ARGS = [
@@ -87,12 +89,9 @@ SP_ARGS = [
"want_assertions_or_response_signed",
"authn_requests_signed",
"name_form",
- "endpoints",
- "ui_info",
"discovery_response",
"allow_unsolicited",
"ecp",
- "name_id_format",
"name_id_policy_format",
"name_id_format_allow_create",
"logout_requests_signed",
@@ -117,10 +116,6 @@ AA_IDP_ARGS = [
"subject_data",
"sp",
"scope",
- "endpoints",
- "metadata",
- "ui_info",
- "name_id_format",
"domain",
"name_qualifier",
"edu_person_targeted_id",
@@ -133,22 +128,20 @@ AQ_ARGS = ["endpoints"]
AA_ARGS = ["attribute", "attribute_profile"]
COMPLEX_ARGS = ["attribute_converters", "metadata", "policy"]
-ALL = set(COMMON_ARGS + SP_ARGS + AA_IDP_ARGS + PDP_ARGS + COMPLEX_ARGS +
- AA_ARGS)
+ALL = set(COMMON_ARGS + SP_ARGS + AA_IDP_ARGS + PDP_ARGS + COMPLEX_ARGS + AA_ARGS)
SPEC = {
- "": COMMON_ARGS + COMPLEX_ARGS,
- "sp": COMMON_ARGS + COMPLEX_ARGS + SP_ARGS,
+ "": COMMON_ARGS + COMPLEX_ARGS,
+ "sp": COMMON_ARGS + COMPLEX_ARGS + SP_ARGS,
"idp": COMMON_ARGS + COMPLEX_ARGS + AA_IDP_ARGS,
- "aa": COMMON_ARGS + COMPLEX_ARGS + AA_IDP_ARGS + AA_ARGS,
+ "aa": COMMON_ARGS + COMPLEX_ARGS + AA_IDP_ARGS + AA_ARGS,
"pdp": COMMON_ARGS + COMPLEX_ARGS + PDP_ARGS,
- "aq": COMMON_ARGS + COMPLEX_ARGS + AQ_ARGS,
+ "aq": COMMON_ARGS + COMPLEX_ARGS + AQ_ARGS,
}
_RPA = [BINDING_HTTP_REDIRECT, BINDING_HTTP_POST, BINDING_HTTP_ARTIFACT]
_PRA = [BINDING_HTTP_POST, BINDING_HTTP_REDIRECT, BINDING_HTTP_ARTIFACT]
-_SRPA = [BINDING_SOAP, BINDING_HTTP_REDIRECT, BINDING_HTTP_POST,
- BINDING_HTTP_ARTIFACT]
+_SRPA = [BINDING_SOAP, BINDING_HTTP_REDIRECT, BINDING_HTTP_POST, BINDING_HTTP_ARTIFACT]
PREFERRED_BINDING = {
"single_logout_service": _SRPA,
@@ -169,9 +162,6 @@ class ConfigurationError(SAMLError):
pass
-# -----------------------------------------------------------------
-
-
class Config(object):
def_context = ""
@@ -251,7 +241,7 @@ class Config(object):
else:
return getattr(self, "_%s_%s" % (context, attr), None)
- def load_special(self, cnf, typ, metadata_construction=False):
+ def load_special(self, cnf, typ):
for arg in SPEC[typ]:
try:
_val = cnf[arg]
@@ -265,71 +255,39 @@ class Config(object):
self.setattr(typ, arg, _val)
self.context = typ
- self.load_complex(cnf, typ, metadata_construction=metadata_construction)
self.context = self.def_context
- def load_complex(self, cnf, typ="", metadata_construction=False):
- try:
- self.setattr(typ, "policy", Policy(cnf["policy"]))
- except KeyError:
- pass
-
- # for srv, spec in cnf["service"].items():
- # try:
- # self.setattr(srv, "policy",
- # Policy(cnf["service"][srv]["policy"]))
- # except KeyError:
- # pass
+ def load_complex(self, cnf):
+ acs = ac_factory(cnf.get("attribute_map_dir"))
+ if not acs:
+ raise ConfigurationError("No attribute converters, something is wrong!!")
+ self.setattr("", "attribute_converters", acs)
try:
- try:
- acs = ac_factory(cnf["attribute_map_dir"])
- except KeyError:
- acs = ac_factory()
-
- if not acs:
- raise ConfigurationError(
- "No attribute converters, something is wrong!!")
-
- _acs = self.getattr("attribute_converters", typ)
- if _acs:
- _acs.extend(acs)
- else:
- self.setattr(typ, "attribute_converters", acs)
-
+ self.setattr("", "metadata", self.load_metadata(cnf["metadata"]))
except KeyError:
pass
- if not metadata_construction:
- try:
- self.setattr(typ, "metadata",
- self.load_metadata(cnf["metadata"]))
- except KeyError:
- pass
-
- def unicode_convert(self, item):
- try:
- return six.text_type(item, "utf-8")
- except TypeError:
- _uc = self.unicode_convert
- if isinstance(item, dict):
- return dict([(key, _uc(val)) for key, val in item.items()])
- elif isinstance(item, list):
- return [_uc(v) for v in item]
- elif isinstance(item, tuple):
- return tuple([_uc(v) for v in item])
- else:
- return item
+ for srv, spec in cnf.get("service", {}).items():
+ policy_conf = spec.get("policy")
+ self.setattr(srv, "policy", Policy(policy_conf, self.metadata))
- def load(self, cnf, metadata_construction=False):
+ def load(self, cnf, metadata_construction=None):
""" The base load method, loads the configuration
:param cnf: The configuration as a dictionary
- :param metadata_construction: Is this only to be able to construct
- metadata. If so some things can be left out.
:return: The Configuration instance
"""
- _uc = self.unicode_convert
+
+ if metadata_construction is not None:
+ warn_msg = (
+ "The metadata_construction parameter for saml2.config.Config.load "
+ "is deprecated and ignored; "
+ "instead, initialize the Policy object setting the mds param."
+ )
+ logger.warning(warn_msg)
+ _warn(warn_msg, DeprecationWarning)
+
for arg in COMMON_ARGS:
if arg == "virtual_organization":
if "virtual_organization" in cnf:
@@ -344,7 +302,7 @@ class Config(object):
self.extension_schema[_mod.NAMESPACE] = _mod
try:
- setattr(self, arg, _uc(cnf[arg]))
+ setattr(self, arg, cnf[arg])
except KeyError:
pass
except TypeError: # Something that can't be a string
@@ -364,9 +322,7 @@ class Config(object):
if "service" in cnf:
for typ in ["aa", "idp", "sp", "pdp", "aq"]:
try:
- self.load_special(
- cnf["service"][typ], typ,
- metadata_construction=metadata_construction)
+ self.load_special(cnf["service"][typ], typ)
self.serves.append(typ)
except KeyError:
pass
@@ -374,7 +330,7 @@ class Config(object):
if "extensions" in cnf:
self.do_extensions(cnf["extensions"])
- self.load_complex(cnf, metadata_construction=metadata_construction)
+ self.load_complex(cnf)
self.context = self.def_context
return self
@@ -389,21 +345,28 @@ class Config(object):
return importlib.import_module(tail)
- def load_file(self, config_filename, metadata_construction=False):
+ def load_file(self, config_filename, metadata_construction=None):
+ if metadata_construction is not None:
+ warn_msg = (
+ "The metadata_construction parameter for saml2.config.Config.load_file "
+ "is deprecated and ignored; "
+ "instead, initialize the Policy object setting the mds param."
+ )
+ logger.warning(warn_msg)
+ _warn(warn_msg, DeprecationWarning)
+
if config_filename.endswith(".py"):
config_filename = config_filename[:-3]
mod = self._load(config_filename)
- return self.load(copy.deepcopy(mod.CONFIG), metadata_construction)
+ return self.load(copy.deepcopy(mod.CONFIG))
def load_metadata(self, metadata_conf):
""" Loads metadata into an internal structure """
acs = self.attribute_converters
-
if acs is None:
- raise ConfigurationError(
- "Missing attribute converter specification")
+ raise ConfigurationError("Missing attribute converter specification")
try:
ca_certs = self.ca_certs
diff --git a/src/saml2/entity_category/swamid.py b/src/saml2/entity_category/swamid.py
index 29be0cb5..a997d556 100644
--- a/src/saml2/entity_category/swamid.py
+++ b/src/saml2/entity_category/swamid.py
@@ -79,3 +79,5 @@ RELEASE = {
RESEARCH_AND_SCHOLARSHIP: R_AND_S,
COCO: GEANT_COCO,
}
+
+ONLY_REQUIRED = {COCO: True}
diff --git a/src/saml2/mdstore.py b/src/saml2/mdstore.py
index c0eb1686..76a981a8 100644
--- a/src/saml2/mdstore.py
+++ b/src/saml2/mdstore.py
@@ -56,6 +56,9 @@ from saml2.extension.mdui import Description
from saml2.extension.mdui import InformationURL
from saml2.extension.mdui import PrivacyStatementURL
from saml2.extension.mdui import Logo
+from saml2.extension.mdrpi import NAMESPACE as NS_MDRPI
+from saml2.extension.mdrpi import RegistrationInfo
+from saml2.extension.mdrpi import RegistrationPolicy
logger = logging.getLogger(__name__)
@@ -79,6 +82,8 @@ classnames = {
"service_artifact_resolution": "{ns}&{tag}".format(ns=NS_MD, tag=ArtifactResolutionService.c_tag),
"service_single_sign_on": "{ns}&{tag}".format(ns=NS_MD, tag=SingleSignOnService.c_tag),
"service_nameid_mapping": "{ns}&{tag}".format(ns=NS_MD, tag=NameIDMappingService.c_tag),
+ "mdrpi_registration_info": "{ns}&{tag}".format(ns=NS_MDRPI, tag=RegistrationInfo.c_tag),
+ "mdrpi_registration_policy": "{ns}&{tag}".format(ns=NS_MDRPI, tag=RegistrationPolicy.c_tag),
}
ENTITY_CATEGORY = "http://macedir.org/entity-category"
@@ -1406,6 +1411,45 @@ class MetadataStore(MetaData):
res['signing_methods'].append(elem['algorithm'])
return res
+ def registration_info(self, entity_id):
+ """
+ Get all registration info for an entry in the metadata.
+
+ Example return data:
+
+ res = {
+ 'registration_authority': 'http://www.example.com',
+ 'registration_instant': '2013-06-15T18:15:03Z',
+ 'registration_policy': {
+ 'en': 'http://www.example.com/policy.html',
+ 'sv': 'http://www.example.com/sv/policy.html',
+ }
+ }
+
+ :param entity_id: Entity id
+ :return: dict with keys and value-lists from metadata
+
+ :type entity_id: string
+ :rtype: dict
+ """
+ res = {
+ 'registration_authority': None,
+ 'registration_instant': None,
+ 'registration_policy': {}
+ }
+ try:
+ ext = self.__getitem__(entity_id)["extensions"]
+ except KeyError:
+ return res
+ for elem in ext["extension_elements"]:
+ if elem["__class__"] == classnames["mdrpi_registration_info"]:
+ res["registration_authority"] = elem["registration_authority"]
+ res["registration_instant"] = elem.get("registration_instant")
+ for policy in elem.get('registration_policy'):
+ if policy["__class__"] == classnames["mdrpi_registration_policy"]:
+ res['registration_policy'][policy["lang"]] = policy["text"]
+ return res
+
def _lookup_elements_by_cls(self, root, cls):
elements = (
element
diff --git a/src/saml2/metadata.py b/src/saml2/metadata.py
index 50b4ff71..e7ab6011 100644
--- a/src/saml2/metadata.py
+++ b/src/saml2/metadata.py
@@ -89,7 +89,7 @@ def create_metadata_string(configfile, config=None, valid=None, cert=None,
if config is None:
if configfile.endswith(".py"):
configfile = configfile[:-3]
- config = Config().load_file(configfile, metadata_construction=True)
+ config = Config().load_file(configfile)
eds.append(entity_descriptor(config))
conf = Config()
diff --git a/src/saml2/server.py b/src/saml2/server.py
index eb6dddea..50250c3a 100644
--- a/src/saml2/server.py
+++ b/src/saml2/server.py
@@ -345,11 +345,11 @@ class Server(Entity):
"""
ast = Assertion(identity)
- ast.acs = self.config.getattr("attribute_converters", "idp")
+ ast.acs = self.config.getattr("attribute_converters")
if policy is None:
- policy = Policy()
+ policy = Policy(mds=self.metadata)
try:
- ast.apply_policy(sp_entity_id, policy, self.metadata)
+ ast.apply_policy(sp_entity_id, policy)
except MissingValue as exc:
if not best_effort:
return self.create_error_response(in_response_to, consumer_url,
@@ -537,9 +537,9 @@ class Server(Entity):
_issuer = self._issuer(issuer)
ast = Assertion(identity)
if policy:
- ast.apply_policy(sp_entity_id, policy, self.metadata)
+ ast.apply_policy(sp_entity_id, policy)
else:
- policy = Policy()
+ policy = Policy(mds=self.metadata)
if attributes:
restr = restriction_from_attribute_spec(attributes)
diff --git a/tests/otest_61_makemeta.py b/tests/otest_61_makemeta.py
index 321d8b38..ba6fb0dc 100644
--- a/tests/otest_61_makemeta.py
+++ b/tests/otest_61_makemeta.py
@@ -116,7 +116,7 @@ def test_org_3():
org = metadata.do_organization_info(desc)
assert _eq(org.keyswv(), ['organization_display_name'])
assert len(org.organization_display_name) == 1
-
+
def test_contact_0():
conf = [{
"given_name":"Roland",
@@ -126,7 +126,7 @@ def test_contact_0():
"contact_type": "technical"
}]
contact_person = metadata.do_contact_person_info(conf)
- assert _eq(contact_person[0].keyswv(), ['given_name', 'sur_name',
+ assert _eq(contact_person[0].keyswv(), ['given_name', 'sur_name',
'contact_type', 'telephone_number',
"email_address"])
print(contact_person[0])
@@ -141,27 +141,27 @@ def test_contact_0():
assert len(person.email_address) == 2
assert isinstance(person.email_address[0], md.EmailAddress)
assert person.email_address[0].text == "foo@eample.com"
-
+
def test_do_endpoints():
eps = metadata.do_endpoints(SP["service"]["sp"]["endpoints"],
metadata.ENDPOINTS["sp"])
print(eps)
- assert _eq(eps.keys(), ["assertion_consumer_service",
+ assert _eq(eps.keys(), ["assertion_consumer_service",
"single_logout_service"])
-
+
assert len(eps["single_logout_service"]) == 1
sls = eps["single_logout_service"][0]
assert sls.location == "http://localhost:8087/logout"
assert sls.binding == BINDING_HTTP_POST
-
+
assert len(eps["assertion_consumer_service"]) == 1
acs = eps["assertion_consumer_service"][0]
assert acs.location == "http://localhost:8087/"
assert acs.binding == BINDING_HTTP_POST
-
+
assert "artifact_resolution_service" not in eps
assert "manage_name_id_service" not in eps
-
+
def test_required_attributes():
attrconverters = ac_factory("../tests/attributemaps")
ras = metadata.do_requested_attribute(
@@ -183,19 +183,19 @@ def test_optional_attributes():
assert ras[0].name == 'urn:oid:2.5.4.12'
assert ras[0].name_format == NAME_FORMAT_URI
assert ras[0].is_required == "false"
-
+
def test_do_sp_sso_descriptor():
- conf = SPConfig().load(SP, metadata_construction=True)
+ conf = SPConfig().load(SP)
spsso = metadata.do_spsso_descriptor(conf)
-
+
assert isinstance(spsso, md.SPSSODescriptor)
- assert _eq(spsso.keyswv(), ['authn_requests_signed',
- 'attribute_consuming_service',
- 'single_logout_service',
- 'protocol_support_enumeration',
- 'assertion_consumer_service',
+ assert _eq(spsso.keyswv(), ['authn_requests_signed',
+ 'attribute_consuming_service',
+ 'single_logout_service',
+ 'protocol_support_enumeration',
+ 'assertion_consumer_service',
'want_assertions_signed'])
-
+
assert spsso.authn_requests_signed == "false"
assert spsso.want_assertions_signed == "true"
assert len (spsso.attribute_consuming_service) == 1
@@ -213,8 +213,8 @@ def test_do_sp_sso_descriptor():
def test_do_sp_sso_descriptor_2():
SP["service"]["sp"]["discovery_response"] = "http://example.com/sp/ds"
-
- conf = SPConfig().load(SP, metadata_construction=True)
+
+ conf = SPConfig().load(SP)
spsso = metadata.do_spsso_descriptor(conf)
assert isinstance(spsso, md.SPSSODescriptor)
@@ -251,12 +251,12 @@ def test_entity_description():
assert entd.entity_id == "urn:mace:example.com:saml:roland:sp"
def test_do_idp_sso_descriptor():
- conf = IdPConfig().load(IDP, metadata_construction=True)
+ conf = IdPConfig().load(IDP)
idpsso = metadata.do_idpsso_descriptor(conf)
assert isinstance(idpsso, md.IDPSSODescriptor)
- assert _eq(idpsso.keyswv(), ['protocol_support_enumeration',
- 'single_sign_on_service',
+ assert _eq(idpsso.keyswv(), ['protocol_support_enumeration',
+ 'single_sign_on_service',
'want_authn_requests_signed',
"extensions"])
exts = idpsso.extensions.extension_elements
diff --git a/tests/test_20_assertion.py b/tests/test_20_assertion.py
index f617e516..da7e70fd 100644
--- a/tests/test_20_assertion.py
+++ b/tests/test_20_assertion.py
@@ -1,8 +1,11 @@
# coding=utf-8
+import copy
+
from saml2.argtree import add_path
from saml2.authn_context import pword
from saml2.mdie import to_dict
-from saml2 import md, assertion, create_class_from_xml_string
+from saml2 import md, assertion, create_class_from_xml_string, config
+from saml2.mdstore import MetadataStore
from saml2.saml import Attribute
from saml2.saml import Issuer
from saml2.saml import NAMEID_FORMAT_ENTITY
@@ -33,6 +36,15 @@ from saml2 import xmlenc
from pathutils import full_path
ONTS = [saml, mdui, mdattr, dri, idpdisc, md, xmldsig, xmlenc]
+ATTRCONV = ac_factory(full_path("attributemaps"))
+sec_config = config.Config()
+
+METADATACONF = {
+ "1": [{
+ "class": "saml2.mdstore.MetaDataFile",
+ "metadata": [(full_path("swamid-2.0.xml"),)],
+ }],
+}
def _eq(l1, l2):
@@ -206,7 +218,7 @@ def test_ava_filter_1():
"surName": "Jeter",
"mail": "derek@example.com"}
- ava = r.filter(ava, "urn:mace:umu.se:saml:roland:sp", None, None)
+ ava = r.filter(ava, "urn:mace:umu.se:saml:roland:sp")
assert _eq(list(ava.keys()), ["givenName", "surName"])
ava = {"givenName": "Derek",
@@ -235,8 +247,7 @@ def test_ava_filter_2():
ava = {"givenName": "Derek", "sn": "Jeter", "mail": "derek@example.com"}
# mail removed because it doesn't match the regular expression
- _ava = policy.filter(ava, 'urn:mace:umu.se:saml:roland:sp', None, [mail],
- [gn, sn])
+ _ava = policy.filter(ava, 'urn:mace:umu.se:saml:roland:sp', required=[mail], optional=[gn, sn])
assert _eq(sorted(list(_ava.keys())), ["givenName", 'sn'])
@@ -244,8 +255,7 @@ def test_ava_filter_2():
# it wasn't there to begin with
try:
- policy.filter(ava, 'urn:mace:umu.se:saml:roland:sp', None,
- [gn, sn, mail])
+ policy.filter(ava, 'urn:mace:umu.se:saml:roland:sp', required=[gn, sn, mail])
except MissingValue:
pass
@@ -275,8 +285,7 @@ def test_ava_filter_dont_fail():
# mail removed because it doesn't match the regular expression
# So it should fail if the 'fail_on_ ...' flag wasn't set
- _ava = policy.filter(ava, 'urn:mace:umu.se:saml:roland:sp', None,
- [mail], [gn, sn])
+ _ava = policy.filter(ava, 'urn:mace:umu.se:saml:roland:sp', required=[mail], optional=[gn, sn])
assert _ava
@@ -284,8 +293,7 @@ def test_ava_filter_dont_fail():
"surName": "Jeter"}
# it wasn't there to begin with
- _ava = policy.filter(ava, 'urn:mace:umu.se:saml:roland:sp',
- None, [gn, sn, mail])
+ _ava = policy.filter(ava, 'urn:mace:umu.se:saml:roland:sp', required=[gn, sn, mail])
assert _ava
@@ -621,7 +629,7 @@ def test_filter_ava_0():
"mail": ["derek@nyy.mlb.com"]}
# No restrictions apply
- ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp", [], [])
+ ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp")
assert _eq(sorted(list(ava.keys())), ["givenName", "mail", "surName"])
assert ava["givenName"] == ["Derek"]
@@ -648,7 +656,7 @@ def test_filter_ava_1():
"mail": ["derek@nyy.mlb.com"]}
# No restrictions apply
- ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp", [], [])
+ ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp")
assert _eq(sorted(list(ava.keys())), ["givenName", "surName"])
assert ava["givenName"] == ["Derek"]
@@ -673,7 +681,7 @@ def test_filter_ava_2():
"mail": ["derek@nyy.mlb.com"]}
# No restrictions apply
- ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp", [], [])
+ ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp")
assert _eq(list(ava.keys()), ["mail"])
assert ava["mail"] == ["derek@nyy.mlb.com"]
@@ -697,7 +705,7 @@ def test_filter_ava_3():
"mail": ["derek@nyy.mlb.com", "dj@example.com"]}
# No restrictions apply
- ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp", [], [])
+ ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp")
assert _eq(list(ava.keys()), ["mail"])
assert ava["mail"] == ["dj@example.com"]
@@ -721,7 +729,7 @@ def test_filter_ava_4():
"mail": ["derek@nyy.mlb.com", "dj@example.com"]}
# No restrictions apply
- ava = policy.filter(ava, "urn:mace:example.com:saml:curt:sp", [], [])
+ ava = policy.filter(ava, "urn:mace:example.com:saml:curt:sp")
assert _eq(sorted(list(ava.keys())), ['mail', 'givenName', 'surName'])
assert _eq(ava["mail"], ["derek@nyy.mlb.com", "dj@example.com"])
@@ -760,7 +768,7 @@ def test_req_opt():
'uid': 'rohe0002', 'edupersonaffiliation': 'staff'}
sp_entity_id = "urn:mace:example.com:saml:curt:sp"
- fava = policy.filter(ava, sp_entity_id, None, req, opt)
+ fava = policy.filter(ava, sp_entity_id, required=req, optional=opt)
assert fava
@@ -859,37 +867,89 @@ def test_assertion_with_noop_attribute_conv():
assert attr.attribute_value[0].text == "Roland"
-# THis test doesn't work without a MetadataStore instance
-# def test_filter_ava_5():
-# policy = Policy({
-# "default": {
-# "lifetime": {"minutes": 15},
-# #"attribute_restrictions": None # means all I have
-# "entity_categories": ["swamid", "edugain"]
-# }
-# })
-#
-# ava = {"givenName": ["Derek"], "surName": ["Jeter"],
-# "mail": ["derek@nyy.mlb.com", "dj@example.com"]}
-#
-# ava = policy.filter(ava, "urn:mace:example.com:saml:curt:sp", None, [], [])
-#
-# # using entity_categories means there *always* are restrictions
-# # in this case the only allowed attribute is eduPersonTargetedID
-# # which isn't available in the ava hence zip is returned.
-# assert ava == {}
+def test_filter_ava_5():
+ mds = MetadataStore(ATTRCONV, sec_config, disable_ssl_certificate_validation=True)
+ mds.imp(METADATACONF["1"])
+
+ policy_conf = {
+ "default": {
+ "lifetime": {"minutes": 15},
+ "attribute_restrictions": None, # means all I have
+ "entity_categories": ["swamid", "edugain"]
+ }
+ }
+ policy = Policy(restrictions=policy_conf, mds=mds)
+
+ ava = {
+ "givenName": ["Derek"],
+ "surName": ["Jeter"],
+ "mail": [
+ "derek@nyy.mlb.com",
+ "dj@example.com",
+ ],
+ }
+ ava = policy.filter(ava, "urn:mace:example.com:saml:curt:sp")
+
+ # using entity_categories means there *always* are restrictions
+ # in this case the only allowed attribute is eduPersonTargetedID
+ # which isn't available in the ava hence zip is returned.
+ assert ava == {}
+
+
+def test_filter_ava_registration_authority_1():
+ mds = MetadataStore(ATTRCONV, sec_config, disable_ssl_certificate_validation=True)
+ mds.imp(METADATACONF["1"])
+
+ policy_conf = {
+ "default": {
+ "lifetime": {"minutes": 15},
+ "attribute_restrictions": None,
+ },
+ "http://rr.aai.switch.ch/": {
+ "attribute_restrictions": {
+ "givenName": None,
+ "surName": None,
+ }
+ }
+ }
+ policy = Policy(restrictions=policy_conf, mds=mds)
+
+ attributes = {
+ "givenName": ["Derek"],
+ "surName": ["Jeter"],
+ "mail": [
+ "derek@nyy.mlb.com",
+ "dj@example.com",
+ ],
+ }
+
+ # SP registered with http://rr.aai.switch.ch/
+ ava = policy.filter(attributes, "https://aai-idp.unibe.ch/idp/shibboleth")
+ assert _eq(sorted(list(ava.keys())), ["givenName", "surName"])
+ assert ava["givenName"] == ["Derek"]
+ assert ava["surName"] == ["Jeter"]
+
+ # SP not registered with http://rr.aai.switch.ch/
+ ava = policy.filter(attributes, "https://alpha.kib.ki.se/shibboleth")
+ assert _eq(sorted(list(ava.keys())), ["givenName", "mail", "surName"])
+ assert ava["givenName"] == ["Derek"]
+ assert ava["surName"] == ["Jeter"]
+ assert ava["mail"] == ["derek@nyy.mlb.com", "dj@example.com"]
def test_assertion_with_zero_attributes():
ava = {}
ast = Assertion(ava)
- policy = Policy({
+
+ policy_conf = {
"default": {
"lifetime": {"minutes": 240},
"attribute_restrictions": None, # means all I have
"name_form": NAME_FORMAT_URI
},
- })
+ }
+ policy = Policy(policy_conf)
+
name_id = NameID(format=NAMEID_FORMAT_TRANSIENT, text="foobar")
issuer = Issuer(text="entityid", format=NAMEID_FORMAT_ENTITY)
farg = add_path(
diff --git a/tests/test_30_mdstore.py b/tests/test_30_mdstore.py
index d712383f..fede1010 100644
--- a/tests/test_30_mdstore.py
+++ b/tests/test_30_mdstore.py
@@ -152,6 +152,10 @@ METADATACONF = {
"class": "saml2.mdstore.MetaDataFile",
"metadata": [(full_path("uu.xml"),)],
}],
+ "13": [{
+ "class": "saml2.mdstore.MetaDataFile",
+ "metadata": [(full_path("swamid-2.0.xml"),)],
+ }],
}
@@ -560,6 +564,17 @@ def test_supported_algorithms():
assert 'http://www.w3.org/2001/04/xmldsig-more#rsa-sha256' in algs['signing_methods']
+def test_registration_info():
+ mds = MetadataStore(ATTRCONV, sec_config,
+ disable_ssl_certificate_validation=True)
+ mds.imp(METADATACONF["13"])
+ registration_info = mds.registration_info(entity_id='https://aai-idp.unibe.ch/idp/shibboleth')
+ assert 'http://rr.aai.switch.ch/' == registration_info['registration_authority']
+ assert '2013-06-15T18:15:03Z' == registration_info['registration_instant']
+ assert 'https://www.switch.ch/aai/federation/switchaai/metadata-registration-practice-statement-20110711.txt' == \
+ registration_info['registration_policy']['en']
+
+
def test_extension():
mds = MetadataStore(ATTRCONV, None)
# use ordered dict to force expected entity to be last
diff --git a/tests/test_37_entity_categories.py b/tests/test_37_entity_categories.py
index 839030fd..a24a4feb 100644
--- a/tests/test_37_entity_categories.py
+++ b/tests/test_37_entity_categories.py
@@ -25,37 +25,47 @@ def _eq(l1, l2):
def test_filter_ava():
- policy = Policy({
+ policy_conf = {
"default": {
"lifetime": {"minutes": 15},
# "attribute_restrictions": None # means all I have
"entity_categories": ["swamid"]
}
- })
+ }
+ policy = Policy(policy_conf, MDS)
- ava = {"givenName": ["Derek"], "sn": ["Jeter"],
- "mail": ["derek@nyy.mlb.com", "dj@example.com"], "c": ["USA"]}
+ ava = {
+ "givenName": ["Derek"],
+ "sn": ["Jeter"],
+ "mail": ["derek@nyy.mlb.com", "dj@example.com"],
+ "c": ["USA"]
+ }
- ava = policy.filter(ava, "https://connect.sunet.se/shibboleth", MDS)
+ ava = policy.filter(ava, "https://connect.sunet.se/shibboleth")
assert _eq(list(ava.keys()), ['mail', 'givenName', 'sn', 'c'])
assert _eq(ava["mail"], ["derek@nyy.mlb.com", "dj@example.com"])
def test_filter_ava2():
- policy = Policy({
+ policy_conf = {
"default": {
"lifetime": {"minutes": 15},
# "attribute_restrictions": None # means all I have
"entity_categories": ["refeds", "edugain"]
}
- })
+ }
+ policy = Policy(policy_conf, MDS)
- ava = {"givenName": ["Derek"], "sn": ["Jeter"],
- "mail": ["derek@nyy.mlb.com"], "c": ["USA"],
- "eduPersonTargetedID": "foo!bar!xyz"}
+ ava = {
+ "givenName": ["Derek"],
+ "sn": ["Jeter"],
+ "mail": ["derek@nyy.mlb.com"],
+ "c": ["USA"],
+ "eduPersonTargetedID": "foo!bar!xyz"
+ }
- ava = policy.filter(ava, "https://connect.sunet.se/shibboleth", MDS)
+ ava = policy.filter(ava, "https://connect.sunet.se/shibboleth")
# Mismatch, policy deals with eduGAIN, metadata says SWAMID
# So only minimum should come out
@@ -63,96 +73,120 @@ def test_filter_ava2():
def test_filter_ava3():
- policy = Policy({
+ mds = MetadataStore(ATTRCONV, sec_config, disable_ssl_certificate_validation=True)
+ mds.imp(
+ [
+ {
+ "class": "saml2.mdstore.MetaDataFile",
+ "metadata": [(full_path("entity_cat_sfs_hei.xml"),)]
+ }
+ ]
+ )
+
+ policy_conf = {
"default": {
"lifetime": {"minutes": 15},
# "attribute_restrictions": None # means all I have
"entity_categories": ["swamid"]
}
- })
+ }
+ policy = Policy(policy_conf, mds)
+
+ ava = {
+ "givenName": ["Derek"],
+ "sn": ["Jeter"],
+ "mail": ["derek@nyy.mlb.com"],
+ "c": ["USA"],
+ "eduPersonTargetedID": "foo!bar!xyz",
+ "norEduPersonNIN": "19800101134"
+ }
+
+ ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp")
+ assert _eq(list(ava.keys()), ['eduPersonTargetedID', "norEduPersonNIN"])
+
+def test_filter_ava4():
mds = MetadataStore(ATTRCONV, sec_config,
disable_ssl_certificate_validation=True)
mds.imp([{"class": "saml2.mdstore.MetaDataFile",
- "metadata": [(full_path("entity_cat_sfs_hei.xml"),)]}])
-
- ava = {"givenName": ["Derek"], "sn": ["Jeter"],
- "mail": ["derek@nyy.mlb.com"], "c": ["USA"],
- "eduPersonTargetedID": "foo!bar!xyz",
- "norEduPersonNIN": "19800101134"}
-
- ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp", mds)
-
- assert _eq(list(ava.keys()), ['eduPersonTargetedID', "norEduPersonNIN"])
-
+ "metadata": [(full_path("entity_cat_re_nren.xml"),)]}])
-def test_filter_ava4():
- policy = Policy({
+ policy_conf = {
"default": {
"lifetime": {"minutes": 15},
# "attribute_restrictions": None # means all I have
"entity_categories": ["swamid"]
}
- })
-
- mds = MetadataStore(ATTRCONV, sec_config,
- disable_ssl_certificate_validation=True)
- mds.imp([{"class": "saml2.mdstore.MetaDataFile",
- "metadata": [(full_path("entity_cat_re_nren.xml"),)]}])
+ }
+ policy = Policy(policy_conf, mds)
- ava = {"givenName": ["Derek"], "sn": ["Jeter"],
- "mail": ["derek@nyy.mlb.com"], "c": ["USA"],
- "eduPersonTargetedID": "foo!bar!xyz",
- "norEduPersonNIN": "19800101134"}
+ ava = {
+ "givenName": ["Derek"],
+ "sn": ["Jeter"],
+ "mail": ["derek@nyy.mlb.com"],
+ "c": ["USA"],
+ "eduPersonTargetedID": "foo!bar!xyz",
+ "norEduPersonNIN": "19800101134"
+ }
- ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp", mds)
-
- assert _eq(list(ava.keys()),
- ['eduPersonTargetedID', "givenName", "c", "mail",
- "sn"])
+ ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp")
+ assert _eq(
+ list(ava.keys()), ['eduPersonTargetedID', "givenName", "c", "mail", "sn"]
+ )
def test_filter_ava5():
+ mds = MetadataStore(ATTRCONV, sec_config,
+ disable_ssl_certificate_validation=True)
+ mds.imp([{"class": "saml2.mdstore.MetaDataFile",
+ "metadata": [(full_path("entity_cat_re.xml"),)]}])
+
policy = Policy({
"default": {
"lifetime": {"minutes": 15},
# "attribute_restrictions": None # means all I have
"entity_categories": ["swamid"]
}
- })
+ }, mds)
- mds = MetadataStore(ATTRCONV, sec_config,
- disable_ssl_certificate_validation=True)
- mds.imp([{"class": "saml2.mdstore.MetaDataFile",
- "metadata": [(full_path("entity_cat_re.xml"),)]}])
-
- ava = {"givenName": ["Derek"], "sn": ["Jeter"],
- "mail": ["derek@nyy.mlb.com"], "c": ["USA"],
- "eduPersonTargetedID": "foo!bar!xyz",
- "norEduPersonNIN": "19800101134"}
+ ava = {
+ "givenName": ["Derek"],
+ "sn": ["Jeter"],
+ "mail": ["derek@nyy.mlb.com"],
+ "c": ["USA"],
+ "eduPersonTargetedID": "foo!bar!xyz",
+ "norEduPersonNIN": "19800101134"
+ }
- ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp", mds)
+ ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp")
assert _eq(list(ava.keys()), ['eduPersonTargetedID'])
def test_idp_policy_filter():
with closing(Server("idp_conf_ec")) as idp:
- ava = {"givenName": ["Derek"], "sn": ["Jeter"],
- "mail": ["derek@nyy.mlb.com"], "c": ["USA"],
- "eduPersonTargetedID": "foo!bar!xyz",
- "norEduPersonNIN": "19800101134"}
+ ava = {
+ "givenName": ["Derek"],
+ "sn": ["Jeter"],
+ "mail": ["derek@nyy.mlb.com"],
+ "c": ["USA"],
+ "eduPersonTargetedID": "foo!bar!xyz",
+ "norEduPersonNIN": "19800101134"
+ }
policy = idp.config.getattr("policy", "idp")
- ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp",
- idp.metadata)
-
- print(ava)
- assert list(ava.keys()) == [
- "eduPersonTargetedID"] # because no entity category
+ ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp")
+ # because no entity category
+ assert list(ava.keys()) == ["eduPersonTargetedID"]
def test_entity_category_import_from_path():
+ mds = MetadataStore(ATTRCONV, sec_config, disable_ssl_certificate_validation=True)
+ # The file entity_cat_rs.xml contains the SAML metadata for an SP
+ # tagged with the REFEDs R&S entity category.
+ mds.imp([{"class": "saml2.mdstore.MetaDataFile",
+ "metadata": [(full_path("entity_cat_rs.xml"),)]}])
+
# The entity category module myentitycategory.py is in the tests
# directory which is on the standard module search path.
# The module uses a custom interpretation of the REFEDs R&S entity category
@@ -162,34 +196,35 @@ def test_entity_category_import_from_path():
"lifetime": {"minutes": 15},
"entity_categories": ["myentitycategory"]
}
- })
-
- mds = MetadataStore(ATTRCONV, sec_config,
- disable_ssl_certificate_validation=True)
-
- # The file entity_cat_rs.xml contains the SAML metadata for an SP
- # tagged with the REFEDs R&S entity category.
- mds.imp([{"class": "saml2.mdstore.MetaDataFile",
- "metadata": [(full_path("entity_cat_rs.xml"),)]}])
-
- ava = {"givenName": ["Derek"], "sn": ["Jeter"],
- "displayName": "Derek Jeter",
- "mail": ["derek@nyy.mlb.com"], "c": ["USA"],
- "eduPersonTargetedID": "foo!bar!xyz",
- "eduPersonUniqueId": "R13ET7UD68K0HGR153KE@my.org",
- "eduPersonScopedAffiliation": "member@my.org",
- "eduPersonPrincipalName": "user01@my.org",
- "norEduPersonNIN": "19800101134"}
-
- ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp", mds)
+ }, mds)
+
+ ava = {
+ "givenName": ["Derek"],
+ "sn": ["Jeter"],
+ "displayName": "Derek Jeter",
+ "mail": ["derek@nyy.mlb.com"],
+ "c": ["USA"],
+ "eduPersonTargetedID": "foo!bar!xyz",
+ "eduPersonUniqueId": "R13ET7UD68K0HGR153KE@my.org",
+ "eduPersonScopedAffiliation": "member@my.org",
+ "eduPersonPrincipalName": "user01@my.org",
+ "norEduPersonNIN": "19800101134"
+ }
+
+ ava = policy.filter(ava, "urn:mace:example.com:saml:roland:sp")
# We expect c and norEduPersonNIN to be filtered out since they are not
# part of the custom entity category.
- assert _eq(list(ava.keys()),
- ["eduPersonTargetedID", "eduPersonPrincipalName",
- "eduPersonUniqueId", "displayName", "givenName",
- "eduPersonScopedAffiliation", "mail", "sn"])
-
-
-if __name__ == "__main__":
- test_filter_ava3()
+ assert _eq(
+ list(ava.keys()),
+ [
+ "eduPersonTargetedID",
+ "eduPersonPrincipalName",
+ "eduPersonUniqueId",
+ "displayName",
+ "givenName",
+ "eduPersonScopedAffiliation",
+ "mail",
+ "sn"
+ ]
+ )
diff --git a/tests/test_39_metadata.py b/tests/test_39_metadata.py
index 8ab6dfa5..06de507a 100644
--- a/tests/test_39_metadata.py
+++ b/tests/test_39_metadata.py
@@ -30,7 +30,7 @@ sp_conf = {
def test_requested_attribute_name_format():
- cnf = SPConfig().load(sp_conf, metadata_construction=True)
+ cnf = SPConfig().load(sp_conf)
ed = entity_descriptor(cnf)
assert len(ed.spsso_descriptor.attribute_consuming_service) == 1
@@ -42,7 +42,7 @@ def test_requested_attribute_name_format():
sp2 = copy.copy(sp_conf)
sp2["service"]["sp"]["requested_attribute_name_format"] = NAME_FORMAT_BASIC
- cnf2 = SPConfig().load(sp2, metadata_construction=True)
+ cnf2 = SPConfig().load(sp2)
ed = entity_descriptor(cnf2)
acs = ed.spsso_descriptor.attribute_consuming_service[0]
assert len(acs.requested_attribute) == 4
@@ -56,7 +56,7 @@ def test_signed_metadata_proper_str_bytes_handling():
sp_conf_2['cert_file'] = full_path("inc-md-cert.pem")
# requires xmlsec binaries per https://pysaml2.readthedocs.io/en/latest/examples/sp.html
sp_conf_2['xmlsec_binary'] = sigver.get_xmlsec_binary(["/opt/local/bin"])
- cnf = SPConfig().load(sp_conf_2, metadata_construction=True)
+ cnf = SPConfig().load(sp_conf_2)
# This will raise TypeError if string/bytes handling is not correct
sp_metadata = create_metadata_string('', config=cnf, sign=True)
diff --git a/tests/test_83_md_extensions.py b/tests/test_83_md_extensions.py
index 35098595..df2c9125 100644
--- a/tests/test_83_md_extensions.py
+++ b/tests/test_83_md_extensions.py
@@ -9,7 +9,7 @@ from saml2.metadata import entity_descriptor
class TestMDExt():
def test_sp_type_true(self):
fil = "sp_mdext_conf.py"
- cnf = Config().load_file(fil, metadata_construction=True)
+ cnf = Config().load_file(fil)
ed = entity_descriptor(cnf)
assert ed.spsso_descriptor.extensions
@@ -20,7 +20,7 @@ class TestMDExt():
def test_sp_type_false(self):
fil = "sp_mdext_conf.py"
- cnf = Config().load_file(fil, metadata_construction=True)
+ cnf = Config().load_file(fil)
cnf.setattr('sp', 'sp_type_in_metadata', False)
ed = entity_descriptor(cnf)
@@ -28,7 +28,7 @@ class TestMDExt():
def test_entity_attributes(self):
fil = "sp_mdext_conf.py"
- cnf = Config().load_file(fil, metadata_construction=True)
+ cnf = Config().load_file(fil)
ed = entity_descriptor(cnf)
entity_attributes = next(
diff --git a/tools/make_metadata.py b/tools/make_metadata.py
index f2ae4476..147425fe 100755
--- a/tools/make_metadata.py
+++ b/tools/make_metadata.py
@@ -53,7 +53,7 @@ for filespec in args.config:
sys.path.insert(0, bas)
if fil.endswith(".py"):
fil = fil[:-3]
- cnf = Config().load_file(fil, metadata_construction=True)
+ cnf = Config().load_file(fil)
if valid_for:
cnf.valid_for = valid_for
eds.append(entity_descriptor(cnf))