summaryrefslogtreecommitdiff
path: root/test/crossrunner
diff options
context:
space:
mode:
authorNobuaki Sukegawa <nsuke@apache.org>2016-02-18 01:41:46 +0900
committerNobuaki Sukegawa <nsuke@apache.org>2016-02-20 00:18:43 +0900
commit59310f5dd065681db9dc2ab13fda289d8fa41922 (patch)
treeca3f0a75e4e73e5b1a58ca104df4f221ba0e6240 /test/crossrunner
parentb16a0a94fc9498102b5d12632d4501d368ee69ff (diff)
downloadthrift-59310f5dd065681db9dc2ab13fda289d8fa41922.tar.gz
THRIFT-3642 Speed up cross test runner
This closes #873
Diffstat (limited to 'test/crossrunner')
-rw-r--r--test/crossrunner/collect.py4
-rw-r--r--test/crossrunner/run.py66
2 files changed, 52 insertions, 18 deletions
diff --git a/test/crossrunner/collect.py b/test/crossrunner/collect.py
index d7594cb62..03b0c36c9 100644
--- a/test/crossrunner/collect.py
+++ b/test/crossrunner/collect.py
@@ -50,7 +50,7 @@ VALID_JSON_KEYS = [
'env', # additional environmental variable
]
-DEFAULT_DELAY = 1
+DEFAULT_MAX_DELAY = 5
DEFAULT_TIMEOUT = 5
@@ -137,7 +137,7 @@ def _do_collect_tests(servers, clients):
yield {
'server': merge_metadata(sv, **{'protocol': proto1, 'transport': trans1}),
'client': merge_metadata(cl, **{'protocol': proto2, 'transport': trans2}),
- 'delay': maybe_max('delay', sv, cl, DEFAULT_DELAY),
+ 'delay': maybe_max('delay', sv, cl, DEFAULT_MAX_DELAY),
'timeout': maybe_max('timeout', sv, cl, DEFAULT_TIMEOUT),
'protocol': proto,
'transport': trans,
diff --git a/test/crossrunner/run.py b/test/crossrunner/run.py
index 18c162357..4b4eb0aac 100644
--- a/test/crossrunner/run.py
+++ b/test/crossrunner/run.py
@@ -48,6 +48,7 @@ class ExecutionContext(object):
self.timer = None
self.expired = False
self.killed = False
+ self.proc = None
def _expire(self):
self._log.info('Timeout')
@@ -123,8 +124,31 @@ def exec_context(port, logdir, test, prog):
def run_test(testdir, logdir, test_dict, max_retry, async=True):
+ logger = multiprocessing.get_logger()
+
+ def ensure_socket_open(proc, port, max_delay):
+ sleeped = 0.1
+ time.sleep(sleeped)
+ sock4 = socket.socket()
+ sock6 = socket.socket(family=socket.AF_INET6)
+ sleep_step = 0.2
+ try:
+ while sock4.connect_ex(('127.0.0.1', port)) and sock6.connect_ex(('::1', port)):
+ if proc.poll() is not None:
+ logger.warn('server process is exited')
+ return False
+ if sleeped > max_delay:
+ logger.warn('sleeped for %f seconds but server port is not open' % sleeped)
+ return False
+ time.sleep(sleep_step)
+ sleeped += sleep_step
+ logger.debug('waited %f sec for server port open' % sleeped)
+ return True
+ finally:
+ sock4.close()
+ sock6.close()
+
try:
- logger = multiprocessing.get_logger()
max_bind_retry = 3
retry_count = 0
bind_retry_count = 0
@@ -141,13 +165,18 @@ def run_test(testdir, logdir, test_dict, max_retry, async=True):
logger.debug('Starting server')
with sv.start():
- if test.delay > 0:
- logger.debug('Delaying client for %.2f seconds' % test.delay)
- time.sleep(test.delay)
+ if test.socket in ('domain', 'abstract'):
+ time.sleep(0.1)
+ else:
+ if not ensure_socket_open(sv.proc, port, test.delay):
+ break
connect_retry_count = 0
- max_connect_retry = 10
+ max_connect_retry = 3
connect_retry_wait = 0.5
while True:
+ if sv.proc.poll() is not None:
+ logger.info('not starting client because server process is absent')
+ break
logger.debug('Starting client')
cl.start(test.timeout)
logger.debug('Waiting client')
@@ -168,27 +197,27 @@ def run_test(testdir, logdir, test_dict, max_retry, async=True):
else:
if cl.expired:
result = RESULT_TIMEOUT
- elif not sv.killed and cl.proc.returncode == 0:
- # Server should be alive at the end.
- result = RESULT_ERROR
else:
- result = cl.proc.returncode
+ result = cl.proc.returncode if cl.proc else RESULT_ERROR
+ if not sv.killed:
+ # Server died without being killed.
+ result |= RESULT_ERROR
if result == 0 or retry_count >= max_retry:
return (retry_count, result)
else:
logger.info('[%s-%s]: test failed, retrying...', test.server.name, test.client.name)
retry_count += 1
- except (KeyboardInterrupt, SystemExit):
- logger.info('Interrupted execution')
+ except Exception:
if not async:
raise
- stop.set()
- return None
+ logger.warn('Error executing [%s]', test.name, exc_info=True)
+ return (retry_count, RESULT_ERROR)
except:
+ logger.info('Interrupted execution', exc_info=True)
if not async:
raise
- logger.warn('Error executing [%s]', test.name, exc_info=sys.exc_info())
+ stop.set()
return (retry_count, RESULT_ERROR)
@@ -202,7 +231,8 @@ class PortAllocator(object):
def _get_tcp_port(self):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.bind(('127.0.0.1', 0))
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ sock.bind(('', 0))
port = sock.getsockname()[1]
self._lock.acquire()
try:
@@ -322,7 +352,11 @@ class TestDispatcher(object):
def cont(result):
if not self._stop.is_set():
- retry_count, returncode = result
+ if result and len(result) == 2:
+ retry_count, returncode = result
+ else:
+ retry_count = 0
+ returncode = RESULT_ERROR
self._log.debug('freeing port')
self._log.debug('adding result')
self._report.add_result(index, returncode, returncode == RESULT_TIMEOUT, retry_count)