From 9ae9bd0e11d626ded92267d28340b70fef6824c2 Mon Sep 17 00:00:00 2001 From: Martin Panter Date: Mon, 13 Jun 2016 03:17:47 +0000 Subject: Issue #27136: Change test to use ::1 for better OS X Tiger compatibility --- Lib/test/test_asyncio/test_base_events.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'Lib') diff --git a/Lib/test/test_asyncio/test_base_events.py b/Lib/test/test_asyncio/test_base_events.py index 0807dfbf4c..206ebc69fe 100644 --- a/Lib/test/test_asyncio/test_base_events.py +++ b/Lib/test/test_asyncio/test_base_events.py @@ -1185,14 +1185,14 @@ class BaseEventLoopWithSelectorTests(test_utils.TestCase): test_utils.run_briefly(self.loop) # allow transport to close sock.family = socket.AF_INET6 - coro = self.loop.create_connection(asyncio.Protocol, '::2', 80) + coro = self.loop.create_connection(asyncio.Protocol, '::1', 80) t, p = self.loop.run_until_complete(coro) try: - # Without inet_pton we use getaddrinfo, which transforms ('::2', 80) - # to ('::0.0.0.2', 80, 0, 0). The last 0s are flow info, scope id. + # Without inet_pton we use getaddrinfo, which transforms ('::1', 80) + # to ('::1', 80, 0, 0). The last 0s are flow info, scope id. [address] = sock.connect.call_args[0] host, port = address[:2] - self.assertRegex(host, r'::(0\.)*2') + self.assertRegex(host, r'::(0\.)*1') self.assertEqual(port, 80) _, kwargs = m_socket.socket.call_args self.assertEqual(kwargs['family'], m_socket.AF_INET6) -- cgit v1.2.1 From 03b5e72c6323b1fe6315c8c33ad8c24be8d2430b Mon Sep 17 00:00:00 2001 From: Berker Peksag Date: Tue, 14 Jun 2016 00:42:50 +0300 Subject: Replace more boilerplate code with modern unittest features in sqlite3 tests --- Lib/sqlite3/test/dbapi.py | 3 +-- Lib/sqlite3/test/hooks.py | 4 ++-- Lib/sqlite3/test/regression.py | 12 +++--------- 3 files changed, 6 insertions(+), 13 deletions(-) (limited to 'Lib') diff --git a/Lib/sqlite3/test/dbapi.py b/Lib/sqlite3/test/dbapi.py index 6057805f65..903e599031 100644 --- a/Lib/sqlite3/test/dbapi.py +++ b/Lib/sqlite3/test/dbapi.py @@ -335,8 +335,7 @@ class CursorTests(unittest.TestCase): def CheckTotalChanges(self): self.cu.execute("insert into test(name) values ('foo')") self.cu.execute("insert into test(name) values ('foo')") - if self.cx.total_changes < 2: - self.fail("total changes reported wrong value") + self.assertLess(2, self.cx.total_changes, msg='total changes reported wrong value') # Checks for executemany: # Sequences are required by the DB-API, iterators diff --git a/Lib/sqlite3/test/hooks.py b/Lib/sqlite3/test/hooks.py index 67653aeeb9..de69569d1c 100644 --- a/Lib/sqlite3/test/hooks.py +++ b/Lib/sqlite3/test/hooks.py @@ -61,8 +61,8 @@ class CollationTests(unittest.TestCase): ) order by x collate mycoll """ result = con.execute(sql).fetchall() - if result[0][0] != "c" or result[1][0] != "b" or result[2][0] != "a": - self.fail("the expected order was not returned") + self.assertEqual(result, [('c',), ('b',), ('a',)], + msg='the expected order was not returned') con.create_collation("mycoll", None) with self.assertRaises(sqlite.OperationalError) as cm: diff --git a/Lib/sqlite3/test/regression.py b/Lib/sqlite3/test/regression.py index 85ace84d75..0cf9002d8c 100644 --- a/Lib/sqlite3/test/regression.py +++ b/Lib/sqlite3/test/regression.py @@ -134,17 +134,11 @@ class RegressionTests(unittest.TestCase): def CheckErrorMsgDecodeError(self): # When porting the module to Python 3.0, the error message about # decoding errors disappeared. This verifies they're back again. - failure = None - try: + with self.assertRaises(sqlite.OperationalError) as cm: self.con.execute("select 'xxx' || ? || 'yyy' colname", (bytes(bytearray([250])),)).fetchone() - failure = "should have raised an OperationalError with detailed description" - except sqlite.OperationalError as e: - msg = e.args[0] - if not msg.startswith("Could not decode to UTF-8 column 'colname' with text 'xxx"): - failure = "OperationalError did not have expected description text" - if failure: - self.fail(failure) + msg = "Could not decode to UTF-8 column 'colname' with text 'xxx" + self.assertIn(msg, str(cm.exception)) def CheckRegisterAdapter(self): """ -- cgit v1.2.1 From 6257f11378a3f1a6642afe32c614aa3fe8969117 Mon Sep 17 00:00:00 2001 From: Martin Panter Date: Tue, 14 Jun 2016 01:16:16 +0000 Subject: Issue #16182: Fix readline begidx, endidx, and use locale encoding Based on patch by Serhiy Storchaka. --- Lib/test/test_readline.py | 141 ++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 137 insertions(+), 4 deletions(-) (limited to 'Lib') diff --git a/Lib/test/test_readline.py b/Lib/test/test_readline.py index 35330ab076..34a55b25ee 100644 --- a/Lib/test/test_readline.py +++ b/Lib/test/test_readline.py @@ -1,15 +1,23 @@ """ Very minimal unittests for parts of the readline module. """ +from contextlib import ExitStack +from errno import EIO import os +import selectors +import subprocess +import sys import tempfile import unittest -from test.support import import_module, unlink +from test.support import import_module, unlink, TESTFN from test.support.script_helper import assert_python_ok # Skip tests if there is no readline module readline = import_module('readline') +@unittest.skipUnless(hasattr(readline, "clear_history"), + "The history update test cannot be run because the " + "clear_history method is not available.") class TestHistoryManipulation (unittest.TestCase): """ These tests were added to check that the libedit emulation on OSX and the @@ -17,9 +25,6 @@ class TestHistoryManipulation (unittest.TestCase): why the tests cover only a small subset of the interface. """ - @unittest.skipUnless(hasattr(readline, "clear_history"), - "The history update test cannot be run because the " - "clear_history method is not available.") def testHistoryUpdates(self): readline.clear_history() @@ -82,6 +87,21 @@ class TestHistoryManipulation (unittest.TestCase): # write_history_file can create the target readline.write_history_file(hfilename) + def test_nonascii_history(self): + readline.clear_history() + try: + readline.add_history("entrée 1") + except UnicodeEncodeError as err: + self.skipTest("Locale cannot encode test data: " + format(err)) + readline.add_history("entrée 2") + readline.replace_history_item(1, "entrée 22") + readline.write_history_file(TESTFN) + self.addCleanup(os.remove, TESTFN) + readline.clear_history() + readline.read_history_file(TESTFN) + self.assertEqual(readline.get_history_item(1), "entrée 1") + self.assertEqual(readline.get_history_item(2), "entrée 22") + class TestReadline(unittest.TestCase): @@ -96,6 +116,119 @@ class TestReadline(unittest.TestCase): TERM='xterm-256color') self.assertEqual(stdout, b'') + def test_nonascii(self): + try: + readline.add_history("\xEB\xEF") + except UnicodeEncodeError as err: + self.skipTest("Locale cannot encode test data: " + format(err)) + + script = r"""import readline + +if readline.__doc__ and "libedit" in readline.__doc__: + readline.parse_and_bind(r'bind ^B ed-prev-char') + readline.parse_and_bind(r'bind "\t" rl_complete') + readline.parse_and_bind('bind -s ^A "|t\xEB[after]"') +else: + readline.parse_and_bind(r'Control-b: backward-char') + readline.parse_and_bind(r'"\t": complete') + readline.parse_and_bind(r'set disable-completion off') + readline.parse_and_bind(r'set show-all-if-ambiguous off') + readline.parse_and_bind(r'set show-all-if-unmodified off') + readline.parse_and_bind('Control-a: "|t\xEB[after]"') + +def pre_input_hook(): + readline.insert_text("[\xEFnserted]") + readline.redisplay() +readline.set_pre_input_hook(pre_input_hook) + +def completer(text, state): + if text == "t\xEB": + if state == 0: + print("text", ascii(text)) + print("line", ascii(readline.get_line_buffer())) + print("indexes", readline.get_begidx(), readline.get_endidx()) + return "t\xEBnt" + if state == 1: + return "t\xEBxt" + if text == "t\xEBx" and state == 0: + return "t\xEBxt" + return None +readline.set_completer(completer) + +def display(substitution, matches, longest_match_length): + print("substitution", ascii(substitution)) + print("matches", ascii(matches)) +readline.set_completion_display_matches_hook(display) + +print("result", ascii(input())) +print("history", ascii(readline.get_history_item(1))) +""" + + input = b"\x01" # Ctrl-A, expands to "|t\xEB[after]" + input += b"\x02" * len("[after]") # Move cursor back + input += b"\t\t" # Display possible completions + input += b"x\t" # Complete "t\xEBx" -> "t\xEBxt" + input += b"\r" + output = run_pty(script, input) + self.assertIn(b"text 't\\xeb'\r\n", output) + self.assertIn(b"line '[\\xefnserted]|t\\xeb[after]'\r\n", output) + self.assertIn(b"indexes 11 13\r\n", output) + self.assertIn(b"substitution 't\\xeb'\r\n", output) + self.assertIn(b"matches ['t\\xebnt', 't\\xebxt']\r\n", output) + expected = br"'[\xefnserted]|t\xebxt[after]'" + self.assertIn(b"result " + expected + b"\r\n", output) + self.assertIn(b"history " + expected + b"\r\n", output) + + +def run_pty(script, input=b"dummy input\r"): + pty = import_module('pty') + output = bytearray() + [master, slave] = pty.openpty() + args = (sys.executable, '-c', script) + proc = subprocess.Popen(args, stdin=slave, stdout=slave, stderr=slave) + os.close(slave) + with ExitStack() as cleanup: + cleanup.enter_context(proc) + def terminate(proc): + try: + proc.terminate() + except ProcessLookupError: + # Workaround for Open/Net BSD bug (Issue 16762) + pass + cleanup.callback(terminate, proc) + cleanup.callback(os.close, master) + # Avoid using DefaultSelector and PollSelector. Kqueue() does not + # work with pseudo-terminals on OS X < 10.9 (Issue 20365) and Open + # BSD (Issue 20667). Poll() does not work with OS X 10.6 or 10.4 + # either (Issue 20472). Hopefully the file descriptor is low enough + # to use with select(). + sel = cleanup.enter_context(selectors.SelectSelector()) + sel.register(master, selectors.EVENT_READ | selectors.EVENT_WRITE) + os.set_blocking(master, False) + while True: + for [_, events] in sel.select(): + if events & selectors.EVENT_READ: + try: + chunk = os.read(master, 0x10000) + except OSError as err: + # Linux raises EIO when slave is closed (Issue 5380) + if err.errno != EIO: + raise + chunk = b"" + if not chunk: + return output + output.extend(chunk) + if events & selectors.EVENT_WRITE: + try: + input = input[os.write(master, input):] + except OSError as err: + # Apparently EIO means the slave was closed + if err.errno != EIO: + raise + input = b"" # Stop writing + if not input: + sel.modify(master, selectors.EVENT_READ) + if __name__ == "__main__": unittest.main() -- cgit v1.2.1 From 55f28b6d7dce4455f6f9230b9330b5a0285727be Mon Sep 17 00:00:00 2001 From: Martin Panter Date: Tue, 14 Jun 2016 01:27:11 +0000 Subject: Issue #22636: avoid using a shell in the ctypes.util module Replace os.popen() with subprocess.Popen. Based on patch by Victor Stinner. If the "gcc", "cc" or "objdump" command is not available, the code was supposed to raise an OSError exception. But there was a bug in the code. The shell code returns the exit code 10 if the required command is missing, and the code tries to check for the status 10. The problem is that os.popen() doesn't return the exit code directly, but a status which should be processed by os.WIFEXITED() and os.WEXITSTATUS(). In practice, the exception was never raised. The OSError exception was not documented and ctypes.util.find_library() is expected to return None if the library is not found. --- Lib/ctypes/test/test_find.py | 7 ++- Lib/ctypes/util.py | 119 +++++++++++++++++++++++++++---------------- 2 files changed, 82 insertions(+), 44 deletions(-) (limited to 'Lib') diff --git a/Lib/ctypes/test/test_find.py b/Lib/ctypes/test/test_find.py index e6bc19d7dd..20c5337a8b 100644 --- a/Lib/ctypes/test/test_find.py +++ b/Lib/ctypes/test/test_find.py @@ -1,5 +1,5 @@ import unittest -import os +import os, os.path import sys import test.support from ctypes import * @@ -64,6 +64,11 @@ class Test_OpenGL_libs(unittest.TestCase): self.skipTest('lib_gle not available') self.gle.gleGetJoinStyle + def test_shell_injection(self): + result = find_library('; echo Hello shell > ' + test.support.TESTFN) + self.assertFalse(os.path.lexists(test.support.TESTFN)) + self.assertIsNone(result) + # On platforms where the default shared library suffix is '.so', # at least some libraries can be loaded as attributes of the cdll # object, since ctypes now tries loading the lib again diff --git a/Lib/ctypes/util.py b/Lib/ctypes/util.py index 8ff4aff0ca..0b96e5953e 100644 --- a/Lib/ctypes/util.py +++ b/Lib/ctypes/util.py @@ -1,6 +1,7 @@ -import sys, os -import contextlib +import os +import shutil import subprocess +import sys # find_library(name) returns the pathname of a library, or None. if os.name == "nt": @@ -94,28 +95,43 @@ elif os.name == "posix": import re, tempfile def _findLib_gcc(name): - expr = r'[^\(\)\s]*lib%s\.[^\(\)\s]*' % re.escape(name) - fdout, ccout = tempfile.mkstemp() - os.close(fdout) - cmd = 'if type gcc >/dev/null 2>&1; then CC=gcc; elif type cc >/dev/null 2>&1; then CC=cc;else exit 10; fi;' \ - 'LANG=C LC_ALL=C $CC -Wl,-t -o ' + ccout + ' 2>&1 -l' + name + # Run GCC's linker with the -t (aka --trace) option and examine the + # library name it prints out. The GCC command will fail because we + # haven't supplied a proper program with main(), but that does not + # matter. + expr = os.fsencode(r'[^\(\)\s]*lib%s\.[^\(\)\s]*' % re.escape(name)) + + c_compiler = shutil.which('gcc') + if not c_compiler: + c_compiler = shutil.which('cc') + if not c_compiler: + # No C compiler available, give up + return None + + temp = tempfile.NamedTemporaryFile() try: - f = os.popen(cmd) - try: - trace = f.read() - finally: - rv = f.close() + args = [c_compiler, '-Wl,-t', '-o', temp.name, '-l' + name] + + env = dict(os.environ) + env['LC_ALL'] = 'C' + env['LANG'] = 'C' + proc = subprocess.Popen(args, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + env=env) + with proc: + trace = proc.stdout.read() finally: try: - os.unlink(ccout) + temp.close() except FileNotFoundError: + # Raised if the file was already removed, which is the normal + # behaviour of GCC if linking fails pass - if rv == 10: - raise OSError('gcc or cc command not found') res = re.search(expr, trace) if not res: return None - return res.group(0) + return os.fsdecode(res.group(0)) if sys.platform == "sunos5": @@ -123,55 +139,65 @@ elif os.name == "posix": def _get_soname(f): if not f: return None - cmd = "/usr/ccs/bin/dump -Lpv 2>/dev/null " + f - with contextlib.closing(os.popen(cmd)) as f: - data = f.read() - res = re.search(r'\[.*\]\sSONAME\s+([^\s]+)', data) + + proc = subprocess.Popen(("/usr/ccs/bin/dump", "-Lpv", f), + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL) + with proc: + data = proc.stdout.read() + res = re.search(br'\[.*\]\sSONAME\s+([^\s]+)', data) if not res: return None - return res.group(1) + return os.fsdecode(res.group(1)) else: def _get_soname(f): # assuming GNU binutils / ELF if not f: return None - cmd = 'if ! type objdump >/dev/null 2>&1; then exit 10; fi;' \ - "objdump -p -j .dynamic 2>/dev/null " + f - f = os.popen(cmd) - try: - dump = f.read() - finally: - rv = f.close() - if rv == 10: - raise OSError('objdump command not found') - res = re.search(r'\sSONAME\s+([^\s]+)', dump) + objdump = shutil.which('objdump') + if not objdump: + # objdump is not available, give up + return None + + proc = subprocess.Popen((objdump, '-p', '-j', '.dynamic', f), + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL) + with proc: + dump = proc.stdout.read() + res = re.search(br'\sSONAME\s+([^\s]+)', dump) if not res: return None - return res.group(1) + return os.fsdecode(res.group(1)) if sys.platform.startswith(("freebsd", "openbsd", "dragonfly")): def _num_version(libname): # "libxyz.so.MAJOR.MINOR" => [ MAJOR, MINOR ] - parts = libname.split(".") + parts = libname.split(b".") nums = [] try: while parts: nums.insert(0, int(parts.pop())) except ValueError: pass - return nums or [ sys.maxsize ] + return nums or [sys.maxsize] def find_library(name): ename = re.escape(name) expr = r':-l%s\.\S+ => \S*/(lib%s\.\S+)' % (ename, ename) - with contextlib.closing(os.popen('/sbin/ldconfig -r 2>/dev/null')) as f: - data = f.read() + expr = os.fsencode(expr) + + proc = subprocess.Popen(('/sbin/ldconfig', '-r'), + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL) + with proc: + data = proc.stdout.read() + res = re.findall(expr, data) if not res: return _get_soname(_findLib_gcc(name)) res.sort(key=_num_version) - return res[-1] + return os.fsdecode(res[-1]) elif sys.platform == "sunos5": @@ -179,17 +205,24 @@ elif os.name == "posix": if not os.path.exists('/usr/bin/crle'): return None + env = dict(os.environ) + env['LC_ALL'] = 'C' + if is64: - cmd = 'env LC_ALL=C /usr/bin/crle -64 2>/dev/null' + args = ('/usr/bin/crle', '-64') else: - cmd = 'env LC_ALL=C /usr/bin/crle 2>/dev/null' + args = ('/usr/bin/crle',) paths = None - with contextlib.closing(os.popen(cmd)) as f: - for line in f.readlines(): + proc = subprocess.Popen(args, + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL, + env=env) + with proc: + for line in proc.stdout: line = line.strip() - if line.startswith('Default Library Path (ELF):'): - paths = line.split()[4] + if line.startswith(b'Default Library Path (ELF):'): + paths = os.fsdecode(line).split()[4] if not paths: return None -- cgit v1.2.1