From 47c61dfdc4bdb4a58a0b308ad2a7a0786f42de0e Mon Sep 17 00:00:00 2001 From: Johan Lundberg Date: Thu, 4 Oct 2018 15:00:56 +0200 Subject: Duplicated tests and added non ascii characters --- tests/test_40_sigver.py | 424 ++++++++++++++ tests/test_50_server.py | 1105 +++++++++++++++++++++++++++++++++- tests/test_51_client.py | 1500 ++++++++++++++++++++++++++++++++++++++++++++--- 3 files changed, 2953 insertions(+), 76 deletions(-) diff --git a/tests/test_40_sigver.py b/tests/test_40_sigver.py index f975b5ea..ba5cf639 100644 --- a/tests/test_40_sigver.py +++ b/tests/test_40_sigver.py @@ -1,4 +1,5 @@ #!/usr/bin/env python +# -*- coding: utf-8 -*- import base64 from saml2.xmldsig import SIG_RSA_SHA256 @@ -420,6 +421,300 @@ class TestSecurity(): s_response, response2, class_name(response2)) +class TestSecurityNonAsciiAva(): + def setup_class(self): + # This would be one way to initialize the security context : + # + # conf = config.SPConfig() + # conf.load_file("server_conf") + # conf.only_use_keys_in_metadata = False + # + # but instead, FakeConfig() is used to really only use the minimal + # set of parameters needed for these test cases. Other test cases + # (TestSecurityMetadata below) excersise the SPConfig() mechanism. + # + conf = FakeConfig() + self.sec = sigver.security_context(conf) + + self._assertion = factory( + saml.Assertion, + version="2.0", + id="11111", + issue_instant="2009-10-30T13:20:28Z", + signature=sigver.pre_signature_part("11111", self.sec.my_cert, 1), + attribute_statement=do_attribute_statement({ + ("", "", "surName"): ("Föö", ""), + ("", "", "givenName"): ("Bär", ""), + }) + ) + + def test_verify_1(self): + with open(SIGNED) as fp: + xml_response = fp.read() + response = self.sec.correctly_signed_response(xml_response) + assert response + + def test_non_verify_1(self): + """ unsigned is OK """ + with open(UNSIGNED) as fp: + xml_response = fp.read() + response = self.sec.correctly_signed_response(xml_response) + assert response + + def test_sign_assertion(self): + ass = self._assertion + print(ass) + sign_ass = self.sec.sign_assertion("%s" % ass, node_id=ass.id) + #print(sign_ass) + sass = saml.assertion_from_string(sign_ass) + #print(sass) + assert _eq(sass.keyswv(), ['attribute_statement', 'issue_instant', + 'version', 'signature', 'id']) + assert sass.version == "2.0" + assert sass.id == "11111" + assert time_util.str_to_time(sass.issue_instant) + + print("Crypto version : %s" % (self.sec.crypto.version())) + + item = self.sec.check_signature(sass, class_name(sass), sign_ass) + + assert isinstance(item, saml.Assertion) + + def test_multiple_signatures_assertion(self): + ass = self._assertion + # basic test with two of the same + to_sign = [(ass, ass.id, ''), + (ass, ass.id, '') + ] + sign_ass = self.sec.multiple_signatures("%s" % ass, to_sign) + sass = saml.assertion_from_string(sign_ass) + assert _eq(sass.keyswv(), ['attribute_statement', 'issue_instant', + 'version', 'signature', 'id']) + assert sass.version == "2.0" + assert sass.id == "11111" + assert time_util.str_to_time(sass.issue_instant) + + print("Crypto version : %s" % (self.sec.crypto.version())) + + item = self.sec.check_signature(sass, class_name(sass), + sign_ass, must=True) + + assert isinstance(item, saml.Assertion) + + def test_multiple_signatures_response(self): + response = factory(samlp.Response, + assertion=self._assertion, + id="22222", + signature=sigver.pre_signature_part( + "22222", self.sec.my_cert)) + + # order is important, we can't validate if the signatures are made + # in the reverse order + to_sign = [(self._assertion, self._assertion.id, ''), + (response, response.id, '')] + + s_response = self.sec.multiple_signatures("%s" % response, to_sign) + assert s_response is not None + response = response_from_string(s_response) + + item = self.sec.check_signature(response, class_name(response), + s_response, must=True) + assert item == response + assert item.id == "22222" + + s_assertion = item.assertion[0] + assert isinstance(s_assertion, saml.Assertion) + # make sure the assertion was modified when we supposedly signed it + assert s_assertion != self._assertion + + ci = "".join(sigver.cert_from_instance(s_assertion)[0].split()) + assert ci == self.sec.my_cert + + res = self.sec.check_signature(s_assertion, class_name(s_assertion), + s_response, must=True) + assert res == s_assertion + assert s_assertion.id == "11111" + assert s_assertion.version == "2.0" + assert _eq(s_assertion.keyswv(), ['attribute_statement', + 'issue_instant', + 'version', 'signature', 'id']) + + def test_sign_response(self): + response = factory(samlp.Response, + assertion=self._assertion, + id="22222", + signature=sigver.pre_signature_part("22222", + self.sec + .my_cert)) + + to_sign = [(class_name(self._assertion), self._assertion.id), + (class_name(response), response.id)] + s_response = sigver.signed_instance_factory(response, self.sec, to_sign) + + assert s_response is not None + print(s_response) + response = response_from_string(s_response) + sass = response.assertion[0] + + print(sass) + assert _eq(sass.keyswv(), ['attribute_statement', 'issue_instant', + 'version', 'signature', 'id']) + assert sass.version == "2.0" + assert sass.id == "11111" + + item = self.sec.check_signature(response, class_name(response), + s_response) + assert isinstance(item, samlp.Response) + assert item.id == "22222" + + def test_sign_response_2(self): + assertion2 = factory(saml.Assertion, + version="2.0", + id="11122", + issue_instant="2009-10-30T13:20:28Z", + signature=sigver.pre_signature_part("11122", + self.sec + .my_cert), + attribute_statement=do_attribute_statement({ + ("", "", "surName"): ("Räv", ""), + ("", "", "givenName"): ("Björn", ""), + }) + ) + response = factory(samlp.Response, + assertion=assertion2, + id="22233", + signature=sigver.pre_signature_part("22233", + self.sec + .my_cert)) + + to_sign = [(class_name(assertion2), assertion2.id), + (class_name(response), response.id)] + + s_response = sigver.signed_instance_factory(response, self.sec, to_sign) + + assert s_response is not None + response2 = response_from_string(s_response) + + sass = response2.assertion[0] + assert _eq(sass.keyswv(), ['attribute_statement', 'issue_instant', + 'version', 'signature', 'id']) + assert sass.version == "2.0" + assert sass.id == "11122" + + item = self.sec.check_signature(response2, class_name(response), + s_response) + + assert isinstance(item, samlp.Response) + + def test_sign_verify(self): + response = factory(samlp.Response, + assertion=self._assertion, + id="22233", + signature=sigver.pre_signature_part("22233", + self.sec + .my_cert)) + + to_sign = [(class_name(self._assertion), self._assertion.id), + (class_name(response), response.id)] + + s_response = sigver.signed_instance_factory(response, self.sec, + to_sign) + + print(s_response) + res = self.sec.verify_signature(s_response, + node_name=class_name(samlp.Response())) + + print(res) + assert res + + def test_sign_verify_with_cert_from_instance(self): + response = factory(samlp.Response, + assertion=self._assertion, + id="22222", + signature=sigver.pre_signature_part("22222", + self.sec + .my_cert)) + + to_sign = [(class_name(self._assertion), self._assertion.id), + (class_name(response), response.id)] + + s_response = sigver.signed_instance_factory(response, self.sec, to_sign) + + response2 = response_from_string(s_response) + + ci = "".join(sigver.cert_from_instance(response2)[0].split()) + + assert ci == self.sec.my_cert + + res = self.sec.verify_signature(s_response, + node_name=class_name(samlp.Response())) + + assert res + + res = self.sec._check_signature(s_response, response2, + class_name(response2), s_response) + assert res == response2 + + def test_sign_verify_assertion_with_cert_from_instance(self): + assertion = factory(saml.Assertion, + version="2.0", + id="11100", + issue_instant="2009-10-30T13:20:28Z", + signature=sigver.pre_signature_part("11100", + self.sec + .my_cert), + attribute_statement=do_attribute_statement({ + ("", "", "surName"): ("Räv", ""), + ("", "", "givenName"): ("Björn", ""), + }) + ) + + to_sign = [(class_name(assertion), assertion.id)] + s_assertion = sigver.signed_instance_factory(assertion, self.sec, + to_sign) + print(s_assertion) + ass = assertion_from_string(s_assertion) + ci = "".join(sigver.cert_from_instance(ass)[0].split()) + assert ci == self.sec.my_cert + + res = self.sec.verify_signature(s_assertion, + node_name=class_name(ass)) + assert res + + res = self.sec._check_signature(s_assertion, ass, class_name(ass)) + + assert res + + def test_exception_sign_verify_with_cert_from_instance(self): + assertion = factory(saml.Assertion, + version="2.0", + id="11100", + issue_instant="2009-10-30T13:20:28Z", + #signature= sigver.pre_signature_part("11100", + # self.sec.my_cert), + attribute_statement=do_attribute_statement({ + ("", "", "surName"): ("Föö", ""), + ("", "", "givenName"): ("Bär", ""), + }) + ) + + response = factory(samlp.Response, + assertion=assertion, + id="22222", + signature=sigver.pre_signature_part("22222", + self.sec + .my_cert)) + + to_sign = [(class_name(response), response.id)] + + s_response = sigver.signed_instance_factory(response, self.sec, to_sign) + + response2 = response_from_string(s_response) + # Change something that should make everything fail + response2.id = "23456" + raises(sigver.SignatureError, self.sec._check_signature, + s_response, response2, class_name(response2)) + class TestSecurityMetadata(): def setup_class(self): @@ -442,6 +737,27 @@ class TestSecurityMetadata(): ) +class TestSecurityMetadataNonAsciiAva(): + def setup_class(self): + conf = config.SPConfig() + conf.load_file("server_conf") + md = MetadataStore([saml, samlp], None, conf) + md.load("local", full_path("metadata_cert.xml")) + + conf.metadata = md + conf.only_use_keys_in_metadata = False + self.sec = sigver.security_context(conf) + + assertion = factory( + saml.Assertion, version="2.0", id="11111", + issue_instant="2009-10-30T13:20:28Z", + signature=sigver.pre_signature_part("11111", self.sec.my_cert, 1), + attribute_statement=do_attribute_statement( + {("", "", "surName"): ("Föö", ""), + ("", "", "givenName"): ("Bär", ""), }) + ) + + def test_xbox(): conf = config.SPConfig() conf.load_file("server_conf") @@ -495,6 +811,59 @@ def test_xbox(): print(assertions) +def test_xbox_non_ascii_ava(): + conf = config.SPConfig() + conf.load_file("server_conf") + md = MetadataStore([saml, samlp], None, conf) + md.load("local", full_path("idp_example.xml")) + + conf.metadata = md + conf.only_use_keys_in_metadata = False + sec = sigver.security_context(conf) + + assertion = factory( + saml.Assertion, version="2.0", id="11111", + issue_instant="2009-10-30T13:20:28Z", + signature=sigver.pre_signature_part("11111", sec.my_cert, 1), + attribute_statement=do_attribute_statement( + {("", "", "surName"): ("Föö", ""), + ("", "", "givenName"): ("Bär", ""), }) + ) + + sigass = sec.sign_statement(assertion, class_name(assertion), + key_file=full_path("test.key"), + node_id=assertion.id) + + _ass0 = saml.assertion_from_string(sigass) + + encrypted_assertion = EncryptedAssertion() + encrypted_assertion.add_extension_element(_ass0) + + _, pre = make_temp(str(pre_encryption_part()).encode('utf-8'), decode=False) + enctext = sec.crypto.encrypt( + str(encrypted_assertion), conf.cert_file, pre, "des-192", + '/*[local-name()="EncryptedAssertion"]/*[local-name()="Assertion"]') + + decr_text = sec.decrypt(enctext) + _seass = saml.encrypted_assertion_from_string(decr_text) + assertions = [] + assers = extension_elements_to_elements(_seass.extension_elements, + [saml, samlp]) + + sign_cert_file = full_path("test.pem") + + for ass in assers: + _ass = "%s" % ass + #_ass = _ass.replace('xsi:nil="true" ', '') + #assert sigass == _ass + _txt = sec.verify_signature(_ass, sign_cert_file, + node_name=class_name(assertion)) + if _txt: + assertions.append(ass) + + print(assertions) + + def test_okta(): conf = config.Config() conf.load_file("server_conf") @@ -548,6 +917,35 @@ def test_xmlsec_err(): assert False +def test_xmlsec_err_non_ascii_ava(): + conf = config.SPConfig() + conf.load_file("server_conf") + md = MetadataStore([saml, samlp], None, conf) + md.load("local", full_path("idp_example.xml")) + + conf.metadata = md + conf.only_use_keys_in_metadata = False + sec = sigver.security_context(conf) + + assertion = factory( + saml.Assertion, version="2.0", id="11111", + issue_instant="2009-10-30T13:20:28Z", + signature=sigver.pre_signature_part("11111", sec.my_cert, 1), + attribute_statement=do_attribute_statement( + {("", "", "surName"): ("Föö", ""), + ("", "", "givenName"): ("Bär", ""), }) + ) + + try: + sec.sign_statement(assertion, class_name(assertion), + key_file=full_path("tes.key"), + node_id=assertion.id) + except (XmlsecError, SigverError) as err: # should throw an exception + pass + else: + assert False + + def test_sha256_signing(): conf = config.SPConfig() conf.load_file("server_conf") @@ -574,6 +972,32 @@ def test_sha256_signing(): assert s +def test_sha256_signing_non_ascii_ava(): + conf = config.SPConfig() + conf.load_file("server_conf") + md = MetadataStore([saml, samlp], None, conf) + md.load("local", full_path("idp_example.xml")) + + conf.metadata = md + conf.only_use_keys_in_metadata = False + sec = sigver.security_context(conf) + + assertion = factory( + saml.Assertion, version="2.0", id="11111", + issue_instant="2009-10-30T13:20:28Z", + signature=sigver.pre_signature_part("11111", sec.my_cert, 1, + sign_alg=SIG_RSA_SHA256), + attribute_statement=do_attribute_statement( + {("", "", "surName"): ("Föö", ""), + ("", "", "givenName"): ("Bär", ""), }) + ) + + s = sec.sign_statement(assertion, class_name(assertion), + key_file=full_path("test.key"), + node_id=assertion.id) + assert s + + def test_xmlsec_output_line_parsing(): output1 = "prefix\nOK\npostfix" assert sigver.parse_xmlsec_output(output1) diff --git a/tests/test_50_server.py b/tests/test_50_server.py index f0dcae3c..11ace47b 100644 --- a/tests/test_50_server.py +++ b/tests/test_50_server.py @@ -1185,7 +1185,1110 @@ class TestServer1(): idp.ident.close() assert request -#------------------------------------------------------------------------ +# ------------------------------------------------------------------------ + + +class TestServer1NonAsciiAva(): + + def setup_class(self): + self.server = Server("idp_conf") + + conf = config.SPConfig() + conf.load_file("server_conf") + self.client = client.Saml2Client(conf) + self.name_id = self.server.ident.transient_nameid( + "urn:mace:example.com:saml:roland:sp", "id12") + self.ava = {"givenName": ["Dave"], "sn": ["Concepción"], + "mail": ["dave@cnr.mlb.com"], "title": "#13"} + + def teardown_class(self): + self.server.close() + + def verify_assertion(self, assertion): + assert assertion + assert assertion[0].attribute_statement + + ava = get_ava(assertion[0]) + + assert ava == \ + {"givenName": ["Dave"], "sn": [u"Concepción"], + "mail": ["dave@cnr.mlb.com"], "title": ["#13"]} + + + def verify_encrypted_assertion(self, assertion, decr_text): + self.verify_assertion(assertion) + assert assertion[0].signature is None + + assert 'EncryptedAssertion> 0: x = xmldoc.rindex("<", 0, s) @@ -88,7 +93,7 @@ def add_subelement(xmldoc, node_name, subelem): spaces += " " c += 1 # Sometimes we get an xml header, sometimes we don't. - subelem_str = str(subelem) + subelem_str = _str(subelem) if subelem_str[0:5].lower() == '%s" % (tag, node_name, spaces, subelem_str, tag, node_name)) - return xmldoc + return xmldoc + + +def for_me(condition, me): + for restriction in condition.audience_restriction: + audience = restriction.audience + if audience.text.strip() == me: + return True + + +def ava(attribute_statement): + result = {} + for attribute in attribute_statement.attribute: + # Check name_format ?? + name = attribute.name.strip() + result[name] = [] + for value in attribute.attribute_value: + result[name].append(value.text.strip()) + return result + + +def _leq(l1, l2): + return set(l1) == set(l2) + + +REQ1 = {"1.2.14": """ +urn:mace:example.com:saml:roland:spE8042FB4-4D5B-48C3-8E14-8EDD852790DD""", + "1.2.16": """ +urn:mace:example +.com:saml:roland:spE8042FB4-4D5B-48C3-8E14-8EDD852790DD"""} + +nid = NameID(name_qualifier="foo", format=NAMEID_FORMAT_TRANSIENT, + text="123456") + + +def list_values2simpletons(_dict): + return dict([(k, v[0]) for k, v in _dict.items()]) + + +class TestClient: + def setup_class(self): + self.server = Server("idp_conf") + + conf = config.SPConfig() + conf.load_file("server_conf") + self.client = Saml2Client(conf) + + def teardown_class(self): + self.server.close() + + def test_create_attribute_query1(self): + req_id, req = self.client.create_attribute_query( + "https://idp.example.com/idp/", + "E8042FB4-4D5B-48C3-8E14-8EDD852790DD", + format=saml.NAMEID_FORMAT_PERSISTENT, + message_id="id1") + reqstr = "%s" % req.to_string().decode() + + assert req.destination == "https://idp.example.com/idp/" + assert req.id == "id1" + assert req.version == "2.0" + subject = req.subject + name_id = subject.name_id + assert name_id.format == saml.NAMEID_FORMAT_PERSISTENT + assert name_id.text == "E8042FB4-4D5B-48C3-8E14-8EDD852790DD" + issuer = req.issuer + assert issuer.text == "urn:mace:example.com:saml:roland:sp" + + attrq = samlp.attribute_query_from_string(reqstr) + + assert _leq(attrq.keyswv(), ['destination', 'subject', 'issue_instant', + 'version', 'id', 'issuer']) + + assert attrq.destination == req.destination + assert attrq.id == req.id + assert attrq.version == req.version + assert attrq.issuer.text == issuer.text + assert attrq.issue_instant == req.issue_instant + assert attrq.subject.name_id.format == name_id.format + assert attrq.subject.name_id.text == name_id.text + + def test_create_attribute_query2(self): + req_id, req = self.client.create_attribute_query( + "https://idp.example.com/idp/", + "E8042FB4-4D5B-48C3-8E14-8EDD852790DD", + attribute={ + ("urn:oid:2.5.4.42", + "urn:oasis:names:tc:SAML:2.0:attrname-format:uri", + "givenName"): None, + ("urn:oid:2.5.4.4", + "urn:oasis:names:tc:SAML:2.0:attrname-format:uri", + "surname"): None, + ("urn:oid:1.2.840.113549.1.9.1", + "urn:oasis:names:tc:SAML:2.0:attrname-format:uri"): None, + }, + format=saml.NAMEID_FORMAT_PERSISTENT, + message_id="id1") + + assert req.destination == "https://idp.example.com/idp/" + assert req.id == "id1" + assert req.version == "2.0" + subject = req.subject + name_id = subject.name_id + assert name_id.format == saml.NAMEID_FORMAT_PERSISTENT + assert name_id.text == "E8042FB4-4D5B-48C3-8E14-8EDD852790DD" + assert len(req.attribute) == 3 + # one is givenName + seen = [] + for attribute in req.attribute: + if attribute.name == "urn:oid:2.5.4.42": + assert attribute.name_format == saml.NAME_FORMAT_URI + assert attribute.friendly_name == "givenName" + seen.append("givenName") + elif attribute.name == "urn:oid:2.5.4.4": + assert attribute.name_format == saml.NAME_FORMAT_URI + assert attribute.friendly_name == "surname" + seen.append("surname") + elif attribute.name == "urn:oid:1.2.840.113549.1.9.1": + assert attribute.name_format == saml.NAME_FORMAT_URI + if getattr(attribute, "friendly_name"): + assert False + seen.append("email") + assert _leq(seen, ["givenName", "surname", "email"]) + + def test_create_attribute_query_3(self): + req_id, req = self.client.create_attribute_query( + "https://aai-demo-idp.switch.ch/idp/shibboleth", + "_e7b68a04488f715cda642fbdd90099f5", + format=saml.NAMEID_FORMAT_TRANSIENT, + message_id="id1") + + assert isinstance(req, samlp.AttributeQuery) + assert req.destination == "https://aai-demo-idp.switch" \ + ".ch/idp/shibboleth" + assert req.id == "id1" + assert req.version == "2.0" + assert req.issue_instant + assert req.issuer.text == "urn:mace:example.com:saml:roland:sp" + nameid = req.subject.name_id + assert nameid.format == saml.NAMEID_FORMAT_TRANSIENT + assert nameid.text == "_e7b68a04488f715cda642fbdd90099f5" + + def test_create_auth_request_0(self): + ar_str = "%s" % self.client.create_authn_request( + "http://www.example.com/sso", message_id="id1")[1] + + ar = samlp.authn_request_from_string(ar_str) + assert ar.assertion_consumer_service_url == ("http://lingon.catalogix" + ".se:8087/") + assert ar.destination == "http://www.example.com/sso" + assert ar.protocol_binding == BINDING_HTTP_POST + assert ar.version == "2.0" + assert ar.provider_name == "urn:mace:example.com:saml:roland:sp" + assert ar.issuer.text == "urn:mace:example.com:saml:roland:sp" + nid_policy = ar.name_id_policy + assert nid_policy.allow_create == "false" + assert nid_policy.format == saml.NAMEID_FORMAT_TRANSIENT + + node_requested_attributes = None + for e in ar.extensions.extension_elements: + if e.tag == RequestedAttributes.c_tag: + node_requested_attributes = e + break + assert node_requested_attributes is not None + + for c in node_requested_attributes.children: + assert c.tag == RequestedAttribute.c_tag + assert c.attributes['isRequired'] in ['true', 'false'] + assert c.attributes['Name'] + assert c.attributes['FriendlyName'] + assert c.attributes['NameFormat'] + + def test_create_auth_request_unset_force_authn(self): + req_id, req = self.client.create_authn_request( + "http://www.example.com/sso", sign=False, message_id="id1") + assert bool(req.force_authn) == False + + def test_create_auth_request_set_force_authn(self): + req_id, req = self.client.create_authn_request( + "http://www.example.com/sso", sign=False, message_id="id1", + force_authn="true") + assert bool(req.force_authn) == True + + def test_create_auth_request_nameid_policy_allow_create(self): + conf = config.SPConfig() + conf.load_file("sp_conf_nameidpolicy") + client = Saml2Client(conf) + ar_str = "%s" % client.create_authn_request( + "http://www.example.com/sso", message_id="id1")[1] + + ar = samlp.authn_request_from_string(ar_str) + assert ar.assertion_consumer_service_url == ("http://lingon.catalogix" + ".se:8087/") + assert ar.destination == "http://www.example.com/sso" + assert ar.protocol_binding == BINDING_HTTP_POST + assert ar.version == "2.0" + assert ar.provider_name == "urn:mace:example.com:saml:roland:sp" + assert ar.issuer.text == "urn:mace:example.com:saml:roland:sp" + nid_policy = ar.name_id_policy + assert nid_policy.allow_create == "true" + assert nid_policy.format == saml.NAMEID_FORMAT_PERSISTENT + + def test_create_auth_request_vo(self): + assert list(self.client.config.vorg.keys()) == [ + "urn:mace:example.com:it:tek"] + + ar_str = "%s" % self.client.create_authn_request( + "http://www.example.com/sso", + "urn:mace:example.com:it:tek", # vo + nameid_format=NAMEID_FORMAT_PERSISTENT, + message_id="666")[1] + + ar = samlp.authn_request_from_string(ar_str) + assert ar.id == "666" + assert ar.assertion_consumer_service_url == "http://lingon.catalogix" \ + ".se:8087/" + assert ar.destination == "http://www.example.com/sso" + assert ar.protocol_binding == BINDING_HTTP_POST + assert ar.version == "2.0" + assert ar.provider_name == "urn:mace:example.com:saml:roland:sp" + assert ar.issuer.text == "urn:mace:example.com:saml:roland:sp" + nid_policy = ar.name_id_policy + assert nid_policy.allow_create == "false" + assert nid_policy.format == saml.NAMEID_FORMAT_PERSISTENT + assert nid_policy.sp_name_qualifier == "urn:mace:example.com:it:tek" + + def test_sign_auth_request_0(self): + req_id, areq = self.client.create_authn_request( + "http://www.example.com/sso", sign=True, message_id="id1") + + ar_str = "%s" % areq + ar = samlp.authn_request_from_string(ar_str) + + assert ar + assert ar.signature + assert ar.signature.signature_value + signed_info = ar.signature.signed_info + assert len(signed_info.reference) == 1 + assert signed_info.reference[0].uri == "#id1" + assert signed_info.reference[0].digest_value + try: + assert self.client.sec.correctly_signed_authn_request( + ar_str, self.client.config.xmlsec_binary, + self.client.config.metadata) + except Exception: # missing certificate + self.client.sec.verify_signature(ar_str, node_name=class_name(ar)) + + def test_create_logout_request(self): + req_id, req = self.client.create_logout_request( + "http://localhost:8088/slo", "urn:mace:example.com:saml:roland:idp", + name_id=nid, reason="Tired", expire=in_a_while(minutes=15), + session_indexes=["_foo"]) + + assert req.destination == "http://localhost:8088/slo" + assert req.reason == "Tired" + assert req.version == "2.0" + assert req.name_id == nid + assert req.issuer.text == "urn:mace:example.com:saml:roland:sp" + assert req.session_index == [SessionIndex("_foo")] + + def test_response_1(self): + IDP = "urn:mace:example.com:saml:roland:idp" + + ava = {"givenName": ["Derek"], "sn": ["Jeter"], + "mail": ["derek@nyy.mlb.com"], "title": ["The man"]} + + nameid_policy = samlp.NameIDPolicy(allow_create="false", + format=saml.NAMEID_FORMAT_PERSISTENT) + + resp = self.server.create_authn_response( + identity=ava, + in_response_to="id1", + destination="http://lingon.catalogix.se:8087/", + sp_entity_id="urn:mace:example.com:saml:roland:sp", + name_id_policy=nameid_policy, + sign_response=True, + userid="foba0001@example.com", + authn=AUTHN) + + resp_str = "%s" % resp + + resp_str = encode_fn(resp_str.encode()) + + authn_response = self.client.parse_authn_request_response( + resp_str, BINDING_HTTP_POST, + {"id1": "http://foo.example.com/service"}) + + assert authn_response is not None + assert authn_response.issuer() == IDP + assert authn_response.response.assertion[0].issuer.text == IDP + session_info = authn_response.session_info() + + assert session_info["ava"] == {'mail': ['derek@nyy.mlb.com'], + 'givenName': ['Derek'], + 'sn': ['Jeter'], + 'title': ["The man"]} + assert session_info["issuer"] == IDP + assert session_info["came_from"] == "http://foo.example.com/service" + response = samlp.response_from_string(authn_response.xmlstr) + assert response.destination == "http://lingon.catalogix.se:8087/" + assert "session_index" in session_info + + # One person in the cache + assert len(self.client.users.subjects()) == 1 + subject_id = self.client.users.subjects()[0] + # The information I have about the subject comes from one source + assert self.client.users.issuers_of_info(subject_id) == [IDP] + + # --- authenticate another person + + ava = {"givenName": ["Alfonson"], "sn": ["Soriano"], + "mail": ["alfonson@chc.mlb.com"], "title": ["outfielder"]} + + resp_str = "%s" % self.server.create_authn_response( + identity=ava, + in_response_to="id2", + destination="http://lingon.catalogix.se:8087/", + sp_entity_id="urn:mace:example.com:saml:roland:sp", + sign_response=True, + name_id_policy=nameid_policy, + userid="also0001@example.com", + authn=AUTHN) + + resp_str = encode_fn(resp_str.encode()) + + self.client.parse_authn_request_response( + resp_str, BINDING_HTTP_POST, + {"id2": "http://foo.example.com/service"}) + + # Two persons in the cache + assert len(self.client.users.subjects()) == 2 + issuers = [self.client.users.issuers_of_info(s) for s in + self.client.users.subjects()] + # The information I have about the subjects comes from the same source + assert issuers == [[IDP], [IDP]] + + def test_response_2(self): + conf = config.SPConfig() + conf.load_file("server_conf") + _client = Saml2Client(conf) + + idp, ava, ava_verify, nameid_policy = self.setup_verify_authn_response() + + cert_str, cert_key_str = generate_cert() + + cert = \ + { + "cert": cert_str, + "key": cert_key_str + } + + self.name_id = self.server.ident.transient_nameid( + "urn:mace:example.com:saml:roland:sp", "id1") + + resp = self.server.create_authn_response( + identity=ava, + in_response_to="id1", + destination="http://lingon.catalogix.se:8087/", + sp_entity_id="urn:mace:example.com:saml:roland:sp", + name_id=self.name_id, + userid="foba0001@example.com", + authn=AUTHN, + sign_response=True, + sign_assertion=True, + encrypt_assertion=False, + encrypt_assertion_self_contained=True, + pefim=True, + encrypt_cert_advice=cert_str + ) + + resp_str = "%s" % resp + + resp_str = encode_fn(resp_str.encode()) + + authn_response = _client.parse_authn_request_response( + resp_str, BINDING_HTTP_POST, + {"id1": "http://foo.example.com/service"}, {"id1": cert}) + + self.verify_authn_response(idp, authn_response, _client, ava_verify) + + def test_response_3(self): + conf = config.SPConfig() + conf.load_file("server_conf") + _client = Saml2Client(conf) + + idp, ava, ava_verify, nameid_policy = self.setup_verify_authn_response() + + self.name_id = self.server.ident.transient_nameid( + "urn:mace:example.com:saml:roland:sp", "id1") + + resp = self.server.create_authn_response( + identity=ava, + in_response_to="id1", + destination="http://lingon.catalogix.se:8087/", + sp_entity_id="urn:mace:example.com:saml:roland:sp", + name_id=self.name_id, + userid="foba0001@example.com", + authn=AUTHN, + sign_response=True, + sign_assertion=True, + encrypt_assertion=False, + encrypt_assertion_self_contained=True, + pefim=True, + ) + + resp_str = "%s" % resp + + resp_str = encode_fn(resp_str.encode()) + + authn_response = _client.parse_authn_request_response( + resp_str, BINDING_HTTP_POST, + {"id1": "http://foo.example.com/service"}) + + self.verify_authn_response(idp, authn_response, _client, ava_verify) + + def test_response_4(self): + conf = config.SPConfig() + conf.load_file("server_conf") + _client = Saml2Client(conf) + + idp, ava, ava_verify, nameid_policy = self.setup_verify_authn_response() + + self.name_id = self.server.ident.transient_nameid( + "urn:mace:example.com:saml:roland:sp", "id1") + + resp = self.server.create_authn_response( + identity=ava, + in_response_to="id1", + destination="http://lingon.catalogix.se:8087/", + sp_entity_id="urn:mace:example.com:saml:roland:sp", + name_id=self.name_id, + userid="foba0001@example.com", + authn=AUTHN, + sign_response=True, + sign_assertion=True, + encrypt_assertion=True, + encrypt_assertion_self_contained=True, + pefim=True, + ) + + resp_str = "%s" % resp + + resp_str = encode_fn(resp_str.encode()) + + authn_response = _client.parse_authn_request_response( + resp_str, BINDING_HTTP_POST, + {"id1": "http://foo.example.com/service"}) + + self.verify_authn_response(idp, authn_response, _client, ava_verify) + + def test_response_5(self): + conf = config.SPConfig() + conf.load_file("server_conf") + _client = Saml2Client(conf) + + idp, ava, ava_verify, nameid_policy = self.setup_verify_authn_response() + + self.name_id = self.server.ident.transient_nameid( + "urn:mace:example.com:saml:roland:sp", "id1") + + cert_str, cert_key_str = generate_cert() + + cert = \ + { + "cert": cert_str, + "key": cert_key_str + } + + resp = self.server.create_authn_response( + identity=ava, + in_response_to="id1", + destination="http://lingon.catalogix.se:8087/", + sp_entity_id="urn:mace:example.com:saml:roland:sp", + name_id=self.name_id, + userid="foba0001@example.com", + authn=AUTHN, + sign_response=True, + sign_assertion=True, + encrypt_assertion=True, + encrypt_assertion_self_contained=True, + pefim=True, + encrypt_cert_assertion=cert_str + ) + + resp_str = "%s" % resp + + resp_str = encode_fn(resp_str.encode()) + + authn_response = _client.parse_authn_request_response( + resp_str, BINDING_HTTP_POST, + {"id1": "http://foo.example.com/service"}, {"id1": cert}) + + self.verify_authn_response(idp, authn_response, _client, ava_verify) + + def test_response_6(self): + conf = config.SPConfig() + conf.load_file("server_conf") + _client = Saml2Client(conf) + + idp, ava, ava_verify, nameid_policy = self.setup_verify_authn_response() + + self.name_id = self.server.ident.transient_nameid( + "urn:mace:example.com:saml:roland:sp", "id1") + + cert_assertion_str, cert_key_assertion_str = generate_cert() + + cert_assertion = \ + { + "cert": cert_assertion_str, + "key": cert_key_assertion_str + } + + cert_advice_str, cert_key_advice_str = generate_cert() + + cert_advice = \ + { + "cert": cert_advice_str, + "key": cert_key_advice_str + } + + resp = self.server.create_authn_response( + identity=ava, + in_response_to="id1", + destination="http://lingon.catalogix.se:8087/", + sp_entity_id="urn:mace:example.com:saml:roland:sp", + name_id=self.name_id, + userid="foba0001@example.com", + authn=AUTHN, + sign_response=True, + sign_assertion=True, + encrypt_assertion=True, + encrypt_assertion_self_contained=True, + pefim=True, + encrypt_cert_assertion=cert_assertion_str, + encrypt_cert_advice=cert_advice_str + ) + + resp_str = "%s" % resp + + resp_str = encode_fn(resp_str.encode()) + + authn_response = _client.parse_authn_request_response( + resp_str, BINDING_HTTP_POST, + {"id1": "http://foo.example.com/service"}, + {"id1": [cert_assertion, cert_advice]}) + + self.verify_authn_response(idp, authn_response, _client, ava_verify) + + def test_response_7(self): + conf = config.SPConfig() + conf.load_file("server_conf") + _client = Saml2Client(conf) + + idp, ava, ava_verify, nameid_policy = self.setup_verify_authn_response() + + self.name_id = self.server.ident.transient_nameid( + "urn:mace:example.com:saml:roland:sp", "id1") + + resp = self.server.create_authn_response( + identity=ava, + in_response_to="id1", + destination="http://lingon.catalogix.se:8087/", + sp_entity_id="urn:mace:example.com:saml:roland:sp", + name_id=self.name_id, + userid="foba0001@example.com", + authn=AUTHN, + sign_response=True, + sign_assertion=True, + encrypt_assertion=True, + encrypt_assertion_self_contained=True, + encrypted_advice_attributes=True, + ) + + resp_str = "%s" % resp + + resp_str = encode_fn(resp_str.encode()) + + authn_response = _client.parse_authn_request_response( + resp_str, BINDING_HTTP_POST, + {"id1": "http://foo.example.com/service"}) + + self.verify_authn_response(idp, authn_response, _client, ava_verify) + + def test_response_8(self): + conf = config.SPConfig() + conf.load_file("server_conf") + _client = Saml2Client(conf) + + idp, ava, ava_verify, nameid_policy = self.setup_verify_authn_response() + + self.name_id = self.server.ident.transient_nameid( + "urn:mace:example.com:saml:roland:sp", "id1") + + cert_str, cert_key_str = generate_cert() + + cert = \ + { + "cert": cert_str, + "key": cert_key_str + } + + resp = self.server.create_authn_response( + identity=ava, + in_response_to="id1", + destination="http://lingon.catalogix.se:8087/", + sp_entity_id="urn:mace:example.com:saml:roland:sp", + name_id=self.name_id, + userid="foba0001@example.com", + authn=AUTHN, + sign_response=True, + sign_assertion=True, + encrypt_assertion=True, + encrypt_assertion_self_contained=True, + encrypt_cert_assertion=cert_str + ) + + resp_str = "%s" % resp + + resp_str = encode_fn(resp_str.encode()) + + authn_response = _client.parse_authn_request_response( + resp_str, BINDING_HTTP_POST, + {"id1": "http://foo.example.com/service"}, {"id1": cert}) + + self.verify_authn_response(idp, authn_response, _client, ava_verify) + + def test_response_no_name_id(self): + """ Test that the SP client can parse an authentication response + from an IdP that does not contain a element.""" + + conf = config.SPConfig() + conf.load_file("server_conf") + client = Saml2Client(conf) + + # Use the same approach as the other tests for mocking up + # an authentication response to parse. + idp, ava, ava_verify, nameid_policy = ( + self.setup_verify_authn_response() + ) + + # Mock up an authentication response but do not encrypt it + # nor sign it since below we will modify it directly. Note that + # setting name_id to None still results in a response that includes + # a element. + resp = self.server.create_authn_response( + identity=ava, + in_response_to="id1", + destination="http://lingon.catalogix.se:8087/", + sp_entity_id="urn:mace:example.com:saml:roland:sp", + name_id=None, + userid="foba0001@example.com", + authn=AUTHN, + sign_response=False, + sign_assertion=False, + encrypt_assertion=False, + encrypt_assertion_self_contained=False + ) + + # The create_authn_response method above will return an instance + # of saml2.samlp.Response when neither encrypting nor signing and + # so we can remove the element directly. + resp.assertion.subject.name_id = None + + # Assert that the response does not contain a NameID element so that + # the parsing below is a fair test. + assert str(resp).find("NameID") == -1 + + # Cast the response to a string and encode it to mock up the payload + # the SP client is expected to receive via HTTP POST binding. + resp_str = encode_fn(str(resp).encode()) + + # We do not need the client to verify a signature for this test. + client.want_assertions_signed = False + client.want_response_signed = False + + # Parse the authentication response that does not include a . + authn_response = client.parse_authn_request_response( + resp_str, BINDING_HTTP_POST, + {"id1": "http://foo.example.com/service"}) + + # A successful test is parsing the response. + assert authn_response is not None + + def setup_verify_authn_response(self): + idp = "urn:mace:example.com:saml:roland:idp" + ava = {"givenName": ["Derek"], "sn": ["Jeter"], + "mail": ["derek@nyy.mlb.com"], "title": ["The man"]} + ava_verify = {'mail': ['derek@nyy.mlb.com'], 'givenName': ['Derek'], + 'sn': ['Jeter'], 'title': ["The man"]} + nameid_policy = samlp.NameIDPolicy(allow_create="false", + format=saml.NAMEID_FORMAT_PERSISTENT) + return idp, ava, ava_verify, nameid_policy + + def verify_authn_response(self, idp, authn_response, _client, ava_verify): + assert authn_response is not None + assert authn_response.issuer() == idp + assert authn_response.assertion.issuer.text == idp + session_info = authn_response.session_info() + + assert session_info["ava"] == ava_verify + assert session_info["issuer"] == idp + assert session_info["came_from"] == "http://foo.example.com/service" + response = samlp.response_from_string(authn_response.xmlstr) + assert response.destination == "http://lingon.catalogix.se:8087/" + + # One person in the cache + assert len(_client.users.subjects()) == 1 + subject_id = _client.users.subjects()[0] + # The information I have about the subject comes from one source + assert _client.users.issuers_of_info(subject_id) == [idp] + + def test_init_values(self): + entityid = self.client.config.entityid + assert entityid == "urn:mace:example.com:saml:roland:sp" + location = self.client._sso_location() + assert location == 'http://localhost:8088/sso' + my_name = self.client._my_name() + assert my_name == "urn:mace:example.com:saml:roland:sp" + + def test_sign_then_encrypt_assertion(self): + # Begin with the IdPs side + _sec = self.server.sec + + assertion = s_utils.assertion_factory( + subject=factory(saml.Subject, text="_aaa", + name_id=factory( + saml.NameID, + format=saml.NAMEID_FORMAT_TRANSIENT)), + attribute_statement=do_attribute_statement( + { + ("", "", "sn"): ("Jeter", ""), + ("", "", "givenName"): ("Derek", ""), + } + ), + issuer=self.server._issuer(), + ) + + assertion.signature = sigver.pre_signature_part( + assertion.id, _sec.my_cert, 1) + + sigass = _sec.sign_statement(assertion, class_name(assertion), + key_file=full_path("test.key"), + node_id=assertion.id) + # Create an Assertion instance from the signed assertion + _ass = saml.assertion_from_string(sigass) + + response = sigver.response_factory( + in_response_to="_012345", + destination="https:#www.example.com", + status=s_utils.success_status_factory(), + issuer=self.server._issuer(), + assertion=_ass + ) + + enctext = _sec.crypto.encrypt_assertion(response, + self.client.sec.encryption_keypairs[ + 0]["cert_file"], + pre_encryption_part()) + + seresp = samlp.response_from_string(enctext) + + # Now over to the client side + _csec = self.client.sec + if seresp.encrypted_assertion: + decr_text = _csec.decrypt(enctext) + seresp = samlp.response_from_string(decr_text) + resp_ass = [] + + sign_cert_file = full_path("test.pem") + for enc_ass in seresp.encrypted_assertion: + assers = extension_elements_to_elements( + enc_ass.extension_elements, [saml, samlp]) + for ass in assers: + if ass.signature: + if not _csec.verify_signature("%s" % ass, + sign_cert_file, + node_name=class_name( + ass)): + continue + resp_ass.append(ass) + + seresp.assertion = resp_ass + seresp.encrypted_assertion = None + + assert seresp.assertion + + def test_sign_then_encrypt_assertion2(self): + # Begin with the IdPs side + _sec = self.server.sec + + nameid_policy = samlp.NameIDPolicy(allow_create="false", + format=saml.NAMEID_FORMAT_PERSISTENT) + + asser = Assertion({"givenName": "Derek", "sn": "Jeter"}) + farg = add_path( + {}, + ['assertion', 'subject', 'subject_confirmation', 'method', + saml.SCM_BEARER]) + add_path( + farg['assertion']['subject']['subject_confirmation'], + ['subject_confirmation_data', 'in_response_to', + '_012345']) + add_path( + farg['assertion']['subject']['subject_confirmation'], + ['subject_confirmation_data', 'recipient', + "http://lingon.catalogix.se:8087/"]) + + assertion = asser.construct( + self.client.config.entityid, + self.server.config.attribute_converters, + self.server.config.getattr("policy", "idp"), + name_id=factory(saml.NameID, format=saml.NAMEID_FORMAT_TRANSIENT), + issuer=self.server._issuer(), + authn_class=INTERNETPROTOCOLPASSWORD, + authn_auth="http://www.example.com/login", + farg=farg['assertion'] + ) + + assertion.signature = sigver.pre_signature_part( + assertion.id, _sec.my_cert, 1) + + sigass = _sec.sign_statement(assertion, class_name(assertion), + key_file=self.client.sec.key_file, + node_id=assertion.id) + + sigass = rm_xmltag(sigass) + response = sigver.response_factory( + in_response_to="_012345", + destination="http://lingon.catalogix.se:8087/", + status=s_utils.success_status_factory(), + issuer=self.server._issuer(), + encrypted_assertion=EncryptedAssertion() + ) + + xmldoc = "%s" % response + # strangely enough I get different tags if I run this test separately + # or as part of a bunch of tests. + xmldoc = add_subelement(xmldoc, "EncryptedAssertion", sigass) + + enctext = _sec.crypto.encrypt_assertion(xmldoc, + self.client.sec.encryption_keypairs[ + 1]["cert_file"], + pre_encryption_part()) + + # seresp = samlp.response_from_string(enctext) + + resp_str = encode_fn(enctext.encode()) + # Now over to the client side + # Explicitely allow unsigned responses for this and the following 2 tests + self.client.want_response_signed = False + resp = self.client.parse_authn_request_response( + resp_str, BINDING_HTTP_POST, + {"_012345": "http://foo.example.com/service"}) + + # assert resp.encrypted_assertion == [] + assert resp.assertion + assert resp.ava == {'givenName': ['Derek'], 'sn': ['Jeter']} + + def test_sign_then_encrypt_assertion_advice_1(self): + # Begin with the IdPs side + _sec = self.server.sec + + nameid_policy = samlp.NameIDPolicy(allow_create="false", + format=saml.NAMEID_FORMAT_PERSISTENT) + + asser = Assertion({"givenName": "Derek", "sn": "Jeter"}) + + subject_confirmation_specs = { + 'recipient': "http://lingon.catalogix.se:8087/", + 'in_response_to': "_012345", + 'subject_confirmation_method': saml.SCM_BEARER + } + name_id = factory(saml.NameID, format=saml.NAMEID_FORMAT_TRANSIENT) + + farg = add_path( + {}, + ['assertion', 'subject', 'subject_confirmation', 'method', + saml.SCM_BEARER]) + add_path( + farg['assertion']['subject']['subject_confirmation'], + ['subject_confirmation_data', 'in_response_to', + '_012345']) + add_path( + farg['assertion']['subject']['subject_confirmation'], + ['subject_confirmation_data', 'recipient', + "http://lingon.catalogix.se:8087/"]) + + assertion = asser.construct( + self.client.config.entityid, + self.server.config.attribute_converters, + self.server.config.getattr("policy", "idp"), + issuer=self.server._issuer(), + name_id=name_id, + authn_class=INTERNETPROTOCOLPASSWORD, + authn_auth="http://www.example.com/login", + farg=farg['assertion']) + + a_asser = Assertion({"uid": "test01", "email": "test.testsson@test.se"}) + a_assertion = a_asser.construct( + self.client.config.entityid, + self.server.config.attribute_converters, + self.server.config.getattr("policy", "idp"), + issuer=self.server._issuer(), + authn_class=INTERNETPROTOCOLPASSWORD, + authn_auth="http://www.example.com/login", + name_id=name_id, + farg=farg['assertion']) + + a_assertion.signature = sigver.pre_signature_part( + a_assertion.id, _sec.my_cert, 1) + + assertion.advice = Advice() + + assertion.advice.encrypted_assertion = [] + assertion.advice.encrypted_assertion.append(EncryptedAssertion()) + + assertion.advice.encrypted_assertion[0].add_extension_element( + a_assertion) + + response = sigver.response_factory( + in_response_to="_012345", + destination="http://lingon.catalogix.se:8087/", + status=s_utils.success_status_factory(), + issuer=self.server._issuer() + ) + + response.assertion.append(assertion) + + response = _sec.sign_statement("%s" % response, class_name(a_assertion), + key_file=self.client.sec.key_file, + node_id=a_assertion.id) + + # xmldoc = "%s" % response + # strangely enough I get different tags if I run this test separately + # or as part of a bunch of tests. + # xmldoc = add_subelement(xmldoc, "EncryptedAssertion", sigass) + + node_xpath = ''.join(["/*[local-name()=\"%s\"]" % v for v in + ["Response", "Assertion", "Advice", + "EncryptedAssertion", "Assertion"]]) + + enctext = _sec.crypto.encrypt_assertion(response, + self.client.sec.encryption_keypairs[ + 0]["cert_file"], + pre_encryption_part(), + node_xpath=node_xpath) + + # seresp = samlp.response_from_string(enctext) + + resp_str = encode_fn(enctext.encode()) + # Now over to the client side + resp = self.client.parse_authn_request_response( + resp_str, BINDING_HTTP_POST, + {"_012345": "http://foo.example.com/service"}) + + # assert resp.encrypted_assertion == [] + assert resp.assertion + assert resp.assertion.advice + assert resp.assertion.advice.assertion + assert resp.ava == \ + {'sn': ['Jeter'], 'givenName': ['Derek'], 'uid': ['test01'], + 'email': ['test.testsson@test.se']} + + def test_sign_then_encrypt_assertion_advice_2(self): + # Begin with the IdPs side + _sec = self.server.sec + + nameid_policy = samlp.NameIDPolicy(allow_create="false", + format=saml.NAMEID_FORMAT_PERSISTENT) + + asser_1 = Assertion({"givenName": "Derek"}) + + farg = add_path( + {}, + ['assertion', 'subject', 'subject_confirmation', 'method', + saml.SCM_BEARER]) + add_path( + farg['assertion']['subject']['subject_confirmation'], + ['subject_confirmation_data', 'in_response_to', + '_012345']) + add_path( + farg['assertion']['subject']['subject_confirmation'], + ['subject_confirmation_data', 'recipient', + "http://lingon.catalogix.se:8087/"]) + name_id = factory(saml.NameID, format=saml.NAMEID_FORMAT_TRANSIENT) + + assertion_1 = asser_1.construct( + self.client.config.entityid, + self.server.config.attribute_converters, + self.server.config.getattr("policy", "idp"), + issuer=self.server._issuer(), + authn_class=INTERNETPROTOCOLPASSWORD, + authn_auth="http://www.example.com/login", + name_id=name_id, + farg=farg['assertion']) + + asser_2 = Assertion({"sn": "Jeter"}) + + assertion_2 = asser_2.construct( + self.client.config.entityid, + self.server.config.attribute_converters, + self.server.config.getattr("policy", "idp"), + issuer=self.server._issuer(), + authn_class=INTERNETPROTOCOLPASSWORD, + authn_auth="http://www.example.com/login", + name_id=name_id, + farg=farg['assertion']) + + a_asser_1 = Assertion({"uid": "test01"}) + a_assertion_1 = a_asser_1.construct( + self.client.config.entityid, + self.server.config.attribute_converters, + self.server.config.getattr("policy", "idp"), + issuer=self.server._issuer(), + authn_class=INTERNETPROTOCOLPASSWORD, + authn_auth="http://www.example.com/login", + name_id=name_id, + farg=farg['assertion']) + + a_asser_2 = Assertion({"email": "test.testsson@test.se"}) + a_assertion_2 = a_asser_2.construct( + self.client.config.entityid, + self.server.config.attribute_converters, + self.server.config.getattr("policy", "idp"), + issuer=self.server._issuer(), + authn_class=INTERNETPROTOCOLPASSWORD, + authn_auth="http://www.example.com/login", + name_id=name_id, + farg=farg['assertion']) + + a_asser_3 = Assertion({"street": "street"}) + a_assertion_3 = a_asser_3.construct( + self.client.config.entityid, + self.server.config.attribute_converters, + self.server.config.getattr("policy", "idp"), + issuer=self.server._issuer(), + authn_class=INTERNETPROTOCOLPASSWORD, + authn_auth="http://www.example.com/login", + name_id=name_id, + farg=farg['assertion']) + + a_asser_4 = Assertion({"title": "title"}) + a_assertion_4 = a_asser_4.construct( + self.client.config.entityid, + self.server.config.attribute_converters, + self.server.config.getattr("policy", "idp"), + issuer=self.server._issuer(), + authn_class=INTERNETPROTOCOLPASSWORD, + authn_auth="http://www.example.com/login", + name_id=name_id, + farg=farg['assertion']) + + a_assertion_1.signature = sigver.pre_signature_part( + a_assertion_1.id, _sec.my_cert, 1) + + a_assertion_2.signature = sigver.pre_signature_part( + a_assertion_2.id, _sec.my_cert, 1) + + a_assertion_3.signature = sigver.pre_signature_part( + a_assertion_3.id, _sec.my_cert, 1) + + a_assertion_4.signature = sigver.pre_signature_part( + a_assertion_4.id, _sec.my_cert, 1) + + assertion_1.signature = sigver.pre_signature_part(assertion_1.id, + _sec.my_cert, 1) + + assertion_2.signature = sigver.pre_signature_part(assertion_2.id, + _sec.my_cert, 1) + + response = sigver.response_factory( + in_response_to="_012345", + destination="http://lingon.catalogix.se:8087/", + status=s_utils.success_status_factory(), + issuer=self.server._issuer() + ) + + response.assertion = assertion_1 + + response.assertion.advice = Advice() + + response.assertion.advice.encrypted_assertion = [] + response.assertion.advice.encrypted_assertion.append( + EncryptedAssertion()) + + response.assertion.advice.encrypted_assertion[0].add_extension_element( + a_assertion_1) + + advice_tag = response.assertion.advice._to_element_tree().tag + assertion_tag = a_assertion_1._to_element_tree().tag + response = \ + response.get_xml_string_with_self_contained_assertion_within_advice_encrypted_assertion( + assertion_tag, advice_tag) + + response = _sec.sign_statement("%s" % response, + class_name(a_assertion_1), + key_file=self.server.sec.key_file, + node_id=a_assertion_1.id) + + node_xpath = ''.join(["/*[local-name()=\"%s\"]" % v for v in + ["Response", "Assertion", "Advice", + "EncryptedAssertion", "Assertion"]]) + + enctext = _sec.crypto.encrypt_assertion(response, + self.client.sec.encryption_keypairs[ + 1]["cert_file"], + pre_encryption_part(), + node_xpath=node_xpath) + + response = samlp.response_from_string(enctext) + + response.assertion = response.assertion[0] + + response.assertion.advice.encrypted_assertion.append( + EncryptedAssertion()) + response.assertion.advice.encrypted_assertion[1].add_extension_element( + a_assertion_2) + + advice_tag = response.assertion.advice._to_element_tree().tag + assertion_tag = a_assertion_2._to_element_tree().tag + response = \ + response.get_xml_string_with_self_contained_assertion_within_advice_encrypted_assertion( + assertion_tag, advice_tag) + + response = _sec.sign_statement("%s" % response, + class_name(a_assertion_2), + key_file=self.server.sec.key_file, + node_id=a_assertion_2.id) + + node_xpath = ''.join(["/*[local-name()=\"%s\"]" % v for v in + ["Response", "Assertion", "Advice", + "EncryptedAssertion", "Assertion"]]) + + enctext = _sec.crypto.encrypt_assertion(response, + self.client.sec.encryption_keypairs[ + 0]["cert_file"], + pre_encryption_part(), + node_xpath=node_xpath) + + response = samlp.response_from_string(enctext) + + response.assertion = response.assertion[0] + + assertion_tag = response.assertion._to_element_tree().tag + response = pre_encrypt_assertion(response) + response = \ + response.get_xml_string_with_self_contained_assertion_within_encrypted_assertion( + assertion_tag) + + response = _sec.sign_statement("%s" % response, class_name(assertion_1), + key_file=self.server.sec.key_file, + node_id=assertion_1.id) + + enctext = _sec.crypto.encrypt_assertion(response, + self.client.sec.encryption_keypairs[ + 1]["cert_file"], + pre_encryption_part()) + + response = samlp.response_from_string(enctext) + + response.assertion = assertion_2 + + response.assertion.advice = Advice() + + response.assertion.advice.encrypted_assertion = [] + response.assertion.advice.encrypted_assertion.append( + EncryptedAssertion()) + + response.assertion.advice.encrypted_assertion[0].add_extension_element( + a_assertion_3) + + advice_tag = response.assertion.advice._to_element_tree().tag + assertion_tag = a_assertion_3._to_element_tree().tag + response = \ + response.get_xml_string_with_self_contained_assertion_within_advice_encrypted_assertion( + assertion_tag, advice_tag) + + response = _sec.sign_statement("%s" % response, + class_name(a_assertion_3), + key_file=self.server.sec.key_file, + node_id=a_assertion_3.id) + + node_xpath = ''.join(["/*[local-name()=\"%s\"]" % v for v in + ["Response", "Assertion", "Advice", + "EncryptedAssertion", "Assertion"]]) + enctext = _sec.crypto.encrypt_assertion(response, + self.client.sec.encryption_keypairs[ + 0]["cert_file"], + pre_encryption_part(), + node_xpath=node_xpath) -def for_me(condition, me): - for restriction in condition.audience_restriction: - audience = restriction.audience - if audience.text.strip() == me: - return True + response = samlp.response_from_string(enctext) + response.assertion = response.assertion[0] -def ava(attribute_statement): - result = {} - for attribute in attribute_statement.attribute: - # Check name_format ?? - name = attribute.name.strip() - result[name] = [] - for value in attribute.attribute_value: - result[name].append(value.text.strip()) - return result + response.assertion.advice.encrypted_assertion.append( + EncryptedAssertion()) + response.assertion.advice.encrypted_assertion[1].add_extension_element( + a_assertion_4) -def _leq(l1, l2): - return set(l1) == set(l2) + advice_tag = response.assertion.advice._to_element_tree().tag + assertion_tag = a_assertion_4._to_element_tree().tag + response = \ + response.get_xml_string_with_self_contained_assertion_within_advice_encrypted_assertion( + assertion_tag, advice_tag) + response = _sec.sign_statement("%s" % response, + class_name(a_assertion_4), + key_file=self.server.sec.key_file, + node_id=a_assertion_4.id) -REQ1 = {"1.2.14": """ -urn:mace:example.com:saml:roland:spE8042FB4-4D5B-48C3-8E14-8EDD852790DD""", - "1.2.16": """ -urn:mace:example -.com:saml:roland:spE8042FB4-4D5B-48C3-8E14-8EDD852790DD"""} + node_xpath = ''.join(["/*[local-name()=\"%s\"]" % v for v in + ["Response", "Assertion", "Advice", + "EncryptedAssertion", "Assertion"]]) -nid = NameID(name_qualifier="foo", format=NAMEID_FORMAT_TRANSIENT, - text="123456") + enctext = _sec.crypto.encrypt_assertion(response, + self.client.sec.encryption_keypairs[ + 1]["cert_file"], + pre_encryption_part(), + node_xpath=node_xpath) + response = samlp.response_from_string(enctext) -def list_values2simpletons(_dict): - return dict([(k, v[0]) for k, v in _dict.items()]) + response = _sec.sign_statement("%s" % response, + class_name(response.assertion[0]), + key_file=self.server.sec.key_file, + node_id=response.assertion[0].id) + response = samlp.response_from_string(response) -class TestClient: + # seresp = samlp.response_from_string(enctext) + + resp_str = encode_fn(str(response).encode()) + # Now over to the client side + resp = self.client.parse_authn_request_response( + resp_str, BINDING_HTTP_POST, + {"_012345": "http://foo.example.com/service"}) + + # assert resp.encrypted_assertion == [] + assert resp.assertion + assert resp.assertion.advice + assert resp.assertion.advice.assertion + assert resp.ava == \ + {'street': ['street'], 'uid': ['test01'], 'title': ['title'], + 'givenName': ['Derek'], 'email': + ['test.testsson@test.se'], 'sn': ['Jeter']} + + def test_signed_redirect(self): + + # Revert configuration change to disallow unsinged responses + self.client.want_response_signed = True + + msg_str = "%s" % self.client.create_authn_request( + "http://localhost:8088/sso", message_id="id1")[1] + + info = self.client.apply_binding( + BINDING_HTTP_REDIRECT, msg_str, destination="", + relay_state="relay2", sigalg=SIG_RSA_SHA256) + + loc = info["headers"][0][1] + qs = parse_qs(loc[1:]) + assert _leq(qs.keys(), + ['SigAlg', 'SAMLRequest', 'RelayState', 'Signature']) + + assert verify_redirect_signature(list_values2simpletons(qs), + self.client.sec.sec_backend) + + res = self.server.parse_authn_request(qs["SAMLRequest"][0], + BINDING_HTTP_REDIRECT) + + def test_do_logout_signed_redirect(self): + conf = config.SPConfig() + conf.load_file("sp_slo_redirect_conf") + client = Saml2Client(conf) + + # information about the user from an IdP + session_info = { + "name_id": nid, + "issuer": "urn:mace:example.com:saml:roland:idp", + "not_on_or_after": in_a_while(minutes=15), + "ava": { + "givenName": "Anders", + "sn": "Andersson", + "mail": "anders.andersson@example.com" + } + } + client.users.add_information_about_person(session_info) + entity_ids = client.users.issuers_of_info(nid) + assert entity_ids == ["urn:mace:example.com:saml:roland:idp"] + + resp = client.do_logout(nid, entity_ids, "Tired", in_a_while(minutes=5), + sign=True, + expected_binding=BINDING_HTTP_REDIRECT) + + assert list(resp.keys()) == entity_ids + binding, info = resp[entity_ids[0]] + assert binding == BINDING_HTTP_REDIRECT + + loc = info["headers"][0][1] + _, _, _, _, qs, _ = urlparse(loc) + qs = parse_qs(qs) + assert _leq(qs.keys(), + ['SigAlg', 'SAMLRequest', 'RelayState', 'Signature']) + + assert verify_redirect_signature(list_values2simpletons(qs), + client.sec.sec_backend) + + res = self.server.parse_logout_request(qs["SAMLRequest"][0], + BINDING_HTTP_REDIRECT) + + def test_do_logout_post(self): + # information about the user from an IdP + session_info = { + "name_id": nid, + "issuer": "urn:mace:example.com:saml:roland:idp", + "not_on_or_after": in_a_while(minutes=15), + "ava": { + "givenName": "Anders", + "sn": "Andersson", + "mail": "anders.andersson@example.com" + }, + "session_index": SessionIndex("_foo") + } + self.client.users.add_information_about_person(session_info) + entity_ids = self.client.users.issuers_of_info(nid) + assert entity_ids == ["urn:mace:example.com:saml:roland:idp"] + resp = self.client.do_logout(nid, entity_ids, "Tired", + in_a_while(minutes=5), sign=True, + expected_binding=BINDING_HTTP_POST) + assert resp + assert len(resp) == 1 + assert list(resp.keys()) == entity_ids + binding, info = resp[entity_ids[0]] + assert binding == BINDING_HTTP_POST + + _dic = unpack_form(info["data"]) + res = self.server.parse_logout_request(_dic["SAMLRequest"], + BINDING_HTTP_POST) + assert b'_foo' in res.xmlstr + + def test_do_logout_session_expired(self): + # information about the user from an IdP + session_info = { + "name_id": nid, + "issuer": "urn:mace:example.com:saml:roland:idp", + "not_on_or_after": a_while_ago(minutes=15), + "ava": { + "givenName": "Anders", + "sn": "Andersson", + "mail": "anders.andersson@example.com" + }, + "session_index": SessionIndex("_foo") + } + self.client.users.add_information_about_person(session_info) + entity_ids = self.client.users.issuers_of_info(nid) + assert entity_ids == ["urn:mace:example.com:saml:roland:idp"] + resp = self.client.do_logout(nid, entity_ids, "Tired", + in_a_while(minutes=5), sign=True, + expected_binding=BINDING_HTTP_POST) + assert resp + assert len(resp) == 1 + assert list(resp.keys()) == entity_ids + binding, info = resp[entity_ids[0]] + assert binding == BINDING_HTTP_POST + + _dic = unpack_form(info["data"]) + res = self.server.parse_logout_request(_dic["SAMLRequest"], + BINDING_HTTP_POST) + assert b'_foo' in res.xmlstr + + +class TestClientNonAsciiAva: def setup_class(self): self.server = Server("idp_conf") @@ -373,8 +1712,8 @@ class TestClient: def test_response_1(self): IDP = "urn:mace:example.com:saml:roland:idp" - ava = {"givenName": ["Derek"], "sn": ["Jeter"], - "mail": ["derek@nyy.mlb.com"], "title": ["The man"]} + ava = {"givenName": ["Dave"], "sn": ["Concepción"], + "mail": ["Dave@cnr.mlb.com"], "title": ["#13"]} nameid_policy = samlp.NameIDPolicy(allow_create="false", format=saml.NAMEID_FORMAT_PERSISTENT) @@ -391,7 +1730,7 @@ class TestClient: resp_str = "%s" % resp - resp_str = encode_fn(resp_str.encode()) + resp_str = encode_fn(resp_str.encode('utf-8')) authn_response = self.client.parse_authn_request_response( resp_str, BINDING_HTTP_POST, @@ -402,10 +1741,8 @@ class TestClient: assert authn_response.response.assertion[0].issuer.text == IDP session_info = authn_response.session_info() - assert session_info["ava"] == {'mail': ['derek@nyy.mlb.com'], - 'givenName': ['Derek'], - 'sn': ['Jeter'], - 'title': ["The man"]} + assert session_info["ava"] == {"givenName": ["Dave"], "sn": [u"Concepción"], + "mail": ["Dave@cnr.mlb.com"], "title": ["#13"]} assert session_info["issuer"] == IDP assert session_info["came_from"] == "http://foo.example.com/service" response = samlp.response_from_string(authn_response.xmlstr) @@ -740,6 +2077,11 @@ class TestClient: """ Test that the SP client can parse an authentication response from an IdP that does not contain a element.""" + if six.PY2: + _bytes = str + else: + _bytes = bytes + conf = config.SPConfig() conf.load_file("server_conf") client = Saml2Client(conf) @@ -779,7 +2121,11 @@ class TestClient: # Cast the response to a string and encode it to mock up the payload # the SP client is expected to receive via HTTP POST binding. - resp_str = encode_fn(str(resp).encode()) + if six.PY2: + resp_str = encode_fn(str(resp)) + else: + resp_str = encode_fn(bytes(str(resp), 'utf-8')) + # We do not need the client to verify a signature for this test. client.want_assertions_signed = False @@ -795,10 +2141,10 @@ class TestClient: def setup_verify_authn_response(self): idp = "urn:mace:example.com:saml:roland:idp" - ava = {"givenName": ["Derek"], "sn": ["Jeter"], - "mail": ["derek@nyy.mlb.com"], "title": ["The man"]} - ava_verify = {'mail': ['derek@nyy.mlb.com'], 'givenName': ['Derek'], - 'sn': ['Jeter'], 'title': ["The man"]} + ava = {"givenName": ["Dave"], "sn": ["Concepción"], + "mail": ["Dave@cnr.mlb.com"], "title": ["#13"]} + ava_verify = {"givenName": ["Dave"], "sn": [u"Concepción"], + "mail": ["Dave@cnr.mlb.com"], "title": ["#13"]} nameid_policy = samlp.NameIDPolicy(allow_create="false", format=saml.NAMEID_FORMAT_PERSISTENT) return idp, ava, ava_verify, nameid_policy @@ -903,7 +2249,7 @@ class TestClient: nameid_policy = samlp.NameIDPolicy(allow_create="false", format=saml.NAMEID_FORMAT_PERSISTENT) - asser = Assertion({"givenName": "Derek", "sn": "Jeter"}) + asser = Assertion({"givenName": "Dave", "sn": "Concepción"}) farg = add_path( {}, ['assertion', 'subject', 'subject_confirmation', 'method', @@ -966,7 +2312,7 @@ class TestClient: # assert resp.encrypted_assertion == [] assert resp.assertion - assert resp.ava == {'givenName': ['Derek'], 'sn': ['Jeter']} + assert resp.ava == {"sn": [u"Concepción"], "givenName": ["Dave"]} def test_sign_then_encrypt_assertion_advice_1(self): # Begin with the IdPs side @@ -975,7 +2321,7 @@ class TestClient: nameid_policy = samlp.NameIDPolicy(allow_create="false", format=saml.NAMEID_FORMAT_PERSISTENT) - asser = Assertion({"givenName": "Derek", "sn": "Jeter"}) + asser = Assertion({"givenName": "Dave", "sn": "Concepción"}) subject_confirmation_specs = { 'recipient': "http://lingon.catalogix.se:8087/", @@ -1059,7 +2405,11 @@ class TestClient: # seresp = samlp.response_from_string(enctext) - resp_str = encode_fn(enctext.encode()) + if six.PY2: + resp_str = encode_fn(enctext.encode('utf-8')) + else: + resp_str = encode_fn(bytes(enctext, 'utf-8')) + # Now over to the client side resp = self.client.parse_authn_request_response( resp_str, BINDING_HTTP_POST, @@ -1070,7 +2420,7 @@ class TestClient: assert resp.assertion.advice assert resp.assertion.advice.assertion assert resp.ava == \ - {'sn': ['Jeter'], 'givenName': ['Derek'], 'uid': ['test01'], + {'givenName': ['Dave'], 'sn': [u'Concepción'], 'uid': ['test01'], 'email': ['test.testsson@test.se']} def test_sign_then_encrypt_assertion_advice_2(self): @@ -1080,7 +2430,7 @@ class TestClient: nameid_policy = samlp.NameIDPolicy(allow_create="false", format=saml.NAMEID_FORMAT_PERSISTENT) - asser_1 = Assertion({"givenName": "Derek"}) + asser_1 = Assertion({"givenName": "Dave"}) farg = add_path( {}, @@ -1106,7 +2456,7 @@ class TestClient: name_id=name_id, farg=farg['assertion']) - asser_2 = Assertion({"sn": "Jeter"}) + asser_2 = Assertion({"sn": "Concepción"}) assertion_2 = asser_2.construct( self.client.config.entityid, @@ -1344,7 +2694,8 @@ class TestClient: # seresp = samlp.response_from_string(enctext) - resp_str = encode_fn(str(response).encode()) + resp_str = encode_fn(response.to_string()) + # Now over to the client side resp = self.client.parse_authn_request_response( resp_str, BINDING_HTTP_POST, @@ -1356,8 +2707,8 @@ class TestClient: assert resp.assertion.advice.assertion assert resp.ava == \ {'street': ['street'], 'uid': ['test01'], 'title': ['title'], - 'givenName': ['Derek'], 'email': - ['test.testsson@test.se'], 'sn': ['Jeter']} + 'givenName': ['Dave'], 'email': + ['test.testsson@test.se'], 'sn': [u'Concepción']} def test_signed_redirect(self): @@ -1394,8 +2745,8 @@ class TestClient: "not_on_or_after": in_a_while(minutes=15), "ava": { "givenName": "Anders", - "sn": "Andersson", - "mail": "anders.andersson@example.com" + "sn": "Österberg", + "mail": "anders.osterberg@example.com" } } client.users.add_information_about_person(session_info) @@ -1430,8 +2781,8 @@ class TestClient: "not_on_or_after": in_a_while(minutes=15), "ava": { "givenName": "Anders", - "sn": "Andersson", - "mail": "anders.andersson@example.com" + "sn": "Österberg", + "mail": "anders.osterberg@example.com" }, "session_index": SessionIndex("_foo") } @@ -1460,8 +2811,8 @@ class TestClient: "not_on_or_after": a_while_ago(minutes=15), "ava": { "givenName": "Anders", - "sn": "Andersson", - "mail": "anders.andersson@example.com" + "sn": "Österberg", + "mail": "anders.osterberg@example.com" }, "session_index": SessionIndex("_foo") } @@ -1482,7 +2833,6 @@ class TestClient: BINDING_HTTP_POST) assert b'_foo' in res.xmlstr - # Below can only be done with dummy Server IDP = "urn:mace:example.com:saml:roland:idp" @@ -1553,8 +2903,8 @@ class TestClientWithDummy(): "not_on_or_after": in_a_while(minutes=15), "ava": { "givenName": "Anders", - "sn": "Andersson", - "mail": "anders.andersson@example.com" + "sn": "Österberg", + "mail": "anders.osterberg@example.com" } } self.client.users.add_information_about_person(session_info) @@ -1658,8 +3008,8 @@ class TestClientNoConfigContext(): "not_on_or_after": in_a_while(minutes=15), "ava": { "givenName": "Anders", - "sn": "Andersson", - "mail": "anders.andersson@example.com" + "sn": "Österberg", + "mail": "anders.osterberg@example.com" } } self.client.users.add_information_about_person(session_info) -- cgit v1.2.1