From 4d1e36c43f70e43c66055f46fd092799a025a111 Mon Sep 17 00:00:00 2001 From: Ivan Kanakarakis Date: Thu, 10 Jan 2019 19:39:20 +0200 Subject: Remove configurable exception type _run_xmlsec function allowed to pass the kind of exception that would be raised in case of error. This was parameter was ignored. As such, it is not needed and is removed completely. Signed-off-by: Ivan Kanakarakis --- src/saml2/sigver.py | 31 ++++++++++--------------------- tests/test_42_enc.py | 3 ++- 2 files changed, 12 insertions(+), 22 deletions(-) diff --git a/src/saml2/sigver.py b/src/saml2/sigver.py index 59fe2dee..a18c6c4b 100644 --- a/src/saml2/sigver.py +++ b/src/saml2/sigver.py @@ -728,10 +728,8 @@ class CryptoBackendXmlSec1(CryptoBackend): com_list.extend(['--node-xpath', xpath]) (_stdout, _stderr, output) = self._run_xmlsec( - com_list, - [template], - exception=DecryptError, - validate_output=False) + com_list, [template], validate_output=False + ) return output @@ -773,10 +771,8 @@ class CryptoBackendXmlSec1(CryptoBackend): com_list.extend(['--node-id', node_id]) (_stdout, _stderr, output) = self._run_xmlsec( - com_list, - [tmpl], - exception=EncryptError, - validate_output=False) + com_list, [tmpl], validate_output=False + ) os.unlink(fil) if not output: @@ -804,10 +800,8 @@ class CryptoBackendXmlSec1(CryptoBackend): ] (_stdout, _stderr, output) = self._run_xmlsec( - com_list, - [fil], - exception=DecryptError, - validate_output=False) + com_list, [fil], validate_output=False + ) return output.decode('utf-8') def sign_statement(self, statement, node_name, key_file, node_id, id_attr): @@ -844,9 +838,8 @@ class CryptoBackendXmlSec1(CryptoBackend): try: (stdout, stderr, signed_statement) = self._run_xmlsec( - com_list, - [fil], - validate_output=False) + com_list, [fil], validate_output=False + ) # this doesn't work if --store-signatures are used if stdout == '': @@ -891,21 +884,17 @@ class CryptoBackendXmlSec1(CryptoBackend): if node_id: com_list.extend(['--node-id', node_id]) - (_stdout, stderr, _output) = self._run_xmlsec( - com_list, - [fil], - exception=SignatureError) + (_stdout, stderr, _output) = self._run_xmlsec(com_list, [fil]) return parse_xmlsec_output(stderr) - def _run_xmlsec(self, com_list, extra_args, validate_output=True, exception=XmlsecError): + def _run_xmlsec(self, com_list, extra_args, validate_output=True): """ Common code to invoke xmlsec and parse the output. :param com_list: Key-value parameter list for xmlsec :param extra_args: Positional parameters to be appended after all key-value parameters :param validate_output: Parse and validate the output - :param exception: The exception class to raise on errors :result: Whatever xmlsec wrote to an --output temporary file """ with NamedTemporaryFile(suffix='.xml', delete=self._xmlsec_delete_tmpfiles) as ntf: diff --git a/tests/test_42_enc.py b/tests/test_42_enc.py index d8e38f95..d5c348f1 100644 --- a/tests/test_42_enc.py +++ b/tests/test_42_enc.py @@ -73,7 +73,8 @@ def test_enc1(): crypto = CryptoBackendXmlSec1(xmlsec_path) (_stdout, _stderr, output) = crypto._run_xmlsec( - com_list, [tmpl], exception=EncryptError, validate_output=False) + com_list, [tmpl], validate_output=False + ) print(output) assert _stderr == "" -- cgit v1.2.1 From dbbfe1e3b032a1ecf8e2c84eca5fa74728f257f2 Mon Sep 17 00:00:00 2001 From: Ivan Kanakarakis Date: Thu, 10 Jan 2019 22:34:26 +0200 Subject: Remove validate_output parameter from _run_xmlsec All callers set it to false, but one which calls the validation method itself after the call to _run_xmlsec (which means that validation is done twice). Signed-off-by: Ivan Kanakarakis --- src/saml2/sigver.py | 25 +++++-------------------- tests/test_42_enc.py | 4 +--- 2 files changed, 6 insertions(+), 23 deletions(-) diff --git a/src/saml2/sigver.py b/src/saml2/sigver.py index a18c6c4b..e66d4dfa 100644 --- a/src/saml2/sigver.py +++ b/src/saml2/sigver.py @@ -592,7 +592,6 @@ def verify_redirect_signature(saml_msg, crypto, cert=None, sigkey=None): LOG_LINE = 60 * '=' + '\n%s\n' + 60 * '-' + '\n%s' + 60 * '=' -LOG_LINE_2 = 60 * '=' + '\n%s\n%s\n' + 60 * '-' + '\n%s' + 60 * '=' def make_str(txt): @@ -727,9 +726,7 @@ class CryptoBackendXmlSec1(CryptoBackend): if xpath: com_list.extend(['--node-xpath', xpath]) - (_stdout, _stderr, output) = self._run_xmlsec( - com_list, [template], validate_output=False - ) + (_stdout, _stderr, output) = self._run_xmlsec(com_list, [template]) return output @@ -770,9 +767,7 @@ class CryptoBackendXmlSec1(CryptoBackend): if node_id: com_list.extend(['--node-id', node_id]) - (_stdout, _stderr, output) = self._run_xmlsec( - com_list, [tmpl], validate_output=False - ) + (_stdout, _stderr, output) = self._run_xmlsec(com_list, [tmpl]) os.unlink(fil) if not output: @@ -799,9 +794,7 @@ class CryptoBackendXmlSec1(CryptoBackend): ENC_KEY_CLASS, ] - (_stdout, _stderr, output) = self._run_xmlsec( - com_list, [fil], validate_output=False - ) + (_stdout, _stderr, output) = self._run_xmlsec(com_list, [fil]) return output.decode('utf-8') def sign_statement(self, statement, node_name, key_file, node_id, id_attr): @@ -838,7 +831,7 @@ class CryptoBackendXmlSec1(CryptoBackend): try: (stdout, stderr, signed_statement) = self._run_xmlsec( - com_list, [fil], validate_output=False + com_list, [fil] ) # this doesn't work if --store-signatures are used @@ -888,13 +881,12 @@ class CryptoBackendXmlSec1(CryptoBackend): return parse_xmlsec_output(stderr) - def _run_xmlsec(self, com_list, extra_args, validate_output=True): + def _run_xmlsec(self, com_list, extra_args): """ Common code to invoke xmlsec and parse the output. :param com_list: Key-value parameter list for xmlsec :param extra_args: Positional parameters to be appended after all key-value parameters - :param validate_output: Parse and validate the output :result: Whatever xmlsec wrote to an --output temporary file """ with NamedTemporaryFile(suffix='.xml', delete=self._xmlsec_delete_tmpfiles) as ntf: @@ -913,13 +905,6 @@ class CryptoBackendXmlSec1(CryptoBackend): raise XmlsecError('{err_code}:{err_msg}'.format( err_code=pof.returncode, err_msg=p_err)) - try: - if validate_output: - parse_xmlsec_output(p_err) - except XmlsecError as exc: - logger.error(LOG_LINE_2, p_out, p_err, exc) - raise - ntf.seek(0) return p_out, p_err, ntf.read() diff --git a/tests/test_42_enc.py b/tests/test_42_enc.py index d5c348f1..c29eca1e 100644 --- a/tests/test_42_enc.py +++ b/tests/test_42_enc.py @@ -72,9 +72,7 @@ def test_enc1(): "--node-xpath", ASSERT_XPATH] crypto = CryptoBackendXmlSec1(xmlsec_path) - (_stdout, _stderr, output) = crypto._run_xmlsec( - com_list, [tmpl], validate_output=False - ) + (_stdout, _stderr, output) = crypto._run_xmlsec(com_list, [tmpl]) print(output) assert _stderr == "" -- cgit v1.2.1 From 2cd3c84cdc9aeeccedbebe95494e1f831cdb36b1 Mon Sep 17 00:00:00 2001 From: Ivan Kanakarakis Date: Thu, 10 Jan 2019 22:22:46 +0200 Subject: Raise XmlsecError if xmlsec1 returns an error When xmlsec1 fails, it returns a non-zero returncode. The returncode was checked only for values less than zero, and not greater than zero. This results in situations where xmlsec1 fails to run a command, but the executation continues as nothing failed. This happens to be ok, because, the result we depend upon is coupled to xmlsec1's output stream. When xmlsec1 fails, the output stream is empty and the error stream will have information relevant to the failure cause. Now, the check expects a returncode with value zero, otherwise an XmlsecError exception is raised, to be handled by the caller up the stack. This could have been a major security issue, but we stood lucky. Special thanks to @pjsg for bringing this to our attention. Signed-off-by: Ivan Kanakarakis --- src/saml2/sigver.py | 73 +++++++++++++++++++++++++++++++------------------ tests/test_40_sigver.py | 4 +-- 2 files changed, 49 insertions(+), 28 deletions(-) diff --git a/src/saml2/sigver.py b/src/saml2/sigver.py index e66d4dfa..f4434d55 100644 --- a/src/saml2/sigver.py +++ b/src/saml2/sigver.py @@ -591,9 +591,6 @@ def verify_redirect_signature(saml_msg, crypto, cert=None, sigkey=None): return bool(signer.verify(string, _sign, _key)) -LOG_LINE = 60 * '=' + '\n%s\n' + 60 * '-' + '\n%s' + 60 * '=' - - def make_str(txt): if isinstance(txt, six.string_types): return txt @@ -726,7 +723,10 @@ class CryptoBackendXmlSec1(CryptoBackend): if xpath: com_list.extend(['--node-xpath', xpath]) - (_stdout, _stderr, output) = self._run_xmlsec(com_list, [template]) + try: + (_stdout, _stderr, output) = self._run_xmlsec(com_list, [template]) + except XmlsecError as e: + six.raise_from(EncryptError(com_list), e) return output @@ -748,8 +748,9 @@ class CryptoBackendXmlSec1(CryptoBackend): if isinstance(statement, SamlBase): statement = pre_encrypt_assertion(statement) - _, fil = make_temp(_str(statement), decode=False, - delete=False) + _, fil = make_temp( + _str(statement), decode=False, delete=self._xmlsec_delete_tmpfiles + ) _, tmpl = make_temp(_str(template), decode=False) if not node_xpath: @@ -767,11 +768,10 @@ class CryptoBackendXmlSec1(CryptoBackend): if node_id: com_list.extend(['--node-id', node_id]) - (_stdout, _stderr, output) = self._run_xmlsec(com_list, [tmpl]) - - os.unlink(fil) - if not output: - raise EncryptError(_stderr) + try: + (_stdout, _stderr, output) = self._run_xmlsec(com_list, [tmpl]) + except XmlsecError as e: + six.raise_from(EncryptError(com_list), e) return output.decode('utf-8') @@ -794,7 +794,11 @@ class CryptoBackendXmlSec1(CryptoBackend): ENC_KEY_CLASS, ] - (_stdout, _stderr, output) = self._run_xmlsec(com_list, [fil]) + try: + (_stdout, _stderr, output) = self._run_xmlsec(com_list, [fil]) + except XmlsecError as e: + six.raise_from(DecryptError(com_list), e) + return output.decode('utf-8') def sign_statement(self, statement, node_name, key_file, node_id, id_attr): @@ -877,7 +881,10 @@ class CryptoBackendXmlSec1(CryptoBackend): if node_id: com_list.extend(['--node-id', node_id]) - (_stdout, stderr, _output) = self._run_xmlsec(com_list, [fil]) + try: + (_stdout, stderr, _output) = self._run_xmlsec(com_list, [fil]) + except XmlsecError as e: + six.raise_from(SignatureError(com_list), e) return parse_xmlsec_output(stderr) @@ -900,10 +907,12 @@ class CryptoBackendXmlSec1(CryptoBackend): p_out = p_out.decode() p_err = p_err.decode() - if pof.returncode is not None and pof.returncode < 0: - logger.error(LOG_LINE, p_out, p_err) - raise XmlsecError('{err_code}:{err_msg}'.format( - err_code=pof.returncode, err_msg=p_err)) + if pof.returncode != 0: + errmsg = "returncode={code}\nerror={err}\noutput={out}".format( + code=pof.returncode, err=p_err, out=p_out + ) + logger.error(errmsg) + raise XmlsecError(errmsg) ntf.seek(0) return p_out, p_err, ntf.read() @@ -1352,18 +1361,26 @@ class SecurityContext(object): if self.enc_key_files is not None: for _enc_key_file in self.enc_key_files: - _enctext = self.crypto.decrypt(enctext, _enc_key_file, id_attr) - if _enctext is not None and len(_enctext) > 0: - return _enctext + try: + _enctext = self.crypto.decrypt(enctext, _enc_key_file, id_attr) + except XmlsecError as e: + continue + else: + if _enctext: + return _enctext for _key in keys: if _key is not None and len(_key.strip()) > 0: if not isinstance(_key, six.binary_type): _key = str(_key).encode('ascii') _, key_file = make_temp(_key, decode=False) - _enctext = self.crypto.decrypt(enctext, key_file, id_attr) - if _enctext is not None and len(_enctext) > 0: - return _enctext + try: + _enctext = self.crypto.decrypt(enctext, key_file, id_attr) + except XmlsecError as e: + continue + else: + if _enctext: + return _enctext return enctext @@ -1380,9 +1397,13 @@ class SecurityContext(object): if self.enc_key_files is not None: for _enc_key_file in self.enc_key_files: - _enctext = self.crypto.decrypt(enctext, _enc_key_file, id_attr) - if _enctext is not None and len(_enctext) > 0: - return _enctext + try: + _enctext = self.crypto.decrypt(enctext, _enc_key_file, id_attr) + except XmlsecError: + continue + else: + if _enctext is not None and len(_enctext) > 0: + return _enctext if key_file is not None and len(key_file.strip()) > 0: _enctext = self.crypto.decrypt(enctext, key_file, id_attr) diff --git a/tests/test_40_sigver.py b/tests/test_40_sigver.py index ba5cf639..e3a20e49 100644 --- a/tests/test_40_sigver.py +++ b/tests/test_40_sigver.py @@ -791,7 +791,7 @@ def test_xbox(): str(encrypted_assertion), conf.cert_file, pre, "des-192", '/*[local-name()="EncryptedAssertion"]/*[local-name()="Assertion"]') - decr_text = sec.decrypt(enctext) + decr_text = sec.decrypt(enctext, key_file=PRIV_KEY) _seass = saml.encrypted_assertion_from_string(decr_text) assertions = [] assers = extension_elements_to_elements(_seass.extension_elements, @@ -844,7 +844,7 @@ def test_xbox_non_ascii_ava(): str(encrypted_assertion), conf.cert_file, pre, "des-192", '/*[local-name()="EncryptedAssertion"]/*[local-name()="Assertion"]') - decr_text = sec.decrypt(enctext) + decr_text = sec.decrypt(enctext, key_file=PRIV_KEY) _seass = saml.encrypted_assertion_from_string(decr_text) assertions = [] assers = extension_elements_to_elements(_seass.extension_elements, -- cgit v1.2.1 From 448d6276f1c99bd2d48f9e4acc744e56c3da008c Mon Sep 17 00:00:00 2001 From: Ivan Kanakarakis Date: Fri, 11 Jan 2019 23:11:53 +0200 Subject: Reformat code related to PYSAML2_KEEP_XMLSEC_TMP Signed-off-by: Ivan Kanakarakis --- src/saml2/sigver.py | 41 +++++++++++++++++++++++------------------ 1 file changed, 23 insertions(+), 18 deletions(-) diff --git a/src/saml2/sigver.py b/src/saml2/sigver.py index f4434d55..1cb0bec4 100644 --- a/src/saml2/sigver.py +++ b/src/saml2/sigver.py @@ -678,10 +678,9 @@ class CryptoBackendXmlSec1(CryptoBackend): CryptoBackend.__init__(self, **kwargs) assert (isinstance(xmlsec_binary, six.string_types)) self.xmlsec = xmlsec_binary - if os.environ.get('PYSAML2_KEEP_XMLSEC_TMP', None): - self._xmlsec_delete_tmpfiles = False - else: - self._xmlsec_delete_tmpfiles = True + self._xmlsec_delete_tmpfiles = os.environ.get( + 'PYSAML2_KEEP_XMLSEC_TMP', False + ) try: self.non_xml_crypto = RSACrypto(kwargs['rsa_key']) @@ -817,10 +816,11 @@ class CryptoBackendXmlSec1(CryptoBackend): statement = str(statement) _, fil = make_temp( - statement, - suffix='.xml', - decode=False, - delete=self._xmlsec_delete_tmpfiles) + statement, + suffix='.xml', + decode=False, + delete=self._xmlsec_delete_tmpfiles, + ) com_list = [ self.xmlsec, @@ -864,10 +864,11 @@ class CryptoBackendXmlSec1(CryptoBackend): signedtext = signedtext.encode('utf-8') _, fil = make_temp( - signedtext, - suffix='.xml', - decode=False, - delete=self._xmlsec_delete_tmpfiles) + signedtext, + suffix='.xml', + decode=False, + delete=self._xmlsec_delete_tmpfiles, + ) com_list = [ self.xmlsec, @@ -1461,11 +1462,14 @@ class SecurityContext(object): for cert in _certs: if isinstance(cert, six.string_types): - certs.append(make_temp( - pem_format(cert), - suffix='.pem', - decode=False, - delete=self._xmlsec_delete_tmpfiles)) + certs.append( + make_temp( + pem_format(cert), + suffix='.pem', + decode=False, + delete=self._xmlsec_delete_tmpfiles, + ) + ) else: certs.append(cert) else: @@ -1478,7 +1482,8 @@ class SecurityContext(object): pem_format(cert), suffix='.pem', decode=False, - delete=self._xmlsec_delete_tmpfiles) + delete=self._xmlsec_delete_tmpfiles, + ) for cert in cert_from_instance(item) ] else: -- cgit v1.2.1 From 342e376a4aaab2a5eeafb41f1d971019be887404 Mon Sep 17 00:00:00 2001 From: Ivan Kanakarakis Date: Thu, 10 Jan 2019 22:37:58 +0200 Subject: Refactor SecurityContext::decrypt to report failures Previously there was no reliable way to know whether decryption failed. Now, when decryption fails an DecryptError exception is raised containing the keys that were tried to decrypt the given ciphertext. The same refactoring is done to SecurityContext::decrypt_keys. Signed-off-by: Ivan Kanakarakis --- src/saml2/sigver.py | 89 ++++++++++++++++++++++++++--------------------------- 1 file changed, 43 insertions(+), 46 deletions(-) diff --git a/src/saml2/sigver.py b/src/saml2/sigver.py index 1cb0bec4..0e3e0839 100644 --- a/src/saml2/sigver.py +++ b/src/saml2/sigver.py @@ -5,6 +5,7 @@ from OpenSSL import crypto import base64 import hashlib +import itertools import logging import os import ssl @@ -857,7 +858,8 @@ class CryptoBackendXmlSec1(CryptoBackend): :param cert_type: The file type of the certificate :param node_name: The name of the class that is signed :param node_id: The identifier of the node - :param id_attr: Should normally be one of 'id', 'Id' or 'ID' + :param id_attr: The attribute name for the identifier, normally one of + 'id','Id' or 'ID' :return: Boolean True if the signature was correct otherwise False. """ if not isinstance(signedtext, six.binary_type): @@ -1350,40 +1352,32 @@ class SecurityContext(object): """ Decrypting an encrypted text by the use of a private key. :param enctext: The encrypted text as a string + :param keys: Keys to try to decrypt enctext with + :param id_attr: The attribute name for the identifier, normally one of + 'id','Id' or 'ID' :return: The decrypted text """ - _enctext = None - - if not id_attr: - id_attr = self.id_attr + key_files = [] if not isinstance(keys, list): keys = [keys] - if self.enc_key_files is not None: - for _enc_key_file in self.enc_key_files: - try: - _enctext = self.crypto.decrypt(enctext, _enc_key_file, id_attr) - except XmlsecError as e: - continue - else: - if _enctext: - return _enctext - - for _key in keys: - if _key is not None and len(_key.strip()) > 0: - if not isinstance(_key, six.binary_type): - _key = str(_key).encode('ascii') - _, key_file = make_temp(_key, decode=False) - try: - _enctext = self.crypto.decrypt(enctext, key_file, id_attr) - except XmlsecError as e: - continue - else: - if _enctext: - return _enctext + keys = [key for key in keys if key] + for key in keys: + if not isinstance(key, six.binary_type): + key = key.encode("ascii") + _, key_file = make_temp(key, decode=False, delete=False) + key_files.append(key_file) - return enctext + try: + dectext = self.decrypt(enctext, key_file=key_files, id_attr=id_attr) + except DecryptError as e: + raise + else: + return dectext + finally: + for key_file in key_files: + os.unlink(key_file) def decrypt(self, enctext, key_file=None, id_attr=''): """ Decrypting an encrypted text by the use of a private key. @@ -1391,26 +1385,27 @@ class SecurityContext(object): :param enctext: The encrypted text as a string :return: The decrypted text """ - _enctext = None - if not id_attr: id_attr = self.id_attr - if self.enc_key_files is not None: - for _enc_key_file in self.enc_key_files: - try: - _enctext = self.crypto.decrypt(enctext, _enc_key_file, id_attr) - except XmlsecError: - continue - else: - if _enctext is not None and len(_enctext) > 0: - return _enctext + if not isinstance(key_file, list): + key_file = [key_file] + + key_files = [ + key for key in itertools.chain(key_file, self.enc_key_files) if key + ] + for key_file in key_files: + try: + dectext = self.crypto.decrypt(enctext, key_file, id_attr) + except XmlsecError as e: + continue + else: + if dectext: + return dectext - if key_file is not None and len(key_file.strip()) > 0: - _enctext = self.crypto.decrypt(enctext, key_file, id_attr) - if _enctext is not None and len(_enctext) > 0: - return _enctext - return enctext + errmsg = "No key was able to decrypt the ciphertext. Keys tried: {keys}" + errmsg = errmsg.format(keys=key_files) + raise DecryptError(errmsg) def verify_signature(self, signedtext, cert_file=None, cert_type='pem', node_name=NODE_NAME, node_id=None, id_attr=''): """ Verifies the signature of a XML document. @@ -1420,7 +1415,8 @@ class SecurityContext(object): :param cert_type: The file type of the certificate :param node_name: The name of the class that is signed :param node_id: The identifier of the node - :param id_attr: Should normally be one of 'id', 'Id' or 'ID' + :param id_attr: The attribute name for the identifier, normally one of + 'id','Id' or 'ID' :return: Boolean True if the signature was correct otherwise False. """ # This is only for testing purposes, otherwise when would you receive @@ -1527,7 +1523,8 @@ class SecurityContext(object): :param item: Parsed entity :param node_name: The name of the node/class/element that is signed :param origdoc: The original XML string - :param id_attr: + :param id_attr: The attribute name for the identifier, normally one of + 'id','Id' or 'ID' :param must: :return: """ -- cgit v1.2.1 From 7f4e3e51027f2760240be7af058ab5329b73d14d Mon Sep 17 00:00:00 2001 From: Ivan Kanakarakis Date: Thu, 10 Jan 2019 22:48:39 +0200 Subject: Reformat code for test_40_sigver Signed-off-by: Ivan Kanakarakis --- tests/test_40_sigver.py | 123 +++++++++++++++++++++++++++++------------------- 1 file changed, 74 insertions(+), 49 deletions(-) diff --git a/tests/test_40_sigver.py b/tests/test_40_sigver.py index e3a20e49..898d1387 100644 --- a/tests/test_40_sigver.py +++ b/tests/test_40_sigver.py @@ -24,6 +24,7 @@ from py.test import raises from pathutils import full_path + SIGNED = full_path("saml_signed.xml") UNSIGNED = full_path("saml_unsigned.xml") SIMPLE_SAML_PHP_RESPONSE = full_path("simplesamlphp_authnresponse.xml") @@ -36,6 +37,12 @@ PRIV_KEY = full_path("test.key") ENC_PUB_KEY = full_path("pki/test_1.crt") ENC_PRIV_KEY = full_path("pki/test.key") +INVALID_KEY = full_path("non-existent.key") + +IDP_EXAMPLE = full_path("idp_example.xml") +METADATA_CERT = full_path("metadata_cert.xml") + + def _eq(l1, l2): return set(l1) == set(l2) @@ -721,7 +728,7 @@ class TestSecurityMetadata(): conf = config.SPConfig() conf.load_file("server_conf") md = MetadataStore([saml, samlp], None, conf) - md.load("local", full_path("metadata_cert.xml")) + md.load("local", METADATA_CERT) conf.metadata = md conf.only_use_keys_in_metadata = False @@ -742,7 +749,7 @@ class TestSecurityMetadataNonAsciiAva(): conf = config.SPConfig() conf.load_file("server_conf") md = MetadataStore([saml, samlp], None, conf) - md.load("local", full_path("metadata_cert.xml")) + md.load("local", METADATA_CERT) conf.metadata = md conf.only_use_keys_in_metadata = False @@ -762,7 +769,7 @@ def test_xbox(): conf = config.SPConfig() conf.load_file("server_conf") md = MetadataStore([saml, samlp], None, conf) - md.load("local", full_path("idp_example.xml")) + md.load("local", IDP_EXAMPLE) conf.metadata = md conf.only_use_keys_in_metadata = False @@ -773,41 +780,50 @@ def test_xbox(): issue_instant="2009-10-30T13:20:28Z", signature=sigver.pre_signature_part("11111", sec.my_cert, 1), attribute_statement=do_attribute_statement( - {("", "", "surName"): ("Foo", ""), - ("", "", "givenName"): ("Bar", ""), }) + { + ("", "", "surName"): ("Foo", ""), + ("", "", "givenName"): ("Bar", ""), + } + ) ) - sigass = sec.sign_statement(assertion, class_name(assertion), - key_file=full_path("test.key"), - node_id=assertion.id) + sigass = sec.sign_statement( + assertion, + class_name(assertion), + key_file=PRIV_KEY, + node_id=assertion.id, + ) _ass0 = saml.assertion_from_string(sigass) - encrypted_assertion = EncryptedAssertion() encrypted_assertion.add_extension_element(_ass0) - _, pre = make_temp(str(pre_encryption_part()).encode('utf-8'), decode=False) + _, pre = make_temp( + str(pre_encryption_part()).encode('utf-8'), decode=False + ) enctext = sec.crypto.encrypt( - str(encrypted_assertion), conf.cert_file, pre, "des-192", - '/*[local-name()="EncryptedAssertion"]/*[local-name()="Assertion"]') + str(encrypted_assertion), + conf.cert_file, + pre, + "des-192", + '/*[local-name()="EncryptedAssertion"]/*[local-name()="Assertion"]', + ) decr_text = sec.decrypt(enctext, key_file=PRIV_KEY) _seass = saml.encrypted_assertion_from_string(decr_text) assertions = [] - assers = extension_elements_to_elements(_seass.extension_elements, - [saml, samlp]) - - sign_cert_file = full_path("test.pem") + assers = extension_elements_to_elements( + _seass.extension_elements, [saml, samlp] + ) for ass in assers: - _ass = "%s" % ass - #_ass = _ass.replace('xsi:nil="true" ', '') - #assert sigass == _ass - _txt = sec.verify_signature(_ass, sign_cert_file, - node_name=class_name(assertion)) + _txt = sec.verify_signature( + str(ass), PUB_KEY, node_name=class_name(assertion) + ) if _txt: assertions.append(ass) + assert assertions print(assertions) @@ -815,7 +831,7 @@ def test_xbox_non_ascii_ava(): conf = config.SPConfig() conf.load_file("server_conf") md = MetadataStore([saml, samlp], None, conf) - md.load("local", full_path("idp_example.xml")) + md.load("local", IDP_EXAMPLE) conf.metadata = md conf.only_use_keys_in_metadata = False @@ -826,41 +842,50 @@ def test_xbox_non_ascii_ava(): issue_instant="2009-10-30T13:20:28Z", signature=sigver.pre_signature_part("11111", sec.my_cert, 1), attribute_statement=do_attribute_statement( - {("", "", "surName"): ("Föö", ""), - ("", "", "givenName"): ("Bär", ""), }) + { + ("", "", "surName"): ("Föö", ""), + ("", "", "givenName"): ("Bär", ""), + } + ) ) - sigass = sec.sign_statement(assertion, class_name(assertion), - key_file=full_path("test.key"), - node_id=assertion.id) + sigass = sec.sign_statement( + assertion, + class_name(assertion), + key_file=PRIV_KEY, + node_id=assertion.id, + ) _ass0 = saml.assertion_from_string(sigass) - encrypted_assertion = EncryptedAssertion() encrypted_assertion.add_extension_element(_ass0) - _, pre = make_temp(str(pre_encryption_part()).encode('utf-8'), decode=False) + _, pre = make_temp( + str(pre_encryption_part()).encode('utf-8'), decode=False + ) enctext = sec.crypto.encrypt( - str(encrypted_assertion), conf.cert_file, pre, "des-192", - '/*[local-name()="EncryptedAssertion"]/*[local-name()="Assertion"]') + str(encrypted_assertion), + conf.cert_file, + pre, + "des-192", + '/*[local-name()="EncryptedAssertion"]/*[local-name()="Assertion"]', + ) decr_text = sec.decrypt(enctext, key_file=PRIV_KEY) _seass = saml.encrypted_assertion_from_string(decr_text) assertions = [] - assers = extension_elements_to_elements(_seass.extension_elements, - [saml, samlp]) - - sign_cert_file = full_path("test.pem") + assers = extension_elements_to_elements( + _seass.extension_elements, [saml, samlp] + ) for ass in assers: - _ass = "%s" % ass - #_ass = _ass.replace('xsi:nil="true" ', '') - #assert sigass == _ass - _txt = sec.verify_signature(_ass, sign_cert_file, - node_name=class_name(assertion)) + _txt = sec.verify_signature( + str(ass), PUB_KEY, node_name=class_name(assertion) + ) if _txt: assertions.append(ass) + assert assertions print(assertions) @@ -869,7 +894,7 @@ def test_okta(): conf.load_file("server_conf") conf.id_attr_name = 'Id' md = MetadataStore([saml, samlp], None, conf) - md.load("local", full_path("idp_example.xml")) + md.load("local", IDP_EXAMPLE) conf.metadata = md conf.only_use_keys_in_metadata = False @@ -892,7 +917,7 @@ def test_xmlsec_err(): conf = config.SPConfig() conf.load_file("server_conf") md = MetadataStore([saml, samlp], None, conf) - md.load("local", full_path("idp_example.xml")) + md.load("local", IDP_EXAMPLE) conf.metadata = md conf.only_use_keys_in_metadata = False @@ -909,7 +934,7 @@ def test_xmlsec_err(): try: sec.sign_statement(assertion, class_name(assertion), - key_file=full_path("tes.key"), + key_file=INVALID_KEY, node_id=assertion.id) except (XmlsecError, SigverError) as err: # should throw an exception pass @@ -921,7 +946,7 @@ def test_xmlsec_err_non_ascii_ava(): conf = config.SPConfig() conf.load_file("server_conf") md = MetadataStore([saml, samlp], None, conf) - md.load("local", full_path("idp_example.xml")) + md.load("local", IDP_EXAMPLE) conf.metadata = md conf.only_use_keys_in_metadata = False @@ -938,7 +963,7 @@ def test_xmlsec_err_non_ascii_ava(): try: sec.sign_statement(assertion, class_name(assertion), - key_file=full_path("tes.key"), + key_file=INVALID_KEY, node_id=assertion.id) except (XmlsecError, SigverError) as err: # should throw an exception pass @@ -950,7 +975,7 @@ def test_sha256_signing(): conf = config.SPConfig() conf.load_file("server_conf") md = MetadataStore([saml, samlp], None, conf) - md.load("local", full_path("idp_example.xml")) + md.load("local", IDP_EXAMPLE) conf.metadata = md conf.only_use_keys_in_metadata = False @@ -967,7 +992,7 @@ def test_sha256_signing(): ) s = sec.sign_statement(assertion, class_name(assertion), - key_file=full_path("test.key"), + key_file=PRIV_KEY, node_id=assertion.id) assert s @@ -976,7 +1001,7 @@ def test_sha256_signing_non_ascii_ava(): conf = config.SPConfig() conf.load_file("server_conf") md = MetadataStore([saml, samlp], None, conf) - md.load("local", full_path("idp_example.xml")) + md.load("local", IDP_EXAMPLE) conf.metadata = md conf.only_use_keys_in_metadata = False @@ -993,7 +1018,7 @@ def test_sha256_signing_non_ascii_ava(): ) s = sec.sign_statement(assertion, class_name(assertion), - key_file=full_path("test.key"), + key_file=PRIV_KEY, node_id=assertion.id) assert s -- cgit v1.2.1 From cb760c81ce6739821782040b648ce42527b22b21 Mon Sep 17 00:00:00 2001 From: Ivan Kanakarakis Date: Thu, 10 Jan 2019 22:53:56 +0200 Subject: Convert exception expectation to with-raises idiom Signed-off-by: Ivan Kanakarakis --- tests/test_40_sigver.py | 30 +++++++++++------------ tests/test_50_server.py | 63 +++++++++++++++---------------------------------- 2 files changed, 33 insertions(+), 60 deletions(-) diff --git a/tests/test_40_sigver.py b/tests/test_40_sigver.py index 898d1387..092fbc67 100644 --- a/tests/test_40_sigver.py +++ b/tests/test_40_sigver.py @@ -932,14 +932,13 @@ def test_xmlsec_err(): ("", "", "givenName"): ("Bar", ""), }) ) - try: - sec.sign_statement(assertion, class_name(assertion), - key_file=INVALID_KEY, - node_id=assertion.id) - except (XmlsecError, SigverError) as err: # should throw an exception - pass - else: - assert False + with raises(XmlsecError): + sec.sign_statement( + assertion, + class_name(assertion), + key_file=INVALID_KEY, + node_id=assertion.id, + ) def test_xmlsec_err_non_ascii_ava(): @@ -961,14 +960,13 @@ def test_xmlsec_err_non_ascii_ava(): ("", "", "givenName"): ("Bär", ""), }) ) - try: - sec.sign_statement(assertion, class_name(assertion), - key_file=INVALID_KEY, - node_id=assertion.id) - except (XmlsecError, SigverError) as err: # should throw an exception - pass - else: - assert False + with raises(XmlsecError): + sec.sign_statement( + assertion, + class_name(assertion), + key_file=INVALID_KEY, + node_id=assertion.id, + ) def test_sha256_signing(): diff --git a/tests/test_50_server.py b/tests/test_50_server.py index 11ace47b..dc6cbf42 100644 --- a/tests/test_50_server.py +++ b/tests/test_50_server.py @@ -8,7 +8,7 @@ from six.moves.urllib.parse import parse_qs import uuid from saml2.cert import OpenSSLWrapper -from saml2.sigver import make_temp, EncryptError, CertificateError +from saml2.sigver import make_temp, DecryptError, EncryptError, CertificateError from saml2.assertion import Policy from saml2.authn_context import INTERNETPROTOCOLPASSWORD from saml2.saml import NameID, NAMEID_FORMAT_TRANSIENT @@ -34,6 +34,7 @@ from py.test import raises from pathutils import full_path import saml2.xmldsig as ds + nid = NameID(name_qualifier="foo", format=NAMEID_FORMAT_TRANSIENT, text="123456") @@ -171,7 +172,7 @@ class TestServer1(): assert attr1.attribute_value[0].text == "Derek" assert attr0.friendly_name == "sn" assert attr0.attribute_value[0].text == "Jeter" - # + subject = assertion.subject assert _eq(subject.keyswv(), ["text", "name_id"]) assert subject.text == "_aaa" @@ -613,9 +614,11 @@ class TestServer1(): decr_text_old = copy.deepcopy("%s" % signed_resp) - decr_text = self.server.sec.decrypt(signed_resp, self.client.config.encryption_keypairs[0]["key_file"]) - - assert decr_text == decr_text_old + with raises(DecryptError): + decr_text = self.server.sec.decrypt( + signed_resp, + self.client.config.encryption_keypairs[0]["key_file"], + ) decr_text = self.server.sec.decrypt(signed_resp, self.client.config.encryption_keypairs[1]["key_file"]) @@ -958,7 +961,7 @@ class TestServer1(): self.verify_advice_assertion(resp, decr_text_2) def test_encrypted_response_8(self): - try: + with raises(EncryptError): _resp = self.server.create_authn_response( self.ava, "id12", # in_response_to @@ -973,13 +976,8 @@ class TestServer1(): encrypt_cert_advice="whatever", encrypt_cert_assertion="whatever" ) - assert False, "Must throw an exception" - except EncryptError as ex: - pass - except Exception as ex: - assert False, "Wrong exception!" - try: + with raises(EncryptError): _resp = self.server.create_authn_response( self.ava, "id12", # in_response_to @@ -993,13 +991,8 @@ class TestServer1(): pefim=True, encrypt_cert_advice="whatever", ) - assert False, "Must throw an exception" - except EncryptError as ex: - pass - except Exception as ex: - assert False, "Wrong exception!" - try: + with raises(EncryptError): _resp = self.server.create_authn_response( self.ava, "id12", # in_response_to @@ -1013,15 +1006,10 @@ class TestServer1(): encrypted_advice_attributes=False, encrypt_cert_assertion="whatever" ) - assert False, "Must throw an exception" - except EncryptError as ex: - pass - except Exception as ex: - assert False, "Wrong exception!" _server = Server("idp_conf_verify_cert") - try: + with raises(CertificateError): _resp = _server.create_authn_response( self.ava, "id12", # in_response_to @@ -1036,13 +1024,8 @@ class TestServer1(): encrypt_cert_advice="whatever", encrypt_cert_assertion="whatever" ) - assert False, "Must throw an exception" - except CertificateError as ex: - pass - except Exception as ex: - assert False, "Wrong exception!" - try: + with raises(CertificateError): _resp = _server.create_authn_response( self.ava, "id12", # in_response_to @@ -1056,13 +1039,8 @@ class TestServer1(): pefim=True, encrypt_cert_advice="whatever", ) - assert False, "Must throw an exception" - except CertificateError as ex: - pass - except Exception as ex: - assert False, "Wrong exception!" - try: + with raises(CertificateError): _resp = _server.create_authn_response( self.ava, "id12", # in_response_to @@ -1076,11 +1054,6 @@ class TestServer1(): encrypted_advice_attributes=False, encrypt_cert_assertion="whatever" ) - assert False, "Must throw an exception" - except CertificateError as ex: - pass - except Exception as ex: - assert False, "Wrong exception!" def test_encrypted_response_9(self): _server = Server("idp_conf_sp_no_encrypt") @@ -1715,9 +1688,11 @@ class TestServer1NonAsciiAva(): decr_text_old = copy.deepcopy("%s" % signed_resp) - decr_text = self.server.sec.decrypt(signed_resp, self.client.config.encryption_keypairs[0]["key_file"]) - - assert decr_text == decr_text_old + with raises(DecryptError): + decr_text = self.server.sec.decrypt( + signed_resp, + self.client.config.encryption_keypairs[0]["key_file"], + ) decr_text = self.server.sec.decrypt(signed_resp, self.client.config.encryption_keypairs[1]["key_file"]) -- cgit v1.2.1 From 9ce6dfd8940ba9c24d1452200d55247f0a766b0d Mon Sep 17 00:00:00 2001 From: Ivan Kanakarakis Date: Fri, 11 Jan 2019 23:32:18 +0200 Subject: Refactor CryptoBackendXmlSec1::sign_statement Signed-off-by: Ivan Kanakarakis --- src/saml2/sigver.py | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/src/saml2/sigver.py b/src/saml2/sigver.py index 0e3e0839..6e9ebf9b 100644 --- a/src/saml2/sigver.py +++ b/src/saml2/sigver.py @@ -835,19 +835,16 @@ class CryptoBackendXmlSec1(CryptoBackend): com_list.extend(['--node-id', node_id]) try: - (stdout, stderr, signed_statement) = self._run_xmlsec( - com_list, [fil] - ) - - # this doesn't work if --store-signatures are used - if stdout == '': - if signed_statement: - return signed_statement.decode('utf-8') - - logger.error('Signing operation failed :\nstdout : %s\nstderr : %s', stdout, stderr) - raise SigverError(stderr) - except DecryptError: - raise SigverError('Signing failed') + (stdout, stderr, output) = self._run_xmlsec(com_list, [fil]) + except XmlsecError as e: + raise SignatureError(com_list) + + # this does not work if --store-signatures is used + if output: + return output.decode("utf-8") + if stdout: + return stdout.decode("utf-8") + raise SignatureError(stderr) def validate_signature(self, signedtext, cert_file, cert_type, node_name, node_id, id_attr): """ -- cgit v1.2.1