summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--tests/test_concurrency.py104
1 files changed, 65 insertions, 39 deletions
diff --git a/tests/test_concurrency.py b/tests/test_concurrency.py
index 2e784cc4..35f0e384 100644
--- a/tests/test_concurrency.py
+++ b/tests/test_concurrency.py
@@ -438,13 +438,29 @@ MULTI_CODE = """
"""
+@pytest.fixture(params=["fork", "spawn"], name="start_method")
+def start_method_fixture(request):
+ """Parameterized fixture to choose the start_method for multiprocessing."""
+ start_method = request.param
+ if start_method not in multiprocessing.get_all_start_methods():
+ pytest.skip(f"start_method={start_method} not supported here")
+ return start_method
+
+
@pytest.mark.skipif(not multiprocessing, reason="No multiprocessing in this Python")
@flaky(max_runs=30) # Sometimes a test fails due to inherent randomness. Try more times.
class MultiprocessingTest(CoverageTest):
"""Test support of the multiprocessing module."""
def try_multiprocessing_code(
- self, code, expected_out, the_module, nprocs, concurrency="multiprocessing", args=""
+ self,
+ code,
+ expected_out,
+ the_module,
+ nprocs,
+ start_method,
+ concurrency="multiprocessing",
+ args="",
):
"""Run code using multiprocessing, it should produce `expected_out`."""
self.make_file("multi.py", code)
@@ -454,54 +470,63 @@ class MultiprocessingTest(CoverageTest):
source = .
""")
- for start_method in ["fork", "spawn"]:
- if start_method and start_method not in multiprocessing.get_all_start_methods():
- continue
+ remove_files(".coverage", ".coverage.*")
+ cmd = "coverage run {args} multi.py {start_method}".format(
+ args=args, start_method=start_method,
+ )
+ out = self.run_command(cmd)
+ expected_cant_trace = cant_trace_msg(concurrency, the_module)
- remove_files(".coverage", ".coverage.*")
- cmd = "coverage run {args} multi.py {start_method}".format(
- args=args, start_method=start_method,
+ if expected_cant_trace is not None:
+ print(out)
+ assert out == expected_cant_trace
+ pytest.skip(f"Can't test: {expected_cant_trace}")
+ else:
+ assert out.rstrip() == expected_out
+ assert len(glob.glob(".coverage.*")) == nprocs + 1
+
+ out = self.run_command("coverage combine")
+ out_lines = out.splitlines()
+ assert len(out_lines) == nprocs + 1
+ assert all(
+ re.fullmatch(r"Combined data file \.coverage\..*\.\d+\.\d+", line)
+ for line in out_lines
)
- out = self.run_command(cmd)
- expected_cant_trace = cant_trace_msg(concurrency, the_module)
+ out = self.run_command("coverage report -m")
- if expected_cant_trace is not None:
- print(out)
- assert out == expected_cant_trace
- pytest.skip(f"Can't test: {expected_cant_trace}")
- else:
- assert out.rstrip() == expected_out
- assert len(glob.glob(".coverage.*")) == nprocs + 1
-
- out = self.run_command("coverage combine")
- out_lines = out.splitlines()
- assert len(out_lines) == nprocs + 1
- assert all(
- re.fullmatch(r"Combined data file \.coverage\..*\.\d+\.\d+", line)
- for line in out_lines
- )
- out = self.run_command("coverage report -m")
-
- last_line = self.squeezed_lines(out)[-1]
- assert re.search(r"TOTAL \d+ 0 100%", last_line)
-
- def test_multiprocessing_simple(self):
+ last_line = self.squeezed_lines(out)[-1]
+ assert re.search(r"TOTAL \d+ 0 100%", last_line)
+
+ def test_multiprocessing_simple(self, start_method):
nprocs = 3
upto = 30
code = (SQUARE_OR_CUBE_WORK + MULTI_CODE).format(NPROCS=nprocs, UPTO=upto)
total = sum(x*x if x%2 else x*x*x for x in range(upto))
expected_out = f"{nprocs} pids, total = {total}"
- self.try_multiprocessing_code(code, expected_out, threading, nprocs)
+ self.try_multiprocessing_code(
+ code,
+ expected_out,
+ threading,
+ nprocs,
+ start_method=start_method,
+ )
- def test_multiprocessing_append(self):
+ def test_multiprocessing_append(self, start_method):
nprocs = 3
upto = 30
code = (SQUARE_OR_CUBE_WORK + MULTI_CODE).format(NPROCS=nprocs, UPTO=upto)
total = sum(x*x if x%2 else x*x*x for x in range(upto))
expected_out = f"{nprocs} pids, total = {total}"
- self.try_multiprocessing_code(code, expected_out, threading, nprocs, args="--append")
+ self.try_multiprocessing_code(
+ code,
+ expected_out,
+ threading,
+ nprocs,
+ args="--append",
+ start_method=start_method,
+ )
- def test_multiprocessing_and_gevent(self):
+ def test_multiprocessing_and_gevent(self, start_method):
nprocs = 3
upto = 30
code = (
@@ -510,14 +535,15 @@ class MultiprocessingTest(CoverageTest):
total = sum(sum(range((x + 1) * 100)) for x in range(upto))
expected_out = f"{nprocs} pids, total = {total}"
self.try_multiprocessing_code(
- code, expected_out, eventlet, nprocs, concurrency="multiprocessing,eventlet"
+ code,
+ expected_out,
+ eventlet,
+ nprocs,
+ concurrency="multiprocessing,eventlet",
+ start_method=start_method,
)
- @pytest.mark.parametrize("start_method", ["fork", "spawn"])
def test_multiprocessing_with_branching(self, start_method):
- if start_method not in multiprocessing.get_all_start_methods():
- pytest.skip(f"{start_method} not supported here")
-
nprocs = 3
upto = 30
code = (SQUARE_OR_CUBE_WORK + MULTI_CODE).format(NPROCS=nprocs, UPTO=upto)