summaryrefslogtreecommitdiff
path: root/Lib/test/test_multiprocessing.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_multiprocessing.py')
-rw-r--r--Lib/test/test_multiprocessing.py327
1 files changed, 322 insertions, 5 deletions
diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py
index 298faf73fd..2bcdb4e07c 100644
--- a/Lib/test/test_multiprocessing.py
+++ b/Lib/test/test_multiprocessing.py
@@ -8,6 +8,7 @@ import unittest
import queue as pyqueue
import time
import io
+import itertools
import sys
import os
import gc
@@ -82,6 +83,23 @@ HAVE_GETVALUE = not getattr(_multiprocessing,
'HAVE_BROKEN_SEM_GETVALUE', False)
WIN32 = (sys.platform == "win32")
+if WIN32:
+ from _subprocess import WaitForSingleObject, INFINITE, WAIT_OBJECT_0
+
+ def wait_for_handle(handle, timeout):
+ if timeout is None or timeout < 0.0:
+ timeout = INFINITE
+ else:
+ timeout = int(1000 * timeout)
+ return WaitForSingleObject(handle, timeout) == WAIT_OBJECT_0
+else:
+ from select import select
+ _select = util._eintr_retry(select)
+
+ def wait_for_handle(handle, timeout):
+ if timeout is not None and timeout < 0.0:
+ timeout = None
+ return handle in _select([handle], [], [], timeout)[0]
try:
MAXFD = os.sysconf("SC_OPEN_MAX")
@@ -196,6 +214,18 @@ class _TestProcess(BaseTestCase):
self.assertEqual(current.ident, os.getpid())
self.assertEqual(current.exitcode, None)
+ def test_daemon_argument(self):
+ if self.TYPE == "threads":
+ return
+
+ # By default uses the current process's daemon flag.
+ proc0 = self.Process(target=self._test)
+ self.assertEqual(proc0.daemon, self.current_process().daemon)
+ proc1 = self.Process(target=self._test, daemon=True)
+ self.assertTrue(proc1.daemon)
+ proc2 = self.Process(target=self._test, daemon=False)
+ self.assertFalse(proc2.daemon)
+
@classmethod
def _test(cls, q, *args, **kwds):
current = cls.current_process()
@@ -328,6 +358,26 @@ class _TestProcess(BaseTestCase):
]
self.assertEqual(result, expected)
+ @classmethod
+ def _test_sentinel(cls, event):
+ event.wait(10.0)
+
+ def test_sentinel(self):
+ if self.TYPE == "threads":
+ return
+ event = self.Event()
+ p = self.Process(target=self._test_sentinel, args=(event,))
+ with self.assertRaises(ValueError):
+ p.sentinel
+ p.start()
+ self.addCleanup(p.join)
+ sentinel = p.sentinel
+ self.assertIsInstance(sentinel, int)
+ self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
+ event.set()
+ p.join()
+ self.assertTrue(wait_for_handle(sentinel, timeout=DELTA))
+
#
#
#
@@ -1099,6 +1149,9 @@ def sqr(x, wait=0.0):
time.sleep(wait)
return x*x
+def mul(x, y):
+ return x*y
+
class _TestPool(BaseTestCase):
def test_apply(self):
@@ -1112,6 +1165,20 @@ class _TestPool(BaseTestCase):
self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
list(map(sqr, list(range(100)))))
+ def test_starmap(self):
+ psmap = self.pool.starmap
+ tuples = list(zip(range(10), range(9,-1, -1)))
+ self.assertEqual(psmap(mul, tuples),
+ list(itertools.starmap(mul, tuples)))
+ tuples = list(zip(range(100), range(99,-1, -1)))
+ self.assertEqual(psmap(mul, tuples, chunksize=20),
+ list(itertools.starmap(mul, tuples)))
+
+ def test_starmap_async(self):
+ tuples = list(zip(range(100), range(99,-1, -1)))
+ self.assertEqual(self.pool.starmap_async(mul, tuples).get(),
+ list(itertools.starmap(mul, tuples)))
+
def test_map_chunksize(self):
try:
self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
@@ -1712,6 +1779,17 @@ class _TestConnection(BaseTestCase):
self.assertRaises(RuntimeError, reduction.recv_handle, conn)
p.join()
+class _TestListener(BaseTestCase):
+
+ ALLOWED_TYPES = ('processes')
+
+ def test_multiple_bind(self):
+ for family in self.connection.families:
+ l = self.connection.Listener(family=family)
+ self.addCleanup(l.close)
+ self.assertRaises(OSError, self.connection.Listener,
+ l.address, family)
+
class _TestListenerClient(BaseTestCase):
ALLOWED_TYPES = ('processes', 'threads')
@@ -1732,6 +1810,85 @@ class _TestListenerClient(BaseTestCase):
self.assertEqual(conn.recv(), 'hello')
p.join()
l.close()
+
+class _TestPoll(unittest.TestCase):
+
+ ALLOWED_TYPES = ('processes', 'threads')
+
+ def test_empty_string(self):
+ a, b = self.Pipe()
+ self.assertEqual(a.poll(), False)
+ b.send_bytes(b'')
+ self.assertEqual(a.poll(), True)
+ self.assertEqual(a.poll(), True)
+
+ @classmethod
+ def _child_strings(cls, conn, strings):
+ for s in strings:
+ time.sleep(0.1)
+ conn.send_bytes(s)
+ conn.close()
+
+ def test_strings(self):
+ strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop')
+ a, b = self.Pipe()
+ p = self.Process(target=self._child_strings, args=(b, strings))
+ p.start()
+
+ for s in strings:
+ for i in range(200):
+ if a.poll(0.01):
+ break
+ x = a.recv_bytes()
+ self.assertEqual(s, x)
+
+ p.join()
+
+ @classmethod
+ def _child_boundaries(cls, r):
+ # Polling may "pull" a message in to the child process, but we
+ # don't want it to pull only part of a message, as that would
+ # corrupt the pipe for any other processes which might later
+ # read from it.
+ r.poll(5)
+
+ def test_boundaries(self):
+ r, w = self.Pipe(False)
+ p = self.Process(target=self._child_boundaries, args=(r,))
+ p.start()
+ time.sleep(2)
+ L = [b"first", b"second"]
+ for obj in L:
+ w.send_bytes(obj)
+ w.close()
+ p.join()
+ self.assertIn(r.recv_bytes(), L)
+
+ @classmethod
+ def _child_dont_merge(cls, b):
+ b.send_bytes(b'a')
+ b.send_bytes(b'b')
+ b.send_bytes(b'cd')
+
+ def test_dont_merge(self):
+ a, b = self.Pipe()
+ self.assertEqual(a.poll(0.0), False)
+ self.assertEqual(a.poll(0.1), False)
+
+ p = self.Process(target=self._child_dont_merge, args=(b,))
+ p.start()
+
+ self.assertEqual(a.recv_bytes(), b'a')
+ self.assertEqual(a.poll(1.0), True)
+ self.assertEqual(a.poll(1.0), True)
+ self.assertEqual(a.recv_bytes(), b'b')
+ self.assertEqual(a.poll(1.0), True)
+ self.assertEqual(a.poll(1.0), True)
+ self.assertEqual(a.poll(0.0), True)
+ self.assertEqual(a.recv_bytes(), b'cd')
+
+ p.join()
+
#
# Test of sending connection and socket objects between processes
#
@@ -2114,9 +2271,15 @@ class TestInvalidHandle(unittest.TestCase):
@unittest.skipIf(WIN32, "skipped on Windows")
def test_invalid_handles(self):
- conn = _multiprocessing.Connection(44977608)
- self.assertRaises(IOError, conn.poll)
- self.assertRaises(IOError, _multiprocessing.Connection, -1)
+ conn = multiprocessing.connection.Connection(44977608)
+ try:
+ self.assertRaises((ValueError, IOError), conn.poll)
+ finally:
+ # Hack private attribute _handle to avoid printing an error
+ # in conn.__del__
+ conn._handle = None
+ self.assertRaises((ValueError, IOError),
+ multiprocessing.connection.Connection, -1)
#
# Functions used to create test cases from the base ones in this module
@@ -2320,6 +2483,161 @@ class TestStdinBadfiledescriptor(unittest.TestCase):
assert sio.getvalue() == 'foo'
+class TestWait(unittest.TestCase):
+
+ @classmethod
+ def _child_test_wait(cls, w, slow):
+ for i in range(10):
+ if slow:
+ time.sleep(random.random()*0.1)
+ w.send((i, os.getpid()))
+ w.close()
+
+ def test_wait(self, slow=False):
+ from multiprocessing.connection import wait
+ readers = []
+ procs = []
+ messages = []
+
+ for i in range(4):
+ r, w = multiprocessing.Pipe(duplex=False)
+ p = multiprocessing.Process(target=self._child_test_wait, args=(w, slow))
+ p.daemon = True
+ p.start()
+ w.close()
+ readers.append(r)
+ procs.append(p)
+ self.addCleanup(p.join)
+
+ while readers:
+ for r in wait(readers):
+ try:
+ msg = r.recv()
+ except EOFError:
+ readers.remove(r)
+ r.close()
+ else:
+ messages.append(msg)
+
+ messages.sort()
+ expected = sorted((i, p.pid) for i in range(10) for p in procs)
+ self.assertEqual(messages, expected)
+
+ @classmethod
+ def _child_test_wait_socket(cls, address, slow):
+ s = socket.socket()
+ s.connect(address)
+ for i in range(10):
+ if slow:
+ time.sleep(random.random()*0.1)
+ s.sendall(('%s\n' % i).encode('ascii'))
+ s.close()
+
+ def test_wait_socket(self, slow=False):
+ from multiprocessing.connection import wait
+ l = socket.socket()
+ l.bind(('', 0))
+ l.listen(4)
+ addr = ('localhost', l.getsockname()[1])
+ readers = []
+ procs = []
+ dic = {}
+
+ for i in range(4):
+ p = multiprocessing.Process(target=self._child_test_wait_socket,
+ args=(addr, slow))
+ p.daemon = True
+ p.start()
+ procs.append(p)
+ self.addCleanup(p.join)
+
+ for i in range(4):
+ r, _ = l.accept()
+ readers.append(r)
+ dic[r] = []
+ l.close()
+
+ while readers:
+ for r in wait(readers):
+ msg = r.recv(32)
+ if not msg:
+ readers.remove(r)
+ r.close()
+ else:
+ dic[r].append(msg)
+
+ expected = ''.join('%s\n' % i for i in range(10)).encode('ascii')
+ for v in dic.values():
+ self.assertEqual(b''.join(v), expected)
+
+ def test_wait_slow(self):
+ self.test_wait(True)
+
+ def test_wait_socket_slow(self):
+ self.test_wait(True)
+
+ def test_wait_timeout(self):
+ from multiprocessing.connection import wait
+
+ expected = 1
+ a, b = multiprocessing.Pipe()
+
+ start = time.time()
+ res = wait([a, b], 1)
+ delta = time.time() - start
+
+ self.assertEqual(res, [])
+ self.assertLess(delta, expected + 0.5)
+ self.assertGreater(delta, expected - 0.5)
+
+ b.send(None)
+
+ start = time.time()
+ res = wait([a, b], 1)
+ delta = time.time() - start
+
+ self.assertEqual(res, [a])
+ self.assertLess(delta, 0.4)
+
+ def test_wait_integer(self):
+ from multiprocessing.connection import wait
+
+ expected = 5
+ a, b = multiprocessing.Pipe()
+ p = multiprocessing.Process(target=time.sleep, args=(expected,))
+
+ p.start()
+ self.assertIsInstance(p.sentinel, int)
+
+ start = time.time()
+ res = wait([a, p.sentinel, b], expected + 20)
+ delta = time.time() - start
+
+ self.assertEqual(res, [p.sentinel])
+ self.assertLess(delta, expected + 2)
+ self.assertGreater(delta, expected - 2)
+
+ a.send(None)
+
+ start = time.time()
+ res = wait([a, p.sentinel, b], 20)
+ delta = time.time() - start
+
+ self.assertEqual(res, [p.sentinel, b])
+ self.assertLess(delta, 0.4)
+
+ b.send(None)
+
+ start = time.time()
+ res = wait([a, p.sentinel, b], 20)
+ delta = time.time() - start
+
+ self.assertEqual(res, [a, p.sentinel, b])
+ self.assertLess(delta, 0.4)
+
+ p.join()
+
+
#
# Issue 14151: Test invalid family on invalid environment
#
@@ -2336,9 +2654,8 @@ class TestInvalidFamily(unittest.TestCase):
with self.assertRaises(ValueError):
multiprocessing.connection.Listener('/var/test.pipe')
-
testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
- TestStdinBadfiledescriptor, TestInvalidFamily]
+ TestStdinBadfiledescriptor, TestWait, TestInvalidFamily]
#
#