summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbrian.quinlan <devnull@localhost>2010-02-24 23:08:21 +0000
committerbrian.quinlan <devnull@localhost>2010-02-24 23:08:21 +0000
commitff6ed1dfbc9d919823c85f4b9046eef80b30e545 (patch)
tree863bf03fd0c20b1b776e8a926304ec82db868b9f
parent36bcb2a39438545db7c4b20d26ac67c462214e6e (diff)
downloadfutures-ff6ed1dfbc9d919823c85f4b9046eef80b30e545.tar.gz
Updates to the Future class to make it possible to create Executor subclasses without calling private methods
-rw-r--r--PEP.txt35
-rw-r--r--docs/index.rst38
-rw-r--r--python3/futures/_base.py95
-rw-r--r--python3/futures/process.py24
-rw-r--r--python3/futures/thread.py31
-rw-r--r--python3/test_futures.py2
6 files changed, 158 insertions, 67 deletions
diff --git a/PEP.txt b/PEP.txt
index d2e2175..2041d0f 100644
--- a/PEP.txt
+++ b/PEP.txt
@@ -179,6 +179,10 @@ will be cancelled and the method will return `True`.
Return `True` if the call was successfully cancelled.
+`Future.running()`
+
+Return `True` if the call is currently being executed and cannot be cancelled.
+
`Future.done()`
Return `True` if the call was successfully cancelled or finished running.
@@ -208,6 +212,37 @@ be raised.
If the call completed without raising then ``None`` is returned.
+Internal Future Methods
+^^^^^^^^^^^^^^^^^^^^^^^
+
+The following `Future` methods are meant for use in unit tests and
+:class:`Executor` implementations.
+
+`set_running_or_notify_cancel()`
+
+Should be called by `Executor` implementations before executing the work
+associated with the `Future`.
+
+If the method returns `False` then the `Future` was cancelled i.e.
+`Future.cancel` was called and returned `True`. Any threads waiting on the
+`Future` completing (i.e. through `as_completed()` or `wait()`) will be woken
+up.
+
+If the method returns `True` then the `Future` was not cancelled and has been
+put in the running state i.e. calls to `Future.running()` will return `True`.
+
+This method can only be called once and cannot be called after
+`Future.set_result()` or `Future.set_exception()` have been called.
+
+`set_result(result)`
+
+Sets the result of the work associated with the `Future`.
+
+`set_exception(exception)`
+
+Sets the result of the work associated with the `Future` to the given
+`Exception`.
+
Module Functions
''''''''''''''''
diff --git a/docs/index.rst b/docs/index.rst
index 4b1a7aa..797aa57 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -167,6 +167,11 @@ or method call. :class:`Future` instances are created by
Return `True` if the call was successfully cancelled.
+.. method:: Future.running()
+
+ Return `True` if the call is currently being executed and cannot be
+ cancelled.
+
.. method:: Future.done()
Return `True` if the call was successfully cancelled or finished running.
@@ -196,6 +201,39 @@ or method call. :class:`Future` instances are created by
If the call completed without raising then ``None`` is returned.
+Internal Future Methods
+^^^^^^^^^^^^^^^^^^^^^^^
+
+The following :class:`Future` methods are meant for use in unit tests and
+:class:`Executor` implementations.
+
+.. method:: Future.set_running_or_notify_cancel()
+
+ Should be called by :class:`Executor` implementations before executing the
+ work associated with the :class:`Future`.
+
+ If the method returns `False` then the :class:`Future` was cancelled i.e.
+ :meth:`Future.cancel` was called and returned `True`. Any threads waiting
+ on the :class:`Future` completing (i.e. through :func:`as_completed` or
+ :func:`wait`) will be woken up.
+
+ If the method returns `True` then the :class:`Future` was not cancelled
+ and has been put in the running state i.e. calls to
+ :meth:`Future.running` will return `True`.
+
+ This method can only be called once and cannot be called after
+ :meth:`Future.set_result` or :meth:`Future.set_exception` have been
+ called.
+
+.. method:: Future.set_result(result)
+
+ Sets the result of the work associated with the :class:`Future` to *result*.
+
+.. method:: Future.set_exception(exception)
+
+ Sets the result of the work associated with the :class:`Future` to the
+ :class:`Exception` *exception*.
+
Module Functions
----------------
diff --git a/python3/futures/_base.py b/python3/futures/_base.py
index 366cd9d..df8975c 100644
--- a/python3/futures/_base.py
+++ b/python3/futures/_base.py
@@ -304,38 +304,6 @@ class Future(object):
with self._condition:
return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]
- def _check_cancel_and_notify(self):
- '''Check if the Future was cancelled and notify its waiter if it was.
-
- Returns:
- True if the Future was cancelled, False otherwise.
- '''
- with self._condition:
- if self._state == CANCELLED:
- self._state = CANCELLED_AND_NOTIFIED
- for waiter in self._waiters:
- waiter.add_cancelled(self)
- # self._condition.notify_all() is not necessary because
- # self.cancel() triggers a notification.
- return True
- return False
-
- def _set_result(self, result):
- with self._condition:
- self._result = result
- self._state = FINISHED
- for waiter in self._waiters:
- waiter.add_result(self)
- self._condition.notify_all()
-
- def _set_exception(self, exception):
- with self._condition:
- self._exception = exception
- self._state = FINISHED
- for waiter in self._waiters:
- waiter.add_exception(self)
- self._condition.notify_all()
-
def __get_result(self):
if self._exception:
raise self._exception
@@ -406,6 +374,69 @@ class Future(object):
else:
raise TimeoutError()
+ # The following methods should only be used by Executors and in tests.
+ def set_running_or_notify_cancel(self):
+ '''Mark the future as running or process any cancel notifications.
+
+ If the future has been cancelled (cancel() was called and returned
+ True) then any threads waiting on the future completing (though calls
+ to as_completed() or wait()) are notified and False is returned.
+
+ If the future was not cancelled then it is put in the running state
+ (future calls to running() will return True) and True is returned.
+
+ This method should be called by Executor implementations before
+ executing the work associated with this future. If this method returns
+ False then the work should not be executed.
+
+ Returns:
+ False if the Future was cancelled, True otherwise.
+
+ Raises:
+ RuntimeError: if this method was already called or if set_result()
+ or set_exception() was called.
+ '''
+ with self._condition:
+ if self._state == CANCELLED:
+ self._state = CANCELLED_AND_NOTIFIED
+ for waiter in self._waiters:
+ waiter.add_cancelled(self)
+ # self._condition.notify_all() is not necessary because
+ # self.cancel() triggers a notification.
+ return False
+ elif self._state == PENDING:
+ self._state = RUNNING
+ return True
+ else:
+ LOGGER.critical('Future %s in unexpected state: %s',
+ id(self.future),
+ self.future._state)
+ raise RuntimeError('Future in unexpected state')
+
+ def set_result(self, result):
+ """Sets the return value of work associated with the future.
+
+ Should only be called by Executor implementations.
+ """
+ with self._condition:
+ self._result = result
+ self._state = FINISHED
+ for waiter in self._waiters:
+ waiter.add_result(self)
+ self._condition.notify_all()
+
+ def set_exception(self, exception):
+ """Sets the result of the future as being the given exception.
+
+ Should only be called by Executor implementations.
+ """
+ with self._condition:
+ self._exception = exception
+ self._state = FINISHED
+ for waiter in self._waiters:
+ waiter.add_exception(self)
+ self._condition.notify_all()
+
class Executor(object):
"""This is an abstract base class for concrete asynchronous executors."""
diff --git a/python3/futures/process.py b/python3/futures/process.py
index 4d3b643..a2c8227 100644
--- a/python3/futures/process.py
+++ b/python3/futures/process.py
@@ -44,8 +44,8 @@ Process #1..n:
__author__ = 'Brian Quinlan (brian@sweetapp.com)'
-from futures._base import RUNNING, Executor, Future
import atexit
+import futures
import queue
import multiprocessing
import threading
@@ -170,17 +170,15 @@ def _add_call_item_to_queue(pending_work_items,
else:
work_item = pending_work_items[work_id]
- if work_item.future._check_cancel_and_notify():
- del pending_work_items[work_id]
- continue
- else:
- with work_item.future._condition:
- work_item.future._state = RUNNING
+ if work_item.future.set_running_or_notify_cancel():
call_queue.put(_CallItem(work_id,
work_item.fn,
work_item.args,
work_item.kwargs),
block=True)
+ else:
+ del pending_work_items[work_id]
+ continue
def _queue_manangement_worker(executor_reference,
processes,
@@ -242,11 +240,11 @@ def _queue_manangement_worker(executor_reference,
with work_item.future._condition:
if result_item.exception:
- work_item.future._set_exception(result_item.exception)
+ work_item.future.set_exception(result_item.exception)
else:
- work_item.future._set_result(result_item.result)
+ work_item.future.set_result(result_item.result)
-class ProcessPoolExecutor(Executor):
+class ProcessPoolExecutor(futures._base.Executor):
def __init__(self, max_workers=None):
"""Initializes a new ProcessPoolExecutor instance.
@@ -309,7 +307,7 @@ class ProcessPoolExecutor(Executor):
if self._shutdown_thread:
raise RuntimeError('cannot schedule new futures after shutdown')
- f = Future()
+ f = futures._base.Future()
w = _WorkItem(f, fn, args, kwargs)
self._pending_work_items[self._queue_count] = w
@@ -319,7 +317,7 @@ class ProcessPoolExecutor(Executor):
self._start_queue_management_thread()
self._adjust_process_count()
return f
- submit.__doc__ = Executor.submit.__doc__
+ submit.__doc__ = futures._base.Executor.submit.__doc__
def shutdown(self, wait=True):
with self._shutdown_lock:
@@ -328,7 +326,7 @@ class ProcessPoolExecutor(Executor):
if self._queue_management_thread:
self._queue_management_thread.join()
- shutdown.__doc__ = Executor.shutdown.__doc__
+ shutdown.__doc__ = futures._base.Executor.shutdown.__doc__
atexit.register(_python_exit)
diff --git a/python3/futures/thread.py b/python3/futures/thread.py
index b6ea1ba..4c0b2bc 100644
--- a/python3/futures/thread.py
+++ b/python3/futures/thread.py
@@ -4,11 +4,8 @@
__author__ = 'Brian Quinlan (brian@sweetapp.com)'
-from futures._base import (PENDING, RUNNING, CANCELLED,
- CANCELLED_AND_NOTIFIED, FINISHED,
- LOGGER,
- Executor, Future)
import atexit
+import futures._base
import queue
import threading
import weakref
@@ -60,23 +57,15 @@ class _WorkItem(object):
self.kwargs = kwargs
def run(self):
- with self.future._condition:
- if self.future._state == PENDING:
- self.future._state = RUNNING
- elif self.future._check_cancel_and_notify():
- return
- else:
- LOGGER.critical('Future %s in unexpected state: %s',
- id(self.future),
- self.future._state)
- return
+ if not self.future.set_running_or_notify_cancel():
+ return
try:
result = self.fn(*self.args, **self.kwargs)
except BaseException as e:
- self.future._set_exception(e)
+ self.future.set_exception(e)
else:
- self.future._set_result(result)
+ self.future.set_result(result)
def _worker(executor_reference, work_queue):
try:
@@ -95,9 +84,9 @@ def _worker(executor_reference, work_queue):
else:
work_item.run()
except BaseException as e:
- LOGGER.critical('Exception in worker', exc_info=True)
+ futures._base.LOGGER.critical('Exception in worker', exc_info=True)
-class ThreadPoolExecutor(Executor):
+class ThreadPoolExecutor(futures._base.Executor):
def __init__(self, max_workers):
"""Initializes a new ThreadPoolExecutor instance.
@@ -118,13 +107,13 @@ class ThreadPoolExecutor(Executor):
if self._shutdown:
raise RuntimeError('cannot schedule new futures after shutdown')
- f = Future()
+ f = futures._base.Future()
w = _WorkItem(f, fn, args, kwargs)
self._work_queue.put(w)
self._adjust_thread_count()
return f
- submit.__doc__ = Executor.submit.__doc__
+ submit.__doc__ = futures._base.Executor.submit.__doc__
def _adjust_thread_count(self):
# TODO(bquinlan): Should avoid creating new threads if there are more
@@ -143,5 +132,5 @@ class ThreadPoolExecutor(Executor):
if wait:
for t in self._threads:
t.join()
- shutdown.__doc__ = Executor.shutdown.__doc__
+ shutdown.__doc__ = futures._base.Executor.shutdown.__doc__
diff --git a/python3/test_futures.py b/python3/test_futures.py
index 6241d0f..baf5e42 100644
--- a/python3/test_futures.py
+++ b/python3/test_futures.py
@@ -621,7 +621,7 @@ class FutureTests(unittest.TestCase):
def notification():
# Wait until the main thread is waiting for the result.
time.sleep(1)
- f1._set_result(42)
+ f1.set_result(42)
f1 = create_future(state=PENDING)
t = threading.Thread(target=notification)