summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvan Kanakarakis <ivan.kanak@gmail.com>2019-01-14 21:20:35 +0200
committerGitHub <noreply@github.com>2019-01-14 21:20:35 +0200
commitddb78de1ff43a3a43e20017cb05d16d223d7da7e (patch)
tree64b069227656016133e019821dd65f1abd2cfd28
parent435ae0176f917b089f6ed7de9c866b7b99ad8097 (diff)
parent9ce6dfd8940ba9c24d1452200d55247f0a766b0d (diff)
downloadpysaml2-ddb78de1ff43a3a43e20017cb05d16d223d7da7e.tar.gz
Merge pull request #583 from c00kiemon5ter/fix-check-xmlsec-returncode
Check the xmlsec returncode
-rw-r--r--src/saml2/sigver.py210
-rw-r--r--tests/test_40_sigver.py153
-rw-r--r--tests/test_42_enc.py3
-rw-r--r--tests/test_50_server.py63
4 files changed, 210 insertions, 219 deletions
diff --git a/src/saml2/sigver.py b/src/saml2/sigver.py
index 59fe2dee..6e9ebf9b 100644
--- a/src/saml2/sigver.py
+++ b/src/saml2/sigver.py
@@ -5,6 +5,7 @@ from OpenSSL import crypto
import base64
import hashlib
+import itertools
import logging
import os
import ssl
@@ -591,10 +592,6 @@ def verify_redirect_signature(saml_msg, crypto, cert=None, sigkey=None):
return bool(signer.verify(string, _sign, _key))
-LOG_LINE = 60 * '=' + '\n%s\n' + 60 * '-' + '\n%s' + 60 * '='
-LOG_LINE_2 = 60 * '=' + '\n%s\n%s\n' + 60 * '-' + '\n%s' + 60 * '='
-
-
def make_str(txt):
if isinstance(txt, six.string_types):
return txt
@@ -682,10 +679,9 @@ class CryptoBackendXmlSec1(CryptoBackend):
CryptoBackend.__init__(self, **kwargs)
assert (isinstance(xmlsec_binary, six.string_types))
self.xmlsec = xmlsec_binary
- if os.environ.get('PYSAML2_KEEP_XMLSEC_TMP', None):
- self._xmlsec_delete_tmpfiles = False
- else:
- self._xmlsec_delete_tmpfiles = True
+ self._xmlsec_delete_tmpfiles = os.environ.get(
+ 'PYSAML2_KEEP_XMLSEC_TMP', False
+ )
try:
self.non_xml_crypto = RSACrypto(kwargs['rsa_key'])
@@ -727,11 +723,10 @@ class CryptoBackendXmlSec1(CryptoBackend):
if xpath:
com_list.extend(['--node-xpath', xpath])
- (_stdout, _stderr, output) = self._run_xmlsec(
- com_list,
- [template],
- exception=DecryptError,
- validate_output=False)
+ try:
+ (_stdout, _stderr, output) = self._run_xmlsec(com_list, [template])
+ except XmlsecError as e:
+ six.raise_from(EncryptError(com_list), e)
return output
@@ -753,8 +748,9 @@ class CryptoBackendXmlSec1(CryptoBackend):
if isinstance(statement, SamlBase):
statement = pre_encrypt_assertion(statement)
- _, fil = make_temp(_str(statement), decode=False,
- delete=False)
+ _, fil = make_temp(
+ _str(statement), decode=False, delete=self._xmlsec_delete_tmpfiles
+ )
_, tmpl = make_temp(_str(template), decode=False)
if not node_xpath:
@@ -772,15 +768,10 @@ class CryptoBackendXmlSec1(CryptoBackend):
if node_id:
com_list.extend(['--node-id', node_id])
- (_stdout, _stderr, output) = self._run_xmlsec(
- com_list,
- [tmpl],
- exception=EncryptError,
- validate_output=False)
-
- os.unlink(fil)
- if not output:
- raise EncryptError(_stderr)
+ try:
+ (_stdout, _stderr, output) = self._run_xmlsec(com_list, [tmpl])
+ except XmlsecError as e:
+ six.raise_from(EncryptError(com_list), e)
return output.decode('utf-8')
@@ -803,11 +794,11 @@ class CryptoBackendXmlSec1(CryptoBackend):
ENC_KEY_CLASS,
]
- (_stdout, _stderr, output) = self._run_xmlsec(
- com_list,
- [fil],
- exception=DecryptError,
- validate_output=False)
+ try:
+ (_stdout, _stderr, output) = self._run_xmlsec(com_list, [fil])
+ except XmlsecError as e:
+ six.raise_from(DecryptError(com_list), e)
+
return output.decode('utf-8')
def sign_statement(self, statement, node_name, key_file, node_id, id_attr):
@@ -826,10 +817,11 @@ class CryptoBackendXmlSec1(CryptoBackend):
statement = str(statement)
_, fil = make_temp(
- statement,
- suffix='.xml',
- decode=False,
- delete=self._xmlsec_delete_tmpfiles)
+ statement,
+ suffix='.xml',
+ decode=False,
+ delete=self._xmlsec_delete_tmpfiles,
+ )
com_list = [
self.xmlsec,
@@ -843,20 +835,16 @@ class CryptoBackendXmlSec1(CryptoBackend):
com_list.extend(['--node-id', node_id])
try:
- (stdout, stderr, signed_statement) = self._run_xmlsec(
- com_list,
- [fil],
- validate_output=False)
+ (stdout, stderr, output) = self._run_xmlsec(com_list, [fil])
+ except XmlsecError as e:
+ raise SignatureError(com_list)
- # this doesn't work if --store-signatures are used
- if stdout == '':
- if signed_statement:
- return signed_statement.decode('utf-8')
-
- logger.error('Signing operation failed :\nstdout : %s\nstderr : %s', stdout, stderr)
- raise SigverError(stderr)
- except DecryptError:
- raise SigverError('Signing failed')
+ # this does not work if --store-signatures is used
+ if output:
+ return output.decode("utf-8")
+ if stdout:
+ return stdout.decode("utf-8")
+ raise SignatureError(stderr)
def validate_signature(self, signedtext, cert_file, cert_type, node_name, node_id, id_attr):
"""
@@ -867,17 +855,19 @@ class CryptoBackendXmlSec1(CryptoBackend):
:param cert_type: The file type of the certificate
:param node_name: The name of the class that is signed
:param node_id: The identifier of the node
- :param id_attr: Should normally be one of 'id', 'Id' or 'ID'
+ :param id_attr: The attribute name for the identifier, normally one of
+ 'id','Id' or 'ID'
:return: Boolean True if the signature was correct otherwise False.
"""
if not isinstance(signedtext, six.binary_type):
signedtext = signedtext.encode('utf-8')
_, fil = make_temp(
- signedtext,
- suffix='.xml',
- decode=False,
- delete=self._xmlsec_delete_tmpfiles)
+ signedtext,
+ suffix='.xml',
+ decode=False,
+ delete=self._xmlsec_delete_tmpfiles,
+ )
com_list = [
self.xmlsec,
@@ -891,21 +881,19 @@ class CryptoBackendXmlSec1(CryptoBackend):
if node_id:
com_list.extend(['--node-id', node_id])
- (_stdout, stderr, _output) = self._run_xmlsec(
- com_list,
- [fil],
- exception=SignatureError)
+ try:
+ (_stdout, stderr, _output) = self._run_xmlsec(com_list, [fil])
+ except XmlsecError as e:
+ six.raise_from(SignatureError(com_list), e)
return parse_xmlsec_output(stderr)
- def _run_xmlsec(self, com_list, extra_args, validate_output=True, exception=XmlsecError):
+ def _run_xmlsec(self, com_list, extra_args):
"""
Common code to invoke xmlsec and parse the output.
:param com_list: Key-value parameter list for xmlsec
:param extra_args: Positional parameters to be appended after all
key-value parameters
- :param validate_output: Parse and validate the output
- :param exception: The exception class to raise on errors
:result: Whatever xmlsec wrote to an --output temporary file
"""
with NamedTemporaryFile(suffix='.xml', delete=self._xmlsec_delete_tmpfiles) as ntf:
@@ -919,17 +907,12 @@ class CryptoBackendXmlSec1(CryptoBackend):
p_out = p_out.decode()
p_err = p_err.decode()
- if pof.returncode is not None and pof.returncode < 0:
- logger.error(LOG_LINE, p_out, p_err)
- raise XmlsecError('{err_code}:{err_msg}'.format(
- err_code=pof.returncode, err_msg=p_err))
-
- try:
- if validate_output:
- parse_xmlsec_output(p_err)
- except XmlsecError as exc:
- logger.error(LOG_LINE_2, p_out, p_err, exc)
- raise
+ if pof.returncode != 0:
+ errmsg = "returncode={code}\nerror={err}\noutput={out}".format(
+ code=pof.returncode, err=p_err, out=p_out
+ )
+ logger.error(errmsg)
+ raise XmlsecError(errmsg)
ntf.seek(0)
return p_out, p_err, ntf.read()
@@ -1366,32 +1349,32 @@ class SecurityContext(object):
""" Decrypting an encrypted text by the use of a private key.
:param enctext: The encrypted text as a string
+ :param keys: Keys to try to decrypt enctext with
+ :param id_attr: The attribute name for the identifier, normally one of
+ 'id','Id' or 'ID'
:return: The decrypted text
"""
- _enctext = None
-
- if not id_attr:
- id_attr = self.id_attr
+ key_files = []
if not isinstance(keys, list):
keys = [keys]
- if self.enc_key_files is not None:
- for _enc_key_file in self.enc_key_files:
- _enctext = self.crypto.decrypt(enctext, _enc_key_file, id_attr)
- if _enctext is not None and len(_enctext) > 0:
- return _enctext
-
- for _key in keys:
- if _key is not None and len(_key.strip()) > 0:
- if not isinstance(_key, six.binary_type):
- _key = str(_key).encode('ascii')
- _, key_file = make_temp(_key, decode=False)
- _enctext = self.crypto.decrypt(enctext, key_file, id_attr)
- if _enctext is not None and len(_enctext) > 0:
- return _enctext
+ keys = [key for key in keys if key]
+ for key in keys:
+ if not isinstance(key, six.binary_type):
+ key = key.encode("ascii")
+ _, key_file = make_temp(key, decode=False, delete=False)
+ key_files.append(key_file)
- return enctext
+ try:
+ dectext = self.decrypt(enctext, key_file=key_files, id_attr=id_attr)
+ except DecryptError as e:
+ raise
+ else:
+ return dectext
+ finally:
+ for key_file in key_files:
+ os.unlink(key_file)
def decrypt(self, enctext, key_file=None, id_attr=''):
""" Decrypting an encrypted text by the use of a private key.
@@ -1399,22 +1382,27 @@ class SecurityContext(object):
:param enctext: The encrypted text as a string
:return: The decrypted text
"""
- _enctext = None
-
if not id_attr:
id_attr = self.id_attr
- if self.enc_key_files is not None:
- for _enc_key_file in self.enc_key_files:
- _enctext = self.crypto.decrypt(enctext, _enc_key_file, id_attr)
- if _enctext is not None and len(_enctext) > 0:
- return _enctext
+ if not isinstance(key_file, list):
+ key_file = [key_file]
+
+ key_files = [
+ key for key in itertools.chain(key_file, self.enc_key_files) if key
+ ]
+ for key_file in key_files:
+ try:
+ dectext = self.crypto.decrypt(enctext, key_file, id_attr)
+ except XmlsecError as e:
+ continue
+ else:
+ if dectext:
+ return dectext
- if key_file is not None and len(key_file.strip()) > 0:
- _enctext = self.crypto.decrypt(enctext, key_file, id_attr)
- if _enctext is not None and len(_enctext) > 0:
- return _enctext
- return enctext
+ errmsg = "No key was able to decrypt the ciphertext. Keys tried: {keys}"
+ errmsg = errmsg.format(keys=key_files)
+ raise DecryptError(errmsg)
def verify_signature(self, signedtext, cert_file=None, cert_type='pem', node_name=NODE_NAME, node_id=None, id_attr=''):
""" Verifies the signature of a XML document.
@@ -1424,7 +1412,8 @@ class SecurityContext(object):
:param cert_type: The file type of the certificate
:param node_name: The name of the class that is signed
:param node_id: The identifier of the node
- :param id_attr: Should normally be one of 'id', 'Id' or 'ID'
+ :param id_attr: The attribute name for the identifier, normally one of
+ 'id','Id' or 'ID'
:return: Boolean True if the signature was correct otherwise False.
"""
# This is only for testing purposes, otherwise when would you receive
@@ -1466,11 +1455,14 @@ class SecurityContext(object):
for cert in _certs:
if isinstance(cert, six.string_types):
- certs.append(make_temp(
- pem_format(cert),
- suffix='.pem',
- decode=False,
- delete=self._xmlsec_delete_tmpfiles))
+ certs.append(
+ make_temp(
+ pem_format(cert),
+ suffix='.pem',
+ decode=False,
+ delete=self._xmlsec_delete_tmpfiles,
+ )
+ )
else:
certs.append(cert)
else:
@@ -1483,7 +1475,8 @@ class SecurityContext(object):
pem_format(cert),
suffix='.pem',
decode=False,
- delete=self._xmlsec_delete_tmpfiles)
+ delete=self._xmlsec_delete_tmpfiles,
+ )
for cert in cert_from_instance(item)
]
else:
@@ -1527,7 +1520,8 @@ class SecurityContext(object):
:param item: Parsed entity
:param node_name: The name of the node/class/element that is signed
:param origdoc: The original XML string
- :param id_attr:
+ :param id_attr: The attribute name for the identifier, normally one of
+ 'id','Id' or 'ID'
:param must:
:return:
"""
diff --git a/tests/test_40_sigver.py b/tests/test_40_sigver.py
index ba5cf639..092fbc67 100644
--- a/tests/test_40_sigver.py
+++ b/tests/test_40_sigver.py
@@ -24,6 +24,7 @@ from py.test import raises
from pathutils import full_path
+
SIGNED = full_path("saml_signed.xml")
UNSIGNED = full_path("saml_unsigned.xml")
SIMPLE_SAML_PHP_RESPONSE = full_path("simplesamlphp_authnresponse.xml")
@@ -36,6 +37,12 @@ PRIV_KEY = full_path("test.key")
ENC_PUB_KEY = full_path("pki/test_1.crt")
ENC_PRIV_KEY = full_path("pki/test.key")
+INVALID_KEY = full_path("non-existent.key")
+
+IDP_EXAMPLE = full_path("idp_example.xml")
+METADATA_CERT = full_path("metadata_cert.xml")
+
+
def _eq(l1, l2):
return set(l1) == set(l2)
@@ -721,7 +728,7 @@ class TestSecurityMetadata():
conf = config.SPConfig()
conf.load_file("server_conf")
md = MetadataStore([saml, samlp], None, conf)
- md.load("local", full_path("metadata_cert.xml"))
+ md.load("local", METADATA_CERT)
conf.metadata = md
conf.only_use_keys_in_metadata = False
@@ -742,7 +749,7 @@ class TestSecurityMetadataNonAsciiAva():
conf = config.SPConfig()
conf.load_file("server_conf")
md = MetadataStore([saml, samlp], None, conf)
- md.load("local", full_path("metadata_cert.xml"))
+ md.load("local", METADATA_CERT)
conf.metadata = md
conf.only_use_keys_in_metadata = False
@@ -762,7 +769,7 @@ def test_xbox():
conf = config.SPConfig()
conf.load_file("server_conf")
md = MetadataStore([saml, samlp], None, conf)
- md.load("local", full_path("idp_example.xml"))
+ md.load("local", IDP_EXAMPLE)
conf.metadata = md
conf.only_use_keys_in_metadata = False
@@ -773,41 +780,50 @@ def test_xbox():
issue_instant="2009-10-30T13:20:28Z",
signature=sigver.pre_signature_part("11111", sec.my_cert, 1),
attribute_statement=do_attribute_statement(
- {("", "", "surName"): ("Foo", ""),
- ("", "", "givenName"): ("Bar", ""), })
+ {
+ ("", "", "surName"): ("Foo", ""),
+ ("", "", "givenName"): ("Bar", ""),
+ }
+ )
)
- sigass = sec.sign_statement(assertion, class_name(assertion),
- key_file=full_path("test.key"),
- node_id=assertion.id)
+ sigass = sec.sign_statement(
+ assertion,
+ class_name(assertion),
+ key_file=PRIV_KEY,
+ node_id=assertion.id,
+ )
_ass0 = saml.assertion_from_string(sigass)
-
encrypted_assertion = EncryptedAssertion()
encrypted_assertion.add_extension_element(_ass0)
- _, pre = make_temp(str(pre_encryption_part()).encode('utf-8'), decode=False)
+ _, pre = make_temp(
+ str(pre_encryption_part()).encode('utf-8'), decode=False
+ )
enctext = sec.crypto.encrypt(
- str(encrypted_assertion), conf.cert_file, pre, "des-192",
- '/*[local-name()="EncryptedAssertion"]/*[local-name()="Assertion"]')
+ str(encrypted_assertion),
+ conf.cert_file,
+ pre,
+ "des-192",
+ '/*[local-name()="EncryptedAssertion"]/*[local-name()="Assertion"]',
+ )
- decr_text = sec.decrypt(enctext)
+ decr_text = sec.decrypt(enctext, key_file=PRIV_KEY)
_seass = saml.encrypted_assertion_from_string(decr_text)
assertions = []
- assers = extension_elements_to_elements(_seass.extension_elements,
- [saml, samlp])
-
- sign_cert_file = full_path("test.pem")
+ assers = extension_elements_to_elements(
+ _seass.extension_elements, [saml, samlp]
+ )
for ass in assers:
- _ass = "%s" % ass
- #_ass = _ass.replace('xsi:nil="true" ', '')
- #assert sigass == _ass
- _txt = sec.verify_signature(_ass, sign_cert_file,
- node_name=class_name(assertion))
+ _txt = sec.verify_signature(
+ str(ass), PUB_KEY, node_name=class_name(assertion)
+ )
if _txt:
assertions.append(ass)
+ assert assertions
print(assertions)
@@ -815,7 +831,7 @@ def test_xbox_non_ascii_ava():
conf = config.SPConfig()
conf.load_file("server_conf")
md = MetadataStore([saml, samlp], None, conf)
- md.load("local", full_path("idp_example.xml"))
+ md.load("local", IDP_EXAMPLE)
conf.metadata = md
conf.only_use_keys_in_metadata = False
@@ -826,41 +842,50 @@ def test_xbox_non_ascii_ava():
issue_instant="2009-10-30T13:20:28Z",
signature=sigver.pre_signature_part("11111", sec.my_cert, 1),
attribute_statement=do_attribute_statement(
- {("", "", "surName"): ("Föö", ""),
- ("", "", "givenName"): ("Bär", ""), })
+ {
+ ("", "", "surName"): ("Föö", ""),
+ ("", "", "givenName"): ("Bär", ""),
+ }
+ )
)
- sigass = sec.sign_statement(assertion, class_name(assertion),
- key_file=full_path("test.key"),
- node_id=assertion.id)
+ sigass = sec.sign_statement(
+ assertion,
+ class_name(assertion),
+ key_file=PRIV_KEY,
+ node_id=assertion.id,
+ )
_ass0 = saml.assertion_from_string(sigass)
-
encrypted_assertion = EncryptedAssertion()
encrypted_assertion.add_extension_element(_ass0)
- _, pre = make_temp(str(pre_encryption_part()).encode('utf-8'), decode=False)
+ _, pre = make_temp(
+ str(pre_encryption_part()).encode('utf-8'), decode=False
+ )
enctext = sec.crypto.encrypt(
- str(encrypted_assertion), conf.cert_file, pre, "des-192",
- '/*[local-name()="EncryptedAssertion"]/*[local-name()="Assertion"]')
+ str(encrypted_assertion),
+ conf.cert_file,
+ pre,
+ "des-192",
+ '/*[local-name()="EncryptedAssertion"]/*[local-name()="Assertion"]',
+ )
- decr_text = sec.decrypt(enctext)
+ decr_text = sec.decrypt(enctext, key_file=PRIV_KEY)
_seass = saml.encrypted_assertion_from_string(decr_text)
assertions = []
- assers = extension_elements_to_elements(_seass.extension_elements,
- [saml, samlp])
-
- sign_cert_file = full_path("test.pem")
+ assers = extension_elements_to_elements(
+ _seass.extension_elements, [saml, samlp]
+ )
for ass in assers:
- _ass = "%s" % ass
- #_ass = _ass.replace('xsi:nil="true" ', '')
- #assert sigass == _ass
- _txt = sec.verify_signature(_ass, sign_cert_file,
- node_name=class_name(assertion))
+ _txt = sec.verify_signature(
+ str(ass), PUB_KEY, node_name=class_name(assertion)
+ )
if _txt:
assertions.append(ass)
+ assert assertions
print(assertions)
@@ -869,7 +894,7 @@ def test_okta():
conf.load_file("server_conf")
conf.id_attr_name = 'Id'
md = MetadataStore([saml, samlp], None, conf)
- md.load("local", full_path("idp_example.xml"))
+ md.load("local", IDP_EXAMPLE)
conf.metadata = md
conf.only_use_keys_in_metadata = False
@@ -892,7 +917,7 @@ def test_xmlsec_err():
conf = config.SPConfig()
conf.load_file("server_conf")
md = MetadataStore([saml, samlp], None, conf)
- md.load("local", full_path("idp_example.xml"))
+ md.load("local", IDP_EXAMPLE)
conf.metadata = md
conf.only_use_keys_in_metadata = False
@@ -907,21 +932,20 @@ def test_xmlsec_err():
("", "", "givenName"): ("Bar", ""), })
)
- try:
- sec.sign_statement(assertion, class_name(assertion),
- key_file=full_path("tes.key"),
- node_id=assertion.id)
- except (XmlsecError, SigverError) as err: # should throw an exception
- pass
- else:
- assert False
+ with raises(XmlsecError):
+ sec.sign_statement(
+ assertion,
+ class_name(assertion),
+ key_file=INVALID_KEY,
+ node_id=assertion.id,
+ )
def test_xmlsec_err_non_ascii_ava():
conf = config.SPConfig()
conf.load_file("server_conf")
md = MetadataStore([saml, samlp], None, conf)
- md.load("local", full_path("idp_example.xml"))
+ md.load("local", IDP_EXAMPLE)
conf.metadata = md
conf.only_use_keys_in_metadata = False
@@ -936,21 +960,20 @@ def test_xmlsec_err_non_ascii_ava():
("", "", "givenName"): ("Bär", ""), })
)
- try:
- sec.sign_statement(assertion, class_name(assertion),
- key_file=full_path("tes.key"),
- node_id=assertion.id)
- except (XmlsecError, SigverError) as err: # should throw an exception
- pass
- else:
- assert False
+ with raises(XmlsecError):
+ sec.sign_statement(
+ assertion,
+ class_name(assertion),
+ key_file=INVALID_KEY,
+ node_id=assertion.id,
+ )
def test_sha256_signing():
conf = config.SPConfig()
conf.load_file("server_conf")
md = MetadataStore([saml, samlp], None, conf)
- md.load("local", full_path("idp_example.xml"))
+ md.load("local", IDP_EXAMPLE)
conf.metadata = md
conf.only_use_keys_in_metadata = False
@@ -967,7 +990,7 @@ def test_sha256_signing():
)
s = sec.sign_statement(assertion, class_name(assertion),
- key_file=full_path("test.key"),
+ key_file=PRIV_KEY,
node_id=assertion.id)
assert s
@@ -976,7 +999,7 @@ def test_sha256_signing_non_ascii_ava():
conf = config.SPConfig()
conf.load_file("server_conf")
md = MetadataStore([saml, samlp], None, conf)
- md.load("local", full_path("idp_example.xml"))
+ md.load("local", IDP_EXAMPLE)
conf.metadata = md
conf.only_use_keys_in_metadata = False
@@ -993,7 +1016,7 @@ def test_sha256_signing_non_ascii_ava():
)
s = sec.sign_statement(assertion, class_name(assertion),
- key_file=full_path("test.key"),
+ key_file=PRIV_KEY,
node_id=assertion.id)
assert s
diff --git a/tests/test_42_enc.py b/tests/test_42_enc.py
index d8e38f95..c29eca1e 100644
--- a/tests/test_42_enc.py
+++ b/tests/test_42_enc.py
@@ -72,8 +72,7 @@ def test_enc1():
"--node-xpath", ASSERT_XPATH]
crypto = CryptoBackendXmlSec1(xmlsec_path)
- (_stdout, _stderr, output) = crypto._run_xmlsec(
- com_list, [tmpl], exception=EncryptError, validate_output=False)
+ (_stdout, _stderr, output) = crypto._run_xmlsec(com_list, [tmpl])
print(output)
assert _stderr == ""
diff --git a/tests/test_50_server.py b/tests/test_50_server.py
index 11ace47b..dc6cbf42 100644
--- a/tests/test_50_server.py
+++ b/tests/test_50_server.py
@@ -8,7 +8,7 @@ from six.moves.urllib.parse import parse_qs
import uuid
from saml2.cert import OpenSSLWrapper
-from saml2.sigver import make_temp, EncryptError, CertificateError
+from saml2.sigver import make_temp, DecryptError, EncryptError, CertificateError
from saml2.assertion import Policy
from saml2.authn_context import INTERNETPROTOCOLPASSWORD
from saml2.saml import NameID, NAMEID_FORMAT_TRANSIENT
@@ -34,6 +34,7 @@ from py.test import raises
from pathutils import full_path
import saml2.xmldsig as ds
+
nid = NameID(name_qualifier="foo", format=NAMEID_FORMAT_TRANSIENT,
text="123456")
@@ -171,7 +172,7 @@ class TestServer1():
assert attr1.attribute_value[0].text == "Derek"
assert attr0.friendly_name == "sn"
assert attr0.attribute_value[0].text == "Jeter"
- #
+
subject = assertion.subject
assert _eq(subject.keyswv(), ["text", "name_id"])
assert subject.text == "_aaa"
@@ -613,9 +614,11 @@ class TestServer1():
decr_text_old = copy.deepcopy("%s" % signed_resp)
- decr_text = self.server.sec.decrypt(signed_resp, self.client.config.encryption_keypairs[0]["key_file"])
-
- assert decr_text == decr_text_old
+ with raises(DecryptError):
+ decr_text = self.server.sec.decrypt(
+ signed_resp,
+ self.client.config.encryption_keypairs[0]["key_file"],
+ )
decr_text = self.server.sec.decrypt(signed_resp, self.client.config.encryption_keypairs[1]["key_file"])
@@ -958,7 +961,7 @@ class TestServer1():
self.verify_advice_assertion(resp, decr_text_2)
def test_encrypted_response_8(self):
- try:
+ with raises(EncryptError):
_resp = self.server.create_authn_response(
self.ava,
"id12", # in_response_to
@@ -973,13 +976,8 @@ class TestServer1():
encrypt_cert_advice="whatever",
encrypt_cert_assertion="whatever"
)
- assert False, "Must throw an exception"
- except EncryptError as ex:
- pass
- except Exception as ex:
- assert False, "Wrong exception!"
- try:
+ with raises(EncryptError):
_resp = self.server.create_authn_response(
self.ava,
"id12", # in_response_to
@@ -993,13 +991,8 @@ class TestServer1():
pefim=True,
encrypt_cert_advice="whatever",
)
- assert False, "Must throw an exception"
- except EncryptError as ex:
- pass
- except Exception as ex:
- assert False, "Wrong exception!"
- try:
+ with raises(EncryptError):
_resp = self.server.create_authn_response(
self.ava,
"id12", # in_response_to
@@ -1013,15 +1006,10 @@ class TestServer1():
encrypted_advice_attributes=False,
encrypt_cert_assertion="whatever"
)
- assert False, "Must throw an exception"
- except EncryptError as ex:
- pass
- except Exception as ex:
- assert False, "Wrong exception!"
_server = Server("idp_conf_verify_cert")
- try:
+ with raises(CertificateError):
_resp = _server.create_authn_response(
self.ava,
"id12", # in_response_to
@@ -1036,13 +1024,8 @@ class TestServer1():
encrypt_cert_advice="whatever",
encrypt_cert_assertion="whatever"
)
- assert False, "Must throw an exception"
- except CertificateError as ex:
- pass
- except Exception as ex:
- assert False, "Wrong exception!"
- try:
+ with raises(CertificateError):
_resp = _server.create_authn_response(
self.ava,
"id12", # in_response_to
@@ -1056,13 +1039,8 @@ class TestServer1():
pefim=True,
encrypt_cert_advice="whatever",
)
- assert False, "Must throw an exception"
- except CertificateError as ex:
- pass
- except Exception as ex:
- assert False, "Wrong exception!"
- try:
+ with raises(CertificateError):
_resp = _server.create_authn_response(
self.ava,
"id12", # in_response_to
@@ -1076,11 +1054,6 @@ class TestServer1():
encrypted_advice_attributes=False,
encrypt_cert_assertion="whatever"
)
- assert False, "Must throw an exception"
- except CertificateError as ex:
- pass
- except Exception as ex:
- assert False, "Wrong exception!"
def test_encrypted_response_9(self):
_server = Server("idp_conf_sp_no_encrypt")
@@ -1715,9 +1688,11 @@ class TestServer1NonAsciiAva():
decr_text_old = copy.deepcopy("%s" % signed_resp)
- decr_text = self.server.sec.decrypt(signed_resp, self.client.config.encryption_keypairs[0]["key_file"])
-
- assert decr_text == decr_text_old
+ with raises(DecryptError):
+ decr_text = self.server.sec.decrypt(
+ signed_resp,
+ self.client.config.encryption_keypairs[0]["key_file"],
+ )
decr_text = self.server.sec.decrypt(signed_resp, self.client.config.encryption_keypairs[1]["key_file"])