summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvan Kanakarakis <ivan.kanak@gmail.com>2021-11-16 14:58:03 +0200
committerIvan Kanakarakis <ivan.kanak@gmail.com>2021-11-16 14:58:03 +0200
commit718cf98a3baba4642ca9321e05115157c0d387dd (patch)
tree101dc962007dd4fc52ad2953216bddf58a14b847
parent5caf6da27bd81b0e191254a329c4dd3c67458f8e (diff)
parenta3b26f3b6d3ea8122c69f7172cafc250c74f1481 (diff)
downloadpysaml2-718cf98a3baba4642ca9321e05115157c0d387dd.tar.gz
Fix signature verification for the redirect binding
- When an AuthnRequest is received by the Server - When a LogoutRequest is received by the client or the server
-rw-r--r--.gitignore1
-rw-r--r--src/saml2/client.py18
-rw-r--r--src/saml2/entity.py38
-rw-r--r--src/saml2/request.py92
-rw-r--r--src/saml2/server.py10
-rw-r--r--tests/test_51_client.py171
6 files changed, 285 insertions, 45 deletions
diff --git a/.gitignore b/.gitignore
index 534546d2..17e7bb35 100644
--- a/.gitignore
+++ b/.gitignore
@@ -123,6 +123,7 @@ venv.bak/
# PyCharm project files
.idea/
+*.iml
# mkdocs documentation
/site
diff --git a/src/saml2/client.py b/src/saml2/client.py
index aa0bd0c9..a7469d4f 100644
--- a/src/saml2/client.py
+++ b/src/saml2/client.py
@@ -630,7 +630,9 @@ class Saml2Client(Base):
sign=None,
sign_alg=None,
digest_alg=None,
- relay_state="",
+ relay_state=None,
+ sigalg=None,
+ signature=None,
):
"""
Deal with a LogoutRequest
@@ -639,6 +641,11 @@ class Saml2Client(Base):
:param name_id: The id of the current user
:param binding: Which binding the message came in over
:param sign: Whether the response will be signed or not
+ :param sign_alg: The signing algorithm for the response
+ :param digest_alg: The digest algorithm for the the response
+ :param relay_state: The relay state of the request
+ :param sigalg: The SigAlg query param of the request
+ :param signature: The Signature query param of the request
:return: Keyword arguments which can be used to send the response
what's returned follow different patterns for different bindings.
If the binding is BINDIND_SOAP, what is returned looks like this::
@@ -652,8 +659,13 @@ class Saml2Client(Base):
"""
logger.info("logout request: %s", request)
- _req = self._parse_request(request, LogoutRequest,
- "single_logout_service", binding)
+ _req = self.parse_logout_request(
+ xmlstr=request,
+ binding=binding,
+ relay_state=relay_state,
+ sigalg=sigalg,
+ signature=signature,
+ )
if _req.message.name_id == name_id:
try:
diff --git a/src/saml2/entity.py b/src/saml2/entity.py
index 6658a8cf..f818b702 100644
--- a/src/saml2/entity.py
+++ b/src/saml2/entity.py
@@ -1024,7 +1024,16 @@ class Entity(HTTPBase):
else:
return typ
- def _parse_request(self, enc_request, request_cls, service, binding):
+ def _parse_request(
+ self,
+ enc_request,
+ request_cls,
+ service,
+ binding,
+ relay_state=None,
+ sigalg=None,
+ signature=None,
+ ):
"""Parse a Request
:param enc_request: The request in its transport format
@@ -1039,8 +1048,7 @@ class Entity(HTTPBase):
_log_debug = logger.debug
# The addresses I should receive messages like this on
- receiver_addresses = self.config.endpoint(service, binding,
- self.entity_type)
+ receiver_addresses = self.config.endpoint(service, binding, self.entity_type)
if not receiver_addresses and self.entity_type == "idp":
for typ in ["aa", "aq", "pdp"]:
receiver_addresses = self.config.endpoint(service, binding, typ)
@@ -1070,7 +1078,9 @@ class Entity(HTTPBase):
if only_valid_cert:
must = True
_request = _request.loads(xmlstr, binding, origdoc=enc_request,
- must=must, only_valid_cert=only_valid_cert)
+ must=must, only_valid_cert=only_valid_cert,
+ relay_state=relay_state, sigalg=sigalg,
+ signature=signature)
_log_debug("Loaded request")
@@ -1574,7 +1584,14 @@ class Entity(HTTPBase):
# ------------------------------------------------------------------------
- def parse_logout_request(self, xmlstr, binding=BINDING_SOAP):
+ def parse_logout_request(
+ self,
+ xmlstr,
+ binding=BINDING_SOAP,
+ relay_state=None,
+ sigalg=None,
+ signature=None,
+ ):
""" Deal with a LogoutRequest
:param xmlstr: The response as a xml string
@@ -1584,8 +1601,15 @@ class Entity(HTTPBase):
was not.
"""
- return self._parse_request(xmlstr, saml_request.LogoutRequest,
- "single_logout_service", binding)
+ return self._parse_request(
+ enc_request=xmlstr,
+ request_cls=saml_request.LogoutRequest,
+ service="single_logout_service",
+ binding=binding,
+ relay_state=relay_state,
+ sigalg=sigalg,
+ signature=signature,
+ )
def use_artifact(self, message, endpoint_index=0):
"""
diff --git a/src/saml2/request.py b/src/saml2/request.py
index 20b711cf..200a1ff8 100644
--- a/src/saml2/request.py
+++ b/src/saml2/request.py
@@ -1,12 +1,15 @@
import logging
+from saml2 import time_util
+from saml2 import BINDING_HTTP_REDIRECT
+from saml2 import BINDING_HTTP_POST
from saml2.attribute_converter import to_local
-from saml2 import time_util, BINDING_HTTP_REDIRECT
from saml2.s_utils import OtherError
from saml2.validate import valid_instance
from saml2.validate import NotValid
from saml2.response import IncorrectlySigned
+from saml2.sigver import verify_redirect_signature
logger = logging.getLogger(__name__)
@@ -36,38 +39,84 @@ class Request(object):
self.message = None
self.not_on_or_after = 0
- def _loads(self, xmldata, binding=None, origdoc=None, must=None,
- only_valid_cert=False):
- if binding == BINDING_HTTP_REDIRECT:
- pass
-
+ def _loads(
+ self,
+ xmldata,
+ binding=None,
+ origdoc=None,
+ must=None,
+ only_valid_cert=False,
+ relay_state=None,
+ sigalg=None,
+ signature=None,
+ ):
# own copy
self.xmlstr = xmldata[:]
- logger.debug("xmlstr: %s", self.xmlstr)
+ logger.debug("xmlstr: %s, relay_state: %s, sigalg: %s, signature: %s",
+ self.xmlstr, relay_state, sigalg, signature)
+
+ signed_post = must and binding == BINDING_HTTP_POST
+ signed_redirect = must and binding == BINDING_HTTP_REDIRECT
+ incorrectly_signed = IncorrectlySigned("Request was not signed correctly")
+
try:
- self.message = self.signature_check(xmldata, origdoc=origdoc,
- must=must,
- only_valid_cert=only_valid_cert)
- except TypeError:
- raise
- except Exception as excp:
- logger.info("EXCEPTION: %s", excp)
+ self.message = self.signature_check(
+ xmldata,
+ origdoc=origdoc,
+ must=signed_post,
+ only_valid_cert=only_valid_cert,
+ )
+ except Exception as e:
+ self.message = None
+ raise incorrectly_signed from e
+
+ if signed_redirect:
+ if sigalg is None or signature is None:
+ raise incorrectly_signed
+
+ _saml_msg = {
+ "SAMLRequest": origdoc,
+ "Signature": signature,
+ "SigAlg": sigalg,
+ }
+ if relay_state is not None:
+ _saml_msg["RelayState"] = relay_state
+ try:
+ sig_verified = self._do_redirect_sig_check(_saml_msg)
+ except Exception as e:
+ self.message = None
+ raise incorrectly_signed from e
+ else:
+ if not sig_verified:
+ self.message = None
+ raise incorrectly_signed
if not self.message:
- logger.error("Response was not correctly signed")
- logger.info("Response: %s", xmldata)
- raise IncorrectlySigned()
+ logger.error("Request was not signed correctly")
+ logger.info("Request data: %s", xmldata)
+ raise incorrectly_signed
- logger.info("request: %s", self.message)
+ logger.info("Request message: %s", self.message)
try:
valid_instance(self.message)
except NotValid as exc:
- logger.error("Not valid request: %s", exc.args[0])
+ logger.error("Request not valid: %s", exc.args[0])
raise
return self
+ def _do_redirect_sig_check(self, _saml_msg):
+ issuer = self.message.issuer.text.strip()
+ certs = self.sec.metadata.certs(issuer, "any", "signing")
+ logger.debug("Certs to verify request sig: %s, _saml_msg: %s", certs, _saml_msg)
+ verified = any(
+ verify_redirect_signature(_saml_msg, self.sec.sec_backend, cert)
+ for cert_name, cert in certs
+ )
+ logger.info("Redirect request signature check: %s", verified)
+ return verified
+
def issue_instant_ok(self):
""" Check that the request was issued at a reasonable time """
upper = time_util.shift_time(time_util.time_in_a_while(days=1),
@@ -97,9 +146,10 @@ class Request(object):
return valid
def loads(self, xmldata, binding, origdoc=None, must=None,
- only_valid_cert=False):
+ only_valid_cert=False, relay_state=None, sigalg=None, signature=None):
return self._loads(xmldata, binding, origdoc, must,
- only_valid_cert=only_valid_cert)
+ only_valid_cert=only_valid_cert, relay_state=relay_state,
+ sigalg=sigalg, signature=signature)
def verify(self):
try:
diff --git a/src/saml2/server.py b/src/saml2/server.py
index 9e34cce2..1bcf7ead 100644
--- a/src/saml2/server.py
+++ b/src/saml2/server.py
@@ -226,17 +226,23 @@ class Server(Entity):
# -------------------------------------------------------------------------
- def parse_authn_request(self, enc_request, binding=BINDING_HTTP_REDIRECT):
+ def parse_authn_request(self, enc_request, binding=BINDING_HTTP_REDIRECT,
+ relay_state=None, sigalg=None, signature=None):
"""Parse a Authentication Request
:param enc_request: The request in its transport format
:param binding: Which binding that was used to transport the message
+ :param relay_state: RelayState, when binding=redirect
+ :param sigalg: Signature Algorithm, when binding=redirect
+ :param signature: Signature, when binding=redirect
to this entity.
:return: A request instance
"""
return self._parse_request(enc_request, AuthnRequest,
- "single_sign_on_service", binding)
+ "single_sign_on_service", binding,
+ relay_state=relay_state, sigalg=sigalg,
+ signature=signature)
def parse_attribute_query(self, xml_string, binding):
""" Parse an attribute query
diff --git a/tests/test_51_client.py b/tests/test_51_client.py
index 0fbf63f8..a323de79 100644
--- a/tests/test_51_client.py
+++ b/tests/test_51_client.py
@@ -12,6 +12,7 @@ from saml2.argtree import add_path
from saml2.cert import OpenSSLWrapper
from saml2.xmldsig import sig_default
from saml2.xmldsig import SIG_RSA_SHA256
+from saml2.xmldsig import SIG_RSA_SHA1
from saml2 import BINDING_HTTP_POST
from saml2 import BINDING_HTTP_REDIRECT
from saml2 import config
@@ -30,6 +31,7 @@ from saml2.authn_context import INTERNETPROTOCOLPASSWORD
from saml2.client import Saml2Client
from saml2.pack import parse_soap_enveloped_saml
from saml2.response import LogoutResponse, StatusInvalidNameidPolicy, StatusError
+from saml2.response import IncorrectlySigned
from saml2.saml import NAMEID_FORMAT_PERSISTENT, EncryptedAssertion, Advice
from saml2.saml import NAMEID_FORMAT_TRANSIENT
from saml2.saml import NameID
@@ -172,6 +174,9 @@ class TestClient:
conf.load_file("server_conf")
self.client = Saml2Client(conf)
+ def setup_method(self):
+ self.server.config.setattr("idp", "want_authn_requests_signed", None)
+
def teardown_class(self):
self.server.close()
@@ -1524,6 +1529,90 @@ class TestClient:
qs["SAMLRequest"][0], BINDING_HTTP_REDIRECT
)
+ def test_signed_redirect_passes_if_needs_signed_requests(self):
+ # Revert configuration change to disallow unsinged responses
+ self.client.want_response_signed = True
+ self.server.config.setattr("idp", "want_authn_requests_signed", True)
+
+ reqid, req = self.client.create_authn_request(
+ "http://localhost:8088/sso", message_id="id1"
+ )
+
+ info = self.client.apply_binding(
+ BINDING_HTTP_REDIRECT,
+ str(req),
+ destination="",
+ relay_state="relay2",
+ sign=True,
+ sigalg=SIG_RSA_SHA256,
+ )
+ loc = info["headers"][0][1]
+ qs = list_values2simpletons(parse.parse_qs(loc[1:]))
+
+ res = self.server.parse_authn_request(
+ qs["SAMLRequest"],
+ BINDING_HTTP_REDIRECT,
+ relay_state=qs["RelayState"],
+ sigalg=qs["SigAlg"],
+ signature=qs["Signature"]
+ )
+ assert res.message.destination == "http://localhost:8088/sso"
+ assert res.message.id == "id1"
+
+ def test_signed_redirect_fail_if_needs_signed_request_but_received_unsigned(self):
+ # Revert configuration change to disallow unsinged responses
+ self.client.want_response_signed = True
+ self.server.config.setattr("idp", "want_authn_requests_signed", True)
+
+ reqid, req = self.client.create_authn_request(
+ "http://localhost:8088/sso", message_id="id1"
+ )
+
+ info = self.client.apply_binding(
+ BINDING_HTTP_REDIRECT,
+ str(req),
+ destination="",
+ relay_state="relay2",
+ sign=True,
+ sigalg=SIG_RSA_SHA256,
+ )
+ loc = info["headers"][0][1]
+ qs = list_values2simpletons(parse.parse_qs(loc[1:]))
+
+ with raises(IncorrectlySigned):
+ self.server.parse_authn_request(
+ qs["SAMLRequest"], BINDING_HTTP_REDIRECT
+ )
+
+ def test_signed_redirect_fail_if_needs_signed_request_but_sigalg_not_matches(self):
+ # Revert configuration change to disallow unsinged responses
+ self.client.want_response_signed = True
+ self.server.config.setattr("idp", "want_authn_requests_signed", True)
+
+ reqid, req = self.client.create_authn_request(
+ "http://localhost:8088/sso", message_id="id1"
+ )
+
+ info = self.client.apply_binding(
+ BINDING_HTTP_REDIRECT,
+ str(req),
+ destination="",
+ relay_state="relay2",
+ sign=True,
+ sigalg=SIG_RSA_SHA256,
+ )
+ loc = info["headers"][0][1]
+ qs = list_values2simpletons(parse.parse_qs(loc[1:]))
+
+ with raises(IncorrectlySigned):
+ self.server.parse_authn_request(
+ qs["SAMLRequest"],
+ BINDING_HTTP_REDIRECT,
+ relay_state=qs["RelayState"],
+ sigalg=SIG_RSA_SHA1,
+ signature=qs["Signature"]
+ )
+
def test_do_logout_signed_redirect(self):
conf = config.SPConfig()
conf.load_file("sp_slo_redirect_conf")
@@ -1555,14 +1644,68 @@ class TestClient:
loc = info["headers"][0][1]
_, _, _, _, qs, _ = parse.urlparse(loc)
qs = parse.parse_qs(qs)
- assert _leq(qs.keys(),
- ['SigAlg', 'SAMLRequest', 'RelayState', 'Signature'])
+ assert _leq(qs.keys(), ['SigAlg', 'SAMLRequest', 'RelayState', 'Signature'])
- assert verify_redirect_signature(list_values2simpletons(qs),
- client.sec.sec_backend)
+ qs_simple = list_values2simpletons(qs)
+ assert verify_redirect_signature(qs_simple, client.sec.sec_backend)
- res = self.server.parse_logout_request(qs["SAMLRequest"][0],
- BINDING_HTTP_REDIRECT)
+ res = self.server.parse_logout_request(
+ qs_simple["SAMLRequest"],
+ BINDING_HTTP_REDIRECT,
+ relay_state=qs_simple['RelayState'],
+ sigalg=qs_simple['SigAlg'],
+ signature=qs_simple['Signature'],
+ )
+
+ def test_do_logout_signed_redirect_invalid(self):
+ conf = config.SPConfig()
+ conf.load_file("sp_slo_redirect_conf")
+ client = Saml2Client(conf)
+
+ session_info = {
+ "name_id": nid,
+ "issuer": "urn:mace:example.com:saml:roland:idp",
+ "not_on_or_after": in_a_while(minutes=15),
+ "ava": {
+ "givenName": "Anders",
+ "sn": "Andersson",
+ "mail": "anders.andersson@example.com"
+ }
+ }
+ client.users.add_information_about_person(session_info)
+ entity_ids = client.users.issuers_of_info(nid)
+
+ resp = client.do_logout(
+ nid,
+ entity_ids,
+ "Tired",
+ in_a_while(minutes=5),
+ sign=True,
+ expected_binding=BINDING_HTTP_REDIRECT,
+ )
+
+ binding, info = resp[entity_ids[0]]
+ loc = info["headers"][0][1]
+ _, _, _, _, qs, _ = parse.urlparse(loc)
+ qs = parse.parse_qs(qs)
+ qs_simple = list_values2simpletons(qs)
+
+ invalid_signature = 'ZEdMZUQ3SjBjQ2ozWmlGaHhyV3JZSzNkTWhQWU02bjA0dzVNeUd1UWgrVDhnYm1oc1R1TTFjPQo='
+ qs_simple_invalid = {
+ **qs_simple,
+ 'Signature': invalid_signature,
+ }
+ assert not verify_redirect_signature(qs_simple_invalid, client.sec.sec_backend)
+
+ self.server.config.setattr("idp", "want_authn_requests_signed", True)
+ with raises(IncorrectlySigned):
+ res = self.server.parse_logout_request(
+ qs_simple["SAMLRequest"],
+ BINDING_HTTP_REDIRECT,
+ relay_state=qs_simple['RelayState'],
+ sigalg=qs_simple['SigAlg'],
+ signature=invalid_signature,
+ )
def test_do_logout_post(self):
# information about the user from an IdP
@@ -3156,14 +3299,18 @@ class TestClientNonAsciiAva:
loc = info["headers"][0][1]
_, _, _, _, qs, _ = parse.urlparse(loc)
qs = parse.parse_qs(qs)
- assert _leq(qs.keys(),
- ['SigAlg', 'SAMLRequest', 'RelayState', 'Signature'])
+ assert _leq(qs.keys(), ['SigAlg', 'SAMLRequest', 'RelayState', 'Signature'])
- assert verify_redirect_signature(list_values2simpletons(qs),
- client.sec.sec_backend)
+ qs_simple = list_values2simpletons(qs)
+ assert verify_redirect_signature(qs_simple, client.sec.sec_backend)
- res = self.server.parse_logout_request(qs["SAMLRequest"][0],
- BINDING_HTTP_REDIRECT)
+ res = self.server.parse_logout_request(
+ qs_simple["SAMLRequest"],
+ BINDING_HTTP_REDIRECT,
+ relay_state=qs_simple['RelayState'],
+ sigalg=qs_simple['SigAlg'],
+ signature=qs_simple['Signature'],
+ )
def test_do_logout_post(self):
# information about the user from an IdP