summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNed Batchelder <ned@nedbatchelder.com>2023-01-01 18:57:42 -0500
committerNed Batchelder <ned@nedbatchelder.com>2023-01-01 19:27:47 -0500
commit0c9b5e0e9da9c2cffd50db7b28142d22d0f66cee (patch)
treee407697322a76e40ccc38ac44aa4d2f2489d6dcc
parent8f4d404c8f9044ea1c3bf2479236f51d7706cb76 (diff)
downloadpython-coveragepy-git-0c9b5e0e9da9c2cffd50db7b28142d22d0f66cee.tar.gz
mypy: check collector.py and plugin_support.py
-rw-r--r--.editorconfig3
-rw-r--r--MANIFEST.in1
-rw-r--r--coverage/cmdline.py4
-rw-r--r--coverage/collector.py136
-rw-r--r--coverage/control.py12
-rw-r--r--coverage/disposition.py6
-rw-r--r--coverage/inorout.py12
-rw-r--r--coverage/plugin.py12
-rw-r--r--coverage/plugin_support.py108
-rw-r--r--coverage/pytracer.py46
-rw-r--r--coverage/sqldata.py10
-rw-r--r--coverage/tracer.pyi35
-rw-r--r--coverage/types.py99
-rw-r--r--tests/test_oddball.py2
-rw-r--r--tox.ini4
15 files changed, 337 insertions, 153 deletions
diff --git a/.editorconfig b/.editorconfig
index f560af74..ae430ffd 100644
--- a/.editorconfig
+++ b/.editorconfig
@@ -18,6 +18,9 @@ trim_trailing_whitespace = true
[*.py]
max_line_length = 100
+[*.pyi]
+max_line_length = 100
+
[*.c]
max_line_length = 100
diff --git a/MANIFEST.in b/MANIFEST.in
index 1db4d7f6..b1616dd0 100644
--- a/MANIFEST.in
+++ b/MANIFEST.in
@@ -29,6 +29,7 @@ recursive-include ci *
recursive-include lab *
recursive-include .github *
+recursive-include coverage *.pyi
recursive-include coverage/fullcoverage *.py
recursive-include coverage/ctracer *.c *.h
diff --git a/coverage/cmdline.py b/coverage/cmdline.py
index 4a00105a..b8ca2e7e 100644
--- a/coverage/cmdline.py
+++ b/coverage/cmdline.py
@@ -17,7 +17,7 @@ from typing import cast, Any, List, NoReturn, Optional, Tuple
import coverage
from coverage import Coverage
from coverage import env
-from coverage.collector import CTracer
+from coverage.collector import HAS_CTRACER
from coverage.config import CoverageConfig
from coverage.control import DEFAULT_DATAFILE
from coverage.data import combinable_files, debug_data_file
@@ -573,7 +573,7 @@ def show_help(
help_params = dict(coverage.__dict__)
help_params['program_name'] = program_name
- if CTracer is not None:
+ if HAS_CTRACER:
help_params['extension_modifier'] = 'with C extension'
else:
help_params['extension_modifier'] = 'without C extension'
diff --git a/coverage/collector.py b/coverage/collector.py
index ef1d9b41..a3c537d6 100644
--- a/coverage/collector.py
+++ b/coverage/collector.py
@@ -3,16 +3,29 @@
"""Raw data collector for coverage.py."""
+from __future__ import annotations
+
+import functools
import os
import sys
+from types import FrameType
+from typing import (
+ cast, Any, Callable, Dict, List, Mapping, Optional, Set, Tuple, Type, TypeVar,
+)
+
from coverage import env
from coverage.config import CoverageConfig
+from coverage.data import CoverageData
from coverage.debug import short_stack
from coverage.disposition import FileDisposition
from coverage.exceptions import ConfigError
from coverage.misc import human_sorted_items, isolate_module
+from coverage.plugin import CoveragePlugin
from coverage.pytracer import PyTracer
+from coverage.types import (
+ TArc, TFileDisposition, TLineNo, TTraceData, TTraceFn, TTracer, TWarnFn,
+)
os = isolate_module(os)
@@ -20,6 +33,7 @@ os = isolate_module(os)
try:
# Use the C extension code when we can, for speed.
from coverage.tracer import CTracer, CFileDisposition
+ HAS_CTRACER = True
except ImportError:
# Couldn't import the C extension, maybe it isn't built.
if os.getenv('COVERAGE_TEST_TRACER') == 'c': # pragma: part covered
@@ -31,8 +45,9 @@ except ImportError:
# exception here causes all sorts of other noise in unittest.
sys.stderr.write("*** COVERAGE_TEST_TRACER is 'c' but can't import CTracer!\n")
sys.exit(1)
- CTracer = None
+ HAS_CTRACER = False
+T = TypeVar("T")
class Collector:
"""Collects trace data.
@@ -53,15 +68,22 @@ class Collector:
# The stack of active Collectors. Collectors are added here when started,
# and popped when stopped. Collectors on the stack are paused when not
# the top, and resumed when they become the top again.
- _collectors = []
+ _collectors: List[Collector] = []
# The concurrency settings we support here.
LIGHT_THREADS = {"greenlet", "eventlet", "gevent"}
def __init__(
- self, should_trace, check_include, should_start_context, file_mapper,
- timid, branch, warn, concurrency,
- ):
+ self,
+ should_trace: Callable[[str, FrameType], TFileDisposition],
+ check_include: Callable[[str, FrameType], bool],
+ should_start_context: Optional[Callable[[FrameType], Optional[str]]],
+ file_mapper: Callable[[str], str],
+ timid: bool,
+ branch: bool,
+ warn: TWarnFn,
+ concurrency: List[str],
+ ) -> None:
"""Create a collector.
`should_trace` is a function, taking a file name and a frame, and
@@ -107,28 +129,29 @@ class Collector:
self.concurrency = concurrency
assert isinstance(self.concurrency, list), f"Expected a list: {self.concurrency!r}"
+ self.covdata: CoverageData
self.threading = None
- self.covdata = None
- self.static_context = None
+ self.static_context: Optional[str] = None
self.origin = short_stack()
self.concur_id_func = None
- self.mapped_file_cache = {}
- if timid:
- # Being timid: use the simple Python trace function.
- self._trace_class = PyTracer
- else:
- # Being fast: use the C Tracer if it is available, else the Python
- # trace function.
- self._trace_class = CTracer or PyTracer
+ self._trace_class: Type[TTracer]
+ self.file_disposition_class: Type[TFileDisposition]
+
+ use_ctracer = False
+ if HAS_CTRACER and not timid:
+ use_ctracer = True
- if self._trace_class is CTracer:
+ #if HAS_CTRACER and self._trace_class is CTracer:
+ if use_ctracer:
+ self._trace_class = CTracer
self.file_disposition_class = CFileDisposition
self.supports_plugins = True
self.packed_arcs = True
else:
+ self._trace_class = PyTracer
self.file_disposition_class = FileDisposition
self.supports_plugins = False
self.packed_arcs = False
@@ -182,22 +205,22 @@ class Collector:
self.reset()
- def __repr__(self):
+ def __repr__(self) -> str:
return f"<Collector at 0x{id(self):x}: {self.tracer_name()}>"
- def use_data(self, covdata, context):
+ def use_data(self, covdata: CoverageData, context: Optional[str]) -> None:
"""Use `covdata` for recording data."""
self.covdata = covdata
self.static_context = context
self.covdata.set_context(self.static_context)
- def tracer_name(self):
+ def tracer_name(self) -> str:
"""Return the class name of the tracer we're using."""
return self._trace_class.__name__
- def _clear_data(self):
+ def _clear_data(self) -> None:
"""Clear out existing data, but stay ready for more collection."""
- # We used to used self.data.clear(), but that would remove filename
+ # We used to use self.data.clear(), but that would remove filename
# keys and data values that were still in use higher up the stack
# when we are called as part of switch_context.
for d in self.data.values():
@@ -206,18 +229,16 @@ class Collector:
for tracer in self.tracers:
tracer.reset_activity()
- def reset(self):
+ def reset(self) -> None:
"""Clear collected data, and prepare to collect more."""
- # A dictionary mapping file names to dicts with line number keys (if not
- # branch coverage), or mapping file names to dicts with line number
- # pairs as keys (if branch coverage).
- self.data = {}
+ # The trace data we are collecting.
+ self.data: TTraceData = {} # type: ignore[assignment]
# A dictionary mapping file names to file tracer plugin names that will
# handle them.
- self.file_tracers = {}
+ self.file_tracers: Dict[str, str] = {}
- self.disabled_plugins = set()
+ self.disabled_plugins: Set[str] = set()
# The .should_trace_cache attribute is a cache from file names to
# coverage.FileDisposition objects, or None. When a file is first
@@ -248,11 +269,11 @@ class Collector:
self.should_trace_cache = {}
# Our active Tracers.
- self.tracers = []
+ self.tracers: List[TTracer] = []
self._clear_data()
- def _start_tracer(self):
+ def _start_tracer(self) -> TTraceFn:
"""Start a new Tracer object, and store it in self.tracers."""
tracer = self._trace_class()
tracer.data = self.data
@@ -271,6 +292,7 @@ class Collector:
tracer.check_include = self.check_include
if hasattr(tracer, 'should_start_context'):
tracer.should_start_context = self.should_start_context
+ if hasattr(tracer, 'switch_context'):
tracer.switch_context = self.switch_context
if hasattr(tracer, 'disable_plugin'):
tracer.disable_plugin = self.disable_plugin
@@ -288,7 +310,7 @@ class Collector:
#
# New in 3.12: threading.settrace_all_threads: https://github.com/python/cpython/pull/96681
- def _installation_trace(self, frame, event, arg):
+ def _installation_trace(self, frame: FrameType, event: str, arg: Any) -> TTraceFn:
"""Called on new threads, installs the real tracer."""
# Remove ourselves as the trace function.
sys.settrace(None)
@@ -301,7 +323,7 @@ class Collector:
# Return the new trace function to continue tracing in this scope.
return fn
- def start(self):
+ def start(self) -> None:
"""Start collecting trace information."""
if self._collectors:
self._collectors[-1].pause()
@@ -310,7 +332,7 @@ class Collector:
# Check to see whether we had a fullcoverage tracer installed. If so,
# get the stack frames it stashed away for us.
- traces0 = []
+ traces0: List[Tuple[Tuple[FrameType, str, Any], TLineNo]] = []
fn0 = sys.gettrace()
if fn0:
tracer0 = getattr(fn0, '__self__', None)
@@ -341,7 +363,7 @@ class Collector:
if self.threading:
self.threading.settrace(self._installation_trace)
- def stop(self):
+ def stop(self) -> None:
"""Stop collecting trace information."""
assert self._collectors
if self._collectors[-1] is not self:
@@ -360,7 +382,7 @@ class Collector:
if self._collectors:
self._collectors[-1].resume()
- def pause(self):
+ def pause(self) -> None:
"""Pause tracing, but be prepared to `resume`."""
for tracer in self.tracers:
tracer.stop()
@@ -372,7 +394,7 @@ class Collector:
if self.threading:
self.threading.settrace(None)
- def resume(self):
+ def resume(self) -> None:
"""Resume tracing after a `pause`."""
for tracer in self.tracers:
tracer.start()
@@ -381,7 +403,7 @@ class Collector:
else:
self._start_tracer()
- def _activity(self):
+ def _activity(self) -> bool:
"""Has any activity been traced?
Returns a boolean, True if any trace function was invoked.
@@ -389,8 +411,9 @@ class Collector:
"""
return any(tracer.activity() for tracer in self.tracers)
- def switch_context(self, new_context):
+ def switch_context(self, new_context: Optional[str]) -> None:
"""Switch to a new dynamic context."""
+ context: Optional[str]
self.flush_data()
if self.static_context:
context = self.static_context
@@ -400,24 +423,22 @@ class Collector:
context = new_context
self.covdata.set_context(context)
- def disable_plugin(self, disposition):
+ def disable_plugin(self, disposition: TFileDisposition) -> None:
"""Disable the plugin mentioned in `disposition`."""
file_tracer = disposition.file_tracer
+ assert file_tracer is not None
plugin = file_tracer._coverage_plugin
plugin_name = plugin._coverage_plugin_name
self.warn(f"Disabling plug-in {plugin_name!r} due to previous exception")
plugin._coverage_enabled = False
disposition.trace = False
- def cached_mapped_file(self, filename):
+ @functools.lru_cache(maxsize=0)
+ def cached_mapped_file(self, filename: str) -> str:
"""A locally cached version of file names mapped through file_mapper."""
- key = (type(filename), filename)
- try:
- return self.mapped_file_cache[key]
- except KeyError:
- return self.mapped_file_cache.setdefault(key, self.file_mapper(filename))
+ return self.file_mapper(filename)
- def mapped_file_dict(self, d):
+ def mapped_file_dict(self, d: Mapping[str, T]) -> Dict[str, T]:
"""Return a dict like d, but with keys modified by file_mapper."""
# The call to list(items()) ensures that the GIL protects the dictionary
# iterator against concurrent modifications by tracers running
@@ -431,16 +452,17 @@ class Collector:
runtime_err = ex
else:
break
- else:
- raise runtime_err # pragma: cant happen
+ else: # pragma: cant happen
+ assert isinstance(runtime_err, Exception)
+ raise runtime_err
return {self.cached_mapped_file(k): v for k, v in items}
- def plugin_was_disabled(self, plugin):
+ def plugin_was_disabled(self, plugin: CoveragePlugin) -> None:
"""Record that `plugin` was disabled during the run."""
self.disabled_plugins.add(plugin._coverage_plugin_name)
- def flush_data(self):
+ def flush_data(self) -> bool:
"""Save the collected data to our associated `CoverageData`.
Data may have also been saved along the way. This forces the
@@ -456,8 +478,9 @@ class Collector:
# Unpack the line number pairs packed into integers. See
# tracer.c:CTracer_record_pair for the C code that creates
# these packed ints.
- data = {}
- for fname, packeds in self.data.items():
+ arc_data: Dict[str, List[TArc]] = {}
+ packed_data = cast(Dict[str, Set[int]], self.data)
+ for fname, packeds in packed_data.items():
tuples = []
for packed in packeds:
l1 = packed & 0xFFFFF
@@ -467,12 +490,13 @@ class Collector:
if packed & (1 << 41):
l2 *= -1
tuples.append((l1, l2))
- data[fname] = tuples
+ arc_data[fname] = tuples
else:
- data = self.data
- self.covdata.add_arcs(self.mapped_file_dict(data))
+ arc_data = cast(Dict[str, List[TArc]], self.data)
+ self.covdata.add_arcs(self.mapped_file_dict(arc_data))
else:
- self.covdata.add_lines(self.mapped_file_dict(self.data))
+ line_data = cast(Dict[str, Set[int]], self.data)
+ self.covdata.add_lines(self.mapped_file_dict(line_data))
file_tracers = {
k: v for k, v in self.file_tracers.items()
diff --git a/coverage/control.py b/coverage/control.py
index 4306fea7..e5cabd5b 100644
--- a/coverage/control.py
+++ b/coverage/control.py
@@ -25,12 +25,12 @@ from typing import (
from coverage import env
from coverage.annotate import AnnotateReporter
-from coverage.collector import Collector, CTracer
+from coverage.collector import Collector, HAS_CTRACER
from coverage.config import read_coverage_config
from coverage.context import should_start_context_test_function, combine_context_switchers
from coverage.data import CoverageData, combine_parallel_data
from coverage.debug import DebugControl, short_stack, write_formatted_info
-from coverage.disposition import FileDisposition, disposition_debug_msg
+from coverage.disposition import disposition_debug_msg
from coverage.exceptions import ConfigError, CoverageException, CoverageWarning, PluginError
from coverage.files import PathAliases, abs_file, relative_filename, set_relative_directory
from coverage.html import HtmlReporter
@@ -46,7 +46,9 @@ from coverage.python import PythonFileReporter
from coverage.report import render_report
from coverage.results import Analysis
from coverage.summary import SummaryReporter
-from coverage.types import TConfigurable, TConfigSection, TConfigValue, TLineNo, TMorf
+from coverage.types import (
+ TConfigurable, TConfigSection, TConfigValue, TFileDisposition, TLineNo, TMorf,
+)
from coverage.xmlreport import XmlReporter
@@ -362,7 +364,7 @@ class Coverage(TConfigurable):
if wrote_any:
write_formatted_info(self._debug.write, "end", ())
- def _should_trace(self, filename: str, frame: FrameType) -> FileDisposition:
+ def _should_trace(self, filename: str, frame: FrameType) -> TFileDisposition:
"""Decide whether to trace execution in `filename`.
Calls `_should_trace_internal`, and returns the FileDisposition.
@@ -1253,7 +1255,7 @@ class Coverage(TConfigurable):
('coverage_version', covmod.__version__),
('coverage_module', covmod.__file__),
('tracer', self._collector.tracer_name() if hasattr(self, "_collector") else "-none-"),
- ('CTracer', 'available' if CTracer else "unavailable"),
+ ('CTracer', 'available' if HAS_CTRACER else "unavailable"),
('plugins.file_tracers', plugin_info(self._plugins.file_tracers)),
('plugins.configurers', plugin_info(self._plugins.configurers)),
('plugins.context_switchers', plugin_info(self._plugins.context_switchers)),
diff --git a/coverage/disposition.py b/coverage/disposition.py
index 5237c364..3cc6c8d6 100644
--- a/coverage/disposition.py
+++ b/coverage/disposition.py
@@ -7,6 +7,8 @@ from __future__ import annotations
from typing import Optional, Type, TYPE_CHECKING
+from coverage.types import TFileDisposition
+
if TYPE_CHECKING:
from coverage.plugin import FileTracer
@@ -30,7 +32,7 @@ class FileDisposition:
# be implemented in either C or Python. Acting on them is done with these
# functions.
-def disposition_init(cls: Type[FileDisposition], original_filename: str) -> FileDisposition:
+def disposition_init(cls: Type[TFileDisposition], original_filename: str) -> TFileDisposition:
"""Construct and initialize a new FileDisposition object."""
disp = cls()
disp.original_filename = original_filename
@@ -43,7 +45,7 @@ def disposition_init(cls: Type[FileDisposition], original_filename: str) -> File
return disp
-def disposition_debug_msg(disp: FileDisposition) -> str:
+def disposition_debug_msg(disp: TFileDisposition) -> str:
"""Make a nice debug message of what the FileDisposition is doing."""
if disp.trace:
msg = f"Tracing {disp.original_filename!r}"
diff --git a/coverage/inorout.py b/coverage/inorout.py
index 252796f4..860ed354 100644
--- a/coverage/inorout.py
+++ b/coverage/inorout.py
@@ -16,7 +16,9 @@ import sysconfig
import traceback
from types import FrameType, ModuleType
-from typing import cast, Any, Iterable, List, Optional, Set, Tuple, TYPE_CHECKING
+from typing import (
+ cast, Any, Iterable, List, Optional, Set, Tuple, Type, TYPE_CHECKING,
+)
from coverage import env
from coverage.disposition import FileDisposition, disposition_init
@@ -25,7 +27,7 @@ from coverage.files import TreeMatcher, GlobMatcher, ModuleMatcher
from coverage.files import prep_patterns, find_python_files, canonical_filename
from coverage.misc import sys_modules_saved
from coverage.python import source_for_file, source_for_morf
-from coverage.types import TMorf, TWarnFn, TDebugCtl
+from coverage.types import TFileDisposition, TMorf, TWarnFn, TDebugCtl
if TYPE_CHECKING:
from coverage.config import CoverageConfig
@@ -290,9 +292,9 @@ class InOrOut:
self.source_in_third = True
self.plugins: Plugins
- self.disp_class = FileDisposition
+ self.disp_class: Type[TFileDisposition] = FileDisposition
- def should_trace(self, filename: str, frame: Optional[FrameType]=None) -> FileDisposition:
+ def should_trace(self, filename: str, frame: Optional[FrameType]=None) -> TFileDisposition:
"""Decide whether to trace execution in `filename`, with a reason.
This function is called from the trace function. As each new file name
@@ -304,7 +306,7 @@ class InOrOut:
original_filename = filename
disp = disposition_init(self.disp_class, filename)
- def nope(disp: FileDisposition, reason: str) -> FileDisposition:
+ def nope(disp: TFileDisposition, reason: str) -> TFileDisposition:
"""Simple helper to make it easy to return NO."""
disp.trace = False
disp.reason = reason
diff --git a/coverage/plugin.py b/coverage/plugin.py
index 4a7fc235..ccc33337 100644
--- a/coverage/plugin.py
+++ b/coverage/plugin.py
@@ -127,6 +127,9 @@ from coverage.types import TArc, TConfigurable, TLineNo, TSourceTokenLines
class CoveragePlugin:
"""Base class for coverage.py plug-ins."""
+ _coverage_plugin_name: str
+ _coverage_enabled: bool
+
def file_tracer(self, filename: str) -> Optional[FileTracer]: # pylint: disable=unused-argument
"""Get a :class:`FileTracer` object for a file.
@@ -249,7 +252,12 @@ class CoveragePlugin:
return []
-class FileTracer:
+class CoveragePluginBase:
+ """Plugins produce specialized objects, which point back to the original plugin."""
+ _coverage_plugin: CoveragePlugin
+
+
+class FileTracer(CoveragePluginBase):
"""Support needed for files during the execution phase.
File tracer plug-ins implement subclasses of FileTracer to return from
@@ -337,7 +345,7 @@ class FileTracer:
@functools.total_ordering
-class FileReporter:
+class FileReporter(CoveragePluginBase):
"""Support needed for files during the analysis and reporting phases.
File tracer plug-ins implement a subclass of `FileReporter`, and return
diff --git a/coverage/plugin_support.py b/coverage/plugin_support.py
index 0b892391..8ac42491 100644
--- a/coverage/plugin_support.py
+++ b/coverage/plugin_support.py
@@ -3,13 +3,20 @@
"""Support for plugins."""
+from __future__ import annotations
+
import os
import os.path
import sys
+from types import FrameType
+from typing import Any, Dict, Iterable, Iterator, List, Optional, Set, Tuple, Union
+
+from coverage.config import CoverageConfig
from coverage.exceptions import PluginError
from coverage.misc import isolate_module
from coverage.plugin import CoveragePlugin, FileTracer, FileReporter
+from coverage.types import TArc, TConfigurable, TDebugCtl, TLineNo, TSourceTokenLines
os = isolate_module(os)
@@ -17,18 +24,23 @@ os = isolate_module(os)
class Plugins:
"""The currently loaded collection of coverage.py plugins."""
- def __init__(self):
- self.order = []
- self.names = {}
- self.file_tracers = []
- self.configurers = []
- self.context_switchers = []
+ def __init__(self) -> None:
+ self.order: List[CoveragePlugin] = []
+ self.names: Dict[str, CoveragePlugin] = {}
+ self.file_tracers: List[CoveragePlugin] = []
+ self.configurers: List[CoveragePlugin] = []
+ self.context_switchers: List[CoveragePlugin] = []
- self.current_module = None
- self.debug = None
+ self.current_module: Optional[str] = None
+ self.debug: Optional[TDebugCtl]
@classmethod
- def load_plugins(cls, modules, config, debug=None):
+ def load_plugins(
+ cls,
+ modules: Iterable[str],
+ config: CoverageConfig,
+ debug: Optional[TDebugCtl]=None,
+ ) -> Plugins:
"""Load plugins from `modules`.
Returns a Plugins object with the loaded and configured plugins.
@@ -54,7 +66,7 @@ class Plugins:
plugins.current_module = None
return plugins
- def add_file_tracer(self, plugin):
+ def add_file_tracer(self, plugin: CoveragePlugin) -> None:
"""Add a file tracer plugin.
`plugin` is an instance of a third-party plugin class. It must
@@ -63,7 +75,7 @@ class Plugins:
"""
self._add_plugin(plugin, self.file_tracers)
- def add_configurer(self, plugin):
+ def add_configurer(self, plugin: CoveragePlugin) -> None:
"""Add a configuring plugin.
`plugin` is an instance of a third-party plugin class. It must
@@ -72,7 +84,7 @@ class Plugins:
"""
self._add_plugin(plugin, self.configurers)
- def add_dynamic_context(self, plugin):
+ def add_dynamic_context(self, plugin: CoveragePlugin) -> None:
"""Add a dynamic context plugin.
`plugin` is an instance of a third-party plugin class. It must
@@ -81,7 +93,7 @@ class Plugins:
"""
self._add_plugin(plugin, self.context_switchers)
- def add_noop(self, plugin):
+ def add_noop(self, plugin: CoveragePlugin) -> None:
"""Add a plugin that does nothing.
This is only useful for testing the plugin support.
@@ -89,7 +101,11 @@ class Plugins:
"""
self._add_plugin(plugin, None)
- def _add_plugin(self, plugin, specialized):
+ def _add_plugin(
+ self,
+ plugin: CoveragePlugin,
+ specialized: Optional[List[CoveragePlugin]],
+ ) -> None:
"""Add a plugin object.
`plugin` is a :class:`CoveragePlugin` instance to add. `specialized`
@@ -102,7 +118,6 @@ class Plugins:
labelled = LabelledDebug(f"plugin {self.current_module!r}", self.debug)
plugin = DebugPluginWrapper(plugin, labelled)
- # pylint: disable=attribute-defined-outside-init
plugin._coverage_plugin_name = plugin_name
plugin._coverage_enabled = True
self.order.append(plugin)
@@ -110,13 +125,13 @@ class Plugins:
if specialized is not None:
specialized.append(plugin)
- def __bool__(self):
+ def __bool__(self) -> bool:
return bool(self.order)
- def __iter__(self):
+ def __iter__(self) -> Iterator[CoveragePlugin]:
return iter(self.order)
- def get(self, plugin_name):
+ def get(self, plugin_name: str) -> CoveragePlugin:
"""Return a plugin by name."""
return self.names[plugin_name]
@@ -124,20 +139,20 @@ class Plugins:
class LabelledDebug:
"""A Debug writer, but with labels for prepending to the messages."""
- def __init__(self, label, debug, prev_labels=()):
+ def __init__(self, label: str, debug: TDebugCtl, prev_labels: Iterable[str]=()):
self.labels = list(prev_labels) + [label]
self.debug = debug
- def add_label(self, label):
+ def add_label(self, label: str) -> LabelledDebug:
"""Add a label to the writer, and return a new `LabelledDebug`."""
return LabelledDebug(label, self.debug, self.labels)
- def message_prefix(self):
+ def message_prefix(self) -> str:
"""The prefix to use on messages, combining the labels."""
prefixes = self.labels + ['']
return ":\n".join(" "*i+label for i, label in enumerate(prefixes))
- def write(self, message):
+ def write(self, message: str) -> None:
"""Write `message`, but with the labels prepended."""
self.debug.write(f"{self.message_prefix()}{message}")
@@ -145,12 +160,12 @@ class LabelledDebug:
class DebugPluginWrapper(CoveragePlugin):
"""Wrap a plugin, and use debug to report on what it's doing."""
- def __init__(self, plugin, debug):
+ def __init__(self, plugin: CoveragePlugin, debug: LabelledDebug) -> None:
super().__init__()
self.plugin = plugin
self.debug = debug
- def file_tracer(self, filename):
+ def file_tracer(self, filename: str) -> Optional[FileTracer]:
tracer = self.plugin.file_tracer(filename)
self.debug.write(f"file_tracer({filename!r}) --> {tracer!r}")
if tracer:
@@ -158,64 +173,65 @@ class DebugPluginWrapper(CoveragePlugin):
tracer = DebugFileTracerWrapper(tracer, debug)
return tracer
- def file_reporter(self, filename):
+ def file_reporter(self, filename: str) -> Union[FileReporter, str]:
reporter = self.plugin.file_reporter(filename)
+ assert isinstance(reporter, FileReporter)
self.debug.write(f"file_reporter({filename!r}) --> {reporter!r}")
if reporter:
debug = self.debug.add_label(f"file {filename!r}")
reporter = DebugFileReporterWrapper(filename, reporter, debug)
return reporter
- def dynamic_context(self, frame):
+ def dynamic_context(self, frame: FrameType) -> Optional[str]:
context = self.plugin.dynamic_context(frame)
self.debug.write(f"dynamic_context({frame!r}) --> {context!r}")
return context
- def find_executable_files(self, src_dir):
+ def find_executable_files(self, src_dir: str) -> Iterable[str]:
executable_files = self.plugin.find_executable_files(src_dir)
self.debug.write(f"find_executable_files({src_dir!r}) --> {executable_files!r}")
return executable_files
- def configure(self, config):
+ def configure(self, config: TConfigurable) -> None:
self.debug.write(f"configure({config!r})")
self.plugin.configure(config)
- def sys_info(self):
+ def sys_info(self) -> Iterable[Tuple[str, Any]]:
return self.plugin.sys_info()
class DebugFileTracerWrapper(FileTracer):
"""A debugging `FileTracer`."""
- def __init__(self, tracer, debug):
+ def __init__(self, tracer: FileTracer, debug: LabelledDebug) -> None:
self.tracer = tracer
self.debug = debug
- def _show_frame(self, frame):
+ def _show_frame(self, frame: FrameType) -> str:
"""A short string identifying a frame, for debug messages."""
return "%s@%d" % (
os.path.basename(frame.f_code.co_filename),
frame.f_lineno,
)
- def source_filename(self):
+ def source_filename(self) -> str:
sfilename = self.tracer.source_filename()
self.debug.write(f"source_filename() --> {sfilename!r}")
return sfilename
- def has_dynamic_source_filename(self):
+ def has_dynamic_source_filename(self) -> bool:
has = self.tracer.has_dynamic_source_filename()
self.debug.write(f"has_dynamic_source_filename() --> {has!r}")
return has
- def dynamic_source_filename(self, filename, frame):
+ def dynamic_source_filename(self, filename: str, frame: FrameType) -> Optional[str]:
dyn = self.tracer.dynamic_source_filename(filename, frame)
self.debug.write("dynamic_source_filename({!r}, {}) --> {!r}".format(
filename, self._show_frame(frame), dyn,
))
return dyn
- def line_number_range(self, frame):
+ def line_number_range(self, frame: FrameType) -> Tuple[TLineNo, TLineNo]:
pair = self.tracer.line_number_range(frame)
self.debug.write(f"line_number_range({self._show_frame(frame)}) --> {pair!r}")
return pair
@@ -224,57 +240,57 @@ class DebugFileTracerWrapper(FileTracer):
class DebugFileReporterWrapper(FileReporter):
"""A debugging `FileReporter`."""
- def __init__(self, filename, reporter, debug):
+ def __init__(self, filename: str, reporter: FileReporter, debug: LabelledDebug) -> None:
super().__init__(filename)
self.reporter = reporter
self.debug = debug
- def relative_filename(self):
+ def relative_filename(self) -> str:
ret = self.reporter.relative_filename()
self.debug.write(f"relative_filename() --> {ret!r}")
return ret
- def lines(self):
+ def lines(self) -> Set[TLineNo]:
ret = self.reporter.lines()
self.debug.write(f"lines() --> {ret!r}")
return ret
- def excluded_lines(self):
+ def excluded_lines(self) -> Set[TLineNo]:
ret = self.reporter.excluded_lines()
self.debug.write(f"excluded_lines() --> {ret!r}")
return ret
- def translate_lines(self, lines):
+ def translate_lines(self, lines: Iterable[TLineNo]) -> Set[TLineNo]:
ret = self.reporter.translate_lines(lines)
self.debug.write(f"translate_lines({lines!r}) --> {ret!r}")
return ret
- def translate_arcs(self, arcs):
+ def translate_arcs(self, arcs: Iterable[TArc]) -> Set[TArc]:
ret = self.reporter.translate_arcs(arcs)
self.debug.write(f"translate_arcs({arcs!r}) --> {ret!r}")
return ret
- def no_branch_lines(self):
+ def no_branch_lines(self) -> Set[TLineNo]:
ret = self.reporter.no_branch_lines()
self.debug.write(f"no_branch_lines() --> {ret!r}")
return ret
- def exit_counts(self):
+ def exit_counts(self) -> Dict[TLineNo, int]:
ret = self.reporter.exit_counts()
self.debug.write(f"exit_counts() --> {ret!r}")
return ret
- def arcs(self):
+ def arcs(self) -> Set[TArc]:
ret = self.reporter.arcs()
self.debug.write(f"arcs() --> {ret!r}")
return ret
- def source(self):
+ def source(self) -> str:
ret = self.reporter.source()
self.debug.write("source() --> %d chars" % (len(ret),))
return ret
- def source_token_lines(self):
+ def source_token_lines(self) -> TSourceTokenLines:
ret = list(self.reporter.source_token_lines())
self.debug.write("source_token_lines() --> %d tokens" % (len(ret),))
return ret
diff --git a/coverage/pytracer.py b/coverage/pytracer.py
index 4f138074..c50c9c19 100644
--- a/coverage/pytracer.py
+++ b/coverage/pytracer.py
@@ -7,7 +7,11 @@ import atexit
import dis
import sys
+from types import FrameType
+from typing import Any, Callable, Dict, Mapping, Optional
+
from coverage import env
+from coverage.types import TFileDisposition, TTraceData, TTraceFn, TTracer, TWarnFn
# We need the YIELD_VALUE opcode below, in a comparison-friendly form.
RESUME = dis.opmap.get('RESUME')
@@ -22,7 +26,7 @@ if RESUME is None:
THIS_FILE = __file__.rstrip("co")
-class PyTracer:
+class PyTracer(TTracer):
"""Python implementation of the raw data tracer."""
# Because of poor implementations of trace-function-manipulating tools,
@@ -41,14 +45,17 @@ class PyTracer:
# PyTracer to get accurate results. The command-line --timid argument is
# used to force the use of this tracer.
- def __init__(self):
+ def __init__(self) -> None:
+ # pylint: disable=super-init-not-called
# Attributes set from the collector:
- self.data = None
+ self.data: TTraceData
self.trace_arcs = False
- self.should_trace = None
- self.should_trace_cache = None
- self.should_start_context = None
- self.warn = None
+ self.should_trace: Callable[[str, FrameType], TFileDisposition]
+ self.should_trace_cache: Mapping[str, Optional[TFileDisposition]]
+ self.should_start_context: Optional[Callable[[FrameType], Optional[str]]] = None
+ self.switch_context: Optional[Callable[[Optional[str]], None]] = None
+ self.warn: TWarnFn
+
# The threading module to use, if any.
self.threading = None
@@ -71,14 +78,13 @@ class PyTracer:
# re-create a bound method object all the time.
self._cached_bound_method_trace = self._trace
- def __repr__(self):
- return "<PyTracer at 0x{:x}: {} lines in {} files>".format(
- id(self),
- sum(len(v) for v in self.data.values()),
- len(self.data),
- )
+ def __repr__(self) -> str:
+ me = id(self)
+ points = sum(len(v) for v in self.data.values())
+ files = len(self.data)
+ return f"<PyTracer at 0x{me:x}: {points} data points in {files} files>"
- def log(self, marker, *args):
+ def log(self, marker, *args) -> None:
"""For hard-core logging of what this tracer is doing."""
with open("/tmp/debug_trace.txt", "a") as f:
f.write("{} {}[{}]".format(
@@ -101,7 +107,7 @@ class PyTracer:
f.write(stack)
f.write("\n")
- def _trace(self, frame, event, arg_unused):
+ def _trace(self, frame: FrameType, event: str, arg_unused: Any) -> TTraceFn:
"""The trace function passed to sys.settrace."""
if THIS_FILE in frame.f_code.co_filename:
@@ -242,7 +248,7 @@ class PyTracer:
self.switch_context(None)
return self._cached_bound_method_trace
- def start(self):
+ def start(self) -> TTraceFn:
"""Start this Tracer.
Return a Python function suitable for use with sys.settrace().
@@ -263,7 +269,7 @@ class PyTracer:
sys.settrace(self._cached_bound_method_trace)
return self._cached_bound_method_trace
- def stop(self):
+ def stop(self) -> None:
"""Stop this Tracer."""
# Get the active tracer callback before setting the stop flag to be
# able to detect if the tracer was changed prior to stopping it.
@@ -293,14 +299,14 @@ class PyTracer:
slug="trace-changed",
)
- def activity(self):
+ def activity(self) -> bool:
"""Has there been any activity?"""
return self._activity
- def reset_activity(self):
+ def reset_activity(self) -> None:
"""Reset the activity() flag."""
self._activity = False
- def get_stats(self):
+ def get_stats(self) -> Optional[Dict[str, int]]:
"""Return a dictionary of statistics, or None."""
return None
diff --git a/coverage/sqldata.py b/coverage/sqldata.py
index e7e941a6..eced1616 100644
--- a/coverage/sqldata.py
+++ b/coverage/sqldata.py
@@ -22,7 +22,7 @@ import threading
import zlib
from typing import (
- cast, Any, Callable, Dict, Generator, Iterable, List, Optional,
+ cast, Any, Callable, Collection, Dict, Generator, Iterable, List, Mapping, Optional,
Sequence, Set, Tuple, TypeVar, Union,
)
@@ -430,7 +430,7 @@ class CoverageData(AutoReprMixin):
return None
@_locked
- def set_context(self, context: str) -> None:
+ def set_context(self, context: Optional[str]) -> None:
"""Set the current context for future :meth:`add_lines` etc.
`context` is a str, the name of the context to use for the next data
@@ -474,7 +474,7 @@ class CoverageData(AutoReprMixin):
return self._filename
@_locked
- def add_lines(self, line_data: Dict[str, Sequence[TLineNo]]) -> None:
+ def add_lines(self, line_data: Mapping[str, Collection[TLineNo]]) -> None:
"""Add measured line data.
`line_data` is a dictionary mapping file names to iterables of ints::
@@ -508,7 +508,7 @@ class CoverageData(AutoReprMixin):
)
@_locked
- def add_arcs(self, arc_data: Dict[str, Set[TArc]]) -> None:
+ def add_arcs(self, arc_data: Mapping[str, Collection[TArc]]) -> None:
"""Add measured arc data.
`arc_data` is a dictionary mapping file names to iterables of pairs of
@@ -558,7 +558,7 @@ class CoverageData(AutoReprMixin):
)
@_locked
- def add_file_tracers(self, file_tracers: Dict[str, str]) -> None:
+ def add_file_tracers(self, file_tracers: Mapping[str, str]) -> None:
"""Add per-file plugin information.
`file_tracers` is { filename: plugin_name, ... }
diff --git a/coverage/tracer.pyi b/coverage/tracer.pyi
new file mode 100644
index 00000000..d1281767
--- /dev/null
+++ b/coverage/tracer.pyi
@@ -0,0 +1,35 @@
+# Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0
+# For details: https://github.com/nedbat/coveragepy/blob/master/NOTICE.txt
+
+from typing import Any, Dict
+
+from coverage.types import TFileDisposition, TTraceData, TTraceFn, TTracer
+
+class CFileDisposition(TFileDisposition):
+ canonical_filename: Any
+ file_tracer: Any
+ has_dynamic_filename: Any
+ original_filename: Any
+ reason: Any
+ source_filename: Any
+ trace: Any
+ def __init__(self) -> None: ...
+
+class CTracer(TTracer):
+ check_include: Any
+ concur_id_func: Any
+ data: TTraceData
+ disable_plugin: Any
+ file_tracers: Any
+ should_start_context: Any
+ should_trace: Any
+ should_trace_cache: Any
+ switch_context: Any
+ trace_arcs: Any
+ warn: Any
+ def __init__(self) -> None: ...
+ def activity(self) -> bool: ...
+ def get_stats(self) -> Dict[str, int]: ...
+ def reset_activity(self) -> Any: ...
+ def start(self) -> TTraceFn: ...
+ def stop(self) -> None: ...
diff --git a/coverage/types.py b/coverage/types.py
index 79cf5d3a..ddfcdb81 100644
--- a/coverage/types.py
+++ b/coverage/types.py
@@ -5,19 +5,101 @@
Types for use throughout coverage.py.
"""
-from types import ModuleType
+from __future__ import annotations
+
+from types import FrameType, ModuleType
from typing import (
- Any, Dict, Iterable, List, Optional, Tuple, Union,
+ Any, Callable, Dict, Iterable, List, Mapping, Optional, Set, Tuple, Union,
TYPE_CHECKING,
)
if TYPE_CHECKING:
# Protocol is new in 3.8. PYVERSIONS
from typing import Protocol
+
+ from coverage.plugin import FileTracer
+
else:
class Protocol: # pylint: disable=missing-class-docstring
pass
+## Python tracing
+
+class TTraceFn(Protocol):
+ """A Python trace function."""
+ def __call__(
+ self,
+ frame: FrameType,
+ event: str,
+ arg: Any,
+ lineno: Optional[int]=None # Our own twist, see collector.py
+ ) -> TTraceFn:
+ ...
+
+## Coverage.py tracing
+
+# Line numbers are pervasive enough that they deserve their own type.
+TLineNo = int
+
+TArc = Tuple[TLineNo, TLineNo]
+
+class TFileDisposition(Protocol):
+ """A simple value type for recording what to do with a file."""
+
+ original_filename: str
+ canonical_filename: str
+ source_filename: Optional[str]
+ trace: bool
+ reason: str
+ file_tracer: Optional[FileTracer]
+ has_dynamic_filename: bool
+
+
+# When collecting data, we use a dictionary with a few possible shapes. The
+# keys are always file names.
+# - If measuring line coverage, the values are sets of line numbers.
+# - If measuring arcs in the Python tracer, the values are sets of arcs (pairs
+# of line numbers).
+# - If measuring arcs in the C tracer, the values are sets of packed arcs (two
+# line numbers combined into one integer).
+
+TTraceData = Union[
+ Dict[str, Set[TLineNo]],
+ Dict[str, Set[TArc]],
+ Dict[str, Set[int]],
+]
+
+class TTracer(Protocol):
+ """Either CTracer or PyTracer."""
+
+ data: TTraceData
+ trace_arcs: bool
+ should_trace: Callable[[str, FrameType], TFileDisposition]
+ should_trace_cache: Mapping[str, Optional[TFileDisposition]]
+ should_start_context: Optional[Callable[[FrameType], Optional[str]]]
+ switch_context: Optional[Callable[[Optional[str]], None]]
+ warn: TWarnFn
+
+ def __init__(self) -> None:
+ ...
+
+ def start(self) -> TTraceFn:
+ """Start this tracer, returning a trace function."""
+
+ def stop(self) -> None:
+ """Stop this tracer."""
+
+ def activity(self) -> bool:
+ """Has there been any activity?"""
+
+ def reset_activity(self) -> None:
+ """Reset the activity() flag."""
+
+ def get_stats(self) -> Optional[Dict[str, int]]:
+ """Return a dictionary of statistics, or None."""
+
+## Coverage
+
# Many places use kwargs as Coverage kwargs.
TCovKwargs = Any
@@ -56,15 +138,18 @@ class TConfigurable(Protocol):
## Parsing
-# Line numbers are pervasive enough that they deserve their own type.
-TLineNo = int
-
-TArc = Tuple[TLineNo, TLineNo]
-
TMorf = Union[ModuleType, str]
TSourceTokenLines = Iterable[List[Tuple[str, str]]]
+## Plugins
+
+class TPlugin(Protocol):
+ """What all plugins have in common."""
+ _coverage_plugin_name: str
+ _coverage_enabled: bool
+
+
## Debugging
class TWarnFn(Protocol):
diff --git a/tests/test_oddball.py b/tests/test_oddball.py
index 2c35177b..ca139737 100644
--- a/tests/test_oddball.py
+++ b/tests/test_oddball.py
@@ -139,7 +139,7 @@ class RecursionTest(CoverageTest):
assert re.fullmatch(
r"Trace function changed, data is likely wrong: None != " +
r"<bound method PyTracer._trace of " +
- "<PyTracer at 0x[0-9a-fA-F]+: 5 lines in 1 files>>",
+ "<PyTracer at 0x[0-9a-fA-F]+: 5 data points in 1 files>>",
cov._warnings[0],
)
else:
diff --git a/tox.ini b/tox.ini
index 882b38f9..95201b0b 100644
--- a/tox.ini
+++ b/tox.ini
@@ -96,10 +96,10 @@ deps =
setenv =
{[testenv]setenv}
C__B=coverage/__init__.py coverage/__main__.py coverage/annotate.py coverage/bytecode.py
- C_CC=coverage/cmdline.py coverage/config.py coverage/context.py coverage/control.py
+ C_CC=coverage/cmdline.py coverage/collector.py coverage/config.py coverage/context.py coverage/control.py
C_DE=coverage/data.py coverage/disposition.py coverage/env.py coverage/exceptions.py
C_FN=coverage/files.py coverage/inorout.py coverage/jsonreport.py coverage/lcovreport.py coverage/multiproc.py coverage/numbits.py
- C_OP=coverage/parser.py coverage/phystokens.py coverage/plugin.py coverage/python.py
+ C_OP=coverage/parser.py coverage/phystokens.py coverage/plugin.py coverage/plugin_support.py coverage/python.py
C_QZ=coverage/report.py coverage/results.py coverage/sqldata.py coverage/tomlconfig.py coverage/types.py coverage/version.py
T_AN=tests/test_api.py tests/test_cmdline.py tests/goldtest.py tests/helpers.py tests/test_html.py
TYPEABLE={env:C__B} {env:C_CC} {env:C_DE} {env:C_FN} {env:C_OP} {env:C_QZ} {env:T_AN}