diff options
author | Max Hirschhorn <max.hirschhorn@mongodb.com> | 2016-12-06 00:13:05 -0500 |
---|---|---|
committer | Max Hirschhorn <max.hirschhorn@mongodb.com> | 2016-12-06 00:13:05 -0500 |
commit | 3eba0c15cacad6e64bcb8bf89f68ef65ec0014b5 (patch) | |
tree | 577094c3fabcc0a5b021d6faa28cd35c8e2368ec | |
parent | e8129eb62fef9fc1bbfbbc55ab3c767086c21a3b (diff) | |
download | mongo-3eba0c15cacad6e64bcb8bf89f68ef65ec0014b5.tar.gz |
SERVER-26445 Make emit() not wait for flush() to complete.
Changes the BufferedHandler to always delegate the actual flushing of
the logs (perhaps via a long-running, blocking network operation) to the
timer thread that's running in the background.
(cherry picked from commit 93989f1023869e8640dcabbe59de00275e61dfa4)
-rw-r--r-- | buildscripts/resmokelib/logging/buildlogger.py | 6 | ||||
-rw-r--r-- | buildscripts/resmokelib/logging/handlers.py | 63 | ||||
-rw-r--r-- | buildscripts/resmokelib/utils/timer.py | 66 |
3 files changed, 81 insertions, 54 deletions
diff --git a/buildscripts/resmokelib/logging/buildlogger.py b/buildscripts/resmokelib/logging/buildlogger.py index c5f5d40401b..f058372df8c 100644 --- a/buildscripts/resmokelib/logging/buildlogger.py +++ b/buildscripts/resmokelib/logging/buildlogger.py @@ -184,7 +184,7 @@ class _BaseBuildloggerHandler(handlers.BufferedHandler): raise NotImplementedError("_append_logs must be implemented by _BaseBuildloggerHandler" " subclasses") - def flush_with_lock(self, close_called): + def _flush_buffer_with_lock(self, buf, close_called): """ Ensures all logging output has been flushed to the buildlogger server. @@ -194,7 +194,7 @@ class _BaseBuildloggerHandler(handlers.BufferedHandler): called. """ - self.retry_buffer.extend(self.buffer) + self.retry_buffer.extend(buf) if self._append_logs(self.retry_buffer): self.retry_buffer = [] @@ -209,8 +209,6 @@ class _BaseBuildloggerHandler(handlers.BufferedHandler): loggers._BUILDLOGGER_FALLBACK.info(message) self.retry_buffer = [] - self.buffer = [] - class BuildloggerTestHandler(_BaseBuildloggerHandler): """ diff --git a/buildscripts/resmokelib/logging/handlers.py b/buildscripts/resmokelib/logging/handlers.py index 6ede7d38b3c..3d71399bfa5 100644 --- a/buildscripts/resmokelib/logging/handlers.py +++ b/buildscripts/resmokelib/logging/handlers.py @@ -43,10 +43,12 @@ class BufferedHandler(logging.Handler): self.capacity = capacity self.interval_secs = interval_secs - self.buffer = [] - self._lock = threading.Lock() - self._timer = None # Defer creation until actually begin to log messages. + self.__emit_lock = threading.Lock() # Prohibits concurrent access to 'self.__emit_buffer'. + self.__emit_buffer = [] + + self.__flush_lock = threading.Lock() # Serializes callers of self.flush(). + self.__timer = None # Defer creation until we actually begin to log messages. def _new_timer(self): """ @@ -54,7 +56,7 @@ class BufferedHandler(logging.Handler): flush() method after 'interval_secs' seconds. """ - return timer.AlarmClock(self.interval_secs, self.flush, args=[self]) + return timer.AlarmClock(self.interval_secs, self.flush) def process_record(self, record): """ @@ -79,37 +81,45 @@ class BufferedHandler(logging.Handler): will expire after another 'interval_secs' seconds. """ - with self._lock: - self.buffer.append(self.process_record(record)) - if len(self.buffer) >= self.capacity: - if self._timer is not None: - self._timer.snooze() - self.flush_with_lock(False) - if self._timer is not None: - self._timer.reset() + if self.__timer is None: + self.__timer = self._new_timer() + self.__timer.start() - if self._timer is None: - self._timer = self._new_timer() - self._timer.start() + with self.__emit_lock: + self.__emit_buffer.append(self.process_record(record)) + if len(self.__emit_buffer) >= self.capacity: + # Trigger the timer thread to cause it to flush the buffer early. + self.__timer.trigger() - def flush(self, close_called=False): + def flush(self): """ Ensures all logging output has been flushed. """ - with self._lock: - if self.buffer: - self.flush_with_lock(close_called) + self.__flush(close_called=False) - def flush_with_lock(self, close_called): + def __flush(self, close_called): """ Ensures all logging output has been flushed. + """ + + with self.__emit_lock: + buf = self.__emit_buffer + self.__emit_buffer = [] + + # The buffer 'buf' is flushed without holding 'self.__emit_lock' to avoid causing callers of + # self.emit() to block behind the completion of a potentially long-running flush operation. + if buf: + with self.__flush_lock: + self._flush_buffer_with_lock(buf, close_called) - This version resets the buffers back to an empty list and is - intended to be overridden by subclasses. + def _flush_buffer_with_lock(self, buf, close_called): + """ + Ensures all logging output has been flushed. """ - self.buffer = [] + raise NotImplementedError("_flush_buffer_with_lock must be implemented by BufferedHandler" + " subclasses") def close(self): """ @@ -118,9 +128,10 @@ class BufferedHandler(logging.Handler): Stops the timer and flushes the buffer. """ - if self._timer is not None: - self._timer.dismiss() - self.flush(close_called=True) + if self.__timer is not None: + self.__timer.dismiss() + + self.__flush(close_called=True) logging.Handler.close(self) diff --git a/buildscripts/resmokelib/utils/timer.py b/buildscripts/resmokelib/utils/timer.py index 80531d5db5c..a32ed99b670 100644 --- a/buildscripts/resmokelib/utils/timer.py +++ b/buildscripts/resmokelib/utils/timer.py @@ -1,10 +1,10 @@ """ Alternative to the threading.Timer class. -Enables a timer to be restarted without needing to construct a new thread -each time. This is necessary to execute periodic actions, e.g. flushing -log messages to buildlogger, while avoiding errors related to "can't start -new thread" that would otherwise occur on Windows. +Enables a timer to be restarted without needing to construct a new +thread each time. This is necessary to execute periodic actions, e.g. +flushing log messages to buildlogger, while avoiding errors related to +"can't start new thread" that would otherwise occur on Windows. """ from __future__ import absolute_import @@ -32,8 +32,8 @@ class AlarmClock(threading.Thread): self.args = args if args is not None else [] self.kwargs = kwargs if kwargs is not None else {} - self.lock = threading.Lock() - self.cond = threading.Condition(self.lock) + self.__lock = threading.Lock() + self.__cond = threading.Condition(self.__lock) self.snoozed = False # canceled for one execution self.dismissed = False # canceled for all time @@ -44,9 +44,9 @@ class AlarmClock(threading.Thread): Disables the timer. """ - with self.lock: + with self.__lock: self.dismissed = True - self.cond.notify_all() + self.__cond.notify_all() self.join() # Tidy up the started thread. @@ -54,24 +54,25 @@ class AlarmClock(threading.Thread): def snooze(self): """ - Skips the next execution of 'func' if it has not already started. + Skips the next execution of 'func' if it has not already + started. """ - with self.lock: + with self.__lock: if self.dismissed: raise ValueError("Timer cannot be snoozed if it has been dismissed") self.snoozed = True self.restarted = False - self.cond.notify_all() + self.__cond.notify_all() def reset(self): """ - Restarts the timer, causing it to wait 'interval' seconds before calling - 'func' again. + Restarts the timer, causing it to wait 'interval' seconds before + calling 'func' again. """ - with self.lock: + with self.__lock: if self.dismissed: raise ValueError("Timer cannot be reset if it has been dismissed") @@ -79,26 +80,43 @@ class AlarmClock(threading.Thread): raise ValueError("Timer cannot be reset if it has not been snoozed") self.restarted = True - self.cond.notify_all() + self.__cond.notify_all() + + def trigger(self): + """ + Signals the timer, causing 'func' to execute sooner than waiting + for 'interval' seconds to pass. + """ + + with self.__lock: + if self.dismissed: + raise ValueError("Timer cannot be triggered if it has been dismissed") + + if self.snoozed: + raise ValueError("Timer cannot be triggered if it has been snoozed") + + self.__cond.notify_all() def run(self): """ - Repeatedly calls 'func' with a delay of 'interval' seconds between executions. + Repeatedly calls 'func' with a delay of 'interval' seconds + between executions. - If the timer is snoozed before 'func' is called, then it waits to be reset. - After it has been reset, the timer will again wait 'interval' seconds and - then try to call 'func'. + If the timer is snoozed before 'func' is called, then it waits + to be reset. After it has been reset, the timer will again wait + 'interval' seconds and then try to call 'func'. - If the timer is dismissed, then no subsequent executions of 'func' are made. + If the timer is dismissed, then no subsequent executions of + 'func' are made. """ while True: - with self.lock: + with self.__lock: if self.dismissed: return # Wait for the specified amount of time. - self.cond.wait(self.interval) + self.__cond.wait(self.interval) if self.dismissed: return @@ -106,7 +124,7 @@ class AlarmClock(threading.Thread): # If the timer was snoozed, then it should wait to be reset. if self.snoozed: while not self.restarted: - self.cond.wait() + self.__cond.wait() if self.dismissed: return @@ -120,6 +138,6 @@ class AlarmClock(threading.Thread): self.func(*self.args, **self.kwargs) # Reacquire the lock. - with self.lock: + with self.__lock: # Ignore snoozes that took place while the function was being executed. self.snoozed = False |