From 058cc801b7867c3e42588a9d57d81f9350b01238 Mon Sep 17 00:00:00 2001 From: Vishal Kadam Date: Fri, 26 Mar 2021 17:56:20 -0400 Subject: 633: Support for redirect binding signature check using query param values --- .gitignore | 1 + src/saml2/entity.py | 7 ++-- src/saml2/request.py | 53 ++++++++++++++++++++++++---- src/saml2/server.py | 10 ++++-- tests/test_51_client.py | 92 +++++++++++++++++++++++++++++++++++++++++++++++-- 5 files changed, 150 insertions(+), 13 deletions(-) diff --git a/.gitignore b/.gitignore index 534546d2..17e7bb35 100644 --- a/.gitignore +++ b/.gitignore @@ -123,6 +123,7 @@ venv.bak/ # PyCharm project files .idea/ +*.iml # mkdocs documentation /site diff --git a/src/saml2/entity.py b/src/saml2/entity.py index 6658a8cf..f37ecc72 100644 --- a/src/saml2/entity.py +++ b/src/saml2/entity.py @@ -1024,7 +1024,8 @@ class Entity(HTTPBase): else: return typ - def _parse_request(self, enc_request, request_cls, service, binding): + def _parse_request(self, enc_request, request_cls, service, binding, + relay_state=None, sigalg=None, signature=None): """Parse a Request :param enc_request: The request in its transport format @@ -1070,7 +1071,9 @@ class Entity(HTTPBase): if only_valid_cert: must = True _request = _request.loads(xmlstr, binding, origdoc=enc_request, - must=must, only_valid_cert=only_valid_cert) + must=must, only_valid_cert=only_valid_cert, + relay_state=relay_state, sigalg=sigalg, + signature=signature) _log_debug("Loaded request") diff --git a/src/saml2/request.py b/src/saml2/request.py index 20b711cf..3698c093 100644 --- a/src/saml2/request.py +++ b/src/saml2/request.py @@ -7,6 +7,8 @@ from saml2.s_utils import OtherError from saml2.validate import valid_instance from saml2.validate import NotValid from saml2.response import IncorrectlySigned +from saml2.sigver import verify_redirect_signature +from saml2.sigver import SignatureError logger = logging.getLogger(__name__) @@ -37,20 +39,56 @@ class Request(object): self.not_on_or_after = 0 def _loads(self, xmldata, binding=None, origdoc=None, must=None, - only_valid_cert=False): - if binding == BINDING_HTTP_REDIRECT: - pass - + only_valid_cert=False, relayState=None, sigalg=None, signature=None): # own copy self.xmlstr = xmldata[:] - logger.debug("xmlstr: %s", self.xmlstr) + logger.debug("xmlstr: %s, relayState: %s, sigalg: %s, signature: %s", + self.xmlstr, relayState, sigalg, signature) try: + # If redirect binding, and provided SigAlg, Signature use that to verify + # and skip signatureCheck withing SAMLRequest/xmldata + _do_redirect_sig_check = False + _saml_msg = {} + if binding == BINDING_HTTP_REDIRECT and must \ + and sigalg is not None and signature is not None: + logger.debug("Request signature check will be done using query param," + " instead of SAMLRequest content") + _do_redirect_sig_check = True + must = False + _saml_msg = { + "SAMLRequest": origdoc, + "SigAlg": sigalg, + "Signature": signature + } + # RelayState is optional so only add when available, + # signature validate fails if passed as None + if relayState is not None: + _saml_msg["RelayState"] = relayState + self.message = self.signature_check(xmldata, origdoc=origdoc, must=must, only_valid_cert=only_valid_cert) + + if _do_redirect_sig_check: + _issuer = self.message.issuer.text.strip() + _certs = self.sec.metadata.certs(_issuer, "any", "signing") + logger.debug("Certs: %s, _saml_msg: %s", _certs, _saml_msg) + _verified_ok = False + for cert in _certs: + if verify_redirect_signature(_saml_msg, self.sec.sec_backend, cert): + _verified_ok = True + break + + logger.info("Redirect request signature check: %s", _verified_ok) + # Set self.message to None, it shall raise error further down. + if not _verified_ok: + self.message = None + raise SignatureError('Failed to verify signature') + except TypeError: raise except Exception as excp: + self.message = None logger.info("EXCEPTION: %s", excp) if not self.message: @@ -97,9 +135,10 @@ class Request(object): return valid def loads(self, xmldata, binding, origdoc=None, must=None, - only_valid_cert=False): + only_valid_cert=False, relay_state=None, sigalg=None, signature=None): return self._loads(xmldata, binding, origdoc, must, - only_valid_cert=only_valid_cert) + only_valid_cert=only_valid_cert, relayState=relay_state, + sigalg=sigalg, signature=signature) def verify(self): try: diff --git a/src/saml2/server.py b/src/saml2/server.py index 9e34cce2..1bcf7ead 100644 --- a/src/saml2/server.py +++ b/src/saml2/server.py @@ -226,17 +226,23 @@ class Server(Entity): # ------------------------------------------------------------------------- - def parse_authn_request(self, enc_request, binding=BINDING_HTTP_REDIRECT): + def parse_authn_request(self, enc_request, binding=BINDING_HTTP_REDIRECT, + relay_state=None, sigalg=None, signature=None): """Parse a Authentication Request :param enc_request: The request in its transport format :param binding: Which binding that was used to transport the message + :param relay_state: RelayState, when binding=redirect + :param sigalg: Signature Algorithm, when binding=redirect + :param signature: Signature, when binding=redirect to this entity. :return: A request instance """ return self._parse_request(enc_request, AuthnRequest, - "single_sign_on_service", binding) + "single_sign_on_service", binding, + relay_state=relay_state, sigalg=sigalg, + signature=signature) def parse_attribute_query(self, xml_string, binding): """ Parse an attribute query diff --git a/tests/test_51_client.py b/tests/test_51_client.py index 0fbf63f8..7e46ccaa 100644 --- a/tests/test_51_client.py +++ b/tests/test_51_client.py @@ -11,7 +11,7 @@ from pytest import raises from saml2.argtree import add_path from saml2.cert import OpenSSLWrapper from saml2.xmldsig import sig_default -from saml2.xmldsig import SIG_RSA_SHA256 +from saml2.xmldsig import SIG_RSA_SHA256, SIG_RSA_SHA1 from saml2 import BINDING_HTTP_POST from saml2 import BINDING_HTTP_REDIRECT from saml2 import config @@ -29,7 +29,8 @@ from saml2.extension.requested_attributes import RequestedAttribute from saml2.authn_context import INTERNETPROTOCOLPASSWORD from saml2.client import Saml2Client from saml2.pack import parse_soap_enveloped_saml -from saml2.response import LogoutResponse, StatusInvalidNameidPolicy, StatusError +from saml2.response import LogoutResponse, StatusInvalidNameidPolicy, StatusError, \ + IncorrectlySigned from saml2.saml import NAMEID_FORMAT_PERSISTENT, EncryptedAssertion, Advice from saml2.saml import NAMEID_FORMAT_TRANSIENT from saml2.saml import NameID @@ -172,6 +173,9 @@ class TestClient: conf.load_file("server_conf") self.client = Saml2Client(conf) + def setup_method(self): + self.server.config.setattr("idp", "want_authn_requests_signed", None) + def teardown_class(self): self.server.close() @@ -1524,6 +1528,90 @@ class TestClient: qs["SAMLRequest"][0], BINDING_HTTP_REDIRECT ) + def test_signed_redirect_passes_if_needs_signed_requests(self): + # Revert configuration change to disallow unsinged responses + self.client.want_response_signed = True + self.server.config.setattr("idp", "want_authn_requests_signed", True) + + reqid, req = self.client.create_authn_request( + "http://localhost:8088/sso", message_id="id1" + ) + + info = self.client.apply_binding( + BINDING_HTTP_REDIRECT, + str(req), + destination="", + relay_state="relay2", + sign=True, + sigalg=SIG_RSA_SHA256, + ) + loc = info["headers"][0][1] + qs = list_values2simpletons(parse.parse_qs(loc[1:])) + + res = self.server.parse_authn_request( + qs["SAMLRequest"], + BINDING_HTTP_REDIRECT, + relay_state=qs["RelayState"], + sigalg=qs["SigAlg"], + signature=qs["Signature"] + ) + assert res.message.destination == "http://localhost:8088/sso" + assert res.message.id == "id1" + + def test_signed_redirect_fail_if_needs_signed_request_but_received_unsigned(self): + # Revert configuration change to disallow unsinged responses + self.client.want_response_signed = True + self.server.config.setattr("idp", "want_authn_requests_signed", True) + + reqid, req = self.client.create_authn_request( + "http://localhost:8088/sso", message_id="id1" + ) + + info = self.client.apply_binding( + BINDING_HTTP_REDIRECT, + str(req), + destination="", + relay_state="relay2", + sign=True, + sigalg=SIG_RSA_SHA256, + ) + loc = info["headers"][0][1] + qs = list_values2simpletons(parse.parse_qs(loc[1:])) + + with raises(IncorrectlySigned): + self.server.parse_authn_request( + qs["SAMLRequest"], BINDING_HTTP_REDIRECT + ) + + def test_signed_redirect_fail_if_needs_signed_request_but_sigalg_not_matches(self): + # Revert configuration change to disallow unsinged responses + self.client.want_response_signed = True + self.server.config.setattr("idp", "want_authn_requests_signed", True) + + reqid, req = self.client.create_authn_request( + "http://localhost:8088/sso", message_id="id1" + ) + + info = self.client.apply_binding( + BINDING_HTTP_REDIRECT, + str(req), + destination="", + relay_state="relay2", + sign=True, + sigalg=SIG_RSA_SHA256, + ) + loc = info["headers"][0][1] + qs = list_values2simpletons(parse.parse_qs(loc[1:])) + + with raises(IncorrectlySigned): + self.server.parse_authn_request( + qs["SAMLRequest"], + BINDING_HTTP_REDIRECT, + relay_state=qs["RelayState"], + sigalg=SIG_RSA_SHA1, + signature=qs["Signature"] + ) + def test_do_logout_signed_redirect(self): conf = config.SPConfig() conf.load_file("sp_slo_redirect_conf") -- cgit v1.2.1 From 5b07161d78ac34a285a4844989f954bc49879462 Mon Sep 17 00:00:00 2001 From: Vishal Kadam Date: Tue, 1 Jun 2021 12:18:26 -0400 Subject: Refactored redirect signature check into separate method --- src/saml2/request.py | 89 ++++++++++++++++++++++++++++------------------------ 1 file changed, 48 insertions(+), 41 deletions(-) diff --git a/src/saml2/request.py b/src/saml2/request.py index 3698c093..860bf28e 100644 --- a/src/saml2/request.py +++ b/src/saml2/request.py @@ -8,7 +8,6 @@ from saml2.validate import valid_instance from saml2.validate import NotValid from saml2.response import IncorrectlySigned from saml2.sigver import verify_redirect_signature -from saml2.sigver import SignatureError logger = logging.getLogger(__name__) @@ -44,59 +43,34 @@ class Request(object): self.xmlstr = xmldata[:] logger.debug("xmlstr: %s, relayState: %s, sigalg: %s, signature: %s", self.xmlstr, relayState, sigalg, signature) - try: - # If redirect binding, and provided SigAlg, Signature use that to verify - # and skip signatureCheck withing SAMLRequest/xmldata - _do_redirect_sig_check = False - _saml_msg = {} - if binding == BINDING_HTTP_REDIRECT and must \ - and sigalg is not None and signature is not None: - logger.debug("Request signature check will be done using query param," - " instead of SAMLRequest content") - _do_redirect_sig_check = True - must = False - _saml_msg = { - "SAMLRequest": origdoc, - "SigAlg": sigalg, - "Signature": signature - } - # RelayState is optional so only add when available, - # signature validate fails if passed as None - if relayState is not None: - _saml_msg["RelayState"] = relayState + # If redirect binding, and provided SigAlg, Signature use that to verify + # and skip signatureCheck withing SAMLRequest/xmldata + _need_redirect_sig_check, _saml_msg, must = self._should_do_redirect_sig_check( + binding, must, origdoc, relayState, sigalg, signature) + try: self.message = self.signature_check(xmldata, origdoc=origdoc, must=must, only_valid_cert=only_valid_cert) - - if _do_redirect_sig_check: - _issuer = self.message.issuer.text.strip() - _certs = self.sec.metadata.certs(_issuer, "any", "signing") - logger.debug("Certs: %s, _saml_msg: %s", _certs, _saml_msg) - _verified_ok = False - for cert in _certs: - if verify_redirect_signature(_saml_msg, self.sec.sec_backend, cert): - _verified_ok = True - break - - logger.info("Redirect request signature check: %s", _verified_ok) - # Set self.message to None, it shall raise error further down. - if not _verified_ok: - self.message = None - raise SignatureError('Failed to verify signature') - except TypeError: raise except Exception as excp: self.message = None logger.info("EXCEPTION: %s", excp) + if _need_redirect_sig_check and self.message is not None: + _verified_ok = self._do_redirect_sig_check(_saml_msg) + # Set self.message to None, it shall raise error further down. + if not _verified_ok: + self.message = None + logger.error('Failed to verify signature') + if not self.message: - logger.error("Response was not correctly signed") - logger.info("Response: %s", xmldata) + logger.error("Request was not correctly signed") + logger.info("Request: %s", xmldata) raise IncorrectlySigned() - logger.info("request: %s", self.message) + logger.info("Request: %s", self.message) try: valid_instance(self.message) @@ -106,6 +80,39 @@ class Request(object): return self + def _do_redirect_sig_check(self, _saml_msg): + _issuer = self.message.issuer.text.strip() + _certs = self.sec.metadata.certs(_issuer, "any", "signing") + logger.debug("Certs: %s, _saml_msg: %s", _certs, _saml_msg) + _verified_ok = False + for cert in _certs: + if verify_redirect_signature(_saml_msg, self.sec.sec_backend, cert): + _verified_ok = True + break + logger.info("Redirect request signature check: %s", _verified_ok) + return _verified_ok + + def _should_do_redirect_sig_check(self, binding, must, origdoc, relayState, sigalg, + signature): + _do_redirect_sig_check = False + _saml_msg = {} + if binding == BINDING_HTTP_REDIRECT and must \ + and sigalg is not None and signature is not None: + logger.debug("Request signature check will be done using query param," + " instead of SAMLRequest content") + _do_redirect_sig_check = True + must = False + _saml_msg = { + "SAMLRequest": origdoc, + "SigAlg": sigalg, + "Signature": signature + } + # RelayState is optional so only add when available, + # signature validate fails if passed as None + if relayState is not None: + _saml_msg["RelayState"] = relayState + return _do_redirect_sig_check, _saml_msg, must + def issue_instant_ok(self): """ Check that the request was issued at a reasonable time """ upper = time_util.shift_time(time_util.time_in_a_while(days=1), -- cgit v1.2.1 From 68c0a8902e20b384a548f634d92312077340562d Mon Sep 17 00:00:00 2001 From: Ivan Kanakarakis Date: Tue, 16 Nov 2021 13:17:06 +0200 Subject: Small refactor Signed-off-by: Ivan Kanakarakis --- src/saml2/entity.py | 15 ++++-- src/saml2/request.py | 120 +++++++++++++++++++++++++----------------------- tests/test_51_client.py | 7 +-- 3 files changed, 77 insertions(+), 65 deletions(-) diff --git a/src/saml2/entity.py b/src/saml2/entity.py index f37ecc72..e926d2c5 100644 --- a/src/saml2/entity.py +++ b/src/saml2/entity.py @@ -1024,8 +1024,16 @@ class Entity(HTTPBase): else: return typ - def _parse_request(self, enc_request, request_cls, service, binding, - relay_state=None, sigalg=None, signature=None): + def _parse_request( + self, + enc_request, + request_cls, + service, + binding, + relay_state=None, + sigalg=None, + signature=None, + ): """Parse a Request :param enc_request: The request in its transport format @@ -1040,8 +1048,7 @@ class Entity(HTTPBase): _log_debug = logger.debug # The addresses I should receive messages like this on - receiver_addresses = self.config.endpoint(service, binding, - self.entity_type) + receiver_addresses = self.config.endpoint(service, binding, self.entity_type) if not receiver_addresses and self.entity_type == "idp": for typ in ["aa", "aq", "pdp"]: receiver_addresses = self.config.endpoint(service, binding, typ) diff --git a/src/saml2/request.py b/src/saml2/request.py index 860bf28e..200a1ff8 100644 --- a/src/saml2/request.py +++ b/src/saml2/request.py @@ -1,7 +1,9 @@ import logging +from saml2 import time_util +from saml2 import BINDING_HTTP_REDIRECT +from saml2 import BINDING_HTTP_POST from saml2.attribute_converter import to_local -from saml2 import time_util, BINDING_HTTP_REDIRECT from saml2.s_utils import OtherError from saml2.validate import valid_instance @@ -37,81 +39,83 @@ class Request(object): self.message = None self.not_on_or_after = 0 - def _loads(self, xmldata, binding=None, origdoc=None, must=None, - only_valid_cert=False, relayState=None, sigalg=None, signature=None): + def _loads( + self, + xmldata, + binding=None, + origdoc=None, + must=None, + only_valid_cert=False, + relay_state=None, + sigalg=None, + signature=None, + ): # own copy self.xmlstr = xmldata[:] - logger.debug("xmlstr: %s, relayState: %s, sigalg: %s, signature: %s", - self.xmlstr, relayState, sigalg, signature) - # If redirect binding, and provided SigAlg, Signature use that to verify - # and skip signatureCheck withing SAMLRequest/xmldata - _need_redirect_sig_check, _saml_msg, must = self._should_do_redirect_sig_check( - binding, must, origdoc, relayState, sigalg, signature) + logger.debug("xmlstr: %s, relay_state: %s, sigalg: %s, signature: %s", + self.xmlstr, relay_state, sigalg, signature) + + signed_post = must and binding == BINDING_HTTP_POST + signed_redirect = must and binding == BINDING_HTTP_REDIRECT + incorrectly_signed = IncorrectlySigned("Request was not signed correctly") try: - self.message = self.signature_check(xmldata, origdoc=origdoc, - must=must, - only_valid_cert=only_valid_cert) - except TypeError: - raise - except Exception as excp: + self.message = self.signature_check( + xmldata, + origdoc=origdoc, + must=signed_post, + only_valid_cert=only_valid_cert, + ) + except Exception as e: self.message = None - logger.info("EXCEPTION: %s", excp) + raise incorrectly_signed from e - if _need_redirect_sig_check and self.message is not None: - _verified_ok = self._do_redirect_sig_check(_saml_msg) - # Set self.message to None, it shall raise error further down. - if not _verified_ok: + if signed_redirect: + if sigalg is None or signature is None: + raise incorrectly_signed + + _saml_msg = { + "SAMLRequest": origdoc, + "Signature": signature, + "SigAlg": sigalg, + } + if relay_state is not None: + _saml_msg["RelayState"] = relay_state + try: + sig_verified = self._do_redirect_sig_check(_saml_msg) + except Exception as e: self.message = None - logger.error('Failed to verify signature') + raise incorrectly_signed from e + else: + if not sig_verified: + self.message = None + raise incorrectly_signed if not self.message: - logger.error("Request was not correctly signed") - logger.info("Request: %s", xmldata) - raise IncorrectlySigned() + logger.error("Request was not signed correctly") + logger.info("Request data: %s", xmldata) + raise incorrectly_signed - logger.info("Request: %s", self.message) + logger.info("Request message: %s", self.message) try: valid_instance(self.message) except NotValid as exc: - logger.error("Not valid request: %s", exc.args[0]) + logger.error("Request not valid: %s", exc.args[0]) raise return self def _do_redirect_sig_check(self, _saml_msg): - _issuer = self.message.issuer.text.strip() - _certs = self.sec.metadata.certs(_issuer, "any", "signing") - logger.debug("Certs: %s, _saml_msg: %s", _certs, _saml_msg) - _verified_ok = False - for cert in _certs: - if verify_redirect_signature(_saml_msg, self.sec.sec_backend, cert): - _verified_ok = True - break - logger.info("Redirect request signature check: %s", _verified_ok) - return _verified_ok - - def _should_do_redirect_sig_check(self, binding, must, origdoc, relayState, sigalg, - signature): - _do_redirect_sig_check = False - _saml_msg = {} - if binding == BINDING_HTTP_REDIRECT and must \ - and sigalg is not None and signature is not None: - logger.debug("Request signature check will be done using query param," - " instead of SAMLRequest content") - _do_redirect_sig_check = True - must = False - _saml_msg = { - "SAMLRequest": origdoc, - "SigAlg": sigalg, - "Signature": signature - } - # RelayState is optional so only add when available, - # signature validate fails if passed as None - if relayState is not None: - _saml_msg["RelayState"] = relayState - return _do_redirect_sig_check, _saml_msg, must + issuer = self.message.issuer.text.strip() + certs = self.sec.metadata.certs(issuer, "any", "signing") + logger.debug("Certs to verify request sig: %s, _saml_msg: %s", certs, _saml_msg) + verified = any( + verify_redirect_signature(_saml_msg, self.sec.sec_backend, cert) + for cert_name, cert in certs + ) + logger.info("Redirect request signature check: %s", verified) + return verified def issue_instant_ok(self): """ Check that the request was issued at a reasonable time """ @@ -144,7 +148,7 @@ class Request(object): def loads(self, xmldata, binding, origdoc=None, must=None, only_valid_cert=False, relay_state=None, sigalg=None, signature=None): return self._loads(xmldata, binding, origdoc, must, - only_valid_cert=only_valid_cert, relayState=relay_state, + only_valid_cert=only_valid_cert, relay_state=relay_state, sigalg=sigalg, signature=signature) def verify(self): diff --git a/tests/test_51_client.py b/tests/test_51_client.py index 7e46ccaa..095d80ab 100644 --- a/tests/test_51_client.py +++ b/tests/test_51_client.py @@ -11,7 +11,8 @@ from pytest import raises from saml2.argtree import add_path from saml2.cert import OpenSSLWrapper from saml2.xmldsig import sig_default -from saml2.xmldsig import SIG_RSA_SHA256, SIG_RSA_SHA1 +from saml2.xmldsig import SIG_RSA_SHA256 +from saml2.xmldsig import SIG_RSA_SHA1 from saml2 import BINDING_HTTP_POST from saml2 import BINDING_HTTP_REDIRECT from saml2 import config @@ -29,8 +30,8 @@ from saml2.extension.requested_attributes import RequestedAttribute from saml2.authn_context import INTERNETPROTOCOLPASSWORD from saml2.client import Saml2Client from saml2.pack import parse_soap_enveloped_saml -from saml2.response import LogoutResponse, StatusInvalidNameidPolicy, StatusError, \ - IncorrectlySigned +from saml2.response import LogoutResponse, StatusInvalidNameidPolicy, StatusError +from saml2.response import IncorrectlySigned from saml2.saml import NAMEID_FORMAT_PERSISTENT, EncryptedAssertion, Advice from saml2.saml import NAMEID_FORMAT_TRANSIENT from saml2.saml import NameID -- cgit v1.2.1 From a3b26f3b6d3ea8122c69f7172cafc250c74f1481 Mon Sep 17 00:00:00 2001 From: Ivan Kanakarakis Date: Tue, 16 Nov 2021 13:17:28 +0200 Subject: Verify signed logout requests with the redirect binding Signed-off-by: Ivan Kanakarakis --- src/saml2/client.py | 18 +++++++++-- src/saml2/entity.py | 20 ++++++++++-- tests/test_51_client.py | 82 +++++++++++++++++++++++++++++++++++++++++-------- 3 files changed, 102 insertions(+), 18 deletions(-) diff --git a/src/saml2/client.py b/src/saml2/client.py index aa0bd0c9..a7469d4f 100644 --- a/src/saml2/client.py +++ b/src/saml2/client.py @@ -630,7 +630,9 @@ class Saml2Client(Base): sign=None, sign_alg=None, digest_alg=None, - relay_state="", + relay_state=None, + sigalg=None, + signature=None, ): """ Deal with a LogoutRequest @@ -639,6 +641,11 @@ class Saml2Client(Base): :param name_id: The id of the current user :param binding: Which binding the message came in over :param sign: Whether the response will be signed or not + :param sign_alg: The signing algorithm for the response + :param digest_alg: The digest algorithm for the the response + :param relay_state: The relay state of the request + :param sigalg: The SigAlg query param of the request + :param signature: The Signature query param of the request :return: Keyword arguments which can be used to send the response what's returned follow different patterns for different bindings. If the binding is BINDIND_SOAP, what is returned looks like this:: @@ -652,8 +659,13 @@ class Saml2Client(Base): """ logger.info("logout request: %s", request) - _req = self._parse_request(request, LogoutRequest, - "single_logout_service", binding) + _req = self.parse_logout_request( + xmlstr=request, + binding=binding, + relay_state=relay_state, + sigalg=sigalg, + signature=signature, + ) if _req.message.name_id == name_id: try: diff --git a/src/saml2/entity.py b/src/saml2/entity.py index e926d2c5..f818b702 100644 --- a/src/saml2/entity.py +++ b/src/saml2/entity.py @@ -1584,7 +1584,14 @@ class Entity(HTTPBase): # ------------------------------------------------------------------------ - def parse_logout_request(self, xmlstr, binding=BINDING_SOAP): + def parse_logout_request( + self, + xmlstr, + binding=BINDING_SOAP, + relay_state=None, + sigalg=None, + signature=None, + ): """ Deal with a LogoutRequest :param xmlstr: The response as a xml string @@ -1594,8 +1601,15 @@ class Entity(HTTPBase): was not. """ - return self._parse_request(xmlstr, saml_request.LogoutRequest, - "single_logout_service", binding) + return self._parse_request( + enc_request=xmlstr, + request_cls=saml_request.LogoutRequest, + service="single_logout_service", + binding=binding, + relay_state=relay_state, + sigalg=sigalg, + signature=signature, + ) def use_artifact(self, message, endpoint_index=0): """ diff --git a/tests/test_51_client.py b/tests/test_51_client.py index 095d80ab..a323de79 100644 --- a/tests/test_51_client.py +++ b/tests/test_51_client.py @@ -1644,14 +1644,68 @@ class TestClient: loc = info["headers"][0][1] _, _, _, _, qs, _ = parse.urlparse(loc) qs = parse.parse_qs(qs) - assert _leq(qs.keys(), - ['SigAlg', 'SAMLRequest', 'RelayState', 'Signature']) + assert _leq(qs.keys(), ['SigAlg', 'SAMLRequest', 'RelayState', 'Signature']) - assert verify_redirect_signature(list_values2simpletons(qs), - client.sec.sec_backend) + qs_simple = list_values2simpletons(qs) + assert verify_redirect_signature(qs_simple, client.sec.sec_backend) - res = self.server.parse_logout_request(qs["SAMLRequest"][0], - BINDING_HTTP_REDIRECT) + res = self.server.parse_logout_request( + qs_simple["SAMLRequest"], + BINDING_HTTP_REDIRECT, + relay_state=qs_simple['RelayState'], + sigalg=qs_simple['SigAlg'], + signature=qs_simple['Signature'], + ) + + def test_do_logout_signed_redirect_invalid(self): + conf = config.SPConfig() + conf.load_file("sp_slo_redirect_conf") + client = Saml2Client(conf) + + session_info = { + "name_id": nid, + "issuer": "urn:mace:example.com:saml:roland:idp", + "not_on_or_after": in_a_while(minutes=15), + "ava": { + "givenName": "Anders", + "sn": "Andersson", + "mail": "anders.andersson@example.com" + } + } + client.users.add_information_about_person(session_info) + entity_ids = client.users.issuers_of_info(nid) + + resp = client.do_logout( + nid, + entity_ids, + "Tired", + in_a_while(minutes=5), + sign=True, + expected_binding=BINDING_HTTP_REDIRECT, + ) + + binding, info = resp[entity_ids[0]] + loc = info["headers"][0][1] + _, _, _, _, qs, _ = parse.urlparse(loc) + qs = parse.parse_qs(qs) + qs_simple = list_values2simpletons(qs) + + invalid_signature = 'ZEdMZUQ3SjBjQ2ozWmlGaHhyV3JZSzNkTWhQWU02bjA0dzVNeUd1UWgrVDhnYm1oc1R1TTFjPQo=' + qs_simple_invalid = { + **qs_simple, + 'Signature': invalid_signature, + } + assert not verify_redirect_signature(qs_simple_invalid, client.sec.sec_backend) + + self.server.config.setattr("idp", "want_authn_requests_signed", True) + with raises(IncorrectlySigned): + res = self.server.parse_logout_request( + qs_simple["SAMLRequest"], + BINDING_HTTP_REDIRECT, + relay_state=qs_simple['RelayState'], + sigalg=qs_simple['SigAlg'], + signature=invalid_signature, + ) def test_do_logout_post(self): # information about the user from an IdP @@ -3245,14 +3299,18 @@ class TestClientNonAsciiAva: loc = info["headers"][0][1] _, _, _, _, qs, _ = parse.urlparse(loc) qs = parse.parse_qs(qs) - assert _leq(qs.keys(), - ['SigAlg', 'SAMLRequest', 'RelayState', 'Signature']) + assert _leq(qs.keys(), ['SigAlg', 'SAMLRequest', 'RelayState', 'Signature']) - assert verify_redirect_signature(list_values2simpletons(qs), - client.sec.sec_backend) + qs_simple = list_values2simpletons(qs) + assert verify_redirect_signature(qs_simple, client.sec.sec_backend) - res = self.server.parse_logout_request(qs["SAMLRequest"][0], - BINDING_HTTP_REDIRECT) + res = self.server.parse_logout_request( + qs_simple["SAMLRequest"], + BINDING_HTTP_REDIRECT, + relay_state=qs_simple['RelayState'], + sigalg=qs_simple['SigAlg'], + signature=qs_simple['Signature'], + ) def test_do_logout_post(self): # information about the user from an IdP -- cgit v1.2.1