From d9862d1b37a267ffaafd7a6facae9e6b4ccbdee8 Mon Sep 17 00:00:00 2001 From: Jeff Quast Date: Tue, 14 Apr 2015 19:45:07 -0700 Subject: remove interruptable=True, following python3.5 --- blessings/terminal.py | 47 ++++++++------------- blessings/tests/test_keyboard.py | 90 ---------------------------------------- 2 files changed, 18 insertions(+), 119 deletions(-) diff --git a/blessings/terminal.py b/blessings/terminal.py index 94ecef9..b4b8fcb 100644 --- a/blessings/terminal.py +++ b/blessings/terminal.py @@ -697,7 +697,7 @@ class Terminal(object): byte = os.read(self._keyboard_fd, 1) return self._keyboard_decoder.decode(byte, final=False) - def _char_is_ready(self, timeout=None, interruptable=True): + def _char_is_ready(self, timeout=None): """ Whether a keypress has been detected on the keyboard. @@ -709,25 +709,11 @@ class Terminal(object): is detected when None (default). When ``timeout`` is a positive number, returns after ``timeout`` seconds have elapsed (float). - :param bool interruptable: Normally, when this function is interrupted - by a signal, such as the installment of SIGWINCH, this function will - ignore this interruption and continue to poll for input up to the - ``timeout`` specified. If you'd rather this function return ``u''`` - early, specify False for ``interruptable``. - - This is an open issue for review to **remove** this parameter, - https://github.com/erikrose/blessings/issues/96 :rtype: bool :returns: True if a keypress is awaiting to be read on the keyboard attached to this terminal. If input is not a terminal, False is always returned. """ - # Special care is taken to handle a custom SIGWINCH handler, which - # causes select() to be interrupted with errno 4 (EAGAIN) -- - # it is ignored, and a new timeout value is derived from the previous, - # unless timeout becomes negative, because signal handler has blocked - # beyond timeout, then False is returned. Otherwise, when timeout is 0, - # we continue to block indefinitely (default). stime = time.time() ready_r = [None, ] check_r = [self._keyboard_fd] if self._keyboard_fd is not None else [] @@ -736,8 +722,19 @@ class Terminal(object): try: ready_r, _, _ = select.select(check_r, [], [], timeout) except InterruptedError: - if not interruptable: - return u'' + # Beginning with python3.5, IntrruptError is no longer thrown + # https://www.python.org/dev/peps/pep-0475/ + # + # For previous versions of python, we take special care to + # retry select on InterruptedError exception, namely to handle + # a custom SIGWINCH handler. When installed, it would cause + # select() to be interrupted with errno 4 (EAGAIN). + # + # Just as in python3.5, it is ignored, and a new timeout value + # is derived from the previous unless timeout becomes negative. + # because the signal handler has blocked beyond timeout, then + # False is returned. Otherwise, when timeout is None, we + # continue to block indefinitely (default). if timeout is not None: # subtract time already elapsed, timeout -= time.time() - stime @@ -823,7 +820,7 @@ class Terminal(object): finally: self.stream.write(self.rmkx) - def keystroke(self, timeout=None, esc_delay=0.35, interruptable=True): + def keystroke(self, timeout=None, esc_delay=0.35): """ Receive and return next keystroke from keyboard within given timeout. @@ -835,14 +832,6 @@ class Terminal(object): specifies the amount of time after receiving the escape character (``chr(27)``) to seek for the completion of an application key before returning a :class:`~.Keystroke` for ``KEY_ESCAPE``. - :param bool interruptable: Normally, when this function is interrupted - by a signal, such as the installment of SIGWINCH, this function will - ignore this interruption and continue to poll for input up to the - ``timeout`` specified. If you'd rather this function return ``u''`` - early, specify False for ``interruptable``. - - This is an open issue for review to **remove** this parameter, - https://github.com/erikrose/blessings/issues/96 :rtype: :class:`~.Keystroke`. :raises NoKeyboard: The :attr:`stream` is not a terminal with timeout parameter as the default value of None, which would @@ -892,7 +881,7 @@ class Terminal(object): ucs += self._keyboard_buf.pop() # receive all immediately available bytes - while self._char_is_ready(0): + while self._char_is_ready(timeout=0): ucs += self._next_char() # decode keystroke, if any @@ -902,7 +891,7 @@ class Terminal(object): # incomplete, (which may be a multibyte encoding), block until until # one is received. while (not keystroke and - self._char_is_ready(time_left(stime, timeout), interruptable)): + self._char_is_ready(timeout=time_left(stime, timeout))): ucs += self._next_char() keystroke = resolve(text=ucs) @@ -914,7 +903,7 @@ class Terminal(object): if keystroke.code == self.KEY_ESCAPE: esctime = time.time() while (keystroke.code == self.KEY_ESCAPE and - self._char_is_ready(time_left(esctime, esc_delay))): + self._char_is_ready(timeout=time_left(esctime, esc_delay))): ucs += self._next_char() keystroke = resolve(text=ucs) diff --git a/blessings/tests/test_keyboard.py b/blessings/tests/test_keyboard.py index ff4d518..74fdc5c 100644 --- a/blessings/tests/test_keyboard.py +++ b/blessings/tests/test_keyboard.py @@ -123,96 +123,6 @@ def test_char_is_ready_interrupted_nonetype(): assert math.floor(time.time() - stime) == 1.0 -def test_char_is_ready_interrupted_interruptable(): - "_char_is_ready() may be interrupted when interruptable=False." - pid, master_fd = pty.fork() - if pid == 0: - try: - cov = __import__('cov_core_init').init() - except ImportError: - cov = None - - # child pauses, writes semaphore and begins awaiting input - global got_sigwinch - got_sigwinch = False - - def on_resize(sig, action): - global got_sigwinch - got_sigwinch = True - - term = TestTerminal() - signal.signal(signal.SIGWINCH, on_resize) - read_until_semaphore(sys.__stdin__.fileno(), semaphore=SEMAPHORE) - os.write(sys.__stdout__.fileno(), SEMAPHORE) - with term.keystroke_input(raw=True): - term.keystroke(timeout=1.05, interruptable=False) - os.write(sys.__stdout__.fileno(), b'complete') - assert got_sigwinch - if cov is not None: - cov.stop() - cov.save() - os._exit(0) - - with echo_off(master_fd): - os.write(master_fd, SEND_SEMAPHORE) - read_until_semaphore(master_fd) - stime = time.time() - time.sleep(0.05) - os.kill(pid, signal.SIGWINCH) - output = read_until_eof(master_fd) - - pid, status = os.waitpid(pid, 0) - assert output == u'complete' - assert os.WEXITSTATUS(status) == 0 - assert math.floor(time.time() - stime) == 0.0 - - -def test_char_is_ready_interrupted_nonetype_interruptable(): - """_char_is_ready() may be interrupted when interruptable=False with - timeout None.""" - pid, master_fd = pty.fork() - if pid == 0: - try: - cov = __import__('cov_core_init').init() - except ImportError: - cov = None - - # child pauses, writes semaphore and begins awaiting input - global got_sigwinch - got_sigwinch = False - - def on_resize(sig, action): - global got_sigwinch - got_sigwinch = True - - term = TestTerminal() - signal.signal(signal.SIGWINCH, on_resize) - read_until_semaphore(sys.__stdin__.fileno(), semaphore=SEMAPHORE) - os.write(sys.__stdout__.fileno(), SEMAPHORE) - with term.keystroke_input(raw=True): - term.keystroke(timeout=None, interruptable=False) - os.write(sys.__stdout__.fileno(), b'complete') - assert got_sigwinch - if cov is not None: - cov.stop() - cov.save() - os._exit(0) - - with echo_off(master_fd): - os.write(master_fd, SEND_SEMAPHORE) - read_until_semaphore(master_fd) - stime = time.time() - time.sleep(0.05) - os.kill(pid, signal.SIGWINCH) - os.write(master_fd, b'X') - output = read_until_eof(master_fd) - - pid, status = os.waitpid(pid, 0) - assert output == u'complete' - assert os.WEXITSTATUS(status) == 0 - assert math.floor(time.time() - stime) == 0.0 - - def test_keystroke_input_no_kb(): "keystroke_input() should not call tty.setcbreak() without keyboard." @as_subprocess -- cgit v1.2.1