summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNed Batchelder <ned@nedbatchelder.com>2021-11-28 08:08:36 -0500
committerNed Batchelder <ned@nedbatchelder.com>2021-11-28 08:08:36 -0500
commitd17be155d85afb2bc804693deacfd2e1c5f8738e (patch)
tree4d327ff3ed4bbf39c7470da995eec6255548f3b3
parentef79b4eb91c5a88f48a630c1eed46134d98733d8 (diff)
downloadpython-coveragepy-git-d17be155d85afb2bc804693deacfd2e1c5f8738e.tar.gz
test(feat): track test times, and balance xdist workers
In the end, this seems to only speed things up by about 5%
-rw-r--r--.treerc1
-rw-r--r--Makefile2
-rw-r--r--setup.cfg10
-rw-r--r--tests/balance_xdist_plugin.py174
-rw-r--r--tests/conftest.py54
5 files changed, 190 insertions, 51 deletions
diff --git a/.treerc b/.treerc
index 74a2a395..ddea2e92 100644
--- a/.treerc
+++ b/.treerc
@@ -15,3 +15,4 @@ ignore =
_build _spell
*.egg *.egg-info
.mypy_cache
+ tmp
diff --git a/Makefile b/Makefile
index 3609b11c..53d2f051 100644
--- a/Makefile
+++ b/Makefile
@@ -62,7 +62,7 @@ metahtml: ## Produce meta-coverage HTML reports.
python igor.py combine_html
metasmoke:
- COVERAGE_NO_PYTRACER=1 ARGS="-e py39" make clean metacov metahtml
+ COVERAGE_NO_PYTRACER=1 ARGS="-e py39" make metacov metahtml
PIP_COMPILE = pip-compile --upgrade --allow-unsafe
upgrade: export CUSTOM_COMPILE_COMMAND=make upgrade
diff --git a/setup.cfg b/setup.cfg
index 07697a94..221e1230 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -14,7 +14,15 @@ filterwarnings =
ignore:the imp module is deprecated in favour of importlib:DeprecationWarning
# xfail tests that pass should fail the test suite
-xfail_strict=true
+xfail_strict = true
+
+balanced_clumps =
+ ; Because of expensive session-scoped fixture:
+ VirtualenvTest
+ ; Because of shared-file manipulations (~/tests/actual/testing):
+ CompareTest
+ ; No idea why this one fails if run on separate workers:
+ GetZipBytesTest
[pep8]
# E265 block comment should start with '# '
diff --git a/tests/balance_xdist_plugin.py b/tests/balance_xdist_plugin.py
new file mode 100644
index 00000000..6786a342
--- /dev/null
+++ b/tests/balance_xdist_plugin.py
@@ -0,0 +1,174 @@
+# Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0
+# For details: https://github.com/nedbat/coveragepy/blob/master/NOTICE.txt
+
+"""
+A pytest plugin to record test times and then balance xdist with them next time.
+
+The only thing in here particular to coverage.py is the use of the tmp directory
+for storing the data.
+"""
+
+import collections
+import csv
+import os
+import shutil
+import time
+from pathlib import Path
+
+import pytest
+import xdist.scheduler
+
+
+def pytest_addoption(parser):
+ """Auto-called to define ini-file settings."""
+ parser.addini(
+ "balanced_clumps",
+ type="linelist",
+ help="Test substrings to assign to the same worker",
+ )
+
+@pytest.hookimpl(tryfirst=True)
+def pytest_configure(config):
+ """Registers our pytest plugin."""
+ config.pluginmanager.register(BalanceXdistPlugin(config), "balance_xdist_plugin")
+
+
+class BalanceXdistPlugin: # pragma: debugging
+ """The plugin"""
+
+ def __init__(self, config):
+ self.config = config
+ self.running_all = (self.config.getoption("-k") == "")
+ self.times = collections.defaultdict(float)
+ self.worker = os.environ.get("PYTEST_XDIST_WORKER", "none")
+ self.tests_csv = None
+
+ def pytest_sessionstart(self, session):
+ """Called once before any tests are run, but in every worker."""
+ if not self.running_all:
+ return
+
+ tests_csv_dir = Path(session.startdir).resolve() / "tmp/tests_csv"
+ self.tests_csv = tests_csv_dir / f"{self.worker}.csv"
+
+ if self.worker == "none":
+ if tests_csv_dir.exists():
+ for csv_file in tests_csv_dir.iterdir():
+ with csv_file.open(newline="") as fcsv:
+ reader = csv.reader(fcsv)
+ for row in reader:
+ self.times[row[1]] += float(row[3])
+ shutil.rmtree(tests_csv_dir)
+
+ def write_duration_row(self, item, phase, duration):
+ """Helper to write a row to the tracked-test csv file."""
+ if self.running_all:
+ self.tests_csv.parent.mkdir(parents=True, exist_ok=True)
+ with self.tests_csv.open("a", newline="") as fcsv:
+ csv.writer(fcsv).writerow([self.worker, item.nodeid, phase, duration])
+
+ @pytest.hookimpl(hookwrapper=True)
+ def pytest_runtest_setup(self, item):
+ """Run once for each test."""
+ start = time.time()
+ yield
+ self.write_duration_row(item, "setup", time.time() - start)
+
+ @pytest.hookimpl(hookwrapper=True)
+ def pytest_runtest_call(self, item):
+ """Run once for each test."""
+ start = time.time()
+ yield
+ self.write_duration_row(item, "call", time.time() - start)
+
+ @pytest.hookimpl(hookwrapper=True)
+ def pytest_runtest_teardown(self, item):
+ """Run once for each test."""
+ start = time.time()
+ yield
+ self.write_duration_row(item, "teardown", time.time() - start)
+
+ @pytest.mark.trylast
+ def pytest_xdist_make_scheduler(self, config, log):
+ """Create our BalancedScheduler using time data from the last run."""
+ # Assign tests to chunks
+ nchunks = 8
+ totals = [0] * nchunks
+ tests = collections.defaultdict(set)
+
+ # first put the difficult ones all in one worker
+ clumped = set()
+ clumps = config.getini("balanced_clumps")
+ for i, clump_word in enumerate(clumps):
+ clump_nodes = set(nodeid for nodeid in self.times.keys() if clump_word in nodeid)
+ i %= nchunks
+ tests[i].update(clump_nodes)
+ totals[i] += sum(self.times[nodeid] for nodeid in clump_nodes)
+ clumped.update(clump_nodes)
+
+ # Then assign the rest in descending order
+ rest = [(nodeid, t) for (nodeid, t) in self.times.items() if nodeid not in clumped]
+ rest.sort(key=lambda item: item[1], reverse=True)
+ for nodeid, t in rest:
+ lightest = min(enumerate(totals), key=lambda pair: pair[1])[0]
+ tests[lightest].add(nodeid)
+ totals[lightest] += t
+
+ test_chunks = {}
+ for chunk_id, nodeids in tests.items():
+ for nodeid in nodeids:
+ test_chunks[nodeid] = chunk_id
+
+ return BalancedScheduler(config, log, clumps, test_chunks)
+
+
+class BalancedScheduler(xdist.scheduler.LoadScopeScheduling): # pylint: disable=abstract-method # pragma: debugging
+ """A balanced-chunk test scheduler for pytest-xdist."""
+ def __init__(self, config, log, clumps, test_chunks):
+ super().__init__(config, log)
+ self.clumps = clumps
+ self.test_chunks = test_chunks
+
+ def _split_scope(self, nodeid):
+ """Assign a chunk id to a test node."""
+ # If we have a chunk assignment for this node, return it.
+ scope = self.test_chunks.get(nodeid)
+ if scope is not None:
+ return scope
+
+ # If this is a node that should be clumped, clump it.
+ for i, clump_word in enumerate(self.clumps):
+ if clump_word in nodeid:
+ return f"clump{i}"
+
+ # Otherwise every node is a separate chunk.
+ return nodeid
+
+
+# Run this with:
+# python -c "from tests.balance_xdist_plugin import show_worker_times as f; f()"
+def show_worker_times(): # pragma: debugging
+ """Ad-hoc utility to show data from the last tracked-test run."""
+ times = collections.defaultdict(float)
+ tests = collections.defaultdict(int)
+ tests_csv_dir = Path("tmp/tests_csv")
+
+ for csv_file in tests_csv_dir.iterdir():
+ with csv_file.open(newline="") as fcsv:
+ reader = csv.reader(fcsv)
+ for row in reader:
+ worker = row[0]
+ duration = float(row[3])
+ times[worker] += duration
+ if row[2] == "call":
+ tests[worker] += 1
+
+ for worker in sorted(tests.keys()):
+ print(f"{worker}: {tests[worker]:3d} {times[worker]:.2f}")
+
+ total = sum(times.values())
+ avg = total / len(times)
+ print(f"total: {total:.2f}, avg: {avg:.2f}")
+ lo = min(times.values())
+ hi = max(times.values())
+ print(f"lo = {lo:.2f}; hi = {hi:.2f}; gap = {hi - lo:.2f}; long delta = {hi - avg:.2f}")
diff --git a/tests/conftest.py b/tests/conftest.py
index c8ca21f2..23d6b213 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -7,7 +7,6 @@ Pytest auto configuration.
This module is run automatically by pytest, to define and enable fixtures.
"""
-import itertools
import os
import sys
import sysconfig
@@ -20,7 +19,6 @@ from coverage import env
from coverage.exceptions import _StopEverything
from coverage.files import set_relative_directory
-
# Pytest will rewrite assertions in test modules, but not elsewhere.
# This tells pytest to also rewrite assertions in coveragetest.py.
pytest.register_assert_rewrite("tests.coveragetest")
@@ -29,6 +27,9 @@ pytest.register_assert_rewrite("tests.helpers")
# Pytest can take additional options:
# $set_env.py: PYTEST_ADDOPTS - Extra arguments to pytest.
+pytest_plugins = "tests.balance_xdist_plugin"
+
+
@pytest.fixture(autouse=True)
def set_warnings():
"""Configure warnings to show while running tests."""
@@ -92,17 +93,11 @@ def reset_filesdotpy_globals():
WORKER = os.environ.get("PYTEST_XDIST_WORKER", "none")
-TRACK_TESTS = False
-TEST_TXT = "/tmp/tests.txt"
-
def pytest_sessionstart():
"""Run once at the start of the test session."""
- if TRACK_TESTS: # pragma: debugging
- with open(TEST_TXT, "w") as testtxt:
- print("Starting:", file=testtxt)
-
- # Create a .pth file for measuring subprocess coverage.
+ # Only in the main process...
if WORKER == "none":
+ # Create a .pth file for measuring subprocess coverage.
pth_dir = find_writable_pth_directory()
assert pth_dir
(pth_dir / "subcover.pth").write_text("import coverage; coverage.process_startup()\n")
@@ -118,53 +113,14 @@ def pytest_sessionfinish():
if pth_file.exists():
pth_file.unlink()
-
-def write_test_name(prefix):
- """For tracking where and when tests are running."""
- if TRACK_TESTS: # pragma: debugging
- with open(TEST_TXT, "a") as testtxt:
- test = os.environ.get("PYTEST_CURRENT_TEST", "unknown")
- print(f"{prefix} {WORKER}: {test}", file=testtxt, flush=True)
-
-
@pytest.hookimpl(hookwrapper=True)
def pytest_runtest_call(item):
"""Run once for each test."""
- write_test_name(">")
-
# Convert _StopEverything into skipped tests.
outcome = yield
if outcome.excinfo and issubclass(outcome.excinfo[0], _StopEverything): # pragma: only jython
pytest.skip(f"Skipping {item.nodeid} for _StopEverything: {outcome.excinfo[1]}")
- write_test_name("<")
-
-
-def interleaved(firsts, rest, n):
- """Interleave the firsts among the rest so that they occur each n items."""
- num = sum(len(l) for l in firsts) + len(rest)
- lists = firsts + [rest] * (n - len(firsts))
- listcycle = itertools.cycle(lists)
-
- while num:
- alist = next(listcycle) # pylint: disable=stop-iteration-return
- if alist:
- yield alist.pop()
- num -= 1
-
-
-def pytest_collection_modifyitems(items):
- """Re-order the collected tests."""
- # Trick the xdist scheduler to put all of the VirtualenvTest tests on the
- # same worker by sprinkling them into the collected items every Nth place.
- virt = set(i for i in items if "VirtualenvTest" in i.nodeid)
- rest = [i for i in items if i not in virt]
- nworkers = int(os.environ.get("PYTEST_XDIST_WORKER_COUNT", 4))
- items[:] = interleaved([virt], rest, nworkers)
- if TRACK_TESTS: # pragma: debugging
- with open("/tmp/items.txt", "w") as f:
- print("\n".join(i.nodeid for i in items), file=f)
-
def possible_pth_dirs():
"""Produce a sequence of directories for trying to write .pth files."""