blob: da059ffd852cdf2c33c7ce10c3e37e478f906d67 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
"""
Extension to the Queue.Queue class.
Added support for the join() method to take a timeout. This is necessary
in order for KeyboardInterrupt exceptions to get propagated.
See https://bugs.python.org/issue1167930 for more details.
"""
from __future__ import absolute_import
import Queue
import time
# Exception that is raised when get_nowait() is called on an empty Queue.
Empty = Queue.Empty
class Queue(Queue.Queue):
"""
A multi-producer, multi-consumer queue.
"""
def join(self, timeout=None):
"""
Wait until all items in the queue have been retrieved and processed,
or until 'timeout' seconds have passed.
The count of unfinished tasks is incremented whenever an item is added
to the queue. The count is decremented whenever task_done() is called
to indicate that all work on the retrieved item was completed.
When the number of unfinished tasks reaches zero, True is returned.
If the number of unfinished tasks remains nonzero after 'timeout'
seconds have passed, then False is returned.
"""
with self.all_tasks_done:
if timeout is None:
while self.unfinished_tasks:
self.all_tasks_done.wait()
elif timeout < 0:
raise ValueError("timeout must be a nonnegative number")
else:
# Pass timeout down to lock acquisition
deadline = time.time() + timeout
while self.unfinished_tasks:
remaining = deadline - time.time()
if remaining <= 0.0:
return False
self.all_tasks_done.wait(remaining)
return True
|