diff options
author | Vishal Kadam <vishal.kadam@cengage.com> | 2021-03-26 17:56:20 -0400 |
---|---|---|
committer | Ivan Kanakarakis <ivan.kanak@gmail.com> | 2021-11-15 14:07:20 +0200 |
commit | 058cc801b7867c3e42588a9d57d81f9350b01238 (patch) | |
tree | 3b2059bfe7d44255dd8eefd6d9b2c562214aea41 | |
parent | 5caf6da27bd81b0e191254a329c4dd3c67458f8e (diff) | |
download | pysaml2-058cc801b7867c3e42588a9d57d81f9350b01238.tar.gz |
633: Support for redirect binding signature check using query param values
-rw-r--r-- | .gitignore | 1 | ||||
-rw-r--r-- | src/saml2/entity.py | 7 | ||||
-rw-r--r-- | src/saml2/request.py | 53 | ||||
-rw-r--r-- | src/saml2/server.py | 10 | ||||
-rw-r--r-- | tests/test_51_client.py | 92 |
5 files changed, 150 insertions, 13 deletions
@@ -123,6 +123,7 @@ venv.bak/ # PyCharm project files .idea/ +*.iml # mkdocs documentation /site diff --git a/src/saml2/entity.py b/src/saml2/entity.py index 6658a8cf..f37ecc72 100644 --- a/src/saml2/entity.py +++ b/src/saml2/entity.py @@ -1024,7 +1024,8 @@ class Entity(HTTPBase): else: return typ - def _parse_request(self, enc_request, request_cls, service, binding): + def _parse_request(self, enc_request, request_cls, service, binding, + relay_state=None, sigalg=None, signature=None): """Parse a Request :param enc_request: The request in its transport format @@ -1070,7 +1071,9 @@ class Entity(HTTPBase): if only_valid_cert: must = True _request = _request.loads(xmlstr, binding, origdoc=enc_request, - must=must, only_valid_cert=only_valid_cert) + must=must, only_valid_cert=only_valid_cert, + relay_state=relay_state, sigalg=sigalg, + signature=signature) _log_debug("Loaded request") diff --git a/src/saml2/request.py b/src/saml2/request.py index 20b711cf..3698c093 100644 --- a/src/saml2/request.py +++ b/src/saml2/request.py @@ -7,6 +7,8 @@ from saml2.s_utils import OtherError from saml2.validate import valid_instance from saml2.validate import NotValid from saml2.response import IncorrectlySigned +from saml2.sigver import verify_redirect_signature +from saml2.sigver import SignatureError logger = logging.getLogger(__name__) @@ -37,20 +39,56 @@ class Request(object): self.not_on_or_after = 0 def _loads(self, xmldata, binding=None, origdoc=None, must=None, - only_valid_cert=False): - if binding == BINDING_HTTP_REDIRECT: - pass - + only_valid_cert=False, relayState=None, sigalg=None, signature=None): # own copy self.xmlstr = xmldata[:] - logger.debug("xmlstr: %s", self.xmlstr) + logger.debug("xmlstr: %s, relayState: %s, sigalg: %s, signature: %s", + self.xmlstr, relayState, sigalg, signature) try: + # If redirect binding, and provided SigAlg, Signature use that to verify + # and skip signatureCheck withing SAMLRequest/xmldata + _do_redirect_sig_check = False + _saml_msg = {} + if binding == BINDING_HTTP_REDIRECT and must \ + and sigalg is not None and signature is not None: + logger.debug("Request signature check will be done using query param," + " instead of SAMLRequest content") + _do_redirect_sig_check = True + must = False + _saml_msg = { + "SAMLRequest": origdoc, + "SigAlg": sigalg, + "Signature": signature + } + # RelayState is optional so only add when available, + # signature validate fails if passed as None + if relayState is not None: + _saml_msg["RelayState"] = relayState + self.message = self.signature_check(xmldata, origdoc=origdoc, must=must, only_valid_cert=only_valid_cert) + + if _do_redirect_sig_check: + _issuer = self.message.issuer.text.strip() + _certs = self.sec.metadata.certs(_issuer, "any", "signing") + logger.debug("Certs: %s, _saml_msg: %s", _certs, _saml_msg) + _verified_ok = False + for cert in _certs: + if verify_redirect_signature(_saml_msg, self.sec.sec_backend, cert): + _verified_ok = True + break + + logger.info("Redirect request signature check: %s", _verified_ok) + # Set self.message to None, it shall raise error further down. + if not _verified_ok: + self.message = None + raise SignatureError('Failed to verify signature') + except TypeError: raise except Exception as excp: + self.message = None logger.info("EXCEPTION: %s", excp) if not self.message: @@ -97,9 +135,10 @@ class Request(object): return valid def loads(self, xmldata, binding, origdoc=None, must=None, - only_valid_cert=False): + only_valid_cert=False, relay_state=None, sigalg=None, signature=None): return self._loads(xmldata, binding, origdoc, must, - only_valid_cert=only_valid_cert) + only_valid_cert=only_valid_cert, relayState=relay_state, + sigalg=sigalg, signature=signature) def verify(self): try: diff --git a/src/saml2/server.py b/src/saml2/server.py index 9e34cce2..1bcf7ead 100644 --- a/src/saml2/server.py +++ b/src/saml2/server.py @@ -226,17 +226,23 @@ class Server(Entity): # ------------------------------------------------------------------------- - def parse_authn_request(self, enc_request, binding=BINDING_HTTP_REDIRECT): + def parse_authn_request(self, enc_request, binding=BINDING_HTTP_REDIRECT, + relay_state=None, sigalg=None, signature=None): """Parse a Authentication Request :param enc_request: The request in its transport format :param binding: Which binding that was used to transport the message + :param relay_state: RelayState, when binding=redirect + :param sigalg: Signature Algorithm, when binding=redirect + :param signature: Signature, when binding=redirect to this entity. :return: A request instance """ return self._parse_request(enc_request, AuthnRequest, - "single_sign_on_service", binding) + "single_sign_on_service", binding, + relay_state=relay_state, sigalg=sigalg, + signature=signature) def parse_attribute_query(self, xml_string, binding): """ Parse an attribute query diff --git a/tests/test_51_client.py b/tests/test_51_client.py index 0fbf63f8..7e46ccaa 100644 --- a/tests/test_51_client.py +++ b/tests/test_51_client.py @@ -11,7 +11,7 @@ from pytest import raises from saml2.argtree import add_path from saml2.cert import OpenSSLWrapper from saml2.xmldsig import sig_default -from saml2.xmldsig import SIG_RSA_SHA256 +from saml2.xmldsig import SIG_RSA_SHA256, SIG_RSA_SHA1 from saml2 import BINDING_HTTP_POST from saml2 import BINDING_HTTP_REDIRECT from saml2 import config @@ -29,7 +29,8 @@ from saml2.extension.requested_attributes import RequestedAttribute from saml2.authn_context import INTERNETPROTOCOLPASSWORD from saml2.client import Saml2Client from saml2.pack import parse_soap_enveloped_saml -from saml2.response import LogoutResponse, StatusInvalidNameidPolicy, StatusError +from saml2.response import LogoutResponse, StatusInvalidNameidPolicy, StatusError, \ + IncorrectlySigned from saml2.saml import NAMEID_FORMAT_PERSISTENT, EncryptedAssertion, Advice from saml2.saml import NAMEID_FORMAT_TRANSIENT from saml2.saml import NameID @@ -172,6 +173,9 @@ class TestClient: conf.load_file("server_conf") self.client = Saml2Client(conf) + def setup_method(self): + self.server.config.setattr("idp", "want_authn_requests_signed", None) + def teardown_class(self): self.server.close() @@ -1524,6 +1528,90 @@ class TestClient: qs["SAMLRequest"][0], BINDING_HTTP_REDIRECT ) + def test_signed_redirect_passes_if_needs_signed_requests(self): + # Revert configuration change to disallow unsinged responses + self.client.want_response_signed = True + self.server.config.setattr("idp", "want_authn_requests_signed", True) + + reqid, req = self.client.create_authn_request( + "http://localhost:8088/sso", message_id="id1" + ) + + info = self.client.apply_binding( + BINDING_HTTP_REDIRECT, + str(req), + destination="", + relay_state="relay2", + sign=True, + sigalg=SIG_RSA_SHA256, + ) + loc = info["headers"][0][1] + qs = list_values2simpletons(parse.parse_qs(loc[1:])) + + res = self.server.parse_authn_request( + qs["SAMLRequest"], + BINDING_HTTP_REDIRECT, + relay_state=qs["RelayState"], + sigalg=qs["SigAlg"], + signature=qs["Signature"] + ) + assert res.message.destination == "http://localhost:8088/sso" + assert res.message.id == "id1" + + def test_signed_redirect_fail_if_needs_signed_request_but_received_unsigned(self): + # Revert configuration change to disallow unsinged responses + self.client.want_response_signed = True + self.server.config.setattr("idp", "want_authn_requests_signed", True) + + reqid, req = self.client.create_authn_request( + "http://localhost:8088/sso", message_id="id1" + ) + + info = self.client.apply_binding( + BINDING_HTTP_REDIRECT, + str(req), + destination="", + relay_state="relay2", + sign=True, + sigalg=SIG_RSA_SHA256, + ) + loc = info["headers"][0][1] + qs = list_values2simpletons(parse.parse_qs(loc[1:])) + + with raises(IncorrectlySigned): + self.server.parse_authn_request( + qs["SAMLRequest"], BINDING_HTTP_REDIRECT + ) + + def test_signed_redirect_fail_if_needs_signed_request_but_sigalg_not_matches(self): + # Revert configuration change to disallow unsinged responses + self.client.want_response_signed = True + self.server.config.setattr("idp", "want_authn_requests_signed", True) + + reqid, req = self.client.create_authn_request( + "http://localhost:8088/sso", message_id="id1" + ) + + info = self.client.apply_binding( + BINDING_HTTP_REDIRECT, + str(req), + destination="", + relay_state="relay2", + sign=True, + sigalg=SIG_RSA_SHA256, + ) + loc = info["headers"][0][1] + qs = list_values2simpletons(parse.parse_qs(loc[1:])) + + with raises(IncorrectlySigned): + self.server.parse_authn_request( + qs["SAMLRequest"], + BINDING_HTTP_REDIRECT, + relay_state=qs["RelayState"], + sigalg=SIG_RSA_SHA1, + signature=qs["Signature"] + ) + def test_do_logout_signed_redirect(self): conf = config.SPConfig() conf.load_file("sp_slo_redirect_conf") |