diff options
author | Sloane Hertel <19572925+s-hertel@users.noreply.github.com> | 2023-02-28 15:21:48 -0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-02-28 12:21:48 -0800 |
commit | b981a9dfcd1f799abf6183eade556e653792cc03 (patch) | |
tree | bb339d6f495209576784e1d2bfb5330ec6b5d6ef /lib/ansible/utils | |
parent | c2ea56e12e589c976a1972f71c3ef8200b36eb35 (diff) | |
download | ansible-b981a9dfcd1f799abf6183eade556e653792cc03.tar.gz |
add a worker queue to get updates from the main results thread (#79886)
* Create a queue per WorkerProcess to receive intra-task updates
* Update `pause` action to use the worker queue
* Deprecate ConnectionBase()._new_stdin
* Add new `Display` convenience method `prompt_until` to manage both controller- and worker-sourced prompting without cross-fork stdin sharing, in-worker mechanism to handle request-response over new worker queue.
Diffstat (limited to 'lib/ansible/utils')
-rw-r--r-- | lib/ansible/utils/display.py | 213 |
1 files changed, 208 insertions, 5 deletions
diff --git a/lib/ansible/utils/display.py b/lib/ansible/utils/display.py index e521f2a401..16966aaf0d 100644 --- a/lib/ansible/utils/display.py +++ b/lib/ansible/utils/display.py @@ -18,31 +18,41 @@ from __future__ import (absolute_import, division, print_function) __metaclass__ = type +try: + import curses +except ImportError: + HAS_CURSES = False +else: + # this will be set to False if curses.setupterm() fails + HAS_CURSES = True + import ctypes.util import fcntl import getpass +import io import logging import os import random import subprocess import sys +import termios import textwrap import threading import time +import tty +import typing as t +from functools import wraps from struct import unpack, pack -from termios import TIOCGWINSZ from ansible import constants as C -from ansible.errors import AnsibleError, AnsibleAssertionError +from ansible.errors import AnsibleError, AnsibleAssertionError, AnsiblePromptInterrupt, AnsiblePromptNoninteractive from ansible.module_utils._text import to_bytes, to_text from ansible.module_utils.six import text_type from ansible.utils.color import stringc from ansible.utils.multiprocessing import context as multiprocessing_context from ansible.utils.singleton import Singleton from ansible.utils.unsafe_proxy import wrap_var -from functools import wraps - _LIBC = ctypes.cdll.LoadLibrary(ctypes.util.find_library('c')) # Set argtypes, to avoid segfault if the wrong type is provided, @@ -52,6 +62,9 @@ _LIBC.wcswidth.argtypes = (ctypes.c_wchar_p, ctypes.c_int) # Max for c_int _MAX_INT = 2 ** (ctypes.sizeof(ctypes.c_int) * 8 - 1) - 1 +MOVE_TO_BOL = b'\r' +CLEAR_TO_EOL = b'\x1b[K' + def get_text_width(text): """Function that utilizes ``wcswidth`` or ``wcwidth`` to determine the @@ -183,6 +196,62 @@ def _synchronize_textiowrapper(tio, lock): buffer.flush = _wrap_with_lock(buffer.flush, lock) +def setraw(fd, when=termios.TCSAFLUSH): + """Put terminal into a raw mode. + + Copied from ``tty`` from CPython 3.11.0, and modified to not remove OPOST from OFLAG + + OPOST is kept to prevent an issue with multi line prompts from being corrupted now that display + is proxied via the queue from forks. The problem is a race condition, in that we proxy the display + over the fork, but before it can be displayed, this plugin will have continued executing, potentially + setting stdout and stdin to raw which remove output post processing that commonly converts NL to CRLF + """ + mode = termios.tcgetattr(fd) + mode[tty.IFLAG] = mode[tty.IFLAG] & ~(termios.BRKINT | termios.ICRNL | termios.INPCK | termios.ISTRIP | termios.IXON) + mode[tty.OFLAG] = mode[tty.OFLAG] & ~(termios.OPOST) + mode[tty.CFLAG] = mode[tty.CFLAG] & ~(termios.CSIZE | termios.PARENB) + mode[tty.CFLAG] = mode[tty.CFLAG] | termios.CS8 + mode[tty.LFLAG] = mode[tty.LFLAG] & ~(termios.ECHO | termios.ICANON | termios.IEXTEN | termios.ISIG) + mode[tty.CC][termios.VMIN] = 1 + mode[tty.CC][termios.VTIME] = 0 + termios.tcsetattr(fd, when, mode) + + +def clear_line(stdout): + stdout.write(b'\x1b[%s' % MOVE_TO_BOL) + stdout.write(b'\x1b[%s' % CLEAR_TO_EOL) + + +def setup_prompt(stdin_fd, stdout_fd, seconds, echo): + # type: (int, int, int, bool) -> None + setraw(stdin_fd) + + # Only set stdout to raw mode if it is a TTY. This is needed when redirecting + # stdout to a file since a file cannot be set to raw mode. + if os.isatty(stdout_fd): + setraw(stdout_fd) + + if echo: + new_settings = termios.tcgetattr(stdin_fd) + new_settings[3] = new_settings[3] | termios.ECHO + termios.tcsetattr(stdin_fd, termios.TCSANOW, new_settings) + + +def setupterm(): + # Nest the try except since curses.error is not available if curses did not import + try: + curses.setupterm() + except (curses.error, TypeError, io.UnsupportedOperation): + global HAS_CURSES + HAS_CURSES = False + else: + global MOVE_TO_BOL + global CLEAR_TO_EOL + # curses.tigetstr() returns None in some circumstances + MOVE_TO_BOL = curses.tigetstr('cr') or MOVE_TO_BOL + CLEAR_TO_EOL = curses.tigetstr('el') or CLEAR_TO_EOL + + class Display(metaclass=Singleton): def __init__(self, verbosity=0): @@ -228,6 +297,8 @@ class Display(metaclass=Singleton): except Exception as ex: self.warning(f"failed to patch stdout/stderr for fork-safety: {ex}") + self.setup_curses = False + def set_queue(self, queue): """Set the _final_q on Display, so that we know to proxy display over the queue instead of directly writing to stdout/stderr from forks @@ -520,7 +591,139 @@ class Display(metaclass=Singleton): def _set_column_width(self): if os.isatty(1): - tty_size = unpack('HHHH', fcntl.ioctl(1, TIOCGWINSZ, pack('HHHH', 0, 0, 0, 0)))[1] + tty_size = unpack('HHHH', fcntl.ioctl(1, termios.TIOCGWINSZ, pack('HHHH', 0, 0, 0, 0)))[1] else: tty_size = 0 self.columns = max(79, tty_size - 1) + + def prompt_until(self, msg, private=False, seconds=None, interrupt_input=None, complete_input=None): + if self._final_q: + from ansible.executor.process.worker import current_worker + self._final_q.send_prompt( + worker_id=current_worker.worker_id, prompt=msg, private=private, seconds=seconds, + interrupt_input=interrupt_input, complete_input=complete_input + ) + return current_worker.worker_queue.get() + + if HAS_CURSES and not self.setup_curses: + setupterm() + self.setup_curses = True + + if ( + self._stdin_fd is None + or not os.isatty(self._stdin_fd) + # Compare the current process group to the process group associated + # with terminal of the given file descriptor to determine if the process + # is running in the background. + or os.getpgrp() != os.tcgetpgrp(self._stdin_fd) + ): + raise AnsiblePromptNoninteractive('stdin is not interactive') + + # When seconds/interrupt_input/complete_input are all None, this does mostly the same thing as input/getpass, + # but self.prompt may raise a KeyboardInterrupt, which must be caught in the main thread. + # If the main thread handled this, it would also need to send a newline to the tty of any hanging pids. + # if seconds is None and interrupt_input is None and complete_input is None: + # try: + # return self.prompt(msg, private=private) + # except KeyboardInterrupt: + # # can't catch in the results_thread_main daemon thread + # raise AnsiblePromptInterrupt('user interrupt') + + self.display(msg) + result = b'' + with self._lock: + original_stdin_settings = termios.tcgetattr(self._stdin_fd) + try: + setup_prompt(self._stdin_fd, self._stdout_fd, seconds, not private) + + # flush the buffer to make sure no previous key presses + # are read in below + termios.tcflush(self._stdin, termios.TCIFLUSH) + + # read input 1 char at a time until the optional timeout or complete/interrupt condition is met + return self._read_non_blocking_stdin(echo=not private, seconds=seconds, interrupt_input=interrupt_input, complete_input=complete_input) + finally: + # restore the old settings for the duped stdin stdin_fd + termios.tcsetattr(self._stdin_fd, termios.TCSADRAIN, original_stdin_settings) + + def _read_non_blocking_stdin( + self, + echo=False, # type: bool + seconds=None, # type: int + interrupt_input=None, # type: t.Iterable[bytes] + complete_input=None, # type: t.Iterable[bytes] + ): # type: (...) -> bytes + + if self._final_q: + raise NotImplementedError + + if seconds is not None: + start = time.time() + if interrupt_input is None: + try: + interrupt = termios.tcgetattr(sys.stdin.buffer.fileno())[6][termios.VINTR] + except Exception: + interrupt = b'\x03' # value for Ctrl+C + + try: + backspace_sequences = [termios.tcgetattr(self._stdin_fd)[6][termios.VERASE]] + except Exception: + # unsupported/not present, use default + backspace_sequences = [b'\x7f', b'\x08'] + + result_string = b'' + while seconds is None or (time.time() - start < seconds): + key_pressed = None + try: + os.set_blocking(self._stdin_fd, False) + while key_pressed is None and (seconds is None or (time.time() - start < seconds)): + key_pressed = self._stdin.read(1) + finally: + os.set_blocking(self._stdin_fd, True) + if key_pressed is None: + key_pressed = b'' + + if (interrupt_input is None and key_pressed == interrupt) or (interrupt_input is not None and key_pressed.lower() in interrupt_input): + clear_line(self._stdout) + raise AnsiblePromptInterrupt('user interrupt') + if (complete_input is None and key_pressed in (b'\r', b'\n')) or (complete_input is not None and key_pressed.lower() in complete_input): + clear_line(self._stdout) + break + elif key_pressed in backspace_sequences: + clear_line(self._stdout) + result_string = result_string[:-1] + if echo: + self._stdout.write(result_string) + self._stdout.flush() + else: + result_string += key_pressed + return result_string + + @property + def _stdin(self): + if self._final_q: + raise NotImplementedError + try: + return sys.stdin.buffer + except AttributeError: + return None + + @property + def _stdin_fd(self): + try: + return self._stdin.fileno() + except (ValueError, AttributeError): + return None + + @property + def _stdout(self): + if self._final_q: + raise NotImplementedError + return sys.stdout.buffer + + @property + def _stdout_fd(self): + try: + return self._stdout.fileno() + except (ValueError, AttributeError): + return None |