summaryrefslogtreecommitdiff
path: root/Lib
diff options
context:
space:
mode:
authorYury Selivanov <yselivanov@sprymix.com>2015-08-05 13:52:33 -0400
committerYury Selivanov <yselivanov@sprymix.com>2015-08-05 13:52:33 -0400
commit7752a62f41a4f22edff9a24f5c46ecf51918bd82 (patch)
treea12cbe56e114bf98c29aba89f1abb6bac16c910c /Lib
parent58432020d72efe51395cd0da2736bb62e440f832 (diff)
downloadcpython-7752a62f41a4f22edff9a24f5c46ecf51918bd82.tar.gz
Issue #23812: Fix asyncio.Queue.get() to avoid loosing items on cancellation.
Patch by Gustavo J. A. M. Carneiro.
Diffstat (limited to 'Lib')
-rw-r--r--Lib/asyncio/queues.py47
-rw-r--r--Lib/test/test_asyncio/test_queues.py61
2 files changed, 98 insertions, 10 deletions
diff --git a/Lib/asyncio/queues.py b/Lib/asyncio/queues.py
index c55dd8bbb0..b26edfbe4f 100644
--- a/Lib/asyncio/queues.py
+++ b/Lib/asyncio/queues.py
@@ -47,7 +47,7 @@ class Queue:
# Futures.
self._getters = collections.deque()
- # Pairs of (item, Future).
+ # Futures
self._putters = collections.deque()
self._unfinished_tasks = 0
self._finished = locks.Event(loop=self._loop)
@@ -98,7 +98,7 @@ class Queue:
def _consume_done_putters(self):
# Delete waiters at the head of the put() queue who've timed out.
- while self._putters and self._putters[0][1].done():
+ while self._putters and self._putters[0].done():
self._putters.popleft()
def qsize(self):
@@ -148,8 +148,9 @@ class Queue:
elif self._maxsize > 0 and self._maxsize <= self.qsize():
waiter = futures.Future(loop=self._loop)
- self._putters.append((item, waiter))
+ self._putters.append(waiter)
yield from waiter
+ self._put(item)
else:
self.__put_internal(item)
@@ -186,8 +187,7 @@ class Queue:
self._consume_done_putters()
if self._putters:
assert self.full(), 'queue not full, why are putters waiting?'
- item, putter = self._putters.popleft()
- self.__put_internal(item)
+ putter = self._putters.popleft()
# When a getter runs and frees up a slot so this putter can
# run, we need to defer the put for a tick to ensure that
@@ -201,9 +201,39 @@ class Queue:
return self._get()
else:
waiter = futures.Future(loop=self._loop)
-
self._getters.append(waiter)
- return (yield from waiter)
+ try:
+ return (yield from waiter)
+ except futures.CancelledError:
+ # if we get CancelledError, it means someone cancelled this
+ # get() coroutine. But there is a chance that the waiter
+ # already is ready and contains an item that has just been
+ # removed from the queue. In this case, we need to put the item
+ # back into the front of the queue. This get() must either
+ # succeed without fault or, if it gets cancelled, it must be as
+ # if it never happened.
+ if waiter.done():
+ self._put_it_back(waiter.result())
+ raise
+
+ def _put_it_back(self, item):
+ """
+ This is called when we have a waiter to get() an item and this waiter
+ gets cancelled. In this case, we put the item back: wake up another
+ waiter or put it in the _queue.
+ """
+ self._consume_done_getters()
+ if self._getters:
+ assert not self._queue, (
+ 'queue non-empty, why are getters waiting?')
+
+ getter = self._getters.popleft()
+ self._put_internal(item)
+
+ # getter cannot be cancelled, we just removed done getters
+ getter.set_result(item)
+ else:
+ self._queue.appendleft(item)
def get_nowait(self):
"""Remove and return an item from the queue.
@@ -213,8 +243,7 @@ class Queue:
self._consume_done_putters()
if self._putters:
assert self.full(), 'queue not full, why are putters waiting?'
- item, putter = self._putters.popleft()
- self.__put_internal(item)
+ putter = self._putters.popleft()
# Wake putter on next tick.
# getter cannot be cancelled, we just removed done putters
diff --git a/Lib/test/test_asyncio/test_queues.py b/Lib/test/test_asyncio/test_queues.py
index 88b4f07502..7c7d0eae52 100644
--- a/Lib/test/test_asyncio/test_queues.py
+++ b/Lib/test/test_asyncio/test_queues.py
@@ -171,7 +171,7 @@ class QueueGetTests(_QueueTestBase):
q.put_nowait(1)
waiter = asyncio.Future(loop=self.loop)
- q._putters.append((2, waiter))
+ q._putters.append(waiter)
res = self.loop.run_until_complete(q.get())
self.assertEqual(1, res)
@@ -322,6 +322,64 @@ class QueuePutTests(_QueueTestBase):
q.put_nowait(1)
self.assertEqual(1, q.get_nowait())
+ def test_get_cancel_drop(self):
+ def gen():
+ yield 0.01
+ yield 0.1
+
+ loop = self.new_test_loop(gen)
+
+ q = asyncio.Queue(loop=loop)
+
+ reader = loop.create_task(q.get())
+
+ loop.run_until_complete(asyncio.sleep(0.01, loop=loop))
+
+ q.put_nowait(1)
+ q.put_nowait(2)
+ reader.cancel()
+
+ try:
+ loop.run_until_complete(reader)
+ except asyncio.CancelledError:
+ # try again
+ reader = loop.create_task(q.get())
+ loop.run_until_complete(reader)
+
+ result = reader.result()
+ # if we get 2, it means 1 got dropped!
+ self.assertEqual(1, result)
+
+ def test_put_cancel_drop(self):
+
+ def gen():
+ yield 0.01
+ yield 0.1
+
+ loop = self.new_test_loop(gen)
+ q = asyncio.Queue(1, loop=loop)
+
+ q.put_nowait(1)
+
+ # putting a second item in the queue has to block (qsize=1)
+ writer = loop.create_task(q.put(2))
+ loop.run_until_complete(asyncio.sleep(0.01, loop=loop))
+
+ value1 = q.get_nowait()
+ self.assertEqual(value1, 1)
+
+ writer.cancel()
+ try:
+ loop.run_until_complete(writer)
+ except asyncio.CancelledError:
+ # try again
+ writer = loop.create_task(q.put(2))
+ loop.run_until_complete(writer)
+
+ value2 = q.get_nowait()
+ self.assertEqual(value2, 2)
+ self.assertEqual(q.qsize(), 0)
+
def test_nonblocking_put_exception(self):
q = asyncio.Queue(maxsize=1, loop=self.loop)
q.put_nowait(1)
@@ -374,6 +432,7 @@ class QueuePutTests(_QueueTestBase):
test_utils.run_briefly(self.loop)
self.assertTrue(put_c.done())
self.assertEqual(q.get_nowait(), 'a')
+ test_utils.run_briefly(self.loop)
self.assertEqual(q.get_nowait(), 'b')
self.loop.run_until_complete(put_b)