#!/usr/bin/env python from base64 import decodebytes as b64decode from base64 import encodebytes as b64encode from urllib import parse import uuid from defusedxml.common import EntitiesForbidden from fakeIDP import FakeIDP from fakeIDP import unpack_form from pathutils import full_path from pytest import raises from saml2 import BINDING_HTTP_POST from saml2 import BINDING_HTTP_REDIRECT from saml2 import VERSION from saml2 import class_name from saml2 import config from saml2 import extension_elements_to_elements from saml2 import s_utils from saml2 import saml from saml2 import samlp from saml2 import sigver from saml2.argtree import add_path from saml2.assertion import Assertion from saml2.authn_context import INTERNETPROTOCOLPASSWORD from saml2.cert import OpenSSLWrapper from saml2.client import Saml2Client from saml2.extension.requested_attributes import RequestedAttribute from saml2.extension.requested_attributes import RequestedAttributes from saml2.pack import parse_soap_enveloped_saml from saml2.response import IncorrectlySigned from saml2.response import LogoutResponse from saml2.response import StatusError from saml2.response import StatusInvalidNameidPolicy from saml2.s_utils import do_attribute_statement from saml2.s_utils import factory from saml2.s_utils import sid from saml2.saml import NAMEID_FORMAT_PERSISTENT from saml2.saml import NAMEID_FORMAT_TRANSIENT from saml2.saml import Advice from saml2.saml import EncryptedAssertion from saml2.saml import NameID from saml2.samlp import SessionIndex from saml2.server import Server from saml2.sigver import SignatureError from saml2.sigver import SigverError from saml2.sigver import pre_encrypt_assertion from saml2.sigver import pre_encryption_part from saml2.sigver import rm_xmltag from saml2.sigver import verify_redirect_signature from saml2.time_util import a_while_ago from saml2.time_util import in_a_while from saml2.time_util import instant from saml2.xmldsig import SIG_RSA_SHA1 from saml2.xmldsig import SIG_RSA_SHA256 from saml2.xmldsig import sig_default AUTHN = {"class_ref": INTERNETPROTOCOLPASSWORD, "authn_auth": "http://www.example.com/login"} def response_factory(**kwargs): response = samlp.Response(id=sid(), version=VERSION, issue_instant=instant()) for key, val in kwargs.items(): setattr(response, key, val) return response def generate_cert(): sn = uuid.uuid4().urn cert_info = { "cn": "localhost", "country_code": "se", "state": "ac", "city": "Umea", "organization": "ITS", "organization_unit": "DIRG", } osw = OpenSSLWrapper() ca_cert_str = osw.read_str_from_file(full_path("root_cert/localhost.ca.crt")) ca_key_str = osw.read_str_from_file(full_path("root_cert/localhost.ca.key")) req_cert_str, req_key_str = osw.create_certificate(cert_info, request=True, sn=sn, key_length=2048) cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str, req_cert_str) return cert_str, req_key_str def add_subelement(xmldoc, node_name, subelem): _str = str s = xmldoc.find(node_name) if s > 0: x = xmldoc.rindex("<", 0, s) tag = xmldoc[x + 1 : s - 1] c = s + len(node_name) spaces = "" while xmldoc[c] == " ": spaces += " " c += 1 # Sometimes we get an xml header, sometimes we don't. subelem_str = _str(subelem) if subelem_str[0:5].lower() == "", f"<{tag}:{node_name}{spaces}>{subelem_str}", ) return xmldoc def for_me(condition, me): for restriction in condition.audience_restriction: audience = restriction.audience if audience.text.strip() == me: return True def ava(attribute_statement): result = {} for attribute in attribute_statement.attribute: # Check name_format ?? name = attribute.name.strip() result[name] = [] for value in attribute.attribute_value: result[name].append(value.text.strip()) return result def _leq(l1, l2): return set(l1) == set(l2) REQ1 = { "1.2.14": """ urn:mace:example.com:saml:roland:spE8042FB4-4D5B-48C3-8E14-8EDD852790DD""", "1.2.16": """ urn:mace:example .com:saml:roland:spE8042FB4-4D5B-48C3-8E14-8EDD852790DD""", } nid = NameID(name_qualifier="foo", format=NAMEID_FORMAT_TRANSIENT, text="123456") def list_values2simpletons(_dict): return {k: v[0] for k, v in _dict.items()} class TestClient: def setup_class(self): self.server = Server("idp_conf") conf = config.SPConfig() conf.load_file("server_conf") self.client = Saml2Client(conf) def setup_method(self): self.server.config.setattr("idp", "want_authn_requests_signed", None) def teardown_class(self): self.server.close() def test_create_attribute_query1(self): req_id, req = self.client.create_attribute_query( "https://idp.example.com/idp/", "E8042FB4-4D5B-48C3-8E14-8EDD852790DD", format=saml.NAMEID_FORMAT_PERSISTENT, message_id="id1", ) reqstr = f"{req.to_string().decode()}" assert req.destination == "https://idp.example.com/idp/" assert req.id == "id1" assert req.version == "2.0" subject = req.subject name_id = subject.name_id assert name_id.format == saml.NAMEID_FORMAT_PERSISTENT assert name_id.text == "E8042FB4-4D5B-48C3-8E14-8EDD852790DD" issuer = req.issuer assert issuer.text == "urn:mace:example.com:saml:roland:sp" attrq = samlp.attribute_query_from_string(reqstr) assert _leq(attrq.keyswv(), ["destination", "subject", "issue_instant", "version", "id", "issuer"]) assert attrq.destination == req.destination assert attrq.id == req.id assert attrq.version == req.version assert attrq.issuer.text == issuer.text assert attrq.issue_instant == req.issue_instant assert attrq.subject.name_id.format == name_id.format assert attrq.subject.name_id.text == name_id.text def test_create_attribute_query2(self): req_id, req = self.client.create_attribute_query( "https://idp.example.com/idp/", "E8042FB4-4D5B-48C3-8E14-8EDD852790DD", attribute={ ("urn:oid:2.5.4.42", "urn:oasis:names:tc:SAML:2.0:attrname-format:uri", "givenName"): None, ("urn:oid:2.5.4.4", "urn:oasis:names:tc:SAML:2.0:attrname-format:uri", "surname"): None, ("urn:oid:1.2.840.113549.1.9.1", "urn:oasis:names:tc:SAML:2.0:attrname-format:uri"): None, }, format=saml.NAMEID_FORMAT_PERSISTENT, message_id="id1", ) assert req.destination == "https://idp.example.com/idp/" assert req.id == "id1" assert req.version == "2.0" subject = req.subject name_id = subject.name_id assert name_id.format == saml.NAMEID_FORMAT_PERSISTENT assert name_id.text == "E8042FB4-4D5B-48C3-8E14-8EDD852790DD" assert len(req.attribute) == 3 # one is givenName seen = [] for attribute in req.attribute: if attribute.name == "urn:oid:2.5.4.42": assert attribute.name_format == saml.NAME_FORMAT_URI assert attribute.friendly_name == "givenName" seen.append("givenName") elif attribute.name == "urn:oid:2.5.4.4": assert attribute.name_format == saml.NAME_FORMAT_URI assert attribute.friendly_name == "surname" seen.append("surname") elif attribute.name == "urn:oid:1.2.840.113549.1.9.1": assert attribute.name_format == saml.NAME_FORMAT_URI if getattr(attribute, "friendly_name"): assert False seen.append("email") assert _leq(seen, ["givenName", "surname", "email"]) def test_create_attribute_query_3(self): req_id, req = self.client.create_attribute_query( "https://aai-demo-idp.switch.ch/idp/shibboleth", "_e7b68a04488f715cda642fbdd90099f5", format=NAMEID_FORMAT_TRANSIENT, message_id="id1", ) assert isinstance(req, samlp.AttributeQuery) assert req.destination == "https://aai-demo-idp.switch" ".ch/idp/shibboleth" assert req.id == "id1" assert req.version == "2.0" assert req.issue_instant assert req.issuer.text == "urn:mace:example.com:saml:roland:sp" nameid = req.subject.name_id assert nameid.format == NAMEID_FORMAT_TRANSIENT assert nameid.text == "_e7b68a04488f715cda642fbdd90099f5" def test_create_auth_request_0(self): ar_str = ( "%s" % self.client.create_authn_request( "http://www.example.com/sso", message_id="id1", nameid_format=NAMEID_FORMAT_TRANSIENT, )[1] ) ar = samlp.authn_request_from_string(ar_str) assert ar.assertion_consumer_service_url == ("http://lingon.catalogix" ".se:8087/") assert ar.destination == "http://www.example.com/sso" assert ar.protocol_binding == BINDING_HTTP_POST assert ar.version == "2.0" assert ar.provider_name == "urn:mace:example.com:saml:roland:sp" assert ar.issuer.text == "urn:mace:example.com:saml:roland:sp" nid_policy = ar.name_id_policy assert nid_policy.allow_create is None assert nid_policy.format == NAMEID_FORMAT_TRANSIENT node_requested_attributes = None for e in ar.extensions.extension_elements: if e.tag == RequestedAttributes.c_tag: node_requested_attributes = e break assert node_requested_attributes is not None for c in node_requested_attributes.children: assert c.tag == RequestedAttribute.c_tag assert c.attributes["isRequired"] in ["true", "false"] assert c.attributes["Name"] assert c.attributes["FriendlyName"] assert c.attributes["NameFormat"] def test_create_auth_request_requested_attributes(self): req_attr = [{"friendly_name": "eduPersonOrgUnitDN", "required": True}] ar_id, ar = self.client.create_authn_request( "http://www.example.com/sso", message_id="id1", requested_attributes=req_attr ) req_attrs_nodes = (e for e in ar.extensions.extension_elements if e.tag == RequestedAttributes.c_tag) req_attrs_node = next(req_attrs_nodes, None) assert req_attrs_node is not None attrs = (child for child in req_attrs_node.children if child.friendly_name == "eduPersonOrgUnitDN") attr = next(attrs, None) assert attr is not None assert attr.c_tag == RequestedAttribute.c_tag assert attr.is_required == "true" assert attr.name == "urn:mace:dir:attribute-def:eduPersonOrgUnitDN" assert attr.friendly_name == "eduPersonOrgUnitDN" assert attr.name_format == "urn:oasis:names:tc:SAML:2.0:attrname-format:basic" def test_create_auth_request_unset_force_authn_by_default(self): req_id, req = self.client.create_authn_request("http://www.example.com/sso", sign=False, message_id="id1") assert req.force_authn is None def test_create_auth_request_set_force_authn_not_true_or_1(self): req_id, req = self.client.create_authn_request( "http://www.example.com/sso", sign=False, message_id="id1", force_authn="0", ) assert req.force_authn is None def test_create_auth_request_set_force_authn_true(self): req_id, req = self.client.create_authn_request( "http://www.example.com/sso", sign=False, message_id="id1", force_authn="true", ) assert req.force_authn == "true" def test_create_auth_request_set_force_authn_1(self): req_id, req = self.client.create_authn_request( "http://www.example.com/sso", sign=False, message_id="id1", force_authn="true", ) assert req.force_authn == "true" def test_create_auth_request_nameid_policy_allow_create(self): conf = config.SPConfig() conf.load_file("sp_conf_nameidpolicy") client = Saml2Client(conf) ar_str = f"{client.create_authn_request('http://www.example.com/sso', message_id='id1')[1]}" ar = samlp.authn_request_from_string(ar_str) assert ar.assertion_consumer_service_url == ("http://lingon.catalogix" ".se:8087/") assert ar.destination == "http://www.example.com/sso" assert ar.protocol_binding == BINDING_HTTP_POST assert ar.version == "2.0" assert ar.provider_name == "urn:mace:example.com:saml:roland:sp" assert ar.issuer.text == "urn:mace:example.com:saml:roland:sp" nid_policy = ar.name_id_policy assert nid_policy.allow_create == "true" assert nid_policy.format == saml.NAMEID_FORMAT_PERSISTENT def test_create_auth_request_vo(self): assert list(self.client.config.vorg.keys()) == ["urn:mace:example.com:it:tek"] ar_str = ( "%s" % self.client.create_authn_request( "http://www.example.com/sso", "urn:mace:example.com:it:tek", # vo nameid_format=NAMEID_FORMAT_PERSISTENT, message_id="666", )[1] ) ar = samlp.authn_request_from_string(ar_str) assert ar.id == "666" assert ar.assertion_consumer_service_url == "http://lingon.catalogix" ".se:8087/" assert ar.destination == "http://www.example.com/sso" assert ar.protocol_binding == BINDING_HTTP_POST assert ar.version == "2.0" assert ar.provider_name == "urn:mace:example.com:saml:roland:sp" assert ar.issuer.text == "urn:mace:example.com:saml:roland:sp" nid_policy = ar.name_id_policy assert nid_policy.allow_create == "false" assert nid_policy.format == saml.NAMEID_FORMAT_PERSISTENT assert nid_policy.sp_name_qualifier == "urn:mace:example.com:it:tek" def test_sign_auth_request_0(self): req_id, areq = self.client.create_authn_request("http://www.example.com/sso", sign=True, message_id="id1") ar_str = f"{areq}" ar = samlp.authn_request_from_string(ar_str) assert ar assert ar.signature assert ar.signature.signature_value signed_info = ar.signature.signed_info assert len(signed_info.reference) == 1 assert signed_info.reference[0].uri == "#id1" assert signed_info.reference[0].digest_value try: assert self.client.sec.correctly_signed_authn_request( ar_str, self.client.config.xmlsec_binary, self.client.config.metadata ) except Exception: # missing certificate self.client.sec.verify_signature(ar_str, node_name=class_name(ar)) def test_logout_response(self): req_id, req = self.server.create_logout_request( "http://localhost:8088/slo", "urn:mace:example.com:saml:roland:sp", name_id=nid, reason="Tired", expire=in_a_while(minutes=15), session_indexes=["_foo"], ) info = self.client.apply_binding(BINDING_HTTP_POST, req, destination="", relay_state="relay2") _dic_info = unpack_form(info["data"], "SAMLRequest") samlreq = _dic_info["SAMLRequest"] resphttp = self.client.handle_logout_request(samlreq, nid, BINDING_HTTP_POST) _dic = unpack_form(resphttp["data"], "SAMLResponse") xml = b64decode(_dic["SAMLResponse"].encode("UTF-8")) # Signature found assert xml.decode("UTF-8").find(r"Signature") > 0 # Try again with logout_responses_signed=False self.client.logout_responses_signed = False resphttp = self.client.handle_logout_request(samlreq, nid, BINDING_HTTP_POST) _dic = unpack_form(resphttp["data"], "SAMLResponse") xml = b64decode(_dic["SAMLResponse"].encode("UTF-8")) # Signature not found assert xml.decode("UTF-8").find(r"Signature") < 0 def test_create_logout_request(self): req_id, req = self.client.create_logout_request( "http://localhost:8088/slo", "urn:mace:example.com:saml:roland:idp", name_id=nid, reason="Tired", expire=in_a_while(minutes=15), session_indexes=["_foo"], ) assert req.destination == "http://localhost:8088/slo" assert req.reason == "Tired" assert req.version == "2.0" assert req.name_id == nid assert req.issuer.text == "urn:mace:example.com:saml:roland:sp" assert req.session_index == [SessionIndex("_foo")] def test_response_1(self): IDP = "urn:mace:example.com:saml:roland:idp" ava = {"givenName": ["Derek"], "sn": ["Jeter"], "mail": ["derek@nyy.mlb.com"], "title": ["The man"]} nameid_policy = samlp.NameIDPolicy(allow_create="false", format=saml.NAMEID_FORMAT_PERSISTENT) resp = self.server.create_authn_response( identity=ava, in_response_to="id1", destination="http://lingon.catalogix.se:8087/", sp_entity_id="urn:mace:example.com:saml:roland:sp", name_id_policy=nameid_policy, sign_response=True, userid="foba0001@example.com", authn=AUTHN, ) resp_str = f"{resp}" resp_str = b64encode(resp_str.encode()) authn_response = self.client.parse_authn_request_response( resp_str, BINDING_HTTP_POST, {"id1": "http://foo.example.com/service"} ) assert authn_response is not None assert authn_response.issuer() == IDP assert authn_response.response.assertion[0].issuer.text == IDP session_info = authn_response.session_info() assert session_info["ava"] == { "mail": ["derek@nyy.mlb.com"], "givenName": ["Derek"], "sn": ["Jeter"], "title": ["The man"], } assert session_info["issuer"] == IDP assert session_info["came_from"] == "http://foo.example.com/service" response = samlp.response_from_string(authn_response.xmlstr) assert response.destination == "http://lingon.catalogix.se:8087/" assert "session_index" in session_info # One person in the cache assert len(self.client.users.subjects()) == 1 subject_id = self.client.users.subjects()[0] # The information I have about the subject comes from one source assert self.client.users.issuers_of_info(subject_id) == [IDP] # --- authenticate another person ava = {"givenName": ["Alfonson"], "sn": ["Soriano"], "mail": ["alfonson@chc.mlb.com"], "title": ["outfielder"]} resp_str = "%s" % self.server.create_authn_response( identity=ava, in_response_to="id2", destination="http://lingon.catalogix.se:8087/", sp_entity_id="urn:mace:example.com:saml:roland:sp", sign_response=True, name_id_policy=nameid_policy, userid="also0001@example.com", authn=AUTHN, ) resp_str = b64encode(resp_str.encode()) self.client.parse_authn_request_response(resp_str, BINDING_HTTP_POST, {"id2": "http://foo.example.com/service"}) # Two persons in the cache assert len(self.client.users.subjects()) == 2 issuers = [self.client.users.issuers_of_info(s) for s in self.client.users.subjects()] # The information I have about the subjects comes from the same source assert issuers == [[IDP], [IDP]] def test_response_2(self): conf = config.SPConfig() conf.load_file("server_conf") _client = Saml2Client(conf) idp, ava, ava_verify, nameid_policy = self.setup_verify_authn_response() cert_str, cert_key_str = generate_cert() cert = {"cert": cert_str, "key": cert_key_str} self.name_id = self.server.ident.transient_nameid("urn:mace:example.com:saml:roland:sp", "id1") resp = self.server.create_authn_response( identity=ava, in_response_to="id1", destination="http://lingon.catalogix.se:8087/", sp_entity_id="urn:mace:example.com:saml:roland:sp", name_id=self.name_id, userid="foba0001@example.com", authn=AUTHN, sign_response=True, sign_assertion=True, encrypt_assertion=False, encrypt_assertion_self_contained=True, pefim=True, encrypt_cert_advice=cert_str, ) resp_str = f"{resp}" resp_str = b64encode(resp_str.encode()) authn_response = _client.parse_authn_request_response( resp_str, BINDING_HTTP_POST, {"id1": "http://foo.example.com/service"}, {"id1": cert} ) self.verify_authn_response(idp, authn_response, _client, ava_verify) def test_response_3(self): conf = config.SPConfig() conf.load_file("server_conf") _client = Saml2Client(conf) idp, ava, ava_verify, nameid_policy = self.setup_verify_authn_response() self.name_id = self.server.ident.transient_nameid("urn:mace:example.com:saml:roland:sp", "id1") resp = self.server.create_authn_response( identity=ava, in_response_to="id1", destination="http://lingon.catalogix.se:8087/", sp_entity_id="urn:mace:example.com:saml:roland:sp", name_id=self.name_id, userid="foba0001@example.com", authn=AUTHN, sign_response=True, sign_assertion=True, encrypt_assertion=False, encrypt_assertion_self_contained=True, pefim=True, ) resp_str = f"{resp}" resp_str = b64encode(resp_str.encode()) authn_response = _client.parse_authn_request_response( resp_str, BINDING_HTTP_POST, {"id1": "http://foo.example.com/service"} ) self.verify_authn_response(idp, authn_response, _client, ava_verify) def test_response_4(self): conf = config.SPConfig() conf.load_file("server_conf") _client = Saml2Client(conf) idp, ava, ava_verify, nameid_policy = self.setup_verify_authn_response() self.name_id = self.server.ident.transient_nameid("urn:mace:example.com:saml:roland:sp", "id1") resp = self.server.create_authn_response( identity=ava, in_response_to="id1", destination="http://lingon.catalogix.se:8087/", sp_entity_id="urn:mace:example.com:saml:roland:sp", name_id=self.name_id, userid="foba0001@example.com", authn=AUTHN, sign_response=True, sign_assertion=True, encrypt_assertion=True, encrypt_assertion_self_contained=True, pefim=True, ) resp_str = f"{resp}" resp_str = b64encode(resp_str.encode()) authn_response = _client.parse_authn_request_response( resp_str, BINDING_HTTP_POST, {"id1": "http://foo.example.com/service"} ) self.verify_authn_response(idp, authn_response, _client, ava_verify) def test_response_5(self): conf = config.SPConfig() conf.load_file("server_conf") _client = Saml2Client(conf) idp, ava, ava_verify, nameid_policy = self.setup_verify_authn_response() self.name_id = self.server.ident.transient_nameid("urn:mace:example.com:saml:roland:sp", "id1") cert_str, cert_key_str = generate_cert() cert = {"cert": cert_str, "key": cert_key_str} resp = self.server.create_authn_response( identity=ava, in_response_to="id1", destination="http://lingon.catalogix.se:8087/", sp_entity_id="urn:mace:example.com:saml:roland:sp", name_id=self.name_id, userid="foba0001@example.com", authn=AUTHN, sign_response=True, sign_assertion=True, encrypt_assertion=True, encrypt_assertion_self_contained=True, pefim=True, encrypt_cert_assertion=cert_str, ) resp_str = f"{resp}" resp_str = b64encode(resp_str.encode()) authn_response = _client.parse_authn_request_response( resp_str, BINDING_HTTP_POST, {"id1": "http://foo.example.com/service"}, {"id1": cert} ) self.verify_authn_response(idp, authn_response, _client, ava_verify) def test_response_6(self): conf = config.SPConfig() conf.load_file("server_conf") _client = Saml2Client(conf) idp, ava, ava_verify, nameid_policy = self.setup_verify_authn_response() self.name_id = self.server.ident.transient_nameid("urn:mace:example.com:saml:roland:sp", "id1") cert_assertion_str, cert_key_assertion_str = generate_cert() cert_assertion = {"cert": cert_assertion_str, "key": cert_key_assertion_str} cert_advice_str, cert_key_advice_str = generate_cert() cert_advice = {"cert": cert_advice_str, "key": cert_key_advice_str} resp = self.server.create_authn_response( identity=ava, in_response_to="id1", destination="http://lingon.catalogix.se:8087/", sp_entity_id="urn:mace:example.com:saml:roland:sp", name_id=self.name_id, userid="foba0001@example.com", authn=AUTHN, sign_response=True, sign_assertion=True, encrypt_assertion=True, encrypt_assertion_self_contained=True, pefim=True, encrypt_cert_assertion=cert_assertion_str, encrypt_cert_advice=cert_advice_str, ) resp_str = f"{resp}" resp_str = b64encode(resp_str.encode()) authn_response = _client.parse_authn_request_response( resp_str, BINDING_HTTP_POST, {"id1": "http://foo.example.com/service"}, {"id1": [cert_assertion, cert_advice]}, ) self.verify_authn_response(idp, authn_response, _client, ava_verify) def test_response_7(self): conf = config.SPConfig() conf.load_file("server_conf") _client = Saml2Client(conf) idp, ava, ava_verify, nameid_policy = self.setup_verify_authn_response() self.name_id = self.server.ident.transient_nameid("urn:mace:example.com:saml:roland:sp", "id1") resp = self.server.create_authn_response( identity=ava, in_response_to="id1", destination="http://lingon.catalogix.se:8087/", sp_entity_id="urn:mace:example.com:saml:roland:sp", name_id=self.name_id, userid="foba0001@example.com", authn=AUTHN, sign_response=True, sign_assertion=True, encrypt_assertion=True, encrypt_assertion_self_contained=True, encrypted_advice_attributes=True, ) resp_str = f"{resp}" resp_str = b64encode(resp_str.encode()) authn_response = _client.parse_authn_request_response( resp_str, BINDING_HTTP_POST, {"id1": "http://foo.example.com/service"} ) self.verify_authn_response(idp, authn_response, _client, ava_verify) def test_response_8(self): conf = config.SPConfig() conf.load_file("server_conf") _client = Saml2Client(conf) idp, ava, ava_verify, nameid_policy = self.setup_verify_authn_response() self.name_id = self.server.ident.transient_nameid("urn:mace:example.com:saml:roland:sp", "id1") cert_str, cert_key_str = generate_cert() cert = {"cert": cert_str, "key": cert_key_str} resp = self.server.create_authn_response( identity=ava, in_response_to="id1", destination="http://lingon.catalogix.se:8087/", sp_entity_id="urn:mace:example.com:saml:roland:sp", name_id=self.name_id, userid="foba0001@example.com", authn=AUTHN, sign_response=True, sign_assertion=True, encrypt_assertion=True, encrypt_assertion_self_contained=True, encrypt_cert_assertion=cert_str, ) resp_str = f"{resp}" resp_str = b64encode(resp_str.encode()) authn_response = _client.parse_authn_request_response( resp_str, BINDING_HTTP_POST, {"id1": "http://foo.example.com/service"}, {"id1": cert} ) self.verify_authn_response(idp, authn_response, _client, ava_verify) def test_response_no_name_id(self): """Test that the SP client can parse an authentication response from an IdP that does not contain a element.""" conf = config.SPConfig() conf.load_file("server_conf") client = Saml2Client(conf) # Use the same approach as the other tests for mocking up # an authentication response to parse. idp, ava, ava_verify, nameid_policy = self.setup_verify_authn_response() # Mock up an authentication response but do not encrypt it # nor sign it since below we will modify it directly. Note that # setting name_id to None still results in a response that includes # a element. resp = self.server.create_authn_response( identity=ava, in_response_to="id1", destination="http://lingon.catalogix.se:8087/", sp_entity_id="urn:mace:example.com:saml:roland:sp", name_id=None, userid="foba0001@example.com", authn=AUTHN, sign_response=False, sign_assertion=False, encrypt_assertion=False, encrypt_assertion_self_contained=False, ) # The create_authn_response method above will return an instance # of saml2.samlp.Response when neither encrypting nor signing and # so we can remove the element directly. resp.assertion.subject.name_id = None # Assert that the response does not contain a NameID element so that # the parsing below is a fair test. assert str(resp).find("NameID") == -1 # Cast the response to a string and encode it to mock up the payload # the SP client is expected to receive via HTTP POST binding. resp_str = b64encode(str(resp).encode()) # We do not need the client to verify a signature for this test. client.want_assertions_signed = False client.want_response_signed = False # Parse the authentication response that does not include a . authn_response = client.parse_authn_request_response( resp_str, BINDING_HTTP_POST, {"id1": "http://foo.example.com/service"} ) # A successful test is parsing the response. assert authn_response is not None def setup_verify_authn_response(self): idp = "urn:mace:example.com:saml:roland:idp" ava = {"givenName": ["Derek"], "sn": ["Jeter"], "mail": ["derek@nyy.mlb.com"], "title": ["The man"]} ava_verify = {"mail": ["derek@nyy.mlb.com"], "givenName": ["Derek"], "sn": ["Jeter"], "title": ["The man"]} nameid_policy = samlp.NameIDPolicy(allow_create="false", format=saml.NAMEID_FORMAT_PERSISTENT) return idp, ava, ava_verify, nameid_policy def verify_authn_response(self, idp, authn_response, _client, ava_verify): assert authn_response is not None assert authn_response.issuer() == idp assert authn_response.assertion.issuer.text == idp session_info = authn_response.session_info() assert session_info["ava"] == ava_verify assert session_info["issuer"] == idp assert session_info["came_from"] == "http://foo.example.com/service" response = samlp.response_from_string(authn_response.xmlstr) assert response.destination == "http://lingon.catalogix.se:8087/" # One person in the cache assert len(_client.users.subjects()) == 1 subject_id = _client.users.subjects()[0] # The information I have about the subject comes from one source assert _client.users.issuers_of_info(subject_id) == [idp] def test_init_values(self): entityid = self.client.config.entityid assert entityid == "urn:mace:example.com:saml:roland:sp" location = self.client._sso_location() assert location == "http://localhost:8088/sso" my_name = self.client._my_name() assert my_name == "urn:mace:example.com:saml:roland:sp" def test_sign_then_encrypt_assertion(self): # Begin with the IdPs side _sec = self.server.sec assertion = s_utils.assertion_factory( subject=factory(saml.Subject, text="_aaa", name_id=factory(saml.NameID, format=NAMEID_FORMAT_TRANSIENT)), attribute_statement=do_attribute_statement( { ("", "", "sn"): ("Jeter", ""), ("", "", "givenName"): ("Derek", ""), } ), issuer=self.server._issuer(), ) assertion.signature = sigver.pre_signature_part(assertion.id, _sec.my_cert, 1) sigass = _sec.sign_statement( assertion, class_name(assertion), key_file=full_path("test.key"), node_id=assertion.id ) # Create an Assertion instance from the signed assertion _ass = saml.assertion_from_string(sigass) response = response_factory( in_response_to="_012345", destination="https:#www.example.com", status=s_utils.success_status_factory(), issuer=self.server._issuer(), assertion=_ass, ) enctext = _sec.crypto.encrypt_assertion( response, self.client.sec.encryption_keypairs[0]["cert_file"], pre_encryption_part(), ) seresp = samlp.response_from_string(enctext) # Now over to the client side _csec = self.client.sec if seresp.encrypted_assertion: decr_text = _csec.decrypt(enctext) seresp = samlp.response_from_string(decr_text) resp_ass = [] sign_cert_file = full_path("test.pem") for enc_ass in seresp.encrypted_assertion: assers = extension_elements_to_elements(enc_ass.extension_elements, [saml, samlp]) for ass in assers: if ass.signature: if not _csec.verify_signature(f"{ass}", sign_cert_file, node_name=class_name(ass)): continue resp_ass.append(ass) seresp.assertion = resp_ass seresp.encrypted_assertion = None assert seresp.assertion def test_sign_then_encrypt_assertion2(self): # Begin with the IdPs side _sec = self.server.sec nameid_policy = samlp.NameIDPolicy(allow_create="false", format=saml.NAMEID_FORMAT_PERSISTENT) asser = Assertion({"givenName": "Derek", "sn": "Jeter"}) farg = add_path({}, ["assertion", "subject", "subject_confirmation", "method", saml.SCM_BEARER]) add_path( farg["assertion"]["subject"]["subject_confirmation"], ["subject_confirmation_data", "in_response_to", "_012345"], ) add_path( farg["assertion"]["subject"]["subject_confirmation"], ["subject_confirmation_data", "recipient", "http://lingon.catalogix.se:8087/"], ) assertion = asser.construct( self.client.config.entityid, self.server.config.attribute_converters, self.server.config.getattr("policy", "idp"), name_id=factory(saml.NameID, format=NAMEID_FORMAT_TRANSIENT), issuer=self.server._issuer(), authn_class=INTERNETPROTOCOLPASSWORD, authn_auth="http://www.example.com/login", farg=farg["assertion"], ) assertion.signature = sigver.pre_signature_part(assertion.id, _sec.my_cert, 1) sigass = _sec.sign_statement( assertion, class_name(assertion), key_file=self.client.sec.key_file, node_id=assertion.id ) sigass = rm_xmltag(sigass) response = response_factory( in_response_to="_012345", destination="http://lingon.catalogix.se:8087/", status=s_utils.success_status_factory(), issuer=self.server._issuer(), encrypted_assertion=EncryptedAssertion(), ) xmldoc = f"{response}" # strangely enough I get different tags if I run this test separately # or as part of a bunch of tests. xmldoc = add_subelement(xmldoc, "EncryptedAssertion", sigass) enctext = _sec.crypto.encrypt_assertion( xmldoc, self.client.sec.encryption_keypairs[1]["cert_file"], pre_encryption_part() ) # seresp = samlp.response_from_string(enctext) resp_str = b64encode(enctext.encode()) # Now over to the client side # Explicitely allow unsigned responses for this and the following 2 tests self.client.want_response_signed = False resp = self.client.parse_authn_request_response( resp_str, BINDING_HTTP_POST, {"_012345": "http://foo.example.com/service"} ) # assert resp.encrypted_assertion == [] assert resp.assertion assert resp.ava == {"givenName": ["Derek"], "sn": ["Jeter"]} def test_sign_then_encrypt_assertion_advice_1(self): # Begin with the IdPs side _sec = self.server.sec nameid_policy = samlp.NameIDPolicy(allow_create="false", format=saml.NAMEID_FORMAT_PERSISTENT) asser = Assertion({"givenName": "Derek", "sn": "Jeter"}) subject_confirmation_specs = { "recipient": "http://lingon.catalogix.se:8087/", "in_response_to": "_012345", "subject_confirmation_method": saml.SCM_BEARER, } name_id = factory(saml.NameID, format=NAMEID_FORMAT_TRANSIENT) farg = add_path({}, ["assertion", "subject", "subject_confirmation", "method", saml.SCM_BEARER]) add_path( farg["assertion"]["subject"]["subject_confirmation"], ["subject_confirmation_data", "in_response_to", "_012345"], ) add_path( farg["assertion"]["subject"]["subject_confirmation"], ["subject_confirmation_data", "recipient", "http://lingon.catalogix.se:8087/"], ) assertion = asser.construct( self.client.config.entityid, self.server.config.attribute_converters, self.server.config.getattr("policy", "idp"), issuer=self.server._issuer(), name_id=name_id, authn_class=INTERNETPROTOCOLPASSWORD, authn_auth="http://www.example.com/login", farg=farg["assertion"], ) a_asser = Assertion({"uid": "test01", "email": "test.testsson@test.se"}) a_assertion = a_asser.construct( self.client.config.entityid, self.server.config.attribute_converters, self.server.config.getattr("policy", "idp"), issuer=self.server._issuer(), authn_class=INTERNETPROTOCOLPASSWORD, authn_auth="http://www.example.com/login", name_id=name_id, farg=farg["assertion"], ) a_assertion.signature = sigver.pre_signature_part(a_assertion.id, _sec.my_cert, 1) assertion.advice = Advice() assertion.advice.encrypted_assertion = [] assertion.advice.encrypted_assertion.append(EncryptedAssertion()) assertion.advice.encrypted_assertion[0].add_extension_element(a_assertion) response = response_factory( in_response_to="_012345", destination="http://lingon.catalogix.se:8087/", status=s_utils.success_status_factory(), issuer=self.server._issuer(), ) response.assertion.append(assertion) response = _sec.sign_statement( f"{response}", class_name(a_assertion), key_file=self.client.sec.key_file, node_id=a_assertion.id ) # xmldoc = "%s" % response # strangely enough I get different tags if I run this test separately # or as part of a bunch of tests. # xmldoc = add_subelement(xmldoc, "EncryptedAssertion", sigass) node_xpath = "".join( [f'/*[local-name()="{v}"]' for v in ["Response", "Assertion", "Advice", "EncryptedAssertion", "Assertion"]] ) enctext = _sec.crypto.encrypt_assertion( response, self.client.sec.encryption_keypairs[0]["cert_file"], pre_encryption_part(), node_xpath=node_xpath ) # seresp = samlp.response_from_string(enctext) resp_str = b64encode(enctext.encode()) # Now over to the client side resp = self.client.parse_authn_request_response( resp_str, BINDING_HTTP_POST, {"_012345": "http://foo.example.com/service"} ) # assert resp.encrypted_assertion == [] assert resp.assertion assert resp.assertion.advice assert resp.assertion.advice.assertion assert resp.ava == { "sn": ["Jeter"], "givenName": ["Derek"], "uid": ["test01"], "email": ["test.testsson@test.se"], } def test_sign_then_encrypt_assertion_advice_2(self): # Begin with the IdPs side _sec = self.server.sec nameid_policy = samlp.NameIDPolicy(allow_create="false", format=saml.NAMEID_FORMAT_PERSISTENT) asser_1 = Assertion({"givenName": "Derek"}) farg = add_path({}, ["assertion", "subject", "subject_confirmation", "method", saml.SCM_BEARER]) add_path( farg["assertion"]["subject"]["subject_confirmation"], ["subject_confirmation_data", "in_response_to", "_012345"], ) add_path( farg["assertion"]["subject"]["subject_confirmation"], ["subject_confirmation_data", "recipient", "http://lingon.catalogix.se:8087/"], ) name_id = factory(saml.NameID, format=NAMEID_FORMAT_TRANSIENT) assertion_1 = asser_1.construct( self.client.config.entityid, self.server.config.attribute_converters, self.server.config.getattr("policy", "idp"), issuer=self.server._issuer(), authn_class=INTERNETPROTOCOLPASSWORD, authn_auth="http://www.example.com/login", name_id=name_id, farg=farg["assertion"], ) asser_2 = Assertion({"sn": "Jeter"}) assertion_2 = asser_2.construct( self.client.config.entityid, self.server.config.attribute_converters, self.server.config.getattr("policy", "idp"), issuer=self.server._issuer(), authn_class=INTERNETPROTOCOLPASSWORD, authn_auth="http://www.example.com/login", name_id=name_id, farg=farg["assertion"], ) a_asser_1 = Assertion({"uid": "test01"}) a_assertion_1 = a_asser_1.construct( self.client.config.entityid, self.server.config.attribute_converters, self.server.config.getattr("policy", "idp"), issuer=self.server._issuer(), authn_class=INTERNETPROTOCOLPASSWORD, authn_auth="http://www.example.com/login", name_id=name_id, farg=farg["assertion"], ) a_asser_2 = Assertion({"email": "test.testsson@test.se"}) a_assertion_2 = a_asser_2.construct( self.client.config.entityid, self.server.config.attribute_converters, self.server.config.getattr("policy", "idp"), issuer=self.server._issuer(), authn_class=INTERNETPROTOCOLPASSWORD, authn_auth="http://www.example.com/login", name_id=name_id, farg=farg["assertion"], ) a_asser_3 = Assertion({"street": "street"}) a_assertion_3 = a_asser_3.construct( self.client.config.entityid, self.server.config.attribute_converters, self.server.config.getattr("policy", "idp"), issuer=self.server._issuer(), authn_class=INTERNETPROTOCOLPASSWORD, authn_auth="http://www.example.com/login", name_id=name_id, farg=farg["assertion"], ) a_asser_4 = Assertion({"title": "title"}) a_assertion_4 = a_asser_4.construct( self.client.config.entityid, self.server.config.attribute_converters, self.server.config.getattr("policy", "idp"), issuer=self.server._issuer(), authn_class=INTERNETPROTOCOLPASSWORD, authn_auth="http://www.example.com/login", name_id=name_id, farg=farg["assertion"], ) a_assertion_1.signature = sigver.pre_signature_part(a_assertion_1.id, _sec.my_cert, 1) a_assertion_2.signature = sigver.pre_signature_part(a_assertion_2.id, _sec.my_cert, 1) a_assertion_3.signature = sigver.pre_signature_part(a_assertion_3.id, _sec.my_cert, 1) a_assertion_4.signature = sigver.pre_signature_part(a_assertion_4.id, _sec.my_cert, 1) assertion_1.signature = sigver.pre_signature_part(assertion_1.id, _sec.my_cert, 1) assertion_2.signature = sigver.pre_signature_part(assertion_2.id, _sec.my_cert, 1) response = response_factory( in_response_to="_012345", destination="http://lingon.catalogix.se:8087/", status=s_utils.success_status_factory(), issuer=self.server._issuer(), ) response.assertion = assertion_1 response.assertion.advice = Advice() response.assertion.advice.encrypted_assertion = [] response.assertion.advice.encrypted_assertion.append(EncryptedAssertion()) response.assertion.advice.encrypted_assertion[0].add_extension_element(a_assertion_1) advice_tag = response.assertion.advice._to_element_tree().tag assertion_tag = a_assertion_1._to_element_tree().tag response = response.get_xml_string_with_self_contained_assertion_within_advice_encrypted_assertion( assertion_tag, advice_tag ) response = _sec.sign_statement( f"{response}", class_name(a_assertion_1), key_file=self.server.sec.key_file, node_id=a_assertion_1.id ) node_xpath = "".join( [f'/*[local-name()="{v}"]' for v in ["Response", "Assertion", "Advice", "EncryptedAssertion", "Assertion"]] ) enctext = _sec.crypto.encrypt_assertion( response, self.client.sec.encryption_keypairs[1]["cert_file"], pre_encryption_part(), node_xpath=node_xpath ) response = samlp.response_from_string(enctext) response.assertion = response.assertion[0] response.assertion.advice.encrypted_assertion.append(EncryptedAssertion()) response.assertion.advice.encrypted_assertion[1].add_extension_element(a_assertion_2) advice_tag = response.assertion.advice._to_element_tree().tag assertion_tag = a_assertion_2._to_element_tree().tag response = response.get_xml_string_with_self_contained_assertion_within_advice_encrypted_assertion( assertion_tag, advice_tag ) response = _sec.sign_statement( f"{response}", class_name(a_assertion_2), key_file=self.server.sec.key_file, node_id=a_assertion_2.id ) node_xpath = "".join( [f'/*[local-name()="{v}"]' for v in ["Response", "Assertion", "Advice", "EncryptedAssertion", "Assertion"]] ) enctext = _sec.crypto.encrypt_assertion( response, self.client.sec.encryption_keypairs[0]["cert_file"], pre_encryption_part(), node_xpath=node_xpath ) response = samlp.response_from_string(enctext) response.assertion = response.assertion[0] assertion_tag = response.assertion._to_element_tree().tag response = pre_encrypt_assertion(response) response = response.get_xml_string_with_self_contained_assertion_within_encrypted_assertion(assertion_tag) response = _sec.sign_statement( f"{response}", class_name(assertion_1), key_file=self.server.sec.key_file, node_id=assertion_1.id ) enctext = _sec.crypto.encrypt_assertion( response, self.client.sec.encryption_keypairs[1]["cert_file"], pre_encryption_part() ) response = samlp.response_from_string(enctext) response.assertion = assertion_2 response.assertion.advice = Advice() response.assertion.advice.encrypted_assertion = [] response.assertion.advice.encrypted_assertion.append(EncryptedAssertion()) response.assertion.advice.encrypted_assertion[0].add_extension_element(a_assertion_3) advice_tag = response.assertion.advice._to_element_tree().tag assertion_tag = a_assertion_3._to_element_tree().tag response = response.get_xml_string_with_self_contained_assertion_within_advice_encrypted_assertion( assertion_tag, advice_tag ) response = _sec.sign_statement( f"{response}", class_name(a_assertion_3), key_file=self.server.sec.key_file, node_id=a_assertion_3.id ) node_xpath = "".join( [f'/*[local-name()="{v}"]' for v in ["Response", "Assertion", "Advice", "EncryptedAssertion", "Assertion"]] ) enctext = _sec.crypto.encrypt_assertion( response, self.client.sec.encryption_keypairs[0]["cert_file"], pre_encryption_part(), node_xpath=node_xpath ) response = samlp.response_from_string(enctext) response.assertion = response.assertion[0] response.assertion.advice.encrypted_assertion.append(EncryptedAssertion()) response.assertion.advice.encrypted_assertion[1].add_extension_element(a_assertion_4) advice_tag = response.assertion.advice._to_element_tree().tag assertion_tag = a_assertion_4._to_element_tree().tag response = response.get_xml_string_with_self_contained_assertion_within_advice_encrypted_assertion( assertion_tag, advice_tag ) response = _sec.sign_statement( f"{response}", class_name(a_assertion_4), key_file=self.server.sec.key_file, node_id=a_assertion_4.id ) node_xpath = "".join( [f'/*[local-name()="{v}"]' for v in ["Response", "Assertion", "Advice", "EncryptedAssertion", "Assertion"]] ) enctext = _sec.crypto.encrypt_assertion( response, self.client.sec.encryption_keypairs[1]["cert_file"], pre_encryption_part(), node_xpath=node_xpath ) response = samlp.response_from_string(enctext) response = _sec.sign_statement( f"{response}", class_name(response.assertion[0]), key_file=self.server.sec.key_file, node_id=response.assertion[0].id, ) response = samlp.response_from_string(response) # seresp = samlp.response_from_string(enctext) resp_str = b64encode(str(response).encode()) # Now over to the client side resp = self.client.parse_authn_request_response( resp_str, BINDING_HTTP_POST, {"_012345": "http://foo.example.com/service"} ) # assert resp.encrypted_assertion == [] assert resp.assertion assert resp.assertion.advice assert resp.assertion.advice.assertion assert resp.ava == { "street": ["street"], "uid": ["test01"], "title": ["title"], "givenName": ["Derek"], "email": ["test.testsson@test.se"], "sn": ["Jeter"], } def test_signed_with_default_algo_redirect(self): # Revert configuration change to disallow unsinged responses self.client.want_response_signed = True reqid, req = self.client.create_authn_request("http://localhost:8088/sso", message_id="id1") msg_str = str(req) info = self.client.apply_binding( BINDING_HTTP_REDIRECT, msg_str, destination="", relay_state="relay2", sign=True, ) loc = info["headers"][0][1] qs = parse.parse_qs(loc[1:]) expected_query_params = ["SigAlg", "SAMLRequest", "RelayState", "Signature"] assert _leq(qs.keys(), expected_query_params) assert all(len(qs[k]) == 1 for k in expected_query_params) assert qs["SigAlg"] == [sig_default] assert verify_redirect_signature(list_values2simpletons(qs), self.client.sec.sec_backend) res = self.server.parse_authn_request(qs["SAMLRequest"][0], BINDING_HTTP_REDIRECT) def test_signed_redirect(self): # Revert configuration change to disallow unsinged responses self.client.want_response_signed = True reqid, req = self.client.create_authn_request("http://localhost:8088/sso", message_id="id1") msg_str = str(req) info = self.client.apply_binding( BINDING_HTTP_REDIRECT, msg_str, destination="", relay_state="relay2", sign=True, sigalg=SIG_RSA_SHA256, ) loc = info["headers"][0][1] qs = parse.parse_qs(loc[1:]) expected_query_params = ["SigAlg", "SAMLRequest", "RelayState", "Signature"] assert _leq(qs.keys(), expected_query_params) assert all(len(qs[k]) == 1 for k in expected_query_params) assert qs["SigAlg"] == [SIG_RSA_SHA256] assert verify_redirect_signature(list_values2simpletons(qs), self.client.sec.sec_backend) res = self.server.parse_authn_request(qs["SAMLRequest"][0], BINDING_HTTP_REDIRECT) def test_signed_redirect_passes_if_needs_signed_requests(self): # Revert configuration change to disallow unsinged responses self.client.want_response_signed = True self.server.config.setattr("idp", "want_authn_requests_signed", True) reqid, req = self.client.create_authn_request("http://localhost:8088/sso", message_id="id1") info = self.client.apply_binding( BINDING_HTTP_REDIRECT, str(req), destination="", relay_state="relay2", sign=True, sigalg=SIG_RSA_SHA256, ) loc = info["headers"][0][1] qs = list_values2simpletons(parse.parse_qs(loc[1:])) res = self.server.parse_authn_request( qs["SAMLRequest"], BINDING_HTTP_REDIRECT, relay_state=qs["RelayState"], sigalg=qs["SigAlg"], signature=qs["Signature"], ) assert res.message.destination == "http://localhost:8088/sso" assert res.message.id == "id1" def test_signed_redirect_fail_if_needs_signed_request_but_received_unsigned(self): # Revert configuration change to disallow unsinged responses self.client.want_response_signed = True self.server.config.setattr("idp", "want_authn_requests_signed", True) reqid, req = self.client.create_authn_request("http://localhost:8088/sso", message_id="id1") info = self.client.apply_binding( BINDING_HTTP_REDIRECT, str(req), destination="", relay_state="relay2", sign=True, sigalg=SIG_RSA_SHA256, ) loc = info["headers"][0][1] qs = list_values2simpletons(parse.parse_qs(loc[1:])) with raises(IncorrectlySigned): self.server.parse_authn_request(qs["SAMLRequest"], BINDING_HTTP_REDIRECT) def test_signed_redirect_fail_if_needs_signed_request_but_sigalg_not_matches(self): # Revert configuration change to disallow unsinged responses self.client.want_response_signed = True self.server.config.setattr("idp", "want_authn_requests_signed", True) reqid, req = self.client.create_authn_request("http://localhost:8088/sso", message_id="id1") info = self.client.apply_binding( BINDING_HTTP_REDIRECT, str(req), destination="", relay_state="relay2", sign=True, sigalg=SIG_RSA_SHA256, ) loc = info["headers"][0][1] qs = list_values2simpletons(parse.parse_qs(loc[1:])) with raises(IncorrectlySigned): self.server.parse_authn_request( qs["SAMLRequest"], BINDING_HTTP_REDIRECT, relay_state=qs["RelayState"], sigalg=SIG_RSA_SHA1, signature=qs["Signature"], ) def test_do_logout_signed_redirect(self): conf = config.SPConfig() conf.load_file("sp_slo_redirect_conf") client = Saml2Client(conf) # information about the user from an IdP session_info = { "name_id": nid, "issuer": "urn:mace:example.com:saml:roland:idp", "not_on_or_after": in_a_while(minutes=15), "ava": {"givenName": "Anders", "sn": "Andersson", "mail": "anders.andersson@example.com"}, } client.users.add_information_about_person(session_info) entity_ids = client.users.issuers_of_info(nid) assert entity_ids == ["urn:mace:example.com:saml:roland:idp"] resp = client.do_logout( nid, entity_ids, "Tired", in_a_while(minutes=5), sign=True, expected_binding=BINDING_HTTP_REDIRECT ) assert list(resp.keys()) == entity_ids binding, info = resp[entity_ids[0]] assert binding == BINDING_HTTP_REDIRECT loc = info["headers"][0][1] _, _, _, _, qs, _ = parse.urlparse(loc) qs = parse.parse_qs(qs) assert _leq(qs.keys(), ["SigAlg", "SAMLRequest", "RelayState", "Signature"]) qs_simple = list_values2simpletons(qs) assert verify_redirect_signature(qs_simple, client.sec.sec_backend) res = self.server.parse_logout_request( qs_simple["SAMLRequest"], BINDING_HTTP_REDIRECT, relay_state=qs_simple["RelayState"], sigalg=qs_simple["SigAlg"], signature=qs_simple["Signature"], ) def test_do_logout_signed_redirect_invalid(self): conf = config.SPConfig() conf.load_file("sp_slo_redirect_conf") client = Saml2Client(conf) session_info = { "name_id": nid, "issuer": "urn:mace:example.com:saml:roland:idp", "not_on_or_after": in_a_while(minutes=15), "ava": {"givenName": "Anders", "sn": "Andersson", "mail": "anders.andersson@example.com"}, } client.users.add_information_about_person(session_info) entity_ids = client.users.issuers_of_info(nid) resp = client.do_logout( nid, entity_ids, "Tired", in_a_while(minutes=5), sign=True, expected_binding=BINDING_HTTP_REDIRECT, ) binding, info = resp[entity_ids[0]] loc = info["headers"][0][1] _, _, _, _, qs, _ = parse.urlparse(loc) qs = parse.parse_qs(qs) qs_simple = list_values2simpletons(qs) invalid_signature = "ZEdMZUQ3SjBjQ2ozWmlGaHhyV3JZSzNkTWhQWU02bjA0dzVNeUd1UWgrVDhnYm1oc1R1TTFjPQo=" qs_simple_invalid = { **qs_simple, "Signature": invalid_signature, } assert not verify_redirect_signature(qs_simple_invalid, client.sec.sec_backend) self.server.config.setattr("idp", "want_authn_requests_signed", True) with raises(IncorrectlySigned): res = self.server.parse_logout_request( qs_simple["SAMLRequest"], BINDING_HTTP_REDIRECT, relay_state=qs_simple["RelayState"], sigalg=qs_simple["SigAlg"], signature=invalid_signature, ) def test_do_logout_post(self): # information about the user from an IdP session_info = { "name_id": nid, "issuer": "urn:mace:example.com:saml:roland:idp", "not_on_or_after": in_a_while(minutes=15), "ava": {"givenName": "Anders", "sn": "Andersson", "mail": "anders.andersson@example.com"}, "session_index": SessionIndex("_foo"), } self.client.users.add_information_about_person(session_info) entity_ids = self.client.users.issuers_of_info(nid) assert entity_ids == ["urn:mace:example.com:saml:roland:idp"] resp = self.client.do_logout( nid, entity_ids, "Tired", in_a_while(minutes=5), sign=True, expected_binding=BINDING_HTTP_POST ) assert resp assert len(resp) == 1 assert list(resp.keys()) == entity_ids binding, info = resp[entity_ids[0]] assert binding == BINDING_HTTP_POST _dic = unpack_form(info["data"]) res = self.server.parse_logout_request(_dic["SAMLRequest"], BINDING_HTTP_POST) assert b"_foo" in res.xmlstr def test_do_logout_redirect_no_cache(self): conf = config.SPConfig() conf.load_file("sp_slo_redirect_conf") client = Saml2Client(conf) entity_ids = ["urn:mace:example.com:saml:roland:idp"] resp = client.do_logout( nid, entity_ids, "urn:oasis:names:tc:SAML:2.0:logout:user", in_a_while(minutes=5), expected_binding=BINDING_HTTP_REDIRECT, ) assert resp assert len(resp) == 1 assert list(resp.keys()) == entity_ids binding, info = resp[entity_ids[0]] assert binding == BINDING_HTTP_REDIRECT loc = info["headers"][0][1] _, _, _, _, qs, _ = parse.urlparse(loc) qs = parse.parse_qs(qs) assert _leq(qs.keys(), ["SAMLRequest", "RelayState"]) res = self.server.parse_logout_request(qs["SAMLRequest"][0], BINDING_HTTP_REDIRECT) assert res.subject_id() == nid def test_do_logout_session_expired(self): # information about the user from an IdP session_info = { "name_id": nid, "issuer": "urn:mace:example.com:saml:roland:idp", "not_on_or_after": a_while_ago(minutes=15), "ava": {"givenName": "Anders", "sn": "Andersson", "mail": "anders.andersson@example.com"}, "session_index": SessionIndex("_foo"), } self.client.users.add_information_about_person(session_info) entity_ids = self.client.users.issuers_of_info(nid) assert entity_ids == ["urn:mace:example.com:saml:roland:idp"] resp = self.client.do_logout( nid, entity_ids, "Tired", in_a_while(minutes=5), sign=True, expected_binding=BINDING_HTTP_POST ) assert resp assert len(resp) == 1 assert list(resp.keys()) == entity_ids binding, info = resp[entity_ids[0]] assert binding == BINDING_HTTP_POST _dic = unpack_form(info["data"]) res = self.server.parse_logout_request(_dic["SAMLRequest"], BINDING_HTTP_POST) assert b"_foo" in res.xmlstr def test_signature_wants(self): ava = {"givenName": ["Derek"], "sn": ["Jeter"], "mail": ["derek@nyy.mlb.com"], "title": ["The man"]} nameid_policy = samlp.NameIDPolicy(allow_create="false", format=saml.NAMEID_FORMAT_PERSISTENT) kwargs = { "identity": ava, "in_response_to": "id1", "destination": "http://lingon.catalogix.se:8087/", "sp_entity_id": "urn:mace:example.com:saml:roland:sp", "name_id_policy": nameid_policy, "userid": "foba0001@example.com", "authn": AUTHN, } outstanding = {"id1": "http://foo.example.com/service"} def create_authn_response(**kwargs): return b64encode(str(self.server.create_authn_response(**kwargs)).encode()) def parse_authn_response(response): self.client.parse_authn_request_response(response, BINDING_HTTP_POST, outstanding) def set_client_want(response, assertion, either): self.client.want_response_signed = response self.client.want_assertions_signed = assertion self.client.want_assertions_or_response_signed = either # Response is signed but assertion is not. kwargs["sign_response"] = True kwargs["sign_assertion"] = False response = create_authn_response(**kwargs) set_client_want(True, True, True) with raises(SignatureError): parse_authn_response(response) set_client_want(True, True, False) with raises(SignatureError): parse_authn_response(response) set_client_want(True, False, True) parse_authn_response(response) set_client_want(True, False, False) parse_authn_response(response) set_client_want(False, True, True) with raises(SignatureError): parse_authn_response(response) set_client_want(False, True, False) with raises(SignatureError): parse_authn_response(response) set_client_want(False, False, True) parse_authn_response(response) set_client_want(False, False, False) parse_authn_response(response) # Response is not signed but assertion is signed. kwargs["sign_response"] = False kwargs["sign_assertion"] = True response = create_authn_response(**kwargs) set_client_want(True, True, True) with raises(SignatureError): parse_authn_response(response) set_client_want(True, True, False) with raises(SignatureError): parse_authn_response(response) set_client_want(True, False, True) with raises(SignatureError): parse_authn_response(response) set_client_want(True, False, False) with raises(SignatureError): parse_authn_response(response) set_client_want(False, True, True) parse_authn_response(response) set_client_want(False, True, False) parse_authn_response(response) set_client_want(False, False, True) parse_authn_response(response) set_client_want(False, False, False) parse_authn_response(response) # Both response and assertion are signed. kwargs["sign_response"] = True kwargs["sign_assertion"] = True response = create_authn_response(**kwargs) set_client_want(True, True, True) parse_authn_response(response) set_client_want(True, True, False) parse_authn_response(response) set_client_want(True, False, True) parse_authn_response(response) set_client_want(True, False, False) parse_authn_response(response) set_client_want(False, True, True) parse_authn_response(response) set_client_want(False, True, False) parse_authn_response(response) set_client_want(False, False, True) parse_authn_response(response) set_client_want(False, False, False) parse_authn_response(response) # Neither response nor assertion is signed. kwargs["sign_response"] = False kwargs["sign_assertion"] = False response = create_authn_response(**kwargs) set_client_want(True, True, True) with raises(SignatureError): parse_authn_response(response) set_client_want(True, True, False) with raises(SignatureError): parse_authn_response(response) set_client_want(True, False, True) with raises(SignatureError): parse_authn_response(response) set_client_want(True, False, False) with raises(SignatureError): parse_authn_response(response) set_client_want(False, True, True) with raises(SignatureError): parse_authn_response(response) set_client_want(False, True, False) with raises(SignatureError): parse_authn_response(response) set_client_want(False, False, True) with raises(SigverError): parse_authn_response(response) set_client_want(False, False, False) parse_authn_response(response) class TestClientNonAsciiAva: def setup_class(self): self.server = Server("idp_conf") conf = config.SPConfig() conf.load_file("server_conf") self.client = Saml2Client(conf) def teardown_class(self): self.server.close() def test_create_attribute_query1(self): req_id, req = self.client.create_attribute_query( "https://idp.example.com/idp/", "E8042FB4-4D5B-48C3-8E14-8EDD852790DD", format=saml.NAMEID_FORMAT_PERSISTENT, message_id="id1", ) reqstr = f"{req.to_string().decode()}" assert req.destination == "https://idp.example.com/idp/" assert req.id == "id1" assert req.version == "2.0" subject = req.subject name_id = subject.name_id assert name_id.format == saml.NAMEID_FORMAT_PERSISTENT assert name_id.text == "E8042FB4-4D5B-48C3-8E14-8EDD852790DD" issuer = req.issuer assert issuer.text == "urn:mace:example.com:saml:roland:sp" attrq = samlp.attribute_query_from_string(reqstr) assert _leq(attrq.keyswv(), ["destination", "subject", "issue_instant", "version", "id", "issuer"]) assert attrq.destination == req.destination assert attrq.id == req.id assert attrq.version == req.version assert attrq.issuer.text == issuer.text assert attrq.issue_instant == req.issue_instant assert attrq.subject.name_id.format == name_id.format assert attrq.subject.name_id.text == name_id.text def test_create_attribute_query2(self): req_id, req = self.client.create_attribute_query( "https://idp.example.com/idp/", "E8042FB4-4D5B-48C3-8E14-8EDD852790DD", attribute={ ("urn:oid:2.5.4.42", "urn:oasis:names:tc:SAML:2.0:attrname-format:uri", "givenName"): None, ("urn:oid:2.5.4.4", "urn:oasis:names:tc:SAML:2.0:attrname-format:uri", "surname"): None, ("urn:oid:1.2.840.113549.1.9.1", "urn:oasis:names:tc:SAML:2.0:attrname-format:uri"): None, }, format=saml.NAMEID_FORMAT_PERSISTENT, message_id="id1", ) assert req.destination == "https://idp.example.com/idp/" assert req.id == "id1" assert req.version == "2.0" subject = req.subject name_id = subject.name_id assert name_id.format == saml.NAMEID_FORMAT_PERSISTENT assert name_id.text == "E8042FB4-4D5B-48C3-8E14-8EDD852790DD" assert len(req.attribute) == 3 # one is givenName seen = [] for attribute in req.attribute: if attribute.name == "urn:oid:2.5.4.42": assert attribute.name_format == saml.NAME_FORMAT_URI assert attribute.friendly_name == "givenName" seen.append("givenName") elif attribute.name == "urn:oid:2.5.4.4": assert attribute.name_format == saml.NAME_FORMAT_URI assert attribute.friendly_name == "surname" seen.append("surname") elif attribute.name == "urn:oid:1.2.840.113549.1.9.1": assert attribute.name_format == saml.NAME_FORMAT_URI if getattr(attribute, "friendly_name"): assert False seen.append("email") assert _leq(seen, ["givenName", "surname", "email"]) def test_create_attribute_query_3(self): req_id, req = self.client.create_attribute_query( "https://aai-demo-idp.switch.ch/idp/shibboleth", "_e7b68a04488f715cda642fbdd90099f5", format=NAMEID_FORMAT_TRANSIENT, message_id="id1", ) assert isinstance(req, samlp.AttributeQuery) assert req.destination == "https://aai-demo-idp.switch" ".ch/idp/shibboleth" assert req.id == "id1" assert req.version == "2.0" assert req.issue_instant assert req.issuer.text == "urn:mace:example.com:saml:roland:sp" nameid = req.subject.name_id assert nameid.format == NAMEID_FORMAT_TRANSIENT assert nameid.text == "_e7b68a04488f715cda642fbdd90099f5" def test_create_auth_request_0(self): ar_str = ( "%s" % self.client.create_authn_request( "http://www.example.com/sso", message_id="id1", nameid_format=NAMEID_FORMAT_TRANSIENT, )[1] ) ar = samlp.authn_request_from_string(ar_str) assert ar.assertion_consumer_service_url == ("http://lingon.catalogix" ".se:8087/") assert ar.destination == "http://www.example.com/sso" assert ar.protocol_binding == BINDING_HTTP_POST assert ar.version == "2.0" assert ar.provider_name == "urn:mace:example.com:saml:roland:sp" assert ar.issuer.text == "urn:mace:example.com:saml:roland:sp" nid_policy = ar.name_id_policy assert nid_policy.allow_create is None assert nid_policy.format == NAMEID_FORMAT_TRANSIENT node_requested_attributes = None for e in ar.extensions.extension_elements: if e.tag == RequestedAttributes.c_tag: node_requested_attributes = e break assert node_requested_attributes is not None for c in node_requested_attributes.children: assert c.tag == RequestedAttribute.c_tag assert c.attributes["isRequired"] in ["true", "false"] assert c.attributes["Name"] assert c.attributes["FriendlyName"] assert c.attributes["NameFormat"] def test_create_auth_request_unset_force_authn(self): req_id, req = self.client.create_authn_request("http://www.example.com/sso", sign=False, message_id="id1") assert bool(req.force_authn) == False def test_create_auth_request_set_force_authn(self): req_id, req = self.client.create_authn_request( "http://www.example.com/sso", sign=False, message_id="id1", force_authn="true" ) assert bool(req.force_authn) == True def test_create_auth_request_nameid_policy_allow_create(self): conf = config.SPConfig() conf.load_file("sp_conf_nameidpolicy") client = Saml2Client(conf) ar_str = f"{client.create_authn_request('http://www.example.com/sso', message_id='id1')[1]}" ar = samlp.authn_request_from_string(ar_str) assert ar.assertion_consumer_service_url == ("http://lingon.catalogix" ".se:8087/") assert ar.destination == "http://www.example.com/sso" assert ar.protocol_binding == BINDING_HTTP_POST assert ar.version == "2.0" assert ar.provider_name == "urn:mace:example.com:saml:roland:sp" assert ar.issuer.text == "urn:mace:example.com:saml:roland:sp" nid_policy = ar.name_id_policy assert nid_policy.allow_create == "true" assert nid_policy.format == saml.NAMEID_FORMAT_PERSISTENT def test_create_auth_request_vo(self): assert list(self.client.config.vorg.keys()) == ["urn:mace:example.com:it:tek"] ar_str = ( "%s" % self.client.create_authn_request( "http://www.example.com/sso", "urn:mace:example.com:it:tek", # vo nameid_format=NAMEID_FORMAT_PERSISTENT, message_id="666", )[1] ) ar = samlp.authn_request_from_string(ar_str) assert ar.id == "666" assert ar.assertion_consumer_service_url == "http://lingon.catalogix" ".se:8087/" assert ar.destination == "http://www.example.com/sso" assert ar.protocol_binding == BINDING_HTTP_POST assert ar.version == "2.0" assert ar.provider_name == "urn:mace:example.com:saml:roland:sp" assert ar.issuer.text == "urn:mace:example.com:saml:roland:sp" nid_policy = ar.name_id_policy assert nid_policy.allow_create == "false" assert nid_policy.format == saml.NAMEID_FORMAT_PERSISTENT assert nid_policy.sp_name_qualifier == "urn:mace:example.com:it:tek" def test_sign_auth_request_0(self): req_id, areq = self.client.create_authn_request("http://www.example.com/sso", sign=True, message_id="id1") ar_str = f"{areq}" ar = samlp.authn_request_from_string(ar_str) assert ar assert ar.signature assert ar.signature.signature_value signed_info = ar.signature.signed_info assert len(signed_info.reference) == 1 assert signed_info.reference[0].uri == "#id1" assert signed_info.reference[0].digest_value try: assert self.client.sec.correctly_signed_authn_request( ar_str, self.client.config.xmlsec_binary, self.client.config.metadata ) except Exception: # missing certificate self.client.sec.verify_signature(ar_str, node_name=class_name(ar)) def test_create_logout_request(self): req_id, req = self.client.create_logout_request( "http://localhost:8088/slo", "urn:mace:example.com:saml:roland:idp", name_id=nid, reason="Tired", expire=in_a_while(minutes=15), session_indexes=["_foo"], ) assert req.destination == "http://localhost:8088/slo" assert req.reason == "Tired" assert req.version == "2.0" assert req.name_id == nid assert req.issuer.text == "urn:mace:example.com:saml:roland:sp" assert req.session_index == [SessionIndex("_foo")] def test_response_1(self): IDP = "urn:mace:example.com:saml:roland:idp" ava = {"givenName": ["Dave"], "sn": ["Concepción"], "mail": ["Dave@cnr.mlb.com"], "title": ["#13"]} nameid_policy = samlp.NameIDPolicy(allow_create="false", format=saml.NAMEID_FORMAT_PERSISTENT) resp = self.server.create_authn_response( identity=ava, in_response_to="id1", destination="http://lingon.catalogix.se:8087/", sp_entity_id="urn:mace:example.com:saml:roland:sp", name_id_policy=nameid_policy, sign_response=True, userid="foba0001@example.com", authn=AUTHN, ) resp_str = f"{resp}" resp_str = b64encode(resp_str.encode("utf-8")) authn_response = self.client.parse_authn_request_response( resp_str, BINDING_HTTP_POST, {"id1": "http://foo.example.com/service"} ) assert authn_response is not None assert authn_response.issuer() == IDP assert authn_response.response.assertion[0].issuer.text == IDP session_info = authn_response.session_info() assert session_info["ava"] == { "givenName": ["Dave"], "sn": ["Concepción"], "mail": ["Dave@cnr.mlb.com"], "title": ["#13"], } assert session_info["issuer"] == IDP assert session_info["came_from"] == "http://foo.example.com/service" response = samlp.response_from_string(authn_response.xmlstr) assert response.destination == "http://lingon.catalogix.se:8087/" assert "session_index" in session_info # One person in the cache assert len(self.client.users.subjects()) == 1 subject_id = self.client.users.subjects()[0] # The information I have about the subject comes from one source assert self.client.users.issuers_of_info(subject_id) == [IDP] # --- authenticate another person ava = {"givenName": ["Alfonson"], "sn": ["Soriano"], "mail": ["alfonson@chc.mlb.com"], "title": ["outfielder"]} resp_str = "%s" % self.server.create_authn_response( identity=ava, in_response_to="id2", destination="http://lingon.catalogix.se:8087/", sp_entity_id="urn:mace:example.com:saml:roland:sp", sign_response=True, name_id_policy=nameid_policy, userid="also0001@example.com", authn=AUTHN, ) resp_str = b64encode(resp_str.encode()) self.client.parse_authn_request_response(resp_str, BINDING_HTTP_POST, {"id2": "http://foo.example.com/service"}) # Two persons in the cache assert len(self.client.users.subjects()) == 2 issuers = [self.client.users.issuers_of_info(s) for s in self.client.users.subjects()] # The information I have about the subjects comes from the same source assert issuers == [[IDP], [IDP]] def test_response_2(self): conf = config.SPConfig() conf.load_file("server_conf") _client = Saml2Client(conf) idp, ava, ava_verify, nameid_policy = self.setup_verify_authn_response() cert_str, cert_key_str = generate_cert() cert = {"cert": cert_str, "key": cert_key_str} self.name_id = self.server.ident.transient_nameid("urn:mace:example.com:saml:roland:sp", "id1") resp = self.server.create_authn_response( identity=ava, in_response_to="id1", destination="http://lingon.catalogix.se:8087/", sp_entity_id="urn:mace:example.com:saml:roland:sp", name_id=self.name_id, userid="foba0001@example.com", authn=AUTHN, sign_response=True, sign_assertion=True, encrypt_assertion=False, encrypt_assertion_self_contained=True, pefim=True, encrypt_cert_advice=cert_str, ) resp_str = f"{resp}" resp_str = b64encode(resp_str.encode()) authn_response = _client.parse_authn_request_response( resp_str, BINDING_HTTP_POST, {"id1": "http://foo.example.com/service"}, {"id1": cert} ) self.verify_authn_response(idp, authn_response, _client, ava_verify) def test_response_3(self): conf = config.SPConfig() conf.load_file("server_conf") _client = Saml2Client(conf) idp, ava, ava_verify, nameid_policy = self.setup_verify_authn_response() self.name_id = self.server.ident.transient_nameid("urn:mace:example.com:saml:roland:sp", "id1") resp = self.server.create_authn_response( identity=ava, in_response_to="id1", destination="http://lingon.catalogix.se:8087/", sp_entity_id="urn:mace:example.com:saml:roland:sp", name_id=self.name_id, userid="foba0001@example.com", authn=AUTHN, sign_response=True, sign_assertion=True, encrypt_assertion=False, encrypt_assertion_self_contained=True, pefim=True, ) resp_str = f"{resp}" resp_str = b64encode(resp_str.encode()) authn_response = _client.parse_authn_request_response( resp_str, BINDING_HTTP_POST, {"id1": "http://foo.example.com/service"} ) self.verify_authn_response(idp, authn_response, _client, ava_verify) def test_response_4(self): conf = config.SPConfig() conf.load_file("server_conf") _client = Saml2Client(conf) idp, ava, ava_verify, nameid_policy = self.setup_verify_authn_response() self.name_id = self.server.ident.transient_nameid("urn:mace:example.com:saml:roland:sp", "id1") resp = self.server.create_authn_response( identity=ava, in_response_to="id1", destination="http://lingon.catalogix.se:8087/", sp_entity_id="urn:mace:example.com:saml:roland:sp", name_id=self.name_id, userid="foba0001@example.com", authn=AUTHN, sign_response=True, sign_assertion=True, encrypt_assertion=True, encrypt_assertion_self_contained=True, pefim=True, ) resp_str = f"{resp}" resp_str = b64encode(resp_str.encode()) authn_response = _client.parse_authn_request_response( resp_str, BINDING_HTTP_POST, {"id1": "http://foo.example.com/service"} ) self.verify_authn_response(idp, authn_response, _client, ava_verify) def test_response_5(self): conf = config.SPConfig() conf.load_file("server_conf") _client = Saml2Client(conf) idp, ava, ava_verify, nameid_policy = self.setup_verify_authn_response() self.name_id = self.server.ident.transient_nameid("urn:mace:example.com:saml:roland:sp", "id1") cert_str, cert_key_str = generate_cert() cert = {"cert": cert_str, "key": cert_key_str} resp = self.server.create_authn_response( identity=ava, in_response_to="id1", destination="http://lingon.catalogix.se:8087/", sp_entity_id="urn:mace:example.com:saml:roland:sp", name_id=self.name_id, userid="foba0001@example.com", authn=AUTHN, sign_response=True, sign_assertion=True, encrypt_assertion=True, encrypt_assertion_self_contained=True, pefim=True, encrypt_cert_assertion=cert_str, ) resp_str = f"{resp}" resp_str = b64encode(resp_str.encode()) authn_response = _client.parse_authn_request_response( resp_str, BINDING_HTTP_POST, {"id1": "http://foo.example.com/service"}, {"id1": cert} ) self.verify_authn_response(idp, authn_response, _client, ava_verify) def test_response_6(self): conf = config.SPConfig() conf.load_file("server_conf") _client = Saml2Client(conf) idp, ava, ava_verify, nameid_policy = self.setup_verify_authn_response() self.name_id = self.server.ident.transient_nameid("urn:mace:example.com:saml:roland:sp", "id1") cert_assertion_str, cert_key_assertion_str = generate_cert() cert_assertion = {"cert": cert_assertion_str, "key": cert_key_assertion_str} cert_advice_str, cert_key_advice_str = generate_cert() cert_advice = {"cert": cert_advice_str, "key": cert_key_advice_str} resp = self.server.create_authn_response( identity=ava, in_response_to="id1", destination="http://lingon.catalogix.se:8087/", sp_entity_id="urn:mace:example.com:saml:roland:sp", name_id=self.name_id, userid="foba0001@example.com", authn=AUTHN, sign_response=True, sign_assertion=True, encrypt_assertion=True, encrypt_assertion_self_contained=True, pefim=True, encrypt_cert_assertion=cert_assertion_str, encrypt_cert_advice=cert_advice_str, ) resp_str = f"{resp}" resp_str = b64encode(resp_str.encode()) authn_response = _client.parse_authn_request_response( resp_str, BINDING_HTTP_POST, {"id1": "http://foo.example.com/service"}, {"id1": [cert_assertion, cert_advice]}, ) self.verify_authn_response(idp, authn_response, _client, ava_verify) def test_response_7(self): conf = config.SPConfig() conf.load_file("server_conf") _client = Saml2Client(conf) idp, ava, ava_verify, nameid_policy = self.setup_verify_authn_response() self.name_id = self.server.ident.transient_nameid("urn:mace:example.com:saml:roland:sp", "id1") resp = self.server.create_authn_response( identity=ava, in_response_to="id1", destination="http://lingon.catalogix.se:8087/", sp_entity_id="urn:mace:example.com:saml:roland:sp", name_id=self.name_id, userid="foba0001@example.com", authn=AUTHN, sign_response=True, sign_assertion=True, encrypt_assertion=True, encrypt_assertion_self_contained=True, encrypted_advice_attributes=True, ) resp_str = f"{resp}" resp_str = b64encode(resp_str.encode()) authn_response = _client.parse_authn_request_response( resp_str, BINDING_HTTP_POST, {"id1": "http://foo.example.com/service"} ) self.verify_authn_response(idp, authn_response, _client, ava_verify) def test_response_8(self): conf = config.SPConfig() conf.load_file("server_conf") _client = Saml2Client(conf) idp, ava, ava_verify, nameid_policy = self.setup_verify_authn_response() self.name_id = self.server.ident.transient_nameid("urn:mace:example.com:saml:roland:sp", "id1") cert_str, cert_key_str = generate_cert() cert = {"cert": cert_str, "key": cert_key_str} resp = self.server.create_authn_response( identity=ava, in_response_to="id1", destination="http://lingon.catalogix.se:8087/", sp_entity_id="urn:mace:example.com:saml:roland:sp", name_id=self.name_id, userid="foba0001@example.com", authn=AUTHN, sign_response=True, sign_assertion=True, encrypt_assertion=True, encrypt_assertion_self_contained=True, encrypt_cert_assertion=cert_str, ) resp_str = f"{resp}" resp_str = b64encode(resp_str.encode()) authn_response = _client.parse_authn_request_response( resp_str, BINDING_HTTP_POST, {"id1": "http://foo.example.com/service"}, {"id1": cert} ) self.verify_authn_response(idp, authn_response, _client, ava_verify) def test_response_no_name_id(self): """Test that the SP client can parse an authentication response from an IdP that does not contain a element.""" _bytes = bytes conf = config.SPConfig() conf.load_file("server_conf") client = Saml2Client(conf) # Use the same approach as the other tests for mocking up # an authentication response to parse. idp, ava, ava_verify, nameid_policy = self.setup_verify_authn_response() # Mock up an authentication response but do not encrypt it # nor sign it since below we will modify it directly. Note that # setting name_id to None still results in a response that includes # a element. resp = self.server.create_authn_response( identity=ava, in_response_to="id1", destination="http://lingon.catalogix.se:8087/", sp_entity_id="urn:mace:example.com:saml:roland:sp", name_id=None, userid="foba0001@example.com", authn=AUTHN, sign_response=False, sign_assertion=False, encrypt_assertion=False, encrypt_assertion_self_contained=False, ) # The create_authn_response method above will return an instance # of saml2.samlp.Response when neither encrypting nor signing and # so we can remove the element directly. resp.assertion.subject.name_id = None # Assert that the response does not contain a NameID element so that # the parsing below is a fair test. assert str(resp).find("NameID") == -1 # Cast the response to a string and encode it to mock up the payload # the SP client is expected to receive via HTTP POST binding. resp_str = b64encode(bytes(str(resp), "utf-8")) # We do not need the client to verify a signature for this test. client.want_assertions_signed = False client.want_response_signed = False # Parse the authentication response that does not include a . authn_response = client.parse_authn_request_response( resp_str, BINDING_HTTP_POST, {"id1": "http://foo.example.com/service"} ) # A successful test is parsing the response. assert authn_response is not None def test_response_error_status(self): """Test that the SP client can parse an authentication response from an IdP that contains an error status.""" conf = config.SPConfig() conf.load_file("server_conf") client = Saml2Client(conf) resp = self.server.create_error_response( in_response_to="id1", destination="http://lingon.catalogix.se:8087/", info=(samlp.STATUS_INVALID_NAMEID_POLICY, None), ) # Cast the response to a string and encode it to mock up the payload # the SP client is expected to receive via HTTP POST binding. resp_str = b64encode(bytes(str(resp), "utf-8")) # We do not need the client to verify a signature for this test. client.want_assertions_signed = False client.want_response_signed = False # Parse the authentication error response with raises(StatusInvalidNameidPolicy): client.parse_authn_request_response(resp_str, BINDING_HTTP_POST, {"id1": "http://foo.example.com/service"}) def test_response_error_status_non_standard_status_code(self): """Test that the SP client can parse an authentication response from an IdP that contains an error status.""" conf = config.SPConfig() conf.load_file("server_conf") client = Saml2Client(conf) resp = self.server.create_error_response( in_response_to="id1", destination="http://lingon.catalogix.se:8087/", info=("http://example.com/status/1.0/cancel", None), ) # Cast the response to a string and encode it to mock up the payload # the SP client is expected to receive via HTTP POST binding. resp_str = b64encode(bytes(str(resp), "utf-8")) # We do not need the client to verify a signature for this test. client.want_assertions_signed = False client.want_response_signed = False # Parse the authentication error response with raises(StatusError): client.parse_authn_request_response(resp_str, BINDING_HTTP_POST, {"id1": "http://foo.example.com/service"}) def setup_verify_authn_response(self): idp = "urn:mace:example.com:saml:roland:idp" ava = {"givenName": ["Dave"], "sn": ["Concepción"], "mail": ["Dave@cnr.mlb.com"], "title": ["#13"]} ava_verify = {"givenName": ["Dave"], "sn": ["Concepción"], "mail": ["Dave@cnr.mlb.com"], "title": ["#13"]} nameid_policy = samlp.NameIDPolicy(allow_create="false", format=saml.NAMEID_FORMAT_PERSISTENT) return idp, ava, ava_verify, nameid_policy def verify_authn_response(self, idp, authn_response, _client, ava_verify): assert authn_response is not None assert authn_response.issuer() == idp assert authn_response.assertion.issuer.text == idp session_info = authn_response.session_info() assert session_info["ava"] == ava_verify assert session_info["issuer"] == idp assert session_info["came_from"] == "http://foo.example.com/service" response = samlp.response_from_string(authn_response.xmlstr) assert response.destination == "http://lingon.catalogix.se:8087/" # One person in the cache assert len(_client.users.subjects()) == 1 subject_id = _client.users.subjects()[0] # The information I have about the subject comes from one source assert _client.users.issuers_of_info(subject_id) == [idp] def test_init_values(self): entityid = self.client.config.entityid assert entityid == "urn:mace:example.com:saml:roland:sp" location = self.client._sso_location() assert location == "http://localhost:8088/sso" my_name = self.client._my_name() assert my_name == "urn:mace:example.com:saml:roland:sp" def test_sign_then_encrypt_assertion(self): # Begin with the IdPs side _sec = self.server.sec assertion = s_utils.assertion_factory( subject=factory(saml.Subject, text="_aaa", name_id=factory(saml.NameID, format=NAMEID_FORMAT_TRANSIENT)), attribute_statement=do_attribute_statement( { ("", "", "sn"): ("Jeter", ""), ("", "", "givenName"): ("Derek", ""), } ), issuer=self.server._issuer(), ) assertion.signature = sigver.pre_signature_part(assertion.id, _sec.my_cert, 1) sigass = _sec.sign_statement( assertion, class_name(assertion), key_file=full_path("test.key"), node_id=assertion.id ) # Create an Assertion instance from the signed assertion _ass = saml.assertion_from_string(sigass) response = response_factory( in_response_to="_012345", destination="https:#www.example.com", status=s_utils.success_status_factory(), issuer=self.server._issuer(), assertion=_ass, ) enctext = _sec.crypto.encrypt_assertion( response, self.client.sec.encryption_keypairs[0]["cert_file"], pre_encryption_part() ) seresp = samlp.response_from_string(enctext) # Now over to the client side _csec = self.client.sec if seresp.encrypted_assertion: decr_text = _csec.decrypt(enctext) seresp = samlp.response_from_string(decr_text) resp_ass = [] sign_cert_file = full_path("test.pem") for enc_ass in seresp.encrypted_assertion: assers = extension_elements_to_elements(enc_ass.extension_elements, [saml, samlp]) for ass in assers: if ass.signature: if not _csec.verify_signature(f"{ass}", sign_cert_file, node_name=class_name(ass)): continue resp_ass.append(ass) seresp.assertion = resp_ass seresp.encrypted_assertion = None assert seresp.assertion def test_sign_then_encrypt_assertion2(self): # Begin with the IdPs side _sec = self.server.sec nameid_policy = samlp.NameIDPolicy(allow_create="false", format=saml.NAMEID_FORMAT_PERSISTENT) asser = Assertion({"givenName": "Dave", "sn": "Concepción"}) farg = add_path({}, ["assertion", "subject", "subject_confirmation", "method", saml.SCM_BEARER]) add_path( farg["assertion"]["subject"]["subject_confirmation"], ["subject_confirmation_data", "in_response_to", "_012345"], ) add_path( farg["assertion"]["subject"]["subject_confirmation"], ["subject_confirmation_data", "recipient", "http://lingon.catalogix.se:8087/"], ) assertion = asser.construct( self.client.config.entityid, self.server.config.attribute_converters, self.server.config.getattr("policy", "idp"), name_id=factory(saml.NameID, format=NAMEID_FORMAT_TRANSIENT), issuer=self.server._issuer(), authn_class=INTERNETPROTOCOLPASSWORD, authn_auth="http://www.example.com/login", farg=farg["assertion"], ) assertion.signature = sigver.pre_signature_part(assertion.id, _sec.my_cert, 1) sigass = _sec.sign_statement( assertion, class_name(assertion), key_file=self.client.sec.key_file, node_id=assertion.id ) sigass = rm_xmltag(sigass) response = response_factory( in_response_to="_012345", destination="http://lingon.catalogix.se:8087/", status=s_utils.success_status_factory(), issuer=self.server._issuer(), encrypted_assertion=EncryptedAssertion(), ) xmldoc = f"{response}" # strangely enough I get different tags if I run this test separately # or as part of a bunch of tests. xmldoc = add_subelement(xmldoc, "EncryptedAssertion", sigass) enctext = _sec.crypto.encrypt_assertion( xmldoc, self.client.sec.encryption_keypairs[1]["cert_file"], pre_encryption_part() ) # seresp = samlp.response_from_string(enctext) resp_str = b64encode(enctext.encode()) # Now over to the client side # Explicitely allow unsigned responses for this and the following 2 tests self.client.want_response_signed = False resp = self.client.parse_authn_request_response( resp_str, BINDING_HTTP_POST, {"_012345": "http://foo.example.com/service"} ) # assert resp.encrypted_assertion == [] assert resp.assertion assert resp.ava == {"sn": ["Concepción"], "givenName": ["Dave"]} def test_sign_then_encrypt_assertion_advice_1(self): # Begin with the IdPs side _sec = self.server.sec nameid_policy = samlp.NameIDPolicy(allow_create="false", format=saml.NAMEID_FORMAT_PERSISTENT) asser = Assertion({"givenName": "Dave", "sn": "Concepción"}) subject_confirmation_specs = { "recipient": "http://lingon.catalogix.se:8087/", "in_response_to": "_012345", "subject_confirmation_method": saml.SCM_BEARER, } name_id = factory(saml.NameID, format=NAMEID_FORMAT_TRANSIENT) farg = add_path({}, ["assertion", "subject", "subject_confirmation", "method", saml.SCM_BEARER]) add_path( farg["assertion"]["subject"]["subject_confirmation"], ["subject_confirmation_data", "in_response_to", "_012345"], ) add_path( farg["assertion"]["subject"]["subject_confirmation"], ["subject_confirmation_data", "recipient", "http://lingon.catalogix.se:8087/"], ) assertion = asser.construct( self.client.config.entityid, self.server.config.attribute_converters, self.server.config.getattr("policy", "idp"), issuer=self.server._issuer(), name_id=name_id, authn_class=INTERNETPROTOCOLPASSWORD, authn_auth="http://www.example.com/login", farg=farg["assertion"], ) a_asser = Assertion({"uid": "test01", "email": "test.testsson@test.se"}) a_assertion = a_asser.construct( self.client.config.entityid, self.server.config.attribute_converters, self.server.config.getattr("policy", "idp"), issuer=self.server._issuer(), authn_class=INTERNETPROTOCOLPASSWORD, authn_auth="http://www.example.com/login", name_id=name_id, farg=farg["assertion"], ) a_assertion.signature = sigver.pre_signature_part(a_assertion.id, _sec.my_cert, 1) assertion.advice = Advice() assertion.advice.encrypted_assertion = [] assertion.advice.encrypted_assertion.append(EncryptedAssertion()) assertion.advice.encrypted_assertion[0].add_extension_element(a_assertion) response = response_factory( in_response_to="_012345", destination="http://lingon.catalogix.se:8087/", status=s_utils.success_status_factory(), issuer=self.server._issuer(), ) response.assertion.append(assertion) response = _sec.sign_statement( f"{response}", class_name(a_assertion), key_file=self.client.sec.key_file, node_id=a_assertion.id ) # xmldoc = "%s" % response # strangely enough I get different tags if I run this test separately # or as part of a bunch of tests. # xmldoc = add_subelement(xmldoc, "EncryptedAssertion", sigass) node_xpath = "".join( [f'/*[local-name()="{v}"]' for v in ["Response", "Assertion", "Advice", "EncryptedAssertion", "Assertion"]] ) enctext = _sec.crypto.encrypt_assertion( response, self.client.sec.encryption_keypairs[0]["cert_file"], pre_encryption_part(), node_xpath=node_xpath ) # seresp = samlp.response_from_string(enctext) resp_str = b64encode(bytes(enctext, "utf-8")) # Now over to the client side resp = self.client.parse_authn_request_response( resp_str, BINDING_HTTP_POST, {"_012345": "http://foo.example.com/service"} ) # assert resp.encrypted_assertion == [] assert resp.assertion assert resp.assertion.advice assert resp.assertion.advice.assertion assert resp.ava == { "givenName": ["Dave"], "sn": ["Concepción"], "uid": ["test01"], "email": ["test.testsson@test.se"], } def test_sign_then_encrypt_assertion_advice_2(self): # Begin with the IdPs side _sec = self.server.sec asser_1 = Assertion({"givenName": "Dave"}) farg = add_path({}, ["assertion", "subject", "subject_confirmation", "method", saml.SCM_BEARER]) add_path( farg["assertion"]["subject"]["subject_confirmation"], ["subject_confirmation_data", "in_response_to", "_012345"], ) add_path( farg["assertion"]["subject"]["subject_confirmation"], ["subject_confirmation_data", "recipient", "http://lingon.catalogix.se:8087/"], ) name_id = factory(saml.NameID, format=NAMEID_FORMAT_TRANSIENT) assertion_1 = asser_1.construct( self.client.config.entityid, self.server.config.attribute_converters, self.server.config.getattr("policy", "idp"), issuer=self.server._issuer(), authn_class=INTERNETPROTOCOLPASSWORD, authn_auth="http://www.example.com/login", name_id=name_id, farg=farg["assertion"], ) asser_2 = Assertion({"sn": "Concepción"}) assertion_2 = asser_2.construct( self.client.config.entityid, self.server.config.attribute_converters, self.server.config.getattr("policy", "idp"), issuer=self.server._issuer(), authn_class=INTERNETPROTOCOLPASSWORD, authn_auth="http://www.example.com/login", name_id=name_id, farg=farg["assertion"], ) a_asser_1 = Assertion({"uid": "test01"}) a_assertion_1 = a_asser_1.construct( self.client.config.entityid, self.server.config.attribute_converters, self.server.config.getattr("policy", "idp"), issuer=self.server._issuer(), authn_class=INTERNETPROTOCOLPASSWORD, authn_auth="http://www.example.com/login", name_id=name_id, farg=farg["assertion"], ) a_asser_2 = Assertion({"email": "test.testsson@test.se"}) a_assertion_2 = a_asser_2.construct( self.client.config.entityid, self.server.config.attribute_converters, self.server.config.getattr("policy", "idp"), issuer=self.server._issuer(), authn_class=INTERNETPROTOCOLPASSWORD, authn_auth="http://www.example.com/login", name_id=name_id, farg=farg["assertion"], ) a_asser_3 = Assertion({"street": "street"}) a_assertion_3 = a_asser_3.construct( self.client.config.entityid, self.server.config.attribute_converters, self.server.config.getattr("policy", "idp"), issuer=self.server._issuer(), authn_class=INTERNETPROTOCOLPASSWORD, authn_auth="http://www.example.com/login", name_id=name_id, farg=farg["assertion"], ) a_asser_4 = Assertion({"title": "title"}) a_assertion_4 = a_asser_4.construct( self.client.config.entityid, self.server.config.attribute_converters, self.server.config.getattr("policy", "idp"), issuer=self.server._issuer(), authn_class=INTERNETPROTOCOLPASSWORD, authn_auth="http://www.example.com/login", name_id=name_id, farg=farg["assertion"], ) a_assertion_1.signature = sigver.pre_signature_part(a_assertion_1.id, _sec.my_cert, 1) a_assertion_2.signature = sigver.pre_signature_part(a_assertion_2.id, _sec.my_cert, 1) a_assertion_3.signature = sigver.pre_signature_part(a_assertion_3.id, _sec.my_cert, 1) a_assertion_4.signature = sigver.pre_signature_part(a_assertion_4.id, _sec.my_cert, 1) assertion_1.signature = sigver.pre_signature_part(assertion_1.id, _sec.my_cert, 1) assertion_2.signature = sigver.pre_signature_part(assertion_2.id, _sec.my_cert, 1) response = response_factory( in_response_to="_012345", destination="http://lingon.catalogix.se:8087/", status=s_utils.success_status_factory(), issuer=self.server._issuer(), ) response.assertion = assertion_1 response.assertion.advice = Advice() response.assertion.advice.encrypted_assertion = [] response.assertion.advice.encrypted_assertion.append(EncryptedAssertion()) response.assertion.advice.encrypted_assertion[0].add_extension_element(a_assertion_1) advice_tag = response.assertion.advice._to_element_tree().tag assertion_tag = a_assertion_1._to_element_tree().tag response = response.get_xml_string_with_self_contained_assertion_within_advice_encrypted_assertion( assertion_tag, advice_tag ) response = _sec.sign_statement( f"{response}", class_name(a_assertion_1), key_file=self.server.sec.key_file, node_id=a_assertion_1.id ) node_xpath = "".join( [f'/*[local-name()="{v}"]' for v in ["Response", "Assertion", "Advice", "EncryptedAssertion", "Assertion"]] ) enctext = _sec.crypto.encrypt_assertion( response, self.client.sec.encryption_keypairs[1]["cert_file"], pre_encryption_part(), node_xpath=node_xpath ) response = samlp.response_from_string(enctext) response.assertion = response.assertion[0] response.assertion.advice.encrypted_assertion.append(EncryptedAssertion()) response.assertion.advice.encrypted_assertion[1].add_extension_element(a_assertion_2) advice_tag = response.assertion.advice._to_element_tree().tag assertion_tag = a_assertion_2._to_element_tree().tag response = response.get_xml_string_with_self_contained_assertion_within_advice_encrypted_assertion( assertion_tag, advice_tag ) response = _sec.sign_statement( f"{response}", class_name(a_assertion_2), key_file=self.server.sec.key_file, node_id=a_assertion_2.id ) node_xpath = "".join( [f'/*[local-name()="{v}"]' for v in ["Response", "Assertion", "Advice", "EncryptedAssertion", "Assertion"]] ) enctext = _sec.crypto.encrypt_assertion( response, self.client.sec.encryption_keypairs[0]["cert_file"], pre_encryption_part(), node_xpath=node_xpath ) response = samlp.response_from_string(enctext) response.assertion = response.assertion[0] assertion_tag = response.assertion._to_element_tree().tag response = pre_encrypt_assertion(response) response = response.get_xml_string_with_self_contained_assertion_within_encrypted_assertion(assertion_tag) response = _sec.sign_statement( f"{response}", class_name(assertion_1), key_file=self.server.sec.key_file, node_id=assertion_1.id ) enctext = _sec.crypto.encrypt_assertion( response, self.client.sec.encryption_keypairs[1]["cert_file"], pre_encryption_part() ) response = samlp.response_from_string(enctext) response.assertion = assertion_2 response.assertion.advice = Advice() response.assertion.advice.encrypted_assertion = [] response.assertion.advice.encrypted_assertion.append(EncryptedAssertion()) response.assertion.advice.encrypted_assertion[0].add_extension_element(a_assertion_3) advice_tag = response.assertion.advice._to_element_tree().tag assertion_tag = a_assertion_3._to_element_tree().tag response = response.get_xml_string_with_self_contained_assertion_within_advice_encrypted_assertion( assertion_tag, advice_tag ) response = _sec.sign_statement( f"{response}", class_name(a_assertion_3), key_file=self.server.sec.key_file, node_id=a_assertion_3.id ) node_xpath = "".join( [f'/*[local-name()="{v}"]' for v in ["Response", "Assertion", "Advice", "EncryptedAssertion", "Assertion"]] ) enctext = _sec.crypto.encrypt_assertion( response, self.client.sec.encryption_keypairs[0]["cert_file"], pre_encryption_part(), node_xpath=node_xpath ) response = samlp.response_from_string(enctext) response.assertion = response.assertion[0] response.assertion.advice.encrypted_assertion.append(EncryptedAssertion()) response.assertion.advice.encrypted_assertion[1].add_extension_element(a_assertion_4) advice_tag = response.assertion.advice._to_element_tree().tag assertion_tag = a_assertion_4._to_element_tree().tag response = response.get_xml_string_with_self_contained_assertion_within_advice_encrypted_assertion( assertion_tag, advice_tag ) response = _sec.sign_statement( f"{response}", class_name(a_assertion_4), key_file=self.server.sec.key_file, node_id=a_assertion_4.id ) node_xpath = "".join( [f'/*[local-name()="{v}"]' for v in ["Response", "Assertion", "Advice", "EncryptedAssertion", "Assertion"]] ) enctext = _sec.crypto.encrypt_assertion( response, self.client.sec.encryption_keypairs[1]["cert_file"], pre_encryption_part(), node_xpath=node_xpath ) response = samlp.response_from_string(enctext) response = _sec.sign_statement( f"{response}", class_name(response.assertion[0]), key_file=self.server.sec.key_file, node_id=response.assertion[0].id, ) response = samlp.response_from_string(response) # seresp = samlp.response_from_string(enctext) resp_str = b64encode(response.to_string()) # Now over to the client side resp = self.client.parse_authn_request_response( resp_str, BINDING_HTTP_POST, {"_012345": "http://foo.example.com/service"} ) # assert resp.encrypted_assertion == [] assert resp.assertion assert resp.assertion.advice assert resp.assertion.advice.assertion assert resp.ava == { "street": ["street"], "uid": ["test01"], "title": ["title"], "givenName": ["Dave"], "email": ["test.testsson@test.se"], "sn": ["Concepción"], } def test_signed_redirect(self): # Revert configuration change to disallow unsinged responses self.client.want_response_signed = True msg_str = f"{self.client.create_authn_request('http://localhost:8088/sso', message_id='id1')[1]}" info = self.client.apply_binding( BINDING_HTTP_REDIRECT, msg_str, destination="", relay_state="relay2", sign=True, sigalg=SIG_RSA_SHA256 ) loc = info["headers"][0][1] qs = parse.parse_qs(loc[1:]) assert _leq(qs.keys(), ["SigAlg", "SAMLRequest", "RelayState", "Signature"]) assert verify_redirect_signature(list_values2simpletons(qs), self.client.sec.sec_backend) res = self.server.parse_authn_request(qs["SAMLRequest"][0], BINDING_HTTP_REDIRECT) def test_do_logout_signed_redirect(self): conf = config.SPConfig() conf.load_file("sp_slo_redirect_conf") client = Saml2Client(conf) # information about the user from an IdP session_info = { "name_id": nid, "issuer": "urn:mace:example.com:saml:roland:idp", "not_on_or_after": in_a_while(minutes=15), "ava": {"givenName": "Anders", "sn": "Österberg", "mail": "anders.osterberg@example.com"}, } client.users.add_information_about_person(session_info) entity_ids = client.users.issuers_of_info(nid) assert entity_ids == ["urn:mace:example.com:saml:roland:idp"] resp = client.do_logout( nid, entity_ids, "Tired", in_a_while(minutes=5), sign=True, expected_binding=BINDING_HTTP_REDIRECT ) assert list(resp.keys()) == entity_ids binding, info = resp[entity_ids[0]] assert binding == BINDING_HTTP_REDIRECT loc = info["headers"][0][1] _, _, _, _, qs, _ = parse.urlparse(loc) qs = parse.parse_qs(qs) assert _leq(qs.keys(), ["SigAlg", "SAMLRequest", "RelayState", "Signature"]) qs_simple = list_values2simpletons(qs) assert verify_redirect_signature(qs_simple, client.sec.sec_backend) res = self.server.parse_logout_request( qs_simple["SAMLRequest"], BINDING_HTTP_REDIRECT, relay_state=qs_simple["RelayState"], sigalg=qs_simple["SigAlg"], signature=qs_simple["Signature"], ) def test_do_logout_post(self): # information about the user from an IdP session_info = { "name_id": nid, "issuer": "urn:mace:example.com:saml:roland:idp", "not_on_or_after": in_a_while(minutes=15), "ava": {"givenName": "Anders", "sn": "Österberg", "mail": "anders.osterberg@example.com"}, "session_index": SessionIndex("_foo"), } self.client.users.add_information_about_person(session_info) entity_ids = self.client.users.issuers_of_info(nid) assert entity_ids == ["urn:mace:example.com:saml:roland:idp"] resp = self.client.do_logout( nid, entity_ids, "Tired", in_a_while(minutes=5), sign=True, expected_binding=BINDING_HTTP_POST ) assert resp assert len(resp) == 1 assert list(resp.keys()) == entity_ids binding, info = resp[entity_ids[0]] assert binding == BINDING_HTTP_POST _dic = unpack_form(info["data"]) res = self.server.parse_logout_request(_dic["SAMLRequest"], BINDING_HTTP_POST) assert b"_foo" in res.xmlstr def test_do_logout_session_expired(self): # information about the user from an IdP session_info = { "name_id": nid, "issuer": "urn:mace:example.com:saml:roland:idp", "not_on_or_after": a_while_ago(minutes=15), "ava": {"givenName": "Anders", "sn": "Österberg", "mail": "anders.osterberg@example.com"}, "session_index": SessionIndex("_foo"), } self.client.users.add_information_about_person(session_info) entity_ids = self.client.users.issuers_of_info(nid) assert entity_ids == ["urn:mace:example.com:saml:roland:idp"] resp = self.client.do_logout( nid, entity_ids, "Tired", in_a_while(minutes=5), sign=True, expected_binding=BINDING_HTTP_POST ) assert resp assert len(resp) == 1 assert list(resp.keys()) == entity_ids binding, info = resp[entity_ids[0]] assert binding == BINDING_HTTP_POST _dic = unpack_form(info["data"]) res = self.server.parse_logout_request(_dic["SAMLRequest"], BINDING_HTTP_POST) assert b"_foo" in res.xmlstr # Below can only be done with dummy Server IDP = "urn:mace:example.com:saml:roland:idp" class TestClientWithDummy: def setup_class(self): self.server = FakeIDP("idp_all_conf") conf = config.SPConfig() conf.load_file("servera_conf") self.client = Saml2Client(conf) self.client.send = self.server.receive def test_do_authn(self): binding = BINDING_HTTP_REDIRECT response_binding = BINDING_HTTP_POST sid, http_args = self.client.prepare_for_authenticate( IDP, "http://www.example.com/relay_state", binding=binding, response_binding=response_binding ) assert isinstance(sid, str) assert len(http_args) == 5 assert http_args["headers"][0][0] == "Location" assert http_args["data"] == [] assert http_args["status"] == 303 redirect_url = http_args["headers"][0][1] _, _, _, _, qs, _ = parse.urlparse(redirect_url) qs_dict = parse.parse_qs(qs) req = self.server.parse_authn_request(qs_dict["SAMLRequest"][0], binding) resp_args = self.server.response_args(req.message, [response_binding]) assert resp_args["binding"] == response_binding def test_do_negotiated_authn(self): binding = BINDING_HTTP_REDIRECT response_binding = BINDING_HTTP_POST sid, auth_binding, http_args = self.client.prepare_for_negotiated_authenticate( IDP, "http://www.example.com/relay_state", binding=binding, response_binding=response_binding ) assert binding == auth_binding assert isinstance(sid, str) assert len(http_args) == 5 assert http_args["headers"][0][0] == "Location" assert http_args["data"] == [] assert http_args["status"] == 303 redirect_url = http_args["headers"][0][1] _, _, _, _, qs, _ = parse.urlparse(redirect_url) qs_dict = parse.parse_qs(qs) req = self.server.parse_authn_request(qs_dict["SAMLRequest"][0], binding) resp_args = self.server.response_args(req.message, [response_binding]) assert resp_args["binding"] == response_binding def test_do_attribute_query(self): response = self.client.do_attribute_query( IDP, "_e7b68a04488f715cda642fbdd90099f5", attribute={"eduPersonAffiliation": None}, nameid_format=NAMEID_FORMAT_TRANSIENT, ) def test_logout_1(self): """one IdP/AA logout from""" # information about the user from an IdP session_info = { "name_id": nid, "issuer": "urn:mace:example.com:saml:roland:idp", "not_on_or_after": in_a_while(minutes=15), "ava": {"givenName": "Anders", "sn": "Österberg", "mail": "anders.osterberg@example.com"}, } self.client.users.add_information_about_person(session_info) entity_ids = self.client.users.issuers_of_info(nid) assert entity_ids == ["urn:mace:example.com:saml:roland:idp"] resp = self.client.global_logout(nid, "Tired", in_a_while(minutes=5)) assert resp assert len(resp) == 1 assert list(resp.keys()) == entity_ids response = resp[entity_ids[0]] assert isinstance(response, LogoutResponse) assert response.return_addrs assert len(response.return_addrs) == 1 def test_post_sso(self): binding = BINDING_HTTP_POST response_binding = BINDING_HTTP_POST sid, http_args = self.client.prepare_for_authenticate( "urn:mace:example.com:saml:roland:idp", relay_state="really", binding=binding, response_binding=response_binding, ) _dic = unpack_form(http_args["data"]) req = self.server.parse_authn_request(_dic["SAMLRequest"], binding) resp_args = self.server.response_args(req.message, [response_binding]) assert resp_args["binding"] == response_binding # Normally a response would now be sent back to the users web client # Here I fake what the client will do # create the form post http_args["data"] = parse.urlencode(_dic) http_args["method"] = "POST" http_args["dummy"] = _dic["SAMLRequest"] http_args["headers"] = [("Content-type", "application/x-www-form-urlencoded")] response = self.client.send(**http_args) _dic = unpack_form(response.text, "SAMLResponse") # Explicitly allow unsigned responses for this test self.client.want_response_signed = False resp = self.client.parse_authn_request_response(_dic["SAMLResponse"], BINDING_HTTP_POST, {sid: "/"}) ac = resp.assertion.authn_statement[0].authn_context assert ac.authenticating_authority[0].text == "http://www.example.com/login" assert ac.authn_context_class_ref.text == INTERNETPROTOCOLPASSWORD def test_negotiated_post_sso(self): binding = BINDING_HTTP_POST response_binding = BINDING_HTTP_POST sid, auth_binding, http_args = self.client.prepare_for_negotiated_authenticate( "urn:mace:example.com:saml:roland:idp", relay_state="really", binding=binding, response_binding=response_binding, ) _dic = unpack_form(http_args["data"]) assert binding == auth_binding req = self.server.parse_authn_request(_dic["SAMLRequest"], binding) resp_args = self.server.response_args(req.message, [response_binding]) assert resp_args["binding"] == response_binding # Normally a response would now be sent back to the users web client # Here I fake what the client will do # create the form post http_args["data"] = parse.urlencode(_dic) http_args["method"] = "POST" http_args["dummy"] = _dic["SAMLRequest"] http_args["headers"] = [("Content-type", "application/x-www-form-urlencoded")] response = self.client.send(**http_args) _dic = unpack_form(response.text, "SAMLResponse") resp = self.client.parse_authn_request_response(_dic["SAMLResponse"], BINDING_HTTP_POST, {sid: "/"}) ac = resp.assertion.authn_statement[0].authn_context assert ac.authenticating_authority[0].text == "http://www.example.com/login" assert ac.authn_context_class_ref.text == INTERNETPROTOCOLPASSWORD class TestClientNoConfigContext: def setup_class(self): self.server = FakeIDP("idp_all_conf") conf = config.Config() # not SPConfig conf.load_file("servera_conf") self.client = Saml2Client(conf) self.client.send = self.server.receive def test_logout_1(self): """one IdP/AA logout from""" # information about the user from an IdP session_info = { "name_id": nid, "issuer": "urn:mace:example.com:saml:roland:idp", "not_on_or_after": in_a_while(minutes=15), "ava": {"givenName": "Anders", "sn": "Österberg", "mail": "anders.osterberg@example.com"}, } self.client.users.add_information_about_person(session_info) entity_ids = self.client.users.issuers_of_info(nid) assert entity_ids == ["urn:mace:example.com:saml:roland:idp"] resp = self.client.global_logout(nid, "Tired", in_a_while(minutes=5)) assert resp assert len(resp) == 1 assert list(resp.keys()) == entity_ids response = resp[entity_ids[0]] assert isinstance(response, LogoutResponse) assert response.return_addrs assert len(response.return_addrs) == 1 def test_parse_soap_enveloped_saml_xxe(): xml = """ ]> &lol1; """ with raises(EntitiesForbidden): parse_soap_enveloped_saml(xml, None) if __name__ == "__main__": tc = TestClient() tc.setup_class() tc.test_sign_then_encrypt_assertion()