summaryrefslogtreecommitdiff
path: root/testsuite/driver/testlib.py
diff options
context:
space:
mode:
authorBen Gamari <ben@smart-cactus.org>2019-06-19 13:25:07 -0400
committerMarge Bot <ben+marge-bot@smart-cactus.org>2019-06-25 08:37:46 -0400
commitc346585b5f5054a85e48dc546e198c5be124340f (patch)
tree7a3d30e456c2811baf64010f059cb8ff6c7dac84 /testsuite/driver/testlib.py
parentebd63e8de30470ccf8b65c11f0fc82705960b5cf (diff)
downloadhaskell-c346585b5f5054a85e48dc546e198c5be124340f.tar.gz
testsuite: A major revamp of the driver
This tries to put the testsuite driver into a slightly more maintainable condition: * Add type annotations where easily done * Use pathlib.Path instead of str paths * Make it pass the mypy typechecker
Diffstat (limited to 'testsuite/driver/testlib.py')
-rw-r--r--testsuite/driver/testlib.py607
1 files changed, 324 insertions, 283 deletions
diff --git a/testsuite/driver/testlib.py b/testsuite/driver/testlib.py
index e37bf5d49b..ea7f61d8f7 100644
--- a/testsuite/driver/testlib.py
+++ b/testsuite/driver/testlib.py
@@ -14,17 +14,22 @@ import copy
import glob
import sys
from math import ceil, trunc
-from pathlib import PurePath
+from pathlib import Path, PurePath
import collections
import subprocess
-from testglobals import config, ghc_env, default_testopts, brokens, t, TestResult
-from testutil import strip_quotes, lndir, link_or_copy_file, passed, failBecause, failBecauseStderr, str_fail, str_pass, testing_metrics
+from testglobals import config, ghc_env, default_testopts, brokens, t, \
+ TestRun, TestResult, TestOptions
+from testutil import strip_quotes, lndir, link_or_copy_file, passed, \
+ failBecause, failBecauseStderr, str_fail, str_pass, testing_metrics
+import testutil
from cpu_features import have_cpu_feature
import perf_notes as Perf
from perf_notes import MetricChange
extra_src_files = {'T4198': ['exitminus1.c']} # TODO: See #12223
+from my_typing import *
+
global pool_sema
if config.use_threads:
import threading
@@ -33,11 +38,15 @@ if config.use_threads:
global wantToStop
wantToStop = False
-def stopNow():
+# I have no idea what the type of this is
+global thisdir_settings
+thisdir_settings = None # type: ignore
+
+def stopNow() -> None:
global wantToStop
wantToStop = True
-def stopping():
+def stopping() -> bool:
return wantToStop
@@ -50,20 +59,20 @@ if config.use_threads:
else:
class TestOpts_Local:
pass
- testopts_local = TestOpts_Local()
+ testopts_local = TestOpts_Local() # type: ignore
-def getTestOpts():
+def getTestOpts() -> TestOptions:
return testopts_local.x
-def setLocalTestOpts(opts):
+def setLocalTestOpts(opts: TestOptions) -> None:
global testopts_local
- testopts_local.x=opts
+ testopts_local.x = opts
-def isCompilerStatsTest():
+def isCompilerStatsTest() -> bool:
opts = getTestOpts()
return bool(opts.is_compiler_stats_test)
-def isStatsTest():
+def isStatsTest() -> bool:
opts = getTestOpts()
return opts.is_stats_test
@@ -133,9 +142,9 @@ def stage1(name, opts):
# Cache the results of looking to see if we have a library or not.
# This makes quite a difference, especially on Windows.
-have_lib_cache = {}
+have_lib_cache = {} # type: Dict[str, bool]
-def have_library(lib):
+def have_library(lib: str) -> bool:
""" Test whether the given library is available """
if lib in have_lib_cache:
got_it = have_lib_cache[lib]
@@ -213,31 +222,31 @@ def _use_specs( name, opts, specs ):
# -----
-def expect_fail_for( ways ):
+def expect_fail_for( ways: List[WayName] ):
assert isinstance(ways, list)
return lambda name, opts, w=ways: _expect_fail_for( name, opts, w )
def _expect_fail_for( name, opts, ways ):
opts.expect_fail_for = ways
-def expect_broken( bug ):
+def expect_broken( bug: IssueNumber ):
# This test is a expected not to work due to the indicated trac bug
# number.
return lambda name, opts, b=bug: _expect_broken (name, opts, b )
-def _expect_broken( name, opts, bug ):
+def _expect_broken( name: TestName, opts, bug: IssueNumber ):
record_broken(name, opts, bug)
opts.expect = 'fail';
-def expect_broken_for( bug, ways ):
+def expect_broken_for( bug: IssueNumber, ways: List[WayName] ):
assert isinstance(ways, list)
return lambda name, opts, b=bug, w=ways: _expect_broken_for( name, opts, b, w )
-def _expect_broken_for( name, opts, bug, ways ):
+def _expect_broken_for( name: TestName, opts, bug: IssueNumber, ways ):
record_broken(name, opts, bug)
opts.expect_fail_for = ways
-def record_broken(name, opts, bug):
+def record_broken(name: TestName, opts, bug: IssueNumber):
me = (bug, opts.testdir, name)
if not me in brokens:
brokens.append(me)
@@ -249,7 +258,7 @@ def _expect_pass(way):
# -----
-def fragile( bug ):
+def fragile( bug: IssueNumber ):
"""
Indicates that the test should be skipped due to fragility documented in
the given ticket.
@@ -260,7 +269,7 @@ def fragile( bug ):
return helper
-def fragile_for( bug, ways ):
+def fragile_for( bug: IssueNumber, ways: List[WayName] ):
"""
Indicates that the test should be skipped due to fragility in the given
test ways as documented in the given ticket.
@@ -273,7 +282,7 @@ def fragile_for( bug, ways ):
# -----
-def omit_ways( ways ):
+def omit_ways( ways: List[WayName] ):
assert isinstance(ways, list)
return lambda name, opts, w=ways: _omit_ways( name, opts, w )
@@ -283,7 +292,7 @@ def _omit_ways( name, opts, ways ):
# -----
-def only_ways( ways ):
+def only_ways( ways: List[WayName] ):
assert isinstance(ways, list)
return lambda name, opts, w=ways: _only_ways( name, opts, w )
@@ -292,7 +301,7 @@ def _only_ways( name, opts, ways ):
# -----
-def extra_ways( ways ):
+def extra_ways( ways: List[WayName] ):
assert isinstance(ways, list)
return lambda name, opts, w=ways: _extra_ways( name, opts, w )
@@ -309,13 +318,13 @@ def _set_stdin( name, opts, f ):
# -----
-def exit_code( val ):
+def exit_code( val: int ):
return lambda name, opts, v=val: _exit_code(name, opts, v);
def _exit_code( name, opts, v ):
opts.exit_code = v
-def signal_exit_code( val ):
+def signal_exit_code( val: int ):
if opsys('solaris2'):
return exit_code( val )
else:
@@ -328,13 +337,13 @@ def signal_exit_code( val ):
# -----
-def compile_timeout_multiplier( val ):
+def compile_timeout_multiplier( val: float ):
return lambda name, opts, v=val: _compile_timeout_multiplier(name, opts, v)
def _compile_timeout_multiplier( name, opts, v ):
opts.compile_timeout_multiplier = v
-def run_timeout_multiplier( val ):
+def run_timeout_multiplier( val: float ):
return lambda name, opts, v=val: _run_timeout_multiplier(name, opts, v)
def _run_timeout_multiplier( name, opts, v ):
@@ -397,7 +406,7 @@ def collect_stats(metric='all', deviation=20):
# measures the performance numbers of the compiler.
# As this is a fairly rare case in the testsuite, it defaults to false to
# indicate that it is a 'normal' performance test.
-def _collect_stats(name, opts, metrics, deviation, is_compiler_stats_test=False):
+def _collect_stats(name: TestName, opts, metrics, deviation, is_compiler_stats_test=False):
if not re.match('^[0-9]*[a-zA-Z][a-zA-Z0-9._-]*$', name):
failBecause('This test has an invalid name.')
@@ -426,7 +435,7 @@ def _collect_stats(name, opts, metrics, deviation, is_compiler_stats_test=False)
# -----
-def when(b, f):
+def when(b: bool, f):
# When list_brokens is on, we want to see all expect_broken calls,
# so we always do f
if b or config.list_broken:
@@ -434,10 +443,10 @@ def when(b, f):
else:
return normal
-def unless(b, f):
+def unless(b: bool, f):
return when(not b, f)
-def doing_ghci():
+def doing_ghci() -> bool:
return 'ghci' in config.run_ways
def requires_th(name, opts):
@@ -448,67 +457,67 @@ def requires_th(name, opts):
"""
return when(ghc_dynamic(), omit_ways(['profasm']))
-def ghc_dynamic():
+def ghc_dynamic() -> bool:
return config.ghc_dynamic
-def fast():
+def fast() -> bool:
return config.speed == 2
-def platform( plat ):
+def platform( plat: str ) -> bool:
return config.platform == plat
-def opsys( os ):
+def opsys( os: str ) -> bool:
return config.os == os
-def arch( arch ):
+def arch( arch: str ) -> bool:
return config.arch == arch
-def wordsize( ws ):
+def wordsize( ws: int ) -> bool:
return config.wordsize == str(ws)
-def msys( ):
+def msys( ) -> bool:
return config.msys
-def cygwin( ):
+def cygwin( ) -> bool:
return config.cygwin
-def have_vanilla( ):
+def have_vanilla( ) -> bool:
return config.have_vanilla
-def have_ncg( ):
+def have_ncg( ) -> bool:
return config.have_ncg
-def have_dynamic( ):
+def have_dynamic( ) -> bool:
return config.have_dynamic
-def have_profiling( ):
+def have_profiling( ) -> bool:
return config.have_profiling
-def in_tree_compiler( ):
+def in_tree_compiler( ) -> bool:
return config.in_tree_compiler
-def unregisterised( ):
+def unregisterised( ) -> bool:
return config.unregisterised
-def compiler_profiled( ):
+def compiler_profiled( ) -> bool:
return config.compiler_profiled
-def compiler_debugged( ):
+def compiler_debugged( ) -> bool:
return config.compiler_debugged
-def have_gdb( ):
+def have_gdb( ) -> bool:
return config.have_gdb
-def have_readelf( ):
+def have_readelf( ) -> bool:
return config.have_readelf
-def integer_gmp( ):
+def integer_gmp( ) -> bool:
return have_library("integer-gmp")
-def integer_simple( ):
+def integer_simple( ) -> bool:
return have_library("integer-simple")
-def llvm_build ( ):
+def llvm_build ( ) -> bool:
return config.ghc_built_by_llvm
# ---
@@ -711,10 +720,10 @@ def newTestDir(tempdir, dir):
# Should be equal to entry in toplevel .gitignore.
testdir_suffix = '.run'
-def _newTestDir(name, opts, tempdir, dir):
+def _newTestDir(name: TestName, opts: TestOptions, tempdir, dir):
testdir = os.path.join('', *(p for p in PurePath(dir).parts if p != '..'))
- opts.srcdir = os.path.join(os.getcwd(), dir)
- opts.testdir = os.path.join(tempdir, testdir, name + testdir_suffix)
+ opts.srcdir = Path.cwd() / dir
+ opts.testdir = Path(os.path.join(tempdir, testdir, name + testdir_suffix))
opts.compiler_always_flags = config.compiler_always_flags
# -----------------------------------------------------------------------------
@@ -722,9 +731,9 @@ def _newTestDir(name, opts, tempdir, dir):
parallelTests = []
aloneTests = []
-allTestNames = set([])
+allTestNames = set([]) # type: Set[TestName]
-def runTest(watcher, opts, name, func, args):
+def runTest(watcher, opts, name: TestName, func, args):
if config.use_threads:
pool_sema.acquire()
t = threading.Thread(target=test_common_thread,
@@ -737,15 +746,17 @@ def runTest(watcher, opts, name, func, args):
# name :: String
# setup :: [TestOpt] -> IO ()
-def test(name, setup, func, args):
+def test(name: TestName,
+ setup: "Callable[[List[TestOptions]], None]",
+ func, args) -> None:
global aloneTests
global parallelTests
global allTestNames
global thisdir_settings
if name in allTestNames:
- framework_fail(name, 'duplicate', 'There are multiple tests with this name')
+ framework_fail(name, None, 'There are multiple tests with this name')
if not re.match('^[0-9]*[a-zA-Z][a-zA-Z0-9._-]*$', name):
- framework_fail(name, 'bad_name', 'This test has an invalid name')
+ framework_fail(name, None, 'This test has an invalid name')
if config.run_only_some_tests:
if name not in config.only:
@@ -779,18 +790,20 @@ if config.use_threads:
finally:
pool_sema.release()
-def get_package_cache_timestamp():
- if config.package_conf_cache_file == '':
+def get_package_cache_timestamp() -> float:
+ if config.package_conf_cache_file is None:
return 0.0
else:
try:
- return os.stat(config.package_conf_cache_file).st_mtime
+ return config.package_conf_cache_file.stat().st_mtime
except:
return 0.0
do_not_copy = ('.hi', '.o', '.dyn_hi', '.dyn_o', '.out') # 12112
-def test_common_work(watcher, name, opts, func, args):
+def test_common_work(watcher: testutil.Watcher,
+ name: TestName, opts,
+ func, args) -> None:
try:
t.total_tests += 1
setLocalTestOpts(opts)
@@ -803,12 +816,12 @@ def test_common_work(watcher, name, opts, func, args):
elif func == compile_and_run or func == multimod_compile_and_run:
all_ways = config.run_ways
elif func == ghci_script:
- if 'ghci' in config.run_ways:
- all_ways = ['ghci']
+ if WayName('ghci') in config.run_ways:
+ all_ways = [WayName('ghci')]
else:
all_ways = []
else:
- all_ways = ['normal']
+ all_ways = [WayName('normal')]
# A test itself can request extra ways by setting opts.extra_ways
all_ways = all_ways + [way for way in opts.extra_ways if way not in all_ways]
@@ -817,7 +830,7 @@ def test_common_work(watcher, name, opts, func, args):
ok_way = lambda way: \
not getTestOpts().skip \
- and (getTestOpts().only_ways == None or way in getTestOpts().only_ways) \
+ and (getTestOpts().only_ways is None or way in getTestOpts().only_ways) \
and (config.cmdline_ways == [] or way in config.cmdline_ways) \
and (not (config.skip_perf_tests and isStatsTest())) \
and (not (config.only_perf_tests and not isStatsTest())) \
@@ -846,26 +859,26 @@ def test_common_work(watcher, name, opts, func, args):
# specify all other files that their test depends on (but
# this seems to be necessary for only about 10% of all
# tests).
- files = set(f for f in os.listdir(opts.srcdir)
+ files = set(f for f in os.listdir(str(opts.srcdir))
if f.startswith(name) and not f == name and
not f.endswith(testdir_suffix) and
not os.path.splitext(f)[1] in do_not_copy)
for filename in (opts.extra_files + extra_src_files.get(name, [])):
if filename.startswith('/'):
- framework_fail(name, 'whole-test',
+ framework_fail(name, None,
'no absolute paths in extra_files please: ' + filename)
elif '*' in filename:
# Don't use wildcards in extra_files too much, as
# globbing is slow.
- files.update((os.path.relpath(f, opts.srcdir)
- for f in glob.iglob(in_srcdir(filename))))
+ files.update(str(Path(f).relative_to(opts.srcdir))
+ for f in glob.iglob(str(in_srcdir(filename))))
elif filename:
files.add(filename)
else:
- framework_fail(name, 'whole-test', 'extra_file is empty string')
+ framework_fail(name, None, 'extra_file is empty string')
# Run the required tests...
for way in do_ways:
@@ -885,19 +898,19 @@ def test_common_work(watcher, name, opts, func, args):
try:
cleanup()
except Exception as e:
- framework_fail(name, 'runTest', 'Unhandled exception during cleanup: ' + str(e))
+ framework_fail(name, None, 'Unhandled exception during cleanup: ' + str(e))
package_conf_cache_file_end_timestamp = get_package_cache_timestamp();
if package_conf_cache_file_start_timestamp != package_conf_cache_file_end_timestamp:
- framework_fail(name, 'whole-test', 'Package cache timestamps do not match: ' + str(package_conf_cache_file_start_timestamp) + ' ' + str(package_conf_cache_file_end_timestamp))
+ framework_fail(name, None, 'Package cache timestamps do not match: ' + str(package_conf_cache_file_start_timestamp) + ' ' + str(package_conf_cache_file_end_timestamp))
except Exception as e:
- framework_fail(name, 'runTest', 'Unhandled exception: ' + str(e))
+ framework_fail(name, None, 'Unhandled exception: ' + str(e))
finally:
watcher.notify()
-def do_test(name, way, func, args, files):
+def do_test(name: TestName, way: WayName, func, args, files: Set[str]) -> None:
opts = getTestOpts()
full_name = name + '(' + way + ')'
@@ -917,7 +930,7 @@ def do_test(name, way, func, args, files):
# Clean up prior to the test, so that we can't spuriously conclude
# that it passed on the basis of old run outputs.
cleanup()
- os.makedirs(opts.testdir)
+ os.makedirs(str(opts.testdir))
# Link all source files for this test into a new directory in
# /tmp, and run the test in that directory. This makes it
@@ -928,12 +941,12 @@ def do_test(name, way, func, args, files):
for extra_file in files:
src = in_srcdir(extra_file)
dst = in_testdir(os.path.basename(extra_file.rstrip('/\\')))
- if os.path.isfile(src):
+ if src.is_file():
link_or_copy_file(src, dst)
- elif os.path.isdir(src):
- if os.path.exists(dst):
- shutil.rmtree(dst)
- os.mkdir(dst)
+ elif src.is_dir():
+ if dst.exists():
+ shutil.rmtree(str(dst))
+ dst.mkdir()
lndir(src, dst)
else:
if not config.haddock and os.path.splitext(extra_file)[1] == '.t':
@@ -950,11 +963,10 @@ def do_test(name, way, func, args, files):
# root of the testsuite.
src_makefile = in_srcdir('Makefile')
dst_makefile = in_testdir('Makefile')
- if os.path.exists(src_makefile):
- with io.open(src_makefile, 'r', encoding='utf8') as src:
- makefile = re.sub('TOP=.*', 'TOP=' + config.top, src.read(), 1)
- with io.open(dst_makefile, 'w', encoding='utf8') as dst:
- dst.write(makefile)
+ if src_makefile.exists():
+ makefile = src_makefile.read_text(encoding='UTF-8')
+ makefile = re.sub('TOP=.*', 'TOP=' + config.top, makefile, 1)
+ dst_makefile.write_text(makefile, encoding='UTF-8')
if opts.pre_cmd:
exit_code = runCmd('cd "{0}" && {1}'.format(opts.testdir, override_options(opts.pre_cmd)),
@@ -976,7 +988,7 @@ def do_test(name, way, func, args, files):
except (KeyError, TypeError):
passFail = 'No passFail found'
- directory = re.sub('^\\.[/\\\\]', '', opts.testdir)
+ directory = re.sub('^\\.[/\\\\]', '', str(opts.testdir))
if passFail == 'pass':
if _expect_pass(way):
@@ -994,7 +1006,9 @@ def do_test(name, way, func, args, files):
t.unexpected_stat_failures.append(TestResult(directory, name, reason, way))
else:
if_verbose(1, '*** unexpected failure for %s' % full_name)
- result = TestResult(directory, name, reason, way, stderr=result.get('stderr'))
+ result = TestResult(directory, name, reason, way,
+ stdout=result.get('stdout'),
+ stderr=result.get('stderr'))
t.unexpected_failures.append(result)
else:
if opts.expect == 'missing-lib':
@@ -1016,14 +1030,16 @@ def override_options(pre_cmd):
return pre_cmd
-def framework_fail(name, way, reason):
+def framework_fail(name: Optional[TestName], way: Optional[WayName], reason: str) -> None:
opts = getTestOpts()
- directory = re.sub('^\\.[/\\\\]', '', opts.testdir)
- full_name = name + '(' + way + ')'
+ directory = re.sub('^\\.[/\\\\]', '', str(opts.testdir))
+ full_name = '%s(%s)' % (name, way)
if_verbose(1, '*** framework failure for %s %s ' % (full_name, reason))
- t.framework_failures.append(TestResult(directory, name, reason, way))
+ name2 = name if name is not None else TestName('none')
+ way2 = way if way is not None else WayName('none')
+ t.framework_failures.append(TestResult(directory, name2, reason, way2))
-def framework_warn(name, way, reason):
+def framework_warn(name: TestName, way: WayName, reason: str) -> None:
opts = getTestOpts()
directory = re.sub('^\\.[/\\\\]', '', opts.testdir)
full_name = name + '(' + way + ')'
@@ -1140,8 +1156,8 @@ def do_compile(name, way, should_fail, top_mod, extra_mods, extra_hc_opts, **kwa
whitespace_normaliser=getattr(getTestOpts(),
"whitespace_normaliser",
normalise_whitespace)):
- stderr = open(diff_file_name, 'rb').read()
- os.remove(diff_file_name)
+ stderr = diff_file_name.read_bytes()
+ diff_file_name.unlink()
return failBecauseStderr('stderr mismatch', stderr=stderr )
@@ -1244,24 +1260,24 @@ def metric_dict(name, way, metric, value):
# range_fields: see TestOptions.stats_range_fields
# Returns a pass/fail object. Passes if the stats are withing the expected value ranges.
# This prints the results for the user.
-def check_stats(name, way, stats_file, range_fields):
- head_commit = Perf.commit_hash('HEAD') if Perf.inside_git_repo() else None
+def check_stats(name, way, stats_file, range_fields) -> Any:
+ head_commit = Perf.commit_hash(GitRef('HEAD')) if Perf.inside_git_repo() else None
result = passed()
if range_fields:
try:
- f = open(stats_file)
+ stats_file_contents = stats_file.read_text()
except IOError as e:
return failBecause(str(e))
- stats_file_contents = f.read()
- f.close()
for (metric, baseline_and_dev) in range_fields.items():
field_match = re.search('\("' + metric + '", "([0-9]+)"\)', stats_file_contents)
- if field_match == None:
+ if field_match is None:
print('Failed to find metric: ', metric)
metric_result = failBecause('no such stats metric')
else:
- actual_val = int(field_match.group(1))
+ val = field_match.group(1)
+ assert val is not None
+ actual_val = int(val)
# Store the metric so it can later be stored in a git note.
perf_stat = metric_dict(name, way, metric, actual_val)
@@ -1270,7 +1286,7 @@ def check_stats(name, way, stats_file, range_fields):
# If this is the first time running the benchmark, then pass.
baseline = baseline_and_dev[0](way, head_commit) \
if Perf.inside_git_repo() else None
- if baseline == None:
+ if baseline is None:
metric_result = passed()
change = MetricChange.NewMetric
else:
@@ -1298,13 +1314,14 @@ def extras_build( way, extra_mods, extra_hc_opts ):
for mod, opts in extra_mods:
result = simple_build(mod, way, opts + ' ' + extra_hc_opts, 0, '', 0, 0)
if not (mod.endswith('.hs') or mod.endswith('.lhs')):
- extra_hc_opts += ' ' + replace_suffix(mod, 'o')
+ extra_hc_opts += ' %s' % Path(mod).with_suffix('.o')
if badResult(result):
return result
return {'passFail' : 'pass', 'hc_opts' : extra_hc_opts}
-def simple_build(name, way, extra_hc_opts, should_fail, top_mod, link, addsuf, backpack = False):
+def simple_build(name: TestName, way: WayName,
+ extra_hc_opts, should_fail, top_mod, link, addsuf, backpack = False) -> Any:
opts = getTestOpts()
# Redirect stdout and stderr to the same file
@@ -1378,11 +1395,11 @@ def simple_build(name, way, extra_hc_opts, should_fail, top_mod, link, addsuf, b
if should_fail:
if exit_code == 0:
- stderr_contents = open(actual_stderr_path, 'rb').read()
+ stderr_contents = actual_stderr_path.read_text(encoding='UTF-8', errors='replace')
return failBecauseStderr('exit code 0', stderr_contents)
else:
if exit_code != 0:
- stderr_contents = open(actual_stderr_path, 'rb').read()
+ stderr_contents = actual_stderr_path.read_text(encoding='UTF-8', errors='replace')
return failBecauseStderr('exit code non-0', stderr_contents)
return passed()
@@ -1394,22 +1411,22 @@ def simple_build(name, way, extra_hc_opts, should_fail, top_mod, link, addsuf, b
# from /dev/null. Route output to testname.run.stdout and
# testname.run.stderr. Returns the exit code of the run.
-def simple_run(name, way, prog, extra_run_opts):
+def simple_run(name: TestName, way: WayName, prog: str, extra_run_opts: str) -> Any:
opts = getTestOpts()
# figure out what to use for stdin
if opts.stdin:
- stdin = in_testdir(opts.stdin)
- elif os.path.exists(in_testdir(name, 'stdin')):
- stdin = in_testdir(name, 'stdin')
+ stdin_arg = in_testdir(opts.stdin) # type: Optional[Path]
+ elif in_testdir(name, 'stdin').exists():
+ stdin_arg = in_testdir(name, 'stdin')
else:
- stdin = None
+ stdin_arg = None
- stdout = in_testdir(name, 'run.stdout')
+ stdout_arg = in_testdir(name, 'run.stdout')
if opts.combined_output:
- stderr = subprocess.STDOUT
+ stderr_arg = subprocess.STDOUT # type: Union[int, Path]
else:
- stderr = in_testdir(name, 'run.stderr')
+ stderr_arg = in_testdir(name, 'run.stderr')
my_rts_flags = rts_flags(way)
@@ -1433,7 +1450,7 @@ def simple_run(name, way, prog, extra_run_opts):
cmd = 'cd "{opts.testdir}" && {cmd}'.format(**locals())
# run the command
- exit_code = runCmd(cmd, stdin, stdout, stderr, opts.run_timeout_multiplier)
+ exit_code = runCmd(cmd, stdin_arg, stdout_arg, stderr_arg, opts.run_timeout_multiplier)
# check the exit code
if exit_code != opts.exit_code:
@@ -1444,9 +1461,13 @@ def simple_run(name, way, prog, extra_run_opts):
return failBecause('bad exit code (%d)' % exit_code)
if not (opts.ignore_stderr or stderr_ok(name, way) or opts.combined_output):
- return failBecause('bad stderr')
+ return failBecause('bad stderr',
+ stderr=read_stderr(name),
+ stdout=read_stdout(name))
if not (opts.ignore_stdout or stdout_ok(name, way)):
- return failBecause('bad stdout')
+ return failBecause('bad stdout',
+ stderr=read_stderr(name),
+ stdout=read_stdout(name))
check_hp = '-h' in my_rts_flags and opts.check_hp
check_prof = '-p' in my_rts_flags
@@ -1459,14 +1480,14 @@ def simple_run(name, way, prog, extra_run_opts):
return check_stats(name, way, in_testdir(stats_file), opts.stats_range_fields)
-def rts_flags(way):
+def rts_flags(way: WayName) -> str:
args = config.way_rts_flags.get(way, [])
return '+RTS {0} -RTS'.format(' '.join(args)) if args else ''
# -----------------------------------------------------------------------------
# Run a program in the interpreter and check its output
-def interpreter_run(name, way, extra_hc_opts, top_mod):
+def interpreter_run(name: TestName, way: WayName, extra_hc_opts: List[str], top_mod: str) -> None:
opts = getTestOpts()
stdout = in_testdir(name, 'interp.stdout')
@@ -1474,17 +1495,17 @@ def interpreter_run(name, way, extra_hc_opts, top_mod):
script = in_testdir(name, 'genscript')
if opts.combined_output:
- framework_fail(name, 'unsupported',
+ framework_fail(name, WayName('unsupported'),
'WAY=ghci and combined_output together is not supported')
if (top_mod == ''):
srcname = add_hs_lhs_suffix(name)
else:
- srcname = top_mod
+ srcname = Path(top_mod)
delimiter = '===== program output begins here\n'
- with io.open(script, 'w', encoding='utf8') as f:
+ with script.open('w', encoding='UTF-8') as f:
# set the prog name and command-line args to match the compiled
# environment.
f.write(':set prog ' + name + '\n')
@@ -1500,7 +1521,7 @@ def interpreter_run(name, way, extra_hc_opts, top_mod):
f.write('GHC.TopHandler.runIOFastExit Main.main Prelude.>> Prelude.return ()\n')
stdin = in_testdir(opts.stdin if opts.stdin else add_suffix(name, 'stdin'))
- if os.path.exists(stdin):
+ if stdin.exists():
os.system('cat "{0}" >> "{1}"'.format(stdin, script))
flags = ' '.join(get_compiler_flags() + config.way_flags[way])
@@ -1528,22 +1549,28 @@ def interpreter_run(name, way, extra_hc_opts, top_mod):
print('Wrong exit code for ' + name + '(' + way + ') (expected', getTestOpts().exit_code, ', actual', exit_code, ')')
dump_stdout(name)
dump_stderr(name)
- return failBecause('bad exit code (%d)' % exit_code)
+ return failBecause('bad exit code (%d)' % exit_code,
+ stderr=read_stderr(name),
+ stdout=read_stdout(name))
# ToDo: if the sub-shell was killed by ^C, then exit
if not (opts.ignore_stderr or stderr_ok(name, way)):
- return failBecause('bad stderr')
+ return failBecause('bad stderr',
+ stderr=read_stderr(name),
+ stdout=read_stdout(name))
elif not (opts.ignore_stdout or stdout_ok(name, way)):
- return failBecause('bad stdout')
+ return failBecause('bad stdout',
+ stderr=read_stderr(name),
+ stdout=read_stdout(name))
else:
return passed()
-def split_file(in_fn, delimiter, out1_fn, out2_fn):
+def split_file(in_fn: Path, delimiter: str, out1_fn: Path, out2_fn: Path):
# See Note [Universal newlines].
- with io.open(in_fn, 'r', encoding='utf8', errors='replace', newline=None) as infile:
- with io.open(out1_fn, 'w', encoding='utf8', newline='') as out1:
- with io.open(out2_fn, 'w', encoding='utf8', newline='') as out2:
+ with in_fn.open('r', encoding='utf8', errors='replace', newline=None) as infile:
+ with out1_fn.open('w', encoding='utf8', newline='') as out1:
+ with out2_fn.open('w', encoding='utf8', newline='') as out2:
line = infile.readline()
while re.sub('^\s*','',line) != delimiter and line != '':
out1.write(line)
@@ -1556,7 +1583,7 @@ def split_file(in_fn, delimiter, out1_fn, out2_fn):
# -----------------------------------------------------------------------------
# Utils
-def get_compiler_flags():
+def get_compiler_flags() -> List[str]:
opts = getTestOpts()
flags = copy.copy(opts.compiler_always_flags)
@@ -1568,7 +1595,7 @@ def get_compiler_flags():
return flags
-def stdout_ok(name, way):
+def stdout_ok(name: TestName, way: WayName) -> bool:
actual_stdout_file = add_suffix(name, 'run.stdout')
expected_stdout_file = find_expected_file(name, 'stdout')
@@ -1582,14 +1609,16 @@ def stdout_ok(name, way):
return compare_outputs(way, 'stdout', extra_norm,
expected_stdout_file, actual_stdout_file)
-def dump_stdout( name ):
- with open(in_testdir(name, 'run.stdout'), encoding='utf8') as f:
- str = f.read().strip()
- if str:
- print("Stdout (", name, "):")
- print(str)
+def read_stdout( name: TestName ) -> str:
+ return in_testdir(name, 'run.stdout').read_text(encoding='UTF-8')
+
+def dump_stdout( name: TestName ) -> None:
+ str = read_stdout(name).strip()
+ if str:
+ print("Stdout (", name, "):")
+ print(str)
-def stderr_ok(name, way):
+def stderr_ok(name: TestName, way: WayName) -> bool:
actual_stderr_file = add_suffix(name, 'run.stderr')
expected_stderr_file = find_expected_file(name, 'stderr')
@@ -1598,29 +1627,31 @@ def stderr_ok(name, way):
expected_stderr_file, actual_stderr_file,
whitespace_normaliser=normalise_whitespace)
-def dump_stderr( name ):
- with open(in_testdir(name, 'run.stderr'), encoding='utf8') as f:
- str = f.read().strip()
- if str:
- print("Stderr (", name, "):")
- print(str)
+def read_stderr( name: TestName ) -> str:
+ return in_testdir(name, 'run.stderr').read_text(encoding='UTF-8')
+
+def dump_stderr( name: TestName ) -> None:
+ str = read_stderr(name).strip()
+ if str:
+ print("Stderr (", name, "):")
+ print(str)
-def read_no_crs(file):
- str = ''
+def read_no_crs(f: Path) -> str:
+ s = ''
try:
# See Note [Universal newlines].
- with io.open(file, 'r', encoding='utf8', errors='replace', newline=None) as h:
- str = h.read()
+ with f.open('r', encoding='utf8', errors='replace', newline=None) as h:
+ s = h.read()
except Exception:
# On Windows, if the program fails very early, it seems the
# files stdout/stderr are redirected to may not get created
pass
- return str
+ return s
-def write_file(file, str):
+def write_file(f: Path, s: str) -> None:
# See Note [Universal newlines].
- with io.open(file, 'w', encoding='utf8', newline='') as h:
- h.write(str)
+ with f.open('w', encoding='utf8', newline='') as h:
+ h.write(s)
# Note [Universal newlines]
#
@@ -1632,22 +1663,22 @@ def write_file(file, str):
# Windows style endings to '\n', as it simplifies searching or massaging
# the content.
#
-# Solution: use `io.open` instead of `open`
+# Solution: use `Path.open` instead of `open`
# * when reading: use newline=None to translate '\r\n' to '\n'
# * when writing: use newline='' to not translate '\n' to '\r\n'
#
-# See https://docs.python.org/2/library/io.html#io.open.
+# See https://docs.python.org/3/library/pathlib.html#pathlib.Path.open
#
# This should work with both python2 and python3, and with both mingw*
# as msys2 style Python.
#
-# Do note that io.open returns unicode strings. So we have to specify
+# Do note that Path.open returns unicode strings. So we have to specify
# the expected encoding. But there is at least one file which is not
# valid utf8 (decodingerror002.stdout). Solution: use errors='replace'.
# Another solution would be to open files in binary mode always, and
# operate on bytes.
-def check_hp_ok(name):
+def check_hp_ok(name: TestName) -> bool:
opts = getTestOpts()
# do not qualify for hp2ps because we should be in the right directory
@@ -1658,39 +1689,41 @@ def check_hp_ok(name):
actual_ps_path = in_testdir(name, 'ps')
if hp2psResult == 0:
- if os.path.exists(actual_ps_path):
+ if actual_ps_path.exists():
if gs_working:
gsResult = runCmd(genGSCmd(actual_ps_path))
if (gsResult == 0):
- return (True)
+ return True
else:
print("hp2ps output for " + name + " is not valid PostScript")
- else: return (True) # assume postscript is valid without ghostscript
+ return False
+ else:
+ return True # assume postscript is valid without ghostscript
else:
print("hp2ps did not generate PostScript for " + name)
- return (False)
+ return False
else:
print("hp2ps error when processing heap profile for " + name)
- return(False)
+ return False
-def check_prof_ok(name, way):
+def check_prof_ok(name: TestName, way: WayName) -> bool:
expected_prof_file = find_expected_file(name, 'prof.sample')
expected_prof_path = in_testdir(expected_prof_file)
# Check actual prof file only if we have an expected prof file to
# compare it with.
- if not os.path.exists(expected_prof_path):
+ if not expected_prof_path.exists():
return True
actual_prof_file = add_suffix(name, 'prof')
actual_prof_path = in_testdir(actual_prof_file)
- if not os.path.exists(actual_prof_path):
- print(actual_prof_path + " does not exist")
+ if not actual_prof_path.exists():
+ print("%s does not exist" % actual_prof_path)
return(False)
- if os.path.getsize(actual_prof_path) == 0:
- print(actual_prof_path + " is empty")
+ if actual_prof_path.stat().st_size == 0:
+ print("%s is empty" % actual_prof_path)
return(False)
return compare_outputs(way, 'prof', normalise_prof,
@@ -1701,20 +1734,23 @@ def check_prof_ok(name, way):
# new output. Returns true if output matched or was accepted, false
# otherwise. See Note [Output comparison] for the meaning of the
# normaliser and whitespace_normaliser parameters.
-def compare_outputs(way, kind, normaliser, expected_file, actual_file, diff_file=None,
- whitespace_normaliser=lambda x:x):
+def compare_outputs(way: WayName,
+ kind: str,
+ normaliser: OutputNormalizer,
+ expected_file, actual_file, diff_file=None,
+ whitespace_normaliser: OutputNormalizer=lambda x:x) -> bool:
expected_path = in_srcdir(expected_file)
actual_path = in_testdir(actual_file)
- if os.path.exists(expected_path):
+ if expected_path.exists():
expected_str = normaliser(read_no_crs(expected_path))
# Create the .normalised file in the testdir, not in the srcdir.
expected_normalised_file = add_suffix(expected_file, 'normalised')
expected_normalised_path = in_testdir(expected_normalised_file)
else:
expected_str = ''
- expected_normalised_path = '/dev/null'
+ expected_normalised_path = Path('/dev/null')
actual_raw = read_no_crs(actual_path)
actual_str = normaliser(actual_raw)
@@ -1746,8 +1782,8 @@ def compare_outputs(way, kind, normaliser, expected_file, actual_file, diff_file
actual_normalised_path),
stdout=diff_file,
print_output=True)
- elif diff_file: open(diff_file, 'ab').close() # Make sure the file exists still as
- # we will try to read it later
+ elif diff_file: diff_file.open('ab').close() # Make sure the file exists still as
+ # we will try to read it later
if config.accept and (getTestOpts().expect == 'fail' or
way in getTestOpts().expect_fail_for):
@@ -1769,14 +1805,14 @@ def compare_outputs(way, kind, normaliser, expected_file, actual_file, diff_file
return True
elif config.accept:
if_verbose(1, 'No output. Deleting "{0}".'.format(expected_path))
- os.remove(expected_path)
+ expected_path.unlink()
return True
else:
return False
# Checks that each line from pattern_file is present in actual_file as
# a substring or regex pattern depending on is_substring.
-def grep_output(normaliser, pattern_file, actual_file, is_substring=True):
+def grep_output(normaliser: OutputNormalizer, pattern_file, actual_file, is_substring: bool=True):
expected_path = in_srcdir(pattern_file)
actual_path = in_testdir(actual_file)
@@ -1827,13 +1863,13 @@ def grep_output(normaliser, pattern_file, actual_file, is_substring=True):
# on the `diff` program to ignore whitespace changes as much as
# possible (#10152).
-def normalise_whitespace( str ):
+def normalise_whitespace(s: str) -> str:
# Merge contiguous whitespace characters into a single space.
- return ' '.join(str.split())
+ return ' '.join(s.split())
callSite_re = re.compile(r', called at (.+):[\d]+:[\d]+ in [\w\-\.]+:')
-def normalise_callstacks(s):
+def normalise_callstacks(s: str) -> str:
opts = getTestOpts()
def repl(matches):
location = matches.group(1)
@@ -1851,72 +1887,72 @@ def normalise_callstacks(s):
tyCon_re = re.compile(r'TyCon\s*\d+L?\#\#\s*\d+L?\#\#\s*', flags=re.MULTILINE)
-def normalise_type_reps(str):
+def normalise_type_reps(s: str) -> str:
""" Normalise out fingerprints from Typeable TyCon representations """
- return re.sub(tyCon_re, 'TyCon FINGERPRINT FINGERPRINT ', str)
+ return re.sub(tyCon_re, 'TyCon FINGERPRINT FINGERPRINT ', s)
-def normalise_errmsg( str ):
+def normalise_errmsg(s: str) -> str:
"""Normalise error-messages emitted via stderr"""
# IBM AIX's `ld` is a bit chatty
if opsys('aix'):
- str = str.replace('ld: 0706-027 The -x flag is ignored.\n', '')
+ s = s.replace('ld: 0706-027 The -x flag is ignored.\n', '')
# remove " error:" and lower-case " Warning:" to make patch for
# trac issue #10021 smaller
- str = modify_lines(str, lambda l: re.sub(' error:', '', l))
- str = modify_lines(str, lambda l: re.sub(' Warning:', ' warning:', l))
- str = normalise_callstacks(str)
- str = normalise_type_reps(str)
+ s = modify_lines(s, lambda l: re.sub(' error:', '', l))
+ s = modify_lines(s, lambda l: re.sub(' Warning:', ' warning:', l))
+ s = normalise_callstacks(s)
+ s = normalise_type_reps(s)
# If somefile ends in ".exe" or ".exe:", zap ".exe" (for Windows)
# the colon is there because it appears in error messages; this
# hacky solution is used in place of more sophisticated filename
# mangling
- str = re.sub('([^\\s])\\.exe', '\\1', str)
+ s = re.sub('([^\\s])\\.exe', '\\1', s)
# normalise slashes, minimise Windows/Unix filename differences
- str = re.sub('\\\\', '/', str)
+ s = re.sub('\\\\', '/', s)
# The inplace ghc's are called ghc-stage[123] to avoid filename
# collisions, so we need to normalise that to just "ghc"
- str = re.sub('ghc-stage[123]', 'ghc', str)
+ s = re.sub('ghc-stage[123]', 'ghc', s)
# Error messages sometimes contain integer implementation package
- str = re.sub('integer-(gmp|simple)-[0-9.]+', 'integer-<IMPL>-<VERSION>', str)
+ s = re.sub('integer-(gmp|simple)-[0-9.]+', 'integer-<IMPL>-<VERSION>', s)
# Error messages sometimes contain this blurb which can vary
# spuriously depending upon build configuration (e.g. based on integer
# backend)
- str = re.sub('...plus ([a-z]+|[0-9]+) instances involving out-of-scope types',
- '...plus N instances involving out-of-scope types', str)
+ s = re.sub('...plus ([a-z]+|[0-9]+) instances involving out-of-scope types',
+ '...plus N instances involving out-of-scope types', s)
# Also filter out bullet characters. This is because bullets are used to
# separate error sections, and tests shouldn't be sensitive to how the
# the division happens.
- bullet = '•'.encode('utf8') if isinstance(str, bytes) else '•'
- str = str.replace(bullet, '')
+ bullet = '•'.encode('utf8') if isinstance(s, bytes) else '•'
+ s = s.replace(bullet, '')
# Windows only, this is a bug in hsc2hs but it is preventing
# stable output for the testsuite. See #9775. For now we filter out this
# warning message to get clean output.
if config.msys:
- str = re.sub('Failed to remove file (.*); error= (.*)$', '', str)
- str = re.sub('DeleteFile "(.+)": permission denied \(Access is denied\.\)(.*)$', '', str)
+ s = re.sub('Failed to remove file (.*); error= (.*)$', '', s)
+ s = re.sub('DeleteFile "(.+)": permission denied \(Access is denied\.\)(.*)$', '', s)
- return str
+ return s
# normalise a .prof file, so that we can reasonably compare it against
# a sample. This doesn't compare any of the actual profiling data,
# only the shape of the profile and the number of entries.
-def normalise_prof (str):
- # strip everything up to the line beginning "COST CENTRE"
- str = re.sub('^(.*\n)*COST CENTRE[^\n]*\n','',str)
+def normalise_prof (s: str) -> str:
+ # sip everything up to the line beginning "COST CENTRE"
+ s = re.sub('^(.*\n)*COST CENTRE[^\n]*\n','',s)
- # strip results for CAFs, these tend to change unpredictably
- str = re.sub('[ \t]*(CAF|IDLE).*\n','',str)
+ # sip results for CAFs, these tend to change unpredictably
+ s = re.sub('[ \t]*(CAF|IDLE).*\n','',s)
# XXX Ignore Main.main. Sometimes this appears under CAF, and
# sometimes under MAIN.
- str = re.sub('[ \t]*main[ \t]+Main.*\n','',str)
+ s = re.sub('[ \t]*main[ \t]+Main.*\n','',s)
# We have something like this:
#
@@ -1949,72 +1985,76 @@ def normalise_prof (str):
# Split 9 whitespace-separated groups, take columns 1 (cost-centre), 2
# (module), 3 (src), and 5 (entries). SCC names can't have whitespace, so
# this works fine.
- str = re.sub(r'\s*(\S+)\s*(\S+)\s*(\S+)\s*(\S+)\s*(\S+)\s*(\S+)\s*(\S+)\s*(\S+)\s*(\S+)\s*',
- '\\1 \\2 \\3 \\5\n', str)
- return str
+ s = re.sub(r'\s*(\S+)\s*(\S+)\s*(\S+)\s*(\S+)\s*(\S+)\s*(\S+)\s*(\S+)\s*(\S+)\s*(\S+)\s*',
+ '\\1 \\2 \\3 \\5\n', s)
+ return s
-def normalise_slashes_( str ):
- str = re.sub('\\\\', '/', str)
- str = re.sub('//', '/', str)
- return str
+def normalise_slashes_( s: str ) -> str:
+ s = re.sub('\\\\', '/', s)
+ s = re.sub('//', '/', s)
+ return s
-def normalise_exe_( str ):
- str = re.sub('\.exe', '', str)
- return str
+def normalise_exe_( s: str ) -> str:
+ s = re.sub('\.exe', '', s)
+ return s
-def normalise_output( str ):
+def normalise_output( s: str ) -> str:
# remove " error:" and lower-case " Warning:" to make patch for
# trac issue #10021 smaller
- str = modify_lines(str, lambda l: re.sub(' error:', '', l))
- str = modify_lines(str, lambda l: re.sub(' Warning:', ' warning:', l))
+ s = modify_lines(s, lambda l: re.sub(' error:', '', l))
+ s = modify_lines(s, lambda l: re.sub(' Warning:', ' warning:', l))
# Remove a .exe extension (for Windows)
# This can occur in error messages generated by the program.
- str = re.sub('([^\\s])\\.exe', '\\1', str)
- str = normalise_callstacks(str)
- str = normalise_type_reps(str)
- return str
-
-def normalise_asm( str ):
- lines = str.split('\n')
- # Only keep instructions and labels not starting with a dot.
+ s = re.sub('([^\\s])\\.exe', '\\1', s)
+ s = normalise_callstacks(s)
+ s = normalise_type_reps(s)
+ return s
+
+def normalise_asm( s: str ) -> str:
+ lines = s.split('\n')
+ # Only keep insuctions and labels not starting with a dot.
metadata = re.compile('^[ \t]*\\..*$')
out = []
for line in lines:
# Drop metadata directives (e.g. ".type")
if not metadata.match(line):
line = re.sub('@plt', '', line)
- instr = line.lstrip().split()
+ ins = line.lstrip().split()
# Drop empty lines.
- if not instr:
+ if not ins:
continue
- # Drop operands, except for call instructions.
- elif instr[0] == 'call':
- out.append(instr[0] + ' ' + instr[1])
+ # Drop operands, except for call insuctions.
+ elif ins[0] == 'call':
+ out.append(ins[0] + ' ' + ins[1])
else:
- out.append(instr[0])
- out = '\n'.join(out)
- return out
+ out.append(ins[0])
+ return '\n'.join(out)
-def if_verbose( n, s ):
+def if_verbose( n: int, s: str ) -> None:
if config.verbose >= n:
print(s)
-def dump_file(f):
+def dump_file(f: Path):
try:
- with io.open(f) as file:
+ with f.open() as file:
print(file.read())
except Exception:
print('')
-def runCmd(cmd, stdin=None, stdout=None, stderr=None, timeout_multiplier=1.0, print_output=False):
+def runCmd(cmd: str,
+ stdin: Union[None, Path]=None,
+ stdout: Union[None, Path]=None,
+ stderr: Union[None, int, Path]=None,
+ timeout_multiplier=1.0,
+ print_output=False) -> int:
timeout_prog = strip_quotes(config.timeout_prog)
timeout = str(int(ceil(config.timeout * timeout_multiplier)))
# Format cmd using config. Example: cmd='{hpc} report A.tix'
cmd = cmd.format(**config.__dict__)
- if_verbose(3, cmd + ('< ' + os.path.basename(stdin) if stdin else ''))
+ if_verbose(3, '%s< %s' % (cmd, stdin.name if isinstance(stdin, Path) else ''))
- stdin_file = io.open(stdin, 'rb') if stdin else None
+ stdin_file = stdin.open('rb') if stdin is not None else None
stdout_buffer = b''
stderr_buffer = b''
@@ -2044,13 +2084,15 @@ def runCmd(cmd, stdin=None, stdout=None, stderr=None, timeout_multiplier=1.0, pr
if stderr_buffer:
sys.stderr.buffer.write(stderr_buffer)
- if stdout:
- with io.open(stdout, 'wb') as f:
- f.write(stdout_buffer)
- if stderr:
- if stderr is not subprocess.STDOUT:
- with io.open(stderr, 'wb') as f:
- f.write(stderr_buffer)
+ if stdout is not None:
+ if isinstance(stdout, Path):
+ stdout.write_bytes(stdout_buffer)
+ else:
+ with io.open(stdout, 'wb') as f:
+ f.write(stdout_buffer)
+ if stderr is not None:
+ if isinstance(stderr, Path):
+ stderr.write_bytes(stderr_buffer)
if r.returncode == 98:
# The python timeout program uses 98 to signal that ^C was pressed
@@ -2063,10 +2105,10 @@ def runCmd(cmd, stdin=None, stdout=None, stderr=None, timeout_multiplier=1.0, pr
# -----------------------------------------------------------------------------
# checking if ghostscript is available for checking the output of hp2ps
-def genGSCmd(psfile):
+def genGSCmd(psfile: Path) -> str:
return '{{gs}} -dNODISPLAY -dBATCH -dQUIET -dNOPAUSE "{0}"'.format(psfile)
-def gsNotWorking():
+def gsNotWorking() -> None:
global gs_working
print("GhostScript not available for hp2ps tests")
@@ -2088,13 +2130,13 @@ if config.have_profiling:
else:
gsNotWorking();
-def add_suffix( name, suffix ):
+def add_suffix( name: Union[str, Path], suffix: str ) -> Path:
if suffix == '':
- return name
+ return Path(name)
else:
- return name + '.' + suffix
+ return Path(str(name) + '.' + suffix)
-def add_hs_lhs_suffix(name):
+def add_hs_lhs_suffix(name: str) -> Path:
if getTestOpts().c_src:
return add_suffix(name, 'c')
elif getTestOpts().cmm_src:
@@ -2108,51 +2150,50 @@ def add_hs_lhs_suffix(name):
else:
return add_suffix(name, 'hs')
-def replace_suffix( name, suffix ):
- base, suf = os.path.splitext(name)
- return base + '.' + suffix
+def in_testdir(name: Union[Path, str], suffix: str='') -> Path:
+ return getTestOpts().testdir / add_suffix(name, suffix)
-def in_testdir(name, suffix=''):
- return os.path.join(getTestOpts().testdir, add_suffix(name, suffix))
+def in_srcdir(name: Union[Path, str], suffix: str='') -> Path:
+ return getTestOpts().srcdir / add_suffix(name, suffix)
-def in_srcdir(name, suffix=''):
- return os.path.join(getTestOpts().srcdir, add_suffix(name, suffix))
-
-def in_statsdir(name, suffix=''):
- return os.path.join(config.stats_files_dir, add_suffix(name, suffix))
+def in_statsdir(name: Union[Path, str], suffix: str='') -> Path:
+ dir = config.stats_files_dir
+ if dir is None:
+ raise TypeError('stats_files_dir is not set')
+ return dir / add_suffix(name, suffix)
# Finding the sample output. The filename is of the form
#
# <test>.stdout[-ws-<wordsize>][-<platform>|-<os>]
#
-def find_expected_file(name, suff):
+def find_expected_file(name: TestName, suff: str) -> Path:
basename = add_suffix(name, suff)
# Override the basename if the user has specified one, this will then be
# subjected to the same name mangling scheme as normal to allow platform
# specific overrides to work.
basename = getTestOpts().use_specs.get (suff, basename)
- files = [basename + ws + plat
+ files = [str(basename) + ws + plat
for plat in ['-' + config.platform, '-' + config.os, '']
for ws in ['-ws-' + config.wordsize, '']]
for f in files:
- if os.path.exists(in_srcdir(f)):
+ if in_srcdir(f).exists():
return f
return basename
if config.msys:
import stat
- def cleanup():
- testdir = getTestOpts().testdir
+ def cleanup() -> None:
+ testdir = getTestOpts().testdir # type: Path
max_attempts = 5
retries = max_attempts
def on_error(function, path, excinfo):
# At least one test (T11489) removes the write bit from a file it
# produces. Windows refuses to delete read-only files with a
# permission error. Try setting the write bit and try again.
- os.chmod(path, stat.S_IWRITE)
+ path.chmod(stat.S_IWRITE)
function(path)
# On Windows we have to retry the delete a couple of times.
@@ -2172,28 +2213,28 @@ if config.msys:
#
# See #13162
exception = None
- while retries > 0 and os.path.exists(testdir):
+ while retries > 0 and testdir.exists():
time.sleep((max_attempts-retries)*6)
try:
- shutil.rmtree(testdir, onerror=on_error, ignore_errors=False)
+ shutil.rmtree(str(testdir), onerror=on_error, ignore_errors=False)
except Exception as e:
exception = e
retries -= 1
- if retries == 0 and os.path.exists(testdir):
+ if retries == 0 and testdir.exists():
raise Exception("Unable to remove folder '%s': %s\nUnable to start current test."
% (testdir, exception))
else:
- def cleanup():
+ def cleanup() -> None:
testdir = getTestOpts().testdir
- if os.path.exists(testdir):
- shutil.rmtree(testdir, ignore_errors=False)
+ if testdir.exists():
+ shutil.rmtree(str(testdir), ignore_errors=False)
# -----------------------------------------------------------------------------
# Return a list of all the files ending in '.T' below directories roots.
-def findTFiles(roots):
+def findTFiles(roots: List[str]) -> Iterator[str]:
for root in roots:
for path, dirs, files in os.walk(root, topdown=True):
# Never pick up .T files in uncleaned .run directories.
@@ -2206,7 +2247,7 @@ def findTFiles(roots):
# -----------------------------------------------------------------------------
# Output a test summary to the specified file object
-def summary(t, file, short=False, color=False):
+def summary(t: TestRun, file: TextIO, short=False, color=False) -> None:
file.write('\n')
printUnexpectedTests(file,
@@ -2281,7 +2322,7 @@ def summary(t, file, short=False, color=False):
if stopping():
file.write('WARNING: Testsuite run was terminated early\n')
-def printUnexpectedTests(file, testInfoss):
+def printUnexpectedTests(file: TextIO, testInfoss):
unexpected = set(result.testname
for testInfos in testInfoss
for result in testInfos
@@ -2291,7 +2332,7 @@ def printUnexpectedTests(file, testInfoss):
file.write('TEST="' + ' '.join(sorted(unexpected)) + '"\n')
file.write('\n')
-def printTestInfosSummary(file, testInfos):
+def printTestInfosSummary(file: TextIO, testInfos):
maxDirLen = max(len(tr.directory) for tr in testInfos)
for result in sorted(testInfos, key=lambda r: (r.testname.lower(), r.way, r.directory)):
directory = result.directory.ljust(maxDirLen)
@@ -2300,7 +2341,7 @@ def printTestInfosSummary(file, testInfos):
directory = directory))
file.write('\n')
-def modify_lines(s, f):
+def modify_lines(s: str, f: Callable[[str], str]) -> str:
s = '\n'.join([f(l) for l in s.splitlines()])
if s and s[-1] != '\n':
# Prevent '\ No newline at end of file' warnings when diffing.