summaryrefslogtreecommitdiff
path: root/test/crossrunner
diff options
context:
space:
mode:
authorNobuaki Sukegawa <nsuke@apache.org>2016-02-03 01:57:03 +0900
committerNobuaki Sukegawa <nsuke@apache.org>2016-02-04 14:28:24 +0900
commit10308cb975ac090584068d0470b81e41555b2f35 (patch)
treebc0bb670626a8a196dc00df6429ae4dcc838b4c4 /test/crossrunner
parentd094e79de7e0bd61320f006c83c0de669363bce8 (diff)
downloadthrift-10308cb975ac090584068d0470b81e41555b2f35.tar.gz
THRIFT-3596 Better conformance to PEP8
This closes #832
Diffstat (limited to 'test/crossrunner')
-rw-r--r--test/crossrunner/collect.py188
-rw-r--r--test/crossrunner/compat.py26
-rw-r--r--test/crossrunner/report.py740
-rw-r--r--test/crossrunner/run.py570
-rw-r--r--test/crossrunner/test.py212
-rw-r--r--test/crossrunner/util.py16
6 files changed, 876 insertions, 876 deletions
diff --git a/test/crossrunner/collect.py b/test/crossrunner/collect.py
index f92b9e2d7..e91ac0b43 100644
--- a/test/crossrunner/collect.py
+++ b/test/crossrunner/collect.py
@@ -40,13 +40,13 @@ from .util import merge_dict
# (e.g. "binary" is equivalent to "binary:binary" in tests.json)
#
VALID_JSON_KEYS = [
- 'name', # name of the library, typically a language name
- 'workdir', # work directory where command is executed
- 'command', # test command
- 'extra_args', # args appended to command after other args are appended
- 'remote_args', # args added to the other side of the program
- 'join_args', # whether args should be passed as single concatenated string
- 'env', # additional environmental variable
+ 'name', # name of the library, typically a language name
+ 'workdir', # work directory where command is executed
+ 'command', # test command
+ 'extra_args', # args appended to command after other args are appended
+ 'remote_args', # args added to the other side of the program
+ 'join_args', # whether args should be passed as single concatenated string
+ 'env', # additional environmental variable
]
DEFAULT_DELAY = 1
@@ -54,102 +54,102 @@ DEFAULT_TIMEOUT = 5
def _collect_testlibs(config, server_match, client_match=[None]):
- """Collects server/client configurations from library configurations"""
- def expand_libs(config):
- for lib in config:
- sv = lib.pop('server', None)
- cl = lib.pop('client', None)
- yield lib, sv, cl
-
- def yield_testlibs(base_configs, configs, match):
- for base, conf in zip(base_configs, configs):
- if conf:
- if not match or base['name'] in match:
- platforms = conf.get('platforms') or base.get('platforms')
- if not platforms or platform.system() in platforms:
- yield merge_dict(base, conf)
-
- libs, svs, cls = zip(*expand_libs(config))
- servers = list(yield_testlibs(libs, svs, server_match))
- clients = list(yield_testlibs(libs, cls, client_match))
- return servers, clients
+ """Collects server/client configurations from library configurations"""
+ def expand_libs(config):
+ for lib in config:
+ sv = lib.pop('server', None)
+ cl = lib.pop('client', None)
+ yield lib, sv, cl
+
+ def yield_testlibs(base_configs, configs, match):
+ for base, conf in zip(base_configs, configs):
+ if conf:
+ if not match or base['name'] in match:
+ platforms = conf.get('platforms') or base.get('platforms')
+ if not platforms or platform.system() in platforms:
+ yield merge_dict(base, conf)
+
+ libs, svs, cls = zip(*expand_libs(config))
+ servers = list(yield_testlibs(libs, svs, server_match))
+ clients = list(yield_testlibs(libs, cls, client_match))
+ return servers, clients
def collect_features(config, match):
- res = list(map(re.compile, match))
- return list(filter(lambda c: any(map(lambda r: r.search(c['name']), res)), config))
+ res = list(map(re.compile, match))
+ return list(filter(lambda c: any(map(lambda r: r.search(c['name']), res)), config))
def _do_collect_tests(servers, clients):
- def intersection(key, o1, o2):
- """intersection of two collections.
- collections are replaced with sets the first time"""
- def cached_set(o, key):
- v = o[key]
- if not isinstance(v, set):
- v = set(v)
- o[key] = v
- return v
- return cached_set(o1, key) & cached_set(o2, key)
-
- def intersect_with_spec(key, o1, o2):
- # store as set of (spec, impl) tuple
- def cached_set(o):
- def to_spec_impl_tuples(values):
- for v in values:
- spec, _, impl = v.partition(':')
- yield spec, impl or spec
- v = o[key]
- if not isinstance(v, set):
- v = set(to_spec_impl_tuples(set(v)))
- o[key] = v
- return v
- for spec1, impl1 in cached_set(o1):
- for spec2, impl2 in cached_set(o2):
- if spec1 == spec2:
- name = impl1 if impl1 == impl2 else '%s-%s' % (impl1, impl2)
- yield name, impl1, impl2
-
- def maybe_max(key, o1, o2, default):
- """maximum of two if present, otherwise defult value"""
- v1 = o1.get(key)
- v2 = o2.get(key)
- return max(v1, v2) if v1 and v2 else v1 or v2 or default
-
- def filter_with_validkeys(o):
- ret = {}
- for key in VALID_JSON_KEYS:
- if key in o:
- ret[key] = o[key]
- return ret
-
- def merge_metadata(o, **ret):
- for key in VALID_JSON_KEYS:
- if key in o:
- ret[key] = o[key]
- return ret
-
- for sv, cl in product(servers, clients):
- for proto, proto1, proto2 in intersect_with_spec('protocols', sv, cl):
- for trans, trans1, trans2 in intersect_with_spec('transports', sv, cl):
- for sock in intersection('sockets', sv, cl):
- yield {
- 'server': merge_metadata(sv, **{'protocol': proto1, 'transport': trans1}),
- 'client': merge_metadata(cl, **{'protocol': proto2, 'transport': trans2}),
- 'delay': maybe_max('delay', sv, cl, DEFAULT_DELAY),
- 'timeout': maybe_max('timeout', sv, cl, DEFAULT_TIMEOUT),
- 'protocol': proto,
- 'transport': trans,
- 'socket': sock
- }
+ def intersection(key, o1, o2):
+ """intersection of two collections.
+ collections are replaced with sets the first time"""
+ def cached_set(o, key):
+ v = o[key]
+ if not isinstance(v, set):
+ v = set(v)
+ o[key] = v
+ return v
+ return cached_set(o1, key) & cached_set(o2, key)
+
+ def intersect_with_spec(key, o1, o2):
+ # store as set of (spec, impl) tuple
+ def cached_set(o):
+ def to_spec_impl_tuples(values):
+ for v in values:
+ spec, _, impl = v.partition(':')
+ yield spec, impl or spec
+ v = o[key]
+ if not isinstance(v, set):
+ v = set(to_spec_impl_tuples(set(v)))
+ o[key] = v
+ return v
+ for spec1, impl1 in cached_set(o1):
+ for spec2, impl2 in cached_set(o2):
+ if spec1 == spec2:
+ name = impl1 if impl1 == impl2 else '%s-%s' % (impl1, impl2)
+ yield name, impl1, impl2
+
+ def maybe_max(key, o1, o2, default):
+ """maximum of two if present, otherwise defult value"""
+ v1 = o1.get(key)
+ v2 = o2.get(key)
+ return max(v1, v2) if v1 and v2 else v1 or v2 or default
+
+ def filter_with_validkeys(o):
+ ret = {}
+ for key in VALID_JSON_KEYS:
+ if key in o:
+ ret[key] = o[key]
+ return ret
+
+ def merge_metadata(o, **ret):
+ for key in VALID_JSON_KEYS:
+ if key in o:
+ ret[key] = o[key]
+ return ret
+
+ for sv, cl in product(servers, clients):
+ for proto, proto1, proto2 in intersect_with_spec('protocols', sv, cl):
+ for trans, trans1, trans2 in intersect_with_spec('transports', sv, cl):
+ for sock in intersection('sockets', sv, cl):
+ yield {
+ 'server': merge_metadata(sv, **{'protocol': proto1, 'transport': trans1}),
+ 'client': merge_metadata(cl, **{'protocol': proto2, 'transport': trans2}),
+ 'delay': maybe_max('delay', sv, cl, DEFAULT_DELAY),
+ 'timeout': maybe_max('timeout', sv, cl, DEFAULT_TIMEOUT),
+ 'protocol': proto,
+ 'transport': trans,
+ 'socket': sock
+ }
def collect_cross_tests(tests_dict, server_match, client_match):
- sv, cl = _collect_testlibs(tests_dict, server_match, client_match)
- return list(_do_collect_tests(sv, cl))
+ sv, cl = _collect_testlibs(tests_dict, server_match, client_match)
+ return list(_do_collect_tests(sv, cl))
def collect_feature_tests(tests_dict, features_dict, server_match, feature_match):
- sv, _ = _collect_testlibs(tests_dict, server_match)
- ft = collect_features(features_dict, feature_match)
- return list(_do_collect_tests(sv, ft))
+ sv, _ = _collect_testlibs(tests_dict, server_match)
+ ft = collect_features(features_dict, feature_match)
+ return list(_do_collect_tests(sv, ft))
diff --git a/test/crossrunner/compat.py b/test/crossrunner/compat.py
index 6ab9d713b..f1ca91bb3 100644
--- a/test/crossrunner/compat.py
+++ b/test/crossrunner/compat.py
@@ -2,23 +2,23 @@ import os
import sys
if sys.version_info[0] == 2:
- _ENCODE = sys.getfilesystemencoding()
+ _ENCODE = sys.getfilesystemencoding()
- def path_join(*args):
- bin_args = map(lambda a: a.decode(_ENCODE), args)
- return os.path.join(*bin_args).encode(_ENCODE)
+ def path_join(*args):
+ bin_args = map(lambda a: a.decode(_ENCODE), args)
+ return os.path.join(*bin_args).encode(_ENCODE)
- def str_join(s, l):
- bin_args = map(lambda a: a.decode(_ENCODE), l)
- b = s.decode(_ENCODE)
- return b.join(bin_args).encode(_ENCODE)
+ def str_join(s, l):
+ bin_args = map(lambda a: a.decode(_ENCODE), l)
+ b = s.decode(_ENCODE)
+ return b.join(bin_args).encode(_ENCODE)
- logfile_open = open
+ logfile_open = open
else:
- path_join = os.path.join
- str_join = str.join
+ path_join = os.path.join
+ str_join = str.join
- def logfile_open(*args):
- return open(*args, errors='replace')
+ def logfile_open(*args):
+ return open(*args, errors='replace')
diff --git a/test/crossrunner/report.py b/test/crossrunner/report.py
index be7271cb1..cc5f26fe2 100644
--- a/test/crossrunner/report.py
+++ b/test/crossrunner/report.py
@@ -39,396 +39,396 @@ FAIL_JSON = 'known_failures_%s.json'
def generate_known_failures(testdir, overwrite, save, out):
- def collect_failures(results):
- success_index = 5
- for r in results:
- if not r[success_index]:
- yield TestEntry.get_name(*r)
- try:
- with logfile_open(path_join(testdir, RESULT_JSON), 'r') as fp:
- results = json.load(fp)
- except IOError:
- sys.stderr.write('Unable to load last result. Did you run tests ?\n')
- return False
- fails = collect_failures(results['results'])
- if not overwrite:
- known = load_known_failures(testdir)
- known.extend(fails)
- fails = known
- fails_json = json.dumps(sorted(set(fails)), indent=2, separators=(',', ': '))
- if save:
- with logfile_open(os.path.join(testdir, FAIL_JSON % platform.system()), 'w+') as fp:
- fp.write(fails_json)
- sys.stdout.write('Successfully updated known failures.\n')
- if out:
- sys.stdout.write(fails_json)
- sys.stdout.write('\n')
- return True
+ def collect_failures(results):
+ success_index = 5
+ for r in results:
+ if not r[success_index]:
+ yield TestEntry.get_name(*r)
+ try:
+ with logfile_open(path_join(testdir, RESULT_JSON), 'r') as fp:
+ results = json.load(fp)
+ except IOError:
+ sys.stderr.write('Unable to load last result. Did you run tests ?\n')
+ return False
+ fails = collect_failures(results['results'])
+ if not overwrite:
+ known = load_known_failures(testdir)
+ known.extend(fails)
+ fails = known
+ fails_json = json.dumps(sorted(set(fails)), indent=2, separators=(',', ': '))
+ if save:
+ with logfile_open(os.path.join(testdir, FAIL_JSON % platform.system()), 'w+') as fp:
+ fp.write(fails_json)
+ sys.stdout.write('Successfully updated known failures.\n')
+ if out:
+ sys.stdout.write(fails_json)
+ sys.stdout.write('\n')
+ return True
def load_known_failures(testdir):
- try:
- with logfile_open(path_join(testdir, FAIL_JSON % platform.system()), 'r') as fp:
- return json.load(fp)
- except IOError:
- return []
+ try:
+ with logfile_open(path_join(testdir, FAIL_JSON % platform.system()), 'r') as fp:
+ return json.load(fp)
+ except IOError:
+ return []
class TestReporter(object):
- # Unfortunately, standard library doesn't handle timezone well
- # DATETIME_FORMAT = '%a %b %d %H:%M:%S %Z %Y'
- DATETIME_FORMAT = '%a %b %d %H:%M:%S %Y'
+ # Unfortunately, standard library doesn't handle timezone well
+ # DATETIME_FORMAT = '%a %b %d %H:%M:%S %Z %Y'
+ DATETIME_FORMAT = '%a %b %d %H:%M:%S %Y'
- def __init__(self):
- self._log = multiprocessing.get_logger()
- self._lock = multiprocessing.Lock()
+ def __init__(self):
+ self._log = multiprocessing.get_logger()
+ self._lock = multiprocessing.Lock()
- @classmethod
- def test_logfile(cls, test_name, prog_kind, dir=None):
- relpath = path_join('log', '%s_%s.log' % (test_name, prog_kind))
- return relpath if not dir else os.path.realpath(path_join(dir, relpath))
+ @classmethod
+ def test_logfile(cls, test_name, prog_kind, dir=None):
+ relpath = path_join('log', '%s_%s.log' % (test_name, prog_kind))
+ return relpath if not dir else os.path.realpath(path_join(dir, relpath))
- def _start(self):
- self._start_time = time.time()
+ def _start(self):
+ self._start_time = time.time()
- @property
- def _elapsed(self):
- return time.time() - self._start_time
+ @property
+ def _elapsed(self):
+ return time.time() - self._start_time
- @classmethod
- def _format_date(cls):
- return '%s' % datetime.datetime.now().strftime(cls.DATETIME_FORMAT)
+ @classmethod
+ def _format_date(cls):
+ return '%s' % datetime.datetime.now().strftime(cls.DATETIME_FORMAT)
- def _print_date(self):
- print(self._format_date(), file=self.out)
+ def _print_date(self):
+ print(self._format_date(), file=self.out)
- def _print_bar(self, out=None):
- print(
- '==========================================================================',
- file=(out or self.out))
+ def _print_bar(self, out=None):
+ print(
+ '==========================================================================',
+ file=(out or self.out))
- def _print_exec_time(self):
- print('Test execution took {:.1f} seconds.'.format(self._elapsed), file=self.out)
+ def _print_exec_time(self):
+ print('Test execution took {:.1f} seconds.'.format(self._elapsed), file=self.out)
class ExecReporter(TestReporter):
- def __init__(self, testdir, test, prog):
- super(ExecReporter, self).__init__()
- self._test = test
- self._prog = prog
- self.logpath = self.test_logfile(test.name, prog.kind, testdir)
- self.out = None
-
- def begin(self):
- self._start()
- self._open()
- if self.out and not self.out.closed:
- self._print_header()
- else:
- self._log.debug('Output stream is not available.')
-
- def end(self, returncode):
- self._lock.acquire()
- try:
- if self.out and not self.out.closed:
- self._print_footer(returncode)
- self._close()
+ def __init__(self, testdir, test, prog):
+ super(ExecReporter, self).__init__()
+ self._test = test
+ self._prog = prog
+ self.logpath = self.test_logfile(test.name, prog.kind, testdir)
self.out = None
- else:
- self._log.debug('Output stream is not available.')
- finally:
- self._lock.release()
-
- def killed(self):
- print(file=self.out)
- print('Server process is successfully killed.', file=self.out)
- self.end(None)
-
- def died(self):
- print(file=self.out)
- print('*** Server process has died unexpectedly ***', file=self.out)
- self.end(None)
-
- _init_failure_exprs = {
- 'server': list(map(re.compile, [
- '[Aa]ddress already in use',
- 'Could not bind',
- 'EADDRINUSE',
- ])),
- 'client': list(map(re.compile, [
- '[Cc]onnection refused',
- 'Could not connect to localhost',
- 'ECONNREFUSED',
- 'No such file or directory', # domain socket
- ])),
- }
-
- def maybe_false_positive(self):
- """Searches through log file for socket bind error.
- Returns True if suspicious expression is found, otherwise False"""
- try:
- if self.out and not self.out.closed:
+
+ def begin(self):
+ self._start()
+ self._open()
+ if self.out and not self.out.closed:
+ self._print_header()
+ else:
+ self._log.debug('Output stream is not available.')
+
+ def end(self, returncode):
+ self._lock.acquire()
+ try:
+ if self.out and not self.out.closed:
+ self._print_footer(returncode)
+ self._close()
+ self.out = None
+ else:
+ self._log.debug('Output stream is not available.')
+ finally:
+ self._lock.release()
+
+ def killed(self):
+ print(file=self.out)
+ print('Server process is successfully killed.', file=self.out)
+ self.end(None)
+
+ def died(self):
+ print(file=self.out)
+ print('*** Server process has died unexpectedly ***', file=self.out)
+ self.end(None)
+
+ _init_failure_exprs = {
+ 'server': list(map(re.compile, [
+ '[Aa]ddress already in use',
+ 'Could not bind',
+ 'EADDRINUSE',
+ ])),
+ 'client': list(map(re.compile, [
+ '[Cc]onnection refused',
+ 'Could not connect to localhost',
+ 'ECONNREFUSED',
+ 'No such file or directory', # domain socket
+ ])),
+ }
+
+ def maybe_false_positive(self):
+ """Searches through log file for socket bind error.
+ Returns True if suspicious expression is found, otherwise False"""
+ try:
+ if self.out and not self.out.closed:
+ self.out.flush()
+ exprs = self._init_failure_exprs[self._prog.kind]
+
+ def match(line):
+ for expr in exprs:
+ if expr.search(line):
+ return True
+
+ with logfile_open(self.logpath, 'r') as fp:
+ if any(map(match, fp)):
+ return True
+ except (KeyboardInterrupt, SystemExit):
+ raise
+ except Exception as ex:
+ self._log.warn('[%s]: Error while detecting false positive: %s' % (self._test.name, str(ex)))
+ self._log.info(traceback.print_exc())
+ return False
+
+ def _open(self):
+ self.out = logfile_open(self.logpath, 'w+')
+
+ def _close(self):
+ self.out.close()
+
+ def _print_header(self):
+ self._print_date()
+ print('Executing: %s' % str_join(' ', self._prog.command), file=self.out)
+ print('Directory: %s' % self._prog.workdir, file=self.out)
+ print('config:delay: %s' % self._test.delay, file=self.out)
+ print('config:timeout: %s' % self._test.timeout, file=self.out)
+ self._print_bar()
self.out.flush()
- exprs = self._init_failure_exprs[self._prog.kind]
-
- def match(line):
- for expr in exprs:
- if expr.search(line):
- return True
-
- with logfile_open(self.logpath, 'r') as fp:
- if any(map(match, fp)):
- return True
- except (KeyboardInterrupt, SystemExit):
- raise
- except Exception as ex:
- self._log.warn('[%s]: Error while detecting false positive: %s' % (self._test.name, str(ex)))
- self._log.info(traceback.print_exc())
- return False
-
- def _open(self):
- self.out = logfile_open(self.logpath, 'w+')
-
- def _close(self):
- self.out.close()
-
- def _print_header(self):
- self._print_date()
- print('Executing: %s' % str_join(' ', self._prog.command), file=self.out)
- print('Directory: %s' % self._prog.workdir, file=self.out)
- print('config:delay: %s' % self._test.delay, file=self.out)
- print('config:timeout: %s' % self._test.timeout, file=self.out)
- self._print_bar()
- self.out.flush()
-
- def _print_footer(self, returncode=None):
- self._print_bar()
- if returncode is not None:
- print('Return code: %d' % returncode, file=self.out)
- else:
- print('Process is killed.', file=self.out)
- self._print_exec_time()
- self._print_date()
+ def _print_footer(self, returncode=None):
+ self._print_bar()
+ if returncode is not None:
+ print('Return code: %d' % returncode, file=self.out)
+ else:
+ print('Process is killed.', file=self.out)
+ self._print_exec_time()
+ self._print_date()
-class SummaryReporter(TestReporter):
- def __init__(self, basedir, testdir_relative, concurrent=True):
- super(SummaryReporter, self).__init__()
- self._basedir = basedir
- self._testdir_rel = testdir_relative
- self.logdir = path_join(self.testdir, LOG_DIR)
- self.out_path = path_join(self.testdir, RESULT_JSON)
- self.concurrent = concurrent
- self.out = sys.stdout
- self._platform = platform.system()
- self._revision = self._get_revision()
- self._tests = []
- if not os.path.exists(self.logdir):
- os.mkdir(self.logdir)
- self._known_failures = load_known_failures(self.testdir)
- self._unexpected_success = []
- self._flaky_success = []
- self._unexpected_failure = []
- self._expected_failure = []
- self._print_header()
-
- @property
- def testdir(self):
- return path_join(self._basedir, self._testdir_rel)
-
- def _result_string(self, test):
- if test.success:
- if test.retry_count == 0:
- return 'success'
- elif test.retry_count == 1:
- return 'flaky(1 retry)'
- else:
- return 'flaky(%d retries)' % test.retry_count
- elif test.expired:
- return 'failure(timeout)'
- else:
- return 'failure(%d)' % test.returncode
-
- def _get_revision(self):
- p = subprocess.Popen(['git', 'rev-parse', '--short', 'HEAD'],
- cwd=self.testdir, stdout=subprocess.PIPE)
- out, _ = p.communicate()
- return out.strip()
-
- def _format_test(self, test, with_result=True):
- name = '%s-%s' % (test.server.name, test.client.name)
- trans = '%s-%s' % (test.transport, test.socket)
- if not with_result:
- return '{:24s}{:13s}{:25s}'.format(name[:23], test.protocol[:12], trans[:24])
- else:
- return '{:24s}{:13s}{:25s}{:s}\n'.format(name[:23], test.protocol[:12], trans[:24], self._result_string(test))
-
- def _print_test_header(self):
- self._print_bar()
- print(
- '{:24s}{:13s}{:25s}{:s}'.format('server-client:', 'protocol:', 'transport:', 'result:'),
- file=self.out)
-
- def _print_header(self):
- self._start()
- print('Apache Thrift - Integration Test Suite', file=self.out)
- self._print_date()
- self._print_test_header()
-
- def _print_unexpected_failure(self):
- if len(self._unexpected_failure) > 0:
- self.out.writelines([
- '*** Following %d failures were unexpected ***:\n' % len(self._unexpected_failure),
- 'If it is introduced by you, please fix it before submitting the code.\n',
- # 'If not, please report at https://issues.apache.org/jira/browse/THRIFT\n',
- ])
- self._print_test_header()
- for i in self._unexpected_failure:
- self.out.write(self._format_test(self._tests[i]))
- self._print_bar()
- else:
- print('No unexpected failures.', file=self.out)
-
- def _print_flaky_success(self):
- if len(self._flaky_success) > 0:
- print(
- 'Following %d tests were expected to cleanly succeed but needed retry:' % len(self._flaky_success),
- file=self.out)
- self._print_test_header()
- for i in self._flaky_success:
- self.out.write(self._format_test(self._tests[i]))
- self._print_bar()
-
- def _print_unexpected_success(self):
- if len(self._unexpected_success) > 0:
- print(
- 'Following %d tests were known to fail but succeeded (maybe flaky):' % len(self._unexpected_success),
- file=self.out)
- self._print_test_header()
- for i in self._unexpected_success:
- self.out.write(self._format_test(self._tests[i]))
- self._print_bar()
-
- def _http_server_command(self, port):
- if sys.version_info[0] < 3:
- return 'python -m SimpleHTTPServer %d' % port
- else:
- return 'python -m http.server %d' % port
-
- def _print_footer(self):
- fail_count = len(self._expected_failure) + len(self._unexpected_failure)
- self._print_bar()
- self._print_unexpected_success()
- self._print_flaky_success()
- self._print_unexpected_failure()
- self._write_html_data()
- self._assemble_log('unexpected failures', self._unexpected_failure)
- self._assemble_log('known failures', self._expected_failure)
- self.out.writelines([
- 'You can browse results at:\n',
- '\tfile://%s/%s\n' % (self.testdir, RESULT_HTML),
- '# If you use Chrome, run:\n',
- '# \tcd %s\n#\t%s\n' % (self._basedir, self._http_server_command(8001)),
- '# then browse:\n',
- '# \thttp://localhost:%d/%s/\n' % (8001, self._testdir_rel),
- 'Full log for each test is here:\n',
- '\ttest/log/client_server_protocol_transport_client.log\n',
- '\ttest/log/client_server_protocol_transport_server.log\n',
- '%d failed of %d tests in total.\n' % (fail_count, len(self._tests)),
- ])
- self._print_exec_time()
- self._print_date()
-
- def _render_result(self, test):
- return [
- test.server.name,
- test.client.name,
- test.protocol,
- test.transport,
- test.socket,
- test.success,
- test.as_expected,
- test.returncode,
- {
- 'server': self.test_logfile(test.name, test.server.kind),
- 'client': self.test_logfile(test.name, test.client.kind),
- },
- ]
-
- def _write_html_data(self):
- """Writes JSON data to be read by result html"""
- results = [self._render_result(r) for r in self._tests]
- with logfile_open(self.out_path, 'w+') as fp:
- fp.write(json.dumps({
- 'date': self._format_date(),
- 'revision': str(self._revision),
- 'platform': self._platform,
- 'duration': '{:.1f}'.format(self._elapsed),
- 'results': results,
- }, indent=2))
-
- def _assemble_log(self, title, indexes):
- if len(indexes) > 0:
- def add_prog_log(fp, test, prog_kind):
- print('*************************** %s message ***************************' % prog_kind,
- file=fp)
- path = self.test_logfile(test.name, prog_kind, self.testdir)
- if os.path.exists(path):
- with logfile_open(path, 'r') as prog_fp:
- print(prog_fp.read(), file=fp)
- filename = title.replace(' ', '_') + '.log'
- with logfile_open(os.path.join(self.logdir, filename), 'w+') as fp:
- for test in map(self._tests.__getitem__, indexes):
- fp.write('TEST: [%s]\n' % test.name)
- add_prog_log(fp, test, test.server.kind)
- add_prog_log(fp, test, test.client.kind)
- fp.write('**********************************************************************\n\n')
- print('%s are logged to %s/%s/%s' % (title.capitalize(), self._testdir_rel, LOG_DIR, filename))
-
- def end(self):
- self._print_footer()
- return len(self._unexpected_failure) == 0
-
- def add_test(self, test_dict):
- test = TestEntry(self.testdir, **test_dict)
- self._lock.acquire()
- try:
- if not self.concurrent:
- self.out.write(self._format_test(test, False))
- self.out.flush()
- self._tests.append(test)
- return len(self._tests) - 1
- finally:
- self._lock.release()
- def add_result(self, index, returncode, expired, retry_count):
- self._lock.acquire()
- try:
- failed = returncode is None or returncode != 0
- flaky = not failed and retry_count != 0
- test = self._tests[index]
- known = test.name in self._known_failures
- if failed:
- if known:
- self._log.debug('%s failed as expected' % test.name)
- self._expected_failure.append(index)
+class SummaryReporter(TestReporter):
+ def __init__(self, basedir, testdir_relative, concurrent=True):
+ super(SummaryReporter, self).__init__()
+ self._basedir = basedir
+ self._testdir_rel = testdir_relative
+ self.logdir = path_join(self.testdir, LOG_DIR)
+ self.out_path = path_join(self.testdir, RESULT_JSON)
+ self.concurrent = concurrent
+ self.out = sys.stdout
+ self._platform = platform.system()
+ self._revision = self._get_revision()
+ self._tests = []
+ if not os.path.exists(self.logdir):
+ os.mkdir(self.logdir)
+ self._known_failures = load_known_failures(self.testdir)
+ self._unexpected_success = []
+ self._flaky_success = []
+ self._unexpected_failure = []
+ self._expected_failure = []
+ self._print_header()
+
+ @property
+ def testdir(self):
+ return path_join(self._basedir, self._testdir_rel)
+
+ def _result_string(self, test):
+ if test.success:
+ if test.retry_count == 0:
+ return 'success'
+ elif test.retry_count == 1:
+ return 'flaky(1 retry)'
+ else:
+ return 'flaky(%d retries)' % test.retry_count
+ elif test.expired:
+ return 'failure(timeout)'
+ else:
+ return 'failure(%d)' % test.returncode
+
+ def _get_revision(self):
+ p = subprocess.Popen(['git', 'rev-parse', '--short', 'HEAD'],
+ cwd=self.testdir, stdout=subprocess.PIPE)
+ out, _ = p.communicate()
+ return out.strip()
+
+ def _format_test(self, test, with_result=True):
+ name = '%s-%s' % (test.server.name, test.client.name)
+ trans = '%s-%s' % (test.transport, test.socket)
+ if not with_result:
+ return '{:24s}{:13s}{:25s}'.format(name[:23], test.protocol[:12], trans[:24])
+ else:
+ return '{:24s}{:13s}{:25s}{:s}\n'.format(name[:23], test.protocol[:12], trans[:24], self._result_string(test))
+
+ def _print_test_header(self):
+ self._print_bar()
+ print(
+ '{:24s}{:13s}{:25s}{:s}'.format('server-client:', 'protocol:', 'transport:', 'result:'),
+ file=self.out)
+
+ def _print_header(self):
+ self._start()
+ print('Apache Thrift - Integration Test Suite', file=self.out)
+ self._print_date()
+ self._print_test_header()
+
+ def _print_unexpected_failure(self):
+ if len(self._unexpected_failure) > 0:
+ self.out.writelines([
+ '*** Following %d failures were unexpected ***:\n' % len(self._unexpected_failure),
+ 'If it is introduced by you, please fix it before submitting the code.\n',
+ # 'If not, please report at https://issues.apache.org/jira/browse/THRIFT\n',
+ ])
+ self._print_test_header()
+ for i in self._unexpected_failure:
+ self.out.write(self._format_test(self._tests[i]))
+ self._print_bar()
+ else:
+ print('No unexpected failures.', file=self.out)
+
+ def _print_flaky_success(self):
+ if len(self._flaky_success) > 0:
+ print(
+ 'Following %d tests were expected to cleanly succeed but needed retry:' % len(self._flaky_success),
+ file=self.out)
+ self._print_test_header()
+ for i in self._flaky_success:
+ self.out.write(self._format_test(self._tests[i]))
+ self._print_bar()
+
+ def _print_unexpected_success(self):
+ if len(self._unexpected_success) > 0:
+ print(
+ 'Following %d tests were known to fail but succeeded (maybe flaky):' % len(self._unexpected_success),
+ file=self.out)
+ self._print_test_header()
+ for i in self._unexpected_success:
+ self.out.write(self._format_test(self._tests[i]))
+ self._print_bar()
+
+ def _http_server_command(self, port):
+ if sys.version_info[0] < 3:
+ return 'python -m SimpleHTTPServer %d' % port
else:
- self._log.info('unexpected failure: %s' % test.name)
- self._unexpected_failure.append(index)
- elif flaky and not known:
- self._log.info('unexpected flaky success: %s' % test.name)
- self._flaky_success.append(index)
- elif not flaky and known:
- self._log.info('unexpected success: %s' % test.name)
- self._unexpected_success.append(index)
- test.success = not failed
- test.returncode = returncode
- test.retry_count = retry_count
- test.expired = expired
- test.as_expected = known == failed
- if not self.concurrent:
- self.out.write(self._result_string(test) + '\n')
- else:
- self.out.write(self._format_test(test))
- finally:
- self._lock.release()
+ return 'python -m http.server %d' % port
+
+ def _print_footer(self):
+ fail_count = len(self._expected_failure) + len(self._unexpected_failure)
+ self._print_bar()
+ self._print_unexpected_success()
+ self._print_flaky_success()
+ self._print_unexpected_failure()
+ self._write_html_data()
+ self._assemble_log('unexpected failures', self._unexpected_failure)
+ self._assemble_log('known failures', self._expected_failure)
+ self.out.writelines([
+ 'You can browse results at:\n',
+ '\tfile://%s/%s\n' % (self.testdir, RESULT_HTML),
+ '# If you use Chrome, run:\n',
+ '# \tcd %s\n#\t%s\n' % (self._basedir, self._http_server_command(8001)),
+ '# then browse:\n',
+ '# \thttp://localhost:%d/%s/\n' % (8001, self._testdir_rel),
+ 'Full log for each test is here:\n',
+ '\ttest/log/client_server_protocol_transport_client.log\n',
+ '\ttest/log/client_server_protocol_transport_server.log\n',
+ '%d failed of %d tests in total.\n' % (fail_count, len(self._tests)),
+ ])
+ self._print_exec_time()
+ self._print_date()
+
+ def _render_result(self, test):
+ return [
+ test.server.name,
+ test.client.name,
+ test.protocol,
+ test.transport,
+ test.socket,
+ test.success,
+ test.as_expected,
+ test.returncode,
+ {
+ 'server': self.test_logfile(test.name, test.server.kind),
+ 'client': self.test_logfile(test.name, test.client.kind),
+ },
+ ]
+
+ def _write_html_data(self):
+ """Writes JSON data to be read by result html"""
+ results = [self._render_result(r) for r in self._tests]
+ with logfile_open(self.out_path, 'w+') as fp:
+ fp.write(json.dumps({
+ 'date': self._format_date(),
+ 'revision': str(self._revision),
+ 'platform': self._platform,
+ 'duration': '{:.1f}'.format(self._elapsed),
+ 'results': results,
+ }, indent=2))
+
+ def _assemble_log(self, title, indexes):
+ if len(indexes) > 0:
+ def add_prog_log(fp, test, prog_kind):
+ print('*************************** %s message ***************************' % prog_kind,
+ file=fp)
+ path = self.test_logfile(test.name, prog_kind, self.testdir)
+ if os.path.exists(path):
+ with logfile_open(path, 'r') as prog_fp:
+ print(prog_fp.read(), file=fp)
+ filename = title.replace(' ', '_') + '.log'
+ with logfile_open(os.path.join(self.logdir, filename), 'w+') as fp:
+ for test in map(self._tests.__getitem__, indexes):
+ fp.write('TEST: [%s]\n' % test.name)
+ add_prog_log(fp, test, test.server.kind)
+ add_prog_log(fp, test, test.client.kind)
+ fp.write('**********************************************************************\n\n')
+ print('%s are logged to %s/%s/%s' % (title.capitalize(), self._testdir_rel, LOG_DIR, filename))
+
+ def end(self):
+ self._print_footer()
+ return len(self._unexpected_failure) == 0
+
+ def add_test(self, test_dict):
+ test = TestEntry(self.testdir, **test_dict)
+ self._lock.acquire()
+ try:
+ if not self.concurrent:
+ self.out.write(self._format_test(test, False))
+ self.out.flush()
+ self._tests.append(test)
+ return len(self._tests) - 1
+ finally:
+ self._lock.release()
+
+ def add_result(self, index, returncode, expired, retry_count):
+ self._lock.acquire()
+ try:
+ failed = returncode is None or returncode != 0
+ flaky = not failed and retry_count != 0
+ test = self._tests[index]
+ known = test.name in self._known_failures
+ if failed:
+ if known:
+ self._log.debug('%s failed as expected' % test.name)
+ self._expected_failure.append(index)
+ else:
+ self._log.info('unexpected failure: %s' % test.name)
+ self._unexpected_failure.append(index)
+ elif flaky and not known:
+ self._log.info('unexpected flaky success: %s' % test.name)
+ self._flaky_success.append(index)
+ elif not flaky and known:
+ self._log.info('unexpected success: %s' % test.name)
+ self._unexpected_success.append(index)
+ test.success = not failed
+ test.returncode = returncode
+ test.retry_count = retry_count
+ test.expired = expired
+ test.as_expected = known == failed
+ if not self.concurrent:
+ self.out.write(self._result_string(test) + '\n')
+ else:
+ self.out.write(self._format_test(test))
+ finally:
+ self._lock.release()
diff --git a/test/crossrunner/run.py b/test/crossrunner/run.py
index 68bd92869..18c162357 100644
--- a/test/crossrunner/run.py
+++ b/test/crossrunner/run.py
@@ -39,307 +39,307 @@ RESULT_ERROR = 64
class ExecutionContext(object):
- def __init__(self, cmd, cwd, env, report):
- self._log = multiprocessing.get_logger()
- self.report = report
- self.cmd = cmd
- self.cwd = cwd
- self.env = env
- self.timer = None
- self.expired = False
- self.killed = False
-
- def _expire(self):
- self._log.info('Timeout')
- self.expired = True
- self.kill()
-
- def kill(self):
- self._log.debug('Killing process : %d' % self.proc.pid)
- self.killed = True
- if platform.system() != 'Windows':
- try:
- os.killpg(self.proc.pid, signal.SIGKILL)
- except Exception:
- self._log.info('Failed to kill process group', exc_info=sys.exc_info())
- try:
- self.proc.kill()
- except Exception:
- self._log.info('Failed to kill process', exc_info=sys.exc_info())
-
- def _popen_args(self):
- args = {
- 'cwd': self.cwd,
- 'env': self.env,
- 'stdout': self.report.out,
- 'stderr': subprocess.STDOUT,
- }
- # make sure child processes doesn't remain after killing
- if platform.system() == 'Windows':
- DETACHED_PROCESS = 0x00000008
- args.update(creationflags=DETACHED_PROCESS | subprocess.CREATE_NEW_PROCESS_GROUP)
- else:
- args.update(preexec_fn=os.setsid)
- return args
-
- def start(self, timeout=0):
- joined = str_join(' ', self.cmd)
- self._log.debug('COMMAND: %s', joined)
- self._log.debug('WORKDIR: %s', self.cwd)
- self._log.debug('LOGFILE: %s', self.report.logpath)
- self.report.begin()
- self.proc = subprocess.Popen(self.cmd, **self._popen_args())
- if timeout > 0:
- self.timer = threading.Timer(timeout, self._expire)
- self.timer.start()
- return self._scoped()
-
- @contextlib.contextmanager
- def _scoped(self):
- yield self
- self._log.debug('Killing scoped process')
- if self.proc.poll() is None:
- self.kill()
- self.report.killed()
- else:
- self._log.debug('Process died unexpectedly')
- self.report.died()
-
- def wait(self):
- self.proc.communicate()
- if self.timer:
- self.timer.cancel()
- self.report.end(self.returncode)
-
- @property
- def returncode(self):
- return self.proc.returncode if self.proc else None
+ def __init__(self, cmd, cwd, env, report):
+ self._log = multiprocessing.get_logger()
+ self.report = report
+ self.cmd = cmd
+ self.cwd = cwd
+ self.env = env
+ self.timer = None
+ self.expired = False
+ self.killed = False
+
+ def _expire(self):
+ self._log.info('Timeout')
+ self.expired = True
+ self.kill()
+
+ def kill(self):
+ self._log.debug('Killing process : %d' % self.proc.pid)
+ self.killed = True
+ if platform.system() != 'Windows':
+ try:
+ os.killpg(self.proc.pid, signal.SIGKILL)
+ except Exception:
+ self._log.info('Failed to kill process group', exc_info=sys.exc_info())
+ try:
+ self.proc.kill()
+ except Exception:
+ self._log.info('Failed to kill process', exc_info=sys.exc_info())
+
+ def _popen_args(self):
+ args = {
+ 'cwd': self.cwd,
+ 'env': self.env,
+ 'stdout': self.report.out,
+ 'stderr': subprocess.STDOUT,
+ }
+ # make sure child processes doesn't remain after killing
+ if platform.system() == 'Windows':
+ DETACHED_PROCESS = 0x00000008
+ args.update(creationflags=DETACHED_PROCESS | subprocess.CREATE_NEW_PROCESS_GROUP)
+ else:
+ args.update(preexec_fn=os.setsid)
+ return args
+
+ def start(self, timeout=0):
+ joined = str_join(' ', self.cmd)
+ self._log.debug('COMMAND: %s', joined)
+ self._log.debug('WORKDIR: %s', self.cwd)
+ self._log.debug('LOGFILE: %s', self.report.logpath)
+ self.report.begin()
+ self.proc = subprocess.Popen(self.cmd, **self._popen_args())
+ if timeout > 0:
+ self.timer = threading.Timer(timeout, self._expire)
+ self.timer.start()
+ return self._scoped()
+
+ @contextlib.contextmanager
+ def _scoped(self):
+ yield self
+ self._log.debug('Killing scoped process')
+ if self.proc.poll() is None:
+ self.kill()
+ self.report.killed()
+ else:
+ self._log.debug('Process died unexpectedly')
+ self.report.died()
+
+ def wait(self):
+ self.proc.communicate()
+ if self.timer:
+ self.timer.cancel()
+ self.report.end(self.returncode)
+
+ @property
+ def returncode(self):
+ return self.proc.returncode if self.proc else None
def exec_context(port, logdir, test, prog):
- report = ExecReporter(logdir, test, prog)
- prog.build_command(port)
- return ExecutionContext(prog.command, prog.workdir, prog.env, report)
+ report = ExecReporter(logdir, test, prog)
+ prog.build_command(port)
+ return ExecutionContext(prog.command, prog.workdir, prog.env, report)
def run_test(testdir, logdir, test_dict, max_retry, async=True):
- try:
- logger = multiprocessing.get_logger()
- max_bind_retry = 3
- retry_count = 0
- bind_retry_count = 0
- test = TestEntry(testdir, **test_dict)
- while True:
- if stop.is_set():
- logger.debug('Skipping because shutting down')
- return (retry_count, None)
- logger.debug('Start')
- with PortAllocator.alloc_port_scoped(ports, test.socket) as port:
- logger.debug('Start with port %d' % port)
- sv = exec_context(port, logdir, test, test.server)
- cl = exec_context(port, logdir, test, test.client)
-
- logger.debug('Starting server')
- with sv.start():
- if test.delay > 0:
- logger.debug('Delaying client for %.2f seconds' % test.delay)
- time.sleep(test.delay)
- connect_retry_count = 0
- max_connect_retry = 10
- connect_retry_wait = 0.5
- while True:
- logger.debug('Starting client')
- cl.start(test.timeout)
- logger.debug('Waiting client')
- cl.wait()
- if not cl.report.maybe_false_positive() or connect_retry_count >= max_connect_retry:
- if connect_retry_count > 0 and connect_retry_count < max_connect_retry:
- logger.warn('[%s]: Connected after %d retry (%.2f sec each)' % (test.server.name, connect_retry_count, connect_retry_wait))
- # Wait for 50ms to see if server does not die at the end.
- time.sleep(0.05)
- break
- logger.debug('Server may not be ready, waiting %.2f second...' % connect_retry_wait)
- time.sleep(connect_retry_wait)
- connect_retry_count += 1
-
- if sv.report.maybe_false_positive() and bind_retry_count < max_bind_retry:
- logger.warn('[%s]: Detected socket bind failure, retrying...', test.server.name)
- bind_retry_count += 1
- else:
- if cl.expired:
- result = RESULT_TIMEOUT
- elif not sv.killed and cl.proc.returncode == 0:
- # Server should be alive at the end.
- result = RESULT_ERROR
- else:
- result = cl.proc.returncode
-
- if result == 0 or retry_count >= max_retry:
- return (retry_count, result)
- else:
- logger.info('[%s-%s]: test failed, retrying...', test.server.name, test.client.name)
- retry_count += 1
- except (KeyboardInterrupt, SystemExit):
- logger.info('Interrupted execution')
- if not async:
- raise
- stop.set()
- return None
- except:
- if not async:
- raise
- logger.warn('Error executing [%s]', test.name, exc_info=sys.exc_info())
- return (retry_count, RESULT_ERROR)
+ try:
+ logger = multiprocessing.get_logger()
+ max_bind_retry = 3
+ retry_count = 0
+ bind_retry_count = 0
+ test = TestEntry(testdir, **test_dict)
+ while True:
+ if stop.is_set():
+ logger.debug('Skipping because shutting down')
+ return (retry_count, None)
+ logger.debug('Start')
+ with PortAllocator.alloc_port_scoped(ports, test.socket) as port:
+ logger.debug('Start with port %d' % port)
+ sv = exec_context(port, logdir, test, test.server)
+ cl = exec_context(port, logdir, test, test.client)
+
+ logger.debug('Starting server')
+ with sv.start():
+ if test.delay > 0:
+ logger.debug('Delaying client for %.2f seconds' % test.delay)
+ time.sleep(test.delay)
+ connect_retry_count = 0
+ max_connect_retry = 10
+ connect_retry_wait = 0.5
+ while True:
+ logger.debug('Starting client')
+ cl.start(test.timeout)
+ logger.debug('Waiting client')
+ cl.wait()
+ if not cl.report.maybe_false_positive() or connect_retry_count >= max_connect_retry:
+ if connect_retry_count > 0 and connect_retry_count < max_connect_retry:
+ logger.warn('[%s]: Connected after %d retry (%.2f sec each)' % (test.server.name, connect_retry_count, connect_retry_wait))
+ # Wait for 50ms to see if server does not die at the end.
+ time.sleep(0.05)
+ break
+ logger.debug('Server may not be ready, waiting %.2f second...' % connect_retry_wait)
+ time.sleep(connect_retry_wait)
+ connect_retry_count += 1
+
+ if sv.report.maybe_false_positive() and bind_retry_count < max_bind_retry:
+ logger.warn('[%s]: Detected socket bind failure, retrying...', test.server.name)
+ bind_retry_count += 1
+ else:
+ if cl.expired:
+ result = RESULT_TIMEOUT
+ elif not sv.killed and cl.proc.returncode == 0:
+ # Server should be alive at the end.
+ result = RESULT_ERROR
+ else:
+ result = cl.proc.returncode
+
+ if result == 0 or retry_count >= max_retry:
+ return (retry_count, result)
+ else:
+ logger.info('[%s-%s]: test failed, retrying...', test.server.name, test.client.name)
+ retry_count += 1
+ except (KeyboardInterrupt, SystemExit):
+ logger.info('Interrupted execution')
+ if not async:
+ raise
+ stop.set()
+ return None
+ except:
+ if not async:
+ raise
+ logger.warn('Error executing [%s]', test.name, exc_info=sys.exc_info())
+ return (retry_count, RESULT_ERROR)
class PortAllocator(object):
- def __init__(self):
- self._log = multiprocessing.get_logger()
- self._lock = multiprocessing.Lock()
- self._ports = set()
- self._dom_ports = set()
- self._last_alloc = 0
-
- def _get_tcp_port(self):
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.bind(('127.0.0.1', 0))
- port = sock.getsockname()[1]
- self._lock.acquire()
- try:
- ok = port not in self._ports
- if ok:
- self._ports.add(port)
- self._last_alloc = time.time()
- finally:
- self._lock.release()
- sock.close()
- return port if ok else self._get_tcp_port()
-
- def _get_domain_port(self):
- port = random.randint(1024, 65536)
- self._lock.acquire()
- try:
- ok = port not in self._dom_ports
- if ok:
- self._dom_ports.add(port)
- finally:
- self._lock.release()
- return port if ok else self._get_domain_port()
-
- def alloc_port(self, socket_type):
- if socket_type in ('domain', 'abstract'):
- return self._get_domain_port()
- else:
- return self._get_tcp_port()
-
- # static method for inter-process invokation
- @staticmethod
- @contextlib.contextmanager
- def alloc_port_scoped(allocator, socket_type):
- port = allocator.alloc_port(socket_type)
- yield port
- allocator.free_port(socket_type, port)
-
- def free_port(self, socket_type, port):
- self._log.debug('free_port')
- self._lock.acquire()
- try:
- if socket_type == 'domain':
- self._dom_ports.remove(port)
- path = domain_socket_path(port)
- if os.path.exists(path):
- os.remove(path)
- elif socket_type == 'abstract':
- self._dom_ports.remove(port)
- else:
- self._ports.remove(port)
- except IOError:
- self._log.info('Error while freeing port', exc_info=sys.exc_info())
- finally:
- self._lock.release()
+ def __init__(self):
+ self._log = multiprocessing.get_logger()
+ self._lock = multiprocessing.Lock()
+ self._ports = set()
+ self._dom_ports = set()
+ self._last_alloc = 0
+
+ def _get_tcp_port(self):
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.bind(('127.0.0.1', 0))
+ port = sock.getsockname()[1]
+ self._lock.acquire()
+ try:
+ ok = port not in self._ports
+ if ok:
+ self._ports.add(port)
+ self._last_alloc = time.time()
+ finally:
+ self._lock.release()
+ sock.close()
+ return port if ok else self._get_tcp_port()
+
+ def _get_domain_port(self):
+ port = random.randint(1024, 65536)
+ self._lock.acquire()
+ try:
+ ok = port not in self._dom_ports
+ if ok:
+ self._dom_ports.add(port)
+ finally:
+ self._lock.release()
+ return port if ok else self._get_domain_port()
+
+ def alloc_port(self, socket_type):
+ if socket_type in ('domain', 'abstract'):
+ return self._get_domain_port()
+ else:
+ return self._get_tcp_port()
+
+ # static method for inter-process invokation
+ @staticmethod
+ @contextlib.contextmanager
+ def alloc_port_scoped(allocator, socket_type):
+ port = allocator.alloc_port(socket_type)
+ yield port
+ allocator.free_port(socket_type, port)
+
+ def free_port(self, socket_type, port):
+ self._log.debug('free_port')
+ self._lock.acquire()
+ try:
+ if socket_type == 'domain':
+ self._dom_ports.remove(port)
+ path = domain_socket_path(port)
+ if os.path.exists(path):
+ os.remove(path)
+ elif socket_type == 'abstract':
+ self._dom_ports.remove(port)
+ else:
+ self._ports.remove(port)
+ except IOError:
+ self._log.info('Error while freeing port', exc_info=sys.exc_info())
+ finally:
+ self._lock.release()
class NonAsyncResult(object):
- def __init__(self, value):
- self._value = value
+ def __init__(self, value):
+ self._value = value
- def get(self, timeout=None):
- return self._value
+ def get(self, timeout=None):
+ return self._value
- def wait(self, timeout=None):
- pass
+ def wait(self, timeout=None):
+ pass
- def ready(self):
- return True
+ def ready(self):
+ return True
- def successful(self):
- return self._value == 0
+ def successful(self):
+ return self._value == 0
class TestDispatcher(object):
- def __init__(self, testdir, basedir, logdir_rel, concurrency):
- self._log = multiprocessing.get_logger()
- self.testdir = testdir
- self._report = SummaryReporter(basedir, logdir_rel, concurrency > 1)
- self.logdir = self._report.testdir
- # seems needed for python 2.x to handle keyboard interrupt
- self._stop = multiprocessing.Event()
- self._async = concurrency > 1
- if not self._async:
- self._pool = None
- global stop
- global ports
- stop = self._stop
- ports = PortAllocator()
- else:
- self._m = multiprocessing.managers.BaseManager()
- self._m.register('ports', PortAllocator)
- self._m.start()
- self._pool = multiprocessing.Pool(concurrency, self._pool_init, (self._m.address,))
- self._log.debug(
- 'TestDispatcher started with %d concurrent jobs' % concurrency)
-
- def _pool_init(self, address):
- global stop
- global m
- global ports
- stop = self._stop
- m = multiprocessing.managers.BaseManager(address)
- m.connect()
- ports = m.ports()
-
- def _dispatch_sync(self, test, cont, max_retry):
- r = run_test(self.testdir, self.logdir, test, max_retry, False)
- cont(r)
- return NonAsyncResult(r)
-
- def _dispatch_async(self, test, cont, max_retry):
- self._log.debug('_dispatch_async')
- return self._pool.apply_async(func=run_test, args=(self.testdir, self.logdir, test, max_retry), callback=cont)
-
- def dispatch(self, test, max_retry):
- index = self._report.add_test(test)
-
- def cont(result):
- if not self._stop.is_set():
- retry_count, returncode = result
- self._log.debug('freeing port')
- self._log.debug('adding result')
- self._report.add_result(index, returncode, returncode == RESULT_TIMEOUT, retry_count)
- self._log.debug('finish continuation')
- fn = self._dispatch_async if self._async else self._dispatch_sync
- return fn(test, cont, max_retry)
-
- def wait(self):
- if self._async:
- self._pool.close()
- self._pool.join()
- self._m.shutdown()
- return self._report.end()
-
- def terminate(self):
- self._stop.set()
- if self._async:
- self._pool.terminate()
- self._pool.join()
- self._m.shutdown()
+ def __init__(self, testdir, basedir, logdir_rel, concurrency):
+ self._log = multiprocessing.get_logger()
+ self.testdir = testdir
+ self._report = SummaryReporter(basedir, logdir_rel, concurrency > 1)
+ self.logdir = self._report.testdir
+ # seems needed for python 2.x to handle keyboard interrupt
+ self._stop = multiprocessing.Event()
+ self._async = concurrency > 1
+ if not self._async:
+ self._pool = None
+ global stop
+ global ports
+ stop = self._stop
+ ports = PortAllocator()
+ else:
+ self._m = multiprocessing.managers.BaseManager()
+ self._m.register('ports', PortAllocator)
+ self._m.start()
+ self._pool = multiprocessing.Pool(concurrency, self._pool_init, (self._m.address,))
+ self._log.debug(
+ 'TestDispatcher started with %d concurrent jobs' % concurrency)
+
+ def _pool_init(self, address):
+ global stop
+ global m
+ global ports
+ stop = self._stop
+ m = multiprocessing.managers.BaseManager(address)
+ m.connect()
+ ports = m.ports()
+
+ def _dispatch_sync(self, test, cont, max_retry):
+ r = run_test(self.testdir, self.logdir, test, max_retry, False)
+ cont(r)
+ return NonAsyncResult(r)
+
+ def _dispatch_async(self, test, cont, max_retry):
+ self._log.debug('_dispatch_async')
+ return self._pool.apply_async(func=run_test, args=(self.testdir, self.logdir, test, max_retry), callback=cont)
+
+ def dispatch(self, test, max_retry):
+ index = self._report.add_test(test)
+
+ def cont(result):
+ if not self._stop.is_set():
+ retry_count, returncode = result
+ self._log.debug('freeing port')
+ self._log.debug('adding result')
+ self._report.add_result(index, returncode, returncode == RESULT_TIMEOUT, retry_count)
+ self._log.debug('finish continuation')
+ fn = self._dispatch_async if self._async else self._dispatch_sync
+ return fn(test, cont, max_retry)
+
+ def wait(self):
+ if self._async:
+ self._pool.close()
+ self._pool.join()
+ self._m.shutdown()
+ return self._report.end()
+
+ def terminate(self):
+ self._stop.set()
+ if self._async:
+ self._pool.terminate()
+ self._pool.join()
+ self._m.shutdown()
diff --git a/test/crossrunner/test.py b/test/crossrunner/test.py
index fc90f7f30..dcc8a9416 100644
--- a/test/crossrunner/test.py
+++ b/test/crossrunner/test.py
@@ -26,118 +26,118 @@ from .util import merge_dict
def domain_socket_path(port):
- return '/tmp/ThriftTest.thrift.%d' % port
+ return '/tmp/ThriftTest.thrift.%d' % port
class TestProgram(object):
- def __init__(self, kind, name, protocol, transport, socket, workdir, command, env=None,
- extra_args=[], extra_args2=[], join_args=False, **kwargs):
- self.kind = kind
- self.name = name
- self.protocol = protocol
- self.transport = transport
- self.socket = socket
- self.workdir = workdir
- self.command = None
- self._base_command = self._fix_cmd_path(command)
- if env:
- self.env = copy.copy(os.environ)
- self.env.update(env)
- else:
- self.env = os.environ
- self._extra_args = extra_args
- self._extra_args2 = extra_args2
- self._join_args = join_args
-
- def _fix_cmd_path(self, cmd):
- # if the arg is a file in the current directory, make it path
- def abs_if_exists(arg):
- p = path_join(self.workdir, arg)
- return p if os.path.exists(p) else arg
-
- if cmd[0] == 'python':
- cmd[0] = sys.executable
- else:
- cmd[0] = abs_if_exists(cmd[0])
- return cmd
-
- def _socket_args(self, socket, port):
- return {
- 'ip-ssl': ['--ssl'],
- 'domain': ['--domain-socket=%s' % domain_socket_path(port)],
- 'abstract': ['--abstract-namespace', '--domain-socket=%s' % domain_socket_path(port)],
- }.get(socket, None)
-
- def build_command(self, port):
- cmd = copy.copy(self._base_command)
- args = copy.copy(self._extra_args2)
- args.append('--protocol=' + self.protocol)
- args.append('--transport=' + self.transport)
- socket_args = self._socket_args(self.socket, port)
- if socket_args:
- args += socket_args
- args.append('--port=%d' % port)
- if self._join_args:
- cmd.append('%s' % " ".join(args))
- else:
- cmd.extend(args)
- if self._extra_args:
- cmd.extend(self._extra_args)
- self.command = cmd
- return self.command
+ def __init__(self, kind, name, protocol, transport, socket, workdir, command, env=None,
+ extra_args=[], extra_args2=[], join_args=False, **kwargs):
+ self.kind = kind
+ self.name = name
+ self.protocol = protocol
+ self.transport = transport
+ self.socket = socket
+ self.workdir = workdir
+ self.command = None
+ self._base_command = self._fix_cmd_path(command)
+ if env:
+ self.env = copy.copy(os.environ)
+ self.env.update(env)
+ else:
+ self.env = os.environ
+ self._extra_args = extra_args
+ self._extra_args2 = extra_args2
+ self._join_args = join_args
+
+ def _fix_cmd_path(self, cmd):
+ # if the arg is a file in the current directory, make it path
+ def abs_if_exists(arg):
+ p = path_join(self.workdir, arg)
+ return p if os.path.exists(p) else arg
+
+ if cmd[0] == 'python':
+ cmd[0] = sys.executable
+ else:
+ cmd[0] = abs_if_exists(cmd[0])
+ return cmd
+
+ def _socket_args(self, socket, port):
+ return {
+ 'ip-ssl': ['--ssl'],
+ 'domain': ['--domain-socket=%s' % domain_socket_path(port)],
+ 'abstract': ['--abstract-namespace', '--domain-socket=%s' % domain_socket_path(port)],
+ }.get(socket, None)
+
+ def build_command(self, port):
+ cmd = copy.copy(self._base_command)
+ args = copy.copy(self._extra_args2)
+ args.append('--protocol=' + self.protocol)
+ args.append('--transport=' + self.transport)
+ socket_args = self._socket_args(self.socket, port)
+ if socket_args:
+ args += socket_args
+ args.append('--port=%d' % port)
+ if self._join_args:
+ cmd.append('%s' % " ".join(args))
+ else:
+ cmd.extend(args)
+ if self._extra_args:
+ cmd.extend(self._extra_args)
+ self.command = cmd
+ return self.command
class TestEntry(object):
- def __init__(self, testdir, server, client, delay, timeout, **kwargs):
- self.testdir = testdir
- self._log = multiprocessing.get_logger()
- self._config = kwargs
- self.protocol = kwargs['protocol']
- self.transport = kwargs['transport']
- self.socket = kwargs['socket']
- srv_dict = self._fix_workdir(merge_dict(self._config, server))
- cli_dict = self._fix_workdir(merge_dict(self._config, client))
- cli_dict['extra_args2'] = srv_dict.pop('remote_args', [])
- srv_dict['extra_args2'] = cli_dict.pop('remote_args', [])
- self.server = TestProgram('server', **srv_dict)
- self.client = TestProgram('client', **cli_dict)
- self.delay = delay
- self.timeout = timeout
- self._name = None
- # results
- self.success = None
- self.as_expected = None
- self.returncode = None
- self.expired = False
- self.retry_count = 0
-
- def _fix_workdir(self, config):
- key = 'workdir'
- path = config.get(key, None)
- if not path:
- path = self.testdir
- if os.path.isabs(path):
- path = os.path.realpath(path)
- else:
- path = os.path.realpath(path_join(self.testdir, path))
- config.update({key: path})
- return config
-
- @classmethod
- def get_name(cls, server, client, proto, trans, sock, *args):
- return '%s-%s_%s_%s-%s' % (server, client, proto, trans, sock)
-
- @property
- def name(self):
- if not self._name:
- self._name = self.get_name(
- self.server.name, self.client.name, self.protocol, self.transport, self.socket)
- return self._name
-
- @property
- def transport_name(self):
- return '%s-%s' % (self.transport, self.socket)
+ def __init__(self, testdir, server, client, delay, timeout, **kwargs):
+ self.testdir = testdir
+ self._log = multiprocessing.get_logger()
+ self._config = kwargs
+ self.protocol = kwargs['protocol']
+ self.transport = kwargs['transport']
+ self.socket = kwargs['socket']
+ srv_dict = self._fix_workdir(merge_dict(self._config, server))
+ cli_dict = self._fix_workdir(merge_dict(self._config, client))
+ cli_dict['extra_args2'] = srv_dict.pop('remote_args', [])
+ srv_dict['extra_args2'] = cli_dict.pop('remote_args', [])
+ self.server = TestProgram('server', **srv_dict)
+ self.client = TestProgram('client', **cli_dict)
+ self.delay = delay
+ self.timeout = timeout
+ self._name = None
+ # results
+ self.success = None
+ self.as_expected = None
+ self.returncode = None
+ self.expired = False
+ self.retry_count = 0
+
+ def _fix_workdir(self, config):
+ key = 'workdir'
+ path = config.get(key, None)
+ if not path:
+ path = self.testdir
+ if os.path.isabs(path):
+ path = os.path.realpath(path)
+ else:
+ path = os.path.realpath(path_join(self.testdir, path))
+ config.update({key: path})
+ return config
+
+ @classmethod
+ def get_name(cls, server, client, proto, trans, sock, *args):
+ return '%s-%s_%s_%s-%s' % (server, client, proto, trans, sock)
+
+ @property
+ def name(self):
+ if not self._name:
+ self._name = self.get_name(
+ self.server.name, self.client.name, self.protocol, self.transport, self.socket)
+ return self._name
+
+ @property
+ def transport_name(self):
+ return '%s-%s' % (self.transport, self.socket)
def test_name(server, client, protocol, transport, socket, **kwargs):
- return TestEntry.get_name(server['name'], client['name'], protocol, transport, socket)
+ return TestEntry.get_name(server['name'], client['name'], protocol, transport, socket)
diff --git a/test/crossrunner/util.py b/test/crossrunner/util.py
index 750ed475e..e2d195a22 100644
--- a/test/crossrunner/util.py
+++ b/test/crossrunner/util.py
@@ -21,11 +21,11 @@ import copy
def merge_dict(base, update):
- """Update dict concatenating list values"""
- res = copy.deepcopy(base)
- for k, v in list(update.items()):
- if k in list(res.keys()) and isinstance(v, list):
- res[k].extend(v)
- else:
- res[k] = v
- return res
+ """Update dict concatenating list values"""
+ res = copy.deepcopy(base)
+ for k, v in list(update.items()):
+ if k in list(res.keys()) and isinstance(v, list):
+ res[k].extend(v)
+ else:
+ res[k] = v
+ return res