summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCheng Shao <terrorjack@type.dance>2023-02-06 13:24:50 +0000
committerMarge Bot <ben+marge-bot@smart-cactus.org>2023-02-08 18:42:16 -0500
commitca6673e3cab496bbeed2ced47b40bcf1e0d0b3cd (patch)
treead8ef55b3889d0df1fd452c7e7dad7e700f23786
parent633f2799e697ddaf63c4c91820c0b5a7c9b17db7 (diff)
downloadhaskell-ca6673e3cab496bbeed2ced47b40bcf1e0d0b3cd.tar.gz
testsuite: use concurrent.futures.ThreadPoolExecutor in the driver
The testsuite driver used to create one thread per test case, and explicitly use semaphore and locks for rate limiting and synchronization. This is a bad practice in any language, and occasionally may result in livelock conditions (e.g. #22889). This patch uses concurrent.futures.ThreadPoolExecutor for scheduling test case runs, which is simpler and more robust.
-rw-r--r--testsuite/driver/runtests.py36
-rw-r--r--testsuite/driver/testlib.py24
-rw-r--r--testsuite/driver/testutil.py20
3 files changed, 24 insertions, 56 deletions
diff --git a/testsuite/driver/runtests.py b/testsuite/driver/runtests.py
index d49ada96f4..5cd8b9697f 100644
--- a/testsuite/driver/runtests.py
+++ b/testsuite/driver/runtests.py
@@ -26,7 +26,9 @@ from pathlib import Path
# So we import it here first, so that the testsuite doesn't appear to fail.
import subprocess
-from testutil import getStdout, Watcher, str_warn, str_info, print_table, shorten_metric_name
+from concurrent.futures import ThreadPoolExecutor
+
+from testutil import getStdout, str_warn, str_info, print_table, shorten_metric_name
from testglobals import getConfig, ghc_env, getTestRun, TestConfig, \
TestOptions, brokens, PerfMetric
from my_typing import TestName
@@ -480,26 +482,28 @@ if config.list_broken:
print('WARNING:', len(t.framework_failures), 'framework failures!')
print('')
else:
- # completion watcher
- watcher = Watcher(len(parallelTests))
-
# Now run all the tests
try:
- for oneTest in parallelTests:
- if stopping():
- break
- oneTest(watcher)
+ with ThreadPoolExecutor(max_workers=config.threads) as executor:
+ for oneTest in parallelTests:
+ if stopping():
+ break
+ oneTest(executor)
- # wait for parallel tests to finish
- if not stopping():
- watcher.wait()
+ # wait for parallel tests to finish
+ if not stopping():
+ executor.shutdown(wait=True)
# Run the following tests purely sequential
- config.threads = 1
- for oneTest in aloneTests:
- if stopping():
- break
- oneTest(watcher)
+ with ThreadPoolExecutor(max_workers=1) as executor:
+ for oneTest in aloneTests:
+ if stopping():
+ break
+ oneTest(executor)
+
+ if not stopping():
+ executor.shutdown(wait=True)
+
except KeyboardInterrupt:
pass
diff --git a/testsuite/driver/testlib.py b/testsuite/driver/testlib.py
index 10c0314afc..1c5f5fcaf7 100644
--- a/testsuite/driver/testlib.py
+++ b/testsuite/driver/testlib.py
@@ -36,9 +36,7 @@ from my_typing import *
from threading import Timer
from collections import OrderedDict
-global pool_sema
import threading
-pool_sema = threading.BoundedSemaphore(value=config.threads)
global wantToStop
wantToStop = False
@@ -1014,13 +1012,8 @@ parallelTests = []
aloneTests = []
allTestNames = set([]) # type: Set[TestName]
-def runTest(watcher, opts, name: TestName, func, args):
- pool_sema.acquire()
- t = threading.Thread(target=test_common_thread,
- name=name,
- args=(watcher, name, opts, func, args))
- t.daemon = False
- t.start()
+def runTest(executor, opts, name: TestName, func, args):
+ return executor.submit(test_common_work, name, opts, func, args)
# name :: String
# setup :: [TestOpt] -> IO ()
@@ -1058,19 +1051,13 @@ def test(name: TestName,
if name in config.broken_tests:
myTestOpts.expect = 'fail'
- thisTest = lambda watcher: runTest(watcher, myTestOpts, name, func, args)
+ thisTest = lambda executor: runTest(executor, myTestOpts, name, func, args)
if myTestOpts.alone:
aloneTests.append(thisTest)
else:
parallelTests.append(thisTest)
allTestNames.add(name)
-def test_common_thread(watcher, name, opts, func, args):
- try:
- test_common_work(watcher, name, opts, func, args)
- finally:
- pool_sema.release()
-
def get_package_cache_timestamp() -> float:
if config.package_conf_cache_file is None:
return 0.0
@@ -1084,8 +1071,7 @@ do_not_copy = ('.hi', '.o', '.dyn_hi'
, '.dyn_o', '.out'
,'.hi-boot', '.o-boot') # 12112
-def test_common_work(watcher: testutil.Watcher,
- name: TestName, opts,
+def test_common_work(name: TestName, opts,
func, args) -> None:
try:
t.total_tests += 1
@@ -1204,8 +1190,6 @@ def test_common_work(watcher: testutil.Watcher,
except Exception as e:
framework_fail(name, None, 'Unhandled exception: ' + str(e))
- finally:
- watcher.notify()
def do_test(name: TestName,
way: WayName,
diff --git a/testsuite/driver/testutil.py b/testsuite/driver/testutil.py
index e7b6bf2948..f2c63c5a2d 100644
--- a/testsuite/driver/testutil.py
+++ b/testsuite/driver/testutil.py
@@ -5,8 +5,6 @@ import tempfile
from pathlib import Path, PurePath
from term_color import Color, colored
-import threading
-
from my_typing import *
@@ -125,24 +123,6 @@ else:
else:
os.symlink(str(src), str(dst))
-class Watcher(object):
- def __init__(self, count: int) -> None:
- self.pool = count
- self.evt = threading.Event()
- self.sync_lock = threading.Lock()
- if count <= 0:
- self.evt.set()
-
- def wait(self):
- self.evt.wait()
-
- def notify(self):
- self.sync_lock.acquire()
- self.pool -= 1
- if self.pool <= 0:
- self.evt.set()
- self.sync_lock.release()
-
def memoize(f):
"""
A decorator to memoize a nullary function.