diff options
Diffstat (limited to 'Lib/test')
84 files changed, 2322 insertions, 943 deletions
diff --git a/Lib/test/datetimetester.py b/Lib/test/datetimetester.py index 3f3c60acea..b8c6138e9c 100644 --- a/Lib/test/datetimetester.py +++ b/Lib/test/datetimetester.py @@ -2270,13 +2270,14 @@ class TestTime(HarmlessMixedComparison, unittest.TestCase): self.assertEqual(orig, derived) def test_bool(self): + # time is always True. cls = self.theclass self.assertTrue(cls(1)) self.assertTrue(cls(0, 1)) self.assertTrue(cls(0, 0, 1)) self.assertTrue(cls(0, 0, 0, 1)) - self.assertFalse(cls(0)) - self.assertFalse(cls()) + self.assertTrue(cls(0)) + self.assertTrue(cls()) def test_replace(self): cls = self.theclass @@ -2629,7 +2630,7 @@ class TestTimeTZ(TestTime, TZInfoBase, unittest.TestCase): self.assertEqual(derived.tzname(), 'cookie') def test_more_bool(self): - # Test cases with non-None tzinfo. + # time is always True. cls = self.theclass t = cls(0, tzinfo=FixedOffset(-300, "")) @@ -2639,23 +2640,11 @@ class TestTimeTZ(TestTime, TZInfoBase, unittest.TestCase): self.assertTrue(t) t = cls(5, tzinfo=FixedOffset(300, "")) - self.assertFalse(t) + self.assertTrue(t) t = cls(23, 59, tzinfo=FixedOffset(23*60 + 59, "")) - self.assertFalse(t) - - # Mostly ensuring this doesn't overflow internally. - t = cls(0, tzinfo=FixedOffset(23*60 + 59, "")) self.assertTrue(t) - # But this should yield a value error -- the utcoffset is bogus. - t = cls(0, tzinfo=FixedOffset(24*60, "")) - self.assertRaises(ValueError, lambda: bool(t)) - - # Likewise. - t = cls(0, tzinfo=FixedOffset(-24*60, "")) - self.assertRaises(ValueError, lambda: bool(t)) - def test_replace(self): cls = self.theclass z100 = FixedOffset(100, "+100") diff --git a/Lib/test/fork_wait.py b/Lib/test/fork_wait.py index 19b54ec736..8c7c3aac99 100644 --- a/Lib/test/fork_wait.py +++ b/Lib/test/fork_wait.py @@ -48,7 +48,12 @@ class ForkWait(unittest.TestCase): for i in range(NUM_THREADS): _thread.start_new(self.f, (i,)) - time.sleep(LONGSLEEP) + # busy-loop to wait for threads + deadline = time.monotonic() + 10.0 + while len(self.alive) < NUM_THREADS: + time.sleep(0.1) + if time.monotonic() <= deadline: + break a = sorted(self.alive.keys()) self.assertEqual(a, list(range(NUM_THREADS))) diff --git a/Lib/test/ssl_servers.py b/Lib/test/ssl_servers.py index 759b3f487e..f9d30cf0bd 100644 --- a/Lib/test/ssl_servers.py +++ b/Lib/test/ssl_servers.py @@ -150,7 +150,7 @@ class HTTPSServerThread(threading.Thread): def make_https_server(case, *, context=None, certfile=CERTFILE, host=HOST, handler_class=None): if context is None: - context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) # We assume the certfile contains both private key and certificate context.load_cert_chain(certfile) server = HTTPSServerThread(context, host, handler_class) @@ -182,6 +182,8 @@ if __name__ == "__main__": parser.add_argument('--curve-name', dest='curve_name', type=str, action='store', help='curve name for EC-based Diffie-Hellman') + parser.add_argument('--ciphers', dest='ciphers', type=str, + help='allowed cipher list') parser.add_argument('--dh', dest='dh_file', type=str, action='store', help='PEM file containing DH parameters') args = parser.parse_args() @@ -192,12 +194,14 @@ if __name__ == "__main__": else: handler_class = RootedHTTPRequestHandler handler_class.root = os.getcwd() - context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) context.load_cert_chain(CERTFILE) if args.curve_name: context.set_ecdh_curve(args.curve_name) if args.dh_file: context.load_dh_params(args.dh_file) + if args.ciphers: + context.set_ciphers(args.ciphers) server = HTTPSServer(("", args.port), handler_class, context) if args.verbose: diff --git a/Lib/test/string_tests.py b/Lib/test/string_tests.py index 5ed01f2979..569bae1804 100644 --- a/Lib/test/string_tests.py +++ b/Lib/test/string_tests.py @@ -1178,8 +1178,7 @@ class MixinStrUnicodeUserStringTest: self.checkraises(TypeError, 'abc', '__mod__') self.checkraises(TypeError, '%(foo)s', '__mod__', 42) self.checkraises(TypeError, '%s%s', '__mod__', (42,)) - with self.assertWarns(DeprecationWarning): - self.checkraises(TypeError, '%c', '__mod__', (None,)) + self.checkraises(TypeError, '%c', '__mod__', (None,)) self.checkraises(ValueError, '%(foo', '__mod__', {}) self.checkraises(TypeError, '%(foo)s %(bar)s', '__mod__', ('foo', 42)) self.checkraises(TypeError, '%d', '__mod__', "42") # not numeric diff --git a/Lib/test/test_asdl_parser.py b/Lib/test/test_asdl_parser.py new file mode 100644 index 0000000000..7a6426a4d3 --- /dev/null +++ b/Lib/test/test_asdl_parser.py @@ -0,0 +1,122 @@ +"""Tests for the asdl parser in Parser/asdl.py""" + +import importlib.machinery +import os +from os.path import dirname +import sys +import sysconfig +import unittest + + +# This test is only relevant for from-source builds of Python. +if not sysconfig.is_python_build(): + raise unittest.SkipTest('test irrelevant for an installed Python') + +src_base = dirname(dirname(dirname(__file__))) +parser_dir = os.path.join(src_base, 'Parser') + + +class TestAsdlParser(unittest.TestCase): + @classmethod + def setUpClass(cls): + # Loads the asdl module dynamically, since it's not in a real importable + # package. + # Parses Python.asdl into a ast.Module and run the check on it. + # There's no need to do this for each test method, hence setUpClass. + sys.path.insert(0, parser_dir) + loader = importlib.machinery.SourceFileLoader( + 'asdl', os.path.join(parser_dir, 'asdl.py')) + cls.asdl = loader.load_module() + cls.mod = cls.asdl.parse(os.path.join(parser_dir, 'Python.asdl')) + cls.assertTrue(cls.asdl.check(cls.mod), 'Module validation failed') + + @classmethod + def tearDownClass(cls): + del sys.path[0] + + def setUp(self): + # alias stuff from the class, for convenience + self.asdl = TestAsdlParser.asdl + self.mod = TestAsdlParser.mod + self.types = self.mod.types + + def test_module(self): + self.assertEqual(self.mod.name, 'Python') + self.assertIn('stmt', self.types) + self.assertIn('expr', self.types) + self.assertIn('mod', self.types) + + def test_definitions(self): + defs = self.mod.dfns + self.assertIsInstance(defs[0], self.asdl.Type) + self.assertIsInstance(defs[0].value, self.asdl.Sum) + + self.assertIsInstance(self.types['withitem'], self.asdl.Product) + self.assertIsInstance(self.types['alias'], self.asdl.Product) + + def test_product(self): + alias = self.types['alias'] + self.assertEqual( + str(alias), + 'Product([Field(identifier, name), Field(identifier, asname, opt=True)])') + + def test_attributes(self): + stmt = self.types['stmt'] + self.assertEqual(len(stmt.attributes), 2) + self.assertEqual(str(stmt.attributes[0]), 'Field(int, lineno)') + self.assertEqual(str(stmt.attributes[1]), 'Field(int, col_offset)') + + def test_constructor_fields(self): + ehandler = self.types['excepthandler'] + self.assertEqual(len(ehandler.types), 1) + self.assertEqual(len(ehandler.attributes), 2) + + cons = ehandler.types[0] + self.assertIsInstance(cons, self.asdl.Constructor) + self.assertEqual(len(cons.fields), 3) + + f0 = cons.fields[0] + self.assertEqual(f0.type, 'expr') + self.assertEqual(f0.name, 'type') + self.assertTrue(f0.opt) + + f1 = cons.fields[1] + self.assertEqual(f1.type, 'identifier') + self.assertEqual(f1.name, 'name') + self.assertTrue(f1.opt) + + f2 = cons.fields[2] + self.assertEqual(f2.type, 'stmt') + self.assertEqual(f2.name, 'body') + self.assertFalse(f2.opt) + self.assertTrue(f2.seq) + + def test_visitor(self): + class CustomVisitor(self.asdl.VisitorBase): + def __init__(self): + super().__init__() + self.names_with_seq = [] + + def visitModule(self, mod): + for dfn in mod.dfns: + self.visit(dfn) + + def visitType(self, type): + self.visit(type.value) + + def visitSum(self, sum): + for t in sum.types: + self.visit(t) + + def visitConstructor(self, cons): + for f in cons.fields: + if f.seq: + self.names_with_seq.append(cons.name) + + v = CustomVisitor() + v.visit(self.types['mod']) + self.assertEqual(v.names_with_seq, ['Module', 'Interactive', 'Suite']) + + +if __name__ == '__main__': + unittest.main() diff --git a/Lib/test/test_asyncore.py b/Lib/test/test_asyncore.py index 084d247295..b04aa1d8be 100644 --- a/Lib/test/test_asyncore.py +++ b/Lib/test/test_asyncore.py @@ -316,23 +316,6 @@ class DispatcherTests(unittest.TestCase): 'warning: unhandled connect event'] self.assertEqual(lines, expected) - def test_issue_8594(self): - # XXX - this test is supposed to be removed in next major Python - # version - d = asyncore.dispatcher(socket.socket()) - # make sure the error message no longer refers to the socket - # object but the dispatcher instance instead - self.assertRaisesRegex(AttributeError, 'dispatcher instance', - getattr, d, 'foo') - # cheap inheritance with the underlying socket is supposed - # to still work but a DeprecationWarning is expected - with warnings.catch_warnings(record=True) as w: - warnings.simplefilter("always") - family = d.family - self.assertEqual(family, socket.AF_INET) - self.assertEqual(len(w), 1) - self.assertTrue(issubclass(w[0].category, DeprecationWarning)) - def test_strerror(self): # refers to bug #8573 err = asyncore._strerror(errno.EPERM) @@ -349,9 +332,8 @@ class dispatcherwithsend_noread(asyncore.dispatcher_with_send): def handle_connect(self): pass -class DispatcherWithSendTests(unittest.TestCase): - usepoll = False +class DispatcherWithSendTests(unittest.TestCase): def setUp(self): pass @@ -401,10 +383,6 @@ class DispatcherWithSendTests(unittest.TestCase): self.fail("join() timed out") - -class DispatcherWithSendTests_UsePoll(DispatcherWithSendTests): - usepoll = True - @unittest.skipUnless(hasattr(asyncore, 'file_wrapper'), 'asyncore.file_wrapper required') class FileWrapperTest(unittest.TestCase): diff --git a/Lib/test/test_augassign.py b/Lib/test/test_augassign.py index 9a59c58ec0..19b76874be 100644 --- a/Lib/test/test_augassign.py +++ b/Lib/test/test_augassign.py @@ -136,6 +136,14 @@ class AugAssignTest(unittest.TestCase): output.append("__imul__ called") return self + def __matmul__(self, val): + output.append("__matmul__ called") + def __rmatmul__(self, val): + output.append("__rmatmul__ called") + def __imatmul__(self, val): + output.append("__imatmul__ called") + return self + def __div__(self, val): output.append("__div__ called") def __rdiv__(self, val): @@ -233,6 +241,10 @@ class AugAssignTest(unittest.TestCase): 1 * x x *= 1 + x @ 1 + 1 @ x + x @= 1 + x / 1 1 / x x /= 1 @@ -279,6 +291,9 @@ __isub__ called __mul__ called __rmul__ called __imul__ called +__matmul__ called +__rmatmul__ called +__imatmul__ called __truediv__ called __rtruediv__ called __itruediv__ called diff --git a/Lib/test/test_builtin.py b/Lib/test/test_builtin.py index b561a6f73a..018ac8d921 100644 --- a/Lib/test/test_builtin.py +++ b/Lib/test/test_builtin.py @@ -1092,7 +1092,7 @@ class BuiltinTest(unittest.TestCase): self.assertAlmostEqual(pow(-1, 0.5), 1j) self.assertAlmostEqual(pow(-1, 1/3), 0.5 + 0.8660254037844386j) - self.assertRaises(TypeError, pow, -1, -2, 3) + self.assertRaises(ValueError, pow, -1, -2, 3) self.assertRaises(ValueError, pow, 1, 2, 0) self.assertRaises(TypeError, pow) diff --git a/Lib/test/test_capi.py b/Lib/test/test_capi.py index ba7c38db27..ba7f2c4c6d 100644 --- a/Lib/test/test_capi.py +++ b/Lib/test/test_capi.py @@ -150,6 +150,23 @@ class CAPITest(unittest.TestCase): self.assertEqual(_testcapi.docstring_with_signature_and_extra_newlines.__text_signature__, "($module, /, parameter)") + def test_c_type_with_matrix_multiplication(self): + M = _testcapi.matmulType + m1 = M() + m2 = M() + self.assertEqual(m1 @ m2, ("matmul", m1, m2)) + self.assertEqual(m1 @ 42, ("matmul", m1, 42)) + self.assertEqual(42 @ m1, ("matmul", 42, m1)) + o = m1 + o @= m2 + self.assertEqual(o, ("imatmul", m1, m2)) + o = m1 + o @= 42 + self.assertEqual(o, ("imatmul", m1, 42)) + o = 42 + o @= m1 + self.assertEqual(o, ("matmul", 42, m1)) + @unittest.skipUnless(threading, 'Threading required for this test.') class TestPendingCalls(unittest.TestCase): @@ -319,34 +336,38 @@ class EmbeddingTests(unittest.TestCase): print() print(out) print(err) + expected_errors = sys.__stdout__.errors expected_stdin_encoding = sys.__stdin__.encoding expected_pipe_encoding = self._get_default_pipe_encoding() expected_output = os.linesep.join([ "--- Use defaults ---", "Expected encoding: default", "Expected errors: default", - "stdin: {0}:strict", - "stdout: {1}:strict", - "stderr: {1}:backslashreplace", + "stdin: {in_encoding}:{errors}", + "stdout: {out_encoding}:{errors}", + "stderr: {out_encoding}:backslashreplace", "--- Set errors only ---", "Expected encoding: default", - "Expected errors: surrogateescape", - "stdin: {0}:surrogateescape", - "stdout: {1}:surrogateescape", - "stderr: {1}:backslashreplace", + "Expected errors: ignore", + "stdin: {in_encoding}:ignore", + "stdout: {out_encoding}:ignore", + "stderr: {out_encoding}:backslashreplace", "--- Set encoding only ---", "Expected encoding: latin-1", "Expected errors: default", - "stdin: latin-1:strict", - "stdout: latin-1:strict", + "stdin: latin-1:{errors}", + "stdout: latin-1:{errors}", "stderr: latin-1:backslashreplace", "--- Set encoding and errors ---", "Expected encoding: latin-1", - "Expected errors: surrogateescape", - "stdin: latin-1:surrogateescape", - "stdout: latin-1:surrogateescape", - "stderr: latin-1:backslashreplace"]).format(expected_stdin_encoding, - expected_pipe_encoding) + "Expected errors: replace", + "stdin: latin-1:replace", + "stdout: latin-1:replace", + "stderr: latin-1:backslashreplace"]) + expected_output = expected_output.format( + in_encoding=expected_stdin_encoding, + out_encoding=expected_pipe_encoding, + errors=expected_errors) # This is useful if we ever trip over odd platform behaviour self.maxDiff = None self.assertEqual(out.strip(), expected_output) diff --git a/Lib/test/test_codeccallbacks.py b/Lib/test/test_codeccallbacks.py index 84804bb0da..a1ce9cf78a 100644 --- a/Lib/test/test_codeccallbacks.py +++ b/Lib/test/test_codeccallbacks.py @@ -819,7 +819,7 @@ class CodecCallbackTest(unittest.TestCase): def __getitem__(self, key): raise ValueError #self.assertRaises(ValueError, "\xff".translate, D()) - self.assertRaises(TypeError, "\xff".translate, {0xff: sys.maxunicode+1}) + self.assertRaises(ValueError, "\xff".translate, {0xff: sys.maxunicode+1}) self.assertRaises(TypeError, "\xff".translate, {0xff: ()}) def test_bug828737(self): diff --git a/Lib/test/test_codecs.py b/Lib/test/test_codecs.py index 9b62d5b12f..f59c37d120 100644 --- a/Lib/test/test_codecs.py +++ b/Lib/test/test_codecs.py @@ -890,10 +890,6 @@ class CP65001Test(ReadTest, unittest.TestCase): "\U00010fff\uD800") self.assertTrue(codecs.lookup_error("surrogatepass")) - def test_readline(self): - self.skipTest("issue #20571: code page 65001 codec does not " - "support partial decoder yet") - class UTF7Test(ReadTest, unittest.TestCase): encoding = "utf-7" @@ -1600,6 +1596,12 @@ class CodecsModuleTest(unittest.TestCase): self.assertEqual(codecs.decode(b'abc'), 'abc') self.assertRaises(UnicodeDecodeError, codecs.decode, b'\xff', 'ascii') + # test keywords + self.assertEqual(codecs.decode(obj=b'\xe4\xf6\xfc', encoding='latin-1'), + '\xe4\xf6\xfc') + self.assertEqual(codecs.decode(b'[\xff]', 'ascii', errors='ignore'), + '[]') + def test_encode(self): self.assertEqual(codecs.encode('\xe4\xf6\xfc', 'latin-1'), b'\xe4\xf6\xfc') @@ -1608,6 +1610,12 @@ class CodecsModuleTest(unittest.TestCase): self.assertEqual(codecs.encode('abc'), b'abc') self.assertRaises(UnicodeEncodeError, codecs.encode, '\xffff', 'ascii') + # test keywords + self.assertEqual(codecs.encode(obj='\xe4\xf6\xfc', encoding='latin-1'), + b'\xe4\xf6\xfc') + self.assertEqual(codecs.encode('[\xff]', 'ascii', errors='ignore'), + b'[]') + def test_register(self): self.assertRaises(TypeError, codecs.register) self.assertRaises(TypeError, codecs.register, 42) @@ -2750,15 +2758,15 @@ class CodePageTest(unittest.TestCase): self.assertRaisesRegex(UnicodeEncodeError, 'cp932', codecs.code_page_encode, 932, '\xff') self.assertRaisesRegex(UnicodeDecodeError, 'cp932', - codecs.code_page_decode, 932, b'\x81\x00') + codecs.code_page_decode, 932, b'\x81\x00', 'strict', True) self.assertRaisesRegex(UnicodeDecodeError, 'CP_UTF8', - codecs.code_page_decode, self.CP_UTF8, b'\xff') + codecs.code_page_decode, self.CP_UTF8, b'\xff', 'strict', True) def check_decode(self, cp, tests): for raw, errors, expected in tests: if expected is not None: try: - decoded = codecs.code_page_decode(cp, raw, errors) + decoded = codecs.code_page_decode(cp, raw, errors, True) except UnicodeDecodeError as err: self.fail('Unable to decode %a from "cp%s" with ' 'errors=%r: %s' % (raw, cp, errors, err)) @@ -2770,7 +2778,7 @@ class CodePageTest(unittest.TestCase): self.assertLessEqual(decoded[1], len(raw)) else: self.assertRaises(UnicodeDecodeError, - codecs.code_page_decode, cp, raw, errors) + codecs.code_page_decode, cp, raw, errors, True) def check_encode(self, cp, tests): for text, errors, expected in tests: @@ -2799,6 +2807,9 @@ class CodePageTest(unittest.TestCase): ('[\u20ac]', 'replace', b'[?]'), ('[\xff]', 'backslashreplace', b'[\\xff]'), ('[\xff]', 'xmlcharrefreplace', b'[ÿ]'), + ('\udcff', 'strict', None), + ('[\udcff]', 'surrogateescape', b'[\xff]'), + ('[\udcff]', 'surrogatepass', None), )) self.check_decode(932, ( (b'abc', 'strict', 'abc'), @@ -2808,6 +2819,7 @@ class CodePageTest(unittest.TestCase): (b'[\xff]', 'ignore', '[]'), (b'[\xff]', 'replace', '[\ufffd]'), (b'[\xff]', 'surrogateescape', '[\udcff]'), + (b'[\xff]', 'surrogatepass', None), (b'\x81\x00abc', 'strict', None), (b'\x81\x00abc', 'ignore', '\x00abc'), (b'\x81\x00abc', 'replace', '\ufffd\x00abc'), @@ -2818,9 +2830,12 @@ class CodePageTest(unittest.TestCase): ('abc', 'strict', b'abc'), ('\xe9\u20ac', 'strict', b'\xe9\x80'), ('\xff', 'strict', b'\xff'), + # test error handlers ('\u0141', 'strict', None), ('\u0141', 'ignore', b''), ('\u0141', 'replace', b'L'), + ('\udc98', 'surrogateescape', b'\x98'), + ('\udc98', 'surrogatepass', None), )) self.check_decode(1252, ( (b'abc', 'strict', 'abc'), diff --git a/Lib/test/test_collections.py b/Lib/test/test_collections.py index ee28a6c0b3..7e149808d4 100644 --- a/Lib/test/test_collections.py +++ b/Lib/test/test_collections.py @@ -1187,6 +1187,21 @@ class TestOrderedDict(unittest.TestCase): self.assertEqual(list(od.items()), pairs) self.assertEqual(list(reversed(od)), [t[0] for t in reversed(pairs)]) + self.assertEqual(list(reversed(od.keys())), + [t[0] for t in reversed(pairs)]) + self.assertEqual(list(reversed(od.values())), + [t[1] for t in reversed(pairs)]) + self.assertEqual(list(reversed(od.items())), list(reversed(pairs))) + + def test_detect_deletion_during_iteration(self): + od = OrderedDict.fromkeys('abc') + it = iter(od) + key = next(it) + del od[key] + with self.assertRaises(Exception): + # Note, the exact exception raised is not guaranteed + # The only guarantee that the next() will not succeed + next(it) def test_popitem(self): pairs = [('c', 1), ('b', 2), ('a', 3), ('d', 4), ('e', 5), ('f', 6)] diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index c74b2ca6ed..55254b5c5e 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -425,6 +425,13 @@ class ExecutorTest: self.assertTrue(collected, "Stale reference not collected within timeout.") + def test_max_workers_negative(self): + for number in (0, -1): + with self.assertRaisesRegexp(ValueError, + "max_workers must be greater " + "than 0"): + self.executor_type(max_workers=number) + class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest, unittest.TestCase): def test_map_submits_without_iteration(self): diff --git a/Lib/test/test_csv.py b/Lib/test/test_csv.py index 7e2485f954..7c31ac71f6 100644 --- a/Lib/test/test_csv.py +++ b/Lib/test/test_csv.py @@ -575,6 +575,16 @@ class TestDictFields(unittest.TestCase): fileobj.readline() # header self.assertEqual(fileobj.read(), "10,,abc\r\n") + def test_write_multiple_dict_rows(self): + fileobj = StringIO() + writer = csv.DictWriter(fileobj, fieldnames=["f1", "f2", "f3"]) + writer.writeheader() + self.assertEqual(fileobj.getvalue(), "f1,f2,f3\r\n") + writer.writerows([{"f1": 1, "f2": "abc", "f3": "f"}, + {"f1": 2, "f2": 5, "f3": "xyz"}]) + self.assertEqual(fileobj.getvalue(), + "f1,f2,f3\r\n1,abc,f\r\n2,5,xyz\r\n") + def test_write_no_fields(self): fileobj = StringIO() self.assertRaises(TypeError, csv.DictWriter, fileobj) diff --git a/Lib/test/test_decimal.py b/Lib/test/test_decimal.py index 4b279071df..8072899711 100644 --- a/Lib/test/test_decimal.py +++ b/Lib/test/test_decimal.py @@ -33,12 +33,13 @@ import unittest import numbers import locale from test.support import (run_unittest, run_doctest, is_resource_enabled, - requires_IEEE_754) + requires_IEEE_754, requires_docstrings) from test.support import (check_warnings, import_fresh_module, TestFailed, run_with_locale, cpython_only) import random import time import warnings +import inspect try: import threading except ImportError: @@ -4448,18 +4449,6 @@ class PyCoverage(Coverage): class PyFunctionality(unittest.TestCase): """Extra functionality in decimal.py""" - def test_py_quantize_watchexp(self): - # watchexp functionality - Decimal = P.Decimal - localcontext = P.localcontext - - with localcontext() as c: - c.prec = 1 - c.Emax = 1 - c.Emin = -1 - x = Decimal(99999).quantize(Decimal("1e3"), watchexp=False) - self.assertEqual(x, Decimal('1.00E+5')) - def test_py_alternate_formatting(self): # triples giving a format, a Decimal, and the expected result Decimal = P.Decimal @@ -5402,6 +5391,143 @@ class CWhitebox(unittest.TestCase): y = Decimal(10**(9*25)).__sizeof__() self.assertEqual(y, x+4) +@requires_docstrings +@unittest.skipUnless(C, "test requires C version") +class SignatureTest(unittest.TestCase): + """Function signatures""" + + def test_inspect_module(self): + for attr in dir(P): + if attr.startswith('_'): + continue + p_func = getattr(P, attr) + c_func = getattr(C, attr) + if (attr == 'Decimal' or attr == 'Context' or + inspect.isfunction(p_func)): + p_sig = inspect.signature(p_func) + c_sig = inspect.signature(c_func) + + # parameter names: + c_names = list(c_sig.parameters.keys()) + p_names = [x for x in p_sig.parameters.keys() if not + x.startswith('_')] + + self.assertEqual(c_names, p_names, + msg="parameter name mismatch in %s" % p_func) + + c_kind = [x.kind for x in c_sig.parameters.values()] + p_kind = [x[1].kind for x in p_sig.parameters.items() if not + x[0].startswith('_')] + + # parameters: + if attr != 'setcontext': + self.assertEqual(c_kind, p_kind, + msg="parameter kind mismatch in %s" % p_func) + + def test_inspect_types(self): + + POS = inspect._ParameterKind.POSITIONAL_ONLY + POS_KWD = inspect._ParameterKind.POSITIONAL_OR_KEYWORD + + # Type heuristic (type annotations would help!): + pdict = {C: {'other': C.Decimal(1), + 'third': C.Decimal(1), + 'x': C.Decimal(1), + 'y': C.Decimal(1), + 'z': C.Decimal(1), + 'a': C.Decimal(1), + 'b': C.Decimal(1), + 'c': C.Decimal(1), + 'exp': C.Decimal(1), + 'modulo': C.Decimal(1), + 'num': "1", + 'f': 1.0, + 'rounding': C.ROUND_HALF_UP, + 'context': C.getcontext()}, + P: {'other': P.Decimal(1), + 'third': P.Decimal(1), + 'a': P.Decimal(1), + 'b': P.Decimal(1), + 'c': P.Decimal(1), + 'exp': P.Decimal(1), + 'modulo': P.Decimal(1), + 'num': "1", + 'f': 1.0, + 'rounding': P.ROUND_HALF_UP, + 'context': P.getcontext()}} + + def mkargs(module, sig): + args = [] + kwargs = {} + for name, param in sig.parameters.items(): + if name == 'self': continue + if param.kind == POS: + args.append(pdict[module][name]) + elif param.kind == POS_KWD: + kwargs[name] = pdict[module][name] + else: + raise TestFailed("unexpected parameter kind") + return args, kwargs + + def tr(s): + """The C Context docstrings use 'x' in order to prevent confusion + with the article 'a' in the descriptions.""" + if s == 'x': return 'a' + if s == 'y': return 'b' + if s == 'z': return 'c' + return s + + def doit(ty): + p_type = getattr(P, ty) + c_type = getattr(C, ty) + for attr in dir(p_type): + if attr.startswith('_'): + continue + p_func = getattr(p_type, attr) + c_func = getattr(c_type, attr) + if inspect.isfunction(p_func): + p_sig = inspect.signature(p_func) + c_sig = inspect.signature(c_func) + + # parameter names: + p_names = list(p_sig.parameters.keys()) + c_names = [tr(x) for x in c_sig.parameters.keys()] + + self.assertEqual(c_names, p_names, + msg="parameter name mismatch in %s" % p_func) + + p_kind = [x.kind for x in p_sig.parameters.values()] + c_kind = [x.kind for x in c_sig.parameters.values()] + + # 'self' parameter: + self.assertIs(p_kind[0], POS_KWD) + self.assertIs(c_kind[0], POS) + + # remaining parameters: + if ty == 'Decimal': + self.assertEqual(c_kind[1:], p_kind[1:], + msg="parameter kind mismatch in %s" % p_func) + else: # Context methods are positional only in the C version. + self.assertEqual(len(c_kind), len(p_kind), + msg="parameter kind mismatch in %s" % p_func) + + # Run the function: + args, kwds = mkargs(C, c_sig) + try: + getattr(c_type(9), attr)(*args, **kwds) + except Exception as err: + raise TestFailed("invalid signature for %s: %s %s" % (c_func, args, kwds)) + + args, kwds = mkargs(P, p_sig) + try: + getattr(p_type(9), attr)(*args, **kwds) + except Exception as err: + raise TestFailed("invalid signature for %s: %s %s" % (p_func, args, kwds)) + + doit('Decimal') + doit('Context') + + all_tests = [ CExplicitConstructionTest, PyExplicitConstructionTest, CImplicitConstructionTest, PyImplicitConstructionTest, @@ -5427,6 +5553,7 @@ if not C: all_tests = all_tests[1::2] else: all_tests.insert(0, CheckAttributes) + all_tests.insert(1, SignatureTest) def test_main(arith=False, verbose=None, todo_tests=None, debug=None): diff --git a/Lib/test/test_descr.py b/Lib/test/test_descr.py index 8bb7d6a474..e65edb2f8c 100644 --- a/Lib/test/test_descr.py +++ b/Lib/test/test_descr.py @@ -4160,6 +4160,7 @@ order (MRO) for bases """ ('__add__', 'x + y', 'x += y'), ('__sub__', 'x - y', 'x -= y'), ('__mul__', 'x * y', 'x *= y'), + ('__matmul__', 'x @ y', 'x @= y'), ('__truediv__', 'operator.truediv(x, y)', None), ('__floordiv__', 'operator.floordiv(x, y)', None), ('__div__', 'x / y', 'x /= y'), diff --git a/Lib/test/test_doctest.py b/Lib/test/test_doctest.py index 56193e87b1..c62e7ca0f7 100644 --- a/Lib/test/test_doctest.py +++ b/Lib/test/test_doctest.py @@ -2096,22 +2096,9 @@ def test_DocTestSuite(): >>> suite.run(unittest.TestResult()) <unittest.result.TestResult run=0 errors=0 failures=0> - However, if DocTestSuite finds no docstrings, it raises an error: + The module need not contain any docstrings either: - >>> try: - ... doctest.DocTestSuite('test.sample_doctest_no_docstrings') - ... except ValueError as e: - ... error = e - - >>> print(error.args[1]) - has no docstrings - - You can prevent this error by passing a DocTestFinder instance with - the `exclude_empty` keyword argument set to False: - - >>> finder = doctest.DocTestFinder(exclude_empty=False) - >>> suite = doctest.DocTestSuite('test.sample_doctest_no_docstrings', - ... test_finder=finder) + >>> suite = doctest.DocTestSuite('test.sample_doctest_no_docstrings') >>> suite.run(unittest.TestResult()) <unittest.result.TestResult run=0 errors=0 failures=0> @@ -2121,6 +2108,22 @@ def test_DocTestSuite(): >>> suite.run(unittest.TestResult()) <unittest.result.TestResult run=9 errors=0 failures=4> + We can also provide a DocTestFinder: + + >>> finder = doctest.DocTestFinder() + >>> suite = doctest.DocTestSuite('test.sample_doctest', + ... test_finder=finder) + >>> suite.run(unittest.TestResult()) + <unittest.result.TestResult run=9 errors=0 failures=4> + + The DocTestFinder need not return any tests: + + >>> finder = doctest.DocTestFinder() + >>> suite = doctest.DocTestSuite('test.sample_doctest_no_docstrings', + ... test_finder=finder) + >>> suite.run(unittest.TestResult()) + <unittest.result.TestResult run=0 errors=0 failures=0> + We can supply global variables. If we pass globs, they will be used instead of the module globals. Here we'll pass an empty globals, triggering an extra error: @@ -2168,7 +2171,7 @@ def test_DocTestSuite(): >>> test.test_doctest.sillySetup Traceback (most recent call last): ... - AttributeError: 'module' object has no attribute 'sillySetup' + AttributeError: module 'test.test_doctest' has no attribute 'sillySetup' The setUp and tearDown funtions are passed test objects. Here we'll use the setUp function to supply the missing variable y: @@ -2314,7 +2317,7 @@ def test_DocFileSuite(): >>> test.test_doctest.sillySetup Traceback (most recent call last): ... - AttributeError: 'module' object has no attribute 'sillySetup' + AttributeError: module 'test.test_doctest' has no attribute 'sillySetup' The setUp and tearDown funtions are passed test objects. Here, we'll use a setUp function to set the favorite color in @@ -2897,7 +2900,7 @@ Invalid doctest option: def test_main(): # Check the doctest cases in doctest itself: - support.run_doctest(doctest, verbosity=True) + ret = support.run_doctest(doctest, verbosity=True) # Check the doctest cases defined here: from test import test_doctest support.run_doctest(test_doctest, verbosity=True) diff --git a/Lib/test/test_docxmlrpc.py b/Lib/test/test_docxmlrpc.py index cb6366c7d5..eb975161af 100644 --- a/Lib/test/test_docxmlrpc.py +++ b/Lib/test/test_docxmlrpc.py @@ -87,10 +87,11 @@ class DocXMLRPCHTTPGETServer(unittest.TestCase): threading.Thread(target=server, args=(self.evt, 1)).start() # wait for port to be assigned - n = 1000 - while n > 0 and PORT is None: - time.sleep(0.001) - n -= 1 + deadline = time.monotonic() + 10.0 + while PORT is None: + time.sleep(0.010) + if time.monotonic() > deadline: + break self.client = http.client.HTTPConnection("localhost:%d" % PORT) diff --git a/Lib/test/test_fork1.py b/Lib/test/test_fork1.py index e0626dffdc..8bcbd465cc 100644 --- a/Lib/test/test_fork1.py +++ b/Lib/test/test_fork1.py @@ -18,13 +18,14 @@ get_attribute(os, 'fork') class ForkTest(ForkWait): def wait_impl(self, cpid): - for i in range(10): + deadline = time.monotonic() + 10.0 + while time.monotonic() <= deadline: # waitpid() shouldn't hang, but some of the buildbots seem to hang # in the forking tests. This is an attempt to fix the problem. spid, status = os.waitpid(cpid, os.WNOHANG) if spid == cpid: break - time.sleep(1.0) + time.sleep(0.1) self.assertEqual(spid, cpid) self.assertEqual(status, 0, "cause = %d, exit = %d" % (status&0xff, status>>8)) diff --git a/Lib/test/test_format.py b/Lib/test/test_format.py index fc71e4818e..631bf35ad7 100644 --- a/Lib/test/test_format.py +++ b/Lib/test/test_format.py @@ -142,8 +142,6 @@ class FormatTest(unittest.TestCase): testformat("%#+027.23X", big, "+0X0001234567890ABCDEF12345") # same, except no 0 flag testformat("%#+27.23X", big, " +0X001234567890ABCDEF12345") - with self.assertWarns(DeprecationWarning): - testformat("%x", float(big), "123456_______________", 6) big = 0o12345670123456701234567012345670 # 32 octal digits testformat("%o", big, "12345670123456701234567012345670") testformat("%o", -big, "-12345670123456701234567012345670") @@ -183,8 +181,6 @@ class FormatTest(unittest.TestCase): testformat("%034.33o", big, "0012345670123456701234567012345670") # base marker shouldn't change that testformat("%0#34.33o", big, "0o012345670123456701234567012345670") - with self.assertWarns(DeprecationWarning): - testformat("%o", float(big), "123456__________________________", 6) # Some small ints, in both Python int and flavors). testformat("%d", 42, "42") testformat("%d", -42, "-42") @@ -195,8 +191,6 @@ class FormatTest(unittest.TestCase): testformat("%#x", 1, "0x1") testformat("%#X", 1, "0X1") testformat("%#X", 1, "0X1") - with self.assertWarns(DeprecationWarning): - testformat("%#x", 1.0, "0x1") testformat("%#o", 1, "0o1") testformat("%#o", 1, "0o1") testformat("%#o", 0, "0o0") @@ -213,14 +207,10 @@ class FormatTest(unittest.TestCase): testformat("%x", -0x42, "-42") testformat("%x", 0x42, "42") testformat("%x", -0x42, "-42") - with self.assertWarns(DeprecationWarning): - testformat("%x", float(0x42), "42") testformat("%o", 0o42, "42") testformat("%o", -0o42, "-42") testformat("%o", 0o42, "42") testformat("%o", -0o42, "-42") - with self.assertWarns(DeprecationWarning): - testformat("%o", float(0o42), "42") testformat("%r", "\u0378", "'\\u0378'") # non printable testformat("%a", "\u0378", "'\\u0378'") # non printable testformat("%r", "\u0374", "'\u0374'") # printable diff --git a/Lib/test/test_fractions.py b/Lib/test/test_fractions.py index 33365322b9..e86d5cee6e 100644 --- a/Lib/test/test_fractions.py +++ b/Lib/test/test_fractions.py @@ -330,7 +330,6 @@ class FractionTest(unittest.TestCase): self.assertTypedEquals(F(-2, 10), round(F(-15, 100), 1)) self.assertTypedEquals(F(-2, 10), round(F(-25, 100), 1)) - def testArithmetic(self): self.assertEqual(F(1, 2), F(1, 10) + F(2, 5)) self.assertEqual(F(-3, 10), F(1, 10) - F(2, 5)) @@ -402,6 +401,8 @@ class FractionTest(unittest.TestCase): self.assertTypedEquals(2.0 , 4 ** F(1, 2)) self.assertTypedEquals(0.25, 2.0 ** F(-2, 1)) self.assertTypedEquals(1.0 + 0j, (1.0 + 0j) ** F(1, 10)) + self.assertRaises(ZeroDivisionError, operator.pow, + F(0, 1), -2) def testMixingWithDecimal(self): # Decimal refuses mixed arithmetic (but not mixed comparisons) diff --git a/Lib/test/test_grammar.py b/Lib/test/test_grammar.py index bba8820ab1..a7bad2d2d9 100644 --- a/Lib/test/test_grammar.py +++ b/Lib/test/test_grammar.py @@ -985,6 +985,20 @@ class GrammarTests(unittest.TestCase): self.assertFalse((False is 2) is 3) self.assertFalse(False is 2 is 3) + def test_matrix_mul(self): + # This is not intended to be a comprehensive test, rather just to be few + # samples of the @ operator in test_grammar.py. + class M: + def __matmul__(self, o): + return 4 + def __imatmul__(self, o): + self.other = o + return self + m = M() + self.assertEqual(m @ m, 4) + m @= 42 + self.assertEqual(m.other, 42) + def test_main(): run_unittest(TokenTests, GrammarTests) diff --git a/Lib/test/test_heapq.py b/Lib/test/test_heapq.py index b5a2fd803a..59c70297e3 100644 --- a/Lib/test/test_heapq.py +++ b/Lib/test/test_heapq.py @@ -13,7 +13,7 @@ c_heapq = support.import_fresh_module('heapq', fresh=['_heapq']) # _heapq.nlargest/nsmallest are saved in heapq._nlargest/_smallest when # _heapq is imported, so check them there func_names = ['heapify', 'heappop', 'heappush', 'heappushpop', - 'heapreplace', '_nlargest', '_nsmallest'] + 'heapreplace', '_heapreplace_max'] class TestModules(TestCase): def test_py_functions(self): diff --git a/Lib/test/test_httplib.py b/Lib/test/test_httplib.py index 22f7329886..1a6d8d04d3 100644 --- a/Lib/test/test_httplib.py +++ b/Lib/test/test_httplib.py @@ -18,6 +18,26 @@ CERT_fakehostname = os.path.join(here, 'keycert2.pem') # Root cert file (CA) for svn.python.org's cert CACERT_svn_python_org = os.path.join(here, 'https_svn_python_org_root.pem') +# constants for testing chunked encoding +chunked_start = ( + 'HTTP/1.1 200 OK\r\n' + 'Transfer-Encoding: chunked\r\n\r\n' + 'a\r\n' + 'hello worl\r\n' + '3\r\n' + 'd! \r\n' + '8\r\n' + 'and now \r\n' + '22\r\n' + 'for something completely different\r\n' +) +chunked_expected = b'hello world! and now for something completely different' +chunk_extension = ";foo=bar" +last_chunk = "0\r\n" +last_chunk_extended = "0" + chunk_extension + "\r\n" +trailers = "X-Dummy: foo\r\nX-Dumm2: bar\r\n" +chunked_end = "\r\n" + HOST = support.HOST class FakeSocket: @@ -38,7 +58,10 @@ class FakeSocket: def makefile(self, mode, bufsize=None): if mode != 'r' and mode != 'rb': raise client.UnimplementedFileMode() - return self.fileclass(self.text) + # keep the file around so we can check how much was read from it + self.file = self.fileclass(self.text) + self.file.close = lambda:None #nerf close () + return self.file def close(self): pass @@ -435,20 +458,8 @@ class BasicTest(TestCase): conn.request('POST', 'test', conn) def test_chunked(self): - chunked_start = ( - 'HTTP/1.1 200 OK\r\n' - 'Transfer-Encoding: chunked\r\n\r\n' - 'a\r\n' - 'hello worl\r\n' - '3\r\n' - 'd! \r\n' - '8\r\n' - 'and now \r\n' - '22\r\n' - 'for something completely different\r\n' - ) - expected = b'hello world! and now for something completely different' - sock = FakeSocket(chunked_start + '0\r\n') + expected = chunked_expected + sock = FakeSocket(chunked_start + last_chunk + chunked_end) resp = client.HTTPResponse(sock, method="GET") resp.begin() self.assertEqual(resp.read(), expected) @@ -456,7 +467,7 @@ class BasicTest(TestCase): # Various read sizes for n in range(1, 12): - sock = FakeSocket(chunked_start + '0\r\n') + sock = FakeSocket(chunked_start + last_chunk + chunked_end) resp = client.HTTPResponse(sock, method="GET") resp.begin() self.assertEqual(resp.read(n) + resp.read(n) + resp.read(), expected) @@ -479,23 +490,12 @@ class BasicTest(TestCase): resp.close() def test_readinto_chunked(self): - chunked_start = ( - 'HTTP/1.1 200 OK\r\n' - 'Transfer-Encoding: chunked\r\n\r\n' - 'a\r\n' - 'hello worl\r\n' - '3\r\n' - 'd! \r\n' - '8\r\n' - 'and now \r\n' - '22\r\n' - 'for something completely different\r\n' - ) - expected = b'hello world! and now for something completely different' + + expected = chunked_expected nexpected = len(expected) b = bytearray(128) - sock = FakeSocket(chunked_start + '0\r\n') + sock = FakeSocket(chunked_start + last_chunk + chunked_end) resp = client.HTTPResponse(sock, method="GET") resp.begin() n = resp.readinto(b) @@ -505,7 +505,7 @@ class BasicTest(TestCase): # Various read sizes for n in range(1, 12): - sock = FakeSocket(chunked_start + '0\r\n') + sock = FakeSocket(chunked_start + last_chunk + chunked_end) resp = client.HTTPResponse(sock, method="GET") resp.begin() m = memoryview(b) @@ -541,7 +541,7 @@ class BasicTest(TestCase): '1\r\n' 'd\r\n' ) - sock = FakeSocket(chunked_start + '0\r\n') + sock = FakeSocket(chunked_start + last_chunk + chunked_end) resp = client.HTTPResponse(sock, method="HEAD") resp.begin() self.assertEqual(resp.read(), b'') @@ -561,7 +561,7 @@ class BasicTest(TestCase): '1\r\n' 'd\r\n' ) - sock = FakeSocket(chunked_start + '0\r\n') + sock = FakeSocket(chunked_start + last_chunk + chunked_end) resp = client.HTTPResponse(sock, method="HEAD") resp.begin() b = bytearray(5) @@ -636,6 +636,7 @@ class BasicTest(TestCase): + '0' * 65536 + 'a\r\n' 'hello world\r\n' '0\r\n' + '\r\n' ) resp = client.HTTPResponse(FakeSocket(body)) resp.begin() @@ -675,6 +676,239 @@ class BasicTest(TestCase): conn.request('POST', '/', body) self.assertGreater(sock.sendall_calls, 1) + def test_chunked_extension(self): + extra = '3;foo=bar\r\n' + 'abc\r\n' + expected = chunked_expected + b'abc' + + sock = FakeSocket(chunked_start + extra + last_chunk_extended + chunked_end) + resp = client.HTTPResponse(sock, method="GET") + resp.begin() + self.assertEqual(resp.read(), expected) + resp.close() + + def test_chunked_missing_end(self): + """some servers may serve up a short chunked encoding stream""" + expected = chunked_expected + sock = FakeSocket(chunked_start + last_chunk) #no terminating crlf + resp = client.HTTPResponse(sock, method="GET") + resp.begin() + self.assertEqual(resp.read(), expected) + resp.close() + + def test_chunked_trailers(self): + """See that trailers are read and ignored""" + expected = chunked_expected + sock = FakeSocket(chunked_start + last_chunk + trailers + chunked_end) + resp = client.HTTPResponse(sock, method="GET") + resp.begin() + self.assertEqual(resp.read(), expected) + # we should have reached the end of the file + self.assertEqual(sock.file.read(100), b"") #we read to the end + resp.close() + + def test_chunked_sync(self): + """Check that we don't read past the end of the chunked-encoding stream""" + expected = chunked_expected + extradata = "extradata" + sock = FakeSocket(chunked_start + last_chunk + trailers + chunked_end + extradata) + resp = client.HTTPResponse(sock, method="GET") + resp.begin() + self.assertEqual(resp.read(), expected) + # the file should now have our extradata ready to be read + self.assertEqual(sock.file.read(100), extradata.encode("ascii")) #we read to the end + resp.close() + + def test_content_length_sync(self): + """Check that we don't read past the end of the Content-Length stream""" + extradata = "extradata" + expected = b"Hello123\r\n" + sock = FakeSocket('HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\nHello123\r\n' + extradata) + resp = client.HTTPResponse(sock, method="GET") + resp.begin() + self.assertEqual(resp.read(), expected) + # the file should now have our extradata ready to be read + self.assertEqual(sock.file.read(100), extradata.encode("ascii")) #we read to the end + resp.close() + +class ExtendedReadTest(TestCase): + """ + Test peek(), read1(), readline() + """ + lines = ( + 'HTTP/1.1 200 OK\r\n' + '\r\n' + 'hello world!\n' + 'and now \n' + 'for something completely different\n' + 'foo' + ) + lines_expected = lines[lines.find('hello'):].encode("ascii") + lines_chunked = ( + 'HTTP/1.1 200 OK\r\n' + 'Transfer-Encoding: chunked\r\n\r\n' + 'a\r\n' + 'hello worl\r\n' + '3\r\n' + 'd!\n\r\n' + '9\r\n' + 'and now \n\r\n' + '23\r\n' + 'for something completely different\n\r\n' + '3\r\n' + 'foo\r\n' + '0\r\n' # terminating chunk + '\r\n' # end of trailers + ) + + def setUp(self): + sock = FakeSocket(self.lines) + resp = client.HTTPResponse(sock, method="GET") + resp.begin() + resp.fp = io.BufferedReader(resp.fp) + self.resp = resp + + + + def test_peek(self): + resp = self.resp + # patch up the buffered peek so that it returns not too much stuff + oldpeek = resp.fp.peek + def mypeek(n=-1): + p = oldpeek(n) + if n >= 0: + return p[:n] + return p[:10] + resp.fp.peek = mypeek + + all = [] + while True: + # try a short peek + p = resp.peek(3) + if p: + self.assertGreater(len(p), 0) + # then unbounded peek + p2 = resp.peek() + self.assertGreaterEqual(len(p2), len(p)) + self.assertTrue(p2.startswith(p)) + next = resp.read(len(p2)) + self.assertEqual(next, p2) + else: + next = resp.read() + self.assertFalse(next) + all.append(next) + if not next: + break + self.assertEqual(b"".join(all), self.lines_expected) + + def test_readline(self): + resp = self.resp + self._verify_readline(self.resp.readline, self.lines_expected) + + def _verify_readline(self, readline, expected): + all = [] + while True: + # short readlines + line = readline(5) + if line and line != b"foo": + if len(line) < 5: + self.assertTrue(line.endswith(b"\n")) + all.append(line) + if not line: + break + self.assertEqual(b"".join(all), expected) + + def test_read1(self): + resp = self.resp + def r(): + res = resp.read1(4) + self.assertLessEqual(len(res), 4) + return res + readliner = Readliner(r) + self._verify_readline(readliner.readline, self.lines_expected) + + def test_read1_unbounded(self): + resp = self.resp + all = [] + while True: + data = resp.read1() + if not data: + break + all.append(data) + self.assertEqual(b"".join(all), self.lines_expected) + + def test_read1_bounded(self): + resp = self.resp + all = [] + while True: + data = resp.read1(10) + if not data: + break + self.assertLessEqual(len(data), 10) + all.append(data) + self.assertEqual(b"".join(all), self.lines_expected) + + def test_read1_0(self): + self.assertEqual(self.resp.read1(0), b"") + + def test_peek_0(self): + p = self.resp.peek(0) + self.assertLessEqual(0, len(p)) + +class ExtendedReadTestChunked(ExtendedReadTest): + """ + Test peek(), read1(), readline() in chunked mode + """ + lines = ( + 'HTTP/1.1 200 OK\r\n' + 'Transfer-Encoding: chunked\r\n\r\n' + 'a\r\n' + 'hello worl\r\n' + '3\r\n' + 'd!\n\r\n' + '9\r\n' + 'and now \n\r\n' + '23\r\n' + 'for something completely different\n\r\n' + '3\r\n' + 'foo\r\n' + '0\r\n' # terminating chunk + '\r\n' # end of trailers + ) + + +class Readliner: + """ + a simple readline class that uses an arbitrary read function and buffering + """ + def __init__(self, readfunc): + self.readfunc = readfunc + self.remainder = b"" + + def readline(self, limit): + data = [] + datalen = 0 + read = self.remainder + try: + while True: + idx = read.find(b'\n') + if idx != -1: + break + if datalen + len(read) >= limit: + idx = limit - datalen - 1 + # read more data + data.append(read) + read = self.readfunc() + if not read: + idx = 0 #eof condition + break + idx += 1 + data.append(read[:idx]) + self.remainder = read[idx:] + return b"".join(data) + except: + self.remainder = b"".join(data) + raise + class OfflineTest(TestCase): def test_responses(self): self.assertEqual(client.responses[client.NOT_FOUND], "Not Found") @@ -1019,7 +1253,8 @@ class TunnelTests(TestCase): def test_main(verbose=None): support.run_unittest(HeaderTests, OfflineTest, BasicTest, TimeoutTest, HTTPSTest, RequestBodyTest, SourceAddressTest, - HTTPResponseTest, TunnelTests) + HTTPResponseTest, ExtendedReadTest, + ExtendedReadTestChunked, TunnelTests) if __name__ == '__main__': test_main() diff --git a/Lib/test/test_importlib/builtin/test_finder.py b/Lib/test/test_importlib/builtin/test_finder.py index 934562feb6..a2e6e1edc3 100644 --- a/Lib/test/test_importlib/builtin/test_finder.py +++ b/Lib/test/test_importlib/builtin/test_finder.py @@ -1,21 +1,21 @@ from .. import abc from .. import util -from . import util as builtin_util -frozen_machinery, source_machinery = util.import_importlib('importlib.machinery') +machinery = util.import_importlib('importlib.machinery') import sys import unittest +@unittest.skipIf(util.BUILTINS.good_name is None, 'no reasonable builtin module') class FindSpecTests(abc.FinderTests): """Test find_spec() for built-in modules.""" def test_module(self): # Common case. - with util.uncache(builtin_util.NAME): - found = self.machinery.BuiltinImporter.find_spec(builtin_util.NAME) + with util.uncache(util.BUILTINS.good_name): + found = self.machinery.BuiltinImporter.find_spec(util.BUILTINS.good_name) self.assertTrue(found) self.assertEqual(found.origin, 'built-in') @@ -39,23 +39,26 @@ class FindSpecTests(abc.FinderTests): def test_ignore_path(self): # The value for 'path' should always trigger a failed import. - with util.uncache(builtin_util.NAME): - spec = self.machinery.BuiltinImporter.find_spec(builtin_util.NAME, + with util.uncache(util.BUILTINS.good_name): + spec = self.machinery.BuiltinImporter.find_spec(util.BUILTINS.good_name, ['pkg']) self.assertIsNone(spec) -Frozen_FindSpecTests, Source_FindSpecTests = util.test_both(FindSpecTests, - machinery=[frozen_machinery, source_machinery]) +(Frozen_FindSpecTests, + Source_FindSpecTests + ) = util.test_both(FindSpecTests, machinery=machinery) + +@unittest.skipIf(util.BUILTINS.good_name is None, 'no reasonable builtin module') class FinderTests(abc.FinderTests): """Test find_module() for built-in modules.""" def test_module(self): # Common case. - with util.uncache(builtin_util.NAME): - found = self.machinery.BuiltinImporter.find_module(builtin_util.NAME) + with util.uncache(util.BUILTINS.good_name): + found = self.machinery.BuiltinImporter.find_module(util.BUILTINS.good_name) self.assertTrue(found) self.assertTrue(hasattr(found, 'load_module')) @@ -72,13 +75,15 @@ class FinderTests(abc.FinderTests): def test_ignore_path(self): # The value for 'path' should always trigger a failed import. - with util.uncache(builtin_util.NAME): - loader = self.machinery.BuiltinImporter.find_module(builtin_util.NAME, + with util.uncache(util.BUILTINS.good_name): + loader = self.machinery.BuiltinImporter.find_module(util.BUILTINS.good_name, ['pkg']) self.assertIsNone(loader) -Frozen_FinderTests, Source_FinderTests = util.test_both(FinderTests, - machinery=[frozen_machinery, source_machinery]) + +(Frozen_FinderTests, + Source_FinderTests + ) = util.test_both(FinderTests, machinery=machinery) if __name__ == '__main__': diff --git a/Lib/test/test_importlib/builtin/test_loader.py b/Lib/test/test_importlib/builtin/test_loader.py index a636f778fa..eaee025483 100644 --- a/Lib/test/test_importlib/builtin/test_loader.py +++ b/Lib/test/test_importlib/builtin/test_loader.py @@ -1,14 +1,13 @@ from .. import abc from .. import util -from . import util as builtin_util -frozen_machinery, source_machinery = util.import_importlib('importlib.machinery') +machinery = util.import_importlib('importlib.machinery') import sys import types import unittest - +@unittest.skipIf(util.BUILTINS.good_name is None, 'no reasonable builtin module') class LoaderTests(abc.LoaderTests): """Test load_module() for built-in modules.""" @@ -29,8 +28,8 @@ class LoaderTests(abc.LoaderTests): def test_module(self): # Common case. - with util.uncache(builtin_util.NAME): - module = self.load_module(builtin_util.NAME) + with util.uncache(util.BUILTINS.good_name): + module = self.load_module(util.BUILTINS.good_name) self.verify(module) # Built-in modules cannot be a package. @@ -41,9 +40,9 @@ class LoaderTests(abc.LoaderTests): def test_module_reuse(self): # Test that the same module is used in a reload. - with util.uncache(builtin_util.NAME): - module1 = self.load_module(builtin_util.NAME) - module2 = self.load_module(builtin_util.NAME) + with util.uncache(util.BUILTINS.good_name): + module1 = self.load_module(util.BUILTINS.good_name) + module2 = self.load_module(util.BUILTINS.good_name) self.assertIs(module1, module2) def test_unloadable(self): @@ -66,40 +65,44 @@ class LoaderTests(abc.LoaderTests): self.assertEqual(cm.exception.name, module_name) -Frozen_LoaderTests, Source_LoaderTests = util.test_both(LoaderTests, - machinery=[frozen_machinery, source_machinery]) +(Frozen_LoaderTests, + Source_LoaderTests + ) = util.test_both(LoaderTests, machinery=machinery) +@unittest.skipIf(util.BUILTINS.good_name is None, 'no reasonable builtin module') class InspectLoaderTests: """Tests for InspectLoader methods for BuiltinImporter.""" def test_get_code(self): # There is no code object. - result = self.machinery.BuiltinImporter.get_code(builtin_util.NAME) + result = self.machinery.BuiltinImporter.get_code(util.BUILTINS.good_name) self.assertIsNone(result) def test_get_source(self): # There is no source. - result = self.machinery.BuiltinImporter.get_source(builtin_util.NAME) + result = self.machinery.BuiltinImporter.get_source(util.BUILTINS.good_name) self.assertIsNone(result) def test_is_package(self): # Cannot be a package. - result = self.machinery.BuiltinImporter.is_package(builtin_util.NAME) + result = self.machinery.BuiltinImporter.is_package(util.BUILTINS.good_name) self.assertTrue(not result) + @unittest.skipIf(util.BUILTINS.bad_name is None, 'all modules are built in') def test_not_builtin(self): # Modules not built-in should raise ImportError. for meth_name in ('get_code', 'get_source', 'is_package'): method = getattr(self.machinery.BuiltinImporter, meth_name) with self.assertRaises(ImportError) as cm: - method(builtin_util.BAD_NAME) - self.assertRaises(builtin_util.BAD_NAME) + method(util.BUILTINS.bad_name) + self.assertRaises(util.BUILTINS.bad_name) + -Frozen_InspectLoaderTests, Source_InspectLoaderTests = util.test_both( - InspectLoaderTests, - machinery=[frozen_machinery, source_machinery]) +(Frozen_InspectLoaderTests, + Source_InspectLoaderTests + ) = util.test_both(InspectLoaderTests, machinery=machinery) if __name__ == '__main__': diff --git a/Lib/test/test_importlib/builtin/util.py b/Lib/test/test_importlib/builtin/util.py deleted file mode 100644 index 5704699ee2..0000000000 --- a/Lib/test/test_importlib/builtin/util.py +++ /dev/null @@ -1,7 +0,0 @@ -import sys - -assert 'errno' in sys.builtin_module_names -NAME = 'errno' - -assert 'importlib' not in sys.builtin_module_names -BAD_NAME = 'importlib' diff --git a/Lib/test/test_importlib/extension/test_case_sensitivity.py b/Lib/test/test_importlib/extension/test_case_sensitivity.py index bb2528e626..c7d6ca61f8 100644 --- a/Lib/test/test_importlib/extension/test_case_sensitivity.py +++ b/Lib/test/test_importlib/extension/test_case_sensitivity.py @@ -4,22 +4,21 @@ from test import support import unittest from .. import util -from . import util as ext_util -frozen_machinery, source_machinery = util.import_importlib('importlib.machinery') +machinery = util.import_importlib('importlib.machinery') # XXX find_spec tests -@unittest.skipIf(ext_util.FILENAME is None, '_testcapi not available') +@unittest.skipIf(util.EXTENSIONS.filename is None, '_testcapi not available') @util.case_insensitive_tests class ExtensionModuleCaseSensitivityTest: def find_module(self): - good_name = ext_util.NAME + good_name = util.EXTENSIONS.name bad_name = good_name.upper() assert good_name != bad_name - finder = self.machinery.FileFinder(ext_util.PATH, + finder = self.machinery.FileFinder(util.EXTENSIONS.path, (self.machinery.ExtensionFileLoader, self.machinery.EXTENSION_SUFFIXES)) return finder.find_module(bad_name) @@ -42,9 +41,10 @@ class ExtensionModuleCaseSensitivityTest: loader = self.find_module() self.assertTrue(hasattr(loader, 'load_module')) -Frozen_ExtensionCaseSensitivity, Source_ExtensionCaseSensitivity = util.test_both( - ExtensionModuleCaseSensitivityTest, - machinery=[frozen_machinery, source_machinery]) + +(Frozen_ExtensionCaseSensitivity, + Source_ExtensionCaseSensitivity + ) = util.test_both(ExtensionModuleCaseSensitivityTest, machinery=machinery) if __name__ == '__main__': diff --git a/Lib/test/test_importlib/extension/test_finder.py b/Lib/test/test_importlib/extension/test_finder.py index 990f29c4e5..71bf67febd 100644 --- a/Lib/test/test_importlib/extension/test_finder.py +++ b/Lib/test/test_importlib/extension/test_finder.py @@ -1,8 +1,7 @@ from .. import abc -from .. import util as test_util -from . import util +from .. import util -machinery = test_util.import_importlib('importlib.machinery') +machinery = util.import_importlib('importlib.machinery') import unittest import warnings @@ -14,7 +13,7 @@ class FinderTests(abc.FinderTests): """Test the finder for extension modules.""" def find_module(self, fullname): - importer = self.machinery.FileFinder(util.PATH, + importer = self.machinery.FileFinder(util.EXTENSIONS.path, (self.machinery.ExtensionFileLoader, self.machinery.EXTENSION_SUFFIXES)) with warnings.catch_warnings(): @@ -22,7 +21,7 @@ class FinderTests(abc.FinderTests): return importer.find_module(fullname) def test_module(self): - self.assertTrue(self.find_module(util.NAME)) + self.assertTrue(self.find_module(util.EXTENSIONS.name)) # No extension module as an __init__ available for testing. test_package = test_package_in_package = None @@ -36,8 +35,10 @@ class FinderTests(abc.FinderTests): def test_failure(self): self.assertIsNone(self.find_module('asdfjkl;')) -Frozen_FinderTests, Source_FinderTests = test_util.test_both( - FinderTests, machinery=machinery) + +(Frozen_FinderTests, + Source_FinderTests + ) = util.test_both(FinderTests, machinery=machinery) if __name__ == '__main__': diff --git a/Lib/test/test_importlib/extension/test_loader.py b/Lib/test/test_importlib/extension/test_loader.py index fd9abf269b..aefd050ff0 100644 --- a/Lib/test/test_importlib/extension/test_loader.py +++ b/Lib/test/test_importlib/extension/test_loader.py @@ -1,4 +1,3 @@ -from . import util as ext_util from .. import abc from .. import util @@ -15,8 +14,8 @@ class LoaderTests(abc.LoaderTests): """Test load_module() for extension modules.""" def setUp(self): - self.loader = self.machinery.ExtensionFileLoader(ext_util.NAME, - ext_util.FILEPATH) + self.loader = self.machinery.ExtensionFileLoader(util.EXTENSIONS.name, + util.EXTENSIONS.file_path) def load_module(self, fullname): return self.loader.load_module(fullname) @@ -29,23 +28,23 @@ class LoaderTests(abc.LoaderTests): self.load_module('XXX') def test_equality(self): - other = self.machinery.ExtensionFileLoader(ext_util.NAME, - ext_util.FILEPATH) + other = self.machinery.ExtensionFileLoader(util.EXTENSIONS.name, + util.EXTENSIONS.file_path) self.assertEqual(self.loader, other) def test_inequality(self): - other = self.machinery.ExtensionFileLoader('_' + ext_util.NAME, - ext_util.FILEPATH) + other = self.machinery.ExtensionFileLoader('_' + util.EXTENSIONS.name, + util.EXTENSIONS.file_path) self.assertNotEqual(self.loader, other) def test_module(self): - with util.uncache(ext_util.NAME): - module = self.load_module(ext_util.NAME) - for attr, value in [('__name__', ext_util.NAME), - ('__file__', ext_util.FILEPATH), + with util.uncache(util.EXTENSIONS.name): + module = self.load_module(util.EXTENSIONS.name) + for attr, value in [('__name__', util.EXTENSIONS.name), + ('__file__', util.EXTENSIONS.file_path), ('__package__', '')]: self.assertEqual(getattr(module, attr), value) - self.assertIn(ext_util.NAME, sys.modules) + self.assertIn(util.EXTENSIONS.name, sys.modules) self.assertIsInstance(module.__loader__, self.machinery.ExtensionFileLoader) @@ -56,9 +55,9 @@ class LoaderTests(abc.LoaderTests): test_lacking_parent = None def test_module_reuse(self): - with util.uncache(ext_util.NAME): - module1 = self.load_module(ext_util.NAME) - module2 = self.load_module(ext_util.NAME) + with util.uncache(util.EXTENSIONS.name): + module1 = self.load_module(util.EXTENSIONS.name) + module2 = self.load_module(util.EXTENSIONS.name) self.assertIs(module1, module2) # No easy way to trigger a failure after a successful import. @@ -71,14 +70,15 @@ class LoaderTests(abc.LoaderTests): self.assertEqual(cm.exception.name, name) def test_is_package(self): - self.assertFalse(self.loader.is_package(ext_util.NAME)) + self.assertFalse(self.loader.is_package(util.EXTENSIONS.name)) for suffix in self.machinery.EXTENSION_SUFFIXES: path = os.path.join('some', 'path', 'pkg', '__init__' + suffix) loader = self.machinery.ExtensionFileLoader('pkg', path) self.assertTrue(loader.is_package('pkg')) -Frozen_LoaderTests, Source_LoaderTests = util.test_both( - LoaderTests, machinery=machinery) +(Frozen_LoaderTests, + Source_LoaderTests + ) = util.test_both(LoaderTests, machinery=machinery) diff --git a/Lib/test/test_importlib/extension/test_path_hook.py b/Lib/test/test_importlib/extension/test_path_hook.py index 49d6734711..8f4b8bb161 100644 --- a/Lib/test/test_importlib/extension/test_path_hook.py +++ b/Lib/test/test_importlib/extension/test_path_hook.py @@ -1,7 +1,6 @@ -from .. import util as test_util -from . import util +from .. import util -machinery = test_util.import_importlib('importlib.machinery') +machinery = util.import_importlib('importlib.machinery') import collections import sys @@ -22,10 +21,12 @@ class PathHookTests: def test_success(self): # Path hook should handle a directory where a known extension module # exists. - self.assertTrue(hasattr(self.hook(util.PATH), 'find_module')) + self.assertTrue(hasattr(self.hook(util.EXTENSIONS.path), 'find_module')) -Frozen_PathHooksTests, Source_PathHooksTests = test_util.test_both( - PathHookTests, machinery=machinery) + +(Frozen_PathHooksTests, + Source_PathHooksTests + ) = util.test_both(PathHookTests, machinery=machinery) if __name__ == '__main__': diff --git a/Lib/test/test_importlib/extension/util.py b/Lib/test/test_importlib/extension/util.py deleted file mode 100644 index 8d089f0d7c..0000000000 --- a/Lib/test/test_importlib/extension/util.py +++ /dev/null @@ -1,19 +0,0 @@ -from importlib import machinery -import os -import sys - -PATH = None -EXT = None -FILENAME = None -NAME = '_testcapi' -try: - for PATH in sys.path: - for EXT in machinery.EXTENSION_SUFFIXES: - FILENAME = NAME + EXT - FILEPATH = os.path.join(PATH, FILENAME) - if os.path.exists(os.path.join(PATH, FILENAME)): - raise StopIteration - else: - PATH = EXT = FILENAME = FILEPATH = None -except StopIteration: - pass diff --git a/Lib/test/test_importlib/frozen/test_finder.py b/Lib/test/test_importlib/frozen/test_finder.py index f9f97f3851..519aa027d6 100644 --- a/Lib/test/test_importlib/frozen/test_finder.py +++ b/Lib/test/test_importlib/frozen/test_finder.py @@ -37,8 +37,10 @@ class FindSpecTests(abc.FinderTests): spec = self.find('<not real>') self.assertIsNone(spec) -Frozen_FindSpecTests, Source_FindSpecTests = util.test_both(FindSpecTests, - machinery=machinery) + +(Frozen_FindSpecTests, + Source_FindSpecTests + ) = util.test_both(FindSpecTests, machinery=machinery) class FinderTests(abc.FinderTests): @@ -72,8 +74,10 @@ class FinderTests(abc.FinderTests): loader = self.find('<not real>') self.assertIsNone(loader) -Frozen_FinderTests, Source_FinderTests = util.test_both(FinderTests, - machinery=machinery) + +(Frozen_FinderTests, + Source_FinderTests + ) = util.test_both(FinderTests, machinery=machinery) if __name__ == '__main__': diff --git a/Lib/test/test_importlib/frozen/test_loader.py b/Lib/test/test_importlib/frozen/test_loader.py index 7c0146456d..603c7d7b19 100644 --- a/Lib/test/test_importlib/frozen/test_loader.py +++ b/Lib/test/test_importlib/frozen/test_loader.py @@ -85,8 +85,10 @@ class ExecModuleTests(abc.LoaderTests): self.exec_module('_not_real') self.assertEqual(cm.exception.name, '_not_real') -Frozen_ExecModuleTests, Source_ExecModuleTests = util.test_both(ExecModuleTests, - machinery=machinery) + +(Frozen_ExecModuleTests, + Source_ExecModuleTests + ) = util.test_both(ExecModuleTests, machinery=machinery) class LoaderTests(abc.LoaderTests): @@ -175,8 +177,10 @@ class LoaderTests(abc.LoaderTests): self.machinery.FrozenImporter.load_module('_not_real') self.assertEqual(cm.exception.name, '_not_real') -Frozen_LoaderTests, Source_LoaderTests = util.test_both(LoaderTests, - machinery=machinery) + +(Frozen_LoaderTests, + Source_LoaderTests + ) = util.test_both(LoaderTests, machinery=machinery) class InspectLoaderTests: @@ -214,8 +218,9 @@ class InspectLoaderTests: method('importlib') self.assertEqual(cm.exception.name, 'importlib') -Frozen_ILTests, Source_ILTests = util.test_both(InspectLoaderTests, - machinery=machinery) +(Frozen_ILTests, + Source_ILTests + ) = util.test_both(InspectLoaderTests, machinery=machinery) if __name__ == '__main__': diff --git a/Lib/test/test_importlib/import_/test___loader__.py b/Lib/test/test_importlib/import_/test___loader__.py index 6df80101fa..9998cd628d 100644 --- a/Lib/test/test_importlib/import_/test___loader__.py +++ b/Lib/test/test_importlib/import_/test___loader__.py @@ -4,7 +4,6 @@ import types import unittest from .. import util -from . import util as import_util class SpecLoaderMock: @@ -24,8 +23,10 @@ class SpecLoaderAttributeTests: module = self.__import__('blah') self.assertEqual(loader, module.__loader__) -Frozen_SpecTests, Source_SpecTests = util.test_both( - SpecLoaderAttributeTests, __import__=import_util.__import__) + +(Frozen_SpecTests, + Source_SpecTests + ) = util.test_both(SpecLoaderAttributeTests, __import__=util.__import__) class LoaderMock: @@ -62,8 +63,9 @@ class LoaderAttributeTests: self.assertEqual(loader, module.__loader__) -Frozen_Tests, Source_Tests = util.test_both(LoaderAttributeTests, - __import__=import_util.__import__) +(Frozen_Tests, + Source_Tests + ) = util.test_both(LoaderAttributeTests, __import__=util.__import__) if __name__ == '__main__': diff --git a/Lib/test/test_importlib/import_/test___package__.py b/Lib/test/test_importlib/import_/test___package__.py index 2e19725680..c7d3a2a204 100644 --- a/Lib/test/test_importlib/import_/test___package__.py +++ b/Lib/test/test_importlib/import_/test___package__.py @@ -6,7 +6,6 @@ of using the typical __path__/__name__ test). """ import unittest from .. import util -from . import util as import_util class Using__package__: @@ -70,17 +69,23 @@ class Using__package__: with self.assertRaises(TypeError): self.__import__('', globals, {}, ['relimport'], 1) + class Using__package__PEP302(Using__package__): mock_modules = util.mock_modules -Frozen_UsingPackagePEP302, Source_UsingPackagePEP302 = util.test_both( - Using__package__PEP302, __import__=import_util.__import__) -class Using__package__PEP302(Using__package__): +(Frozen_UsingPackagePEP302, + Source_UsingPackagePEP302 + ) = util.test_both(Using__package__PEP302, __import__=util.__import__) + + +class Using__package__PEP451(Using__package__): mock_modules = util.mock_spec -Frozen_UsingPackagePEP451, Source_UsingPackagePEP451 = util.test_both( - Using__package__PEP302, __import__=import_util.__import__) + +(Frozen_UsingPackagePEP451, + Source_UsingPackagePEP451 + ) = util.test_both(Using__package__PEP451, __import__=util.__import__) class Setting__package__: @@ -95,7 +100,7 @@ class Setting__package__: """ - __import__ = import_util.__import__[1] + __import__ = util.__import__['Source'] # [top-level] def test_top_level(self): diff --git a/Lib/test/test_importlib/import_/test_api.py b/Lib/test/test_importlib/import_/test_api.py index 439c105e9b..2c61b0121e 100644 --- a/Lib/test/test_importlib/import_/test_api.py +++ b/Lib/test/test_importlib/import_/test_api.py @@ -1,5 +1,4 @@ from .. import util -from . import util as import_util from importlib import machinery import sys @@ -79,15 +78,19 @@ class APITest: class OldAPITests(APITest): bad_finder_loader = BadLoaderFinder -Frozen_OldAPITests, Source_OldAPITests = util.test_both( - OldAPITests, __import__=import_util.__import__) + +(Frozen_OldAPITests, + Source_OldAPITests + ) = util.test_both(OldAPITests, __import__=util.__import__) class SpecAPITests(APITest): bad_finder_loader = BadSpecFinderLoader -Frozen_SpecAPITests, Source_SpecAPITests = util.test_both( - SpecAPITests, __import__=import_util.__import__) + +(Frozen_SpecAPITests, + Source_SpecAPITests + ) = util.test_both(SpecAPITests, __import__=util.__import__) if __name__ == '__main__': diff --git a/Lib/test/test_importlib/import_/test_caching.py b/Lib/test/test_importlib/import_/test_caching.py index c292ee4ac2..8079add5b2 100644 --- a/Lib/test/test_importlib/import_/test_caching.py +++ b/Lib/test/test_importlib/import_/test_caching.py @@ -1,6 +1,5 @@ """Test that sys.modules is used properly by import.""" from .. import util -from . import util as import_util import sys from types import MethodType import unittest @@ -39,15 +38,17 @@ class UseCache: self.__import__(name) self.assertEqual(cm.exception.name, name) -Frozen_UseCache, Source_UseCache = util.test_both( - UseCache, __import__=import_util.__import__) + +(Frozen_UseCache, + Source_UseCache + ) = util.test_both(UseCache, __import__=util.__import__) class ImportlibUseCache(UseCache, unittest.TestCase): # Pertinent only to PEP 302; exec_module() doesn't return a module. - __import__ = import_util.__import__[1] + __import__ = util.__import__['Source'] def create_mock(self, *names, return_=None): mock = util.mock_modules(*names) diff --git a/Lib/test/test_importlib/import_/test_fromlist.py b/Lib/test/test_importlib/import_/test_fromlist.py index 58f244bddc..899322647c 100644 --- a/Lib/test/test_importlib/import_/test_fromlist.py +++ b/Lib/test/test_importlib/import_/test_fromlist.py @@ -1,6 +1,5 @@ """Test that the semantics relating to the 'fromlist' argument are correct.""" from .. import util -from . import util as import_util import unittest @@ -29,8 +28,10 @@ class ReturnValue: module = self.__import__('pkg.module', fromlist=['attr']) self.assertEqual(module.__name__, 'pkg.module') -Frozen_ReturnValue, Source_ReturnValue = util.test_both( - ReturnValue, __import__=import_util.__import__) + +(Frozen_ReturnValue, + Source_ReturnValue + ) = util.test_both(ReturnValue, __import__=util.__import__) class HandlingFromlist: @@ -121,8 +122,10 @@ class HandlingFromlist: self.assertEqual(module.module1.__name__, 'pkg.module1') self.assertEqual(module.module2.__name__, 'pkg.module2') -Frozen_FromList, Source_FromList = util.test_both( - HandlingFromlist, __import__=import_util.__import__) + +(Frozen_FromList, + Source_FromList + ) = util.test_both(HandlingFromlist, __import__=util.__import__) if __name__ == '__main__': diff --git a/Lib/test/test_importlib/import_/test_meta_path.py b/Lib/test/test_importlib/import_/test_meta_path.py index dc45420621..47a603c10d 100644 --- a/Lib/test/test_importlib/import_/test_meta_path.py +++ b/Lib/test/test_importlib/import_/test_meta_path.py @@ -1,5 +1,4 @@ from .. import util -from . import util as import_util import importlib._bootstrap import sys from types import MethodType @@ -46,8 +45,10 @@ class CallingOrder: self.assertEqual(len(w), 1) self.assertTrue(issubclass(w[-1].category, ImportWarning)) -Frozen_CallingOrder, Source_CallingOrder = util.test_both( - CallingOrder, __import__=import_util.__import__) + +(Frozen_CallingOrder, + Source_CallingOrder + ) = util.test_both(CallingOrder, __import__=util.__import__) class CallSignature: @@ -100,19 +101,25 @@ class CallSignature: self.assertEqual(args[0], mod_name) self.assertIs(args[1], path) + class CallSignaturePEP302(CallSignature): mock_modules = util.mock_modules finder_name = 'find_module' -Frozen_CallSignaturePEP302, Source_CallSignaturePEP302 = util.test_both( - CallSignaturePEP302, __import__=import_util.__import__) + +(Frozen_CallSignaturePEP302, + Source_CallSignaturePEP302 + ) = util.test_both(CallSignaturePEP302, __import__=util.__import__) + class CallSignaturePEP451(CallSignature): mock_modules = util.mock_spec finder_name = 'find_spec' -Frozen_CallSignaturePEP451, Source_CallSignaturePEP451 = util.test_both( - CallSignaturePEP451, __import__=import_util.__import__) + +(Frozen_CallSignaturePEP451, + Source_CallSignaturePEP451 + ) = util.test_both(CallSignaturePEP451, __import__=util.__import__) if __name__ == '__main__': diff --git a/Lib/test/test_importlib/import_/test_packages.py b/Lib/test/test_importlib/import_/test_packages.py index 55a5d14985..3755b84a1a 100644 --- a/Lib/test/test_importlib/import_/test_packages.py +++ b/Lib/test/test_importlib/import_/test_packages.py @@ -1,5 +1,4 @@ from .. import util -from . import util as import_util import sys import unittest import importlib @@ -102,8 +101,10 @@ class ParentModuleTests: finally: support.unload(subname) -Frozen_ParentTests, Source_ParentTests = util.test_both( - ParentModuleTests, __import__=import_util.__import__) + +(Frozen_ParentTests, + Source_ParentTests + ) = util.test_both(ParentModuleTests, __import__=util.__import__) if __name__ == '__main__': diff --git a/Lib/test/test_importlib/import_/test_path.py b/Lib/test/test_importlib/import_/test_path.py index 1274f8cb9e..e86c655852 100644 --- a/Lib/test/test_importlib/import_/test_path.py +++ b/Lib/test/test_importlib/import_/test_path.py @@ -1,5 +1,4 @@ from .. import util -from . import util as import_util importlib = util.import_importlib('importlib') machinery = util.import_importlib('importlib.machinery') @@ -58,7 +57,7 @@ class FinderTests: module = '<test module>' path = '<test path>' importer = util.mock_spec(module) - hook = import_util.mock_path_hook(path, importer=importer) + hook = util.mock_path_hook(path, importer=importer) with util.import_state(path_hooks=[hook]): loader = self.machinery.PathFinder.find_module(module, [path]) self.assertIs(loader, importer) @@ -83,7 +82,7 @@ class FinderTests: path = '' module = '<test module>' importer = util.mock_spec(module) - hook = import_util.mock_path_hook(os.getcwd(), importer=importer) + hook = util.mock_path_hook(os.getcwd(), importer=importer) with util.import_state(path=[path], path_hooks=[hook]): loader = self.machinery.PathFinder.find_module(module) self.assertIs(loader, importer) @@ -112,8 +111,57 @@ class FinderTests: if email is not missing: sys.modules['email'] = email -Frozen_FinderTests, Source_FinderTests = util.test_both( - FinderTests, importlib=importlib, machinery=machinery) + def test_finder_with_find_module(self): + class TestFinder: + def find_module(self, fullname): + return self.to_return + failing_finder = TestFinder() + failing_finder.to_return = None + path = 'testing path' + with util.import_state(path_importer_cache={path: failing_finder}): + self.assertIsNone( + self.machinery.PathFinder.find_spec('whatever', [path])) + success_finder = TestFinder() + success_finder.to_return = __loader__ + with util.import_state(path_importer_cache={path: success_finder}): + spec = self.machinery.PathFinder.find_spec('whatever', [path]) + self.assertEqual(spec.loader, __loader__) + + def test_finder_with_find_loader(self): + class TestFinder: + loader = None + portions = [] + def find_loader(self, fullname): + return self.loader, self.portions + path = 'testing path' + with util.import_state(path_importer_cache={path: TestFinder()}): + self.assertIsNone( + self.machinery.PathFinder.find_spec('whatever', [path])) + success_finder = TestFinder() + success_finder.loader = __loader__ + with util.import_state(path_importer_cache={path: success_finder}): + spec = self.machinery.PathFinder.find_spec('whatever', [path]) + self.assertEqual(spec.loader, __loader__) + + def test_finder_with_find_spec(self): + class TestFinder: + spec = None + def find_spec(self, fullname, target=None): + return self.spec + path = 'testing path' + with util.import_state(path_importer_cache={path: TestFinder()}): + self.assertIsNone( + self.machinery.PathFinder.find_spec('whatever', [path])) + success_finder = TestFinder() + success_finder.spec = self.machinery.ModuleSpec('whatever', __loader__) + with util.import_state(path_importer_cache={path: success_finder}): + got = self.machinery.PathFinder.find_spec('whatever', [path]) + self.assertEqual(got, success_finder.spec) + + +(Frozen_FinderTests, + Source_FinderTests + ) = util.test_both(FinderTests, importlib=importlib, machinery=machinery) class PathEntryFinderTests: @@ -136,8 +184,10 @@ class PathEntryFinderTests: path_hooks=[Finder]): self.machinery.PathFinder.find_spec('importlib') -Frozen_PEFTests, Source_PEFTests = util.test_both( - PathEntryFinderTests, machinery=machinery) + +(Frozen_PEFTests, + Source_PEFTests + ) = util.test_both(PathEntryFinderTests, machinery=machinery) if __name__ == '__main__': diff --git a/Lib/test/test_importlib/import_/test_relative_imports.py b/Lib/test/test_importlib/import_/test_relative_imports.py index b216e9cc13..28bb6f7c0c 100644 --- a/Lib/test/test_importlib/import_/test_relative_imports.py +++ b/Lib/test/test_importlib/import_/test_relative_imports.py @@ -1,6 +1,5 @@ """Test relative imports (PEP 328).""" from .. import util -from . import util as import_util import sys import unittest @@ -208,8 +207,10 @@ class RelativeImports: with self.assertRaises(KeyError): self.__import__('sys', level=1) -Frozen_RelativeImports, Source_RelativeImports = util.test_both( - RelativeImports, __import__=import_util.__import__) + +(Frozen_RelativeImports, + Source_RelativeImports + ) = util.test_both(RelativeImports, __import__=util.__import__) if __name__ == '__main__': diff --git a/Lib/test/test_importlib/import_/util.py b/Lib/test/test_importlib/import_/util.py deleted file mode 100644 index dcb490f967..0000000000 --- a/Lib/test/test_importlib/import_/util.py +++ /dev/null @@ -1,20 +0,0 @@ -from .. import util - -frozen_importlib, source_importlib = util.import_importlib('importlib') - -import builtins -import functools -import importlib -import unittest - - -__import__ = staticmethod(builtins.__import__), staticmethod(source_importlib.__import__) - - -def mock_path_hook(*entries, importer): - """A mock sys.path_hooks entry.""" - def hook(entry): - if entry not in entries: - raise ImportError - return importer - return hook diff --git a/Lib/test/test_importlib/source/test_case_sensitivity.py b/Lib/test/test_importlib/source/test_case_sensitivity.py index efd3146d90..29e95b2bbc 100644 --- a/Lib/test/test_importlib/source/test_case_sensitivity.py +++ b/Lib/test/test_importlib/source/test_case_sensitivity.py @@ -1,6 +1,5 @@ """Test case-sensitivity (PEP 235).""" from .. import util -from . import util as source_util importlib = util.import_importlib('importlib') machinery = util.import_importlib('importlib.machinery') @@ -32,7 +31,7 @@ class CaseSensitivityTest: """Look for a module with matching and non-matching sensitivity.""" sensitive_pkg = 'sensitive.{0}'.format(self.name) insensitive_pkg = 'insensitive.{0}'.format(self.name.lower()) - context = source_util.create_modules(insensitive_pkg, sensitive_pkg) + context = util.create_modules(insensitive_pkg, sensitive_pkg) with context as mapping: sensitive_path = os.path.join(mapping['.root'], 'sensitive') insensitive_path = os.path.join(mapping['.root'], 'insensitive') @@ -63,20 +62,28 @@ class CaseSensitivityTest: self.assertIsNotNone(insensitive) self.assertIn(self.name, insensitive.get_filename(self.name)) + class CaseSensitivityTestPEP302(CaseSensitivityTest): def find(self, finder): return finder.find_module(self.name) -Frozen_CaseSensitivityTestPEP302, Source_CaseSensitivityTestPEP302 = util.test_both( - CaseSensitivityTestPEP302, importlib=importlib, machinery=machinery) + +(Frozen_CaseSensitivityTestPEP302, + Source_CaseSensitivityTestPEP302 + ) = util.test_both(CaseSensitivityTestPEP302, importlib=importlib, + machinery=machinery) + class CaseSensitivityTestPEP451(CaseSensitivityTest): def find(self, finder): found = finder.find_spec(self.name) return found.loader if found is not None else found -Frozen_CaseSensitivityTestPEP451, Source_CaseSensitivityTestPEP451 = util.test_both( - CaseSensitivityTestPEP451, importlib=importlib, machinery=machinery) + +(Frozen_CaseSensitivityTestPEP451, + Source_CaseSensitivityTestPEP451 + ) = util.test_both(CaseSensitivityTestPEP451, importlib=importlib, + machinery=machinery) if __name__ == '__main__': diff --git a/Lib/test/test_importlib/source/test_file_loader.py b/Lib/test/test_importlib/source/test_file_loader.py index 2d415f985d..73f4c62070 100644 --- a/Lib/test/test_importlib/source/test_file_loader.py +++ b/Lib/test/test_importlib/source/test_file_loader.py @@ -1,6 +1,5 @@ from .. import abc from .. import util -from . import util as source_util importlib = util.import_importlib('importlib') importlib_abc = util.import_importlib('importlib.abc') @@ -71,7 +70,7 @@ class SimpleTest(abc.LoaderTests): # [basic] def test_module(self): - with source_util.create_modules('_temp') as mapping: + with util.create_modules('_temp') as mapping: loader = self.machinery.SourceFileLoader('_temp', mapping['_temp']) with warnings.catch_warnings(): warnings.simplefilter('ignore', DeprecationWarning) @@ -83,7 +82,7 @@ class SimpleTest(abc.LoaderTests): self.assertEqual(getattr(module, attr), value) def test_package(self): - with source_util.create_modules('_pkg.__init__') as mapping: + with util.create_modules('_pkg.__init__') as mapping: loader = self.machinery.SourceFileLoader('_pkg', mapping['_pkg.__init__']) with warnings.catch_warnings(): @@ -98,7 +97,7 @@ class SimpleTest(abc.LoaderTests): def test_lacking_parent(self): - with source_util.create_modules('_pkg.__init__', '_pkg.mod')as mapping: + with util.create_modules('_pkg.__init__', '_pkg.mod')as mapping: loader = self.machinery.SourceFileLoader('_pkg.mod', mapping['_pkg.mod']) with warnings.catch_warnings(): @@ -115,7 +114,7 @@ class SimpleTest(abc.LoaderTests): return lambda name: fxn(name) + 1 def test_module_reuse(self): - with source_util.create_modules('_temp') as mapping: + with util.create_modules('_temp') as mapping: loader = self.machinery.SourceFileLoader('_temp', mapping['_temp']) with warnings.catch_warnings(): warnings.simplefilter('ignore', DeprecationWarning) @@ -139,7 +138,7 @@ class SimpleTest(abc.LoaderTests): attributes = ('__file__', '__path__', '__package__') value = '<test>' name = '_temp' - with source_util.create_modules(name) as mapping: + with util.create_modules(name) as mapping: orig_module = types.ModuleType(name) for attr in attributes: setattr(orig_module, attr, value) @@ -159,7 +158,7 @@ class SimpleTest(abc.LoaderTests): # [syntax error] def test_bad_syntax(self): - with source_util.create_modules('_temp') as mapping: + with util.create_modules('_temp') as mapping: with open(mapping['_temp'], 'w') as file: file.write('=') loader = self.machinery.SourceFileLoader('_temp', mapping['_temp']) @@ -190,11 +189,11 @@ class SimpleTest(abc.LoaderTests): if os.path.exists(pycache): shutil.rmtree(pycache) - @source_util.writes_bytecode_files + @util.writes_bytecode_files def test_timestamp_overflow(self): # When a modification timestamp is larger than 2**32, it should be # truncated rather than raise an OverflowError. - with source_util.create_modules('_temp') as mapping: + with util.create_modules('_temp') as mapping: source = mapping['_temp'] compiled = self.util.cache_from_source(source) with open(source, 'w') as f: @@ -236,9 +235,11 @@ class SimpleTest(abc.LoaderTests): warnings.simplefilter('ignore', DeprecationWarning) loader.load_module('bad name') -Frozen_SimpleTest, Source_SimpleTest = util.test_both( - SimpleTest, importlib=importlib, machinery=machinery, abc=importlib_abc, - util=importlib_util) + +(Frozen_SimpleTest, + Source_SimpleTest + ) = util.test_both(SimpleTest, importlib=importlib, machinery=machinery, + abc=importlib_abc, util=importlib_util) class BadBytecodeTest: @@ -275,45 +276,45 @@ class BadBytecodeTest: return bytecode_path def _test_empty_file(self, test, *, del_source=False): - with source_util.create_modules('_temp') as mapping: + with util.create_modules('_temp') as mapping: bc_path = self.manipulate_bytecode('_temp', mapping, lambda bc: b'', del_source=del_source) test('_temp', mapping, bc_path) - @source_util.writes_bytecode_files + @util.writes_bytecode_files def _test_partial_magic(self, test, *, del_source=False): # When their are less than 4 bytes to a .pyc, regenerate it if # possible, else raise ImportError. - with source_util.create_modules('_temp') as mapping: + with util.create_modules('_temp') as mapping: bc_path = self.manipulate_bytecode('_temp', mapping, lambda bc: bc[:3], del_source=del_source) test('_temp', mapping, bc_path) def _test_magic_only(self, test, *, del_source=False): - with source_util.create_modules('_temp') as mapping: + with util.create_modules('_temp') as mapping: bc_path = self.manipulate_bytecode('_temp', mapping, lambda bc: bc[:4], del_source=del_source) test('_temp', mapping, bc_path) def _test_partial_timestamp(self, test, *, del_source=False): - with source_util.create_modules('_temp') as mapping: + with util.create_modules('_temp') as mapping: bc_path = self.manipulate_bytecode('_temp', mapping, lambda bc: bc[:7], del_source=del_source) test('_temp', mapping, bc_path) def _test_partial_size(self, test, *, del_source=False): - with source_util.create_modules('_temp') as mapping: + with util.create_modules('_temp') as mapping: bc_path = self.manipulate_bytecode('_temp', mapping, lambda bc: bc[:11], del_source=del_source) test('_temp', mapping, bc_path) def _test_no_marshal(self, *, del_source=False): - with source_util.create_modules('_temp') as mapping: + with util.create_modules('_temp') as mapping: bc_path = self.manipulate_bytecode('_temp', mapping, lambda bc: bc[:12], del_source=del_source) @@ -322,7 +323,7 @@ class BadBytecodeTest: self.import_(file_path, '_temp') def _test_non_code_marshal(self, *, del_source=False): - with source_util.create_modules('_temp') as mapping: + with util.create_modules('_temp') as mapping: bytecode_path = self.manipulate_bytecode('_temp', mapping, lambda bc: bc[:12] + marshal.dumps(b'abcd'), del_source=del_source) @@ -333,7 +334,7 @@ class BadBytecodeTest: self.assertEqual(cm.exception.path, bytecode_path) def _test_bad_marshal(self, *, del_source=False): - with source_util.create_modules('_temp') as mapping: + with util.create_modules('_temp') as mapping: bytecode_path = self.manipulate_bytecode('_temp', mapping, lambda bc: bc[:12] + b'<test>', del_source=del_source) @@ -342,11 +343,12 @@ class BadBytecodeTest: self.import_(file_path, '_temp') def _test_bad_magic(self, test, *, del_source=False): - with source_util.create_modules('_temp') as mapping: + with util.create_modules('_temp') as mapping: bc_path = self.manipulate_bytecode('_temp', mapping, lambda bc: b'\x00\x00\x00\x00' + bc[4:]) test('_temp', mapping, bc_path) + class BadBytecodeTestPEP451(BadBytecodeTest): def import_(self, file, module_name): @@ -355,6 +357,7 @@ class BadBytecodeTestPEP451(BadBytecodeTest): module.__spec__ = self.util.spec_from_loader(module_name, loader) loader.exec_module(module) + class BadBytecodeTestPEP302(BadBytecodeTest): def import_(self, file, module_name): @@ -371,7 +374,7 @@ class SourceLoaderBadBytecodeTest: def setUpClass(cls): cls.loader = cls.machinery.SourceFileLoader - @source_util.writes_bytecode_files + @util.writes_bytecode_files def test_empty_file(self): # When a .pyc is empty, regenerate it if possible, else raise # ImportError. @@ -390,7 +393,7 @@ class SourceLoaderBadBytecodeTest: self._test_partial_magic(test) - @source_util.writes_bytecode_files + @util.writes_bytecode_files def test_magic_only(self): # When there is only the magic number, regenerate the .pyc if possible, # else raise EOFError. @@ -401,7 +404,7 @@ class SourceLoaderBadBytecodeTest: self._test_magic_only(test) - @source_util.writes_bytecode_files + @util.writes_bytecode_files def test_bad_magic(self): # When the magic number is different, the bytecode should be # regenerated. @@ -413,7 +416,7 @@ class SourceLoaderBadBytecodeTest: self._test_bad_magic(test) - @source_util.writes_bytecode_files + @util.writes_bytecode_files def test_partial_timestamp(self): # When the timestamp is partial, regenerate the .pyc, else # raise EOFError. @@ -424,7 +427,7 @@ class SourceLoaderBadBytecodeTest: self._test_partial_timestamp(test) - @source_util.writes_bytecode_files + @util.writes_bytecode_files def test_partial_size(self): # When the size is partial, regenerate the .pyc, else # raise EOFError. @@ -435,29 +438,29 @@ class SourceLoaderBadBytecodeTest: self._test_partial_size(test) - @source_util.writes_bytecode_files + @util.writes_bytecode_files def test_no_marshal(self): # When there is only the magic number and timestamp, raise EOFError. self._test_no_marshal() - @source_util.writes_bytecode_files + @util.writes_bytecode_files def test_non_code_marshal(self): self._test_non_code_marshal() # XXX ImportError when sourceless # [bad marshal] - @source_util.writes_bytecode_files + @util.writes_bytecode_files def test_bad_marshal(self): # Bad marshal data should raise a ValueError. self._test_bad_marshal() # [bad timestamp] - @source_util.writes_bytecode_files + @util.writes_bytecode_files def test_old_timestamp(self): # When the timestamp is older than the source, bytecode should be # regenerated. zeros = b'\x00\x00\x00\x00' - with source_util.create_modules('_temp') as mapping: + with util.create_modules('_temp') as mapping: py_compile.compile(mapping['_temp']) bytecode_path = self.util.cache_from_source(mapping['_temp']) with open(bytecode_path, 'r+b') as bytecode_file: @@ -471,10 +474,10 @@ class SourceLoaderBadBytecodeTest: self.assertEqual(bytecode_file.read(4), source_timestamp) # [bytecode read-only] - @source_util.writes_bytecode_files + @util.writes_bytecode_files def test_read_only_bytecode(self): # When bytecode is read-only but should be rewritten, fail silently. - with source_util.create_modules('_temp') as mapping: + with util.create_modules('_temp') as mapping: # Create bytecode that will need to be re-created. py_compile.compile(mapping['_temp']) bytecode_path = self.util.cache_from_source(mapping['_temp']) @@ -491,21 +494,29 @@ class SourceLoaderBadBytecodeTest: # Make writable for eventual clean-up. os.chmod(bytecode_path, stat.S_IWUSR) + class SourceLoaderBadBytecodeTestPEP451( SourceLoaderBadBytecodeTest, BadBytecodeTestPEP451): pass -Frozen_SourceBadBytecodePEP451, Source_SourceBadBytecodePEP451 = util.test_both( - SourceLoaderBadBytecodeTestPEP451, importlib=importlib, machinery=machinery, - abc=importlib_abc, util=importlib_util) + +(Frozen_SourceBadBytecodePEP451, + Source_SourceBadBytecodePEP451 + ) = util.test_both(SourceLoaderBadBytecodeTestPEP451, importlib=importlib, + machinery=machinery, abc=importlib_abc, + util=importlib_util) + class SourceLoaderBadBytecodeTestPEP302( SourceLoaderBadBytecodeTest, BadBytecodeTestPEP302): pass -Frozen_SourceBadBytecodePEP302, Source_SourceBadBytecodePEP302 = util.test_both( - SourceLoaderBadBytecodeTestPEP302, importlib=importlib, machinery=machinery, - abc=importlib_abc, util=importlib_util) + +(Frozen_SourceBadBytecodePEP302, + Source_SourceBadBytecodePEP302 + ) = util.test_both(SourceLoaderBadBytecodeTestPEP302, importlib=importlib, + machinery=machinery, abc=importlib_abc, + util=importlib_util) class SourcelessLoaderBadBytecodeTest: @@ -567,21 +578,29 @@ class SourcelessLoaderBadBytecodeTest: def test_non_code_marshal(self): self._test_non_code_marshal(del_source=True) + class SourcelessLoaderBadBytecodeTestPEP451(SourcelessLoaderBadBytecodeTest, BadBytecodeTestPEP451): pass -Frozen_SourcelessBadBytecodePEP451, Source_SourcelessBadBytecodePEP451 = util.test_both( - SourcelessLoaderBadBytecodeTestPEP451, importlib=importlib, - machinery=machinery, abc=importlib_abc, util=importlib_util) + +(Frozen_SourcelessBadBytecodePEP451, + Source_SourcelessBadBytecodePEP451 + ) = util.test_both(SourcelessLoaderBadBytecodeTestPEP451, importlib=importlib, + machinery=machinery, abc=importlib_abc, + util=importlib_util) + class SourcelessLoaderBadBytecodeTestPEP302(SourcelessLoaderBadBytecodeTest, BadBytecodeTestPEP302): pass -Frozen_SourcelessBadBytecodePEP302, Source_SourcelessBadBytecodePEP302 = util.test_both( - SourcelessLoaderBadBytecodeTestPEP302, importlib=importlib, - machinery=machinery, abc=importlib_abc, util=importlib_util) + +(Frozen_SourcelessBadBytecodePEP302, + Source_SourcelessBadBytecodePEP302 + ) = util.test_both(SourcelessLoaderBadBytecodeTestPEP302, importlib=importlib, + machinery=machinery, abc=importlib_abc, + util=importlib_util) if __name__ == '__main__': diff --git a/Lib/test/test_importlib/source/test_finder.py b/Lib/test/test_importlib/source/test_finder.py index 473297bdc8..f372b850dc 100644 --- a/Lib/test/test_importlib/source/test_finder.py +++ b/Lib/test/test_importlib/source/test_finder.py @@ -1,6 +1,5 @@ from .. import abc from .. import util -from . import util as source_util machinery = util.import_importlib('importlib.machinery') @@ -60,7 +59,7 @@ class FinderTests(abc.FinderTests): """ if create is None: create = {test} - with source_util.create_modules(*create) as mapping: + with util.create_modules(*create) as mapping: if compile_: for name in compile_: py_compile.compile(mapping[name]) @@ -100,14 +99,14 @@ class FinderTests(abc.FinderTests): # [sub module] def test_module_in_package(self): - with source_util.create_modules('pkg.__init__', 'pkg.sub') as mapping: + with util.create_modules('pkg.__init__', 'pkg.sub') as mapping: pkg_dir = os.path.dirname(mapping['pkg.__init__']) loader = self.import_(pkg_dir, 'pkg.sub') self.assertTrue(hasattr(loader, 'load_module')) # [sub package] def test_package_in_package(self): - context = source_util.create_modules('pkg.__init__', 'pkg.sub.__init__') + context = util.create_modules('pkg.__init__', 'pkg.sub.__init__') with context as mapping: pkg_dir = os.path.dirname(mapping['pkg.__init__']) loader = self.import_(pkg_dir, 'pkg.sub') @@ -120,7 +119,7 @@ class FinderTests(abc.FinderTests): self.assertIn('__init__', loader.get_filename(name)) def test_failure(self): - with source_util.create_modules('blah') as mapping: + with util.create_modules('blah') as mapping: nothing = self.import_(mapping['.root'], 'sdfsadsadf') self.assertIsNone(nothing) @@ -147,7 +146,7 @@ class FinderTests(abc.FinderTests): # Regression test for http://bugs.python.org/issue14846 def test_dir_removal_handling(self): mod = 'mod' - with source_util.create_modules(mod) as mapping: + with util.create_modules(mod) as mapping: finder = self.get_finder(mapping['.root']) found = self._find(finder, 'mod', loader_only=True) self.assertIsNotNone(found) @@ -196,8 +195,10 @@ class FinderTestsPEP451(FinderTests): spec = finder.find_spec(name) return spec.loader if spec is not None else spec -Frozen_FinderTestsPEP451, Source_FinderTestsPEP451 = util.test_both( - FinderTestsPEP451, machinery=machinery) + +(Frozen_FinderTestsPEP451, + Source_FinderTestsPEP451 + ) = util.test_both(FinderTestsPEP451, machinery=machinery) class FinderTestsPEP420(FinderTests): @@ -210,8 +211,10 @@ class FinderTestsPEP420(FinderTests): loader_portions = finder.find_loader(name) return loader_portions[0] if loader_only else loader_portions -Frozen_FinderTestsPEP420, Source_FinderTestsPEP420 = util.test_both( - FinderTestsPEP420, machinery=machinery) + +(Frozen_FinderTestsPEP420, + Source_FinderTestsPEP420 + ) = util.test_both(FinderTestsPEP420, machinery=machinery) class FinderTestsPEP302(FinderTests): @@ -223,9 +226,10 @@ class FinderTestsPEP302(FinderTests): warnings.simplefilter("ignore", DeprecationWarning) return finder.find_module(name) -Frozen_FinderTestsPEP302, Source_FinderTestsPEP302 = util.test_both( - FinderTestsPEP302, machinery=machinery) +(Frozen_FinderTestsPEP302, + Source_FinderTestsPEP302 + ) = util.test_both(FinderTestsPEP302, machinery=machinery) if __name__ == '__main__': diff --git a/Lib/test/test_importlib/source/test_path_hook.py b/Lib/test/test_importlib/source/test_path_hook.py index 92da77265b..e6a2415bda 100644 --- a/Lib/test/test_importlib/source/test_path_hook.py +++ b/Lib/test/test_importlib/source/test_path_hook.py @@ -1,5 +1,4 @@ from .. import util -from . import util as source_util machinery = util.import_importlib('importlib.machinery') @@ -15,7 +14,7 @@ class PathHookTest: self.machinery.SOURCE_SUFFIXES)) def test_success(self): - with source_util.create_modules('dummy') as mapping: + with util.create_modules('dummy') as mapping: self.assertTrue(hasattr(self.path_hook()(mapping['.root']), 'find_module')) @@ -23,7 +22,10 @@ class PathHookTest: # The empty string represents the cwd. self.assertTrue(hasattr(self.path_hook()(''), 'find_module')) -Frozen_PathHookTest, Source_PathHooktest = util.test_both(PathHookTest, machinery=machinery) + +(Frozen_PathHookTest, + Source_PathHooktest + ) = util.test_both(PathHookTest, machinery=machinery) if __name__ == '__main__': diff --git a/Lib/test/test_importlib/source/test_source_encoding.py b/Lib/test/test_importlib/source/test_source_encoding.py index c62dfa108d..b604afb5ec 100644 --- a/Lib/test/test_importlib/source/test_source_encoding.py +++ b/Lib/test/test_importlib/source/test_source_encoding.py @@ -1,5 +1,4 @@ from .. import util -from . import util as source_util machinery = util.import_importlib('importlib.machinery') @@ -37,7 +36,7 @@ class EncodingTest: module_name = '_temp' def run_test(self, source): - with source_util.create_modules(self.module_name) as mapping: + with util.create_modules(self.module_name) as mapping: with open(mapping[self.module_name], 'wb') as file: file.write(source) loader = self.machinery.SourceFileLoader(self.module_name, @@ -89,6 +88,7 @@ class EncodingTest: with self.assertRaises(SyntaxError): self.run_test(source) + class EncodingTestPEP451(EncodingTest): def load(self, loader): @@ -97,8 +97,11 @@ class EncodingTestPEP451(EncodingTest): loader.exec_module(module) return module -Frozen_EncodingTestPEP451, Source_EncodingTestPEP451 = util.test_both( - EncodingTestPEP451, machinery=machinery) + +(Frozen_EncodingTestPEP451, + Source_EncodingTestPEP451 + ) = util.test_both(EncodingTestPEP451, machinery=machinery) + class EncodingTestPEP302(EncodingTest): @@ -107,8 +110,10 @@ class EncodingTestPEP302(EncodingTest): warnings.simplefilter('ignore', DeprecationWarning) return loader.load_module(self.module_name) -Frozen_EncodingTestPEP302, Source_EncodingTestPEP302 = util.test_both( - EncodingTestPEP302, machinery=machinery) + +(Frozen_EncodingTestPEP302, + Source_EncodingTestPEP302 + ) = util.test_both(EncodingTestPEP302, machinery=machinery) class LineEndingTest: @@ -120,7 +125,7 @@ class LineEndingTest: module_name = '_temp' source_lines = [b"a = 42", b"b = -13", b''] source = line_ending.join(source_lines) - with source_util.create_modules(module_name) as mapping: + with util.create_modules(module_name) as mapping: with open(mapping[module_name], 'wb') as file: file.write(source) loader = self.machinery.SourceFileLoader(module_name, @@ -139,6 +144,7 @@ class LineEndingTest: def test_lf(self): self.run_test(b'\n') + class LineEndingTestPEP451(LineEndingTest): def load(self, loader, module_name): @@ -147,8 +153,11 @@ class LineEndingTestPEP451(LineEndingTest): loader.exec_module(module) return module -Frozen_LineEndingTestPEP451, Source_LineEndingTestPEP451 = util.test_both( - LineEndingTestPEP451, machinery=machinery) + +(Frozen_LineEndingTestPEP451, + Source_LineEndingTestPEP451 + ) = util.test_both(LineEndingTestPEP451, machinery=machinery) + class LineEndingTestPEP302(LineEndingTest): @@ -157,8 +166,10 @@ class LineEndingTestPEP302(LineEndingTest): warnings.simplefilter('ignore', DeprecationWarning) return loader.load_module(module_name) -Frozen_LineEndingTestPEP302, Source_LineEndingTestPEP302 = util.test_both( - LineEndingTestPEP302, machinery=machinery) + +(Frozen_LineEndingTestPEP302, + Source_LineEndingTestPEP302 + ) = util.test_both(LineEndingTestPEP302, machinery=machinery) if __name__ == '__main__': diff --git a/Lib/test/test_importlib/source/util.py b/Lib/test/test_importlib/source/util.py deleted file mode 100644 index 63cd25adc4..0000000000 --- a/Lib/test/test_importlib/source/util.py +++ /dev/null @@ -1,96 +0,0 @@ -from .. import util -import contextlib -import errno -import functools -import os -import os.path -import sys -import tempfile -from test import support - - -def writes_bytecode_files(fxn): - """Decorator to protect sys.dont_write_bytecode from mutation and to skip - tests that require it to be set to False.""" - if sys.dont_write_bytecode: - return lambda *args, **kwargs: None - @functools.wraps(fxn) - def wrapper(*args, **kwargs): - original = sys.dont_write_bytecode - sys.dont_write_bytecode = False - try: - to_return = fxn(*args, **kwargs) - finally: - sys.dont_write_bytecode = original - return to_return - return wrapper - - -def ensure_bytecode_path(bytecode_path): - """Ensure that the __pycache__ directory for PEP 3147 pyc file exists. - - :param bytecode_path: File system path to PEP 3147 pyc file. - """ - try: - os.mkdir(os.path.dirname(bytecode_path)) - except OSError as error: - if error.errno != errno.EEXIST: - raise - - -@contextlib.contextmanager -def create_modules(*names): - """Temporarily create each named module with an attribute (named 'attr') - that contains the name passed into the context manager that caused the - creation of the module. - - All files are created in a temporary directory returned by - tempfile.mkdtemp(). This directory is inserted at the beginning of - sys.path. When the context manager exits all created files (source and - bytecode) are explicitly deleted. - - No magic is performed when creating packages! This means that if you create - a module within a package you must also create the package's __init__ as - well. - - """ - source = 'attr = {0!r}' - created_paths = [] - mapping = {} - state_manager = None - uncache_manager = None - try: - temp_dir = tempfile.mkdtemp() - mapping['.root'] = temp_dir - import_names = set() - for name in names: - if not name.endswith('__init__'): - import_name = name - else: - import_name = name[:-len('.__init__')] - import_names.add(import_name) - if import_name in sys.modules: - del sys.modules[import_name] - name_parts = name.split('.') - file_path = temp_dir - for directory in name_parts[:-1]: - file_path = os.path.join(file_path, directory) - if not os.path.exists(file_path): - os.mkdir(file_path) - created_paths.append(file_path) - file_path = os.path.join(file_path, name_parts[-1] + '.py') - with open(file_path, 'w') as file: - file.write(source.format(name)) - created_paths.append(file_path) - mapping[name] = file_path - uncache_manager = util.uncache(*import_names) - uncache_manager.__enter__() - state_manager = util.import_state(path=[temp_dir]) - state_manager.__enter__() - yield mapping - finally: - if state_manager is not None: - state_manager.__exit__(None, None, None) - if uncache_manager is not None: - uncache_manager.__exit__(None, None, None) - support.rmtree(temp_dir) diff --git a/Lib/test/test_importlib/test_abc.py b/Lib/test/test_importlib/test_abc.py index 09b72949ca..66a8d7bfb2 100644 --- a/Lib/test/test_importlib/test_abc.py +++ b/Lib/test/test_importlib/test_abc.py @@ -10,12 +10,13 @@ import unittest from unittest import mock import warnings -from . import util +from . import util as test_util + +init = test_util.import_importlib('importlib') +abc = test_util.import_importlib('importlib.abc') +machinery = test_util.import_importlib('importlib.machinery') +util = test_util.import_importlib('importlib.util') -frozen_init, source_init = util.import_importlib('importlib') -frozen_abc, source_abc = util.import_importlib('importlib.abc') -machinery = util.import_importlib('importlib.machinery') -frozen_util, source_util = util.import_importlib('importlib.util') ##### Inheritance ############################################################## class InheritanceTests: @@ -26,8 +27,7 @@ class InheritanceTests: subclasses = [] superclasses = [] - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) + def setUp(self): self.superclasses = [getattr(self.abc, class_name) for class_name in self.superclass_names] if hasattr(self, 'subclass_names'): @@ -36,11 +36,11 @@ class InheritanceTests: # checking across module boundaries (i.e. the _bootstrap in abc is # not the same as the one in machinery). That means stealing one of # the modules from the other to make sure the same instance is used. - self.subclasses = [getattr(self.abc.machinery, class_name) - for class_name in self.subclass_names] + machinery = self.abc.machinery + self.subclasses = [getattr(machinery, class_name) + for class_name in self.subclass_names] assert self.subclasses or self.superclasses, self.__class__ - testing = self.__class__.__name__.partition('_')[2] - self.__test = getattr(self.abc, testing) + self.__test = getattr(self.abc, self._NAME) def test_subclasses(self): # Test that the expected subclasses inherit. @@ -54,94 +54,97 @@ class InheritanceTests: self.assertTrue(issubclass(self.__test, superclass), "{0} is not a superclass of {1}".format(superclass, self.__test)) -def create_inheritance_tests(base_class): - def set_frozen(ns): - ns['abc'] = frozen_abc - def set_source(ns): - ns['abc'] = source_abc - - classes = [] - for prefix, ns_set in [('Frozen', set_frozen), ('Source', set_source)]: - classes.append(types.new_class('_'.join([prefix, base_class.__name__]), - (base_class, unittest.TestCase), - exec_body=ns_set)) - return classes - class MetaPathFinder(InheritanceTests): superclass_names = ['Finder'] subclass_names = ['BuiltinImporter', 'FrozenImporter', 'PathFinder', 'WindowsRegistryFinder'] -tests = create_inheritance_tests(MetaPathFinder) -Frozen_MetaPathFinderInheritanceTests, Source_MetaPathFinderInheritanceTests = tests + +(Frozen_MetaPathFinderInheritanceTests, + Source_MetaPathFinderInheritanceTests + ) = test_util.test_both(MetaPathFinder, abc=abc) class PathEntryFinder(InheritanceTests): superclass_names = ['Finder'] subclass_names = ['FileFinder'] -tests = create_inheritance_tests(PathEntryFinder) -Frozen_PathEntryFinderInheritanceTests, Source_PathEntryFinderInheritanceTests = tests + +(Frozen_PathEntryFinderInheritanceTests, + Source_PathEntryFinderInheritanceTests + ) = test_util.test_both(PathEntryFinder, abc=abc) class ResourceLoader(InheritanceTests): superclass_names = ['Loader'] -tests = create_inheritance_tests(ResourceLoader) -Frozen_ResourceLoaderInheritanceTests, Source_ResourceLoaderInheritanceTests = tests + +(Frozen_ResourceLoaderInheritanceTests, + Source_ResourceLoaderInheritanceTests + ) = test_util.test_both(ResourceLoader, abc=abc) class InspectLoader(InheritanceTests): superclass_names = ['Loader'] subclass_names = ['BuiltinImporter', 'FrozenImporter', 'ExtensionFileLoader'] -tests = create_inheritance_tests(InspectLoader) -Frozen_InspectLoaderInheritanceTests, Source_InspectLoaderInheritanceTests = tests + +(Frozen_InspectLoaderInheritanceTests, + Source_InspectLoaderInheritanceTests + ) = test_util.test_both(InspectLoader, abc=abc) class ExecutionLoader(InheritanceTests): superclass_names = ['InspectLoader'] subclass_names = ['ExtensionFileLoader'] -tests = create_inheritance_tests(ExecutionLoader) -Frozen_ExecutionLoaderInheritanceTests, Source_ExecutionLoaderInheritanceTests = tests + +(Frozen_ExecutionLoaderInheritanceTests, + Source_ExecutionLoaderInheritanceTests + ) = test_util.test_both(ExecutionLoader, abc=abc) class FileLoader(InheritanceTests): superclass_names = ['ResourceLoader', 'ExecutionLoader'] subclass_names = ['SourceFileLoader', 'SourcelessFileLoader'] -tests = create_inheritance_tests(FileLoader) -Frozen_FileLoaderInheritanceTests, Source_FileLoaderInheritanceTests = tests + +(Frozen_FileLoaderInheritanceTests, + Source_FileLoaderInheritanceTests + ) = test_util.test_both(FileLoader, abc=abc) class SourceLoader(InheritanceTests): superclass_names = ['ResourceLoader', 'ExecutionLoader'] subclass_names = ['SourceFileLoader'] -tests = create_inheritance_tests(SourceLoader) -Frozen_SourceLoaderInheritanceTests, Source_SourceLoaderInheritanceTests = tests + +(Frozen_SourceLoaderInheritanceTests, + Source_SourceLoaderInheritanceTests + ) = test_util.test_both(SourceLoader, abc=abc) + ##### Default return values #################################################### -def make_abc_subclasses(base_class): - classes = [] - for kind, abc in [('Frozen', frozen_abc), ('Source', source_abc)]: - name = '_'.join([kind, base_class.__name__]) - base_classes = base_class, getattr(abc, base_class.__name__) - classes.append(types.new_class(name, base_classes)) - return classes - -def make_return_value_tests(base_class, test_class): - frozen_class, source_class = make_abc_subclasses(base_class) - tests = [] - for prefix, class_in_test in [('Frozen', frozen_class), ('Source', source_class)]: - def set_ns(ns): - ns['ins'] = class_in_test() - tests.append(types.new_class('_'.join([prefix, test_class.__name__]), - (test_class, unittest.TestCase), - exec_body=set_ns)) - return tests + +def make_abc_subclasses(base_class, name=None, inst=False, **kwargs): + if name is None: + name = base_class.__name__ + base = {kind: getattr(splitabc, name) + for kind, splitabc in abc.items()} + return {cls._KIND: cls() if inst else cls + for cls in test_util.split_frozen(base_class, base, **kwargs)} + + +class ABCTestHarness: + + @property + def ins(self): + # Lazily set ins on the class. + cls = self.SPLIT[self._KIND] + ins = cls() + self.__class__.ins = ins + return ins class MetaPathFinder: @@ -149,10 +152,10 @@ class MetaPathFinder: def find_module(self, fullname, path): return super().find_module(fullname, path) -Frozen_MPF, Source_MPF = make_abc_subclasses(MetaPathFinder) +class MetaPathFinderDefaultsTests(ABCTestHarness): -class MetaPathFinderDefaultsTests: + SPLIT = make_abc_subclasses(MetaPathFinder) def test_find_module(self): # Default should return None. @@ -163,8 +166,9 @@ class MetaPathFinderDefaultsTests: self.ins.invalidate_caches() -tests = make_return_value_tests(MetaPathFinder, MetaPathFinderDefaultsTests) -Frozen_MPFDefaultTests, Source_MPFDefaultTests = tests +(Frozen_MPFDefaultTests, + Source_MPFDefaultTests + ) = test_util.test_both(MetaPathFinderDefaultsTests) class PathEntryFinder: @@ -172,10 +176,10 @@ class PathEntryFinder: def find_loader(self, fullname): return super().find_loader(fullname) -Frozen_PEF, Source_PEF = make_abc_subclasses(PathEntryFinder) +class PathEntryFinderDefaultsTests(ABCTestHarness): -class PathEntryFinderDefaultsTests: + SPLIT = make_abc_subclasses(PathEntryFinder) def test_find_loader(self): self.assertEqual((None, []), self.ins.find_loader('something')) @@ -188,8 +192,9 @@ class PathEntryFinderDefaultsTests: self.ins.invalidate_caches() -tests = make_return_value_tests(PathEntryFinder, PathEntryFinderDefaultsTests) -Frozen_PEFDefaultTests, Source_PEFDefaultTests = tests +(Frozen_PEFDefaultTests, + Source_PEFDefaultTests + ) = test_util.test_both(PathEntryFinderDefaultsTests) class Loader: @@ -198,10 +203,9 @@ class Loader: return super().load_module(fullname) -Frozen_L, Source_L = make_abc_subclasses(Loader) +class LoaderDefaultsTests(ABCTestHarness): - -class LoaderDefaultsTests: + SPLIT = make_abc_subclasses(Loader) def test_load_module(self): with self.assertRaises(ImportError): @@ -217,8 +221,9 @@ class LoaderDefaultsTests: self.assertTrue(repr(mod)) -tests = make_return_value_tests(Loader, LoaderDefaultsTests) -Frozen_LDefaultTests, SourceLDefaultTests = tests +(Frozen_LDefaultTests, + SourceLDefaultTests + ) = test_util.test_both(LoaderDefaultsTests) class ResourceLoader(Loader): @@ -227,18 +232,18 @@ class ResourceLoader(Loader): return super().get_data(path) -Frozen_RL, Source_RL = make_abc_subclasses(ResourceLoader) - +class ResourceLoaderDefaultsTests(ABCTestHarness): -class ResourceLoaderDefaultsTests: + SPLIT = make_abc_subclasses(ResourceLoader) def test_get_data(self): with self.assertRaises(IOError): self.ins.get_data('/some/path') -tests = make_return_value_tests(ResourceLoader, ResourceLoaderDefaultsTests) -Frozen_RLDefaultTests, Source_RLDefaultTests = tests +(Frozen_RLDefaultTests, + Source_RLDefaultTests + ) = test_util.test_both(ResourceLoaderDefaultsTests) class InspectLoader(Loader): @@ -250,10 +255,12 @@ class InspectLoader(Loader): return super().get_source(fullname) -Frozen_IL, Source_IL = make_abc_subclasses(InspectLoader) +SPLIT_IL = make_abc_subclasses(InspectLoader) -class InspectLoaderDefaultsTests: +class InspectLoaderDefaultsTests(ABCTestHarness): + + SPLIT = SPLIT_IL def test_is_package(self): with self.assertRaises(ImportError): @@ -264,8 +271,9 @@ class InspectLoaderDefaultsTests: self.ins.get_source('blah') -tests = make_return_value_tests(InspectLoader, InspectLoaderDefaultsTests) -Frozen_ILDefaultTests, Source_ILDefaultTests = tests +(Frozen_ILDefaultTests, + Source_ILDefaultTests + ) = test_util.test_both(InspectLoaderDefaultsTests) class ExecutionLoader(InspectLoader): @@ -273,21 +281,25 @@ class ExecutionLoader(InspectLoader): def get_filename(self, fullname): return super().get_filename(fullname) -Frozen_EL, Source_EL = make_abc_subclasses(ExecutionLoader) + +SPLIT_EL = make_abc_subclasses(ExecutionLoader) -class ExecutionLoaderDefaultsTests: +class ExecutionLoaderDefaultsTests(ABCTestHarness): + + SPLIT = SPLIT_EL def test_get_filename(self): with self.assertRaises(ImportError): self.ins.get_filename('blah') -tests = make_return_value_tests(ExecutionLoader, InspectLoaderDefaultsTests) -Frozen_ELDefaultTests, Source_ELDefaultsTests = tests +(Frozen_ELDefaultTests, + Source_ELDefaultsTests + ) = test_util.test_both(InspectLoaderDefaultsTests) -##### MetaPathFinder concrete methods ########################################## +##### MetaPathFinder concrete methods ########################################## class MetaPathFinderFindModuleTests: @classmethod @@ -317,13 +329,12 @@ class MetaPathFinderFindModuleTests: self.assertIs(found, spec.loader) -Frozen_MPFFindModuleTests, Source_MPFFindModuleTests = util.test_both( - MetaPathFinderFindModuleTests, - abc=(frozen_abc, source_abc), - util=(frozen_util, source_util)) +(Frozen_MPFFindModuleTests, + Source_MPFFindModuleTests + ) = test_util.test_both(MetaPathFinderFindModuleTests, abc=abc, util=util) -##### PathEntryFinder concrete methods ######################################### +##### PathEntryFinder concrete methods ######################################### class PathEntryFinderFindLoaderTests: @classmethod @@ -361,11 +372,10 @@ class PathEntryFinderFindLoaderTests: self.assertEqual(paths, found[1]) -Frozen_PEFFindLoaderTests, Source_PEFFindLoaderTests = util.test_both( - PathEntryFinderFindLoaderTests, - abc=(frozen_abc, source_abc), - machinery=machinery, - util=(frozen_util, source_util)) +(Frozen_PEFFindLoaderTests, + Source_PEFFindLoaderTests + ) = test_util.test_both(PathEntryFinderFindLoaderTests, abc=abc, util=util, + machinery=machinery) ##### Loader concrete methods ################################################## @@ -386,7 +396,7 @@ class LoaderLoadModuleTests: def test_fresh(self): loader = self.loader() name = 'blah' - with util.uncache(name): + with test_util.uncache(name): loader.load_module(name) module = loader.found self.assertIs(sys.modules[name], module) @@ -404,7 +414,7 @@ class LoaderLoadModuleTests: module = types.ModuleType(name) module.__spec__ = self.util.spec_from_loader(name, loader) module.__loader__ = loader - with util.uncache(name): + with test_util.uncache(name): sys.modules[name] = module loader.load_module(name) found = loader.found @@ -412,10 +422,9 @@ class LoaderLoadModuleTests: self.assertIs(module, sys.modules[name]) -Frozen_LoaderLoadModuleTests, Source_LoaderLoadModuleTests = util.test_both( - LoaderLoadModuleTests, - abc=(frozen_abc, source_abc), - util=(frozen_util, source_util)) +(Frozen_LoaderLoadModuleTests, + Source_LoaderLoadModuleTests + ) = test_util.test_both(LoaderLoadModuleTests, abc=abc, util=util) ##### InspectLoader concrete methods ########################################### @@ -461,11 +470,10 @@ class InspectLoaderSourceToCodeTests: self.assertEqual(code.co_filename, '<string>') -class Frozen_ILSourceToCodeTests(InspectLoaderSourceToCodeTests, unittest.TestCase): - InspectLoaderSubclass = Frozen_IL - -class Source_ILSourceToCodeTests(InspectLoaderSourceToCodeTests, unittest.TestCase): - InspectLoaderSubclass = Source_IL +(Frozen_ILSourceToCodeTests, + Source_ILSourceToCodeTests + ) = test_util.test_both(InspectLoaderSourceToCodeTests, + InspectLoaderSubclass=SPLIT_IL) class InspectLoaderGetCodeTests: @@ -495,11 +503,10 @@ class InspectLoaderGetCodeTests: loader.get_code('blah') -class Frozen_ILGetCodeTests(InspectLoaderGetCodeTests, unittest.TestCase): - InspectLoaderSubclass = Frozen_IL - -class Source_ILGetCodeTests(InspectLoaderGetCodeTests, unittest.TestCase): - InspectLoaderSubclass = Source_IL +(Frozen_ILGetCodeTests, + Source_ILGetCodeTests + ) = test_util.test_both(InspectLoaderGetCodeTests, + InspectLoaderSubclass=SPLIT_IL) class InspectLoaderLoadModuleTests: @@ -543,11 +550,10 @@ class InspectLoaderLoadModuleTests: self.assertEqual(module, sys.modules[self.module_name]) -class Frozen_ILLoadModuleTests(InspectLoaderLoadModuleTests, unittest.TestCase): - InspectLoaderSubclass = Frozen_IL - -class Source_ILLoadModuleTests(InspectLoaderLoadModuleTests, unittest.TestCase): - InspectLoaderSubclass = Source_IL +(Frozen_ILLoadModuleTests, + Source_ILLoadModuleTests + ) = test_util.test_both(InspectLoaderLoadModuleTests, + InspectLoaderSubclass=SPLIT_IL) ##### ExecutionLoader concrete methods ######################################### @@ -608,15 +614,14 @@ class ExecutionLoaderGetCodeTests: self.assertEqual(module.attr, 42) -class Frozen_ELGetCodeTests(ExecutionLoaderGetCodeTests, unittest.TestCase): - ExecutionLoaderSubclass = Frozen_EL - -class Source_ELGetCodeTests(ExecutionLoaderGetCodeTests, unittest.TestCase): - ExecutionLoaderSubclass = Source_EL +(Frozen_ELGetCodeTests, + Source_ELGetCodeTests + ) = test_util.test_both(ExecutionLoaderGetCodeTests, + ExecutionLoaderSubclass=SPLIT_EL) ##### SourceLoader concrete methods ############################################ -class SourceLoader: +class SourceOnlyLoader: # Globals that should be defined for all modules. source = (b"_ = '::'.join([__name__, __file__, __cached__, __package__, " @@ -637,10 +642,10 @@ class SourceLoader: return '<module>' -Frozen_SourceOnlyL, Source_SourceOnlyL = make_abc_subclasses(SourceLoader) +SPLIT_SOL = make_abc_subclasses(SourceOnlyLoader, 'SourceLoader') -class SourceLoader(SourceLoader): +class SourceLoader(SourceOnlyLoader): source_mtime = 1 @@ -677,11 +682,7 @@ class SourceLoader(SourceLoader): return path == self.bytecode_path -Frozen_SL, Source_SL = make_abc_subclasses(SourceLoader) -Frozen_SL.util = frozen_util -Source_SL.util = source_util -Frozen_SL.init = frozen_init -Source_SL.init = source_init +SPLIT_SL = make_abc_subclasses(SourceLoader, util=util, init=init) class SourceLoaderTestHarness: @@ -765,7 +766,7 @@ class SourceOnlyLoaderTests(SourceLoaderTestHarness): # Loading a module should set __name__, __loader__, __package__, # __path__ (for packages), __file__, and __cached__. # The module should also be put into sys.modules. - with util.uncache(self.name): + with test_util.uncache(self.name): with warnings.catch_warnings(): warnings.simplefilter('ignore', DeprecationWarning) module = self.loader.load_module(self.name) @@ -778,7 +779,7 @@ class SourceOnlyLoaderTests(SourceLoaderTestHarness): # is a package. # Testing the values for a package are covered by test_load_module. self.setUp(is_package=False) - with util.uncache(self.name): + with test_util.uncache(self.name): with warnings.catch_warnings(): warnings.simplefilter('ignore', DeprecationWarning) module = self.loader.load_module(self.name) @@ -798,13 +799,10 @@ class SourceOnlyLoaderTests(SourceLoaderTestHarness): self.assertEqual(returned_source, source) -class Frozen_SourceOnlyLTests(SourceOnlyLoaderTests, unittest.TestCase): - loader_mock = Frozen_SourceOnlyL - util = frozen_util - -class Source_SourceOnlyLTests(SourceOnlyLoaderTests, unittest.TestCase): - loader_mock = Source_SourceOnlyL - util = source_util +(Frozen_SourceOnlyLoaderTests, + Source_SourceOnlyLoaderTests + ) = test_util.test_both(SourceOnlyLoaderTests, util=util, + loader_mock=SPLIT_SOL) @unittest.skipIf(sys.dont_write_bytecode, "sys.dont_write_bytecode is true") @@ -896,15 +894,10 @@ class SourceLoaderBytecodeTests(SourceLoaderTestHarness): self.verify_code(code_object) -class Frozen_SLBytecodeTests(SourceLoaderBytecodeTests, unittest.TestCase): - loader_mock = Frozen_SL - init = frozen_init - util = frozen_util - -class SourceSLBytecodeTests(SourceLoaderBytecodeTests, unittest.TestCase): - loader_mock = Source_SL - init = source_init - util = source_util +(Frozen_SLBytecodeTests, + SourceSLBytecodeTests + ) = test_util.test_both(SourceLoaderBytecodeTests, init=init, util=util, + loader_mock=SPLIT_SL) class SourceLoaderGetSourceTests: @@ -940,11 +933,10 @@ class SourceLoaderGetSourceTests: self.assertEqual(mock.get_source(name), expect) -class Frozen_SourceOnlyLGetSourceTests(SourceLoaderGetSourceTests, unittest.TestCase): - SourceOnlyLoaderMock = Frozen_SourceOnlyL - -class Source_SourceOnlyLGetSourceTests(SourceLoaderGetSourceTests, unittest.TestCase): - SourceOnlyLoaderMock = Source_SourceOnlyL +(Frozen_SourceOnlyLoaderGetSourceTests, + Source_SourceOnlyLoaderGetSourceTests + ) = test_util.test_both(SourceLoaderGetSourceTests, + SourceOnlyLoaderMock=SPLIT_SOL) if __name__ == '__main__': diff --git a/Lib/test/test_importlib/test_api.py b/Lib/test/test_importlib/test_api.py index 2a2d42bbfe..6bc3c564a5 100644 --- a/Lib/test/test_importlib/test_api.py +++ b/Lib/test/test_importlib/test_api.py @@ -1,8 +1,8 @@ -from . import util +from . import util as test_util -frozen_init, source_init = util.import_importlib('importlib') -frozen_util, source_util = util.import_importlib('importlib.util') -frozen_machinery, source_machinery = util.import_importlib('importlib.machinery') +init = test_util.import_importlib('importlib') +util = test_util.import_importlib('importlib.util') +machinery = test_util.import_importlib('importlib.machinery') import os.path import sys @@ -18,8 +18,8 @@ class ImportModuleTests: def test_module_import(self): # Test importing a top-level module. - with util.mock_modules('top_level') as mock: - with util.import_state(meta_path=[mock]): + with test_util.mock_modules('top_level') as mock: + with test_util.import_state(meta_path=[mock]): module = self.init.import_module('top_level') self.assertEqual(module.__name__, 'top_level') @@ -28,8 +28,8 @@ class ImportModuleTests: pkg_name = 'pkg' pkg_long_name = '{0}.__init__'.format(pkg_name) name = '{0}.mod'.format(pkg_name) - with util.mock_modules(pkg_long_name, name) as mock: - with util.import_state(meta_path=[mock]): + with test_util.mock_modules(pkg_long_name, name) as mock: + with test_util.import_state(meta_path=[mock]): module = self.init.import_module(name) self.assertEqual(module.__name__, name) @@ -40,16 +40,16 @@ class ImportModuleTests: module_name = 'mod' absolute_name = '{0}.{1}'.format(pkg_name, module_name) relative_name = '.{0}'.format(module_name) - with util.mock_modules(pkg_long_name, absolute_name) as mock: - with util.import_state(meta_path=[mock]): + with test_util.mock_modules(pkg_long_name, absolute_name) as mock: + with test_util.import_state(meta_path=[mock]): self.init.import_module(pkg_name) module = self.init.import_module(relative_name, pkg_name) self.assertEqual(module.__name__, absolute_name) def test_deep_relative_package_import(self): modules = ['a.__init__', 'a.b.__init__', 'a.c'] - with util.mock_modules(*modules) as mock: - with util.import_state(meta_path=[mock]): + with test_util.mock_modules(*modules) as mock: + with test_util.import_state(meta_path=[mock]): self.init.import_module('a') self.init.import_module('a.b') module = self.init.import_module('..c', 'a.b') @@ -61,8 +61,8 @@ class ImportModuleTests: pkg_name = 'pkg' pkg_long_name = '{0}.__init__'.format(pkg_name) name = '{0}.mod'.format(pkg_name) - with util.mock_modules(pkg_long_name, name) as mock: - with util.import_state(meta_path=[mock]): + with test_util.mock_modules(pkg_long_name, name) as mock: + with test_util.import_state(meta_path=[mock]): self.init.import_module(pkg_name) module = self.init.import_module(name, pkg_name) self.assertEqual(module.__name__, name) @@ -86,16 +86,15 @@ class ImportModuleTests: b_load_count += 1 code = {'a': load_a, 'a.b': load_b} modules = ['a.__init__', 'a.b'] - with util.mock_modules(*modules, module_code=code) as mock: - with util.import_state(meta_path=[mock]): + with test_util.mock_modules(*modules, module_code=code) as mock: + with test_util.import_state(meta_path=[mock]): self.init.import_module('a.b') self.assertEqual(b_load_count, 1) -class Frozen_ImportModuleTests(ImportModuleTests, unittest.TestCase): - init = frozen_init -class Source_ImportModuleTests(ImportModuleTests, unittest.TestCase): - init = source_init +(Frozen_ImportModuleTests, + Source_ImportModuleTests + ) = test_util.test_both(ImportModuleTests, init=init) class FindLoaderTests: @@ -107,7 +106,7 @@ class FindLoaderTests: def test_sys_modules(self): # If a module with __loader__ is in sys.modules, then return it. name = 'some_mod' - with util.uncache(name): + with test_util.uncache(name): module = types.ModuleType(name) loader = 'a loader!' module.__loader__ = loader @@ -120,7 +119,7 @@ class FindLoaderTests: def test_sys_modules_loader_is_None(self): # If sys.modules[name].__loader__ is None, raise ValueError. name = 'some_mod' - with util.uncache(name): + with test_util.uncache(name): module = types.ModuleType(name) module.__loader__ = None sys.modules[name] = module @@ -133,7 +132,7 @@ class FindLoaderTests: # Should raise ValueError # Issue #17099 name = 'some_mod' - with util.uncache(name): + with test_util.uncache(name): module = types.ModuleType(name) try: del module.__loader__ @@ -148,8 +147,8 @@ class FindLoaderTests: def test_success(self): # Return the loader found on sys.meta_path. name = 'some_mod' - with util.uncache(name): - with util.import_state(meta_path=[self.FakeMetaFinder]): + with test_util.uncache(name): + with test_util.import_state(meta_path=[self.FakeMetaFinder]): with warnings.catch_warnings(): warnings.simplefilter('ignore', DeprecationWarning) self.assertEqual((name, None), self.init.find_loader(name)) @@ -158,8 +157,8 @@ class FindLoaderTests: # Searching on a path should work. name = 'some_mod' path = 'path to some place' - with util.uncache(name): - with util.import_state(meta_path=[self.FakeMetaFinder]): + with test_util.uncache(name): + with test_util.import_state(meta_path=[self.FakeMetaFinder]): with warnings.catch_warnings(): warnings.simplefilter('ignore', DeprecationWarning) self.assertEqual((name, path), @@ -171,11 +170,10 @@ class FindLoaderTests: warnings.simplefilter('ignore', DeprecationWarning) self.assertIsNone(self.init.find_loader('nevergoingtofindthismodule')) -class Frozen_FindLoaderTests(FindLoaderTests, unittest.TestCase): - init = frozen_init -class Source_FindLoaderTests(FindLoaderTests, unittest.TestCase): - init = source_init +(Frozen_FindLoaderTests, + Source_FindLoaderTests + ) = test_util.test_both(FindLoaderTests, init=init) class ReloadTests: @@ -195,10 +193,10 @@ class ReloadTests: module = type(sys)('top_level') module.spam = 3 sys.modules['top_level'] = module - mock = util.mock_modules('top_level', - module_code={'top_level': code}) + mock = test_util.mock_modules('top_level', + module_code={'top_level': code}) with mock: - with util.import_state(meta_path=[mock]): + with test_util.import_state(meta_path=[mock]): module = self.init.import_module('top_level') reloaded = self.init.reload(module) actual = sys.modules['top_level'] @@ -230,7 +228,7 @@ class ReloadTests: def test_reload_location_changed(self): name = 'spam' with support.temp_cwd(None) as cwd: - with util.uncache('spam'): + with test_util.uncache('spam'): with support.DirsOnSysPath(cwd): # Start as a plain module. self.init.invalidate_caches() @@ -281,7 +279,7 @@ class ReloadTests: def test_reload_namespace_changed(self): name = 'spam' with support.temp_cwd(None) as cwd: - with util.uncache('spam'): + with test_util.uncache('spam'): with support.DirsOnSysPath(cwd): # Start as a namespace package. self.init.invalidate_caches() @@ -338,20 +336,16 @@ class ReloadTests: # See #19851. name = 'spam' subname = 'ham' - with util.temp_module(name, pkg=True) as pkg_dir: - fullname, _ = util.submodule(name, subname, pkg_dir) + with test_util.temp_module(name, pkg=True) as pkg_dir: + fullname, _ = test_util.submodule(name, subname, pkg_dir) ham = self.init.import_module(fullname) reloaded = self.init.reload(ham) self.assertIs(reloaded, ham) -class Frozen_ReloadTests(ReloadTests, unittest.TestCase): - init = frozen_init - util = frozen_util - -class Source_ReloadTests(ReloadTests, unittest.TestCase): - init = source_init - util = source_util +(Frozen_ReloadTests, + Source_ReloadTests + ) = test_util.test_both(ReloadTests, init=init, util=util) class InvalidateCacheTests: @@ -384,11 +378,10 @@ class InvalidateCacheTests: self.addCleanup(lambda: sys.path_importer_cache.__delitem__(key)) self.init.invalidate_caches() # Shouldn't trigger an exception. -class Frozen_InvalidateCacheTests(InvalidateCacheTests, unittest.TestCase): - init = frozen_init -class Source_InvalidateCacheTests(InvalidateCacheTests, unittest.TestCase): - init = source_init +(Frozen_InvalidateCacheTests, + Source_InvalidateCacheTests + ) = test_util.test_both(InvalidateCacheTests, init=init) class FrozenImportlibTests(unittest.TestCase): @@ -398,6 +391,7 @@ class FrozenImportlibTests(unittest.TestCase): # Can't do an isinstance() check since separate copies of importlib # may have been used for import, so just check the name is not for the # frozen loader. + source_init = init['Source'] self.assertNotEqual(source_init.__loader__.__class__.__name__, 'FrozenImporter') @@ -426,11 +420,10 @@ class StartupTests: elif self.machinery.FrozenImporter.find_module(name): self.assertIsNot(module.__spec__, None) -class Frozen_StartupTests(StartupTests, unittest.TestCase): - machinery = frozen_machinery -class Source_StartupTests(StartupTests, unittest.TestCase): - machinery = source_machinery +(Frozen_StartupTests, + Source_StartupTests + ) = test_util.test_both(StartupTests, machinery=machinery) if __name__ == '__main__': diff --git a/Lib/test/test_importlib/test_lazy.py b/Lib/test/test_importlib/test_lazy.py new file mode 100644 index 0000000000..2e191bbf5c --- /dev/null +++ b/Lib/test/test_importlib/test_lazy.py @@ -0,0 +1,132 @@ +import importlib +from importlib import abc +from importlib import util +import unittest + +from . import util as test_util + + +class CollectInit: + + def __init__(self, *args, **kwargs): + self.args = args + self.kwargs = kwargs + + def exec_module(self, module): + return self + + +class LazyLoaderFactoryTests(unittest.TestCase): + + def test_init(self): + factory = util.LazyLoader.factory(CollectInit) + # E.g. what importlib.machinery.FileFinder instantiates loaders with + # plus keyword arguments. + lazy_loader = factory('module name', 'module path', kw='kw') + loader = lazy_loader.loader + self.assertEqual(('module name', 'module path'), loader.args) + self.assertEqual({'kw': 'kw'}, loader.kwargs) + + def test_validation(self): + # No exec_module(), no lazy loading. + with self.assertRaises(TypeError): + util.LazyLoader.factory(object) + + +class TestingImporter(abc.MetaPathFinder, abc.Loader): + + module_name = 'lazy_loader_test' + mutated_name = 'changed' + loaded = None + source_code = 'attr = 42; __name__ = {!r}'.format(mutated_name) + + def find_spec(self, name, path, target=None): + if name != self.module_name: + return None + return util.spec_from_loader(name, util.LazyLoader(self)) + + def exec_module(self, module): + exec(self.source_code, module.__dict__) + self.loaded = module + + +class LazyLoaderTests(unittest.TestCase): + + def test_init(self): + with self.assertRaises(TypeError): + util.LazyLoader(object) + + def new_module(self, source_code=None): + loader = TestingImporter() + if source_code is not None: + loader.source_code = source_code + spec = util.spec_from_loader(TestingImporter.module_name, + util.LazyLoader(loader)) + module = spec.loader.create_module(spec) + module.__spec__ = spec + module.__loader__ = spec.loader + spec.loader.exec_module(module) + # Module is now lazy. + self.assertIsNone(loader.loaded) + return module + + def test_e2e(self): + # End-to-end test to verify the load is in fact lazy. + importer = TestingImporter() + assert importer.loaded is None + with test_util.uncache(importer.module_name): + with test_util.import_state(meta_path=[importer]): + module = importlib.import_module(importer.module_name) + self.assertIsNone(importer.loaded) + # Trigger load. + self.assertEqual(module.__loader__, importer) + self.assertIsNotNone(importer.loaded) + self.assertEqual(module, importer.loaded) + + def test_attr_unchanged(self): + # An attribute only mutated as a side-effect of import should not be + # changed needlessly. + module = self.new_module() + self.assertEqual(TestingImporter.mutated_name, module.__name__) + + def test_new_attr(self): + # A new attribute should persist. + module = self.new_module() + module.new_attr = 42 + self.assertEqual(42, module.new_attr) + + def test_mutated_preexisting_attr(self): + # Changing an attribute that already existed on the module -- + # e.g. __name__ -- should persist. + module = self.new_module() + module.__name__ = 'bogus' + self.assertEqual('bogus', module.__name__) + + def test_mutated_attr(self): + # Changing an attribute that comes into existence after an import + # should persist. + module = self.new_module() + module.attr = 6 + self.assertEqual(6, module.attr) + + def test_delete_eventual_attr(self): + # Deleting an attribute should stay deleted. + module = self.new_module() + del module.attr + self.assertFalse(hasattr(module, 'attr')) + + def test_delete_preexisting_attr(self): + module = self.new_module() + del module.__name__ + self.assertFalse(hasattr(module, '__name__')) + + def test_module_substitution_error(self): + source_code = 'import sys; sys.modules[__name__] = 42' + module = self.new_module(source_code) + with test_util.uncache(TestingImporter.module_name): + with self.assertRaises(ValueError): + module.__name__ + + +if __name__ == '__main__': + unittest.main() diff --git a/Lib/test/test_importlib/test_locks.py b/Lib/test/test_importlib/test_locks.py index dc97ba1567..08050545cd 100644 --- a/Lib/test/test_importlib/test_locks.py +++ b/Lib/test/test_importlib/test_locks.py @@ -1,7 +1,6 @@ -from . import util -frozen_init, source_init = util.import_importlib('importlib') -frozen_bootstrap = frozen_init._bootstrap -source_bootstrap = source_init._bootstrap +from . import util as test_util + +init = test_util.import_importlib('importlib') import sys import time @@ -33,13 +32,16 @@ if threading is not None: # _release_save() unsupported test_release_save_unacquired = None - class Frozen_ModuleLockAsRLockTests(ModuleLockAsRLockTests, lock_tests.RLockTests): - LockType = frozen_bootstrap._ModuleLock - - class Source_ModuleLockAsRLockTests(ModuleLockAsRLockTests, lock_tests.RLockTests): - LockType = source_bootstrap._ModuleLock + LOCK_TYPES = {kind: splitinit._bootstrap._ModuleLock + for kind, splitinit in init.items()} + (Frozen_ModuleLockAsRLockTests, + Source_ModuleLockAsRLockTests + ) = test_util.test_both(ModuleLockAsRLockTests, lock_tests.RLockTests, + LockType=LOCK_TYPES) else: + LOCK_TYPES = {} + class Frozen_ModuleLockAsRLockTests(unittest.TestCase): pass @@ -47,6 +49,7 @@ else: pass +@unittest.skipUnless(threading, "threads needed for this test") class DeadlockAvoidanceTests: def setUp(self): @@ -106,19 +109,22 @@ class DeadlockAvoidanceTests: self.assertEqual(results.count((True, False)), 0) self.assertEqual(results.count((True, True)), len(results)) -@unittest.skipUnless(threading, "threads needed for this test") -class Frozen_DeadlockAvoidanceTests(DeadlockAvoidanceTests, unittest.TestCase): - LockType = frozen_bootstrap._ModuleLock - DeadlockError = frozen_bootstrap._DeadlockError -@unittest.skipUnless(threading, "threads needed for this test") -class Source_DeadlockAvoidanceTests(DeadlockAvoidanceTests, unittest.TestCase): - LockType = source_bootstrap._ModuleLock - DeadlockError = source_bootstrap._DeadlockError +DEADLOCK_ERRORS = {kind: splitinit._bootstrap._DeadlockError + for kind, splitinit in init.items()} + +(Frozen_DeadlockAvoidanceTests, + Source_DeadlockAvoidanceTests + ) = test_util.test_both(DeadlockAvoidanceTests, + LockType=LOCK_TYPES, DeadlockError=DEADLOCK_ERRORS) class LifetimeTests: + @property + def bootstrap(self): + return self.init._bootstrap + def test_lock_lifetime(self): name = "xyzzy" self.assertNotIn(name, self.bootstrap._module_locks) @@ -135,11 +141,10 @@ class LifetimeTests: self.assertEqual(0, len(self.bootstrap._module_locks), self.bootstrap._module_locks) -class Frozen_LifetimeTests(LifetimeTests, unittest.TestCase): - bootstrap = frozen_bootstrap -class Source_LifetimeTests(LifetimeTests, unittest.TestCase): - bootstrap = source_bootstrap +(Frozen_LifetimeTests, + Source_LifetimeTests + ) = test_util.test_both(LifetimeTests, init=init) @support.reap_threads diff --git a/Lib/test/test_importlib/test_spec.py b/Lib/test/test_importlib/test_spec.py index 71541f692e..0cb14eea42 100644 --- a/Lib/test/test_importlib/test_spec.py +++ b/Lib/test/test_importlib/test_spec.py @@ -1,10 +1,8 @@ -from . import util +from . import util as test_util -frozen_init, source_init = util.import_importlib('importlib') -frozen_bootstrap = frozen_init._bootstrap -source_bootstrap = source_init._bootstrap -frozen_machinery, source_machinery = util.import_importlib('importlib.machinery') -frozen_util, source_util = util.import_importlib('importlib.util') +init = test_util.import_importlib('importlib') +machinery = test_util.import_importlib('importlib.machinery') +util = test_util.import_importlib('importlib.util') import os.path from test.support import CleanImport @@ -52,6 +50,8 @@ class LegacyLoader(TestLoader): with warnings.catch_warnings(): warnings.simplefilter("ignore", DeprecationWarning) + frozen_util = util['Frozen'] + @frozen_util.module_for_loader def load_module(self, module): module.ham = self.HAM @@ -221,18 +221,17 @@ class ModuleSpecTests: self.assertEqual(self.loc_spec.cached, 'spam.pyc') -class Frozen_ModuleSpecTests(ModuleSpecTests, unittest.TestCase): - util = frozen_util - machinery = frozen_machinery - - -class Source_ModuleSpecTests(ModuleSpecTests, unittest.TestCase): - util = source_util - machinery = source_machinery +(Frozen_ModuleSpecTests, + Source_ModuleSpecTests + ) = test_util.test_both(ModuleSpecTests, util=util, machinery=machinery) class ModuleSpecMethodsTests: + @property + def bootstrap(self): + return self.init._bootstrap + def setUp(self): self.name = 'spam' self.path = 'spam.py' @@ -528,20 +527,18 @@ class ModuleSpecMethodsTests: self.assertIs(installed, loaded) -class Frozen_ModuleSpecMethodsTests(ModuleSpecMethodsTests, unittest.TestCase): - bootstrap = frozen_bootstrap - machinery = frozen_machinery - util = frozen_util - - -class Source_ModuleSpecMethodsTests(ModuleSpecMethodsTests, unittest.TestCase): - bootstrap = source_bootstrap - machinery = source_machinery - util = source_util +(Frozen_ModuleSpecMethodsTests, + Source_ModuleSpecMethodsTests + ) = test_util.test_both(ModuleSpecMethodsTests, init=init, util=util, + machinery=machinery) class ModuleReprTests: + @property + def bootstrap(self): + return self.init._bootstrap + def setUp(self): self.module = type(os)('spam') self.spec = self.machinery.ModuleSpec('spam', TestLoader()) @@ -625,16 +622,10 @@ class ModuleReprTests: self.assertEqual(modrepr, '<module {!r}>'.format('spam')) -class Frozen_ModuleReprTests(ModuleReprTests, unittest.TestCase): - bootstrap = frozen_bootstrap - machinery = frozen_machinery - util = frozen_util - - -class Source_ModuleReprTests(ModuleReprTests, unittest.TestCase): - bootstrap = source_bootstrap - machinery = source_machinery - util = source_util +(Frozen_ModuleReprTests, + Source_ModuleReprTests + ) = test_util.test_both(ModuleReprTests, init=init, util=util, + machinery=machinery) class FactoryTests: @@ -787,7 +778,7 @@ class FactoryTests: # spec_from_file_location() def test_spec_from_file_location_default(self): - if self.machinery is source_machinery: + if self.machinery is machinery['Source']: raise unittest.SkipTest('not sure why this is breaking...') spec = self.util.spec_from_file_location(self.name, self.path) @@ -947,11 +938,6 @@ class FactoryTests: self.assertTrue(spec.has_location) -class Frozen_FactoryTests(FactoryTests, unittest.TestCase): - util = frozen_util - machinery = frozen_machinery - - -class Source_FactoryTests(FactoryTests, unittest.TestCase): - util = source_util - machinery = source_machinery +(Frozen_FactoryTests, + Source_FactoryTests + ) = test_util.test_both(FactoryTests, util=util, machinery=machinery) diff --git a/Lib/test/test_importlib/test_util.py b/Lib/test/test_importlib/test_util.py index b2823c6a78..9428471539 100644 --- a/Lib/test/test_importlib/test_util.py +++ b/Lib/test/test_importlib/test_util.py @@ -1,8 +1,8 @@ -from importlib import util +import importlib.util from . import util as test_util -frozen_init, source_init = test_util.import_importlib('importlib') -frozen_machinery, source_machinery = test_util.import_importlib('importlib.machinery') -frozen_util, source_util = test_util.import_importlib('importlib.util') +init = test_util.import_importlib('importlib') +machinery = test_util.import_importlib('importlib.machinery') +util = test_util.import_importlib('importlib.util') import os import sys @@ -32,8 +32,10 @@ class DecodeSourceBytesTests: self.assertEqual(self.util.decode_source(source_bytes), '\n'.join([self.source, self.source])) -Frozen_DecodeSourceBytesTests, Source_DecodeSourceBytesTests = test_util.test_both( - DecodeSourceBytesTests, util=[frozen_util, source_util]) + +(Frozen_DecodeSourceBytesTests, + Source_DecodeSourceBytesTests + ) = test_util.test_both(DecodeSourceBytesTests, util=util) class ModuleForLoaderTests: @@ -161,8 +163,10 @@ class ModuleForLoaderTests: self.assertIs(module.__loader__, loader) self.assertEqual(module.__package__, name) -Frozen_ModuleForLoaderTests, Source_ModuleForLoaderTests = test_util.test_both( - ModuleForLoaderTests, util=[frozen_util, source_util]) + +(Frozen_ModuleForLoaderTests, + Source_ModuleForLoaderTests + ) = test_util.test_both(ModuleForLoaderTests, util=util) class SetPackageTests: @@ -222,18 +226,25 @@ class SetPackageTests: self.assertEqual(wrapped.__name__, fxn.__name__) self.assertEqual(wrapped.__qualname__, fxn.__qualname__) -Frozen_SetPackageTests, Source_SetPackageTests = test_util.test_both( - SetPackageTests, util=[frozen_util, source_util]) + +(Frozen_SetPackageTests, + Source_SetPackageTests + ) = test_util.test_both(SetPackageTests, util=util) class SetLoaderTests: """Tests importlib.util.set_loader().""" - class DummyLoader: - @util.set_loader - def load_module(self, module): - return self.module + @property + def DummyLoader(self): + # Set DummyLoader on the class lazily. + class DummyLoader: + @self.util.set_loader + def load_module(self, module): + return self.module + self.__class__.DummyLoader = DummyLoader + return DummyLoader def test_no_attribute(self): loader = self.DummyLoader() @@ -262,17 +273,10 @@ class SetLoaderTests: warnings.simplefilter('ignore', DeprecationWarning) self.assertEqual(42, loader.load_module('blah').__loader__) -class Frozen_SetLoaderTests(SetLoaderTests, unittest.TestCase): - class DummyLoader: - @frozen_util.set_loader - def load_module(self, module): - return self.module -class Source_SetLoaderTests(SetLoaderTests, unittest.TestCase): - class DummyLoader: - @source_util.set_loader - def load_module(self, module): - return self.module +(Frozen_SetLoaderTests, + Source_SetLoaderTests + ) = test_util.test_both(SetLoaderTests, util=util) class ResolveNameTests: @@ -307,9 +311,10 @@ class ResolveNameTests: with self.assertRaises(ValueError): self.util.resolve_name('..bacon', 'spam') -Frozen_ResolveNameTests, Source_ResolveNameTests = test_util.test_both( - ResolveNameTests, - util=[frozen_util, source_util]) + +(Frozen_ResolveNameTests, + Source_ResolveNameTests + ) = test_util.test_both(ResolveNameTests, util=util) class FindSpecTests: @@ -446,15 +451,10 @@ class FindSpecTests: self.assertNotIn(fullname, sorted(sys.modules)) -class Frozen_FindSpecTests(FindSpecTests, unittest.TestCase): - init = frozen_init - machinery = frozen_machinery - util = frozen_util - -class Source_FindSpecTests(FindSpecTests, unittest.TestCase): - init = source_init - machinery = source_machinery - util = source_util +(Frozen_FindSpecTests, + Source_FindSpecTests + ) = test_util.test_both(FindSpecTests, init=init, util=util, + machinery=machinery) class MagicNumberTests: @@ -467,8 +467,10 @@ class MagicNumberTests: # The magic number uses \r\n to come out wrong when splitting on lines. self.assertTrue(self.util.MAGIC_NUMBER.endswith(b'\r\n')) -Frozen_MagicNumberTests, Source_MagicNumberTests = test_util.test_both( - MagicNumberTests, util=[frozen_util, source_util]) + +(Frozen_MagicNumberTests, + Source_MagicNumberTests + ) = test_util.test_both(MagicNumberTests, util=util) class PEP3147Tests: @@ -583,9 +585,10 @@ class PEP3147Tests: ValueError, self.util.source_from_cache, '/foo/bar/foo.cpython-32.foo.pyc') -Frozen_PEP3147Tests, Source_PEP3147Tests = test_util.test_both( - PEP3147Tests, - util=[frozen_util, source_util]) + +(Frozen_PEP3147Tests, + Source_PEP3147Tests + ) = test_util.test_both(PEP3147Tests, util=util) if __name__ == '__main__': diff --git a/Lib/test/test_importlib/test_windows.py b/Lib/test/test_importlib/test_windows.py index 96b4adcd05..d4c771c40d 100644 --- a/Lib/test/test_importlib/test_windows.py +++ b/Lib/test/test_importlib/test_windows.py @@ -1,5 +1,5 @@ -from . import util -frozen_machinery, source_machinery = util.import_importlib('importlib.machinery') +from . import util as test_util +machinery = test_util.import_importlib('importlib.machinery') import sys import unittest @@ -19,11 +19,6 @@ class WindowsRegistryFinderTests: self.assertIs(loader, None) -class Frozen_WindowsRegistryFinderTests(WindowsRegistryFinderTests, - unittest.TestCase): - machinery = frozen_machinery - - -class Source_WindowsRegistryFinderTests(WindowsRegistryFinderTests, - unittest.TestCase): - machinery = source_machinery +(Frozen_WindowsRegistryFinderTests, + Source_WindowsRegistryFinderTests + ) = test_util.test_both(WindowsRegistryFinderTests, machinery=machinery) diff --git a/Lib/test/test_importlib/util.py b/Lib/test/test_importlib/util.py index 885cec3b29..aa4cd7e861 100644 --- a/Lib/test/test_importlib/util.py +++ b/Lib/test/test_importlib/util.py @@ -1,31 +1,85 @@ -from contextlib import contextmanager -from importlib import util, invalidate_caches +import builtins +import contextlib +import errno +import functools +import importlib +from importlib import machinery, util, invalidate_caches +import os import os.path from test import support import unittest import sys +import tempfile import types +BUILTINS = types.SimpleNamespace() +BUILTINS.good_name = None +BUILTINS.bad_name = None +if 'errno' in sys.builtin_module_names: + BUILTINS.good_name = 'errno' +if 'importlib' not in sys.builtin_module_names: + BUILTINS.bad_name = 'importlib' + +EXTENSIONS = types.SimpleNamespace() +EXTENSIONS.path = None +EXTENSIONS.ext = None +EXTENSIONS.filename = None +EXTENSIONS.file_path = None +EXTENSIONS.name = '_testcapi' + +def _extension_details(): + global EXTENSIONS + for path in sys.path: + for ext in machinery.EXTENSION_SUFFIXES: + filename = EXTENSIONS.name + ext + file_path = os.path.join(path, filename) + if os.path.exists(file_path): + EXTENSIONS.path = path + EXTENSIONS.ext = ext + EXTENSIONS.filename = filename + EXTENSIONS.file_path = file_path + return + +_extension_details() + + def import_importlib(module_name): """Import a module from importlib both w/ and w/o _frozen_importlib.""" fresh = ('importlib',) if '.' in module_name else () frozen = support.import_fresh_module(module_name) source = support.import_fresh_module(module_name, fresh=fresh, blocked=('_frozen_importlib',)) + return {'Frozen': frozen, 'Source': source} + + +def specialize_class(cls, kind, base=None, **kwargs): + # XXX Support passing in submodule names--load (and cache) them? + # That would clean up the test modules a bit more. + if base is None: + base = unittest.TestCase + elif not isinstance(base, type): + base = base[kind] + name = '{}_{}'.format(kind, cls.__name__) + bases = (cls, base) + specialized = types.new_class(name, bases) + specialized.__module__ = cls.__module__ + specialized._NAME = cls.__name__ + specialized._KIND = kind + for attr, values in kwargs.items(): + value = values[kind] + setattr(specialized, attr, value) + return specialized + + +def split_frozen(cls, base=None, **kwargs): + frozen = specialize_class(cls, 'Frozen', base, **kwargs) + source = specialize_class(cls, 'Source', base, **kwargs) return frozen, source -def test_both(test_class, **kwargs): - frozen_tests = types.new_class('Frozen_'+test_class.__name__, - (test_class, unittest.TestCase)) - source_tests = types.new_class('Source_'+test_class.__name__, - (test_class, unittest.TestCase)) - frozen_tests.__module__ = source_tests.__module__ = test_class.__module__ - for attr, (frozen_value, source_value) in kwargs.items(): - setattr(frozen_tests, attr, frozen_value) - setattr(source_tests, attr, source_value) - return frozen_tests, source_tests +def test_both(test_class, base=None, **kwargs): + return split_frozen(test_class, base, **kwargs) CASE_INSENSITIVE_FS = True @@ -38,6 +92,10 @@ if sys.platform not in ('win32', 'cygwin'): if not os.path.exists(changed_name): CASE_INSENSITIVE_FS = False +source_importlib = import_importlib('importlib')['Source'] +__import__ = {'Frozen': staticmethod(builtins.__import__), + 'Source': staticmethod(source_importlib.__import__)} + def case_insensitive_tests(test): """Class decorator that nullifies tests requiring a case-insensitive @@ -53,7 +111,7 @@ def submodule(parent, name, pkg_dir, content=''): return '{}.{}'.format(parent, name), path -@contextmanager +@contextlib.contextmanager def uncache(*names): """Uncache a module from sys.modules. @@ -79,7 +137,7 @@ def uncache(*names): pass -@contextmanager +@contextlib.contextmanager def temp_module(name, content='', *, pkg=False): conflicts = [n for n in sys.modules if n.partition('.')[0] == name] with support.temp_cwd(None) as cwd: @@ -103,7 +161,7 @@ def temp_module(name, content='', *, pkg=False): yield location -@contextmanager +@contextlib.contextmanager def import_state(**kwargs): """Context manager to manage the various importers and stored state in the sys module. @@ -198,6 +256,7 @@ class mock_modules(_ImporterMock): raise return self.modules[fullname] + class mock_spec(_ImporterMock): """Importer mock using PEP 451 APIs.""" @@ -223,3 +282,99 @@ class mock_spec(_ImporterMock): self.module_code[module.__spec__.name]() except KeyError: pass + + +def writes_bytecode_files(fxn): + """Decorator to protect sys.dont_write_bytecode from mutation and to skip + tests that require it to be set to False.""" + if sys.dont_write_bytecode: + return lambda *args, **kwargs: None + @functools.wraps(fxn) + def wrapper(*args, **kwargs): + original = sys.dont_write_bytecode + sys.dont_write_bytecode = False + try: + to_return = fxn(*args, **kwargs) + finally: + sys.dont_write_bytecode = original + return to_return + return wrapper + + +def ensure_bytecode_path(bytecode_path): + """Ensure that the __pycache__ directory for PEP 3147 pyc file exists. + + :param bytecode_path: File system path to PEP 3147 pyc file. + """ + try: + os.mkdir(os.path.dirname(bytecode_path)) + except OSError as error: + if error.errno != errno.EEXIST: + raise + + +@contextlib.contextmanager +def create_modules(*names): + """Temporarily create each named module with an attribute (named 'attr') + that contains the name passed into the context manager that caused the + creation of the module. + + All files are created in a temporary directory returned by + tempfile.mkdtemp(). This directory is inserted at the beginning of + sys.path. When the context manager exits all created files (source and + bytecode) are explicitly deleted. + + No magic is performed when creating packages! This means that if you create + a module within a package you must also create the package's __init__ as + well. + + """ + source = 'attr = {0!r}' + created_paths = [] + mapping = {} + state_manager = None + uncache_manager = None + try: + temp_dir = tempfile.mkdtemp() + mapping['.root'] = temp_dir + import_names = set() + for name in names: + if not name.endswith('__init__'): + import_name = name + else: + import_name = name[:-len('.__init__')] + import_names.add(import_name) + if import_name in sys.modules: + del sys.modules[import_name] + name_parts = name.split('.') + file_path = temp_dir + for directory in name_parts[:-1]: + file_path = os.path.join(file_path, directory) + if not os.path.exists(file_path): + os.mkdir(file_path) + created_paths.append(file_path) + file_path = os.path.join(file_path, name_parts[-1] + '.py') + with open(file_path, 'w') as file: + file.write(source.format(name)) + created_paths.append(file_path) + mapping[name] = file_path + uncache_manager = uncache(*import_names) + uncache_manager.__enter__() + state_manager = import_state(path=[temp_dir]) + state_manager.__enter__() + yield mapping + finally: + if state_manager is not None: + state_manager.__exit__(None, None, None) + if uncache_manager is not None: + uncache_manager.__exit__(None, None, None) + support.rmtree(temp_dir) + + +def mock_path_hook(*entries, importer): + """A mock sys.path_hooks entry.""" + def hook(entry): + if entry not in entries: + raise ImportError + return importer + return hook diff --git a/Lib/test/test_inspect.py b/Lib/test/test_inspect.py index 1ede3b5097..7ad190b056 100644 --- a/Lib/test/test_inspect.py +++ b/Lib/test/test_inspect.py @@ -8,6 +8,7 @@ import linecache import os from os.path import normcase import _pickle +import pickle import re import shutil import sys @@ -73,6 +74,7 @@ def generator_function_example(self): for i in range(2): yield i + class TestPredicates(IsTestBase): def test_sixteen(self): count = len([x for x in dir(inspect) if x.startswith('is')]) @@ -1611,6 +1613,17 @@ class TestGetGeneratorState(unittest.TestCase): self.assertRaises(TypeError, inspect.getgeneratorlocals, (2,3)) +class MySignature(inspect.Signature): + # Top-level to make it picklable; + # used in test_signature_object_pickle + pass + +class MyParameter(inspect.Parameter): + # Top-level to make it picklable; + # used in test_signature_object_pickle + pass + + class TestSignatureObject(unittest.TestCase): @staticmethod def signature(func): @@ -1668,6 +1681,37 @@ class TestSignatureObject(unittest.TestCase): with self.assertRaisesRegex(ValueError, 'follows default argument'): S((pkd, pk)) + self.assertTrue(repr(sig).startswith('<Signature')) + self.assertTrue('"(po, pk' in repr(sig)) + + def test_signature_object_pickle(self): + def foo(a, b, *, c:1={}, **kw) -> {42:'ham'}: pass + foo_partial = functools.partial(foo, a=1) + + sig = inspect.signature(foo_partial) + + for ver in range(pickle.HIGHEST_PROTOCOL + 1): + with self.subTest(pickle_ver=ver, subclass=False): + sig_pickled = pickle.loads(pickle.dumps(sig, ver)) + self.assertEqual(sig, sig_pickled) + + # Test that basic sub-classing works + sig = inspect.signature(foo) + myparam = MyParameter(name='z', kind=inspect.Parameter.POSITIONAL_ONLY) + myparams = collections.OrderedDict(sig.parameters, a=myparam) + mysig = MySignature().replace(parameters=myparams.values(), + return_annotation=sig.return_annotation) + self.assertTrue(isinstance(mysig, MySignature)) + self.assertTrue(isinstance(mysig.parameters['z'], MyParameter)) + + for ver in range(pickle.HIGHEST_PROTOCOL + 1): + with self.subTest(pickle_ver=ver, subclass=True): + sig_pickled = pickle.loads(pickle.dumps(mysig, ver)) + self.assertEqual(mysig, sig_pickled) + self.assertTrue(isinstance(sig_pickled, MySignature)) + self.assertTrue(isinstance(sig_pickled.parameters['z'], + MyParameter)) + def test_signature_immutability(self): def test(a): pass @@ -2469,11 +2513,29 @@ class TestSignatureObject(unittest.TestCase): def bar(pos, *args, c, b, a=42, **kwargs:int): pass self.assertEqual(inspect.signature(foo), inspect.signature(bar)) - def test_signature_unhashable(self): + def test_signature_hashable(self): + S = inspect.Signature + P = inspect.Parameter + def foo(a): pass - sig = inspect.signature(foo) + foo_sig = inspect.signature(foo) + + manual_sig = S(parameters=[P('a', P.POSITIONAL_OR_KEYWORD)]) + + self.assertEqual(hash(foo_sig), hash(manual_sig)) + self.assertNotEqual(hash(foo_sig), + hash(manual_sig.replace(return_annotation='spam'))) + + def bar(a) -> 1: pass + self.assertNotEqual(hash(foo_sig), hash(inspect.signature(bar))) + + def foo(a={}): pass + with self.assertRaisesRegex(TypeError, 'unhashable type'): + hash(inspect.signature(foo)) + + def foo(a) -> {}: pass with self.assertRaisesRegex(TypeError, 'unhashable type'): - hash(sig) + hash(inspect.signature(foo)) def test_signature_str(self): def foo(a:int=1, *, b, c=None, **kwargs) -> 42: @@ -2547,6 +2609,19 @@ class TestSignatureObject(unittest.TestCase): self.assertEqual(self.signature(Spam.foo), self.signature(Ham.foo)) + def test_signature_from_callable_python_obj(self): + class MySignature(inspect.Signature): pass + def foo(a, *, b:1): pass + foo_sig = MySignature.from_callable(foo) + self.assertTrue(isinstance(foo_sig, MySignature)) + + @unittest.skipIf(MISSING_C_DOCSTRINGS, + "Signature information for builtins requires docstrings") + def test_signature_from_callable_builtin_obj(self): + class MySignature(inspect.Signature): pass + sig = MySignature.from_callable(_pickle.Pickler) + self.assertTrue(isinstance(sig, MySignature)) + class TestParameterObject(unittest.TestCase): def test_signature_parameter_kinds(self): @@ -2592,6 +2667,16 @@ class TestParameterObject(unittest.TestCase): p.replace(kind=inspect.Parameter.VAR_POSITIONAL) self.assertTrue(repr(p).startswith('<Parameter')) + self.assertTrue('"a=42"' in repr(p)) + + def test_signature_parameter_hashable(self): + P = inspect.Parameter + foo = P('foo', kind=P.POSITIONAL_ONLY) + self.assertEqual(hash(foo), hash(P('foo', kind=P.POSITIONAL_ONLY))) + self.assertNotEqual(hash(foo), hash(P('foo', kind=P.POSITIONAL_ONLY, + default=42))) + self.assertNotEqual(hash(foo), + hash(foo.replace(kind=P.VAR_POSITIONAL))) def test_signature_parameter_equality(self): P = inspect.Parameter @@ -2603,13 +2688,6 @@ class TestParameterObject(unittest.TestCase): self.assertEqual(p, P('foo', default=42, kind=inspect.Parameter.KEYWORD_ONLY)) - def test_signature_parameter_unhashable(self): - p = inspect.Parameter('foo', default=42, - kind=inspect.Parameter.KEYWORD_ONLY) - - with self.assertRaisesRegex(TypeError, 'unhashable type'): - hash(p) - def test_signature_parameter_replace(self): p = inspect.Parameter('foo', default=42, kind=inspect.Parameter.KEYWORD_ONLY) @@ -2918,6 +2996,16 @@ class TestBoundArguments(unittest.TestCase): ba4 = inspect.signature(bar).bind(1) self.assertNotEqual(ba, ba4) + def test_signature_bound_arguments_pickle(self): + def foo(a, b, *, c:1={}, **kw) -> {42:'ham'}: pass + sig = inspect.signature(foo) + ba = sig.bind(20, 30, z={}) + + for ver in range(pickle.HIGHEST_PROTOCOL + 1): + with self.subTest(pickle_ver=ver): + ba_pickled = pickle.loads(pickle.dumps(ba, ver)) + self.assertEqual(ba, ba_pickled) + class TestSignaturePrivateHelpers(unittest.TestCase): def test_signature_get_bound_param(self): diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py index ef1e05622b..347832d71f 100644 --- a/Lib/test/test_io.py +++ b/Lib/test/test_io.py @@ -2713,6 +2713,34 @@ class TextIOWrapperTest(unittest.TestCase): self.assertFalse(err) self.assertEqual("ok", out.decode().strip()) + def test_read_byteslike(self): + r = MemviewBytesIO(b'Just some random string\n') + t = self.TextIOWrapper(r, 'utf-8') + + # TextIOwrapper will not read the full string, because + # we truncate it to a multiple of the native int size + # so that we can construct a more complex memoryview. + bytes_val = _to_memoryview(r.getvalue()).tobytes() + + self.assertEqual(t.read(200), bytes_val.decode('utf-8')) + +class MemviewBytesIO(io.BytesIO): + '''A BytesIO object whose read method returns memoryviews + rather than bytes''' + + def read1(self, len_): + return _to_memoryview(super().read1(len_)) + + def read(self, len_): + return _to_memoryview(super().read(len_)) + +def _to_memoryview(buf): + '''Convert bytes-object *buf* to a non-trivial memoryview''' + + arr = array.array('i') + idx = len(buf) - len(buf) % arr.itemsize + arr.frombytes(buf[:idx]) + return memoryview(arr) class CTextIOWrapperTest(TextIOWrapperTest): io = io diff --git a/Lib/test/test_ipaddress.py b/Lib/test/test_ipaddress.py index f2947b9c8a..0b71bf81b4 100644 --- a/Lib/test/test_ipaddress.py +++ b/Lib/test/test_ipaddress.py @@ -628,6 +628,119 @@ class IpaddrUnitTest(unittest.TestCase): self.assertEqual("IPv6Interface('::1/128')", repr(ipaddress.IPv6Interface('::1'))) + # issue #16531: constructing IPv4Network from a (address, mask) tuple + def testIPv4Tuple(self): + # /32 + ip = ipaddress.IPv4Address('192.0.2.1') + net = ipaddress.IPv4Network('192.0.2.1/32') + self.assertEqual(ipaddress.IPv4Network(('192.0.2.1', 32)), net) + self.assertEqual(ipaddress.IPv4Network((ip, 32)), net) + self.assertEqual(ipaddress.IPv4Network((3221225985, 32)), net) + self.assertEqual(ipaddress.IPv4Network(('192.0.2.1', + '255.255.255.255')), net) + self.assertEqual(ipaddress.IPv4Network((ip, + '255.255.255.255')), net) + self.assertEqual(ipaddress.IPv4Network((3221225985, + '255.255.255.255')), net) + # strict=True and host bits set + with self.assertRaises(ValueError): + ipaddress.IPv4Network(('192.0.2.1', 24)) + with self.assertRaises(ValueError): + ipaddress.IPv4Network((ip, 24)) + with self.assertRaises(ValueError): + ipaddress.IPv4Network((3221225985, 24)) + with self.assertRaises(ValueError): + ipaddress.IPv4Network(('192.0.2.1', '255.255.255.0')) + with self.assertRaises(ValueError): + ipaddress.IPv4Network((ip, '255.255.255.0')) + with self.assertRaises(ValueError): + ipaddress.IPv4Network((3221225985, '255.255.255.0')) + # strict=False and host bits set + net = ipaddress.IPv4Network('192.0.2.0/24') + self.assertEqual(ipaddress.IPv4Network(('192.0.2.1', 24), + strict=False), net) + self.assertEqual(ipaddress.IPv4Network((ip, 24), + strict=False), net) + self.assertEqual(ipaddress.IPv4Network((3221225985, 24), + strict=False), net) + self.assertEqual(ipaddress.IPv4Network(('192.0.2.1', + '255.255.255.0'), + strict=False), net) + self.assertEqual(ipaddress.IPv4Network((ip, + '255.255.255.0'), + strict=False), net) + self.assertEqual(ipaddress.IPv4Network((3221225985, + '255.255.255.0'), + strict=False), net) + + # /24 + ip = ipaddress.IPv4Address('192.0.2.0') + net = ipaddress.IPv4Network('192.0.2.0/24') + self.assertEqual(ipaddress.IPv4Network(('192.0.2.0', + '255.255.255.0')), net) + self.assertEqual(ipaddress.IPv4Network((ip, + '255.255.255.0')), net) + self.assertEqual(ipaddress.IPv4Network((3221225984, + '255.255.255.0')), net) + self.assertEqual(ipaddress.IPv4Network(('192.0.2.0', 24)), net) + self.assertEqual(ipaddress.IPv4Network((ip, 24)), net) + self.assertEqual(ipaddress.IPv4Network((3221225984, 24)), net) + + self.assertEqual(ipaddress.IPv4Interface(('192.0.2.1', 24)), + ipaddress.IPv4Interface('192.0.2.1/24')) + self.assertEqual(ipaddress.IPv4Interface((3221225985, 24)), + ipaddress.IPv4Interface('192.0.2.1/24')) + + # issue #16531: constructing IPv6Network from a (address, mask) tuple + def testIPv6Tuple(self): + # /128 + ip = ipaddress.IPv6Address('2001:db8::') + net = ipaddress.IPv6Network('2001:db8::/128') + self.assertEqual(ipaddress.IPv6Network(('2001:db8::', '128')), + net) + self.assertEqual(ipaddress.IPv6Network( + (42540766411282592856903984951653826560, 128)), + net) + self.assertEqual(ipaddress.IPv6Network((ip, '128')), + net) + ip = ipaddress.IPv6Address('2001:db8::') + net = ipaddress.IPv6Network('2001:db8::/96') + self.assertEqual(ipaddress.IPv6Network(('2001:db8::', '96')), + net) + self.assertEqual(ipaddress.IPv6Network( + (42540766411282592856903984951653826560, 96)), + net) + self.assertEqual(ipaddress.IPv6Network((ip, '96')), + net) + + # strict=True and host bits set + ip = ipaddress.IPv6Address('2001:db8::1') + with self.assertRaises(ValueError): + ipaddress.IPv6Network(('2001:db8::1', 96)) + with self.assertRaises(ValueError): + ipaddress.IPv6Network(( + 42540766411282592856903984951653826561, 96)) + with self.assertRaises(ValueError): + ipaddress.IPv6Network((ip, 96)) + # strict=False and host bits set + net = ipaddress.IPv6Network('2001:db8::/96') + self.assertEqual(ipaddress.IPv6Network(('2001:db8::1', 96), + strict=False), + net) + self.assertEqual(ipaddress.IPv6Network( + (42540766411282592856903984951653826561, 96), + strict=False), + net) + self.assertEqual(ipaddress.IPv6Network((ip, 96), strict=False), + net) + + # /96 + self.assertEqual(ipaddress.IPv6Interface(('2001:db8::1', '96')), + ipaddress.IPv6Interface('2001:db8::1/96')) + self.assertEqual(ipaddress.IPv6Interface( + (42540766411282592856903984951653826561, '96')), + ipaddress.IPv6Interface('2001:db8::1/96')) + # issue57 def testAddressIntMath(self): self.assertEqual(ipaddress.IPv4Address('1.1.1.1') + 255, @@ -1593,6 +1706,14 @@ class IpaddrUnitTest(unittest.TestCase): addr3.exploded) self.assertEqual('192.168.178.1', addr4.exploded) + def testReversePointer(self): + addr1 = ipaddress.IPv4Address('127.0.0.1') + addr2 = ipaddress.IPv6Address('2001:db8::1') + self.assertEqual('1.0.0.127.in-addr.arpa', addr1.reverse_pointer) + self.assertEqual('1.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.8.' + + 'b.d.0.1.0.0.2.ip6.arpa', + addr2.reverse_pointer) + def testIntRepresentation(self): self.assertEqual(16909060, int(self.ipv4_address)) self.assertEqual(42540616829182469433547762482097946625, diff --git a/Lib/test/test_json/test_tool.py b/Lib/test/test_json/test_tool.py index 0c39e56837..5484a8a7ca 100644 --- a/Lib/test/test_json/test_tool.py +++ b/Lib/test/test_json/test_tool.py @@ -55,6 +55,7 @@ class TestTool(unittest.TestCase): def test_infile_stdout(self): infile = self._create_infile() rc, out, err = assert_python_ok('-m', 'json.tool', infile) + self.assertEqual(rc, 0) self.assertEqual(out.splitlines(), self.expect.encode().splitlines()) self.assertEqual(err, b'') @@ -65,5 +66,12 @@ class TestTool(unittest.TestCase): self.addCleanup(os.remove, outfile) with open(outfile, "r") as fp: self.assertEqual(fp.read(), self.expect) + self.assertEqual(rc, 0) self.assertEqual(out, b'') self.assertEqual(err, b'') + + def test_help_flag(self): + rc, out, err = assert_python_ok('-m', 'json.tool', '-h') + self.assertEqual(rc, 0) + self.assertTrue(out.startswith(b'usage: ')) + self.assertEqual(err, b'') diff --git a/Lib/test/test_math.py b/Lib/test/test_math.py index 48f84ba732..c9f3f16128 100644 --- a/Lib/test/test_math.py +++ b/Lib/test/test_math.py @@ -422,9 +422,17 @@ class MathTests(unittest.TestCase): self.assertEqual(math.factorial(i), py_factorial(i)) self.assertRaises(ValueError, math.factorial, -1) self.assertRaises(ValueError, math.factorial, -1.0) + self.assertRaises(ValueError, math.factorial, -10**100) + self.assertRaises(ValueError, math.factorial, -1e100) self.assertRaises(ValueError, math.factorial, math.pi) - self.assertRaises(OverflowError, math.factorial, sys.maxsize+1) - self.assertRaises(OverflowError, math.factorial, 10e100) + + # Other implementations may place different upper bounds. + @support.cpython_only + def testFactorialHugeInputs(self): + # Currently raises ValueError for inputs that are too large + # to fit into a C long. + self.assertRaises(OverflowError, math.factorial, 10**100) + self.assertRaises(OverflowError, math.factorial, 1e100) def testFloor(self): self.assertRaises(TypeError, math.floor) diff --git a/Lib/test/test_module.py b/Lib/test/test_module.py index 1230293670..9da3536333 100644 --- a/Lib/test/test_module.py +++ b/Lib/test/test_module.py @@ -30,6 +30,22 @@ class ModuleTests(unittest.TestCase): pass self.assertEqual(foo.__doc__, ModuleType.__doc__) + def test_unintialized_missing_getattr(self): + # Issue 8297 + # test the text in the AttributeError of an uninitialized module + foo = ModuleType.__new__(ModuleType) + self.assertRaisesRegex( + AttributeError, "module has no attribute 'not_here'", + getattr, foo, "not_here") + + def test_missing_getattr(self): + # Issue 8297 + # test the text in the AttributeError + foo = ModuleType("foo") + self.assertRaisesRegex( + AttributeError, "module 'foo' has no attribute 'not_here'", + getattr, foo, "not_here") + def test_no_docstring(self): # Regularly initialized module, no docstring foo = ModuleType("foo") @@ -211,6 +227,14 @@ a = A(destroyed)""" b"len = len", b"shutil.rmtree = rmtree"}) + def test_descriptor_errors_propogate(self): + class Descr: + def __get__(self, o, t): + raise RuntimeError + class M(ModuleType): + melon = Descr() + self.assertRaises(RuntimeError, getattr, M("mymod"), "melon") + # frozen and namespace module reprs are tested in importlib. diff --git a/Lib/test/test_operator.py b/Lib/test/test_operator.py index ab58a98365..1bd0391ee2 100644 --- a/Lib/test/test_operator.py +++ b/Lib/test/test_operator.py @@ -203,6 +203,15 @@ class OperatorTestCase: self.assertRaises(TypeError, operator.mul, None, None) self.assertTrue(operator.mul(5, 2) == 10) + def test_matmul(self): + operator = self.module + self.assertRaises(TypeError, operator.matmul) + self.assertRaises(TypeError, operator.matmul, 42, 42) + class M: + def __matmul__(self, other): + return other - 1 + self.assertEqual(M() @ 42, 41) + def test_neg(self): operator = self.module self.assertRaises(TypeError, operator.neg) @@ -416,6 +425,7 @@ class OperatorTestCase: def __ilshift__ (self, other): return "ilshift" def __imod__ (self, other): return "imod" def __imul__ (self, other): return "imul" + def __imatmul__ (self, other): return "imatmul" def __ior__ (self, other): return "ior" def __ipow__ (self, other): return "ipow" def __irshift__ (self, other): return "irshift" @@ -430,6 +440,7 @@ class OperatorTestCase: self.assertEqual(operator.ilshift (c, 5), "ilshift") self.assertEqual(operator.imod (c, 5), "imod") self.assertEqual(operator.imul (c, 5), "imul") + self.assertEqual(operator.imatmul (c, 5), "imatmul") self.assertEqual(operator.ior (c, 5), "ior") self.assertEqual(operator.ipow (c, 5), "ipow") self.assertEqual(operator.irshift (c, 5), "irshift") diff --git a/Lib/test/test_os.py b/Lib/test/test_os.py index e129beff54..7d5ee69c45 100644 --- a/Lib/test/test_os.py +++ b/Lib/test/test_os.py @@ -39,6 +39,10 @@ try: import fcntl except ImportError: fcntl = None +try: + import _winapi +except ImportError: + _winapi = None from test.script_helper import assert_python_ok @@ -1773,6 +1777,37 @@ class Win32SymlinkTests(unittest.TestCase): shutil.rmtree(level1) +@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") +class Win32JunctionTests(unittest.TestCase): + junction = 'junctiontest' + junction_target = os.path.dirname(os.path.abspath(__file__)) + + def setUp(self): + assert os.path.exists(self.junction_target) + assert not os.path.exists(self.junction) + + def tearDown(self): + if os.path.exists(self.junction): + # os.rmdir delegates to Windows' RemoveDirectoryW, + # which removes junction points safely. + os.rmdir(self.junction) + + def test_create_junction(self): + _winapi.CreateJunction(self.junction_target, self.junction) + self.assertTrue(os.path.exists(self.junction)) + self.assertTrue(os.path.isdir(self.junction)) + + # Junctions are not recognized as links. + self.assertFalse(os.path.islink(self.junction)) + + def test_unlink_removes_junction(self): + _winapi.CreateJunction(self.junction_target, self.junction) + self.assertTrue(os.path.exists(self.junction)) + + os.unlink(self.junction) + self.assertFalse(os.path.exists(self.junction)) + + @support.skip_unless_symlink class NonLocalSymlinkTests(unittest.TestCase): @@ -2544,6 +2579,7 @@ def test_main(): RemoveDirsTests, CPUCountTests, FDInheritanceTests, + Win32JunctionTests, ) if __name__ == "__main__": diff --git a/Lib/test/test_pathlib.py b/Lib/test/test_pathlib.py index 6378d8c7f9..fd9cf232a1 100644 --- a/Lib/test/test_pathlib.py +++ b/Lib/test/test_pathlib.py @@ -1251,6 +1251,26 @@ class _BasePathTest(object): p = self.cls.cwd() self._test_cwd(p) + def test_samefile(self): + fileA_path = os.path.join(BASE, 'fileA') + fileB_path = os.path.join(BASE, 'dirB', 'fileB') + p = self.cls(fileA_path) + pp = self.cls(fileA_path) + q = self.cls(fileB_path) + self.assertTrue(p.samefile(fileA_path)) + self.assertTrue(p.samefile(pp)) + self.assertFalse(p.samefile(fileB_path)) + self.assertFalse(p.samefile(q)) + # Test the non-existent file case + non_existent = os.path.join(BASE, 'foo') + r = self.cls(non_existent) + self.assertRaises(FileNotFoundError, p.samefile, r) + self.assertRaises(FileNotFoundError, p.samefile, non_existent) + self.assertRaises(FileNotFoundError, r.samefile, p) + self.assertRaises(FileNotFoundError, r.samefile, non_existent) + self.assertRaises(FileNotFoundError, r.samefile, r) + self.assertRaises(FileNotFoundError, r.samefile, non_existent) + def test_empty_path(self): # The empty path points to '.' p = self.cls('') diff --git a/Lib/test/test_poplib.py b/Lib/test/test_poplib.py index d076fc1b9c..d695a0db12 100644 --- a/Lib/test/test_poplib.py +++ b/Lib/test/test_poplib.py @@ -349,23 +349,18 @@ class TestPOP3Class(TestCase): if SUPPORTS_SSL: + from test.test_ftplib import SSLConnection - class DummyPOP3_SSLHandler(DummyPOP3Handler): + class DummyPOP3_SSLHandler(SSLConnection, DummyPOP3Handler): def __init__(self, conn): asynchat.async_chat.__init__(self, conn) - ssl_socket = ssl.wrap_socket(self.socket, certfile=CERTFILE, - server_side=True, - do_handshake_on_connect=False) - self.del_channel() - self.set_socket(ssl_socket) - # Must try handshake before calling push() - self.tls_active = True - self.tls_starting = True - self._do_tls_handshake() + self.secure_connection() self.set_terminator(b"\r\n") self.in_buffer = [] self.push('+OK dummy pop3 server ready. <timestamp>') + self.tls_active = True + self.tls_starting = False @requires_ssl diff --git a/Lib/test/test_selectors.py b/Lib/test/test_selectors.py index 34edd7630d..8f83c90a7a 100644 --- a/Lib/test/test_selectors.py +++ b/Lib/test/test_selectors.py @@ -441,10 +441,18 @@ class KqueueSelectorTestCase(BaseSelectorTestCase, ScalableSelectorMixIn): SELECTOR = getattr(selectors, 'KqueueSelector', None) +@unittest.skipUnless(hasattr(selectors, 'DevpollSelector'), + "Test needs selectors.DevpollSelector") +class DevpollSelectorTestCase(BaseSelectorTestCase, ScalableSelectorMixIn): + + SELECTOR = getattr(selectors, 'DevpollSelector', None) + + + def test_main(): tests = [DefaultSelectorTestCase, SelectSelectorTestCase, PollSelectorTestCase, EpollSelectorTestCase, - KqueueSelectorTestCase] + KqueueSelectorTestCase, DevpollSelectorTestCase] support.run_unittest(*tests) support.reap_children() diff --git a/Lib/test/test_set.py b/Lib/test/test_set.py index bfef621093..992a4ceda0 100644 --- a/Lib/test/test_set.py +++ b/Lib/test/test_set.py @@ -929,7 +929,7 @@ class TestBasicOpsString(TestBasicOps, unittest.TestCase): class TestBasicOpsBytes(TestBasicOps, unittest.TestCase): def setUp(self): - self.case = "string set" + self.case = "bytes set" self.values = [b"a", b"b", b"c"] self.set = set(self.values) self.dup = set(self.values) diff --git a/Lib/test/test_signal.py b/Lib/test/test_signal.py index 74f74af0b4..92747cf109 100644 --- a/Lib/test/test_signal.py +++ b/Lib/test/test_signal.py @@ -1,6 +1,7 @@ import unittest from test import support from contextlib import closing +import enum import gc import pickle import select @@ -39,6 +40,23 @@ def ignoring_eintr(__func, *args, **kwargs): return None +class GenericTests(unittest.TestCase): + + @unittest.skipIf(threading is None, "test needs threading module") + def test_enums(self): + for name in dir(signal): + sig = getattr(signal, name) + if name in {'SIG_DFL', 'SIG_IGN'}: + self.assertIsInstance(sig, signal.Handlers) + elif name in {'SIG_BLOCK', 'SIG_UNBLOCK', 'SIG_SETMASK'}: + self.assertIsInstance(sig, signal.Sigmasks) + elif name.startswith('SIG') and not name.startswith('SIG_'): + self.assertIsInstance(sig, signal.Signals) + elif name.startswith('CTRL_'): + self.assertIsInstance(sig, signal.Signals) + self.assertEqual(sys.platform, "win32") + + @unittest.skipIf(sys.platform == "win32", "Not valid on Windows") class InterProcessSignalTests(unittest.TestCase): MAX_DURATION = 20 # Entire test should last at most 20 sec. @@ -195,6 +213,7 @@ class PosixTests(unittest.TestCase): def test_getsignal(self): hup = signal.signal(signal.SIGHUP, self.trivial_signal_handler) + self.assertIsInstance(hup, signal.Handlers) self.assertEqual(signal.getsignal(signal.SIGHUP), self.trivial_signal_handler) signal.signal(signal.SIGHUP, hup) @@ -271,7 +290,7 @@ class WakeupSignalTests(unittest.TestCase): os.close(read) os.close(write) - """.format(signals, ordered, test_body) + """.format(tuple(map(int, signals)), ordered, test_body) assert_python_ok('-c', code) @@ -604,6 +623,8 @@ class PendingSignalsTests(unittest.TestCase): signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) os.kill(os.getpid(), signum) pending = signal.sigpending() + for sig in pending: + assert isinstance(sig, signal.Signals), repr(pending) if pending != {signum}: raise Exception('%s != {%s}' % (pending, signum)) try: @@ -660,6 +681,7 @@ class PendingSignalsTests(unittest.TestCase): code = '''if 1: import signal import sys + from signal import Signals def handler(signum, frame): 1/0 @@ -702,6 +724,7 @@ class PendingSignalsTests(unittest.TestCase): def test(signum): signal.alarm(1) received = signal.sigwait([signum]) + assert isinstance(received, signal.Signals), received if received != signum: raise Exception('received %s, not %s' % (received, signum)) ''') @@ -842,8 +865,14 @@ class PendingSignalsTests(unittest.TestCase): def kill(signum): os.kill(os.getpid(), signum) + def check_mask(mask): + for sig in mask: + assert isinstance(sig, signal.Signals), repr(sig) + def read_sigmask(): - return signal.pthread_sigmask(signal.SIG_BLOCK, []) + sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, []) + check_mask(sigmask) + return sigmask signum = signal.SIGUSR1 @@ -852,6 +881,7 @@ class PendingSignalsTests(unittest.TestCase): # Unblock SIGUSR1 (and copy the old mask) to test our signal handler old_mask = signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) + check_mask(old_mask) try: kill(signum) except ZeroDivisionError: @@ -861,11 +891,13 @@ class PendingSignalsTests(unittest.TestCase): # Block and then raise SIGUSR1. The signal is blocked: the signal # handler is not called, and the signal is now pending - signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) + mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) + check_mask(mask) kill(signum) # Check the new mask blocked = read_sigmask() + check_mask(blocked) if signum not in blocked: raise Exception("%s not in %s" % (signum, blocked)) if old_mask ^ blocked != {signum}: @@ -928,7 +960,7 @@ class PendingSignalsTests(unittest.TestCase): def test_main(): try: - support.run_unittest(PosixTests, InterProcessSignalTests, + support.run_unittest(GenericTests, PosixTests, InterProcessSignalTests, WakeupFDTests, WakeupSignalTests, SiginterruptTest, ItimerTest, WindowsSignalTests, PendingSignalsTests) diff --git a/Lib/test/test_socketserver.py b/Lib/test/test_socketserver.py index 0617b30a69..8e0fde497c 100644 --- a/Lib/test/test_socketserver.py +++ b/Lib/test/test_socketserver.py @@ -222,38 +222,6 @@ class SocketServerTest(unittest.TestCase): socketserver.DatagramRequestHandler, self.dgram_examine) - @contextlib.contextmanager - def mocked_select_module(self): - """Mocks the select.select() call to raise EINTR for first call""" - old_select = select.select - - class MockSelect: - def __init__(self): - self.called = 0 - - def __call__(self, *args): - self.called += 1 - if self.called == 1: - # raise the exception on first call - raise OSError(errno.EINTR, os.strerror(errno.EINTR)) - else: - # Return real select value for consecutive calls - return old_select(*args) - - select.select = MockSelect() - try: - yield select.select - finally: - select.select = old_select - - def test_InterruptServerSelectCall(self): - with self.mocked_select_module() as mock_select: - pid = self.run_server(socketserver.TCPServer, - socketserver.StreamRequestHandler, - self.stream_examine) - # Make sure select was called again: - self.assertGreater(mock_select.called, 1) - # Alas, on Linux (at least) recvfrom() doesn't return a meaningful # client address so this cannot work: diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py index 2b3de1f477..f72fb15cca 100644 --- a/Lib/test/test_ssl.py +++ b/Lib/test/test_ssl.py @@ -86,6 +86,12 @@ def have_verify_flags(): # 0.9.8 or higher return ssl.OPENSSL_VERSION_INFO >= (0, 9, 8, 0, 15) +def utc_offset(): #NOTE: ignore issues like #1647654 + # local time = utc time + utc offset + if time.daylight and time.localtime().tm_isdst > 0: + return -time.altzone # seconds + return -time.timezone + def asn1time(cert_time): # Some versions of OpenSSL ignore seconds, see #18207 # 0.9.8.i @@ -134,6 +140,14 @@ class BasicSocketTests(unittest.TestCase): self.assertIn(ssl.HAS_SNI, {True, False}) self.assertIn(ssl.HAS_ECDH, {True, False}) + def test_str_for_enums(self): + # Make sure that the PROTOCOL_* constants have enum-like string + # reprs. + proto = ssl.PROTOCOL_SSLv3 + self.assertEqual(str(proto), '_SSLMethod.PROTOCOL_SSLv3') + ctx = ssl.SSLContext(proto) + self.assertIs(ctx.protocol, proto) + def test_random(self): v = ssl.RAND_status() if support.verbose: @@ -643,6 +657,71 @@ class BasicSocketTests(unittest.TestCase): ctx.wrap_socket(s) self.assertEqual(str(cx.exception), "only stream sockets are supported") + def cert_time_ok(self, timestring, timestamp): + self.assertEqual(ssl.cert_time_to_seconds(timestring), timestamp) + + def cert_time_fail(self, timestring): + with self.assertRaises(ValueError): + ssl.cert_time_to_seconds(timestring) + + @unittest.skipUnless(utc_offset(), + 'local time needs to be different from UTC') + def test_cert_time_to_seconds_timezone(self): + # Issue #19940: ssl.cert_time_to_seconds() returns wrong + # results if local timezone is not UTC + self.cert_time_ok("May 9 00:00:00 2007 GMT", 1178668800.0) + self.cert_time_ok("Jan 5 09:34:43 2018 GMT", 1515144883.0) + + def test_cert_time_to_seconds(self): + timestring = "Jan 5 09:34:43 2018 GMT" + ts = 1515144883.0 + self.cert_time_ok(timestring, ts) + # accept keyword parameter, assert its name + self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts) + # accept both %e and %d (space or zero generated by strftime) + self.cert_time_ok("Jan 05 09:34:43 2018 GMT", ts) + # case-insensitive + self.cert_time_ok("JaN 5 09:34:43 2018 GmT", ts) + self.cert_time_fail("Jan 5 09:34 2018 GMT") # no seconds + self.cert_time_fail("Jan 5 09:34:43 2018") # no GMT + self.cert_time_fail("Jan 5 09:34:43 2018 UTC") # not GMT timezone + self.cert_time_fail("Jan 35 09:34:43 2018 GMT") # invalid day + self.cert_time_fail("Jon 5 09:34:43 2018 GMT") # invalid month + self.cert_time_fail("Jan 5 24:00:00 2018 GMT") # invalid hour + self.cert_time_fail("Jan 5 09:60:43 2018 GMT") # invalid minute + + newyear_ts = 1230768000.0 + # leap seconds + self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts) + # same timestamp + self.cert_time_ok("Jan 1 00:00:00 2009 GMT", newyear_ts) + + self.cert_time_ok("Jan 5 09:34:59 2018 GMT", 1515144899) + # allow 60th second (even if it is not a leap second) + self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900) + # allow 2nd leap second for compatibility with time.strptime() + self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901) + self.cert_time_fail("Jan 5 09:34:62 2018 GMT") # invalid seconds + + # no special treatement for the special value: + # 99991231235959Z (rfc 5280) + self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0) + + @support.run_with_locale('LC_ALL', '') + def test_cert_time_to_seconds_locale(self): + # `cert_time_to_seconds()` should be locale independent + + def local_february_name(): + return time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0)) + + if local_february_name().lower() == 'feb': + self.skipTest("locale-specific month name needs to be " + "different from C locale") + + # locale-independent + self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0) + self.cert_time_fail(local_february_name() + " 9 00:00:00 2007 GMT") + class ContextTests(unittest.TestCase): @@ -1371,14 +1450,12 @@ class NetworkedTests(unittest.TestCase): def test_get_server_certificate(self): def _test_get_server_certificate(host, port, cert=None): with support.transient_internet(host): - pem = ssl.get_server_certificate((host, port), - ssl.PROTOCOL_SSLv23) + pem = ssl.get_server_certificate((host, port)) if not pem: self.fail("No server certificate on %s:%s!" % (host, port)) try: pem = ssl.get_server_certificate((host, port), - ssl.PROTOCOL_SSLv23, ca_certs=CERTFILE) except ssl.SSLError as x: #should fail @@ -1388,7 +1465,6 @@ class NetworkedTests(unittest.TestCase): self.fail("Got server certificate %s for %s:%s!" % (pem, host, port)) pem = ssl.get_server_certificate((host, port), - ssl.PROTOCOL_SSLv23, ca_certs=cert) if not pem: self.fail("No server certificate on %s:%s!" % (host, port)) @@ -2471,6 +2547,36 @@ else: s.write(b"over\n") s.close() + def test_nonblocking_send(self): + server = ThreadedEchoServer(CERTFILE, + certreqs=ssl.CERT_NONE, + ssl_version=ssl.PROTOCOL_TLSv1, + cacerts=CERTFILE, + chatty=True, + connectionchatty=False) + with server: + s = ssl.wrap_socket(socket.socket(), + server_side=False, + certfile=CERTFILE, + ca_certs=CERTFILE, + cert_reqs=ssl.CERT_NONE, + ssl_version=ssl.PROTOCOL_TLSv1) + s.connect((HOST, server.port)) + s.setblocking(False) + + # If we keep sending data, at some point the buffers + # will be full and the call will block + buf = bytearray(8192) + def fill_buffer(): + while True: + s.send(buf) + self.assertRaises((ssl.SSLWantWriteError, + ssl.SSLWantReadError), fill_buffer) + + # Now read all the output and discard it + s.setblocking(True) + s.close() + def test_handshake_timeout(self): # Issue #5103: SSL handshake must respect the socket timeout server = socket.socket(socket.AF_INET) diff --git a/Lib/test/test_sys.py b/Lib/test/test_sys.py index a68ed08df7..4eadd4b9d2 100644 --- a/Lib/test/test_sys.py +++ b/Lib/test/test_sys.py @@ -635,6 +635,53 @@ class SysModuleTest(unittest.TestCase): expected = None self.check_fsencoding(fs_encoding, expected) + def c_locale_get_error_handler(self, isolated=False, encoding=None): + # Force the POSIX locale + env = os.environ.copy() + env["LC_ALL"] = "C" + code = '\n'.join(( + 'import sys', + 'def dump(name):', + ' std = getattr(sys, name)', + ' print("%s: %s" % (name, std.errors))', + 'dump("stdin")', + 'dump("stdout")', + 'dump("stderr")', + )) + args = [sys.executable, "-c", code] + if isolated: + args.append("-I") + elif encoding: + env['PYTHONIOENCODING'] = encoding + p = subprocess.Popen(args, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + env=env, + universal_newlines=True) + stdout, stderr = p.communicate() + return stdout + + def test_c_locale_surrogateescape(self): + out = self.c_locale_get_error_handler(isolated=True) + self.assertEqual(out, + 'stdin: surrogateescape\n' + 'stdout: surrogateescape\n' + 'stderr: backslashreplace\n') + + # replace the default error handler + out = self.c_locale_get_error_handler(encoding=':strict') + self.assertEqual(out, + 'stdin: strict\n' + 'stdout: strict\n' + 'stderr: backslashreplace\n') + + # force the encoding + out = self.c_locale_get_error_handler(encoding='iso8859-1') + self.assertEqual(out, + 'stdin: surrogateescape\n' + 'stdout: surrogateescape\n' + 'stderr: backslashreplace\n') + def test_implementation(self): # This test applies to all implementations equally. @@ -925,7 +972,7 @@ class SizeofTest(unittest.TestCase): check(int, s) # (PyTypeObject + PyNumberMethods + PyMappingMethods + # PySequenceMethods + PyBufferProcs + 4P) - s = vsize('P2n15Pl4Pn9Pn11PIP') + struct.calcsize('34P 3P 10P 2P 4P') + s = vsize('P2n17Pl4Pn9Pn11PIP') + struct.calcsize('34P 3P 10P 2P 4P') # Separate block for PyDictKeysObject with 4 entries s += struct.calcsize("2nPn") + 4*struct.calcsize("n2P") # class diff --git a/Lib/test/test_tarfile.py b/Lib/test/test_tarfile.py index 92f8bfe920..d828979c47 100644 --- a/Lib/test/test_tarfile.py +++ b/Lib/test/test_tarfile.py @@ -1,7 +1,6 @@ import sys import os import io -import shutil from hashlib import md5 import unittest @@ -456,16 +455,16 @@ class MiscReadTestBase(CommonReadTest): # Test hardlink extraction (e.g. bug #857297). with tarfile.open(tarname, errorlevel=1, encoding="iso8859-1") as tar: tar.extract("ustar/regtype", TEMPDIR) - self.addCleanup(os.remove, os.path.join(TEMPDIR, "ustar/regtype")) + self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/regtype")) tar.extract("ustar/lnktype", TEMPDIR) - self.addCleanup(os.remove, os.path.join(TEMPDIR, "ustar/lnktype")) + self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/lnktype")) with open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb") as f: data = f.read() self.assertEqual(md5sum(data), md5_regtype) tar.extract("ustar/symtype", TEMPDIR) - self.addCleanup(os.remove, os.path.join(TEMPDIR, "ustar/symtype")) + self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/symtype")) with open(os.path.join(TEMPDIR, "ustar/symtype"), "rb") as f: data = f.read() self.assertEqual(md5sum(data), md5_regtype) @@ -498,7 +497,7 @@ class MiscReadTestBase(CommonReadTest): self.assertEqual(tarinfo.mtime, file_mtime, errmsg) finally: tar.close() - shutil.rmtree(DIR) + support.rmtree(DIR) def test_extract_directory(self): dirtype = "ustar/dirtype" @@ -513,7 +512,7 @@ class MiscReadTestBase(CommonReadTest): if sys.platform != "win32": self.assertEqual(os.stat(extracted).st_mode & 0o777, 0o755) finally: - shutil.rmtree(DIR) + support.rmtree(DIR) def test_init_close_fobj(self): # Issue #7341: Close the internal file object in the TarFile @@ -877,7 +876,7 @@ class GNUReadTest(LongnameTest, ReadTest, unittest.TestCase): fobj.seek(4096) fobj.truncate() s = os.stat(name) - os.remove(name) + support.unlink(name) return s.st_blocks == 0 else: return False @@ -1010,7 +1009,7 @@ class WriteTest(WriteTestBase, unittest.TestCase): finally: tar.close() finally: - os.rmdir(path) + support.rmdir(path) @unittest.skipUnless(hasattr(os, "link"), "Missing hardlink implementation") @@ -1030,8 +1029,8 @@ class WriteTest(WriteTestBase, unittest.TestCase): finally: tar.close() finally: - os.remove(target) - os.remove(link) + support.unlink(target) + support.unlink(link) @support.skip_unless_symlink def test_symlink_size(self): @@ -1045,7 +1044,7 @@ class WriteTest(WriteTestBase, unittest.TestCase): finally: tar.close() finally: - os.remove(path) + support.unlink(path) def test_add_self(self): # Test for #1257255. @@ -1092,7 +1091,7 @@ class WriteTest(WriteTestBase, unittest.TestCase): finally: tar.close() finally: - shutil.rmtree(tempdir) + support.rmtree(tempdir) def test_filter(self): tempdir = os.path.join(TEMPDIR, "filter") @@ -1128,7 +1127,7 @@ class WriteTest(WriteTestBase, unittest.TestCase): finally: tar.close() finally: - shutil.rmtree(tempdir) + support.rmtree(tempdir) # Guarantee that stored pathnames are not modified. Don't # remove ./ or ../ or double slashes. Still make absolute @@ -1156,9 +1155,9 @@ class WriteTest(WriteTestBase, unittest.TestCase): tar.close() if not dir: - os.remove(foo) + support.unlink(foo) else: - os.rmdir(foo) + support.rmdir(foo) self.assertEqual(t.name, cmp_path or path.replace(os.sep, "/")) @@ -1189,8 +1188,8 @@ class WriteTest(WriteTestBase, unittest.TestCase): finally: tar.close() finally: - os.unlink(temparchive) - shutil.rmtree(tempdir) + support.unlink(temparchive) + support.rmtree(tempdir) def test_pathnames(self): self._test_pathname("foo") @@ -1290,7 +1289,7 @@ class StreamWriteTest(WriteTestBase, unittest.TestCase): # Test for issue #8464: Create files with correct # permissions. if os.path.exists(tmpname): - os.remove(tmpname) + support.unlink(tmpname) original_umask = os.umask(0o022) try: @@ -1644,7 +1643,7 @@ class AppendTestBase: def setUp(self): self.tarname = tmpname if os.path.exists(self.tarname): - os.remove(self.tarname) + support.unlink(self.tarname) def _create_testtar(self, mode="w:"): with tarfile.open(tarname, encoding="iso8859-1") as src: @@ -2151,7 +2150,7 @@ def setUpModule(): def tearDownModule(): if os.path.exists(TEMPDIR): - shutil.rmtree(TEMPDIR) + support.rmtree(TEMPDIR) if __name__ == "__main__": unittest.main() diff --git a/Lib/test/test_tokenize.py b/Lib/test/test_tokenize.py index 38611a79ee..8f74a06df5 100644 --- a/Lib/test/test_tokenize.py +++ b/Lib/test/test_tokenize.py @@ -464,7 +464,7 @@ Additive Multiplicative - >>> dump_tokens("x = 1//1*1/5*12%0x12") + >>> dump_tokens("x = 1//1*1/5*12%0x12@42") ENCODING 'utf-8' (0, 0) (0, 0) NAME 'x' (1, 0) (1, 1) OP '=' (1, 2) (1, 3) @@ -479,6 +479,8 @@ Multiplicative NUMBER '12' (1, 13) (1, 15) OP '%' (1, 15) (1, 16) NUMBER '0x12' (1, 16) (1, 20) + OP '@' (1, 20) (1, 21) + NUMBER '42' (1, 21) (1, 23) Unary @@ -1154,6 +1156,7 @@ class TestTokenize(TestCase): self.assertExactTypeEqual('//', token.DOUBLESLASH) self.assertExactTypeEqual('//=', token.DOUBLESLASHEQUAL) self.assertExactTypeEqual('@', token.AT) + self.assertExactTypeEqual('@=', token.ATEQUAL) self.assertExactTypeEqual('a**2+b**2==c**2', NAME, token.DOUBLESTAR, NUMBER, diff --git a/Lib/test/test_tuple.py b/Lib/test/test_tuple.py index e41711c8e6..14c6430869 100644 --- a/Lib/test/test_tuple.py +++ b/Lib/test/test_tuple.py @@ -201,6 +201,14 @@ class TupleTest(seq_tests.CommonTest): with self.assertRaises(TypeError): [3,] + T((1,2)) + def test_lexicographic_ordering(self): + # Issue 21100 + a = self.type2test([1, 2]) + b = self.type2test([1, 2, 0]) + c = self.type2test([1, 3]) + self.assertLess(a, b) + self.assertLess(b, c) + def test_main(): support.run_unittest(TupleTest) diff --git a/Lib/test/test_types.py b/Lib/test/test_types.py index ec10752e6a..11d95465a8 100644 --- a/Lib/test/test_types.py +++ b/Lib/test/test_types.py @@ -343,6 +343,8 @@ class TypesTests(unittest.TestCase): self.assertRaises(ValueError, 3 .__format__, ",n") # can't have ',' with 'c' self.assertRaises(ValueError, 3 .__format__, ",c") + # can't have '#' with 'c' + self.assertRaises(ValueError, 3 .__format__, "#c") # ensure that only int and float type specifiers work for format_spec in ([chr(x) for x in range(ord('a'), ord('z')+1)] + diff --git a/Lib/test/test_unicode.py b/Lib/test/test_unicode.py index 9ae31d17d2..64e6bf5f6a 100644 --- a/Lib/test/test_unicode.py +++ b/Lib/test/test_unicode.py @@ -8,6 +8,7 @@ Written by Marc-Andre Lemburg (mal@lemburg.com). import _string import codecs import itertools +import operator import struct import sys import unittest @@ -250,6 +251,7 @@ class UnicodeTest(string_tests.CommonTest, {ord('a'): None, ord('b'): ''}) self.checkequalnofix('xyyx', 'xzx', 'translate', {ord('z'): 'yy'}) + # this needs maketrans() self.checkequalnofix('abababc', 'abababc', 'translate', {'b': '<i>'}) @@ -259,6 +261,33 @@ class UnicodeTest(string_tests.CommonTest, tbl = self.type2test.maketrans('abc', 'xyz', 'd') self.checkequalnofix('xyzzy', 'abdcdcbdddd', 'translate', tbl) + # various tests switching from ASCII to latin1 or the opposite; + # same length, remove a letter, or replace with a longer string. + self.assertEqual("[a]".translate(str.maketrans('a', 'X')), + "[X]") + self.assertEqual("[a]".translate(str.maketrans({'a': 'X'})), + "[X]") + self.assertEqual("[a]".translate(str.maketrans({'a': None})), + "[]") + self.assertEqual("[a]".translate(str.maketrans({'a': 'XXX'})), + "[XXX]") + self.assertEqual("[a]".translate(str.maketrans({'a': '\xe9'})), + "[\xe9]") + self.assertEqual("[a]".translate(str.maketrans({'a': '<\xe9>'})), + "[<\xe9>]") + self.assertEqual("[\xe9]".translate(str.maketrans({'\xe9': 'a'})), + "[a]") + self.assertEqual("[\xe9]".translate(str.maketrans({'\xe9': None})), + "[]") + + # invalid Unicode characters + invalid_char = 0x10ffff+1 + for before in "a\xe9\u20ac\U0010ffff": + mapping = str.maketrans({before: invalid_char}) + text = "[%s]" % before + self.assertRaises(ValueError, text.translate, mapping) + + # errors self.assertRaises(TypeError, self.type2test.maketrans) self.assertRaises(ValueError, self.type2test.maketrans, 'abc', 'defg') self.assertRaises(TypeError, self.type2test.maketrans, 2, 'def') @@ -1148,20 +1177,20 @@ class UnicodeTest(string_tests.CommonTest, self.assertEqual('%.2s' % "a\xe9\u20ac", 'a\xe9') #issue 19995 - class PsuedoInt: + class PseudoInt: def __init__(self, value): self.value = int(value) def __int__(self): return self.value def __index__(self): return self.value - class PsuedoFloat: + class PseudoFloat: def __init__(self, value): self.value = float(value) def __int__(self): return int(self.value) - pi = PsuedoFloat(3.1415) - letter_m = PsuedoInt(109) + pi = PseudoFloat(3.1415) + letter_m = PseudoInt(109) self.assertEqual('%x' % 42, '2a') self.assertEqual('%X' % 15, 'F') self.assertEqual('%o' % 9, '11') @@ -1170,11 +1199,11 @@ class UnicodeTest(string_tests.CommonTest, self.assertEqual('%X' % letter_m, '6D') self.assertEqual('%o' % letter_m, '155') self.assertEqual('%c' % letter_m, 'm') - self.assertWarns(DeprecationWarning, '%x'.__mod__, pi), - self.assertWarns(DeprecationWarning, '%x'.__mod__, 3.14), - self.assertWarns(DeprecationWarning, '%X'.__mod__, 2.11), - self.assertWarns(DeprecationWarning, '%o'.__mod__, 1.79), - self.assertWarns(DeprecationWarning, '%c'.__mod__, pi), + self.assertRaisesRegex(TypeError, '%x format: an integer is required, not float', operator.mod, '%x', 3.14), + self.assertRaisesRegex(TypeError, '%X format: an integer is required, not float', operator.mod, '%X', 2.11), + self.assertRaisesRegex(TypeError, '%o format: an integer is required, not float', operator.mod, '%o', 1.79), + self.assertRaisesRegex(TypeError, '%x format: an integer is required, not PseudoFloat', operator.mod, '%x', pi), + self.assertRaises(TypeError, operator.mod, '%c', pi), def test_formatting_with_enum(self): # issue18780 diff --git a/Lib/test/test_wait3.py b/Lib/test/test_wait3.py index f6a065d850..bb71481adc 100644 --- a/Lib/test/test_wait3.py +++ b/Lib/test/test_wait3.py @@ -18,7 +18,8 @@ class Wait3Test(ForkWait): # This many iterations can be required, since some previously run # tests (e.g. test_ctypes) could have spawned a lot of children # very quickly. - for i in range(30): + deadline = time.monotonic() + 10.0 + while time.monotonic() <= deadline: # wait3() shouldn't hang, but some of the buildbots seem to hang # in the forking tests. This is an attempt to fix the problem. spid, status, rusage = os.wait3(os.WNOHANG) diff --git a/Lib/test/test_wait4.py b/Lib/test/test_wait4.py index 352c11aade..b427a9b1a4 100644 --- a/Lib/test/test_wait4.py +++ b/Lib/test/test_wait4.py @@ -19,13 +19,14 @@ class Wait4Test(ForkWait): # Issue #11185: wait4 is broken on AIX and will always return 0 # with WNOHANG. option = 0 - for i in range(10): + deadline = time.monotonic() + 10.0 + while time.monotonic() <= deadline: # wait4() shouldn't hang, but some of the buildbots seem to hang # in the forking tests. This is an attempt to fix the problem. spid, status, rusage = os.wait4(cpid, option) if spid == cpid: break - time.sleep(1.0) + time.sleep(0.1) self.assertEqual(spid, cpid) self.assertEqual(status, 0, "cause = %d, exit = %d" % (status&0xff, status>>8)) self.assertTrue(rusage) diff --git a/Lib/test/test_warnings.py b/Lib/test/test_warnings.py index eec2c24218..cf7f747753 100644 --- a/Lib/test/test_warnings.py +++ b/Lib/test/test_warnings.py @@ -5,7 +5,7 @@ from io import StringIO import sys import unittest from test import support -from test.script_helper import assert_python_ok +from test.script_helper import assert_python_ok, assert_python_failure from test import warning_tests @@ -748,7 +748,19 @@ class EnvironmentVariableTests(BaseTest): "import sys; sys.stdout.write(str(sys.warnoptions))", PYTHONWARNINGS="ignore::DeprecationWarning") self.assertEqual(stdout, - b"['ignore::UnicodeWarning', 'ignore::DeprecationWarning']") + b"['ignore::DeprecationWarning', 'ignore::UnicodeWarning']") + + def test_conflicting_envvar_and_command_line(self): + rc, stdout, stderr = assert_python_failure("-Werror::DeprecationWarning", "-c", + "import sys, warnings; sys.stdout.write(str(sys.warnoptions)); " + "warnings.warn('Message', DeprecationWarning)", + PYTHONWARNINGS="default::DeprecationWarning") + self.assertEqual(stdout, + b"['default::DeprecationWarning', 'error::DeprecationWarning']") + self.assertEqual(stderr.splitlines(), + [b"Traceback (most recent call last):", + b" File \"<string>\", line 1, in <module>", + b"DeprecationWarning: Message"]) @unittest.skipUnless(sys.getfilesystemencoding() != 'ascii', 'requires non-ascii filesystemencoding') diff --git a/Lib/test/test_xmlrpc.py b/Lib/test/test_xmlrpc.py index 99b3eda13e..120c54f16c 100644 --- a/Lib/test/test_xmlrpc.py +++ b/Lib/test/test_xmlrpc.py @@ -713,6 +713,23 @@ class SimpleServerTestCase(BaseServerTestCase): conn.request('POST', '/RPC2 HTTP/1.0\r\nContent-Length: 100\r\n\r\nbye') conn.close() + def test_context_manager(self): + with xmlrpclib.ServerProxy(URL) as server: + server.add(2, 3) + self.assertNotEqual(server('transport')._connection, + (None, None)) + self.assertEqual(server('transport')._connection, + (None, None)) + + def test_context_manager_method_error(self): + try: + with xmlrpclib.ServerProxy(URL) as server: + server.add(2, "a") + except xmlrpclib.Fault: + pass + self.assertEqual(server('transport')._connection, + (None, None)) + class MultiPathServerTestCase(BaseServerTestCase): threadFunc = staticmethod(http_multi_server) @@ -898,6 +915,7 @@ class ServerProxyTestCase(unittest.TestCase): p = xmlrpclib.ServerProxy(self.url, transport=t) self.assertEqual(p('transport'), t) + # This is a contrived way to make a failure occur on the server side # in order to test the _send_traceback_header flag on the server class FailingMessageClass(http.client.HTTPMessage): diff --git a/Lib/test/test_zipfile.py b/Lib/test/test_zipfile.py index 1bef5750a1..2e232f38e1 100644 --- a/Lib/test/test_zipfile.py +++ b/Lib/test/test_zipfile.py @@ -3,7 +3,6 @@ import os import sys import importlib.util import time -import shutil import struct import zipfile import unittest @@ -12,7 +11,7 @@ import unittest from tempfile import TemporaryFile from random import randint, random, getrandbits -from test.support import (TESTFN, findfile, unlink, +from test.support import (TESTFN, findfile, unlink, rmtree, requires_zlib, requires_bz2, requires_lzma, captured_stdout, check_warnings) @@ -691,7 +690,7 @@ class PyZipFileTests(unittest.TestCase): self.assertNotIn('mod2.txt', names) finally: - shutil.rmtree(TESTFN2) + rmtree(TESTFN2) def test_write_python_directory_filtered(self): os.mkdir(TESTFN2) @@ -711,7 +710,7 @@ class PyZipFileTests(unittest.TestCase): self.assertNotIn('mod2.py', names) finally: - shutil.rmtree(TESTFN2) + rmtree(TESTFN2) def test_write_non_pyfile(self): with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: @@ -741,7 +740,7 @@ class PyZipFileTests(unittest.TestCase): self.assertNotIn('mod1.pyo', names) finally: - shutil.rmtree(TESTFN2) + rmtree(TESTFN2) class ExtractTests(unittest.TestCase): @@ -767,7 +766,7 @@ class ExtractTests(unittest.TestCase): os.remove(writtenfile) # remove the test file subdirectories - shutil.rmtree(os.path.join(os.getcwd(), 'ziptest2dir')) + rmtree(os.path.join(os.getcwd(), 'ziptest2dir')) def test_extract_all(self): with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: @@ -785,7 +784,7 @@ class ExtractTests(unittest.TestCase): os.remove(outfile) # remove the test file subdirectories - shutil.rmtree(os.path.join(os.getcwd(), 'ziptest2dir')) + rmtree(os.path.join(os.getcwd(), 'ziptest2dir')) def check_file(self, filename, content): self.assertTrue(os.path.isfile(filename)) @@ -867,12 +866,12 @@ class ExtractTests(unittest.TestCase): msg='extract %r: %r != %r' % (arcname, writtenfile, correctfile)) self.check_file(correctfile, content) - shutil.rmtree('target') + rmtree('target') with zipfile.ZipFile(TESTFN2, 'r') as zipfp: zipfp.extractall(targetpath) self.check_file(correctfile, content) - shutil.rmtree('target') + rmtree('target') correctfile = os.path.join(os.getcwd(), *fixedname.split('/')) @@ -881,12 +880,12 @@ class ExtractTests(unittest.TestCase): self.assertEqual(writtenfile, correctfile, msg="extract %r" % arcname) self.check_file(correctfile, content) - shutil.rmtree(fixedname.split('/')[0]) + rmtree(fixedname.split('/')[0]) with zipfile.ZipFile(TESTFN2, 'r') as zipfp: zipfp.extractall() self.check_file(correctfile, content) - shutil.rmtree(fixedname.split('/')[0]) + rmtree(fixedname.split('/')[0]) os.remove(TESTFN2) @@ -1628,7 +1627,7 @@ class TestWithDirectory(unittest.TestCase): self.assertTrue(zipf.filelist[0].filename.endswith("x/")) def tearDown(self): - shutil.rmtree(TESTFN2) + rmtree(TESTFN2) if os.path.exists(TESTFN): unlink(TESTFN) |