diff options
author | Hans Hörberg <hans.horberg@umu.se> | 2015-05-21 16:23:25 +0200 |
---|---|---|
committer | Hans Hörberg <hans.horberg@umu.se> | 2015-05-21 16:23:25 +0200 |
commit | e2b04612470264bda302d232a801db02dfed9ff6 (patch) | |
tree | 777c96691352136921fc0a2b1e8b1fb7043e5d67 | |
parent | e70835bb25b59ffdfd1fd3dfe392de2df8e69941 (diff) | |
download | pysaml2-e2b04612470264bda302d232a801db02dfed9ff6.tar.gz |
Pysaml can now decrypt multiple encrypted assertions with multiple advice elements with multiple encrypted assertions.
-rw-r--r-- | src/saml2/__init__.py | 15 | ||||
-rw-r--r-- | src/saml2/client_base.py | 5 | ||||
-rw-r--r-- | src/saml2/entity.py | 20 | ||||
-rw-r--r-- | src/saml2/response.py | 111 | ||||
-rw-r--r-- | src/saml2/sigver.py | 30 | ||||
-rw-r--r-- | tests/test_51_client.py | 463 |
6 files changed, 575 insertions, 69 deletions
diff --git a/src/saml2/__init__.py b/src/saml2/__init__.py index 87bfafa6..4493c9c8 100644 --- a/src/saml2/__init__.py +++ b/src/saml2/__init__.py @@ -591,13 +591,14 @@ class SamlBase(ExtensionContainer): def get_xml_string_with_self_contained_assertion_within_advice_encrypted_assertion(self, assertion_tag, advice_tag): for tmp_encrypted_assertion in self.assertion.advice.encrypted_assertion: - prefix_map = self.get_prefix_map([tmp_encrypted_assertion._to_element_tree(). - find(assertion_tag)]) - - tree = self._to_element_tree() - - self.set_prefixes(tree.find(assertion_tag).find(advice_tag).find(tmp_encrypted_assertion._to_element_tree() - .tag).find(assertion_tag), prefix_map) + if tmp_encrypted_assertion.encrypted_data is None: + prefix_map = self.get_prefix_map([tmp_encrypted_assertion._to_element_tree().find(assertion_tag)]) + tree = self._to_element_tree() + encs = tree.find(assertion_tag).find(advice_tag).findall(tmp_encrypted_assertion._to_element_tree().tag) + for enc in encs: + assertion = enc.find(assertion_tag) + if assertion is not None: + self.set_prefixes(assertion, prefix_map) return ElementTree.tostring(tree, encoding="UTF-8") diff --git a/src/saml2/client_base.py b/src/saml2/client_base.py index 3f7a227b..da35ae9c 100644 --- a/src/saml2/client_base.py +++ b/src/saml2/client_base.py @@ -542,7 +542,7 @@ class Base(Entity): # ======== response handling =========== def parse_authn_request_response(self, xmlstr, binding, outstanding=None, - outstanding_certs=None, decrypt=True, pefim=False): + outstanding_certs=None): """ Deal with an AuthnResponse :param xmlstr: The reply as a xml string @@ -573,12 +573,11 @@ class Base(Entity): "attribute_converters": self.config.attribute_converters, "allow_unknown_attributes": self.config.allow_unknown_attributes, - "decrypt": decrypt } try: resp = self._parse_response(xmlstr, AuthnResponse, "assertion_consumer_service", - binding, pefim=pefim, **kwargs) + binding, **kwargs) except StatusError as err: logger.error("SAML status error: %s" % err) raise diff --git a/src/saml2/entity.py b/src/saml2/entity.py index e7a75a5a..e7ef879c 100644 --- a/src/saml2/entity.py +++ b/src/saml2/entity.py @@ -978,7 +978,7 @@ class Entity(HTTPBase): # ------------------------------------------------------------------------ def _parse_response(self, xmlstr, response_cls, service, binding, - outstanding_certs=None, pefim=False, **kwargs): + outstanding_certs=None, **kwargs): """ Deal with a Response :param xmlstr: The response as a xml string @@ -1040,23 +1040,23 @@ class Entity(HTTPBase): logger.debug("XMLSTR: %s" % xmlstr) if response: + keys = None if outstanding_certs: try: cert = outstanding_certs[response.in_response_to] except KeyError: - key_file = "" + keys = None else: - _, key_file = make_temp("%s" % cert["key"], - decode=False) - else: - key_file = "" + if not isinstance(cert, list): + cert = [cert] + keys = [] + for _cert in cert: + keys.append(_cert["key"]) only_identity_in_encrypted_assertion = False if "only_identity_in_encrypted_assertion" in kwargs: only_identity_in_encrypted_assertion = kwargs["only_identity_in_encrypted_assertion"] - decrypt = True - if "decrypt" in kwargs: - decrypt = kwargs["decrypt"] - response = response.verify(key_file, decrypt=decrypt, pefim=pefim) + + response = response.verify(keys) if not response: return None diff --git a/src/saml2/response.py b/src/saml2/response.py index d46a0b0f..6f733280 100644 --- a/src/saml2/response.py +++ b/src/saml2/response.py @@ -395,7 +395,7 @@ class StatusResponse(object): def loads(self, xmldata, decode=True, origxml=None): return self._loads(xmldata, decode, origxml) - def verify(self, key_file="", decrypt=True, pefim=False): + def verify(self, keys=None): try: return self._verify() except AssertionError: @@ -636,18 +636,19 @@ class AuthnResponse(StatusResponse): """ ava = {} - if self.assertion.advice: - if self.assertion.advice.assertion: - for tmp_assertion in self.assertion.advice.assertion: - if tmp_assertion.attribute_statement: - assert len(tmp_assertion.attribute_statement) == 1 - ava.update(self.read_attribute_statement(tmp_assertion.attribute_statement[0])) - if self.assertion.attribute_statement: - assert len(self.assertion.attribute_statement) == 1 - _attr_statem = self.assertion.attribute_statement[0] - ava.update(self.read_attribute_statement(_attr_statem)) - if not ava: - logger.error("Missing Attribute Statement") + for _assertion in self.assertions: + if _assertion.advice: + if _assertion.advice.assertion: + for tmp_assertion in _assertion.advice.assertion: + if tmp_assertion.attribute_statement: + assert len(tmp_assertion.attribute_statement) == 1 + ava.update(self.read_attribute_statement(tmp_assertion.attribute_statement[0])) + if _assertion.attribute_statement: + assert len(_assertion.attribute_statement) == 1 + _attr_statem = _assertion.attribute_statement[0] + ava.update(self.read_attribute_statement(_attr_statem)) + if not ava: + logger.error("Missing Attribute Statement") return ava def _bearer_confirmed(self, data): @@ -796,14 +797,14 @@ class AuthnResponse(StatusResponse): logger.exception("get subject") raise - def decrypt_assertions(self, encrypted_assertions, decr_txt, issuer=None): + def decrypt_assertions(self, encrypted_assertions, decr_txt, issuer=None, verified=False): res = [] for encrypted_assertion in encrypted_assertions: if encrypted_assertion.extension_elements: assertions = extension_elements_to_elements( encrypted_assertion.extension_elements, [saml, samlp]) for assertion in assertions: - if assertion.signature: + if assertion.signature and not verified: if not self.sec.check_signature( assertion, origdoc=decr_txt, node_name=class_name(assertion), issuer=issuer): @@ -812,7 +813,35 @@ class AuthnResponse(StatusResponse): res.append(assertion) return res - def parse_assertion(self, key_file="", decrypt=True, pefim=False): + def find_encrypt_data_assertion(self, enc_assertions): + for _assertion in enc_assertions: + if _assertion.encrypted_data is not None: + return True + + def find_encrypt_data_assertion_list(self, _assertions): + for _assertion in _assertions: + if _assertion.advice: + if _assertion.advice.encrypted_assertion: + res = self.find_encrypt_data_assertion(_assertion.advice.encrypted_assertion) + if res: + return True + + def find_encrypt_data(self, resp): + _has_encrypt_data = False + if resp.encrypted_assertion: + res = self.find_encrypt_data_assertion(resp.encrypted_assertion) + if res: + return True + if resp.assertion: + for tmp_assertion in resp.assertion: + if tmp_assertion.advice: + if tmp_assertion.advice.encrypted_assertion: + res = self.find_encrypt_data_assertion(tmp_assertion.advice.encrypted_assertion) + if res: + return True + return False + + def parse_assertion(self, keys=None): if self.context == "AuthnQuery": # can contain one or more assertions pass @@ -823,13 +852,13 @@ class AuthnResponse(StatusResponse): except AssertionError: raise Exception("No assertion part") - has_encrypted_assertions = self.response.encrypted_assertion - if not has_encrypted_assertions and self.response.assertion: - for tmp_assertion in self.response.assertion: - if tmp_assertion.advice: - if tmp_assertion.advice.encrypted_assertion: - has_encrypted_assertions = True - break + has_encrypted_assertions = self.find_encrypt_data(self.response) #self.response.encrypted_assertion + #if not has_encrypted_assertions and self.response.assertion: + # for tmp_assertion in self.response.assertion: + # if tmp_assertion.advice: + # if tmp_assertion.advice.encrypted_assertion: + # has_encrypted_assertions = True + # break if self.response.assertion: logger.debug("***Unencrypted assertion***") @@ -837,16 +866,25 @@ class AuthnResponse(StatusResponse): if not self._assertion(assertion, False): return False - if has_encrypted_assertions and decrypt: + if has_encrypted_assertions: _enc_assertions = [] logger.debug("***Encrypted assertion/-s***") - decr_text = self.sec.decrypt(self.xmlstr, key_file) - resp = samlp.response_from_string(decr_text) + decr_text = "%s" % self.response + resp = self.response + while self.find_encrypt_data(resp): + decr_text = self.sec.decrypt_keys(decr_text, keys) + resp = samlp.response_from_string(decr_text) _enc_assertions = self.decrypt_assertions(resp.encrypted_assertion, decr_text) - decr_text = self.sec.decrypt(decr_text, key_file) - resp = samlp.response_from_string(decr_text) + while self.find_encrypt_data(resp) or self.find_encrypt_data_assertion_list(_enc_assertions): + decr_text = self.sec.decrypt_keys(decr_text, keys) + resp = samlp.response_from_string(decr_text) + _enc_assertions = self.decrypt_assertions(resp.encrypted_assertion, decr_text, verified=True) + #_enc_assertions = self.decrypt_assertions(resp.encrypted_assertion, decr_text, verified=True) + all_assertions = _enc_assertions if resp.assertion: - for tmp_ass in resp.assertion: + all_assertions = all_assertions + resp.assertion + if len(all_assertions) > 0: + for tmp_ass in all_assertions: if tmp_ass.advice and tmp_ass.advice.encrypted_assertion: advice_res = self.decrypt_assertions(tmp_ass.advice.encrypted_assertion, decr_text, @@ -855,21 +893,20 @@ class AuthnResponse(StatusResponse): tmp_ass.advice.assertion.extend(advice_res) else: tmp_ass.advice.assertion = advice_res - if not pefim: - _enc_assertions.extend(advice_res) tmp_ass.advice.encrypted_assertion = [] self.response.assertion = resp.assertion for assertion in _enc_assertions: if not self._assertion(assertion, True): return False + else: + self.assertions.append(assertion) + self.xmlstr = decr_text self.response.encrypted_assertion = [] if self.response.assertion: for assertion in self.response.assertion: - if assertion.advice and assertion.advice.assertion: - for advice_assertion in assertion.advice.assertion: - self.assertions.append(assertion) + self.assertions.append(assertion) if self.assertions and len(self.assertions) > 0: self.assertion = self.assertions[0] @@ -880,7 +917,7 @@ class AuthnResponse(StatusResponse): return True - def verify(self, key_file="", decrypt=True, pefim=False): + def verify(self, keys=None): """ Verify that the assertion is syntactically correct and the signature is correct if present. :param key_file: If not the default key file should be used this is it. @@ -898,7 +935,7 @@ class AuthnResponse(StatusResponse): if not isinstance(self.response, samlp.Response): return self - if self.parse_assertion(key_file, decrypt=decrypt, pefim=pefim): + if self.parse_assertion(keys): return self else: logger.error("Could not parse the assertion") @@ -1126,7 +1163,7 @@ class AssertionIDResponse(object): return self._postamble() - def verify(self, key_file="", decrypt=True, pefim=False): + def verify(self, keys=None): try: valid_instance(self.response) except NotValid as exc: diff --git a/src/saml2/sigver.py b/src/saml2/sigver.py index 096134e2..ca928686 100644 --- a/src/saml2/sigver.py +++ b/src/saml2/sigver.py @@ -769,7 +769,7 @@ class CryptoBackendXmlSec1(CryptoBackend): return output def encrypt_assertion(self, statement, enc_key, template, - key_type="des-192", node_xpath=None): + key_type="des-192", node_xpath=None, node_id=None): """ Will encrypt an assertion @@ -792,6 +792,8 @@ class CryptoBackendXmlSec1(CryptoBackend): com_list = [self.xmlsec, "encrypt", "--pubkey-cert-pem", enc_key, "--session-key", key_type, "--xml-data", fil, "--node-xpath", node_xpath] + if node_id: + com_list.extend(["--node-id", node_id]) (_stdout, _stderr, output) = self._run_xmlsec( com_list, [tmpl], exception=EncryptError, validate_output=False) @@ -1300,22 +1302,38 @@ class SecurityContext(object): """ raise NotImplemented() - def decrypt(self, enctext, key_file=None): + def decrypt_keys(self, enctext, keys=None): """ Decrypting an encrypted text by the use of a private key. :param enctext: The encrypted text as a string :return: The decrypted text """ + if not isinstance(keys, list): + keys = [keys] _enctext = self.crypto.decrypt(enctext, self.key_file) if _enctext is not None and len(_enctext) > 0: return _enctext - if key_file is not None and len(key_file.strip()) > 0: - _enctext = self.crypto.decrypt(enctext, key_file) - if _enctext is not None and len(_enctext) > 0: - return _enctext + for _key in keys: + if _key is not None and len(_key.strip()) > 0: + _, key_file = make_temp("%s" % _key, decode=False) + _enctext = self.crypto.decrypt(enctext, key_file) + if _enctext is not None and len(_enctext) > 0: + return _enctext + return enctext + + def decrypt(self, enctext, key_file=None): + """ Decrypting an encrypted text by the use of a private key. + + :param enctext: The encrypted text as a string + :return: The decrypted text + """ _enctext = self.crypto.decrypt(enctext, self.key_file) if _enctext is not None and len(_enctext) > 0: return _enctext + if key_file is not None and len(key_file.strip()) > 0: + _enctext = self.crypto.decrypt(enctext, key_file) + if _enctext is not None and len(_enctext) > 0: + return _enctext return enctext def verify_signature(self, signedtext, cert_file=None, cert_type="pem", diff --git a/tests/test_51_client.py b/tests/test_51_client.py index 7affc3e8..e69e337b 100644 --- a/tests/test_51_client.py +++ b/tests/test_51_client.py @@ -27,7 +27,7 @@ from saml2.saml import NAMEID_FORMAT_PERSISTENT, EncryptedAssertion, Advice from saml2.saml import NAMEID_FORMAT_TRANSIENT from saml2.saml import NameID from saml2.server import Server -from saml2.sigver import pre_encryption_part, make_temp +from saml2.sigver import pre_encryption_part, make_temp, pre_encrypt_assertion from saml2.sigver import rm_xmltag from saml2.sigver import verify_redirect_signature from saml2.s_utils import do_attribute_statement @@ -434,7 +434,7 @@ class TestClient: authn_response = _client.parse_authn_request_response( resp_str, BINDING_HTTP_POST, - {"id1": "http://foo.example.com/service"}, {"id1": cert}, pefim=True) + {"id1": "http://foo.example.com/service"}, {"id1": cert}) self.verify_authn_response(idp, authn_response, _client, ava_verify) @@ -471,7 +471,225 @@ class TestClient: authn_response = _client.parse_authn_request_response( resp_str, BINDING_HTTP_POST, - {"id1": "http://foo.example.com/service"}, pefim=True) + {"id1": "http://foo.example.com/service"}) + + self.verify_authn_response(idp, authn_response, _client, ava_verify) + + def test_response_4(self): + conf = config.SPConfig() + conf.load_file("server_conf") + _client = Saml2Client(conf) + + idp, ava, ava_verify, nameid_policy = self.setup_verify_authn_response() + + self.name_id = self.server.ident.transient_nameid( + "urn:mace:example.com:saml:roland:sp", "id1") + + resp = self.server.create_authn_response( + identity=ava, + in_response_to="id1", + destination="http://lingon.catalogix.se:8087/", + sp_entity_id="urn:mace:example.com:saml:roland:sp", + #name_id_policy=nameid_policy, + name_id=self.name_id, + userid="foba0001@example.com", + authn=AUTHN, + sign_response=True, + sign_assertion=True, + encrypt_assertion=True, + encrypt_assertion_self_contained=True, + #encrypted_advice_attributes=True, + pefim=True, + ) + + resp_str = "%s" % resp + + resp_str = base64.encodestring(resp_str) + + authn_response = _client.parse_authn_request_response( + resp_str, BINDING_HTTP_POST, + {"id1": "http://foo.example.com/service"}) + + self.verify_authn_response(idp, authn_response, _client, ava_verify) + + def test_response_5(self): + conf = config.SPConfig() + conf.load_file("server_conf") + _client = Saml2Client(conf) + + idp, ava, ava_verify, nameid_policy = self.setup_verify_authn_response() + + self.name_id = self.server.ident.transient_nameid( + "urn:mace:example.com:saml:roland:sp", "id1") + + cert_str, cert_key_str = generate_cert() + + cert =\ + { + "cert": cert_str, + "key": cert_key_str + } + + resp = self.server.create_authn_response( + identity=ava, + in_response_to="id1", + destination="http://lingon.catalogix.se:8087/", + sp_entity_id="urn:mace:example.com:saml:roland:sp", + #name_id_policy=nameid_policy, + name_id=self.name_id, + userid="foba0001@example.com", + authn=AUTHN, + sign_response=True, + sign_assertion=True, + encrypt_assertion=True, + encrypt_assertion_self_contained=True, + #encrypted_advice_attributes=True, + pefim=True, + encrypt_cert_assertion=cert_str + ) + + resp_str = "%s" % resp + + resp_str = base64.encodestring(resp_str) + + authn_response = _client.parse_authn_request_response( + resp_str, BINDING_HTTP_POST, + {"id1": "http://foo.example.com/service"}, {"id1": cert}) + + self.verify_authn_response(idp, authn_response, _client, ava_verify) + + def test_response_6(self): + conf = config.SPConfig() + conf.load_file("server_conf") + _client = Saml2Client(conf) + + idp, ava, ava_verify, nameid_policy = self.setup_verify_authn_response() + + self.name_id = self.server.ident.transient_nameid( + "urn:mace:example.com:saml:roland:sp", "id1") + + cert_assertion_str, cert_key_assertion_str = generate_cert() + + cert_assertion =\ + { + "cert": cert_assertion_str, + "key": cert_key_assertion_str + } + + cert_advice_str, cert_key_advice_str = generate_cert() + + cert_advice =\ + { + "cert": cert_advice_str, + "key": cert_key_advice_str + } + + resp = self.server.create_authn_response( + identity=ava, + in_response_to="id1", + destination="http://lingon.catalogix.se:8087/", + sp_entity_id="urn:mace:example.com:saml:roland:sp", + #name_id_policy=nameid_policy, + name_id=self.name_id, + userid="foba0001@example.com", + authn=AUTHN, + sign_response=True, + sign_assertion=True, + encrypt_assertion=True, + encrypt_assertion_self_contained=True, + #encrypted_advice_attributes=True, + pefim=True, + encrypt_cert_assertion=cert_assertion_str, + encrypt_cert_advice=cert_advice_str + ) + + resp_str = "%s" % resp + + resp_str = base64.encodestring(resp_str) + + authn_response = _client.parse_authn_request_response( + resp_str, BINDING_HTTP_POST, + {"id1": "http://foo.example.com/service"}, {"id1": [cert_assertion, cert_advice]}) + + self.verify_authn_response(idp, authn_response, _client, ava_verify) + + def test_response_7(self): + conf = config.SPConfig() + conf.load_file("server_conf") + _client = Saml2Client(conf) + + idp, ava, ava_verify, nameid_policy = self.setup_verify_authn_response() + + self.name_id = self.server.ident.transient_nameid( + "urn:mace:example.com:saml:roland:sp", "id1") + + resp = self.server.create_authn_response( + identity=ava, + in_response_to="id1", + destination="http://lingon.catalogix.se:8087/", + sp_entity_id="urn:mace:example.com:saml:roland:sp", + #name_id_policy=nameid_policy, + name_id=self.name_id, + userid="foba0001@example.com", + authn=AUTHN, + sign_response=True, + sign_assertion=True, + encrypt_assertion=True, + encrypt_assertion_self_contained=True, + encrypted_advice_attributes=True, + ) + + resp_str = "%s" % resp + + resp_str = base64.encodestring(resp_str) + + authn_response = _client.parse_authn_request_response( + resp_str, BINDING_HTTP_POST, + {"id1": "http://foo.example.com/service"}) + + self.verify_authn_response(idp, authn_response, _client, ava_verify) + + def test_response_8(self): + conf = config.SPConfig() + conf.load_file("server_conf") + _client = Saml2Client(conf) + + idp, ava, ava_verify, nameid_policy = self.setup_verify_authn_response() + + self.name_id = self.server.ident.transient_nameid( + "urn:mace:example.com:saml:roland:sp", "id1") + + cert_str, cert_key_str = generate_cert() + + cert =\ + { + "cert": cert_str, + "key": cert_key_str + } + + resp = self.server.create_authn_response( + identity=ava, + in_response_to="id1", + destination="http://lingon.catalogix.se:8087/", + sp_entity_id="urn:mace:example.com:saml:roland:sp", + #name_id_policy=nameid_policy, + name_id=self.name_id, + userid="foba0001@example.com", + authn=AUTHN, + sign_response=True, + sign_assertion=True, + encrypt_assertion=True, + encrypt_assertion_self_contained=True, + encrypt_cert_assertion=cert_str + ) + + resp_str = "%s" % resp + + resp_str = base64.encodestring(resp_str) + + authn_response = _client.parse_authn_request_response( + resp_str, BINDING_HTTP_POST, + {"id1": "http://foo.example.com/service"}, {"id1": cert}) self.verify_authn_response(idp, authn_response, _client, ava_verify) @@ -486,7 +704,7 @@ class TestClient: def verify_authn_response(self, idp, authn_response, _client, ava_verify): assert authn_response is not None assert authn_response.issuer() == idp - assert authn_response.response.assertion[0].issuer.text == idp + assert authn_response.assertion.issuer.text == idp session_info = authn_response.session_info() assert session_info["ava"] == ava_verify @@ -634,7 +852,7 @@ class TestClient: assert resp.assertion assert resp.ava == {'givenName': ['Derek'], 'sn': ['Jeter']} - def test_sign_then_encrypt_assertion_advice(self): + def test_sign_then_encrypt_assertion_advice_1(self): # Begin with the IdPs side _sec = self.server.sec @@ -712,7 +930,240 @@ class TestClient: assert resp.ava == \ {'sn': ['Jeter'], 'givenName': ['Derek'], 'uid': ['test01'], 'email': ['test.testsson@test.se']} + def test_sign_then_encrypt_assertion_advice_2(self): + # Begin with the IdPs side + _sec = self.server.sec + + nameid_policy = samlp.NameIDPolicy(allow_create="false", + format=saml.NAMEID_FORMAT_PERSISTENT) + + asser_1 = Assertion({"givenName": "Derek"}) + assertion_1 = asser_1.construct( + self.client.config.entityid, "_012345", + "http://lingon.catalogix.se:8087/", + factory(saml.NameID, format=saml.NAMEID_FORMAT_TRANSIENT), + policy=self.server.config.getattr("policy", "idp"), + issuer=self.server._issuer(), + attrconvs=self.server.config.attribute_converters, + authn_class=INTERNETPROTOCOLPASSWORD, + authn_auth="http://www.example.com/login") + + asser_2 = Assertion({"surName": "Jeter"}) + assertion_2 = asser_2.construct( + self.client.config.entityid, "_012345", + "http://lingon.catalogix.se:8087/", + factory(saml.NameID, format=saml.NAMEID_FORMAT_TRANSIENT), + policy=self.server.config.getattr("policy", "idp"), + issuer=self.server._issuer(), + attrconvs=self.server.config.attribute_converters, + authn_class=INTERNETPROTOCOLPASSWORD, + authn_auth="http://www.example.com/login") + + a_asser_1 = Assertion({"uid": "test01"}) + a_assertion_1 = a_asser_1.construct( + self.client.config.entityid, "_012345", + "http://lingon.catalogix.se:8087/", + factory(saml.NameID, format=saml.NAMEID_FORMAT_TRANSIENT), + policy=self.server.config.getattr("policy", "idp"), + issuer=self.server._issuer(), + attrconvs=self.server.config.attribute_converters, + authn_class=INTERNETPROTOCOLPASSWORD, + authn_auth="http://www.example.com/login") + + a_asser_2 = Assertion({"email": "test.testsson@test.se"}) + a_assertion_2 = a_asser_2.construct( + self.client.config.entityid, "_012345", + "http://lingon.catalogix.se:8087/", + factory(saml.NameID, format=saml.NAMEID_FORMAT_TRANSIENT), + policy=self.server.config.getattr("policy", "idp"), + issuer=self.server._issuer(), + attrconvs=self.server.config.attribute_converters, + authn_class=INTERNETPROTOCOLPASSWORD, + authn_auth="http://www.example.com/login") + + a_asser_3 = Assertion({"street": "street"}) + a_assertion_3 = a_asser_3.construct( + self.client.config.entityid, "_012345", + "http://lingon.catalogix.se:8087/", + factory(saml.NameID, format=saml.NAMEID_FORMAT_TRANSIENT), + policy=self.server.config.getattr("policy", "idp"), + issuer=self.server._issuer(), + attrconvs=self.server.config.attribute_converters, + authn_class=INTERNETPROTOCOLPASSWORD, + authn_auth="http://www.example.com/login") + + a_asser_4 = Assertion({"title": "title"}) + a_assertion_4 = a_asser_4.construct( + self.client.config.entityid, "_012345", + "http://lingon.catalogix.se:8087/", + factory(saml.NameID, format=saml.NAMEID_FORMAT_TRANSIENT), + policy=self.server.config.getattr("policy", "idp"), + issuer=self.server._issuer(), + attrconvs=self.server.config.attribute_converters, + authn_class=INTERNETPROTOCOLPASSWORD, + authn_auth="http://www.example.com/login") + + a_assertion_1.signature = sigver.pre_signature_part( + a_assertion_1.id, _sec.my_cert, 1) + + a_assertion_2.signature = sigver.pre_signature_part( + a_assertion_2.id, _sec.my_cert, 1) + + a_assertion_3.signature = sigver.pre_signature_part( + a_assertion_3.id, _sec.my_cert, 1) + + a_assertion_4.signature = sigver.pre_signature_part( + a_assertion_4.id, _sec.my_cert, 1) + + assertion_1.signature = sigver.pre_signature_part(assertion_1.id, _sec.my_cert, 1) + + assertion_2.signature = sigver.pre_signature_part(assertion_2.id, _sec.my_cert, 1) + + response = sigver.response_factory( + in_response_to="_012345", + destination="http://lingon.catalogix.se:8087/", + status=s_utils.success_status_factory(), + issuer=self.server._issuer() + ) + + response.assertion = assertion_1 + + response.assertion.advice = Advice() + + response.assertion.advice.encrypted_assertion = [] + response.assertion.advice.encrypted_assertion.append(EncryptedAssertion()) + + response.assertion.advice.encrypted_assertion[0].add_extension_element(a_assertion_1) + + advice_tag = response.assertion.advice._to_element_tree().tag + assertion_tag = a_assertion_1._to_element_tree().tag + response = \ + response.get_xml_string_with_self_contained_assertion_within_advice_encrypted_assertion( + assertion_tag, advice_tag) + + response = _sec.sign_statement("%s" % response, class_name(a_assertion_1), + key_file=self.server.sec.key_file, + node_id=a_assertion_1.id) + + node_xpath = ''.join(["/*[local-name()=\"%s\"]" % v for v in + ["Response", "Assertion", "Advice", "EncryptedAssertion", "Assertion"]]) + + enctext = _sec.crypto.encrypt_assertion(response, self.client.sec.cert_file, + pre_encryption_part(), node_xpath=node_xpath) + + response = samlp.response_from_string(enctext) + + response.assertion = response.assertion[0] + + response.assertion.advice.encrypted_assertion.append(EncryptedAssertion()) + response.assertion.advice.encrypted_assertion[1].add_extension_element(a_assertion_2) + + advice_tag = response.assertion.advice._to_element_tree().tag + assertion_tag = a_assertion_2._to_element_tree().tag + response = \ + response.get_xml_string_with_self_contained_assertion_within_advice_encrypted_assertion( + assertion_tag, advice_tag) + + response = _sec.sign_statement("%s" % response, class_name(a_assertion_2), + key_file=self.server.sec.key_file, + node_id=a_assertion_2.id) + + node_xpath = ''.join(["/*[local-name()=\"%s\"]" % v for v in + ["Response", "Assertion", "Advice", "EncryptedAssertion", "Assertion"]]) + + enctext = _sec.crypto.encrypt_assertion(response, self.client.sec.cert_file, + pre_encryption_part(), node_xpath=node_xpath) + + response = samlp.response_from_string(enctext) + + response.assertion = response.assertion[0] + + assertion_tag = response.assertion._to_element_tree().tag + response = pre_encrypt_assertion(response) + response = response.get_xml_string_with_self_contained_assertion_within_encrypted_assertion( + assertion_tag) + response = _sec.sign_statement("%s" % response, class_name(assertion_1), + key_file=self.server.sec.key_file, + node_id=assertion_1.id) + + enctext = _sec.crypto.encrypt_assertion(response, self.client.sec.cert_file, pre_encryption_part()) + + response = samlp.response_from_string(enctext) + + response.assertion = assertion_2 + + response.assertion.advice = Advice() + + response.assertion.advice.encrypted_assertion = [] + response.assertion.advice.encrypted_assertion.append(EncryptedAssertion()) + + + response.assertion.advice.encrypted_assertion[0].add_extension_element(a_assertion_3) + + advice_tag = response.assertion.advice._to_element_tree().tag + assertion_tag = a_assertion_3._to_element_tree().tag + response = \ + response.get_xml_string_with_self_contained_assertion_within_advice_encrypted_assertion( + assertion_tag, advice_tag) + + response = _sec.sign_statement("%s" % response, class_name(a_assertion_3), + key_file=self.server.sec.key_file, + node_id=a_assertion_3.id) + + node_xpath = ''.join(["/*[local-name()=\"%s\"]" % v for v in + ["Response", "Assertion", "Advice", "EncryptedAssertion", "Assertion"]]) + + enctext = _sec.crypto.encrypt_assertion(response, self.client.sec.cert_file, + pre_encryption_part(), node_xpath=node_xpath) + + response = samlp.response_from_string(enctext) + + response.assertion = response.assertion[0] + + response.assertion.advice.encrypted_assertion.append(EncryptedAssertion()) + + response.assertion.advice.encrypted_assertion[1].add_extension_element(a_assertion_4) + + advice_tag = response.assertion.advice._to_element_tree().tag + assertion_tag = a_assertion_4._to_element_tree().tag + response = \ + response.get_xml_string_with_self_contained_assertion_within_advice_encrypted_assertion( + assertion_tag, advice_tag) + + response = _sec.sign_statement("%s" % response, class_name(a_assertion_4), + key_file=self.server.sec.key_file, + node_id=a_assertion_4.id) + + node_xpath = ''.join(["/*[local-name()=\"%s\"]" % v for v in + ["Response", "Assertion", "Advice", "EncryptedAssertion", "Assertion"]]) + + enctext = _sec.crypto.encrypt_assertion(response, self.client.sec.cert_file, + pre_encryption_part(), node_xpath=node_xpath) + + response = samlp.response_from_string(enctext) + + response = _sec.sign_statement("%s" % response, class_name(response.assertion[0]), + key_file=self.server.sec.key_file, + node_id=response.assertion[0].id) + + response = samlp.response_from_string(response) + + #seresp = samlp.response_from_string(enctext) + + resp_str = base64.encodestring("%s" % response) + # Now over to the client side + resp = self.client.parse_authn_request_response( + resp_str, BINDING_HTTP_POST, + {"_012345": "http://foo.example.com/service"}) + + #assert resp.encrypted_assertion == [] + assert resp.assertion + assert resp.assertion.advice + assert resp.assertion.advice.assertion + assert resp.ava == \ + {'street': ['street'], 'uid': ['test01'], 'title': ['title'], 'givenName': ['Derek'], 'email': + ['test.testsson@test.se'], 'sn': ['Jeter']} def test_signed_redirect(self): @@ -898,4 +1349,4 @@ class TestClientWithDummy(): if __name__ == "__main__": tc = TestClient() tc.setup_class() - tc.test_response_3() + tc.test_response_8() |