diff options
author | Hynek Schlawack <hs@ox.cx> | 2015-10-16 20:18:38 +0200 |
---|---|---|
committer | Hynek Schlawack <hs@ox.cx> | 2015-10-17 11:27:56 +0200 |
commit | f0e6685b0880bbc30099c998454925ee8d39b14e (patch) | |
tree | ef387dada69f8fe9830fa1c9d9d289760fbbe5c1 /src/OpenSSL | |
parent | 510a04eb7cb97fda2062c0c8b51724db2edbd855 (diff) | |
download | pyopenssl-f0e6685b0880bbc30099c998454925ee8d39b14e.tar.gz |
Move package into src
Prevents accidental imports when running tests.
Diffstat (limited to 'src/OpenSSL')
-rw-r--r-- | src/OpenSSL/SSL.py | 1915 | ||||
-rw-r--r-- | src/OpenSSL/__init__.py | 20 | ||||
-rw-r--r-- | src/OpenSSL/_util.py | 125 | ||||
-rw-r--r-- | src/OpenSSL/crypto.py | 2741 | ||||
-rw-r--r-- | src/OpenSSL/rand.py | 174 | ||||
-rw-r--r-- | src/OpenSSL/tsafe.py | 28 | ||||
-rw-r--r-- | src/OpenSSL/version.py | 22 |
7 files changed, 5025 insertions, 0 deletions
diff --git a/src/OpenSSL/SSL.py b/src/OpenSSL/SSL.py new file mode 100644 index 0000000..190e4ac --- /dev/null +++ b/src/OpenSSL/SSL.py @@ -0,0 +1,1915 @@ +import socket +from sys import platform +from functools import wraps, partial +from itertools import count, chain +from weakref import WeakValueDictionary +from errno import errorcode + +from six import text_type as _text_type +from six import binary_type as _binary_type +from six import integer_types as integer_types +from six import int2byte, indexbytes + +from OpenSSL._util import ( + ffi as _ffi, + lib as _lib, + exception_from_error_queue as _exception_from_error_queue, + native as _native, + text_to_bytes_and_warn as _text_to_bytes_and_warn, + path_string as _path_string, + UNSPECIFIED as _UNSPECIFIED, +) + +from OpenSSL.crypto import ( + FILETYPE_PEM, _PassphraseHelper, PKey, X509Name, X509, X509Store) + +try: + _memoryview = memoryview +except NameError: + class _memoryview(object): + pass + +try: + _buffer = buffer +except NameError: + class _buffer(object): + pass + +OPENSSL_VERSION_NUMBER = _lib.OPENSSL_VERSION_NUMBER +SSLEAY_VERSION = _lib.SSLEAY_VERSION +SSLEAY_CFLAGS = _lib.SSLEAY_CFLAGS +SSLEAY_PLATFORM = _lib.SSLEAY_PLATFORM +SSLEAY_DIR = _lib.SSLEAY_DIR +SSLEAY_BUILT_ON = _lib.SSLEAY_BUILT_ON + +SENT_SHUTDOWN = _lib.SSL_SENT_SHUTDOWN +RECEIVED_SHUTDOWN = _lib.SSL_RECEIVED_SHUTDOWN + +SSLv2_METHOD = 1 +SSLv3_METHOD = 2 +SSLv23_METHOD = 3 +TLSv1_METHOD = 4 +TLSv1_1_METHOD = 5 +TLSv1_2_METHOD = 6 + +OP_NO_SSLv2 = _lib.SSL_OP_NO_SSLv2 +OP_NO_SSLv3 = _lib.SSL_OP_NO_SSLv3 +OP_NO_TLSv1 = _lib.SSL_OP_NO_TLSv1 + +OP_NO_TLSv1_1 = getattr(_lib, "SSL_OP_NO_TLSv1_1", 0) +OP_NO_TLSv1_2 = getattr(_lib, "SSL_OP_NO_TLSv1_2", 0) + +try: + MODE_RELEASE_BUFFERS = _lib.SSL_MODE_RELEASE_BUFFERS +except AttributeError: + pass + +OP_SINGLE_DH_USE = _lib.SSL_OP_SINGLE_DH_USE +OP_SINGLE_ECDH_USE = _lib.SSL_OP_SINGLE_ECDH_USE +OP_EPHEMERAL_RSA = _lib.SSL_OP_EPHEMERAL_RSA +OP_MICROSOFT_SESS_ID_BUG = _lib.SSL_OP_MICROSOFT_SESS_ID_BUG +OP_NETSCAPE_CHALLENGE_BUG = _lib.SSL_OP_NETSCAPE_CHALLENGE_BUG +OP_NETSCAPE_REUSE_CIPHER_CHANGE_BUG = ( + _lib.SSL_OP_NETSCAPE_REUSE_CIPHER_CHANGE_BUG +) +OP_SSLREF2_REUSE_CERT_TYPE_BUG = _lib.SSL_OP_SSLREF2_REUSE_CERT_TYPE_BUG +OP_MICROSOFT_BIG_SSLV3_BUFFER = _lib.SSL_OP_MICROSOFT_BIG_SSLV3_BUFFER +try: + OP_MSIE_SSLV2_RSA_PADDING = _lib.SSL_OP_MSIE_SSLV2_RSA_PADDING +except AttributeError: + pass +OP_SSLEAY_080_CLIENT_DH_BUG = _lib.SSL_OP_SSLEAY_080_CLIENT_DH_BUG +OP_TLS_D5_BUG = _lib.SSL_OP_TLS_D5_BUG +OP_TLS_BLOCK_PADDING_BUG = _lib.SSL_OP_TLS_BLOCK_PADDING_BUG +OP_DONT_INSERT_EMPTY_FRAGMENTS = _lib.SSL_OP_DONT_INSERT_EMPTY_FRAGMENTS +OP_CIPHER_SERVER_PREFERENCE = _lib.SSL_OP_CIPHER_SERVER_PREFERENCE +OP_TLS_ROLLBACK_BUG = _lib.SSL_OP_TLS_ROLLBACK_BUG +OP_PKCS1_CHECK_1 = _lib.SSL_OP_PKCS1_CHECK_1 +OP_PKCS1_CHECK_2 = _lib.SSL_OP_PKCS1_CHECK_2 +OP_NETSCAPE_CA_DN_BUG = _lib.SSL_OP_NETSCAPE_CA_DN_BUG +OP_NETSCAPE_DEMO_CIPHER_CHANGE_BUG = ( + _lib.SSL_OP_NETSCAPE_DEMO_CIPHER_CHANGE_BUG +) +try: + OP_NO_COMPRESSION = _lib.SSL_OP_NO_COMPRESSION +except AttributeError: + pass + +OP_NO_QUERY_MTU = _lib.SSL_OP_NO_QUERY_MTU +OP_COOKIE_EXCHANGE = _lib.SSL_OP_COOKIE_EXCHANGE +try: + OP_NO_TICKET = _lib.SSL_OP_NO_TICKET +except AttributeError: + pass + +OP_ALL = _lib.SSL_OP_ALL + +VERIFY_PEER = _lib.SSL_VERIFY_PEER +VERIFY_FAIL_IF_NO_PEER_CERT = _lib.SSL_VERIFY_FAIL_IF_NO_PEER_CERT +VERIFY_CLIENT_ONCE = _lib.SSL_VERIFY_CLIENT_ONCE +VERIFY_NONE = _lib.SSL_VERIFY_NONE + +SESS_CACHE_OFF = _lib.SSL_SESS_CACHE_OFF +SESS_CACHE_CLIENT = _lib.SSL_SESS_CACHE_CLIENT +SESS_CACHE_SERVER = _lib.SSL_SESS_CACHE_SERVER +SESS_CACHE_BOTH = _lib.SSL_SESS_CACHE_BOTH +SESS_CACHE_NO_AUTO_CLEAR = _lib.SSL_SESS_CACHE_NO_AUTO_CLEAR +SESS_CACHE_NO_INTERNAL_LOOKUP = _lib.SSL_SESS_CACHE_NO_INTERNAL_LOOKUP +SESS_CACHE_NO_INTERNAL_STORE = _lib.SSL_SESS_CACHE_NO_INTERNAL_STORE +SESS_CACHE_NO_INTERNAL = _lib.SSL_SESS_CACHE_NO_INTERNAL + +SSL_ST_CONNECT = _lib.SSL_ST_CONNECT +SSL_ST_ACCEPT = _lib.SSL_ST_ACCEPT +SSL_ST_MASK = _lib.SSL_ST_MASK +SSL_ST_INIT = _lib.SSL_ST_INIT +SSL_ST_BEFORE = _lib.SSL_ST_BEFORE +SSL_ST_OK = _lib.SSL_ST_OK +SSL_ST_RENEGOTIATE = _lib.SSL_ST_RENEGOTIATE + +SSL_CB_LOOP = _lib.SSL_CB_LOOP +SSL_CB_EXIT = _lib.SSL_CB_EXIT +SSL_CB_READ = _lib.SSL_CB_READ +SSL_CB_WRITE = _lib.SSL_CB_WRITE +SSL_CB_ALERT = _lib.SSL_CB_ALERT +SSL_CB_READ_ALERT = _lib.SSL_CB_READ_ALERT +SSL_CB_WRITE_ALERT = _lib.SSL_CB_WRITE_ALERT +SSL_CB_ACCEPT_LOOP = _lib.SSL_CB_ACCEPT_LOOP +SSL_CB_ACCEPT_EXIT = _lib.SSL_CB_ACCEPT_EXIT +SSL_CB_CONNECT_LOOP = _lib.SSL_CB_CONNECT_LOOP +SSL_CB_CONNECT_EXIT = _lib.SSL_CB_CONNECT_EXIT +SSL_CB_HANDSHAKE_START = _lib.SSL_CB_HANDSHAKE_START +SSL_CB_HANDSHAKE_DONE = _lib.SSL_CB_HANDSHAKE_DONE + + +class Error(Exception): + """ + An error occurred in an `OpenSSL.SSL` API. + """ + + +_raise_current_error = partial(_exception_from_error_queue, Error) + + +class WantReadError(Error): + pass + + +class WantWriteError(Error): + pass + + +class WantX509LookupError(Error): + pass + + +class ZeroReturnError(Error): + pass + + +class SysCallError(Error): + pass + + +class _CallbackExceptionHelper(object): + """ + A base class for wrapper classes that allow for intelligent exception + handling in OpenSSL callbacks. + + :ivar list _problems: Any exceptions that occurred while executing in a + context where they could not be raised in the normal way. Typically + this is because OpenSSL has called into some Python code and requires a + return value. The exceptions are saved to be raised later when it is + possible to do so. + """ + + def __init__(self): + self._problems = [] + + def raise_if_problem(self): + """ + Raise an exception from the OpenSSL error queue or that was previously + captured whe running a callback. + """ + if self._problems: + try: + _raise_current_error() + except Error: + pass + raise self._problems.pop(0) + + +class _VerifyHelper(_CallbackExceptionHelper): + """ + Wrap a callback such that it can be used as a certificate verification + callback. + """ + + def __init__(self, callback): + _CallbackExceptionHelper.__init__(self) + + @wraps(callback) + def wrapper(ok, store_ctx): + cert = X509.__new__(X509) + cert._x509 = _lib.X509_STORE_CTX_get_current_cert(store_ctx) + error_number = _lib.X509_STORE_CTX_get_error(store_ctx) + error_depth = _lib.X509_STORE_CTX_get_error_depth(store_ctx) + + index = _lib.SSL_get_ex_data_X509_STORE_CTX_idx() + ssl = _lib.X509_STORE_CTX_get_ex_data(store_ctx, index) + connection = Connection._reverse_mapping[ssl] + + try: + result = callback( + connection, cert, error_number, error_depth, ok + ) + except Exception as e: + self._problems.append(e) + return 0 + else: + if result: + _lib.X509_STORE_CTX_set_error(store_ctx, _lib.X509_V_OK) + return 1 + else: + return 0 + + self.callback = _ffi.callback( + "int (*)(int, X509_STORE_CTX *)", wrapper) + + +class _NpnAdvertiseHelper(_CallbackExceptionHelper): + """ + Wrap a callback such that it can be used as an NPN advertisement callback. + """ + + def __init__(self, callback): + _CallbackExceptionHelper.__init__(self) + + @wraps(callback) + def wrapper(ssl, out, outlen, arg): + try: + conn = Connection._reverse_mapping[ssl] + protos = callback(conn) + + # Join the protocols into a Python bytestring, length-prefixing + # each element. + protostr = b''.join( + chain.from_iterable((int2byte(len(p)), p) for p in protos) + ) + + # Save our callback arguments on the connection object. This is + # done to make sure that they don't get freed before OpenSSL + # uses them. Then, return them appropriately in the output + # parameters. + conn._npn_advertise_callback_args = [ + _ffi.new("unsigned int *", len(protostr)), + _ffi.new("unsigned char[]", protostr), + ] + outlen[0] = conn._npn_advertise_callback_args[0][0] + out[0] = conn._npn_advertise_callback_args[1] + return 0 + except Exception as e: + self._problems.append(e) + return 2 # SSL_TLSEXT_ERR_ALERT_FATAL + + self.callback = _ffi.callback( + "int (*)(SSL *, const unsigned char **, unsigned int *, void *)", + wrapper + ) + + +class _NpnSelectHelper(_CallbackExceptionHelper): + """ + Wrap a callback such that it can be used as an NPN selection callback. + """ + + def __init__(self, callback): + _CallbackExceptionHelper.__init__(self) + + @wraps(callback) + def wrapper(ssl, out, outlen, in_, inlen, arg): + try: + conn = Connection._reverse_mapping[ssl] + + # The string passed to us is actually made up of multiple + # length-prefixed bytestrings. We need to split that into a + # list. + instr = _ffi.buffer(in_, inlen)[:] + protolist = [] + while instr: + l = indexbytes(instr, 0) + proto = instr[1:l + 1] + protolist.append(proto) + instr = instr[l + 1:] + + # Call the callback + outstr = callback(conn, protolist) + + # Save our callback arguments on the connection object. This is + # done to make sure that they don't get freed before OpenSSL + # uses them. Then, return them appropriately in the output + # parameters. + conn._npn_select_callback_args = [ + _ffi.new("unsigned char *", len(outstr)), + _ffi.new("unsigned char[]", outstr), + ] + outlen[0] = conn._npn_select_callback_args[0][0] + out[0] = conn._npn_select_callback_args[1] + return 0 + except Exception as e: + self._problems.append(e) + return 2 # SSL_TLSEXT_ERR_ALERT_FATAL + + self.callback = _ffi.callback( + ("int (*)(SSL *, unsigned char **, unsigned char *, " + "const unsigned char *, unsigned int, void *)"), + wrapper + ) + + +class _ALPNSelectHelper(_CallbackExceptionHelper): + """ + Wrap a callback such that it can be used as an ALPN selection callback. + """ + + def __init__(self, callback): + _CallbackExceptionHelper.__init__(self) + + @wraps(callback) + def wrapper(ssl, out, outlen, in_, inlen, arg): + try: + conn = Connection._reverse_mapping[ssl] + + # The string passed to us is made up of multiple + # length-prefixed bytestrings. We need to split that into a + # list. + instr = _ffi.buffer(in_, inlen)[:] + protolist = [] + while instr: + encoded_len = indexbytes(instr, 0) + proto = instr[1:encoded_len + 1] + protolist.append(proto) + instr = instr[encoded_len + 1:] + + # Call the callback + outstr = callback(conn, protolist) + + if not isinstance(outstr, _binary_type): + raise TypeError("ALPN callback must return a bytestring.") + + # Save our callback arguments on the connection object to make + # sure that they don't get freed before OpenSSL can use them. + # Then, return them in the appropriate output parameters. + conn._alpn_select_callback_args = [ + _ffi.new("unsigned char *", len(outstr)), + _ffi.new("unsigned char[]", outstr), + ] + outlen[0] = conn._alpn_select_callback_args[0][0] + out[0] = conn._alpn_select_callback_args[1] + return 0 + except Exception as e: + self._problems.append(e) + return 2 # SSL_TLSEXT_ERR_ALERT_FATAL + + self.callback = _ffi.callback( + ("int (*)(SSL *, unsigned char **, unsigned char *, " + "const unsigned char *, unsigned int, void *)"), + wrapper + ) + + +def _asFileDescriptor(obj): + fd = None + if not isinstance(obj, integer_types): + meth = getattr(obj, "fileno", None) + if meth is not None: + obj = meth() + + if isinstance(obj, integer_types): + fd = obj + + if not isinstance(fd, integer_types): + raise TypeError("argument must be an int, or have a fileno() method.") + elif fd < 0: + raise ValueError( + "file descriptor cannot be a negative integer (%i)" % (fd,)) + + return fd + + +def SSLeay_version(type): + """ + Return a string describing the version of OpenSSL in use. + + :param type: One of the SSLEAY_ constants defined in this module. + """ + return _ffi.string(_lib.SSLeay_version(type)) + + +def _requires_npn(func): + """ + Wraps any function that requires NPN support in OpenSSL, ensuring that + NotImplementedError is raised if NPN is not present. + """ + @wraps(func) + def wrapper(*args, **kwargs): + if not _lib.Cryptography_HAS_NEXTPROTONEG: + raise NotImplementedError("NPN not available.") + + return func(*args, **kwargs) + + return wrapper + + +def _requires_alpn(func): + """ + Wraps any function that requires ALPN support in OpenSSL, ensuring that + NotImplementedError is raised if ALPN support is not present. + """ + @wraps(func) + def wrapper(*args, **kwargs): + if not _lib.Cryptography_HAS_ALPN: + raise NotImplementedError("ALPN not available.") + + return func(*args, **kwargs) + + return wrapper + + +class Session(object): + pass + + +class Context(object): + """ + :py:obj:`OpenSSL.SSL.Context` instances define the parameters for setting + up new SSL connections. + """ + _methods = { + SSLv2_METHOD: "SSLv2_method", + SSLv3_METHOD: "SSLv3_method", + SSLv23_METHOD: "SSLv23_method", + TLSv1_METHOD: "TLSv1_method", + TLSv1_1_METHOD: "TLSv1_1_method", + TLSv1_2_METHOD: "TLSv1_2_method", + } + _methods = dict( + (identifier, getattr(_lib, name)) + for (identifier, name) in _methods.items() + if getattr(_lib, name, None) is not None) + + def __init__(self, method): + """ + :param method: One of SSLv2_METHOD, SSLv3_METHOD, SSLv23_METHOD, or + TLSv1_METHOD. + """ + if not isinstance(method, integer_types): + raise TypeError("method must be an integer") + + try: + method_func = self._methods[method] + except KeyError: + raise ValueError("No such protocol") + + method_obj = method_func() + if method_obj == _ffi.NULL: + # TODO: This is untested. + _raise_current_error() + + context = _lib.SSL_CTX_new(method_obj) + if context == _ffi.NULL: + # TODO: This is untested. + _raise_current_error() + context = _ffi.gc(context, _lib.SSL_CTX_free) + + self._context = context + self._passphrase_helper = None + self._passphrase_callback = None + self._passphrase_userdata = None + self._verify_helper = None + self._verify_callback = None + self._info_callback = None + self._tlsext_servername_callback = None + self._app_data = None + self._npn_advertise_helper = None + self._npn_advertise_callback = None + self._npn_select_helper = None + self._npn_select_callback = None + self._alpn_select_helper = None + self._alpn_select_callback = None + + # SSL_CTX_set_app_data(self->ctx, self); + # SSL_CTX_set_mode(self->ctx, SSL_MODE_ENABLE_PARTIAL_WRITE | + # SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER | + # SSL_MODE_AUTO_RETRY); + self.set_mode(_lib.SSL_MODE_ENABLE_PARTIAL_WRITE) + + def load_verify_locations(self, cafile, capath=None): + """ + Let SSL know where we can find trusted certificates for the certificate + chain + + :param cafile: In which file we can find the certificates (``bytes`` or + ``unicode``). + :param capath: In which directory we can find the certificates + (``bytes`` or ``unicode``). + + :return: None + """ + if cafile is None: + cafile = _ffi.NULL + else: + cafile = _path_string(cafile) + + if capath is None: + capath = _ffi.NULL + else: + capath = _path_string(capath) + + load_result = _lib.SSL_CTX_load_verify_locations( + self._context, cafile, capath + ) + if not load_result: + _raise_current_error() + + def _wrap_callback(self, callback): + @wraps(callback) + def wrapper(size, verify, userdata): + return callback(size, verify, self._passphrase_userdata) + return _PassphraseHelper( + FILETYPE_PEM, wrapper, more_args=True, truncate=True) + + def set_passwd_cb(self, callback, userdata=None): + """ + Set the passphrase callback + + :param callback: The Python callback to use + :param userdata: (optional) A Python object which will be given as + argument to the callback + :return: None + """ + if not callable(callback): + raise TypeError("callback must be callable") + + self._passphrase_helper = self._wrap_callback(callback) + self._passphrase_callback = self._passphrase_helper.callback + _lib.SSL_CTX_set_default_passwd_cb( + self._context, self._passphrase_callback) + self._passphrase_userdata = userdata + + def set_default_verify_paths(self): + """ + Use the platform-specific CA certificate locations + + :return: None + """ + set_result = _lib.SSL_CTX_set_default_verify_paths(self._context) + if not set_result: + # TODO: This is untested. + _raise_current_error() + + def use_certificate_chain_file(self, certfile): + """ + Load a certificate chain from a file + + :param certfile: The name of the certificate chain file (``bytes`` or + ``unicode``). + + :return: None + """ + certfile = _path_string(certfile) + + result = _lib.SSL_CTX_use_certificate_chain_file( + self._context, certfile + ) + if not result: + _raise_current_error() + + def use_certificate_file(self, certfile, filetype=FILETYPE_PEM): + """ + Load a certificate from a file + + :param certfile: The name of the certificate file (``bytes`` or + ``unicode``). + :param filetype: (optional) The encoding of the file, default is PEM + + :return: None + """ + certfile = _path_string(certfile) + if not isinstance(filetype, integer_types): + raise TypeError("filetype must be an integer") + + use_result = _lib.SSL_CTX_use_certificate_file( + self._context, certfile, filetype + ) + if not use_result: + _raise_current_error() + + def use_certificate(self, cert): + """ + Load a certificate from a X509 object + + :param cert: The X509 object + :return: None + """ + if not isinstance(cert, X509): + raise TypeError("cert must be an X509 instance") + + use_result = _lib.SSL_CTX_use_certificate(self._context, cert._x509) + if not use_result: + _raise_current_error() + + def add_extra_chain_cert(self, certobj): + """ + Add certificate to chain + + :param certobj: The X509 certificate object to add to the chain + :return: None + """ + if not isinstance(certobj, X509): + raise TypeError("certobj must be an X509 instance") + + copy = _lib.X509_dup(certobj._x509) + add_result = _lib.SSL_CTX_add_extra_chain_cert(self._context, copy) + if not add_result: + # TODO: This is untested. + _lib.X509_free(copy) + _raise_current_error() + + def _raise_passphrase_exception(self): + if self._passphrase_helper is None: + _raise_current_error() + exception = self._passphrase_helper.raise_if_problem(Error) + if exception is not None: + raise exception + + def use_privatekey_file(self, keyfile, filetype=_UNSPECIFIED): + """ + Load a private key from a file + + :param keyfile: The name of the key file (``bytes`` or ``unicode``) + :param filetype: (optional) The encoding of the file, default is PEM + + :return: None + """ + keyfile = _path_string(keyfile) + + if filetype is _UNSPECIFIED: + filetype = FILETYPE_PEM + elif not isinstance(filetype, integer_types): + raise TypeError("filetype must be an integer") + + use_result = _lib.SSL_CTX_use_PrivateKey_file( + self._context, keyfile, filetype) + if not use_result: + self._raise_passphrase_exception() + + def use_privatekey(self, pkey): + """ + Load a private key from a PKey object + + :param pkey: The PKey object + :return: None + """ + if not isinstance(pkey, PKey): + raise TypeError("pkey must be a PKey instance") + + use_result = _lib.SSL_CTX_use_PrivateKey(self._context, pkey._pkey) + if not use_result: + self._raise_passphrase_exception() + + def check_privatekey(self): + """ + Check that the private key and certificate match up + + :return: None (raises an exception if something's wrong) + """ + if not _lib.SSL_CTX_check_private_key(self._context): + _raise_current_error() + + def load_client_ca(self, cafile): + """ + Load the trusted certificates that will be sent to the client + (basically telling the client "These are the guys I trust"). Does not + actually imply any of the certificates are trusted; that must be + configured separately. + + :param cafile: The name of the certificates file + :return: None + """ + + def set_session_id(self, buf): + """ + Set the session identifier. This is needed if you want to do session + resumption. + + :param buf: A Python object that can be safely converted to a string + :returns: None + """ + + def set_session_cache_mode(self, mode): + """ + Enable/disable session caching and specify the mode used. + + :param mode: One or more of the SESS_CACHE_* flags (combine using + bitwise or) + :returns: The previously set caching mode. + """ + if not isinstance(mode, integer_types): + raise TypeError("mode must be an integer") + + return _lib.SSL_CTX_set_session_cache_mode(self._context, mode) + + def get_session_cache_mode(self): + """ + :returns: The currently used cache mode. + """ + return _lib.SSL_CTX_get_session_cache_mode(self._context) + + def set_verify(self, mode, callback): + """ + Set the verify mode and verify callback + + :param mode: The verify mode, this is either VERIFY_NONE or + VERIFY_PEER combined with possible other flags + :param callback: The Python callback to use + :return: None + + See SSL_CTX_set_verify(3SSL) for further details. + """ + if not isinstance(mode, integer_types): + raise TypeError("mode must be an integer") + + if not callable(callback): + raise TypeError("callback must be callable") + + self._verify_helper = _VerifyHelper(callback) + self._verify_callback = self._verify_helper.callback + _lib.SSL_CTX_set_verify(self._context, mode, self._verify_callback) + + def set_verify_depth(self, depth): + """ + Set the verify depth + + :param depth: An integer specifying the verify depth + :return: None + """ + if not isinstance(depth, integer_types): + raise TypeError("depth must be an integer") + + _lib.SSL_CTX_set_verify_depth(self._context, depth) + + def get_verify_mode(self): + """ + Get the verify mode + + :return: The verify mode + """ + return _lib.SSL_CTX_get_verify_mode(self._context) + + def get_verify_depth(self): + """ + Get the verify depth + + :return: The verify depth + """ + return _lib.SSL_CTX_get_verify_depth(self._context) + + def load_tmp_dh(self, dhfile): + """ + Load parameters for Ephemeral Diffie-Hellman + + :param dhfile: The file to load EDH parameters from (``bytes`` or + ``unicode``). + + :return: None + """ + dhfile = _path_string(dhfile) + + bio = _lib.BIO_new_file(dhfile, b"r") + if bio == _ffi.NULL: + _raise_current_error() + bio = _ffi.gc(bio, _lib.BIO_free) + + dh = _lib.PEM_read_bio_DHparams(bio, _ffi.NULL, _ffi.NULL, _ffi.NULL) + dh = _ffi.gc(dh, _lib.DH_free) + _lib.SSL_CTX_set_tmp_dh(self._context, dh) + + def set_tmp_ecdh(self, curve): + """ + Select a curve to use for ECDHE key exchange. + + :param curve: A curve object to use as returned by either + :py:meth:`OpenSSL.crypto.get_elliptic_curve` or + :py:meth:`OpenSSL.crypto.get_elliptic_curves`. + + :return: None + """ + _lib.SSL_CTX_set_tmp_ecdh(self._context, curve._to_EC_KEY()) + + def set_cipher_list(self, cipher_list): + """ + Change the cipher list + + :param cipher_list: A cipher list, see ciphers(1) + :return: None + """ + if isinstance(cipher_list, _text_type): + cipher_list = cipher_list.encode("ascii") + + if not isinstance(cipher_list, bytes): + raise TypeError("cipher_list must be bytes or unicode") + + result = _lib.SSL_CTX_set_cipher_list(self._context, cipher_list) + if not result: + _raise_current_error() + + def set_client_ca_list(self, certificate_authorities): + """ + Set the list of preferred client certificate signers for this server + context. + + This list of certificate authorities will be sent to the client when + the server requests a client certificate. + + :param certificate_authorities: a sequence of X509Names. + :return: None + """ + name_stack = _lib.sk_X509_NAME_new_null() + if name_stack == _ffi.NULL: + # TODO: This is untested. + _raise_current_error() + + try: + for ca_name in certificate_authorities: + if not isinstance(ca_name, X509Name): + raise TypeError( + "client CAs must be X509Name objects, not %s " + "objects" % ( + type(ca_name).__name__, + ) + ) + copy = _lib.X509_NAME_dup(ca_name._name) + if copy == _ffi.NULL: + # TODO: This is untested. + _raise_current_error() + push_result = _lib.sk_X509_NAME_push(name_stack, copy) + if not push_result: + _lib.X509_NAME_free(copy) + _raise_current_error() + except: + _lib.sk_X509_NAME_free(name_stack) + raise + + _lib.SSL_CTX_set_client_CA_list(self._context, name_stack) + + def add_client_ca(self, certificate_authority): + """ + Add the CA certificate to the list of preferred signers for this + context. + + The list of certificate authorities will be sent to the client when the + server requests a client certificate. + + :param certificate_authority: certificate authority's X509 certificate. + :return: None + """ + if not isinstance(certificate_authority, X509): + raise TypeError("certificate_authority must be an X509 instance") + + add_result = _lib.SSL_CTX_add_client_CA( + self._context, certificate_authority._x509) + if not add_result: + # TODO: This is untested. + _raise_current_error() + + def set_timeout(self, timeout): + """ + Set session timeout + + :param timeout: The timeout in seconds + :return: The previous session timeout + """ + if not isinstance(timeout, integer_types): + raise TypeError("timeout must be an integer") + + return _lib.SSL_CTX_set_timeout(self._context, timeout) + + def get_timeout(self): + """ + Get the session timeout + + :return: The session timeout + """ + return _lib.SSL_CTX_get_timeout(self._context) + + def set_info_callback(self, callback): + """ + Set the info callback + + :param callback: The Python callback to use + :return: None + """ + @wraps(callback) + def wrapper(ssl, where, return_code): + callback(Connection._reverse_mapping[ssl], where, return_code) + self._info_callback = _ffi.callback( + "void (*)(const SSL *, int, int)", wrapper) + _lib.SSL_CTX_set_info_callback(self._context, self._info_callback) + + def get_app_data(self): + """ + Get the application data (supplied via set_app_data()) + + :return: The application data + """ + return self._app_data + + def set_app_data(self, data): + """ + Set the application data (will be returned from get_app_data()) + + :param data: Any Python object + :return: None + """ + self._app_data = data + + def get_cert_store(self): + """ + Get the certificate store for the context. + + :return: A X509Store object or None if it does not have one. + """ + store = _lib.SSL_CTX_get_cert_store(self._context) + if store == _ffi.NULL: + # TODO: This is untested. + return None + + pystore = X509Store.__new__(X509Store) + pystore._store = store + return pystore + + def set_options(self, options): + """ + Add options. Options set before are not cleared! + + :param options: The options to add. + :return: The new option bitmask. + """ + if not isinstance(options, integer_types): + raise TypeError("options must be an integer") + + return _lib.SSL_CTX_set_options(self._context, options) + + def set_mode(self, mode): + """ + Add modes via bitmask. Modes set before are not cleared! + + :param mode: The mode to add. + :return: The new mode bitmask. + """ + if not isinstance(mode, integer_types): + raise TypeError("mode must be an integer") + + return _lib.SSL_CTX_set_mode(self._context, mode) + + def set_tlsext_servername_callback(self, callback): + """ + Specify a callback function to be called when clients specify a server + name. + + :param callback: The callback function. It will be invoked with one + argument, the Connection instance. + """ + @wraps(callback) + def wrapper(ssl, alert, arg): + callback(Connection._reverse_mapping[ssl]) + return 0 + + self._tlsext_servername_callback = _ffi.callback( + "int (*)(const SSL *, int *, void *)", wrapper) + _lib.SSL_CTX_set_tlsext_servername_callback( + self._context, self._tlsext_servername_callback) + + @_requires_npn + def set_npn_advertise_callback(self, callback): + """ + Specify a callback function that will be called when offering `Next + Protocol Negotiation + <https://technotes.googlecode.com/git/nextprotoneg.html>`_ as a server. + + :param callback: The callback function. It will be invoked with one + argument, the Connection instance. It should return a list of + bytestrings representing the advertised protocols, like + ``[b'http/1.1', b'spdy/2']``. + """ + self._npn_advertise_helper = _NpnAdvertiseHelper(callback) + self._npn_advertise_callback = self._npn_advertise_helper.callback + _lib.SSL_CTX_set_next_protos_advertised_cb( + self._context, self._npn_advertise_callback, _ffi.NULL) + + @_requires_npn + def set_npn_select_callback(self, callback): + """ + Specify a callback function that will be called when a server offers + Next Protocol Negotiation options. + + :param callback: The callback function. It will be invoked with two + arguments: the Connection, and a list of offered protocols as + bytestrings, e.g. ``[b'http/1.1', b'spdy/2']``. It should return + one of those bytestrings, the chosen protocol. + """ + self._npn_select_helper = _NpnSelectHelper(callback) + self._npn_select_callback = self._npn_select_helper.callback + _lib.SSL_CTX_set_next_proto_select_cb( + self._context, self._npn_select_callback, _ffi.NULL) + + @_requires_alpn + def set_alpn_protos(self, protos): + """ + Specify the clients ALPN protocol list. + + These protocols are offered to the server during protocol negotiation. + + :param protos: A list of the protocols to be offered to the server. + This list should be a Python list of bytestrings representing the + protocols to offer, e.g. ``[b'http/1.1', b'spdy/2']``. + """ + # Take the list of protocols and join them together, prefixing them + # with their lengths. + protostr = b''.join( + chain.from_iterable((int2byte(len(p)), p) for p in protos) + ) + + # Build a C string from the list. We don't need to save this off + # because OpenSSL immediately copies the data out. + input_str = _ffi.new("unsigned char[]", protostr) + input_str_len = _ffi.cast("unsigned", len(protostr)) + _lib.SSL_CTX_set_alpn_protos(self._context, input_str, input_str_len) + + @_requires_alpn + def set_alpn_select_callback(self, callback): + """ + Set the callback to handle ALPN protocol choice. + + :param callback: The callback function. It will be invoked with two + arguments: the Connection, and a list of offered protocols as + bytestrings, e.g ``[b'http/1.1', b'spdy/2']``. It should return + one of those bytestrings, the chosen protocol. + """ + self._alpn_select_helper = _ALPNSelectHelper(callback) + self._alpn_select_callback = self._alpn_select_helper.callback + _lib.SSL_CTX_set_alpn_select_cb( + self._context, self._alpn_select_callback, _ffi.NULL) + +ContextType = Context + + +class Connection(object): + """ + """ + _reverse_mapping = WeakValueDictionary() + + def __init__(self, context, socket=None): + """ + Create a new Connection object, using the given OpenSSL.SSL.Context + instance and socket. + + :param context: An SSL Context to use for this connection + :param socket: The socket to use for transport layer + """ + if not isinstance(context, Context): + raise TypeError("context must be a Context instance") + + ssl = _lib.SSL_new(context._context) + self._ssl = _ffi.gc(ssl, _lib.SSL_free) + self._context = context + + # References to strings used for Next Protocol Negotiation. OpenSSL's + # header files suggest that these might get copied at some point, but + # doesn't specify when, so we store them here to make sure they don't + # get freed before OpenSSL uses them. + self._npn_advertise_callback_args = None + self._npn_select_callback_args = None + + # References to strings used for Application Layer Protocol + # Negotiation. These strings get copied at some point but it's well + # after the callback returns, so we have to hang them somewhere to + # avoid them getting freed. + self._alpn_select_callback_args = None + + self._reverse_mapping[self._ssl] = self + + if socket is None: + self._socket = None + # Don't set up any gc for these, SSL_free will take care of them. + self._into_ssl = _lib.BIO_new(_lib.BIO_s_mem()) + self._from_ssl = _lib.BIO_new(_lib.BIO_s_mem()) + + if self._into_ssl == _ffi.NULL or self._from_ssl == _ffi.NULL: + # TODO: This is untested. + _raise_current_error() + + _lib.SSL_set_bio(self._ssl, self._into_ssl, self._from_ssl) + else: + self._into_ssl = None + self._from_ssl = None + self._socket = socket + set_result = _lib.SSL_set_fd( + self._ssl, _asFileDescriptor(self._socket)) + if not set_result: + # TODO: This is untested. + _raise_current_error() + + def __getattr__(self, name): + """ + Look up attributes on the wrapped socket object if they are not found + on the Connection object. + """ + if self._socket is None: + raise AttributeError("'%s' object has no attribute '%s'" % ( + self.__class__.__name__, name + )) + else: + return getattr(self._socket, name) + + def _raise_ssl_error(self, ssl, result): + if self._context._verify_helper is not None: + self._context._verify_helper.raise_if_problem() + if self._context._npn_advertise_helper is not None: + self._context._npn_advertise_helper.raise_if_problem() + if self._context._npn_select_helper is not None: + self._context._npn_select_helper.raise_if_problem() + if self._context._alpn_select_helper is not None: + self._context._alpn_select_helper.raise_if_problem() + + error = _lib.SSL_get_error(ssl, result) + if error == _lib.SSL_ERROR_WANT_READ: + raise WantReadError() + elif error == _lib.SSL_ERROR_WANT_WRITE: + raise WantWriteError() + elif error == _lib.SSL_ERROR_ZERO_RETURN: + raise ZeroReturnError() + elif error == _lib.SSL_ERROR_WANT_X509_LOOKUP: + # TODO: This is untested. + raise WantX509LookupError() + elif error == _lib.SSL_ERROR_SYSCALL: + if _lib.ERR_peek_error() == 0: + if result < 0: + if platform == "win32": + errno = _ffi.getwinerror()[0] + else: + errno = _ffi.errno + raise SysCallError(errno, errorcode.get(errno)) + else: + raise SysCallError(-1, "Unexpected EOF") + else: + # TODO: This is untested. + _raise_current_error() + elif error == _lib.SSL_ERROR_NONE: + pass + else: + _raise_current_error() + + def get_context(self): + """ + Get session context + """ + return self._context + + def set_context(self, context): + """ + Switch this connection to a new session context + + :param context: A :py:class:`Context` instance giving the new session + context to use. + """ + if not isinstance(context, Context): + raise TypeError("context must be a Context instance") + + _lib.SSL_set_SSL_CTX(self._ssl, context._context) + self._context = context + + def get_servername(self): + """ + Retrieve the servername extension value if provided in the client hello + message, or None if there wasn't one. + + :return: A byte string giving the server name or :py:data:`None`. + """ + name = _lib.SSL_get_servername( + self._ssl, _lib.TLSEXT_NAMETYPE_host_name + ) + if name == _ffi.NULL: + return None + + return _ffi.string(name) + + def set_tlsext_host_name(self, name): + """ + Set the value of the servername extension to send in the client hello. + + :param name: A byte string giving the name. + """ + if not isinstance(name, bytes): + raise TypeError("name must be a byte string") + elif b"\0" in name: + raise TypeError("name must not contain NUL byte") + + # XXX I guess this can fail sometimes? + _lib.SSL_set_tlsext_host_name(self._ssl, name) + + def pending(self): + """ + Get the number of bytes that can be safely read from the connection + + :return: The number of bytes available in the receive buffer. + """ + return _lib.SSL_pending(self._ssl) + + def send(self, buf, flags=0): + """ + Send data on the connection. NOTE: If you get one of the WantRead, + WantWrite or WantX509Lookup exceptions on this, you have to call the + method again with the SAME buffer. + + :param buf: The string, buffer or memoryview to send + :param flags: (optional) Included for compatibility with the socket + API, the value is ignored + :return: The number of bytes written + """ + # Backward compatibility + buf = _text_to_bytes_and_warn("buf", buf) + + if isinstance(buf, _memoryview): + buf = buf.tobytes() + if isinstance(buf, _buffer): + buf = str(buf) + if not isinstance(buf, bytes): + raise TypeError("data must be a memoryview, buffer or byte string") + + result = _lib.SSL_write(self._ssl, buf, len(buf)) + self._raise_ssl_error(self._ssl, result) + return result + write = send + + def sendall(self, buf, flags=0): + """ + Send "all" data on the connection. This calls send() repeatedly until + all data is sent. If an error occurs, it's impossible to tell how much + data has been sent. + + :param buf: The string, buffer or memoryview to send + :param flags: (optional) Included for compatibility with the socket + API, the value is ignored + :return: The number of bytes written + """ + buf = _text_to_bytes_and_warn("buf", buf) + + if isinstance(buf, _memoryview): + buf = buf.tobytes() + if isinstance(buf, _buffer): + buf = str(buf) + if not isinstance(buf, bytes): + raise TypeError("buf must be a memoryview, buffer or byte string") + + left_to_send = len(buf) + total_sent = 0 + data = _ffi.new("char[]", buf) + + while left_to_send: + result = _lib.SSL_write(self._ssl, data + total_sent, left_to_send) + self._raise_ssl_error(self._ssl, result) + total_sent += result + left_to_send -= result + + def recv(self, bufsiz, flags=None): + """ + Receive data on the connection. NOTE: If you get one of the WantRead, + WantWrite or WantX509Lookup exceptions on this, you have to call the + method again with the SAME buffer. + + :param bufsiz: The maximum number of bytes to read + :param flags: (optional) The only supported flag is ``MSG_PEEK``, + all other flags are ignored. + :return: The string read from the Connection + """ + buf = _ffi.new("char[]", bufsiz) + if flags is not None and flags & socket.MSG_PEEK: + result = _lib.SSL_peek(self._ssl, buf, bufsiz) + else: + result = _lib.SSL_read(self._ssl, buf, bufsiz) + self._raise_ssl_error(self._ssl, result) + return _ffi.buffer(buf, result)[:] + read = recv + + def recv_into(self, buffer, nbytes=None, flags=None): + """ + Receive data on the connection and store the data into a buffer rather + than creating a new string. + + :param buffer: The buffer to copy into. + :param nbytes: (optional) The maximum number of bytes to read into the + buffer. If not present, defaults to the size of the buffer. If + larger than the size of the buffer, is reduced to the size of the + buffer. + :param flags: (optional) The only supported flag is ``MSG_PEEK``, + all other flags are ignored. + :return: The number of bytes read into the buffer. + """ + if nbytes is None: + nbytes = len(buffer) + else: + nbytes = min(nbytes, len(buffer)) + + # We need to create a temporary buffer. This is annoying, it would be + # better if we could pass memoryviews straight into the SSL_read call, + # but right now we can't. Revisit this if CFFI gets that ability. + buf = _ffi.new("char[]", nbytes) + if flags is not None and flags & socket.MSG_PEEK: + result = _lib.SSL_peek(self._ssl, buf, nbytes) + else: + result = _lib.SSL_read(self._ssl, buf, nbytes) + self._raise_ssl_error(self._ssl, result) + + # This strange line is all to avoid a memory copy. The buffer protocol + # should allow us to assign a CFFI buffer to the LHS of this line, but + # on CPython 3.3+ that segfaults. As a workaround, we can temporarily + # wrap it in a memoryview, except on Python 2.6 which doesn't have a + # memoryview type. + try: + buffer[:result] = memoryview(_ffi.buffer(buf, result)) + except NameError: + buffer[:result] = _ffi.buffer(buf, result) + + return result + + def _handle_bio_errors(self, bio, result): + if _lib.BIO_should_retry(bio): + if _lib.BIO_should_read(bio): + raise WantReadError() + elif _lib.BIO_should_write(bio): + # TODO: This is untested. + raise WantWriteError() + elif _lib.BIO_should_io_special(bio): + # TODO: This is untested. I think io_special means the socket + # BIO has a not-yet connected socket. + raise ValueError("BIO_should_io_special") + else: + # TODO: This is untested. + raise ValueError("unknown bio failure") + else: + # TODO: This is untested. + _raise_current_error() + + def bio_read(self, bufsiz): + """ + When using non-socket connections this function reads the "dirty" data + that would have traveled away on the network. + + :param bufsiz: The maximum number of bytes to read + :return: The string read. + """ + if self._from_ssl is None: + raise TypeError("Connection sock was not None") + + if not isinstance(bufsiz, integer_types): + raise TypeError("bufsiz must be an integer") + + buf = _ffi.new("char[]", bufsiz) + result = _lib.BIO_read(self._from_ssl, buf, bufsiz) + if result <= 0: + self._handle_bio_errors(self._from_ssl, result) + + return _ffi.buffer(buf, result)[:] + + def bio_write(self, buf): + """ + When using non-socket connections this function sends "dirty" data that + would have traveled in on the network. + + :param buf: The string to put into the memory BIO. + :return: The number of bytes written + """ + buf = _text_to_bytes_and_warn("buf", buf) + + if self._into_ssl is None: + raise TypeError("Connection sock was not None") + + result = _lib.BIO_write(self._into_ssl, buf, len(buf)) + if result <= 0: + self._handle_bio_errors(self._into_ssl, result) + return result + + def renegotiate(self): + """ + Renegotiate the session + + :return: True if the renegotiation can be started, false otherwise + """ + + def do_handshake(self): + """ + Perform an SSL handshake (usually called after renegotiate() or one of + set_*_state()). This can raise the same exceptions as send and recv. + + :return: None. + """ + result = _lib.SSL_do_handshake(self._ssl) + self._raise_ssl_error(self._ssl, result) + + def renegotiate_pending(self): + """ + Check if there's a renegotiation in progress, it will return false once + a renegotiation is finished. + + :return: Whether there's a renegotiation in progress + """ + + def total_renegotiations(self): + """ + Find out the total number of renegotiations. + + :return: The number of renegotiations. + """ + return _lib.SSL_total_renegotiations(self._ssl) + + def connect(self, addr): + """ + Connect to remote host and set up client-side SSL + + :param addr: A remote address + :return: What the socket's connect method returns + """ + _lib.SSL_set_connect_state(self._ssl) + return self._socket.connect(addr) + + def connect_ex(self, addr): + """ + Connect to remote host and set up client-side SSL. Note that if the + socket's connect_ex method doesn't return 0, SSL won't be initialized. + + :param addr: A remove address + :return: What the socket's connect_ex method returns + """ + connect_ex = self._socket.connect_ex + self.set_connect_state() + return connect_ex(addr) + + def accept(self): + """ + Accept incoming connection and set up SSL on it + + :return: A (conn,addr) pair where conn is a Connection and addr is an + address + """ + client, addr = self._socket.accept() + conn = Connection(self._context, client) + conn.set_accept_state() + return (conn, addr) + + def bio_shutdown(self): + """ + When using non-socket connections this function signals end of + data on the input for this connection. + + :return: None + """ + if self._from_ssl is None: + raise TypeError("Connection sock was not None") + + _lib.BIO_set_mem_eof_return(self._into_ssl, 0) + + def shutdown(self): + """ + Send closure alert + + :return: True if the shutdown completed successfully (i.e. both sides + have sent closure alerts), false otherwise (i.e. you have to + wait for a ZeroReturnError on a recv() method call + """ + result = _lib.SSL_shutdown(self._ssl) + if result < 0: + self._raise_ssl_error(self._ssl, result) + elif result > 0: + return True + else: + return False + + def get_cipher_list(self): + """ + Get the session cipher list + + :return: A list of cipher strings + """ + ciphers = [] + for i in count(): + result = _lib.SSL_get_cipher_list(self._ssl, i) + if result == _ffi.NULL: + break + ciphers.append(_native(_ffi.string(result))) + return ciphers + + def get_client_ca_list(self): + """ + Get CAs whose certificates are suggested for client authentication. + + :return: If this is a server connection, a list of X509Names + representing the acceptable CAs as set by + :py:meth:`OpenSSL.SSL.Context.set_client_ca_list` or + :py:meth:`OpenSSL.SSL.Context.add_client_ca`. If this is a client + connection, the list of such X509Names sent by the server, or an + empty list if that has not yet happened. + """ + ca_names = _lib.SSL_get_client_CA_list(self._ssl) + if ca_names == _ffi.NULL: + # TODO: This is untested. + return [] + + result = [] + for i in range(_lib.sk_X509_NAME_num(ca_names)): + name = _lib.sk_X509_NAME_value(ca_names, i) + copy = _lib.X509_NAME_dup(name) + if copy == _ffi.NULL: + # TODO: This is untested. + _raise_current_error() + + pyname = X509Name.__new__(X509Name) + pyname._name = _ffi.gc(copy, _lib.X509_NAME_free) + result.append(pyname) + return result + + def makefile(self): + """ + The makefile() method is not implemented, since there is no dup + semantics for SSL connections + + :raise: NotImplementedError + """ + raise NotImplementedError( + "Cannot make file object of OpenSSL.SSL.Connection") + + def get_app_data(self): + """ + Get application data + + :return: The application data + """ + return self._app_data + + def set_app_data(self, data): + """ + Set application data + + :param data - The application data + :return: None + """ + self._app_data = data + + def get_shutdown(self): + """ + Get shutdown state + + :return: The shutdown state, a bitvector of SENT_SHUTDOWN, + RECEIVED_SHUTDOWN. + """ + return _lib.SSL_get_shutdown(self._ssl) + + def set_shutdown(self, state): + """ + Set shutdown state + + :param state - bitvector of SENT_SHUTDOWN, RECEIVED_SHUTDOWN. + :return: None + """ + if not isinstance(state, integer_types): + raise TypeError("state must be an integer") + + _lib.SSL_set_shutdown(self._ssl, state) + + def state_string(self): + """ + Get a verbose state description + + :return: A string representing the state + """ + return _ffi.string(_lib.SSL_state_string_long(self._ssl)) + + def server_random(self): + """ + Get a copy of the server hello nonce. + + :return: A string representing the state + """ + if self._ssl.session == _ffi.NULL: + return None + return _ffi.buffer( + self._ssl.s3.server_random, + _lib.SSL3_RANDOM_SIZE)[:] + + def client_random(self): + """ + Get a copy of the client hello nonce. + + :return: A string representing the state + """ + if self._ssl.session == _ffi.NULL: + return None + return _ffi.buffer( + self._ssl.s3.client_random, + _lib.SSL3_RANDOM_SIZE)[:] + + def master_key(self): + """ + Get a copy of the master key. + + :return: A string representing the state + """ + if self._ssl.session == _ffi.NULL: + return None + return _ffi.buffer( + self._ssl.session.master_key, + self._ssl.session.master_key_length)[:] + + def sock_shutdown(self, *args, **kwargs): + """ + See shutdown(2) + + :return: What the socket's shutdown() method returns + """ + return self._socket.shutdown(*args, **kwargs) + + def get_peer_certificate(self): + """ + Retrieve the other side's certificate (if any) + + :return: The peer's certificate + """ + cert = _lib.SSL_get_peer_certificate(self._ssl) + if cert != _ffi.NULL: + pycert = X509.__new__(X509) + pycert._x509 = _ffi.gc(cert, _lib.X509_free) + return pycert + return None + + def get_peer_cert_chain(self): + """ + Retrieve the other side's certificate (if any) + + :return: A list of X509 instances giving the peer's certificate chain, + or None if it does not have one. + """ + cert_stack = _lib.SSL_get_peer_cert_chain(self._ssl) + if cert_stack == _ffi.NULL: + return None + + result = [] + for i in range(_lib.sk_X509_num(cert_stack)): + # TODO could incref instead of dup here + cert = _lib.X509_dup(_lib.sk_X509_value(cert_stack, i)) + pycert = X509.__new__(X509) + pycert._x509 = _ffi.gc(cert, _lib.X509_free) + result.append(pycert) + return result + + def want_read(self): + """ + Checks if more data has to be read from the transport layer to complete + an operation. + + :return: True iff more data has to be read + """ + return _lib.SSL_want_read(self._ssl) + + def want_write(self): + """ + Checks if there is data to write to the transport layer to complete an + operation. + + :return: True iff there is data to write + """ + return _lib.SSL_want_write(self._ssl) + + def set_accept_state(self): + """ + Set the connection to work in server mode. The handshake will be + handled automatically by read/write. + + :return: None + """ + _lib.SSL_set_accept_state(self._ssl) + + def set_connect_state(self): + """ + Set the connection to work in client mode. The handshake will be + handled automatically by read/write. + + :return: None + """ + _lib.SSL_set_connect_state(self._ssl) + + def get_session(self): + """ + Returns the Session currently used. + + @return: An instance of :py:class:`OpenSSL.SSL.Session` or + :py:obj:`None` if no session exists. + """ + session = _lib.SSL_get1_session(self._ssl) + if session == _ffi.NULL: + return None + + pysession = Session.__new__(Session) + pysession._session = _ffi.gc(session, _lib.SSL_SESSION_free) + return pysession + + def set_session(self, session): + """ + Set the session to be used when the TLS/SSL connection is established. + + :param session: A Session instance representing the session to use. + :returns: None + """ + if not isinstance(session, Session): + raise TypeError("session must be a Session instance") + + result = _lib.SSL_set_session(self._ssl, session._session) + if not result: + _raise_current_error() + + def _get_finished_message(self, function): + """ + Helper to implement :py:meth:`get_finished` and + :py:meth:`get_peer_finished`. + + :param function: Either :py:data:`SSL_get_finished`: or + :py:data:`SSL_get_peer_finished`. + + :return: :py:data:`None` if the desired message has not yet been + received, otherwise the contents of the message. + :rtype: :py:class:`bytes` or :py:class:`NoneType` + """ + # The OpenSSL documentation says nothing about what might happen if the + # count argument given is zero. Specifically, it doesn't say whether + # the output buffer may be NULL in that case or not. Inspection of the + # implementation reveals that it calls memcpy() unconditionally. + # Section 7.1.4, paragraph 1 of the C standard suggests that + # memcpy(NULL, source, 0) is not guaranteed to produce defined (let + # alone desirable) behavior (though it probably does on just about + # every implementation...) + # + # Allocate a tiny buffer to pass in (instead of just passing NULL as + # one might expect) for the initial call so as to be safe against this + # potentially undefined behavior. + empty = _ffi.new("char[]", 0) + size = function(self._ssl, empty, 0) + if size == 0: + # No Finished message so far. + return None + + buf = _ffi.new("char[]", size) + function(self._ssl, buf, size) + return _ffi.buffer(buf, size)[:] + + def get_finished(self): + """ + Obtain the latest `handshake finished` message sent to the peer. + + :return: The contents of the message or :py:obj:`None` if the TLS + handshake has not yet completed. + :rtype: :py:class:`bytes` or :py:class:`NoneType` + """ + return self._get_finished_message(_lib.SSL_get_finished) + + def get_peer_finished(self): + """ + Obtain the latest `handshake finished` message received from the peer. + + :return: The contents of the message or :py:obj:`None` if the TLS + handshake has not yet completed. + :rtype: :py:class:`bytes` or :py:class:`NoneType` + """ + return self._get_finished_message(_lib.SSL_get_peer_finished) + + def get_cipher_name(self): + """ + Obtain the name of the currently used cipher. + + :returns: The name of the currently used cipher or :py:obj:`None` + if no connection has been established. + :rtype: :py:class:`unicode` or :py:class:`NoneType` + """ + cipher = _lib.SSL_get_current_cipher(self._ssl) + if cipher == _ffi.NULL: + return None + else: + name = _ffi.string(_lib.SSL_CIPHER_get_name(cipher)) + return name.decode("utf-8") + + def get_cipher_bits(self): + """ + Obtain the number of secret bits of the currently used cipher. + + :returns: The number of secret bits of the currently used cipher + or :py:obj:`None` if no connection has been established. + :rtype: :py:class:`int` or :py:class:`NoneType` + """ + cipher = _lib.SSL_get_current_cipher(self._ssl) + if cipher == _ffi.NULL: + return None + else: + return _lib.SSL_CIPHER_get_bits(cipher, _ffi.NULL) + + def get_cipher_version(self): + """ + Obtain the protocol version of the currently used cipher. + + :returns: The protocol name of the currently used cipher + or :py:obj:`None` if no connection has been established. + :rtype: :py:class:`unicode` or :py:class:`NoneType` + """ + cipher = _lib.SSL_get_current_cipher(self._ssl) + if cipher == _ffi.NULL: + return None + else: + version = _ffi.string(_lib.SSL_CIPHER_get_version(cipher)) + return version.decode("utf-8") + + def get_protocol_version_name(self): + """ + Obtain the protocol version of the current connection. + + :returns: The TLS version of the current connection, for example + the value for TLS 1.2 would be ``TLSv1.2``or ``Unknown`` + for connections that were not successfully established. + :rtype: :py:class:`unicode` + """ + version = _ffi.string(_lib.SSL_get_version(self._ssl)) + return version.decode("utf-8") + + def get_protocol_version(self): + """ + Obtain the protocol version of the current connection. + + :returns: The TLS version of the current connection, for example + the value for TLS 1 would be 0x769. + :rtype: :py:class:`int` + """ + version = _lib.SSL_version(self._ssl) + return version + + @_requires_npn + def get_next_proto_negotiated(self): + """ + Get the protocol that was negotiated by NPN. + """ + data = _ffi.new("unsigned char **") + data_len = _ffi.new("unsigned int *") + + _lib.SSL_get0_next_proto_negotiated(self._ssl, data, data_len) + + return _ffi.buffer(data[0], data_len[0])[:] + + @_requires_alpn + def set_alpn_protos(self, protos): + """ + Specify the client's ALPN protocol list. + + These protocols are offered to the server during protocol negotiation. + + :param protos: A list of the protocols to be offered to the server. + This list should be a Python list of bytestrings representing the + protocols to offer, e.g. ``[b'http/1.1', b'spdy/2']``. + """ + # Take the list of protocols and join them together, prefixing them + # with their lengths. + protostr = b''.join( + chain.from_iterable((int2byte(len(p)), p) for p in protos) + ) + + # Build a C string from the list. We don't need to save this off + # because OpenSSL immediately copies the data out. + input_str = _ffi.new("unsigned char[]", protostr) + input_str_len = _ffi.cast("unsigned", len(protostr)) + _lib.SSL_set_alpn_protos(self._ssl, input_str, input_str_len) + + @_requires_alpn + def get_alpn_proto_negotiated(self): + """ + Get the protocol that was negotiated by ALPN. + """ + data = _ffi.new("unsigned char **") + data_len = _ffi.new("unsigned int *") + + _lib.SSL_get0_alpn_selected(self._ssl, data, data_len) + + if not data_len: + return b'' + + return _ffi.buffer(data[0], data_len[0])[:] + + +ConnectionType = Connection + +# This is similar to the initialization calls at the end of OpenSSL/crypto.py +# but is exercised mostly by the Context initializer. +_lib.SSL_library_init() diff --git a/src/OpenSSL/__init__.py b/src/OpenSSL/__init__.py new file mode 100644 index 0000000..3c80e54 --- /dev/null +++ b/src/OpenSSL/__init__.py @@ -0,0 +1,20 @@ +# Copyright (C) AB Strakt +# See LICENSE for details. + +""" +pyOpenSSL - A simple wrapper around the OpenSSL library +""" + +from OpenSSL import rand, crypto, SSL +from OpenSSL.version import ( + __author__, __copyright__, __email__, __license__, __summary__, __title__, + __uri__, __version__, +) + + +__all__ = [ + "SSL", "crypto", "rand", "tsafe", + + "__author__", "__copyright__", "__email__", "__license__", "__summary__", + "__title__", "__uri__", "__version__", +] diff --git a/src/OpenSSL/_util.py b/src/OpenSSL/_util.py new file mode 100644 index 0000000..074ef3d --- /dev/null +++ b/src/OpenSSL/_util.py @@ -0,0 +1,125 @@ +from warnings import warn +import sys + +from six import PY3, binary_type, text_type + +from cryptography.hazmat.bindings.openssl.binding import Binding +binding = Binding() +binding.init_static_locks() +ffi = binding.ffi +lib = binding.lib + + +def text(charp): + """ + Get a native string type representing of the given CFFI ``char*`` object. + + :param charp: A C-style string represented using CFFI. + + :return: :class:`str` + """ + if not charp: + return "" + return native(ffi.string(charp)) + + +def exception_from_error_queue(exception_type): + """ + Convert an OpenSSL library failure into a Python exception. + + When a call to the native OpenSSL library fails, this is usually signalled + by the return value, and an error code is stored in an error queue + associated with the current thread. The err library provides functions to + obtain these error codes and textual error messages. + """ + + errors = [] + + while True: + error = lib.ERR_get_error() + if error == 0: + break + errors.append(( + text(lib.ERR_lib_error_string(error)), + text(lib.ERR_func_error_string(error)), + text(lib.ERR_reason_error_string(error)))) + + raise exception_type(errors) + + +def native(s): + """ + Convert :py:class:`bytes` or :py:class:`unicode` to the native + :py:class:`str` type, using UTF-8 encoding if conversion is necessary. + + :raise UnicodeError: The input string is not UTF-8 decodeable. + + :raise TypeError: The input is neither :py:class:`bytes` nor + :py:class:`unicode`. + """ + if not isinstance(s, (binary_type, text_type)): + raise TypeError("%r is neither bytes nor unicode" % s) + if PY3: + if isinstance(s, binary_type): + return s.decode("utf-8") + else: + if isinstance(s, text_type): + return s.encode("utf-8") + return s + + +def path_string(s): + """ + Convert a Python string to a :py:class:`bytes` string identifying the same + path and which can be passed into an OpenSSL API accepting a filename. + + :param s: An instance of :py:class:`bytes` or :py:class:`unicode`. + + :return: An instance of :py:class:`bytes`. + """ + if isinstance(s, binary_type): + return s + elif isinstance(s, text_type): + return s.encode(sys.getfilesystemencoding()) + else: + raise TypeError("Path must be represented as bytes or unicode string") + + +if PY3: + def byte_string(s): + return s.encode("charmap") +else: + def byte_string(s): + return s + + +# A marker object to observe whether some optional arguments are passed any +# value or not. +UNSPECIFIED = object() + +_TEXT_WARNING = ( + text_type.__name__ + " for {0} is no longer accepted, use bytes" +) + + +def text_to_bytes_and_warn(label, obj): + """ + If ``obj`` is text, emit a warning that it should be bytes instead and try + to convert it to bytes automatically. + + :param str label: The name of the parameter from which ``obj`` was taken + (so a developer can easily find the source of the problem and correct + it). + + :return: If ``obj`` is the text string type, a ``bytes`` object giving the + UTF-8 encoding of that text is returned. Otherwise, ``obj`` itself is + returned. + """ + if isinstance(obj, text_type): + warn( + _TEXT_WARNING.format(label), + category=DeprecationWarning, + stacklevel=3 + ) + return obj.encode('utf-8') + return obj diff --git a/src/OpenSSL/crypto.py b/src/OpenSSL/crypto.py new file mode 100644 index 0000000..132d98d --- /dev/null +++ b/src/OpenSSL/crypto.py @@ -0,0 +1,2741 @@ +from time import time +from base64 import b16encode +from functools import partial +from operator import __eq__, __ne__, __lt__, __le__, __gt__, __ge__ +from warnings import warn as _warn + +from six import ( + integer_types as _integer_types, + text_type as _text_type, + PY3 as _PY3) + +from OpenSSL._util import ( + ffi as _ffi, + lib as _lib, + exception_from_error_queue as _exception_from_error_queue, + byte_string as _byte_string, + native as _native, + UNSPECIFIED as _UNSPECIFIED, + text_to_bytes_and_warn as _text_to_bytes_and_warn, +) + +FILETYPE_PEM = _lib.SSL_FILETYPE_PEM +FILETYPE_ASN1 = _lib.SSL_FILETYPE_ASN1 + +# TODO This was an API mistake. OpenSSL has no such constant. +FILETYPE_TEXT = 2 ** 16 - 1 + +TYPE_RSA = _lib.EVP_PKEY_RSA +TYPE_DSA = _lib.EVP_PKEY_DSA + + +class Error(Exception): + """ + An error occurred in an `OpenSSL.crypto` API. + """ + + +_raise_current_error = partial(_exception_from_error_queue, Error) + + +def _untested_error(where): + """ + An OpenSSL API failed somehow. Additionally, the failure which was + encountered isn't one that's exercised by the test suite so future behavior + of pyOpenSSL is now somewhat less predictable. + """ + raise RuntimeError("Unknown %s failure" % (where,)) + + +def _new_mem_buf(buffer=None): + """ + Allocate a new OpenSSL memory BIO. + + Arrange for the garbage collector to clean it up automatically. + + :param buffer: None or some bytes to use to put into the BIO so that they + can be read out. + """ + if buffer is None: + bio = _lib.BIO_new(_lib.BIO_s_mem()) + free = _lib.BIO_free + else: + data = _ffi.new("char[]", buffer) + bio = _lib.BIO_new_mem_buf(data, len(buffer)) + + # Keep the memory alive as long as the bio is alive! + def free(bio, ref=data): + return _lib.BIO_free(bio) + + if bio == _ffi.NULL: + # TODO: This is untested. + _raise_current_error() + + bio = _ffi.gc(bio, free) + return bio + + +def _bio_to_string(bio): + """ + Copy the contents of an OpenSSL BIO object into a Python byte string. + """ + result_buffer = _ffi.new('char**') + buffer_length = _lib.BIO_get_mem_data(bio, result_buffer) + return _ffi.buffer(result_buffer[0], buffer_length)[:] + + +def _set_asn1_time(boundary, when): + """ + The the time value of an ASN1 time object. + + @param boundary: An ASN1_GENERALIZEDTIME pointer (or an object safely + castable to that type) which will have its value set. + @param when: A string representation of the desired time value. + + @raise TypeError: If C{when} is not a L{bytes} string. + @raise ValueError: If C{when} does not represent a time in the required + format. + @raise RuntimeError: If the time value cannot be set for some other + (unspecified) reason. + """ + if not isinstance(when, bytes): + raise TypeError("when must be a byte string") + + set_result = _lib.ASN1_GENERALIZEDTIME_set_string( + _ffi.cast('ASN1_GENERALIZEDTIME*', boundary), when) + if set_result == 0: + dummy = _ffi.gc(_lib.ASN1_STRING_new(), _lib.ASN1_STRING_free) + _lib.ASN1_STRING_set(dummy, when, len(when)) + check_result = _lib.ASN1_GENERALIZEDTIME_check( + _ffi.cast('ASN1_GENERALIZEDTIME*', dummy)) + if not check_result: + raise ValueError("Invalid string") + else: + _untested_error() + + +def _get_asn1_time(timestamp): + """ + Retrieve the time value of an ASN1 time object. + + @param timestamp: An ASN1_GENERALIZEDTIME* (or an object safely castable to + that type) from which the time value will be retrieved. + + @return: The time value from C{timestamp} as a L{bytes} string in a certain + format. Or C{None} if the object contains no time value. + """ + string_timestamp = _ffi.cast('ASN1_STRING*', timestamp) + if _lib.ASN1_STRING_length(string_timestamp) == 0: + return None + elif ( + _lib.ASN1_STRING_type(string_timestamp) == _lib.V_ASN1_GENERALIZEDTIME + ): + return _ffi.string(_lib.ASN1_STRING_data(string_timestamp)) + else: + generalized_timestamp = _ffi.new("ASN1_GENERALIZEDTIME**") + _lib.ASN1_TIME_to_generalizedtime(timestamp, generalized_timestamp) + if generalized_timestamp[0] == _ffi.NULL: + # This may happen: + # - if timestamp was not an ASN1_TIME + # - if allocating memory for the ASN1_GENERALIZEDTIME failed + # - if a copy of the time data from timestamp cannot be made for + # the newly allocated ASN1_GENERALIZEDTIME + # + # These are difficult to test. cffi enforces the ASN1_TIME type. + # Memory allocation failures are a pain to trigger + # deterministically. + _untested_error("ASN1_TIME_to_generalizedtime") + else: + string_timestamp = _ffi.cast( + "ASN1_STRING*", generalized_timestamp[0]) + string_data = _lib.ASN1_STRING_data(string_timestamp) + string_result = _ffi.string(string_data) + _lib.ASN1_GENERALIZEDTIME_free(generalized_timestamp[0]) + return string_result + + +class PKey(object): + """ + A class representing an DSA or RSA public key or key pair. + """ + _only_public = False + _initialized = True + + def __init__(self): + pkey = _lib.EVP_PKEY_new() + self._pkey = _ffi.gc(pkey, _lib.EVP_PKEY_free) + self._initialized = False + + def generate_key(self, type, bits): + """ + Generate a key pair of the given type, with the given number of bits. + + This generates a key "into" the this object. + + :param type: The key type. + :type type: :py:data:`TYPE_RSA` or :py:data:`TYPE_DSA` + :param bits: The number of bits. + :type bits: :py:data:`int` ``>= 0`` + :raises TypeError: If :py:data:`type` or :py:data:`bits` isn't + of the appropriate type. + :raises ValueError: If the number of bits isn't an integer of + the appropriate size. + :return: :py:const:`None` + """ + if not isinstance(type, int): + raise TypeError("type must be an integer") + + if not isinstance(bits, int): + raise TypeError("bits must be an integer") + + # TODO Check error return + exponent = _lib.BN_new() + exponent = _ffi.gc(exponent, _lib.BN_free) + _lib.BN_set_word(exponent, _lib.RSA_F4) + + if type == TYPE_RSA: + if bits <= 0: + raise ValueError("Invalid number of bits") + + rsa = _lib.RSA_new() + + result = _lib.RSA_generate_key_ex(rsa, bits, exponent, _ffi.NULL) + if result == 0: + # TODO: The test for this case is commented out. Different + # builds of OpenSSL appear to have different failure modes that + # make it hard to test. Visual inspection of the OpenSSL + # source reveals that a return value of 0 signals an error. + # Manual testing on a particular build of OpenSSL suggests that + # this is probably the appropriate way to handle those errors. + _raise_current_error() + + result = _lib.EVP_PKEY_assign_RSA(self._pkey, rsa) + if not result: + # TODO: It appears as though this can fail if an engine is in + # use which does not support RSA. + _raise_current_error() + + elif type == TYPE_DSA: + dsa = _lib.DSA_generate_parameters( + bits, _ffi.NULL, 0, _ffi.NULL, _ffi.NULL, _ffi.NULL, _ffi.NULL) + if dsa == _ffi.NULL: + # TODO: This is untested. + _raise_current_error() + if not _lib.DSA_generate_key(dsa): + # TODO: This is untested. + _raise_current_error() + if not _lib.EVP_PKEY_assign_DSA(self._pkey, dsa): + # TODO: This is untested. + _raise_current_error() + else: + raise Error("No such key type") + + self._initialized = True + + def check(self): + """ + Check the consistency of an RSA private key. + + This is the Python equivalent of OpenSSL's ``RSA_check_key``. + + :return: True if key is consistent. + :raise Error: if the key is inconsistent. + :raise TypeError: if the key is of a type which cannot be checked. + Only RSA keys can currently be checked. + """ + if self._only_public: + raise TypeError("public key only") + + if _lib.EVP_PKEY_type(self._pkey.type) != _lib.EVP_PKEY_RSA: + raise TypeError("key type unsupported") + + rsa = _lib.EVP_PKEY_get1_RSA(self._pkey) + rsa = _ffi.gc(rsa, _lib.RSA_free) + result = _lib.RSA_check_key(rsa) + if result: + return True + _raise_current_error() + + def type(self): + """ + Returns the type of the key + + :return: The type of the key. + """ + return self._pkey.type + + def bits(self): + """ + Returns the number of bits of the key + + :return: The number of bits of the key. + """ + return _lib.EVP_PKEY_bits(self._pkey) +PKeyType = PKey + + +class _EllipticCurve(object): + """ + A representation of a supported elliptic curve. + + @cvar _curves: :py:obj:`None` until an attempt is made to load the curves. + Thereafter, a :py:type:`set` containing :py:type:`_EllipticCurve` + instances each of which represents one curve supported by the system. + @type _curves: :py:type:`NoneType` or :py:type:`set` + """ + _curves = None + + if _PY3: + # This only necessary on Python 3. Morever, it is broken on Python 2. + def __ne__(self, other): + """ + Implement cooperation with the right-hand side argument of ``!=``. + + Python 3 seems to have dropped this cooperation in this very narrow + circumstance. + """ + if isinstance(other, _EllipticCurve): + return super(_EllipticCurve, self).__ne__(other) + return NotImplemented + + @classmethod + def _load_elliptic_curves(cls, lib): + """ + Get the curves supported by OpenSSL. + + :param lib: The OpenSSL library binding object. + + :return: A :py:type:`set` of ``cls`` instances giving the names of the + elliptic curves the underlying library supports. + """ + if lib.Cryptography_HAS_EC: + num_curves = lib.EC_get_builtin_curves(_ffi.NULL, 0) + builtin_curves = _ffi.new('EC_builtin_curve[]', num_curves) + # The return value on this call should be num_curves again. We + # could check it to make sure but if it *isn't* then.. what could + # we do? Abort the whole process, I suppose...? -exarkun + lib.EC_get_builtin_curves(builtin_curves, num_curves) + return set( + cls.from_nid(lib, c.nid) + for c in builtin_curves) + return set() + + @classmethod + def _get_elliptic_curves(cls, lib): + """ + Get, cache, and return the curves supported by OpenSSL. + + :param lib: The OpenSSL library binding object. + + :return: A :py:type:`set` of ``cls`` instances giving the names of the + elliptic curves the underlying library supports. + """ + if cls._curves is None: + cls._curves = cls._load_elliptic_curves(lib) + return cls._curves + + @classmethod + def from_nid(cls, lib, nid): + """ + Instantiate a new :py:class:`_EllipticCurve` associated with the given + OpenSSL NID. + + :param lib: The OpenSSL library binding object. + + :param nid: The OpenSSL NID the resulting curve object will represent. + This must be a curve NID (and not, for example, a hash NID) or + subsequent operations will fail in unpredictable ways. + :type nid: :py:class:`int` + + :return: The curve object. + """ + return cls(lib, nid, _ffi.string(lib.OBJ_nid2sn(nid)).decode("ascii")) + + def __init__(self, lib, nid, name): + """ + :param _lib: The :py:mod:`cryptography` binding instance used to + interface with OpenSSL. + + :param _nid: The OpenSSL NID identifying the curve this object + represents. + :type _nid: :py:class:`int` + + :param name: The OpenSSL short name identifying the curve this object + represents. + :type name: :py:class:`unicode` + """ + self._lib = lib + self._nid = nid + self.name = name + + def __repr__(self): + return "<Curve %r>" % (self.name,) + + def _to_EC_KEY(self): + """ + Create a new OpenSSL EC_KEY structure initialized to use this curve. + + The structure is automatically garbage collected when the Python object + is garbage collected. + """ + key = self._lib.EC_KEY_new_by_curve_name(self._nid) + return _ffi.gc(key, _lib.EC_KEY_free) + + +def get_elliptic_curves(): + """ + Return a set of objects representing the elliptic curves supported in the + OpenSSL build in use. + + The curve objects have a :py:class:`unicode` ``name`` attribute by which + they identify themselves. + + The curve objects are useful as values for the argument accepted by + :py:meth:`Context.set_tmp_ecdh` to specify which elliptical curve should be + used for ECDHE key exchange. + """ + return _EllipticCurve._get_elliptic_curves(_lib) + + +def get_elliptic_curve(name): + """ + Return a single curve object selected by name. + + See :py:func:`get_elliptic_curves` for information about curve objects. + + :param name: The OpenSSL short name identifying the curve object to + retrieve. + :type name: :py:class:`unicode` + + If the named curve is not supported then :py:class:`ValueError` is raised. + """ + for curve in get_elliptic_curves(): + if curve.name == name: + return curve + raise ValueError("unknown curve name", name) + + +class X509Name(object): + """ + An X.509 Distinguished Name. + + :ivar countryName: The country of the entity. + :ivar C: Alias for :py:attr:`countryName`. + + :ivar stateOrProvinceName: The state or province of the entity. + :ivar ST: Alias for :py:attr:`stateOrProvinceName`. + + :ivar localityName: The locality of the entity. + :ivar L: Alias for :py:attr:`localityName`. + + :ivar organizationName: The organization name of the entity. + :ivar O: Alias for :py:attr:`organizationName`. + + :ivar organizationalUnitName: The organizational unit of the entity. + :ivar OU: Alias for :py:attr:`organizationalUnitName` + + :ivar commonName: The common name of the entity. + :ivar CN: Alias for :py:attr:`commonName`. + + :ivar emailAddress: The e-mail address of the entity. + """ + + def __init__(self, name): + """ + Create a new X509Name, copying the given X509Name instance. + + :param name: The name to copy. + :type name: :py:class:`X509Name` + """ + name = _lib.X509_NAME_dup(name._name) + self._name = _ffi.gc(name, _lib.X509_NAME_free) + + def __setattr__(self, name, value): + if name.startswith('_'): + return super(X509Name, self).__setattr__(name, value) + + # Note: we really do not want str subclasses here, so we do not use + # isinstance. + if type(name) is not str: + raise TypeError("attribute name must be string, not '%.200s'" % ( + type(value).__name__,)) + + nid = _lib.OBJ_txt2nid(_byte_string(name)) + if nid == _lib.NID_undef: + try: + _raise_current_error() + except Error: + pass + raise AttributeError("No such attribute") + + # If there's an old entry for this NID, remove it + for i in range(_lib.X509_NAME_entry_count(self._name)): + ent = _lib.X509_NAME_get_entry(self._name, i) + ent_obj = _lib.X509_NAME_ENTRY_get_object(ent) + ent_nid = _lib.OBJ_obj2nid(ent_obj) + if nid == ent_nid: + ent = _lib.X509_NAME_delete_entry(self._name, i) + _lib.X509_NAME_ENTRY_free(ent) + break + + if isinstance(value, _text_type): + value = value.encode('utf-8') + + add_result = _lib.X509_NAME_add_entry_by_NID( + self._name, nid, _lib.MBSTRING_UTF8, value, -1, -1, 0) + if not add_result: + _raise_current_error() + + def __getattr__(self, name): + """ + Find attribute. An X509Name object has the following attributes: + countryName (alias C), stateOrProvince (alias ST), locality (alias L), + organization (alias O), organizationalUnit (alias OU), commonName + (alias CN) and more... + """ + nid = _lib.OBJ_txt2nid(_byte_string(name)) + if nid == _lib.NID_undef: + # This is a bit weird. OBJ_txt2nid indicated failure, but it seems + # a lower level function, a2d_ASN1_OBJECT, also feels the need to + # push something onto the error queue. If we don't clean that up + # now, someone else will bump into it later and be quite confused. + # See lp#314814. + try: + _raise_current_error() + except Error: + pass + return super(X509Name, self).__getattr__(name) + + entry_index = _lib.X509_NAME_get_index_by_NID(self._name, nid, -1) + if entry_index == -1: + return None + + entry = _lib.X509_NAME_get_entry(self._name, entry_index) + data = _lib.X509_NAME_ENTRY_get_data(entry) + + result_buffer = _ffi.new("unsigned char**") + data_length = _lib.ASN1_STRING_to_UTF8(result_buffer, data) + if data_length < 0: + # TODO: This is untested. + _raise_current_error() + + try: + result = _ffi.buffer( + result_buffer[0], data_length + )[:].decode('utf-8') + finally: + # XXX untested + _lib.OPENSSL_free(result_buffer[0]) + return result + + def _cmp(op): + def f(self, other): + if not isinstance(other, X509Name): + return NotImplemented + result = _lib.X509_NAME_cmp(self._name, other._name) + return op(result, 0) + return f + + __eq__ = _cmp(__eq__) + __ne__ = _cmp(__ne__) + + __lt__ = _cmp(__lt__) + __le__ = _cmp(__le__) + + __gt__ = _cmp(__gt__) + __ge__ = _cmp(__ge__) + + def __repr__(self): + """ + String representation of an X509Name + """ + result_buffer = _ffi.new("char[]", 512) + format_result = _lib.X509_NAME_oneline( + self._name, result_buffer, len(result_buffer)) + + if format_result == _ffi.NULL: + # TODO: This is untested. + _raise_current_error() + + return "<X509Name object '%s'>" % ( + _native(_ffi.string(result_buffer)),) + + def hash(self): + """ + Return an integer representation of the first four bytes of the + MD5 digest of the DER representation of the name. + + This is the Python equivalent of OpenSSL's ``X509_NAME_hash``. + + :return: The (integer) hash of this name. + :rtype: :py:class:`int` + """ + return _lib.X509_NAME_hash(self._name) + + def der(self): + """ + Return the DER encoding of this name. + + :return: The DER encoded form of this name. + :rtype: :py:class:`bytes` + """ + result_buffer = _ffi.new('unsigned char**') + encode_result = _lib.i2d_X509_NAME(self._name, result_buffer) + if encode_result < 0: + # TODO: This is untested. + _raise_current_error() + + string_result = _ffi.buffer(result_buffer[0], encode_result)[:] + _lib.OPENSSL_free(result_buffer[0]) + return string_result + + def get_components(self): + """ + Returns the components of this name, as a sequence of 2-tuples. + + :return: The components of this name. + :rtype: :py:class:`list` of ``name, value`` tuples. + """ + result = [] + for i in range(_lib.X509_NAME_entry_count(self._name)): + ent = _lib.X509_NAME_get_entry(self._name, i) + + fname = _lib.X509_NAME_ENTRY_get_object(ent) + fval = _lib.X509_NAME_ENTRY_get_data(ent) + + nid = _lib.OBJ_obj2nid(fname) + name = _lib.OBJ_nid2sn(nid) + + result.append(( + _ffi.string(name), + _ffi.string( + _lib.ASN1_STRING_data(fval), + _lib.ASN1_STRING_length(fval)))) + + return result + + +X509NameType = X509Name + + +class X509Extension(object): + """ + An X.509 v3 certificate extension. + """ + + def __init__(self, type_name, critical, value, subject=None, issuer=None): + """ + Initializes an X509 extension. + + :param type_name: The name of the type of extension to create. See + http://openssl.org/docs/apps/x509v3_config.html#STANDARD_EXTENSIONS + :type type_name: :py:data:`bytes` + + :param bool critical: A flag indicating whether this is a critical + extension. + + :param value: The value of the extension. + :type value: :py:data:`bytes` + + :param subject: Optional X509 certificate to use as subject. + :type subject: :py:class:`X509` + + :param issuer: Optional X509 certificate to use as issuer. + :type issuer: :py:class:`X509` + """ + ctx = _ffi.new("X509V3_CTX*") + + # A context is necessary for any extension which uses the r2i + # conversion method. That is, X509V3_EXT_nconf may segfault if passed + # a NULL ctx. Start off by initializing most of the fields to NULL. + _lib.X509V3_set_ctx(ctx, _ffi.NULL, _ffi.NULL, _ffi.NULL, _ffi.NULL, 0) + + # We have no configuration database - but perhaps we should (some + # extensions may require it). + _lib.X509V3_set_ctx_nodb(ctx) + + # Initialize the subject and issuer, if appropriate. ctx is a local, + # and as far as I can tell none of the X509V3_* APIs invoked here steal + # any references, so no need to mess with reference counts or + # duplicates. + if issuer is not None: + if not isinstance(issuer, X509): + raise TypeError("issuer must be an X509 instance") + ctx.issuer_cert = issuer._x509 + if subject is not None: + if not isinstance(subject, X509): + raise TypeError("subject must be an X509 instance") + ctx.subject_cert = subject._x509 + + if critical: + # There are other OpenSSL APIs which would let us pass in critical + # separately, but they're harder to use, and since value is already + # a pile of crappy junk smuggling a ton of utterly important + # structured data, what's the point of trying to avoid nasty stuff + # with strings? (However, X509V3_EXT_i2d in particular seems like + # it would be a better API to invoke. I do not know where to get + # the ext_struc it desires for its last parameter, though.) + value = b"critical," + value + + extension = _lib.X509V3_EXT_nconf(_ffi.NULL, ctx, type_name, value) + if extension == _ffi.NULL: + _raise_current_error() + self._extension = _ffi.gc(extension, _lib.X509_EXTENSION_free) + + @property + def _nid(self): + return _lib.OBJ_obj2nid(self._extension.object) + + _prefixes = { + _lib.GEN_EMAIL: "email", + _lib.GEN_DNS: "DNS", + _lib.GEN_URI: "URI", + } + + def _subjectAltNameString(self): + method = _lib.X509V3_EXT_get(self._extension) + if method == _ffi.NULL: + # TODO: This is untested. + _raise_current_error() + payload = self._extension.value.data + length = self._extension.value.length + + payloadptr = _ffi.new("unsigned char**") + payloadptr[0] = payload + + if method.it != _ffi.NULL: + ptr = _lib.ASN1_ITEM_ptr(method.it) + data = _lib.ASN1_item_d2i(_ffi.NULL, payloadptr, length, ptr) + names = _ffi.cast("GENERAL_NAMES*", data) + else: + names = _ffi.cast( + "GENERAL_NAMES*", + method.d2i(_ffi.NULL, payloadptr, length)) + + names = _ffi.gc(names, _lib.GENERAL_NAMES_free) + parts = [] + for i in range(_lib.sk_GENERAL_NAME_num(names)): + name = _lib.sk_GENERAL_NAME_value(names, i) + try: + label = self._prefixes[name.type] + except KeyError: + bio = _new_mem_buf() + _lib.GENERAL_NAME_print(bio, name) + parts.append(_native(_bio_to_string(bio))) + else: + value = _native( + _ffi.buffer(name.d.ia5.data, name.d.ia5.length)[:]) + parts.append(label + ":" + value) + return ", ".join(parts) + + def __str__(self): + """ + :return: a nice text representation of the extension + """ + if _lib.NID_subject_alt_name == self._nid: + return self._subjectAltNameString() + + bio = _new_mem_buf() + print_result = _lib.X509V3_EXT_print(bio, self._extension, 0, 0) + if not print_result: + # TODO: This is untested. + _raise_current_error() + + return _native(_bio_to_string(bio)) + + def get_critical(self): + """ + Returns the critical field of this X.509 extension. + + :return: The critical field. + """ + return _lib.X509_EXTENSION_get_critical(self._extension) + + def get_short_name(self): + """ + Returns the short type name of this X.509 extension. + + The result is a byte string such as :py:const:`b"basicConstraints"`. + + :return: The short type name. + :rtype: :py:data:`bytes` + + .. versionadded:: 0.12 + """ + obj = _lib.X509_EXTENSION_get_object(self._extension) + nid = _lib.OBJ_obj2nid(obj) + return _ffi.string(_lib.OBJ_nid2sn(nid)) + + def get_data(self): + """ + Returns the data of the X509 extension, encoded as ASN.1. + + :return: The ASN.1 encoded data of this X509 extension. + :rtype: :py:data:`bytes` + + .. versionadded:: 0.12 + """ + octet_result = _lib.X509_EXTENSION_get_data(self._extension) + string_result = _ffi.cast('ASN1_STRING*', octet_result) + char_result = _lib.ASN1_STRING_data(string_result) + result_length = _lib.ASN1_STRING_length(string_result) + return _ffi.buffer(char_result, result_length)[:] + + +X509ExtensionType = X509Extension + + +class X509Req(object): + """ + An X.509 certificate signing requests. + """ + + def __init__(self): + req = _lib.X509_REQ_new() + self._req = _ffi.gc(req, _lib.X509_REQ_free) + + def set_pubkey(self, pkey): + """ + Set the public key of the certificate signing request. + + :param pkey: The public key to use. + :type pkey: :py:class:`PKey` + + :return: :py:const:`None` + """ + set_result = _lib.X509_REQ_set_pubkey(self._req, pkey._pkey) + if not set_result: + # TODO: This is untested. + _raise_current_error() + + def get_pubkey(self): + """ + Get the public key of the certificate signing request. + + :return: The public key. + :rtype: :py:class:`PKey` + """ + pkey = PKey.__new__(PKey) + pkey._pkey = _lib.X509_REQ_get_pubkey(self._req) + if pkey._pkey == _ffi.NULL: + # TODO: This is untested. + _raise_current_error() + pkey._pkey = _ffi.gc(pkey._pkey, _lib.EVP_PKEY_free) + pkey._only_public = True + return pkey + + def set_version(self, version): + """ + Set the version subfield (RFC 2459, section 4.1.2.1) of the certificate + request. + + :param int version: The version number. + :return: :py:const:`None` + """ + set_result = _lib.X509_REQ_set_version(self._req, version) + if not set_result: + _raise_current_error() + + def get_version(self): + """ + Get the version subfield (RFC 2459, section 4.1.2.1) of the certificate + request. + + :return: The value of the version subfield. + :rtype: :py:class:`int` + """ + return _lib.X509_REQ_get_version(self._req) + + def get_subject(self): + """ + Return the subject of this certificate signing request. + + This creates a new :py:class:`X509Name`: modifying it does not affect + this request. + + :return: The subject of this certificate signing request. + :rtype: :py:class:`X509Name` + """ + name = X509Name.__new__(X509Name) + name._name = _lib.X509_REQ_get_subject_name(self._req) + if name._name == _ffi.NULL: + # TODO: This is untested. + _raise_current_error() + + # The name is owned by the X509Req structure. As long as the X509Name + # Python object is alive, keep the X509Req Python object alive. + name._owner = self + + return name + + def add_extensions(self, extensions): + """ + Add extensions to the certificate signing request. + + :param extensions: The X.509 extensions to add. + :type extensions: iterable of :py:class:`X509Extension` + :return: :py:const:`None` + """ + stack = _lib.sk_X509_EXTENSION_new_null() + if stack == _ffi.NULL: + # TODO: This is untested. + _raise_current_error() + + stack = _ffi.gc(stack, _lib.sk_X509_EXTENSION_free) + + for ext in extensions: + if not isinstance(ext, X509Extension): + raise ValueError("One of the elements is not an X509Extension") + + # TODO push can fail (here and elsewhere) + _lib.sk_X509_EXTENSION_push(stack, ext._extension) + + add_result = _lib.X509_REQ_add_extensions(self._req, stack) + if not add_result: + # TODO: This is untested. + _raise_current_error() + + def get_extensions(self): + """ + Get X.509 extensions in the certificate signing request. + + :return: The X.509 extensions in this request. + :rtype: :py:class:`list` of :py:class:`X509Extension` objects. + + .. versionadded:: 0.15 + """ + exts = [] + native_exts_obj = _lib.X509_REQ_get_extensions(self._req) + for i in range(_lib.sk_X509_EXTENSION_num(native_exts_obj)): + ext = X509Extension.__new__(X509Extension) + ext._extension = _lib.sk_X509_EXTENSION_value(native_exts_obj, i) + exts.append(ext) + return exts + + def sign(self, pkey, digest): + """ + Sign the certificate signing request with this key and digest type. + + :param pkey: The key pair to sign with. + :type pkey: :py:class:`PKey` + :param digest: The name of the message digest to use for the signature, + e.g. :py:data:`b"sha1"`. + :type digest: :py:class:`bytes` + :return: :py:const:`None` + """ + if pkey._only_public: + raise ValueError("Key has only public part") + + if not pkey._initialized: + raise ValueError("Key is uninitialized") + + digest_obj = _lib.EVP_get_digestbyname(_byte_string(digest)) + if digest_obj == _ffi.NULL: + raise ValueError("No such digest method") + + sign_result = _lib.X509_REQ_sign(self._req, pkey._pkey, digest_obj) + if not sign_result: + # TODO: This is untested. + _raise_current_error() + + def verify(self, pkey): + """ + Verifies the signature on this certificate signing request. + + :param key: A public key. + :type key: :py:class:`PKey` + :return: :py:data:`True` if the signature is correct. + :rtype: :py:class:`bool` + :raises Error: If the signature is invalid or there is a + problem verifying the signature. + """ + if not isinstance(pkey, PKey): + raise TypeError("pkey must be a PKey instance") + + result = _lib.X509_REQ_verify(self._req, pkey._pkey) + if result <= 0: + _raise_current_error() + + return result + + +X509ReqType = X509Req + + +class X509(object): + """ + An X.509 certificate. + """ + + def __init__(self): + # TODO Allocation failure? And why not __new__ instead of __init__? + x509 = _lib.X509_new() + self._x509 = _ffi.gc(x509, _lib.X509_free) + + def set_version(self, version): + """ + Set the version number of the certificate. + + :param version: The version number of the certificate. + :type version: :py:class:`int` + + :return: :py:const:`None` + """ + if not isinstance(version, int): + raise TypeError("version must be an integer") + + _lib.X509_set_version(self._x509, version) + + def get_version(self): + """ + Return the version number of the certificate. + + :return: The version number of the certificate. + :rtype: :py:class:`int` + """ + return _lib.X509_get_version(self._x509) + + def get_pubkey(self): + """ + Get the public key of the certificate. + + :return: The public key. + :rtype: :py:class:`PKey` + """ + pkey = PKey.__new__(PKey) + pkey._pkey = _lib.X509_get_pubkey(self._x509) + if pkey._pkey == _ffi.NULL: + _raise_current_error() + pkey._pkey = _ffi.gc(pkey._pkey, _lib.EVP_PKEY_free) + pkey._only_public = True + return pkey + + def set_pubkey(self, pkey): + """ + Set the public key of the certificate. + + :param pkey: The public key. + :type pkey: :py:class:`PKey` + + :return: :py:data:`None` + """ + if not isinstance(pkey, PKey): + raise TypeError("pkey must be a PKey instance") + + set_result = _lib.X509_set_pubkey(self._x509, pkey._pkey) + if not set_result: + _raise_current_error() + + def sign(self, pkey, digest): + """ + Sign the certificate with this key and digest type. + + :param pkey: The key to sign with. + :type pkey: :py:class:`PKey` + + :param digest: The name of the message digest to use. + :type digest: :py:class:`bytes` + + :return: :py:data:`None` + """ + if not isinstance(pkey, PKey): + raise TypeError("pkey must be a PKey instance") + + if pkey._only_public: + raise ValueError("Key only has public part") + + if not pkey._initialized: + raise ValueError("Key is uninitialized") + + evp_md = _lib.EVP_get_digestbyname(_byte_string(digest)) + if evp_md == _ffi.NULL: + raise ValueError("No such digest method") + + sign_result = _lib.X509_sign(self._x509, pkey._pkey, evp_md) + if not sign_result: + _raise_current_error() + + def get_signature_algorithm(self): + """ + Return the signature algorithm used in the certificate. + + :return: The name of the algorithm. + :rtype: :py:class:`bytes` + + :raises ValueError: If the signature algorithm is undefined. + + .. versionadded:: 0.13 + """ + alg = self._x509.cert_info.signature.algorithm + nid = _lib.OBJ_obj2nid(alg) + if nid == _lib.NID_undef: + raise ValueError("Undefined signature algorithm") + return _ffi.string(_lib.OBJ_nid2ln(nid)) + + def digest(self, digest_name): + """ + Return the digest of the X509 object. + + :param digest_name: The name of the digest algorithm to use. + :type digest_name: :py:class:`bytes` + + :return: The digest of the object, formatted as + :py:const:`b":"`-delimited hex pairs. + :rtype: :py:class:`bytes` + """ + digest = _lib.EVP_get_digestbyname(_byte_string(digest_name)) + if digest == _ffi.NULL: + raise ValueError("No such digest method") + + result_buffer = _ffi.new("char[]", _lib.EVP_MAX_MD_SIZE) + result_length = _ffi.new("unsigned int[]", 1) + result_length[0] = len(result_buffer) + + digest_result = _lib.X509_digest( + self._x509, digest, result_buffer, result_length) + + if not digest_result: + # TODO: This is untested. + _raise_current_error() + + return b":".join([ + b16encode(ch).upper() for ch + in _ffi.buffer(result_buffer, result_length[0])]) + + def subject_name_hash(self): + """ + Return the hash of the X509 subject. + + :return: The hash of the subject. + :rtype: :py:class:`bytes` + """ + return _lib.X509_subject_name_hash(self._x509) + + def set_serial_number(self, serial): + """ + Set the serial number of the certificate. + + :param serial: The new serial number. + :type serial: :py:class:`int` + + :return: :py:data`None` + """ + if not isinstance(serial, _integer_types): + raise TypeError("serial must be an integer") + + hex_serial = hex(serial)[2:] + if not isinstance(hex_serial, bytes): + hex_serial = hex_serial.encode('ascii') + + bignum_serial = _ffi.new("BIGNUM**") + + # BN_hex2bn stores the result in &bignum. Unless it doesn't feel like + # it. If bignum is still NULL after this call, then the return value + # is actually the result. I hope. -exarkun + small_serial = _lib.BN_hex2bn(bignum_serial, hex_serial) + + if bignum_serial[0] == _ffi.NULL: + set_result = _lib.ASN1_INTEGER_set( + _lib.X509_get_serialNumber(self._x509), small_serial) + if set_result: + # TODO Not tested + _raise_current_error() + else: + asn1_serial = _lib.BN_to_ASN1_INTEGER(bignum_serial[0], _ffi.NULL) + _lib.BN_free(bignum_serial[0]) + if asn1_serial == _ffi.NULL: + # TODO Not tested + _raise_current_error() + asn1_serial = _ffi.gc(asn1_serial, _lib.ASN1_INTEGER_free) + set_result = _lib.X509_set_serialNumber(self._x509, asn1_serial) + if not set_result: + # TODO Not tested + _raise_current_error() + + def get_serial_number(self): + """ + Return the serial number of this certificate. + + :return: The serial number. + :rtype: :py:class:`int` + """ + asn1_serial = _lib.X509_get_serialNumber(self._x509) + bignum_serial = _lib.ASN1_INTEGER_to_BN(asn1_serial, _ffi.NULL) + try: + hex_serial = _lib.BN_bn2hex(bignum_serial) + try: + hexstring_serial = _ffi.string(hex_serial) + serial = int(hexstring_serial, 16) + return serial + finally: + _lib.OPENSSL_free(hex_serial) + finally: + _lib.BN_free(bignum_serial) + + def gmtime_adj_notAfter(self, amount): + """ + Adjust the time stamp on which the certificate stops being valid. + + :param amount: The number of seconds by which to adjust the timestamp. + :type amount: :py:class:`int` + + :return: :py:const:`None` + """ + if not isinstance(amount, int): + raise TypeError("amount must be an integer") + + notAfter = _lib.X509_get_notAfter(self._x509) + _lib.X509_gmtime_adj(notAfter, amount) + + def gmtime_adj_notBefore(self, amount): + """ + Adjust the timestamp on which the certificate starts being valid. + + :param amount: The number of seconds by which to adjust the timestamp. + :return: :py:const:`None` + """ + if not isinstance(amount, int): + raise TypeError("amount must be an integer") + + notBefore = _lib.X509_get_notBefore(self._x509) + _lib.X509_gmtime_adj(notBefore, amount) + + def has_expired(self): + """ + Check whether the certificate has expired. + + :return: :py:const:`True` if the certificate has expired, + :py:const:`False` otherwise. + :rtype: :py:class:`bool` + """ + now = int(time()) + notAfter = _lib.X509_get_notAfter(self._x509) + return _lib.ASN1_UTCTIME_cmp_time_t( + _ffi.cast('ASN1_UTCTIME*', notAfter), now) < 0 + + def _get_boundary_time(self, which): + return _get_asn1_time(which(self._x509)) + + def get_notBefore(self): + """ + Get the timestamp at which the certificate starts being valid. + + The timestamp is formatted as an ASN.1 GENERALIZEDTIME:: + + YYYYMMDDhhmmssZ + YYYYMMDDhhmmss+hhmm + YYYYMMDDhhmmss-hhmm + + :return: A timestamp string, or :py:const:`None` if there is none. + :rtype: :py:class:`bytes` or :py:const:`None` + """ + return self._get_boundary_time(_lib.X509_get_notBefore) + + def _set_boundary_time(self, which, when): + return _set_asn1_time(which(self._x509), when) + + def set_notBefore(self, when): + """ + Set the timestamp at which the certificate starts being valid. + + The timestamp is formatted as an ASN.1 GENERALIZEDTIME:: + + YYYYMMDDhhmmssZ + YYYYMMDDhhmmss+hhmm + YYYYMMDDhhmmss-hhmm + + :param when: A timestamp string. + :type when: :py:class:`bytes` + + :return: :py:const:`None` + """ + return self._set_boundary_time(_lib.X509_get_notBefore, when) + + def get_notAfter(self): + """ + Get the timestamp at which the certificate stops being valid. + + The timestamp is formatted as an ASN.1 GENERALIZEDTIME:: + + YYYYMMDDhhmmssZ + YYYYMMDDhhmmss+hhmm + YYYYMMDDhhmmss-hhmm + + :return: A timestamp string, or :py:const:`None` if there is none. + :rtype: :py:class:`bytes` or :py:const:`None` + """ + return self._get_boundary_time(_lib.X509_get_notAfter) + + def set_notAfter(self, when): + """ + Set the timestamp at which the certificate stops being valid. + + The timestamp is formatted as an ASN.1 GENERALIZEDTIME:: + + YYYYMMDDhhmmssZ + YYYYMMDDhhmmss+hhmm + YYYYMMDDhhmmss-hhmm + + :param when: A timestamp string. + :type when: :py:class:`bytes` + + :return: :py:const:`None` + """ + return self._set_boundary_time(_lib.X509_get_notAfter, when) + + def _get_name(self, which): + name = X509Name.__new__(X509Name) + name._name = which(self._x509) + if name._name == _ffi.NULL: + # TODO: This is untested. + _raise_current_error() + + # The name is owned by the X509 structure. As long as the X509Name + # Python object is alive, keep the X509 Python object alive. + name._owner = self + + return name + + def _set_name(self, which, name): + if not isinstance(name, X509Name): + raise TypeError("name must be an X509Name") + set_result = which(self._x509, name._name) + if not set_result: + # TODO: This is untested. + _raise_current_error() + + def get_issuer(self): + """ + Return the issuer of this certificate. + + This creates a new :py:class:`X509Name`: modifying it does not affect + this certificate. + + :return: The issuer of this certificate. + :rtype: :py:class:`X509Name` + """ + return self._get_name(_lib.X509_get_issuer_name) + + def set_issuer(self, issuer): + """ + Set the issuer of this certificate. + + :param issuer: The issuer. + :type issuer: :py:class:`X509Name` + + :return: :py:const:`None` + """ + return self._set_name(_lib.X509_set_issuer_name, issuer) + + def get_subject(self): + """ + Return the subject of this certificate. + + This creates a new :py:class:`X509Name`: modifying it does not affect + this certificate. + + :return: The subject of this certificate. + :rtype: :py:class:`X509Name` + """ + return self._get_name(_lib.X509_get_subject_name) + + def set_subject(self, subject): + """ + Set the subject of this certificate. + + :param subject: The subject. + :type subject: :py:class:`X509Name` + + :return: :py:const:`None` + """ + return self._set_name(_lib.X509_set_subject_name, subject) + + def get_extension_count(self): + """ + Get the number of extensions on this certificate. + + :return: The number of extensions. + :rtype: :py:class:`int` + + .. versionadded:: 0.12 + """ + return _lib.X509_get_ext_count(self._x509) + + def add_extensions(self, extensions): + """ + Add extensions to the certificate. + + :param extensions: The extensions to add. + :type extensions: An iterable of :py:class:`X509Extension` objects. + :return: :py:const:`None` + """ + for ext in extensions: + if not isinstance(ext, X509Extension): + raise ValueError("One of the elements is not an X509Extension") + + add_result = _lib.X509_add_ext(self._x509, ext._extension, -1) + if not add_result: + _raise_current_error() + + def get_extension(self, index): + """ + Get a specific extension of the certificate by index. + + Extensions on a certificate are kept in order. The index + parameter selects which extension will be returned. + + :param int index: The index of the extension to retrieve. + :return: The extension at the specified index. + :rtype: :py:class:`X509Extension` + :raises IndexError: If the extension index was out of bounds. + + .. versionadded:: 0.12 + """ + ext = X509Extension.__new__(X509Extension) + ext._extension = _lib.X509_get_ext(self._x509, index) + if ext._extension == _ffi.NULL: + raise IndexError("extension index out of bounds") + + extension = _lib.X509_EXTENSION_dup(ext._extension) + ext._extension = _ffi.gc(extension, _lib.X509_EXTENSION_free) + return ext + + +X509Type = X509 + + +class X509Store(object): + """ + An X509 certificate store. + """ + + def __init__(self): + store = _lib.X509_STORE_new() + self._store = _ffi.gc(store, _lib.X509_STORE_free) + + def add_cert(self, cert): + """ + Adds the certificate :py:data:`cert` to this store. + + This is the Python equivalent of OpenSSL's ``X509_STORE_add_cert``. + + :param X509 cert: The certificate to add to this store. + :raises TypeError: If the certificate is not an :py:class:`X509`. + :raises Error: If OpenSSL was unhappy with your certificate. + :return: :py:data:`None` if the certificate was added successfully. + """ + if not isinstance(cert, X509): + raise TypeError() + + result = _lib.X509_STORE_add_cert(self._store, cert._x509) + if not result: + _raise_current_error() + + +X509StoreType = X509Store + + +class X509StoreContextError(Exception): + """ + An exception raised when an error occurred while verifying a certificate + using `OpenSSL.X509StoreContext.verify_certificate`. + + :ivar certificate: The certificate which caused verificate failure. + :type certificate: :class:`X509` + """ + + def __init__(self, message, certificate): + super(X509StoreContextError, self).__init__(message) + self.certificate = certificate + + +class X509StoreContext(object): + """ + An X.509 store context. + + An :py:class:`X509StoreContext` is used to define some of the criteria for + certificate verification. The information encapsulated in this object + includes, but is not limited to, a set of trusted certificates, + verification parameters, and revoked certificates. + + .. note:: + + Currently, one can only set the trusted certificates on an + :py:class:`X509StoreContext`. Future versions of pyOpenSSL will expose + verification parameters and certificate revocation lists. + + :ivar _store_ctx: The underlying X509_STORE_CTX structure used by this + instance. It is dynamically allocated and automatically garbage + collected. + + :ivar _store: See the ``store`` ``__init__`` parameter. + + :ivar _cert: See the ``certificate`` ``__init__`` parameter. + + :param X509Store store: The certificates which will be trusted for the + purposes of any verifications. + + :param X509 certificate: The certificate to be verified. + """ + + def __init__(self, store, certificate): + store_ctx = _lib.X509_STORE_CTX_new() + self._store_ctx = _ffi.gc(store_ctx, _lib.X509_STORE_CTX_free) + self._store = store + self._cert = certificate + # Make the store context available for use after instantiating this + # class by initializing it now. Per testing, subsequent calls to + # :py:meth:`_init` have no adverse affect. + self._init() + + def _init(self): + """ + Set up the store context for a subsequent verification operation. + """ + ret = _lib.X509_STORE_CTX_init( + self._store_ctx, self._store._store, self._cert._x509, _ffi.NULL + ) + if ret <= 0: + _raise_current_error() + + def _cleanup(self): + """ + Internally cleans up the store context. + + The store context can then be reused with a new call to + :py:meth:`_init`. + """ + _lib.X509_STORE_CTX_cleanup(self._store_ctx) + + def _exception_from_context(self): + """ + Convert an OpenSSL native context error failure into a Python + exception. + + When a call to native OpenSSL X509_verify_cert fails, additional + information about the failure can be obtained from the store context. + """ + errors = [ + _lib.X509_STORE_CTX_get_error(self._store_ctx), + _lib.X509_STORE_CTX_get_error_depth(self._store_ctx), + _native(_ffi.string(_lib.X509_verify_cert_error_string( + _lib.X509_STORE_CTX_get_error(self._store_ctx)))), + ] + # A context error should always be associated with a certificate, so we + # expect this call to never return :class:`None`. + _x509 = _lib.X509_STORE_CTX_get_current_cert(self._store_ctx) + _cert = _lib.X509_dup(_x509) + pycert = X509.__new__(X509) + pycert._x509 = _ffi.gc(_cert, _lib.X509_free) + return X509StoreContextError(errors, pycert) + + def set_store(self, store): + """ + Set the context's trust store. + + .. versionadded:: 0.15 + + :param X509Store store: The certificates which will be trusted for the + purposes of any *future* verifications. + """ + self._store = store + + def verify_certificate(self): + """ + Verify a certificate in a context. + + .. versionadded:: 0.15 + + :param store_ctx: The :py:class:`X509StoreContext` to verify. + + :raises X509StoreContextError: If an error occurred when validating a + certificate in the context. Sets ``certificate`` attribute to + indicate which certificate caused the error. + """ + # Always re-initialize the store context in case + # :py:meth:`verify_certificate` is called multiple times. + self._init() + ret = _lib.X509_verify_cert(self._store_ctx) + self._cleanup() + if ret <= 0: + raise self._exception_from_context() + + +def load_certificate(type, buffer): + """ + Load a certificate from a buffer + + :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1) + + :param buffer: The buffer the certificate is stored in + :type buffer: :py:class:`bytes` + + :return: The X509 object + """ + if isinstance(buffer, _text_type): + buffer = buffer.encode("ascii") + + bio = _new_mem_buf(buffer) + + if type == FILETYPE_PEM: + x509 = _lib.PEM_read_bio_X509(bio, _ffi.NULL, _ffi.NULL, _ffi.NULL) + elif type == FILETYPE_ASN1: + x509 = _lib.d2i_X509_bio(bio, _ffi.NULL) + else: + raise ValueError( + "type argument must be FILETYPE_PEM or FILETYPE_ASN1") + + if x509 == _ffi.NULL: + _raise_current_error() + + cert = X509.__new__(X509) + cert._x509 = _ffi.gc(x509, _lib.X509_free) + return cert + + +def dump_certificate(type, cert): + """ + Dump a certificate to a buffer + + :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1, or + FILETYPE_TEXT) + :param cert: The certificate to dump + :return: The buffer with the dumped certificate in + """ + bio = _new_mem_buf() + + if type == FILETYPE_PEM: + result_code = _lib.PEM_write_bio_X509(bio, cert._x509) + elif type == FILETYPE_ASN1: + result_code = _lib.i2d_X509_bio(bio, cert._x509) + elif type == FILETYPE_TEXT: + result_code = _lib.X509_print_ex(bio, cert._x509, 0, 0) + else: + raise ValueError( + "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or " + "FILETYPE_TEXT") + + assert result_code == 1 + return _bio_to_string(bio) + + +def dump_privatekey(type, pkey, cipher=None, passphrase=None): + """ + Dump a private key to a buffer + + :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1, or + FILETYPE_TEXT) + :param pkey: The PKey to dump + :param cipher: (optional) if encrypted PEM format, the cipher to + use + :param passphrase: (optional) if encrypted PEM format, this can be either + the passphrase to use, or a callback for providing the + passphrase. + :return: The buffer with the dumped key in + :rtype: :py:data:`bytes` + """ + bio = _new_mem_buf() + + if cipher is not None: + if passphrase is None: + raise TypeError( + "if a value is given for cipher " + "one must also be given for passphrase") + cipher_obj = _lib.EVP_get_cipherbyname(_byte_string(cipher)) + if cipher_obj == _ffi.NULL: + raise ValueError("Invalid cipher name") + else: + cipher_obj = _ffi.NULL + + helper = _PassphraseHelper(type, passphrase) + if type == FILETYPE_PEM: + result_code = _lib.PEM_write_bio_PrivateKey( + bio, pkey._pkey, cipher_obj, _ffi.NULL, 0, + helper.callback, helper.callback_args) + helper.raise_if_problem() + elif type == FILETYPE_ASN1: + result_code = _lib.i2d_PrivateKey_bio(bio, pkey._pkey) + elif type == FILETYPE_TEXT: + rsa = _lib.EVP_PKEY_get1_RSA(pkey._pkey) + result_code = _lib.RSA_print(bio, rsa, 0) + # TODO RSA_free(rsa)? + else: + raise ValueError( + "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or " + "FILETYPE_TEXT") + + if result_code == 0: + _raise_current_error() + + return _bio_to_string(bio) + + +def _X509_REVOKED_dup(original): + copy = _lib.X509_REVOKED_new() + if copy == _ffi.NULL: + # TODO: This is untested. + _raise_current_error() + + if original.serialNumber != _ffi.NULL: + _lib.ASN1_INTEGER_free(copy.serialNumber) + copy.serialNumber = _lib.ASN1_INTEGER_dup(original.serialNumber) + + if original.revocationDate != _ffi.NULL: + _lib.ASN1_TIME_free(copy.revocationDate) + copy.revocationDate = _lib.M_ASN1_TIME_dup(original.revocationDate) + + if original.extensions != _ffi.NULL: + extension_stack = _lib.sk_X509_EXTENSION_new_null() + for i in range(_lib.sk_X509_EXTENSION_num(original.extensions)): + original_ext = _lib.sk_X509_EXTENSION_value(original.extensions, i) + copy_ext = _lib.X509_EXTENSION_dup(original_ext) + _lib.sk_X509_EXTENSION_push(extension_stack, copy_ext) + copy.extensions = extension_stack + + copy.sequence = original.sequence + return copy + + +class Revoked(object): + """ + A certificate revocation. + """ + # http://www.openssl.org/docs/apps/x509v3_config.html#CRL_distribution_points_ + # which differs from crl_reasons of crypto/x509v3/v3_enum.c that matches + # OCSP_crl_reason_str. We use the latter, just like the command line + # program. + _crl_reasons = [ + b"unspecified", + b"keyCompromise", + b"CACompromise", + b"affiliationChanged", + b"superseded", + b"cessationOfOperation", + b"certificateHold", + # b"removeFromCRL", + ] + + def __init__(self): + revoked = _lib.X509_REVOKED_new() + self._revoked = _ffi.gc(revoked, _lib.X509_REVOKED_free) + + def set_serial(self, hex_str): + """ + Set the serial number. + + The serial number is formatted as a hexadecimal number encoded in + ASCII. + + :param hex_str: The new serial number. + :type hex_str: :py:class:`bytes` + + :return: :py:const:`None` + """ + bignum_serial = _ffi.gc(_lib.BN_new(), _lib.BN_free) + bignum_ptr = _ffi.new("BIGNUM**") + bignum_ptr[0] = bignum_serial + bn_result = _lib.BN_hex2bn(bignum_ptr, hex_str) + if not bn_result: + raise ValueError("bad hex string") + + asn1_serial = _ffi.gc( + _lib.BN_to_ASN1_INTEGER(bignum_serial, _ffi.NULL), + _lib.ASN1_INTEGER_free) + _lib.X509_REVOKED_set_serialNumber(self._revoked, asn1_serial) + + def get_serial(self): + """ + Get the serial number. + + The serial number is formatted as a hexadecimal number encoded in + ASCII. + + :return: The serial number. + :rtype: :py:class:`bytes` + """ + bio = _new_mem_buf() + + result = _lib.i2a_ASN1_INTEGER(bio, self._revoked.serialNumber) + if result < 0: + # TODO: This is untested. + _raise_current_error() + + return _bio_to_string(bio) + + def _delete_reason(self): + stack = self._revoked.extensions + for i in range(_lib.sk_X509_EXTENSION_num(stack)): + ext = _lib.sk_X509_EXTENSION_value(stack, i) + if _lib.OBJ_obj2nid(ext.object) == _lib.NID_crl_reason: + _lib.X509_EXTENSION_free(ext) + _lib.sk_X509_EXTENSION_delete(stack, i) + break + + def set_reason(self, reason): + """ + Set the reason of this revocation. + + If :py:data:`reason` is :py:const:`None`, delete the reason instead. + + :param reason: The reason string. + :type reason: :py:class:`bytes` or :py:class:`NoneType` + + :return: :py:const:`None` + + .. seealso:: + + :py:meth:`all_reasons`, which gives you a list of all supported + reasons which you might pass to this method. + """ + if reason is None: + self._delete_reason() + elif not isinstance(reason, bytes): + raise TypeError("reason must be None or a byte string") + else: + reason = reason.lower().replace(b' ', b'') + reason_code = [r.lower() for r in self._crl_reasons].index(reason) + + new_reason_ext = _lib.ASN1_ENUMERATED_new() + if new_reason_ext == _ffi.NULL: + # TODO: This is untested. + _raise_current_error() + new_reason_ext = _ffi.gc(new_reason_ext, _lib.ASN1_ENUMERATED_free) + + set_result = _lib.ASN1_ENUMERATED_set(new_reason_ext, reason_code) + if set_result == _ffi.NULL: + # TODO: This is untested. + _raise_current_error() + + self._delete_reason() + add_result = _lib.X509_REVOKED_add1_ext_i2d( + self._revoked, _lib.NID_crl_reason, new_reason_ext, 0, 0) + + if not add_result: + # TODO: This is untested. + _raise_current_error() + + def get_reason(self): + """ + Set the reason of this revocation. + + :return: The reason, or :py:const:`None` if there is none. + :rtype: :py:class:`bytes` or :py:class:`NoneType` + + .. seealso:: + + :py:meth:`all_reasons`, which gives you a list of all supported + reasons this method might return. + """ + extensions = self._revoked.extensions + for i in range(_lib.sk_X509_EXTENSION_num(extensions)): + ext = _lib.sk_X509_EXTENSION_value(extensions, i) + if _lib.OBJ_obj2nid(ext.object) == _lib.NID_crl_reason: + bio = _new_mem_buf() + + print_result = _lib.X509V3_EXT_print(bio, ext, 0, 0) + if not print_result: + print_result = _lib.M_ASN1_OCTET_STRING_print( + bio, ext.value + ) + if print_result == 0: + # TODO: This is untested. + _raise_current_error() + + return _bio_to_string(bio) + + def all_reasons(self): + """ + Return a list of all the supported reason strings. + + This list is a copy; modifying it does not change the supported reason + strings. + + :return: A list of reason strings. + :rtype: :py:class:`list` of :py:class:`bytes` + """ + return self._crl_reasons[:] + + def set_rev_date(self, when): + """ + Set the revocation timestamp. + + :param when: The timestamp of the revocation, as ASN.1 GENERALIZEDTIME. + :type when: :py:class:`bytes` + :return: :py:const:`None` + """ + return _set_asn1_time(self._revoked.revocationDate, when) + + def get_rev_date(self): + """ + Get the revocation timestamp. + + :return: The timestamp of the revocation, as ASN.1 GENERALIZEDTIME. + :rtype: :py:class:`bytes` + """ + return _get_asn1_time(self._revoked.revocationDate) + + +class CRL(object): + """ + A certificate revocation list. + """ + + def __init__(self): + """ + Create a new empty certificate revocation list. + """ + crl = _lib.X509_CRL_new() + self._crl = _ffi.gc(crl, _lib.X509_CRL_free) + + def get_revoked(self): + """ + Return the revocations in this certificate revocation list. + + These revocations will be provided by value, not by reference. + That means it's okay to mutate them: it won't affect this CRL. + + :return: The revocations in this CRL. + :rtype: :py:class:`tuple` of :py:class:`Revocation` + """ + results = [] + revoked_stack = self._crl.crl.revoked + for i in range(_lib.sk_X509_REVOKED_num(revoked_stack)): + revoked = _lib.sk_X509_REVOKED_value(revoked_stack, i) + revoked_copy = _X509_REVOKED_dup(revoked) + pyrev = Revoked.__new__(Revoked) + pyrev._revoked = _ffi.gc(revoked_copy, _lib.X509_REVOKED_free) + results.append(pyrev) + if results: + return tuple(results) + + def add_revoked(self, revoked): + """ + Add a revoked (by value not reference) to the CRL structure + + This revocation will be added by value, not by reference. That + means it's okay to mutate it after adding: it won't affect + this CRL. + + :param revoked: The new revocation. + :type revoked: :class:`Revoked` + + :return: :py:const:`None` + """ + copy = _X509_REVOKED_dup(revoked._revoked) + if copy == _ffi.NULL: + # TODO: This is untested. + _raise_current_error() + + add_result = _lib.X509_CRL_add0_revoked(self._crl, copy) + if add_result == 0: + # TODO: This is untested. + _raise_current_error() + + def export(self, cert, key, type=FILETYPE_PEM, days=100, + digest=_UNSPECIFIED): + """ + Export a CRL as a string. + + :param cert: The certificate used to sign the CRL. + :type cert: :py:class:`X509` + + :param key: The key used to sign the CRL. + :type key: :py:class:`PKey` + + :param type: The export format, either :py:data:`FILETYPE_PEM`, + :py:data:`FILETYPE_ASN1`, or :py:data:`FILETYPE_TEXT`. + + :param int days: The number of days until the next update of this CRL. + + :param bytes digest: The name of the message digest to use (eg + ``b"sha1"``). + + :return: :py:data:`bytes` + """ + if not isinstance(cert, X509): + raise TypeError("cert must be an X509 instance") + if not isinstance(key, PKey): + raise TypeError("key must be a PKey instance") + if not isinstance(type, int): + raise TypeError("type must be an integer") + + if digest is _UNSPECIFIED: + _warn( + "The default message digest (md5) is deprecated. " + "Pass the name of a message digest explicitly.", + category=DeprecationWarning, + stacklevel=2, + ) + digest = b"md5" + + digest_obj = _lib.EVP_get_digestbyname(digest) + if digest_obj == _ffi.NULL: + raise ValueError("No such digest method") + + bio = _lib.BIO_new(_lib.BIO_s_mem()) + if bio == _ffi.NULL: + # TODO: This is untested. + _raise_current_error() + + # A scratch time object to give different values to different CRL + # fields + sometime = _lib.ASN1_TIME_new() + if sometime == _ffi.NULL: + # TODO: This is untested. + _raise_current_error() + + _lib.X509_gmtime_adj(sometime, 0) + _lib.X509_CRL_set_lastUpdate(self._crl, sometime) + + _lib.X509_gmtime_adj(sometime, days * 24 * 60 * 60) + _lib.X509_CRL_set_nextUpdate(self._crl, sometime) + + _lib.X509_CRL_set_issuer_name( + self._crl, _lib.X509_get_subject_name(cert._x509) + ) + + sign_result = _lib.X509_CRL_sign(self._crl, key._pkey, digest_obj) + if not sign_result: + _raise_current_error() + + if type == FILETYPE_PEM: + ret = _lib.PEM_write_bio_X509_CRL(bio, self._crl) + elif type == FILETYPE_ASN1: + ret = _lib.i2d_X509_CRL_bio(bio, self._crl) + elif type == FILETYPE_TEXT: + ret = _lib.X509_CRL_print(bio, self._crl) + else: + raise ValueError( + "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or " + "FILETYPE_TEXT" + ) + + if not ret: + # TODO: This is untested. + _raise_current_error() + + return _bio_to_string(bio) +CRLType = CRL + + +class PKCS7(object): + def type_is_signed(self): + """ + Check if this NID_pkcs7_signed object + + :return: True if the PKCS7 is of type signed + """ + if _lib.PKCS7_type_is_signed(self._pkcs7): + return True + return False + + def type_is_enveloped(self): + """ + Check if this NID_pkcs7_enveloped object + + :returns: True if the PKCS7 is of type enveloped + """ + if _lib.PKCS7_type_is_enveloped(self._pkcs7): + return True + return False + + def type_is_signedAndEnveloped(self): + """ + Check if this NID_pkcs7_signedAndEnveloped object + + :returns: True if the PKCS7 is of type signedAndEnveloped + """ + if _lib.PKCS7_type_is_signedAndEnveloped(self._pkcs7): + return True + return False + + def type_is_data(self): + """ + Check if this NID_pkcs7_data object + + :return: True if the PKCS7 is of type data + """ + if _lib.PKCS7_type_is_data(self._pkcs7): + return True + return False + + def get_type_name(self): + """ + Returns the type name of the PKCS7 structure + + :return: A string with the typename + """ + nid = _lib.OBJ_obj2nid(self._pkcs7.type) + string_type = _lib.OBJ_nid2sn(nid) + return _ffi.string(string_type) + +PKCS7Type = PKCS7 + + +class PKCS12(object): + """ + A PKCS #12 archive. + """ + + def __init__(self): + self._pkey = None + self._cert = None + self._cacerts = None + self._friendlyname = None + + def get_certificate(self): + """ + Get the certificate in the PKCS #12 structure. + + :return: The certificate, or :py:const:`None` if there is none. + :rtype: :py:class:`X509` or :py:const:`None` + """ + return self._cert + + def set_certificate(self, cert): + """ + Set the certificate in the PKCS #12 structure. + + :param cert: The new certificate, or :py:const:`None` to unset it. + :type cert: :py:class:`X509` or :py:const:`None` + + :return: :py:const:`None` + """ + if not isinstance(cert, X509): + raise TypeError("cert must be an X509 instance") + self._cert = cert + + def get_privatekey(self): + """ + Get the private key in the PKCS #12 structure. + + :return: The private key, or :py:const:`None` if there is none. + :rtype: :py:class:`PKey` + """ + return self._pkey + + def set_privatekey(self, pkey): + """ + Set the certificate portion of the PKCS #12 structure. + + :param pkey: The new private key, or :py:const:`None` to unset it. + :type pkey: :py:class:`PKey` or :py:const:`None` + + :return: :py:const:`None` + """ + if not isinstance(pkey, PKey): + raise TypeError("pkey must be a PKey instance") + self._pkey = pkey + + def get_ca_certificates(self): + """ + Get the CA certificates in the PKCS #12 structure. + + :return: A tuple with the CA certificates in the chain, or + :py:const:`None` if there are none. + :rtype: :py:class:`tuple` of :py:class:`X509` or :py:const:`None` + """ + if self._cacerts is not None: + return tuple(self._cacerts) + + def set_ca_certificates(self, cacerts): + """ + Replace or set the CA certificates within the PKCS12 object. + + :param cacerts: The new CA certificates, or :py:const:`None` to unset + them. + :type cacerts: An iterable of :py:class:`X509` or :py:const:`None` + + :return: :py:const:`None` + """ + if cacerts is None: + self._cacerts = None + else: + cacerts = list(cacerts) + for cert in cacerts: + if not isinstance(cert, X509): + raise TypeError( + "iterable must only contain X509 instances" + ) + self._cacerts = cacerts + + def set_friendlyname(self, name): + """ + Set the friendly name in the PKCS #12 structure. + + :param name: The new friendly name, or :py:const:`None` to unset. + :type name: :py:class:`bytes` or :py:const:`None` + + :return: :py:const:`None` + """ + if name is None: + self._friendlyname = None + elif not isinstance(name, bytes): + raise TypeError( + "name must be a byte string or None (not %r)" % (name,) + ) + self._friendlyname = name + + def get_friendlyname(self): + """ + Get the friendly name in the PKCS# 12 structure. + + :returns: The friendly name, or :py:const:`None` if there is none. + :rtype: :py:class:`bytes` or :py:const:`None` + """ + return self._friendlyname + + def export(self, passphrase=None, iter=2048, maciter=1): + """ + Dump a PKCS12 object as a string. + + For more information, see the :c:func:`PKCS12_create` man page. + + :param passphrase: The passphrase used to encrypt the structure. Unlike + some other passphrase arguments, this *must* be a string, not a + callback. + :type passphrase: :py:data:`bytes` + + :param iter: Number of times to repeat the encryption step. + :type iter: :py:data:`int` + + :param maciter: Number of times to repeat the MAC step. + :type maciter: :py:data:`int` + + :return: The string representation of the PKCS #12 structure. + :rtype: + """ + passphrase = _text_to_bytes_and_warn("passphrase", passphrase) + + if self._cacerts is None: + cacerts = _ffi.NULL + else: + cacerts = _lib.sk_X509_new_null() + cacerts = _ffi.gc(cacerts, _lib.sk_X509_free) + for cert in self._cacerts: + _lib.sk_X509_push(cacerts, cert._x509) + + if passphrase is None: + passphrase = _ffi.NULL + + friendlyname = self._friendlyname + if friendlyname is None: + friendlyname = _ffi.NULL + + if self._pkey is None: + pkey = _ffi.NULL + else: + pkey = self._pkey._pkey + + if self._cert is None: + cert = _ffi.NULL + else: + cert = self._cert._x509 + + pkcs12 = _lib.PKCS12_create( + passphrase, friendlyname, pkey, cert, cacerts, + _lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC, + _lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC, + iter, maciter, 0) + if pkcs12 == _ffi.NULL: + _raise_current_error() + pkcs12 = _ffi.gc(pkcs12, _lib.PKCS12_free) + + bio = _new_mem_buf() + _lib.i2d_PKCS12_bio(bio, pkcs12) + return _bio_to_string(bio) + + +PKCS12Type = PKCS12 + + +class NetscapeSPKI(object): + """ + A Netscape SPKI object. + """ + + def __init__(self): + spki = _lib.NETSCAPE_SPKI_new() + self._spki = _ffi.gc(spki, _lib.NETSCAPE_SPKI_free) + + def sign(self, pkey, digest): + """ + Sign the certificate request with this key and digest type. + + :param pkey: The private key to sign with. + :type pkey: :py:class:`PKey` + + :param digest: The message digest to use. + :type digest: :py:class:`bytes` + + :return: :py:const:`None` + """ + if pkey._only_public: + raise ValueError("Key has only public part") + + if not pkey._initialized: + raise ValueError("Key is uninitialized") + + digest_obj = _lib.EVP_get_digestbyname(_byte_string(digest)) + if digest_obj == _ffi.NULL: + raise ValueError("No such digest method") + + sign_result = _lib.NETSCAPE_SPKI_sign( + self._spki, pkey._pkey, digest_obj + ) + if not sign_result: + # TODO: This is untested. + _raise_current_error() + + def verify(self, key): + """ + Verifies a signature on a certificate request. + + :param key: The public key that signature is supposedly from. + :type pkey: :py:class:`PKey` + + :return: :py:const:`True` if the signature is correct. + :rtype: :py:class:`bool` + + :raises Error: If the signature is invalid, or there was a problem + verifying the signature. + """ + answer = _lib.NETSCAPE_SPKI_verify(self._spki, key._pkey) + if answer <= 0: + _raise_current_error() + return True + + def b64_encode(self): + """ + Generate a base64 encoded representation of this SPKI object. + + :return: The base64 encoded string. + :rtype: :py:class:`bytes` + """ + encoded = _lib.NETSCAPE_SPKI_b64_encode(self._spki) + result = _ffi.string(encoded) + _lib.CRYPTO_free(encoded) + return result + + def get_pubkey(self): + """ + Get the public key of this certificate. + + :return: The public key. + :rtype: :py:class:`PKey` + """ + pkey = PKey.__new__(PKey) + pkey._pkey = _lib.NETSCAPE_SPKI_get_pubkey(self._spki) + if pkey._pkey == _ffi.NULL: + # TODO: This is untested. + _raise_current_error() + pkey._pkey = _ffi.gc(pkey._pkey, _lib.EVP_PKEY_free) + pkey._only_public = True + return pkey + + def set_pubkey(self, pkey): + """ + Set the public key of the certificate + + :param pkey: The public key + :return: :py:const:`None` + """ + set_result = _lib.NETSCAPE_SPKI_set_pubkey(self._spki, pkey._pkey) + if not set_result: + # TODO: This is untested. + _raise_current_error() + + +NetscapeSPKIType = NetscapeSPKI + + +class _PassphraseHelper(object): + def __init__(self, type, passphrase, more_args=False, truncate=False): + if type != FILETYPE_PEM and passphrase is not None: + raise ValueError( + "only FILETYPE_PEM key format supports encryption" + ) + self._passphrase = passphrase + self._more_args = more_args + self._truncate = truncate + self._problems = [] + + @property + def callback(self): + if self._passphrase is None: + return _ffi.NULL + elif isinstance(self._passphrase, bytes): + return _ffi.NULL + elif callable(self._passphrase): + return _ffi.callback("pem_password_cb", self._read_passphrase) + else: + raise TypeError("Last argument must be string or callable") + + @property + def callback_args(self): + if self._passphrase is None: + return _ffi.NULL + elif isinstance(self._passphrase, bytes): + return self._passphrase + elif callable(self._passphrase): + return _ffi.NULL + else: + raise TypeError("Last argument must be string or callable") + + def raise_if_problem(self, exceptionType=Error): + try: + _exception_from_error_queue(exceptionType) + except exceptionType as e: + from_queue = e + if self._problems: + raise self._problems[0] + return from_queue + + def _read_passphrase(self, buf, size, rwflag, userdata): + try: + if self._more_args: + result = self._passphrase(size, rwflag, userdata) + else: + result = self._passphrase(rwflag) + if not isinstance(result, bytes): + raise ValueError("String expected") + if len(result) > size: + if self._truncate: + result = result[:size] + else: + raise ValueError( + "passphrase returned by callback is too long" + ) + for i in range(len(result)): + buf[i] = result[i:i + 1] + return len(result) + except Exception as e: + self._problems.append(e) + return 0 + + +def load_privatekey(type, buffer, passphrase=None): + """ + Load a private key from a buffer + + :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1) + :param buffer: The buffer the key is stored in + :param passphrase: (optional) if encrypted PEM format, this can be + either the passphrase to use, or a callback for + providing the passphrase. + + :return: The PKey object + """ + if isinstance(buffer, _text_type): + buffer = buffer.encode("ascii") + + bio = _new_mem_buf(buffer) + + helper = _PassphraseHelper(type, passphrase) + if type == FILETYPE_PEM: + evp_pkey = _lib.PEM_read_bio_PrivateKey( + bio, _ffi.NULL, helper.callback, helper.callback_args) + helper.raise_if_problem() + elif type == FILETYPE_ASN1: + evp_pkey = _lib.d2i_PrivateKey_bio(bio, _ffi.NULL) + else: + raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1") + + if evp_pkey == _ffi.NULL: + _raise_current_error() + + pkey = PKey.__new__(PKey) + pkey._pkey = _ffi.gc(evp_pkey, _lib.EVP_PKEY_free) + return pkey + + +def dump_certificate_request(type, req): + """ + Dump a certificate request to a buffer + + :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1) + :param req: The certificate request to dump + :return: The buffer with the dumped certificate request in + """ + bio = _new_mem_buf() + + if type == FILETYPE_PEM: + result_code = _lib.PEM_write_bio_X509_REQ(bio, req._req) + elif type == FILETYPE_ASN1: + result_code = _lib.i2d_X509_REQ_bio(bio, req._req) + elif type == FILETYPE_TEXT: + result_code = _lib.X509_REQ_print_ex(bio, req._req, 0, 0) + else: + raise ValueError( + "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or " + "FILETYPE_TEXT" + ) + + if result_code == 0: + # TODO: This is untested. + _raise_current_error() + + return _bio_to_string(bio) + + +def load_certificate_request(type, buffer): + """ + Load a certificate request from a buffer + + :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1) + :param buffer: The buffer the certificate request is stored in + :return: The X509Req object + """ + if isinstance(buffer, _text_type): + buffer = buffer.encode("ascii") + + bio = _new_mem_buf(buffer) + + if type == FILETYPE_PEM: + req = _lib.PEM_read_bio_X509_REQ(bio, _ffi.NULL, _ffi.NULL, _ffi.NULL) + elif type == FILETYPE_ASN1: + req = _lib.d2i_X509_REQ_bio(bio, _ffi.NULL) + else: + raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1") + + if req == _ffi.NULL: + # TODO: This is untested. + _raise_current_error() + + x509req = X509Req.__new__(X509Req) + x509req._req = _ffi.gc(req, _lib.X509_REQ_free) + return x509req + + +def sign(pkey, data, digest): + """ + Sign data with a digest + + :param pkey: Pkey to sign with + :param data: data to be signed + :param digest: message digest to use + :return: signature + """ + data = _text_to_bytes_and_warn("data", data) + + digest_obj = _lib.EVP_get_digestbyname(_byte_string(digest)) + if digest_obj == _ffi.NULL: + raise ValueError("No such digest method") + + md_ctx = _ffi.new("EVP_MD_CTX*") + md_ctx = _ffi.gc(md_ctx, _lib.EVP_MD_CTX_cleanup) + + _lib.EVP_SignInit(md_ctx, digest_obj) + _lib.EVP_SignUpdate(md_ctx, data, len(data)) + + signature_buffer = _ffi.new("unsigned char[]", 512) + signature_length = _ffi.new("unsigned int*") + signature_length[0] = len(signature_buffer) + final_result = _lib.EVP_SignFinal( + md_ctx, signature_buffer, signature_length, pkey._pkey) + + if final_result != 1: + # TODO: This is untested. + _raise_current_error() + + return _ffi.buffer(signature_buffer, signature_length[0])[:] + + +def verify(cert, signature, data, digest): + """ + Verify a signature. + + :param cert: signing certificate (X509 object) + :param signature: signature returned by sign function + :param data: data to be verified + :param digest: message digest to use + :return: :py:const:`None` if the signature is correct, raise exception + otherwise + """ + data = _text_to_bytes_and_warn("data", data) + + digest_obj = _lib.EVP_get_digestbyname(_byte_string(digest)) + if digest_obj == _ffi.NULL: + raise ValueError("No such digest method") + + pkey = _lib.X509_get_pubkey(cert._x509) + if pkey == _ffi.NULL: + # TODO: This is untested. + _raise_current_error() + pkey = _ffi.gc(pkey, _lib.EVP_PKEY_free) + + md_ctx = _ffi.new("EVP_MD_CTX*") + md_ctx = _ffi.gc(md_ctx, _lib.EVP_MD_CTX_cleanup) + + _lib.EVP_VerifyInit(md_ctx, digest_obj) + _lib.EVP_VerifyUpdate(md_ctx, data, len(data)) + verify_result = _lib.EVP_VerifyFinal( + md_ctx, signature, len(signature), pkey + ) + + if verify_result != 1: + _raise_current_error() + + +def load_crl(type, buffer): + """ + Load a certificate revocation list from a buffer + + :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1) + :param buffer: The buffer the CRL is stored in + + :return: The PKey object + """ + if isinstance(buffer, _text_type): + buffer = buffer.encode("ascii") + + bio = _new_mem_buf(buffer) + + if type == FILETYPE_PEM: + crl = _lib.PEM_read_bio_X509_CRL(bio, _ffi.NULL, _ffi.NULL, _ffi.NULL) + elif type == FILETYPE_ASN1: + crl = _lib.d2i_X509_CRL_bio(bio, _ffi.NULL) + else: + raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1") + + if crl == _ffi.NULL: + _raise_current_error() + + result = CRL.__new__(CRL) + result._crl = crl + return result + + +def load_pkcs7_data(type, buffer): + """ + Load pkcs7 data from a buffer + + :param type: The file type (one of FILETYPE_PEM or FILETYPE_ASN1) + :param buffer: The buffer with the pkcs7 data. + :return: The PKCS7 object + """ + if isinstance(buffer, _text_type): + buffer = buffer.encode("ascii") + + bio = _new_mem_buf(buffer) + + if type == FILETYPE_PEM: + pkcs7 = _lib.PEM_read_bio_PKCS7(bio, _ffi.NULL, _ffi.NULL, _ffi.NULL) + elif type == FILETYPE_ASN1: + pkcs7 = _lib.d2i_PKCS7_bio(bio, _ffi.NULL) + else: + # TODO: This is untested. + _raise_current_error() + raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1") + + if pkcs7 == _ffi.NULL: + _raise_current_error() + + pypkcs7 = PKCS7.__new__(PKCS7) + pypkcs7._pkcs7 = _ffi.gc(pkcs7, _lib.PKCS7_free) + return pypkcs7 + + +def load_pkcs12(buffer, passphrase=None): + """ + Load a PKCS12 object from a buffer + + :param buffer: The buffer the certificate is stored in + :param passphrase: (Optional) The password to decrypt the PKCS12 lump + :returns: The PKCS12 object + """ + passphrase = _text_to_bytes_and_warn("passphrase", passphrase) + + if isinstance(buffer, _text_type): + buffer = buffer.encode("ascii") + + bio = _new_mem_buf(buffer) + + # Use null passphrase if passphrase is None or empty string. With PKCS#12 + # password based encryption no password and a zero length password are two + # different things, but OpenSSL implementation will try both to figure out + # which one works. + if not passphrase: + passphrase = _ffi.NULL + + p12 = _lib.d2i_PKCS12_bio(bio, _ffi.NULL) + if p12 == _ffi.NULL: + _raise_current_error() + p12 = _ffi.gc(p12, _lib.PKCS12_free) + + pkey = _ffi.new("EVP_PKEY**") + cert = _ffi.new("X509**") + cacerts = _ffi.new("Cryptography_STACK_OF_X509**") + + parse_result = _lib.PKCS12_parse(p12, passphrase, pkey, cert, cacerts) + if not parse_result: + _raise_current_error() + + cacerts = _ffi.gc(cacerts[0], _lib.sk_X509_free) + + # openssl 1.0.0 sometimes leaves an X509_check_private_key error in the + # queue for no particular reason. This error isn't interesting to anyone + # outside this function. It's not even interesting to us. Get rid of it. + try: + _raise_current_error() + except Error: + pass + + if pkey[0] == _ffi.NULL: + pykey = None + else: + pykey = PKey.__new__(PKey) + pykey._pkey = _ffi.gc(pkey[0], _lib.EVP_PKEY_free) + + if cert[0] == _ffi.NULL: + pycert = None + friendlyname = None + else: + pycert = X509.__new__(X509) + pycert._x509 = _ffi.gc(cert[0], _lib.X509_free) + + friendlyname_length = _ffi.new("int*") + friendlyname_buffer = _lib.X509_alias_get0( + cert[0], friendlyname_length + ) + friendlyname = _ffi.buffer( + friendlyname_buffer, friendlyname_length[0] + )[:] + if friendlyname_buffer == _ffi.NULL: + friendlyname = None + + pycacerts = [] + for i in range(_lib.sk_X509_num(cacerts)): + pycacert = X509.__new__(X509) + pycacert._x509 = _lib.sk_X509_value(cacerts, i) + pycacerts.append(pycacert) + if not pycacerts: + pycacerts = None + + pkcs12 = PKCS12.__new__(PKCS12) + pkcs12._pkey = pykey + pkcs12._cert = pycert + pkcs12._cacerts = pycacerts + pkcs12._friendlyname = friendlyname + return pkcs12 + + +# There are no direct unit tests for this initialization. It is tested +# indirectly since it is necessary for functions like dump_privatekey when +# using encryption. +# +# Thus OpenSSL.test.test_crypto.FunctionTests.test_dump_privatekey_passphrase +# and some other similar tests may fail without this (though they may not if +# the Python runtime has already done some initialization of the underlying +# OpenSSL library (and is linked against the same one that cryptography is +# using)). +_lib.OpenSSL_add_all_algorithms() + +# This is similar but exercised mainly by exception_from_error_queue. It calls +# both ERR_load_crypto_strings() and ERR_load_SSL_strings(). +_lib.SSL_load_error_strings() + + +# Set the default string mask to match OpenSSL upstream (since 2005) and +# RFC5280 recommendations. +_lib.ASN1_STRING_set_default_mask_asc(b'utf8only') diff --git a/src/OpenSSL/rand.py b/src/OpenSSL/rand.py new file mode 100644 index 0000000..65cd597 --- /dev/null +++ b/src/OpenSSL/rand.py @@ -0,0 +1,174 @@ +""" +PRNG management routines, thin wrappers. + +See the file RATIONALE for a short explanation of why this module was written. +""" + +from functools import partial + +from six import integer_types as _integer_types + +from OpenSSL._util import ( + ffi as _ffi, + lib as _lib, + exception_from_error_queue as _exception_from_error_queue, + path_string as _path_string) + + +class Error(Exception): + """ + An error occurred in an `OpenSSL.rand` API. + """ + +_raise_current_error = partial(_exception_from_error_queue, Error) + +_unspecified = object() + +_builtin_bytes = bytes + + +def bytes(num_bytes): + """ + Get some random bytes as a string. + + :param num_bytes: The number of bytes to fetch + :return: A string of random bytes + """ + if not isinstance(num_bytes, _integer_types): + raise TypeError("num_bytes must be an integer") + + if num_bytes < 0: + raise ValueError("num_bytes must not be negative") + + result_buffer = _ffi.new("char[]", num_bytes) + result_code = _lib.RAND_bytes(result_buffer, num_bytes) + if result_code == -1: + # TODO: No tests for this code path. Triggering a RAND_bytes failure + # might involve supplying a custom ENGINE? That's hard. + _raise_current_error() + + return _ffi.buffer(result_buffer)[:] + + +def add(buffer, entropy): + """ + Add data with a given entropy to the PRNG + + :param buffer: Buffer with random data + :param entropy: The entropy (in bytes) measurement of the buffer + :return: None + """ + if not isinstance(buffer, _builtin_bytes): + raise TypeError("buffer must be a byte string") + + if not isinstance(entropy, int): + raise TypeError("entropy must be an integer") + + # TODO Nothing tests this call actually being made, or made properly. + _lib.RAND_add(buffer, len(buffer), entropy) + + +def seed(buffer): + """ + Alias for rand_add, with entropy equal to length + + :param buffer: Buffer with random data + :return: None + """ + if not isinstance(buffer, _builtin_bytes): + raise TypeError("buffer must be a byte string") + + # TODO Nothing tests this call actually being made, or made properly. + _lib.RAND_seed(buffer, len(buffer)) + + +def status(): + """ + Retrieve the status of the PRNG + + :return: True if the PRNG is seeded enough, false otherwise + """ + return _lib.RAND_status() + + +def egd(path, bytes=_unspecified): + """ + Query an entropy gathering daemon (EGD) for random data and add it to the + PRNG. I haven't found any problems when the socket is missing, the function + just returns 0. + + :param path: The path to the EGD socket + :param bytes: (optional) The number of bytes to read, default is 255 + :returns: The number of bytes read (NB: a value of 0 isn't necessarily an + error, check rand.status()) + """ + if not isinstance(path, _builtin_bytes): + raise TypeError("path must be a byte string") + + if bytes is _unspecified: + bytes = 255 + elif not isinstance(bytes, int): + raise TypeError("bytes must be an integer") + + return _lib.RAND_egd_bytes(path, bytes) + + +def cleanup(): + """ + Erase the memory used by the PRNG. + + :return: None + """ + # TODO Nothing tests this call actually being made, or made properly. + _lib.RAND_cleanup() + + +def load_file(filename, maxbytes=_unspecified): + """ + Seed the PRNG with data from a file + + :param filename: The file to read data from (``bytes`` or ``unicode``). + :param maxbytes: (optional) The number of bytes to read, default is to read + the entire file + + :return: The number of bytes read + """ + filename = _path_string(filename) + + if maxbytes is _unspecified: + maxbytes = -1 + elif not isinstance(maxbytes, int): + raise TypeError("maxbytes must be an integer") + + return _lib.RAND_load_file(filename, maxbytes) + + +def write_file(filename): + """ + Save PRNG state to a file + + :param filename: The file to write data to (``bytes`` or ``unicode``). + + :return: The number of bytes written + """ + filename = _path_string(filename) + return _lib.RAND_write_file(filename) + + +# TODO There are no tests for screen at all +def screen(): + """ + Add the current contents of the screen to the PRNG state. Availability: + Windows. + + :return: None + """ + _lib.RAND_screen() + +if getattr(_lib, 'RAND_screen', None) is None: + del screen + + +# TODO There are no tests for the RAND strings being loaded, whatever that +# means. +_lib.ERR_load_RAND_strings() diff --git a/src/OpenSSL/tsafe.py b/src/OpenSSL/tsafe.py new file mode 100644 index 0000000..ba17d73 --- /dev/null +++ b/src/OpenSSL/tsafe.py @@ -0,0 +1,28 @@ +from OpenSSL import SSL +_ssl = SSL +del SSL + +import threading +_RLock = threading.RLock +del threading + + +class Connection: + def __init__(self, *args): + self._ssl_conn = _ssl.Connection(*args) + self._lock = _RLock() + + for f in ('get_context', 'pending', 'send', 'write', 'recv', 'read', + 'renegotiate', 'bind', 'listen', 'connect', 'accept', + 'setblocking', 'fileno', 'shutdown', 'close', 'get_cipher_list', + 'getpeername', 'getsockname', 'getsockopt', 'setsockopt', + 'makefile', 'get_app_data', 'set_app_data', 'state_string', + 'sock_shutdown', 'get_peer_certificate', 'get_peer_cert_chain', + 'want_read', 'want_write', 'set_connect_state', + 'set_accept_state', 'connect_ex', 'sendall'): + exec("""def %s(self, *args): + self._lock.acquire() + try: + return self._ssl_conn.%s(*args) + finally: + self._lock.release()\n""" % (f, f)) diff --git a/src/OpenSSL/version.py b/src/OpenSSL/version.py new file mode 100644 index 0000000..f284b04 --- /dev/null +++ b/src/OpenSSL/version.py @@ -0,0 +1,22 @@ +# Copyright (C) AB Strakt +# Copyright (C) Jean-Paul Calderone +# See LICENSE for details. + +""" +pyOpenSSL - A simple wrapper around the OpenSSL library +""" + +__all__ = [ + "__author__", "__copyright__", "__email__", "__license__", "__summary__", + "__title__", "__uri__", "__version__", +] + +__version__ = "0.16.dev0" + +__title__ = "pyOpenSSL" +__uri__ = "https://github.com/pyca/pyopenssl" +__summary__ = "Python wrapper module around the OpenSSL library" +__author__ = "The pyOpenSSL developers" +__email__ = "cryptography-dev@python.org" +__license__ = "Apache License, Version 2.0" +__copyright__ = "Copyright 2001-2015 {0}".format(__author__) |