summaryrefslogtreecommitdiff
path: root/buildscripts/resmokelib/core/pipe.py
diff options
context:
space:
mode:
Diffstat (limited to 'buildscripts/resmokelib/core/pipe.py')
-rw-r--r--buildscripts/resmokelib/core/pipe.py87
1 files changed, 87 insertions, 0 deletions
diff --git a/buildscripts/resmokelib/core/pipe.py b/buildscripts/resmokelib/core/pipe.py
new file mode 100644
index 00000000000..bb080721b2d
--- /dev/null
+++ b/buildscripts/resmokelib/core/pipe.py
@@ -0,0 +1,87 @@
+"""
+Helper class to read output of a subprocess. Used to avoid deadlocks
+from the pipe buffer filling up and blocking the subprocess while it's
+being waited on.
+"""
+
+from __future__ import absolute_import
+
+import threading
+
+
+class LoggerPipe(threading.Thread):
+ """
+ Asynchronously reads the output of a subprocess and sends it to a
+ logger.
+ """
+
+ # The start() and join() methods are not intended to be called directly on the LoggerPipe
+ # instance. Since we override them for that effect, the super's version are preserved here.
+ __start = threading.Thread.start
+ __join = threading.Thread.join
+
+ def __init__(self, logger, level, pipe_out):
+ """
+ Initializes the LoggerPipe with the specified logger, logging
+ level to use, and pipe to read from.
+ """
+
+ threading.Thread.__init__(self)
+ # Main thread should not call join() when exiting
+ self.daemon = True
+
+ self.__logger = logger
+ self.__level = level
+ self.__pipe_out = pipe_out
+
+ self.__lock = threading.Lock()
+ self.__condition = threading.Condition(self.__lock)
+
+ self.__started = False
+ self.__finished = False
+
+ LoggerPipe.__start(self)
+
+ def start(self):
+ raise NotImplementedError("start should not be called directly")
+
+ def run(self):
+ """
+ Reads the output from 'pipe_out' and logs each line to 'logger'.
+ """
+
+ with self.__lock:
+ self.__started = True
+ self.__condition.notify_all()
+
+ # Close the pipe when finished reading all of the output.
+ with self.__pipe_out:
+ # Avoid buffering the output from the pipe.
+ for line in iter(self.__pipe_out.readline, b""):
+ # Convert the output of the process from a bytestring to a UTF-8 string, and replace
+ # any characters that cannot be decoded with the official Unicode replacement
+ # character, U+FFFD. The log messages of MongoDB processes are not always valid
+ # UTF-8 sequences. See SERVER-7506.
+ line = line.decode("utf-8", "replace")
+ self.__logger.log(self.__level, line.rstrip())
+
+ with self.__lock:
+ self.__finished = True
+ self.__condition.notify_all()
+
+ def join(self, timeout=None):
+ raise NotImplementedError("join should not be called directly")
+
+ def wait_until_started(self):
+ with self.__lock:
+ while not self.__started:
+ self.__condition.wait()
+
+ def wait_until_finished(self):
+ with self.__lock:
+ while not self.__finished:
+ self.__condition.wait()
+
+ # No need to pass a timeout to join() because the thread should already be done after
+ # notifying us it has finished reading output from the pipe.
+ LoggerPipe.__join(self) # Tidy up the started thread.