summaryrefslogtreecommitdiff
path: root/src/OpenSSL/crypto.py
diff options
context:
space:
mode:
authorAlex Gaynor <alex.gaynor@gmail.com>2020-07-23 20:40:46 -0400
committerGitHub <noreply@github.com>2020-07-23 19:40:46 -0500
commit037371861693f26297320dcd5fd8c221b6d8df26 (patch)
treeab18ca46617b0036e137cd6a154726acbab36bdf /src/OpenSSL/crypto.py
parent4ca4fb9e8ed3c45f09efab8269e4078d40f39d9b (diff)
downloadpyopenssl-037371861693f26297320dcd5fd8c221b6d8df26.tar.gz
Paint it Black by the Rolling Stones (#920)
Diffstat (limited to 'src/OpenSSL/crypto.py')
-rw-r--r--src/OpenSSL/crypto.py248
1 files changed, 151 insertions, 97 deletions
diff --git a/src/OpenSSL/crypto.py b/src/OpenSSL/crypto.py
index 30dd478..0744ca7 100644
--- a/src/OpenSSL/crypto.py
+++ b/src/OpenSSL/crypto.py
@@ -7,7 +7,8 @@ from operator import __eq__, __ne__, __lt__, __le__, __gt__, __ge__
from six import (
integer_types as _integer_types,
text_type as _text_type,
- PY2 as _PY2)
+ PY2 as _PY2,
+)
from cryptography import x509
from cryptography.hazmat.primitives.asymmetric import dsa, rsa
@@ -24,42 +25,42 @@ from OpenSSL._util import (
)
__all__ = [
- 'FILETYPE_PEM',
- 'FILETYPE_ASN1',
- 'FILETYPE_TEXT',
- 'TYPE_RSA',
- 'TYPE_DSA',
- 'Error',
- 'PKey',
- 'get_elliptic_curves',
- 'get_elliptic_curve',
- 'X509Name',
- 'X509Extension',
- 'X509Req',
- 'X509',
- 'X509StoreFlags',
- 'X509Store',
- 'X509StoreContextError',
- 'X509StoreContext',
- 'load_certificate',
- 'dump_certificate',
- 'dump_publickey',
- 'dump_privatekey',
- 'Revoked',
- 'CRL',
- 'PKCS7',
- 'PKCS12',
- 'NetscapeSPKI',
- 'load_publickey',
- 'load_privatekey',
- 'dump_certificate_request',
- 'load_certificate_request',
- 'sign',
- 'verify',
- 'dump_crl',
- 'load_crl',
- 'load_pkcs7_data',
- 'load_pkcs12'
+ "FILETYPE_PEM",
+ "FILETYPE_ASN1",
+ "FILETYPE_TEXT",
+ "TYPE_RSA",
+ "TYPE_DSA",
+ "Error",
+ "PKey",
+ "get_elliptic_curves",
+ "get_elliptic_curve",
+ "X509Name",
+ "X509Extension",
+ "X509Req",
+ "X509",
+ "X509StoreFlags",
+ "X509Store",
+ "X509StoreContextError",
+ "X509StoreContext",
+ "load_certificate",
+ "dump_certificate",
+ "dump_publickey",
+ "dump_privatekey",
+ "Revoked",
+ "CRL",
+ "PKCS7",
+ "PKCS12",
+ "NetscapeSPKI",
+ "load_publickey",
+ "load_privatekey",
+ "dump_certificate_request",
+ "load_certificate_request",
+ "sign",
+ "verify",
+ "dump_crl",
+ "load_crl",
+ "load_pkcs7_data",
+ "load_pkcs12",
]
FILETYPE_PEM = _lib.SSL_FILETYPE_PEM
@@ -93,6 +94,7 @@ def _get_backend():
triggering this side effect unless _get_backend is called.
"""
from cryptography.hazmat.backends.openssl.backend import backend
+
return backend
@@ -135,7 +137,7 @@ def _bio_to_string(bio):
"""
Copy the contents of an OpenSSL BIO object into a Python byte string.
"""
- result_buffer = _ffi.new('char**')
+ result_buffer = _ffi.new("char**")
buffer_length = _lib.BIO_get_mem_data(bio, result_buffer)
return _ffi.buffer(result_buffer[0], buffer_length)[:]
@@ -172,7 +174,7 @@ def _get_asn1_time(timestamp):
@return: The time value from C{timestamp} as a L{bytes} string in a certain
format. Or C{None} if the object contains no time value.
"""
- string_timestamp = _ffi.cast('ASN1_STRING*', timestamp)
+ string_timestamp = _ffi.cast("ASN1_STRING*", timestamp)
if _lib.ASN1_STRING_length(string_timestamp) == 0:
return None
elif (
@@ -195,7 +197,8 @@ def _get_asn1_time(timestamp):
_untested_error("ASN1_TIME_to_generalizedtime")
else:
string_timestamp = _ffi.cast(
- "ASN1_STRING*", generalized_timestamp[0])
+ "ASN1_STRING*", generalized_timestamp[0]
+ )
string_data = _lib.ASN1_STRING_data(string_timestamp)
string_result = _ffi.string(string_data)
_lib.ASN1_GENERALIZEDTIME_free(generalized_timestamp[0])
@@ -219,6 +222,7 @@ class PKey(object):
"""
A class representing an DSA or RSA public key or key pair.
"""
+
_only_public = False
_initialized = True
@@ -257,8 +261,15 @@ class PKey(object):
.. versionadded:: 16.1.0
"""
pkey = cls()
- if not isinstance(crypto_key, (rsa.RSAPublicKey, rsa.RSAPrivateKey,
- dsa.DSAPublicKey, dsa.DSAPrivateKey)):
+ if not isinstance(
+ crypto_key,
+ (
+ rsa.RSAPublicKey,
+ rsa.RSAPrivateKey,
+ dsa.DSAPublicKey,
+ dsa.DSAPrivateKey,
+ ),
+ ):
raise TypeError("Unsupported key type")
pkey._pkey = crypto_key._evp_pkey
@@ -375,6 +386,7 @@ class _EllipticCurve(object):
instances each of which represents one curve supported by the system.
@type _curves: :py:type:`NoneType` or :py:type:`set`
"""
+
_curves = None
if not _PY2:
@@ -401,14 +413,12 @@ class _EllipticCurve(object):
elliptic curves the underlying library supports.
"""
num_curves = lib.EC_get_builtin_curves(_ffi.NULL, 0)
- builtin_curves = _ffi.new('EC_builtin_curve[]', num_curves)
+ builtin_curves = _ffi.new("EC_builtin_curve[]", num_curves)
# The return value on this call should be num_curves again. We
# could check it to make sure but if it *isn't* then.. what could
# we do? Abort the whole process, I suppose...? -exarkun
lib.EC_get_builtin_curves(builtin_curves, num_curves)
- return set(
- cls.from_nid(lib, c.nid)
- for c in builtin_curves)
+ return set(cls.from_nid(lib, c.nid) for c in builtin_curves)
@classmethod
def _get_elliptic_curves(cls, lib):
@@ -541,14 +551,16 @@ class X509Name(object):
self._name = _ffi.gc(name, _lib.X509_NAME_free)
def __setattr__(self, name, value):
- if name.startswith('_'):
+ if name.startswith("_"):
return super(X509Name, self).__setattr__(name, value)
# Note: we really do not want str subclasses here, so we do not use
# isinstance.
if type(name) is not str:
- raise TypeError("attribute name must be string, not '%.200s'" % (
- type(value).__name__,))
+ raise TypeError(
+ "attribute name must be string, not '%.200s'"
+ % (type(value).__name__,)
+ )
nid = _lib.OBJ_txt2nid(_byte_string(name))
if nid == _lib.NID_undef:
@@ -569,10 +581,11 @@ class X509Name(object):
break
if isinstance(value, _text_type):
- value = value.encode('utf-8')
+ value = value.encode("utf-8")
add_result = _lib.X509_NAME_add_entry_by_NID(
- self._name, nid, _lib.MBSTRING_UTF8, value, -1, -1, 0)
+ self._name, nid, _lib.MBSTRING_UTF8, value, -1, -1, 0
+ )
if not add_result:
_raise_current_error()
@@ -608,9 +621,9 @@ class X509Name(object):
_openssl_assert(data_length >= 0)
try:
- result = _ffi.buffer(
- result_buffer[0], data_length
- )[:].decode('utf-8')
+ result = _ffi.buffer(result_buffer[0], data_length)[:].decode(
+ "utf-8"
+ )
finally:
# XXX untested
_lib.OPENSSL_free(result_buffer[0])
@@ -622,6 +635,7 @@ class X509Name(object):
return NotImplemented
result = _lib.X509_NAME_cmp(self._name, other._name)
return op(result, 0)
+
return f
__eq__ = _cmp(__eq__)
@@ -639,11 +653,13 @@ class X509Name(object):
"""
result_buffer = _ffi.new("char[]", 512)
format_result = _lib.X509_NAME_oneline(
- self._name, result_buffer, len(result_buffer))
+ self._name, result_buffer, len(result_buffer)
+ )
_openssl_assert(format_result != _ffi.NULL)
return "<X509Name object '%s'>" % (
- _native(_ffi.string(result_buffer)),)
+ _native(_ffi.string(result_buffer)),
+ )
def hash(self):
"""
@@ -664,7 +680,7 @@ class X509Name(object):
:return: The DER encoded form of this name.
:rtype: :py:class:`bytes`
"""
- result_buffer = _ffi.new('unsigned char**')
+ result_buffer = _ffi.new("unsigned char**")
encode_result = _lib.i2d_X509_NAME(self._name, result_buffer)
_openssl_assert(encode_result >= 0)
@@ -691,8 +707,9 @@ class X509Name(object):
# ffi.string does not handle strings containing NULL bytes
# (which may have been generated by old, broken software)
- value = _ffi.buffer(_lib.ASN1_STRING_data(fval),
- _lib.ASN1_STRING_length(fval))[:]
+ value = _ffi.buffer(
+ _lib.ASN1_STRING_data(fval), _lib.ASN1_STRING_length(fval)
+ )[:]
result.append((_ffi.string(name), value))
return result
@@ -793,7 +810,8 @@ class X509Extension(object):
parts.append(_native(_bio_to_string(bio)))
else:
value = _native(
- _ffi.buffer(name.d.ia5.data, name.d.ia5.length)[:])
+ _ffi.buffer(name.d.ia5.data, name.d.ia5.length)[:]
+ )
parts.append(label + ":" + value)
return ", ".join(parts)
@@ -843,7 +861,7 @@ class X509Extension(object):
.. versionadded:: 0.12
"""
octet_result = _lib.X509_EXTENSION_get_data(self._extension)
- string_result = _ffi.cast('ASN1_STRING*', octet_result)
+ string_result = _ffi.cast("ASN1_STRING*", octet_result)
char_result = _lib.ASN1_STRING_data(string_result)
result_length = _lib.ASN1_STRING_length(string_result)
return _ffi.buffer(char_result, result_length)[:]
@@ -869,8 +887,9 @@ class X509Req(object):
.. versionadded:: 17.1.0
"""
from cryptography.hazmat.backends.openssl.x509 import (
- _CertificateSigningRequest
+ _CertificateSigningRequest,
)
+
backend = _get_backend()
return _CertificateSigningRequest(backend, self._req)
@@ -1052,6 +1071,7 @@ class X509(object):
"""
An X.509 certificate.
"""
+
def __init__(self):
x509 = _lib.X509_new()
_openssl_assert(x509 != _ffi.NULL)
@@ -1077,6 +1097,7 @@ class X509(object):
.. versionadded:: 17.1.0
"""
from cryptography.hazmat.backends.openssl.x509 import _Certificate
+
backend = _get_backend()
return _Certificate(backend, self._x509)
@@ -1218,12 +1239,16 @@ class X509(object):
result_length[0] = len(result_buffer)
digest_result = _lib.X509_digest(
- self._x509, digest, result_buffer, result_length)
+ self._x509, digest, result_buffer, result_length
+ )
_openssl_assert(digest_result == 1)
- return b":".join([
- b16encode(ch).upper() for ch
- in _ffi.buffer(result_buffer, result_length[0])])
+ return b":".join(
+ [
+ b16encode(ch).upper()
+ for ch in _ffi.buffer(result_buffer, result_length[0])
+ ]
+ )
def subject_name_hash(self):
"""
@@ -1248,7 +1273,7 @@ class X509(object):
hex_serial = hex(serial)[2:]
if not isinstance(hex_serial, bytes):
- hex_serial = hex_serial.encode('ascii')
+ hex_serial = hex_serial.encode("ascii")
bignum_serial = _ffi.new("BIGNUM**")
@@ -1259,7 +1284,8 @@ class X509(object):
if bignum_serial[0] == _ffi.NULL:
set_result = _lib.ASN1_INTEGER_set(
- _lib.X509_get_serialNumber(self._x509), small_serial)
+ _lib.X509_get_serialNumber(self._x509), small_serial
+ )
if set_result:
# TODO Not tested
_raise_current_error()
@@ -1524,6 +1550,7 @@ class X509StoreFlags(object):
.. _OpenSSL Verification Flags:
https://www.openssl.org/docs/manmaster/man3/X509_VERIFY_PARAM_set_flags.html
"""
+
CRL_CHECK = _lib.X509_V_FLAG_CRL_CHECK
CRL_CHECK_ALL = _lib.X509_V_FLAG_CRL_CHECK_ALL
IGNORE_CRITICAL = _lib.X509_V_FLAG_IGNORE_CRITICAL
@@ -1644,7 +1671,7 @@ class X509Store(object):
param = _lib.X509_VERIFY_PARAM_new()
param = _ffi.gc(param, _lib.X509_VERIFY_PARAM_free)
- _lib.X509_VERIFY_PARAM_set_time(param, int(vfy_time.strftime('%s')))
+ _lib.X509_VERIFY_PARAM_set_time(param, int(vfy_time.strftime("%s")))
_openssl_assert(_lib.X509_STORE_set1_param(self._store, param) != 0)
@@ -1722,8 +1749,13 @@ class X509StoreContext(object):
errors = [
_lib.X509_STORE_CTX_get_error(self._store_ctx),
_lib.X509_STORE_CTX_get_error_depth(self._store_ctx),
- _native(_ffi.string(_lib.X509_verify_cert_error_string(
- _lib.X509_STORE_CTX_get_error(self._store_ctx)))),
+ _native(
+ _ffi.string(
+ _lib.X509_verify_cert_error_string(
+ _lib.X509_STORE_CTX_get_error(self._store_ctx)
+ )
+ )
+ ),
]
# A context error should always be associated with a certificate, so we
# expect this call to never return :class:`None`.
@@ -1787,8 +1819,7 @@ def load_certificate(type, buffer):
elif type == FILETYPE_ASN1:
x509 = _lib.d2i_X509_bio(bio, _ffi.NULL)
else:
- raise ValueError(
- "type argument must be FILETYPE_PEM or FILETYPE_ASN1")
+ raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1")
if x509 == _ffi.NULL:
_raise_current_error()
@@ -1817,7 +1848,8 @@ def dump_certificate(type, cert):
else:
raise ValueError(
"type argument must be FILETYPE_PEM, FILETYPE_ASN1, or "
- "FILETYPE_TEXT")
+ "FILETYPE_TEXT"
+ )
_openssl_assert(result_code == 1)
return _bio_to_string(bio)
@@ -1873,7 +1905,8 @@ def dump_privatekey(type, pkey, cipher=None, passphrase=None):
if passphrase is None:
raise TypeError(
"if a value is given for cipher "
- "one must also be given for passphrase")
+ "one must also be given for passphrase"
+ )
cipher_obj = _lib.EVP_get_cipherbyname(_byte_string(cipher))
if cipher_obj == _ffi.NULL:
raise ValueError("Invalid cipher name")
@@ -1883,8 +1916,14 @@ def dump_privatekey(type, pkey, cipher=None, passphrase=None):
helper = _PassphraseHelper(type, passphrase)
if type == FILETYPE_PEM:
result_code = _lib.PEM_write_bio_PrivateKey(
- bio, pkey._pkey, cipher_obj, _ffi.NULL, 0,
- helper.callback, helper.callback_args)
+ bio,
+ pkey._pkey,
+ cipher_obj,
+ _ffi.NULL,
+ 0,
+ helper.callback,
+ helper.callback_args,
+ )
helper.raise_if_problem()
elif type == FILETYPE_ASN1:
result_code = _lib.i2d_PrivateKey_bio(bio, pkey._pkey)
@@ -1892,15 +1931,13 @@ def dump_privatekey(type, pkey, cipher=None, passphrase=None):
if _lib.EVP_PKEY_id(pkey._pkey) != _lib.EVP_PKEY_RSA:
raise TypeError("Only RSA keys are supported for FILETYPE_TEXT")
- rsa = _ffi.gc(
- _lib.EVP_PKEY_get1_RSA(pkey._pkey),
- _lib.RSA_free
- )
+ rsa = _ffi.gc(_lib.EVP_PKEY_get1_RSA(pkey._pkey), _lib.RSA_free)
result_code = _lib.RSA_print(bio, rsa, 0)
else:
raise ValueError(
"type argument must be FILETYPE_PEM, FILETYPE_ASN1, or "
- "FILETYPE_TEXT")
+ "FILETYPE_TEXT"
+ )
_openssl_assert(result_code != 0)
@@ -1911,6 +1948,7 @@ class Revoked(object):
"""
A certificate revocation.
"""
+
# https://www.openssl.org/docs/manmaster/man5/x509v3_config.html#CRL-distribution-points
# which differs from crl_reasons of crypto/x509v3/v3_enum.c that matches
# OCSP_crl_reason_str. We use the latter, just like the command line
@@ -1950,7 +1988,8 @@ class Revoked(object):
asn1_serial = _ffi.gc(
_lib.BN_to_ASN1_INTEGER(bignum_serial, _ffi.NULL),
- _lib.ASN1_INTEGER_free)
+ _lib.ASN1_INTEGER_free,
+ )
_lib.X509_REVOKED_set_serialNumber(self._revoked, asn1_serial)
def get_serial(self):
@@ -2001,7 +2040,7 @@ class Revoked(object):
elif not isinstance(reason, bytes):
raise TypeError("reason must be None or a byte string")
else:
- reason = reason.lower().replace(b' ', b'')
+ reason = reason.lower().replace(b" ", b"")
reason_code = [r.lower() for r in self._crl_reasons].index(reason)
new_reason_ext = _lib.ASN1_ENUMERATED_new()
@@ -2013,7 +2052,8 @@ class Revoked(object):
self._delete_reason()
add_result = _lib.X509_REVOKED_add1_ext_i2d(
- self._revoked, _lib.NID_crl_reason, new_reason_ext, 0, 0)
+ self._revoked, _lib.NID_crl_reason, new_reason_ext, 0, 0
+ )
_openssl_assert(add_result == 1)
def get_reason(self):
@@ -2095,8 +2135,9 @@ class CRL(object):
.. versionadded:: 17.1.0
"""
from cryptography.hazmat.backends.openssl.x509 import (
- _CertificateRevocationList
+ _CertificateRevocationList,
)
+
backend = _get_backend()
return _CertificateRevocationList(backend, self._crl)
@@ -2236,13 +2277,15 @@ class CRL(object):
digest_obj = _lib.EVP_get_digestbyname(digest)
_openssl_assert(digest_obj != _ffi.NULL)
_lib.X509_CRL_set_issuer_name(
- self._crl, _lib.X509_get_subject_name(issuer_cert._x509))
+ self._crl, _lib.X509_get_subject_name(issuer_cert._x509)
+ )
_lib.X509_CRL_sort(self._crl)
result = _lib.X509_CRL_sign(self._crl, issuer_key._pkey, digest_obj)
_openssl_assert(result != 0)
- def export(self, cert, key, type=FILETYPE_PEM, days=100,
- digest=_UNSPECIFIED):
+ def export(
+ self, cert, key, type=FILETYPE_PEM, days=100, digest=_UNSPECIFIED
+ ):
"""
Export the CRL as a string.
@@ -2500,10 +2543,17 @@ class PKCS12(object):
cert = self._cert._x509
pkcs12 = _lib.PKCS12_create(
- passphrase, friendlyname, pkey, cert, cacerts,
+ passphrase,
+ friendlyname,
+ pkey,
+ cert,
+ cacerts,
_lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC,
_lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC,
- iter, maciter, 0)
+ iter,
+ maciter,
+ 0,
+ )
if pkcs12 == _ffi.NULL:
_raise_current_error()
pkcs12 = _ffi.gc(pkcs12, _lib.PKCS12_free)
@@ -2667,7 +2717,7 @@ class _PassphraseHelper(object):
"passphrase returned by callback is too long"
)
for i in range(len(result)):
- buf[i] = result[i:i + 1]
+ buf[i] = result[i : i + 1]
return len(result)
except Exception as e:
self._problems.append(e)
@@ -2692,7 +2742,8 @@ def load_publickey(type, buffer):
if type == FILETYPE_PEM:
evp_pkey = _lib.PEM_read_bio_PUBKEY(
- bio, _ffi.NULL, _ffi.NULL, _ffi.NULL)
+ bio, _ffi.NULL, _ffi.NULL, _ffi.NULL
+ )
elif type == FILETYPE_ASN1:
evp_pkey = _lib.d2i_PUBKEY_bio(bio, _ffi.NULL)
else:
@@ -2728,7 +2779,8 @@ def load_privatekey(type, buffer, passphrase=None):
helper = _PassphraseHelper(type, passphrase)
if type == FILETYPE_PEM:
evp_pkey = _lib.PEM_read_bio_PrivateKey(
- bio, _ffi.NULL, helper.callback, helper.callback_args)
+ bio, _ffi.NULL, helper.callback, helper.callback_args
+ )
helper.raise_if_problem()
elif type == FILETYPE_ASN1:
evp_pkey = _lib.d2i_PrivateKey_bio(bio, _ffi.NULL)
@@ -2827,7 +2879,8 @@ def sign(pkey, data, digest):
signature_buffer = _ffi.new("unsigned char[]", length)
signature_length = _ffi.new("unsigned int *")
final_result = _lib.EVP_SignFinal(
- md_ctx, signature_buffer, signature_length, pkey._pkey)
+ md_ctx, signature_buffer, signature_length, pkey._pkey
+ )
_openssl_assert(final_result == 1)
return _ffi.buffer(signature_buffer, signature_length[0])[:]
@@ -2891,7 +2944,8 @@ def dump_crl(type, crl):
else:
raise ValueError(
"type argument must be FILETYPE_PEM, FILETYPE_ASN1, or "
- "FILETYPE_TEXT")
+ "FILETYPE_TEXT"
+ )
_openssl_assert(ret == 1)
return _bio_to_string(bio)
@@ -3061,4 +3115,4 @@ _lib.SSL_load_error_strings()
# Set the default string mask to match OpenSSL upstream (since 2005) and
# RFC5280 recommendations.
-_lib.ASN1_STRING_set_default_mask_asc(b'utf8only')
+_lib.ASN1_STRING_set_default_mask_asc(b"utf8only")