summaryrefslogtreecommitdiff
path: root/src/saml2/server.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/saml2/server.py')
-rw-r--r--src/saml2/server.py505
1 files changed, 326 insertions, 179 deletions
diff --git a/src/saml2/server.py b/src/saml2/server.py
index bcdbd2bb..9e34cce2 100644
--- a/src/saml2/server.py
+++ b/src/saml2/server.py
@@ -77,8 +77,15 @@ def _shelve_compat(name, *args, **kwargs):
class Server(Entity):
""" A class that does things that IdPs or AAs do """
- def __init__(self, config_file="", config=None, cache=None, stype="idp",
- symkey="", msg_cb=None):
+ def __init__(
+ self,
+ config_file="",
+ config=None,
+ cache=None,
+ stype="idp",
+ symkey="",
+ msg_cb=None,
+ ):
Entity.__init__(self, stype, config, config_file, msg_cb=msg_cb)
self.eptid = None
self.init_config(stype)
@@ -218,6 +225,7 @@ class Server(Entity):
return False
# -------------------------------------------------------------------------
+
def parse_authn_request(self, enc_request, binding=BINDING_HTTP_REDIRECT):
"""Parse a Authentication Request
@@ -320,10 +328,25 @@ class Server(Entity):
consumer_url])
return farg
- def setup_assertion(self, authn, sp_entity_id, in_response_to, consumer_url,
- name_id, policy, _issuer, authn_statement, identity,
- best_effort, sign_response, farg=None,
- session_not_on_or_after=None, **kwargs):
+ def setup_assertion(
+ self,
+ authn,
+ sp_entity_id,
+ in_response_to,
+ consumer_url,
+ name_id,
+ policy,
+ _issuer,
+ authn_statement,
+ identity,
+ best_effort,
+ sign_response,
+ farg=None,
+ session_not_on_or_after=None,
+ sign_alg=None,
+ digest_alg=None,
+ **kwargs,
+ ):
"""
Construct and return the Assertion
@@ -352,8 +375,15 @@ class Server(Entity):
ast.apply_policy(sp_entity_id, policy)
except MissingValue as exc:
if not best_effort:
- return self.create_error_response(in_response_to, consumer_url,
- exc, sign_response)
+ response = self.create_error_response(
+ in_response_to,
+ destination=consumer_url,
+ info=exc,
+ sign=sign_response,
+ sign_alg=sign_alg,
+ digest_alg=digest_alg,
+ )
+ return str(response).split("\n")
farg = self.update_farg(in_response_to, consumer_url, farg)
@@ -384,17 +414,34 @@ class Server(Entity):
**kwargs)
return assertion
- def _authn_response(self, in_response_to, consumer_url,
- sp_entity_id, identity=None, name_id=None,
- status=None, authn=None, issuer=None, policy=None,
- sign_assertion=False, sign_response=False,
- best_effort=False, encrypt_assertion=False,
- encrypt_cert_advice=None, encrypt_cert_assertion=None,
- authn_statement=None,
- encrypt_assertion_self_contained=False,
- encrypted_advice_attributes=False,
- pefim=False, sign_alg=None, digest_alg=None,
- farg=None, session_not_on_or_after=None):
+ # XXX DONE calls pre_signature_part
+ # XXX calls _response
+ def _authn_response(
+ self,
+ in_response_to,
+ consumer_url,
+ sp_entity_id,
+ identity=None,
+ name_id=None,
+ status=None,
+ authn=None,
+ issuer=None,
+ policy=None,
+ sign_assertion=None,
+ sign_response=None,
+ best_effort=False,
+ encrypt_assertion=False,
+ encrypt_cert_advice=None,
+ encrypt_cert_assertion=None,
+ authn_statement=None,
+ encrypt_assertion_self_contained=False,
+ encrypted_advice_attributes=False,
+ pefim=False,
+ sign_alg=None,
+ digest_alg=None,
+ farg=None,
+ session_not_on_or_after=None,
+ ):
""" Create a response. A layer of indirection.
:param in_response_to: The session identifier of the request
@@ -423,7 +470,6 @@ class Server(Entity):
:param encrypt_cert_assertion: Certificate to be used for encryption
of assertions.
:param authn_statement: Authentication statement.
- :param sign_assertion: True if assertions should be signed.
:param pefim: True if a response according to the PEFIM profile
should be created.
:param farg: Argument to pass on to the assertion constructor
@@ -433,7 +479,6 @@ class Server(Entity):
if farg is None:
assertion_args = {}
- args = {}
# if identity:
_issuer = self._issuer(issuer)
@@ -471,37 +516,65 @@ class Server(Entity):
to_sign = []
if not encrypt_assertion:
if sign_assertion:
- assertion.signature = pre_signature_part(assertion.id,
- self.sec.my_cert, 2,
- sign_alg=sign_alg,
- digest_alg=digest_alg)
+ # XXX self.signing_algorithm self.digest_algorithm defined by entity
+ # XXX this should be handled through entity.py
+ # XXX sig/digest-allowed should be configurable
+ sign_alg = sign_alg or self.signing_algorithm
+ digest_alg = digest_alg or self.digest_algorithm
+
+ assertion.signature = pre_signature_part(
+ assertion.id,
+ self.sec.my_cert,
+ 2,
+ sign_alg=sign_alg,
+ digest_alg=digest_alg,
+ )
to_sign.append((class_name(assertion), assertion.id))
- args["assertion"] = assertion
-
if (self.support_AssertionIDRequest() or self.support_AuthnQuery()):
self.session_db.store_assertion(assertion, to_sign)
return self._response(
- in_response_to, consumer_url, status, issuer, sign_response,
- to_sign, sp_entity_id=sp_entity_id,
+ in_response_to,
+ consumer_url,
+ status,
+ issuer,
+ sign_response,
+ to_sign,
+ sp_entity_id=sp_entity_id,
encrypt_assertion=encrypt_assertion,
encrypt_cert_advice=encrypt_cert_advice,
encrypt_cert_assertion=encrypt_cert_assertion,
encrypt_assertion_self_contained=encrypt_assertion_self_contained,
encrypted_advice_attributes=encrypted_advice_attributes,
sign_assertion=sign_assertion,
- pefim=pefim, sign_alg=sign_alg, digest_alg=digest_alg, **args)
+ pefim=pefim,
+ sign_alg=sign_alg,
+ digest_alg=digest_alg,
+ assertion=assertion,
+ )
# ------------------------------------------------------------------------
- # noinspection PyUnusedLocal
- def create_attribute_response(self, identity, in_response_to, destination,
- sp_entity_id, userid="", name_id=None,
- status=None, issuer=None,
- sign_assertion=False, sign_response=False,
- attributes=None, sign_alg=None,
- digest_alg=None, farg=None, **kwargs):
+ # XXX DONE idp create > _response
+ def create_attribute_response(
+ self,
+ identity,
+ in_response_to,
+ destination,
+ sp_entity_id,
+ userid="",
+ name_id=None,
+ status=None,
+ issuer=None,
+ sign_assertion=None,
+ sign_response=None,
+ attributes=None,
+ sign_alg=None,
+ digest_alg=None,
+ farg=None,
+ **kwargs,
+ ):
""" Create an attribute assertion response.
:param identity: A dictionary with attributes and values that are
@@ -550,66 +623,52 @@ class Server(Entity):
issuer=_issuer, name_id=name_id,
farg=farg['assertion'])
- if sign_assertion:
- assertion.signature = pre_signature_part(assertion.id,
- self.sec.my_cert, 1,
- sign_alg=sign_alg,
- digest_alg=digest_alg)
- # Just the assertion or the response and the assertion ?
- to_sign = [(class_name(assertion), assertion.id)]
- kwargs['sign_assertion'] = True
-
- kwargs["assertion"] = assertion
-
- if sp_entity_id:
- kwargs['sp_entity_id'] = sp_entity_id
-
- return self._response(in_response_to, destination, status, issuer,
- sign_response, to_sign, sign_alg=sign_alg,
- digest_alg=digest_alg, **kwargs)
+ return self._response(
+ in_response_to,
+ destination,
+ status,
+ issuer,
+ sign_response,
+ to_sign,
+ sign_assertion=sign_assertion,
+ sign_alg=sign_alg,
+ digest_alg=digest_alg,
+ assertion=assertion,
+ sp_entity_id=sp_entity_id,
+ **kwargs,
+ )
+
+ def gather_authn_response_args(
+ self, sp_entity_id, name_id_policy, userid, **kwargs
+ ):
+ kwargs["policy"] = kwargs.get("release_policy")
- # ------------------------------------------------------------------------
+ # collect args and return them
+ args = {}
- def gather_authn_response_args(self, sp_entity_id, name_id_policy, userid,
- **kwargs):
- param_default = {
+ # XXX will be passed to _authn_response
+ param_defaults = {
+ 'policy': None,
+ 'best_effort': False,
'sign_assertion': False,
'sign_response': False,
'encrypt_assertion': False,
'encrypt_assertion_self_contained': True,
'encrypted_advice_attributes': False,
'encrypt_cert_advice': None,
- 'encrypt_cert_assertion': None
+ 'encrypt_cert_assertion': None,
+ # need to be named sign_alg and digest_alg
}
-
- args = {}
-
- try:
- args["policy"] = kwargs["release_policy"]
- except KeyError:
- args["policy"] = self.config.getattr("policy", "idp")
-
- try:
- args['best_effort'] = kwargs["best_effort"]
- except KeyError:
- args['best_effort'] = False
-
- for param in ['sign_assertion', 'sign_response', 'encrypt_assertion',
- 'encrypt_assertion_self_contained',
- 'encrypted_advice_attributes', 'encrypt_cert_advice',
- 'encrypt_cert_assertion']:
- try:
- _val = kwargs[param]
- except KeyError:
- _val = None
-
- if _val is None:
- _val = self.config.getattr(param, "idp")
-
- if _val is None:
- args[param] = param_default[param]
- else:
- args[param] = _val
+ for param, val_default in param_defaults.items():
+ val_kw = kwargs.get(param)
+ val_config = self.config.getattr(param, "idp")
+ args[param] = (
+ val_kw
+ if val_kw is not None
+ else val_config
+ if val_config is not None
+ else val_default
+ )
for arg, attr, eca, pefim in [
('encrypted_advice_attributes', 'verify_encrypt_cert_advice',
@@ -671,6 +730,7 @@ class Server(Entity):
return args
+ # XXX DONE idp create > _authn_response > _response
def create_authn_response(
self,
identity,
@@ -693,7 +753,7 @@ class Server(Entity):
sign_alg=None,
digest_alg=None,
session_not_on_or_after=None,
- **kwargs
+ **kwargs,
):
""" Constructs an AuthenticationResponse
@@ -720,7 +780,6 @@ class Server(Entity):
assertions in the advice element.
:param encrypt_cert_assertion: Certificate to be used for encryption
of assertions.
- :param sign_assertion: True if assertions should be signed.
:param pefim: True if a response according to the PEFIM profile
should be created.
:return: A response instance
@@ -728,70 +787,99 @@ class Server(Entity):
try:
args = self.gather_authn_response_args(
- sp_entity_id, name_id_policy=name_id_policy, userid=userid,
- name_id=name_id, sign_response=sign_response,
+ sp_entity_id,
+ name_id_policy=name_id_policy,
+ userid=userid,
+ name_id=name_id,
+ sign_response=sign_response,
sign_assertion=sign_assertion,
encrypt_cert_advice=encrypt_cert_advice,
encrypt_cert_assertion=encrypt_cert_assertion,
encrypt_assertion=encrypt_assertion,
- encrypt_assertion_self_contained
- =encrypt_assertion_self_contained,
+ encrypt_assertion_self_contained=encrypt_assertion_self_contained,
encrypted_advice_attributes=encrypted_advice_attributes,
- pefim=pefim, **kwargs)
+ pefim=pefim,
+ **kwargs,
+ )
except IOError as exc:
- response = self.create_error_response(in_response_to,
- destination,
- sp_entity_id,
- exc, name_id)
- return ("%s" % response).split("\n")
+ response = self.create_error_response(
+ in_response_to,
+ destination=destination,
+ info=exc,
+ sign=sign_response,
+ sign_alg=sign_alg,
+ digest_alg=digest_alg,
+ )
+ return str(response).split("\n")
try:
_authn = authn
- if (sign_assertion or sign_response) and \
- self.sec.cert_handler.generate_cert():
- with self.lock:
- self.sec.cert_handler.update_cert(True)
- return self._authn_response(
- in_response_to, destination, sp_entity_id, identity,
- authn=_authn, issuer=issuer, pefim=pefim,
- sign_alg=sign_alg, digest_alg=digest_alg,
- session_not_on_or_after=session_not_on_or_after, **args)
return self._authn_response(
- in_response_to, destination, sp_entity_id, identity,
- authn=_authn, issuer=issuer, pefim=pefim, sign_alg=sign_alg,
+ in_response_to,
+ destination,
+ sp_entity_id,
+ identity,
+ authn=_authn,
+ issuer=issuer,
+ pefim=pefim,
+ sign_alg=sign_alg,
digest_alg=digest_alg,
- session_not_on_or_after=session_not_on_or_after, **args)
-
+ session_not_on_or_after=session_not_on_or_after,
+ **args,
+ )
except MissingValue as exc:
- return self.create_error_response(in_response_to, destination,
- sp_entity_id, exc, name_id)
-
- def create_authn_request_response(self, identity, in_response_to,
- destination, sp_entity_id,
- name_id_policy=None, userid=None,
- name_id=None, authn=None, authn_decl=None,
- issuer=None, sign_response=False,
- sign_assertion=False,
- session_not_on_or_after=None, **kwargs):
-
- return self.create_authn_response(identity, in_response_to, destination,
- sp_entity_id, name_id_policy, userid,
- name_id, authn, issuer,
- sign_response, sign_assertion,
- authn_decl=authn_decl,
- session_not_on_or_after=session_not_on_or_after)
-
- # noinspection PyUnusedLocal
- def create_assertion_id_request_response(self, assertion_id, sign=False,
- sign_alg=None,
- digest_alg=None, **kwargs):
- """
-
- :param assertion_id:
- :param sign:
- :return:
- """
+ return self.create_error_response(
+ in_response_to,
+ destination=destination,
+ info=exc,
+ sign=sign_response,
+ sign_alg=sign_alg,
+ digest_alg=digest_alg,
+ )
+ # XXX DONE idp create > create_authn_response > _authn_response > _response
+ def create_authn_request_response(
+ self,
+ identity,
+ in_response_to,
+ destination,
+ sp_entity_id,
+ name_id_policy=None,
+ userid=None,
+ name_id=None,
+ authn=None,
+ authn_decl=None,
+ issuer=None,
+ sign_response=None,
+ sign_assertion=None,
+ session_not_on_or_after=None,
+ sign_alg=None,
+ digest_alg=None,
+ **kwargs,
+ ):
+ return self.create_authn_response(
+ identity,
+ in_response_to,
+ destination,
+ sp_entity_id,
+ name_id_policy,
+ userid,
+ name_id,
+ authn,
+ issuer,
+ sign_response,
+ sign_assertion,
+ authn_decl=authn_decl,
+ session_not_on_or_after=session_not_on_or_after,
+ sign_alg=sign_alg,
+ digest_alg=digest_alg,
+ )
+
+ # XXX DONE calls pre_signature_part
+ # XXX DONE idp create > [...]
+ def create_assertion_id_request_response(
+ self, assertion_id, sign=None, sign_alg=None, digest_alg=None, **kwargs
+ ):
try:
(assertion, to_sign) = self.session_db.get_assertion(assertion_id)
except KeyError:
@@ -799,21 +887,38 @@ class Server(Entity):
if to_sign:
if assertion.signature is None:
- assertion.signature = pre_signature_part(assertion.id,
- self.sec.my_cert, 1,
- sign_alg=sign_alg,
- digest_alg=digest_alg)
-
+ # XXX self.signing_algorithm self.digest_algorithm defined by entity
+ # XXX this should be handled through entity.py
+ # XXX sig/digest-allowed should be configurable
+ sign_alg = sign_alg or self.signing_algorithm
+ digest_alg = digest_alg or self.digest_algorithm
+
+ assertion.signature = pre_signature_part(
+ assertion.id,
+ self.sec.my_cert,
+ 1,
+ sign_alg=sign_alg,
+ digest_alg=digest_alg,
+ )
return signed_instance_factory(assertion, self.sec, to_sign)
else:
return assertion
- # noinspection PyUnusedLocal
- def create_name_id_mapping_response(self, name_id=None, encrypted_id=None,
- in_response_to=None,
- issuer=None, sign_response=False,
- status=None, sign_alg=None,
- digest_alg=None, **kwargs):
+ # XXX calls self.sign without ensuring sign
+ # XXX calls self.sign => should it call _message (which calls self.sign)?
+ # XXX idp create > NameIDMappingResponse & sign?
+ def create_name_id_mapping_response(
+ self,
+ name_id=None,
+ encrypted_id=None,
+ in_response_to=None,
+ issuer=None,
+ sign_response=None,
+ status=None,
+ sign_alg=None,
+ digest_alg=None,
+ **kwargs,
+ ):
"""
protocol for mapping a principal's name identifier into a
different name identifier for the same principal.
@@ -831,8 +936,9 @@ class Server(Entity):
ms_args = self.message_args()
- _resp = NameIDMappingResponse(name_id, encrypted_id,
- in_response_to=in_response_to, **ms_args)
+ _resp = NameIDMappingResponse(
+ name_id, encrypted_id, in_response_to=in_response_to, **ms_args
+ )
if sign_response:
return self.sign(_resp, sign_alg=sign_alg, digest_alg=digest_alg)
@@ -840,11 +946,20 @@ class Server(Entity):
logger.info("Message: %s", _resp)
return _resp
- def create_authn_query_response(self, subject, session_index=None,
- requested_context=None, in_response_to=None,
- issuer=None, sign_response=False,
- status=None, sign_alg=None, digest_alg=None,
- **kwargs):
+ # XXX DONE idp create > _response
+ def create_authn_query_response(
+ self,
+ subject,
+ session_index=None,
+ requested_context=None,
+ in_response_to=None,
+ issuer=None,
+ sign_response=None,
+ status=None,
+ sign_alg=None,
+ digest_alg=None,
+ **kwargs,
+ ):
"""
A successful <Response> will contain one or more assertions containing
authentication statements.
@@ -853,32 +968,54 @@ class Server(Entity):
"""
margs = self.message_args()
- asserts = []
- for statement in self.session_db.get_authn_statements(
- subject.name_id, session_index, requested_context):
- asserts.append(saml.Assertion(authn_statement=statement,
- subject=subject, **margs))
+ asserts = [
+ saml.Assertion(authn_statement=statement, subject=subject, **margs)
+ for statement in self.session_db.get_authn_statements(
+ subject.name_id, session_index, requested_context
+ )
+ ]
if asserts:
args = {"assertion": asserts}
else:
args = {}
- return self._response(in_response_to, "", status, issuer,
- sign_response, to_sign=[], sign_alg=sign_alg,
- digest_alg=digest_alg, **args)
+ return self._response(
+ in_response_to,
+ "",
+ status,
+ issuer,
+ sign_response,
+ to_sign=[],
+ sign_alg=sign_alg,
+ digest_alg=digest_alg,
+ **args,
+ )
# ---------
def parse_ecp_authn_request(self):
pass
- def create_ecp_authn_request_response(self, acs_url, identity,
- in_response_to, destination,
- sp_entity_id, name_id_policy=None,
- userid=None, name_id=None, authn=None,
- issuer=None, sign_response=False,
- sign_assertion=False, **kwargs):
+ # XXX DONE idp create > create_authn_response > _authn_response > _response
+ def create_ecp_authn_request_response(
+ self,
+ acs_url,
+ identity,
+ in_response_to,
+ destination,
+ sp_entity_id,
+ name_id_policy=None,
+ userid=None,
+ name_id=None,
+ authn=None,
+ issuer=None,
+ sign_response=None,
+ sign_assertion=None,
+ sign_alg=None,
+ digest_alg=None,
+ **kwargs,
+ ):
# ----------------------------------------
# <ecp:Response
@@ -892,17 +1029,27 @@ class Server(Entity):
# <samlp:Response
# ----------------------------------------
- response = self.create_authn_response(identity, in_response_to,
- destination, sp_entity_id,
- name_id_policy, userid, name_id,
- authn, issuer,
- sign_response, sign_assertion)
+ response = self.create_authn_response(
+ identity,
+ in_response_to,
+ destination,
+ sp_entity_id,
+ name_id_policy,
+ userid,
+ name_id,
+ authn,
+ issuer,
+ sign_response,
+ sign_assertion,
+ sign_alg=sign_alg,
+ digest_alg=digest_alg
+ )
body = soapenv.Body()
body.extension_elements = [element_to_extension_element(response)]
soap_envelope = soapenv.Envelope(header=header, body=body)
- return "%s" % soap_envelope
+ return str(soap_envelope)
def close(self):
self.ident.close()