summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorIvan Kanakarakis <ivan.kanak@gmail.com>2018-10-10 14:37:40 +0300
committerGitHub <noreply@github.com>2018-10-10 14:37:40 +0300
commit691e981b0ddca9b7d75fa468ef31f494e80c0268 (patch)
treec131fab7847486aeba461ed448abad1712f0233b /tests
parentc5c7e2ded26bf02ea51723708c1ee554e26b6811 (diff)
parent35fc1dc813884d4ade6f463e418ecc751f342608 (diff)
downloadpysaml2-691e981b0ddca9b7d75fa468ef31f494e80c0268.tar.gz
Merge pull request #550 from johanlundberg/non_ascii_ava_encryption_decryption
Support non-ascii attribute values for encryption and decryption
Diffstat (limited to 'tests')
-rw-r--r--tests/test_40_sigver.py424
-rw-r--r--tests/test_50_server.py1105
-rw-r--r--tests/test_51_client.py1360
3 files changed, 2883 insertions, 6 deletions
diff --git a/tests/test_40_sigver.py b/tests/test_40_sigver.py
index f975b5ea..ba5cf639 100644
--- a/tests/test_40_sigver.py
+++ b/tests/test_40_sigver.py
@@ -1,4 +1,5 @@
#!/usr/bin/env python
+# -*- coding: utf-8 -*-
import base64
from saml2.xmldsig import SIG_RSA_SHA256
@@ -420,6 +421,300 @@ class TestSecurity():
s_response, response2, class_name(response2))
+class TestSecurityNonAsciiAva():
+ def setup_class(self):
+ # This would be one way to initialize the security context :
+ #
+ # conf = config.SPConfig()
+ # conf.load_file("server_conf")
+ # conf.only_use_keys_in_metadata = False
+ #
+ # but instead, FakeConfig() is used to really only use the minimal
+ # set of parameters needed for these test cases. Other test cases
+ # (TestSecurityMetadata below) excersise the SPConfig() mechanism.
+ #
+ conf = FakeConfig()
+ self.sec = sigver.security_context(conf)
+
+ self._assertion = factory(
+ saml.Assertion,
+ version="2.0",
+ id="11111",
+ issue_instant="2009-10-30T13:20:28Z",
+ signature=sigver.pre_signature_part("11111", self.sec.my_cert, 1),
+ attribute_statement=do_attribute_statement({
+ ("", "", "surName"): ("Föö", ""),
+ ("", "", "givenName"): ("Bär", ""),
+ })
+ )
+
+ def test_verify_1(self):
+ with open(SIGNED) as fp:
+ xml_response = fp.read()
+ response = self.sec.correctly_signed_response(xml_response)
+ assert response
+
+ def test_non_verify_1(self):
+ """ unsigned is OK """
+ with open(UNSIGNED) as fp:
+ xml_response = fp.read()
+ response = self.sec.correctly_signed_response(xml_response)
+ assert response
+
+ def test_sign_assertion(self):
+ ass = self._assertion
+ print(ass)
+ sign_ass = self.sec.sign_assertion("%s" % ass, node_id=ass.id)
+ #print(sign_ass)
+ sass = saml.assertion_from_string(sign_ass)
+ #print(sass)
+ assert _eq(sass.keyswv(), ['attribute_statement', 'issue_instant',
+ 'version', 'signature', 'id'])
+ assert sass.version == "2.0"
+ assert sass.id == "11111"
+ assert time_util.str_to_time(sass.issue_instant)
+
+ print("Crypto version : %s" % (self.sec.crypto.version()))
+
+ item = self.sec.check_signature(sass, class_name(sass), sign_ass)
+
+ assert isinstance(item, saml.Assertion)
+
+ def test_multiple_signatures_assertion(self):
+ ass = self._assertion
+ # basic test with two of the same
+ to_sign = [(ass, ass.id, ''),
+ (ass, ass.id, '')
+ ]
+ sign_ass = self.sec.multiple_signatures("%s" % ass, to_sign)
+ sass = saml.assertion_from_string(sign_ass)
+ assert _eq(sass.keyswv(), ['attribute_statement', 'issue_instant',
+ 'version', 'signature', 'id'])
+ assert sass.version == "2.0"
+ assert sass.id == "11111"
+ assert time_util.str_to_time(sass.issue_instant)
+
+ print("Crypto version : %s" % (self.sec.crypto.version()))
+
+ item = self.sec.check_signature(sass, class_name(sass),
+ sign_ass, must=True)
+
+ assert isinstance(item, saml.Assertion)
+
+ def test_multiple_signatures_response(self):
+ response = factory(samlp.Response,
+ assertion=self._assertion,
+ id="22222",
+ signature=sigver.pre_signature_part(
+ "22222", self.sec.my_cert))
+
+ # order is important, we can't validate if the signatures are made
+ # in the reverse order
+ to_sign = [(self._assertion, self._assertion.id, ''),
+ (response, response.id, '')]
+
+ s_response = self.sec.multiple_signatures("%s" % response, to_sign)
+ assert s_response is not None
+ response = response_from_string(s_response)
+
+ item = self.sec.check_signature(response, class_name(response),
+ s_response, must=True)
+ assert item == response
+ assert item.id == "22222"
+
+ s_assertion = item.assertion[0]
+ assert isinstance(s_assertion, saml.Assertion)
+ # make sure the assertion was modified when we supposedly signed it
+ assert s_assertion != self._assertion
+
+ ci = "".join(sigver.cert_from_instance(s_assertion)[0].split())
+ assert ci == self.sec.my_cert
+
+ res = self.sec.check_signature(s_assertion, class_name(s_assertion),
+ s_response, must=True)
+ assert res == s_assertion
+ assert s_assertion.id == "11111"
+ assert s_assertion.version == "2.0"
+ assert _eq(s_assertion.keyswv(), ['attribute_statement',
+ 'issue_instant',
+ 'version', 'signature', 'id'])
+
+ def test_sign_response(self):
+ response = factory(samlp.Response,
+ assertion=self._assertion,
+ id="22222",
+ signature=sigver.pre_signature_part("22222",
+ self.sec
+ .my_cert))
+
+ to_sign = [(class_name(self._assertion), self._assertion.id),
+ (class_name(response), response.id)]
+ s_response = sigver.signed_instance_factory(response, self.sec, to_sign)
+
+ assert s_response is not None
+ print(s_response)
+ response = response_from_string(s_response)
+ sass = response.assertion[0]
+
+ print(sass)
+ assert _eq(sass.keyswv(), ['attribute_statement', 'issue_instant',
+ 'version', 'signature', 'id'])
+ assert sass.version == "2.0"
+ assert sass.id == "11111"
+
+ item = self.sec.check_signature(response, class_name(response),
+ s_response)
+ assert isinstance(item, samlp.Response)
+ assert item.id == "22222"
+
+ def test_sign_response_2(self):
+ assertion2 = factory(saml.Assertion,
+ version="2.0",
+ id="11122",
+ issue_instant="2009-10-30T13:20:28Z",
+ signature=sigver.pre_signature_part("11122",
+ self.sec
+ .my_cert),
+ attribute_statement=do_attribute_statement({
+ ("", "", "surName"): ("Räv", ""),
+ ("", "", "givenName"): ("Björn", ""),
+ })
+ )
+ response = factory(samlp.Response,
+ assertion=assertion2,
+ id="22233",
+ signature=sigver.pre_signature_part("22233",
+ self.sec
+ .my_cert))
+
+ to_sign = [(class_name(assertion2), assertion2.id),
+ (class_name(response), response.id)]
+
+ s_response = sigver.signed_instance_factory(response, self.sec, to_sign)
+
+ assert s_response is not None
+ response2 = response_from_string(s_response)
+
+ sass = response2.assertion[0]
+ assert _eq(sass.keyswv(), ['attribute_statement', 'issue_instant',
+ 'version', 'signature', 'id'])
+ assert sass.version == "2.0"
+ assert sass.id == "11122"
+
+ item = self.sec.check_signature(response2, class_name(response),
+ s_response)
+
+ assert isinstance(item, samlp.Response)
+
+ def test_sign_verify(self):
+ response = factory(samlp.Response,
+ assertion=self._assertion,
+ id="22233",
+ signature=sigver.pre_signature_part("22233",
+ self.sec
+ .my_cert))
+
+ to_sign = [(class_name(self._assertion), self._assertion.id),
+ (class_name(response), response.id)]
+
+ s_response = sigver.signed_instance_factory(response, self.sec,
+ to_sign)
+
+ print(s_response)
+ res = self.sec.verify_signature(s_response,
+ node_name=class_name(samlp.Response()))
+
+ print(res)
+ assert res
+
+ def test_sign_verify_with_cert_from_instance(self):
+ response = factory(samlp.Response,
+ assertion=self._assertion,
+ id="22222",
+ signature=sigver.pre_signature_part("22222",
+ self.sec
+ .my_cert))
+
+ to_sign = [(class_name(self._assertion), self._assertion.id),
+ (class_name(response), response.id)]
+
+ s_response = sigver.signed_instance_factory(response, self.sec, to_sign)
+
+ response2 = response_from_string(s_response)
+
+ ci = "".join(sigver.cert_from_instance(response2)[0].split())
+
+ assert ci == self.sec.my_cert
+
+ res = self.sec.verify_signature(s_response,
+ node_name=class_name(samlp.Response()))
+
+ assert res
+
+ res = self.sec._check_signature(s_response, response2,
+ class_name(response2), s_response)
+ assert res == response2
+
+ def test_sign_verify_assertion_with_cert_from_instance(self):
+ assertion = factory(saml.Assertion,
+ version="2.0",
+ id="11100",
+ issue_instant="2009-10-30T13:20:28Z",
+ signature=sigver.pre_signature_part("11100",
+ self.sec
+ .my_cert),
+ attribute_statement=do_attribute_statement({
+ ("", "", "surName"): ("Räv", ""),
+ ("", "", "givenName"): ("Björn", ""),
+ })
+ )
+
+ to_sign = [(class_name(assertion), assertion.id)]
+ s_assertion = sigver.signed_instance_factory(assertion, self.sec,
+ to_sign)
+ print(s_assertion)
+ ass = assertion_from_string(s_assertion)
+ ci = "".join(sigver.cert_from_instance(ass)[0].split())
+ assert ci == self.sec.my_cert
+
+ res = self.sec.verify_signature(s_assertion,
+ node_name=class_name(ass))
+ assert res
+
+ res = self.sec._check_signature(s_assertion, ass, class_name(ass))
+
+ assert res
+
+ def test_exception_sign_verify_with_cert_from_instance(self):
+ assertion = factory(saml.Assertion,
+ version="2.0",
+ id="11100",
+ issue_instant="2009-10-30T13:20:28Z",
+ #signature= sigver.pre_signature_part("11100",
+ # self.sec.my_cert),
+ attribute_statement=do_attribute_statement({
+ ("", "", "surName"): ("Föö", ""),
+ ("", "", "givenName"): ("Bär", ""),
+ })
+ )
+
+ response = factory(samlp.Response,
+ assertion=assertion,
+ id="22222",
+ signature=sigver.pre_signature_part("22222",
+ self.sec
+ .my_cert))
+
+ to_sign = [(class_name(response), response.id)]
+
+ s_response = sigver.signed_instance_factory(response, self.sec, to_sign)
+
+ response2 = response_from_string(s_response)
+ # Change something that should make everything fail
+ response2.id = "23456"
+ raises(sigver.SignatureError, self.sec._check_signature,
+ s_response, response2, class_name(response2))
+
class TestSecurityMetadata():
def setup_class(self):
@@ -442,6 +737,27 @@ class TestSecurityMetadata():
)
+class TestSecurityMetadataNonAsciiAva():
+ def setup_class(self):
+ conf = config.SPConfig()
+ conf.load_file("server_conf")
+ md = MetadataStore([saml, samlp], None, conf)
+ md.load("local", full_path("metadata_cert.xml"))
+
+ conf.metadata = md
+ conf.only_use_keys_in_metadata = False
+ self.sec = sigver.security_context(conf)
+
+ assertion = factory(
+ saml.Assertion, version="2.0", id="11111",
+ issue_instant="2009-10-30T13:20:28Z",
+ signature=sigver.pre_signature_part("11111", self.sec.my_cert, 1),
+ attribute_statement=do_attribute_statement(
+ {("", "", "surName"): ("Föö", ""),
+ ("", "", "givenName"): ("Bär", ""), })
+ )
+
+
def test_xbox():
conf = config.SPConfig()
conf.load_file("server_conf")
@@ -495,6 +811,59 @@ def test_xbox():
print(assertions)
+def test_xbox_non_ascii_ava():
+ conf = config.SPConfig()
+ conf.load_file("server_conf")
+ md = MetadataStore([saml, samlp], None, conf)
+ md.load("local", full_path("idp_example.xml"))
+
+ conf.metadata = md
+ conf.only_use_keys_in_metadata = False
+ sec = sigver.security_context(conf)
+
+ assertion = factory(
+ saml.Assertion, version="2.0", id="11111",
+ issue_instant="2009-10-30T13:20:28Z",
+ signature=sigver.pre_signature_part("11111", sec.my_cert, 1),
+ attribute_statement=do_attribute_statement(
+ {("", "", "surName"): ("Föö", ""),
+ ("", "", "givenName"): ("Bär", ""), })
+ )
+
+ sigass = sec.sign_statement(assertion, class_name(assertion),
+ key_file=full_path("test.key"),
+ node_id=assertion.id)
+
+ _ass0 = saml.assertion_from_string(sigass)
+
+ encrypted_assertion = EncryptedAssertion()
+ encrypted_assertion.add_extension_element(_ass0)
+
+ _, pre = make_temp(str(pre_encryption_part()).encode('utf-8'), decode=False)
+ enctext = sec.crypto.encrypt(
+ str(encrypted_assertion), conf.cert_file, pre, "des-192",
+ '/*[local-name()="EncryptedAssertion"]/*[local-name()="Assertion"]')
+
+ decr_text = sec.decrypt(enctext)
+ _seass = saml.encrypted_assertion_from_string(decr_text)
+ assertions = []
+ assers = extension_elements_to_elements(_seass.extension_elements,
+ [saml, samlp])
+
+ sign_cert_file = full_path("test.pem")
+
+ for ass in assers:
+ _ass = "%s" % ass
+ #_ass = _ass.replace('xsi:nil="true" ', '')
+ #assert sigass == _ass
+ _txt = sec.verify_signature(_ass, sign_cert_file,
+ node_name=class_name(assertion))
+ if _txt:
+ assertions.append(ass)
+
+ print(assertions)
+
+
def test_okta():
conf = config.Config()
conf.load_file("server_conf")
@@ -548,6 +917,35 @@ def test_xmlsec_err():
assert False
+def test_xmlsec_err_non_ascii_ava():
+ conf = config.SPConfig()
+ conf.load_file("server_conf")
+ md = MetadataStore([saml, samlp], None, conf)
+ md.load("local", full_path("idp_example.xml"))
+
+ conf.metadata = md
+ conf.only_use_keys_in_metadata = False
+ sec = sigver.security_context(conf)
+
+ assertion = factory(
+ saml.Assertion, version="2.0", id="11111",
+ issue_instant="2009-10-30T13:20:28Z",
+ signature=sigver.pre_signature_part("11111", sec.my_cert, 1),
+ attribute_statement=do_attribute_statement(
+ {("", "", "surName"): ("Föö", ""),
+ ("", "", "givenName"): ("Bär", ""), })
+ )
+
+ try:
+ sec.sign_statement(assertion, class_name(assertion),
+ key_file=full_path("tes.key"),
+ node_id=assertion.id)
+ except (XmlsecError, SigverError) as err: # should throw an exception
+ pass
+ else:
+ assert False
+
+
def test_sha256_signing():
conf = config.SPConfig()
conf.load_file("server_conf")
@@ -574,6 +972,32 @@ def test_sha256_signing():
assert s
+def test_sha256_signing_non_ascii_ava():
+ conf = config.SPConfig()
+ conf.load_file("server_conf")
+ md = MetadataStore([saml, samlp], None, conf)
+ md.load("local", full_path("idp_example.xml"))
+
+ conf.metadata = md
+ conf.only_use_keys_in_metadata = False
+ sec = sigver.security_context(conf)
+
+ assertion = factory(
+ saml.Assertion, version="2.0", id="11111",
+ issue_instant="2009-10-30T13:20:28Z",
+ signature=sigver.pre_signature_part("11111", sec.my_cert, 1,
+ sign_alg=SIG_RSA_SHA256),
+ attribute_statement=do_attribute_statement(
+ {("", "", "surName"): ("Föö", ""),
+ ("", "", "givenName"): ("Bär", ""), })
+ )
+
+ s = sec.sign_statement(assertion, class_name(assertion),
+ key_file=full_path("test.key"),
+ node_id=assertion.id)
+ assert s
+
+
def test_xmlsec_output_line_parsing():
output1 = "prefix\nOK\npostfix"
assert sigver.parse_xmlsec_output(output1)
diff --git a/tests/test_50_server.py b/tests/test_50_server.py
index f0dcae3c..11ace47b 100644
--- a/tests/test_50_server.py
+++ b/tests/test_50_server.py
@@ -1185,7 +1185,1110 @@ class TestServer1():
idp.ident.close()
assert request
-#------------------------------------------------------------------------
+# ------------------------------------------------------------------------
+
+
+class TestServer1NonAsciiAva():
+
+ def setup_class(self):
+ self.server = Server("idp_conf")
+
+ conf = config.SPConfig()
+ conf.load_file("server_conf")
+ self.client = client.Saml2Client(conf)
+ self.name_id = self.server.ident.transient_nameid(
+ "urn:mace:example.com:saml:roland:sp", "id12")
+ self.ava = {"givenName": ["Dave"], "sn": ["Concepción"],
+ "mail": ["dave@cnr.mlb.com"], "title": "#13"}
+
+ def teardown_class(self):
+ self.server.close()
+
+ def verify_assertion(self, assertion):
+ assert assertion
+ assert assertion[0].attribute_statement
+
+ ava = get_ava(assertion[0])
+
+ assert ava == \
+ {"givenName": ["Dave"], "sn": [u"Concepción"],
+ "mail": ["dave@cnr.mlb.com"], "title": ["#13"]}
+
+
+ def verify_encrypted_assertion(self, assertion, decr_text):
+ self.verify_assertion(assertion)
+ assert assertion[0].signature is None
+
+ assert 'EncryptedAssertion><encas1:Assertion xmlns:encas0="http://www.w3.org/2001/XMLSchema-instance" ' \
+ 'xmlns:encas1="urn:oasis:names:tc:SAML:2.0:assertion"' in decr_text
+
+ def verify_advice_assertion(self, resp, decr_text):
+ assert resp.assertion[0].signature is None
+
+ assert resp.assertion[0].advice.encrypted_assertion[0].extension_elements
+
+ assertion = extension_elements_to_elements(resp.assertion[0].advice.encrypted_assertion[0].extension_elements,
+ [saml, samlp])
+ self.verify_encrypted_assertion(assertion, decr_text)
+
+
+ def test_issuer(self):
+ issuer = self.server._issuer()
+ assert isinstance(issuer, saml.Issuer)
+ assert _eq(issuer.keyswv(), ["text", "format"])
+ assert issuer.format == saml.NAMEID_FORMAT_ENTITY
+ assert issuer.text == self.server.config.entityid
+
+ def test_assertion(self):
+ assertion = s_utils.assertion_factory(
+ subject=factory(
+ saml.Subject, text="_aaa",
+ name_id=factory(saml.NameID,
+ format=saml.NAMEID_FORMAT_TRANSIENT)),
+ attribute_statement=do_attribute_statement(
+ {
+ ("", "", "sn"): ("Jeter", ""),
+ ("", "", "givenName"): ("Derek", ""),
+ }
+ ),
+ issuer=self.server._issuer(),
+ )
+
+ assert _eq(assertion.keyswv(), ['attribute_statement', 'issuer', 'id',
+ 'subject', 'issue_instant', 'version'])
+ assert assertion.version == "2.0"
+ assert assertion.issuer.text == "urn:mace:example.com:saml:roland:idp"
+ #
+ assert assertion.attribute_statement
+ attribute_statement = assertion.attribute_statement
+ assert len(attribute_statement.attribute) == 2
+ attr0 = attribute_statement.attribute[0]
+ attr1 = attribute_statement.attribute[1]
+ if attr0.attribute_value[0].text == "Derek":
+ assert attr0.friendly_name == "givenName"
+ assert attr1.friendly_name == "sn"
+ assert attr1.attribute_value[0].text == "Jeter"
+ else:
+ assert attr1.friendly_name == "givenName"
+ assert attr1.attribute_value[0].text == "Derek"
+ assert attr0.friendly_name == "sn"
+ assert attr0.attribute_value[0].text == "Jeter"
+ #
+ subject = assertion.subject
+ assert _eq(subject.keyswv(), ["text", "name_id"])
+ assert subject.text == "_aaa"
+ assert subject.name_id.format == saml.NAMEID_FORMAT_TRANSIENT
+
+ def test_response(self):
+ response = sigver.response_factory(
+ in_response_to="_012345",
+ destination="https:#www.example.com",
+ status=s_utils.success_status_factory(),
+ assertion=s_utils.assertion_factory(
+ subject=factory(saml.Subject, text="_aaa",
+ name_id=saml.NAMEID_FORMAT_TRANSIENT),
+ attribute_statement=do_attribute_statement(
+ {
+ ("", "", "sn"): ("Jeter", ""),
+ ("", "", "givenName"): ("Derek", ""),
+ }
+ ),
+ issuer=self.server._issuer(),
+ ),
+ issuer=self.server._issuer(),
+ )
+
+ print(response.keyswv())
+ assert _eq(response.keyswv(), ['destination', 'assertion', 'status',
+ 'in_response_to', 'issue_instant',
+ 'version', 'issuer', 'id'])
+ assert response.version == "2.0"
+ assert response.issuer.text == "urn:mace:example.com:saml:roland:idp"
+ assert response.destination == "https:#www.example.com"
+ assert response.in_response_to == "_012345"
+ #
+ status = response.status
+ print(status)
+ assert status.status_code.value == samlp.STATUS_SUCCESS
+
+ def test_parse_faulty_request(self):
+ req_id, authn_request = self.client.create_authn_request(
+ destination="http://www.example.com", id="id1")
+
+ # should raise an error because faulty spentityid
+ binding = BINDING_HTTP_REDIRECT
+ htargs = self.client.apply_binding(
+ binding, "%s" % authn_request, "http://www.example.com", "abcd")
+ _dict = parse_qs(htargs["headers"][0][1].split('?')[1])
+ print(_dict)
+ raises(OtherError, self.server.parse_authn_request,
+ _dict["SAMLRequest"][0], binding)
+
+ def test_parse_faulty_request_to_err_status(self):
+ req_id, authn_request = self.client.create_authn_request(
+ destination="http://www.example.com")
+
+ binding = BINDING_HTTP_REDIRECT
+ htargs = self.client.apply_binding(binding, "%s" % authn_request,
+ "http://www.example.com", "abcd")
+ _dict = parse_qs(htargs["headers"][0][1].split('?')[1])
+ print(_dict)
+
+ try:
+ self.server.parse_authn_request(_dict["SAMLRequest"][0], binding)
+ status = None
+ except OtherError as oe:
+ print(oe.args)
+ status = s_utils.error_status_factory(oe)
+
+ assert status
+ print(status)
+ assert _eq(status.keyswv(), ["status_code", "status_message"])
+ assert status.status_message.text == 'Not destined for me!'
+ status_code = status.status_code
+ assert _eq(status_code.keyswv(), ["status_code", "value"])
+ assert status_code.value == samlp.STATUS_RESPONDER
+ assert status_code.status_code.value == samlp.STATUS_UNKNOWN_PRINCIPAL
+
+ def test_parse_ok_request(self):
+ req_id, authn_request = self.client.create_authn_request(
+ message_id="id1", destination="http://localhost:8088/sso")
+
+ print(authn_request)
+ binding = BINDING_HTTP_REDIRECT
+ htargs = self.client.apply_binding(binding, "%s" % authn_request,
+ "http://www.example.com", "abcd")
+ _dict = parse_qs(htargs["headers"][0][1].split('?')[1])
+ print(_dict)
+
+ req = self.server.parse_authn_request(_dict["SAMLRequest"][0], binding)
+ # returns a dictionary
+ print(req)
+ resp_args = self.server.response_args(req.message, [BINDING_HTTP_POST])
+ assert resp_args["destination"] == "http://lingon.catalogix.se:8087/"
+ assert resp_args["in_response_to"] == "id1"
+ name_id_policy = resp_args["name_id_policy"]
+ assert _eq(name_id_policy.keyswv(), ["format", "allow_create"])
+ assert name_id_policy.format == saml.NAMEID_FORMAT_TRANSIENT
+ assert resp_args[
+ "sp_entity_id"] == "urn:mace:example.com:saml:roland:sp"
+
+ def test_sso_response_with_identity(self):
+ name_id = self.server.ident.transient_nameid(
+ "https://example.com/sp", "id12")
+ resp = self.server.create_authn_response(
+ {
+ "eduPersonEntitlement": "Short stop",
+ "sn": "Jeter",
+ "givenName": "Derek",
+ "mail": "derek.jeter@nyy.mlb.com",
+ "title": "The man"
+ },
+ "id12", # in_response_to
+ "http://localhost:8087/", # destination
+ "https://example.com/sp", # sp_entity_id
+ name_id=name_id,
+ authn=AUTHN
+ )
+
+ print(resp.keyswv())
+ assert _eq(resp.keyswv(), ['status', 'destination', 'assertion',
+ 'in_response_to', 'issue_instant',
+ 'version', 'id', 'issuer'])
+ assert resp.destination == "http://localhost:8087/"
+ assert resp.in_response_to == "id12"
+ assert resp.status
+ assert resp.status.status_code.value == samlp.STATUS_SUCCESS
+ assert resp.assertion
+ assertion = resp.assertion
+ print(assertion)
+ assert assertion.authn_statement
+ assert assertion.conditions
+ assert assertion.attribute_statement
+ attribute_statement = assertion.attribute_statement
+ print(attribute_statement)
+ assert len(attribute_statement[0].attribute) == 4
+ # Pick out one attribute
+ attr = None
+ for attr in attribute_statement[0].attribute:
+ if attr.friendly_name == "givenName":
+ break
+ assert len(attr.attribute_value) == 1
+ assert attr.name == "urn:mace:dir:attribute-def:givenName"
+ assert attr.name_format == "urn:oasis:names:tc:SAML:2.0:attrname-format:basic"
+ value = attr.attribute_value[0]
+ assert value.text.strip() == "Derek"
+ assert value.get_type() == "xs:string"
+ assert assertion.subject
+ assert assertion.subject.name_id
+ assert assertion.subject.subject_confirmation
+ confirmation = assertion.subject.subject_confirmation[0]
+ print(confirmation.keyswv())
+ print(confirmation.subject_confirmation_data)
+ assert confirmation.subject_confirmation_data.in_response_to == "id12"
+
+ def test_sso_response_without_identity(self):
+ resp = self.server.create_authn_response(
+ {},
+ "id12", # in_response_to
+ "http://localhost:8087/", # consumer_url
+ "urn:mace:example.com:saml:roland:sp", # sp_entity_id
+ userid="USER1",
+ authn=AUTHN,
+ release_policy=Policy(),
+ best_effort=True
+ )
+
+ print(resp.keyswv())
+ assert _eq(resp.keyswv(), ['status', 'destination', 'in_response_to',
+ 'issue_instant', 'version', 'id', 'issuer',
+ 'assertion'])
+ assert resp.destination == "http://localhost:8087/"
+ assert resp.in_response_to == "id12"
+ assert resp.status
+ assert resp.status.status_code.value == samlp.STATUS_SUCCESS
+ assert resp.issuer.text == "urn:mace:example.com:saml:roland:idp"
+ assert not resp.assertion.attribute_statement
+
+ def test_sso_response_specific_instant(self):
+ _authn = AUTHN.copy()
+ _authn["authn_instant"] = 1234567890
+
+ resp = self.server.create_authn_response(
+ {},
+ "id12", # in_response_to
+ "http://localhost:8087/", # consumer_url
+ "urn:mace:example.com:saml:roland:sp", # sp_entity_id
+ userid="USER1",
+ authn=_authn,
+ best_effort=True
+ )
+
+ print(resp.keyswv())
+ assert _eq(resp.keyswv(), ['status', 'destination', 'in_response_to',
+ 'issue_instant', 'version', 'id', 'issuer',
+ 'assertion'])
+ authn_statement = resp.assertion.authn_statement[0]
+ assert authn_statement.authn_instant == '2009-02-13T23:31:30Z'
+
+ def test_sso_failure_response(self):
+ exc = s_utils.MissingValue("eduPersonAffiliation missing")
+ resp = self.server.create_error_response(
+ "id12", "http://localhost:8087/", exc)
+
+ print(resp.keyswv())
+ assert _eq(resp.keyswv(), ['status', 'destination', 'in_response_to',
+ 'issue_instant', 'version', 'id', 'issuer'])
+ assert resp.destination == "http://localhost:8087/"
+ assert resp.in_response_to == "id12"
+ assert resp.status
+ print(resp.status)
+ assert resp.status.status_code.value == samlp.STATUS_RESPONDER
+ assert resp.status.status_code.status_code.value == \
+ samlp.STATUS_REQUEST_UNSUPPORTED
+ assert resp.status.status_message.text == \
+ "eduPersonAffiliation missing"
+ assert resp.issuer.text == "urn:mace:example.com:saml:roland:idp"
+ assert not resp.assertion
+
+ def test_authn_response_0(self):
+ conf = config.SPConfig()
+ conf.load_file("server_conf")
+ self.client = client.Saml2Client(conf)
+
+ ava = {"givenName": ["Derek"], "sn": ["Jeter"],
+ "mail": ["derek@nyy.mlb.com"], "title": "The man"}
+
+ npolicy = samlp.NameIDPolicy(format=saml.NAMEID_FORMAT_TRANSIENT,
+ allow_create="true")
+ resp_str = "%s" % self.server.create_authn_response(
+ ava, "id1", "http://local:8087/",
+ "urn:mace:example.com:saml:roland:sp", npolicy,
+ "foba0001@example.com", authn=AUTHN)
+
+ response = samlp.response_from_string(resp_str)
+ print(response.keyswv())
+ assert _eq(response.keyswv(), ['status', 'destination', 'assertion',
+ 'in_response_to', 'issue_instant',
+ 'version', 'issuer', 'id'])
+ print(response.assertion[0].keyswv())
+ assert len(response.assertion) == 1
+ assert _eq(response.assertion[0].keyswv(), ['attribute_statement',
+ 'issue_instant', 'version',
+ 'subject', 'conditions',
+ 'id', 'issuer',
+ 'authn_statement'])
+ assertion = response.assertion[0]
+ assert len(assertion.attribute_statement) == 1
+ astate = assertion.attribute_statement[0]
+ print(astate)
+ assert len(astate.attribute) == 4
+
+ def test_signed_response(self):
+ name_id = self.server.ident.transient_nameid(
+ "urn:mace:example.com:saml:roland:sp", "id12")
+ ava = {"givenName": ["Derek"], "sn": ["Jeter"],
+ "mail": ["derek@nyy.mlb.com"], "title": "The man"}
+
+ signed_resp = self.server.create_authn_response(
+ ava,
+ "id12", # in_response_to
+ "http://lingon.catalogix.se:8087/", # consumer_url
+ "urn:mace:example.com:saml:roland:sp", # sp_entity_id
+ name_id=name_id,
+ sign_assertion=True
+ )
+
+ print(signed_resp)
+ assert signed_resp
+
+ sresponse = response_from_string(signed_resp)
+ # It's the assertions that are signed not the response per se
+ assert len(sresponse.assertion) == 1
+ assertion = sresponse.assertion[0]
+
+ # Since the reponse is created dynamically I don't know the signature
+ # value. Just that there should be one
+ assert assertion.signature.signature_value.text != ""
+
+ def test_signed_response_1(self):
+
+
+ signed_resp = self.server.create_authn_response(
+ self.ava,
+ "id12", # in_response_to
+ "http://lingon.catalogix.se:8087/", # consumer_url
+ "urn:mace:example.com:saml:roland:sp", # sp_entity_id
+ name_id=self.name_id,
+ sign_response=True,
+ sign_assertion=True,
+ )
+
+ sresponse = response_from_string(signed_resp)
+
+ valid = self.server.sec.verify_signature(signed_resp,
+ self.server.config.cert_file,
+ node_name='urn:oasis:names:tc:SAML:2.0:protocol:Response',
+ node_id=sresponse.id,
+ id_attr="")
+ assert valid
+
+ valid = self.server.sec.verify_signature(signed_resp,
+ self.server.config.cert_file,
+ node_name='urn:oasis:names:tc:SAML:2.0:assertion:Assertion',
+ node_id=sresponse.assertion[0].id,
+ id_attr="")
+ assert valid
+
+ self.verify_assertion(sresponse.assertion)
+
+ def test_signed_response_2(self):
+ signed_resp = self.server.create_authn_response(
+ self.ava,
+ "id12", # in_response_to
+ "http://lingon.catalogix.se:8087/", # consumer_url
+ "urn:mace:example.com:saml:roland:sp", # sp_entity_id
+ name_id=self.name_id,
+ sign_response=True,
+ sign_assertion=False,
+ )
+
+ sresponse = response_from_string(signed_resp)
+
+ valid = self.server.sec.verify_signature(signed_resp,
+ self.server.config.cert_file,
+ node_name='urn:oasis:names:tc:SAML:2.0:protocol:Response',
+ node_id=sresponse.id,
+ id_attr="")
+ assert valid
+
+ assert sresponse.assertion[0].signature == None
+
+ def test_signed_response_3(self):
+
+
+ signed_resp = self.server.create_authn_response(
+ self.ava,
+ "id12", # in_response_to
+ "http://lingon.catalogix.se:8087/", # consumer_url
+ "urn:mace:example.com:saml:roland:sp", # sp_entity_id
+ name_id=self.name_id,
+ sign_response=False,
+ sign_assertion=True,
+ )
+
+ sresponse = response_from_string(signed_resp)
+
+ assert sresponse.signature == None
+
+ valid = self.server.sec.verify_signature(signed_resp,
+ self.server.config.cert_file,
+ node_name='urn:oasis:names:tc:SAML:2.0:assertion:Assertion',
+ node_id=sresponse.assertion[0].id,
+ id_attr="")
+ assert valid
+
+ self.verify_assertion(sresponse.assertion)
+
+ def test_encrypted_signed_response_1(self):
+
+ cert_str, cert_key_str = generate_cert()
+
+ signed_resp = self.server.create_authn_response(
+ self.ava,
+ "id12", # in_response_to
+ "http://lingon.catalogix.se:8087/", # consumer_url
+ "urn:mace:example.com:saml:roland:sp", # sp_entity_id
+ name_id=self.name_id,
+ sign_response=True,
+ sign_assertion=True,
+ encrypt_assertion=False,
+ encrypt_assertion_self_contained=True,
+ pefim=True,
+ encrypt_cert_advice=cert_str,
+ )
+
+ sresponse = response_from_string(signed_resp)
+
+ valid = self.server.sec.verify_signature(
+ signed_resp, self.server.config.cert_file,
+ node_name='urn:oasis:names:tc:SAML:2.0:protocol:Response',
+ node_id=sresponse.id, id_attr="")
+
+ assert valid
+
+ valid = self.server.sec.verify_signature(
+ signed_resp, self.server.config.cert_file,
+ node_name='urn:oasis:names:tc:SAML:2.0:assertion:Assertion',
+ node_id=sresponse.assertion[0].id, id_attr="")
+
+ assert valid
+
+ _, key_file = make_temp(cert_key_str, decode=False)
+
+ decr_text = self.server.sec.decrypt(signed_resp, key_file)
+
+ resp = samlp.response_from_string(decr_text)
+
+ assert resp.assertion[0].advice.encrypted_assertion[0].extension_elements
+
+ assertion = extension_elements_to_elements(
+ resp.assertion[0].advice.encrypted_assertion[0].extension_elements,
+ [saml, samlp])
+
+ self.verify_assertion(assertion)
+
+
+
+ #PEFIM never signs assertions.
+ assert assertion[0].signature is None
+ #valid = self.server.sec.verify_signature(decr_text,
+ # self.server.config.cert_file,
+ # node_name='urn:oasis:names:tc:SAML:2.0:assertion:Assertion',
+ # node_id=assertion[0].id,
+ # id_attr="")
+ assert valid
+
+ def test_encrypted_signed_response_2(self):
+ cert_str, cert_key_str = generate_cert()
+
+ signed_resp = self.server.create_authn_response(
+ self.ava,
+ "id12", # in_response_to
+ "http://lingon.catalogix.se:8087/", # consumer_url
+ "urn:mace:example.com:saml:roland:sp", # sp_entity_id
+ name_id=self.name_id,
+ sign_response=True,
+ sign_assertion=False,
+ encrypt_assertion=True,
+ encrypt_assertion_self_contained=True,
+ )
+
+ sresponse = response_from_string(signed_resp)
+
+ valid = self.server.sec.verify_signature(signed_resp,
+ self.server.config.cert_file,
+ node_name='urn:oasis:names:tc:SAML:2.0:protocol:Response',
+ node_id=sresponse.id,
+ id_attr="")
+ assert valid
+
+ decr_text_old = copy.deepcopy("%s" % signed_resp)
+
+ decr_text = self.server.sec.decrypt(signed_resp, self.client.config.encryption_keypairs[0]["key_file"])
+
+ assert decr_text == decr_text_old
+
+ decr_text = self.server.sec.decrypt(signed_resp, self.client.config.encryption_keypairs[1]["key_file"])
+
+ assert decr_text != decr_text_old
+
+ resp = samlp.response_from_string(decr_text)
+
+ resp.assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp])
+
+ assert resp.assertion[0].signature == None
+
+ self.verify_assertion(resp.assertion)
+
+
+ def test_encrypted_signed_response_3(self):
+ cert_str, cert_key_str = generate_cert()
+
+ signed_resp = self.server.create_authn_response(
+ self.ava,
+ "id12", # in_response_to
+ "http://lingon.catalogix.se:8087/", # consumer_url
+ "urn:mace:example.com:saml:roland:sp", # sp_entity_id
+ name_id=self.name_id,
+ sign_response=True,
+ sign_assertion=True,
+ encrypt_assertion=True,
+ encrypt_assertion_self_contained=False,
+ encrypt_cert_assertion=cert_str,
+ )
+
+ sresponse = response_from_string(signed_resp)
+
+ valid = self.server.sec.verify_signature(signed_resp,
+ self.server.config.cert_file,
+ node_name='urn:oasis:names:tc:SAML:2.0:protocol:Response',
+ node_id=sresponse.id,
+ id_attr="")
+ assert valid
+
+ _, key_file = make_temp(cert_key_str, decode=False)
+
+ decr_text = self.server.sec.decrypt(signed_resp, key_file)
+
+ resp = samlp.response_from_string(decr_text)
+
+ resp.assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp])
+
+ valid = self.server.sec.verify_signature(decr_text,
+ self.server.config.cert_file,
+ node_name='urn:oasis:names:tc:SAML:2.0:assertion:Assertion',
+ node_id=resp.assertion[0].id,
+ id_attr="")
+
+ assert valid
+
+ self.verify_assertion(resp.assertion)
+
+ assert 'xmlns:encas' not in decr_text
+
+
+ def test_encrypted_signed_response_4(self):
+
+ cert_str, cert_key_str = generate_cert()
+
+ signed_resp = self.server.create_authn_response(
+ self.ava,
+ "id12", # in_response_to
+ "http://lingon.catalogix.se:8087/", # consumer_url
+ "urn:mace:example.com:saml:roland:sp", # sp_entity_id
+ name_id=self.name_id,
+ sign_response=True,
+ sign_assertion=True,
+ encrypt_assertion=True,
+ encrypt_assertion_self_contained=True,
+ pefim=True,
+ encrypt_cert_advice=cert_str,
+ )
+
+ sresponse = response_from_string(signed_resp)
+
+ valid = self.server.sec.verify_signature(signed_resp,
+ self.server.config.cert_file,
+ node_name='urn:oasis:names:tc:SAML:2.0:protocol:Response',
+ node_id=sresponse.id,
+ id_attr="")
+ assert valid
+
+ decr_text = self.server.sec.decrypt(signed_resp, self.client.config.encryption_keypairs[1]["key_file"])
+
+ resp = samlp.response_from_string(decr_text)
+
+ resp.assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp])
+
+ valid = self.server.sec.verify_signature(decr_text,
+ self.server.config.cert_file,
+ node_name='urn:oasis:names:tc:SAML:2.0:assertion:Assertion',
+ node_id=resp.assertion[0].id,
+ id_attr="")
+
+ assert valid
+
+ _, key_file = make_temp(cert_key_str, decode=False)
+
+ decr_text = self.server.sec.decrypt(decr_text, key_file)
+
+ resp = samlp.response_from_string(decr_text)
+
+ assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp])
+ assertion = \
+ extension_elements_to_elements(assertion[0].advice.encrypted_assertion[0].extension_elements,[saml, samlp])
+ self.verify_assertion(assertion)
+
+ #PEFIM never signs assertion in advice
+ assert assertion[0].signature is None
+ #valid = self.server.sec.verify_signature(decr_text,
+ # self.server.config.cert_file,
+ # node_name='urn:oasis:names:tc:SAML:2.0:assertion:Assertion',
+ # node_id=assertion[0].id,
+ # id_attr="")
+ assert valid
+
+ def test_encrypted_response_1(self):
+ cert_str_advice, cert_key_str_advice = generate_cert()
+
+ _resp = self.server.create_authn_response(
+ self.ava,
+ "id12", # in_response_to
+ "http://lingon.catalogix.se:8087/", # consumer_url
+ "urn:mace:example.com:saml:roland:sp", # sp_entity_id
+ name_id=self.name_id,
+ sign_response=False,
+ sign_assertion=False,
+ encrypt_assertion=False,
+ encrypt_assertion_self_contained=True,
+ pefim=True,
+ encrypt_cert_advice=cert_str_advice,
+ )
+
+ _resp = "%s" % _resp
+
+ sresponse = response_from_string(_resp)
+
+ assert sresponse.signature is None
+
+ _, key_file = make_temp(cert_key_str_advice, decode=False)
+
+ decr_text = self.server.sec.decrypt(_resp, key_file)
+
+ resp = samlp.response_from_string(decr_text)
+
+ self.verify_advice_assertion(resp, decr_text)
+
+ def test_encrypted_response_2(self):
+
+ cert_str_advice, cert_key_str_advice = generate_cert()
+
+ _resp = self.server.create_authn_response(
+ self.ava,
+ "id12", # in_response_to
+ "http://lingon.catalogix.se:8087/", # consumer_url
+ "urn:mace:example.com:saml:roland:sp", # sp_entity_id
+ name_id=self.name_id,
+ sign_response=False,
+ sign_assertion=False,
+ encrypt_assertion=True,
+ encrypt_assertion_self_contained=True,
+ pefim=True,
+ encrypt_cert_advice=cert_str_advice,
+ )
+
+ sresponse = response_from_string(_resp)
+
+ assert sresponse.signature is None
+
+ decr_text_1 = self.server.sec.decrypt(_resp, self.client.config.encryption_keypairs[1]["key_file"])
+
+ _, key_file = make_temp(cert_key_str_advice, decode=False)
+
+ decr_text_2 = self.server.sec.decrypt(decr_text_1, key_file)
+
+ resp = samlp.response_from_string(decr_text_2)
+
+ resp.assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp])
+
+ self.verify_advice_assertion(resp, decr_text_2)
+
+ def test_encrypted_response_3(self):
+ cert_str_assertion, cert_key_str_assertion = generate_cert()
+
+ _resp = self.server.create_authn_response(
+ self.ava,
+ "id12", # in_response_to
+ "http://lingon.catalogix.se:8087/", # consumer_url
+ "urn:mace:example.com:saml:roland:sp", # sp_entity_id
+ name_id=self.name_id,
+ sign_response=False,
+ sign_assertion=False,
+ encrypt_assertion=True,
+ encrypt_assertion_self_contained=True,
+ encrypted_advice_attributes=False,
+ encrypt_cert_assertion=cert_str_assertion
+ )
+
+ sresponse = response_from_string(_resp)
+
+ assert sresponse.signature is None
+
+ _, key_file = make_temp(cert_key_str_assertion, decode=False)
+
+ decr_text = self.server.sec.decrypt(_resp, key_file)
+
+ resp = samlp.response_from_string(decr_text)
+
+ assert resp.encrypted_assertion[0].extension_elements
+
+ assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp])
+
+ self.verify_encrypted_assertion(assertion, decr_text)
+
+ def test_encrypted_response_4(self):
+ _resp = self.server.create_authn_response(
+ self.ava,
+ "id12", # in_response_to
+ "http://lingon.catalogix.se:8087/", # consumer_url
+ "urn:mace:example.com:saml:roland:sp", # sp_entity_id
+ name_id=self.name_id,
+ sign_response=False,
+ sign_assertion=False,
+ encrypt_assertion=True,
+ encrypt_assertion_self_contained=True,
+ encrypted_advice_attributes=False,
+ )
+
+ sresponse = response_from_string(_resp)
+
+ assert sresponse.signature is None
+
+ decr_text = self.server.sec.decrypt(_resp, self.client.config.encryption_keypairs[1]["key_file"])
+
+ resp = samlp.response_from_string(decr_text)
+
+ assert resp.encrypted_assertion[0].extension_elements
+
+ assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp])
+
+ self.verify_encrypted_assertion(assertion, decr_text)
+
+ def test_encrypted_response_5(self):
+ _resp = self.server.create_authn_response(
+ self.ava,
+ "id12", # in_response_to
+ "http://lingon.catalogix.se:8087/", # consumer_url
+ "urn:mace:example.com:saml:roland:sp", # sp_entity_id
+ name_id=self.name_id,
+ sign_response=False,
+ sign_assertion=False,
+ encrypt_assertion=False,
+ encrypt_assertion_self_contained=True,
+ pefim=True
+ )
+
+ _resp = "%s" % _resp
+
+ sresponse = response_from_string(_resp)
+
+ assert sresponse.signature is None
+
+ decr_text = self.server.sec.decrypt(_resp, self.client.config.encryption_keypairs[1]["key_file"])
+
+ resp = samlp.response_from_string(decr_text)
+
+ self.verify_advice_assertion(resp, decr_text)
+
+ def test_encrypted_response_6(self):
+ _server = Server("idp_conf_verify_cert")
+
+ cert_str_advice, cert_key_str_advice = generate_cert()
+
+ cert_str_assertion, cert_key_str_assertion = generate_cert()
+
+ _resp = _server.create_authn_response(
+ self.ava,
+ "id12", # in_response_to
+ "http://lingon.catalogix.se:8087/", # consumer_url
+ "urn:mace:example.com:saml:roland:sp", # sp_entity_id
+ name_id=self.name_id,
+ sign_response=False,
+ sign_assertion=False,
+ encrypt_assertion=True,
+ encrypt_assertion_self_contained=True,
+ pefim=True,
+ encrypt_cert_advice=cert_str_advice,
+ encrypt_cert_assertion=cert_str_assertion
+ )
+
+ sresponse = response_from_string(_resp)
+
+ assert sresponse.signature is None
+
+ _, key_file = make_temp(cert_key_str_assertion, decode=False)
+
+ decr_text_1 = _server.sec.decrypt(_resp, key_file)
+
+ _, key_file = make_temp(cert_key_str_advice, decode=False)
+
+ decr_text_2 = _server.sec.decrypt(decr_text_1, key_file)
+
+ resp = samlp.response_from_string(decr_text_2)
+
+ resp.assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp])
+
+ self.verify_advice_assertion(resp, decr_text_2)
+
+ def test_encrypted_response_7(self):
+ _resp = self.server.create_authn_response(
+ self.ava,
+ "id12", # in_response_to
+ "http://lingon.catalogix.se:8087/", # consumer_url
+ "urn:mace:example.com:saml:roland:sp", # sp_entity_id
+ name_id=self.name_id,
+ sign_response=False,
+ sign_assertion=False,
+ encrypt_assertion=True,
+ encrypt_assertion_self_contained=True,
+ pefim=True
+ )
+
+ sresponse = response_from_string(_resp)
+
+ assert sresponse.signature is None
+
+ decr_text_1 = self.server.sec.decrypt(_resp, self.client.config.encryption_keypairs[1]["key_file"])
+
+ decr_text_2 = self.server.sec.decrypt(decr_text_1, self.client.config.encryption_keypairs[1]["key_file"])
+
+ resp = samlp.response_from_string(decr_text_2)
+
+ resp.assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp])
+
+ self.verify_advice_assertion(resp, decr_text_2)
+
+ def test_encrypted_response_8(self):
+ try:
+ _resp = self.server.create_authn_response(
+ self.ava,
+ "id12", # in_response_to
+ "http://lingon.catalogix.se:8087/", # consumer_url
+ "urn:mace:example.com:saml:roland:sp", # sp_entity_id
+ name_id=self.name_id,
+ sign_response=False,
+ sign_assertion=False,
+ encrypt_assertion=True,
+ encrypt_assertion_self_contained=True,
+ pefim=True,
+ encrypt_cert_advice="whatever",
+ encrypt_cert_assertion="whatever"
+ )
+ assert False, "Must throw an exception"
+ except EncryptError as ex:
+ pass
+ except Exception as ex:
+ assert False, "Wrong exception!"
+
+ try:
+ _resp = self.server.create_authn_response(
+ self.ava,
+ "id12", # in_response_to
+ "http://lingon.catalogix.se:8087/", # consumer_url
+ "urn:mace:example.com:saml:roland:sp", # sp_entity_id
+ name_id=self.name_id,
+ sign_response=False,
+ sign_assertion=False,
+ encrypt_assertion=False,
+ encrypt_assertion_self_contained=True,
+ pefim=True,
+ encrypt_cert_advice="whatever",
+ )
+ assert False, "Must throw an exception"
+ except EncryptError as ex:
+ pass
+ except Exception as ex:
+ assert False, "Wrong exception!"
+
+ try:
+ _resp = self.server.create_authn_response(
+ self.ava,
+ "id12", # in_response_to
+ "http://lingon.catalogix.se:8087/", # consumer_url
+ "urn:mace:example.com:saml:roland:sp", # sp_entity_id
+ name_id=self.name_id,
+ sign_response=False,
+ sign_assertion=False,
+ encrypt_assertion=True,
+ encrypt_assertion_self_contained=True,
+ encrypted_advice_attributes=False,
+ encrypt_cert_assertion="whatever"
+ )
+ assert False, "Must throw an exception"
+ except EncryptError as ex:
+ pass
+ except Exception as ex:
+ assert False, "Wrong exception!"
+
+ _server = Server("idp_conf_verify_cert")
+
+ try:
+ _resp = _server.create_authn_response(
+ self.ava,
+ "id12", # in_response_to
+ "http://lingon.catalogix.se:8087/", # consumer_url
+ "urn:mace:example.com:saml:roland:sp", # sp_entity_id
+ name_id=self.name_id,
+ sign_response=False,
+ sign_assertion=False,
+ encrypt_assertion=True,
+ encrypt_assertion_self_contained=True,
+ pefim=True,
+ encrypt_cert_advice="whatever",
+ encrypt_cert_assertion="whatever"
+ )
+ assert False, "Must throw an exception"
+ except CertificateError as ex:
+ pass
+ except Exception as ex:
+ assert False, "Wrong exception!"
+
+ try:
+ _resp = _server.create_authn_response(
+ self.ava,
+ "id12", # in_response_to
+ "http://lingon.catalogix.se:8087/", # consumer_url
+ "urn:mace:example.com:saml:roland:sp", # sp_entity_id
+ name_id=self.name_id,
+ sign_response=False,
+ sign_assertion=False,
+ encrypt_assertion=False,
+ encrypt_assertion_self_contained=True,
+ pefim=True,
+ encrypt_cert_advice="whatever",
+ )
+ assert False, "Must throw an exception"
+ except CertificateError as ex:
+ pass
+ except Exception as ex:
+ assert False, "Wrong exception!"
+
+ try:
+ _resp = _server.create_authn_response(
+ self.ava,
+ "id12", # in_response_to
+ "http://lingon.catalogix.se:8087/", # consumer_url
+ "urn:mace:example.com:saml:roland:sp", # sp_entity_id
+ name_id=self.name_id,
+ sign_response=False,
+ sign_assertion=False,
+ encrypt_assertion=True,
+ encrypt_assertion_self_contained=True,
+ encrypted_advice_attributes=False,
+ encrypt_cert_assertion="whatever"
+ )
+ assert False, "Must throw an exception"
+ except CertificateError as ex:
+ pass
+ except Exception as ex:
+ assert False, "Wrong exception!"
+
+ def test_encrypted_response_9(self):
+ _server = Server("idp_conf_sp_no_encrypt")
+
+ _resp = _server.create_authn_response(
+ self.ava,
+ "id12", # in_response_to
+ "http://lingon.catalogix.se:8087/", # consumer_url
+ "urn:mace:example.com:saml:roland:sp", # sp_entity_id
+ name_id=self.name_id,
+ sign_response=False,
+ sign_assertion=False,
+ encrypt_assertion=True,
+ encrypt_assertion_self_contained=True,
+ pefim=True,
+ )
+
+ self.verify_assertion(_resp.assertion.advice.assertion)
+
+ _resp = _server.create_authn_response(
+ self.ava,
+ "id12", # in_response_to
+ "http://lingon.catalogix.se:8087/", # consumer_url
+ "urn:mace:example.com:saml:roland:sp", # sp_entity_id
+ name_id=self.name_id,
+ sign_response=False,
+ sign_assertion=False,
+ encrypt_assertion=False,
+ encrypt_assertion_self_contained=True,
+ pefim=True
+ )
+
+ self.verify_assertion(_resp.assertion.advice.assertion)
+
+ _resp = _server.create_authn_response(
+ self.ava,
+ "id12", # in_response_to
+ "http://lingon.catalogix.se:8087/", # consumer_url
+ "urn:mace:example.com:saml:roland:sp", # sp_entity_id
+ name_id=self.name_id,
+ sign_response=False,
+ sign_assertion=False,
+ encrypt_assertion=True,
+ encrypt_assertion_self_contained=True,
+ encrypted_advice_attributes=False,
+ )
+
+ self.verify_assertion([_resp.assertion])
+
+
+ def test_slo_http_post(self):
+ soon = time_util.in_a_while(days=1)
+ sinfo = {
+ "name_id": nid,
+ "issuer": "urn:mace:example.com:saml:roland:idp",
+ "not_on_or_after": soon,
+ "user": {
+ "givenName": "Leo",
+ "sn": "Laport",
+ }
+ }
+ self.client.users.add_information_about_person(sinfo)
+
+ req_id, logout_request = self.client.create_logout_request(
+ destination="http://localhost:8088/slop", name_id=nid,
+ issuer_entity_id="urn:mace:example.com:saml:roland:idp",
+ reason="I'm tired of this")
+
+ intermed = base64.b64encode(str(logout_request).encode('utf-8'))
+
+ #saml_soap = make_soap_enveloped_saml_thingy(logout_request)
+ request = self.server.parse_logout_request(intermed, BINDING_HTTP_POST)
+ assert request
+
+ def test_slo_soap(self):
+ soon = time_util.in_a_while(days=1)
+ sinfo = {
+ "name_id": nid,
+ "issuer": "urn:mace:example.com:saml:roland:idp",
+ "not_on_or_after": soon,
+ "user": {
+ "givenName": "Leo",
+ "sn": "Laport",
+ }
+ }
+
+ sp = client.Saml2Client(config_file="server_conf")
+ sp.users.add_information_about_person(sinfo)
+
+ req_id, logout_request = sp.create_logout_request(
+ name_id=nid, destination="http://localhost:8088/slo",
+ issuer_entity_id="urn:mace:example.com:saml:roland:idp",
+ reason="I'm tired of this")
+
+ #_ = s_utils.deflate_and_base64_encode("%s" % (logout_request,))
+
+ saml_soap = make_soap_enveloped_saml_thingy(logout_request)
+ self.server.ident.close()
+
+ with closing(Server("idp_soap_conf")) as idp:
+ request = idp.parse_logout_request(saml_soap)
+ idp.ident.close()
+ assert request
+
+# ------------------------------------------------------------------------
+
IDENTITY = {"eduPersonAffiliation": ["staff", "member"],
"sn": ["Jeter"], "givenName": ["Derek"],
diff --git a/tests/test_51_client.py b/tests/test_51_client.py
index 8571a36a..78adeac5 100644
--- a/tests/test_51_client.py
+++ b/tests/test_51_client.py
@@ -78,6 +78,11 @@ def generate_cert():
def add_subelement(xmldoc, node_name, subelem):
+ if six.PY2:
+ _str = unicode
+ else:
+ _str = str
+
s = xmldoc.find(node_name)
if s > 0:
x = xmldoc.rindex("<", 0, s)
@@ -88,7 +93,7 @@ def add_subelement(xmldoc, node_name, subelem):
spaces += " "
c += 1
# Sometimes we get an xml header, sometimes we don't.
- subelem_str = str(subelem)
+ subelem_str = _str(subelem)
if subelem_str[0:5].lower() == '<?xml':
subelem_str = subelem_str.split("\n", 1)[1]
xmldoc = xmldoc.replace(
@@ -1483,6 +1488,1351 @@ class TestClient:
assert b'<ns0:SessionIndex>_foo</ns0:SessionIndex>' in res.xmlstr
+class TestClientNonAsciiAva:
+ def setup_class(self):
+ self.server = Server("idp_conf")
+
+ conf = config.SPConfig()
+ conf.load_file("server_conf")
+ self.client = Saml2Client(conf)
+
+ def teardown_class(self):
+ self.server.close()
+
+ def test_create_attribute_query1(self):
+ req_id, req = self.client.create_attribute_query(
+ "https://idp.example.com/idp/",
+ "E8042FB4-4D5B-48C3-8E14-8EDD852790DD",
+ format=saml.NAMEID_FORMAT_PERSISTENT,
+ message_id="id1")
+ reqstr = "%s" % req.to_string().decode()
+
+ assert req.destination == "https://idp.example.com/idp/"
+ assert req.id == "id1"
+ assert req.version == "2.0"
+ subject = req.subject
+ name_id = subject.name_id
+ assert name_id.format == saml.NAMEID_FORMAT_PERSISTENT
+ assert name_id.text == "E8042FB4-4D5B-48C3-8E14-8EDD852790DD"
+ issuer = req.issuer
+ assert issuer.text == "urn:mace:example.com:saml:roland:sp"
+
+ attrq = samlp.attribute_query_from_string(reqstr)
+
+ assert _leq(attrq.keyswv(), ['destination', 'subject', 'issue_instant',
+ 'version', 'id', 'issuer'])
+
+ assert attrq.destination == req.destination
+ assert attrq.id == req.id
+ assert attrq.version == req.version
+ assert attrq.issuer.text == issuer.text
+ assert attrq.issue_instant == req.issue_instant
+ assert attrq.subject.name_id.format == name_id.format
+ assert attrq.subject.name_id.text == name_id.text
+
+ def test_create_attribute_query2(self):
+ req_id, req = self.client.create_attribute_query(
+ "https://idp.example.com/idp/",
+ "E8042FB4-4D5B-48C3-8E14-8EDD852790DD",
+ attribute={
+ ("urn:oid:2.5.4.42",
+ "urn:oasis:names:tc:SAML:2.0:attrname-format:uri",
+ "givenName"): None,
+ ("urn:oid:2.5.4.4",
+ "urn:oasis:names:tc:SAML:2.0:attrname-format:uri",
+ "surname"): None,
+ ("urn:oid:1.2.840.113549.1.9.1",
+ "urn:oasis:names:tc:SAML:2.0:attrname-format:uri"): None,
+ },
+ format=saml.NAMEID_FORMAT_PERSISTENT,
+ message_id="id1")
+
+ assert req.destination == "https://idp.example.com/idp/"
+ assert req.id == "id1"
+ assert req.version == "2.0"
+ subject = req.subject
+ name_id = subject.name_id
+ assert name_id.format == saml.NAMEID_FORMAT_PERSISTENT
+ assert name_id.text == "E8042FB4-4D5B-48C3-8E14-8EDD852790DD"
+ assert len(req.attribute) == 3
+ # one is givenName
+ seen = []
+ for attribute in req.attribute:
+ if attribute.name == "urn:oid:2.5.4.42":
+ assert attribute.name_format == saml.NAME_FORMAT_URI
+ assert attribute.friendly_name == "givenName"
+ seen.append("givenName")
+ elif attribute.name == "urn:oid:2.5.4.4":
+ assert attribute.name_format == saml.NAME_FORMAT_URI
+ assert attribute.friendly_name == "surname"
+ seen.append("surname")
+ elif attribute.name == "urn:oid:1.2.840.113549.1.9.1":
+ assert attribute.name_format == saml.NAME_FORMAT_URI
+ if getattr(attribute, "friendly_name"):
+ assert False
+ seen.append("email")
+ assert _leq(seen, ["givenName", "surname", "email"])
+
+ def test_create_attribute_query_3(self):
+ req_id, req = self.client.create_attribute_query(
+ "https://aai-demo-idp.switch.ch/idp/shibboleth",
+ "_e7b68a04488f715cda642fbdd90099f5",
+ format=saml.NAMEID_FORMAT_TRANSIENT,
+ message_id="id1")
+
+ assert isinstance(req, samlp.AttributeQuery)
+ assert req.destination == "https://aai-demo-idp.switch" \
+ ".ch/idp/shibboleth"
+ assert req.id == "id1"
+ assert req.version == "2.0"
+ assert req.issue_instant
+ assert req.issuer.text == "urn:mace:example.com:saml:roland:sp"
+ nameid = req.subject.name_id
+ assert nameid.format == saml.NAMEID_FORMAT_TRANSIENT
+ assert nameid.text == "_e7b68a04488f715cda642fbdd90099f5"
+
+ def test_create_auth_request_0(self):
+ ar_str = "%s" % self.client.create_authn_request(
+ "http://www.example.com/sso", message_id="id1")[1]
+
+ ar = samlp.authn_request_from_string(ar_str)
+ assert ar.assertion_consumer_service_url == ("http://lingon.catalogix"
+ ".se:8087/")
+ assert ar.destination == "http://www.example.com/sso"
+ assert ar.protocol_binding == BINDING_HTTP_POST
+ assert ar.version == "2.0"
+ assert ar.provider_name == "urn:mace:example.com:saml:roland:sp"
+ assert ar.issuer.text == "urn:mace:example.com:saml:roland:sp"
+ nid_policy = ar.name_id_policy
+ assert nid_policy.allow_create == "false"
+ assert nid_policy.format == saml.NAMEID_FORMAT_TRANSIENT
+
+ node_requested_attributes = None
+ for e in ar.extensions.extension_elements:
+ if e.tag == RequestedAttributes.c_tag:
+ node_requested_attributes = e
+ break
+ assert node_requested_attributes is not None
+
+ for c in node_requested_attributes.children:
+ assert c.tag == RequestedAttribute.c_tag
+ assert c.attributes['isRequired'] in ['true', 'false']
+ assert c.attributes['Name']
+ assert c.attributes['FriendlyName']
+ assert c.attributes['NameFormat']
+
+ def test_create_auth_request_unset_force_authn(self):
+ req_id, req = self.client.create_authn_request(
+ "http://www.example.com/sso", sign=False, message_id="id1")
+ assert bool(req.force_authn) == False
+
+ def test_create_auth_request_set_force_authn(self):
+ req_id, req = self.client.create_authn_request(
+ "http://www.example.com/sso", sign=False, message_id="id1",
+ force_authn="true")
+ assert bool(req.force_authn) == True
+
+ def test_create_auth_request_nameid_policy_allow_create(self):
+ conf = config.SPConfig()
+ conf.load_file("sp_conf_nameidpolicy")
+ client = Saml2Client(conf)
+ ar_str = "%s" % client.create_authn_request(
+ "http://www.example.com/sso", message_id="id1")[1]
+
+ ar = samlp.authn_request_from_string(ar_str)
+ assert ar.assertion_consumer_service_url == ("http://lingon.catalogix"
+ ".se:8087/")
+ assert ar.destination == "http://www.example.com/sso"
+ assert ar.protocol_binding == BINDING_HTTP_POST
+ assert ar.version == "2.0"
+ assert ar.provider_name == "urn:mace:example.com:saml:roland:sp"
+ assert ar.issuer.text == "urn:mace:example.com:saml:roland:sp"
+ nid_policy = ar.name_id_policy
+ assert nid_policy.allow_create == "true"
+ assert nid_policy.format == saml.NAMEID_FORMAT_PERSISTENT
+
+ def test_create_auth_request_vo(self):
+ assert list(self.client.config.vorg.keys()) == [
+ "urn:mace:example.com:it:tek"]
+
+ ar_str = "%s" % self.client.create_authn_request(
+ "http://www.example.com/sso",
+ "urn:mace:example.com:it:tek", # vo
+ nameid_format=NAMEID_FORMAT_PERSISTENT,
+ message_id="666")[1]
+
+ ar = samlp.authn_request_from_string(ar_str)
+ assert ar.id == "666"
+ assert ar.assertion_consumer_service_url == "http://lingon.catalogix" \
+ ".se:8087/"
+ assert ar.destination == "http://www.example.com/sso"
+ assert ar.protocol_binding == BINDING_HTTP_POST
+ assert ar.version == "2.0"
+ assert ar.provider_name == "urn:mace:example.com:saml:roland:sp"
+ assert ar.issuer.text == "urn:mace:example.com:saml:roland:sp"
+ nid_policy = ar.name_id_policy
+ assert nid_policy.allow_create == "false"
+ assert nid_policy.format == saml.NAMEID_FORMAT_PERSISTENT
+ assert nid_policy.sp_name_qualifier == "urn:mace:example.com:it:tek"
+
+ def test_sign_auth_request_0(self):
+ req_id, areq = self.client.create_authn_request(
+ "http://www.example.com/sso", sign=True, message_id="id1")
+
+ ar_str = "%s" % areq
+ ar = samlp.authn_request_from_string(ar_str)
+
+ assert ar
+ assert ar.signature
+ assert ar.signature.signature_value
+ signed_info = ar.signature.signed_info
+ assert len(signed_info.reference) == 1
+ assert signed_info.reference[0].uri == "#id1"
+ assert signed_info.reference[0].digest_value
+ try:
+ assert self.client.sec.correctly_signed_authn_request(
+ ar_str, self.client.config.xmlsec_binary,
+ self.client.config.metadata)
+ except Exception: # missing certificate
+ self.client.sec.verify_signature(ar_str, node_name=class_name(ar))
+
+ def test_create_logout_request(self):
+ req_id, req = self.client.create_logout_request(
+ "http://localhost:8088/slo", "urn:mace:example.com:saml:roland:idp",
+ name_id=nid, reason="Tired", expire=in_a_while(minutes=15),
+ session_indexes=["_foo"])
+
+ assert req.destination == "http://localhost:8088/slo"
+ assert req.reason == "Tired"
+ assert req.version == "2.0"
+ assert req.name_id == nid
+ assert req.issuer.text == "urn:mace:example.com:saml:roland:sp"
+ assert req.session_index == [SessionIndex("_foo")]
+
+ def test_response_1(self):
+ IDP = "urn:mace:example.com:saml:roland:idp"
+
+ ava = {"givenName": ["Dave"], "sn": ["Concepción"],
+ "mail": ["Dave@cnr.mlb.com"], "title": ["#13"]}
+
+ nameid_policy = samlp.NameIDPolicy(allow_create="false",
+ format=saml.NAMEID_FORMAT_PERSISTENT)
+
+ resp = self.server.create_authn_response(
+ identity=ava,
+ in_response_to="id1",
+ destination="http://lingon.catalogix.se:8087/",
+ sp_entity_id="urn:mace:example.com:saml:roland:sp",
+ name_id_policy=nameid_policy,
+ sign_response=True,
+ userid="foba0001@example.com",
+ authn=AUTHN)
+
+ resp_str = "%s" % resp
+
+ resp_str = encode_fn(resp_str.encode('utf-8'))
+
+ authn_response = self.client.parse_authn_request_response(
+ resp_str, BINDING_HTTP_POST,
+ {"id1": "http://foo.example.com/service"})
+
+ assert authn_response is not None
+ assert authn_response.issuer() == IDP
+ assert authn_response.response.assertion[0].issuer.text == IDP
+ session_info = authn_response.session_info()
+
+ assert session_info["ava"] == {"givenName": ["Dave"], "sn": [u"Concepción"],
+ "mail": ["Dave@cnr.mlb.com"], "title": ["#13"]}
+ assert session_info["issuer"] == IDP
+ assert session_info["came_from"] == "http://foo.example.com/service"
+ response = samlp.response_from_string(authn_response.xmlstr)
+ assert response.destination == "http://lingon.catalogix.se:8087/"
+ assert "session_index" in session_info
+
+ # One person in the cache
+ assert len(self.client.users.subjects()) == 1
+ subject_id = self.client.users.subjects()[0]
+ # The information I have about the subject comes from one source
+ assert self.client.users.issuers_of_info(subject_id) == [IDP]
+
+ # --- authenticate another person
+
+ ava = {"givenName": ["Alfonson"], "sn": ["Soriano"],
+ "mail": ["alfonson@chc.mlb.com"], "title": ["outfielder"]}
+
+ resp_str = "%s" % self.server.create_authn_response(
+ identity=ava,
+ in_response_to="id2",
+ destination="http://lingon.catalogix.se:8087/",
+ sp_entity_id="urn:mace:example.com:saml:roland:sp",
+ sign_response=True,
+ name_id_policy=nameid_policy,
+ userid="also0001@example.com",
+ authn=AUTHN)
+
+ resp_str = encode_fn(resp_str.encode())
+
+ self.client.parse_authn_request_response(
+ resp_str, BINDING_HTTP_POST,
+ {"id2": "http://foo.example.com/service"})
+
+ # Two persons in the cache
+ assert len(self.client.users.subjects()) == 2
+ issuers = [self.client.users.issuers_of_info(s) for s in
+ self.client.users.subjects()]
+ # The information I have about the subjects comes from the same source
+ assert issuers == [[IDP], [IDP]]
+
+ def test_response_2(self):
+ conf = config.SPConfig()
+ conf.load_file("server_conf")
+ _client = Saml2Client(conf)
+
+ idp, ava, ava_verify, nameid_policy = self.setup_verify_authn_response()
+
+ cert_str, cert_key_str = generate_cert()
+
+ cert = \
+ {
+ "cert": cert_str,
+ "key": cert_key_str
+ }
+
+ self.name_id = self.server.ident.transient_nameid(
+ "urn:mace:example.com:saml:roland:sp", "id1")
+
+ resp = self.server.create_authn_response(
+ identity=ava,
+ in_response_to="id1",
+ destination="http://lingon.catalogix.se:8087/",
+ sp_entity_id="urn:mace:example.com:saml:roland:sp",
+ name_id=self.name_id,
+ userid="foba0001@example.com",
+ authn=AUTHN,
+ sign_response=True,
+ sign_assertion=True,
+ encrypt_assertion=False,
+ encrypt_assertion_self_contained=True,
+ pefim=True,
+ encrypt_cert_advice=cert_str
+ )
+
+ resp_str = "%s" % resp
+
+ resp_str = encode_fn(resp_str.encode())
+
+ authn_response = _client.parse_authn_request_response(
+ resp_str, BINDING_HTTP_POST,
+ {"id1": "http://foo.example.com/service"}, {"id1": cert})
+
+ self.verify_authn_response(idp, authn_response, _client, ava_verify)
+
+ def test_response_3(self):
+ conf = config.SPConfig()
+ conf.load_file("server_conf")
+ _client = Saml2Client(conf)
+
+ idp, ava, ava_verify, nameid_policy = self.setup_verify_authn_response()
+
+ self.name_id = self.server.ident.transient_nameid(
+ "urn:mace:example.com:saml:roland:sp", "id1")
+
+ resp = self.server.create_authn_response(
+ identity=ava,
+ in_response_to="id1",
+ destination="http://lingon.catalogix.se:8087/",
+ sp_entity_id="urn:mace:example.com:saml:roland:sp",
+ name_id=self.name_id,
+ userid="foba0001@example.com",
+ authn=AUTHN,
+ sign_response=True,
+ sign_assertion=True,
+ encrypt_assertion=False,
+ encrypt_assertion_self_contained=True,
+ pefim=True,
+ )
+
+ resp_str = "%s" % resp
+
+ resp_str = encode_fn(resp_str.encode())
+
+ authn_response = _client.parse_authn_request_response(
+ resp_str, BINDING_HTTP_POST,
+ {"id1": "http://foo.example.com/service"})
+
+ self.verify_authn_response(idp, authn_response, _client, ava_verify)
+
+ def test_response_4(self):
+ conf = config.SPConfig()
+ conf.load_file("server_conf")
+ _client = Saml2Client(conf)
+
+ idp, ava, ava_verify, nameid_policy = self.setup_verify_authn_response()
+
+ self.name_id = self.server.ident.transient_nameid(
+ "urn:mace:example.com:saml:roland:sp", "id1")
+
+ resp = self.server.create_authn_response(
+ identity=ava,
+ in_response_to="id1",
+ destination="http://lingon.catalogix.se:8087/",
+ sp_entity_id="urn:mace:example.com:saml:roland:sp",
+ name_id=self.name_id,
+ userid="foba0001@example.com",
+ authn=AUTHN,
+ sign_response=True,
+ sign_assertion=True,
+ encrypt_assertion=True,
+ encrypt_assertion_self_contained=True,
+ pefim=True,
+ )
+
+ resp_str = "%s" % resp
+
+ resp_str = encode_fn(resp_str.encode())
+
+ authn_response = _client.parse_authn_request_response(
+ resp_str, BINDING_HTTP_POST,
+ {"id1": "http://foo.example.com/service"})
+
+ self.verify_authn_response(idp, authn_response, _client, ava_verify)
+
+ def test_response_5(self):
+ conf = config.SPConfig()
+ conf.load_file("server_conf")
+ _client = Saml2Client(conf)
+
+ idp, ava, ava_verify, nameid_policy = self.setup_verify_authn_response()
+
+ self.name_id = self.server.ident.transient_nameid(
+ "urn:mace:example.com:saml:roland:sp", "id1")
+
+ cert_str, cert_key_str = generate_cert()
+
+ cert = \
+ {
+ "cert": cert_str,
+ "key": cert_key_str
+ }
+
+ resp = self.server.create_authn_response(
+ identity=ava,
+ in_response_to="id1",
+ destination="http://lingon.catalogix.se:8087/",
+ sp_entity_id="urn:mace:example.com:saml:roland:sp",
+ name_id=self.name_id,
+ userid="foba0001@example.com",
+ authn=AUTHN,
+ sign_response=True,
+ sign_assertion=True,
+ encrypt_assertion=True,
+ encrypt_assertion_self_contained=True,
+ pefim=True,
+ encrypt_cert_assertion=cert_str
+ )
+
+ resp_str = "%s" % resp
+
+ resp_str = encode_fn(resp_str.encode())
+
+ authn_response = _client.parse_authn_request_response(
+ resp_str, BINDING_HTTP_POST,
+ {"id1": "http://foo.example.com/service"}, {"id1": cert})
+
+ self.verify_authn_response(idp, authn_response, _client, ava_verify)
+
+ def test_response_6(self):
+ conf = config.SPConfig()
+ conf.load_file("server_conf")
+ _client = Saml2Client(conf)
+
+ idp, ava, ava_verify, nameid_policy = self.setup_verify_authn_response()
+
+ self.name_id = self.server.ident.transient_nameid(
+ "urn:mace:example.com:saml:roland:sp", "id1")
+
+ cert_assertion_str, cert_key_assertion_str = generate_cert()
+
+ cert_assertion = \
+ {
+ "cert": cert_assertion_str,
+ "key": cert_key_assertion_str
+ }
+
+ cert_advice_str, cert_key_advice_str = generate_cert()
+
+ cert_advice = \
+ {
+ "cert": cert_advice_str,
+ "key": cert_key_advice_str
+ }
+
+ resp = self.server.create_authn_response(
+ identity=ava,
+ in_response_to="id1",
+ destination="http://lingon.catalogix.se:8087/",
+ sp_entity_id="urn:mace:example.com:saml:roland:sp",
+ name_id=self.name_id,
+ userid="foba0001@example.com",
+ authn=AUTHN,
+ sign_response=True,
+ sign_assertion=True,
+ encrypt_assertion=True,
+ encrypt_assertion_self_contained=True,
+ pefim=True,
+ encrypt_cert_assertion=cert_assertion_str,
+ encrypt_cert_advice=cert_advice_str
+ )
+
+ resp_str = "%s" % resp
+
+ resp_str = encode_fn(resp_str.encode())
+
+ authn_response = _client.parse_authn_request_response(
+ resp_str, BINDING_HTTP_POST,
+ {"id1": "http://foo.example.com/service"},
+ {"id1": [cert_assertion, cert_advice]})
+
+ self.verify_authn_response(idp, authn_response, _client, ava_verify)
+
+ def test_response_7(self):
+ conf = config.SPConfig()
+ conf.load_file("server_conf")
+ _client = Saml2Client(conf)
+
+ idp, ava, ava_verify, nameid_policy = self.setup_verify_authn_response()
+
+ self.name_id = self.server.ident.transient_nameid(
+ "urn:mace:example.com:saml:roland:sp", "id1")
+
+ resp = self.server.create_authn_response(
+ identity=ava,
+ in_response_to="id1",
+ destination="http://lingon.catalogix.se:8087/",
+ sp_entity_id="urn:mace:example.com:saml:roland:sp",
+ name_id=self.name_id,
+ userid="foba0001@example.com",
+ authn=AUTHN,
+ sign_response=True,
+ sign_assertion=True,
+ encrypt_assertion=True,
+ encrypt_assertion_self_contained=True,
+ encrypted_advice_attributes=True,
+ )
+
+ resp_str = "%s" % resp
+
+ resp_str = encode_fn(resp_str.encode())
+
+ authn_response = _client.parse_authn_request_response(
+ resp_str, BINDING_HTTP_POST,
+ {"id1": "http://foo.example.com/service"})
+
+ self.verify_authn_response(idp, authn_response, _client, ava_verify)
+
+ def test_response_8(self):
+ conf = config.SPConfig()
+ conf.load_file("server_conf")
+ _client = Saml2Client(conf)
+
+ idp, ava, ava_verify, nameid_policy = self.setup_verify_authn_response()
+
+ self.name_id = self.server.ident.transient_nameid(
+ "urn:mace:example.com:saml:roland:sp", "id1")
+
+ cert_str, cert_key_str = generate_cert()
+
+ cert = \
+ {
+ "cert": cert_str,
+ "key": cert_key_str
+ }
+
+ resp = self.server.create_authn_response(
+ identity=ava,
+ in_response_to="id1",
+ destination="http://lingon.catalogix.se:8087/",
+ sp_entity_id="urn:mace:example.com:saml:roland:sp",
+ name_id=self.name_id,
+ userid="foba0001@example.com",
+ authn=AUTHN,
+ sign_response=True,
+ sign_assertion=True,
+ encrypt_assertion=True,
+ encrypt_assertion_self_contained=True,
+ encrypt_cert_assertion=cert_str
+ )
+
+ resp_str = "%s" % resp
+
+ resp_str = encode_fn(resp_str.encode())
+
+ authn_response = _client.parse_authn_request_response(
+ resp_str, BINDING_HTTP_POST,
+ {"id1": "http://foo.example.com/service"}, {"id1": cert})
+
+ self.verify_authn_response(idp, authn_response, _client, ava_verify)
+
+ def test_response_no_name_id(self):
+ """ Test that the SP client can parse an authentication response
+ from an IdP that does not contain a <NameID> element."""
+
+ if six.PY2:
+ _bytes = str
+ else:
+ _bytes = bytes
+
+ conf = config.SPConfig()
+ conf.load_file("server_conf")
+ client = Saml2Client(conf)
+
+ # Use the same approach as the other tests for mocking up
+ # an authentication response to parse.
+ idp, ava, ava_verify, nameid_policy = (
+ self.setup_verify_authn_response()
+ )
+
+ # Mock up an authentication response but do not encrypt it
+ # nor sign it since below we will modify it directly. Note that
+ # setting name_id to None still results in a response that includes
+ # a <NameID> element.
+ resp = self.server.create_authn_response(
+ identity=ava,
+ in_response_to="id1",
+ destination="http://lingon.catalogix.se:8087/",
+ sp_entity_id="urn:mace:example.com:saml:roland:sp",
+ name_id=None,
+ userid="foba0001@example.com",
+ authn=AUTHN,
+ sign_response=False,
+ sign_assertion=False,
+ encrypt_assertion=False,
+ encrypt_assertion_self_contained=False
+ )
+
+ # The create_authn_response method above will return an instance
+ # of saml2.samlp.Response when neither encrypting nor signing and
+ # so we can remove the <NameID> element directly.
+ resp.assertion.subject.name_id = None
+
+ # Assert that the response does not contain a NameID element so that
+ # the parsing below is a fair test.
+ assert str(resp).find("NameID") == -1
+
+ # Cast the response to a string and encode it to mock up the payload
+ # the SP client is expected to receive via HTTP POST binding.
+ if six.PY2:
+ resp_str = encode_fn(str(resp))
+ else:
+ resp_str = encode_fn(bytes(str(resp), 'utf-8'))
+
+
+ # We do not need the client to verify a signature for this test.
+ client.want_assertions_signed = False
+ client.want_response_signed = False
+
+ # Parse the authentication response that does not include a <NameID>.
+ authn_response = client.parse_authn_request_response(
+ resp_str, BINDING_HTTP_POST,
+ {"id1": "http://foo.example.com/service"})
+
+ # A successful test is parsing the response.
+ assert authn_response is not None
+
+ def setup_verify_authn_response(self):
+ idp = "urn:mace:example.com:saml:roland:idp"
+ ava = {"givenName": ["Dave"], "sn": ["Concepción"],
+ "mail": ["Dave@cnr.mlb.com"], "title": ["#13"]}
+ ava_verify = {"givenName": ["Dave"], "sn": [u"Concepción"],
+ "mail": ["Dave@cnr.mlb.com"], "title": ["#13"]}
+ nameid_policy = samlp.NameIDPolicy(allow_create="false",
+ format=saml.NAMEID_FORMAT_PERSISTENT)
+ return idp, ava, ava_verify, nameid_policy
+
+ def verify_authn_response(self, idp, authn_response, _client, ava_verify):
+ assert authn_response is not None
+ assert authn_response.issuer() == idp
+ assert authn_response.assertion.issuer.text == idp
+ session_info = authn_response.session_info()
+
+ assert session_info["ava"] == ava_verify
+ assert session_info["issuer"] == idp
+ assert session_info["came_from"] == "http://foo.example.com/service"
+ response = samlp.response_from_string(authn_response.xmlstr)
+ assert response.destination == "http://lingon.catalogix.se:8087/"
+
+ # One person in the cache
+ assert len(_client.users.subjects()) == 1
+ subject_id = _client.users.subjects()[0]
+ # The information I have about the subject comes from one source
+ assert _client.users.issuers_of_info(subject_id) == [idp]
+
+ def test_init_values(self):
+ entityid = self.client.config.entityid
+ assert entityid == "urn:mace:example.com:saml:roland:sp"
+ location = self.client._sso_location()
+ assert location == 'http://localhost:8088/sso'
+ my_name = self.client._my_name()
+ assert my_name == "urn:mace:example.com:saml:roland:sp"
+
+ def test_sign_then_encrypt_assertion(self):
+ # Begin with the IdPs side
+ _sec = self.server.sec
+
+ assertion = s_utils.assertion_factory(
+ subject=factory(saml.Subject, text="_aaa",
+ name_id=factory(
+ saml.NameID,
+ format=saml.NAMEID_FORMAT_TRANSIENT)),
+ attribute_statement=do_attribute_statement(
+ {
+ ("", "", "sn"): ("Jeter", ""),
+ ("", "", "givenName"): ("Derek", ""),
+ }
+ ),
+ issuer=self.server._issuer(),
+ )
+
+ assertion.signature = sigver.pre_signature_part(
+ assertion.id, _sec.my_cert, 1)
+
+ sigass = _sec.sign_statement(assertion, class_name(assertion),
+ key_file=full_path("test.key"),
+ node_id=assertion.id)
+ # Create an Assertion instance from the signed assertion
+ _ass = saml.assertion_from_string(sigass)
+
+ response = sigver.response_factory(
+ in_response_to="_012345",
+ destination="https:#www.example.com",
+ status=s_utils.success_status_factory(),
+ issuer=self.server._issuer(),
+ assertion=_ass
+ )
+
+ enctext = _sec.crypto.encrypt_assertion(response,
+ self.client.sec.encryption_keypairs[
+ 0]["cert_file"],
+ pre_encryption_part())
+
+ seresp = samlp.response_from_string(enctext)
+
+ # Now over to the client side
+ _csec = self.client.sec
+ if seresp.encrypted_assertion:
+ decr_text = _csec.decrypt(enctext)
+ seresp = samlp.response_from_string(decr_text)
+ resp_ass = []
+
+ sign_cert_file = full_path("test.pem")
+ for enc_ass in seresp.encrypted_assertion:
+ assers = extension_elements_to_elements(
+ enc_ass.extension_elements, [saml, samlp])
+ for ass in assers:
+ if ass.signature:
+ if not _csec.verify_signature("%s" % ass,
+ sign_cert_file,
+ node_name=class_name(
+ ass)):
+ continue
+ resp_ass.append(ass)
+
+ seresp.assertion = resp_ass
+ seresp.encrypted_assertion = None
+
+ assert seresp.assertion
+
+ def test_sign_then_encrypt_assertion2(self):
+ # Begin with the IdPs side
+ _sec = self.server.sec
+
+ nameid_policy = samlp.NameIDPolicy(allow_create="false",
+ format=saml.NAMEID_FORMAT_PERSISTENT)
+
+ asser = Assertion({"givenName": "Dave", "sn": "Concepción"})
+ farg = add_path(
+ {},
+ ['assertion', 'subject', 'subject_confirmation', 'method',
+ saml.SCM_BEARER])
+ add_path(
+ farg['assertion']['subject']['subject_confirmation'],
+ ['subject_confirmation_data', 'in_response_to',
+ '_012345'])
+ add_path(
+ farg['assertion']['subject']['subject_confirmation'],
+ ['subject_confirmation_data', 'recipient',
+ "http://lingon.catalogix.se:8087/"])
+
+ assertion = asser.construct(
+ self.client.config.entityid,
+ self.server.config.attribute_converters,
+ self.server.config.getattr("policy", "idp"),
+ name_id=factory(saml.NameID, format=saml.NAMEID_FORMAT_TRANSIENT),
+ issuer=self.server._issuer(),
+ authn_class=INTERNETPROTOCOLPASSWORD,
+ authn_auth="http://www.example.com/login",
+ farg=farg['assertion']
+ )
+
+ assertion.signature = sigver.pre_signature_part(
+ assertion.id, _sec.my_cert, 1)
+
+ sigass = _sec.sign_statement(assertion, class_name(assertion),
+ key_file=self.client.sec.key_file,
+ node_id=assertion.id)
+
+ sigass = rm_xmltag(sigass)
+ response = sigver.response_factory(
+ in_response_to="_012345",
+ destination="http://lingon.catalogix.se:8087/",
+ status=s_utils.success_status_factory(),
+ issuer=self.server._issuer(),
+ encrypted_assertion=EncryptedAssertion()
+ )
+
+ xmldoc = "%s" % response
+ # strangely enough I get different tags if I run this test separately
+ # or as part of a bunch of tests.
+ xmldoc = add_subelement(xmldoc, "EncryptedAssertion", sigass)
+
+ enctext = _sec.crypto.encrypt_assertion(xmldoc,
+ self.client.sec.encryption_keypairs[
+ 1]["cert_file"],
+ pre_encryption_part())
+
+ # seresp = samlp.response_from_string(enctext)
+
+ resp_str = encode_fn(enctext.encode())
+ # Now over to the client side
+ # Explicitely allow unsigned responses for this and the following 2 tests
+ self.client.want_response_signed = False
+ resp = self.client.parse_authn_request_response(
+ resp_str, BINDING_HTTP_POST,
+ {"_012345": "http://foo.example.com/service"})
+
+ # assert resp.encrypted_assertion == []
+ assert resp.assertion
+ assert resp.ava == {"sn": [u"Concepción"], "givenName": ["Dave"]}
+
+ def test_sign_then_encrypt_assertion_advice_1(self):
+ # Begin with the IdPs side
+ _sec = self.server.sec
+
+ nameid_policy = samlp.NameIDPolicy(allow_create="false",
+ format=saml.NAMEID_FORMAT_PERSISTENT)
+
+ asser = Assertion({"givenName": "Dave", "sn": "Concepción"})
+
+ subject_confirmation_specs = {
+ 'recipient': "http://lingon.catalogix.se:8087/",
+ 'in_response_to': "_012345",
+ 'subject_confirmation_method': saml.SCM_BEARER
+ }
+ name_id = factory(saml.NameID, format=saml.NAMEID_FORMAT_TRANSIENT)
+
+ farg = add_path(
+ {},
+ ['assertion', 'subject', 'subject_confirmation', 'method',
+ saml.SCM_BEARER])
+ add_path(
+ farg['assertion']['subject']['subject_confirmation'],
+ ['subject_confirmation_data', 'in_response_to',
+ '_012345'])
+ add_path(
+ farg['assertion']['subject']['subject_confirmation'],
+ ['subject_confirmation_data', 'recipient',
+ "http://lingon.catalogix.se:8087/"])
+
+ assertion = asser.construct(
+ self.client.config.entityid,
+ self.server.config.attribute_converters,
+ self.server.config.getattr("policy", "idp"),
+ issuer=self.server._issuer(),
+ name_id=name_id,
+ authn_class=INTERNETPROTOCOLPASSWORD,
+ authn_auth="http://www.example.com/login",
+ farg=farg['assertion'])
+
+ a_asser = Assertion({"uid": "test01", "email": "test.testsson@test.se"})
+ a_assertion = a_asser.construct(
+ self.client.config.entityid,
+ self.server.config.attribute_converters,
+ self.server.config.getattr("policy", "idp"),
+ issuer=self.server._issuer(),
+ authn_class=INTERNETPROTOCOLPASSWORD,
+ authn_auth="http://www.example.com/login",
+ name_id=name_id,
+ farg=farg['assertion'])
+
+ a_assertion.signature = sigver.pre_signature_part(
+ a_assertion.id, _sec.my_cert, 1)
+
+ assertion.advice = Advice()
+
+ assertion.advice.encrypted_assertion = []
+ assertion.advice.encrypted_assertion.append(EncryptedAssertion())
+
+ assertion.advice.encrypted_assertion[0].add_extension_element(
+ a_assertion)
+
+ response = sigver.response_factory(
+ in_response_to="_012345",
+ destination="http://lingon.catalogix.se:8087/",
+ status=s_utils.success_status_factory(),
+ issuer=self.server._issuer()
+ )
+
+ response.assertion.append(assertion)
+
+ response = _sec.sign_statement("%s" % response, class_name(a_assertion),
+ key_file=self.client.sec.key_file,
+ node_id=a_assertion.id)
+
+ # xmldoc = "%s" % response
+ # strangely enough I get different tags if I run this test separately
+ # or as part of a bunch of tests.
+ # xmldoc = add_subelement(xmldoc, "EncryptedAssertion", sigass)
+
+ node_xpath = ''.join(["/*[local-name()=\"%s\"]" % v for v in
+ ["Response", "Assertion", "Advice",
+ "EncryptedAssertion", "Assertion"]])
+
+ enctext = _sec.crypto.encrypt_assertion(response,
+ self.client.sec.encryption_keypairs[
+ 0]["cert_file"],
+ pre_encryption_part(),
+ node_xpath=node_xpath)
+
+ # seresp = samlp.response_from_string(enctext)
+
+ if six.PY2:
+ resp_str = encode_fn(enctext.encode('utf-8'))
+ else:
+ resp_str = encode_fn(bytes(enctext, 'utf-8'))
+
+ # Now over to the client side
+ resp = self.client.parse_authn_request_response(
+ resp_str, BINDING_HTTP_POST,
+ {"_012345": "http://foo.example.com/service"})
+
+ # assert resp.encrypted_assertion == []
+ assert resp.assertion
+ assert resp.assertion.advice
+ assert resp.assertion.advice.assertion
+ assert resp.ava == \
+ {'givenName': ['Dave'], 'sn': [u'Concepción'], 'uid': ['test01'],
+ 'email': ['test.testsson@test.se']}
+
+ def test_sign_then_encrypt_assertion_advice_2(self):
+ # Begin with the IdPs side
+ _sec = self.server.sec
+
+ nameid_policy = samlp.NameIDPolicy(allow_create="false",
+ format=saml.NAMEID_FORMAT_PERSISTENT)
+
+ asser_1 = Assertion({"givenName": "Dave"})
+
+ farg = add_path(
+ {},
+ ['assertion', 'subject', 'subject_confirmation', 'method',
+ saml.SCM_BEARER])
+ add_path(
+ farg['assertion']['subject']['subject_confirmation'],
+ ['subject_confirmation_data', 'in_response_to',
+ '_012345'])
+ add_path(
+ farg['assertion']['subject']['subject_confirmation'],
+ ['subject_confirmation_data', 'recipient',
+ "http://lingon.catalogix.se:8087/"])
+ name_id = factory(saml.NameID, format=saml.NAMEID_FORMAT_TRANSIENT)
+
+ assertion_1 = asser_1.construct(
+ self.client.config.entityid,
+ self.server.config.attribute_converters,
+ self.server.config.getattr("policy", "idp"),
+ issuer=self.server._issuer(),
+ authn_class=INTERNETPROTOCOLPASSWORD,
+ authn_auth="http://www.example.com/login",
+ name_id=name_id,
+ farg=farg['assertion'])
+
+ asser_2 = Assertion({"sn": "Concepción"})
+
+ assertion_2 = asser_2.construct(
+ self.client.config.entityid,
+ self.server.config.attribute_converters,
+ self.server.config.getattr("policy", "idp"),
+ issuer=self.server._issuer(),
+ authn_class=INTERNETPROTOCOLPASSWORD,
+ authn_auth="http://www.example.com/login",
+ name_id=name_id,
+ farg=farg['assertion'])
+
+ a_asser_1 = Assertion({"uid": "test01"})
+ a_assertion_1 = a_asser_1.construct(
+ self.client.config.entityid,
+ self.server.config.attribute_converters,
+ self.server.config.getattr("policy", "idp"),
+ issuer=self.server._issuer(),
+ authn_class=INTERNETPROTOCOLPASSWORD,
+ authn_auth="http://www.example.com/login",
+ name_id=name_id,
+ farg=farg['assertion'])
+
+ a_asser_2 = Assertion({"email": "test.testsson@test.se"})
+ a_assertion_2 = a_asser_2.construct(
+ self.client.config.entityid,
+ self.server.config.attribute_converters,
+ self.server.config.getattr("policy", "idp"),
+ issuer=self.server._issuer(),
+ authn_class=INTERNETPROTOCOLPASSWORD,
+ authn_auth="http://www.example.com/login",
+ name_id=name_id,
+ farg=farg['assertion'])
+
+ a_asser_3 = Assertion({"street": "street"})
+ a_assertion_3 = a_asser_3.construct(
+ self.client.config.entityid,
+ self.server.config.attribute_converters,
+ self.server.config.getattr("policy", "idp"),
+ issuer=self.server._issuer(),
+ authn_class=INTERNETPROTOCOLPASSWORD,
+ authn_auth="http://www.example.com/login",
+ name_id=name_id,
+ farg=farg['assertion'])
+
+ a_asser_4 = Assertion({"title": "title"})
+ a_assertion_4 = a_asser_4.construct(
+ self.client.config.entityid,
+ self.server.config.attribute_converters,
+ self.server.config.getattr("policy", "idp"),
+ issuer=self.server._issuer(),
+ authn_class=INTERNETPROTOCOLPASSWORD,
+ authn_auth="http://www.example.com/login",
+ name_id=name_id,
+ farg=farg['assertion'])
+
+ a_assertion_1.signature = sigver.pre_signature_part(
+ a_assertion_1.id, _sec.my_cert, 1)
+
+ a_assertion_2.signature = sigver.pre_signature_part(
+ a_assertion_2.id, _sec.my_cert, 1)
+
+ a_assertion_3.signature = sigver.pre_signature_part(
+ a_assertion_3.id, _sec.my_cert, 1)
+
+ a_assertion_4.signature = sigver.pre_signature_part(
+ a_assertion_4.id, _sec.my_cert, 1)
+
+ assertion_1.signature = sigver.pre_signature_part(assertion_1.id,
+ _sec.my_cert, 1)
+
+ assertion_2.signature = sigver.pre_signature_part(assertion_2.id,
+ _sec.my_cert, 1)
+
+ response = sigver.response_factory(
+ in_response_to="_012345",
+ destination="http://lingon.catalogix.se:8087/",
+ status=s_utils.success_status_factory(),
+ issuer=self.server._issuer()
+ )
+
+ response.assertion = assertion_1
+
+ response.assertion.advice = Advice()
+
+ response.assertion.advice.encrypted_assertion = []
+ response.assertion.advice.encrypted_assertion.append(
+ EncryptedAssertion())
+
+ response.assertion.advice.encrypted_assertion[0].add_extension_element(
+ a_assertion_1)
+
+ advice_tag = response.assertion.advice._to_element_tree().tag
+ assertion_tag = a_assertion_1._to_element_tree().tag
+ response = \
+ response.get_xml_string_with_self_contained_assertion_within_advice_encrypted_assertion(
+ assertion_tag, advice_tag)
+
+ response = _sec.sign_statement("%s" % response,
+ class_name(a_assertion_1),
+ key_file=self.server.sec.key_file,
+ node_id=a_assertion_1.id)
+
+ node_xpath = ''.join(["/*[local-name()=\"%s\"]" % v for v in
+ ["Response", "Assertion", "Advice",
+ "EncryptedAssertion", "Assertion"]])
+
+ enctext = _sec.crypto.encrypt_assertion(response,
+ self.client.sec.encryption_keypairs[
+ 1]["cert_file"],
+ pre_encryption_part(),
+ node_xpath=node_xpath)
+
+ response = samlp.response_from_string(enctext)
+
+ response.assertion = response.assertion[0]
+
+ response.assertion.advice.encrypted_assertion.append(
+ EncryptedAssertion())
+ response.assertion.advice.encrypted_assertion[1].add_extension_element(
+ a_assertion_2)
+
+ advice_tag = response.assertion.advice._to_element_tree().tag
+ assertion_tag = a_assertion_2._to_element_tree().tag
+ response = \
+ response.get_xml_string_with_self_contained_assertion_within_advice_encrypted_assertion(
+ assertion_tag, advice_tag)
+
+ response = _sec.sign_statement("%s" % response,
+ class_name(a_assertion_2),
+ key_file=self.server.sec.key_file,
+ node_id=a_assertion_2.id)
+
+ node_xpath = ''.join(["/*[local-name()=\"%s\"]" % v for v in
+ ["Response", "Assertion", "Advice",
+ "EncryptedAssertion", "Assertion"]])
+
+ enctext = _sec.crypto.encrypt_assertion(response,
+ self.client.sec.encryption_keypairs[
+ 0]["cert_file"],
+ pre_encryption_part(),
+ node_xpath=node_xpath)
+
+ response = samlp.response_from_string(enctext)
+
+ response.assertion = response.assertion[0]
+
+ assertion_tag = response.assertion._to_element_tree().tag
+ response = pre_encrypt_assertion(response)
+ response = \
+ response.get_xml_string_with_self_contained_assertion_within_encrypted_assertion(
+ assertion_tag)
+
+ response = _sec.sign_statement("%s" % response, class_name(assertion_1),
+ key_file=self.server.sec.key_file,
+ node_id=assertion_1.id)
+
+ enctext = _sec.crypto.encrypt_assertion(response,
+ self.client.sec.encryption_keypairs[
+ 1]["cert_file"],
+ pre_encryption_part())
+
+ response = samlp.response_from_string(enctext)
+
+ response.assertion = assertion_2
+
+ response.assertion.advice = Advice()
+
+ response.assertion.advice.encrypted_assertion = []
+ response.assertion.advice.encrypted_assertion.append(
+ EncryptedAssertion())
+
+ response.assertion.advice.encrypted_assertion[0].add_extension_element(
+ a_assertion_3)
+
+ advice_tag = response.assertion.advice._to_element_tree().tag
+ assertion_tag = a_assertion_3._to_element_tree().tag
+ response = \
+ response.get_xml_string_with_self_contained_assertion_within_advice_encrypted_assertion(
+ assertion_tag, advice_tag)
+
+ response = _sec.sign_statement("%s" % response,
+ class_name(a_assertion_3),
+ key_file=self.server.sec.key_file,
+ node_id=a_assertion_3.id)
+
+ node_xpath = ''.join(["/*[local-name()=\"%s\"]" % v for v in
+ ["Response", "Assertion", "Advice",
+ "EncryptedAssertion", "Assertion"]])
+
+ enctext = _sec.crypto.encrypt_assertion(response,
+ self.client.sec.encryption_keypairs[
+ 0]["cert_file"],
+ pre_encryption_part(),
+ node_xpath=node_xpath)
+
+ response = samlp.response_from_string(enctext)
+
+ response.assertion = response.assertion[0]
+
+ response.assertion.advice.encrypted_assertion.append(
+ EncryptedAssertion())
+
+ response.assertion.advice.encrypted_assertion[1].add_extension_element(
+ a_assertion_4)
+
+ advice_tag = response.assertion.advice._to_element_tree().tag
+ assertion_tag = a_assertion_4._to_element_tree().tag
+ response = \
+ response.get_xml_string_with_self_contained_assertion_within_advice_encrypted_assertion(
+ assertion_tag, advice_tag)
+
+ response = _sec.sign_statement("%s" % response,
+ class_name(a_assertion_4),
+ key_file=self.server.sec.key_file,
+ node_id=a_assertion_4.id)
+
+ node_xpath = ''.join(["/*[local-name()=\"%s\"]" % v for v in
+ ["Response", "Assertion", "Advice",
+ "EncryptedAssertion", "Assertion"]])
+
+ enctext = _sec.crypto.encrypt_assertion(response,
+ self.client.sec.encryption_keypairs[
+ 1]["cert_file"],
+ pre_encryption_part(),
+ node_xpath=node_xpath)
+
+ response = samlp.response_from_string(enctext)
+
+ response = _sec.sign_statement("%s" % response,
+ class_name(response.assertion[0]),
+ key_file=self.server.sec.key_file,
+ node_id=response.assertion[0].id)
+
+ response = samlp.response_from_string(response)
+
+ # seresp = samlp.response_from_string(enctext)
+
+ resp_str = encode_fn(response.to_string())
+
+ # Now over to the client side
+ resp = self.client.parse_authn_request_response(
+ resp_str, BINDING_HTTP_POST,
+ {"_012345": "http://foo.example.com/service"})
+
+ # assert resp.encrypted_assertion == []
+ assert resp.assertion
+ assert resp.assertion.advice
+ assert resp.assertion.advice.assertion
+ assert resp.ava == \
+ {'street': ['street'], 'uid': ['test01'], 'title': ['title'],
+ 'givenName': ['Dave'], 'email':
+ ['test.testsson@test.se'], 'sn': [u'Concepción']}
+
+ def test_signed_redirect(self):
+
+ # Revert configuration change to disallow unsinged responses
+ self.client.want_response_signed = True
+
+ msg_str = "%s" % self.client.create_authn_request(
+ "http://localhost:8088/sso", message_id="id1")[1]
+
+ info = self.client.apply_binding(
+ BINDING_HTTP_REDIRECT, msg_str, destination="",
+ relay_state="relay2", sigalg=SIG_RSA_SHA256)
+
+ loc = info["headers"][0][1]
+ qs = parse_qs(loc[1:])
+ assert _leq(qs.keys(),
+ ['SigAlg', 'SAMLRequest', 'RelayState', 'Signature'])
+
+ assert verify_redirect_signature(list_values2simpletons(qs),
+ self.client.sec.sec_backend)
+
+ res = self.server.parse_authn_request(qs["SAMLRequest"][0],
+ BINDING_HTTP_REDIRECT)
+
+ def test_do_logout_signed_redirect(self):
+ conf = config.SPConfig()
+ conf.load_file("sp_slo_redirect_conf")
+ client = Saml2Client(conf)
+
+ # information about the user from an IdP
+ session_info = {
+ "name_id": nid,
+ "issuer": "urn:mace:example.com:saml:roland:idp",
+ "not_on_or_after": in_a_while(minutes=15),
+ "ava": {
+ "givenName": "Anders",
+ "sn": "Österberg",
+ "mail": "anders.osterberg@example.com"
+ }
+ }
+ client.users.add_information_about_person(session_info)
+ entity_ids = client.users.issuers_of_info(nid)
+ assert entity_ids == ["urn:mace:example.com:saml:roland:idp"]
+
+ resp = client.do_logout(nid, entity_ids, "Tired", in_a_while(minutes=5),
+ sign=True,
+ expected_binding=BINDING_HTTP_REDIRECT)
+
+ assert list(resp.keys()) == entity_ids
+ binding, info = resp[entity_ids[0]]
+ assert binding == BINDING_HTTP_REDIRECT
+
+ loc = info["headers"][0][1]
+ _, _, _, _, qs, _ = urlparse(loc)
+ qs = parse_qs(qs)
+ assert _leq(qs.keys(),
+ ['SigAlg', 'SAMLRequest', 'RelayState', 'Signature'])
+
+ assert verify_redirect_signature(list_values2simpletons(qs),
+ client.sec.sec_backend)
+
+ res = self.server.parse_logout_request(qs["SAMLRequest"][0],
+ BINDING_HTTP_REDIRECT)
+
+ def test_do_logout_post(self):
+ # information about the user from an IdP
+ session_info = {
+ "name_id": nid,
+ "issuer": "urn:mace:example.com:saml:roland:idp",
+ "not_on_or_after": in_a_while(minutes=15),
+ "ava": {
+ "givenName": "Anders",
+ "sn": "Österberg",
+ "mail": "anders.osterberg@example.com"
+ },
+ "session_index": SessionIndex("_foo")
+ }
+ self.client.users.add_information_about_person(session_info)
+ entity_ids = self.client.users.issuers_of_info(nid)
+ assert entity_ids == ["urn:mace:example.com:saml:roland:idp"]
+ resp = self.client.do_logout(nid, entity_ids, "Tired",
+ in_a_while(minutes=5), sign=True,
+ expected_binding=BINDING_HTTP_POST)
+ assert resp
+ assert len(resp) == 1
+ assert list(resp.keys()) == entity_ids
+ binding, info = resp[entity_ids[0]]
+ assert binding == BINDING_HTTP_POST
+
+ _dic = unpack_form(info["data"])
+ res = self.server.parse_logout_request(_dic["SAMLRequest"],
+ BINDING_HTTP_POST)
+ assert b'<ns0:SessionIndex>_foo</ns0:SessionIndex>' in res.xmlstr
+
+ def test_do_logout_session_expired(self):
+ # information about the user from an IdP
+ session_info = {
+ "name_id": nid,
+ "issuer": "urn:mace:example.com:saml:roland:idp",
+ "not_on_or_after": a_while_ago(minutes=15),
+ "ava": {
+ "givenName": "Anders",
+ "sn": "Österberg",
+ "mail": "anders.osterberg@example.com"
+ },
+ "session_index": SessionIndex("_foo")
+ }
+ self.client.users.add_information_about_person(session_info)
+ entity_ids = self.client.users.issuers_of_info(nid)
+ assert entity_ids == ["urn:mace:example.com:saml:roland:idp"]
+ resp = self.client.do_logout(nid, entity_ids, "Tired",
+ in_a_while(minutes=5), sign=True,
+ expected_binding=BINDING_HTTP_POST)
+ assert resp
+ assert len(resp) == 1
+ assert list(resp.keys()) == entity_ids
+ binding, info = resp[entity_ids[0]]
+ assert binding == BINDING_HTTP_POST
+
+ _dic = unpack_form(info["data"])
+ res = self.server.parse_logout_request(_dic["SAMLRequest"],
+ BINDING_HTTP_POST)
+ assert b'<ns0:SessionIndex>_foo</ns0:SessionIndex>' in res.xmlstr
+
# Below can only be done with dummy Server
IDP = "urn:mace:example.com:saml:roland:idp"
@@ -1553,8 +2903,8 @@ class TestClientWithDummy():
"not_on_or_after": in_a_while(minutes=15),
"ava": {
"givenName": "Anders",
- "sn": "Andersson",
- "mail": "anders.andersson@example.com"
+ "sn": "Österberg",
+ "mail": "anders.osterberg@example.com"
}
}
self.client.users.add_information_about_person(session_info)
@@ -1658,8 +3008,8 @@ class TestClientNoConfigContext():
"not_on_or_after": in_a_while(minutes=15),
"ava": {
"givenName": "Anders",
- "sn": "Andersson",
- "mail": "anders.andersson@example.com"
+ "sn": "Österberg",
+ "mail": "anders.osterberg@example.com"
}
}
self.client.users.add_information_about_person(session_info)