diff options
author | brian.quinlan <devnull@localhost> | 2010-06-01 15:03:43 +0000 |
---|---|---|
committer | brian.quinlan <devnull@localhost> | 2010-06-01 15:03:43 +0000 |
commit | dc546660bb529d1cbeb80d3f6b918f7f687ffb51 (patch) | |
tree | b5ccb5d2a78eef81d762bab75ca3d85d4d84a8c9 | |
parent | 9661177078488305ddf2d36cd849f9330775377c (diff) | |
download | futures-dc546660bb529d1cbeb80d3f6b918f7f687ffb51.tar.gz |
Changes based on the last round of python-dev feedback.
-rw-r--r-- | docs/index.rst | 67 | ||||
-rw-r--r-- | python3/futures/_base.py | 78 | ||||
-rw-r--r-- | python3/futures/process.py | 2 | ||||
-rw-r--r-- | python3/futures/thread.py | 2 | ||||
-rw-r--r-- | python3/test_futures.py | 26 |
5 files changed, 71 insertions, 104 deletions
diff --git a/docs/index.rst b/docs/index.rst index 4078fe6..e7d4378 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -5,7 +5,7 @@ :synopsis: Execute computations asynchronously using threads or processes. The :mod:`futures` module provides a high-level interface for asynchronously -executing functions and methods. +executing callables. The asynchronous execution can be be performed by threads using :class:`ThreadPoolExecutor` or seperate processes using @@ -22,7 +22,7 @@ subclasses: :class:`ThreadPoolExecutor` and :class:`ProcessPoolExecutor`. .. method:: Executor.submit(fn, *args, **kwargs) Schedules the callable to be executed as *fn*(*\*args*, *\*\*kwargs*) and - returns a :class:`Future` representing the execution of the function. + returns a :class:`Future` representing the execution of the callable. :: @@ -48,13 +48,17 @@ subclasses: :class:`ThreadPoolExecutor` and :class:`ProcessPoolExecutor`. :meth:`Executor.submit` and :meth:`Executor.map` made after shutdown will raise :exc:`RuntimeError`. - If *wait* is `True` then the executor will not return until all the pending + If *wait* is `True` then this method will not return until all the pending futures are done executing and the resources associated with the executor - have been freed. + have been freed. If *wait* is `False` then this method will return + immediately and the resources associated with the executor will be freed + when all pending futures are done executing. Regardless of the value of + *wait*, the entire Python program will not exit until all pending futures + are done executing. - You can avoid having to call this method explicitly if you use the `with` - statement, which will shutdown the `Executor` (waiting as if - `Executor.shutdown` were called with *wait* set to `True`): + You can avoid having to call this method explicitly if you use the `with` + statement, which will shutdown the `Executor` (waiting as if + `Executor.shutdown` were called with *wait* set to `True`): :: @@ -195,9 +199,8 @@ ProcessPoolExecutor Example Future Objects -------------- -The :class:`Future` class encapulates the asynchronous execution of a function -or method call. :class:`Future` instances are created by -:meth:`Executor.submit`. +The :class:`Future` class encapulates the asynchronous execution of a callable. +:class:`Future` instances are created by :meth:`Executor.submit`. .. method:: Future.cancel() @@ -246,20 +249,13 @@ or method call. :class:`Future` instances are created by .. method:: Future.add_done_callback(fn) - Attaches the function *fn* to the future. *fn* will be called, with the + Attaches the callable *fn* to the future. *fn* will be called, with the future as its only argument, when the future is cancelled or finishes - running. + running. Added callables are called in the order that they were added and are + always called in a thread belonging to the process that added them. If the future has already completed or been cancelled then *fn* will be - called immediately. If the same function is added several times then it will - still only be called once. - -.. method:: Future.remove_done_callback(fn) - - Removes the function *fn*, which was previously attached to the future using - :meth:`Future.add_done_callback`. - - `KeyError` is raised if the function *fn* was not previously attached. + called immediately. Internal Future Methods ^^^^^^^^^^^^^^^^^^^^^^^ @@ -299,17 +295,18 @@ Module Functions .. function:: wait(fs, timeout=None, return_when=ALL_COMPLETED) - Wait for the :class:`Future` instances given by *fs* to complete. Returns a - named 2-tuple of sets. The first set, named "finished", contains the futures - that completed (finished or were cancelled) before the wait completed. The - second set, named "not_finished", contains uncompleted futures. + Wait for the :class:`Future` instances (possibly created by different + :class:`Executor`s) given by *fs* to complete. Returns a named 2-tuple of + sets. The first set, named "done", contains the futures that completed + (finished or were cancelled) before the wait completed. The second set, named + "not_done", contains uncompleted futures. *timeout* can be used to control the maximum number of seconds to wait before returning. *timeout* can be an int or float. If *timeout* is not specified or ``None`` then there is no limit to the wait time. - *return_when* indicates when the method should return. It must be one of the - following constants: + *return_when* indicates when this function should return. It must be one of + the following constants: +-----------------------------+----------------------------------------+ | Constant | Description | @@ -329,11 +326,11 @@ Module Functions .. function:: as_completed(fs, timeout=None) - Returns an iterator over the :class:`Future` instances given by *fs* that - yields futures as they complete (finished or were cancelled). Any futures - that completed before :func:`as_completed()` was called will be yielded - first. The returned iterator raises a :exc:`TimeoutError` if - :meth:`__next__()` is called and the result isn't available after - *timeout* seconds from the original call to :func:`as_completed()`. *timeout* - can be an int or float. If *timeout* is not specified or ``None`` then there - is no limit to the wait time. + Returns an iterator over the :class:`Future` instances (possibly created + by different :class:`Executor`s)given by *fs* that yields futures as they + complete (finished or were cancelled). Any futures that completed before + :func:`as_completed()` was called will be yielded first. The returned + iterator raises a :exc:`TimeoutError` if :meth:`__next__()` is called and + the result isn't available after *timeout* seconds from the original call + to :func:`as_completed()`. *timeout* can be an int or float. If *timeout* + is not specified or ``None`` then there is no limit to the wait time. diff --git a/python3/futures/_base.py b/python3/futures/_base.py index c68b6f1..b275fbc 100644 --- a/python3/futures/_base.py +++ b/python3/futures/_base.py @@ -149,7 +149,8 @@ def as_completed(fs, timeout=None): """An iterator over the given futures that yields each as it completes. Args: - fs: The sequence of Futures to iterate over. + fs: The sequence of Futures (possibly created by different Executors) to + iterate over. timeout: The maximum number of seconds to wait. If None, then there is no limit on the wait time. @@ -196,16 +197,17 @@ def as_completed(fs, timeout=None): for f in fs: f._waiters.remove(waiter) -FinishedAndNotFinishedFutures = collections.namedtuple( - 'FinishedAndNotFinishedFutures', 'finished not_finished') +DoneAndNotDoneFutures = collections.namedtuple( + 'DoneAndNotDoneFutures', 'done not_done') def wait(fs, timeout=None, return_when=ALL_COMPLETED): """Wait for the futures in the given sequence to complete. Args: - fs: The sequence of futures (created by Executor.submit()) to wait upon. + fs: The sequence of Futures (possibly created by different Executors) to + wait upon. timeout: The maximum number of seconds to wait. If None, then there is no limit on the wait time. - return_when: Indicates when the method should return. The options + return_when: Indicates when this function should return. The options are: FIRST_COMPLETED - Return when any future finishes or is @@ -216,26 +218,25 @@ def wait(fs, timeout=None, return_when=ALL_COMPLETED): ALL_COMPLETED - Return when all futures finish or are cancelled. Returns: - A named 2-tuple of sets. The first set, named 'finished', contains the + A named 2-tuple of sets. The first set, named 'done', contains the futures that completed (is finished or cancelled) before the wait - completed. The second set, named 'not_finished', contains uncompleted + completed. The second set, named 'not_done', contains uncompleted futures. """ with _AcquireFutures(fs): - finished = set( - f for f in fs - if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]) - not_finished = set(fs) - finished - - if (return_when == FIRST_COMPLETED) and finished: - return finished, not_finished - elif (return_when == FIRST_EXCEPTION) and finished: - if any(f for f in finished + done = set(f for f in fs + if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]) + not_done = set(fs) - done + + if (return_when == FIRST_COMPLETED) and done: + return DoneAndNotDoneFutures(done, not_done) + elif (return_when == FIRST_EXCEPTION) and done: + if any(f for f in done if not f.cancelled() and f.exception() is not None): - return finished, not_finished + return DoneAndNotDoneFutures(done, not_done) - if len(finished) == len(fs): - return finished, not_finished + if len(done) == len(fs): + return DoneAndNotDoneFutures(done, not_done) waiter = _create_and_install_waiters(fs, return_when) @@ -243,8 +244,8 @@ def wait(fs, timeout=None, return_when=ALL_COMPLETED): for f in fs: f._waiters.remove(waiter) - finished.update(waiter.finished_futures) - return FinishedAndNotFinishedFutures(finished, set(fs) - finished) + done.update(waiter.finished_futures) + return DoneAndNotDoneFutures(done, set(fs) - done) class Future(object): """Represents the result of an asynchronous computation.""" @@ -256,7 +257,7 @@ class Future(object): self._result = None self._exception = None self._waiters = [] - self._done_callbacks = set() + self._done_callbacks = [] def _invoke_callbacks(self): for callback in self._done_callbacks: @@ -320,33 +321,22 @@ class Future(object): return self._result def add_done_callback(self, fn): - """Attaches a function that will be called when the future finishes. + """Attaches a callable that will be called when the future finishes. Args: - fn: A function that will be called with this future as its only - argument when the future completes or is cancelled. If the - future has already completed or been cancelled then the function - will be called immediately. If the same function is added - several times then it will still only be called once. + fn: A callable that will be called with this future as its only + argument when the future completes or is cancelled. The callable + will always be called by a thread in the same process in which + it was added. If the future has already completed or been + cancelled then the callable will be called immediately. These + callables are called in the order that they were added. """ with self._condition: if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]: - self._done_callbacks.add(fn) + self._done_callbacks.append(fn) return fn(self) - def remove_done_callback(self, fn): - """Removes a function previously attached by add_done_callback. - - Args: - fn: A function that was previously added using add_done_callback. - - Raises: - KeyError: if the function was not added using add_done_callback or - was already removed. - """ - self._done_callbacks.remove(fn) - def result(self, timeout=None): """Return the result of the call that the future represents. @@ -480,13 +470,13 @@ class Executor(object): """This is an abstract base class for concrete asynchronous executors.""" def submit(self, fn, *args, **kwargs): - """Submits a function to be executed with the given arguments. + """Submits a callable to be executed with the given arguments. Schedules the callable to be executed as fn(*args, **kwargs) and returns - a Future instance representing the execution of the function. + a Future instance representing the execution of the callable. Returns: - A Future representing the given function call. + A Future representing the given call. """ raise NotImplementedError() diff --git a/python3/futures/process.py b/python3/futures/process.py index 29244f1..92a1b55 100644 --- a/python3/futures/process.py +++ b/python3/futures/process.py @@ -58,7 +58,7 @@ import weakref # - The workers would still be running during interpretor shutdown, # meaning that they would fail in unpredictable ways. # - The workers could be killed while evaluating a work item, which could -# be bad if the function being evaluated has external side-effects e.g. +# be bad if the callable being evaluated has external side-effects e.g. # writing to a file. # # To work around this problem, an exit handler is installed which tells the diff --git a/python3/futures/thread.py b/python3/futures/thread.py index 4c0b2bc..b6f2713 100644 --- a/python3/futures/thread.py +++ b/python3/futures/thread.py @@ -17,7 +17,7 @@ import weakref # - The workers would still be running during interpretor shutdown, # meaning that they would fail in unpredictable ways. # - The workers could be killed while evaluating a work item, which could -# be bad if the function being evaluated has external side-effects e.g. +# be bad if the callable being evaluated has external side-effects e.g. # writing to a file. # # To work around this problem, an exit handler is installed which tells the diff --git a/python3/test_futures.py b/python3/test_futures.py index 09e1866..b032769 100644 --- a/python3/test_futures.py +++ b/python3/test_futures.py @@ -234,12 +234,12 @@ class WaitTests(unittest.TestCase): t = threading.Thread(target=wait_test) t.start() - finished, pending = futures.wait( + done, not_done = futures.wait( [CANCELLED_FUTURE, future1, future2], return_when=futures.FIRST_COMPLETED) - self.assertEquals(set([future1]), finished) - self.assertEquals(set([CANCELLED_FUTURE, future2]), pending) + self.assertEquals(set([future1]), done) + self.assertEquals(set([CANCELLED_FUTURE, future2]), not_done) finally: call2.set_can() call1.close() @@ -651,26 +651,6 @@ class FutureTests(unittest.TestCase): f.add_done_callback(fn) self.assertTrue(was_cancelled) - def test_remove_done_callback(self): - was_called = False - def fn(callback_future): - nonlocal was_called - was_called = True - - f = Future() - f.add_done_callback(fn) - f.remove_done_callback(fn) - self.assertFalse(was_called) - - def test_remove_done_callback_twice(self): - def fn(callback_future): - pass - - f = Future() - f.add_done_callback(fn) - f.remove_done_callback(fn) - self.assertRaises(KeyError, f.remove_done_callback, fn) - def test_repr(self): self.assertRegexpMatches(repr(PENDING_FUTURE), '<Future at 0x[0-9a-f]+ state=pending>') |