diff options
author | Ned Batchelder <ned@nedbatchelder.com> | 2016-07-03 08:34:10 -0400 |
---|---|---|
committer | Ned Batchelder <ned@nedbatchelder.com> | 2016-07-03 08:34:10 -0400 |
commit | ffe481b845ef14289f6920cd2a5b7928e6c78d6a (patch) | |
tree | ddf50d916615b27aa2d83b4cb69fe17fc9a3ae62 | |
parent | f475fb9891808fcd8926dc19599da8e03473af38 (diff) | |
download | python-coveragepy-ffe481b845ef14289f6920cd2a5b7928e6c78d6a.tar.gz |
Refactor concurrency tests to make the pieces more composable
-rw-r--r-- | tests/test_concurrency.py | 315 |
1 files changed, 196 insertions, 119 deletions
diff --git a/tests/test_concurrency.py b/tests/test_concurrency.py index 04eb985..4bbc3b9 100644 --- a/tests/test_concurrency.py +++ b/tests/test_concurrency.py @@ -28,89 +28,141 @@ except ImportError: import greenlet +def measurable_line(l): + """Is this a line of code coverage will measure? + + Not blank, not a comment, and not "else" + """ + l = l.strip() + if not l: + return False + if l.startswith('#'): + return False + if l.startswith('else:'): + return False + return True + + def line_count(s): - """How many non-blank non-comment lines are in `s`?""" - def code_line(l): - """Is this a code line? Not blank, and not a full-line comment.""" - return l.strip() and not l.strip().startswith('#') - return sum(1 for l in s.splitlines() if code_line(l)) + """How many measurable lines are in `s`?""" + return len(list(filter(measurable_line, s.splitlines()))) -class ConcurrencyTest(CoverageTest): - """Tests of the concurrency support in coverage.py.""" +def print_simple_annotation(code, linenos): + """Print the lines in `code` with X for each line number in `linenos`.""" + for lineno, line in enumerate(code.splitlines(), start=1): + print(" {0} {1}".format("X" if lineno in linenos else " ", line)) + + +class LineCountTest(CoverageTest): + """Test the helpers here.""" + + run_in_temp_dir = False + + def test_line_count(self): + CODE = """ + # Hey there! + x = 1 + if x: + print("hello") + else: + print("bye") + + print("done") + """ + + self.assertEqual(line_count(CODE), 5) + + +# The code common to all the concurrency models. +SUM_THEM_Q = """ + # Above this will be imports defining queue and threading. + + class Producer(threading.Thread): + def __init__(self, limit, q): + threading.Thread.__init__(self) + self.limit = limit + self.q = q - LIMIT = 1000 - - # The code common to all the concurrency models. - COMMON = """ - class Producer(threading.Thread): - def __init__(self, q): - threading.Thread.__init__(self) - self.q = q - - def run(self): - for i in range({LIMIT}): - self.q.put(i) - self.q.put(None) - - class Consumer(threading.Thread): - def __init__(self, q): - threading.Thread.__init__(self) - self.q = q - - def run(self): - sum = 0 - while True: - i = self.q.get() - if i is None: - print(sum) - break - sum += i + def run(self): + for i in range(self.limit): + self.q.put(i) + self.q.put(None) + class Consumer(threading.Thread): + def __init__(self, q, qresult): + threading.Thread.__init__(self) + self.q = q + self.qresult = qresult + + def run(self): + sum = 0 + while True: + i = self.q.get() + if i is None: + break + sum += i + self.qresult.put(sum) + + def sum_them(limit): q = queue.Queue() - c = Consumer(q) - p = Producer(q) + qresult = queue.Queue() + c = Consumer(q, qresult) + p = Producer(limit, q) c.start() p.start() p.join() c.join() - """.format(LIMIT=LIMIT) - - # Import the things to use threads. - if env.PY2: - THREAD = """\ - import threading - import Queue as queue - """ + COMMON - else: - THREAD = """\ - import threading - import queue - """ + COMMON - - # Import the things to use eventlet. - EVENTLET = """\ - import eventlet.green.threading as threading - import eventlet.queue as queue - """ + COMMON - - # Import the things to use gevent. - GEVENT = """\ - from gevent import monkey - monkey.patch_thread() - import threading - import gevent.queue as queue - """ + COMMON - - # Uncomplicated code that doesn't use any of the concurrency stuff, to test - # the simple case under each of the regimes. - SIMPLE = """\ - total = 0 - for i in range({LIMIT}): - total += i - print(total) - """.format(LIMIT=LIMIT) + return qresult.get() + + # Below this will be something using sum_them. + """ + +PRINT_SUM_THEM = """ + print(sum_them({QLIMIT})) + """ + +# Import the things to use threads. +if env.PY2: + THREAD = """ + import threading + import Queue as queue + """ +else: + THREAD = """ + import threading + import queue + """ + +# Import the things to use eventlet. +EVENTLET = """ + import eventlet.green.threading as threading + import eventlet.queue as queue + """ + +# Import the things to use gevent. +GEVENT = """ + from gevent import monkey + monkey.patch_thread() + import threading + import gevent.queue as queue + """ + +# Uncomplicated code that doesn't use any of the concurrency stuff, to test +# the simple case under each of the regimes. +SIMPLE = """ + total = 0 + for i in range({QLIMIT}): + total += i + print(total) + """ + + +class ConcurrencyTest(CoverageTest): + """Tests of the concurrency support in coverage.py.""" + + QLIMIT = 1000 def try_some_code(self, code, concurrency, the_module, expected_out=None): """Run some concurrency testing code and see that it was all covered. @@ -139,7 +191,8 @@ class ConcurrencyTest(CoverageTest): # We can fully measure the code if we are using the C tracer, which # can support all the concurrency, or if we are using threads. if expected_out is None: - expected_out = "%d\n" % (sum(range(self.LIMIT))) + expected_out = "%d\n" % (sum(range(self.QLIMIT))) + print(code) self.assertEqual(out, expected_out) # Read the coverage file and see that try_it.py has all its lines @@ -163,22 +216,28 @@ class ConcurrencyTest(CoverageTest): self.assertEqual(out, expected_out) def test_threads(self): - self.try_some_code(self.THREAD, "thread", threading) + code = (THREAD + SUM_THEM_Q + PRINT_SUM_THEM).format(QLIMIT=self.QLIMIT) + self.try_some_code(code, "thread", threading) def test_threads_simple_code(self): - self.try_some_code(self.SIMPLE, "thread", threading) + code = SIMPLE.format(QLIMIT=self.QLIMIT) + self.try_some_code(code, "thread", threading) def test_eventlet(self): - self.try_some_code(self.EVENTLET, "eventlet", eventlet) + code = (EVENTLET + SUM_THEM_Q + PRINT_SUM_THEM).format(QLIMIT=self.QLIMIT) + self.try_some_code(code, "eventlet", eventlet) def test_eventlet_simple_code(self): - self.try_some_code(self.SIMPLE, "eventlet", eventlet) + code = SIMPLE.format(QLIMIT=self.QLIMIT) + self.try_some_code(code, "eventlet", eventlet) def test_gevent(self): - self.try_some_code(self.GEVENT, "gevent", gevent) + code = (GEVENT + SUM_THEM_Q + PRINT_SUM_THEM).format(QLIMIT=self.QLIMIT) + self.try_some_code(code, "gevent", gevent) def test_gevent_simple_code(self): - self.try_some_code(self.SIMPLE, "gevent", gevent) + code = SIMPLE.format(QLIMIT=self.QLIMIT) + self.try_some_code(code, "gevent", gevent) def test_greenlet(self): GREENLET = """\ @@ -199,7 +258,8 @@ class ConcurrencyTest(CoverageTest): self.try_some_code(GREENLET, "greenlet", greenlet, "hello world\n42\n") def test_greenlet_simple_code(self): - self.try_some_code(self.SIMPLE, "greenlet", greenlet) + code = SIMPLE.format(QLIMIT=self.QLIMIT) + self.try_some_code(code, "greenlet", greenlet) def test_bug_330(self): BUG_330 = """\ @@ -220,41 +280,54 @@ class ConcurrencyTest(CoverageTest): self.try_some_code(BUG_330, "eventlet", eventlet, "0\n") +SQUARE_OR_CUBE_WORK = """ + def work(x): + # Use different lines in different subprocesses. + if x % 2: + y = x*x + else: + y = x*x*x + return y + """ + +MULTI_CODE = """ + # Above this will be a defintion of work(). + import multiprocessing + import os + import time + import sys + + def process_worker_main(args): + # Need to pause, or the tasks go too quick, and some processes + # in the pool don't get any work, and then don't record data. + time.sleep(0.02) + ret = work(*args) + return os.getpid(), ret + + if __name__ == "__main__": + # This if is on a single line so we can get 100% coverage + # even if we have no arguments. + if len(sys.argv) > 1: multiprocessing.set_start_method(sys.argv[1]) + pool = multiprocessing.Pool({NPROCS}) + inputs = [(x,) for x in range({UPTO})] + outputs = pool.imap_unordered(process_worker_main, inputs) + pids = set() + total = 0 + for pid, sq in outputs: + pids.add(pid) + total += sq + print("%d pids, total = %d" % (len(pids), total)) + pool.close() + pool.join() + """ + + class MultiprocessingTest(CoverageTest): """Test support of the multiprocessing module.""" - def test_multiprocessing(self): - self.make_file("multi.py", """\ - import multiprocessing - import os - import time - import sys - - def func(x): - # Need to pause, or the tasks go too quick, and some processes - # in the pool don't get any work, and then don't record data. - time.sleep(0.02) - # Use different lines in different subprocesses. - if x % 2: - y = x*x - else: - y = x*x*x - return os.getpid(), y - - if __name__ == "__main__": - if len(sys.argv) > 1: multiprocessing.set_start_method(sys.argv[1]) - pool = multiprocessing.Pool(3) - inputs = range(30) - outputs = pool.imap_unordered(func, inputs) - pids = set() - total = 0 - for pid, sq in outputs: - pids.add(pid) - total += sq - print("%d pids, total = %d" % (len(pids), total)) - pool.close() - pool.join() - """) + def try_multiprocessing_code(self, code, expected_out): + """Run code using multiprocessing, it should produce `expected_out`.""" + self.make_file("multi.py", code) if env.PYVERSION >= (3, 4): start_methods = ['fork', 'spawn'] @@ -268,16 +341,20 @@ class MultiprocessingTest(CoverageTest): out = self.run_command( "coverage run --concurrency=multiprocessing multi.py %s" % start_method ) - total = sum(x*x if x%2 else x*x*x for x in range(30)) - self.assertEqual(out.rstrip(), "3 pids, total = %d" % total) + self.assertEqual(out.rstrip(), expected_out) self.run_command("coverage combine") out = self.run_command("coverage report -m") + last_line = self.squeezed_lines(out)[-1] - self.assertEqual(last_line, "multi.py 23 0 100%") + expected_report = "multi.py {lines} 0 100%".format(lines=line_count(code)) + self.assertEqual(last_line, expected_report) + def test_multiprocessing(self): + nprocs = 3 + upto = 30 + code = (SQUARE_OR_CUBE_WORK + MULTI_CODE).format(NPROCS=nprocs, UPTO=upto) + total = sum(x*x if x%2 else x*x*x for x in range(upto)) + expected = "{nprocs} pids, total = {total}".format(nprocs=nprocs, total=total) -def print_simple_annotation(code, linenos): - """Print the lines in `code` with X for each line number in `linenos`.""" - for lineno, line in enumerate(code.splitlines(), start=1): - print(" {0} {1}".format("X" if lineno in linenos else " ", line)) + self.try_multiprocessing_code(code, expected) |