diff options
author | James E. King III <jking@apache.org> | 2018-03-16 16:07:42 -0400 |
---|---|---|
committer | James E. King III <jking@apache.org> | 2018-03-19 14:38:49 -0400 |
commit | 9bea32f73c36a8f53a45e818cfafe81b6fefefae (patch) | |
tree | 9598fe6b03c4b22d7baf84607bbabbbda1d66bc0 /test/crossrunner | |
parent | 02fbe0ecc795881fe11a447d0a5f6f2f656f7bb4 (diff) | |
download | thrift-9bea32f73c36a8f53a45e818cfafe81b6fefefae.tar.gz |
THRIFT-4515: cross server test improvement: graceful test server shutdown
This closes #1509
Diffstat (limited to 'test/crossrunner')
-rw-r--r-- | test/crossrunner/collect.py | 4 | ||||
-rw-r--r-- | test/crossrunner/report.py | 10 | ||||
-rw-r--r-- | test/crossrunner/run.py | 206 | ||||
-rw-r--r-- | test/crossrunner/test.py | 10 | ||||
-rw-r--r-- | test/crossrunner/util.py | 4 |
5 files changed, 139 insertions, 95 deletions
diff --git a/test/crossrunner/collect.py b/test/crossrunner/collect.py index 03b0c36c9..e2d897828 100644 --- a/test/crossrunner/collect.py +++ b/test/crossrunner/collect.py @@ -51,6 +51,7 @@ VALID_JSON_KEYS = [ ] DEFAULT_MAX_DELAY = 5 +DEFAULT_SIGNAL = 1 DEFAULT_TIMEOUT = 5 @@ -112,7 +113,7 @@ def _do_collect_tests(servers, clients): yield name, impl1, impl2 def maybe_max(key, o1, o2, default): - """maximum of two if present, otherwise defult value""" + """maximum of two if present, otherwise default value""" v1 = o1.get(key) v2 = o2.get(key) return max(v1, v2) if v1 and v2 else v1 or v2 or default @@ -138,6 +139,7 @@ def _do_collect_tests(servers, clients): 'server': merge_metadata(sv, **{'protocol': proto1, 'transport': trans1}), 'client': merge_metadata(cl, **{'protocol': proto2, 'transport': trans2}), 'delay': maybe_max('delay', sv, cl, DEFAULT_MAX_DELAY), + 'stop_signal': maybe_max('stop_signal', sv, cl, DEFAULT_SIGNAL), 'timeout': maybe_max('timeout', sv, cl, DEFAULT_TIMEOUT), 'protocol': proto, 'transport': trans, diff --git a/test/crossrunner/report.py b/test/crossrunner/report.py index 76324ede1..75f36db75 100644 --- a/test/crossrunner/report.py +++ b/test/crossrunner/report.py @@ -157,8 +157,10 @@ class ExecReporter(TestReporter): ])), 'client': list(map(re.compile, [ '[Cc]onnection refused', - 'Could not connect to localhost', + 'Could not connect to', 'ECONNREFUSED', + 'econnrefused', # erl + 'CONNECTION-REFUSED-ERROR', # cl 'No such file or directory', # domain socket ])), } @@ -174,6 +176,7 @@ class ExecReporter(TestReporter): def match(line): for expr in exprs: if expr.search(line): + self._log.info("maybe false positive: %s" % line) return True with logfile_open(self.logpath, 'r') as fp: @@ -204,7 +207,7 @@ class ExecReporter(TestReporter): def _print_footer(self, returncode=None): self._print_bar() if returncode is not None: - print('Return code: %d' % returncode, file=self.out) + print('Return code: %d (negative values indicate kill by signal)' % returncode, file=self.out) else: print('Process is killed.', file=self.out) self._print_exec_time() @@ -261,7 +264,8 @@ class SummaryReporter(TestReporter): if not with_result: return '{:24s}{:18s}{:25s}'.format(name[:23], test.protocol[:17], trans[:24]) else: - return '{:24s}{:18s}{:25s}{:s}\n'.format(name[:23], test.protocol[:17], trans[:24], self._result_string(test)) + return '{:24s}{:18s}{:25s}{:s}\n'.format(name[:23], test.protocol[:17], + trans[:24], self._result_string(test)) def _print_test_header(self): self._print_bar() diff --git a/test/crossrunner/run.py b/test/crossrunner/run.py index f522bb19e..25c58cef3 100644 --- a/test/crossrunner/run.py +++ b/test/crossrunner/run.py @@ -23,19 +23,20 @@ import multiprocessing.managers import os import platform import random -import signal import socket import subprocess import sys -import threading import time from .compat import str_join -from .test import TestEntry, domain_socket_path from .report import ExecReporter, SummaryReporter +from .test import TestEntry +from .util import domain_socket_path -RESULT_TIMEOUT = 128 RESULT_ERROR = 64 +RESULT_TIMEOUT = 128 +SIGNONE = 0 +SIGKILL = 15 # globals ports = None @@ -43,35 +44,18 @@ stop = None class ExecutionContext(object): - def __init__(self, cmd, cwd, env, report): + def __init__(self, cmd, cwd, env, stop_signal, is_server, report): self._log = multiprocessing.get_logger() - self.report = report self.cmd = cmd self.cwd = cwd self.env = env - self.timer = None + self.stop_signal = stop_signal + self.is_server = is_server + self.report = report self.expired = False self.killed = False self.proc = None - def _expire(self): - self._log.info('Timeout') - self.expired = True - self.kill() - - def kill(self): - self._log.debug('Killing process : %d' % self.proc.pid) - self.killed = True - if platform.system() != 'Windows': - try: - os.killpg(self.proc.pid, signal.SIGKILL) - except Exception: - self._log.info('Failed to kill process group', exc_info=sys.exc_info()) - try: - self.proc.kill() - except Exception: - self._log.info('Failed to kill process', exc_info=sys.exc_info()) - def _popen_args(self): args = { 'cwd': self.cwd, @@ -87,75 +71,125 @@ class ExecutionContext(object): args.update(preexec_fn=os.setsid) return args - def start(self, timeout=0): + def start(self): joined = str_join(' ', self.cmd) self._log.debug('COMMAND: %s', joined) self._log.debug('WORKDIR: %s', self.cwd) self._log.debug('LOGFILE: %s', self.report.logpath) self.report.begin() self.proc = subprocess.Popen(self.cmd, **self._popen_args()) - if timeout > 0: - self.timer = threading.Timer(timeout, self._expire) - self.timer.start() + self._log.debug(' PID: %d', self.proc.pid) + self._log.debug(' PGID: %d', os.getpgid(self.proc.pid)) return self._scoped() @contextlib.contextmanager def _scoped(self): yield self - self._log.debug('Killing scoped process') - if self.proc.poll() is None: - self.kill() - self.report.killed() + if self.is_server: + # the server is supposed to run until we stop it + if self.returncode is not None: + self.report.died() + else: + if self.stop_signal != SIGNONE: + if self.sigwait(self.stop_signal): + self.report.end(self.returncode) + else: + self.report.killed() + else: + self.sigwait(SIGKILL) else: - self._log.debug('Process died unexpectedly') - self.report.died() - - def wait(self): - self.proc.communicate() - if self.timer: - self.timer.cancel() - self.report.end(self.returncode) + # the client is supposed to exit normally + if self.returncode is not None: + self.report.end(self.returncode) + else: + self.sigwait(SIGKILL) + self.report.killed() + self._log.debug('[{0}] exited with return code {1}'.format(self.proc.pid, self.returncode)) + + # Send a signal to the process and then wait for it to end + # If the signal requested is SIGNONE, no signal is sent, and + # instead we just wait for the process to end; further if it + # does not end normally with SIGNONE, we mark it as expired. + # If the process fails to end and the signal is not SIGKILL, + # it re-runs with SIGKILL so that a real process kill occurs + # returns True if the process ended, False if it may not have + def sigwait(self, sig=SIGKILL, timeout=2): + try: + if sig != SIGNONE: + self._log.debug('[{0}] send signal {1}'.format(self.proc.pid, sig)) + if sig == SIGKILL: + self.killed = True + try: + if platform.system() != 'Windows': + os.killpg(os.getpgid(self.proc.pid), sig) + else: + self.proc.send_signal(sig) + except Exception: + self._log.info('[{0}] Failed to kill process'.format(self.proc.pid), exc_info=sys.exc_info()) + self._log.debug('[{0}] wait begin, timeout {1} sec(s)'.format(self.proc.pid, timeout)) + self.proc.communicate(timeout=timeout) + self._log.debug('[{0}] process ended with return code {1}'.format(self.proc.pid, self.returncode)) + self.report.end(self.returncode) + return True + except subprocess.TimeoutExpired: + self._log.info('[{0}] timeout waiting for process to end'.format(self.proc.pid)) + if sig == SIGNONE: + self.expired = True + return False if sig == SIGKILL else self.sigwait(SIGKILL, 1) + + # called on the client process to wait for it to end naturally + def wait(self, timeout): + self.sigwait(SIGNONE, timeout) @property def returncode(self): return self.proc.returncode if self.proc else None -def exec_context(port, logdir, test, prog): +def exec_context(port, logdir, test, prog, is_server): report = ExecReporter(logdir, test, prog) prog.build_command(port) - return ExecutionContext(prog.command, prog.workdir, prog.env, report) + return ExecutionContext(prog.command, prog.workdir, prog.env, prog.stop_signal, is_server, report) def run_test(testdir, logdir, test_dict, max_retry, async=True): logger = multiprocessing.get_logger() - def ensure_socket_open(proc, port, max_delay): - sleeped = 0.1 - time.sleep(sleeped) - sleep_step = 0.2 + def ensure_socket_open(sv, port, test): + slept = 0.1 + time.sleep(slept) + sleep_step = 0.1 while True: - # Create sockets every iteration because refused sockets cannot be - # reused on some systems. - sock4 = socket.socket() - sock6 = socket.socket(family=socket.AF_INET6) - try: - if sock4.connect_ex(('127.0.0.1', port)) == 0 \ - or sock6.connect_ex(('::1', port)) == 0: - return True - if proc.poll() is not None: - logger.warn('server process is exited') - return False - if sleeped > max_delay: - logger.warn('sleeped for %f seconds but server port is not open' % sleeped) - return False - time.sleep(sleep_step) - sleeped += sleep_step - finally: - sock4.close() - sock6.close() - logger.debug('waited %f sec for server port open' % sleeped) - return True + if slept > test.delay: + logger.warn('[{0}] slept for {1} seconds but server is not open'.format(sv.proc.pid, slept)) + return False + if test.socket == 'domain': + if not os.path.exists(domain_socket_path(port)): + logger.debug('[{0}] domain(unix) socket not available yet. slept for {1} seconds so far'.format(sv.proc.pid, slept)) + time.sleep(sleep_step) + slept += sleep_step + elif test.socket == 'abstract': + return True + else: + # Create sockets every iteration because refused sockets cannot be + # reused on some systems. + sock4 = socket.socket() + sock6 = socket.socket(family=socket.AF_INET6) + try: + if sock4.connect_ex(('127.0.0.1', port)) == 0 \ + or sock6.connect_ex(('::1', port)) == 0: + return True + if sv.proc.poll() is not None: + logger.warn('[{0}] server process is exited'.format(sv.proc.pid)) + return False + logger.debug('[{0}] socket not available yet. slept for {1} seconds so far'.format(sv.proc.pid, slept)) + time.sleep(sleep_step) + slept += sleep_step + finally: + sock4.close() + sock6.close() + logger.debug('[{0}] server ready - waited for {1} seconds'.format(sv.proc.pid, slept)) + return True try: max_bind_retry = 3 @@ -169,31 +203,27 @@ def run_test(testdir, logdir, test_dict, max_retry, async=True): logger.debug('Start') with PortAllocator.alloc_port_scoped(ports, test.socket) as port: logger.debug('Start with port %d' % port) - sv = exec_context(port, logdir, test, test.server) - cl = exec_context(port, logdir, test, test.client) + sv = exec_context(port, logdir, test, test.server, True) + cl = exec_context(port, logdir, test, test.client, False) logger.debug('Starting server') with sv.start(): - if test.socket in ('domain', 'abstract'): - time.sleep(0.1) - port_ok = True - else: - port_ok = ensure_socket_open(sv.proc, port, test.delay) + port_ok = ensure_socket_open(sv, port, test) if port_ok: connect_retry_count = 0 - max_connect_retry = 3 - connect_retry_wait = 0.5 + max_connect_retry = 12 + connect_retry_wait = 0.25 while True: if sv.proc.poll() is not None: logger.info('not starting client because server process is absent') break logger.debug('Starting client') - cl.start(test.timeout) - logger.debug('Waiting client') - cl.wait() + cl.start() + logger.debug('Waiting client (up to %d secs)' % test.timeout) + cl.wait(test.timeout) if not cl.report.maybe_false_positive() or connect_retry_count >= max_connect_retry: if connect_retry_count > 0 and connect_retry_count < max_connect_retry: - logger.warn('[%s]: Connected after %d retry (%.2f sec each)' % (test.server.name, connect_retry_count, connect_retry_wait)) + logger.info('[%s]: Connected after %d retry (%.2f sec each)' % (test.server.name, connect_retry_count, connect_retry_wait)) # Wait for 50ms to see if server does not die at the end. time.sleep(0.05) break @@ -205,12 +235,18 @@ def run_test(testdir, logdir, test_dict, max_retry, async=True): logger.warn('[%s]: Detected socket bind failure, retrying...', test.server.name) bind_retry_count += 1 else: - if cl.expired: - result = RESULT_TIMEOUT + result = RESULT_TIMEOUT if cl.expired else cl.returncode if cl.proc.poll() is not None else RESULT_ERROR + + # For servers that handle a controlled shutdown by signal + # if they are killed, or return an error code, that is a + # problem. For servers that are not signal-aware, we simply + # kill them off; if we didn't kill them off, something else + # happened (crashed?) + if test.server.stop_signal != 0: + if sv.killed or sv.returncode > 0: + result |= RESULT_ERROR else: - result = cl.proc.returncode if cl.proc else RESULT_ERROR if not sv.killed: - # Server died without being killed. result |= RESULT_ERROR if result == 0 or retry_count >= max_retry: diff --git a/test/crossrunner/test.py b/test/crossrunner/test.py index 74fd916ec..633e92616 100644 --- a/test/crossrunner/test.py +++ b/test/crossrunner/test.py @@ -22,22 +22,20 @@ import multiprocessing import os import sys from .compat import path_join -from .util import merge_dict - - -def domain_socket_path(port): - return '/tmp/ThriftTest.thrift.%d' % port +from .util import merge_dict, domain_socket_path class TestProgram(object): - def __init__(self, kind, name, protocol, transport, socket, workdir, command, env=None, + def __init__(self, kind, name, protocol, transport, socket, workdir, stop_signal, command, env=None, extra_args=[], extra_args2=[], join_args=False, **kwargs): + self.kind = kind self.name = name self.protocol = protocol self.transport = transport self.socket = socket self.workdir = workdir + self.stop_signal = stop_signal self.command = None self._base_command = self._fix_cmd_path(command) if env: diff --git a/test/crossrunner/util.py b/test/crossrunner/util.py index e2d195a22..c214df85a 100644 --- a/test/crossrunner/util.py +++ b/test/crossrunner/util.py @@ -20,6 +20,10 @@ import copy +def domain_socket_path(port): + return '/tmp/ThriftTest.thrift.%d' % port + + def merge_dict(base, update): """Update dict concatenating list values""" res = copy.deepcopy(base) |