summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMax Hirschhorn <max.hirschhorn@mongodb.com>2017-02-13 20:27:02 -0500
committerMax Hirschhorn <max.hirschhorn@mongodb.com>2017-02-13 20:27:02 -0500
commit6f6546be9b9554753d5cbf765b315817cb2eb45b (patch)
treec7ff66c7d089fb3869aee69ca64335e5c34e89bc
parentc9a76918c01d86d08ee102ebeedd79e2911bb1ce (diff)
downloadmongo-v2.6.tar.gz
SERVER-23959 Conditionally create a Windows job object in smoke.py.v2.6
Changes smoke.py to (like resmoke.py) only create a job object if the Python process isn't already inside of one. This allows the Evergreen agent to create a job object for managing processes spawned by a task to ensure they are cleaned up reliably. (cherry picked from commit dd6acd3551dc9e409a863e517609f2509a0f87f7)
-rw-r--r--buildscripts/buildlogger.py12
-rw-r--r--buildscripts/resmokelib/__init__.py1
-rw-r--r--buildscripts/resmokelib/core/__init__.py1
-rw-r--r--buildscripts/resmokelib/core/pipe.py87
-rwxr-xr-xbuildscripts/smoke.py124
-rw-r--r--etc/evergreen.yml2
6 files changed, 205 insertions, 22 deletions
diff --git a/buildscripts/buildlogger.py b/buildscripts/buildlogger.py
index a31b3e2dfa1..0dfb5ab5f8c 100644
--- a/buildscripts/buildlogger.py
+++ b/buildscripts/buildlogger.py
@@ -238,6 +238,12 @@ def run_and_echo(command):
"""
proc = subprocess.Popen(command)
+ # We write the pid of the spawned process as the first line of buildlogger.py's stdout because
+ # smoke.py expects to use it to terminate processes individually if already running inside a job
+ # object.
+ sys.stdout.write("[buildlogger.py] pid: %d\n" % (proc.pid))
+ sys.stdout.flush()
+
def handle_sigterm(signum, frame):
try:
proc.send_signal(signum)
@@ -415,6 +421,12 @@ def loop_and_callback(command, callback):
stderr=subprocess.STDOUT,
)
+ # We write the pid of the spawned process as the first line of buildlogger.py's stdout because
+ # smoke.py expects to use it to terminate processes individually if already running inside a job
+ # object.
+ sys.stdout.write("[buildlogger.py] pid: %d\n" % (proc.pid))
+ sys.stdout.flush()
+
def handle_sigterm(signum, frame):
try:
proc.send_signal(signum)
diff --git a/buildscripts/resmokelib/__init__.py b/buildscripts/resmokelib/__init__.py
new file mode 100644
index 00000000000..c3961685ab8
--- /dev/null
+++ b/buildscripts/resmokelib/__init__.py
@@ -0,0 +1 @@
+from __future__ import absolute_import
diff --git a/buildscripts/resmokelib/core/__init__.py b/buildscripts/resmokelib/core/__init__.py
new file mode 100644
index 00000000000..c3961685ab8
--- /dev/null
+++ b/buildscripts/resmokelib/core/__init__.py
@@ -0,0 +1 @@
+from __future__ import absolute_import
diff --git a/buildscripts/resmokelib/core/pipe.py b/buildscripts/resmokelib/core/pipe.py
new file mode 100644
index 00000000000..bb080721b2d
--- /dev/null
+++ b/buildscripts/resmokelib/core/pipe.py
@@ -0,0 +1,87 @@
+"""
+Helper class to read output of a subprocess. Used to avoid deadlocks
+from the pipe buffer filling up and blocking the subprocess while it's
+being waited on.
+"""
+
+from __future__ import absolute_import
+
+import threading
+
+
+class LoggerPipe(threading.Thread):
+ """
+ Asynchronously reads the output of a subprocess and sends it to a
+ logger.
+ """
+
+ # The start() and join() methods are not intended to be called directly on the LoggerPipe
+ # instance. Since we override them for that effect, the super's version are preserved here.
+ __start = threading.Thread.start
+ __join = threading.Thread.join
+
+ def __init__(self, logger, level, pipe_out):
+ """
+ Initializes the LoggerPipe with the specified logger, logging
+ level to use, and pipe to read from.
+ """
+
+ threading.Thread.__init__(self)
+ # Main thread should not call join() when exiting
+ self.daemon = True
+
+ self.__logger = logger
+ self.__level = level
+ self.__pipe_out = pipe_out
+
+ self.__lock = threading.Lock()
+ self.__condition = threading.Condition(self.__lock)
+
+ self.__started = False
+ self.__finished = False
+
+ LoggerPipe.__start(self)
+
+ def start(self):
+ raise NotImplementedError("start should not be called directly")
+
+ def run(self):
+ """
+ Reads the output from 'pipe_out' and logs each line to 'logger'.
+ """
+
+ with self.__lock:
+ self.__started = True
+ self.__condition.notify_all()
+
+ # Close the pipe when finished reading all of the output.
+ with self.__pipe_out:
+ # Avoid buffering the output from the pipe.
+ for line in iter(self.__pipe_out.readline, b""):
+ # Convert the output of the process from a bytestring to a UTF-8 string, and replace
+ # any characters that cannot be decoded with the official Unicode replacement
+ # character, U+FFFD. The log messages of MongoDB processes are not always valid
+ # UTF-8 sequences. See SERVER-7506.
+ line = line.decode("utf-8", "replace")
+ self.__logger.log(self.__level, line.rstrip())
+
+ with self.__lock:
+ self.__finished = True
+ self.__condition.notify_all()
+
+ def join(self, timeout=None):
+ raise NotImplementedError("join should not be called directly")
+
+ def wait_until_started(self):
+ with self.__lock:
+ while not self.__started:
+ self.__condition.wait()
+
+ def wait_until_finished(self):
+ with self.__lock:
+ while not self.__finished:
+ self.__condition.wait()
+
+ # No need to pass a timeout to join() because the thread should already be done after
+ # notifying us it has finished reading output from the pipe.
+ LoggerPipe.__join(self) # Tidy up the started thread.
diff --git a/buildscripts/smoke.py b/buildscripts/smoke.py
index 85a62c76f60..16ad746c7da 100755
--- a/buildscripts/smoke.py
+++ b/buildscripts/smoke.py
@@ -36,6 +36,7 @@
from datetime import datetime
from itertools import izip
import glob
+import logging
from optparse import OptionParser
import os
import pprint
@@ -72,6 +73,12 @@ except:
except:
json = None
+# Get relative imports to work when the package is not installed on the PYTHONPATH.
+if __name__ == "__main__" and __package__ is None:
+ sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(os.path.realpath(__file__)))))
+
+from buildscripts.resmokelib.core import pipe
+
# TODO clean this up so we don't need globals...
mongo_repo = os.getcwd() #'./'
@@ -141,6 +148,10 @@ class mongod(object):
self.proc = None
self.auth = False
+ self.job_object = None
+ self._inner_proc_pid = None
+ self._stdout_pipe = None
+
def __enter__(self):
self.start()
return self
@@ -238,6 +249,28 @@ class mongod(object):
print "running " + " ".join(argv)
self.proc = self._start(buildlogger(argv, is_global=True))
+ # If the mongod process is spawned under buildlogger.py, then the first line of output
+ # should include the pid of the underlying mongod process. If smoke.py didn't create its own
+ # job object because it is already inside one, then the pid is used to attempt to terminate
+ # the underlying mongod process.
+ first_line = self.proc.stdout.readline()
+ match = re.search("^\[buildlogger.py\] pid: (?P<pid>[0-9]+)$", first_line.rstrip())
+ if match is not None:
+ self._inner_proc_pid = int(match.group("pid"))
+ else:
+ # The first line of output didn't include the pid of the underlying mongod process. We
+ # write the first line of output to smoke.py's stdout to ensure the message doesn't get
+ # lost since it's possible that buildlogger.py isn't being used.
+ sys.stdout.write(first_line)
+
+ logger = logging.Logger("", level=logging.DEBUG)
+ handler = logging.StreamHandler(sys.stdout)
+ handler.setFormatter(logging.Formatter(fmt="%(message)s"))
+ logger.addHandler(handler)
+
+ self._stdout_pipe = pipe.LoggerPipe(logger, logging.INFO, self.proc.stdout)
+ self._stdout_pipe.wait_until_started()
+
if not self.did_mongod_start(self.port):
raise Exception("Failed to start mongod")
@@ -251,12 +284,15 @@ class mongod(object):
synced = synced and "syncedTo" in source and source["syncedTo"]
def _start(self, argv):
- """In most cases, just call subprocess.Popen(). On windows,
- add the started process to a new Job Object, so that any
- child processes of this process can be killed with a single
- call to TerminateJobObject (see self.stop()).
+ """In most cases, just call subprocess.Popen(). On Windows, this
+ method also assigns the started process to a job object if a new
+ one was created. This ensures that any child processes of this
+ process can be killed with a single call to TerminateJobObject
+ (see self.stop()).
"""
+ creation_flags = 0
+
if os.sys.platform == "win32":
# Create a job object with the "kill on job close"
# flag; this is inherited by child processes (ie
@@ -264,28 +300,30 @@ class mongod(object):
# and lets us terminate the whole tree of processes
# rather than orphaning the mongod.
import win32job
+ import win32process
- # Magic number needed to allow job reassignment in Windows 7
- # see: MSDN - Process Creation Flags - ms684863
- CREATE_BREAKAWAY_FROM_JOB = 0x01000000
+ # Don't create a job object if the current process is already inside one.
+ if not win32job.IsProcessInJob(win32process.GetCurrentProcess(), None):
+ self.job_object = win32job.CreateJobObject(None, '')
- proc = Popen(argv, creationflags=CREATE_BREAKAWAY_FROM_JOB)
+ job_info = win32job.QueryInformationJobObject(
+ self.job_object, win32job.JobObjectExtendedLimitInformation)
+ job_info['BasicLimitInformation']['LimitFlags'] |= \
+ win32job.JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE
+ win32job.SetInformationJobObject(
+ self.job_object,
+ win32job.JobObjectExtendedLimitInformation,
+ job_info)
- self.job_object = win32job.CreateJobObject(None, '')
+ # Magic number needed to allow job reassignment in Windows 7
+ # see: MSDN - Process Creation Flags - ms684863
+ creation_flags |= win32process.CREATE_BREAKAWAY_FROM_JOB
- job_info = win32job.QueryInformationJobObject(
- self.job_object, win32job.JobObjectExtendedLimitInformation)
- job_info['BasicLimitInformation']['LimitFlags'] |= win32job.JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE
- win32job.SetInformationJobObject(
- self.job_object,
- win32job.JobObjectExtendedLimitInformation,
- job_info)
+ proc = Popen(argv, creationflags=creation_flags, stdout=PIPE, stderr=None, bufsize=0)
+ if self.job_object is not None:
win32job.AssignProcessToJobObject(self.job_object, proc._handle)
- else:
- proc = Popen(argv)
-
return proc
def stop(self):
@@ -293,12 +331,54 @@ class mongod(object):
print >> sys.stderr, "probable bug: self.proc unset in stop()"
return
try:
- if os.sys.platform == "win32":
+ if os.sys.platform == "win32" and self.job_object is not None:
+ # If smoke.py created its own job object, then we clean up the spawned processes by
+ # terminating it.
import win32job
win32job.TerminateJobObject(self.job_object, -1)
import time
# Windows doesn't seem to kill the process immediately, so give it some time to die
time.sleep(5)
+ elif os.sys.platform == "win32":
+ # If smoke.py didn't create its own job object, then we attempt to clean up the
+ # spawned processes by terminating them individually.
+ import win32api
+ import win32con
+ import win32event
+ import win32process
+ import winerror
+
+ def win32_terminate(handle):
+ # Adapted from implementation of Popen.terminate() in subprocess.py of Python
+ # 2.7 because earlier versions do not catch exceptions.
+ try:
+ win32process.TerminateProcess(handle, -1)
+ except win32process.error as err:
+ # ERROR_ACCESS_DENIED (winerror=5) is received when the process has
+ # already died.
+ if err.winerror != winerror.ERROR_ACCESS_DENIED:
+ raise
+ return_code = win32process.GetExitCodeProcess(handle)
+ if return_code == win32con.STILL_ACTIVE:
+ raise
+
+ # Terminate the mongod process underlying buildlogger.py if one exists.
+ if self._inner_proc_pid is not None:
+ # The PROCESS_TERMINATE privilege is necessary to call TerminateProcess() and
+ # the SYNCHRONIZE privilege is necessary to call WaitForSingleObject(). See
+ # https://msdn.microsoft.com/en-us/library/windows/desktop/ms684880(v=vs.85).aspx
+ # for more details.
+ required_access = win32con.PROCESS_TERMINATE | win32con.SYNCHRONIZE
+ inner_proc_handle = win32api.OpenProcess(required_access,
+ False,
+ self._inner_proc_pid)
+ try:
+ win32_terminate(inner_proc_handle)
+ win32event.WaitForSingleObject(inner_proc_handle, win32event.INFINITE)
+ finally:
+ win32api.CloseHandle(inner_proc_handle)
+
+ win32_terminate(self.proc._handle)
else:
# This function not available in Python 2.5
self.proc.terminate()
@@ -306,6 +386,10 @@ class mongod(object):
from os import kill
kill(self.proc.pid, 15)
self.proc.wait()
+
+ if self._stdout_pipe is not None:
+ self._stdout_pipe.wait_until_finished()
+
sys.stderr.flush()
sys.stdout.flush()
diff --git a/etc/evergreen.yml b/etc/evergreen.yml
index 774c28a6005..0d804545679 100644
--- a/etc/evergreen.yml
+++ b/etc/evergreen.yml
@@ -22,8 +22,6 @@
# - 'builder_num'
# - 'builder_phase'
-disable_cleanup: true
-
functions:
"fetch source" :
command: git.get_project