diff options
author | Nobuaki Sukegawa <nsuke@apache.org> | 2016-02-03 01:57:03 +0900 |
---|---|---|
committer | Nobuaki Sukegawa <nsuke@apache.org> | 2016-02-04 14:28:24 +0900 |
commit | 10308cb975ac090584068d0470b81e41555b2f35 (patch) | |
tree | bc0bb670626a8a196dc00df6429ae4dcc838b4c4 /test/crossrunner | |
parent | d094e79de7e0bd61320f006c83c0de669363bce8 (diff) | |
download | thrift-10308cb975ac090584068d0470b81e41555b2f35.tar.gz |
THRIFT-3596 Better conformance to PEP8
This closes #832
Diffstat (limited to 'test/crossrunner')
-rw-r--r-- | test/crossrunner/collect.py | 188 | ||||
-rw-r--r-- | test/crossrunner/compat.py | 26 | ||||
-rw-r--r-- | test/crossrunner/report.py | 740 | ||||
-rw-r--r-- | test/crossrunner/run.py | 570 | ||||
-rw-r--r-- | test/crossrunner/test.py | 212 | ||||
-rw-r--r-- | test/crossrunner/util.py | 16 |
6 files changed, 876 insertions, 876 deletions
diff --git a/test/crossrunner/collect.py b/test/crossrunner/collect.py index f92b9e2d7..e91ac0b43 100644 --- a/test/crossrunner/collect.py +++ b/test/crossrunner/collect.py @@ -40,13 +40,13 @@ from .util import merge_dict # (e.g. "binary" is equivalent to "binary:binary" in tests.json) # VALID_JSON_KEYS = [ - 'name', # name of the library, typically a language name - 'workdir', # work directory where command is executed - 'command', # test command - 'extra_args', # args appended to command after other args are appended - 'remote_args', # args added to the other side of the program - 'join_args', # whether args should be passed as single concatenated string - 'env', # additional environmental variable + 'name', # name of the library, typically a language name + 'workdir', # work directory where command is executed + 'command', # test command + 'extra_args', # args appended to command after other args are appended + 'remote_args', # args added to the other side of the program + 'join_args', # whether args should be passed as single concatenated string + 'env', # additional environmental variable ] DEFAULT_DELAY = 1 @@ -54,102 +54,102 @@ DEFAULT_TIMEOUT = 5 def _collect_testlibs(config, server_match, client_match=[None]): - """Collects server/client configurations from library configurations""" - def expand_libs(config): - for lib in config: - sv = lib.pop('server', None) - cl = lib.pop('client', None) - yield lib, sv, cl - - def yield_testlibs(base_configs, configs, match): - for base, conf in zip(base_configs, configs): - if conf: - if not match or base['name'] in match: - platforms = conf.get('platforms') or base.get('platforms') - if not platforms or platform.system() in platforms: - yield merge_dict(base, conf) - - libs, svs, cls = zip(*expand_libs(config)) - servers = list(yield_testlibs(libs, svs, server_match)) - clients = list(yield_testlibs(libs, cls, client_match)) - return servers, clients + """Collects server/client configurations from library configurations""" + def expand_libs(config): + for lib in config: + sv = lib.pop('server', None) + cl = lib.pop('client', None) + yield lib, sv, cl + + def yield_testlibs(base_configs, configs, match): + for base, conf in zip(base_configs, configs): + if conf: + if not match or base['name'] in match: + platforms = conf.get('platforms') or base.get('platforms') + if not platforms or platform.system() in platforms: + yield merge_dict(base, conf) + + libs, svs, cls = zip(*expand_libs(config)) + servers = list(yield_testlibs(libs, svs, server_match)) + clients = list(yield_testlibs(libs, cls, client_match)) + return servers, clients def collect_features(config, match): - res = list(map(re.compile, match)) - return list(filter(lambda c: any(map(lambda r: r.search(c['name']), res)), config)) + res = list(map(re.compile, match)) + return list(filter(lambda c: any(map(lambda r: r.search(c['name']), res)), config)) def _do_collect_tests(servers, clients): - def intersection(key, o1, o2): - """intersection of two collections. - collections are replaced with sets the first time""" - def cached_set(o, key): - v = o[key] - if not isinstance(v, set): - v = set(v) - o[key] = v - return v - return cached_set(o1, key) & cached_set(o2, key) - - def intersect_with_spec(key, o1, o2): - # store as set of (spec, impl) tuple - def cached_set(o): - def to_spec_impl_tuples(values): - for v in values: - spec, _, impl = v.partition(':') - yield spec, impl or spec - v = o[key] - if not isinstance(v, set): - v = set(to_spec_impl_tuples(set(v))) - o[key] = v - return v - for spec1, impl1 in cached_set(o1): - for spec2, impl2 in cached_set(o2): - if spec1 == spec2: - name = impl1 if impl1 == impl2 else '%s-%s' % (impl1, impl2) - yield name, impl1, impl2 - - def maybe_max(key, o1, o2, default): - """maximum of two if present, otherwise defult value""" - v1 = o1.get(key) - v2 = o2.get(key) - return max(v1, v2) if v1 and v2 else v1 or v2 or default - - def filter_with_validkeys(o): - ret = {} - for key in VALID_JSON_KEYS: - if key in o: - ret[key] = o[key] - return ret - - def merge_metadata(o, **ret): - for key in VALID_JSON_KEYS: - if key in o: - ret[key] = o[key] - return ret - - for sv, cl in product(servers, clients): - for proto, proto1, proto2 in intersect_with_spec('protocols', sv, cl): - for trans, trans1, trans2 in intersect_with_spec('transports', sv, cl): - for sock in intersection('sockets', sv, cl): - yield { - 'server': merge_metadata(sv, **{'protocol': proto1, 'transport': trans1}), - 'client': merge_metadata(cl, **{'protocol': proto2, 'transport': trans2}), - 'delay': maybe_max('delay', sv, cl, DEFAULT_DELAY), - 'timeout': maybe_max('timeout', sv, cl, DEFAULT_TIMEOUT), - 'protocol': proto, - 'transport': trans, - 'socket': sock - } + def intersection(key, o1, o2): + """intersection of two collections. + collections are replaced with sets the first time""" + def cached_set(o, key): + v = o[key] + if not isinstance(v, set): + v = set(v) + o[key] = v + return v + return cached_set(o1, key) & cached_set(o2, key) + + def intersect_with_spec(key, o1, o2): + # store as set of (spec, impl) tuple + def cached_set(o): + def to_spec_impl_tuples(values): + for v in values: + spec, _, impl = v.partition(':') + yield spec, impl or spec + v = o[key] + if not isinstance(v, set): + v = set(to_spec_impl_tuples(set(v))) + o[key] = v + return v + for spec1, impl1 in cached_set(o1): + for spec2, impl2 in cached_set(o2): + if spec1 == spec2: + name = impl1 if impl1 == impl2 else '%s-%s' % (impl1, impl2) + yield name, impl1, impl2 + + def maybe_max(key, o1, o2, default): + """maximum of two if present, otherwise defult value""" + v1 = o1.get(key) + v2 = o2.get(key) + return max(v1, v2) if v1 and v2 else v1 or v2 or default + + def filter_with_validkeys(o): + ret = {} + for key in VALID_JSON_KEYS: + if key in o: + ret[key] = o[key] + return ret + + def merge_metadata(o, **ret): + for key in VALID_JSON_KEYS: + if key in o: + ret[key] = o[key] + return ret + + for sv, cl in product(servers, clients): + for proto, proto1, proto2 in intersect_with_spec('protocols', sv, cl): + for trans, trans1, trans2 in intersect_with_spec('transports', sv, cl): + for sock in intersection('sockets', sv, cl): + yield { + 'server': merge_metadata(sv, **{'protocol': proto1, 'transport': trans1}), + 'client': merge_metadata(cl, **{'protocol': proto2, 'transport': trans2}), + 'delay': maybe_max('delay', sv, cl, DEFAULT_DELAY), + 'timeout': maybe_max('timeout', sv, cl, DEFAULT_TIMEOUT), + 'protocol': proto, + 'transport': trans, + 'socket': sock + } def collect_cross_tests(tests_dict, server_match, client_match): - sv, cl = _collect_testlibs(tests_dict, server_match, client_match) - return list(_do_collect_tests(sv, cl)) + sv, cl = _collect_testlibs(tests_dict, server_match, client_match) + return list(_do_collect_tests(sv, cl)) def collect_feature_tests(tests_dict, features_dict, server_match, feature_match): - sv, _ = _collect_testlibs(tests_dict, server_match) - ft = collect_features(features_dict, feature_match) - return list(_do_collect_tests(sv, ft)) + sv, _ = _collect_testlibs(tests_dict, server_match) + ft = collect_features(features_dict, feature_match) + return list(_do_collect_tests(sv, ft)) diff --git a/test/crossrunner/compat.py b/test/crossrunner/compat.py index 6ab9d713b..f1ca91bb3 100644 --- a/test/crossrunner/compat.py +++ b/test/crossrunner/compat.py @@ -2,23 +2,23 @@ import os import sys if sys.version_info[0] == 2: - _ENCODE = sys.getfilesystemencoding() + _ENCODE = sys.getfilesystemencoding() - def path_join(*args): - bin_args = map(lambda a: a.decode(_ENCODE), args) - return os.path.join(*bin_args).encode(_ENCODE) + def path_join(*args): + bin_args = map(lambda a: a.decode(_ENCODE), args) + return os.path.join(*bin_args).encode(_ENCODE) - def str_join(s, l): - bin_args = map(lambda a: a.decode(_ENCODE), l) - b = s.decode(_ENCODE) - return b.join(bin_args).encode(_ENCODE) + def str_join(s, l): + bin_args = map(lambda a: a.decode(_ENCODE), l) + b = s.decode(_ENCODE) + return b.join(bin_args).encode(_ENCODE) - logfile_open = open + logfile_open = open else: - path_join = os.path.join - str_join = str.join + path_join = os.path.join + str_join = str.join - def logfile_open(*args): - return open(*args, errors='replace') + def logfile_open(*args): + return open(*args, errors='replace') diff --git a/test/crossrunner/report.py b/test/crossrunner/report.py index be7271cb1..cc5f26fe2 100644 --- a/test/crossrunner/report.py +++ b/test/crossrunner/report.py @@ -39,396 +39,396 @@ FAIL_JSON = 'known_failures_%s.json' def generate_known_failures(testdir, overwrite, save, out): - def collect_failures(results): - success_index = 5 - for r in results: - if not r[success_index]: - yield TestEntry.get_name(*r) - try: - with logfile_open(path_join(testdir, RESULT_JSON), 'r') as fp: - results = json.load(fp) - except IOError: - sys.stderr.write('Unable to load last result. Did you run tests ?\n') - return False - fails = collect_failures(results['results']) - if not overwrite: - known = load_known_failures(testdir) - known.extend(fails) - fails = known - fails_json = json.dumps(sorted(set(fails)), indent=2, separators=(',', ': ')) - if save: - with logfile_open(os.path.join(testdir, FAIL_JSON % platform.system()), 'w+') as fp: - fp.write(fails_json) - sys.stdout.write('Successfully updated known failures.\n') - if out: - sys.stdout.write(fails_json) - sys.stdout.write('\n') - return True + def collect_failures(results): + success_index = 5 + for r in results: + if not r[success_index]: + yield TestEntry.get_name(*r) + try: + with logfile_open(path_join(testdir, RESULT_JSON), 'r') as fp: + results = json.load(fp) + except IOError: + sys.stderr.write('Unable to load last result. Did you run tests ?\n') + return False + fails = collect_failures(results['results']) + if not overwrite: + known = load_known_failures(testdir) + known.extend(fails) + fails = known + fails_json = json.dumps(sorted(set(fails)), indent=2, separators=(',', ': ')) + if save: + with logfile_open(os.path.join(testdir, FAIL_JSON % platform.system()), 'w+') as fp: + fp.write(fails_json) + sys.stdout.write('Successfully updated known failures.\n') + if out: + sys.stdout.write(fails_json) + sys.stdout.write('\n') + return True def load_known_failures(testdir): - try: - with logfile_open(path_join(testdir, FAIL_JSON % platform.system()), 'r') as fp: - return json.load(fp) - except IOError: - return [] + try: + with logfile_open(path_join(testdir, FAIL_JSON % platform.system()), 'r') as fp: + return json.load(fp) + except IOError: + return [] class TestReporter(object): - # Unfortunately, standard library doesn't handle timezone well - # DATETIME_FORMAT = '%a %b %d %H:%M:%S %Z %Y' - DATETIME_FORMAT = '%a %b %d %H:%M:%S %Y' + # Unfortunately, standard library doesn't handle timezone well + # DATETIME_FORMAT = '%a %b %d %H:%M:%S %Z %Y' + DATETIME_FORMAT = '%a %b %d %H:%M:%S %Y' - def __init__(self): - self._log = multiprocessing.get_logger() - self._lock = multiprocessing.Lock() + def __init__(self): + self._log = multiprocessing.get_logger() + self._lock = multiprocessing.Lock() - @classmethod - def test_logfile(cls, test_name, prog_kind, dir=None): - relpath = path_join('log', '%s_%s.log' % (test_name, prog_kind)) - return relpath if not dir else os.path.realpath(path_join(dir, relpath)) + @classmethod + def test_logfile(cls, test_name, prog_kind, dir=None): + relpath = path_join('log', '%s_%s.log' % (test_name, prog_kind)) + return relpath if not dir else os.path.realpath(path_join(dir, relpath)) - def _start(self): - self._start_time = time.time() + def _start(self): + self._start_time = time.time() - @property - def _elapsed(self): - return time.time() - self._start_time + @property + def _elapsed(self): + return time.time() - self._start_time - @classmethod - def _format_date(cls): - return '%s' % datetime.datetime.now().strftime(cls.DATETIME_FORMAT) + @classmethod + def _format_date(cls): + return '%s' % datetime.datetime.now().strftime(cls.DATETIME_FORMAT) - def _print_date(self): - print(self._format_date(), file=self.out) + def _print_date(self): + print(self._format_date(), file=self.out) - def _print_bar(self, out=None): - print( - '==========================================================================', - file=(out or self.out)) + def _print_bar(self, out=None): + print( + '==========================================================================', + file=(out or self.out)) - def _print_exec_time(self): - print('Test execution took {:.1f} seconds.'.format(self._elapsed), file=self.out) + def _print_exec_time(self): + print('Test execution took {:.1f} seconds.'.format(self._elapsed), file=self.out) class ExecReporter(TestReporter): - def __init__(self, testdir, test, prog): - super(ExecReporter, self).__init__() - self._test = test - self._prog = prog - self.logpath = self.test_logfile(test.name, prog.kind, testdir) - self.out = None - - def begin(self): - self._start() - self._open() - if self.out and not self.out.closed: - self._print_header() - else: - self._log.debug('Output stream is not available.') - - def end(self, returncode): - self._lock.acquire() - try: - if self.out and not self.out.closed: - self._print_footer(returncode) - self._close() + def __init__(self, testdir, test, prog): + super(ExecReporter, self).__init__() + self._test = test + self._prog = prog + self.logpath = self.test_logfile(test.name, prog.kind, testdir) self.out = None - else: - self._log.debug('Output stream is not available.') - finally: - self._lock.release() - - def killed(self): - print(file=self.out) - print('Server process is successfully killed.', file=self.out) - self.end(None) - - def died(self): - print(file=self.out) - print('*** Server process has died unexpectedly ***', file=self.out) - self.end(None) - - _init_failure_exprs = { - 'server': list(map(re.compile, [ - '[Aa]ddress already in use', - 'Could not bind', - 'EADDRINUSE', - ])), - 'client': list(map(re.compile, [ - '[Cc]onnection refused', - 'Could not connect to localhost', - 'ECONNREFUSED', - 'No such file or directory', # domain socket - ])), - } - - def maybe_false_positive(self): - """Searches through log file for socket bind error. - Returns True if suspicious expression is found, otherwise False""" - try: - if self.out and not self.out.closed: + + def begin(self): + self._start() + self._open() + if self.out and not self.out.closed: + self._print_header() + else: + self._log.debug('Output stream is not available.') + + def end(self, returncode): + self._lock.acquire() + try: + if self.out and not self.out.closed: + self._print_footer(returncode) + self._close() + self.out = None + else: + self._log.debug('Output stream is not available.') + finally: + self._lock.release() + + def killed(self): + print(file=self.out) + print('Server process is successfully killed.', file=self.out) + self.end(None) + + def died(self): + print(file=self.out) + print('*** Server process has died unexpectedly ***', file=self.out) + self.end(None) + + _init_failure_exprs = { + 'server': list(map(re.compile, [ + '[Aa]ddress already in use', + 'Could not bind', + 'EADDRINUSE', + ])), + 'client': list(map(re.compile, [ + '[Cc]onnection refused', + 'Could not connect to localhost', + 'ECONNREFUSED', + 'No such file or directory', # domain socket + ])), + } + + def maybe_false_positive(self): + """Searches through log file for socket bind error. + Returns True if suspicious expression is found, otherwise False""" + try: + if self.out and not self.out.closed: + self.out.flush() + exprs = self._init_failure_exprs[self._prog.kind] + + def match(line): + for expr in exprs: + if expr.search(line): + return True + + with logfile_open(self.logpath, 'r') as fp: + if any(map(match, fp)): + return True + except (KeyboardInterrupt, SystemExit): + raise + except Exception as ex: + self._log.warn('[%s]: Error while detecting false positive: %s' % (self._test.name, str(ex))) + self._log.info(traceback.print_exc()) + return False + + def _open(self): + self.out = logfile_open(self.logpath, 'w+') + + def _close(self): + self.out.close() + + def _print_header(self): + self._print_date() + print('Executing: %s' % str_join(' ', self._prog.command), file=self.out) + print('Directory: %s' % self._prog.workdir, file=self.out) + print('config:delay: %s' % self._test.delay, file=self.out) + print('config:timeout: %s' % self._test.timeout, file=self.out) + self._print_bar() self.out.flush() - exprs = self._init_failure_exprs[self._prog.kind] - - def match(line): - for expr in exprs: - if expr.search(line): - return True - - with logfile_open(self.logpath, 'r') as fp: - if any(map(match, fp)): - return True - except (KeyboardInterrupt, SystemExit): - raise - except Exception as ex: - self._log.warn('[%s]: Error while detecting false positive: %s' % (self._test.name, str(ex))) - self._log.info(traceback.print_exc()) - return False - - def _open(self): - self.out = logfile_open(self.logpath, 'w+') - - def _close(self): - self.out.close() - - def _print_header(self): - self._print_date() - print('Executing: %s' % str_join(' ', self._prog.command), file=self.out) - print('Directory: %s' % self._prog.workdir, file=self.out) - print('config:delay: %s' % self._test.delay, file=self.out) - print('config:timeout: %s' % self._test.timeout, file=self.out) - self._print_bar() - self.out.flush() - - def _print_footer(self, returncode=None): - self._print_bar() - if returncode is not None: - print('Return code: %d' % returncode, file=self.out) - else: - print('Process is killed.', file=self.out) - self._print_exec_time() - self._print_date() + def _print_footer(self, returncode=None): + self._print_bar() + if returncode is not None: + print('Return code: %d' % returncode, file=self.out) + else: + print('Process is killed.', file=self.out) + self._print_exec_time() + self._print_date() -class SummaryReporter(TestReporter): - def __init__(self, basedir, testdir_relative, concurrent=True): - super(SummaryReporter, self).__init__() - self._basedir = basedir - self._testdir_rel = testdir_relative - self.logdir = path_join(self.testdir, LOG_DIR) - self.out_path = path_join(self.testdir, RESULT_JSON) - self.concurrent = concurrent - self.out = sys.stdout - self._platform = platform.system() - self._revision = self._get_revision() - self._tests = [] - if not os.path.exists(self.logdir): - os.mkdir(self.logdir) - self._known_failures = load_known_failures(self.testdir) - self._unexpected_success = [] - self._flaky_success = [] - self._unexpected_failure = [] - self._expected_failure = [] - self._print_header() - - @property - def testdir(self): - return path_join(self._basedir, self._testdir_rel) - - def _result_string(self, test): - if test.success: - if test.retry_count == 0: - return 'success' - elif test.retry_count == 1: - return 'flaky(1 retry)' - else: - return 'flaky(%d retries)' % test.retry_count - elif test.expired: - return 'failure(timeout)' - else: - return 'failure(%d)' % test.returncode - - def _get_revision(self): - p = subprocess.Popen(['git', 'rev-parse', '--short', 'HEAD'], - cwd=self.testdir, stdout=subprocess.PIPE) - out, _ = p.communicate() - return out.strip() - - def _format_test(self, test, with_result=True): - name = '%s-%s' % (test.server.name, test.client.name) - trans = '%s-%s' % (test.transport, test.socket) - if not with_result: - return '{:24s}{:13s}{:25s}'.format(name[:23], test.protocol[:12], trans[:24]) - else: - return '{:24s}{:13s}{:25s}{:s}\n'.format(name[:23], test.protocol[:12], trans[:24], self._result_string(test)) - - def _print_test_header(self): - self._print_bar() - print( - '{:24s}{:13s}{:25s}{:s}'.format('server-client:', 'protocol:', 'transport:', 'result:'), - file=self.out) - - def _print_header(self): - self._start() - print('Apache Thrift - Integration Test Suite', file=self.out) - self._print_date() - self._print_test_header() - - def _print_unexpected_failure(self): - if len(self._unexpected_failure) > 0: - self.out.writelines([ - '*** Following %d failures were unexpected ***:\n' % len(self._unexpected_failure), - 'If it is introduced by you, please fix it before submitting the code.\n', - # 'If not, please report at https://issues.apache.org/jira/browse/THRIFT\n', - ]) - self._print_test_header() - for i in self._unexpected_failure: - self.out.write(self._format_test(self._tests[i])) - self._print_bar() - else: - print('No unexpected failures.', file=self.out) - - def _print_flaky_success(self): - if len(self._flaky_success) > 0: - print( - 'Following %d tests were expected to cleanly succeed but needed retry:' % len(self._flaky_success), - file=self.out) - self._print_test_header() - for i in self._flaky_success: - self.out.write(self._format_test(self._tests[i])) - self._print_bar() - - def _print_unexpected_success(self): - if len(self._unexpected_success) > 0: - print( - 'Following %d tests were known to fail but succeeded (maybe flaky):' % len(self._unexpected_success), - file=self.out) - self._print_test_header() - for i in self._unexpected_success: - self.out.write(self._format_test(self._tests[i])) - self._print_bar() - - def _http_server_command(self, port): - if sys.version_info[0] < 3: - return 'python -m SimpleHTTPServer %d' % port - else: - return 'python -m http.server %d' % port - - def _print_footer(self): - fail_count = len(self._expected_failure) + len(self._unexpected_failure) - self._print_bar() - self._print_unexpected_success() - self._print_flaky_success() - self._print_unexpected_failure() - self._write_html_data() - self._assemble_log('unexpected failures', self._unexpected_failure) - self._assemble_log('known failures', self._expected_failure) - self.out.writelines([ - 'You can browse results at:\n', - '\tfile://%s/%s\n' % (self.testdir, RESULT_HTML), - '# If you use Chrome, run:\n', - '# \tcd %s\n#\t%s\n' % (self._basedir, self._http_server_command(8001)), - '# then browse:\n', - '# \thttp://localhost:%d/%s/\n' % (8001, self._testdir_rel), - 'Full log for each test is here:\n', - '\ttest/log/client_server_protocol_transport_client.log\n', - '\ttest/log/client_server_protocol_transport_server.log\n', - '%d failed of %d tests in total.\n' % (fail_count, len(self._tests)), - ]) - self._print_exec_time() - self._print_date() - - def _render_result(self, test): - return [ - test.server.name, - test.client.name, - test.protocol, - test.transport, - test.socket, - test.success, - test.as_expected, - test.returncode, - { - 'server': self.test_logfile(test.name, test.server.kind), - 'client': self.test_logfile(test.name, test.client.kind), - }, - ] - - def _write_html_data(self): - """Writes JSON data to be read by result html""" - results = [self._render_result(r) for r in self._tests] - with logfile_open(self.out_path, 'w+') as fp: - fp.write(json.dumps({ - 'date': self._format_date(), - 'revision': str(self._revision), - 'platform': self._platform, - 'duration': '{:.1f}'.format(self._elapsed), - 'results': results, - }, indent=2)) - - def _assemble_log(self, title, indexes): - if len(indexes) > 0: - def add_prog_log(fp, test, prog_kind): - print('*************************** %s message ***************************' % prog_kind, - file=fp) - path = self.test_logfile(test.name, prog_kind, self.testdir) - if os.path.exists(path): - with logfile_open(path, 'r') as prog_fp: - print(prog_fp.read(), file=fp) - filename = title.replace(' ', '_') + '.log' - with logfile_open(os.path.join(self.logdir, filename), 'w+') as fp: - for test in map(self._tests.__getitem__, indexes): - fp.write('TEST: [%s]\n' % test.name) - add_prog_log(fp, test, test.server.kind) - add_prog_log(fp, test, test.client.kind) - fp.write('**********************************************************************\n\n') - print('%s are logged to %s/%s/%s' % (title.capitalize(), self._testdir_rel, LOG_DIR, filename)) - - def end(self): - self._print_footer() - return len(self._unexpected_failure) == 0 - - def add_test(self, test_dict): - test = TestEntry(self.testdir, **test_dict) - self._lock.acquire() - try: - if not self.concurrent: - self.out.write(self._format_test(test, False)) - self.out.flush() - self._tests.append(test) - return len(self._tests) - 1 - finally: - self._lock.release() - def add_result(self, index, returncode, expired, retry_count): - self._lock.acquire() - try: - failed = returncode is None or returncode != 0 - flaky = not failed and retry_count != 0 - test = self._tests[index] - known = test.name in self._known_failures - if failed: - if known: - self._log.debug('%s failed as expected' % test.name) - self._expected_failure.append(index) +class SummaryReporter(TestReporter): + def __init__(self, basedir, testdir_relative, concurrent=True): + super(SummaryReporter, self).__init__() + self._basedir = basedir + self._testdir_rel = testdir_relative + self.logdir = path_join(self.testdir, LOG_DIR) + self.out_path = path_join(self.testdir, RESULT_JSON) + self.concurrent = concurrent + self.out = sys.stdout + self._platform = platform.system() + self._revision = self._get_revision() + self._tests = [] + if not os.path.exists(self.logdir): + os.mkdir(self.logdir) + self._known_failures = load_known_failures(self.testdir) + self._unexpected_success = [] + self._flaky_success = [] + self._unexpected_failure = [] + self._expected_failure = [] + self._print_header() + + @property + def testdir(self): + return path_join(self._basedir, self._testdir_rel) + + def _result_string(self, test): + if test.success: + if test.retry_count == 0: + return 'success' + elif test.retry_count == 1: + return 'flaky(1 retry)' + else: + return 'flaky(%d retries)' % test.retry_count + elif test.expired: + return 'failure(timeout)' + else: + return 'failure(%d)' % test.returncode + + def _get_revision(self): + p = subprocess.Popen(['git', 'rev-parse', '--short', 'HEAD'], + cwd=self.testdir, stdout=subprocess.PIPE) + out, _ = p.communicate() + return out.strip() + + def _format_test(self, test, with_result=True): + name = '%s-%s' % (test.server.name, test.client.name) + trans = '%s-%s' % (test.transport, test.socket) + if not with_result: + return '{:24s}{:13s}{:25s}'.format(name[:23], test.protocol[:12], trans[:24]) + else: + return '{:24s}{:13s}{:25s}{:s}\n'.format(name[:23], test.protocol[:12], trans[:24], self._result_string(test)) + + def _print_test_header(self): + self._print_bar() + print( + '{:24s}{:13s}{:25s}{:s}'.format('server-client:', 'protocol:', 'transport:', 'result:'), + file=self.out) + + def _print_header(self): + self._start() + print('Apache Thrift - Integration Test Suite', file=self.out) + self._print_date() + self._print_test_header() + + def _print_unexpected_failure(self): + if len(self._unexpected_failure) > 0: + self.out.writelines([ + '*** Following %d failures were unexpected ***:\n' % len(self._unexpected_failure), + 'If it is introduced by you, please fix it before submitting the code.\n', + # 'If not, please report at https://issues.apache.org/jira/browse/THRIFT\n', + ]) + self._print_test_header() + for i in self._unexpected_failure: + self.out.write(self._format_test(self._tests[i])) + self._print_bar() + else: + print('No unexpected failures.', file=self.out) + + def _print_flaky_success(self): + if len(self._flaky_success) > 0: + print( + 'Following %d tests were expected to cleanly succeed but needed retry:' % len(self._flaky_success), + file=self.out) + self._print_test_header() + for i in self._flaky_success: + self.out.write(self._format_test(self._tests[i])) + self._print_bar() + + def _print_unexpected_success(self): + if len(self._unexpected_success) > 0: + print( + 'Following %d tests were known to fail but succeeded (maybe flaky):' % len(self._unexpected_success), + file=self.out) + self._print_test_header() + for i in self._unexpected_success: + self.out.write(self._format_test(self._tests[i])) + self._print_bar() + + def _http_server_command(self, port): + if sys.version_info[0] < 3: + return 'python -m SimpleHTTPServer %d' % port else: - self._log.info('unexpected failure: %s' % test.name) - self._unexpected_failure.append(index) - elif flaky and not known: - self._log.info('unexpected flaky success: %s' % test.name) - self._flaky_success.append(index) - elif not flaky and known: - self._log.info('unexpected success: %s' % test.name) - self._unexpected_success.append(index) - test.success = not failed - test.returncode = returncode - test.retry_count = retry_count - test.expired = expired - test.as_expected = known == failed - if not self.concurrent: - self.out.write(self._result_string(test) + '\n') - else: - self.out.write(self._format_test(test)) - finally: - self._lock.release() + return 'python -m http.server %d' % port + + def _print_footer(self): + fail_count = len(self._expected_failure) + len(self._unexpected_failure) + self._print_bar() + self._print_unexpected_success() + self._print_flaky_success() + self._print_unexpected_failure() + self._write_html_data() + self._assemble_log('unexpected failures', self._unexpected_failure) + self._assemble_log('known failures', self._expected_failure) + self.out.writelines([ + 'You can browse results at:\n', + '\tfile://%s/%s\n' % (self.testdir, RESULT_HTML), + '# If you use Chrome, run:\n', + '# \tcd %s\n#\t%s\n' % (self._basedir, self._http_server_command(8001)), + '# then browse:\n', + '# \thttp://localhost:%d/%s/\n' % (8001, self._testdir_rel), + 'Full log for each test is here:\n', + '\ttest/log/client_server_protocol_transport_client.log\n', + '\ttest/log/client_server_protocol_transport_server.log\n', + '%d failed of %d tests in total.\n' % (fail_count, len(self._tests)), + ]) + self._print_exec_time() + self._print_date() + + def _render_result(self, test): + return [ + test.server.name, + test.client.name, + test.protocol, + test.transport, + test.socket, + test.success, + test.as_expected, + test.returncode, + { + 'server': self.test_logfile(test.name, test.server.kind), + 'client': self.test_logfile(test.name, test.client.kind), + }, + ] + + def _write_html_data(self): + """Writes JSON data to be read by result html""" + results = [self._render_result(r) for r in self._tests] + with logfile_open(self.out_path, 'w+') as fp: + fp.write(json.dumps({ + 'date': self._format_date(), + 'revision': str(self._revision), + 'platform': self._platform, + 'duration': '{:.1f}'.format(self._elapsed), + 'results': results, + }, indent=2)) + + def _assemble_log(self, title, indexes): + if len(indexes) > 0: + def add_prog_log(fp, test, prog_kind): + print('*************************** %s message ***************************' % prog_kind, + file=fp) + path = self.test_logfile(test.name, prog_kind, self.testdir) + if os.path.exists(path): + with logfile_open(path, 'r') as prog_fp: + print(prog_fp.read(), file=fp) + filename = title.replace(' ', '_') + '.log' + with logfile_open(os.path.join(self.logdir, filename), 'w+') as fp: + for test in map(self._tests.__getitem__, indexes): + fp.write('TEST: [%s]\n' % test.name) + add_prog_log(fp, test, test.server.kind) + add_prog_log(fp, test, test.client.kind) + fp.write('**********************************************************************\n\n') + print('%s are logged to %s/%s/%s' % (title.capitalize(), self._testdir_rel, LOG_DIR, filename)) + + def end(self): + self._print_footer() + return len(self._unexpected_failure) == 0 + + def add_test(self, test_dict): + test = TestEntry(self.testdir, **test_dict) + self._lock.acquire() + try: + if not self.concurrent: + self.out.write(self._format_test(test, False)) + self.out.flush() + self._tests.append(test) + return len(self._tests) - 1 + finally: + self._lock.release() + + def add_result(self, index, returncode, expired, retry_count): + self._lock.acquire() + try: + failed = returncode is None or returncode != 0 + flaky = not failed and retry_count != 0 + test = self._tests[index] + known = test.name in self._known_failures + if failed: + if known: + self._log.debug('%s failed as expected' % test.name) + self._expected_failure.append(index) + else: + self._log.info('unexpected failure: %s' % test.name) + self._unexpected_failure.append(index) + elif flaky and not known: + self._log.info('unexpected flaky success: %s' % test.name) + self._flaky_success.append(index) + elif not flaky and known: + self._log.info('unexpected success: %s' % test.name) + self._unexpected_success.append(index) + test.success = not failed + test.returncode = returncode + test.retry_count = retry_count + test.expired = expired + test.as_expected = known == failed + if not self.concurrent: + self.out.write(self._result_string(test) + '\n') + else: + self.out.write(self._format_test(test)) + finally: + self._lock.release() diff --git a/test/crossrunner/run.py b/test/crossrunner/run.py index 68bd92869..18c162357 100644 --- a/test/crossrunner/run.py +++ b/test/crossrunner/run.py @@ -39,307 +39,307 @@ RESULT_ERROR = 64 class ExecutionContext(object): - def __init__(self, cmd, cwd, env, report): - self._log = multiprocessing.get_logger() - self.report = report - self.cmd = cmd - self.cwd = cwd - self.env = env - self.timer = None - self.expired = False - self.killed = False - - def _expire(self): - self._log.info('Timeout') - self.expired = True - self.kill() - - def kill(self): - self._log.debug('Killing process : %d' % self.proc.pid) - self.killed = True - if platform.system() != 'Windows': - try: - os.killpg(self.proc.pid, signal.SIGKILL) - except Exception: - self._log.info('Failed to kill process group', exc_info=sys.exc_info()) - try: - self.proc.kill() - except Exception: - self._log.info('Failed to kill process', exc_info=sys.exc_info()) - - def _popen_args(self): - args = { - 'cwd': self.cwd, - 'env': self.env, - 'stdout': self.report.out, - 'stderr': subprocess.STDOUT, - } - # make sure child processes doesn't remain after killing - if platform.system() == 'Windows': - DETACHED_PROCESS = 0x00000008 - args.update(creationflags=DETACHED_PROCESS | subprocess.CREATE_NEW_PROCESS_GROUP) - else: - args.update(preexec_fn=os.setsid) - return args - - def start(self, timeout=0): - joined = str_join(' ', self.cmd) - self._log.debug('COMMAND: %s', joined) - self._log.debug('WORKDIR: %s', self.cwd) - self._log.debug('LOGFILE: %s', self.report.logpath) - self.report.begin() - self.proc = subprocess.Popen(self.cmd, **self._popen_args()) - if timeout > 0: - self.timer = threading.Timer(timeout, self._expire) - self.timer.start() - return self._scoped() - - @contextlib.contextmanager - def _scoped(self): - yield self - self._log.debug('Killing scoped process') - if self.proc.poll() is None: - self.kill() - self.report.killed() - else: - self._log.debug('Process died unexpectedly') - self.report.died() - - def wait(self): - self.proc.communicate() - if self.timer: - self.timer.cancel() - self.report.end(self.returncode) - - @property - def returncode(self): - return self.proc.returncode if self.proc else None + def __init__(self, cmd, cwd, env, report): + self._log = multiprocessing.get_logger() + self.report = report + self.cmd = cmd + self.cwd = cwd + self.env = env + self.timer = None + self.expired = False + self.killed = False + + def _expire(self): + self._log.info('Timeout') + self.expired = True + self.kill() + + def kill(self): + self._log.debug('Killing process : %d' % self.proc.pid) + self.killed = True + if platform.system() != 'Windows': + try: + os.killpg(self.proc.pid, signal.SIGKILL) + except Exception: + self._log.info('Failed to kill process group', exc_info=sys.exc_info()) + try: + self.proc.kill() + except Exception: + self._log.info('Failed to kill process', exc_info=sys.exc_info()) + + def _popen_args(self): + args = { + 'cwd': self.cwd, + 'env': self.env, + 'stdout': self.report.out, + 'stderr': subprocess.STDOUT, + } + # make sure child processes doesn't remain after killing + if platform.system() == 'Windows': + DETACHED_PROCESS = 0x00000008 + args.update(creationflags=DETACHED_PROCESS | subprocess.CREATE_NEW_PROCESS_GROUP) + else: + args.update(preexec_fn=os.setsid) + return args + + def start(self, timeout=0): + joined = str_join(' ', self.cmd) + self._log.debug('COMMAND: %s', joined) + self._log.debug('WORKDIR: %s', self.cwd) + self._log.debug('LOGFILE: %s', self.report.logpath) + self.report.begin() + self.proc = subprocess.Popen(self.cmd, **self._popen_args()) + if timeout > 0: + self.timer = threading.Timer(timeout, self._expire) + self.timer.start() + return self._scoped() + + @contextlib.contextmanager + def _scoped(self): + yield self + self._log.debug('Killing scoped process') + if self.proc.poll() is None: + self.kill() + self.report.killed() + else: + self._log.debug('Process died unexpectedly') + self.report.died() + + def wait(self): + self.proc.communicate() + if self.timer: + self.timer.cancel() + self.report.end(self.returncode) + + @property + def returncode(self): + return self.proc.returncode if self.proc else None def exec_context(port, logdir, test, prog): - report = ExecReporter(logdir, test, prog) - prog.build_command(port) - return ExecutionContext(prog.command, prog.workdir, prog.env, report) + report = ExecReporter(logdir, test, prog) + prog.build_command(port) + return ExecutionContext(prog.command, prog.workdir, prog.env, report) def run_test(testdir, logdir, test_dict, max_retry, async=True): - try: - logger = multiprocessing.get_logger() - max_bind_retry = 3 - retry_count = 0 - bind_retry_count = 0 - test = TestEntry(testdir, **test_dict) - while True: - if stop.is_set(): - logger.debug('Skipping because shutting down') - return (retry_count, None) - logger.debug('Start') - with PortAllocator.alloc_port_scoped(ports, test.socket) as port: - logger.debug('Start with port %d' % port) - sv = exec_context(port, logdir, test, test.server) - cl = exec_context(port, logdir, test, test.client) - - logger.debug('Starting server') - with sv.start(): - if test.delay > 0: - logger.debug('Delaying client for %.2f seconds' % test.delay) - time.sleep(test.delay) - connect_retry_count = 0 - max_connect_retry = 10 - connect_retry_wait = 0.5 - while True: - logger.debug('Starting client') - cl.start(test.timeout) - logger.debug('Waiting client') - cl.wait() - if not cl.report.maybe_false_positive() or connect_retry_count >= max_connect_retry: - if connect_retry_count > 0 and connect_retry_count < max_connect_retry: - logger.warn('[%s]: Connected after %d retry (%.2f sec each)' % (test.server.name, connect_retry_count, connect_retry_wait)) - # Wait for 50ms to see if server does not die at the end. - time.sleep(0.05) - break - logger.debug('Server may not be ready, waiting %.2f second...' % connect_retry_wait) - time.sleep(connect_retry_wait) - connect_retry_count += 1 - - if sv.report.maybe_false_positive() and bind_retry_count < max_bind_retry: - logger.warn('[%s]: Detected socket bind failure, retrying...', test.server.name) - bind_retry_count += 1 - else: - if cl.expired: - result = RESULT_TIMEOUT - elif not sv.killed and cl.proc.returncode == 0: - # Server should be alive at the end. - result = RESULT_ERROR - else: - result = cl.proc.returncode - - if result == 0 or retry_count >= max_retry: - return (retry_count, result) - else: - logger.info('[%s-%s]: test failed, retrying...', test.server.name, test.client.name) - retry_count += 1 - except (KeyboardInterrupt, SystemExit): - logger.info('Interrupted execution') - if not async: - raise - stop.set() - return None - except: - if not async: - raise - logger.warn('Error executing [%s]', test.name, exc_info=sys.exc_info()) - return (retry_count, RESULT_ERROR) + try: + logger = multiprocessing.get_logger() + max_bind_retry = 3 + retry_count = 0 + bind_retry_count = 0 + test = TestEntry(testdir, **test_dict) + while True: + if stop.is_set(): + logger.debug('Skipping because shutting down') + return (retry_count, None) + logger.debug('Start') + with PortAllocator.alloc_port_scoped(ports, test.socket) as port: + logger.debug('Start with port %d' % port) + sv = exec_context(port, logdir, test, test.server) + cl = exec_context(port, logdir, test, test.client) + + logger.debug('Starting server') + with sv.start(): + if test.delay > 0: + logger.debug('Delaying client for %.2f seconds' % test.delay) + time.sleep(test.delay) + connect_retry_count = 0 + max_connect_retry = 10 + connect_retry_wait = 0.5 + while True: + logger.debug('Starting client') + cl.start(test.timeout) + logger.debug('Waiting client') + cl.wait() + if not cl.report.maybe_false_positive() or connect_retry_count >= max_connect_retry: + if connect_retry_count > 0 and connect_retry_count < max_connect_retry: + logger.warn('[%s]: Connected after %d retry (%.2f sec each)' % (test.server.name, connect_retry_count, connect_retry_wait)) + # Wait for 50ms to see if server does not die at the end. + time.sleep(0.05) + break + logger.debug('Server may not be ready, waiting %.2f second...' % connect_retry_wait) + time.sleep(connect_retry_wait) + connect_retry_count += 1 + + if sv.report.maybe_false_positive() and bind_retry_count < max_bind_retry: + logger.warn('[%s]: Detected socket bind failure, retrying...', test.server.name) + bind_retry_count += 1 + else: + if cl.expired: + result = RESULT_TIMEOUT + elif not sv.killed and cl.proc.returncode == 0: + # Server should be alive at the end. + result = RESULT_ERROR + else: + result = cl.proc.returncode + + if result == 0 or retry_count >= max_retry: + return (retry_count, result) + else: + logger.info('[%s-%s]: test failed, retrying...', test.server.name, test.client.name) + retry_count += 1 + except (KeyboardInterrupt, SystemExit): + logger.info('Interrupted execution') + if not async: + raise + stop.set() + return None + except: + if not async: + raise + logger.warn('Error executing [%s]', test.name, exc_info=sys.exc_info()) + return (retry_count, RESULT_ERROR) class PortAllocator(object): - def __init__(self): - self._log = multiprocessing.get_logger() - self._lock = multiprocessing.Lock() - self._ports = set() - self._dom_ports = set() - self._last_alloc = 0 - - def _get_tcp_port(self): - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.bind(('127.0.0.1', 0)) - port = sock.getsockname()[1] - self._lock.acquire() - try: - ok = port not in self._ports - if ok: - self._ports.add(port) - self._last_alloc = time.time() - finally: - self._lock.release() - sock.close() - return port if ok else self._get_tcp_port() - - def _get_domain_port(self): - port = random.randint(1024, 65536) - self._lock.acquire() - try: - ok = port not in self._dom_ports - if ok: - self._dom_ports.add(port) - finally: - self._lock.release() - return port if ok else self._get_domain_port() - - def alloc_port(self, socket_type): - if socket_type in ('domain', 'abstract'): - return self._get_domain_port() - else: - return self._get_tcp_port() - - # static method for inter-process invokation - @staticmethod - @contextlib.contextmanager - def alloc_port_scoped(allocator, socket_type): - port = allocator.alloc_port(socket_type) - yield port - allocator.free_port(socket_type, port) - - def free_port(self, socket_type, port): - self._log.debug('free_port') - self._lock.acquire() - try: - if socket_type == 'domain': - self._dom_ports.remove(port) - path = domain_socket_path(port) - if os.path.exists(path): - os.remove(path) - elif socket_type == 'abstract': - self._dom_ports.remove(port) - else: - self._ports.remove(port) - except IOError: - self._log.info('Error while freeing port', exc_info=sys.exc_info()) - finally: - self._lock.release() + def __init__(self): + self._log = multiprocessing.get_logger() + self._lock = multiprocessing.Lock() + self._ports = set() + self._dom_ports = set() + self._last_alloc = 0 + + def _get_tcp_port(self): + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.bind(('127.0.0.1', 0)) + port = sock.getsockname()[1] + self._lock.acquire() + try: + ok = port not in self._ports + if ok: + self._ports.add(port) + self._last_alloc = time.time() + finally: + self._lock.release() + sock.close() + return port if ok else self._get_tcp_port() + + def _get_domain_port(self): + port = random.randint(1024, 65536) + self._lock.acquire() + try: + ok = port not in self._dom_ports + if ok: + self._dom_ports.add(port) + finally: + self._lock.release() + return port if ok else self._get_domain_port() + + def alloc_port(self, socket_type): + if socket_type in ('domain', 'abstract'): + return self._get_domain_port() + else: + return self._get_tcp_port() + + # static method for inter-process invokation + @staticmethod + @contextlib.contextmanager + def alloc_port_scoped(allocator, socket_type): + port = allocator.alloc_port(socket_type) + yield port + allocator.free_port(socket_type, port) + + def free_port(self, socket_type, port): + self._log.debug('free_port') + self._lock.acquire() + try: + if socket_type == 'domain': + self._dom_ports.remove(port) + path = domain_socket_path(port) + if os.path.exists(path): + os.remove(path) + elif socket_type == 'abstract': + self._dom_ports.remove(port) + else: + self._ports.remove(port) + except IOError: + self._log.info('Error while freeing port', exc_info=sys.exc_info()) + finally: + self._lock.release() class NonAsyncResult(object): - def __init__(self, value): - self._value = value + def __init__(self, value): + self._value = value - def get(self, timeout=None): - return self._value + def get(self, timeout=None): + return self._value - def wait(self, timeout=None): - pass + def wait(self, timeout=None): + pass - def ready(self): - return True + def ready(self): + return True - def successful(self): - return self._value == 0 + def successful(self): + return self._value == 0 class TestDispatcher(object): - def __init__(self, testdir, basedir, logdir_rel, concurrency): - self._log = multiprocessing.get_logger() - self.testdir = testdir - self._report = SummaryReporter(basedir, logdir_rel, concurrency > 1) - self.logdir = self._report.testdir - # seems needed for python 2.x to handle keyboard interrupt - self._stop = multiprocessing.Event() - self._async = concurrency > 1 - if not self._async: - self._pool = None - global stop - global ports - stop = self._stop - ports = PortAllocator() - else: - self._m = multiprocessing.managers.BaseManager() - self._m.register('ports', PortAllocator) - self._m.start() - self._pool = multiprocessing.Pool(concurrency, self._pool_init, (self._m.address,)) - self._log.debug( - 'TestDispatcher started with %d concurrent jobs' % concurrency) - - def _pool_init(self, address): - global stop - global m - global ports - stop = self._stop - m = multiprocessing.managers.BaseManager(address) - m.connect() - ports = m.ports() - - def _dispatch_sync(self, test, cont, max_retry): - r = run_test(self.testdir, self.logdir, test, max_retry, False) - cont(r) - return NonAsyncResult(r) - - def _dispatch_async(self, test, cont, max_retry): - self._log.debug('_dispatch_async') - return self._pool.apply_async(func=run_test, args=(self.testdir, self.logdir, test, max_retry), callback=cont) - - def dispatch(self, test, max_retry): - index = self._report.add_test(test) - - def cont(result): - if not self._stop.is_set(): - retry_count, returncode = result - self._log.debug('freeing port') - self._log.debug('adding result') - self._report.add_result(index, returncode, returncode == RESULT_TIMEOUT, retry_count) - self._log.debug('finish continuation') - fn = self._dispatch_async if self._async else self._dispatch_sync - return fn(test, cont, max_retry) - - def wait(self): - if self._async: - self._pool.close() - self._pool.join() - self._m.shutdown() - return self._report.end() - - def terminate(self): - self._stop.set() - if self._async: - self._pool.terminate() - self._pool.join() - self._m.shutdown() + def __init__(self, testdir, basedir, logdir_rel, concurrency): + self._log = multiprocessing.get_logger() + self.testdir = testdir + self._report = SummaryReporter(basedir, logdir_rel, concurrency > 1) + self.logdir = self._report.testdir + # seems needed for python 2.x to handle keyboard interrupt + self._stop = multiprocessing.Event() + self._async = concurrency > 1 + if not self._async: + self._pool = None + global stop + global ports + stop = self._stop + ports = PortAllocator() + else: + self._m = multiprocessing.managers.BaseManager() + self._m.register('ports', PortAllocator) + self._m.start() + self._pool = multiprocessing.Pool(concurrency, self._pool_init, (self._m.address,)) + self._log.debug( + 'TestDispatcher started with %d concurrent jobs' % concurrency) + + def _pool_init(self, address): + global stop + global m + global ports + stop = self._stop + m = multiprocessing.managers.BaseManager(address) + m.connect() + ports = m.ports() + + def _dispatch_sync(self, test, cont, max_retry): + r = run_test(self.testdir, self.logdir, test, max_retry, False) + cont(r) + return NonAsyncResult(r) + + def _dispatch_async(self, test, cont, max_retry): + self._log.debug('_dispatch_async') + return self._pool.apply_async(func=run_test, args=(self.testdir, self.logdir, test, max_retry), callback=cont) + + def dispatch(self, test, max_retry): + index = self._report.add_test(test) + + def cont(result): + if not self._stop.is_set(): + retry_count, returncode = result + self._log.debug('freeing port') + self._log.debug('adding result') + self._report.add_result(index, returncode, returncode == RESULT_TIMEOUT, retry_count) + self._log.debug('finish continuation') + fn = self._dispatch_async if self._async else self._dispatch_sync + return fn(test, cont, max_retry) + + def wait(self): + if self._async: + self._pool.close() + self._pool.join() + self._m.shutdown() + return self._report.end() + + def terminate(self): + self._stop.set() + if self._async: + self._pool.terminate() + self._pool.join() + self._m.shutdown() diff --git a/test/crossrunner/test.py b/test/crossrunner/test.py index fc90f7f30..dcc8a9416 100644 --- a/test/crossrunner/test.py +++ b/test/crossrunner/test.py @@ -26,118 +26,118 @@ from .util import merge_dict def domain_socket_path(port): - return '/tmp/ThriftTest.thrift.%d' % port + return '/tmp/ThriftTest.thrift.%d' % port class TestProgram(object): - def __init__(self, kind, name, protocol, transport, socket, workdir, command, env=None, - extra_args=[], extra_args2=[], join_args=False, **kwargs): - self.kind = kind - self.name = name - self.protocol = protocol - self.transport = transport - self.socket = socket - self.workdir = workdir - self.command = None - self._base_command = self._fix_cmd_path(command) - if env: - self.env = copy.copy(os.environ) - self.env.update(env) - else: - self.env = os.environ - self._extra_args = extra_args - self._extra_args2 = extra_args2 - self._join_args = join_args - - def _fix_cmd_path(self, cmd): - # if the arg is a file in the current directory, make it path - def abs_if_exists(arg): - p = path_join(self.workdir, arg) - return p if os.path.exists(p) else arg - - if cmd[0] == 'python': - cmd[0] = sys.executable - else: - cmd[0] = abs_if_exists(cmd[0]) - return cmd - - def _socket_args(self, socket, port): - return { - 'ip-ssl': ['--ssl'], - 'domain': ['--domain-socket=%s' % domain_socket_path(port)], - 'abstract': ['--abstract-namespace', '--domain-socket=%s' % domain_socket_path(port)], - }.get(socket, None) - - def build_command(self, port): - cmd = copy.copy(self._base_command) - args = copy.copy(self._extra_args2) - args.append('--protocol=' + self.protocol) - args.append('--transport=' + self.transport) - socket_args = self._socket_args(self.socket, port) - if socket_args: - args += socket_args - args.append('--port=%d' % port) - if self._join_args: - cmd.append('%s' % " ".join(args)) - else: - cmd.extend(args) - if self._extra_args: - cmd.extend(self._extra_args) - self.command = cmd - return self.command + def __init__(self, kind, name, protocol, transport, socket, workdir, command, env=None, + extra_args=[], extra_args2=[], join_args=False, **kwargs): + self.kind = kind + self.name = name + self.protocol = protocol + self.transport = transport + self.socket = socket + self.workdir = workdir + self.command = None + self._base_command = self._fix_cmd_path(command) + if env: + self.env = copy.copy(os.environ) + self.env.update(env) + else: + self.env = os.environ + self._extra_args = extra_args + self._extra_args2 = extra_args2 + self._join_args = join_args + + def _fix_cmd_path(self, cmd): + # if the arg is a file in the current directory, make it path + def abs_if_exists(arg): + p = path_join(self.workdir, arg) + return p if os.path.exists(p) else arg + + if cmd[0] == 'python': + cmd[0] = sys.executable + else: + cmd[0] = abs_if_exists(cmd[0]) + return cmd + + def _socket_args(self, socket, port): + return { + 'ip-ssl': ['--ssl'], + 'domain': ['--domain-socket=%s' % domain_socket_path(port)], + 'abstract': ['--abstract-namespace', '--domain-socket=%s' % domain_socket_path(port)], + }.get(socket, None) + + def build_command(self, port): + cmd = copy.copy(self._base_command) + args = copy.copy(self._extra_args2) + args.append('--protocol=' + self.protocol) + args.append('--transport=' + self.transport) + socket_args = self._socket_args(self.socket, port) + if socket_args: + args += socket_args + args.append('--port=%d' % port) + if self._join_args: + cmd.append('%s' % " ".join(args)) + else: + cmd.extend(args) + if self._extra_args: + cmd.extend(self._extra_args) + self.command = cmd + return self.command class TestEntry(object): - def __init__(self, testdir, server, client, delay, timeout, **kwargs): - self.testdir = testdir - self._log = multiprocessing.get_logger() - self._config = kwargs - self.protocol = kwargs['protocol'] - self.transport = kwargs['transport'] - self.socket = kwargs['socket'] - srv_dict = self._fix_workdir(merge_dict(self._config, server)) - cli_dict = self._fix_workdir(merge_dict(self._config, client)) - cli_dict['extra_args2'] = srv_dict.pop('remote_args', []) - srv_dict['extra_args2'] = cli_dict.pop('remote_args', []) - self.server = TestProgram('server', **srv_dict) - self.client = TestProgram('client', **cli_dict) - self.delay = delay - self.timeout = timeout - self._name = None - # results - self.success = None - self.as_expected = None - self.returncode = None - self.expired = False - self.retry_count = 0 - - def _fix_workdir(self, config): - key = 'workdir' - path = config.get(key, None) - if not path: - path = self.testdir - if os.path.isabs(path): - path = os.path.realpath(path) - else: - path = os.path.realpath(path_join(self.testdir, path)) - config.update({key: path}) - return config - - @classmethod - def get_name(cls, server, client, proto, trans, sock, *args): - return '%s-%s_%s_%s-%s' % (server, client, proto, trans, sock) - - @property - def name(self): - if not self._name: - self._name = self.get_name( - self.server.name, self.client.name, self.protocol, self.transport, self.socket) - return self._name - - @property - def transport_name(self): - return '%s-%s' % (self.transport, self.socket) + def __init__(self, testdir, server, client, delay, timeout, **kwargs): + self.testdir = testdir + self._log = multiprocessing.get_logger() + self._config = kwargs + self.protocol = kwargs['protocol'] + self.transport = kwargs['transport'] + self.socket = kwargs['socket'] + srv_dict = self._fix_workdir(merge_dict(self._config, server)) + cli_dict = self._fix_workdir(merge_dict(self._config, client)) + cli_dict['extra_args2'] = srv_dict.pop('remote_args', []) + srv_dict['extra_args2'] = cli_dict.pop('remote_args', []) + self.server = TestProgram('server', **srv_dict) + self.client = TestProgram('client', **cli_dict) + self.delay = delay + self.timeout = timeout + self._name = None + # results + self.success = None + self.as_expected = None + self.returncode = None + self.expired = False + self.retry_count = 0 + + def _fix_workdir(self, config): + key = 'workdir' + path = config.get(key, None) + if not path: + path = self.testdir + if os.path.isabs(path): + path = os.path.realpath(path) + else: + path = os.path.realpath(path_join(self.testdir, path)) + config.update({key: path}) + return config + + @classmethod + def get_name(cls, server, client, proto, trans, sock, *args): + return '%s-%s_%s_%s-%s' % (server, client, proto, trans, sock) + + @property + def name(self): + if not self._name: + self._name = self.get_name( + self.server.name, self.client.name, self.protocol, self.transport, self.socket) + return self._name + + @property + def transport_name(self): + return '%s-%s' % (self.transport, self.socket) def test_name(server, client, protocol, transport, socket, **kwargs): - return TestEntry.get_name(server['name'], client['name'], protocol, transport, socket) + return TestEntry.get_name(server['name'], client['name'], protocol, transport, socket) diff --git a/test/crossrunner/util.py b/test/crossrunner/util.py index 750ed475e..e2d195a22 100644 --- a/test/crossrunner/util.py +++ b/test/crossrunner/util.py @@ -21,11 +21,11 @@ import copy def merge_dict(base, update): - """Update dict concatenating list values""" - res = copy.deepcopy(base) - for k, v in list(update.items()): - if k in list(res.keys()) and isinstance(v, list): - res[k].extend(v) - else: - res[k] = v - return res + """Update dict concatenating list values""" + res = copy.deepcopy(base) + for k, v in list(update.items()): + if k in list(res.keys()) and isinstance(v, list): + res[k].extend(v) + else: + res[k] = v + return res |