summaryrefslogtreecommitdiff
path: root/Lib/test
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test')
-rw-r--r--Lib/test/datetimetester.py3
-rw-r--r--Lib/test/eintrdata/eintr_tester.py7
-rw-r--r--Lib/test/libregrtest/__init__.py2
-rw-r--r--Lib/test/libregrtest/cmdline.py344
-rw-r--r--Lib/test/libregrtest/main.py455
-rw-r--r--Lib/test/libregrtest/refleak.py202
-rw-r--r--Lib/test/libregrtest/runtest.py242
-rw-r--r--Lib/test/libregrtest/runtest_mp.py224
-rw-r--r--Lib/test/libregrtest/save_env.py285
-rw-r--r--Lib/test/libregrtest/setup.py116
-rw-r--r--Lib/test/lock_tests.py12
-rw-r--r--Lib/test/pickletester.py6
-rw-r--r--[-rwxr-xr-x]Lib/test/regrtest.py1579
-rw-r--r--Lib/test/test_argparse.py15
-rw-r--r--Lib/test/test_binascii.py26
-rw-r--r--Lib/test/test_bytes.py121
-rw-r--r--Lib/test/test_calendar.py8
-rw-r--r--Lib/test/test_codecs.py143
-rw-r--r--Lib/test/test_collections.py8
-rw-r--r--Lib/test/test_deque.py9
-rw-r--r--Lib/test/test_descr.py9
-rw-r--r--Lib/test/test_dictviews.py22
-rw-r--r--Lib/test/test_eintr.py10
-rw-r--r--Lib/test/test_enum.py7
-rw-r--r--Lib/test/test_file.py2
-rw-r--r--Lib/test/test_format.py2
-rw-r--r--Lib/test/test_fstring.py734
-rw-r--r--Lib/test/test_grammar.py38
-rw-r--r--Lib/test/test_imp.py2
-rw-r--r--Lib/test/test_inspect.py42
-rw-r--r--Lib/test/test_itertools.py50
-rw-r--r--Lib/test/test_linecache.py75
-rw-r--r--Lib/test/test_operator.py50
-rw-r--r--Lib/test/test_os.py19
-rw-r--r--Lib/test/test_pyclbr.py2
-rw-r--r--Lib/test/test_pydoc.py16
-rw-r--r--Lib/test/test_regrtest.py570
-rw-r--r--Lib/test/test_richcmp.py25
-rw-r--r--Lib/test/test_rlcompleter.py28
-rw-r--r--Lib/test/test_robotparser.py90
-rw-r--r--Lib/test/test_set.py24
-rw-r--r--Lib/test/test_strptime.py57
-rw-r--r--Lib/test/test_symbol.py55
-rw-r--r--Lib/test/test_time.py598
-rw-r--r--Lib/test/test_tokenize.py55
-rw-r--r--Lib/test/test_tools/test_unparse.py9
-rw-r--r--Lib/test/test_urlparse.py36
-rw-r--r--Lib/test/test_userdict.py4
-rw-r--r--Lib/test/test_warnings/data/import_warning.py2
-rw-r--r--Lib/test/test_zipimport.py21
50 files changed, 4314 insertions, 2147 deletions
diff --git a/Lib/test/datetimetester.py b/Lib/test/datetimetester.py
index 63c3ae84bd..11deffccd7 100644
--- a/Lib/test/datetimetester.py
+++ b/Lib/test/datetimetester.py
@@ -281,7 +281,8 @@ class TestTimeZone(unittest.TestCase):
with self.assertRaises(TypeError): self.EST.dst(5)
def test_tzname(self):
- self.assertEqual('UTC+00:00', timezone(ZERO).tzname(None))
+ self.assertEqual('UTC', timezone.utc.tzname(None))
+ self.assertEqual('UTC', timezone(ZERO).tzname(None))
self.assertEqual('UTC-05:00', timezone(-5 * HOUR).tzname(None))
self.assertEqual('UTC+09:30', timezone(9.5 * HOUR).tzname(None))
self.assertEqual('UTC-00:01', timezone(timedelta(minutes=-1)).tzname(None))
diff --git a/Lib/test/eintrdata/eintr_tester.py b/Lib/test/eintrdata/eintr_tester.py
index e3c36f9ce3..ee6e75bb99 100644
--- a/Lib/test/eintrdata/eintr_tester.py
+++ b/Lib/test/eintrdata/eintr_tester.py
@@ -9,6 +9,7 @@ sub-second periodicity (contrarily to signal()).
"""
import contextlib
+import faulthandler
import io
import os
import select
@@ -50,6 +51,10 @@ class EINTRBaseTest(unittest.TestCase):
signal.setitimer(signal.ITIMER_REAL, cls.signal_delay,
cls.signal_period)
+ # Issue #25277: Use faulthandler to try to debug a hang on FreeBSD
+ if hasattr(faulthandler, 'dump_traceback_later'):
+ faulthandler.dump_traceback_later(10 * 60, exit=True)
+
@classmethod
def stop_alarm(cls):
signal.setitimer(signal.ITIMER_REAL, 0, 0)
@@ -58,6 +63,8 @@ class EINTRBaseTest(unittest.TestCase):
def tearDownClass(cls):
cls.stop_alarm()
signal.signal(signal.SIGALRM, cls.orig_handler)
+ if hasattr(faulthandler, 'cancel_dump_traceback_later'):
+ faulthandler.cancel_dump_traceback_later()
def subprocess(self, *args, **kw):
cmd_args = (sys.executable, '-c') + args
diff --git a/Lib/test/libregrtest/__init__.py b/Lib/test/libregrtest/__init__.py
new file mode 100644
index 0000000000..9f7b1c1fe2
--- /dev/null
+++ b/Lib/test/libregrtest/__init__.py
@@ -0,0 +1,2 @@
+from test.libregrtest.cmdline import _parse_args, RESOURCE_NAMES
+from test.libregrtest.main import main, main_in_temp_cwd
diff --git a/Lib/test/libregrtest/cmdline.py b/Lib/test/libregrtest/cmdline.py
new file mode 100644
index 0000000000..c7e990db1f
--- /dev/null
+++ b/Lib/test/libregrtest/cmdline.py
@@ -0,0 +1,344 @@
+import argparse
+import os
+import sys
+from test import support
+
+
+USAGE = """\
+python -m test [options] [test_name1 [test_name2 ...]]
+python path/to/Lib/test/regrtest.py [options] [test_name1 [test_name2 ...]]
+"""
+
+DESCRIPTION = """\
+Run Python regression tests.
+
+If no arguments or options are provided, finds all files matching
+the pattern "test_*" in the Lib/test subdirectory and runs
+them in alphabetical order (but see -M and -u, below, for exceptions).
+
+For more rigorous testing, it is useful to use the following
+command line:
+
+python -E -Wd -m test [options] [test_name1 ...]
+"""
+
+EPILOG = """\
+Additional option details:
+
+-r randomizes test execution order. You can use --randseed=int to provide an
+int seed value for the randomizer; this is useful for reproducing troublesome
+test orders.
+
+-s On the first invocation of regrtest using -s, the first test file found
+or the first test file given on the command line is run, and the name of
+the next test is recorded in a file named pynexttest. If run from the
+Python build directory, pynexttest is located in the 'build' subdirectory,
+otherwise it is located in tempfile.gettempdir(). On subsequent runs,
+the test in pynexttest is run, and the next test is written to pynexttest.
+When the last test has been run, pynexttest is deleted. In this way it
+is possible to single step through the test files. This is useful when
+doing memory analysis on the Python interpreter, which process tends to
+consume too many resources to run the full regression test non-stop.
+
+-S is used to continue running tests after an aborted run. It will
+maintain the order a standard run (ie, this assumes -r is not used).
+This is useful after the tests have prematurely stopped for some external
+reason and you want to start running from where you left off rather
+than starting from the beginning.
+
+-f reads the names of tests from the file given as f's argument, one
+or more test names per line. Whitespace is ignored. Blank lines and
+lines beginning with '#' are ignored. This is especially useful for
+whittling down failures involving interactions among tests.
+
+-L causes the leaks(1) command to be run just before exit if it exists.
+leaks(1) is available on Mac OS X and presumably on some other
+FreeBSD-derived systems.
+
+-R runs each test several times and examines sys.gettotalrefcount() to
+see if the test appears to be leaking references. The argument should
+be of the form stab:run:fname where 'stab' is the number of times the
+test is run to let gettotalrefcount settle down, 'run' is the number
+of times further it is run and 'fname' is the name of the file the
+reports are written to. These parameters all have defaults (5, 4 and
+"reflog.txt" respectively), and the minimal invocation is '-R :'.
+
+-M runs tests that require an exorbitant amount of memory. These tests
+typically try to ascertain containers keep working when containing more than
+2 billion objects, which only works on 64-bit systems. There are also some
+tests that try to exhaust the address space of the process, which only makes
+sense on 32-bit systems with at least 2Gb of memory. The passed-in memlimit,
+which is a string in the form of '2.5Gb', determines howmuch memory the
+tests will limit themselves to (but they may go slightly over.) The number
+shouldn't be more memory than the machine has (including swap memory). You
+should also keep in mind that swap memory is generally much, much slower
+than RAM, and setting memlimit to all available RAM or higher will heavily
+tax the machine. On the other hand, it is no use running these tests with a
+limit of less than 2.5Gb, and many require more than 20Gb. Tests that expect
+to use more than memlimit memory will be skipped. The big-memory tests
+generally run very, very long.
+
+-u is used to specify which special resource intensive tests to run,
+such as those requiring large file support or network connectivity.
+The argument is a comma-separated list of words indicating the
+resources to test. Currently only the following are defined:
+
+ all - Enable all special resources.
+
+ none - Disable all special resources (this is the default).
+
+ audio - Tests that use the audio device. (There are known
+ cases of broken audio drivers that can crash Python or
+ even the Linux kernel.)
+
+ curses - Tests that use curses and will modify the terminal's
+ state and output modes.
+
+ largefile - It is okay to run some test that may create huge
+ files. These tests can take a long time and may
+ consume >2GB of disk space temporarily.
+
+ network - It is okay to run tests that use external network
+ resource, e.g. testing SSL support for sockets.
+
+ decimal - Test the decimal module against a large suite that
+ verifies compliance with standards.
+
+ cpu - Used for certain CPU-heavy tests.
+
+ subprocess Run all tests for the subprocess module.
+
+ urlfetch - It is okay to download files required on testing.
+
+ gui - Run tests that require a running GUI.
+
+To enable all resources except one, use '-uall,-<resource>'. For
+example, to run all the tests except for the gui tests, give the
+option '-uall,-gui'.
+"""
+
+
+RESOURCE_NAMES = ('audio', 'curses', 'largefile', 'network',
+ 'decimal', 'cpu', 'subprocess', 'urlfetch', 'gui')
+
+class _ArgParser(argparse.ArgumentParser):
+
+ def error(self, message):
+ super().error(message + "\nPass -h or --help for complete help.")
+
+
+def _create_parser():
+ # Set prog to prevent the uninformative "__main__.py" from displaying in
+ # error messages when using "python -m test ...".
+ parser = _ArgParser(prog='regrtest.py',
+ usage=USAGE,
+ description=DESCRIPTION,
+ epilog=EPILOG,
+ add_help=False,
+ formatter_class=argparse.RawDescriptionHelpFormatter)
+
+ # Arguments with this clause added to its help are described further in
+ # the epilog's "Additional option details" section.
+ more_details = ' See the section at bottom for more details.'
+
+ group = parser.add_argument_group('General options')
+ # We add help explicitly to control what argument group it renders under.
+ group.add_argument('-h', '--help', action='help',
+ help='show this help message and exit')
+ group.add_argument('--timeout', metavar='TIMEOUT', type=float,
+ help='dump the traceback and exit if a test takes '
+ 'more than TIMEOUT seconds; disabled if TIMEOUT '
+ 'is negative or equals to zero')
+ group.add_argument('--wait', action='store_true',
+ help='wait for user input, e.g., allow a debugger '
+ 'to be attached')
+ group.add_argument('--slaveargs', metavar='ARGS')
+ group.add_argument('-S', '--start', metavar='START',
+ help='the name of the test at which to start.' +
+ more_details)
+
+ group = parser.add_argument_group('Verbosity')
+ group.add_argument('-v', '--verbose', action='count',
+ help='run tests in verbose mode with output to stdout')
+ group.add_argument('-w', '--verbose2', action='store_true',
+ help='re-run failed tests in verbose mode')
+ group.add_argument('-W', '--verbose3', action='store_true',
+ help='display test output on failure')
+ group.add_argument('-q', '--quiet', action='store_true',
+ help='no output unless one or more tests fail')
+ group.add_argument('-o', '--slow', action='store_true', dest='print_slow',
+ help='print the slowest 10 tests')
+ group.add_argument('--header', action='store_true',
+ help='print header with interpreter info')
+
+ group = parser.add_argument_group('Selecting tests')
+ group.add_argument('-r', '--randomize', action='store_true',
+ help='randomize test execution order.' + more_details)
+ group.add_argument('--randseed', metavar='SEED',
+ dest='random_seed', type=int,
+ help='pass a random seed to reproduce a previous '
+ 'random run')
+ group.add_argument('-f', '--fromfile', metavar='FILE',
+ help='read names of tests to run from a file.' +
+ more_details)
+ group.add_argument('-x', '--exclude', action='store_true',
+ help='arguments are tests to *exclude*')
+ group.add_argument('-s', '--single', action='store_true',
+ help='single step through a set of tests.' +
+ more_details)
+ group.add_argument('-m', '--match', metavar='PAT',
+ dest='match_tests',
+ help='match test cases and methods with glob pattern PAT')
+ group.add_argument('-G', '--failfast', action='store_true',
+ help='fail as soon as a test fails (only with -v or -W)')
+ group.add_argument('-u', '--use', metavar='RES1,RES2,...',
+ action='append', type=resources_list,
+ help='specify which special resource intensive tests '
+ 'to run.' + more_details)
+ group.add_argument('-M', '--memlimit', metavar='LIMIT',
+ help='run very large memory-consuming tests.' +
+ more_details)
+ group.add_argument('--testdir', metavar='DIR',
+ type=relative_filename,
+ help='execute test files in the specified directory '
+ '(instead of the Python stdlib test suite)')
+
+ group = parser.add_argument_group('Special runs')
+ group.add_argument('-l', '--findleaks', action='store_true',
+ help='if GC is available detect tests that leak memory')
+ group.add_argument('-L', '--runleaks', action='store_true',
+ help='run the leaks(1) command just before exit.' +
+ more_details)
+ group.add_argument('-R', '--huntrleaks', metavar='RUNCOUNTS',
+ type=huntrleaks,
+ help='search for reference leaks (needs debug build, '
+ 'very slow).' + more_details)
+ group.add_argument('-j', '--multiprocess', metavar='PROCESSES',
+ dest='use_mp', type=int,
+ help='run PROCESSES processes at once')
+ group.add_argument('-T', '--coverage', action='store_true',
+ dest='trace',
+ help='turn on code coverage tracing using the trace '
+ 'module')
+ group.add_argument('-D', '--coverdir', metavar='DIR',
+ type=relative_filename,
+ help='directory where coverage files are put')
+ group.add_argument('-N', '--nocoverdir',
+ action='store_const', const=None, dest='coverdir',
+ help='put coverage files alongside modules')
+ group.add_argument('-t', '--threshold', metavar='THRESHOLD',
+ type=int,
+ help='call gc.set_threshold(THRESHOLD)')
+ group.add_argument('-n', '--nowindows', action='store_true',
+ help='suppress error message boxes on Windows')
+ group.add_argument('-F', '--forever', action='store_true',
+ help='run the specified tests in a loop, until an '
+ 'error happens')
+ group.add_argument('--list-tests', action='store_true',
+ help="only write the name of tests that will be run, "
+ "don't execute them")
+ group.add_argument('-P', '--pgo', dest='pgo', action='store_true',
+ help='enable Profile Guided Optimization training')
+
+ parser.add_argument('args', nargs=argparse.REMAINDER,
+ help=argparse.SUPPRESS)
+
+ return parser
+
+
+def relative_filename(string):
+ # CWD is replaced with a temporary dir before calling main(), so we
+ # join it with the saved CWD so it ends up where the user expects.
+ return os.path.join(support.SAVEDCWD, string)
+
+
+def huntrleaks(string):
+ args = string.split(':')
+ if len(args) not in (2, 3):
+ raise argparse.ArgumentTypeError(
+ 'needs 2 or 3 colon-separated arguments')
+ nwarmup = int(args[0]) if args[0] else 5
+ ntracked = int(args[1]) if args[1] else 4
+ fname = args[2] if len(args) > 2 and args[2] else 'reflog.txt'
+ return nwarmup, ntracked, fname
+
+
+def resources_list(string):
+ u = [x.lower() for x in string.split(',')]
+ for r in u:
+ if r == 'all' or r == 'none':
+ continue
+ if r[0] == '-':
+ r = r[1:]
+ if r not in RESOURCE_NAMES:
+ raise argparse.ArgumentTypeError('invalid resource: ' + r)
+ return u
+
+
+def _parse_args(args, **kwargs):
+ # Defaults
+ ns = argparse.Namespace(testdir=None, verbose=0, quiet=False,
+ exclude=False, single=False, randomize=False, fromfile=None,
+ findleaks=False, use_resources=None, trace=False, coverdir='coverage',
+ runleaks=False, huntrleaks=False, verbose2=False, print_slow=False,
+ random_seed=None, use_mp=None, verbose3=False, forever=False,
+ header=False, failfast=False, match_tests=None, pgo=False)
+ for k, v in kwargs.items():
+ if not hasattr(ns, k):
+ raise TypeError('%r is an invalid keyword argument '
+ 'for this function' % k)
+ setattr(ns, k, v)
+ if ns.use_resources is None:
+ ns.use_resources = []
+
+ parser = _create_parser()
+ parser.parse_args(args=args, namespace=ns)
+
+ if ns.single and ns.fromfile:
+ parser.error("-s and -f don't go together!")
+ if ns.use_mp and ns.trace:
+ parser.error("-T and -j don't go together!")
+ if ns.use_mp and ns.findleaks:
+ parser.error("-l and -j don't go together!")
+ if ns.failfast and not (ns.verbose or ns.verbose3):
+ parser.error("-G/--failfast needs either -v or -W")
+ if ns.pgo and (ns.verbose or ns.verbose2 or ns.verbose3):
+ parser.error("--pgo/-v don't go together!")
+
+ if ns.nowindows:
+ print("Warning: the --nowindows (-n) option is deprecated. "
+ "Use -vv to display assertions in stderr.", file=sys.stderr)
+
+ if ns.quiet:
+ ns.verbose = 0
+ if ns.timeout is not None:
+ if ns.timeout <= 0:
+ ns.timeout = None
+ if ns.use_mp is not None:
+ if ns.use_mp <= 0:
+ # Use all cores + extras for tests that like to sleep
+ ns.use_mp = 2 + (os.cpu_count() or 1)
+ if ns.use_mp == 1:
+ ns.use_mp = None
+ if ns.use:
+ for a in ns.use:
+ for r in a:
+ if r == 'all':
+ ns.use_resources[:] = RESOURCE_NAMES
+ continue
+ if r == 'none':
+ del ns.use_resources[:]
+ continue
+ remove = False
+ if r[0] == '-':
+ remove = True
+ r = r[1:]
+ if remove:
+ if r in ns.use_resources:
+ ns.use_resources.remove(r)
+ elif r not in ns.use_resources:
+ ns.use_resources.append(r)
+ if ns.random_seed is not None:
+ ns.randomize = True
+
+ return ns
diff --git a/Lib/test/libregrtest/main.py b/Lib/test/libregrtest/main.py
new file mode 100644
index 0000000000..82788ad941
--- /dev/null
+++ b/Lib/test/libregrtest/main.py
@@ -0,0 +1,455 @@
+import faulthandler
+import os
+import platform
+import random
+import re
+import sys
+import sysconfig
+import tempfile
+import textwrap
+from test.libregrtest.cmdline import _parse_args
+from test.libregrtest.runtest import (
+ findtests, runtest,
+ STDTESTS, NOTTESTS, PASSED, FAILED, ENV_CHANGED, SKIPPED, RESOURCE_DENIED,
+ INTERRUPTED, CHILD_ERROR)
+from test.libregrtest.setup import setup_tests
+from test import support
+try:
+ import gc
+except ImportError:
+ gc = None
+
+
+# When tests are run from the Python build directory, it is best practice
+# to keep the test files in a subfolder. This eases the cleanup of leftover
+# files using the "make distclean" command.
+if sysconfig.is_python_build():
+ TEMPDIR = os.path.join(sysconfig.get_config_var('srcdir'), 'build')
+else:
+ TEMPDIR = tempfile.gettempdir()
+TEMPDIR = os.path.abspath(TEMPDIR)
+
+
+class Regrtest:
+ """Execute a test suite.
+
+ This also parses command-line options and modifies its behavior
+ accordingly.
+
+ tests -- a list of strings containing test names (optional)
+ testdir -- the directory in which to look for tests (optional)
+
+ Users other than the Python test suite will certainly want to
+ specify testdir; if it's omitted, the directory containing the
+ Python test suite is searched for.
+
+ If the tests argument is omitted, the tests listed on the
+ command-line will be used. If that's empty, too, then all *.py
+ files beginning with test_ will be used.
+
+ The other default arguments (verbose, quiet, exclude,
+ single, randomize, findleaks, use_resources, trace, coverdir,
+ print_slow, and random_seed) allow programmers calling main()
+ directly to set the values that would normally be set by flags
+ on the command line.
+ """
+ def __init__(self):
+ # Namespace of command line options
+ self.ns = None
+
+ # tests
+ self.tests = []
+ self.selected = []
+
+ # test results
+ self.good = []
+ self.bad = []
+ self.skipped = []
+ self.resource_denieds = []
+ self.environment_changed = []
+ self.interrupted = False
+
+ # used by --slow
+ self.test_times = []
+
+ # used by --coverage, trace.Trace instance
+ self.tracer = None
+
+ # used by --findleaks, store for gc.garbage
+ self.found_garbage = []
+
+ # used to display the progress bar "[ 3/100]"
+ self.test_count = ''
+ self.test_count_width = 1
+
+ # used by --single
+ self.next_single_test = None
+ self.next_single_filename = None
+
+ def accumulate_result(self, test, result):
+ ok, test_time = result
+ if ok not in (CHILD_ERROR, INTERRUPTED):
+ self.test_times.append((test_time, test))
+ if ok == PASSED:
+ self.good.append(test)
+ elif ok == FAILED:
+ self.bad.append(test)
+ elif ok == ENV_CHANGED:
+ self.environment_changed.append(test)
+ elif ok == SKIPPED:
+ self.skipped.append(test)
+ elif ok == RESOURCE_DENIED:
+ self.skipped.append(test)
+ self.resource_denieds.append(test)
+
+ def display_progress(self, test_index, test):
+ if self.ns.quiet:
+ return
+ if self.bad and not self.ns.pgo:
+ fmt = "[{1:{0}}{2}/{3}] {4}"
+ else:
+ fmt = "[{1:{0}}{2}] {4}"
+ print(fmt.format(self.test_count_width, test_index,
+ self.test_count, len(self.bad), test),
+ flush=True)
+
+ def parse_args(self, kwargs):
+ ns = _parse_args(sys.argv[1:], **kwargs)
+
+ if ns.timeout and not hasattr(faulthandler, 'dump_traceback_later'):
+ print("Warning: The timeout option requires "
+ "faulthandler.dump_traceback_later", file=sys.stderr)
+ ns.timeout = None
+
+ if ns.threshold is not None and gc is None:
+ print('No GC available, ignore --threshold.', file=sys.stderr)
+ ns.threshold = None
+
+ if ns.findleaks:
+ if gc is not None:
+ # Uncomment the line below to report garbage that is not
+ # freeable by reference counting alone. By default only
+ # garbage that is not collectable by the GC is reported.
+ pass
+ #gc.set_debug(gc.DEBUG_SAVEALL)
+ else:
+ print('No GC available, disabling --findleaks',
+ file=sys.stderr)
+ ns.findleaks = False
+
+ # Strip .py extensions.
+ removepy(ns.args)
+
+ return ns
+
+ def find_tests(self, tests):
+ self.tests = tests
+
+ if self.ns.single:
+ self.next_single_filename = os.path.join(TEMPDIR, 'pynexttest')
+ try:
+ with open(self.next_single_filename, 'r') as fp:
+ next_test = fp.read().strip()
+ self.tests = [next_test]
+ except OSError:
+ pass
+
+ if self.ns.fromfile:
+ self.tests = []
+ with open(os.path.join(support.SAVEDCWD, self.ns.fromfile)) as fp:
+ count_pat = re.compile(r'\[\s*\d+/\s*\d+\]')
+ for line in fp:
+ line = count_pat.sub('', line)
+ guts = line.split() # assuming no test has whitespace in its name
+ if guts and not guts[0].startswith('#'):
+ self.tests.extend(guts)
+
+ removepy(self.tests)
+
+ stdtests = STDTESTS[:]
+ nottests = NOTTESTS.copy()
+ if self.ns.exclude:
+ for arg in self.ns.args:
+ if arg in stdtests:
+ stdtests.remove(arg)
+ nottests.add(arg)
+ self.ns.args = []
+
+ # if testdir is set, then we are not running the python tests suite, so
+ # don't add default tests to be executed or skipped (pass empty values)
+ if self.ns.testdir:
+ alltests = findtests(self.ns.testdir, list(), set())
+ else:
+ alltests = findtests(self.ns.testdir, stdtests, nottests)
+
+ self.selected = self.tests or self.ns.args or alltests
+ if self.ns.single:
+ self.selected = self.selected[:1]
+ try:
+ pos = alltests.index(self.selected[0])
+ self.next_single_test = alltests[pos + 1]
+ except IndexError:
+ pass
+
+ # Remove all the selected tests that precede start if it's set.
+ if self.ns.start:
+ try:
+ del self.selected[:self.selected.index(self.ns.start)]
+ except ValueError:
+ print("Couldn't find starting test (%s), using all tests"
+ % self.ns.start, file=sys.stderr)
+
+ if self.ns.randomize:
+ if self.ns.random_seed is None:
+ self.ns.random_seed = random.randrange(10000000)
+ random.seed(self.ns.random_seed)
+ random.shuffle(self.selected)
+
+ def list_tests(self):
+ for name in self.selected:
+ print(name)
+
+ def rerun_failed_tests(self):
+ self.ns.verbose = True
+ self.ns.failfast = False
+ self.ns.verbose3 = False
+ self.ns.match_tests = None
+
+ print("Re-running failed tests in verbose mode")
+ for test in self.bad[:]:
+ print("Re-running test %r in verbose mode" % test, flush=True)
+ try:
+ self.ns.verbose = True
+ ok = runtest(self.ns, test)
+ except KeyboardInterrupt:
+ # print a newline separate from the ^C
+ print()
+ break
+ else:
+ if ok[0] in {PASSED, ENV_CHANGED, SKIPPED, RESOURCE_DENIED}:
+ self.bad.remove(test)
+ else:
+ if self.bad:
+ print(count(len(self.bad), 'test'), "failed again:")
+ printlist(self.bad)
+
+ def display_result(self):
+ if self.interrupted:
+ # print a newline after ^C
+ print()
+ print("Test suite interrupted by signal SIGINT.")
+ executed = set(self.good) | set(self.bad) | set(self.skipped)
+ omitted = set(self.selected) - executed
+ print(count(len(omitted), "test"), "omitted:")
+ printlist(omitted)
+
+ # If running the test suite for PGO then no one cares about
+ # results.
+ if self.ns.pgo:
+ return
+
+ if self.good and not self.ns.quiet:
+ if (not self.bad
+ and not self.skipped
+ and not self.interrupted
+ and len(self.good) > 1):
+ print("All", end=' ')
+ print(count(len(self.good), "test"), "OK.")
+
+ if self.ns.print_slow:
+ self.test_times.sort(reverse=True)
+ print("10 slowest tests:")
+ for time, test in self.test_times[:10]:
+ print("%s: %.1fs" % (test, time))
+
+ if self.bad:
+ print(count(len(self.bad), "test"), "failed:")
+ printlist(self.bad)
+
+ if self.environment_changed:
+ print("{} altered the execution environment:".format(
+ count(len(self.environment_changed), "test")))
+ printlist(self.environment_changed)
+
+ if self.skipped and not self.ns.quiet:
+ print(count(len(self.skipped), "test"), "skipped:")
+ printlist(self.skipped)
+
+ def run_tests_sequential(self):
+ if self.ns.trace:
+ import trace
+ self.tracer = trace.Trace(trace=False, count=True)
+
+ save_modules = sys.modules.keys()
+
+ for test_index, test in enumerate(self.tests, 1):
+ self.display_progress(test_index, test)
+ if self.tracer:
+ # If we're tracing code coverage, then we don't exit with status
+ # if on a false return value from main.
+ cmd = ('result = runtest(self.ns, test); '
+ 'self.accumulate_result(test, result)')
+ self.tracer.runctx(cmd, globals=globals(), locals=vars())
+ else:
+ try:
+ result = runtest(self.ns, test)
+ except KeyboardInterrupt:
+ self.accumulate_result(test, (INTERRUPTED, None))
+ self.interrupted = True
+ break
+ else:
+ self.accumulate_result(test, result)
+
+ if self.ns.findleaks:
+ gc.collect()
+ if gc.garbage:
+ print("Warning: test created", len(gc.garbage), end=' ')
+ print("uncollectable object(s).")
+ # move the uncollectable objects somewhere so we don't see
+ # them again
+ self.found_garbage.extend(gc.garbage)
+ del gc.garbage[:]
+
+ # Unload the newly imported modules (best effort finalization)
+ for module in sys.modules.keys():
+ if module not in save_modules and module.startswith("test."):
+ support.unload(module)
+
+ def _test_forever(self, tests):
+ while True:
+ for test in tests:
+ yield test
+ if self.bad:
+ return
+
+ def run_tests(self):
+ # For a partial run, we do not need to clutter the output.
+ if (self.ns.verbose
+ or self.ns.header
+ or not (self.ns.pgo or self.ns.quiet or self.ns.single
+ or self.tests or self.ns.args)):
+ # Print basic platform information
+ print("==", platform.python_implementation(), *sys.version.split())
+ print("== ", platform.platform(aliased=True),
+ "%s-endian" % sys.byteorder)
+ print("== ", "hash algorithm:", sys.hash_info.algorithm,
+ "64bit" if sys.maxsize > 2**32 else "32bit")
+ print("== ", os.getcwd())
+ print("Testing with flags:", sys.flags)
+
+ if self.ns.randomize:
+ print("Using random seed", self.ns.random_seed)
+
+ if self.ns.forever:
+ self.tests = self._test_forever(list(self.selected))
+ self.test_count = ''
+ self.test_count_width = 3
+ else:
+ self.tests = iter(self.selected)
+ self.test_count = '/{}'.format(len(self.selected))
+ self.test_count_width = len(self.test_count) - 1
+
+ if self.ns.use_mp:
+ from test.libregrtest.runtest_mp import run_tests_multiprocess
+ run_tests_multiprocess(self)
+ else:
+ self.run_tests_sequential()
+
+ def finalize(self):
+ if self.next_single_filename:
+ if self.next_single_test:
+ with open(self.next_single_filename, 'w') as fp:
+ fp.write(self.next_single_test + '\n')
+ else:
+ os.unlink(self.next_single_filename)
+
+ if self.tracer:
+ r = self.tracer.results()
+ r.write_results(show_missing=True, summary=True,
+ coverdir=self.ns.coverdir)
+
+ if self.ns.runleaks:
+ os.system("leaks %d" % os.getpid())
+
+ def main(self, tests=None, **kwargs):
+ self.ns = self.parse_args(kwargs)
+
+ if self.ns.slaveargs is not None:
+ from test.libregrtest.runtest_mp import run_tests_slave
+ run_tests_slave(self.ns.slaveargs)
+
+ if self.ns.wait:
+ input("Press any key to continue...")
+
+ setup_tests(self.ns)
+
+ self.find_tests(tests)
+
+ if self.ns.list_tests:
+ self.list_tests()
+ sys.exit(0)
+
+ self.run_tests()
+ self.display_result()
+
+ if self.ns.verbose2 and self.bad:
+ self.rerun_failed_tests()
+
+ self.finalize()
+ sys.exit(len(self.bad) > 0 or self.interrupted)
+
+
+def removepy(names):
+ if not names:
+ return
+ for idx, name in enumerate(names):
+ basename, ext = os.path.splitext(name)
+ if ext == '.py':
+ names[idx] = basename
+
+
+def count(n, word):
+ if n == 1:
+ return "%d %s" % (n, word)
+ else:
+ return "%d %ss" % (n, word)
+
+
+def printlist(x, width=70, indent=4):
+ """Print the elements of iterable x to stdout.
+
+ Optional arg width (default 70) is the maximum line length.
+ Optional arg indent (default 4) is the number of blanks with which to
+ begin each line.
+ """
+
+ blanks = ' ' * indent
+ # Print the sorted list: 'x' may be a '--random' list or a set()
+ print(textwrap.fill(' '.join(str(elt) for elt in sorted(x)), width,
+ initial_indent=blanks, subsequent_indent=blanks))
+
+
+def main(tests=None, **kwargs):
+ Regrtest().main(tests=tests, **kwargs)
+
+
+def main_in_temp_cwd():
+ """Run main() in a temporary working directory."""
+ if sysconfig.is_python_build():
+ try:
+ os.mkdir(TEMPDIR)
+ except FileExistsError:
+ pass
+
+ # Define a writable temp dir that will be used as cwd while running
+ # the tests. The name of the dir includes the pid to allow parallel
+ # testing (see the -j option).
+ test_cwd = 'test_python_{}'.format(os.getpid())
+ test_cwd = os.path.join(TEMPDIR, test_cwd)
+
+ # Run the tests in a context manager that temporarily changes the CWD to a
+ # temporary and writable directory. If it's not possible to create or
+ # change the CWD, the original CWD will be used. The original CWD is
+ # available from support.SAVEDCWD.
+ with support.temp_cwd(test_cwd, quiet=True):
+ main()
diff --git a/Lib/test/libregrtest/refleak.py b/Lib/test/libregrtest/refleak.py
new file mode 100644
index 0000000000..59dc49fe77
--- /dev/null
+++ b/Lib/test/libregrtest/refleak.py
@@ -0,0 +1,202 @@
+import errno
+import os
+import re
+import sys
+import warnings
+from inspect import isabstract
+from test import support
+
+
+try:
+ MAXFD = os.sysconf("SC_OPEN_MAX")
+except Exception:
+ MAXFD = 256
+
+
+def fd_count():
+ """Count the number of open file descriptors"""
+ if sys.platform.startswith(('linux', 'freebsd')):
+ try:
+ names = os.listdir("/proc/self/fd")
+ return len(names)
+ except FileNotFoundError:
+ pass
+
+ count = 0
+ for fd in range(MAXFD):
+ try:
+ # Prefer dup() over fstat(). fstat() can require input/output
+ # whereas dup() doesn't.
+ fd2 = os.dup(fd)
+ except OSError as e:
+ if e.errno != errno.EBADF:
+ raise
+ else:
+ os.close(fd2)
+ count += 1
+ return count
+
+
+def dash_R(the_module, test, indirect_test, huntrleaks):
+ """Run a test multiple times, looking for reference leaks.
+
+ Returns:
+ False if the test didn't leak references; True if we detected refleaks.
+ """
+ # This code is hackish and inelegant, but it seems to do the job.
+ import copyreg
+ import collections.abc
+
+ if not hasattr(sys, 'gettotalrefcount'):
+ raise Exception("Tracking reference leaks requires a debug build "
+ "of Python")
+
+ # Save current values for dash_R_cleanup() to restore.
+ fs = warnings.filters[:]
+ ps = copyreg.dispatch_table.copy()
+ pic = sys.path_importer_cache.copy()
+ try:
+ import zipimport
+ except ImportError:
+ zdc = None # Run unmodified on platforms without zipimport support
+ else:
+ zdc = zipimport._zip_directory_cache.copy()
+ abcs = {}
+ for abc in [getattr(collections.abc, a) for a in collections.abc.__all__]:
+ if not isabstract(abc):
+ continue
+ for obj in abc.__subclasses__() + [abc]:
+ abcs[obj] = obj._abc_registry.copy()
+
+ nwarmup, ntracked, fname = huntrleaks
+ fname = os.path.join(support.SAVEDCWD, fname)
+ repcount = nwarmup + ntracked
+ rc_deltas = [0] * repcount
+ alloc_deltas = [0] * repcount
+ fd_deltas = [0] * repcount
+
+ print("beginning", repcount, "repetitions", file=sys.stderr)
+ print(("1234567890"*(repcount//10 + 1))[:repcount], file=sys.stderr,
+ flush=True)
+ # initialize variables to make pyflakes quiet
+ rc_before = alloc_before = fd_before = 0
+ for i in range(repcount):
+ indirect_test()
+ alloc_after, rc_after, fd_after = dash_R_cleanup(fs, ps, pic, zdc,
+ abcs)
+ print('.', end='', flush=True)
+ if i >= nwarmup:
+ rc_deltas[i] = rc_after - rc_before
+ alloc_deltas[i] = alloc_after - alloc_before
+ fd_deltas[i] = fd_after - fd_before
+ alloc_before = alloc_after
+ rc_before = rc_after
+ fd_before = fd_after
+ print(file=sys.stderr)
+ # These checkers return False on success, True on failure
+ def check_rc_deltas(deltas):
+ return any(deltas)
+ def check_alloc_deltas(deltas):
+ # At least 1/3rd of 0s
+ if 3 * deltas.count(0) < len(deltas):
+ return True
+ # Nothing else than 1s, 0s and -1s
+ if not set(deltas) <= {1,0,-1}:
+ return True
+ return False
+ failed = False
+ for deltas, item_name, checker in [
+ (rc_deltas, 'references', check_rc_deltas),
+ (alloc_deltas, 'memory blocks', check_alloc_deltas),
+ (fd_deltas, 'file descriptors', check_rc_deltas)]:
+ if checker(deltas):
+ msg = '%s leaked %s %s, sum=%s' % (
+ test, deltas[nwarmup:], item_name, sum(deltas))
+ print(msg, file=sys.stderr, flush=True)
+ with open(fname, "a") as refrep:
+ print(msg, file=refrep)
+ refrep.flush()
+ failed = True
+ return failed
+
+
+def dash_R_cleanup(fs, ps, pic, zdc, abcs):
+ import gc, copyreg
+ import _strptime, linecache
+ import urllib.parse, urllib.request, mimetypes, doctest
+ import struct, filecmp, collections.abc
+ from distutils.dir_util import _path_created
+ from weakref import WeakSet
+
+ # Clear the warnings registry, so they can be displayed again
+ for mod in sys.modules.values():
+ if hasattr(mod, '__warningregistry__'):
+ del mod.__warningregistry__
+
+ # Restore some original values.
+ warnings.filters[:] = fs
+ copyreg.dispatch_table.clear()
+ copyreg.dispatch_table.update(ps)
+ sys.path_importer_cache.clear()
+ sys.path_importer_cache.update(pic)
+ try:
+ import zipimport
+ except ImportError:
+ pass # Run unmodified on platforms without zipimport support
+ else:
+ zipimport._zip_directory_cache.clear()
+ zipimport._zip_directory_cache.update(zdc)
+
+ # clear type cache
+ sys._clear_type_cache()
+
+ # Clear ABC registries, restoring previously saved ABC registries.
+ for abc in [getattr(collections.abc, a) for a in collections.abc.__all__]:
+ if not isabstract(abc):
+ continue
+ for obj in abc.__subclasses__() + [abc]:
+ obj._abc_registry = abcs.get(obj, WeakSet()).copy()
+ obj._abc_cache.clear()
+ obj._abc_negative_cache.clear()
+
+ # Flush standard output, so that buffered data is sent to the OS and
+ # associated Python objects are reclaimed.
+ for stream in (sys.stdout, sys.stderr, sys.__stdout__, sys.__stderr__):
+ if stream is not None:
+ stream.flush()
+
+ # Clear assorted module caches.
+ _path_created.clear()
+ re.purge()
+ _strptime._regex_cache.clear()
+ urllib.parse.clear_cache()
+ urllib.request.urlcleanup()
+ linecache.clearcache()
+ mimetypes._default_mime_types()
+ filecmp._cache.clear()
+ struct._clearcache()
+ doctest.master = None
+ try:
+ import ctypes
+ except ImportError:
+ # Don't worry about resetting the cache if ctypes is not supported
+ pass
+ else:
+ ctypes._reset_cache()
+
+ # Collect cyclic trash and read memory statistics immediately after.
+ func1 = sys.getallocatedblocks
+ func2 = sys.gettotalrefcount
+ gc.collect()
+ return func1(), func2(), fd_count()
+
+
+def warm_caches():
+ # char cache
+ s = bytes(range(256))
+ for i in range(256):
+ s[i:i+1]
+ # unicode cache
+ [chr(i) for i in range(256)]
+ # int cache
+ list(range(-5, 257))
diff --git a/Lib/test/libregrtest/runtest.py b/Lib/test/libregrtest/runtest.py
new file mode 100644
index 0000000000..043f23c095
--- /dev/null
+++ b/Lib/test/libregrtest/runtest.py
@@ -0,0 +1,242 @@
+import faulthandler
+import importlib
+import io
+import os
+import sys
+import time
+import traceback
+import unittest
+from test import support
+from test.libregrtest.refleak import dash_R
+from test.libregrtest.save_env import saved_test_environment
+
+
+# Test result constants.
+PASSED = 1
+FAILED = 0
+ENV_CHANGED = -1
+SKIPPED = -2
+RESOURCE_DENIED = -3
+INTERRUPTED = -4
+CHILD_ERROR = -5 # error in a child process
+
+
+# small set of tests to determine if we have a basically functioning interpreter
+# (i.e. if any of these fail, then anything else is likely to follow)
+STDTESTS = [
+ 'test_grammar',
+ 'test_opcodes',
+ 'test_dict',
+ 'test_builtin',
+ 'test_exceptions',
+ 'test_types',
+ 'test_unittest',
+ 'test_doctest',
+ 'test_doctest2',
+ 'test_support'
+]
+
+# set of tests that we don't want to be executed when using regrtest
+NOTTESTS = set()
+
+
+def findtests(testdir=None, stdtests=STDTESTS, nottests=NOTTESTS):
+ """Return a list of all applicable test modules."""
+ testdir = findtestdir(testdir)
+ names = os.listdir(testdir)
+ tests = []
+ others = set(stdtests) | nottests
+ for name in names:
+ mod, ext = os.path.splitext(name)
+ if mod[:5] == "test_" and ext in (".py", "") and mod not in others:
+ tests.append(mod)
+ return stdtests + sorted(tests)
+
+
+def runtest(ns, test):
+ """Run a single test.
+
+ test -- the name of the test
+ verbose -- if true, print more messages
+ quiet -- if true, don't print 'skipped' messages (probably redundant)
+ huntrleaks -- run multiple times to test for leaks; requires a debug
+ build; a triple corresponding to -R's three arguments
+ output_on_failure -- if true, display test output on failure
+ timeout -- dump the traceback and exit if a test takes more than
+ timeout seconds
+ failfast, match_tests -- See regrtest command-line flags for these.
+ pgo -- if true, suppress any info irrelevant to a generating a PGO build
+
+ Returns the tuple result, test_time, where result is one of the constants:
+ INTERRUPTED KeyboardInterrupt when run under -j
+ RESOURCE_DENIED test skipped because resource denied
+ SKIPPED test skipped for some other reason
+ ENV_CHANGED test failed because it changed the execution environment
+ FAILED test failed
+ PASSED test passed
+ """
+
+ verbose = ns.verbose
+ quiet = ns.quiet
+ huntrleaks = ns.huntrleaks
+ output_on_failure = ns.verbose3
+ failfast = ns.failfast
+ match_tests = ns.match_tests
+ timeout = ns.timeout
+ pgo = ns.pgo
+
+ use_timeout = (timeout is not None)
+ if use_timeout:
+ faulthandler.dump_traceback_later(timeout, exit=True)
+ try:
+ support.match_tests = match_tests
+ if failfast:
+ support.failfast = True
+ if output_on_failure:
+ support.verbose = True
+
+ # Reuse the same instance to all calls to runtest(). Some
+ # tests keep a reference to sys.stdout or sys.stderr
+ # (eg. test_argparse).
+ if runtest.stringio is None:
+ stream = io.StringIO()
+ runtest.stringio = stream
+ else:
+ stream = runtest.stringio
+ stream.seek(0)
+ stream.truncate()
+
+ orig_stdout = sys.stdout
+ orig_stderr = sys.stderr
+ try:
+ sys.stdout = stream
+ sys.stderr = stream
+ result = runtest_inner(test, verbose, quiet, huntrleaks,
+ display_failure=False, pgo=pgo)
+ if result[0] == FAILED:
+ output = stream.getvalue()
+ orig_stderr.write(output)
+ orig_stderr.flush()
+ finally:
+ sys.stdout = orig_stdout
+ sys.stderr = orig_stderr
+ else:
+ support.verbose = verbose # Tell tests to be moderately quiet
+ result = runtest_inner(test, verbose, quiet, huntrleaks,
+ display_failure=not verbose, pgo=pgo)
+ return result
+ finally:
+ if use_timeout:
+ faulthandler.cancel_dump_traceback_later()
+ cleanup_test_droppings(test, verbose)
+runtest.stringio = None
+
+
+def runtest_inner(test, verbose, quiet,
+ huntrleaks=False, display_failure=True, *, pgo=False):
+ support.unload(test)
+
+ test_time = 0.0
+ refleak = False # True if the test leaked references.
+ try:
+ if test.startswith('test.'):
+ abstest = test
+ else:
+ # Always import it from the test package
+ abstest = 'test.' + test
+ with saved_test_environment(test, verbose, quiet, pgo=pgo) as environment:
+ start_time = time.time()
+ the_module = importlib.import_module(abstest)
+ # If the test has a test_main, that will run the appropriate
+ # tests. If not, use normal unittest test loading.
+ test_runner = getattr(the_module, "test_main", None)
+ if test_runner is None:
+ def test_runner():
+ loader = unittest.TestLoader()
+ tests = loader.loadTestsFromModule(the_module)
+ for error in loader.errors:
+ print(error, file=sys.stderr)
+ if loader.errors:
+ raise Exception("errors while loading tests")
+ support.run_unittest(tests)
+ test_runner()
+ if huntrleaks:
+ refleak = dash_R(the_module, test, test_runner, huntrleaks)
+ test_time = time.time() - start_time
+ except support.ResourceDenied as msg:
+ if not quiet and not pgo:
+ print(test, "skipped --", msg, flush=True)
+ return RESOURCE_DENIED, test_time
+ except unittest.SkipTest as msg:
+ if not quiet and not pgo:
+ print(test, "skipped --", msg, flush=True)
+ return SKIPPED, test_time
+ except KeyboardInterrupt:
+ raise
+ except support.TestFailed as msg:
+ if not pgo:
+ if display_failure:
+ print("test", test, "failed --", msg, file=sys.stderr,
+ flush=True)
+ else:
+ print("test", test, "failed", file=sys.stderr, flush=True)
+ return FAILED, test_time
+ except:
+ msg = traceback.format_exc()
+ if not pgo:
+ print("test", test, "crashed --", msg, file=sys.stderr,
+ flush=True)
+ return FAILED, test_time
+ else:
+ if refleak:
+ return FAILED, test_time
+ if environment.changed:
+ return ENV_CHANGED, test_time
+ return PASSED, test_time
+
+
+def cleanup_test_droppings(testname, verbose):
+ import shutil
+ import stat
+ import gc
+
+ # First kill any dangling references to open files etc.
+ # This can also issue some ResourceWarnings which would otherwise get
+ # triggered during the following test run, and possibly produce failures.
+ gc.collect()
+
+ # Try to clean up junk commonly left behind. While tests shouldn't leave
+ # any files or directories behind, when a test fails that can be tedious
+ # for it to arrange. The consequences can be especially nasty on Windows,
+ # since if a test leaves a file open, it cannot be deleted by name (while
+ # there's nothing we can do about that here either, we can display the
+ # name of the offending test, which is a real help).
+ for name in (support.TESTFN,
+ "db_home",
+ ):
+ if not os.path.exists(name):
+ continue
+
+ if os.path.isdir(name):
+ kind, nuker = "directory", shutil.rmtree
+ elif os.path.isfile(name):
+ kind, nuker = "file", os.unlink
+ else:
+ raise SystemError("os.path says %r exists but is neither "
+ "directory nor file" % name)
+
+ if verbose:
+ print("%r left behind %s %r" % (testname, kind, name))
+ try:
+ # if we have chmod, fix possible permissions problems
+ # that might prevent cleanup
+ if (hasattr(os, 'chmod')):
+ os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
+ nuker(name)
+ except Exception as msg:
+ print(("%r left behind %s %r and it couldn't be "
+ "removed: %s" % (testname, kind, name, msg)), file=sys.stderr)
+
+
+def findtestdir(path=None):
+ return path or os.path.dirname(os.path.dirname(__file__)) or os.curdir
diff --git a/Lib/test/libregrtest/runtest_mp.py b/Lib/test/libregrtest/runtest_mp.py
new file mode 100644
index 0000000000..0ca7dd7a4f
--- /dev/null
+++ b/Lib/test/libregrtest/runtest_mp.py
@@ -0,0 +1,224 @@
+import json
+import os
+import queue
+import sys
+import time
+import traceback
+import types
+from test import support
+try:
+ import threading
+except ImportError:
+ print("Multiprocess option requires thread support")
+ sys.exit(2)
+
+from test.libregrtest.runtest import runtest, INTERRUPTED, CHILD_ERROR
+from test.libregrtest.setup import setup_tests
+
+
+# Minimum duration of a test to display its duration or to mention that
+# the test is running in background
+PROGRESS_MIN_TIME = 30.0 # seconds
+
+# Display the running tests if nothing happened last N seconds
+PROGRESS_UPDATE = 30.0 # seconds
+
+
+def run_test_in_subprocess(testname, ns):
+ """Run the given test in a subprocess with --slaveargs.
+
+ ns is the option Namespace parsed from command-line arguments. regrtest
+ is invoked in a subprocess with the --slaveargs argument; when the
+ subprocess exits, its return code, stdout and stderr are returned as a
+ 3-tuple.
+ """
+ from subprocess import Popen, PIPE
+
+ ns_dict = vars(ns)
+ slaveargs = (ns_dict, testname)
+ slaveargs = json.dumps(slaveargs)
+
+ cmd = [sys.executable, *support.args_from_interpreter_flags(),
+ '-X', 'faulthandler',
+ '-m', 'test.regrtest',
+ '--slaveargs', slaveargs]
+ if ns.pgo:
+ cmd += ['--pgo']
+
+ # Running the child from the same working directory as regrtest's original
+ # invocation ensures that TEMPDIR for the child is the same when
+ # sysconfig.is_python_build() is true. See issue 15300.
+ popen = Popen(cmd,
+ stdout=PIPE, stderr=PIPE,
+ universal_newlines=True,
+ close_fds=(os.name != 'nt'),
+ cwd=support.SAVEDCWD)
+ with popen:
+ stdout, stderr = popen.communicate()
+ retcode = popen.wait()
+ return retcode, stdout, stderr
+
+
+def run_tests_slave(slaveargs):
+ ns_dict, testname = json.loads(slaveargs)
+ ns = types.SimpleNamespace(**ns_dict)
+
+ setup_tests(ns)
+
+ try:
+ result = runtest(ns, testname)
+ except KeyboardInterrupt:
+ result = INTERRUPTED, ''
+ except BaseException as e:
+ traceback.print_exc()
+ result = CHILD_ERROR, str(e)
+
+ print() # Force a newline (just in case)
+ print(json.dumps(result), flush=True)
+ sys.exit(0)
+
+
+# We do not use a generator so multiple threads can call next().
+class MultiprocessIterator:
+
+ """A thread-safe iterator over tests for multiprocess mode."""
+
+ def __init__(self, tests):
+ self.interrupted = False
+ self.lock = threading.Lock()
+ self.tests = tests
+
+ def __iter__(self):
+ return self
+
+ def __next__(self):
+ with self.lock:
+ if self.interrupted:
+ raise StopIteration('tests interrupted')
+ return next(self.tests)
+
+
+class MultiprocessThread(threading.Thread):
+ def __init__(self, pending, output, ns):
+ super().__init__()
+ self.pending = pending
+ self.output = output
+ self.ns = ns
+ self.current_test = None
+ self.start_time = None
+
+ def _runtest(self):
+ try:
+ test = next(self.pending)
+ except StopIteration:
+ self.output.put((None, None, None, None))
+ return True
+
+ try:
+ self.start_time = time.monotonic()
+ self.current_test = test
+
+ retcode, stdout, stderr = run_test_in_subprocess(test, self.ns)
+ finally:
+ self.current_test = None
+
+ stdout, _, result = stdout.strip().rpartition("\n")
+ if retcode != 0:
+ result = (CHILD_ERROR, "Exit code %s" % retcode)
+ self.output.put((test, stdout.rstrip(), stderr.rstrip(),
+ result))
+ return True
+
+ if not result:
+ self.output.put((None, None, None, None))
+ return True
+
+ result = json.loads(result)
+ self.output.put((test, stdout.rstrip(), stderr.rstrip(),
+ result))
+ return False
+
+ def run(self):
+ try:
+ stop = False
+ while not stop:
+ stop = self._runtest()
+ except BaseException:
+ self.output.put((None, None, None, None))
+ raise
+
+
+def run_tests_multiprocess(regrtest):
+ output = queue.Queue()
+ pending = MultiprocessIterator(regrtest.tests)
+
+ workers = [MultiprocessThread(pending, output, regrtest.ns)
+ for i in range(regrtest.ns.use_mp)]
+ for worker in workers:
+ worker.start()
+
+ def get_running(workers):
+ running = []
+ for worker in workers:
+ current_test = worker.current_test
+ if not current_test:
+ continue
+ dt = time.monotonic() - worker.start_time
+ if dt >= PROGRESS_MIN_TIME:
+ running.append('%s (%.0f sec)' % (current_test, dt))
+ return running
+
+ finished = 0
+ test_index = 1
+ timeout = max(PROGRESS_UPDATE, PROGRESS_MIN_TIME)
+ try:
+ while finished < regrtest.ns.use_mp:
+ try:
+ item = output.get(timeout=timeout)
+ except queue.Empty:
+ running = get_running(workers)
+ if running and not regrtest.ns.pgo:
+ print('running: %s' % ', '.join(running))
+ continue
+
+ test, stdout, stderr, result = item
+ if test is None:
+ finished += 1
+ continue
+ regrtest.accumulate_result(test, result)
+
+ # Display progress
+ text = test
+ ok, test_time = result
+ if (ok not in (CHILD_ERROR, INTERRUPTED)
+ and test_time >= PROGRESS_MIN_TIME
+ and not regrtest.ns.pgo):
+ text += ' (%.0f sec)' % test_time
+ running = get_running(workers)
+ if running and not regrtest.ns.pgo:
+ text += ' -- running: %s' % ', '.join(running)
+ regrtest.display_progress(test_index, text)
+
+ # Copy stdout and stderr from the child process
+ if stdout:
+ print(stdout, flush=True)
+ if stderr and not regrtest.ns.pgo:
+ print(stderr, file=sys.stderr, flush=True)
+
+ if result[0] == INTERRUPTED:
+ raise KeyboardInterrupt
+ if result[0] == CHILD_ERROR:
+ msg = "Child error on {}: {}".format(test, result[1])
+ raise Exception(msg)
+ test_index += 1
+ except KeyboardInterrupt:
+ regrtest.interrupted = True
+ pending.interrupted = True
+ print()
+
+ running = [worker.current_test for worker in workers]
+ running = list(filter(bool, running))
+ if running:
+ print("Waiting for %s" % ', '.join(running))
+ for worker in workers:
+ worker.join()
diff --git a/Lib/test/libregrtest/save_env.py b/Lib/test/libregrtest/save_env.py
new file mode 100644
index 0000000000..90900a9770
--- /dev/null
+++ b/Lib/test/libregrtest/save_env.py
@@ -0,0 +1,285 @@
+import builtins
+import locale
+import logging
+import os
+import shutil
+import sys
+import sysconfig
+import warnings
+from test import support
+try:
+ import threading
+except ImportError:
+ threading = None
+try:
+ import _multiprocessing, multiprocessing.process
+except ImportError:
+ multiprocessing = None
+
+
+# Unit tests are supposed to leave the execution environment unchanged
+# once they complete. But sometimes tests have bugs, especially when
+# tests fail, and the changes to environment go on to mess up other
+# tests. This can cause issues with buildbot stability, since tests
+# are run in random order and so problems may appear to come and go.
+# There are a few things we can save and restore to mitigate this, and
+# the following context manager handles this task.
+
+class saved_test_environment:
+ """Save bits of the test environment and restore them at block exit.
+
+ with saved_test_environment(testname, verbose, quiet):
+ #stuff
+
+ Unless quiet is True, a warning is printed to stderr if any of
+ the saved items was changed by the test. The attribute 'changed'
+ is initially False, but is set to True if a change is detected.
+
+ If verbose is more than 1, the before and after state of changed
+ items is also printed.
+ """
+
+ changed = False
+
+ def __init__(self, testname, verbose=0, quiet=False, *, pgo=False):
+ self.testname = testname
+ self.verbose = verbose
+ self.quiet = quiet
+ self.pgo = pgo
+
+ # To add things to save and restore, add a name XXX to the resources list
+ # and add corresponding get_XXX/restore_XXX functions. get_XXX should
+ # return the value to be saved and compared against a second call to the
+ # get function when test execution completes. restore_XXX should accept
+ # the saved value and restore the resource using it. It will be called if
+ # and only if a change in the value is detected.
+ #
+ # Note: XXX will have any '.' replaced with '_' characters when determining
+ # the corresponding method names.
+
+ resources = ('sys.argv', 'cwd', 'sys.stdin', 'sys.stdout', 'sys.stderr',
+ 'os.environ', 'sys.path', 'sys.path_hooks', '__import__',
+ 'warnings.filters', 'asyncore.socket_map',
+ 'logging._handlers', 'logging._handlerList', 'sys.gettrace',
+ 'sys.warnoptions',
+ # multiprocessing.process._cleanup() may release ref
+ # to a thread, so check processes first.
+ 'multiprocessing.process._dangling', 'threading._dangling',
+ 'sysconfig._CONFIG_VARS', 'sysconfig._INSTALL_SCHEMES',
+ 'files', 'locale', 'warnings.showwarning',
+ )
+
+ def get_sys_argv(self):
+ return id(sys.argv), sys.argv, sys.argv[:]
+ def restore_sys_argv(self, saved_argv):
+ sys.argv = saved_argv[1]
+ sys.argv[:] = saved_argv[2]
+
+ def get_cwd(self):
+ return os.getcwd()
+ def restore_cwd(self, saved_cwd):
+ os.chdir(saved_cwd)
+
+ def get_sys_stdout(self):
+ return sys.stdout
+ def restore_sys_stdout(self, saved_stdout):
+ sys.stdout = saved_stdout
+
+ def get_sys_stderr(self):
+ return sys.stderr
+ def restore_sys_stderr(self, saved_stderr):
+ sys.stderr = saved_stderr
+
+ def get_sys_stdin(self):
+ return sys.stdin
+ def restore_sys_stdin(self, saved_stdin):
+ sys.stdin = saved_stdin
+
+ def get_os_environ(self):
+ return id(os.environ), os.environ, dict(os.environ)
+ def restore_os_environ(self, saved_environ):
+ os.environ = saved_environ[1]
+ os.environ.clear()
+ os.environ.update(saved_environ[2])
+
+ def get_sys_path(self):
+ return id(sys.path), sys.path, sys.path[:]
+ def restore_sys_path(self, saved_path):
+ sys.path = saved_path[1]
+ sys.path[:] = saved_path[2]
+
+ def get_sys_path_hooks(self):
+ return id(sys.path_hooks), sys.path_hooks, sys.path_hooks[:]
+ def restore_sys_path_hooks(self, saved_hooks):
+ sys.path_hooks = saved_hooks[1]
+ sys.path_hooks[:] = saved_hooks[2]
+
+ def get_sys_gettrace(self):
+ return sys.gettrace()
+ def restore_sys_gettrace(self, trace_fxn):
+ sys.settrace(trace_fxn)
+
+ def get___import__(self):
+ return builtins.__import__
+ def restore___import__(self, import_):
+ builtins.__import__ = import_
+
+ def get_warnings_filters(self):
+ return id(warnings.filters), warnings.filters, warnings.filters[:]
+ def restore_warnings_filters(self, saved_filters):
+ warnings.filters = saved_filters[1]
+ warnings.filters[:] = saved_filters[2]
+
+ def get_asyncore_socket_map(self):
+ asyncore = sys.modules.get('asyncore')
+ # XXX Making a copy keeps objects alive until __exit__ gets called.
+ return asyncore and asyncore.socket_map.copy() or {}
+ def restore_asyncore_socket_map(self, saved_map):
+ asyncore = sys.modules.get('asyncore')
+ if asyncore is not None:
+ asyncore.close_all(ignore_all=True)
+ asyncore.socket_map.update(saved_map)
+
+ def get_shutil_archive_formats(self):
+ # we could call get_archives_formats() but that only returns the
+ # registry keys; we want to check the values too (the functions that
+ # are registered)
+ return shutil._ARCHIVE_FORMATS, shutil._ARCHIVE_FORMATS.copy()
+ def restore_shutil_archive_formats(self, saved):
+ shutil._ARCHIVE_FORMATS = saved[0]
+ shutil._ARCHIVE_FORMATS.clear()
+ shutil._ARCHIVE_FORMATS.update(saved[1])
+
+ def get_shutil_unpack_formats(self):
+ return shutil._UNPACK_FORMATS, shutil._UNPACK_FORMATS.copy()
+ def restore_shutil_unpack_formats(self, saved):
+ shutil._UNPACK_FORMATS = saved[0]
+ shutil._UNPACK_FORMATS.clear()
+ shutil._UNPACK_FORMATS.update(saved[1])
+
+ def get_logging__handlers(self):
+ # _handlers is a WeakValueDictionary
+ return id(logging._handlers), logging._handlers, logging._handlers.copy()
+ def restore_logging__handlers(self, saved_handlers):
+ # Can't easily revert the logging state
+ pass
+
+ def get_logging__handlerList(self):
+ # _handlerList is a list of weakrefs to handlers
+ return id(logging._handlerList), logging._handlerList, logging._handlerList[:]
+ def restore_logging__handlerList(self, saved_handlerList):
+ # Can't easily revert the logging state
+ pass
+
+ def get_sys_warnoptions(self):
+ return id(sys.warnoptions), sys.warnoptions, sys.warnoptions[:]
+ def restore_sys_warnoptions(self, saved_options):
+ sys.warnoptions = saved_options[1]
+ sys.warnoptions[:] = saved_options[2]
+
+ # Controlling dangling references to Thread objects can make it easier
+ # to track reference leaks.
+ def get_threading__dangling(self):
+ if not threading:
+ return None
+ # This copies the weakrefs without making any strong reference
+ return threading._dangling.copy()
+ def restore_threading__dangling(self, saved):
+ if not threading:
+ return
+ threading._dangling.clear()
+ threading._dangling.update(saved)
+
+ # Same for Process objects
+ def get_multiprocessing_process__dangling(self):
+ if not multiprocessing:
+ return None
+ # Unjoined process objects can survive after process exits
+ multiprocessing.process._cleanup()
+ # This copies the weakrefs without making any strong reference
+ return multiprocessing.process._dangling.copy()
+ def restore_multiprocessing_process__dangling(self, saved):
+ if not multiprocessing:
+ return
+ multiprocessing.process._dangling.clear()
+ multiprocessing.process._dangling.update(saved)
+
+ def get_sysconfig__CONFIG_VARS(self):
+ # make sure the dict is initialized
+ sysconfig.get_config_var('prefix')
+ return (id(sysconfig._CONFIG_VARS), sysconfig._CONFIG_VARS,
+ dict(sysconfig._CONFIG_VARS))
+ def restore_sysconfig__CONFIG_VARS(self, saved):
+ sysconfig._CONFIG_VARS = saved[1]
+ sysconfig._CONFIG_VARS.clear()
+ sysconfig._CONFIG_VARS.update(saved[2])
+
+ def get_sysconfig__INSTALL_SCHEMES(self):
+ return (id(sysconfig._INSTALL_SCHEMES), sysconfig._INSTALL_SCHEMES,
+ sysconfig._INSTALL_SCHEMES.copy())
+ def restore_sysconfig__INSTALL_SCHEMES(self, saved):
+ sysconfig._INSTALL_SCHEMES = saved[1]
+ sysconfig._INSTALL_SCHEMES.clear()
+ sysconfig._INSTALL_SCHEMES.update(saved[2])
+
+ def get_files(self):
+ return sorted(fn + ('/' if os.path.isdir(fn) else '')
+ for fn in os.listdir())
+ def restore_files(self, saved_value):
+ fn = support.TESTFN
+ if fn not in saved_value and (fn + '/') not in saved_value:
+ if os.path.isfile(fn):
+ support.unlink(fn)
+ elif os.path.isdir(fn):
+ support.rmtree(fn)
+
+ _lc = [getattr(locale, lc) for lc in dir(locale)
+ if lc.startswith('LC_')]
+ def get_locale(self):
+ pairings = []
+ for lc in self._lc:
+ try:
+ pairings.append((lc, locale.setlocale(lc, None)))
+ except (TypeError, ValueError):
+ continue
+ return pairings
+ def restore_locale(self, saved):
+ for lc, setting in saved:
+ locale.setlocale(lc, setting)
+
+ def get_warnings_showwarning(self):
+ return warnings.showwarning
+ def restore_warnings_showwarning(self, fxn):
+ warnings.showwarning = fxn
+
+ def resource_info(self):
+ for name in self.resources:
+ method_suffix = name.replace('.', '_')
+ get_name = 'get_' + method_suffix
+ restore_name = 'restore_' + method_suffix
+ yield name, getattr(self, get_name), getattr(self, restore_name)
+
+ def __enter__(self):
+ self.saved_values = dict((name, get()) for name, get, restore
+ in self.resource_info())
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ saved_values = self.saved_values
+ del self.saved_values
+ for name, get, restore in self.resource_info():
+ current = get()
+ original = saved_values.pop(name)
+ # Check for changes to the resource's value
+ if current != original:
+ self.changed = True
+ restore(original)
+ if not self.quiet and not self.pgo:
+ print("Warning -- {} was modified by {}".format(
+ name, self.testname),
+ file=sys.stderr)
+ if self.verbose > 1:
+ print(" Before: {}\n After: {} ".format(
+ original, current),
+ file=sys.stderr)
+ return False
diff --git a/Lib/test/libregrtest/setup.py b/Lib/test/libregrtest/setup.py
new file mode 100644
index 0000000000..6e05c7e6ff
--- /dev/null
+++ b/Lib/test/libregrtest/setup.py
@@ -0,0 +1,116 @@
+import atexit
+import faulthandler
+import os
+import signal
+import sys
+import unittest
+from test import support
+try:
+ import gc
+except ImportError:
+ gc = None
+
+from test.libregrtest.refleak import warm_caches
+
+
+def setup_tests(ns):
+ # Display the Python traceback on fatal errors (e.g. segfault)
+ faulthandler.enable(all_threads=True)
+
+ # Display the Python traceback on SIGALRM or SIGUSR1 signal
+ signals = []
+ if hasattr(signal, 'SIGALRM'):
+ signals.append(signal.SIGALRM)
+ if hasattr(signal, 'SIGUSR1'):
+ signals.append(signal.SIGUSR1)
+ for signum in signals:
+ faulthandler.register(signum, chain=True)
+
+ replace_stdout()
+ support.record_original_stdout(sys.stdout)
+
+ # Some times __path__ and __file__ are not absolute (e.g. while running from
+ # Lib/) and, if we change the CWD to run the tests in a temporary dir, some
+ # imports might fail. This affects only the modules imported before os.chdir().
+ # These modules are searched first in sys.path[0] (so '' -- the CWD) and if
+ # they are found in the CWD their __file__ and __path__ will be relative (this
+ # happens before the chdir). All the modules imported after the chdir, are
+ # not found in the CWD, and since the other paths in sys.path[1:] are absolute
+ # (site.py absolutize them), the __file__ and __path__ will be absolute too.
+ # Therefore it is necessary to absolutize manually the __file__ and __path__ of
+ # the packages to prevent later imports to fail when the CWD is different.
+ for module in sys.modules.values():
+ if hasattr(module, '__path__'):
+ module.__path__ = [os.path.abspath(path)
+ for path in module.__path__]
+ if hasattr(module, '__file__'):
+ module.__file__ = os.path.abspath(module.__file__)
+
+ # MacOSX (a.k.a. Darwin) has a default stack size that is too small
+ # for deeply recursive regular expressions. We see this as crashes in
+ # the Python test suite when running test_re.py and test_sre.py. The
+ # fix is to set the stack limit to 2048.
+ # This approach may also be useful for other Unixy platforms that
+ # suffer from small default stack limits.
+ if sys.platform == 'darwin':
+ try:
+ import resource
+ except ImportError:
+ pass
+ else:
+ soft, hard = resource.getrlimit(resource.RLIMIT_STACK)
+ newsoft = min(hard, max(soft, 1024*2048))
+ resource.setrlimit(resource.RLIMIT_STACK, (newsoft, hard))
+
+ if ns.huntrleaks:
+ unittest.BaseTestSuite._cleanup = False
+
+ # Avoid false positives due to various caches
+ # filling slowly with random data:
+ warm_caches()
+
+ if ns.memlimit is not None:
+ support.set_memlimit(ns.memlimit)
+
+ if ns.threshold is not None:
+ gc.set_threshold(ns.threshold)
+
+ try:
+ import msvcrt
+ except ImportError:
+ pass
+ else:
+ msvcrt.SetErrorMode(msvcrt.SEM_FAILCRITICALERRORS|
+ msvcrt.SEM_NOALIGNMENTFAULTEXCEPT|
+ msvcrt.SEM_NOGPFAULTERRORBOX|
+ msvcrt.SEM_NOOPENFILEERRORBOX)
+ try:
+ msvcrt.CrtSetReportMode
+ except AttributeError:
+ # release build
+ pass
+ else:
+ for m in [msvcrt.CRT_WARN, msvcrt.CRT_ERROR, msvcrt.CRT_ASSERT]:
+ if ns.verbose and ns.verbose >= 2:
+ msvcrt.CrtSetReportMode(m, msvcrt.CRTDBG_MODE_FILE)
+ msvcrt.CrtSetReportFile(m, msvcrt.CRTDBG_FILE_STDERR)
+ else:
+ msvcrt.CrtSetReportMode(m, 0)
+
+ support.use_resources = ns.use_resources
+
+
+def replace_stdout():
+ """Set stdout encoder error handler to backslashreplace (as stderr error
+ handler) to avoid UnicodeEncodeError when printing a traceback"""
+ stdout = sys.stdout
+ sys.stdout = open(stdout.fileno(), 'w',
+ encoding=stdout.encoding,
+ errors="backslashreplace",
+ closefd=False,
+ newline='\n')
+
+ def restore_stdout():
+ sys.stdout.close()
+ sys.stdout = stdout
+ atexit.register(restore_stdout)
diff --git a/Lib/test/lock_tests.py b/Lib/test/lock_tests.py
index afd6873683..055bf28565 100644
--- a/Lib/test/lock_tests.py
+++ b/Lib/test/lock_tests.py
@@ -7,6 +7,7 @@ import time
from _thread import start_new_thread, TIMEOUT_MAX
import threading
import unittest
+import weakref
from test import support
@@ -198,6 +199,17 @@ class BaseLockTests(BaseTestCase):
self.assertFalse(results[0])
self.assertTimeout(results[1], 0.5)
+ def test_weakref_exists(self):
+ lock = self.locktype()
+ ref = weakref.ref(lock)
+ self.assertIsNotNone(ref())
+
+ def test_weakref_deleted(self):
+ lock = self.locktype()
+ ref = weakref.ref(lock)
+ del lock
+ self.assertIsNone(ref())
+
class LockTests(BaseLockTests):
"""
diff --git a/Lib/test/pickletester.py b/Lib/test/pickletester.py
index 6d971aa273..d0c02acef0 100644
--- a/Lib/test/pickletester.py
+++ b/Lib/test/pickletester.py
@@ -1668,16 +1668,14 @@ class AbstractPickleTests(unittest.TestCase):
x.abc = 666
for proto in protocols:
with self.subTest(proto=proto):
- if 2 <= proto < 4:
- self.assertRaises(ValueError, self.dumps, x, proto)
- continue
s = self.dumps(x, proto)
if proto < 1:
self.assertIn(b'\nL64206', s) # LONG
elif proto < 2:
self.assertIn(b'M\xce\xfa', s) # BININT2
+ elif proto < 4:
+ self.assertIn(b'X\x04\x00\x00\x00FACE', s) # BINUNICODE
else:
- assert proto >= 4
self.assertIn(b'\x8c\x04FACE', s) # SHORT_BINUNICODE
self.assertFalse(opcode_in_pickle(pickle.NEWOBJ, s))
self.assertEqual(opcode_in_pickle(pickle.NEWOBJ_EX, s),
diff --git a/Lib/test/regrtest.py b/Lib/test/regrtest.py
index 431042eec1..fcc39375c0 100755..100644
--- a/Lib/test/regrtest.py
+++ b/Lib/test/regrtest.py
@@ -6,1589 +6,12 @@ Script to run Python regression tests.
Run this script with -h or --help for documentation.
"""
-USAGE = """\
-python -m test [options] [test_name1 [test_name2 ...]]
-python path/to/Lib/test/regrtest.py [options] [test_name1 [test_name2 ...]]
-"""
-
-DESCRIPTION = """\
-Run Python regression tests.
-
-If no arguments or options are provided, finds all files matching
-the pattern "test_*" in the Lib/test subdirectory and runs
-them in alphabetical order (but see -M and -u, below, for exceptions).
-
-For more rigorous testing, it is useful to use the following
-command line:
-
-python -E -Wd -m test [options] [test_name1 ...]
-"""
-
-EPILOG = """\
-Additional option details:
-
--r randomizes test execution order. You can use --randseed=int to provide an
-int seed value for the randomizer; this is useful for reproducing troublesome
-test orders.
-
--s On the first invocation of regrtest using -s, the first test file found
-or the first test file given on the command line is run, and the name of
-the next test is recorded in a file named pynexttest. If run from the
-Python build directory, pynexttest is located in the 'build' subdirectory,
-otherwise it is located in tempfile.gettempdir(). On subsequent runs,
-the test in pynexttest is run, and the next test is written to pynexttest.
-When the last test has been run, pynexttest is deleted. In this way it
-is possible to single step through the test files. This is useful when
-doing memory analysis on the Python interpreter, which process tends to
-consume too many resources to run the full regression test non-stop.
-
--S is used to continue running tests after an aborted run. It will
-maintain the order a standard run (ie, this assumes -r is not used).
-This is useful after the tests have prematurely stopped for some external
-reason and you want to start running from where you left off rather
-than starting from the beginning.
-
--f reads the names of tests from the file given as f's argument, one
-or more test names per line. Whitespace is ignored. Blank lines and
-lines beginning with '#' are ignored. This is especially useful for
-whittling down failures involving interactions among tests.
-
--L causes the leaks(1) command to be run just before exit if it exists.
-leaks(1) is available on Mac OS X and presumably on some other
-FreeBSD-derived systems.
-
--R runs each test several times and examines sys.gettotalrefcount() to
-see if the test appears to be leaking references. The argument should
-be of the form stab:run:fname where 'stab' is the number of times the
-test is run to let gettotalrefcount settle down, 'run' is the number
-of times further it is run and 'fname' is the name of the file the
-reports are written to. These parameters all have defaults (5, 4 and
-"reflog.txt" respectively), and the minimal invocation is '-R :'.
-
--M runs tests that require an exorbitant amount of memory. These tests
-typically try to ascertain containers keep working when containing more than
-2 billion objects, which only works on 64-bit systems. There are also some
-tests that try to exhaust the address space of the process, which only makes
-sense on 32-bit systems with at least 2Gb of memory. The passed-in memlimit,
-which is a string in the form of '2.5Gb', determines howmuch memory the
-tests will limit themselves to (but they may go slightly over.) The number
-shouldn't be more memory than the machine has (including swap memory). You
-should also keep in mind that swap memory is generally much, much slower
-than RAM, and setting memlimit to all available RAM or higher will heavily
-tax the machine. On the other hand, it is no use running these tests with a
-limit of less than 2.5Gb, and many require more than 20Gb. Tests that expect
-to use more than memlimit memory will be skipped. The big-memory tests
-generally run very, very long.
-
--u is used to specify which special resource intensive tests to run,
-such as those requiring large file support or network connectivity.
-The argument is a comma-separated list of words indicating the
-resources to test. Currently only the following are defined:
-
- all - Enable all special resources.
-
- none - Disable all special resources (this is the default).
-
- audio - Tests that use the audio device. (There are known
- cases of broken audio drivers that can crash Python or
- even the Linux kernel.)
-
- curses - Tests that use curses and will modify the terminal's
- state and output modes.
-
- largefile - It is okay to run some test that may create huge
- files. These tests can take a long time and may
- consume >2GB of disk space temporarily.
-
- network - It is okay to run tests that use external network
- resource, e.g. testing SSL support for sockets.
-
- decimal - Test the decimal module against a large suite that
- verifies compliance with standards.
-
- cpu - Used for certain CPU-heavy tests.
-
- subprocess Run all tests for the subprocess module.
-
- urlfetch - It is okay to download files required on testing.
-
- gui - Run tests that require a running GUI.
-
-To enable all resources except one, use '-uall,-<resource>'. For
-example, to run all the tests except for the gui tests, give the
-option '-uall,-gui'.
-"""
-
# We import importlib *ASAP* in order to test #15386
import importlib
-import argparse
-import builtins
-import faulthandler
-import io
-import json
-import locale
-import logging
import os
-import platform
-import random
-import re
-import shutil
-import signal
import sys
-import sysconfig
-import tempfile
-import time
-import traceback
-import unittest
-import warnings
-from inspect import isabstract
-
-try:
- import threading
-except ImportError:
- threading = None
-try:
- import _multiprocessing, multiprocessing.process
-except ImportError:
- multiprocessing = None
-
-
-# Some times __path__ and __file__ are not absolute (e.g. while running from
-# Lib/) and, if we change the CWD to run the tests in a temporary dir, some
-# imports might fail. This affects only the modules imported before os.chdir().
-# These modules are searched first in sys.path[0] (so '' -- the CWD) and if
-# they are found in the CWD their __file__ and __path__ will be relative (this
-# happens before the chdir). All the modules imported after the chdir, are
-# not found in the CWD, and since the other paths in sys.path[1:] are absolute
-# (site.py absolutize them), the __file__ and __path__ will be absolute too.
-# Therefore it is necessary to absolutize manually the __file__ and __path__ of
-# the packages to prevent later imports to fail when the CWD is different.
-for module in sys.modules.values():
- if hasattr(module, '__path__'):
- module.__path__ = [os.path.abspath(path) for path in module.__path__]
- if hasattr(module, '__file__'):
- module.__file__ = os.path.abspath(module.__file__)
-
-
-# MacOSX (a.k.a. Darwin) has a default stack size that is too small
-# for deeply recursive regular expressions. We see this as crashes in
-# the Python test suite when running test_re.py and test_sre.py. The
-# fix is to set the stack limit to 2048.
-# This approach may also be useful for other Unixy platforms that
-# suffer from small default stack limits.
-if sys.platform == 'darwin':
- try:
- import resource
- except ImportError:
- pass
- else:
- soft, hard = resource.getrlimit(resource.RLIMIT_STACK)
- newsoft = min(hard, max(soft, 1024*2048))
- resource.setrlimit(resource.RLIMIT_STACK, (newsoft, hard))
-
-# Test result constants.
-PASSED = 1
-FAILED = 0
-ENV_CHANGED = -1
-SKIPPED = -2
-RESOURCE_DENIED = -3
-INTERRUPTED = -4
-CHILD_ERROR = -5 # error in a child process
-
-from test import support
-
-RESOURCE_NAMES = ('audio', 'curses', 'largefile', 'network',
- 'decimal', 'cpu', 'subprocess', 'urlfetch', 'gui')
-
-# When tests are run from the Python build directory, it is best practice
-# to keep the test files in a subfolder. This eases the cleanup of leftover
-# files using the "make distclean" command.
-if sysconfig.is_python_build():
- TEMPDIR = os.path.join(sysconfig.get_config_var('srcdir'), 'build')
-else:
- TEMPDIR = tempfile.gettempdir()
-TEMPDIR = os.path.abspath(TEMPDIR)
-
-class _ArgParser(argparse.ArgumentParser):
-
- def error(self, message):
- super().error(message + "\nPass -h or --help for complete help.")
-
-def _create_parser():
- # Set prog to prevent the uninformative "__main__.py" from displaying in
- # error messages when using "python -m test ...".
- parser = _ArgParser(prog='regrtest.py',
- usage=USAGE,
- description=DESCRIPTION,
- epilog=EPILOG,
- add_help=False,
- formatter_class=argparse.RawDescriptionHelpFormatter)
-
- # Arguments with this clause added to its help are described further in
- # the epilog's "Additional option details" section.
- more_details = ' See the section at bottom for more details.'
-
- group = parser.add_argument_group('General options')
- # We add help explicitly to control what argument group it renders under.
- group.add_argument('-h', '--help', action='help',
- help='show this help message and exit')
- group.add_argument('--timeout', metavar='TIMEOUT', type=float,
- help='dump the traceback and exit if a test takes '
- 'more than TIMEOUT seconds; disabled if TIMEOUT '
- 'is negative or equals to zero')
- group.add_argument('--wait', action='store_true',
- help='wait for user input, e.g., allow a debugger '
- 'to be attached')
- group.add_argument('--slaveargs', metavar='ARGS')
- group.add_argument('-S', '--start', metavar='START',
- help='the name of the test at which to start.' +
- more_details)
-
- group = parser.add_argument_group('Verbosity')
- group.add_argument('-v', '--verbose', action='count',
- help='run tests in verbose mode with output to stdout')
- group.add_argument('-w', '--verbose2', action='store_true',
- help='re-run failed tests in verbose mode')
- group.add_argument('-W', '--verbose3', action='store_true',
- help='display test output on failure')
- group.add_argument('-q', '--quiet', action='store_true',
- help='no output unless one or more tests fail')
- group.add_argument('-o', '--slow', action='store_true', dest='print_slow',
- help='print the slowest 10 tests')
- group.add_argument('--header', action='store_true',
- help='print header with interpreter info')
-
- group = parser.add_argument_group('Selecting tests')
- group.add_argument('-r', '--randomize', action='store_true',
- help='randomize test execution order.' + more_details)
- group.add_argument('--randseed', metavar='SEED',
- dest='random_seed', type=int,
- help='pass a random seed to reproduce a previous '
- 'random run')
- group.add_argument('-f', '--fromfile', metavar='FILE',
- help='read names of tests to run from a file.' +
- more_details)
- group.add_argument('-x', '--exclude', action='store_true',
- help='arguments are tests to *exclude*')
- group.add_argument('-s', '--single', action='store_true',
- help='single step through a set of tests.' +
- more_details)
- group.add_argument('-m', '--match', metavar='PAT',
- dest='match_tests',
- help='match test cases and methods with glob pattern PAT')
- group.add_argument('-G', '--failfast', action='store_true',
- help='fail as soon as a test fails (only with -v or -W)')
- group.add_argument('-u', '--use', metavar='RES1,RES2,...',
- action='append', type=resources_list,
- help='specify which special resource intensive tests '
- 'to run.' + more_details)
- group.add_argument('-M', '--memlimit', metavar='LIMIT',
- help='run very large memory-consuming tests.' +
- more_details)
- group.add_argument('--testdir', metavar='DIR',
- type=relative_filename,
- help='execute test files in the specified directory '
- '(instead of the Python stdlib test suite)')
-
- group = parser.add_argument_group('Special runs')
- group.add_argument('-l', '--findleaks', action='store_true',
- help='if GC is available detect tests that leak memory')
- group.add_argument('-L', '--runleaks', action='store_true',
- help='run the leaks(1) command just before exit.' +
- more_details)
- group.add_argument('-R', '--huntrleaks', metavar='RUNCOUNTS',
- type=huntrleaks,
- help='search for reference leaks (needs debug build, '
- 'very slow).' + more_details)
- group.add_argument('-j', '--multiprocess', metavar='PROCESSES',
- dest='use_mp', type=int,
- help='run PROCESSES processes at once')
- group.add_argument('-T', '--coverage', action='store_true',
- dest='trace',
- help='turn on code coverage tracing using the trace '
- 'module')
- group.add_argument('-D', '--coverdir', metavar='DIR',
- type=relative_filename,
- help='directory where coverage files are put')
- group.add_argument('-N', '--nocoverdir',
- action='store_const', const=None, dest='coverdir',
- help='put coverage files alongside modules')
- group.add_argument('-t', '--threshold', metavar='THRESHOLD',
- type=int,
- help='call gc.set_threshold(THRESHOLD)')
- group.add_argument('-n', '--nowindows', action='store_true',
- help='suppress error message boxes on Windows')
- group.add_argument('-F', '--forever', action='store_true',
- help='run the specified tests in a loop, until an '
- 'error happens')
- group.add_argument('-P', '--pgo', dest='pgo', action='store_true',
- help='enable Profile Guided Optimization training')
-
- parser.add_argument('args', nargs=argparse.REMAINDER,
- help=argparse.SUPPRESS)
-
- return parser
-
-def relative_filename(string):
- # CWD is replaced with a temporary dir before calling main(), so we
- # join it with the saved CWD so it ends up where the user expects.
- return os.path.join(support.SAVEDCWD, string)
-
-def huntrleaks(string):
- args = string.split(':')
- if len(args) not in (2, 3):
- raise argparse.ArgumentTypeError(
- 'needs 2 or 3 colon-separated arguments')
- nwarmup = int(args[0]) if args[0] else 5
- ntracked = int(args[1]) if args[1] else 4
- fname = args[2] if len(args) > 2 and args[2] else 'reflog.txt'
- return nwarmup, ntracked, fname
-
-def resources_list(string):
- u = [x.lower() for x in string.split(',')]
- for r in u:
- if r == 'all' or r == 'none':
- continue
- if r[0] == '-':
- r = r[1:]
- if r not in RESOURCE_NAMES:
- raise argparse.ArgumentTypeError('invalid resource: ' + r)
- return u
-
-def _parse_args(args, **kwargs):
- # Defaults
- ns = argparse.Namespace(testdir=None, verbose=0, quiet=False,
- exclude=False, single=False, randomize=False, fromfile=None,
- findleaks=False, use_resources=None, trace=False, coverdir='coverage',
- runleaks=False, huntrleaks=False, verbose2=False, print_slow=False,
- random_seed=None, use_mp=None, verbose3=False, forever=False,
- header=False, failfast=False, match_tests=None, pgo=False)
- for k, v in kwargs.items():
- if not hasattr(ns, k):
- raise TypeError('%r is an invalid keyword argument '
- 'for this function' % k)
- setattr(ns, k, v)
- if ns.use_resources is None:
- ns.use_resources = []
-
- parser = _create_parser()
- parser.parse_args(args=args, namespace=ns)
-
- if ns.single and ns.fromfile:
- parser.error("-s and -f don't go together!")
- if ns.use_mp and ns.trace:
- parser.error("-T and -j don't go together!")
- if ns.use_mp and ns.findleaks:
- parser.error("-l and -j don't go together!")
- if ns.use_mp and ns.memlimit:
- parser.error("-M and -j don't go together!")
- if ns.failfast and not (ns.verbose or ns.verbose3):
- parser.error("-G/--failfast needs either -v or -W")
-
- if ns.quiet:
- ns.verbose = 0
- if ns.timeout is not None:
- if hasattr(faulthandler, 'dump_traceback_later'):
- if ns.timeout <= 0:
- ns.timeout = None
- else:
- print("Warning: The timeout option requires "
- "faulthandler.dump_traceback_later")
- ns.timeout = None
- if ns.use_mp is not None:
- if ns.use_mp <= 0:
- # Use all cores + extras for tests that like to sleep
- ns.use_mp = 2 + (os.cpu_count() or 1)
- if ns.use_mp == 1:
- ns.use_mp = None
- if ns.use:
- for a in ns.use:
- for r in a:
- if r == 'all':
- ns.use_resources[:] = RESOURCE_NAMES
- continue
- if r == 'none':
- del ns.use_resources[:]
- continue
- remove = False
- if r[0] == '-':
- remove = True
- r = r[1:]
- if remove:
- if r in ns.use_resources:
- ns.use_resources.remove(r)
- elif r not in ns.use_resources:
- ns.use_resources.append(r)
- if ns.random_seed is not None:
- ns.randomize = True
-
- return ns
-
-
-def run_test_in_subprocess(testname, ns):
- """Run the given test in a subprocess with --slaveargs.
-
- ns is the option Namespace parsed from command-line arguments. regrtest
- is invoked in a subprocess with the --slaveargs argument; when the
- subprocess exits, its return code, stdout and stderr are returned as a
- 3-tuple.
- """
- from subprocess import Popen, PIPE
- base_cmd = ([sys.executable] + support.args_from_interpreter_flags() +
- ['-X', 'faulthandler', '-m', 'test.regrtest'])
- # required to spawn a new process with PGO flag on/off
- if ns.pgo:
- base_cmd = base_cmd + ['--pgo']
- slaveargs = (
- (testname, ns.verbose, ns.quiet),
- dict(huntrleaks=ns.huntrleaks,
- use_resources=ns.use_resources,
- output_on_failure=ns.verbose3,
- timeout=ns.timeout, failfast=ns.failfast,
- match_tests=ns.match_tests, pgo=ns.pgo))
- # Running the child from the same working directory as regrtest's original
- # invocation ensures that TEMPDIR for the child is the same when
- # sysconfig.is_python_build() is true. See issue 15300.
- popen = Popen(base_cmd + ['--slaveargs', json.dumps(slaveargs)],
- stdout=PIPE, stderr=PIPE,
- universal_newlines=True,
- close_fds=(os.name != 'nt'),
- cwd=support.SAVEDCWD)
- stdout, stderr = popen.communicate()
- retcode = popen.wait()
- return retcode, stdout, stderr
-
-
-def main(tests=None, **kwargs):
- """Execute a test suite.
-
- This also parses command-line options and modifies its behavior
- accordingly.
-
- tests -- a list of strings containing test names (optional)
- testdir -- the directory in which to look for tests (optional)
-
- Users other than the Python test suite will certainly want to
- specify testdir; if it's omitted, the directory containing the
- Python test suite is searched for.
-
- If the tests argument is omitted, the tests listed on the
- command-line will be used. If that's empty, too, then all *.py
- files beginning with test_ will be used.
-
- The other default arguments (verbose, quiet, exclude,
- single, randomize, findleaks, use_resources, trace, coverdir,
- print_slow, and random_seed) allow programmers calling main()
- directly to set the values that would normally be set by flags
- on the command line.
- """
- # Display the Python traceback on fatal errors (e.g. segfault)
- faulthandler.enable(all_threads=True)
-
- # Display the Python traceback on SIGALRM or SIGUSR1 signal
- signals = []
- if hasattr(signal, 'SIGALRM'):
- signals.append(signal.SIGALRM)
- if hasattr(signal, 'SIGUSR1'):
- signals.append(signal.SIGUSR1)
- for signum in signals:
- faulthandler.register(signum, chain=True)
-
- replace_stdout()
-
- support.record_original_stdout(sys.stdout)
-
- ns = _parse_args(sys.argv[1:], **kwargs)
-
- if ns.huntrleaks:
- # Avoid false positives due to various caches
- # filling slowly with random data:
- warm_caches()
- if ns.memlimit is not None:
- support.set_memlimit(ns.memlimit)
- if ns.threshold is not None:
- import gc
- gc.set_threshold(ns.threshold)
- if ns.nowindows:
- print('The --nowindows (-n) option is deprecated. '
- 'Use -vv to display assertions in stderr.')
- try:
- import msvcrt
- except ImportError:
- pass
- else:
- msvcrt.SetErrorMode(msvcrt.SEM_FAILCRITICALERRORS|
- msvcrt.SEM_NOALIGNMENTFAULTEXCEPT|
- msvcrt.SEM_NOGPFAULTERRORBOX|
- msvcrt.SEM_NOOPENFILEERRORBOX)
- try:
- msvcrt.CrtSetReportMode
- except AttributeError:
- # release build
- pass
- else:
- for m in [msvcrt.CRT_WARN, msvcrt.CRT_ERROR, msvcrt.CRT_ASSERT]:
- if ns.verbose and ns.verbose >= 2:
- msvcrt.CrtSetReportMode(m, msvcrt.CRTDBG_MODE_FILE)
- msvcrt.CrtSetReportFile(m, msvcrt.CRTDBG_FILE_STDERR)
- else:
- msvcrt.CrtSetReportMode(m, 0)
- if ns.wait:
- input("Press any key to continue...")
-
- if ns.slaveargs is not None:
- args, kwargs = json.loads(ns.slaveargs)
- if kwargs.get('huntrleaks'):
- unittest.BaseTestSuite._cleanup = False
- try:
- result = runtest(*args, **kwargs)
- except KeyboardInterrupt:
- result = INTERRUPTED, ''
- except BaseException as e:
- traceback.print_exc()
- result = CHILD_ERROR, str(e)
- sys.stdout.flush()
- print() # Force a newline (just in case)
- print(json.dumps(result))
- sys.exit(0)
-
- good = []
- bad = []
- skipped = []
- resource_denieds = []
- environment_changed = []
- interrupted = False
-
- if ns.findleaks:
- try:
- import gc
- except ImportError:
- print('No GC available, disabling findleaks.')
- ns.findleaks = False
- else:
- # Uncomment the line below to report garbage that is not
- # freeable by reference counting alone. By default only
- # garbage that is not collectable by the GC is reported.
- #gc.set_debug(gc.DEBUG_SAVEALL)
- found_garbage = []
-
- if ns.huntrleaks:
- unittest.BaseTestSuite._cleanup = False
-
- if ns.single:
- filename = os.path.join(TEMPDIR, 'pynexttest')
- try:
- with open(filename, 'r') as fp:
- next_test = fp.read().strip()
- tests = [next_test]
- except OSError:
- pass
-
- if ns.fromfile:
- tests = []
- with open(os.path.join(support.SAVEDCWD, ns.fromfile)) as fp:
- count_pat = re.compile(r'\[\s*\d+/\s*\d+\]')
- for line in fp:
- line = count_pat.sub('', line)
- guts = line.split() # assuming no test has whitespace in its name
- if guts and not guts[0].startswith('#'):
- tests.extend(guts)
-
- # Strip .py extensions.
- removepy(ns.args)
- removepy(tests)
-
- stdtests = STDTESTS[:]
- nottests = NOTTESTS.copy()
- if ns.exclude:
- for arg in ns.args:
- if arg in stdtests:
- stdtests.remove(arg)
- nottests.add(arg)
- ns.args = []
-
- # For a partial run, we do not need to clutter the output.
- if (ns.verbose or ns.header or
- not (ns.pgo or ns.quiet or ns.single or tests or ns.args)):
- # Print basic platform information
- print("==", platform.python_implementation(), *sys.version.split())
- print("== ", platform.platform(aliased=True),
- "%s-endian" % sys.byteorder)
- print("== ", "hash algorithm:", sys.hash_info.algorithm,
- "64bit" if sys.maxsize > 2**32 else "32bit")
- print("== ", os.getcwd())
- print("Testing with flags:", sys.flags)
-
- # if testdir is set, then we are not running the python tests suite, so
- # don't add default tests to be executed or skipped (pass empty values)
- if ns.testdir:
- alltests = findtests(ns.testdir, list(), set())
- else:
- alltests = findtests(ns.testdir, stdtests, nottests)
-
- selected = tests or ns.args or alltests
- if ns.single:
- selected = selected[:1]
- try:
- next_single_test = alltests[alltests.index(selected[0])+1]
- except IndexError:
- next_single_test = None
- # Remove all the selected tests that precede start if it's set.
- if ns.start:
- try:
- del selected[:selected.index(ns.start)]
- except ValueError:
- print("Couldn't find starting test (%s), using all tests" % ns.start)
- if ns.randomize:
- if ns.random_seed is None:
- ns.random_seed = random.randrange(10000000)
- random.seed(ns.random_seed)
- print("Using random seed", ns.random_seed)
- random.shuffle(selected)
- if ns.trace:
- import trace, tempfile
- tracer = trace.Trace(ignoredirs=[sys.base_prefix, sys.base_exec_prefix,
- tempfile.gettempdir()],
- trace=False, count=True)
-
- test_times = []
- support.verbose = ns.verbose # Tell tests to be moderately quiet
- support.use_resources = ns.use_resources
- save_modules = sys.modules.keys()
-
- def accumulate_result(test, result):
- ok, test_time = result
- if ok not in (CHILD_ERROR, INTERRUPTED):
- test_times.append((test_time, test))
- if ok == PASSED:
- good.append(test)
- elif ok == FAILED:
- bad.append(test)
- elif ok == ENV_CHANGED:
- environment_changed.append(test)
- elif ok == SKIPPED:
- skipped.append(test)
- elif ok == RESOURCE_DENIED:
- skipped.append(test)
- resource_denieds.append(test)
-
- if ns.forever:
- def test_forever(tests=list(selected)):
- while True:
- for test in tests:
- yield test
- if bad:
- return
- tests = test_forever()
- test_count = ''
- test_count_width = 3
- else:
- tests = iter(selected)
- test_count = '/{}'.format(len(selected))
- test_count_width = len(test_count) - 1
-
- if ns.use_mp:
- try:
- from threading import Thread
- except ImportError:
- print("Multiprocess option requires thread support")
- sys.exit(2)
- from queue import Queue
- debug_output_pat = re.compile(r"\[\d+ refs, \d+ blocks\]$")
- output = Queue()
- pending = MultiprocessTests(tests)
- def work():
- # A worker thread.
- try:
- while True:
- try:
- test = next(pending)
- except StopIteration:
- output.put((None, None, None, None))
- return
- retcode, stdout, stderr = run_test_in_subprocess(test, ns)
- # Strip last refcount output line if it exists, since it
- # comes from the shutdown of the interpreter in the subcommand.
- stderr = debug_output_pat.sub("", stderr)
- stdout, _, result = stdout.strip().rpartition("\n")
- if retcode != 0:
- result = (CHILD_ERROR, "Exit code %s" % retcode)
- output.put((test, stdout.rstrip(), stderr.rstrip(), result))
- return
- if not result:
- output.put((None, None, None, None))
- return
- result = json.loads(result)
- output.put((test, stdout.rstrip(), stderr.rstrip(), result))
- except BaseException:
- output.put((None, None, None, None))
- raise
- workers = [Thread(target=work) for i in range(ns.use_mp)]
- for worker in workers:
- worker.start()
- finished = 0
- test_index = 1
- try:
- while finished < ns.use_mp:
- test, stdout, stderr, result = output.get()
- if test is None:
- finished += 1
- continue
- accumulate_result(test, result)
- if not ns.quiet:
- if bad and not ns.pgo:
- fmt = "[{1:{0}}{2}/{3}] {4}"
- else:
- fmt = "[{1:{0}}{2}] {4}"
- print(fmt.format(
- test_count_width, test_index, test_count,
- len(bad), test))
- if stdout:
- print(stdout)
- if stderr and not ns.pgo:
- print(stderr, file=sys.stderr)
- sys.stdout.flush()
- sys.stderr.flush()
- if result[0] == INTERRUPTED:
- raise KeyboardInterrupt
- if result[0] == CHILD_ERROR:
- raise Exception("Child error on {}: {}".format(test, result[1]))
- test_index += 1
- except KeyboardInterrupt:
- interrupted = True
- pending.interrupted = True
- for worker in workers:
- worker.join()
- else:
- for test_index, test in enumerate(tests, 1):
- if not ns.quiet:
- if bad and not ns.pgo:
- fmt = "[{1:{0}}{2}/{3}] {4}"
- else:
- fmt = "[{1:{0}}{2}] {4}"
- print(fmt.format(
- test_count_width, test_index, test_count, len(bad), test))
- sys.stdout.flush()
- if ns.trace:
- # If we're tracing code coverage, then we don't exit with status
- # if on a false return value from main.
- tracer.runctx('runtest(test, ns.verbose, ns.quiet, timeout=ns.timeout)',
- globals=globals(), locals=vars())
- else:
- try:
- result = runtest(test, ns.verbose, ns.quiet,
- ns.huntrleaks,
- output_on_failure=ns.verbose3,
- timeout=ns.timeout, failfast=ns.failfast,
- match_tests=ns.match_tests, pgo=ns.pgo)
- accumulate_result(test, result)
- except KeyboardInterrupt:
- interrupted = True
- break
- if ns.findleaks:
- gc.collect()
- if gc.garbage:
- print("Warning: test created", len(gc.garbage), end=' ')
- print("uncollectable object(s).")
- # move the uncollectable objects somewhere so we don't see
- # them again
- found_garbage.extend(gc.garbage)
- del gc.garbage[:]
- # Unload the newly imported modules (best effort finalization)
- for module in sys.modules.keys():
- if module not in save_modules and module.startswith("test."):
- support.unload(module)
-
- if interrupted and not ns.pgo:
- # print a newline after ^C
- print()
- print("Test suite interrupted by signal SIGINT.")
- omitted = set(selected) - set(good) - set(bad) - set(skipped)
- print(count(len(omitted), "test"), "omitted:")
- printlist(omitted)
- if good and not ns.quiet and not ns.pgo:
- if not bad and not skipped and not interrupted and len(good) > 1:
- print("All", end=' ')
- print(count(len(good), "test"), "OK.")
- if ns.print_slow:
- test_times.sort(reverse=True)
- print("10 slowest tests:")
- for time, test in test_times[:10]:
- print("%s: %.1fs" % (test, time))
- if bad and not ns.pgo:
- print(count(len(bad), "test"), "failed:")
- printlist(bad)
- if environment_changed and not ns.pgo:
- print("{} altered the execution environment:".format(
- count(len(environment_changed), "test")))
- printlist(environment_changed)
- if skipped and not ns.quiet and not ns.pgo:
- print(count(len(skipped), "test"), "skipped:")
- printlist(skipped)
-
- if ns.verbose2 and bad:
- print("Re-running failed tests in verbose mode")
- for test in bad[:]:
- if not ns.pgo:
- print("Re-running test %r in verbose mode" % test)
- sys.stdout.flush()
- try:
- ns.verbose = True
- ok = runtest(test, True, ns.quiet, ns.huntrleaks,
- timeout=ns.timeout, pgo=ns.pgo)
- except KeyboardInterrupt:
- # print a newline separate from the ^C
- print()
- break
- else:
- if ok[0] in {PASSED, ENV_CHANGED, SKIPPED, RESOURCE_DENIED}:
- bad.remove(test)
- else:
- if bad:
- print(count(len(bad), 'test'), "failed again:")
- printlist(bad)
-
- if ns.single:
- if next_single_test:
- with open(filename, 'w') as fp:
- fp.write(next_single_test + '\n')
- else:
- os.unlink(filename)
-
- if ns.trace:
- r = tracer.results()
- r.write_results(show_missing=True, summary=True, coverdir=ns.coverdir)
-
- if ns.runleaks:
- os.system("leaks %d" % os.getpid())
-
- sys.exit(len(bad) > 0 or interrupted)
-
-
-# small set of tests to determine if we have a basically functioning interpreter
-# (i.e. if any of these fail, then anything else is likely to follow)
-STDTESTS = [
- 'test_grammar',
- 'test_opcodes',
- 'test_dict',
- 'test_builtin',
- 'test_exceptions',
- 'test_types',
- 'test_unittest',
- 'test_doctest',
- 'test_doctest2',
- 'test_support'
-]
-
-# set of tests that we don't want to be executed when using regrtest
-NOTTESTS = set()
-
-def findtests(testdir=None, stdtests=STDTESTS, nottests=NOTTESTS):
- """Return a list of all applicable test modules."""
- testdir = findtestdir(testdir)
- names = os.listdir(testdir)
- tests = []
- others = set(stdtests) | nottests
- for name in names:
- mod, ext = os.path.splitext(name)
- if mod[:5] == "test_" and ext in (".py", "") and mod not in others:
- tests.append(mod)
- return stdtests + sorted(tests)
-
-# We do not use a generator so multiple threads can call next().
-class MultiprocessTests(object):
-
- """A thread-safe iterator over tests for multiprocess mode."""
-
- def __init__(self, tests):
- self.interrupted = False
- self.lock = threading.Lock()
- self.tests = tests
-
- def __iter__(self):
- return self
-
- def __next__(self):
- with self.lock:
- if self.interrupted:
- raise StopIteration('tests interrupted')
- return next(self.tests)
-
-def replace_stdout():
- """Set stdout encoder error handler to backslashreplace (as stderr error
- handler) to avoid UnicodeEncodeError when printing a traceback"""
- import atexit
-
- stdout = sys.stdout
- sys.stdout = open(stdout.fileno(), 'w',
- encoding=stdout.encoding,
- errors="backslashreplace",
- closefd=False,
- newline='\n')
-
- def restore_stdout():
- sys.stdout.close()
- sys.stdout = stdout
- atexit.register(restore_stdout)
-
-def runtest(test, verbose, quiet,
- huntrleaks=False, use_resources=None,
- output_on_failure=False, failfast=False, match_tests=None,
- timeout=None, *, pgo=False):
- """Run a single test.
-
- test -- the name of the test
- verbose -- if true, print more messages
- quiet -- if true, don't print 'skipped' messages (probably redundant)
- huntrleaks -- run multiple times to test for leaks; requires a debug
- build; a triple corresponding to -R's three arguments
- use_resources -- list of extra resources to use
- output_on_failure -- if true, display test output on failure
- timeout -- dump the traceback and exit if a test takes more than
- timeout seconds
- failfast, match_tests -- See regrtest command-line flags for these.
- pgo -- if true, do not print unnecessary info when running the test
- for Profile Guided Optimization build
-
- Returns the tuple result, test_time, where result is one of the constants:
- INTERRUPTED KeyboardInterrupt when run under -j
- RESOURCE_DENIED test skipped because resource denied
- SKIPPED test skipped for some other reason
- ENV_CHANGED test failed because it changed the execution environment
- FAILED test failed
- PASSED test passed
- """
- if use_resources is not None:
- support.use_resources = use_resources
- use_timeout = (timeout is not None)
- if use_timeout:
- faulthandler.dump_traceback_later(timeout, exit=True)
- try:
- support.match_tests = match_tests
- if failfast:
- support.failfast = True
- if output_on_failure:
- support.verbose = True
-
- # Reuse the same instance to all calls to runtest(). Some
- # tests keep a reference to sys.stdout or sys.stderr
- # (eg. test_argparse).
- if runtest.stringio is None:
- stream = io.StringIO()
- runtest.stringio = stream
- else:
- stream = runtest.stringio
- stream.seek(0)
- stream.truncate()
-
- orig_stdout = sys.stdout
- orig_stderr = sys.stderr
- try:
- sys.stdout = stream
- sys.stderr = stream
- result = runtest_inner(test, verbose, quiet, huntrleaks,
- display_failure=False, pgo=pgo)
- if result[0] == FAILED and not pgo:
- output = stream.getvalue()
- orig_stderr.write(output)
- orig_stderr.flush()
- finally:
- sys.stdout = orig_stdout
- sys.stderr = orig_stderr
- else:
- support.verbose = verbose # Tell tests to be moderately quiet
- result = runtest_inner(test, verbose, quiet, huntrleaks,
- display_failure=not verbose, pgo=pgo)
- return result
- finally:
- if use_timeout:
- faulthandler.cancel_dump_traceback_later()
- cleanup_test_droppings(test, verbose)
-runtest.stringio = None
-
-# Unit tests are supposed to leave the execution environment unchanged
-# once they complete. But sometimes tests have bugs, especially when
-# tests fail, and the changes to environment go on to mess up other
-# tests. This can cause issues with buildbot stability, since tests
-# are run in random order and so problems may appear to come and go.
-# There are a few things we can save and restore to mitigate this, and
-# the following context manager handles this task.
-
-class saved_test_environment:
- """Save bits of the test environment and restore them at block exit.
-
- with saved_test_environment(testname, verbose, quiet):
- #stuff
-
- Unless quiet is True, a warning is printed to stderr if any of
- the saved items was changed by the test. The attribute 'changed'
- is initially False, but is set to True if a change is detected.
-
- If verbose is more than 1, the before and after state of changed
- items is also printed.
- """
-
- changed = False
-
- def __init__(self, testname, verbose=0, quiet=False, *, pgo=False):
- self.testname = testname
- self.verbose = verbose
- self.quiet = quiet
- self.pgo = pgo
-
- # To add things to save and restore, add a name XXX to the resources list
- # and add corresponding get_XXX/restore_XXX functions. get_XXX should
- # return the value to be saved and compared against a second call to the
- # get function when test execution completes. restore_XXX should accept
- # the saved value and restore the resource using it. It will be called if
- # and only if a change in the value is detected.
- #
- # Note: XXX will have any '.' replaced with '_' characters when determining
- # the corresponding method names.
-
- resources = ('sys.argv', 'cwd', 'sys.stdin', 'sys.stdout', 'sys.stderr',
- 'os.environ', 'sys.path', 'sys.path_hooks', '__import__',
- 'warnings.filters', 'asyncore.socket_map',
- 'logging._handlers', 'logging._handlerList', 'sys.gettrace',
- 'sys.warnoptions',
- # multiprocessing.process._cleanup() may release ref
- # to a thread, so check processes first.
- 'multiprocessing.process._dangling', 'threading._dangling',
- 'sysconfig._CONFIG_VARS', 'sysconfig._INSTALL_SCHEMES',
- 'files', 'locale', 'warnings.showwarning',
- )
-
- def get_sys_argv(self):
- return id(sys.argv), sys.argv, sys.argv[:]
- def restore_sys_argv(self, saved_argv):
- sys.argv = saved_argv[1]
- sys.argv[:] = saved_argv[2]
-
- def get_cwd(self):
- return os.getcwd()
- def restore_cwd(self, saved_cwd):
- os.chdir(saved_cwd)
-
- def get_sys_stdout(self):
- return sys.stdout
- def restore_sys_stdout(self, saved_stdout):
- sys.stdout = saved_stdout
-
- def get_sys_stderr(self):
- return sys.stderr
- def restore_sys_stderr(self, saved_stderr):
- sys.stderr = saved_stderr
-
- def get_sys_stdin(self):
- return sys.stdin
- def restore_sys_stdin(self, saved_stdin):
- sys.stdin = saved_stdin
-
- def get_os_environ(self):
- return id(os.environ), os.environ, dict(os.environ)
- def restore_os_environ(self, saved_environ):
- os.environ = saved_environ[1]
- os.environ.clear()
- os.environ.update(saved_environ[2])
-
- def get_sys_path(self):
- return id(sys.path), sys.path, sys.path[:]
- def restore_sys_path(self, saved_path):
- sys.path = saved_path[1]
- sys.path[:] = saved_path[2]
-
- def get_sys_path_hooks(self):
- return id(sys.path_hooks), sys.path_hooks, sys.path_hooks[:]
- def restore_sys_path_hooks(self, saved_hooks):
- sys.path_hooks = saved_hooks[1]
- sys.path_hooks[:] = saved_hooks[2]
-
- def get_sys_gettrace(self):
- return sys.gettrace()
- def restore_sys_gettrace(self, trace_fxn):
- sys.settrace(trace_fxn)
-
- def get___import__(self):
- return builtins.__import__
- def restore___import__(self, import_):
- builtins.__import__ = import_
-
- def get_warnings_filters(self):
- return id(warnings.filters), warnings.filters, warnings.filters[:]
- def restore_warnings_filters(self, saved_filters):
- warnings.filters = saved_filters[1]
- warnings.filters[:] = saved_filters[2]
-
- def get_asyncore_socket_map(self):
- asyncore = sys.modules.get('asyncore')
- # XXX Making a copy keeps objects alive until __exit__ gets called.
- return asyncore and asyncore.socket_map.copy() or {}
- def restore_asyncore_socket_map(self, saved_map):
- asyncore = sys.modules.get('asyncore')
- if asyncore is not None:
- asyncore.close_all(ignore_all=True)
- asyncore.socket_map.update(saved_map)
-
- def get_shutil_archive_formats(self):
- # we could call get_archives_formats() but that only returns the
- # registry keys; we want to check the values too (the functions that
- # are registered)
- return shutil._ARCHIVE_FORMATS, shutil._ARCHIVE_FORMATS.copy()
- def restore_shutil_archive_formats(self, saved):
- shutil._ARCHIVE_FORMATS = saved[0]
- shutil._ARCHIVE_FORMATS.clear()
- shutil._ARCHIVE_FORMATS.update(saved[1])
-
- def get_shutil_unpack_formats(self):
- return shutil._UNPACK_FORMATS, shutil._UNPACK_FORMATS.copy()
- def restore_shutil_unpack_formats(self, saved):
- shutil._UNPACK_FORMATS = saved[0]
- shutil._UNPACK_FORMATS.clear()
- shutil._UNPACK_FORMATS.update(saved[1])
-
- def get_logging__handlers(self):
- # _handlers is a WeakValueDictionary
- return id(logging._handlers), logging._handlers, logging._handlers.copy()
- def restore_logging__handlers(self, saved_handlers):
- # Can't easily revert the logging state
- pass
-
- def get_logging__handlerList(self):
- # _handlerList is a list of weakrefs to handlers
- return id(logging._handlerList), logging._handlerList, logging._handlerList[:]
- def restore_logging__handlerList(self, saved_handlerList):
- # Can't easily revert the logging state
- pass
-
- def get_sys_warnoptions(self):
- return id(sys.warnoptions), sys.warnoptions, sys.warnoptions[:]
- def restore_sys_warnoptions(self, saved_options):
- sys.warnoptions = saved_options[1]
- sys.warnoptions[:] = saved_options[2]
-
- # Controlling dangling references to Thread objects can make it easier
- # to track reference leaks.
- def get_threading__dangling(self):
- if not threading:
- return None
- # This copies the weakrefs without making any strong reference
- return threading._dangling.copy()
- def restore_threading__dangling(self, saved):
- if not threading:
- return
- threading._dangling.clear()
- threading._dangling.update(saved)
-
- # Same for Process objects
- def get_multiprocessing_process__dangling(self):
- if not multiprocessing:
- return None
- # Unjoined process objects can survive after process exits
- multiprocessing.process._cleanup()
- # This copies the weakrefs without making any strong reference
- return multiprocessing.process._dangling.copy()
- def restore_multiprocessing_process__dangling(self, saved):
- if not multiprocessing:
- return
- multiprocessing.process._dangling.clear()
- multiprocessing.process._dangling.update(saved)
-
- def get_sysconfig__CONFIG_VARS(self):
- # make sure the dict is initialized
- sysconfig.get_config_var('prefix')
- return (id(sysconfig._CONFIG_VARS), sysconfig._CONFIG_VARS,
- dict(sysconfig._CONFIG_VARS))
- def restore_sysconfig__CONFIG_VARS(self, saved):
- sysconfig._CONFIG_VARS = saved[1]
- sysconfig._CONFIG_VARS.clear()
- sysconfig._CONFIG_VARS.update(saved[2])
-
- def get_sysconfig__INSTALL_SCHEMES(self):
- return (id(sysconfig._INSTALL_SCHEMES), sysconfig._INSTALL_SCHEMES,
- sysconfig._INSTALL_SCHEMES.copy())
- def restore_sysconfig__INSTALL_SCHEMES(self, saved):
- sysconfig._INSTALL_SCHEMES = saved[1]
- sysconfig._INSTALL_SCHEMES.clear()
- sysconfig._INSTALL_SCHEMES.update(saved[2])
-
- def get_files(self):
- return sorted(fn + ('/' if os.path.isdir(fn) else '')
- for fn in os.listdir())
- def restore_files(self, saved_value):
- fn = support.TESTFN
- if fn not in saved_value and (fn + '/') not in saved_value:
- if os.path.isfile(fn):
- support.unlink(fn)
- elif os.path.isdir(fn):
- support.rmtree(fn)
-
- _lc = [getattr(locale, lc) for lc in dir(locale)
- if lc.startswith('LC_')]
- def get_locale(self):
- pairings = []
- for lc in self._lc:
- try:
- pairings.append((lc, locale.setlocale(lc, None)))
- except (TypeError, ValueError):
- continue
- return pairings
- def restore_locale(self, saved):
- for lc, setting in saved:
- locale.setlocale(lc, setting)
-
- def get_warnings_showwarning(self):
- return warnings.showwarning
- def restore_warnings_showwarning(self, fxn):
- warnings.showwarning = fxn
-
- def resource_info(self):
- for name in self.resources:
- method_suffix = name.replace('.', '_')
- get_name = 'get_' + method_suffix
- restore_name = 'restore_' + method_suffix
- yield name, getattr(self, get_name), getattr(self, restore_name)
-
- def __enter__(self):
- self.saved_values = dict((name, get()) for name, get, restore
- in self.resource_info())
- return self
-
- def __exit__(self, exc_type, exc_val, exc_tb):
- saved_values = self.saved_values
- del self.saved_values
- for name, get, restore in self.resource_info():
- current = get()
- original = saved_values.pop(name)
- # Check for changes to the resource's value
- if current != original:
- self.changed = True
- restore(original)
- if not self.quiet and not self.pgo:
- print("Warning -- {} was modified by {}".format(
- name, self.testname),
- file=sys.stderr)
- if self.verbose > 1 and not self.pgo:
- print(" Before: {}\n After: {} ".format(
- original, current),
- file=sys.stderr)
- return False
-
-
-def runtest_inner(test, verbose, quiet,
- huntrleaks=False, display_failure=True, pgo=False):
- support.unload(test)
-
- test_time = 0.0
- refleak = False # True if the test leaked references.
- try:
- if test.startswith('test.'):
- abstest = test
- else:
- # Always import it from the test package
- abstest = 'test.' + test
- with saved_test_environment(test, verbose, quiet, pgo=pgo) as environment:
- start_time = time.time()
- the_module = importlib.import_module(abstest)
- # If the test has a test_main, that will run the appropriate
- # tests. If not, use normal unittest test loading.
- test_runner = getattr(the_module, "test_main", None)
- if test_runner is None:
- def test_runner():
- loader = unittest.TestLoader()
- tests = loader.loadTestsFromModule(the_module)
- for error in loader.errors:
- print(error, file=sys.stderr)
- if loader.errors:
- raise Exception("errors while loading tests")
- support.run_unittest(tests)
- test_runner()
- if huntrleaks:
- refleak = dash_R(the_module, test, test_runner, huntrleaks)
- test_time = time.time() - start_time
- except support.ResourceDenied as msg:
- if not quiet and not pgo:
- print(test, "skipped --", msg)
- sys.stdout.flush()
- return RESOURCE_DENIED, test_time
- except unittest.SkipTest as msg:
- if not quiet and not pgo:
- print(test, "skipped --", msg)
- sys.stdout.flush()
- return SKIPPED, test_time
- except KeyboardInterrupt:
- raise
- except support.TestFailed as msg:
- if not pgo:
- if display_failure:
- print("test", test, "failed --", msg, file=sys.stderr)
- else:
- print("test", test, "failed", file=sys.stderr)
- sys.stderr.flush()
- return FAILED, test_time
- except:
- msg = traceback.format_exc()
- if not pgo:
- print("test", test, "crashed --", msg, file=sys.stderr)
- sys.stderr.flush()
- return FAILED, test_time
- else:
- if refleak:
- return FAILED, test_time
- if environment.changed:
- return ENV_CHANGED, test_time
- return PASSED, test_time
-
-def cleanup_test_droppings(testname, verbose):
- import shutil
- import stat
- import gc
-
- # First kill any dangling references to open files etc.
- # This can also issue some ResourceWarnings which would otherwise get
- # triggered during the following test run, and possibly produce failures.
- gc.collect()
-
- # Try to clean up junk commonly left behind. While tests shouldn't leave
- # any files or directories behind, when a test fails that can be tedious
- # for it to arrange. The consequences can be especially nasty on Windows,
- # since if a test leaves a file open, it cannot be deleted by name (while
- # there's nothing we can do about that here either, we can display the
- # name of the offending test, which is a real help).
- for name in (support.TESTFN,
- "db_home",
- ):
- if not os.path.exists(name):
- continue
-
- if os.path.isdir(name):
- kind, nuker = "directory", shutil.rmtree
- elif os.path.isfile(name):
- kind, nuker = "file", os.unlink
- else:
- raise SystemError("os.path says %r exists but is neither "
- "directory nor file" % name)
-
- if verbose:
- print("%r left behind %s %r" % (testname, kind, name))
- try:
- # if we have chmod, fix possible permissions problems
- # that might prevent cleanup
- if (hasattr(os, 'chmod')):
- os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
- nuker(name)
- except Exception as msg:
- print(("%r left behind %s %r and it couldn't be "
- "removed: %s" % (testname, kind, name, msg)), file=sys.stderr)
-
-def dash_R(the_module, test, indirect_test, huntrleaks):
- """Run a test multiple times, looking for reference leaks.
-
- Returns:
- False if the test didn't leak references; True if we detected refleaks.
- """
- # This code is hackish and inelegant, but it seems to do the job.
- import copyreg
- import collections.abc
-
- if not hasattr(sys, 'gettotalrefcount'):
- raise Exception("Tracking reference leaks requires a debug build "
- "of Python")
-
- # Save current values for dash_R_cleanup() to restore.
- fs = warnings.filters[:]
- ps = copyreg.dispatch_table.copy()
- pic = sys.path_importer_cache.copy()
- try:
- import zipimport
- except ImportError:
- zdc = None # Run unmodified on platforms without zipimport support
- else:
- zdc = zipimport._zip_directory_cache.copy()
- abcs = {}
- for abc in [getattr(collections.abc, a) for a in collections.abc.__all__]:
- if not isabstract(abc):
- continue
- for obj in abc.__subclasses__() + [abc]:
- abcs[obj] = obj._abc_registry.copy()
-
- nwarmup, ntracked, fname = huntrleaks
- fname = os.path.join(support.SAVEDCWD, fname)
- repcount = nwarmup + ntracked
- rc_deltas = [0] * repcount
- alloc_deltas = [0] * repcount
-
- print("beginning", repcount, "repetitions", file=sys.stderr)
- print(("1234567890"*(repcount//10 + 1))[:repcount], file=sys.stderr)
- sys.stderr.flush()
- for i in range(repcount):
- indirect_test()
- alloc_after, rc_after = dash_R_cleanup(fs, ps, pic, zdc, abcs)
- sys.stderr.write('.')
- sys.stderr.flush()
- if i >= nwarmup:
- rc_deltas[i] = rc_after - rc_before
- alloc_deltas[i] = alloc_after - alloc_before
- alloc_before, rc_before = alloc_after, rc_after
- print(file=sys.stderr)
- # These checkers return False on success, True on failure
- def check_rc_deltas(deltas):
- return any(deltas)
- def check_alloc_deltas(deltas):
- # At least 1/3rd of 0s
- if 3 * deltas.count(0) < len(deltas):
- return True
- # Nothing else than 1s, 0s and -1s
- if not set(deltas) <= {1,0,-1}:
- return True
- return False
- failed = False
- for deltas, item_name, checker in [
- (rc_deltas, 'references', check_rc_deltas),
- (alloc_deltas, 'memory blocks', check_alloc_deltas)]:
- if checker(deltas):
- msg = '%s leaked %s %s, sum=%s' % (
- test, deltas[nwarmup:], item_name, sum(deltas))
- print(msg, file=sys.stderr)
- sys.stderr.flush()
- with open(fname, "a") as refrep:
- print(msg, file=refrep)
- refrep.flush()
- failed = True
- return failed
-
-def dash_R_cleanup(fs, ps, pic, zdc, abcs):
- import gc, copyreg
- import _strptime, linecache
- import urllib.parse, urllib.request, mimetypes, doctest
- import struct, filecmp, collections.abc
- from distutils.dir_util import _path_created
- from weakref import WeakSet
-
- # Clear the warnings registry, so they can be displayed again
- for mod in sys.modules.values():
- if hasattr(mod, '__warningregistry__'):
- del mod.__warningregistry__
-
- # Restore some original values.
- warnings.filters[:] = fs
- copyreg.dispatch_table.clear()
- copyreg.dispatch_table.update(ps)
- sys.path_importer_cache.clear()
- sys.path_importer_cache.update(pic)
- try:
- import zipimport
- except ImportError:
- pass # Run unmodified on platforms without zipimport support
- else:
- zipimport._zip_directory_cache.clear()
- zipimport._zip_directory_cache.update(zdc)
-
- # clear type cache
- sys._clear_type_cache()
-
- # Clear ABC registries, restoring previously saved ABC registries.
- for abc in [getattr(collections.abc, a) for a in collections.abc.__all__]:
- if not isabstract(abc):
- continue
- for obj in abc.__subclasses__() + [abc]:
- obj._abc_registry = abcs.get(obj, WeakSet()).copy()
- obj._abc_cache.clear()
- obj._abc_negative_cache.clear()
-
- # Flush standard output, so that buffered data is sent to the OS and
- # associated Python objects are reclaimed.
- for stream in (sys.stdout, sys.stderr, sys.__stdout__, sys.__stderr__):
- if stream is not None:
- stream.flush()
-
- # Clear assorted module caches.
- _path_created.clear()
- re.purge()
- _strptime._regex_cache.clear()
- urllib.parse.clear_cache()
- urllib.request.urlcleanup()
- linecache.clearcache()
- mimetypes._default_mime_types()
- filecmp._cache.clear()
- struct._clearcache()
- doctest.master = None
- try:
- import ctypes
- except ImportError:
- # Don't worry about resetting the cache if ctypes is not supported
- pass
- else:
- ctypes._reset_cache()
-
- # Collect cyclic trash and read memory statistics immediately after.
- func1 = sys.getallocatedblocks
- func2 = sys.gettotalrefcount
- gc.collect()
- return func1(), func2()
-
-def warm_caches():
- # char cache
- s = bytes(range(256))
- for i in range(256):
- s[i:i+1]
- # unicode cache
- x = [chr(i) for i in range(256)]
- # int cache
- x = list(range(-5, 257))
-
-def findtestdir(path=None):
- return path or os.path.dirname(__file__) or os.curdir
-
-def removepy(names):
- if not names:
- return
- for idx, name in enumerate(names):
- basename, ext = os.path.splitext(name)
- if ext == '.py':
- names[idx] = basename
-
-def count(n, word):
- if n == 1:
- return "%d %s" % (n, word)
- else:
- return "%d %ss" % (n, word)
-
-def printlist(x, width=70, indent=4):
- """Print the elements of iterable x to stdout.
-
- Optional arg width (default 70) is the maximum line length.
- Optional arg indent (default 4) is the number of blanks with which to
- begin each line.
- """
-
- from textwrap import fill
- blanks = ' ' * indent
- # Print the sorted list: 'x' may be a '--random' list or a set()
- print(fill(' '.join(str(elt) for elt in sorted(x)), width,
- initial_indent=blanks, subsequent_indent=blanks))
-
-
-def main_in_temp_cwd():
- """Run main() in a temporary working directory."""
- if sysconfig.is_python_build():
- try:
- os.mkdir(TEMPDIR)
- except FileExistsError:
- pass
-
- # Define a writable temp dir that will be used as cwd while running
- # the tests. The name of the dir includes the pid to allow parallel
- # testing (see the -j option).
- test_cwd = 'test_python_{}'.format(os.getpid())
- test_cwd = os.path.join(TEMPDIR, test_cwd)
-
- # Run the tests in a context manager that temporarily changes the CWD to a
- # temporary and writable directory. If it's not possible to create or
- # change the CWD, the original CWD will be used. The original CWD is
- # available from support.SAVEDCWD.
- with support.temp_cwd(test_cwd, quiet=True):
- main()
+from test.libregrtest import main, main_in_temp_cwd
if __name__ == '__main__':
diff --git a/Lib/test/test_argparse.py b/Lib/test/test_argparse.py
index 27bfad5fb6..893ec394f6 100644
--- a/Lib/test/test_argparse.py
+++ b/Lib/test/test_argparse.py
@@ -4512,6 +4512,21 @@ class TestStrings(TestCase):
string = "Namespace(bar='spam', foo=42)"
self.assertStringEqual(ns, string)
+ def test_namespace_starkwargs_notidentifier(self):
+ ns = argparse.Namespace(**{'"': 'quote'})
+ string = """Namespace(**{'"': 'quote'})"""
+ self.assertStringEqual(ns, string)
+
+ def test_namespace_kwargs_and_starkwargs_notidentifier(self):
+ ns = argparse.Namespace(a=1, **{'"': 'quote'})
+ string = """Namespace(a=1, **{'"': 'quote'})"""
+ self.assertStringEqual(ns, string)
+
+ def test_namespace_starkwargs_identifier(self):
+ ns = argparse.Namespace(**{'valid': True})
+ string = "Namespace(valid=True)"
+ self.assertStringEqual(ns, string)
+
def test_parser(self):
parser = argparse.ArgumentParser(prog='PROG')
string = (
diff --git a/Lib/test/test_binascii.py b/Lib/test/test_binascii.py
index 8367afe083..fbc933e4e6 100644
--- a/Lib/test/test_binascii.py
+++ b/Lib/test/test_binascii.py
@@ -159,11 +159,25 @@ class BinASCIITest(unittest.TestCase):
# Then calculate the hexbin4 binary-to-ASCII translation
rle = binascii.rlecode_hqx(self.data)
a = binascii.b2a_hqx(self.type2test(rle))
+
b, _ = binascii.a2b_hqx(self.type2test(a))
res = binascii.rledecode_hqx(b)
-
self.assertEqual(res, self.rawdata)
+ def test_rle(self):
+ # test repetition with a repetition longer than the limit of 255
+ data = (b'a' * 100 + b'b' + b'c' * 300)
+
+ encoded = binascii.rlecode_hqx(data)
+ self.assertEqual(encoded,
+ (b'a\x90d' # 'a' * 100
+ b'b' # 'b'
+ b'c\x90\xff' # 'c' * 255
+ b'c\x90-')) # 'c' * 45
+
+ decoded = binascii.rledecode_hqx(encoded)
+ self.assertEqual(decoded, data)
+
def test_hex(self):
# test hexlification
s = b'{s\005\000\000\000worldi\002\000\000\000s\005\000\000\000helloi\001\000\000\0000'
@@ -262,6 +276,16 @@ class BinASCIITest(unittest.TestCase):
# non-ASCII string
self.assertRaises(ValueError, a2b, "\x80")
+ def test_b2a_base64_newline(self):
+ # Issue #25357: test newline parameter
+ b = self.type2test(b'hello')
+ self.assertEqual(binascii.b2a_base64(b),
+ b'aGVsbG8=\n')
+ self.assertEqual(binascii.b2a_base64(b, newline=True),
+ b'aGVsbG8=\n')
+ self.assertEqual(binascii.b2a_base64(b, newline=False),
+ b'aGVsbG8=')
+
class ArrayBinASCIITest(BinASCIITest):
def type2test(self, s):
diff --git a/Lib/test/test_bytes.py b/Lib/test/test_bytes.py
index 53a80f4b47..87799dfd19 100644
--- a/Lib/test/test_bytes.py
+++ b/Lib/test/test_bytes.py
@@ -301,6 +301,20 @@ class BaseBytesTest:
self.assertRaises(ValueError, self.type2test.fromhex, '\x00')
self.assertRaises(ValueError, self.type2test.fromhex, '12 \x00 34')
+ for data, pos in (
+ # invalid first hexadecimal character
+ ('12 x4 56', 3),
+ # invalid second hexadecimal character
+ ('12 3x 56', 4),
+ # two invalid hexadecimal characters
+ ('12 xy 56', 3),
+ # test non-ASCII string
+ ('12 3\xff 56', 4),
+ ):
+ with self.assertRaises(ValueError) as cm:
+ self.type2test.fromhex(data)
+ self.assertIn('at position %s' % pos, str(cm.exception))
+
def test_hex(self):
self.assertRaises(TypeError, self.type2test.hex)
self.assertRaises(TypeError, self.type2test.hex, 1)
@@ -782,26 +796,107 @@ class BytesTest(BaseBytesTest, unittest.TestCase):
# Test PyBytes_FromFormat()
def test_from_format(self):
- test.support.import_module('ctypes')
- from ctypes import pythonapi, py_object, c_int, c_char_p
+ ctypes = test.support.import_module('ctypes')
+ _testcapi = test.support.import_module('_testcapi')
+ from ctypes import pythonapi, py_object
+ from ctypes import (
+ c_int, c_uint,
+ c_long, c_ulong,
+ c_size_t, c_ssize_t,
+ c_char_p)
+
PyBytes_FromFormat = pythonapi.PyBytes_FromFormat
PyBytes_FromFormat.restype = py_object
+ # basic tests
self.assertEqual(PyBytes_FromFormat(b'format'),
b'format')
-
+ self.assertEqual(PyBytes_FromFormat(b'Hello %s !', b'world'),
+ b'Hello world !')
+
+ # test formatters
+ self.assertEqual(PyBytes_FromFormat(b'c=%c', c_int(0)),
+ b'c=\0')
+ self.assertEqual(PyBytes_FromFormat(b'c=%c', c_int(ord('@'))),
+ b'c=@')
+ self.assertEqual(PyBytes_FromFormat(b'c=%c', c_int(255)),
+ b'c=\xff')
+ self.assertEqual(PyBytes_FromFormat(b'd=%d ld=%ld zd=%zd',
+ c_int(1), c_long(2),
+ c_size_t(3)),
+ b'd=1 ld=2 zd=3')
+ self.assertEqual(PyBytes_FromFormat(b'd=%d ld=%ld zd=%zd',
+ c_int(-1), c_long(-2),
+ c_size_t(-3)),
+ b'd=-1 ld=-2 zd=-3')
+ self.assertEqual(PyBytes_FromFormat(b'u=%u lu=%lu zu=%zu',
+ c_uint(123), c_ulong(456),
+ c_size_t(789)),
+ b'u=123 lu=456 zu=789')
+ self.assertEqual(PyBytes_FromFormat(b'i=%i', c_int(123)),
+ b'i=123')
+ self.assertEqual(PyBytes_FromFormat(b'i=%i', c_int(-123)),
+ b'i=-123')
+ self.assertEqual(PyBytes_FromFormat(b'x=%x', c_int(0xabc)),
+ b'x=abc')
+
+ sizeof_ptr = ctypes.sizeof(c_char_p)
+
+ if os.name == 'nt':
+ # Windows (MSCRT)
+ ptr_format = '0x%0{}X'.format(2 * sizeof_ptr)
+ def ptr_formatter(ptr):
+ return (ptr_format % ptr)
+ else:
+ # UNIX (glibc)
+ def ptr_formatter(ptr):
+ return '%#x' % ptr
+
+ ptr = 0xabcdef
+ self.assertEqual(PyBytes_FromFormat(b'ptr=%p', c_char_p(ptr)),
+ ('ptr=' + ptr_formatter(ptr)).encode('ascii'))
+ self.assertEqual(PyBytes_FromFormat(b's=%s', c_char_p(b'cstr')),
+ b's=cstr')
+
+ # test minimum and maximum integer values
+ size_max = c_size_t(-1).value
+ for formatstr, ctypes_type, value, py_formatter in (
+ (b'%d', c_int, _testcapi.INT_MIN, str),
+ (b'%d', c_int, _testcapi.INT_MAX, str),
+ (b'%ld', c_long, _testcapi.LONG_MIN, str),
+ (b'%ld', c_long, _testcapi.LONG_MAX, str),
+ (b'%lu', c_ulong, _testcapi.ULONG_MAX, str),
+ (b'%zd', c_ssize_t, _testcapi.PY_SSIZE_T_MIN, str),
+ (b'%zd', c_ssize_t, _testcapi.PY_SSIZE_T_MAX, str),
+ (b'%zu', c_size_t, size_max, str),
+ (b'%p', c_char_p, size_max, ptr_formatter),
+ ):
+ self.assertEqual(PyBytes_FromFormat(formatstr, ctypes_type(value)),
+ py_formatter(value).encode('ascii')),
+
+ # width and precision (width is currently ignored)
+ self.assertEqual(PyBytes_FromFormat(b'%5s', b'a'),
+ b'a')
+ self.assertEqual(PyBytes_FromFormat(b'%.3s', b'abcdef'),
+ b'abc')
+
+ # '%%' formatter
+ self.assertEqual(PyBytes_FromFormat(b'%%'),
+ b'%')
+ self.assertEqual(PyBytes_FromFormat(b'[%%]'),
+ b'[%]')
+ self.assertEqual(PyBytes_FromFormat(b'%%%c', c_int(ord('_'))),
+ b'%_')
+ self.assertEqual(PyBytes_FromFormat(b'%%s'),
+ b'%s')
+
+ # Invalid formats and partial formatting
self.assertEqual(PyBytes_FromFormat(b'%'), b'%')
- self.assertEqual(PyBytes_FromFormat(b'%%'), b'%')
- self.assertEqual(PyBytes_FromFormat(b'%%s'), b'%s')
- self.assertEqual(PyBytes_FromFormat(b'[%%]'), b'[%]')
- self.assertEqual(PyBytes_FromFormat(b'%%%c', c_int(ord('_'))), b'%_')
-
- self.assertEqual(PyBytes_FromFormat(b'c:%c', c_int(255)),
- b'c:\xff')
- self.assertEqual(PyBytes_FromFormat(b's:%s', c_char_p(b'cstr')),
- b's:cstr')
+ self.assertEqual(PyBytes_FromFormat(b'x=%i y=%', c_int(2), c_int(3)),
+ b'x=2 y=%')
- # Issue #19969
+ # Issue #19969: %c must raise OverflowError for values
+ # not in the range [0; 255]
self.assertRaises(OverflowError,
PyBytes_FromFormat, b'%c', c_int(-1))
self.assertRaises(OverflowError,
diff --git a/Lib/test/test_calendar.py b/Lib/test/test_calendar.py
index 80ed632588..d9d3128ea8 100644
--- a/Lib/test/test_calendar.py
+++ b/Lib/test/test_calendar.py
@@ -702,19 +702,19 @@ class CommandLineTestCase(unittest.TestCase):
def assertFailure(self, *args):
rc, stdout, stderr = assert_python_failure('-m', 'calendar', *args)
- self.assertIn(b'Usage:', stderr)
+ self.assertIn(b'usage:', stderr)
self.assertEqual(rc, 2)
def test_help(self):
stdout = self.run_ok('-h')
- self.assertIn(b'Usage:', stdout)
+ self.assertIn(b'usage:', stdout)
self.assertIn(b'calendar.py', stdout)
self.assertIn(b'--help', stdout)
def test_illegal_arguments(self):
self.assertFailure('-z')
- #self.assertFailure('spam')
- #self.assertFailure('2004', 'spam')
+ self.assertFailure('spam')
+ self.assertFailure('2004', 'spam')
self.assertFailure('-t', 'html', '2004', '1')
def test_output_current_year(self):
diff --git a/Lib/test/test_codecs.py b/Lib/test/test_codecs.py
index b93e0ab0e2..4740b682aa 100644
--- a/Lib/test/test_codecs.py
+++ b/Lib/test/test_codecs.py
@@ -27,6 +27,7 @@ def coding_checker(self, coder):
self.assertEqual(coder(input), (expect, len(input)))
return check
+
class Queue(object):
"""
queue: write bytes at one end, read bytes from the other end
@@ -47,6 +48,7 @@ class Queue(object):
self._buffer = self._buffer[size:]
return s
+
class MixInCheckStateHandling:
def check_state_handling_decode(self, encoding, u, s):
for i in range(len(s)+1):
@@ -80,6 +82,7 @@ class MixInCheckStateHandling:
part2 = d.encode(u[i:], True)
self.assertEqual(s, part1+part2)
+
class ReadTest(MixInCheckStateHandling):
def check_partial(self, input, partialresults):
# get a StreamReader for the encoding and feed the bytestring version
@@ -358,6 +361,12 @@ class ReadTest(MixInCheckStateHandling):
self.assertEqual("[\uDC80]".encode(self.encoding, "replace"),
"[?]".encode(self.encoding))
+ # sequential surrogate characters
+ self.assertEqual("[\uD800\uDC80]".encode(self.encoding, "ignore"),
+ "[]".encode(self.encoding))
+ self.assertEqual("[\uD800\uDC80]".encode(self.encoding, "replace"),
+ "[??]".encode(self.encoding))
+
bom = "".encode(self.encoding)
for before, after in [("\U00010fff", "A"), ("[", "]"),
("A", "\U00010fff")]:
@@ -383,6 +392,7 @@ class ReadTest(MixInCheckStateHandling):
self.assertEqual(test_sequence.decode(self.encoding, "backslashreplace"),
before + backslashreplace + after)
+
class UTF32Test(ReadTest, unittest.TestCase):
encoding = "utf-32"
if sys.byteorder == 'little':
@@ -478,6 +488,7 @@ class UTF32Test(ReadTest, unittest.TestCase):
self.assertEqual('\U00010000' * 1024,
codecs.utf_32_decode(encoded_be)[0])
+
class UTF32LETest(ReadTest, unittest.TestCase):
encoding = "utf-32-le"
ill_formed_sequence = b"\x80\xdc\x00\x00"
@@ -523,6 +534,7 @@ class UTF32LETest(ReadTest, unittest.TestCase):
self.assertEqual('\U00010000' * 1024,
codecs.utf_32_le_decode(encoded)[0])
+
class UTF32BETest(ReadTest, unittest.TestCase):
encoding = "utf-32-be"
ill_formed_sequence = b"\x00\x00\xdc\x80"
@@ -747,6 +759,7 @@ class UTF8Test(ReadTest, unittest.TestCase):
encoding = "utf-8"
ill_formed_sequence = b"\xed\xb2\x80"
ill_formed_sequence_replace = "\ufffd" * 3
+ BOM = b''
def test_partial(self):
self.check_partial(
@@ -775,27 +788,49 @@ class UTF8Test(ReadTest, unittest.TestCase):
self.check_state_handling_decode(self.encoding,
u, u.encode(self.encoding))
+ def test_decode_error(self):
+ for data, error_handler, expected in (
+ (b'[\x80\xff]', 'ignore', '[]'),
+ (b'[\x80\xff]', 'replace', '[\ufffd\ufffd]'),
+ (b'[\x80\xff]', 'surrogateescape', '[\udc80\udcff]'),
+ (b'[\x80\xff]', 'backslashreplace', '[\\x80\\xff]'),
+ ):
+ with self.subTest(data=data, error_handler=error_handler,
+ expected=expected):
+ self.assertEqual(data.decode(self.encoding, error_handler),
+ expected)
+
def test_lone_surrogates(self):
super().test_lone_surrogates()
# not sure if this is making sense for
# UTF-16 and UTF-32
- self.assertEqual("[\uDC80]".encode('utf-8', "surrogateescape"),
- b'[\x80]')
+ self.assertEqual("[\uDC80]".encode(self.encoding, "surrogateescape"),
+ self.BOM + b'[\x80]')
+
+ with self.assertRaises(UnicodeEncodeError) as cm:
+ "[\uDC80\uD800\uDFFF]".encode(self.encoding, "surrogateescape")
+ exc = cm.exception
+ self.assertEqual(exc.object[exc.start:exc.end], '\uD800\uDFFF')
def test_surrogatepass_handler(self):
- self.assertEqual("abc\ud800def".encode("utf-8", "surrogatepass"),
- b"abc\xed\xa0\x80def")
- self.assertEqual(b"abc\xed\xa0\x80def".decode("utf-8", "surrogatepass"),
+ self.assertEqual("abc\ud800def".encode(self.encoding, "surrogatepass"),
+ self.BOM + b"abc\xed\xa0\x80def")
+ self.assertEqual("\U00010fff\uD800".encode(self.encoding, "surrogatepass"),
+ self.BOM + b"\xf0\x90\xbf\xbf\xed\xa0\x80")
+ self.assertEqual("[\uD800\uDC80]".encode(self.encoding, "surrogatepass"),
+ self.BOM + b'[\xed\xa0\x80\xed\xb2\x80]')
+
+ self.assertEqual(b"abc\xed\xa0\x80def".decode(self.encoding, "surrogatepass"),
"abc\ud800def")
- self.assertEqual("\U00010fff\uD800".encode("utf-8", "surrogatepass"),
- b"\xf0\x90\xbf\xbf\xed\xa0\x80")
- self.assertEqual(b"\xf0\x90\xbf\xbf\xed\xa0\x80".decode("utf-8", "surrogatepass"),
+ self.assertEqual(b"\xf0\x90\xbf\xbf\xed\xa0\x80".decode(self.encoding, "surrogatepass"),
"\U00010fff\uD800")
+
self.assertTrue(codecs.lookup_error("surrogatepass"))
with self.assertRaises(UnicodeDecodeError):
- b"abc\xed\xa0".decode("utf-8", "surrogatepass")
+ b"abc\xed\xa0".decode(self.encoding, "surrogatepass")
with self.assertRaises(UnicodeDecodeError):
- b"abc\xed\xa0z".decode("utf-8", "surrogatepass")
+ b"abc\xed\xa0z".decode(self.encoding, "surrogatepass")
+
@unittest.skipUnless(sys.platform == 'win32',
'cp65001 is a Windows-only codec')
@@ -1059,6 +1094,7 @@ class ReadBufferTest(unittest.TestCase):
class UTF8SigTest(UTF8Test, unittest.TestCase):
encoding = "utf-8-sig"
+ BOM = codecs.BOM_UTF8
def test_partial(self):
self.check_partial(
@@ -1194,6 +1230,7 @@ class EscapeDecodeTest(unittest.TestCase):
self.assertEqual(decode(br"[\x0]\x0", "ignore"), (b"[]", 8))
self.assertEqual(decode(br"[\x0]\x0", "replace"), (b"[?]?", 8))
+
class RecodingTest(unittest.TestCase):
def test_recoding(self):
f = io.BytesIO()
@@ -1313,6 +1350,7 @@ for i in punycode_testcases:
if len(i)!=2:
print(repr(i))
+
class PunycodeTest(unittest.TestCase):
def test_encode(self):
for uni, puny in punycode_testcases:
@@ -1332,6 +1370,7 @@ class PunycodeTest(unittest.TestCase):
puny = puny.decode("ascii").encode("ascii")
self.assertEqual(uni, puny.decode("punycode"))
+
class UnicodeInternalTest(unittest.TestCase):
@unittest.skipUnless(SIZEOF_WCHAR_T == 4, 'specific to 32-bit wchar_t')
def test_bug1251300(self):
@@ -1586,6 +1625,7 @@ class NameprepTest(unittest.TestCase):
except Exception as e:
raise support.TestFailed("Test 3.%d: %s" % (pos+1, str(e)))
+
class IDNACodecTest(unittest.TestCase):
def test_builtin_decode(self):
self.assertEqual(str(b"python.org", "idna"), "python.org")
@@ -1672,6 +1712,7 @@ class IDNACodecTest(unittest.TestCase):
self.assertRaises(Exception,
b"python.org".decode, "idna", errors)
+
class CodecsModuleTest(unittest.TestCase):
def test_decode(self):
@@ -1780,6 +1821,7 @@ class CodecsModuleTest(unittest.TestCase):
self.assertRaises(UnicodeError,
codecs.decode, b'abc', 'undefined', errors)
+
class StreamReaderTest(unittest.TestCase):
def setUp(self):
@@ -1790,6 +1832,7 @@ class StreamReaderTest(unittest.TestCase):
f = self.reader(self.stream)
self.assertEqual(f.readlines(), ['\ud55c\n', '\uae00'])
+
class EncodedFileTest(unittest.TestCase):
def test_basic(self):
@@ -1920,6 +1963,7 @@ broken_unicode_with_stateful = [
"unicode_internal"
]
+
class BasicUnicodeTest(unittest.TestCase, MixInCheckStateHandling):
def test_basics(self):
s = "abc123" # all codecs should be able to encode these
@@ -2082,6 +2126,7 @@ class BasicUnicodeTest(unittest.TestCase, MixInCheckStateHandling):
self.check_state_handling_decode(encoding, u, u.encode(encoding))
self.check_state_handling_encode(encoding, u, u.encode(encoding))
+
class CharmapTest(unittest.TestCase):
def test_decode_with_string_map(self):
self.assertEqual(
@@ -2332,6 +2377,7 @@ class WithStmtTest(unittest.TestCase):
info.streamwriter, 'strict') as srw:
self.assertEqual(srw.read(), "\xfc")
+
class TypesTest(unittest.TestCase):
def test_decode_unicode(self):
# Most decoders don't accept unicode input
@@ -2622,6 +2668,7 @@ else:
bytes_transform_encodings.append("bz2_codec")
transform_aliases["bz2_codec"] = ["bz2"]
+
class TransformCodecTest(unittest.TestCase):
def test_basics(self):
@@ -3099,5 +3146,81 @@ class CodePageTest(unittest.TestCase):
self.assertEqual(decoded, ('abc', 3))
+class ASCIITest(unittest.TestCase):
+ def test_encode(self):
+ self.assertEqual('abc123'.encode('ascii'), b'abc123')
+
+ def test_encode_error(self):
+ for data, error_handler, expected in (
+ ('[\x80\xff\u20ac]', 'ignore', b'[]'),
+ ('[\x80\xff\u20ac]', 'replace', b'[???]'),
+ ('[\x80\xff\u20ac]', 'xmlcharrefreplace', b'[&#128;&#255;&#8364;]'),
+ ('[\x80\xff\u20ac\U000abcde]', 'backslashreplace',
+ b'[\\x80\\xff\\u20ac\\U000abcde]'),
+ ('[\udc80\udcff]', 'surrogateescape', b'[\x80\xff]'),
+ ):
+ with self.subTest(data=data, error_handler=error_handler,
+ expected=expected):
+ self.assertEqual(data.encode('ascii', error_handler),
+ expected)
+
+ def test_encode_surrogateescape_error(self):
+ with self.assertRaises(UnicodeEncodeError):
+ # the first character can be decoded, but not the second
+ '\udc80\xff'.encode('ascii', 'surrogateescape')
+
+ def test_decode(self):
+ self.assertEqual(b'abc'.decode('ascii'), 'abc')
+
+ def test_decode_error(self):
+ for data, error_handler, expected in (
+ (b'[\x80\xff]', 'ignore', '[]'),
+ (b'[\x80\xff]', 'replace', '[\ufffd\ufffd]'),
+ (b'[\x80\xff]', 'surrogateescape', '[\udc80\udcff]'),
+ (b'[\x80\xff]', 'backslashreplace', '[\\x80\\xff]'),
+ ):
+ with self.subTest(data=data, error_handler=error_handler,
+ expected=expected):
+ self.assertEqual(data.decode('ascii', error_handler),
+ expected)
+
+
+class Latin1Test(unittest.TestCase):
+ def test_encode(self):
+ for data, expected in (
+ ('abc', b'abc'),
+ ('\x80\xe9\xff', b'\x80\xe9\xff'),
+ ):
+ with self.subTest(data=data, expected=expected):
+ self.assertEqual(data.encode('latin1'), expected)
+
+ def test_encode_errors(self):
+ for data, error_handler, expected in (
+ ('[\u20ac\udc80]', 'ignore', b'[]'),
+ ('[\u20ac\udc80]', 'replace', b'[??]'),
+ ('[\u20ac\U000abcde]', 'backslashreplace',
+ b'[\\u20ac\\U000abcde]'),
+ ('[\u20ac\udc80]', 'xmlcharrefreplace', b'[&#8364;&#56448;]'),
+ ('[\udc80\udcff]', 'surrogateescape', b'[\x80\xff]'),
+ ):
+ with self.subTest(data=data, error_handler=error_handler,
+ expected=expected):
+ self.assertEqual(data.encode('latin1', error_handler),
+ expected)
+
+ def test_encode_surrogateescape_error(self):
+ with self.assertRaises(UnicodeEncodeError):
+ # the first character can be decoded, but not the second
+ '\udc80\u20ac'.encode('latin1', 'surrogateescape')
+
+ def test_decode(self):
+ for data, expected in (
+ (b'abc', 'abc'),
+ (b'[\x80\xff]', '[\x80\xff]'),
+ ):
+ with self.subTest(data=data, expected=expected):
+ self.assertEqual(data.decode('latin1'), expected)
+
+
if __name__ == "__main__":
unittest.main()
diff --git a/Lib/test/test_collections.py b/Lib/test/test_collections.py
index b9fdf79463..c14932c585 100644
--- a/Lib/test/test_collections.py
+++ b/Lib/test/test_collections.py
@@ -2016,6 +2016,14 @@ class OrderedDictTests:
od = OrderedDict(**d)
self.assertGreater(sys.getsizeof(od), sys.getsizeof(d))
+ def test_views(self):
+ OrderedDict = self.OrderedDict
+ # See http://bugs.python.org/issue24286
+ s = 'the quick brown fox jumped over a lazy dog yesterday before dawn'.split()
+ od = OrderedDict.fromkeys(s)
+ self.assertEqual(od.keys(), dict(od).keys())
+ self.assertEqual(od.items(), dict(od).items())
+
def test_override_update(self):
OrderedDict = self.OrderedDict
# Verify that subclasses can override update() without breaking __init__()
diff --git a/Lib/test/test_deque.py b/Lib/test/test_deque.py
index 87187161ab..c61e80bc2e 100644
--- a/Lib/test/test_deque.py
+++ b/Lib/test/test_deque.py
@@ -654,6 +654,15 @@ class TestBasic(unittest.TestCase):
self.assertNotEqual(id(d), id(e))
self.assertEqual(list(d), list(e))
+ for i in range(5):
+ for maxlen in range(-1, 6):
+ s = [random.random() for j in range(i)]
+ d = deque(s) if maxlen == -1 else deque(s, maxlen)
+ e = d.copy()
+ self.assertEqual(d, e)
+ self.assertEqual(d.maxlen, e.maxlen)
+ self.assertTrue(all(x is y for x, y in zip(d, e)))
+
def test_copy_method(self):
mut = [10]
d = deque([mut])
diff --git a/Lib/test/test_descr.py b/Lib/test/test_descr.py
index d096390110..d75109995e 100644
--- a/Lib/test/test_descr.py
+++ b/Lib/test/test_descr.py
@@ -4738,11 +4738,8 @@ class PicklingTests(unittest.TestCase):
return (args, kwargs)
obj = C3()
for proto in protocols:
- if proto >= 4:
+ if proto >= 2:
self._check_reduce(proto, obj, args, kwargs)
- elif proto >= 2:
- with self.assertRaises(ValueError):
- obj.__reduce_ex__(proto)
class C4:
def __getnewargs_ex__(self):
@@ -5061,10 +5058,6 @@ class PicklingTests(unittest.TestCase):
kwargs = getattr(cls, 'KWARGS', {})
obj = cls(*cls.ARGS, **kwargs)
proto = pickle_copier.proto
- if 2 <= proto < 4 and hasattr(cls, '__getnewargs_ex__'):
- with self.assertRaises(ValueError):
- pickle_copier.dumps(obj, proto)
- continue
objcopy = pickle_copier.copy(obj)
self._assert_is_copy(obj, objcopy)
# For test classes that supports this, make sure we didn't go
diff --git a/Lib/test/test_dictviews.py b/Lib/test/test_dictviews.py
index 787ef20c47..245f8c858b 100644
--- a/Lib/test/test_dictviews.py
+++ b/Lib/test/test_dictviews.py
@@ -1,3 +1,4 @@
+import collections
import copy
import pickle
import unittest
@@ -215,6 +216,27 @@ class DictSetTest(unittest.TestCase):
self.assertRaises((TypeError, pickle.PicklingError),
pickle.dumps, d.items(), proto)
+ def test_abc_registry(self):
+ d = dict(a=1)
+
+ self.assertIsInstance(d.keys(), collections.KeysView)
+ self.assertIsInstance(d.keys(), collections.MappingView)
+ self.assertIsInstance(d.keys(), collections.Set)
+ self.assertIsInstance(d.keys(), collections.Sized)
+ self.assertIsInstance(d.keys(), collections.Iterable)
+ self.assertIsInstance(d.keys(), collections.Container)
+
+ self.assertIsInstance(d.values(), collections.ValuesView)
+ self.assertIsInstance(d.values(), collections.MappingView)
+ self.assertIsInstance(d.values(), collections.Sized)
+
+ self.assertIsInstance(d.items(), collections.ItemsView)
+ self.assertIsInstance(d.items(), collections.MappingView)
+ self.assertIsInstance(d.items(), collections.Set)
+ self.assertIsInstance(d.items(), collections.Sized)
+ self.assertIsInstance(d.items(), collections.Iterable)
+ self.assertIsInstance(d.items(), collections.Container)
+
if __name__ == "__main__":
unittest.main()
diff --git a/Lib/test/test_eintr.py b/Lib/test/test_eintr.py
index aabad835a0..75452f2d41 100644
--- a/Lib/test/test_eintr.py
+++ b/Lib/test/test_eintr.py
@@ -16,14 +16,8 @@ class EINTRTests(unittest.TestCase):
# Run the tester in a sub-process, to make sure there is only one
# thread (for reliable signal delivery).
tester = support.findfile("eintr_tester.py", subdir="eintrdata")
-
- if support.verbose:
- args = [sys.executable, tester]
- with subprocess.Popen(args) as proc:
- exitcode = proc.wait()
- self.assertEqual(exitcode, 0)
- else:
- script_helper.assert_python_ok(tester)
+ # use -u to try to get the full output if the test hangs or crash
+ script_helper.assert_python_ok("-u", tester)
if __name__ == "__main__":
diff --git a/Lib/test/test_enum.py b/Lib/test/test_enum.py
index 4b5d0d07bc..0f7b769b7a 100644
--- a/Lib/test/test_enum.py
+++ b/Lib/test/test_enum.py
@@ -270,6 +270,13 @@ class TestEnum(unittest.TestCase):
class Wrong(Enum):
_any_name_ = 9
+ def test_bool(self):
+ class Logic(Enum):
+ true = True
+ false = False
+ self.assertTrue(Logic.true)
+ self.assertFalse(Logic.false)
+
def test_contains(self):
Season = self.Season
self.assertIn(Season.AUTUMN, Season)
diff --git a/Lib/test/test_file.py b/Lib/test/test_file.py
index 4e392b770c..67c3d864d8 100644
--- a/Lib/test/test_file.py
+++ b/Lib/test/test_file.py
@@ -139,7 +139,7 @@ class OtherFileTests:
def testModeStrings(self):
# check invalid mode strings
- for mode in ("", "aU", "wU+"):
+ for mode in ("", "aU", "wU+", "U+", "+U", "rU+"):
try:
f = self.open(TESTFN, mode)
except ValueError:
diff --git a/Lib/test/test_format.py b/Lib/test/test_format.py
index 9b13632591..9924fde13e 100644
--- a/Lib/test/test_format.py
+++ b/Lib/test/test_format.py
@@ -300,6 +300,8 @@ class FormatTest(unittest.TestCase):
testcommon(b"%c", 7, b"\x07")
testcommon(b"%c", b"Z", b"Z")
testcommon(b"%c", bytearray(b"Z"), b"Z")
+ testcommon(b"%5c", 65, b" A")
+ testcommon(b"%-5c", 65, b"A ")
# %b will insert a series of bytes, either from a type that supports
# the Py_buffer protocol, or something that has a __bytes__ method
class FakeBytes(object):
diff --git a/Lib/test/test_fstring.py b/Lib/test/test_fstring.py
new file mode 100644
index 0000000000..d6f781c846
--- /dev/null
+++ b/Lib/test/test_fstring.py
@@ -0,0 +1,734 @@
+import ast
+import types
+import decimal
+import unittest
+
+a_global = 'global variable'
+
+# You could argue that I'm too strict in looking for specific error
+# values with assertRaisesRegex, but without it it's way too easy to
+# make a syntax error in the test strings. Especially with all of the
+# triple quotes, raw strings, backslashes, etc. I think it's a
+# worthwhile tradeoff. When I switched to this method, I found many
+# examples where I wasn't testing what I thought I was.
+
+class TestCase(unittest.TestCase):
+ def assertAllRaise(self, exception_type, regex, error_strings):
+ for str in error_strings:
+ with self.subTest(str=str):
+ with self.assertRaisesRegex(exception_type, regex):
+ eval(str)
+
+ def test__format__lookup(self):
+ # Make sure __format__ is looked up on the type, not the instance.
+ class X:
+ def __format__(self, spec):
+ return 'class'
+
+ x = X()
+
+ # Add a bound __format__ method to the 'y' instance, but not
+ # the 'x' instance.
+ y = X()
+ y.__format__ = types.MethodType(lambda self, spec: 'instance', y)
+
+ self.assertEqual(f'{y}', format(y))
+ self.assertEqual(f'{y}', 'class')
+ self.assertEqual(format(x), format(y))
+
+ # __format__ is not called this way, but still make sure it
+ # returns what we expect (so we can make sure we're bypassing
+ # it).
+ self.assertEqual(x.__format__(''), 'class')
+ self.assertEqual(y.__format__(''), 'instance')
+
+ # This is how __format__ is actually called.
+ self.assertEqual(type(x).__format__(x, ''), 'class')
+ self.assertEqual(type(y).__format__(y, ''), 'class')
+
+ def test_ast(self):
+ # Inspired by http://bugs.python.org/issue24975
+ class X:
+ def __init__(self):
+ self.called = False
+ def __call__(self):
+ self.called = True
+ return 4
+ x = X()
+ expr = """
+a = 10
+f'{a * x()}'"""
+ t = ast.parse(expr)
+ c = compile(t, '', 'exec')
+
+ # Make sure x was not called.
+ self.assertFalse(x.called)
+
+ # Actually run the code.
+ exec(c)
+
+ # Make sure x was called.
+ self.assertTrue(x.called)
+
+ def test_literal_eval(self):
+ # With no expressions, an f-string is okay.
+ self.assertEqual(ast.literal_eval("f'x'"), 'x')
+ self.assertEqual(ast.literal_eval("f'x' 'y'"), 'xy')
+
+ # But this should raise an error.
+ with self.assertRaisesRegex(ValueError, 'malformed node or string'):
+ ast.literal_eval("f'x{3}'")
+
+ # As should this, which uses a different ast node
+ with self.assertRaisesRegex(ValueError, 'malformed node or string'):
+ ast.literal_eval("f'{3}'")
+
+ def test_ast_compile_time_concat(self):
+ x = ['']
+
+ expr = """x[0] = 'foo' f'{3}'"""
+ t = ast.parse(expr)
+ c = compile(t, '', 'exec')
+ exec(c)
+ self.assertEqual(x[0], 'foo3')
+
+ def test_literal(self):
+ self.assertEqual(f'', '')
+ self.assertEqual(f'a', 'a')
+ self.assertEqual(f' ', ' ')
+ self.assertEqual(f'\N{GREEK CAPITAL LETTER DELTA}',
+ '\N{GREEK CAPITAL LETTER DELTA}')
+ self.assertEqual(f'\N{GREEK CAPITAL LETTER DELTA}',
+ '\u0394')
+ self.assertEqual(f'\N{True}', '\u22a8')
+ self.assertEqual(rf'\N{True}', r'\NTrue')
+
+ def test_escape_order(self):
+ # note that hex(ord('{')) == 0x7b, so this
+ # string becomes f'a{4*10}b'
+ self.assertEqual(f'a\u007b4*10}b', 'a40b')
+ self.assertEqual(f'a\x7b4*10}b', 'a40b')
+ self.assertEqual(f'a\x7b4*10\N{RIGHT CURLY BRACKET}b', 'a40b')
+ self.assertEqual(f'{"a"!\N{LATIN SMALL LETTER R}}', "'a'")
+ self.assertEqual(f'{10\x3a02X}', '0A')
+ self.assertEqual(f'{10:02\N{LATIN CAPITAL LETTER X}}', '0A')
+
+ self.assertAllRaise(SyntaxError, "f-string: single '}' is not allowed",
+ [r"""f'a{\u007b4*10}b'""", # mis-matched brackets
+ ])
+ self.assertAllRaise(SyntaxError, 'unexpected character after line continuation character',
+ [r"""f'{"a"\!r}'""",
+ r"""f'{a\!r}'""",
+ ])
+
+ def test_unterminated_string(self):
+ self.assertAllRaise(SyntaxError, 'f-string: unterminated string',
+ [r"""f'{"x'""",
+ r"""f'{"x}'""",
+ r"""f'{("x'""",
+ r"""f'{("x}'""",
+ ])
+
+ def test_mismatched_parens(self):
+ self.assertAllRaise(SyntaxError, 'f-string: mismatched',
+ ["f'{((}'",
+ ])
+
+ def test_double_braces(self):
+ self.assertEqual(f'{{', '{')
+ self.assertEqual(f'a{{', 'a{')
+ self.assertEqual(f'{{b', '{b')
+ self.assertEqual(f'a{{b', 'a{b')
+ self.assertEqual(f'}}', '}')
+ self.assertEqual(f'a}}', 'a}')
+ self.assertEqual(f'}}b', '}b')
+ self.assertEqual(f'a}}b', 'a}b')
+
+ self.assertEqual(f'{{{10}', '{10')
+ self.assertEqual(f'}}{10}', '}10')
+ self.assertEqual(f'}}{{{10}', '}{10')
+ self.assertEqual(f'}}a{{{10}', '}a{10')
+
+ self.assertEqual(f'{10}{{', '10{')
+ self.assertEqual(f'{10}}}', '10}')
+ self.assertEqual(f'{10}}}{{', '10}{')
+ self.assertEqual(f'{10}}}a{{' '}', '10}a{}')
+
+ # Inside of strings, don't interpret doubled brackets.
+ self.assertEqual(f'{"{{}}"}', '{{}}')
+
+ self.assertAllRaise(TypeError, 'unhashable type',
+ ["f'{ {{}} }'", # dict in a set
+ ])
+
+ def test_compile_time_concat(self):
+ x = 'def'
+ self.assertEqual('abc' f'## {x}ghi', 'abc## defghi')
+ self.assertEqual('abc' f'{x}' 'ghi', 'abcdefghi')
+ self.assertEqual('abc' f'{x}' 'gh' f'i{x:4}', 'abcdefghidef ')
+ self.assertEqual('{x}' f'{x}', '{x}def')
+ self.assertEqual('{x' f'{x}', '{xdef')
+ self.assertEqual('{x}' f'{x}', '{x}def')
+ self.assertEqual('{{x}}' f'{x}', '{{x}}def')
+ self.assertEqual('{{x' f'{x}', '{{xdef')
+ self.assertEqual('x}}' f'{x}', 'x}}def')
+ self.assertEqual(f'{x}' 'x}}', 'defx}}')
+ self.assertEqual(f'{x}' '', 'def')
+ self.assertEqual('' f'{x}' '', 'def')
+ self.assertEqual('' f'{x}', 'def')
+ self.assertEqual(f'{x}' '2', 'def2')
+ self.assertEqual('1' f'{x}' '2', '1def2')
+ self.assertEqual('1' f'{x}', '1def')
+ self.assertEqual(f'{x}' f'-{x}', 'def-def')
+ self.assertEqual('' f'', '')
+ self.assertEqual('' f'' '', '')
+ self.assertEqual('' f'' '' f'', '')
+ self.assertEqual(f'', '')
+ self.assertEqual(f'' '', '')
+ self.assertEqual(f'' '' f'', '')
+ self.assertEqual(f'' '' f'' '', '')
+
+ self.assertAllRaise(SyntaxError, "f-string: expecting '}'",
+ ["f'{3' f'}'", # can't concat to get a valid f-string
+ ])
+
+ def test_comments(self):
+ # These aren't comments, since they're in strings.
+ d = {'#': 'hash'}
+ self.assertEqual(f'{"#"}', '#')
+ self.assertEqual(f'{d["#"]}', 'hash')
+
+ self.assertAllRaise(SyntaxError, "f-string cannot include '#'",
+ ["f'{1#}'", # error because the expression becomes "(1#)"
+ "f'{3(#)}'",
+ ])
+
+ def test_many_expressions(self):
+ # Create a string with many expressions in it. Note that
+ # because we have a space in here as a literal, we're actually
+ # going to use twice as many ast nodes: one for each literal
+ # plus one for each expression.
+ def build_fstr(n, extra=''):
+ return "f'" + ('{x} ' * n) + extra + "'"
+
+ x = 'X'
+ width = 1
+
+ # Test around 256.
+ for i in range(250, 260):
+ self.assertEqual(eval(build_fstr(i)), (x+' ')*i)
+
+ # Test concatenating 2 largs fstrings.
+ self.assertEqual(eval(build_fstr(255)*256), (x+' ')*(255*256))
+
+ s = build_fstr(253, '{x:{width}} ')
+ self.assertEqual(eval(s), (x+' ')*254)
+
+ # Test lots of expressions and constants, concatenated.
+ s = "f'{1}' 'x' 'y'" * 1024
+ self.assertEqual(eval(s), '1xy' * 1024)
+
+ def test_format_specifier_expressions(self):
+ width = 10
+ precision = 4
+ value = decimal.Decimal('12.34567')
+ self.assertEqual(f'result: {value:{width}.{precision}}', 'result: 12.35')
+ self.assertEqual(f'result: {value:{width!r}.{precision}}', 'result: 12.35')
+ self.assertEqual(f'result: {value:{width:0}.{precision:1}}', 'result: 12.35')
+ self.assertEqual(f'result: {value:{1}{0:0}.{precision:1}}', 'result: 12.35')
+ self.assertEqual(f'result: {value:{ 1}{ 0:0}.{ precision:1}}', 'result: 12.35')
+ self.assertEqual(f'{10:#{1}0x}', ' 0xa')
+ self.assertEqual(f'{10:{"#"}1{0}{"x"}}', ' 0xa')
+ self.assertEqual(f'{-10:-{"#"}1{0}x}', ' -0xa')
+ self.assertEqual(f'{-10:{"-"}#{1}0{"x"}}', ' -0xa')
+ self.assertEqual(f'{10:#{3 != {4:5} and width}x}', ' 0xa')
+
+ self.assertAllRaise(SyntaxError, "f-string: expecting '}'",
+ ["""f'{"s"!r{":10"}}'""",
+
+ # This looks like a nested format spec.
+ ])
+
+ self.assertAllRaise(SyntaxError, "invalid syntax",
+ [# Invalid sytax inside a nested spec.
+ "f'{4:{/5}}'",
+ ])
+
+ self.assertAllRaise(SyntaxError, "f-string: expressions nested too deeply",
+ [# Can't nest format specifiers.
+ "f'result: {value:{width:{0}}.{precision:1}}'",
+ ])
+
+ self.assertAllRaise(SyntaxError, 'f-string: invalid conversion character',
+ [# No expansion inside conversion or for
+ # the : or ! itself.
+ """f'{"s"!{"r"}}'""",
+ ])
+
+ def test_side_effect_order(self):
+ class X:
+ def __init__(self):
+ self.i = 0
+ def __format__(self, spec):
+ self.i += 1
+ return str(self.i)
+
+ x = X()
+ self.assertEqual(f'{x} {x}', '1 2')
+
+ def test_missing_expression(self):
+ self.assertAllRaise(SyntaxError, 'f-string: empty expression not allowed',
+ ["f'{}'",
+ "f'{ }'"
+ "f' {} '",
+ "f'{!r}'",
+ "f'{ !r}'",
+ "f'{10:{ }}'",
+ "f' { } '",
+ r"f'{\n}'",
+ r"f'{\n \n}'",
+
+ # Catch the empty expression before the
+ # invalid conversion.
+ "f'{!x}'",
+ "f'{ !xr}'",
+ "f'{!x:}'",
+ "f'{!x:a}'",
+ "f'{ !xr:}'",
+ "f'{ !xr:a}'",
+
+ "f'{!}'",
+ "f'{:}'",
+
+ # We find the empty expression before the
+ # missing closing brace.
+ "f'{!'",
+ "f'{!s:'",
+ "f'{:'",
+ "f'{:x'",
+ ])
+
+ def test_parens_in_expressions(self):
+ self.assertEqual(f'{3,}', '(3,)')
+
+ # Add these because when an expression is evaluated, parens
+ # are added around it. But we shouldn't go from an invalid
+ # expression to a valid one. The added parens are just
+ # supposed to allow whitespace (including newlines).
+ self.assertAllRaise(SyntaxError, 'invalid syntax',
+ ["f'{,}'",
+ "f'{,}'", # this is (,), which is an error
+ ])
+
+ self.assertAllRaise(SyntaxError, "f-string: expecting '}'",
+ ["f'{3)+(4}'",
+ ])
+
+ self.assertAllRaise(SyntaxError, 'EOL while scanning string literal',
+ ["f'{\n}'",
+ ])
+
+ def test_newlines_in_expressions(self):
+ self.assertEqual(f'{0}', '0')
+ self.assertEqual(f'{0\n}', '0')
+ self.assertEqual(f'{0\r}', '0')
+ self.assertEqual(f'{\n0\n}', '0')
+ self.assertEqual(f'{\r0\r}', '0')
+ self.assertEqual(f'{\n0\r}', '0')
+ self.assertEqual(f'{\n0}', '0')
+ self.assertEqual(f'{3+\n4}', '7')
+ self.assertEqual(f'{3+\\\n4}', '7')
+ self.assertEqual(rf'''{3+
+4}''', '7')
+ self.assertEqual(f'''{3+\
+4}''', '7')
+
+ self.assertAllRaise(SyntaxError, 'f-string: empty expression not allowed',
+ [r"f'{\n}'",
+ ])
+
+ def test_lambda(self):
+ x = 5
+ self.assertEqual(f'{(lambda y:x*y)("8")!r}', "'88888'")
+ self.assertEqual(f'{(lambda y:x*y)("8")!r:10}', "'88888' ")
+ self.assertEqual(f'{(lambda y:x*y)("8"):10}', "88888 ")
+
+ # lambda doesn't work without parens, because the colon
+ # makes the parser think it's a format_spec
+ self.assertAllRaise(SyntaxError, 'unexpected EOF while parsing',
+ ["f'{lambda x:x}'",
+ ])
+
+ def test_yield(self):
+ # Not terribly useful, but make sure the yield turns
+ # a function into a generator
+ def fn(y):
+ f'y:{yield y*2}'
+
+ g = fn(4)
+ self.assertEqual(next(g), 8)
+
+ def test_yield_send(self):
+ def fn(x):
+ yield f'x:{yield (lambda i: x * i)}'
+
+ g = fn(10)
+ the_lambda = next(g)
+ self.assertEqual(the_lambda(4), 40)
+ self.assertEqual(g.send('string'), 'x:string')
+
+ def test_expressions_with_triple_quoted_strings(self):
+ self.assertEqual(f"{'''x'''}", 'x')
+ self.assertEqual(f"{'''eric's'''}", "eric's")
+ self.assertEqual(f'{"""eric\'s"""}', "eric's")
+ self.assertEqual(f"{'''eric\"s'''}", 'eric"s')
+ self.assertEqual(f'{"""eric"s"""}', 'eric"s')
+
+ # Test concatenation within an expression
+ self.assertEqual(f'{"x" """eric"s""" "y"}', 'xeric"sy')
+ self.assertEqual(f'{"x" """eric"s"""}', 'xeric"s')
+ self.assertEqual(f'{"""eric"s""" "y"}', 'eric"sy')
+ self.assertEqual(f'{"""x""" """eric"s""" "y"}', 'xeric"sy')
+ self.assertEqual(f'{"""x""" """eric"s""" """y"""}', 'xeric"sy')
+ self.assertEqual(f'{r"""x""" """eric"s""" """y"""}', 'xeric"sy')
+
+ def test_multiple_vars(self):
+ x = 98
+ y = 'abc'
+ self.assertEqual(f'{x}{y}', '98abc')
+
+ self.assertEqual(f'X{x}{y}', 'X98abc')
+ self.assertEqual(f'{x}X{y}', '98Xabc')
+ self.assertEqual(f'{x}{y}X', '98abcX')
+
+ self.assertEqual(f'X{x}Y{y}', 'X98Yabc')
+ self.assertEqual(f'X{x}{y}Y', 'X98abcY')
+ self.assertEqual(f'{x}X{y}Y', '98XabcY')
+
+ self.assertEqual(f'X{x}Y{y}Z', 'X98YabcZ')
+
+ def test_closure(self):
+ def outer(x):
+ def inner():
+ return f'x:{x}'
+ return inner
+
+ self.assertEqual(outer('987')(), 'x:987')
+ self.assertEqual(outer(7)(), 'x:7')
+
+ def test_arguments(self):
+ y = 2
+ def f(x, width):
+ return f'x={x*y:{width}}'
+
+ self.assertEqual(f('foo', 10), 'x=foofoo ')
+ x = 'bar'
+ self.assertEqual(f(10, 10), 'x= 20')
+
+ def test_locals(self):
+ value = 123
+ self.assertEqual(f'v:{value}', 'v:123')
+
+ def test_missing_variable(self):
+ with self.assertRaises(NameError):
+ f'v:{value}'
+
+ def test_missing_format_spec(self):
+ class O:
+ def __format__(self, spec):
+ if not spec:
+ return '*'
+ return spec
+
+ self.assertEqual(f'{O():x}', 'x')
+ self.assertEqual(f'{O()}', '*')
+ self.assertEqual(f'{O():}', '*')
+
+ self.assertEqual(f'{3:}', '3')
+ self.assertEqual(f'{3!s:}', '3')
+
+ def test_global(self):
+ self.assertEqual(f'g:{a_global}', 'g:global variable')
+ self.assertEqual(f'g:{a_global!r}', "g:'global variable'")
+
+ a_local = 'local variable'
+ self.assertEqual(f'g:{a_global} l:{a_local}',
+ 'g:global variable l:local variable')
+ self.assertEqual(f'g:{a_global!r}',
+ "g:'global variable'")
+ self.assertEqual(f'g:{a_global} l:{a_local!r}',
+ "g:global variable l:'local variable'")
+
+ self.assertIn("module 'unittest' from", f'{unittest}')
+
+ def test_shadowed_global(self):
+ a_global = 'really a local'
+ self.assertEqual(f'g:{a_global}', 'g:really a local')
+ self.assertEqual(f'g:{a_global!r}', "g:'really a local'")
+
+ a_local = 'local variable'
+ self.assertEqual(f'g:{a_global} l:{a_local}',
+ 'g:really a local l:local variable')
+ self.assertEqual(f'g:{a_global!r}',
+ "g:'really a local'")
+ self.assertEqual(f'g:{a_global} l:{a_local!r}',
+ "g:really a local l:'local variable'")
+
+ def test_call(self):
+ def foo(x):
+ return 'x=' + str(x)
+
+ self.assertEqual(f'{foo(10)}', 'x=10')
+
+ def test_nested_fstrings(self):
+ y = 5
+ self.assertEqual(f'{f"{0}"*3}', '000')
+ self.assertEqual(f'{f"{y}"*3}', '555')
+ self.assertEqual(f'{f"{\'x\'}"*3}', 'xxx')
+
+ self.assertEqual(f"{r'x' f'{\"s\"}'}", 'xs')
+ self.assertEqual(f"{r'x'rf'{\"s\"}'}", 'xs')
+
+ def test_invalid_string_prefixes(self):
+ self.assertAllRaise(SyntaxError, 'unexpected EOF while parsing',
+ ["fu''",
+ "uf''",
+ "Fu''",
+ "fU''",
+ "Uf''",
+ "uF''",
+ "ufr''",
+ "urf''",
+ "fur''",
+ "fru''",
+ "rfu''",
+ "ruf''",
+ "FUR''",
+ "Fur''",
+ ])
+
+ def test_leading_trailing_spaces(self):
+ self.assertEqual(f'{ 3}', '3')
+ self.assertEqual(f'{ 3}', '3')
+ self.assertEqual(f'{\t3}', '3')
+ self.assertEqual(f'{\t\t3}', '3')
+ self.assertEqual(f'{3 }', '3')
+ self.assertEqual(f'{3 }', '3')
+ self.assertEqual(f'{3\t}', '3')
+ self.assertEqual(f'{3\t\t}', '3')
+
+ self.assertEqual(f'expr={ {x: y for x, y in [(1, 2), ]}}',
+ 'expr={1: 2}')
+ self.assertEqual(f'expr={ {x: y for x, y in [(1, 2), ]} }',
+ 'expr={1: 2}')
+
+ def test_character_name(self):
+ self.assertEqual(f'{4}\N{GREEK CAPITAL LETTER DELTA}{3}',
+ '4\N{GREEK CAPITAL LETTER DELTA}3')
+ self.assertEqual(f'{{}}\N{GREEK CAPITAL LETTER DELTA}{3}',
+ '{}\N{GREEK CAPITAL LETTER DELTA}3')
+
+ def test_not_equal(self):
+ # There's a special test for this because there's a special
+ # case in the f-string parser to look for != as not ending an
+ # expression. Normally it would, while looking for !s or !r.
+
+ self.assertEqual(f'{3!=4}', 'True')
+ self.assertEqual(f'{3!=4:}', 'True')
+ self.assertEqual(f'{3!=4!s}', 'True')
+ self.assertEqual(f'{3!=4!s:.3}', 'Tru')
+
+ def test_conversions(self):
+ self.assertEqual(f'{3.14:10.10}', ' 3.14')
+ self.assertEqual(f'{3.14!s:10.10}', '3.14 ')
+ self.assertEqual(f'{3.14!r:10.10}', '3.14 ')
+ self.assertEqual(f'{3.14!a:10.10}', '3.14 ')
+
+ self.assertEqual(f'{"a"}', 'a')
+ self.assertEqual(f'{"a"!r}', "'a'")
+ self.assertEqual(f'{"a"!a}', "'a'")
+
+ # Not a conversion.
+ self.assertEqual(f'{"a!r"}', "a!r")
+
+ # Not a conversion, but show that ! is allowed in a format spec.
+ self.assertEqual(f'{3.14:!<10.10}', '3.14!!!!!!')
+
+ self.assertEqual(f'{"\N{GREEK CAPITAL LETTER DELTA}"}', '\u0394')
+ self.assertEqual(f'{"\N{GREEK CAPITAL LETTER DELTA}"!r}', "'\u0394'")
+ self.assertEqual(f'{"\N{GREEK CAPITAL LETTER DELTA}"!a}', "'\\u0394'")
+
+ self.assertAllRaise(SyntaxError, 'f-string: invalid conversion character',
+ ["f'{3!g}'",
+ "f'{3!A}'",
+ "f'{3!A}'",
+ "f'{3!A}'",
+ "f'{3!!}'",
+ "f'{3!:}'",
+ "f'{3!\N{GREEK CAPITAL LETTER DELTA}}'",
+ "f'{3! s}'", # no space before conversion char
+ "f'{x!\\x00:.<10}'",
+ ])
+
+ self.assertAllRaise(SyntaxError, "f-string: expecting '}'",
+ ["f'{x!s{y}}'",
+ "f'{3!ss}'",
+ "f'{3!ss:}'",
+ "f'{3!ss:s}'",
+ ])
+
+ def test_assignment(self):
+ self.assertAllRaise(SyntaxError, 'invalid syntax',
+ ["f'' = 3",
+ "f'{0}' = x",
+ "f'{x}' = x",
+ ])
+
+ def test_del(self):
+ self.assertAllRaise(SyntaxError, 'invalid syntax',
+ ["del f''",
+ "del '' f''",
+ ])
+
+ def test_mismatched_braces(self):
+ self.assertAllRaise(SyntaxError, "f-string: single '}' is not allowed",
+ ["f'{{}'",
+ "f'{{}}}'",
+ "f'}'",
+ "f'x}'",
+ "f'x}x'",
+
+ # Can't have { or } in a format spec.
+ "f'{3:}>10}'",
+ r"f'{3:\\}>10}'",
+ "f'{3:}}>10}'",
+ ])
+
+ self.assertAllRaise(SyntaxError, "f-string: expecting '}'",
+ ["f'{3:{{>10}'",
+ "f'{3'",
+ "f'{3!'",
+ "f'{3:'",
+ "f'{3!s'",
+ "f'{3!s:'",
+ "f'{3!s:3'",
+ "f'x{'",
+ "f'x{x'",
+ "f'{3:s'",
+ "f'{{{'",
+ "f'{{}}{'",
+ "f'{'",
+ ])
+
+ self.assertAllRaise(SyntaxError, 'invalid syntax',
+ [r"f'{3:\\{>10}'",
+ ])
+
+ # But these are just normal strings.
+ self.assertEqual(f'{"{"}', '{')
+ self.assertEqual(f'{"}"}', '}')
+ self.assertEqual(f'{3:{"}"}>10}', '}}}}}}}}}3')
+ self.assertEqual(f'{2:{"{"}>10}', '{{{{{{{{{2')
+
+ def test_if_conditional(self):
+ # There's special logic in compile.c to test if the
+ # conditional for an if (and while) are constants. Exercise
+ # that code.
+
+ def test_fstring(x, expected):
+ flag = 0
+ if f'{x}':
+ flag = 1
+ else:
+ flag = 2
+ self.assertEqual(flag, expected)
+
+ def test_concat_empty(x, expected):
+ flag = 0
+ if '' f'{x}':
+ flag = 1
+ else:
+ flag = 2
+ self.assertEqual(flag, expected)
+
+ def test_concat_non_empty(x, expected):
+ flag = 0
+ if ' ' f'{x}':
+ flag = 1
+ else:
+ flag = 2
+ self.assertEqual(flag, expected)
+
+ test_fstring('', 2)
+ test_fstring(' ', 1)
+
+ test_concat_empty('', 2)
+ test_concat_empty(' ', 1)
+
+ test_concat_non_empty('', 1)
+ test_concat_non_empty(' ', 1)
+
+ def test_empty_format_specifier(self):
+ x = 'test'
+ self.assertEqual(f'{x}', 'test')
+ self.assertEqual(f'{x:}', 'test')
+ self.assertEqual(f'{x!s:}', 'test')
+ self.assertEqual(f'{x!r:}', "'test'")
+
+ def test_str_format_differences(self):
+ d = {'a': 'string',
+ 0: 'integer',
+ }
+ a = 0
+ self.assertEqual(f'{d[0]}', 'integer')
+ self.assertEqual(f'{d["a"]}', 'string')
+ self.assertEqual(f'{d[a]}', 'integer')
+ self.assertEqual('{d[a]}'.format(d=d), 'string')
+ self.assertEqual('{d[0]}'.format(d=d), 'integer')
+
+ def test_invalid_expressions(self):
+ self.assertAllRaise(SyntaxError, 'invalid syntax',
+ [r"f'{a[4)}'",
+ r"f'{a(4]}'",
+ ])
+
+ def test_loop(self):
+ for i in range(1000):
+ self.assertEqual(f'i:{i}', 'i:' + str(i))
+
+ def test_dict(self):
+ d = {'"': 'dquote',
+ "'": 'squote',
+ 'foo': 'bar',
+ }
+ self.assertEqual(f'{d["\'"]}', 'squote')
+ self.assertEqual(f"{d['\"']}", 'dquote')
+
+ self.assertEqual(f'''{d["'"]}''', 'squote')
+ self.assertEqual(f"""{d['"']}""", 'dquote')
+
+ self.assertEqual(f'{d["foo"]}', 'bar')
+ self.assertEqual(f"{d['foo']}", 'bar')
+ self.assertEqual(f'{d[\'foo\']}', 'bar')
+ self.assertEqual(f"{d[\"foo\"]}", 'bar')
+
+ def test_escaped_quotes(self):
+ d = {'"': 'a',
+ "'": 'b'}
+
+ self.assertEqual(fr"{d['\"']}", 'a')
+ self.assertEqual(fr'{d["\'"]}', 'b')
+ self.assertEqual(fr"{'\"'}", '"')
+ self.assertEqual(fr'{"\'"}', "'")
+ self.assertEqual(f'{"\\"3"}', '"3')
+
+ self.assertAllRaise(SyntaxError, 'f-string: unterminated string',
+ [r'''f'{"""\\}' ''', # Backslash at end of expression
+ ])
+ self.assertAllRaise(SyntaxError, 'unexpected character after line continuation',
+ [r"rf'{3\}'",
+ ])
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/Lib/test/test_grammar.py b/Lib/test/test_grammar.py
index ec3d7833f7..8f8d71ce85 100644
--- a/Lib/test/test_grammar.py
+++ b/Lib/test/test_grammar.py
@@ -295,6 +295,10 @@ class GrammarTests(unittest.TestCase):
pos2key2dict(1,2,k2=100,tokwarg1=100,tokwarg2=200)
pos2key2dict(1,2,tokwarg1=100,tokwarg2=200, k2=100)
+ self.assertRaises(SyntaxError, eval, "def f(*): pass")
+ self.assertRaises(SyntaxError, eval, "def f(*,): pass")
+ self.assertRaises(SyntaxError, eval, "def f(*, **kwds): pass")
+
# keyword arguments after *arglist
def f(*args, **kwargs):
return args, kwargs
@@ -352,6 +356,23 @@ class GrammarTests(unittest.TestCase):
check_syntax_error(self, "f(*g(1=2))")
check_syntax_error(self, "f(**g(1=2))")
+ # Check trailing commas are permitted in funcdef argument list
+ def f(a,): pass
+ def f(*args,): pass
+ def f(**kwds,): pass
+ def f(a, *args,): pass
+ def f(a, **kwds,): pass
+ def f(*args, b,): pass
+ def f(*, b,): pass
+ def f(*args, **kwds,): pass
+ def f(a, *args, b,): pass
+ def f(a, *, b,): pass
+ def f(a, *args, **kwds,): pass
+ def f(*args, b, **kwds,): pass
+ def f(*, b, **kwds,): pass
+ def f(a, *args, b, **kwds,): pass
+ def f(a, *, b, **kwds,): pass
+
def test_lambdef(self):
### lambdef: 'lambda' [varargslist] ':' test
l1 = lambda : 0
@@ -370,6 +391,23 @@ class GrammarTests(unittest.TestCase):
self.assertEqual(l6(1,2), 1+2+20)
self.assertEqual(l6(1,2,k=10), 1+2+10)
+ # check that trailing commas are permitted
+ l10 = lambda a,: 0
+ l11 = lambda *args,: 0
+ l12 = lambda **kwds,: 0
+ l13 = lambda a, *args,: 0
+ l14 = lambda a, **kwds,: 0
+ l15 = lambda *args, b,: 0
+ l16 = lambda *, b,: 0
+ l17 = lambda *args, **kwds,: 0
+ l18 = lambda a, *args, b,: 0
+ l19 = lambda a, *, b,: 0
+ l20 = lambda a, *args, **kwds,: 0
+ l21 = lambda *args, b, **kwds,: 0
+ l22 = lambda *, b, **kwds,: 0
+ l23 = lambda a, *args, b, **kwds,: 0
+ l24 = lambda a, *, b, **kwds,: 0
+
### stmt: simple_stmt | compound_stmt
# Tested below
diff --git a/Lib/test/test_imp.py b/Lib/test/test_imp.py
index ee9ee1ad8c..efb03840c4 100644
--- a/Lib/test/test_imp.py
+++ b/Lib/test/test_imp.py
@@ -12,7 +12,7 @@ from test import support
import unittest
import warnings
with warnings.catch_warnings():
- warnings.simplefilter('ignore', PendingDeprecationWarning)
+ warnings.simplefilter('ignore', DeprecationWarning)
import imp
diff --git a/Lib/test/test_inspect.py b/Lib/test/test_inspect.py
index 69ddb514d6..a88e7fdbd8 100644
--- a/Lib/test/test_inspect.py
+++ b/Lib/test/test_inspect.py
@@ -38,7 +38,7 @@ from test.test_import import _ready_to_import
# ismodule, isclass, ismethod, isfunction, istraceback, isframe, iscode,
# isbuiltin, isroutine, isgenerator, isgeneratorfunction, getmembers,
# getdoc, getfile, getmodule, getsourcefile, getcomments, getsource,
-# getclasstree, getargspec, getargvalues, formatargspec, formatargvalues,
+# getclasstree, getargvalues, formatargspec, formatargvalues,
# currentframe, stack, trace, isdatadescriptor
# NOTE: There are some additional tests relating to interaction with
@@ -628,18 +628,6 @@ class TestClassesAndFunctions(unittest.TestCase):
got = inspect.getmro(D)
self.assertEqual(expected, got)
- def assertArgSpecEquals(self, routine, args_e, varargs_e=None,
- varkw_e=None, defaults_e=None, formatted=None):
- with self.assertWarns(DeprecationWarning):
- args, varargs, varkw, defaults = inspect.getargspec(routine)
- self.assertEqual(args, args_e)
- self.assertEqual(varargs, varargs_e)
- self.assertEqual(varkw, varkw_e)
- self.assertEqual(defaults, defaults_e)
- if formatted is not None:
- self.assertEqual(inspect.formatargspec(args, varargs, varkw, defaults),
- formatted)
-
def assertFullArgSpecEquals(self, routine, args_e, varargs_e=None,
varkw_e=None, defaults_e=None,
kwonlyargs_e=[], kwonlydefaults_e=None,
@@ -658,23 +646,6 @@ class TestClassesAndFunctions(unittest.TestCase):
kwonlyargs, kwonlydefaults, ann),
formatted)
- def test_getargspec(self):
- self.assertArgSpecEquals(mod.eggs, ['x', 'y'], formatted='(x, y)')
-
- self.assertArgSpecEquals(mod.spam,
- ['a', 'b', 'c', 'd', 'e', 'f'],
- 'g', 'h', (3, 4, 5),
- '(a, b, c, d=3, e=4, f=5, *g, **h)')
-
- self.assertRaises(ValueError, self.assertArgSpecEquals,
- mod2.keyworded, [])
-
- self.assertRaises(ValueError, self.assertArgSpecEquals,
- mod2.annotated, [])
- self.assertRaises(ValueError, self.assertArgSpecEquals,
- mod2.keyword_only_arg, [])
-
-
def test_getfullargspec(self):
self.assertFullArgSpecEquals(mod2.keyworded, [], varargs_e='arg1',
kwonlyargs_e=['arg2'],
@@ -688,20 +659,19 @@ class TestClassesAndFunctions(unittest.TestCase):
kwonlyargs_e=['arg'],
formatted='(*, arg)')
- def test_argspec_api_ignores_wrapped(self):
+ def test_fullargspec_api_ignores_wrapped(self):
# Issue 20684: low level introspection API must ignore __wrapped__
@functools.wraps(mod.spam)
def ham(x, y):
pass
# Basic check
- self.assertArgSpecEquals(ham, ['x', 'y'], formatted='(x, y)')
self.assertFullArgSpecEquals(ham, ['x', 'y'], formatted='(x, y)')
self.assertFullArgSpecEquals(functools.partial(ham),
['x', 'y'], formatted='(x, y)')
# Other variants
def check_method(f):
- self.assertArgSpecEquals(f, ['self', 'x', 'y'],
- formatted='(self, x, y)')
+ self.assertFullArgSpecEquals(f, ['self', 'x', 'y'],
+ formatted='(self, x, y)')
class C:
@functools.wraps(mod.spam)
def ham(self, x, y):
@@ -779,11 +749,11 @@ class TestClassesAndFunctions(unittest.TestCase):
with self.assertRaises(TypeError):
inspect.getfullargspec(builtin)
- def test_getargspec_method(self):
+ def test_getfullargspec_method(self):
class A(object):
def m(self):
pass
- self.assertArgSpecEquals(A.m, ['self'])
+ self.assertFullArgSpecEquals(A.m, ['self'])
def test_classify_newstyle(self):
class A(object):
diff --git a/Lib/test/test_itertools.py b/Lib/test/test_itertools.py
index 5b3ba7e297..9e55b2a359 100644
--- a/Lib/test/test_itertools.py
+++ b/Lib/test/test_itertools.py
@@ -613,6 +613,56 @@ class TestBasicOps(unittest.TestCase):
for proto in range(pickle.HIGHEST_PROTOCOL + 1):
self.pickletest(proto, cycle('abc'))
+ for proto in range(pickle.HIGHEST_PROTOCOL + 1):
+ # test with partial consumed input iterable
+ it = iter('abcde')
+ c = cycle(it)
+ _ = [next(c) for i in range(2)] # consume 2 of 5 inputs
+ p = pickle.dumps(c, proto)
+ d = pickle.loads(p) # rebuild the cycle object
+ self.assertEqual(take(20, d), list('cdeabcdeabcdeabcdeab'))
+
+ # test with completely consumed input iterable
+ it = iter('abcde')
+ c = cycle(it)
+ _ = [next(c) for i in range(7)] # consume 7 of 5 inputs
+ p = pickle.dumps(c, proto)
+ d = pickle.loads(p) # rebuild the cycle object
+ self.assertEqual(take(20, d), list('cdeabcdeabcdeabcdeab'))
+
+ def test_cycle_setstate(self):
+ # Verify both modes for restoring state
+
+ # Mode 0 is efficient. It uses an incompletely consumed input
+ # iterator to build a cycle object and then passes in state with
+ # a list of previously consumed values. There is no data
+ # overlap bewteen the two.
+ c = cycle('defg')
+ c.__setstate__((list('abc'), 0))
+ self.assertEqual(take(20, c), list('defgabcdefgabcdefgab'))
+
+ # Mode 1 is inefficient. It starts with a cycle object built
+ # from an iterator over the remaining elements in a partial
+ # cycle and then passes in state with all of the previously
+ # seen values (this overlaps values included in the iterator).
+ c = cycle('defg')
+ c.__setstate__((list('abcdefg'), 1))
+ self.assertEqual(take(20, c), list('defgabcdefgabcdefgab'))
+
+ # The first argument to setstate needs to be a tuple
+ with self.assertRaises(SystemError):
+ cycle('defg').__setstate__([list('abcdefg'), 0])
+
+ # The first argument in the setstate tuple must be a list
+ with self.assertRaises(TypeError):
+ c = cycle('defg')
+ c.__setstate__((dict.fromkeys('defg'), 0))
+ take(20, c)
+
+ # The first argument in the setstate tuple must be a list
+ with self.assertRaises(TypeError):
+ cycle('defg').__setstate__((list('abcdefg'), 'x'))
+
def test_groupby(self):
# Check whether it accepts arguments correctly
self.assertEqual([], list(groupby([])))
diff --git a/Lib/test/test_linecache.py b/Lib/test/test_linecache.py
index 21ef738932..240db7f874 100644
--- a/Lib/test/test_linecache.py
+++ b/Lib/test/test_linecache.py
@@ -3,6 +3,8 @@
import linecache
import unittest
import os.path
+import tempfile
+import tokenize
from test import support
@@ -10,8 +12,6 @@ FILENAME = linecache.__file__
NONEXISTENT_FILENAME = FILENAME + '.missing'
INVALID_NAME = '!@$)(!@#_1'
EMPTY = ''
-TESTS = 'inspect_fodder inspect_fodder2 mapping_tests'
-TESTS = TESTS.split()
TEST_PATH = os.path.dirname(__file__)
MODULES = "linecache abc".split()
MODULE_PATH = os.path.dirname(FILENAME)
@@ -37,6 +37,65 @@ def f():
return 3''' # No ending newline
+class TempFile:
+
+ def setUp(self):
+ super().setUp()
+ with tempfile.NamedTemporaryFile(delete=False) as fp:
+ self.file_name = fp.name
+ fp.write(self.file_byte_string)
+ self.addCleanup(support.unlink, self.file_name)
+
+
+class GetLineTestsGoodData(TempFile):
+ # file_list = ['list\n', 'of\n', 'good\n', 'strings\n']
+
+ def setUp(self):
+ self.file_byte_string = ''.join(self.file_list).encode('utf-8')
+ super().setUp()
+
+ def test_getline(self):
+ with tokenize.open(self.file_name) as fp:
+ for index, line in enumerate(fp):
+ if not line.endswith('\n'):
+ line += '\n'
+
+ cached_line = linecache.getline(self.file_name, index + 1)
+ self.assertEqual(line, cached_line)
+
+ def test_getlines(self):
+ lines = linecache.getlines(self.file_name)
+ self.assertEqual(lines, self.file_list)
+
+
+class GetLineTestsBadData(TempFile):
+ # file_byte_string = b'Bad data goes here'
+
+ def test_getline(self):
+ self.assertRaises((SyntaxError, UnicodeDecodeError),
+ linecache.getline, self.file_name, 1)
+
+ def test_getlines(self):
+ self.assertRaises((SyntaxError, UnicodeDecodeError),
+ linecache.getlines, self.file_name)
+
+
+class EmptyFile(GetLineTestsGoodData, unittest.TestCase):
+ file_list = []
+
+
+class SingleEmptyLine(GetLineTestsGoodData, unittest.TestCase):
+ file_list = ['\n']
+
+
+class GoodUnicode(GetLineTestsGoodData, unittest.TestCase):
+ file_list = ['á\n', 'b\n', 'abcdef\n', 'ááááá\n']
+
+
+class BadUnicode(GetLineTestsBadData, unittest.TestCase):
+ file_byte_string = b'\x80abc'
+
+
class LineCacheTests(unittest.TestCase):
def test_getline(self):
@@ -53,13 +112,6 @@ class LineCacheTests(unittest.TestCase):
self.assertEqual(getline(EMPTY, 1), EMPTY)
self.assertEqual(getline(INVALID_NAME, 1), EMPTY)
- # Check whether lines correspond to those from file iteration
- for entry in TESTS:
- filename = os.path.join(TEST_PATH, entry) + '.py'
- with open(filename) as file:
- for index, line in enumerate(file):
- self.assertEqual(line, getline(filename, index + 1))
-
# Check module loading
for entry in MODULES:
filename = os.path.join(MODULE_PATH, entry) + '.py'
@@ -80,12 +132,13 @@ class LineCacheTests(unittest.TestCase):
def test_clearcache(self):
cached = []
- for entry in TESTS:
- filename = os.path.join(TEST_PATH, entry) + '.py'
+ for entry in MODULES:
+ filename = os.path.join(MODULE_PATH, entry) + '.py'
cached.append(filename)
linecache.getline(filename, 1)
# Are all files cached?
+ self.assertNotEqual(cached, [])
cached_empty = [fn for fn in cached if fn not in linecache.cache]
self.assertEqual(cached_empty, [])
diff --git a/Lib/test/test_operator.py b/Lib/test/test_operator.py
index da9c8ef34f..54fd1f4e52 100644
--- a/Lib/test/test_operator.py
+++ b/Lib/test/test_operator.py
@@ -120,63 +120,63 @@ class OperatorTestCase:
operator = self.module
self.assertRaises(TypeError, operator.add)
self.assertRaises(TypeError, operator.add, None, None)
- self.assertTrue(operator.add(3, 4) == 7)
+ self.assertEqual(operator.add(3, 4), 7)
def test_bitwise_and(self):
operator = self.module
self.assertRaises(TypeError, operator.and_)
self.assertRaises(TypeError, operator.and_, None, None)
- self.assertTrue(operator.and_(0xf, 0xa) == 0xa)
+ self.assertEqual(operator.and_(0xf, 0xa), 0xa)
def test_concat(self):
operator = self.module
self.assertRaises(TypeError, operator.concat)
self.assertRaises(TypeError, operator.concat, None, None)
- self.assertTrue(operator.concat('py', 'thon') == 'python')
- self.assertTrue(operator.concat([1, 2], [3, 4]) == [1, 2, 3, 4])
- self.assertTrue(operator.concat(Seq1([5, 6]), Seq1([7])) == [5, 6, 7])
- self.assertTrue(operator.concat(Seq2([5, 6]), Seq2([7])) == [5, 6, 7])
+ self.assertEqual(operator.concat('py', 'thon'), 'python')
+ self.assertEqual(operator.concat([1, 2], [3, 4]), [1, 2, 3, 4])
+ self.assertEqual(operator.concat(Seq1([5, 6]), Seq1([7])), [5, 6, 7])
+ self.assertEqual(operator.concat(Seq2([5, 6]), Seq2([7])), [5, 6, 7])
self.assertRaises(TypeError, operator.concat, 13, 29)
def test_countOf(self):
operator = self.module
self.assertRaises(TypeError, operator.countOf)
self.assertRaises(TypeError, operator.countOf, None, None)
- self.assertTrue(operator.countOf([1, 2, 1, 3, 1, 4], 3) == 1)
- self.assertTrue(operator.countOf([1, 2, 1, 3, 1, 4], 5) == 0)
+ self.assertEqual(operator.countOf([1, 2, 1, 3, 1, 4], 3), 1)
+ self.assertEqual(operator.countOf([1, 2, 1, 3, 1, 4], 5), 0)
def test_delitem(self):
operator = self.module
a = [4, 3, 2, 1]
self.assertRaises(TypeError, operator.delitem, a)
self.assertRaises(TypeError, operator.delitem, a, None)
- self.assertTrue(operator.delitem(a, 1) is None)
- self.assertTrue(a == [4, 2, 1])
+ self.assertIsNone(operator.delitem(a, 1))
+ self.assertEqual(a, [4, 2, 1])
def test_floordiv(self):
operator = self.module
self.assertRaises(TypeError, operator.floordiv, 5)
self.assertRaises(TypeError, operator.floordiv, None, None)
- self.assertTrue(operator.floordiv(5, 2) == 2)
+ self.assertEqual(operator.floordiv(5, 2), 2)
def test_truediv(self):
operator = self.module
self.assertRaises(TypeError, operator.truediv, 5)
self.assertRaises(TypeError, operator.truediv, None, None)
- self.assertTrue(operator.truediv(5, 2) == 2.5)
+ self.assertEqual(operator.truediv(5, 2), 2.5)
def test_getitem(self):
operator = self.module
a = range(10)
self.assertRaises(TypeError, operator.getitem)
self.assertRaises(TypeError, operator.getitem, a, None)
- self.assertTrue(operator.getitem(a, 2) == 2)
+ self.assertEqual(operator.getitem(a, 2), 2)
def test_indexOf(self):
operator = self.module
self.assertRaises(TypeError, operator.indexOf)
self.assertRaises(TypeError, operator.indexOf, None, None)
- self.assertTrue(operator.indexOf([4, 3, 2, 1], 3) == 1)
+ self.assertEqual(operator.indexOf([4, 3, 2, 1], 3), 1)
self.assertRaises(ValueError, operator.indexOf, [4, 3, 2, 1], 0)
def test_invert(self):
@@ -189,21 +189,21 @@ class OperatorTestCase:
operator = self.module
self.assertRaises(TypeError, operator.lshift)
self.assertRaises(TypeError, operator.lshift, None, 42)
- self.assertTrue(operator.lshift(5, 1) == 10)
- self.assertTrue(operator.lshift(5, 0) == 5)
+ self.assertEqual(operator.lshift(5, 1), 10)
+ self.assertEqual(operator.lshift(5, 0), 5)
self.assertRaises(ValueError, operator.lshift, 2, -1)
def test_mod(self):
operator = self.module
self.assertRaises(TypeError, operator.mod)
self.assertRaises(TypeError, operator.mod, None, 42)
- self.assertTrue(operator.mod(5, 2) == 1)
+ self.assertEqual(operator.mod(5, 2), 1)
def test_mul(self):
operator = self.module
self.assertRaises(TypeError, operator.mul)
self.assertRaises(TypeError, operator.mul, None, None)
- self.assertTrue(operator.mul(5, 2) == 10)
+ self.assertEqual(operator.mul(5, 2), 10)
def test_matmul(self):
operator = self.module
@@ -227,7 +227,7 @@ class OperatorTestCase:
operator = self.module
self.assertRaises(TypeError, operator.or_)
self.assertRaises(TypeError, operator.or_, None, None)
- self.assertTrue(operator.or_(0xa, 0x5) == 0xf)
+ self.assertEqual(operator.or_(0xa, 0x5), 0xf)
def test_pos(self):
operator = self.module
@@ -250,8 +250,8 @@ class OperatorTestCase:
operator = self.module
self.assertRaises(TypeError, operator.rshift)
self.assertRaises(TypeError, operator.rshift, None, 42)
- self.assertTrue(operator.rshift(5, 1) == 2)
- self.assertTrue(operator.rshift(5, 0) == 5)
+ self.assertEqual(operator.rshift(5, 1), 2)
+ self.assertEqual(operator.rshift(5, 0), 5)
self.assertRaises(ValueError, operator.rshift, 2, -1)
def test_contains(self):
@@ -266,15 +266,15 @@ class OperatorTestCase:
a = list(range(3))
self.assertRaises(TypeError, operator.setitem, a)
self.assertRaises(TypeError, operator.setitem, a, None, None)
- self.assertTrue(operator.setitem(a, 0, 2) is None)
- self.assertTrue(a == [2, 1, 2])
+ self.assertIsNone(operator.setitem(a, 0, 2))
+ self.assertEqual(a, [2, 1, 2])
self.assertRaises(IndexError, operator.setitem, a, 4, 2)
def test_sub(self):
operator = self.module
self.assertRaises(TypeError, operator.sub)
self.assertRaises(TypeError, operator.sub, None, None)
- self.assertTrue(operator.sub(5, 2) == 3)
+ self.assertEqual(operator.sub(5, 2), 3)
def test_truth(self):
operator = self.module
@@ -292,7 +292,7 @@ class OperatorTestCase:
operator = self.module
self.assertRaises(TypeError, operator.xor)
self.assertRaises(TypeError, operator.xor, None, None)
- self.assertTrue(operator.xor(0xb, 0xc) == 0x7)
+ self.assertEqual(operator.xor(0xb, 0xc), 0x7)
def test_is(self):
operator = self.module
diff --git a/Lib/test/test_os.py b/Lib/test/test_os.py
index 1e67e7ab19..e8a7c97021 100644
--- a/Lib/test/test_os.py
+++ b/Lib/test/test_os.py
@@ -226,15 +226,10 @@ class FileTests(unittest.TestCase):
# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
def setUp(self):
- os.mkdir(support.TESTFN)
- self.fname = os.path.join(support.TESTFN, "f1")
- f = open(self.fname, 'wb')
- f.write(b"ABC")
- f.close()
-
- def tearDown(self):
- os.unlink(self.fname)
- os.rmdir(support.TESTFN)
+ self.fname = support.TESTFN
+ self.addCleanup(support.unlink, self.fname)
+ with open(self.fname, 'wb') as fp:
+ fp.write(b"ABC")
@unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
def check_stat_attributes(self, fname):
@@ -426,7 +421,11 @@ class StatAttributeTests(unittest.TestCase):
0)
# test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
- result = os.stat(support.TESTFN)
+ dirname = support.TESTFN + "dir"
+ os.mkdir(dirname)
+ self.addCleanup(os.rmdir, dirname)
+
+ result = os.stat(dirname)
self.check_file_attributes(result)
self.assertEqual(
result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
diff --git a/Lib/test/test_pyclbr.py b/Lib/test/test_pyclbr.py
index cab430b4bd..48bd72590f 100644
--- a/Lib/test/test_pyclbr.py
+++ b/Lib/test/test_pyclbr.py
@@ -156,7 +156,7 @@ class PyclbrTest(TestCase):
# These were once about the 10 longest modules
cm('random', ignore=('Random',)) # from _random import Random as CoreGenerator
cm('cgi', ignore=('log',)) # set with = in module
- cm('pickle')
+ cm('pickle', ignore=('partial',))
cm('aifc', ignore=('openfp', '_aifc_params')) # set with = in module
cm('sre_parse', ignore=('dump', 'groups')) # from sre_constants import *; property
cm('pdb')
diff --git a/Lib/test/test_pydoc.py b/Lib/test/test_pydoc.py
index 8ad5706a84..cdc12ed632 100644
--- a/Lib/test/test_pydoc.py
+++ b/Lib/test/test_pydoc.py
@@ -841,6 +841,22 @@ class TestDescriptions(unittest.TestCase):
self.assertEqual(self._get_summary_line(t.wrap),
"wrap(text) method of textwrap.TextWrapper instance")
+ def test_field_order_for_named_tuples(self):
+ Person = namedtuple('Person', ['nickname', 'firstname', 'agegroup'])
+ s = pydoc.render_doc(Person)
+ self.assertLess(s.index('nickname'), s.index('firstname'))
+ self.assertLess(s.index('firstname'), s.index('agegroup'))
+
+ class NonIterableFields:
+ _fields = None
+
+ class NonHashableFields:
+ _fields = [[]]
+
+ # Make sure these doesn't fail
+ pydoc.render_doc(NonIterableFields)
+ pydoc.render_doc(NonHashableFields)
+
@requires_docstrings
def test_bound_builtin_method(self):
s = StringIO()
diff --git a/Lib/test/test_regrtest.py b/Lib/test/test_regrtest.py
index a398a4f836..ab7741f3bf 100644
--- a/Lib/test/test_regrtest.py
+++ b/Lib/test/test_regrtest.py
@@ -1,21 +1,48 @@
"""
Tests of regrtest.py.
+
+Note: test_regrtest cannot be run twice in parallel.
"""
import argparse
+import contextlib
import faulthandler
import getopt
+import io
import os.path
+import platform
+import re
+import subprocess
+import sys
+import textwrap
import unittest
-from test import regrtest, support
+from test import libregrtest
+from test import support
-class ParseArgsTestCase(unittest.TestCase):
- """Test regrtest's argument parsing."""
+Py_DEBUG = hasattr(sys, 'getobjects')
+ROOT_DIR = os.path.join(os.path.dirname(__file__), '..', '..')
+ROOT_DIR = os.path.abspath(os.path.normpath(ROOT_DIR))
+
+TEST_INTERRUPTED = textwrap.dedent("""
+ from signal import SIGINT
+ try:
+ from _testcapi import raise_signal
+ raise_signal(SIGINT)
+ except ImportError:
+ import os
+ os.kill(os.getpid(), SIGINT)
+ """)
+
+
+class ParseArgsTestCase(unittest.TestCase):
+ """
+ Test regrtest's argument parsing, function _parse_args().
+ """
def checkError(self, args, msg):
with support.captured_stderr() as err, self.assertRaises(SystemExit):
- regrtest._parse_args(args)
+ libregrtest._parse_args(args)
self.assertIn(msg, err.getvalue())
def test_help(self):
@@ -23,82 +50,82 @@ class ParseArgsTestCase(unittest.TestCase):
with self.subTest(opt=opt):
with support.captured_stdout() as out, \
self.assertRaises(SystemExit):
- regrtest._parse_args([opt])
+ libregrtest._parse_args([opt])
self.assertIn('Run Python regression tests.', out.getvalue())
@unittest.skipUnless(hasattr(faulthandler, 'dump_traceback_later'),
"faulthandler.dump_traceback_later() required")
def test_timeout(self):
- ns = regrtest._parse_args(['--timeout', '4.2'])
+ ns = libregrtest._parse_args(['--timeout', '4.2'])
self.assertEqual(ns.timeout, 4.2)
self.checkError(['--timeout'], 'expected one argument')
self.checkError(['--timeout', 'foo'], 'invalid float value')
def test_wait(self):
- ns = regrtest._parse_args(['--wait'])
+ ns = libregrtest._parse_args(['--wait'])
self.assertTrue(ns.wait)
def test_slaveargs(self):
- ns = regrtest._parse_args(['--slaveargs', '[[], {}]'])
+ ns = libregrtest._parse_args(['--slaveargs', '[[], {}]'])
self.assertEqual(ns.slaveargs, '[[], {}]')
self.checkError(['--slaveargs'], 'expected one argument')
def test_start(self):
for opt in '-S', '--start':
with self.subTest(opt=opt):
- ns = regrtest._parse_args([opt, 'foo'])
+ ns = libregrtest._parse_args([opt, 'foo'])
self.assertEqual(ns.start, 'foo')
self.checkError([opt], 'expected one argument')
def test_verbose(self):
- ns = regrtest._parse_args(['-v'])
+ ns = libregrtest._parse_args(['-v'])
self.assertEqual(ns.verbose, 1)
- ns = regrtest._parse_args(['-vvv'])
+ ns = libregrtest._parse_args(['-vvv'])
self.assertEqual(ns.verbose, 3)
- ns = regrtest._parse_args(['--verbose'])
+ ns = libregrtest._parse_args(['--verbose'])
self.assertEqual(ns.verbose, 1)
- ns = regrtest._parse_args(['--verbose'] * 3)
+ ns = libregrtest._parse_args(['--verbose'] * 3)
self.assertEqual(ns.verbose, 3)
- ns = regrtest._parse_args([])
+ ns = libregrtest._parse_args([])
self.assertEqual(ns.verbose, 0)
def test_verbose2(self):
for opt in '-w', '--verbose2':
with self.subTest(opt=opt):
- ns = regrtest._parse_args([opt])
+ ns = libregrtest._parse_args([opt])
self.assertTrue(ns.verbose2)
def test_verbose3(self):
for opt in '-W', '--verbose3':
with self.subTest(opt=opt):
- ns = regrtest._parse_args([opt])
+ ns = libregrtest._parse_args([opt])
self.assertTrue(ns.verbose3)
def test_quiet(self):
for opt in '-q', '--quiet':
with self.subTest(opt=opt):
- ns = regrtest._parse_args([opt])
+ ns = libregrtest._parse_args([opt])
self.assertTrue(ns.quiet)
self.assertEqual(ns.verbose, 0)
def test_slow(self):
for opt in '-o', '--slow':
with self.subTest(opt=opt):
- ns = regrtest._parse_args([opt])
+ ns = libregrtest._parse_args([opt])
self.assertTrue(ns.print_slow)
def test_header(self):
- ns = regrtest._parse_args(['--header'])
+ ns = libregrtest._parse_args(['--header'])
self.assertTrue(ns.header)
def test_randomize(self):
for opt in '-r', '--randomize':
with self.subTest(opt=opt):
- ns = regrtest._parse_args([opt])
+ ns = libregrtest._parse_args([opt])
self.assertTrue(ns.randomize)
def test_randseed(self):
- ns = regrtest._parse_args(['--randseed', '12345'])
+ ns = libregrtest._parse_args(['--randseed', '12345'])
self.assertEqual(ns.random_seed, 12345)
self.assertTrue(ns.randomize)
self.checkError(['--randseed'], 'expected one argument')
@@ -107,7 +134,7 @@ class ParseArgsTestCase(unittest.TestCase):
def test_fromfile(self):
for opt in '-f', '--fromfile':
with self.subTest(opt=opt):
- ns = regrtest._parse_args([opt, 'foo'])
+ ns = libregrtest._parse_args([opt, 'foo'])
self.assertEqual(ns.fromfile, 'foo')
self.checkError([opt], 'expected one argument')
self.checkError([opt, 'foo', '-s'], "don't go together")
@@ -115,42 +142,42 @@ class ParseArgsTestCase(unittest.TestCase):
def test_exclude(self):
for opt in '-x', '--exclude':
with self.subTest(opt=opt):
- ns = regrtest._parse_args([opt])
+ ns = libregrtest._parse_args([opt])
self.assertTrue(ns.exclude)
def test_single(self):
for opt in '-s', '--single':
with self.subTest(opt=opt):
- ns = regrtest._parse_args([opt])
+ ns = libregrtest._parse_args([opt])
self.assertTrue(ns.single)
self.checkError([opt, '-f', 'foo'], "don't go together")
def test_match(self):
for opt in '-m', '--match':
with self.subTest(opt=opt):
- ns = regrtest._parse_args([opt, 'pattern'])
+ ns = libregrtest._parse_args([opt, 'pattern'])
self.assertEqual(ns.match_tests, 'pattern')
self.checkError([opt], 'expected one argument')
def test_failfast(self):
for opt in '-G', '--failfast':
with self.subTest(opt=opt):
- ns = regrtest._parse_args([opt, '-v'])
+ ns = libregrtest._parse_args([opt, '-v'])
self.assertTrue(ns.failfast)
- ns = regrtest._parse_args([opt, '-W'])
+ ns = libregrtest._parse_args([opt, '-W'])
self.assertTrue(ns.failfast)
self.checkError([opt], '-G/--failfast needs either -v or -W')
def test_use(self):
for opt in '-u', '--use':
with self.subTest(opt=opt):
- ns = regrtest._parse_args([opt, 'gui,network'])
+ ns = libregrtest._parse_args([opt, 'gui,network'])
self.assertEqual(ns.use_resources, ['gui', 'network'])
- ns = regrtest._parse_args([opt, 'gui,none,network'])
+ ns = libregrtest._parse_args([opt, 'gui,none,network'])
self.assertEqual(ns.use_resources, ['network'])
- expected = list(regrtest.RESOURCE_NAMES)
+ expected = list(libregrtest.RESOURCE_NAMES)
expected.remove('gui')
- ns = regrtest._parse_args([opt, 'all,-gui'])
+ ns = libregrtest._parse_args([opt, 'all,-gui'])
self.assertEqual(ns.use_resources, expected)
self.checkError([opt], 'expected one argument')
self.checkError([opt, 'foo'], 'invalid resource')
@@ -158,31 +185,31 @@ class ParseArgsTestCase(unittest.TestCase):
def test_memlimit(self):
for opt in '-M', '--memlimit':
with self.subTest(opt=opt):
- ns = regrtest._parse_args([opt, '4G'])
+ ns = libregrtest._parse_args([opt, '4G'])
self.assertEqual(ns.memlimit, '4G')
self.checkError([opt], 'expected one argument')
def test_testdir(self):
- ns = regrtest._parse_args(['--testdir', 'foo'])
+ ns = libregrtest._parse_args(['--testdir', 'foo'])
self.assertEqual(ns.testdir, os.path.join(support.SAVEDCWD, 'foo'))
self.checkError(['--testdir'], 'expected one argument')
def test_runleaks(self):
for opt in '-L', '--runleaks':
with self.subTest(opt=opt):
- ns = regrtest._parse_args([opt])
+ ns = libregrtest._parse_args([opt])
self.assertTrue(ns.runleaks)
def test_huntrleaks(self):
for opt in '-R', '--huntrleaks':
with self.subTest(opt=opt):
- ns = regrtest._parse_args([opt, ':'])
+ ns = libregrtest._parse_args([opt, ':'])
self.assertEqual(ns.huntrleaks, (5, 4, 'reflog.txt'))
- ns = regrtest._parse_args([opt, '6:'])
+ ns = libregrtest._parse_args([opt, '6:'])
self.assertEqual(ns.huntrleaks, (6, 4, 'reflog.txt'))
- ns = regrtest._parse_args([opt, ':3'])
+ ns = libregrtest._parse_args([opt, ':3'])
self.assertEqual(ns.huntrleaks, (5, 3, 'reflog.txt'))
- ns = regrtest._parse_args([opt, '6:3:leaks.log'])
+ ns = libregrtest._parse_args([opt, '6:3:leaks.log'])
self.assertEqual(ns.huntrleaks, (6, 3, 'leaks.log'))
self.checkError([opt], 'expected one argument')
self.checkError([opt, '6'],
@@ -193,24 +220,23 @@ class ParseArgsTestCase(unittest.TestCase):
def test_multiprocess(self):
for opt in '-j', '--multiprocess':
with self.subTest(opt=opt):
- ns = regrtest._parse_args([opt, '2'])
+ ns = libregrtest._parse_args([opt, '2'])
self.assertEqual(ns.use_mp, 2)
self.checkError([opt], 'expected one argument')
self.checkError([opt, 'foo'], 'invalid int value')
self.checkError([opt, '2', '-T'], "don't go together")
self.checkError([opt, '2', '-l'], "don't go together")
- self.checkError([opt, '2', '-M', '4G'], "don't go together")
def test_coverage(self):
for opt in '-T', '--coverage':
with self.subTest(opt=opt):
- ns = regrtest._parse_args([opt])
+ ns = libregrtest._parse_args([opt])
self.assertTrue(ns.trace)
def test_coverdir(self):
for opt in '-D', '--coverdir':
with self.subTest(opt=opt):
- ns = regrtest._parse_args([opt, 'foo'])
+ ns = libregrtest._parse_args([opt, 'foo'])
self.assertEqual(ns.coverdir,
os.path.join(support.SAVEDCWD, 'foo'))
self.checkError([opt], 'expected one argument')
@@ -218,13 +244,13 @@ class ParseArgsTestCase(unittest.TestCase):
def test_nocoverdir(self):
for opt in '-N', '--nocoverdir':
with self.subTest(opt=opt):
- ns = regrtest._parse_args([opt])
+ ns = libregrtest._parse_args([opt])
self.assertIsNone(ns.coverdir)
def test_threshold(self):
for opt in '-t', '--threshold':
with self.subTest(opt=opt):
- ns = regrtest._parse_args([opt, '1000'])
+ ns = libregrtest._parse_args([opt, '1000'])
self.assertEqual(ns.threshold, 1000)
self.checkError([opt], 'expected one argument')
self.checkError([opt, 'foo'], 'invalid int value')
@@ -232,13 +258,16 @@ class ParseArgsTestCase(unittest.TestCase):
def test_nowindows(self):
for opt in '-n', '--nowindows':
with self.subTest(opt=opt):
- ns = regrtest._parse_args([opt])
+ with contextlib.redirect_stderr(io.StringIO()) as stderr:
+ ns = libregrtest._parse_args([opt])
self.assertTrue(ns.nowindows)
+ err = stderr.getvalue()
+ self.assertIn('the --nowindows (-n) option is deprecated', err)
def test_forever(self):
for opt in '-F', '--forever':
with self.subTest(opt=opt):
- ns = regrtest._parse_args([opt])
+ ns = libregrtest._parse_args([opt])
self.assertTrue(ns.forever)
@@ -246,30 +275,469 @@ class ParseArgsTestCase(unittest.TestCase):
self.checkError(['--xxx'], 'usage:')
def test_long_option__partial(self):
- ns = regrtest._parse_args(['--qui'])
+ ns = libregrtest._parse_args(['--qui'])
self.assertTrue(ns.quiet)
self.assertEqual(ns.verbose, 0)
def test_two_options(self):
- ns = regrtest._parse_args(['--quiet', '--exclude'])
+ ns = libregrtest._parse_args(['--quiet', '--exclude'])
self.assertTrue(ns.quiet)
self.assertEqual(ns.verbose, 0)
self.assertTrue(ns.exclude)
def test_option_with_empty_string_value(self):
- ns = regrtest._parse_args(['--start', ''])
+ ns = libregrtest._parse_args(['--start', ''])
self.assertEqual(ns.start, '')
def test_arg(self):
- ns = regrtest._parse_args(['foo'])
+ ns = libregrtest._parse_args(['foo'])
self.assertEqual(ns.args, ['foo'])
def test_option_and_arg(self):
- ns = regrtest._parse_args(['--quiet', 'foo'])
+ ns = libregrtest._parse_args(['--quiet', 'foo'])
self.assertTrue(ns.quiet)
self.assertEqual(ns.verbose, 0)
self.assertEqual(ns.args, ['foo'])
+class BaseTestCase(unittest.TestCase):
+ TEST_UNIQUE_ID = 1
+ TESTNAME_PREFIX = 'test_regrtest_'
+ TESTNAME_REGEX = r'test_[a-z0-9_]+'
+
+ def setUp(self):
+ self.testdir = os.path.join(ROOT_DIR, 'Lib', 'test')
+
+ # When test_regrtest is interrupted by CTRL+c, it can leave
+ # temporary test files
+ remove = [entry.path
+ for entry in os.scandir(self.testdir)
+ if (entry.name.startswith(self.TESTNAME_PREFIX)
+ and entry.name.endswith(".py"))]
+ for path in remove:
+ print("WARNING: test_regrtest: remove %s" % path)
+ support.unlink(path)
+
+ def create_test(self, name=None, code=''):
+ if not name:
+ name = 'noop%s' % BaseTestCase.TEST_UNIQUE_ID
+ BaseTestCase.TEST_UNIQUE_ID += 1
+
+ # test_regrtest cannot be run twice in parallel because
+ # of setUp() and create_test()
+ name = self.TESTNAME_PREFIX + "%s_%s" % (os.getpid(), name)
+ path = os.path.join(self.testdir, name + '.py')
+
+ self.addCleanup(support.unlink, path)
+ # Use 'x' mode to ensure that we do not override existing tests
+ with open(path, 'x', encoding='utf-8') as fp:
+ fp.write(code)
+ return name
+
+ def regex_search(self, regex, output):
+ match = re.search(regex, output, re.MULTILINE)
+ if not match:
+ self.fail("%r not found in %r" % (regex, output))
+ return match
+
+ def check_line(self, output, regex):
+ regex = re.compile(r'^' + regex, re.MULTILINE)
+ self.assertRegex(output, regex)
+
+ def parse_executed_tests(self, output):
+ regex = r'^\[ *[0-9]+(?:/ *[0-9]+)?\] (%s)$' % self.TESTNAME_REGEX
+ parser = re.finditer(regex, output, re.MULTILINE)
+ return list(match.group(1) for match in parser)
+
+ def check_executed_tests(self, output, tests, skipped=(), failed=(),
+ omitted=(), randomize=False):
+ if isinstance(tests, str):
+ tests = [tests]
+ if isinstance(skipped, str):
+ skipped = [skipped]
+ if isinstance(failed, str):
+ failed = [failed]
+ if isinstance(omitted, str):
+ omitted = [omitted]
+ ntest = len(tests)
+ nskipped = len(skipped)
+ nfailed = len(failed)
+ nomitted = len(omitted)
+
+ executed = self.parse_executed_tests(output)
+ if randomize:
+ self.assertEqual(set(executed), set(tests), output)
+ else:
+ self.assertEqual(executed, tests, output)
+
+ def plural(count):
+ return 's' if count != 1 else ''
+
+ def list_regex(line_format, tests):
+ count = len(tests)
+ names = ' '.join(sorted(tests))
+ regex = line_format % (count, plural(count))
+ regex = r'%s:\n %s$' % (regex, names)
+ return regex
+
+ if skipped:
+ regex = list_regex('%s test%s skipped', skipped)
+ self.check_line(output, regex)
+
+ if failed:
+ regex = list_regex('%s test%s failed', failed)
+ self.check_line(output, regex)
+
+ if omitted:
+ regex = list_regex('%s test%s omitted', omitted)
+ self.check_line(output, regex)
+
+ good = ntest - nskipped - nfailed - nomitted
+ if good:
+ regex = r'%s test%s OK\.$' % (good, plural(good))
+ if not skipped and not failed and good > 1:
+ regex = 'All %s' % regex
+ self.check_line(output, regex)
+
+ def parse_random_seed(self, output):
+ match = self.regex_search(r'Using random seed ([0-9]+)', output)
+ randseed = int(match.group(1))
+ self.assertTrue(0 <= randseed <= 10000000, randseed)
+ return randseed
+
+ def run_command(self, args, input=None, exitcode=0, **kw):
+ if not input:
+ input = ''
+ if 'stderr' not in kw:
+ kw['stderr'] = subprocess.PIPE
+ proc = subprocess.run(args,
+ universal_newlines=True,
+ input=input,
+ stdout=subprocess.PIPE,
+ **kw)
+ if proc.returncode != exitcode:
+ msg = ("Command %s failed with exit code %s\n"
+ "\n"
+ "stdout:\n"
+ "---\n"
+ "%s\n"
+ "---\n"
+ % (str(args), proc.returncode, proc.stdout))
+ if proc.stderr:
+ msg += ("\n"
+ "stderr:\n"
+ "---\n"
+ "%s"
+ "---\n"
+ % proc.stderr)
+ self.fail(msg)
+ return proc
+
+
+ def run_python(self, args, **kw):
+ args = [sys.executable, '-X', 'faulthandler', '-I', *args]
+ proc = self.run_command(args, **kw)
+ return proc.stdout
+
+
+class ProgramsTestCase(BaseTestCase):
+ """
+ Test various ways to run the Python test suite. Use options close
+ to options used on the buildbot.
+ """
+
+ NTEST = 4
+
+ def setUp(self):
+ super().setUp()
+
+ # Create NTEST tests doing nothing
+ self.tests = [self.create_test() for index in range(self.NTEST)]
+
+ self.python_args = ['-Wd', '-E', '-bb']
+ self.regrtest_args = ['-uall', '-rwW']
+ if hasattr(faulthandler, 'dump_traceback_later'):
+ self.regrtest_args.extend(('--timeout', '3600', '-j4'))
+ if sys.platform == 'win32':
+ self.regrtest_args.append('-n')
+
+ def check_output(self, output):
+ self.parse_random_seed(output)
+ self.check_executed_tests(output, self.tests, randomize=True)
+
+ def run_tests(self, args):
+ output = self.run_python(args)
+ self.check_output(output)
+
+ def test_script_regrtest(self):
+ # Lib/test/regrtest.py
+ script = os.path.join(ROOT_DIR, 'Lib', 'test', 'regrtest.py')
+
+ args = [*self.python_args, script, *self.regrtest_args, *self.tests]
+ self.run_tests(args)
+
+ def test_module_test(self):
+ # -m test
+ args = [*self.python_args, '-m', 'test',
+ *self.regrtest_args, *self.tests]
+ self.run_tests(args)
+
+ def test_module_regrtest(self):
+ # -m test.regrtest
+ args = [*self.python_args, '-m', 'test.regrtest',
+ *self.regrtest_args, *self.tests]
+ self.run_tests(args)
+
+ def test_module_autotest(self):
+ # -m test.autotest
+ args = [*self.python_args, '-m', 'test.autotest',
+ *self.regrtest_args, *self.tests]
+ self.run_tests(args)
+
+ def test_module_from_test_autotest(self):
+ # from test import autotest
+ code = 'from test import autotest'
+ args = [*self.python_args, '-c', code,
+ *self.regrtest_args, *self.tests]
+ self.run_tests(args)
+
+ def test_script_autotest(self):
+ # Lib/test/autotest.py
+ script = os.path.join(ROOT_DIR, 'Lib', 'test', 'autotest.py')
+ args = [*self.python_args, script, *self.regrtest_args, *self.tests]
+ self.run_tests(args)
+
+ def test_tools_script_run_tests(self):
+ # Tools/scripts/run_tests.py
+ script = os.path.join(ROOT_DIR, 'Tools', 'scripts', 'run_tests.py')
+ self.run_tests([script, *self.tests])
+
+ def run_batch(self, *args):
+ proc = self.run_command(args)
+ self.check_output(proc.stdout)
+
+ @unittest.skipUnless(sys.platform == 'win32', 'Windows only')
+ def test_tools_buildbot_test(self):
+ # Tools\buildbot\test.bat
+ script = os.path.join(ROOT_DIR, 'Tools', 'buildbot', 'test.bat')
+ test_args = []
+ if platform.architecture()[0] == '64bit':
+ test_args.append('-x64') # 64-bit build
+ if not Py_DEBUG:
+ test_args.append('+d') # Release build, use python.exe
+ self.run_batch(script, *test_args, *self.tests)
+
+ @unittest.skipUnless(sys.platform == 'win32', 'Windows only')
+ def test_pcbuild_rt(self):
+ # PCbuild\rt.bat
+ script = os.path.join(ROOT_DIR, r'PCbuild\rt.bat')
+ rt_args = ["-q"] # Quick, don't run tests twice
+ if platform.architecture()[0] == '64bit':
+ rt_args.append('-x64') # 64-bit build
+ if Py_DEBUG:
+ rt_args.append('-d') # Debug build, use python_d.exe
+ self.run_batch(script, *rt_args, *self.regrtest_args, *self.tests)
+
+
+class ArgsTestCase(BaseTestCase):
+ """
+ Test arguments of the Python test suite.
+ """
+
+ def run_tests(self, *args, **kw):
+ return self.run_python(['-m', 'test', *args], **kw)
+
+ def test_failing_test(self):
+ # test a failing test
+ code = textwrap.dedent("""
+ import unittest
+
+ class FailingTest(unittest.TestCase):
+ def test_failing(self):
+ self.fail("bug")
+ """)
+ test_ok = self.create_test()
+ test_failing = self.create_test(code=code)
+ tests = [test_ok, test_failing]
+
+ output = self.run_tests(*tests, exitcode=1)
+ self.check_executed_tests(output, tests, failed=test_failing)
+
+ def test_resources(self):
+ # test -u command line option
+ tests = {}
+ for resource in ('audio', 'network'):
+ code = 'from test import support\nsupport.requires(%r)' % resource
+ tests[resource] = self.create_test(resource, code)
+ test_names = sorted(tests.values())
+
+ # -u all: 2 resources enabled
+ output = self.run_tests('-u', 'all', *test_names)
+ self.check_executed_tests(output, test_names)
+
+ # -u audio: 1 resource enabled
+ output = self.run_tests('-uaudio', *test_names)
+ self.check_executed_tests(output, test_names,
+ skipped=tests['network'])
+
+ # no option: 0 resources enabled
+ output = self.run_tests(*test_names)
+ self.check_executed_tests(output, test_names,
+ skipped=test_names)
+
+ def test_random(self):
+ # test -r and --randseed command line option
+ code = textwrap.dedent("""
+ import random
+ print("TESTRANDOM: %s" % random.randint(1, 1000))
+ """)
+ test = self.create_test('random', code)
+
+ # first run to get the output with the random seed
+ output = self.run_tests('-r', test)
+ randseed = self.parse_random_seed(output)
+ match = self.regex_search(r'TESTRANDOM: ([0-9]+)', output)
+ test_random = int(match.group(1))
+
+ # try to reproduce with the random seed
+ output = self.run_tests('-r', '--randseed=%s' % randseed, test)
+ randseed2 = self.parse_random_seed(output)
+ self.assertEqual(randseed2, randseed)
+
+ match = self.regex_search(r'TESTRANDOM: ([0-9]+)', output)
+ test_random2 = int(match.group(1))
+ self.assertEqual(test_random2, test_random)
+
+ def test_fromfile(self):
+ # test --fromfile
+ tests = [self.create_test() for index in range(5)]
+
+ # Write the list of files using a format similar to regrtest output:
+ # [1/2] test_1
+ # [2/2] test_2
+ filename = support.TESTFN
+ self.addCleanup(support.unlink, filename)
+ with open(filename, "w") as fp:
+ for index, name in enumerate(tests, 1):
+ print("[%s/%s] %s" % (index, len(tests), name), file=fp)
+
+ output = self.run_tests('--fromfile', filename)
+ self.check_executed_tests(output, tests)
+
+ def test_interrupted(self):
+ code = TEST_INTERRUPTED
+ test = self.create_test("sigint", code=code)
+ output = self.run_tests(test, exitcode=1)
+ self.check_executed_tests(output, test, omitted=test)
+
+ def test_slow(self):
+ # test --slow
+ tests = [self.create_test() for index in range(3)]
+ output = self.run_tests("--slow", *tests)
+ self.check_executed_tests(output, tests)
+ regex = ('10 slowest tests:\n'
+ '(?:%s: [0-9]+\.[0-9]+s\n){%s}'
+ % (self.TESTNAME_REGEX, len(tests)))
+ self.check_line(output, regex)
+
+ def test_slow_interrupted(self):
+ # Issue #25373: test --slow with an interrupted test
+ code = TEST_INTERRUPTED
+ test = self.create_test("sigint", code=code)
+
+ for multiprocessing in (False, True):
+ if multiprocessing:
+ args = ("--slow", "-j2", test)
+ else:
+ args = ("--slow", test)
+ output = self.run_tests(*args, exitcode=1)
+ self.check_executed_tests(output, test, omitted=test)
+ regex = ('10 slowest tests:\n')
+ self.check_line(output, regex)
+ self.check_line(output, 'Test suite interrupted by signal SIGINT.')
+
+ def test_coverage(self):
+ # test --coverage
+ test = self.create_test()
+ output = self.run_tests("--coverage", test)
+ self.check_executed_tests(output, [test])
+ regex = ('lines +cov% +module +\(path\)\n'
+ '(?: *[0-9]+ *[0-9]{1,2}% *[^ ]+ +\([^)]+\)+)+')
+ self.check_line(output, regex)
+
+ def test_wait(self):
+ # test --wait
+ test = self.create_test()
+ output = self.run_tests("--wait", test, input='key')
+ self.check_line(output, 'Press any key to continue')
+
+ def test_forever(self):
+ # test --forever
+ code = textwrap.dedent("""
+ import unittest
+
+ class ForeverTester(unittest.TestCase):
+ RUN = 1
+
+ def test_run(self):
+ ForeverTester.RUN += 1
+ if ForeverTester.RUN > 3:
+ self.fail("fail at the 3rd runs")
+ """)
+ test = self.create_test(code=code)
+ output = self.run_tests('--forever', test, exitcode=1)
+ self.check_executed_tests(output, [test]*3, failed=test)
+
+ @unittest.skipUnless(Py_DEBUG, 'need a debug build')
+ def test_huntrleaks_fd_leak(self):
+ # test --huntrleaks for file descriptor leak
+ code = textwrap.dedent("""
+ import os
+ import unittest
+
+ # Issue #25306: Disable popups and logs to stderr on assertion
+ # failures in MSCRT
+ try:
+ import msvcrt
+ msvcrt.CrtSetReportMode
+ except (ImportError, AttributeError):
+ # no Windows, o release build
+ pass
+ else:
+ for m in [msvcrt.CRT_WARN, msvcrt.CRT_ERROR, msvcrt.CRT_ASSERT]:
+ msvcrt.CrtSetReportMode(m, 0)
+
+ class FDLeakTest(unittest.TestCase):
+ def test_leak(self):
+ fd = os.open(__file__, os.O_RDONLY)
+ # bug: never cloes the file descriptor
+ """)
+ test = self.create_test(code=code)
+
+ filename = 'reflog.txt'
+ self.addCleanup(support.unlink, filename)
+ output = self.run_tests('--huntrleaks', '3:3:', test,
+ exitcode=1,
+ stderr=subprocess.STDOUT)
+ self.check_executed_tests(output, [test], failed=test)
+
+ line = 'beginning 6 repetitions\n123456\n......\n'
+ self.check_line(output, re.escape(line))
+
+ line2 = '%s leaked [1, 1, 1] file descriptors, sum=3\n' % test
+ self.check_line(output, re.escape(line2))
+
+ with open(filename) as fp:
+ reflog = fp.read()
+ self.assertEqual(reflog, line2)
+
+ def test_list_tests(self):
+ # test --list-tests
+ tests = [self.create_test() for i in range(5)]
+ output = self.run_tests('--list-tests', *tests)
+ self.assertEqual(output.rstrip().splitlines(),
+ tests)
+
+
if __name__ == '__main__':
unittest.main()
diff --git a/Lib/test/test_richcmp.py b/Lib/test/test_richcmp.py
index 1582caad97..58729a9fea 100644
--- a/Lib/test/test_richcmp.py
+++ b/Lib/test/test_richcmp.py
@@ -253,6 +253,31 @@ class MiscTest(unittest.TestCase):
self.assertTrue(a != b)
self.assertTrue(a < b)
+ def test_exception_message(self):
+ class Spam:
+ pass
+
+ tests = [
+ (lambda: 42 < None, r"'<' .* of 'int' and 'NoneType'"),
+ (lambda: None < 42, r"'<' .* of 'NoneType' and 'int'"),
+ (lambda: 42 > None, r"'>' .* of 'int' and 'NoneType'"),
+ (lambda: "foo" < None, r"'<' .* of 'str' and 'NoneType'"),
+ (lambda: "foo" >= 666, r"'>=' .* of 'str' and 'int'"),
+ (lambda: 42 <= None, r"'<=' .* of 'int' and 'NoneType'"),
+ (lambda: 42 >= None, r"'>=' .* of 'int' and 'NoneType'"),
+ (lambda: 42 < [], r"'<' .* of 'int' and 'list'"),
+ (lambda: () > [], r"'>' .* of 'tuple' and 'list'"),
+ (lambda: None >= None, r"'>=' .* of 'NoneType' and 'NoneType'"),
+ (lambda: Spam() < 42, r"'<' .* of 'Spam' and 'int'"),
+ (lambda: 42 < Spam(), r"'<' .* of 'int' and 'Spam'"),
+ (lambda: Spam() <= Spam(), r"'<=' .* of 'Spam' and 'Spam'"),
+ ]
+ for i, test in enumerate(tests):
+ with self.subTest(test=i):
+ with self.assertRaisesRegex(TypeError, test[1]):
+ test[0]()
+
+
class DictTest(unittest.TestCase):
def test_dicts(self):
diff --git a/Lib/test/test_rlcompleter.py b/Lib/test/test_rlcompleter.py
index d37b620b7a..2ff0788429 100644
--- a/Lib/test/test_rlcompleter.py
+++ b/Lib/test/test_rlcompleter.py
@@ -5,6 +5,7 @@ import rlcompleter
class CompleteMe:
""" Trivial class used in testing rlcompleter.Completer. """
spam = 1
+ _ham = 2
class TestRlcompleter(unittest.TestCase):
@@ -51,11 +52,25 @@ class TestRlcompleter(unittest.TestCase):
['str.{}('.format(x) for x in dir(str)
if x.startswith('s')])
self.assertEqual(self.stdcompleter.attr_matches('tuple.foospamegg'), [])
+ expected = sorted({'None.%s%s' % (x, '(' if x != '__doc__' else '')
+ for x in dir(None)})
+ self.assertEqual(self.stdcompleter.attr_matches('None.'), expected)
+ self.assertEqual(self.stdcompleter.attr_matches('None._'), expected)
+ self.assertEqual(self.stdcompleter.attr_matches('None.__'), expected)
# test with a customized namespace
self.assertEqual(self.completer.attr_matches('CompleteMe.sp'),
['CompleteMe.spam'])
self.assertEqual(self.completer.attr_matches('Completeme.egg'), [])
+ self.assertEqual(self.completer.attr_matches('CompleteMe.'),
+ ['CompleteMe.mro(', 'CompleteMe.spam'])
+ self.assertEqual(self.completer.attr_matches('CompleteMe._'),
+ ['CompleteMe._ham'])
+ matches = self.completer.attr_matches('CompleteMe.__')
+ for x in matches:
+ self.assertTrue(x.startswith('CompleteMe.__'), x)
+ self.assertIn('CompleteMe.__name__', matches)
+ self.assertIn('CompleteMe.__new__(', matches)
CompleteMe.me = CompleteMe
self.assertEqual(self.completer.attr_matches('CompleteMe.me.me.sp'),
@@ -67,10 +82,15 @@ class TestRlcompleter(unittest.TestCase):
def test_complete(self):
completer = rlcompleter.Completer()
self.assertEqual(completer.complete('', 0), '\t')
- self.assertEqual(completer.complete('a', 0), 'and')
- self.assertEqual(completer.complete('a', 1), 'as')
- self.assertEqual(completer.complete('as', 2), 'assert')
- self.assertEqual(completer.complete('an', 0), 'and')
+ self.assertEqual(completer.complete('a', 0), 'and ')
+ self.assertEqual(completer.complete('a', 1), 'as ')
+ self.assertEqual(completer.complete('as', 2), 'assert ')
+ self.assertEqual(completer.complete('an', 0), 'and ')
+ self.assertEqual(completer.complete('pa', 0), 'pass')
+ self.assertEqual(completer.complete('Fa', 0), 'False')
+ self.assertEqual(completer.complete('el', 0), 'elif ')
+ self.assertEqual(completer.complete('el', 1), 'else')
+ self.assertEqual(completer.complete('tr', 0), 'try:')
if __name__ == '__main__':
unittest.main()
diff --git a/Lib/test/test_robotparser.py b/Lib/test/test_robotparser.py
index d01266f330..90b30722da 100644
--- a/Lib/test/test_robotparser.py
+++ b/Lib/test/test_robotparser.py
@@ -1,6 +1,7 @@
import io
import unittest
import urllib.robotparser
+from collections import namedtuple
from urllib.error import URLError, HTTPError
from urllib.request import urlopen
from test import support
@@ -12,7 +13,8 @@ except ImportError:
class RobotTestCase(unittest.TestCase):
- def __init__(self, index=None, parser=None, url=None, good=None, agent=None):
+ def __init__(self, index=None, parser=None, url=None, good=None,
+ agent=None, request_rate=None, crawl_delay=None):
# workaround to make unittest discovery work (see #17066)
if not isinstance(index, int):
return
@@ -25,6 +27,8 @@ class RobotTestCase(unittest.TestCase):
self.url = url
self.good = good
self.agent = agent
+ self.request_rate = request_rate
+ self.crawl_delay = crawl_delay
def runTest(self):
if isinstance(self.url, tuple):
@@ -34,6 +38,18 @@ class RobotTestCase(unittest.TestCase):
agent = self.agent
if self.good:
self.assertTrue(self.parser.can_fetch(agent, url))
+ self.assertEqual(self.parser.crawl_delay(agent), self.crawl_delay)
+ # if we have actual values for request rate
+ if self.request_rate and self.parser.request_rate(agent):
+ self.assertEqual(
+ self.parser.request_rate(agent).requests,
+ self.request_rate.requests
+ )
+ self.assertEqual(
+ self.parser.request_rate(agent).seconds,
+ self.request_rate.seconds
+ )
+ self.assertEqual(self.parser.request_rate(agent), self.request_rate)
else:
self.assertFalse(self.parser.can_fetch(agent, url))
@@ -43,15 +59,17 @@ class RobotTestCase(unittest.TestCase):
tests = unittest.TestSuite()
def RobotTest(index, robots_txt, good_urls, bad_urls,
- agent="test_robotparser"):
+ request_rate, crawl_delay, agent="test_robotparser"):
lines = io.StringIO(robots_txt).readlines()
parser = urllib.robotparser.RobotFileParser()
parser.parse(lines)
for url in good_urls:
- tests.addTest(RobotTestCase(index, parser, url, 1, agent))
+ tests.addTest(RobotTestCase(index, parser, url, 1, agent,
+ request_rate, crawl_delay))
for url in bad_urls:
- tests.addTest(RobotTestCase(index, parser, url, 0, agent))
+ tests.addTest(RobotTestCase(index, parser, url, 0, agent,
+ request_rate, crawl_delay))
# Examples from http://www.robotstxt.org/wc/norobots.html (fetched 2002)
@@ -65,14 +83,18 @@ Disallow: /foo.html
good = ['/','/test.html']
bad = ['/cyberworld/map/index.html','/tmp/xxx','/foo.html']
+request_rate = None
+crawl_delay = None
-RobotTest(1, doc, good, bad)
+RobotTest(1, doc, good, bad, request_rate, crawl_delay)
# 2.
doc = """
# robots.txt for http://www.example.com/
User-agent: *
+Crawl-delay: 1
+Request-rate: 3/15
Disallow: /cyberworld/map/ # This is an infinite virtual URL space
# Cybermapper knows where to go.
@@ -83,8 +105,10 @@ Disallow:
good = ['/','/test.html',('cybermapper','/cyberworld/map/index.html')]
bad = ['/cyberworld/map/index.html']
+request_rate = None # The parameters should be equal to None since they
+crawl_delay = None # don't apply to the cybermapper user agent
-RobotTest(2, doc, good, bad)
+RobotTest(2, doc, good, bad, request_rate, crawl_delay)
# 3.
doc = """
@@ -95,14 +119,18 @@ Disallow: /
good = []
bad = ['/cyberworld/map/index.html','/','/tmp/']
+request_rate = None
+crawl_delay = None
-RobotTest(3, doc, good, bad)
+RobotTest(3, doc, good, bad, request_rate, crawl_delay)
# Examples from http://www.robotstxt.org/wc/norobots-rfc.html (fetched 2002)
# 4.
doc = """
User-agent: figtree
+Crawl-delay: 3
+Request-rate: 9/30
Disallow: /tmp
Disallow: /a%3cd.html
Disallow: /a%2fb.html
@@ -115,8 +143,17 @@ bad = ['/tmp','/tmp.html','/tmp/a.html',
'/~joe/index.html'
]
-RobotTest(4, doc, good, bad, 'figtree')
-RobotTest(5, doc, good, bad, 'FigTree Robot libwww-perl/5.04')
+request_rate = namedtuple('req_rate', 'requests seconds')
+request_rate.requests = 9
+request_rate.seconds = 30
+crawl_delay = 3
+request_rate_bad = None # not actually tested, but we still need to parse it
+crawl_delay_bad = None # in order to accommodate the input parameters
+
+
+RobotTest(4, doc, good, bad, request_rate, crawl_delay, 'figtree' )
+RobotTest(5, doc, good, bad, request_rate_bad, crawl_delay_bad,
+ 'FigTree Robot libwww-perl/5.04')
# 6.
doc = """
@@ -125,14 +162,18 @@ Disallow: /tmp/
Disallow: /a%3Cd.html
Disallow: /a/b.html
Disallow: /%7ejoe/index.html
+Crawl-delay: 3
+Request-rate: 9/banana
"""
good = ['/tmp',] # XFAIL: '/a%2fb.html'
bad = ['/tmp/','/tmp/a.html',
'/a%3cd.html','/a%3Cd.html',"/a/b.html",
'/%7Ejoe/index.html']
+crawl_delay = 3
+request_rate = None # since request rate has invalid syntax, return None
-RobotTest(6, doc, good, bad)
+RobotTest(6, doc, good, bad, None, None)
# From bug report #523041
@@ -140,12 +181,16 @@ RobotTest(6, doc, good, bad)
doc = """
User-Agent: *
Disallow: /.
+Crawl-delay: pears
"""
good = ['/foo.html']
-bad = [] # Bug report says "/" should be denied, but that is not in the RFC
+bad = [] # bug report says "/" should be denied, but that is not in the RFC
+
+crawl_delay = None # since crawl delay has invalid syntax, return None
+request_rate = None
-RobotTest(7, doc, good, bad)
+RobotTest(7, doc, good, bad, crawl_delay, request_rate)
# From Google: http://www.google.com/support/webmasters/bin/answer.py?hl=en&answer=40364
@@ -154,12 +199,15 @@ doc = """
User-agent: Googlebot
Allow: /folder1/myfile.html
Disallow: /folder1/
+Request-rate: whale/banana
"""
good = ['/folder1/myfile.html']
bad = ['/folder1/anotherfile.html']
+crawl_delay = None
+request_rate = None # invalid syntax, return none
-RobotTest(8, doc, good, bad, agent="Googlebot")
+RobotTest(8, doc, good, bad, crawl_delay, request_rate, agent="Googlebot")
# 9. This file is incorrect because "Googlebot" is a substring of
# "Googlebot-Mobile", so test 10 works just like test 9.
@@ -174,12 +222,12 @@ Allow: /
good = []
bad = ['/something.jpg']
-RobotTest(9, doc, good, bad, agent="Googlebot")
+RobotTest(9, doc, good, bad, None, None, agent="Googlebot")
good = []
bad = ['/something.jpg']
-RobotTest(10, doc, good, bad, agent="Googlebot-Mobile")
+RobotTest(10, doc, good, bad, None, None, agent="Googlebot-Mobile")
# 11. Get the order correct.
doc = """
@@ -193,12 +241,12 @@ Disallow: /
good = []
bad = ['/something.jpg']
-RobotTest(11, doc, good, bad, agent="Googlebot")
+RobotTest(11, doc, good, bad, None, None, agent="Googlebot")
good = ['/something.jpg']
bad = []
-RobotTest(12, doc, good, bad, agent="Googlebot-Mobile")
+RobotTest(12, doc, good, bad, None, None, agent="Googlebot-Mobile")
# 13. Google also got the order wrong in #8. You need to specify the
@@ -212,7 +260,7 @@ Disallow: /folder1/
good = ['/folder1/myfile.html']
bad = ['/folder1/anotherfile.html']
-RobotTest(13, doc, good, bad, agent="googlebot")
+RobotTest(13, doc, good, bad, None, None, agent="googlebot")
# 14. For issue #6325 (query string support)
@@ -224,7 +272,7 @@ Disallow: /some/path?name=value
good = ['/some/path']
bad = ['/some/path?name=value']
-RobotTest(14, doc, good, bad)
+RobotTest(14, doc, good, bad, None, None)
# 15. For issue #4108 (obey first * entry)
doc = """
@@ -238,7 +286,7 @@ Disallow: /another/path
good = ['/another/path']
bad = ['/some/path']
-RobotTest(15, doc, good, bad)
+RobotTest(15, doc, good, bad, None, None)
# 16. Empty query (issue #17403). Normalizing the url first.
doc = """
@@ -250,7 +298,7 @@ Disallow: /another/path?
good = ['/some/path?']
bad = ['/another/path?']
-RobotTest(16, doc, good, bad)
+RobotTest(16, doc, good, bad, None, None)
class RobotHandler(BaseHTTPRequestHandler):
diff --git a/Lib/test/test_set.py b/Lib/test/test_set.py
index 54de508a83..ade39fb758 100644
--- a/Lib/test/test_set.py
+++ b/Lib/test/test_set.py
@@ -10,6 +10,8 @@ import sys
import warnings
import collections
import collections.abc
+import itertools
+import string
class PassThru(Exception):
pass
@@ -711,6 +713,28 @@ class TestFrozenSet(TestJointOps, unittest.TestCase):
addhashvalue(hash(frozenset([e for e, m in elemmasks if m&i])))
self.assertEqual(len(hashvalues), 2**n)
+ def letter_range(n):
+ return string.ascii_letters[:n]
+
+ def zf_range(n):
+ # https://en.wikipedia.org/wiki/Set-theoretic_definition_of_natural_numbers
+ nums = [frozenset()]
+ for i in range(n-1):
+ num = frozenset(nums)
+ nums.append(num)
+ return nums[:n]
+
+ def powerset(s):
+ for i in range(len(s)+1):
+ yield from map(frozenset, itertools.combinations(s, i))
+
+ for n in range(18):
+ t = 2 ** n
+ mask = t - 1
+ for nums in (range, letter_range, zf_range):
+ u = len({h & mask for h in map(hash, powerset(nums(n)))})
+ self.assertGreater(4*u, t)
+
class FrozenSetSubclass(frozenset):
pass
diff --git a/Lib/test/test_strptime.py b/Lib/test/test_strptime.py
index 346e2c63f8..6b26c8a55b 100644
--- a/Lib/test/test_strptime.py
+++ b/Lib/test/test_strptime.py
@@ -152,8 +152,8 @@ class TimeRETests(unittest.TestCase):
"'%s' using '%s'; group 'a' = '%s', group 'b' = %s'" %
(found.string, found.re.pattern, found.group('a'),
found.group('b')))
- for directive in ('a','A','b','B','c','d','H','I','j','m','M','p','S',
- 'U','w','W','x','X','y','Y','Z','%'):
+ for directive in ('a','A','b','B','c','d','G','H','I','j','m','M','p',
+ 'S','u','U','V','w','W','x','X','y','Y','Z','%'):
compiled = self.time_re.compile("%" + directive)
found = compiled.match(time.strftime("%" + directive))
self.assertTrue(found, "Matching failed on '%s' using '%s' regex" %
@@ -218,6 +218,26 @@ class StrptimeTests(unittest.TestCase):
else:
self.fail("'%s' did not raise ValueError" % bad_format)
+ # Ambiguous or incomplete cases using ISO year/week/weekday directives
+ # 1. ISO week (%V) is specified, but the year is specified with %Y
+ # instead of %G
+ with self.assertRaises(ValueError):
+ _strptime._strptime("1999 50", "%Y %V")
+ # 2. ISO year (%G) and ISO week (%V) are specified, but weekday is not
+ with self.assertRaises(ValueError):
+ _strptime._strptime("1999 51", "%G %V")
+ # 3. ISO year (%G) and weekday are specified, but ISO week (%V) is not
+ for w in ('A', 'a', 'w', 'u'):
+ with self.assertRaises(ValueError):
+ _strptime._strptime("1999 51","%G %{}".format(w))
+ # 4. ISO year is specified alone (e.g. time.strptime('2015', '%G'))
+ with self.assertRaises(ValueError):
+ _strptime._strptime("2015", "%G")
+ # 5. Julian/ordinal day (%j) is specified with %G, but not %Y
+ with self.assertRaises(ValueError):
+ _strptime._strptime("1999 256", "%G %j")
+
+
def test_strptime_exception_context(self):
# check that this doesn't chain exceptions needlessly (see #17572)
with self.assertRaises(ValueError) as e:
@@ -289,7 +309,7 @@ class StrptimeTests(unittest.TestCase):
def test_weekday(self):
# Test weekday directives
- for directive in ('A', 'a', 'w'):
+ for directive in ('A', 'a', 'w', 'u'):
self.helper(directive,6)
def test_julian(self):
@@ -458,16 +478,20 @@ class CalculationTests(unittest.TestCase):
# Should be able to infer date if given year, week of year (%U or %W)
# and day of the week
def test_helper(ymd_tuple, test_reason):
- for directive in ('W', 'U'):
- format_string = "%%Y %%%s %%w" % directive
- dt_date = datetime_date(*ymd_tuple)
- strp_input = dt_date.strftime(format_string)
- strp_output = _strptime._strptime_time(strp_input, format_string)
- self.assertTrue(strp_output[:3] == ymd_tuple,
- "%s(%s) test failed w/ '%s': %s != %s (%s != %s)" %
- (test_reason, directive, strp_input,
- strp_output[:3], ymd_tuple,
- strp_output[7], dt_date.timetuple()[7]))
+ for year_week_format in ('%Y %W', '%Y %U', '%G %V'):
+ for weekday_format in ('%w', '%u', '%a', '%A'):
+ format_string = year_week_format + ' ' + weekday_format
+ with self.subTest(test_reason,
+ date=ymd_tuple,
+ format=format_string):
+ dt_date = datetime_date(*ymd_tuple)
+ strp_input = dt_date.strftime(format_string)
+ strp_output = _strptime._strptime_time(strp_input,
+ format_string)
+ msg = "%r: %s != %s" % (strp_input,
+ strp_output[7],
+ dt_date.timetuple()[7])
+ self.assertEqual(strp_output[:3], ymd_tuple, msg)
test_helper((1901, 1, 3), "week 0")
test_helper((1901, 1, 8), "common case")
test_helper((1901, 1, 13), "day on Sunday")
@@ -499,18 +523,25 @@ class CalculationTests(unittest.TestCase):
self.assertEqual(_strptime._strptime_time(value, format)[:-1], expected)
check('2015 0 0', '%Y %U %w', 2014, 12, 28, 0, 0, 0, 6, -3)
check('2015 0 0', '%Y %W %w', 2015, 1, 4, 0, 0, 0, 6, 4)
+ check('2015 1 1', '%G %V %u', 2014, 12, 29, 0, 0, 0, 0, 363)
check('2015 0 1', '%Y %U %w', 2014, 12, 29, 0, 0, 0, 0, -2)
check('2015 0 1', '%Y %W %w', 2014, 12, 29, 0, 0, 0, 0, -2)
+ check('2015 1 2', '%G %V %u', 2014, 12, 30, 0, 0, 0, 1, 364)
check('2015 0 2', '%Y %U %w', 2014, 12, 30, 0, 0, 0, 1, -1)
check('2015 0 2', '%Y %W %w', 2014, 12, 30, 0, 0, 0, 1, -1)
+ check('2015 1 3', '%G %V %u', 2014, 12, 31, 0, 0, 0, 2, 365)
check('2015 0 3', '%Y %U %w', 2014, 12, 31, 0, 0, 0, 2, 0)
check('2015 0 3', '%Y %W %w', 2014, 12, 31, 0, 0, 0, 2, 0)
+ check('2015 1 4', '%G %V %u', 2015, 1, 1, 0, 0, 0, 3, 1)
check('2015 0 4', '%Y %U %w', 2015, 1, 1, 0, 0, 0, 3, 1)
check('2015 0 4', '%Y %W %w', 2015, 1, 1, 0, 0, 0, 3, 1)
+ check('2015 1 5', '%G %V %u', 2015, 1, 2, 0, 0, 0, 4, 2)
check('2015 0 5', '%Y %U %w', 2015, 1, 2, 0, 0, 0, 4, 2)
check('2015 0 5', '%Y %W %w', 2015, 1, 2, 0, 0, 0, 4, 2)
+ check('2015 1 6', '%G %V %u', 2015, 1, 3, 0, 0, 0, 5, 3)
check('2015 0 6', '%Y %U %w', 2015, 1, 3, 0, 0, 0, 5, 3)
check('2015 0 6', '%Y %W %w', 2015, 1, 3, 0, 0, 0, 5, 3)
+ check('2015 1 7', '%G %V %u', 2015, 1, 4, 0, 0, 0, 6, 4)
class CacheTests(unittest.TestCase):
diff --git a/Lib/test/test_symbol.py b/Lib/test/test_symbol.py
new file mode 100644
index 0000000000..2dcb9de8b0
--- /dev/null
+++ b/Lib/test/test_symbol.py
@@ -0,0 +1,55 @@
+import unittest
+from test import support
+import filecmp
+import os
+import sys
+import subprocess
+
+
+SYMBOL_FILE = support.findfile('symbol.py')
+GRAMMAR_FILE = os.path.join(os.path.dirname(__file__),
+ '..', '..', 'Include', 'graminit.h')
+TEST_PY_FILE = 'symbol_test.py'
+
+
+class TestSymbolGeneration(unittest.TestCase):
+
+ def _copy_file_without_generated_symbols(self, source_file, dest_file):
+ with open(source_file) as fp:
+ lines = fp.readlines()
+ with open(dest_file, 'w') as fp:
+ fp.writelines(lines[:lines.index("#--start constants--\n") + 1])
+ fp.writelines(lines[lines.index("#--end constants--\n"):])
+
+ def _generate_symbols(self, grammar_file, target_symbol_py_file):
+ proc = subprocess.Popen([sys.executable,
+ SYMBOL_FILE,
+ grammar_file,
+ target_symbol_py_file], stderr=subprocess.PIPE)
+ stderr = proc.communicate()[1]
+ return proc.returncode, stderr
+
+ def compare_files(self, file1, file2):
+ with open(file1) as fp:
+ lines1 = fp.readlines()
+ with open(file2) as fp:
+ lines2 = fp.readlines()
+ self.assertEqual(lines1, lines2)
+
+ @unittest.skipIf(not os.path.exists(GRAMMAR_FILE),
+ 'test only works from source build directory')
+ def test_real_grammar_and_symbol_file(self):
+ output = support.TESTFN
+ self.addCleanup(support.unlink, output)
+
+ self._copy_file_without_generated_symbols(SYMBOL_FILE, output)
+
+ exitcode, stderr = self._generate_symbols(GRAMMAR_FILE, output)
+ self.assertEqual(b'', stderr)
+ self.assertEqual(0, exitcode)
+
+ self.compare_files(SYMBOL_FILE, output)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/Lib/test/test_time.py b/Lib/test/test_time.py
index 76b894eece..f883c45d04 100644
--- a/Lib/test/test_time.py
+++ b/Lib/test/test_time.py
@@ -1,6 +1,8 @@
from test import support
+import decimal
import enum
import locale
+import math
import platform
import sys
import sysconfig
@@ -21,17 +23,27 @@ SIZEOF_INT = sysconfig.get_config_var('SIZEOF_INT') or 4
TIME_MAXYEAR = (1 << 8 * SIZEOF_INT - 1) - 1
TIME_MINYEAR = -TIME_MAXYEAR - 1
+SEC_TO_US = 10 ** 6
US_TO_NS = 10 ** 3
MS_TO_NS = 10 ** 6
SEC_TO_NS = 10 ** 9
+NS_TO_SEC = 10 ** 9
class _PyTime(enum.IntEnum):
# Round towards minus infinity (-inf)
ROUND_FLOOR = 0
# Round towards infinity (+inf)
ROUND_CEILING = 1
+ # Round to nearest with ties going to nearest even integer
+ ROUND_HALF_EVEN = 2
-ALL_ROUNDING_METHODS = (_PyTime.ROUND_FLOOR, _PyTime.ROUND_CEILING)
+# Rounding modes supported by PyTime
+ROUNDING_MODES = (
+ # (PyTime rounding method, decimal rounding method)
+ (_PyTime.ROUND_FLOOR, decimal.ROUND_FLOOR),
+ (_PyTime.ROUND_CEILING, decimal.ROUND_CEILING),
+ (_PyTime.ROUND_HALF_EVEN, decimal.ROUND_HALF_EVEN),
+)
class TimeTestCase(unittest.TestCase):
@@ -607,79 +619,6 @@ class TestStrftime4dyear(_TestStrftimeYear, _Test4dYear, unittest.TestCase):
class TestPytime(unittest.TestCase):
- def setUp(self):
- self.invalid_values = (
- -(2 ** 100), 2 ** 100,
- -(2.0 ** 100.0), 2.0 ** 100.0,
- )
-
- @support.cpython_only
- def test_time_t(self):
- from _testcapi import pytime_object_to_time_t
- for obj, time_t, rnd in (
- # Round towards minus infinity (-inf)
- (0, 0, _PyTime.ROUND_FLOOR),
- (-1, -1, _PyTime.ROUND_FLOOR),
- (-1.0, -1, _PyTime.ROUND_FLOOR),
- (-1.9, -2, _PyTime.ROUND_FLOOR),
- (1.0, 1, _PyTime.ROUND_FLOOR),
- (1.9, 1, _PyTime.ROUND_FLOOR),
- # Round towards infinity (+inf)
- (0, 0, _PyTime.ROUND_CEILING),
- (-1, -1, _PyTime.ROUND_CEILING),
- (-1.0, -1, _PyTime.ROUND_CEILING),
- (-1.9, -1, _PyTime.ROUND_CEILING),
- (1.0, 1, _PyTime.ROUND_CEILING),
- (1.9, 2, _PyTime.ROUND_CEILING),
- ):
- self.assertEqual(pytime_object_to_time_t(obj, rnd), time_t)
-
- rnd = _PyTime.ROUND_FLOOR
- for invalid in self.invalid_values:
- self.assertRaises(OverflowError,
- pytime_object_to_time_t, invalid, rnd)
-
- @support.cpython_only
- def test_timespec(self):
- from _testcapi import pytime_object_to_timespec
- for obj, timespec, rnd in (
- # Round towards minus infinity (-inf)
- (0, (0, 0), _PyTime.ROUND_FLOOR),
- (-1, (-1, 0), _PyTime.ROUND_FLOOR),
- (-1.0, (-1, 0), _PyTime.ROUND_FLOOR),
- (1e-9, (0, 1), _PyTime.ROUND_FLOOR),
- (1e-10, (0, 0), _PyTime.ROUND_FLOOR),
- (-1e-9, (-1, 999999999), _PyTime.ROUND_FLOOR),
- (-1e-10, (-1, 999999999), _PyTime.ROUND_FLOOR),
- (-1.2, (-2, 800000000), _PyTime.ROUND_FLOOR),
- (0.9999999999, (0, 999999999), _PyTime.ROUND_FLOOR),
- (1.1234567890, (1, 123456789), _PyTime.ROUND_FLOOR),
- (1.1234567899, (1, 123456789), _PyTime.ROUND_FLOOR),
- (-1.1234567890, (-2, 876543211), _PyTime.ROUND_FLOOR),
- (-1.1234567891, (-2, 876543210), _PyTime.ROUND_FLOOR),
- # Round towards infinity (+inf)
- (0, (0, 0), _PyTime.ROUND_CEILING),
- (-1, (-1, 0), _PyTime.ROUND_CEILING),
- (-1.0, (-1, 0), _PyTime.ROUND_CEILING),
- (1e-9, (0, 1), _PyTime.ROUND_CEILING),
- (1e-10, (0, 1), _PyTime.ROUND_CEILING),
- (-1e-9, (-1, 999999999), _PyTime.ROUND_CEILING),
- (-1e-10, (0, 0), _PyTime.ROUND_CEILING),
- (-1.2, (-2, 800000000), _PyTime.ROUND_CEILING),
- (0.9999999999, (1, 0), _PyTime.ROUND_CEILING),
- (1.1234567890, (1, 123456790), _PyTime.ROUND_CEILING),
- (1.1234567899, (1, 123456790), _PyTime.ROUND_CEILING),
- (-1.1234567890, (-2, 876543211), _PyTime.ROUND_CEILING),
- (-1.1234567891, (-2, 876543211), _PyTime.ROUND_CEILING),
- ):
- with self.subTest(obj=obj, round=rnd, timespec=timespec):
- self.assertEqual(pytime_object_to_timespec(obj, rnd), timespec)
-
- rnd = _PyTime.ROUND_FLOOR
- for invalid in self.invalid_values:
- self.assertRaises(OverflowError,
- pytime_object_to_timespec, invalid, rnd)
-
@unittest.skipUnless(time._STRUCT_TM_ITEMS == 11, "needs tm_zone support")
def test_localtime_timezone(self):
@@ -734,266 +673,291 @@ class TestPytime(unittest.TestCase):
self.assertIs(lt.tm_zone, None)
-@unittest.skipUnless(_testcapi is not None,
- 'need the _testcapi module')
-class TestPyTime_t(unittest.TestCase):
+@unittest.skipIf(_testcapi is None, 'need the _testcapi module')
+class CPyTimeTestCase:
+ """
+ Base class to test the C _PyTime_t API.
+ """
+ OVERFLOW_SECONDS = None
+
+ def setUp(self):
+ from _testcapi import SIZEOF_TIME_T
+ bits = SIZEOF_TIME_T * 8 - 1
+ self.time_t_min = -2 ** bits
+ self.time_t_max = 2 ** bits - 1
+
+ def time_t_filter(self, seconds):
+ return (self.time_t_min <= seconds <= self.time_t_max)
+
+ def _rounding_values(self, use_float):
+ "Build timestamps used to test rounding."
+
+ units = [1, US_TO_NS, MS_TO_NS, SEC_TO_NS]
+ if use_float:
+ # picoseconds are only tested to pytime_converter accepting floats
+ units.append(1e-3)
+
+ values = (
+ # small values
+ 1, 2, 5, 7, 123, 456, 1234,
+ # 10^k - 1
+ 9,
+ 99,
+ 999,
+ 9999,
+ 99999,
+ 999999,
+ # test half even rounding near 0.5, 1.5, 2.5, 3.5, 4.5
+ 499, 500, 501,
+ 1499, 1500, 1501,
+ 2500,
+ 3500,
+ 4500,
+ )
+
+ ns_timestamps = [0]
+ for unit in units:
+ for value in values:
+ ns = value * unit
+ ns_timestamps.extend((-ns, ns))
+ for pow2 in (0, 5, 10, 15, 22, 23, 24, 30, 33):
+ ns = (2 ** pow2) * SEC_TO_NS
+ ns_timestamps.extend((
+ -ns-1, -ns, -ns+1,
+ ns-1, ns, ns+1
+ ))
+ for seconds in (_testcapi.INT_MIN, _testcapi.INT_MAX):
+ ns_timestamps.append(seconds * SEC_TO_NS)
+ if use_float:
+ # numbers with an extract representation in IEEE 754 (base 2)
+ for pow2 in (3, 7, 10, 15):
+ ns = 2.0 ** (-pow2)
+ ns_timestamps.extend((-ns, ns))
+
+ # seconds close to _PyTime_t type limit
+ ns = (2 ** 63 // SEC_TO_NS) * SEC_TO_NS
+ ns_timestamps.extend((-ns, ns))
+
+ return ns_timestamps
+
+ def _check_rounding(self, pytime_converter, expected_func,
+ use_float, unit_to_sec, value_filter=None):
+
+ def convert_values(ns_timestamps):
+ if use_float:
+ unit_to_ns = SEC_TO_NS / float(unit_to_sec)
+ values = [ns / unit_to_ns for ns in ns_timestamps]
+ else:
+ unit_to_ns = SEC_TO_NS // unit_to_sec
+ values = [ns // unit_to_ns for ns in ns_timestamps]
+
+ if value_filter:
+ values = filter(value_filter, values)
+
+ # remove duplicates and sort
+ return sorted(set(values))
+
+ # test rounding
+ ns_timestamps = self._rounding_values(use_float)
+ valid_values = convert_values(ns_timestamps)
+ for time_rnd, decimal_rnd in ROUNDING_MODES :
+ context = decimal.getcontext()
+ context.rounding = decimal_rnd
+
+ for value in valid_values:
+ debug_info = {'value': value, 'rounding': decimal_rnd}
+ try:
+ result = pytime_converter(value, time_rnd)
+ expected = expected_func(value)
+ except Exception as exc:
+ self.fail("Error on timestamp conversion: %s" % debug_info)
+ self.assertEqual(result,
+ expected,
+ debug_info)
+
+ # test overflow
+ ns = self.OVERFLOW_SECONDS * SEC_TO_NS
+ ns_timestamps = (-ns, ns)
+ overflow_values = convert_values(ns_timestamps)
+ for time_rnd, _ in ROUNDING_MODES :
+ for value in overflow_values:
+ debug_info = {'value': value, 'rounding': time_rnd}
+ with self.assertRaises(OverflowError, msg=debug_info):
+ pytime_converter(value, time_rnd)
+
+ def check_int_rounding(self, pytime_converter, expected_func,
+ unit_to_sec=1, value_filter=None):
+ self._check_rounding(pytime_converter, expected_func,
+ False, unit_to_sec, value_filter)
+
+ def check_float_rounding(self, pytime_converter, expected_func,
+ unit_to_sec=1, value_filter=None):
+ self._check_rounding(pytime_converter, expected_func,
+ True, unit_to_sec, value_filter)
+
+ def decimal_round(self, x):
+ d = decimal.Decimal(x)
+ d = d.quantize(1)
+ return int(d)
+
+
+class TestCPyTime(CPyTimeTestCase, unittest.TestCase):
+ """
+ Test the C _PyTime_t API.
+ """
+ # _PyTime_t is a 64-bit signed integer
+ OVERFLOW_SECONDS = math.ceil((2**63 + 1) / SEC_TO_NS)
+
def test_FromSeconds(self):
from _testcapi import PyTime_FromSeconds
- for seconds in (0, 3, -456, _testcapi.INT_MAX, _testcapi.INT_MIN):
- with self.subTest(seconds=seconds):
- self.assertEqual(PyTime_FromSeconds(seconds),
- seconds * SEC_TO_NS)
+
+ # PyTime_FromSeconds() expects a C int, reject values out of range
+ def c_int_filter(secs):
+ return (_testcapi.INT_MIN <= secs <= _testcapi.INT_MAX)
+
+ self.check_int_rounding(lambda secs, rnd: PyTime_FromSeconds(secs),
+ lambda secs: secs * SEC_TO_NS,
+ value_filter=c_int_filter)
def test_FromSecondsObject(self):
from _testcapi import PyTime_FromSecondsObject
- # Conversion giving the same result for all rounding methods
- for rnd in ALL_ROUNDING_METHODS:
- for obj, ts in (
- # integers
- (0, 0),
- (1, SEC_TO_NS),
- (-3, -3 * SEC_TO_NS),
-
- # float: subseconds
- (0.0, 0),
- (1e-9, 1),
- (1e-6, 10 ** 3),
- (1e-3, 10 ** 6),
-
- # float: seconds
- (2.0, 2 * SEC_TO_NS),
- (123.0, 123 * SEC_TO_NS),
- (-7.0, -7 * SEC_TO_NS),
-
- # nanosecond are kept for value <= 2^23 seconds
- (2**22 - 1e-9, 4194303999999999),
- (2**22, 4194304000000000),
- (2**22 + 1e-9, 4194304000000001),
- (2**23 - 1e-9, 8388607999999999),
- (2**23, 8388608000000000),
-
- # start losing precision for value > 2^23 seconds
- (2**23 + 1e-9, 8388608000000002),
-
- # nanoseconds are lost for value > 2^23 seconds
- (2**24 - 1e-9, 16777215999999998),
- (2**24, 16777216000000000),
- (2**24 + 1e-9, 16777216000000000),
- (2**25 - 1e-9, 33554432000000000),
- (2**25 , 33554432000000000),
- (2**25 + 1e-9, 33554432000000000),
-
- # close to 2^63 nanoseconds (_PyTime_t limit)
- (9223372036, 9223372036 * SEC_TO_NS),
- (9223372036.0, 9223372036 * SEC_TO_NS),
- (-9223372036, -9223372036 * SEC_TO_NS),
- (-9223372036.0, -9223372036 * SEC_TO_NS),
- ):
- with self.subTest(obj=obj, round=rnd, timestamp=ts):
- self.assertEqual(PyTime_FromSecondsObject(obj, rnd), ts)
-
- with self.subTest(round=rnd):
- with self.assertRaises(OverflowError):
- PyTime_FromSecondsObject(9223372037, rnd)
- PyTime_FromSecondsObject(9223372037.0, rnd)
- PyTime_FromSecondsObject(-9223372037, rnd)
- PyTime_FromSecondsObject(-9223372037.0, rnd)
-
- # Conversion giving different results depending on the rounding method
- FLOOR = _PyTime.ROUND_FLOOR
- CEILING = _PyTime.ROUND_CEILING
- for obj, ts, rnd in (
- # close to zero
- ( 1e-10, 0, FLOOR),
- ( 1e-10, 1, CEILING),
- (-1e-10, -1, FLOOR),
- (-1e-10, 0, CEILING),
-
- # test rounding of the last nanosecond
- ( 1.1234567899, 1123456789, FLOOR),
- ( 1.1234567899, 1123456790, CEILING),
- (-1.1234567899, -1123456790, FLOOR),
- (-1.1234567899, -1123456789, CEILING),
-
- # close to 1 second
- ( 0.9999999999, 999999999, FLOOR),
- ( 0.9999999999, 1000000000, CEILING),
- (-0.9999999999, -1000000000, FLOOR),
- (-0.9999999999, -999999999, CEILING),
- ):
- with self.subTest(obj=obj, round=rnd, timestamp=ts):
- self.assertEqual(PyTime_FromSecondsObject(obj, rnd), ts)
+ self.check_int_rounding(
+ PyTime_FromSecondsObject,
+ lambda secs: secs * SEC_TO_NS)
+
+ self.check_float_rounding(
+ PyTime_FromSecondsObject,
+ lambda ns: self.decimal_round(ns * SEC_TO_NS))
def test_AsSecondsDouble(self):
from _testcapi import PyTime_AsSecondsDouble
- for nanoseconds, seconds in (
- # near 1 nanosecond
- ( 0, 0.0),
- ( 1, 1e-9),
- (-1, -1e-9),
-
- # near 1 second
- (SEC_TO_NS + 1, 1.0 + 1e-9),
- (SEC_TO_NS, 1.0),
- (SEC_TO_NS - 1, 1.0 - 1e-9),
-
- # a few seconds
- (123 * SEC_TO_NS, 123.0),
- (-567 * SEC_TO_NS, -567.0),
-
- # nanosecond are kept for value <= 2^23 seconds
- (4194303999999999, 2**22 - 1e-9),
- (4194304000000000, 2**22),
- (4194304000000001, 2**22 + 1e-9),
-
- # start losing precision for value > 2^23 seconds
- (8388608000000002, 2**23 + 1e-9),
-
- # nanoseconds are lost for value > 2^23 seconds
- (16777215999999998, 2**24 - 1e-9),
- (16777215999999999, 2**24 - 1e-9),
- (16777216000000000, 2**24 ),
- (16777216000000001, 2**24 ),
- (16777216000000002, 2**24 + 2e-9),
-
- (33554432000000000, 2**25 ),
- (33554432000000002, 2**25 ),
- (33554432000000004, 2**25 + 4e-9),
-
- # close to 2^63 nanoseconds (_PyTime_t limit)
- (9223372036 * SEC_TO_NS, 9223372036.0),
- (-9223372036 * SEC_TO_NS, -9223372036.0),
- ):
- with self.subTest(nanoseconds=nanoseconds, seconds=seconds):
- self.assertEqual(PyTime_AsSecondsDouble(nanoseconds),
- seconds)
-
- def test_timeval(self):
+ def float_converter(ns):
+ if abs(ns) % SEC_TO_NS == 0:
+ return float(ns // SEC_TO_NS)
+ else:
+ return float(ns) / SEC_TO_NS
+
+ self.check_int_rounding(lambda ns, rnd: PyTime_AsSecondsDouble(ns),
+ float_converter,
+ NS_TO_SEC)
+
+ def create_decimal_converter(self, denominator):
+ denom = decimal.Decimal(denominator)
+
+ def converter(value):
+ d = decimal.Decimal(value) / denom
+ return self.decimal_round(d)
+
+ return converter
+
+ def test_AsTimeval(self):
from _testcapi import PyTime_AsTimeval
- for rnd in ALL_ROUNDING_METHODS:
- for ns, tv in (
- # microseconds
- (0, (0, 0)),
- (1000, (0, 1)),
- (-1000, (-1, 999999)),
-
- # seconds
- (2 * SEC_TO_NS, (2, 0)),
- (-3 * SEC_TO_NS, (-3, 0)),
- ):
- with self.subTest(nanoseconds=ns, timeval=tv, round=rnd):
- self.assertEqual(PyTime_AsTimeval(ns, rnd), tv)
-
- FLOOR = _PyTime.ROUND_FLOOR
- CEILING = _PyTime.ROUND_CEILING
- for ns, tv, rnd in (
- # nanoseconds
- (1, (0, 0), FLOOR),
- (1, (0, 1), CEILING),
- (-1, (-1, 999999), FLOOR),
- (-1, (0, 0), CEILING),
-
- # seconds + nanoseconds
- (1234567001, (1, 234567), FLOOR),
- (1234567001, (1, 234568), CEILING),
- (-1234567001, (-2, 765432), FLOOR),
- (-1234567001, (-2, 765433), CEILING),
- ):
- with self.subTest(nanoseconds=ns, timeval=tv, round=rnd):
- self.assertEqual(PyTime_AsTimeval(ns, rnd), tv)
+
+ us_converter = self.create_decimal_converter(US_TO_NS)
+
+ def timeval_converter(ns):
+ us = us_converter(ns)
+ return divmod(us, SEC_TO_US)
+
+ if sys.platform == 'win32':
+ from _testcapi import LONG_MIN, LONG_MAX
+
+ # On Windows, timeval.tv_sec type is a C long
+ def seconds_filter(secs):
+ return LONG_MIN <= secs <= LONG_MAX
+ else:
+ seconds_filter = self.time_t_filter
+
+ self.check_int_rounding(PyTime_AsTimeval,
+ timeval_converter,
+ NS_TO_SEC,
+ value_filter=seconds_filter)
@unittest.skipUnless(hasattr(_testcapi, 'PyTime_AsTimespec'),
'need _testcapi.PyTime_AsTimespec')
- def test_timespec(self):
+ def test_AsTimespec(self):
from _testcapi import PyTime_AsTimespec
- for ns, ts in (
- # nanoseconds
- (0, (0, 0)),
- (1, (0, 1)),
- (-1, (-1, 999999999)),
-
- # seconds
- (2 * SEC_TO_NS, (2, 0)),
- (-3 * SEC_TO_NS, (-3, 0)),
-
- # seconds + nanoseconds
- (1234567890, (1, 234567890)),
- (-1234567890, (-2, 765432110)),
- ):
- with self.subTest(nanoseconds=ns, timespec=ts):
- self.assertEqual(PyTime_AsTimespec(ns), ts)
-
- def test_milliseconds(self):
+
+ def timespec_converter(ns):
+ return divmod(ns, SEC_TO_NS)
+
+ self.check_int_rounding(lambda ns, rnd: PyTime_AsTimespec(ns),
+ timespec_converter,
+ NS_TO_SEC,
+ value_filter=self.time_t_filter)
+
+ def test_AsMilliseconds(self):
from _testcapi import PyTime_AsMilliseconds
- for rnd in ALL_ROUNDING_METHODS:
- for ns, tv in (
- # milliseconds
- (1 * MS_TO_NS, 1),
- (-2 * MS_TO_NS, -2),
-
- # seconds
- (2 * SEC_TO_NS, 2000),
- (-3 * SEC_TO_NS, -3000),
- ):
- with self.subTest(nanoseconds=ns, timeval=tv, round=rnd):
- self.assertEqual(PyTime_AsMilliseconds(ns, rnd), tv)
-
- FLOOR = _PyTime.ROUND_FLOOR
- CEILING = _PyTime.ROUND_CEILING
- for ns, ms, rnd in (
- # nanoseconds
- (1, 0, FLOOR),
- (1, 1, CEILING),
- (-1, -1, FLOOR),
- (-1, 0, CEILING),
-
- # seconds + nanoseconds
- (1234 * MS_TO_NS + 1, 1234, FLOOR),
- (1234 * MS_TO_NS + 1, 1235, CEILING),
- (-1234 * MS_TO_NS - 1, -1235, FLOOR),
- (-1234 * MS_TO_NS - 1, -1234, CEILING),
- ):
- with self.subTest(nanoseconds=ns, milliseconds=ms, round=rnd):
- self.assertEqual(PyTime_AsMilliseconds(ns, rnd), ms)
-
- def test_microseconds(self):
+
+ self.check_int_rounding(PyTime_AsMilliseconds,
+ self.create_decimal_converter(MS_TO_NS),
+ NS_TO_SEC)
+
+ def test_AsMicroseconds(self):
from _testcapi import PyTime_AsMicroseconds
- for rnd in ALL_ROUNDING_METHODS:
- for ns, tv in (
- # microseconds
- (1 * US_TO_NS, 1),
- (-2 * US_TO_NS, -2),
-
- # milliseconds
- (1 * MS_TO_NS, 1000),
- (-2 * MS_TO_NS, -2000),
-
- # seconds
- (2 * SEC_TO_NS, 2000000),
- (-3 * SEC_TO_NS, -3000000),
- ):
- with self.subTest(nanoseconds=ns, timeval=tv, round=rnd):
- self.assertEqual(PyTime_AsMicroseconds(ns, rnd), tv)
-
- FLOOR = _PyTime.ROUND_FLOOR
- CEILING = _PyTime.ROUND_CEILING
- for ns, ms, rnd in (
- # nanoseconds
- (1, 0, FLOOR),
- (1, 1, CEILING),
- (-1, -1, FLOOR),
- (-1, 0, CEILING),
-
- # seconds + nanoseconds
- (1234 * US_TO_NS + 1, 1234, FLOOR),
- (1234 * US_TO_NS + 1, 1235, CEILING),
- (-1234 * US_TO_NS - 1, -1235, FLOOR),
- (-1234 * US_TO_NS - 1, -1234, CEILING),
- ):
- with self.subTest(nanoseconds=ns, milliseconds=ms, round=rnd):
- self.assertEqual(PyTime_AsMicroseconds(ns, rnd), ms)
+
+ self.check_int_rounding(PyTime_AsMicroseconds,
+ self.create_decimal_converter(US_TO_NS),
+ NS_TO_SEC)
+
+
+class TestOldPyTime(CPyTimeTestCase, unittest.TestCase):
+ """
+ Test the old C _PyTime_t API: _PyTime_ObjectToXXX() functions.
+ """
+
+ # time_t is a 32-bit or 64-bit signed integer
+ OVERFLOW_SECONDS = 2 ** 64
+
+ def test_object_to_time_t(self):
+ from _testcapi import pytime_object_to_time_t
+
+ self.check_int_rounding(pytime_object_to_time_t,
+ lambda secs: secs,
+ value_filter=self.time_t_filter)
+
+ self.check_float_rounding(pytime_object_to_time_t,
+ self.decimal_round,
+ value_filter=self.time_t_filter)
+
+ def create_converter(self, sec_to_unit):
+ def converter(secs):
+ floatpart, intpart = math.modf(secs)
+ intpart = int(intpart)
+ floatpart *= sec_to_unit
+ floatpart = self.decimal_round(floatpart)
+ if floatpart < 0:
+ floatpart += sec_to_unit
+ intpart -= 1
+ elif floatpart >= sec_to_unit:
+ floatpart -= sec_to_unit
+ intpart += 1
+ return (intpart, floatpart)
+ return converter
+
+ def test_object_to_timeval(self):
+ from _testcapi import pytime_object_to_timeval
+
+ self.check_int_rounding(pytime_object_to_timeval,
+ lambda secs: (secs, 0),
+ value_filter=self.time_t_filter)
+
+ self.check_float_rounding(pytime_object_to_timeval,
+ self.create_converter(SEC_TO_US),
+ value_filter=self.time_t_filter)
+
+ def test_object_to_timespec(self):
+ from _testcapi import pytime_object_to_timespec
+
+ self.check_int_rounding(pytime_object_to_timespec,
+ lambda secs: (secs, 0),
+ value_filter=self.time_t_filter)
+
+ self.check_float_rounding(pytime_object_to_timespec,
+ self.create_converter(SEC_TO_NS),
+ value_filter=self.time_t_filter)
if __name__ == "__main__":
diff --git a/Lib/test/test_tokenize.py b/Lib/test/test_tokenize.py
index 3b17ca6329..90438e7d30 100644
--- a/Lib/test/test_tokenize.py
+++ b/Lib/test/test_tokenize.py
@@ -24,8 +24,7 @@ class TokenizeTest(TestCase):
if type == ENDMARKER:
break
type = tok_name[type]
- result.append(" %(type)-10.10s %(token)-13.13r %(start)s %(end)s" %
- locals())
+ result.append(f" {type:10} {token!r:13} {start} {end}")
self.assertEqual(result,
[" ENCODING 'utf-8' (0, 0) (0, 0)"] +
expected.rstrip().splitlines())
@@ -132,18 +131,18 @@ def k(x):
self.check_tokenize("x = 0xfffffffffff", """\
NAME 'x' (1, 0) (1, 1)
OP '=' (1, 2) (1, 3)
- NUMBER '0xffffffffff (1, 4) (1, 17)
+ NUMBER '0xfffffffffff' (1, 4) (1, 17)
""")
self.check_tokenize("x = 123141242151251616110", """\
NAME 'x' (1, 0) (1, 1)
OP '=' (1, 2) (1, 3)
- NUMBER '123141242151 (1, 4) (1, 25)
+ NUMBER '123141242151251616110' (1, 4) (1, 25)
""")
self.check_tokenize("x = -15921590215012591", """\
NAME 'x' (1, 0) (1, 1)
OP '=' (1, 2) (1, 3)
OP '-' (1, 4) (1, 5)
- NUMBER '159215902150 (1, 5) (1, 22)
+ NUMBER '15921590215012591' (1, 5) (1, 22)
""")
def test_float(self):
@@ -307,6 +306,50 @@ def k(x):
OP '+' (1, 28) (1, 29)
STRING 'RB"abc"' (1, 30) (1, 37)
""")
+ # Check 0, 1, and 2 character string prefixes.
+ self.check_tokenize(r'"a\
+de\
+fg"', """\
+ STRING '"a\\\\\\nde\\\\\\nfg"\' (1, 0) (3, 3)
+ """)
+ self.check_tokenize(r'u"a\
+de"', """\
+ STRING 'u"a\\\\\\nde"\' (1, 0) (2, 3)
+ """)
+ self.check_tokenize(r'rb"a\
+d"', """\
+ STRING 'rb"a\\\\\\nd"\' (1, 0) (2, 2)
+ """)
+ self.check_tokenize(r'"""a\
+b"""', """\
+ STRING '\"\""a\\\\\\nb\"\""' (1, 0) (2, 4)
+ """)
+ self.check_tokenize(r'u"""a\
+b"""', """\
+ STRING 'u\"\""a\\\\\\nb\"\""' (1, 0) (2, 4)
+ """)
+ self.check_tokenize(r'rb"""a\
+b\
+c"""', """\
+ STRING 'rb"\""a\\\\\\nb\\\\\\nc"\""' (1, 0) (3, 4)
+ """)
+ self.check_tokenize('f"abc"', """\
+ STRING 'f"abc"' (1, 0) (1, 6)
+ """)
+ self.check_tokenize('fR"a{b}c"', """\
+ STRING 'fR"a{b}c"' (1, 0) (1, 9)
+ """)
+ self.check_tokenize('f"""abc"""', """\
+ STRING 'f\"\"\"abc\"\"\"' (1, 0) (1, 10)
+ """)
+ self.check_tokenize(r'f"abc\
+def"', """\
+ STRING 'f"abc\\\\\\ndef"' (1, 0) (2, 4)
+ """)
+ self.check_tokenize(r'Rf"abc\
+def"', """\
+ STRING 'Rf"abc\\\\\\ndef"' (1, 0) (2, 4)
+ """)
def test_function(self):
self.check_tokenize("def d22(a, b, c=2, d=2, *k): pass", """\
@@ -505,7 +548,7 @@ def k(x):
# Methods
self.check_tokenize("@staticmethod\ndef foo(x,y): pass", """\
OP '@' (1, 0) (1, 1)
- NAME 'staticmethod (1, 1) (1, 13)
+ NAME 'staticmethod' (1, 1) (1, 13)
NEWLINE '\\n' (1, 13) (1, 14)
NAME 'def' (2, 0) (2, 3)
NAME 'foo' (2, 4) (2, 7)
diff --git a/Lib/test/test_tools/test_unparse.py b/Lib/test/test_tools/test_unparse.py
index 976a6c59ae..4b47916636 100644
--- a/Lib/test/test_tools/test_unparse.py
+++ b/Lib/test/test_tools/test_unparse.py
@@ -134,6 +134,15 @@ class ASTTestCase(unittest.TestCase):
class UnparseTestCase(ASTTestCase):
# Tests for specific bugs found in earlier versions of unparse
+ def test_fstrings(self):
+ # See issue 25180
+ self.check_roundtrip(r"""f'{f"{0}"*3}'""")
+ self.check_roundtrip(r"""f'{f"{y}"*3}'""")
+ self.check_roundtrip(r"""f'{f"{\'x\'}"*3}'""")
+
+ self.check_roundtrip(r'''f"{r'x' f'{\"s\"}'}"''')
+ self.check_roundtrip(r'''f"{r'x'rf'{\"s\"}'}"''')
+
def test_del_statement(self):
self.check_roundtrip("del x, y, z")
diff --git a/Lib/test/test_urlparse.py b/Lib/test/test_urlparse.py
index 0552f90594..fcf508259e 100644
--- a/Lib/test/test_urlparse.py
+++ b/Lib/test/test_urlparse.py
@@ -554,29 +554,27 @@ class UrlParseTestCase(unittest.TestCase):
self.assertEqual(p.port, 80)
self.assertEqual(p.geturl(), url)
- # Verify an illegal port is returned as None
+ # Verify an illegal port raises ValueError
url = b"HTTP://WWW.PYTHON.ORG:65536/doc/#frag"
p = urllib.parse.urlsplit(url)
- self.assertEqual(p.port, None)
+ with self.assertRaisesRegex(ValueError, "out of range"):
+ p.port
def test_attributes_bad_port(self):
- """Check handling of non-integer ports."""
- p = urllib.parse.urlsplit("http://www.example.net:foo")
- self.assertEqual(p.netloc, "www.example.net:foo")
- self.assertRaises(ValueError, lambda: p.port)
-
- p = urllib.parse.urlparse("http://www.example.net:foo")
- self.assertEqual(p.netloc, "www.example.net:foo")
- self.assertRaises(ValueError, lambda: p.port)
-
- # Once again, repeat ourselves to test bytes
- p = urllib.parse.urlsplit(b"http://www.example.net:foo")
- self.assertEqual(p.netloc, b"www.example.net:foo")
- self.assertRaises(ValueError, lambda: p.port)
-
- p = urllib.parse.urlparse(b"http://www.example.net:foo")
- self.assertEqual(p.netloc, b"www.example.net:foo")
- self.assertRaises(ValueError, lambda: p.port)
+ """Check handling of invalid ports."""
+ for bytes in (False, True):
+ for parse in (urllib.parse.urlsplit, urllib.parse.urlparse):
+ for port in ("foo", "1.5", "-1", "0x10"):
+ with self.subTest(bytes=bytes, parse=parse, port=port):
+ netloc = "www.example.net:" + port
+ url = "http://" + netloc
+ if bytes:
+ netloc = netloc.encode("ascii")
+ url = url.encode("ascii")
+ p = parse(url)
+ self.assertEqual(p.netloc, netloc)
+ with self.assertRaises(ValueError):
+ p.port
def test_attributes_without_netloc(self):
# This example is straight from RFC 3261. It looks like it
diff --git a/Lib/test/test_userdict.py b/Lib/test/test_userdict.py
index 12889438a9..b30418f465 100644
--- a/Lib/test/test_userdict.py
+++ b/Lib/test/test_userdict.py
@@ -29,7 +29,7 @@ class UserDictTest(mapping_tests.TestHashMappingProtocol):
self.assertEqual(collections.UserDict(one=1, two=2), d2)
# item sequence constructor
self.assertEqual(collections.UserDict([('one',1), ('two',2)]), d2)
- with self.assertWarnsRegex(PendingDeprecationWarning, "'dict'"):
+ with self.assertWarnsRegex(DeprecationWarning, "'dict'"):
self.assertEqual(collections.UserDict(dict=[('one',1), ('two',2)]), d2)
# both together
self.assertEqual(collections.UserDict([('one',1), ('two',2)], two=3, three=5), d3)
@@ -148,7 +148,7 @@ class UserDictTest(mapping_tests.TestHashMappingProtocol):
[('dict', 42)])
self.assertEqual(list(collections.UserDict({}, dict=None).items()),
[('dict', None)])
- with self.assertWarnsRegex(PendingDeprecationWarning, "'dict'"):
+ with self.assertWarnsRegex(DeprecationWarning, "'dict'"):
self.assertEqual(list(collections.UserDict(dict={'a': 42}).items()),
[('a', 42)])
self.assertRaises(TypeError, collections.UserDict, 42)
diff --git a/Lib/test/test_warnings/data/import_warning.py b/Lib/test/test_warnings/data/import_warning.py
index d6ea2ce104..32daec1140 100644
--- a/Lib/test/test_warnings/data/import_warning.py
+++ b/Lib/test/test_warnings/data/import_warning.py
@@ -1,3 +1,3 @@
import warnings
-warnings.warn('module-level warning', DeprecationWarning, stacklevel=2) \ No newline at end of file
+warnings.warn('module-level warning', DeprecationWarning, stacklevel=2)
diff --git a/Lib/test/test_zipimport.py b/Lib/test/test_zipimport.py
index a97a7784bd..4f1953515d 100644
--- a/Lib/test/test_zipimport.py
+++ b/Lib/test/test_zipimport.py
@@ -214,7 +214,8 @@ class UncompressedZipImportTestCase(ImportHooksBaseTestCase):
packdir2 = packdir + TESTPACK2 + os.sep
files = {packdir + "__init__" + pyc_ext: (NOW, test_pyc),
packdir2 + "__init__" + pyc_ext: (NOW, test_pyc),
- packdir2 + TESTMOD + pyc_ext: (NOW, test_pyc)}
+ packdir2 + TESTMOD + pyc_ext: (NOW, test_pyc),
+ "spam" + pyc_ext: (NOW, test_pyc)}
z = ZipFile(TEMP_ZIP, "w")
try:
@@ -228,6 +229,14 @@ class UncompressedZipImportTestCase(ImportHooksBaseTestCase):
zi = zipimport.zipimporter(TEMP_ZIP)
self.assertEqual(zi.archive, TEMP_ZIP)
self.assertEqual(zi.is_package(TESTPACK), True)
+
+ find_mod = zi.find_module('spam')
+ self.assertIsNotNone(find_mod)
+ self.assertIsInstance(find_mod, zipimport.zipimporter)
+ self.assertFalse(find_mod.is_package('spam'))
+ load_mod = find_mod.load_module('spam')
+ self.assertEqual(find_mod.get_filename('spam'), load_mod.__file__)
+
mod = zi.load_module(TESTPACK)
self.assertEqual(zi.get_filename(TESTPACK), mod.__file__)
@@ -287,6 +296,16 @@ class UncompressedZipImportTestCase(ImportHooksBaseTestCase):
self.assertEqual(
zi.is_package(TESTPACK2 + os.sep + TESTMOD), False)
+ pkg_path = TEMP_ZIP + os.sep + packdir + TESTPACK2
+ zi2 = zipimport.zipimporter(pkg_path)
+ find_mod_dotted = zi2.find_module(TESTMOD)
+ self.assertIsNotNone(find_mod_dotted)
+ self.assertIsInstance(find_mod_dotted, zipimport.zipimporter)
+ self.assertFalse(zi2.is_package(TESTMOD))
+ load_mod = find_mod_dotted.load_module(TESTMOD)
+ self.assertEqual(
+ find_mod_dotted.get_filename(TESTMOD), load_mod.__file__)
+
mod_path = TESTPACK2 + os.sep + TESTMOD
mod_name = module_path_to_dotted_name(mod_path)
__import__(mod_name)