summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvan Kanakarakis <ivan.kanak@gmail.com>2020-11-23 14:57:57 +0200
committerIvan Kanakarakis <ivan.kanak@gmail.com>2020-11-23 14:58:15 +0200
commit2f756bad04da7358449a78ae75804f08fb6d25e0 (patch)
tree2a1251b653ab8385115c1f67719dad5eaed09f2c
parentfc42b2a23516737a7bbbe396985f31d3c22fd46a (diff)
parent1994002191811831c6575971b2ec2b23e1806e2d (diff)
downloadpysaml2-2f756bad04da7358449a78ae75804f08fb6d25e0.tar.gz
Merge branch 'fix-signed-authnreq-w-redirect-binding'
-rw-r--r--src/saml2/client.py229
-rw-r--r--src/saml2/client_base.py62
-rw-r--r--src/saml2/ecp_client.py42
-rw-r--r--src/saml2/entity.py114
-rw-r--r--src/saml2/httpbase.py59
-rw-r--r--src/saml2/pack.py30
-rw-r--r--src/saml2/server.py36
-rw-r--r--src/saml2/sigver.py16
-rw-r--r--tests/test_51_client.py67
-rw-r--r--tests/test_70_redirect_signing.py14
10 files changed, 419 insertions, 250 deletions
diff --git a/src/saml2/client.py b/src/saml2/client.py
index 11c737c1..9809150e 100644
--- a/src/saml2/client.py
+++ b/src/saml2/client.py
@@ -14,7 +14,7 @@ from saml2 import BINDING_HTTP_REDIRECT
from saml2 import BINDING_HTTP_POST
from saml2 import BINDING_SOAP
-import saml2.xmldsig as ds
+from saml2.xmldsig import DefaultSignature
from saml2.ident import decode, code
from saml2.httpbase import HTTPError
@@ -40,10 +40,19 @@ class Saml2Client(Base):
""" The basic pySAML2 service provider class """
def prepare_for_authenticate(
- self, entityid=None, relay_state="",
- binding=saml2.BINDING_HTTP_REDIRECT, vorg="", nameid_format=None,
- scoping=None, consent=None, extensions=None, sign=None,
- response_binding=saml2.BINDING_HTTP_POST, **kwargs):
+ self,
+ entityid=None,
+ relay_state="",
+ binding=saml2.BINDING_HTTP_REDIRECT,
+ vorg="",
+ nameid_format=None,
+ scoping=None,
+ consent=None, extensions=None,
+ sign=None,
+ sigalg=None,
+ response_binding=saml2.BINDING_HTTP_POST,
+ **kwargs,
+ ):
""" Makes all necessary preparations for an authentication request.
:param entityid: The entity ID of the IdP to send the request to
@@ -61,19 +70,20 @@ class Saml2Client(Base):
:return: session id and AuthnRequest info
"""
- reqid, negotiated_binding, info = \
- self.prepare_for_negotiated_authenticate(
- entityid=entityid,
- relay_state=relay_state,
- binding=binding,
- vorg=vorg,
- nameid_format=nameid_format,
- scoping=scoping,
- consent=consent,
- extensions=extensions,
- sign=sign,
- response_binding=response_binding,
- **kwargs)
+ reqid, negotiated_binding, info = self.prepare_for_negotiated_authenticate(
+ entityid=entityid,
+ relay_state=relay_state,
+ binding=binding,
+ vorg=vorg,
+ nameid_format=nameid_format,
+ scoping=scoping,
+ consent=consent,
+ extensions=extensions,
+ sign=sign,
+ sigalg=sigalg,
+ response_binding=response_binding,
+ **kwargs,
+ )
if negotiated_binding != binding:
raise ValueError(
@@ -85,9 +95,20 @@ class Saml2Client(Base):
return reqid, info
def prepare_for_negotiated_authenticate(
- self, entityid=None, relay_state="", binding=None, vorg="",
- nameid_format=None, scoping=None, consent=None, extensions=None,
- sign=None, response_binding=saml2.BINDING_HTTP_POST, **kwargs):
+ self,
+ entityid=None,
+ relay_state="",
+ binding=None,
+ vorg="",
+ nameid_format=None,
+ scoping=None,
+ consent=None,
+ extensions=None,
+ sign=None,
+ response_binding=saml2.BINDING_HTTP_POST,
+ sigalg=None,
+ **kwargs,
+ ):
""" Makes all necessary preparations for an authentication request
that negotiates which binding to use for authentication.
@@ -115,27 +136,41 @@ class Saml2Client(Base):
destination = self._sso_location(entityid, binding)
logger.info("destination to provider: %s", destination)
+ # XXX - sign_post will embed the signature to the xml doc
+ # XXX ^through self.create_authn_request(...)
+ # XXX - sign_redirect will add the signature to the query params
+ # XXX ^through self.apply_binding(...)
+ sign_post = (binding == BINDING_HTTP_POST and sign)
+ sign_redirect = (binding == BINDING_HTTP_REDIRECT and sign)
+
reqid, request = self.create_authn_request(
- destination, vorg, scoping, response_binding, nameid_format,
- consent=consent, extensions=extensions, sign=sign,
- **kwargs)
+ destination,
+ vorg,
+ scoping,
+ response_binding,
+ nameid_format,
+ consent=consent,
+ extensions=extensions,
+ sign=sign_post,
+ sign_alg=sigalg,
+ **kwargs,
+ )
_req_str = str(request)
-
logger.info("AuthNReq: %s", _req_str)
- try:
- args = {'sigalg': kwargs["sigalg"]}
- except KeyError:
- args = {}
-
- http_info = self.apply_binding(binding, _req_str, destination,
- relay_state, sign=sign, **args)
+ http_info = self.apply_binding(
+ binding,
+ _req_str,
+ destination,
+ relay_state,
+ sign=sign_redirect,
+ sigalg=sigalg,
+ )
return reqid, binding, http_info
else:
- raise SignOnError(
- "No supported bindings available for authentication")
+ raise SignOnError("No supported bindings available for authentication")
def global_logout(self, name_id, reason="", expire=None, sign=None,
sign_alg=None, digest_alg=None):
@@ -194,14 +229,13 @@ class Saml2Client(Base):
for entity_id in entity_ids:
logger.debug("Logout from '%s'", entity_id)
# for all where I can use the SOAP binding, do those first
- for binding in [BINDING_SOAP, BINDING_HTTP_POST,
- BINDING_HTTP_REDIRECT]:
+ for binding in [BINDING_SOAP, BINDING_HTTP_POST, BINDING_HTTP_REDIRECT]:
if expected_binding and binding != expected_binding:
continue
try:
- srvs = self.metadata.single_logout_service(entity_id,
- binding,
- "idpsso")
+ srvs = self.metadata.single_logout_service(
+ entity_id, binding, "idpsso"
+ )
except:
srvs = None
@@ -212,9 +246,9 @@ class Saml2Client(Base):
destination = next(locations(srvs), None)
logger.info("destination to provider: %s", destination)
try:
- session_info = self.users.get_info_from(name_id,
- entity_id,
- False)
+ session_info = self.users.get_info_from(
+ name_id, entity_id, False
+ )
session_indexes = [session_info['session_index']]
except KeyError:
session_indexes = None
@@ -222,53 +256,56 @@ class Saml2Client(Base):
destination, entity_id, name_id=name_id, reason=reason,
expire=expire, session_indexes=session_indexes)
- # to_sign = []
- if binding.startswith("http://"):
- sign = True
-
- if sign is None:
- sign = self.logout_requests_signed
+ sign = sign if sign is not None else self.logout_requests_signed
+ def_sig = DefaultSignature()
+ sign_alg = def_sig.get_sign_alg() if sign_alg is None else sign_alg
+ digest_alg = (
+ def_sig.get_digest_alg()
+ if digest_alg is None
+ else digest_alg
+ )
- sigalg = None
if sign:
if binding == BINDING_HTTP_REDIRECT:
- sigalg = kwargs.get(
- "sigalg", ds.DefaultSignature().get_sign_alg())
- # key = kwargs.get("key", self.signkey)
srequest = str(request)
else:
- srequest = self.sign(request, sign_alg=sign_alg,
- digest_alg=digest_alg)
+ srequest = self.sign(
+ request, sign_alg=sign_alg, digest_alg=digest_alg
+ )
else:
srequest = str(request)
relay_state = self._relay_state(req_id)
- http_info = self.apply_binding(binding, srequest, destination,
- relay_state, sign=sign, sigalg=sigalg)
+ http_info = self.apply_binding(
+ binding,
+ srequest,
+ destination,
+ relay_state,
+ sign=sign,
+ sigalg=sign_alg,
+ )
if binding == BINDING_SOAP:
response = self.send(**http_info)
-
if response and response.status_code == 200:
not_done.remove(entity_id)
response = response.text
logger.info("Response: %s", response)
- res = self.parse_logout_request_response(response,
- binding)
+ res = self.parse_logout_request_response(response, binding)
responses[entity_id] = res
else:
logger.info("NOT OK response from %s", destination)
-
else:
- self.state[req_id] = {"entity_id": entity_id,
- "operation": "SLO",
- "entity_ids": entity_ids,
- "name_id": code(name_id),
- "reason": reason,
- "not_on_or_after": expire,
- "sign": sign}
-
+ self.state[req_id] = {
+ "entity_id": entity_id,
+ "operation": "SLO",
+ "entity_ids": entity_ids,
+ "name_id": code(name_id),
+ "reason": reason,
+ "not_on_or_after": expire,
+ "sign": sign,
+ }
responses[entity_id] = (binding, http_info)
not_done.remove(entity_id)
@@ -419,11 +456,22 @@ class Saml2Client(Base):
return None
- def do_attribute_query(self, entityid, subject_id,
- attribute=None, sp_name_qualifier=None,
- name_qualifier=None, nameid_format=None,
- real_id=None, consent=None, extensions=None,
- sign=False, binding=BINDING_SOAP, nsprefix=None):
+ def do_attribute_query(
+ self,
+ entityid,
+ subject_id,
+ attribute=None,
+ sp_name_qualifier=None,
+ name_qualifier=None,
+ nameid_format=None,
+ real_id=None,
+ consent=None,
+ extensions=None,
+ sign=False,
+ binding=BINDING_SOAP,
+ nsprefix=None,
+ sign_alg=None,
+ ):
""" Does a attribute request to an attribute authority, this is
by default done over SOAP.
@@ -482,13 +530,20 @@ class Saml2Client(Base):
"subject_id": subject_id,
"sign": sign}
relay_state = self._relay_state(query.id)
- return self.apply_binding(binding, "%s" % query, destination,
- relay_state, sign=sign)
+ return self.apply_binding(
+ binding,
+ str(query),
+ destination,
+ relay_state,
+ sign=sign,
+ sigalg=sign_alg,
+ )
else:
raise SAMLError("Unsupported binding")
- def handle_logout_request(self, request, name_id, binding, sign=None,
- sign_alg=None, relay_state=""):
+ def handle_logout_request(
+ self, request, name_id, binding, sign=None, sign_alg=None, relay_state=""
+ ):
"""
Deal with a LogoutRequest
@@ -531,16 +586,22 @@ class Saml2Client(Base):
elif binding in [BINDING_HTTP_POST, BINDING_HTTP_REDIRECT]:
response_bindings = [BINDING_HTTP_POST, BINDING_HTTP_REDIRECT]
else:
- response_bindings = self.config.preferred_binding[
- "single_logout_service"]
+ response_bindings = self.config.preferred_binding["single_logout_service"]
if sign is None:
sign = self.logout_responses_signed
- response = self.create_logout_response(_req.message, response_bindings,
- status, sign, sign_alg=sign_alg)
+ response = self.create_logout_response(
+ _req.message, response_bindings, status, sign, sign_alg=sign_alg
+ )
rinfo = self.response_args(_req.message, response_bindings)
- return self.apply_binding(rinfo["binding"], response,
- rinfo["destination"], relay_state,
- response=True, sign=sign)
+ return self.apply_binding(
+ rinfo["binding"],
+ response,
+ rinfo["destination"],
+ relay_state,
+ response=True,
+ sign=sign,
+ sigalg=sign_alg,
+ )
diff --git a/src/saml2/client_base.py b/src/saml2/client_base.py
index 41df6585..889c4359 100644
--- a/src/saml2/client_base.py
+++ b/src/saml2/client_base.py
@@ -54,6 +54,9 @@ from saml2 import BINDING_HTTP_REDIRECT
from saml2 import BINDING_HTTP_POST
from saml2 import BINDING_PAOS
+from saml2.xmldsig import SIG_ALLOWED_ALG
+from saml2.xmldsig import DIGEST_ALLOWED_ALG
+from saml2.xmldsig import DefaultSignature
logger = logging.getLogger(__name__)
@@ -281,13 +284,25 @@ class Base(Entity):
else:
return None
- def create_authn_request(self, destination, vorg="", scoping=None,
- binding=saml2.BINDING_HTTP_POST,
- nameid_format=None,
- service_url_binding=None, message_id=0,
- consent=None, extensions=None, sign=None,
- allow_create=None, sign_prepare=False, sign_alg=None,
- digest_alg=None, requested_attributes=None, **kwargs):
+ def create_authn_request(
+ self,
+ destination,
+ vorg="",
+ scoping=None,
+ binding=BINDING_HTTP_POST,
+ nameid_format=None,
+ service_url_binding=None,
+ message_id=0,
+ consent=None,
+ extensions=None,
+ sign=None,
+ sign_prepare=False,
+ sign_alg=None,
+ digest_alg=None,
+ allow_create=None,
+ requested_attributes=None,
+ **kwargs,
+ ):
""" Creates an authentication request.
:param destination: Where the request should be sent.
@@ -302,6 +317,8 @@ class Base(Entity):
:param extensions: Possible extensions
:param sign: Whether the request should be signed or not.
:param sign_prepare: Whether the signature should be prepared or not.
+ :param sign_alg: The request signature algorithm
+ :param digest_alg: The request digest algorithm
:param allow_create: If the identity provider is allowed, in the course
of fulfilling the request, to create a new identifier to represent
the principal.
@@ -430,7 +447,22 @@ class Base(Entity):
client_crt = kwargs.get("client_crt")
nsprefix = kwargs.get("nsprefix")
+
+ # XXX will be used to embed the signature to the xml doc - ie, POST binding
+ # XXX always called by the SP, no need to check the context
sign = self.authn_requests_signed if sign is None else sign
+ def_sig = DefaultSignature()
+ sign_alg = sign_alg or def_sig.get_sign_alg()
+ digest_alg = digest_alg or def_sig.get_digest_alg()
+
+ if sign_alg not in [long_name for short_name, long_name in SIG_ALLOWED_ALG]:
+ raise Exception(
+ "Signature algo not in allowed list: {algo}".format(algo=sign_alg)
+ )
+ if digest_alg not in [long_name for short_name, long_name in DIGEST_ALLOWED_ALG]:
+ raise Exception(
+ "Digest algo not in allowed list: {algo}".format(algo=digest_alg)
+ )
if (sign and self.sec.cert_handler.generate_cert()) or client_crt is not None:
with self.lock:
@@ -445,11 +477,11 @@ class Base(Entity):
extensions,
sign,
sign_prepare,
+ sign_alg=sign_alg,
+ digest_alg=digest_alg,
protocol_binding=binding,
scoping=scoping,
nsprefix=nsprefix,
- sign_alg=sign_alg,
- digest_alg=digest_alg,
**args,
)
else:
@@ -461,11 +493,11 @@ class Base(Entity):
extensions,
sign,
sign_prepare,
+ sign_alg=sign_alg,
+ digest_alg=digest_alg,
protocol_binding=binding,
scoping=scoping,
nsprefix=nsprefix,
- sign_alg=sign_alg,
- digest_alg=digest_alg,
**args,
)
@@ -843,10 +875,12 @@ class Base(Entity):
# The IDP publishes support for ECP by using the SOAP binding on
# SingleSignOnService
- _, location = self.pick_binding("single_sign_on_service",
- [_binding], entity_id=entityid)
+ _, location = self.pick_binding(
+ "single_sign_on_service", [_binding], entity_id=entityid
+ )
req_id, authn_req = self.create_authn_request(
- location, service_url_binding=BINDING_PAOS, **kwargs)
+ location, service_url_binding=BINDING_PAOS, **kwargs
+ )
# ----------------------------------------
# The SOAP envelope
diff --git a/src/saml2/ecp_client.py b/src/saml2/ecp_client.py
index 5265f99d..94cfe135 100644
--- a/src/saml2/ecp_client.py
+++ b/src/saml2/ecp_client.py
@@ -91,8 +91,16 @@ class Client(Entity):
self.done_ecp = False
self.cookie_jar = cookielib.LWPCookieJar()
- def phase2(self, authn_request, rc_url, idp_entity_id, headers=None,
- sign=False, **kwargs):
+ def phase2(
+ self,
+ authn_request,
+ rc_url,
+ idp_entity_id,
+ headers=None,
+ sign=False,
+ sign_alg=None,
+ **kwargs,
+ ):
"""
Doing the second phase of the ECP conversation, the conversation
with the IdP happens.
@@ -105,12 +113,13 @@ class Client(Entity):
:return: The response from the IdP
"""
- _, destination = self.pick_binding("single_sign_on_service",
- [BINDING_SOAP], "idpsso",
- entity_id=idp_entity_id)
+ _, destination = self.pick_binding(
+ "single_sign_on_service", [BINDING_SOAP], "idpsso", entity_id=idp_entity_id
+ )
- ht_args = self.apply_binding(BINDING_SOAP, authn_request, destination,
- sign=sign)
+ ht_args = self.apply_binding(
+ BINDING_SOAP, authn_request, destination, sign=sign, sigalg=sign_alg
+ )
if headers:
ht_args["headers"].extend(headers)
@@ -124,8 +133,10 @@ class Client(Entity):
if response.status_code != 200:
raise SAMLError(
- "Request to IdP failed (%s): %s" % (response.status_code,
- response.text))
+ "Request to IdP failed ({status}): {text}".format(
+ status=response.status_code, text=response.text
+ )
+ )
# SAMLP response in a SOAP envelope body, ecp response in headers
respdict = self.parse_soap_message(response.text)
@@ -195,8 +206,11 @@ class Client(Entity):
_rc_url = _paos_request.response_consumer_url
- return {"authn_request": authn_request, "rc_url": _rc_url,
- "relay_state": _relay_state}
+ return {
+ "authn_request": authn_request,
+ "rc_url": _rc_url,
+ "relay_state": _relay_state,
+ }
def ecp_conversation(self, respdict, idp_entity_id=None):
"""
@@ -218,8 +232,7 @@ class Client(Entity):
# Phase 3 - back to the SP
# **********************************
- ht_args = self.use_soap(idp_response, args["rc_url"],
- [args["relay_state"]])
+ ht_args = self.use_soap(idp_response, args["rc_url"], [args["relay_state"]])
ht_args["headers"][0] = ('Content-Type', MIME_PAOS)
logger.debug("[P3] Post to SP: %s", ht_args["data"])
@@ -231,8 +244,7 @@ class Client(Entity):
# url I started off with.
pass
else:
- raise SAMLError(
- "Error POSTing package to SP: %s" % response.text)
+ raise SAMLError("Error POSTing package to SP: %s" % response.text)
logger.debug("[P3] SP response: %s", response.text)
diff --git a/src/saml2/entity.py b/src/saml2/entity.py
index 672ad6f7..fdea5a74 100644
--- a/src/saml2/entity.py
+++ b/src/saml2/entity.py
@@ -71,6 +71,11 @@ from saml2.sigver import pre_signature_part
from saml2.sigver import pre_encrypt_assertion
from saml2.sigver import signed_instance_factory
from saml2.virtual_org import VirtualOrg
+from saml2.pack import http_redirect_message
+from saml2.pack import http_form_post_message
+
+from saml2.xmldsig import DefaultSignature
+
logger = logging.getLogger(__name__)
@@ -190,8 +195,17 @@ class Entity(HTTPBase):
return Issuer(text=self.config.entityid,
format=NAMEID_FORMAT_ENTITY)
- def apply_binding(self, binding, msg_str, destination="", relay_state="",
- response=False, sign=False, **kwargs):
+ def apply_binding(
+ self,
+ binding,
+ msg_str,
+ destination="",
+ relay_state="",
+ response=False,
+ sign=None,
+ sigalg=None,
+ **kwargs,
+ ):
"""
Construct the necessary HTTP arguments dependent on Binding
@@ -204,6 +218,22 @@ class Entity(HTTPBase):
:param kwargs: response type specific arguments
:return: A dictionary
"""
+
+ # XXX authn_requests_signed (obj property) applies only to an SP
+ # XXX sign_response (config option) applies to idp/aa
+ # XXX Looking into sp/idp/aa properties should be done in the same way
+ # XXX ^this discrepancy should be fixed
+ sign_config = (
+ self.authn_requests_signed
+ if self.config.context == "sp"
+ else self.config.getattr("sign_response")
+ if self.config.context == "idp"
+ else None
+ )
+ sign = sign_config if sign is None else sign
+ def_sig = DefaultSignature()
+ sigalg = sigalg or def_sig.get_sign_alg()
+
# unless if BINDING_HTTP_ARTIFACT
if response:
typ = "SAMLResponse"
@@ -212,29 +242,27 @@ class Entity(HTTPBase):
if binding == BINDING_HTTP_POST:
logger.info("HTTP POST")
- # if self.entity_type == 'sp':
- # info = self.use_http_post(msg_str, destination, relay_state,
- # typ)
- # info["url"] = destination
- # info["method"] = "POST"
- # else:
- info = self.use_http_form_post(msg_str, destination,
- relay_state, typ)
+ info = http_form_post_message(msg_str, destination, relay_state, typ)
+ (msg_str, destination, relay_state, typ)
info["url"] = destination
info["method"] = "POST"
elif binding == BINDING_HTTP_REDIRECT:
logger.info("HTTP REDIRECT")
- sigalg = kwargs.get("sigalg")
- if sign and sigalg:
- signer = self.sec.sec_backend.get_signer(sigalg)
- else:
- signer = None
- info = self.use_http_get(msg_str, destination, relay_state, typ,
- signer=signer, **kwargs)
+ info = http_redirect_message(
+ message=msg_str,
+ location=destination,
+ relay_state=relay_state,
+ typ=typ,
+ sign=sign,
+ sigalg=sigalg,
+ backend=self.sec.sec_backend,
+ )
info["url"] = str(destination)
info["method"] = "GET"
elif binding == BINDING_SOAP or binding == BINDING_PAOS:
- info = self.use_soap(msg_str, destination, sign=sign, **kwargs)
+ info = self.use_soap(
+ msg_str, destination, sign=sign, sigalg=sigalg, **kwargs
+ )
elif binding == BINDING_URI:
info = self.use_http_uri(msg_str, typ, destination)
elif binding == BINDING_HTTP_ARTIFACT:
@@ -416,12 +444,19 @@ class Entity(HTTPBase):
# --------------------------------------------------------------------------
- def sign(self, msg, mid=None, to_sign=None, sign_prepare=False,
- sign_alg=None, digest_alg=None):
+ def sign(
+ self,
+ msg,
+ mid=None,
+ to_sign=None,
+ sign_prepare=False,
+ sign_alg=None,
+ digest_alg=None,
+ ):
if msg.signature is None:
- msg.signature = pre_signature_part(msg.id, self.sec.my_cert, 1,
- sign_alg=sign_alg,
- digest_alg=digest_alg)
+ msg.signature = pre_signature_part(
+ msg.id, self.sec.my_cert, 1, sign_alg=sign_alg, digest_alg=digest_alg
+ )
if sign_prepare:
return msg
@@ -437,9 +472,20 @@ class Entity(HTTPBase):
logger.info("REQUEST: %s", msg)
return signed_instance_factory(msg, self.sec, to_sign)
- def _message(self, request_cls, destination=None, message_id=0,
- consent=None, extensions=None, sign=False, sign_prepare=False,
- nsprefix=None, sign_alg=None, digest_alg=None, **kwargs):
+ def _message(
+ self,
+ request_cls,
+ destination=None,
+ message_id=0,
+ consent=None,
+ extensions=None,
+ sign=False,
+ sign_prepare=False,
+ nsprefix=None,
+ sign_alg=None,
+ digest_alg=None,
+ **kwargs,
+ ):
"""
Some parameters appear in all requests so simplify by doing
it in one place
@@ -480,13 +526,17 @@ class Entity(HTTPBase):
req = self.msg_cb(req)
reqid = req.id
-
if sign:
- return reqid, self.sign(req, sign_prepare=sign_prepare,
- sign_alg=sign_alg, digest_alg=digest_alg)
- else:
- logger.info("REQUEST: %s", req)
- return reqid, req
+ signed_req = self.sign(
+ req,
+ sign_prepare=sign_prepare,
+ sign_alg=sign_alg,
+ digest_alg=digest_alg,
+ )
+ req = signed_req
+
+ logger.info("REQUEST: %s", req)
+ return reqid, req
@staticmethod
def _filter_args(instance, extensions=None, **kwargs):
diff --git a/src/saml2/httpbase.py b/src/saml2/httpbase.py
index a6846dab..5860992d 100644
--- a/src/saml2/httpbase.py
+++ b/src/saml2/httpbase.py
@@ -10,10 +10,8 @@ import time
from six.moves.http_cookies import SimpleCookie
from saml2.time_util import utc_now
from saml2 import class_name, SAMLError
-from saml2.pack import http_form_post_message
from saml2.pack import http_post_message
from saml2.pack import make_soap_enveloped_saml_thingy
-from saml2.pack import http_redirect_message
import logging
@@ -255,41 +253,6 @@ class HTTPBase(object):
return r
@staticmethod
- def use_http_post(message, destination, relay_state,
- typ="SAMLRequest"):
- """
- Return a urlencoded message that should be POSTed to the recipient.
-
- :param message: The response
- :param destination: Where the response should be sent
- :param relay_state: The relay_state received in the request
- :param typ: Whether a Request, Response or Artifact
- :return: dictionary
- """
- if not isinstance(message, six.string_types):
- message = "%s" % (message,)
-
- return http_post_message(message, relay_state, typ)
-
- @staticmethod
- def use_http_form_post(message, destination, relay_state,
- typ="SAMLRequest"):
- """
- Return a form that will automagically execute and POST the message
- to the recipient.
-
- :param message:
- :param destination:
- :param relay_state:
- :param typ: Whether a Request, Response or Artifact
- :return: dictionary
- """
- if not isinstance(message, six.string_types):
- message = "%s" % (message,)
-
- return http_form_post_message(message, destination, relay_state, typ)
-
- @staticmethod
def use_http_artifact(message, destination="", relay_state=""):
if relay_state:
query = urlencode({"SAMLart": message,
@@ -388,25 +351,3 @@ class HTTPBase(object):
def add_credentials(self, user, passwd):
self.user = user
self.passwd = passwd
-
- @staticmethod
- def use_http_get(message, destination, relay_state,
- typ="SAMLRequest", sigalg="", signer=None, **kwargs):
- """
- Send a message using GET, this is the HTTP-Redirect case so
- no direct response is expected to this request.
-
- :param message:
- :param destination:
- :param relay_state:
- :param typ: Whether a Request, Response or Artifact
- :param sigalg: Which algorithm the signature function will use to sign
- the message
- :param signer: A signing function that can be used to sign the message
- :return: dictionary
- """
- if not isinstance(message, six.string_types):
- message = "%s" % (message,)
-
- return http_redirect_message(message, destination, relay_state, typ,
- sigalg, signer)
diff --git a/src/saml2/pack.py b/src/saml2/pack.py
index f8fdbfcb..f0890471 100644
--- a/src/saml2/pack.py
+++ b/src/saml2/pack.py
@@ -141,8 +141,15 @@ def http_post_message(message, relay_state="", typ="SAMLRequest", **kwargs):
"status": 200}
-def http_redirect_message(message, location, relay_state="", typ="SAMLRequest",
- sigalg='', signer=None, **kwargs):
+def http_redirect_message(
+ message,
+ location,
+ relay_state="",
+ typ="SAMLRequest",
+ sigalg=None,
+ sign=None,
+ backend=None,
+):
"""The HTTP Redirect binding defines a mechanism by which SAML protocol
messages can be transmitted within URL parameters.
Messages are encoded for use with this binding using a URL encoding
@@ -156,7 +163,7 @@ def http_redirect_message(message, location, relay_state="", typ="SAMLRequest",
:param typ: What type of message it is SAMLRequest/SAMLResponse/SAMLart
:param sigalg: Which algorithm the signature function will use to sign
the message
- :param signer: A signature function that can be used to sign the message
+ :param sign: Whether the message should be signed
:return: A tuple containing header information and a HTML message.
"""
@@ -178,21 +185,22 @@ def http_redirect_message(message, location, relay_state="", typ="SAMLRequest",
if relay_state:
args["RelayState"] = relay_state
- if signer:
+ if sign:
# sigalgs, should be one defined in xmldsig
if sigalg not in [long_name for short_name, long_name in SIG_ALLOWED_ALG]:
raise Exception(
"Signature algo not in allowed list: {algo}".format(algo=sigalg)
)
- args["SigAlg"] = sigalg
+ signer = backend.get_signer(sigalg) if sign and sigalg else None
+ if not signer:
+ raise Exception("Could not init signer fro algo {algo}".format(algo=sigalg))
- string = "&".join([urlencode({k: args[k]})
- for k in _order if k in args]).encode('ascii')
- args["Signature"] = base64.b64encode(signer.sign(string))
- string = urlencode(args)
- else:
- string = urlencode(args)
+ args["SigAlg"] = sigalg
+ string = "&".join(urlencode({k: args[k]}) for k in _order if k in args)
+ string_enc = string.encode('ascii')
+ args["Signature"] = base64.b64encode(signer.sign(string_enc))
+ string = urlencode(args)
glue_char = "&" if urlparse(location).query else "?"
login_url = glue_char.join([location, string])
headers = [('Location', str(login_url))]
diff --git a/src/saml2/server.py b/src/saml2/server.py
index 50250c3a..bcdbd2bb 100644
--- a/src/saml2/server.py
+++ b/src/saml2/server.py
@@ -671,18 +671,30 @@ class Server(Entity):
return args
- def create_authn_response(self, identity, in_response_to, destination,
- sp_entity_id, name_id_policy=None, userid=None,
- name_id=None, authn=None, issuer=None,
- sign_response=None, sign_assertion=None,
- encrypt_cert_advice=None,
- encrypt_cert_assertion=None,
- encrypt_assertion=None,
- encrypt_assertion_self_contained=True,
- encrypted_advice_attributes=False, pefim=False,
- sign_alg=None, digest_alg=None,
- session_not_on_or_after=None,
- **kwargs):
+ def create_authn_response(
+ self,
+ identity,
+ in_response_to,
+ destination,
+ sp_entity_id,
+ name_id_policy=None,
+ userid=None,
+ name_id=None,
+ authn=None,
+ issuer=None,
+ sign_response=None,
+ sign_assertion=None,
+ encrypt_cert_advice=None,
+ encrypt_cert_assertion=None,
+ encrypt_assertion=None,
+ encrypt_assertion_self_contained=True,
+ encrypted_advice_attributes=False,
+ pefim=False,
+ sign_alg=None,
+ digest_alg=None,
+ session_not_on_or_after=None,
+ **kwargs
+ ):
""" Constructs an AuthenticationResponse
:param identity: Information about an user
diff --git a/src/saml2/sigver.py b/src/saml2/sigver.py
index 02daadc8..ee630340 100644
--- a/src/saml2/sigver.py
+++ b/src/saml2/sigver.py
@@ -566,8 +566,7 @@ def verify_redirect_signature(saml_msg, crypto, cert=None, sigkey=None):
try:
signer = crypto.get_signer(saml_msg['SigAlg'], sigkey)
except KeyError:
- raise Unsupported('Signature algorithm: {alg}'.format(
- alg=saml_msg['SigAlg']))
+ raise Unsupported('Signature algorithm: {alg}'.format(alg=saml_msg['SigAlg']))
else:
if saml_msg['SigAlg'] in SIGNER_ALGS:
if 'SAMLRequest' in saml_msg:
@@ -576,13 +575,18 @@ def verify_redirect_signature(saml_msg, crypto, cert=None, sigkey=None):
_order = RESP_ORDER
else:
raise Unsupported(
- 'Verifying signature on something that should not be '
- 'signed')
+ 'Verifying signature on something that should not be signed'
+ )
+
_args = saml_msg.copy()
del _args['Signature'] # everything but the signature
string = '&'.join(
- [parse.urlencode({k: _args[k]}) for k in _order if k in
- _args]).encode('ascii')
+ [
+ parse.urlencode({k: _args[k]})
+ for k in _order
+ if k in _args
+ ]
+ ).encode('ascii')
if cert:
_key = extract_rsa_key_from_x509_cert(pem_format(cert))
diff --git a/tests/test_51_client.py b/tests/test_51_client.py
index d30a8746..a20cf941 100644
--- a/tests/test_51_client.py
+++ b/tests/test_51_client.py
@@ -10,6 +10,7 @@ from pytest import raises
from saml2.argtree import add_path
from saml2.cert import OpenSSLWrapper
+from saml2.xmldsig import sig_default
from saml2.xmldsig import SIG_RSA_SHA256
from saml2 import BINDING_HTTP_POST
from saml2 import BINDING_HTTP_REDIRECT
@@ -1445,28 +1446,70 @@ class TestClient:
'givenName': ['Derek'], 'email':
['test.testsson@test.se'], 'sn': ['Jeter']}
- def test_signed_redirect(self):
-
+ def test_signed_with_default_algo_redirect(self):
# Revert configuration change to disallow unsinged responses
self.client.want_response_signed = True
- msg_str = "%s" % self.client.create_authn_request(
- "http://localhost:8088/sso", message_id="id1")[1]
+ reqid, req = self.client.create_authn_request(
+ "http://localhost:8088/sso", message_id="id1"
+ )
+ msg_str = str(req)
info = self.client.apply_binding(
- BINDING_HTTP_REDIRECT, msg_str, destination="",
- relay_state="relay2", sign=True, sigalg=SIG_RSA_SHA256)
+ BINDING_HTTP_REDIRECT,
+ msg_str,
+ destination="",
+ relay_state="relay2",
+ sign=True,
+ )
+ loc = info["headers"][0][1]
+ qs = parse.parse_qs(loc[1:])
+
+ expected_query_params = ['SigAlg', 'SAMLRequest', 'RelayState', 'Signature']
+
+ assert _leq(qs.keys(), expected_query_params)
+ assert all(len(qs[k]) == 1 for k in expected_query_params)
+ assert qs["SigAlg"] == [sig_default]
+ assert verify_redirect_signature(
+ list_values2simpletons(qs), self.client.sec.sec_backend
+ )
+
+ res = self.server.parse_authn_request(
+ qs["SAMLRequest"][0], BINDING_HTTP_REDIRECT
+ )
+
+ def test_signed_redirect(self):
+ # Revert configuration change to disallow unsinged responses
+ self.client.want_response_signed = True
+
+ reqid, req = self.client.create_authn_request(
+ "http://localhost:8088/sso", message_id="id1"
+ )
+ msg_str = str(req)
+ info = self.client.apply_binding(
+ BINDING_HTTP_REDIRECT,
+ msg_str,
+ destination="",
+ relay_state="relay2",
+ sign=True,
+ sigalg=SIG_RSA_SHA256,
+ )
loc = info["headers"][0][1]
qs = parse.parse_qs(loc[1:])
- assert _leq(qs.keys(),
- ['SigAlg', 'SAMLRequest', 'RelayState', 'Signature'])
- assert verify_redirect_signature(list_values2simpletons(qs),
- self.client.sec.sec_backend)
+ expected_query_params = ['SigAlg', 'SAMLRequest', 'RelayState', 'Signature']
- res = self.server.parse_authn_request(qs["SAMLRequest"][0],
- BINDING_HTTP_REDIRECT)
+ assert _leq(qs.keys(), expected_query_params)
+ assert all(len(qs[k]) == 1 for k in expected_query_params)
+ assert qs["SigAlg"] == [SIG_RSA_SHA256]
+ assert verify_redirect_signature(
+ list_values2simpletons(qs), self.client.sec.sec_backend
+ )
+
+ res = self.server.parse_authn_request(
+ qs["SAMLRequest"][0], BINDING_HTTP_REDIRECT
+ )
def test_do_logout_signed_redirect(self):
conf = config.SPConfig()
diff --git a/tests/test_70_redirect_signing.py b/tests/test_70_redirect_signing.py
index a079d6cb..5286d4c6 100644
--- a/tests/test_70_redirect_signing.py
+++ b/tests/test_70_redirect_signing.py
@@ -30,11 +30,15 @@ def test():
destination = srvs[0]["location"]
req_id, req = sp.create_authn_request(destination, id="id1")
- signer = sp.sec.sec_backend.get_signer(SIG_RSA_SHA1)
-
- info = http_redirect_message(req, destination, relay_state="RS",
- typ="SAMLRequest", sigalg=SIG_RSA_SHA1,
- signer=signer)
+ info = http_redirect_message(
+ req,
+ destination,
+ relay_state="RS",
+ typ="SAMLRequest",
+ sigalg=SIG_RSA_SHA1,
+ sign=True,
+ backend=sp.sec.sec_backend,
+ )
verified_ok = False