diff options
author | A. Jesse Jiryu Davis <jesse@mongodb.com> | 2015-01-23 23:30:31 -0500 |
---|---|---|
committer | A. Jesse Jiryu Davis <jesse@mongodb.com> | 2015-01-23 23:30:31 -0500 |
commit | 63efd76c158790c22f102b684e9b055d5679cf72 (patch) | |
tree | ad0b76e8a92f2e71f9cd3bc3c8c4efabc8663e42 | |
parent | 94e89af1ad36502af5e78932eaa03dbc26d8d829 (diff) | |
download | trollius-63efd76c158790c22f102b684e9b055d5679cf72.tar.gz |
Tulip issue #220: Merge JoinableQueue with Queue.
To more closely match the standard Queue, asyncio.Queue has "join" and
"task_done". JoinableQueue is deleted.
-rw-r--r-- | asyncio/queues.py | 95 | ||||
-rw-r--r-- | tests/test_queues.py | 10 |
2 files changed, 42 insertions, 63 deletions
diff --git a/asyncio/queues.py b/asyncio/queues.py index dce0d53..37b0c41 100644 --- a/asyncio/queues.py +++ b/asyncio/queues.py @@ -1,7 +1,6 @@ """Queues""" -__all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'JoinableQueue', - 'QueueFull', 'QueueEmpty'] +__all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty'] import collections import heapq @@ -45,6 +44,9 @@ class Queue: self._getters = collections.deque() # Pairs of (item, Future). self._putters = collections.deque() + self._unfinished_tasks = 0 + self._finished = locks.Event(loop=self._loop) + self._finished.set() self._init(maxsize) def _init(self, maxsize): @@ -55,6 +57,8 @@ class Queue: def _put(self, item): self._queue.append(item) + self._unfinished_tasks += 1 + self._finished.clear() def __repr__(self): return '<{} at {:#x} {}>'.format( @@ -71,6 +75,8 @@ class Queue: result += ' _getters[{}]'.format(len(self._getters)) if self._putters: result += ' _putters[{}]'.format(len(self._putters)) + if self._unfinished_tasks: + result += ' tasks={}'.format(self._unfinished_tasks) return result def _consume_done_getters(self): @@ -122,9 +128,6 @@ class Queue: 'queue non-empty, why are getters waiting?') getter = self._getters.popleft() - - # Use _put and _get instead of passing item straight to getter, in - # case a subclass has logic that must run (e.g. JoinableQueue). self._put(item) # getter cannot be cancelled, we just removed done getters @@ -150,9 +153,6 @@ class Queue: 'queue non-empty, why are getters waiting?') getter = self._getters.popleft() - - # Use _put and _get instead of passing item straight to getter, in - # case a subclass has logic that must run (e.g. JoinableQueue). self._put(item) # getter cannot be cancelled, we just removed done getters @@ -215,56 +215,6 @@ class Queue: else: raise QueueEmpty - -class PriorityQueue(Queue): - """A subclass of Queue; retrieves entries in priority order (lowest first). - - Entries are typically tuples of the form: (priority number, data). - """ - - def _init(self, maxsize): - self._queue = [] - - def _put(self, item, heappush=heapq.heappush): - heappush(self._queue, item) - - def _get(self, heappop=heapq.heappop): - return heappop(self._queue) - - -class LifoQueue(Queue): - """A subclass of Queue that retrieves most recently added entries first.""" - - def _init(self, maxsize): - self._queue = [] - - def _put(self, item): - self._queue.append(item) - - def _get(self): - return self._queue.pop() - - -class JoinableQueue(Queue): - """A subclass of Queue with task_done() and join() methods.""" - - def __init__(self, maxsize=0, *, loop=None): - super().__init__(maxsize=maxsize, loop=loop) - self._unfinished_tasks = 0 - self._finished = locks.Event(loop=self._loop) - self._finished.set() - - def _format(self): - result = Queue._format(self) - if self._unfinished_tasks: - result += ' tasks={}'.format(self._unfinished_tasks) - return result - - def _put(self, item): - super()._put(item) - self._unfinished_tasks += 1 - self._finished.clear() - def task_done(self): """Indicate that a formerly enqueued task is complete. @@ -296,3 +246,32 @@ class JoinableQueue(Queue): """ if self._unfinished_tasks > 0: yield from self._finished.wait() + + +class PriorityQueue(Queue): + """A subclass of Queue; retrieves entries in priority order (lowest first). + + Entries are typically tuples of the form: (priority number, data). + """ + + def _init(self, maxsize): + self._queue = [] + + def _put(self, item, heappush=heapq.heappush): + heappush(self._queue, item) + + def _get(self, heappop=heapq.heappop): + return heappop(self._queue) + + +class LifoQueue(Queue): + """A subclass of Queue that retrieves most recently added entries first.""" + + def _init(self, maxsize): + self._queue = [] + + def _put(self, item): + self._queue.append(item) + + def _get(self): + return self._queue.pop() diff --git a/tests/test_queues.py b/tests/test_queues.py index 3d4ac51..a73539d 100644 --- a/tests/test_queues.py +++ b/tests/test_queues.py @@ -408,14 +408,14 @@ class PriorityQueueTests(_QueueTestBase): self.assertEqual([1, 2, 3], items) -class JoinableQueueTests(_QueueTestBase): +class QueueJoinTests(_QueueTestBase): def test_task_done_underflow(self): - q = asyncio.JoinableQueue(loop=self.loop) + q = asyncio.Queue(loop=self.loop) self.assertRaises(ValueError, q.task_done) def test_task_done(self): - q = asyncio.JoinableQueue(loop=self.loop) + q = asyncio.Queue(loop=self.loop) for i in range(100): q.put_nowait(i) @@ -452,7 +452,7 @@ class JoinableQueueTests(_QueueTestBase): self.loop.run_until_complete(asyncio.wait(tasks, loop=self.loop)) def test_join_empty_queue(self): - q = asyncio.JoinableQueue(loop=self.loop) + q = asyncio.Queue(loop=self.loop) # Test that a queue join()s successfully, and before anything else # (done twice for insurance). @@ -465,7 +465,7 @@ class JoinableQueueTests(_QueueTestBase): self.loop.run_until_complete(join()) def test_format(self): - q = asyncio.JoinableQueue(loop=self.loop) + q = asyncio.Queue(loop=self.loop) self.assertEqual(q._format(), 'maxsize=0') q._unfinished_tasks = 2 |