From a52b582c733c7cfb8b38f82d84c3561daef34ffc Mon Sep 17 00:00:00 2001 From: Raymond Hettinger Date: Sat, 7 Jan 2012 15:32:52 -0800 Subject: Simplify the code using with-statements. --- Lib/queue.py | 42 ++++++++++++++---------------------------- 1 file changed, 14 insertions(+), 28 deletions(-) (limited to 'Lib/queue.py') diff --git a/Lib/queue.py b/Lib/queue.py index bee7ed4086..de29ca5564 100644 --- a/Lib/queue.py +++ b/Lib/queue.py @@ -26,17 +26,21 @@ class Queue: def __init__(self, maxsize=0): self.maxsize = maxsize self._init(maxsize) + # mutex must be held whenever the queue is mutating. All methods # that acquire mutex must release it before returning. mutex # is shared between the three conditions, so acquiring and # releasing the conditions also acquires and releases mutex. self.mutex = _threading.Lock() + # Notify not_empty whenever an item is added to the queue; a # thread waiting to get is notified then. self.not_empty = _threading.Condition(self.mutex) + # Notify not_full whenever an item is removed from the queue; # a thread waiting to put is notified then. self.not_full = _threading.Condition(self.mutex) + # Notify all_tasks_done whenever the number of unfinished tasks # drops to zero; thread waiting to join() is notified to resume self.all_tasks_done = _threading.Condition(self.mutex) @@ -56,16 +60,13 @@ class Queue: Raises a ValueError if called more times than there were items placed in the queue. """ - self.all_tasks_done.acquire() - try: + with self.all_tasks_done: unfinished = self.unfinished_tasks - 1 if unfinished <= 0: if unfinished < 0: raise ValueError('task_done() called too many times') self.all_tasks_done.notify_all() self.unfinished_tasks = unfinished - finally: - self.all_tasks_done.release() def join(self): """Blocks until all items in the Queue have been gotten and processed. @@ -76,19 +77,14 @@ class Queue: When the count of unfinished tasks drops to zero, join() unblocks. """ - self.all_tasks_done.acquire() - try: + with self.all_tasks_done: while self.unfinished_tasks: self.all_tasks_done.wait() - finally: - self.all_tasks_done.release() def qsize(self): """Return the approximate size of the queue (not reliable!).""" - self.mutex.acquire() - n = self._qsize() - self.mutex.release() - return n + with self.mutex: + return self._qsize() def empty(self): """Return True if the queue is empty, False otherwise (not reliable!). @@ -102,10 +98,8 @@ class Queue: completed, the preferred technique is to use the join() method. """ - self.mutex.acquire() - n = not self._qsize() - self.mutex.release() - return n + with self.mutex: + return not self._qsize() def full(self): """Return True if the queue is full, False otherwise (not reliable!). @@ -116,10 +110,8 @@ class Queue: qsize() can be used. """ - self.mutex.acquire() - n = 0 < self.maxsize <= self._qsize() - self.mutex.release() - return n + with self.mutex: + return 0 < self.maxsize <= self._qsize() def put(self, item, block=True, timeout=None): """Put an item into the queue. @@ -132,8 +124,7 @@ class Queue: is immediately available, else raise the Full exception ('timeout' is ignored in that case). """ - self.not_full.acquire() - try: + with self.not_full: if self.maxsize > 0: if not block: if self._qsize() >= self.maxsize: @@ -153,8 +144,6 @@ class Queue: self._put(item) self.unfinished_tasks += 1 self.not_empty.notify() - finally: - self.not_full.release() def put_nowait(self, item): """Put an item into the queue without blocking. @@ -175,8 +164,7 @@ class Queue: available, else raise the Empty exception ('timeout' is ignored in that case). """ - self.not_empty.acquire() - try: + with self.not_empty: if not block: if not self._qsize(): raise Empty @@ -195,8 +183,6 @@ class Queue: item = self._get() self.not_full.notify() return item - finally: - self.not_empty.release() def get_nowait(self): """Remove and return an item from the queue without blocking. -- cgit v1.2.1 From b6ace0286e54ca73fd7e762b220e75bbd2c64970 Mon Sep 17 00:00:00 2001 From: Raymond Hettinger Date: Mon, 9 Jan 2012 05:32:01 +0000 Subject: Minor code clean-ups and beautifications. --- Lib/queue.py | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) (limited to 'Lib/queue.py') diff --git a/Lib/queue.py b/Lib/queue.py index de29ca5564..c65ba4b46d 100644 --- a/Lib/queue.py +++ b/Lib/queue.py @@ -1,12 +1,12 @@ """A multi-producer, multi-consumer queue.""" -from time import time as _time try: - import threading as _threading + import threading except ImportError: - import dummy_threading as _threading + import dummythreading as threading from collections import deque -import heapq +from heapq import heappush, heappop +from time import time __all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue'] @@ -31,19 +31,19 @@ class Queue: # that acquire mutex must release it before returning. mutex # is shared between the three conditions, so acquiring and # releasing the conditions also acquires and releases mutex. - self.mutex = _threading.Lock() + self.mutex = threading.Lock() # Notify not_empty whenever an item is added to the queue; a # thread waiting to get is notified then. - self.not_empty = _threading.Condition(self.mutex) + self.not_empty = threading.Condition(self.mutex) # Notify not_full whenever an item is removed from the queue; # a thread waiting to put is notified then. - self.not_full = _threading.Condition(self.mutex) + self.not_full = threading.Condition(self.mutex) # Notify all_tasks_done whenever the number of unfinished tasks # drops to zero; thread waiting to join() is notified to resume - self.all_tasks_done = _threading.Condition(self.mutex) + self.all_tasks_done = threading.Condition(self.mutex) self.unfinished_tasks = 0 def task_done(self): @@ -135,9 +135,9 @@ class Queue: elif timeout < 0: raise ValueError("'timeout' must be a positive number") else: - endtime = _time() + timeout + endtime = time() + timeout while self._qsize() >= self.maxsize: - remaining = endtime - _time() + remaining = endtime - time() if remaining <= 0.0: raise Full self.not_full.wait(remaining) @@ -174,9 +174,9 @@ class Queue: elif timeout < 0: raise ValueError("'timeout' must be a positive number") else: - endtime = _time() + timeout + endtime = time() + timeout while not self._qsize(): - remaining = endtime - _time() + remaining = endtime - time() if remaining <= 0.0: raise Empty self.not_empty.wait(remaining) @@ -200,7 +200,7 @@ class Queue: def _init(self, maxsize): self.queue = deque() - def _qsize(self, len=len): + def _qsize(self): return len(self.queue) # Put a new item in the queue @@ -221,13 +221,13 @@ class PriorityQueue(Queue): def _init(self, maxsize): self.queue = [] - def _qsize(self, len=len): + def _qsize(self): return len(self.queue) - def _put(self, item, heappush=heapq.heappush): + def _put(self, item): heappush(self.queue, item) - def _get(self, heappop=heapq.heappop): + def _get(self): return heappop(self.queue) @@ -237,7 +237,7 @@ class LifoQueue(Queue): def _init(self, maxsize): self.queue = [] - def _qsize(self, len=len): + def _qsize(self): return len(self.queue) def _put(self, item): -- cgit v1.2.1 From c1bc654c2bafcc7dff14d7bddc63928f391c8c05 Mon Sep 17 00:00:00 2001 From: Raymond Hettinger Date: Mon, 9 Jan 2012 06:02:08 +0000 Subject: Improve clarity with keyword argument for block. Move nowait methods together. --- Lib/queue.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) (limited to 'Lib/queue.py') diff --git a/Lib/queue.py b/Lib/queue.py index c65ba4b46d..c31310e2ff 100644 --- a/Lib/queue.py +++ b/Lib/queue.py @@ -145,14 +145,6 @@ class Queue: self.unfinished_tasks += 1 self.not_empty.notify() - def put_nowait(self, item): - """Put an item into the queue without blocking. - - Only enqueue the item if a free slot is immediately available. - Otherwise raise the Full exception. - """ - return self.put(item, False) - def get(self, block=True, timeout=None): """Remove and return an item from the queue. @@ -184,13 +176,21 @@ class Queue: self.not_full.notify() return item + def put_nowait(self, item): + """Put an item into the queue without blocking. + + Only enqueue the item if a free slot is immediately available. + Otherwise raise the Full exception. + """ + return self.put(item, block=False) + def get_nowait(self): """Remove and return an item from the queue without blocking. Only get an item if one is immediately available. Otherwise raise the Empty exception. """ - return self.get(False) + return self.get(block=False) # Override these methods to implement other queue organizations # (e.g. stack or priority queue). -- cgit v1.2.1 From 955d01f5f02b3b7ad02fbb163587a7fddf63d99d Mon Sep 17 00:00:00 2001 From: Raymond Hettinger Date: Mon, 9 Jan 2012 06:17:39 +0000 Subject: Make the docstring style consistent. --- Lib/queue.py | 47 +++++++++++++++++++++++------------------------ 1 file changed, 23 insertions(+), 24 deletions(-) (limited to 'Lib/queue.py') diff --git a/Lib/queue.py b/Lib/queue.py index c31310e2ff..31ec46b697 100644 --- a/Lib/queue.py +++ b/Lib/queue.py @@ -1,4 +1,4 @@ -"""A multi-producer, multi-consumer queue.""" +'''A multi-producer, multi-consumer queue.''' try: import threading @@ -11,18 +11,19 @@ from time import time __all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue'] class Empty(Exception): - "Exception raised by Queue.get(block=0)/get_nowait()." + 'Exception raised by Queue.get(block=0)/get_nowait().' pass class Full(Exception): - "Exception raised by Queue.put(block=0)/put_nowait()." + 'Exception raised by Queue.put(block=0)/put_nowait().' pass class Queue: - """Create a queue object with a given maximum size. + '''Create a queue object with a given maximum size. If maxsize is <= 0, the queue size is infinite. - """ + ''' + def __init__(self, maxsize=0): self.maxsize = maxsize self._init(maxsize) @@ -47,7 +48,7 @@ class Queue: self.unfinished_tasks = 0 def task_done(self): - """Indicate that a formerly enqueued task is complete. + '''Indicate that a formerly enqueued task is complete. Used by Queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing @@ -59,7 +60,7 @@ class Queue: Raises a ValueError if called more times than there were items placed in the queue. - """ + ''' with self.all_tasks_done: unfinished = self.unfinished_tasks - 1 if unfinished <= 0: @@ -69,25 +70,25 @@ class Queue: self.unfinished_tasks = unfinished def join(self): - """Blocks until all items in the Queue have been gotten and processed. + '''Blocks until all items in the Queue have been gotten and processed. The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer thread calls task_done() to indicate the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks. - """ + ''' with self.all_tasks_done: while self.unfinished_tasks: self.all_tasks_done.wait() def qsize(self): - """Return the approximate size of the queue (not reliable!).""" + '''Return the approximate size of the queue (not reliable!).''' with self.mutex: return self._qsize() def empty(self): - """Return True if the queue is empty, False otherwise (not reliable!). + '''Return True if the queue is empty, False otherwise (not reliable!). This method is likely to be removed at some point. Use qsize() == 0 as a direct substitute, but be aware that either approach risks a race @@ -96,25 +97,23 @@ class Queue: To create code that needs to wait for all queued tasks to be completed, the preferred technique is to use the join() method. - - """ + ''' with self.mutex: return not self._qsize() def full(self): - """Return True if the queue is full, False otherwise (not reliable!). + '''Return True if the queue is full, False otherwise (not reliable!). This method is likely to be removed at some point. Use qsize() >= n as a direct substitute, but be aware that either approach risks a race condition where a queue can shrink before the result of full() or qsize() can be used. - - """ + ''' with self.mutex: return 0 < self.maxsize <= self._qsize() def put(self, item, block=True, timeout=None): - """Put an item into the queue. + '''Put an item into the queue. If optional args 'block' is true and 'timeout' is None (the default), block if necessary until a free slot is available. If 'timeout' is @@ -123,7 +122,7 @@ class Queue: Otherwise ('block' is false), put an item on the queue if a free slot is immediately available, else raise the Full exception ('timeout' is ignored in that case). - """ + ''' with self.not_full: if self.maxsize > 0: if not block: @@ -146,7 +145,7 @@ class Queue: self.not_empty.notify() def get(self, block=True, timeout=None): - """Remove and return an item from the queue. + '''Remove and return an item from the queue. If optional args 'block' is true and 'timeout' is None (the default), block if necessary until an item is available. If 'timeout' is @@ -155,7 +154,7 @@ class Queue: Otherwise ('block' is false), return an item if one is immediately available, else raise the Empty exception ('timeout' is ignored in that case). - """ + ''' with self.not_empty: if not block: if not self._qsize(): @@ -177,19 +176,19 @@ class Queue: return item def put_nowait(self, item): - """Put an item into the queue without blocking. + '''Put an item into the queue without blocking. Only enqueue the item if a free slot is immediately available. Otherwise raise the Full exception. - """ + ''' return self.put(item, block=False) def get_nowait(self): - """Remove and return an item from the queue without blocking. + '''Remove and return an item from the queue without blocking. Only get an item if one is immediately available. Otherwise raise the Empty exception. - """ + ''' return self.get(block=False) # Override these methods to implement other queue organizations -- cgit v1.2.1 From 452ea442279926bf9e8063114878a13204be7398 Mon Sep 17 00:00:00 2001 From: Raymond Hettinger Date: Mon, 9 Jan 2012 20:02:24 +0000 Subject: Fix nasty typo --- Lib/queue.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'Lib/queue.py') diff --git a/Lib/queue.py b/Lib/queue.py index 31ec46b697..ba608c5a4c 100644 --- a/Lib/queue.py +++ b/Lib/queue.py @@ -3,7 +3,7 @@ try: import threading except ImportError: - import dummythreading as threading + import dummy_threading as threading from collections import deque from heapq import heappush, heappop from time import time -- cgit v1.2.1 From c69d66ff955d42936a8518d9b06f7749945e13c9 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Thu, 15 Mar 2012 01:22:16 +0100 Subject: Issue #14222: Use the new time.steady() function instead of time.time() for timeout in queue and threading modules to not be affected of system time update. --- Lib/queue.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'Lib/queue.py') diff --git a/Lib/queue.py b/Lib/queue.py index ba608c5a4c..1dc72c4bbe 100644 --- a/Lib/queue.py +++ b/Lib/queue.py @@ -6,7 +6,7 @@ except ImportError: import dummy_threading as threading from collections import deque from heapq import heappush, heappop -from time import time +from time import steady as time __all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue'] -- cgit v1.2.1