summaryrefslogtreecommitdiff
path: root/buildscripts/resmokelib/core/pipe.py
blob: 168345049f8b210ec48c4e43d52cdbf4e5a6ec45 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
"""
Helper class to read output of a subprocess.

Used to avoid deadlocks from the pipe buffer filling up and blocking the subprocess while it's
being waited on.
"""
from textwrap import wrap
import threading
from typing import List

# Logkeeper only support log lines up to 4 MB, we want to be a little under that to account for
# extra metadata that gets sent along with the log message.
MAX_LOG_LINE = int(3.5 * 1024 * 1024)


class LoggerPipe(threading.Thread):  # pylint: disable=too-many-instance-attributes
    """Asynchronously reads the output of a subprocess and sends it to a logger."""

    # The start() and join() methods are not intended to be called directly on the LoggerPipe
    # instance. Since we override them for that effect, the super's version are preserved here.
    __start = threading.Thread.start
    __join = threading.Thread.join

    def __init__(self, logger, level, pipe_out):
        """Initialize the LoggerPipe with the specified arguments."""

        threading.Thread.__init__(self)
        # Main thread should not call join() when exiting
        self.daemon = True

        self.__logger = logger
        self.__level = level
        self.__pipe_out = pipe_out

        self.__lock = threading.Lock()
        self.__condition = threading.Condition(self.__lock)

        self.__started = False
        self.__finished = False

        LoggerPipe.__start(self)

    def start(self):
        """Start not implemented."""
        raise NotImplementedError("start should not be called directly")

    def run(self):
        """Read the output from 'pipe_out' and logs each line to 'logger'."""

        with self.__lock:
            self.__started = True
            self.__condition.notify_all()

        # Close the pipe when finished reading all of the output.
        with self.__pipe_out:
            # Avoid buffering the output from the pipe.
            for line in iter(self.__pipe_out.readline, b""):
                lines = self._format_line_for_logging(line)
                for entry in lines:
                    self.__logger.log(self.__level, entry)

        with self.__lock:
            self.__finished = True
            self.__condition.notify_all()

    def join(self, timeout=None):
        """Join not implemented."""
        raise NotImplementedError("join should not be called directly")

    def wait_until_started(self):
        """Wait until started."""
        with self.__lock:
            while not self.__started:
                self.__condition.wait()

    def wait_until_finished(self):
        """Wait until finished."""
        with self.__lock:
            while not self.__finished:
                self.__condition.wait()

        # No need to pass a timeout to join() because the thread should already be done after
        # notifying us it has finished reading output from the pipe.
        LoggerPipe.__join(self)  # Tidy up the started thread.

    @staticmethod
    def _format_line_for_logging(line_bytes: bytes) -> List[str]:
        """
        Convert the given byte array into string(s) to be send to the logger.

        If the size of the input is greater than the max size supported by logkeeper, we will
        split the input into multiple strings that are under the max supported size.

        :param line_bytes: Byte array of the line to send to the logger.
        :return: List of strings to send to logger.
        """
        # Replace null bytes in the output of the subprocess with a literal backslash ('\')
        # followed by a literal zero ('0') so tools like grep don't treat resmoke.py's
        # output as binary data.
        line_bytes = line_bytes.replace(b"\0", b"\\0")

        # Convert the output of the process from a bytestring to a UTF-8 string, and replace
        # any characters that cannot be decoded with the official Unicode replacement
        # character, U+FFFD. The log messages of MongoDB processes are not always valid
        # UTF-8 sequences. See SERVER-7506.
        line_str = line_bytes.decode("utf-8", "replace")
        line_str = line_str.rstrip()
        if len(line_str) > MAX_LOG_LINE:
            return wrap(
                line_str,
                MAX_LOG_LINE,
                expand_tabs=False,
                replace_whitespace=False,
                drop_whitespace=False,
                break_on_hyphens=False,
            )
        return [line_str]