summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVishal Kadam <vishal.kadam@cengage.com>2021-03-26 17:56:20 -0400
committerIvan Kanakarakis <ivan.kanak@gmail.com>2021-11-15 14:07:20 +0200
commit058cc801b7867c3e42588a9d57d81f9350b01238 (patch)
tree3b2059bfe7d44255dd8eefd6d9b2c562214aea41
parent5caf6da27bd81b0e191254a329c4dd3c67458f8e (diff)
downloadpysaml2-058cc801b7867c3e42588a9d57d81f9350b01238.tar.gz
633: Support for redirect binding signature check using query param values
-rw-r--r--.gitignore1
-rw-r--r--src/saml2/entity.py7
-rw-r--r--src/saml2/request.py53
-rw-r--r--src/saml2/server.py10
-rw-r--r--tests/test_51_client.py92
5 files changed, 150 insertions, 13 deletions
diff --git a/.gitignore b/.gitignore
index 534546d2..17e7bb35 100644
--- a/.gitignore
+++ b/.gitignore
@@ -123,6 +123,7 @@ venv.bak/
# PyCharm project files
.idea/
+*.iml
# mkdocs documentation
/site
diff --git a/src/saml2/entity.py b/src/saml2/entity.py
index 6658a8cf..f37ecc72 100644
--- a/src/saml2/entity.py
+++ b/src/saml2/entity.py
@@ -1024,7 +1024,8 @@ class Entity(HTTPBase):
else:
return typ
- def _parse_request(self, enc_request, request_cls, service, binding):
+ def _parse_request(self, enc_request, request_cls, service, binding,
+ relay_state=None, sigalg=None, signature=None):
"""Parse a Request
:param enc_request: The request in its transport format
@@ -1070,7 +1071,9 @@ class Entity(HTTPBase):
if only_valid_cert:
must = True
_request = _request.loads(xmlstr, binding, origdoc=enc_request,
- must=must, only_valid_cert=only_valid_cert)
+ must=must, only_valid_cert=only_valid_cert,
+ relay_state=relay_state, sigalg=sigalg,
+ signature=signature)
_log_debug("Loaded request")
diff --git a/src/saml2/request.py b/src/saml2/request.py
index 20b711cf..3698c093 100644
--- a/src/saml2/request.py
+++ b/src/saml2/request.py
@@ -7,6 +7,8 @@ from saml2.s_utils import OtherError
from saml2.validate import valid_instance
from saml2.validate import NotValid
from saml2.response import IncorrectlySigned
+from saml2.sigver import verify_redirect_signature
+from saml2.sigver import SignatureError
logger = logging.getLogger(__name__)
@@ -37,20 +39,56 @@ class Request(object):
self.not_on_or_after = 0
def _loads(self, xmldata, binding=None, origdoc=None, must=None,
- only_valid_cert=False):
- if binding == BINDING_HTTP_REDIRECT:
- pass
-
+ only_valid_cert=False, relayState=None, sigalg=None, signature=None):
# own copy
self.xmlstr = xmldata[:]
- logger.debug("xmlstr: %s", self.xmlstr)
+ logger.debug("xmlstr: %s, relayState: %s, sigalg: %s, signature: %s",
+ self.xmlstr, relayState, sigalg, signature)
try:
+ # If redirect binding, and provided SigAlg, Signature use that to verify
+ # and skip signatureCheck withing SAMLRequest/xmldata
+ _do_redirect_sig_check = False
+ _saml_msg = {}
+ if binding == BINDING_HTTP_REDIRECT and must \
+ and sigalg is not None and signature is not None:
+ logger.debug("Request signature check will be done using query param,"
+ " instead of SAMLRequest content")
+ _do_redirect_sig_check = True
+ must = False
+ _saml_msg = {
+ "SAMLRequest": origdoc,
+ "SigAlg": sigalg,
+ "Signature": signature
+ }
+ # RelayState is optional so only add when available,
+ # signature validate fails if passed as None
+ if relayState is not None:
+ _saml_msg["RelayState"] = relayState
+
self.message = self.signature_check(xmldata, origdoc=origdoc,
must=must,
only_valid_cert=only_valid_cert)
+
+ if _do_redirect_sig_check:
+ _issuer = self.message.issuer.text.strip()
+ _certs = self.sec.metadata.certs(_issuer, "any", "signing")
+ logger.debug("Certs: %s, _saml_msg: %s", _certs, _saml_msg)
+ _verified_ok = False
+ for cert in _certs:
+ if verify_redirect_signature(_saml_msg, self.sec.sec_backend, cert):
+ _verified_ok = True
+ break
+
+ logger.info("Redirect request signature check: %s", _verified_ok)
+ # Set self.message to None, it shall raise error further down.
+ if not _verified_ok:
+ self.message = None
+ raise SignatureError('Failed to verify signature')
+
except TypeError:
raise
except Exception as excp:
+ self.message = None
logger.info("EXCEPTION: %s", excp)
if not self.message:
@@ -97,9 +135,10 @@ class Request(object):
return valid
def loads(self, xmldata, binding, origdoc=None, must=None,
- only_valid_cert=False):
+ only_valid_cert=False, relay_state=None, sigalg=None, signature=None):
return self._loads(xmldata, binding, origdoc, must,
- only_valid_cert=only_valid_cert)
+ only_valid_cert=only_valid_cert, relayState=relay_state,
+ sigalg=sigalg, signature=signature)
def verify(self):
try:
diff --git a/src/saml2/server.py b/src/saml2/server.py
index 9e34cce2..1bcf7ead 100644
--- a/src/saml2/server.py
+++ b/src/saml2/server.py
@@ -226,17 +226,23 @@ class Server(Entity):
# -------------------------------------------------------------------------
- def parse_authn_request(self, enc_request, binding=BINDING_HTTP_REDIRECT):
+ def parse_authn_request(self, enc_request, binding=BINDING_HTTP_REDIRECT,
+ relay_state=None, sigalg=None, signature=None):
"""Parse a Authentication Request
:param enc_request: The request in its transport format
:param binding: Which binding that was used to transport the message
+ :param relay_state: RelayState, when binding=redirect
+ :param sigalg: Signature Algorithm, when binding=redirect
+ :param signature: Signature, when binding=redirect
to this entity.
:return: A request instance
"""
return self._parse_request(enc_request, AuthnRequest,
- "single_sign_on_service", binding)
+ "single_sign_on_service", binding,
+ relay_state=relay_state, sigalg=sigalg,
+ signature=signature)
def parse_attribute_query(self, xml_string, binding):
""" Parse an attribute query
diff --git a/tests/test_51_client.py b/tests/test_51_client.py
index 0fbf63f8..7e46ccaa 100644
--- a/tests/test_51_client.py
+++ b/tests/test_51_client.py
@@ -11,7 +11,7 @@ from pytest import raises
from saml2.argtree import add_path
from saml2.cert import OpenSSLWrapper
from saml2.xmldsig import sig_default
-from saml2.xmldsig import SIG_RSA_SHA256
+from saml2.xmldsig import SIG_RSA_SHA256, SIG_RSA_SHA1
from saml2 import BINDING_HTTP_POST
from saml2 import BINDING_HTTP_REDIRECT
from saml2 import config
@@ -29,7 +29,8 @@ from saml2.extension.requested_attributes import RequestedAttribute
from saml2.authn_context import INTERNETPROTOCOLPASSWORD
from saml2.client import Saml2Client
from saml2.pack import parse_soap_enveloped_saml
-from saml2.response import LogoutResponse, StatusInvalidNameidPolicy, StatusError
+from saml2.response import LogoutResponse, StatusInvalidNameidPolicy, StatusError, \
+ IncorrectlySigned
from saml2.saml import NAMEID_FORMAT_PERSISTENT, EncryptedAssertion, Advice
from saml2.saml import NAMEID_FORMAT_TRANSIENT
from saml2.saml import NameID
@@ -172,6 +173,9 @@ class TestClient:
conf.load_file("server_conf")
self.client = Saml2Client(conf)
+ def setup_method(self):
+ self.server.config.setattr("idp", "want_authn_requests_signed", None)
+
def teardown_class(self):
self.server.close()
@@ -1524,6 +1528,90 @@ class TestClient:
qs["SAMLRequest"][0], BINDING_HTTP_REDIRECT
)
+ def test_signed_redirect_passes_if_needs_signed_requests(self):
+ # Revert configuration change to disallow unsinged responses
+ self.client.want_response_signed = True
+ self.server.config.setattr("idp", "want_authn_requests_signed", True)
+
+ reqid, req = self.client.create_authn_request(
+ "http://localhost:8088/sso", message_id="id1"
+ )
+
+ info = self.client.apply_binding(
+ BINDING_HTTP_REDIRECT,
+ str(req),
+ destination="",
+ relay_state="relay2",
+ sign=True,
+ sigalg=SIG_RSA_SHA256,
+ )
+ loc = info["headers"][0][1]
+ qs = list_values2simpletons(parse.parse_qs(loc[1:]))
+
+ res = self.server.parse_authn_request(
+ qs["SAMLRequest"],
+ BINDING_HTTP_REDIRECT,
+ relay_state=qs["RelayState"],
+ sigalg=qs["SigAlg"],
+ signature=qs["Signature"]
+ )
+ assert res.message.destination == "http://localhost:8088/sso"
+ assert res.message.id == "id1"
+
+ def test_signed_redirect_fail_if_needs_signed_request_but_received_unsigned(self):
+ # Revert configuration change to disallow unsinged responses
+ self.client.want_response_signed = True
+ self.server.config.setattr("idp", "want_authn_requests_signed", True)
+
+ reqid, req = self.client.create_authn_request(
+ "http://localhost:8088/sso", message_id="id1"
+ )
+
+ info = self.client.apply_binding(
+ BINDING_HTTP_REDIRECT,
+ str(req),
+ destination="",
+ relay_state="relay2",
+ sign=True,
+ sigalg=SIG_RSA_SHA256,
+ )
+ loc = info["headers"][0][1]
+ qs = list_values2simpletons(parse.parse_qs(loc[1:]))
+
+ with raises(IncorrectlySigned):
+ self.server.parse_authn_request(
+ qs["SAMLRequest"], BINDING_HTTP_REDIRECT
+ )
+
+ def test_signed_redirect_fail_if_needs_signed_request_but_sigalg_not_matches(self):
+ # Revert configuration change to disallow unsinged responses
+ self.client.want_response_signed = True
+ self.server.config.setattr("idp", "want_authn_requests_signed", True)
+
+ reqid, req = self.client.create_authn_request(
+ "http://localhost:8088/sso", message_id="id1"
+ )
+
+ info = self.client.apply_binding(
+ BINDING_HTTP_REDIRECT,
+ str(req),
+ destination="",
+ relay_state="relay2",
+ sign=True,
+ sigalg=SIG_RSA_SHA256,
+ )
+ loc = info["headers"][0][1]
+ qs = list_values2simpletons(parse.parse_qs(loc[1:]))
+
+ with raises(IncorrectlySigned):
+ self.server.parse_authn_request(
+ qs["SAMLRequest"],
+ BINDING_HTTP_REDIRECT,
+ relay_state=qs["RelayState"],
+ sigalg=SIG_RSA_SHA1,
+ signature=qs["Signature"]
+ )
+
def test_do_logout_signed_redirect(self):
conf = config.SPConfig()
conf.load_file("sp_slo_redirect_conf")