From 9d4b88aabc07af29601775a537b476c959f5f11b Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Tue, 18 Dec 2012 23:10:48 +0200 Subject: Issue #16717: get rid of socket.error, replace with OSError --- Lib/test/test_socket.py | 112 ++++++++++++++++++++++++------------------------ 1 file changed, 56 insertions(+), 56 deletions(-) (limited to 'Lib/test/test_socket.py') diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py index 0ce97295fd..c0ccb5d96d 100644 --- a/Lib/test/test_socket.py +++ b/Lib/test/test_socket.py @@ -46,7 +46,7 @@ def _have_socket_can(): """Check whether CAN sockets are supported on this host.""" try: s = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) - except (AttributeError, socket.error, OSError): + except (AttributeError, OSError): return False else: s.close() @@ -126,7 +126,7 @@ class SocketCANTest(unittest.TestCase): self.addCleanup(self.s.close) try: self.s.bind((self.interface,)) - except socket.error: + except OSError: self.skipTest('network interface `%s` does not exist' % self.interface) @@ -295,7 +295,7 @@ class ThreadedCANSocketTest(SocketCANTest, ThreadableTest): self.cli = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) try: self.cli.bind((self.interface,)) - except socket.error: + except OSError: # skipTest should not be called here, and will be called in the # server instead pass @@ -608,7 +608,7 @@ def requireSocket(*args): for obj in args] try: s = socket.socket(*callargs) - except socket.error as e: + except OSError as e: # XXX: check errno? err = str(e) else: @@ -645,11 +645,11 @@ class GeneralModuleTests(unittest.TestCase): def testSocketError(self): # Testing socket module exceptions msg = "Error raising socket exception (%s)." - with self.assertRaises(socket.error, msg=msg % 'socket.error'): - raise socket.error - with self.assertRaises(socket.error, msg=msg % 'socket.herror'): + with self.assertRaises(OSError, msg=msg % 'OSError'): + raise OSError + with self.assertRaises(OSError, msg=msg % 'socket.herror'): raise socket.herror - with self.assertRaises(socket.error, msg=msg % 'socket.gaierror'): + with self.assertRaises(OSError, msg=msg % 'socket.gaierror'): raise socket.gaierror def testSendtoErrors(self): @@ -712,13 +712,13 @@ class GeneralModuleTests(unittest.TestCase): hostname = socket.gethostname() try: ip = socket.gethostbyname(hostname) - except socket.error: + except OSError: # Probably name lookup wasn't set up right; skip this test return self.assertTrue(ip.find('.') >= 0, "Error resolving host to ip.") try: hname, aliases, ipaddrs = socket.gethostbyaddr(ip) - except socket.error: + except OSError: # Probably a similar problem as above; skip this test return all_host_names = [hostname, hname] + aliases @@ -732,7 +732,7 @@ class GeneralModuleTests(unittest.TestCase): oldhn = socket.gethostname() try: socket.sethostname('new') - except socket.error as e: + except OSError as e: if e.errno == errno.EPERM: self.skipTest("test should be run as root") else: @@ -766,8 +766,8 @@ class GeneralModuleTests(unittest.TestCase): 'socket.if_nameindex() not available.') def testInvalidInterfaceNameIndex(self): # test nonexistent interface index/name - self.assertRaises(socket.error, socket.if_indextoname, 0) - self.assertRaises(socket.error, socket.if_nametoindex, '_DEADBEEF') + self.assertRaises(OSError, socket.if_indextoname, 0) + self.assertRaises(OSError, socket.if_nametoindex, '_DEADBEEF') # test with invalid values self.assertRaises(TypeError, socket.if_nametoindex, 0) self.assertRaises(TypeError, socket.if_indextoname, '_DEADBEEF') @@ -788,7 +788,7 @@ class GeneralModuleTests(unittest.TestCase): try: # On some versions, this crashes the interpreter. socket.getnameinfo(('x', 0, 0, 0), 0) - except socket.error: + except OSError: pass def testNtoH(self): @@ -835,17 +835,17 @@ class GeneralModuleTests(unittest.TestCase): try: port = socket.getservbyname(service, 'tcp') break - except socket.error: + except OSError: pass else: - raise socket.error + raise OSError # Try same call with optional protocol omitted port2 = socket.getservbyname(service) eq(port, port2) # Try udp, but don't barf it it doesn't exist try: udpport = socket.getservbyname(service, 'udp') - except socket.error: + except OSError: udpport = None else: eq(udpport, port) @@ -901,7 +901,7 @@ class GeneralModuleTests(unittest.TestCase): g = lambda a: inet_pton(AF_INET, a) assertInvalid = lambda func,a: self.assertRaises( - (socket.error, ValueError), func, a + (OSError, ValueError), func, a ) self.assertEqual(b'\x00\x00\x00\x00', f('0.0.0.0')) @@ -936,7 +936,7 @@ class GeneralModuleTests(unittest.TestCase): return f = lambda a: inet_pton(AF_INET6, a) assertInvalid = lambda a: self.assertRaises( - (socket.error, ValueError), f, a + (OSError, ValueError), f, a ) self.assertEqual(b'\x00' * 16, f('::')) @@ -985,7 +985,7 @@ class GeneralModuleTests(unittest.TestCase): from socket import inet_ntoa as f, inet_ntop, AF_INET g = lambda a: inet_ntop(AF_INET, a) assertInvalid = lambda func,a: self.assertRaises( - (socket.error, ValueError), func, a + (OSError, ValueError), func, a ) self.assertEqual('1.0.1.0', f(b'\x01\x00\x01\x00')) @@ -1014,7 +1014,7 @@ class GeneralModuleTests(unittest.TestCase): return f = lambda a: inet_ntop(AF_INET6, a) assertInvalid = lambda a: self.assertRaises( - (socket.error, ValueError), f, a + (OSError, ValueError), f, a ) self.assertEqual('::', f(b'\x00' * 16)) @@ -1042,7 +1042,7 @@ class GeneralModuleTests(unittest.TestCase): # At least for eCos. This is required for the S/390 to pass. try: my_ip_addr = socket.gethostbyname(socket.gethostname()) - except socket.error: + except OSError: # Probably name lookup wasn't set up right; skip this test return self.assertIn(name[0], ("0.0.0.0", my_ip_addr), '%s invalid' % name[0]) @@ -1069,7 +1069,7 @@ class GeneralModuleTests(unittest.TestCase): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(1) sock.close() - self.assertRaises(socket.error, sock.send, b"spam") + self.assertRaises(OSError, sock.send, b"spam") def testNewAttributes(self): # testing .family, .type and .protocol @@ -1168,7 +1168,7 @@ class GeneralModuleTests(unittest.TestCase): def test_getnameinfo(self): # only IP addresses are allowed - self.assertRaises(socket.error, socket.getnameinfo, ('mail.python.org',0), 0) + self.assertRaises(OSError, socket.getnameinfo, ('mail.python.org',0), 0) @unittest.skipUnless(support.is_resource_enabled('network'), 'network is not enabled') @@ -1296,7 +1296,7 @@ class BasicCANTest(unittest.TestCase): def testTooLongInterfaceName(self): # most systems limit IFNAMSIZ to 16, take 1024 to be sure with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: - self.assertRaisesRegex(socket.error, 'interface name too long', + self.assertRaisesRegex(OSError, 'interface name too long', s.bind, ('x' * 1024,)) @unittest.skipUnless(hasattr(socket, "CAN_RAW_LOOPBACK"), @@ -1591,7 +1591,7 @@ class BasicTCPTest(SocketConnectedTest): self.assertEqual(f, fileno) # cli_conn cannot be used anymore... self.assertTrue(self.cli_conn._closed) - self.assertRaises(socket.error, self.cli_conn.recv, 1024) + self.assertRaises(OSError, self.cli_conn.recv, 1024) self.cli_conn.close() # ...but we can create another socket using the (still open) # file descriptor @@ -1960,7 +1960,7 @@ class SendmsgTests(SendrecvmsgServerTimeoutBase): def _testSendmsgExcessCmsgReject(self): if not hasattr(socket, "CMSG_SPACE"): # Can only send one item - with self.assertRaises(socket.error) as cm: + with self.assertRaises(OSError) as cm: self.sendmsgToServer([MSG], [(0, 0, b""), (0, 0, b"")]) self.assertIsNone(cm.exception.errno) self.sendToServer(b"done") @@ -1971,7 +1971,7 @@ class SendmsgTests(SendrecvmsgServerTimeoutBase): def _testSendmsgAfterClose(self): self.cli_sock.close() - self.assertRaises(socket.error, self.sendmsgToServer, [MSG]) + self.assertRaises(OSError, self.sendmsgToServer, [MSG]) class SendmsgStreamTests(SendmsgTests): @@ -2015,7 +2015,7 @@ class SendmsgStreamTests(SendmsgTests): @testSendmsgDontWait.client_skip def _testSendmsgDontWait(self): try: - with self.assertRaises(socket.error) as cm: + with self.assertRaises(OSError) as cm: while True: self.sendmsgToServer([b"a"*512], [], socket.MSG_DONTWAIT) self.assertIn(cm.exception.errno, @@ -2035,9 +2035,9 @@ class SendmsgConnectionlessTests(SendmsgTests): pass def _testSendmsgNoDestAddr(self): - self.assertRaises(socket.error, self.cli_sock.sendmsg, + self.assertRaises(OSError, self.cli_sock.sendmsg, [MSG]) - self.assertRaises(socket.error, self.cli_sock.sendmsg, + self.assertRaises(OSError, self.cli_sock.sendmsg, [MSG], [], 0, None) @@ -2123,7 +2123,7 @@ class RecvmsgGenericTests(SendrecvmsgBase): def testRecvmsgAfterClose(self): # Check that recvmsg[_into]() fails on a closed socket. self.serv_sock.close() - self.assertRaises(socket.error, self.doRecvmsg, self.serv_sock, 1024) + self.assertRaises(OSError, self.doRecvmsg, self.serv_sock, 1024) def _testRecvmsgAfterClose(self): pass @@ -2571,7 +2571,7 @@ class SCMRightsTest(SendrecvmsgServerTimeoutBase): # call fails, just send msg with no ancillary data. try: nbytes = self.sendmsgToServer([msg], ancdata) - except socket.error as e: + except OSError as e: # Check that it was the system call that failed self.assertIsInstance(e.errno, int) nbytes = self.sendmsgToServer([msg]) @@ -2949,7 +2949,7 @@ class RFC3542AncillaryTest(SendrecvmsgServerTimeoutBase): array.array("i", [self.traffic_class]).tobytes() + b"\x00"), (socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT, array.array("i", [self.hop_limit]))]) - except socket.error as e: + except OSError as e: self.assertIsInstance(e.errno, int) nbytes = self.sendmsgToServer( [MSG], @@ -3396,10 +3396,10 @@ class InterruptedRecvTimeoutTest(InterruptedTimeoutBase, UDPTestBase): self.serv.settimeout(self.timeout) def checkInterruptedRecv(self, func, *args, **kwargs): - # Check that func(*args, **kwargs) raises socket.error with an + # Check that func(*args, **kwargs) raises OSError with an # errno of EINTR when interrupted by a signal. self.setAlarm(self.alarm_time) - with self.assertRaises(socket.error) as cm: + with self.assertRaises(OSError) as cm: func(*args, **kwargs) self.assertNotIsInstance(cm.exception, socket.timeout) self.assertEqual(cm.exception.errno, errno.EINTR) @@ -3456,9 +3456,9 @@ class InterruptedSendTimeoutTest(InterruptedTimeoutBase, def checkInterruptedSend(self, func, *args, **kwargs): # Check that func(*args, **kwargs), run in a loop, raises - # socket.error with an errno of EINTR when interrupted by a + # OSError with an errno of EINTR when interrupted by a # signal. - with self.assertRaises(socket.error) as cm: + with self.assertRaises(OSError) as cm: while True: self.setAlarm(self.alarm_time) func(*args, **kwargs) @@ -3552,7 +3552,7 @@ class NonBlockingTCPTests(ThreadedTCPSocketTest): start = time.time() try: self.serv.accept() - except socket.error: + except OSError: pass end = time.time() self.assertTrue((end - start) < 1.0, "Error setting non-blocking mode.") @@ -3573,7 +3573,7 @@ class NonBlockingTCPTests(ThreadedTCPSocketTest): start = time.time() try: self.serv.accept() - except socket.error: + except OSError: pass end = time.time() self.assertTrue((end - start) < 1.0, "Error creating with non-blocking mode.") @@ -3603,7 +3603,7 @@ class NonBlockingTCPTests(ThreadedTCPSocketTest): self.serv.setblocking(0) try: conn, addr = self.serv.accept() - except socket.error: + except OSError: pass else: self.fail("Error trying to do non-blocking accept.") @@ -3633,7 +3633,7 @@ class NonBlockingTCPTests(ThreadedTCPSocketTest): conn.setblocking(0) try: msg = conn.recv(len(MSG)) - except socket.error: + except OSError: pass else: self.fail("Error trying to do non-blocking recv.") @@ -3808,7 +3808,7 @@ class FileObjectClassTestCase(SocketConnectedTest): self.read_file.close() self.assertRaises(ValueError, self.read_file.fileno) self.cli_conn.close() - self.assertRaises(socket.error, self.cli_conn.getsockname) + self.assertRaises(OSError, self.cli_conn.getsockname) def _testRealClose(self): pass @@ -3845,7 +3845,7 @@ class FileObjectInterruptedTestCase(unittest.TestCase): @staticmethod def _raise_eintr(): - raise socket.error(errno.EINTR, "interrupted") + raise OSError(errno.EINTR, "interrupted") def _textiowrap_mock_socket(self, mock, buffering=-1): raw = socket.SocketIO(mock, "r") @@ -3957,7 +3957,7 @@ class UnbufferedFileObjectClassTestCase(FileObjectClassTestCase): self.assertEqual(msg, self.read_msg) # ...until the file is itself closed self.read_file.close() - self.assertRaises(socket.error, self.cli_conn.recv, 1024) + self.assertRaises(OSError, self.cli_conn.recv, 1024) def _testMakefileClose(self): self.write_file.write(self.write_msg) @@ -4106,7 +4106,7 @@ class NetworkConnectionNoServer(unittest.TestCase): port = support.find_unused_port() cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.addCleanup(cli.close) - with self.assertRaises(socket.error) as cm: + with self.assertRaises(OSError) as cm: cli.connect((HOST, port)) self.assertEqual(cm.exception.errno, errno.ECONNREFUSED) @@ -4114,7 +4114,7 @@ class NetworkConnectionNoServer(unittest.TestCase): # Issue #9792: errors raised by create_connection() should have # a proper errno attribute. port = support.find_unused_port() - with self.assertRaises(socket.error) as cm: + with self.assertRaises(OSError) as cm: socket.create_connection((HOST, port)) # Issue #16257: create_connection() calls getaddrinfo() against @@ -4262,7 +4262,7 @@ class TCPTimeoutTest(SocketTCPTest): foo = self.serv.accept() except socket.timeout: self.fail("caught timeout instead of error (TCP)") - except socket.error: + except OSError: ok = True except: self.fail("caught unexpected exception (TCP)") @@ -4319,7 +4319,7 @@ class UDPTimeoutTest(SocketUDPTest): foo = self.serv.recv(1024) except socket.timeout: self.fail("caught timeout instead of error (UDP)") - except socket.error: + except OSError: ok = True except: self.fail("caught unexpected exception (UDP)") @@ -4329,10 +4329,10 @@ class UDPTimeoutTest(SocketUDPTest): class TestExceptions(unittest.TestCase): def testExceptionTree(self): - self.assertTrue(issubclass(socket.error, Exception)) - self.assertTrue(issubclass(socket.herror, socket.error)) - self.assertTrue(issubclass(socket.gaierror, socket.error)) - self.assertTrue(issubclass(socket.timeout, socket.error)) + self.assertTrue(issubclass(OSError, Exception)) + self.assertTrue(issubclass(socket.herror, OSError)) + self.assertTrue(issubclass(socket.gaierror, OSError)) + self.assertTrue(issubclass(socket.timeout, OSError)) class TestLinuxAbstractNamespace(unittest.TestCase): @@ -4358,7 +4358,7 @@ class TestLinuxAbstractNamespace(unittest.TestCase): def testNameOverflow(self): address = "\x00" + "h" * self.UNIX_PATH_MAX with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s: - self.assertRaises(socket.error, s.bind, address) + self.assertRaises(OSError, s.bind, address) def testStrName(self): # Check that an abstract name can be passed as a string. @@ -4597,7 +4597,7 @@ class ContextManagersTest(ThreadedTCPSocketTest): self.assertTrue(sock._closed) # exception inside with block with socket.socket() as sock: - self.assertRaises(socket.error, sock.sendall, b'foo') + self.assertRaises(OSError, sock.sendall, b'foo') self.assertTrue(sock._closed) def testCreateConnectionBase(self): @@ -4625,7 +4625,7 @@ class ContextManagersTest(ThreadedTCPSocketTest): with socket.create_connection(address) as sock: sock.close() self.assertTrue(sock._closed) - self.assertRaises(socket.error, sock.sendall, b'foo') + self.assertRaises(OSError, sock.sendall, b'foo') @unittest.skipUnless(hasattr(socket, "SOCK_CLOEXEC"), -- cgit v1.2.1 From 45d1bc39881473d6fec76c4627070ac0b89d4b4c Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Tue, 25 Dec 2012 16:47:37 +0200 Subject: Replace IOError with OSError (#16715) --- Lib/test/test_socket.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'Lib/test/test_socket.py') diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py index e5d9dbc1d8..be9206e99b 100644 --- a/Lib/test/test_socket.py +++ b/Lib/test/test_socket.py @@ -3723,7 +3723,7 @@ class FileObjectClassTestCase(SocketConnectedTest): # First read raises a timeout self.assertRaises(socket.timeout, self.read_file.read, 1) # Second read is disallowed - with self.assertRaises(IOError) as ctx: + with self.assertRaises(OSError) as ctx: self.read_file.read(1) self.assertIn("cannot read from timed out object", str(ctx.exception)) -- cgit v1.2.1 From e22777ea0898484ffb7c3fb87eedaaeef981206a Mon Sep 17 00:00:00 2001 From: Serhiy Storchaka Date: Tue, 15 Jan 2013 01:12:17 +0200 Subject: Issue #15989: Fix several occurrences of integer overflow when result of PyLong_AsLong() narrowed to int without checks. --- Lib/test/test_socket.py | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) (limited to 'Lib/test/test_socket.py') diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py index be9206e99b..282596f08e 100644 --- a/Lib/test/test_socket.py +++ b/Lib/test/test_socket.py @@ -1262,11 +1262,17 @@ class GeneralModuleTests(unittest.TestCase): for protocol in range(pickle.HIGHEST_PROTOCOL + 1): self.assertRaises(TypeError, pickle.dumps, sock, protocol) - def test_listen_backlog0(self): + def test_listen_backlog(self): + for backlog in 0, -1: + srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + srv.bind((HOST, 0)) + srv.listen(backlog) + srv.close() + + # Issue 15989 srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) srv.bind((HOST, 0)) - # backlog = 0 - srv.listen(0) + self.assertRaises(OverflowError, srv.listen, _testcapi.INT_MAX + 1) srv.close() @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') @@ -1582,6 +1588,11 @@ class BasicTCPTest(SocketConnectedTest): def _testShutdown(self): self.serv_conn.send(MSG) + # Issue 15989 + self.assertRaises(OverflowError, self.serv_conn.shutdown, + _testcapi.INT_MAX + 1) + self.assertRaises(OverflowError, self.serv_conn.shutdown, + 2 + (_testcapi.UINT_MAX + 1)) self.serv_conn.shutdown(2) def testDetach(self): @@ -3563,6 +3574,11 @@ class NonBlockingTCPTests(ThreadedTCPSocketTest): pass end = time.time() self.assertTrue((end - start) < 1.0, "Error setting non-blocking mode.") + # Issue 15989 + self.assertRaises(OverflowError, self.serv.setblocking, + _testcapi.INT_MAX + 1) + self.assertRaises(OverflowError, self.serv.setblocking, + _testcapi.UINT_MAX + 1) def _testSetBlocking(self): pass -- cgit v1.2.1 From 1e09c1ff456bc21b4d399519d8c84a84ef7133e1 Mon Sep 17 00:00:00 2001 From: Serhiy Storchaka Date: Tue, 15 Jan 2013 11:08:30 +0200 Subject: Fix test_socket broken in previous commit (changeset 13e2e44db99d). Added new checks for socket.setblocking(). --- Lib/test/test_socket.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) (limited to 'Lib/test/test_socket.py') diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py index 282596f08e..53ad35d386 100644 --- a/Lib/test/test_socket.py +++ b/Lib/test/test_socket.py @@ -3566,7 +3566,10 @@ class NonBlockingTCPTests(ThreadedTCPSocketTest): def testSetBlocking(self): # Testing whether set blocking works - self.serv.setblocking(0) + self.serv.setblocking(True) + self.assertIsNone(self.serv.gettimeout()) + self.serv.setblocking(False) + self.assertEqual(self.serv.gettimeout(), 0.0) start = time.time() try: self.serv.accept() @@ -3575,10 +3578,9 @@ class NonBlockingTCPTests(ThreadedTCPSocketTest): end = time.time() self.assertTrue((end - start) < 1.0, "Error setting non-blocking mode.") # Issue 15989 - self.assertRaises(OverflowError, self.serv.setblocking, - _testcapi.INT_MAX + 1) - self.assertRaises(OverflowError, self.serv.setblocking, - _testcapi.UINT_MAX + 1) + if _testcapi.UINT_MAX < _testcapi.ULONG_MAX: + self.serv.setblocking(_testcapi.UINT_MAX + 1) + self.assertIsNone(self.serv.gettimeout()) def _testSetBlocking(self): pass -- cgit v1.2.1 From 9219ff2b55b62a4c1144892671e23eb66c570db9 Mon Sep 17 00:00:00 2001 From: Charles-Fran?ois Natali Date: Tue, 5 Feb 2013 19:42:01 +0100 Subject: Issue #15359: Add CAN_BCM protocol support to the socket module. Patch by Brian Thorne. --- Lib/test/test_socket.py | 106 +++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 95 insertions(+), 11 deletions(-) (limited to 'Lib/test/test_socket.py') diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py index 53ad35d386..19377d0858 100644 --- a/Lib/test/test_socket.py +++ b/Lib/test/test_socket.py @@ -121,6 +121,36 @@ class SocketCANTest(unittest.TestCase): interface = 'vcan0' bufsize = 128 + """The CAN frame structure is defined in : + + struct can_frame { + canid_t can_id; /* 32 bit CAN_ID + EFF/RTR/ERR flags */ + __u8 can_dlc; /* data length code: 0 .. 8 */ + __u8 data[8] __attribute__((aligned(8))); + }; + """ + can_frame_fmt = "=IB3x8s" + can_frame_size = struct.calcsize(can_frame_fmt) + + """The Broadcast Management Command frame structure is defined + in : + + struct bcm_msg_head { + __u32 opcode; + __u32 flags; + __u32 count; + struct timeval ival1, ival2; + canid_t can_id; + __u32 nframes; + struct can_frame frames[0]; + } + + `bcm_msg_head` must be 8 bytes aligned because of the `frames` member (see + `struct can_frame` definition). Must use native not standard types for packing. + """ + bcm_cmd_msg_fmt = "@3I4l2I" + bcm_cmd_msg_fmt += "x" * (struct.calcsize(bcm_cmd_msg_fmt) % 8) + def setUp(self): self.s = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) self.addCleanup(self.s.close) @@ -1291,10 +1321,35 @@ class BasicCANTest(unittest.TestCase): socket.PF_CAN socket.CAN_RAW + @unittest.skipUnless(hasattr(socket, "CAN_BCM"), + 'socket.CAN_BCM required for this test.') + def testBCMConstants(self): + socket.CAN_BCM + + # opcodes + socket.CAN_BCM_TX_SETUP # create (cyclic) transmission task + socket.CAN_BCM_TX_DELETE # remove (cyclic) transmission task + socket.CAN_BCM_TX_READ # read properties of (cyclic) transmission task + socket.CAN_BCM_TX_SEND # send one CAN frame + socket.CAN_BCM_RX_SETUP # create RX content filter subscription + socket.CAN_BCM_RX_DELETE # remove RX content filter subscription + socket.CAN_BCM_RX_READ # read properties of RX content filter subscription + socket.CAN_BCM_TX_STATUS # reply to TX_READ request + socket.CAN_BCM_TX_EXPIRED # notification on performed transmissions (count=0) + socket.CAN_BCM_RX_STATUS # reply to RX_READ request + socket.CAN_BCM_RX_TIMEOUT # cyclic message is absent + socket.CAN_BCM_RX_CHANGED # updated CAN frame (detected content change) + def testCreateSocket(self): with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: pass + @unittest.skipUnless(hasattr(socket, "CAN_BCM"), + 'socket.CAN_BCM required for this test.') + def testCreateBCMSocket(self): + with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_BCM) as s: + pass + def testBindAny(self): with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: s.bind(('', )) @@ -1327,19 +1382,8 @@ class BasicCANTest(unittest.TestCase): @unittest.skipUnless(HAVE_SOCKET_CAN, 'SocketCan required for this test.') -@unittest.skipUnless(thread, 'Threading required for this test.') class CANTest(ThreadedCANSocketTest): - """The CAN frame structure is defined in : - - struct can_frame { - canid_t can_id; /* 32 bit CAN_ID + EFF/RTR/ERR flags */ - __u8 can_dlc; /* data length code: 0 .. 8 */ - __u8 data[8] __attribute__((aligned(8))); - }; - """ - can_frame_fmt = "=IB3x8s" - def __init__(self, methodName='runTest'): ThreadedCANSocketTest.__init__(self, methodName=methodName) @@ -1388,6 +1432,46 @@ class CANTest(ThreadedCANSocketTest): self.cf2 = self.build_can_frame(0x12, b'\x99\x22\x33') self.cli.send(self.cf2) + @unittest.skipUnless(hasattr(socket, "CAN_BCM"), + 'socket.CAN_BCM required for this test.') + def _testBCM(self): + cf, addr = self.cli.recvfrom(self.bufsize) + self.assertEqual(self.cf, cf) + can_id, can_dlc, data = self.dissect_can_frame(cf) + self.assertEqual(self.can_id, can_id) + self.assertEqual(self.data, data) + + @unittest.skipUnless(hasattr(socket, "CAN_BCM"), + 'socket.CAN_BCM required for this test.') + def testBCM(self): + bcm = socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_BCM) + self.addCleanup(bcm.close) + bcm.connect((self.interface,)) + self.can_id = 0x123 + self.data = bytes([0xc0, 0xff, 0xee]) + self.cf = self.build_can_frame(self.can_id, self.data) + opcode = socket.CAN_BCM_TX_SEND + flags = 0 + count = 0 + ival1_seconds = ival1_usec = ival2_seconds = ival2_usec = 0 + bcm_can_id = 0x0222 + nframes = 1 + assert len(self.cf) == 16 + header = struct.pack(self.bcm_cmd_msg_fmt, + opcode, + flags, + count, + ival1_seconds, + ival1_usec, + ival2_seconds, + ival2_usec, + bcm_can_id, + nframes, + ) + header_plus_frame = header + self.cf + bytes_sent = bcm.send(header_plus_frame) + self.assertEqual(bytes_sent, len(header_plus_frame)) + @unittest.skipUnless(HAVE_SOCKET_RDS, 'RDS sockets required for this test.') class BasicRDSTest(unittest.TestCase): -- cgit v1.2.1 From efb404c2acb8b094b135e015fe2451bbe7e2fb83 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Wed, 20 Mar 2013 20:16:47 +0100 Subject: Issue #16997: unittest.TestCase now provides a subTest() context manager to procedurally generate, in an easy way, small test instances. --- Lib/test/test_socket.py | 4 ---- 1 file changed, 4 deletions(-) (limited to 'Lib/test/test_socket.py') diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py index d4ea43ac54..ec76671025 100644 --- a/Lib/test/test_socket.py +++ b/Lib/test/test_socket.py @@ -2,7 +2,6 @@ import unittest from test import support -from unittest.case import _ExpectedFailure import errno import io @@ -272,9 +271,6 @@ class ThreadableTest: raise TypeError("test_func must be a callable function") try: test_func() - except _ExpectedFailure: - # We deliberately ignore expected failures - pass except BaseException as e: self.queue.put(e) finally: -- cgit v1.2.1 From edf464885b4bda39cc4cc91f5c998e8b8f54dbed Mon Sep 17 00:00:00 2001 From: Giampaolo Rodola' Date: Wed, 10 Apr 2013 15:49:47 +0200 Subject: Fix issue #17675: make socket repr() provide local and remote addresses (if any). --- Lib/test/test_socket.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) (limited to 'Lib/test/test_socket.py') diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py index ec76671025..9b6d18448a 100644 --- a/Lib/test/test_socket.py +++ b/Lib/test/test_socket.py @@ -652,8 +652,17 @@ class GeneralModuleTests(unittest.TestCase): def test_repr(self): s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.addCleanup(s.close) - self.assertTrue(repr(s).startswith(" Date: Fri, 12 Apr 2013 18:28:15 +0200 Subject: attempt to fix bb failure as per http://bugs.python.org/issue17675#msg186595 --- Lib/test/test_socket.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'Lib/test/test_socket.py') diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py index 9b6d18448a..4693ea56b8 100644 --- a/Lib/test/test_socket.py +++ b/Lib/test/test_socket.py @@ -657,9 +657,9 @@ class GeneralModuleTests(unittest.TestCase): self.assertIn('family=%i' % socket.AF_INET, repr(s)) self.assertIn('type=%i' % socket.SOCK_STREAM, repr(s)) self.assertIn('proto=0', repr(s)) - self.assertIn('laddr', repr(s)) self.assertNotIn('raddr', repr(s)) s.bind(('127.0.0.1', 0)) + self.assertIn('laddr', repr(s)) self.assertIn(str(s.getsockname()), repr(s)) self.assertIn('[closed]', repr(s)) self.assertNotIn('laddr', repr(s)) -- cgit v1.2.1 From 6d0be1a7e6c10ad1852dc42ad31d6a04293f1cc6 Mon Sep 17 00:00:00 2001 From: Charles-Francois Natali Date: Mon, 20 May 2013 19:08:19 +0200 Subject: Issue #17684: Fix some test_socket failures due to limited FD passing support on OS-X. Patch by Jeff Ramnani. --- Lib/test/test_socket.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) (limited to 'Lib/test/test_socket.py') diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py index 4693ea56b8..546d793091 100644 --- a/Lib/test/test_socket.py +++ b/Lib/test/test_socket.py @@ -2618,8 +2618,7 @@ class SCMRightsTest(SendrecvmsgServerTimeoutBase): def _testFDPassCMSG_LEN(self): self.createAndSendFDs(1) - # Issue #12958: The following test has problems on Mac OS X - @support.anticipate_failure(sys.platform == "darwin") + @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958") @requireAttrs(socket, "CMSG_SPACE") def testFDPassSeparate(self): # Pass two FDs in two separate arrays. Arrays may be combined @@ -2629,7 +2628,7 @@ class SCMRightsTest(SendrecvmsgServerTimeoutBase): maxcmsgs=2) @testFDPassSeparate.client_skip - @support.anticipate_failure(sys.platform == "darwin") + @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958") def _testFDPassSeparate(self): fd0, fd1 = self.newFDs(2) self.assertEqual( @@ -2641,8 +2640,7 @@ class SCMRightsTest(SendrecvmsgServerTimeoutBase): array.array("i", [fd1]))]), len(MSG)) - # Issue #12958: The following test has problems on Mac OS X - @support.anticipate_failure(sys.platform == "darwin") + @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958") @requireAttrs(socket, "CMSG_SPACE") def testFDPassSeparateMinSpace(self): # Pass two FDs in two separate arrays, receiving them into the @@ -2654,7 +2652,7 @@ class SCMRightsTest(SendrecvmsgServerTimeoutBase): maxcmsgs=2, ignoreflags=socket.MSG_CTRUNC) @testFDPassSeparateMinSpace.client_skip - @support.anticipate_failure(sys.platform == "darwin") + @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958") def _testFDPassSeparateMinSpace(self): fd0, fd1 = self.newFDs(2) self.assertEqual( -- cgit v1.2.1 From 45fdbc0ffa91699a5eeb026b2ce8cb953a9c6288 Mon Sep 17 00:00:00 2001 From: Charles-Fran?ois Natali Date: Tue, 21 May 2013 09:49:18 +0200 Subject: Issue #17683: socket module: return AF_UNIX addresses in Linux abstract namespace as string. --- Lib/test/test_socket.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) (limited to 'Lib/test/test_socket.py') diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py index 546d793091..cb00c382f6 100644 --- a/Lib/test/test_socket.py +++ b/Lib/test/test_socket.py @@ -4451,7 +4451,7 @@ class TestLinuxAbstractNamespace(unittest.TestCase): UNIX_PATH_MAX = 108 def testLinuxAbstractNamespace(self): - address = b"\x00python-test-hello\x00\xff" + address = "\x00python-test-hello\x00\xff" with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s1: s1.bind(address) s1.listen(1) @@ -4462,7 +4462,7 @@ class TestLinuxAbstractNamespace(unittest.TestCase): self.assertEqual(s2.getpeername(), address) def testMaxName(self): - address = b"\x00" + b"h" * (self.UNIX_PATH_MAX - 1) + address = "\x00" + "h" * (self.UNIX_PATH_MAX - 1) with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s: s.bind(address) self.assertEqual(s.getsockname(), address) @@ -4472,12 +4472,12 @@ class TestLinuxAbstractNamespace(unittest.TestCase): with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s: self.assertRaises(OSError, s.bind, address) - def testStrName(self): - # Check that an abstract name can be passed as a string. + def testBytesName(self): + # Check that an abstract name can be passed as bytes. s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) try: - s.bind("\x00python\x00test\x00") - self.assertEqual(s.getsockname(), b"\x00python\x00test\x00") + s.bind(b"\x00python\x00test\x00") + self.assertEqual(s.getsockname(), "\x00python\x00test\x00") finally: s.close() -- cgit v1.2.1 From c927fff56475cae463b48c9f7ce58a4b70847600 Mon Sep 17 00:00:00 2001 From: Charles-Fran?ois Natali Date: Tue, 21 May 2013 10:45:46 +0200 Subject: Backed out changeset c0f2b038fc12 --- Lib/test/test_socket.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) (limited to 'Lib/test/test_socket.py') diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py index cb00c382f6..546d793091 100644 --- a/Lib/test/test_socket.py +++ b/Lib/test/test_socket.py @@ -4451,7 +4451,7 @@ class TestLinuxAbstractNamespace(unittest.TestCase): UNIX_PATH_MAX = 108 def testLinuxAbstractNamespace(self): - address = "\x00python-test-hello\x00\xff" + address = b"\x00python-test-hello\x00\xff" with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s1: s1.bind(address) s1.listen(1) @@ -4462,7 +4462,7 @@ class TestLinuxAbstractNamespace(unittest.TestCase): self.assertEqual(s2.getpeername(), address) def testMaxName(self): - address = "\x00" + "h" * (self.UNIX_PATH_MAX - 1) + address = b"\x00" + b"h" * (self.UNIX_PATH_MAX - 1) with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s: s.bind(address) self.assertEqual(s.getsockname(), address) @@ -4472,12 +4472,12 @@ class TestLinuxAbstractNamespace(unittest.TestCase): with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s: self.assertRaises(OSError, s.bind, address) - def testBytesName(self): - # Check that an abstract name can be passed as bytes. + def testStrName(self): + # Check that an abstract name can be passed as a string. s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) try: - s.bind(b"\x00python\x00test\x00") - self.assertEqual(s.getsockname(), "\x00python\x00test\x00") + s.bind("\x00python\x00test\x00") + self.assertEqual(s.getsockname(), b"\x00python\x00test\x00") finally: s.close() -- cgit v1.2.1 From e5883d6d95989121670eca012dd5bde1721747e7 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Wed, 28 Aug 2013 00:53:59 +0200 Subject: Issue #18571: Implementation of the PEP 446: file descriptors and file handles are now created non-inheritable; add functions os.get/set_inheritable(), os.get/set_handle_inheritable() and socket.socket.get/set_inheritable(). --- Lib/test/test_socket.py | 54 +++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 43 insertions(+), 11 deletions(-) (limited to 'Lib/test/test_socket.py') diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py index c7a52d87a9..c7b9ba8dd7 100644 --- a/Lib/test/test_socket.py +++ b/Lib/test/test_socket.py @@ -22,10 +22,6 @@ import signal import math import pickle import struct -try: - import fcntl -except ImportError: - fcntl = False try: import multiprocessing except ImportError: @@ -1108,9 +1104,15 @@ class GeneralModuleTests(unittest.TestCase): def testNewAttributes(self): # testing .family, .type and .protocol + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.assertEqual(sock.family, socket.AF_INET) - self.assertEqual(sock.type, socket.SOCK_STREAM) + if hasattr(socket, 'SOCK_CLOEXEC'): + self.assertIn(sock.type, + (socket.SOCK_STREAM | socket.SOCK_CLOEXEC, + socket.SOCK_STREAM)) + else: + self.assertEqual(sock.type, socket.SOCK_STREAM) self.assertEqual(sock.proto, 0) sock.close() @@ -4749,16 +4751,46 @@ class ContextManagersTest(ThreadedTCPSocketTest): self.assertRaises(OSError, sock.sendall, b'foo') -@unittest.skipUnless(hasattr(socket, "SOCK_CLOEXEC"), - "SOCK_CLOEXEC not defined") -@unittest.skipUnless(fcntl, "module fcntl not available") -class CloexecConstantTest(unittest.TestCase): +class InheritanceTest(unittest.TestCase): + @unittest.skipUnless(hasattr(socket, "SOCK_CLOEXEC"), + "SOCK_CLOEXEC not defined") @support.requires_linux_version(2, 6, 28) def test_SOCK_CLOEXEC(self): with socket.socket(socket.AF_INET, socket.SOCK_STREAM | socket.SOCK_CLOEXEC) as s: self.assertTrue(s.type & socket.SOCK_CLOEXEC) - self.assertTrue(fcntl.fcntl(s, fcntl.F_GETFD) & fcntl.FD_CLOEXEC) + self.assertTrue(sock.get_inheritable()) + + def test_default_inheritable(self): + sock = socket.socket() + with sock: + self.assertEqual(sock.get_inheritable(), False) + + def test_dup(self): + sock = socket.socket() + with sock: + newsock = sock.dup() + sock.close() + with newsock: + self.assertEqual(newsock.get_inheritable(), False) + + def test_set_inheritable(self): + sock = socket.socket() + with sock: + sock.set_inheritable(True) + self.assertEqual(sock.get_inheritable(), True) + + sock.set_inheritable(False) + self.assertEqual(sock.get_inheritable(), False) + + @unittest.skipUnless(hasattr(socket, "socketpair"), + "need socket.socketpair()") + def test_socketpair(self): + s1, s2 = socket.socketpair() + self.addCleanup(s1.close) + self.addCleanup(s2.close) + self.assertEqual(s1.get_inheritable(), False) + self.assertEqual(s2.get_inheritable(), False) @unittest.skipUnless(hasattr(socket, "SOCK_NONBLOCK"), @@ -4927,7 +4959,7 @@ def test_main(): NetworkConnectionAttributesTest, NetworkConnectionBehaviourTest, ContextManagersTest, - CloexecConstantTest, + InheritanceTest, NonblockConstantTest ]) if hasattr(socket, "socketpair"): -- cgit v1.2.1 From 4d0bdca26790665603d87f8b71fa665b91cf41df Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Wed, 28 Aug 2013 12:28:18 +0200 Subject: Fix test_socket.test_SOCK_CLOEXEC(), the test was wrong --- Lib/test/test_socket.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'Lib/test/test_socket.py') diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py index c7b9ba8dd7..81b287655d 100644 --- a/Lib/test/test_socket.py +++ b/Lib/test/test_socket.py @@ -4759,7 +4759,7 @@ class InheritanceTest(unittest.TestCase): with socket.socket(socket.AF_INET, socket.SOCK_STREAM | socket.SOCK_CLOEXEC) as s: self.assertTrue(s.type & socket.SOCK_CLOEXEC) - self.assertTrue(sock.get_inheritable()) + self.assertFalse(s.get_inheritable()) def test_default_inheritable(self): sock = socket.socket() -- cgit v1.2.1 From 21b9590a33765f6d1b532df391270c108852eae8 Mon Sep 17 00:00:00 2001 From: Charles-Fran?ois Natali Date: Sat, 31 Aug 2013 14:40:49 +0200 Subject: Use the recent support.HOSTv6 addition. --- Lib/test/test_socket.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) (limited to 'Lib/test/test_socket.py') diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py index ae84dbfda3..f090a48868 100644 --- a/Lib/test/test_socket.py +++ b/Lib/test/test_socket.py @@ -565,11 +565,7 @@ class SCTPStreamBase(InetTestBase): class Inet6TestBase(InetTestBase): """Base class for IPv6 socket tests.""" - # Don't use "localhost" here - it may not have an IPv6 address - # assigned to it by default (e.g. in /etc/hosts), and if someone - # has assigned it an IPv4-mapped address, then it's unlikely to - # work with the full IPv6 API. - host = "::1" + host = support.HOSTv6 class UDP6TestBase(Inet6TestBase): """Base class for UDP-over-IPv6 tests.""" @@ -1321,9 +1317,9 @@ class GeneralModuleTests(unittest.TestCase): @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') def test_flowinfo(self): self.assertRaises(OverflowError, socket.getnameinfo, - ('::1',0, 0xffffffff), 0) + (support.HOSTv6, 0, 0xffffffff), 0) with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as s: - self.assertRaises(OverflowError, s.bind, ('::1', 0, -10)) + self.assertRaises(OverflowError, s.bind, (support.HOSTv6, 0, -10)) @unittest.skipUnless(HAVE_SOCKET_CAN, 'SocketCan required for this test.') -- cgit v1.2.1 From 2a02c3afd44266c7812b2ac030b5b667fd0635cb Mon Sep 17 00:00:00 2001 From: Eli Bendersky Date: Sat, 31 Aug 2013 15:13:30 -0700 Subject: Switch the AF_* and SOCK_* constants in the socket module to IntEnum. Closes #18720. --- Lib/test/test_socket.py | 28 ++++++++++++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) (limited to 'Lib/test/test_socket.py') diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py index f090a48868..620576812a 100644 --- a/Lib/test/test_socket.py +++ b/Lib/test/test_socket.py @@ -1161,9 +1161,12 @@ class GeneralModuleTests(unittest.TestCase): socket.getaddrinfo(HOST, 80) socket.getaddrinfo(HOST, None) # test family and socktype filters - infos = socket.getaddrinfo(HOST, None, socket.AF_INET) - for family, _, _, _, _ in infos: + infos = socket.getaddrinfo(HOST, 80, socket.AF_INET, socket.SOCK_STREAM) + for family, type, _, _, _ in infos: self.assertEqual(family, socket.AF_INET) + self.assertEqual(str(family), 'AddressFamily.AF_INET') + self.assertEqual(type, socket.SOCK_STREAM) + self.assertEqual(str(type), 'SocketType.SOCK_STREAM') infos = socket.getaddrinfo(HOST, None, 0, socket.SOCK_STREAM) for _, socktype, _, _, _ in infos: self.assertEqual(socktype, socket.SOCK_STREAM) @@ -1321,6 +1324,27 @@ class GeneralModuleTests(unittest.TestCase): with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as s: self.assertRaises(OverflowError, s.bind, (support.HOSTv6, 0, -10)) + def test_str_for_enums(self): + # Make sure that the AF_* and SOCK_* constants have enum-like string + # reprs. + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + self.assertEqual(str(s.family), 'AddressFamily.AF_INET') + self.assertEqual(str(s.type), 'SocketType.SOCK_STREAM') + + @unittest.skipIf(os.name == 'nt', 'Will not work on Windows') + def test_uknown_socket_family_repr(self): + # Test that when created with a family that's not one of the known + # AF_*/SOCK_* constants, socket.family just returns the number. + # + # To do this we fool socket.socket into believing it already has an + # open fd because on this path it doesn't actually verify the family and + # type and populates the socket object. + # + # On Windows this trick won't work, so the test is skipped. + fd, _ = tempfile.mkstemp() + with socket.socket(family=42424, type=13331, fileno=fd) as s: + self.assertEqual(s.family, 42424) + self.assertEqual(s.type, 13331) @unittest.skipUnless(HAVE_SOCKET_CAN, 'SocketCan required for this test.') class BasicCANTest(unittest.TestCase): -- cgit v1.2.1 From 6a99b5b01be31255800c683fd183fef5b627b505 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Sun, 8 Sep 2013 11:53:09 +0200 Subject: Issue #18904: test_socket: add inheritance tests using fcntl and FD_CLOEXEC --- Lib/test/test_socket.py | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) (limited to 'Lib/test/test_socket.py') diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py index 620576812a..fc478b6a87 100644 --- a/Lib/test/test_socket.py +++ b/Lib/test/test_socket.py @@ -26,6 +26,10 @@ try: import multiprocessing except ImportError: multiprocessing = False +try: + import fcntl +except ImportError: + fcntl = None HOST = support.HOST MSG = 'Michael Gilfix was here\u1234\r\n'.encode('utf-8') ## test unicode string and carriage return @@ -4804,6 +4808,32 @@ class InheritanceTest(unittest.TestCase): sock.set_inheritable(False) self.assertEqual(sock.get_inheritable(), False) + if fcntl: + def test_get_inheritable_cloexec(self): + sock = socket.socket() + with sock: + fd = sock.fileno() + self.assertEqual(sock.get_inheritable(), False) + + # clear FD_CLOEXEC flag + flags = fcntl.fcntl(fd, fcntl.F_GETFD) + flags &= ~fcntl.FD_CLOEXEC + fcntl.fcntl(fd, fcntl.F_SETFD, flags) + + self.assertEqual(sock.get_inheritable(), True) + + def test_set_inheritable_cloexec(self): + sock = socket.socket() + with sock: + fd = sock.fileno() + self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC, + fcntl.FD_CLOEXEC) + + sock.set_inheritable(True) + self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC, + 0) + + @unittest.skipUnless(hasattr(socket, "socketpair"), "need socket.socketpair()") def test_socketpair(self): -- cgit v1.2.1 From e3e0acbe2b56d713cb828e53270f8daf3e71c51e Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Sun, 8 Sep 2013 14:14:38 +0200 Subject: Issue #18904: test_os and test_socket use unittest.skipIf() to check if fcntl module is present (to record skipped tests) --- Lib/test/test_socket.py | 49 +++++++++++++++++++++++++------------------------ 1 file changed, 25 insertions(+), 24 deletions(-) (limited to 'Lib/test/test_socket.py') diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py index fc478b6a87..490f776d21 100644 --- a/Lib/test/test_socket.py +++ b/Lib/test/test_socket.py @@ -4808,30 +4808,31 @@ class InheritanceTest(unittest.TestCase): sock.set_inheritable(False) self.assertEqual(sock.get_inheritable(), False) - if fcntl: - def test_get_inheritable_cloexec(self): - sock = socket.socket() - with sock: - fd = sock.fileno() - self.assertEqual(sock.get_inheritable(), False) - - # clear FD_CLOEXEC flag - flags = fcntl.fcntl(fd, fcntl.F_GETFD) - flags &= ~fcntl.FD_CLOEXEC - fcntl.fcntl(fd, fcntl.F_SETFD, flags) - - self.assertEqual(sock.get_inheritable(), True) - - def test_set_inheritable_cloexec(self): - sock = socket.socket() - with sock: - fd = sock.fileno() - self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC, - fcntl.FD_CLOEXEC) - - sock.set_inheritable(True) - self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC, - 0) + @unittest.skipIf(fcntl is None, "need fcntl") + def test_get_inheritable_cloexec(self): + sock = socket.socket() + with sock: + fd = sock.fileno() + self.assertEqual(sock.get_inheritable(), False) + + # clear FD_CLOEXEC flag + flags = fcntl.fcntl(fd, fcntl.F_GETFD) + flags &= ~fcntl.FD_CLOEXEC + fcntl.fcntl(fd, fcntl.F_SETFD, flags) + + self.assertEqual(sock.get_inheritable(), True) + + @unittest.skipIf(fcntl is None, "need fcntl") + def test_set_inheritable_cloexec(self): + sock = socket.socket() + with sock: + fd = sock.fileno() + self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC, + fcntl.FD_CLOEXEC) + + sock.set_inheritable(True) + self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC, + 0) @unittest.skipUnless(hasattr(socket, "socketpair"), -- cgit v1.2.1 From be88a3808c74a715a5c93ab7911ca8a89e7ea951 Mon Sep 17 00:00:00 2001 From: Charles-Fran?ois Natali Date: Fri, 13 Sep 2013 19:53:08 +0200 Subject: Issue #16201: socket: Use inet_pton()/inet_addr() instead of ad-hoc parsing for numeric IP addresses. --- Lib/test/test_socket.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) (limited to 'Lib/test/test_socket.py') diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py index 490f776d21..e995c99df1 100644 --- a/Lib/test/test_socket.py +++ b/Lib/test/test_socket.py @@ -757,6 +757,20 @@ class GeneralModuleTests(unittest.TestCase): if not fqhn in all_host_names: self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names))) + def test_host_resolution(self): + for addr in ['0.1.1.~1', '1+.1.1.1', '::1q', '::1::2', + '1:1:1:1:1:1:1:1:1']: + self.assertRaises(OSError, socket.gethostbyname, addr) + self.assertRaises(OSError, socket.gethostbyaddr, addr) + + for addr in [support.HOST, '10.0.0.1', '255.255.255.255']: + self.assertEqual(socket.gethostbyname(addr), addr) + + # we don't test support.HOSTv6 because there's a chance it doesn't have + # a matching name entry (e.g. 'ip6-localhost') + for host in [support.HOST]: + self.assertIn(host, socket.gethostbyaddr(host)[2]) + @unittest.skipUnless(hasattr(socket, 'sethostname'), "test needs socket.sethostname()") @unittest.skipUnless(hasattr(socket, 'gethostname'), "test needs socket.gethostname()") def test_sethostname(self): -- cgit v1.2.1 From 01761c88525db30ad9136f4978aaf1451361a5ec Mon Sep 17 00:00:00 2001 From: Serhiy Storchaka Date: Sun, 3 Nov 2013 21:31:38 +0200 Subject: Issue #18702: All skipped tests now reported as skipped. --- Lib/test/test_socket.py | 103 +++++++++++++++++++++++++----------------------- 1 file changed, 53 insertions(+), 50 deletions(-) (limited to 'Lib/test/test_socket.py') diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py index e995c99df1..beff31a7f5 100644 --- a/Lib/test/test_socket.py +++ b/Lib/test/test_socket.py @@ -817,16 +817,17 @@ class GeneralModuleTests(unittest.TestCase): self.assertRaises(TypeError, socket.if_nametoindex, 0) self.assertRaises(TypeError, socket.if_indextoname, '_DEADBEEF') + @unittest.skipUnless(hasattr(sys, 'getrefcount'), + 'test needs sys.getrefcount()') def testRefCountGetNameInfo(self): # Testing reference count for getnameinfo - if hasattr(sys, "getrefcount"): - try: - # On some versions, this loses a reference - orig = sys.getrefcount(__name__) - socket.getnameinfo(__name__,0) - except TypeError: - if sys.getrefcount(__name__) != orig: - self.fail("socket.getnameinfo loses a reference") + try: + # On some versions, this loses a reference + orig = sys.getrefcount(__name__) + socket.getnameinfo(__name__,0) + except TypeError: + if sys.getrefcount(__name__) != orig: + self.fail("socket.getnameinfo loses a reference") def testInterpreterCrash(self): # Making sure getnameinfo doesn't crash the interpreter @@ -931,17 +932,17 @@ class GeneralModuleTests(unittest.TestCase): # Check that setting it to an invalid type raises TypeError self.assertRaises(TypeError, socket.setdefaulttimeout, "spam") + @unittest.skipUnless(hasattr(socket, 'inet_aton'), + 'test needs socket.inet_aton()') def testIPv4_inet_aton_fourbytes(self): - if not hasattr(socket, 'inet_aton'): - return # No inet_aton, nothing to check # Test that issue1008086 and issue767150 are fixed. # It must return 4 bytes. self.assertEqual(b'\x00'*4, socket.inet_aton('0.0.0.0')) self.assertEqual(b'\xff'*4, socket.inet_aton('255.255.255.255')) + @unittest.skipUnless(hasattr(socket, 'inet_pton'), + 'test needs socket.inet_pton()') def testIPv4toString(self): - if not hasattr(socket, 'inet_pton'): - return # No inet_pton() on this platform from socket import inet_aton as f, inet_pton, AF_INET g = lambda a: inet_pton(AF_INET, a) @@ -970,9 +971,9 @@ class GeneralModuleTests(unittest.TestCase): assertInvalid(g, '1.2.3.4.5') assertInvalid(g, '::1') + @unittest.skipUnless(hasattr(socket, 'inet_pton'), + 'test needs socket.inet_pton()') def testIPv6toString(self): - if not hasattr(socket, 'inet_pton'): - return # No inet_pton() on this platform try: from socket import inet_pton, AF_INET6, has_ipv6 if not has_ipv6: @@ -1024,9 +1025,9 @@ class GeneralModuleTests(unittest.TestCase): assertInvalid('::1.2.3.4:0') assertInvalid('0.100.200.0:3:4:5:6:7:8') + @unittest.skipUnless(hasattr(socket, 'inet_ntop'), + 'test needs socket.inet_ntop()') def testStringToIPv4(self): - if not hasattr(socket, 'inet_ntop'): - return # No inet_ntop() on this platform from socket import inet_ntoa as f, inet_ntop, AF_INET g = lambda a: inet_ntop(AF_INET, a) assertInvalid = lambda func,a: self.assertRaises( @@ -1048,9 +1049,9 @@ class GeneralModuleTests(unittest.TestCase): assertInvalid(g, b'\x00' * 5) assertInvalid(g, b'\x00' * 16) + @unittest.skipUnless(hasattr(socket, 'inet_ntop'), + 'test needs socket.inet_ntop()') def testStringToIPv6(self): - if not hasattr(socket, 'inet_ntop'): - return # No inet_ntop() on this platform try: from socket import inet_ntop, AF_INET6, has_ipv6 if not has_ipv6: @@ -3660,6 +3661,8 @@ class TCPCloserTest(ThreadedTCPSocketTest): self.cli.connect((HOST, self.port)) time.sleep(1.0) +@unittest.skipUnless(hasattr(socket, 'socketpair'), + 'test needs socket.socketpair()') @unittest.skipUnless(thread, 'Threading required for this test.') class BasicSocketPairTest(SocketPairTest): @@ -3722,26 +3725,27 @@ class NonBlockingTCPTests(ThreadedTCPSocketTest): def _testSetBlocking(self): pass - if hasattr(socket, "SOCK_NONBLOCK"): - @support.requires_linux_version(2, 6, 28) - def testInitNonBlocking(self): - # reinit server socket - self.serv.close() - self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM | - socket.SOCK_NONBLOCK) - self.port = support.bind_port(self.serv) - self.serv.listen(1) - # actual testing - start = time.time() - try: - self.serv.accept() - except OSError: - pass - end = time.time() - self.assertTrue((end - start) < 1.0, "Error creating with non-blocking mode.") - - def _testInitNonBlocking(self): + @unittest.skipUnless(hasattr(socket, 'SOCK_NONBLOCK'), + 'test needs socket.SOCK_NONBLOCK') + @support.requires_linux_version(2, 6, 28) + def testInitNonBlocking(self): + # reinit server socket + self.serv.close() + self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM | + socket.SOCK_NONBLOCK) + self.port = support.bind_port(self.serv) + self.serv.listen(1) + # actual testing + start = time.time() + try: + self.serv.accept() + except OSError: pass + end = time.time() + self.assertTrue((end - start) < 1.0, "Error creating with non-blocking mode.") + + def _testInitNonBlocking(self): + pass def testInheritFlags(self): # Issue #7995: when calling accept() on a listening socket with a @@ -4431,12 +4435,12 @@ class TCPTimeoutTest(SocketTCPTest): if not ok: self.fail("accept() returned success when we did not expect it") + @unittest.skipUnless(hasattr(signal, 'alarm'), + 'test needs signal.alarm()') def testInterruptedTimeout(self): # XXX I don't know how to do this test on MSWindows or any other # plaform that doesn't support signal.alarm() or os.kill(), though # the bug should have existed on all platforms. - if not hasattr(signal, "alarm"): - return # can only test on *nix self.serv.settimeout(5.0) # must be longer than alarm class Alarm(Exception): pass @@ -4496,6 +4500,7 @@ class TestExceptions(unittest.TestCase): self.assertTrue(issubclass(socket.gaierror, OSError)) self.assertTrue(issubclass(socket.timeout, OSError)) +@unittest.skipUnless(sys.platform == 'linux', 'Linux specific test') class TestLinuxAbstractNamespace(unittest.TestCase): UNIX_PATH_MAX = 108 @@ -4531,6 +4536,7 @@ class TestLinuxAbstractNamespace(unittest.TestCase): finally: s.close() +@unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'test needs socket.AF_UNIX') class TestUnixDomain(unittest.TestCase): def setUp(self): @@ -4680,10 +4686,10 @@ def isTipcAvailable(): for line in f: if line.startswith("tipc "): return True - if support.verbose: - print("TIPC module is not loaded, please 'sudo modprobe tipc'") return False +@unittest.skipUnless(isTipcAvailable(), + "TIPC module is not loaded, please 'sudo modprobe tipc'") class TIPCTest(unittest.TestCase): def testRDM(self): srv = socket.socket(socket.AF_TIPC, socket.SOCK_RDM) @@ -4706,6 +4712,8 @@ class TIPCTest(unittest.TestCase): self.assertEqual(msg, MSG) +@unittest.skipUnless(isTipcAvailable(), + "TIPC module is not loaded, please 'sudo modprobe tipc'") class TIPCThreadableTest(unittest.TestCase, ThreadableTest): def __init__(self, methodName = 'runTest'): unittest.TestCase.__init__(self, methodName = methodName) @@ -5028,15 +5036,10 @@ def test_main(): InheritanceTest, NonblockConstantTest ]) - if hasattr(socket, "socketpair"): - tests.append(BasicSocketPairTest) - if hasattr(socket, "AF_UNIX"): - tests.append(TestUnixDomain) - if sys.platform == 'linux': - tests.append(TestLinuxAbstractNamespace) - if isTipcAvailable(): - tests.append(TIPCTest) - tests.append(TIPCThreadableTest) + tests.append(BasicSocketPairTest) + tests.append(TestUnixDomain) + tests.append(TestLinuxAbstractNamespace) + tests.extend([TIPCTest, TIPCThreadableTest]) tests.extend([BasicCANTest, CANTest]) tests.extend([BasicRDSTest, RDSTest]) tests.extend([ -- cgit v1.2.1 From 10d58d3bd700037986829279885951466cf09f49 Mon Sep 17 00:00:00 2001 From: Atsuo Ishimoto Date: Mon, 16 Jul 2012 15:16:54 +0900 Subject: Issue #7171: Add Windows implementation of ``inet_ntop`` and ``inet_pton`` to socket module. --- Lib/test/test_socket.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) (limited to 'Lib/test/test_socket.py') diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py index beff31a7f5..eb8619f75c 100644 --- a/Lib/test/test_socket.py +++ b/Lib/test/test_socket.py @@ -980,6 +980,14 @@ class GeneralModuleTests(unittest.TestCase): return except ImportError: return + + if sys.platform == "win32": + try: + inet_pton(AF_INET6, '::') + except OSError as e: + if e.winerror == 10022: + return # IPv6 might not be installed on this PC + f = lambda a: inet_pton(AF_INET6, a) assertInvalid = lambda a: self.assertRaises( (OSError, ValueError), f, a @@ -1058,6 +1066,14 @@ class GeneralModuleTests(unittest.TestCase): return except ImportError: return + + if sys.platform == "win32": + try: + inet_ntop(AF_INET6, b'\x00' * 16) + except OSError as e: + if e.winerror == 10022: + return # IPv6 might not be installed on this PC + f = lambda a: inet_ntop(AF_INET6, a) assertInvalid = lambda a: self.assertRaises( (OSError, ValueError), f, a -- cgit v1.2.1 From 0511c5f34946290a803109dd0e4aef1aa7e197ee Mon Sep 17 00:00:00 2001 From: "Jason R. Coombs" Date: Sun, 10 Nov 2013 14:02:04 -0500 Subject: Normalize whitespace --- Lib/test/test_socket.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'Lib/test/test_socket.py') diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py index eb8619f75c..a7c2516075 100644 --- a/Lib/test/test_socket.py +++ b/Lib/test/test_socket.py @@ -980,14 +980,14 @@ class GeneralModuleTests(unittest.TestCase): return except ImportError: return - + if sys.platform == "win32": try: inet_pton(AF_INET6, '::') except OSError as e: if e.winerror == 10022: return # IPv6 might not be installed on this PC - + f = lambda a: inet_pton(AF_INET6, a) assertInvalid = lambda a: self.assertRaises( (OSError, ValueError), f, a -- cgit v1.2.1 From 3ee9210cf5e744a175acec3e1f49ff57f8413443 Mon Sep 17 00:00:00 2001 From: Nick Coghlan Date: Wed, 13 Nov 2013 22:10:16 +1000 Subject: Fix test_socket for repr update --- Lib/test/test_socket.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'Lib/test/test_socket.py') diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py index a7c2516075..3c9fbf0603 100644 --- a/Lib/test/test_socket.py +++ b/Lib/test/test_socket.py @@ -650,8 +650,8 @@ class GeneralModuleTests(unittest.TestCase): s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) with s: self.assertIn('fd=%i' % s.fileno(), repr(s)) - self.assertIn('family=%i' % socket.AF_INET, repr(s)) - self.assertIn('type=%i' % socket.SOCK_STREAM, repr(s)) + self.assertIn('family=%s' % socket.AF_INET, repr(s)) + self.assertIn('type=%s' % socket.SOCK_STREAM, repr(s)) self.assertIn('proto=0', repr(s)) self.assertNotIn('raddr', repr(s)) s.bind(('127.0.0.1', 0)) -- cgit v1.2.1