summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNed Batchelder <ned@nedbatchelder.com>2023-01-02 11:43:18 -0500
committerNed Batchelder <ned@nedbatchelder.com>2023-01-02 11:43:18 -0500
commit3f0bce2f5f4658bfa1d9cd6ddb2f6d7e520897e8 (patch)
tree63ad1118e59d74a14d0379ec83a8d193967e6791
parentffc701a47a9a6285d3a65cad893e514f5db39a54 (diff)
downloadpython-coveragepy-git-3f0bce2f5f4658bfa1d9cd6ddb2f6d7e520897e8.tar.gz
mypy: partial debug.py and pytracer.py
-rw-r--r--coverage/control.py2
-rw-r--r--coverage/debug.py88
-rw-r--r--coverage/pytracer.py22
3 files changed, 70 insertions, 42 deletions
diff --git a/coverage/control.py b/coverage/control.py
index e5cabd5b..8ac6781e 100644
--- a/coverage/control.py
+++ b/coverage/control.py
@@ -225,7 +225,7 @@ class Coverage(TConfigurable):
data_file = None
# This is injectable by tests.
- self._debug_file = None
+ self._debug_file: Optional[IO[str]] = None
self._auto_load = self._auto_save = auto_data
self._data_suffix_specified = data_suffix
diff --git a/coverage/debug.py b/coverage/debug.py
index 7ed8937c..82de3c29 100644
--- a/coverage/debug.py
+++ b/coverage/debug.py
@@ -3,6 +3,8 @@
"""Control of and utilities for debugging."""
+from __future__ import annotations
+
import contextlib
import functools
import inspect
@@ -15,7 +17,10 @@ import sys
import types
import _thread
-from typing import Any, Callable, Iterable, Iterator, Tuple
+from typing import (
+ Any, Callable, Generator, IO, Iterable, Iterator, Optional, List, Tuple,
+ cast,
+)
from coverage.misc import isolate_module
@@ -25,7 +30,7 @@ os = isolate_module(os)
# When debugging, it can be helpful to force some options, especially when
# debugging the configuration mechanisms you usually use to control debugging!
# This is a list of forced debugging options.
-FORCED_DEBUG = []
+FORCED_DEBUG: List[str] = []
FORCED_DEBUG_FILE = None
@@ -34,7 +39,7 @@ class DebugControl:
show_repr_attr = False # For AutoReprMixin
- def __init__(self, options, output):
+ def __init__(self, options: Iterable[str], output: Optional[IO[str]]) -> None:
"""Configure the options and output file for debugging."""
self.options = list(options) + FORCED_DEBUG
self.suppress_callers = False
@@ -49,17 +54,17 @@ class DebugControl:
)
self.raw_output = self.output.outfile
- def __repr__(self):
+ def __repr__(self) -> str:
return f"<DebugControl options={self.options!r} raw_output={self.raw_output!r}>"
- def should(self, option):
+ def should(self, option: str) -> bool:
"""Decide whether to output debug information in category `option`."""
if option == "callers" and self.suppress_callers:
return False
return (option in self.options)
@contextlib.contextmanager
- def without_callers(self):
+ def without_callers(self) -> Generator[None, None, None]:
"""A context manager to prevent call stacks from being logged."""
old = self.suppress_callers
self.suppress_callers = True
@@ -68,7 +73,7 @@ class DebugControl:
finally:
self.suppress_callers = old
- def write(self, msg):
+ def write(self, msg: str) -> None:
"""Write a line of debug output.
`msg` is the line to write. A newline will be appended.
@@ -86,26 +91,26 @@ class DebugControl:
class DebugControlString(DebugControl):
"""A `DebugControl` that writes to a StringIO, for testing."""
- def __init__(self, options):
+ def __init__(self, options: Iterable[str]) -> None:
super().__init__(options, io.StringIO())
- def get_output(self):
+ def get_output(self) -> str:
"""Get the output text from the `DebugControl`."""
- return self.raw_output.getvalue()
+ return cast(str, self.raw_output.getvalue())
class NoDebugging:
"""A replacement for DebugControl that will never try to do anything."""
- def should(self, option): # pylint: disable=unused-argument
+ def should(self, option: str) -> bool: # pylint: disable=unused-argument
"""Should we write debug messages? Never."""
return False
- def write(self, msg):
+ def write(self, msg: str) -> None:
"""This will never be called."""
raise AssertionError("NoDebugging.write should never be called.")
-def info_header(label):
+def info_header(label: str) -> str:
"""Make a nice header string."""
return "--{:-<60s}".format(" "+label+" ")
@@ -155,7 +160,7 @@ def write_formatted_info(
write(f" {line}")
-def short_stack(limit=None, skip=0):
+def short_stack(limit: Optional[int]=None, skip: int=0) -> str:
"""Return a string summarizing the call stack.
The string is multi-line, with one line per stack frame. Each line shows
@@ -177,21 +182,25 @@ def short_stack(limit=None, skip=0):
return "\n".join("%30s : %s:%d" % (t[3], t[1], t[2]) for t in stack)
-def dump_stack_frames(limit=None, out=None, skip=0):
+def dump_stack_frames(
+ limit: Optional[int]=None,
+ out: Optional[IO[str]]=None,
+ skip: int=0
+) -> None:
"""Print a summary of the stack to stdout, or someplace else."""
out = out or sys.stdout
out.write(short_stack(limit=limit, skip=skip+1))
out.write("\n")
-def clipped_repr(text, numchars=50):
+def clipped_repr(text: str, numchars: int=50) -> str:
"""`repr(text)`, but limited to `numchars`."""
r = reprlib.Repr()
r.maxstring = numchars
return r.repr(text)
-def short_id(id64):
+def short_id(id64: int) -> int:
"""Given a 64-bit id, make a shorter 16-bit one."""
id16 = 0
for offset in range(0, 64, 16):
@@ -199,7 +208,7 @@ def short_id(id64):
return id16 & 0xFFFF
-def add_pid_and_tid(text):
+def add_pid_and_tid(text: str) -> str:
"""A filter to add pid and tid to debug messages."""
# Thread ids are useful, but too long. Make a shorter one.
tid = f"{short_id(_thread.get_ident()):04x}"
@@ -211,7 +220,7 @@ class AutoReprMixin:
"""A mixin implementing an automatic __repr__ for debugging."""
auto_repr_ignore = ['auto_repr_ignore', '$coverage.object_id']
- def __repr__(self):
+ def __repr__(self) -> str:
show_attrs = (
(k, v) for k, v in self.__dict__.items()
if getattr(v, "show_repr_attr", True)
@@ -225,7 +234,7 @@ class AutoReprMixin:
)
-def simplify(v): # pragma: debugging
+def simplify(v: Any) -> Any: # pragma: debugging
"""Turn things which are nearly dict/list/etc into dict/list/etc."""
if isinstance(v, dict):
return {k:simplify(vv) for k, vv in v.items()}
@@ -237,13 +246,13 @@ def simplify(v): # pragma: debugging
return v
-def pp(v): # pragma: debugging
+def pp(v: Any) -> None: # pragma: debugging
"""Debug helper to pretty-print data, including SimpleNamespace objects."""
# Might not be needed in 3.9+
pprint.pprint(simplify(v))
-def filter_text(text, filters):
+def filter_text(text: str, filters: Iterable[Callable[[str], str]]) -> str:
"""Run `text` through a series of filters.
`filters` is a list of functions. Each takes a string and returns a
@@ -266,10 +275,10 @@ def filter_text(text, filters):
class CwdTracker: # pragma: debugging
"""A class to add cwd info to debug messages."""
- def __init__(self):
- self.cwd = None
+ def __init__(self) -> None:
+ self.cwd: Optional[str] = None
- def filter(self, text):
+ def filter(self, text: str) -> str:
"""Add a cwd message for each new cwd."""
cwd = os.getcwd()
if cwd != self.cwd:
@@ -280,7 +289,12 @@ class CwdTracker: # pragma: debugging
class DebugOutputFile: # pragma: debugging
"""A file-like object that includes pid and cwd information."""
- def __init__(self, outfile, show_process, filters):
+ def __init__(
+ self,
+ outfile: Optional[IO[str]],
+ show_process: bool,
+ filters: Iterable[Callable[[str], str]],
+ ):
self.outfile = outfile
self.show_process = show_process
self.filters = list(filters)
@@ -296,7 +310,13 @@ class DebugOutputFile: # pragma: debugging
SINGLETON_ATTR = 'the_one_and_is_interim'
@classmethod
- def get_one(cls, fileobj=None, show_process=True, filters=(), interim=False):
+ def get_one(
+ cls,
+ fileobj: Optional[IO[str]]=None,
+ show_process: bool=True,
+ filters: Iterable[Callable[[str], str]]=(),
+ interim: bool=False,
+ ) -> DebugOutputFile:
"""Get a DebugOutputFile.
If `fileobj` is provided, then a new DebugOutputFile is made with it.
@@ -339,13 +359,15 @@ class DebugOutputFile: # pragma: debugging
sys.modules[cls.SYS_MOD_NAME] = singleton_module
return the_one
- def write(self, text):
+ def write(self, text: str) -> None:
"""Just like file.write, but filter through all our filters."""
+ assert self.outfile is not None
self.outfile.write(filter_text(text, self.filters))
self.outfile.flush()
- def flush(self):
+ def flush(self) -> None:
"""Flush our file."""
+ assert self.outfile is not None
self.outfile.flush()
@@ -388,7 +410,11 @@ OBJ_IDS = itertools.count()
CALLS = itertools.count()
OBJ_ID_ATTR = "$coverage.object_id"
-def show_calls(show_args=True, show_stack=False, show_return=False): # pragma: debugging
+def show_calls(
+ show_args: bool=True,
+ show_stack: bool=False,
+ show_return: bool=False,
+) -> Callable[..., Any]: # pragma: debugging
"""A method decorator to debug-log each call to the function."""
def _decorator(func):
@functools.wraps(func)
@@ -422,7 +448,7 @@ def show_calls(show_args=True, show_stack=False, show_return=False): # pragma
return _decorator
-def _clean_stack_line(s): # pragma: debugging
+def _clean_stack_line(s: str) -> str: # pragma: debugging
"""Simplify some paths in a stack trace, for compactness."""
s = s.strip()
s = s.replace(os.path.dirname(__file__) + '/', '')
diff --git a/coverage/pytracer.py b/coverage/pytracer.py
index c50c9c19..027e8e7e 100644
--- a/coverage/pytracer.py
+++ b/coverage/pytracer.py
@@ -8,7 +8,7 @@ import dis
import sys
from types import FrameType
-from typing import Any, Callable, Dict, Mapping, Optional
+from typing import Any, Callable, Dict, Optional
from coverage import env
from coverage.types import TFileDisposition, TTraceData, TTraceFn, TTracer, TWarnFn
@@ -51,7 +51,7 @@ class PyTracer(TTracer):
self.data: TTraceData
self.trace_arcs = False
self.should_trace: Callable[[str, FrameType], TFileDisposition]
- self.should_trace_cache: Mapping[str, Optional[TFileDisposition]]
+ self.should_trace_cache: Dict[str, Optional[TFileDisposition]]
self.should_start_context: Optional[Callable[[FrameType], Optional[str]]] = None
self.switch_context: Optional[Callable[[Optional[str]], None]] = None
self.warn: TWarnFn
@@ -61,8 +61,8 @@ class PyTracer(TTracer):
self.cur_file_data = None
self.last_line = 0 # int, but uninitialized.
- self.cur_file_name = None
- self.context = None
+ self.cur_file_name: Optional[str] = None
+ self.context: Optional[str] = None
self.started_context = False
self.data_stack = []
@@ -84,7 +84,7 @@ class PyTracer(TTracer):
files = len(self.data)
return f"<PyTracer at 0x{me:x}: {points} data points in {files} files>"
- def log(self, marker, *args) -> None:
+ def log(self, marker: str, *args: Any) -> None:
"""For hard-core logging of what this tracer is doing."""
with open("/tmp/debug_trace.txt", "a") as f:
f.write("{} {}[{}]".format(
@@ -93,13 +93,13 @@ class PyTracer(TTracer):
len(self.data_stack),
))
if 0: # if you want thread ids..
- f.write(".{:x}.{:x}".format(
+ f.write(".{:x}.{:x}".format( # type: ignore[unreachable]
self.thread.ident,
self.threading.current_thread().ident,
))
f.write(" {}".format(" ".join(map(str, args))))
if 0: # if you want callers..
- f.write(" | ")
+ f.write(" | ") # type: ignore[unreachable]
stack = " / ".join(
(fname or "???").rpartition("/")[-1]
for _, fname, _, _ in self.data_stack
@@ -107,7 +107,7 @@ class PyTracer(TTracer):
f.write(stack)
f.write("\n")
- def _trace(self, frame: FrameType, event: str, arg_unused: Any) -> TTraceFn:
+ def _trace(self, frame: FrameType, event: str, arg_unused: Any) -> Optional[TTraceFn]:
"""The trace function passed to sys.settrace."""
if THIS_FILE in frame.f_code.co_filename:
@@ -119,8 +119,8 @@ class PyTracer(TTracer):
# The PyTrace.stop() method has been called, possibly by another
# thread, let's deactivate ourselves now.
if 0:
- self.log("---\nX", frame.f_code.co_filename, frame.f_lineno)
- f = frame
+ f = frame # type: ignore[unreachable]
+ self.log("---\nX", f.f_code.co_filename, f.f_lineno)
while f:
self.log(">", f.f_code.co_filename, f.f_lineno, f.f_code.co_name, f.f_trace)
f = f.f_back
@@ -140,6 +140,7 @@ class PyTracer(TTracer):
if context_maybe is not None:
self.context = context_maybe
started_context = True
+ assert self.switch_context is not None
self.switch_context(self.context)
else:
started_context = False
@@ -175,6 +176,7 @@ class PyTracer(TTracer):
self.cur_file_data = None
if disp.trace:
tracename = disp.source_filename
+ assert tracename is not None
if tracename not in self.data:
self.data[tracename] = set()
self.cur_file_data = self.data[tracename]