summaryrefslogtreecommitdiff
path: root/lib/ansible/utils
diff options
context:
space:
mode:
authorSloane Hertel <19572925+s-hertel@users.noreply.github.com>2023-02-28 15:21:48 -0500
committerGitHub <noreply@github.com>2023-02-28 12:21:48 -0800
commitb981a9dfcd1f799abf6183eade556e653792cc03 (patch)
treebb339d6f495209576784e1d2bfb5330ec6b5d6ef /lib/ansible/utils
parentc2ea56e12e589c976a1972f71c3ef8200b36eb35 (diff)
downloadansible-b981a9dfcd1f799abf6183eade556e653792cc03.tar.gz
add a worker queue to get updates from the main results thread (#79886)
* Create a queue per WorkerProcess to receive intra-task updates * Update `pause` action to use the worker queue * Deprecate ConnectionBase()._new_stdin * Add new `Display` convenience method `prompt_until` to manage both controller- and worker-sourced prompting without cross-fork stdin sharing, in-worker mechanism to handle request-response over new worker queue.
Diffstat (limited to 'lib/ansible/utils')
-rw-r--r--lib/ansible/utils/display.py213
1 files changed, 208 insertions, 5 deletions
diff --git a/lib/ansible/utils/display.py b/lib/ansible/utils/display.py
index e521f2a401..16966aaf0d 100644
--- a/lib/ansible/utils/display.py
+++ b/lib/ansible/utils/display.py
@@ -18,31 +18,41 @@
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
+try:
+ import curses
+except ImportError:
+ HAS_CURSES = False
+else:
+ # this will be set to False if curses.setupterm() fails
+ HAS_CURSES = True
+
import ctypes.util
import fcntl
import getpass
+import io
import logging
import os
import random
import subprocess
import sys
+import termios
import textwrap
import threading
import time
+import tty
+import typing as t
+from functools import wraps
from struct import unpack, pack
-from termios import TIOCGWINSZ
from ansible import constants as C
-from ansible.errors import AnsibleError, AnsibleAssertionError
+from ansible.errors import AnsibleError, AnsibleAssertionError, AnsiblePromptInterrupt, AnsiblePromptNoninteractive
from ansible.module_utils._text import to_bytes, to_text
from ansible.module_utils.six import text_type
from ansible.utils.color import stringc
from ansible.utils.multiprocessing import context as multiprocessing_context
from ansible.utils.singleton import Singleton
from ansible.utils.unsafe_proxy import wrap_var
-from functools import wraps
-
_LIBC = ctypes.cdll.LoadLibrary(ctypes.util.find_library('c'))
# Set argtypes, to avoid segfault if the wrong type is provided,
@@ -52,6 +62,9 @@ _LIBC.wcswidth.argtypes = (ctypes.c_wchar_p, ctypes.c_int)
# Max for c_int
_MAX_INT = 2 ** (ctypes.sizeof(ctypes.c_int) * 8 - 1) - 1
+MOVE_TO_BOL = b'\r'
+CLEAR_TO_EOL = b'\x1b[K'
+
def get_text_width(text):
"""Function that utilizes ``wcswidth`` or ``wcwidth`` to determine the
@@ -183,6 +196,62 @@ def _synchronize_textiowrapper(tio, lock):
buffer.flush = _wrap_with_lock(buffer.flush, lock)
+def setraw(fd, when=termios.TCSAFLUSH):
+ """Put terminal into a raw mode.
+
+ Copied from ``tty`` from CPython 3.11.0, and modified to not remove OPOST from OFLAG
+
+ OPOST is kept to prevent an issue with multi line prompts from being corrupted now that display
+ is proxied via the queue from forks. The problem is a race condition, in that we proxy the display
+ over the fork, but before it can be displayed, this plugin will have continued executing, potentially
+ setting stdout and stdin to raw which remove output post processing that commonly converts NL to CRLF
+ """
+ mode = termios.tcgetattr(fd)
+ mode[tty.IFLAG] = mode[tty.IFLAG] & ~(termios.BRKINT | termios.ICRNL | termios.INPCK | termios.ISTRIP | termios.IXON)
+ mode[tty.OFLAG] = mode[tty.OFLAG] & ~(termios.OPOST)
+ mode[tty.CFLAG] = mode[tty.CFLAG] & ~(termios.CSIZE | termios.PARENB)
+ mode[tty.CFLAG] = mode[tty.CFLAG] | termios.CS8
+ mode[tty.LFLAG] = mode[tty.LFLAG] & ~(termios.ECHO | termios.ICANON | termios.IEXTEN | termios.ISIG)
+ mode[tty.CC][termios.VMIN] = 1
+ mode[tty.CC][termios.VTIME] = 0
+ termios.tcsetattr(fd, when, mode)
+
+
+def clear_line(stdout):
+ stdout.write(b'\x1b[%s' % MOVE_TO_BOL)
+ stdout.write(b'\x1b[%s' % CLEAR_TO_EOL)
+
+
+def setup_prompt(stdin_fd, stdout_fd, seconds, echo):
+ # type: (int, int, int, bool) -> None
+ setraw(stdin_fd)
+
+ # Only set stdout to raw mode if it is a TTY. This is needed when redirecting
+ # stdout to a file since a file cannot be set to raw mode.
+ if os.isatty(stdout_fd):
+ setraw(stdout_fd)
+
+ if echo:
+ new_settings = termios.tcgetattr(stdin_fd)
+ new_settings[3] = new_settings[3] | termios.ECHO
+ termios.tcsetattr(stdin_fd, termios.TCSANOW, new_settings)
+
+
+def setupterm():
+ # Nest the try except since curses.error is not available if curses did not import
+ try:
+ curses.setupterm()
+ except (curses.error, TypeError, io.UnsupportedOperation):
+ global HAS_CURSES
+ HAS_CURSES = False
+ else:
+ global MOVE_TO_BOL
+ global CLEAR_TO_EOL
+ # curses.tigetstr() returns None in some circumstances
+ MOVE_TO_BOL = curses.tigetstr('cr') or MOVE_TO_BOL
+ CLEAR_TO_EOL = curses.tigetstr('el') or CLEAR_TO_EOL
+
+
class Display(metaclass=Singleton):
def __init__(self, verbosity=0):
@@ -228,6 +297,8 @@ class Display(metaclass=Singleton):
except Exception as ex:
self.warning(f"failed to patch stdout/stderr for fork-safety: {ex}")
+ self.setup_curses = False
+
def set_queue(self, queue):
"""Set the _final_q on Display, so that we know to proxy display over the queue
instead of directly writing to stdout/stderr from forks
@@ -520,7 +591,139 @@ class Display(metaclass=Singleton):
def _set_column_width(self):
if os.isatty(1):
- tty_size = unpack('HHHH', fcntl.ioctl(1, TIOCGWINSZ, pack('HHHH', 0, 0, 0, 0)))[1]
+ tty_size = unpack('HHHH', fcntl.ioctl(1, termios.TIOCGWINSZ, pack('HHHH', 0, 0, 0, 0)))[1]
else:
tty_size = 0
self.columns = max(79, tty_size - 1)
+
+ def prompt_until(self, msg, private=False, seconds=None, interrupt_input=None, complete_input=None):
+ if self._final_q:
+ from ansible.executor.process.worker import current_worker
+ self._final_q.send_prompt(
+ worker_id=current_worker.worker_id, prompt=msg, private=private, seconds=seconds,
+ interrupt_input=interrupt_input, complete_input=complete_input
+ )
+ return current_worker.worker_queue.get()
+
+ if HAS_CURSES and not self.setup_curses:
+ setupterm()
+ self.setup_curses = True
+
+ if (
+ self._stdin_fd is None
+ or not os.isatty(self._stdin_fd)
+ # Compare the current process group to the process group associated
+ # with terminal of the given file descriptor to determine if the process
+ # is running in the background.
+ or os.getpgrp() != os.tcgetpgrp(self._stdin_fd)
+ ):
+ raise AnsiblePromptNoninteractive('stdin is not interactive')
+
+ # When seconds/interrupt_input/complete_input are all None, this does mostly the same thing as input/getpass,
+ # but self.prompt may raise a KeyboardInterrupt, which must be caught in the main thread.
+ # If the main thread handled this, it would also need to send a newline to the tty of any hanging pids.
+ # if seconds is None and interrupt_input is None and complete_input is None:
+ # try:
+ # return self.prompt(msg, private=private)
+ # except KeyboardInterrupt:
+ # # can't catch in the results_thread_main daemon thread
+ # raise AnsiblePromptInterrupt('user interrupt')
+
+ self.display(msg)
+ result = b''
+ with self._lock:
+ original_stdin_settings = termios.tcgetattr(self._stdin_fd)
+ try:
+ setup_prompt(self._stdin_fd, self._stdout_fd, seconds, not private)
+
+ # flush the buffer to make sure no previous key presses
+ # are read in below
+ termios.tcflush(self._stdin, termios.TCIFLUSH)
+
+ # read input 1 char at a time until the optional timeout or complete/interrupt condition is met
+ return self._read_non_blocking_stdin(echo=not private, seconds=seconds, interrupt_input=interrupt_input, complete_input=complete_input)
+ finally:
+ # restore the old settings for the duped stdin stdin_fd
+ termios.tcsetattr(self._stdin_fd, termios.TCSADRAIN, original_stdin_settings)
+
+ def _read_non_blocking_stdin(
+ self,
+ echo=False, # type: bool
+ seconds=None, # type: int
+ interrupt_input=None, # type: t.Iterable[bytes]
+ complete_input=None, # type: t.Iterable[bytes]
+ ): # type: (...) -> bytes
+
+ if self._final_q:
+ raise NotImplementedError
+
+ if seconds is not None:
+ start = time.time()
+ if interrupt_input is None:
+ try:
+ interrupt = termios.tcgetattr(sys.stdin.buffer.fileno())[6][termios.VINTR]
+ except Exception:
+ interrupt = b'\x03' # value for Ctrl+C
+
+ try:
+ backspace_sequences = [termios.tcgetattr(self._stdin_fd)[6][termios.VERASE]]
+ except Exception:
+ # unsupported/not present, use default
+ backspace_sequences = [b'\x7f', b'\x08']
+
+ result_string = b''
+ while seconds is None or (time.time() - start < seconds):
+ key_pressed = None
+ try:
+ os.set_blocking(self._stdin_fd, False)
+ while key_pressed is None and (seconds is None or (time.time() - start < seconds)):
+ key_pressed = self._stdin.read(1)
+ finally:
+ os.set_blocking(self._stdin_fd, True)
+ if key_pressed is None:
+ key_pressed = b''
+
+ if (interrupt_input is None and key_pressed == interrupt) or (interrupt_input is not None and key_pressed.lower() in interrupt_input):
+ clear_line(self._stdout)
+ raise AnsiblePromptInterrupt('user interrupt')
+ if (complete_input is None and key_pressed in (b'\r', b'\n')) or (complete_input is not None and key_pressed.lower() in complete_input):
+ clear_line(self._stdout)
+ break
+ elif key_pressed in backspace_sequences:
+ clear_line(self._stdout)
+ result_string = result_string[:-1]
+ if echo:
+ self._stdout.write(result_string)
+ self._stdout.flush()
+ else:
+ result_string += key_pressed
+ return result_string
+
+ @property
+ def _stdin(self):
+ if self._final_q:
+ raise NotImplementedError
+ try:
+ return sys.stdin.buffer
+ except AttributeError:
+ return None
+
+ @property
+ def _stdin_fd(self):
+ try:
+ return self._stdin.fileno()
+ except (ValueError, AttributeError):
+ return None
+
+ @property
+ def _stdout(self):
+ if self._final_q:
+ raise NotImplementedError
+ return sys.stdout.buffer
+
+ @property
+ def _stdout_fd(self):
+ try:
+ return self._stdout.fileno()
+ except (ValueError, AttributeError):
+ return None