diff options
Diffstat (limited to 'Lib/test/test_multiprocessing.py')
-rw-r--r-- | Lib/test/test_multiprocessing.py | 327 |
1 files changed, 322 insertions, 5 deletions
diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py index 298faf73fd..2bcdb4e07c 100644 --- a/Lib/test/test_multiprocessing.py +++ b/Lib/test/test_multiprocessing.py @@ -8,6 +8,7 @@ import unittest import queue as pyqueue import time import io +import itertools import sys import os import gc @@ -82,6 +83,23 @@ HAVE_GETVALUE = not getattr(_multiprocessing, 'HAVE_BROKEN_SEM_GETVALUE', False) WIN32 = (sys.platform == "win32") +if WIN32: + from _subprocess import WaitForSingleObject, INFINITE, WAIT_OBJECT_0 + + def wait_for_handle(handle, timeout): + if timeout is None or timeout < 0.0: + timeout = INFINITE + else: + timeout = int(1000 * timeout) + return WaitForSingleObject(handle, timeout) == WAIT_OBJECT_0 +else: + from select import select + _select = util._eintr_retry(select) + + def wait_for_handle(handle, timeout): + if timeout is not None and timeout < 0.0: + timeout = None + return handle in _select([handle], [], [], timeout)[0] try: MAXFD = os.sysconf("SC_OPEN_MAX") @@ -196,6 +214,18 @@ class _TestProcess(BaseTestCase): self.assertEqual(current.ident, os.getpid()) self.assertEqual(current.exitcode, None) + def test_daemon_argument(self): + if self.TYPE == "threads": + return + + # By default uses the current process's daemon flag. + proc0 = self.Process(target=self._test) + self.assertEqual(proc0.daemon, self.current_process().daemon) + proc1 = self.Process(target=self._test, daemon=True) + self.assertTrue(proc1.daemon) + proc2 = self.Process(target=self._test, daemon=False) + self.assertFalse(proc2.daemon) + @classmethod def _test(cls, q, *args, **kwds): current = cls.current_process() @@ -328,6 +358,26 @@ class _TestProcess(BaseTestCase): ] self.assertEqual(result, expected) + @classmethod + def _test_sentinel(cls, event): + event.wait(10.0) + + def test_sentinel(self): + if self.TYPE == "threads": + return + event = self.Event() + p = self.Process(target=self._test_sentinel, args=(event,)) + with self.assertRaises(ValueError): + p.sentinel + p.start() + self.addCleanup(p.join) + sentinel = p.sentinel + self.assertIsInstance(sentinel, int) + self.assertFalse(wait_for_handle(sentinel, timeout=0.0)) + event.set() + p.join() + self.assertTrue(wait_for_handle(sentinel, timeout=DELTA)) + # # # @@ -1099,6 +1149,9 @@ def sqr(x, wait=0.0): time.sleep(wait) return x*x +def mul(x, y): + return x*y + class _TestPool(BaseTestCase): def test_apply(self): @@ -1112,6 +1165,20 @@ class _TestPool(BaseTestCase): self.assertEqual(pmap(sqr, list(range(100)), chunksize=20), list(map(sqr, list(range(100))))) + def test_starmap(self): + psmap = self.pool.starmap + tuples = list(zip(range(10), range(9,-1, -1))) + self.assertEqual(psmap(mul, tuples), + list(itertools.starmap(mul, tuples))) + tuples = list(zip(range(100), range(99,-1, -1))) + self.assertEqual(psmap(mul, tuples, chunksize=20), + list(itertools.starmap(mul, tuples))) + + def test_starmap_async(self): + tuples = list(zip(range(100), range(99,-1, -1))) + self.assertEqual(self.pool.starmap_async(mul, tuples).get(), + list(itertools.starmap(mul, tuples))) + def test_map_chunksize(self): try: self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1) @@ -1712,6 +1779,17 @@ class _TestConnection(BaseTestCase): self.assertRaises(RuntimeError, reduction.recv_handle, conn) p.join() +class _TestListener(BaseTestCase): + + ALLOWED_TYPES = ('processes') + + def test_multiple_bind(self): + for family in self.connection.families: + l = self.connection.Listener(family=family) + self.addCleanup(l.close) + self.assertRaises(OSError, self.connection.Listener, + l.address, family) + class _TestListenerClient(BaseTestCase): ALLOWED_TYPES = ('processes', 'threads') @@ -1732,6 +1810,85 @@ class _TestListenerClient(BaseTestCase): self.assertEqual(conn.recv(), 'hello') p.join() l.close() + +class _TestPoll(unittest.TestCase): + + ALLOWED_TYPES = ('processes', 'threads') + + def test_empty_string(self): + a, b = self.Pipe() + self.assertEqual(a.poll(), False) + b.send_bytes(b'') + self.assertEqual(a.poll(), True) + self.assertEqual(a.poll(), True) + + @classmethod + def _child_strings(cls, conn, strings): + for s in strings: + time.sleep(0.1) + conn.send_bytes(s) + conn.close() + + def test_strings(self): + strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop') + a, b = self.Pipe() + p = self.Process(target=self._child_strings, args=(b, strings)) + p.start() + + for s in strings: + for i in range(200): + if a.poll(0.01): + break + x = a.recv_bytes() + self.assertEqual(s, x) + + p.join() + + @classmethod + def _child_boundaries(cls, r): + # Polling may "pull" a message in to the child process, but we + # don't want it to pull only part of a message, as that would + # corrupt the pipe for any other processes which might later + # read from it. + r.poll(5) + + def test_boundaries(self): + r, w = self.Pipe(False) + p = self.Process(target=self._child_boundaries, args=(r,)) + p.start() + time.sleep(2) + L = [b"first", b"second"] + for obj in L: + w.send_bytes(obj) + w.close() + p.join() + self.assertIn(r.recv_bytes(), L) + + @classmethod + def _child_dont_merge(cls, b): + b.send_bytes(b'a') + b.send_bytes(b'b') + b.send_bytes(b'cd') + + def test_dont_merge(self): + a, b = self.Pipe() + self.assertEqual(a.poll(0.0), False) + self.assertEqual(a.poll(0.1), False) + + p = self.Process(target=self._child_dont_merge, args=(b,)) + p.start() + + self.assertEqual(a.recv_bytes(), b'a') + self.assertEqual(a.poll(1.0), True) + self.assertEqual(a.poll(1.0), True) + self.assertEqual(a.recv_bytes(), b'b') + self.assertEqual(a.poll(1.0), True) + self.assertEqual(a.poll(1.0), True) + self.assertEqual(a.poll(0.0), True) + self.assertEqual(a.recv_bytes(), b'cd') + + p.join() + # # Test of sending connection and socket objects between processes # @@ -2114,9 +2271,15 @@ class TestInvalidHandle(unittest.TestCase): @unittest.skipIf(WIN32, "skipped on Windows") def test_invalid_handles(self): - conn = _multiprocessing.Connection(44977608) - self.assertRaises(IOError, conn.poll) - self.assertRaises(IOError, _multiprocessing.Connection, -1) + conn = multiprocessing.connection.Connection(44977608) + try: + self.assertRaises((ValueError, IOError), conn.poll) + finally: + # Hack private attribute _handle to avoid printing an error + # in conn.__del__ + conn._handle = None + self.assertRaises((ValueError, IOError), + multiprocessing.connection.Connection, -1) # # Functions used to create test cases from the base ones in this module @@ -2320,6 +2483,161 @@ class TestStdinBadfiledescriptor(unittest.TestCase): assert sio.getvalue() == 'foo' +class TestWait(unittest.TestCase): + + @classmethod + def _child_test_wait(cls, w, slow): + for i in range(10): + if slow: + time.sleep(random.random()*0.1) + w.send((i, os.getpid())) + w.close() + + def test_wait(self, slow=False): + from multiprocessing.connection import wait + readers = [] + procs = [] + messages = [] + + for i in range(4): + r, w = multiprocessing.Pipe(duplex=False) + p = multiprocessing.Process(target=self._child_test_wait, args=(w, slow)) + p.daemon = True + p.start() + w.close() + readers.append(r) + procs.append(p) + self.addCleanup(p.join) + + while readers: + for r in wait(readers): + try: + msg = r.recv() + except EOFError: + readers.remove(r) + r.close() + else: + messages.append(msg) + + messages.sort() + expected = sorted((i, p.pid) for i in range(10) for p in procs) + self.assertEqual(messages, expected) + + @classmethod + def _child_test_wait_socket(cls, address, slow): + s = socket.socket() + s.connect(address) + for i in range(10): + if slow: + time.sleep(random.random()*0.1) + s.sendall(('%s\n' % i).encode('ascii')) + s.close() + + def test_wait_socket(self, slow=False): + from multiprocessing.connection import wait + l = socket.socket() + l.bind(('', 0)) + l.listen(4) + addr = ('localhost', l.getsockname()[1]) + readers = [] + procs = [] + dic = {} + + for i in range(4): + p = multiprocessing.Process(target=self._child_test_wait_socket, + args=(addr, slow)) + p.daemon = True + p.start() + procs.append(p) + self.addCleanup(p.join) + + for i in range(4): + r, _ = l.accept() + readers.append(r) + dic[r] = [] + l.close() + + while readers: + for r in wait(readers): + msg = r.recv(32) + if not msg: + readers.remove(r) + r.close() + else: + dic[r].append(msg) + + expected = ''.join('%s\n' % i for i in range(10)).encode('ascii') + for v in dic.values(): + self.assertEqual(b''.join(v), expected) + + def test_wait_slow(self): + self.test_wait(True) + + def test_wait_socket_slow(self): + self.test_wait(True) + + def test_wait_timeout(self): + from multiprocessing.connection import wait + + expected = 1 + a, b = multiprocessing.Pipe() + + start = time.time() + res = wait([a, b], 1) + delta = time.time() - start + + self.assertEqual(res, []) + self.assertLess(delta, expected + 0.5) + self.assertGreater(delta, expected - 0.5) + + b.send(None) + + start = time.time() + res = wait([a, b], 1) + delta = time.time() - start + + self.assertEqual(res, [a]) + self.assertLess(delta, 0.4) + + def test_wait_integer(self): + from multiprocessing.connection import wait + + expected = 5 + a, b = multiprocessing.Pipe() + p = multiprocessing.Process(target=time.sleep, args=(expected,)) + + p.start() + self.assertIsInstance(p.sentinel, int) + + start = time.time() + res = wait([a, p.sentinel, b], expected + 20) + delta = time.time() - start + + self.assertEqual(res, [p.sentinel]) + self.assertLess(delta, expected + 2) + self.assertGreater(delta, expected - 2) + + a.send(None) + + start = time.time() + res = wait([a, p.sentinel, b], 20) + delta = time.time() - start + + self.assertEqual(res, [p.sentinel, b]) + self.assertLess(delta, 0.4) + + b.send(None) + + start = time.time() + res = wait([a, p.sentinel, b], 20) + delta = time.time() - start + + self.assertEqual(res, [a, p.sentinel, b]) + self.assertLess(delta, 0.4) + + p.join() + + # # Issue 14151: Test invalid family on invalid environment # @@ -2336,9 +2654,8 @@ class TestInvalidFamily(unittest.TestCase): with self.assertRaises(ValueError): multiprocessing.connection.Listener('/var/test.pipe') - testcases_other = [OtherTest, TestInvalidHandle, TestInitializers, - TestStdinBadfiledescriptor, TestInvalidFamily] + TestStdinBadfiledescriptor, TestWait, TestInvalidFamily] # # |