diff options
author | Roland Hedberg <roland.hedberg@adm.umu.se> | 2014-04-14 14:19:19 +0200 |
---|---|---|
committer | Roland Hedberg <roland.hedberg@adm.umu.se> | 2014-04-14 14:19:19 +0200 |
commit | a6ef115ff77d81bec4e8beb2cbd720d903bb806f (patch) | |
tree | 059d990a090ac242845321c42bbce610e025160f | |
parent | fca906e9b0b2df455885119c9a9603171874ef4a (diff) | |
download | pysaml2-a6ef115ff77d81bec4e8beb2cbd720d903bb806f.tar.gz |
Fixed handling of signed and then encrypted response assertions. At the same time added support for dealing with any combination of encrypted/non-encrypted assertions.
-rw-r--r-- | src/saml2/assertion.py | 198 | ||||
-rw-r--r-- | src/saml2/entity.py | 26 | ||||
-rw-r--r-- | src/saml2/response.py | 89 | ||||
-rw-r--r-- | src/saml2/sigver.py | 50 | ||||
-rw-r--r-- | tests/test_12_s_utils.py | 2 | ||||
-rw-r--r-- | tests/test_40_sigver.py | 262 | ||||
-rw-r--r-- | tests/test_41_response.py | 53 | ||||
-rw-r--r-- | tests/test_50_server.py | 59 | ||||
-rw-r--r-- | tests/test_51_client.py | 142 | ||||
-rw-r--r-- | tests/test_60_sp.py | 1 | ||||
-rw-r--r-- | tests/test_81_certificates.py | 129 |
11 files changed, 632 insertions, 379 deletions
diff --git a/src/saml2/assertion.py b/src/saml2/assertion.py index 59cd6216..ad083450 100644 --- a/src/saml2/assertion.py +++ b/src/saml2/assertion.py @@ -543,107 +543,109 @@ class EntityCategories(object): pass -class Assertion(dict): - """ Handles assertions about subjects """ - - def __init__(self, dic=None): - dict.__init__(self, dic) - self.acs = [] - - @staticmethod - def _authn_context_decl(decl, authn_auth=None): - """ - Construct the authn context with a authn context declaration - :param decl: The authn context declaration - :param authn_auth: Authenticating Authority - :return: An AuthnContext instance - """ +def _authn_context_class_ref(authn_class, authn_auth=None): + """ + Construct the authn context with a authn context class reference + :param authn_class: The authn context class reference + :param authn_auth: Authenticating Authority + :return: An AuthnContext instance + """ + cntx_class = factory(saml.AuthnContextClassRef, text=authn_class) + if authn_auth: return factory(saml.AuthnContext, - authn_context_decl=decl, + authn_context_class_ref=cntx_class, authenticating_authority=factory( saml.AuthenticatingAuthority, text=authn_auth)) - - def _authn_context_decl_ref(self, decl_ref, authn_auth=None): - """ - Construct the authn context with a authn context declaration reference - :param decl_ref: The authn context declaration reference - :param authn_auth: Authenticating Authority - :return: An AuthnContext instance - """ + else: return factory(saml.AuthnContext, - authn_context_decl_ref=decl_ref, - authenticating_authority=factory( - saml.AuthenticatingAuthority, text=authn_auth)) + authn_context_class_ref=cntx_class) - @staticmethod - def _authn_context_class_ref(authn_class, authn_auth=None): - """ - Construct the authn context with a authn context class reference - :param authn_class: The authn context class reference - :param authn_auth: Authenticating Authority - :return: An AuthnContext instance - """ - cntx_class = factory(saml.AuthnContextClassRef, text=authn_class) - if authn_auth: - return factory(saml.AuthnContext, - authn_context_class_ref=cntx_class, - authenticating_authority=factory( - saml.AuthenticatingAuthority, text=authn_auth)) - else: - return factory(saml.AuthnContext, - authn_context_class_ref=cntx_class) - - def _authn_statement(self, authn_class=None, authn_auth=None, - authn_decl=None, authn_decl_ref=None, authn_instant="", - subject_locality=""): - """ - Construct the AuthnStatement - :param authn_class: Authentication Context Class reference - :param authn_auth: Authenticating Authority - :param authn_decl: Authentication Context Declaration - :param authn_decl_ref: Authentication Context Declaration reference - :param authn_instant: When the Authentication was performed. - Assumed to be seconds since the Epoch. - :param subject_locality: Specifies the DNS domain name and IP address - for the system from which the assertion subject was apparently - authenticated. - :return: An AuthnContext instance - """ - if authn_instant: - _instant = instant(time_stamp=authn_instant) - else: - _instant = instant() - - if authn_class: - res = factory( - saml.AuthnStatement, - authn_instant=_instant, - session_index=sid(), - authn_context=self._authn_context_class_ref( - authn_class, authn_auth)) - elif authn_decl: - res = factory( - saml.AuthnStatement, - authn_instant=_instant, - session_index=sid(), - authn_context=self._authn_context_decl(authn_decl, authn_auth)) - elif authn_decl_ref: - res = factory( - saml.AuthnStatement, - authn_instant=_instant, - session_index=sid(), - authn_context=self._authn_context_decl_ref(authn_decl_ref, - authn_auth)) - else: - res = factory( - saml.AuthnStatement, - authn_instant=_instant, - session_index=sid()) - if subject_locality: - res.subject_locality = saml.SubjectLocality(text=subject_locality) +def _authn_context_decl(decl, authn_auth=None): + """ + Construct the authn context with a authn context declaration + :param decl: The authn context declaration + :param authn_auth: Authenticating Authority + :return: An AuthnContext instance + """ + return factory(saml.AuthnContext, + authn_context_decl=decl, + authenticating_authority=factory( + saml.AuthenticatingAuthority, text=authn_auth)) - return res + +def _authn_context_decl_ref(decl_ref, authn_auth=None): + """ + Construct the authn context with a authn context declaration reference + :param decl_ref: The authn context declaration reference + :param authn_auth: Authenticating Authority + :return: An AuthnContext instance + """ + return factory(saml.AuthnContext, + authn_context_decl_ref=decl_ref, + authenticating_authority=factory( + saml.AuthenticatingAuthority, text=authn_auth)) + + +def authn_statement(authn_class=None, authn_auth=None, + authn_decl=None, authn_decl_ref=None, authn_instant="", + subject_locality=""): + """ + Construct the AuthnStatement + :param authn_class: Authentication Context Class reference + :param authn_auth: Authenticating Authority + :param authn_decl: Authentication Context Declaration + :param authn_decl_ref: Authentication Context Declaration reference + :param authn_instant: When the Authentication was performed. + Assumed to be seconds since the Epoch. + :param subject_locality: Specifies the DNS domain name and IP address + for the system from which the assertion subject was apparently + authenticated. + :return: An AuthnContext instance + """ + if authn_instant: + _instant = instant(time_stamp=authn_instant) + else: + _instant = instant() + + if authn_class: + res = factory( + saml.AuthnStatement, + authn_instant=_instant, + session_index=sid(), + authn_context=_authn_context_class_ref( + authn_class, authn_auth)) + elif authn_decl: + res = factory( + saml.AuthnStatement, + authn_instant=_instant, + session_index=sid(), + authn_context=_authn_context_decl(authn_decl, authn_auth)) + elif authn_decl_ref: + res = factory( + saml.AuthnStatement, + authn_instant=_instant, + session_index=sid(), + authn_context=_authn_context_decl_ref(authn_decl_ref, + authn_auth)) + else: + res = factory( + saml.AuthnStatement, + authn_instant=_instant, + session_index=sid()) + + if subject_locality: + res.subject_locality = saml.SubjectLocality(text=subject_locality) + + return res + + +class Assertion(dict): + """ Handles assertions about subjects """ + + def __init__(self, dic=None): + dict.__init__(self, dic) + self.acs = [] def construct(self, sp_entity_id, in_response_to, consumer_url, name_id, attrconvs, policy, issuer, authn_class=None, @@ -695,10 +697,10 @@ class Assertion(dict): conds = policy.conditions(sp_entity_id) if authn_auth or authn_class or authn_decl or authn_decl_ref: - _authn_statement = self._authn_statement(authn_class, authn_auth, - authn_decl, authn_decl_ref, - authn_instant, - subject_locality) + _authn_statement = authn_statement(authn_class, authn_auth, + authn_decl, authn_decl_ref, + authn_instant, + subject_locality) else: _authn_statement = None diff --git a/src/saml2/entity.py b/src/saml2/entity.py index 96248730..db1a34f7 100644 --- a/src/saml2/entity.py +++ b/src/saml2/entity.py @@ -852,14 +852,6 @@ class Entity(HTTPBase): xmlstr = self.unravel(xmlstr, binding, response_cls.msgtype) origxml = xmlstr - if outstanding_certs is not None: - _response = samlp.any_response_from_string(xmlstr) - if len(_response.encrypted_assertion) > 0: - _, cert_file = make_temp( - "%s" % outstanding_certs[ - _response.in_response_to]["key"], decode=False) - cbxs = CryptoBackendXmlSec1(self.config.xmlsec_binary) - xmlstr = cbxs.decrypt(xmlstr, cert_file) if not xmlstr: # Not a valid reponse return None @@ -878,18 +870,14 @@ class Entity(HTTPBase): logger.debug("XMLSTR: %s" % xmlstr) - if hasattr(response.response, 'encrypted_assertion'): - for encrypted_assertion in response.response.encrypted_assertion: - if encrypted_assertion.extension_elements is not None: - assertion_list = extension_elements_to_elements( - encrypted_assertion.extension_elements, [saml]) - for assertion in assertion_list: - _assertion = saml.assertion_from_string( - str(assertion)) - response.response.assertion.append(_assertion) - if response: - response = response.verify() + if outstanding_certs is not None: + _, key_file = make_temp( + "%s" % outstanding_certs[ + response.in_response_to]["key"], decode=False) + else: + key_file = "" + response = response.verify(key_file) if not response: return None diff --git a/src/saml2/response.py b/src/saml2/response.py index 8212584d..c197380b 100644 --- a/src/saml2/response.py +++ b/src/saml2/response.py @@ -42,8 +42,8 @@ import xmldsig as ds import xmlenc as xenc from saml2 import samlp +from saml2 import class_name from saml2 import saml -from saml2 import extension_element_to_element from saml2 import extension_elements_to_elements from saml2 import SAMLError from saml2 import time_util @@ -387,7 +387,7 @@ class StatusResponse(object): if self.asynchop: if self.response.destination and \ - self.response.destination not in self.return_addrs: + self.response.destination not in self.return_addrs: logger.error("%s not in %s" % (self.response.destination, self.return_addrs)) return None @@ -399,7 +399,7 @@ class StatusResponse(object): def loads(self, xmldata, decode=True, origxml=None): return self._loads(xmldata, decode, origxml) - def verify(self): + def verify(self, key_file=""): try: return self._verify() except AssertionError: @@ -473,6 +473,7 @@ class AuthnResponse(StatusResponse): self.came_from = "" self.ava = None self.assertion = None + self.assertions = [] self.session_not_on_or_after = 0 self.allow_unsolicited = allow_unsolicited self.require_signature = want_assertions_signed @@ -739,8 +740,13 @@ class AuthnResponse(StatusResponse): return self.name_id def _assertion(self, assertion): - self.assertion = assertion + """ + Check the assertion + :param assertion: + :return: True/False depending on if the assertion is sane or not + """ + self.assertion = assertion logger.debug("assertion context: %s" % (self.context,)) logger.debug("assertion keys: %s" % (assertion.keyswv())) logger.debug("outstanding_queries: %s" % (self.outstanding_queries,)) @@ -773,54 +779,61 @@ class AuthnResponse(StatusResponse): logger.exception("get subject") raise - def _encrypted_assertion(self, xmlstr): - if xmlstr.encrypted_data: - assertion_str = self.sec.decrypt(xmlstr.encrypted_data.to_string()) - if not assertion_str: - raise DecryptionFailed() - assertion = saml.assertion_from_string(assertion_str) - else: - decrypt_xml = self.sec.decrypt(xmlstr) - - logger.debug("Decryption successfull") - - self.response = samlp.response_from_string(decrypt_xml) - logger.debug("Parsed decrypted assertion successfull") - - enc = self.response.encrypted_assertion[0].extension_elements[0] - assertion = extension_element_to_element( - enc, saml.ELEMENT_FROM_STRING, namespace=saml.NAMESPACE) - - logger.debug("Decrypted Assertion: %s" % assertion) - return self._assertion(assertion) + def decrypt_assertions(self, encrypted_assertions, key_file=""): + res = [] + for encrypted_assertion in encrypted_assertions: + if encrypted_assertion.extension_elements: + assertions = extension_elements_to_elements( + encrypted_assertion.extension_elements, [saml, samlp]) + for assertion in assertions: + if assertion.signature: + if not self.sec.verify_signature( + "%s" % assertion, key_file, + node_name=class_name(assertion)): + logger.error( + "Failed to verify signature on '%s'" % assertion) + raise SignatureError() + res.append(assertion) + return res - def parse_assertion(self): + def parse_assertion(self, key_file=""): if self.context == "AuthnQuery": # can contain one or more assertions pass else: # This is a saml2int limitation try: assert len(self.response.assertion) == 1 or \ - len(self.response.encrypted_assertion) == 1 + len(self.response.encrypted_assertion) == 1 except AssertionError: raise Exception("No assertion part") + if self.response.encrypted_assertion: + logger.debug("***Encrypted assertion/-s***") + decr_text = self.sec.decrypt(self.xmlstr) + resp = samlp.response_from_string(decr_text) + res = self.decrypt_assertions(resp.encrypted_assertion, key_file) + if self.response.assertion: + self.response.assertion.extend(res) + else: + self.response.assertion = res + self.response.encrypted_assertion = [] + if self.response.assertion: - logger.debug("***Unencrypted response***") + logger.debug("***Unencrypted assertion***") for assertion in self.response.assertion: if not self._assertion(assertion): return False - return True - else: - logger.debug("***Encrypted response***") - for assertion in self.response.encrypted_assertion: - if not self._encrypted_assertion(assertion): - return False - return True + else: + self.assertions.append(assertion) + self.assertion = self.assertions[0] - def verify(self): + return True + + def verify(self, key_file): """ Verify that the assertion is syntactically correct and - the signature is correct if present.""" + the signature is correct if present. + :param key_file: If not the default key file should be used this is it. + """ try: self._verify() @@ -830,7 +843,7 @@ class AuthnResponse(StatusResponse): if not isinstance(self.response, samlp.Response): return self - if self.parse_assertion(): + if self.parse_assertion(key_file): return self else: logger.error("Could not parse the assertion") @@ -1056,7 +1069,7 @@ class AssertionIDResponse(object): return self._postamble() - def verify(self): + def verify(self, key_file=""): try: valid_instance(self.response) except NotValid, exc: diff --git a/src/saml2/sigver.py b/src/saml2/sigver.py index 57fa5914..9db02bb4 100644 --- a/src/saml2/sigver.py +++ b/src/saml2/sigver.py @@ -35,12 +35,13 @@ from Crypto.PublicKey import RSA from saml2.cert import OpenSSLWrapper from saml2.extension import pefim from saml2.saml import EncryptedAssertion -from saml2.samlp import Response, response_from_string +from saml2.samlp import Response import xmldsig as ds -import xmlenc as enc -from saml2 import samlp, SAMLError, extension_elements_to_elements +from saml2 import samlp +from saml2 import SAMLError +from saml2 import extension_elements_to_elements from saml2 import class_name from saml2 import saml from saml2 import ExtensionElement @@ -74,7 +75,8 @@ RSA_SHA1 = "http://www.w3.org/2000/09/xmldsig#rsa-sha1" RSA_1_5 = "http://www.w3.org/2001/04/xmlenc#rsa-1_5" TRIPLE_DES_CBC = "http://www.w3.org/2001/04/xmlenc#tripledes-cbc" XMLTAG = "<?xml version='1.0'?>" -PREFIX = "<?xml version='1.0' encoding='UTF-8'?>" +PREFIX1 = "<?xml version='1.0' encoding='UTF-8'?>" +PREFIX2 = '<?xml version="1.0" encoding="UTF-8"?>' class SigverError(SAMLError): @@ -125,8 +127,12 @@ def rm_xmltag(statement): statement = statement[len(XMLTAG):] if statement[0] == '\n': statement = statement[1:] - elif statement.startswith(PREFIX): - statement = statement[len(PREFIX):] + elif statement.startswith(PREFIX1): + statement = statement[len(PREFIX1):] + if statement[0] == '\n': + statement = statement[1:] + elif statement.startswith(PREFIX2): + statement = statement[len(PREFIX2):] if statement[0] == '\n': statement = statement[1:] @@ -693,7 +699,7 @@ class CryptoBackend(): def encrypt(self, text, recv_key, template, key_type): raise NotImplementedError() - def encrypt_assertion(self, statement, recv_key, key_type): + def encrypt_assertion(self, statement, recv_key, key_type, xpath=""): raise NotImplementedError() def decrypt(self, enctext, key_file): @@ -733,12 +739,25 @@ class CryptoBackendXmlSec1(CryptoBackend): except IndexError: return "" - def encrypt(self, text, recv_key, template, key_type): + def encrypt(self, text, recv_key, template, session_key_type, xpath=""): + """ + + :param text: The text to be compiled + :param recv_key: Filename of a file where the key resides + :param template: Filename of a file with the pre-encryption part + :param session_key_type: Type and size of a new session key + "des-192" generates a new 192 bits DES key for DES3 encryption + :param xpath: What should be encrypted + :return: + """ logger.debug("Encryption input len: %d" % len(text)) _, fil = make_temp("%s" % text, decode=False) com_list = [self.xmlsec, "--encrypt", "--pubkey-cert-pem", recv_key, - "--session-key", key_type, "--xml-data", fil] + "--session-key", session_key_type, "--xml-data", fil] + + if xpath: + com_list.extend(['--node-xpath', xpath]) (_stdout, _stderr, output) = self._run_xmlsec(com_list, [template], exception=DecryptError, @@ -767,7 +786,8 @@ class CryptoBackendXmlSec1(CryptoBackend): "--session-key", key_type, "--xml-data", fil, "--node-xpath", ASSERT_XPATH] - (_stdout, _stderr, output) = self._run_xmlsec(com_list, [tmpl], exception=EncryptError, validate_output=False) + (_stdout, _stderr, output) = self._run_xmlsec( + com_list, [tmpl], exception=EncryptError, validate_output=False) os.unlink(fil) if not output: @@ -1206,7 +1226,7 @@ class SecurityContext(object): :param text: Text to encrypt :param recv_key: A file containing the receivers public key - :param template: A file containing the XML document template + :param template: A file containing the XMLSEC template :param key_type: The type of session key to use :result: An encrypted XML text """ @@ -1262,8 +1282,7 @@ class SecurityContext(object): return self.crypto.validate_signature(signedtext, cert_file=cert_file, cert_type=cert_type, node_name=node_name, - node_id=node_id, id_attr=id_attr, - ) + node_id=node_id, id_attr=id_attr) def _check_signature(self, decoded_xml, item, node_name=NODE_NAME, origdoc=None, id_attr="", must=False, @@ -1735,7 +1754,10 @@ def pre_encrypt_assertion(response): assertion = response.assertion response.assertion = None response.encrypted_assertion = EncryptedAssertion() - response.encrypted_assertion.add_extension_element(assertion) + if isinstance(assertion, list): + response.encrypted_assertion.add_extension_elements(assertion) + else: + response.encrypted_assertion.add_extension_element(assertion) # txt = "%s" % response # _ass = "%s" % assertion # _ass = rm_xmltag(_ass) diff --git a/tests/test_12_s_utils.py b/tests/test_12_s_utils.py index 9d41d691..f4cfdda8 100644 --- a/tests/test_12_s_utils.py +++ b/tests/test_12_s_utils.py @@ -151,7 +151,7 @@ def test_audience(): assert aud_restr.audience.text == "urn:foo:bar" def test_conditions(): - conditions = utils.factory( saml.Conditions, + conditions = utils.factory(saml.Conditions, not_before="2009-10-30T07:58:10.852Z", not_on_or_after="2009-10-30T08:03:10.852Z", audience_restriction=[utils.factory(saml.AudienceRestriction, diff --git a/tests/test_40_sigver.py b/tests/test_40_sigver.py index 89a85408..3ce200e2 100644 --- a/tests/test_40_sigver.py +++ b/tests/test_40_sigver.py @@ -1,11 +1,12 @@ #!/usr/bin/env python import base64 +from saml2.sigver import pre_encryption_part, make_temp from saml2.mdstore import MetadataStore -from saml2.saml import assertion_from_string +from saml2.saml import assertion_from_string, EncryptedAssertion from saml2.samlp import response_from_string -from saml2 import sigver +from saml2 import sigver, extension_elements_to_elements from saml2 import class_name from saml2 import time_util from saml2 import saml, samlp @@ -25,9 +26,10 @@ PUB_KEY = full_path("test.pem") PRIV_KEY = full_path("test.key") -def _eq(l1,l2): +def _eq(l1, l2): return set(l1) == set(l2) + CERT1 = """MIICsDCCAhmgAwIBAgIJAJrzqSSwmDY9MA0GCSqGSIb3DQEBBQUAMEUxCzAJBgNV BAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBX aWRnaXRzIFB0eSBMdGQwHhcNMDkxMDA2MTk0OTQxWhcNMDkxMTA1MTk0OTQxWjBF @@ -60,7 +62,7 @@ wKe3ACFXBvqGQN0IbcH49hu0FKhYFM/GPDJcIHFBsiyMBXChpye9vBaTNEBCtU3K jjyG0hRT2mAQ9h+bkPmOvlEo/aH0xR68Z9hw4PF13w==""" from pyasn1.codec.der import decoder - + def test_cert_from_instance_1(): xml_response = open(SIGNED).read() @@ -104,7 +106,6 @@ class FakeConfig(): class TestSecurity(): - def setup_class(self): # This would be one way to initialize the security context : # @@ -126,8 +127,8 @@ class TestSecurity(): issue_instant="2009-10-30T13:20:28Z", signature=sigver.pre_signature_part("11111", self.sec.my_cert, 1), attribute_statement=do_attribute_statement({ - ("", "", "surName"): ("Foo", ""), - ("", "", "givenName"): ("Bar", ""), + ("", "", "surName"): ("Foo", ""), + ("", "", "givenName"): ("Bar", ""), }) ) @@ -144,7 +145,7 @@ class TestSecurity(): def test_non_verify_2(self): xml_response = open(FALSE_SIGNED).read() - raises(sigver.SignatureError,self.sec.correctly_signed_response, + raises(sigver.SignatureError, self.sec.correctly_signed_response, xml_response) def test_sign_assertion(self): @@ -154,8 +155,8 @@ class TestSecurity(): #print sign_ass sass = saml.assertion_from_string(sign_ass) #print sass - assert _eq(sass.keyswv(), ['attribute_statement', 'issue_instant', - 'version', 'signature', 'id']) + assert _eq(sass.keyswv(), ['attribute_statement', 'issue_instant', + 'version', 'signature', 'id']) assert sass.version == "2.0" assert sass.id == "11111" assert time_util.str_to_time(sass.issue_instant) @@ -227,19 +228,21 @@ class TestSecurity(): def test_sign_response(self): response = factory(samlp.Response, - assertion=self._assertion, - id="22222", - signature=sigver.pre_signature_part("22222", self.sec.my_cert)) + assertion=self._assertion, + id="22222", + signature=sigver.pre_signature_part("22222", + self.sec + .my_cert)) to_sign = [(class_name(self._assertion), self._assertion.id), - (class_name(response), response.id)] + (class_name(response), response.id)] s_response = sigver.signed_instance_factory(response, self.sec, to_sign) - + assert s_response is not None print s_response response = response_from_string(s_response) sass = response.assertion[0] - + print sass assert _eq(sass.keyswv(), ['attribute_statement', 'issue_instant', 'version', 'signature', 'id']) @@ -252,23 +255,27 @@ class TestSecurity(): assert item.id == "22222" def test_sign_response_2(self): - assertion2 = factory( saml.Assertion, - version= "2.0", - id= "11122", - issue_instant= "2009-10-30T13:20:28Z", - signature= sigver.pre_signature_part("11122", self.sec.my_cert), - attribute_statement=do_attribute_statement({ - ("","","surName"): ("Fox",""), - ("","","givenName") :("Bear",""), - }) - ) + assertion2 = factory(saml.Assertion, + version="2.0", + id="11122", + issue_instant="2009-10-30T13:20:28Z", + signature=sigver.pre_signature_part("11122", + self.sec + .my_cert), + attribute_statement=do_attribute_statement({ + ("", "", "surName"): ("Fox", ""), + ("", "", "givenName"): ("Bear", ""), + }) + ) response = factory(samlp.Response, - assertion=assertion2, - id="22233", - signature=sigver.pre_signature_part("22233", self.sec.my_cert)) + assertion=assertion2, + id="22233", + signature=sigver.pre_signature_part("22233", + self.sec + .my_cert)) to_sign = [(class_name(assertion2), assertion2.id), - (class_name(response), response.id)] + (class_name(response), response.id)] s_response = sigver.signed_instance_factory(response, self.sec, to_sign) @@ -277,7 +284,7 @@ class TestSecurity(): sass = response2.assertion[0] assert _eq(sass.keyswv(), ['attribute_statement', 'issue_instant', - 'version', 'signature', 'id']) + 'version', 'signature', 'id']) assert sass.version == "2.0" assert sass.id == "11122" @@ -285,105 +292,115 @@ class TestSecurity(): s_response) assert isinstance(item, samlp.Response) - - def test_sign_verify(self): + + def test_sign_verify(self): response = factory(samlp.Response, - assertion=self._assertion, - id="22233", - signature=sigver.pre_signature_part("22233", self.sec.my_cert)) + assertion=self._assertion, + id="22233", + signature=sigver.pre_signature_part("22233", + self.sec + .my_cert)) to_sign = [(class_name(self._assertion), self._assertion.id), - (class_name(response), response.id)] + (class_name(response), response.id)] s_response = sigver.signed_instance_factory(response, self.sec, to_sign) print s_response - res = self.sec.verify_signature("%s" % s_response, - node_name=class_name(samlp.Response())) + res = self.sec.verify_signature("%s" % s_response, + node_name=class_name(samlp.Response())) - print res + print res assert res def test_sign_verify_with_cert_from_instance(self): response = factory(samlp.Response, - assertion=self._assertion, - id="22222", - signature=sigver.pre_signature_part("22222", self.sec.my_cert)) + assertion=self._assertion, + id="22222", + signature=sigver.pre_signature_part("22222", + self.sec + .my_cert)) to_sign = [(class_name(self._assertion), self._assertion.id), - (class_name(response), response.id)] + (class_name(response), response.id)] s_response = sigver.signed_instance_factory(response, self.sec, to_sign) response2 = response_from_string(s_response) ci = "".join(sigver.cert_from_instance(response2)[0].split()) - assert ci == self.sec.my_cert - - res = self.sec.verify_signature("%s" % s_response, - node_name=class_name(samlp.Response())) + + res = self.sec.verify_signature("%s" % s_response, + node_name=class_name(samlp.Response())) assert res res = self.sec._check_signature(s_response, response2, class_name(response2), s_response) assert res == response2 - + def test_sign_verify_assertion_with_cert_from_instance(self): - assertion = factory( saml.Assertion, - version= "2.0", - id= "11100", - issue_instant= "2009-10-30T13:20:28Z", - signature= sigver.pre_signature_part("11100", self.sec.my_cert), - attribute_statement=do_attribute_statement({ - ("","","surName"): ("Fox",""), - ("","","givenName") :("Bear",""), - }) - ) + assertion = factory(saml.Assertion, + version="2.0", + id="11100", + issue_instant="2009-10-30T13:20:28Z", + signature=sigver.pre_signature_part("11100", + self.sec + .my_cert), + attribute_statement=do_attribute_statement({ + ("", "", "surName"): ("Fox", ""), + ("", "", "givenName"): ("Bear", ""), + }) + ) to_sign = [(class_name(assertion), assertion.id)] - s_assertion = sigver.signed_instance_factory(assertion, self.sec, to_sign) + s_assertion = sigver.signed_instance_factory(assertion, self.sec, + to_sign) print s_assertion ass = assertion_from_string(s_assertion) ci = "".join(sigver.cert_from_instance(ass)[0].split()) assert ci == self.sec.my_cert - + res = self.sec.verify_signature("%s" % s_assertion, node_name=class_name(ass)) - assert res - + assert res + res = self.sec._check_signature(s_assertion, ass, class_name(ass)) - + assert res def test_exception_sign_verify_with_cert_from_instance(self): - assertion = factory( saml.Assertion, - version= "2.0", - id= "11100", - issue_instant= "2009-10-30T13:20:28Z", - #signature= sigver.pre_signature_part("11100", self.sec.my_cert), - attribute_statement=do_attribute_statement({ - ("","","surName"): ("Foo",""), - ("","","givenName") :("Bar",""), - }) - ) + assertion = factory(saml.Assertion, + version="2.0", + id="11100", + issue_instant="2009-10-30T13:20:28Z", + #signature= sigver.pre_signature_part("11100", + # self.sec.my_cert), + attribute_statement=do_attribute_statement({ + ("", "", "surName"): ("Foo", ""), + ("", "", "givenName"): ("Bar", ""), + }) + ) response = factory(samlp.Response, - assertion=assertion, - id="22222", - signature=sigver.pre_signature_part("22222", self.sec.my_cert)) + assertion=assertion, + id="22222", + signature=sigver.pre_signature_part("22222", + self.sec + .my_cert)) to_sign = [(class_name(response), response.id)] - + s_response = sigver.signed_instance_factory(response, self.sec, to_sign) response2 = response_from_string(s_response) # Change something that should make everything fail response2.id = "23456" raises(sigver.SignatureError, self.sec._check_signature, - s_response, response2, class_name(response2)) + s_response, response2, class_name(response2)) + class TestSecurityMetadata(): @@ -397,36 +414,71 @@ class TestSecurityMetadata(): conf.only_use_keys_in_metadata = False self.sec = sigver.security_context(conf) - self._assertion = factory( saml.Assertion, - version="2.0", - id="11111", - issue_instant="2009-10-30T13:20:28Z", - signature=sigver.pre_signature_part("11111", self.sec.my_cert, 1), - attribute_statement=do_attribute_statement({ - ("","","surName"): ("Foo",""), - ("","","givenName") :("Bar",""), - }) + assertion = factory( + saml.Assertion, version="2.0", id="11111", + issue_instant="2009-10-30T13:20:28Z", + signature=sigver.pre_signature_part("11111", self.sec.my_cert, 1), + attribute_statement=do_attribute_statement( + {("", "", "surName"): ("Foo", ""), + ("", "", "givenName"): ("Bar", ""), }) ) - def test_sign_assertion(self): - ass = self._assertion - print ass - sign_ass = self.sec.sign_assertion("%s" % ass, node_id=ass.id) - #print sign_ass - sass = saml.assertion_from_string(sign_ass) - #print sass - assert _eq(sass.keyswv(), ['attribute_statement', 'issue_instant', - 'version', 'signature', 'id']) - assert sass.version == "2.0" - assert sass.id == "11111" - assert time_util.str_to_time(sass.issue_instant) - print "Crypto version : %s" % (self.sec.crypto.version()) +def test_xbox(): + conf = config.SPConfig() + conf.load_file("server_conf") + md = MetadataStore([saml, samlp], None, conf) + md.load("local", full_path("idp_example.xml")) - item = self.sec.check_signature(sass, class_name(sass), sign_ass) + conf.metadata = md + conf.only_use_keys_in_metadata = False + sec = sigver.security_context(conf) + + assertion = factory( + saml.Assertion, version="2.0", id="11111", + issue_instant="2009-10-30T13:20:28Z", + signature=sigver.pre_signature_part("11111", sec.my_cert, 1), + attribute_statement=do_attribute_statement( + {("", "", "surName"): ("Foo", ""), + ("", "", "givenName"): ("Bar", ""), }) + ) + + sigass = sec.sign_statement(assertion, class_name(assertion), + key_file="pki/mykey.pem", node_id=assertion.id) + + _ass0 = saml.assertion_from_string(sigass) + + encrypted_assertion = EncryptedAssertion() + encrypted_assertion.add_extension_element(_ass0) + + _, pre = make_temp("%s" % pre_encryption_part(), decode=False) + enctext = sec.crypto.encrypt( + "%s" % encrypted_assertion, conf.cert_file, pre, "des-192", + '/*[local-name()="EncryptedAssertion"]/*[local-name()="Assertion"]') + + + decr_text = sec.decrypt(enctext) + _seass = saml.encrypted_assertion_from_string(decr_text) + assertions = [] + assers = extension_elements_to_elements(_seass.extension_elements, + [saml, samlp]) + + sign_cert_file = "pki/mycert.pem" + + for ass in assers: + _ass = "%s" % ass + #_ass = _ass.replace('xsi:nil="true" ', '') + #assert sigass == _ass + _txt = sec.verify_signature(_ass, sign_cert_file, + node_name=class_name(assertion)) + if _txt: + assertions.append(ass) + + print assertions - assert isinstance(item, saml.Assertion) if __name__ == "__main__": - t = TestSecurity() - t.setup_class() + #t = TestSecurity() + #t.setup_class() + #t.test_sign_then_encrypt_assertion() + test_xbox()
\ No newline at end of file diff --git a/tests/test_41_response.py b/tests/test_41_response.py index ef8d4d32..9ac3bae1 100644 --- a/tests/test_41_response.py +++ b/tests/test_41_response.py @@ -1,7 +1,6 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -from saml2 import saml from saml2 import config from saml2.authn_context import INTERNETPROTOCOLPASSWORD @@ -9,7 +8,8 @@ from saml2.server import Server from saml2.response import response_factory from saml2.response import StatusResponse from saml2.response import AuthnResponse -from saml2.sigver import security_context, MissingKey +from saml2.sigver import security_context +from saml2.sigver import MissingKey from pytest import raises @@ -26,7 +26,6 @@ IDENTITY = {"eduPersonAffiliation": ["staff", "member"], "mail": ["foo@gmail.com"], "title": ["shortstop"]} - AUTHN = { "class_ref": INTERNETPROTOCOLPASSWORD, "authn_auth": "http://www.example.com/login" @@ -39,29 +38,28 @@ class TestResponse: name_id = server.ident.transient_nameid( "urn:mace:example.com:saml:roland:sp", "id12") - self._resp_ = server.create_authn_response(IDENTITY, - "id12", # in_response_to - "http://lingon.catalogix.se:8087/", - - # consumer_url - "urn:mace:example" - ".com:saml:roland:sp", - # sp_entity_id - name_id=name_id) + self._resp_ = server.create_authn_response( + IDENTITY, + "id12", # in_response_to + "http://lingon.catalogix.se:8087/", + # consumer_url + "urn:mace:example.com:saml:roland:sp", + # sp_entity_id + name_id=name_id) self._sign_resp_ = server.create_authn_response( IDENTITY, "id12", # in_response_to - "http://lingon.catalogix.se:8087/", # consumer_url - "urn:mace:example.com:saml:roland:sp", # sp_entity_id + "http://lingon.catalogix.se:8087/", # consumer_url + "urn:mace:example.com:saml:roland:sp", # sp_entity_id name_id=name_id, sign_assertion=True) self._resp_authn = server.create_authn_response( IDENTITY, - "id12", # in_response_to - "http://lingon.catalogix.se:8087/", # consumer_url - "urn:mace:example.com:saml:roland:sp", # sp_entity_id + "id12", # in_response_to + "http://lingon.catalogix.se:8087/", # consumer_url + "urn:mace:example.com:saml:roland:sp", # sp_entity_id name_id=name_id, authn=AUTHN) @@ -72,7 +70,8 @@ class TestResponse: def test_1(self): xml_response = ("%s" % (self._resp_,)) resp = response_factory(xml_response, self.conf, - return_addrs=["http://lingon.catalogix.se:8087/"], + return_addrs=[ + "http://lingon.catalogix.se:8087/"], outstanding_queries={ "id12": "http://localhost:8088/sso"}, timeslack=10000, decode=False) @@ -83,7 +82,8 @@ class TestResponse: def test_2(self): xml_response = self._sign_resp_ resp = response_factory(xml_response, self.conf, - return_addrs=["http://lingon.catalogix.se:8087/"], + return_addrs=[ + "http://lingon.catalogix.se:8087/"], outstanding_queries={ "id12": "http://localhost:8088/sso"}, timeslack=10000, decode=False) @@ -92,14 +92,15 @@ class TestResponse: assert isinstance(resp, AuthnResponse) - def test_only_use_keys_in_metadata(self): - conf = config.SPConfig() - conf.load_file("sp_2_conf") +def test_only_use_keys_in_metadata(self): + conf = config.SPConfig() + conf.load_file("sp_2_conf") + + sc = security_context(conf) + # should fail + raises(MissingKey, + 'sc.correctly_signed_response("%s" % self._sign_resp_)') - sc = security_context(conf) - # should fail - raises(MissingKey, - 'sc.correctly_signed_response("%s" % self._sign_resp_)') if __name__ == "__main__": t = TestResponse() diff --git a/tests/test_50_server.py b/tests/test_50_server.py index 05544676..f8678c2a 100644 --- a/tests/test_50_server.py +++ b/tests/test_50_server.py @@ -2,18 +2,25 @@ # -*- coding: utf-8 -*- import base64 from urlparse import parse_qs +from saml2.sigver import pre_encryption_part from saml2.assertion import Policy from saml2.authn_context import INTERNETPROTOCOLPASSWORD from saml2.saml import NameID, NAMEID_FORMAT_TRANSIENT from saml2.samlp import response_from_string from saml2.server import Server -from saml2 import samlp, saml, client, config +from saml2 import samlp +from saml2 import saml +from saml2 import client +from saml2 import config +from saml2 import class_name +from saml2 import extension_elements_to_elements from saml2 import s_utils from saml2 import sigver from saml2 import time_util from saml2.s_utils import OtherError -from saml2.s_utils import do_attribute_statement, factory +from saml2.s_utils import do_attribute_statement +from saml2.s_utils import factory from saml2.soap import make_soap_enveloped_saml_thingy from saml2 import BINDING_HTTP_POST from saml2 import BINDING_HTTP_REDIRECT @@ -182,7 +189,8 @@ class TestServer1(): name_id_policy = resp_args["name_id_policy"] assert _eq(name_id_policy.keyswv(), ["format", "allow_create"]) assert name_id_policy.format == saml.NAMEID_FORMAT_TRANSIENT - assert resp_args["sp_entity_id"] == "urn:mace:example.com:saml:roland:sp" + assert resp_args[ + "sp_entity_id"] == "urn:mace:example.com:saml:roland:sp" def test_sso_response_with_identity(self): name_id = self.server.ident.transient_nameid( @@ -195,8 +203,8 @@ class TestServer1(): "mail": "derek.jeter@nyy.mlb.com", "title": "The man" }, - "id12", # in_response_to - "http://localhost:8087/", # destination + "id12", # in_response_to + "http://localhost:8087/", # destination "urn:mace:example.com:saml:roland:sp", # sp_entity_id name_id=name_id, authn=AUTHN @@ -227,7 +235,8 @@ class TestServer1(): break assert len(attr.attribute_value) == 1 assert attr.name == "urn:oid:1.3.6.1.4.1.5923.1.1.1.7" - assert attr.name_format == "urn:oasis:names:tc:SAML:2.0:attrname-format:uri" + assert attr.name_format == "urn:oasis:names:tc:SAML:2" \ + ".0:attrname-format:uri" value = attr.attribute_value[0] assert value.text.strip() == "Short stop" assert value.get_type() == "xs:string" @@ -242,13 +251,13 @@ class TestServer1(): def test_sso_response_without_identity(self): resp = self.server.create_authn_response( {}, - "id12", # in_response_to - "http://localhost:8087/", # consumer_url - "urn:mace:example.com:saml:roland:sp", # sp_entity_id - userid="USER1", - authn=AUTHN, - release_policy=Policy(), - best_effort=True + "id12", # in_response_to + "http://localhost:8087/", # consumer_url + "urn:mace:example.com:saml:roland:sp", # sp_entity_id + userid="USER1", + authn=AUTHN, + release_policy=Policy(), + best_effort=True ) print resp.keyswv() @@ -268,12 +277,12 @@ class TestServer1(): resp = self.server.create_authn_response( {}, - "id12", # in_response_to - "http://localhost:8087/", # consumer_url - "urn:mace:example.com:saml:roland:sp", # sp_entity_id - userid="USER1", - authn=_authn, - best_effort=True + "id12", # in_response_to + "http://localhost:8087/", # consumer_url + "urn:mace:example.com:saml:roland:sp", # sp_entity_id + userid="USER1", + authn=_authn, + best_effort=True ) print resp.keyswv() @@ -297,11 +306,11 @@ class TestServer1(): print resp.status assert resp.status.status_code.value == samlp.STATUS_RESPONDER assert resp.status.status_code.status_code.value == \ - samlp.STATUS_REQUEST_UNSUPPORTED + samlp.STATUS_REQUEST_UNSUPPORTED assert resp.status.status_message.text == \ - "eduPersonAffiliation missing" + "eduPersonAffiliation missing" assert resp.issuer.text == "urn:mace:example.com:saml:roland:idp" - assert not resp.assertion + assert not resp.assertion def test_authn_response_0(self): self.server = Server("idp_conf") @@ -346,8 +355,8 @@ class TestServer1(): signed_resp = self.server.create_authn_response( ava, - "id12", # in_response_to - "http://lingon.catalogix.se:8087/", # consumer_url + "id12", # in_response_to + "http://lingon.catalogix.se:8087/", # consumer_url "urn:mace:example.com:saml:roland:sp", # sp_entity_id name_id=name_id, sign_assertion=True @@ -480,7 +489,6 @@ def _logout_request(conf_file): class TestServerLogout(): - def test_1(self): server = Server("idp_slo_redirect_conf") req_id, request = _logout_request("sp_slo_redirect_conf") @@ -502,4 +510,3 @@ class TestServerLogout(): if __name__ == "__main__": ts = TestServer1() ts.setup_class() - ts.test_sso_response_specific_instant() diff --git a/tests/test_51_client.py b/tests/test_51_client.py index a808fca6..3eedcf9d 100644 --- a/tests/test_51_client.py +++ b/tests/test_51_client.py @@ -4,21 +4,32 @@ import base64 import urllib import urlparse -from saml2.authn_context import INTERNETPROTOCOLPASSWORD -from saml2.response import LogoutResponse +from saml2 import BINDING_HTTP_POST +from saml2 import BINDING_HTTP_REDIRECT +from saml2 import config +from saml2 import class_name +from saml2 import extension_elements_to_elements +from saml2 import saml +from saml2 import samlp +from saml2 import sigver +from saml2 import s_utils +from saml2.assertion import Assertion +from saml2.authn_context import INTERNETPROTOCOLPASSWORD from saml2.client import Saml2Client -from saml2 import samlp, BINDING_HTTP_POST, BINDING_HTTP_REDIRECT -from saml2 import saml, config, class_name from saml2.config import SPConfig +from saml2.response import LogoutResponse from saml2.saml import NAMEID_FORMAT_PERSISTENT from saml2.saml import NAMEID_FORMAT_TRANSIENT from saml2.saml import NameID from saml2.server import Server +from saml2.sigver import pre_encryption_part +from saml2.s_utils import do_attribute_statement +from saml2.s_utils import factory from saml2.time_util import in_a_while -from py.test import raises -from fakeIDP import FakeIDP, unpack_form +from fakeIDP import FakeIDP +from fakeIDP import unpack_form AUTHN = { @@ -341,8 +352,123 @@ class TestClient: print my_name assert my_name == "urn:mace:example.com:saml:roland:sp" -# Below can only be done with dummy Server + def test_sign_then_encrypt_assertion(self): + # Begin with the IdPs side + _sec = self.server.sec + + assertion = s_utils.assertion_factory( + subject=factory(saml.Subject, text="_aaa", + name_id=factory( + saml.NameID, + format=saml.NAMEID_FORMAT_TRANSIENT)), + attribute_statement=do_attribute_statement( + { + ("", "", "surName"): ("Jeter", ""), + ("", "", "givenName"): ("Derek", ""), + } + ), + issuer=self.server._issuer(), + ) + + assertion.signature = sigver.pre_signature_part( + assertion.id, _sec.my_cert, 1) + + sigass = _sec.sign_statement(assertion, class_name(assertion), + key_file="pki/mykey.pem", + node_id=assertion.id) + # Create an Assertion instance from the signed assertion + _ass = saml.assertion_from_string(sigass) + + response = sigver.response_factory( + in_response_to="_012345", + destination="https:#www.example.com", + status=s_utils.success_status_factory(), + issuer=self.server._issuer(), + assertion=_ass + ) + + enctext = _sec.crypto.encrypt_assertion(response, _sec.cert_file, + pre_encryption_part()) + + seresp = samlp.response_from_string(enctext) + + # Now over to the client side + _csec = self.client.sec + if seresp.encrypted_assertion: + decr_text = _csec.decrypt(enctext) + seresp = samlp.response_from_string(decr_text) + resp_ass = [] + + sign_cert_file = "pki/mycert.pem" + for enc_ass in seresp.encrypted_assertion: + assers = extension_elements_to_elements( + enc_ass.extension_elements, [saml, samlp]) + for ass in assers: + if ass.signature: + if not _csec.verify_signature("%s" % ass, + sign_cert_file, + node_name=class_name(ass)): + continue + resp_ass.append(ass) + + seresp.assertion = resp_ass + seresp.encrypted_assertion = None + #print _sresp + + assert seresp.assertion + + def test_sign_then_encrypt_assertion2(self): + # Begin with the IdPs side + _sec = self.server.sec + + nameid_policy = samlp.NameIDPolicy(allow_create="false", + format=saml.NAMEID_FORMAT_PERSISTENT) + + asser = Assertion({"givenName": "Derek", "surName": "Jeter"}) + assertion = asser.construct( + self.client.config.entityid, "_012345", + "http://lingon.catalogix.se:8087/", + factory(saml.NameID, format=saml.NAMEID_FORMAT_TRANSIENT), + policy=self.server.config.getattr("policy", "idp"), + issuer=self.server._issuer(), + attrconvs=self.server.config.attribute_converters, + authn_class=INTERNETPROTOCOLPASSWORD, + authn_auth="http://www.example.com/login") + + assertion.signature = sigver.pre_signature_part( + assertion.id, _sec.my_cert, 1) + + sigass = _sec.sign_statement(assertion, class_name(assertion), + #key_file="pki/mykey.pem", + key_file="test.key", + node_id=assertion.id) + # Create an Assertion instance from the signed assertion + _ass = saml.assertion_from_string(sigass) + + response = sigver.response_factory( + in_response_to="_012345", + destination="https://www.example.com", + status=s_utils.success_status_factory(), + issuer=self.server._issuer(), + assertion=_ass + ) + + enctext = _sec.crypto.encrypt_assertion(response, _sec.cert_file, + pre_encryption_part()) + + #seresp = samlp.response_from_string(enctext) + + resp_str = base64.encodestring(enctext) + # Now over to the client side + resp = self.client.parse_authn_request_response( + resp_str, BINDING_HTTP_POST, + {"_012345": "http://foo.example.com/service"}) + + #assert resp.encrypted_assertion == [] + assert resp.assertion + assert resp.ava == {'givenName': ['Derek'], 'sn': ['Jeter']} +# Below can only be done with dummy Server IDP = "urn:mace:example.com:saml:roland:idp" @@ -448,4 +574,4 @@ class TestClientWithDummy(): if __name__ == "__main__": tc = TestClient() tc.setup_class() - tc.test_sign_auth_request_0() + tc.test_sign_then_encrypt_assertion2() diff --git a/tests/test_60_sp.py b/tests/test_60_sp.py index 76c2b551..cb07c65e 100644 --- a/tests/test_60_sp.py +++ b/tests/test_60_sp.py @@ -73,6 +73,7 @@ class TestSP(): 'sn': ['Jeter'], 'title': ['The man']} + if __name__ == "__main__": _sp = TestSP() _sp.setup_class() diff --git a/tests/test_81_certificates.py b/tests/test_81_certificates.py index 88a43120..714a5c48 100644 --- a/tests/test_81_certificates.py +++ b/tests/test_81_certificates.py @@ -8,7 +8,6 @@ from saml2.cert import OpenSSLWrapper class TestGenerateCertificates(unittest.TestCase): - def test_validate_with_root_cert(self): cert_info_ca = { @@ -33,23 +32,35 @@ class TestGenerateCertificates(unittest.TestCase): ca_cert, ca_key = osw.create_certificate(cert_info_ca, request=False, write_to_file=True, - cert_dir=os.path.dirname(os.path.abspath(__file__)) + "/pki") + cert_dir=os.path.dirname( + os.path.abspath( + __file__)) + "/pki") - req_cert_str, req_key_str = osw.create_certificate(cert_info, request=True) + req_cert_str, req_key_str = osw.create_certificate(cert_info, + request=True) ca_cert_str = osw.read_str_from_file(ca_cert) ca_key_str = osw.read_str_from_file(ca_key) - cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str, req_cert_str) + cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str, + req_cert_str) valid, mess = osw.verify(ca_cert_str, cert_str) self.assertTrue(valid) - false_ca_cert, false_ca_key = osw.create_certificate(cert_info_ca, request=False, write_to_file=False) - false_req_cert_str_1, false_req_key_str_1 = osw.create_certificate(cert_info_ca, request=True) - false_cert_str_1 = osw.create_cert_signed_certificate(false_ca_cert, false_ca_key, false_req_cert_str_1) - false_req_cert_str_2, false_req_key_str_2 = osw.create_certificate(cert_info, request=True) - false_cert_str_2 = osw.create_cert_signed_certificate(false_ca_cert, false_ca_key, false_req_cert_str_2) + false_ca_cert, false_ca_key = osw.create_certificate(cert_info_ca, + request=False, + write_to_file=False) + false_req_cert_str_1, false_req_key_str_1 = osw.create_certificate( + cert_info_ca, request=True) + false_cert_str_1 = osw.create_cert_signed_certificate(false_ca_cert, + false_ca_key, + false_req_cert_str_1) + false_req_cert_str_2, false_req_key_str_2 = osw.create_certificate( + cert_info, request=True) + false_cert_str_2 = osw.create_cert_signed_certificate(false_ca_cert, + false_ca_key, + false_req_cert_str_2) valid, mess = osw.verify(false_ca_cert, cert_str) self.assertFalse(valid) @@ -106,20 +117,28 @@ class TestGenerateCertificates(unittest.TestCase): osw = OpenSSLWrapper() - ca_cert_str, ca_key_str = osw.create_certificate(cert_info_ca, request=False) + ca_cert_str, ca_key_str = osw.create_certificate(cert_info_ca, + request=False) - req_cert_str, intermediate_1_key_str = osw.create_certificate(cert_intermediate_1_info, request=True) - intermediate_cert_1_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str, req_cert_str) + req_cert_str, intermediate_1_key_str = osw.create_certificate( + cert_intermediate_1_info, request=True) + intermediate_cert_1_str = osw.create_cert_signed_certificate( + ca_cert_str, ca_key_str, req_cert_str) - req_cert_str, intermediate_2_key_str = osw.create_certificate(cert_intermediate_2_info, request=True) - intermediate_cert_2_str = osw.create_cert_signed_certificate(intermediate_cert_1_str, intermediate_1_key_str, - req_cert_str) + req_cert_str, intermediate_2_key_str = osw.create_certificate( + cert_intermediate_2_info, request=True) + intermediate_cert_2_str = osw.create_cert_signed_certificate( + intermediate_cert_1_str, intermediate_1_key_str, + req_cert_str) - req_cert_str, client_key_str = osw.create_certificate(cert_client_cert_info, request=True) - client_cert_str = osw.create_cert_signed_certificate(intermediate_cert_2_str, intermediate_2_key_str, - req_cert_str) + req_cert_str, client_key_str = osw.create_certificate( + cert_client_cert_info, request=True) + client_cert_str = osw.create_cert_signed_certificate( + intermediate_cert_2_str, intermediate_2_key_str, + req_cert_str) - cert_chain = [intermediate_cert_2_str, intermediate_cert_1_str, ca_cert_str] + cert_chain = [intermediate_cert_2_str, intermediate_cert_1_str, + ca_cert_str] valid, mess = osw.verify_chain(cert_chain, client_cert_str) self.assertTrue(valid) @@ -145,21 +164,23 @@ class TestGenerateCertificates(unittest.TestCase): "organization_unit": "asdfg" } - osw = OpenSSLWrapper() - ca_cert_str, ca_key_str = osw.create_certificate(cert_info_ca, request=False, - cipher_passphrase= - {"cipher": "blowfish", "passphrase": "qwerty"}) + ca_cert_str, ca_key_str = osw.create_certificate( + cert_info_ca, request=False, + cipher_passphrase={"cipher": "blowfish", "passphrase": "qwerty"}) - req_cert_str, req_key_str = osw.create_certificate(cert_info, request=True) - cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str, req_cert_str, - passphrase="qwerty") + req_cert_str, req_key_str = osw.create_certificate(cert_info, + request=True) + cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str, + req_cert_str, + passphrase="qwerty") valid = False try: - cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str, req_cert_str, - passphrase="qwertyqwerty") + cert_str = osw.create_cert_signed_certificate( + ca_cert_str, ca_key_str, req_cert_str, + passphrase="qwertyqwerty") except Exception: valid = True @@ -185,39 +206,59 @@ class TestGenerateCertificates(unittest.TestCase): "organization_unit": "asdfg" } - osw = OpenSSLWrapper() - ca_cert_str, ca_key_str = osw.create_certificate(cert_info_ca, request=False) + ca_cert_str, ca_key_str = osw.create_certificate(cert_info_ca, + request=False) - req_cert_str, req_key_str = osw.create_certificate(cert_info, request=True) - cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str, req_cert_str) + req_cert_str, req_key_str = osw.create_certificate(cert_info, + request=True) + cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str, + req_cert_str) valid, mess = osw.verify(ca_cert_str, cert_str) - ca_cert_str, ca_key_str = osw.create_certificate(cert_info_ca, request=False, valid_from=1000, valid_to=100000) - req_cert_str, req_key_str = osw.create_certificate(cert_info, request=True) - cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str, req_cert_str) + ca_cert_str, ca_key_str = osw.create_certificate(cert_info_ca, + request=False, + valid_from=1000, + valid_to=100000) + req_cert_str, req_key_str = osw.create_certificate(cert_info, + request=True) + cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str, + req_cert_str) valid, mess = osw.verify(ca_cert_str, cert_str) self.assertFalse(valid) - ca_cert_str, ca_key_str = osw.create_certificate(cert_info_ca, request=False) - req_cert_str, req_key_str = osw.create_certificate(cert_info, request=True) - cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str, req_cert_str, valid_from=1000, + ca_cert_str, ca_key_str = osw.create_certificate(cert_info_ca, + request=False) + req_cert_str, req_key_str = osw.create_certificate(cert_info, + request=True) + cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str, + req_cert_str, + valid_from=1000, valid_to=100000) valid, mess = osw.verify(ca_cert_str, cert_str) self.assertFalse(valid) - ca_cert_str, ca_key_str = osw.create_certificate(cert_info_ca, request=False, valid_from=0, valid_to=1) - req_cert_str, req_key_str = osw.create_certificate(cert_info, request=True) - cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str, req_cert_str) + ca_cert_str, ca_key_str = osw.create_certificate(cert_info_ca, + request=False, + valid_from=0, + valid_to=1) + req_cert_str, req_key_str = osw.create_certificate(cert_info, + request=True) + cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str, + req_cert_str) time.sleep(2) valid, mess = osw.verify(ca_cert_str, cert_str) self.assertFalse(valid) - ca_cert_str, ca_key_str = osw.create_certificate(cert_info_ca, request=False) - req_cert_str, req_key_str = osw.create_certificate(cert_info, request=True) - cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str, req_cert_str, valid_from=0, valid_to=1) + ca_cert_str, ca_key_str = osw.create_certificate(cert_info_ca, + request=False) + req_cert_str, req_key_str = osw.create_certificate(cert_info, + request=True) + cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str, + req_cert_str, + valid_from=0, valid_to=1) time.sleep(2) valid, mess = osw.verify(ca_cert_str, cert_str) self.assertFalse(valid) |