diff options
author | Nobuaki Sukegawa <nsuke@apache.org> | 2016-02-18 01:41:46 +0900 |
---|---|---|
committer | Nobuaki Sukegawa <nsuke@apache.org> | 2016-02-20 00:18:43 +0900 |
commit | 59310f5dd065681db9dc2ab13fda289d8fa41922 (patch) | |
tree | ca3f0a75e4e73e5b1a58ca104df4f221ba0e6240 /test/crossrunner | |
parent | b16a0a94fc9498102b5d12632d4501d368ee69ff (diff) | |
download | thrift-59310f5dd065681db9dc2ab13fda289d8fa41922.tar.gz |
THRIFT-3642 Speed up cross test runner
This closes #873
Diffstat (limited to 'test/crossrunner')
-rw-r--r-- | test/crossrunner/collect.py | 4 | ||||
-rw-r--r-- | test/crossrunner/run.py | 66 |
2 files changed, 52 insertions, 18 deletions
diff --git a/test/crossrunner/collect.py b/test/crossrunner/collect.py index d7594cb62..03b0c36c9 100644 --- a/test/crossrunner/collect.py +++ b/test/crossrunner/collect.py @@ -50,7 +50,7 @@ VALID_JSON_KEYS = [ 'env', # additional environmental variable ] -DEFAULT_DELAY = 1 +DEFAULT_MAX_DELAY = 5 DEFAULT_TIMEOUT = 5 @@ -137,7 +137,7 @@ def _do_collect_tests(servers, clients): yield { 'server': merge_metadata(sv, **{'protocol': proto1, 'transport': trans1}), 'client': merge_metadata(cl, **{'protocol': proto2, 'transport': trans2}), - 'delay': maybe_max('delay', sv, cl, DEFAULT_DELAY), + 'delay': maybe_max('delay', sv, cl, DEFAULT_MAX_DELAY), 'timeout': maybe_max('timeout', sv, cl, DEFAULT_TIMEOUT), 'protocol': proto, 'transport': trans, diff --git a/test/crossrunner/run.py b/test/crossrunner/run.py index 18c162357..4b4eb0aac 100644 --- a/test/crossrunner/run.py +++ b/test/crossrunner/run.py @@ -48,6 +48,7 @@ class ExecutionContext(object): self.timer = None self.expired = False self.killed = False + self.proc = None def _expire(self): self._log.info('Timeout') @@ -123,8 +124,31 @@ def exec_context(port, logdir, test, prog): def run_test(testdir, logdir, test_dict, max_retry, async=True): + logger = multiprocessing.get_logger() + + def ensure_socket_open(proc, port, max_delay): + sleeped = 0.1 + time.sleep(sleeped) + sock4 = socket.socket() + sock6 = socket.socket(family=socket.AF_INET6) + sleep_step = 0.2 + try: + while sock4.connect_ex(('127.0.0.1', port)) and sock6.connect_ex(('::1', port)): + if proc.poll() is not None: + logger.warn('server process is exited') + return False + if sleeped > max_delay: + logger.warn('sleeped for %f seconds but server port is not open' % sleeped) + return False + time.sleep(sleep_step) + sleeped += sleep_step + logger.debug('waited %f sec for server port open' % sleeped) + return True + finally: + sock4.close() + sock6.close() + try: - logger = multiprocessing.get_logger() max_bind_retry = 3 retry_count = 0 bind_retry_count = 0 @@ -141,13 +165,18 @@ def run_test(testdir, logdir, test_dict, max_retry, async=True): logger.debug('Starting server') with sv.start(): - if test.delay > 0: - logger.debug('Delaying client for %.2f seconds' % test.delay) - time.sleep(test.delay) + if test.socket in ('domain', 'abstract'): + time.sleep(0.1) + else: + if not ensure_socket_open(sv.proc, port, test.delay): + break connect_retry_count = 0 - max_connect_retry = 10 + max_connect_retry = 3 connect_retry_wait = 0.5 while True: + if sv.proc.poll() is not None: + logger.info('not starting client because server process is absent') + break logger.debug('Starting client') cl.start(test.timeout) logger.debug('Waiting client') @@ -168,27 +197,27 @@ def run_test(testdir, logdir, test_dict, max_retry, async=True): else: if cl.expired: result = RESULT_TIMEOUT - elif not sv.killed and cl.proc.returncode == 0: - # Server should be alive at the end. - result = RESULT_ERROR else: - result = cl.proc.returncode + result = cl.proc.returncode if cl.proc else RESULT_ERROR + if not sv.killed: + # Server died without being killed. + result |= RESULT_ERROR if result == 0 or retry_count >= max_retry: return (retry_count, result) else: logger.info('[%s-%s]: test failed, retrying...', test.server.name, test.client.name) retry_count += 1 - except (KeyboardInterrupt, SystemExit): - logger.info('Interrupted execution') + except Exception: if not async: raise - stop.set() - return None + logger.warn('Error executing [%s]', test.name, exc_info=True) + return (retry_count, RESULT_ERROR) except: + logger.info('Interrupted execution', exc_info=True) if not async: raise - logger.warn('Error executing [%s]', test.name, exc_info=sys.exc_info()) + stop.set() return (retry_count, RESULT_ERROR) @@ -202,7 +231,8 @@ class PortAllocator(object): def _get_tcp_port(self): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.bind(('127.0.0.1', 0)) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.bind(('', 0)) port = sock.getsockname()[1] self._lock.acquire() try: @@ -322,7 +352,11 @@ class TestDispatcher(object): def cont(result): if not self._stop.is_set(): - retry_count, returncode = result + if result and len(result) == 2: + retry_count, returncode = result + else: + retry_count = 0 + returncode = RESULT_ERROR self._log.debug('freeing port') self._log.debug('adding result') self._report.add_result(index, returncode, returncode == RESULT_TIMEOUT, retry_count) |