diff options
author | Ned Batchelder <ned@nedbatchelder.com> | 2014-09-24 21:03:16 -0400 |
---|---|---|
committer | Ned Batchelder <ned@nedbatchelder.com> | 2014-09-24 21:03:16 -0400 |
commit | 7b5457967f256696d3b6c936e81436aa60b4b409 (patch) | |
tree | 6cc4e2517a408a730a525c4ee98ad38f3a532cff /tests/test_concurrency.py | |
parent | 2eb5fe3be624d8f970d0394d32a1f2fa148a9372 (diff) | |
download | python-coveragepy-git-7b5457967f256696d3b6c936e81436aa60b4b409.tar.gz |
"concurrency" is a better name that "coroutine"
--HG--
rename : tests/test_coroutine.py => tests/test_concurrency.py
Diffstat (limited to 'tests/test_concurrency.py')
-rw-r--r-- | tests/test_concurrency.py | 208 |
1 files changed, 208 insertions, 0 deletions
diff --git a/tests/test_concurrency.py b/tests/test_concurrency.py new file mode 100644 index 00000000..5ea756f6 --- /dev/null +++ b/tests/test_concurrency.py @@ -0,0 +1,208 @@ +"""Tests for concurrency libraries.""" + +import os, os.path, sys, threading + +import coverage + +from tests.coveragetest import CoverageTest + + +# These libraries aren't always available, we'll skip tests if they aren't. + +try: + import eventlet # pylint: disable=import-error +except ImportError: + eventlet = None + +try: + import gevent # pylint: disable=import-error +except ImportError: + gevent = None + +try: + import greenlet # pylint: disable=import-error +except ImportError: + greenlet = None + +# Are we running with the C tracer or not? +C_TRACER = os.getenv('COVERAGE_TEST_TRACER', 'c') == 'c' + + +def line_count(s): + """How many non-blank non-comment lines are in `s`?""" + def code_line(l): + """Is this a code line? Not blank, and not a full-line comment.""" + return l.strip() and not l.strip().startswith('#') + return sum(1 for l in s.splitlines() if code_line(l)) + + +class ConcurrencyTest(CoverageTest): + """Tests of the concurrency support in coverage.py.""" + + LIMIT = 1000 + + # The code common to all the concurrency models. + COMMON = """ + class Producer(threading.Thread): + def __init__(self, q): + threading.Thread.__init__(self) + self.q = q + + def run(self): + for i in range({LIMIT}): + self.q.put(i) + self.q.put(None) + + class Consumer(threading.Thread): + def __init__(self, q): + threading.Thread.__init__(self) + self.q = q + + def run(self): + sum = 0 + while True: + i = self.q.get() + if i is None: + print(sum) + break + sum += i + + q = queue.Queue() + c = Consumer(q) + p = Producer(q) + c.start() + p.start() + + p.join() + c.join() + """.format(LIMIT=LIMIT) + + # Import the things to use threads. + if sys.version_info < (3, 0): + THREAD = """\ + import threading + import Queue as queue + """ + COMMON + else: + THREAD = """\ + import threading + import queue + """ + COMMON + + # Import the things to use eventlet. + EVENTLET = """\ + import eventlet.green.threading as threading + import eventlet.queue as queue + """ + COMMON + + # Import the things to use gevent. + GEVENT = """\ + from gevent import monkey + monkey.patch_thread() + import threading + import gevent.queue as queue + """ + COMMON + + # Uncomplicated code that doesn't use any of the concurrency stuff, to test + # the simple case under each of the regimes. + SIMPLE = """\ + total = 0 + for i in range({LIMIT}): + total += i + print(total) + """.format(LIMIT=LIMIT) + + def try_some_code(self, code, concurrency, the_module, expected_out=None): + """Run some concurrency testing code and see that it was all covered. + + `code` is the Python code to execute. `concurrency` is the name of + the concurrency regime to test it under. `the_module` is the imported + module that must be available for this to work at all. `expected_out` + is the text we expect the code to produce. + + """ + + self.make_file("try_it.py", code) + + cmd = "coverage run --concurrency=%s try_it.py" % concurrency + out = self.run_command(cmd) + + if not the_module: + # We don't even have the underlying module installed, we expect + # coverage to alert us to this fact. + expected_out = ( + "Couldn't trace with concurrency=%s, " + "the module isn't installed.\n" % concurrency + ) + self.assertEqual(out, expected_out) + elif C_TRACER or concurrency == "thread": + # We can fully measure the code if we are using the C tracer, which + # can support all the concurrency, or if we are using threads. + if expected_out is None: + expected_out = "%d\n" % (sum(range(self.LIMIT))) + self.assertEqual(out, expected_out) + + # Read the coverage file and see that try_it.py has all its lines + # executed. + data = coverage.CoverageData() + data.read_file(".coverage") + + # If the test fails, it's helpful to see this info: + fname = os.path.abspath("try_it.py") + linenos = data.executed_lines(fname).keys() + print("{0}: {1}".format(len(linenos), linenos)) + print_simple_annotation(code, linenos) + + lines = line_count(code) + self.assertEqual(data.summary()['try_it.py'], lines) + else: + expected_out = ( + "Can't support concurrency=%s with PyTracer, " + "only threads are supported\n" % concurrency + ) + self.assertEqual(out, expected_out) + + def test_threads(self): + self.try_some_code(self.THREAD, "thread", threading) + + def test_threads_simple_code(self): + self.try_some_code(self.SIMPLE, "thread", threading) + + def test_eventlet(self): + self.try_some_code(self.EVENTLET, "eventlet", eventlet) + + def test_eventlet_simple_code(self): + self.try_some_code(self.SIMPLE, "eventlet", eventlet) + + def test_gevent(self): + self.try_some_code(self.GEVENT, "gevent", gevent) + + def test_gevent_simple_code(self): + self.try_some_code(self.SIMPLE, "gevent", gevent) + + def test_greenlet(self): + GREENLET = """\ + from greenlet import greenlet + + def test1(x, y): + z = gr2.switch(x+y) + print(z) + + def test2(u): + print(u) + gr1.switch(42) + + gr1 = greenlet(test1) + gr2 = greenlet(test2) + gr1.switch("hello", " world") + """ + self.try_some_code(GREENLET, "greenlet", greenlet, "hello world\n42\n") + + def test_greenlet_simple_code(self): + self.try_some_code(self.SIMPLE, "greenlet", greenlet) + + +def print_simple_annotation(code, linenos): + """Print the lines in `code` with X for each line number in `linenos`.""" + for lineno, line in enumerate(code.splitlines(), start=1): + print(" {0} {1}".format("X" if lineno in linenos else " ", line)) |