From 8b5d969b2f06427f0460be47b20e0e2bc9400535 Mon Sep 17 00:00:00 2001 From: Christian Heimes Date: Tue, 6 Sep 2016 00:04:45 +0200 Subject: Issue #27866: Add SSLContext.get_ciphers() method to get a list of all enabled ciphers. --- Lib/test/test_ssl.py | 9 +++++++++ 1 file changed, 9 insertions(+) (limited to 'Lib/test/test_ssl.py') diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py index f6afa267c5..f19cf43336 100644 --- a/Lib/test/test_ssl.py +++ b/Lib/test/test_ssl.py @@ -834,6 +834,15 @@ class ContextTests(unittest.TestCase): with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"): ctx.set_ciphers("^$:,;?*'dorothyx") + @unittest.skipIf(ssl.OPENSSL_VERSION_INFO < (1, 0, 2, 0, 0), 'OpenSSL too old') + def test_get_ciphers(self): + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + ctx.set_ciphers('ECDHE+AESGCM:!ECDSA') + names = set(d['name'] for d in ctx.get_ciphers()) + self.assertEqual(names, + {'ECDHE-RSA-AES256-GCM-SHA384', + 'ECDHE-RSA-AES128-GCM-SHA256'}) + @skip_if_broken_ubuntu_ssl def test_options(self): ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) -- cgit v1.2.1 From 2b780402fed75e28ab5fd75bb9ff9ecefa270ca5 Mon Sep 17 00:00:00 2001 From: Christian Heimes Date: Tue, 6 Sep 2016 10:45:44 +0200 Subject: Issue 27866: relax test case for set_cipher() and allow more cipher suites --- Lib/test/test_ssl.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) (limited to 'Lib/test/test_ssl.py') diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py index f19cf43336..07fb1026a5 100644 --- a/Lib/test/test_ssl.py +++ b/Lib/test/test_ssl.py @@ -837,11 +837,10 @@ class ContextTests(unittest.TestCase): @unittest.skipIf(ssl.OPENSSL_VERSION_INFO < (1, 0, 2, 0, 0), 'OpenSSL too old') def test_get_ciphers(self): ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) - ctx.set_ciphers('ECDHE+AESGCM:!ECDSA') + ctx.set_ciphers('AESGCM') names = set(d['name'] for d in ctx.get_ciphers()) - self.assertEqual(names, - {'ECDHE-RSA-AES256-GCM-SHA384', - 'ECDHE-RSA-AES128-GCM-SHA256'}) + self.assertIn('ECDHE-RSA-AES256-GCM-SHA384', names) + self.assertIn('ECDHE-RSA-AES128-GCM-SHA256', names) @skip_if_broken_ubuntu_ssl def test_options(self): -- cgit v1.2.1 From e295df31d14122eb1c56300d2f2932e47195d8a9 Mon Sep 17 00:00:00 2001 From: Christian Heimes Date: Tue, 6 Sep 2016 11:27:25 +0200 Subject: Issue 27866: relax get_cipher() test even more. Gentoo buildbot has no ECDHE --- Lib/test/test_ssl.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'Lib/test/test_ssl.py') diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py index 07fb1026a5..4e0e4a2185 100644 --- a/Lib/test/test_ssl.py +++ b/Lib/test/test_ssl.py @@ -839,8 +839,8 @@ class ContextTests(unittest.TestCase): ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) ctx.set_ciphers('AESGCM') names = set(d['name'] for d in ctx.get_ciphers()) - self.assertIn('ECDHE-RSA-AES256-GCM-SHA384', names) - self.assertIn('ECDHE-RSA-AES128-GCM-SHA256', names) + self.assertIn('AES256-GCM-SHA384', names) + self.assertIn('AES128-GCM-SHA256', names) @skip_if_broken_ubuntu_ssl def test_options(self): -- cgit v1.2.1 From 03c8a473999b08710ae3661724cf1bf1d76bc555 Mon Sep 17 00:00:00 2001 From: R David Murray Date: Thu, 8 Sep 2016 13:59:53 -0400 Subject: #27364: fix "incorrect" uses of escape character in the stdlib. And most of the tools. Patch by Emanual Barry, reviewed by me, Serhiy Storchaka, and Martin Panter. --- Lib/test/test_ssl.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'Lib/test/test_ssl.py') diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py index d8d53af62b..0bdb86d5f2 100644 --- a/Lib/test/test_ssl.py +++ b/Lib/test/test_ssl.py @@ -3405,7 +3405,7 @@ def test_main(verbose=False): with warnings.catch_warnings(): warnings.filterwarnings( 'ignore', - 'dist\(\) and linux_distribution\(\) ' + r'dist\(\) and linux_distribution\(\) ' 'functions are deprecated .*', PendingDeprecationWarning, ) -- cgit v1.2.1 From 945180b06b9c2a5b60c68d3b950d29b13beada58 Mon Sep 17 00:00:00 2001 From: Christian Heimes Date: Sat, 10 Sep 2016 22:43:48 +0200 Subject: Issue 28043: SSLContext has improved default settings The options OP_NO_COMPRESSION, OP_CIPHER_SERVER_PREFERENCE, OP_SINGLE_DH_USE, OP_SINGLE_ECDH_USE, OP_NO_SSLv2 (except for PROTOCOL_SSLv2), and OP_NO_SSLv3 (except for PROTOCOL_SSLv3) are set by default. The initial cipher suite list contains only HIGH ciphers, no NULL ciphers and MD5 ciphers (except for PROTOCOL_SSLv2). --- Lib/test/test_ssl.py | 62 ++++++++++++++++++++++++++++------------------------ 1 file changed, 33 insertions(+), 29 deletions(-) (limited to 'Lib/test/test_ssl.py') diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py index 0bdb86d5f2..7488dc7baa 100644 --- a/Lib/test/test_ssl.py +++ b/Lib/test/test_ssl.py @@ -80,6 +80,12 @@ NULLBYTECERT = data_file("nullbytecert.pem") DHFILE = data_file("dh1024.pem") BYTES_DHFILE = os.fsencode(DHFILE) +# Not defined in all versions of OpenSSL +OP_NO_COMPRESSION = getattr(ssl, "OP_NO_COMPRESSION", 0) +OP_SINGLE_DH_USE = getattr(ssl, "OP_SINGLE_DH_USE", 0) +OP_SINGLE_ECDH_USE = getattr(ssl, "OP_SINGLE_ECDH_USE", 0) +OP_CIPHER_SERVER_PREFERENCE = getattr(ssl, "OP_CIPHER_SERVER_PREFERENCE", 0) + def handle_error(prefix): exc_format = ' '.join(traceback.format_exception(*sys.exc_info())) @@ -870,8 +876,9 @@ class ContextTests(unittest.TestCase): ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value default = (ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3) - if not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0): - default |= ssl.OP_NO_COMPRESSION + # SSLContext also enables these by default + default |= (OP_NO_COMPRESSION | OP_CIPHER_SERVER_PREFERENCE | + OP_SINGLE_DH_USE | OP_SINGLE_ECDH_USE) self.assertEqual(default, ctx.options) ctx.options |= ssl.OP_NO_TLSv1 self.assertEqual(default | ssl.OP_NO_TLSv1, ctx.options) @@ -1236,16 +1243,29 @@ class ContextTests(unittest.TestCase): stats["x509"] += 1 self.assertEqual(ctx.cert_store_stats(), stats) + def _assert_context_options(self, ctx): + self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2) + if OP_NO_COMPRESSION != 0: + self.assertEqual(ctx.options & OP_NO_COMPRESSION, + OP_NO_COMPRESSION) + if OP_SINGLE_DH_USE != 0: + self.assertEqual(ctx.options & OP_SINGLE_DH_USE, + OP_SINGLE_DH_USE) + if OP_SINGLE_ECDH_USE != 0: + self.assertEqual(ctx.options & OP_SINGLE_ECDH_USE, + OP_SINGLE_ECDH_USE) + if OP_CIPHER_SERVER_PREFERENCE != 0: + self.assertEqual(ctx.options & OP_CIPHER_SERVER_PREFERENCE, + OP_CIPHER_SERVER_PREFERENCE) + def test_create_default_context(self): ctx = ssl.create_default_context() + self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23) self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED) self.assertTrue(ctx.check_hostname) - self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2) - self.assertEqual( - ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0), - getattr(ssl, "OP_NO_COMPRESSION", 0), - ) + self._assert_context_options(ctx) + with open(SIGNING_CA) as f: cadata = f.read() @@ -1253,40 +1273,24 @@ class ContextTests(unittest.TestCase): cadata=cadata) self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23) self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED) - self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2) - self.assertEqual( - ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0), - getattr(ssl, "OP_NO_COMPRESSION", 0), - ) + self._assert_context_options(ctx) ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23) self.assertEqual(ctx.verify_mode, ssl.CERT_NONE) - self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2) - self.assertEqual( - ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0), - getattr(ssl, "OP_NO_COMPRESSION", 0), - ) - self.assertEqual( - ctx.options & getattr(ssl, "OP_SINGLE_DH_USE", 0), - getattr(ssl, "OP_SINGLE_DH_USE", 0), - ) - self.assertEqual( - ctx.options & getattr(ssl, "OP_SINGLE_ECDH_USE", 0), - getattr(ssl, "OP_SINGLE_ECDH_USE", 0), - ) + self._assert_context_options(ctx) def test__create_stdlib_context(self): ctx = ssl._create_stdlib_context() self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23) self.assertEqual(ctx.verify_mode, ssl.CERT_NONE) self.assertFalse(ctx.check_hostname) - self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2) + self._assert_context_options(ctx) ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1) self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1) self.assertEqual(ctx.verify_mode, ssl.CERT_NONE) - self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2) + self._assert_context_options(ctx) ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1, cert_reqs=ssl.CERT_REQUIRED, @@ -1294,12 +1298,12 @@ class ContextTests(unittest.TestCase): self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1) self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED) self.assertTrue(ctx.check_hostname) - self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2) + self._assert_context_options(ctx) ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH) self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23) self.assertEqual(ctx.verify_mode, ssl.CERT_NONE) - self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2) + self._assert_context_options(ctx) def test_check_hostname(self): ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) -- cgit v1.2.1 From de56c23b9a770f4872dd5a89478b0b334cc7de86 Mon Sep 17 00:00:00 2001 From: Christian Heimes Date: Sat, 10 Sep 2016 23:23:33 +0200 Subject: Issue #28022: Deprecate ssl-related arguments in favor of SSLContext. The deprecation include manual creation of SSLSocket and certfile/keyfile (or similar) in ftplib, httplib, imaplib, smtplib, poplib and urllib. ssl.wrap_socket() is not marked as deprecated yet. --- Lib/test/test_ssl.py | 87 ++++++++++++++++++++++++++++++---------------------- 1 file changed, 51 insertions(+), 36 deletions(-) (limited to 'Lib/test/test_ssl.py') diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py index 7488dc7baa..aed226c7e1 100644 --- a/Lib/test/test_ssl.py +++ b/Lib/test/test_ssl.py @@ -143,6 +143,21 @@ def skip_if_broken_ubuntu_ssl(func): needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test") +def test_wrap_socket(sock, ssl_version=ssl.PROTOCOL_TLS, *, + cert_reqs=ssl.CERT_NONE, ca_certs=None, + ciphers=None, certfile=None, keyfile=None, + **kwargs): + context = ssl.SSLContext(ssl_version) + if cert_reqs is not None: + context.verify_mode = cert_reqs + if ca_certs is not None: + context.load_verify_locations(ca_certs) + if certfile is not None or keyfile is not None: + context.load_cert_chain(certfile, keyfile) + if ciphers is not None: + context.set_ciphers(ciphers) + return context.wrap_socket(sock, **kwargs) + class BasicSocketTests(unittest.TestCase): def test_constants(self): @@ -363,7 +378,7 @@ class BasicSocketTests(unittest.TestCase): # Issue #7943: an SSL object doesn't create reference cycles with # itself. s = socket.socket(socket.AF_INET) - ss = ssl.wrap_socket(s) + ss = test_wrap_socket(s) wr = weakref.ref(ss) with support.check_warnings(("", ResourceWarning)): del ss @@ -373,7 +388,7 @@ class BasicSocketTests(unittest.TestCase): # Methods on an unconnected SSLSocket propagate the original # OSError raise by the underlying socket object. s = socket.socket(socket.AF_INET) - with ssl.wrap_socket(s) as ss: + with test_wrap_socket(s) as ss: self.assertRaises(OSError, ss.recv, 1) self.assertRaises(OSError, ss.recv_into, bytearray(b'x')) self.assertRaises(OSError, ss.recvfrom, 1) @@ -387,10 +402,10 @@ class BasicSocketTests(unittest.TestCase): for timeout in (None, 0.0, 5.0): s = socket.socket(socket.AF_INET) s.settimeout(timeout) - with ssl.wrap_socket(s) as ss: + with test_wrap_socket(s) as ss: self.assertEqual(timeout, ss.gettimeout()) - def test_errors(self): + def test_errors_sslwrap(self): sock = socket.socket() self.assertRaisesRegex(ValueError, "certfile must be specified", @@ -400,10 +415,10 @@ class BasicSocketTests(unittest.TestCase): ssl.wrap_socket, sock, server_side=True) self.assertRaisesRegex(ValueError, "certfile must be specified for server-side operations", - ssl.wrap_socket, sock, server_side=True, certfile="") + ssl.wrap_socket, sock, server_side=True, certfile="") with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s: self.assertRaisesRegex(ValueError, "can't connect in server-side mode", - s.connect, (HOST, 8080)) + s.connect, (HOST, 8080)) with self.assertRaises(OSError) as cm: with socket.socket() as sock: ssl.wrap_socket(sock, certfile=NONEXISTINGCERT) @@ -426,7 +441,7 @@ class BasicSocketTests(unittest.TestCase): sock = socket.socket() self.addCleanup(sock.close) with self.assertRaises(ssl.SSLError): - ssl.wrap_socket(sock, + test_wrap_socket(sock, certfile=certfile, ssl_version=ssl.PROTOCOL_TLSv1) @@ -613,7 +628,7 @@ class BasicSocketTests(unittest.TestCase): s.listen() c = socket.socket(socket.AF_INET) c.connect(s.getsockname()) - with ssl.wrap_socket(c, do_handshake_on_connect=False) as ss: + with test_wrap_socket(c, do_handshake_on_connect=False) as ss: with self.assertRaises(ValueError): ss.get_channel_binding("unknown-type") s.close() @@ -623,15 +638,15 @@ class BasicSocketTests(unittest.TestCase): def test_tls_unique_channel_binding(self): # unconnected should return None for known type s = socket.socket(socket.AF_INET) - with ssl.wrap_socket(s) as ss: + with test_wrap_socket(s) as ss: self.assertIsNone(ss.get_channel_binding("tls-unique")) # the same for server-side s = socket.socket(socket.AF_INET) - with ssl.wrap_socket(s, server_side=True, certfile=CERTFILE) as ss: + with test_wrap_socket(s, server_side=True, certfile=CERTFILE) as ss: self.assertIsNone(ss.get_channel_binding("tls-unique")) def test_dealloc_warn(self): - ss = ssl.wrap_socket(socket.socket(socket.AF_INET)) + ss = test_wrap_socket(socket.socket(socket.AF_INET)) r = repr(ss) with self.assertWarns(ResourceWarning) as cm: ss = None @@ -750,7 +765,7 @@ class BasicSocketTests(unittest.TestCase): s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.addCleanup(s.close) with self.assertRaises(NotImplementedError) as cx: - ssl.wrap_socket(s, cert_reqs=ssl.CERT_NONE) + test_wrap_socket(s, cert_reqs=ssl.CERT_NONE) self.assertEqual(str(cx.exception), "only stream sockets are supported") ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) with self.assertRaises(NotImplementedError) as cx: @@ -826,7 +841,7 @@ class BasicSocketTests(unittest.TestCase): server = socket.socket(socket.AF_INET) self.addCleanup(server.close) port = support.bind_port(server) # Reserve port but don't listen - s = ssl.wrap_socket(socket.socket(socket.AF_INET), + s = test_wrap_socket(socket.socket(socket.AF_INET), cert_reqs=ssl.CERT_REQUIRED) self.addCleanup(s.close) rc = s.connect_ex((HOST, port)) @@ -1444,13 +1459,13 @@ class SimpleBackgroundTests(unittest.TestCase): self.addCleanup(server.__exit__, None, None, None) def test_connect(self): - with ssl.wrap_socket(socket.socket(socket.AF_INET), + with test_wrap_socket(socket.socket(socket.AF_INET), cert_reqs=ssl.CERT_NONE) as s: s.connect(self.server_addr) self.assertEqual({}, s.getpeercert()) # this should succeed because we specify the root cert - with ssl.wrap_socket(socket.socket(socket.AF_INET), + with test_wrap_socket(socket.socket(socket.AF_INET), cert_reqs=ssl.CERT_REQUIRED, ca_certs=SIGNING_CA) as s: s.connect(self.server_addr) @@ -1460,7 +1475,7 @@ class SimpleBackgroundTests(unittest.TestCase): # This should fail because we have no verification certs. Connection # failure crashes ThreadedEchoServer, so run this in an independent # test method. - s = ssl.wrap_socket(socket.socket(socket.AF_INET), + s = test_wrap_socket(socket.socket(socket.AF_INET), cert_reqs=ssl.CERT_REQUIRED) self.addCleanup(s.close) self.assertRaisesRegex(ssl.SSLError, "certificate verify failed", @@ -1468,7 +1483,7 @@ class SimpleBackgroundTests(unittest.TestCase): def test_connect_ex(self): # Issue #11326: check connect_ex() implementation - s = ssl.wrap_socket(socket.socket(socket.AF_INET), + s = test_wrap_socket(socket.socket(socket.AF_INET), cert_reqs=ssl.CERT_REQUIRED, ca_certs=SIGNING_CA) self.addCleanup(s.close) @@ -1478,7 +1493,7 @@ class SimpleBackgroundTests(unittest.TestCase): def test_non_blocking_connect_ex(self): # Issue #11326: non-blocking connect_ex() should allow handshake # to proceed after the socket gets ready. - s = ssl.wrap_socket(socket.socket(socket.AF_INET), + s = test_wrap_socket(socket.socket(socket.AF_INET), cert_reqs=ssl.CERT_REQUIRED, ca_certs=SIGNING_CA, do_handshake_on_connect=False) @@ -1578,7 +1593,7 @@ class SimpleBackgroundTests(unittest.TestCase): # Issue #5238: creating a file-like object with makefile() shouldn't # delay closing the underlying "real socket" (here tested with its # file descriptor, hence skipping the test under Windows). - ss = ssl.wrap_socket(socket.socket(socket.AF_INET)) + ss = test_wrap_socket(socket.socket(socket.AF_INET)) ss.connect(self.server_addr) fd = ss.fileno() f = ss.makefile() @@ -1596,7 +1611,7 @@ class SimpleBackgroundTests(unittest.TestCase): s = socket.socket(socket.AF_INET) s.connect(self.server_addr) s.setblocking(False) - s = ssl.wrap_socket(s, + s = test_wrap_socket(s, cert_reqs=ssl.CERT_NONE, do_handshake_on_connect=False) self.addCleanup(s.close) @@ -1622,16 +1637,16 @@ class SimpleBackgroundTests(unittest.TestCase): _test_get_server_certificate_fail(self, *self.server_addr) def test_ciphers(self): - with ssl.wrap_socket(socket.socket(socket.AF_INET), + with test_wrap_socket(socket.socket(socket.AF_INET), cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s: s.connect(self.server_addr) - with ssl.wrap_socket(socket.socket(socket.AF_INET), + with test_wrap_socket(socket.socket(socket.AF_INET), cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") as s: s.connect(self.server_addr) # Error checking can happen at instantiation or when connecting with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"): with socket.socket(socket.AF_INET) as sock: - s = ssl.wrap_socket(sock, + s = test_wrap_socket(sock, cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx") s.connect(self.server_addr) @@ -1749,7 +1764,7 @@ class NetworkedTests(unittest.TestCase): # Issue #12065: on a timeout, connect_ex() should return the original # errno (mimicking the behaviour of non-SSL sockets). with support.transient_internet(REMOTE_HOST): - s = ssl.wrap_socket(socket.socket(socket.AF_INET), + s = test_wrap_socket(socket.socket(socket.AF_INET), cert_reqs=ssl.CERT_REQUIRED, do_handshake_on_connect=False) self.addCleanup(s.close) @@ -2040,7 +2055,7 @@ if _have_threads: class ConnectionHandler (asyncore.dispatcher_with_send): def __init__(self, conn, certfile): - self.socket = ssl.wrap_socket(conn, server_side=True, + self.socket = test_wrap_socket(conn, server_side=True, certfile=certfile, do_handshake_on_connect=False) asyncore.dispatcher_with_send.__init__(self, self.socket) @@ -2401,7 +2416,7 @@ if _have_threads: connectionchatty=False) with server, \ socket.socket() as sock, \ - ssl.wrap_socket(sock, + test_wrap_socket(sock, certfile=certfile, ssl_version=ssl.PROTOCOL_TLSv1) as s: try: @@ -2448,7 +2463,7 @@ if _have_threads: c.connect((HOST, port)) listener_gone.wait() try: - ssl_sock = ssl.wrap_socket(c) + ssl_sock = test_wrap_socket(c) except OSError: pass else: @@ -2638,7 +2653,7 @@ if _have_threads: sys.stdout.write( " client: read %r from server, starting TLS...\n" % msg) - conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1) + conn = test_wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1) wrapped = True elif indata == b"ENDTLS" and msg.startswith(b"ok"): # ENDTLS ok, switch back to clear text @@ -2699,7 +2714,7 @@ if _have_threads: indata = b"FOO\n" server = AsyncoreEchoServer(CERTFILE) with server: - s = ssl.wrap_socket(socket.socket()) + s = test_wrap_socket(socket.socket()) s.connect(('127.0.0.1', server.port)) if support.verbose: sys.stdout.write( @@ -2732,7 +2747,7 @@ if _have_threads: chatty=True, connectionchatty=False) with server: - s = ssl.wrap_socket(socket.socket(), + s = test_wrap_socket(socket.socket(), server_side=False, certfile=CERTFILE, ca_certs=CERTFILE, @@ -2856,7 +2871,7 @@ if _have_threads: self.addCleanup(server.__exit__, None, None) s = socket.create_connection((HOST, server.port)) self.addCleanup(s.close) - s = ssl.wrap_socket(s, suppress_ragged_eofs=False) + s = test_wrap_socket(s, suppress_ragged_eofs=False) self.addCleanup(s.close) # recv/read(0) should return no data @@ -2878,7 +2893,7 @@ if _have_threads: chatty=True, connectionchatty=False) with server: - s = ssl.wrap_socket(socket.socket(), + s = test_wrap_socket(socket.socket(), server_side=False, certfile=CERTFILE, ca_certs=CERTFILE, @@ -2932,12 +2947,12 @@ if _have_threads: c.connect((host, port)) # Will attempt handshake and time out self.assertRaisesRegex(socket.timeout, "timed out", - ssl.wrap_socket, c) + test_wrap_socket, c) finally: c.close() try: c = socket.socket(socket.AF_INET) - c = ssl.wrap_socket(c) + c = test_wrap_socket(c) c.settimeout(0.2) # Will attempt handshake and time out self.assertRaisesRegex(socket.timeout, "timed out", @@ -3062,7 +3077,7 @@ if _have_threads: chatty=True, connectionchatty=False) with server: - s = ssl.wrap_socket(socket.socket(), + s = test_wrap_socket(socket.socket(), server_side=False, certfile=CERTFILE, ca_certs=CERTFILE, @@ -3087,7 +3102,7 @@ if _have_threads: s.close() # now, again - s = ssl.wrap_socket(socket.socket(), + s = test_wrap_socket(socket.socket(), server_side=False, certfile=CERTFILE, ca_certs=CERTFILE, -- cgit v1.2.1 From 7f85dccc70da5218cbd16e4c0a35e9b7cf349276 Mon Sep 17 00:00:00 2001 From: Christian Heimes Date: Sat, 10 Sep 2016 23:44:53 +0200 Subject: Issue #19500: Add client-side SSL session resumption to the ssl module. --- Lib/test/test_ssl.py | 112 ++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 110 insertions(+), 2 deletions(-) (limited to 'Lib/test/test_ssl.py') diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py index aed226c7e1..61744ae95a 100644 --- a/Lib/test/test_ssl.py +++ b/Lib/test/test_ssl.py @@ -2163,7 +2163,8 @@ if _have_threads: self.server.close() def server_params_test(client_context, server_context, indata=b"FOO\n", - chatty=True, connectionchatty=False, sni_name=None): + chatty=True, connectionchatty=False, sni_name=None, + session=None): """ Launch a server, connect a client to it and try various reads and writes. @@ -2174,7 +2175,7 @@ if _have_threads: connectionchatty=False) with server: with client_context.wrap_socket(socket.socket(), - server_hostname=sni_name) as s: + server_hostname=sni_name, session=session) as s: s.connect((HOST, server.port)) for arg in [indata, bytearray(indata), memoryview(indata)]: if connectionchatty: @@ -2202,6 +2203,8 @@ if _have_threads: 'client_alpn_protocol': s.selected_alpn_protocol(), 'client_npn_protocol': s.selected_npn_protocol(), 'version': s.version(), + 'session_reused': s.session_reused, + 'session': s.session, }) s.close() stats['server_alpn_protocols'] = server.selected_alpn_protocols @@ -3412,6 +3415,111 @@ if _have_threads: s.sendfile(file) self.assertEqual(s.recv(1024), TEST_DATA) + def test_session(self): + server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + server_context.load_cert_chain(SIGNED_CERTFILE) + client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + client_context.verify_mode = ssl.CERT_REQUIRED + client_context.load_verify_locations(SIGNING_CA) + + # first conncetion without session + stats = server_params_test(client_context, server_context) + session = stats['session'] + self.assertTrue(session.id) + self.assertGreater(session.time, 0) + self.assertGreater(session.timeout, 0) + self.assertTrue(session.has_ticket) + if ssl.OPENSSL_VERSION_INFO > (1, 0, 1): + self.assertGreater(session.ticket_lifetime_hint, 0) + self.assertFalse(stats['session_reused']) + sess_stat = server_context.session_stats() + self.assertEqual(sess_stat['accept'], 1) + self.assertEqual(sess_stat['hits'], 0) + + # reuse session + stats = server_params_test(client_context, server_context, session=session) + sess_stat = server_context.session_stats() + self.assertEqual(sess_stat['accept'], 2) + self.assertEqual(sess_stat['hits'], 1) + self.assertTrue(stats['session_reused']) + session2 = stats['session'] + self.assertEqual(session2.id, session.id) + self.assertEqual(session2, session) + self.assertIsNot(session2, session) + self.assertGreaterEqual(session2.time, session.time) + self.assertGreaterEqual(session2.timeout, session.timeout) + + # another one without session + stats = server_params_test(client_context, server_context) + self.assertFalse(stats['session_reused']) + session3 = stats['session'] + self.assertNotEqual(session3.id, session.id) + self.assertNotEqual(session3, session) + sess_stat = server_context.session_stats() + self.assertEqual(sess_stat['accept'], 3) + self.assertEqual(sess_stat['hits'], 1) + + # reuse session again + stats = server_params_test(client_context, server_context, session=session) + self.assertTrue(stats['session_reused']) + session4 = stats['session'] + self.assertEqual(session4.id, session.id) + self.assertEqual(session4, session) + self.assertGreaterEqual(session4.time, session.time) + self.assertGreaterEqual(session4.timeout, session.timeout) + sess_stat = server_context.session_stats() + self.assertEqual(sess_stat['accept'], 4) + self.assertEqual(sess_stat['hits'], 2) + + def test_session_handling(self): + context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + context.verify_mode = ssl.CERT_REQUIRED + context.load_verify_locations(CERTFILE) + context.load_cert_chain(CERTFILE) + + context2 = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + context2.verify_mode = ssl.CERT_REQUIRED + context2.load_verify_locations(CERTFILE) + context2.load_cert_chain(CERTFILE) + + server = ThreadedEchoServer(context=context, chatty=False) + with server: + with context.wrap_socket(socket.socket()) as s: + # session is None before handshake + self.assertEqual(s.session, None) + self.assertEqual(s.session_reused, None) + s.connect((HOST, server.port)) + session = s.session + self.assertTrue(session) + with self.assertRaises(TypeError) as e: + s.session = object + self.assertEqual(str(e.exception), 'Value is not a SSLSession.') + + with context.wrap_socket(socket.socket()) as s: + s.connect((HOST, server.port)) + # cannot set session after handshake + with self.assertRaises(ValueError) as e: + s.session = session + self.assertEqual(str(e.exception), + 'Cannot set session after handshake.') + + with context.wrap_socket(socket.socket()) as s: + # can set session before handshake and before the + # connection was established + s.session = session + s.connect((HOST, server.port)) + self.assertEqual(s.session.id, session.id) + self.assertEqual(s.session, session) + self.assertEqual(s.session_reused, True) + + with context2.wrap_socket(socket.socket()) as s: + # cannot re-use session with a different SSLContext + with self.assertRaises(ValueError) as e: + s.session = session + s.connect((HOST, server.port)) + self.assertEqual(str(e.exception), + 'Session refers to a different SSLContext.') + def test_main(verbose=False): if support.verbose: -- cgit v1.2.1 From 99829a612c71c4fed8ac55e921f27a19a74bc634 Mon Sep 17 00:00:00 2001 From: Christian Heimes Date: Mon, 12 Sep 2016 00:01:11 +0200 Subject: Issue #28085: Add PROTOCOL_TLS_CLIENT and PROTOCOL_TLS_SERVER for SSLContext --- Lib/test/test_ssl.py | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) (limited to 'Lib/test/test_ssl.py') diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py index 61744ae95a..557b6dec5b 100644 --- a/Lib/test/test_ssl.py +++ b/Lib/test/test_ssl.py @@ -1342,6 +1342,17 @@ class ContextTests(unittest.TestCase): ctx.check_hostname = False self.assertFalse(ctx.check_hostname) + def test_context_client_server(self): + # PROTOCOL_TLS_CLIENT has sane defaults + ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + self.assertTrue(ctx.check_hostname) + self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED) + + # PROTOCOL_TLS_SERVER has different but also sane defaults + ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) + self.assertFalse(ctx.check_hostname) + self.assertEqual(ctx.verify_mode, ssl.CERT_NONE) + class SSLErrorTests(unittest.TestCase): @@ -2280,12 +2291,33 @@ if _have_threads: if support.verbose: sys.stdout.write("\n") for protocol in PROTOCOLS: + if protocol in {ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_SERVER}: + continue with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]): context = ssl.SSLContext(protocol) context.load_cert_chain(CERTFILE) server_params_test(context, context, chatty=True, connectionchatty=True) + client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + client_context.load_verify_locations(SIGNING_CA) + server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) + # server_context.load_verify_locations(SIGNING_CA) + server_context.load_cert_chain(SIGNED_CERTFILE2) + + with self.subTest(client='PROTOCOL_TLS_CLIENT', server='PROTOCOL_TLS_SERVER'): + server_params_test(client_context=client_context, + server_context=server_context, + chatty=True, connectionchatty=True, + sni_name='fakehostname') + + with self.subTest(client='PROTOCOL_TLS_SERVER', server='PROTOCOL_TLS_CLIENT'): + with self.assertRaises(ssl.SSLError): + server_params_test(client_context=server_context, + server_context=client_context, + chatty=True, connectionchatty=True, + sni_name='fakehostname') + def test_getpeercert(self): if support.verbose: sys.stdout.write("\n") -- cgit v1.2.1 From 419ef272447233d9921a146ba6f03316f32fc549 Mon Sep 17 00:00:00 2001 From: Christian Heimes Date: Mon, 12 Sep 2016 10:48:20 +0200 Subject: Issue #28093: Check more invalid combinations of PROTOCOL_TLS_CLIENT / PROTOCOL_TLS_SERVER --- Lib/test/test_ssl.py | 26 +++++++++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) (limited to 'Lib/test/test_ssl.py') diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py index 557b6dec5b..46ec8223e8 100644 --- a/Lib/test/test_ssl.py +++ b/Lib/test/test_ssl.py @@ -2305,18 +2305,38 @@ if _have_threads: # server_context.load_verify_locations(SIGNING_CA) server_context.load_cert_chain(SIGNED_CERTFILE2) - with self.subTest(client='PROTOCOL_TLS_CLIENT', server='PROTOCOL_TLS_SERVER'): + with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_SERVER): server_params_test(client_context=client_context, server_context=server_context, chatty=True, connectionchatty=True, sni_name='fakehostname') - with self.subTest(client='PROTOCOL_TLS_SERVER', server='PROTOCOL_TLS_CLIENT'): - with self.assertRaises(ssl.SSLError): + client_context.check_hostname = False + with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_CLIENT): + with self.assertRaises(ssl.SSLError) as e: server_params_test(client_context=server_context, server_context=client_context, chatty=True, connectionchatty=True, sni_name='fakehostname') + self.assertIn('called a function you should not call', + str(e.exception)) + + with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_SERVER): + with self.assertRaises(ssl.SSLError) as e: + server_params_test(client_context=server_context, + server_context=server_context, + chatty=True, connectionchatty=True) + self.assertIn('called a function you should not call', + str(e.exception)) + + with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_CLIENT): + with self.assertRaises(ssl.SSLError) as e: + server_params_test(client_context=server_context, + server_context=client_context, + chatty=True, connectionchatty=True) + self.assertIn('called a function you should not call', + str(e.exception)) + def test_getpeercert(self): if support.verbose: -- cgit v1.2.1 From 491968ae74c1e7faf2822ad42a8eddcb615343b7 Mon Sep 17 00:00:00 2001 From: Christian Heimes Date: Sat, 24 Sep 2016 10:48:05 +0200 Subject: Finish GC code for SSLSession and increase test coverage --- Lib/test/test_ssl.py | 3 +++ 1 file changed, 3 insertions(+) (limited to 'Lib/test/test_ssl.py') diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py index 631c8c187d..ad30105b0f 100644 --- a/Lib/test/test_ssl.py +++ b/Lib/test/test_ssl.py @@ -1474,6 +1474,7 @@ class SimpleBackgroundTests(unittest.TestCase): cert_reqs=ssl.CERT_NONE) as s: s.connect(self.server_addr) self.assertEqual({}, s.getpeercert()) + self.assertFalse(s.server_side) # this should succeed because we specify the root cert with test_wrap_socket(socket.socket(socket.AF_INET), @@ -1481,6 +1482,7 @@ class SimpleBackgroundTests(unittest.TestCase): ca_certs=SIGNING_CA) as s: s.connect(self.server_addr) self.assertTrue(s.getpeercert()) + self.assertFalse(s.server_side) def test_connect_fail(self): # This should fail because we have no verification certs. Connection @@ -3028,6 +3030,7 @@ if _have_threads: host = "127.0.0.1" port = support.bind_port(server) server = context.wrap_socket(server, server_side=True) + self.assertTrue(server.server_side) evt = threading.Event() remote = None -- cgit v1.2.1 From 01c29c2b0cbe7645628250701cdda38f6d751d66 Mon Sep 17 00:00:00 2001 From: Martin Panter Date: Mon, 10 Oct 2016 00:38:21 +0000 Subject: Issue #28394: More typo fixes for 3.6+ --- Lib/test/test_ssl.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'Lib/test/test_ssl.py') diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py index ad30105b0f..d203cddbdf 100644 --- a/Lib/test/test_ssl.py +++ b/Lib/test/test_ssl.py @@ -3475,7 +3475,7 @@ if _have_threads: client_context.verify_mode = ssl.CERT_REQUIRED client_context.load_verify_locations(SIGNING_CA) - # first conncetion without session + # first connection without session stats = server_params_test(client_context, server_context) session = stats['session'] self.assertTrue(session.id) -- cgit v1.2.1