summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvan Kanakarakis <ivan.kanak@gmail.com>2020-12-08 15:14:48 +0200
committerGitHub <noreply@github.com>2020-12-08 15:14:48 +0200
commitb4abecf337817ae09ca4d5fce95a58a2ac7307f9 (patch)
tree198ccce209fbc0174ca25638d02924e397053046
parent2f756bad04da7358449a78ae75804f08fb6d25e0 (diff)
parenteeaf5e0ba6a20a41e212233f3350ada10b35d767 (diff)
downloadpysaml2-b4abecf337817ae09ca4d5fce95a58a2ac7307f9.tar.gz
Merge pull request #744 from peppelinux/sign_dig_algs
-rw-r--r--docs/howto/config.rst19
-rw-r--r--src/saml2/client.py175
-rw-r--r--src/saml2/client_base.py395
-rw-r--r--src/saml2/config.py4
-rw-r--r--src/saml2/ecp.py37
-rw-r--r--src/saml2/entity.py604
-rw-r--r--src/saml2/s2repoze/plugins/sp.py8
-rw-r--r--src/saml2/server.py505
-rw-r--r--src/saml2/sigver.py69
-rw-r--r--tests/test_50_server.py15
-rw-r--r--tests/test_51_client.py39
-rw-r--r--tests/test_52_default_sign_alg.py26
12 files changed, 1248 insertions, 648 deletions
diff --git a/docs/howto/config.rst b/docs/howto/config.rst
index 88b0f6fa..0fa2401a 100644
--- a/docs/howto/config.rst
+++ b/docs/howto/config.rst
@@ -247,6 +247,7 @@ The globally unique identifier of the entity.
.. note:: It is recommended that the entityid should point to a real
webpage where the metadata for the entity can be found.
+
key_file
^^^^^^^^
@@ -1013,6 +1014,23 @@ Example::
}
}
+
+signing_algorithm
+"""""""""""""""""
+
+Default algorithm to be used. Example::
+
+ 'signing_algorithm': "http://www.w3.org/2001/04/xmldsig-more#rsa-sha512"
+
+
+digest_algorithm
+"""""""""""""""""
+
+Default algorithm to be used. Example::
+
+ 'digest_algorithm': "http://www.w3.org/2001/04/xmlenc#sha512"
+
+
logout_responses_signed
"""""""""""""""""""""""
@@ -1031,6 +1049,7 @@ Example::
}
}
+
subject_data
""""""""""""
diff --git a/src/saml2/client.py b/src/saml2/client.py
index 9809150e..3aaefcd1 100644
--- a/src/saml2/client.py
+++ b/src/saml2/client.py
@@ -14,8 +14,6 @@ from saml2 import BINDING_HTTP_REDIRECT
from saml2 import BINDING_HTTP_POST
from saml2 import BINDING_SOAP
-from saml2.xmldsig import DefaultSignature
-
from saml2.ident import decode, code
from saml2.httpbase import HTTPError
from saml2.s_utils import sid
@@ -50,6 +48,7 @@ class Saml2Client(Base):
consent=None, extensions=None,
sign=None,
sigalg=None,
+ digest_alg=None,
response_binding=saml2.BINDING_HTTP_POST,
**kwargs,
):
@@ -81,6 +80,7 @@ class Saml2Client(Base):
extensions=extensions,
sign=sign,
sigalg=sigalg,
+ digest_alg=digest_alg,
response_binding=response_binding,
**kwargs,
)
@@ -107,6 +107,7 @@ class Saml2Client(Base):
sign=None,
response_binding=saml2.BINDING_HTTP_POST,
sigalg=None,
+ digest_alg=None,
**kwargs,
):
""" Makes all necessary preparations for an authentication request
@@ -140,8 +141,8 @@ class Saml2Client(Base):
# XXX ^through self.create_authn_request(...)
# XXX - sign_redirect will add the signature to the query params
# XXX ^through self.apply_binding(...)
- sign_post = (binding == BINDING_HTTP_POST and sign)
- sign_redirect = (binding == BINDING_HTTP_REDIRECT and sign)
+ sign_post = False if binding == BINDING_HTTP_REDIRECT else sign
+ sign_redirect = False if binding == BINDING_HTTP_POST and sign else sign
reqid, request = self.create_authn_request(
destination,
@@ -153,6 +154,7 @@ class Saml2Client(Base):
extensions=extensions,
sign=sign_post,
sign_alg=sigalg,
+ digest_alg=digest_alg,
**kwargs,
)
@@ -172,8 +174,15 @@ class Saml2Client(Base):
else:
raise SignOnError("No supported bindings available for authentication")
- def global_logout(self, name_id, reason="", expire=None, sign=None,
- sign_alg=None, digest_alg=None):
+ def global_logout(
+ self,
+ name_id,
+ reason="",
+ expire=None,
+ sign=None,
+ sign_alg=None,
+ digest_alg=None,
+ ):
""" More or less a layer of indirection :-/
Bootstrapping the whole thing by finding all the IdPs that should
be notified.
@@ -198,12 +207,28 @@ class Saml2Client(Base):
# find out which IdPs/AAs I should notify
entity_ids = self.users.issuers_of_info(name_id)
- return self.do_logout(name_id, entity_ids, reason, expire, sign,
- sign_alg=sign_alg, digest_alg=digest_alg)
+ return self.do_logout(
+ name_id,
+ entity_ids,
+ reason,
+ expire,
+ sign,
+ sign_alg=sign_alg,
+ digest_alg=digest_alg,
+ )
- def do_logout(self, name_id, entity_ids, reason, expire, sign=None,
- expected_binding=None, sign_alg=None, digest_alg=None,
- **kwargs):
+ def do_logout(
+ self,
+ name_id,
+ entity_ids,
+ reason,
+ expire,
+ sign=None,
+ expected_binding=None,
+ sign_alg=None,
+ digest_alg=None,
+ **kwargs,
+ ):
"""
:param name_id: Identifier of the Subject (a NameID instance)
@@ -232,6 +257,7 @@ class Saml2Client(Base):
for binding in [BINDING_SOAP, BINDING_HTTP_POST, BINDING_HTTP_REDIRECT]:
if expected_binding and binding != expected_binding:
continue
+
try:
srvs = self.metadata.single_logout_service(
entity_id, binding, "idpsso"
@@ -245,6 +271,7 @@ class Saml2Client(Base):
destination = next(locations(srvs), None)
logger.info("destination to provider: %s", destination)
+
try:
session_info = self.users.get_info_from(
name_id, entity_id, False
@@ -252,37 +279,29 @@ class Saml2Client(Base):
session_indexes = [session_info['session_index']]
except KeyError:
session_indexes = None
+
+ sign_post = False if binding == BINDING_HTTP_REDIRECT else sign
+ sign_redirect = False if binding == BINDING_HTTP_POST and sign else sign
+
req_id, request = self.create_logout_request(
- destination, entity_id, name_id=name_id, reason=reason,
- expire=expire, session_indexes=session_indexes)
-
- sign = sign if sign is not None else self.logout_requests_signed
- def_sig = DefaultSignature()
- sign_alg = def_sig.get_sign_alg() if sign_alg is None else sign_alg
- digest_alg = (
- def_sig.get_digest_alg()
- if digest_alg is None
- else digest_alg
+ destination,
+ entity_id,
+ name_id=name_id,
+ reason=reason,
+ expire=expire,
+ session_indexes=session_indexes,
+ sign=sign_post,
+ sign_alg=sign_alg,
+ digest_alg=digest_alg,
)
- if sign:
- if binding == BINDING_HTTP_REDIRECT:
- srequest = str(request)
- else:
- srequest = self.sign(
- request, sign_alg=sign_alg, digest_alg=digest_alg
- )
- else:
- srequest = str(request)
-
relay_state = self._relay_state(req_id)
-
http_info = self.apply_binding(
binding,
- srequest,
+ str(request),
destination,
relay_state,
- sign=sign,
+ sign=sign_redirect,
sigalg=sign_alg,
)
@@ -355,11 +374,15 @@ class Saml2Client(Base):
status["entity_ids"].remove(issuer)
if "sign_alg" in status:
sign_alg = status["sign_alg"]
- return self.do_logout(decode(status["name_id"]),
- status["entity_ids"],
- status["reason"], status["not_on_or_after"],
- status["sign"], sign_alg=sign_alg,
- digest_alg=digest_alg)
+ return self.do_logout(
+ decode(status["name_id"]),
+ status["entity_ids"],
+ status["reason"],
+ status["not_on_or_after"],
+ status["sign"],
+ sign_alg=sign_alg,
+ digest_alg=digest_alg,
+ )
def _use_soap(self, destination, query_type, **kwargs):
_create_func = getattr(self, "create_%s" % query_type)
@@ -471,6 +494,7 @@ class Saml2Client(Base):
binding=BINDING_SOAP,
nsprefix=None,
sign_alg=None,
+ digest_alg=None,
):
""" Does a attribute request to an attribute authority, this is
by default done over SOAP.
@@ -500,10 +524,9 @@ class Saml2Client(Base):
response_args = {}
if not binding:
- binding, destination = self.pick_binding("attribute_service",
- None,
- "attribute_authority",
- entity_id=entityid)
+ binding, destination = self.pick_binding(
+ "attribute_service", None, "attribute_authority", entity_id=entityid
+ )
else:
srvs = self.metadata.attribute_service(entityid, binding)
if srvs is []:
@@ -512,37 +535,62 @@ class Saml2Client(Base):
destination = next(locations(srvs), None)
if binding == BINDING_SOAP:
- return self._use_soap(destination, "attribute_query",
- consent=consent, extensions=extensions,
- sign=sign, subject_id=subject_id,
- attribute=attribute,
- sp_name_qualifier=sp_name_qualifier,
- name_qualifier=name_qualifier,
- format=nameid_format,
- response_args=response_args)
+ return self._use_soap(
+ destination,
+ "attribute_query",
+ consent=consent,
+ extensions=extensions,
+ sign=sign,
+ sign_alg=sign_alg,
+ digest_alg=digest_alg,
+ subject_id=subject_id,
+ attribute=attribute,
+ sp_name_qualifier=sp_name_qualifier,
+ name_qualifier=name_qualifier,
+ format=nameid_format,
+ response_args=response_args,
+ )
elif binding == BINDING_HTTP_POST:
mid = sid()
- query = self.create_attribute_query(destination, subject_id,
- attribute, mid, consent,
- extensions, sign, nsprefix)
- self.state[query.id] = {"entity_id": entityid,
- "operation": "AttributeQuery",
- "subject_id": subject_id,
- "sign": sign}
+ query = self.create_attribute_query(
+ destination,
+ name_id=subject_id,
+ attribute=attribute,
+ message_id=mid,
+ consent=consent,
+ extensions=extensions,
+ sign=sign,
+ sign_alg=sign_alg,
+ digest_alg=digest_alg,
+ nsprefix=nsprefix,
+ )
+ self.state[query.id] = {
+ "entity_id": entityid,
+ "operation": "AttributeQuery",
+ "subject_id": subject_id,
+ "sign": sign,
+ }
relay_state = self._relay_state(query.id)
return self.apply_binding(
binding,
str(query),
destination,
relay_state,
- sign=sign,
+ sign=False,
sigalg=sign_alg,
)
else:
raise SAMLError("Unsupported binding")
def handle_logout_request(
- self, request, name_id, binding, sign=None, sign_alg=None, relay_state=""
+ self,
+ request,
+ name_id,
+ binding,
+ sign=None,
+ sign_alg=None,
+ digest_alg=None,
+ relay_state="",
):
"""
Deal with a LogoutRequest
@@ -592,7 +640,12 @@ class Saml2Client(Base):
sign = self.logout_responses_signed
response = self.create_logout_response(
- _req.message, response_bindings, status, sign, sign_alg=sign_alg
+ _req.message,
+ bindings=response_bindings,
+ status=status,
+ sign=sign,
+ sign_alg=sign_alg,
+ digest_alg=digest_alg,
)
rinfo = self.response_args(_req.message, response_bindings)
diff --git a/src/saml2/client_base.py b/src/saml2/client_base.py
index 889c4359..90a4fbd1 100644
--- a/src/saml2/client_base.py
+++ b/src/saml2/client_base.py
@@ -56,7 +56,7 @@ from saml2 import BINDING_PAOS
from saml2.xmldsig import SIG_ALLOWED_ALG
from saml2.xmldsig import DIGEST_ALLOWED_ALG
-from saml2.xmldsig import DefaultSignature
+
logger = logging.getLogger(__name__)
@@ -171,19 +171,17 @@ class Base(Entity):
"authn_requests_signed": False,
"want_assertions_signed": False,
"want_response_signed": True,
- "want_assertions_or_response_signed" : False
+ "want_assertions_or_response_signed": False,
}
-
for attr, val_default in attribute_defaults.items():
val_config = self.config.getattr(attr, "sp")
- if val_config is None:
- val = val_default
- else:
- val = val_config
-
+ val = (
+ val_config
+ if val_config is not None
+ else val_default
+ )
if val == 'true':
val = True
-
setattr(self, attr, val)
if self.entity_type == "sp" and not any(
@@ -236,6 +234,7 @@ class Base(Entity):
try:
srvs = self.metadata.single_sign_on_service(list(eids.keys())[0], binding)
return next(locations(srvs), None)
+
except IndexError:
raise IdpUnspecified("No IdP to send to given the premises")
@@ -284,6 +283,7 @@ class Base(Entity):
else:
return None
+ # XXX DONE sp create > _message
def create_authn_request(
self,
destination,
@@ -296,7 +296,7 @@ class Base(Entity):
consent=None,
extensions=None,
sign=None,
- sign_prepare=False,
+ sign_prepare=None,
sign_alg=None,
digest_alg=None,
allow_create=None,
@@ -448,66 +448,39 @@ class Base(Entity):
client_crt = kwargs.get("client_crt")
nsprefix = kwargs.get("nsprefix")
- # XXX will be used to embed the signature to the xml doc - ie, POST binding
- # XXX always called by the SP, no need to check the context
- sign = self.authn_requests_signed if sign is None else sign
- def_sig = DefaultSignature()
- sign_alg = sign_alg or def_sig.get_sign_alg()
- digest_alg = digest_alg or def_sig.get_digest_alg()
-
- if sign_alg not in [long_name for short_name, long_name in SIG_ALLOWED_ALG]:
- raise Exception(
- "Signature algo not in allowed list: {algo}".format(algo=sign_alg)
- )
- if digest_alg not in [long_name for short_name, long_name in DIGEST_ALLOWED_ALG]:
- raise Exception(
- "Digest algo not in allowed list: {algo}".format(algo=digest_alg)
- )
-
- if (sign and self.sec.cert_handler.generate_cert()) or client_crt is not None:
- with self.lock:
- self.sec.cert_handler.update_cert(True, client_crt)
- if client_crt is not None:
- sign_prepare = True
- msg = self._message(
- AuthnRequest,
- destination,
- message_id,
- consent,
- extensions,
- sign,
- sign_prepare,
- sign_alg=sign_alg,
- digest_alg=digest_alg,
- protocol_binding=binding,
- scoping=scoping,
- nsprefix=nsprefix,
- **args,
- )
- else:
- msg = self._message(
- AuthnRequest,
- destination,
- message_id,
- consent,
- extensions,
- sign,
- sign_prepare,
- sign_alg=sign_alg,
- digest_alg=digest_alg,
- protocol_binding=binding,
- scoping=scoping,
- nsprefix=nsprefix,
- **args,
- )
+ msg = self._message(
+ AuthnRequest,
+ destination,
+ message_id,
+ consent,
+ extensions,
+ sign,
+ sign_prepare,
+ sign_alg=sign_alg,
+ digest_alg=digest_alg,
+ protocol_binding=binding,
+ scoping=scoping,
+ nsprefix=nsprefix,
+ **args,
+ )
return msg
- def create_attribute_query(self, destination, name_id=None,
- attribute=None, message_id=0, consent=None,
- extensions=None, sign=False, sign_prepare=False, sign_alg=None,
- digest_alg=None,
- **kwargs):
+ # XXX DONE sp create > _message
+ def create_attribute_query(
+ self,
+ destination,
+ name_id=None,
+ attribute=None,
+ message_id=0,
+ consent=None,
+ extensions=None,
+ sign=None,
+ sign_prepare=None,
+ sign_alg=None,
+ digest_alg=None,
+ **kwargs,
+ ):
""" Constructs an AttributeQuery
:param destination: To whom the query should be sent
@@ -561,18 +534,40 @@ class Base(Entity):
except KeyError:
nsprefix = None
- return self._message(AttributeQuery, destination, message_id, consent,
- extensions, sign, sign_prepare, subject=subject,
- attribute=attribute, nsprefix=nsprefix,
- sign_alg=sign_alg, digest_alg=digest_alg)
+ return self._message(
+ AttributeQuery,
+ destination,
+ message_id,
+ consent,
+ extensions,
+ sign,
+ sign_prepare,
+ subject=subject,
+ attribute=attribute,
+ nsprefix=nsprefix,
+ sign_alg=sign_alg,
+ digest_alg=digest_alg,
+ )
# MUST use SOAP for
# AssertionIDRequest, SubjectQuery,
# AuthnQuery, AttributeQuery, or AuthzDecisionQuery
- def create_authz_decision_query(self, destination, action,
- evidence=None, resource=None, subject=None,
- message_id=0, consent=None, extensions=None,
- sign=None, sign_alg=None, digest_alg=None, **kwargs):
+ # XXX DONE sp create > _message
+ def create_authz_decision_query(
+ self,
+ destination,
+ action,
+ evidence=None,
+ resource=None,
+ subject=None,
+ message_id=0,
+ consent=None,
+ extensions=None,
+ sign=None,
+ sign_alg=None,
+ digest_alg=None,
+ **kwargs,
+ ):
""" Creates an authz decision query.
:param destination: The IdP endpoint
@@ -587,19 +582,38 @@ class Base(Entity):
:return: AuthzDecisionQuery instance
"""
- return self._message(AuthzDecisionQuery, destination, message_id,
- consent, extensions, sign, action=action,
- evidence=evidence, resource=resource,
- subject=subject, sign_alg=sign_alg,
- digest_alg=digest_alg, **kwargs)
-
- def create_authz_decision_query_using_assertion(self, destination,
- assertion, action=None,
- resource=None,
- subject=None, message_id=0,
- consent=None,
- extensions=None,
- sign=False, nsprefix=None):
+ return self._message(
+ AuthzDecisionQuery,
+ destination,
+ message_id,
+ consent,
+ extensions,
+ sign,
+ action=action,
+ evidence=evidence,
+ resource=resource,
+ subject=subject,
+ sign_alg=sign_alg,
+ digest_alg=digest_alg,
+ **kwargs,
+ )
+
+ # XXX DONE sp create > self.create_authz_decision_query (FIXME pass sign/sign_alg/etc) > _message
+ def create_authz_decision_query_using_assertion(
+ self,
+ destination,
+ assertion,
+ action=None,
+ resource=None,
+ subject=None,
+ message_id=0,
+ consent=None,
+ extensions=None,
+ sign=None,
+ sign_ag=None,
+ digest_alg=None,
+ nsprefix=None,
+ ):
""" Makes an authz decision query based on a previously received
Assertion.
@@ -624,9 +638,19 @@ class Base(Entity):
_action = None
return self.create_authz_decision_query(
- destination, _action, saml.Evidence(assertion=assertion),
- resource, subject, message_id=message_id, consent=consent,
- extensions=extensions, sign=sign, nsprefix=nsprefix)
+ destination,
+ _action,
+ saml.Evidence(assertion=assertion),
+ resource,
+ subject,
+ message_id=message_id,
+ consent=consent,
+ extensions=extensions,
+ sign=sign,
+ sign_alg=sign_alg,
+ digest_alg=digest_alg,
+ nsprefix=nsprefix,
+ )
@staticmethod
def create_assertion_id_request(assertion_id_refs, **kwargs):
@@ -641,10 +665,21 @@ class Base(Entity):
else:
return 0, assertion_id_refs[0]
- def create_authn_query(self, subject, destination=None, authn_context=None,
- session_index="", message_id=0, consent=None,
- extensions=None, sign=False, nsprefix=None, sign_alg=None,
- digest_alg=None):
+ # XXX DONE sp create > _message
+ def create_authn_query(
+ self,
+ subject,
+ destination=None,
+ authn_context=None,
+ session_index="",
+ message_id=0,
+ consent=None,
+ extensions=None,
+ sign=None,
+ nsprefix=None,
+ sign_alg=None,
+ digest_alg=None,
+ ):
"""
:param subject: The subject its all about as a <Subject> instance
@@ -657,19 +692,37 @@ class Base(Entity):
:param sign: Whether the request should be signed or not.
:return:
"""
- return self._message(AuthnQuery, destination, message_id, consent,
- extensions, sign, subject=subject,
- session_index=session_index,
- requested_authn_context=authn_context,
- nsprefix=nsprefix, sign_alg=sign_alg,
- digest_alg=digest_alg)
-
- def create_name_id_mapping_request(self, name_id_policy,
- name_id=None, base_id=None,
- encrypted_id=None, destination=None,
- message_id=0, consent=None,
- extensions=None, sign=False,
- nsprefix=None, sign_alg=None, digest_alg=None):
+ return self._message(
+ AuthnQuery,
+ destination,
+ message_id,
+ consent,
+ extensions,
+ sign,
+ subject=subject,
+ session_index=session_index,
+ requested_authn_context=authn_context,
+ nsprefix=nsprefix,
+ sign_alg=sign_alg,
+ digest_alg=digest_alg,
+ )
+
+ # XXX DONE sp create > _message
+ def create_name_id_mapping_request(
+ self,
+ name_id_policy,
+ name_id=None,
+ base_id=None,
+ encrypted_id=None,
+ destination=None,
+ message_id=0,
+ consent=None,
+ extensions=None,
+ sign=None,
+ nsprefix=None,
+ sign_alg=None,
+ digest_alg=None,
+ ):
"""
:param name_id_policy:
@@ -689,29 +742,39 @@ class Base(Entity):
"At least one of name_id, base_id or encrypted_id must be present."
)
- if name_id:
- return self._message(NameIDMappingRequest, destination, message_id,
- consent, extensions, sign,
- name_id_policy=name_id_policy, name_id=name_id,
- nsprefix=nsprefix, sign_alg=sign_alg,
- digest_alg=digest_alg)
- elif base_id:
- return self._message(NameIDMappingRequest, destination, message_id,
- consent, extensions, sign,
- name_id_policy=name_id_policy, base_id=base_id,
- nsprefix=nsprefix, sign_alg=sign_alg,
- digest_alg=digest_alg)
- else:
- return self._message(NameIDMappingRequest, destination, message_id,
- consent, extensions, sign,
- name_id_policy=name_id_policy,
- encrypted_id=encrypted_id, nsprefix=nsprefix,
- sign_alg=sign_alg, digest_alg=digest_alg)
+ id_attr = {
+ "name_id": name_id,
+ "base_id": (
+ base_id
+ if not name_id
+ else None
+ ),
+ "encrypted_id": (
+ encrypted_id
+ if not name_id and not base_id
+ else None
+ ),
+ }
+
+ return self._message(
+ NameIDMappingRequest,
+ destination,
+ message_id,
+ consent,
+ extensions,
+ sign,
+ name_id_policy=name_id_policy,
+ **id_attr,
+ nsprefix=nsprefix,
+ sign_alg=sign_alg,
+ digest_alg=digest_alg,
+ )
# ======== response handling ===========
- def parse_authn_request_response(self, xmlstr, binding, outstanding=None,
- outstanding_certs=None, conv_info=None):
+ def parse_authn_request_response(
+ self, xmlstr, binding, outstanding=None, outstanding_certs=None, conv_info=None
+ ):
""" Deal with an AuthnResponse
:param xmlstr: The reply as a xml string
@@ -740,15 +803,14 @@ class Base(Entity):
"return_addrs": self.service_urls(binding=binding),
"entity_id": self.config.entityid,
"attribute_converters": self.config.attribute_converters,
- "allow_unknown_attributes":
- self.config.allow_unknown_attributes,
- 'conv_info': conv_info
+ "allow_unknown_attributes": self.config.allow_unknown_attributes,
+ 'conv_info': conv_info,
}
try:
- resp = self._parse_response(xmlstr, AuthnResponse,
- "assertion_consumer_service",
- binding, **kwargs)
+ resp = self._parse_response(
+ xmlstr, AuthnResponse, "assertion_consumer_service", binding, **kwargs
+ )
except StatusError as err:
logger.error("SAML status error: %s", err)
raise
@@ -759,12 +821,14 @@ class Base(Entity):
raise
if not isinstance(resp, AuthnResponse):
- logger.error("Response type not supported: %s",
- saml2.class_name(resp))
+ logger.error("Response type not supported: %s", saml2.class_name(resp))
return None
- if (resp.assertion and len(resp.response.encrypted_assertion) == 0 and
- resp.assertion.subject.name_id):
+ if (
+ resp.assertion
+ and len(resp.response.encrypted_assertion) == 0
+ and resp.assertion.subject.name_id
+ ):
self.users.add_information_about_person(resp.session_info())
logger.info("--- ADDED person info ----")
@@ -774,15 +838,15 @@ class Base(Entity):
# SubjectQuery, AuthnQuery, RequestedAuthnContext, AttributeQuery,
# AuthzDecisionQuery all get Response as response
- def parse_authz_decision_query_response(self, response,
- binding=BINDING_SOAP):
+ def parse_authz_decision_query_response(self, response, binding=BINDING_SOAP):
""" Verify that the response is OK
"""
- kwargs = {"entity_id": self.config.entityid,
- "attribute_converters": self.config.attribute_converters}
+ kwargs = {
+ "entity_id": self.config.entityid,
+ "attribute_converters": self.config.attribute_converters,
+ }
- return self._parse_response(response, AuthzResponse, "", binding,
- **kwargs)
+ return self._parse_response(response, AuthzResponse, "", binding, **kwargs)
def parse_authn_query_response(self, response, binding=BINDING_SOAP):
""" Verify that the response is OK
@@ -825,8 +889,16 @@ class Base(Entity):
# ------------------- ECP ------------------------------------------------
- def create_ecp_authn_request(self, entityid=None, relay_state="",
- sign=False, **kwargs):
+ # XXX DONE sp create > create_authn_request (FIXME DONE sign/sign_alg/etc) > _message
+ def create_ecp_authn_request(
+ self,
+ entityid=None,
+ relay_state="",
+ sign=None,
+ sign_alg=None,
+ digest_alg=None,
+ **kwargs,
+ ):
""" Makes an authentication request.
:param entityid: The entity ID of the IdP to send the request to
@@ -843,16 +915,22 @@ class Base(Entity):
# must_understand and act according to the standard
#
- paos_request = paos.Request(must_understand="1", actor=ACTOR,
- response_consumer_url=my_url,
- service=ECP_SERVICE)
+ paos_request = paos.Request(
+ must_understand="1",
+ actor=ACTOR,
+ response_consumer_url=my_url,
+ service=ECP_SERVICE,
+ )
# ----------------------------------------
# <ecp:RelayState>
# ----------------------------------------
- relay_state = ecp.RelayState(actor=ACTOR, must_understand="1",
- text=relay_state)
+ relay_state = ecp.RelayState(
+ actor=ACTOR,
+ must_understand="1",
+ text=relay_state,
+ )
# ----------------------------------------
# <samlp:AuthnRequest>
@@ -879,18 +957,23 @@ class Base(Entity):
"single_sign_on_service", [_binding], entity_id=entityid
)
req_id, authn_req = self.create_authn_request(
- location, service_url_binding=BINDING_PAOS, **kwargs
+ location,
+ service_url_binding=BINDING_PAOS,
+ sign=sign,
+ sign_alg=sign_alg,
+ digest_alg=digest_alg,
+ **kwargs,
)
# ----------------------------------------
# The SOAP envelope
# ----------------------------------------
- soap_envelope = make_soap_enveloped_saml_thingy(authn_req,
- [paos_request,
- relay_state])
+ soap_envelope = make_soap_enveloped_saml_thingy(
+ authn_req, [paos_request, relay_state]
+ )
- return req_id, "%s" % soap_envelope
+ return req_id, str(soap_envelope)
def parse_ecp_authn_response(self, txt, outstanding=None):
rdict = soap.class_instances_from_soap_enveloped_saml_thingies(txt,
diff --git a/src/saml2/config.py b/src/saml2/config.py
index f28d7748..8b865dcb 100644
--- a/src/saml2/config.py
+++ b/src/saml2/config.py
@@ -76,6 +76,8 @@ COMMON_ARGS = [
"metadata",
"ui_info",
"name_id_format",
+ "signing_algorithm",
+ "digest_algorithm",
]
SP_ARGS = [
@@ -225,6 +227,8 @@ class Config(object):
self.attribute_profile = []
self.requested_attribute_name_format = NAME_FORMAT_URI
self.delete_tmpfiles = True
+ self.signing_algorithm = None
+ self.digest_algorithm = None
def setattr(self, context, attr, val):
if context == "":
diff --git a/src/saml2/ecp.py b/src/saml2/ecp.py
index 56448e9c..8db0afad 100644
--- a/src/saml2/ecp.py
+++ b/src/saml2/ecp.py
@@ -40,7 +40,9 @@ def ecp_capable(headers):
#noinspection PyUnusedLocal
-def ecp_auth_request(cls, entityid=None, relay_state="", sign=False):
+def ecp_auth_request(
+ cls, entityid=None, relay_state="", sign=None, sign_alg=None, digest_alg=None
+):
""" Makes an authentication request.
:param entityid: The entity ID of the IdP to send the request to
@@ -59,9 +61,12 @@ def ecp_auth_request(cls, entityid=None, relay_state="", sign=False):
# must_understand and actor according to the standard
#
- paos_request = paos.Request(must_understand="1", actor=ACTOR,
- response_consumer_url=my_url,
- service=SERVICE)
+ paos_request = paos.Request(
+ must_understand="1",
+ actor=ACTOR,
+ response_consumer_url=my_url,
+ service=SERVICE,
+ )
eelist.append(element_to_extension_element(paos_request))
@@ -73,7 +78,13 @@ def ecp_auth_request(cls, entityid=None, relay_state="", sign=False):
location = cls._sso_location(entityid, binding=BINDING_SOAP)
req_id, authn_req = cls.create_authn_request(
- location, binding=BINDING_PAOS, service_url_binding=BINDING_PAOS)
+ location,
+ binding=BINDING_PAOS,
+ service_url_binding=BINDING_PAOS,
+ sign=sign,
+ sign_alg=sign_alg,
+ digest_alg=digest_alg,
+ )
body = soapenv.Body()
body.extension_elements = [element_to_extension_element(authn_req)]
@@ -96,7 +107,8 @@ def ecp_auth_request(cls, entityid=None, relay_state="", sign=False):
must_understand="1",
provider_name=None,
issuer=saml.Issuer(text=authn_req.issuer.text),
- idp_list=idp_list)
+ idp_list=idp_list,
+ )
eelist.append(element_to_extension_element(ecp_request))
@@ -104,8 +116,7 @@ def ecp_auth_request(cls, entityid=None, relay_state="", sign=False):
# <ecp:RelayState>
# ----------------------------------------
- relay_state = ecp.RelayState(actor=ACTOR, must_understand="1",
- text=relay_state)
+ relay_state = ecp.RelayState(actor=ACTOR, must_understand="1", text=relay_state)
eelist.append(element_to_extension_element(relay_state))
@@ -118,20 +129,22 @@ def ecp_auth_request(cls, entityid=None, relay_state="", sign=False):
soap_envelope = soapenv.Envelope(header=header, body=body)
- return req_id, "%s" % soap_envelope
+ return req_id, str(soap_envelope)
def handle_ecp_authn_response(cls, soap_message, outstanding=None):
rdict = soap.class_instances_from_soap_enveloped_saml_thingies(
- soap_message, [paos, ecp, samlp])
+ soap_message, [paos, ecp, samlp]
+ )
_relay_state = None
for item in rdict["header"]:
if item.c_tag == "RelayState" and item.c_namespace == ecp.NAMESPACE:
_relay_state = item
- response = authn_response(cls.config, cls.service_urls(), outstanding,
- allow_unsolicited=True)
+ response = authn_response(
+ cls.config, cls.service_urls(), outstanding, allow_unsolicited=True
+ )
response.loads("%s" % rdict["body"], False, soap_message)
response.verify()
diff --git a/src/saml2/entity.py b/src/saml2/entity.py
index fdea5a74..71e9ecfb 100644
--- a/src/saml2/entity.py
+++ b/src/saml2/entity.py
@@ -62,7 +62,6 @@ from saml2 import class_name
from saml2.config import config_factory
from saml2.httpbase import HTTPBase
from saml2.sigver import security_context
-from saml2.sigver import response_factory
from saml2.sigver import SigverError
from saml2.sigver import SignatureError
from saml2.sigver import make_temp
@@ -75,6 +74,8 @@ from saml2.pack import http_redirect_message
from saml2.pack import http_form_post_message
from saml2.xmldsig import DefaultSignature
+from saml2.xmldsig import SIG_ALLOWED_ALG
+from saml2.xmldsig import DIGEST_ALLOWED_ALG
logger = logging.getLogger(__name__)
@@ -141,6 +142,23 @@ class Entity(HTTPBase):
else:
raise SAMLError("Missing configuration")
+ def_sig = DefaultSignature()
+ self.signing_algorithm = (
+ self.config.getattr('signing_algorithm')
+ or def_sig.get_sign_alg()
+ )
+ self.digest_algorithm = (
+ self.config.getattr('digest_algorithm')
+ or def_sig.get_digest_alg()
+ )
+
+ sign_config_per_entity_type = {
+ 'sp': self.config.getattr("authn_requests_signed", "sp"),
+ 'idp': self.config.getattr("sign_response", "idp"),
+ }
+ sign_config = sign_config_per_entity_type.get(self.entity_type, False)
+ self.should_sign = sign_config
+
for item in ["cert_file", "key_file", "ca_certs"]:
_val = getattr(self.config, item, None)
if not _val:
@@ -195,6 +213,10 @@ class Entity(HTTPBase):
return Issuer(text=self.config.entityid,
format=NAMEID_FORMAT_ENTITY)
+ # XXX DONE will actually use sign_alg and digest_alg for the Redirect-Binding
+ # XXX DONE deepest level - needs to decide the sign_alg (no digest_alg here)
+ # XXX verify digest_alg is not needed
+ # XXX deprecate sigalg for sign_alg
def apply_binding(
self,
binding,
@@ -219,20 +241,13 @@ class Entity(HTTPBase):
:return: A dictionary
"""
- # XXX authn_requests_signed (obj property) applies only to an SP
- # XXX sign_response (config option) applies to idp/aa
- # XXX Looking into sp/idp/aa properties should be done in the same way
- # XXX ^this discrepancy should be fixed
- sign_config = (
- self.authn_requests_signed
- if self.config.context == "sp"
- else self.config.getattr("sign_response")
- if self.config.context == "idp"
- else None
- )
- sign = sign_config if sign is None else sign
- def_sig = DefaultSignature()
- sigalg = sigalg or def_sig.get_sign_alg()
+ # XXX sig-allowed should be configurable
+ sign = sign if sign is not None else self.should_sign
+ sign_alg = sigalg or self.signing_algorithm
+ if sign_alg not in [long_name for short_name, long_name in SIG_ALLOWED_ALG]:
+ raise Exception(
+ "Signature algo not in allowed list: {algo}".format(algo=sign_alg)
+ )
# unless if BINDING_HTTP_ARTIFACT
if response:
@@ -254,14 +269,14 @@ class Entity(HTTPBase):
relay_state=relay_state,
typ=typ,
sign=sign,
- sigalg=sigalg,
+ sigalg=sign_alg,
backend=self.sec.sec_backend,
)
info["url"] = str(destination)
info["method"] = "GET"
elif binding == BINDING_SOAP or binding == BINDING_PAOS:
info = self.use_soap(
- msg_str, destination, sign=sign, sigalg=sigalg, **kwargs
+ msg_str, destination, sign=sign, sigalg=sign_alg, **kwargs
)
elif binding == BINDING_URI:
info = self.use_http_uri(msg_str, typ, destination)
@@ -329,8 +344,13 @@ class Entity(HTTPBase):
if not message_id:
message_id = sid()
- return {"id": message_id, "version": VERSION,
- "issue_instant": instant(), "issuer": self._issuer()}
+ margs = {
+ "id": message_id,
+ "version": VERSION,
+ "issue_instant": instant(),
+ "issuer": self._issuer(),
+ }
+ return margs
def response_args(self, message, bindings=None, descr_type=""):
"""
@@ -444,15 +464,35 @@ class Entity(HTTPBase):
# --------------------------------------------------------------------------
+ # XXX DONE will actually use sign_alg and digest_alg for the POST-Binding
+ # XXX DONE deepest level - needs to decide the sign_alg and digest_alg value
+ # XXX a controler for signed_instance_factory
+ # XXX syncs pre_signature_part and signed_instance_factory
+ # XXX makes sure pre_signature_part is called before signed_instance_factory
+ # XXX calls pre_signature_part - must have sign_alg & digest_alg
+ # XXX calls signed_instance_factory - after pre_signature_part
+ # XXX !!expects a msg object!!
def sign(
self,
msg,
mid=None,
to_sign=None,
- sign_prepare=False,
+ sign_prepare=None,
sign_alg=None,
digest_alg=None,
):
+ # XXX sig/digest-allowed should be configurable
+ sign_alg = sign_alg or self.signing_algorithm
+ digest_alg = digest_alg or self.digest_algorithm
+ if sign_alg not in [long_name for short_name, long_name in SIG_ALLOWED_ALG]:
+ raise Exception(
+ "Signature algo not in allowed list: {algo}".format(algo=sign_alg)
+ )
+ if digest_alg not in [long_name for short_name, long_name in DIGEST_ALLOWED_ALG]:
+ raise Exception(
+ "Digest algo not in allowed list: {algo}".format(algo=digest_alg)
+ )
+
if msg.signature is None:
msg.signature = pre_signature_part(
msg.id, self.sec.my_cert, 1, sign_alg=sign_alg, digest_alg=digest_alg
@@ -472,6 +512,11 @@ class Entity(HTTPBase):
logger.info("REQUEST: %s", msg)
return signed_instance_factory(msg, self.sec, to_sign)
+ # XXX DONE will actually use sign the POST-Binding
+ # XXX DONE deepest level - needs to decide the sign value
+ # XXX DONE calls self.sign must figure out sign
+ # XXX DONE ensure both SPs and IdPs go through this
+ # XXX DONE ensure this works for the POST-Binding
def _message(
self,
request_cls,
@@ -479,8 +524,8 @@ class Entity(HTTPBase):
message_id=0,
consent=None,
extensions=None,
- sign=False,
- sign_prepare=False,
+ sign=None,
+ sign_prepare=None,
nsprefix=None,
sign_alg=None,
digest_alg=None,
@@ -526,6 +571,8 @@ class Entity(HTTPBase):
req = self.msg_cb(req)
reqid = req.id
+
+ sign = sign if sign is not None else self.should_sign
if sign:
signed_req = self.sign(
req,
@@ -586,8 +633,7 @@ class Entity(HTTPBase):
return True
return False
- def _encrypt_assertion(self, encrypt_cert, sp_entity_id, response,
- node_xpath=None):
+ def _encrypt_assertion(self, encrypt_cert, sp_entity_id, response, node_xpath=None):
""" Encryption of assertions.
:param encrypt_cert: Certificate to be used for encryption.
@@ -625,14 +671,29 @@ class Entity(HTTPBase):
raise exception
return response
- def _response(self, in_response_to, consumer_url=None, status=None,
- issuer=None, sign=False, to_sign=None, sp_entity_id=None,
- encrypt_assertion=False,
- encrypt_assertion_self_contained=False,
- encrypted_advice_attributes=False,
- encrypt_cert_advice=None, encrypt_cert_assertion=None,
- sign_assertion=None, pefim=False, sign_alg=None,
- digest_alg=None, **kwargs):
+ # XXX DONE calls self.sign must figure out sign
+ # XXX calls signed_instance_factory - must have called pre_signature_part
+ # XXX calls pre_signature_part - must figure out sign_alg/digest_alg
+ def _response(
+ self,
+ in_response_to,
+ consumer_url=None,
+ status=None,
+ issuer=None,
+ sign=None,
+ to_sign=None,
+ sp_entity_id=None,
+ encrypt_assertion=False,
+ encrypt_assertion_self_contained=False,
+ encrypted_advice_attributes=False,
+ encrypt_cert_advice=None,
+ encrypt_cert_assertion=None,
+ sign_assertion=None,
+ pefim=False,
+ sign_alg=None,
+ digest_alg=None,
+ **kwargs,
+ ):
""" Create a Response.
Encryption:
encrypt_assertion must be true for encryption to be
@@ -674,17 +735,22 @@ class Entity(HTTPBase):
_issuer = self._issuer(issuer)
- response = response_factory(issuer=_issuer,
- in_response_to=in_response_to,
- status=status, sign_alg=sign_alg,
- digest_alg=digest_alg)
+ response = samlp.Response(id=sid(), version=VERSION, issue_instant=instant())
+ response.issuer = _issuer
+ response.in_response_to = in_response_to
+ response.status = status
if consumer_url:
response.destination = consumer_url
self._add_info(response, **kwargs)
- if not sign and to_sign and not encrypt_assertion:
+ sign = sign if sign is not None else self.should_sign
+ if (
+ to_sign
+ and not sign
+ and not encrypt_assertion
+ ):
return signed_instance_factory(response, self.sec, to_sign)
has_encrypt_cert = self.has_encrypt_cert_in_metadata(sp_entity_id)
@@ -693,117 +759,185 @@ class Entity(HTTPBase):
if not has_encrypt_cert and encrypt_cert_assertion is None:
encrypt_assertion = False
- if encrypt_assertion or (
- encrypted_advice_attributes and
- response.assertion.advice is
- not None and
- len(response.assertion.advice.assertion) == 1):
+ # XXX if encrypt_assertion or encrypted_advice_attributes
+ # XXX once in, response becomes a str and uses signed_instance_factory
+ if (
+ # XXX goto part-C
+ encrypt_assertion
+ or (
+ # XXX goto part-B
+ encrypted_advice_attributes
+ and response.assertion.advice is not None
+ and len(response.assertion.advice.assertion) == 1
+ )
+ ):
+ # XXX sig/digest-allowed should be configurable
+ sign_alg = sign_alg or self.signing_algorithm
+ digest_alg = digest_alg or self.digest_algorithm
+
+ # XXX part-A (common) prepare sign response
if sign:
- response.signature = pre_signature_part(response.id,
- self.sec.my_cert, 1,
- sign_alg=sign_alg,
- digest_alg=digest_alg)
+ response.signature = pre_signature_part(
+ response.id,
+ self.sec.my_cert,
+ 1,
+ sign_alg=sign_alg,
+ digest_alg=digest_alg,
+ )
sign_class = [(class_name(response), response.id)]
else:
sign_class = []
- if encrypted_advice_attributes and response.assertion.advice is \
- not None \
- and len(response.assertion.advice.assertion) > 0:
+ # XXX part-B if encrypted_advice_attributes
+ if (
+ encrypted_advice_attributes
+ and response.assertion.advice is not None
+ and len(response.assertion.advice.assertion) > 0
+ ):
_assertions = response.assertion
if not isinstance(_assertions, list):
_assertions = [_assertions]
+
for _assertion in _assertions:
_assertion.advice.encrypted_assertion = []
- _assertion.advice.encrypted_assertion.append(
- EncryptedAssertion())
- _advice_assertions = copy.deepcopy(
- _assertion.advice.assertion)
+ _assertion.advice.encrypted_assertion.append(EncryptedAssertion())
+ _advice_assertions = copy.deepcopy(_assertion.advice.assertion)
_assertion.advice.assertion = []
+
if not isinstance(_advice_assertions, list):
_advice_assertions = [_advice_assertions]
+
for tmp_assertion in _advice_assertions:
to_sign_advice = []
+ # XXX prepare sign assertion
if sign_assertion and not pefim:
tmp_assertion.signature = pre_signature_part(
- tmp_assertion.id, self.sec.my_cert, 1,
- sign_alg=sign_alg, digest_alg=digest_alg)
+ tmp_assertion.id,
+ self.sec.my_cert,
+ 1,
+ sign_alg=sign_alg,
+ digest_alg=digest_alg,
+ )
to_sign_advice.append(
- (class_name(tmp_assertion), tmp_assertion.id))
+ (class_name(tmp_assertion), tmp_assertion.id),
+ )
+ # XXX prepare encrypt assertion
# tmp_assertion = response.assertion.advice.assertion[0]
- _assertion.advice.encrypted_assertion[
- 0].add_extension_element(tmp_assertion)
+ _assertion.advice.encrypted_assertion[0].add_extension_element(
+ tmp_assertion
+ )
if encrypt_assertion_self_contained:
- advice_tag = \
- response.assertion.advice._to_element_tree().tag
+ advice_tag = response.assertion.advice._to_element_tree().tag
assertion_tag = tmp_assertion._to_element_tree().tag
- response = \
- response.get_xml_string_with_self_contained_assertion_within_advice_encrypted_assertion(
- assertion_tag, advice_tag)
+ response = response.get_xml_string_with_self_contained_assertion_within_advice_encrypted_assertion(
+ assertion_tag, advice_tag
+ )
node_xpath = ''.join(
- ["/*[local-name()=\"%s\"]" % v for v in
- ["Response", "Assertion", "Advice",
- "EncryptedAssertion", "Assertion"]])
-
+ [
+ "/*[local-name()=\"%s\"]" % v
+ for v in [
+ "Response",
+ "Assertion",
+ "Advice",
+ "EncryptedAssertion",
+ "Assertion"
+ ]
+ ]
+ )
+
+ # XXX sign assertion
if to_sign_advice:
- response = signed_instance_factory(response,
- self.sec,
- to_sign_advice)
+ response = signed_instance_factory(
+ response, self.sec, to_sign_advice
+ )
+
+ # XXX encrypt assertion
response = self._encrypt_assertion(
- encrypt_cert_advice, sp_entity_id, response,
- node_xpath=node_xpath)
+ encrypt_cert_advice,
+ sp_entity_id,
+ response,
+ node_xpath=node_xpath,
+ )
response = response_from_string(response)
+ # XXX part-C if encrypt_assertion
if encrypt_assertion:
to_sign_assertion = []
- if sign_assertion is not None and sign_assertion:
+
+ # XXX prepare sign assertion
+ if sign_assertion:
_assertions = response.assertion
+
if not isinstance(_assertions, list):
_assertions = [_assertions]
+
for _assertion in _assertions:
_assertion.signature = pre_signature_part(
- _assertion.id, self.sec.my_cert, 1,
- sign_alg=sign_alg, digest_alg=digest_alg)
+ _assertion.id,
+ self.sec.my_cert,
+ 1,
+ sign_alg=sign_alg,
+ digest_alg=digest_alg,
+ )
to_sign_assertion.append(
- (class_name(_assertion), _assertion.id))
+ (class_name(_assertion), _assertion.id),
+ )
+
+ # XXX prepare encrypt assertion
if encrypt_assertion_self_contained:
try:
- assertion_tag = response.assertion._to_element_tree(
-
- ).tag
+ assertion_tag = response.assertion._to_element_tree().tag
except:
- assertion_tag = response.assertion[
- 0]._to_element_tree().tag
+ assertion_tag = response.assertion[0]._to_element_tree().tag
response = pre_encrypt_assertion(response)
- response = \
- response.get_xml_string_with_self_contained_assertion_within_encrypted_assertion(
- assertion_tag)
+ response = response.get_xml_string_with_self_contained_assertion_within_encrypted_assertion(
+ assertion_tag
+ )
else:
response = pre_encrypt_assertion(response)
+
+ # XXX sign assertion
if to_sign_assertion:
- response = signed_instance_factory(response, self.sec,
- to_sign_assertion)
- response = self._encrypt_assertion(encrypt_cert_assertion,
- sp_entity_id, response)
+ response = signed_instance_factory(
+ response, self.sec, to_sign_assertion
+ )
+
+ # XXX encrypt assertion
+ response = self._encrypt_assertion(
+ encrypt_cert_assertion, sp_entity_id, response
+ )
else:
+ # XXX sign other parts! (defiend by to_sign)
if to_sign:
- response = signed_instance_factory(response, self.sec,
- to_sign)
+ response = signed_instance_factory(response, self.sec, to_sign)
+
+ # XXX part-D (common) sign response
+ # XXX handle response having been signed/encrypted => str
if sign:
return signed_instance_factory(response, self.sec, sign_class)
else:
return response
+ # XXX sign response
if sign:
- return self.sign(response, to_sign=to_sign, sign_alg=sign_alg,
- digest_alg=digest_alg)
- else:
- return response
+ return self.sign(
+ response, to_sign=to_sign, sign_alg=sign_alg, digest_alg=digest_alg
+ )
+
+ return response
- def _status_response(self, response_class, issuer, status, sign=False,
- sign_alg=None, digest_alg=None,
- **kwargs):
+ # XXX DONE calls self.sign must figure out sign
+ def _status_response(
+ self,
+ response_class,
+ issuer,
+ status,
+ sign=None,
+ sign_alg=None,
+ digest_alg=None,
+ **kwargs,
+ ):
""" Create a StatusResponse.
:param response_class: Which subclass of StatusResponse that should be
@@ -818,21 +952,23 @@ class Entity(HTTPBase):
mid = sid()
for key in ["binding"]:
- try:
- del kwargs[key]
- except KeyError:
- pass
+ kwargs.pop(key, None)
if not status:
status = success_status_factory()
- response = response_class(issuer=issuer, id=mid, version=VERSION,
- issue_instant=instant(),
- status=status, **kwargs)
+ response = response_class(
+ issuer=issuer,
+ id=mid,
+ version=VERSION,
+ issue_instant=instant(),
+ status=status,
+ **kwargs,
+ )
+ sign = sign if sign is not None else self.should_sign
if sign:
- return self.sign(response, mid, sign_alg=sign_alg,
- digest_alg=digest_alg)
+ return self.sign(response, mid, sign_alg=sign_alg, digest_alg=digest_alg)
else:
return response
@@ -910,9 +1046,18 @@ class Entity(HTTPBase):
# ------------------------------------------------------------------------
- def create_error_response(self, in_response_to, destination, info,
- sign=False, issuer=None, sign_alg=None,
- digest_alg=None, **kwargs):
+ # XXX DONE ent create > _response
+ def create_error_response(
+ self,
+ in_response_to,
+ destination,
+ info,
+ sign=None,
+ issuer=None,
+ sign_alg=None,
+ digest_alg=None,
+ **kwargs,
+ ):
""" Create a error response.
:param in_response_to: The identifier of the message this is a response
@@ -927,17 +1072,35 @@ class Entity(HTTPBase):
"""
status = error_status_factory(info)
- return self._response(in_response_to, destination, status, issuer,
- sign, sign_alg=sign_alg, digest_alg=digest_alg)
+ return self._response(
+ in_response_to,
+ destination,
+ status,
+ issuer,
+ sign,
+ sign_alg=sign_alg,
+ digest_alg=digest_alg,
+ )
# ------------------------------------------------------------------------
- def create_logout_request(self, destination, issuer_entity_id,
- subject_id=None, name_id=None,
- reason=None, expire=None, message_id=0,
- consent=None, extensions=None, sign=False,
- session_indexes=None, sign_alg=None,
- digest_alg=None):
+ # XXX DONE ent create > _message
+ def create_logout_request(
+ self,
+ destination,
+ issuer_entity_id,
+ subject_id=None,
+ name_id=None,
+ reason=None,
+ expire=None,
+ message_id=0,
+ consent=None,
+ extensions=None,
+ sign=None,
+ session_indexes=None,
+ sign_alg=None,
+ digest_alg=None,
+ ):
""" Constructs a LogoutRequest
:param destination: Destination of the request
@@ -959,9 +1122,9 @@ class Entity(HTTPBase):
if subject_id:
if self.entity_type == "idp":
- name_id = NameID(text=self.users.get_entityid(subject_id,
- issuer_entity_id,
- False))
+ name_id = NameID(
+ text=self.users.get_entityid(subject_id, issuer_entity_id, False)
+ )
else:
name_id = NameID(text=subject_id)
@@ -978,15 +1141,33 @@ class Entity(HTTPBase):
sis.append(SessionIndex(text=si))
args["session_index"] = sis
- return self._message(LogoutRequest, destination, message_id,
- consent, extensions, sign, name_id=name_id,
- reason=reason, not_on_or_after=expire,
- issuer=self._issuer(), sign_alg=sign_alg,
- digest_alg=digest_alg, **args)
+ return self._message(
+ LogoutRequest,
+ destination,
+ message_id,
+ consent,
+ extensions,
+ sign,
+ name_id=name_id,
+ reason=reason,
+ not_on_or_after=expire,
+ issuer=self._issuer(),
+ sign_alg=sign_alg,
+ digest_alg=digest_alg,
+ **args,
+ )
- def create_logout_response(self, request, bindings=None, status=None,
- sign=False, issuer=None, sign_alg=None,
- digest_alg=None):
+ # XXX DONE ent create > _status_response
+ def create_logout_response(
+ self,
+ request,
+ bindings=None,
+ status=None,
+ sign=None,
+ issuer=None,
+ sign_alg=None,
+ digest_alg=None,
+ ):
""" Create a LogoutResponse.
:param request: The request this is a response to
@@ -1003,17 +1184,32 @@ class Entity(HTTPBase):
if not issuer:
issuer = self._issuer()
- response = self._status_response(samlp.LogoutResponse, issuer, status,
- sign, sign_alg=sign_alg,
- digest_alg=digest_alg, **rinfo)
+ response = self._status_response(
+ samlp.LogoutResponse,
+ issuer,
+ status,
+ sign,
+ sign_alg=sign_alg,
+ digest_alg=digest_alg,
+ **rinfo,
+ )
logger.info("Response: %s", response)
return response
- def create_artifact_resolve(self, artifact, destination, sessid,
- consent=None, extensions=None, sign=False,
- sign_alg=None, digest_alg=None):
+ # XXX DONE ent create > _message
+ def create_artifact_resolve(
+ self,
+ artifact,
+ destination,
+ sessid,
+ consent=None,
+ extensions=None,
+ sign=None,
+ sign_alg=None,
+ digest_alg=None,
+ ):
"""
Create a ArtifactResolve request
@@ -1028,22 +1224,45 @@ class Entity(HTTPBase):
artifact = Artifact(text=artifact)
- return self._message(ArtifactResolve, destination, sessid,
- consent, extensions, sign, artifact=artifact,
- sign_alg=sign_alg, digest_alg=digest_alg)
+ return self._message(
+ ArtifactResolve,
+ destination,
+ sessid,
+ consent,
+ extensions,
+ sign,
+ artifact=artifact,
+ sign_alg=sign_alg,
+ digest_alg=digest_alg,
+ )
- def create_artifact_response(self, request, artifact, bindings=None,
- status=None, sign=False, issuer=None,
- sign_alg=None, digest_alg=None):
+ # XXX DONE ent create > _status_response
+ def create_artifact_response(
+ self,
+ request,
+ artifact,
+ bindings=None,
+ status=None,
+ sign=None,
+ issuer=None,
+ sign_alg=None,
+ digest_alg=None,
+ ):
"""
Create an ArtifactResponse
:return:
"""
rinfo = self.response_args(request, bindings)
- response = self._status_response(ArtifactResponse, issuer, status,
- sign=sign, sign_alg=sign_alg,
- digest_alg=digest_alg, **rinfo)
+ response = self._status_response(
+ ArtifactResponse,
+ issuer,
+ status,
+ sign=sign,
+ sign_alg=sign_alg,
+ digest_alg=digest_alg,
+ **rinfo,
+ )
msg = element_to_extension_element(self.artifact[artifact])
response.extension_elements = [msg]
@@ -1052,12 +1271,22 @@ class Entity(HTTPBase):
return response
- def create_manage_name_id_request(self, destination, message_id=0,
- consent=None, extensions=None, sign=False,
- name_id=None, new_id=None,
- encrypted_id=None, new_encrypted_id=None,
- terminate=None, sign_alg=None,
- digest_alg=None):
+ # XXX DONE ent create > _message
+ def create_manage_name_id_request(
+ self,
+ destination,
+ message_id=0,
+ consent=None,
+ extensions=None,
+ sign=None,
+ name_id=None,
+ new_id=None,
+ encrypted_id=None,
+ new_encrypted_id=None,
+ terminate=None,
+ sign_alg=None,
+ digest_alg=None,
+ ):
"""
:param destination:
@@ -1093,9 +1322,16 @@ class Entity(HTTPBase):
"One of NewID, NewEncryptedNameID or Terminate has to be "
"provided")
- return self._message(ManageNameIDRequest, destination, consent=consent,
- extensions=extensions, sign=sign,
- sign_alg=sign_alg, digest_alg=digest_alg, **kwargs)
+ return self._message(
+ ManageNameIDRequest,
+ destination,
+ consent=consent,
+ extensions=extensions,
+ sign=sign,
+ sign_alg=sign_alg,
+ digest_alg=digest_alg,
+ **kwargs,
+ )
def parse_manage_name_id_request(self, xmlstr, binding=BINDING_SOAP):
""" Deal with a LogoutRequest
@@ -1110,31 +1346,55 @@ class Entity(HTTPBase):
return self._parse_request(xmlstr, saml_request.ManageNameIDRequest,
"manage_name_id_service", binding)
- def create_manage_name_id_response(self, request, bindings=None,
- status=None, sign=False, issuer=None,
- sign_alg=None, digest_alg=None,
- **kwargs):
+ # XXX DONE ent create > _status_response
+ def create_manage_name_id_response(
+ self,
+ request,
+ bindings=None,
+ status=None,
+ sign=None,
+ issuer=None,
+ sign_alg=None,
+ digest_alg=None,
+ **kwargs,
+ ):
rinfo = self.response_args(request, bindings)
- response = self._status_response(samlp.ManageNameIDResponse, issuer,
- status, sign, sign_alg=sign_alg,
- digest_alg=digest_alg, **rinfo)
+ response = self._status_response(
+ samlp.ManageNameIDResponse,
+ issuer,
+ status,
+ sign,
+ sign_alg=sign_alg,
+ digest_alg=digest_alg,
+ **rinfo,
+ )
logger.info("Response: %s", response)
return response
- def parse_manage_name_id_request_response(self, string,
- binding=BINDING_SOAP):
- return self._parse_response(string, saml_response.ManageNameIDResponse,
- "manage_name_id_service", binding,
- asynchop=False)
+ def parse_manage_name_id_request_response(self, string, binding=BINDING_SOAP):
+ return self._parse_response(
+ string,
+ saml_response.ManageNameIDResponse,
+ "manage_name_id_service",
+ binding,
+ asynchop=False,
+ )
# ------------------------------------------------------------------------
- def _parse_response(self, xmlstr, response_cls, service, binding,
- outstanding_certs=None, **kwargs):
+ def _parse_response(
+ self,
+ xmlstr,
+ response_cls,
+ service,
+ binding,
+ outstanding_certs=None,
+ **kwargs,
+ ):
""" Deal with a Response
:param xmlstr: The response as a xml string
@@ -1335,7 +1595,15 @@ class Entity(HTTPBase):
return destination
- def artifact2message(self, artifact, descriptor, sign=False):
+ # XXX DONE uses sign but not a create_*
+ def artifact2message(
+ self,
+ artifact,
+ descriptor,
+ sign=None,
+ sign_alg=None,
+ digest_alg=None,
+ ):
"""
:param artifact: The Base64 encoded SAML artifact as sent over the net
@@ -1355,6 +1623,8 @@ class Entity(HTTPBase):
destination,
_sid,
sign=sign,
+ sign_alg=sign_alg,
+ digest_alg=digest_alg,
)
return self.send_using_soap(msg, destination)
diff --git a/src/saml2/s2repoze/plugins/sp.py b/src/saml2/s2repoze/plugins/sp.py
index a840ab37..3dec8d14 100644
--- a/src/saml2/s2repoze/plugins/sp.py
+++ b/src/saml2/s2repoze/plugins/sp.py
@@ -271,7 +271,6 @@ class SAML2Plugin(object):
#### IChallenger ####
# noinspection PyUnusedLocal
def challenge(self, environ, _status, _app_headers, _forget_headers):
-
_cli = self.saml_client
if "REMOTE_USER" in environ:
@@ -346,7 +345,7 @@ class SAML2Plugin(object):
)
if _cli.authn_requests_signed:
- _sid = saml2.s_utils.sid()
+ _sid = sid()
req_id, msg_str = _cli.create_authn_request(
dest,
vorg=vorg_name,
@@ -357,7 +356,10 @@ class SAML2Plugin(object):
_sid = req_id
else:
req_id, req = _cli.create_authn_request(
- dest, vorg=vorg_name, sign=False, extensions=extensions
+ dest,
+ vorg=vorg_name,
+ sign=False,
+ extensions=extensions,
)
msg_str = "%s" % req
_sid = req_id
diff --git a/src/saml2/server.py b/src/saml2/server.py
index bcdbd2bb..9e34cce2 100644
--- a/src/saml2/server.py
+++ b/src/saml2/server.py
@@ -77,8 +77,15 @@ def _shelve_compat(name, *args, **kwargs):
class Server(Entity):
""" A class that does things that IdPs or AAs do """
- def __init__(self, config_file="", config=None, cache=None, stype="idp",
- symkey="", msg_cb=None):
+ def __init__(
+ self,
+ config_file="",
+ config=None,
+ cache=None,
+ stype="idp",
+ symkey="",
+ msg_cb=None,
+ ):
Entity.__init__(self, stype, config, config_file, msg_cb=msg_cb)
self.eptid = None
self.init_config(stype)
@@ -218,6 +225,7 @@ class Server(Entity):
return False
# -------------------------------------------------------------------------
+
def parse_authn_request(self, enc_request, binding=BINDING_HTTP_REDIRECT):
"""Parse a Authentication Request
@@ -320,10 +328,25 @@ class Server(Entity):
consumer_url])
return farg
- def setup_assertion(self, authn, sp_entity_id, in_response_to, consumer_url,
- name_id, policy, _issuer, authn_statement, identity,
- best_effort, sign_response, farg=None,
- session_not_on_or_after=None, **kwargs):
+ def setup_assertion(
+ self,
+ authn,
+ sp_entity_id,
+ in_response_to,
+ consumer_url,
+ name_id,
+ policy,
+ _issuer,
+ authn_statement,
+ identity,
+ best_effort,
+ sign_response,
+ farg=None,
+ session_not_on_or_after=None,
+ sign_alg=None,
+ digest_alg=None,
+ **kwargs,
+ ):
"""
Construct and return the Assertion
@@ -352,8 +375,15 @@ class Server(Entity):
ast.apply_policy(sp_entity_id, policy)
except MissingValue as exc:
if not best_effort:
- return self.create_error_response(in_response_to, consumer_url,
- exc, sign_response)
+ response = self.create_error_response(
+ in_response_to,
+ destination=consumer_url,
+ info=exc,
+ sign=sign_response,
+ sign_alg=sign_alg,
+ digest_alg=digest_alg,
+ )
+ return str(response).split("\n")
farg = self.update_farg(in_response_to, consumer_url, farg)
@@ -384,17 +414,34 @@ class Server(Entity):
**kwargs)
return assertion
- def _authn_response(self, in_response_to, consumer_url,
- sp_entity_id, identity=None, name_id=None,
- status=None, authn=None, issuer=None, policy=None,
- sign_assertion=False, sign_response=False,
- best_effort=False, encrypt_assertion=False,
- encrypt_cert_advice=None, encrypt_cert_assertion=None,
- authn_statement=None,
- encrypt_assertion_self_contained=False,
- encrypted_advice_attributes=False,
- pefim=False, sign_alg=None, digest_alg=None,
- farg=None, session_not_on_or_after=None):
+ # XXX DONE calls pre_signature_part
+ # XXX calls _response
+ def _authn_response(
+ self,
+ in_response_to,
+ consumer_url,
+ sp_entity_id,
+ identity=None,
+ name_id=None,
+ status=None,
+ authn=None,
+ issuer=None,
+ policy=None,
+ sign_assertion=None,
+ sign_response=None,
+ best_effort=False,
+ encrypt_assertion=False,
+ encrypt_cert_advice=None,
+ encrypt_cert_assertion=None,
+ authn_statement=None,
+ encrypt_assertion_self_contained=False,
+ encrypted_advice_attributes=False,
+ pefim=False,
+ sign_alg=None,
+ digest_alg=None,
+ farg=None,
+ session_not_on_or_after=None,
+ ):
""" Create a response. A layer of indirection.
:param in_response_to: The session identifier of the request
@@ -423,7 +470,6 @@ class Server(Entity):
:param encrypt_cert_assertion: Certificate to be used for encryption
of assertions.
:param authn_statement: Authentication statement.
- :param sign_assertion: True if assertions should be signed.
:param pefim: True if a response according to the PEFIM profile
should be created.
:param farg: Argument to pass on to the assertion constructor
@@ -433,7 +479,6 @@ class Server(Entity):
if farg is None:
assertion_args = {}
- args = {}
# if identity:
_issuer = self._issuer(issuer)
@@ -471,37 +516,65 @@ class Server(Entity):
to_sign = []
if not encrypt_assertion:
if sign_assertion:
- assertion.signature = pre_signature_part(assertion.id,
- self.sec.my_cert, 2,
- sign_alg=sign_alg,
- digest_alg=digest_alg)
+ # XXX self.signing_algorithm self.digest_algorithm defined by entity
+ # XXX this should be handled through entity.py
+ # XXX sig/digest-allowed should be configurable
+ sign_alg = sign_alg or self.signing_algorithm
+ digest_alg = digest_alg or self.digest_algorithm
+
+ assertion.signature = pre_signature_part(
+ assertion.id,
+ self.sec.my_cert,
+ 2,
+ sign_alg=sign_alg,
+ digest_alg=digest_alg,
+ )
to_sign.append((class_name(assertion), assertion.id))
- args["assertion"] = assertion
-
if (self.support_AssertionIDRequest() or self.support_AuthnQuery()):
self.session_db.store_assertion(assertion, to_sign)
return self._response(
- in_response_to, consumer_url, status, issuer, sign_response,
- to_sign, sp_entity_id=sp_entity_id,
+ in_response_to,
+ consumer_url,
+ status,
+ issuer,
+ sign_response,
+ to_sign,
+ sp_entity_id=sp_entity_id,
encrypt_assertion=encrypt_assertion,
encrypt_cert_advice=encrypt_cert_advice,
encrypt_cert_assertion=encrypt_cert_assertion,
encrypt_assertion_self_contained=encrypt_assertion_self_contained,
encrypted_advice_attributes=encrypted_advice_attributes,
sign_assertion=sign_assertion,
- pefim=pefim, sign_alg=sign_alg, digest_alg=digest_alg, **args)
+ pefim=pefim,
+ sign_alg=sign_alg,
+ digest_alg=digest_alg,
+ assertion=assertion,
+ )
# ------------------------------------------------------------------------
- # noinspection PyUnusedLocal
- def create_attribute_response(self, identity, in_response_to, destination,
- sp_entity_id, userid="", name_id=None,
- status=None, issuer=None,
- sign_assertion=False, sign_response=False,
- attributes=None, sign_alg=None,
- digest_alg=None, farg=None, **kwargs):
+ # XXX DONE idp create > _response
+ def create_attribute_response(
+ self,
+ identity,
+ in_response_to,
+ destination,
+ sp_entity_id,
+ userid="",
+ name_id=None,
+ status=None,
+ issuer=None,
+ sign_assertion=None,
+ sign_response=None,
+ attributes=None,
+ sign_alg=None,
+ digest_alg=None,
+ farg=None,
+ **kwargs,
+ ):
""" Create an attribute assertion response.
:param identity: A dictionary with attributes and values that are
@@ -550,66 +623,52 @@ class Server(Entity):
issuer=_issuer, name_id=name_id,
farg=farg['assertion'])
- if sign_assertion:
- assertion.signature = pre_signature_part(assertion.id,
- self.sec.my_cert, 1,
- sign_alg=sign_alg,
- digest_alg=digest_alg)
- # Just the assertion or the response and the assertion ?
- to_sign = [(class_name(assertion), assertion.id)]
- kwargs['sign_assertion'] = True
-
- kwargs["assertion"] = assertion
-
- if sp_entity_id:
- kwargs['sp_entity_id'] = sp_entity_id
-
- return self._response(in_response_to, destination, status, issuer,
- sign_response, to_sign, sign_alg=sign_alg,
- digest_alg=digest_alg, **kwargs)
+ return self._response(
+ in_response_to,
+ destination,
+ status,
+ issuer,
+ sign_response,
+ to_sign,
+ sign_assertion=sign_assertion,
+ sign_alg=sign_alg,
+ digest_alg=digest_alg,
+ assertion=assertion,
+ sp_entity_id=sp_entity_id,
+ **kwargs,
+ )
+
+ def gather_authn_response_args(
+ self, sp_entity_id, name_id_policy, userid, **kwargs
+ ):
+ kwargs["policy"] = kwargs.get("release_policy")
- # ------------------------------------------------------------------------
+ # collect args and return them
+ args = {}
- def gather_authn_response_args(self, sp_entity_id, name_id_policy, userid,
- **kwargs):
- param_default = {
+ # XXX will be passed to _authn_response
+ param_defaults = {
+ 'policy': None,
+ 'best_effort': False,
'sign_assertion': False,
'sign_response': False,
'encrypt_assertion': False,
'encrypt_assertion_self_contained': True,
'encrypted_advice_attributes': False,
'encrypt_cert_advice': None,
- 'encrypt_cert_assertion': None
+ 'encrypt_cert_assertion': None,
+ # need to be named sign_alg and digest_alg
}
-
- args = {}
-
- try:
- args["policy"] = kwargs["release_policy"]
- except KeyError:
- args["policy"] = self.config.getattr("policy", "idp")
-
- try:
- args['best_effort'] = kwargs["best_effort"]
- except KeyError:
- args['best_effort'] = False
-
- for param in ['sign_assertion', 'sign_response', 'encrypt_assertion',
- 'encrypt_assertion_self_contained',
- 'encrypted_advice_attributes', 'encrypt_cert_advice',
- 'encrypt_cert_assertion']:
- try:
- _val = kwargs[param]
- except KeyError:
- _val = None
-
- if _val is None:
- _val = self.config.getattr(param, "idp")
-
- if _val is None:
- args[param] = param_default[param]
- else:
- args[param] = _val
+ for param, val_default in param_defaults.items():
+ val_kw = kwargs.get(param)
+ val_config = self.config.getattr(param, "idp")
+ args[param] = (
+ val_kw
+ if val_kw is not None
+ else val_config
+ if val_config is not None
+ else val_default
+ )
for arg, attr, eca, pefim in [
('encrypted_advice_attributes', 'verify_encrypt_cert_advice',
@@ -671,6 +730,7 @@ class Server(Entity):
return args
+ # XXX DONE idp create > _authn_response > _response
def create_authn_response(
self,
identity,
@@ -693,7 +753,7 @@ class Server(Entity):
sign_alg=None,
digest_alg=None,
session_not_on_or_after=None,
- **kwargs
+ **kwargs,
):
""" Constructs an AuthenticationResponse
@@ -720,7 +780,6 @@ class Server(Entity):
assertions in the advice element.
:param encrypt_cert_assertion: Certificate to be used for encryption
of assertions.
- :param sign_assertion: True if assertions should be signed.
:param pefim: True if a response according to the PEFIM profile
should be created.
:return: A response instance
@@ -728,70 +787,99 @@ class Server(Entity):
try:
args = self.gather_authn_response_args(
- sp_entity_id, name_id_policy=name_id_policy, userid=userid,
- name_id=name_id, sign_response=sign_response,
+ sp_entity_id,
+ name_id_policy=name_id_policy,
+ userid=userid,
+ name_id=name_id,
+ sign_response=sign_response,
sign_assertion=sign_assertion,
encrypt_cert_advice=encrypt_cert_advice,
encrypt_cert_assertion=encrypt_cert_assertion,
encrypt_assertion=encrypt_assertion,
- encrypt_assertion_self_contained
- =encrypt_assertion_self_contained,
+ encrypt_assertion_self_contained=encrypt_assertion_self_contained,
encrypted_advice_attributes=encrypted_advice_attributes,
- pefim=pefim, **kwargs)
+ pefim=pefim,
+ **kwargs,
+ )
except IOError as exc:
- response = self.create_error_response(in_response_to,
- destination,
- sp_entity_id,
- exc, name_id)
- return ("%s" % response).split("\n")
+ response = self.create_error_response(
+ in_response_to,
+ destination=destination,
+ info=exc,
+ sign=sign_response,
+ sign_alg=sign_alg,
+ digest_alg=digest_alg,
+ )
+ return str(response).split("\n")
try:
_authn = authn
- if (sign_assertion or sign_response) and \
- self.sec.cert_handler.generate_cert():
- with self.lock:
- self.sec.cert_handler.update_cert(True)
- return self._authn_response(
- in_response_to, destination, sp_entity_id, identity,
- authn=_authn, issuer=issuer, pefim=pefim,
- sign_alg=sign_alg, digest_alg=digest_alg,
- session_not_on_or_after=session_not_on_or_after, **args)
return self._authn_response(
- in_response_to, destination, sp_entity_id, identity,
- authn=_authn, issuer=issuer, pefim=pefim, sign_alg=sign_alg,
+ in_response_to,
+ destination,
+ sp_entity_id,
+ identity,
+ authn=_authn,
+ issuer=issuer,
+ pefim=pefim,
+ sign_alg=sign_alg,
digest_alg=digest_alg,
- session_not_on_or_after=session_not_on_or_after, **args)
-
+ session_not_on_or_after=session_not_on_or_after,
+ **args,
+ )
except MissingValue as exc:
- return self.create_error_response(in_response_to, destination,
- sp_entity_id, exc, name_id)
-
- def create_authn_request_response(self, identity, in_response_to,
- destination, sp_entity_id,
- name_id_policy=None, userid=None,
- name_id=None, authn=None, authn_decl=None,
- issuer=None, sign_response=False,
- sign_assertion=False,
- session_not_on_or_after=None, **kwargs):
-
- return self.create_authn_response(identity, in_response_to, destination,
- sp_entity_id, name_id_policy, userid,
- name_id, authn, issuer,
- sign_response, sign_assertion,
- authn_decl=authn_decl,
- session_not_on_or_after=session_not_on_or_after)
-
- # noinspection PyUnusedLocal
- def create_assertion_id_request_response(self, assertion_id, sign=False,
- sign_alg=None,
- digest_alg=None, **kwargs):
- """
-
- :param assertion_id:
- :param sign:
- :return:
- """
+ return self.create_error_response(
+ in_response_to,
+ destination=destination,
+ info=exc,
+ sign=sign_response,
+ sign_alg=sign_alg,
+ digest_alg=digest_alg,
+ )
+ # XXX DONE idp create > create_authn_response > _authn_response > _response
+ def create_authn_request_response(
+ self,
+ identity,
+ in_response_to,
+ destination,
+ sp_entity_id,
+ name_id_policy=None,
+ userid=None,
+ name_id=None,
+ authn=None,
+ authn_decl=None,
+ issuer=None,
+ sign_response=None,
+ sign_assertion=None,
+ session_not_on_or_after=None,
+ sign_alg=None,
+ digest_alg=None,
+ **kwargs,
+ ):
+ return self.create_authn_response(
+ identity,
+ in_response_to,
+ destination,
+ sp_entity_id,
+ name_id_policy,
+ userid,
+ name_id,
+ authn,
+ issuer,
+ sign_response,
+ sign_assertion,
+ authn_decl=authn_decl,
+ session_not_on_or_after=session_not_on_or_after,
+ sign_alg=sign_alg,
+ digest_alg=digest_alg,
+ )
+
+ # XXX DONE calls pre_signature_part
+ # XXX DONE idp create > [...]
+ def create_assertion_id_request_response(
+ self, assertion_id, sign=None, sign_alg=None, digest_alg=None, **kwargs
+ ):
try:
(assertion, to_sign) = self.session_db.get_assertion(assertion_id)
except KeyError:
@@ -799,21 +887,38 @@ class Server(Entity):
if to_sign:
if assertion.signature is None:
- assertion.signature = pre_signature_part(assertion.id,
- self.sec.my_cert, 1,
- sign_alg=sign_alg,
- digest_alg=digest_alg)
-
+ # XXX self.signing_algorithm self.digest_algorithm defined by entity
+ # XXX this should be handled through entity.py
+ # XXX sig/digest-allowed should be configurable
+ sign_alg = sign_alg or self.signing_algorithm
+ digest_alg = digest_alg or self.digest_algorithm
+
+ assertion.signature = pre_signature_part(
+ assertion.id,
+ self.sec.my_cert,
+ 1,
+ sign_alg=sign_alg,
+ digest_alg=digest_alg,
+ )
return signed_instance_factory(assertion, self.sec, to_sign)
else:
return assertion
- # noinspection PyUnusedLocal
- def create_name_id_mapping_response(self, name_id=None, encrypted_id=None,
- in_response_to=None,
- issuer=None, sign_response=False,
- status=None, sign_alg=None,
- digest_alg=None, **kwargs):
+ # XXX calls self.sign without ensuring sign
+ # XXX calls self.sign => should it call _message (which calls self.sign)?
+ # XXX idp create > NameIDMappingResponse & sign?
+ def create_name_id_mapping_response(
+ self,
+ name_id=None,
+ encrypted_id=None,
+ in_response_to=None,
+ issuer=None,
+ sign_response=None,
+ status=None,
+ sign_alg=None,
+ digest_alg=None,
+ **kwargs,
+ ):
"""
protocol for mapping a principal's name identifier into a
different name identifier for the same principal.
@@ -831,8 +936,9 @@ class Server(Entity):
ms_args = self.message_args()
- _resp = NameIDMappingResponse(name_id, encrypted_id,
- in_response_to=in_response_to, **ms_args)
+ _resp = NameIDMappingResponse(
+ name_id, encrypted_id, in_response_to=in_response_to, **ms_args
+ )
if sign_response:
return self.sign(_resp, sign_alg=sign_alg, digest_alg=digest_alg)
@@ -840,11 +946,20 @@ class Server(Entity):
logger.info("Message: %s", _resp)
return _resp
- def create_authn_query_response(self, subject, session_index=None,
- requested_context=None, in_response_to=None,
- issuer=None, sign_response=False,
- status=None, sign_alg=None, digest_alg=None,
- **kwargs):
+ # XXX DONE idp create > _response
+ def create_authn_query_response(
+ self,
+ subject,
+ session_index=None,
+ requested_context=None,
+ in_response_to=None,
+ issuer=None,
+ sign_response=None,
+ status=None,
+ sign_alg=None,
+ digest_alg=None,
+ **kwargs,
+ ):
"""
A successful <Response> will contain one or more assertions containing
authentication statements.
@@ -853,32 +968,54 @@ class Server(Entity):
"""
margs = self.message_args()
- asserts = []
- for statement in self.session_db.get_authn_statements(
- subject.name_id, session_index, requested_context):
- asserts.append(saml.Assertion(authn_statement=statement,
- subject=subject, **margs))
+ asserts = [
+ saml.Assertion(authn_statement=statement, subject=subject, **margs)
+ for statement in self.session_db.get_authn_statements(
+ subject.name_id, session_index, requested_context
+ )
+ ]
if asserts:
args = {"assertion": asserts}
else:
args = {}
- return self._response(in_response_to, "", status, issuer,
- sign_response, to_sign=[], sign_alg=sign_alg,
- digest_alg=digest_alg, **args)
+ return self._response(
+ in_response_to,
+ "",
+ status,
+ issuer,
+ sign_response,
+ to_sign=[],
+ sign_alg=sign_alg,
+ digest_alg=digest_alg,
+ **args,
+ )
# ---------
def parse_ecp_authn_request(self):
pass
- def create_ecp_authn_request_response(self, acs_url, identity,
- in_response_to, destination,
- sp_entity_id, name_id_policy=None,
- userid=None, name_id=None, authn=None,
- issuer=None, sign_response=False,
- sign_assertion=False, **kwargs):
+ # XXX DONE idp create > create_authn_response > _authn_response > _response
+ def create_ecp_authn_request_response(
+ self,
+ acs_url,
+ identity,
+ in_response_to,
+ destination,
+ sp_entity_id,
+ name_id_policy=None,
+ userid=None,
+ name_id=None,
+ authn=None,
+ issuer=None,
+ sign_response=None,
+ sign_assertion=None,
+ sign_alg=None,
+ digest_alg=None,
+ **kwargs,
+ ):
# ----------------------------------------
# <ecp:Response
@@ -892,17 +1029,27 @@ class Server(Entity):
# <samlp:Response
# ----------------------------------------
- response = self.create_authn_response(identity, in_response_to,
- destination, sp_entity_id,
- name_id_policy, userid, name_id,
- authn, issuer,
- sign_response, sign_assertion)
+ response = self.create_authn_response(
+ identity,
+ in_response_to,
+ destination,
+ sp_entity_id,
+ name_id_policy,
+ userid,
+ name_id,
+ authn,
+ issuer,
+ sign_response,
+ sign_assertion,
+ sign_alg=sign_alg,
+ digest_alg=digest_alg
+ )
body = soapenv.Body()
body.extension_elements = [element_to_extension_element(response)]
soap_envelope = soapenv.Envelope(header=header, body=body)
- return "%s" % soap_envelope
+ return str(soap_envelope)
def close(self):
self.ident.close()
diff --git a/src/saml2/sigver.py b/src/saml2/sigver.py
index ee630340..52324eb4 100644
--- a/src/saml2/sigver.py
+++ b/src/saml2/sigver.py
@@ -301,6 +301,12 @@ def _instance(klass, ava, seccont, base64encode=False, elements_to_sign=None):
return instance
+# XXX will actually sign the nodes
+# XXX assumes pre_signature_part has already been called
+# XXX calls sign without specifying sign_alg/digest_alg
+# XXX this is fine as the algs are embeded in the document
+# XXX as setup by pre_signature_part
+# XXX !!expects instance string!!
def signed_instance_factory(instance, seccont, elements_to_sign=None):
"""
@@ -309,17 +315,20 @@ def signed_instance_factory(instance, seccont, elements_to_sign=None):
:param elements_to_sign: Which parts if any that should be signed
:return: A class instance if not signed otherwise a string
"""
- if elements_to_sign:
- signed_xml = instance
- if not isinstance(instance, six.string_types):
- signed_xml = instance.to_string()
- for (node_name, nodeid) in elements_to_sign:
- signed_xml = seccont.sign_statement(
- signed_xml, node_name=node_name, node_id=nodeid)
- return signed_xml
- else:
+ if not elements_to_sign:
return instance
+ signed_xml = instance
+ if not isinstance(instance, six.string_types):
+ signed_xml = instance.to_string()
+
+ for (node_name, nodeid) in elements_to_sign:
+ signed_xml = seccont.sign_statement(
+ signed_xml, node_name=node_name, node_id=nodeid
+ )
+
+ return signed_xml
+
def make_temp(content, suffix="", decode=True, delete_tmpfiles=True):
"""
@@ -1689,11 +1698,6 @@ class SecurityContext(object):
node_id,
)
- def sign_assertion_using_xmlsec(self, statement, **kwargs):
- """ Deprecated function. See sign_assertion(). """
- return self.sign_statement(
- statement, class_name(saml.Assertion()), **kwargs)
-
def sign_assertion(self, statement, **kwargs):
"""Sign a SAML assertion.
@@ -1739,10 +1743,11 @@ class SecurityContext(object):
if not item.signature:
item.signature = pre_signature_part(
- sid,
- self.cert_file,
- sign_alg=sign_alg,
- digest_alg=digest_alg)
+ ident=sid,
+ public_key=self.cert_file,
+ sign_alg=sign_alg,
+ digest_alg=digest_alg,
+ )
statement = self.sign_statement(
statement,
@@ -1755,7 +1760,14 @@ class SecurityContext(object):
return statement
-def pre_signature_part(ident, public_key=None, identifier=None, digest_alg=None, sign_alg=None):
+# XXX FIXME calls DefaultSignature - remove to unveil chain of calls without proper args
+def pre_signature_part(
+ ident,
+ public_key=None,
+ identifier=None,
+ digest_alg=None,
+ sign_alg=None,
+):
"""
If an assertion is to be signed the signature part has to be preset
with which algorithms to be used, this function returns such a
@@ -1768,10 +1780,12 @@ def pre_signature_part(ident, public_key=None, identifier=None, digest_alg=None,
:return: A preset signature part
"""
+ # XXX
if not digest_alg:
digest_alg = ds.DefaultSignature().get_digest_alg()
if not sign_alg:
sign_alg = ds.DefaultSignature().get_sign_alg()
+
signature_method = ds.SignatureMethod(algorithm=sign_alg)
canonicalization_method = ds.CanonicalizationMethod(
algorithm=ds.ALG_EXC_C14N)
@@ -1882,23 +1896,6 @@ def pre_encrypt_assertion(response):
return response
-def response_factory(sign=False, encrypt=False, sign_alg=None, digest_alg=None,
- **kwargs):
- response = samlp.Response(id=sid(), version=VERSION,
- issue_instant=instant())
-
- if sign:
- response.signature = pre_signature_part(
- kwargs['id'], sign_alg=sign_alg, digest_alg=digest_alg)
- if encrypt:
- pass
-
- for key, val in kwargs.items():
- setattr(response, key, val)
-
- return response
-
-
if __name__ == '__main__':
import argparse
diff --git a/tests/test_50_server.py b/tests/test_50_server.py
index 7ee82499..dfc24eee 100644
--- a/tests/test_50_server.py
+++ b/tests/test_50_server.py
@@ -23,12 +23,15 @@ from saml2 import extension_elements_to_elements
from saml2 import s_utils
from saml2 import sigver
from saml2 import time_util
+from saml2 import VERSION
from saml2.s_utils import OtherError
from saml2.s_utils import do_attribute_statement
from saml2.s_utils import factory
+from saml2.s_utils import sid
from saml2.soap import make_soap_enveloped_saml_thingy
from saml2 import BINDING_HTTP_POST
from saml2 import BINDING_HTTP_REDIRECT
+from saml2.time_util import instant
from pytest import raises
from pathutils import full_path
@@ -44,6 +47,14 @@ AUTHN = {
}
+def response_factory(**kwargs):
+ response = samlp.Response(id=sid(), version=VERSION, issue_instant=instant())
+
+ for key, val in kwargs.items():
+ setattr(response, key, val)
+
+ return response
+
def _eq(l1, l2):
return set(l1) == set(l2)
@@ -179,7 +190,7 @@ class TestServer1():
assert subject.name_id.format == saml.NAMEID_FORMAT_TRANSIENT
def test_response(self):
- response = sigver.response_factory(
+ response = response_factory(
in_response_to="_012345",
destination="https:#www.example.com",
status=s_utils.success_status_factory(),
@@ -1239,7 +1250,7 @@ class TestServer1NonAsciiAva():
assert subject.name_id.format == saml.NAMEID_FORMAT_TRANSIENT
def test_response(self):
- response = sigver.response_factory(
+ response = response_factory(
in_response_to="_012345",
destination="https:#www.example.com",
status=s_utils.success_status_factory(),
diff --git a/tests/test_51_client.py b/tests/test_51_client.py
index a20cf941..c82917cd 100644
--- a/tests/test_51_client.py
+++ b/tests/test_51_client.py
@@ -21,6 +21,7 @@ from saml2 import saml
from saml2 import samlp
from saml2 import sigver
from saml2 import s_utils
+from saml2 import VERSION
from saml2.assertion import Assertion
from saml2.extension.requested_attributes import RequestedAttributes
from saml2.extension.requested_attributes import RequestedAttribute
@@ -40,7 +41,10 @@ from saml2.sigver import verify_redirect_signature
from saml2.sigver import SignatureError, SigverError
from saml2.s_utils import do_attribute_statement
from saml2.s_utils import factory
-from saml2.time_util import in_a_while, a_while_ago
+from saml2.s_utils import sid
+from saml2.time_util import in_a_while
+from saml2.time_util import a_while_ago
+from saml2.time_util import instant
from defusedxml.common import EntitiesForbidden
@@ -53,6 +57,14 @@ AUTHN = {
"authn_auth": "http://www.example.com/login"
}
+def response_factory(**kwargs):
+ response = samlp.Response(id=sid(), version=VERSION, issue_instant=instant())
+
+ for key, val in kwargs.items():
+ setattr(response, key, val)
+
+ return response
+
def generate_cert():
sn = uuid.uuid4().urn
cert_info = {
@@ -943,7 +955,7 @@ class TestClient:
# Create an Assertion instance from the signed assertion
_ass = saml.assertion_from_string(sigass)
- response = sigver.response_factory(
+ response = response_factory(
in_response_to="_012345",
destination="https:#www.example.com",
status=s_utils.success_status_factory(),
@@ -951,10 +963,11 @@ class TestClient:
assertion=_ass
)
- enctext = _sec.crypto.encrypt_assertion(response,
- self.client.sec.encryption_keypairs[
- 0]["cert_file"],
- pre_encryption_part())
+ enctext = _sec.crypto.encrypt_assertion(
+ response,
+ self.client.sec.encryption_keypairs[0]["cert_file"],
+ pre_encryption_part(),
+ )
seresp = samlp.response_from_string(enctext)
@@ -1023,7 +1036,7 @@ class TestClient:
node_id=assertion.id)
sigass = rm_xmltag(sigass)
- response = sigver.response_factory(
+ response = response_factory(
in_response_to="_012345",
destination="http://lingon.catalogix.se:8087/",
status=s_utils.success_status_factory(),
@@ -1116,7 +1129,7 @@ class TestClient:
assertion.advice.encrypted_assertion[0].add_extension_element(
a_assertion)
- response = sigver.response_factory(
+ response = response_factory(
in_response_to="_012345",
destination="http://lingon.catalogix.se:8087/",
status=s_utils.success_status_factory(),
@@ -1267,7 +1280,7 @@ class TestClient:
assertion_2.signature = sigver.pre_signature_part(assertion_2.id,
_sec.my_cert, 1)
- response = sigver.response_factory(
+ response = response_factory(
in_response_to="_012345",
destination="http://lingon.catalogix.se:8087/",
status=s_utils.success_status_factory(),
@@ -2560,7 +2573,7 @@ class TestClientNonAsciiAva:
# Create an Assertion instance from the signed assertion
_ass = saml.assertion_from_string(sigass)
- response = sigver.response_factory(
+ response = response_factory(
in_response_to="_012345",
destination="https:#www.example.com",
status=s_utils.success_status_factory(),
@@ -2640,7 +2653,7 @@ class TestClientNonAsciiAva:
node_id=assertion.id)
sigass = rm_xmltag(sigass)
- response = sigver.response_factory(
+ response = response_factory(
in_response_to="_012345",
destination="http://lingon.catalogix.se:8087/",
status=s_utils.success_status_factory(),
@@ -2733,7 +2746,7 @@ class TestClientNonAsciiAva:
assertion.advice.encrypted_assertion[0].add_extension_element(
a_assertion)
- response = sigver.response_factory(
+ response = response_factory(
in_response_to="_012345",
destination="http://lingon.catalogix.se:8087/",
status=s_utils.success_status_factory(),
@@ -2885,7 +2898,7 @@ class TestClientNonAsciiAva:
assertion_2.signature = sigver.pre_signature_part(assertion_2.id,
_sec.my_cert, 1)
- response = sigver.response_factory(
+ response = response_factory(
in_response_to="_012345",
destination="http://lingon.catalogix.se:8087/",
status=s_utils.success_status_factory(),
diff --git a/tests/test_52_default_sign_alg.py b/tests/test_52_default_sign_alg.py
index 274ee858..fee4ee21 100644
--- a/tests/test_52_default_sign_alg.py
+++ b/tests/test_52_default_sign_alg.py
@@ -42,17 +42,8 @@ def get_ava(assertion):
class TestSignedResponse():
-
def setup_class(self):
self.server = Server("idp_conf")
- sign_alg = Mock()
- sign_alg.return_value = ds.SIG_RSA_SHA512
- digest_alg = Mock()
- digest_alg.return_value = ds.DIGEST_SHA512
- self.restet_default = ds.DefaultSignature
- ds.DefaultSignature = MagicMock()
- ds.DefaultSignature().get_sign_alg = sign_alg
- ds.DefaultSignature().get_digest_alg = digest_alg
conf = config.SPConfig()
conf.load_file("server_conf")
self.client = client.Saml2Client(conf)
@@ -62,7 +53,6 @@ class TestSignedResponse():
"mail": ["derek@nyy.mlb.com"], "title": "The man"}
def teardown_class(self):
- ds.DefaultSignature = self.restet_default
self.server.close()
def verify_assertion(self, assertion):
@@ -76,7 +66,6 @@ class TestSignedResponse():
'surName': ['Jeter'], 'title': ['The man']}
def test_signed_response(self):
-
print(ds.DefaultSignature().get_digest_alg())
name_id = self.server.ident.transient_nameid(
"urn:mace:example.com:saml:roland:sp", "id12")
@@ -96,11 +85,10 @@ class TestSignedResponse():
assert signed_resp
sresponse = response_from_string(signed_resp)
- assert ds.SIG_RSA_SHA512 in str(sresponse), "Not correctly signed!"
- assert ds.DIGEST_SHA512 in str(sresponse), "Not correctly signed!"
+ assert ds.SIG_RSA_SHA1 in str(sresponse), "Not correctly signed!"
+ assert ds.DIGEST_SHA1 in str(sresponse), "Not correctly signed!"
def test_signed_response_1(self):
-
signed_resp = self.server.create_authn_response(
self.ava,
"id12", # in_response_to
@@ -112,15 +100,15 @@ class TestSignedResponse():
)
sresponse = response_from_string(signed_resp)
- assert ds.SIG_RSA_SHA512 in str(sresponse), "Not correctly signed!"
- assert ds.DIGEST_SHA512 in str(sresponse), "Not correctly signed!"
+ assert ds.SIG_RSA_SHA1 in str(sresponse), "Not correctly signed!"
+ assert ds.DIGEST_SHA1 in str(sresponse), "Not correctly signed!"
valid = self.server.sec.verify_signature(signed_resp,
self.server.config.cert_file,
node_name='urn:oasis:names:tc:SAML:2.0:protocol:Response',
node_id=sresponse.id)
assert valid
- assert ds.SIG_RSA_SHA512 in str(sresponse.assertion[0]), "Not correctly signed!"
- assert ds.DIGEST_SHA512 in str(sresponse.assertion[0]), "Not correctly signed!"
+ assert ds.SIG_RSA_SHA1 in str(sresponse.assertion[0]), "Not correctly signed!"
+ assert ds.DIGEST_SHA1 in str(sresponse.assertion[0]), "Not correctly signed!"
valid = self.server.sec.verify_signature(signed_resp,
self.server.config.cert_file,
node_name='urn:oasis:names:tc:SAML:2.0:assertion:Assertion',
@@ -130,7 +118,6 @@ class TestSignedResponse():
self.verify_assertion(sresponse.assertion)
def test_signed_response_2(self):
-
signed_resp = self.server.create_authn_response(
self.ava,
"id12", # in_response_to
@@ -161,6 +148,7 @@ class TestSignedResponse():
self.verify_assertion(sresponse.assertion)
+
if __name__ == "__main__":
ts = TestSignedResponse()
ts.setup_class()