From 6f6546be9b9554753d5cbf765b315817cb2eb45b Mon Sep 17 00:00:00 2001 From: Max Hirschhorn Date: Mon, 13 Feb 2017 20:27:02 -0500 Subject: SERVER-23959 Conditionally create a Windows job object in smoke.py. Changes smoke.py to (like resmoke.py) only create a job object if the Python process isn't already inside of one. This allows the Evergreen agent to create a job object for managing processes spawned by a task to ensure they are cleaned up reliably. (cherry picked from commit dd6acd3551dc9e409a863e517609f2509a0f87f7) --- buildscripts/buildlogger.py | 12 +++ buildscripts/resmokelib/__init__.py | 1 + buildscripts/resmokelib/core/__init__.py | 1 + buildscripts/resmokelib/core/pipe.py | 87 ++++++++++++++++++++++ buildscripts/smoke.py | 124 ++++++++++++++++++++++++++----- etc/evergreen.yml | 2 - 6 files changed, 205 insertions(+), 22 deletions(-) create mode 100644 buildscripts/resmokelib/__init__.py create mode 100644 buildscripts/resmokelib/core/__init__.py create mode 100644 buildscripts/resmokelib/core/pipe.py diff --git a/buildscripts/buildlogger.py b/buildscripts/buildlogger.py index a31b3e2dfa1..0dfb5ab5f8c 100644 --- a/buildscripts/buildlogger.py +++ b/buildscripts/buildlogger.py @@ -238,6 +238,12 @@ def run_and_echo(command): """ proc = subprocess.Popen(command) + # We write the pid of the spawned process as the first line of buildlogger.py's stdout because + # smoke.py expects to use it to terminate processes individually if already running inside a job + # object. + sys.stdout.write("[buildlogger.py] pid: %d\n" % (proc.pid)) + sys.stdout.flush() + def handle_sigterm(signum, frame): try: proc.send_signal(signum) @@ -415,6 +421,12 @@ def loop_and_callback(command, callback): stderr=subprocess.STDOUT, ) + # We write the pid of the spawned process as the first line of buildlogger.py's stdout because + # smoke.py expects to use it to terminate processes individually if already running inside a job + # object. + sys.stdout.write("[buildlogger.py] pid: %d\n" % (proc.pid)) + sys.stdout.flush() + def handle_sigterm(signum, frame): try: proc.send_signal(signum) diff --git a/buildscripts/resmokelib/__init__.py b/buildscripts/resmokelib/__init__.py new file mode 100644 index 00000000000..c3961685ab8 --- /dev/null +++ b/buildscripts/resmokelib/__init__.py @@ -0,0 +1 @@ +from __future__ import absolute_import diff --git a/buildscripts/resmokelib/core/__init__.py b/buildscripts/resmokelib/core/__init__.py new file mode 100644 index 00000000000..c3961685ab8 --- /dev/null +++ b/buildscripts/resmokelib/core/__init__.py @@ -0,0 +1 @@ +from __future__ import absolute_import diff --git a/buildscripts/resmokelib/core/pipe.py b/buildscripts/resmokelib/core/pipe.py new file mode 100644 index 00000000000..bb080721b2d --- /dev/null +++ b/buildscripts/resmokelib/core/pipe.py @@ -0,0 +1,87 @@ +""" +Helper class to read output of a subprocess. Used to avoid deadlocks +from the pipe buffer filling up and blocking the subprocess while it's +being waited on. +""" + +from __future__ import absolute_import + +import threading + + +class LoggerPipe(threading.Thread): + """ + Asynchronously reads the output of a subprocess and sends it to a + logger. + """ + + # The start() and join() methods are not intended to be called directly on the LoggerPipe + # instance. Since we override them for that effect, the super's version are preserved here. + __start = threading.Thread.start + __join = threading.Thread.join + + def __init__(self, logger, level, pipe_out): + """ + Initializes the LoggerPipe with the specified logger, logging + level to use, and pipe to read from. + """ + + threading.Thread.__init__(self) + # Main thread should not call join() when exiting + self.daemon = True + + self.__logger = logger + self.__level = level + self.__pipe_out = pipe_out + + self.__lock = threading.Lock() + self.__condition = threading.Condition(self.__lock) + + self.__started = False + self.__finished = False + + LoggerPipe.__start(self) + + def start(self): + raise NotImplementedError("start should not be called directly") + + def run(self): + """ + Reads the output from 'pipe_out' and logs each line to 'logger'. + """ + + with self.__lock: + self.__started = True + self.__condition.notify_all() + + # Close the pipe when finished reading all of the output. + with self.__pipe_out: + # Avoid buffering the output from the pipe. + for line in iter(self.__pipe_out.readline, b""): + # Convert the output of the process from a bytestring to a UTF-8 string, and replace + # any characters that cannot be decoded with the official Unicode replacement + # character, U+FFFD. The log messages of MongoDB processes are not always valid + # UTF-8 sequences. See SERVER-7506. + line = line.decode("utf-8", "replace") + self.__logger.log(self.__level, line.rstrip()) + + with self.__lock: + self.__finished = True + self.__condition.notify_all() + + def join(self, timeout=None): + raise NotImplementedError("join should not be called directly") + + def wait_until_started(self): + with self.__lock: + while not self.__started: + self.__condition.wait() + + def wait_until_finished(self): + with self.__lock: + while not self.__finished: + self.__condition.wait() + + # No need to pass a timeout to join() because the thread should already be done after + # notifying us it has finished reading output from the pipe. + LoggerPipe.__join(self) # Tidy up the started thread. diff --git a/buildscripts/smoke.py b/buildscripts/smoke.py index 85a62c76f60..16ad746c7da 100755 --- a/buildscripts/smoke.py +++ b/buildscripts/smoke.py @@ -36,6 +36,7 @@ from datetime import datetime from itertools import izip import glob +import logging from optparse import OptionParser import os import pprint @@ -72,6 +73,12 @@ except: except: json = None +# Get relative imports to work when the package is not installed on the PYTHONPATH. +if __name__ == "__main__" and __package__ is None: + sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(os.path.realpath(__file__))))) + +from buildscripts.resmokelib.core import pipe + # TODO clean this up so we don't need globals... mongo_repo = os.getcwd() #'./' @@ -141,6 +148,10 @@ class mongod(object): self.proc = None self.auth = False + self.job_object = None + self._inner_proc_pid = None + self._stdout_pipe = None + def __enter__(self): self.start() return self @@ -238,6 +249,28 @@ class mongod(object): print "running " + " ".join(argv) self.proc = self._start(buildlogger(argv, is_global=True)) + # If the mongod process is spawned under buildlogger.py, then the first line of output + # should include the pid of the underlying mongod process. If smoke.py didn't create its own + # job object because it is already inside one, then the pid is used to attempt to terminate + # the underlying mongod process. + first_line = self.proc.stdout.readline() + match = re.search("^\[buildlogger.py\] pid: (?P[0-9]+)$", first_line.rstrip()) + if match is not None: + self._inner_proc_pid = int(match.group("pid")) + else: + # The first line of output didn't include the pid of the underlying mongod process. We + # write the first line of output to smoke.py's stdout to ensure the message doesn't get + # lost since it's possible that buildlogger.py isn't being used. + sys.stdout.write(first_line) + + logger = logging.Logger("", level=logging.DEBUG) + handler = logging.StreamHandler(sys.stdout) + handler.setFormatter(logging.Formatter(fmt="%(message)s")) + logger.addHandler(handler) + + self._stdout_pipe = pipe.LoggerPipe(logger, logging.INFO, self.proc.stdout) + self._stdout_pipe.wait_until_started() + if not self.did_mongod_start(self.port): raise Exception("Failed to start mongod") @@ -251,12 +284,15 @@ class mongod(object): synced = synced and "syncedTo" in source and source["syncedTo"] def _start(self, argv): - """In most cases, just call subprocess.Popen(). On windows, - add the started process to a new Job Object, so that any - child processes of this process can be killed with a single - call to TerminateJobObject (see self.stop()). + """In most cases, just call subprocess.Popen(). On Windows, this + method also assigns the started process to a job object if a new + one was created. This ensures that any child processes of this + process can be killed with a single call to TerminateJobObject + (see self.stop()). """ + creation_flags = 0 + if os.sys.platform == "win32": # Create a job object with the "kill on job close" # flag; this is inherited by child processes (ie @@ -264,28 +300,30 @@ class mongod(object): # and lets us terminate the whole tree of processes # rather than orphaning the mongod. import win32job + import win32process - # Magic number needed to allow job reassignment in Windows 7 - # see: MSDN - Process Creation Flags - ms684863 - CREATE_BREAKAWAY_FROM_JOB = 0x01000000 + # Don't create a job object if the current process is already inside one. + if not win32job.IsProcessInJob(win32process.GetCurrentProcess(), None): + self.job_object = win32job.CreateJobObject(None, '') - proc = Popen(argv, creationflags=CREATE_BREAKAWAY_FROM_JOB) + job_info = win32job.QueryInformationJobObject( + self.job_object, win32job.JobObjectExtendedLimitInformation) + job_info['BasicLimitInformation']['LimitFlags'] |= \ + win32job.JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE + win32job.SetInformationJobObject( + self.job_object, + win32job.JobObjectExtendedLimitInformation, + job_info) - self.job_object = win32job.CreateJobObject(None, '') + # Magic number needed to allow job reassignment in Windows 7 + # see: MSDN - Process Creation Flags - ms684863 + creation_flags |= win32process.CREATE_BREAKAWAY_FROM_JOB - job_info = win32job.QueryInformationJobObject( - self.job_object, win32job.JobObjectExtendedLimitInformation) - job_info['BasicLimitInformation']['LimitFlags'] |= win32job.JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE - win32job.SetInformationJobObject( - self.job_object, - win32job.JobObjectExtendedLimitInformation, - job_info) + proc = Popen(argv, creationflags=creation_flags, stdout=PIPE, stderr=None, bufsize=0) + if self.job_object is not None: win32job.AssignProcessToJobObject(self.job_object, proc._handle) - else: - proc = Popen(argv) - return proc def stop(self): @@ -293,12 +331,54 @@ class mongod(object): print >> sys.stderr, "probable bug: self.proc unset in stop()" return try: - if os.sys.platform == "win32": + if os.sys.platform == "win32" and self.job_object is not None: + # If smoke.py created its own job object, then we clean up the spawned processes by + # terminating it. import win32job win32job.TerminateJobObject(self.job_object, -1) import time # Windows doesn't seem to kill the process immediately, so give it some time to die time.sleep(5) + elif os.sys.platform == "win32": + # If smoke.py didn't create its own job object, then we attempt to clean up the + # spawned processes by terminating them individually. + import win32api + import win32con + import win32event + import win32process + import winerror + + def win32_terminate(handle): + # Adapted from implementation of Popen.terminate() in subprocess.py of Python + # 2.7 because earlier versions do not catch exceptions. + try: + win32process.TerminateProcess(handle, -1) + except win32process.error as err: + # ERROR_ACCESS_DENIED (winerror=5) is received when the process has + # already died. + if err.winerror != winerror.ERROR_ACCESS_DENIED: + raise + return_code = win32process.GetExitCodeProcess(handle) + if return_code == win32con.STILL_ACTIVE: + raise + + # Terminate the mongod process underlying buildlogger.py if one exists. + if self._inner_proc_pid is not None: + # The PROCESS_TERMINATE privilege is necessary to call TerminateProcess() and + # the SYNCHRONIZE privilege is necessary to call WaitForSingleObject(). See + # https://msdn.microsoft.com/en-us/library/windows/desktop/ms684880(v=vs.85).aspx + # for more details. + required_access = win32con.PROCESS_TERMINATE | win32con.SYNCHRONIZE + inner_proc_handle = win32api.OpenProcess(required_access, + False, + self._inner_proc_pid) + try: + win32_terminate(inner_proc_handle) + win32event.WaitForSingleObject(inner_proc_handle, win32event.INFINITE) + finally: + win32api.CloseHandle(inner_proc_handle) + + win32_terminate(self.proc._handle) else: # This function not available in Python 2.5 self.proc.terminate() @@ -306,6 +386,10 @@ class mongod(object): from os import kill kill(self.proc.pid, 15) self.proc.wait() + + if self._stdout_pipe is not None: + self._stdout_pipe.wait_until_finished() + sys.stderr.flush() sys.stdout.flush() diff --git a/etc/evergreen.yml b/etc/evergreen.yml index 774c28a6005..0d804545679 100644 --- a/etc/evergreen.yml +++ b/etc/evergreen.yml @@ -22,8 +22,6 @@ # - 'builder_num' # - 'builder_phase' -disable_cleanup: true - functions: "fetch source" : command: git.get_project -- cgit v1.2.1