1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
|
#
# Copyright (c) 2010 Testrepository Contributors
#
# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
# license at the users choice. A copy of both licenses are available in the
# project source as Apache-2.0 and BSD. You may not use this file except in
# compliance with one of these two licences.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# license you chose for the specific language governing permissions and
# limitations under that license.
"""The test command that test repository knows how to run."""
from extras import (
try_import,
try_imports,
)
from collections import defaultdict
ConfigParser = try_imports(['ConfigParser', 'configparser'])
import io
import itertools
import operator
import os.path
import re
import subprocess
import sys
import tempfile
import multiprocessing
from textwrap import dedent
from fixtures import Fixture
v2 = try_import('subunit.v2')
from testrepository import results
from testrepository.testlist import (
parse_enumeration,
write_list,
)
testrconf_help = dedent("""
Configuring via .testr.conf:
---
[DEFAULT]
test_command=foo $IDOPTION
test_id_option=--bar $IDFILE
---
will cause 'testr run' to run 'foo' to execute tests, and
'testr run --failing' will cause 'foo --bar failing.list ' to be run to
execute tests. Shell variables are expanded in these commands on platforms
that have a shell.
The full list of options and variables for .testr.conf:
* filter_tags -- a list of tags which should be used to filter test counts.
This is useful for stripping out non-test results from the subunit stream
such as Zope test layers. These filtered items are still considered for
test failures.
* test_command -- command line to run to execute tests.
* test_id_option -- the value to substitute into test_command when specific
test ids should be run.
* test_id_list_default -- the value to use for $IDLIST when no specific
test ids are being run.
* test_list_option -- the option to use to cause the test runner to report
on the tests it would run, rather than running them. When supplied the
test_command should output on stdout all the test ids that would have
been run if every other option and argument was honoured, one per line.
This is required for parallel testing, and is substituted into $LISTOPT.
* test_run_concurrency -- Optional call out to establish concurrency.
Should return one line containing the number of concurrent test runner
processes to run.
* instance_provision -- provision one or more test run environments.
Accepts $INSTANCE_COUNT for the number of instances desired.
* instance_execute -- execute a test runner process in a given environment.
Accepts $INSTANCE_ID, $FILES and $COMMAND. Paths in $FILES should be
synchronised into the test runner environment filesystem. $COMMAND can
be adjusted if the paths are synched with different names.
* instance_dispose -- dispose of one or more test running environments.
Accepts $INSTANCE_IDS.
* group_regex -- If set group tests by the matched section of the test id.
* $IDOPTION -- the variable to use to trigger running some specific tests.
* $IDFILE -- A file created before the test command is run and deleted
afterwards which contains a list of test ids, one per line. This can
handle test ids with emedded whitespace.
* $IDLIST -- A list of the test ids to run, separated by spaces. IDLIST
defaults to an empty string when no test ids are known and no explicit
default is provided. This will not handle test ids with spaces.
See the testrepository manual for example .testr.conf files in different
programming languages.
""")
class CallWhenProcFinishes(object):
"""Convert a process object to trigger a callback when returncode is set.
This just wraps the entire object and when the returncode attribute access
finds a set value, calls the callback.
"""
def __init__(self, process, callback):
"""Adapt process
:param process: A subprocess.Popen object.
:param callback: The process to call when the process completes.
"""
self._proc = process
self._callback = callback
self._done = False
@property
def stdin(self):
return self._proc.stdin
@property
def stdout(self):
return self._proc.stdout
@property
def stderr(self):
return self._proc.stderr
@property
def returncode(self):
result = self._proc.returncode
if not self._done and result is not None:
self._done = True
self._callback()
return result
def wait(self):
return self._proc.wait()
compiled_re_type = type(re.compile(''))
class TestListingFixture(Fixture):
"""Write a temporary file to disk with test ids in it."""
def __init__(self, test_ids, cmd_template, listopt, idoption, ui,
repository, parallel=True, listpath=None, parser=None,
test_filters=None, instance_source=None, group_callback=None):
"""Create a TestListingFixture.
:param test_ids: The test_ids to use. May be None indicating that
no ids are known and they should be discovered by listing or
configuration if they must be known to run tests. Test ids are
needed to run tests when filtering or partitioning is needed: if
the run concurrency is > 1 partitioning is needed, and filtering is
needed if the user has passed in filters.
:param cmd_template: string to be filled out with
IDFILE.
:param listopt: Option to substitute into LISTOPT to cause test listing
to take place.
:param idoption: Option to substitutde into cmd when supplying any test
ids.
:param ui: The UI in use.
:param repository: The repository to query for test times, if needed.
:param parallel: If not True, prohibit parallel use : used to implement
--parallel run recursively.
:param listpath: The file listing path to use. If None, a unique path
is created.
:param parser: An options parser for reading options from.
:param test_filters: An optional list of test filters to apply. Each
filter should be a string suitable for passing to re.compile.
filters are applied using search() rather than match(), so if
anchoring is needed it should be included in the regex.
The test ids used for executing are the union of all the individual
filters: to take the intersection instead, craft a single regex that
matches all your criteria. Filters are automatically applied by
run_tests(), or can be applied by calling filter_tests(test_ids).
:param instance_source: A source of test run instances. Must support
obtain_instance(max_concurrency) -> id and release_instance(id)
calls.
:param group_callback: If supplied, should be a function that accepts a
test id and returns a group id. A group id is an arbitrary value
used as a dictionary key in the scheduler. All test ids with the
same group id are scheduled onto the same backend test process.
"""
self.test_ids = test_ids
self.template = cmd_template
self.listopt = listopt
self.idoption = idoption
self.ui = ui
self.repository = repository
self.parallel = parallel
self._listpath = listpath
self._parser = parser
self.test_filters = test_filters
self._group_callback = group_callback
self._instance_source = instance_source
def setUp(self):
super(TestListingFixture, self).setUp()
variable_regex = '\$(IDOPTION|IDFILE|IDLIST|LISTOPT)'
variables = {}
list_variables = {'LISTOPT': self.listopt}
cmd = self.template
try:
default_idstr = self._parser.get('DEFAULT', 'test_id_list_default')
list_variables['IDLIST'] = default_idstr
# In theory we should also support casting this into IDFILE etc -
# needs this horrible class refactored.
except ConfigParser.NoOptionError as e:
if e.message != "No option 'test_id_list_default' in section: 'DEFAULT'":
raise
default_idstr = None
def list_subst(match):
return list_variables.get(match.groups(1)[0], '')
self.list_cmd = re.sub(variable_regex, list_subst, cmd)
nonparallel = (not self.parallel or not
getattr(self.ui, 'options', None) or not
getattr(self.ui.options, 'parallel', None))
if nonparallel:
self.concurrency = 1
else:
self.concurrency = self.ui.options.concurrency
if not self.concurrency:
self.concurrency = self.callout_concurrency()
if not self.concurrency:
self.concurrency = self.local_concurrency()
if not self.concurrency:
self.concurrency = 1
if self.test_ids is None:
if self.concurrency == 1:
if default_idstr:
self.test_ids = default_idstr.split()
if self.concurrency != 1 or self.test_filters is not None:
# Have to be able to tell each worker what to run / filter
# tests.
self.test_ids = self.list_tests()
if self.test_ids is None:
# No test ids to supply to the program.
self.list_file_name = None
name = ''
idlist = ''
else:
self.test_ids = self.filter_tests(self.test_ids)
name = self.make_listfile()
variables['IDFILE'] = name
idlist = ' '.join(self.test_ids)
variables['IDLIST'] = idlist
def subst(match):
return variables.get(match.groups(1)[0], '')
if self.test_ids is None:
# No test ids, no id option.
idoption = ''
else:
idoption = re.sub(variable_regex, subst, self.idoption)
variables['IDOPTION'] = idoption
self.cmd = re.sub(variable_regex, subst, cmd)
def make_listfile(self):
name = None
try:
if self._listpath:
name = self._listpath
stream = open(name, 'wb')
else:
fd, name = tempfile.mkstemp()
stream = os.fdopen(fd, 'wb')
self.list_file_name = name
write_list(stream, self.test_ids)
stream.close()
except:
if name:
os.unlink(name)
raise
self.addCleanup(os.unlink, name)
return name
def filter_tests(self, test_ids):
"""Filter test_ids by the test_filters.
:return: A list of test ids.
"""
if self.test_filters is None:
return test_ids
filters = map(re.compile, self.test_filters)
def include(test_id):
for pred in filters:
if pred.search(test_id):
return True
return list(filter(include, test_ids))
def list_tests(self):
"""List the tests returned by list_cmd.
:return: A list of test ids.
"""
if '$LISTOPT' not in self.template:
raise ValueError("LISTOPT not configured in .testr.conf")
instance, list_cmd = self._per_instance_command(self.list_cmd)
try:
self.ui.output_values([('running', list_cmd)])
run_proc = self.ui.subprocess_Popen(list_cmd, shell=True,
stdout=subprocess.PIPE, stdin=subprocess.PIPE)
out, err = run_proc.communicate()
if run_proc.returncode != 0:
if v2 is not None:
new_out = io.BytesIO()
v2.ByteStreamToStreamResult(io.BytesIO(out), 'stdout').run(
results.CatFiles(new_out))
out = new_out.getvalue()
self.ui.output_stream(io.BytesIO(out))
self.ui.output_stream(io.BytesIO(err))
raise ValueError(
"Non-zero exit code (%d) from test listing."
% (run_proc.returncode))
ids = parse_enumeration(out)
return ids
finally:
if instance:
self._instance_source.release_instance(instance)
def _per_instance_command(self, cmd):
"""Customise cmd to with an instance-id.
:param concurrency: The number of instances to ask for (used to avoid
death-by-1000 cuts of latency.
"""
if self._instance_source is None:
return None, cmd
instance = self._instance_source.obtain_instance(self.concurrency)
if instance is not None:
try:
instance_prefix = self._parser.get(
'DEFAULT', 'instance_execute')
variables = {
'INSTANCE_ID': instance.decode('utf8'),
'COMMAND': cmd,
# --list-tests cannot use FILES, so handle it being unset.
'FILES': getattr(self, 'list_file_name', None) or '',
}
variable_regex = '\$(INSTANCE_ID|COMMAND|FILES)'
def subst(match):
return variables.get(match.groups(1)[0], '')
cmd = re.sub(variable_regex, subst, instance_prefix)
except ConfigParser.NoOptionError:
# Per-instance execution environment not configured.
pass
return instance, cmd
def run_tests(self):
"""Run the tests defined by the command and ui.
:return: A list of spawned processes.
"""
result = []
test_ids = self.test_ids
if self.concurrency == 1 and (test_ids is None or test_ids):
# Have to customise cmd here, as instances are allocated
# just-in-time. XXX: Indicates this whole region needs refactoring.
instance, cmd = self._per_instance_command(self.cmd)
self.ui.output_values([('running', cmd)])
run_proc = self.ui.subprocess_Popen(cmd, shell=True,
stdout=subprocess.PIPE, stdin=subprocess.PIPE)
# Prevent processes stalling if they read from stdin; we could
# pass this through in future, but there is no point doing that
# until we have a working can-run-debugger-inline story.
run_proc.stdin.close()
if instance:
return [CallWhenProcFinishes(run_proc,
lambda:self._instance_source.release_instance(instance))]
else:
return [run_proc]
test_id_groups = self.partition_tests(test_ids, self.concurrency)
for test_ids in test_id_groups:
if not test_ids:
# No tests in this partition
continue
fixture = self.useFixture(TestListingFixture(test_ids,
self.template, self.listopt, self.idoption, self.ui,
self.repository, parallel=False, parser=self._parser,
instance_source=self._instance_source))
result.extend(fixture.run_tests())
return result
def partition_tests(self, test_ids, concurrency):
"""Parition test_ids by concurrency.
Test durations from the repository are used to get partitions which
have roughly the same expected runtime. New tests - those with no
recorded duration - are allocated in round-robin fashion to the
partitions created using test durations.
:return: A list where each element is a distinct subset of test_ids,
and the union of all the elements is equal to set(test_ids).
"""
partitions = [list() for i in range(concurrency)]
timed_partitions = [[0.0, partition] for partition in partitions]
time_data = self.repository.get_test_times(test_ids)
timed_tests = time_data['known']
unknown_tests = time_data['unknown']
# Group tests: generate group_id -> test_ids.
group_ids = defaultdict(list)
if self._group_callback is None:
group_callback = lambda _:None
else:
group_callback = self._group_callback
for test_id in test_ids:
group_id = group_callback(test_id) or test_id
group_ids[group_id].append(test_id)
# Time groups: generate three sets of groups:
# - fully timed dict(group_id -> time),
# - partially timed dict(group_id -> time) and
# - unknown (set of group_id)
# We may in future treat partially timed different for scheduling, but
# at least today we just schedule them after the fully timed groups.
timed = {}
partial = {}
unknown = []
for group_id, group_tests in group_ids.items():
untimed_ids = unknown_tests.intersection(group_tests)
group_time = sum([timed_tests[test_id]
for test_id in untimed_ids.symmetric_difference(group_tests)])
if not untimed_ids:
timed[group_id] = group_time
elif group_time:
partial[group_id] = group_time
else:
unknown.append(group_id)
# Scheduling is NP complete in general, so we avoid aiming for
# perfection. A quick approximation that is sufficient for our general
# needs:
# sort the groups by time
# allocate to partitions by putting each group in to the partition with
# the current (lowest time, shortest length[in tests])
def consume_queue(groups):
queue = sorted(
groups.items(), key=operator.itemgetter(1), reverse=True)
for group_id, duration in queue:
timed_partitions[0][0] = timed_partitions[0][0] + duration
timed_partitions[0][1].extend(group_ids[group_id])
timed_partitions.sort(key=lambda item:(item[0], len(item[1])))
consume_queue(timed)
consume_queue(partial)
# Assign groups with entirely unknown times in round robin fashion to
# the partitions.
for partition, group_id in zip(itertools.cycle(partitions), unknown):
partition.extend(group_ids[group_id])
return partitions
def callout_concurrency(self):
"""Callout for user defined concurrency."""
try:
concurrency_cmd = self._parser.get(
'DEFAULT', 'test_run_concurrency')
except ConfigParser.NoOptionError:
return None
run_proc = self.ui.subprocess_Popen(concurrency_cmd, shell=True,
stdout=subprocess.PIPE, stdin=subprocess.PIPE)
out, err = run_proc.communicate()
if run_proc.returncode:
raise ValueError(
"test_run_concurrency failed: exit code %d, stderr='%s'" % (
run_proc.returncode, err.decode('utf8', 'replace')))
return int(out.strip())
def local_concurrency(self):
try:
return multiprocessing.cpu_count()
except NotImplementedError:
# No concurrency logic known.
return None
class TestCommand(Fixture):
"""Represents the test command defined in .testr.conf.
:ivar run_factory: The fixture to use to execute a command.
:ivar oldschool: Use failing.list rather than a unique file path.
TestCommand is a Fixture. Many uses of it will not require it to be setUp,
but calling get_run_command does require it: the fixture state is used to
track test environment instances, which are disposed of when cleanUp
happens. This is not done per-run-command, because test bisection (amongst
other things) uses multiple get_run_command configurations.
"""
run_factory = TestListingFixture
oldschool = False
def __init__(self, ui, repository):
"""Create a TestCommand.
:param ui: A testrepository.ui.UI object which is used to obtain the
location of the .testr.conf.
:param repository: A testrepository.repository.Repository used for
determining test times when partitioning tests.
"""
super(TestCommand, self).__init__()
self.ui = ui
self.repository = repository
self._instances = None
self._allocated_instances = None
def setUp(self):
super(TestCommand, self).setUp()
self._instances = set()
self._allocated_instances = set()
self.addCleanup(self._dispose_instances)
def _dispose_instances(self):
instances = self._instances
if instances is None:
return
self._instances = None
self._allocated_instances = None
try:
dispose_cmd = self.get_parser().get('DEFAULT', 'instance_dispose')
except (ValueError, ConfigParser.NoOptionError):
return
variable_regex = '\$INSTANCE_IDS'
dispose_cmd = re.sub(variable_regex, ' '.join(sorted(instance.decode('utf') for instance in instances)),
dispose_cmd)
self.ui.output_values([('running', dispose_cmd)])
run_proc = self.ui.subprocess_Popen(dispose_cmd, shell=True)
run_proc.communicate()
if run_proc.returncode:
raise ValueError('Disposing of instances failed, return %d' %
run_proc.returncode)
def get_parser(self):
"""Get a parser with the .testr.conf in it."""
parser = ConfigParser.ConfigParser()
# This possibly should push down into UI.
if self.ui.here == 'memory:':
return parser
if not parser.read(os.path.join(self.ui.here, '.testr.conf')):
raise ValueError("No .testr.conf config file")
return parser
def get_run_command(self, test_ids=None, testargs=(), test_filters=None):
"""Get the command that would be run to run tests.
See TestListingFixture for the definition of test_ids and test_filters.
"""
if self._instances is None:
raise TypeError('TestCommand not setUp')
parser = self.get_parser()
try:
command = parser.get('DEFAULT', 'test_command')
except ConfigParser.NoOptionError as e:
if e.message != "No option 'test_command' in section: 'DEFAULT'":
raise
raise ValueError("No test_command option present in .testr.conf")
elements = [command] + list(testargs)
cmd = ' '.join(elements)
idoption = ''
if '$IDOPTION' in command:
# IDOPTION is used, we must have it configured.
try:
idoption = parser.get('DEFAULT', 'test_id_option')
except ConfigParser.NoOptionError as e:
if e.message != "No option 'test_id_option' in section: 'DEFAULT'":
raise
raise ValueError("No test_id_option option present in .testr.conf")
listopt = ''
if '$LISTOPT' in command:
# LISTOPT is used, test_list_option must be configured.
try:
listopt = parser.get('DEFAULT', 'test_list_option')
except ConfigParser.NoOptionError as e:
if e.message != "No option 'test_list_option' in section: 'DEFAULT'":
raise
raise ValueError("No test_list_option option present in .testr.conf")
try:
group_regex = parser.get('DEFAULT', 'group_regex')
except ConfigParser.NoOptionError:
group_regex = None
if group_regex:
def group_callback(test_id, regex=re.compile(group_regex)):
match = regex.match(test_id)
if match:
return match.group(0)
else:
group_callback = None
if self.oldschool:
listpath = os.path.join(self.ui.here, 'failing.list')
result = self.run_factory(test_ids, cmd, listopt, idoption,
self.ui, self.repository, listpath=listpath, parser=parser,
test_filters=test_filters, instance_source=self,
group_callback=group_callback)
else:
result = self.run_factory(test_ids, cmd, listopt, idoption,
self.ui, self.repository, parser=parser,
test_filters=test_filters, instance_source=self,
group_callback=group_callback)
return result
def get_filter_tags(self):
parser = self.get_parser()
try:
tags = parser.get('DEFAULT', 'filter_tags')
except ConfigParser.NoOptionError as e:
if e.message != "No option 'filter_tags' in section: 'DEFAULT'":
raise
return set()
return set([tag.strip() for tag in tags.split()])
def obtain_instance(self, concurrency):
"""If possible, get one or more test run environment instance ids.
Note this is not threadsafe: calling it from multiple threads would
likely result in shared results.
"""
while len(self._instances) < concurrency:
try:
cmd = self.get_parser().get('DEFAULT', 'instance_provision')
except ConfigParser.NoOptionError:
# Instance allocation not configured
return None
variable_regex = '\$INSTANCE_COUNT'
cmd = re.sub(variable_regex,
str(concurrency - len(self._instances)), cmd)
self.ui.output_values([('running', cmd)])
proc = self.ui.subprocess_Popen(
cmd, shell=True, stdout=subprocess.PIPE)
out, _ = proc.communicate()
if proc.returncode:
raise ValueError('Provisioning instances failed, return %d' %
proc.returncode)
new_instances = set([item.strip() for item in out.split()])
self._instances.update(new_instances)
# Cached first.
available_instances = self._instances - self._allocated_instances
# We only ask for instances when one should be available.
result = available_instances.pop()
self._allocated_instances.add(result)
return result
def release_instance(self, instance_id):
"""Return instance_ids to the pool for reuse."""
self._allocated_instances.remove(instance_id)
|