summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lab/parser.py24
-rw-r--r--tests/test_coroutine.py119
2 files changed, 140 insertions, 3 deletions
diff --git a/lab/parser.py b/lab/parser.py
index a8e03eec..cc8266a9 100644
--- a/lab/parser.py
+++ b/lab/parser.py
@@ -1,6 +1,9 @@
"""Parser.py: a main for invoking code in coverage/parser.py"""
+from __future__ import division
+
import glob, os, sys
+import collections
from optparse import OptionParser
import disgen
@@ -8,6 +11,7 @@ import disgen
from coverage.misc import CoverageException
from coverage.parser import ByteParser, CodeParser
+opcode_counts = collections.Counter()
class ParserMain(object):
"""A main for code parsing experiments."""
@@ -25,6 +29,10 @@ class ParserMain(object):
help="Disassemble"
)
parser.add_option(
+ "-H", action="store_true", dest="histogram",
+ help="Count occurrences of opcodes"
+ )
+ parser.add_option(
"-R", action="store_true", dest="recursive",
help="Recurse to find source files"
)
@@ -51,18 +59,25 @@ class ParserMain(object):
else:
self.one_file(options, args[0])
+ if options.histogram:
+ total = sum(opcode_counts.values())
+ print("{} total opcodes".format(total))
+ for opcode, number in opcode_counts.most_common():
+ print("{:20s} {:6d} {:.1%}".format(opcode, number, number/total))
+
+
def one_file(self, options, filename):
"""Process just one file."""
try:
bp = ByteParser(filename=filename)
- except CoverageException as err:
+ except Exception as err:
print("%s" % (err,))
return
if options.dis:
print("Main code:")
- self.disassemble(bp)
+ self.disassemble(bp, histogram=options.histogram)
arcs = bp._all_arcs()
if options.chunks and not options.dis:
@@ -104,7 +119,7 @@ class ParserMain(object):
(lineno, m0, m1, m2, m3, a, ltext)
)
- def disassemble(self, byte_parser):
+ def disassemble(self, byte_parser, histogram=False):
"""Disassemble code, for ad-hoc experimenting."""
for bp in byte_parser.child_parsers():
@@ -117,6 +132,9 @@ class ParserMain(object):
print("\n%s: " % bp.code)
upto = None
for disline in disgen.disgen(bp.code):
+ if histogram:
+ opcode_counts[disline.opcode] += 1
+ continue
if disline.first:
if srclines:
upto = upto or disline.lineno-1
diff --git a/tests/test_coroutine.py b/tests/test_coroutine.py
new file mode 100644
index 00000000..8e50a358
--- /dev/null
+++ b/tests/test_coroutine.py
@@ -0,0 +1,119 @@
+"""Tests for coroutining."""
+
+from nose.plugins.skip import SkipTest
+import coverage
+
+from tests.coveragetest import CoverageTest
+
+
+# These libraries aren't always available, we'll skip tests if they aren't.
+
+try:
+ import eventlet # pylint: disable=import-error
+except ImportError:
+ eventlet = None
+
+try:
+ import gevent # pylint: disable=import-error
+except ImportError:
+ gevent = None
+
+
+def line_count(s):
+ """How many non-blank lines are in `s`?"""
+ return sum(1 for l in s.splitlines() if l.strip())
+
+
+class CoroutineTest(CoverageTest):
+ """Tests of the coroutine support in coverage.py."""
+
+ # The code common to all the concurrency models. Don't use any comments,
+ # we're counting non-blank lines to see they are all covered.
+ COMMON = """
+ class Producer(threading.Thread):
+ def __init__(self, q):
+ threading.Thread.__init__(self)
+ self.q = q
+
+ def run(self):
+ for i in range(1000):
+ self.q.put(i)
+ self.q.put(None)
+
+ class Consumer(threading.Thread):
+ def __init__(self, q):
+ threading.Thread.__init__(self)
+ self.q = q
+
+ def run(self):
+ sum = 0
+ while True:
+ i = self.q.get()
+ if i is None:
+ print(sum)
+ break
+ sum += i
+
+ q = Queue.Queue()
+ c = Consumer(q)
+ c.start()
+ p = Producer(q)
+ p.start()
+ p.join()
+ c.join()
+ """
+
+ # Import the things to use threads.
+ THREAD = """\
+ import threading
+ try:
+ import Queue
+ except ImportError:
+ # Python 3 :)
+ import queue as Queue
+ """ + COMMON
+
+ # Import the things to use eventlet.
+ EVENTLET = """\
+ import eventlet.green.threading as threading
+ import eventlet.queue as Queue
+ """ + COMMON
+
+ # Import the things to use gevent.
+ GEVENT = """\
+ from gevent import monkey
+ monkey.patch_thread()
+ import threading
+ import gevent.queue as Queue
+ """ + COMMON
+
+ def try_some_code(self, code, args):
+ """Run some coroutine testing code and see that it was all covered."""
+ raise SkipTest("Need to put this on a back burner for a while...")
+ self.make_file("try_it.py", code)
+
+ out = self.run_command("coverage run %s try_it.py" % args)
+ expected_out = "%d\n" % (sum(range(1000)))
+ self.assertEqual(out, expected_out)
+
+ # Read the coverage file and see that try_it.py has all its lines
+ # executed.
+ data = coverage.CoverageData()
+ data.read_file(".coverage")
+ lines = line_count(code)
+ self.assertEqual(data.summary()['try_it.py'], lines)
+
+ def test_threads(self):
+ self.try_some_code(self.THREAD, "")
+
+ def test_eventlet(self):
+ if eventlet is None:
+ raise SkipTest("No eventlet available")
+
+ self.try_some_code(self.EVENTLET, "--coroutine=eventlet")
+
+ def test_gevent(self):
+ if gevent is None:
+ raise SkipTest("No gevent available")
+
+ self.try_some_code(self.GEVENT, "--coroutine=gevent")