summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNikolay Kim <fafhrd91@gmail.com>2013-10-17 12:19:02 -0700
committerNikolay Kim <fafhrd91@gmail.com>2013-10-17 12:19:02 -0700
commit112322738c23f4b96f725e33864c5b8ee9e8d866 (patch)
tree1c53ad4bbfa1572ca590add5dda9f850887fddc4
parent6735a0002e8d152450e001c74c98e61537db90c3 (diff)
downloadtrollius-112322738c23f4b96f725e33864c5b8ee9e8d866.tar.gz
custom implementation for wait_for
-rw-r--r--tests/queues_test.py4
-rw-r--r--tests/tasks_test.py1
-rw-r--r--tulip/tasks.py30
3 files changed, 21 insertions, 14 deletions
diff --git a/tests/queues_test.py b/tests/queues_test.py
index 437a1c3..ae1e3a1 100644
--- a/tests/queues_test.py
+++ b/tests/queues_test.py
@@ -242,7 +242,7 @@ class QueueGetTests(_QueueTestBase):
when = yield
self.assertAlmostEqual(0.01, when)
when = yield 0.01
- self.assertAlmostEqual(0.06, when)
+ self.assertAlmostEqual(0.061, when)
yield 0.05
loop = test_utils.TestLoop(gen)
@@ -252,7 +252,7 @@ class QueueGetTests(_QueueTestBase):
@tasks.coroutine
def queue_get():
- return (yield from tasks.wait_for(q.get(), 0.05, loop=loop))
+ return (yield from tasks.wait_for(q.get(), 0.051, loop=loop))
@tasks.coroutine
def test():
diff --git a/tests/tasks_test.py b/tests/tasks_test.py
index b8dcd7e..161bff8 100644
--- a/tests/tasks_test.py
+++ b/tests/tasks_test.py
@@ -374,7 +374,6 @@ class TaskTests(unittest.TestCase):
self.assertFalse(fut.done())
self.assertAlmostEqual(0.1, loop.time())
- loop
# wait for result
res = loop.run_until_complete(
tasks.wait_for(fut, 0.3, loop=loop))
diff --git a/tulip/tasks.py b/tulip/tasks.py
index b020d9c..c998a7e 100644
--- a/tulip/tasks.py
+++ b/tulip/tasks.py
@@ -349,6 +349,11 @@ def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
return (yield from _wait(fs, timeout, return_when, loop))
+def _release_waiter(waiter, value=True, *args):
+ if not waiter.done():
+ waiter.set_result(value)
+
+
@coroutine
def wait_for(fut, timeout, *, loop=None):
"""Wait for the single Future or coroutine to complete, with timeout.
@@ -366,18 +371,21 @@ def wait_for(fut, timeout, *, loop=None):
if loop is None:
loop = events.get_event_loop()
- fut = async(fut, loop=loop)
-
- done, pending = yield from _wait([fut], timeout, FIRST_COMPLETED, loop)
- if done:
- return done.pop().result()
-
- raise futures.TimeoutError()
+ waiter = futures.Future(loop=loop)
+ timeout_handle = loop.call_later(timeout, _release_waiter, waiter, False)
+ cb = functools.partial(_release_waiter, waiter, True)
+ fut = async(fut, loop=loop)
+ fut.add_done_callback(cb)
-def _waiter_timeout(waiter):
- if not waiter.done():
- waiter.set_result(False)
+ try:
+ if (yield from waiter):
+ return fut.result()
+ else:
+ fut.remove_done_callback(cb)
+ raise futures.TimeoutError()
+ finally:
+ timeout_handle.cancel()
@coroutine
@@ -390,7 +398,7 @@ def _wait(fs, timeout, return_when, loop):
waiter = futures.Future(loop=loop)
timeout_handle = None
if timeout is not None:
- timeout_handle = loop.call_later(timeout, _waiter_timeout, waiter)
+ timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
counter = len(fs)
def _on_completion(f):