summaryrefslogtreecommitdiff
path: root/Lib/test/test_socket.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_socket.py')
-rw-r--r--Lib/test/test_socket.py253
1 files changed, 246 insertions, 7 deletions
diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py
index 70c03f9c46..97dc3cd76b 100644
--- a/Lib/test/test_socket.py
+++ b/Lib/test/test_socket.py
@@ -13,7 +13,6 @@ import queue
import sys
import os
import array
-import platform
import contextlib
from weakref import proxy
import signal
@@ -66,10 +65,22 @@ def _have_socket_rds():
s.close()
return True
+def _have_socket_alg():
+ """Check whether AF_ALG sockets are supported on this host."""
+ try:
+ s = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0)
+ except (AttributeError, OSError):
+ return False
+ else:
+ s.close()
+ return True
+
HAVE_SOCKET_CAN = _have_socket_can()
HAVE_SOCKET_RDS = _have_socket_rds()
+HAVE_SOCKET_ALG = _have_socket_alg()
+
# Size in bytes of the int type
SIZEOF_INT = array.array("i").itemsize
@@ -267,8 +278,14 @@ class ThreadableTest:
def clientRun(self, test_func):
self.server_ready.wait()
- self.clientSetUp()
- self.client_ready.set()
+ try:
+ self.clientSetUp()
+ except BaseException as e:
+ self.queue.put(e)
+ self.clientTearDown()
+ return
+ finally:
+ self.client_ready.set()
if self.server_crashed:
self.clientTearDown()
return
@@ -509,8 +526,11 @@ class ConnectedStreamTestMixin(SocketListeningTestMixin,
self.serv_conn = self.cli
def clientTearDown(self):
- self.serv_conn.close()
- self.serv_conn = None
+ try:
+ self.serv_conn.close()
+ self.serv_conn = None
+ except AttributeError:
+ pass
super().clientTearDown()
@@ -529,7 +549,7 @@ class UnixSocketTestBase(SocketTestBase):
def bindSock(self, sock):
path = tempfile.mktemp(dir=self.dir_path)
- sock.bind(path)
+ support.bind_unix_socket(sock, path)
self.addCleanup(support.unlink, path)
class UnixStreamBase(UnixSocketTestBase):
@@ -1161,6 +1181,17 @@ class GeneralModuleTests(unittest.TestCase):
sock.close()
self.assertRaises(OSError, sock.send, b"spam")
+ def testCloseException(self):
+ sock = socket.socket()
+ socket.socket(fileno=sock.fileno()).close()
+ try:
+ sock.close()
+ except OSError as err:
+ # Winsock apparently raises ENOTSOCK
+ self.assertIn(err.errno, (errno.EBADF, errno.ENOTSOCK))
+ else:
+ self.fail("close() should raise EBADF/ENOTSOCK")
+
def testNewAttributes(self):
# testing .family, .type and .protocol
@@ -1207,6 +1238,22 @@ class GeneralModuleTests(unittest.TestCase):
self.assertRaises(ValueError, s.ioctl, -1, None)
s.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 100, 100))
+ @unittest.skipUnless(os.name == "nt", "Windows specific")
+ @unittest.skipUnless(hasattr(socket, 'SIO_LOOPBACK_FAST_PATH'),
+ 'Loopback fast path support required for this test')
+ def test_sio_loopback_fast_path(self):
+ s = socket.socket()
+ self.addCleanup(s.close)
+ try:
+ s.ioctl(socket.SIO_LOOPBACK_FAST_PATH, True)
+ except OSError as exc:
+ WSAEOPNOTSUPP = 10045
+ if exc.winerror == WSAEOPNOTSUPP:
+ self.skipTest("SIO_LOOPBACK_FAST_PATH is defined but "
+ "doesn't implemented in this Windows version")
+ raise
+ self.assertRaises(TypeError, s.ioctl, socket.SIO_LOOPBACK_FAST_PATH, None)
+
def testGetaddrinfo(self):
try:
socket.getaddrinfo('localhost', 80)
@@ -2843,6 +2890,7 @@ class SCMRightsTest(SendrecvmsgServerTimeoutBase):
nbytes = self.sendmsgToServer([msg])
self.assertEqual(nbytes, len(msg))
+ @unittest.skipIf(sys.platform == "darwin", "see issue #24725")
def testFDPassEmpty(self):
# Try to pass an empty FD array. Can receive either no array
# or an empty array.
@@ -4514,6 +4562,19 @@ class TestExceptions(unittest.TestCase):
self.assertTrue(issubclass(socket.gaierror, OSError))
self.assertTrue(issubclass(socket.timeout, OSError))
+ def test_setblocking_invalidfd(self):
+ # Regression test for issue #28471
+
+ sock0 = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
+ sock = socket.socket(
+ socket.AF_INET, socket.SOCK_STREAM, 0, sock0.fileno())
+ sock0.close()
+ self.addCleanup(sock.detach)
+
+ with self.assertRaises(OSError):
+ sock.setblocking(False)
+
+
@unittest.skipUnless(sys.platform == 'linux', 'Linux specific test')
class TestLinuxAbstractNamespace(unittest.TestCase):
@@ -4579,7 +4640,7 @@ class TestUnixDomain(unittest.TestCase):
def bind(self, sock, path):
# Bind the socket
try:
- sock.bind(path)
+ support.bind_unix_socket(sock, path)
except OSError as e:
if str(e) == "AF_UNIX path too long":
self.skipTest(
@@ -5325,6 +5386,183 @@ class SendfileUsingSendfileTest(SendfileUsingSendTest):
return getattr(sock, "_sendfile_use_sendfile")
+@unittest.skipUnless(HAVE_SOCKET_ALG, 'AF_ALG required')
+class LinuxKernelCryptoAPI(unittest.TestCase):
+ # tests for AF_ALG
+ def create_alg(self, typ, name):
+ sock = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0)
+ try:
+ sock.bind((typ, name))
+ except FileNotFoundError as e:
+ # type / algorithm is not available
+ sock.close()
+ raise unittest.SkipTest(str(e), typ, name)
+ else:
+ return sock
+
+ def test_sha256(self):
+ expected = bytes.fromhex("ba7816bf8f01cfea414140de5dae2223b00361a396"
+ "177a9cb410ff61f20015ad")
+ with self.create_alg('hash', 'sha256') as algo:
+ op, _ = algo.accept()
+ with op:
+ op.sendall(b"abc")
+ self.assertEqual(op.recv(512), expected)
+
+ op, _ = algo.accept()
+ with op:
+ op.send(b'a', socket.MSG_MORE)
+ op.send(b'b', socket.MSG_MORE)
+ op.send(b'c', socket.MSG_MORE)
+ op.send(b'')
+ self.assertEqual(op.recv(512), expected)
+
+ def test_hmac_sha1(self):
+ expected = bytes.fromhex("effcdf6ae5eb2fa2d27416d5f184df9c259a7c79")
+ with self.create_alg('hash', 'hmac(sha1)') as algo:
+ algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, b"Jefe")
+ op, _ = algo.accept()
+ with op:
+ op.sendall(b"what do ya want for nothing?")
+ self.assertEqual(op.recv(512), expected)
+
+ # Although it should work with 3.19 and newer the test blocks on
+ # Ubuntu 15.10 with Kernel 4.2.0-19.
+ @support.requires_linux_version(4, 3)
+ def test_aes_cbc(self):
+ key = bytes.fromhex('06a9214036b8a15b512e03d534120006')
+ iv = bytes.fromhex('3dafba429d9eb430b422da802c9fac41')
+ msg = b"Single block msg"
+ ciphertext = bytes.fromhex('e353779c1079aeb82708942dbe77181a')
+ msglen = len(msg)
+ with self.create_alg('skcipher', 'cbc(aes)') as algo:
+ algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, key)
+ op, _ = algo.accept()
+ with op:
+ op.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, iv=iv,
+ flags=socket.MSG_MORE)
+ op.sendall(msg)
+ self.assertEqual(op.recv(msglen), ciphertext)
+
+ op, _ = algo.accept()
+ with op:
+ op.sendmsg_afalg([ciphertext],
+ op=socket.ALG_OP_DECRYPT, iv=iv)
+ self.assertEqual(op.recv(msglen), msg)
+
+ # long message
+ multiplier = 1024
+ longmsg = [msg] * multiplier
+ op, _ = algo.accept()
+ with op:
+ op.sendmsg_afalg(longmsg,
+ op=socket.ALG_OP_ENCRYPT, iv=iv)
+ enc = op.recv(msglen * multiplier)
+ self.assertEqual(len(enc), msglen * multiplier)
+ self.assertTrue(enc[:msglen], ciphertext)
+
+ op, _ = algo.accept()
+ with op:
+ op.sendmsg_afalg([enc],
+ op=socket.ALG_OP_DECRYPT, iv=iv)
+ dec = op.recv(msglen * multiplier)
+ self.assertEqual(len(dec), msglen * multiplier)
+ self.assertEqual(dec, msg * multiplier)
+
+ @support.requires_linux_version(4, 3) # see test_aes_cbc
+ def test_aead_aes_gcm(self):
+ key = bytes.fromhex('c939cc13397c1d37de6ae0e1cb7c423c')
+ iv = bytes.fromhex('b3d8cc017cbb89b39e0f67e2')
+ plain = bytes.fromhex('c3b3c41f113a31b73d9a5cd432103069')
+ assoc = bytes.fromhex('24825602bd12a984e0092d3e448eda5f')
+ expected_ct = bytes.fromhex('93fe7d9e9bfd10348a5606e5cafa7354')
+ expected_tag = bytes.fromhex('0032a1dc85f1c9786925a2e71d8272dd')
+
+ taglen = len(expected_tag)
+ assoclen = len(assoc)
+
+ with self.create_alg('aead', 'gcm(aes)') as algo:
+ algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, key)
+ algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_AEAD_AUTHSIZE,
+ None, taglen)
+
+ # send assoc, plain and tag buffer in separate steps
+ op, _ = algo.accept()
+ with op:
+ op.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, iv=iv,
+ assoclen=assoclen, flags=socket.MSG_MORE)
+ op.sendall(assoc, socket.MSG_MORE)
+ op.sendall(plain, socket.MSG_MORE)
+ op.sendall(b'\x00' * taglen)
+ res = op.recv(assoclen + len(plain) + taglen)
+ self.assertEqual(expected_ct, res[assoclen:-taglen])
+ self.assertEqual(expected_tag, res[-taglen:])
+
+ # now with msg
+ op, _ = algo.accept()
+ with op:
+ msg = assoc + plain + b'\x00' * taglen
+ op.sendmsg_afalg([msg], op=socket.ALG_OP_ENCRYPT, iv=iv,
+ assoclen=assoclen)
+ res = op.recv(assoclen + len(plain) + taglen)
+ self.assertEqual(expected_ct, res[assoclen:-taglen])
+ self.assertEqual(expected_tag, res[-taglen:])
+
+ # create anc data manually
+ pack_uint32 = struct.Struct('I').pack
+ op, _ = algo.accept()
+ with op:
+ msg = assoc + plain + b'\x00' * taglen
+ op.sendmsg(
+ [msg],
+ ([socket.SOL_ALG, socket.ALG_SET_OP, pack_uint32(socket.ALG_OP_ENCRYPT)],
+ [socket.SOL_ALG, socket.ALG_SET_IV, pack_uint32(len(iv)) + iv],
+ [socket.SOL_ALG, socket.ALG_SET_AEAD_ASSOCLEN, pack_uint32(assoclen)],
+ )
+ )
+ res = op.recv(len(msg))
+ self.assertEqual(expected_ct, res[assoclen:-taglen])
+ self.assertEqual(expected_tag, res[-taglen:])
+
+ # decrypt and verify
+ op, _ = algo.accept()
+ with op:
+ msg = assoc + expected_ct + expected_tag
+ op.sendmsg_afalg([msg], op=socket.ALG_OP_DECRYPT, iv=iv,
+ assoclen=assoclen)
+ res = op.recv(len(msg))
+ self.assertEqual(plain, res[assoclen:-taglen])
+
+ @support.requires_linux_version(4, 3) # see test_aes_cbc
+ def test_drbg_pr_sha256(self):
+ # deterministic random bit generator, prediction resistance, sha256
+ with self.create_alg('rng', 'drbg_pr_sha256') as algo:
+ extra_seed = os.urandom(32)
+ algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, extra_seed)
+ op, _ = algo.accept()
+ with op:
+ rn = op.recv(32)
+ self.assertEqual(len(rn), 32)
+
+ def test_sendmsg_afalg_args(self):
+ sock = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0)
+ with sock:
+ with self.assertRaises(TypeError):
+ sock.sendmsg_afalg()
+
+ with self.assertRaises(TypeError):
+ sock.sendmsg_afalg(op=None)
+
+ with self.assertRaises(TypeError):
+ sock.sendmsg_afalg(1)
+
+ with self.assertRaises(TypeError):
+ sock.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, assoclen=None)
+
+ with self.assertRaises(TypeError):
+ sock.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, assoclen=-1)
+
+
def test_main():
tests = [GeneralModuleTests, BasicTCPTest, TCPCloserTest, TCPTimeoutTest,
TestExceptions, BufferIOTest, BasicTCPTest2, BasicUDPTest, UDPTimeoutTest ]
@@ -5351,6 +5589,7 @@ def test_main():
tests.extend([TIPCTest, TIPCThreadableTest])
tests.extend([BasicCANTest, CANTest])
tests.extend([BasicRDSTest, RDSTest])
+ tests.append(LinuxKernelCryptoAPI)
tests.extend([
CmsgMacroTests,
SendmsgUDPTest,