summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xexample/sp-wsgi/sp.py37
-rw-r--r--src/saml2/__init__.py49
-rw-r--r--src/saml2/assertion.py80
-rw-r--r--src/saml2/entity.py2
-rw-r--r--src/saml2/mdstore.py12
-rw-r--r--src/saml2/metadata.py7
-rw-r--r--src/saml2/s_utils.py3
-rw-r--r--src/saml2/server.py161
-rw-r--r--src/saml2/sigver.py6
-rw-r--r--tests/metadata.xml125
-rw-r--r--tests/test_12_s_utils.py42
-rw-r--r--tests/test_19_attribute_converter.py12
-rw-r--r--tests/test_20_assertion.py45
-rw-r--r--tests/test_51_client.py133
14 files changed, 470 insertions, 244 deletions
diff --git a/example/sp-wsgi/sp.py b/example/sp-wsgi/sp.py
index 77345bca..d1c182e6 100755
--- a/example/sp-wsgi/sp.py
+++ b/example/sp-wsgi/sp.py
@@ -9,6 +9,7 @@ import os
import re
import sys
import xml.dom.minidom
+from saml2.sigver import SignatureError
import six
from six.moves.http_cookies import SimpleCookie
@@ -48,7 +49,7 @@ from saml2.samlp import Extensions
logger = logging.getLogger("")
hdlr = logging.FileHandler('spx.log')
base_formatter = logging.Formatter(
- "%(asctime)s %(name)s:%(levelname)s %(message)s")
+ "%(asctime)s %(name)s:%(levelname)s %(message)s")
hdlr.setFormatter(base_formatter)
logger.addHandler(hdlr)
@@ -329,7 +330,8 @@ class User(object):
@property
def authn_statement(self):
- xml_doc = xml.dom.minidom.parseString(str(self.response.assertion.authn_statement[0]))
+ xml_doc = xml.dom.minidom.parseString(
+ str(self.response.assertion.authn_statement[0]))
return xml_doc.toprettyxml()
@@ -355,7 +357,8 @@ class ACS(Service):
try:
self.response = self.sp.parse_authn_request_response(
- response, binding, self.outstanding_queries, self.cache.outstanding_certs)
+ response, binding, self.outstanding_queries,
+ self.cache.outstanding_certs)
except UnknownPrincipal as excp:
logger.error("UnknownPrincipal: %s", excp)
resp = ServiceError("UnknownPrincipal: %s" % (excp,))
@@ -367,6 +370,9 @@ class ACS(Service):
except VerificationError as err:
resp = ServiceError("Verification error: %s" % (err,))
return resp(self.environ, self.start_response)
+ except SignatureError as err:
+ resp = ServiceError("Signature error: %s" % (err,))
+ return resp(self.environ, self.start_response)
except Exception as err:
resp = ServiceError("Other error: %s" % (err,))
return resp(self.environ, self.start_response)
@@ -384,7 +390,7 @@ class ACS(Service):
def verify_attributes(self, ava):
logger.info("SP: %s", self.sp.config.entityid)
rest = POLICY.get_entity_categories(
- self.sp.config.entityid, self.sp.metadata)
+ self.sp.config.entityid, self.sp.metadata)
akeys = [k.lower() for k in ava.keys()]
@@ -469,7 +475,7 @@ class SSO(object):
_rstate = rndstr()
self.cache.relay_state[_rstate] = geturl(self.environ)
_entityid = _cli.config.ecp_endpoint(
- self.environ["REMOTE_ADDR"])
+ self.environ["REMOTE_ADDR"])
if not _entityid:
return -1, ServiceError("No IdP to talk to")
@@ -521,7 +527,7 @@ class SSO(object):
elif self.discosrv:
if query:
idp_entity_id = _cli.parse_discovery_service_response(
- query=self.environ.get("QUERY_STRING"))
+ query=self.environ.get("QUERY_STRING"))
if not idp_entity_id:
sid_ = sid()
self.cache.outstanding_queries[sid_] = came_from
@@ -531,7 +537,7 @@ class SSO(object):
"sp")["discovery_response"][0][0]
ret += "?sid=%s" % sid_
loc = _cli.create_discovery_service_request(
- self.discosrv, eid, **{"return": ret})
+ self.discosrv, eid, **{"return": ret})
return -1, SeeOther(loc)
elif len(idps) == 1:
# idps is a dictionary
@@ -548,8 +554,8 @@ class SSO(object):
try:
# Picks a binding to use for sending the Request to the IDP
_binding, destination = _cli.pick_binding(
- "single_sign_on_service", self.bindings, "idpsso",
- entity_id=entity_id)
+ "single_sign_on_service", self.bindings, "idpsso",
+ entity_id=entity_id)
logger.debug("binding: %s, destination: %s", _binding,
destination)
# Binding here is the response binding that is which binding the
@@ -568,7 +574,7 @@ class SSO(object):
"key": req_key_str
}
spcertenc = SPCertEnc(x509_data=ds.X509Data(
- x509_certificate=ds.X509Certificate(text=cert_str)))
+ x509_certificate=ds.X509Certificate(text=cert_str)))
extensions = Extensions(extension_elements=[
element_to_extension_element(spcertenc)])
@@ -589,7 +595,7 @@ class SSO(object):
except Exception as exc:
logger.exception(exc)
resp = ServiceError(
- "Failed to construct the AuthnRequest: %s" % exc)
+ "Failed to construct the AuthnRequest: %s" % exc)
return resp
# remember the request
@@ -782,7 +788,8 @@ def metadata(environ, start_response):
if path[-1] != "/":
path += "/"
metadata = create_metadata_string(path + "sp_conf.py", None,
- _args.valid, _args.cert, _args.keyfile,
+ _args.valid, _args.cert,
+ _args.keyfile,
_args.id, _args.name, _args.sign)
start_response('200 OK', [('Content-Type', "text/xml")])
return metadata
@@ -851,10 +858,12 @@ if __name__ == '__main__':
_parser.add_argument("config", help="SAML client config")
_parser.add_argument('-p', dest='path', help='Path to configuration file.')
_parser.add_argument('-v', dest='valid', default="4",
- help="How long, in days, the metadata is valid from the time of creation")
+ help="How long, in days, the metadata is valid from "
+ "the time of creation")
_parser.add_argument('-c', dest='cert', help='certificate')
_parser.add_argument('-i', dest='id',
- help="The ID of the entities descriptor in the metadata")
+ help="The ID of the entities descriptor in the "
+ "metadata")
_parser.add_argument('-k', dest='keyfile',
help="A file with a key to sign the metadata with")
_parser.add_argument('-n', dest='name')
diff --git a/src/saml2/__init__.py b/src/saml2/__init__.py
index 631b7b5f..9f5118b6 100644
--- a/src/saml2/__init__.py
+++ b/src/saml2/__init__.py
@@ -25,6 +25,7 @@ from saml2.validate import valid_instance
try:
from xml.etree import cElementTree as ElementTree
+
if ElementTree.VERSION < '1.3.0':
# cElementTree has no support for register_namespace
# neither _namespace_map, thus we sacrify performance
@@ -40,17 +41,17 @@ root_logger = logging.getLogger(__name__)
root_logger.level = logging.NOTSET
NAMESPACE = 'urn:oasis:names:tc:SAML:2.0:assertion'
-#TEMPLATE = '{urn:oasis:names:tc:SAML:2.0:assertion}%s'
-#XSI_NAMESPACE = 'http://www.w3.org/2001/XMLSchema-instance'
+# TEMPLATE = '{urn:oasis:names:tc:SAML:2.0:assertion}%s'
+# XSI_NAMESPACE = 'http://www.w3.org/2001/XMLSchema-instance'
NAMEID_FORMAT_EMAILADDRESS = (
"urn:oasis:names:tc:SAML:1.1:nameid-format:emailAddress")
# These are defined in saml2.saml
-#NAME_FORMAT_UNSPECIFIED = (
+# NAME_FORMAT_UNSPECIFIED = (
# "urn:oasis:names:tc:SAML:2.0:attrname-format:unspecified")
-#NAME_FORMAT_URI = "urn:oasis:names:tc:SAML:2.0:attrname-format:uri"
-#NAME_FORMAT_BASIC = "urn:oasis:names:tc:SAML:2.0:attrname-format:basic"
+# NAME_FORMAT_URI = "urn:oasis:names:tc:SAML:2.0:attrname-format:uri"
+# NAME_FORMAT_BASIC = "urn:oasis:names:tc:SAML:2.0:attrname-format:basic"
DECISION_TYPE_PERMIT = "Permit"
DECISION_TYPE_DENY = "Deny"
@@ -290,7 +291,6 @@ def _extension_element_from_element_tree(element_tree):
class ExtensionContainer(object):
-
c_tag = ""
c_namespace = ""
@@ -300,6 +300,7 @@ class ExtensionContainer(object):
self.text = text
self.extension_elements = extension_elements or []
self.extension_attributes = extension_attributes or {}
+ self.encrypted_assertion = None
# Three methods to create an object from an ElementTree
def harvest_element_tree(self, tree):
@@ -403,7 +404,7 @@ def make_vals(val, klass, klass_inst=None, prop=None, part=False,
"""
cinst = None
- #print("make_vals(%s, %s)" % (val, klass))
+ # print("make_vals(%s, %s)" % (val, klass))
if isinstance(val, dict):
cinst = klass().loadd(val, base64encode=base64encode)
@@ -571,8 +572,8 @@ class SamlBase(ExtensionContainer):
def tag_get_uri(self, elem):
if elem.tag[0] == "{":
- uri, tag = elem.tag[1:].split("}")
- return uri
+ uri, tag = elem.tag[1:].split("}")
+ return uri
return None
def get_ns_map(self, elements, uri_set):
@@ -592,12 +593,17 @@ class SamlBase(ExtensionContainer):
prefix_map["encas%d" % len(prefix_map)] = uri
return prefix_map
- def get_xml_string_with_self_contained_assertion_within_advice_encrypted_assertion(self, assertion_tag, advice_tag):
- for tmp_encrypted_assertion in self.assertion.advice.encrypted_assertion:
+ def get_xml_string_with_self_contained_assertion_within_advice_encrypted_assertion(
+ self, assertion_tag, advice_tag):
+ for tmp_encrypted_assertion in \
+ self.assertion.advice.encrypted_assertion:
if tmp_encrypted_assertion.encrypted_data is None:
- prefix_map = self.get_prefix_map([tmp_encrypted_assertion._to_element_tree().find(assertion_tag)])
+ prefix_map = self.get_prefix_map([
+ tmp_encrypted_assertion._to_element_tree().find(
+ assertion_tag)])
tree = self._to_element_tree()
- encs = tree.find(assertion_tag).find(advice_tag).findall(tmp_encrypted_assertion._to_element_tree().tag)
+ encs = tree.find(assertion_tag).find(advice_tag).findall(
+ tmp_encrypted_assertion._to_element_tree().tag)
for enc in encs:
assertion = enc.find(assertion_tag)
if assertion is not None:
@@ -605,17 +611,23 @@ class SamlBase(ExtensionContainer):
return ElementTree.tostring(tree, encoding="UTF-8").decode('utf-8')
- def get_xml_string_with_self_contained_assertion_within_encrypted_assertion(self, assertion_tag):
- """ Makes a encrypted assertion only containing self contained namespaces.
+ def get_xml_string_with_self_contained_assertion_within_encrypted_assertion(
+ self, assertion_tag):
+ """ Makes a encrypted assertion only containing self contained
+ namespaces.
:param assertion_tag: Tag for the assertion to be transformed.
:return: A new samlp.Resonse in string representation.
"""
- prefix_map = self.get_prefix_map([self.encrypted_assertion._to_element_tree().find(assertion_tag)])
+ prefix_map = self.get_prefix_map(
+ [self.encrypted_assertion._to_element_tree().find(assertion_tag)])
tree = self._to_element_tree()
- self.set_prefixes(tree.find(self.encrypted_assertion._to_element_tree().tag).find(assertion_tag), prefix_map)
+ self.set_prefixes(
+ tree.find(
+ self.encrypted_assertion._to_element_tree().tag).find(
+ assertion_tag), prefix_map)
return ElementTree.tostring(tree, encoding="UTF-8").decode('utf-8')
@@ -648,6 +660,7 @@ class SamlBase(ExtensionContainer):
new_name = uri_map[uri] + ":" + tag
memo[name] = new_name
return new_name
+
# fix element name
name = fixup(elem.tag)
if name:
@@ -724,7 +737,7 @@ class SamlBase(ExtensionContainer):
childs.append(member)
return childs
- #noinspection PyUnusedLocal
+ # noinspection PyUnusedLocal
def set_text(self, val, base64encode=False):
""" Sets the text property of this instance.
diff --git a/src/saml2/assertion.py b/src/saml2/assertion.py
index c6e24d2f..3a5220ec 100644
--- a/src/saml2/assertion.py
+++ b/src/saml2/assertion.py
@@ -654,6 +654,47 @@ def authn_statement(authn_class=None, authn_auth=None,
return res
+def do_subject_confirmation(policy, sp_entity_id, key_info=None, **treeargs):
+ """
+
+ :param policy: Policy instance
+ :param sp_entity_id: The entityid of the SP
+ :param subject_confirmation_method: How was the subject confirmed
+ :param address: The network address/location from which an attesting entity
+ can present the assertion.
+ :param key_info: Information of the key used to confirm the subject
+ :param in_response_to: The ID of a SAML protocol message in response to
+ which an attesting entity can present the assertion.
+ :param recipient: A URI specifying the entity or location to which an
+ attesting entity can present the assertion.
+ :param not_before: A time instant before which the subject cannot be
+ confirmed. The time value MUST be encoded in UTC.
+ :return:
+ """
+
+ _sc = factory(saml.SubjectConfirmation, **treeargs)
+
+ _scd = _sc.subject_confirmation_data
+ _scd.not_on_or_after = policy.not_on_or_after(sp_entity_id)
+
+ if _sc.method == saml.SCM_HOLDER_OF_KEY:
+ _scd.add_extension_element(key_info)
+
+ return _sc
+
+
+def do_subject(policy, sp_entity_id, name_id, **farg):
+ #
+ specs = farg['subject_confirmation']
+
+ if isinstance(specs, list):
+ res = [do_subject_confirmation(policy, sp_entity_id, **s) for s in specs]
+ else:
+ res = [do_subject_confirmation(policy, sp_entity_id, **specs)]
+
+ return factory(saml.Subject, name_id=name_id, subject_confirmation=res)
+
+
class Assertion(dict):
""" Handles assertions about subjects """
@@ -661,17 +702,16 @@ class Assertion(dict):
dict.__init__(self, dic)
self.acs = []
- def construct(self, sp_entity_id, in_response_to, consumer_url,
- name_id, attrconvs, policy, issuer, authn_class=None,
- authn_auth=None, authn_decl=None, encrypt=None,
- sec_context=None, authn_decl_ref=None, authn_instant="",
- subject_locality="", authn_statem=None, add_subject=True):
+ def construct(self, sp_entity_id, attrconvs, policy, issuer, farg,
+ authn_class=None, authn_auth=None, authn_decl=None,
+ encrypt=None, sec_context=None, authn_decl_ref=None,
+ authn_instant="", subject_locality="", authn_statem=None,
+ name_id=None):
""" Construct the Assertion
:param sp_entity_id: The entityid of the SP
:param in_response_to: An identifier of the message, this message is
a response to
- :param consumer_url: The intended consumer of the assertion
:param name_id: An NameID instance
:param attrconvs: AttributeConverters
:param policy: The policy that should be adhered to when replying
@@ -721,29 +761,11 @@ class Assertion(dict):
else:
_authn_statement = None
- if not add_subject:
- _ass = assertion_factory(
- issuer=issuer,
- conditions=conds,
- subject=None
- )
- else:
- _ass = assertion_factory(
- issuer=issuer,
- conditions=conds,
- subject=factory(
- saml.Subject,
- name_id=name_id,
- subject_confirmation=[factory(
- saml.SubjectConfirmation,
- method=saml.SCM_BEARER,
- subject_confirmation_data=factory(
- saml.SubjectConfirmationData,
- in_response_to=in_response_to,
- recipient=consumer_url,
- not_on_or_after=policy.not_on_or_after(sp_entity_id)))]
- ),
- )
+ subject = do_subject(policy, sp_entity_id, name_id,
+ **farg['subject'])
+
+ _ass = assertion_factory(issuer=issuer, conditions=conds,
+ subject=subject)
if _authn_statement:
_ass.authn_statement = [_authn_statement]
diff --git a/src/saml2/entity.py b/src/saml2/entity.py
index f696dabc..c6b287f5 100644
--- a/src/saml2/entity.py
+++ b/src/saml2/entity.py
@@ -604,7 +604,7 @@ class Entity(HTTPBase):
:param in_response_to: The session identifier of the request
:param consumer_url: The URL which should receive the response
- :param status: The status of the response
+ :param status: An instance of samlp.Status
:param issuer: The issuer of the response
:param sign: Whether the response should be signed or not
:param to_sign: If there are other parts to sign
diff --git a/src/saml2/mdstore.py b/src/saml2/mdstore.py
index 16297052..018f454a 100644
--- a/src/saml2/mdstore.py
+++ b/src/saml2/mdstore.py
@@ -831,7 +831,7 @@ class MetadataStore(MetaData):
typ = args[0]
if typ == "local":
- key = args[0]
+ key = args[1]
# if library read every file in the library
if os.path.isdir(key):
files = [f for f in os.listdir(key) if isfile(join(key, f))]
@@ -848,7 +848,7 @@ class MetadataStore(MetaData):
self.ii += 1
key = self.ii
kwargs.update(_args)
- _md = InMemoryMetaData(self.attrc, args[0])
+ _md = InMemoryMetaData(self.attrc, args[1])
elif typ == "remote":
key = kwargs["url"]
for _key in ["node_name", "check_validity"]:
@@ -864,11 +864,11 @@ class MetadataStore(MetaData):
kwargs["url"], self.security,
kwargs["cert"], self.http, **_args)
elif typ == "mdfile":
- key = args[0]
- _md = MetaDataMD(self.attrc, args[0], **_args)
+ key = args[1]
+ _md = MetaDataMD(self.attrc, args[1], **_args)
elif typ == "loader":
- key = args[0]
- _md = MetaDataLoader(self.attrc, args[0], **_args)
+ key = args[1]
+ _md = MetaDataLoader(self.attrc, args[1], **_args)
else:
raise SAMLError("Unknown metadata type '%s'" % typ)
_md.load()
diff --git a/src/saml2/metadata.py b/src/saml2/metadata.py
index 8999cb91..50ec0bae 100644
--- a/src/saml2/metadata.py
+++ b/src/saml2/metadata.py
@@ -760,7 +760,8 @@ def entity_descriptor(confd):
return entd
-def entities_descriptor(eds, valid_for, name, ident, sign, secc, sign_alg=None, digest_alg=None):
+def entities_descriptor(eds, valid_for, name, ident, sign, secc, sign_alg=None,
+ digest_alg=None):
entities = md.EntitiesDescriptor(entity_descriptor=eds)
if valid_for:
entities.valid_until = in_a_while(hours=valid_for)
@@ -781,7 +782,9 @@ def entities_descriptor(eds, valid_for, name, ident, sign, secc, sign_alg=None,
raise SAMLError("If you want to do signing you should define " +
"where your public key are")
- entities.signature = pre_signature_part(ident, secc.my_cert, 1, sign_alg=sign_alg, digest_alg=digest_alg)
+ entities.signature = pre_signature_part(ident, secc.my_cert, 1,
+ sign_alg=sign_alg,
+ digest_alg=digest_alg)
entities.id = ident
xmldoc = secc.sign_statement("%s" % entities, class_name(entities))
entities = md.entities_descriptor_from_string(xmldoc)
diff --git a/src/saml2/s_utils.py b/src/saml2/s_utils.py
index 07513604..c2689338 100644
--- a/src/saml2/s_utils.py
+++ b/src/saml2/s_utils.py
@@ -370,6 +370,9 @@ def do_attribute_statement(identity):
def factory(klass, **kwargs):
instance = klass()
for key, val in kwargs.items():
+ if isinstance(val, dict):
+ cls = instance.child_class(key)
+ val = factory(cls, **val)
setattr(instance, key, val)
return instance
diff --git a/src/saml2/server.py b/src/saml2/server.py
index fd3f7c07..cc66ccc2 100644
--- a/src/saml2/server.py
+++ b/src/saml2/server.py
@@ -33,7 +33,9 @@ from saml2.request import NameIDMappingRequest
from saml2.request import AuthzDecisionQuery
from saml2.request import AuthnQuery
-from saml2.s_utils import MissingValue, Unknown, rndstr
+from saml2.s_utils import MissingValue
+from saml2.s_utils import rndstr
+from saml2.s_utils import Unknown
from saml2.sigver import pre_signature_part
from saml2.sigver import signed_instance_factory
@@ -286,14 +288,29 @@ class Server(Entity):
return self._parse_request(xml_string, NameIDMappingRequest,
"name_id_mapping_service", binding)
- # ------------------------------------------------------------------------
-
- # ------------------------------------------------------------------------
-
def setup_assertion(self, authn, sp_entity_id, in_response_to, consumer_url,
- name_id, policy, _issuer,
- authn_statement, identity, best_effort, sign_response,
- add_subject=True):
+ name_id, policy, _issuer, authn_statement, identity,
+ best_effort, sign_response, farg, **kwargs):
+ """
+ Construct and return the Assertion
+
+ :param authn: Authentication information
+ :param sp_entity_id:
+ :param in_response_to: The ID of the request this is an answer to
+ :param consumer_url: The recipient of the assertion
+ :param name_id: The NameID of the subject
+ :param policy: Assertion policies
+ :param _issuer: Issuer of the statement
+ :param authn_statement: An AuthnStatement instance
+ :param identity: Identity information about the Subject
+ :param best_effort: Even if not the SPs demands can be met send a
+ response.
+ :param sign_response: Sign the response, only applicable if
+ ErrorResponse
+ :param kwargs: Extra keyword arguments
+ :return: An Assertion instance
+ """
+
ast = Assertion(identity)
ast.acs = self.config.getattr("attribute_converters", "idp")
if policy is None:
@@ -305,32 +322,37 @@ class Server(Entity):
return self.create_error_response(in_response_to, consumer_url,
exc, sign_response)
+ try:
+ subject_confirmation_specs = kwargs['subject_confirmation']
+ except KeyError:
+ subject_confirmation_data = {
+ 'recipient': consumer_url,
+ 'in_response_to': in_response_to,
+ 'method': saml.SCM_BEARER
+ }
+
if authn: # expected to be a dictionary
# Would like to use dict comprehension but ...
- authn_args = dict([
- (AUTHN_DICT_MAP[k], v) for k, v in
- authn.items()
- if k in AUTHN_DICT_MAP])
-
- assertion = ast.construct(sp_entity_id, in_response_to,
- consumer_url, name_id,
- self.config.attribute_converters,
- policy, issuer=_issuer,
- add_subject=add_subject,
- **authn_args)
+ authn_args = dict(
+ [(AUTHN_DICT_MAP[k], v) for k, v in authn.items() if
+ k in AUTHN_DICT_MAP])
+ authn_args.update(kwargs)
+
+ assertion = ast.construct(
+ sp_entity_id, self.config.attribute_converters, policy,
+ issuer=_issuer, farg=farg['assertion'], name_id=name_id,
+ **authn_args)
+
elif authn_statement: # Got a complete AuthnStatement
- assertion = ast.construct(sp_entity_id, in_response_to,
- consumer_url, name_id,
- self.config.attribute_converters,
- policy, issuer=_issuer,
- authn_statem=authn_statement,
- add_subject=add_subject)
+ assertion = ast.construct(
+ sp_entity_id, self.config.attribute_converters, policy,
+ issuer=_issuer, authn_statem=authn_statement,
+ farg=farg['assertion'], name_id=name_id, **kwargs)
else:
- assertion = ast.construct(sp_entity_id, in_response_to,
- consumer_url, name_id,
- self.config.attribute_converters,
- policy, issuer=_issuer,
- add_subject=add_subject)
+ assertion = ast.construct(
+ sp_entity_id, self.config.attribute_converters, policy,
+ issuer=_issuer, farg=farg['assertion'], name_id=name_id,
+ **kwargs)
return assertion
def _authn_response(self, in_response_to, consumer_url,
@@ -342,7 +364,8 @@ class Server(Entity):
authn_statement=None,
encrypt_assertion_self_contained=False,
encrypted_advice_attributes=False,
- pefim=False, sign_alg=None, digest_alg=None):
+ pefim=False, sign_alg=None, digest_alg=None,
+ assertion_args=None):
""" Create a response. A layer of indirection.
:param in_response_to: The session identifier of the request
@@ -374,9 +397,13 @@ class Server(Entity):
:param sign_assertion: True if assertions should be signed.
:param pefim: True if a response according to the PEFIM profile
should be created.
+ :param assertion_args: Argument to pass on to the assertion constructor
:return: A response instance
"""
+ if assertion_args is None:
+ assertion_args = {}
+
args = {}
# if identity:
_issuer = self._issuer(issuer)
@@ -390,6 +417,13 @@ class Server(Entity):
# tmp_authn_statement = authn_statement
# authn_statement = None
+ try:
+ ass_in_response_to = assertion_args['in_response_to']
+ except KeyError:
+ ass_in_response_to = in_response_to
+ else:
+ del assertion_args['in_response_to']
+
if pefim:
encrypted_advice_attributes = True
encrypt_assertion_self_contained = True
@@ -398,12 +432,13 @@ class Server(Entity):
policy,
None, None, identity,
best_effort,
- sign_response, False)
+ sign_response, False,
+ **assertion_args)
assertion = self.setup_assertion(authn, sp_entity_id,
- in_response_to, consumer_url,
+ ass_in_response_to, consumer_url,
name_id, policy, _issuer,
authn_statement, [], True,
- sign_response)
+ sign_response, **assertion_args)
assertion.advice = saml.Advice()
# assertion.advice.assertion_id_ref.append(saml.AssertionIDRef())
@@ -411,10 +446,10 @@ class Server(Entity):
assertion.advice.assertion.append(assertion_attributes)
else:
assertion = self.setup_assertion(authn, sp_entity_id,
- in_response_to, consumer_url,
+ ass_in_response_to, consumer_url,
name_id, policy, _issuer,
authn_statement, identity, True,
- sign_response)
+ sign_response, **assertion_args)
to_sign = []
if not encrypt_assertion:
@@ -430,17 +465,16 @@ class Server(Entity):
if (self.support_AssertionIDRequest() or self.support_AuthnQuery()):
self.session_db.store_assertion(assertion, to_sign)
- return self._response(in_response_to, consumer_url, status, issuer,
- sign_response, to_sign, sp_entity_id=sp_entity_id,
- encrypt_assertion=encrypt_assertion,
- encrypt_cert_advice=encrypt_cert_advice,
- encrypt_cert_assertion=encrypt_cert_assertion,
- encrypt_assertion_self_contained=encrypt_assertion_self_contained,
- encrypted_advice_attributes=encrypted_advice_attributes,
- sign_assertion=sign_assertion,
- pefim=pefim, sign_alg=sign_alg,
- digest_alg=digest_alg,
- **args)
+ return self._response(
+ in_response_to, consumer_url, status, issuer, sign_response,
+ to_sign, sp_entity_id=sp_entity_id,
+ encrypt_assertion=encrypt_assertion,
+ encrypt_cert_advice=encrypt_cert_advice,
+ encrypt_cert_assertion=encrypt_cert_assertion,
+ encrypt_assertion_self_contained=encrypt_assertion_self_contained,
+ encrypted_advice_attributes=encrypted_advice_attributes,
+ sign_assertion=sign_assertion,
+ pefim=pefim, sign_alg=sign_alg, digest_alg=digest_alg, **args)
# ------------------------------------------------------------------------
@@ -493,10 +527,19 @@ class Server(Entity):
restr = restriction_from_attribute_spec(attributes)
ast = filter_attribute_value_assertions(ast)
- assertion = ast.construct(sp_entity_id, in_response_to,
- destination, name_id,
- self.config.attribute_converters,
- policy, issuer=_issuer)
+ try:
+ subject_confirmation_specs = kwargs['subject_confirmation_specs']
+ except KeyError:
+ subject_confirmation_specs = {
+ 'recipient': destination,
+ 'in_response_to': in_response_to,
+ 'subject_confirmation_method': saml.SCM_BEARER
+ }
+
+ assertion = ast.construct(
+ sp_entity_id, self.config.attribute_converters, policy,
+ issuer=_issuer, name_id=name_id,
+ subject_confirmation_specs=subject_confirmation_specs)
if sign_assertion:
assertion.signature = pre_signature_part(assertion.id,
@@ -561,7 +604,7 @@ class Server(Entity):
for arg, attr, eca, pefim in [
('encrypted_advice_attributes', 'verify_encrypt_cert_advice',
- 'encrypt_cert_advice', kwargs["pefim"]),
+ 'encrypt_cert_advice', kwargs["pefim"]),
('encrypt_assertion', 'verify_encrypt_cert_assertion',
'encrypt_cert_assertion', False)]:
@@ -611,6 +654,12 @@ class Server(Entity):
else:
args['name_id'] = kwargs['name_id']
+ for param in ['status', 'assertion_args']:
+ try:
+ args[param] = kwargs[param]
+ except KeyError:
+ pass
+
return args
def create_authn_response(self, identity, in_response_to, destination,
@@ -633,7 +682,7 @@ class Server(Entity):
:param sp_entity_id: The entity identifier of the Service Provider
:param name_id_policy: How the NameID should be constructed
:param userid: The subject identifier
- :param name_id: The identifier of the subject.
+ :param name_id: The identifier of the subject. A saml.NameID instance.
:param authn: Dictionary with information about the authentication
context
:param issuer: Issuer of the response
@@ -663,7 +712,8 @@ class Server(Entity):
encrypt_cert_advice=encrypt_cert_advice,
encrypt_cert_assertion=encrypt_cert_assertion,
encrypt_assertion=encrypt_assertion,
- encrypt_assertion_self_contained=encrypt_assertion_self_contained,
+ encrypt_assertion_self_contained
+ =encrypt_assertion_self_contained,
encrypted_advice_attributes=encrypted_advice_attributes,
pefim=pefim, **kwargs)
except IOError as exc:
@@ -675,8 +725,7 @@ class Server(Entity):
try:
_authn = authn
- if (
- sign_assertion or sign_response) and \
+ if (sign_assertion or sign_response) and \
self.sec.cert_handler.generate_cert():
with self.lock:
self.sec.cert_handler.update_cert(True)
diff --git a/src/saml2/sigver.py b/src/saml2/sigver.py
index a02325a0..30be593e 100644
--- a/src/saml2/sigver.py
+++ b/src/saml2/sigver.py
@@ -154,6 +154,12 @@ def rm_xmltag(statement):
def signed(item):
+ """
+ Is any part of the document signed ?
+
+ :param item: A Samlbase instance
+ :return: True if some part of it is signed
+ """
if SIG in item.c_children.keys() and item.signature:
return True
else:
diff --git a/tests/metadata.xml b/tests/metadata.xml
index 3081faf5..0c82b68a 100644
--- a/tests/metadata.xml
+++ b/tests/metadata.xml
@@ -1,34 +1,93 @@
<?xml version='1.0' encoding='UTF-8'?>
-<ns0:EntitiesDescriptor name="urn:mace:example.com:saml:test" validUntil="2020-12-04T17:31:07Z" xmlns:ns0="urn:oasis:names:tc:SAML:2.0:metadata"><ns0:EntityDescriptor entityID="urn:mace:example.com:saml:roland:sp"><ns0:SPSSODescriptor AuthnRequestsSigned="False" WantAssertionsSigned="True" protocolSupportEnumeration="urn:oasis:names:tc:SAML:2.0:protocol"><ns0:KeyDescriptor><ns1:KeyInfo xmlns:ns1="http://www.w3.org/2000/09/xmldsig#"><ns1:X509Data><ns1:X509Certificate>MIIC8jCCAlugAwIBAgIJAJHg2V5J31I8MA0GCSqGSIb3DQEBBQUAMFoxCzAJBgNV
-BAYTAlNFMQ0wCwYDVQQHEwRVbWVhMRgwFgYDVQQKEw9VbWVhIFVuaXZlcnNpdHkx
-EDAOBgNVBAsTB0lUIFVuaXQxEDAOBgNVBAMTB1Rlc3QgU1AwHhcNMDkxMDI2MTMz
-MTE1WhcNMTAxMDI2MTMzMTE1WjBaMQswCQYDVQQGEwJTRTENMAsGA1UEBxMEVW1l
-YTEYMBYGA1UEChMPVW1lYSBVbml2ZXJzaXR5MRAwDgYDVQQLEwdJVCBVbml0MRAw
-DgYDVQQDEwdUZXN0IFNQMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDkJWP7
-bwOxtH+E15VTaulNzVQ/0cSbM5G7abqeqSNSs0l0veHr6/ROgW96ZeQ57fzVy2MC
-FiQRw2fzBs0n7leEmDJyVVtBTavYlhAVXDNa3stgvh43qCfLx+clUlOvtnsoMiiR
-mo7qf0BoPKTj7c0uLKpDpEbAHQT4OF1HRYVxMwIDAQABo4G/MIG8MB0GA1UdDgQW
-BBQ7RgbMJFDGRBu9o3tDQDuSoBy7JjCBjAYDVR0jBIGEMIGBgBQ7RgbMJFDGRBu9
-o3tDQDuSoBy7JqFepFwwWjELMAkGA1UEBhMCU0UxDTALBgNVBAcTBFVtZWExGDAW
-BgNVBAoTD1VtZWEgVW5pdmVyc2l0eTEQMA4GA1UECxMHSVQgVW5pdDEQMA4GA1UE
-AxMHVGVzdCBTUIIJAJHg2V5J31I8MAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEF
-BQADgYEAMuRwwXRnsiyWzmRikpwinnhTmbooKm5TINPE7A7gSQ710RxioQePPhZO
-zkM27NnHTrCe2rBVg0EGz7QTd1JIwLPvgoj4VTi/fSha/tXrYUaqc9AqU1kWI4WN
-+vffBGQ09mo+6CffuFTZYeOhzP/2stAPwCTU4kxEoiy0KpZMANI=
-</ns1:X509Certificate></ns1:X509Data></ns1:KeyInfo></ns0:KeyDescriptor><ns0:AssertionConsumerService Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST" Location="http://localhost:8087/" index="0" /></ns0:SPSSODescriptor><ns0:Organization><ns0:OrganizationURL xml:lang="en">http://www.example.com/</ns0:OrganizationURL><ns0:OrganizationName xml:lang="en">Example Co</ns0:OrganizationName><ns0:OrganizationDisplayName xml:lang="en">Example Co</ns0:OrganizationDisplayName></ns0:Organization><ns0:ContactPerson contactType="technical"><ns0:GivenName>Roland</ns0:GivenName><ns0:SurName>Hedberg</ns0:SurName><ns0:EmailAddress>roland.hedberg@example.com</ns0:EmailAddress></ns0:ContactPerson></ns0:EntityDescriptor><ns0:EntityDescriptor entityID="urn:mace:example.com:saml:roland:idp"><ns0:IDPSSODescriptor WantAuthnRequestsSigned="True" protocolSupportEnumeration="urn:oasis:names:tc:SAML:2.0:protocol"><ns0:KeyDescriptor><ns1:KeyInfo xmlns:ns1="http://www.w3.org/2000/09/xmldsig#"><ns1:X509Data><ns1:X509Certificate>MIIC8jCCAlugAwIBAgIJAJHg2V5J31I8MA0GCSqGSIb3DQEBBQUAMFoxCzAJBgNV
-BAYTAlNFMQ0wCwYDVQQHEwRVbWVhMRgwFgYDVQQKEw9VbWVhIFVuaXZlcnNpdHkx
-EDAOBgNVBAsTB0lUIFVuaXQxEDAOBgNVBAMTB1Rlc3QgU1AwHhcNMDkxMDI2MTMz
-MTE1WhcNMTAxMDI2MTMzMTE1WjBaMQswCQYDVQQGEwJTRTENMAsGA1UEBxMEVW1l
-YTEYMBYGA1UEChMPVW1lYSBVbml2ZXJzaXR5MRAwDgYDVQQLEwdJVCBVbml0MRAw
-DgYDVQQDEwdUZXN0IFNQMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDkJWP7
-bwOxtH+E15VTaulNzVQ/0cSbM5G7abqeqSNSs0l0veHr6/ROgW96ZeQ57fzVy2MC
-FiQRw2fzBs0n7leEmDJyVVtBTavYlhAVXDNa3stgvh43qCfLx+clUlOvtnsoMiiR
-mo7qf0BoPKTj7c0uLKpDpEbAHQT4OF1HRYVxMwIDAQABo4G/MIG8MB0GA1UdDgQW
-BBQ7RgbMJFDGRBu9o3tDQDuSoBy7JjCBjAYDVR0jBIGEMIGBgBQ7RgbMJFDGRBu9
-o3tDQDuSoBy7JqFepFwwWjELMAkGA1UEBhMCU0UxDTALBgNVBAcTBFVtZWExGDAW
-BgNVBAoTD1VtZWEgVW5pdmVyc2l0eTEQMA4GA1UECxMHSVQgVW5pdDEQMA4GA1UE
-AxMHVGVzdCBTUIIJAJHg2V5J31I8MAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEF
-BQADgYEAMuRwwXRnsiyWzmRikpwinnhTmbooKm5TINPE7A7gSQ710RxioQePPhZO
-zkM27NnHTrCe2rBVg0EGz7QTd1JIwLPvgoj4VTi/fSha/tXrYUaqc9AqU1kWI4WN
-+vffBGQ09mo+6CffuFTZYeOhzP/2stAPwCTU4kxEoiy0KpZMANI=
-</ns1:X509Certificate></ns1:X509Data></ns1:KeyInfo></ns0:KeyDescriptor><ns0:SingleSignOnService Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect" Location="http://localhost:8088/sso/" /></ns0:IDPSSODescriptor><ns0:Organization><ns0:OrganizationURL xml:lang="en">http://www.example.com/</ns0:OrganizationURL><ns0:OrganizationName xml:lang="en">Example Co</ns0:OrganizationName><ns0:OrganizationDisplayName xml:lang="en">Example Co</ns0:OrganizationDisplayName></ns0:Organization><ns0:ContactPerson contactType="technical"><ns0:GivenName>Roland</ns0:GivenName><ns0:SurName>Hedberg</ns0:SurName><ns0:EmailAddress>roland.hedberg@example.com</ns0:EmailAddress></ns0:ContactPerson></ns0:EntityDescriptor></ns0:EntitiesDescriptor>
+<ns0:EntitiesDescriptor name="urn:mace:example.com:saml:test"
+ validUntil="2020-12-04T17:31:07Z"
+ xmlns:ns0="urn:oasis:names:tc:SAML:2.0:metadata">
+ <ns0:EntityDescriptor entityID="urn:mace:example.com:saml:roland:sp">
+ <ns0:SPSSODescriptor AuthnRequestsSigned="False" WantAssertionsSigned="True"
+ protocolSupportEnumeration="urn:oasis:names:tc:SAML:2.0:protocol">
+ <ns0:KeyDescriptor>
+ <ns1:KeyInfo xmlns:ns1="http://www.w3.org/2000/09/xmldsig#">
+ <ns1:X509Data>
+ <ns1:X509Certificate>
+ MIIC8jCCAlugAwIBAgIJAJHg2V5J31I8MA0GCSqGSIb3DQEBBQUAMFoxCzAJBgNV
+ BAYTAlNFMQ0wCwYDVQQHEwRVbWVhMRgwFgYDVQQKEw9VbWVhIFVuaXZlcnNpdHkx
+ EDAOBgNVBAsTB0lUIFVuaXQxEDAOBgNVBAMTB1Rlc3QgU1AwHhcNMDkxMDI2MTMz
+ MTE1WhcNMTAxMDI2MTMzMTE1WjBaMQswCQYDVQQGEwJTRTENMAsGA1UEBxMEVW1l
+ YTEYMBYGA1UEChMPVW1lYSBVbml2ZXJzaXR5MRAwDgYDVQQLEwdJVCBVbml0MRAw
+ DgYDVQQDEwdUZXN0IFNQMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDkJWP7
+ bwOxtH+E15VTaulNzVQ/0cSbM5G7abqeqSNSs0l0veHr6/ROgW96ZeQ57fzVy2MC
+ FiQRw2fzBs0n7leEmDJyVVtBTavYlhAVXDNa3stgvh43qCfLx+clUlOvtnsoMiiR
+ mo7qf0BoPKTj7c0uLKpDpEbAHQT4OF1HRYVxMwIDAQABo4G/MIG8MB0GA1UdDgQW
+ BBQ7RgbMJFDGRBu9o3tDQDuSoBy7JjCBjAYDVR0jBIGEMIGBgBQ7RgbMJFDGRBu9
+ o3tDQDuSoBy7JqFepFwwWjELMAkGA1UEBhMCU0UxDTALBgNVBAcTBFVtZWExGDAW
+ BgNVBAoTD1VtZWEgVW5pdmVyc2l0eTEQMA4GA1UECxMHSVQgVW5pdDEQMA4GA1UE
+ AxMHVGVzdCBTUIIJAJHg2V5J31I8MAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEF
+ BQADgYEAMuRwwXRnsiyWzmRikpwinnhTmbooKm5TINPE7A7gSQ710RxioQePPhZO
+ zkM27NnHTrCe2rBVg0EGz7QTd1JIwLPvgoj4VTi/fSha/tXrYUaqc9AqU1kWI4WN
+ +vffBGQ09mo+6CffuFTZYeOhzP/2stAPwCTU4kxEoiy0KpZMANI=
+ </ns1:X509Certificate>
+ </ns1:X509Data>
+ </ns1:KeyInfo>
+ </ns0:KeyDescriptor>
+ <ns0:AssertionConsumerService
+ Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST"
+ Location="http://localhost:8087/" index="0"/>
+ </ns0:SPSSODescriptor>
+ <ns0:Organization>
+ <ns0:OrganizationURL xml:lang="en">http://www.example.com/
+ </ns0:OrganizationURL>
+ <ns0:OrganizationName xml:lang="en">Example Co</ns0:OrganizationName>
+ <ns0:OrganizationDisplayName xml:lang="en">Example Co
+ </ns0:OrganizationDisplayName>
+ </ns0:Organization>
+ <ns0:ContactPerson contactType="technical">
+ <ns0:GivenName>Roland</ns0:GivenName>
+ <ns0:SurName>Hedberg</ns0:SurName>
+ <ns0:EmailAddress>roland.hedberg@example.com</ns0:EmailAddress>
+ </ns0:ContactPerson>
+ </ns0:EntityDescriptor>
+ <ns0:EntityDescriptor entityID="urn:mace:example.com:saml:roland:idp">
+ <ns0:IDPSSODescriptor WantAuthnRequestsSigned="True"
+ protocolSupportEnumeration="urn:oasis:names:tc:SAML:2.0:protocol">
+ <ns0:KeyDescriptor>
+ <ns1:KeyInfo xmlns:ns1="http://www.w3.org/2000/09/xmldsig#">
+ <ns1:X509Data>
+ <ns1:X509Certificate>
+ MIIC8jCCAlugAwIBAgIJAJHg2V5J31I8MA0GCSqGSIb3DQEBBQUAMFoxCzAJBgNV
+ BAYTAlNFMQ0wCwYDVQQHEwRVbWVhMRgwFgYDVQQKEw9VbWVhIFVuaXZlcnNpdHkx
+ EDAOBgNVBAsTB0lUIFVuaXQxEDAOBgNVBAMTB1Rlc3QgU1AwHhcNMDkxMDI2MTMz
+ MTE1WhcNMTAxMDI2MTMzMTE1WjBaMQswCQYDVQQGEwJTRTENMAsGA1UEBxMEVW1l
+ YTEYMBYGA1UEChMPVW1lYSBVbml2ZXJzaXR5MRAwDgYDVQQLEwdJVCBVbml0MRAw
+ DgYDVQQDEwdUZXN0IFNQMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDkJWP7
+ bwOxtH+E15VTaulNzVQ/0cSbM5G7abqeqSNSs0l0veHr6/ROgW96ZeQ57fzVy2MC
+ FiQRw2fzBs0n7leEmDJyVVtBTavYlhAVXDNa3stgvh43qCfLx+clUlOvtnsoMiiR
+ mo7qf0BoPKTj7c0uLKpDpEbAHQT4OF1HRYVxMwIDAQABo4G/MIG8MB0GA1UdDgQW
+ BBQ7RgbMJFDGRBu9o3tDQDuSoBy7JjCBjAYDVR0jBIGEMIGBgBQ7RgbMJFDGRBu9
+ o3tDQDuSoBy7JqFepFwwWjELMAkGA1UEBhMCU0UxDTALBgNVBAcTBFVtZWExGDAW
+ BgNVBAoTD1VtZWEgVW5pdmVyc2l0eTEQMA4GA1UECxMHSVQgVW5pdDEQMA4GA1UE
+ AxMHVGVzdCBTUIIJAJHg2V5J31I8MAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEF
+ BQADgYEAMuRwwXRnsiyWzmRikpwinnhTmbooKm5TINPE7A7gSQ710RxioQePPhZO
+ zkM27NnHTrCe2rBVg0EGz7QTd1JIwLPvgoj4VTi/fSha/tXrYUaqc9AqU1kWI4WN
+ +vffBGQ09mo+6CffuFTZYeOhzP/2stAPwCTU4kxEoiy0KpZMANI=
+ </ns1:X509Certificate>
+ </ns1:X509Data>
+ </ns1:KeyInfo>
+ </ns0:KeyDescriptor>
+ <ns0:SingleSignOnService
+ Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect"
+ Location="http://localhost:8088/sso/"/>
+ </ns0:IDPSSODescriptor>
+ <ns0:Organization>
+ <ns0:OrganizationURL xml:lang="en">http://www.example.com/
+ </ns0:OrganizationURL>
+ <ns0:OrganizationName xml:lang="en">Example Co</ns0:OrganizationName>
+ <ns0:OrganizationDisplayName xml:lang="en">Example Co
+ </ns0:OrganizationDisplayName>
+ </ns0:Organization>
+ <ns0:ContactPerson contactType="technical">
+ <ns0:GivenName>Roland</ns0:GivenName>
+ <ns0:SurName>Hedberg</ns0:SurName>
+ <ns0:EmailAddress>roland.hedberg@example.com</ns0:EmailAddress>
+ </ns0:ContactPerson>
+ </ns0:EntityDescriptor>
+</ns0:EntitiesDescriptor>
diff --git a/tests/test_12_s_utils.py b/tests/test_12_s_utils.py
index ca530039..c0293c55 100644
--- a/tests/test_12_s_utils.py
+++ b/tests/test_12_s_utils.py
@@ -8,9 +8,10 @@ import six
from saml2 import s_utils as utils
from saml2 import saml
from saml2 import samlp
+from saml2.argtree import set_arg
from saml2.s_utils import do_attribute_statement
-from saml2.saml import Attribute
+from saml2.saml import Attribute, Subject
from saml2.saml import NAME_FORMAT_URI
from py.test import raises
@@ -20,26 +21,30 @@ from pathutils import full_path
XML_HEADER = '<?xml version=\'1.0\' encoding=\'UTF-8\'?>\n'
SUCCESS_STATUS_NO_HEADER = (
-'<ns0:Status xmlns:ns0="urn:oasis:names:tc:SAML:2.0:protocol"><ns0:StatusCode '
-'Value="urn:oasis:names:tc:SAML:2.0:status:Success" /></ns0:Status>')
+ '<ns0:Status xmlns:ns0="urn:oasis:names:tc:SAML:2.0:protocol"><ns0'
+ ':StatusCode '
+ 'Value="urn:oasis:names:tc:SAML:2.0:status:Success" /></ns0:Status>')
SUCCESS_STATUS = '%s%s' % (XML_HEADER, SUCCESS_STATUS_NO_HEADER)
ERROR_STATUS_NO_HEADER = (
-'<ns0:Status xmlns:ns0="urn:oasis:names:tc:SAML:2.0:protocol"><ns0:StatusCode '
-'Value="urn:oasis:names:tc:SAML:2.0:status:Responder"><ns0:StatusCode '
-'Value="urn:oasis:names:tc:SAML:2.0:status:UnknownPrincipal" '
-'/></ns0:StatusCode><ns0:StatusMessage>Error resolving '
-'principal</ns0:StatusMessage></ns0:Status>')
+ '<ns0:Status xmlns:ns0="urn:oasis:names:tc:SAML:2.0:protocol"><ns0'
+ ':StatusCode '
+ 'Value="urn:oasis:names:tc:SAML:2.0:status:Responder"><ns0:StatusCode '
+ 'Value="urn:oasis:names:tc:SAML:2.0:status:UnknownPrincipal" '
+ '/></ns0:StatusCode><ns0:StatusMessage>Error resolving '
+ 'principal</ns0:StatusMessage></ns0:Status>')
ERROR_STATUS_NO_HEADER_EMPTY = (
-'<ns0:Status xmlns:ns0="urn:oasis:names:tc:SAML:2.0:protocol"><ns0:StatusCode '
-'Value="urn:oasis:names:tc:SAML:2.0:status:Responder"><ns0:StatusCode '
-'Value="urn:oasis:names:tc:SAML:2.0:status:UnknownPrincipal" '
-'/></ns0:StatusCode></ns0:Status>')
+ '<ns0:Status xmlns:ns0="urn:oasis:names:tc:SAML:2.0:protocol"><ns0'
+ ':StatusCode '
+ 'Value="urn:oasis:names:tc:SAML:2.0:status:Responder"><ns0:StatusCode '
+ 'Value="urn:oasis:names:tc:SAML:2.0:status:UnknownPrincipal" '
+ '/></ns0:StatusCode></ns0:Status>')
ERROR_STATUS = '%s%s' % (XML_HEADER, ERROR_STATUS_NO_HEADER)
ERROR_STATUS_EMPTY = '%s%s' % (XML_HEADER, ERROR_STATUS_NO_HEADER_EMPTY)
+
def _eq(l1, l2):
return set(l1) == set(l2)
@@ -213,7 +218,7 @@ def test_conditions():
def test_value_1():
- #FriendlyName="givenName" Name="urn:oid:2.5.4.42"
+ # FriendlyName="givenName" Name="urn:oid:2.5.4.42"
# NameFormat="urn:oasis:names:tc:SAML:2.0:attrname-format:uri"
attribute = utils.factory(saml.Attribute, name="urn:oid:2.5.4.42",
name_format=NAME_FORMAT_URI)
@@ -530,3 +535,14 @@ def test_signature():
arr.append(csum)
assert utils.verify_signature("abcdef", arr)
+
+
+def test_complex_factory():
+ r = set_arg(Subject, 'in_response_to', '123456')
+ subject = utils.factory(Subject, **r[0])
+ assert _eq(subject.keyswv(), ['subject_confirmation'])
+ assert _eq(subject.subject_confirmation.keyswv(),
+ ['subject_confirmation_data'])
+ assert _eq(subject.subject_confirmation.subject_confirmation_data.keyswv(),
+ ['in_response_to'])
+ assert subject.subject_confirmation.subject_confirmation_data.in_response_to == '123456'
diff --git a/tests/test_19_attribute_converter.py b/tests/test_19_attribute_converter.py
index 5763d100..01960f6c 100644
--- a/tests/test_19_attribute_converter.py
+++ b/tests/test_19_attribute_converter.py
@@ -230,11 +230,13 @@ def test_noop_attribute_conversion():
ava = """<?xml version='1.0' encoding='UTF-8'?>
<ns0:Attribute xmlns:ns0="urn:oasis:names:tc:SAML:2.0:assertion"
-xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-FriendlyName="schacHomeOrganization" Name="urn:oid:1.3.6.1.4.1.25178.1.2.9"
-NameFormat="urn:oasis:names:tc:SAML:2.0:attrname-format:uri"><ns0
-:AttributeValue xsi:nil="true"
-xsi:type="xs:string">uu.se</ns0:AttributeValue></ns0:Attribute>"""
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ FriendlyName="schacHomeOrganization" Name="urn:oid:1.3.6.1.4.1.25178.1.2.9"
+ NameFormat="urn:oasis:names:tc:SAML:2.0:attrname-format:uri">
+ <ns0:AttributeValue xsi:nil="true" xsi:type="xs:string">
+ uu.se
+ </ns0:AttributeValue>
+</ns0:Attribute>"""
def test_schac():
diff --git a/tests/test_20_assertion.py b/tests/test_20_assertion.py
index 779a6efc..1e656ea7 100644
--- a/tests/test_20_assertion.py
+++ b/tests/test_20_assertion.py
@@ -810,10 +810,16 @@ def test_assertion_with_noop_attribute_conv():
})
name_id = NameID(format=NAMEID_FORMAT_TRANSIENT, text="foobar")
issuer = Issuer(text="entityid", format=NAMEID_FORMAT_ENTITY)
- msg = ast.construct("sp_entity_id", "in_response_to", "consumer_url",
- name_id, [AttributeConverterNOOP(NAME_FORMAT_URI)],
- policy, issuer=issuer, authn_decl=ACD,
- authn_auth="authn_authn")
+ subject_confirmation_specs = {
+ 'recipient': 'consumer_url',
+ 'in_response_to': 'in_response_to',
+ 'subject_confirmation_method': saml.SCM_BEARER
+ }
+ msg = ast.construct(
+ "sp_entity_id", [AttributeConverterNOOP(NAME_FORMAT_URI)], policy,
+ issuer=issuer, authn_decl=ACD, name_id=name_id,
+ authn_auth="authn_authn",
+ subject_confirmation_specs=subject_confirmation_specs)
print(msg)
for attr in msg.attribute_statement[0].attribute:
@@ -858,10 +864,16 @@ def test_assertion_with_zero_attributes():
})
name_id = NameID(format=NAMEID_FORMAT_TRANSIENT, text="foobar")
issuer = Issuer(text="entityid", format=NAMEID_FORMAT_ENTITY)
- msg = ast.construct("sp_entity_id", "in_response_to", "consumer_url",
- name_id, [AttributeConverterNOOP(NAME_FORMAT_URI)],
- policy, issuer=issuer, authn_decl=ACD,
- authn_auth="authn_authn")
+ subject_confirmation_specs = {
+ 'recipient': 'consumer_url',
+ 'in_response_to': 'in_response_to',
+ 'subject_confirmation_method': saml.SCM_BEARER
+ }
+
+ msg = ast.construct(
+ "sp_entity_id", [AttributeConverterNOOP(NAME_FORMAT_URI)], policy,
+ issuer=issuer, authn_decl=ACD, authn_auth="authn_authn",
+ name_id=name_id, subject_confirmation_specs=subject_confirmation_specs)
print(msg)
assert msg.attribute_statement == []
@@ -879,11 +891,18 @@ def test_assertion_with_authn_instant():
})
name_id = NameID(format=NAMEID_FORMAT_TRANSIENT, text="foobar")
issuer = Issuer(text="entityid", format=NAMEID_FORMAT_ENTITY)
- msg = ast.construct("sp_entity_id", "in_response_to", "consumer_url",
- name_id, [AttributeConverterNOOP(NAME_FORMAT_URI)],
- policy, issuer=issuer, authn_decl=ACD,
- authn_auth="authn_authn",
- authn_instant=1234567890)
+
+ subject_confirmation_specs = {
+ 'recipient': 'consumer_url',
+ 'in_response_to': 'in_response_to',
+ 'subject_confirmation_method': saml.SCM_BEARER
+ }
+
+ msg = ast.construct(
+ "sp_entity_id", [AttributeConverterNOOP(NAME_FORMAT_URI)], policy,
+ issuer=issuer, authn_decl=ACD, authn_auth="authn_authn",
+ authn_instant=1234567890, name_id=name_id,
+ subject_confirmation_specs=subject_confirmation_specs)
print(msg)
assert msg.authn_statement[0].authn_instant == "2009-02-13T23:31:30Z"
diff --git a/tests/test_51_client.py b/tests/test_51_client.py
index be250fac..166322b4 100644
--- a/tests/test_51_client.py
+++ b/tests/test_51_client.py
@@ -811,15 +811,22 @@ class TestClient:
format=saml.NAMEID_FORMAT_PERSISTENT)
asser = Assertion({"givenName": "Derek", "surName": "Jeter"})
+ subject_confirmation_specs = {
+ 'recipient': "http://lingon.catalogix.se:8087/",
+ 'in_response_to': "_012345",
+ 'subject_confirmation_method': saml.SCM_BEARER
+ }
+
assertion = asser.construct(
- self.client.config.entityid, "_012345",
- "http://lingon.catalogix.se:8087/",
- factory(saml.NameID, format=saml.NAMEID_FORMAT_TRANSIENT),
- policy=self.server.config.getattr("policy", "idp"),
+ self.client.config.entityid,
+ self.server.config.attribute_converters,
+ self.server.config.getattr("policy", "idp"),
+ name_id=factory(saml.NameID, format=saml.NAMEID_FORMAT_TRANSIENT),
issuer=self.server._issuer(),
- attrconvs=self.server.config.attribute_converters,
authn_class=INTERNETPROTOCOLPASSWORD,
- authn_auth="http://www.example.com/login")
+ authn_auth="http://www.example.com/login",
+ subject_confirmation_specs=subject_confirmation_specs
+ )
assertion.signature = sigver.pre_signature_part(
assertion.id, _sec.my_cert, 1)
@@ -866,26 +873,34 @@ class TestClient:
format=saml.NAMEID_FORMAT_PERSISTENT)
asser = Assertion({"givenName": "Derek", "surName": "Jeter"})
+
+ subject_confirmation_specs = {
+ 'recipient': "http://lingon.catalogix.se:8087/",
+ 'in_response_to': "_012345",
+ 'subject_confirmation_method': saml.SCM_BEARER
+ }
+ name_id = factory(saml.NameID, format=saml.NAMEID_FORMAT_TRANSIENT)
+
assertion = asser.construct(
- self.client.config.entityid, "_012345",
- "http://lingon.catalogix.se:8087/",
- factory(saml.NameID, format=saml.NAMEID_FORMAT_TRANSIENT),
- policy=self.server.config.getattr("policy", "idp"),
+ self.client.config.entityid,
+ self.server.config.attribute_converters,
+ self.server.config.getattr("policy", "idp"),
issuer=self.server._issuer(),
- attrconvs=self.server.config.attribute_converters,
+ name_id=name_id,
authn_class=INTERNETPROTOCOLPASSWORD,
- authn_auth="http://www.example.com/login")
+ authn_auth="http://www.example.com/login",
+ subject_confirmation_specs=subject_confirmation_specs)
a_asser = Assertion({"uid": "test01", "email": "test.testsson@test.se"})
a_assertion = a_asser.construct(
- self.client.config.entityid, "_012345",
- "http://lingon.catalogix.se:8087/",
- factory(saml.NameID, format=saml.NAMEID_FORMAT_TRANSIENT),
- policy=self.server.config.getattr("policy", "idp"),
+ self.client.config.entityid,
+ self.server.config.attribute_converters,
+ self.server.config.getattr("policy", "idp"),
issuer=self.server._issuer(),
- attrconvs=self.server.config.attribute_converters,
authn_class=INTERNETPROTOCOLPASSWORD,
- authn_auth="http://www.example.com/login")
+ authn_auth="http://www.example.com/login",
+ name_id=name_id,
+ subject_confirmation_specs=subject_confirmation_specs)
a_assertion.signature = sigver.pre_signature_part(
a_assertion.id, _sec.my_cert, 1)
@@ -944,70 +959,80 @@ class TestClient:
format=saml.NAMEID_FORMAT_PERSISTENT)
asser_1 = Assertion({"givenName": "Derek"})
+
+ subject_confirmation_specs = {
+ 'recipient': "http://lingon.catalogix.se:8087/",
+ 'in_response_to': "_012345",
+ 'subject_confirmation_method': saml.SCM_BEARER
+ }
+ name_id = factory(saml.NameID, format=saml.NAMEID_FORMAT_TRANSIENT)
+
assertion_1 = asser_1.construct(
- self.client.config.entityid, "_012345",
- "http://lingon.catalogix.se:8087/",
- factory(saml.NameID, format=saml.NAMEID_FORMAT_TRANSIENT),
- policy=self.server.config.getattr("policy", "idp"),
+ self.client.config.entityid,
+ self.server.config.attribute_converters,
+ self.server.config.getattr("policy", "idp"),
issuer=self.server._issuer(),
- attrconvs=self.server.config.attribute_converters,
authn_class=INTERNETPROTOCOLPASSWORD,
- authn_auth="http://www.example.com/login")
+ authn_auth="http://www.example.com/login",
+ name_id=name_id,
+ subject_confirmation_specs=subject_confirmation_specs)
asser_2 = Assertion({"surName": "Jeter"})
+
assertion_2 = asser_2.construct(
- self.client.config.entityid, "_012345",
- "http://lingon.catalogix.se:8087/",
- factory(saml.NameID, format=saml.NAMEID_FORMAT_TRANSIENT),
- policy=self.server.config.getattr("policy", "idp"),
+ self.client.config.entityid,
+ self.server.config.attribute_converters,
+ self.server.config.getattr("policy", "idp"),
issuer=self.server._issuer(),
- attrconvs=self.server.config.attribute_converters,
authn_class=INTERNETPROTOCOLPASSWORD,
- authn_auth="http://www.example.com/login")
+ authn_auth="http://www.example.com/login",
+ name_id=name_id,
+ subject_confirmation_specs=subject_confirmation_specs)
a_asser_1 = Assertion({"uid": "test01"})
a_assertion_1 = a_asser_1.construct(
- self.client.config.entityid, "_012345",
- "http://lingon.catalogix.se:8087/",
- factory(saml.NameID, format=saml.NAMEID_FORMAT_TRANSIENT),
- policy=self.server.config.getattr("policy", "idp"),
+ self.client.config.entityid,
+ self.server.config.attribute_converters,
+ self.server.config.getattr("policy", "idp"),
issuer=self.server._issuer(),
- attrconvs=self.server.config.attribute_converters,
authn_class=INTERNETPROTOCOLPASSWORD,
- authn_auth="http://www.example.com/login")
+ authn_auth="http://www.example.com/login",
+ name_id=name_id,
+ subject_confirmation_specs=subject_confirmation_specs)
+
a_asser_2 = Assertion({"email": "test.testsson@test.se"})
a_assertion_2 = a_asser_2.construct(
- self.client.config.entityid, "_012345",
- "http://lingon.catalogix.se:8087/",
- factory(saml.NameID, format=saml.NAMEID_FORMAT_TRANSIENT),
- policy=self.server.config.getattr("policy", "idp"),
+ self.client.config.entityid,
+ self.server.config.attribute_converters,
+ self.server.config.getattr("policy", "idp"),
issuer=self.server._issuer(),
- attrconvs=self.server.config.attribute_converters,
authn_class=INTERNETPROTOCOLPASSWORD,
- authn_auth="http://www.example.com/login")
+ authn_auth="http://www.example.com/login",
+ name_id=name_id,
+ subject_confirmation_specs=subject_confirmation_specs)
a_asser_3 = Assertion({"street": "street"})
a_assertion_3 = a_asser_3.construct(
- self.client.config.entityid, "_012345",
- "http://lingon.catalogix.se:8087/",
- factory(saml.NameID, format=saml.NAMEID_FORMAT_TRANSIENT),
- policy=self.server.config.getattr("policy", "idp"),
+ self.client.config.entityid,
+ self.server.config.attribute_converters,
+ self.server.config.getattr("policy", "idp"),
issuer=self.server._issuer(),
- attrconvs=self.server.config.attribute_converters,
authn_class=INTERNETPROTOCOLPASSWORD,
- authn_auth="http://www.example.com/login")
+ authn_auth="http://www.example.com/login",
+ name_id=name_id,
+ subject_confirmation_specs=subject_confirmation_specs)
a_asser_4 = Assertion({"title": "title"})
a_assertion_4 = a_asser_4.construct(
- self.client.config.entityid, "_012345",
- "http://lingon.catalogix.se:8087/",
- factory(saml.NameID, format=saml.NAMEID_FORMAT_TRANSIENT),
- policy=self.server.config.getattr("policy", "idp"),
+ self.client.config.entityid,
+ self.server.config.attribute_converters,
+ self.server.config.getattr("policy", "idp"),
issuer=self.server._issuer(),
- attrconvs=self.server.config.attribute_converters,
authn_class=INTERNETPROTOCOLPASSWORD,
- authn_auth="http://www.example.com/login")
+ authn_auth="http://www.example.com/login",
+ name_id=name_id,
+ subject_confirmation_specs=subject_confirmation_specs)
a_assertion_1.signature = sigver.pre_signature_part(
a_assertion_1.id, _sec.my_cert, 1)