summaryrefslogtreecommitdiff
path: root/tests/test_50_server.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_50_server.py')
-rw-r--r--tests/test_50_server.py1105
1 files changed, 1104 insertions, 1 deletions
diff --git a/tests/test_50_server.py b/tests/test_50_server.py
index f0dcae3c..11ace47b 100644
--- a/tests/test_50_server.py
+++ b/tests/test_50_server.py
@@ -1185,7 +1185,1110 @@ class TestServer1():
idp.ident.close()
assert request
-#------------------------------------------------------------------------
+# ------------------------------------------------------------------------
+
+
+class TestServer1NonAsciiAva():
+
+ def setup_class(self):
+ self.server = Server("idp_conf")
+
+ conf = config.SPConfig()
+ conf.load_file("server_conf")
+ self.client = client.Saml2Client(conf)
+ self.name_id = self.server.ident.transient_nameid(
+ "urn:mace:example.com:saml:roland:sp", "id12")
+ self.ava = {"givenName": ["Dave"], "sn": ["ConcepciĆ³n"],
+ "mail": ["dave@cnr.mlb.com"], "title": "#13"}
+
+ def teardown_class(self):
+ self.server.close()
+
+ def verify_assertion(self, assertion):
+ assert assertion
+ assert assertion[0].attribute_statement
+
+ ava = get_ava(assertion[0])
+
+ assert ava == \
+ {"givenName": ["Dave"], "sn": [u"ConcepciĆ³n"],
+ "mail": ["dave@cnr.mlb.com"], "title": ["#13"]}
+
+
+ def verify_encrypted_assertion(self, assertion, decr_text):
+ self.verify_assertion(assertion)
+ assert assertion[0].signature is None
+
+ assert 'EncryptedAssertion><encas1:Assertion xmlns:encas0="http://www.w3.org/2001/XMLSchema-instance" ' \
+ 'xmlns:encas1="urn:oasis:names:tc:SAML:2.0:assertion"' in decr_text
+
+ def verify_advice_assertion(self, resp, decr_text):
+ assert resp.assertion[0].signature is None
+
+ assert resp.assertion[0].advice.encrypted_assertion[0].extension_elements
+
+ assertion = extension_elements_to_elements(resp.assertion[0].advice.encrypted_assertion[0].extension_elements,
+ [saml, samlp])
+ self.verify_encrypted_assertion(assertion, decr_text)
+
+
+ def test_issuer(self):
+ issuer = self.server._issuer()
+ assert isinstance(issuer, saml.Issuer)
+ assert _eq(issuer.keyswv(), ["text", "format"])
+ assert issuer.format == saml.NAMEID_FORMAT_ENTITY
+ assert issuer.text == self.server.config.entityid
+
+ def test_assertion(self):
+ assertion = s_utils.assertion_factory(
+ subject=factory(
+ saml.Subject, text="_aaa",
+ name_id=factory(saml.NameID,
+ format=saml.NAMEID_FORMAT_TRANSIENT)),
+ attribute_statement=do_attribute_statement(
+ {
+ ("", "", "sn"): ("Jeter", ""),
+ ("", "", "givenName"): ("Derek", ""),
+ }
+ ),
+ issuer=self.server._issuer(),
+ )
+
+ assert _eq(assertion.keyswv(), ['attribute_statement', 'issuer', 'id',
+ 'subject', 'issue_instant', 'version'])
+ assert assertion.version == "2.0"
+ assert assertion.issuer.text == "urn:mace:example.com:saml:roland:idp"
+ #
+ assert assertion.attribute_statement
+ attribute_statement = assertion.attribute_statement
+ assert len(attribute_statement.attribute) == 2
+ attr0 = attribute_statement.attribute[0]
+ attr1 = attribute_statement.attribute[1]
+ if attr0.attribute_value[0].text == "Derek":
+ assert attr0.friendly_name == "givenName"
+ assert attr1.friendly_name == "sn"
+ assert attr1.attribute_value[0].text == "Jeter"
+ else:
+ assert attr1.friendly_name == "givenName"
+ assert attr1.attribute_value[0].text == "Derek"
+ assert attr0.friendly_name == "sn"
+ assert attr0.attribute_value[0].text == "Jeter"
+ #
+ subject = assertion.subject
+ assert _eq(subject.keyswv(), ["text", "name_id"])
+ assert subject.text == "_aaa"
+ assert subject.name_id.format == saml.NAMEID_FORMAT_TRANSIENT
+
+ def test_response(self):
+ response = sigver.response_factory(
+ in_response_to="_012345",
+ destination="https:#www.example.com",
+ status=s_utils.success_status_factory(),
+ assertion=s_utils.assertion_factory(
+ subject=factory(saml.Subject, text="_aaa",
+ name_id=saml.NAMEID_FORMAT_TRANSIENT),
+ attribute_statement=do_attribute_statement(
+ {
+ ("", "", "sn"): ("Jeter", ""),
+ ("", "", "givenName"): ("Derek", ""),
+ }
+ ),
+ issuer=self.server._issuer(),
+ ),
+ issuer=self.server._issuer(),
+ )
+
+ print(response.keyswv())
+ assert _eq(response.keyswv(), ['destination', 'assertion', 'status',
+ 'in_response_to', 'issue_instant',
+ 'version', 'issuer', 'id'])
+ assert response.version == "2.0"
+ assert response.issuer.text == "urn:mace:example.com:saml:roland:idp"
+ assert response.destination == "https:#www.example.com"
+ assert response.in_response_to == "_012345"
+ #
+ status = response.status
+ print(status)
+ assert status.status_code.value == samlp.STATUS_SUCCESS
+
+ def test_parse_faulty_request(self):
+ req_id, authn_request = self.client.create_authn_request(
+ destination="http://www.example.com", id="id1")
+
+ # should raise an error because faulty spentityid
+ binding = BINDING_HTTP_REDIRECT
+ htargs = self.client.apply_binding(
+ binding, "%s" % authn_request, "http://www.example.com", "abcd")
+ _dict = parse_qs(htargs["headers"][0][1].split('?')[1])
+ print(_dict)
+ raises(OtherError, self.server.parse_authn_request,
+ _dict["SAMLRequest"][0], binding)
+
+ def test_parse_faulty_request_to_err_status(self):
+ req_id, authn_request = self.client.create_authn_request(
+ destination="http://www.example.com")
+
+ binding = BINDING_HTTP_REDIRECT
+ htargs = self.client.apply_binding(binding, "%s" % authn_request,
+ "http://www.example.com", "abcd")
+ _dict = parse_qs(htargs["headers"][0][1].split('?')[1])
+ print(_dict)
+
+ try:
+ self.server.parse_authn_request(_dict["SAMLRequest"][0], binding)
+ status = None
+ except OtherError as oe:
+ print(oe.args)
+ status = s_utils.error_status_factory(oe)
+
+ assert status
+ print(status)
+ assert _eq(status.keyswv(), ["status_code", "status_message"])
+ assert status.status_message.text == 'Not destined for me!'
+ status_code = status.status_code
+ assert _eq(status_code.keyswv(), ["status_code", "value"])
+ assert status_code.value == samlp.STATUS_RESPONDER
+ assert status_code.status_code.value == samlp.STATUS_UNKNOWN_PRINCIPAL
+
+ def test_parse_ok_request(self):
+ req_id, authn_request = self.client.create_authn_request(
+ message_id="id1", destination="http://localhost:8088/sso")
+
+ print(authn_request)
+ binding = BINDING_HTTP_REDIRECT
+ htargs = self.client.apply_binding(binding, "%s" % authn_request,
+ "http://www.example.com", "abcd")
+ _dict = parse_qs(htargs["headers"][0][1].split('?')[1])
+ print(_dict)
+
+ req = self.server.parse_authn_request(_dict["SAMLRequest"][0], binding)
+ # returns a dictionary
+ print(req)
+ resp_args = self.server.response_args(req.message, [BINDING_HTTP_POST])
+ assert resp_args["destination"] == "http://lingon.catalogix.se:8087/"
+ assert resp_args["in_response_to"] == "id1"
+ name_id_policy = resp_args["name_id_policy"]
+ assert _eq(name_id_policy.keyswv(), ["format", "allow_create"])
+ assert name_id_policy.format == saml.NAMEID_FORMAT_TRANSIENT
+ assert resp_args[
+ "sp_entity_id"] == "urn:mace:example.com:saml:roland:sp"
+
+ def test_sso_response_with_identity(self):
+ name_id = self.server.ident.transient_nameid(
+ "https://example.com/sp", "id12")
+ resp = self.server.create_authn_response(
+ {
+ "eduPersonEntitlement": "Short stop",
+ "sn": "Jeter",
+ "givenName": "Derek",
+ "mail": "derek.jeter@nyy.mlb.com",
+ "title": "The man"
+ },
+ "id12", # in_response_to
+ "http://localhost:8087/", # destination
+ "https://example.com/sp", # sp_entity_id
+ name_id=name_id,
+ authn=AUTHN
+ )
+
+ print(resp.keyswv())
+ assert _eq(resp.keyswv(), ['status', 'destination', 'assertion',
+ 'in_response_to', 'issue_instant',
+ 'version', 'id', 'issuer'])
+ assert resp.destination == "http://localhost:8087/"
+ assert resp.in_response_to == "id12"
+ assert resp.status
+ assert resp.status.status_code.value == samlp.STATUS_SUCCESS
+ assert resp.assertion
+ assertion = resp.assertion
+ print(assertion)
+ assert assertion.authn_statement
+ assert assertion.conditions
+ assert assertion.attribute_statement
+ attribute_statement = assertion.attribute_statement
+ print(attribute_statement)
+ assert len(attribute_statement[0].attribute) == 4
+ # Pick out one attribute
+ attr = None
+ for attr in attribute_statement[0].attribute:
+ if attr.friendly_name == "givenName":
+ break
+ assert len(attr.attribute_value) == 1
+ assert attr.name == "urn:mace:dir:attribute-def:givenName"
+ assert attr.name_format == "urn:oasis:names:tc:SAML:2.0:attrname-format:basic"
+ value = attr.attribute_value[0]
+ assert value.text.strip() == "Derek"
+ assert value.get_type() == "xs:string"
+ assert assertion.subject
+ assert assertion.subject.name_id
+ assert assertion.subject.subject_confirmation
+ confirmation = assertion.subject.subject_confirmation[0]
+ print(confirmation.keyswv())
+ print(confirmation.subject_confirmation_data)
+ assert confirmation.subject_confirmation_data.in_response_to == "id12"
+
+ def test_sso_response_without_identity(self):
+ resp = self.server.create_authn_response(
+ {},
+ "id12", # in_response_to
+ "http://localhost:8087/", # consumer_url
+ "urn:mace:example.com:saml:roland:sp", # sp_entity_id
+ userid="USER1",
+ authn=AUTHN,
+ release_policy=Policy(),
+ best_effort=True
+ )
+
+ print(resp.keyswv())
+ assert _eq(resp.keyswv(), ['status', 'destination', 'in_response_to',
+ 'issue_instant', 'version', 'id', 'issuer',
+ 'assertion'])
+ assert resp.destination == "http://localhost:8087/"
+ assert resp.in_response_to == "id12"
+ assert resp.status
+ assert resp.status.status_code.value == samlp.STATUS_SUCCESS
+ assert resp.issuer.text == "urn:mace:example.com:saml:roland:idp"
+ assert not resp.assertion.attribute_statement
+
+ def test_sso_response_specific_instant(self):
+ _authn = AUTHN.copy()
+ _authn["authn_instant"] = 1234567890
+
+ resp = self.server.create_authn_response(
+ {},
+ "id12", # in_response_to
+ "http://localhost:8087/", # consumer_url
+ "urn:mace:example.com:saml:roland:sp", # sp_entity_id
+ userid="USER1",
+ authn=_authn,
+ best_effort=True
+ )
+
+ print(resp.keyswv())
+ assert _eq(resp.keyswv(), ['status', 'destination', 'in_response_to',
+ 'issue_instant', 'version', 'id', 'issuer',
+ 'assertion'])
+ authn_statement = resp.assertion.authn_statement[0]
+ assert authn_statement.authn_instant == '2009-02-13T23:31:30Z'
+
+ def test_sso_failure_response(self):
+ exc = s_utils.MissingValue("eduPersonAffiliation missing")
+ resp = self.server.create_error_response(
+ "id12", "http://localhost:8087/", exc)
+
+ print(resp.keyswv())
+ assert _eq(resp.keyswv(), ['status', 'destination', 'in_response_to',
+ 'issue_instant', 'version', 'id', 'issuer'])
+ assert resp.destination == "http://localhost:8087/"
+ assert resp.in_response_to == "id12"
+ assert resp.status
+ print(resp.status)
+ assert resp.status.status_code.value == samlp.STATUS_RESPONDER
+ assert resp.status.status_code.status_code.value == \
+ samlp.STATUS_REQUEST_UNSUPPORTED
+ assert resp.status.status_message.text == \
+ "eduPersonAffiliation missing"
+ assert resp.issuer.text == "urn:mace:example.com:saml:roland:idp"
+ assert not resp.assertion
+
+ def test_authn_response_0(self):
+ conf = config.SPConfig()
+ conf.load_file("server_conf")
+ self.client = client.Saml2Client(conf)
+
+ ava = {"givenName": ["Derek"], "sn": ["Jeter"],
+ "mail": ["derek@nyy.mlb.com"], "title": "The man"}
+
+ npolicy = samlp.NameIDPolicy(format=saml.NAMEID_FORMAT_TRANSIENT,
+ allow_create="true")
+ resp_str = "%s" % self.server.create_authn_response(
+ ava, "id1", "http://local:8087/",
+ "urn:mace:example.com:saml:roland:sp", npolicy,
+ "foba0001@example.com", authn=AUTHN)
+
+ response = samlp.response_from_string(resp_str)
+ print(response.keyswv())
+ assert _eq(response.keyswv(), ['status', 'destination', 'assertion',
+ 'in_response_to', 'issue_instant',
+ 'version', 'issuer', 'id'])
+ print(response.assertion[0].keyswv())
+ assert len(response.assertion) == 1
+ assert _eq(response.assertion[0].keyswv(), ['attribute_statement',
+ 'issue_instant', 'version',
+ 'subject', 'conditions',
+ 'id', 'issuer',
+ 'authn_statement'])
+ assertion = response.assertion[0]
+ assert len(assertion.attribute_statement) == 1
+ astate = assertion.attribute_statement[0]
+ print(astate)
+ assert len(astate.attribute) == 4
+
+ def test_signed_response(self):
+ name_id = self.server.ident.transient_nameid(
+ "urn:mace:example.com:saml:roland:sp", "id12")
+ ava = {"givenName": ["Derek"], "sn": ["Jeter"],
+ "mail": ["derek@nyy.mlb.com"], "title": "The man"}
+
+ signed_resp = self.server.create_authn_response(
+ ava,
+ "id12", # in_response_to
+ "http://lingon.catalogix.se:8087/", # consumer_url
+ "urn:mace:example.com:saml:roland:sp", # sp_entity_id
+ name_id=name_id,
+ sign_assertion=True
+ )
+
+ print(signed_resp)
+ assert signed_resp
+
+ sresponse = response_from_string(signed_resp)
+ # It's the assertions that are signed not the response per se
+ assert len(sresponse.assertion) == 1
+ assertion = sresponse.assertion[0]
+
+ # Since the reponse is created dynamically I don't know the signature
+ # value. Just that there should be one
+ assert assertion.signature.signature_value.text != ""
+
+ def test_signed_response_1(self):
+
+
+ signed_resp = self.server.create_authn_response(
+ self.ava,
+ "id12", # in_response_to
+ "http://lingon.catalogix.se:8087/", # consumer_url
+ "urn:mace:example.com:saml:roland:sp", # sp_entity_id
+ name_id=self.name_id,
+ sign_response=True,
+ sign_assertion=True,
+ )
+
+ sresponse = response_from_string(signed_resp)
+
+ valid = self.server.sec.verify_signature(signed_resp,
+ self.server.config.cert_file,
+ node_name='urn:oasis:names:tc:SAML:2.0:protocol:Response',
+ node_id=sresponse.id,
+ id_attr="")
+ assert valid
+
+ valid = self.server.sec.verify_signature(signed_resp,
+ self.server.config.cert_file,
+ node_name='urn:oasis:names:tc:SAML:2.0:assertion:Assertion',
+ node_id=sresponse.assertion[0].id,
+ id_attr="")
+ assert valid
+
+ self.verify_assertion(sresponse.assertion)
+
+ def test_signed_response_2(self):
+ signed_resp = self.server.create_authn_response(
+ self.ava,
+ "id12", # in_response_to
+ "http://lingon.catalogix.se:8087/", # consumer_url
+ "urn:mace:example.com:saml:roland:sp", # sp_entity_id
+ name_id=self.name_id,
+ sign_response=True,
+ sign_assertion=False,
+ )
+
+ sresponse = response_from_string(signed_resp)
+
+ valid = self.server.sec.verify_signature(signed_resp,
+ self.server.config.cert_file,
+ node_name='urn:oasis:names:tc:SAML:2.0:protocol:Response',
+ node_id=sresponse.id,
+ id_attr="")
+ assert valid
+
+ assert sresponse.assertion[0].signature == None
+
+ def test_signed_response_3(self):
+
+
+ signed_resp = self.server.create_authn_response(
+ self.ava,
+ "id12", # in_response_to
+ "http://lingon.catalogix.se:8087/", # consumer_url
+ "urn:mace:example.com:saml:roland:sp", # sp_entity_id
+ name_id=self.name_id,
+ sign_response=False,
+ sign_assertion=True,
+ )
+
+ sresponse = response_from_string(signed_resp)
+
+ assert sresponse.signature == None
+
+ valid = self.server.sec.verify_signature(signed_resp,
+ self.server.config.cert_file,
+ node_name='urn:oasis:names:tc:SAML:2.0:assertion:Assertion',
+ node_id=sresponse.assertion[0].id,
+ id_attr="")
+ assert valid
+
+ self.verify_assertion(sresponse.assertion)
+
+ def test_encrypted_signed_response_1(self):
+
+ cert_str, cert_key_str = generate_cert()
+
+ signed_resp = self.server.create_authn_response(
+ self.ava,
+ "id12", # in_response_to
+ "http://lingon.catalogix.se:8087/", # consumer_url
+ "urn:mace:example.com:saml:roland:sp", # sp_entity_id
+ name_id=self.name_id,
+ sign_response=True,
+ sign_assertion=True,
+ encrypt_assertion=False,
+ encrypt_assertion_self_contained=True,
+ pefim=True,
+ encrypt_cert_advice=cert_str,
+ )
+
+ sresponse = response_from_string(signed_resp)
+
+ valid = self.server.sec.verify_signature(
+ signed_resp, self.server.config.cert_file,
+ node_name='urn:oasis:names:tc:SAML:2.0:protocol:Response',
+ node_id=sresponse.id, id_attr="")
+
+ assert valid
+
+ valid = self.server.sec.verify_signature(
+ signed_resp, self.server.config.cert_file,
+ node_name='urn:oasis:names:tc:SAML:2.0:assertion:Assertion',
+ node_id=sresponse.assertion[0].id, id_attr="")
+
+ assert valid
+
+ _, key_file = make_temp(cert_key_str, decode=False)
+
+ decr_text = self.server.sec.decrypt(signed_resp, key_file)
+
+ resp = samlp.response_from_string(decr_text)
+
+ assert resp.assertion[0].advice.encrypted_assertion[0].extension_elements
+
+ assertion = extension_elements_to_elements(
+ resp.assertion[0].advice.encrypted_assertion[0].extension_elements,
+ [saml, samlp])
+
+ self.verify_assertion(assertion)
+
+
+
+ #PEFIM never signs assertions.
+ assert assertion[0].signature is None
+ #valid = self.server.sec.verify_signature(decr_text,
+ # self.server.config.cert_file,
+ # node_name='urn:oasis:names:tc:SAML:2.0:assertion:Assertion',
+ # node_id=assertion[0].id,
+ # id_attr="")
+ assert valid
+
+ def test_encrypted_signed_response_2(self):
+ cert_str, cert_key_str = generate_cert()
+
+ signed_resp = self.server.create_authn_response(
+ self.ava,
+ "id12", # in_response_to
+ "http://lingon.catalogix.se:8087/", # consumer_url
+ "urn:mace:example.com:saml:roland:sp", # sp_entity_id
+ name_id=self.name_id,
+ sign_response=True,
+ sign_assertion=False,
+ encrypt_assertion=True,
+ encrypt_assertion_self_contained=True,
+ )
+
+ sresponse = response_from_string(signed_resp)
+
+ valid = self.server.sec.verify_signature(signed_resp,
+ self.server.config.cert_file,
+ node_name='urn:oasis:names:tc:SAML:2.0:protocol:Response',
+ node_id=sresponse.id,
+ id_attr="")
+ assert valid
+
+ decr_text_old = copy.deepcopy("%s" % signed_resp)
+
+ decr_text = self.server.sec.decrypt(signed_resp, self.client.config.encryption_keypairs[0]["key_file"])
+
+ assert decr_text == decr_text_old
+
+ decr_text = self.server.sec.decrypt(signed_resp, self.client.config.encryption_keypairs[1]["key_file"])
+
+ assert decr_text != decr_text_old
+
+ resp = samlp.response_from_string(decr_text)
+
+ resp.assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp])
+
+ assert resp.assertion[0].signature == None
+
+ self.verify_assertion(resp.assertion)
+
+
+ def test_encrypted_signed_response_3(self):
+ cert_str, cert_key_str = generate_cert()
+
+ signed_resp = self.server.create_authn_response(
+ self.ava,
+ "id12", # in_response_to
+ "http://lingon.catalogix.se:8087/", # consumer_url
+ "urn:mace:example.com:saml:roland:sp", # sp_entity_id
+ name_id=self.name_id,
+ sign_response=True,
+ sign_assertion=True,
+ encrypt_assertion=True,
+ encrypt_assertion_self_contained=False,
+ encrypt_cert_assertion=cert_str,
+ )
+
+ sresponse = response_from_string(signed_resp)
+
+ valid = self.server.sec.verify_signature(signed_resp,
+ self.server.config.cert_file,
+ node_name='urn:oasis:names:tc:SAML:2.0:protocol:Response',
+ node_id=sresponse.id,
+ id_attr="")
+ assert valid
+
+ _, key_file = make_temp(cert_key_str, decode=False)
+
+ decr_text = self.server.sec.decrypt(signed_resp, key_file)
+
+ resp = samlp.response_from_string(decr_text)
+
+ resp.assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp])
+
+ valid = self.server.sec.verify_signature(decr_text,
+ self.server.config.cert_file,
+ node_name='urn:oasis:names:tc:SAML:2.0:assertion:Assertion',
+ node_id=resp.assertion[0].id,
+ id_attr="")
+
+ assert valid
+
+ self.verify_assertion(resp.assertion)
+
+ assert 'xmlns:encas' not in decr_text
+
+
+ def test_encrypted_signed_response_4(self):
+
+ cert_str, cert_key_str = generate_cert()
+
+ signed_resp = self.server.create_authn_response(
+ self.ava,
+ "id12", # in_response_to
+ "http://lingon.catalogix.se:8087/", # consumer_url
+ "urn:mace:example.com:saml:roland:sp", # sp_entity_id
+ name_id=self.name_id,
+ sign_response=True,
+ sign_assertion=True,
+ encrypt_assertion=True,
+ encrypt_assertion_self_contained=True,
+ pefim=True,
+ encrypt_cert_advice=cert_str,
+ )
+
+ sresponse = response_from_string(signed_resp)
+
+ valid = self.server.sec.verify_signature(signed_resp,
+ self.server.config.cert_file,
+ node_name='urn:oasis:names:tc:SAML:2.0:protocol:Response',
+ node_id=sresponse.id,
+ id_attr="")
+ assert valid
+
+ decr_text = self.server.sec.decrypt(signed_resp, self.client.config.encryption_keypairs[1]["key_file"])
+
+ resp = samlp.response_from_string(decr_text)
+
+ resp.assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp])
+
+ valid = self.server.sec.verify_signature(decr_text,
+ self.server.config.cert_file,
+ node_name='urn:oasis:names:tc:SAML:2.0:assertion:Assertion',
+ node_id=resp.assertion[0].id,
+ id_attr="")
+
+ assert valid
+
+ _, key_file = make_temp(cert_key_str, decode=False)
+
+ decr_text = self.server.sec.decrypt(decr_text, key_file)
+
+ resp = samlp.response_from_string(decr_text)
+
+ assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp])
+ assertion = \
+ extension_elements_to_elements(assertion[0].advice.encrypted_assertion[0].extension_elements,[saml, samlp])
+ self.verify_assertion(assertion)
+
+ #PEFIM never signs assertion in advice
+ assert assertion[0].signature is None
+ #valid = self.server.sec.verify_signature(decr_text,
+ # self.server.config.cert_file,
+ # node_name='urn:oasis:names:tc:SAML:2.0:assertion:Assertion',
+ # node_id=assertion[0].id,
+ # id_attr="")
+ assert valid
+
+ def test_encrypted_response_1(self):
+ cert_str_advice, cert_key_str_advice = generate_cert()
+
+ _resp = self.server.create_authn_response(
+ self.ava,
+ "id12", # in_response_to
+ "http://lingon.catalogix.se:8087/", # consumer_url
+ "urn:mace:example.com:saml:roland:sp", # sp_entity_id
+ name_id=self.name_id,
+ sign_response=False,
+ sign_assertion=False,
+ encrypt_assertion=False,
+ encrypt_assertion_self_contained=True,
+ pefim=True,
+ encrypt_cert_advice=cert_str_advice,
+ )
+
+ _resp = "%s" % _resp
+
+ sresponse = response_from_string(_resp)
+
+ assert sresponse.signature is None
+
+ _, key_file = make_temp(cert_key_str_advice, decode=False)
+
+ decr_text = self.server.sec.decrypt(_resp, key_file)
+
+ resp = samlp.response_from_string(decr_text)
+
+ self.verify_advice_assertion(resp, decr_text)
+
+ def test_encrypted_response_2(self):
+
+ cert_str_advice, cert_key_str_advice = generate_cert()
+
+ _resp = self.server.create_authn_response(
+ self.ava,
+ "id12", # in_response_to
+ "http://lingon.catalogix.se:8087/", # consumer_url
+ "urn:mace:example.com:saml:roland:sp", # sp_entity_id
+ name_id=self.name_id,
+ sign_response=False,
+ sign_assertion=False,
+ encrypt_assertion=True,
+ encrypt_assertion_self_contained=True,
+ pefim=True,
+ encrypt_cert_advice=cert_str_advice,
+ )
+
+ sresponse = response_from_string(_resp)
+
+ assert sresponse.signature is None
+
+ decr_text_1 = self.server.sec.decrypt(_resp, self.client.config.encryption_keypairs[1]["key_file"])
+
+ _, key_file = make_temp(cert_key_str_advice, decode=False)
+
+ decr_text_2 = self.server.sec.decrypt(decr_text_1, key_file)
+
+ resp = samlp.response_from_string(decr_text_2)
+
+ resp.assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp])
+
+ self.verify_advice_assertion(resp, decr_text_2)
+
+ def test_encrypted_response_3(self):
+ cert_str_assertion, cert_key_str_assertion = generate_cert()
+
+ _resp = self.server.create_authn_response(
+ self.ava,
+ "id12", # in_response_to
+ "http://lingon.catalogix.se:8087/", # consumer_url
+ "urn:mace:example.com:saml:roland:sp", # sp_entity_id
+ name_id=self.name_id,
+ sign_response=False,
+ sign_assertion=False,
+ encrypt_assertion=True,
+ encrypt_assertion_self_contained=True,
+ encrypted_advice_attributes=False,
+ encrypt_cert_assertion=cert_str_assertion
+ )
+
+ sresponse = response_from_string(_resp)
+
+ assert sresponse.signature is None
+
+ _, key_file = make_temp(cert_key_str_assertion, decode=False)
+
+ decr_text = self.server.sec.decrypt(_resp, key_file)
+
+ resp = samlp.response_from_string(decr_text)
+
+ assert resp.encrypted_assertion[0].extension_elements
+
+ assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp])
+
+ self.verify_encrypted_assertion(assertion, decr_text)
+
+ def test_encrypted_response_4(self):
+ _resp = self.server.create_authn_response(
+ self.ava,
+ "id12", # in_response_to
+ "http://lingon.catalogix.se:8087/", # consumer_url
+ "urn:mace:example.com:saml:roland:sp", # sp_entity_id
+ name_id=self.name_id,
+ sign_response=False,
+ sign_assertion=False,
+ encrypt_assertion=True,
+ encrypt_assertion_self_contained=True,
+ encrypted_advice_attributes=False,
+ )
+
+ sresponse = response_from_string(_resp)
+
+ assert sresponse.signature is None
+
+ decr_text = self.server.sec.decrypt(_resp, self.client.config.encryption_keypairs[1]["key_file"])
+
+ resp = samlp.response_from_string(decr_text)
+
+ assert resp.encrypted_assertion[0].extension_elements
+
+ assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp])
+
+ self.verify_encrypted_assertion(assertion, decr_text)
+
+ def test_encrypted_response_5(self):
+ _resp = self.server.create_authn_response(
+ self.ava,
+ "id12", # in_response_to
+ "http://lingon.catalogix.se:8087/", # consumer_url
+ "urn:mace:example.com:saml:roland:sp", # sp_entity_id
+ name_id=self.name_id,
+ sign_response=False,
+ sign_assertion=False,
+ encrypt_assertion=False,
+ encrypt_assertion_self_contained=True,
+ pefim=True
+ )
+
+ _resp = "%s" % _resp
+
+ sresponse = response_from_string(_resp)
+
+ assert sresponse.signature is None
+
+ decr_text = self.server.sec.decrypt(_resp, self.client.config.encryption_keypairs[1]["key_file"])
+
+ resp = samlp.response_from_string(decr_text)
+
+ self.verify_advice_assertion(resp, decr_text)
+
+ def test_encrypted_response_6(self):
+ _server = Server("idp_conf_verify_cert")
+
+ cert_str_advice, cert_key_str_advice = generate_cert()
+
+ cert_str_assertion, cert_key_str_assertion = generate_cert()
+
+ _resp = _server.create_authn_response(
+ self.ava,
+ "id12", # in_response_to
+ "http://lingon.catalogix.se:8087/", # consumer_url
+ "urn:mace:example.com:saml:roland:sp", # sp_entity_id
+ name_id=self.name_id,
+ sign_response=False,
+ sign_assertion=False,
+ encrypt_assertion=True,
+ encrypt_assertion_self_contained=True,
+ pefim=True,
+ encrypt_cert_advice=cert_str_advice,
+ encrypt_cert_assertion=cert_str_assertion
+ )
+
+ sresponse = response_from_string(_resp)
+
+ assert sresponse.signature is None
+
+ _, key_file = make_temp(cert_key_str_assertion, decode=False)
+
+ decr_text_1 = _server.sec.decrypt(_resp, key_file)
+
+ _, key_file = make_temp(cert_key_str_advice, decode=False)
+
+ decr_text_2 = _server.sec.decrypt(decr_text_1, key_file)
+
+ resp = samlp.response_from_string(decr_text_2)
+
+ resp.assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp])
+
+ self.verify_advice_assertion(resp, decr_text_2)
+
+ def test_encrypted_response_7(self):
+ _resp = self.server.create_authn_response(
+ self.ava,
+ "id12", # in_response_to
+ "http://lingon.catalogix.se:8087/", # consumer_url
+ "urn:mace:example.com:saml:roland:sp", # sp_entity_id
+ name_id=self.name_id,
+ sign_response=False,
+ sign_assertion=False,
+ encrypt_assertion=True,
+ encrypt_assertion_self_contained=True,
+ pefim=True
+ )
+
+ sresponse = response_from_string(_resp)
+
+ assert sresponse.signature is None
+
+ decr_text_1 = self.server.sec.decrypt(_resp, self.client.config.encryption_keypairs[1]["key_file"])
+
+ decr_text_2 = self.server.sec.decrypt(decr_text_1, self.client.config.encryption_keypairs[1]["key_file"])
+
+ resp = samlp.response_from_string(decr_text_2)
+
+ resp.assertion = extension_elements_to_elements(resp.encrypted_assertion[0].extension_elements, [saml, samlp])
+
+ self.verify_advice_assertion(resp, decr_text_2)
+
+ def test_encrypted_response_8(self):
+ try:
+ _resp = self.server.create_authn_response(
+ self.ava,
+ "id12", # in_response_to
+ "http://lingon.catalogix.se:8087/", # consumer_url
+ "urn:mace:example.com:saml:roland:sp", # sp_entity_id
+ name_id=self.name_id,
+ sign_response=False,
+ sign_assertion=False,
+ encrypt_assertion=True,
+ encrypt_assertion_self_contained=True,
+ pefim=True,
+ encrypt_cert_advice="whatever",
+ encrypt_cert_assertion="whatever"
+ )
+ assert False, "Must throw an exception"
+ except EncryptError as ex:
+ pass
+ except Exception as ex:
+ assert False, "Wrong exception!"
+
+ try:
+ _resp = self.server.create_authn_response(
+ self.ava,
+ "id12", # in_response_to
+ "http://lingon.catalogix.se:8087/", # consumer_url
+ "urn:mace:example.com:saml:roland:sp", # sp_entity_id
+ name_id=self.name_id,
+ sign_response=False,
+ sign_assertion=False,
+ encrypt_assertion=False,
+ encrypt_assertion_self_contained=True,
+ pefim=True,
+ encrypt_cert_advice="whatever",
+ )
+ assert False, "Must throw an exception"
+ except EncryptError as ex:
+ pass
+ except Exception as ex:
+ assert False, "Wrong exception!"
+
+ try:
+ _resp = self.server.create_authn_response(
+ self.ava,
+ "id12", # in_response_to
+ "http://lingon.catalogix.se:8087/", # consumer_url
+ "urn:mace:example.com:saml:roland:sp", # sp_entity_id
+ name_id=self.name_id,
+ sign_response=False,
+ sign_assertion=False,
+ encrypt_assertion=True,
+ encrypt_assertion_self_contained=True,
+ encrypted_advice_attributes=False,
+ encrypt_cert_assertion="whatever"
+ )
+ assert False, "Must throw an exception"
+ except EncryptError as ex:
+ pass
+ except Exception as ex:
+ assert False, "Wrong exception!"
+
+ _server = Server("idp_conf_verify_cert")
+
+ try:
+ _resp = _server.create_authn_response(
+ self.ava,
+ "id12", # in_response_to
+ "http://lingon.catalogix.se:8087/", # consumer_url
+ "urn:mace:example.com:saml:roland:sp", # sp_entity_id
+ name_id=self.name_id,
+ sign_response=False,
+ sign_assertion=False,
+ encrypt_assertion=True,
+ encrypt_assertion_self_contained=True,
+ pefim=True,
+ encrypt_cert_advice="whatever",
+ encrypt_cert_assertion="whatever"
+ )
+ assert False, "Must throw an exception"
+ except CertificateError as ex:
+ pass
+ except Exception as ex:
+ assert False, "Wrong exception!"
+
+ try:
+ _resp = _server.create_authn_response(
+ self.ava,
+ "id12", # in_response_to
+ "http://lingon.catalogix.se:8087/", # consumer_url
+ "urn:mace:example.com:saml:roland:sp", # sp_entity_id
+ name_id=self.name_id,
+ sign_response=False,
+ sign_assertion=False,
+ encrypt_assertion=False,
+ encrypt_assertion_self_contained=True,
+ pefim=True,
+ encrypt_cert_advice="whatever",
+ )
+ assert False, "Must throw an exception"
+ except CertificateError as ex:
+ pass
+ except Exception as ex:
+ assert False, "Wrong exception!"
+
+ try:
+ _resp = _server.create_authn_response(
+ self.ava,
+ "id12", # in_response_to
+ "http://lingon.catalogix.se:8087/", # consumer_url
+ "urn:mace:example.com:saml:roland:sp", # sp_entity_id
+ name_id=self.name_id,
+ sign_response=False,
+ sign_assertion=False,
+ encrypt_assertion=True,
+ encrypt_assertion_self_contained=True,
+ encrypted_advice_attributes=False,
+ encrypt_cert_assertion="whatever"
+ )
+ assert False, "Must throw an exception"
+ except CertificateError as ex:
+ pass
+ except Exception as ex:
+ assert False, "Wrong exception!"
+
+ def test_encrypted_response_9(self):
+ _server = Server("idp_conf_sp_no_encrypt")
+
+ _resp = _server.create_authn_response(
+ self.ava,
+ "id12", # in_response_to
+ "http://lingon.catalogix.se:8087/", # consumer_url
+ "urn:mace:example.com:saml:roland:sp", # sp_entity_id
+ name_id=self.name_id,
+ sign_response=False,
+ sign_assertion=False,
+ encrypt_assertion=True,
+ encrypt_assertion_self_contained=True,
+ pefim=True,
+ )
+
+ self.verify_assertion(_resp.assertion.advice.assertion)
+
+ _resp = _server.create_authn_response(
+ self.ava,
+ "id12", # in_response_to
+ "http://lingon.catalogix.se:8087/", # consumer_url
+ "urn:mace:example.com:saml:roland:sp", # sp_entity_id
+ name_id=self.name_id,
+ sign_response=False,
+ sign_assertion=False,
+ encrypt_assertion=False,
+ encrypt_assertion_self_contained=True,
+ pefim=True
+ )
+
+ self.verify_assertion(_resp.assertion.advice.assertion)
+
+ _resp = _server.create_authn_response(
+ self.ava,
+ "id12", # in_response_to
+ "http://lingon.catalogix.se:8087/", # consumer_url
+ "urn:mace:example.com:saml:roland:sp", # sp_entity_id
+ name_id=self.name_id,
+ sign_response=False,
+ sign_assertion=False,
+ encrypt_assertion=True,
+ encrypt_assertion_self_contained=True,
+ encrypted_advice_attributes=False,
+ )
+
+ self.verify_assertion([_resp.assertion])
+
+
+ def test_slo_http_post(self):
+ soon = time_util.in_a_while(days=1)
+ sinfo = {
+ "name_id": nid,
+ "issuer": "urn:mace:example.com:saml:roland:idp",
+ "not_on_or_after": soon,
+ "user": {
+ "givenName": "Leo",
+ "sn": "Laport",
+ }
+ }
+ self.client.users.add_information_about_person(sinfo)
+
+ req_id, logout_request = self.client.create_logout_request(
+ destination="http://localhost:8088/slop", name_id=nid,
+ issuer_entity_id="urn:mace:example.com:saml:roland:idp",
+ reason="I'm tired of this")
+
+ intermed = base64.b64encode(str(logout_request).encode('utf-8'))
+
+ #saml_soap = make_soap_enveloped_saml_thingy(logout_request)
+ request = self.server.parse_logout_request(intermed, BINDING_HTTP_POST)
+ assert request
+
+ def test_slo_soap(self):
+ soon = time_util.in_a_while(days=1)
+ sinfo = {
+ "name_id": nid,
+ "issuer": "urn:mace:example.com:saml:roland:idp",
+ "not_on_or_after": soon,
+ "user": {
+ "givenName": "Leo",
+ "sn": "Laport",
+ }
+ }
+
+ sp = client.Saml2Client(config_file="server_conf")
+ sp.users.add_information_about_person(sinfo)
+
+ req_id, logout_request = sp.create_logout_request(
+ name_id=nid, destination="http://localhost:8088/slo",
+ issuer_entity_id="urn:mace:example.com:saml:roland:idp",
+ reason="I'm tired of this")
+
+ #_ = s_utils.deflate_and_base64_encode("%s" % (logout_request,))
+
+ saml_soap = make_soap_enveloped_saml_thingy(logout_request)
+ self.server.ident.close()
+
+ with closing(Server("idp_soap_conf")) as idp:
+ request = idp.parse_logout_request(saml_soap)
+ idp.ident.close()
+ assert request
+
+# ------------------------------------------------------------------------
+
IDENTITY = {"eduPersonAffiliation": ["staff", "member"],
"sn": ["Jeter"], "givenName": ["Derek"],