# copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of logilab-common.
#
# logilab-common is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option) any
# later version.
#
# logilab-common is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with logilab-common. If not, see <http://www.gnu.org/licenses/>.
"""logilab-pytest is a tool that eases test running and debugging.
To be able to use logilab-pytest, you should either write tests using
the logilab.common.testlib's framework or the unittest module of the
Python's standard library.
You can customize logilab-pytest's behaviour by defining a ``pytestconf.py``
file somewhere in your test directory. In this file, you can add options or
change the way tests are run.
To add command line options, you must define a ``update_parser`` function in
your ``pytestconf.py`` file. The function must accept a single parameter
that will be the OptionParser's instance to customize.
If you wish to customize the tester, you'll have to define a class named
``CustomPyTester``. This class should extend the default `PyTester` class
defined in the logilab.common.pytest module. Take a look at the `PyTester` and
`DjangoTester` classes for more information about what can be done.
For instance, if you wish to add a custom -l option to specify a loglevel, you
could define the following ``pytestconf.py`` file ::
import logging
from logilab.common.pytest import PyTester
def update_parser(parser):
parser.add_option('-l', '--loglevel', dest='loglevel', action='store',
choices=('debug', 'info', 'warning', 'error', 'critical'),
default='critical', help="the default log level possible choices are "
"('debug', 'info', 'warning', 'error', 'critical')")
return parser
class CustomPyTester(PyTester):
def __init__(self, cvg, options):
super(CustomPyTester, self).__init__(cvg, options)
loglevel = options.loglevel.upper()
logger = logging.getLogger('erudi')
logger.setLevel(logging.getLevelName(loglevel))
In your TestCase class you can then get the value of a specific option with
the ``optval`` method::
class MyTestCase(TestCase):
def test_foo(self):
loglevel = self.optval('loglevel')
# ...
You can also tag your tests for fine filtering.
With those tags::
from logilab.common.testlib import tag, TestCase
class Exemple(TestCase):
@tag('rouge', 'carre')
def toto(self):
pass
@tag('carre', 'vert')
def tata(self):
pass
@tag('rouge')
def titi(test):
pass
you can filter the function with a simple python expression
* ``toto`` and ``titi`` match ``rouge``
* ``toto``, ``tata`` and ``titi``, match ``rouge or carre``
* ``tata`` and ``titi`` match ``rouge ^ carre``
* ``titi`` match ``rouge and not carre``
"""
from __future__ import print_function
import os
import re
import sys
import os.path as osp
from time import process_time, time
try:
from re import Match # type: ignore
except ImportError:
# Match is python > 3.6 only.
#
# To be compatible with python <= 3.6, and still provide some typing, we
# manually define Match, in the same manner it is defined in the re module
# of python > 3.7
# cf https://github.com/python/cpython/blob/3.7/Lib/re.py#L264
Match = type(re.sre_compile.compile("", 0).match("")) # type: ignore
import warnings
import types
import inspect
import traceback
from inspect import isgeneratorfunction, isclass, FrameInfo
from random import shuffle
from itertools import dropwhile
# mypy error: Module 'unittest.runner' has no attribute '_WritelnDecorator'
# but it does
from unittest.runner import _WritelnDecorator # type: ignore
from unittest.suite import TestSuite
from typing import Callable, Any, Optional, List, Tuple, Generator, Dict
from logilab.common.deprecation import callable_deprecated
from logilab.common.fileutils import abspath_listdir
from logilab.common import textutils
from logilab.common import testlib, STD_BLACKLIST
# use the same unittest module as testlib
from logilab.common.testlib import unittest, start_interactive_mode
from logilab.common.testlib import ( # noqa
nocoverage,
pause_trace,
replace_trace,
) # bwcompat
from logilab.common.debugger import Debugger, colorize_source
import doctest
import unittest as unittest_legacy
from .decorators import monkeypatch
__docformat__ = "restructuredtext en"
PYTEST_DOC = """%prog [OPTIONS] [testfile [testpattern]]
examples:
logilab-pytest path/to/mytests.py
logilab-pytest path/to/mytests.py TheseTests
logilab-pytest path/to/mytests.py TheseTests.test_thisone
logilab-pytest path/to/mytests.py -m '(not long and database) or regr'
logilab-pytest one (will run both test_thisone and test_thatone)
logilab-pytest path/to/mytests.py -s not (will skip test_notthisone)
"""
ENABLE_DBC = False
FILE_RESTART = ".pytest.restart"
if not getattr(unittest_legacy, "__package__", None):
try:
import unittest2.suite as unittest_suite
except ImportError:
sys.exit("You have to install python-unittest2 to use this module")
else:
# mypy: Name 'unittest_suite' already defined (possibly by an import))
import unittest.suite as unittest_suite # type: ignore
try:
import django # noqa
from logilab.common.modutils import modpath_from_file, load_module_from_modpath
DJANGO_FOUND = True
except ImportError:
DJANGO_FOUND = False
CONF_FILE = "pytestconf.py"
# Base names accepted as test modules: ``test*.py``, ``unittest*.py`` and
# ``smoketest.py``.
TESTFILE_RE = re.compile(r"^((unit)?test.*|smoketest)\.py$")


def this_is_a_testfile(filename: str) -> Optional[Match]:
    """Tell whether `filename` looks like a test file.

    Only the base name of `filename` is examined.

    :param filename: a file name or path
    :return: the match object (truthy) when the base name fits
      ``TESTFILE_RE``, ``None`` otherwise
    """
    name = osp.basename(filename)
    return TESTFILE_RE.match(name)
# Directory base names accepted as test directories: ``test``, ``tests``,
# ``unittest`` and ``unittests``.
TESTDIR_RE = re.compile("^(unit)?tests?$")


def this_is_a_testdir(dirpath: str) -> Optional[Match]:
    """Tell whether `dirpath` looks like a test directory.

    Only the last path component of `dirpath` is examined.

    :param dirpath: a directory name or path
    :return: the match object (truthy) when the base name fits
      ``TESTDIR_RE``, ``None`` otherwise
    """
    name = osp.basename(dirpath)
    return TESTDIR_RE.match(name)
def load_pytest_conf(path, parser):
    """Load a ``pytestconf.py`` file and update the default parser and / or
    tester.

    The file is executed in a fresh namespace. If it defines an
    ``update_parser`` callable, that callable is invoked with `parser`.

    NOTE: the configuration file is executed as Python code; it must come
    from a trusted location.

    :param path: path of the configuration file to execute
    :param parser: the option parser handed to ``update_parser``
    :return: the ``CustomPyTester`` object defined by the file, or
      `PyTester` when the file does not define one
    """
    namespace = {}
    # Read through a context manager so the file handle is always released.
    with open(path, "rb") as conf_file:
        source = conf_file.read()
    # Compile with the real file name so tracebacks raised from the
    # configuration file point at it instead of "<string>".
    exec(compile(source, path, "exec"), namespace)
    if "update_parser" in namespace:
        namespace["update_parser"](parser)
    return namespace.get("CustomPyTester", PyTester)
def project_root(parser, projdir=None):
    """Walk up from `projdir` to locate the project's root directory.

    Starting at `projdir` (the current working directory when omitted), the
    walk moves to the parent directory for as long as the current directory
    is a test directory or contains an ``__init__.py`` file. Every
    ``pytestconf.py`` found in a visited directory is loaded, the last one
    loaded deciding which tester class is returned.

    :param parser: option parser passed on to `load_pytest_conf`
    :param projdir: directory to start from, defaults to ``os.getcwd()``
    :return: a ``(directory, tester class)`` tuple where the directory is the
      last one visited before the walk stopped
    """

    def tester_for(directory, fallback):
        # Load the conf file of `directory` if any, else keep `fallback`.
        candidate = osp.join(directory, CONF_FILE)
        if osp.isfile(candidate):
            return load_pytest_conf(candidate, parser)
        return fallback

    start = os.getcwd() if projdir is None else projdir
    current = osp.abspath(start)
    last_visited = current
    testercls = tester_for(current, PyTester)
    while True:
        if not this_is_a_testdir(current) and not osp.isfile(
            osp.join(current, "__init__.py")
        ):
            break
        parent = osp.normpath(osp.join(current, os.pardir))
        if parent == current:
            # reached the filesystem root
            break
        last_visited, current = current, parent
        testercls = tester_for(current, testercls)
    return last_visited, testercls
class GlobalTestReport(object):
    """Accumulates test statistics over all the modules of a run."""

    def __init__(self):
        # counters summed over every module fed to the report
        self.ran = self.skipped = self.failures = self.errors = 0
        # cumulated wall-clock and CPU time
        self.ttime = self.ctime = 0
        self.modulescount = 0
        # (module name, number of problems, number of tests ran) tuples
        self.errmodules = []

    def feed(self, filename, testresult, ttime, ctime):
        """Merge the outcome of one test module into the statistics.

        :param filename: name of the test file (its last 3 characters are
          dropped to build the module name)
        :param testresult: result object exposing ``testsRun``, ``failures``,
          ``errors``, ``wasSuccessful()`` and optionally ``skipped``
        :param ttime: elapsed time spent in the module
        :param ctime: CPU time spent in the module
        """
        nb_ran = testresult.testsRun
        self.ran += nb_ran
        self.skipped += len(getattr(testresult, "skipped", ()))
        nb_failures = len(testresult.failures)
        nb_errors = len(testresult.errors)
        self.failures += nb_failures
        self.errors += nb_errors
        self.ttime += ttime
        self.ctime += ctime
        self.modulescount += 1
        if testresult.wasSuccessful():
            return
        self.errmodules.append((filename[:-3], nb_failures + nb_errors, nb_ran))

    def failed_to_test_module(self, filename):
        """Record a test module that could not be imported by unittest."""
        self.errors += 1
        self.modulescount += 1
        self.ran += 1
        self.errmodules.append((filename[:-3], 1, 1))

    def skip_module(self, filename):
        """Record a test module that was skipped as a whole."""
        self.modulescount += 1
        self.ran += 1
        self.errmodules.append((filename[:-3], 0, 0))

    def __str__(self):
        """Build the textual summary (empty string when nothing to say)."""
        headline = ["Ran %s test cases in %.2fs (%.2fs CPU)" % (self.ran, self.ttime, self.ctime)]
        for count, label in (
            (self.errors, "errors"),
            (self.failures, "failures"),
            (self.skipped, "skipped"),
        ):
            if count:
                headline.append("%s %s" % (count, label))
        modulesok = self.modulescount - len(self.errmodules)
        if not (self.errors or self.failures):
            if not modulesok:
                return ""
            status = "All %s modules OK" % modulesok
            details = ""
        else:
            status = "%s modules OK (%s failed)" % (modulesok, len(self.errmodules))
            details = "\nfailures: %s" % ", ".join(
                ["%s [%s/%s]" % info for info in self.errmodules]
            )
        return "%s\n%s%s" % (", ".join(headline), status, details)
def remove_local_modules_from_sys(testdir):
    """Remove from the module cache every module that comes from `testdir`.

    This is used to avoid strange side-effects when using the
    testall() mode of pytest.
    For instance, if we run pytest on this tree::

      A/test/test_utils.py
      B/test/test_utils.py

    we **have** to clean sys.modules to make sure the correct test_utils
    module is ran in B

    :param testdir: path prefix of the test directory; modules whose
      ``__file__`` starts with it, or is not an absolute path, are dropped
      from ``sys.modules``
    """
    # iterate over a snapshot since entries are deleted along the way
    for modname, mod in list(sys.modules.items()):
        # ``__file__`` is missing on some built-in modules and may be None
        # (e.g. namespace packages); a None placeholder in sys.modules also
        # ends up here. None of those can be located on disk: keep them.
        modfile = getattr(mod, "__file__", None)
        if modfile is None:
            continue
        # if modfile is not an absolute path, it was probably loaded locally
        # during the tests
        if not osp.isabs(modfile) or modfile.startswith(testdir):
            del sys.modules[modname]
class PyTester(object):
    """Encapsulates the test-run logic.

    A tester walks directories, runs each test file through
    `SkipAwareTestProgram` and accumulates the outcome in a
    `GlobalTestReport` available as ``self.report``.
    """

    def __init__(self, cvg, options):
        """
        :param cvg: coverage object forwarded to the test program (may be None)
        :param options: parsed command line options; ``exitfirst`` and
          ``restart`` attributes are read by this class
        """
        self.report = GlobalTestReport()
        self.cvg = cvg
        self.options = options
        self.firstwrite = True
        self._errcode = None

    def show_report(self):
        """Print the global report on standard output."""
        # everything has been ran, print report
        print("*" * 79)
        print(self.report)

    def get_errcode(self):
        """Return the exit code: the explicitly set one if any, else the
        number of failures plus the number of errors of the report.
        """
        # errcode set explicitly
        if self._errcode is not None:
            return self._errcode
        return self.report.failures + self.report.errors

    def set_errcode(self, errcode):
        """Force the exit code returned by `get_errcode`."""
        self._errcode = errcode

    errcode = property(get_errcode, set_errcode)

    def _reset_restart_file(self):
        """Truncate the restart file located in the current directory.

        :raise Exception: whatever error occurred while opening the file,
          after a message has been printed on the original stderr
        """
        try:
            with open(FILE_RESTART, "w"):
                pass
        except Exception:
            print(
                "Error while overwriting succeeded test file :",
                osp.join(os.getcwd(), FILE_RESTART),
                file=sys.__stderr__,
            )
            raise

    def testall(self, exitfirst=False):
        """Walk through the current working directory, find something
        which can be considered as a testdir and run every test there.

        :param exitfirst: stop walking as soon as a test directory reports
          a failure
        """
        here = os.getcwd()
        for dirname, dirs, _ in os.walk(here):
            # prune blacklisted directories in place so os.walk skips them
            for skipped in STD_BLACKLIST:
                if skipped in dirs:
                    dirs.remove(skipped)
            basename = osp.basename(dirname)
            if this_is_a_testdir(basename):
                print("going into", dirname)
                # we found a testdir, let's explore it !
                if not self.testonedir(dirname, exitfirst):
                    break
                # do not descend below a test directory
                dirs[:] = []
        if self.report.ran == 0:
            print("no test dir found testing here:", here)
            # if no test was found during the visit, consider
            # the local directory as a test directory even if
            # it doesn't have a traditional test directory name
            self.testonedir(here)

    def testonedir(self, testdir, exitfirst=False):
        """Find each test file in `testdir` and run it.

        Test files are run in random order.

        :param testdir: directory to look into
        :param exitfirst: stop at the first unsuccessful test file
        :return: True when all test files have been executed, False if
          `exitfirst` is set and some test file failed
        """
        files = abspath_listdir(testdir)
        shuffle(files)
        for filename in files:
            if this_is_a_testfile(filename):
                if self.options.exitfirst and not self.options.restart:
                    # overwrite restart file
                    self._reset_restart_file()
                # run test and collect information
                prog = self.testfile(filename, batchmode=True)
                if exitfirst and (prog is None or not prog.result.wasSuccessful()):
                    return False
                self.firstwrite = True
        # clean local modules
        remove_local_modules_from_sys(testdir)
        return True

    def testfile(self, filename, batchmode=False):
        """Run every test in `filename`.

        The process temporarily changes directory to the one holding
        `filename`; the previous working directory is always restored.

        :param filename: an absolute path pointing to a unittest file
        :param batchmode: forwarded to `SkipAwareTestProgram`
        :return: the test program, or None when the module was skipped or
          could not be tested
        """
        here = os.getcwd()
        dirname = osp.dirname(filename)
        if dirname:
            os.chdir(dirname)
        # Everything after the chdir lives in the try block so that the
        # working directory is restored even if the restart file cannot be
        # overwritten.
        try:
            # overwrite restart file if it has not been done already
            if self.options.exitfirst and not self.options.restart and self.firstwrite:
                self._reset_restart_file()
            modname = osp.basename(filename)[:-3]
            print((" %s " % osp.basename(filename)).center(70, "="), file=sys.__stderr__)
            tstart, cstart = time(), process_time()
            try:
                testprog = SkipAwareTestProgram(
                    modname,
                    batchmode=batchmode,
                    cvg=self.cvg,
                    options=self.options,
                    outstream=sys.stderr,
                )
            except KeyboardInterrupt:
                raise
            except SystemExit as exc:
                # remember the requested exit code before propagating
                self.errcode = exc.code
                raise
            except testlib.SkipTest:
                print("Module skipped:", filename)
                self.report.skip_module(filename)
                return None
            except Exception:
                self.report.failed_to_test_module(filename)
                print("unhandled exception occurred while testing", modname, file=sys.stderr)
                import traceback

                traceback.print_exc(file=sys.stderr)
                return None

            tend, cend = time(), process_time()
            ttime, ctime = (tend - tstart), (cend - cstart)
            self.report.feed(filename, testprog.result, ttime, ctime)
            return testprog
        finally:
            if dirname:
                os.chdir(here)
class DjangoTester(PyTester):
    """Tester variant that sets up a Django environment around each test file.

    NOTE(review): relies on ``django.core.management.setup_environ`` and on
    ``create_test_db`` / ``destroy_test_db`` from ``django.test.utils``;
    presumably only available in old Django releases -- confirm before use.
    """

    def load_django_settings(self, dirname):
        """try to find project's setting and load it

        Walks up from `dirname` while the current directory has no
        ``settings.py`` but is a package (has an ``__init__.py``), then loads
        ``settings.py`` from the directory where the walk stopped.

        :raise AssertionError: when the filesystem root is reached
        """
        curdir = osp.abspath(dirname)
        while not osp.isfile(osp.join(curdir, "settings.py")) and osp.isfile(
            osp.join(curdir, "__init__.py")
        ):
            newdir = osp.normpath(osp.join(curdir, os.pardir))
            if newdir == curdir:
                raise AssertionError("could not find settings.py")
            curdir = newdir
        # late django initialization
        settings = load_module_from_modpath(modpath_from_file(osp.join(curdir, "settings.py")))
        from django.core.management import setup_environ

        setup_environ(settings)
        settings.DEBUG = False
        self.settings = settings
        # add settings dir to pythonpath since it's the project's root
        if curdir not in sys.path:
            sys.path.insert(1, curdir)

    def before_testfile(self):
        """Set up the Django test environment and create the test database."""
        # Those imports must be done **after** setup_environ was called
        from django.test.utils import setup_test_environment
        from django.test.utils import create_test_db

        setup_test_environment()
        create_test_db(verbosity=0)
        # remembered so that after_testfile can destroy the database
        self.dbname = self.settings.TEST_DATABASE_NAME

    def after_testfile(self):
        """Tear down the Django test environment and destroy the test database.

        Reads ``self.dbname``, which is only set by `before_testfile`.
        """
        # Those imports must be done **after** setup_environ was called
        from django.test.utils import teardown_test_environment
        from django.test.utils import destroy_test_db

        teardown_test_environment()
        print("destroying", self.dbname)
        destroy_test_db(self.dbname, verbosity=0)

    def testall(self, exitfirst=False):
        """walks through current working directory, finds something
        which can be considered as a testdir and runs every test there

        A directory is explored when it contains a ``tests.py`` file or when
        it is named ``test`` or ``tests``.
        """
        for dirname, dirs, files in os.walk(os.getcwd()):
            # prune version control directories in place so os.walk skips them
            for skipped in ("CVS", ".svn", ".hg"):
                if skipped in dirs:
                    dirs.remove(skipped)
            if "tests.py" in files:
                if not self.testonedir(dirname, exitfirst):
                    break
                # do not descend below an explored directory
                dirs[:] = []
            else:
                basename = osp.basename(dirname)
                if basename in ("test", "tests"):
                    print("going into", dirname)
                    # we found a testdir, let's explore it !
                    if not self.testonedir(dirname, exitfirst):
                        break
                    dirs[:] = []

    def testonedir(self, testdir, exitfirst=False):
        """finds each testfile in the `testdir` and runs it

        return true when all tests has been executed, false if exitfirst and
        some test has failed.
        """
        # special django behaviour : if tests are splitted in several files,
        # remove the main tests.py file and tests each test file separately
        testfiles = [fpath for fpath in abspath_listdir(testdir) if this_is_a_testfile(fpath)]
        if len(testfiles) > 1:
            try:
                testfiles.remove(osp.join(testdir, "tests.py"))
            except ValueError:
                # no main tests.py file in this directory
                pass
        for filename in testfiles:
            # run test and collect information
            prog = self.testfile(filename, batchmode=True)
            if exitfirst and (prog is None or not prog.result.wasSuccessful()):
                return False
        # clean local modules
        remove_local_modules_from_sys(testdir)
        return True

    def testfile(self, filename, batchmode=False):
        """runs every test in `filename`

        :param filename: an absolute path pointing to a unittest file
        :return: the test program, or None when an exception other than
          SystemExit occurred while testing
        """
        here = os.getcwd()
        dirname = osp.dirname(filename)
        if dirname:
            os.chdir(dirname)
        self.load_django_settings(dirname)
        modname = osp.basename(filename)[:-3]
        print((" %s " % osp.basename(filename)).center(70, "="), file=sys.stderr)
        try:
            try:
                tstart, cstart = time(), process_time()
                self.before_testfile()
                testprog = SkipAwareTestProgram(modname, batchmode=batchmode, cvg=self.cvg)
                tend, cend = time(), process_time()
                ttime, ctime = (tend - tstart), (cend - cstart)
                self.report.feed(filename, testprog.result, ttime, ctime)
                return testprog
            except SystemExit:
                raise
            except Exception as exc:
                import traceback

                traceback.print_exc()
                self.report.failed_to_test_module(filename)
                print("unhandled exception occurred while testing", modname)
                print("error: %s" % exc)
                return None
        finally:
            # NOTE(review): runs even when before_testfile failed before
            # setting self.dbname -- after_testfile would then raise
            # AttributeError; confirm whether that case matters.
            self.after_testfile()
            if dirname:
                os.chdir(here)
def make_parser():
    """Build the OptionParser instance used by logilab-pytest.

    The returned parser carries an extra ``newargs`` list attribute in which
    the option callbacks collect the flags to hand over to unittest_main.
    The ``--django`` option is only declared when ``DJANGO_FOUND`` is true.
    """
    from optparse import OptionParser

    parser = OptionParser(usage=PYTEST_DOC)
    parser.newargs = []

    def rebuild_cmdline(option, opt, value, parser):
        """carry the option to unittest_main"""
        parser.newargs.append(opt)

    def rebuild_and_store(option, opt, value, parser):
        """carry the option to unittest_main and store
        the value on current parser
        """
        parser.newargs.append(opt)
        setattr(parser.values, option.dest, True)

    def capture_and_rebuild(option, opt, value, parser):
        warnings.simplefilter("ignore", DeprecationWarning)
        rebuild_cmdline(option, opt, value, parser)

    def forwarded(*flags, **settings):
        # declare an option that is only carried over to unittest_main
        parser.add_option(*flags, action="callback", callback=rebuild_cmdline, **settings)

    def forwarded_flag(*flags, **settings):
        # declare an option that is carried over and stored as True
        parser.add_option(*flags, action="callback", callback=rebuild_and_store, **settings)

    # logilab-pytest options
    parser.add_option(
        "-t", dest="testdir", default=None, help="directory where the tests will be found"
    )
    parser.add_option(
        "-d", dest="dbc", default=False, action="store_true", help="enable design-by-contract"
    )
    # unittest_main options provided and passed through logilab-pytest
    forwarded("-v", "--verbose", help="Verbose output")
    forwarded_flag("-i", "--pdb", dest="pdb", help="Enable test failure inspection")
    forwarded_flag(
        "-x",
        "--exitfirst",
        dest="exitfirst",
        default=False,
        help="Exit on first failure " "(only make sense when logilab-pytest run one test file)",
    )
    forwarded_flag(
        "-R",
        "--restart",
        dest="restart",
        default=False,
        help="Restart tests from where it failed (implies exitfirst) "
        "(only make sense if tests previously ran with exitfirst only)",
    )
    forwarded("--color", help="colorize tracebacks")
    # XXX: I wish I could use the callback action but it
    #      doesn't seem to be able to get the value
    #      associated to the option
    parser.add_option(
        "-s",
        "--skip",
        action="store",
        dest="skipped",
        default=None,
        help="test names matching this name will be skipped "
        "to skip several patterns, use commas",
    )
    forwarded("-q", "--quiet", help="Minimal output")
    parser.add_option(
        "-P",
        "--profile",
        default=None,
        dest="profile",
        help="Profile execution and store data in the given file",
    )
    parser.add_option(
        "-m",
        "--match",
        default=None,
        dest="tags_pattern",
        help="only execute test whose tag match the current pattern",
    )

    if DJANGO_FOUND:
        parser.add_option(
            "-J",
            "--django",
            dest="django",
            default=False,
            action="store_true",
            help="use logilab-pytest for django test cases",
        )
    return parser
def parseargs(parser):
    """Parse the command line.

    As side effects, ``testlib.ENABLE_DBC`` is set from the ``dbc`` option
    and ``parser.newargs`` is completed with the arguments to pass on to
    unittest_main().

    :param parser: the parser built by `make_parser`
    :return: an ``(options, explicitfile)`` tuple, where ``explicitfile`` is
      the single ``.py`` positional argument given, or None
    """
    # parse the command line
    options, positional = parser.parse_args()
    pyfiles = [arg for arg in positional if arg.endswith(".py")]
    explicitfile = None
    if len(pyfiles) > 1:
        parser.error("only one filename is acceptable")
    if pyfiles:
        explicitfile = pyfiles[0]
        positional.remove(explicitfile)
    # someone wants DBC
    testlib.ENABLE_DBC = options.dbc
    if options.skipped:
        parser.newargs.extend(["--skip", options.skipped])
    # restart implies exitfirst
    if options.restart:
        options.exitfirst = True
    # append additional args to the new sys.argv and let unittest_main
    # do the rest
    parser.newargs.extend(positional)
    return options, explicitfile
@callable_deprecated("[logilab-common 1.3] logilab-pytest is deprecated, use another test runner")
def run():
    """Command line entry point of logilab-pytest.

    Builds the parser, locates the project root (which may load
    ``pytestconf.py`` files), parses the command line, runs the requested
    tests and finally prints the report and exits with the tester's
    ``errcode``.
    """
    parser = make_parser()
    rootdir, testercls = project_root(parser)
    options, explicitfile = parseargs(parser)
    # mock a new command line
    sys.argv[1:] = parser.newargs
    cvg = None
    if "" not in sys.path:
        sys.path.insert(0, "")
    if DJANGO_FOUND and options.django:
        tester = DjangoTester(cvg, options)
    else:
        tester = testercls(cvg, options)
    if explicitfile:
        cmd, args = tester.testfile, (explicitfile,)
    elif options.testdir:
        cmd, args = tester.testonedir, (options.testdir, options.exitfirst)
    else:
        cmd, args = tester.testall, (options.exitfirst,)
    try:
        try:
            if options.profile:
                # hotshot does not exist in Python 3: use cProfile, as
                # SkipAwareTestProgram.runTests does.
                import cProfile

                prof = cProfile.Profile()
                try:
                    prof.runcall(cmd, *args)
                finally:
                    # save whatever was collected, even on early exit
                    prof.dump_stats(options.profile)
                print("profile data saved in", options.profile)
            else:
                cmd(*args)
        except SystemExit:
            raise
        except Exception:
            import traceback

            traceback.print_exc()
    finally:
        tester.show_report()
        # exiting from the finally clause overrides any pending exception
        sys.exit(tester.errcode)
class SkipAwareTestProgram(unittest.TestProgram):
    """Test program understanding the logilab-pytest command line options
    (pdb mode, exitfirst / restart, skip patterns, tags, profiling).
    """

    # XXX: don't try to stay close to unittest.py, use optparse
    USAGE = """\
Usage: %(progName)s [options] [test] [...]
Options:
-h, --help Show this message
-v, --verbose Verbose output
-i, --pdb Enable test failure inspection
-x, --exitfirst Exit on first failure
-s, --skip skip test matching this pattern (no regexp for now)
-q, --quiet Minimal output
--color colorize tracebacks
-m, --match Run only test whose tag match this pattern
-P, --profile FILE: Run the tests using cProfile and saving results
in FILE
Examples:
%(progName)s - run default set of tests
%(progName)s MyTestSuite - run suite 'MyTestSuite'
%(progName)s MyTestCase.testSomething - run MyTestCase.testSomething
%(progName)s MyTestCase - run all 'test*' test methods
in MyTestCase
"""

    def __init__(
        self,
        module="__main__",
        defaultTest=None,
        batchmode=False,
        cvg=None,
        options=None,
        outstream=sys.stderr,
    ):
        """
        :param module: name of the module holding the tests
        :param defaultTest: name of the test to run when none is given
        :param batchmode: when true, `_runTests` does not call ``sys.exit``
        :param cvg: coverage object handed to the runner (may be None)
        :param options: parsed logilab-pytest options (may be None)
        :param outstream: stream the runner writes to
        """
        self.batchmode = batchmode
        self.cvg = cvg
        self.options = options
        self.outstream = outstream
        super(SkipAwareTestProgram, self).__init__(
            module=module, defaultTest=defaultTest, testLoader=NonStrictTestLoader()
        )

    def parseArgs(self, argv):
        """Parse `argv` (program name first) and load the tests to run.

        Sets the ``pdbmode``, ``exitfirst``, ``skipped_patterns``,
        ``test_pattern``, ``tags_pattern``, ``colorize`` and ``profile_name``
        attributes, then builds ``self.test``.
        """
        self.pdbmode = False
        self.exitfirst = False
        self.skipped_patterns = []
        self.test_pattern = None
        self.tags_pattern = None
        self.colorize = False
        self.profile_name = None
        import getopt

        try:
            # "R" is accepted besides "r": logilab-pytest's own parser
            # declares the restart option as -R and forwards it verbatim.
            options, args = getopt.getopt(
                argv[1:],
                "hHvixrRqcp:s:m:P:",
                [
                    "help",
                    "verbose",
                    "quiet",
                    "pdb",
                    "exitfirst",
                    "restart",
                    "skip=",
                    "color",
                    "match=",
                    "profile=",
                ],
            )
            for opt, value in options:
                if opt in ("-h", "-H", "--help"):
                    self.usageExit()
                if opt in ("-i", "--pdb"):
                    self.pdbmode = True
                if opt in ("-x", "--exitfirst"):
                    self.exitfirst = True
                if opt in ("-r", "-R", "--restart"):
                    self.restart = True
                    self.exitfirst = True
                if opt in ("-q", "--quiet"):
                    self.verbosity = 0
                if opt in ("-v", "--verbose"):
                    self.verbosity = 2
                if opt in ("-s", "--skip"):
                    # patterns are comma separated, surrounding blanks and
                    # empty patterns (which would match every test) ignored
                    self.skipped_patterns = [
                        pat.strip() for pat in value.split(",") if pat.strip()
                    ]
                if opt == "--color":
                    self.colorize = True
                if opt in ("-m", "--match"):
                    self.tags_pattern = value
                    # the runner reads the pattern from the ``tags_pattern``
                    # attribute of the options object: make sure there is one
                    if self.options is None:
                        from optparse import Values

                        self.options = Values()
                    setattr(self.options, "tags_pattern", value)
                if opt in ("-P", "--profile"):
                    self.profile_name = value
            self.testLoader.skipped_patterns = self.skipped_patterns
            if len(args) == 0 and self.defaultTest is None:
                suitefunc = getattr(self.module, "suite", None)
                if isinstance(suitefunc, (types.FunctionType, types.MethodType)):
                    self.test = self.module.suite()
                else:
                    self.test = self.testLoader.loadTestsFromModule(self.module)
                return
            if len(args) > 0:
                self.test_pattern = args[0]
                self.testNames = args
            else:
                self.testNames = (self.defaultTest,)
            self.createTests()
        except getopt.error as msg:
            self.usageExit(msg)

    def runTests(self):
        """Run the tests, under cProfile when a profile file was requested."""
        if self.profile_name:
            import cProfile

            cProfile.runctx("self._runTests()", globals(), locals(), self.profile_name)
        else:
            return self._runTests()

    def _runTests(self):
        """Build the runner, drop already succeeded tests in restart mode and
        run what is left.

        Unless in batch mode, exits the process with a status telling whether
        the run failed; otherwise stores the result in ``self.result``.
        """
        self.testRunner = SkipAwareTextTestRunner(
            verbosity=self.verbosity,
            stream=self.outstream,
            exitfirst=self.exitfirst,
            pdbmode=self.pdbmode,
            cvg=self.cvg,
            test_pattern=self.test_pattern,
            skipped_patterns=self.skipped_patterns,
            colorize=self.colorize,
            batchmode=self.batchmode,
            options=self.options,
        )

        def removeSucceededTests(obj, succTests):
            """Recursive function that removes succTests from
            a TestSuite or TestCase
            """
            if isinstance(obj, unittest.TestSuite):
                removeSucceededTests(obj._tests, succTests)
            if isinstance(obj, list):
                # iterate over a copy since elements are removed in place
                for el in obj[:]:
                    if isinstance(el, unittest.TestSuite):
                        removeSucceededTests(el, succTests)
                    elif isinstance(el, unittest.TestCase):
                        descr = ".".join(
                            (el.__class__.__module__, el.__class__.__name__, el._testMethodName)
                        )
                        if descr in succTests:
                            obj.remove(el)

        # take care, self.options may be None
        if getattr(self.options, "restart", False):
            # retrieve succeeded tests from FILE_RESTART
            try:
                with open(FILE_RESTART, "r") as restartfile:
                    # a set gives constant time membership tests
                    succeededtests = set(elem.rstrip("\n\r") for elem in restartfile)
                    removeSucceededTests(self.test, succeededtests)
            except Exception as ex:
                raise Exception(
                    "Error while reading succeeded tests into %s: %s"
                    % (osp.join(os.getcwd(), FILE_RESTART), ex)
                )

        result = self.testRunner.run(self.test)
        # help garbage collection: we want TestSuite, which hold refs to every
        # executed TestCase, to be gc'ed
        del self.test
        if getattr(result, "debuggers", None) and getattr(self, "pdbmode", None):
            start_interactive_mode(result)
        if not getattr(self, "batchmode", None):
            sys.exit(not result.wasSuccessful())
        self.result = result
class SkipAwareTextTestRunner(unittest.TextTestRunner):
    """Text test runner able to filter tests by name pattern, skip patterns
    and tags before they are run.
    """

    def __init__(
        self,
        stream=sys.stderr,
        verbosity=1,
        exitfirst=False,
        pdbmode=False,
        cvg=None,
        test_pattern=None,
        skipped_patterns=(),
        colorize=False,
        batchmode=False,
        options=None,
    ):
        """
        :param stream: stream the report is written to
        :param verbosity: verbosity level handed to the base runner
        :param exitfirst: handed to the result object
        :param pdbmode: handed to the result object
        :param cvg: handed to the result object
        :param test_pattern: substring (or ``class.method`` pair of
          substrings) a test name must contain to be run
        :param skipped_patterns: substrings excluding a test when found in
          its name
        :param colorize: colorize the final status and the tracebacks
        :param batchmode: when true, `run` does not print the summary
        :param options: object whose ``tags_pattern`` attribute, if any,
          filters tests by tags
        """
        super(SkipAwareTextTestRunner, self).__init__(stream=stream, verbosity=verbosity)
        self.exitfirst = exitfirst
        self.pdbmode = pdbmode
        self.cvg = cvg
        self.test_pattern = test_pattern
        self.skipped_patterns = skipped_patterns
        self.colorize = colorize
        self.batchmode = batchmode
        self.options = options

    def _this_is_skipped(self, testedname: str) -> bool:
        """Tell whether `testedname` contains one of the skipped patterns."""
        return any(pat in testedname for pat in self.skipped_patterns)

    def _runcondition(self, test: Callable, skipgenerator: bool = True) -> bool:
        """Tell whether `test` has to be run.

        :param test: an inner test, a test case, a function or a method
        :param skipgenerator: when true, generator functions are only
          filtered on tags, inner tests deciding for themselves at run time
        """
        if isinstance(test, testlib.InnerTest):
            testname = test.name
        else:
            if isinstance(test, testlib.TestCase):
                meth = test._get_test_method()
                testname = "%s.%s" % (test.__name__, meth.__name__)
            elif isinstance(test, types.FunctionType):
                func = test
                testname = func.__name__
            elif isinstance(test, types.MethodType):
                cls = test.__self__.__class__
                testname = "%s.%s" % (cls.__name__, test.__name__)
            else:
                return True  # Not sure when this happens
            if isgeneratorfunction(test) and skipgenerator:
                return self.does_match_tags(test)  # Let inner tests decide at run time
        if self._this_is_skipped(testname):
            return False  # this was explicitly skipped
        if self.test_pattern is not None:
            try:
                classpattern, testpattern = self.test_pattern.split(".")
                klass, name = testname.split(".")
                if classpattern not in klass or testpattern not in name:
                    return False
            except ValueError:
                # pattern or test name is not of the "class.method" form:
                # fall back on a plain substring test
                if self.test_pattern not in testname:
                    return False

        return self.does_match_tags(test)

    def does_match_tags(self, test: Callable) -> bool:
        """Tell whether the tags of `test` match the ``tags_pattern`` option.

        Always true when there are no options or no pattern.
        """
        if self.options is not None:
            tags_pattern = getattr(self.options, "tags_pattern", None)
            if tags_pattern is not None:
                tags = getattr(test, "tags", testlib.Tags())
                if tags.inherit and isinstance(test, types.MethodType):
                    # merge in the tags set on the test class
                    tags = tags | getattr(test.__self__.__class__, "tags", testlib.Tags())
                return tags.match(tags_pattern)
        return True  # no pattern

    def _makeResult(self) -> "SkipAwareTestResult":
        """Build the result object collecting the outcome of the run."""
        return SkipAwareTestResult(
            self.stream,
            self.descriptions,
            self.verbosity,
            self.exitfirst,
            self.pdbmode,
            self.cvg,
            self.colorize,
        )

    def run(self, test):
        """Run the given test case or test suite.

        The errors are always printed; the summary (number of tests, status
        and details) is only written when not in batch mode.

        :return: the result object
        """
        result = self._makeResult()
        startTime = time()
        test(result, runcondition=self._runcondition, options=self.options)
        stopTime = time()
        timeTaken = stopTime - startTime
        result.printErrors()
        if not self.batchmode:
            self.stream.writeln(result.separator2)
            run = result.testsRun
            plural = "s" if run != 1 else ""
            self.stream.writeln("Ran %d test%s in %.3fs" % (run, plural, timeTaken))
            self.stream.writeln()
            if not result.wasSuccessful():
                status, color = "FAILED", "red"
            else:
                status, color = "OK", "green"
            if self.colorize:
                self.stream.write(textutils.colorize_ansi(status, color=color))
            else:
                self.stream.write(status)
            det_results = [
                "%s=%i" % (name, len(value))
                for name, value in (
                    ("failures", result.failures),
                    ("errors", result.errors),
                    ("skipped", result.skipped),
                )
                if value
            ]
            if det_results:
                self.stream.write(" (")
                self.stream.write(", ".join(det_results))
                self.stream.write(")")
            self.stream.writeln("")
        return result
class SkipAwareTestResult(unittest._TextTestResult):
def __init__(
self,
stream: _WritelnDecorator,
descriptions: bool,
verbosity: int,
exitfirst: bool = False,
pdbmode: bool = False,
cvg: Optional[Any] = None,
colorize: bool = False,
) -> None:
super(SkipAwareTestResult, self).__init__(stream, descriptions, verbosity)
self.skipped: List[Tuple[Any, Any]] = []
self.debuggers: List = []
self.fail_descrs: List = []
self.error_descrs: List = []
self.exitfirst = exitfirst
self.pdbmode = pdbmode
self.cvg = cvg
self.colorize = colorize
self.pdbclass = Debugger
self.verbose = verbosity > 1
def descrs_for(self, flavour: str) -> List[Tuple[int, str]]:
return getattr(self, "%s_descrs" % flavour.lower())
def _create_pdb(self, test_descr: str, flavour: str) -> None:
self.descrs_for(flavour).append((len(self.debuggers), test_descr))
if self.pdbmode:
self.debuggers.append(self.pdbclass(sys.exc_info()[2]))
def _iter_valid_frames(self, frames: List[FrameInfo]) -> Generator[FrameInfo, Any, None]:
"""only consider non-testlib frames when formatting traceback"""
def invalid(fi):
return osp.abspath(fi[1]) in (lgc_testlib, std_testlib)
lgc_testlib = osp.abspath(__file__)
std_testlib = osp.abspath(unittest.__file__)
for frameinfo in dropwhile(invalid, frames):
yield frameinfo
def _exc_info_to_string(self, err, test):
"""Converts a sys.exc_info()-style tuple of values into a string.
This method is overridden here because we want to colorize
lines if --color is passed, and display local variables if
--verbose is passed
"""
exctype, exc, tb = err
output = ["Traceback (most recent call last)"]
frames = inspect.getinnerframes(tb)
colorize = self.colorize
frames = enumerate(self._iter_valid_frames(frames))
for index, (frame, filename, lineno, funcname, ctx, ctxindex) in frames:
filename = osp.abspath(filename)
if ctx is None: # pyc files or C extensions for instance
source = ""
else:
source = "".join(ctx)
if colorize:
filename = textutils.colorize_ansi(filename, "magenta")
source = colorize_source(source)
output.append(' File "%s", line %s, in %s' % (filename, lineno, funcname))
output.append(" %s" % source.strip())
if self.verbose:
output.append("%r == %r" % (dir(frame), test.__module__))
output.append("")
output.append(" " + " local variables ".center(66, "-"))
for varname, value in sorted(frame.f_locals.items()):
output.append(" %s: %r" % (varname, value))
if varname == "self": # special handy processing for self
for varname, value in sorted(vars(value).items()):
output.append(" self.%s: %r" % (varname, value))
output.append(" " + "-" * 66)
output.append("")
output.append("".join(traceback.format_exception_only(exctype, exc)))
return "\n".join(output)
def addError(self, test, err):
"""err -> (exc_type, exc, tcbk)"""
exc_type, exc, _ = err
if isinstance(exc, testlib.SkipTest):
assert exc_type == testlib.SkipTest
self.addSkip(test, exc)
else:
if self.exitfirst:
self.shouldStop = True
descr = self.getDescription(test)
super(SkipAwareTestResult, self).addError(test, err)
self._create_pdb(descr, "error")
def addFailure(self, test, err):
    """Record a failure and hand it to ``_create_pdb`` with the "fail" flavour.

    ``shouldStop`` is set first when ``exitfirst`` is on; the failure itself
    is recorded by the parent class.
    """
    stop_at_first = self.exitfirst
    if stop_at_first:
        self.shouldStop = True
    description = self.getDescription(test)
    parent = super(SkipAwareTestResult, self)
    parent.addFailure(test, err)
    self._create_pdb(description, "fail")
def addSkip(self, test, reason):
    """Remember ``(test, reason)`` in ``self.skipped`` and report the skip.

    Writes "SKIPPED" on its own line when ``showAll`` is set, otherwise a
    single "S" when ``dots`` is set, otherwise nothing.
    """
    entry = (test, reason)
    self.skipped.append(entry)
    if self.showAll:
        self.stream.writeln("SKIPPED")
        return
    if self.dots:
        self.stream.write("S")
def printErrors(self) -> None:
    """Run the parent class's printErrors, then list the skipped tests."""
    super(SkipAwareTestResult, self).printErrors()
    self.printSkippedList()
def printSkippedList(self) -> None:
    """Write, for every skipped test, a separator, its description and the reason."""
    # entries are (test, err) pairs, a format compatible with unittest2
    for skipped_test, reason in self.skipped:
        description = self.getDescription(skipped_test)
        emit = self.stream.writeln
        emit(self.separator1)
        emit("SKIPPED: %s" % (description,))
        emit("\t%s" % reason)
def printErrorList(self, flavour, errors):
    """Write one report per entry of ``errors``.

    Descriptions come from ``self.descrs_for(flavour)`` and are paired with
    ``errors`` positionally. Each report is: separator1, "<flavour>: <descr>",
    separator2, the error text, then the centered "no stdout" and
    "no stderr" placeholders.
    """
    pairs = zip(self.descrs_for(flavour), errors)
    for description_entry, error_entry in pairs:
        _, descr = description_entry
        _test, err = error_entry
        emit = self.stream.writeln
        emit(self.separator1)
        emit("%s: %s" % (flavour, descr))
        emit(self.separator2)
        emit(err)
        for placeholder in ("no stdout", "no stderr"):
            emit(placeholder.center(len(self.separator2)))
orig_call = testlib.TestCase.__call__


@monkeypatch(testlib.TestCase, "__call__")
def call(
    self: Any,
    result: SkipAwareTestResult = None,
    runcondition: Optional[Callable] = None,
    options: Optional[Any] = None,
) -> None:
    """Run the test through the original ``__call__``, then log it for restart.

    When ``options`` has a true ``exitfirst`` attribute, the dotted name
    ``module.Class.method`` of this test is appended to ``FILE_RESTART``.
    A failure to write that file is reported on ``sys.__stderr__`` and the
    exception is re-raised.
    """
    orig_call(self, result=result, runcondition=runcondition, options=options)
    # options may be None or lack the attribute: both mean "no restart file"
    if not getattr(options, "exitfirst", False):
        return
    # add this test to restart file
    try:
        with open(FILE_RESTART, "a") as restartfile:
            klass = self.__class__
            descr = ".".join((klass.__module__, klass.__name__, self._testMethodName))
            restartfile.write(descr + os.linesep)
    except Exception:
        print(
            "Error while saving succeeded test into",
            osp.join(os.getcwd(), FILE_RESTART),
            file=sys.__stderr__,
        )
        raise
# presumably installed as testlib.TestCase.defaultTestResult by the
# monkeypatch decorator - confirm against its definition
@monkeypatch(testlib.TestCase)
def defaultTestResult(self):
    """Return a new SkipAwareTestResult instance, created without arguments."""
    return SkipAwareTestResult()
class NonStrictTestLoader(unittest.TestLoader):
    """
    Overrides default testloader to be able to omit classname when
    specifying tests to run on command line.

    For example, if the file test_foo.py contains ::

        class FooTC(TestCase):
            def test_foo1(self): # ...
            def test_foo2(self): # ...
            def test_bar1(self): # ...

        class BarTC(TestCase):
            def test_bar2(self): # ...

    'python test_foo.py' will run the 3 tests in FooTC
    'python test_foo.py FooTC' will run the 3 tests in FooTC
    'python test_foo.py test_foo' will run test_foo1 and test_foo2
    'python test_foo.py test_foo1' will run test_foo1
    'python test_foo.py test_bar' will run FooTC.test_bar1 and BarTC.test_bar2

    Names are matched by substring. ``skipped_patterns`` holds substrings
    that exclude a class or a method from collection.
    """

    def __init__(self) -> None:
        # The base initializer creates the bookkeeping attributes
        # (``errors`` for instance) that the inherited loading methods use.
        super(NonStrictTestLoader, self).__init__()
        self.skipped_patterns = ()

    # some magic here to accept empty list by extending
    # and to provide callable capability
    def loadTestsFromNames(self, names: List[str], module: type = None) -> TestSuite:
        """Return a suite with the tests matching any of ``names``."""
        suites = []
        for name in names:
            suites.extend(self.loadTestsFromName(name, module))
        return self.suiteClass(suites)

    def _collect_tests(self, module: type) -> Dict[str, Tuple[type, List[str]]]:
        """Map each TestCase class name of ``module`` to (class, method names).

        Classes whose name starts with an underscore or matches a skipped
        pattern are left out. Only callable attributes whose name starts
        with ``testMethodPrefix`` are listed.
        """
        tests = {}
        for obj in vars(module).values():
            if not (isclass(obj) and issubclass(obj, unittest.TestCase)):
                continue
            classname = obj.__name__
            if classname.startswith("_") or self._this_is_skipped(classname):
                continue
            # obj is a TestCase class
            methodnames = [
                attrname
                for attrname in dir(obj)
                if attrname.startswith(self.testMethodPrefix)
                and callable(getattr(obj, attrname))
            ]
            # keep track of class (obj) for convenience
            tests[classname] = (obj, methodnames)
        return tests

    def loadTestsFromSuite(self, module, suitename):
        """Call ``module.<suitename>()`` and return the tests of the suite.

        Returns an empty list when an AttributeError is raised while looking
        up or calling the factory.
        """
        try:
            suite = getattr(module, suitename)()
        except AttributeError:
            return []
        assert hasattr(suite, "_tests"), "%s.%s is not a valid TestSuite" % (
            module.__name__,
            suitename,
        )
        # python2.3 does not implement __iter__ on suites, we need to return
        # _tests explicitly
        return suite._tests

    def loadTestsFromName(self, name, module=None):
        """Return the list of tests designated by ``name``.

        ``name`` is either a suite factory, a class name or a method name
        pattern (one part), or ``Class.pattern`` (two parts). Without a
        module, or with more than two parts, the base class does the
        loading and its result is wrapped in a one-element list.
        """
        parts = name.split(".")
        if module is None or len(parts) > 2:
            # let the base class do its job here
            return [super(NonStrictTestLoader, self).loadTestsFromName(name)]
        tests = self._collect_tests(module)
        if len(parts) == 1:
            pattern = parts[0]
            if callable(getattr(module, pattern, None)) and pattern not in tests:
                # consider it as a suite
                return self.loadTestsFromSuite(module, pattern)
            if pattern in tests:
                # case python unittest_foo.py MyTestTC
                klass, methodnames = tests[pattern]
                return [klass(methodname) for methodname in methodnames]
            # case python unittest_foo.py something
            collected = []
            for klass, methodnames in tests.values():
                collected += [
                    klass(methodname)
                    for methodname in methodnames
                    # skip methodname if matched by skipped_patterns
                    if pattern in methodname and not self._this_is_skipped(methodname)
                ]
            return collected
        # case "MyClass.test_1"
        classname, pattern = parts
        klass, methodnames = tests.get(classname, (None, []))
        return [klass(methodname) for methodname in methodnames if pattern in methodname]

    def _this_is_skipped(self, testedname: str) -> bool:
        """Tell whether any skipped pattern is a substring of ``testedname``."""
        return any(pat in testedname for pat in self.skipped_patterns)

    def getTestCaseNames(self, testCaseClass: type) -> List[str]:
        """Return a sorted sequence of method names found within testCaseClass"""
        is_skipped = self._this_is_skipped
        classname = testCaseClass.__name__
        if classname.startswith("_") or is_skipped(classname):
            return []
        testnames = super(NonStrictTestLoader, self).getTestCaseNames(testCaseClass)
        return [testname for testname in testnames if not is_skipped(testname)]
# The 2 functions below are modified versions of the TestSuite.run method
# that is provided with unittest2 for python 2.6, in unittest2/suite.py
# It is used to monkeypatch the original implementation to support
# extra runcondition and options arguments (see in testlib.py)
def _ts_run(
    self: Any,
    result: SkipAwareTestResult,
    debug: bool = False,
    runcondition: Callable = None,
    options: Optional[Any] = None,
) -> SkipAwareTestResult:
    """Run the suite into ``result``, then tear down class and module fixtures.

    ``runcondition`` and ``options`` are forwarded to ``_wrapped_run``;
    ``debug`` is accepted but not used by this variant.

    :return: the ``result`` object that was passed in
    """
    self._wrapped_run(result, runcondition=runcondition, options=options)
    # None as "current test": tear down whatever class ran last
    self._tearDownPreviousClass(None, result)
    self._handleModuleTearDown(result)
    return result
def _ts_wrapped_run(
    self: Any,
    result: SkipAwareTestResult,
    debug: bool = False,
    runcondition: Callable = None,
    options: Optional[Any] = None,
) -> SkipAwareTestResult:
    """Run each test of the suite, forwarding ``runcondition`` and ``options``.

    Iteration stops as soon as ``result.shouldStop`` is set. For items that
    are not suites, class and module fixtures are handled first and the item
    is skipped when its class or module set-up failed.

    :return: the ``result`` object that was passed in
    """
    for test in self:
        if result.shouldStop:
            break
        if unittest_suite._isnotsuite(test):
            self._tearDownPreviousClass(test, result)
            self._handleModuleFixture(test, result)
            self._handleClassSetUp(test, result)
            result._previousTestClass = test.__class__
            if getattr(test.__class__, "_classSetupFailed", False) or getattr(
                result, "_moduleSetUpFailed", False
            ):
                continue
        # --- modifications to deal with _wrapped_run ---
        # original code is:
        #
        # if not debug:
        #     test(result)
        # else:
        #     test.debug()
        #
        # The extended call is tried first; on TypeError the call is repeated
        # without the extra arguments.
        # NOTE(review): a TypeError raised from inside the first call would
        # also trigger the second call - confirm this is acceptable.
        if hasattr(test, "_wrapped_run"):
            try:
                test._wrapped_run(result, debug, runcondition=runcondition, options=options)
            except TypeError:
                test._wrapped_run(result, debug)
        elif not debug:
            try:
                test(result, runcondition, options)
            except TypeError:
                test(result)
        else:
            test.debug()
        # --- end of modifications to deal with _wrapped_run ---
    return result
# On python >= 2.7 this definition rebinds ``_ts_run``, replacing the
# unittest2-derived one defined earlier in the module.
if sys.version_info >= (2, 7):
    # The function below implements a modified version of the
    # TestSuite.run method that is provided with python 2.7, in
    # unittest/suite.py
    def _ts_run( # noqa
        self: Any,
        result: SkipAwareTestResult,
        debug: bool = False,
        runcondition: Callable = None,
        options: Optional[Any] = None,
    ) -> SkipAwareTestResult:
        """Run the suite into ``result``; only the outermost run tears down.

        The first call to see ``result._testRunEntered`` unset (or False)
        marks itself as top level; after ``_wrapped_run`` returns, only that
        call tears down class and module fixtures and resets the flag.

        :return: the ``result`` object that was passed in
        """
        topLevel = False
        if getattr(result, "_testRunEntered", False) is False:
            result._testRunEntered = topLevel = True
        self._wrapped_run(result, debug, runcondition, options)
        if topLevel:
            # None as "current test": tear down whatever class ran last
            self._tearDownPreviousClass(None, result)
            self._handleModuleTearDown(result)
            result._testRunEntered = False
        return result
def enable_dbc(*args):
    """
    Without arguments, return True if contracts can be enabled and should be
    enabled (see option -d), return False otherwise.

    With arguments, return False if contracts can't or shouldn't be enabled,
    otherwise weave ContractAspect with items passed as arguments.
    """
    if not ENABLE_DBC:
        return False
    try:
        from logilab.aspects.weaver import weaver
        from logilab.aspects.lib.contracts import ContractAspect
    except ImportError:
        sys.stderr.write("Warning: logilab.aspects is not available. Contracts disabled.")
        return False
    else:
        # nothing is woven when called without arguments
        for target in args:
            weaver.weave_module(target, ContractAspect)
        return True
# monkeypatch unittest and doctest (ouch !)
# These assignments run at import time and rebind attributes of the unittest
# package to the classes defined in this module.
unittest._TextTestResult = SkipAwareTestResult
unittest.TextTestRunner = SkipAwareTextTestRunner
unittest.TestLoader = NonStrictTestLoader
unittest.TestProgram = SkipAwareTestProgram
# rebase doctest (or, on python < 2.4, function) test cases on testlib.TestCase
if sys.version_info >= (2, 4):
    doctest.DocTestCase.__bases__ = (testlib.TestCase,)
    # XXX check python2.6 compatibility
    # doctest.DocTestCase._cleanups = []
    # doctest.DocTestCase._out = []
else:
    unittest.FunctionTestCase.__bases__ = (testlib.TestCase,)
# install the suite runners that accept runcondition and options
unittest.TestSuite.run = _ts_run
unittest.TestSuite._wrapped_run = _ts_wrapped_run
# script entry point: call run() when the module is executed directly
if __name__ == "__main__":
    run()