diff options
Diffstat (limited to 'src/saml2/sigver.py')
-rw-r--r-- | src/saml2/sigver.py | 147 |
1 files changed, 58 insertions, 89 deletions
diff --git a/src/saml2/sigver.py b/src/saml2/sigver.py index 6f4b464a..94666b8d 100644 --- a/src/saml2/sigver.py +++ b/src/saml2/sigver.py @@ -8,6 +8,7 @@ import hashlib import itertools import logging import os +import uuid import six from time import mktime @@ -649,13 +650,13 @@ class CryptoBackend(object): def encrypt_assertion(self, statement, enc_key, template, key_type, node_xpath): raise NotImplementedError() - def decrypt(self, enctext, key_file, id_attr): + def decrypt(self, enctext, key_file): raise NotImplementedError() - def sign_statement(self, statement, node_name, key_file, node_id, id_attr): + def sign_statement(self, statement, node_name, key_file, node_id): raise NotImplementedError() - def validate_signature(self, enctext, cert_file, cert_type, node_name, node_id, id_attr): + def validate_signature(self, enctext, cert_file, cert_type, node_name, node_id): raise NotImplementedError() @@ -767,7 +768,7 @@ class CryptoBackendXmlSec1(CryptoBackend): return output.decode('utf-8') - def decrypt(self, enctext, key_file, id_attr): + def decrypt(self, enctext, key_file): """ :param enctext: XML document containing an encrypted part @@ -782,8 +783,7 @@ class CryptoBackendXmlSec1(CryptoBackend): self.xmlsec, '--decrypt', '--privkey-pem', key_file, - '--id-attr:{id_attr}'.format(id_attr=id_attr), - ENC_KEY_CLASS, + '--id-attr:Id', ENC_KEY_CLASS, ] try: @@ -793,7 +793,7 @@ class CryptoBackendXmlSec1(CryptoBackend): return output.decode('utf-8') - def sign_statement(self, statement, node_name, key_file, node_id, id_attr): + def sign_statement(self, statement, node_name, key_file, node_id): """ Sign an XML statement. @@ -801,8 +801,6 @@ class CryptoBackendXmlSec1(CryptoBackend): :param node_name: string like 'urn:oasis:names:...:Assertion' :param key_file: The file where the key can be found :param node_id: - :param id_attr: The attribute name for the identifier, normally one of - 'id','Id' or 'ID' :return: The signed statement """ if isinstance(statement, SamlBase): @@ -817,8 +815,7 @@ class CryptoBackendXmlSec1(CryptoBackend): self.xmlsec, '--sign', '--privkey-pem', key_file, - '--id-attr:{id_attr_name}'.format(id_attr_name=id_attr), - node_name, + '--id-attr:ID', node_name, ] if node_id: @@ -836,7 +833,7 @@ class CryptoBackendXmlSec1(CryptoBackend): return stdout.decode("utf-8") raise SignatureError(stderr) - def validate_signature(self, signedtext, cert_file, cert_type, node_name, node_id, id_attr): + def validate_signature(self, signedtext, cert_file, cert_type, node_name, node_id): """ Validate signature on XML document. @@ -845,8 +842,6 @@ class CryptoBackendXmlSec1(CryptoBackend): :param cert_type: The file type of the certificate :param node_name: The name of the class that is signed :param node_id: The identifier of the node - :param id_attr: The attribute name for the identifier, normally one of - 'id','Id' or 'ID' :return: Boolean True if the signature was correct otherwise False. """ if not isinstance(signedtext, six.binary_type): @@ -862,8 +857,7 @@ class CryptoBackendXmlSec1(CryptoBackend): '--verify', '--enabled-reference-uris', 'empty,same-doc', '--pubkey-cert-{type}'.format(type=cert_type), cert_file, - '--id-attr:{id_attr_name}'.format(id_attr_name=id_attr), - node_name, + '--id-attr:ID', node_name, ] if node_id: @@ -927,7 +921,7 @@ class CryptoBackendXMLSecurity(CryptoBackend): # better than static 0.0 here. return 'XMLSecurity 0.0' - def sign_statement(self, statement, node_name, key_file, node_id, id_attr): + def sign_statement(self, statement, node_name, key_file, node_id): """ Sign an XML statement. @@ -950,7 +944,7 @@ class CryptoBackendXMLSecurity(CryptoBackend): signed_str = signed_str.decode("utf-8") return signed_str - def validate_signature(self, signedtext, cert_file, cert_type, node_name, node_id, id_attr): + def validate_signature(self, signedtext, cert_file, cert_type, node_name, node_id): """ Validate signature on XML document. @@ -989,11 +983,6 @@ def security_context(conf): except AttributeError: metadata = None - try: - id_attr = conf.id_attr_name - except AttributeError: - id_attr = None - sec_backend = None if conf.crypto_backend == 'xmlsec1': @@ -1054,7 +1043,6 @@ def security_context(conf): enc_key_files=enc_key_files, encryption_keypairs=conf.encryption_keypairs, sec_backend=sec_backend, - id_attr=id_attr, delete_tmpfiles=conf.delete_tmpfiles) @@ -1225,7 +1213,6 @@ class CertHandler(object): # openssl x509 -inform pem -noout -in server.crt -pubkey > publickey.pem # openssl rsa -inform pem -noout -in publickey.pem -pubin -modulus class SecurityContext(object): - DEFAULT_ID_ATTR_NAME = 'ID' my_cert = None def __init__( @@ -1245,11 +1232,8 @@ class SecurityContext(object): encryption_keypairs=None, enc_cert_type='pem', sec_backend=None, - id_attr='', delete_tmpfiles=True): - self.id_attr = id_attr or SecurityContext.DEFAULT_ID_ATTR_NAME - self.crypto = crypto assert (isinstance(self.crypto, CryptoBackend)) @@ -1335,13 +1319,11 @@ class SecurityContext(object): return self.crypto.encrypt_assertion( statement, enc_key, template, key_type, node_xpath) - def decrypt_keys(self, enctext, keys=None, id_attr=''): + def decrypt_keys(self, enctext, keys=None): """ Decrypting an encrypted text by the use of a private key. :param enctext: The encrypted text as a string :param keys: Keys to try to decrypt enctext with - :param id_attr: The attribute name for the identifier, normally one of - 'id','Id' or 'ID' :return: The decrypted text """ key_files = [] @@ -1361,21 +1343,18 @@ class SecurityContext(object): key_file_names = list(tmp.name for tmp in key_files) try: - dectext = self.decrypt(enctext, key_file=key_file_names, id_attr=id_attr) + dectext = self.decrypt(enctext, key_file=key_file_names) except DecryptError as e: raise else: return dectext - def decrypt(self, enctext, key_file=None, id_attr=''): + def decrypt(self, enctext, key_file=None): """ Decrypting an encrypted text by the use of a private key. :param enctext: The encrypted text as a string :return: The decrypted text """ - if not id_attr: - id_attr = self.id_attr - if not isinstance(key_file, list): key_file = [key_file] @@ -1384,7 +1363,7 @@ class SecurityContext(object): ] for key_file in key_files: try: - dectext = self.crypto.decrypt(enctext, key_file, id_attr) + dectext = self.crypto.decrypt(enctext, key_file) except XmlsecError as e: continue else: @@ -1395,7 +1374,7 @@ class SecurityContext(object): errmsg = errmsg.format(keys=key_files) raise DecryptError(errmsg) - def verify_signature(self, signedtext, cert_file=None, cert_type='pem', node_name=NODE_NAME, node_id=None, id_attr=''): + def verify_signature(self, signedtext, cert_file=None, cert_type='pem', node_name=NODE_NAME, node_id=None): """ Verifies the signature of a XML document. :param signedtext: The XML document as a string @@ -1403,8 +1382,6 @@ class SecurityContext(object): :param cert_type: The file type of the certificate :param node_name: The name of the class that is signed :param node_id: The identifier of the node - :param id_attr: The attribute name for the identifier, normally one of - 'id','Id' or 'ID' :return: Boolean True if the signature was correct otherwise False. """ # This is only for testing purposes, otherwise when would you receive @@ -1413,18 +1390,15 @@ class SecurityContext(object): cert_file = self.cert_file cert_type = self.cert_type - if not id_attr: - id_attr = self.id_attr - return self.crypto.validate_signature( - signedtext, - cert_file=cert_file, - cert_type=cert_type, - node_name=node_name, - node_id=node_id, - id_attr=id_attr) - - def _check_signature(self, decoded_xml, item, node_name=NODE_NAME, origdoc=None, id_attr='', must=False, only_valid_cert=False, issuer=None): + signedtext, + cert_file=cert_file, + cert_type=cert_type, + node_name=node_name, + node_id=node_id, + ) + + def _check_signature(self, decoded_xml, item, node_name=NODE_NAME, origdoc=None, must=False, only_valid_cert=False, issuer=None): try: _issuer = item.issuer.text.strip() except AttributeError: @@ -1528,11 +1502,11 @@ class SecurityContext(object): try: last_pem_file = pem_fd.name if self.verify_signature( - decoded_xml, - pem_fd.name, - node_name=node_name, - node_id=item.id, - id_attr=id_attr): + decoded_xml, + pem_fd.name, + node_name=node_name, + node_id=item.id, + ): verified = True break except XmlsecError as exc: @@ -1550,25 +1524,23 @@ class SecurityContext(object): return item - def check_signature(self, item, node_name=NODE_NAME, origdoc=None, id_attr='', must=False, issuer=None): + def check_signature(self, item, node_name=NODE_NAME, origdoc=None, must=False, issuer=None): """ :param item: Parsed entity :param node_name: The name of the node/class/element that is signed :param origdoc: The original XML string - :param id_attr: The attribute name for the identifier, normally one of - 'id','Id' or 'ID' :param must: :return: """ return self._check_signature( - origdoc, - item, - node_name, - origdoc, - id_attr=id_attr, - must=must, - issuer=issuer) + origdoc, + item, + node_name, + origdoc, + must=must, + issuer=issuer, + ) def correctly_signed_message(self, decoded_xml, msgtype, must=False, origdoc=None, only_valid_cert=False): """Check if a request is correctly signed, if we have metadata for @@ -1683,7 +1655,7 @@ class SecurityContext(object): """ Deprecated function. See sign_statement(). """ return self.sign_statement(statement, **kwargs) - def sign_statement(self, statement, node_name, key=None, key_file=None, node_id=None, id_attr=''): + def sign_statement(self, statement, node_name, key=None, key_file=None, node_id=None): """Sign a SAML statement. :param statement: The statement to be signed @@ -1691,13 +1663,8 @@ class SecurityContext(object): :param key: The key to be used for the signing, either this or :param key_file: The file where the key can be found :param node_id: - :param id_attr: The attribute name for the identifier, normally one of - 'id','Id' or 'ID' :return: The signed statement """ - if not id_attr: - id_attr = self.id_attr - if not key_file and key: content = str(key).encode() tmp = make_temp(content, suffix=".pem", delete_tmpfiles=self.delete_tmpfiles) @@ -1707,11 +1674,11 @@ class SecurityContext(object): key_file = self.key_file return self.crypto.sign_statement( - statement, - node_name, - key_file, - node_id, - id_attr) + statement, + node_name, + key_file, + node_id, + ) def sign_assertion_using_xmlsec(self, statement, **kwargs): """ Deprecated function. See sign_assertion(). """ @@ -1749,13 +1716,12 @@ class SecurityContext(object): Sign multiple parts of a statement :param statement: The statement that should be sign, this is XML text - :param to_sign: A list of (items, id, id attribute name) tuples that - specifies what to sign + :param to_sign: A list of (items, id) tuples that specifies what to sign :param key: A key that should be used for doing the signing :param key_file: A file that contains the key to be used :return: A possibly multiple signed statement """ - for (item, sid, id_attr) in to_sign: + for (item, sid) in to_sign: if not sid: if not item.id: sid = item.id = sid() @@ -1770,12 +1736,12 @@ class SecurityContext(object): digest_alg=digest_alg) statement = self.sign_statement( - statement, - class_name(item), - key=key, - key_file=key_file, - node_id=sid, - id_attr=id_attr) + statement, + class_name(item), + key=key, + key_file=key_file, + node_id=sid, + ) return statement @@ -1860,7 +1826,8 @@ def pre_signature_part(ident, public_key=None, identifier=None, digest_alg=None, # </EncryptedData> -def pre_encryption_part(msg_enc=TRIPLE_DES_CBC, key_enc=RSA_1_5, key_name='my-rsa-key'): +def pre_encryption_part(msg_enc=TRIPLE_DES_CBC, key_enc=RSA_1_5, key_name='my-rsa-key', + encrypted_key_id=None, encrypted_data_id=None): """ :param msg_enc: @@ -1868,10 +1835,12 @@ def pre_encryption_part(msg_enc=TRIPLE_DES_CBC, key_enc=RSA_1_5, key_name='my-rs :param key_name: :return: """ + ek_id = encrypted_key_id or str(uuid.uuid4()) + ed_id = encrypted_data_id or str(uuid.uuid4()) msg_encryption_method = EncryptionMethod(algorithm=msg_enc) key_encryption_method = EncryptionMethod(algorithm=key_enc) encrypted_key = EncryptedKey( - id='EK', + id=ek_id, encryption_method=key_encryption_method, key_info=ds.KeyInfo( key_name=ds.KeyName(text=key_name)), @@ -1879,7 +1848,7 @@ def pre_encryption_part(msg_enc=TRIPLE_DES_CBC, key_enc=RSA_1_5, key_name='my-rs cipher_value=CipherValue(text=''))) key_info = ds.KeyInfo(encrypted_key=encrypted_key) encrypted_data = EncryptedData( - id='ED', + id=ed_id, type='http://www.w3.org/2001/04/xmlenc#Element', encryption_method=msg_encryption_method, key_info=key_info, |