summaryrefslogtreecommitdiff
path: root/buildscripts/resmokelib/core/process.py
blob: 4b8e0f25d0ae87240fe81bb94a1da50a5bd483b4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
"""A more reliable way to create and destroy processes.

Uses job objects when running on Windows to ensure that all created
processes are terminated.
"""

import atexit
import logging
import os
import os.path
import sys
import threading
import subprocess

from . import pipe  # pylint: disable=wrong-import-position
from .. import utils  # pylint: disable=wrong-import-position

# Attempt to avoid race conditions (e.g. hangs caused by a file descriptor being left open) when
# starting subprocesses concurrently from multiple threads by guarding calls to subprocess.Popen()
# with a lock. See https://bugs.python.org/issue2320 and https://bugs.python.org/issue12739 as
# reports of such hangs.
#
# This lock probably isn't necessary when both the subprocess32 module and its _posixsubprocess C
# extension module are available because either
#   (a) the pipe2() syscall is available on the platform we're using, so pipes are atomically
#       created with the FD_CLOEXEC flag set on them, or
#   (b) the pipe2() syscall isn't available, but the GIL isn't released during the
#       _posixsubprocess.fork_exec() call or the _posixsubprocess.cloexec_pipe() call.
# See https://bugs.python.org/issue7213 for more details.
_POPEN_LOCK = threading.Lock()

# Job objects are the only reliable way to ensure that processes are terminated on Windows.
if sys.platform == "win32":
    # The pywin32 modules only exist on Windows, so they are imported conditionally here rather
    # than at the top of the file.
    import win32api
    import win32con
    import win32event
    import win32job
    import win32process
    import winerror

    def _init_job_object():
        """Create and return a job object that kills its processes when its last handle closes.

        The job object is created with default security attributes (None) and an empty name.
        Its extended limit information is read, the JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE flag is
        OR-ed into the existing limit flags, and the result is written back.
        """
        job_object = win32job.CreateJobObject(None, "")

        # Get the limit and job state information of the newly-created job object.
        job_info = win32job.QueryInformationJobObject(job_object,
                                                      win32job.JobObjectExtendedLimitInformation)

        # Set up the job object so that closing the last handle to the job object
        # will terminate all associated processes and destroy the job object itself.
        job_info["BasicLimitInformation"]["LimitFlags"] |= \
                win32job.JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE

        # Update the limits of the job object.
        win32job.SetInformationJobObject(job_object, win32job.JobObjectExtendedLimitInformation,
                                         job_info)

        return job_object

    # Don't create a job object if the current process is already inside one.
    # _JOB_OBJECT is None in that case, and Process.start() then skips both the
    # CREATE_BREAKAWAY_FROM_JOB creation flag and the job assignment.
    if win32job.IsProcessInJob(win32process.GetCurrentProcess(), None):
        _JOB_OBJECT = None
    else:
        _JOB_OBJECT = _init_job_object()
        # Close the job handle when the interpreter exits.
        atexit.register(win32api.CloseHandle, _JOB_OBJECT)


class Process(object):
    """Wrapper around subprocess.Popen class.

    A Process is configured in __init__() and launched by start(), which also attaches logger
    pipes to the child's stdout and stderr. On Windows, when a module-level job object was
    created, the child is additionally assigned to that job object.

    Attributes:
        logger: the logger that receives the child's stdout (INFO) and stderr (ERROR) output.
        args: the argument list passed to subprocess.Popen(); args[0] is the executable.
        env: the environment mapping the child process is started with.
        pid: the child's process id, or None until start() has been called.
    """

    # pylint: disable=protected-access

    def __init__(self, logger, args, env=None, env_vars=None):
        """Initialize the process with the specified logger, arguments, and environment.

        Args:
            logger: logger handed to the stdout/stderr logger pipes in start().
            args: list of program arguments. On Windows, args[0] is modified in place to gain
                a ".exe" suffix when it has no file extension.
            env: environment mapping for the child. It is passed through
                utils.default_if_none() together with a copy of os.environ (presumably that
                copy is used when 'env' is None).
            env_vars: optional mapping of additional variables merged into the environment
                with dict.update().
        """

        # Ensure that executable files that don't already have an
        # extension on Windows have a ".exe" extension.
        if sys.platform == "win32" and not os.path.splitext(args[0])[1]:
            args[0] += ".exe"

        self.logger = logger
        self.args = args
        self.env = utils.default_if_none(env, os.environ.copy())
        if env_vars is not None:
            self.env.update(env_vars)

        self.pid = None

        # These are populated by start().
        self._process = None
        self._stdout_pipe = None
        self._stderr_pipe = None

    def start(self):
        """Start the process and the logger pipes for its stdout and stderr.

        Raises:
            win32job.error: on Windows, if assigning the process to the job object fails for a
                reason other than the process having already exited.
        """

        creation_flags = 0
        if sys.platform == "win32" and _JOB_OBJECT is not None:
            creation_flags |= win32process.CREATE_BREAKAWAY_FROM_JOB

        # Tests fail if a process takes too long to startup and listen to a socket. Use buffered
        # I/O pipes to give the process some leeway.
        buffer_size = 1024 * 1024

        # Close file descriptors in the child process before executing the program. This prevents
        # file descriptors that were inherited due to multiple calls to fork() -- either within one
        # thread, or concurrently from multiple threads -- from causing another subprocess to wait
        # for the completion of the newly spawned child process. Closing other file descriptors
        # isn't supported on Windows when stdout and stderr are redirected.
        close_fds = (sys.platform != "win32")

        # Only one thread at a time may be inside subprocess.Popen().
        with _POPEN_LOCK:
            self._process = subprocess.Popen(self.args, bufsize=buffer_size, stdout=subprocess.PIPE,
                                             stderr=subprocess.PIPE, close_fds=close_fds,
                                             env=self.env, creationflags=creation_flags)
            self.pid = self._process.pid

        self._stdout_pipe = pipe.LoggerPipe(self.logger, logging.INFO, self._process.stdout)
        self._stderr_pipe = pipe.LoggerPipe(self.logger, logging.ERROR, self._process.stderr)

        self._stdout_pipe.wait_until_started()
        self._stderr_pipe.wait_until_started()

        if sys.platform == "win32" and _JOB_OBJECT is not None:
            try:
                win32job.AssignProcessToJobObject(_JOB_OBJECT, self._process._handle)
            except win32job.error as err:
                # ERROR_ACCESS_DENIED (winerror=5) is received when the process has already died.
                if err.winerror != winerror.ERROR_ACCESS_DENIED:
                    raise
                # Access was denied; only tolerate it if the process really has exited.
                return_code = win32process.GetExitCodeProcess(self._process._handle)
                if return_code == win32con.STILL_ACTIVE:
                    raise

    def stop(self, kill=False):  # pylint: disable=too-many-branches
        """Terminate the process.

        On Windows, a program whose args[0] contains "mongod" is first asked to shut down
        cleanly by signalling its "Global\\Mongo_<pid>" event and waiting up to 60 seconds,
        unless 'kill' is true. If that is skipped or does not succeed, TerminateProcess() is
        called with exit code 0. On other platforms, Popen.kill() is used when 'kill' is true
        and Popen.terminate() otherwise.

        Errors that indicate the process has already died are ignored.

        Args:
            kill: if true, skip any attempt at a clean shutdown.
        """
        if sys.platform == "win32":

            # Attempt to cleanly shutdown mongod.
            if not kill and self.args and self.args[0].find("mongod") != -1:
                mongo_signal_handle = None
                try:
                    mongo_signal_handle = win32event.OpenEvent(
                        win32event.EVENT_MODIFY_STATE, False,
                        "Global\\Mongo_" + str(self._process.pid))

                    if not mongo_signal_handle:
                        # The process has already died.
                        return
                    win32event.SetEvent(mongo_signal_handle)
                    # Wait 60 seconds for the program to exit.
                    status = win32event.WaitForSingleObject(self._process._handle, 60 * 1000)
                    if status == win32event.WAIT_OBJECT_0:
                        return
                except win32process.error as err:
                    # One of the following errors is received if the process has already
                    # died. The error code is read from the 'winerror' attribute because
                    # exception objects cannot be indexed on Python 3.
                    already_dead_errors = (
                        winerror.ERROR_FILE_NOT_FOUND,  # winerror=2
                        winerror.ERROR_ACCESS_DENIED,  # winerror=5
                        winerror.ERROR_INVALID_HANDLE,  # winerror=6
                    )
                    if err.winerror not in already_dead_errors:
                        raise
                finally:
                    # OpenEvent() may have raised or returned no handle, in which case there
                    # is nothing to close.
                    if mongo_signal_handle:
                        win32api.CloseHandle(mongo_signal_handle)

                print("Failed to cleanly exit the program, calling TerminateProcess() on PID: " +\
                    str(self._process.pid))

            # Adapted from implementation of Popen.terminate() in subprocess.py of Python 2.7
            # because earlier versions do not catch exceptions.
            try:
                # Have the process exit with code 0 if it is terminated by us to simplify the
                # success-checking logic later on.
                win32process.TerminateProcess(self._process._handle, 0)
            except win32process.error as err:
                # ERROR_ACCESS_DENIED (winerror=5) is received when the process
                # has already died.
                if err.winerror != winerror.ERROR_ACCESS_DENIED:
                    raise
                # Access was denied; only tolerate it if the process really has exited.
                return_code = win32process.GetExitCodeProcess(self._process._handle)
                if return_code == win32con.STILL_ACTIVE:
                    raise
        else:
            try:
                if kill:
                    self._process.kill()
                else:
                    self._process.terminate()
            except OSError as err:
                # ESRCH (errno=3) is received when the process has already died.
                if err.errno != 3:
                    raise

    def poll(self):
        """Return the result of Popen.poll() for the underlying process."""
        return self._process.poll()

    def wait(self):
        """Wait until process has terminated and all output has been consumed by the logger pipes.

        Returns:
            The value returned by Popen.wait() for the underlying process.
        """

        return_code = self._process.wait()

        if self._stdout_pipe:
            self._stdout_pipe.wait_until_finished()
        if self._stderr_pipe:
            self._stderr_pipe.wait_until_finished()

        return return_code

    def as_command(self):
        """Return an equivalent command line invocation of the process.

        The result is a space-joined string of "NAME=value" entries, one for every variable in
        'self.env' that is absent from 'os.environ' or has a different value there, followed by
        the program arguments. No shell quoting is applied.
        """

        default_env = os.environ
        env_diff = self.env.copy()

        # Remove environment variables that appear in both 'os.environ' and 'self.env'.
        for env_var in default_env:
            if env_var in env_diff and env_diff[env_var] == default_env[env_var]:
                del env_diff[env_var]

        sb = []  # String builder.
        for env_var in env_diff:
            sb.append("%s=%s" % (env_var, env_diff[env_var]))
        sb.extend(self.args)

        return " ".join(sb)

    def __str__(self):
        """Return the command line, followed by "(<pid>)" once the process has a pid."""
        if self.pid is None:
            return self.as_command()
        return "%s (%d)" % (self.as_command(), self.pid)