summaryrefslogtreecommitdiff
path: root/tests/test_concurrency.py
diff options
context:
space:
mode:
authorNed Batchelder <ned@nedbatchelder.com>2014-09-24 21:03:16 -0400
committerNed Batchelder <ned@nedbatchelder.com>2014-09-24 21:03:16 -0400
commit7b5457967f256696d3b6c936e81436aa60b4b409 (patch)
tree6cc4e2517a408a730a525c4ee98ad38f3a532cff /tests/test_concurrency.py
parent2eb5fe3be624d8f970d0394d32a1f2fa148a9372 (diff)
downloadpython-coveragepy-git-7b5457967f256696d3b6c936e81436aa60b4b409.tar.gz
"concurrency" is a better name that "coroutine"
--HG-- rename : tests/test_coroutine.py => tests/test_concurrency.py
Diffstat (limited to 'tests/test_concurrency.py')
-rw-r--r--tests/test_concurrency.py208
1 files changed, 208 insertions, 0 deletions
diff --git a/tests/test_concurrency.py b/tests/test_concurrency.py
new file mode 100644
index 00000000..5ea756f6
--- /dev/null
+++ b/tests/test_concurrency.py
@@ -0,0 +1,208 @@
+"""Tests for concurrency libraries."""
+
+import os, os.path, sys, threading
+
+import coverage
+
+from tests.coveragetest import CoverageTest
+
+
+# These libraries aren't always available, we'll skip tests if they aren't.
+
+try:
+ import eventlet # pylint: disable=import-error
+except ImportError:
+ eventlet = None
+
+try:
+ import gevent # pylint: disable=import-error
+except ImportError:
+ gevent = None
+
+try:
+ import greenlet # pylint: disable=import-error
+except ImportError:
+ greenlet = None
+
+# Are we running with the C tracer or not?
+C_TRACER = os.getenv('COVERAGE_TEST_TRACER', 'c') == 'c'
+
+
+def line_count(s):
+ """How many non-blank non-comment lines are in `s`?"""
+ def code_line(l):
+ """Is this a code line? Not blank, and not a full-line comment."""
+ return l.strip() and not l.strip().startswith('#')
+ return sum(1 for l in s.splitlines() if code_line(l))
+
+
+class ConcurrencyTest(CoverageTest):
+ """Tests of the concurrency support in coverage.py."""
+
+ LIMIT = 1000
+
+ # The code common to all the concurrency models.
+ COMMON = """
+ class Producer(threading.Thread):
+ def __init__(self, q):
+ threading.Thread.__init__(self)
+ self.q = q
+
+ def run(self):
+ for i in range({LIMIT}):
+ self.q.put(i)
+ self.q.put(None)
+
+ class Consumer(threading.Thread):
+ def __init__(self, q):
+ threading.Thread.__init__(self)
+ self.q = q
+
+ def run(self):
+ sum = 0
+ while True:
+ i = self.q.get()
+ if i is None:
+ print(sum)
+ break
+ sum += i
+
+ q = queue.Queue()
+ c = Consumer(q)
+ p = Producer(q)
+ c.start()
+ p.start()
+
+ p.join()
+ c.join()
+ """.format(LIMIT=LIMIT)
+
+ # Import the things to use threads.
+ if sys.version_info < (3, 0):
+ THREAD = """\
+ import threading
+ import Queue as queue
+ """ + COMMON
+ else:
+ THREAD = """\
+ import threading
+ import queue
+ """ + COMMON
+
+ # Import the things to use eventlet.
+ EVENTLET = """\
+ import eventlet.green.threading as threading
+ import eventlet.queue as queue
+ """ + COMMON
+
+ # Import the things to use gevent.
+ GEVENT = """\
+ from gevent import monkey
+ monkey.patch_thread()
+ import threading
+ import gevent.queue as queue
+ """ + COMMON
+
+ # Uncomplicated code that doesn't use any of the concurrency stuff, to test
+ # the simple case under each of the regimes.
+ SIMPLE = """\
+ total = 0
+ for i in range({LIMIT}):
+ total += i
+ print(total)
+ """.format(LIMIT=LIMIT)
+
+ def try_some_code(self, code, concurrency, the_module, expected_out=None):
+ """Run some concurrency testing code and see that it was all covered.
+
+ `code` is the Python code to execute. `concurrency` is the name of
+ the concurrency regime to test it under. `the_module` is the imported
+ module that must be available for this to work at all. `expected_out`
+ is the text we expect the code to produce.
+
+ """
+
+ self.make_file("try_it.py", code)
+
+ cmd = "coverage run --concurrency=%s try_it.py" % concurrency
+ out = self.run_command(cmd)
+
+ if not the_module:
+ # We don't even have the underlying module installed, we expect
+ # coverage to alert us to this fact.
+ expected_out = (
+ "Couldn't trace with concurrency=%s, "
+ "the module isn't installed.\n" % concurrency
+ )
+ self.assertEqual(out, expected_out)
+ elif C_TRACER or concurrency == "thread":
+ # We can fully measure the code if we are using the C tracer, which
+ # can support all the concurrency, or if we are using threads.
+ if expected_out is None:
+ expected_out = "%d\n" % (sum(range(self.LIMIT)))
+ self.assertEqual(out, expected_out)
+
+ # Read the coverage file and see that try_it.py has all its lines
+ # executed.
+ data = coverage.CoverageData()
+ data.read_file(".coverage")
+
+ # If the test fails, it's helpful to see this info:
+ fname = os.path.abspath("try_it.py")
+ linenos = data.executed_lines(fname).keys()
+ print("{0}: {1}".format(len(linenos), linenos))
+ print_simple_annotation(code, linenos)
+
+ lines = line_count(code)
+ self.assertEqual(data.summary()['try_it.py'], lines)
+ else:
+ expected_out = (
+ "Can't support concurrency=%s with PyTracer, "
+ "only threads are supported\n" % concurrency
+ )
+ self.assertEqual(out, expected_out)
+
+ def test_threads(self):
+ self.try_some_code(self.THREAD, "thread", threading)
+
+ def test_threads_simple_code(self):
+ self.try_some_code(self.SIMPLE, "thread", threading)
+
+ def test_eventlet(self):
+ self.try_some_code(self.EVENTLET, "eventlet", eventlet)
+
+ def test_eventlet_simple_code(self):
+ self.try_some_code(self.SIMPLE, "eventlet", eventlet)
+
+ def test_gevent(self):
+ self.try_some_code(self.GEVENT, "gevent", gevent)
+
+ def test_gevent_simple_code(self):
+ self.try_some_code(self.SIMPLE, "gevent", gevent)
+
+ def test_greenlet(self):
+ GREENLET = """\
+ from greenlet import greenlet
+
+ def test1(x, y):
+ z = gr2.switch(x+y)
+ print(z)
+
+ def test2(u):
+ print(u)
+ gr1.switch(42)
+
+ gr1 = greenlet(test1)
+ gr2 = greenlet(test2)
+ gr1.switch("hello", " world")
+ """
+ self.try_some_code(GREENLET, "greenlet", greenlet, "hello world\n42\n")
+
+ def test_greenlet_simple_code(self):
+ self.try_some_code(self.SIMPLE, "greenlet", greenlet)
+
+
+def print_simple_annotation(code, linenos):
+ """Print the lines in `code` with X for each line number in `linenos`."""
+ for lineno, line in enumerate(code.splitlines(), start=1):
+ print(" {0} {1}".format("X" if lineno in linenos else " ", line))