diff options
Diffstat (limited to 'tests/test_concurrency.py')
-rw-r--r-- | tests/test_concurrency.py | 92 |
1 files changed, 65 insertions, 27 deletions
diff --git a/tests/test_concurrency.py b/tests/test_concurrency.py index 4bbc3b9..031d7fd 100644 --- a/tests/test_concurrency.py +++ b/tests/test_concurrency.py @@ -159,6 +159,31 @@ SIMPLE = """ """ +def cant_trace_msg(concurrency, the_module): + """What might coverage.py say about a concurrency setting and imported module?""" + # In the concurrency choices, "multiprocessing" doesn't count, so remove it. + if "multiprocessing" in concurrency: + parts = concurrency.split(",") + parts.remove("multiprocessing") + concurrency = ",".join(parts) + + if the_module is None: + # We don't even have the underlying module installed, we expect + # coverage to alert us to this fact. + expected_out = ( + "Couldn't trace with concurrency=%s, " + "the module isn't installed.\n" % concurrency + ) + elif env.C_TRACER or concurrency == "thread" or concurrency == "": + expected_out = None + else: + expected_out = ( + "Can't support concurrency=%s with PyTracer, " + "only threads are supported\n" % concurrency + ) + return expected_out + + class ConcurrencyTest(CoverageTest): """Tests of the concurrency support in coverage.py.""" @@ -179,15 +204,11 @@ class ConcurrencyTest(CoverageTest): cmd = "coverage run --concurrency=%s try_it.py" % concurrency out = self.run_command(cmd) - if not the_module: - # We don't even have the underlying module installed, we expect - # coverage to alert us to this fact. - expected_out = ( - "Couldn't trace with concurrency=%s, " - "the module isn't installed.\n" % concurrency - ) - self.assertEqual(out, expected_out) - elif env.C_TRACER or concurrency == "thread": + expected_cant_trace = cant_trace_msg(concurrency, the_module) + + if expected_cant_trace is not None: + self.assertEqual(out, expected_cant_trace) + else: # We can fully measure the code if we are using the C tracer, which # can support all the concurrency, or if we are using threads. if expected_out is None: @@ -208,12 +229,6 @@ class ConcurrencyTest(CoverageTest): lines = line_count(code) self.assertEqual(data.line_counts()['try_it.py'], lines) - else: - expected_out = ( - "Can't support concurrency=%s with PyTracer, " - "only threads are supported\n" % concurrency - ) - self.assertEqual(out, expected_out) def test_threads(self): code = (THREAD + SUM_THEM_Q + PRINT_SUM_THEM).format(QLIMIT=self.QLIMIT) @@ -290,6 +305,11 @@ SQUARE_OR_CUBE_WORK = """ return y """ +SUM_THEM_WORK = """ + def work(x): + return sum_them((x+1)*100) + """ + MULTI_CODE = """ # Above this will be a defintion of work(). import multiprocessing @@ -325,9 +345,15 @@ MULTI_CODE = """ class MultiprocessingTest(CoverageTest): """Test support of the multiprocessing module.""" - def try_multiprocessing_code(self, code, expected_out): + def try_multiprocessing_code( + self, code, expected_out, the_module, concurrency="multiprocessing" + ): """Run code using multiprocessing, it should produce `expected_out`.""" self.make_file("multi.py", code) + self.make_file(".coveragerc", """\ + [run] + concurrency = %s + """ % concurrency) if env.PYVERSION >= (3, 4): start_methods = ['fork', 'spawn'] @@ -338,23 +364,35 @@ class MultiprocessingTest(CoverageTest): if start_method and start_method not in multiprocessing.get_all_start_methods(): continue - out = self.run_command( - "coverage run --concurrency=multiprocessing multi.py %s" % start_method - ) - self.assertEqual(out.rstrip(), expected_out) + out = self.run_command("coverage run multi.py %s" % (start_method,)) + expected_cant_trace = cant_trace_msg(concurrency, the_module) - self.run_command("coverage combine") - out = self.run_command("coverage report -m") + if expected_cant_trace is not None: + self.assertEqual(out, expected_cant_trace) + else: + self.assertEqual(out.rstrip(), expected_out) - last_line = self.squeezed_lines(out)[-1] - expected_report = "multi.py {lines} 0 100%".format(lines=line_count(code)) - self.assertEqual(last_line, expected_report) + self.run_command("coverage combine") + out = self.run_command("coverage report -m") + + last_line = self.squeezed_lines(out)[-1] + expected_report = "multi.py {lines} 0 100%".format(lines=line_count(code)) + self.assertEqual(last_line, expected_report) def test_multiprocessing(self): nprocs = 3 upto = 30 code = (SQUARE_OR_CUBE_WORK + MULTI_CODE).format(NPROCS=nprocs, UPTO=upto) total = sum(x*x if x%2 else x*x*x for x in range(upto)) - expected = "{nprocs} pids, total = {total}".format(nprocs=nprocs, total=total) + expected_out = "{nprocs} pids, total = {total}".format(nprocs=nprocs, total=total) + self.try_multiprocessing_code(code, expected_out, threading) - self.try_multiprocessing_code(code, expected) + def test_multiprocessing_and_gevent(self): + nprocs = 3 + upto = 30 + code = (SUM_THEM_WORK + EVENTLET + SUM_THEM_Q + MULTI_CODE).format(NPROCS=nprocs, UPTO=upto) + total = sum(sum(range((x + 1) * 100)) for x in range(upto)) + expected_out = "{nprocs} pids, total = {total}".format(nprocs=nprocs, total=total) + self.try_multiprocessing_code( + code, expected_out, eventlet, concurrency="multiprocessing,eventlet" + ) |