summaryrefslogtreecommitdiff
path: root/tests/balance_xdist_plugin.py
blob: aec7dc21c714a1297c6a75b311ed473eeb789b06 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0
# For details: https://github.com/nedbat/coveragepy/blob/master/NOTICE.txt

"""
A pytest plugin to record test times and then use those times to divide tests
into evenly balanced workloads for each xdist worker.

Two things are hard-coded here that shouldn't be:

- The timing data is written to the tmp directory, but should use the pytest
  cache (https://docs.pytest.org/en/latest/how-to/cache.html).

- The number of xdist workers is hard-coded to 8 because I couldn't figure out
  how to find the number.  Would it be crazy to read the -n argument directly?

You can force some tests to run on the same worker by setting the
`balanced_clumps` setting in your pytest config file.  Each line is a substring
of a test name.  All tests with that substring (like -k) will run on the
worker:

    balanced_clumps =
        LongRunningFixture
        some_other_test_substring

"""

import collections
import csv
import os
import shutil
import time

from pathlib import Path

import pytest
import xdist.scheduler


def pytest_addoption(parser):
    """Auto-called to define ini-file settings."""
    parser.addini(
        "balanced_clumps",
        type="linelist",
        help="Test substrings to assign to the same worker",
    )

@pytest.hookimpl(tryfirst=True)
def pytest_configure(config):
    """Registers our pytest plugin."""
    config.pluginmanager.register(BalanceXdistPlugin(config), "balance_xdist_plugin")


class BalanceXdistPlugin:       # pragma: debugging
    """The plugin"""

    def __init__(self, config):
        self.config = config
        self.running_all = (self.config.getoption("-k") == "")
        self.times = collections.defaultdict(float)
        self.worker = os.environ.get("PYTEST_XDIST_WORKER", "none")
        self.tests_csv = None

    def pytest_sessionstart(self, session):
        """Called once before any tests are run, but in every worker."""
        if not self.running_all:
            return

        tests_csv_dir = session.startpath.resolve() / "tmp/tests_csv"
        self.tests_csv = tests_csv_dir / f"{self.worker}.csv"

        if self.worker == "none":
            if tests_csv_dir.exists():
                for csv_file in tests_csv_dir.iterdir():
                    with csv_file.open(newline="") as fcsv:
                        reader = csv.reader(fcsv)
                        for row in reader:
                            self.times[row[1]] += float(row[3])
                shutil.rmtree(tests_csv_dir)

    def write_duration_row(self, item, phase, duration):
        """Helper to write a row to the tracked-test csv file."""
        if self.running_all:
            self.tests_csv.parent.mkdir(parents=True, exist_ok=True)
            with self.tests_csv.open("a", newline="") as fcsv:
                csv.writer(fcsv).writerow([self.worker, item.nodeid, phase, duration])

    @pytest.hookimpl(hookwrapper=True)
    def pytest_runtest_setup(self, item):
        """Run once for each test."""
        start = time.time()
        yield
        self.write_duration_row(item, "setup", time.time() - start)

    @pytest.hookimpl(hookwrapper=True)
    def pytest_runtest_call(self, item):
        """Run once for each test."""
        start = time.time()
        yield
        self.write_duration_row(item, "call", time.time() - start)

    @pytest.hookimpl(hookwrapper=True)
    def pytest_runtest_teardown(self, item):
        """Run once for each test."""
        start = time.time()
        yield
        self.write_duration_row(item, "teardown", time.time() - start)

    @pytest.hookimpl(trylast=True)
    def pytest_xdist_make_scheduler(self, config, log):
        """Create our BalancedScheduler using time data from the last run."""
        # Assign tests to chunks
        nchunks = 8
        totals = [0] * nchunks
        tests = collections.defaultdict(set)

        # first put the difficult ones all in one worker
        clumped = set()
        clumps = config.getini("balanced_clumps")
        for i, clump_word in enumerate(clumps):
            clump_nodes = set(nodeid for nodeid in self.times.keys() if clump_word in nodeid)
            i %= nchunks
            tests[i].update(clump_nodes)
            totals[i] += sum(self.times[nodeid] for nodeid in clump_nodes)
            clumped.update(clump_nodes)

        # Then assign the rest in descending order
        rest = [(nodeid, t) for (nodeid, t) in self.times.items() if nodeid not in clumped]
        rest.sort(key=lambda item: item[1], reverse=True)
        for nodeid, t in rest:
            lightest = min(enumerate(totals), key=lambda pair: pair[1])[0]
            tests[lightest].add(nodeid)
            totals[lightest] += t

        test_chunks = {}
        for chunk_id, nodeids in tests.items():
            for nodeid in nodeids:
                test_chunks[nodeid] = chunk_id

        return BalancedScheduler(config, log, clumps, test_chunks)


class BalancedScheduler(xdist.scheduler.LoadScopeScheduling):   # pylint: disable=abstract-method # pragma: debugging
    """A balanced-chunk test scheduler for pytest-xdist."""
    def __init__(self, config, log, clumps, test_chunks):
        super().__init__(config, log)
        self.clumps = clumps
        self.test_chunks = test_chunks

    def _split_scope(self, nodeid):
        """Assign a chunk id to a test node."""
        # If we have a chunk assignment for this node, return it.
        scope = self.test_chunks.get(nodeid)
        if scope is not None:
            return scope

        # If this is a node that should be clumped, clump it.
        for i, clump_word in enumerate(self.clumps):
            if clump_word in nodeid:
                return f"clump{i}"

        # Otherwise every node is a separate chunk.
        return nodeid


# Run this with:
#   python -c "from tests.balance_xdist_plugin import show_worker_times as f; f()"
def show_worker_times():                            # pragma: debugging
    """Ad-hoc utility to show data from the last tracked-test run."""
    times = collections.defaultdict(float)
    tests = collections.defaultdict(int)
    tests_csv_dir = Path("tmp/tests_csv")

    for csv_file in tests_csv_dir.iterdir():
        with csv_file.open(newline="") as fcsv:
            reader = csv.reader(fcsv)
            for row in reader:
                worker = row[0]
                duration = float(row[3])
                times[worker] += duration
                if row[2] == "call":
                    tests[worker] += 1

    for worker in sorted(tests.keys()):
        print(f"{worker}: {tests[worker]:3d} {times[worker]:.2f}")

    total = sum(times.values())
    avg = total / len(times)
    print(f"total: {total:.2f}, avg: {avg:.2f}")
    lo = min(times.values())
    hi = max(times.values())
    print(f"lo = {lo:.2f}; hi = {hi:.2f}; gap = {hi - lo:.2f}; long delta = {hi - avg:.2f}")