summaryrefslogtreecommitdiff
path: root/Lib/test/test_ssl.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_ssl.py')
-rw-r--r--Lib/test/test_ssl.py997
1 files changed, 592 insertions, 405 deletions
diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py
index 551558304f..d203cddbdf 100644
--- a/Lib/test/test_ssl.py
+++ b/Lib/test/test_ssl.py
@@ -21,6 +21,13 @@ import functools
ssl = support.import_module("ssl")
+try:
+ import threading
+except ImportError:
+ _have_threads = False
+else:
+ _have_threads = True
+
PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
HOST = support.HOST
IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
@@ -56,12 +63,12 @@ CRLFILE = data_file("revocation.crl")
# Two keys and certs signed by the same CA (for SNI tests)
SIGNED_CERTFILE = data_file("keycert3.pem")
SIGNED_CERTFILE2 = data_file("keycert4.pem")
-SIGNING_CA = data_file("pycacert.pem")
+# Same certificate as pycacert.pem, but without extra text in file
+SIGNING_CA = data_file("capath", "ceff1710.0")
# cert with all kinds of subject alt names
ALLSANFILE = data_file("allsans.pem")
REMOTE_HOST = "self-signed.pythontest.net"
-REMOTE_ROOT_CERT = data_file("selfsigned_pythontestdotnet.pem")
EMPTYCERT = data_file("nullcert.pem")
BADCERT = data_file("badcert.pem")
@@ -73,6 +80,12 @@ NULLBYTECERT = data_file("nullbytecert.pem")
DHFILE = data_file("dh1024.pem")
BYTES_DHFILE = os.fsencode(DHFILE)
+# Not defined in all versions of OpenSSL
+OP_NO_COMPRESSION = getattr(ssl, "OP_NO_COMPRESSION", 0)
+OP_SINGLE_DH_USE = getattr(ssl, "OP_SINGLE_DH_USE", 0)
+OP_SINGLE_ECDH_USE = getattr(ssl, "OP_SINGLE_ECDH_USE", 0)
+OP_CIPHER_SERVER_PREFERENCE = getattr(ssl, "OP_CIPHER_SERVER_PREFERENCE", 0)
+
def handle_error(prefix):
exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
@@ -130,6 +143,21 @@ def skip_if_broken_ubuntu_ssl(func):
needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
+def test_wrap_socket(sock, ssl_version=ssl.PROTOCOL_TLS, *,
+ cert_reqs=ssl.CERT_NONE, ca_certs=None,
+ ciphers=None, certfile=None, keyfile=None,
+ **kwargs):
+ context = ssl.SSLContext(ssl_version)
+ if cert_reqs is not None:
+ context.verify_mode = cert_reqs
+ if ca_certs is not None:
+ context.load_verify_locations(ca_certs)
+ if certfile is not None or keyfile is not None:
+ context.load_cert_chain(certfile, keyfile)
+ if ciphers is not None:
+ context.set_ciphers(ciphers)
+ return context.wrap_socket(sock, **kwargs)
+
class BasicSocketTests(unittest.TestCase):
def test_constants(self):
@@ -350,17 +378,17 @@ class BasicSocketTests(unittest.TestCase):
# Issue #7943: an SSL object doesn't create reference cycles with
# itself.
s = socket.socket(socket.AF_INET)
- ss = ssl.wrap_socket(s)
+ ss = test_wrap_socket(s)
wr = weakref.ref(ss)
with support.check_warnings(("", ResourceWarning)):
del ss
- self.assertEqual(wr(), None)
+ self.assertEqual(wr(), None)
def test_wrapped_unconnected(self):
# Methods on an unconnected SSLSocket propagate the original
# OSError raise by the underlying socket object.
s = socket.socket(socket.AF_INET)
- with ssl.wrap_socket(s) as ss:
+ with test_wrap_socket(s) as ss:
self.assertRaises(OSError, ss.recv, 1)
self.assertRaises(OSError, ss.recv_into, bytearray(b'x'))
self.assertRaises(OSError, ss.recvfrom, 1)
@@ -374,10 +402,10 @@ class BasicSocketTests(unittest.TestCase):
for timeout in (None, 0.0, 5.0):
s = socket.socket(socket.AF_INET)
s.settimeout(timeout)
- with ssl.wrap_socket(s) as ss:
+ with test_wrap_socket(s) as ss:
self.assertEqual(timeout, ss.gettimeout())
- def test_errors(self):
+ def test_errors_sslwrap(self):
sock = socket.socket()
self.assertRaisesRegex(ValueError,
"certfile must be specified",
@@ -387,10 +415,10 @@ class BasicSocketTests(unittest.TestCase):
ssl.wrap_socket, sock, server_side=True)
self.assertRaisesRegex(ValueError,
"certfile must be specified for server-side operations",
- ssl.wrap_socket, sock, server_side=True, certfile="")
+ ssl.wrap_socket, sock, server_side=True, certfile="")
with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
self.assertRaisesRegex(ValueError, "can't connect in server-side mode",
- s.connect, (HOST, 8080))
+ s.connect, (HOST, 8080))
with self.assertRaises(OSError) as cm:
with socket.socket() as sock:
ssl.wrap_socket(sock, certfile=NONEXISTINGCERT)
@@ -413,7 +441,7 @@ class BasicSocketTests(unittest.TestCase):
sock = socket.socket()
self.addCleanup(sock.close)
with self.assertRaises(ssl.SSLError):
- ssl.wrap_socket(sock,
+ test_wrap_socket(sock,
certfile=certfile,
ssl_version=ssl.PROTOCOL_TLSv1)
@@ -600,7 +628,7 @@ class BasicSocketTests(unittest.TestCase):
s.listen()
c = socket.socket(socket.AF_INET)
c.connect(s.getsockname())
- with ssl.wrap_socket(c, do_handshake_on_connect=False) as ss:
+ with test_wrap_socket(c, do_handshake_on_connect=False) as ss:
with self.assertRaises(ValueError):
ss.get_channel_binding("unknown-type")
s.close()
@@ -610,15 +638,15 @@ class BasicSocketTests(unittest.TestCase):
def test_tls_unique_channel_binding(self):
# unconnected should return None for known type
s = socket.socket(socket.AF_INET)
- with ssl.wrap_socket(s) as ss:
+ with test_wrap_socket(s) as ss:
self.assertIsNone(ss.get_channel_binding("tls-unique"))
# the same for server-side
s = socket.socket(socket.AF_INET)
- with ssl.wrap_socket(s, server_side=True, certfile=CERTFILE) as ss:
+ with test_wrap_socket(s, server_side=True, certfile=CERTFILE) as ss:
self.assertIsNone(ss.get_channel_binding("tls-unique"))
def test_dealloc_warn(self):
- ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
+ ss = test_wrap_socket(socket.socket(socket.AF_INET))
r = repr(ss)
with self.assertWarns(ResourceWarning) as cm:
ss = None
@@ -737,7 +765,7 @@ class BasicSocketTests(unittest.TestCase):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.addCleanup(s.close)
with self.assertRaises(NotImplementedError) as cx:
- ssl.wrap_socket(s, cert_reqs=ssl.CERT_NONE)
+ test_wrap_socket(s, cert_reqs=ssl.CERT_NONE)
self.assertEqual(str(cx.exception), "only stream sockets are supported")
ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
with self.assertRaises(NotImplementedError) as cx:
@@ -809,6 +837,22 @@ class BasicSocketTests(unittest.TestCase):
self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
self.cert_time_fail(local_february_name() + " 9 00:00:00 2007 GMT")
+ def test_connect_ex_error(self):
+ server = socket.socket(socket.AF_INET)
+ self.addCleanup(server.close)
+ port = support.bind_port(server) # Reserve port but don't listen
+ s = test_wrap_socket(socket.socket(socket.AF_INET),
+ cert_reqs=ssl.CERT_REQUIRED)
+ self.addCleanup(s.close)
+ rc = s.connect_ex((HOST, port))
+ # Issue #19919: Windows machines or VMs hosted on Windows
+ # machines sometimes return EWOULDBLOCK.
+ errors = (
+ errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT,
+ errno.EWOULDBLOCK,
+ )
+ self.assertIn(rc, errors)
+
class ContextTests(unittest.TestCase):
@@ -834,13 +878,22 @@ class ContextTests(unittest.TestCase):
with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
ctx.set_ciphers("^$:,;?*'dorothyx")
+ @unittest.skipIf(ssl.OPENSSL_VERSION_INFO < (1, 0, 2, 0, 0), 'OpenSSL too old')
+ def test_get_ciphers(self):
+ ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+ ctx.set_ciphers('AESGCM')
+ names = set(d['name'] for d in ctx.get_ciphers())
+ self.assertIn('AES256-GCM-SHA384', names)
+ self.assertIn('AES128-GCM-SHA256', names)
+
@skip_if_broken_ubuntu_ssl
def test_options(self):
ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
# OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value
default = (ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
- if not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0):
- default |= ssl.OP_NO_COMPRESSION
+ # SSLContext also enables these by default
+ default |= (OP_NO_COMPRESSION | OP_CIPHER_SERVER_PREFERENCE |
+ OP_SINGLE_DH_USE | OP_SINGLE_ECDH_USE)
self.assertEqual(default, ctx.options)
ctx.options |= ssl.OP_NO_TLSv1
self.assertEqual(default | ssl.OP_NO_TLSv1, ctx.options)
@@ -1205,16 +1258,29 @@ class ContextTests(unittest.TestCase):
stats["x509"] += 1
self.assertEqual(ctx.cert_store_stats(), stats)
+ def _assert_context_options(self, ctx):
+ self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
+ if OP_NO_COMPRESSION != 0:
+ self.assertEqual(ctx.options & OP_NO_COMPRESSION,
+ OP_NO_COMPRESSION)
+ if OP_SINGLE_DH_USE != 0:
+ self.assertEqual(ctx.options & OP_SINGLE_DH_USE,
+ OP_SINGLE_DH_USE)
+ if OP_SINGLE_ECDH_USE != 0:
+ self.assertEqual(ctx.options & OP_SINGLE_ECDH_USE,
+ OP_SINGLE_ECDH_USE)
+ if OP_CIPHER_SERVER_PREFERENCE != 0:
+ self.assertEqual(ctx.options & OP_CIPHER_SERVER_PREFERENCE,
+ OP_CIPHER_SERVER_PREFERENCE)
+
def test_create_default_context(self):
ctx = ssl.create_default_context()
+
self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
self.assertTrue(ctx.check_hostname)
- self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
- self.assertEqual(
- ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0),
- getattr(ssl, "OP_NO_COMPRESSION", 0),
- )
+ self._assert_context_options(ctx)
+
with open(SIGNING_CA) as f:
cadata = f.read()
@@ -1222,40 +1288,24 @@ class ContextTests(unittest.TestCase):
cadata=cadata)
self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
- self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
- self.assertEqual(
- ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0),
- getattr(ssl, "OP_NO_COMPRESSION", 0),
- )
+ self._assert_context_options(ctx)
ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
- self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
- self.assertEqual(
- ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0),
- getattr(ssl, "OP_NO_COMPRESSION", 0),
- )
- self.assertEqual(
- ctx.options & getattr(ssl, "OP_SINGLE_DH_USE", 0),
- getattr(ssl, "OP_SINGLE_DH_USE", 0),
- )
- self.assertEqual(
- ctx.options & getattr(ssl, "OP_SINGLE_ECDH_USE", 0),
- getattr(ssl, "OP_SINGLE_ECDH_USE", 0),
- )
+ self._assert_context_options(ctx)
def test__create_stdlib_context(self):
ctx = ssl._create_stdlib_context()
self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
self.assertFalse(ctx.check_hostname)
- self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
+ self._assert_context_options(ctx)
ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
- self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
+ self._assert_context_options(ctx)
ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
cert_reqs=ssl.CERT_REQUIRED,
@@ -1263,12 +1313,12 @@ class ContextTests(unittest.TestCase):
self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
self.assertTrue(ctx.check_hostname)
- self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
+ self._assert_context_options(ctx)
ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
- self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
+ self._assert_context_options(ctx)
def test_check_hostname(self):
ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
@@ -1292,6 +1342,17 @@ class ContextTests(unittest.TestCase):
ctx.check_hostname = False
self.assertFalse(ctx.check_hostname)
+ def test_context_client_server(self):
+ # PROTOCOL_TLS_CLIENT has sane defaults
+ ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
+ self.assertTrue(ctx.check_hostname)
+ self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
+
+ # PROTOCOL_TLS_SERVER has different but also sane defaults
+ ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
+ self.assertFalse(ctx.check_hostname)
+ self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
+
class SSLErrorTests(unittest.TestCase):
@@ -1397,140 +1458,105 @@ class MemoryBIOTests(unittest.TestCase):
self.assertRaises(TypeError, bio.write, 1)
-class NetworkedTests(unittest.TestCase):
+@unittest.skipUnless(_have_threads, "Needs threading module")
+class SimpleBackgroundTests(unittest.TestCase):
- def test_connect(self):
- with support.transient_internet(REMOTE_HOST):
- s = ssl.wrap_socket(socket.socket(socket.AF_INET),
- cert_reqs=ssl.CERT_NONE)
- try:
- s.connect((REMOTE_HOST, 443))
- self.assertEqual({}, s.getpeercert())
- finally:
- s.close()
+ """Tests that connect to a simple server running in the background"""
- # this should fail because we have no verification certs
- s = ssl.wrap_socket(socket.socket(socket.AF_INET),
- cert_reqs=ssl.CERT_REQUIRED)
- self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
- s.connect, (REMOTE_HOST, 443))
- s.close()
+ def setUp(self):
+ server = ThreadedEchoServer(SIGNED_CERTFILE)
+ self.server_addr = (HOST, server.port)
+ server.__enter__()
+ self.addCleanup(server.__exit__, None, None, None)
- # this should succeed because we specify the root cert
- s = ssl.wrap_socket(socket.socket(socket.AF_INET),
- cert_reqs=ssl.CERT_REQUIRED,
- ca_certs=REMOTE_ROOT_CERT)
- try:
- s.connect((REMOTE_HOST, 443))
- self.assertTrue(s.getpeercert())
- finally:
- s.close()
+ def test_connect(self):
+ with test_wrap_socket(socket.socket(socket.AF_INET),
+ cert_reqs=ssl.CERT_NONE) as s:
+ s.connect(self.server_addr)
+ self.assertEqual({}, s.getpeercert())
+ self.assertFalse(s.server_side)
+
+ # this should succeed because we specify the root cert
+ with test_wrap_socket(socket.socket(socket.AF_INET),
+ cert_reqs=ssl.CERT_REQUIRED,
+ ca_certs=SIGNING_CA) as s:
+ s.connect(self.server_addr)
+ self.assertTrue(s.getpeercert())
+ self.assertFalse(s.server_side)
+
+ def test_connect_fail(self):
+ # This should fail because we have no verification certs. Connection
+ # failure crashes ThreadedEchoServer, so run this in an independent
+ # test method.
+ s = test_wrap_socket(socket.socket(socket.AF_INET),
+ cert_reqs=ssl.CERT_REQUIRED)
+ self.addCleanup(s.close)
+ self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
+ s.connect, self.server_addr)
def test_connect_ex(self):
# Issue #11326: check connect_ex() implementation
- with support.transient_internet(REMOTE_HOST):
- s = ssl.wrap_socket(socket.socket(socket.AF_INET),
- cert_reqs=ssl.CERT_REQUIRED,
- ca_certs=REMOTE_ROOT_CERT)
- try:
- self.assertEqual(0, s.connect_ex((REMOTE_HOST, 443)))
- self.assertTrue(s.getpeercert())
- finally:
- s.close()
+ s = test_wrap_socket(socket.socket(socket.AF_INET),
+ cert_reqs=ssl.CERT_REQUIRED,
+ ca_certs=SIGNING_CA)
+ self.addCleanup(s.close)
+ self.assertEqual(0, s.connect_ex(self.server_addr))
+ self.assertTrue(s.getpeercert())
def test_non_blocking_connect_ex(self):
# Issue #11326: non-blocking connect_ex() should allow handshake
# to proceed after the socket gets ready.
- with support.transient_internet(REMOTE_HOST):
- s = ssl.wrap_socket(socket.socket(socket.AF_INET),
- cert_reqs=ssl.CERT_REQUIRED,
- ca_certs=REMOTE_ROOT_CERT,
- do_handshake_on_connect=False)
+ s = test_wrap_socket(socket.socket(socket.AF_INET),
+ cert_reqs=ssl.CERT_REQUIRED,
+ ca_certs=SIGNING_CA,
+ do_handshake_on_connect=False)
+ self.addCleanup(s.close)
+ s.setblocking(False)
+ rc = s.connect_ex(self.server_addr)
+ # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
+ self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
+ # Wait for connect to finish
+ select.select([], [s], [], 5.0)
+ # Non-blocking handshake
+ while True:
try:
- s.setblocking(False)
- rc = s.connect_ex((REMOTE_HOST, 443))
- # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
- self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
- # Wait for connect to finish
+ s.do_handshake()
+ break
+ except ssl.SSLWantReadError:
+ select.select([s], [], [], 5.0)
+ except ssl.SSLWantWriteError:
select.select([], [s], [], 5.0)
- # Non-blocking handshake
- while True:
- try:
- s.do_handshake()
- break
- except ssl.SSLWantReadError:
- select.select([s], [], [], 5.0)
- except ssl.SSLWantWriteError:
- select.select([], [s], [], 5.0)
- # SSL established
- self.assertTrue(s.getpeercert())
- finally:
- s.close()
-
- def test_timeout_connect_ex(self):
- # Issue #12065: on a timeout, connect_ex() should return the original
- # errno (mimicking the behaviour of non-SSL sockets).
- with support.transient_internet(REMOTE_HOST):
- s = ssl.wrap_socket(socket.socket(socket.AF_INET),
- cert_reqs=ssl.CERT_REQUIRED,
- ca_certs=REMOTE_ROOT_CERT,
- do_handshake_on_connect=False)
- try:
- s.settimeout(0.0000001)
- rc = s.connect_ex((REMOTE_HOST, 443))
- if rc == 0:
- self.skipTest("REMOTE_HOST responded too quickly")
- self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
- finally:
- s.close()
-
- def test_connect_ex_error(self):
- with support.transient_internet(REMOTE_HOST):
- s = ssl.wrap_socket(socket.socket(socket.AF_INET),
- cert_reqs=ssl.CERT_REQUIRED,
- ca_certs=REMOTE_ROOT_CERT)
- try:
- rc = s.connect_ex((REMOTE_HOST, 444))
- # Issue #19919: Windows machines or VMs hosted on Windows
- # machines sometimes return EWOULDBLOCK.
- errors = (
- errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT,
- errno.EWOULDBLOCK,
- )
- self.assertIn(rc, errors)
- finally:
- s.close()
+ # SSL established
+ self.assertTrue(s.getpeercert())
def test_connect_with_context(self):
- with support.transient_internet(REMOTE_HOST):
- # Same as test_connect, but with a separately created context
- ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
- s = ctx.wrap_socket(socket.socket(socket.AF_INET))
- s.connect((REMOTE_HOST, 443))
- try:
- self.assertEqual({}, s.getpeercert())
- finally:
- s.close()
- # Same with a server hostname
- s = ctx.wrap_socket(socket.socket(socket.AF_INET),
- server_hostname=REMOTE_HOST)
- s.connect((REMOTE_HOST, 443))
- s.close()
- # This should fail because we have no verification certs
- ctx.verify_mode = ssl.CERT_REQUIRED
- s = ctx.wrap_socket(socket.socket(socket.AF_INET))
- self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
- s.connect, (REMOTE_HOST, 443))
- s.close()
- # This should succeed because we specify the root cert
- ctx.load_verify_locations(REMOTE_ROOT_CERT)
- s = ctx.wrap_socket(socket.socket(socket.AF_INET))
- s.connect((REMOTE_HOST, 443))
- try:
- cert = s.getpeercert()
- self.assertTrue(cert)
- finally:
- s.close()
+ # Same as test_connect, but with a separately created context
+ ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+ with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
+ s.connect(self.server_addr)
+ self.assertEqual({}, s.getpeercert())
+ # Same with a server hostname
+ with ctx.wrap_socket(socket.socket(socket.AF_INET),
+ server_hostname="dummy") as s:
+ s.connect(self.server_addr)
+ ctx.verify_mode = ssl.CERT_REQUIRED
+ # This should succeed because we specify the root cert
+ ctx.load_verify_locations(SIGNING_CA)
+ with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
+ s.connect(self.server_addr)
+ cert = s.getpeercert()
+ self.assertTrue(cert)
+
+ def test_connect_with_context_fail(self):
+ # This should fail because we have no verification certs. Connection
+ # failure crashes ThreadedEchoServer, so run this in an independent
+ # test method.
+ ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+ ctx.verify_mode = ssl.CERT_REQUIRED
+ s = ctx.wrap_socket(socket.socket(socket.AF_INET))
+ self.addCleanup(s.close)
+ self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
+ s.connect, self.server_addr)
def test_connect_capath(self):
# Verify server certificates using the `capath` argument
@@ -1538,198 +1564,130 @@ class NetworkedTests(unittest.TestCase):
# OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
# contain both versions of each certificate (same content, different
# filename) for this test to be portable across OpenSSL releases.
- with support.transient_internet(REMOTE_HOST):
- ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
- ctx.verify_mode = ssl.CERT_REQUIRED
- ctx.load_verify_locations(capath=CAPATH)
- s = ctx.wrap_socket(socket.socket(socket.AF_INET))
- s.connect((REMOTE_HOST, 443))
- try:
- cert = s.getpeercert()
- self.assertTrue(cert)
- finally:
- s.close()
- # Same with a bytes `capath` argument
- ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
- ctx.verify_mode = ssl.CERT_REQUIRED
- ctx.load_verify_locations(capath=BYTES_CAPATH)
- s = ctx.wrap_socket(socket.socket(socket.AF_INET))
- s.connect((REMOTE_HOST, 443))
- try:
- cert = s.getpeercert()
- self.assertTrue(cert)
- finally:
- s.close()
+ ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+ ctx.verify_mode = ssl.CERT_REQUIRED
+ ctx.load_verify_locations(capath=CAPATH)
+ with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
+ s.connect(self.server_addr)
+ cert = s.getpeercert()
+ self.assertTrue(cert)
+ # Same with a bytes `capath` argument
+ ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+ ctx.verify_mode = ssl.CERT_REQUIRED
+ ctx.load_verify_locations(capath=BYTES_CAPATH)
+ with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
+ s.connect(self.server_addr)
+ cert = s.getpeercert()
+ self.assertTrue(cert)
def test_connect_cadata(self):
- with open(REMOTE_ROOT_CERT) as f:
+ with open(SIGNING_CA) as f:
pem = f.read()
der = ssl.PEM_cert_to_DER_cert(pem)
- with support.transient_internet(REMOTE_HOST):
- ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
- ctx.verify_mode = ssl.CERT_REQUIRED
- ctx.load_verify_locations(cadata=pem)
- with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
- s.connect((REMOTE_HOST, 443))
- cert = s.getpeercert()
- self.assertTrue(cert)
+ ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+ ctx.verify_mode = ssl.CERT_REQUIRED
+ ctx.load_verify_locations(cadata=pem)
+ with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
+ s.connect(self.server_addr)
+ cert = s.getpeercert()
+ self.assertTrue(cert)
- # same with DER
- ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
- ctx.verify_mode = ssl.CERT_REQUIRED
- ctx.load_verify_locations(cadata=der)
- with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
- s.connect((REMOTE_HOST, 443))
- cert = s.getpeercert()
- self.assertTrue(cert)
+ # same with DER
+ ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+ ctx.verify_mode = ssl.CERT_REQUIRED
+ ctx.load_verify_locations(cadata=der)
+ with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
+ s.connect(self.server_addr)
+ cert = s.getpeercert()
+ self.assertTrue(cert)
@unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
def test_makefile_close(self):
# Issue #5238: creating a file-like object with makefile() shouldn't
# delay closing the underlying "real socket" (here tested with its
# file descriptor, hence skipping the test under Windows).
- with support.transient_internet(REMOTE_HOST):
- ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
- ss.connect((REMOTE_HOST, 443))
- fd = ss.fileno()
- f = ss.makefile()
- f.close()
- # The fd is still open
+ ss = test_wrap_socket(socket.socket(socket.AF_INET))
+ ss.connect(self.server_addr)
+ fd = ss.fileno()
+ f = ss.makefile()
+ f.close()
+ # The fd is still open
+ os.read(fd, 0)
+ # Closing the SSL socket should close the fd too
+ ss.close()
+ gc.collect()
+ with self.assertRaises(OSError) as e:
os.read(fd, 0)
- # Closing the SSL socket should close the fd too
- ss.close()
- gc.collect()
- with self.assertRaises(OSError) as e:
- os.read(fd, 0)
- self.assertEqual(e.exception.errno, errno.EBADF)
+ self.assertEqual(e.exception.errno, errno.EBADF)
def test_non_blocking_handshake(self):
- with support.transient_internet(REMOTE_HOST):
- s = socket.socket(socket.AF_INET)
- s.connect((REMOTE_HOST, 443))
- s.setblocking(False)
- s = ssl.wrap_socket(s,
- cert_reqs=ssl.CERT_NONE,
- do_handshake_on_connect=False)
- count = 0
- while True:
- try:
- count += 1
- s.do_handshake()
- break
- except ssl.SSLWantReadError:
- select.select([s], [], [])
- except ssl.SSLWantWriteError:
- select.select([], [s], [])
- s.close()
- if support.verbose:
- sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
+ s = socket.socket(socket.AF_INET)
+ s.connect(self.server_addr)
+ s.setblocking(False)
+ s = test_wrap_socket(s,
+ cert_reqs=ssl.CERT_NONE,
+ do_handshake_on_connect=False)
+ self.addCleanup(s.close)
+ count = 0
+ while True:
+ try:
+ count += 1
+ s.do_handshake()
+ break
+ except ssl.SSLWantReadError:
+ select.select([s], [], [])
+ except ssl.SSLWantWriteError:
+ select.select([], [s], [])
+ if support.verbose:
+ sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
def test_get_server_certificate(self):
- def _test_get_server_certificate(host, port, cert=None):
- with support.transient_internet(host):
- pem = ssl.get_server_certificate((host, port))
- if not pem:
- self.fail("No server certificate on %s:%s!" % (host, port))
+ _test_get_server_certificate(self, *self.server_addr, cert=SIGNING_CA)
- try:
- pem = ssl.get_server_certificate((host, port),
- ca_certs=CERTFILE)
- except ssl.SSLError as x:
- #should fail
- if support.verbose:
- sys.stdout.write("%s\n" % x)
- else:
- self.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
-
- pem = ssl.get_server_certificate((host, port),
- ca_certs=cert)
- if not pem:
- self.fail("No server certificate on %s:%s!" % (host, port))
- if support.verbose:
- sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem))
-
- _test_get_server_certificate(REMOTE_HOST, 443, REMOTE_ROOT_CERT)
- if support.IPV6_ENABLED:
- _test_get_server_certificate('ipv6.google.com', 443)
+ def test_get_server_certificate_fail(self):
+ # Connection failure crashes ThreadedEchoServer, so run this in an
+ # independent test method
+ _test_get_server_certificate_fail(self, *self.server_addr)
def test_ciphers(self):
- remote = (REMOTE_HOST, 443)
- with support.transient_internet(remote[0]):
- with ssl.wrap_socket(socket.socket(socket.AF_INET),
- cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s:
- s.connect(remote)
- with ssl.wrap_socket(socket.socket(socket.AF_INET),
- cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") as s:
- s.connect(remote)
- # Error checking can happen at instantiation or when connecting
- with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
- with socket.socket(socket.AF_INET) as sock:
- s = ssl.wrap_socket(sock,
- cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
- s.connect(remote)
-
- def test_algorithms(self):
- # Issue #8484: all algorithms should be available when verifying a
- # certificate.
- # SHA256 was added in OpenSSL 0.9.8
- if ssl.OPENSSL_VERSION_INFO < (0, 9, 8, 0, 15):
- self.skipTest("SHA256 not available on %r" % ssl.OPENSSL_VERSION)
- # sha256.tbs-internet.com needs SNI to use the correct certificate
- if not ssl.HAS_SNI:
- self.skipTest("SNI needed for this test")
- # https://sha2.hboeck.de/ was used until 2011-01-08 (no route to host)
- remote = ("sha256.tbs-internet.com", 443)
- sha256_cert = os.path.join(os.path.dirname(__file__), "sha256.pem")
- with support.transient_internet("sha256.tbs-internet.com"):
- ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
- ctx.verify_mode = ssl.CERT_REQUIRED
- ctx.load_verify_locations(sha256_cert)
- s = ctx.wrap_socket(socket.socket(socket.AF_INET),
- server_hostname="sha256.tbs-internet.com")
- try:
- s.connect(remote)
- if support.verbose:
- sys.stdout.write("\nCipher with %r is %r\n" %
- (remote, s.cipher()))
- sys.stdout.write("Certificate is:\n%s\n" %
- pprint.pformat(s.getpeercert()))
- finally:
- s.close()
+ with test_wrap_socket(socket.socket(socket.AF_INET),
+ cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s:
+ s.connect(self.server_addr)
+ with test_wrap_socket(socket.socket(socket.AF_INET),
+ cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") as s:
+ s.connect(self.server_addr)
+ # Error checking can happen at instantiation or when connecting
+ with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
+ with socket.socket(socket.AF_INET) as sock:
+ s = test_wrap_socket(sock,
+ cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
+ s.connect(self.server_addr)
def test_get_ca_certs_capath(self):
# capath certs are loaded on request
- with support.transient_internet(REMOTE_HOST):
- ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
- ctx.verify_mode = ssl.CERT_REQUIRED
- ctx.load_verify_locations(capath=CAPATH)
- self.assertEqual(ctx.get_ca_certs(), [])
- s = ctx.wrap_socket(socket.socket(socket.AF_INET))
- s.connect((REMOTE_HOST, 443))
- try:
- cert = s.getpeercert()
- self.assertTrue(cert)
- finally:
- s.close()
- self.assertEqual(len(ctx.get_ca_certs()), 1)
+ ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+ ctx.verify_mode = ssl.CERT_REQUIRED
+ ctx.load_verify_locations(capath=CAPATH)
+ self.assertEqual(ctx.get_ca_certs(), [])
+ with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
+ s.connect(self.server_addr)
+ cert = s.getpeercert()
+ self.assertTrue(cert)
+ self.assertEqual(len(ctx.get_ca_certs()), 1)
@needs_sni
def test_context_setget(self):
# Check that the context of a connected socket can be replaced.
- with support.transient_internet(REMOTE_HOST):
- ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
- ctx2 = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
- s = socket.socket(socket.AF_INET)
- with ctx1.wrap_socket(s) as ss:
- ss.connect((REMOTE_HOST, 443))
- self.assertIs(ss.context, ctx1)
- self.assertIs(ss._sslobj.context, ctx1)
- ss.context = ctx2
- self.assertIs(ss.context, ctx2)
- self.assertIs(ss._sslobj.context, ctx2)
-
-
-class NetworkedBIOTests(unittest.TestCase):
+ ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+ ctx2 = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+ s = socket.socket(socket.AF_INET)
+ with ctx1.wrap_socket(s) as ss:
+ ss.connect(self.server_addr)
+ self.assertIs(ss.context, ctx1)
+ self.assertIs(ss._sslobj.context, ctx1)
+ ss.context = ctx2
+ self.assertIs(ss.context, ctx2)
+ self.assertIs(ss._sslobj.context, ctx2)
def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
# A simple IO loop. Call func(*args) depending on the error we get
@@ -1765,64 +1723,128 @@ class NetworkedBIOTests(unittest.TestCase):
% (count, func.__name__))
return ret
- def test_handshake(self):
+ def test_bio_handshake(self):
+ sock = socket.socket(socket.AF_INET)
+ self.addCleanup(sock.close)
+ sock.connect(self.server_addr)
+ incoming = ssl.MemoryBIO()
+ outgoing = ssl.MemoryBIO()
+ ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+ ctx.verify_mode = ssl.CERT_REQUIRED
+ ctx.load_verify_locations(SIGNING_CA)
+ ctx.check_hostname = True
+ sslobj = ctx.wrap_bio(incoming, outgoing, False, 'localhost')
+ self.assertIs(sslobj._sslobj.owner, sslobj)
+ self.assertIsNone(sslobj.cipher())
+ self.assertIsNotNone(sslobj.shared_ciphers())
+ self.assertRaises(ValueError, sslobj.getpeercert)
+ if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
+ self.assertIsNone(sslobj.get_channel_binding('tls-unique'))
+ self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
+ self.assertTrue(sslobj.cipher())
+ self.assertIsNotNone(sslobj.shared_ciphers())
+ self.assertTrue(sslobj.getpeercert())
+ if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
+ self.assertTrue(sslobj.get_channel_binding('tls-unique'))
+ try:
+ self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
+ except ssl.SSLSyscallError:
+ # If the server shuts down the TCP connection without sending a
+ # secure shutdown message, this is reported as SSL_ERROR_SYSCALL
+ pass
+ self.assertRaises(ssl.SSLError, sslobj.write, b'foo')
+
+ def test_bio_read_write_data(self):
+ sock = socket.socket(socket.AF_INET)
+ self.addCleanup(sock.close)
+ sock.connect(self.server_addr)
+ incoming = ssl.MemoryBIO()
+ outgoing = ssl.MemoryBIO()
+ ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+ ctx.verify_mode = ssl.CERT_NONE
+ sslobj = ctx.wrap_bio(incoming, outgoing, False)
+ self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
+ req = b'FOO\n'
+ self.ssl_io_loop(sock, incoming, outgoing, sslobj.write, req)
+ buf = self.ssl_io_loop(sock, incoming, outgoing, sslobj.read, 1024)
+ self.assertEqual(buf, b'foo\n')
+ self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
+
+
+class NetworkedTests(unittest.TestCase):
+
+ def test_timeout_connect_ex(self):
+ # Issue #12065: on a timeout, connect_ex() should return the original
+ # errno (mimicking the behaviour of non-SSL sockets).
with support.transient_internet(REMOTE_HOST):
- sock = socket.socket(socket.AF_INET)
- sock.connect((REMOTE_HOST, 443))
- incoming = ssl.MemoryBIO()
- outgoing = ssl.MemoryBIO()
- ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+ s = test_wrap_socket(socket.socket(socket.AF_INET),
+ cert_reqs=ssl.CERT_REQUIRED,
+ do_handshake_on_connect=False)
+ self.addCleanup(s.close)
+ s.settimeout(0.0000001)
+ rc = s.connect_ex((REMOTE_HOST, 443))
+ if rc == 0:
+ self.skipTest("REMOTE_HOST responded too quickly")
+ self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
+
+ @unittest.skipUnless(support.IPV6_ENABLED, 'Needs IPv6')
+ def test_get_server_certificate_ipv6(self):
+ with support.transient_internet('ipv6.google.com'):
+ _test_get_server_certificate(self, 'ipv6.google.com', 443)
+ _test_get_server_certificate_fail(self, 'ipv6.google.com', 443)
+
+ def test_algorithms(self):
+ # Issue #8484: all algorithms should be available when verifying a
+ # certificate.
+ # SHA256 was added in OpenSSL 0.9.8
+ if ssl.OPENSSL_VERSION_INFO < (0, 9, 8, 0, 15):
+ self.skipTest("SHA256 not available on %r" % ssl.OPENSSL_VERSION)
+ # sha256.tbs-internet.com needs SNI to use the correct certificate
+ if not ssl.HAS_SNI:
+ self.skipTest("SNI needed for this test")
+ # https://sha2.hboeck.de/ was used until 2011-01-08 (no route to host)
+ remote = ("sha256.tbs-internet.com", 443)
+ sha256_cert = os.path.join(os.path.dirname(__file__), "sha256.pem")
+ with support.transient_internet("sha256.tbs-internet.com"):
+ ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
ctx.verify_mode = ssl.CERT_REQUIRED
- ctx.load_verify_locations(REMOTE_ROOT_CERT)
- ctx.check_hostname = True
- sslobj = ctx.wrap_bio(incoming, outgoing, False, REMOTE_HOST)
- self.assertIs(sslobj._sslobj.owner, sslobj)
- self.assertIsNone(sslobj.cipher())
- self.assertIsNotNone(sslobj.shared_ciphers())
- self.assertRaises(ValueError, sslobj.getpeercert)
- if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
- self.assertIsNone(sslobj.get_channel_binding('tls-unique'))
- self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
- self.assertTrue(sslobj.cipher())
- self.assertIsNotNone(sslobj.shared_ciphers())
- self.assertTrue(sslobj.getpeercert())
- if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
- self.assertTrue(sslobj.get_channel_binding('tls-unique'))
+ ctx.load_verify_locations(sha256_cert)
+ s = ctx.wrap_socket(socket.socket(socket.AF_INET),
+ server_hostname="sha256.tbs-internet.com")
try:
- self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
- except ssl.SSLSyscallError:
- # self-signed.pythontest.net probably shuts down the TCP
- # connection without sending a secure shutdown message, and
- # this is reported as SSL_ERROR_SYSCALL
- pass
- self.assertRaises(ssl.SSLError, sslobj.write, b'foo')
- sock.close()
+ s.connect(remote)
+ if support.verbose:
+ sys.stdout.write("\nCipher with %r is %r\n" %
+ (remote, s.cipher()))
+ sys.stdout.write("Certificate is:\n%s\n" %
+ pprint.pformat(s.getpeercert()))
+ finally:
+ s.close()
- def test_read_write_data(self):
- with support.transient_internet(REMOTE_HOST):
- sock = socket.socket(socket.AF_INET)
- sock.connect((REMOTE_HOST, 443))
- incoming = ssl.MemoryBIO()
- outgoing = ssl.MemoryBIO()
- ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
- ctx.verify_mode = ssl.CERT_NONE
- sslobj = ctx.wrap_bio(incoming, outgoing, False)
- self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
- req = b'GET / HTTP/1.0\r\n\r\n'
- self.ssl_io_loop(sock, incoming, outgoing, sslobj.write, req)
- buf = self.ssl_io_loop(sock, incoming, outgoing, sslobj.read, 1024)
- self.assertEqual(buf[:5], b'HTTP/')
- self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
- sock.close()
+def _test_get_server_certificate(test, host, port, cert=None):
+ pem = ssl.get_server_certificate((host, port))
+ if not pem:
+ test.fail("No server certificate on %s:%s!" % (host, port))
+
+ pem = ssl.get_server_certificate((host, port), ca_certs=cert)
+ if not pem:
+ test.fail("No server certificate on %s:%s!" % (host, port))
+ if support.verbose:
+ sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem))
+
+def _test_get_server_certificate_fail(test, host, port):
+ try:
+ pem = ssl.get_server_certificate((host, port), ca_certs=CERTFILE)
+ except ssl.SSLError as x:
+ #should fail
+ if support.verbose:
+ sys.stdout.write("%s\n" % x)
+ else:
+ test.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
-try:
- import threading
-except ImportError:
- _have_threads = False
-else:
- _have_threads = True
+if _have_threads:
from test.ssl_servers import make_https_server
class ThreadedEchoServer(threading.Thread):
@@ -1910,6 +1932,15 @@ else:
if not stripped:
# eof, so quit this handler
self.running = False
+ try:
+ self.sock = self.sslconn.unwrap()
+ except OSError:
+ # Many tests shut the TCP connection down
+ # without an SSL shutdown. This causes
+ # unwrap() to raise OSError with errno=0!
+ pass
+ else:
+ self.sslconn = None
self.close()
elif stripped == b'over':
if support.verbose and self.server.connectionchatty:
@@ -2037,7 +2068,7 @@ else:
class ConnectionHandler (asyncore.dispatcher_with_send):
def __init__(self, conn, certfile):
- self.socket = ssl.wrap_socket(conn, server_side=True,
+ self.socket = test_wrap_socket(conn, server_side=True,
certfile=certfile,
do_handshake_on_connect=False)
asyncore.dispatcher_with_send.__init__(self, self.socket)
@@ -2145,7 +2176,8 @@ else:
self.server.close()
def server_params_test(client_context, server_context, indata=b"FOO\n",
- chatty=True, connectionchatty=False, sni_name=None):
+ chatty=True, connectionchatty=False, sni_name=None,
+ session=None):
"""
Launch a server, connect a client to it and try various reads
and writes.
@@ -2156,7 +2188,7 @@ else:
connectionchatty=False)
with server:
with client_context.wrap_socket(socket.socket(),
- server_hostname=sni_name) as s:
+ server_hostname=sni_name, session=session) as s:
s.connect((HOST, server.port))
for arg in [indata, bytearray(indata), memoryview(indata)]:
if connectionchatty:
@@ -2184,6 +2216,8 @@ else:
'client_alpn_protocol': s.selected_alpn_protocol(),
'client_npn_protocol': s.selected_npn_protocol(),
'version': s.version(),
+ 'session_reused': s.session_reused,
+ 'session': s.session,
})
s.close()
stats['server_alpn_protocols'] = server.selected_alpn_protocols
@@ -2259,12 +2293,53 @@ else:
if support.verbose:
sys.stdout.write("\n")
for protocol in PROTOCOLS:
+ if protocol in {ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_SERVER}:
+ continue
with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]):
context = ssl.SSLContext(protocol)
context.load_cert_chain(CERTFILE)
server_params_test(context, context,
chatty=True, connectionchatty=True)
+ client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
+ client_context.load_verify_locations(SIGNING_CA)
+ server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
+ # server_context.load_verify_locations(SIGNING_CA)
+ server_context.load_cert_chain(SIGNED_CERTFILE2)
+
+ with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_SERVER):
+ server_params_test(client_context=client_context,
+ server_context=server_context,
+ chatty=True, connectionchatty=True,
+ sni_name='fakehostname')
+
+ client_context.check_hostname = False
+ with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_CLIENT):
+ with self.assertRaises(ssl.SSLError) as e:
+ server_params_test(client_context=server_context,
+ server_context=client_context,
+ chatty=True, connectionchatty=True,
+ sni_name='fakehostname')
+ self.assertIn('called a function you should not call',
+ str(e.exception))
+
+ with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_SERVER):
+ with self.assertRaises(ssl.SSLError) as e:
+ server_params_test(client_context=server_context,
+ server_context=server_context,
+ chatty=True, connectionchatty=True)
+ self.assertIn('called a function you should not call',
+ str(e.exception))
+
+ with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_CLIENT):
+ with self.assertRaises(ssl.SSLError) as e:
+ server_params_test(client_context=server_context,
+ server_context=client_context,
+ chatty=True, connectionchatty=True)
+ self.assertIn('called a function you should not call',
+ str(e.exception))
+
+
def test_getpeercert(self):
if support.verbose:
sys.stdout.write("\n")
@@ -2398,7 +2473,7 @@ else:
connectionchatty=False)
with server, \
socket.socket() as sock, \
- ssl.wrap_socket(sock,
+ test_wrap_socket(sock,
certfile=certfile,
ssl_version=ssl.PROTOCOL_TLSv1) as s:
try:
@@ -2445,7 +2520,7 @@ else:
c.connect((HOST, port))
listener_gone.wait()
try:
- ssl_sock = ssl.wrap_socket(c)
+ ssl_sock = test_wrap_socket(c)
except OSError:
pass
else:
@@ -2635,7 +2710,7 @@ else:
sys.stdout.write(
" client: read %r from server, starting TLS...\n"
% msg)
- conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
+ conn = test_wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
wrapped = True
elif indata == b"ENDTLS" and msg.startswith(b"ok"):
# ENDTLS ok, switch back to clear text
@@ -2694,7 +2769,7 @@ else:
indata = b"FOO\n"
server = AsyncoreEchoServer(CERTFILE)
with server:
- s = ssl.wrap_socket(socket.socket())
+ s = test_wrap_socket(socket.socket())
s.connect(('127.0.0.1', server.port))
if support.verbose:
sys.stdout.write(
@@ -2727,7 +2802,7 @@ else:
chatty=True,
connectionchatty=False)
with server:
- s = ssl.wrap_socket(socket.socket(),
+ s = test_wrap_socket(socket.socket(),
server_side=False,
certfile=CERTFILE,
ca_certs=CERTFILE,
@@ -2745,12 +2820,13 @@ else:
count, addr = s.recvfrom_into(b)
return b[:count]
- # (name, method, whether to expect success, *args)
+ # (name, method, expect success?, *args, return value func)
send_methods = [
- ('send', s.send, True, []),
- ('sendto', s.sendto, False, ["some.address"]),
- ('sendall', s.sendall, True, []),
+ ('send', s.send, True, [], len),
+ ('sendto', s.sendto, False, ["some.address"], len),
+ ('sendall', s.sendall, True, [], lambda x: None),
]
+ # (name, method, whether to expect success, *args)
recv_methods = [
('recv', s.recv, True, []),
('recvfrom', s.recvfrom, False, ["some.address"]),
@@ -2759,10 +2835,13 @@ else:
]
data_prefix = "PREFIX_"
- for meth_name, send_meth, expect_success, args in send_methods:
+ for (meth_name, send_meth, expect_success, args,
+ ret_val_meth) in send_methods:
indata = (data_prefix + meth_name).encode('ascii')
try:
- send_meth(indata, *args)
+ ret = send_meth(indata, *args)
+ msg = "sending with {}".format(meth_name)
+ self.assertEqual(ret, ret_val_meth(indata), msg=msg)
outdata = s.read()
if outdata != indata.lower():
self.fail(
@@ -2847,7 +2926,7 @@ else:
self.addCleanup(server.__exit__, None, None)
s = socket.create_connection((HOST, server.port))
self.addCleanup(s.close)
- s = ssl.wrap_socket(s, suppress_ragged_eofs=False)
+ s = test_wrap_socket(s, suppress_ragged_eofs=False)
self.addCleanup(s.close)
# recv/read(0) should return no data
@@ -2869,7 +2948,7 @@ else:
chatty=True,
connectionchatty=False)
with server:
- s = ssl.wrap_socket(socket.socket(),
+ s = test_wrap_socket(socket.socket(),
server_side=False,
certfile=CERTFILE,
ca_certs=CERTFILE,
@@ -2923,12 +3002,12 @@ else:
c.connect((host, port))
# Will attempt handshake and time out
self.assertRaisesRegex(socket.timeout, "timed out",
- ssl.wrap_socket, c)
+ test_wrap_socket, c)
finally:
c.close()
try:
c = socket.socket(socket.AF_INET)
- c = ssl.wrap_socket(c)
+ c = test_wrap_socket(c)
c.settimeout(0.2)
# Will attempt handshake and time out
self.assertRaisesRegex(socket.timeout, "timed out",
@@ -2951,6 +3030,7 @@ else:
host = "127.0.0.1"
port = support.bind_port(server)
server = context.wrap_socket(server, server_side=True)
+ self.assertTrue(server.server_side)
evt = threading.Event()
remote = None
@@ -3053,7 +3133,7 @@ else:
chatty=True,
connectionchatty=False)
with server:
- s = ssl.wrap_socket(socket.socket(),
+ s = test_wrap_socket(socket.socket(),
server_side=False,
certfile=CERTFILE,
ca_certs=CERTFILE,
@@ -3078,7 +3158,7 @@ else:
s.close()
# now, again
- s = ssl.wrap_socket(socket.socket(),
+ s = test_wrap_socket(socket.socket(),
server_side=False,
certfile=CERTFILE,
ca_certs=CERTFILE,
@@ -3388,6 +3468,111 @@ else:
s.sendfile(file)
self.assertEqual(s.recv(1024), TEST_DATA)
+ def test_session(self):
+ server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+ server_context.load_cert_chain(SIGNED_CERTFILE)
+ client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+ client_context.verify_mode = ssl.CERT_REQUIRED
+ client_context.load_verify_locations(SIGNING_CA)
+
+ # first connection without session
+ stats = server_params_test(client_context, server_context)
+ session = stats['session']
+ self.assertTrue(session.id)
+ self.assertGreater(session.time, 0)
+ self.assertGreater(session.timeout, 0)
+ self.assertTrue(session.has_ticket)
+ if ssl.OPENSSL_VERSION_INFO > (1, 0, 1):
+ self.assertGreater(session.ticket_lifetime_hint, 0)
+ self.assertFalse(stats['session_reused'])
+ sess_stat = server_context.session_stats()
+ self.assertEqual(sess_stat['accept'], 1)
+ self.assertEqual(sess_stat['hits'], 0)
+
+ # reuse session
+ stats = server_params_test(client_context, server_context, session=session)
+ sess_stat = server_context.session_stats()
+ self.assertEqual(sess_stat['accept'], 2)
+ self.assertEqual(sess_stat['hits'], 1)
+ self.assertTrue(stats['session_reused'])
+ session2 = stats['session']
+ self.assertEqual(session2.id, session.id)
+ self.assertEqual(session2, session)
+ self.assertIsNot(session2, session)
+ self.assertGreaterEqual(session2.time, session.time)
+ self.assertGreaterEqual(session2.timeout, session.timeout)
+
+ # another one without session
+ stats = server_params_test(client_context, server_context)
+ self.assertFalse(stats['session_reused'])
+ session3 = stats['session']
+ self.assertNotEqual(session3.id, session.id)
+ self.assertNotEqual(session3, session)
+ sess_stat = server_context.session_stats()
+ self.assertEqual(sess_stat['accept'], 3)
+ self.assertEqual(sess_stat['hits'], 1)
+
+ # reuse session again
+ stats = server_params_test(client_context, server_context, session=session)
+ self.assertTrue(stats['session_reused'])
+ session4 = stats['session']
+ self.assertEqual(session4.id, session.id)
+ self.assertEqual(session4, session)
+ self.assertGreaterEqual(session4.time, session.time)
+ self.assertGreaterEqual(session4.timeout, session.timeout)
+ sess_stat = server_context.session_stats()
+ self.assertEqual(sess_stat['accept'], 4)
+ self.assertEqual(sess_stat['hits'], 2)
+
+ def test_session_handling(self):
+ context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+ context.verify_mode = ssl.CERT_REQUIRED
+ context.load_verify_locations(CERTFILE)
+ context.load_cert_chain(CERTFILE)
+
+ context2 = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+ context2.verify_mode = ssl.CERT_REQUIRED
+ context2.load_verify_locations(CERTFILE)
+ context2.load_cert_chain(CERTFILE)
+
+ server = ThreadedEchoServer(context=context, chatty=False)
+ with server:
+ with context.wrap_socket(socket.socket()) as s:
+ # session is None before handshake
+ self.assertEqual(s.session, None)
+ self.assertEqual(s.session_reused, None)
+ s.connect((HOST, server.port))
+ session = s.session
+ self.assertTrue(session)
+ with self.assertRaises(TypeError) as e:
+ s.session = object
+ self.assertEqual(str(e.exception), 'Value is not a SSLSession.')
+
+ with context.wrap_socket(socket.socket()) as s:
+ s.connect((HOST, server.port))
+ # cannot set session after handshake
+ with self.assertRaises(ValueError) as e:
+ s.session = session
+ self.assertEqual(str(e.exception),
+ 'Cannot set session after handshake.')
+
+ with context.wrap_socket(socket.socket()) as s:
+ # can set session before handshake and before the
+ # connection was established
+ s.session = session
+ s.connect((HOST, server.port))
+ self.assertEqual(s.session.id, session.id)
+ self.assertEqual(s.session, session)
+ self.assertEqual(s.session_reused, True)
+
+ with context2.wrap_socket(socket.socket()) as s:
+ # cannot re-use session with a different SSLContext
+ with self.assertRaises(ValueError) as e:
+ s.session = session
+ s.connect((HOST, server.port))
+ self.assertEqual(str(e.exception),
+ 'Session refers to a different SSLContext.')
+
def test_main(verbose=False):
if support.verbose:
@@ -3400,7 +3585,7 @@ def test_main(verbose=False):
with warnings.catch_warnings():
warnings.filterwarnings(
'ignore',
- 'dist\(\) and linux_distribution\(\) '
+ r'dist\(\) and linux_distribution\(\) '
'functions are deprecated .*',
PendingDeprecationWarning,
)
@@ -3422,18 +3607,20 @@ def test_main(verbose=False):
pass
for filename in [
- CERTFILE, REMOTE_ROOT_CERT, BYTES_CERTFILE,
+ CERTFILE, BYTES_CERTFILE,
ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
BADCERT, BADKEY, EMPTYCERT]:
if not os.path.exists(filename):
raise support.TestFailed("Can't read certificate file %r" % filename)
- tests = [ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests]
+ tests = [
+ ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests,
+ SimpleBackgroundTests,
+ ]
if support.is_resource_enabled('network'):
tests.append(NetworkedTests)
- tests.append(NetworkedBIOTests)
if _have_threads:
thread_info = support.threading_setup()