diff options
author | brian.quinlan <devnull@localhost> | 2010-02-24 23:08:21 +0000 |
---|---|---|
committer | brian.quinlan <devnull@localhost> | 2010-02-24 23:08:21 +0000 |
commit | ff6ed1dfbc9d919823c85f4b9046eef80b30e545 (patch) | |
tree | 863bf03fd0c20b1b776e8a926304ec82db868b9f | |
parent | 36bcb2a39438545db7c4b20d26ac67c462214e6e (diff) | |
download | futures-ff6ed1dfbc9d919823c85f4b9046eef80b30e545.tar.gz |
Updates to the Future class to make it possible to create Executor subclasses without calling private methods
-rw-r--r-- | PEP.txt | 35 | ||||
-rw-r--r-- | docs/index.rst | 38 | ||||
-rw-r--r-- | python3/futures/_base.py | 95 | ||||
-rw-r--r-- | python3/futures/process.py | 24 | ||||
-rw-r--r-- | python3/futures/thread.py | 31 | ||||
-rw-r--r-- | python3/test_futures.py | 2 |
6 files changed, 158 insertions, 67 deletions
@@ -179,6 +179,10 @@ will be cancelled and the method will return `True`. Return `True` if the call was successfully cancelled. +`Future.running()` + +Return `True` if the call is currently being executed and cannot be cancelled. + `Future.done()` Return `True` if the call was successfully cancelled or finished running. @@ -208,6 +212,37 @@ be raised. If the call completed without raising then ``None`` is returned. +Internal Future Methods +^^^^^^^^^^^^^^^^^^^^^^^ + +The following `Future` methods are meant for use in unit tests and +:class:`Executor` implementations. + +`set_running_or_notify_cancel()` + +Should be called by `Executor` implementations before executing the work +associated with the `Future`. + +If the method returns `False` then the `Future` was cancelled i.e. +`Future.cancel` was called and returned `True`. Any threads waiting on the +`Future` completing (i.e. through `as_completed()` or `wait()`) will be woken +up. + +If the method returns `True` then the `Future` was not cancelled and has been +put in the running state i.e. calls to `Future.running()` will return `True`. + +This method can only be called once and cannot be called after +`Future.set_result()` or `Future.set_exception()` have been called. + +`set_result(result)` + +Sets the result of the work associated with the `Future`. + +`set_exception(exception)` + +Sets the result of the work associated with the `Future` to the given +`Exception`. + Module Functions '''''''''''''''' diff --git a/docs/index.rst b/docs/index.rst index 4b1a7aa..797aa57 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -167,6 +167,11 @@ or method call. :class:`Future` instances are created by Return `True` if the call was successfully cancelled. +.. method:: Future.running() + + Return `True` if the call is currently being executed and cannot be + cancelled. + .. method:: Future.done() Return `True` if the call was successfully cancelled or finished running. @@ -196,6 +201,39 @@ or method call. :class:`Future` instances are created by If the call completed without raising then ``None`` is returned. +Internal Future Methods +^^^^^^^^^^^^^^^^^^^^^^^ + +The following :class:`Future` methods are meant for use in unit tests and +:class:`Executor` implementations. + +.. method:: Future.set_running_or_notify_cancel() + + Should be called by :class:`Executor` implementations before executing the + work associated with the :class:`Future`. + + If the method returns `False` then the :class:`Future` was cancelled i.e. + :meth:`Future.cancel` was called and returned `True`. Any threads waiting + on the :class:`Future` completing (i.e. through :func:`as_completed` or + :func:`wait`) will be woken up. + + If the method returns `True` then the :class:`Future` was not cancelled + and has been put in the running state i.e. calls to + :meth:`Future.running` will return `True`. + + This method can only be called once and cannot be called after + :meth:`Future.set_result` or :meth:`Future.set_exception` have been + called. + +.. method:: Future.set_result(result) + + Sets the result of the work associated with the :class:`Future` to *result*. + +.. method:: Future.set_exception(exception) + + Sets the result of the work associated with the :class:`Future` to the + :class:`Exception` *exception*. + Module Functions ---------------- diff --git a/python3/futures/_base.py b/python3/futures/_base.py index 366cd9d..df8975c 100644 --- a/python3/futures/_base.py +++ b/python3/futures/_base.py @@ -304,38 +304,6 @@ class Future(object): with self._condition: return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED] - def _check_cancel_and_notify(self): - '''Check if the Future was cancelled and notify its waiter if it was. - - Returns: - True if the Future was cancelled, False otherwise. - ''' - with self._condition: - if self._state == CANCELLED: - self._state = CANCELLED_AND_NOTIFIED - for waiter in self._waiters: - waiter.add_cancelled(self) - # self._condition.notify_all() is not necessary because - # self.cancel() triggers a notification. - return True - return False - - def _set_result(self, result): - with self._condition: - self._result = result - self._state = FINISHED - for waiter in self._waiters: - waiter.add_result(self) - self._condition.notify_all() - - def _set_exception(self, exception): - with self._condition: - self._exception = exception - self._state = FINISHED - for waiter in self._waiters: - waiter.add_exception(self) - self._condition.notify_all() - def __get_result(self): if self._exception: raise self._exception @@ -406,6 +374,69 @@ class Future(object): else: raise TimeoutError() + # The following methods should only be used by Executors and in tests. + def set_running_or_notify_cancel(self): + '''Mark the future as running or process any cancel notifications. + + If the future has been cancelled (cancel() was called and returned + True) then any threads waiting on the future completing (though calls + to as_completed() or wait()) are notified and False is returned. + + If the future was not cancelled then it is put in the running state + (future calls to running() will return True) and True is returned. + + This method should be called by Executor implementations before + executing the work associated with this future. If this method returns + False then the work should not be executed. + + Returns: + False if the Future was cancelled, True otherwise. + + Raises: + RuntimeError: if this method was already called or if set_result() + or set_exception() was called. + ''' + with self._condition: + if self._state == CANCELLED: + self._state = CANCELLED_AND_NOTIFIED + for waiter in self._waiters: + waiter.add_cancelled(self) + # self._condition.notify_all() is not necessary because + # self.cancel() triggers a notification. + return False + elif self._state == PENDING: + self._state = RUNNING + return True + else: + LOGGER.critical('Future %s in unexpected state: %s', + id(self.future), + self.future._state) + raise RuntimeError('Future in unexpected state') + + def set_result(self, result): + """Sets the return value of work associated with the future. + + Should only be called by Executor implementations. + """ + with self._condition: + self._result = result + self._state = FINISHED + for waiter in self._waiters: + waiter.add_result(self) + self._condition.notify_all() + + def set_exception(self, exception): + """Sets the result of the future as being the given exception. + + Should only be called by Executor implementations. + """ + with self._condition: + self._exception = exception + self._state = FINISHED + for waiter in self._waiters: + waiter.add_exception(self) + self._condition.notify_all() + class Executor(object): """This is an abstract base class for concrete asynchronous executors.""" diff --git a/python3/futures/process.py b/python3/futures/process.py index 4d3b643..a2c8227 100644 --- a/python3/futures/process.py +++ b/python3/futures/process.py @@ -44,8 +44,8 @@ Process #1..n: __author__ = 'Brian Quinlan (brian@sweetapp.com)' -from futures._base import RUNNING, Executor, Future import atexit +import futures import queue import multiprocessing import threading @@ -170,17 +170,15 @@ def _add_call_item_to_queue(pending_work_items, else: work_item = pending_work_items[work_id] - if work_item.future._check_cancel_and_notify(): - del pending_work_items[work_id] - continue - else: - with work_item.future._condition: - work_item.future._state = RUNNING + if work_item.future.set_running_or_notify_cancel(): call_queue.put(_CallItem(work_id, work_item.fn, work_item.args, work_item.kwargs), block=True) + else: + del pending_work_items[work_id] + continue def _queue_manangement_worker(executor_reference, processes, @@ -242,11 +240,11 @@ def _queue_manangement_worker(executor_reference, with work_item.future._condition: if result_item.exception: - work_item.future._set_exception(result_item.exception) + work_item.future.set_exception(result_item.exception) else: - work_item.future._set_result(result_item.result) + work_item.future.set_result(result_item.result) -class ProcessPoolExecutor(Executor): +class ProcessPoolExecutor(futures._base.Executor): def __init__(self, max_workers=None): """Initializes a new ProcessPoolExecutor instance. @@ -309,7 +307,7 @@ class ProcessPoolExecutor(Executor): if self._shutdown_thread: raise RuntimeError('cannot schedule new futures after shutdown') - f = Future() + f = futures._base.Future() w = _WorkItem(f, fn, args, kwargs) self._pending_work_items[self._queue_count] = w @@ -319,7 +317,7 @@ class ProcessPoolExecutor(Executor): self._start_queue_management_thread() self._adjust_process_count() return f - submit.__doc__ = Executor.submit.__doc__ + submit.__doc__ = futures._base.Executor.submit.__doc__ def shutdown(self, wait=True): with self._shutdown_lock: @@ -328,7 +326,7 @@ class ProcessPoolExecutor(Executor): if self._queue_management_thread: self._queue_management_thread.join() - shutdown.__doc__ = Executor.shutdown.__doc__ + shutdown.__doc__ = futures._base.Executor.shutdown.__doc__ atexit.register(_python_exit) diff --git a/python3/futures/thread.py b/python3/futures/thread.py index b6ea1ba..4c0b2bc 100644 --- a/python3/futures/thread.py +++ b/python3/futures/thread.py @@ -4,11 +4,8 @@ __author__ = 'Brian Quinlan (brian@sweetapp.com)' -from futures._base import (PENDING, RUNNING, CANCELLED, - CANCELLED_AND_NOTIFIED, FINISHED, - LOGGER, - Executor, Future) import atexit +import futures._base import queue import threading import weakref @@ -60,23 +57,15 @@ class _WorkItem(object): self.kwargs = kwargs def run(self): - with self.future._condition: - if self.future._state == PENDING: - self.future._state = RUNNING - elif self.future._check_cancel_and_notify(): - return - else: - LOGGER.critical('Future %s in unexpected state: %s', - id(self.future), - self.future._state) - return + if not self.future.set_running_or_notify_cancel(): + return try: result = self.fn(*self.args, **self.kwargs) except BaseException as e: - self.future._set_exception(e) + self.future.set_exception(e) else: - self.future._set_result(result) + self.future.set_result(result) def _worker(executor_reference, work_queue): try: @@ -95,9 +84,9 @@ def _worker(executor_reference, work_queue): else: work_item.run() except BaseException as e: - LOGGER.critical('Exception in worker', exc_info=True) + futures._base.LOGGER.critical('Exception in worker', exc_info=True) -class ThreadPoolExecutor(Executor): +class ThreadPoolExecutor(futures._base.Executor): def __init__(self, max_workers): """Initializes a new ThreadPoolExecutor instance. @@ -118,13 +107,13 @@ class ThreadPoolExecutor(Executor): if self._shutdown: raise RuntimeError('cannot schedule new futures after shutdown') - f = Future() + f = futures._base.Future() w = _WorkItem(f, fn, args, kwargs) self._work_queue.put(w) self._adjust_thread_count() return f - submit.__doc__ = Executor.submit.__doc__ + submit.__doc__ = futures._base.Executor.submit.__doc__ def _adjust_thread_count(self): # TODO(bquinlan): Should avoid creating new threads if there are more @@ -143,5 +132,5 @@ class ThreadPoolExecutor(Executor): if wait: for t in self._threads: t.join() - shutdown.__doc__ = Executor.shutdown.__doc__ + shutdown.__doc__ = futures._base.Executor.shutdown.__doc__ diff --git a/python3/test_futures.py b/python3/test_futures.py index 6241d0f..baf5e42 100644 --- a/python3/test_futures.py +++ b/python3/test_futures.py @@ -621,7 +621,7 @@ class FutureTests(unittest.TestCase): def notification(): # Wait until the main thread is waiting for the result. time.sleep(1) - f1._set_result(42) + f1.set_result(42) f1 = create_future(state=PENDING) t = threading.Thread(target=notification) |