summaryrefslogtreecommitdiff
path: root/src/M2Crypto/SSL/Context.py
blob: d7cb7bcc5101fb306a0ed8acecabaa7ebb8222bd (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
from __future__ import absolute_import

"""SSL Context

Copyright (c) 1999-2004 Ng Pheng Siong. All rights reserved."""

from M2Crypto import BIO, Err, RSA, X509, m2, util  # noqa
from M2Crypto.SSL import cb  # noqa
from M2Crypto.SSL.Session import Session  # noqa
from weakref import WeakValueDictionary
from typing import Any, AnyStr, Callable, Optional, Union  # noqa

__all__ = ['ctxmap', 'Context', 'map']


class _ctxmap(object):
    singleton = None  # type: Optional[_ctxmap]

    def __init__(self):
        # type: () -> None
        """Simple WeakReffed list.
        """
        self._ctxmap = WeakValueDictionary()

    def __getitem__(self, key):
        # type: (int) -> Any
        return self._ctxmap[key]

    def __setitem__(self, key, value):
        # type: (int, Any) -> None
        self._ctxmap[key] = value

    def __delitem__(self, key):
        # type: (int) -> None
        del self._ctxmap[key]


def ctxmap():
    # type: () -> _ctxmap
    if _ctxmap.singleton is None:
        _ctxmap.singleton = _ctxmap()
    return _ctxmap.singleton
# deprecated!!!
map = ctxmap


class Context(object):

    """'Context' for SSL connections."""

    m2_ssl_ctx_free = m2.ssl_ctx_free

    def __init__(self, protocol='tls', weak_crypto=None,
                 post_connection_check=None):
        # type: (str, Optional[int], Optional[Callable]) -> None
        proto = getattr(m2, protocol + '_method', None)
        if proto is None:
            # default is 'sslv23' for older versions of OpenSSL
            if protocol == 'tls':
                proto = getattr(m2, 'sslv23_method')
            else:
                raise ValueError("no such protocol '%s'" % protocol)
        self.ctx = m2.ssl_ctx_new(proto())
        self.allow_unknown_ca = 0  # type: Union[int, bool]
        self.post_connection_check = post_connection_check
        ctxmap()[int(self.ctx)] = self
        m2.ssl_ctx_set_cache_size(self.ctx, 128)
        if weak_crypto is None and protocol in ('sslv23', 'tls'):
            self.set_options(m2.SSL_OP_ALL | m2.SSL_OP_NO_SSLv2 |
                             m2.SSL_OP_NO_SSLv3)

    def __del__(self):
        # type: () -> None
        if getattr(self, 'ctx', None):
            self.m2_ssl_ctx_free(self.ctx)

    def close(self):
        # type: () -> None
        del ctxmap()[int(self.ctx)]

    def load_cert(self, certfile, keyfile=None,
                  callback=util.passphrase_callback):
        # type: (AnyStr, Optional[AnyStr], Callable) -> None
        """Load certificate and private key into the context.

        :param certfile: File that contains the PEM-encoded certificate.
        :param keyfile:  File that contains the PEM-encoded private key.
                         Default value of None indicates that the private key
                         is to be found in 'certfile'.
        :param callback: Callable object to be invoked if the private key is
                         passphrase-protected. Default callback provides a
                         simple terminal-style input for the passphrase.
        """
        m2.ssl_ctx_passphrase_callback(self.ctx, callback)
        m2.ssl_ctx_use_cert(self.ctx, certfile)
        if not keyfile:
            keyfile = certfile
        m2.ssl_ctx_use_privkey(self.ctx, keyfile)
        if not m2.ssl_ctx_check_privkey(self.ctx):
            raise ValueError('public/private key mismatch')

    def load_cert_chain(self, certchainfile, keyfile=None,
                        callback=util.passphrase_callback):
        # type: (AnyStr, Optional[AnyStr], Callable) -> None
        """Load certificate chain and private key into the context.

        :param certchainfile: File object containing the PEM-encoded
                              certificate chain.
        :param keyfile:       File object containing the PEM-encoded private
                              key. Default value of None indicates that the
                              private key is to be found in 'certchainfile'.
        :param callback:      Callable object to be invoked if the private key
                              is passphrase-protected. Default callback
                              provides a simple terminal-style input for the
                              passphrase.
        """
        m2.ssl_ctx_passphrase_callback(self.ctx, callback)
        m2.ssl_ctx_use_cert_chain(self.ctx, certchainfile)
        if not keyfile:
            keyfile = certchainfile
        m2.ssl_ctx_use_privkey(self.ctx, keyfile)
        if not m2.ssl_ctx_check_privkey(self.ctx):
            raise ValueError('public/private key mismatch')

    def set_client_CA_list_from_file(self, cafile):
        # type: (AnyStr) -> None
        """Load CA certs into the context. These CA certs are sent to the
        peer during *SSLv3 certificate request*.

        :param cafile: File object containing one or more PEM-encoded CA
                       certificates concatenated together.
        """
        m2.ssl_ctx_set_client_CA_list_from_file(self.ctx, cafile)

    # Deprecated.
    load_client_CA = load_client_ca = set_client_CA_list_from_file

    def load_verify_locations(self, cafile=None, capath=None):
        # type: (Optional[AnyStr], Optional[AnyStr]) -> int
        """Load CA certs into the context.

        These CA certs are used during verification of the peer's
        certificate.

        :param cafile: File containing one or more PEM-encoded CA
                       certificates concatenated together.

        :param capath: Directory containing PEM-encoded CA certificates
                       (one certificate per file).

        :return: 0 if the operation failed because CAfile and CApath are NULL
                  or the processing at one of the locations specified failed.
                  Check the error stack to find out the reason.

                1 The operation succeeded.
        """
        if cafile is None and capath is None:
            raise ValueError("cafile and capath can not both be None.")
        return m2.ssl_ctx_load_verify_locations(self.ctx, cafile, capath)

    # Deprecated.
    load_verify_info = load_verify_locations

    def set_session_id_ctx(self, id):
        # type: (bytes) -> None
        """Sets the session id for the SSL.Context w/in a session can be reused.

        :param id: Sessions are generated within a certain context. When
                   exporting/importing sessions with
                   i2d_SSL_SESSION/d2i_SSL_SESSION it would be possible,
                   to re-import a session generated from another context
                   (e.g. another application), which might lead to
                   malfunctions. Therefore each application must set its
                   own session id context sid_ctx which is used to
                   distinguish the contexts and is stored in exported
                   sessions. The sid_ctx can be any kind of binary data
                   with a given length, it is therefore possible to use
                   e.g. the name of the application and/or the hostname
                   and/or service name.
        """
        ret = m2.ssl_ctx_set_session_id_context(self.ctx, id)
        if not ret:
            raise Err.SSLError(Err.get_error_code(), '')

    def set_default_verify_paths(self):
        # type: () -> int
        """
        Specifies that the default locations from which CA certs are
        loaded should be used.

        There is one default directory and one default file. The default
        CA certificates directory is called "certs" in the default
        OpenSSL directory. Alternatively the SSL_CERT_DIR environment
        variable can be defined to override this location. The default
        CA certificates file is called "cert.pem" in the default OpenSSL
        directory. Alternatively the SSL_CERT_FILE environment variable
        can be defined to override this location.

        @return 0 if the operation failed. A missing default location is
                  still treated as a success. No error code is set.

                1 The operation succeeded.
        """
        ret = m2.ssl_ctx_set_default_verify_paths(self.ctx)
        if not ret:
            raise ValueError('Cannot use default SSL certificate store!')

    def set_allow_unknown_ca(self, ok):
        # type: (Union[int, bool]) -> None
        """Set the context to accept/reject a peer certificate if the
        certificate's CA is unknown.

        :param ok:       True to accept, False to reject.
        """
        self.allow_unknown_ca = ok

    def get_allow_unknown_ca(self):
        # type: () -> Union[int, bool]
        """Get the context's setting that accepts/rejects a peer
        certificate if the certificate's CA is unknown.

        FIXME 2Bconverted to bool
        """
        return self.allow_unknown_ca

    def set_verify(self, mode, depth, callback=None):
        # type: (int, int, Optional[Callable]) -> None
        """
        Set verify options. Most applications will need to call this
        method with the right options to make a secure SSL connection.

        :param mode:     The verification mode to use. Typically at least
                         SSL.verify_peer is used. Clients would also typically
                         add SSL.verify_fail_if_no_peer_cert.
        :param depth:    The maximum allowed depth of the certificate chain
                         returned by the peer.
        :param callback: Callable that can be used to specify custom
                         verification checks.
        """
        if callback is None:
            m2.ssl_ctx_set_verify_default(self.ctx, mode)
        else:
            m2.ssl_ctx_set_verify(self.ctx, mode, callback)
        m2.ssl_ctx_set_verify_depth(self.ctx, depth)

    def get_verify_mode(self):
        # type: () -> int
        return m2.ssl_ctx_get_verify_mode(self.ctx)

    def get_verify_depth(self):
        # type: () -> int
        """Returns the verification mode currently set in the SSL Context."""
        return m2.ssl_ctx_get_verify_depth(self.ctx)

    def set_tmp_dh(self, dhpfile):
        # type: (AnyStr) -> int
        """Load ephemeral DH parameters into the context.

        :param dhpfile: Filename of the file containing the PEM-encoded
                        DH parameters.
        """
        f = BIO.openfile(dhpfile)
        dhp = m2.dh_read_parameters(f.bio_ptr())
        return m2.ssl_ctx_set_tmp_dh(self.ctx, dhp)

    def set_tmp_dh_callback(self, callback=None):
        # type: (Optional[Callable]) -> None
        """Sets the callback function for SSL.Context.

        :param callback: Callable to be used when a DH parameters are required.
        """
        if callback is not None:
            m2.ssl_ctx_set_tmp_dh_callback(self.ctx, callback)

    def set_tmp_rsa(self, rsa):
        # type: (RSA.RSA) -> int
        """Load ephemeral RSA key into the context.

        :param rsa: RSA.RSA instance.
        """
        if isinstance(rsa, RSA.RSA):
            return m2.ssl_ctx_set_tmp_rsa(self.ctx, rsa.rsa)
        else:
            raise TypeError("Expected an instance of RSA.RSA, got %s." % rsa)

    def set_tmp_rsa_callback(self, callback=None):
        # type: (Optional[Callable]) -> None
        """Sets the callback function to be used when
        a temporary/ephemeral RSA key is required.
        """
        if callback is not None:
            m2.ssl_ctx_set_tmp_rsa_callback(self.ctx, callback)

    def set_info_callback(self, callback=cb.ssl_info_callback):
        # type: (Callable) -> None
        """Set a callback function to get state information.

        It can be used to get state information about the SSL
        connections that are created from this context.

        :param callback: Callback function. The default prints
                         information to stderr.
        """
        m2.ssl_ctx_set_info_callback(self.ctx, callback)

    def set_cipher_list(self, cipher_list):
        # type: (str) -> int
        """Sets the list of available ciphers.

        :param cipher_list: The format of the string is described in
                            ciphers(1).
        :return: 1 if any cipher could be selected and 0 on complete
                 failure.
        """
        return m2.ssl_ctx_set_cipher_list(self.ctx, cipher_list)

    def add_session(self, session):
        # type: (Session) -> int
        """Add the session to the context.

        :param session: the session to be added.

        :return: 0 The operation failed. It was tried to add the same
                   (identical) session twice.

                 1 The operation succeeded.
        """
        return m2.ssl_ctx_add_session(self.ctx, session._ptr())

    def remove_session(self, session):
        # type: (Session) -> int
        """Remove the session from the context.

        :param session: the session to be removed.

        :return: 0 The operation failed. The session was not found in
                   the cache.

                 1 The operation succeeded.
        """
        return m2.ssl_ctx_remove_session(self.ctx, session._ptr())

    def get_session_timeout(self):
        # type: () -> int
        """Get current session timeout.

        Whenever a new session is created, it is assigned a maximum
        lifetime.  This lifetime is specified by storing the creation
        time of the session and the timeout value valid at this time. If
        the actual time is later than creation time plus timeout, the
        session is not reused.

        Due to this realization, all sessions behave according to the
        timeout value valid at the time of the session negotiation.
        Changes of the timeout value do not affect already established
        sessions.

        Expired sessions are removed from the internal session cache,
        whenever SSL_CTX_flush_sessions(3) is called, either directly by
        the application or automatically (see
        SSL_CTX_set_session_cache_mode(3))

        The default value for session timeout is decided on a per
        protocol basis, see SSL_get_default_timeout(3).  All currently
        supported protocols have the same default timeout value of 300
        seconds.

        SSL_CTX_set_timeout() returns the previously set timeout value.

        :return: the currently set timeout value.
        """
        return m2.ssl_ctx_get_session_timeout(self.ctx)

    def set_session_timeout(self, timeout):
        # type: (int) -> int
        """Set new session timeout.

        See self.get_session_timeout() for explanation of the session
        timeouts.

        :param timeout: new timeout value.

        :return: the previously set timeout value.
        """
        return m2.ssl_ctx_set_session_timeout(self.ctx, timeout)

    def set_session_cache_mode(self, mode):
        # type: (int) -> int
        """Enables/disables session caching.

        The mode is set by using m2.SSL_SESS_CACHE_* constants.

        :param mode: new mode value.

        :return: the previously set cache mode value.
        """
        return m2.ssl_ctx_set_session_cache_mode(self.ctx, mode)

    def get_session_cache_mode(self):
        # type: () -> int
        """Gets the current session caching.

        The mode is set to m2.SSL_SESS_CACHE_* constants.

        :return: the previously set cache mode value.
        """
        return m2.ssl_ctx_get_session_cache_mode(self.ctx)

    def set_options(self, op):
        # type: (int) -> int
        """Adds the options set via bitmask in options to the Context.

        !!! Options already set before are not cleared!

        The behaviour of the SSL library can be changed by setting
        several options.  The options are coded as bitmasks and can be
        combined by a logical or operation (|).

        SSL.Context.set_options() and SSL.set_options() affect the
        (external) protocol behaviour of the SSL library. The (internal)
        behaviour of the API can be changed by using the similar
        SSL.Context.set_mode() and SSL.set_mode() functions.

        During a handshake, the option settings of the SSL object are
        used. When a new SSL object is created from a context using
        SSL(), the current option setting is copied. Changes to ctx
        do not affect already created SSL objects. SSL.clear() does not
        affect the settings.

        :param op: bitmask of additional options specified in
                   SSL_CTX_set_options(3) manpage.

        :return: the new options bitmask after adding options.
        """
        return m2.ssl_ctx_set_options(self.ctx, op)

    def get_cert_store(self):
        # type: () -> X509.X509
        """
        Get the certificate store associated with this context.

        :warning: The store is NOT refcounted, and as such can not be relied
                  to be valid once the context goes away or is changed.
        """
        return X509.X509_Store(m2.ssl_ctx_get_cert_store(self.ctx))