summaryrefslogtreecommitdiff
path: root/tests/test_ssl.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_ssl.py')
-rw-r--r--tests/test_ssl.py395
1 files changed, 187 insertions, 208 deletions
diff --git a/tests/test_ssl.py b/tests/test_ssl.py
index 9a3f294..50e2026 100644
--- a/tests/test_ssl.py
+++ b/tests/test_ssl.py
@@ -1846,277 +1846,256 @@ class TestApplicationLayerProtoNegotiation(object):
"""
Tests for ALPN in PyOpenSSL.
"""
- # Skip tests on versions that don't support ALPN.
- if _lib.Cryptography_HAS_ALPN:
-
- def test_alpn_success(self):
- """
- Clients and servers that agree on the negotiated ALPN protocol can
- correct establish a connection, and the agreed protocol is reported
- by the connections.
- """
- select_args = []
-
- def select(conn, options):
- select_args.append((conn, options))
- return b'spdy/2'
+ def test_alpn_success(self):
+ """
+ Clients and servers that agree on the negotiated ALPN protocol can
+ correct establish a connection, and the agreed protocol is reported
+ by the connections.
+ """
+ select_args = []
- client_context = Context(TLSv1_METHOD)
- client_context.set_alpn_protos([b'http/1.1', b'spdy/2'])
+ def select(conn, options):
+ select_args.append((conn, options))
+ return b'spdy/2'
- server_context = Context(TLSv1_METHOD)
- server_context.set_alpn_select_callback(select)
+ client_context = Context(TLSv1_METHOD)
+ client_context.set_alpn_protos([b'http/1.1', b'spdy/2'])
- # Necessary to actually accept the connection
- server_context.use_privatekey(
- load_privatekey(FILETYPE_PEM, server_key_pem))
- server_context.use_certificate(
- load_certificate(FILETYPE_PEM, server_cert_pem))
+ server_context = Context(TLSv1_METHOD)
+ server_context.set_alpn_select_callback(select)
- # Do a little connection to trigger the logic
- server = Connection(server_context, None)
- server.set_accept_state()
+ # Necessary to actually accept the connection
+ server_context.use_privatekey(
+ load_privatekey(FILETYPE_PEM, server_key_pem))
+ server_context.use_certificate(
+ load_certificate(FILETYPE_PEM, server_cert_pem))
- client = Connection(client_context, None)
- client.set_connect_state()
+ # Do a little connection to trigger the logic
+ server = Connection(server_context, None)
+ server.set_accept_state()
- interact_in_memory(server, client)
+ client = Connection(client_context, None)
+ client.set_connect_state()
- assert select_args == [(server, [b'http/1.1', b'spdy/2'])]
+ interact_in_memory(server, client)
- assert server.get_alpn_proto_negotiated() == b'spdy/2'
- assert client.get_alpn_proto_negotiated() == b'spdy/2'
+ assert select_args == [(server, [b'http/1.1', b'spdy/2'])]
- def test_alpn_set_on_connection(self):
- """
- The same as test_alpn_success, but setting the ALPN protocols on
- the connection rather than the context.
- """
- select_args = []
+ assert server.get_alpn_proto_negotiated() == b'spdy/2'
+ assert client.get_alpn_proto_negotiated() == b'spdy/2'
- def select(conn, options):
- select_args.append((conn, options))
- return b'spdy/2'
+ def test_alpn_set_on_connection(self):
+ """
+ The same as test_alpn_success, but setting the ALPN protocols on
+ the connection rather than the context.
+ """
+ select_args = []
- # Setup the client context but don't set any ALPN protocols.
- client_context = Context(TLSv1_METHOD)
+ def select(conn, options):
+ select_args.append((conn, options))
+ return b'spdy/2'
- server_context = Context(TLSv1_METHOD)
- server_context.set_alpn_select_callback(select)
+ # Setup the client context but don't set any ALPN protocols.
+ client_context = Context(TLSv1_METHOD)
- # Necessary to actually accept the connection
- server_context.use_privatekey(
- load_privatekey(FILETYPE_PEM, server_key_pem))
- server_context.use_certificate(
- load_certificate(FILETYPE_PEM, server_cert_pem))
+ server_context = Context(TLSv1_METHOD)
+ server_context.set_alpn_select_callback(select)
- # Do a little connection to trigger the logic
- server = Connection(server_context, None)
- server.set_accept_state()
+ # Necessary to actually accept the connection
+ server_context.use_privatekey(
+ load_privatekey(FILETYPE_PEM, server_key_pem))
+ server_context.use_certificate(
+ load_certificate(FILETYPE_PEM, server_cert_pem))
- # Set the ALPN protocols on the client connection.
- client = Connection(client_context, None)
- client.set_alpn_protos([b'http/1.1', b'spdy/2'])
- client.set_connect_state()
+ # Do a little connection to trigger the logic
+ server = Connection(server_context, None)
+ server.set_accept_state()
- interact_in_memory(server, client)
+ # Set the ALPN protocols on the client connection.
+ client = Connection(client_context, None)
+ client.set_alpn_protos([b'http/1.1', b'spdy/2'])
+ client.set_connect_state()
- assert select_args == [(server, [b'http/1.1', b'spdy/2'])]
+ interact_in_memory(server, client)
- assert server.get_alpn_proto_negotiated() == b'spdy/2'
- assert client.get_alpn_proto_negotiated() == b'spdy/2'
+ assert select_args == [(server, [b'http/1.1', b'spdy/2'])]
- def test_alpn_server_fail(self):
- """
- When clients and servers cannot agree on what protocol to use next
- the TLS connection does not get established.
- """
- select_args = []
+ assert server.get_alpn_proto_negotiated() == b'spdy/2'
+ assert client.get_alpn_proto_negotiated() == b'spdy/2'
- def select(conn, options):
- select_args.append((conn, options))
- return b''
+ def test_alpn_server_fail(self):
+ """
+ When clients and servers cannot agree on what protocol to use next
+ the TLS connection does not get established.
+ """
+ select_args = []
- client_context = Context(TLSv1_METHOD)
- client_context.set_alpn_protos([b'http/1.1', b'spdy/2'])
+ def select(conn, options):
+ select_args.append((conn, options))
+ return b''
- server_context = Context(TLSv1_METHOD)
- server_context.set_alpn_select_callback(select)
+ client_context = Context(TLSv1_METHOD)
+ client_context.set_alpn_protos([b'http/1.1', b'spdy/2'])
- # Necessary to actually accept the connection
- server_context.use_privatekey(
- load_privatekey(FILETYPE_PEM, server_key_pem))
- server_context.use_certificate(
- load_certificate(FILETYPE_PEM, server_cert_pem))
+ server_context = Context(TLSv1_METHOD)
+ server_context.set_alpn_select_callback(select)
- # Do a little connection to trigger the logic
- server = Connection(server_context, None)
- server.set_accept_state()
+ # Necessary to actually accept the connection
+ server_context.use_privatekey(
+ load_privatekey(FILETYPE_PEM, server_key_pem))
+ server_context.use_certificate(
+ load_certificate(FILETYPE_PEM, server_cert_pem))
- client = Connection(client_context, None)
- client.set_connect_state()
+ # Do a little connection to trigger the logic
+ server = Connection(server_context, None)
+ server.set_accept_state()
- # If the client doesn't return anything, the connection will fail.
- with pytest.raises(Error):
- interact_in_memory(server, client)
+ client = Connection(client_context, None)
+ client.set_connect_state()
- assert select_args == [(server, [b'http/1.1', b'spdy/2'])]
+ # If the client doesn't return anything, the connection will fail.
+ with pytest.raises(Error):
+ interact_in_memory(server, client)
- def test_alpn_no_server_overlap(self):
- """
- A server can allow a TLS handshake to complete without
- agreeing to an application protocol by returning
- ``NO_OVERLAPPING_PROTOCOLS``.
- """
- refusal_args = []
+ assert select_args == [(server, [b'http/1.1', b'spdy/2'])]
- def refusal(conn, options):
- refusal_args.append((conn, options))
- return NO_OVERLAPPING_PROTOCOLS
+ def test_alpn_no_server_overlap(self):
+ """
+ A server can allow a TLS handshake to complete without
+ agreeing to an application protocol by returning
+ ``NO_OVERLAPPING_PROTOCOLS``.
+ """
+ refusal_args = []
- client_context = Context(SSLv23_METHOD)
- client_context.set_alpn_protos([b'http/1.1', b'spdy/2'])
+ def refusal(conn, options):
+ refusal_args.append((conn, options))
+ return NO_OVERLAPPING_PROTOCOLS
- server_context = Context(SSLv23_METHOD)
- server_context.set_alpn_select_callback(refusal)
+ client_context = Context(SSLv23_METHOD)
+ client_context.set_alpn_protos([b'http/1.1', b'spdy/2'])
- # Necessary to actually accept the connection
- server_context.use_privatekey(
- load_privatekey(FILETYPE_PEM, server_key_pem))
- server_context.use_certificate(
- load_certificate(FILETYPE_PEM, server_cert_pem))
+ server_context = Context(SSLv23_METHOD)
+ server_context.set_alpn_select_callback(refusal)
- # Do a little connection to trigger the logic
- server = Connection(server_context, None)
- server.set_accept_state()
+ # Necessary to actually accept the connection
+ server_context.use_privatekey(
+ load_privatekey(FILETYPE_PEM, server_key_pem))
+ server_context.use_certificate(
+ load_certificate(FILETYPE_PEM, server_cert_pem))
- client = Connection(client_context, None)
- client.set_connect_state()
+ # Do a little connection to trigger the logic
+ server = Connection(server_context, None)
+ server.set_accept_state()
- # Do the dance.
- interact_in_memory(server, client)
+ client = Connection(client_context, None)
+ client.set_connect_state()
- assert refusal_args == [(server, [b'http/1.1', b'spdy/2'])]
+ # Do the dance.
+ interact_in_memory(server, client)
- assert client.get_alpn_proto_negotiated() == b''
+ assert refusal_args == [(server, [b'http/1.1', b'spdy/2'])]
- def test_alpn_select_cb_returns_invalid_value(self):
- """
- If the ALPN selection callback returns anything other than
- a bytestring or ``NO_OVERLAPPING_PROTOCOLS``, a
- :py:exc:`TypeError` is raised.
- """
- invalid_cb_args = []
+ assert client.get_alpn_proto_negotiated() == b''
- def invalid_cb(conn, options):
- invalid_cb_args.append((conn, options))
- return u"can't return unicode"
+ def test_alpn_select_cb_returns_invalid_value(self):
+ """
+ If the ALPN selection callback returns anything other than
+ a bytestring or ``NO_OVERLAPPING_PROTOCOLS``, a
+ :py:exc:`TypeError` is raised.
+ """
+ invalid_cb_args = []
- client_context = Context(SSLv23_METHOD)
- client_context.set_alpn_protos([b'http/1.1', b'spdy/2'])
+ def invalid_cb(conn, options):
+ invalid_cb_args.append((conn, options))
+ return u"can't return unicode"
- server_context = Context(SSLv23_METHOD)
- server_context.set_alpn_select_callback(invalid_cb)
+ client_context = Context(SSLv23_METHOD)
+ client_context.set_alpn_protos([b'http/1.1', b'spdy/2'])
- # Necessary to actually accept the connection
- server_context.use_privatekey(
- load_privatekey(FILETYPE_PEM, server_key_pem))
- server_context.use_certificate(
- load_certificate(FILETYPE_PEM, server_cert_pem))
+ server_context = Context(SSLv23_METHOD)
+ server_context.set_alpn_select_callback(invalid_cb)
- # Do a little connection to trigger the logic
- server = Connection(server_context, None)
- server.set_accept_state()
+ # Necessary to actually accept the connection
+ server_context.use_privatekey(
+ load_privatekey(FILETYPE_PEM, server_key_pem))
+ server_context.use_certificate(
+ load_certificate(FILETYPE_PEM, server_cert_pem))
- client = Connection(client_context, None)
- client.set_connect_state()
+ # Do a little connection to trigger the logic
+ server = Connection(server_context, None)
+ server.set_accept_state()
- # Do the dance.
- with pytest.raises(TypeError):
- interact_in_memory(server, client)
+ client = Connection(client_context, None)
+ client.set_connect_state()
- assert invalid_cb_args == [(server, [b'http/1.1', b'spdy/2'])]
+ # Do the dance.
+ with pytest.raises(TypeError):
+ interact_in_memory(server, client)
- assert client.get_alpn_proto_negotiated() == b''
+ assert invalid_cb_args == [(server, [b'http/1.1', b'spdy/2'])]
- def test_alpn_no_server(self):
- """
- When clients and servers cannot agree on what protocol to use next
- because the server doesn't offer ALPN, no protocol is negotiated.
- """
- client_context = Context(TLSv1_METHOD)
- client_context.set_alpn_protos([b'http/1.1', b'spdy/2'])
+ assert client.get_alpn_proto_negotiated() == b''
- server_context = Context(TLSv1_METHOD)
+ def test_alpn_no_server(self):
+ """
+ When clients and servers cannot agree on what protocol to use next
+ because the server doesn't offer ALPN, no protocol is negotiated.
+ """
+ client_context = Context(TLSv1_METHOD)
+ client_context.set_alpn_protos([b'http/1.1', b'spdy/2'])
- # Necessary to actually accept the connection
- server_context.use_privatekey(
- load_privatekey(FILETYPE_PEM, server_key_pem))
- server_context.use_certificate(
- load_certificate(FILETYPE_PEM, server_cert_pem))
+ server_context = Context(TLSv1_METHOD)
- # Do a little connection to trigger the logic
- server = Connection(server_context, None)
- server.set_accept_state()
+ # Necessary to actually accept the connection
+ server_context.use_privatekey(
+ load_privatekey(FILETYPE_PEM, server_key_pem))
+ server_context.use_certificate(
+ load_certificate(FILETYPE_PEM, server_cert_pem))
- client = Connection(client_context, None)
- client.set_connect_state()
+ # Do a little connection to trigger the logic
+ server = Connection(server_context, None)
+ server.set_accept_state()
- # Do the dance.
- interact_in_memory(server, client)
+ client = Connection(client_context, None)
+ client.set_connect_state()
- assert client.get_alpn_proto_negotiated() == b''
+ # Do the dance.
+ interact_in_memory(server, client)
- def test_alpn_callback_exception(self):
- """
- We can handle exceptions in the ALPN select callback.
- """
- select_args = []
+ assert client.get_alpn_proto_negotiated() == b''
- def select(conn, options):
- select_args.append((conn, options))
- raise TypeError()
+ def test_alpn_callback_exception(self):
+ """
+ We can handle exceptions in the ALPN select callback.
+ """
+ select_args = []
- client_context = Context(TLSv1_METHOD)
- client_context.set_alpn_protos([b'http/1.1', b'spdy/2'])
+ def select(conn, options):
+ select_args.append((conn, options))
+ raise TypeError()
- server_context = Context(TLSv1_METHOD)
- server_context.set_alpn_select_callback(select)
+ client_context = Context(TLSv1_METHOD)
+ client_context.set_alpn_protos([b'http/1.1', b'spdy/2'])
- # Necessary to actually accept the connection
- server_context.use_privatekey(
- load_privatekey(FILETYPE_PEM, server_key_pem))
- server_context.use_certificate(
- load_certificate(FILETYPE_PEM, server_cert_pem))
+ server_context = Context(TLSv1_METHOD)
+ server_context.set_alpn_select_callback(select)
- # Do a little connection to trigger the logic
- server = Connection(server_context, None)
- server.set_accept_state()
+ # Necessary to actually accept the connection
+ server_context.use_privatekey(
+ load_privatekey(FILETYPE_PEM, server_key_pem))
+ server_context.use_certificate(
+ load_certificate(FILETYPE_PEM, server_cert_pem))
- client = Connection(client_context, None)
- client.set_connect_state()
+ # Do a little connection to trigger the logic
+ server = Connection(server_context, None)
+ server.set_accept_state()
- with pytest.raises(TypeError):
- interact_in_memory(server, client)
- assert select_args == [(server, [b'http/1.1', b'spdy/2'])]
+ client = Connection(client_context, None)
+ client.set_connect_state()
- else:
- # No ALPN.
- def test_alpn_not_implemented(self):
- """
- If ALPN is not in OpenSSL, we should raise NotImplementedError.
- """
- # Test the context methods first.
- context = Context(TLSv1_METHOD)
- with pytest.raises(NotImplementedError):
- context.set_alpn_protos(None)
- with pytest.raises(NotImplementedError):
- context.set_alpn_select_callback(None)
-
- # Now test a connection.
- conn = Connection(context)
- with pytest.raises(NotImplementedError):
- conn.set_alpn_protos(None)
+ with pytest.raises(TypeError):
+ interact_in_memory(server, client)
+ assert select_args == [(server, [b'http/1.1', b'spdy/2'])]
class TestSession(object):