diff options
Diffstat (limited to 'Lib/test/test_concurrent_futures.py')
-rw-r--r-- | Lib/test/test_concurrent_futures.py | 100 |
1 files changed, 62 insertions, 38 deletions
diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 2afa93802e..04c4c3717e 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -19,7 +19,7 @@ import unittest from concurrent import futures from concurrent.futures._base import ( PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future) -import concurrent.futures.process +from concurrent.futures.process import BrokenProcessPool def create_future(state=PENDING, exception=None, result=None): @@ -34,7 +34,7 @@ PENDING_FUTURE = create_future(state=PENDING) RUNNING_FUTURE = create_future(state=RUNNING) CANCELLED_FUTURE = create_future(state=CANCELLED) CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED) -EXCEPTION_FUTURE = create_future(state=FINISHED, exception=IOError()) +EXCEPTION_FUTURE = create_future(state=FINISHED, exception=OSError()) SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42) @@ -88,7 +88,7 @@ class ProcessPoolMixin(ExecutorMixin): executor_type = futures.ProcessPoolExecutor -class ExecutorShutdownTest(unittest.TestCase): +class ExecutorShutdownTest: def test_run_after_shutdown(self): self.executor.shutdown() self.assertRaises(RuntimeError, @@ -116,7 +116,7 @@ class ExecutorShutdownTest(unittest.TestCase): f.result() -class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest): +class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, unittest.TestCase): def _prime_executor(self): pass @@ -148,7 +148,7 @@ class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest): t.join() -class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest): +class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest, unittest.TestCase): def _prime_executor(self): pass @@ -160,7 +160,7 @@ class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest): processes = self.executor._processes self.executor.shutdown() - for p in processes: + for p in processes.values(): p.join() def test_context_manager_shutdown(self): @@ -169,7 +169,7 @@ class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest): self.assertEqual(list(e.map(abs, range(-5, 5))), [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]) - for p in processes: + for p in processes.values(): p.join() def test_del_shutdown(self): @@ -180,11 +180,11 @@ class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest): del executor queue_management_thread.join() - for p in processes: + for p in processes.values(): p.join() -class WaitTests(unittest.TestCase): +class WaitTests: def test_first_completed(self): future1 = self.executor.submit(mul, 21, 2) @@ -268,14 +268,14 @@ class WaitTests(unittest.TestCase): def test_timeout(self): future1 = self.executor.submit(mul, 6, 7) - future2 = self.executor.submit(time.sleep, 3) + future2 = self.executor.submit(time.sleep, 6) finished, pending = futures.wait( [CANCELLED_AND_NOTIFIED_FUTURE, EXCEPTION_FUTURE, SUCCESSFUL_FUTURE, future1, future2], - timeout=1.5, + timeout=5, return_when=futures.ALL_COMPLETED) self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE, @@ -285,7 +285,7 @@ class WaitTests(unittest.TestCase): self.assertEqual(set([future2]), pending) -class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests): +class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests, unittest.TestCase): def test_pending_calls_race(self): # Issue #14406: multi-threaded race condition when waiting on all @@ -303,11 +303,11 @@ class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests): sys.setswitchinterval(oldswitchinterval) -class ProcessPoolWaitTests(ProcessPoolMixin, WaitTests): +class ProcessPoolWaitTests(ProcessPoolMixin, WaitTests, unittest.TestCase): pass -class AsCompletedTests(unittest.TestCase): +class AsCompletedTests: # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout. def test_no_timeout(self): future1 = self.executor.submit(mul, 2, 21) @@ -344,16 +344,23 @@ class AsCompletedTests(unittest.TestCase): SUCCESSFUL_FUTURE]), completed_futures) + def test_duplicate_futures(self): + # Issue 20367. Duplicate futures should not raise exceptions or give + # duplicate responses. + future1 = self.executor.submit(time.sleep, 2) + completed = [f for f in futures.as_completed([future1,future1])] + self.assertEqual(len(completed), 1) + -class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests): +class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests, unittest.TestCase): pass -class ProcessPoolAsCompletedTests(ProcessPoolMixin, AsCompletedTests): +class ProcessPoolAsCompletedTests(ProcessPoolMixin, AsCompletedTests, unittest.TestCase): pass -class ExecutorTest(unittest.TestCase): +class ExecutorTest: # Executor.shutdown() and context manager usage is tested by # ExecutorShutdownTest. def test_submit(self): @@ -379,8 +386,8 @@ class ExecutorTest(unittest.TestCase): results = [] try: for i in self.executor.map(time.sleep, - [0, 0, 3], - timeout=1.5): + [0, 0, 6], + timeout=5): results.append(i) except futures.TimeoutError: pass @@ -389,13 +396,38 @@ class ExecutorTest(unittest.TestCase): self.assertEqual([None, None], results) + def test_shutdown_race_issue12456(self): + # Issue #12456: race condition at shutdown where trying to post a + # sentinel in the call queue blocks (the queue is full while processes + # have exited). + self.executor.map(str, [2] * (self.worker_count + 1)) + self.executor.shutdown() -class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest): - pass +class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest, unittest.TestCase): + def test_map_submits_without_iteration(self): + """Tests verifying issue 11777.""" + finished = [] + def record_finished(n): + finished.append(n) -class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest): - pass + self.executor.map(record_finished, range(10)) + self.executor.shutdown(wait=True) + self.assertCountEqual(finished, range(10)) + + +class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest, unittest.TestCase): + def test_killed_child(self): + # When a child process is abruptly terminated, the whole pool gets + # "broken". + futures = [self.executor.submit(time.sleep, 3)] + # Get one of the processes, and terminate (kill) it + p = next(iter(self.executor._processes.values())) + p.terminate() + for fut in futures: + self.assertRaises(BrokenProcessPool, fut.result) + # Submitting other jobs fails as well. + self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8) class FutureTests(unittest.TestCase): @@ -498,7 +530,7 @@ class FutureTests(unittest.TestCase): '<Future at 0x[0-9a-f]+ state=cancelled>') self.assertRegex( repr(EXCEPTION_FUTURE), - '<Future at 0x[0-9a-f]+ state=finished raised IOError>') + '<Future at 0x[0-9a-f]+ state=finished raised OSError>') self.assertRegex( repr(SUCCESSFUL_FUTURE), '<Future at 0x[0-9a-f]+ state=finished returned int>') @@ -509,7 +541,7 @@ class FutureTests(unittest.TestCase): f2 = create_future(state=RUNNING) f3 = create_future(state=CANCELLED) f4 = create_future(state=CANCELLED_AND_NOTIFIED) - f5 = create_future(state=FINISHED, exception=IOError()) + f5 = create_future(state=FINISHED, exception=OSError()) f6 = create_future(state=FINISHED, result=5) self.assertTrue(f1.cancel()) @@ -563,7 +595,7 @@ class FutureTests(unittest.TestCase): CANCELLED_FUTURE.result, timeout=0) self.assertRaises(futures.CancelledError, CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0) - self.assertRaises(IOError, EXCEPTION_FUTURE.result, timeout=0) + self.assertRaises(OSError, EXCEPTION_FUTURE.result, timeout=0) self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42) def test_result_with_success(self): @@ -602,7 +634,7 @@ class FutureTests(unittest.TestCase): self.assertRaises(futures.CancelledError, CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0) self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0), - IOError)) + OSError)) self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None) def test_exception_with_success(self): @@ -611,27 +643,19 @@ class FutureTests(unittest.TestCase): time.sleep(1) with f1._condition: f1._state = FINISHED - f1._exception = IOError() + f1._exception = OSError() f1._condition.notify_all() f1 = create_future(state=PENDING) t = threading.Thread(target=notification) t.start() - self.assertTrue(isinstance(f1.exception(timeout=5), IOError)) + self.assertTrue(isinstance(f1.exception(timeout=5), OSError)) @test.support.reap_threads def test_main(): try: - test.support.run_unittest(ProcessPoolExecutorTest, - ThreadPoolExecutorTest, - ProcessPoolWaitTests, - ThreadPoolWaitTests, - ProcessPoolAsCompletedTests, - ThreadPoolAsCompletedTests, - FutureTests, - ProcessPoolShutdownTest, - ThreadPoolShutdownTest) + test.support.run_unittest(__name__) finally: test.support.reap_children() |