summaryrefslogtreecommitdiff
path: root/Lib/test/test_ssl.py
diff options
context:
space:
mode:
authorChristian Heimes <christian@python.org>2016-09-10 23:44:53 +0200
committerChristian Heimes <christian@python.org>2016-09-10 23:44:53 +0200
commit7f85dccc70da5218cbd16e4c0a35e9b7cf349276 (patch)
tree4f62e41d86e10267b4c48d4f18572bed91a64d0a /Lib/test/test_ssl.py
parentde56c23b9a770f4872dd5a89478b0b334cc7de86 (diff)
downloadcpython-7f85dccc70da5218cbd16e4c0a35e9b7cf349276.tar.gz
Issue #19500: Add client-side SSL session resumption to the ssl module.
Diffstat (limited to 'Lib/test/test_ssl.py')
-rw-r--r--Lib/test/test_ssl.py112
1 files changed, 110 insertions, 2 deletions
diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py
index aed226c7e1..61744ae95a 100644
--- a/Lib/test/test_ssl.py
+++ b/Lib/test/test_ssl.py
@@ -2163,7 +2163,8 @@ if _have_threads:
self.server.close()
def server_params_test(client_context, server_context, indata=b"FOO\n",
- chatty=True, connectionchatty=False, sni_name=None):
+ chatty=True, connectionchatty=False, sni_name=None,
+ session=None):
"""
Launch a server, connect a client to it and try various reads
and writes.
@@ -2174,7 +2175,7 @@ if _have_threads:
connectionchatty=False)
with server:
with client_context.wrap_socket(socket.socket(),
- server_hostname=sni_name) as s:
+ server_hostname=sni_name, session=session) as s:
s.connect((HOST, server.port))
for arg in [indata, bytearray(indata), memoryview(indata)]:
if connectionchatty:
@@ -2202,6 +2203,8 @@ if _have_threads:
'client_alpn_protocol': s.selected_alpn_protocol(),
'client_npn_protocol': s.selected_npn_protocol(),
'version': s.version(),
+ 'session_reused': s.session_reused,
+ 'session': s.session,
})
s.close()
stats['server_alpn_protocols'] = server.selected_alpn_protocols
@@ -3412,6 +3415,111 @@ if _have_threads:
s.sendfile(file)
self.assertEqual(s.recv(1024), TEST_DATA)
+ def test_session(self):
+ server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+ server_context.load_cert_chain(SIGNED_CERTFILE)
+ client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+ client_context.verify_mode = ssl.CERT_REQUIRED
+ client_context.load_verify_locations(SIGNING_CA)
+
+ # first conncetion without session
+ stats = server_params_test(client_context, server_context)
+ session = stats['session']
+ self.assertTrue(session.id)
+ self.assertGreater(session.time, 0)
+ self.assertGreater(session.timeout, 0)
+ self.assertTrue(session.has_ticket)
+ if ssl.OPENSSL_VERSION_INFO > (1, 0, 1):
+ self.assertGreater(session.ticket_lifetime_hint, 0)
+ self.assertFalse(stats['session_reused'])
+ sess_stat = server_context.session_stats()
+ self.assertEqual(sess_stat['accept'], 1)
+ self.assertEqual(sess_stat['hits'], 0)
+
+ # reuse session
+ stats = server_params_test(client_context, server_context, session=session)
+ sess_stat = server_context.session_stats()
+ self.assertEqual(sess_stat['accept'], 2)
+ self.assertEqual(sess_stat['hits'], 1)
+ self.assertTrue(stats['session_reused'])
+ session2 = stats['session']
+ self.assertEqual(session2.id, session.id)
+ self.assertEqual(session2, session)
+ self.assertIsNot(session2, session)
+ self.assertGreaterEqual(session2.time, session.time)
+ self.assertGreaterEqual(session2.timeout, session.timeout)
+
+ # another one without session
+ stats = server_params_test(client_context, server_context)
+ self.assertFalse(stats['session_reused'])
+ session3 = stats['session']
+ self.assertNotEqual(session3.id, session.id)
+ self.assertNotEqual(session3, session)
+ sess_stat = server_context.session_stats()
+ self.assertEqual(sess_stat['accept'], 3)
+ self.assertEqual(sess_stat['hits'], 1)
+
+ # reuse session again
+ stats = server_params_test(client_context, server_context, session=session)
+ self.assertTrue(stats['session_reused'])
+ session4 = stats['session']
+ self.assertEqual(session4.id, session.id)
+ self.assertEqual(session4, session)
+ self.assertGreaterEqual(session4.time, session.time)
+ self.assertGreaterEqual(session4.timeout, session.timeout)
+ sess_stat = server_context.session_stats()
+ self.assertEqual(sess_stat['accept'], 4)
+ self.assertEqual(sess_stat['hits'], 2)
+
+ def test_session_handling(self):
+ context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+ context.verify_mode = ssl.CERT_REQUIRED
+ context.load_verify_locations(CERTFILE)
+ context.load_cert_chain(CERTFILE)
+
+ context2 = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+ context2.verify_mode = ssl.CERT_REQUIRED
+ context2.load_verify_locations(CERTFILE)
+ context2.load_cert_chain(CERTFILE)
+
+ server = ThreadedEchoServer(context=context, chatty=False)
+ with server:
+ with context.wrap_socket(socket.socket()) as s:
+ # session is None before handshake
+ self.assertEqual(s.session, None)
+ self.assertEqual(s.session_reused, None)
+ s.connect((HOST, server.port))
+ session = s.session
+ self.assertTrue(session)
+ with self.assertRaises(TypeError) as e:
+ s.session = object
+ self.assertEqual(str(e.exception), 'Value is not a SSLSession.')
+
+ with context.wrap_socket(socket.socket()) as s:
+ s.connect((HOST, server.port))
+ # cannot set session after handshake
+ with self.assertRaises(ValueError) as e:
+ s.session = session
+ self.assertEqual(str(e.exception),
+ 'Cannot set session after handshake.')
+
+ with context.wrap_socket(socket.socket()) as s:
+ # can set session before handshake and before the
+ # connection was established
+ s.session = session
+ s.connect((HOST, server.port))
+ self.assertEqual(s.session.id, session.id)
+ self.assertEqual(s.session, session)
+ self.assertEqual(s.session_reused, True)
+
+ with context2.wrap_socket(socket.socket()) as s:
+ # cannot re-use session with a different SSLContext
+ with self.assertRaises(ValueError) as e:
+ s.session = session
+ s.connect((HOST, server.port))
+ self.assertEqual(str(e.exception),
+ 'Session refers to a different SSLContext.')
+
def test_main(verbose=False):
if support.verbose: