diff options
author | Nikolay Kim <fafhrd91@gmail.com> | 2013-10-17 12:19:02 -0700 |
---|---|---|
committer | Nikolay Kim <fafhrd91@gmail.com> | 2013-10-17 12:19:02 -0700 |
commit | 112322738c23f4b96f725e33864c5b8ee9e8d866 (patch) | |
tree | 1c53ad4bbfa1572ca590add5dda9f850887fddc4 | |
parent | 6735a0002e8d152450e001c74c98e61537db90c3 (diff) | |
download | trollius-112322738c23f4b96f725e33864c5b8ee9e8d866.tar.gz |
custom implementation for wait_for
-rw-r--r-- | tests/queues_test.py | 4 | ||||
-rw-r--r-- | tests/tasks_test.py | 1 | ||||
-rw-r--r-- | tulip/tasks.py | 30 |
3 files changed, 21 insertions, 14 deletions
diff --git a/tests/queues_test.py b/tests/queues_test.py index 437a1c3..ae1e3a1 100644 --- a/tests/queues_test.py +++ b/tests/queues_test.py @@ -242,7 +242,7 @@ class QueueGetTests(_QueueTestBase): when = yield self.assertAlmostEqual(0.01, when) when = yield 0.01 - self.assertAlmostEqual(0.06, when) + self.assertAlmostEqual(0.061, when) yield 0.05 loop = test_utils.TestLoop(gen) @@ -252,7 +252,7 @@ class QueueGetTests(_QueueTestBase): @tasks.coroutine def queue_get(): - return (yield from tasks.wait_for(q.get(), 0.05, loop=loop)) + return (yield from tasks.wait_for(q.get(), 0.051, loop=loop)) @tasks.coroutine def test(): diff --git a/tests/tasks_test.py b/tests/tasks_test.py index b8dcd7e..161bff8 100644 --- a/tests/tasks_test.py +++ b/tests/tasks_test.py @@ -374,7 +374,6 @@ class TaskTests(unittest.TestCase): self.assertFalse(fut.done()) self.assertAlmostEqual(0.1, loop.time()) - loop # wait for result res = loop.run_until_complete( tasks.wait_for(fut, 0.3, loop=loop)) diff --git a/tulip/tasks.py b/tulip/tasks.py index b020d9c..c998a7e 100644 --- a/tulip/tasks.py +++ b/tulip/tasks.py @@ -349,6 +349,11 @@ def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED): return (yield from _wait(fs, timeout, return_when, loop)) +def _release_waiter(waiter, value=True, *args): + if not waiter.done(): + waiter.set_result(value) + + @coroutine def wait_for(fut, timeout, *, loop=None): """Wait for the single Future or coroutine to complete, with timeout. @@ -366,18 +371,21 @@ def wait_for(fut, timeout, *, loop=None): if loop is None: loop = events.get_event_loop() - fut = async(fut, loop=loop) - - done, pending = yield from _wait([fut], timeout, FIRST_COMPLETED, loop) - if done: - return done.pop().result() - - raise futures.TimeoutError() + waiter = futures.Future(loop=loop) + timeout_handle = loop.call_later(timeout, _release_waiter, waiter, False) + cb = functools.partial(_release_waiter, waiter, True) + fut = async(fut, loop=loop) + fut.add_done_callback(cb) -def _waiter_timeout(waiter): - if not waiter.done(): - waiter.set_result(False) + try: + if (yield from waiter): + return fut.result() + else: + fut.remove_done_callback(cb) + raise futures.TimeoutError() + finally: + timeout_handle.cancel() @coroutine @@ -390,7 +398,7 @@ def _wait(fs, timeout, return_when, loop): waiter = futures.Future(loop=loop) timeout_handle = None if timeout is not None: - timeout_handle = loop.call_later(timeout, _waiter_timeout, waiter) + timeout_handle = loop.call_later(timeout, _release_waiter, waiter) counter = len(fs) def _on_completion(f): |