From 5b07161d78ac34a285a4844989f954bc49879462 Mon Sep 17 00:00:00 2001 From: Vishal Kadam Date: Tue, 1 Jun 2021 12:18:26 -0400 Subject: Refactored redirect signature check into separate method --- src/saml2/request.py | 89 ++++++++++++++++++++++++++++------------------------ 1 file changed, 48 insertions(+), 41 deletions(-) diff --git a/src/saml2/request.py b/src/saml2/request.py index 3698c093..860bf28e 100644 --- a/src/saml2/request.py +++ b/src/saml2/request.py @@ -8,7 +8,6 @@ from saml2.validate import valid_instance from saml2.validate import NotValid from saml2.response import IncorrectlySigned from saml2.sigver import verify_redirect_signature -from saml2.sigver import SignatureError logger = logging.getLogger(__name__) @@ -44,59 +43,34 @@ class Request(object): self.xmlstr = xmldata[:] logger.debug("xmlstr: %s, relayState: %s, sigalg: %s, signature: %s", self.xmlstr, relayState, sigalg, signature) - try: - # If redirect binding, and provided SigAlg, Signature use that to verify - # and skip signatureCheck withing SAMLRequest/xmldata - _do_redirect_sig_check = False - _saml_msg = {} - if binding == BINDING_HTTP_REDIRECT and must \ - and sigalg is not None and signature is not None: - logger.debug("Request signature check will be done using query param," - " instead of SAMLRequest content") - _do_redirect_sig_check = True - must = False - _saml_msg = { - "SAMLRequest": origdoc, - "SigAlg": sigalg, - "Signature": signature - } - # RelayState is optional so only add when available, - # signature validate fails if passed as None - if relayState is not None: - _saml_msg["RelayState"] = relayState + # If redirect binding, and provided SigAlg, Signature use that to verify + # and skip signatureCheck withing SAMLRequest/xmldata + _need_redirect_sig_check, _saml_msg, must = self._should_do_redirect_sig_check( + binding, must, origdoc, relayState, sigalg, signature) + try: self.message = self.signature_check(xmldata, origdoc=origdoc, must=must, only_valid_cert=only_valid_cert) - - if _do_redirect_sig_check: - _issuer = self.message.issuer.text.strip() - _certs = self.sec.metadata.certs(_issuer, "any", "signing") - logger.debug("Certs: %s, _saml_msg: %s", _certs, _saml_msg) - _verified_ok = False - for cert in _certs: - if verify_redirect_signature(_saml_msg, self.sec.sec_backend, cert): - _verified_ok = True - break - - logger.info("Redirect request signature check: %s", _verified_ok) - # Set self.message to None, it shall raise error further down. - if not _verified_ok: - self.message = None - raise SignatureError('Failed to verify signature') - except TypeError: raise except Exception as excp: self.message = None logger.info("EXCEPTION: %s", excp) + if _need_redirect_sig_check and self.message is not None: + _verified_ok = self._do_redirect_sig_check(_saml_msg) + # Set self.message to None, it shall raise error further down. + if not _verified_ok: + self.message = None + logger.error('Failed to verify signature') + if not self.message: - logger.error("Response was not correctly signed") - logger.info("Response: %s", xmldata) + logger.error("Request was not correctly signed") + logger.info("Request: %s", xmldata) raise IncorrectlySigned() - logger.info("request: %s", self.message) + logger.info("Request: %s", self.message) try: valid_instance(self.message) @@ -106,6 +80,39 @@ class Request(object): return self + def _do_redirect_sig_check(self, _saml_msg): + _issuer = self.message.issuer.text.strip() + _certs = self.sec.metadata.certs(_issuer, "any", "signing") + logger.debug("Certs: %s, _saml_msg: %s", _certs, _saml_msg) + _verified_ok = False + for cert in _certs: + if verify_redirect_signature(_saml_msg, self.sec.sec_backend, cert): + _verified_ok = True + break + logger.info("Redirect request signature check: %s", _verified_ok) + return _verified_ok + + def _should_do_redirect_sig_check(self, binding, must, origdoc, relayState, sigalg, + signature): + _do_redirect_sig_check = False + _saml_msg = {} + if binding == BINDING_HTTP_REDIRECT and must \ + and sigalg is not None and signature is not None: + logger.debug("Request signature check will be done using query param," + " instead of SAMLRequest content") + _do_redirect_sig_check = True + must = False + _saml_msg = { + "SAMLRequest": origdoc, + "SigAlg": sigalg, + "Signature": signature + } + # RelayState is optional so only add when available, + # signature validate fails if passed as None + if relayState is not None: + _saml_msg["RelayState"] = relayState + return _do_redirect_sig_check, _saml_msg, must + def issue_instant_ok(self): """ Check that the request was issued at a reasonable time """ upper = time_util.shift_time(time_util.time_in_a_while(days=1), -- cgit v1.2.1