diff options
author | Ivan Kanakarakis <ivan.kanak@gmail.com> | 2018-10-10 14:37:40 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-10-10 14:37:40 +0300 |
commit | 691e981b0ddca9b7d75fa468ef31f494e80c0268 (patch) | |
tree | c131fab7847486aeba461ed448abad1712f0233b /tests | |
parent | c5c7e2ded26bf02ea51723708c1ee554e26b6811 (diff) | |
parent | 35fc1dc813884d4ade6f463e418ecc751f342608 (diff) | |
download | pysaml2-691e981b0ddca9b7d75fa468ef31f494e80c0268.tar.gz |
Merge pull request #550 from johanlundberg/non_ascii_ava_encryption_decryption
Support non-ascii attribute values for encryption and decryption
Diffstat (limited to 'tests')
-rw-r--r-- | tests/test_40_sigver.py | 424 | ||||
-rw-r--r-- | tests/test_50_server.py | 1105 | ||||
-rw-r--r-- | tests/test_51_client.py | 1360 |
3 files changed, 2883 insertions, 6 deletions
diff --git a/tests/test_40_sigver.py b/tests/test_40_sigver.py index f975b5ea..ba5cf639 100644 --- a/tests/test_40_sigver.py +++ b/tests/test_40_sigver.py @@ -1,4 +1,5 @@ #!/usr/bin/env python +# -*- coding: utf-8 -*- import base64 from saml2.xmldsig import SIG_RSA_SHA256 @@ -420,6 +421,300 @@ class TestSecurity(): s_response, response2, class_name(response2)) +class TestSecurityNonAsciiAva(): + def setup_class(self): + # This would be one way to initialize the security context : + # + # conf = config.SPConfig() + # conf.load_file("server_conf") + # conf.only_use_keys_in_metadata = False + # + # but instead, FakeConfig() is used to really only use the minimal + # set of parameters needed for these test cases. Other test cases + # (TestSecurityMetadata below) excersise the SPConfig() mechanism. + # + conf = FakeConfig() + self.sec = sigver.security_context(conf) + + self._assertion = factory( + saml.Assertion, + version="2.0", + id="11111", + issue_instant="2009-10-30T13:20:28Z", + signature=sigver.pre_signature_part("11111", self.sec.my_cert, 1), + attribute_statement=do_attribute_statement({ + ("", "", "surName"): ("Föö", ""), + ("", "", "givenName"): ("Bär", ""), + }) + ) + + def test_verify_1(self): + with open(SIGNED) as fp: + xml_response = fp.read() + response = self.sec.correctly_signed_response(xml_response) + assert response + + def test_non_verify_1(self): + """ unsigned is OK """ + with open(UNSIGNED) as fp: + xml_response = fp.read() + response = self.sec.correctly_signed_response(xml_response) + assert response + + def test_sign_assertion(self): + ass = self._assertion + print(ass) + sign_ass = self.sec.sign_assertion("%s" % ass, node_id=ass.id) + #print(sign_ass) + sass = saml.assertion_from_string(sign_ass) + #print(sass) + assert _eq(sass.keyswv(), ['attribute_statement', 'issue_instant', + 'version', 'signature', 'id']) + assert sass.version == "2.0" + assert sass.id == "11111" + assert time_util.str_to_time(sass.issue_instant) + + print("Crypto version : %s" % (self.sec.crypto.version())) + + item = self.sec.check_signature(sass, class_name(sass), sign_ass) + + assert isinstance(item, saml.Assertion) + + def test_multiple_signatures_assertion(self): + ass = self._assertion + # basic test with two of the same + to_sign = [(ass, ass.id, ''), + (ass, ass.id, '') + ] + sign_ass = self.sec.multiple_signatures("%s" % ass, to_sign) + sass = saml.assertion_from_string(sign_ass) + assert _eq(sass.keyswv(), ['attribute_statement', 'issue_instant', + 'version', 'signature', 'id']) + assert sass.version == "2.0" + assert sass.id == "11111" + assert time_util.str_to_time(sass.issue_instant) + + print("Crypto version : %s" % (self.sec.crypto.version())) + + item = self.sec.check_signature(sass, class_name(sass), + sign_ass, must=True) + + assert isinstance(item, saml.Assertion) + + def test_multiple_signatures_response(self): + response = factory(samlp.Response, + assertion=self._assertion, + id="22222", + signature=sigver.pre_signature_part( + "22222", self.sec.my_cert)) + + # order is important, we can't validate if the signatures are made + # in the reverse order + to_sign = [(self._assertion, self._assertion.id, ''), + (response, response.id, '')] + + s_response = self.sec.multiple_signatures("%s" % response, to_sign) + assert s_response is not None + response = response_from_string(s_response) + + item = self.sec.check_signature(response, class_name(response), + s_response, must=True) + assert item == response + assert item.id == "22222" + + s_assertion = item.assertion[0] + assert isinstance(s_assertion, saml.Assertion) + # make sure the assertion was modified when we supposedly signed it + assert s_assertion != self._assertion + + ci = "".join(sigver.cert_from_instance(s_assertion)[0].split()) + assert ci == self.sec.my_cert + + res = self.sec.check_signature(s_assertion, class_name(s_assertion), + s_response, must=True) + assert res == s_assertion + assert s_assertion.id == "11111" + assert s_assertion.version == "2.0" + assert _eq(s_assertion.keyswv(), ['attribute_statement', + 'issue_instant', + 'version', 'signature', 'id']) + + def test_sign_response(self): + response = factory(samlp.Response, + assertion=self._assertion, + id="22222", + signature=sigver.pre_signature_part("22222", + self.sec + .my_cert)) + + to_sign = [(class_name(self._assertion), self._assertion.id), + (class_name(response), response.id)] + s_response = sigver.signed_instance_factory(response, self.sec, to_sign) + + assert s_response is not None + print(s_response) + response = response_from_string(s_response) + sass = response.assertion[0] + + print(sass) + assert _eq(sass.keyswv(), ['attribute_statement', 'issue_instant', + 'version', 'signature', 'id']) + assert sass.version == "2.0" + assert sass.id == "11111" + + item = self.sec.check_signature(response, class_name(response), + s_response) + assert isinstance(item, samlp.Response) + assert item.id == "22222" + + def test_sign_response_2(self): + assertion2 = factory(saml.Assertion, + version="2.0", + id="11122", + issue_instant="2009-10-30T13:20:28Z", + signature=sigver.pre_signature_part("11122", + self.sec + .my_cert), + attribute_statement=do_attribute_statement({ + ("", "", "surName"): ("Räv", ""), + ("", "", "givenName"): ("Björn", ""), + }) + ) + response = factory(samlp.Response, + assertion=assertion2, + id="22233", + signature=sigver.pre_signature_part("22233", + self.sec + .my_cert)) + + to_sign = [(class_name(assertion2), assertion2.id), + (class_name(response), response.id)] + + s_response = sigver.signed_instance_factory(response, self.sec, to_sign) + + assert s_response is not None + response2 = response_from_string(s_response) + + sass = response2.assertion[0] + assert _eq(sass.keyswv(), ['attribute_statement', 'issue_instant', + 'version', 'signature', 'id']) + assert sass.version == "2.0" + assert sass.id == "11122" + + item = self.sec.check_signature(response2, class_name(response), + s_response) + + assert isinstance(item, samlp.Response) + + def test_sign_verify(self): + response = factory(samlp.Response, + assertion=self._assertion, + id="22233", + signature=sigver.pre_signature_part("22233", + self.sec + .my_cert)) + + to_sign = [(class_name(self._assertion), self._assertion.id), + (class_name(response), response.id)] + + s_response = sigver.signed_instance_factory(response, self.sec, + to_sign) + + print(s_response) + res = self.sec.verify_signature(s_response, + node_name=class_name(samlp.Response())) + + print(res) + assert res + + def test_sign_verify_with_cert_from_instance(self): + response = factory(samlp.Response, + assertion=self._assertion, + id="22222", + signature=sigver.pre_signature_part("22222", + self.sec + .my_cert)) + + to_sign = [(class_name(self._assertion), self._assertion.id), + (class_name(response), response.id)] + + s_response = sigver.signed_instance_factory(response, self.sec, to_sign) + + response2 = response_from_string(s_response) + + ci = "".join(sigver.cert_from_instance(response2)[0].split()) + + assert ci == self.sec.my_cert + + res = self.sec.verify_signature(s_response, + node_name=class_name(samlp.Response())) + + assert res + + res = self.sec._check_signature(s_response, response2, + class_name(response2), s_response) + assert res == response2 + + def test_sign_verify_assertion_with_cert_from_instance(self): + assertion = factory(saml.Assertion, + version="2.0", + id="11100", + issue_instant="2009-10-30T13:20:28Z", + signature=sigver.pre_signature_part("11100", + self.sec + .my_cert), + attribute_statement=do_attribute_statement({ + ("", "", "surName"): ("Räv", ""), + ("", "", "givenName"): ("Björn", ""), + }) + ) + + to_sign = [(class_name(assertion), assertion.id)] + s_assertion = sigver.signed_instance_factory(assertion, self.sec, + to_sign) + print(s_assertion) + ass = assertion_from_string(s_assertion) + ci = "".join(sigver.cert_from_instance(ass)[0].split()) + assert ci == self.sec.my_cert + + res = self.sec.verify_signature(s_assertion, + node_name=class_name(ass)) + assert res + + res = self.sec._check_signature(s_assertion, ass, class_name(ass)) + + assert res + + def test_exception_sign_verify_with_cert_from_instance(self): + assertion = factory(saml.Assertion, + version="2.0", + id="11100", + issue_instant="2009-10-30T13:20:28Z", + #signature= sigver.pre_signature_part("11100", + # self.sec.my_cert), + attribute_statement=do_attribute_statement({ + ("", "", "surName"): ("Föö", ""), + ("", "", "givenName"): ("Bär", ""), + }) + ) + + response = factory(samlp.Response, + assertion=assertion, + id="22222", + signature=sigver.pre_signature_part("22222", + self.sec + .my_cert)) + + to_sign = [(class_name(response), response.id)] + + s_response = sigver.signed_instance_factory(response, self.sec, to_sign) + + response2 = response_from_string(s_response) + # Change something that should make everything fail + response2.id = "23456" + raises(sigver.SignatureError, self.sec._check_signature, + s_response, response2, class_name(response2)) + class TestSecurityMetadata(): def setup_class(self): @@ -442,6 +737,27 @@ class TestSecurityMetadata(): ) +class TestSecurityMetadataNonAsciiAva(): + def setup_class(self): + conf = config.SPConfig() + conf.load_file("server_conf") + md = MetadataStore([saml, samlp], None, conf) + md.load("local", full_path("metadata_cert.xml")) + + conf.metadata = md + conf.only_use_keys_in_metadata = False + self.sec = sigver.security_context(conf) + + assertion = factory( + saml.Assertion, version="2.0", id="11111", + issue_instant="2009-10-30T13:20:28Z", + signature=sigver.pre_signature_part("11111", self.sec.my_cert, 1), + attribute_statement=do_attribute_statement( + {("", "", "surName"): ("Föö", ""), + ("", "", "givenName"): ("Bär", ""), }) + ) + + def test_xbox(): conf = config.SPConfig() conf.load_file("server_conf") @@ -495,6 +811,59 @@ def test_xbox(): print(assertions) +def test_xbox_non_ascii_ava(): + conf = config.SPConfig() + conf.load_file("server_conf") + md = MetadataStore([saml, samlp], None, conf) + md.load("local", full_path("idp_example.xml")) + + conf.metadata = md + conf.only_use_keys_in_metadata = False + sec = sigver.security_context(conf) + + assertion = factory( + saml.Assertion, version="2.0", id="11111", + issue_instant="2009-10-30T13:20:28Z", + signature=sigver.pre_signature_part("11111", sec.my_cert, 1), + attribute_statement=do_attribute_statement( + {("", "", "surName"): ("Föö", ""), + ("", "", "givenName"): ("Bär", ""), }) + ) + + sigass = sec.sign_statement(assertion, class_name(assertion), + key_file=full_path("test.key"), + node_id=assertion.id) + + _ass0 = saml.assertion_from_string(sigass) + + encrypted_assertion = EncryptedAssertion() + encrypted_assertion.add_extension_element(_ass0) + + _, pre = make_temp(str(pre_encryption_part()).encode('utf-8'), decode=False) + enctext = sec.crypto.encrypt( + str(encrypted_assertion), conf.cert_file, pre, "des-192", + '/*[local-name()="EncryptedAssertion"]/*[local-name()="Assertion"]') + + decr_text = sec.decrypt(enctext) + _seass = saml.encrypted_assertion_from_string(decr_text) + assertions = [] + assers = extension_elements_to_elements(_seass.extension_elements, + [saml, samlp]) + + sign_cert_file = full_path("test.pem") + + for ass in assers: + _ass = "%s" % ass + #_ass = _ass.replace('xsi:nil="true" ', '') + #assert sigass == _ass + _txt = sec.verify_signature(_ass, sign_cert_file, + node_name=class_name(assertion)) + if _txt: + assertions.append(ass) + + print(assertions) + + def test_okta(): conf = config.Config() conf.load_file("server_conf") @@ -548,6 +917,35 @@ def test_xmlsec_err(): assert False +def test_xmlsec_err_non_ascii_ava(): + conf = config.SPConfig() + conf.load_file("server_conf") + md = MetadataStore([saml, samlp], None, conf) + md.load("local", full_path("idp_example.xml")) + + conf.metadata = md + conf.only_use_keys_in_metadata = False + sec = sigver.security_context(conf) + + assertion = factory( + saml.Assertion, version="2.0", id="11111", + issue_instant="2009-10-30T13:20:28Z", + signature=sigver.pre_signature_part("11111", sec.my_cert, 1), + attribute_statement=do_attribute_statement( + {("", "", "surName"): ("Föö", ""), + ("", "", "givenName"): ("Bär", ""), }) + ) + + try: + sec.sign_statement(assertion, class_name(assertion), + key_file=full_path("tes.key"), + node_id=assertion.id) + except (XmlsecError, SigverError) as err: # should throw an exception + pass + else: + assert False + + def test_sha256_signing(): conf = config.SPConfig() conf.load_file("server_conf") @@ -574,6 +972,32 @@ def test_sha256_signing(): assert s +def test_sha256_signing_non_ascii_ava(): + conf = config.SPConfig() + conf.load_file("server_conf") + md = MetadataStore([saml, samlp], None, conf) + md.load("local", full_path("idp_example.xml")) + + conf.metadata = md + conf.only_use_keys_in_metadata = False + sec = sigver.security_context(conf) + + assertion = factory( + saml.Assertion, version="2.0", id="11111", + issue_instant="2009-10-30T13:20:28Z", + signature=sigver.pre_signature_part("11111", sec.my_cert, 1, + sign_alg=SIG_RSA_SHA256), + attribute_statement=do_attribute_statement( + {("", "", "surName"): ("Föö", ""), + ("", "", "givenName"): ("Bär", ""), }) + ) + + s = sec.sign_statement(assertion, class_name(assertion), + key_file=full_path("test.key"), + node_id=assertion.id) + assert s + + def test_xmlsec_output_line_parsing(): output1 = "prefix\nOK\npostfix" assert sigver.parse_xmlsec_output(output1) diff --git a/tests/test_50_server.py b/tests/test_50_server.py index f0dcae3c..11ace47b 100644 --- a/tests/test_50_server.py +++ b/tests/test_50_server.py @@ -1185,7 +1185,1110 @@ class TestServer1(): idp.ident.close() assert request -#------------------------------------------------------------------------ +# ------------------------------------------------------------------------ + + +class TestServer1NonAsciiAva(): + + def setup_class(self): + self.server = Server("idp_conf") + + conf = config.SPConfig() + conf.load_file("server_conf") + self.client = client.Saml2Client(conf) + self.name_id = self.server.ident.transient_nameid( + "urn:mace:example.com:saml:roland:sp", "id12") + self.ava = {"givenName": ["Dave"], "sn": ["Concepción"], + "mail": ["dave@cnr.mlb.com"], "title": "#13"} + + def teardown_class(self): + self.server.close() + + def verify_assertion(self, assertion): + assert assertion + assert assertion[0].attribute_statement + + ava = get_ava(assertion[0]) + + assert ava == \ + {"givenName": ["Dave"], "sn": [u"Concepción"], + "mail": ["dave@cnr.mlb.com"], "title": ["#13"]} + + + def verify_encrypted_assertion(self, assertion, decr_text): + self.verify_assertion(assertion) + assert assertion[0].signature is None + + assert 'EncryptedAssertion><encas1:Assertion xmlns:encas0="http://www.w3.org/2001/XMLSchema-instance" ' \ + 'xmlns:encas1="urn:oasis:names:tc:SAML:2.0:assertion"' in decr_text + + def verify_advice_assertion(self, resp, decr_text): + assert resp.assertion[0].signature is None + + assert resp.assertion[0].advice.encrypted_assertion[0].extension_elements + + assertion = extension_elements_to_elements(resp.assertion[0].advice.encrypted_assertion[0].extension_elements, + [saml, samlp]) + self.verify_encrypted_assertion(assertion, decr_text) + + + def test_issuer(self): + issuer = self.server._issuer() + assert isinstance(issuer, saml.Issuer) + assert _eq(issuer.keyswv(), ["text", "format"]) + assert issuer.format == saml.NAMEID_FORMAT_ENTITY + assert issuer.text == self.server.config.entityid + + def test_assertion(self): + assertion = s_utils.assertion_factory( + subject=factory( + saml.Subject, text="_aaa", + name_id=factory(saml.NameID, + format=saml.NAMEID_FORMAT_TRANSIENT)), + attribute_statement=do_attribute_statement( + { + ("", "", "sn"): ("Jeter", ""), + ("", "", "givenName"): ("Derek", ""), + } + ), + issuer=self.server._issuer(), + ) + + assert _eq(assertion.keyswv(), ['attribute_statement', 'issuer', 'id', + 'subject', 'issue_instant', 'version']) + assert assertion.version == "2.0" + assert assertion.issuer.text == "urn:mace:example.com:saml:roland:idp" + # + assert assertion.attribute_statement + attribute_statement = assertion.attribute_statement + assert len(attribute_statement.attribute) == 2 + attr0 = attribute_statement.attribute[0] + attr1 = attribute_statement.attribute[1] + if attr0.attribute_value[0].text == "Derek": + assert attr0.friendly_name == "givenName" + assert attr1.friendly_name == "sn" + assert attr1.attribute_value[0].text == "Jeter" + else: + assert attr1.friendly_name == "givenName" + assert attr1.attribute_value[0].text == "Derek" + assert attr0.friendly_name == "sn" + assert attr0.attribute_value[0].text == "Jeter" + # + subject = assertion.subject + assert _eq(subject.keyswv(), ["text", "name_id"]) + assert subject.text == "_aaa" + assert subject.name_id.format == saml.NAMEID_FORMAT_TRANSIENT + + def test_response(self): + response = sigver.response_factory( + in_response_to="_012345", + destination="https:#www.example.com", + status=s_utils.success_status_factory(), + assertion=s_utils.assertion_factory( + subject=factory(saml.Subject, text="_aaa", + name_id=saml.NAMEID_FORMAT_TRANSIENT), + attribute_statement=do_attribute_statement( + { + ("", "", "sn"): ("Jeter", ""), + ("", "", "givenName"): ("Derek", ""), + } + ), + issuer=self.server._issuer(), + ), + issuer=self.server._issuer(), + ) + + print(response.keyswv()) + assert _eq(response.keyswv(), ['destination', 'assertion', 'status', + 'in_response_to', 'issue_instant', + 'version', 'issuer', 'id']) + assert response.version == "2.0" + assert response.issuer.text == "urn:mace:example.com:saml:roland:idp" + assert response.destination == "https:#www.example.com" + assert response.in_response_to == "_012345" + # + status = response.status + print(status) + assert status.status_code.value == samlp.STATUS_SUCCESS + + def test_parse_faulty_request(self): + req_id, authn_request = self.client.create_authn_request( + destination="http://www.example.com", id="id1") + + # should raise an error because faulty spentityid + binding = BINDING_HTTP_REDIRECT + htargs = self.client.apply_binding( + binding, "%s" % authn_request, "http://www.example.com", "abcd") + _dict = parse_qs(htargs["headers"][0][1].split('?')[1]) + print(_dict) + raises(OtherError, self.server.parse_authn_request, + _dict["SAMLRequest"][0], binding) + + def test_parse_faulty_request_to_err_status(self): + req_id, authn_request = self.client.create_authn_request( + destination="http://www.example.com") + + binding = BINDING_HTTP_REDIRECT + htargs = self.client.apply_binding(binding, "%s" % authn_request, + "http://www.example.com", "abcd") + _dict = parse_qs(htargs["headers"][0][1].split('?')[1]) + print(_dict) + + try: + self.server.parse_authn_request(_dict["SAMLRequest"][0], binding) + status = None + except OtherError as oe: + print(oe.args) + status = s_utils.error_status_factory(oe) + + assert status + print(status) + assert _eq(status.keyswv(), ["status_code", "status_message"]) + assert status.status_message.text == 'Not destined for me!' + status_code = status.status_code + assert _eq(status_code.keyswv(), ["status_code", "value"]) + assert status_code.value == samlp.STATUS_RESPONDER + assert status_code.status_code.value == samlp.STATUS_UNKNOWN_PRINCIPAL + + def test_parse_ok_request(self): + req_id, authn_request = self.client.create_authn_request( + message_id="id1", destination="http://localhost:8088/sso") + + print(authn_request) + binding = BINDING_HTTP_REDIRECT + htargs = self.client.apply_binding(binding, "%s" % authn_request, + "http://www.example.com", "abcd") + _dict = parse_qs(htargs["headers"][0][1].split('?')[1]) + print(_dict) + + req = self.server.parse_authn_request(_dict["SAMLRequest"][0], binding) + # returns a dictionary + print(req) + resp_args = self.server.response_args(req.message, [BINDING_HTTP_POST]) + assert resp_args["destination"] == "http://lingon.catalogix.se:8087/" + assert resp_args["in_response_to"] == "id1" + name_id_policy = resp_args["name_id_policy"] + assert _eq(name_id_policy.keyswv(), ["format", "allow_create"]) + assert name_id_policy.format == saml.NAMEID_FORMAT_TRANSIENT + assert resp_args[ + "sp_entity_id"] == "urn:mace:example.com:saml:roland:sp" + + def test_sso_response_with_identity(self): + name_id = self.server.ident.transient_nameid( + "https://example.com/sp", "id12") + resp = self.server.create_authn_response( + { + "eduPersonEntitlement": "Short stop", + "sn": "Jeter", + "givenName": "Derek", + "mail": "derek.jeter@nyy.mlb.com", + "title": "The man" + }, + "id12", # in_response_to + "http://localhost:8087/", # destination + "https://example.com/sp", # sp_entity_id + name_id=name_id, + authn=AUTHN + ) + + print(resp.keyswv()) + assert _eq(resp.keyswv(), ['status', 'destination', 'assertion', + 'in_response_to', 'issue_instant', + 'version', 'id', 'issuer']) + assert resp.destination == "http://localhost:8087/" + assert resp.in_response_to == "id12" + assert resp.status + assert resp.status.status_code.value == samlp.STATUS_SUCCESS + assert resp.assertion + assertion = resp.assertion + print(assertion) + assert assertion.authn_statement + assert assertion.conditions + assert assertion.attribute_statement + attribute_statement = assertion.attribute_statement + print(attribute_statement) + assert len(attribute_statement[0].attribute) == 4 + # Pick out one attribute + attr = None + for attr in attribute_statement[0].attribute: + if attr.friendly_name == "givenName": + break + assert len(attr.attribute_value) == 1 + assert attr.name == "urn:mace:dir:attribute-def:givenName" + assert attr.name_format == "urn:oasis:names:tc:SAML:2.0:attrname-format:basic" + value = attr.attribute_value[0] + assert value.text.strip() == "Derek" + assert value.get_type() == "xs:string" + assert assertion.subject + assert assertion.subject.name_id + assert assertion.subject.subject_confirmation + confirmation = assertion.subject.subject_confirmation[0] + print(confirmation.keyswv()) + print(confirmation.subject_confirmation_data) + assert confirmation.subject_confirmation_data.in_response_to == "id12" + + def test_sso_response_without_identity(self): + resp = self.server.create_authn_response( + {}, + "id12", # in_response_to + "http://localhost:8087/", # consumer_url + "urn:mace:example.com:saml:roland:sp", # sp_entity_id + userid="USER1", + authn=AUTHN, + release_policy=Policy(), + best_effort=True + ) + + print(resp.keyswv()) + assert _eq(resp.keyswv(), ['status', 'destination', 'in_response_to', + 'issue_instant', 'version', 'id', 'issuer', + 'assertion']) + assert resp.destination == "http://localhost:8087/" + assert resp.in_response_to == "id12" + assert resp.status + assert resp.status.status_code.value == samlp.STATUS_SUCCESS + assert resp.issuer.text == "urn:mace:example.com:saml:roland:idp" + assert not resp.assertion.attribute_statement + + def test_sso_response_specific_instant(self): + _authn = AUTHN.copy() + _authn["authn_instant"] = 1234567890 + + resp = self.server.create_authn_response( + {}, + "id12", # in_response_to + "http://localhost:8087/", # consumer_url + "urn:mace:example.com:saml:roland:sp", # sp_entity_id + userid="USER1", + authn=_authn, + best_effort=True + ) + + print(resp.keyswv()) + assert _eq(resp.keyswv(), ['status', 'destination', 'in_response_to', + 'issue_instant', 'version', 'id', 'issuer', + 'assertion']) + authn_statement = resp.assertion.authn_statement[0] + assert authn_statement.authn_instant == '2009-02-13T23:31:30Z' + + def test_sso_failure_response(self): + exc = s_utils.MissingValue("eduPersonAffiliation missing") + resp = self.server.create_error_response( + "id12", "http://localhost:8087/", exc) + + print(resp.keyswv()) + assert _eq(resp.keyswv(), ['status', 'destination', 'in_response_to', + 'issue_instant', 'version', 'id', 'issuer']) + assert resp.destination == "http://localhost:8087/" + assert resp.in_response_to == "id12" + assert resp.status + print(resp.status) + assert resp.status.status_code.value == samlp.STATUS_RESPONDER + assert resp.status.status_code.status_code.value == \ + samlp.STATUS_REQUEST_UNSUPPORTED + assert resp.status.status_message.text == \ + "eduPersonAffiliation missing" + assert resp.issuer.text == "urn:mace:example.com:saml:roland:idp" + assert not resp.assertion + + def test_authn_response_0(self): + conf = config.SPConfig() + conf.load_file("server_conf") + self.client = client.Saml2Client(conf) + + ava = {"givenName": ["Derek"], "sn": ["Jeter"], + "mail": ["derek@nyy.mlb.com"], "title": "The man"} + + npolicy = samlp.NameIDPolicy(format=saml.NAMEID_FORMAT_TRANSIENT, + allow_create="true") + resp_str = "%s" % self.server.create_authn_response( + ava, "id1", "http://local:8087/", + "urn:mace:example.com:saml:roland:sp", npolicy, + "foba0001@example.com", authn=AUTHN) + + response = samlp.response_from_string(resp_str) + print(response.keyswv()) + assert _eq(response.keyswv(), ['status', 'destination', 'assertion', + 'in_response_to', 'issue_instant', + 'version', 'issuer', 'id']) + print(response.assertion[0].keyswv()) + assert len(response.assertion) == 1 + assert _eq(response.assertion[0].keyswv(), ['attribute_statement', + 'issue_instant', 'version', + 'subject', 'conditions', + 'id', 'issuer', + 'authn_statement']) + assertion = response.assertion[0] + assert len(assertion.attribute_statement) == 1 + astate = assertion.attribute_statement[0] + print(astate) + assert len(astate.attribute) == 4 + + def test_signed_response(self): + name_id = self.server.ident.transient_nameid( + "urn:mace:example.com:saml:roland:sp", "id12") + ava = {"givenName": ["Derek"], "sn": ["Jeter"], + "mail": ["derek@nyy.mlb.com"], "title": "The man"} + + signed_resp = self.server.create_authn_response( + ava, + "id12", # in_response_to + "http://lingon.catalogix.se:8087/", # consumer_url + "urn:mace:example.com:saml:roland:sp", # sp_entity_id + name_id=name_id, + sign_assertion=True + ) + + print(signed_resp) + assert signed_resp + + sresponse = response_from_string(signed_resp) + # It's the assertions that are signed not the response per se + assert len(sresponse.assertion) == 1 + assertion = sresponse.assertion[0] + + # Since the reponse is created dynamically I don't know the signature + # value. Just that there should be one + assert assertion.signature.signature_value.text != "" + + def test_signed_response_1(self): + + + signed_resp = self.server.create_authn_response( + self.ava, + "id12", # in_response_to + "http://lingon.catalogix.se:8087/", # consumer_url + "urn:mace:example.com:saml:roland:sp", # sp_entity_id + name_id=self.name_id, + sign_response=True, + sign_assertion=True, + ) + + sresponse = response_from_string(signed_resp) + + valid = self.server.sec.verify_signature(signed_resp, + self.server.config.cert_file, + node_name='urn:oasis:names:tc:SAML:2.0:protocol:Response', + node_id=sresponse.id, + id_attr="") + assert valid + + valid = self.server.sec.verify_signature(signed_resp, + self.server.config.cert_file, + node_name='urn:oasis:names:tc:SAML:2.0:assertion:Assertion', + node_id=sresponse.assertion[0].id, + id_attr="") + assert valid + + self.verify_assertion(sresponse.assertion) + + def test_signed_response_2(self): + signed_resp = self.server.create_authn_response( + self.ava, + "id12", # in_response_to + "http://lingon.catalogix.se:8087/", # consumer_url + "urn:mace:example.com:saml:roland:sp", # sp_entity_id + name_id=self.name_id, + sign_response=True, + sign_assertion=False, + ) + + sresponse = response_from_string(signed_resp) + + valid = self.server.sec.verify_signature(signed_resp, + self.server.config.cert_file, + node_name='urn:oasis:names:tc:SAML:2.0:protocol:Response', + node_id=sresponse.id, + id_attr="") + assert valid + + assert sresponse.assertion[0].signature == None + + def test_signed_response_3(self): + + + signed_resp = self.server.create_authn_response( + self.ava, + "id12", # in_response_to + "http://lingon.catalogix.se:8087/", # consumer_url + "urn:mace:example.com:saml:roland:sp", # sp_entity_id + name_id=self.name_id, + sign_response=False, + sign_assertion=True, + ) + + sresponse = response_from_string(signed_resp) + + assert sresponse.signature == None + + valid = self.server.sec.verify_signature(signed_resp, + self.server.config.cert_file, + node_name='urn:oasis:names:tc:SAML:2.0:assertion:Assertion', + node_id=sresponse.assertion[0].id, + id_attr="") + assert valid + + self.verify_assertion(sresponse.assertion) + + def test_encrypted_signed_response_1(self): + + cert_str, cert_key_str = generate_cert() + + signed_resp = self.server.create_authn_response( + self.ava, + "id12", # in_response_to + "http://lingon.catalogix.se:8087/", # consumer_url + "urn:mace:example.com:saml:roland:sp", # sp_entity_id + name_id=self.name_id, + sign_response=True, + sign_assertion=True, + encrypt_assertion=False, + encrypt_assertion_self_contained=True, + pefim=True, + encrypt_cert_advice=cert_str, + ) + + sresponse = response_from_string(signed_resp) + + valid = self.server.sec.verify_signature( + signed_resp, self.server.config.cert_file, + node_name='urn:oasis:names:tc:SAML:2.0:protocol:Response', + node_id=sresponse.id, id_attr="") + + assert valid + + valid = self.server.sec.verify_signature( + signed_resp, self.server.config.cert_file, + node_name='urn:oasis:names:tc:SAML:2.0:assertion:Assertion', + node_id=sresponse.assertion[0].id, id_attr="") + + assert valid + + _, key_file = make_temp(cert_key_str, decode=False) + + decr_text = self.server.sec.decrypt(signed_resp, key_file) + + resp = samlp.response_from_string(decr_text) + + assert resp.assertion[0].advice.encrypted_assertion[0].extension_elements + + assertion = extension_elements_to_elements( + resp.assertion[0].advice.encrypted_assertion[0].extension_elements, + [saml, samlp]) + + self.verify_assertion(assertion) + + + + #PEFIM never signs assertions. + assert assertion[0].signature is None + #valid = self.server.sec.verify_signature(decr_text, + # self.server.config.cert_file, + # node_name='urn:oasis:names:tc:SAML:2.0:assertion:Assertion', + # node_id=assertion[0].id, + # id_attr="") + assert valid + + def test_encrypted_signed_response_2(self): + cert_str, cert_key_str = generate_cert() + + signed_resp = self.server.create_authn_response( + self.ava, + "id12", # in_response_to + "http://lingon.catalogix.se:8087/", # consumer_url + "urn:mace:example.com:saml:roland:sp", # sp_entity_id + name_id=self.name_id, + sign_response=True, + sign_assertion=False, + encrypt_assertion=True, + encrypt_assertion_self_contained=True, + ) + + sresponse = response_from_string(signed_resp) + + valid = self.server.sec.verify_signature(signed_resp, + self.server.config.cert_file, + node_name='urn:oasis:names:tc:SAML:2.0:protocol:Response', + node_id=sresponse.id, + id_attr="") + assert valid + + decr_text_old = copy.deepcopy("%s" % signed_resp) + + decr_text = self.server.sec.decrypt(signed_resp, self.client.config.encryption_keypairs[0]["key_file"]) + + assert decr_text == decr_text_old + + decr_text = self.server.sec.decrypt(signed_resp, self.client.config.encryption_keypairs[1]["key_file"]) + + assert decr_text != decr_text_old + + resp = samlp.response_from_string(decr_text) + + resp.assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp]) + + assert resp.assertion[0].signature == None + + self.verify_assertion(resp.assertion) + + + def test_encrypted_signed_response_3(self): + cert_str, cert_key_str = generate_cert() + + signed_resp = self.server.create_authn_response( + self.ava, + "id12", # in_response_to + "http://lingon.catalogix.se:8087/", # consumer_url + "urn:mace:example.com:saml:roland:sp", # sp_entity_id + name_id=self.name_id, + sign_response=True, + sign_assertion=True, + encrypt_assertion=True, + encrypt_assertion_self_contained=False, + encrypt_cert_assertion=cert_str, + ) + + sresponse = response_from_string(signed_resp) + + valid = self.server.sec.verify_signature(signed_resp, + self.server.config.cert_file, + node_name='urn:oasis:names:tc:SAML:2.0:protocol:Response', + node_id=sresponse.id, + id_attr="") + assert valid + + _, key_file = make_temp(cert_key_str, decode=False) + + decr_text = self.server.sec.decrypt(signed_resp, key_file) + + resp = samlp.response_from_string(decr_text) + + resp.assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp]) + + valid = self.server.sec.verify_signature(decr_text, + self.server.config.cert_file, + node_name='urn:oasis:names:tc:SAML:2.0:assertion:Assertion', + node_id=resp.assertion[0].id, + id_attr="") + + assert valid + + self.verify_assertion(resp.assertion) + + assert 'xmlns:encas' not in decr_text + + + def test_encrypted_signed_response_4(self): + + cert_str, cert_key_str = generate_cert() + + signed_resp = self.server.create_authn_response( + self.ava, + "id12", # in_response_to + "http://lingon.catalogix.se:8087/", # consumer_url + "urn:mace:example.com:saml:roland:sp", # sp_entity_id + name_id=self.name_id, + sign_response=True, + sign_assertion=True, + encrypt_assertion=True, + encrypt_assertion_self_contained=True, + pefim=True, + encrypt_cert_advice=cert_str, + ) + + sresponse = response_from_string(signed_resp) + + valid = self.server.sec.verify_signature(signed_resp, + self.server.config.cert_file, + node_name='urn:oasis:names:tc:SAML:2.0:protocol:Response', + node_id=sresponse.id, + id_attr="") + assert valid + + decr_text = self.server.sec.decrypt(signed_resp, self.client.config.encryption_keypairs[1]["key_file"]) + + resp = samlp.response_from_string(decr_text) + + resp.assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp]) + + valid = self.server.sec.verify_signature(decr_text, + self.server.config.cert_file, + node_name='urn:oasis:names:tc:SAML:2.0:assertion:Assertion', + node_id=resp.assertion[0].id, + id_attr="") + + assert valid + + _, key_file = make_temp(cert_key_str, decode=False) + + decr_text = self.server.sec.decrypt(decr_text, key_file) + + resp = samlp.response_from_string(decr_text) + + assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp]) + assertion = \ + extension_elements_to_elements(assertion[0].advice.encrypted_assertion[0].extension_elements,[saml, samlp]) + self.verify_assertion(assertion) + + #PEFIM never signs assertion in advice + assert assertion[0].signature is None + #valid = self.server.sec.verify_signature(decr_text, + # self.server.config.cert_file, + # node_name='urn:oasis:names:tc:SAML:2.0:assertion:Assertion', + # node_id=assertion[0].id, + # id_attr="") + assert valid + + def test_encrypted_response_1(self): + cert_str_advice, cert_key_str_advice = generate_cert() + + _resp = self.server.create_authn_response( + self.ava, + "id12", # in_response_to + "http://lingon.catalogix.se:8087/", # consumer_url + "urn:mace:example.com:saml:roland:sp", # sp_entity_id + name_id=self.name_id, + sign_response=False, + sign_assertion=False, + encrypt_assertion=False, + encrypt_assertion_self_contained=True, + pefim=True, + encrypt_cert_advice=cert_str_advice, + ) + + _resp = "%s" % _resp + + sresponse = response_from_string(_resp) + + assert sresponse.signature is None + + _, key_file = make_temp(cert_key_str_advice, decode=False) + + decr_text = self.server.sec.decrypt(_resp, key_file) + + resp = samlp.response_from_string(decr_text) + + self.verify_advice_assertion(resp, decr_text) + + def test_encrypted_response_2(self): + + cert_str_advice, cert_key_str_advice = generate_cert() + + _resp = self.server.create_authn_response( + self.ava, + "id12", # in_response_to + "http://lingon.catalogix.se:8087/", # consumer_url + "urn:mace:example.com:saml:roland:sp", # sp_entity_id + name_id=self.name_id, + sign_response=False, + sign_assertion=False, + encrypt_assertion=True, + encrypt_assertion_self_contained=True, + pefim=True, + encrypt_cert_advice=cert_str_advice, + ) + + sresponse = response_from_string(_resp) + + assert sresponse.signature is None + + decr_text_1 = self.server.sec.decrypt(_resp, self.client.config.encryption_keypairs[1]["key_file"]) + + _, key_file = make_temp(cert_key_str_advice, decode=False) + + decr_text_2 = self.server.sec.decrypt(decr_text_1, key_file) + + resp = samlp.response_from_string(decr_text_2) + + resp.assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp]) + + self.verify_advice_assertion(resp, decr_text_2) + + def test_encrypted_response_3(self): + cert_str_assertion, cert_key_str_assertion = generate_cert() + + _resp = self.server.create_authn_response( + self.ava, + "id12", # in_response_to + "http://lingon.catalogix.se:8087/", # consumer_url + "urn:mace:example.com:saml:roland:sp", # sp_entity_id + name_id=self.name_id, + sign_response=False, + sign_assertion=False, + encrypt_assertion=True, + encrypt_assertion_self_contained=True, + encrypted_advice_attributes=False, + encrypt_cert_assertion=cert_str_assertion + ) + + sresponse = response_from_string(_resp) + + assert sresponse.signature is None + + _, key_file = make_temp(cert_key_str_assertion, decode=False) + + decr_text = self.server.sec.decrypt(_resp, key_file) + + resp = samlp.response_from_string(decr_text) + + assert resp.encrypted_assertion[0].extension_elements + + assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp]) + + self.verify_encrypted_assertion(assertion, decr_text) + + def test_encrypted_response_4(self): + _resp = self.server.create_authn_response( + self.ava, + "id12", # in_response_to + "http://lingon.catalogix.se:8087/", # consumer_url + "urn:mace:example.com:saml:roland:sp", # sp_entity_id + name_id=self.name_id, + sign_response=False, + sign_assertion=False, + encrypt_assertion=True, + encrypt_assertion_self_contained=True, + encrypted_advice_attributes=False, + ) + + sresponse = response_from_string(_resp) + + assert sresponse.signature is None + + decr_text = self.server.sec.decrypt(_resp, self.client.config.encryption_keypairs[1]["key_file"]) + + resp = samlp.response_from_string(decr_text) + + assert resp.encrypted_assertion[0].extension_elements + + assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp]) + + self.verify_encrypted_assertion(assertion, decr_text) + + def test_encrypted_response_5(self): + _resp = self.server.create_authn_response( + self.ava, + "id12", # in_response_to + "http://lingon.catalogix.se:8087/", # consumer_url + "urn:mace:example.com:saml:roland:sp", # sp_entity_id + name_id=self.name_id, + sign_response=False, + sign_assertion=False, + encrypt_assertion=False, + encrypt_assertion_self_contained=True, + pefim=True + ) + + _resp = "%s" % _resp + + sresponse = response_from_string(_resp) + + assert sresponse.signature is None + + decr_text = self.server.sec.decrypt(_resp, self.client.config.encryption_keypairs[1]["key_file"]) + + resp = samlp.response_from_string(decr_text) + + self.verify_advice_assertion(resp, decr_text) + + def test_encrypted_response_6(self): + _server = Server("idp_conf_verify_cert") + + cert_str_advice, cert_key_str_advice = generate_cert() + + cert_str_assertion, cert_key_str_assertion = generate_cert() + + _resp = _server.create_authn_response( + self.ava, + "id12", # in_response_to + "http://lingon.catalogix.se:8087/", # consumer_url + "urn:mace:example.com:saml:roland:sp", # sp_entity_id + name_id=self.name_id, + sign_response=False, + sign_assertion=False, + encrypt_assertion=True, + encrypt_assertion_self_contained=True, + pefim=True, + encrypt_cert_advice=cert_str_advice, + encrypt_cert_assertion=cert_str_assertion + ) + + sresponse = response_from_string(_resp) + + assert sresponse.signature is None + + _, key_file = make_temp(cert_key_str_assertion, decode=False) + + decr_text_1 = _server.sec.decrypt(_resp, key_file) + + _, key_file = make_temp(cert_key_str_advice, decode=False) + + decr_text_2 = _server.sec.decrypt(decr_text_1, key_file) + + resp = samlp.response_from_string(decr_text_2) + + resp.assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp]) + + self.verify_advice_assertion(resp, decr_text_2) + + def test_encrypted_response_7(self): + _resp = self.server.create_authn_response( + self.ava, + "id12", # in_response_to + "http://lingon.catalogix.se:8087/", # consumer_url + "urn:mace:example.com:saml:roland:sp", # sp_entity_id + name_id=self.name_id, + sign_response=False, + sign_assertion=False, + encrypt_assertion=True, + encrypt_assertion_self_contained=True, + pefim=True + ) + + sresponse = response_from_string(_resp) + + assert sresponse.signature is None + + decr_text_1 = self.server.sec.decrypt(_resp, self.client.config.encryption_keypairs[1]["key_file"]) + + decr_text_2 = self.server.sec.decrypt(decr_text_1, self.client.config.encryption_keypairs[1]["key_file"]) + + resp = samlp.response_from_string(decr_text_2) + + resp.assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp]) + + self.verify_advice_assertion(resp, decr_text_2) + + def test_encrypted_response_8(self): + try: + _resp = self.server.create_authn_response( + self.ava, + "id12", # in_response_to + "http://lingon.catalogix.se:8087/", # consumer_url + "urn:mace:example.com:saml:roland:sp", # sp_entity_id + name_id=self.name_id, + sign_response=False, + sign_assertion=False, + encrypt_assertion=True, + encrypt_assertion_self_contained=True, + pefim=True, + encrypt_cert_advice="whatever", + encrypt_cert_assertion="whatever" + ) + assert False, "Must throw an exception" + except EncryptError as ex: + pass + except Exception as ex: + assert False, "Wrong exception!" + + try: + _resp = self.server.create_authn_response( + self.ava, + "id12", # in_response_to + "http://lingon.catalogix.se:8087/", # consumer_url + "urn:mace:example.com:saml:roland:sp", # sp_entity_id + name_id=self.name_id, + sign_response=False, + sign_assertion=False, + encrypt_assertion=False, + encrypt_assertion_self_contained=True, + pefim=True, + encrypt_cert_advice="whatever", + ) + assert False, "Must throw an exception" + except EncryptError as ex: + pass + except Exception as ex: + assert False, "Wrong exception!" + + try: + _resp = self.server.create_authn_response( + self.ava, + "id12", # in_response_to + "http://lingon.catalogix.se:8087/", # consumer_url + "urn:mace:example.com:saml:roland:sp", # sp_entity_id + name_id=self.name_id, + sign_response=False, + sign_assertion=False, + encrypt_assertion=True, + encrypt_assertion_self_contained=True, + encrypted_advice_attributes=False, + encrypt_cert_assertion="whatever" + ) + assert False, "Must throw an exception" + except EncryptError as ex: + pass + except Exception as ex: + assert False, "Wrong exception!" + + _server = Server("idp_conf_verify_cert") + + try: + _resp = _server.create_authn_response( + self.ava, + "id12", # in_response_to + "http://lingon.catalogix.se:8087/", # consumer_url + "urn:mace:example.com:saml:roland:sp", # sp_entity_id + name_id=self.name_id, + sign_response=False, + sign_assertion=False, + encrypt_assertion=True, + encrypt_assertion_self_contained=True, + pefim=True, + encrypt_cert_advice="whatever", + encrypt_cert_assertion="whatever" + ) + assert False, "Must throw an exception" + except CertificateError as ex: + pass + except Exception as ex: + assert False, "Wrong exception!" + + try: + _resp = _server.create_authn_response( + self.ava, + "id12", # in_response_to + "http://lingon.catalogix.se:8087/", # consumer_url + "urn:mace:example.com:saml:roland:sp", # sp_entity_id + name_id=self.name_id, + sign_response=False, + sign_assertion=False, + encrypt_assertion=False, + encrypt_assertion_self_contained=True, + pefim=True, + encrypt_cert_advice="whatever", + ) + assert False, "Must throw an exception" + except CertificateError as ex: + pass + except Exception as ex: + assert False, "Wrong exception!" + + try: + _resp = _server.create_authn_response( + self.ava, + "id12", # in_response_to + "http://lingon.catalogix.se:8087/", # consumer_url + "urn:mace:example.com:saml:roland:sp", # sp_entity_id + name_id=self.name_id, + sign_response=False, + sign_assertion=False, + encrypt_assertion=True, + encrypt_assertion_self_contained=True, + encrypted_advice_attributes=False, + encrypt_cert_assertion="whatever" + ) + assert False, "Must throw an exception" + except CertificateError as ex: + pass + except Exception as ex: + assert False, "Wrong exception!" + + def test_encrypted_response_9(self): + _server = Server("idp_conf_sp_no_encrypt") + + _resp = _server.create_authn_response( + self.ava, + "id12", # in_response_to + "http://lingon.catalogix.se:8087/", # consumer_url + "urn:mace:example.com:saml:roland:sp", # sp_entity_id + name_id=self.name_id, + sign_response=False, + sign_assertion=False, + encrypt_assertion=True, + encrypt_assertion_self_contained=True, + pefim=True, + ) + + self.verify_assertion(_resp.assertion.advice.assertion) + + _resp = _server.create_authn_response( + self.ava, + "id12", # in_response_to + "http://lingon.catalogix.se:8087/", # consumer_url + "urn:mace:example.com:saml:roland:sp", # sp_entity_id + name_id=self.name_id, + sign_response=False, + sign_assertion=False, + encrypt_assertion=False, + encrypt_assertion_self_contained=True, + pefim=True + ) + + self.verify_assertion(_resp.assertion.advice.assertion) + + _resp = _server.create_authn_response( + self.ava, + "id12", # in_response_to + "http://lingon.catalogix.se:8087/", # consumer_url + "urn:mace:example.com:saml:roland:sp", # sp_entity_id + name_id=self.name_id, + sign_response=False, + sign_assertion=False, + encrypt_assertion=True, + encrypt_assertion_self_contained=True, + encrypted_advice_attributes=False, + ) + + self.verify_assertion([_resp.assertion]) + + + def test_slo_http_post(self): + soon = time_util.in_a_while(days=1) + sinfo = { + "name_id": nid, + "issuer": "urn:mace:example.com:saml:roland:idp", + "not_on_or_after": soon, + "user": { + "givenName": "Leo", + "sn": "Laport", + } + } + self.client.users.add_information_about_person(sinfo) + + req_id, logout_request = self.client.create_logout_request( + destination="http://localhost:8088/slop", name_id=nid, + issuer_entity_id="urn:mace:example.com:saml:roland:idp", + reason="I'm tired of this") + + intermed = base64.b64encode(str(logout_request).encode('utf-8')) + + #saml_soap = make_soap_enveloped_saml_thingy(logout_request) + request = self.server.parse_logout_request(intermed, BINDING_HTTP_POST) + assert request + + def test_slo_soap(self): + soon = time_util.in_a_while(days=1) + sinfo = { + "name_id": nid, + "issuer": "urn:mace:example.com:saml:roland:idp", + "not_on_or_after": soon, + "user": { + "givenName": "Leo", + "sn": "Laport", + } + } + + sp = client.Saml2Client(config_file="server_conf") + sp.users.add_information_about_person(sinfo) + + req_id, logout_request = sp.create_logout_request( + name_id=nid, destination="http://localhost:8088/slo", + issuer_entity_id="urn:mace:example.com:saml:roland:idp", + reason="I'm tired of this") + + #_ = s_utils.deflate_and_base64_encode("%s" % (logout_request,)) + + saml_soap = make_soap_enveloped_saml_thingy(logout_request) + self.server.ident.close() + + with closing(Server("idp_soap_conf")) as idp: + request = idp.parse_logout_request(saml_soap) + idp.ident.close() + assert request + +# ------------------------------------------------------------------------ + IDENTITY = {"eduPersonAffiliation": ["staff", "member"], "sn": ["Jeter"], "givenName": ["Derek"], diff --git a/tests/test_51_client.py b/tests/test_51_client.py index 8571a36a..78adeac5 100644 --- a/tests/test_51_client.py +++ b/tests/test_51_client.py @@ -78,6 +78,11 @@ def generate_cert(): def add_subelement(xmldoc, node_name, subelem): + if six.PY2: + _str = unicode + else: + _str = str + s = xmldoc.find(node_name) if s > 0: x = xmldoc.rindex("<", 0, s) @@ -88,7 +93,7 @@ def add_subelement(xmldoc, node_name, subelem): spaces += " " c += 1 # Sometimes we get an xml header, sometimes we don't. - subelem_str = str(subelem) + subelem_str = _str(subelem) if subelem_str[0:5].lower() == '<?xml': subelem_str = subelem_str.split("\n", 1)[1] xmldoc = xmldoc.replace( @@ -1483,6 +1488,1351 @@ class TestClient: assert b'<ns0:SessionIndex>_foo</ns0:SessionIndex>' in res.xmlstr +class TestClientNonAsciiAva: + def setup_class(self): + self.server = Server("idp_conf") + + conf = config.SPConfig() + conf.load_file("server_conf") + self.client = Saml2Client(conf) + + def teardown_class(self): + self.server.close() + + def test_create_attribute_query1(self): + req_id, req = self.client.create_attribute_query( + "https://idp.example.com/idp/", + "E8042FB4-4D5B-48C3-8E14-8EDD852790DD", + format=saml.NAMEID_FORMAT_PERSISTENT, + message_id="id1") + reqstr = "%s" % req.to_string().decode() + + assert req.destination == "https://idp.example.com/idp/" + assert req.id == "id1" + assert req.version == "2.0" + subject = req.subject + name_id = subject.name_id + assert name_id.format == saml.NAMEID_FORMAT_PERSISTENT + assert name_id.text == "E8042FB4-4D5B-48C3-8E14-8EDD852790DD" + issuer = req.issuer + assert issuer.text == "urn:mace:example.com:saml:roland:sp" + + attrq = samlp.attribute_query_from_string(reqstr) + + assert _leq(attrq.keyswv(), ['destination', 'subject', 'issue_instant', + 'version', 'id', 'issuer']) + + assert attrq.destination == req.destination + assert attrq.id == req.id + assert attrq.version == req.version + assert attrq.issuer.text == issuer.text + assert attrq.issue_instant == req.issue_instant + assert attrq.subject.name_id.format == name_id.format + assert attrq.subject.name_id.text == name_id.text + + def test_create_attribute_query2(self): + req_id, req = self.client.create_attribute_query( + "https://idp.example.com/idp/", + "E8042FB4-4D5B-48C3-8E14-8EDD852790DD", + attribute={ + ("urn:oid:2.5.4.42", + "urn:oasis:names:tc:SAML:2.0:attrname-format:uri", + "givenName"): None, + ("urn:oid:2.5.4.4", + "urn:oasis:names:tc:SAML:2.0:attrname-format:uri", + "surname"): None, + ("urn:oid:1.2.840.113549.1.9.1", + "urn:oasis:names:tc:SAML:2.0:attrname-format:uri"): None, + }, + format=saml.NAMEID_FORMAT_PERSISTENT, + message_id="id1") + + assert req.destination == "https://idp.example.com/idp/" + assert req.id == "id1" + assert req.version == "2.0" + subject = req.subject + name_id = subject.name_id + assert name_id.format == saml.NAMEID_FORMAT_PERSISTENT + assert name_id.text == "E8042FB4-4D5B-48C3-8E14-8EDD852790DD" + assert len(req.attribute) == 3 + # one is givenName + seen = [] + for attribute in req.attribute: + if attribute.name == "urn:oid:2.5.4.42": + assert attribute.name_format == saml.NAME_FORMAT_URI + assert attribute.friendly_name == "givenName" + seen.append("givenName") + elif attribute.name == "urn:oid:2.5.4.4": + assert attribute.name_format == saml.NAME_FORMAT_URI + assert attribute.friendly_name == "surname" + seen.append("surname") + elif attribute.name == "urn:oid:1.2.840.113549.1.9.1": + assert attribute.name_format == saml.NAME_FORMAT_URI + if getattr(attribute, "friendly_name"): + assert False + seen.append("email") + assert _leq(seen, ["givenName", "surname", "email"]) + + def test_create_attribute_query_3(self): + req_id, req = self.client.create_attribute_query( + "https://aai-demo-idp.switch.ch/idp/shibboleth", + "_e7b68a04488f715cda642fbdd90099f5", + format=saml.NAMEID_FORMAT_TRANSIENT, + message_id="id1") + + assert isinstance(req, samlp.AttributeQuery) + assert req.destination == "https://aai-demo-idp.switch" \ + ".ch/idp/shibboleth" + assert req.id == "id1" + assert req.version == "2.0" + assert req.issue_instant + assert req.issuer.text == "urn:mace:example.com:saml:roland:sp" + nameid = req.subject.name_id + assert nameid.format == saml.NAMEID_FORMAT_TRANSIENT + assert nameid.text == "_e7b68a04488f715cda642fbdd90099f5" + + def test_create_auth_request_0(self): + ar_str = "%s" % self.client.create_authn_request( + "http://www.example.com/sso", message_id="id1")[1] + + ar = samlp.authn_request_from_string(ar_str) + assert ar.assertion_consumer_service_url == ("http://lingon.catalogix" + ".se:8087/") + assert ar.destination == "http://www.example.com/sso" + assert ar.protocol_binding == BINDING_HTTP_POST + assert ar.version == "2.0" + assert ar.provider_name == "urn:mace:example.com:saml:roland:sp" + assert ar.issuer.text == "urn:mace:example.com:saml:roland:sp" + nid_policy = ar.name_id_policy + assert nid_policy.allow_create == "false" + assert nid_policy.format == saml.NAMEID_FORMAT_TRANSIENT + + node_requested_attributes = None + for e in ar.extensions.extension_elements: + if e.tag == RequestedAttributes.c_tag: + node_requested_attributes = e + break + assert node_requested_attributes is not None + + for c in node_requested_attributes.children: + assert c.tag == RequestedAttribute.c_tag + assert c.attributes['isRequired'] in ['true', 'false'] + assert c.attributes['Name'] + assert c.attributes['FriendlyName'] + assert c.attributes['NameFormat'] + + def test_create_auth_request_unset_force_authn(self): + req_id, req = self.client.create_authn_request( + "http://www.example.com/sso", sign=False, message_id="id1") + assert bool(req.force_authn) == False + + def test_create_auth_request_set_force_authn(self): + req_id, req = self.client.create_authn_request( + "http://www.example.com/sso", sign=False, message_id="id1", + force_authn="true") + assert bool(req.force_authn) == True + + def test_create_auth_request_nameid_policy_allow_create(self): + conf = config.SPConfig() + conf.load_file("sp_conf_nameidpolicy") + client = Saml2Client(conf) + ar_str = "%s" % client.create_authn_request( + "http://www.example.com/sso", message_id="id1")[1] + + ar = samlp.authn_request_from_string(ar_str) + assert ar.assertion_consumer_service_url == ("http://lingon.catalogix" + ".se:8087/") + assert ar.destination == "http://www.example.com/sso" + assert ar.protocol_binding == BINDING_HTTP_POST + assert ar.version == "2.0" + assert ar.provider_name == "urn:mace:example.com:saml:roland:sp" + assert ar.issuer.text == "urn:mace:example.com:saml:roland:sp" + nid_policy = ar.name_id_policy + assert nid_policy.allow_create == "true" + assert nid_policy.format == saml.NAMEID_FORMAT_PERSISTENT + + def test_create_auth_request_vo(self): + assert list(self.client.config.vorg.keys()) == [ + "urn:mace:example.com:it:tek"] + + ar_str = "%s" % self.client.create_authn_request( + "http://www.example.com/sso", + "urn:mace:example.com:it:tek", # vo + nameid_format=NAMEID_FORMAT_PERSISTENT, + message_id="666")[1] + + ar = samlp.authn_request_from_string(ar_str) + assert ar.id == "666" + assert ar.assertion_consumer_service_url == "http://lingon.catalogix" \ + ".se:8087/" + assert ar.destination == "http://www.example.com/sso" + assert ar.protocol_binding == BINDING_HTTP_POST + assert ar.version == "2.0" + assert ar.provider_name == "urn:mace:example.com:saml:roland:sp" + assert ar.issuer.text == "urn:mace:example.com:saml:roland:sp" + nid_policy = ar.name_id_policy + assert nid_policy.allow_create == "false" + assert nid_policy.format == saml.NAMEID_FORMAT_PERSISTENT + assert nid_policy.sp_name_qualifier == "urn:mace:example.com:it:tek" + + def test_sign_auth_request_0(self): + req_id, areq = self.client.create_authn_request( + "http://www.example.com/sso", sign=True, message_id="id1") + + ar_str = "%s" % areq + ar = samlp.authn_request_from_string(ar_str) + + assert ar + assert ar.signature + assert ar.signature.signature_value + signed_info = ar.signature.signed_info + assert len(signed_info.reference) == 1 + assert signed_info.reference[0].uri == "#id1" + assert signed_info.reference[0].digest_value + try: + assert self.client.sec.correctly_signed_authn_request( + ar_str, self.client.config.xmlsec_binary, + self.client.config.metadata) + except Exception: # missing certificate + self.client.sec.verify_signature(ar_str, node_name=class_name(ar)) + + def test_create_logout_request(self): + req_id, req = self.client.create_logout_request( + "http://localhost:8088/slo", "urn:mace:example.com:saml:roland:idp", + name_id=nid, reason="Tired", expire=in_a_while(minutes=15), + session_indexes=["_foo"]) + + assert req.destination == "http://localhost:8088/slo" + assert req.reason == "Tired" + assert req.version == "2.0" + assert req.name_id == nid + assert req.issuer.text == "urn:mace:example.com:saml:roland:sp" + assert req.session_index == [SessionIndex("_foo")] + + def test_response_1(self): + IDP = "urn:mace:example.com:saml:roland:idp" + + ava = {"givenName": ["Dave"], "sn": ["Concepción"], + "mail": ["Dave@cnr.mlb.com"], "title": ["#13"]} + + nameid_policy = samlp.NameIDPolicy(allow_create="false", + format=saml.NAMEID_FORMAT_PERSISTENT) + + resp = self.server.create_authn_response( + identity=ava, + in_response_to="id1", + destination="http://lingon.catalogix.se:8087/", + sp_entity_id="urn:mace:example.com:saml:roland:sp", + name_id_policy=nameid_policy, + sign_response=True, + userid="foba0001@example.com", + authn=AUTHN) + + resp_str = "%s" % resp + + resp_str = encode_fn(resp_str.encode('utf-8')) + + authn_response = self.client.parse_authn_request_response( + resp_str, BINDING_HTTP_POST, + {"id1": "http://foo.example.com/service"}) + + assert authn_response is not None + assert authn_response.issuer() == IDP + assert authn_response.response.assertion[0].issuer.text == IDP + session_info = authn_response.session_info() + + assert session_info["ava"] == {"givenName": ["Dave"], "sn": [u"Concepción"], + "mail": ["Dave@cnr.mlb.com"], "title": ["#13"]} + assert session_info["issuer"] == IDP + assert session_info["came_from"] == "http://foo.example.com/service" + response = samlp.response_from_string(authn_response.xmlstr) + assert response.destination == "http://lingon.catalogix.se:8087/" + assert "session_index" in session_info + + # One person in the cache + assert len(self.client.users.subjects()) == 1 + subject_id = self.client.users.subjects()[0] + # The information I have about the subject comes from one source + assert self.client.users.issuers_of_info(subject_id) == [IDP] + + # --- authenticate another person + + ava = {"givenName": ["Alfonson"], "sn": ["Soriano"], + "mail": ["alfonson@chc.mlb.com"], "title": ["outfielder"]} + + resp_str = "%s" % self.server.create_authn_response( + identity=ava, + in_response_to="id2", + destination="http://lingon.catalogix.se:8087/", + sp_entity_id="urn:mace:example.com:saml:roland:sp", + sign_response=True, + name_id_policy=nameid_policy, + userid="also0001@example.com", + authn=AUTHN) + + resp_str = encode_fn(resp_str.encode()) + + self.client.parse_authn_request_response( + resp_str, BINDING_HTTP_POST, + {"id2": "http://foo.example.com/service"}) + + # Two persons in the cache + assert len(self.client.users.subjects()) == 2 + issuers = [self.client.users.issuers_of_info(s) for s in + self.client.users.subjects()] + # The information I have about the subjects comes from the same source + assert issuers == [[IDP], [IDP]] + + def test_response_2(self): + conf = config.SPConfig() + conf.load_file("server_conf") + _client = Saml2Client(conf) + + idp, ava, ava_verify, nameid_policy = self.setup_verify_authn_response() + + cert_str, cert_key_str = generate_cert() + + cert = \ + { + "cert": cert_str, + "key": cert_key_str + } + + self.name_id = self.server.ident.transient_nameid( + "urn:mace:example.com:saml:roland:sp", "id1") + + resp = self.server.create_authn_response( + identity=ava, + in_response_to="id1", + destination="http://lingon.catalogix.se:8087/", + sp_entity_id="urn:mace:example.com:saml:roland:sp", + name_id=self.name_id, + userid="foba0001@example.com", + authn=AUTHN, + sign_response=True, + sign_assertion=True, + encrypt_assertion=False, + encrypt_assertion_self_contained=True, + pefim=True, + encrypt_cert_advice=cert_str + ) + + resp_str = "%s" % resp + + resp_str = encode_fn(resp_str.encode()) + + authn_response = _client.parse_authn_request_response( + resp_str, BINDING_HTTP_POST, + {"id1": "http://foo.example.com/service"}, {"id1": cert}) + + self.verify_authn_response(idp, authn_response, _client, ava_verify) + + def test_response_3(self): + conf = config.SPConfig() + conf.load_file("server_conf") + _client = Saml2Client(conf) + + idp, ava, ava_verify, nameid_policy = self.setup_verify_authn_response() + + self.name_id = self.server.ident.transient_nameid( + "urn:mace:example.com:saml:roland:sp", "id1") + + resp = self.server.create_authn_response( + identity=ava, + in_response_to="id1", + destination="http://lingon.catalogix.se:8087/", + sp_entity_id="urn:mace:example.com:saml:roland:sp", + name_id=self.name_id, + userid="foba0001@example.com", + authn=AUTHN, + sign_response=True, + sign_assertion=True, + encrypt_assertion=False, + encrypt_assertion_self_contained=True, + pefim=True, + ) + + resp_str = "%s" % resp + + resp_str = encode_fn(resp_str.encode()) + + authn_response = _client.parse_authn_request_response( + resp_str, BINDING_HTTP_POST, + {"id1": "http://foo.example.com/service"}) + + self.verify_authn_response(idp, authn_response, _client, ava_verify) + + def test_response_4(self): + conf = config.SPConfig() + conf.load_file("server_conf") + _client = Saml2Client(conf) + + idp, ava, ava_verify, nameid_policy = self.setup_verify_authn_response() + + self.name_id = self.server.ident.transient_nameid( + "urn:mace:example.com:saml:roland:sp", "id1") + + resp = self.server.create_authn_response( + identity=ava, + in_response_to="id1", + destination="http://lingon.catalogix.se:8087/", + sp_entity_id="urn:mace:example.com:saml:roland:sp", + name_id=self.name_id, + userid="foba0001@example.com", + authn=AUTHN, + sign_response=True, + sign_assertion=True, + encrypt_assertion=True, + encrypt_assertion_self_contained=True, + pefim=True, + ) + + resp_str = "%s" % resp + + resp_str = encode_fn(resp_str.encode()) + + authn_response = _client.parse_authn_request_response( + resp_str, BINDING_HTTP_POST, + {"id1": "http://foo.example.com/service"}) + + self.verify_authn_response(idp, authn_response, _client, ava_verify) + + def test_response_5(self): + conf = config.SPConfig() + conf.load_file("server_conf") + _client = Saml2Client(conf) + + idp, ava, ava_verify, nameid_policy = self.setup_verify_authn_response() + + self.name_id = self.server.ident.transient_nameid( + "urn:mace:example.com:saml:roland:sp", "id1") + + cert_str, cert_key_str = generate_cert() + + cert = \ + { + "cert": cert_str, + "key": cert_key_str + } + + resp = self.server.create_authn_response( + identity=ava, + in_response_to="id1", + destination="http://lingon.catalogix.se:8087/", + sp_entity_id="urn:mace:example.com:saml:roland:sp", + name_id=self.name_id, + userid="foba0001@example.com", + authn=AUTHN, + sign_response=True, + sign_assertion=True, + encrypt_assertion=True, + encrypt_assertion_self_contained=True, + pefim=True, + encrypt_cert_assertion=cert_str + ) + + resp_str = "%s" % resp + + resp_str = encode_fn(resp_str.encode()) + + authn_response = _client.parse_authn_request_response( + resp_str, BINDING_HTTP_POST, + {"id1": "http://foo.example.com/service"}, {"id1": cert}) + + self.verify_authn_response(idp, authn_response, _client, ava_verify) + + def test_response_6(self): + conf = config.SPConfig() + conf.load_file("server_conf") + _client = Saml2Client(conf) + + idp, ava, ava_verify, nameid_policy = self.setup_verify_authn_response() + + self.name_id = self.server.ident.transient_nameid( + "urn:mace:example.com:saml:roland:sp", "id1") + + cert_assertion_str, cert_key_assertion_str = generate_cert() + + cert_assertion = \ + { + "cert": cert_assertion_str, + "key": cert_key_assertion_str + } + + cert_advice_str, cert_key_advice_str = generate_cert() + + cert_advice = \ + { + "cert": cert_advice_str, + "key": cert_key_advice_str + } + + resp = self.server.create_authn_response( + identity=ava, + in_response_to="id1", + destination="http://lingon.catalogix.se:8087/", + sp_entity_id="urn:mace:example.com:saml:roland:sp", + name_id=self.name_id, + userid="foba0001@example.com", + authn=AUTHN, + sign_response=True, + sign_assertion=True, + encrypt_assertion=True, + encrypt_assertion_self_contained=True, + pefim=True, + encrypt_cert_assertion=cert_assertion_str, + encrypt_cert_advice=cert_advice_str + ) + + resp_str = "%s" % resp + + resp_str = encode_fn(resp_str.encode()) + + authn_response = _client.parse_authn_request_response( + resp_str, BINDING_HTTP_POST, + {"id1": "http://foo.example.com/service"}, + {"id1": [cert_assertion, cert_advice]}) + + self.verify_authn_response(idp, authn_response, _client, ava_verify) + + def test_response_7(self): + conf = config.SPConfig() + conf.load_file("server_conf") + _client = Saml2Client(conf) + + idp, ava, ava_verify, nameid_policy = self.setup_verify_authn_response() + + self.name_id = self.server.ident.transient_nameid( + "urn:mace:example.com:saml:roland:sp", "id1") + + resp = self.server.create_authn_response( + identity=ava, + in_response_to="id1", + destination="http://lingon.catalogix.se:8087/", + sp_entity_id="urn:mace:example.com:saml:roland:sp", + name_id=self.name_id, + userid="foba0001@example.com", + authn=AUTHN, + sign_response=True, + sign_assertion=True, + encrypt_assertion=True, + encrypt_assertion_self_contained=True, + encrypted_advice_attributes=True, + ) + + resp_str = "%s" % resp + + resp_str = encode_fn(resp_str.encode()) + + authn_response = _client.parse_authn_request_response( + resp_str, BINDING_HTTP_POST, + {"id1": "http://foo.example.com/service"}) + + self.verify_authn_response(idp, authn_response, _client, ava_verify) + + def test_response_8(self): + conf = config.SPConfig() + conf.load_file("server_conf") + _client = Saml2Client(conf) + + idp, ava, ava_verify, nameid_policy = self.setup_verify_authn_response() + + self.name_id = self.server.ident.transient_nameid( + "urn:mace:example.com:saml:roland:sp", "id1") + + cert_str, cert_key_str = generate_cert() + + cert = \ + { + "cert": cert_str, + "key": cert_key_str + } + + resp = self.server.create_authn_response( + identity=ava, + in_response_to="id1", + destination="http://lingon.catalogix.se:8087/", + sp_entity_id="urn:mace:example.com:saml:roland:sp", + name_id=self.name_id, + userid="foba0001@example.com", + authn=AUTHN, + sign_response=True, + sign_assertion=True, + encrypt_assertion=True, + encrypt_assertion_self_contained=True, + encrypt_cert_assertion=cert_str + ) + + resp_str = "%s" % resp + + resp_str = encode_fn(resp_str.encode()) + + authn_response = _client.parse_authn_request_response( + resp_str, BINDING_HTTP_POST, + {"id1": "http://foo.example.com/service"}, {"id1": cert}) + + self.verify_authn_response(idp, authn_response, _client, ava_verify) + + def test_response_no_name_id(self): + """ Test that the SP client can parse an authentication response + from an IdP that does not contain a <NameID> element.""" + + if six.PY2: + _bytes = str + else: + _bytes = bytes + + conf = config.SPConfig() + conf.load_file("server_conf") + client = Saml2Client(conf) + + # Use the same approach as the other tests for mocking up + # an authentication response to parse. + idp, ava, ava_verify, nameid_policy = ( + self.setup_verify_authn_response() + ) + + # Mock up an authentication response but do not encrypt it + # nor sign it since below we will modify it directly. Note that + # setting name_id to None still results in a response that includes + # a <NameID> element. + resp = self.server.create_authn_response( + identity=ava, + in_response_to="id1", + destination="http://lingon.catalogix.se:8087/", + sp_entity_id="urn:mace:example.com:saml:roland:sp", + name_id=None, + userid="foba0001@example.com", + authn=AUTHN, + sign_response=False, + sign_assertion=False, + encrypt_assertion=False, + encrypt_assertion_self_contained=False + ) + + # The create_authn_response method above will return an instance + # of saml2.samlp.Response when neither encrypting nor signing and + # so we can remove the <NameID> element directly. + resp.assertion.subject.name_id = None + + # Assert that the response does not contain a NameID element so that + # the parsing below is a fair test. + assert str(resp).find("NameID") == -1 + + # Cast the response to a string and encode it to mock up the payload + # the SP client is expected to receive via HTTP POST binding. + if six.PY2: + resp_str = encode_fn(str(resp)) + else: + resp_str = encode_fn(bytes(str(resp), 'utf-8')) + + + # We do not need the client to verify a signature for this test. + client.want_assertions_signed = False + client.want_response_signed = False + + # Parse the authentication response that does not include a <NameID>. + authn_response = client.parse_authn_request_response( + resp_str, BINDING_HTTP_POST, + {"id1": "http://foo.example.com/service"}) + + # A successful test is parsing the response. + assert authn_response is not None + + def setup_verify_authn_response(self): + idp = "urn:mace:example.com:saml:roland:idp" + ava = {"givenName": ["Dave"], "sn": ["Concepción"], + "mail": ["Dave@cnr.mlb.com"], "title": ["#13"]} + ava_verify = {"givenName": ["Dave"], "sn": [u"Concepción"], + "mail": ["Dave@cnr.mlb.com"], "title": ["#13"]} + nameid_policy = samlp.NameIDPolicy(allow_create="false", + format=saml.NAMEID_FORMAT_PERSISTENT) + return idp, ava, ava_verify, nameid_policy + + def verify_authn_response(self, idp, authn_response, _client, ava_verify): + assert authn_response is not None + assert authn_response.issuer() == idp + assert authn_response.assertion.issuer.text == idp + session_info = authn_response.session_info() + + assert session_info["ava"] == ava_verify + assert session_info["issuer"] == idp + assert session_info["came_from"] == "http://foo.example.com/service" + response = samlp.response_from_string(authn_response.xmlstr) + assert response.destination == "http://lingon.catalogix.se:8087/" + + # One person in the cache + assert len(_client.users.subjects()) == 1 + subject_id = _client.users.subjects()[0] + # The information I have about the subject comes from one source + assert _client.users.issuers_of_info(subject_id) == [idp] + + def test_init_values(self): + entityid = self.client.config.entityid + assert entityid == "urn:mace:example.com:saml:roland:sp" + location = self.client._sso_location() + assert location == 'http://localhost:8088/sso' + my_name = self.client._my_name() + assert my_name == "urn:mace:example.com:saml:roland:sp" + + def test_sign_then_encrypt_assertion(self): + # Begin with the IdPs side + _sec = self.server.sec + + assertion = s_utils.assertion_factory( + subject=factory(saml.Subject, text="_aaa", + name_id=factory( + saml.NameID, + format=saml.NAMEID_FORMAT_TRANSIENT)), + attribute_statement=do_attribute_statement( + { + ("", "", "sn"): ("Jeter", ""), + ("", "", "givenName"): ("Derek", ""), + } + ), + issuer=self.server._issuer(), + ) + + assertion.signature = sigver.pre_signature_part( + assertion.id, _sec.my_cert, 1) + + sigass = _sec.sign_statement(assertion, class_name(assertion), + key_file=full_path("test.key"), + node_id=assertion.id) + # Create an Assertion instance from the signed assertion + _ass = saml.assertion_from_string(sigass) + + response = sigver.response_factory( + in_response_to="_012345", + destination="https:#www.example.com", + status=s_utils.success_status_factory(), + issuer=self.server._issuer(), + assertion=_ass + ) + + enctext = _sec.crypto.encrypt_assertion(response, + self.client.sec.encryption_keypairs[ + 0]["cert_file"], + pre_encryption_part()) + + seresp = samlp.response_from_string(enctext) + + # Now over to the client side + _csec = self.client.sec + if seresp.encrypted_assertion: + decr_text = _csec.decrypt(enctext) + seresp = samlp.response_from_string(decr_text) + resp_ass = [] + + sign_cert_file = full_path("test.pem") + for enc_ass in seresp.encrypted_assertion: + assers = extension_elements_to_elements( + enc_ass.extension_elements, [saml, samlp]) + for ass in assers: + if ass.signature: + if not _csec.verify_signature("%s" % ass, + sign_cert_file, + node_name=class_name( + ass)): + continue + resp_ass.append(ass) + + seresp.assertion = resp_ass + seresp.encrypted_assertion = None + + assert seresp.assertion + + def test_sign_then_encrypt_assertion2(self): + # Begin with the IdPs side + _sec = self.server.sec + + nameid_policy = samlp.NameIDPolicy(allow_create="false", + format=saml.NAMEID_FORMAT_PERSISTENT) + + asser = Assertion({"givenName": "Dave", "sn": "Concepción"}) + farg = add_path( + {}, + ['assertion', 'subject', 'subject_confirmation', 'method', + saml.SCM_BEARER]) + add_path( + farg['assertion']['subject']['subject_confirmation'], + ['subject_confirmation_data', 'in_response_to', + '_012345']) + add_path( + farg['assertion']['subject']['subject_confirmation'], + ['subject_confirmation_data', 'recipient', + "http://lingon.catalogix.se:8087/"]) + + assertion = asser.construct( + self.client.config.entityid, + self.server.config.attribute_converters, + self.server.config.getattr("policy", "idp"), + name_id=factory(saml.NameID, format=saml.NAMEID_FORMAT_TRANSIENT), + issuer=self.server._issuer(), + authn_class=INTERNETPROTOCOLPASSWORD, + authn_auth="http://www.example.com/login", + farg=farg['assertion'] + ) + + assertion.signature = sigver.pre_signature_part( + assertion.id, _sec.my_cert, 1) + + sigass = _sec.sign_statement(assertion, class_name(assertion), + key_file=self.client.sec.key_file, + node_id=assertion.id) + + sigass = rm_xmltag(sigass) + response = sigver.response_factory( + in_response_to="_012345", + destination="http://lingon.catalogix.se:8087/", + status=s_utils.success_status_factory(), + issuer=self.server._issuer(), + encrypted_assertion=EncryptedAssertion() + ) + + xmldoc = "%s" % response + # strangely enough I get different tags if I run this test separately + # or as part of a bunch of tests. + xmldoc = add_subelement(xmldoc, "EncryptedAssertion", sigass) + + enctext = _sec.crypto.encrypt_assertion(xmldoc, + self.client.sec.encryption_keypairs[ + 1]["cert_file"], + pre_encryption_part()) + + # seresp = samlp.response_from_string(enctext) + + resp_str = encode_fn(enctext.encode()) + # Now over to the client side + # Explicitely allow unsigned responses for this and the following 2 tests + self.client.want_response_signed = False + resp = self.client.parse_authn_request_response( + resp_str, BINDING_HTTP_POST, + {"_012345": "http://foo.example.com/service"}) + + # assert resp.encrypted_assertion == [] + assert resp.assertion + assert resp.ava == {"sn": [u"Concepción"], "givenName": ["Dave"]} + + def test_sign_then_encrypt_assertion_advice_1(self): + # Begin with the IdPs side + _sec = self.server.sec + + nameid_policy = samlp.NameIDPolicy(allow_create="false", + format=saml.NAMEID_FORMAT_PERSISTENT) + + asser = Assertion({"givenName": "Dave", "sn": "Concepción"}) + + subject_confirmation_specs = { + 'recipient': "http://lingon.catalogix.se:8087/", + 'in_response_to': "_012345", + 'subject_confirmation_method': saml.SCM_BEARER + } + name_id = factory(saml.NameID, format=saml.NAMEID_FORMAT_TRANSIENT) + + farg = add_path( + {}, + ['assertion', 'subject', 'subject_confirmation', 'method', + saml.SCM_BEARER]) + add_path( + farg['assertion']['subject']['subject_confirmation'], + ['subject_confirmation_data', 'in_response_to', + '_012345']) + add_path( + farg['assertion']['subject']['subject_confirmation'], + ['subject_confirmation_data', 'recipient', + "http://lingon.catalogix.se:8087/"]) + + assertion = asser.construct( + self.client.config.entityid, + self.server.config.attribute_converters, + self.server.config.getattr("policy", "idp"), + issuer=self.server._issuer(), + name_id=name_id, + authn_class=INTERNETPROTOCOLPASSWORD, + authn_auth="http://www.example.com/login", + farg=farg['assertion']) + + a_asser = Assertion({"uid": "test01", "email": "test.testsson@test.se"}) + a_assertion = a_asser.construct( + self.client.config.entityid, + self.server.config.attribute_converters, + self.server.config.getattr("policy", "idp"), + issuer=self.server._issuer(), + authn_class=INTERNETPROTOCOLPASSWORD, + authn_auth="http://www.example.com/login", + name_id=name_id, + farg=farg['assertion']) + + a_assertion.signature = sigver.pre_signature_part( + a_assertion.id, _sec.my_cert, 1) + + assertion.advice = Advice() + + assertion.advice.encrypted_assertion = [] + assertion.advice.encrypted_assertion.append(EncryptedAssertion()) + + assertion.advice.encrypted_assertion[0].add_extension_element( + a_assertion) + + response = sigver.response_factory( + in_response_to="_012345", + destination="http://lingon.catalogix.se:8087/", + status=s_utils.success_status_factory(), + issuer=self.server._issuer() + ) + + response.assertion.append(assertion) + + response = _sec.sign_statement("%s" % response, class_name(a_assertion), + key_file=self.client.sec.key_file, + node_id=a_assertion.id) + + # xmldoc = "%s" % response + # strangely enough I get different tags if I run this test separately + # or as part of a bunch of tests. + # xmldoc = add_subelement(xmldoc, "EncryptedAssertion", sigass) + + node_xpath = ''.join(["/*[local-name()=\"%s\"]" % v for v in + ["Response", "Assertion", "Advice", + "EncryptedAssertion", "Assertion"]]) + + enctext = _sec.crypto.encrypt_assertion(response, + self.client.sec.encryption_keypairs[ + 0]["cert_file"], + pre_encryption_part(), + node_xpath=node_xpath) + + # seresp = samlp.response_from_string(enctext) + + if six.PY2: + resp_str = encode_fn(enctext.encode('utf-8')) + else: + resp_str = encode_fn(bytes(enctext, 'utf-8')) + + # Now over to the client side + resp = self.client.parse_authn_request_response( + resp_str, BINDING_HTTP_POST, + {"_012345": "http://foo.example.com/service"}) + + # assert resp.encrypted_assertion == [] + assert resp.assertion + assert resp.assertion.advice + assert resp.assertion.advice.assertion + assert resp.ava == \ + {'givenName': ['Dave'], 'sn': [u'Concepción'], 'uid': ['test01'], + 'email': ['test.testsson@test.se']} + + def test_sign_then_encrypt_assertion_advice_2(self): + # Begin with the IdPs side + _sec = self.server.sec + + nameid_policy = samlp.NameIDPolicy(allow_create="false", + format=saml.NAMEID_FORMAT_PERSISTENT) + + asser_1 = Assertion({"givenName": "Dave"}) + + farg = add_path( + {}, + ['assertion', 'subject', 'subject_confirmation', 'method', + saml.SCM_BEARER]) + add_path( + farg['assertion']['subject']['subject_confirmation'], + ['subject_confirmation_data', 'in_response_to', + '_012345']) + add_path( + farg['assertion']['subject']['subject_confirmation'], + ['subject_confirmation_data', 'recipient', + "http://lingon.catalogix.se:8087/"]) + name_id = factory(saml.NameID, format=saml.NAMEID_FORMAT_TRANSIENT) + + assertion_1 = asser_1.construct( + self.client.config.entityid, + self.server.config.attribute_converters, + self.server.config.getattr("policy", "idp"), + issuer=self.server._issuer(), + authn_class=INTERNETPROTOCOLPASSWORD, + authn_auth="http://www.example.com/login", + name_id=name_id, + farg=farg['assertion']) + + asser_2 = Assertion({"sn": "Concepción"}) + + assertion_2 = asser_2.construct( + self.client.config.entityid, + self.server.config.attribute_converters, + self.server.config.getattr("policy", "idp"), + issuer=self.server._issuer(), + authn_class=INTERNETPROTOCOLPASSWORD, + authn_auth="http://www.example.com/login", + name_id=name_id, + farg=farg['assertion']) + + a_asser_1 = Assertion({"uid": "test01"}) + a_assertion_1 = a_asser_1.construct( + self.client.config.entityid, + self.server.config.attribute_converters, + self.server.config.getattr("policy", "idp"), + issuer=self.server._issuer(), + authn_class=INTERNETPROTOCOLPASSWORD, + authn_auth="http://www.example.com/login", + name_id=name_id, + farg=farg['assertion']) + + a_asser_2 = Assertion({"email": "test.testsson@test.se"}) + a_assertion_2 = a_asser_2.construct( + self.client.config.entityid, + self.server.config.attribute_converters, + self.server.config.getattr("policy", "idp"), + issuer=self.server._issuer(), + authn_class=INTERNETPROTOCOLPASSWORD, + authn_auth="http://www.example.com/login", + name_id=name_id, + farg=farg['assertion']) + + a_asser_3 = Assertion({"street": "street"}) + a_assertion_3 = a_asser_3.construct( + self.client.config.entityid, + self.server.config.attribute_converters, + self.server.config.getattr("policy", "idp"), + issuer=self.server._issuer(), + authn_class=INTERNETPROTOCOLPASSWORD, + authn_auth="http://www.example.com/login", + name_id=name_id, + farg=farg['assertion']) + + a_asser_4 = Assertion({"title": "title"}) + a_assertion_4 = a_asser_4.construct( + self.client.config.entityid, + self.server.config.attribute_converters, + self.server.config.getattr("policy", "idp"), + issuer=self.server._issuer(), + authn_class=INTERNETPROTOCOLPASSWORD, + authn_auth="http://www.example.com/login", + name_id=name_id, + farg=farg['assertion']) + + a_assertion_1.signature = sigver.pre_signature_part( + a_assertion_1.id, _sec.my_cert, 1) + + a_assertion_2.signature = sigver.pre_signature_part( + a_assertion_2.id, _sec.my_cert, 1) + + a_assertion_3.signature = sigver.pre_signature_part( + a_assertion_3.id, _sec.my_cert, 1) + + a_assertion_4.signature = sigver.pre_signature_part( + a_assertion_4.id, _sec.my_cert, 1) + + assertion_1.signature = sigver.pre_signature_part(assertion_1.id, + _sec.my_cert, 1) + + assertion_2.signature = sigver.pre_signature_part(assertion_2.id, + _sec.my_cert, 1) + + response = sigver.response_factory( + in_response_to="_012345", + destination="http://lingon.catalogix.se:8087/", + status=s_utils.success_status_factory(), + issuer=self.server._issuer() + ) + + response.assertion = assertion_1 + + response.assertion.advice = Advice() + + response.assertion.advice.encrypted_assertion = [] + response.assertion.advice.encrypted_assertion.append( + EncryptedAssertion()) + + response.assertion.advice.encrypted_assertion[0].add_extension_element( + a_assertion_1) + + advice_tag = response.assertion.advice._to_element_tree().tag + assertion_tag = a_assertion_1._to_element_tree().tag + response = \ + response.get_xml_string_with_self_contained_assertion_within_advice_encrypted_assertion( + assertion_tag, advice_tag) + + response = _sec.sign_statement("%s" % response, + class_name(a_assertion_1), + key_file=self.server.sec.key_file, + node_id=a_assertion_1.id) + + node_xpath = ''.join(["/*[local-name()=\"%s\"]" % v for v in + ["Response", "Assertion", "Advice", + "EncryptedAssertion", "Assertion"]]) + + enctext = _sec.crypto.encrypt_assertion(response, + self.client.sec.encryption_keypairs[ + 1]["cert_file"], + pre_encryption_part(), + node_xpath=node_xpath) + + response = samlp.response_from_string(enctext) + + response.assertion = response.assertion[0] + + response.assertion.advice.encrypted_assertion.append( + EncryptedAssertion()) + response.assertion.advice.encrypted_assertion[1].add_extension_element( + a_assertion_2) + + advice_tag = response.assertion.advice._to_element_tree().tag + assertion_tag = a_assertion_2._to_element_tree().tag + response = \ + response.get_xml_string_with_self_contained_assertion_within_advice_encrypted_assertion( + assertion_tag, advice_tag) + + response = _sec.sign_statement("%s" % response, + class_name(a_assertion_2), + key_file=self.server.sec.key_file, + node_id=a_assertion_2.id) + + node_xpath = ''.join(["/*[local-name()=\"%s\"]" % v for v in + ["Response", "Assertion", "Advice", + "EncryptedAssertion", "Assertion"]]) + + enctext = _sec.crypto.encrypt_assertion(response, + self.client.sec.encryption_keypairs[ + 0]["cert_file"], + pre_encryption_part(), + node_xpath=node_xpath) + + response = samlp.response_from_string(enctext) + + response.assertion = response.assertion[0] + + assertion_tag = response.assertion._to_element_tree().tag + response = pre_encrypt_assertion(response) + response = \ + response.get_xml_string_with_self_contained_assertion_within_encrypted_assertion( + assertion_tag) + + response = _sec.sign_statement("%s" % response, class_name(assertion_1), + key_file=self.server.sec.key_file, + node_id=assertion_1.id) + + enctext = _sec.crypto.encrypt_assertion(response, + self.client.sec.encryption_keypairs[ + 1]["cert_file"], + pre_encryption_part()) + + response = samlp.response_from_string(enctext) + + response.assertion = assertion_2 + + response.assertion.advice = Advice() + + response.assertion.advice.encrypted_assertion = [] + response.assertion.advice.encrypted_assertion.append( + EncryptedAssertion()) + + response.assertion.advice.encrypted_assertion[0].add_extension_element( + a_assertion_3) + + advice_tag = response.assertion.advice._to_element_tree().tag + assertion_tag = a_assertion_3._to_element_tree().tag + response = \ + response.get_xml_string_with_self_contained_assertion_within_advice_encrypted_assertion( + assertion_tag, advice_tag) + + response = _sec.sign_statement("%s" % response, + class_name(a_assertion_3), + key_file=self.server.sec.key_file, + node_id=a_assertion_3.id) + + node_xpath = ''.join(["/*[local-name()=\"%s\"]" % v for v in + ["Response", "Assertion", "Advice", + "EncryptedAssertion", "Assertion"]]) + + enctext = _sec.crypto.encrypt_assertion(response, + self.client.sec.encryption_keypairs[ + 0]["cert_file"], + pre_encryption_part(), + node_xpath=node_xpath) + + response = samlp.response_from_string(enctext) + + response.assertion = response.assertion[0] + + response.assertion.advice.encrypted_assertion.append( + EncryptedAssertion()) + + response.assertion.advice.encrypted_assertion[1].add_extension_element( + a_assertion_4) + + advice_tag = response.assertion.advice._to_element_tree().tag + assertion_tag = a_assertion_4._to_element_tree().tag + response = \ + response.get_xml_string_with_self_contained_assertion_within_advice_encrypted_assertion( + assertion_tag, advice_tag) + + response = _sec.sign_statement("%s" % response, + class_name(a_assertion_4), + key_file=self.server.sec.key_file, + node_id=a_assertion_4.id) + + node_xpath = ''.join(["/*[local-name()=\"%s\"]" % v for v in + ["Response", "Assertion", "Advice", + "EncryptedAssertion", "Assertion"]]) + + enctext = _sec.crypto.encrypt_assertion(response, + self.client.sec.encryption_keypairs[ + 1]["cert_file"], + pre_encryption_part(), + node_xpath=node_xpath) + + response = samlp.response_from_string(enctext) + + response = _sec.sign_statement("%s" % response, + class_name(response.assertion[0]), + key_file=self.server.sec.key_file, + node_id=response.assertion[0].id) + + response = samlp.response_from_string(response) + + # seresp = samlp.response_from_string(enctext) + + resp_str = encode_fn(response.to_string()) + + # Now over to the client side + resp = self.client.parse_authn_request_response( + resp_str, BINDING_HTTP_POST, + {"_012345": "http://foo.example.com/service"}) + + # assert resp.encrypted_assertion == [] + assert resp.assertion + assert resp.assertion.advice + assert resp.assertion.advice.assertion + assert resp.ava == \ + {'street': ['street'], 'uid': ['test01'], 'title': ['title'], + 'givenName': ['Dave'], 'email': + ['test.testsson@test.se'], 'sn': [u'Concepción']} + + def test_signed_redirect(self): + + # Revert configuration change to disallow unsinged responses + self.client.want_response_signed = True + + msg_str = "%s" % self.client.create_authn_request( + "http://localhost:8088/sso", message_id="id1")[1] + + info = self.client.apply_binding( + BINDING_HTTP_REDIRECT, msg_str, destination="", + relay_state="relay2", sigalg=SIG_RSA_SHA256) + + loc = info["headers"][0][1] + qs = parse_qs(loc[1:]) + assert _leq(qs.keys(), + ['SigAlg', 'SAMLRequest', 'RelayState', 'Signature']) + + assert verify_redirect_signature(list_values2simpletons(qs), + self.client.sec.sec_backend) + + res = self.server.parse_authn_request(qs["SAMLRequest"][0], + BINDING_HTTP_REDIRECT) + + def test_do_logout_signed_redirect(self): + conf = config.SPConfig() + conf.load_file("sp_slo_redirect_conf") + client = Saml2Client(conf) + + # information about the user from an IdP + session_info = { + "name_id": nid, + "issuer": "urn:mace:example.com:saml:roland:idp", + "not_on_or_after": in_a_while(minutes=15), + "ava": { + "givenName": "Anders", + "sn": "Österberg", + "mail": "anders.osterberg@example.com" + } + } + client.users.add_information_about_person(session_info) + entity_ids = client.users.issuers_of_info(nid) + assert entity_ids == ["urn:mace:example.com:saml:roland:idp"] + + resp = client.do_logout(nid, entity_ids, "Tired", in_a_while(minutes=5), + sign=True, + expected_binding=BINDING_HTTP_REDIRECT) + + assert list(resp.keys()) == entity_ids + binding, info = resp[entity_ids[0]] + assert binding == BINDING_HTTP_REDIRECT + + loc = info["headers"][0][1] + _, _, _, _, qs, _ = urlparse(loc) + qs = parse_qs(qs) + assert _leq(qs.keys(), + ['SigAlg', 'SAMLRequest', 'RelayState', 'Signature']) + + assert verify_redirect_signature(list_values2simpletons(qs), + client.sec.sec_backend) + + res = self.server.parse_logout_request(qs["SAMLRequest"][0], + BINDING_HTTP_REDIRECT) + + def test_do_logout_post(self): + # information about the user from an IdP + session_info = { + "name_id": nid, + "issuer": "urn:mace:example.com:saml:roland:idp", + "not_on_or_after": in_a_while(minutes=15), + "ava": { + "givenName": "Anders", + "sn": "Österberg", + "mail": "anders.osterberg@example.com" + }, + "session_index": SessionIndex("_foo") + } + self.client.users.add_information_about_person(session_info) + entity_ids = self.client.users.issuers_of_info(nid) + assert entity_ids == ["urn:mace:example.com:saml:roland:idp"] + resp = self.client.do_logout(nid, entity_ids, "Tired", + in_a_while(minutes=5), sign=True, + expected_binding=BINDING_HTTP_POST) + assert resp + assert len(resp) == 1 + assert list(resp.keys()) == entity_ids + binding, info = resp[entity_ids[0]] + assert binding == BINDING_HTTP_POST + + _dic = unpack_form(info["data"]) + res = self.server.parse_logout_request(_dic["SAMLRequest"], + BINDING_HTTP_POST) + assert b'<ns0:SessionIndex>_foo</ns0:SessionIndex>' in res.xmlstr + + def test_do_logout_session_expired(self): + # information about the user from an IdP + session_info = { + "name_id": nid, + "issuer": "urn:mace:example.com:saml:roland:idp", + "not_on_or_after": a_while_ago(minutes=15), + "ava": { + "givenName": "Anders", + "sn": "Österberg", + "mail": "anders.osterberg@example.com" + }, + "session_index": SessionIndex("_foo") + } + self.client.users.add_information_about_person(session_info) + entity_ids = self.client.users.issuers_of_info(nid) + assert entity_ids == ["urn:mace:example.com:saml:roland:idp"] + resp = self.client.do_logout(nid, entity_ids, "Tired", + in_a_while(minutes=5), sign=True, + expected_binding=BINDING_HTTP_POST) + assert resp + assert len(resp) == 1 + assert list(resp.keys()) == entity_ids + binding, info = resp[entity_ids[0]] + assert binding == BINDING_HTTP_POST + + _dic = unpack_form(info["data"]) + res = self.server.parse_logout_request(_dic["SAMLRequest"], + BINDING_HTTP_POST) + assert b'<ns0:SessionIndex>_foo</ns0:SessionIndex>' in res.xmlstr + # Below can only be done with dummy Server IDP = "urn:mace:example.com:saml:roland:idp" @@ -1553,8 +2903,8 @@ class TestClientWithDummy(): "not_on_or_after": in_a_while(minutes=15), "ava": { "givenName": "Anders", - "sn": "Andersson", - "mail": "anders.andersson@example.com" + "sn": "Österberg", + "mail": "anders.osterberg@example.com" } } self.client.users.add_information_about_person(session_info) @@ -1658,8 +3008,8 @@ class TestClientNoConfigContext(): "not_on_or_after": in_a_while(minutes=15), "ava": { "givenName": "Anders", - "sn": "Andersson", - "mail": "anders.andersson@example.com" + "sn": "Österberg", + "mail": "anders.osterberg@example.com" } } self.client.users.add_information_about_person(session_info) |