summaryrefslogtreecommitdiff
path: root/Lib/test/test_concurrent_futures.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_concurrent_futures.py')
-rw-r--r--Lib/test/test_concurrent_futures.py100
1 files changed, 62 insertions, 38 deletions
diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py
index 2afa93802e..04c4c3717e 100644
--- a/Lib/test/test_concurrent_futures.py
+++ b/Lib/test/test_concurrent_futures.py
@@ -19,7 +19,7 @@ import unittest
from concurrent import futures
from concurrent.futures._base import (
PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future)
-import concurrent.futures.process
+from concurrent.futures.process import BrokenProcessPool
def create_future(state=PENDING, exception=None, result=None):
@@ -34,7 +34,7 @@ PENDING_FUTURE = create_future(state=PENDING)
RUNNING_FUTURE = create_future(state=RUNNING)
CANCELLED_FUTURE = create_future(state=CANCELLED)
CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED)
-EXCEPTION_FUTURE = create_future(state=FINISHED, exception=IOError())
+EXCEPTION_FUTURE = create_future(state=FINISHED, exception=OSError())
SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)
@@ -88,7 +88,7 @@ class ProcessPoolMixin(ExecutorMixin):
executor_type = futures.ProcessPoolExecutor
-class ExecutorShutdownTest(unittest.TestCase):
+class ExecutorShutdownTest:
def test_run_after_shutdown(self):
self.executor.shutdown()
self.assertRaises(RuntimeError,
@@ -116,7 +116,7 @@ class ExecutorShutdownTest(unittest.TestCase):
f.result()
-class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest):
+class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, unittest.TestCase):
def _prime_executor(self):
pass
@@ -148,7 +148,7 @@ class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest):
t.join()
-class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest):
+class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest, unittest.TestCase):
def _prime_executor(self):
pass
@@ -160,7 +160,7 @@ class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest):
processes = self.executor._processes
self.executor.shutdown()
- for p in processes:
+ for p in processes.values():
p.join()
def test_context_manager_shutdown(self):
@@ -169,7 +169,7 @@ class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest):
self.assertEqual(list(e.map(abs, range(-5, 5))),
[5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
- for p in processes:
+ for p in processes.values():
p.join()
def test_del_shutdown(self):
@@ -180,11 +180,11 @@ class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest):
del executor
queue_management_thread.join()
- for p in processes:
+ for p in processes.values():
p.join()
-class WaitTests(unittest.TestCase):
+class WaitTests:
def test_first_completed(self):
future1 = self.executor.submit(mul, 21, 2)
@@ -268,14 +268,14 @@ class WaitTests(unittest.TestCase):
def test_timeout(self):
future1 = self.executor.submit(mul, 6, 7)
- future2 = self.executor.submit(time.sleep, 3)
+ future2 = self.executor.submit(time.sleep, 6)
finished, pending = futures.wait(
[CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
SUCCESSFUL_FUTURE,
future1, future2],
- timeout=1.5,
+ timeout=5,
return_when=futures.ALL_COMPLETED)
self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
@@ -285,7 +285,7 @@ class WaitTests(unittest.TestCase):
self.assertEqual(set([future2]), pending)
-class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests):
+class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests, unittest.TestCase):
def test_pending_calls_race(self):
# Issue #14406: multi-threaded race condition when waiting on all
@@ -303,11 +303,11 @@ class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests):
sys.setswitchinterval(oldswitchinterval)
-class ProcessPoolWaitTests(ProcessPoolMixin, WaitTests):
+class ProcessPoolWaitTests(ProcessPoolMixin, WaitTests, unittest.TestCase):
pass
-class AsCompletedTests(unittest.TestCase):
+class AsCompletedTests:
# TODO(brian@sweetapp.com): Should have a test with a non-zero timeout.
def test_no_timeout(self):
future1 = self.executor.submit(mul, 2, 21)
@@ -344,16 +344,23 @@ class AsCompletedTests(unittest.TestCase):
SUCCESSFUL_FUTURE]),
completed_futures)
+ def test_duplicate_futures(self):
+ # Issue 20367. Duplicate futures should not raise exceptions or give
+ # duplicate responses.
+ future1 = self.executor.submit(time.sleep, 2)
+ completed = [f for f in futures.as_completed([future1,future1])]
+ self.assertEqual(len(completed), 1)
+
-class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests):
+class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests, unittest.TestCase):
pass
-class ProcessPoolAsCompletedTests(ProcessPoolMixin, AsCompletedTests):
+class ProcessPoolAsCompletedTests(ProcessPoolMixin, AsCompletedTests, unittest.TestCase):
pass
-class ExecutorTest(unittest.TestCase):
+class ExecutorTest:
# Executor.shutdown() and context manager usage is tested by
# ExecutorShutdownTest.
def test_submit(self):
@@ -379,8 +386,8 @@ class ExecutorTest(unittest.TestCase):
results = []
try:
for i in self.executor.map(time.sleep,
- [0, 0, 3],
- timeout=1.5):
+ [0, 0, 6],
+ timeout=5):
results.append(i)
except futures.TimeoutError:
pass
@@ -389,13 +396,38 @@ class ExecutorTest(unittest.TestCase):
self.assertEqual([None, None], results)
+ def test_shutdown_race_issue12456(self):
+ # Issue #12456: race condition at shutdown where trying to post a
+ # sentinel in the call queue blocks (the queue is full while processes
+ # have exited).
+ self.executor.map(str, [2] * (self.worker_count + 1))
+ self.executor.shutdown()
-class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest):
- pass
+class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest, unittest.TestCase):
+ def test_map_submits_without_iteration(self):
+ """Tests verifying issue 11777."""
+ finished = []
+ def record_finished(n):
+ finished.append(n)
-class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest):
- pass
+ self.executor.map(record_finished, range(10))
+ self.executor.shutdown(wait=True)
+ self.assertCountEqual(finished, range(10))
+
+
+class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest, unittest.TestCase):
+ def test_killed_child(self):
+ # When a child process is abruptly terminated, the whole pool gets
+ # "broken".
+ futures = [self.executor.submit(time.sleep, 3)]
+ # Get one of the processes, and terminate (kill) it
+ p = next(iter(self.executor._processes.values()))
+ p.terminate()
+ for fut in futures:
+ self.assertRaises(BrokenProcessPool, fut.result)
+ # Submitting other jobs fails as well.
+ self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8)
class FutureTests(unittest.TestCase):
@@ -498,7 +530,7 @@ class FutureTests(unittest.TestCase):
'<Future at 0x[0-9a-f]+ state=cancelled>')
self.assertRegex(
repr(EXCEPTION_FUTURE),
- '<Future at 0x[0-9a-f]+ state=finished raised IOError>')
+ '<Future at 0x[0-9a-f]+ state=finished raised OSError>')
self.assertRegex(
repr(SUCCESSFUL_FUTURE),
'<Future at 0x[0-9a-f]+ state=finished returned int>')
@@ -509,7 +541,7 @@ class FutureTests(unittest.TestCase):
f2 = create_future(state=RUNNING)
f3 = create_future(state=CANCELLED)
f4 = create_future(state=CANCELLED_AND_NOTIFIED)
- f5 = create_future(state=FINISHED, exception=IOError())
+ f5 = create_future(state=FINISHED, exception=OSError())
f6 = create_future(state=FINISHED, result=5)
self.assertTrue(f1.cancel())
@@ -563,7 +595,7 @@ class FutureTests(unittest.TestCase):
CANCELLED_FUTURE.result, timeout=0)
self.assertRaises(futures.CancelledError,
CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0)
- self.assertRaises(IOError, EXCEPTION_FUTURE.result, timeout=0)
+ self.assertRaises(OSError, EXCEPTION_FUTURE.result, timeout=0)
self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42)
def test_result_with_success(self):
@@ -602,7 +634,7 @@ class FutureTests(unittest.TestCase):
self.assertRaises(futures.CancelledError,
CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0)
self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0),
- IOError))
+ OSError))
self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None)
def test_exception_with_success(self):
@@ -611,27 +643,19 @@ class FutureTests(unittest.TestCase):
time.sleep(1)
with f1._condition:
f1._state = FINISHED
- f1._exception = IOError()
+ f1._exception = OSError()
f1._condition.notify_all()
f1 = create_future(state=PENDING)
t = threading.Thread(target=notification)
t.start()
- self.assertTrue(isinstance(f1.exception(timeout=5), IOError))
+ self.assertTrue(isinstance(f1.exception(timeout=5), OSError))
@test.support.reap_threads
def test_main():
try:
- test.support.run_unittest(ProcessPoolExecutorTest,
- ThreadPoolExecutorTest,
- ProcessPoolWaitTests,
- ThreadPoolWaitTests,
- ProcessPoolAsCompletedTests,
- ThreadPoolAsCompletedTests,
- FutureTests,
- ProcessPoolShutdownTest,
- ThreadPoolShutdownTest)
+ test.support.run_unittest(__name__)
finally:
test.support.reap_children()