summaryrefslogtreecommitdiff
path: root/src/M2Crypto/SSL/Context.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/M2Crypto/SSL/Context.py')
-rw-r--r--src/M2Crypto/SSL/Context.py445
1 files changed, 445 insertions, 0 deletions
diff --git a/src/M2Crypto/SSL/Context.py b/src/M2Crypto/SSL/Context.py
new file mode 100644
index 0000000..d7cb7bc
--- /dev/null
+++ b/src/M2Crypto/SSL/Context.py
@@ -0,0 +1,445 @@
+from __future__ import absolute_import
+
+"""SSL Context
+
+Copyright (c) 1999-2004 Ng Pheng Siong. All rights reserved."""
+
+from M2Crypto import BIO, Err, RSA, X509, m2, util # noqa
+from M2Crypto.SSL import cb # noqa
+from M2Crypto.SSL.Session import Session # noqa
+from weakref import WeakValueDictionary
+from typing import Any, AnyStr, Callable, Optional, Union # noqa
+
+__all__ = ['ctxmap', 'Context', 'map']
+
+
+class _ctxmap(object):
+ singleton = None # type: Optional[_ctxmap]
+
+ def __init__(self):
+ # type: () -> None
+ """Simple WeakReffed list.
+ """
+ self._ctxmap = WeakValueDictionary()
+
+ def __getitem__(self, key):
+ # type: (int) -> Any
+ return self._ctxmap[key]
+
+ def __setitem__(self, key, value):
+ # type: (int, Any) -> None
+ self._ctxmap[key] = value
+
+ def __delitem__(self, key):
+ # type: (int) -> None
+ del self._ctxmap[key]
+
+
+def ctxmap():
+ # type: () -> _ctxmap
+ if _ctxmap.singleton is None:
+ _ctxmap.singleton = _ctxmap()
+ return _ctxmap.singleton
+# deprecated!!!
+map = ctxmap
+
+
+class Context(object):
+
+ """'Context' for SSL connections."""
+
+ m2_ssl_ctx_free = m2.ssl_ctx_free
+
+ def __init__(self, protocol='tls', weak_crypto=None,
+ post_connection_check=None):
+ # type: (str, Optional[int], Optional[Callable]) -> None
+ proto = getattr(m2, protocol + '_method', None)
+ if proto is None:
+ # default is 'sslv23' for older versions of OpenSSL
+ if protocol == 'tls':
+ proto = getattr(m2, 'sslv23_method')
+ else:
+ raise ValueError("no such protocol '%s'" % protocol)
+ self.ctx = m2.ssl_ctx_new(proto())
+ self.allow_unknown_ca = 0 # type: Union[int, bool]
+ self.post_connection_check = post_connection_check
+ ctxmap()[int(self.ctx)] = self
+ m2.ssl_ctx_set_cache_size(self.ctx, 128)
+ if weak_crypto is None and protocol in ('sslv23', 'tls'):
+ self.set_options(m2.SSL_OP_ALL | m2.SSL_OP_NO_SSLv2 |
+ m2.SSL_OP_NO_SSLv3)
+
+ def __del__(self):
+ # type: () -> None
+ if getattr(self, 'ctx', None):
+ self.m2_ssl_ctx_free(self.ctx)
+
+ def close(self):
+ # type: () -> None
+ del ctxmap()[int(self.ctx)]
+
+ def load_cert(self, certfile, keyfile=None,
+ callback=util.passphrase_callback):
+ # type: (AnyStr, Optional[AnyStr], Callable) -> None
+ """Load certificate and private key into the context.
+
+ :param certfile: File that contains the PEM-encoded certificate.
+ :param keyfile: File that contains the PEM-encoded private key.
+ Default value of None indicates that the private key
+ is to be found in 'certfile'.
+ :param callback: Callable object to be invoked if the private key is
+ passphrase-protected. Default callback provides a
+ simple terminal-style input for the passphrase.
+ """
+ m2.ssl_ctx_passphrase_callback(self.ctx, callback)
+ m2.ssl_ctx_use_cert(self.ctx, certfile)
+ if not keyfile:
+ keyfile = certfile
+ m2.ssl_ctx_use_privkey(self.ctx, keyfile)
+ if not m2.ssl_ctx_check_privkey(self.ctx):
+ raise ValueError('public/private key mismatch')
+
+ def load_cert_chain(self, certchainfile, keyfile=None,
+ callback=util.passphrase_callback):
+ # type: (AnyStr, Optional[AnyStr], Callable) -> None
+ """Load certificate chain and private key into the context.
+
+ :param certchainfile: File object containing the PEM-encoded
+ certificate chain.
+ :param keyfile: File object containing the PEM-encoded private
+ key. Default value of None indicates that the
+ private key is to be found in 'certchainfile'.
+ :param callback: Callable object to be invoked if the private key
+ is passphrase-protected. Default callback
+ provides a simple terminal-style input for the
+ passphrase.
+ """
+ m2.ssl_ctx_passphrase_callback(self.ctx, callback)
+ m2.ssl_ctx_use_cert_chain(self.ctx, certchainfile)
+ if not keyfile:
+ keyfile = certchainfile
+ m2.ssl_ctx_use_privkey(self.ctx, keyfile)
+ if not m2.ssl_ctx_check_privkey(self.ctx):
+ raise ValueError('public/private key mismatch')
+
+ def set_client_CA_list_from_file(self, cafile):
+ # type: (AnyStr) -> None
+ """Load CA certs into the context. These CA certs are sent to the
+ peer during *SSLv3 certificate request*.
+
+ :param cafile: File object containing one or more PEM-encoded CA
+ certificates concatenated together.
+ """
+ m2.ssl_ctx_set_client_CA_list_from_file(self.ctx, cafile)
+
+ # Deprecated.
+ load_client_CA = load_client_ca = set_client_CA_list_from_file
+
+ def load_verify_locations(self, cafile=None, capath=None):
+ # type: (Optional[AnyStr], Optional[AnyStr]) -> int
+ """Load CA certs into the context.
+
+ These CA certs are used during verification of the peer's
+ certificate.
+
+ :param cafile: File containing one or more PEM-encoded CA
+ certificates concatenated together.
+
+ :param capath: Directory containing PEM-encoded CA certificates
+ (one certificate per file).
+
+ :return: 0 if the operation failed because CAfile and CApath are NULL
+ or the processing at one of the locations specified failed.
+ Check the error stack to find out the reason.
+
+ 1 The operation succeeded.
+ """
+ if cafile is None and capath is None:
+ raise ValueError("cafile and capath can not both be None.")
+ return m2.ssl_ctx_load_verify_locations(self.ctx, cafile, capath)
+
+ # Deprecated.
+ load_verify_info = load_verify_locations
+
+ def set_session_id_ctx(self, id):
+ # type: (bytes) -> None
+ """Sets the session id for the SSL.Context w/in a session can be reused.
+
+ :param id: Sessions are generated within a certain context. When
+ exporting/importing sessions with
+ i2d_SSL_SESSION/d2i_SSL_SESSION it would be possible,
+ to re-import a session generated from another context
+ (e.g. another application), which might lead to
+ malfunctions. Therefore each application must set its
+ own session id context sid_ctx which is used to
+ distinguish the contexts and is stored in exported
+ sessions. The sid_ctx can be any kind of binary data
+ with a given length, it is therefore possible to use
+ e.g. the name of the application and/or the hostname
+ and/or service name.
+ """
+ ret = m2.ssl_ctx_set_session_id_context(self.ctx, id)
+ if not ret:
+ raise Err.SSLError(Err.get_error_code(), '')
+
+ def set_default_verify_paths(self):
+ # type: () -> int
+ """
+ Specifies that the default locations from which CA certs are
+ loaded should be used.
+
+ There is one default directory and one default file. The default
+ CA certificates directory is called "certs" in the default
+ OpenSSL directory. Alternatively the SSL_CERT_DIR environment
+ variable can be defined to override this location. The default
+ CA certificates file is called "cert.pem" in the default OpenSSL
+ directory. Alternatively the SSL_CERT_FILE environment variable
+ can be defined to override this location.
+
+ @return 0 if the operation failed. A missing default location is
+ still treated as a success. No error code is set.
+
+ 1 The operation succeeded.
+ """
+ ret = m2.ssl_ctx_set_default_verify_paths(self.ctx)
+ if not ret:
+ raise ValueError('Cannot use default SSL certificate store!')
+
+ def set_allow_unknown_ca(self, ok):
+ # type: (Union[int, bool]) -> None
+ """Set the context to accept/reject a peer certificate if the
+ certificate's CA is unknown.
+
+ :param ok: True to accept, False to reject.
+ """
+ self.allow_unknown_ca = ok
+
+ def get_allow_unknown_ca(self):
+ # type: () -> Union[int, bool]
+ """Get the context's setting that accepts/rejects a peer
+ certificate if the certificate's CA is unknown.
+
+ FIXME 2Bconverted to bool
+ """
+ return self.allow_unknown_ca
+
+ def set_verify(self, mode, depth, callback=None):
+ # type: (int, int, Optional[Callable]) -> None
+ """
+ Set verify options. Most applications will need to call this
+ method with the right options to make a secure SSL connection.
+
+ :param mode: The verification mode to use. Typically at least
+ SSL.verify_peer is used. Clients would also typically
+ add SSL.verify_fail_if_no_peer_cert.
+ :param depth: The maximum allowed depth of the certificate chain
+ returned by the peer.
+ :param callback: Callable that can be used to specify custom
+ verification checks.
+ """
+ if callback is None:
+ m2.ssl_ctx_set_verify_default(self.ctx, mode)
+ else:
+ m2.ssl_ctx_set_verify(self.ctx, mode, callback)
+ m2.ssl_ctx_set_verify_depth(self.ctx, depth)
+
+ def get_verify_mode(self):
+ # type: () -> int
+ return m2.ssl_ctx_get_verify_mode(self.ctx)
+
+ def get_verify_depth(self):
+ # type: () -> int
+ """Returns the verification mode currently set in the SSL Context."""
+ return m2.ssl_ctx_get_verify_depth(self.ctx)
+
+ def set_tmp_dh(self, dhpfile):
+ # type: (AnyStr) -> int
+ """Load ephemeral DH parameters into the context.
+
+ :param dhpfile: Filename of the file containing the PEM-encoded
+ DH parameters.
+ """
+ f = BIO.openfile(dhpfile)
+ dhp = m2.dh_read_parameters(f.bio_ptr())
+ return m2.ssl_ctx_set_tmp_dh(self.ctx, dhp)
+
+ def set_tmp_dh_callback(self, callback=None):
+ # type: (Optional[Callable]) -> None
+ """Sets the callback function for SSL.Context.
+
+ :param callback: Callable to be used when a DH parameters are required.
+ """
+ if callback is not None:
+ m2.ssl_ctx_set_tmp_dh_callback(self.ctx, callback)
+
+ def set_tmp_rsa(self, rsa):
+ # type: (RSA.RSA) -> int
+ """Load ephemeral RSA key into the context.
+
+ :param rsa: RSA.RSA instance.
+ """
+ if isinstance(rsa, RSA.RSA):
+ return m2.ssl_ctx_set_tmp_rsa(self.ctx, rsa.rsa)
+ else:
+ raise TypeError("Expected an instance of RSA.RSA, got %s." % rsa)
+
+ def set_tmp_rsa_callback(self, callback=None):
+ # type: (Optional[Callable]) -> None
+ """Sets the callback function to be used when
+ a temporary/ephemeral RSA key is required.
+ """
+ if callback is not None:
+ m2.ssl_ctx_set_tmp_rsa_callback(self.ctx, callback)
+
+ def set_info_callback(self, callback=cb.ssl_info_callback):
+ # type: (Callable) -> None
+ """Set a callback function to get state information.
+
+ It can be used to get state information about the SSL
+ connections that are created from this context.
+
+ :param callback: Callback function. The default prints
+ information to stderr.
+ """
+ m2.ssl_ctx_set_info_callback(self.ctx, callback)
+
+ def set_cipher_list(self, cipher_list):
+ # type: (str) -> int
+ """Sets the list of available ciphers.
+
+ :param cipher_list: The format of the string is described in
+ ciphers(1).
+ :return: 1 if any cipher could be selected and 0 on complete
+ failure.
+ """
+ return m2.ssl_ctx_set_cipher_list(self.ctx, cipher_list)
+
+ def add_session(self, session):
+ # type: (Session) -> int
+ """Add the session to the context.
+
+ :param session: the session to be added.
+
+ :return: 0 The operation failed. It was tried to add the same
+ (identical) session twice.
+
+ 1 The operation succeeded.
+ """
+ return m2.ssl_ctx_add_session(self.ctx, session._ptr())
+
+ def remove_session(self, session):
+ # type: (Session) -> int
+ """Remove the session from the context.
+
+ :param session: the session to be removed.
+
+ :return: 0 The operation failed. The session was not found in
+ the cache.
+
+ 1 The operation succeeded.
+ """
+ return m2.ssl_ctx_remove_session(self.ctx, session._ptr())
+
+ def get_session_timeout(self):
+ # type: () -> int
+ """Get current session timeout.
+
+ Whenever a new session is created, it is assigned a maximum
+ lifetime. This lifetime is specified by storing the creation
+ time of the session and the timeout value valid at this time. If
+ the actual time is later than creation time plus timeout, the
+ session is not reused.
+
+ Due to this realization, all sessions behave according to the
+ timeout value valid at the time of the session negotiation.
+ Changes of the timeout value do not affect already established
+ sessions.
+
+ Expired sessions are removed from the internal session cache,
+ whenever SSL_CTX_flush_sessions(3) is called, either directly by
+ the application or automatically (see
+ SSL_CTX_set_session_cache_mode(3))
+
+ The default value for session timeout is decided on a per
+ protocol basis, see SSL_get_default_timeout(3). All currently
+ supported protocols have the same default timeout value of 300
+ seconds.
+
+ SSL_CTX_set_timeout() returns the previously set timeout value.
+
+ :return: the currently set timeout value.
+ """
+ return m2.ssl_ctx_get_session_timeout(self.ctx)
+
+ def set_session_timeout(self, timeout):
+ # type: (int) -> int
+ """Set new session timeout.
+
+ See self.get_session_timeout() for explanation of the session
+ timeouts.
+
+ :param timeout: new timeout value.
+
+ :return: the previously set timeout value.
+ """
+ return m2.ssl_ctx_set_session_timeout(self.ctx, timeout)
+
+ def set_session_cache_mode(self, mode):
+ # type: (int) -> int
+ """Enables/disables session caching.
+
+ The mode is set by using m2.SSL_SESS_CACHE_* constants.
+
+ :param mode: new mode value.
+
+ :return: the previously set cache mode value.
+ """
+ return m2.ssl_ctx_set_session_cache_mode(self.ctx, mode)
+
+ def get_session_cache_mode(self):
+ # type: () -> int
+ """Gets the current session caching.
+
+ The mode is set to m2.SSL_SESS_CACHE_* constants.
+
+ :return: the previously set cache mode value.
+ """
+ return m2.ssl_ctx_get_session_cache_mode(self.ctx)
+
+ def set_options(self, op):
+ # type: (int) -> int
+ """Adds the options set via bitmask in options to the Context.
+
+ !!! Options already set before are not cleared!
+
+ The behaviour of the SSL library can be changed by setting
+ several options. The options are coded as bitmasks and can be
+ combined by a logical or operation (|).
+
+ SSL.Context.set_options() and SSL.set_options() affect the
+ (external) protocol behaviour of the SSL library. The (internal)
+ behaviour of the API can be changed by using the similar
+ SSL.Context.set_mode() and SSL.set_mode() functions.
+
+ During a handshake, the option settings of the SSL object are
+ used. When a new SSL object is created from a context using
+ SSL(), the current option setting is copied. Changes to ctx
+ do not affect already created SSL objects. SSL.clear() does not
+ affect the settings.
+
+ :param op: bitmask of additional options specified in
+ SSL_CTX_set_options(3) manpage.
+
+ :return: the new options bitmask after adding options.
+ """
+ return m2.ssl_ctx_set_options(self.ctx, op)
+
+ def get_cert_store(self):
+ # type: () -> X509.X509
+ """
+ Get the certificate store associated with this context.
+
+ :warning: The store is NOT refcounted, and as such can not be relied
+ to be valid once the context goes away or is changed.
+ """
+ return X509.X509_Store(m2.ssl_ctx_get_cert_store(self.ctx))