diff options
author | Roger Meier <roger@apache.org> | 2015-03-24 22:30:40 +0100 |
---|---|---|
committer | Roger Meier <roger@apache.org> | 2015-03-24 22:30:40 +0100 |
commit | 41ad4342c5a0389ab2cf2dbf098086413ac01204 (patch) | |
tree | 1ce9c7d4e70a7370f132ce5d4fd245a20a68e45e /test/crossrunner | |
parent | a2d12b6ee3d9aa66f2c16dc6a5ee6eef5f1eba92 (diff) | |
download | thrift-41ad4342c5a0389ab2cf2dbf098086413ac01204.tar.gz |
THRIFT-847 Test Framework harmonization across all languages
THRIFT-2946 Enhance usability of cross test framework
Patch: Nobuaki Sukegawa
This closes: #358
Diffstat (limited to 'test/crossrunner')
-rw-r--r-- | test/crossrunner/__init__.py | 25 | ||||
-rw-r--r-- | test/crossrunner/collect.py | 136 | ||||
-rw-r--r-- | test/crossrunner/prepare.py | 55 | ||||
-rw-r--r-- | test/crossrunner/report.py | 395 | ||||
-rw-r--r-- | test/crossrunner/run.py | 317 | ||||
-rw-r--r-- | test/crossrunner/test.py | 136 | ||||
-rw-r--r-- | test/crossrunner/util.py | 31 |
7 files changed, 1095 insertions, 0 deletions
diff --git a/test/crossrunner/__init__.py b/test/crossrunner/__init__.py new file mode 100644 index 000000000..06de2d093 --- /dev/null +++ b/test/crossrunner/__init__.py @@ -0,0 +1,25 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from crossrunner.test import test_name +from crossrunner.collect import collect_tests +from crossrunner.run import TestDispatcher +from crossrunner.report import generate_known_failures +from crossrunner.report import load_known_failures +from crossrunner.prepare import prepare diff --git a/test/crossrunner/collect.py b/test/crossrunner/collect.py new file mode 100644 index 000000000..80a82e71a --- /dev/null +++ b/test/crossrunner/collect.py @@ -0,0 +1,136 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import platform +from itertools import product + +from crossrunner.util import merge_dict + +# Those keys are passed to execution as is. +# Note that there are keys other than these, namely: +# delay: After server is started, client start is delayed for the value +# (seconds). +# timeout: Test timeout after client is started (seconds). +# platforms: Supported platforms. Should match platform.system() value. +# protocols: list of supported protocols +# transports: list of supported transports +# sockets: list of supported sockets +VALID_JSON_KEYS = [ + 'name', # name of the library, typically a language name + 'workdir', # work directory where command is executed + 'command', # test command + 'extra_args', # args appended to command after other args are appended + 'join_args', # whether args should be passed as single concatenated string + 'env', # additional environmental variable +] + +DEFAULT_DELAY = 1 +DEFAULT_TIMEOUT = 5 + + +def collect_testlibs(config, server_match, client_match): + """Collects server/client configurations from library configurations""" + def expand_libs(config): + for lib in config: + sv = lib.pop('server', None) + cl = lib.pop('client', None) + yield lib, sv, cl + + def yield_testlibs(base_configs, configs, match): + for base, conf in zip(base_configs, configs): + if conf: + if not match or base['name'] in match: + platforms = conf.get('platforms') or base.get('platforms') + if not platforms or platform.system() in platforms: + yield merge_dict(base, conf) + + libs, svs, cls = zip(*expand_libs(config)) + servers = list(yield_testlibs(libs, svs, server_match)) + clients = list(yield_testlibs(libs, cls, client_match)) + return servers, clients + + +def do_collect_tests(servers, clients): + def intersection(key, o1, o2): + """intersection of two collections. + collections are replaced with sets the first time""" + def cached_set(o, key): + v = o[key] + if not isinstance(v, set): + v = set(v) + o[key] = v + return v + return cached_set(o1, key) & cached_set(o2, key) + + # each entry can be spec:impl (e.g. binary:accel) + def intersect_with_spec(key, o1, o2): + # store as set of (spec, impl) tuple + def cached_set(o): + def to_spec_impl_tuples(values): + for v in values: + spec, _, impl = v.partition(':') + yield spec, impl or spec + v = o[key] + if not isinstance(v, set): + v = set(to_spec_impl_tuples(set(v))) + o[key] = v + return v + for spec1, impl1 in cached_set(o1): + for spec2, impl2 in cached_set(o2): + if spec1 == spec2: + name = impl1 if impl1 == impl2 else '%s-%s' % (impl1, impl2) + yield name, impl1, impl2 + + def maybe_max(key, o1, o2, default): + """maximum of two if present, otherwise defult value""" + v1 = o1.get(key) + v2 = o2.get(key) + return max(v1, v2) if v1 and v2 else v1 or v2 or default + + def filter_with_validkeys(o): + ret = {} + for key in VALID_JSON_KEYS: + if key in o: + ret[key] = o[key] + return ret + + def merge_metadata(o, **ret): + for key in VALID_JSON_KEYS: + if key in o: + ret[key] = o[key] + return ret + + for sv, cl in product(servers, clients): + for proto, proto1, proto2 in intersect_with_spec('protocols', sv, cl): + for trans, trans1, trans2 in intersect_with_spec('transports', sv, cl): + for sock in intersection('sockets', sv, cl): + yield { + 'server': merge_metadata(sv, **{'protocol': proto1, 'transport': trans1}), + 'client': merge_metadata(cl, **{'protocol': proto2, 'transport': trans2}), + 'delay': maybe_max('delay', sv, cl, DEFAULT_DELAY), + 'timeout': maybe_max('timeout', sv, cl, DEFAULT_TIMEOUT), + 'protocol': proto, + 'transport': trans, + 'socket': sock + } + + +def collect_tests(tests_dict, server_match, client_match): + sv, cl = collect_testlibs(tests_dict, server_match, client_match) + return list(do_collect_tests(sv, cl)) diff --git a/test/crossrunner/prepare.py b/test/crossrunner/prepare.py new file mode 100644 index 000000000..6e4f6eea8 --- /dev/null +++ b/test/crossrunner/prepare.py @@ -0,0 +1,55 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import os +import subprocess + +from crossrunner.collect import collect_testlibs + + +def prepare(config_dict, testdir, server_match, client_match): + libs, libs2 = collect_testlibs(config_dict, server_match, client_match) + libs.extend(libs2) + + def prepares(): + for lib in libs: + pre = lib.get('prepare') + if pre: + yield pre, lib['workdir'] + + def files(): + for lib in libs: + workdir = os.path.join(testdir, lib['workdir']) + for c in lib['command']: + if not c.startswith('-'): + p = os.path.join(workdir, c) + if not os.path.exists(p): + yield os.path.split(p) + + def make(p): + d, f = p + with open(os.devnull, 'w') as devnull: + return subprocess.Popen(['make', f], cwd=d, stderr=devnull) + + for pre, d in prepares(): + subprocess.Popen(pre, cwd=d).wait() + + for p in list(map(make, set(files()))): + p.wait() + return True diff --git a/test/crossrunner/report.py b/test/crossrunner/report.py new file mode 100644 index 000000000..da478fa4b --- /dev/null +++ b/test/crossrunner/report.py @@ -0,0 +1,395 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import datetime +import json +import multiprocessing +import os +import platform +import re +import subprocess +import sys +import time +import traceback + +from crossrunner.test import TestEntry + +LOG_DIR = 'log' +RESULT_HTML = 'result.html' +RESULT_JSON = 'results.json' +FAIL_JSON = 'known_failures_%s.json' + + +def generate_known_failures(testdir, overwrite, save, out): + def collect_failures(results): + success_index = 5 + for r in results: + if not r[success_index]: + yield TestEntry.get_name(*r) + try: + with open(os.path.join(testdir, RESULT_JSON), 'r') as fp: + results = json.load(fp) + except IOError: + sys.stderr.write('Unable to load last result. Did you run tests ?\n') + return False + fails = collect_failures(results['results']) + if not overwrite: + known = load_known_failures(testdir) + known.extend(fails) + fails = known + fails_json = json.dumps(sorted(set(fails)), indent=2) + if save: + with open(os.path.join(testdir, FAIL_JSON % platform.system()), 'w+') as fp: + fp.write(fails_json) + sys.stdout.write('Successfully updated known failures.\n') + if out: + sys.stdout.write(fails_json) + sys.stdout.write('\n') + return True + + +def load_known_failures(testdir): + try: + with open(os.path.join(testdir, FAIL_JSON % platform.system()), 'r') as fp: + return json.load(fp) + except IOError: + return [] + + +class TestReporter(object): + # Unfortunately, standard library doesn't handle timezone well + # DATETIME_FORMAT = '%a %b %d %H:%M:%S %Z %Y' + DATETIME_FORMAT = '%a %b %d %H:%M:%S %Y' + + def __init__(self): + self._log = multiprocessing.get_logger() + self._lock = multiprocessing.Lock() + + @classmethod + def test_logfile(cls, dir, test_name, prog_kind): + return os.path.realpath(os.path.join( + dir, 'log', '%s_%s.log' % (test_name, prog_kind))) + + def _start(self): + self._start_time = time.time() + + @property + def _elapsed(self): + return time.time() - self._start_time + + @classmethod + def _format_date(cls): + return '%s' % datetime.datetime.now().strftime(cls.DATETIME_FORMAT) + + def _print_date(self): + self.out.write('%s\n' % self._format_date()) + + def _print_bar(self, out=None): + (out or self.out).write( + '======================================================================\n') + + def _print_exec_time(self): + self.out.write('Test execution took {:.1f} seconds.\n'.format(self._elapsed)) + + +class ExecReporter(TestReporter): + def __init__(self, testdir, test, prog): + super(ExecReporter, self).__init__() + self._test = test + self._prog = prog + self.logpath = self.test_logfile(testdir, test.name, prog.kind) + self.out = None + + def begin(self): + self._start() + self._open() + if self.out and not self.out.closed: + self._print_header() + else: + self._log.debug('Output stream is not available.') + + def end(self, returncode): + self._lock.acquire() + try: + if self.out and not self.out.closed: + self._print_footer(returncode) + self._close() + self.out = None + else: + self._log.debug('Output stream is not available.') + finally: + self._lock.release() + + def killed(self): + self._lock.acquire() + try: + if self.out and not self.out.closed: + self._print_footer() + self._close() + self.out = None + else: + self._log.debug('Output stream is not available.') + finally: + self._lock.release() + + _init_failure_exprs = { + 'server': list(map(re.compile, [ + '[Aa]ddress already in use', + 'Could not bind', + 'EADDRINUSE', + ])), + 'client': list(map(re.compile, [ + '[Cc]onnection refused', + 'Could not connect to localhost', + 'ECONNREFUSED', + 'No such file or directory', # domain socket + ])), + } + + def maybe_false_positive(self): + """Searches through log file for socket bind error. + Returns True if suspicious expression is found, otherwise False""" + def match(line): + for expr in exprs: + if expr.search(line): + return True + try: + if self.out and not self.out.closed: + self.out.flush() + exprs = list(map(re.compile, self._init_failure_exprs[self._prog.kind])) + + server_logfile = self.logpath + # need to handle unicode errors on Python 3 + kwargs = {} if sys.version_info[0] < 3 else {'errors': 'replace'} + with open(server_logfile, 'r', **kwargs) as fp: + if any(map(match, fp)): + return True + except (KeyboardInterrupt, SystemExit): + raise + except Exception as ex: + self._log.warn('[%s]: Error while detecting false positive: %s' % (self._test.name, str(ex))) + self._log.info(traceback.print_exc()) + return False + + def _open(self): + self.out = open(self.logpath, 'w+') + + def _close(self): + self.out.close() + + def _print_header(self): + self._print_date() + self.out.write('Executing: %s\n' % ' '.join(self._prog.command)) + self.out.write('Directory: %s\n' % self._prog.workdir) + self.out.write('config:delay: %s\n' % self._test.delay) + self.out.write('config:timeout: %s\n' % self._test.timeout) + self._print_bar() + self.out.flush() + + def _print_footer(self, returncode=None): + self._print_bar() + if returncode is not None: + self.out.write('Return code: %d\n' % returncode) + else: + self.out.write('Process is killed.\n') + self._print_exec_time() + self._print_date() + + +class SummaryReporter(TestReporter): + def __init__(self, testdir, concurrent=True): + super(SummaryReporter, self).__init__() + self.testdir = testdir + self.logdir = os.path.join(testdir, LOG_DIR) + self.out_path = os.path.join(testdir, RESULT_JSON) + self.concurrent = concurrent + self.out = sys.stdout + self._platform = platform.system() + self._revision = self._get_revision() + self._tests = [] + if not os.path.exists(self.logdir): + os.mkdir(self.logdir) + self._known_failures = load_known_failures(testdir) + self._unexpected_success = [] + self._unexpected_failure = [] + self._expected_failure = [] + self._print_header() + + def _get_revision(self): + p = subprocess.Popen(['git', 'rev-parse', '--short', 'HEAD'], + cwd=self.testdir, stdout=subprocess.PIPE) + out, _ = p.communicate() + return out.strip() + + def _format_test(self, test, with_result=True): + name = '%s-%s' % (test.server.name, test.client.name) + trans = '%s-%s' % (test.transport, test.socket) + if not with_result: + return '{:19s}{:13s}{:25s}'.format(name[:18], test.protocol[:12], trans[:24]) + else: + result = 'success' if test.success else ( + 'timeout' if test.expired else 'failure') + result_string = '%s(%d)' % (result, test.returncode) + return '{:19s}{:13s}{:25s}{:s}\n'.format(name[:18], test.protocol[:12], trans[:24], result_string) + + def _print_test_header(self): + self._print_bar() + self.out.write( + '{:19s}{:13s}{:25s}{:s}\n'.format('server-client:', 'protocol:', 'transport:', 'result:')) + + def _print_header(self): + self._start() + self.out.writelines([ + 'Apache Thrift - Integration Test Suite\n', + ]) + self._print_date() + self._print_test_header() + + def _print_unexpected_failure(self): + if len(self._unexpected_failure) > 0: + self.out.writelines([ + '*** Following %d failures were unexpected ***:\n' % len(self._unexpected_failure), + 'If it is introduced by you, please fix it before submitting the code.\n', + # 'If not, please report at https://issues.apache.org/jira/browse/THRIFT\n', + ]) + self._print_test_header() + for i in self._unexpected_failure: + self.out.write(self._format_test(self._tests[i])) + self._print_bar() + else: + self.out.write('No unexpected failures.\n') + + def _print_unexpected_success(self): + if len(self._unexpected_success) > 0: + self.out.write( + 'Following %d tests were known to fail but succeeded (it\'s normal):\n' % len(self._unexpected_success)) + self._print_test_header() + for i in self._unexpected_success: + self.out.write(self._format_test(self._tests[i])) + self._print_bar() + + def _print_footer(self): + fail_count = len(self._expected_failure) + len(self._unexpected_failure) + self._print_bar() + self._print_unexpected_success() + self._print_unexpected_failure() + self._write_html_data() + self._assemble_log('unexpected failures', self._unexpected_failure) + self._assemble_log('known failures', self._expected_failure) + self.out.writelines([ + 'You can browse results at:\n', + '\tfile://%s/%s\n' % (self.testdir, RESULT_HTML), + 'Full log for each test is here:\n', + '\ttest/log/client_server_protocol_transport_client.log\n', + '\ttest/log/client_server_protocol_transport_server.log\n', + '%d failed of %d tests in total.\n' % (fail_count, len(self._tests)), + ]) + self._print_exec_time() + self._print_date() + + def _render_result(self, test): + return [ + test.server.name, + test.client.name, + test.protocol, + test.transport, + test.socket, + test.success, + test.as_expected, + test.returncode, + { + 'server': self.test_logfile(test.testdir, test.name, test.server.kind), + 'client': self.test_logfile(test.testdir, test.name, test.client.kind), + }, + ] + + def _write_html_data(self): + """Writes JSON data to be read by result html""" + results = [self._render_result(r) for r in self._tests] + with open(self.out_path, 'w+') as fp: + fp.write(json.dumps({ + 'date': self._format_date(), + 'revision': str(self._revision), + 'platform': self._platform, + 'duration': '{:.1f}'.format(self._elapsed), + 'results': results, + }, indent=2)) + + def _assemble_log(self, title, indexes): + if len(indexes) > 0: + def add_prog_log(fp, test, prog_kind): + fp.write('*************************** %s message ***************************\n' + % prog_kind) + path = self.test_logfile(self.testdir, test.name, prog_kind) + kwargs = {} if sys.version_info[0] < 3 else {'errors': 'replace'} + with open(path, 'r', **kwargs) as prog_fp: + fp.write(prog_fp.read()) + filename = title.replace(' ', '_') + '.log' + with open(os.path.join(self.logdir, filename), 'w+') as fp: + for test in map(self._tests.__getitem__, indexes): + fp.write('TEST: [%s]\n' % test.name) + add_prog_log(fp, test, test.server.kind) + add_prog_log(fp, test, test.client.kind) + fp.write('**********************************************************************\n\n') + self.out.write('%s are logged to test/%s/%s\n' % (title.capitalize(), LOG_DIR, filename)) + + def end(self): + self._print_footer() + return len(self._unexpected_failure) == 0 + + def add_test(self, test_dict): + test = TestEntry(self.testdir, **test_dict) + self._lock.acquire() + try: + if not self.concurrent: + self.out.write(self._format_test(test, False)) + self.out.flush() + self._tests.append(test) + return len(self._tests) - 1 + finally: + self._lock.release() + + def add_result(self, index, returncode, expired): + self._lock.acquire() + try: + failed = returncode is None or returncode != 0 + test = self._tests[index] + known = test.name in self._known_failures + if failed: + if known: + self._log.debug('%s failed as expected' % test.name) + self._expected_failure.append(index) + else: + self._log.info('unexpected failure: %s' % test.name) + self._unexpected_failure.append(index) + elif known: + self._log.info('unexpected success: %s' % test.name) + self._unexpected_success.append(index) + test.success = not failed + test.returncode = returncode + test.expired = expired + test.as_expected = known == failed + if not self.concurrent: + result = 'success' if not failed else 'failure' + result_string = '%s(%d)' % (result, returncode) + self.out.write(result_string + '\n') + else: + self.out.write(self._format_test(test)) + finally: + self._lock.release() diff --git a/test/crossrunner/run.py b/test/crossrunner/run.py new file mode 100644 index 000000000..e3300bad8 --- /dev/null +++ b/test/crossrunner/run.py @@ -0,0 +1,317 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import contextlib +import multiprocessing +import multiprocessing.managers +import os +import platform +import random +import socket +import signal +import subprocess +import threading +import time +import traceback + +from crossrunner.test import TestEntry, domain_socket_path +from crossrunner.report import ExecReporter, SummaryReporter + +RESULT_TIMEOUT = 128 +RESULT_ERROR = 64 + + +class ExecutionContext(object): + def __init__(self, cmd, cwd, env, report): + self._log = multiprocessing.get_logger() + self.report = report + self.cmd = cmd + self.cwd = cwd + self.env = env + self.timer = None + self.expired = False + + def _expire(self): + self._log.info('Timeout') + self.expired = True + self.kill() + + def kill(self): + self._log.debug('Killing process : %d' % self.proc.pid) + if platform.system() != 'Windows': + try: + os.killpg(self.proc.pid, signal.SIGKILL) + except Exception as err: + self._log.info('Failed to kill process group : %s' % str(err)) + try: + self.proc.kill() + except Exception as err: + self._log.info('Failed to kill process : %s' % str(err)) + self.report.killed() + + def _popen_args(self): + args = { + 'cwd': self.cwd, + 'env': self.env, + 'stdout': self.report.out, + 'stderr': subprocess.STDOUT, + } + # make sure child processes doesn't remain after killing + if platform.system() == 'Windows': + DETACHED_PROCESS = 0x00000008 + args.update(creationflags=DETACHED_PROCESS | subprocess.CREATE_NEW_PROCESS_GROUP) + else: + args.update(preexec_fn=os.setsid) + return args + + def start(self, timeout=0): + self._log.debug('COMMAND: %s', ' '.join(self.cmd)) + self._log.debug('WORKDIR: %s', self.cwd) + self._log.debug('LOGFILE: %s', self.report.logpath) + self.report.begin() + self.proc = subprocess.Popen(self.cmd, **self._popen_args()) + if timeout > 0: + self.timer = threading.Timer(timeout, self._expire) + self.timer.start() + return self._scoped() + + @contextlib.contextmanager + def _scoped(self): + yield self + self._log.debug('Killing scoped process') + self.kill() + + def wait(self): + self.proc.communicate() + if self.timer: + self.timer.cancel() + self.report.end(self.returncode) + + @property + def returncode(self): + return self.proc.returncode if self.proc else None + + +def exec_context(port, testdir, test, prog): + report = ExecReporter(testdir, test, prog) + prog.build_command(port) + return ExecutionContext(prog.command, prog.workdir, prog.env, report) + + +def run_test(testdir, test_dict, async=True, max_retry=3): + try: + logger = multiprocessing.get_logger() + retry_count = 0 + test = TestEntry(testdir, **test_dict) + while True: + if stop.is_set(): + logger.debug('Skipping because shutting down') + return None + logger.debug('Start') + with PortAllocator.alloc_port_scoped(ports, test.socket) as port: + logger.debug('Start with port %d' % port) + sv = exec_context(port, testdir, test, test.server) + cl = exec_context(port, testdir, test, test.client) + + logger.debug('Starting server') + with sv.start(): + if test.delay > 0: + logger.debug('Delaying client for %.2f seconds' % test.delay) + time.sleep(test.delay) + cl_retry_count = 0 + cl_max_retry = 10 + cl_retry_wait = 0.5 + while True: + logger.debug('Starting client') + cl.start(test.timeout) + logger.debug('Waiting client') + cl.wait() + if not cl.report.maybe_false_positive() or cl_retry_count >= cl_max_retry: + if cl_retry_count > 0 and cl_retry_count < cl_max_retry: + logger.warn('[%s]: Connected after %d retry (%.2f sec each)' % (test.server.name, cl_retry_count, cl_retry_wait)) + break + logger.debug('Server may not be ready, waiting %.2f second...' % cl_retry_wait) + time.sleep(cl_retry_wait) + cl_retry_count += 1 + + if not sv.report.maybe_false_positive() or retry_count >= max_retry: + logger.debug('Finish') + return RESULT_TIMEOUT if cl.expired else cl.proc.returncode + logger.warn('[%s]: Detected socket bind failure, retrying...' % test.server.name) + retry_count += 1 + except (KeyboardInterrupt, SystemExit): + logger.info('Interrupted execution') + if not async: + raise + stop.set() + return None + except Exception as ex: + logger.warn('Error while executing test : %s' % str(ex)) + if not async: + raise + logger.info(traceback.print_exc()) + return RESULT_ERROR + + +class PortAllocator(object): + def __init__(self): + self._log = multiprocessing.get_logger() + self._lock = multiprocessing.Lock() + self._ports = set() + self._dom_ports = set() + self._last_alloc = 0 + + def _get_tcp_port(self): + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.bind(('127.0.0.1', 0)) + port = sock.getsockname()[1] + self._lock.acquire() + try: + ok = port not in self._ports + if ok: + self._ports.add(port) + self._last_alloc = time.time() + finally: + self._lock.release() + sock.close() + return port if ok else self._get_tcp_port() + + def _get_domain_port(self): + port = random.randint(1024, 65536) + self._lock.acquire() + try: + ok = port not in self._dom_ports + if ok: + self._dom_ports.add(port) + finally: + self._lock.release() + return port if ok else self._get_domain_port() + + def alloc_port(self, socket_type): + if socket_type == 'domain': + return self._get_domain_port() + else: + return self._get_tcp_port() + + # static method for inter-process invokation + @staticmethod + @contextlib.contextmanager + def alloc_port_scoped(allocator, socket_type): + port = allocator.alloc_port(socket_type) + yield port + allocator.free_port(socket_type, port) + + def free_port(self, socket_type, port): + self._log.debug('free_port') + self._lock.acquire() + try: + if socket_type == 'domain': + self._dom_ports.remove(port) + path = domain_socket_path(port) + if os.path.exists(path): + os.remove(path) + else: + self._ports.remove(port) + except IOError as err: + self._log.info('Error while freeing port : %s' % str(err)) + finally: + self._lock.release() + + +class NonAsyncResult(object): + def __init__(self, value): + self._value = value + + def get(self, timeout=None): + return self._value + + def wait(self, timeout=None): + pass + + def ready(self): + return True + + def successful(self): + return self._value == 0 + + +class TestDispatcher(object): + def __init__(self, testdir, concurrency): + self._log = multiprocessing.get_logger() + self.testdir = testdir + # seems needed for python 2.x to handle keyboard interrupt + self._stop = multiprocessing.Event() + self._async = concurrency > 1 + if not self._async: + self._pool = None + global stop + global ports + stop = self._stop + ports = PortAllocator() + else: + self._m = multiprocessing.managers.BaseManager() + self._m.register('ports', PortAllocator) + self._m.start() + self._pool = multiprocessing.Pool(concurrency, self._pool_init, (self._m.address,)) + self._report = SummaryReporter(testdir, concurrency > 1) + self._log.debug( + 'TestDispatcher started with %d concurrent jobs' % concurrency) + + def _pool_init(self, address): + global stop + global m + global ports + stop = self._stop + m = multiprocessing.managers.BaseManager(address) + m.connect() + ports = m.ports() + + def _dispatch_sync(self, test, cont): + r = run_test(self.testdir, test, False) + cont(r) + return NonAsyncResult(r) + + def _dispatch_async(self, test, cont): + return self._pool.apply_async(func=run_test, args=(self.testdir, test,), callback=cont) + + def dispatch(self, test): + index = self._report.add_test(test) + + def cont(r): + if not self._stop.is_set(): + self._log.debug('freeing port') + self._log.debug('adding result') + self._report.add_result(index, r, r == RESULT_TIMEOUT) + self._log.debug('finish continuation') + fn = self._dispatch_async if self._async else self._dispatch_sync + return fn(test, cont) + + def wait(self): + if self._async: + self._pool.close() + self._pool.join() + self._m.shutdown() + return self._report.end() + + def terminate(self): + self._stop.set() + if self._async: + self._pool.terminate() + self._pool.join() + self._m.shutdown() diff --git a/test/crossrunner/test.py b/test/crossrunner/test.py new file mode 100644 index 000000000..512e664b0 --- /dev/null +++ b/test/crossrunner/test.py @@ -0,0 +1,136 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import copy +import multiprocessing +import os +import sys + +from crossrunner.util import merge_dict + + +def domain_socket_path(port): + return '/tmp/ThriftTest.thrift.%d' % port + + +class TestProgram(object): + def __init__(self, kind, name, protocol, transport, socket, workdir, command, env=None, + extra_args=[], join_args=False, **kwargs): + self.kind = kind + self.name = name + self.protocol = protocol + self.transport = transport + self.socket = socket + self.workdir = workdir + self.command = None + self._base_command = self._fix_cmd_path(command) + if env: + self.env = copy.copy(os.environ) + self.env.update(env) + else: + self.env = os.environ + self._extra_args = extra_args + self._join_args = join_args + + def _fix_cmd_path(self, cmd): + # if the arg is a file in the current directory, make it path + def abs_if_exists(arg): + p = os.path.join(self.workdir, arg) + return p if os.path.exists(p) else arg + + if cmd[0] == 'python': + cmd[0] = sys.executable + else: + cmd[0] = abs_if_exists(cmd[0]) + return cmd + + def _socket_arg(self, socket, port): + return { + 'ip-ssl': '--ssl', + 'domain': '--domain-socket=%s' % domain_socket_path(port), + }.get(socket, None) + + def build_command(self, port): + cmd = copy.copy(self._base_command) + args = [] + args.append('--protocol=' + self.protocol) + args.append('--transport=' + self.transport) + socket_arg = self._socket_arg(self.socket, port) + if socket_arg: + args.append(socket_arg) + args.append('--port=%d' % port) + if self._join_args: + cmd.append('%s' % " ".join(args)) + else: + cmd.extend(args) + if self._extra_args: + cmd.extend(self._extra_args) + self.command = cmd + return self.command + + +class TestEntry(object): + def __init__(self, testdir, server, client, delay, timeout, **kwargs): + self.testdir = testdir + self._log = multiprocessing.get_logger() + self._config = kwargs + self.protocol = kwargs['protocol'] + self.transport = kwargs['transport'] + self.socket = kwargs['socket'] + self.server = TestProgram('server', **self._fix_workdir(merge_dict(self._config, server))) + self.client = TestProgram('client', **self._fix_workdir(merge_dict(self._config, client))) + self.delay = delay + self.timeout = timeout + self._name = None + # results + self.success = None + self.as_expected = None + self.returncode = None + self.expired = False + + def _fix_workdir(self, config): + key = 'workdir' + path = config.get(key, None) + if not path: + path = self.testdir + if os.path.isabs(path): + path = os.path.realpath(path) + else: + path = os.path.realpath(os.path.join(self.testdir, path)) + config.update({key: path}) + return config + + @classmethod + def get_name(cls, server, client, proto, trans, sock, *args): + return '%s-%s_%s_%s-%s' % (server, client, proto, trans, sock) + + @property + def name(self): + if not self._name: + self._name = self.get_name( + self.server.name, self.client.name, self.protocol, self.transport, self.socket) + return self._name + + @property + def transport_name(self): + return '%s-%s' % (self.transport, self.socket) + + +def test_name(server, client, protocol, transport, socket, **kwargs): + return TestEntry.get_name(server['name'], client['name'], protocol, transport, socket) diff --git a/test/crossrunner/util.py b/test/crossrunner/util.py new file mode 100644 index 000000000..750ed475e --- /dev/null +++ b/test/crossrunner/util.py @@ -0,0 +1,31 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import copy + + +def merge_dict(base, update): + """Update dict concatenating list values""" + res = copy.deepcopy(base) + for k, v in list(update.items()): + if k in list(res.keys()) and isinstance(v, list): + res[k].extend(v) + else: + res[k] = v + return res |