1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
|
import os
import platform
import subprocess
import shutil
from pathlib import PurePath, Path
import threading
from my_typing import *
# Workaround for #17483. msys2 Python's wrapper for stat (called by
# Path.exists()) appears to fail randomly with errno=0. Work around this by
# retrying 10 times in the event that this happens.
def _retry_stat_on_errno_zero(stat_func, attempts: int = 10):
    """Wrap stat_func so that failures reporting errno == 0 are retried.

    Parameters:
        stat_func: the original (unbound) stat implementation to delegate to.
        attempts: how many times stat_func is tried before giving up; values
            below 1 are treated as 1.

    Returns:
        A replacement function with the same calling convention as
        stat_func. The wrapped function stays reachable through the
        replacement's ``stat`` attribute.

    The replacement raises OSError immediately when stat_func fails with a
    non-zero errno, and re-raises the last errno == 0 failure once every
    attempt has been used up (instead of silently returning None).
    """
    def new_stat(self, *args, **kwargs):
        # Forward extra arguments untouched: newer Python versions let callers
        # pass keyword arguments such as follow_symlinks to Path.stat.
        last_error = None
        for _ in range(max(attempts, 1)):
            try:
                return new_stat.stat(self, *args, **kwargs)
            except OSError as e:
                if e.errno != 0:
                    # A genuine failure: do not mask it by retrying.
                    raise
                last_error = e
        # Every attempt failed with errno == 0. Report that rather than
        # returning None, which callers would mistake for a stat result.
        raise last_error

    new_stat.stat = stat_func # type: ignore
    return new_stat


if os.name == 'nt':
    # Oh the horror...
    new_stat = _retry_stat_on_errno_zero(Path.stat)
    Path.stat = new_stat # type: ignore
# Record describing the outcome of a test; see passed() and failBecause()
# for the two constructors defined in this module.
PassFail = NamedTuple(
    'PassFail',
    [
        # 'pass' when built by passed(), 'fail' when built by failBecause().
        ('passFail', str),
        # Explanation of the outcome; passed() leaves it empty.
        ('reason', str),
        # Optional tag supplied by the caller of failBecause().
        ('tag', Optional[str]),
        # Optional captured stderr / stdout text attached to a failure.
        ('stderr', Optional[str]),
        ('stdout', Optional[str]),
        # Optional value handed to passed(); always None for failures.
        ('hc_opts', Optional[str]),
    ],
)
def passed(hc_opts=None) -> PassFail:
    """Build the PassFail record for a passing test.

    hc_opts is stored unchanged in the result. The reason is the empty
    string and tag, stderr and stdout are all None.
    """
    fields = dict(
        passFail='pass',
        reason='',
        tag=None,
        stderr=None,
        stdout=None,
    )
    fields['hc_opts'] = hc_opts
    return PassFail(**fields)
def failBecause(reason: str,
                tag: Optional[str]=None,
                stderr: Optional[str]=None,
                stdout: Optional[str]=None
                ) -> PassFail:
    """Build the PassFail record for a failing test.

    Parameters:
        reason: explanation of why the test failed.
        tag: optional tag stored alongside the reason.
        stderr: optional stderr text to attach to the failure.
        stdout: optional stdout text to attach to the failure.

    Returns:
        A PassFail whose passFail field is 'fail' and whose hc_opts is None.
    """
    # The None defaults are spelled as Optional[...] explicitly; relying on
    # the implicit-Optional shorthand is rejected by current type checkers.
    return PassFail(passFail='fail', reason=reason, tag=tag,
                    stderr=stderr, stdout=stdout, hc_opts=None)
def strip_quotes(s: str) -> str:
    """Return s with any leading and trailing quote characters removed.

    Both single and double quote characters are stripped from each end.
    Commands handed to subprocess.call/Popen must not be wrapped in quotes.
    """
    quote_chars = '\'"'
    return s.strip(quote_chars)
def str_fail(s: str) -> str:
    """Wrap s in the ANSI escapes for bold red text, followed by a reset."""
    start, reset = '\033[1m\033[31m', '\033[0m'
    return ''.join((start, s, reset))
def str_pass(s: str) -> str:
    """Wrap s in the ANSI escapes for bold green text, followed by a reset."""
    start, reset = '\033[1m\033[32m', '\033[0m'
    return ''.join((start, s, reset))
def str_warn(s: str) -> str:
    """Wrap s in the ANSI escapes for bold yellow text, followed by a reset."""
    start, reset = '\033[1m\033[33m', '\033[0m'
    return ''.join((start, s, reset))
def str_info(s: str) -> str:
    """Wrap s in the ANSI escapes for bold blue text, followed by a reset."""
    start, reset = '\033[1m\033[34m', '\033[0m'
    return ''.join((start, s, reset))
def getStdout(cmd_and_args: List[str]):
    """Run a command and return what it wrote to stdout, decoded as UTF-8.

    The first element of cmd_and_args is the program to run (surrounding
    quotes are stripped from it); the remaining elements are passed through
    unchanged as its arguments.

    Raises Exception when the command exits with a non-zero status, or when
    it exits successfully but wrote anything to stderr.
    """
    program = strip_quotes(cmd_and_args[0])
    argv = [program] + cmd_and_args[1:]

    # subprocess.check_output is not used because stderr must be captured as
    # well, so that unexpected output on it can be rejected below.
    proc = subprocess.Popen(argv,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    out, err = proc.communicate()
    exit_code = proc.wait()

    if exit_code != 0:
        raise Exception("Command failed: " + str(cmd_and_args))
    if err:
        raise Exception("stderr from command: %s\nOutput:\n%s\n" % (cmd_and_args, err))
    return out.decode('utf-8')
def lndir(srcdir: Path, dstdir: Path):
    """Mirror the tree below srcdir into the existing directory dstdir.

    Every regular file is reproduced with link_or_copy_file. Any other entry
    is treated as a directory: it is created under dstdir and then mirrored
    recursively. This is done in Python because not all developers might
    have the lndir tool installed.
    """
    for entry in srcdir.iterdir():
        target = dstdir / entry.name
        if not entry.is_file():
            target.mkdir()
            lndir(entry, target)
            continue
        link_or_copy_file(entry, target)
def testing_metrics():
    """Return a new list holding all possible test metric strings."""
    metrics = ('bytes allocated', 'peak_megabytes_allocated', 'max_bytes_used')
    return list(metrics)
# On Windows, os.symlink is not defined with Python 2.7, but is in Python 3
# when using msys2, as GHC does. Unfortunately, only Administrative users have
# the privileges necessary to create symbolic links by default. Consequently we
# are forced to just copy instead.
#
# We define the following function to make this magic more
# explicit/discoverable. You are encouraged to use it instead of os.symlink.
# The choice between linking and copying is made once, when this module is
# imported: symlinks are used everywhere except on Windows, and on Windows
# too when the FORCE_SYMLINKS environment variable is set.
if platform.system() != 'Windows' or os.getenv('FORCE_SYMLINKS') is not None:
    def link_or_copy_file(src: Path, dst: Path):
        """Create dst as a symbolic link pointing at src."""
        os.symlink(str(src), str(dst))
else:
    def link_or_copy_file(src: Path, dst: Path):
        """Create dst as a copy of the contents of src."""
        shutil.copyfile(str(src), str(dst))
class Watcher(object):
    """Countdown that lets callers block until it has been notified enough.

    The watcher starts with a pool of ``count`` outstanding notifications.
    Each notify() call removes one; once the pool reaches zero (or if it
    started at zero or below) wait() stops blocking.
    """

    def __init__(self, count: int) -> None:
        """Create a watcher expecting ``count`` calls to notify()."""
        # Number of notifications still outstanding.
        self.pool = count
        # Set once the pool is exhausted; wait() blocks on it.
        self.evt = threading.Event()
        # Guards the decrement-and-check of self.pool in notify().
        self.sync_lock = threading.Lock()
        if count <= 0:
            # Nothing to wait for: release waiters immediately.
            self.evt.set()

    def wait(self) -> None:
        """Block until the pool of outstanding notifications is exhausted."""
        self.evt.wait()

    def notify(self) -> None:
        """Record one notification, releasing waiters when none remain."""
        # Use the lock as a context manager so that it is released even if
        # the update raises; a manual acquire/release pair would leave the
        # lock held forever in that case.
        with self.sync_lock:
            self.pool -= 1
            if self.pool <= 0:
                self.evt.set()
|