summaryrefslogtreecommitdiff
path: root/buildscripts/resmokelib/utils/queue.py
blob: c77692138b1f8805e04d2f716fb487a295757875 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
"""Extension to the Queue.Queue class.

Added support for the join() method to take a timeout. This is necessary
in order for KeyboardInterrupt exceptions to get propagated.

See https://bugs.python.org/issue1167930 for more details.
"""

from __future__ import absolute_import

import Queue as _Queue
import time

# Exception that is raised when get_nowait() is called on an empty Queue.
Empty = _Queue.Empty


class Queue(_Queue.Queue):
    """A multi-producer, multi-consumer queue."""

    def join(self, timeout=None):  # pylint: disable=arguments-differ
        """Wait until all items in the queue have been processed or 'timeout' seconds have passed.

        The count of unfinished tasks is incremented whenever an item is added
        to the queue. The count is decremented whenever task_done() is called
        to indicate that all work on the retrieved item was completed.

        When the number of unfinished tasks reaches zero, True is returned.
        If the number of unfinished tasks remains nonzero after 'timeout'
        seconds have passed, then False is returned.
        """
        with self.all_tasks_done:
            if timeout is None:
                while self.unfinished_tasks:
                    self.all_tasks_done.wait()
            elif timeout < 0:
                raise ValueError("timeout must be a nonnegative number")
            else:
                # Pass timeout down to lock acquisition
                deadline = time.time() + timeout
                while self.unfinished_tasks:
                    remaining = deadline - time.time()
                    if remaining <= 0.0:
                        return False
                    self.all_tasks_done.wait(remaining)
        return True