summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorA. Jesse Jiryu Davis <jesse@mongodb.com>2015-01-23 23:30:31 -0500
committerA. Jesse Jiryu Davis <jesse@mongodb.com>2015-01-23 23:30:31 -0500
commit63efd76c158790c22f102b684e9b055d5679cf72 (patch)
treead0b76e8a92f2e71f9cd3bc3c8c4efabc8663e42
parent94e89af1ad36502af5e78932eaa03dbc26d8d829 (diff)
downloadtrollius-63efd76c158790c22f102b684e9b055d5679cf72.tar.gz
Tulip issue #220: Merge JoinableQueue with Queue.
To more closely match the standard Queue, asyncio.Queue has "join" and "task_done". JoinableQueue is deleted.
-rw-r--r--asyncio/queues.py95
-rw-r--r--tests/test_queues.py10
2 files changed, 42 insertions, 63 deletions
diff --git a/asyncio/queues.py b/asyncio/queues.py
index dce0d53..37b0c41 100644
--- a/asyncio/queues.py
+++ b/asyncio/queues.py
@@ -1,7 +1,6 @@
"""Queues"""
-__all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'JoinableQueue',
- 'QueueFull', 'QueueEmpty']
+__all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty']
import collections
import heapq
@@ -45,6 +44,9 @@ class Queue:
self._getters = collections.deque()
# Pairs of (item, Future).
self._putters = collections.deque()
+ self._unfinished_tasks = 0
+ self._finished = locks.Event(loop=self._loop)
+ self._finished.set()
self._init(maxsize)
def _init(self, maxsize):
@@ -55,6 +57,8 @@ class Queue:
def _put(self, item):
self._queue.append(item)
+ self._unfinished_tasks += 1
+ self._finished.clear()
def __repr__(self):
return '<{} at {:#x} {}>'.format(
@@ -71,6 +75,8 @@ class Queue:
result += ' _getters[{}]'.format(len(self._getters))
if self._putters:
result += ' _putters[{}]'.format(len(self._putters))
+ if self._unfinished_tasks:
+ result += ' tasks={}'.format(self._unfinished_tasks)
return result
def _consume_done_getters(self):
@@ -122,9 +128,6 @@ class Queue:
'queue non-empty, why are getters waiting?')
getter = self._getters.popleft()
-
- # Use _put and _get instead of passing item straight to getter, in
- # case a subclass has logic that must run (e.g. JoinableQueue).
self._put(item)
# getter cannot be cancelled, we just removed done getters
@@ -150,9 +153,6 @@ class Queue:
'queue non-empty, why are getters waiting?')
getter = self._getters.popleft()
-
- # Use _put and _get instead of passing item straight to getter, in
- # case a subclass has logic that must run (e.g. JoinableQueue).
self._put(item)
# getter cannot be cancelled, we just removed done getters
@@ -215,56 +215,6 @@ class Queue:
else:
raise QueueEmpty
-
-class PriorityQueue(Queue):
- """A subclass of Queue; retrieves entries in priority order (lowest first).
-
- Entries are typically tuples of the form: (priority number, data).
- """
-
- def _init(self, maxsize):
- self._queue = []
-
- def _put(self, item, heappush=heapq.heappush):
- heappush(self._queue, item)
-
- def _get(self, heappop=heapq.heappop):
- return heappop(self._queue)
-
-
-class LifoQueue(Queue):
- """A subclass of Queue that retrieves most recently added entries first."""
-
- def _init(self, maxsize):
- self._queue = []
-
- def _put(self, item):
- self._queue.append(item)
-
- def _get(self):
- return self._queue.pop()
-
-
-class JoinableQueue(Queue):
- """A subclass of Queue with task_done() and join() methods."""
-
- def __init__(self, maxsize=0, *, loop=None):
- super().__init__(maxsize=maxsize, loop=loop)
- self._unfinished_tasks = 0
- self._finished = locks.Event(loop=self._loop)
- self._finished.set()
-
- def _format(self):
- result = Queue._format(self)
- if self._unfinished_tasks:
- result += ' tasks={}'.format(self._unfinished_tasks)
- return result
-
- def _put(self, item):
- super()._put(item)
- self._unfinished_tasks += 1
- self._finished.clear()
-
def task_done(self):
"""Indicate that a formerly enqueued task is complete.
@@ -296,3 +246,32 @@ class JoinableQueue(Queue):
"""
if self._unfinished_tasks > 0:
yield from self._finished.wait()
+
+
+class PriorityQueue(Queue):
+ """A subclass of Queue; retrieves entries in priority order (lowest first).
+
+ Entries are typically tuples of the form: (priority number, data).
+ """
+
+ def _init(self, maxsize):
+ self._queue = []
+
+ def _put(self, item, heappush=heapq.heappush):
+ heappush(self._queue, item)
+
+ def _get(self, heappop=heapq.heappop):
+ return heappop(self._queue)
+
+
+class LifoQueue(Queue):
+ """A subclass of Queue that retrieves most recently added entries first."""
+
+ def _init(self, maxsize):
+ self._queue = []
+
+ def _put(self, item):
+ self._queue.append(item)
+
+ def _get(self):
+ return self._queue.pop()
diff --git a/tests/test_queues.py b/tests/test_queues.py
index 3d4ac51..a73539d 100644
--- a/tests/test_queues.py
+++ b/tests/test_queues.py
@@ -408,14 +408,14 @@ class PriorityQueueTests(_QueueTestBase):
self.assertEqual([1, 2, 3], items)
-class JoinableQueueTests(_QueueTestBase):
+class QueueJoinTests(_QueueTestBase):
def test_task_done_underflow(self):
- q = asyncio.JoinableQueue(loop=self.loop)
+ q = asyncio.Queue(loop=self.loop)
self.assertRaises(ValueError, q.task_done)
def test_task_done(self):
- q = asyncio.JoinableQueue(loop=self.loop)
+ q = asyncio.Queue(loop=self.loop)
for i in range(100):
q.put_nowait(i)
@@ -452,7 +452,7 @@ class JoinableQueueTests(_QueueTestBase):
self.loop.run_until_complete(asyncio.wait(tasks, loop=self.loop))
def test_join_empty_queue(self):
- q = asyncio.JoinableQueue(loop=self.loop)
+ q = asyncio.Queue(loop=self.loop)
# Test that a queue join()s successfully, and before anything else
# (done twice for insurance).
@@ -465,7 +465,7 @@ class JoinableQueueTests(_QueueTestBase):
self.loop.run_until_complete(join())
def test_format(self):
- q = asyncio.JoinableQueue(loop=self.loop)
+ q = asyncio.Queue(loop=self.loop)
self.assertEqual(q._format(), 'maxsize=0')
q._unfinished_tasks = 2