author     Robert Collins <robertc@robertcollins.net>  2012-12-20 01:03:29 +1300
committer  Robert Collins <robertc@robertcollins.net>  2012-12-20 01:03:29 +1300
commit     ae843d2261bc966ff22282990e56b206dfc6e5dc (patch)
tree       8f7354aea2bcdc54729ebac4282b024b4865cc5b
parent     3bf3afd5b96d8e00f03a49b1e4e0f0b3bf553353 (diff)
parent     c913fc7a169c0d3cc0b39f3d96a6ef2935ce48c7 (diff)
download   testrepository-ae843d2261bc966ff22282990e56b206dfc6e5dc.tar.gz
* It's now possible to configure ``test_run_concurrency`` in ``.testr.conf``
  to have concurrency defined by a callout. (Robert Collins)
* Testr supports running tests in arbitrary environments. See ``Remote or
  isolated test environments`` in MANUAL.txt / ``testr help run``
  (Robert Collins)
-rw-r--r--  NEWS                                        21
-rw-r--r--  doc/MANUAL.txt                             106
-rw-r--r--  testrepository/commands/list_tests.py       36
-rw-r--r--  testrepository/commands/run.py             182
-rw-r--r--  testrepository/testcommand.py              217
-rw-r--r--  testrepository/tests/test_testcommand.py   233
6 files changed, 667 insertions, 128 deletions
diff --git a/NEWS b/NEWS
index 74f56e6..dbdc8e8 100644
--- a/NEWS
+++ b/NEWS
@@ -5,6 +5,27 @@ testrepository release notes
NEXT (In development)
+++++++++++++++++++++
+IMPROVEMENTS
+------------
+
+* It's now possible to configure ``test_run_concurrency`` in ``.testr.conf``
+ to have concurrency defined by a callout. (Robert Collins)
+
+* Testr supports running tests in arbitrary environments. See ``Remote or
+ isolated test environments`` in MANUAL.txt / ``testr help run``
+ (Robert Collins)
+
+INTERNALS
+---------
+
+* TestCommand is now a fixture. This is used to ensure cached test instances
+ are disposed of - if using the object to run or list tests, you will need
+ to adjust your calls. (Robert Collins)
+
+* ``TestCommand`` now offers, and ``TestListingFixture`` consumes, a small
+  protocol for obtaining and releasing test execution instances.
+ (Robert Collins)
+
0.0.9
+++++
diff --git a/doc/MANUAL.txt b/doc/MANUAL.txt
index 595206f..ceba327 100644
--- a/doc/MANUAL.txt
+++ b/doc/MANUAL.txt
@@ -172,10 +172,25 @@ systems, or if you need to control the number of workers that are used, the
$ testr run --parallel --concurrency=2
+A more granular interface is available too - if you insert into .testr.conf::
+
+ test_run_concurrency=foo bar
+
+Then when testr needs to determine concurrency, it will run that command,
+read the first line of its stdout, cast that to an int, and use that as the
+number of partitions to create. A count of 0 is interpreted to mean one
+partition per test. For instance, putting this in .testr.conf::
+
+ test_run_concurrency=echo 2
+
+would tell testr to use a concurrency of 2.
+
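A callout can also compute the value dynamically. The following is a minimal
sketch only - the ``probe_concurrency.py`` name and the CPU-counting policy
are illustrative assumptions, not part of testrepository - of a script that
could be configured as ``test_run_concurrency=python probe_concurrency.py``;
testr only reads the first line it prints::

    #!/usr/bin/env python
    # probe_concurrency.py - hypothetical test_run_concurrency callout.
    # testr reads the first line of stdout and casts it to an int, so the
    # script only needs to print a single number.
    import multiprocessing

    if __name__ == '__main__':
        # Leave one CPU free for the coordinating testr process, never
        # dropping below a concurrency of 1.
        print(max(multiprocessing.cpu_count() - 1, 1))
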
When running tests in parallel, testrepository tags each test with a tag for
-the worker that executed the test. The tags are of the form ``worker-%d`` and
-are usually used to reproduce test isolation failures, where knowing exactly
-what test ran on a given backend is important.
+the worker that executed the test. The tags are of the form ``worker-%d``
+and are usually used to reproduce test isolation failures, where knowing
+exactly which tests ran on a given backend is important. The %d that is
+substituted in is the partition number within the test run - all tests in a
+single run with the same worker-N tag ran in the same test runner instance.
To find out which slave a failing test ran on just look at the 'tags' line in
its test error::
@@ -190,6 +205,91 @@ And then find tests with that tag::
$ testr last --subunit | subunit-filter -s --xfail --with-tag=worker-3 | subunit-ls > slave-3.list
+Remote or isolated test environments
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+A common problem with parallel test running is test runners that use global
+resources such as well-known ports, well-known database names, or predictable
+directories on disk.
+
+One way to solve this is to set up isolated environments such as chroots,
+containers, or even separate machines. Such environments typically require
+some coordination when being used to run tests, so testr provides an explicit
+model for working with them.
+
+The model testr uses is intended to support both developers working
+incrementally on a change and CI systems running tests in a one-off setup,
+for both statically and dynamically provisioned environments.
+
+The process testr follows is:
+
+#. The user performs any one-time or once-per-session setup: for instance,
+   checking out source code, creating a template container, or sourcing your
+   cloud credentials.
+
+#. The user executes ``testr run``.
+
+#. testr queries for concurrency.
+
+#. testr makes a callout request to provision that many instances. The
+   provisioning callout needs to synchronise source code and do any other
+   per-instance setup at this stage.
+
+#. testr makes callouts to execute tests, supplying files that should be
+   copied into the execution environment. Note that instances may be used
+   for more than one command execution.
+
+#. testr calls out to dispose of the instances after the test run completes.
+
+Instances may be expensive to create and dispose of. testr does not perform
+any caching, but the callout pattern is intended to facilitate external
+caching - the provisioning callout can be used to pull environments out of a
+cache, and the dispose callout to simply return them to it.
+
+Configuring environment support
+-------------------------------
+
+There are three callouts that testrepository depends on - configured in
+.testr.conf as usual. For instance::
+
+ instance_provision=foo -c $INSTANCE_COUNT
+ instance_dispose=bar $INSTANCE_IDS
+ instance_execute=quux $INSTANCE_ID $FILES -- $COMMAND
+
+These should operate as follows:
+
+* instance_provision should start up the number of instances provided in the
+  $INSTANCE_COUNT parameter. It should print on stdout the instance ids that
+  testr should supply to the dispose and execute commands. There should be no
+  other output on stdout (stderr is entirely up for grabs). A non-zero exit
+  code will cause testr to consider the command to have failed. A provisioned
+  instance should be able to run both the test listing and test execution
+  commands that testr will issue via the instance_execute callout. It's
+  possible to lazy-provision things if you desire - testr doesn't care - but
+  to reduce latency we suggest performing any rsync or other code
+  synchronisation steps during the provision step, as testr may make multiple
+  calls to one environment, and re-doing costly operations on each command
+  execution would impair performance.
+
+* instance_dispose should take a list of instance ids and get rid of them.
+  This might mean putting them back in a pool of instances, powering them
+  off, or terminating them - whatever makes sense for your project.
+
+* instance_execute should accept an instance id, a list of files that need to
+  be copied into the instance, and a command to run within the instance. It
+  needs to copy those files into the instance (it may adjust their paths if
+  desired). If the paths are adjusted, the same paths within $COMMAND should
+  be adjusted to match. Execution that takes place on a shared filesystem can
+  obviously skip the file copying or adjusting (and the $FILES parameter).
+  When instance_execute terminates, it should exit with the exit code of the
+  command it ran within the instance. Stdout and stderr from instance_execute
+  are presumed to be those of $COMMAND. In particular, stdout is where the
+  subunit test output and subunit test listing output are expected, so
+  putting other output on stdout can lead to surprising results - such as
+  corrupting the subunit stream. instance_execute is invoked for both the
+  test listing and test executing callouts.
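As an illustration only - the script name, the TEST_INSTANCE_ID variable and
the shared-filesystem setting are hypothetical assumptions, not part of
testrepository - an instance_execute callout configured as
``instance_execute=python instance_execute.py $INSTANCE_ID $FILES -- $COMMAND``
might look like the sketch below. It skips file copying (permitted on a
shared filesystem, as noted above), exports the instance id for the test
command to use, and propagates the command's exit code::

    #!/usr/bin/env python
    # instance_execute.py - hypothetical callout; usage:
    #   instance_execute.py INSTANCE_ID [FILES...] -- COMMAND...
    import os
    import subprocess
    import sys

    def main(argv):
        instance_id = argv[1]
        # Everything after '--' is the command testr wants executed.
        command = argv[argv.index('--') + 1:]
        # On a shared filesystem the FILES arguments need no copying; the
        # instance id is exported so the test command can pick per-instance
        # ports, database names, and so on.
        env = dict(os.environ, TEST_INSTANCE_ID=instance_id)
        # stdout/stderr are inherited, so the subunit stream reaches testr
        # untouched; exit with the command's own return code.
        return subprocess.call(' '.join(command), shell=True, env=env)

    if __name__ == '__main__':
        sys.exit(main(sys.argv))
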
+
Hiding tests
~~~~~~~~~~~~
diff --git a/testrepository/commands/list_tests.py b/testrepository/commands/list_tests.py
index 38241bc..77433c5 100644
--- a/testrepository/commands/list_tests.py
+++ b/testrepository/commands/list_tests.py
@@ -39,21 +39,25 @@ class list_tests(Command):
filters = None
if self.ui.arguments['testfilters']:
filters = self.ui.arguments['testfilters']
- cmd = testcommand.get_run_command(ids, self.ui.arguments['testargs'],
- test_filters=filters)
- cmd.setUp()
+ testcommand.setUp()
try:
- # Ugh.
- # List tests if the fixture has not already needed to to filter.
- if filters is None:
- ids = cmd.list_tests()
- else:
- ids = cmd.test_ids
- stream = StringIO()
- for id in ids:
- stream.write('%s\n' % id)
- stream.seek(0)
- self.ui.output_stream(stream)
- return 0
+ cmd = testcommand.get_run_command(
+ ids, self.ui.arguments['testargs'], test_filters=filters)
+ cmd.setUp()
+ try:
+ # Ugh.
+                # List tests if the fixture has not already needed to filter.
+ if filters is None:
+ ids = cmd.list_tests()
+ else:
+ ids = cmd.test_ids
+ stream = StringIO()
+ for id in ids:
+ stream.write('%s\n' % id)
+ stream.seek(0)
+ self.ui.output_stream(stream)
+ return 0
+ finally:
+ cmd.cleanUp()
finally:
- cmd.cleanUp()
+ testcommand.cleanUp()
diff --git a/testrepository/commands/run.py b/testrepository/commands/run.py
index 076e79a..c865fea 100644
--- a/testrepository/commands/run.py
+++ b/testrepository/commands/run.py
@@ -165,96 +165,100 @@ class run(Command):
else:
filters = None
testcommand = self.command_factory(self.ui, repo)
- if not self.ui.options.analyze_isolation:
- cmd = testcommand.get_run_command(ids, self.ui.arguments['testargs'],
- test_filters = filters)
- return self._run_tests(cmd)
- else:
- # Where do we source data about the cause of conflicts.
- # XXX: Should instead capture the run id in with the failing test
- # data so that we can deal with failures split across many partial
- # runs.
- latest_run = repo.get_latest_run()
- # Stage one: reduce the list of failing tests (possibly further
- # reduced by testfilters) to eliminate fails-on-own tests.
- spurious_failures = set()
- for test_id in ids:
- cmd = testcommand.get_run_command([test_id],
- self.ui.arguments['testargs'], test_filters = filters)
- if not self._run_tests(cmd):
- # If the test was filtered, it won't have been run.
- if test_id in repo.get_test_ids(repo.latest_id()):
- spurious_failures.add(test_id)
- # This is arguably ugly, why not just tell the system that
- # a pass here isn't a real pass? [so that when we find a
- # test that is spuriously failing, we don't forget
- # that it is actually failng.
- # Alternatively, perhaps this is a case for data mining:
- # when a test starts passing, keep a journal, and allow
- # digging back in time to see that it was a failure,
- # what it failed with etc...
- # The current solution is to just let it get marked as
- # a pass temporarily.
- if not spurious_failures:
- # All done.
- return 0
- # spurious-failure -> cause.
- test_conflicts = {}
- for spurious_failure in spurious_failures:
- candidate_causes = self._prior_tests(
- latest_run, spurious_failure)
- bottom = 0
- top = len(candidate_causes)
- width = top - bottom
- while width:
- check_width = int(ceil(width / 2.0))
- cmd = testcommand.get_run_command(
- candidate_causes[bottom:bottom + check_width]
- + [spurious_failure],
- self.ui.arguments['testargs'])
- self._run_tests(cmd)
- # check that the test we're probing still failed - still
- # awkward.
- found_fail = []
- def find_fail(test, status, start_time, stop_time, tags,
- details):
- if test.id() == spurious_failure:
- found_fail.append(True)
- checker = TestByTestResult(find_fail)
- checker.startTestRun()
- try:
- repo.get_failing().get_test().run(checker)
- finally:
- checker.stopTestRun()
- if found_fail:
- # Our conflict is in bottom - clamp the range down.
- top = bottom + check_width
- if width == 1:
- # found the cause
- test_conflicts[
- spurious_failure] = candidate_causes[bottom]
- width = 0
- else:
- width = top - bottom
- else:
- # Conflict in the range we did not run: discard bottom.
- bottom = bottom + check_width
- if width == 1:
- # there will be no more to check, so we didn't
- # reproduce the failure.
- width = 0
+ testcommand.setUp()
+ try:
+ if not self.ui.options.analyze_isolation:
+ cmd = testcommand.get_run_command(ids, self.ui.arguments['testargs'],
+ test_filters = filters)
+ return self._run_tests(cmd)
+ else:
+ # Where do we source data about the cause of conflicts.
+ # XXX: Should instead capture the run id in with the failing test
+ # data so that we can deal with failures split across many partial
+ # runs.
+ latest_run = repo.get_latest_run()
+ # Stage one: reduce the list of failing tests (possibly further
+ # reduced by testfilters) to eliminate fails-on-own tests.
+ spurious_failures = set()
+ for test_id in ids:
+ cmd = testcommand.get_run_command([test_id],
+ self.ui.arguments['testargs'], test_filters = filters)
+ if not self._run_tests(cmd):
+ # If the test was filtered, it won't have been run.
+ if test_id in repo.get_test_ids(repo.latest_id()):
+ spurious_failures.add(test_id)
+ # This is arguably ugly, why not just tell the system that
+ # a pass here isn't a real pass? [so that when we find a
+ # test that is spuriously failing, we don't forget
+                        # that it is actually failing.
+ # Alternatively, perhaps this is a case for data mining:
+ # when a test starts passing, keep a journal, and allow
+ # digging back in time to see that it was a failure,
+ # what it failed with etc...
+ # The current solution is to just let it get marked as
+ # a pass temporarily.
+ if not spurious_failures:
+ # All done.
+ return 0
+ # spurious-failure -> cause.
+ test_conflicts = {}
+ for spurious_failure in spurious_failures:
+ candidate_causes = self._prior_tests(
+ latest_run, spurious_failure)
+ bottom = 0
+ top = len(candidate_causes)
+ width = top - bottom
+ while width:
+ check_width = int(ceil(width / 2.0))
+ cmd = testcommand.get_run_command(
+ candidate_causes[bottom:bottom + check_width]
+ + [spurious_failure],
+ self.ui.arguments['testargs'])
+ self._run_tests(cmd)
+ # check that the test we're probing still failed - still
+ # awkward.
+ found_fail = []
+ def find_fail(test, status, start_time, stop_time, tags,
+ details):
+ if test.id() == spurious_failure:
+ found_fail.append(True)
+ checker = TestByTestResult(find_fail)
+ checker.startTestRun()
+ try:
+ repo.get_failing().get_test().run(checker)
+ finally:
+ checker.stopTestRun()
+ if found_fail:
+ # Our conflict is in bottom - clamp the range down.
+ top = bottom + check_width
+ if width == 1:
+ # found the cause
+ test_conflicts[
+ spurious_failure] = candidate_causes[bottom]
+ width = 0
+ else:
+ width = top - bottom
else:
- width = top - bottom
- if spurious_failure not in test_conflicts:
- # Could not determine cause
- test_conflicts[spurious_failure] = 'unknown - no conflicts'
- if test_conflicts:
- table = [('failing test', 'caused by test')]
- for failure, causes in test_conflicts.items():
- table.append((failure, causes))
- self.ui.output_table(table)
- return 3
- return 0
+ # Conflict in the range we did not run: discard bottom.
+ bottom = bottom + check_width
+ if width == 1:
+ # there will be no more to check, so we didn't
+ # reproduce the failure.
+ width = 0
+ else:
+ width = top - bottom
+ if spurious_failure not in test_conflicts:
+ # Could not determine cause
+ test_conflicts[spurious_failure] = 'unknown - no conflicts'
+ if test_conflicts:
+ table = [('failing test', 'caused by test')]
+ for failure, causes in test_conflicts.items():
+ table.append((failure, causes))
+ self.ui.output_table(table)
+ return 3
+ return 0
+ finally:
+ testcommand.cleanUp()
def _prior_tests(self, run, failing_id):
"""Calculate what tests from the test run run ran before test_id.
diff --git a/testrepository/testcommand.py b/testrepository/testcommand.py
index 84fcf4c..6b7053e 100644
--- a/testrepository/testcommand.py
+++ b/testrepository/testcommand.py
@@ -58,6 +58,17 @@ testrconf_help = dedent("""
test_command should output on stdout all the test ids that would have
been run if every other option and argument was honoured, one per line.
This is required for parallel testing, and is substituted into $LISTOPT.
+ * test_run_concurrency -- Optional call out to establish concurrency.
+ Should return one line containing the number of concurrent test runner
+ processes to run.
+ * instance_provision -- provision one or more test run environments.
+ Accepts $INSTANCE_COUNT for the number of instances desired.
+ * instance_execute -- execute a test runner process in a given environment.
+ Accepts $INSTANCE_ID, $FILES and $COMMAND. Paths in $FILES should be
+ synchronised into the test runner environment filesystem. $COMMAND can
+ be adjusted if the paths are synched with different names.
+ * instance_dispose -- dispose of one or more test running environments.
+ Accepts $INSTANCE_IDS.
* $IDOPTION -- the variable to use to trigger running some specific tests.
* $IDFILE -- A file created before the test command is run and deleted
afterwards which contains a list of test ids, one per line. This can
@@ -72,6 +83,47 @@ testrconf_help = dedent("""
""")
+class CallWhenProcFinishes(object):
+ """Convert a process object to trigger a callback when returncode is set.
+
+    This wraps the entire object and, when the returncode attribute access
+    finds a set value, calls the callback.
+ """
+
+ def __init__(self, process, callback):
+ """Adapt process
+
+ :param process: A subprocess.Popen object.
+        :param callback: The callback to call when the process completes.
+ """
+ self._proc = process
+ self._callback = callback
+ self._done = False
+
+ @property
+ def stdin(self):
+ return self._proc.stdin
+
+ @property
+ def stdout(self):
+ return self._proc.stdout
+
+ @property
+ def stderr(self):
+ return self._proc.stderr
+
+ @property
+ def returncode(self):
+ result = self._proc.returncode
+ if not self._done and result is not None:
+ self._done = True
+ self._callback()
+ return result
+
+ def wait(self):
+ return self._proc.wait()
+
+
compiled_re_type = type(re.compile(''))
class TestListingFixture(Fixture):
@@ -79,7 +131,7 @@ class TestListingFixture(Fixture):
def __init__(self, test_ids, cmd_template, listopt, idoption, ui,
repository, parallel=True, listpath=None, parser=None,
- test_filters=None):
+ test_filters=None, instance_source=None):
"""Create a TestListingFixture.
:param test_ids: The test_ids to use. May be None indicating that
@@ -109,6 +161,9 @@ class TestListingFixture(Fixture):
filters: to take the intersection instead, craft a single regex that
matches all your criteria. Filters are automatically applied by
run_tests(), or can be applied by calling filter_tests(test_ids).
+ :param instance_source: A source of test run instances. Must support
+ obtain_instance(max_concurrency) -> id and release_instance(id)
+ calls.
"""
self.test_ids = test_ids
self.template = cmd_template
@@ -120,6 +175,7 @@ class TestListingFixture(Fixture):
self._listpath = listpath
self._parser = parser
self.test_filters = test_filters
+ self._instance_source = instance_source
def setUp(self):
super(TestListingFixture, self).setUp()
@@ -147,6 +203,8 @@ class TestListingFixture(Fixture):
else:
self.concurrency = self.ui.options.concurrency
if not self.concurrency:
+ self.concurrency = self.callout_concurrency()
+ if not self.concurrency:
self.concurrency = self.local_concurrency()
if not self.concurrency:
self.concurrency = 1
@@ -219,13 +277,46 @@ class TestListingFixture(Fixture):
"""
if '$LISTOPT' not in self.template:
raise ValueError("LISTOPT not configured in .testr.conf")
- self.ui.output_values([('running', self.list_cmd)])
- run_proc = self.ui.subprocess_Popen(self.list_cmd, shell=True,
- stdout=subprocess.PIPE, stdin=subprocess.PIPE)
- out, err = run_proc.communicate()
- # Should we raise on non-zero exit?
- ids = parse_list(out)
- return ids
+ instance, list_cmd = self._per_instance_command(self.list_cmd)
+ try:
+ self.ui.output_values([('running', list_cmd)])
+ run_proc = self.ui.subprocess_Popen(list_cmd, shell=True,
+ stdout=subprocess.PIPE, stdin=subprocess.PIPE)
+ out, err = run_proc.communicate()
+ # Should we raise on non-zero exit?
+ ids = parse_list(out)
+ return ids
+ finally:
+ if instance:
+ self._instance_source.release_instance(instance)
+
+ def _per_instance_command(self, cmd):
+        """Customise cmd to run with an instance id.
+
+        The instance source is asked for self.concurrency instances up front
+        (rather than one at a time) to avoid death-by-1000-cuts latency.
+ """
+ if self._instance_source is None:
+ return None, cmd
+ instance = self._instance_source.obtain_instance(self.concurrency)
+ if instance is not None:
+ try:
+ instance_prefix = self._parser.get(
+ 'DEFAULT', 'instance_execute')
+ variables = {
+ 'INSTANCE_ID': instance,
+ 'COMMAND': cmd,
+ # --list-tests cannot use FILES, so handle it being unset.
+ 'FILES': getattr(self, 'list_file_name', None) or '',
+ }
+ variable_regex = '\$(INSTANCE_ID|COMMAND|FILES)'
+ def subst(match):
+ return variables.get(match.groups(1)[0], '')
+ cmd = re.sub(variable_regex, subst, instance_prefix)
+ except ConfigParser.NoOptionError:
+ # Per-instance execution environment not configured.
+ pass
+ return instance, cmd
def run_tests(self):
"""Run the tests defined by the command and ui.
@@ -235,14 +326,21 @@ class TestListingFixture(Fixture):
result = []
test_ids = self.test_ids
if self.concurrency == 1 and (test_ids is None or test_ids):
- self.ui.output_values([('running', self.cmd)])
- run_proc = self.ui.subprocess_Popen(self.cmd, shell=True,
+ # Have to customise cmd here, as instances are allocated
+ # just-in-time. XXX: Indicates this whole region needs refactoring.
+ instance, cmd = self._per_instance_command(self.cmd)
+ self.ui.output_values([('running', cmd)])
+ run_proc = self.ui.subprocess_Popen(cmd, shell=True,
stdout=subprocess.PIPE, stdin=subprocess.PIPE)
# Prevent processes stalling if they read from stdin; we could
# pass this through in future, but there is no point doing that
# until we have a working can-run-debugger-inline story.
run_proc.stdin.close()
- return [run_proc]
+ if instance:
+ return [CallWhenProcFinishes(run_proc,
+ lambda:self._instance_source.release_instance(instance))]
+ else:
+ return [run_proc]
test_id_groups = self.partition_tests(test_ids, self.concurrency)
for test_ids in test_id_groups:
if not test_ids:
@@ -250,7 +348,8 @@ class TestListingFixture(Fixture):
continue
fixture = self.useFixture(TestListingFixture(test_ids,
self.template, self.listopt, self.idoption, self.ui,
- self.repository, parallel=False, parser=self._parser))
+ self.repository, parallel=False, parser=self._parser,
+ instance_source=self._instance_source))
result.extend(fixture.run_tests())
return result
@@ -286,6 +385,22 @@ class TestListingFixture(Fixture):
partition.append(test_id)
return partitions
+ def callout_concurrency(self):
+ """Callout for user defined concurrency."""
+ try:
+ concurrency_cmd = self._parser.get(
+ 'DEFAULT', 'test_run_concurrency', None)
+ except ConfigParser.NoOptionError:
+ return None
+ run_proc = self.ui.subprocess_Popen(concurrency_cmd, shell=True,
+ stdout=subprocess.PIPE, stdin=subprocess.PIPE)
+ out, err = run_proc.communicate()
+ if run_proc.returncode:
+ raise ValueError(
+ "test_run_concurrency failed: exit code %d, stderr=%r" % (
+ run_proc.returncode, err))
+ return int(out.strip())
+
def local_concurrency(self):
if sys.platform == 'linux2':
concurrency = None
@@ -297,11 +412,17 @@ class TestListingFixture(Fixture):
return None
-class TestCommand(object):
+class TestCommand(Fixture):
"""Represents the test command defined in .testr.conf.
:ivar run_factory: The fixture to use to execute a command.
:ivar oldschool: Use failing.list rather than a unique file path.
+
+ TestCommand is a Fixture. Many uses of it will not require it to be setUp,
+ but calling get_run_command does require it: the fixture state is used to
+ track test environment instances, which are disposed of when cleanUp
+ happens. This is not done per-run-command, because test bisection (amongst
+ other things) uses multiple get_run_command configurations.
"""
run_factory = TestListingFixture
@@ -315,8 +436,37 @@ class TestCommand(object):
:param repository: A testrepository.repository.Repository used for
determining test times when partitioning tests.
"""
+ super(TestCommand, self).__init__()
self.ui = ui
self.repository = repository
+ self._instances = None
+ self._allocated_instances = None
+
+ def setUp(self):
+ super(TestCommand, self).setUp()
+ self._instances = set()
+ self._allocated_instances = set()
+ self.addCleanup(self._dispose_instances)
+
+ def _dispose_instances(self):
+ instances = self._instances
+ if instances is None:
+ return
+ self._instances = None
+ self._allocated_instances = None
+ try:
+ dispose_cmd = self.get_parser().get('DEFAULT', 'instance_dispose')
+ except (ValueError, ConfigParser.NoOptionError):
+ return
+ variable_regex = '\$INSTANCE_IDS'
+        dispose_cmd = re.sub(variable_regex, ' '.join(sorted(instances)),
+            dispose_cmd)
+ self.ui.output_values([('running', dispose_cmd)])
+ run_proc = self.ui.subprocess_Popen(dispose_cmd, shell=True)
+ run_proc.communicate()
+ if run_proc.returncode:
+ raise ValueError('Disposing of instances failed, return %d' %
+ run_proc.returncode)
def get_parser(self):
"""Get a parser with the .testr.conf in it."""
@@ -333,6 +483,8 @@ class TestCommand(object):
See TestListingFixture for the definition of test_ids and test_filters.
"""
+ if self._instances is None:
+ raise TypeError('TestCommand not setUp')
parser = self.get_parser()
try:
command = parser.get('DEFAULT', 'test_command')
@@ -364,11 +516,11 @@ class TestCommand(object):
listpath = os.path.join(self.ui.here, 'failing.list')
result = self.run_factory(test_ids, cmd, listopt, idoption,
self.ui, self.repository, listpath=listpath, parser=parser,
- test_filters=test_filters)
+ test_filters=test_filters, instance_source=self)
else:
result = self.run_factory(test_ids, cmd, listopt, idoption,
self.ui, self.repository, parser=parser,
- test_filters=test_filters)
+ test_filters=test_filters, instance_source=self)
return result
def get_filter_tags(self):
@@ -381,6 +533,37 @@ class TestCommand(object):
return set()
return set([tag.strip() for tag in tags.split()])
+ def obtain_instance(self, concurrency):
+ """If possible, get one or more test run environment instance ids.
+
+        Note this is not threadsafe: calling it from multiple threads could
+        hand the same instance out to more than one caller.
+ """
+ while len(self._instances) < concurrency:
+ try:
+ cmd = self.get_parser().get('DEFAULT', 'instance_provision')
+ except ConfigParser.NoOptionError:
+ # Instance allocation not configured
+ return None
+ variable_regex = '\$INSTANCE_COUNT'
+ cmd = re.sub(variable_regex,
+ str(concurrency - len(self._instances)), cmd)
+ self.ui.output_values([('running', cmd)])
+ proc = self.ui.subprocess_Popen(
+ cmd, shell=True, stdout=subprocess.PIPE)
+ out, _ = proc.communicate()
+ if proc.returncode:
+ raise ValueError('Provisioning instances failed, return %d' %
+ proc.returncode)
+ new_instances = set([item.strip() for item in out.split()])
+ self._instances.update(new_instances)
+ # Cached first.
+ available_instances = self._instances - self._allocated_instances
+ # We only ask for instances when one should be available.
+ result = available_instances.pop()
+ self._allocated_instances.add(result)
+ return result
+
def make_result(self, receiver):
"""Create a TestResult that will perform any global filtering etc.
@@ -413,3 +596,7 @@ class TestCommand(object):
return TestResultFilter(
receiver, filter_success=False, filter_predicate=predicate)
return receiver
+
+ def release_instance(self, instance_id):
+        """Return instance_id to the pool for reuse."""
+ self._allocated_instances.remove(instance_id)
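
The obtain_instance/release_instance protocol that ``TestListingFixture``
consumes via ``instance_source`` is deliberately small. A minimal in-memory
implementation - a hypothetical stand-in for ``TestCommand``, useful for
experimenting, and not part of this change - could look like::

    # Hypothetical sketch of the instance_source protocol consumed by
    # TestListingFixture; TestCommand implements the real version by calling
    # out to instance_provision / instance_dispose.
    class StaticInstanceSource(object):

        def __init__(self, instance_ids):
            self._free = set(instance_ids)
            self._allocated = set()

        def obtain_instance(self, concurrency):
            # Hand out an unallocated instance id, or None when no instances
            # are available (callers then run commands directly).
            if not self._free:
                return None
            instance = self._free.pop()
            self._allocated.add(instance)
            return instance

        def release_instance(self, instance_id):
            # Return the id to the pool for reuse by later commands.
            self._allocated.remove(instance_id)
            self._free.add(instance_id)
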
diff --git a/testrepository/tests/test_testcommand.py b/testrepository/tests/test_testcommand.py
index 2b66997..d259966 100644
--- a/testrepository/tests/test_testcommand.py
+++ b/testrepository/tests/test_testcommand.py
@@ -18,7 +18,12 @@ import os.path
import optparse
import re
-from testtools.matchers import MatchesException, Raises
+from testtools.matchers import (
+ Equals,
+ MatchesAny,
+ MatchesException,
+ raises,
+ )
from testtools.testresult.doubles import ExtendedTestResult
from testrepository.commands import run
@@ -45,7 +50,7 @@ class TestTestCommand(ResourcedTestCase):
self.dirty()
ui = UI(options=options, args=args)
ui.here = self.tempdir
- return ui, TestCommand(ui, repository)
+ return ui, self.useFixture(TestCommand(ui, repository))
def get_test_ui_and_cmd2(self, options=(), args=()):
self.dirty()
@@ -75,17 +80,59 @@ class TestTestCommand(ResourcedTestCase):
command = TestCommand(ui, None)
self.assertEqual(command.ui, ui)
+ def test_TestCommand_is_a_fixture(self):
+ ui = UI()
+ ui.here = self.tempdir
+ command = TestCommand(ui, None)
+ command.setUp()
+ command.cleanUp()
+
+ def test_TestCommand_get_run_command_outside_setUp_fails(self):
+ self.dirty()
+ ui = UI()
+ ui.here = self.tempdir
+ command = TestCommand(ui, None)
+ self.set_config('[DEFAULT]\ntest_command=foo\n')
+ self.assertThat(command.get_run_command, raises(TypeError))
+ command.setUp()
+ command.cleanUp()
+ self.assertThat(command.get_run_command, raises(TypeError))
+
+ def test_TestCommand_cleanUp_disposes_instances(self):
+ ui, command = self.get_test_ui_and_cmd()
+ self.set_config(
+ '[DEFAULT]\ntest_command=foo\n'
+ 'instance_dispose=bar $INSTANCE_IDS\n')
+ command._instances.update(['baz', 'quux'])
+ command.cleanUp()
+ command.setUp()
+ self.assertEqual([
+ ('values', [('running', 'bar baz quux')]),
+ ('popen', ('bar baz quux',), {'shell': True}),
+ ('communicate',)], ui.outputs)
+
+ def test_TestCommand_cleanUp_disposes_instances_fail_raises(self):
+ ui, command = self.get_test_ui_and_cmd()
+ ui.proc_results = [1]
+ self.set_config(
+ '[DEFAULT]\ntest_command=foo\n'
+ 'instance_dispose=bar $INSTANCE_IDS\n')
+ command._instances.update(['baz', 'quux'])
+ self.assertThat(command.cleanUp,
+ raises(ValueError('Disposing of instances failed, return 1')))
+ command.setUp()
+
def test_get_run_command_no_config_file_errors(self):
ui, command = self.get_test_ui_and_cmd()
self.assertThat(command.get_run_command,
- Raises(MatchesException(ValueError('No .testr.conf config file'))))
+ raises(ValueError('No .testr.conf config file')))
def test_get_run_command_no_config_settings_errors(self):
ui, command = self.get_test_ui_and_cmd()
self.set_config('')
self.assertThat(command.get_run_command,
- Raises(MatchesException(ValueError(
- 'No test_command option present in .testr.conf'))))
+ raises(ValueError(
+ 'No test_command option present in .testr.conf')))
def test_get_run_command_returns_fixture_makes_IDFILE(self):
ui, command = self.get_test_ui_and_cmd()
@@ -170,6 +217,61 @@ class TestTestCommand(ResourcedTestCase):
expected_cmd = 'foo bar quux'
self.assertEqual(expected_cmd, fixture.cmd)
+ def test_list_tests_requests_concurrency_instances(self):
+ # testr list-tests is non-parallel, so needs 1 instance.
+ # testr run triggering list-tests will want to run parallel on all, so
+ # avoid latency by asking for whatever concurrency is up front.
+ # This covers the case for non-listing runs as well, as the code path
+ # is common.
+ self.dirty()
+ ui = UI(options= [('concurrency', 2), ('parallel', True)])
+ ui.here = self.tempdir
+ cmd = run.run(ui)
+ ui.set_command(cmd)
+ ui.proc_outputs = ['returned\ninstances\n']
+ command = self.useFixture(TestCommand(ui, None))
+ self.set_config(
+ '[DEFAULT]\ntest_command=foo $LISTOPT $IDLIST\ntest_id_list_default=whoo yea\n'
+ 'test_list_option=--list\n'
+ 'instance_provision=provision -c $INSTANCE_COUNT\n'
+ 'instance_execute=quux $INSTANCE_ID -- $COMMAND\n')
+ fixture = self.useFixture(command.get_run_command(test_ids=['1']))
+ fixture.list_tests()
+ self.assertEqual(set(['returned', 'instances']), command._instances)
+ self.assertEqual(set([]), command._allocated_instances)
+ self.assertThat(ui.outputs, MatchesAny(Equals([
+ ('values', [('running', 'provision -c 2')]),
+ ('popen', ('provision -c 2',), {'shell': True, 'stdout': -1}),
+ ('communicate',),
+ ('values', [('running', 'quux instances -- foo --list whoo yea')]),
+ ('popen',('quux instances -- foo --list whoo yea',),
+ {'shell': True, 'stdin': -1, 'stdout': -1}),
+ ('communicate',)]), Equals([
+ ('values', [('running', 'provision -c 2')]),
+ ('popen', ('provision -c 2',), {'shell': True, 'stdout': -1}),
+ ('communicate',),
+ ('values', [('running', 'quux returned -- foo --list whoo yea')]),
+ ('popen',('quux returned -- foo --list whoo yea',),
+ {'shell': True, 'stdin': -1, 'stdout': -1}),
+ ('communicate',)])))
+
+ def test_list_tests_uses_instances(self):
+ ui, command = self.get_test_ui_and_cmd()
+ self.set_config(
+ '[DEFAULT]\ntest_command=foo $LISTOPT $IDLIST\ntest_id_list_default=whoo yea\n'
+ 'test_list_option=--list\n'
+ 'instance_execute=quux $INSTANCE_ID -- $COMMAND\n')
+ fixture = self.useFixture(command.get_run_command())
+ command._instances.add('bar')
+ fixture.list_tests()
+ self.assertEqual(set(['bar']), command._instances)
+ self.assertEqual(set([]), command._allocated_instances)
+ self.assertEqual([
+ ('values', [('running', 'quux bar -- foo --list whoo yea')]),
+ ('popen', ('quux bar -- foo --list whoo yea',),
+ {'shell': True, 'stdin': -1, 'stdout': -1}), ('communicate',)],
+ ui.outputs)
+
def test_list_tests_cmd(self):
ui, command = self.get_test_ui_and_cmd()
self.set_config(
@@ -239,11 +341,132 @@ class TestTestCommand(ResourcedTestCase):
self.assertEqual(1, len(partitions[0]))
self.assertEqual(1, len(partitions[1]))
+ def test_run_tests_with_instances(self):
+ # when there are instances and no instance_execute, run_tests acts as
+ # normal.
+ ui, command = self.get_test_ui_and_cmd()
+ self.set_config(
+ '[DEFAULT]\ntest_command=foo $IDLIST\n')
+ command._instances.update(['foo', 'bar'])
+ fixture = self.useFixture(command.get_run_command())
+ procs = fixture.run_tests()
+ self.assertEqual([
+ ('values', [('running', 'foo ')]),
+ ('popen', ('foo ',), {'shell': True, 'stdin': -1, 'stdout': -1})],
+ ui.outputs)
+
+ def test_run_tests_with_existing_instances_configured(self):
+ # when there are instances present, they are pulled out for running
+ # tests.
+ ui, command = self.get_test_ui_and_cmd()
+ self.set_config(
+ '[DEFAULT]\ntest_command=foo $IDLIST\n'
+ 'instance_execute=quux $INSTANCE_ID -- $COMMAND\n')
+ command._instances.add('bar')
+ fixture = self.useFixture(command.get_run_command(test_ids=['1']))
+ procs = fixture.run_tests()
+ self.assertEqual([
+ ('values', [('running', 'quux bar -- foo 1')]),
+ ('popen', ('quux bar -- foo 1',),
+ {'shell': True, 'stdin': -1, 'stdout': -1})],
+ ui.outputs)
+ # No --parallel, so the one instance should have been allocated.
+ self.assertEqual(set(['bar']), command._instances)
+ self.assertEqual(set(['bar']), command._allocated_instances)
+ # And after the process is run, bar is returned for re-use.
+ procs[0].stdout.read()
+ procs[0].wait()
+ self.assertEqual(0, procs[0].returncode)
+ self.assertEqual(set(['bar']), command._instances)
+ self.assertEqual(set(), command._allocated_instances)
+
+ def test_run_tests_allocated_instances_skipped(self):
+ ui, command = self.get_test_ui_and_cmd()
+ self.set_config(
+ '[DEFAULT]\ntest_command=foo $IDLIST\n'
+ 'instance_execute=quux $INSTANCE_ID -- $COMMAND\n')
+ command._instances.update(['bar', 'baz'])
+ command._allocated_instances.add('baz')
+ fixture = self.useFixture(command.get_run_command(test_ids=['1']))
+ procs = fixture.run_tests()
+ self.assertEqual([
+ ('values', [('running', 'quux bar -- foo 1')]),
+ ('popen', ('quux bar -- foo 1',),
+ {'shell': True, 'stdin': -1, 'stdout': -1})],
+ ui.outputs)
+ # No --parallel, so the one instance should have been allocated.
+ self.assertEqual(set(['bar', 'baz']), command._instances)
+ self.assertEqual(set(['bar', 'baz']), command._allocated_instances)
+ # And after the process is run, bar is returned for re-use.
+ procs[0].wait()
+ procs[0].stdout.read()
+ self.assertEqual(0, procs[0].returncode)
+ self.assertEqual(set(['bar', 'baz']), command._instances)
+ self.assertEqual(set(['baz']), command._allocated_instances)
+
+ def test_run_tests_list_file_in_FILES(self):
+ ui, command = self.get_test_ui_and_cmd()
+ self.set_config(
+ '[DEFAULT]\ntest_command=foo $IDFILE\n'
+ 'instance_execute=quux $INSTANCE_ID $FILES -- $COMMAND\n')
+ command._instances.add('bar')
+ fixture = self.useFixture(command.get_run_command(test_ids=['1']))
+ list_file = fixture.list_file_name
+ procs = fixture.run_tests()
+ expected_cmd = 'quux bar %s -- foo %s' % (list_file, list_file)
+ self.assertEqual([
+ ('values', [('running', expected_cmd)]),
+ ('popen', (expected_cmd,),
+ {'shell': True, 'stdin': -1, 'stdout': -1})],
+ ui.outputs)
+ # No --parallel, so the one instance should have been allocated.
+ self.assertEqual(set(['bar']), command._instances)
+ self.assertEqual(set(['bar']), command._allocated_instances)
+ # And after the process is run, bar is returned for re-use.
+ procs[0].stdout.read()
+ self.assertEqual(0, procs[0].returncode)
+ self.assertEqual(set(['bar']), command._instances)
+ self.assertEqual(set(), command._allocated_instances)
+
def test_filter_tags_parsing(self):
ui, command = self.get_test_ui_and_cmd()
self.set_config('[DEFAULT]\nfilter_tags=foo bar\n')
self.assertEqual(set(['foo', 'bar']), command.get_filter_tags())
+ def test_callout_concurrency(self):
+ ui, command = self.get_test_ui_and_cmd()
+ ui.proc_outputs = ['4']
+ self.set_config(
+ '[DEFAULT]\ntest_run_concurrency=probe\n'
+ 'test_command=foo\n')
+ fixture = self.useFixture(command.get_run_command())
+ self.assertEqual(4, fixture.callout_concurrency())
+ self.assertEqual([
+ ('popen', ('probe',), {'shell': True, 'stdin': -1, 'stdout': -1}),
+ ('communicate',)], ui.outputs)
+
+ def test_callout_concurrency_failed(self):
+ ui, command = self.get_test_ui_and_cmd()
+ ui.proc_results = [1]
+ self.set_config(
+ '[DEFAULT]\ntest_run_concurrency=probe\n'
+ 'test_command=foo\n')
+ fixture = self.useFixture(command.get_run_command())
+ self.assertThat(lambda:fixture.callout_concurrency(), raises(
+ ValueError("test_run_concurrency failed: exit code 1, stderr=''")))
+ self.assertEqual([
+ ('popen', ('probe',), {'shell': True, 'stdin': -1, 'stdout': -1}),
+ ('communicate',)], ui.outputs)
+
+ def test_callout_concurrency_not_set(self):
+ ui, command = self.get_test_ui_and_cmd()
+ self.set_config(
+ '[DEFAULT]\n'
+ 'test_command=foo\n')
+ fixture = self.useFixture(command.get_run_command())
+ self.assertEqual(None, fixture.callout_concurrency())
+ self.assertEqual([], ui.outputs)
+
def test_make_result(self):
# Just a simple 'the dots are joined' test. More later.
ui, command = self.get_test_ui_and_cmd()