diff options
-rw-r--r-- | tests/test_concurrency.py | 104 |
1 files changed, 65 insertions, 39 deletions
diff --git a/tests/test_concurrency.py b/tests/test_concurrency.py index 2e784cc4..35f0e384 100644 --- a/tests/test_concurrency.py +++ b/tests/test_concurrency.py @@ -438,13 +438,29 @@ MULTI_CODE = """ """ +@pytest.fixture(params=["fork", "spawn"], name="start_method") +def start_method_fixture(request): + """Parameterized fixture to choose the start_method for multiprocessing.""" + start_method = request.param + if start_method not in multiprocessing.get_all_start_methods(): + pytest.skip(f"start_method={start_method} not supported here") + return start_method + + @pytest.mark.skipif(not multiprocessing, reason="No multiprocessing in this Python") @flaky(max_runs=30) # Sometimes a test fails due to inherent randomness. Try more times. class MultiprocessingTest(CoverageTest): """Test support of the multiprocessing module.""" def try_multiprocessing_code( - self, code, expected_out, the_module, nprocs, concurrency="multiprocessing", args="" + self, + code, + expected_out, + the_module, + nprocs, + start_method, + concurrency="multiprocessing", + args="", ): """Run code using multiprocessing, it should produce `expected_out`.""" self.make_file("multi.py", code) @@ -454,54 +470,63 @@ class MultiprocessingTest(CoverageTest): source = . """) - for start_method in ["fork", "spawn"]: - if start_method and start_method not in multiprocessing.get_all_start_methods(): - continue + remove_files(".coverage", ".coverage.*") + cmd = "coverage run {args} multi.py {start_method}".format( + args=args, start_method=start_method, + ) + out = self.run_command(cmd) + expected_cant_trace = cant_trace_msg(concurrency, the_module) - remove_files(".coverage", ".coverage.*") - cmd = "coverage run {args} multi.py {start_method}".format( - args=args, start_method=start_method, + if expected_cant_trace is not None: + print(out) + assert out == expected_cant_trace + pytest.skip(f"Can't test: {expected_cant_trace}") + else: + assert out.rstrip() == expected_out + assert len(glob.glob(".coverage.*")) == nprocs + 1 + + out = self.run_command("coverage combine") + out_lines = out.splitlines() + assert len(out_lines) == nprocs + 1 + assert all( + re.fullmatch(r"Combined data file \.coverage\..*\.\d+\.\d+", line) + for line in out_lines ) - out = self.run_command(cmd) - expected_cant_trace = cant_trace_msg(concurrency, the_module) + out = self.run_command("coverage report -m") - if expected_cant_trace is not None: - print(out) - assert out == expected_cant_trace - pytest.skip(f"Can't test: {expected_cant_trace}") - else: - assert out.rstrip() == expected_out - assert len(glob.glob(".coverage.*")) == nprocs + 1 - - out = self.run_command("coverage combine") - out_lines = out.splitlines() - assert len(out_lines) == nprocs + 1 - assert all( - re.fullmatch(r"Combined data file \.coverage\..*\.\d+\.\d+", line) - for line in out_lines - ) - out = self.run_command("coverage report -m") - - last_line = self.squeezed_lines(out)[-1] - assert re.search(r"TOTAL \d+ 0 100%", last_line) - - def test_multiprocessing_simple(self): + last_line = self.squeezed_lines(out)[-1] + assert re.search(r"TOTAL \d+ 0 100%", last_line) + + def test_multiprocessing_simple(self, start_method): nprocs = 3 upto = 30 code = (SQUARE_OR_CUBE_WORK + MULTI_CODE).format(NPROCS=nprocs, UPTO=upto) total = sum(x*x if x%2 else x*x*x for x in range(upto)) expected_out = f"{nprocs} pids, total = {total}" - self.try_multiprocessing_code(code, expected_out, threading, nprocs) + self.try_multiprocessing_code( + code, + expected_out, + threading, + nprocs, + start_method=start_method, + ) - def test_multiprocessing_append(self): + def test_multiprocessing_append(self, start_method): nprocs = 3 upto = 30 code = (SQUARE_OR_CUBE_WORK + MULTI_CODE).format(NPROCS=nprocs, UPTO=upto) total = sum(x*x if x%2 else x*x*x for x in range(upto)) expected_out = f"{nprocs} pids, total = {total}" - self.try_multiprocessing_code(code, expected_out, threading, nprocs, args="--append") + self.try_multiprocessing_code( + code, + expected_out, + threading, + nprocs, + args="--append", + start_method=start_method, + ) - def test_multiprocessing_and_gevent(self): + def test_multiprocessing_and_gevent(self, start_method): nprocs = 3 upto = 30 code = ( @@ -510,14 +535,15 @@ class MultiprocessingTest(CoverageTest): total = sum(sum(range((x + 1) * 100)) for x in range(upto)) expected_out = f"{nprocs} pids, total = {total}" self.try_multiprocessing_code( - code, expected_out, eventlet, nprocs, concurrency="multiprocessing,eventlet" + code, + expected_out, + eventlet, + nprocs, + concurrency="multiprocessing,eventlet", + start_method=start_method, ) - @pytest.mark.parametrize("start_method", ["fork", "spawn"]) def test_multiprocessing_with_branching(self, start_method): - if start_method not in multiprocessing.get_all_start_methods(): - pytest.skip(f"{start_method} not supported here") - nprocs = 3 upto = 30 code = (SQUARE_OR_CUBE_WORK + MULTI_CODE).format(NPROCS=nprocs, UPTO=upto) |