summaryrefslogtreecommitdiff
path: root/src/saml2/server.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/saml2/server.py')
-rw-r--r--src/saml2/server.py132
1 files changed, 73 insertions, 59 deletions
diff --git a/src/saml2/server.py b/src/saml2/server.py
index 519f6db1..68e04e27 100644
--- a/src/saml2/server.py
+++ b/src/saml2/server.py
@@ -384,17 +384,32 @@ class Server(Entity):
**kwargs)
return assertion
- def _authn_response(self, in_response_to, consumer_url,
- sp_entity_id, identity=None, name_id=None,
- status=None, authn=None, issuer=None, policy=None,
- sign_assertion=False, sign_response=False,
- best_effort=False, encrypt_assertion=False,
- encrypt_cert_advice=None, encrypt_cert_assertion=None,
- authn_statement=None,
- encrypt_assertion_self_contained=False,
- encrypted_advice_attributes=False,
- pefim=False, sign_alg=None, digest_alg=None,
- farg=None, session_not_on_or_after=None):
+ def _authn_response(
+ self,
+ in_response_to,
+ consumer_url,
+ sp_entity_id,
+ identity=None,
+ name_id=None,
+ status=None,
+ authn=None,
+ issuer=None,
+ policy=None,
+ sign_assertion=False,
+ sign_response=False,
+ best_effort=False,
+ encrypt_assertion=False,
+ encrypt_cert_advice=None,
+ encrypt_cert_assertion=None,
+ authn_statement=None,
+ encrypt_assertion_self_contained=False,
+ encrypted_advice_attributes=False,
+ pefim=False,
+ sign_alg=None,
+ digest_alg=None,
+ farg=None,
+ session_not_on_or_after=None,
+ ):
""" Create a response. A layer of indirection.
:param in_response_to: The session identifier of the request
@@ -524,8 +539,7 @@ class Server(Entity):
if not name_id and userid:
try:
- name_id = self.ident.construct_nameid(userid, policy,
- sp_entity_id)
+ name_id = self.ident.construct_nameid(userid, policy, sp_entity_id)
logger.warning("Unspecified NameID format")
except Exception:
pass
@@ -565,56 +579,53 @@ class Server(Entity):
if sp_entity_id:
kwargs['sp_entity_id'] = sp_entity_id
- return self._response(in_response_to, destination, status, issuer,
- sign_response, to_sign, sign_alg=sign_alg,
- digest_alg=digest_alg, **kwargs)
+ return self._response(
+ in_response_to,
+ destination,
+ status,
+ issuer,
+ sign_response,
+ to_sign,
+ sign_alg=sign_alg,
+ digest_alg=digest_alg,
+ **kwargs,
+ )
+
+ def gather_authn_response_args(
+ self, sp_entity_id, name_id_policy, userid, **kwargs
+ ):
+ # collect args and return them
+ args = {}
- # ------------------------------------------------------------------------
+ args["policy"] = kwargs.get(
+ "release_policy", self.config.getattr("policy", "idp")
+ )
+ args['best_effort'] = kwargs.get("best_effort", False)
- def gather_authn_response_args(self, sp_entity_id, name_id_policy, userid,
- **kwargs):
- param_default = {
+ param_defaults = {
'sign_assertion': False,
'sign_response': False,
'encrypt_assertion': False,
'encrypt_assertion_self_contained': True,
'encrypted_advice_attributes': False,
'encrypt_cert_advice': None,
- 'encrypt_cert_assertion': None
+ 'encrypt_cert_assertion': None,
}
- args = {}
-
- try:
- args["policy"] = kwargs["release_policy"]
- except KeyError:
- args["policy"] = self.config.getattr("policy", "idp")
-
- try:
- args['best_effort'] = kwargs["best_effort"]
- except KeyError:
- args['best_effort'] = False
-
# signing and digest algs
self.signing_algorithm = self.config.getattr('signing_algorithm', "idp")
self.digest_algorithm = self.config.getattr('digest_algorithm', "idp")
-
- for param in ['sign_assertion', 'sign_response', 'encrypt_assertion',
- 'encrypt_assertion_self_contained',
- 'encrypted_advice_attributes', 'encrypt_cert_advice',
- 'encrypt_cert_assertion']:
- try:
- _val = kwargs[param]
- except KeyError:
- _val = None
-
- if _val is None:
- _val = self.config.getattr(param, "idp")
- if _val is None:
- args[param] = param_default[param]
- else:
- args[param] = _val
+ for param, val_default in param_defaults.items():
+ val_kw = kwargs.get(param)
+ val_config = self.config.getattr(param, "idp")
+ args[param] = (
+ val_kw
+ if val_kw is not None
+ else val_config
+ if val_config is not None
+ else val_default
+ )
for arg, attr, eca, pefim in [
('encrypted_advice_attributes', 'verify_encrypt_cert_advice',
@@ -698,7 +709,7 @@ class Server(Entity):
sign_alg=None,
digest_alg=None,
session_not_on_or_after=None,
- **kwargs
+ **kwargs,
):
""" Constructs an AuthenticationResponse
@@ -733,21 +744,24 @@ class Server(Entity):
try:
args = self.gather_authn_response_args(
- sp_entity_id, name_id_policy=name_id_policy, userid=userid,
- name_id=name_id, sign_response=sign_response,
+ sp_entity_id,
+ name_id_policy=name_id_policy,
+ userid=userid,
+ name_id=name_id,
+ sign_response=sign_response,
sign_assertion=sign_assertion,
encrypt_cert_advice=encrypt_cert_advice,
encrypt_cert_assertion=encrypt_cert_assertion,
encrypt_assertion=encrypt_assertion,
- encrypt_assertion_self_contained
- =encrypt_assertion_self_contained,
+ encrypt_assertion_self_contained=encrypt_assertion_self_contained,
encrypted_advice_attributes=encrypted_advice_attributes,
- pefim=pefim, **kwargs)
+ pefim=pefim,
+ **kwargs,
+ )
except IOError as exc:
- response = self.create_error_response(in_response_to,
- destination,
- sp_entity_id,
- exc, name_id)
+ response = self.create_error_response(
+ in_response_to, destination, sp_entity_id, exc, name_id
+ )
return ("%s" % response).split("\n")
try: