summaryrefslogtreecommitdiff
path: root/buildscripts/resmokelib/logging/flush.py
blob: b812fc2b606279fbaa4fb24779cc5143a52f8ac4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
"""Manage a thread responsible for periodically calling flush() on logging.Handler instances.

These instances are used to send logs to buildlogger.
"""

import logging
import sched
import threading
import time

_FLUSH_THREAD_LOCK = threading.Lock()
_FLUSH_THREAD = None


def start_thread():
    """Start the flush thread."""

    global _FLUSH_THREAD  # pylint: disable=global-statement
    with _FLUSH_THREAD_LOCK:
        if _FLUSH_THREAD is not None:
            raise ValueError("FlushThread has already been started")

        _FLUSH_THREAD = _FlushThread()
        _FLUSH_THREAD.start()


def stop_thread():
    """Signal the flush thread to stop and wait until it does."""

    with _FLUSH_THREAD_LOCK:
        if _FLUSH_THREAD is None:
            raise ValueError("FlushThread hasn't been started")

    _FLUSH_THREAD.signal_shutdown()
    _FLUSH_THREAD.await_shutdown()
    _FLUSH_THREAD.join()


def flush_after(handler, delay):
    """Add 'handler' to the queue so that it is flushed after 'delay' seconds by the flush thread.

    Return the scheduled event which may be used for later cancellation (see cancel()).
    """

    if not isinstance(handler, logging.Handler):
        raise TypeError("handler must be a logging.Handler instance")

    return _FLUSH_THREAD.submit(handler.flush, delay)


def close_later(handler):
    """Add 'handler' to the queue so that it is closed later by the flush thread.

    Return the scheduled event which may be used for later cancelation (see cancel()).
    """

    if not isinstance(handler, logging.Handler):
        raise TypeError("handler must be a logging.Handler instance")

    # Schedule the event to run immediately. It is possible for the scheduler to not immediately run
    # handler.close() if it has fallen behind as a result of other events taking longer to run than
    # the time available before the next event.
    no_delay = 0.0
    return _FLUSH_THREAD.submit(handler.close, no_delay)


def cancel(event):
    """Attempt to cancel the specified event.

    Returns true if the event was successfully canceled, and returns false otherwise.
    """
    return _FLUSH_THREAD.cancel_event(event)


class _FlushThread(threading.Thread):
    """Asynchronously flush and close logging handlers."""

    _TIMEOUT = 24 * 60 * 60  # =1 day (a long time to have tests run)

    def __init__(self):
        """Initialize the flush thread."""

        threading.Thread.__init__(self, name="FlushThread")
        # Do not wait to flush the logs if interrupted by the user.
        self.daemon = True

        def interruptible_sleep(secs):
            """Wait up to 'secs' seconds or for the 'self.__schedule_updated' event to be set."""

            # Setting 'self.__schedule_updated' in submit() will cause the scheduler to return early
            # from its 'delayfunc'. This makes it so that if a new event is scheduled with
            # delay=0.0, then it will be performed immediately.
            self.__schedule_updated.wait(secs)
            self.__schedule_updated.clear()

        self.__scheduler = sched.scheduler(time.time, interruptible_sleep)
        self.__schedule_updated = threading.Event()
        self.__should_stop = threading.Event()
        self.__terminated = threading.Event()

    def run(self):
        """Continuously flush and close logging handlers."""

        try:
            while not (self.__should_stop.is_set() and self.__scheduler.empty()):
                self.__scheduler.run()

                # Reset 'self.__schedule_updated' here since we've processed all the events
                # thought to exist. Either the queue won't be empty or 'self.__schedule_updated'
                # will get set again later.
                self.__schedule_updated.clear()

                if self.__should_stop.is_set():
                    # If the main thread has asked the flush thread to stop, then either run
                    # whatever has been added to the queue since the scheduler last ran, or exit.
                    continue

                # Otherwise, wait for a new event to be scheduled.
                if self.__scheduler.empty():
                    self.__schedule_updated.wait()
        finally:
            self.__terminated.set()

    def signal_shutdown(self):
        """Indicate to the flush thread that it should exit.

        This will happen once its current queue of logging handlers are flushed and closed.
        """

        self.__should_stop.set()

        # Signal the flush thread to wake up as though there is more work for it to do since we're
        # trying to get it to exit.
        self.__schedule_updated.set()

    def await_shutdown(self):
        """Wait for the flush thread to finish processing its current queue of logging handlers."""

        while not self.__terminated.is_set():
            # Need to pass a timeout to wait() so that KeyboardInterrupt exceptions are propagated.
            self.__terminated.wait(_FlushThread._TIMEOUT)

    def submit(self, action, delay):
        """Schedule 'action' for 'delay' seconds from now.

        Return the scheduled event which may be used for later cancelation (see cancel_event()).
        """

        event = self.__scheduler.enter(delay, 0, action, ())
        self.__schedule_updated.set()
        return event

    def cancel_event(self, event):
        """Attempt to cancel the specified event.

        Return true if the event was successfully canceled, and returns false otherwise.
        """

        try:
            self.__scheduler.cancel(event)
            return True
        except ValueError:
            # We may have failed to cancel the event due to it already being in progress.
            pass
        return False