From 9a80576f9fa841d0045f90e495055f8c61d49496 Mon Sep 17 00:00:00 2001 From: Paul Kehrer Date: Mon, 3 Aug 2020 22:47:37 -0500 Subject: remove npn support entirely. you should be using alpn (#932) * remove npn support entirely. you should be using alpn * flake8 --- CHANGELOG.rst | 1 + src/OpenSSL/SSL.py | 180 --------------------------------------------------- tests/test_ssl.py | 184 ----------------------------------------------------- 3 files changed, 1 insertion(+), 364 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index f7c2c94..d188dc6 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -12,6 +12,7 @@ Backward-incompatible changes: ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - Remove deprecated ``OpenSSL.tsafe`` module. +- Removed deprecated ``OpenSSL.SSL.Context.set_npn_advertise_callback``, ``OpenSSL.SSL.Context.set_npn_select_callback``, and ``OpenSSL.SSL.Connection.get_next_proto_negotiated``. - Drop support for Python 3.4 - Drop support for OpenSSL 1.0.1 diff --git a/src/OpenSSL/SSL.py b/src/OpenSSL/SSL.py index ed20d30..d809743 100644 --- a/src/OpenSSL/SSL.py +++ b/src/OpenSSL/SSL.py @@ -1,6 +1,5 @@ import os import socket -import warnings from sys import platform from functools import wraps, partial from itertools import count, chain @@ -342,98 +341,6 @@ class _VerifyHelper(_CallbackExceptionHelper): ) -class _NpnAdvertiseHelper(_CallbackExceptionHelper): - """ - Wrap a callback such that it can be used as an NPN advertisement callback. - """ - - def __init__(self, callback): - _CallbackExceptionHelper.__init__(self) - - @wraps(callback) - def wrapper(ssl, out, outlen, arg): - try: - conn = Connection._reverse_mapping[ssl] - protos = callback(conn) - - # Join the protocols into a Python bytestring, length-prefixing - # each element. - protostr = b"".join( - chain.from_iterable((int2byte(len(p)), p) for p in protos) - ) - - # Save our callback arguments on the connection object. This is - # done to make sure that they don't get freed before OpenSSL - # uses them. Then, return them appropriately in the output - # parameters. - conn._npn_advertise_callback_args = [ - _ffi.new("unsigned int *", len(protostr)), - _ffi.new("unsigned char[]", protostr), - ] - outlen[0] = conn._npn_advertise_callback_args[0][0] - out[0] = conn._npn_advertise_callback_args[1] - return 0 - except Exception as e: - self._problems.append(e) - return 2 # SSL_TLSEXT_ERR_ALERT_FATAL - - self.callback = _ffi.callback( - "int (*)(SSL *, const unsigned char **, unsigned int *, void *)", - wrapper, - ) - - -class _NpnSelectHelper(_CallbackExceptionHelper): - """ - Wrap a callback such that it can be used as an NPN selection callback. - """ - - def __init__(self, callback): - _CallbackExceptionHelper.__init__(self) - - @wraps(callback) - def wrapper(ssl, out, outlen, in_, inlen, arg): - try: - conn = Connection._reverse_mapping[ssl] - - # The string passed to us is actually made up of multiple - # length-prefixed bytestrings. We need to split that into a - # list. - instr = _ffi.buffer(in_, inlen)[:] - protolist = [] - while instr: - length = indexbytes(instr, 0) - proto = instr[1 : length + 1] - protolist.append(proto) - instr = instr[length + 1 :] - - # Call the callback - outstr = callback(conn, protolist) - - # Save our callback arguments on the connection object. This is - # done to make sure that they don't get freed before OpenSSL - # uses them. Then, return them appropriately in the output - # parameters. - conn._npn_select_callback_args = [ - _ffi.new("unsigned char *", len(outstr)), - _ffi.new("unsigned char[]", outstr), - ] - outlen[0] = conn._npn_select_callback_args[0][0] - out[0] = conn._npn_select_callback_args[1] - return 0 - except Exception as e: - self._problems.append(e) - return 2 # SSL_TLSEXT_ERR_ALERT_FATAL - - self.callback = _ffi.callback( - ( - "int (*)(SSL *, unsigned char **, unsigned char *, " - "const unsigned char *, unsigned int, void *)" - ), - wrapper, - ) - - NO_OVERLAPPING_PROTOCOLS = object() @@ -653,14 +560,6 @@ def SSLeay_version(type): return _ffi.string(_lib.SSLeay_version(type)) -def _warn_npn(): - warnings.warn( - "NPN is deprecated. Protocols should switch to using ALPN.", - DeprecationWarning, - stacklevel=3, - ) - - def _make_requires(flag, error): """ Builds a decorator that ensures that functions that rely on OpenSSL @@ -686,11 +585,6 @@ def _make_requires(flag, error): return _requires_decorator -_requires_npn = _make_requires( - _lib.Cryptography_HAS_NEXTPROTONEG, "NPN not available" -) - - _requires_alpn = _make_requires( _lib.Cryptography_HAS_ALPN, "ALPN not available" ) @@ -768,10 +662,6 @@ class Context(object): self._keylog_callback = None self._tlsext_servername_callback = None self._app_data = None - self._npn_advertise_helper = None - self._npn_advertise_callback = None - self._npn_select_helper = None - self._npn_select_callback = None self._alpn_select_helper = None self._alpn_select_callback = None self._ocsp_helper = None @@ -1467,47 +1357,6 @@ class Context(object): _lib.SSL_CTX_set_tlsext_use_srtp(self._context, profiles) == 0 ) - @_requires_npn - def set_npn_advertise_callback(self, callback): - """ - Specify a callback function that will be called when offering `Next - Protocol Negotiation - `_ as a server. - - :param callback: The callback function. It will be invoked with one - argument, the :class:`Connection` instance. It should return a - list of bytestrings representing the advertised protocols, like - ``[b'http/1.1', b'spdy/2']``. - - .. versionadded:: 0.15 - """ - _warn_npn() - self._npn_advertise_helper = _NpnAdvertiseHelper(callback) - self._npn_advertise_callback = self._npn_advertise_helper.callback - _lib.SSL_CTX_set_next_protos_advertised_cb( - self._context, self._npn_advertise_callback, _ffi.NULL - ) - - @_requires_npn - def set_npn_select_callback(self, callback): - """ - Specify a callback function that will be called when a server offers - Next Protocol Negotiation options. - - :param callback: The callback function. It will be invoked with two - arguments: the Connection, and a list of offered protocols as - bytestrings, e.g. ``[b'http/1.1', b'spdy/2']``. It should return - one of those bytestrings, the chosen protocol. - - .. versionadded:: 0.15 - """ - _warn_npn() - self._npn_select_helper = _NpnSelectHelper(callback) - self._npn_select_callback = self._npn_select_helper.callback - _lib.SSL_CTX_set_next_proto_select_cb( - self._context, self._npn_select_callback, _ffi.NULL - ) - @_requires_alpn def set_alpn_protos(self, protos): """ @@ -1637,13 +1486,6 @@ class Connection(object): self._context = context self._app_data = None - # References to strings used for Next Protocol Negotiation. OpenSSL's - # header files suggest that these might get copied at some point, but - # doesn't specify when, so we store them here to make sure they don't - # get freed before OpenSSL uses them. - self._npn_advertise_callback_args = None - self._npn_select_callback_args = None - # References to strings used for Application Layer Protocol # Negotiation. These strings get copied at some point but it's well # after the callback returns, so we have to hang them somewhere to @@ -1687,10 +1529,6 @@ class Connection(object): def _raise_ssl_error(self, ssl, result): if self._context._verify_helper is not None: self._context._verify_helper.raise_if_problem() - if self._context._npn_advertise_helper is not None: - self._context._npn_advertise_helper.raise_if_problem() - if self._context._npn_select_helper is not None: - self._context._npn_select_helper.raise_if_problem() if self._context._alpn_select_helper is not None: self._context._alpn_select_helper.raise_if_problem() if self._context._ocsp_helper is not None: @@ -2505,24 +2343,6 @@ class Connection(object): version = _lib.SSL_version(self._ssl) return version - @_requires_npn - def get_next_proto_negotiated(self): - """ - Get the protocol that was negotiated by NPN. - - :returns: A bytestring of the protocol name. If no protocol has been - negotiated yet, returns an empty string. - - .. versionadded:: 0.15 - """ - _warn_npn() - data = _ffi.new("unsigned char **") - data_len = _ffi.new("unsigned int *") - - _lib.SSL_get0_next_proto_negotiated(self._ssl, data, data_len) - - return _ffi.buffer(data[0], data_len[0])[:] - @_requires_alpn def set_alpn_protos(self, protos): """ diff --git a/tests/test_ssl.py b/tests/test_ssl.py index daaafae..8f34a4d 100644 --- a/tests/test_ssl.py +++ b/tests/test_ssl.py @@ -1764,190 +1764,6 @@ class TestServerNameCallback(object): assert args == [(server, b"foo1.example.com")] -@pytest.mark.skipif( - not _lib.Cryptography_HAS_NEXTPROTONEG, reason="NPN is not available" -) -class TestNextProtoNegotiation(object): - """ - Test for Next Protocol Negotiation in PyOpenSSL. - """ - - def test_npn_success(self): - """ - Tests that clients and servers that agree on the negotiated next - protocol can correct establish a connection, and that the agreed - protocol is reported by the connections. - """ - advertise_args = [] - select_args = [] - - def advertise(conn): - advertise_args.append((conn,)) - return [b"http/1.1", b"spdy/2"] - - def select(conn, options): - select_args.append((conn, options)) - return b"spdy/2" - - server_context = Context(SSLv23_METHOD) - server_context.set_npn_advertise_callback(advertise) - - client_context = Context(SSLv23_METHOD) - client_context.set_npn_select_callback(select) - - # Necessary to actually accept the connection - server_context.use_privatekey( - load_privatekey(FILETYPE_PEM, server_key_pem) - ) - server_context.use_certificate( - load_certificate(FILETYPE_PEM, server_cert_pem) - ) - - # Do a little connection to trigger the logic - server = Connection(server_context, None) - server.set_accept_state() - - client = Connection(client_context, None) - client.set_connect_state() - - interact_in_memory(server, client) - - assert advertise_args == [(server,)] - assert select_args == [(client, [b"http/1.1", b"spdy/2"])] - - assert server.get_next_proto_negotiated() == b"spdy/2" - assert client.get_next_proto_negotiated() == b"spdy/2" - - def test_npn_client_fail(self): - """ - Tests that when clients and servers cannot agree on what protocol - to use next that the TLS connection does not get established. - """ - advertise_args = [] - select_args = [] - - def advertise(conn): - advertise_args.append((conn,)) - return [b"http/1.1", b"spdy/2"] - - def select(conn, options): - select_args.append((conn, options)) - return b"" - - server_context = Context(SSLv23_METHOD) - server_context.set_npn_advertise_callback(advertise) - - client_context = Context(SSLv23_METHOD) - client_context.set_npn_select_callback(select) - - # Necessary to actually accept the connection - server_context.use_privatekey( - load_privatekey(FILETYPE_PEM, server_key_pem) - ) - server_context.use_certificate( - load_certificate(FILETYPE_PEM, server_cert_pem) - ) - - # Do a little connection to trigger the logic - server = Connection(server_context, None) - server.set_accept_state() - - client = Connection(client_context, None) - client.set_connect_state() - - # If the client doesn't return anything, the connection will fail. - with pytest.raises(Error): - interact_in_memory(server, client) - - assert advertise_args == [(server,)] - assert select_args == [(client, [b"http/1.1", b"spdy/2"])] - - def test_npn_select_error(self): - """ - Test that we can handle exceptions in the select callback. If - select fails it should be fatal to the connection. - """ - advertise_args = [] - - def advertise(conn): - advertise_args.append((conn,)) - return [b"http/1.1", b"spdy/2"] - - def select(conn, options): - raise TypeError - - server_context = Context(SSLv23_METHOD) - server_context.set_npn_advertise_callback(advertise) - - client_context = Context(SSLv23_METHOD) - client_context.set_npn_select_callback(select) - - # Necessary to actually accept the connection - server_context.use_privatekey( - load_privatekey(FILETYPE_PEM, server_key_pem) - ) - server_context.use_certificate( - load_certificate(FILETYPE_PEM, server_cert_pem) - ) - - # Do a little connection to trigger the logic - server = Connection(server_context, None) - server.set_accept_state() - - client = Connection(client_context, None) - client.set_connect_state() - - # If the callback throws an exception it should be raised here. - with pytest.raises(TypeError): - interact_in_memory(server, client) - assert advertise_args == [ - (server,), - ] - - def test_npn_advertise_error(self): - """ - Test that we can handle exceptions in the advertise callback. If - advertise fails no NPN is advertised to the client. - """ - select_args = [] - - def advertise(conn): - raise TypeError - - def select(conn, options): # pragma: nocover - """ - Assert later that no args are actually appended. - """ - select_args.append((conn, options)) - return b"" - - server_context = Context(SSLv23_METHOD) - server_context.set_npn_advertise_callback(advertise) - - client_context = Context(SSLv23_METHOD) - client_context.set_npn_select_callback(select) - - # Necessary to actually accept the connection - server_context.use_privatekey( - load_privatekey(FILETYPE_PEM, server_key_pem) - ) - server_context.use_certificate( - load_certificate(FILETYPE_PEM, server_cert_pem) - ) - - # Do a little connection to trigger the logic - server = Connection(server_context, None) - server.set_accept_state() - - client = Connection(client_context, None) - client.set_connect_state() - - # If the client doesn't return anything, the connection will fail. - with pytest.raises(TypeError): - interact_in_memory(server, client) - assert select_args == [] - - class TestApplicationLayerProtoNegotiation(object): """ Tests for ALPN in PyOpenSSL. -- cgit v1.2.1