summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/saml2/sigver.py73
-rw-r--r--tests/test_40_sigver.py4
2 files changed, 49 insertions, 28 deletions
diff --git a/src/saml2/sigver.py b/src/saml2/sigver.py
index e66d4dfa..f4434d55 100644
--- a/src/saml2/sigver.py
+++ b/src/saml2/sigver.py
@@ -591,9 +591,6 @@ def verify_redirect_signature(saml_msg, crypto, cert=None, sigkey=None):
return bool(signer.verify(string, _sign, _key))
-LOG_LINE = 60 * '=' + '\n%s\n' + 60 * '-' + '\n%s' + 60 * '='
-
-
def make_str(txt):
if isinstance(txt, six.string_types):
return txt
@@ -726,7 +723,10 @@ class CryptoBackendXmlSec1(CryptoBackend):
if xpath:
com_list.extend(['--node-xpath', xpath])
- (_stdout, _stderr, output) = self._run_xmlsec(com_list, [template])
+ try:
+ (_stdout, _stderr, output) = self._run_xmlsec(com_list, [template])
+ except XmlsecError as e:
+ six.raise_from(EncryptError(com_list), e)
return output
@@ -748,8 +748,9 @@ class CryptoBackendXmlSec1(CryptoBackend):
if isinstance(statement, SamlBase):
statement = pre_encrypt_assertion(statement)
- _, fil = make_temp(_str(statement), decode=False,
- delete=False)
+ _, fil = make_temp(
+ _str(statement), decode=False, delete=self._xmlsec_delete_tmpfiles
+ )
_, tmpl = make_temp(_str(template), decode=False)
if not node_xpath:
@@ -767,11 +768,10 @@ class CryptoBackendXmlSec1(CryptoBackend):
if node_id:
com_list.extend(['--node-id', node_id])
- (_stdout, _stderr, output) = self._run_xmlsec(com_list, [tmpl])
-
- os.unlink(fil)
- if not output:
- raise EncryptError(_stderr)
+ try:
+ (_stdout, _stderr, output) = self._run_xmlsec(com_list, [tmpl])
+ except XmlsecError as e:
+ six.raise_from(EncryptError(com_list), e)
return output.decode('utf-8')
@@ -794,7 +794,11 @@ class CryptoBackendXmlSec1(CryptoBackend):
ENC_KEY_CLASS,
]
- (_stdout, _stderr, output) = self._run_xmlsec(com_list, [fil])
+ try:
+ (_stdout, _stderr, output) = self._run_xmlsec(com_list, [fil])
+ except XmlsecError as e:
+ six.raise_from(DecryptError(com_list), e)
+
return output.decode('utf-8')
def sign_statement(self, statement, node_name, key_file, node_id, id_attr):
@@ -877,7 +881,10 @@ class CryptoBackendXmlSec1(CryptoBackend):
if node_id:
com_list.extend(['--node-id', node_id])
- (_stdout, stderr, _output) = self._run_xmlsec(com_list, [fil])
+ try:
+ (_stdout, stderr, _output) = self._run_xmlsec(com_list, [fil])
+ except XmlsecError as e:
+ six.raise_from(SignatureError(com_list), e)
return parse_xmlsec_output(stderr)
@@ -900,10 +907,12 @@ class CryptoBackendXmlSec1(CryptoBackend):
p_out = p_out.decode()
p_err = p_err.decode()
- if pof.returncode is not None and pof.returncode < 0:
- logger.error(LOG_LINE, p_out, p_err)
- raise XmlsecError('{err_code}:{err_msg}'.format(
- err_code=pof.returncode, err_msg=p_err))
+ if pof.returncode != 0:
+ errmsg = "returncode={code}\nerror={err}\noutput={out}".format(
+ code=pof.returncode, err=p_err, out=p_out
+ )
+ logger.error(errmsg)
+ raise XmlsecError(errmsg)
ntf.seek(0)
return p_out, p_err, ntf.read()
@@ -1352,18 +1361,26 @@ class SecurityContext(object):
if self.enc_key_files is not None:
for _enc_key_file in self.enc_key_files:
- _enctext = self.crypto.decrypt(enctext, _enc_key_file, id_attr)
- if _enctext is not None and len(_enctext) > 0:
- return _enctext
+ try:
+ _enctext = self.crypto.decrypt(enctext, _enc_key_file, id_attr)
+ except XmlsecError as e:
+ continue
+ else:
+ if _enctext:
+ return _enctext
for _key in keys:
if _key is not None and len(_key.strip()) > 0:
if not isinstance(_key, six.binary_type):
_key = str(_key).encode('ascii')
_, key_file = make_temp(_key, decode=False)
- _enctext = self.crypto.decrypt(enctext, key_file, id_attr)
- if _enctext is not None and len(_enctext) > 0:
- return _enctext
+ try:
+ _enctext = self.crypto.decrypt(enctext, key_file, id_attr)
+ except XmlsecError as e:
+ continue
+ else:
+ if _enctext:
+ return _enctext
return enctext
@@ -1380,9 +1397,13 @@ class SecurityContext(object):
if self.enc_key_files is not None:
for _enc_key_file in self.enc_key_files:
- _enctext = self.crypto.decrypt(enctext, _enc_key_file, id_attr)
- if _enctext is not None and len(_enctext) > 0:
- return _enctext
+ try:
+ _enctext = self.crypto.decrypt(enctext, _enc_key_file, id_attr)
+ except XmlsecError:
+ continue
+ else:
+ if _enctext is not None and len(_enctext) > 0:
+ return _enctext
if key_file is not None and len(key_file.strip()) > 0:
_enctext = self.crypto.decrypt(enctext, key_file, id_attr)
diff --git a/tests/test_40_sigver.py b/tests/test_40_sigver.py
index ba5cf639..e3a20e49 100644
--- a/tests/test_40_sigver.py
+++ b/tests/test_40_sigver.py
@@ -791,7 +791,7 @@ def test_xbox():
str(encrypted_assertion), conf.cert_file, pre, "des-192",
'/*[local-name()="EncryptedAssertion"]/*[local-name()="Assertion"]')
- decr_text = sec.decrypt(enctext)
+ decr_text = sec.decrypt(enctext, key_file=PRIV_KEY)
_seass = saml.encrypted_assertion_from_string(decr_text)
assertions = []
assers = extension_elements_to_elements(_seass.extension_elements,
@@ -844,7 +844,7 @@ def test_xbox_non_ascii_ava():
str(encrypted_assertion), conf.cert_file, pre, "des-192",
'/*[local-name()="EncryptedAssertion"]/*[local-name()="Assertion"]')
- decr_text = sec.decrypt(enctext)
+ decr_text = sec.decrypt(enctext, key_file=PRIV_KEY)
_seass = saml.encrypted_assertion_from_string(decr_text)
assertions = []
assers = extension_elements_to_elements(_seass.extension_elements,