Diffstat (limited to 'tests/test_process.py')
-rw-r--r-- | tests/test_process.py | 549
1 file changed, 461 insertions, 88 deletions
diff --git a/tests/test_process.py b/tests/test_process.py index 78fba81a..81cd5ade 100644 --- a/tests/test_process.py +++ b/tests/test_process.py @@ -1,29 +1,35 @@ +# coding: utf8 +# Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0 +# For details: https://bitbucket.org/ned/coveragepy/src/default/NOTICE.txt + """Tests for process behavior of coverage.py.""" import glob import os import os.path +import re import sys import textwrap import coverage -from coverage import env +from coverage import env, CoverageData +from coverage.misc import output_encoding from tests.coveragetest import CoverageTest -HERE = os.path.dirname(__file__) +TRY_EXECFILE = os.path.join(os.path.dirname(__file__), "modules/process_test/try_execfile.py") class ProcessTest(CoverageTest): """Tests of the per-process behavior of coverage.py.""" + def data_files(self): + """Return the names of coverage data files in this directory.""" + return [f for f in os.listdir('.') if (f.startswith('.coverage.') or f == '.coverage')] + def number_of_data_files(self): """Return the number of coverage data files in this directory.""" - num = 0 - for f in os.listdir('.'): - if f.startswith('.coverage.') or f == '.coverage': - num += 1 - return num + return len(self.data_files()) def test_save_on_exit(self): self.make_file("mycode.py", """\ @@ -36,7 +42,7 @@ class ProcessTest(CoverageTest): self.assert_exists(".coverage") def test_environment(self): - # Checks that we can import modules from the test directory at all! + # Checks that we can import modules from the tests directory at all! self.make_file("mycode.py", """\ import covmod1 import covmodzip1 @@ -49,7 +55,8 @@ class ProcessTest(CoverageTest): self.assert_exists(".coverage") self.assertEqual(out, 'done\n') - def test_combine_parallel_data(self): + def make_b_or_c_py(self): + """Create b_or_c.py, used in a few of these tests.""" self.make_file("b_or_c.py", """\ import sys a = 1 @@ -61,9 +68,12 @@ class ProcessTest(CoverageTest): print('done') """) + def test_combine_parallel_data(self): + self.make_b_or_c_py() out = self.run_command("coverage run -p b_or_c.py b") self.assertEqual(out, 'done\n') self.assert_doesnt_exist(".coverage") + self.assertEqual(self.number_of_data_files(), 1) out = self.run_command("coverage run -p b_or_c.py c") self.assertEqual(out, 'done\n') @@ -83,19 +93,57 @@ class ProcessTest(CoverageTest): # executed. data = coverage.CoverageData() data.read_file(".coverage") - self.assertEqual(data.summary()['b_or_c.py'], 7) + self.assertEqual(data.line_counts()['b_or_c.py'], 7) + + # Running combine again should fail, because there are no parallel data + # files to combine. + status, out = self.run_command_status("coverage combine") + self.assertEqual(status, 1) + self.assertEqual(out, "No data to combine\n") + + # And the originally combined data is still there. + data = coverage.CoverageData() + data.read_file(".coverage") + self.assertEqual(data.line_counts()['b_or_c.py'], 7) + + def test_combine_parallel_data_with_a_corrupt_file(self): + self.make_b_or_c_py() + out = self.run_command("coverage run -p b_or_c.py b") + self.assertEqual(out, 'done\n') + self.assert_doesnt_exist(".coverage") + self.assertEqual(self.number_of_data_files(), 1) + + out = self.run_command("coverage run -p b_or_c.py c") + self.assertEqual(out, 'done\n') + self.assert_doesnt_exist(".coverage") + + # After two -p runs, there should be two .coverage.machine.123 files. + self.assertEqual(self.number_of_data_files(), 2) + + # Make a bogus data file. 
+ self.make_file(".coverage.bad", "This isn't a coverage data file.") + + # Combine the parallel coverage data files into .coverage . + out = self.run_command("coverage combine") + self.assert_exists(".coverage") + self.assert_exists(".coverage.bad") + warning_regex = ( + r"Coverage.py warning: Couldn't read data from '.*\.coverage\.bad': " + r"CoverageException: Doesn't seem to be a coverage\.py data file" + ) + self.assertRegex(out, warning_regex) + + # After combining, those two should be the only data files. + self.assertEqual(self.number_of_data_files(), 2) + + # Read the coverage file and see that b_or_c.py has all 7 lines + # executed. + data = coverage.CoverageData() + data.read_file(".coverage") + self.assertEqual(data.line_counts()['b_or_c.py'], 7) def test_combine_parallel_data_in_two_steps(self): - self.make_file("b_or_c.py", """\ - import sys - a = 1 - if sys.argv[1] == 'b': - b = 1 - else: - c = 1 - d = 1 - print('done') - """) + self.make_b_or_c_py() out = self.run_command("coverage run -p b_or_c.py b") self.assertEqual(out, 'done\n') @@ -107,13 +155,13 @@ class ProcessTest(CoverageTest): self.assert_exists(".coverage") self.assertEqual(self.number_of_data_files(), 1) - out = self.run_command("coverage run --append -p b_or_c.py c") + out = self.run_command("coverage run -p b_or_c.py c") self.assertEqual(out, 'done\n') self.assert_exists(".coverage") self.assertEqual(self.number_of_data_files(), 2) # Combine the parallel coverage data files into .coverage . - self.run_command("coverage combine") + self.run_command("coverage combine --append") self.assert_exists(".coverage") # After combining, there should be only the .coverage file. @@ -123,20 +171,68 @@ class ProcessTest(CoverageTest): # executed. data = coverage.CoverageData() data.read_file(".coverage") - self.assertEqual(data.summary()['b_or_c.py'], 7) + self.assertEqual(data.line_counts()['b_or_c.py'], 7) - def test_combine_with_rc(self): - self.make_file("b_or_c.py", """\ - import sys - a = 1 - if sys.argv[1] == 'b': - b = 1 - else: - c = 1 - d = 1 - print('done') + def test_append_data(self): + self.make_b_or_c_py() + + out = self.run_command("coverage run b_or_c.py b") + self.assertEqual(out, 'done\n') + self.assert_exists(".coverage") + self.assertEqual(self.number_of_data_files(), 1) + + out = self.run_command("coverage run --append b_or_c.py c") + self.assertEqual(out, 'done\n') + self.assert_exists(".coverage") + self.assertEqual(self.number_of_data_files(), 1) + + # Read the coverage file and see that b_or_c.py has all 7 lines + # executed. + data = coverage.CoverageData() + data.read_file(".coverage") + self.assertEqual(data.line_counts()['b_or_c.py'], 7) + + def test_append_data_with_different_file(self): + self.make_b_or_c_py() + + self.make_file(".coveragerc", """\ + [run] + data_file = .mycovdata """) + out = self.run_command("coverage run b_or_c.py b") + self.assertEqual(out, 'done\n') + self.assert_doesnt_exist(".coverage") + self.assert_exists(".mycovdata") + + out = self.run_command("coverage run --append b_or_c.py c") + self.assertEqual(out, 'done\n') + self.assert_doesnt_exist(".coverage") + self.assert_exists(".mycovdata") + + # Read the coverage file and see that b_or_c.py has all 7 lines + # executed. 
+ data = coverage.CoverageData() + data.read_file(".mycovdata") + self.assertEqual(data.line_counts()['b_or_c.py'], 7) + + def test_append_can_create_a_data_file(self): + self.make_b_or_c_py() + + out = self.run_command("coverage run --append b_or_c.py b") + self.assertEqual(out, 'done\n') + self.assert_exists(".coverage") + self.assertEqual(self.number_of_data_files(), 1) + + # Read the coverage file and see that b_or_c.py has only 6 lines + # executed. + data = coverage.CoverageData() + data.read_file(".coverage") + self.assertEqual(data.line_counts()['b_or_c.py'], 6) + + def test_combine_with_rc(self): + self.make_b_or_c_py() + self.make_file(".coveragerc", """\ [run] parallel = true @@ -165,7 +261,7 @@ class ProcessTest(CoverageTest): # executed. data = coverage.CoverageData() data.read_file(".coverage") - self.assertEqual(data.summary()['b_or_c.py'], 7) + self.assertEqual(data.line_counts()['b_or_c.py'], 7) # Reporting should still work even with the .rc file out = self.run_command("coverage report") @@ -219,13 +315,30 @@ class ProcessTest(CoverageTest): # files have been combined together. data = coverage.CoverageData() data.read_file(".coverage") - summary = data.summary(fullpath=True) + summary = data.line_counts(fullpath=True) self.assertEqual(len(summary), 1) actual = os.path.normcase(os.path.abspath(list(summary.keys())[0])) expected = os.path.normcase(os.path.abspath('src/x.py')) self.assertEqual(actual, expected) self.assertEqual(list(summary.values())[0], 6) + def test_erase_parallel(self): + self.make_file(".coveragerc", """\ + [run] + data_file = data.dat + parallel = True + """) + self.make_file("data.dat") + self.make_file("data.dat.fooey") + self.make_file("data.dat.gooey") + self.make_file(".coverage") + + self.run_command("coverage erase") + self.assert_doesnt_exist("data.dat") + self.assert_doesnt_exist("data.dat.fooey") + self.assert_doesnt_exist("data.dat.gooey") + self.assert_exists(".coverage") + def test_missing_source_file(self): # Check what happens if the source is missing when reporting happens. self.make_file("fleeting.py", """\ @@ -323,19 +436,44 @@ class ProcessTest(CoverageTest): self.assertEqual(status, status2) self.assertEqual(status, 0) + def assert_execfile_output(self, out): + """Assert that the output we got is a successful run of try_execfile.py""" + self.assertIn('"DATA": "xyzzy"', out) + def test_coverage_run_is_like_python(self): - tryfile = os.path.join(HERE, "try_execfile.py") - with open(tryfile) as f: + with open(TRY_EXECFILE) as f: self.make_file("run_me.py", f.read()) out_cov = self.run_command("coverage run run_me.py") out_py = self.run_command("python run_me.py") self.assertMultiLineEqual(out_cov, out_py) + self.assert_execfile_output(out_cov) def test_coverage_run_dashm_is_like_python_dashm(self): # These -m commands assume the coverage tree is on the path. - out_cov = self.run_command("coverage run -m tests.try_execfile") - out_py = self.run_command("python -m tests.try_execfile") + out_cov = self.run_command("coverage run -m process_test.try_execfile") + out_py = self.run_command("python -m process_test.try_execfile") + self.assertMultiLineEqual(out_cov, out_py) + self.assert_execfile_output(out_cov) + + def test_coverage_run_dir_is_like_python_dir(self): + with open(TRY_EXECFILE) as f: + self.make_file("with_main/__main__.py", f.read()) + out_cov = self.run_command("coverage run with_main") + out_py = self.run_command("python with_main") + + # The coverage.py results are not identical to the Python results, and + # I don't know why. 
For now, ignore those failures. If someone finds + # a real problem with the discrepancies, we can work on it some more. + ignored = r"__file__|__loader__|__package__" + # PyPy includes the current directory in the path when running a + # directory, while CPython and coverage.py do not. Exclude that from + # the comparison also... + if env.PYPY: + ignored += "|"+re.escape(os.getcwd()) + out_cov = remove_matching_lines(out_cov, ignored) + out_py = remove_matching_lines(out_py, ignored) self.assertMultiLineEqual(out_cov, out_py) + self.assert_execfile_output(out_cov) def test_coverage_run_dashm_equal_to_doubledashsource(self): """regression test for #328 @@ -345,19 +483,21 @@ class ProcessTest(CoverageTest): """ # These -m commands assume the coverage tree is on the path. out_cov = self.run_command( - "coverage run --source tests.try_execfile -m tests.try_execfile" + "coverage run --source process_test.try_execfile -m process_test.try_execfile" ) - out_py = self.run_command("python -m tests.try_execfile") + out_py = self.run_command("python -m process_test.try_execfile") self.assertMultiLineEqual(out_cov, out_py) + self.assert_execfile_output(out_cov) def test_coverage_run_dashm_superset_of_doubledashsource(self): """Edge case: --source foo -m foo.bar""" # These -m commands assume the coverage tree is on the path. out_cov = self.run_command( - "coverage run --source tests -m tests.try_execfile" + "coverage run --source process_test -m process_test.try_execfile" ) - out_py = self.run_command("python -m tests.try_execfile") + out_py = self.run_command("python -m process_test.try_execfile") self.assertMultiLineEqual(out_cov, out_py) + self.assert_execfile_output(out_cov) st, out = self.run_command_status("coverage report") self.assertEqual(st, 0) @@ -371,15 +511,16 @@ class ProcessTest(CoverageTest): # keeps the test working. self.make_file("myscript", """\ import sys; sys.dont_write_bytecode = True - import tests.try_execfile + import process_test.try_execfile """) # These -m commands assume the coverage tree is on the path. 
out_cov = self.run_command( - "coverage run --source tests myscript" + "coverage run --source process_test myscript" ) out_py = self.run_command("python myscript") self.assertMultiLineEqual(out_cov, out_py) + self.assert_execfile_output(out_cov) st, out = self.run_command_status("coverage report") self.assertEqual(st, 0) @@ -387,19 +528,19 @@ class ProcessTest(CoverageTest): def test_coverage_run_dashm_is_like_python_dashm_off_path(self): # https://bitbucket.org/ned/coveragepy/issue/242 - tryfile = os.path.join(HERE, "try_execfile.py") self.make_file("sub/__init__.py", "") - with open(tryfile) as f: + with open(TRY_EXECFILE) as f: self.make_file("sub/run_me.py", f.read()) out_cov = self.run_command("coverage run -m sub.run_me") out_py = self.run_command("python -m sub.run_me") self.assertMultiLineEqual(out_cov, out_py) + self.assert_execfile_output(out_cov) def test_coverage_run_dashm_is_like_python_dashm_with__main__207(self): if sys.version_info < (2, 7): - # Coverage isn't bug-for-bug compatible in the behavior of -m for + # Coverage.py isn't bug-for-bug compatible in the behavior of -m for # Pythons < 2.7 - self.skip("-m doesn't work the same < Python 2.7") + self.skipTest("-m doesn't work the same < Python 2.7") # https://bitbucket.org/ned/coveragepy/issue/207 self.make_file("package/__init__.py", "print('init')") self.make_file("package/__main__.py", "print('main')") @@ -409,7 +550,7 @@ class ProcessTest(CoverageTest): def test_fork(self): if not hasattr(os, 'fork'): - self.skip("Can't test os.fork since it doesn't exist.") + self.skipTest("Can't test os.fork since it doesn't exist.") self.make_file("fork.py", """\ import os @@ -436,6 +577,11 @@ class ProcessTest(CoverageTest): # .coverage.machine.123 files. self.assertEqual(self.number_of_data_files(), 2) + # The two data files should have different random numbers at the end of + # the file name. + nums = set(name.rpartition(".")[-1] for name in self.data_files()) + self.assertEqual(len(nums), 2, "Same random: %s" % (self.data_files(),)) + # Combine the parallel coverage data files into .coverage . self.run_command("coverage combine") self.assert_exists(".coverage") @@ -447,7 +593,7 @@ class ProcessTest(CoverageTest): # executed. data = coverage.CoverageData() data.read_file(".coverage") - self.assertEqual(data.summary()['fork.py'], 9) + self.assertEqual(data.line_counts()['fork.py'], 9) def test_warnings(self): self.make_file("hello.py", """\ @@ -531,12 +677,27 @@ class ProcessTest(CoverageTest): self.assertIn("Trace function changed", out) + def test_note(self): + self.make_file(".coveragerc", """\ + [run] + data_file = mydata.dat + note = These are musical notes: ♫𝅗𝅥♩ + """) + self.make_file("simple.py", """print('hello')""") + self.run_command("coverage run simple.py") + + data = CoverageData() + data.read_file("mydata.dat") + infos = data.run_infos() + self.assertEqual(len(infos), 1) + self.assertEqual(infos[0]['note'], u"These are musical notes: ♫𝅗𝅥♩") + def test_fullcoverage(self): # pragma: not covered if env.PY2: # This doesn't work on Python 2. - self.skip("fullcoverage doesn't work on Python 2.") + self.skipTest("fullcoverage doesn't work on Python 2.") # It only works with the C tracer, and if we aren't measuring ourselves. if not env.C_TRACER or env.METACOV: - self.skip("fullcoverage only works with the C tracer.") + self.skipTest("fullcoverage only works with the C tracer.") # fullcoverage is a trick to get stdlib modules measured from # the very beginning of the process. 
Here we import os and @@ -558,7 +719,27 @@ class ProcessTest(CoverageTest): # The actual number of executed lines in os.py when it's # imported is 120 or so. Just running os.getenv executes # about 5. - self.assertGreater(data.summary()['os.py'], 50) + self.assertGreater(data.line_counts()['os.py'], 50) + + def test_lang_c(self): + if env.PY3 and sys.version_info < (3, 4): + # Python 3.3 can't compile the non-ascii characters in the file name. + self.skipTest("3.3 can't handle this test") + # LANG=C forces getfilesystemencoding on Linux to 'ascii', which causes + # failures with non-ascii file names. We don't want to make a real file + # with strange characters, though, because that gets the test runners + # tangled up. This will isolate the concerns to the coverage.py code. + # https://bitbucket.org/ned/coveragepy/issues/533/exception-on-unencodable-file-name + self.make_file("weird_file.py", r""" + globs = {} + code = "a = 1\nb = 2\n" + exec(compile(code, "wut\xe9\xea\xeb\xec\x01\x02.py", 'exec'), globs) + print(globs['a']) + print(globs['b']) + """) + self.set_environ("LANG", "C") + out = self.run_command("coverage run weird_file.py") + self.assertEqual(out, "1\n2\n") def test_deprecation_warnings(self): # Test that coverage doesn't trigger deprecation warnings. @@ -587,16 +768,23 @@ class ProcessTest(CoverageTest): inst.start() import foo inst.stop() - inst.combine() inst.save() """) out = self.run_command("python run_twice.py") self.assertEqual( out, - "Coverage.py warning: " - "Module foo was previously imported, but not measured.\n" + "Coverage.py warning: Module foo was previously imported, but not measured.\n" ) + def test_module_name(self): + if sys.version_info < (2, 7): + # Python 2.6 thinks that coverage is a package that can't be + # executed + self.skipTest("-m doesn't work the same < Python 2.7") + # https://bitbucket.org/ned/coveragepy/issues/478/help-shows-silly-program-name-when-running + out = self.run_command("python -m coverage") + self.assertIn("Use 'coverage help' for help", out) + class AliasedCommandTest(CoverageTest): """Tests of the version-specific command aliases.""" @@ -621,6 +809,17 @@ class AliasedCommandTest(CoverageTest): out = self.run_command(cmd) self.assertIn("Code coverage for Python", out) + def test_aliases_used_in_messages(self): + cmds = [ + "coverage", + "coverage%d" % sys.version_info[0], + "coverage-%d.%d" % sys.version_info[:2], + ] + for cmd in cmds: + out = self.run_command("%s foobar" % cmd) + self.assertIn("Unknown command: 'foobar'", out) + self.assertIn("Use '%s help' for help" % cmd, out) + class PydocTest(CoverageTest): """Test that pydoc can get our information.""" @@ -749,6 +948,122 @@ class FailUnderEmptyFilesTest(CoverageTest): self.assertEqual(st, 2) +class FailUnder100Test(CoverageTest): + """Tests of the --fail-under switch.""" + + def test_99_8(self): + self.make_file("ninety_nine_eight.py", + "".join("v{i} = {i}\n".format(i=i) for i in range(498)) + + "if v0 > 498:\n v499 = 499\n" + ) + st, _ = self.run_command_status("coverage run ninety_nine_eight.py") + self.assertEqual(st, 0) + st, out = self.run_command_status("coverage report") + self.assertEqual(st, 0) + self.assertEqual( + self.last_line_squeezed(out), + "ninety_nine_eight.py 500 1 99%" + ) + + st, _ = self.run_command_status("coverage report --fail-under=100") + self.assertEqual(st, 2) + + + def test_100(self): + self.make_file("one_hundred.py", + "".join("v{i} = {i}\n".format(i=i) for i in range(500)) + ) + st, _ = self.run_command_status("coverage run 
one_hundred.py") + self.assertEqual(st, 0) + st, out = self.run_command_status("coverage report") + self.assertEqual(st, 0) + self.assertEqual( + self.last_line_squeezed(out), + "one_hundred.py 500 0 100%" + ) + + st, _ = self.run_command_status("coverage report --fail-under=100") + self.assertEqual(st, 0) + + +class UnicodeFilePathsTest(CoverageTest): + """Tests of using non-ascii characters in the names of files.""" + + def test_accented_dot_py(self): + # Make a file with a non-ascii character in the filename. + self.make_file(u"h\xe2t.py", "print('accented')") + out = self.run_command(u"coverage run h\xe2t.py") + self.assertEqual(out, "accented\n") + + # The HTML report uses ascii-encoded HTML entities. + out = self.run_command("coverage html") + self.assertEqual(out, "") + self.assert_exists(u"htmlcov/h\xe2t_py.html") + with open("htmlcov/index.html") as indexf: + index = indexf.read() + self.assertIn('<a href="hât_py.html">hât.py</a>', index) + + # The XML report is always UTF8-encoded. + out = self.run_command("coverage xml") + self.assertEqual(out, "") + with open("coverage.xml", "rb") as xmlf: + xml = xmlf.read() + self.assertIn(u' filename="h\xe2t.py"'.encode('utf8'), xml) + self.assertIn(u' name="h\xe2t.py"'.encode('utf8'), xml) + + report_expected = ( + u"Name Stmts Miss Cover\n" + u"----------------------------\n" + u"h\xe2t.py 1 0 100%\n" + ) + + if env.PY2: + # pylint: disable=redefined-variable-type + report_expected = report_expected.encode(output_encoding()) + + out = self.run_command("coverage report") + self.assertEqual(out, report_expected) + + def test_accented_directory(self): + # Make a file with a non-ascii character in the directory name. + self.make_file(u"\xe2/accented.py", "print('accented')") + out = self.run_command(u"coverage run \xe2/accented.py") + self.assertEqual(out, "accented\n") + + # The HTML report uses ascii-encoded HTML entities. + out = self.run_command("coverage html") + self.assertEqual(out, "") + self.assert_exists(u"htmlcov/\xe2_accented_py.html") + with open("htmlcov/index.html") as indexf: + index = indexf.read() + self.assertIn('<a href="â_accented_py.html">â%saccented.py</a>' % os.sep, index) + + # The XML report is always UTF8-encoded. 
+ out = self.run_command("coverage xml") + self.assertEqual(out, "") + with open("coverage.xml", "rb") as xmlf: + xml = xmlf.read() + self.assertIn(u' filename="\xe2/accented.py"'.encode('utf8'), xml) + self.assertIn(u' name="accented.py"'.encode('utf8'), xml) + self.assertIn( + u'<package branch-rate="0" complexity="0" line-rate="1" name="\xe2">'.encode('utf8'), + xml + ) + + report_expected = ( + u"Name Stmts Miss Cover\n" + u"-----------------------------------\n" + u"\xe2%saccented.py 1 0 100%%\n" % os.sep + ) + + if env.PY2: + # pylint: disable=redefined-variable-type + report_expected = report_expected.encode(output_encoding()) + + out = self.run_command("coverage report") + self.assertEqual(out, report_expected) + + def possible_pth_dirs(): """Produce a sequence of directories for trying to write .pth files.""" # First look through sys.path, and we find a .pth file, then it's a good @@ -764,24 +1079,38 @@ def possible_pth_dirs(): yield distutils.sysconfig.get_python_lib() +def find_writable_pth_directory(): + """Find a place to write a .pth file.""" + for pth_dir in possible_pth_dirs(): # pragma: part covered + try_it = os.path.join(pth_dir, "touch_{0}.it".format(WORKER)) + with open(try_it, "w") as f: + try: + f.write("foo") + except (IOError, OSError): # pragma: not covered + continue + + os.remove(try_it) + return pth_dir + + return None + +WORKER = os.environ.get('PYTEST_XDIST_WORKER', '') +PTH_DIR = find_writable_pth_directory() + + class ProcessCoverageMixin(object): - """Set up a .pth file that causes all sub-processes to be coverage'd""" + """Set up a .pth file to coverage-measure all sub-processes.""" def setUp(self): super(ProcessCoverageMixin, self).setUp() - # Find a place to put a .pth file. + + # Create the .pth file. + self.assert_(PTH_DIR) pth_contents = "import coverage; coverage.process_startup()\n" - for pth_dir in possible_pth_dirs(): # pragma: part covered - pth_path = os.path.join(pth_dir, "subcover.pth") - with open(pth_path, "w") as pth: - try: - pth.write(pth_contents) - self.pth_path = pth_path - break - except (IOError, OSError): # pragma: not covered - pass - else: # pragma: not covered - raise Exception("Couldn't find a place for the .pth file") + pth_path = os.path.join(PTH_DIR, "subcover_{0}.pth".format(WORKER)) + with open(pth_path, "w") as pth: + pth.write(pth_contents) + self.pth_path = pth_path self.addCleanup(os.remove, self.pth_path) @@ -789,11 +1118,8 @@ class ProcessCoverageMixin(object): class ProcessStartupTest(ProcessCoverageMixin, CoverageTest): """Test that we can measure coverage in sub-processes.""" - def test_subprocess_with_pth_files(self): # pragma: not covered - if env.METACOV: - self.skip( - "Can't test sub-process pth file suppport during metacoverage" - ) + def setUp(self): + super(ProcessStartupTest, self).setUp() # Main will run sub.py self.make_file("main.py", """\ @@ -806,12 +1132,24 @@ class ProcessStartupTest(ProcessCoverageMixin, CoverageTest): with open("out.txt", "w") as f: f.write("Hello, world!\\n") """) + + def test_subprocess_with_pth_files(self): # pragma: not covered + if env.METACOV: + self.skipTest("Can't test sub-process pth file suppport during metacoverage") + + # An existing data file should not be read when a subprocess gets + # measured automatically. Create the data file here with bogus data in + # it. 
+ data = coverage.CoverageData() + data.add_lines({os.path.abspath('sub.py'): dict.fromkeys(range(100))}) + data.write_file(".mycovdata") + self.make_file("coverage.ini", """\ [run] data_file = .mycovdata """) self.set_environ("COVERAGE_PROCESS_START", "coverage.ini") - import main # pylint: disable=import-error,unused-variable + import main # pylint: disable=import-error, unused-variable with open("out.txt") as f: self.assertEqual(f.read(), "Hello, world!\n") @@ -820,7 +1158,37 @@ class ProcessStartupTest(ProcessCoverageMixin, CoverageTest): self.assert_exists(".mycovdata") data = coverage.CoverageData() data.read_file(".mycovdata") - self.assertEqual(data.summary()['sub.py'], 2) + self.assertEqual(data.line_counts()['sub.py'], 2) + + def test_subprocess_with_pth_files_and_parallel(self): # pragma: not covered + # https://bitbucket.org/ned/coveragepy/issues/492/subprocess-coverage-strange-detection-of + if env.METACOV: + self.skipTest("Can't test sub-process pth file suppport during metacoverage") + + self.make_file("coverage.ini", """\ + [run] + parallel = true + """) + + self.set_environ("COVERAGE_PROCESS_START", "coverage.ini") + self.run_command("coverage run main.py") + + with open("out.txt") as f: + self.assertEqual(f.read(), "Hello, world!\n") + + self.run_command("coverage combine") + + # assert that the combined .coverage data file is correct + self.assert_exists(".coverage") + data = coverage.CoverageData() + data.read_file(".coverage") + self.assertEqual(data.line_counts()['sub.py'], 2) + + # assert that there are *no* extra data files left over after a combine + data_files = glob.glob(os.getcwd() + '/.coverage*') + self.assertEqual(len(data_files), 1, + "Expected only .coverage after combine, looks like there are " + "extra data files that were not cleaned up: %r" % data_files) class ProcessStartupWithSourceTest(ProcessCoverageMixin, CoverageTest): @@ -842,21 +1210,20 @@ class ProcessStartupWithSourceTest(ProcessCoverageMixin, CoverageTest): ): # pragma: not covered """Run the test for a particular combination of factors. - Arguments: - dashm (str): Either "" (run the program as a file) or "-m" (run the - program as a module). + The arguments are all strings: - package (str): Either "" (put the source at the top level) or a - package name to use to hold the source. + * `dashm`: Either "" (run the program as a file) or "-m" (run the + program as a module). - source (str): Either "main" or "sub", which file to use as the - ``--source`` argument. + * `package`: Either "" (put the source at the top level) or a + package name to use to hold the source. + + * `source`: Either "main" or "sub", which file to use as the + ``--source`` argument. 
""" if env.METACOV: - self.skip( - "Can't test sub-process pth file suppport during metacoverage" - ) + self.skipTest("Can't test sub-process pth file suppport during metacoverage") def fullname(modname): """What is the full module name for `modname` for this test?""" @@ -902,7 +1269,7 @@ class ProcessStartupWithSourceTest(ProcessCoverageMixin, CoverageTest): self.assert_exists(".coverage") data = coverage.CoverageData() data.read_file(".coverage") - summary = data.summary() + summary = data.line_counts() print(summary) self.assertEqual(summary[source + '.py'], 2) self.assertEqual(len(summary), 1) @@ -930,3 +1297,9 @@ class ProcessStartupWithSourceTest(ProcessCoverageMixin, CoverageTest): def test_script_pkg_sub(self): self.assert_pth_and_source_work_together('', 'pkg', 'sub') + + +def remove_matching_lines(text, pat): + """Return `text` with all lines matching `pat` removed.""" + lines = [l for l in text.splitlines(True) if not re.search(pat, l)] + return "".join(lines) |