summaryrefslogtreecommitdiff
path: root/buildscripts/resmokelib/logging/flush.py
blob: 5b2b488e51a567d235f5007d38d31dcc2f1b45d1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
"""
Manages a thread responsible for periodically calling flush() on
logging.Handler instances used to send logs to buildlogger.
"""

from __future__ import absolute_import

import logging
import threading
import time

from ..utils import scheduler

_FLUSH_THREAD_LOCK = threading.Lock()
_FLUSH_THREAD = None


def start_thread():
    """
    Starts the flush thread.
    """

    global _FLUSH_THREAD
    with _FLUSH_THREAD_LOCK:
        if _FLUSH_THREAD is not None:
            raise ValueError("FlushThread has already been started")

        _FLUSH_THREAD = _FlushThread()
        _FLUSH_THREAD.start()


def stop_thread():
    """
    Signals the flush thread to stop and waits until it does.
    """

    with _FLUSH_THREAD_LOCK:
        if _FLUSH_THREAD is None:
            raise ValueError("FlushThread hasn't been started")

    _FLUSH_THREAD.signal_shutdown()
    _FLUSH_THREAD.await_shutdown()
    _FLUSH_THREAD.join()


def flush_after(handler, delay):
    """
    Adds 'handler' to the queue so that it is flushed after 'delay'
    seconds by the flush thread.

    Returns the scheduled event which may be used for later cancellation
    (see cancel()).
    """

    if not isinstance(handler, logging.Handler):
        raise TypeError("handler must be a logging.Handler instance")

    return _FLUSH_THREAD.submit(handler.flush, delay)


def close_later(handler):
    """
    Adds 'handler' to the queue so that it is closed later by the flush
    thread.

    Returns the scheduled event which may be used for later cancelation
    (see cancel()).
    """

    if not isinstance(handler, logging.Handler):
        raise TypeError("handler must be a logging.Handler instance")

    # Schedule the event to run immediately. It is possible for the scheduler to not immediately run
    # handler.close() if it has fallen behind as a result of other events taking longer to run than
    # the time available before the next event.
    no_delay = 0.0
    return _FLUSH_THREAD.submit(handler.close, no_delay)


def cancel(event):
    """
    Attempts to cancel the specified event.

    Returns true if the event was successfully canceled, and returns
    false otherwise.
    """
    return _FLUSH_THREAD.cancel_event(event)


class _FlushThread(threading.Thread):
    """
    Asynchronously flushes and closes logging handlers.
    """

    _TIMEOUT = 24 * 60 * 60  # =1 day (a long time to have tests run)

    def __init__(self):
        """
        Initializes the flush thread.
        """

        threading.Thread.__init__(self, name="FlushThread")
        # Do not wait to flush the logs if interrupted by the user.
        self.daemon = True

        def interruptible_sleep(secs):
            """
            Waits up to 'secs' seconds or for the
            'self.__schedule_updated' event to be set.
            """

            # Setting 'self.__schedule_updated' in submit() will cause the scheduler to return early
            # from its 'delayfunc'. This makes it so that if a new event is scheduled with
            # delay=0.0, then it will be performed immediately.
            self.__schedule_updated.wait(secs)
            self.__schedule_updated.clear()

        self.__scheduler = scheduler.Scheduler(time.time, interruptible_sleep)
        self.__schedule_updated = threading.Event()
        self.__should_stop = threading.Event()
        self.__terminated = threading.Event()

    def run(self):
        """
        Continuously flushes and closes logging handlers.
        """

        try:
            while not (self.__should_stop.is_set() and self.__scheduler.empty()):
                self.__scheduler.run()

                # Reset 'self.__schedule_updated' here since we've processed all the events
                # thought to exist. Either the queue won't be empty or 'self.__schedule_updated'
                # will get set again later.
                self.__schedule_updated.clear()

                if self.__should_stop.is_set():
                    # If the main thread has asked the flush thread to stop, then either run
                    # whatever has been added to the queue since the scheduler last ran, or exit.
                    continue

                # Otherwise, wait for a new event to be scheduled.
                if self.__scheduler.empty():
                    self.__schedule_updated.wait()
        finally:
            self.__terminated.set()

    def signal_shutdown(self):
        """
        Indicates to the flush thread that it should exit once its
        current queue of logging handlers are flushed and closed.
        """

        self.__should_stop.set()

        # Signal the flush thread to wake up as though there is more work for it to do since we're
        # trying to get it to exit.
        self.__schedule_updated.set()

    def await_shutdown(self):
        """
        Waits for the flush thread to finish processing its current
        queue of logging handlers.
        """

        while not self.__terminated.is_set():
            # Need to pass a timeout to wait() so that KeyboardInterrupt exceptions are propagated.
            self.__terminated.wait(_FlushThread._TIMEOUT)

    def submit(self, action, delay):
        """
        Schedules 'action' for 'delay' seconds from now.

        Returns the scheduled event which may be used for later
        cancelation (see cancel_event()).
        """

        event = self.__scheduler.enter(delay, 0, action, ())
        self.__schedule_updated.set()
        return event

    def cancel_event(self, event):
        """
        Attempts to cancel the specified event.

        Returns true if the event was successfully canceled, and returns
        false otherwise.
        """

        try:
            self.__scheduler.cancel(event)
            return True
        except ValueError:
            # We may have failed to cancel the event due to it already being in progress.
            pass
        return False