-rw-r--r--  PEP.txt                        | 415
-rw-r--r--  docs/index.rst                 | 413
-rw-r--r--  python2/crawl.py               |  38
-rw-r--r--  python2/futures/__init__.py    |  24
-rw-r--r--  python2/futures/_base.py       | 687
-rw-r--r--  python2/futures/process.py     | 132
-rw-r--r--  python2/futures/thread.py      | 129
-rw-r--r--  python2/primes.py              |  16
-rwxr-xr-x  python2/setup.py               |   8
-rw-r--r--  python2/test_futures.py        | 984
-rw-r--r--  python3/crawl.py               |  19
-rw-r--r--  python3/futures/__init__.py    |  18
-rw-r--r--  python3/futures/_base.py       | 599
-rw-r--r--  python3/futures/process.py     | 111
-rw-r--r--  python3/futures/thread.py      |  98
-rw-r--r--  python3/moprocessmoproblems.py |  59
-rw-r--r--  python3/primes.py              |   3
-rw-r--r--  python3/test_futures.py        | 994
18 files changed, 2064 insertions, 2683 deletions
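
The diff below removes the run_to_futures()/run_to_results()/FutureList API and documents the submit()/map()/shutdown() interface together with the module-level wait() and as_completed() functions. For orientation, here is a minimal sketch of the new calling pattern using the package and method names introduced by this change; the square() helper, the worker count and the input range are illustrative only and are not part of the commit:

    import futures  # the standalone package; later shipped as concurrent.futures

    def square(x):
        return x * x

    # submit() schedules a call and returns a Future; as_completed() yields
    # futures as they finish, regardless of submission order. The with
    # statement shuts the executor down, waiting for pending work.
    with futures.ThreadPoolExecutor(max_workers=4) as executor:
        future_to_arg = dict((executor.submit(square, n), n) for n in range(10))
        for future in futures.as_completed(future_to_arg):
            n = future_to_arg[future]
            print('%d squared is %d' % (n, future.result()))

An exception raised inside the submitted callable is re-raised from future.result(), as described in the updated docs/index.rst below.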
diff --git a/PEP.txt b/PEP.txt deleted file mode 100644 index 6e2491d..0000000 --- a/PEP.txt +++ /dev/null @@ -1,415 +0,0 @@ -PEP: XXX -Title: futures - execute computations asynchronously -Version: $Revision$ -Last-Modified: $Date$ -Author: Brian Quinlan <brian@sweetapp.com> -Status: Draft -Type: Standards Track -Content-Type: text/x-rst -Created: 16-Oct-2009 -Python-Version: 3.2 -Post-History: - -======== -Abstract -======== - -This PEP proposes a design for a package that facilitates the evaluation of -callables using threads and processes. - -========== -Motivation -========== - -Python currently has powerful primitives to construct multi-threaded and -multi-process applications but parallelizing simple operations requires a lot of -work i.e. explicitly launching processes/threads, constructing a work/results -queue, and waiting for completion or some other termination condition (e.g. -failure, timeout). It is also difficult to design an application with a global -process/thread limit when each component invents its own parallel execution -strategy. - -============= -Specification -============= - -Check Prime Example -------------------- - -:: - - import futures - import math - - PRIMES = [ - 112272535095293, - 112451234512351, - 112582705942171, - 112272535095293, - 115280095190773, - 115797848077099, - 115912095245127, - 117450548693743, - 993960000099397] - - def is_prime(n): - if n % 2 == 0: - return False - - sqrt_n = int(math.floor(math.sqrt(n))) - for i in range(3, sqrt_n + 1, 2): - if n % i == 0: - return False - return True - - # Contructs one worker process per processor. - with futures.ProcessPoolExecutor() as executor: - for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)): - print('%d: %s' % (number, prime)) - -Web Crawl Example ------------------ - -:: - - import functools - import urllib.request - import futures - - URLS = ['http://www.foxnews.com/', - 'http://www.cnn.com/', - 'http://europe.wsj.com/', - 'http://www.bbc.co.uk/', - 'http://some-made-up-domain.com/'] - - def load_url(url, timeout): - return urllib.request.urlopen(url, timeout=timeout).read() - - with futures.ThreadPoolExecutor(max_threads=5) as executor: - future_list = executor.run_to_futures( - [functools.partial(load_url, url, 30) for url in URLS]) - - for url, future in zip(URLS, future_list): - if future.exception() is not None: - print('%r generated an exception: %s' % (url, future.exception())) - else: - print('%r page is %d bytes' % (url, len(future.result()))) - -Interface ---------- - -The proposed package provides three core class: `Executors`, `FutureLists` and -`Futures`. - -Executor -'''''''' - -`Executor` is an abstract class that provides methods to execute calls -asynchronously. - -`run_to_futures(calls, timeout=None, return_when=ALL_COMPLETED)` - -Schedule the given calls for execution and return a `FutureList` -containing a `Future` for each call. This method should always be -called using keyword arguments, which are: - -*calls* must be a sequence of callables that take no arguments. - -*timeout* can be used to control the maximum number of seconds to wait before -returning. If *timeout* is not specified or ``None`` then there is no limit -to the wait time. - -*return_when* indicates when the method should return. 
It must be one of the -following constants: - -============================= ================================================== - Constant Description -============================= ================================================== -`FIRST_COMPLETED` The method will return when any call finishes. -`FIRST_EXCEPTION` The method will return when any call raises an - exception or when all calls finish. -`ALL_COMPLETED` The method will return when all calls finish. -`RETURN_IMMEDIATELY` The method will return immediately. -============================= ================================================== - -`run_to_results(calls, timeout=None)` - -Schedule the given calls for execution and return an iterator over their -results. The returned iterator raises a `TimeoutError` if `__next__()` is called -and the result isn't available after *timeout* seconds from the original call to -`run_to_results()`. If *timeout* is not specified or ``None`` then there is no -limit to the wait time. If a call raises an exception then that exception will -be raised when its value is retrieved from the iterator. - -`map(func, *iterables, timeout=None)` - -Equivalent to map(*func*, *\*iterables*) but executed asynchronously and -possibly out-of-order. The returned iterator raises a `TimeoutError` if -`__next__()` is called and the result isn't available after *timeout* seconds -from the original call to `run_to_results()`. If *timeout* is not specified or -``None`` then there is no limit to the wait time. If a call raises an exception -then that exception will be raised when its value is retrieved from the -iterator. - -`Executor.shutdown()` - -Signal the executor that it should free any resources that it is using when -the currently pending futures are done executing. Calls to -`Executor.run_to_futures`, `Executor.run_to_results` and -`Executor.map` made after shutdown will raise `RuntimeError`. - -ProcessPoolExecutor -''''''''''''''''''' - -The `ProcessPoolExecutor` class is an `Executor` subclass that uses a pool of -processes to execute calls asynchronously. - -`__init__(max_processes)` - -Executes calls asynchronously using a pool of a most *max_processes* -processes. If *max_processes* is ``None`` or not given then as many worker -processes will be created as the machine has processors. - -ThreadPoolExecutor -'''''''''''''''''' - -The `ThreadPoolExecutor` class is an `Executor` subclass that uses a pool of -threads to execute calls asynchronously. - -`__init__(max_threads)` - -Executes calls asynchronously using a pool of at most *max_threads* threads. - -FutureList Objects -'''''''''''''''''' - -The `FutureList` class is an immutable container for `Future` instances and -should only be instantiated by `Executor.run_to_futures`. - -`wait(timeout=None, return_when=ALL_COMPLETED)` - -Wait until the given conditions are met. This method should always be called -using keyword arguments, which are: - -*timeout* can be used to control the maximum number of seconds to wait before -returning. If *timeout* is not specified or ``None`` then there is no limit -to the wait time. - -*return_when* indicates when the method should return. It must be one of the -following constants: - -============================= ================================================== - Constant Description -============================= ================================================== -`FIRST_COMPLETED` The method will return when any call finishes. 
-`FIRST_EXCEPTION` The method will return when any call raises an - exception or when all calls finish. -`ALL_COMPLETED` The method will return when all calls finish. -`RETURN_IMMEDIATELY` The method will return immediately. - This option is only available for consistency with - `Executor.run_to_results` and is not likely to be - useful. -============================= ================================================== - -`cancel(timeout=None)` - -Cancel every `Future` in the list and wait up to *timeout* seconds for -them to be cancelled or, if any are already running, to finish. Raises a -`TimeoutError` if the running calls do not complete before the timeout. -If *timeout* is not specified or ``None`` then there is no limit to the wait -time. - -`has_running_futures()` - -Return `True` if any `Future` in the list is currently executing. - -`has_cancelled_futures()` - -Return `True` if any `Future` in the list was successfully cancelled. - -`has_done_futures()` - -Return `True` if any `Future` in the list has completed or was successfully -cancelled. - -`has_successful_futures()` - -Return `True` if any `Future` in the list has completed without raising an -exception. - -`has_exception_futures()` - -Return `True` if any `Future` in the list completed by raising an -exception. - -`cancelled_futures()` - -Return an iterator over all `Future` instances that were successfully -cancelled. - -`done_futures()` - -Return an iterator over all `Future` instances that completed or were -cancelled. - -`successful_futures()` - -Return an iterator over all `Future` instances that completed without raising an -exception. - -`exception_futures()` - -Return an iterator over all `Future` instances that completed by raising an -exception. - -`running_futures()` - -Return an iterator over all `Future` instances that are currently executing. - -`__len__()` - -Return the number of futures in the `FutureList`. - -`__getitem__(i)` - -Return the ith `Future` in the list. The order of the futures in the -`FutureList` matches the order of the class passed to -`Executor.run_to_futures` - -`FutureList.__contains__(future)` - -Return `True` if *future* is in the `FutureList`. - -Future Objects -'''''''''''''' - -The `Future` class encapsulates the asynchronous execution of a function -or method call. `Future` instances are created by -`Executor.run_to_futures` and bundled into a `FutureList`. - -`cancel()` - -Attempt to cancel the call. If the call is currently being executed then -it cannot be cancelled and the method will return `False`, otherwise the call -will be cancelled and the method will return `True`. - -`Future.cancelled()` - -Return `True` if the call was successfully cancelled. - -`Future.done()` - -Return `True` if the call was successfully cancelled or finished running. - -`result(timeout=None)` - -Return the value returned by the call. If the call hasn't yet completed then -this method will wait up to *timeout* seconds. If the call hasn't completed -in *timeout* seconds then a `TimeoutError` will be raised. If *timeout* -is not specified or ``None`` then there is no limit to the wait time. - -If the future is cancelled before completing then `CancelledError` will -be raised. - -If the call raised then this method will raise the same exception. - -`exception(timeout=None)` - -Return the exception raised by the call. If the call hasn't yet completed -then this method will wait up to *timeout* seconds. If the call hasn't -completed in *timeout* seconds then a `TimeoutError` will be raised. 
-If *timeout* is not specified or ``None`` then there is no limit to the wait -time. - -If the future is cancelled before completing then `CancelledError` will -be raised. - -If the call completed without raising then ``None`` is returned. - -`index` - -int indicating the index of the future in its `FutureList`. - -========= -Rationale -========= - -The proposed design of this module was heavily influenced by the the Java -java.util.concurrent package [1]_. The conceptual basis of the module, as in -Java, is the Future class, which represents the progress and result of an -asynchronous computation. The Future class makes little commitment to the -evaluation mode being used e.g. it can be be used to represent lazy or eager -evaluation, for evaluation using threads, processes or remote procedure call. - -Futures are created by concrete implementations of the Executor class -(called ExecutorService in Java). The reference implementation provides -classes that use either a process a thread pool to eagerly evaluate -computations. - -Futures have already been seen in Python as part of a popular Python -cookbook recipe [2]_ and have discussed on the Python-3000 mailing list [3]_. - -The proposed design is explicit i.e. it requires that clients be aware that -they are consuming Futures. It would be possible to design a module that -would return proxy objects (in the style of `weakref`) that could be used -transparently. It is possible to build a proxy implementation on top of -the proposed explicit mechanism. - -The proposed design does not introduce any changes to Python language syntax -or semantics. Special syntax could be introduced [4]_ to mark function and -method calls as asynchronous. A proxy result would be returned while the -operation is eagerly evaluated asynchronously, and execution would only -block if the proxy object were used before the operation completed. - -======================== -Reference Implementation -======================== - -The reference implementation [5]_ contains a complete implementation of the -proposed design. It has been tested on Linux and Mac OS X. - -========== -References -========== - -.. [1] - - `java.util.concurrent` package documentation - `http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/package-summary.html` - -.. [2] - - Python Cookbook recipe 84317, "Easy threading with Futures" - `http://code.activestate.com/recipes/84317/` - -.. [3] - - `Python-3000` thread, "mechanism for handling asynchronous concurrency" - `http://mail.python.org/pipermail/python-3000/2006-April/000960.html` - - -.. [4] - - `Python 3000` thread, "Futures in Python 3000 (was Re: mechanism for handling asynchronous concurrency)" - `http://mail.python.org/pipermail/python-3000/2006-April/000970.html` - -.. [5] - - Reference `futures` implementation `http://code.google.com/p/pythonfutures` - -========= -Copyright -========= - -This document has been placed in the public domain. - - - -.. - Local Variables: - mode: indented-text - indent-tabs-mode: nil - sentence-end-double-space: t - fill-column: 70 - coding: utf-8 - End: diff --git a/docs/index.rst b/docs/index.rst index 1a80434..ac006a8 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -5,10 +5,10 @@ :synopsis: Execute computations asynchronously using threads or processes. The :mod:`futures` module provides a high-level interface for asynchronously -executing functions and methods. +executing callables. 
-The asynchronous execution can be be performed by threads, using -:class:`ThreadPoolExecutor`, or seperate processes, using +The asynchronous execution can be be performed by threads using +:class:`ThreadPoolExecutor` or seperate processes using :class:`ProcessPoolExecutor`. Both implement the same interface, which is defined by the abstract :class:`Executor` class. @@ -19,63 +19,56 @@ Executor Objects asynchronously. It should not be used directly, but through its two subclasses: :class:`ThreadPoolExecutor` and :class:`ProcessPoolExecutor`. -.. method:: Executor.run_to_futures(calls, timeout=None, return_when=ALL_COMPLETED) +.. method:: Executor.submit(fn, *args, **kwargs) - Schedule the given calls for execution and return a :class:`FutureList` - containing a :class:`Future` for each call. This method should always be - called using keyword arguments, which are: + Schedules the callable to be executed as *fn*(*\*args*, *\*\*kwargs*) and + returns a :class:`Future` representing the execution of the callable. - *calls* must be a sequence of callables that take no arguments. +:: - *timeout* can be used to control the maximum number of seconds to wait before - returning. If *timeout* is not specified or ``None`` then there is no limit - to the wait time. + with ThreadPoolExecutor(max_workers=1) as executor: + future = executor.submit(pow, 323, 1235) + print(future.result()) - *return_when* indicates when the method should return. It must be one of the - following constants: +.. method:: Executor.map(func, *iterables, timeout=None) - +-----------------------------+----------------------------------------+ - | Constant | Description | - +=============================+========================================+ - | :const:`FIRST_COMPLETED` | The method will return when any call | - | | finishes. | - +-----------------------------+----------------------------------------+ - | :const:`FIRST_EXCEPTION` | The method will return when any call | - | | raises an exception or when all calls | - | | finish. | - +-----------------------------+----------------------------------------+ - | :const:`ALL_COMPLETED` | The method will return when all calls | - | | finish. | - +-----------------------------+----------------------------------------+ - | :const:`RETURN_IMMEDIATELY` | The method will return immediately. | - +-----------------------------+----------------------------------------+ + Equivalent to map(*func*, *\*iterables*) but func is executed asynchronously + and several calls to *func* may be made concurrently. The returned iterator + raises a :exc:`TimeoutError` if :meth:`__next__()` is called and the result + isn't available after *timeout* seconds from the original call to + :meth:`map()`. *timeout* can be an int or float. If *timeout* is not + specified or ``None`` then there is no limit to the wait time. If a call + raises an exception then that exception will be raised when its value is + retrieved from the iterator. -.. method:: Executor.run_to_results(calls, timeout=None) +.. method:: Executor.shutdown(wait=True) - Schedule the given calls for execution and return an iterator over their - results. The returned iterator raises a :exc:`TimeoutError` if - :meth:`__next__()` is called and the result isn't available after - *timeout* seconds from the original call to :meth:`run_to_results()`. If - *timeout* is not specified or ``None`` then there is no limit to the wait - time. If a call raises an exception then that exception will be raised when - its value is retrieved from the iterator. 
+ Signal the executor that it should free any resources that it is using when + the currently pending futures are done executing. Calls to + :meth:`Executor.submit` and :meth:`Executor.map` made after shutdown will + raise :exc:`RuntimeError`. -.. method:: Executor.map(func, *iterables, timeout=None) + If *wait* is `True` then this method will not return until all the pending + futures are done executing and the resources associated with the executor + have been freed. If *wait* is `False` then this method will return + immediately and the resources associated with the executor will be freed + when all pending futures are done executing. Regardless of the value of + *wait*, the entire Python program will not exit until all pending futures + are done executing. - Equivalent to map(*func*, *\*iterables*) but executed asynchronously and - possibly out-of-order. The returned iterator raises a :exc:`TimeoutError` if - :meth:`__next__()` is called and the result isn't available after - *timeout* seconds from the original call to :meth:`run_to_results()`. If - *timeout* is not specified or ``None`` then there is no limit to the wait - time. If a call raises an exception then that exception will be raised when - its value is retrieved from the iterator. + You can avoid having to call this method explicitly if you use the `with` + statement, which will shutdown the `Executor` (waiting as if + `Executor.shutdown` were called with *wait* set to `True`): -.. method:: Executor.shutdown() +:: + + import shutil + with ThreadPoolExecutor(max_workers=4) as e: + e.submit(shutil.copy, 'src1.txt', 'dest1.txt') + e.submit(shutil.copy, 'src2.txt', 'dest2.txt') + e.submit(shutil.copy, 'src3.txt', 'dest3.txt') + e.submit(shutil.copy, 'src3.txt', 'dest4.txt') - Signal the executor that it should free any resources that it is using when - the currently pending futures are done executing. Calls to - :meth:`Executor.run_to_futures`, :meth:`Executor.run_to_results` and - :meth:`Executor.map` made after shutdown will raise :exc:`RuntimeError`. ThreadPoolExecutor Objects -------------------------- @@ -83,9 +76,43 @@ ThreadPoolExecutor Objects The :class:`ThreadPoolExecutor` class is an :class:`Executor` subclass that uses a pool of threads to execute calls asynchronously. -.. class:: ThreadPoolExecutor(max_threads) +Deadlock can occur when the callable associated with a :class:`Future` waits on +the results of another :class:`Future`. For example: + +:: + + import time + def wait_on_b(): + time.sleep(5) + print(b.result()) # b will never complete because it is waiting on a. + return 5 + + def wait_on_a(): + time.sleep(5) + print(a.result()) # a will never complete because it is waiting on b. + return 6 + + + executor = ThreadPoolExecutor(max_workers=2) + a = executor.submit(wait_on_b) + b = executor.submit(wait_on_a) + +And: + +:: + + def wait_on_future(): + f = executor.submit(pow, 5, 2) + # This will never complete because there is only one worker thread and + # it is executing this function. + print(f.result()) + + executor = ThreadPoolExecutor(max_workers=1) + executor.submit(wait_on_future) + +.. class:: ThreadPoolExecutor(max_workers) - Executes calls asynchronously using at pool of at most *max_threads* threads. + Executes calls asynchronously using at pool of at most *max_workers* threads. .. 
_threadpoolexecutor-example: @@ -93,28 +120,29 @@ ThreadPoolExecutor Example ^^^^^^^^^^^^^^^^^^^^^^^^^^ :: - import functools - import urllib.request - import futures - - URLS = ['http://www.foxnews.com/', - 'http://www.cnn.com/', - 'http://europe.wsj.com/', - 'http://www.bbc.co.uk/', - 'http://some-made-up-domain.com/'] - - def load_url(url, timeout): - return urllib.request.urlopen(url, timeout=timeout).read() - - with futures.ThreadPoolExecutor(50) as executor: - future_list = executor.run_to_futures( - [functools.partial(load_url, url, 30) for url in URLS]) - - for url, future in zip(URLS, future_list): - if future.exception() is not None: - print('%r generated an exception: %s' % (url, future.exception())) - else: - print('%r page is %d bytes' % (url, len(future.result()))) + import futures + import urllib.request + + URLS = ['http://www.foxnews.com/', + 'http://www.cnn.com/', + 'http://europe.wsj.com/', + 'http://www.bbc.co.uk/', + 'http://some-made-up-domain.com/'] + + def load_url(url, timeout): + return urllib.request.urlopen(url, timeout=timeout).read() + + with futures.ThreadPoolExecutor(max_workers=5) as executor: + future_to_url = dict((executor.submit(load_url, url, 60), url) + for url in URLS) + + for future in futures.as_completed(future_to_url): + url = future_to_url[future] + if future.exception() is not None: + print('%r generated an exception: %s' % (url, + future.exception())) + else: + print('%r page is %d bytes' % (url, len(future.result()))) ProcessPoolExecutor Objects --------------------------- @@ -125,16 +153,23 @@ uses a pool of processes to execute calls asynchronously. allows it to side-step the :term:`Global Interpreter Lock` but also means that only picklable objects can be executed and returned. -.. class:: ProcessPoolExecutor(max_processes=None) +Calling :class:`Executor` or :class:`Future` methods from a callable submitted +to a :class:`ProcessPoolExecutor` will result in deadlock. - Executes calls asynchronously using a pool of at most *max_processes* - processes. If *max_processes* is ``None`` or not given then as many worker +.. class:: ProcessPoolExecutor(max_workers=None) + + Executes calls asynchronously using a pool of at most *max_workers* + processes. If *max_workers* is ``None`` or not given then as many worker processes will be created as the machine has processors. +.. _processpoolexecutor-example: + ProcessPoolExecutor Example ^^^^^^^^^^^^^^^^^^^^^^^^^^^ :: + import math + PRIMES = [ 112272535095293, 112582705942171, @@ -153,164 +188,158 @@ ProcessPoolExecutor Example return False return True - with futures.ProcessPoolExecutor() as executor: - for number, is_prime in zip(PRIMES, executor.map(is_prime, PRIMES)): - print('%d is prime: %s' % (number, is_prime)) - -FutureList Objects ------------------- + def main(): + with futures.ProcessPoolExecutor() as executor: + for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)): + print('%d is prime: %s' % (number, prime)) -The :class:`FutureList` class is an immutable container for :class:`Future` -instances and should only be instantiated by :meth:`Executor.run_to_futures`. + if __name__ == '__main__': + main() -.. method:: FutureList.wait(timeout=None, return_when=ALL_COMPLETED) +Future Objects +-------------- - Wait until the given conditions are met. This method should always be - called using keyword arguments, which are: +The :class:`Future` class encapulates the asynchronous execution of a callable. +:class:`Future` instances are created by :meth:`Executor.submit`. 
- *timeout* can be used to control the maximum number of seconds to wait before - returning. If *timeout* is not specified or ``None`` then there is no limit - to the wait time. +.. method:: Future.cancel() - *return_when* indicates when the method should return. It must be one of the - following constants: + Attempt to cancel the call. If the call is currently being executed then + it cannot be cancelled and the method will return `False`, otherwise the call + will be cancelled and the method will return `True`. - +-----------------------------+----------------------------------------+ - | Constant | Description | - +=============================+========================================+ - | :const:`FIRST_COMPLETED` | The method will return when any call | - | | finishes. | - +-----------------------------+----------------------------------------+ - | :const:`FIRST_EXCEPTION` | The method will return when any call | - | | raises an exception or when all calls | - | | finish. | - +-----------------------------+----------------------------------------+ - | :const:`ALL_COMPLETED` | The method will return when all calls | - | | finish. | - +-----------------------------+----------------------------------------+ - | :const:`RETURN_IMMEDIATELY` | The method will return immediately. | - | | This option is only available for | - | | consistency with | - | | :meth:`Executor.run_to_results` and is | - | | not likely to be useful. | - +-----------------------------+----------------------------------------+ +.. method:: Future.cancelled() -.. method:: FutureList.cancel(timeout=None) + Return `True` if the call was successfully cancelled. - Cancel every :class:`Future` in the list and wait up to *timeout* seconds for - them to be cancelled or, if any are already running, to finish. Raises a - :exc:`TimeoutError` if the running calls do not complete before the timeout. - If *timeout* is not specified or ``None`` then there is no limit to the wait - time. +.. method:: Future.running() -.. method:: FutureList.has_running_futures() + Return `True` if the call is currently being executed and cannot be + cancelled. - Return `True` if any :class:`Future` in the list is currently executing. +.. method:: Future.done() -.. method:: FutureList.has_cancelled_futures() + Return `True` if the call was successfully cancelled or finished running. - Return `True` if any :class:`Future` in the list was successfully cancelled. +.. method:: Future.result(timeout=None) -.. method:: FutureList.has_done_futures() + Return the value returned by the call. If the call hasn't yet completed then + this method will wait up to *timeout* seconds. If the call hasn't completed + in *timeout* seconds then a :exc:`TimeoutError` will be raised. *timeout* can + be an int or float.If *timeout* is not specified or ``None`` then there is no + limit to the wait time. - Return `True` if any :class:`Future` in the list has completed or was - successfully cancelled. + If the future is cancelled before completing then :exc:`CancelledError` will + be raised. -.. method:: FutureList.has_successful_futures() + If the call raised then this method will raise the same exception. - Return `True` if any :class:`Future` in the list has completed without raising - an exception. +.. method:: Future.exception(timeout=None) -.. method:: FutureList.has_exception_futures() + Return the exception raised by the call. If the call hasn't yet completed + then this method will wait up to *timeout* seconds. 
If the call hasn't + completed in *timeout* seconds then a :exc:`TimeoutError` will be raised. + *timeout* can be an int or float. If *timeout* is not specified or ``None`` + then there is no limit to the wait time. - Return `True` if any :class:`Future` in the list completed by raising an - exception. + If the future is cancelled before completing then :exc:`CancelledError` will + be raised. -.. method:: FutureList.cancelled_futures() + If the call completed without raising then ``None`` is returned. - Return an iterator over all :class:`Future` instances that were successfully - cancelled. +.. method:: Future.add_done_callback(fn) -.. method:: FutureList.done_futures() + Attaches the callable *fn* to the future. *fn* will be called, with the + future as its only argument, when the future is cancelled or finishes + running. - Return an iterator over all :class:`Future` instances that completed or - were cancelled. + Added callables are called in the order that they were added and are always + called in a thread belonging to the process that added them. If the callable + raises an :exc:`Exception` then it will be logged and ignored. If the + callable raises another :exc:`BaseException` then the behavior is not + defined. -.. method:: FutureList.successful_futures() + If the future has already completed or been cancelled then *fn* will be + called immediately. - Return an iterator over all :class:`Future` instances that completed without - raising an exception. +Internal Future Methods +^^^^^^^^^^^^^^^^^^^^^^^ -.. method:: FutureList.exception_futures() +The following :class:`Future` methods are meant for use in unit tests and +:class:`Executor` implementations. - Return an iterator over all :class:`Future` instances that completed by - raising an exception. +.. method:: Future.set_running_or_notify_cancel() -.. method:: FutureList.running_futures() + This method should only be called by :class:`Executor` implementations before + executing the work associated with the :class:`Future` and by unit tests. - Return an iterator over all :class:`Future` instances that are currently - executing. + If the method returns `False` then the :class:`Future` was cancelled i.e. + :meth:`Future.cancel` was called and returned `True`. Any threads waiting + on the :class:`Future` completing (i.e. through :func:`as_completed` or + :func:`wait`) will be woken up. -.. method:: FutureList.__len__() + If the method returns `True` then the :class:`Future` was not cancelled + and has been put in the running state i.e. calls to + :meth:`Future.running` will return `True`. - Return the number of futures in the :class:`FutureList`. + This method can only be called once and cannot be called after + :meth:`Future.set_result` or :meth:`Future.set_exception` have been + called. -.. method:: FutureList.__getitem__(i) +.. method:: Future.set_result(result) - Return the ith :class:`Future` in the list. The order of the futures in the - :class:`FutureList` matches the order of the class passed to - :meth:`Executor.run_to_futures` + Sets the result of the work associated with the :class:`Future` to *result*. -.. method:: FutureList.__contains__(future) + This method should only be used by Executor implementations and unit tests. - Return `True` if *future* is in the :class:`FutureList`. +.. method:: Future.set_exception(exception) -Future Objects --------------- + Sets the result of the work associated with the :class:`Future` to the + :class:`Exception` *exception*. 
-The :class:`Future` class encapulates the asynchronous execution of a function -or method call. :class:`Future` instances are created by -:meth:`Executor.run_to_futures` and bundled into a :class:`FutureList`. + This method should only be used by Executor implementations and unit tests. -.. method:: Future.cancel() +Module Functions +---------------- - Attempt to cancel the call. If the call is currently being executed then - it cannot be cancelled and the method will return `False`, otherwise the call - will be cancelled and the method will return `True`. +.. function:: wait(fs, timeout=None, return_when=ALL_COMPLETED) -.. method:: Future.cancelled() + Wait for the :class:`Future` instances (possibly created by different + :class:`Executor` instances) given by *fs* to complete. Returns a named + 2-tuple of sets. The first set, named "done", contains the futures that + completed (finished or were cancelled) before the wait completed. The second + set, named "not_done", contains uncompleted futures. - Return `True` if the call was successfully cancelled. + *timeout* can be used to control the maximum number of seconds to wait before + returning. *timeout* can be an int or float. If *timeout* is not specified or + ``None`` then there is no limit to the wait time. -.. method:: Future.done() + *return_when* indicates when this function should return. It must be one of + the following constants: - Return `True` if the call was successfully cancelled or finished running. + +-----------------------------+----------------------------------------+ + | Constant | Description | + +=============================+========================================+ + | :const:`FIRST_COMPLETED` | The function will return when any | + | | future finishes or is cancelled. | + +-----------------------------+----------------------------------------+ + | :const:`FIRST_EXCEPTION` | The function will return when any | + | | future finishes by raising an | + | | exception. If no future raises an | + | | exception then it is equivalent to | + | | `ALL_COMPLETED`. | + +-----------------------------+----------------------------------------+ + | :const:`ALL_COMPLETED` | The function will return when all | + | | futures finish or are cancelled. | + +-----------------------------+----------------------------------------+ -.. method:: Future.result(timeout=None) +.. function:: as_completed(fs, timeout=None) - Return the value returned by the call. If the call hasn't yet completed then - this method will wait up to *timeout* seconds. If the call hasn't completed - in *timeout* seconds then a :exc:`TimeoutError` will be raised. If *timeout* + Returns an iterator over the :class:`Future` instances (possibly created + by different :class:`Executor` instances) given by *fs* that yields futures + as they complete (finished or were cancelled). Any futures that completed + before :func:`as_completed()` was called will be yielded first. The returned + iterator raises a :exc:`TimeoutError` if :meth:`__next__()` is called and + the result isn't available after *timeout* seconds from the original call + to :func:`as_completed()`. *timeout* can be an int or float. If *timeout* is not specified or ``None`` then there is no limit to the wait time. - - If the future is cancelled before completing then :exc:`CancelledError` will - be raised. - - If the call raised then this method will raise the same exception. - -.. method:: Future.exception(timeout=None) - - Return the exception raised by the call. 
If the call hasn't yet completed - then this method will wait up to *timeout* seconds. If the call hasn't - completed in *timeout* seconds then a :exc:`TimeoutError` will be raised. - If *timeout* is not specified or ``None`` then there is no limit to the wait - time. - - If the future is cancelled before completing then :exc:`CancelledError` will - be raised. - - If the call completed without raising then ``None`` is returned. - -.. attribute:: Future.index - - int indicating the index of the future in its :class:`FutureList`.
\ No newline at end of file diff --git a/python2/crawl.py b/python2/crawl.py index cf4a218..f1d29ca 100644 --- a/python2/crawl.py +++ b/python2/crawl.py @@ -19,26 +19,29 @@ URLS = ['http://www.google.com/', 'http://www.youtube.com/', 'http://www.blogger.com/'] -def load_url(url): - return urllib2.urlopen(url).read() +def load_url(url, timeout): + return urllib2.urlopen(url, timeout=timeout).read() -def download_urls_sequential(urls): +def download_urls_sequential(urls, timeout=60): url_to_content = {} for url in urls: try: - url_to_content[url] = load_url(url) + url_to_content[url] = load_url(url, timeout=timeout) except: pass return url_to_content -def download_urls_with_executor(urls, executor): +def download_urls_with_executor(urls, executor, timeout=60): try: url_to_content = {} - fs = executor.run_to_futures( - (functools.partial(load_url, url) for url in urls)) - for future in fs.successful_futures(): - url = urls[future.index] - url_to_content[url] = future.result() + future_to_url = dict((executor.submit(load_url, url, timeout), url) + for url in urls) + + for future in futures.as_completed(future_to_url): + try: + url_to_content[future_to_url[future]] = future.result() + except: + pass return url_to_content finally: executor.shutdown() @@ -46,19 +49,20 @@ def download_urls_with_executor(urls, executor): def main(): for name, fn in [('sequential', functools.partial(download_urls_sequential, URLS)), - ('threads', + ('processes', functools.partial(download_urls_with_executor, URLS, - futures.ThreadPoolExecutor(10))), - ('processes', + futures.ProcessPoolExecutor(10))), + ('threads', functools.partial(download_urls_with_executor, URLS, futures.ThreadPoolExecutor(10)))]: - print '%s: ' % name.ljust(12), + print name.ljust(12), start = time.time() url_map = fn() print '%.2f seconds (%d of %d downloaded)' % (time.time() - start, - len(url_map), - len(URLS)) + len(url_map), + len(URLS)) -main() +if __name__ == '__main__': + main() diff --git a/python2/futures/__init__.py b/python2/futures/__init__.py index 27a5720..8331d53 100644 --- a/python2/futures/__init__.py +++ b/python2/futures/__init__.py @@ -1,18 +1,18 @@ -# Copyright 2009 Brian Quinlan. All Rights Reserved. See LICENSE file. +# Copyright 2009 Brian Quinlan. All Rights Reserved. +# Licensed to PSF under a Contributor Agreement. """Execute computations asynchronously using threads or processes.""" __author__ = 'Brian Quinlan (brian@sweetapp.com)' -from futures._base import (FIRST_COMPLETED, FIRST_EXCEPTION, - ALL_COMPLETED, RETURN_IMMEDIATELY, - CancelledError, TimeoutError, - Future, FutureList) +from futures._base import (FIRST_COMPLETED, + FIRST_EXCEPTION, + ALL_COMPLETED, + CancelledError, + TimeoutError, + Future, + Executor, + wait, + as_completed) +from futures.process import ProcessPoolExecutor from futures.thread import ThreadPoolExecutor - -try: - import multiprocessing -except ImportError: - pass -else: - from futures.process import ProcessPoolExecutor diff --git a/python2/futures/_base.py b/python2/futures/_base.py index bec7212..ed7a094 100644 --- a/python2/futures/_base.py +++ b/python2/futures/_base.py @@ -1,54 +1,24 @@ -# Copyright 2009 Brian Quinlan. All Rights Reserved. See LICENSE file. +# Copyright 2009 Brian Quinlan. All Rights Reserved. +# Licensed to PSF under a Contributor Agreement. 
__author__ = 'Brian Quinlan (brian@sweetapp.com)' +import collections +import functools import logging import threading import time -try: - from functools import partial -except ImportError: - def partial(func, *args, **keywords): - def newfunc(*fargs, **fkeywords): - newkeywords = keywords.copy() - newkeywords.update(fkeywords) - return func(*(args + fargs), **newkeywords) - newfunc.func = func - newfunc.args = args - newfunc.keywords = keywords - return newfunc - -# The "any" and "all" builtins weren't introduced until Python 2.5. -try: - any -except NameError: - def any(iterable): - for element in iterable: - if element: - return True - return False - -try: - all -except NameError: - def all(iterable): - for element in iterable: - if not element: - return False - return True - FIRST_COMPLETED = 'FIRST_COMPLETED' FIRST_EXCEPTION = 'FIRST_EXCEPTION' ALL_COMPLETED = 'ALL_COMPLETED' -RETURN_IMMEDIATELY = 'RETURN_IMMEDIATELY' # Possible future states (for internal use by the futures package). PENDING = 'PENDING' RUNNING = 'RUNNING' # The future was cancelled by the user... -CANCELLED = 'CANCELLED' -# ...and ThreadEventSink.add_cancelled() was called by a worker. +CANCELLED = 'CANCELLED' +# ...and _Waiter.add_cancelled() was called by a worker. CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED' FINISHED = 'FINISHED' @@ -70,184 +40,249 @@ _STATE_TO_DESCRIPTION_MAP = { # Logger for internal use by the futures package. LOGGER = logging.getLogger("futures") -_handler = logging.StreamHandler() -LOGGER.addHandler(_handler) -del _handler - -def set_future_exception(future, event_sink, exception): - """Sets a future as having terminated with an exception. - - This function should only be used within the futures package. - - Args: - future: The Future that finished with an exception. - event_sink: The ThreadEventSink accociated with the Future's FutureList. - The event_sink will be notified of the Future's completion, which - may unblock some clients that have called FutureList.wait(). - exception: The expection that executing the Future raised. - """ - future._condition.acquire() - try: - future._exception = exception - event_sink._condition.acquire() - try: - future._state = FINISHED - event_sink.add_exception() - finally: - event_sink._condition.release() - - future._condition.notifyAll() - finally: - future._condition.release() - -def set_future_result(future, event_sink, result): - """Sets a future as having terminated without exception. - - This function should only be used within the futures package. - - Args: - future: The Future that completed. - event_sink: The ThreadEventSink accociated with the Future's FutureList. - The event_sink will be notified of the Future's completion, which - may unblock some clients that have called FutureList.wait(). - result: The value returned by the Future. - """ - future._condition.acquire() - try: - future._result = result - event_sink._condition.acquire() - try: - future._state = FINISHED - event_sink.add_result() - finally: - event_sink._condition.release() - - future._condition.notifyAll() - finally: - future._condition.release() +STDERR_HANDLER = logging.StreamHandler() +LOGGER.addHandler(STDERR_HANDLER) class Error(Exception): + """Base class for all future-related exceptions.""" pass class CancelledError(Error): + """The Future was cancelled.""" pass class TimeoutError(Error): + """The operation exceeded the given deadline.""" pass -class _WaitTracker(object): - """Provides the event that FutureList.wait(...) blocks on. 
- - """ +class _Waiter(object): + """Provides the event that wait() and as_completed() block on.""" def __init__(self): self.event = threading.Event() + self.finished_futures = [] - def add_result(self): - raise NotImplementedError() + def add_result(self, future): + self.finished_futures.append(future) - def add_exception(self): - raise NotImplementedError() + def add_exception(self, future): + self.finished_futures.append(future) - def add_cancelled(self): - raise NotImplementedError() + def add_cancelled(self, future): + self.finished_futures.append(future) -class _FirstCompletedWaitTracker(_WaitTracker): - """Used by wait(return_when=FIRST_COMPLETED).""" +class _FirstCompletedWaiter(_Waiter): + """Used by wait(return_when=FIRST_COMPLETED) and as_completed().""" - def add_result(self): + def add_result(self, future): + super(_FirstCompletedWaiter, self).add_result(future) self.event.set() - def add_exception(self): + def add_exception(self, future): + super(_FirstCompletedWaiter, self).add_exception(future) self.event.set() - def add_cancelled(self): + def add_cancelled(self, future): + super(_FirstCompletedWaiter, self).add_cancelled(future) self.event.set() -class _AllCompletedWaitTracker(_WaitTracker): +class _AllCompletedWaiter(_Waiter): """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED).""" def __init__(self, num_pending_calls, stop_on_exception): self.num_pending_calls = num_pending_calls self.stop_on_exception = stop_on_exception - _WaitTracker.__init__(self) + super(_AllCompletedWaiter, self).__init__() - def add_result(self): + def _decrement_pending_calls(self): self.num_pending_calls -= 1 if not self.num_pending_calls: self.event.set() - def add_exception(self): + def add_result(self, future): + super(_AllCompletedWaiter, self).add_result(future) + self._decrement_pending_calls() + + def add_exception(self, future): + super(_AllCompletedWaiter, self).add_exception(future) if self.stop_on_exception: self.event.set() else: - self.add_result() + self._decrement_pending_calls() - def add_cancelled(self): - self.add_result() + def add_cancelled(self, future): + super(_AllCompletedWaiter, self).add_cancelled(future) + self._decrement_pending_calls() -class ThreadEventSink(object): - """Forwards events to many _WaitTrackers. +class _AcquireFutures(object): + """A context manager that does an ordered acquire of Future conditions.""" + + def __init__(self, futures): + self.futures = sorted(futures, key=id) + + def __enter__(self): + for future in self.futures: + future._condition.acquire() + + def __exit__(self, *args): + for future in self.futures: + future._condition.release() + +def _create_and_install_waiters(fs, return_when): + if return_when == FIRST_COMPLETED: + waiter = _FirstCompletedWaiter() + else: + pending_count = sum( + f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in fs) + + if return_when == FIRST_EXCEPTION: + waiter = _AllCompletedWaiter(pending_count, stop_on_exception=True) + elif return_when == ALL_COMPLETED: + waiter = _AllCompletedWaiter(pending_count, stop_on_exception=False) + else: + raise ValueError("Invalid return condition: %r" % return_when) - Each FutureList has a ThreadEventSink and each call to FutureList.wait() - causes a new _WaitTracker to be added to the ThreadEventSink. This design - allows many threads to call FutureList.wait() on the same FutureList with - different arguments. + for f in fs: + f._waiters.append(waiter) - This class should not be used by clients. 
+ return waiter + +def as_completed(fs, timeout=None): + """An iterator over the given futures that yields each as it completes. + + Args: + fs: The sequence of Futures (possibly created by different Executors) to + iterate over. + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. + + Returns: + An iterator that yields the given Futures as they complete (finished or + cancelled). + + Raises: + TimeoutError: If the entire result iterator could not be generated + before the given timeout. """ - def __init__(self): - self._condition = threading.Lock() - self._waiters = [] + if timeout is not None: + end_time = timeout + time.time() - def add(self, e): - self._waiters.append(e) + with _AcquireFutures(fs): + finished = set( + f for f in fs + if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]) + pending = set(fs) - finished + waiter = _create_and_install_waiters(fs, FIRST_COMPLETED) + + try: + for future in finished: + yield future + + while pending: + if timeout is None: + wait_timeout = None + else: + wait_timeout = end_time - time.time() + if wait_timeout < 0: + raise TimeoutError( + '%d (of %d) futures unfinished' % ( + len(pending), len(fs))) + + waiter.event.wait(timeout) + + for future in waiter.finished_futures[:]: + yield future + waiter.finished_futures.remove(future) + pending.remove(future) + + finally: + for f in fs: + f._waiters.remove(waiter) + +DoneAndNotDoneFutures = collections.namedtuple( + 'DoneAndNotDoneFutures', 'done not_done') +def wait(fs, timeout=None, return_when=ALL_COMPLETED): + """Wait for the futures in the given sequence to complete. + + Args: + fs: The sequence of Futures (possibly created by different Executors) to + wait upon. + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. + return_when: Indicates when this function should return. The options + are: + + FIRST_COMPLETED - Return when any future finishes or is + cancelled. + FIRST_EXCEPTION - Return when any future finishes by raising an + exception. If no future raises an exception + then it is equivalent to ALL_COMPLETED. + ALL_COMPLETED - Return when all futures finish or are cancelled. + + Returns: + A named 2-tuple of sets. The first set, named 'done', contains the + futures that completed (is finished or cancelled) before the wait + completed. The second set, named 'not_done', contains uncompleted + futures. 
+ """ + with _AcquireFutures(fs): + done = set(f for f in fs + if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]) + not_done = set(fs) - done - def remove(self, e): - self._waiters.remove(e) + if (return_when == FIRST_COMPLETED) and done: + return DoneAndNotDoneFutures(done, not_done) + elif (return_when == FIRST_EXCEPTION) and done: + if any(f for f in done + if not f.cancelled() and f.exception() is not None): + return DoneAndNotDoneFutures(done, not_done) - def add_result(self): - for waiter in self._waiters: - waiter.add_result() + if len(done) == len(fs): + return DoneAndNotDoneFutures(done, not_done) - def add_exception(self): - for waiter in self._waiters: - waiter.add_exception() + waiter = _create_and_install_waiters(fs, return_when) - def add_cancelled(self): - for waiter in self._waiters: - waiter.add_cancelled() + waiter.event.wait(timeout) + for f in fs: + f._waiters.remove(waiter) + + done.update(waiter.finished_futures) + return DoneAndNotDoneFutures(done, set(fs) - done) class Future(object): """Represents the result of an asynchronous computation.""" - def __init__(self, index): + def __init__(self): """Initializes the future. Should not be called by clients.""" self._condition = threading.Condition() self._state = PENDING self._result = None self._exception = None - self._index = index + self._waiters = [] + self._done_callbacks = [] + + def _invoke_callbacks(self): + for callback in self._done_callbacks: + try: + callback(self) + except Exception: + LOGGER.exception('exception calling callback for %r', self) def __repr__(self): - self._condition.acquire() - try: + with self._condition: if self._state == FINISHED: if self._exception: - return '<Future state=%s raised %s>' % ( + return '<Future at %s state=%s raised %s>' % ( + hex(id(self)), _STATE_TO_DESCRIPTION_MAP[self._state], self._exception.__class__.__name__) else: - return '<Future state=%s returned %s>' % ( + return '<Future at %s state=%s returned %s>' % ( + hex(id(self)), _STATE_TO_DESCRIPTION_MAP[self._state], self._result.__class__.__name__) - return '<Future state=%s>' % _STATE_TO_DESCRIPTION_MAP[self._state] - finally: - self._condition.release() - - @property - def index(self): - """The index of the future in its FutureList.""" - return self._index + return '<Future at %s state=%s>' % ( + hex(id(self)), + _STATE_TO_DESCRIPTION_MAP[self._state]) def cancel(self): """Cancel the future if possible. @@ -255,40 +290,33 @@ class Future(object): Returns True if the future was cancelled, False otherwise. A future cannot be cancelled if it is running or has already completed. 
""" - self._condition.acquire() - try: + with self._condition: if self._state in [RUNNING, FINISHED]: return False - if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED]: - self._state = CANCELLED - self._condition.notify_all() - return True - finally: - self._condition.release() + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + return True + + self._state = CANCELLED + self._condition.notify_all() + + self._invoke_callbacks() + return True def cancelled(self): """Return True if the future has cancelled.""" - self._condition.acquire() - try: + with self._condition: return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED] - finally: - self._condition.release() def running(self): - self._condition.acquire() - try: + """Return True if the future is currently executing.""" + with self._condition: return self._state == RUNNING - finally: - self._condition.release() def done(self): """Return True of the future was cancelled or finished executing.""" - self._condition.acquire() - try: + with self._condition: return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED] - finally: - self._condition.release() def __get_result(self): if self._exception: @@ -296,6 +324,23 @@ class Future(object): else: return self._result + def add_done_callback(self, fn): + """Attaches a callable that will be called when the future finishes. + + Args: + fn: A callable that will be called with this future as its only + argument when the future completes or is cancelled. The callable + will always be called by a thread in the same process in which + it was added. If the future has already completed or been + cancelled then the callable will be called immediately. These + callables are called in the order that they were added. + """ + with self._condition: + if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]: + self._done_callbacks.append(fn) + return + fn(self) + def result(self, timeout=None): """Return the result of the call that the future represents. @@ -312,8 +357,7 @@ class Future(object): timeout. Exception: If the call raised then that exception will be raised. """ - self._condition.acquire() - try: + with self._condition: if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: raise CancelledError() elif self._state == FINISHED: @@ -327,8 +371,6 @@ class Future(object): return self.__get_result() else: raise TimeoutError() - finally: - self._condition.release() def exception(self, timeout=None): """Return the exception raised by the call that the future represents. @@ -348,8 +390,7 @@ class Future(object): timeout. """ - self._condition.acquire() - try: + with self._condition: if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: raise CancelledError() elif self._state == FINISHED: @@ -363,215 +404,111 @@ class Future(object): return self._exception else: raise TimeoutError() - finally: - self._condition.release() -class FutureList(object): - def __init__(self, futures, event_sink): - """Initializes the FutureList. Should not be called by clients.""" - self._futures = futures - self._event_sink = event_sink + # The following methods should only be used by Executors and in tests. + def set_running_or_notify_cancel(self): + """Mark the future as running or process any cancel notifications. - def wait(self, timeout=None, return_when=ALL_COMPLETED): - """Wait for the futures in the list to complete. + Should only be used by Executor implementations and unit tests. - Args: - timeout: The maximum number of seconds to wait. If None, then there - is no limit on the wait time. 
- return_when: Indicates when the method should return. The options - are: - - FIRST_COMPLETED - Return when any future finishes or is - cancelled. - FIRST_EXCEPTION - Return when any future finishes by raising an - exception. If no future raises and exception - then it is equivalent to ALL_COMPLETED. - ALL_COMPLETED - Return when all futures finish or are cancelled. - RETURN_IMMEDIATELY - Return without waiting (this is not likely - to be a useful option but it is there to - be symmetrical with the - executor.run_to_futures() method. + If the future has been cancelled (cancel() was called and returned + True) then any threads waiting on the future completing (though calls + to as_completed() or wait()) are notified and False is returned. - Raises: - TimeoutError: If the wait condition wasn't satisfied before the - given timeout. - """ - if return_when == RETURN_IMMEDIATELY: - return - - # Futures cannot change state without this condition being held. - self._event_sink._condition.acquire() - try: - # Make a quick exit if every future is already done. This check is - # necessary because, if every future is in the - # CANCELLED_AND_NOTIFIED or FINISHED state then the WaitTracker will - # never receive any events. - if all(f._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED] - for f in self): - return - - if return_when == FIRST_COMPLETED: - completed_tracker = _FirstCompletedWaitTracker() - else: - # Calculate how many events are expected before every future - # is complete. This can be done without holding the futures' - # locks because a future cannot transition itself into either - # of the states being looked for. - pending_count = sum( - f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] - for f in self) - - if return_when == FIRST_EXCEPTION: - completed_tracker = _AllCompletedWaitTracker( - pending_count, stop_on_exception=True) - elif return_when == ALL_COMPLETED: - completed_tracker = _AllCompletedWaitTracker( - pending_count, stop_on_exception=False) - - self._event_sink.add(completed_tracker) - finally: - self._event_sink._condition.release() + If the future was not cancelled then it is put in the running state + (future calls to running() will return True) and True is returned. - try: - completed_tracker.event.wait(timeout) - finally: - self._event_sink.remove(completed_tracker) + This method should be called by Executor implementations before + executing the work associated with this future. If this method returns + False then the work should not be executed. - def cancel(self, timeout=None): - """Cancel the futures in the list. - - Args: - timeout: The maximum number of seconds to wait. If None, then there - is no limit on the wait time. + Returns: + False if the Future was cancelled, True otherwise. Raises: - TimeoutError: If all the futures were not finished before the - given timeout. + RuntimeError: if this method was already called or if set_result() + or set_exception() was called. 
""" - for f in self: - f.cancel() - self.wait(timeout=timeout, return_when=ALL_COMPLETED) - if any(not f.done() for f in self): - raise TimeoutError() - - def has_running_futures(self): - """Returns True if any futures in the list are still running.""" - return any(self.running_futures()) - - def has_cancelled_futures(self): - """Returns True if any futures in the list were cancelled.""" - return any(self.cancelled_futures()) - - def has_done_futures(self): - """Returns True if any futures in the list are finished or cancelled.""" - return any(self.done_futures()) - - def has_successful_futures(self): - """Returns True if any futures in the list finished without raising.""" - return any(self.successful_futures()) - - def has_exception_futures(self): - """Returns True if any futures in the list finished by raising.""" - return any(self.exception_futures()) - - def cancelled_futures(self): - """Returns all cancelled futures in the list.""" - return (f for f in self - if f._state in [CANCELLED, CANCELLED_AND_NOTIFIED]) - - def done_futures(self): - """Returns all futures in the list that are finished or cancelled.""" - return (f for f in self - if f._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]) - - def successful_futures(self): - """Returns all futures in the list that finished without raising.""" - return (f for f in self - if f._state == FINISHED and f._exception is None) - - def exception_futures(self): - """Returns all futures in the list that finished by raising.""" - return (f for f in self - if f._state == FINISHED and f._exception is not None) - - def running_futures(self): - """Returns all futures in the list that are still running.""" - return (f for f in self if f._state == RUNNING) - - def __len__(self): - return len(self._futures) - - def __getitem__(self, i): - return self._futures[i] - - def __iter__(self): - return iter(self._futures) - - def __contains__(self, future): - return future in self._futures + with self._condition: + if self._state == CANCELLED: + self._state = CANCELLED_AND_NOTIFIED + for waiter in self._waiters: + waiter.add_cancelled(self) + # self._condition.notify_all() is not necessary because + # self.cancel() triggers a notification. + return False + elif self._state == PENDING: + self._state = RUNNING + return True + else: + LOGGER.critical('Future %s in unexpected state: %s', + id(self.future), + self.future._state) + raise RuntimeError('Future in unexpected state') - def __repr__(self): - states = dict([(state, 0) for state in _FUTURE_STATES]) - for f in self: - states[f._state] += 1 - - return ('<FutureList #futures=%d ' - '[#pending=%d #cancelled=%d #running=%d #finished=%d]>' % ( - len(self), - states[PENDING], - states[CANCELLED] + states[CANCELLED_AND_NOTIFIED], - states[RUNNING], - states[FINISHED])) + def set_result(self, result): + """Sets the return value of work associated with the future. + + Should only be used by Executor implementations and unit tests. + """ + with self._condition: + self._result = result + self._state = FINISHED + for waiter in self._waiters: + waiter.add_result(self) + self._condition.notify_all() + self._invoke_callbacks() + + def set_exception(self, exception): + """Sets the result of the future as being the given exception. + + Should only be used by Executor implementations and unit tests. 
+ """ + with self._condition: + self._exception = exception + self._state = FINISHED + for waiter in self._waiters: + waiter.add_exception(self) + self._condition.notify_all() + self._invoke_callbacks() class Executor(object): """This is an abstract base class for concrete asynchronous executors.""" - def run_to_futures(self, calls, timeout=None, return_when=ALL_COMPLETED): - """Return a list of futures representing the given calls. - Args: - calls: A sequence of callables that take no arguments. These will - be bound to Futures and returned. - timeout: The maximum number of seconds to wait. If None, then there - is no limit on the wait time. - return_when: Indicates when the method should return. The options - are: + def submit(self, fn, *args, **kwargs): + """Submits a callable to be executed with the given arguments. - FIRST_COMPLETED - Return when any future finishes or is - cancelled. - FIRST_EXCEPTION - Return when any future finishes by raising an - exception. If no future raises and exception - then it is equivalent to ALL_COMPLETED. - ALL_COMPLETED - Return when all futures finish or are cancelled. - RETURN_IMMEDIATELY - Return without waiting. + Schedules the callable to be executed as fn(*args, **kwargs) and returns + a Future instance representing the execution of the callable. Returns: - A FutureList containing Futures for the given calls. + A Future representing the given call. """ raise NotImplementedError() - def run_to_results(self, calls, timeout=None): - """Returns a iterator of the results of the given calls. + def map(self, fn, *iterables, **kwargs): + """Returns a iterator equivalent to map(fn, iter). Args: - calls: A sequence of callables that take no arguments. These will - be called and their results returned. + fn: A callable that will take take as many arguments as there are + passed iterables. timeout: The maximum number of seconds to wait. If None, then there is no limit on the wait time. Returns: - An iterator over the results of the given calls. Equivalent to: - (call() for call in calls) but the calls may be evaluated - out-of-order. + An iterator equivalent to: map(func, *iterables) but the calls may + be evaluated out-of-order. Raises: - TimeoutError: If all the given calls were not completed before the - given timeout. - Exception: If any call() raises. + TimeoutError: If the entire result iterator could not be generated + before the given timeout. + Exception: If fn(*args) raises for any values. """ + timeout = kwargs.get('timeout') if timeout is not None: end_time = timeout + time.time() - fs = self.run_to_futures(calls, return_when=RETURN_IMMEDIATELY) + fs = [self.submit(fn, *args) for args in zip(*iterables)] try: for future in fs: @@ -579,44 +516,26 @@ class Executor(object): yield future.result() else: yield future.result(end_time - time.time()) - except Exception, e: - # Python 2.4 and earlier don't allow yield statements in - # try/finally blocks - try: - fs.cancel(timeout=0) - except TimeoutError: - pass - raise e - - def map(self, func, *iterables, **kwargs): - """Returns a iterator equivalent to map(fn, iter). + finally: + for future in fs: + future.cancel() - Args: - func: A callable that will take take as many arguments as there - are passed iterables. - timeout: The maximum number of seconds to wait. If None, then there - is no limit on the wait time. + def shutdown(self, wait=True): + """Clean-up the resources associated with the Executor. 
- Returns: - An iterator equivalent to: map(func, *iterables) but the calls may - be evaluated out-of-order. + It is safe to call this method several times. Otherwise, no other + methods can be called after this one. - Raises: - TimeoutError: If the entire result iterator could not be generated - before the given timeout. - Exception: If fn(*args) raises for any values. + Args: + wait: If True then shutdown will not return until all running + futures have finished executing and the resources used by the + executor have been reclaimed. """ - timeout = kwargs.get('timeout') or None - calls = [partial(func, *args) for args in zip(*iterables)] - return self.run_to_results(calls, timeout=timeout) - - def shutdown(self): - """Clean-up. No other methods can be called afterwards.""" - raise NotImplementedError() + pass def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): - self.shutdown() + self.shutdown(wait=True) return False diff --git a/python2/futures/process.py b/python2/futures/process.py index 7f1b153..ec48377 100644 --- a/python2/futures/process.py +++ b/python2/futures/process.py @@ -1,4 +1,5 @@ -# Copyright 2009 Brian Quinlan. All Rights Reserved. See LICENSE file. +# Copyright 2009 Brian Quinlan. All Rights Reserved. +# Licensed to PSF under a Contributor Agreement. """Implements ProcessPoolExecutor. @@ -23,9 +24,8 @@ The follow diagram and text describe the data-flow through the system: | | | ... | | | | 3, except | | | +----------+ +------------+ +--------+ +-----------+ +---------+ -Executor.run_to_futures() called: -- creates a uniquely numbered _WorkItem for each call and adds them to the - "Work Items" dict +Executor.submit() called: +- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict - adds the id of the _WorkItem to the "Work Ids" queue Local worker thread: @@ -42,15 +42,11 @@ Process #1..n: - reads _CallItems from "Call Q", executes the calls, and puts the resulting _ResultItems in "Request Q" """ - + __author__ = 'Brian Quinlan (brian@sweetapp.com)' -from futures._base import (PENDING, RUNNING, CANCELLED, - CANCELLED_AND_NOTIFIED, FINISHED, - ALL_COMPLETED, - set_future_exception, set_future_result, - Executor, Future, FutureList, ThreadEventSink) import atexit +import _base import Queue import multiprocessing import threading @@ -63,7 +59,7 @@ import weakref # - The workers would still be running during interpretor shutdown, # meaning that they would fail in unpredictable ways. # - The workers could be killed while evaluating a work item, which could -# be bad if the function being evaluated has external side-effects e.g. +# be bad if the callable being evaluated has external side-effects e.g. # writing to a file. # # To work around this problem, an exit handler is installed which tells the @@ -86,7 +82,7 @@ def _remove_dead_thread_references(): Should be called periodically to prevent memory leaks in scenarios such as: >>> while True: - >>> ... t = ThreadPoolExecutor(max_threads=5) + >>> ... t = ThreadPoolExecutor(max_workers=5) >>> ... 
t.map(int, ['1', '2', '3', '4', '5']) """ for thread_reference in set(_thread_references): @@ -100,10 +96,11 @@ def _remove_dead_thread_references(): EXTRA_QUEUED_CALLS = 1 class _WorkItem(object): - def __init__(self, call, future, completion_tracker): - self.call = call + def __init__(self, future, fn, args, kwargs): self.future = future - self.completion_tracker = completion_tracker + self.fn = fn + self.args = args + self.kwargs = kwargs class _ResultItem(object): def __init__(self, work_id, exception=None, result=None): @@ -112,9 +109,11 @@ class _ResultItem(object): self.result = result class _CallItem(object): - def __init__(self, work_id, call): + def __init__(self, work_id, fn, args, kwargs): self.work_id = work_id - self.call = call + self.fn = fn + self.args = args + self.kwargs = kwargs def _process_worker(call_queue, result_queue, shutdown): """Evaluates calls from call_queue and places the results in result_queue. @@ -137,8 +136,8 @@ def _process_worker(call_queue, result_queue, shutdown): return else: try: - r = call_item.call() - except Exception, e: + r = call_item.fn(*call_item.args, **call_item.kwargs) + except BaseException as e: result_queue.put(_ResultItem(call_item.work_id, exception=e)) else: @@ -172,19 +171,15 @@ def _add_call_item_to_queue(pending_work_items, else: work_item = pending_work_items[work_id] - if work_item.future.cancelled(): - work_item.future._condition.acquire() - work_item.future._condition.notify_all() - work_item.future._condition.release() - - work_item.completion_tracker.add_cancelled() + if work_item.future.set_running_or_notify_cancel(): + call_queue.put(_CallItem(work_id, + work_item.fn, + work_item.args, + work_item.kwargs), + block=True) + else: del pending_work_items[work_id] continue - else: - work_item.future._condition.acquire() - work_item.future._state = RUNNING - work_item.future._condition.release() - call_queue.put(_CallItem(work_id, work_item.call), block=True) def _queue_manangement_worker(executor_reference, processes, @@ -218,6 +213,7 @@ def _queue_manangement_worker(executor_reference, _add_call_item_to_queue(pending_work_items, work_ids_queue, call_queue) + try: result_item = result_queue.get(block=True, timeout=0.1) except Queue.Empty: @@ -244,34 +240,30 @@ def _queue_manangement_worker(executor_reference, del pending_work_items[result_item.work_id] if result_item.exception: - set_future_exception(work_item.future, - work_item.completion_tracker, - result_item.exception) + work_item.future.set_exception(result_item.exception) else: - set_future_result(work_item.future, - work_item.completion_tracker, - result_item.result) + work_item.future.set_result(result_item.result) -class ProcessPoolExecutor(Executor): - def __init__(self, max_processes=None): +class ProcessPoolExecutor(_base.Executor): + def __init__(self, max_workers=None): """Initializes a new ProcessPoolExecutor instance. Args: - max_processes: The maximum number of processes that can be used to + max_workers: The maximum number of processes that can be used to execute the given calls. If None or not given then as many worker processes will be created as the machine has processors. """ _remove_dead_thread_references() - if max_processes is None: - self._max_processes = multiprocessing.cpu_count() + if max_workers is None: + self._max_workers = multiprocessing.cpu_count() else: - self._max_processes = max_processes + self._max_workers = max_workers # Make the call queue slightly larger than the number of processes to # prevent the worker processes from idling. 
But don't make it too big # because futures in the call queue cannot be cancelled. - self._call_queue = multiprocessing.Queue(self._max_processes + + self._call_queue = multiprocessing.Queue(self._max_workers + EXTRA_QUEUED_CALLS) self._result_queue = multiprocessing.Queue() self._work_ids = Queue.Queue() @@ -296,12 +288,12 @@ class ProcessPoolExecutor(Executor): self._call_queue, self._result_queue, self._shutdown_process_event)) - self._queue_management_thread.setDaemon(True) + self._queue_management_thread.daemon = True self._queue_management_thread.start() _thread_references.add(weakref.ref(self._queue_management_thread)) def _adjust_process_count(self): - for _ in range(len(self._processes), self._max_processes): + for _ in range(len(self._processes), self._max_workers): p = multiprocessing.Process( target=_process_worker, args=(self._call_queue, @@ -310,36 +302,36 @@ class ProcessPoolExecutor(Executor): p.start() self._processes.add(p) - def run_to_futures(self, calls, timeout=None, return_when=ALL_COMPLETED): - self._shutdown_lock.acquire() - try: + def submit(self, fn, *args, **kwargs): + with self._shutdown_lock: if self._shutdown_thread: - raise RuntimeError('cannot run new futures after shutdown') + raise RuntimeError('cannot schedule new futures after shutdown') - futures = [] - event_sink = ThreadEventSink() + f = _base.Future() + w = _WorkItem(f, fn, args, kwargs) - for index, call in enumerate(calls): - f = Future(index) - self._pending_work_items[self._queue_count] = _WorkItem( - call, f, event_sink) - self._work_ids.put(self._queue_count) - futures.append(f) - self._queue_count += 1 + self._pending_work_items[self._queue_count] = w + self._work_ids.put(self._queue_count) + self._queue_count += 1 self._start_queue_management_thread() self._adjust_process_count() - fl = FutureList(futures, event_sink) - fl.wait(timeout=timeout, return_when=return_when) - return fl - finally: - self._shutdown_lock.release() - - def shutdown(self): - self._shutdown_lock.acquire() - try: + return f + submit.__doc__ = _base.Executor.submit.__doc__ + + def shutdown(self, wait=True): + with self._shutdown_lock: self._shutdown_thread = True - finally: - self._shutdown_lock.release() + if wait: + if self._queue_management_thread: + self._queue_management_thread.join() + # To reduce the risk of openning too many files, remove references to + # objects that use file descriptors. + self._queue_management_thread = None + self._call_queue = None + self._result_queue = None + self._shutdown_process_event = None + self._processes = None + shutdown.__doc__ = _base.Executor.shutdown.__doc__ -atexit.register(_python_exit)
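
# Illustrative sketch (not part of the patch): ProcessPoolExecutor with the
# renamed max_workers argument. Leaving the with-block calls
# shutdown(wait=True), which joins the queue management thread and drops the
# executor's references to its queues and worker processes.
import futures

def square(x):
    return x * x

if __name__ == '__main__':
    with futures.ProcessPoolExecutor(max_workers=2) as executor:
        print(list(executor.map(square, [1, 2, 3, 4])))   # [1, 4, 9, 16]
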
\ No newline at end of file +atexit.register(_python_exit) diff --git a/python2/futures/thread.py b/python2/futures/thread.py index 4071574..3f1584a 100644 --- a/python2/futures/thread.py +++ b/python2/futures/thread.py @@ -1,16 +1,12 @@ -# Copyright 2009 Brian Quinlan. All Rights Reserved. See LICENSE file. +# Copyright 2009 Brian Quinlan. All Rights Reserved. +# Licensed to PSF under a Contributor Agreement. """Implements ThreadPoolExecutor.""" __author__ = 'Brian Quinlan (brian@sweetapp.com)' -from futures._base import (PENDING, RUNNING, CANCELLED, - CANCELLED_AND_NOTIFIED, FINISHED, - ALL_COMPLETED, - LOGGER, - set_future_exception, set_future_result, - Executor, Future, FutureList, ThreadEventSink) import atexit +import _base import Queue import threading import weakref @@ -22,15 +18,15 @@ import weakref # - The workers would still be running during interpretor shutdown, # meaning that they would fail in unpredictable ways. # - The workers could be killed while evaluating a work item, which could -# be bad if the function being evaluated has external side-effects e.g. +# be bad if the callable being evaluated has external side-effects e.g. # writing to a file. # # To work around this problem, an exit handler is installed which tells the -# workers to exit when their work queues are empty and then waits until they -# finish. +# workers to exit when their work queues are empty and then waits until the +# threads finish. -_thread_references = set() # Weakrefs to every active worker thread. -_shutdown = False # Indicates that the interpreter is shutting down. +_thread_references = set() +_shutdown = False def _python_exit(): global _shutdown @@ -43,11 +39,10 @@ def _python_exit(): def _remove_dead_thread_references(): """Remove inactive threads from _thread_references. - Should be called periodically to prevent thread objects from accumulating in - scenarios such as: + Should be called periodically to prevent memory leaks in scenarios such as: >>> while True: - >>> ... t = ThreadPoolExecutor(max_threads=5) - >>> ... t.map(int, ['1', '2', '3', '4', '5']) + ... t = ThreadPoolExecutor(max_workers=5) + ... 
t.map(int, ['1', '2', '3', '4', '5']) """ for thread_reference in set(_thread_references): if thread_reference() is None: @@ -56,38 +51,22 @@ def _remove_dead_thread_references(): atexit.register(_python_exit) class _WorkItem(object): - def __init__(self, call, future, completion_tracker): - self.call = call + def __init__(self, future, fn, args, kwargs): self.future = future - self.completion_tracker = completion_tracker + self.fn = fn + self.args = args + self.kwargs = kwargs def run(self): - self.future._condition.acquire() - try: - if self.future._state == PENDING: - self.future._state = RUNNING - elif self.future._state == CANCELLED: - self.completion_tracker._condition.acquire() - try: - self.future._state = CANCELLED_AND_NOTIFIED - self.completion_tracker.add_cancelled() - return - finally: - self.completion_tracker._condition.release() - else: - LOGGER.critical('Future %s in unexpected state: %d', - id(self.future), - self.future._state) - return - finally: - self.future._condition.release() + if not self.future.set_running_or_notify_cancel(): + return try: - result = self.call() - except Exception, e: - set_future_exception(self.future, self.completion_tracker, e) + result = self.fn(*self.args, **self.kwargs) + except BaseException as e: + self.future.set_exception(e) else: - set_future_result(self.future, self.completion_tracker, result) + self.future.set_result(result) def _worker(executor_reference, work_queue): try: @@ -105,61 +84,53 @@ def _worker(executor_reference, work_queue): del executor else: work_item.run() - except Exception, e: - LOGGER.critical('Exception in worker', exc_info=True) + except BaseException as e: + _base.LOGGER.critical('Exception in worker', exc_info=True) -class ThreadPoolExecutor(Executor): - def __init__(self, max_threads): +class ThreadPoolExecutor(_base.Executor): + def __init__(self, max_workers): """Initializes a new ThreadPoolExecutor instance. Args: - max_threads: The maximum number of threads that can be used to + max_workers: The maximum number of threads that can be used to execute the given calls. """ _remove_dead_thread_references() - self._max_threads = max_threads + self._max_workers = max_workers self._work_queue = Queue.Queue() self._threads = set() self._shutdown = False self._shutdown_lock = threading.Lock() + def submit(self, fn, *args, **kwargs): + with self._shutdown_lock: + if self._shutdown: + raise RuntimeError('cannot schedule new futures after shutdown') + + f = _base.Future() + w = _WorkItem(f, fn, args, kwargs) + + self._work_queue.put(w) + self._adjust_thread_count() + return f + submit.__doc__ = _base.Executor.submit.__doc__ + def _adjust_thread_count(self): - for _ in range(len(self._threads), - min(self._max_threads, self._work_queue.qsize())): + # TODO(bquinlan): Should avoid creating new threads if there are more + # idle threads than items in the work queue. 
+ if len(self._threads) < self._max_workers: t = threading.Thread(target=_worker, args=(weakref.ref(self), self._work_queue)) - t.setDaemon(True) + t.daemon = True t.start() self._threads.add(t) _thread_references.add(weakref.ref(t)) - def run_to_futures(self, calls, timeout=None, return_when=ALL_COMPLETED): - self._shutdown_lock.acquire() - try: - if self._shutdown: - raise RuntimeError('cannot run new futures after shutdown') - - futures = [] - event_sink = ThreadEventSink() - for index, call in enumerate(calls): - f = Future(index) - w = _WorkItem(call, f, event_sink) - self._work_queue.put(w) - futures.append(f) - - self._adjust_thread_count() - fl = FutureList(futures, event_sink) - fl.wait(timeout=timeout, return_when=return_when) - return fl - finally: - self._shutdown_lock.release() - run_to_futures.__doc__ = Executor.run_to_futures.__doc__ - - def shutdown(self): - self._shutdown_lock.acquire() - try: + def shutdown(self, wait=True): + with self._shutdown_lock: self._shutdown = True - finally: - self._shutdown_lock.release() - shutdown.__doc__ = Executor.shutdown.__doc__ + if wait: + for t in self._threads: + t.join() + shutdown.__doc__ = _base.Executor.shutdown.__doc__ diff --git a/python2/primes.py b/python2/primes.py index 0b2bf81..fa6c355 100644 --- a/python2/primes.py +++ b/python2/primes.py @@ -25,29 +25,23 @@ def sequential(): return list(map(is_prime, PRIMES)) def with_process_pool_executor(): - executor = futures.ProcessPoolExecutor(10) - try: + with futures.ProcessPoolExecutor(10) as executor: return list(executor.map(is_prime, PRIMES)) - finally: - executor.shutdown() def with_thread_pool_executor(): - executor = futures.ThreadPoolExecutor(10) - try: + with futures.ThreadPoolExecutor(10) as executor: return list(executor.map(is_prime, PRIMES)) - finally: - executor.shutdown() def main(): for name, fn in [('sequential', sequential), ('processes', with_process_pool_executor), ('threads', with_thread_pool_executor)]: - print '%s: ' % name.ljust(12), - + print name.ljust(12), start = time.time() if fn() != [True] * len(PRIMES): print 'failed' else: print '%.2f seconds' % (time.time() - start) -main()
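
# Sketch (not part of the patch) of the two shutdown modes added to
# ThreadPoolExecutor in the thread.py hunk above: wait=True joins the worker
# threads, wait=False only marks the executor so further submit() calls raise.
import futures

executor = futures.ThreadPoolExecutor(max_workers=2)
f = executor.submit(sum, [1, 2, 3])
executor.shutdown(wait=True)       # blocks until the queued work has drained
print(f.result())                  # 6
# A further executor.submit(...) would now raise RuntimeError.
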
\ No newline at end of file +if __name__ == '__main__': + main() diff --git a/python2/setup.py b/python2/setup.py index 897dc86..fcd05f2 100755 --- a/python2/setup.py +++ b/python2/setup.py @@ -1,10 +1,10 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 from distutils.core import setup -setup(name='futures', +setup(name='futures3', version='1.0', - description='Java-style futures implementation in Python 2.x', + description='Java-style futures implementation in Python 3.x', author='Brian Quinlan', author_email='brian@sweetapp.com', url='http://code.google.com/p/pythonfutures', @@ -14,5 +14,5 @@ setup(name='futures', classifiers=['License :: OSI Approved :: BSD License', 'Development Status :: 5 - Production/Stable', 'Intended Audience :: Developers', - 'Programming Language :: Python :: 2'] + 'Programming Language :: Python :: 3'] ) diff --git a/python2/test_futures.py b/python2/test_futures.py index e4bdf36..2d5672b 100644 --- a/python2/test_futures.py +++ b/python2/test_futures.py @@ -1,16 +1,25 @@ -import unittest -import threading -import time +import logging import multiprocessing +import re +import StringIO +import sys +import threading from test import test_support +import time +import unittest + +if sys.platform.startswith('win'): + import ctypes + import ctypes.wintypes import futures -import futures._base from futures._base import ( - PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future) + PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future, + LOGGER, STDERR_HANDLER, wait) +import futures.process def create_future(state=PENDING, exception=None, result=None): - f = Future(0) + f = Future() f._state = state f._exception = exception f._result = result @@ -23,68 +32,104 @@ CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED) EXCEPTION_FUTURE = create_future(state=FINISHED, exception=IOError()) SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42) +def mul(x, y): + return x * y + class Call(object): + """A call that can be submitted to a future.Executor for testing. + + The call signals when it is called and waits for an event before finishing. 
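
# Sketch (not part of the patch) of how the Call helper described above is
# driven from a test: submit it, block until a worker has actually started
# it, then release it and collect the result. Assumes this runs inside
# test_futures.py, where Call is defined.
import futures

def demo_call_helper():
    call = Call(manual_finish=True)
    try:
        with futures.ThreadPoolExecutor(max_workers=1) as executor:
            future = executor.submit(call)
            call.wait_on_called()      # the worker has entered __call__
            call.set_can()             # allow __call__ to return its result
            print(future.result())     # 42
    finally:
        call.close()
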
+ """ CALL_LOCKS = {} + def _create_event(self): + if sys.platform.startswith('win'): + class SECURITY_ATTRIBUTES(ctypes.Structure): + _fields_ = [("nLength", ctypes.wintypes.DWORD), + ("lpSecurityDescriptor", ctypes.wintypes.LPVOID), + ("bInheritHandle", ctypes.wintypes.BOOL)] + + s = SECURITY_ATTRIBUTES() + s.nLength = ctypes.sizeof(s) + s.lpSecurityDescriptor = None + s.bInheritHandle = True + + handle = ctypes.windll.kernel32.CreateEventA(ctypes.pointer(s), + True, + False, + None) + assert handle is not None + return handle + else: + event = multiprocessing.Event() + self.CALL_LOCKS[id(event)] = event + return id(event) + + def _wait_on_event(self, handle): + if sys.platform.startswith('win'): + r = ctypes.windll.kernel32.WaitForSingleObject(handle, 5 * 1000) + assert r == 0 + else: + self.CALL_LOCKS[handle].wait() + + def _signal_event(self, handle): + if sys.platform.startswith('win'): + r = ctypes.windll.kernel32.SetEvent(handle) + assert r != 0 + else: + self.CALL_LOCKS[handle].set() + def __init__(self, manual_finish=False, result=42): - called_event = multiprocessing.Event() - can_finish = multiprocessing.Event() + self._called_event = self._create_event() + self._can_finish = self._create_event() self._result = result - self._called_event_id = id(called_event) - self._can_finish_event_id = id(can_finish) - - self.CALL_LOCKS[self._called_event_id] = called_event - self.CALL_LOCKS[self._can_finish_event_id] = can_finish if not manual_finish: - self._can_finish.set() - - @property - def _can_finish(self): - return self.CALL_LOCKS[self._can_finish_event_id] - - @property - def _called_event(self): - return self.CALL_LOCKS[self._called_event_id] + self._signal_event(self._can_finish) def wait_on_called(self): - self._called_event.wait() + self._wait_on_event(self._called_event) def set_can(self): - self._can_finish.set() - - def called(self): - return self._called_event.is_set() + self._signal_event(self._can_finish) def __call__(self): - if self._called_event.is_set(): print('called twice') + self._signal_event(self._called_event) + self._wait_on_event(self._can_finish) - self._called_event.set() - self._can_finish.wait() return self._result def close(self): - del self.CALL_LOCKS[self._called_event_id] - del self.CALL_LOCKS[self._can_finish_event_id] + self.set_can() + if sys.platform.startswith('win'): + ctypes.windll.kernel32.CloseHandle(self._called_event) + ctypes.windll.kernel32.CloseHandle(self._can_finish) + else: + del self.CALL_LOCKS[self._called_event] + del self.CALL_LOCKS[self._can_finish] class ExceptionCall(Call): def __call__(self): - assert not self._called_event.is_set(), 'already called' - - self._called_event.set() - self._can_finish.wait() + self._signal_event(self._called_event) + self._wait_on_event(self._can_finish) raise ZeroDivisionError() +class MapCall(Call): + def __init__(self, result=42): + super(MapCall, self).__init__(manual_finish=True, result=result) + + def __call__(self, manual_finish): + if manual_finish: + super(MapCall, self).__call__() + return self._result + class ExecutorShutdownTest(unittest.TestCase): def test_run_after_shutdown(self): - call1 = Call() - try: - self.executor.shutdown() - self.assertRaises(RuntimeError, - self.executor.run_to_futures, - [call1]) - finally: - call1.close() + self.executor.shutdown() + self.assertRaises(RuntimeError, + self.executor.submit, + pow, 2, 5) + def _start_some_futures(self): call1 = Call(manual_finish=True) @@ -92,13 +137,14 @@ class ExecutorShutdownTest(unittest.TestCase): call3 = 
Call(manual_finish=True) try: - self.executor.run_to_futures([call1, call2, call3], - return_when=futures.RETURN_IMMEDIATELY) - + self.executor.submit(call1) + self.executor.submit(call2) + self.executor.submit(call3) + call1.wait_on_called() call2.wait_on_called() call3.wait_on_called() - + call1.set_can() call2.set_can() call3.set_can() @@ -109,10 +155,10 @@ class ExecutorShutdownTest(unittest.TestCase): class ThreadPoolShutdownTest(ExecutorShutdownTest): def setUp(self): - self.executor = futures.ThreadPoolExecutor(max_threads=5) + self.executor = futures.ThreadPoolExecutor(max_workers=5) def tearDown(self): - self.executor.shutdown() + self.executor.shutdown(wait=True) def test_threads_terminate(self): self._start_some_futures() @@ -122,7 +168,7 @@ class ThreadPoolShutdownTest(ExecutorShutdownTest): t.join() def test_context_manager_shutdown(self): - with futures.ThreadPoolExecutor(max_threads=5) as e: + with futures.ThreadPoolExecutor(max_workers=5) as e: executor = e self.assertEqual(list(e.map(abs, range(-5, 5))), [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]) @@ -131,7 +177,7 @@ class ThreadPoolShutdownTest(ExecutorShutdownTest): t.join() def test_del_shutdown(self): - executor = futures.ThreadPoolExecutor(max_threads=5) + executor = futures.ThreadPoolExecutor(max_workers=5) executor.map(abs, range(-5, 5)) threads = executor._threads del executor @@ -141,32 +187,31 @@ class ThreadPoolShutdownTest(ExecutorShutdownTest): class ProcessPoolShutdownTest(ExecutorShutdownTest): def setUp(self): - self.executor = futures.ProcessPoolExecutor(max_processes=5) + self.executor = futures.ProcessPoolExecutor(max_workers=5) def tearDown(self): - self.executor.shutdown() + self.executor.shutdown(wait=True) def test_processes_terminate(self): self._start_some_futures() self.assertEqual(len(self.executor._processes), 5) + processes = self.executor._processes self.executor.shutdown() - self.executor._queue_management_thread.join() - for p in self.executor._processes: + for p in processes: p.join() def test_context_manager_shutdown(self): - with futures.ProcessPoolExecutor(max_processes=5) as e: + with futures.ProcessPoolExecutor(max_workers=5) as e: executor = e self.assertEqual(list(e.map(abs, range(-5, 5))), [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]) - executor._queue_management_thread.join() - for p in executor._processes: + for p in self.executor._processes: p.join() def test_del_shutdown(self): - executor = futures.ProcessPoolExecutor(max_processes=5) + executor = futures.ProcessPoolExecutor(max_workers=5) list(executor.map(abs, range(-5, 5))) queue_management_thread = executor._queue_management_thread processes = executor._processes @@ -176,312 +221,316 @@ class ProcessPoolShutdownTest(ExecutorShutdownTest): for p in processes: p.join() -class WaitsTest(unittest.TestCase): - def test_concurrent_waits(self): - def wait_for_ALL_COMPLETED(): - fs.wait(return_when=futures.ALL_COMPLETED) - self.assertTrue(f1.done()) - self.assertTrue(f2.done()) - self.assertTrue(f3.done()) - self.assertTrue(f4.done()) - all_completed.release() - - def wait_for_FIRST_COMPLETED(): - fs.wait(return_when=futures.FIRST_COMPLETED) - self.assertTrue(f1.done()) - self.assertFalse(f2.done()) - self.assertFalse(f3.done()) - self.assertFalse(f4.done()) - first_completed.release() - def wait_for_FIRST_EXCEPTION(): - fs.wait(return_when=futures.FIRST_EXCEPTION) - self.assertTrue(f1.done()) - self.assertTrue(f2.done()) - self.assertFalse(f3.done()) - self.assertFalse(f4.done()) - first_exception.release() - - all_completed = threading.Semaphore(0) - 
first_completed = threading.Semaphore(0) - first_exception = threading.Semaphore(0) +class WaitTests(unittest.TestCase): + def test_first_completed(self): + def wait_test(): + while not future1._waiters: + pass + call1.set_can() call1 = Call(manual_finish=True) - call2 = ExceptionCall(manual_finish=True) - call3 = Call(manual_finish=True) - call4 = Call() - + call2 = Call(manual_finish=True) try: - fs = self.executor.run_to_futures( - [call1, call2, call3, call4], - return_when=futures.RETURN_IMMEDIATELY) - f1, f2, f3, f4 = fs - - threads = [] - for wait_test in [wait_for_ALL_COMPLETED, - wait_for_FIRST_COMPLETED, - wait_for_FIRST_EXCEPTION]: - t = threading.Thread(target=wait_test) - t.start() - threads.append(t) - - time.sleep(1) # give threads enough time to execute wait + future1 = self.executor.submit(call1) + future2 = self.executor.submit(call2) - call1.set_can() - first_completed.acquire() - call2.set_can() - first_exception.acquire() - call3.set_can() - all_completed.acquire() - - self.executor.shutdown() + t = threading.Thread(target=wait_test) + t.start() + done, not_done = futures.wait( + [CANCELLED_FUTURE, future1, future2], + return_when=futures.FIRST_COMPLETED) + + self.assertEquals(set([future1]), done) + self.assertEquals(set([CANCELLED_FUTURE, future2]), not_done) finally: call1.close() call2.close() - call3.close() - call4.close() -class ThreadPoolWaitTests(WaitsTest): - def setUp(self): - self.executor = futures.ThreadPoolExecutor(max_threads=1) + def test_first_completed_one_already_completed(self): + call1 = Call(manual_finish=True) + try: + future1 = self.executor.submit(call1) - def tearDown(self): - self.executor.shutdown() + finished, pending = futures.wait( + [SUCCESSFUL_FUTURE, future1], + return_when=futures.FIRST_COMPLETED) -class ProcessPoolWaitTests(WaitsTest): - def setUp(self): - self.executor = futures.ProcessPoolExecutor(max_processes=1) + self.assertEquals(set([SUCCESSFUL_FUTURE]), finished) + self.assertEquals(set([future1]), pending) + finally: + call1.close() - def tearDown(self): - self.executor.shutdown() + def test_first_exception(self): + def wait_test(): + while not future1._waiters: + pass + call1.set_can() + call2.set_can() -class CancelTests(unittest.TestCase): - def test_cancel_states(self): call1 = Call(manual_finish=True) - call2 = Call() - call3 = Call() - call4 = Call() - + call2 = ExceptionCall(manual_finish=True) + call3 = Call(manual_finish=True) try: - fs = self.executor.run_to_futures( - [call1, call2, call3, call4], - return_when=futures.RETURN_IMMEDIATELY) - f1, f2, f3, f4 = fs - - call1.wait_on_called() - self.assertEqual(f1.cancel(), False) - self.assertEqual(f2.cancel(), True) - self.assertEqual(f4.cancel(), True) - self.assertEqual(f1.cancelled(), False) - self.assertEqual(f2.cancelled(), True) - self.assertEqual(f3.cancelled(), False) - self.assertEqual(f4.cancelled(), True) - self.assertEqual(f1.done(), False) - self.assertEqual(f2.done(), True) - self.assertEqual(f3.done(), False) - self.assertEqual(f4.done(), True) - - call1.set_can() - fs.wait(return_when=futures.ALL_COMPLETED) - self.assertEqual(f1.result(), 42) - self.assertRaises(futures.CancelledError, f2.result) - self.assertRaises(futures.CancelledError, f2.exception) - self.assertEqual(f3.result(), 42) - self.assertRaises(futures.CancelledError, f4.result) - self.assertRaises(futures.CancelledError, f4.exception) - - self.assertEqual(call2.called(), False) - self.assertEqual(call4.called(), False) + future1 = self.executor.submit(call1) + future2 = 
self.executor.submit(call2) + future3 = self.executor.submit(call3) + + t = threading.Thread(target=wait_test) + t.start() + finished, pending = futures.wait( + [future1, future2, future3], + return_when=futures.FIRST_EXCEPTION) + + self.assertEquals(set([future1, future2]), finished) + self.assertEquals(set([future3]), pending) finally: call1.close() call2.close() call3.close() - call4.close() - def test_wait_for_individual_cancel_while_waiting(self): - def end_call(): - # Wait until the main thread is waiting on the results of the - # future. - time.sleep(1) - f2.cancel() + def test_first_exception_some_already_complete(self): + def wait_test(): + while not future1._waiters: + pass call1.set_can() - call1 = Call(manual_finish=True) - call2 = Call() - + call1 = ExceptionCall(manual_finish=True) + call2 = Call(manual_finish=True) try: - fs = self.executor.run_to_futures( - [call1, call2], - return_when=futures.RETURN_IMMEDIATELY) - f1, f2 = fs - - call1.wait_on_called() - t = threading.Thread(target=end_call) + future1 = self.executor.submit(call1) + future2 = self.executor.submit(call2) + + t = threading.Thread(target=wait_test) t.start() - self.assertRaises(futures.CancelledError, f2.result) - self.assertRaises(futures.CancelledError, f2.exception) - t.join() + finished, pending = futures.wait( + [SUCCESSFUL_FUTURE, + CANCELLED_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + future1, future2], + return_when=futures.FIRST_EXCEPTION) + + self.assertEquals(set([SUCCESSFUL_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + future1]), finished) + self.assertEquals(set([CANCELLED_FUTURE, future2]), pending) + + finally: call1.close() call2.close() - def test_wait_with_already_cancelled_futures(self): + def test_first_exception_one_already_failed(self): call1 = Call(manual_finish=True) - call2 = Call() - call3 = Call() - call4 = Call() + try: + future1 = self.executor.submit(call1) + + finished, pending = futures.wait( + [EXCEPTION_FUTURE, future1], + return_when=futures.FIRST_EXCEPTION) + + self.assertEquals(set([EXCEPTION_FUTURE]), finished) + self.assertEquals(set([future1]), pending) + finally: + call1.close() + + def test_all_completed(self): + def wait_test(): + while not future1._waiters: + pass + call1.set_can() + call2.set_can() + call1 = Call(manual_finish=True) + call2 = Call(manual_finish=True) try: - fs = self.executor.run_to_futures( - [call1, call2, call3, call4], - return_when=futures.RETURN_IMMEDIATELY) - f1, f2, f3, f4 = fs - - call1.wait_on_called() - self.assertTrue(f2.cancel()) - self.assertTrue(f3.cancel()) + future1 = self.executor.submit(call1) + future2 = self.executor.submit(call2) + + t = threading.Thread(target=wait_test) + t.start() + finished, pending = futures.wait( + [future1, future2], + return_when=futures.ALL_COMPLETED) + + self.assertEquals(set([future1, future2]), finished) + self.assertEquals(set(), pending) + + + finally: + call1.close() + call2.close() + + def test_all_completed_some_already_completed(self): + def wait_test(): + while not future1._waiters: + pass + + future4.cancel() call1.set_can() - - fs.wait(return_when=futures.ALL_COMPLETED) + call2.set_can() + call3.set_can() + + self.assertTrue( + futures.process.EXTRA_QUEUED_CALLS <= 1, + 'this test assumes that future4 will be cancelled before it is ' + 'queued to run - which might not be the case if ' + 'ProcessPoolExecutor is too aggresive in scheduling futures') + call1 = Call(manual_finish=True) + call2 = Call(manual_finish=True) + call3 = Call(manual_finish=True) + call4 = Call(manual_finish=True) + try: 
+ future1 = self.executor.submit(call1) + future2 = self.executor.submit(call2) + future3 = self.executor.submit(call3) + future4 = self.executor.submit(call4) + + t = threading.Thread(target=wait_test) + t.start() + finished, pending = futures.wait( + [SUCCESSFUL_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + future1, future2, future3, future4], + return_when=futures.ALL_COMPLETED) + + self.assertEquals(set([SUCCESSFUL_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + future1, future2, future3, future4]), + finished) + self.assertEquals(set(), pending) finally: call1.close() call2.close() call3.close() call4.close() - def test_cancel_all(self): - call1 = Call(manual_finish=True) - call2 = Call() - call3 = Call() - call4 = Call() + def test_timeout(self): + def wait_test(): + while not future1._waiters: + pass + call1.set_can() + call1 = Call(manual_finish=True) + call2 = Call(manual_finish=True) try: - fs = self.executor.run_to_futures( - [call1, call2, call3, call4], - return_when=futures.RETURN_IMMEDIATELY) - f1, f2, f3, f4 = fs - - call1.wait_on_called() - self.assertRaises(futures.TimeoutError, fs.cancel, timeout=0) - call1.set_can() - fs.cancel() - - self.assertFalse(f1.cancelled()) - self.assertTrue(f2.cancelled()) - self.assertTrue(f3.cancelled()) - self.assertTrue(f4.cancelled()) + future1 = self.executor.submit(call1) + future2 = self.executor.submit(call2) + + t = threading.Thread(target=wait_test) + t.start() + finished, pending = futures.wait( + [CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + SUCCESSFUL_FUTURE, + future1, future2], + timeout=1, + return_when=futures.ALL_COMPLETED) + + self.assertEquals(set([CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + SUCCESSFUL_FUTURE, + future1]), finished) + self.assertEquals(set([future2]), pending) + + finally: call1.close() call2.close() - call3.close() - call4.close() -class ThreadPoolCancelTests(CancelTests): + +class ThreadPoolWaitTests(WaitTests): def setUp(self): - self.executor = futures.ThreadPoolExecutor(max_threads=1) + self.executor = futures.ThreadPoolExecutor(max_workers=1) def tearDown(self): - self.executor.shutdown() + self.executor.shutdown(wait=True) -class ProcessPoolCancelTests(WaitsTest): +class ProcessPoolWaitTests(WaitTests): def setUp(self): - self.executor = futures.ProcessPoolExecutor(max_processes=1) + self.executor = futures.ProcessPoolExecutor(max_workers=1) def tearDown(self): - self.executor.shutdown() - -class ExecutorTest(unittest.TestCase): - # Executor.shutdown() and context manager usage is tested by - # ExecutorShutdownTest. - def test_run_to_futures(self): - call1 = Call(result=1) - call2 = Call(result=2) - call3 = Call(manual_finish=True) - call4 = Call() - call5 = Call() + self.executor.shutdown(wait=True) + +class AsCompletedTests(unittest.TestCase): + # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout. + def test_no_timeout(self): + def wait_test(): + while not future1._waiters: + pass + call1.set_can() + call2.set_can() + call1 = Call(manual_finish=True) + call2 = Call(manual_finish=True) try: - f1, f2, f3, f4, f5 = self.executor.run_to_futures( - [call1, call2, call3, call4, call5], - return_when=futures.RETURN_IMMEDIATELY) - - call3.wait_on_called() + future1 = self.executor.submit(call1) + future2 = self.executor.submit(call2) - # ProcessPoolExecutor uses a thread to propogate results into the - # future. Calling result() ensures that the thread has done its work - # before doing the next set of checks. 
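
# Illustrative sketch (not part of the patch): the module-level wait()
# function exercised by the WaitTests above.
import futures

with futures.ThreadPoolExecutor(max_workers=2) as executor:
    f1 = executor.submit(pow, 2, 8)
    f2 = executor.submit(pow, 3, 3)
    done, not_done = futures.wait([f1, f2],
                                  timeout=5,
                                  return_when=futures.ALL_COMPLETED)
    print(sorted(f.result() for f in done))   # [27, 256] if both finished
    print(not_done)                           # set() if both finished in time
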
- f1.result() - f2.result() - - self.assertTrue(f1.done()) - self.assertFalse(f1.running()) - self.assertEqual(f1.index, 0) - - self.assertTrue(f2.done()) - self.assertFalse(f2.running()) - self.assertEqual(f2.index, 1) - - self.assertFalse(f3.done()) - self.assertTrue(f3.running()) - self.assertEqual(f3.index, 2) - - # ProcessPoolExecutor may mark some futures as running before they - # actually are so don't check these ones. - self.assertFalse(f4.done()) - self.assertEqual(f4.index, 3) - - self.assertFalse(f5.done()) - self.assertEqual(f5.index, 4) + t = threading.Thread(target=wait_test) + t.start() + completed = set(futures.as_completed( + [CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + SUCCESSFUL_FUTURE, + future1, future2])) + self.assertEquals(set( + [CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + SUCCESSFUL_FUTURE, + future1, future2]), + completed) finally: - call3.set_can() # Let the call finish executing. call1.close() call2.close() - call3.close() - call4.close() - call5.close() - def test_run_to_results(self): - call1 = Call(result=1) - call2 = Call(result=2) - call3 = Call(result=3) + def test_zero_timeout(self): + call1 = Call(manual_finish=True) try: - self.assertEqual( - list(self.executor.run_to_results([call1, call2, call3])), - [1, 2, 3]) + future1 = self.executor.submit(call1) + completed_futures = set() + try: + for future in futures.as_completed( + [CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + SUCCESSFUL_FUTURE, + future1], + timeout=0): + completed_futures.add(future) + except futures.TimeoutError: + pass + + self.assertEquals(set([CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + SUCCESSFUL_FUTURE]), + completed_futures) finally: call1.close() - call2.close() - call3.close() - def test_run_to_results_exception(self): - call1 = Call(result=1) - call2 = Call(result=2) - call3 = ExceptionCall() - try: - i = self.executor.run_to_results([call1, call2, call3]) - - self.assertEqual(i.next(), 1) - self.assertEqual(i.next(), 2) - self.assertRaises(ZeroDivisionError, i.next) - finally: - call1.close() - call2.close() - call3.close() +class ThreadPoolAsCompletedTests(AsCompletedTests): + def setUp(self): + self.executor = futures.ThreadPoolExecutor(max_workers=1) - def test_run_to_results_timeout(self): - call1 = Call(result=1) - call2 = Call(result=2) - call3 = Call(manual_finish=True) + def tearDown(self): + self.executor.shutdown(wait=True) - try: - i = self.executor.run_to_results([call1, call2, call3], timeout=1) - self.assertEqual(i.next(), 1) - self.assertEqual(i.next(), 2) - self.assertRaises(futures.TimeoutError, i.next) - call3.set_can() - finally: - call1.close() - call2.close() - call3.close() +class ProcessPoolAsCompletedTests(AsCompletedTests): + def setUp(self): + self.executor = futures.ProcessPoolExecutor(max_workers=1) + + def tearDown(self): + self.executor.shutdown(wait=True) + +class ExecutorTest(unittest.TestCase): + # Executor.shutdown() and context manager usage is tested by + # ExecutorShutdownTest. 
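
# Illustrative sketch (not part of the patch): consuming futures in
# completion order with as_completed(), as the AsCompletedTests above do.
import futures
import time

def slow_identity(x):
    time.sleep(x * 0.1)
    return x

with futures.ThreadPoolExecutor(max_workers=3) as executor:
    fs = [executor.submit(slow_identity, x) for x in (3, 1, 2)]
    for future in futures.as_completed(fs, timeout=10):
        print(future.result())      # typically 1, 2, 3 -- completion order
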
+ def test_submit(self): + future = self.executor.submit(pow, 2, 8) + self.assertEquals(256, future.result()) + + def test_submit_keyword(self): + future = self.executor.submit(mul, 2, y=8) + self.assertEquals(16, future.result()) def test_map(self): self.assertEqual( @@ -494,36 +543,142 @@ class ExecutorTest(unittest.TestCase): self.assertEqual(i.next(), (0, 1)) self.assertRaises(ZeroDivisionError, i.next) + def test_map_timeout(self): + results = [] + timeout_call = MapCall() + try: + try: + for i in self.executor.map(timeout_call, + [False, False, True], + timeout=1): + results.append(i) + except futures.TimeoutError: + pass + else: + self.fail('expected TimeoutError') + finally: + timeout_call.close() + + self.assertEquals([42, 42], results) + class ThreadPoolExecutorTest(ExecutorTest): def setUp(self): - self.executor = futures.ThreadPoolExecutor(max_threads=1) + self.executor = futures.ThreadPoolExecutor(max_workers=1) def tearDown(self): - self.executor.shutdown() + self.executor.shutdown(wait=True) class ProcessPoolExecutorTest(ExecutorTest): def setUp(self): - self.executor = futures.ProcessPoolExecutor(max_processes=1) + self.executor = futures.ProcessPoolExecutor(max_workers=1) def tearDown(self): - self.executor.shutdown() + self.executor.shutdown(wait=True) class FutureTests(unittest.TestCase): - # Future.index() is tested by ExecutorTest - # Future.cancel() is further tested by CancelTests. + def test_done_callback_with_result(self): + self.callback_result = None + def fn(callback_future): + self.callback_result = callback_future.result() + + f = Future() + f.add_done_callback(fn) + f.set_result(5) + self.assertEquals(5, self.callback_result) + + def test_done_callback_with_exception(self): + self.callback_exception = None + def fn(callback_future): + self.callback_exception = callback_future.exception() + + f = Future() + f.add_done_callback(fn) + f.set_exception(Exception('test')) + self.assertEquals(('test',), self.callback_exception.args) + + def test_done_callback_with_cancel(self): + self.was_cancelled = None + def fn(callback_future): + self.was_cancelled = callback_future.cancelled() + + f = Future() + f.add_done_callback(fn) + self.assertTrue(f.cancel()) + self.assertTrue(self.was_cancelled) + + def test_done_callback_raises(self): + LOGGER.removeHandler(STDERR_HANDLER) + logging_stream = StringIO.StringIO() + handler = logging.StreamHandler(logging_stream) + LOGGER.addHandler(handler) + try: + self.raising_was_called = False + self.fn_was_called = False + + def raising_fn(callback_future): + self.raising_was_called = True + raise Exception('doh!') + + def fn(callback_future): + self.fn_was_called = True + + f = Future() + f.add_done_callback(raising_fn) + f.add_done_callback(fn) + f.set_result(5) + self.assertTrue(self.raising_was_called) + self.assertTrue(self.fn_was_called) + self.assertTrue('Exception: doh!' 
in logging_stream.getvalue()) + finally: + LOGGER.removeHandler(handler) + LOGGER.addHandler(STDERR_HANDLER) + + def test_done_callback_already_successful(self): + self.callback_result = None + def fn(callback_future): + self.callback_result = callback_future.result() + + f = Future() + f.set_result(5) + f.add_done_callback(fn) + self.assertEquals(5, self.callback_result) + + def test_done_callback_already_failed(self): + self.callback_exception = None + def fn(callback_future): + self.callback_exception = callback_future.exception() + + f = Future() + f.set_exception(Exception('test')) + f.add_done_callback(fn) + self.assertEquals(('test',), self.callback_exception.args) + + def test_done_callback_already_cancelled(self): + self.was_cancelled = None + def fn(callback_future): + self.was_cancelled = callback_future.cancelled() + + f = Future() + self.assertTrue(f.cancel()) + f.add_done_callback(fn) + self.assertTrue(self.was_cancelled) def test_repr(self): - self.assertEqual(repr(PENDING_FUTURE), '<Future state=pending>') - self.assertEqual(repr(RUNNING_FUTURE), '<Future state=running>') - self.assertEqual(repr(CANCELLED_FUTURE), '<Future state=cancelled>') - self.assertEqual(repr(CANCELLED_AND_NOTIFIED_FUTURE), - '<Future state=cancelled>') - self.assertEqual(repr(EXCEPTION_FUTURE), - '<Future state=finished raised IOError>') - self.assertEqual(repr(SUCCESSFUL_FUTURE), - '<Future state=finished returned int>') - - create_future + self.assertTrue(re.match('<Future at 0x[0-9a-f]+L? state=pending>', + repr(PENDING_FUTURE))) + self.assertTrue(re.match('<Future at 0x[0-9a-f]+L? state=running>', + repr(RUNNING_FUTURE))) + self.assertTrue(re.match('<Future at 0x[0-9a-f]+L? state=cancelled>', + repr(CANCELLED_FUTURE))) + self.assertTrue(re.match('<Future at 0x[0-9a-f]+L? state=cancelled>', + repr(CANCELLED_AND_NOTIFIED_FUTURE))) + self.assertTrue(re.match( + '<Future at 0x[0-9a-f]+L? state=finished raised IOError>', + repr(EXCEPTION_FUTURE))) + self.assertTrue(re.match( + '<Future at 0x[0-9a-f]+L? state=finished returned int>', + repr(SUCCESSFUL_FUTURE))) + def test_cancel(self): f1 = create_future(state=PENDING) @@ -588,13 +743,11 @@ class FutureTests(unittest.TestCase): self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42) def test_result_with_success(self): + # TODO(brian@sweetapp.com): This test is timing dependant. def notification(): # Wait until the main thread is waiting for the result. time.sleep(1) - with f1._condition: - f1._state = FINISHED - f1._result = 42 - f1._condition.notify_all() + f1.set_result(42) f1 = create_future(state=PENDING) t = threading.Thread(target=notification) @@ -603,12 +756,11 @@ class FutureTests(unittest.TestCase): self.assertEquals(f1.result(timeout=5), 42) def test_result_with_cancel(self): + # TODO(brian@sweetapp.com): This test is timing dependant. def notification(): # Wait until the main thread is waiting for the result. time.sleep(1) - with f1._condition: - f1._state = CANCELLED - f1._condition.notify_all() + f1.cancel() f1 = create_future(state=PENDING) t = threading.Thread(target=notification) @@ -644,186 +796,16 @@ class FutureTests(unittest.TestCase): self.assertTrue(isinstance(f1.exception(timeout=5), IOError)) -class FutureListTests(unittest.TestCase): - # FutureList.wait() is further tested by WaitsTest. - # FutureList.cancel() is tested by CancelTests. 
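
# Sketch (not part of the patch): result() re-raises a stored exception while
# exception() returns it, matching the FutureTests above.
from futures._base import Future

f = Future()
f.set_exception(IOError('boom'))
print(repr(f.exception()))          # IOError('boom',) on Python 2
try:
    f.result()
except IOError as e:
    print('result() re-raised: %s' % e)
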
- def test_wait_RETURN_IMMEDIATELY(self): - f = futures.FutureList(futures=None, event_sink=None) - f.wait(return_when=futures.RETURN_IMMEDIATELY) - - def test_wait_timeout(self): - f = futures.FutureList([PENDING_FUTURE], - futures._base.ThreadEventSink()) - - for t in [futures.FIRST_COMPLETED, - futures.FIRST_EXCEPTION, - futures.ALL_COMPLETED]: - f.wait(timeout=0.1, return_when=t) - self.assertFalse(PENDING_FUTURE.done()) - - def test_wait_all_done(self): - f = futures.FutureList([CANCELLED_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - SUCCESSFUL_FUTURE, - EXCEPTION_FUTURE], - futures._base.ThreadEventSink()) - - f.wait(return_when=futures.ALL_COMPLETED) - - def test_filters(self): - fs = [PENDING_FUTURE, - RUNNING_FUTURE, - CANCELLED_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE, - SUCCESSFUL_FUTURE] - f = futures.FutureList(fs, None) - - self.assertEqual(list(f.running_futures()), [RUNNING_FUTURE]) - self.assertEqual(list(f.cancelled_futures()), - [CANCELLED_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE]) - self.assertEqual(list(f.done_futures()), - [CANCELLED_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE, - SUCCESSFUL_FUTURE]) - self.assertEqual(list(f.successful_futures()), - [SUCCESSFUL_FUTURE]) - self.assertEqual(list(f.exception_futures()), - [EXCEPTION_FUTURE]) - - def test_has_running_futures(self): - self.assertFalse( - futures.FutureList([PENDING_FUTURE, - CANCELLED_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - SUCCESSFUL_FUTURE, - EXCEPTION_FUTURE], - None).has_running_futures()) - self.assertTrue( - futures.FutureList([RUNNING_FUTURE], - None).has_running_futures()) - - def test_has_cancelled_futures(self): - self.assertFalse( - futures.FutureList([PENDING_FUTURE, - RUNNING_FUTURE, - SUCCESSFUL_FUTURE, - EXCEPTION_FUTURE], - None).has_cancelled_futures()) - self.assertTrue( - futures.FutureList([CANCELLED_FUTURE], - None).has_cancelled_futures()) - - self.assertTrue( - futures.FutureList([CANCELLED_AND_NOTIFIED_FUTURE], - None).has_cancelled_futures()) - - def test_has_done_futures(self): - self.assertFalse( - futures.FutureList([PENDING_FUTURE, - RUNNING_FUTURE], - None).has_done_futures()) - self.assertTrue( - futures.FutureList([CANCELLED_FUTURE], - None).has_done_futures()) - - self.assertTrue( - futures.FutureList([CANCELLED_AND_NOTIFIED_FUTURE], - None).has_done_futures()) - - self.assertTrue( - futures.FutureList([EXCEPTION_FUTURE], - None).has_done_futures()) - - self.assertTrue( - futures.FutureList([SUCCESSFUL_FUTURE], - None).has_done_futures()) - - def test_has_successful_futures(self): - self.assertFalse( - futures.FutureList([PENDING_FUTURE, - RUNNING_FUTURE, - CANCELLED_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE], - None).has_successful_futures()) - - self.assertTrue( - futures.FutureList([SUCCESSFUL_FUTURE], - None).has_successful_futures()) - - def test_has_exception_futures(self): - self.assertFalse( - futures.FutureList([PENDING_FUTURE, - RUNNING_FUTURE, - CANCELLED_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - SUCCESSFUL_FUTURE], - None).has_exception_futures()) - - self.assertTrue( - futures.FutureList([EXCEPTION_FUTURE], - None).has_exception_futures()) - - def test_get_item(self): - fs = [PENDING_FUTURE, RUNNING_FUTURE, CANCELLED_FUTURE] - f = futures.FutureList(fs, None) - self.assertEqual(f[0], PENDING_FUTURE) - self.assertEqual(f[1], RUNNING_FUTURE) - self.assertEqual(f[2], CANCELLED_FUTURE) - self.assertRaises(IndexError, f.__getitem__, 3) - - def test_len(self): - f = futures.FutureList([PENDING_FUTURE, - 
RUNNING_FUTURE, - CANCELLED_FUTURE], - None) - self.assertEqual(len(f), 3) - - def test_iter(self): - fs = [PENDING_FUTURE, RUNNING_FUTURE, CANCELLED_FUTURE] - f = futures.FutureList(fs, None) - self.assertEqual(list(iter(f)), fs) - - def test_contains(self): - f = futures.FutureList([PENDING_FUTURE, - RUNNING_FUTURE], - None) - self.assertTrue(PENDING_FUTURE in f) - self.assertTrue(RUNNING_FUTURE in f) - self.assertFalse(CANCELLED_FUTURE in f) - - def test_repr(self): - pending = create_future(state=PENDING) - cancelled = create_future(state=CANCELLED) - cancelled2 = create_future(state=CANCELLED_AND_NOTIFIED) - running = create_future(state=RUNNING) - finished = create_future(state=FINISHED) - - f = futures.FutureList( - [PENDING_FUTURE] * 4 + [CANCELLED_FUTURE] * 2 + - [CANCELLED_AND_NOTIFIED_FUTURE] + - [RUNNING_FUTURE] * 2 + - [SUCCESSFUL_FUTURE, EXCEPTION_FUTURE] * 3, - None) - - self.assertEqual(repr(f), - '<FutureList #futures=15 ' - '[#pending=4 #cancelled=3 #running=2 #finished=6]>') - def test_main(): - test_support.run_unittest(ProcessPoolCancelTests, - ThreadPoolCancelTests, - ProcessPoolExecutorTest, + test_support.run_unittest(ProcessPoolExecutorTest, ThreadPoolExecutorTest, ProcessPoolWaitTests, ThreadPoolWaitTests, + ProcessPoolAsCompletedTests, + ThreadPoolAsCompletedTests, FutureTests, - FutureListTests, ProcessPoolShutdownTest, ThreadPoolShutdownTest) if __name__ == "__main__": - test_main()
\ No newline at end of file + test_main() diff --git a/python3/crawl.py b/python3/crawl.py index 8d49e18..7135682 100644 --- a/python3/crawl.py +++ b/python3/crawl.py @@ -34,12 +34,14 @@ def download_urls_sequential(urls, timeout=60): def download_urls_with_executor(urls, executor, timeout=60): try: url_to_content = {} - fs = executor.run_to_futures( - (functools.partial(load_url, url, timeout) for url in urls), - timeout=timeout) - for future in fs.successful_futures(): - url = urls[future.index] - url_to_content[url] = future.result() + future_to_url = dict((executor.submit(load_url, url, timeout), url) + for url in urls) + + for future in futures.as_completed(future_to_url): + try: + url_to_content[future_to_url[future]] = future.result() + except: + pass return url_to_content finally: executor.shutdown() @@ -47,7 +49,7 @@ def download_urls_with_executor(urls, executor, timeout=60): def main(): for name, fn in [('sequential', functools.partial(download_urls_sequential, URLS)), - ('processes', + ('processes', functools.partial(download_urls_with_executor, URLS, futures.ProcessPoolExecutor(10))), @@ -62,4 +64,5 @@ def main(): len(url_map), len(URLS))) -main() +if __name__ == '__main__': + main() diff --git a/python3/futures/__init__.py b/python3/futures/__init__.py index 86b67dc..8331d53 100644 --- a/python3/futures/__init__.py +++ b/python3/futures/__init__.py @@ -1,12 +1,18 @@ -# Copyright 2009 Brian Quinlan. All Rights Reserved. See LICENSE file. +# Copyright 2009 Brian Quinlan. All Rights Reserved. +# Licensed to PSF under a Contributor Agreement. """Execute computations asynchronously using threads or processes.""" __author__ = 'Brian Quinlan (brian@sweetapp.com)' -from futures._base import (FIRST_COMPLETED, FIRST_EXCEPTION, - ALL_COMPLETED, RETURN_IMMEDIATELY, - CancelledError, TimeoutError, - Future, FutureList) -from futures.thread import ThreadPoolExecutor +from futures._base import (FIRST_COMPLETED, + FIRST_EXCEPTION, + ALL_COMPLETED, + CancelledError, + TimeoutError, + Future, + Executor, + wait, + as_completed) from futures.process import ProcessPoolExecutor +from futures.thread import ThreadPoolExecutor diff --git a/python3/futures/_base.py b/python3/futures/_base.py index 330423e..561f4d2 100644 --- a/python3/futures/_base.py +++ b/python3/futures/_base.py @@ -1,7 +1,9 @@ -# Copyright 2009 Brian Quinlan. All Rights Reserved. See LICENSE file. +# Copyright 2009 Brian Quinlan. All Rights Reserved. +# Licensed to PSF under a Contributor Agreement. __author__ = 'Brian Quinlan (brian@sweetapp.com)' +import collections import functools import logging import threading @@ -10,14 +12,13 @@ import time FIRST_COMPLETED = 'FIRST_COMPLETED' FIRST_EXCEPTION = 'FIRST_EXCEPTION' ALL_COMPLETED = 'ALL_COMPLETED' -RETURN_IMMEDIATELY = 'RETURN_IMMEDIATELY' # Possible future states (for internal use by the futures package). PENDING = 'PENDING' RUNNING = 'RUNNING' # The future was cancelled by the user... -CANCELLED = 'CANCELLED' -# ...and ThreadEventSink.add_cancelled() was called by a worker. +CANCELLED = 'CANCELLED' +# ...and _Waiter.add_cancelled() was called by a worker. CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED' FINISHED = 'FINISHED' @@ -39,86 +40,52 @@ _STATE_TO_DESCRIPTION_MAP = { # Logger for internal use by the futures package. LOGGER = logging.getLogger("futures") -_handler = logging.StreamHandler() -LOGGER.addHandler(_handler) -del _handler - -def set_future_exception(future, event_sink, exception): - """Sets a future as having terminated with an exception. 
- - This function should only be used within the futures package. - - Args: - future: The Future that finished with an exception. - event_sink: The ThreadEventSink accociated with the Future's FutureList. - The event_sink will be notified of the Future's completion, which - may unblock some clients that have called FutureList.wait(). - exception: The expection that executing the Future raised. - """ - with future._condition: - future._exception = exception - with event_sink._condition: - future._state = FINISHED - event_sink.add_exception() - future._condition.notify_all() - -def set_future_result(future, event_sink, result): - """Sets a future as having terminated without exception. - - This function should only be used within the futures package. - - Args: - future: The Future that completed. - event_sink: The ThreadEventSink accociated with the Future's FutureList. - The event_sink will be notified of the Future's completion, which - may unblock some clients that have called FutureList.wait(). - result: The value returned by the Future. - """ - with future._condition: - future._result = result - with event_sink._condition: - future._state = FINISHED - event_sink.add_result() - future._condition.notify_all() +STDERR_HANDLER = logging.StreamHandler() +LOGGER.addHandler(STDERR_HANDLER) class Error(Exception): + """Base class for all future-related exceptions.""" pass class CancelledError(Error): + """The Future was cancelled.""" pass class TimeoutError(Error): + """The operation exceeded the given deadline.""" pass -class _WaitTracker(object): - """Provides the event that FutureList.wait(...) blocks on. - - """ +class _Waiter(object): + """Provides the event that wait() and as_completed() block on.""" def __init__(self): self.event = threading.Event() + self.finished_futures = [] - def add_result(self): - raise NotImplementedError() + def add_result(self, future): + self.finished_futures.append(future) - def add_exception(self): - raise NotImplementedError() + def add_exception(self, future): + self.finished_futures.append(future) - def add_cancelled(self): - raise NotImplementedError() - -class _FirstCompletedWaitTracker(_WaitTracker): - """Used by wait(return_when=FIRST_COMPLETED).""" + def add_cancelled(self, future): + self.finished_futures.append(future) - def add_result(self): +class _FirstCompletedWaiter(_Waiter): + """Used by wait(return_when=FIRST_COMPLETED) and as_completed().""" + + def add_result(self, future): + super().add_result(future) self.event.set() - def add_exception(self): + def add_exception(self, future): + super().add_exception(future) self.event.set() - def add_cancelled(self): + def add_cancelled(self, future): + super().add_cancelled(future) self.event.set() -class _AllCompletedWaitTracker(_WaitTracker): +class _AllCompletedWaiter(_Waiter): """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED).""" def __init__(self, num_pending_calls, stop_on_exception): @@ -126,87 +93,196 @@ class _AllCompletedWaitTracker(_WaitTracker): self.stop_on_exception = stop_on_exception super().__init__() - def add_result(self): + def _decrement_pending_calls(self): self.num_pending_calls -= 1 if not self.num_pending_calls: self.event.set() - def add_exception(self): + def add_result(self, future): + super().add_result(future) + self._decrement_pending_calls() + + def add_exception(self, future): + super().add_exception(future) if self.stop_on_exception: self.event.set() else: - self.add_result() + self._decrement_pending_calls() + + def add_cancelled(self, future): + 
super().add_cancelled(future) + self._decrement_pending_calls() + +class _AcquireFutures(object): + """A context manager that does an ordered acquire of Future conditions.""" + + def __init__(self, futures): + self.futures = sorted(futures, key=id) + + def __enter__(self): + for future in self.futures: + future._condition.acquire() + + def __exit__(self, *args): + for future in self.futures: + future._condition.release() + +def _create_and_install_waiters(fs, return_when): + if return_when == FIRST_COMPLETED: + waiter = _FirstCompletedWaiter() + else: + pending_count = sum( + f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in fs) + + if return_when == FIRST_EXCEPTION: + waiter = _AllCompletedWaiter(pending_count, stop_on_exception=True) + elif return_when == ALL_COMPLETED: + waiter = _AllCompletedWaiter(pending_count, stop_on_exception=False) + else: + raise ValueError("Invalid return condition: %r" % return_when) - def add_cancelled(self): - self.add_result() + for f in fs: + f._waiters.append(waiter) -class ThreadEventSink(object): - """Forwards events to many _WaitTrackers. + return waiter - Each FutureList has a ThreadEventSink and each call to FutureList.wait() - causes a new _WaitTracker to be added to the ThreadEventSink. This design - allows many threads to call FutureList.wait() on the same FutureList with - different arguments. +def as_completed(fs, timeout=None): + """An iterator over the given futures that yields each as it completes. - This class should not be used by clients. + Args: + fs: The sequence of Futures (possibly created by different Executors) to + iterate over. + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. + + Returns: + An iterator that yields the given Futures as they complete (finished or + cancelled). + + Raises: + TimeoutError: If the entire result iterator could not be generated + before the given timeout. """ - def __init__(self): - self._condition = threading.Lock() - self._waiters = [] + if timeout is not None: + end_time = timeout + time.time() + + with _AcquireFutures(fs): + finished = set( + f for f in fs + if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]) + pending = set(fs) - finished + waiter = _create_and_install_waiters(fs, FIRST_COMPLETED) + + try: + for future in finished: + yield future + + while pending: + if timeout is None: + wait_timeout = None + else: + wait_timeout = end_time - time.time() + if wait_timeout < 0: + raise TimeoutError( + '%d (of %d) futures unfinished' % ( + len(pending), len(fs))) - def add(self, e): - self._waiters.append(e) + waiter.event.wait(timeout) - def remove(self, e): - self._waiters.remove(e) + for future in waiter.finished_futures[:]: + yield future + waiter.finished_futures.remove(future) + pending.remove(future) - def add_result(self): - for waiter in self._waiters: - waiter.add_result() + finally: + for f in fs: + f._waiters.remove(waiter) - def add_exception(self): - for waiter in self._waiters: - waiter.add_exception() +DoneAndNotDoneFutures = collections.namedtuple( + 'DoneAndNotDoneFutures', 'done not_done') +def wait(fs, timeout=None, return_when=ALL_COMPLETED): + """Wait for the futures in the given sequence to complete. - def add_cancelled(self): - for waiter in self._waiters: - waiter.add_cancelled() + Args: + fs: The sequence of Futures (possibly created by different Executors) to + wait upon. + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. 
+ return_when: Indicates when this function should return. The options + are: + + FIRST_COMPLETED - Return when any future finishes or is + cancelled. + FIRST_EXCEPTION - Return when any future finishes by raising an + exception. If no future raises an exception + then it is equivalent to ALL_COMPLETED. + ALL_COMPLETED - Return when all futures finish or are cancelled. + + Returns: + A named 2-tuple of sets. The first set, named 'done', contains the + futures that completed (is finished or cancelled) before the wait + completed. The second set, named 'not_done', contains uncompleted + futures. + """ + with _AcquireFutures(fs): + done = set(f for f in fs + if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]) + not_done = set(fs) - done + + if (return_when == FIRST_COMPLETED) and done: + return DoneAndNotDoneFutures(done, not_done) + elif (return_when == FIRST_EXCEPTION) and done: + if any(f for f in done + if not f.cancelled() and f.exception() is not None): + return DoneAndNotDoneFutures(done, not_done) + + if len(done) == len(fs): + return DoneAndNotDoneFutures(done, not_done) + + waiter = _create_and_install_waiters(fs, return_when) + + waiter.event.wait(timeout) + for f in fs: + f._waiters.remove(waiter) + + done.update(waiter.finished_futures) + return DoneAndNotDoneFutures(done, set(fs) - done) class Future(object): """Represents the result of an asynchronous computation.""" - # Transitions into the CANCELLED_AND_NOTIFIED and FINISHED states trigger notifications to the ThreadEventSink - # belonging to the Future's FutureList and must be made with ThreadEventSink._condition held to prevent a race - # condition when the transition is made concurrently with the addition of a new _WaitTracker to the ThreadEventSink. - # Other state transitions need only have the Future._condition held. - # When ThreadEventSink._condition and Future._condition must both be held then Future._condition is always acquired - # first. - - def __init__(self, index): + def __init__(self): """Initializes the future. Should not be called by clients.""" self._condition = threading.Condition() self._state = PENDING self._result = None self._exception = None - self._index = index + self._waiters = [] + self._done_callbacks = [] + + def _invoke_callbacks(self): + for callback in self._done_callbacks: + try: + callback(self) + except Exception: + LOGGER.exception('exception calling callback for %r', self) def __repr__(self): with self._condition: if self._state == FINISHED: if self._exception: - return '<Future state=%s raised %s>' % ( + return '<Future at %s state=%s raised %s>' % ( + hex(id(self)), _STATE_TO_DESCRIPTION_MAP[self._state], self._exception.__class__.__name__) else: - return '<Future state=%s returned %s>' % ( + return '<Future at %s state=%s returned %s>' % ( + hex(id(self)), _STATE_TO_DESCRIPTION_MAP[self._state], self._result.__class__.__name__) - return '<Future state=%s>' % _STATE_TO_DESCRIPTION_MAP[self._state] - - @property - def index(self): - """The index of the future in its FutureList.""" - return self._index + return '<Future at %s state=%s>' % ( + hex(id(self)), + _STATE_TO_DESCRIPTION_MAP[self._state]) def cancel(self): """Cancel the future if possible. 
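
A minimal usage sketch of the module-level wait() and as_completed() helpers
added above (illustrative only; it assumes the package is importable as
``futures``, uses only the API defined in this patch, and the square() helper
is just a stand-in workload)::

    import futures

    def square(x):
        return x * x

    with futures.ThreadPoolExecutor(max_workers=2) as executor:
        fs = [executor.submit(square, i) for i in range(10)]

        # wait() returns a named 2-tuple of sets: (done, not_done).
        done, not_done = futures.wait(fs, timeout=5,
                                      return_when=futures.ALL_COMPLETED)

        # as_completed() yields each future as it finishes or is cancelled.
        for f in futures.as_completed(fs):
            if not f.cancelled() and f.exception() is None:
                print(f.result())
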
@@ -218,10 +294,14 @@ class Future(object): if self._state in [RUNNING, FINISHED]: return False - if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED]: - self._state = CANCELLED - self._condition.notify_all() - return True + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + return True + + self._state = CANCELLED + self._condition.notify_all() + + self._invoke_callbacks() + return True def cancelled(self): """Return True if the future has cancelled.""" @@ -229,6 +309,7 @@ class Future(object): return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED] def running(self): + """Return True if the future is currently executing.""" with self._condition: return self._state == RUNNING @@ -243,6 +324,23 @@ class Future(object): else: return self._result + def add_done_callback(self, fn): + """Attaches a callable that will be called when the future finishes. + + Args: + fn: A callable that will be called with this future as its only + argument when the future completes or is cancelled. The callable + will always be called by a thread in the same process in which + it was added. If the future has already completed or been + cancelled then the callable will be called immediately. These + callables are called in the order that they were added. + """ + with self._condition: + if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]: + self._done_callbacks.append(fn) + return + fn(self) + def result(self, timeout=None): """Return the result of the call that the future represents. @@ -307,210 +405,109 @@ class Future(object): else: raise TimeoutError() + # The following methods should only be used by Executors and in tests. + def set_running_or_notify_cancel(self): + """Mark the future as running or process any cancel notifications. -class FutureList(object): - def __init__(self, futures, event_sink): - """Initializes the FutureList. Should not be called by clients.""" - self._futures = futures - self._event_sink = event_sink + Should only be used by Executor implementations and unit tests. - def wait(self, timeout=None, return_when=ALL_COMPLETED): - """Wait for the futures in the list to complete. + If the future has been cancelled (cancel() was called and returned + True) then any threads waiting on the future completing (though calls + to as_completed() or wait()) are notified and False is returned. - Args: - timeout: The maximum number of seconds to wait. If None, then there - is no limit on the wait time. - return_when: Indicates when the method should return. The options - are: - - FIRST_COMPLETED - Return when any future finishes or is - cancelled. - FIRST_EXCEPTION - Return when any future finishes by raising an - exception. If no future raises and exception - then it is equivalent to ALL_COMPLETED. - ALL_COMPLETED - Return when all futures finish or are cancelled. - RETURN_IMMEDIATELY - Return without waiting (this is not likely - to be a useful option but it is there to - be symmetrical with the - executor.run_to_futures() method. + If the future was not cancelled then it is put in the running state + (future calls to running() will return True) and True is returned. + + This method should be called by Executor implementations before + executing the work associated with this future. If this method returns + False then the work should not be executed. + + Returns: + False if the Future was cancelled, True otherwise. Raises: - TimeoutError: If the wait condition wasn't satisfied before the - given timeout. 
+ RuntimeError: if this method was already called or if set_result() + or set_exception() was called. """ - if return_when == RETURN_IMMEDIATELY: - return - - # Futures cannot change state without this condition being held. - with self._event_sink._condition: - # Make a quick exit if every future is already done. This check is - # necessary because, if every future is in the - # CANCELLED_AND_NOTIFIED or FINISHED state then the WaitTracker will - # never receive any events. - if all(f._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED] - for f in self): - return - - if return_when == FIRST_COMPLETED: - completed_tracker = _FirstCompletedWaitTracker() + with self._condition: + if self._state == CANCELLED: + self._state = CANCELLED_AND_NOTIFIED + for waiter in self._waiters: + waiter.add_cancelled(self) + # self._condition.notify_all() is not necessary because + # self.cancel() triggers a notification. + return False + elif self._state == PENDING: + self._state = RUNNING + return True else: - # Calculate how many events are expected before every future - # is complete. This can be done without holding the futures' - # locks because a future cannot transition itself into either - # of the states being looked for. - pending_count = sum( - f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] - for f in self) - - if return_when == FIRST_EXCEPTION: - completed_tracker = _AllCompletedWaitTracker( - pending_count, stop_on_exception=True) - elif return_when == ALL_COMPLETED: - completed_tracker = _AllCompletedWaitTracker( - pending_count, stop_on_exception=False) - - self._event_sink.add(completed_tracker) + LOGGER.critical('Future %s in unexpected state: %s', + id(self.future), + self.future._state) + raise RuntimeError('Future in unexpected state') - try: - completed_tracker.event.wait(timeout) - finally: - self._event_sink.remove(completed_tracker) + def set_result(self, result): + """Sets the return value of work associated with the future. - def cancel(self, timeout=None): - """Cancel the futures in the list. + Should only be used by Executor implementations and unit tests. + """ + with self._condition: + self._result = result + self._state = FINISHED + for waiter in self._waiters: + waiter.add_result(self) + self._condition.notify_all() + self._invoke_callbacks() - Args: - timeout: The maximum number of seconds to wait. If None, then there - is no limit on the wait time. + def set_exception(self, exception): + """Sets the result of the future as being the given exception. - Raises: - TimeoutError: If all the futures were not finished before the - given timeout. + Should only be used by Executor implementations and unit tests. 
""" - for f in self: - f.cancel() - self.wait(timeout=timeout, return_when=ALL_COMPLETED) - if any(not f.done() for f in self): - raise TimeoutError() - - def has_running_futures(self): - """Returns True if any futures in the list are still running.""" - return any(self.running_futures()) - - def has_cancelled_futures(self): - """Returns True if any futures in the list were cancelled.""" - return any(self.cancelled_futures()) - - def has_done_futures(self): - """Returns True if any futures in the list are finished or cancelled.""" - return any(self.done_futures()) - - def has_successful_futures(self): - """Returns True if any futures in the list finished without raising.""" - return any(self.successful_futures()) - - def has_exception_futures(self): - """Returns True if any futures in the list finished by raising.""" - return any(self.exception_futures()) - - def cancelled_futures(self): - """Returns all cancelled futures in the list.""" - return (f for f in self - if f._state in [CANCELLED, CANCELLED_AND_NOTIFIED]) - - def done_futures(self): - """Returns all futures in the list that are finished or cancelled.""" - return (f for f in self - if f._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]) - - def successful_futures(self): - """Returns all futures in the list that finished without raising.""" - return (f for f in self - if f._state == FINISHED and f._exception is None) - - def exception_futures(self): - """Returns all futures in the list that finished by raising.""" - return (f for f in self - if f._state == FINISHED and f._exception is not None) - - def running_futures(self): - """Returns all futures in the list that are still running.""" - return (f for f in self if f._state == RUNNING) - - def __len__(self): - return len(self._futures) - - def __getitem__(self, i): - return self._futures[i] - - def __iter__(self): - return iter(self._futures) - - def __contains__(self, future): - return future in self._futures - - def __repr__(self): - states = {state: 0 for state in _FUTURE_STATES} - for f in self: - states[f._state] += 1 - - return ('<FutureList #futures=%d ' - '[#pending=%d #cancelled=%d #running=%d #finished=%d]>' % ( - len(self), - states[PENDING], - states[CANCELLED] + states[CANCELLED_AND_NOTIFIED], - states[RUNNING], - states[FINISHED])) + with self._condition: + self._exception = exception + self._state = FINISHED + for waiter in self._waiters: + waiter.add_exception(self) + self._condition.notify_all() + self._invoke_callbacks() class Executor(object): - """This is an abstract base class for conrete asynchronous executors.""" - def run_to_futures(self, calls, timeout=None, return_when=ALL_COMPLETED): - """Return a list of futures representing the given calls. + """This is an abstract base class for concrete asynchronous executors.""" - Args: - calls: A sequence of callables that take no arguments. These will - be bound to Futures and returned. - timeout: The maximum number of seconds to wait. If None, then there - is no limit on the wait time. - return_when: Indicates when the method should return. The options - are: + def submit(self, fn, *args, **kwargs): + """Submits a callable to be executed with the given arguments. - FIRST_COMPLETED - Return when any future finishes or is - cancelled. - FIRST_EXCEPTION - Return when any future finishes by raising an - exception. If no future raises and exception - then it is equivalent to ALL_COMPLETED. - ALL_COMPLETED - Return when all futures finish or are cancelled. - RETURN_IMMEDIATELY - Return without waiting. 
+ Schedules the callable to be executed as fn(*args, **kwargs) and returns + a Future instance representing the execution of the callable. Returns: - A FutureList containing Futures for the given calls. + A Future representing the given call. """ raise NotImplementedError() - def run_to_results(self, calls, timeout=None): - """Returns a iterator of the results of the given calls. + def map(self, fn, *iterables, timeout=None): + """Returns a iterator equivalent to map(fn, iter). Args: - calls: A sequence of callables that take no arguments. These will - be called and their results returned. + fn: A callable that will take take as many arguments as there are + passed iterables. timeout: The maximum number of seconds to wait. If None, then there is no limit on the wait time. Returns: - An iterator over the results of the given calls. Equivalent to: - (call() for call in calls) but the calls may be evaluated - out-of-order. + An iterator equivalent to: map(func, *iterables) but the calls may + be evaluated out-of-order. Raises: - TimeoutError: If all the given calls were not completed before the - given timeout. - Exception: If any call() raises. + TimeoutError: If the entire result iterator could not be generated + before the given timeout. + Exception: If fn(*args) raises for any values. """ if timeout is not None: end_time = timeout + time.time() - fs = self.run_to_futures(calls, return_when=RETURN_IMMEDIATELY) + fs = [self.submit(fn, *args) for args in zip(*iterables)] try: for future in fs: @@ -519,39 +516,25 @@ class Executor(object): else: yield future.result(end_time - time.time()) finally: - try: - fs.cancel(timeout=0) - except TimeoutError: - pass + for future in fs: + future.cancel() - def map(self, func, *iterables, timeout=None): - """Returns a iterator equivalent to map(fn, iter). + def shutdown(self, wait=True): + """Clean-up the resources associated with the Executor. - Args: - func: A callable that will take take as many arguments as there - are passed iterables. - timeout: The maximum number of seconds to wait. If None, then there - is no limit on the wait time. + It is safe to call this method several times. Otherwise, no other + methods can be called after this one. - Returns: - An iterator equivalent to: map(func, *iterables) but the calls may - be evaluated out-of-order. - - Raises: - TimeoutError: If the entire result iterator could not be generated - before the given timeout. - Exception: If fn(*args) raises for any values. + Args: + wait: If True then shutdown will not return until all running + futures have finished executing and the resources used by the + executor have been reclaimed. """ - calls = [functools.partial(func, *args) for args in zip(*iterables)] - return self.run_to_results(calls, timeout) - - def shutdown(self): - """Clean-up. No other methods can be called afterwards.""" - raise NotImplementedError() + pass def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): - self.shutdown() + self.shutdown(wait=True) return False diff --git a/python3/futures/process.py b/python3/futures/process.py index e888a14..6de870f 100644 --- a/python3/futures/process.py +++ b/python3/futures/process.py @@ -1,4 +1,5 @@ -# Copyright 2009 Brian Quinlan. All Rights Reserved. See LICENSE file. +# Copyright 2009 Brian Quinlan. All Rights Reserved. +# Licensed to PSF under a Contributor Agreement. """Implements ProcessPoolExecutor. @@ -23,9 +24,8 @@ The follow diagram and text describe the data-flow through the system: | | | ... 
| | | | 3, except | | | +----------+ +------------+ +--------+ +-----------+ +---------+ -Executor.run_to_futures() called: -- creates a uniquely numbered _WorkItem for each call and adds them to the - "Work Items" dict +Executor.submit() called: +- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict - adds the id of the _WorkItem to the "Work Ids" queue Local worker thread: @@ -45,12 +45,8 @@ Process #1..n: __author__ = 'Brian Quinlan (brian@sweetapp.com)' -from futures._base import (PENDING, RUNNING, CANCELLED, - CANCELLED_AND_NOTIFIED, FINISHED, - ALL_COMPLETED, - set_future_exception, set_future_result, - Executor, Future, FutureList, ThreadEventSink) import atexit +from futures import _base import queue import multiprocessing import threading @@ -63,7 +59,7 @@ import weakref # - The workers would still be running during interpretor shutdown, # meaning that they would fail in unpredictable ways. # - The workers could be killed while evaluating a work item, which could -# be bad if the function being evaluated has external side-effects e.g. +# be bad if the callable being evaluated has external side-effects e.g. # writing to a file. # # To work around this problem, an exit handler is installed which tells the @@ -86,7 +82,7 @@ def _remove_dead_thread_references(): Should be called periodically to prevent memory leaks in scenarios such as: >>> while True: - >>> ... t = ThreadPoolExecutor(max_threads=5) + >>> ... t = ThreadPoolExecutor(max_workers=5) >>> ... t.map(int, ['1', '2', '3', '4', '5']) """ for thread_reference in set(_thread_references): @@ -100,10 +96,11 @@ def _remove_dead_thread_references(): EXTRA_QUEUED_CALLS = 1 class _WorkItem(object): - def __init__(self, call, future, completion_tracker): - self.call = call + def __init__(self, future, fn, args, kwargs): self.future = future - self.completion_tracker = completion_tracker + self.fn = fn + self.args = args + self.kwargs = kwargs class _ResultItem(object): def __init__(self, work_id, exception=None, result=None): @@ -112,9 +109,11 @@ class _ResultItem(object): self.result = result class _CallItem(object): - def __init__(self, work_id, call): + def __init__(self, work_id, fn, args, kwargs): self.work_id = work_id - self.call = call + self.fn = fn + self.args = args + self.kwargs = kwargs def _process_worker(call_queue, result_queue, shutdown): """Evaluates calls from call_queue and places the results in result_queue. 
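
To make the work-item plumbing concrete, here is a simplified round trip for a
single submitted call (a sketch only: it imports the internal classes defined
above directly and skips the work-id bookkeeping, the call/result queues and
cancellation via set_running_or_notify_cancel())::

    from futures import _base
    from futures.process import _CallItem, _ResultItem, _WorkItem

    # Submission side: the executor wraps the user call in a _WorkItem and
    # later repackages it as a _CallItem destined for the call queue.
    future = _base.Future()
    work_item = _WorkItem(future, pow, (2, 5), {})
    call_item = _CallItem(1, work_item.fn, work_item.args, work_item.kwargs)

    # Worker-process side: evaluate the call and report a _ResultItem
    # (this mirrors the _process_worker change in the next hunk).
    try:
        result = call_item.fn(*call_item.args, **call_item.kwargs)
    except BaseException as e:
        result_item = _ResultItem(call_item.work_id, exception=e)
    else:
        result_item = _ResultItem(call_item.work_id, result=result)

    # Back in the queue-management thread, the outcome is copied onto the
    # Future that submit() handed to the caller.
    if result_item.exception:
        work_item.future.set_exception(result_item.exception)
    else:
        work_item.future.set_result(result_item.result)
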
@@ -137,7 +136,7 @@ def _process_worker(call_queue, result_queue, shutdown): return else: try: - r = call_item.call() + r = call_item.fn(*call_item.args, **call_item.kwargs) except BaseException as e: result_queue.put(_ResultItem(call_item.work_id, exception=e)) @@ -172,17 +171,15 @@ def _add_call_item_to_queue(pending_work_items, else: work_item = pending_work_items[work_id] - if work_item.future.cancelled(): - with work_item.future._condition: - work_item.future._condition.notify_all() - work_item.completion_tracker.add_cancelled() + if work_item.future.set_running_or_notify_cancel(): + call_queue.put(_CallItem(work_id, + work_item.fn, + work_item.args, + work_item.kwargs), + block=True) + else: del pending_work_items[work_id] continue - else: - with work_item.future._condition: - work_item.future._state = RUNNING - call_queue.put(_CallItem(work_id, work_item.call), - block=True) def _queue_manangement_worker(executor_reference, processes, @@ -243,34 +240,30 @@ def _queue_manangement_worker(executor_reference, del pending_work_items[result_item.work_id] if result_item.exception: - set_future_exception(work_item.future, - work_item.completion_tracker, - result_item.exception) + work_item.future.set_exception(result_item.exception) else: - set_future_result(work_item.future, - work_item.completion_tracker, - result_item.result) + work_item.future.set_result(result_item.result) -class ProcessPoolExecutor(Executor): - def __init__(self, max_processes=None): +class ProcessPoolExecutor(_base.Executor): + def __init__(self, max_workers=None): """Initializes a new ProcessPoolExecutor instance. Args: - max_processes: The maximum number of processes that can be used to + max_workers: The maximum number of processes that can be used to execute the given calls. If None or not given then as many worker processes will be created as the machine has processors. """ _remove_dead_thread_references() - if max_processes is None: - self._max_processes = multiprocessing.cpu_count() + if max_workers is None: + self._max_workers = multiprocessing.cpu_count() else: - self._max_processes = max_processes + self._max_workers = max_workers # Make the call queue slightly larger than the number of processes to # prevent the worker processes from idling. But don't make it too big # because futures in the call queue cannot be cancelled. 
- self._call_queue = multiprocessing.Queue(self._max_processes + + self._call_queue = multiprocessing.Queue(self._max_workers + EXTRA_QUEUED_CALLS) self._result_queue = multiprocessing.Queue() self._work_ids = queue.Queue() @@ -300,7 +293,7 @@ class ProcessPoolExecutor(Executor): _thread_references.add(weakref.ref(self._queue_management_thread)) def _adjust_process_count(self): - for _ in range(len(self._processes), self._max_processes): + for _ in range(len(self._processes), self._max_workers): p = multiprocessing.Process( target=_process_worker, args=(self._call_queue, @@ -309,30 +302,36 @@ class ProcessPoolExecutor(Executor): p.start() self._processes.add(p) - def run_to_futures(self, calls, timeout=None, return_when=ALL_COMPLETED): + def submit(self, fn, *args, **kwargs): with self._shutdown_lock: if self._shutdown_thread: - raise RuntimeError('cannot run new futures after shutdown') + raise RuntimeError('cannot schedule new futures after shutdown') - futures = [] - event_sink = ThreadEventSink() + f = _base.Future() + w = _WorkItem(f, fn, args, kwargs) - for index, call in enumerate(calls): - f = Future(index) - self._pending_work_items[self._queue_count] = _WorkItem( - call, f, event_sink) - self._work_ids.put(self._queue_count) - futures.append(f) - self._queue_count += 1 + self._pending_work_items[self._queue_count] = w + self._work_ids.put(self._queue_count) + self._queue_count += 1 self._start_queue_management_thread() self._adjust_process_count() - fl = FutureList(futures, event_sink) - fl.wait(timeout=timeout, return_when=return_when) - return fl + return f + submit.__doc__ = _base.Executor.submit.__doc__ - def shutdown(self): + def shutdown(self, wait=True): with self._shutdown_lock: self._shutdown_thread = True + if wait: + if self._queue_management_thread: + self._queue_management_thread.join() + # To reduce the risk of openning too many files, remove references to + # objects that use file descriptors. + self._queue_management_thread = None + self._call_queue = None + self._result_queue = None + self._shutdown_process_event = None + self._processes = None + shutdown.__doc__ = _base.Executor.shutdown.__doc__ -atexit.register(_python_exit)
\ No newline at end of file +atexit.register(_python_exit) diff --git a/python3/futures/thread.py b/python3/futures/thread.py index 851a548..a2d96bf 100644 --- a/python3/futures/thread.py +++ b/python3/futures/thread.py @@ -1,16 +1,12 @@ -# Copyright 2009 Brian Quinlan. All Rights Reserved. See LICENSE file. +# Copyright 2009 Brian Quinlan. All Rights Reserved. +# Licensed to PSF under a Contributor Agreement. """Implements ThreadPoolExecutor.""" __author__ = 'Brian Quinlan (brian@sweetapp.com)' -from futures._base import (PENDING, RUNNING, CANCELLED, - CANCELLED_AND_NOTIFIED, FINISHED, - ALL_COMPLETED, - LOGGER, - set_future_exception, set_future_result, - Executor, Future, FutureList, ThreadEventSink) import atexit +from futures import _base import queue import threading import weakref @@ -22,7 +18,7 @@ import weakref # - The workers would still be running during interpretor shutdown, # meaning that they would fail in unpredictable ways. # - The workers could be killed while evaluating a work item, which could -# be bad if the function being evaluated has external side-effects e.g. +# be bad if the callable being evaluated has external side-effects e.g. # writing to a file. # # To work around this problem, an exit handler is installed which tells the @@ -45,8 +41,8 @@ def _remove_dead_thread_references(): Should be called periodically to prevent memory leaks in scenarios such as: >>> while True: - >>> ... t = ThreadPoolExecutor(max_threads=5) - >>> ... t.map(int, ['1', '2', '3', '4', '5']) + ... t = ThreadPoolExecutor(max_workers=5) + ... t.map(int, ['1', '2', '3', '4', '5']) """ for thread_reference in set(_thread_references): if thread_reference() is None: @@ -55,32 +51,22 @@ def _remove_dead_thread_references(): atexit.register(_python_exit) class _WorkItem(object): - def __init__(self, call, future, completion_tracker): - self.call = call + def __init__(self, future, fn, args, kwargs): self.future = future - self.completion_tracker = completion_tracker + self.fn = fn + self.args = args + self.kwargs = kwargs def run(self): - with self.future._condition: - if self.future._state == PENDING: - self.future._state = RUNNING - elif self.future._state == CANCELLED: - with self.completion_tracker._condition: - self.future._state = CANCELLED_AND_NOTIFIED - self.completion_tracker.add_cancelled() - return - else: - LOGGER.critical('Future %s in unexpected state: %d', - id(self.future), - self.future._state) - return + if not self.future.set_running_or_notify_cancel(): + return try: - result = self.call() + result = self.fn(*self.args, **self.kwargs) except BaseException as e: - set_future_exception(self.future, self.completion_tracker, e) + self.future.set_exception(e) else: - set_future_result(self.future, self.completion_tracker, result) + self.future.set_result(result) def _worker(executor_reference, work_queue): try: @@ -99,27 +85,41 @@ def _worker(executor_reference, work_queue): else: work_item.run() except BaseException as e: - LOGGER.critical('Exception in worker', exc_info=True) + _base.LOGGER.critical('Exception in worker', exc_info=True) -class ThreadPoolExecutor(Executor): - def __init__(self, max_threads): +class ThreadPoolExecutor(_base.Executor): + def __init__(self, max_workers): """Initializes a new ThreadPoolExecutor instance. Args: - max_threads: The maximum number of threads that can be used to + max_workers: The maximum number of threads that can be used to execute the given calls. 
""" _remove_dead_thread_references() - self._max_threads = max_threads + self._max_workers = max_workers self._work_queue = queue.Queue() self._threads = set() self._shutdown = False self._shutdown_lock = threading.Lock() + def submit(self, fn, *args, **kwargs): + with self._shutdown_lock: + if self._shutdown: + raise RuntimeError('cannot schedule new futures after shutdown') + + f = _base.Future() + w = _WorkItem(f, fn, args, kwargs) + + self._work_queue.put(w) + self._adjust_thread_count() + return f + submit.__doc__ = _base.Executor.submit.__doc__ + def _adjust_thread_count(self): - for _ in range(len(self._threads), - min(self._max_threads, self._work_queue.qsize())): + # TODO(bquinlan): Should avoid creating new threads if there are more + # idle threads than items in the work queue. + if len(self._threads) < self._max_workers: t = threading.Thread(target=_worker, args=(weakref.ref(self), self._work_queue)) t.daemon = True @@ -127,26 +127,10 @@ class ThreadPoolExecutor(Executor): self._threads.add(t) _thread_references.add(weakref.ref(t)) - def run_to_futures(self, calls, timeout=None, return_when=ALL_COMPLETED): - with self._shutdown_lock: - if self._shutdown: - raise RuntimeError('cannot run new futures after shutdown') - - futures = [] - event_sink = ThreadEventSink() - for index, call in enumerate(calls): - f = Future(index) - w = _WorkItem(call, f, event_sink) - self._work_queue.put(w) - futures.append(f) - - self._adjust_thread_count() - fl = FutureList(futures, event_sink) - fl.wait(timeout=timeout, return_when=return_when) - return fl - run_to_futures.__doc__ = Executor.run_to_futures.__doc__ - - def shutdown(self): + def shutdown(self, wait=True): with self._shutdown_lock: self._shutdown = True - shutdown.__doc__ = Executor.shutdown.__doc__ + if wait: + for t in self._threads: + t.join() + shutdown.__doc__ = _base.Executor.shutdown.__doc__ diff --git a/python3/moprocessmoproblems.py b/python3/moprocessmoproblems.py deleted file mode 100644 index 18bfc39..0000000 --- a/python3/moprocessmoproblems.py +++ /dev/null @@ -1,59 +0,0 @@ -import multiprocessing -import queue - -def _process_worker(q): - while True: - try: - something = q.get(block=True, timeout=0.1) - except queue.Empty: - return - else: - print('Grabbed item from queue:', something) - - -def _make_some_processes(q): - processes = [] - for _ in range(10): - p = multiprocessing.Process(target=_process_worker, args=(q,)) - p.start() - processes.append(p) - return processes - -def _do(i): - print('Run:', i) - q = multiprocessing.Queue() - print('Created queue') - for j in range(30): - q.put(i*30+j) - processes = _make_some_processes(q) - print('Created processes') - - while not q.empty(): - pass - print('Q is empty') - - # Without the two following commented lines, the output on Mac OS 10.5 (the - # output is as expected on Linux) will be: - # Run: 0 - # Created queue - # Grabbed item from queue: 0 - # ... - # Grabbed item from queue: 29 - # Created processes - # Q is empty - # Run: 1 - # Created queue - # Grabbed item from queue: 30 - # ... - # Grabbed item from queue: 59 - # Created processes - # Q is empty - # Run: 2 - # Created queue - # Created processes - # <no further output> -# for p in processes: -# p.join() - -for i in range(100): - _do(i)
\ No newline at end of file diff --git a/python3/primes.py b/python3/primes.py index 20b5202..5152fcb 100644 --- a/python3/primes.py +++ b/python3/primes.py @@ -43,4 +43,5 @@ def main(): else: print('%.2f seconds' % (time.time() - start)) -main()
\ No newline at end of file +if __name__ == '__main__': + main() diff --git a/python3/test_futures.py b/python3/test_futures.py index b7dee65..98eea27 100644 --- a/python3/test_futures.py +++ b/python3/test_futures.py @@ -1,17 +1,24 @@ -import test.support - -import unittest +import io +import logging +import multiprocessing +import sys import threading +import test.support import time -import multiprocessing +import unittest + +if sys.platform.startswith('win'): + import ctypes + import ctypes.wintypes import futures -import futures._base from futures._base import ( - PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future) + PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future, + LOGGER, STDERR_HANDLER, wait) +import futures.process def create_future(state=PENDING, exception=None, result=None): - f = Future(0) + f = Future() f._state = state f._exception = exception f._result = result @@ -24,68 +31,104 @@ CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED) EXCEPTION_FUTURE = create_future(state=FINISHED, exception=IOError()) SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42) +def mul(x, y): + return x * y + class Call(object): + """A call that can be submitted to a future.Executor for testing. + + The call signals when it is called and waits for an event before finishing. + """ CALL_LOCKS = {} + def _create_event(self): + if sys.platform.startswith('win'): + class SECURITY_ATTRIBUTES(ctypes.Structure): + _fields_ = [("nLength", ctypes.wintypes.DWORD), + ("lpSecurityDescriptor", ctypes.wintypes.LPVOID), + ("bInheritHandle", ctypes.wintypes.BOOL)] + + s = SECURITY_ATTRIBUTES() + s.nLength = ctypes.sizeof(s) + s.lpSecurityDescriptor = None + s.bInheritHandle = True + + handle = ctypes.windll.kernel32.CreateEventA(ctypes.pointer(s), + True, + False, + None) + assert handle is not None + return handle + else: + event = multiprocessing.Event() + self.CALL_LOCKS[id(event)] = event + return id(event) + + def _wait_on_event(self, handle): + if sys.platform.startswith('win'): + r = ctypes.windll.kernel32.WaitForSingleObject(handle, 5 * 1000) + assert r == 0 + else: + self.CALL_LOCKS[handle].wait() + + def _signal_event(self, handle): + if sys.platform.startswith('win'): + r = ctypes.windll.kernel32.SetEvent(handle) + assert r != 0 + else: + self.CALL_LOCKS[handle].set() + def __init__(self, manual_finish=False, result=42): - called_event = multiprocessing.Event() - can_finish = multiprocessing.Event() + self._called_event = self._create_event() + self._can_finish = self._create_event() self._result = result - self._called_event_id = id(called_event) - self._can_finish_event_id = id(can_finish) - - self.CALL_LOCKS[self._called_event_id] = called_event - self.CALL_LOCKS[self._can_finish_event_id] = can_finish if not manual_finish: - self._can_finish.set() - - @property - def _can_finish(self): - return self.CALL_LOCKS[self._can_finish_event_id] - - @property - def _called_event(self): - return self.CALL_LOCKS[self._called_event_id] + self._signal_event(self._can_finish) def wait_on_called(self): - self._called_event.wait() + self._wait_on_event(self._called_event) def set_can(self): - self._can_finish.set() - - def called(self): - return self._called_event.is_set() + self._signal_event(self._can_finish) def __call__(self): - if self._called_event.is_set(): print('called twice') + self._signal_event(self._called_event) + self._wait_on_event(self._can_finish) - self._called_event.set() - self._can_finish.wait() return self._result def 
close(self): - del self.CALL_LOCKS[self._called_event_id] - del self.CALL_LOCKS[self._can_finish_event_id] + self.set_can() + if sys.platform.startswith('win'): + ctypes.windll.kernel32.CloseHandle(self._called_event) + ctypes.windll.kernel32.CloseHandle(self._can_finish) + else: + del self.CALL_LOCKS[self._called_event] + del self.CALL_LOCKS[self._can_finish] class ExceptionCall(Call): def __call__(self): - assert not self._called_event.is_set(), 'already called' - - self._called_event.set() - self._can_finish.wait() + self._signal_event(self._called_event) + self._wait_on_event(self._can_finish) raise ZeroDivisionError() +class MapCall(Call): + def __init__(self, result=42): + super().__init__(manual_finish=True, result=result) + + def __call__(self, manual_finish): + if manual_finish: + super().__call__() + return self._result + class ExecutorShutdownTest(unittest.TestCase): def test_run_after_shutdown(self): - call1 = Call() - try: - self.executor.shutdown() - self.assertRaises(RuntimeError, - self.executor.run_to_futures, - [call1]) - finally: - call1.close() + self.executor.shutdown() + self.assertRaises(RuntimeError, + self.executor.submit, + pow, 2, 5) + def _start_some_futures(self): call1 = Call(manual_finish=True) @@ -93,13 +136,14 @@ class ExecutorShutdownTest(unittest.TestCase): call3 = Call(manual_finish=True) try: - self.executor.run_to_futures([call1, call2, call3], - return_when=futures.RETURN_IMMEDIATELY) - + self.executor.submit(call1) + self.executor.submit(call2) + self.executor.submit(call3) + call1.wait_on_called() call2.wait_on_called() call3.wait_on_called() - + call1.set_can() call2.set_can() call3.set_can() @@ -110,10 +154,10 @@ class ExecutorShutdownTest(unittest.TestCase): class ThreadPoolShutdownTest(ExecutorShutdownTest): def setUp(self): - self.executor = futures.ThreadPoolExecutor(max_threads=5) + self.executor = futures.ThreadPoolExecutor(max_workers=5) def tearDown(self): - self.executor.shutdown() + self.executor.shutdown(wait=True) def test_threads_terminate(self): self._start_some_futures() @@ -123,7 +167,7 @@ class ThreadPoolShutdownTest(ExecutorShutdownTest): t.join() def test_context_manager_shutdown(self): - with futures.ThreadPoolExecutor(max_threads=5) as e: + with futures.ThreadPoolExecutor(max_workers=5) as e: executor = e self.assertEqual(list(e.map(abs, range(-5, 5))), [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]) @@ -132,7 +176,7 @@ class ThreadPoolShutdownTest(ExecutorShutdownTest): t.join() def test_del_shutdown(self): - executor = futures.ThreadPoolExecutor(max_threads=5) + executor = futures.ThreadPoolExecutor(max_workers=5) executor.map(abs, range(-5, 5)) threads = executor._threads del executor @@ -142,32 +186,31 @@ class ThreadPoolShutdownTest(ExecutorShutdownTest): class ProcessPoolShutdownTest(ExecutorShutdownTest): def setUp(self): - self.executor = futures.ProcessPoolExecutor(max_processes=5) + self.executor = futures.ProcessPoolExecutor(max_workers=5) def tearDown(self): - self.executor.shutdown() + self.executor.shutdown(wait=True) def test_processes_terminate(self): self._start_some_futures() self.assertEqual(len(self.executor._processes), 5) + processes = self.executor._processes self.executor.shutdown() - self.executor._queue_management_thread.join() - for p in self.executor._processes: + for p in processes: p.join() def test_context_manager_shutdown(self): - with futures.ProcessPoolExecutor(max_processes=5) as e: + with futures.ProcessPoolExecutor(max_workers=5) as e: executor = e self.assertEqual(list(e.map(abs, range(-5, 5))), [5, 4, 3, 
2, 1, 0, 1, 2, 3, 4]) - executor._queue_management_thread.join() for p in self.executor._processes: p.join() def test_del_shutdown(self): - executor = futures.ProcessPoolExecutor(max_processes=5) + executor = futures.ProcessPoolExecutor(max_workers=5) list(executor.map(abs, range(-5, 5))) queue_management_thread = executor._queue_management_thread processes = executor._processes @@ -177,313 +220,317 @@ class ProcessPoolShutdownTest(ExecutorShutdownTest): for p in processes: p.join() -class WaitsTest(unittest.TestCase): - def test_concurrent_waits(self): - def wait_for_ALL_COMPLETED(): - fs.wait(return_when=futures.ALL_COMPLETED) - self.assertTrue(f1.done()) - self.assertTrue(f2.done()) - self.assertTrue(f3.done()) - self.assertTrue(f4.done()) - all_completed.release() - - def wait_for_FIRST_COMPLETED(): - fs.wait(return_when=futures.FIRST_COMPLETED) - self.assertTrue(f1.done()) - self.assertFalse(f2.done()) # XXX - self.assertFalse(f3.done()) - self.assertFalse(f4.done()) - first_completed.release() - - def wait_for_FIRST_EXCEPTION(): - fs.wait(return_when=futures.FIRST_EXCEPTION) - self.assertTrue(f1.done()) - self.assertTrue(f2.done()) - self.assertFalse(f3.done()) # XXX - self.assertFalse(f4.done()) - first_exception.release() - - all_completed = threading.Semaphore(0) - first_completed = threading.Semaphore(0) - first_exception = threading.Semaphore(0) +class WaitTests(unittest.TestCase): + def test_first_completed(self): + def wait_test(): + while not future1._waiters: + pass + call1.set_can() call1 = Call(manual_finish=True) - call2 = ExceptionCall(manual_finish=True) - call3 = Call(manual_finish=True) - call4 = Call() - + call2 = Call(manual_finish=True) try: - fs = self.executor.run_to_futures( - [call1, call2, call3, call4], - return_when=futures.RETURN_IMMEDIATELY) - f1, f2, f3, f4 = fs - - threads = [] - for wait_test in [wait_for_ALL_COMPLETED, - wait_for_FIRST_COMPLETED, - wait_for_FIRST_EXCEPTION]: - t = threading.Thread(target=wait_test) - t.start() - threads.append(t) - - time.sleep(1) # give threads enough time to execute wait - - call1.set_can() - first_completed.acquire() - call2.set_can() - first_exception.acquire() - call3.set_can() - all_completed.acquire() - - self.executor.shutdown() + future1 = self.executor.submit(call1) + future2 = self.executor.submit(call2) + + t = threading.Thread(target=wait_test) + t.start() + done, not_done = futures.wait( + [CANCELLED_FUTURE, future1, future2], + return_when=futures.FIRST_COMPLETED) + + self.assertEquals(set([future1]), done) + self.assertEquals(set([CANCELLED_FUTURE, future2]), not_done) finally: call1.close() call2.close() - call3.close() - call4.close() -class ThreadPoolWaitTests(WaitsTest): - def setUp(self): - self.executor = futures.ThreadPoolExecutor(max_threads=1) + def test_first_completed_one_already_completed(self): + call1 = Call(manual_finish=True) + try: + future1 = self.executor.submit(call1) - def tearDown(self): - self.executor.shutdown() + finished, pending = futures.wait( + [SUCCESSFUL_FUTURE, future1], + return_when=futures.FIRST_COMPLETED) -class ProcessPoolWaitTests(WaitsTest): - def setUp(self): - self.executor = futures.ProcessPoolExecutor(max_processes=1) + self.assertEquals(set([SUCCESSFUL_FUTURE]), finished) + self.assertEquals(set([future1]), pending) + finally: + call1.close() - def tearDown(self): - self.executor.shutdown() + def test_first_exception(self): + def wait_test(): + while not future1._waiters: + pass + call1.set_can() + call2.set_can() -class CancelTests(unittest.TestCase): - def 
test_cancel_states(self): call1 = Call(manual_finish=True) - call2 = Call() - call3 = Call() - call4 = Call() - + call2 = ExceptionCall(manual_finish=True) + call3 = Call(manual_finish=True) try: - fs = self.executor.run_to_futures( - [call1, call2, call3, call4], - return_when=futures.RETURN_IMMEDIATELY) - f1, f2, f3, f4 = fs - - call1.wait_on_called() - self.assertEqual(f1.cancel(), False) - self.assertEqual(f2.cancel(), True) - self.assertEqual(f4.cancel(), True) - self.assertEqual(f1.cancelled(), False) - self.assertEqual(f2.cancelled(), True) - self.assertEqual(f3.cancelled(), False) - self.assertEqual(f4.cancelled(), True) - self.assertEqual(f1.done(), False) - self.assertEqual(f2.done(), True) - self.assertEqual(f3.done(), False) - self.assertEqual(f4.done(), True) - - call1.set_can() - fs.wait(return_when=futures.ALL_COMPLETED) - self.assertEqual(f1.result(), 42) - self.assertRaises(futures.CancelledError, f2.result) - self.assertRaises(futures.CancelledError, f2.exception) - self.assertEqual(f3.result(), 42) - self.assertRaises(futures.CancelledError, f4.result) - self.assertRaises(futures.CancelledError, f4.exception) - - self.assertEqual(call2.called(), False) - self.assertEqual(call4.called(), False) + future1 = self.executor.submit(call1) + future2 = self.executor.submit(call2) + future3 = self.executor.submit(call3) + + t = threading.Thread(target=wait_test) + t.start() + finished, pending = futures.wait( + [future1, future2, future3], + return_when=futures.FIRST_EXCEPTION) + + self.assertEquals(set([future1, future2]), finished) + self.assertEquals(set([future3]), pending) finally: call1.close() call2.close() call3.close() - call4.close() - def test_wait_for_individual_cancel_while_waiting(self): - def end_call(): - # Wait until the main thread is waiting on the results of the - # future. 
-            time.sleep(1)
-            f2.cancel()
+    def test_first_exception_some_already_complete(self):
+        def wait_test():
+            while not future1._waiters:
+                pass
             call1.set_can()

-        call1 = Call(manual_finish=True)
-        call2 = Call()
-
+        call1 = ExceptionCall(manual_finish=True)
+        call2 = Call(manual_finish=True)
         try:
-            fs = self.executor.run_to_futures(
-                    [call1, call2],
-                    return_when=futures.RETURN_IMMEDIATELY)
-            f1, f2 = fs
-
-            call1.wait_on_called()
-            t = threading.Thread(target=end_call)
+            future1 = self.executor.submit(call1)
+            future2 = self.executor.submit(call2)
+
+            t = threading.Thread(target=wait_test)
             t.start()
-            self.assertRaises(futures.CancelledError, f2.result)
-            self.assertRaises(futures.CancelledError, f2.exception)
-            t.join()
+            finished, pending = futures.wait(
+                    [SUCCESSFUL_FUTURE,
+                     CANCELLED_FUTURE,
+                     CANCELLED_AND_NOTIFIED_FUTURE,
+                     future1, future2],
+                    return_when=futures.FIRST_EXCEPTION)
+
+            self.assertEquals(set([SUCCESSFUL_FUTURE,
+                                   CANCELLED_AND_NOTIFIED_FUTURE,
+                                   future1]), finished)
+            self.assertEquals(set([CANCELLED_FUTURE, future2]), pending)
+
+
         finally:
             call1.close()
             call2.close()

-    def test_wait_with_already_cancelled_futures(self):
+    def test_first_exception_one_already_failed(self):
         call1 = Call(manual_finish=True)
-        call2 = Call()
-        call3 = Call()
-        call4 = Call()
+        try:
+            future1 = self.executor.submit(call1)
+
+            finished, pending = futures.wait(
+                    [EXCEPTION_FUTURE, future1],
+                    return_when=futures.FIRST_EXCEPTION)
+            self.assertEquals(set([EXCEPTION_FUTURE]), finished)
+            self.assertEquals(set([future1]), pending)
+        finally:
+            call1.close()
+
+    def test_all_completed(self):
+        def wait_test():
+            while not future1._waiters:
+                pass
+            call1.set_can()
+            call2.set_can()
+
+        call1 = Call(manual_finish=True)
+        call2 = Call(manual_finish=True)
         try:
-            fs = self.executor.run_to_futures(
-                    [call1, call2, call3, call4],
-                    return_when=futures.RETURN_IMMEDIATELY)
-            f1, f2, f3, f4 = fs
-
-            call1.wait_on_called()
-            self.assertTrue(f2.cancel())
-            self.assertTrue(f3.cancel())
+            future1 = self.executor.submit(call1)
+            future2 = self.executor.submit(call2)
+
+            t = threading.Thread(target=wait_test)
+            t.start()
+            finished, pending = futures.wait(
+                    [future1, future2],
+                    return_when=futures.ALL_COMPLETED)
+
+            self.assertEquals(set([future1, future2]), finished)
+            self.assertEquals(set(), pending)
+
+
+        finally:
+            call1.close()
+            call2.close()
+
+    def test_all_completed_some_already_completed(self):
+        def wait_test():
+            while not future1._waiters:
+                pass
+
+            future4.cancel()
             call1.set_can()
-
-            fs.wait(return_when=futures.ALL_COMPLETED)
+            call2.set_can()
+            call3.set_can()
+
+        self.assertLessEqual(
+                futures.process.EXTRA_QUEUED_CALLS,
+                1,
+                'this test assumes that future4 will be cancelled before it is '
+                'queued to run - which might not be the case if '
+                'ProcessPoolExecutor is too aggresive in scheduling futures')
+        call1 = Call(manual_finish=True)
+        call2 = Call(manual_finish=True)
+        call3 = Call(manual_finish=True)
+        call4 = Call(manual_finish=True)
+        try:
+            future1 = self.executor.submit(call1)
+            future2 = self.executor.submit(call2)
+            future3 = self.executor.submit(call3)
+            future4 = self.executor.submit(call4)
+
+            t = threading.Thread(target=wait_test)
+            t.start()
+            finished, pending = futures.wait(
+                    [SUCCESSFUL_FUTURE,
+                     CANCELLED_AND_NOTIFIED_FUTURE,
+                     future1, future2, future3, future4],
+                    return_when=futures.ALL_COMPLETED)
+
+            self.assertEquals(set([SUCCESSFUL_FUTURE,
+                                   CANCELLED_AND_NOTIFIED_FUTURE,
+                                   future1, future2, future3, future4]),
+                              finished)
+            self.assertEquals(set(), pending)
         finally:
             call1.close()
             call2.close()
             call3.close()
             call4.close()

-    def test_cancel_all(self):
-        call1 = Call(manual_finish=True)
-        call2 = Call()
-        call3 = Call()
-        call4 = Call()
+    def test_timeout(self):
+        def wait_test():
+            while not future1._waiters:
+                pass
+            call1.set_can()
+        call1 = Call(manual_finish=True)
+        call2 = Call(manual_finish=True)
         try:
-            fs = self.executor.run_to_futures(
-                    [call1, call2, call3, call4],
-                    return_when=futures.RETURN_IMMEDIATELY)
-            f1, f2, f3, f4 = fs
-
-            call1.wait_on_called()
-            self.assertRaises(futures.TimeoutError, fs.cancel, timeout=0)
-            call1.set_can()
-            fs.cancel()
-
-            self.assertFalse(f1.cancelled())
-            self.assertTrue(f2.cancelled())
-            self.assertTrue(f3.cancelled())
-            self.assertTrue(f4.cancelled())
+            future1 = self.executor.submit(call1)
+            future2 = self.executor.submit(call2)
+
+            t = threading.Thread(target=wait_test)
+            t.start()
+            finished, pending = futures.wait(
+                    [CANCELLED_AND_NOTIFIED_FUTURE,
+                     EXCEPTION_FUTURE,
+                     SUCCESSFUL_FUTURE,
+                     future1, future2],
+                    timeout=1,
+                    return_when=futures.ALL_COMPLETED)
+
+            self.assertEquals(set([CANCELLED_AND_NOTIFIED_FUTURE,
+                                   EXCEPTION_FUTURE,
+                                   SUCCESSFUL_FUTURE,
+                                   future1]), finished)
+            self.assertEquals(set([future2]), pending)
+
+
         finally:
             call1.close()
             call2.close()
-            call3.close()
-            call4.close()

-class ThreadPoolCancelTests(CancelTests):
+
+class ThreadPoolWaitTests(WaitTests):
     def setUp(self):
-        self.executor = futures.ThreadPoolExecutor(max_threads=1)
+        self.executor = futures.ThreadPoolExecutor(max_workers=1)

     def tearDown(self):
-        self.executor.shutdown()
+        self.executor.shutdown(wait=True)

-class ProcessPoolCancelTests(WaitsTest):
+class ProcessPoolWaitTests(WaitTests):
     def setUp(self):
-        self.executor = futures.ProcessPoolExecutor(max_processes=1)
+        self.executor = futures.ProcessPoolExecutor(max_workers=1)

     def tearDown(self):
-        self.executor.shutdown()
-
-class ExecutorTest(unittest.TestCase):
-    # Executor.shutdown() and context manager usage is tested by
-    # ExecutorShutdownTest.
-    def test_run_to_futures(self):
-        call1 = Call(result=1)
-        call2 = Call(result=2)
-        call3 = Call(manual_finish=True)
-        call4 = Call()
-        call5 = Call()
+        self.executor.shutdown(wait=True)
+
+class AsCompletedTests(unittest.TestCase):
+    # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout.
+    def test_no_timeout(self):
+        def wait_test():
+            while not future1._waiters:
+                pass
+            call1.set_can()
+            call2.set_can()
+        call1 = Call(manual_finish=True)
+        call2 = Call(manual_finish=True)
         try:
-            f1, f2, f3, f4, f5 = self.executor.run_to_futures(
-                    [call1, call2, call3, call4, call5],
-                    return_when=futures.RETURN_IMMEDIATELY)
-
-            call3.wait_on_called()
+            future1 = self.executor.submit(call1)
+            future2 = self.executor.submit(call2)

-            # ProcessPoolExecutor uses a thread to propogate results into the
-            # future. Calling result() ensures that the thread has done its work
-            # before doing the next set of checks.
-            f1.result()
-            f2.result()
-
-            self.assertTrue(f1.done())
-            self.assertFalse(f1.running())
-            self.assertEqual(f1.index, 0)
-
-            self.assertTrue(f2.done())
-            self.assertFalse(f2.running())
-            self.assertEqual(f2.index, 1)
-
-            self.assertFalse(f3.done())
-            self.assertTrue(f3.running())
-            self.assertEqual(f3.index, 2)
-
-            # ProcessPoolExecutor may mark some futures as running before they
-            # actually are so don't check these ones.
-            self.assertFalse(f4.done())
-            self.assertEqual(f4.index, 3)
-
-            self.assertFalse(f5.done())
-            self.assertEqual(f5.index, 4)
+            t = threading.Thread(target=wait_test)
+            t.start()
+            completed = set(futures.as_completed(
+                    [CANCELLED_AND_NOTIFIED_FUTURE,
+                     EXCEPTION_FUTURE,
+                     SUCCESSFUL_FUTURE,
+                     future1, future2]))
+            self.assertEquals(set(
+                    [CANCELLED_AND_NOTIFIED_FUTURE,
+                     EXCEPTION_FUTURE,
+                     SUCCESSFUL_FUTURE,
+                     future1, future2]),
+                    completed)
         finally:
-            call3.set_can()  # Let the call finish executing.
             call1.close()
             call2.close()
-            call3.close()
-            call4.close()
-            call5.close()

-    def test_run_to_results(self):
-        call1 = Call(result=1)
-        call2 = Call(result=2)
-        call3 = Call(result=3)
+    def test_zero_timeout(self):
+        call1 = Call(manual_finish=True)
         try:
-            self.assertEqual(
-                    list(self.executor.run_to_results([call1, call2, call3])),
-                    [1, 2, 3])
+            future1 = self.executor.submit(call1)
+            completed_futures = set()
+            try:
+                for future in futures.as_completed(
+                        [CANCELLED_AND_NOTIFIED_FUTURE,
+                         EXCEPTION_FUTURE,
+                         SUCCESSFUL_FUTURE,
+                         future1],
+                        timeout=0):
+                    completed_futures.add(future)
+            except futures.TimeoutError:
+                pass
+
+            self.assertEquals(set([CANCELLED_AND_NOTIFIED_FUTURE,
+                                   EXCEPTION_FUTURE,
+                                   SUCCESSFUL_FUTURE]),
+                              completed_futures)
         finally:
             call1.close()
-            call2.close()
-            call3.close()

-    def test_run_to_results_exception(self):
-        call1 = Call(result=1)
-        call2 = Call(result=2)
-        call3 = ExceptionCall()
-        try:
-            i = self.executor.run_to_results([call1, call2, call3])
-
-            self.assertEqual(i.__next__(), 1)
-            self.assertEqual(i.__next__(), 2)
-            self.assertRaises(ZeroDivisionError, i.__next__)
-        finally:
-            call1.close()
-            call2.close()
-            call3.close()
+class ThreadPoolAsCompletedTests(AsCompletedTests):
+    def setUp(self):
+        self.executor = futures.ThreadPoolExecutor(max_workers=1)

-    def test_run_to_results_timeout(self):
-        call1 = Call(result=1)
-        call2 = Call(result=2)
-        call3 = Call(manual_finish=True)
+    def tearDown(self):
+        self.executor.shutdown(wait=True)

-        try:
-            i = self.executor.run_to_results([call1, call2, call3], timeout=1)
-            self.assertEqual(i.__next__(), 1)
-            self.assertEqual(i.__next__(), 2)
-            self.assertRaises(futures.TimeoutError, i.__next__)
-            call3.set_can()
-        finally:
-            call1.close()
-            call2.close()
-            call3.close()
+class ProcessPoolAsCompletedTests(AsCompletedTests):
+    def setUp(self):
+        self.executor = futures.ProcessPoolExecutor(max_workers=1)
+
+    def tearDown(self):
+        self.executor.shutdown(wait=True)
+
+class ExecutorTest(unittest.TestCase):
+    # Executor.shutdown() and context manager usage is tested by
+    # ExecutorShutdownTest.
+    def test_submit(self):
+        future = self.executor.submit(pow, 2, 8)
+        self.assertEquals(256, future.result())
+
+    def test_submit_keyword(self):
+        future = self.executor.submit(mul, 2, y=8)
+        self.assertEquals(16, future.result())

     def test_map(self):
         self.assertEqual(
@@ -496,36 +543,150 @@ class ExecutorTest(unittest.TestCase):
         self.assertEqual(i.__next__(), (0, 1))
         self.assertRaises(ZeroDivisionError, i.__next__)

+    def test_map_timeout(self):
+        results = []
+        timeout_call = MapCall()
+        try:
+            try:
+                for i in self.executor.map(timeout_call,
+                                           [False, False, True],
+                                           timeout=1):
+                    results.append(i)
+            except futures.TimeoutError:
+                pass
+            else:
+                self.fail('expected TimeoutError')
+        finally:
+            timeout_call.close()
+
+        self.assertEquals([42, 42], results)
+
 class ThreadPoolExecutorTest(ExecutorTest):
     def setUp(self):
-        self.executor = futures.ThreadPoolExecutor(max_threads=1)
+        self.executor = futures.ThreadPoolExecutor(max_workers=1)

     def tearDown(self):
-        self.executor.shutdown()
+        self.executor.shutdown(wait=True)

 class ProcessPoolExecutorTest(ExecutorTest):
     def setUp(self):
-        self.executor = futures.ProcessPoolExecutor(max_processes=1)
+        self.executor = futures.ProcessPoolExecutor(max_workers=1)

     def tearDown(self):
-        self.executor.shutdown()
+        self.executor.shutdown(wait=True)

 class FutureTests(unittest.TestCase):
-    # Future.index() is tested by ExecutorTest
-    # Future.cancel() is further tested by CancelTests.
+    def test_done_callback_with_result(self):
+        callback_result = None
+        def fn(callback_future):
+            nonlocal callback_result
+            callback_result = callback_future.result()
+
+        f = Future()
+        f.add_done_callback(fn)
+        f.set_result(5)
+        self.assertEquals(5, callback_result)
+
+    def test_done_callback_with_exception(self):
+        callback_exception = None
+        def fn(callback_future):
+            nonlocal callback_exception
+            callback_exception = callback_future.exception()
+
+        f = Future()
+        f.add_done_callback(fn)
+        f.set_exception(Exception('test'))
+        self.assertEquals(('test',), callback_exception.args)
+
+    def test_done_callback_with_cancel(self):
+        was_cancelled = None
+        def fn(callback_future):
+            nonlocal was_cancelled
+            was_cancelled = callback_future.cancelled()
+
+        f = Future()
+        f.add_done_callback(fn)
+        self.assertTrue(f.cancel())
+        self.assertTrue(was_cancelled)
+
+    def test_done_callback_raises(self):
+        LOGGER.removeHandler(STDERR_HANDLER)
+        logging_stream = io.StringIO()
+        handler = logging.StreamHandler(logging_stream)
+        LOGGER.addHandler(handler)
+        try:
+            raising_was_called = False
+            fn_was_called = False
+
+            def raising_fn(callback_future):
+                nonlocal raising_was_called
+                raising_was_called = True
+                raise Exception('doh!')
+
+            def fn(callback_future):
+                nonlocal fn_was_called
+                fn_was_called = True
+
+            f = Future()
+            f.add_done_callback(raising_fn)
+            f.add_done_callback(fn)
+            f.set_result(5)
+            self.assertTrue(raising_was_called)
+            self.assertTrue(fn_was_called)
+            self.assertIn('Exception: doh!', logging_stream.getvalue())
+        finally:
+            LOGGER.removeHandler(handler)
+            LOGGER.addHandler(STDERR_HANDLER)
+
+    def test_done_callback_already_successful(self):
+        callback_result = None
+        def fn(callback_future):
+            nonlocal callback_result
+            callback_result = callback_future.result()
+
+        f = Future()
+        f.set_result(5)
+        f.add_done_callback(fn)
+        self.assertEquals(5, callback_result)
+
+    def test_done_callback_already_failed(self):
+        callback_exception = None
+        def fn(callback_future):
+            nonlocal callback_exception
+            callback_exception = callback_future.exception()
+
+        f = Future()
+        f.set_exception(Exception('test'))
+        f.add_done_callback(fn)
+        self.assertEquals(('test',), callback_exception.args)
+
+    def test_done_callback_already_cancelled(self):
+        was_cancelled = None
+        def fn(callback_future):
+            nonlocal was_cancelled
+            was_cancelled = callback_future.cancelled()
+
+        f = Future()
+        self.assertTrue(f.cancel())
+        f.add_done_callback(fn)
+        self.assertTrue(was_cancelled)

     def test_repr(self):
-        self.assertEqual(repr(PENDING_FUTURE), '<Future state=pending>')
-        self.assertEqual(repr(RUNNING_FUTURE), '<Future state=running>')
-        self.assertEqual(repr(CANCELLED_FUTURE), '<Future state=cancelled>')
-        self.assertEqual(repr(CANCELLED_AND_NOTIFIED_FUTURE),
-                         '<Future state=cancelled>')
-        self.assertEqual(repr(EXCEPTION_FUTURE),
-                         '<Future state=finished raised IOError>')
-        self.assertEqual(repr(SUCCESSFUL_FUTURE),
-                         '<Future state=finished returned int>')
-
-        create_future
+        self.assertRegexpMatches(repr(PENDING_FUTURE),
+                                 '<Future at 0x[0-9a-f]+ state=pending>')
+        self.assertRegexpMatches(repr(RUNNING_FUTURE),
+                                 '<Future at 0x[0-9a-f]+ state=running>')
+        self.assertRegexpMatches(repr(CANCELLED_FUTURE),
+                                 '<Future at 0x[0-9a-f]+ state=cancelled>')
+        self.assertRegexpMatches(repr(CANCELLED_AND_NOTIFIED_FUTURE),
+                                 '<Future at 0x[0-9a-f]+ state=cancelled>')
+        self.assertRegexpMatches(
+            repr(EXCEPTION_FUTURE),
+            '<Future at 0x[0-9a-f]+ state=finished raised IOError>')
+        self.assertRegexpMatches(
+            repr(SUCCESSFUL_FUTURE),
+            '<Future at 0x[0-9a-f]+ state=finished returned int>')
+
     def test_cancel(self):
         f1 = create_future(state=PENDING)
@@ -590,13 +751,11 @@ class FutureTests(unittest.TestCase):
         self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42)

     def test_result_with_success(self):
+        # TODO(brian@sweetapp.com): This test is timing dependant.
         def notification():
             # Wait until the main thread is waiting for the result.
             time.sleep(1)
-            with f1._condition:
-                f1._state = FINISHED
-                f1._result = 42
-                f1._condition.notify_all()
+            f1.set_result(42)

         f1 = create_future(state=PENDING)
         t = threading.Thread(target=notification)
@@ -605,12 +764,11 @@ class FutureTests(unittest.TestCase):
         self.assertEquals(f1.result(timeout=5), 42)

     def test_result_with_cancel(self):
+        # TODO(brian@sweetapp.com): This test is timing dependant.
         def notification():
             # Wait until the main thread is waiting for the result.
             time.sleep(1)
-            with f1._condition:
-                f1._state = CANCELLED
-                f1._condition.notify_all()
+            f1.cancel()

         f1 = create_future(state=PENDING)
         t = threading.Thread(target=notification)
@@ -646,186 +804,16 @@ class FutureTests(unittest.TestCase):
         self.assertTrue(isinstance(f1.exception(timeout=5), IOError))

-class FutureListTests(unittest.TestCase):
-    # FutureList.wait() is further tested by WaitsTest.
-    # FutureList.cancel() is tested by CancelTests.
-    def test_wait_RETURN_IMMEDIATELY(self):
-        f = futures.FutureList(futures=None, event_sink=None)
-        f.wait(return_when=futures.RETURN_IMMEDIATELY)
-
-    def test_wait_timeout(self):
-        f = futures.FutureList([PENDING_FUTURE],
-                               futures._base.ThreadEventSink())
-
-        for t in [futures.FIRST_COMPLETED,
-                  futures.FIRST_EXCEPTION,
-                  futures.ALL_COMPLETED]:
-            f.wait(timeout=0.1, return_when=t)
-            self.assertFalse(PENDING_FUTURE.done())
-
-    def test_wait_all_done(self):
-        f = futures.FutureList([CANCELLED_FUTURE,
-                                CANCELLED_AND_NOTIFIED_FUTURE,
-                                SUCCESSFUL_FUTURE,
-                                EXCEPTION_FUTURE],
-                               futures._base.ThreadEventSink())
-
-        f.wait(return_when=futures.ALL_COMPLETED)
-
-    def test_filters(self):
-        fs = [PENDING_FUTURE,
-              RUNNING_FUTURE,
-              CANCELLED_FUTURE,
-              CANCELLED_AND_NOTIFIED_FUTURE,
-              EXCEPTION_FUTURE,
-              SUCCESSFUL_FUTURE]
-        f = futures.FutureList(fs, None)
-
-        self.assertEqual(list(f.running_futures()), [RUNNING_FUTURE])
-        self.assertEqual(list(f.cancelled_futures()),
-                         [CANCELLED_FUTURE,
-                          CANCELLED_AND_NOTIFIED_FUTURE])
-        self.assertEqual(list(f.done_futures()),
-                         [CANCELLED_FUTURE,
-                          CANCELLED_AND_NOTIFIED_FUTURE,
-                          EXCEPTION_FUTURE,
-                          SUCCESSFUL_FUTURE])
-        self.assertEqual(list(f.successful_futures()),
-                         [SUCCESSFUL_FUTURE])
-        self.assertEqual(list(f.exception_futures()),
-                         [EXCEPTION_FUTURE])
-
-    def test_has_running_futures(self):
-        self.assertFalse(
-                futures.FutureList([PENDING_FUTURE,
-                                    CANCELLED_FUTURE,
-                                    CANCELLED_AND_NOTIFIED_FUTURE,
-                                    SUCCESSFUL_FUTURE,
-                                    EXCEPTION_FUTURE],
-                                   None).has_running_futures())
-        self.assertTrue(
-                futures.FutureList([RUNNING_FUTURE],
-                                   None).has_running_futures())
-
-    def test_has_cancelled_futures(self):
-        self.assertFalse(
-                futures.FutureList([PENDING_FUTURE,
-                                    RUNNING_FUTURE,
-                                    SUCCESSFUL_FUTURE,
-                                    EXCEPTION_FUTURE],
-                                   None).has_cancelled_futures())
-        self.assertTrue(
-                futures.FutureList([CANCELLED_FUTURE],
-                                   None).has_cancelled_futures())
-
-        self.assertTrue(
-                futures.FutureList([CANCELLED_AND_NOTIFIED_FUTURE],
-                                   None).has_cancelled_futures())
-
-    def test_has_done_futures(self):
-        self.assertFalse(
-                futures.FutureList([PENDING_FUTURE,
-                                    RUNNING_FUTURE],
-                                   None).has_done_futures())
-        self.assertTrue(
-                futures.FutureList([CANCELLED_FUTURE],
-                                   None).has_done_futures())
-
-        self.assertTrue(
-                futures.FutureList([CANCELLED_AND_NOTIFIED_FUTURE],
-                                   None).has_done_futures())
-
-        self.assertTrue(
-                futures.FutureList([EXCEPTION_FUTURE],
-                                   None).has_done_futures())
-
-        self.assertTrue(
-                futures.FutureList([SUCCESSFUL_FUTURE],
-                                   None).has_done_futures())
-
-    def test_has_successful_futures(self):
-        self.assertFalse(
-                futures.FutureList([PENDING_FUTURE,
-                                    RUNNING_FUTURE,
-                                    CANCELLED_FUTURE,
-                                    CANCELLED_AND_NOTIFIED_FUTURE,
-                                    EXCEPTION_FUTURE],
-                                   None).has_successful_futures())
-
-        self.assertTrue(
-                futures.FutureList([SUCCESSFUL_FUTURE],
-                                   None).has_successful_futures())
-
-    def test_has_exception_futures(self):
-        self.assertFalse(
-                futures.FutureList([PENDING_FUTURE,
-                                    RUNNING_FUTURE,
-                                    CANCELLED_FUTURE,
-                                    CANCELLED_AND_NOTIFIED_FUTURE,
-                                    SUCCESSFUL_FUTURE],
-                                   None).has_exception_futures())
-
-        self.assertTrue(
-                futures.FutureList([EXCEPTION_FUTURE],
-                                   None).has_exception_futures())
-
-    def test_get_item(self):
-        fs = [PENDING_FUTURE, RUNNING_FUTURE, CANCELLED_FUTURE]
-        f = futures.FutureList(fs, None)
-        self.assertEqual(f[0], PENDING_FUTURE)
-        self.assertEqual(f[1], RUNNING_FUTURE)
-        self.assertEqual(f[2], CANCELLED_FUTURE)
-        self.assertRaises(IndexError, f.__getitem__, 3)
-
-    def test_len(self):
-        f = futures.FutureList([PENDING_FUTURE,
-                                RUNNING_FUTURE,
-                                CANCELLED_FUTURE],
-                               None)
-        self.assertEqual(len(f), 3)
-
-    def test_iter(self):
-        fs = [PENDING_FUTURE, RUNNING_FUTURE, CANCELLED_FUTURE]
-        f = futures.FutureList(fs, None)
-        self.assertEqual(list(iter(f)), fs)
-
-    def test_contains(self):
-        f = futures.FutureList([PENDING_FUTURE,
-                                RUNNING_FUTURE],
-                               None)
-        self.assertTrue(PENDING_FUTURE in f)
-        self.assertTrue(RUNNING_FUTURE in f)
-        self.assertFalse(CANCELLED_FUTURE in f)
-
-    def test_repr(self):
-        pending = create_future(state=PENDING)
-        cancelled = create_future(state=CANCELLED)
-        cancelled2 = create_future(state=CANCELLED_AND_NOTIFIED)
-        running = create_future(state=RUNNING)
-        finished = create_future(state=FINISHED)
-
-        f = futures.FutureList(
-                [PENDING_FUTURE] * 4 + [CANCELLED_FUTURE] * 2 +
-                [CANCELLED_AND_NOTIFIED_FUTURE] +
-                [RUNNING_FUTURE] * 2 +
-                [SUCCESSFUL_FUTURE, EXCEPTION_FUTURE] * 3,
-                None)
-
-        self.assertEqual(repr(f),
-                         '<FutureList #futures=15 '
-                         '[#pending=4 #cancelled=3 #running=2 #finished=6]>')
-
 def test_main():
-    test.support.run_unittest(ProcessPoolCancelTests,
-                              ThreadPoolCancelTests,
-                              ProcessPoolExecutorTest,
+    test.support.run_unittest(ProcessPoolExecutorTest,
                               ThreadPoolExecutorTest,
                               ProcessPoolWaitTests,
                               ThreadPoolWaitTests,
+                              ProcessPoolAsCompletedTests,
+                              ThreadPoolAsCompletedTests,
                               FutureTests,
-                              FutureListTests,
                               ProcessPoolShutdownTest,
                               ThreadPoolShutdownTest)

 if __name__ == "__main__":
-    test_main()
\ No newline at end of file
+    test_main()
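
The rewritten tests above exercise the interface that replaces run_to_futures(),
run_to_results() and FutureList: calls are scheduled individually with
Executor.submit(), which returns a Future, and groups of futures are handled by
the module-level futures.wait() and futures.as_completed() functions. A minimal
usage sketch of that API (illustrative only, not part of this commit; the
work() function and the max_workers value are assumptions)::

    import futures

    def work(x):
        return x * x

    with futures.ThreadPoolExecutor(max_workers=2) as executor:
        fs = [executor.submit(work, i) for i in range(4)]

        # wait() blocks until the condition named by return_when holds and
        # returns two sets: the futures that completed and those that did not.
        done, not_done = futures.wait(fs, return_when=futures.ALL_COMPLETED)

        # as_completed() yields futures in the order they finish; result()
        # re-raises any exception stored on the future.
        for future in futures.as_completed(fs, timeout=60):
            print(future.result())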
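
The new FutureTests also cover the Future methods introduced here:
set_result() and set_exception() complete a future directly, and
add_done_callback() registers a callable that runs when the future is
cancelled or finishes (or immediately, if it is already done). A sketch of
that pattern, again illustrative rather than taken from the commit::

    import futures

    def report(future):
        # The callback receives the finished future, which may have been
        # cancelled, raised an exception, or produced a result.
        if future.cancelled():
            print('cancelled')
        elif future.exception() is not None:
            print('failed: %r' % future.exception())
        else:
            print('result: %r' % future.result())

    with futures.ThreadPoolExecutor(max_workers=1) as executor:
        f = executor.submit(pow, 2, 8)
        f.add_done_callback(report)  # eventually prints 'result: 256'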