diff options
author | brian.quinlan <devnull@localhost> | 2009-05-04 21:02:48 +0000 |
---|---|---|
committer | brian.quinlan <devnull@localhost> | 2009-05-04 21:02:48 +0000 |
commit | a12b38ce97773a42379c032cb4003411c094c030 (patch) | |
tree | 48b6a806f5e7320eec8cd4fe15b5a55cb570ff67 | |
parent | 3b24ddaa96d138bac457f5d394c3a71947ad2d15 (diff) | |
download | futures-a12b38ce97773a42379c032cb4003411c094c030.tar.gz |
First maybe-working version of process pools.
-rw-r--r-- | crawl.py | 8 | ||||
-rw-r--r-- | futures/__init__.py | 3 | ||||
-rw-r--r-- | futures/_base.py | 261 | ||||
-rw-r--r-- | futures/process.py | 165 | ||||
-rw-r--r-- | futures/thread.py | 274 | ||||
-rw-r--r-- | primes.py | 46 | ||||
-rw-r--r-- | test_futures.py | 123 |
7 files changed, 491 insertions, 389 deletions
@@ -11,7 +11,7 @@ URLS = ['http://www.google.com/', 'http://www.thisurlprobablydoesnotexist.com', 'http://www.slashdot.org/', 'http://www.python.org/', - 'http://www.sweetapp.com/'] + 'http://www.sweetapp.com/'] * 1000 def load_url(url, timeout): return urllib.request.urlopen(url, timeout=timeout).read() @@ -25,14 +25,14 @@ def download_urls(urls, timeout=60): pass return url_to_content -executor = futures.thread.ThreadPoolExecutor(max_threads=100) +executor = futures.ProcessPoolExecutor(100) def download_urls_with_futures(urls, timeout=60): url_to_content = {} fs = executor.run( (functools.partial(load_url, url, timeout) for url in urls), timeout=timeout) - for url, future in zip(urls, fs.result_futures()): + for url, future in zip(urls, fs.successful_futures()): url_to_content[url] = future.result() return url_to_content -print(download_urls(URLS)) +print(download_urls_with_futures(URLS)) diff --git a/futures/__init__.py b/futures/__init__.py index e69de29..33a3e38 100644 --- a/futures/__init__.py +++ b/futures/__init__.py @@ -0,0 +1,3 @@ +from futures._base import CancelledException, TimeoutException, Future, FutureList, FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED, RETURN_IMMEDIATELY +from futures.thread import ThreadPoolExecutor +from futures.process import ProcessPoolExecutor diff --git a/futures/_base.py b/futures/_base.py new file mode 100644 index 0000000..d7d3337 --- /dev/null +++ b/futures/_base.py @@ -0,0 +1,261 @@ +import threading + +FIRST_COMPLETED = 0 +FIRST_EXCEPTION = 1 +ALL_COMPLETED = 2 +RETURN_IMMEDIATELY = 3 + +PENDING = 0 +RUNNING = 1 +CANCELLED = 2 +FINISHED = 3 + +_STATE_TO_DESCRIPTION_MAP = { + PENDING: "pending", + RUNNING: "running", + CANCELLED: "cancelled", + FINISHED: "finished" +} + +class CancelledException(Exception): + pass + +class TimeoutException(Exception): + pass + +class Future(object): + def __init__(self): + self._condition = threading.Condition() + self._state = PENDING + self._result = None + self._exception = None + + def __repr__(self): + with self._condition: + if self._state == FINISHED: + if self._exception: + return '<Future state=%s raised %s>' % ( + _STATE_TO_DESCRIPTION_MAP[self._state], + self._exception.__class__.__name__) + else: + return '<Future state=%s returned %s>' % ( + _STATE_TO_DESCRIPTION_MAP[self._state], + self._result.__class__.__name__) + return '<Future state=%s>' % _STATE_TO_DESCRIPTION_MAP[self._state] + + def cancel(self): + with self._condition: + if self._state in [RUNNING, FINISHED]: + return False + + self._state = CANCELLED + return True + + def cancelled(self): + with self._condition: + return self._state == CANCELLED + + def done(self): + with self._condition: + return self._state in [CANCELLED, FINISHED] + + def __get_result(self): + if self._exception: + raise self._exception + else: + return self._result + + def result(self, timeout=None): + with self._condition: + if self._state == CANCELLED: + raise CancelledException() + elif self._state == FINISHED: + return self.__get_result() + + self._condition.wait(timeout) + + if self._state == CANCELLED: + raise CancelledException() + elif self._state == FINISHED: + return self.__get_result() + else: + raise TimeoutException() + + def exception(self, timeout=None): + with self._condition: + if self._state == CANCELLED: + raise CancelledException() + elif self._state == FINISHED: + return self._exception + + self._condition.wait(timeout) + + if self._state == CANCELLED: + raise CancelledException() + elif self._state == FINISHED: + return self._exception + else: + raise TimeoutException() + +class _NullWaitTracker(object): + def add_result(self): + pass + + def add_exception(self): + pass + + def add_cancelled(self): + pass + +class _FirstCompletedWaitTracker(object): + def __init__(self): + self.event = threading.Event() + + def add_result(self): + self.event.set() + + def add_exception(self): + self.event.set() + + def add_cancelled(self): + self.event.set() + +class _AllCompletedWaitTracker(object): + def __init__(self, pending_calls, stop_on_exception): + self.event = threading.Event() + self.pending_calls = pending_calls + self.stop_on_exception = stop_on_exception + + def add_result(self): + self.pending_calls -= 1 + if not self.pending_calls: + self.event.set() + + def add_exception(self): + self.add_result() + if self.stop_on_exception: + self.event.set() + + def add_cancelled(self): + self.add_result() + +class ThreadEventSink(object): + def __init__(self): + self._condition = threading.Lock() + self._waiters = [] + + def add(self, e): + self._waiters.append(e) + + def add_result(self): + with self._condition: + for waiter in self._waiters: + waiter.add_result() + + def add_exception(self): + with self._condition: + for waiter in self._waiters: + waiter.add_exception() + + def add_cancelled(self): + with self._condition: + for waiter in self._waiters: + waiter.add_cancelled() + +class FutureList(object): + def __init__(self, futures, event_sink): + self._futures = futures + self._event_sink = event_sink + + def wait(self, timeout=None, run_until=ALL_COMPLETED): + with self._event_sink._condition: + if all(f.done() for f in self): + return + + if run_until == FIRST_COMPLETED: + m = _FirstCompletedWaitTracker() + elif run_until == FIRST_EXCEPTION: + m = _AllCompletedWaitTracker(len(self), stop_on_exception=True) + elif run_until == ALL_COMPLETED: + m = _AllCompletedWaitTracker(len(self), stop_on_exception=False) + elif run_until == RETURN_IMMEDIATELY: + m = _NullWaitTracker() + else: + raise ValueError() + + self._event_sink.add(m) + + if run_until != RETURN_IMMEDIATELY: + m.event.wait(timeout) + + def cancel(self, timeout=None): + for f in self: + f.cancel() + self.wait(timeout=timeout, run_until=ALL_COMPLETED) + if any(not f.done() for f in self): + raise TimeoutException() + + def has_running_futures(self): + return bool(self.running_futures()) + + def has_cancelled_futures(self): + return bool(self.cancelled_futures()) + + def has_done_futures(self): + return bool(self.done_futures()) + + def has_successful_futures(self): + return bool(self.successful_futures()) + + def has_exception_futures(self): + return bool(self.exception_futures()) + + def running_futures(self): + return [f for f in self if not f.done() and not f.cancelled()] + + def cancelled_futures(self): + return [f for f in self if f.cancelled()] + + def done_futures(self): + return [f for f in self if f.done()] + + def successful_futures(self): + return [f for f in self + if f.done() and not f.cancelled() and f.exception() is None] + + def exception_futures(self): + return [f for f in self if f.done() and f.exception() is not None] + + def __getitem__(self, i): + return self._futures[i] + + def __len__(self): + return len(self._futures) + + def __iter__(self): + return iter(self._futures) + + def __contains__(self, f): + return f in self._futures + + def __repr__(self): + return ('<FutureList #futures=%d ' + '[#success=%d #exception=%d #cancelled=%d]>' % ( + len(self), + len(self.successful_futures()), + len(self.exception_futures()), + len(self.cancelled_futures()))) + +import functools +class Executor(object): + def map(self, fn, iter): + calls = [functools.partial(fn, a) for a in iter] + return self.runXXX(calls) + + def runXXX(self, calls): + fs = self.run(calls, timeout=None, run_until=FIRST_EXCEPTION) + + if fs.has_exception_futures(): + raise fs.exception_futures()[0].exception() + else: + return [f.result() for f in fs] diff --git a/futures/process.py b/futures/process.py index 945303f..499eeb6 100644 --- a/futures/process.py +++ b/futures/process.py @@ -1,94 +1,153 @@ #!/usr/bin/env python +from futures._base import RUNNING, FINISHED, Executor, ALL_COMPLETED, ThreadEventSink, Future, FutureList +import queue +import multiprocessing +import threading + class _WorkItem(object): def __init__(self, call, future, completion_tracker): self.call = call self.future = future self.completion_tracker = completion_tracker - def run(self): - if self.future.cancelled(): - with self.future._condition: - self.future._condition.notify_all() - self.completion_tracker.add_cancelled() - return +class _ResultItem(object): + def __init__(self, work_id, exception=None, result=None): + self.work_id = work_id + self.exception = exception + self.result = result + +class _CallItem(object): + def __init__(self, work_id, call): + self.work_id = work_id + self.call = call - self.future._state = _RUNNING +def _process_worker(call_queue, result_queue, shutdown): + while True: try: - r = self.call() - except BaseException as e: - with self.future._condition: - self.future._exception = e - self.future._state = _FINISHED - self.future._condition.notify_all() - self.completion_tracker.add_exception() + call_item = call_queue.get(block=True, timeout=0.1) + except queue.Empty: + if shutdown.is_set(): + return else: - with self.future._condition: - self.future._result = r - self.future._state = _FINISHED - self.future._condition.notify_all() - self.completion_tracker.add_result() - -class XXX: - def wait(self, timeout=None, run_until=ALL_COMPLETED): - - pass - -class ProcessPoolExecutor(object): - def __init__(self, max_processes): + try: + r = call_item.call() + except BaseException as e: + result_queue.put(_ResultItem(call_item.work_id, + exception=e)) + else: + result_queue.put(_ResultItem(call_item.work_id, + result=r)) + +class ProcessPoolExecutor(Executor): + def __init__(self, max_processes=None): + if max_processes is None: + try: + max_processes = multiprocessing.cpu_count() + except NotImplementedError: + max_processes = 16 + self._max_processes = max_processes - self._work_queue = multiprocessing.Queue() + self._call_queue = multiprocessing.Queue(self._max_processes + 1) + self._result_queue = multiprocessing.Queue() + self._work_ids = queue.Queue() + self._queue_management_thread = None self._processes = set() self._shutdown = False + self._shutdown_process_event = multiprocessing.Event() self._lock = threading.Lock() self._queue_count = 0 - self._pending_futures = {} + self._pending_work_items = {} + - def _(self): + def _add_call_item_to_queue(self): while True: try: + work_id = self._work_ids.get(block=False) + except queue.Empty: + return + else: + work_item = self._pending_work_items[work_id] + + if work_item.future.cancelled(): + with work_item.future._condition: + work_item.future._condition.notify_all() + work_item.completion_tracker.add_cancelled() + continue + else: + with work_item.future._condition: + work_item.future._state = RUNNING + + self._call_queue.put(_CallItem(work_id, work_item.call), + block=True) + if self._call_queue.full(): + return + + def _result(self): + while True: + self._add_call_item_to_queue() + try: result_item = self._result_queue.get(block=True, timeout=0.1) - except multiprocessing.TimeoutError: - if self._shutdown: + except queue.Empty: + if self._shutdown and not self._pending_work_items: + self._shutdown_process_event.set() return else: - completion_tracker, future = self._pending_futures[ - result_item.index] - + work_item = self._pending_work_items[result_item.work_id] + del self._pending_work_items[result_item.work_id] + if result_item.exception: - with future._condition: - future._exception = result_item.exception - future._state = _FINISHED - future._condition.notify_all() - completion_tracker.add_exception() + with work_item.future._condition: + work_item.future._exception = result_item.exception + work_item.future._state = FINISHED + work_item.future._condition.notify_all() + work_item.completion_tracker.add_exception() else: - with future._condition: - future._result = result_item.result - future._state = _FINISHED - future._condition.notify_all() - completion_tracker.add_result() - - + with work_item.future._condition: + work_item.future._result = result_item.result + work_item.future._state = FINISHED + work_item.future._condition.notify_all() + work_item.completion_tracker.add_result() def _adjust_process_count(self): - + if self._queue_management_thread is None: + self._queue_management_thread = threading.Thread( + target=self._result) + self._queue_management_thread.daemon = True + self._queue_management_thread.start() + + for _ in range(len(self._processes), self._max_processes): + p = multiprocessing.Process( + target=_process_worker, + args=(self._call_queue, + self._result_queue, + self._shutdown_process_event)) + p.daemon = True + p.start() + self._processes.add(p) + def run(self, calls, timeout=None, run_until=ALL_COMPLETED): with self._lock: if self._shutdown: raise RuntimeError() futures = [] - event_sink = _ThreadEventSink() + event_sink = ThreadEventSink() + self._queue_count for call in calls: f = Future() - w = _WorkItem(call, f, event_sink) - self._work_queue.put(w) + self._pending_work_items[self._queue_count] = _WorkItem( + call, f, event_sink) + self._work_ids.put(self._queue_count) futures.append(f) self._queue_count += 1 - - print('futures:', futures) + self._adjust_process_count() fl = FutureList(futures, event_sink) fl.wait(timeout=timeout, run_until=run_until) return fl + + def shutdown(self): + with self._lock: + self._shutdown = True diff --git a/futures/thread.py b/futures/thread.py index a4c779d..3f1b807 100644 --- a/futures/thread.py +++ b/futures/thread.py @@ -1,259 +1,9 @@ #!/usr/bin/env python +from futures._base import RUNNING, FINISHED, Executor, ALL_COMPLETED, ThreadEventSink, Future, FutureList import queue import threading -FIRST_COMPLETED = 0 -FIRST_EXCEPTION = 1 -ALL_COMPLETED = 2 -RETURN_IMMEDIATELY = 3 - -_PENDING = 0 -_RUNNING = 1 -_CANCELLED = 2 -_FINISHED = 3 - -_STATE_TO_DESCRIPTION_MAP = { - _PENDING: "pending", - _RUNNING: "running", - _CANCELLED: "cancelled", - _FINISHED: "finished" -} - -class CancelledException(Exception): - pass - -class TimeoutException(Exception): - pass - -class Future(object): - def __init__(self): - self._condition = threading.Condition() - self._state = _PENDING - self._result = None - self._exception = None - - def __repr__(self): - with self._condition: - if self._state == _FINISHED: - if self._exception: - return '<Future state=%s raised %s>' % ( - _STATE_TO_DESCRIPTION_MAP[self._state], - self._exception.__class__.__name__) - else: - return '<Future state=%s returned %s>' % ( - _STATE_TO_DESCRIPTION_MAP[self._state], - self._result.__class__.__name__) - return '<Future state=%s>' % _STATE_TO_DESCRIPTION_MAP[self._state] - - def cancel(self): - with self._condition: - if self._state in [_RUNNING, _FINISHED]: - return False - - self._state = _CANCELLED - return True - - def cancelled(self): - with self._condition: - return self._state == _CANCELLED - - def done(self): - with self._condition: - return self._state in [_CANCELLED, _FINISHED] - - def __get_result(self): - if self._exception: - raise self._exception - else: - return self._result - - def result(self, timeout=None): - with self._condition: - if self._state == _CANCELLED: - raise CancelledException() - elif self._state == _FINISHED: - return self.__get_result() - - print('Waiting...') - self._condition.wait(timeout) - print('Post Waiting...') - - if self._state == _CANCELLED: - raise CancelledException() - elif self._state == _FINISHED: - return self.__get_result() - else: - raise TimeoutException() - - def exception(self, timeout=None): - with self._condition: - if self._state == _CANCELLED: - raise CancelledException() - elif self._state == _FINISHED: - return self._exception - - self._condition.wait(timeout) - - if self._state == _CANCELLED: - raise CancelledException() - elif self._state == _FINISHED: - return self._exception - else: - raise TimeoutException() - -class _NullWaitTracker(object): - def add_result(self): - pass - - def add_exception(self): - pass - - def add_cancelled(self): - pass - -class _FirstCompletedWaitTracker(object): - def __init__(self): - self.event = threading.Event() - - def add_result(self): - self.event.set() - - def add_exception(self): - self.event.set() - - def add_cancelled(self): - self.event.set() - -class _AllCompletedWaitTracker(object): - def __init__(self, pending_calls, stop_on_exception): - self.event = threading.Event() - self.pending_calls = pending_calls - self.stop_on_exception = stop_on_exception - - def add_result(self): - self.pending_calls -= 1 - if not self.pending_calls: - self.event.set() - - def add_exception(self): - self.add_result() - if self.stop_on_exception: - self.event.set() - - def add_cancelled(self): - self.add_result() - -class _ThreadEventSink(object): - def __init__(self): - self._condition = threading.Lock() - self._waiters = [] - - def add(self, e): - self._waiters.append(e) - - def add_result(self): - with self._condition: - for waiter in self._waiters: - waiter.add_result() - - def add_exception(self): - with self._condition: - for waiter in self._waiters: - waiter.add_exception() - - def add_cancelled(self): - with self._condition: - for waiter in self._waiters: - waiter.add_cancelled() - -class FutureList(object): - def __init__(self, futures, event_sink): - self._futures = futures - self._event_sink = event_sink - - def wait(self, timeout=None, run_until=ALL_COMPLETED): - with self._event_sink._condition: - print('WAIT 123') - if all(f.done() for f in self): - return - print('WAIT 1234') - - if run_until == FIRST_COMPLETED: - m = _FirstCompletedWaitTracker() - elif run_until == FIRST_EXCEPTION: - m = _AllCompletedWaitTracker(len(self), stop_on_exception=True) - elif run_until == ALL_COMPLETED: - m = _AllCompletedWaitTracker(len(self), stop_on_exception=False) - elif run_until == RETURN_IMMEDIATELY: - m = _NullWaitTracker() - else: - raise ValueError() - - self._event_sink.add(m) - - if run_until != RETURN_IMMEDIATELY: - print('WAIT 12345', timeout) - m.event.wait(timeout) - - def cancel(self, timeout=None): - for f in self: - f.cancel() - self.wait(timeout=timeout, run_until=ALL_COMPLETED) - if any(not f.done() for f in self): - raise TimeoutException() - - def has_running_futures(self): - return bool(self.running_futures()) - - def has_cancelled_futures(self): - return bool(self.cancelled_futures()) - - def has_done_futures(self): - return bool(self.done_futures()) - - def has_successful_futures(self): - return bool(self.successful_futures()) - - def has_exception_futures(self): - return bool(self.exception_futures()) - - def running_futures(self): - return [f for f in self if not f.done() and not f.cancelled()] - - def cancelled_futures(self): - return [f for f in self if f.cancelled()] - - def done_futures(self): - return [f for f in self if f.done()] - - def successful_futures(self): - return [f for f in self - if f.done() and not f.cancelled() and f.exception() is None] - - def exception_futures(self): - return [f for f in self if f.done() and f.exception() is not None] - - def __getitem__(self, i): - return self._futures[i] - - def __len__(self): - return len(self._futures) - - def __iter__(self): - return iter(self._futures) - - def __contains__(self, f): - return f in self._futures - - def __repr__(self): - return ('<FutureList #futures=%d ' - '[#success=%d #exception=%d #cancelled=%d]>' % ( - len(self), - len(self.successful_futures()), - len(self.exception_futures()), - len(self.cancelled_futures()))) - class _WorkItem(object): def __init__(self, call, future, completion_tracker): self.call = call @@ -267,23 +17,25 @@ class _WorkItem(object): self.completion_tracker.add_cancelled() return - self.future._state = _RUNNING + with self.future._condition: + self.future._state = RUNNING + try: r = self.call() except BaseException as e: with self.future._condition: self.future._exception = e - self.future._state = _FINISHED + self.future._state = FINISHED self.future._condition.notify_all() self.completion_tracker.add_exception() else: with self.future._condition: self.future._result = r - self.future._state = _FINISHED + self.future._state = FINISHED self.future._condition.notify_all() self.completion_tracker.add_result() -class ThreadPoolExecutor(object): +class ThreadPoolExecutor(Executor): def __init__(self, max_threads): self._max_threads = max_threads self._work_queue = queue.Queue() @@ -308,7 +60,6 @@ class ThreadPoolExecutor(object): def _adjust_thread_count(self): for _ in range(len(self._threads), min(self._max_threads, self._work_queue.qsize())): - print('Creating a thread') t = threading.Thread(target=self._worker) t.daemon = True t.start() @@ -320,27 +71,18 @@ class ThreadPoolExecutor(object): raise RuntimeError() futures = [] - event_sink = _ThreadEventSink() + event_sink = ThreadEventSink() for call in calls: f = Future() w = _WorkItem(call, f, event_sink) self._work_queue.put(w) futures.append(f) - print('futures:', futures) self._adjust_thread_count() fl = FutureList(futures, event_sink) fl.wait(timeout=timeout, run_until=run_until) return fl - def runXXX(self, calls, timeout=None): - fs = self.run(calls, timeout, run_util=FIRST_EXCEPTION) - - if fs.has_exception_futures(): - raise fs.exception_futures()[0].exception() - else: - return [f.result() for f in fs] - def shutdown(self): with self._lock: self._shutdown = True diff --git a/primes.py b/primes.py new file mode 100644 index 0000000..4397c67 --- /dev/null +++ b/primes.py @@ -0,0 +1,46 @@ +import futures +import math +import time + +PRIMES = [ + 112272535095293, + 112582705942171, + 112272535095293, + 115280095190773, + 115797848077099] + +def is_prime(n): + n = abs(n) + i = 2 + while i <= math.sqrt(n): + if n % i == 0: + return False + i += 1 + return True + +def sequential(): + return list(map(is_prime, PRIMES)) + +def with_process_pool_executor(): + executor = futures.ProcessPoolExecutor() + try: + return list(executor.map(is_prime, PRIMES)) + finally: + executor.shutdown() + +def with_thread_pool_executor(): + executor = futures.ThreadPoolExecutor(10) + try: + return list(executor.map(is_prime, PRIMES)) + finally: + executor.shutdown() + +def main(): + for name, fn in [('sequential', sequential), + ('processes', with_process_pool_executor), + ('threads', with_thread_pool_executor)]: + start = time.time() + fn() + print('%s: %.2f seconds' % (name.ljust(10), time.time() - start)) + +main()
\ No newline at end of file diff --git a/test_futures.py b/test_futures.py index c0f0539..91ac955 100644 --- a/test_futures.py +++ b/test_futures.py @@ -4,8 +4,9 @@ from test.support import verbose import unittest import threading import time +import multiprocessing -import futures.thread as threaded_futures +import futures class Call(object): def __init__(self, manual_finish=False): @@ -27,20 +28,16 @@ class Call(object): def __call__(self): if self._called_event.is_set(): print('called twice') - print('Doing call...') self._called_event.set() self._can_finished.wait() - print('About to return...') return 42 class ExceptionCall(Call): def __call__(self): assert not self._called_event.is_set(), 'already called' - print('Doing exception call...') self._called_event.set() self._can_finished.wait() - print('About to raise...') raise ZeroDivisionError() class FutureStub(object): @@ -60,7 +57,7 @@ class FutureStub(object): class ShutdownTest(unittest.TestCase): def test_run_after_shutdown(self): - self.executor = threaded_futures.ThreadPoolExecutor(max_threads=1) + self.executor = futures.ThreadPoolExecutor(max_threads=1) call1 = Call() self.executor.shutdown() @@ -69,14 +66,14 @@ class ShutdownTest(unittest.TestCase): [call1]) def test_threads_terminate(self): - self.executor = threaded_futures.ThreadPoolExecutor(max_threads=5) + self.executor = futures.ThreadPoolExecutor(max_threads=5) call1 = Call(manual_finish=True) call2 = Call(manual_finish=True) call3 = Call(manual_finish=True) self.executor.run([call1, call2, call3], - run_until=threaded_futures.RETURN_IMMEDIATELY) + run_until=futures.RETURN_IMMEDIATELY) call1.wait_on_called() call2.wait_on_called() @@ -92,40 +89,36 @@ class ShutdownTest(unittest.TestCase): t.join() -class ConcurrentWaitsTest(unittest.TestCase): +class WaitsTest(unittest.TestCase): def test(self): def aaa(): - fs.wait(run_until=threaded_futures.ALL_COMPLETED) + fs.wait(run_until=futures.ALL_COMPLETED) self.assertTrue(f1.done()) self.assertTrue(f2.done()) self.assertTrue(f3.done()) self.assertTrue(f4.done()) def bbb(): - fs.wait(run_until=threaded_futures.FIRST_COMPLETED) + fs.wait(run_until=futures.FIRST_COMPLETED) self.assertTrue(f1.done()) self.assertFalse(f2.done()) self.assertFalse(f3.done()) self.assertFalse(f4.done()) def ccc(): - fs.wait(run_until=threaded_futures.FIRST_EXCEPTION) + fs.wait(run_until=futures.FIRST_EXCEPTION) self.assertTrue(f1.done()) self.assertTrue(f2.done()) - print('fs:', fs) - print(f1, f2, f3, f4) self.assertFalse(f3.done()) self.assertFalse(f4.done()) - executor = threaded_futures.ThreadPoolExecutor(max_threads=1) - call1 = Call(manual_finish=True) call2 = ExceptionCall(manual_finish=True) call3 = Call(manual_finish=True) call4 = Call() - fs = executor.run([call1, call2, call3, call4], - run_until=threaded_futures.RETURN_IMMEDIATELY) + fs = self.executor.run([call1, call2, call3, call4], + run_until=futures.RETURN_IMMEDIATELY) f1, f2, f3, f4 = fs threads = [] @@ -144,15 +137,18 @@ class ConcurrentWaitsTest(unittest.TestCase): call4.set_can() for t in threads: - print('join') t.join() - print('shutdown') - executor.shutdown() - print('done shutdown') + self.executor.shutdown() + +class ThreadPoolWaitTests(WaitsTest): + executor = futures.ThreadPoolExecutor(max_threads=1) + +class ProcessPoolWaitTests(WaitsTest): + executor = futures.ProcessPoolExecutor(max_processes=1) class CancelTests(unittest.TestCase): def test_cancel_states(self): - executor = threaded_futures.ThreadPoolExecutor(max_threads=1) + executor = futures.ThreadPoolExecutor(max_threads=1) call1 = Call(manual_finish=True) call2 = Call() @@ -160,7 +156,7 @@ class CancelTests(unittest.TestCase): call4 = Call() fs = executor.run([call1, call2, call3, call4], - run_until=threaded_futures.RETURN_IMMEDIATELY) + run_until=futures.RETURN_IMMEDIATELY) f1, f2, f3, f4 = fs call1.wait_on_called() @@ -177,13 +173,13 @@ class CancelTests(unittest.TestCase): self.assertEqual(f4.done(), True) call1.set_can() - fs.wait(run_until=threaded_futures.ALL_COMPLETED) + fs.wait(run_until=futures.ALL_COMPLETED) self.assertEqual(f1.result(), 42) - self.assertRaises(threaded_futures.CancelledException, f2.result) - self.assertRaises(threaded_futures.CancelledException, f2.exception) + self.assertRaises(futures.CancelledException, f2.result) + self.assertRaises(futures.CancelledException, f2.exception) self.assertEqual(f3.result(), 42) - self.assertRaises(threaded_futures.CancelledException, f4.result) - self.assertRaises(threaded_futures.CancelledException, f4.exception) + self.assertRaises(futures.CancelledException, f4.result) + self.assertRaises(futures.CancelledException, f4.exception) self.assertEqual(call2.called(), False) self.assertEqual(call4.called(), False) @@ -191,32 +187,28 @@ class CancelTests(unittest.TestCase): def test_wait_for_individual_cancel(self): def end_call(): - print ('Here1') time.sleep(1) - print ('Here2') f2.cancel() - print ('Here3') call1.set_can() - print ('Here4') - executor = threaded_futures.ThreadPoolExecutor(max_threads=1) + executor = futures.ThreadPoolExecutor(max_threads=1) call1 = Call(manual_finish=True) call2 = Call() - fs = executor.run([call1, call2], run_until=threaded_futures.RETURN_IMMEDIATELY) + fs = executor.run([call1, call2], run_until=futures.RETURN_IMMEDIATELY) f1, f2 = fs call1.wait_on_called() t = threading.Thread(target=end_call) t.start() - self.assertRaises(threaded_futures.CancelledException, f2.result) - self.assertRaises(threaded_futures.CancelledException, f2.exception) + self.assertRaises(futures.CancelledException, f2.result) + self.assertRaises(futures.CancelledException, f2.exception) t.join() executor.shutdown() def test_cancel_all(self): - executor = threaded_futures.ThreadPoolExecutor(max_threads=1) + executor = futures.ThreadPoolExecutor(max_threads=1) call1 = Call(manual_finish=True) call2 = Call() @@ -224,13 +216,11 @@ class CancelTests(unittest.TestCase): call4 = Call() fs = executor.run([call1, call2, call3, call4], - run_until=threaded_futures.RETURN_IMMEDIATELY) + run_until=futures.RETURN_IMMEDIATELY) f1, f2, f3, f4 = fs call1.wait_on_called() - print('HERE!!!') - self.assertRaises(threaded_futures.TimeoutException, fs.cancel, timeout=0) - print('HERE 2!!!') + self.assertRaises(futures.TimeoutException, fs.cancel, timeout=0) call1.set_can() fs.cancel() @@ -241,12 +231,12 @@ class CancelTests(unittest.TestCase): executor.shutdown() def test_cancel_repr(self): - executor = threaded_futures.ThreadPoolExecutor(max_threads=1) + executor = futures.ThreadPoolExecutor(max_threads=1) call1 = Call(manual_finish=True) call2 = Call() - fs = executor.run([call1, call2], run_until=threaded_futures.RETURN_IMMEDIATELY) + fs = executor.run([call1, call2], run_until=futures.RETURN_IMMEDIATELY) f1, f2 = fs call1.wait_on_called() @@ -263,7 +253,7 @@ class FutureListTests(unittest.TestCase): f4 = FutureStub(cancelled=True, done=True) fs = [f1, f2, f3, f4] - f = threaded_futures.FutureList(fs, None) + f = futures.FutureList(fs, None) self.assertEqual(f.running_futures(), [f1]) self.assertEqual(f.cancelled_futures(), [f4]) @@ -276,18 +266,18 @@ class FutureListTests(unittest.TestCase): f2 = FutureStub(cancelled=False, done=True) self.assertTrue( - threaded_futures.FutureList([f1], None).has_running_futures()) + futures.FutureList([f1], None).has_running_futures()) self.assertFalse( - threaded_futures.FutureList([f2], None).has_running_futures()) + futures.FutureList([f2], None).has_running_futures()) def test_has_cancelled_futures(self): f1 = FutureStub(cancelled=True, done=True) f2 = FutureStub(cancelled=False, done=True) self.assertTrue( - threaded_futures.FutureList([f1], None).has_cancelled_futures()) + futures.FutureList([f1], None).has_cancelled_futures()) self.assertFalse( - threaded_futures.FutureList([f2], None).has_cancelled_futures()) + futures.FutureList([f2], None).has_cancelled_futures()) def test_has_done_futures(self): f1 = FutureStub(cancelled=True, done=True) @@ -295,11 +285,11 @@ class FutureListTests(unittest.TestCase): f3 = FutureStub(cancelled=False, done=False) self.assertTrue( - threaded_futures.FutureList([f1], None).has_done_futures()) + futures.FutureList([f1], None).has_done_futures()) self.assertTrue( - threaded_futures.FutureList([f2], None).has_done_futures()) + futures.FutureList([f2], None).has_done_futures()) self.assertFalse( - threaded_futures.FutureList([f3], None).has_done_futures()) + futures.FutureList([f3], None).has_done_futures()) def test_has_successful_futures(self): f1 = FutureStub(cancelled=False, done=True) @@ -308,13 +298,13 @@ class FutureListTests(unittest.TestCase): f4 = FutureStub(cancelled=True, done=True) self.assertTrue( - threaded_futures.FutureList([f1], None).has_successful_futures()) + futures.FutureList([f1], None).has_successful_futures()) self.assertFalse( - threaded_futures.FutureList([f2], None).has_successful_futures()) + futures.FutureList([f2], None).has_successful_futures()) self.assertFalse( - threaded_futures.FutureList([f3], None).has_successful_futures()) + futures.FutureList([f3], None).has_successful_futures()) self.assertFalse( - threaded_futures.FutureList([f4], None).has_successful_futures()) + futures.FutureList([f4], None).has_successful_futures()) def test_has_exception_futures(self): f1 = FutureStub(cancelled=False, done=True) @@ -323,13 +313,13 @@ class FutureListTests(unittest.TestCase): f4 = FutureStub(cancelled=True, done=True) self.assertFalse( - threaded_futures.FutureList([f1], None).has_exception_futures()) + futures.FutureList([f1], None).has_exception_futures()) self.assertTrue( - threaded_futures.FutureList([f2], None).has_exception_futures()) + futures.FutureList([f2], None).has_exception_futures()) self.assertFalse( - threaded_futures.FutureList([f3], None).has_exception_futures()) + futures.FutureList([f3], None).has_exception_futures()) self.assertFalse( - threaded_futures.FutureList([f4], None).has_exception_futures()) + futures.FutureList([f4], None).has_exception_futures()) def test_get_item(self): f1 = FutureStub(cancelled=False, done=False) @@ -337,7 +327,7 @@ class FutureListTests(unittest.TestCase): f3 = FutureStub(cancelled=False, done=True) fs = [f1, f2, f3] - f = threaded_futures.FutureList(fs, None) + f = futures.FutureList(fs, None) self.assertEqual(f[0], f1) self.assertEqual(f[1], f2) self.assertEqual(f[2], f3) @@ -348,7 +338,7 @@ class FutureListTests(unittest.TestCase): f2 = FutureStub(cancelled=False, done=True) f3 = FutureStub(cancelled=False, done=True) - f = threaded_futures.FutureList([f1, f2, f3], None) + f = futures.FutureList([f1, f2, f3], None) self.assertEqual(len(f), 3) def test_iter(self): @@ -357,7 +347,7 @@ class FutureListTests(unittest.TestCase): f3 = FutureStub(cancelled=False, done=True) fs = [f1, f2, f3] - f = threaded_futures.FutureList(fs, None) + f = futures.FutureList(fs, None) self.assertEqual(list(iter(f)), fs) def test_contains(self): @@ -365,7 +355,7 @@ class FutureListTests(unittest.TestCase): f2 = FutureStub(cancelled=False, done=True) f3 = FutureStub(cancelled=False, done=True) - f = threaded_futures.FutureList([f1, f2], None) + f = futures.FutureList([f1, f2], None) self.assertTrue(f1 in f) self.assertTrue(f2 in f) self.assertFalse(f3 in f) @@ -376,7 +366,7 @@ class FutureListTests(unittest.TestCase): exception = FutureStub(cancelled=False, done=True, exception=IOError()) cancelled = FutureStub(cancelled=True, done=True) - f = threaded_futures.FutureList( + f = futures.FutureList( [running] * 4 + [result] * 3 + [exception] * 2 + [cancelled], None) @@ -385,7 +375,8 @@ class FutureListTests(unittest.TestCase): '[#success=3 #exception=2 #cancelled=1]>') def test_main(): test.support.run_unittest(CancelTests, - ConcurrentWaitsTest, +# ProcessPoolWaitTests, + ThreadPoolWaitTests, FutureListTests, ShutdownTest) |