From 037371861693f26297320dcd5fd8c221b6d8df26 Mon Sep 17 00:00:00 2001 From: Alex Gaynor Date: Thu, 23 Jul 2020 20:40:46 -0400 Subject: Paint it Black by the Rolling Stones (#920) --- src/OpenSSL/crypto.py | 248 ++++++++++++++++++++++++++++++-------------------- 1 file changed, 151 insertions(+), 97 deletions(-) (limited to 'src/OpenSSL/crypto.py') diff --git a/src/OpenSSL/crypto.py b/src/OpenSSL/crypto.py index 30dd478..0744ca7 100644 --- a/src/OpenSSL/crypto.py +++ b/src/OpenSSL/crypto.py @@ -7,7 +7,8 @@ from operator import __eq__, __ne__, __lt__, __le__, __gt__, __ge__ from six import ( integer_types as _integer_types, text_type as _text_type, - PY2 as _PY2) + PY2 as _PY2, +) from cryptography import x509 from cryptography.hazmat.primitives.asymmetric import dsa, rsa @@ -24,42 +25,42 @@ from OpenSSL._util import ( ) __all__ = [ - 'FILETYPE_PEM', - 'FILETYPE_ASN1', - 'FILETYPE_TEXT', - 'TYPE_RSA', - 'TYPE_DSA', - 'Error', - 'PKey', - 'get_elliptic_curves', - 'get_elliptic_curve', - 'X509Name', - 'X509Extension', - 'X509Req', - 'X509', - 'X509StoreFlags', - 'X509Store', - 'X509StoreContextError', - 'X509StoreContext', - 'load_certificate', - 'dump_certificate', - 'dump_publickey', - 'dump_privatekey', - 'Revoked', - 'CRL', - 'PKCS7', - 'PKCS12', - 'NetscapeSPKI', - 'load_publickey', - 'load_privatekey', - 'dump_certificate_request', - 'load_certificate_request', - 'sign', - 'verify', - 'dump_crl', - 'load_crl', - 'load_pkcs7_data', - 'load_pkcs12' + "FILETYPE_PEM", + "FILETYPE_ASN1", + "FILETYPE_TEXT", + "TYPE_RSA", + "TYPE_DSA", + "Error", + "PKey", + "get_elliptic_curves", + "get_elliptic_curve", + "X509Name", + "X509Extension", + "X509Req", + "X509", + "X509StoreFlags", + "X509Store", + "X509StoreContextError", + "X509StoreContext", + "load_certificate", + "dump_certificate", + "dump_publickey", + "dump_privatekey", + "Revoked", + "CRL", + "PKCS7", + "PKCS12", + "NetscapeSPKI", + "load_publickey", + "load_privatekey", + "dump_certificate_request", + "load_certificate_request", + "sign", + "verify", + "dump_crl", + "load_crl", + "load_pkcs7_data", + "load_pkcs12", ] FILETYPE_PEM = _lib.SSL_FILETYPE_PEM @@ -93,6 +94,7 @@ def _get_backend(): triggering this side effect unless _get_backend is called. """ from cryptography.hazmat.backends.openssl.backend import backend + return backend @@ -135,7 +137,7 @@ def _bio_to_string(bio): """ Copy the contents of an OpenSSL BIO object into a Python byte string. """ - result_buffer = _ffi.new('char**') + result_buffer = _ffi.new("char**") buffer_length = _lib.BIO_get_mem_data(bio, result_buffer) return _ffi.buffer(result_buffer[0], buffer_length)[:] @@ -172,7 +174,7 @@ def _get_asn1_time(timestamp): @return: The time value from C{timestamp} as a L{bytes} string in a certain format. Or C{None} if the object contains no time value. """ - string_timestamp = _ffi.cast('ASN1_STRING*', timestamp) + string_timestamp = _ffi.cast("ASN1_STRING*", timestamp) if _lib.ASN1_STRING_length(string_timestamp) == 0: return None elif ( @@ -195,7 +197,8 @@ def _get_asn1_time(timestamp): _untested_error("ASN1_TIME_to_generalizedtime") else: string_timestamp = _ffi.cast( - "ASN1_STRING*", generalized_timestamp[0]) + "ASN1_STRING*", generalized_timestamp[0] + ) string_data = _lib.ASN1_STRING_data(string_timestamp) string_result = _ffi.string(string_data) _lib.ASN1_GENERALIZEDTIME_free(generalized_timestamp[0]) @@ -219,6 +222,7 @@ class PKey(object): """ A class representing an DSA or RSA public key or key pair. """ + _only_public = False _initialized = True @@ -257,8 +261,15 @@ class PKey(object): .. versionadded:: 16.1.0 """ pkey = cls() - if not isinstance(crypto_key, (rsa.RSAPublicKey, rsa.RSAPrivateKey, - dsa.DSAPublicKey, dsa.DSAPrivateKey)): + if not isinstance( + crypto_key, + ( + rsa.RSAPublicKey, + rsa.RSAPrivateKey, + dsa.DSAPublicKey, + dsa.DSAPrivateKey, + ), + ): raise TypeError("Unsupported key type") pkey._pkey = crypto_key._evp_pkey @@ -375,6 +386,7 @@ class _EllipticCurve(object): instances each of which represents one curve supported by the system. @type _curves: :py:type:`NoneType` or :py:type:`set` """ + _curves = None if not _PY2: @@ -401,14 +413,12 @@ class _EllipticCurve(object): elliptic curves the underlying library supports. """ num_curves = lib.EC_get_builtin_curves(_ffi.NULL, 0) - builtin_curves = _ffi.new('EC_builtin_curve[]', num_curves) + builtin_curves = _ffi.new("EC_builtin_curve[]", num_curves) # The return value on this call should be num_curves again. We # could check it to make sure but if it *isn't* then.. what could # we do? Abort the whole process, I suppose...? -exarkun lib.EC_get_builtin_curves(builtin_curves, num_curves) - return set( - cls.from_nid(lib, c.nid) - for c in builtin_curves) + return set(cls.from_nid(lib, c.nid) for c in builtin_curves) @classmethod def _get_elliptic_curves(cls, lib): @@ -541,14 +551,16 @@ class X509Name(object): self._name = _ffi.gc(name, _lib.X509_NAME_free) def __setattr__(self, name, value): - if name.startswith('_'): + if name.startswith("_"): return super(X509Name, self).__setattr__(name, value) # Note: we really do not want str subclasses here, so we do not use # isinstance. if type(name) is not str: - raise TypeError("attribute name must be string, not '%.200s'" % ( - type(value).__name__,)) + raise TypeError( + "attribute name must be string, not '%.200s'" + % (type(value).__name__,) + ) nid = _lib.OBJ_txt2nid(_byte_string(name)) if nid == _lib.NID_undef: @@ -569,10 +581,11 @@ class X509Name(object): break if isinstance(value, _text_type): - value = value.encode('utf-8') + value = value.encode("utf-8") add_result = _lib.X509_NAME_add_entry_by_NID( - self._name, nid, _lib.MBSTRING_UTF8, value, -1, -1, 0) + self._name, nid, _lib.MBSTRING_UTF8, value, -1, -1, 0 + ) if not add_result: _raise_current_error() @@ -608,9 +621,9 @@ class X509Name(object): _openssl_assert(data_length >= 0) try: - result = _ffi.buffer( - result_buffer[0], data_length - )[:].decode('utf-8') + result = _ffi.buffer(result_buffer[0], data_length)[:].decode( + "utf-8" + ) finally: # XXX untested _lib.OPENSSL_free(result_buffer[0]) @@ -622,6 +635,7 @@ class X509Name(object): return NotImplemented result = _lib.X509_NAME_cmp(self._name, other._name) return op(result, 0) + return f __eq__ = _cmp(__eq__) @@ -639,11 +653,13 @@ class X509Name(object): """ result_buffer = _ffi.new("char[]", 512) format_result = _lib.X509_NAME_oneline( - self._name, result_buffer, len(result_buffer)) + self._name, result_buffer, len(result_buffer) + ) _openssl_assert(format_result != _ffi.NULL) return "" % ( - _native(_ffi.string(result_buffer)),) + _native(_ffi.string(result_buffer)), + ) def hash(self): """ @@ -664,7 +680,7 @@ class X509Name(object): :return: The DER encoded form of this name. :rtype: :py:class:`bytes` """ - result_buffer = _ffi.new('unsigned char**') + result_buffer = _ffi.new("unsigned char**") encode_result = _lib.i2d_X509_NAME(self._name, result_buffer) _openssl_assert(encode_result >= 0) @@ -691,8 +707,9 @@ class X509Name(object): # ffi.string does not handle strings containing NULL bytes # (which may have been generated by old, broken software) - value = _ffi.buffer(_lib.ASN1_STRING_data(fval), - _lib.ASN1_STRING_length(fval))[:] + value = _ffi.buffer( + _lib.ASN1_STRING_data(fval), _lib.ASN1_STRING_length(fval) + )[:] result.append((_ffi.string(name), value)) return result @@ -793,7 +810,8 @@ class X509Extension(object): parts.append(_native(_bio_to_string(bio))) else: value = _native( - _ffi.buffer(name.d.ia5.data, name.d.ia5.length)[:]) + _ffi.buffer(name.d.ia5.data, name.d.ia5.length)[:] + ) parts.append(label + ":" + value) return ", ".join(parts) @@ -843,7 +861,7 @@ class X509Extension(object): .. versionadded:: 0.12 """ octet_result = _lib.X509_EXTENSION_get_data(self._extension) - string_result = _ffi.cast('ASN1_STRING*', octet_result) + string_result = _ffi.cast("ASN1_STRING*", octet_result) char_result = _lib.ASN1_STRING_data(string_result) result_length = _lib.ASN1_STRING_length(string_result) return _ffi.buffer(char_result, result_length)[:] @@ -869,8 +887,9 @@ class X509Req(object): .. versionadded:: 17.1.0 """ from cryptography.hazmat.backends.openssl.x509 import ( - _CertificateSigningRequest + _CertificateSigningRequest, ) + backend = _get_backend() return _CertificateSigningRequest(backend, self._req) @@ -1052,6 +1071,7 @@ class X509(object): """ An X.509 certificate. """ + def __init__(self): x509 = _lib.X509_new() _openssl_assert(x509 != _ffi.NULL) @@ -1077,6 +1097,7 @@ class X509(object): .. versionadded:: 17.1.0 """ from cryptography.hazmat.backends.openssl.x509 import _Certificate + backend = _get_backend() return _Certificate(backend, self._x509) @@ -1218,12 +1239,16 @@ class X509(object): result_length[0] = len(result_buffer) digest_result = _lib.X509_digest( - self._x509, digest, result_buffer, result_length) + self._x509, digest, result_buffer, result_length + ) _openssl_assert(digest_result == 1) - return b":".join([ - b16encode(ch).upper() for ch - in _ffi.buffer(result_buffer, result_length[0])]) + return b":".join( + [ + b16encode(ch).upper() + for ch in _ffi.buffer(result_buffer, result_length[0]) + ] + ) def subject_name_hash(self): """ @@ -1248,7 +1273,7 @@ class X509(object): hex_serial = hex(serial)[2:] if not isinstance(hex_serial, bytes): - hex_serial = hex_serial.encode('ascii') + hex_serial = hex_serial.encode("ascii") bignum_serial = _ffi.new("BIGNUM**") @@ -1259,7 +1284,8 @@ class X509(object): if bignum_serial[0] == _ffi.NULL: set_result = _lib.ASN1_INTEGER_set( - _lib.X509_get_serialNumber(self._x509), small_serial) + _lib.X509_get_serialNumber(self._x509), small_serial + ) if set_result: # TODO Not tested _raise_current_error() @@ -1524,6 +1550,7 @@ class X509StoreFlags(object): .. _OpenSSL Verification Flags: https://www.openssl.org/docs/manmaster/man3/X509_VERIFY_PARAM_set_flags.html """ + CRL_CHECK = _lib.X509_V_FLAG_CRL_CHECK CRL_CHECK_ALL = _lib.X509_V_FLAG_CRL_CHECK_ALL IGNORE_CRITICAL = _lib.X509_V_FLAG_IGNORE_CRITICAL @@ -1644,7 +1671,7 @@ class X509Store(object): param = _lib.X509_VERIFY_PARAM_new() param = _ffi.gc(param, _lib.X509_VERIFY_PARAM_free) - _lib.X509_VERIFY_PARAM_set_time(param, int(vfy_time.strftime('%s'))) + _lib.X509_VERIFY_PARAM_set_time(param, int(vfy_time.strftime("%s"))) _openssl_assert(_lib.X509_STORE_set1_param(self._store, param) != 0) @@ -1722,8 +1749,13 @@ class X509StoreContext(object): errors = [ _lib.X509_STORE_CTX_get_error(self._store_ctx), _lib.X509_STORE_CTX_get_error_depth(self._store_ctx), - _native(_ffi.string(_lib.X509_verify_cert_error_string( - _lib.X509_STORE_CTX_get_error(self._store_ctx)))), + _native( + _ffi.string( + _lib.X509_verify_cert_error_string( + _lib.X509_STORE_CTX_get_error(self._store_ctx) + ) + ) + ), ] # A context error should always be associated with a certificate, so we # expect this call to never return :class:`None`. @@ -1787,8 +1819,7 @@ def load_certificate(type, buffer): elif type == FILETYPE_ASN1: x509 = _lib.d2i_X509_bio(bio, _ffi.NULL) else: - raise ValueError( - "type argument must be FILETYPE_PEM or FILETYPE_ASN1") + raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1") if x509 == _ffi.NULL: _raise_current_error() @@ -1817,7 +1848,8 @@ def dump_certificate(type, cert): else: raise ValueError( "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or " - "FILETYPE_TEXT") + "FILETYPE_TEXT" + ) _openssl_assert(result_code == 1) return _bio_to_string(bio) @@ -1873,7 +1905,8 @@ def dump_privatekey(type, pkey, cipher=None, passphrase=None): if passphrase is None: raise TypeError( "if a value is given for cipher " - "one must also be given for passphrase") + "one must also be given for passphrase" + ) cipher_obj = _lib.EVP_get_cipherbyname(_byte_string(cipher)) if cipher_obj == _ffi.NULL: raise ValueError("Invalid cipher name") @@ -1883,8 +1916,14 @@ def dump_privatekey(type, pkey, cipher=None, passphrase=None): helper = _PassphraseHelper(type, passphrase) if type == FILETYPE_PEM: result_code = _lib.PEM_write_bio_PrivateKey( - bio, pkey._pkey, cipher_obj, _ffi.NULL, 0, - helper.callback, helper.callback_args) + bio, + pkey._pkey, + cipher_obj, + _ffi.NULL, + 0, + helper.callback, + helper.callback_args, + ) helper.raise_if_problem() elif type == FILETYPE_ASN1: result_code = _lib.i2d_PrivateKey_bio(bio, pkey._pkey) @@ -1892,15 +1931,13 @@ def dump_privatekey(type, pkey, cipher=None, passphrase=None): if _lib.EVP_PKEY_id(pkey._pkey) != _lib.EVP_PKEY_RSA: raise TypeError("Only RSA keys are supported for FILETYPE_TEXT") - rsa = _ffi.gc( - _lib.EVP_PKEY_get1_RSA(pkey._pkey), - _lib.RSA_free - ) + rsa = _ffi.gc(_lib.EVP_PKEY_get1_RSA(pkey._pkey), _lib.RSA_free) result_code = _lib.RSA_print(bio, rsa, 0) else: raise ValueError( "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or " - "FILETYPE_TEXT") + "FILETYPE_TEXT" + ) _openssl_assert(result_code != 0) @@ -1911,6 +1948,7 @@ class Revoked(object): """ A certificate revocation. """ + # https://www.openssl.org/docs/manmaster/man5/x509v3_config.html#CRL-distribution-points # which differs from crl_reasons of crypto/x509v3/v3_enum.c that matches # OCSP_crl_reason_str. We use the latter, just like the command line @@ -1950,7 +1988,8 @@ class Revoked(object): asn1_serial = _ffi.gc( _lib.BN_to_ASN1_INTEGER(bignum_serial, _ffi.NULL), - _lib.ASN1_INTEGER_free) + _lib.ASN1_INTEGER_free, + ) _lib.X509_REVOKED_set_serialNumber(self._revoked, asn1_serial) def get_serial(self): @@ -2001,7 +2040,7 @@ class Revoked(object): elif not isinstance(reason, bytes): raise TypeError("reason must be None or a byte string") else: - reason = reason.lower().replace(b' ', b'') + reason = reason.lower().replace(b" ", b"") reason_code = [r.lower() for r in self._crl_reasons].index(reason) new_reason_ext = _lib.ASN1_ENUMERATED_new() @@ -2013,7 +2052,8 @@ class Revoked(object): self._delete_reason() add_result = _lib.X509_REVOKED_add1_ext_i2d( - self._revoked, _lib.NID_crl_reason, new_reason_ext, 0, 0) + self._revoked, _lib.NID_crl_reason, new_reason_ext, 0, 0 + ) _openssl_assert(add_result == 1) def get_reason(self): @@ -2095,8 +2135,9 @@ class CRL(object): .. versionadded:: 17.1.0 """ from cryptography.hazmat.backends.openssl.x509 import ( - _CertificateRevocationList + _CertificateRevocationList, ) + backend = _get_backend() return _CertificateRevocationList(backend, self._crl) @@ -2236,13 +2277,15 @@ class CRL(object): digest_obj = _lib.EVP_get_digestbyname(digest) _openssl_assert(digest_obj != _ffi.NULL) _lib.X509_CRL_set_issuer_name( - self._crl, _lib.X509_get_subject_name(issuer_cert._x509)) + self._crl, _lib.X509_get_subject_name(issuer_cert._x509) + ) _lib.X509_CRL_sort(self._crl) result = _lib.X509_CRL_sign(self._crl, issuer_key._pkey, digest_obj) _openssl_assert(result != 0) - def export(self, cert, key, type=FILETYPE_PEM, days=100, - digest=_UNSPECIFIED): + def export( + self, cert, key, type=FILETYPE_PEM, days=100, digest=_UNSPECIFIED + ): """ Export the CRL as a string. @@ -2500,10 +2543,17 @@ class PKCS12(object): cert = self._cert._x509 pkcs12 = _lib.PKCS12_create( - passphrase, friendlyname, pkey, cert, cacerts, + passphrase, + friendlyname, + pkey, + cert, + cacerts, _lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC, _lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC, - iter, maciter, 0) + iter, + maciter, + 0, + ) if pkcs12 == _ffi.NULL: _raise_current_error() pkcs12 = _ffi.gc(pkcs12, _lib.PKCS12_free) @@ -2667,7 +2717,7 @@ class _PassphraseHelper(object): "passphrase returned by callback is too long" ) for i in range(len(result)): - buf[i] = result[i:i + 1] + buf[i] = result[i : i + 1] return len(result) except Exception as e: self._problems.append(e) @@ -2692,7 +2742,8 @@ def load_publickey(type, buffer): if type == FILETYPE_PEM: evp_pkey = _lib.PEM_read_bio_PUBKEY( - bio, _ffi.NULL, _ffi.NULL, _ffi.NULL) + bio, _ffi.NULL, _ffi.NULL, _ffi.NULL + ) elif type == FILETYPE_ASN1: evp_pkey = _lib.d2i_PUBKEY_bio(bio, _ffi.NULL) else: @@ -2728,7 +2779,8 @@ def load_privatekey(type, buffer, passphrase=None): helper = _PassphraseHelper(type, passphrase) if type == FILETYPE_PEM: evp_pkey = _lib.PEM_read_bio_PrivateKey( - bio, _ffi.NULL, helper.callback, helper.callback_args) + bio, _ffi.NULL, helper.callback, helper.callback_args + ) helper.raise_if_problem() elif type == FILETYPE_ASN1: evp_pkey = _lib.d2i_PrivateKey_bio(bio, _ffi.NULL) @@ -2827,7 +2879,8 @@ def sign(pkey, data, digest): signature_buffer = _ffi.new("unsigned char[]", length) signature_length = _ffi.new("unsigned int *") final_result = _lib.EVP_SignFinal( - md_ctx, signature_buffer, signature_length, pkey._pkey) + md_ctx, signature_buffer, signature_length, pkey._pkey + ) _openssl_assert(final_result == 1) return _ffi.buffer(signature_buffer, signature_length[0])[:] @@ -2891,7 +2944,8 @@ def dump_crl(type, crl): else: raise ValueError( "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or " - "FILETYPE_TEXT") + "FILETYPE_TEXT" + ) _openssl_assert(ret == 1) return _bio_to_string(bio) @@ -3061,4 +3115,4 @@ _lib.SSL_load_error_strings() # Set the default string mask to match OpenSSL upstream (since 2005) and # RFC5280 recommendations. -_lib.ASN1_STRING_set_default_mask_asc(b'utf8only') +_lib.ASN1_STRING_set_default_mask_asc(b"utf8only") -- cgit v1.2.1