diff options
Diffstat (limited to 'tests/test_ssl.py')
-rw-r--r-- | tests/test_ssl.py | 395 |
1 files changed, 187 insertions, 208 deletions
diff --git a/tests/test_ssl.py b/tests/test_ssl.py index 9a3f294..50e2026 100644 --- a/tests/test_ssl.py +++ b/tests/test_ssl.py @@ -1846,277 +1846,256 @@ class TestApplicationLayerProtoNegotiation(object): """ Tests for ALPN in PyOpenSSL. """ - # Skip tests on versions that don't support ALPN. - if _lib.Cryptography_HAS_ALPN: - - def test_alpn_success(self): - """ - Clients and servers that agree on the negotiated ALPN protocol can - correct establish a connection, and the agreed protocol is reported - by the connections. - """ - select_args = [] - - def select(conn, options): - select_args.append((conn, options)) - return b'spdy/2' + def test_alpn_success(self): + """ + Clients and servers that agree on the negotiated ALPN protocol can + correct establish a connection, and the agreed protocol is reported + by the connections. + """ + select_args = [] - client_context = Context(TLSv1_METHOD) - client_context.set_alpn_protos([b'http/1.1', b'spdy/2']) + def select(conn, options): + select_args.append((conn, options)) + return b'spdy/2' - server_context = Context(TLSv1_METHOD) - server_context.set_alpn_select_callback(select) + client_context = Context(TLSv1_METHOD) + client_context.set_alpn_protos([b'http/1.1', b'spdy/2']) - # Necessary to actually accept the connection - server_context.use_privatekey( - load_privatekey(FILETYPE_PEM, server_key_pem)) - server_context.use_certificate( - load_certificate(FILETYPE_PEM, server_cert_pem)) + server_context = Context(TLSv1_METHOD) + server_context.set_alpn_select_callback(select) - # Do a little connection to trigger the logic - server = Connection(server_context, None) - server.set_accept_state() + # Necessary to actually accept the connection + server_context.use_privatekey( + load_privatekey(FILETYPE_PEM, server_key_pem)) + server_context.use_certificate( + load_certificate(FILETYPE_PEM, server_cert_pem)) - client = Connection(client_context, None) - client.set_connect_state() + # Do a little connection to trigger the logic + server = Connection(server_context, None) + server.set_accept_state() - interact_in_memory(server, client) + client = Connection(client_context, None) + client.set_connect_state() - assert select_args == [(server, [b'http/1.1', b'spdy/2'])] + interact_in_memory(server, client) - assert server.get_alpn_proto_negotiated() == b'spdy/2' - assert client.get_alpn_proto_negotiated() == b'spdy/2' + assert select_args == [(server, [b'http/1.1', b'spdy/2'])] - def test_alpn_set_on_connection(self): - """ - The same as test_alpn_success, but setting the ALPN protocols on - the connection rather than the context. - """ - select_args = [] + assert server.get_alpn_proto_negotiated() == b'spdy/2' + assert client.get_alpn_proto_negotiated() == b'spdy/2' - def select(conn, options): - select_args.append((conn, options)) - return b'spdy/2' + def test_alpn_set_on_connection(self): + """ + The same as test_alpn_success, but setting the ALPN protocols on + the connection rather than the context. + """ + select_args = [] - # Setup the client context but don't set any ALPN protocols. - client_context = Context(TLSv1_METHOD) + def select(conn, options): + select_args.append((conn, options)) + return b'spdy/2' - server_context = Context(TLSv1_METHOD) - server_context.set_alpn_select_callback(select) + # Setup the client context but don't set any ALPN protocols. + client_context = Context(TLSv1_METHOD) - # Necessary to actually accept the connection - server_context.use_privatekey( - load_privatekey(FILETYPE_PEM, server_key_pem)) - server_context.use_certificate( - load_certificate(FILETYPE_PEM, server_cert_pem)) + server_context = Context(TLSv1_METHOD) + server_context.set_alpn_select_callback(select) - # Do a little connection to trigger the logic - server = Connection(server_context, None) - server.set_accept_state() + # Necessary to actually accept the connection + server_context.use_privatekey( + load_privatekey(FILETYPE_PEM, server_key_pem)) + server_context.use_certificate( + load_certificate(FILETYPE_PEM, server_cert_pem)) - # Set the ALPN protocols on the client connection. - client = Connection(client_context, None) - client.set_alpn_protos([b'http/1.1', b'spdy/2']) - client.set_connect_state() + # Do a little connection to trigger the logic + server = Connection(server_context, None) + server.set_accept_state() - interact_in_memory(server, client) + # Set the ALPN protocols on the client connection. + client = Connection(client_context, None) + client.set_alpn_protos([b'http/1.1', b'spdy/2']) + client.set_connect_state() - assert select_args == [(server, [b'http/1.1', b'spdy/2'])] + interact_in_memory(server, client) - assert server.get_alpn_proto_negotiated() == b'spdy/2' - assert client.get_alpn_proto_negotiated() == b'spdy/2' + assert select_args == [(server, [b'http/1.1', b'spdy/2'])] - def test_alpn_server_fail(self): - """ - When clients and servers cannot agree on what protocol to use next - the TLS connection does not get established. - """ - select_args = [] + assert server.get_alpn_proto_negotiated() == b'spdy/2' + assert client.get_alpn_proto_negotiated() == b'spdy/2' - def select(conn, options): - select_args.append((conn, options)) - return b'' + def test_alpn_server_fail(self): + """ + When clients and servers cannot agree on what protocol to use next + the TLS connection does not get established. + """ + select_args = [] - client_context = Context(TLSv1_METHOD) - client_context.set_alpn_protos([b'http/1.1', b'spdy/2']) + def select(conn, options): + select_args.append((conn, options)) + return b'' - server_context = Context(TLSv1_METHOD) - server_context.set_alpn_select_callback(select) + client_context = Context(TLSv1_METHOD) + client_context.set_alpn_protos([b'http/1.1', b'spdy/2']) - # Necessary to actually accept the connection - server_context.use_privatekey( - load_privatekey(FILETYPE_PEM, server_key_pem)) - server_context.use_certificate( - load_certificate(FILETYPE_PEM, server_cert_pem)) + server_context = Context(TLSv1_METHOD) + server_context.set_alpn_select_callback(select) - # Do a little connection to trigger the logic - server = Connection(server_context, None) - server.set_accept_state() + # Necessary to actually accept the connection + server_context.use_privatekey( + load_privatekey(FILETYPE_PEM, server_key_pem)) + server_context.use_certificate( + load_certificate(FILETYPE_PEM, server_cert_pem)) - client = Connection(client_context, None) - client.set_connect_state() + # Do a little connection to trigger the logic + server = Connection(server_context, None) + server.set_accept_state() - # If the client doesn't return anything, the connection will fail. - with pytest.raises(Error): - interact_in_memory(server, client) + client = Connection(client_context, None) + client.set_connect_state() - assert select_args == [(server, [b'http/1.1', b'spdy/2'])] + # If the client doesn't return anything, the connection will fail. + with pytest.raises(Error): + interact_in_memory(server, client) - def test_alpn_no_server_overlap(self): - """ - A server can allow a TLS handshake to complete without - agreeing to an application protocol by returning - ``NO_OVERLAPPING_PROTOCOLS``. - """ - refusal_args = [] + assert select_args == [(server, [b'http/1.1', b'spdy/2'])] - def refusal(conn, options): - refusal_args.append((conn, options)) - return NO_OVERLAPPING_PROTOCOLS + def test_alpn_no_server_overlap(self): + """ + A server can allow a TLS handshake to complete without + agreeing to an application protocol by returning + ``NO_OVERLAPPING_PROTOCOLS``. + """ + refusal_args = [] - client_context = Context(SSLv23_METHOD) - client_context.set_alpn_protos([b'http/1.1', b'spdy/2']) + def refusal(conn, options): + refusal_args.append((conn, options)) + return NO_OVERLAPPING_PROTOCOLS - server_context = Context(SSLv23_METHOD) - server_context.set_alpn_select_callback(refusal) + client_context = Context(SSLv23_METHOD) + client_context.set_alpn_protos([b'http/1.1', b'spdy/2']) - # Necessary to actually accept the connection - server_context.use_privatekey( - load_privatekey(FILETYPE_PEM, server_key_pem)) - server_context.use_certificate( - load_certificate(FILETYPE_PEM, server_cert_pem)) + server_context = Context(SSLv23_METHOD) + server_context.set_alpn_select_callback(refusal) - # Do a little connection to trigger the logic - server = Connection(server_context, None) - server.set_accept_state() + # Necessary to actually accept the connection + server_context.use_privatekey( + load_privatekey(FILETYPE_PEM, server_key_pem)) + server_context.use_certificate( + load_certificate(FILETYPE_PEM, server_cert_pem)) - client = Connection(client_context, None) - client.set_connect_state() + # Do a little connection to trigger the logic + server = Connection(server_context, None) + server.set_accept_state() - # Do the dance. - interact_in_memory(server, client) + client = Connection(client_context, None) + client.set_connect_state() - assert refusal_args == [(server, [b'http/1.1', b'spdy/2'])] + # Do the dance. + interact_in_memory(server, client) - assert client.get_alpn_proto_negotiated() == b'' + assert refusal_args == [(server, [b'http/1.1', b'spdy/2'])] - def test_alpn_select_cb_returns_invalid_value(self): - """ - If the ALPN selection callback returns anything other than - a bytestring or ``NO_OVERLAPPING_PROTOCOLS``, a - :py:exc:`TypeError` is raised. - """ - invalid_cb_args = [] + assert client.get_alpn_proto_negotiated() == b'' - def invalid_cb(conn, options): - invalid_cb_args.append((conn, options)) - return u"can't return unicode" + def test_alpn_select_cb_returns_invalid_value(self): + """ + If the ALPN selection callback returns anything other than + a bytestring or ``NO_OVERLAPPING_PROTOCOLS``, a + :py:exc:`TypeError` is raised. + """ + invalid_cb_args = [] - client_context = Context(SSLv23_METHOD) - client_context.set_alpn_protos([b'http/1.1', b'spdy/2']) + def invalid_cb(conn, options): + invalid_cb_args.append((conn, options)) + return u"can't return unicode" - server_context = Context(SSLv23_METHOD) - server_context.set_alpn_select_callback(invalid_cb) + client_context = Context(SSLv23_METHOD) + client_context.set_alpn_protos([b'http/1.1', b'spdy/2']) - # Necessary to actually accept the connection - server_context.use_privatekey( - load_privatekey(FILETYPE_PEM, server_key_pem)) - server_context.use_certificate( - load_certificate(FILETYPE_PEM, server_cert_pem)) + server_context = Context(SSLv23_METHOD) + server_context.set_alpn_select_callback(invalid_cb) - # Do a little connection to trigger the logic - server = Connection(server_context, None) - server.set_accept_state() + # Necessary to actually accept the connection + server_context.use_privatekey( + load_privatekey(FILETYPE_PEM, server_key_pem)) + server_context.use_certificate( + load_certificate(FILETYPE_PEM, server_cert_pem)) - client = Connection(client_context, None) - client.set_connect_state() + # Do a little connection to trigger the logic + server = Connection(server_context, None) + server.set_accept_state() - # Do the dance. - with pytest.raises(TypeError): - interact_in_memory(server, client) + client = Connection(client_context, None) + client.set_connect_state() - assert invalid_cb_args == [(server, [b'http/1.1', b'spdy/2'])] + # Do the dance. + with pytest.raises(TypeError): + interact_in_memory(server, client) - assert client.get_alpn_proto_negotiated() == b'' + assert invalid_cb_args == [(server, [b'http/1.1', b'spdy/2'])] - def test_alpn_no_server(self): - """ - When clients and servers cannot agree on what protocol to use next - because the server doesn't offer ALPN, no protocol is negotiated. - """ - client_context = Context(TLSv1_METHOD) - client_context.set_alpn_protos([b'http/1.1', b'spdy/2']) + assert client.get_alpn_proto_negotiated() == b'' - server_context = Context(TLSv1_METHOD) + def test_alpn_no_server(self): + """ + When clients and servers cannot agree on what protocol to use next + because the server doesn't offer ALPN, no protocol is negotiated. + """ + client_context = Context(TLSv1_METHOD) + client_context.set_alpn_protos([b'http/1.1', b'spdy/2']) - # Necessary to actually accept the connection - server_context.use_privatekey( - load_privatekey(FILETYPE_PEM, server_key_pem)) - server_context.use_certificate( - load_certificate(FILETYPE_PEM, server_cert_pem)) + server_context = Context(TLSv1_METHOD) - # Do a little connection to trigger the logic - server = Connection(server_context, None) - server.set_accept_state() + # Necessary to actually accept the connection + server_context.use_privatekey( + load_privatekey(FILETYPE_PEM, server_key_pem)) + server_context.use_certificate( + load_certificate(FILETYPE_PEM, server_cert_pem)) - client = Connection(client_context, None) - client.set_connect_state() + # Do a little connection to trigger the logic + server = Connection(server_context, None) + server.set_accept_state() - # Do the dance. - interact_in_memory(server, client) + client = Connection(client_context, None) + client.set_connect_state() - assert client.get_alpn_proto_negotiated() == b'' + # Do the dance. + interact_in_memory(server, client) - def test_alpn_callback_exception(self): - """ - We can handle exceptions in the ALPN select callback. - """ - select_args = [] + assert client.get_alpn_proto_negotiated() == b'' - def select(conn, options): - select_args.append((conn, options)) - raise TypeError() + def test_alpn_callback_exception(self): + """ + We can handle exceptions in the ALPN select callback. + """ + select_args = [] - client_context = Context(TLSv1_METHOD) - client_context.set_alpn_protos([b'http/1.1', b'spdy/2']) + def select(conn, options): + select_args.append((conn, options)) + raise TypeError() - server_context = Context(TLSv1_METHOD) - server_context.set_alpn_select_callback(select) + client_context = Context(TLSv1_METHOD) + client_context.set_alpn_protos([b'http/1.1', b'spdy/2']) - # Necessary to actually accept the connection - server_context.use_privatekey( - load_privatekey(FILETYPE_PEM, server_key_pem)) - server_context.use_certificate( - load_certificate(FILETYPE_PEM, server_cert_pem)) + server_context = Context(TLSv1_METHOD) + server_context.set_alpn_select_callback(select) - # Do a little connection to trigger the logic - server = Connection(server_context, None) - server.set_accept_state() + # Necessary to actually accept the connection + server_context.use_privatekey( + load_privatekey(FILETYPE_PEM, server_key_pem)) + server_context.use_certificate( + load_certificate(FILETYPE_PEM, server_cert_pem)) - client = Connection(client_context, None) - client.set_connect_state() + # Do a little connection to trigger the logic + server = Connection(server_context, None) + server.set_accept_state() - with pytest.raises(TypeError): - interact_in_memory(server, client) - assert select_args == [(server, [b'http/1.1', b'spdy/2'])] + client = Connection(client_context, None) + client.set_connect_state() - else: - # No ALPN. - def test_alpn_not_implemented(self): - """ - If ALPN is not in OpenSSL, we should raise NotImplementedError. - """ - # Test the context methods first. - context = Context(TLSv1_METHOD) - with pytest.raises(NotImplementedError): - context.set_alpn_protos(None) - with pytest.raises(NotImplementedError): - context.set_alpn_select_callback(None) - - # Now test a connection. - conn = Connection(context) - with pytest.raises(NotImplementedError): - conn.set_alpn_protos(None) + with pytest.raises(TypeError): + interact_in_memory(server, client) + assert select_args == [(server, [b'http/1.1', b'spdy/2'])] class TestSession(object): |