summaryrefslogtreecommitdiff
path: root/tests/test_concurrency.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_concurrency.py')
-rw-r--r--tests/test_concurrency.py92
1 files changed, 65 insertions, 27 deletions
diff --git a/tests/test_concurrency.py b/tests/test_concurrency.py
index 4bbc3b9..031d7fd 100644
--- a/tests/test_concurrency.py
+++ b/tests/test_concurrency.py
@@ -159,6 +159,31 @@ SIMPLE = """
"""
+def cant_trace_msg(concurrency, the_module):
+ """What might coverage.py say about a concurrency setting and imported module?"""
+ # In the concurrency choices, "multiprocessing" doesn't count, so remove it.
+ if "multiprocessing" in concurrency:
+ parts = concurrency.split(",")
+ parts.remove("multiprocessing")
+ concurrency = ",".join(parts)
+
+ if the_module is None:
+ # We don't even have the underlying module installed, we expect
+ # coverage to alert us to this fact.
+ expected_out = (
+ "Couldn't trace with concurrency=%s, "
+ "the module isn't installed.\n" % concurrency
+ )
+ elif env.C_TRACER or concurrency == "thread" or concurrency == "":
+ expected_out = None
+ else:
+ expected_out = (
+ "Can't support concurrency=%s with PyTracer, "
+ "only threads are supported\n" % concurrency
+ )
+ return expected_out
+
+
class ConcurrencyTest(CoverageTest):
"""Tests of the concurrency support in coverage.py."""
@@ -179,15 +204,11 @@ class ConcurrencyTest(CoverageTest):
cmd = "coverage run --concurrency=%s try_it.py" % concurrency
out = self.run_command(cmd)
- if not the_module:
- # We don't even have the underlying module installed, we expect
- # coverage to alert us to this fact.
- expected_out = (
- "Couldn't trace with concurrency=%s, "
- "the module isn't installed.\n" % concurrency
- )
- self.assertEqual(out, expected_out)
- elif env.C_TRACER or concurrency == "thread":
+ expected_cant_trace = cant_trace_msg(concurrency, the_module)
+
+ if expected_cant_trace is not None:
+ self.assertEqual(out, expected_cant_trace)
+ else:
# We can fully measure the code if we are using the C tracer, which
# can support all the concurrency, or if we are using threads.
if expected_out is None:
@@ -208,12 +229,6 @@ class ConcurrencyTest(CoverageTest):
lines = line_count(code)
self.assertEqual(data.line_counts()['try_it.py'], lines)
- else:
- expected_out = (
- "Can't support concurrency=%s with PyTracer, "
- "only threads are supported\n" % concurrency
- )
- self.assertEqual(out, expected_out)
def test_threads(self):
code = (THREAD + SUM_THEM_Q + PRINT_SUM_THEM).format(QLIMIT=self.QLIMIT)
@@ -290,6 +305,11 @@ SQUARE_OR_CUBE_WORK = """
return y
"""
+SUM_THEM_WORK = """
+ def work(x):
+ return sum_them((x+1)*100)
+ """
+
MULTI_CODE = """
# Above this will be a defintion of work().
import multiprocessing
@@ -325,9 +345,15 @@ MULTI_CODE = """
class MultiprocessingTest(CoverageTest):
"""Test support of the multiprocessing module."""
- def try_multiprocessing_code(self, code, expected_out):
+ def try_multiprocessing_code(
+ self, code, expected_out, the_module, concurrency="multiprocessing"
+ ):
"""Run code using multiprocessing, it should produce `expected_out`."""
self.make_file("multi.py", code)
+ self.make_file(".coveragerc", """\
+ [run]
+ concurrency = %s
+ """ % concurrency)
if env.PYVERSION >= (3, 4):
start_methods = ['fork', 'spawn']
@@ -338,23 +364,35 @@ class MultiprocessingTest(CoverageTest):
if start_method and start_method not in multiprocessing.get_all_start_methods():
continue
- out = self.run_command(
- "coverage run --concurrency=multiprocessing multi.py %s" % start_method
- )
- self.assertEqual(out.rstrip(), expected_out)
+ out = self.run_command("coverage run multi.py %s" % (start_method,))
+ expected_cant_trace = cant_trace_msg(concurrency, the_module)
- self.run_command("coverage combine")
- out = self.run_command("coverage report -m")
+ if expected_cant_trace is not None:
+ self.assertEqual(out, expected_cant_trace)
+ else:
+ self.assertEqual(out.rstrip(), expected_out)
- last_line = self.squeezed_lines(out)[-1]
- expected_report = "multi.py {lines} 0 100%".format(lines=line_count(code))
- self.assertEqual(last_line, expected_report)
+ self.run_command("coverage combine")
+ out = self.run_command("coverage report -m")
+
+ last_line = self.squeezed_lines(out)[-1]
+ expected_report = "multi.py {lines} 0 100%".format(lines=line_count(code))
+ self.assertEqual(last_line, expected_report)
def test_multiprocessing(self):
nprocs = 3
upto = 30
code = (SQUARE_OR_CUBE_WORK + MULTI_CODE).format(NPROCS=nprocs, UPTO=upto)
total = sum(x*x if x%2 else x*x*x for x in range(upto))
- expected = "{nprocs} pids, total = {total}".format(nprocs=nprocs, total=total)
+ expected_out = "{nprocs} pids, total = {total}".format(nprocs=nprocs, total=total)
+ self.try_multiprocessing_code(code, expected_out, threading)
- self.try_multiprocessing_code(code, expected)
+ def test_multiprocessing_and_gevent(self):
+ nprocs = 3
+ upto = 30
+ code = (SUM_THEM_WORK + EVENTLET + SUM_THEM_Q + MULTI_CODE).format(NPROCS=nprocs, UPTO=upto)
+ total = sum(sum(range((x + 1) * 100)) for x in range(upto))
+ expected_out = "{nprocs} pids, total = {total}".format(nprocs=nprocs, total=total)
+ self.try_multiprocessing_code(
+ code, expected_out, eventlet, concurrency="multiprocessing,eventlet"
+ )