#!/usr/bin/env python import base64 from contextlib import closing import copy import os import re from urllib.parse import parse_qs import uuid from pathutils import full_path from pytest import raises from saml2 import BINDING_HTTP_POST from saml2 import BINDING_HTTP_REDIRECT from saml2 import BINDING_SOAP from saml2 import VERSION from saml2 import client from saml2 import config from saml2 import extension_elements_to_elements from saml2 import s_utils from saml2 import saml from saml2 import samlp from saml2 import time_util from saml2.assertion import Policy from saml2.authn_context import INTERNETPROTOCOLPASSWORD from saml2.cert import OpenSSLWrapper from saml2.response import IncorrectlySigned from saml2.s_utils import OtherError from saml2.s_utils import do_attribute_statement from saml2.s_utils import factory from saml2.s_utils import sid from saml2.saml import NAMEID_FORMAT_TRANSIENT from saml2.saml import NameID from saml2.samlp import response_from_string from saml2.server import Server from saml2.sigver import CertificateError from saml2.sigver import DecryptError from saml2.sigver import EncryptError from saml2.sigver import make_temp from saml2.soap import make_soap_enveloped_saml_thingy from saml2.time_util import instant import saml2.xmldsig as ds nid = NameID(name_qualifier="foo", format=NAMEID_FORMAT_TRANSIENT, text="123456") AUTHN = {"class_ref": INTERNETPROTOCOLPASSWORD, "authn_auth": "http://www.example.com/login"} def response_factory(**kwargs): response = samlp.Response(id=sid(), version=VERSION, issue_instant=instant()) for key, val in kwargs.items(): setattr(response, key, val) return response def _eq(l1, l2): return set(l1) == set(l2) BASEDIR = os.path.abspath(os.path.dirname(__file__)) def get_ava(assertion): ava = {} for statement in assertion.attribute_statement: for attr in statement.attribute: value = [] for tmp_val in attr.attribute_value: value.append(tmp_val.text) key = attr.friendly_name if key is None or len(key) == 0: key = attr.text ava[key] = value return ava def generate_cert(): sn = uuid.uuid4().urn cert_info = { "cn": "localhost", "country_code": "se", "state": "ac", "city": "Umea", "organization": "ITS", "organization_unit": "DIRG", } osw = OpenSSLWrapper() ca_cert_str = osw.read_str_from_file(full_path("root_cert/localhost.ca.crt")) ca_key_str = osw.read_str_from_file(full_path("root_cert/localhost.ca.key")) req_cert_str, req_key_str = osw.create_certificate(cert_info, request=True, sn=sn, key_length=2048) cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str, req_cert_str) return cert_str, req_key_str class TestServer1: def setup_class(self): self.server = Server("idp_conf") conf = config.SPConfig() conf.load_file("server_conf") self.client = client.Saml2Client(conf) self.name_id = self.server.ident.transient_nameid("urn:mace:example.com:saml:roland:sp", "id12") self.ava = {"givenName": ["Derek"], "sn": ["Jeter"], "mail": ["derek@nyy.mlb.com"], "title": "The man"} def teardown_class(self): self.server.close() def verify_assertion(self, assertion): assert assertion assert assertion[0].attribute_statement ava = ava = get_ava(assertion[0]) assert ava == {"mail": ["derek@nyy.mlb.com"], "givenName": ["Derek"], "sn": ["Jeter"], "title": ["The man"]} def verify_encrypted_assertion(self, assertion, decr_text): self.verify_assertion(assertion) assert assertion[0].signature is None assert re.search( r':EncryptedAssertion>]* )*xmlns:encas[0-9]="urn:oasis:names:tc:SAML:2.0:assertion"', decr_text, ) def verify_advice_assertion(self, resp, decr_text): assert resp.assertion[0].signature is None assert resp.assertion[0].advice.encrypted_assertion[0].extension_elements assertion = extension_elements_to_elements( resp.assertion[0].advice.encrypted_assertion[0].extension_elements, [saml, samlp] ) self.verify_encrypted_assertion(assertion, decr_text) def test_issuer(self): issuer = self.server._issuer() assert isinstance(issuer, saml.Issuer) assert _eq(issuer.keyswv(), ["text", "format"]) assert issuer.format == saml.NAMEID_FORMAT_ENTITY assert issuer.text == self.server.config.entityid def test_assertion(self): assertion = s_utils.assertion_factory( subject=factory( saml.Subject, text="_aaa", name_id=factory(saml.NameID, format=saml.NAMEID_FORMAT_TRANSIENT) ), attribute_statement=do_attribute_statement( { ("", "", "sn"): ("Jeter", ""), ("", "", "givenName"): ("Derek", ""), } ), issuer=self.server._issuer(), ) assert _eq(assertion.keyswv(), ["attribute_statement", "issuer", "id", "subject", "issue_instant", "version"]) assert assertion.version == "2.0" assert assertion.issuer.text == "urn:mace:example.com:saml:roland:idp" # assert assertion.attribute_statement attribute_statement = assertion.attribute_statement assert len(attribute_statement.attribute) == 2 attr0 = attribute_statement.attribute[0] attr1 = attribute_statement.attribute[1] if attr0.attribute_value[0].text == "Derek": assert attr0.friendly_name == "givenName" assert attr1.friendly_name == "sn" assert attr1.attribute_value[0].text == "Jeter" else: assert attr1.friendly_name == "givenName" assert attr1.attribute_value[0].text == "Derek" assert attr0.friendly_name == "sn" assert attr0.attribute_value[0].text == "Jeter" subject = assertion.subject assert _eq(subject.keyswv(), ["text", "name_id"]) assert subject.text == "_aaa" assert subject.name_id.format == saml.NAMEID_FORMAT_TRANSIENT def test_response(self): response = response_factory( in_response_to="_012345", destination="https:#www.example.com", status=s_utils.success_status_factory(), assertion=s_utils.assertion_factory( subject=factory(saml.Subject, text="_aaa", name_id=saml.NAMEID_FORMAT_TRANSIENT), attribute_statement=do_attribute_statement( { ("", "", "sn"): ("Jeter", ""), ("", "", "givenName"): ("Derek", ""), } ), issuer=self.server._issuer(), ), issuer=self.server._issuer(), ) print(response.keyswv()) assert _eq( response.keyswv(), ["destination", "assertion", "status", "in_response_to", "issue_instant", "version", "issuer", "id"], ) assert response.version == "2.0" assert response.issuer.text == "urn:mace:example.com:saml:roland:idp" assert response.destination == "https:#www.example.com" assert response.in_response_to == "_012345" # status = response.status print(status) assert status.status_code.value == samlp.STATUS_SUCCESS def test_parse_faulty_request(self): req_id, authn_request = self.client.create_authn_request(destination="http://www.example.com", id="id1") # should raise an error because faulty spentityid binding = BINDING_HTTP_REDIRECT htargs = self.client.apply_binding(binding, f"{authn_request}", "http://www.example.com", "abcd") _dict = parse_qs(htargs["headers"][0][1].split("?")[1]) print(_dict) with raises(OtherError): self.server.parse_authn_request(_dict["SAMLRequest"][0], binding) def test_parse_faulty_request_to_err_status(self): req_id, authn_request = self.client.create_authn_request(destination="http://www.example.com") binding = BINDING_HTTP_REDIRECT htargs = self.client.apply_binding(binding, f"{authn_request}", "http://www.example.com", "abcd") _dict = parse_qs(htargs["headers"][0][1].split("?")[1]) print(_dict) try: self.server.parse_authn_request(_dict["SAMLRequest"][0], binding) status = None except OtherError as oe: print(oe.args) status = s_utils.error_status_factory(oe) assert status print(status) assert _eq(status.keyswv(), ["status_code", "status_message"]) assert status.status_message.text == "Not destined for me!" status_code = status.status_code assert _eq(status_code.keyswv(), ["status_code", "value"]) assert status_code.value == samlp.STATUS_RESPONDER assert status_code.status_code.value == samlp.STATUS_UNKNOWN_PRINCIPAL def test_parse_ok_request(self): req_id, authn_request = self.client.create_authn_request( message_id="id1", destination="http://localhost:8088/sso", nameid_format=saml.NAMEID_FORMAT_TRANSIENT, ) print(authn_request) binding = BINDING_HTTP_REDIRECT htargs = self.client.apply_binding(binding, f"{authn_request}", "http://www.example.com", "abcd") _dict = parse_qs(htargs["headers"][0][1].split("?")[1]) print(_dict) req = self.server.parse_authn_request(_dict["SAMLRequest"][0], binding) # returns a dictionary print(req) resp_args = self.server.response_args(req.message, [BINDING_HTTP_POST]) assert resp_args["destination"] == "http://lingon.catalogix.se:8087/" assert resp_args["in_response_to"] == "id1" name_id_policy = resp_args["name_id_policy"] assert _eq(name_id_policy.keyswv(), ["format"]) assert name_id_policy.format == saml.NAMEID_FORMAT_TRANSIENT assert resp_args["sp_entity_id"] == "urn:mace:example.com:saml:roland:sp" def test_sso_response_with_identity(self): name_id = self.server.ident.transient_nameid("https://example.com/sp", "id12") resp = self.server.create_authn_response( { "eduPersonEntitlement": "Short stop", "sn": "Jeter", "givenName": "Derek", "mail": "derek.jeter@nyy.mlb.com", "title": "The man", }, "id12", # in_response_to "http://localhost:8087/", # destination "https://example.com/sp", # sp_entity_id name_id=name_id, authn=AUTHN, ) print(resp.keyswv()) assert _eq( resp.keyswv(), ["status", "destination", "assertion", "in_response_to", "issue_instant", "version", "id", "issuer"], ) assert resp.destination == "http://localhost:8087/" assert resp.in_response_to == "id12" assert resp.status assert resp.status.status_code.value == samlp.STATUS_SUCCESS assert resp.assertion assertion = resp.assertion print(assertion) assert assertion.authn_statement assert assertion.conditions assert assertion.attribute_statement attribute_statement = assertion.attribute_statement print(attribute_statement) assert len(attribute_statement[0].attribute) == 4 # Pick out one attribute attr = None for attr in attribute_statement[0].attribute: if attr.friendly_name == "givenName": break assert len(attr.attribute_value) == 1 assert attr.name == "urn:mace:dir:attribute-def:givenName" assert attr.name_format == "urn:oasis:names:tc:SAML:2.0:attrname-format:basic" value = attr.attribute_value[0] assert value.text.strip() == "Derek" assert value.get_type() == "xs:string" assert assertion.subject assert assertion.subject.name_id assert assertion.subject.subject_confirmation confirmation = assertion.subject.subject_confirmation[0] print(confirmation.keyswv()) print(confirmation.subject_confirmation_data) assert confirmation.subject_confirmation_data.in_response_to == "id12" def test_sso_response_without_identity(self): resp = self.server.create_authn_response( {}, "id12", # in_response_to "http://localhost:8087/", # consumer_url "urn:mace:example.com:saml:roland:sp", # sp_entity_id userid="USER1", authn=AUTHN, release_policy=Policy(), best_effort=True, ) print(resp.keyswv()) assert _eq( resp.keyswv(), ["status", "destination", "in_response_to", "issue_instant", "version", "id", "issuer", "assertion"], ) assert resp.destination == "http://localhost:8087/" assert resp.in_response_to == "id12" assert resp.status assert resp.status.status_code.value == samlp.STATUS_SUCCESS assert resp.issuer.text == "urn:mace:example.com:saml:roland:idp" assert not resp.assertion.attribute_statement def test_sso_response_specific_instant(self): _authn = AUTHN.copy() _authn["authn_instant"] = 1234567890 resp = self.server.create_authn_response( {}, "id12", # in_response_to "http://localhost:8087/", # consumer_url "urn:mace:example.com:saml:roland:sp", # sp_entity_id userid="USER1", authn=_authn, best_effort=True, ) print(resp.keyswv()) assert _eq( resp.keyswv(), ["status", "destination", "in_response_to", "issue_instant", "version", "id", "issuer", "assertion"], ) authn_statement = resp.assertion.authn_statement[0] assert authn_statement.authn_instant == "2009-02-13T23:31:30Z" def test_sso_failure_response(self): exc = s_utils.MissingValue("eduPersonAffiliation missing") resp = self.server.create_error_response("id12", "http://localhost:8087/", exc) print(resp.keyswv()) assert _eq( resp.keyswv(), ["status", "destination", "in_response_to", "issue_instant", "version", "id", "issuer"] ) assert resp.destination == "http://localhost:8087/" assert resp.in_response_to == "id12" assert resp.status print(resp.status) assert resp.status.status_code.value == samlp.STATUS_RESPONDER assert resp.status.status_code.status_code.value == samlp.STATUS_REQUEST_UNSUPPORTED assert resp.status.status_message.text == "eduPersonAffiliation missing" assert resp.issuer.text == "urn:mace:example.com:saml:roland:idp" assert not resp.assertion def test_authn_response_0(self): conf = config.SPConfig() conf.load_file("server_conf") self.client = client.Saml2Client(conf) ava = {"givenName": ["Derek"], "sn": ["Jeter"], "mail": ["derek@nyy.mlb.com"], "title": "The man"} npolicy = samlp.NameIDPolicy(format=saml.NAMEID_FORMAT_TRANSIENT, allow_create="true") resp_str = "%s" % self.server.create_authn_response( ava, "id1", "http://local:8087/", "urn:mace:example.com:saml:roland:sp", npolicy, "foba0001@example.com", authn=AUTHN, ) response = samlp.response_from_string(resp_str) print(response.keyswv()) assert _eq( response.keyswv(), ["status", "destination", "assertion", "in_response_to", "issue_instant", "version", "issuer", "id"], ) print(response.assertion[0].keyswv()) assert len(response.assertion) == 1 assert _eq( response.assertion[0].keyswv(), [ "attribute_statement", "issue_instant", "version", "subject", "conditions", "id", "issuer", "authn_statement", ], ) assertion = response.assertion[0] assert len(assertion.attribute_statement) == 1 astate = assertion.attribute_statement[0] print(astate) assert len(astate.attribute) == 4 def test_signed_response(self): name_id = self.server.ident.transient_nameid("urn:mace:example.com:saml:roland:sp", "id12") ava = {"givenName": ["Derek"], "sn": ["Jeter"], "mail": ["derek@nyy.mlb.com"], "title": "The man"} signed_resp = self.server.create_authn_response( ava, "id12", # in_response_to "http://lingon.catalogix.se:8087/", # consumer_url "urn:mace:example.com:saml:roland:sp", # sp_entity_id name_id=name_id, sign_assertion=True, ) print(signed_resp) assert signed_resp sresponse = response_from_string(signed_resp) # It's the assertions that are signed not the response per se assert len(sresponse.assertion) == 1 assertion = sresponse.assertion[0] # Since the reponse is created dynamically I don't know the signature # value. Just that there should be one assert assertion.signature.signature_value.text != "" def test_signed_response_1(self): signed_resp = self.server.create_authn_response( self.ava, "id12", # in_response_to "http://lingon.catalogix.se:8087/", # consumer_url "urn:mace:example.com:saml:roland:sp", # sp_entity_id name_id=self.name_id, sign_response=True, sign_assertion=True, ) sresponse = response_from_string(signed_resp) valid = self.server.sec.verify_signature( signed_resp, self.server.config.cert_file, node_name="urn:oasis:names:tc:SAML:2.0:protocol:Response", node_id=sresponse.id, ) assert valid valid = self.server.sec.verify_signature( signed_resp, self.server.config.cert_file, node_name="urn:oasis:names:tc:SAML:2.0:assertion:Assertion", node_id=sresponse.assertion[0].id, ) assert valid self.verify_assertion(sresponse.assertion) def test_signed_response_2(self): signed_resp = self.server.create_authn_response( self.ava, "id12", # in_response_to "http://lingon.catalogix.se:8087/", # consumer_url "urn:mace:example.com:saml:roland:sp", # sp_entity_id name_id=self.name_id, sign_response=True, sign_assertion=False, ) sresponse = response_from_string(signed_resp) valid = self.server.sec.verify_signature( signed_resp, self.server.config.cert_file, node_name="urn:oasis:names:tc:SAML:2.0:protocol:Response", node_id=sresponse.id, ) assert valid assert sresponse.assertion[0].signature == None def test_signed_response_3(self): signed_resp = self.server.create_authn_response( self.ava, "id12", # in_response_to "http://lingon.catalogix.se:8087/", # consumer_url "urn:mace:example.com:saml:roland:sp", # sp_entity_id name_id=self.name_id, sign_response=False, sign_assertion=True, ) sresponse = response_from_string(signed_resp) assert sresponse.signature == None valid = self.server.sec.verify_signature( signed_resp, self.server.config.cert_file, node_name="urn:oasis:names:tc:SAML:2.0:assertion:Assertion", node_id=sresponse.assertion[0].id, ) assert valid self.verify_assertion(sresponse.assertion) def test_encrypted_signed_response_1(self): cert_str, cert_key_str = generate_cert() signed_resp = self.server.create_authn_response( self.ava, "id12", # in_response_to "http://lingon.catalogix.se:8087/", # consumer_url "urn:mace:example.com:saml:roland:sp", # sp_entity_id name_id=self.name_id, sign_response=True, sign_assertion=True, encrypt_assertion=False, encrypt_assertion_self_contained=True, pefim=True, encrypt_cert_advice=cert_str, ) sresponse = response_from_string(signed_resp) valid = self.server.sec.verify_signature( signed_resp, self.server.config.cert_file, node_name="urn:oasis:names:tc:SAML:2.0:protocol:Response", node_id=sresponse.id, ) assert valid valid = self.server.sec.verify_signature( signed_resp, self.server.config.cert_file, node_name="urn:oasis:names:tc:SAML:2.0:assertion:Assertion", node_id=sresponse.assertion[0].id, ) assert valid key_fd = make_temp(cert_key_str, decode=False) decr_text = self.server.sec.decrypt(signed_resp, key_fd.name) resp = samlp.response_from_string(decr_text) assert resp.assertion[0].advice.encrypted_assertion[0].extension_elements assertion = extension_elements_to_elements( resp.assertion[0].advice.encrypted_assertion[0].extension_elements, [saml, samlp] ) self.verify_assertion(assertion) # PEFIM never signs assertions. assert assertion[0].signature is None # valid = self.server.sec.verify_signature(decr_text, # self.server.config.cert_file, # node_name='urn:oasis:names:tc:SAML:2.0:assertion:Assertion', # node_id=assertion[0].id) assert valid def test_encrypted_signed_response_2(self): cert_str, cert_key_str = generate_cert() signed_resp = self.server.create_authn_response( self.ava, "id12", # in_response_to "http://lingon.catalogix.se:8087/", # consumer_url "urn:mace:example.com:saml:roland:sp", # sp_entity_id name_id=self.name_id, sign_response=True, sign_assertion=False, encrypt_assertion=True, encrypt_assertion_self_contained=True, ) sresponse = response_from_string(signed_resp) valid = self.server.sec.verify_signature( signed_resp, self.server.config.cert_file, node_name="urn:oasis:names:tc:SAML:2.0:protocol:Response", node_id=sresponse.id, ) assert valid decr_text_old = copy.deepcopy(f"{signed_resp}") with raises(DecryptError): decr_text = self.server.sec.decrypt( signed_resp, self.client.config.encryption_keypairs[0]["key_file"], ) decr_text = self.server.sec.decrypt(signed_resp, self.client.config.encryption_keypairs[1]["key_file"]) assert decr_text != decr_text_old resp = samlp.response_from_string(decr_text) resp.assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp]) assert resp.assertion[0].signature == None self.verify_assertion(resp.assertion) def test_encrypted_signed_response_3(self): cert_str, cert_key_str = generate_cert() signed_resp = self.server.create_authn_response( self.ava, "id12", # in_response_to "http://lingon.catalogix.se:8087/", # consumer_url "urn:mace:example.com:saml:roland:sp", # sp_entity_id name_id=self.name_id, sign_response=True, sign_assertion=True, encrypt_assertion=True, encrypt_assertion_self_contained=False, encrypt_cert_assertion=cert_str, ) sresponse = response_from_string(signed_resp) valid = self.server.sec.verify_signature( signed_resp, self.server.config.cert_file, node_name="urn:oasis:names:tc:SAML:2.0:protocol:Response", node_id=sresponse.id, ) assert valid key_fd = make_temp(cert_key_str, decode=False) decr_text = self.server.sec.decrypt(signed_resp, key_fd.name) resp = samlp.response_from_string(decr_text) resp.assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp]) valid = self.server.sec.verify_signature( decr_text, self.server.config.cert_file, node_name="urn:oasis:names:tc:SAML:2.0:assertion:Assertion", node_id=resp.assertion[0].id, ) assert valid self.verify_assertion(resp.assertion) assert "xmlns:encas" not in decr_text def test_encrypted_signed_response_4(self): cert_str, cert_key_str = generate_cert() signed_resp = self.server.create_authn_response( self.ava, "id12", # in_response_to "http://lingon.catalogix.se:8087/", # consumer_url "urn:mace:example.com:saml:roland:sp", # sp_entity_id name_id=self.name_id, sign_response=True, sign_assertion=True, encrypt_assertion=True, encrypt_assertion_self_contained=True, pefim=True, encrypt_cert_advice=cert_str, ) sresponse = response_from_string(signed_resp) valid = self.server.sec.verify_signature( signed_resp, self.server.config.cert_file, node_name="urn:oasis:names:tc:SAML:2.0:protocol:Response", node_id=sresponse.id, ) assert valid decr_text = self.server.sec.decrypt(signed_resp, self.client.config.encryption_keypairs[1]["key_file"]) resp = samlp.response_from_string(decr_text) resp.assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp]) valid = self.server.sec.verify_signature( decr_text, self.server.config.cert_file, node_name="urn:oasis:names:tc:SAML:2.0:assertion:Assertion", node_id=resp.assertion[0].id, ) assert valid key_fd = make_temp(cert_key_str, decode=False) decr_text = self.server.sec.decrypt(decr_text, key_fd.name) resp = samlp.response_from_string(decr_text) assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp]) assertion = extension_elements_to_elements( assertion[0].advice.encrypted_assertion[0].extension_elements, [saml, samlp] ) self.verify_assertion(assertion) # PEFIM never signs assertion in advice assert assertion[0].signature is None # valid = self.server.sec.verify_signature(decr_text, # self.server.config.cert_file, # node_name='urn:oasis:names:tc:SAML:2.0:assertion:Assertion', # node_id=assertion[0].id) assert valid def test_encrypted_response_1(self): cert_str_advice, cert_key_str_advice = generate_cert() _resp = self.server.create_authn_response( self.ava, "id12", # in_response_to "http://lingon.catalogix.se:8087/", # consumer_url "urn:mace:example.com:saml:roland:sp", # sp_entity_id name_id=self.name_id, sign_response=False, sign_assertion=False, encrypt_assertion=False, encrypt_assertion_self_contained=True, pefim=True, encrypt_cert_advice=cert_str_advice, ) _resp = f"{_resp}" sresponse = response_from_string(_resp) assert sresponse.signature is None key_fd = make_temp(cert_key_str_advice, decode=False) decr_text = self.server.sec.decrypt(_resp, key_fd.name) resp = samlp.response_from_string(decr_text) self.verify_advice_assertion(resp, decr_text) def test_encrypted_response_2(self): cert_str_advice, cert_key_str_advice = generate_cert() _resp = self.server.create_authn_response( self.ava, "id12", # in_response_to "http://lingon.catalogix.se:8087/", # consumer_url "urn:mace:example.com:saml:roland:sp", # sp_entity_id name_id=self.name_id, sign_response=False, sign_assertion=False, encrypt_assertion=True, encrypt_assertion_self_contained=True, pefim=True, encrypt_cert_advice=cert_str_advice, ) sresponse = response_from_string(_resp) assert sresponse.signature is None decr_text_1 = self.server.sec.decrypt(_resp, self.client.config.encryption_keypairs[1]["key_file"]) key_fd = make_temp(cert_key_str_advice, decode=False) decr_text_2 = self.server.sec.decrypt(decr_text_1, key_fd.name) resp = samlp.response_from_string(decr_text_2) resp.assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp]) self.verify_advice_assertion(resp, decr_text_2) def test_encrypted_response_3(self): cert_str_assertion, cert_key_str_assertion = generate_cert() _resp = self.server.create_authn_response( self.ava, "id12", # in_response_to "http://lingon.catalogix.se:8087/", # consumer_url "urn:mace:example.com:saml:roland:sp", # sp_entity_id name_id=self.name_id, sign_response=False, sign_assertion=False, encrypt_assertion=True, encrypt_assertion_self_contained=True, encrypted_advice_attributes=False, encrypt_cert_assertion=cert_str_assertion, ) sresponse = response_from_string(_resp) assert sresponse.signature is None key_fd = make_temp(cert_key_str_assertion, decode=False) decr_text = self.server.sec.decrypt(_resp, key_fd.name) resp = samlp.response_from_string(decr_text) assert resp.encrypted_assertion[0].extension_elements assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp]) self.verify_encrypted_assertion(assertion, decr_text) def test_encrypted_response_4(self): _resp = self.server.create_authn_response( self.ava, "id12", # in_response_to "http://lingon.catalogix.se:8087/", # consumer_url "urn:mace:example.com:saml:roland:sp", # sp_entity_id name_id=self.name_id, sign_response=False, sign_assertion=False, encrypt_assertion=True, encrypt_assertion_self_contained=True, encrypted_advice_attributes=False, ) sresponse = response_from_string(_resp) assert sresponse.signature is None decr_text = self.server.sec.decrypt(_resp, self.client.config.encryption_keypairs[1]["key_file"]) resp = samlp.response_from_string(decr_text) assert resp.encrypted_assertion[0].extension_elements assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp]) self.verify_encrypted_assertion(assertion, decr_text) def test_encrypted_response_5(self): _resp = self.server.create_authn_response( self.ava, "id12", # in_response_to "http://lingon.catalogix.se:8087/", # consumer_url "urn:mace:example.com:saml:roland:sp", # sp_entity_id name_id=self.name_id, sign_response=False, sign_assertion=False, encrypt_assertion=False, encrypt_assertion_self_contained=True, pefim=True, ) _resp = f"{_resp}" sresponse = response_from_string(_resp) assert sresponse.signature is None decr_text = self.server.sec.decrypt(_resp, self.client.config.encryption_keypairs[1]["key_file"]) resp = samlp.response_from_string(decr_text) self.verify_advice_assertion(resp, decr_text) def test_encrypted_response_6(self): _server = Server("idp_conf_verify_cert") cert_str_advice, cert_key_str_advice = generate_cert() cert_str_assertion, cert_key_str_assertion = generate_cert() _resp = _server.create_authn_response( self.ava, "id12", # in_response_to "http://lingon.catalogix.se:8087/", # consumer_url "urn:mace:example.com:saml:roland:sp", # sp_entity_id name_id=self.name_id, sign_response=False, sign_assertion=False, encrypt_assertion=True, encrypt_assertion_self_contained=True, pefim=True, encrypt_cert_advice=cert_str_advice, encrypt_cert_assertion=cert_str_assertion, ) sresponse = response_from_string(_resp) assert sresponse.signature is None key_fd1 = make_temp(cert_key_str_assertion, decode=False) decr_text_1 = _server.sec.decrypt(_resp, key_fd1.name) key_fd2 = make_temp(cert_key_str_advice, decode=False) decr_text_2 = _server.sec.decrypt(decr_text_1, key_fd2.name) resp = samlp.response_from_string(decr_text_2) resp.assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp]) self.verify_advice_assertion(resp, decr_text_2) def test_encrypted_response_7(self): _resp = self.server.create_authn_response( self.ava, "id12", # in_response_to "http://lingon.catalogix.se:8087/", # consumer_url "urn:mace:example.com:saml:roland:sp", # sp_entity_id name_id=self.name_id, sign_response=False, sign_assertion=False, encrypt_assertion=True, encrypt_assertion_self_contained=True, pefim=True, ) sresponse = response_from_string(_resp) assert sresponse.signature is None decr_text_1 = self.server.sec.decrypt(_resp, self.client.config.encryption_keypairs[1]["key_file"]) decr_text_2 = self.server.sec.decrypt(decr_text_1, self.client.config.encryption_keypairs[1]["key_file"]) resp = samlp.response_from_string(decr_text_2) resp.assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp]) self.verify_advice_assertion(resp, decr_text_2) def test_encrypted_response_8(self): with raises(EncryptError): _resp = self.server.create_authn_response( self.ava, "id12", # in_response_to "http://lingon.catalogix.se:8087/", # consumer_url "urn:mace:example.com:saml:roland:sp", # sp_entity_id name_id=self.name_id, sign_response=False, sign_assertion=False, encrypt_assertion=True, encrypt_assertion_self_contained=True, pefim=True, encrypt_cert_advice="whatever", encrypt_cert_assertion="whatever", ) with raises(EncryptError): _resp = self.server.create_authn_response( self.ava, "id12", # in_response_to "http://lingon.catalogix.se:8087/", # consumer_url "urn:mace:example.com:saml:roland:sp", # sp_entity_id name_id=self.name_id, sign_response=False, sign_assertion=False, encrypt_assertion=False, encrypt_assertion_self_contained=True, pefim=True, encrypt_cert_advice="whatever", ) with raises(EncryptError): _resp = self.server.create_authn_response( self.ava, "id12", # in_response_to "http://lingon.catalogix.se:8087/", # consumer_url "urn:mace:example.com:saml:roland:sp", # sp_entity_id name_id=self.name_id, sign_response=False, sign_assertion=False, encrypt_assertion=True, encrypt_assertion_self_contained=True, encrypted_advice_attributes=False, encrypt_cert_assertion="whatever", ) _server = Server("idp_conf_verify_cert") with raises(CertificateError): _resp = _server.create_authn_response( self.ava, "id12", # in_response_to "http://lingon.catalogix.se:8087/", # consumer_url "urn:mace:example.com:saml:roland:sp", # sp_entity_id name_id=self.name_id, sign_response=False, sign_assertion=False, encrypt_assertion=True, encrypt_assertion_self_contained=True, pefim=True, encrypt_cert_advice="whatever", encrypt_cert_assertion="whatever", ) with raises(CertificateError): _resp = _server.create_authn_response( self.ava, "id12", # in_response_to "http://lingon.catalogix.se:8087/", # consumer_url "urn:mace:example.com:saml:roland:sp", # sp_entity_id name_id=self.name_id, sign_response=False, sign_assertion=False, encrypt_assertion=False, encrypt_assertion_self_contained=True, pefim=True, encrypt_cert_advice="whatever", ) with raises(CertificateError): _resp = _server.create_authn_response( self.ava, "id12", # in_response_to "http://lingon.catalogix.se:8087/", # consumer_url "urn:mace:example.com:saml:roland:sp", # sp_entity_id name_id=self.name_id, sign_response=False, sign_assertion=False, encrypt_assertion=True, encrypt_assertion_self_contained=True, encrypted_advice_attributes=False, encrypt_cert_assertion="whatever", ) def test_encrypted_response_9(self): _server = Server("idp_conf_sp_no_encrypt") _resp = _server.create_authn_response( self.ava, "id12", # in_response_to "http://lingon.catalogix.se:8087/", # consumer_url "urn:mace:example.com:saml:roland:sp", # sp_entity_id name_id=self.name_id, sign_response=False, sign_assertion=False, encrypt_assertion=True, encrypt_assertion_self_contained=True, pefim=True, ) self.verify_assertion(_resp.assertion.advice.assertion) _resp = _server.create_authn_response( self.ava, "id12", # in_response_to "http://lingon.catalogix.se:8087/", # consumer_url "urn:mace:example.com:saml:roland:sp", # sp_entity_id name_id=self.name_id, sign_response=False, sign_assertion=False, encrypt_assertion=False, encrypt_assertion_self_contained=True, pefim=True, ) self.verify_assertion(_resp.assertion.advice.assertion) _resp = _server.create_authn_response( self.ava, "id12", # in_response_to "http://lingon.catalogix.se:8087/", # consumer_url "urn:mace:example.com:saml:roland:sp", # sp_entity_id name_id=self.name_id, sign_response=False, sign_assertion=False, encrypt_assertion=True, encrypt_assertion_self_contained=True, encrypted_advice_attributes=False, ) self.verify_assertion([_resp.assertion]) def test_slo_http_post(self): soon = time_util.in_a_while(days=1) sinfo = { "name_id": nid, "issuer": "urn:mace:example.com:saml:roland:idp", "not_on_or_after": soon, "user": { "givenName": "Leo", "sn": "Laport", }, } self.client.users.add_information_about_person(sinfo) req_id, logout_request = self.client.create_logout_request( destination="http://localhost:8088/slop", name_id=nid, issuer_entity_id="urn:mace:example.com:saml:roland:idp", reason="I'm tired of this", ) intermed = base64.b64encode(str(logout_request).encode("utf-8")) # saml_soap = make_soap_enveloped_saml_thingy(logout_request) request = self.server.parse_logout_request(intermed, BINDING_HTTP_POST) assert request def test_slo_soap(self): soon = time_util.in_a_while(days=1) sinfo = { "name_id": nid, "issuer": "urn:mace:example.com:saml:roland:idp", "not_on_or_after": soon, "user": { "givenName": "Leo", "sn": "Laport", }, } sp = client.Saml2Client(config_file="server_conf") sp.users.add_information_about_person(sinfo) req_id, logout_request = sp.create_logout_request( name_id=nid, destination="http://localhost:8088/slo", issuer_entity_id="urn:mace:example.com:saml:roland:idp", reason="I'm tired of this", ) # _ = s_utils.deflate_and_base64_encode("%s" % (logout_request,)) saml_soap = make_soap_enveloped_saml_thingy(logout_request) self.server.ident.close() with closing(Server("idp_soap_conf")) as idp: request = idp.parse_logout_request(saml_soap) idp.ident.close() assert request # ------------------------------------------------------------------------ class TestServer1NonAsciiAva: def setup_class(self): self.server = Server("idp_conf") conf = config.SPConfig() conf.load_file("server_conf") self.client = client.Saml2Client(conf) self.name_id = self.server.ident.transient_nameid("urn:mace:example.com:saml:roland:sp", "id12") self.ava = {"givenName": ["Dave"], "sn": ["ConcepciĆ³n"], "mail": ["dave@cnr.mlb.com"], "title": "#13"} def teardown_class(self): self.server.close() def verify_assertion(self, assertion): assert assertion assert assertion[0].attribute_statement ava = get_ava(assertion[0]) assert ava == {"givenName": ["Dave"], "sn": ["ConcepciĆ³n"], "mail": ["dave@cnr.mlb.com"], "title": ["#13"]} def verify_encrypted_assertion(self, assertion, decr_text): self.verify_assertion(assertion) assert assertion[0].signature is None assert re.search( r':EncryptedAssertion>]* )*xmlns:encas[0-9]="urn:oasis:names:tc:SAML:2.0:assertion"', decr_text, ) def verify_advice_assertion(self, resp, decr_text): assert resp.assertion[0].signature is None assert resp.assertion[0].advice.encrypted_assertion[0].extension_elements assertion = extension_elements_to_elements( resp.assertion[0].advice.encrypted_assertion[0].extension_elements, [saml, samlp] ) self.verify_encrypted_assertion(assertion, decr_text) def test_issuer(self): issuer = self.server._issuer() assert isinstance(issuer, saml.Issuer) assert _eq(issuer.keyswv(), ["text", "format"]) assert issuer.format == saml.NAMEID_FORMAT_ENTITY assert issuer.text == self.server.config.entityid def test_assertion(self): assertion = s_utils.assertion_factory( subject=factory( saml.Subject, text="_aaa", name_id=factory(saml.NameID, format=saml.NAMEID_FORMAT_TRANSIENT) ), attribute_statement=do_attribute_statement( { ("", "", "sn"): ("Jeter", ""), ("", "", "givenName"): ("Derek", ""), } ), issuer=self.server._issuer(), ) assert _eq(assertion.keyswv(), ["attribute_statement", "issuer", "id", "subject", "issue_instant", "version"]) assert assertion.version == "2.0" assert assertion.issuer.text == "urn:mace:example.com:saml:roland:idp" # assert assertion.attribute_statement attribute_statement = assertion.attribute_statement assert len(attribute_statement.attribute) == 2 attr0 = attribute_statement.attribute[0] attr1 = attribute_statement.attribute[1] if attr0.attribute_value[0].text == "Derek": assert attr0.friendly_name == "givenName" assert attr1.friendly_name == "sn" assert attr1.attribute_value[0].text == "Jeter" else: assert attr1.friendly_name == "givenName" assert attr1.attribute_value[0].text == "Derek" assert attr0.friendly_name == "sn" assert attr0.attribute_value[0].text == "Jeter" # subject = assertion.subject assert _eq(subject.keyswv(), ["text", "name_id"]) assert subject.text == "_aaa" assert subject.name_id.format == saml.NAMEID_FORMAT_TRANSIENT def test_response(self): response = response_factory( in_response_to="_012345", destination="https:#www.example.com", status=s_utils.success_status_factory(), assertion=s_utils.assertion_factory( subject=factory(saml.Subject, text="_aaa", name_id=saml.NAMEID_FORMAT_TRANSIENT), attribute_statement=do_attribute_statement( { ("", "", "sn"): ("Jeter", ""), ("", "", "givenName"): ("Derek", ""), } ), issuer=self.server._issuer(), ), issuer=self.server._issuer(), ) print(response.keyswv()) assert _eq( response.keyswv(), ["destination", "assertion", "status", "in_response_to", "issue_instant", "version", "issuer", "id"], ) assert response.version == "2.0" assert response.issuer.text == "urn:mace:example.com:saml:roland:idp" assert response.destination == "https:#www.example.com" assert response.in_response_to == "_012345" # status = response.status print(status) assert status.status_code.value == samlp.STATUS_SUCCESS def test_parse_faulty_request(self): req_id, authn_request = self.client.create_authn_request(destination="http://www.example.com", id="id1") # should raise an error because faulty spentityid binding = BINDING_HTTP_REDIRECT htargs = self.client.apply_binding(binding, f"{authn_request}", "http://www.example.com", "abcd") _dict = parse_qs(htargs["headers"][0][1].split("?")[1]) print(_dict) with raises(OtherError): self.server.parse_authn_request(_dict["SAMLRequest"][0], binding) def test_parse_faulty_request_to_err_status(self): req_id, authn_request = self.client.create_authn_request(destination="http://www.example.com") binding = BINDING_HTTP_REDIRECT htargs = self.client.apply_binding(binding, f"{authn_request}", "http://www.example.com", "abcd") _dict = parse_qs(htargs["headers"][0][1].split("?")[1]) print(_dict) try: self.server.parse_authn_request(_dict["SAMLRequest"][0], binding) status = None except OtherError as oe: print(oe.args) status = s_utils.error_status_factory(oe) assert status print(status) assert _eq(status.keyswv(), ["status_code", "status_message"]) assert status.status_message.text == "Not destined for me!" status_code = status.status_code assert _eq(status_code.keyswv(), ["status_code", "value"]) assert status_code.value == samlp.STATUS_RESPONDER assert status_code.status_code.value == samlp.STATUS_UNKNOWN_PRINCIPAL def test_parse_ok_request(self): req_id, authn_request = self.client.create_authn_request( message_id="id1", destination="http://localhost:8088/sso", nameid_format=saml.NAMEID_FORMAT_TRANSIENT, ) print(authn_request) binding = BINDING_HTTP_REDIRECT htargs = self.client.apply_binding(binding, f"{authn_request}", "http://www.example.com", "abcd") _dict = parse_qs(htargs["headers"][0][1].split("?")[1]) print(_dict) req = self.server.parse_authn_request(_dict["SAMLRequest"][0], binding) # returns a dictionary print(req) resp_args = self.server.response_args(req.message, [BINDING_HTTP_POST]) assert resp_args["destination"] == "http://lingon.catalogix.se:8087/" assert resp_args["in_response_to"] == "id1" name_id_policy = resp_args["name_id_policy"] assert _eq(name_id_policy.keyswv(), ["format"]) assert name_id_policy.format == saml.NAMEID_FORMAT_TRANSIENT assert resp_args["sp_entity_id"] == "urn:mace:example.com:saml:roland:sp" def test_sso_response_with_identity(self): name_id = self.server.ident.transient_nameid("https://example.com/sp", "id12") resp = self.server.create_authn_response( { "eduPersonEntitlement": "Short stop", "sn": "Jeter", "givenName": "Derek", "mail": "derek.jeter@nyy.mlb.com", "title": "The man", }, "id12", # in_response_to "http://localhost:8087/", # destination "https://example.com/sp", # sp_entity_id name_id=name_id, authn=AUTHN, ) print(resp.keyswv()) assert _eq( resp.keyswv(), ["status", "destination", "assertion", "in_response_to", "issue_instant", "version", "id", "issuer"], ) assert resp.destination == "http://localhost:8087/" assert resp.in_response_to == "id12" assert resp.status assert resp.status.status_code.value == samlp.STATUS_SUCCESS assert resp.assertion assertion = resp.assertion print(assertion) assert assertion.authn_statement assert assertion.conditions assert assertion.attribute_statement attribute_statement = assertion.attribute_statement print(attribute_statement) assert len(attribute_statement[0].attribute) == 4 # Pick out one attribute attr = None for attr in attribute_statement[0].attribute: if attr.friendly_name == "givenName": break assert len(attr.attribute_value) == 1 assert attr.name == "urn:mace:dir:attribute-def:givenName" assert attr.name_format == "urn:oasis:names:tc:SAML:2.0:attrname-format:basic" value = attr.attribute_value[0] assert value.text.strip() == "Derek" assert value.get_type() == "xs:string" assert assertion.subject assert assertion.subject.name_id assert assertion.subject.subject_confirmation confirmation = assertion.subject.subject_confirmation[0] print(confirmation.keyswv()) print(confirmation.subject_confirmation_data) assert confirmation.subject_confirmation_data.in_response_to == "id12" def test_sso_response_without_identity(self): resp = self.server.create_authn_response( {}, "id12", # in_response_to "http://localhost:8087/", # consumer_url "urn:mace:example.com:saml:roland:sp", # sp_entity_id userid="USER1", authn=AUTHN, release_policy=Policy(), best_effort=True, ) print(resp.keyswv()) assert _eq( resp.keyswv(), ["status", "destination", "in_response_to", "issue_instant", "version", "id", "issuer", "assertion"], ) assert resp.destination == "http://localhost:8087/" assert resp.in_response_to == "id12" assert resp.status assert resp.status.status_code.value == samlp.STATUS_SUCCESS assert resp.issuer.text == "urn:mace:example.com:saml:roland:idp" assert not resp.assertion.attribute_statement def test_sso_response_specific_instant(self): _authn = AUTHN.copy() _authn["authn_instant"] = 1234567890 resp = self.server.create_authn_response( {}, "id12", # in_response_to "http://localhost:8087/", # consumer_url "urn:mace:example.com:saml:roland:sp", # sp_entity_id userid="USER1", authn=_authn, best_effort=True, ) print(resp.keyswv()) assert _eq( resp.keyswv(), ["status", "destination", "in_response_to", "issue_instant", "version", "id", "issuer", "assertion"], ) authn_statement = resp.assertion.authn_statement[0] assert authn_statement.authn_instant == "2009-02-13T23:31:30Z" def test_sso_failure_response(self): exc = s_utils.MissingValue("eduPersonAffiliation missing") resp = self.server.create_error_response("id12", "http://localhost:8087/", exc) print(resp.keyswv()) assert _eq( resp.keyswv(), ["status", "destination", "in_response_to", "issue_instant", "version", "id", "issuer"] ) assert resp.destination == "http://localhost:8087/" assert resp.in_response_to == "id12" assert resp.status print(resp.status) assert resp.status.status_code.value == samlp.STATUS_RESPONDER assert resp.status.status_code.status_code.value == samlp.STATUS_REQUEST_UNSUPPORTED assert resp.status.status_message.text == "eduPersonAffiliation missing" assert resp.issuer.text == "urn:mace:example.com:saml:roland:idp" assert not resp.assertion def test_authn_response_0(self): conf = config.SPConfig() conf.load_file("server_conf") self.client = client.Saml2Client(conf) ava = {"givenName": ["Derek"], "sn": ["Jeter"], "mail": ["derek@nyy.mlb.com"], "title": "The man"} npolicy = samlp.NameIDPolicy(format=saml.NAMEID_FORMAT_TRANSIENT, allow_create="true") resp_str = "%s" % self.server.create_authn_response( ava, "id1", "http://local:8087/", "urn:mace:example.com:saml:roland:sp", npolicy, "foba0001@example.com", authn=AUTHN, ) response = samlp.response_from_string(resp_str) print(response.keyswv()) assert _eq( response.keyswv(), ["status", "destination", "assertion", "in_response_to", "issue_instant", "version", "issuer", "id"], ) print(response.assertion[0].keyswv()) assert len(response.assertion) == 1 assert _eq( response.assertion[0].keyswv(), [ "attribute_statement", "issue_instant", "version", "subject", "conditions", "id", "issuer", "authn_statement", ], ) assertion = response.assertion[0] assert len(assertion.attribute_statement) == 1 astate = assertion.attribute_statement[0] print(astate) assert len(astate.attribute) == 4 def test_signed_response(self): name_id = self.server.ident.transient_nameid("urn:mace:example.com:saml:roland:sp", "id12") ava = {"givenName": ["Derek"], "sn": ["Jeter"], "mail": ["derek@nyy.mlb.com"], "title": "The man"} signed_resp = self.server.create_authn_response( ava, "id12", # in_response_to "http://lingon.catalogix.se:8087/", # consumer_url "urn:mace:example.com:saml:roland:sp", # sp_entity_id name_id=name_id, sign_assertion=True, ) print(signed_resp) assert signed_resp sresponse = response_from_string(signed_resp) # It's the assertions that are signed not the response per se assert len(sresponse.assertion) == 1 assertion = sresponse.assertion[0] # Since the reponse is created dynamically I don't know the signature # value. Just that there should be one assert assertion.signature.signature_value.text != "" def test_signed_response_1(self): signed_resp = self.server.create_authn_response( self.ava, "id12", # in_response_to "http://lingon.catalogix.se:8087/", # consumer_url "urn:mace:example.com:saml:roland:sp", # sp_entity_id name_id=self.name_id, sign_response=True, sign_assertion=True, ) sresponse = response_from_string(signed_resp) valid = self.server.sec.verify_signature( signed_resp, self.server.config.cert_file, node_name="urn:oasis:names:tc:SAML:2.0:protocol:Response", node_id=sresponse.id, ) assert valid valid = self.server.sec.verify_signature( signed_resp, self.server.config.cert_file, node_name="urn:oasis:names:tc:SAML:2.0:assertion:Assertion", node_id=sresponse.assertion[0].id, ) assert valid self.verify_assertion(sresponse.assertion) def test_signed_response_2(self): signed_resp = self.server.create_authn_response( self.ava, "id12", # in_response_to "http://lingon.catalogix.se:8087/", # consumer_url "urn:mace:example.com:saml:roland:sp", # sp_entity_id name_id=self.name_id, sign_response=True, sign_assertion=False, ) sresponse = response_from_string(signed_resp) valid = self.server.sec.verify_signature( signed_resp, self.server.config.cert_file, node_name="urn:oasis:names:tc:SAML:2.0:protocol:Response", node_id=sresponse.id, ) assert valid assert sresponse.assertion[0].signature == None def test_signed_response_3(self): signed_resp = self.server.create_authn_response( self.ava, "id12", # in_response_to "http://lingon.catalogix.se:8087/", # consumer_url "urn:mace:example.com:saml:roland:sp", # sp_entity_id name_id=self.name_id, sign_response=False, sign_assertion=True, ) sresponse = response_from_string(signed_resp) assert sresponse.signature == None valid = self.server.sec.verify_signature( signed_resp, self.server.config.cert_file, node_name="urn:oasis:names:tc:SAML:2.0:assertion:Assertion", node_id=sresponse.assertion[0].id, ) assert valid self.verify_assertion(sresponse.assertion) def test_encrypted_signed_response_1(self): cert_str, cert_key_str = generate_cert() signed_resp = self.server.create_authn_response( self.ava, "id12", # in_response_to "http://lingon.catalogix.se:8087/", # consumer_url "urn:mace:example.com:saml:roland:sp", # sp_entity_id name_id=self.name_id, sign_response=True, sign_assertion=True, encrypt_assertion=False, encrypt_assertion_self_contained=True, pefim=True, encrypt_cert_advice=cert_str, ) sresponse = response_from_string(signed_resp) valid = self.server.sec.verify_signature( signed_resp, self.server.config.cert_file, node_name="urn:oasis:names:tc:SAML:2.0:protocol:Response", node_id=sresponse.id, ) assert valid valid = self.server.sec.verify_signature( signed_resp, self.server.config.cert_file, node_name="urn:oasis:names:tc:SAML:2.0:assertion:Assertion", node_id=sresponse.assertion[0].id, ) assert valid key_fd = make_temp(cert_key_str, decode=False) decr_text = self.server.sec.decrypt(signed_resp, key_fd.name) resp = samlp.response_from_string(decr_text) assert resp.assertion[0].advice.encrypted_assertion[0].extension_elements assertion = extension_elements_to_elements( resp.assertion[0].advice.encrypted_assertion[0].extension_elements, [saml, samlp] ) self.verify_assertion(assertion) # PEFIM never signs assertions. assert assertion[0].signature is None # valid = self.server.sec.verify_signature(decr_text, # self.server.config.cert_file, # node_name='urn:oasis:names:tc:SAML:2.0:assertion:Assertion', # node_id=assertion[0].id) assert valid def test_encrypted_signed_response_2(self): cert_str, cert_key_str = generate_cert() signed_resp = self.server.create_authn_response( self.ava, "id12", # in_response_to "http://lingon.catalogix.se:8087/", # consumer_url "urn:mace:example.com:saml:roland:sp", # sp_entity_id name_id=self.name_id, sign_response=True, sign_assertion=False, encrypt_assertion=True, encrypt_assertion_self_contained=True, ) sresponse = response_from_string(signed_resp) valid = self.server.sec.verify_signature( signed_resp, self.server.config.cert_file, node_name="urn:oasis:names:tc:SAML:2.0:protocol:Response", node_id=sresponse.id, ) assert valid decr_text_old = copy.deepcopy(f"{signed_resp}") with raises(DecryptError): decr_text = self.server.sec.decrypt( signed_resp, self.client.config.encryption_keypairs[0]["key_file"], ) decr_text = self.server.sec.decrypt(signed_resp, self.client.config.encryption_keypairs[1]["key_file"]) assert decr_text != decr_text_old resp = samlp.response_from_string(decr_text) resp.assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp]) assert resp.assertion[0].signature == None self.verify_assertion(resp.assertion) def test_encrypted_signed_response_3(self): cert_str, cert_key_str = generate_cert() signed_resp = self.server.create_authn_response( self.ava, "id12", # in_response_to "http://lingon.catalogix.se:8087/", # consumer_url "urn:mace:example.com:saml:roland:sp", # sp_entity_id name_id=self.name_id, sign_response=True, sign_assertion=True, encrypt_assertion=True, encrypt_assertion_self_contained=False, encrypt_cert_assertion=cert_str, ) sresponse = response_from_string(signed_resp) valid = self.server.sec.verify_signature( signed_resp, self.server.config.cert_file, node_name="urn:oasis:names:tc:SAML:2.0:protocol:Response", node_id=sresponse.id, ) assert valid key_fd = make_temp(cert_key_str, decode=False) decr_text = self.server.sec.decrypt(signed_resp, key_fd.name) resp = samlp.response_from_string(decr_text) resp.assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp]) valid = self.server.sec.verify_signature( decr_text, self.server.config.cert_file, node_name="urn:oasis:names:tc:SAML:2.0:assertion:Assertion", node_id=resp.assertion[0].id, ) assert valid self.verify_assertion(resp.assertion) assert "xmlns:encas" not in decr_text def test_encrypted_signed_response_4(self): cert_str, cert_key_str = generate_cert() signed_resp = self.server.create_authn_response( self.ava, "id12", # in_response_to "http://lingon.catalogix.se:8087/", # consumer_url "urn:mace:example.com:saml:roland:sp", # sp_entity_id name_id=self.name_id, sign_response=True, sign_assertion=True, encrypt_assertion=True, encrypt_assertion_self_contained=True, pefim=True, encrypt_cert_advice=cert_str, ) sresponse = response_from_string(signed_resp) valid = self.server.sec.verify_signature( signed_resp, self.server.config.cert_file, node_name="urn:oasis:names:tc:SAML:2.0:protocol:Response", node_id=sresponse.id, ) assert valid decr_text = self.server.sec.decrypt(signed_resp, self.client.config.encryption_keypairs[1]["key_file"]) resp = samlp.response_from_string(decr_text) resp.assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp]) valid = self.server.sec.verify_signature( decr_text, self.server.config.cert_file, node_name="urn:oasis:names:tc:SAML:2.0:assertion:Assertion", node_id=resp.assertion[0].id, ) assert valid key_fd = make_temp(cert_key_str, decode=False) decr_text = self.server.sec.decrypt(decr_text, key_fd.name) resp = samlp.response_from_string(decr_text) assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp]) assertion = extension_elements_to_elements( assertion[0].advice.encrypted_assertion[0].extension_elements, [saml, samlp] ) self.verify_assertion(assertion) # PEFIM never signs assertion in advice assert assertion[0].signature is None # valid = self.server.sec.verify_signature(decr_text, # self.server.config.cert_file, # node_name='urn:oasis:names:tc:SAML:2.0:assertion:Assertion', # node_id=assertion[0].id) assert valid def test_encrypted_response_1(self): cert_str_advice, cert_key_str_advice = generate_cert() _resp = self.server.create_authn_response( self.ava, "id12", # in_response_to "http://lingon.catalogix.se:8087/", # consumer_url "urn:mace:example.com:saml:roland:sp", # sp_entity_id name_id=self.name_id, sign_response=False, sign_assertion=False, encrypt_assertion=False, encrypt_assertion_self_contained=True, pefim=True, encrypt_cert_advice=cert_str_advice, ) _resp = f"{_resp}" sresponse = response_from_string(_resp) assert sresponse.signature is None key_fd = make_temp(cert_key_str_advice, decode=False) decr_text = self.server.sec.decrypt(_resp, key_fd.name) resp = samlp.response_from_string(decr_text) self.verify_advice_assertion(resp, decr_text) def test_encrypted_response_2(self): cert_str_advice, cert_key_str_advice = generate_cert() _resp = self.server.create_authn_response( self.ava, "id12", # in_response_to "http://lingon.catalogix.se:8087/", # consumer_url "urn:mace:example.com:saml:roland:sp", # sp_entity_id name_id=self.name_id, sign_response=False, sign_assertion=False, encrypt_assertion=True, encrypt_assertion_self_contained=True, pefim=True, encrypt_cert_advice=cert_str_advice, ) sresponse = response_from_string(_resp) assert sresponse.signature is None decr_text_1 = self.server.sec.decrypt(_resp, self.client.config.encryption_keypairs[1]["key_file"]) key_fd = make_temp(cert_key_str_advice, decode=False) decr_text_2 = self.server.sec.decrypt(decr_text_1, key_fd.name) resp = samlp.response_from_string(decr_text_2) resp.assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp]) self.verify_advice_assertion(resp, decr_text_2) def test_encrypted_response_3(self): cert_str_assertion, cert_key_str_assertion = generate_cert() _resp = self.server.create_authn_response( self.ava, "id12", # in_response_to "http://lingon.catalogix.se:8087/", # consumer_url "urn:mace:example.com:saml:roland:sp", # sp_entity_id name_id=self.name_id, sign_response=False, sign_assertion=False, encrypt_assertion=True, encrypt_assertion_self_contained=True, encrypted_advice_attributes=False, encrypt_cert_assertion=cert_str_assertion, ) sresponse = response_from_string(_resp) assert sresponse.signature is None key_fd = make_temp(cert_key_str_assertion, decode=False) decr_text = self.server.sec.decrypt(_resp, key_fd.name) resp = samlp.response_from_string(decr_text) assert resp.encrypted_assertion[0].extension_elements assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp]) self.verify_encrypted_assertion(assertion, decr_text) def test_encrypted_response_4(self): _resp = self.server.create_authn_response( self.ava, "id12", # in_response_to "http://lingon.catalogix.se:8087/", # consumer_url "urn:mace:example.com:saml:roland:sp", # sp_entity_id name_id=self.name_id, sign_response=False, sign_assertion=False, encrypt_assertion=True, encrypt_assertion_self_contained=True, encrypted_advice_attributes=False, ) sresponse = response_from_string(_resp) assert sresponse.signature is None decr_text = self.server.sec.decrypt(_resp, self.client.config.encryption_keypairs[1]["key_file"]) resp = samlp.response_from_string(decr_text) assert resp.encrypted_assertion[0].extension_elements assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp]) self.verify_encrypted_assertion(assertion, decr_text) def test_encrypted_response_5(self): _resp = self.server.create_authn_response( self.ava, "id12", # in_response_to "http://lingon.catalogix.se:8087/", # consumer_url "urn:mace:example.com:saml:roland:sp", # sp_entity_id name_id=self.name_id, sign_response=False, sign_assertion=False, encrypt_assertion=False, encrypt_assertion_self_contained=True, pefim=True, ) _resp = f"{_resp}" sresponse = response_from_string(_resp) assert sresponse.signature is None decr_text = self.server.sec.decrypt(_resp, self.client.config.encryption_keypairs[1]["key_file"]) resp = samlp.response_from_string(decr_text) self.verify_advice_assertion(resp, decr_text) def test_encrypted_response_6(self): _server = Server("idp_conf_verify_cert") cert_str_advice, cert_key_str_advice = generate_cert() cert_str_assertion, cert_key_str_assertion = generate_cert() _resp = _server.create_authn_response( self.ava, "id12", # in_response_to "http://lingon.catalogix.se:8087/", # consumer_url "urn:mace:example.com:saml:roland:sp", # sp_entity_id name_id=self.name_id, sign_response=False, sign_assertion=False, encrypt_assertion=True, encrypt_assertion_self_contained=True, pefim=True, encrypt_cert_advice=cert_str_advice, encrypt_cert_assertion=cert_str_assertion, ) sresponse = response_from_string(_resp) assert sresponse.signature is None key_fd1 = make_temp(cert_key_str_assertion, decode=False) decr_text_1 = _server.sec.decrypt(_resp, key_fd1.name) key_fd2 = make_temp(cert_key_str_advice, decode=False) decr_text_2 = _server.sec.decrypt(decr_text_1, key_fd2.name) resp = samlp.response_from_string(decr_text_2) resp.assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp]) self.verify_advice_assertion(resp, decr_text_2) def test_encrypted_response_7(self): _resp = self.server.create_authn_response( self.ava, "id12", # in_response_to "http://lingon.catalogix.se:8087/", # consumer_url "urn:mace:example.com:saml:roland:sp", # sp_entity_id name_id=self.name_id, sign_response=False, sign_assertion=False, encrypt_assertion=True, encrypt_assertion_self_contained=True, pefim=True, ) sresponse = response_from_string(_resp) assert sresponse.signature is None decr_text_1 = self.server.sec.decrypt(_resp, self.client.config.encryption_keypairs[1]["key_file"]) decr_text_2 = self.server.sec.decrypt(decr_text_1, self.client.config.encryption_keypairs[1]["key_file"]) resp = samlp.response_from_string(decr_text_2) resp.assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp]) self.verify_advice_assertion(resp, decr_text_2) def test_encrypted_response_8(self): try: _resp = self.server.create_authn_response( self.ava, "id12", # in_response_to "http://lingon.catalogix.se:8087/", # consumer_url "urn:mace:example.com:saml:roland:sp", # sp_entity_id name_id=self.name_id, sign_response=False, sign_assertion=False, encrypt_assertion=True, encrypt_assertion_self_contained=True, pefim=True, encrypt_cert_advice="whatever", encrypt_cert_assertion="whatever", ) assert False, "Must throw an exception" except EncryptError as ex: pass except Exception as ex: assert False, "Wrong exception!" try: _resp = self.server.create_authn_response( self.ava, "id12", # in_response_to "http://lingon.catalogix.se:8087/", # consumer_url "urn:mace:example.com:saml:roland:sp", # sp_entity_id name_id=self.name_id, sign_response=False, sign_assertion=False, encrypt_assertion=False, encrypt_assertion_self_contained=True, pefim=True, encrypt_cert_advice="whatever", ) assert False, "Must throw an exception" except EncryptError as ex: pass except Exception as ex: assert False, "Wrong exception!" try: _resp = self.server.create_authn_response( self.ava, "id12", # in_response_to "http://lingon.catalogix.se:8087/", # consumer_url "urn:mace:example.com:saml:roland:sp", # sp_entity_id name_id=self.name_id, sign_response=False, sign_assertion=False, encrypt_assertion=True, encrypt_assertion_self_contained=True, encrypted_advice_attributes=False, encrypt_cert_assertion="whatever", ) assert False, "Must throw an exception" except EncryptError as ex: pass except Exception as ex: assert False, "Wrong exception!" _server = Server("idp_conf_verify_cert") try: _resp = _server.create_authn_response( self.ava, "id12", # in_response_to "http://lingon.catalogix.se:8087/", # consumer_url "urn:mace:example.com:saml:roland:sp", # sp_entity_id name_id=self.name_id, sign_response=False, sign_assertion=False, encrypt_assertion=True, encrypt_assertion_self_contained=True, pefim=True, encrypt_cert_advice="whatever", encrypt_cert_assertion="whatever", ) assert False, "Must throw an exception" except CertificateError as ex: pass except Exception as ex: assert False, "Wrong exception!" try: _resp = _server.create_authn_response( self.ava, "id12", # in_response_to "http://lingon.catalogix.se:8087/", # consumer_url "urn:mace:example.com:saml:roland:sp", # sp_entity_id name_id=self.name_id, sign_response=False, sign_assertion=False, encrypt_assertion=False, encrypt_assertion_self_contained=True, pefim=True, encrypt_cert_advice="whatever", ) assert False, "Must throw an exception" except CertificateError as ex: pass except Exception as ex: assert False, "Wrong exception!" try: _resp = _server.create_authn_response( self.ava, "id12", # in_response_to "http://lingon.catalogix.se:8087/", # consumer_url "urn:mace:example.com:saml:roland:sp", # sp_entity_id name_id=self.name_id, sign_response=False, sign_assertion=False, encrypt_assertion=True, encrypt_assertion_self_contained=True, encrypted_advice_attributes=False, encrypt_cert_assertion="whatever", ) assert False, "Must throw an exception" except CertificateError as ex: pass except Exception as ex: assert False, "Wrong exception!" def test_encrypted_response_9(self): _server = Server("idp_conf_sp_no_encrypt") _resp = _server.create_authn_response( self.ava, "id12", # in_response_to "http://lingon.catalogix.se:8087/", # consumer_url "urn:mace:example.com:saml:roland:sp", # sp_entity_id name_id=self.name_id, sign_response=False, sign_assertion=False, encrypt_assertion=True, encrypt_assertion_self_contained=True, pefim=True, ) self.verify_assertion(_resp.assertion.advice.assertion) _resp = _server.create_authn_response( self.ava, "id12", # in_response_to "http://lingon.catalogix.se:8087/", # consumer_url "urn:mace:example.com:saml:roland:sp", # sp_entity_id name_id=self.name_id, sign_response=False, sign_assertion=False, encrypt_assertion=False, encrypt_assertion_self_contained=True, pefim=True, ) self.verify_assertion(_resp.assertion.advice.assertion) _resp = _server.create_authn_response( self.ava, "id12", # in_response_to "http://lingon.catalogix.se:8087/", # consumer_url "urn:mace:example.com:saml:roland:sp", # sp_entity_id name_id=self.name_id, sign_response=False, sign_assertion=False, encrypt_assertion=True, encrypt_assertion_self_contained=True, encrypted_advice_attributes=False, ) self.verify_assertion([_resp.assertion]) def test_slo_http_post(self): soon = time_util.in_a_while(days=1) sinfo = { "name_id": nid, "issuer": "urn:mace:example.com:saml:roland:idp", "not_on_or_after": soon, "user": { "givenName": "Leo", "sn": "Laport", }, } self.client.users.add_information_about_person(sinfo) req_id, logout_request = self.client.create_logout_request( destination="http://localhost:8088/slop", name_id=nid, issuer_entity_id="urn:mace:example.com:saml:roland:idp", reason="I'm tired of this", ) intermed = base64.b64encode(str(logout_request).encode("utf-8")) # saml_soap = make_soap_enveloped_saml_thingy(logout_request) request = self.server.parse_logout_request(intermed, BINDING_HTTP_POST) assert request def test_slo_soap(self): soon = time_util.in_a_while(days=1) sinfo = { "name_id": nid, "issuer": "urn:mace:example.com:saml:roland:idp", "not_on_or_after": soon, "user": { "givenName": "Leo", "sn": "Laport", }, } sp = client.Saml2Client(config_file="server_conf") sp.users.add_information_about_person(sinfo) req_id, logout_request = sp.create_logout_request( name_id=nid, destination="http://localhost:8088/slo", issuer_entity_id="urn:mace:example.com:saml:roland:idp", reason="I'm tired of this", ) # _ = s_utils.deflate_and_base64_encode("%s" % (logout_request,)) saml_soap = make_soap_enveloped_saml_thingy(logout_request) self.server.ident.close() with closing(Server("idp_soap_conf")) as idp: request = idp.parse_logout_request(saml_soap) assert request idp.config.setattr("idp", "want_authn_requests_signed", True) assert idp.config.getattr("want_authn_requests_signed", "idp") with raises(IncorrectlySigned): # check unsigned requests over SOAP to fail request = idp.parse_logout_request(saml_soap) assert not request idp.ident.close() def test_slo_soap_signed(self): soon = time_util.in_a_while(days=1) sinfo = { "name_id": nid, "issuer": "urn:mace:example.com:saml:roland:idp", "not_on_or_after": soon, "user": { "givenName": "Leo", "sn": "Laport", }, } sp = client.Saml2Client(config_file="server_conf") sp.users.add_information_about_person(sinfo) req_id, logout_request = sp.create_logout_request( name_id=nid, destination="http://localhost:8088/slo", issuer_entity_id="urn:mace:example.com:saml:roland:idp", reason="I'm tired of this", sign=True, sign_alg=ds.SIG_RSA_SHA512, digest_alg=ds.DIGEST_SHA512, ) saml_soap = sp.apply_binding(BINDING_SOAP, logout_request, sign=False) saml_soap = saml_soap["data"] self.server.ident.close() with closing(Server("idp_soap_conf")) as idp: idp.config.setattr("idp", "want_authn_requests_signed", True) assert idp.config.getattr("want_authn_requests_signed", "idp") with raises(IncorrectlySigned): # idp_soap_conf has invalid certificate for sp request = idp.parse_logout_request(saml_soap) assert not request idp.ident.close() with closing(Server("idp_conf_verify_cert")) as idp: idp.config.setattr("idp", "want_authn_requests_signed", True) assert idp.config.getattr("want_authn_requests_signed", "idp") request = idp.parse_logout_request(saml_soap) idp.ident.close() assert request # ------------------------------------------------------------------------ IDENTITY = { "eduPersonAffiliation": ["staff", "member"], "sn": ["Jeter"], "givenName": ["Derek"], "mail": ["foo@gmail.com"], "title": "The man", } class TestServer2: def setup_class(self): self.server = Server("restrictive_idp_conf") def teardown_class(self): self.server.close() def test_do_attribute_reponse(self): aa_policy = self.server.config.getattr("policy", "idp") print(aa_policy.__dict__) response = self.server.create_attribute_response( IDENTITY.copy(), "aaa", "http://example.com/sp/", "http://www.example.com/roland/sp" ) assert response is not None assert response.destination == "http://example.com/sp/" assert response.in_response_to == "aaa" assert response.version == "2.0" assert response.issuer.text == "urn:mace:example.com:saml:roland:idpr" assert response.status.status_code.value == samlp.STATUS_SUCCESS assert response.assertion assertion = response.assertion assert assertion.version == "2.0" subject = assertion.subject # assert subject.name_id.format == saml.NAMEID_FORMAT_TRANSIENT assert subject.subject_confirmation subject_conf = subject.subject_confirmation[0] assert subject_conf.subject_confirmation_data.in_response_to == "aaa" def _logout_request(conf_file): conf = config.SPConfig() conf.load_file(conf_file) sp = client.Saml2Client(conf) soon = time_util.in_a_while(days=1) sinfo = { "name_id": nid, "issuer": "urn:mace:example.com:saml:roland:idp", "not_on_or_after": soon, "user": { "givenName": "Leo", "sn": "Laport", }, } sp.users.add_information_about_person(sinfo) return sp.create_logout_request( name_id=nid, destination="http://localhost:8088/slo", issuer_entity_id="urn:mace:example.com:saml:roland:idp", reason="I'm tired of this", ) class TestServerLogout: def test_1(self): with closing(Server("idp_slo_redirect_conf")) as server: req_id, request = _logout_request("sp_slo_redirect_conf") print(request) bindings = [BINDING_HTTP_REDIRECT] response = server.create_logout_response(request, bindings) binding, destination = server.pick_binding("single_logout_service", bindings, "spsso", request) http_args = server.apply_binding(binding, f"{response}", destination, "relay_state", response=True) assert len(http_args) == 5 assert http_args["headers"][0][0] == "Location" assert http_args["data"] == [] assert http_args["status"] == 303 assert http_args["url"] == "http://lingon.catalogix.se:8087/sloresp" def test_2(self): with closing(Server("idp_slo_redirect_conf")) as server: req_id, request = _logout_request("sp_slo_redirect_conf") print(request) bindings = [BINDING_HTTP_POST] response = server.create_logout_response(request, bindings) binding, destination = server.pick_binding("single_logout_service", bindings, "spsso", request) http_args = server.apply_binding(binding, f"{response}", destination, "relay_state", response=True) assert len(http_args) == 5 assert len(http_args["data"]) > 0 assert http_args["method"] == "POST" assert http_args["url"] == "http://lingon.catalogix.se:8087/slo" assert http_args["status"] == 200 if __name__ == "__main__": ts = TestServer1() ts.setup_class() ts.test_encrypted_signed_response_1()