summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNed Batchelder <ned@nedbatchelder.com>2016-07-03 08:34:10 -0400
committerNed Batchelder <ned@nedbatchelder.com>2016-07-03 08:34:10 -0400
commitffe481b845ef14289f6920cd2a5b7928e6c78d6a (patch)
treeddf50d916615b27aa2d83b4cb69fe17fc9a3ae62
parentf475fb9891808fcd8926dc19599da8e03473af38 (diff)
downloadpython-coveragepy-ffe481b845ef14289f6920cd2a5b7928e6c78d6a.tar.gz
Refactor concurrency tests to make the pieces more composable
-rw-r--r--tests/test_concurrency.py315
1 files changed, 196 insertions, 119 deletions
diff --git a/tests/test_concurrency.py b/tests/test_concurrency.py
index 04eb985..4bbc3b9 100644
--- a/tests/test_concurrency.py
+++ b/tests/test_concurrency.py
@@ -28,89 +28,141 @@ except ImportError:
import greenlet
+def measurable_line(l):
+ """Is this a line of code coverage will measure?
+
+ Not blank, not a comment, and not "else"
+ """
+ l = l.strip()
+ if not l:
+ return False
+ if l.startswith('#'):
+ return False
+ if l.startswith('else:'):
+ return False
+ return True
+
+
def line_count(s):
- """How many non-blank non-comment lines are in `s`?"""
- def code_line(l):
- """Is this a code line? Not blank, and not a full-line comment."""
- return l.strip() and not l.strip().startswith('#')
- return sum(1 for l in s.splitlines() if code_line(l))
+ """How many measurable lines are in `s`?"""
+ return len(list(filter(measurable_line, s.splitlines())))
-class ConcurrencyTest(CoverageTest):
- """Tests of the concurrency support in coverage.py."""
+def print_simple_annotation(code, linenos):
+ """Print the lines in `code` with X for each line number in `linenos`."""
+ for lineno, line in enumerate(code.splitlines(), start=1):
+ print(" {0} {1}".format("X" if lineno in linenos else " ", line))
+
+
+class LineCountTest(CoverageTest):
+ """Test the helpers here."""
+
+ run_in_temp_dir = False
+
+ def test_line_count(self):
+ CODE = """
+ # Hey there!
+ x = 1
+ if x:
+ print("hello")
+ else:
+ print("bye")
+
+ print("done")
+ """
+
+ self.assertEqual(line_count(CODE), 5)
+
+
+# The code common to all the concurrency models.
+SUM_THEM_Q = """
+ # Above this will be imports defining queue and threading.
+
+ class Producer(threading.Thread):
+ def __init__(self, limit, q):
+ threading.Thread.__init__(self)
+ self.limit = limit
+ self.q = q
- LIMIT = 1000
-
- # The code common to all the concurrency models.
- COMMON = """
- class Producer(threading.Thread):
- def __init__(self, q):
- threading.Thread.__init__(self)
- self.q = q
-
- def run(self):
- for i in range({LIMIT}):
- self.q.put(i)
- self.q.put(None)
-
- class Consumer(threading.Thread):
- def __init__(self, q):
- threading.Thread.__init__(self)
- self.q = q
-
- def run(self):
- sum = 0
- while True:
- i = self.q.get()
- if i is None:
- print(sum)
- break
- sum += i
+ def run(self):
+ for i in range(self.limit):
+ self.q.put(i)
+ self.q.put(None)
+ class Consumer(threading.Thread):
+ def __init__(self, q, qresult):
+ threading.Thread.__init__(self)
+ self.q = q
+ self.qresult = qresult
+
+ def run(self):
+ sum = 0
+ while True:
+ i = self.q.get()
+ if i is None:
+ break
+ sum += i
+ self.qresult.put(sum)
+
+ def sum_them(limit):
q = queue.Queue()
- c = Consumer(q)
- p = Producer(q)
+ qresult = queue.Queue()
+ c = Consumer(q, qresult)
+ p = Producer(limit, q)
c.start()
p.start()
p.join()
c.join()
- """.format(LIMIT=LIMIT)
-
- # Import the things to use threads.
- if env.PY2:
- THREAD = """\
- import threading
- import Queue as queue
- """ + COMMON
- else:
- THREAD = """\
- import threading
- import queue
- """ + COMMON
-
- # Import the things to use eventlet.
- EVENTLET = """\
- import eventlet.green.threading as threading
- import eventlet.queue as queue
- """ + COMMON
-
- # Import the things to use gevent.
- GEVENT = """\
- from gevent import monkey
- monkey.patch_thread()
- import threading
- import gevent.queue as queue
- """ + COMMON
-
- # Uncomplicated code that doesn't use any of the concurrency stuff, to test
- # the simple case under each of the regimes.
- SIMPLE = """\
- total = 0
- for i in range({LIMIT}):
- total += i
- print(total)
- """.format(LIMIT=LIMIT)
+ return qresult.get()
+
+ # Below this will be something using sum_them.
+ """
+
+PRINT_SUM_THEM = """
+ print(sum_them({QLIMIT}))
+ """
+
+# Import the things to use threads.
+if env.PY2:
+ THREAD = """
+ import threading
+ import Queue as queue
+ """
+else:
+ THREAD = """
+ import threading
+ import queue
+ """
+
+# Import the things to use eventlet.
+EVENTLET = """
+ import eventlet.green.threading as threading
+ import eventlet.queue as queue
+ """
+
+# Import the things to use gevent.
+GEVENT = """
+ from gevent import monkey
+ monkey.patch_thread()
+ import threading
+ import gevent.queue as queue
+ """
+
+# Uncomplicated code that doesn't use any of the concurrency stuff, to test
+# the simple case under each of the regimes.
+SIMPLE = """
+ total = 0
+ for i in range({QLIMIT}):
+ total += i
+ print(total)
+ """
+
+
+class ConcurrencyTest(CoverageTest):
+ """Tests of the concurrency support in coverage.py."""
+
+ QLIMIT = 1000
def try_some_code(self, code, concurrency, the_module, expected_out=None):
"""Run some concurrency testing code and see that it was all covered.
@@ -139,7 +191,8 @@ class ConcurrencyTest(CoverageTest):
# We can fully measure the code if we are using the C tracer, which
# can support all the concurrency, or if we are using threads.
if expected_out is None:
- expected_out = "%d\n" % (sum(range(self.LIMIT)))
+ expected_out = "%d\n" % (sum(range(self.QLIMIT)))
+ print(code)
self.assertEqual(out, expected_out)
# Read the coverage file and see that try_it.py has all its lines
@@ -163,22 +216,28 @@ class ConcurrencyTest(CoverageTest):
self.assertEqual(out, expected_out)
def test_threads(self):
- self.try_some_code(self.THREAD, "thread", threading)
+ code = (THREAD + SUM_THEM_Q + PRINT_SUM_THEM).format(QLIMIT=self.QLIMIT)
+ self.try_some_code(code, "thread", threading)
def test_threads_simple_code(self):
- self.try_some_code(self.SIMPLE, "thread", threading)
+ code = SIMPLE.format(QLIMIT=self.QLIMIT)
+ self.try_some_code(code, "thread", threading)
def test_eventlet(self):
- self.try_some_code(self.EVENTLET, "eventlet", eventlet)
+ code = (EVENTLET + SUM_THEM_Q + PRINT_SUM_THEM).format(QLIMIT=self.QLIMIT)
+ self.try_some_code(code, "eventlet", eventlet)
def test_eventlet_simple_code(self):
- self.try_some_code(self.SIMPLE, "eventlet", eventlet)
+ code = SIMPLE.format(QLIMIT=self.QLIMIT)
+ self.try_some_code(code, "eventlet", eventlet)
def test_gevent(self):
- self.try_some_code(self.GEVENT, "gevent", gevent)
+ code = (GEVENT + SUM_THEM_Q + PRINT_SUM_THEM).format(QLIMIT=self.QLIMIT)
+ self.try_some_code(code, "gevent", gevent)
def test_gevent_simple_code(self):
- self.try_some_code(self.SIMPLE, "gevent", gevent)
+ code = SIMPLE.format(QLIMIT=self.QLIMIT)
+ self.try_some_code(code, "gevent", gevent)
def test_greenlet(self):
GREENLET = """\
@@ -199,7 +258,8 @@ class ConcurrencyTest(CoverageTest):
self.try_some_code(GREENLET, "greenlet", greenlet, "hello world\n42\n")
def test_greenlet_simple_code(self):
- self.try_some_code(self.SIMPLE, "greenlet", greenlet)
+ code = SIMPLE.format(QLIMIT=self.QLIMIT)
+ self.try_some_code(code, "greenlet", greenlet)
def test_bug_330(self):
BUG_330 = """\
@@ -220,41 +280,54 @@ class ConcurrencyTest(CoverageTest):
self.try_some_code(BUG_330, "eventlet", eventlet, "0\n")
+SQUARE_OR_CUBE_WORK = """
+ def work(x):
+ # Use different lines in different subprocesses.
+ if x % 2:
+ y = x*x
+ else:
+ y = x*x*x
+ return y
+ """
+
+MULTI_CODE = """
+ # Above this will be a defintion of work().
+ import multiprocessing
+ import os
+ import time
+ import sys
+
+ def process_worker_main(args):
+ # Need to pause, or the tasks go too quick, and some processes
+ # in the pool don't get any work, and then don't record data.
+ time.sleep(0.02)
+ ret = work(*args)
+ return os.getpid(), ret
+
+ if __name__ == "__main__":
+ # This if is on a single line so we can get 100% coverage
+ # even if we have no arguments.
+ if len(sys.argv) > 1: multiprocessing.set_start_method(sys.argv[1])
+ pool = multiprocessing.Pool({NPROCS})
+ inputs = [(x,) for x in range({UPTO})]
+ outputs = pool.imap_unordered(process_worker_main, inputs)
+ pids = set()
+ total = 0
+ for pid, sq in outputs:
+ pids.add(pid)
+ total += sq
+ print("%d pids, total = %d" % (len(pids), total))
+ pool.close()
+ pool.join()
+ """
+
+
class MultiprocessingTest(CoverageTest):
"""Test support of the multiprocessing module."""
- def test_multiprocessing(self):
- self.make_file("multi.py", """\
- import multiprocessing
- import os
- import time
- import sys
-
- def func(x):
- # Need to pause, or the tasks go too quick, and some processes
- # in the pool don't get any work, and then don't record data.
- time.sleep(0.02)
- # Use different lines in different subprocesses.
- if x % 2:
- y = x*x
- else:
- y = x*x*x
- return os.getpid(), y
-
- if __name__ == "__main__":
- if len(sys.argv) > 1: multiprocessing.set_start_method(sys.argv[1])
- pool = multiprocessing.Pool(3)
- inputs = range(30)
- outputs = pool.imap_unordered(func, inputs)
- pids = set()
- total = 0
- for pid, sq in outputs:
- pids.add(pid)
- total += sq
- print("%d pids, total = %d" % (len(pids), total))
- pool.close()
- pool.join()
- """)
+ def try_multiprocessing_code(self, code, expected_out):
+ """Run code using multiprocessing, it should produce `expected_out`."""
+ self.make_file("multi.py", code)
if env.PYVERSION >= (3, 4):
start_methods = ['fork', 'spawn']
@@ -268,16 +341,20 @@ class MultiprocessingTest(CoverageTest):
out = self.run_command(
"coverage run --concurrency=multiprocessing multi.py %s" % start_method
)
- total = sum(x*x if x%2 else x*x*x for x in range(30))
- self.assertEqual(out.rstrip(), "3 pids, total = %d" % total)
+ self.assertEqual(out.rstrip(), expected_out)
self.run_command("coverage combine")
out = self.run_command("coverage report -m")
+
last_line = self.squeezed_lines(out)[-1]
- self.assertEqual(last_line, "multi.py 23 0 100%")
+ expected_report = "multi.py {lines} 0 100%".format(lines=line_count(code))
+ self.assertEqual(last_line, expected_report)
+ def test_multiprocessing(self):
+ nprocs = 3
+ upto = 30
+ code = (SQUARE_OR_CUBE_WORK + MULTI_CODE).format(NPROCS=nprocs, UPTO=upto)
+ total = sum(x*x if x%2 else x*x*x for x in range(upto))
+ expected = "{nprocs} pids, total = {total}".format(nprocs=nprocs, total=total)
-def print_simple_annotation(code, linenos):
- """Print the lines in `code` with X for each line number in `linenos`."""
- for lineno, line in enumerate(code.splitlines(), start=1):
- print(" {0} {1}".format("X" if lineno in linenos else " ", line))
+ self.try_multiprocessing_code(code, expected)