diff options
author | NJDFan <rob.gaddi@gmail.com> | 2019-06-20 16:04:55 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-06-20 16:04:55 -0700 |
commit | 3215eb3088a4ca55c5217c2e99da5584c3ee02c1 (patch) | |
tree | c64894a3b91f219add4a296db8376d9b3b9bdcef /serial/tools/miniterm.py | |
parent | 5c021d4bdab2297602b4459f75bef2e00e5ec9ab (diff) | |
parent | acab9d2c0efb63323faebfd5e3405d77cd4b5617 (diff) | |
download | pyserial-git-3215eb3088a4ca55c5217c2e99da5584c3ee02c1.tar.gz |
Merge pull request #1 from pyserial/master
Catch up to the main fork
Diffstat (limited to 'serial/tools/miniterm.py')
-rw-r--r-- | serial/tools/miniterm.py | 355 |
1 files changed, 210 insertions, 145 deletions
diff --git a/serial/tools/miniterm.py b/serial/tools/miniterm.py index 14182f0..3b8d5d2 100644 --- a/serial/tools/miniterm.py +++ b/serial/tools/miniterm.py @@ -3,10 +3,12 @@ # Very simple serial terminal # # This file is part of pySerial. https://github.com/pyserial/pyserial -# (C)2002-2015 Chris Liechti <cliechti@gmx.net> +# (C)2002-2017 Chris Liechti <cliechti@gmx.net> # # SPDX-License-Identifier: BSD-3-Clause +from __future__ import absolute_import + import codecs import os import sys @@ -275,12 +277,12 @@ class DebugIO(Transform): """Print what is sent and received""" def rx(self, text): - sys.stderr.write(' [RX:{}] '.format(repr(text))) + sys.stderr.write(' [RX:{!r}] '.format(text)) sys.stderr.flush() return text def tx(self, text): - sys.stderr.write(' [TX:{}] '.format(repr(text))) + sys.stderr.write(' [TX:{!r}] '.format(text)) sys.stderr.flush() return text @@ -315,7 +317,7 @@ def ask_for_port(): sys.stderr.write('\n--- Available ports:\n') ports = [] for n, (port, desc, hwid) in enumerate(sorted(comports()), 1): - sys.stderr.write('--- {:2}: {:20} {}\n'.format(n, port, desc)) + sys.stderr.write('--- {:2}: {:20} {!r}\n'.format(n, port, desc)) ports.append(port) while True: port = raw_input('--- Enter port index or full name: ') @@ -347,8 +349,8 @@ class Miniterm(object): self.eol = eol self.filters = filters self.update_transformations() - self.exit_character = 0x1d # GS/CTRL+] - self.menu_character = 0x14 # Menu: CTRL+T + self.exit_character = unichr(0x1d) # GS/CTRL+] + self.menu_character = unichr(0x14) # Menu: CTRL+T self.alive = None self._reader_alive = None self.receiver_thread = None @@ -502,25 +504,7 @@ class Miniterm(object): if self.echo: self.console.write(c) elif c == '\x15': # CTRL+U -> upload file - sys.stderr.write('\n--- File to upload: ') - sys.stderr.flush() - with self.console: - filename = sys.stdin.readline().rstrip('\r\n') - if filename: - try: - with open(filename, 'rb') as f: - sys.stderr.write('--- Sending file {} ---\n'.format(filename)) - while True: - block = f.read(1024) - if not block: - break - self.serial.write(block) - # Wait for output buffer to drain. - self.serial.flush() - sys.stderr.write('.') # Progress indicator. - sys.stderr.write('\n--- File {} sent ---\n'.format(filename)) - except IOError as e: - sys.stderr.write('--- ERROR opening file {}: {} ---\n'.format(filename, e)) + self.upload_file() elif c in '\x08hH?': # CTRL+H, h, H, ? -> Show help sys.stderr.write(self.get_help_text()) elif c == '\x12': # CTRL+R -> Toggle RTS @@ -536,24 +520,9 @@ class Miniterm(object): self.echo = not self.echo sys.stderr.write('--- local echo {} ---\n'.format('active' if self.echo else 'inactive')) elif c == '\x06': # CTRL+F -> edit filters - sys.stderr.write('\n--- Available Filters:\n') - sys.stderr.write('\n'.join( - '--- {:<10} = {.__doc__}'.format(k, v) - for k, v in sorted(TRANSFORMATIONS.items()))) - sys.stderr.write('\n--- Enter new filter name(s) [{}]: '.format(' '.join(self.filters))) - with self.console: - new_filters = sys.stdin.readline().lower().split() - if new_filters: - for f in new_filters: - if f not in TRANSFORMATIONS: - sys.stderr.write('--- unknown filter: {}\n'.format(repr(f))) - break - else: - self.filters = new_filters - self.update_transformations() - sys.stderr.write('--- filters: {}\n'.format(' '.join(self.filters))) + self.change_filter() elif c == '\x0c': # CTRL+L -> EOL mode - modes = list(EOL_TRANSFORMATIONS) # keys + modes = list(EOL_TRANSFORMATIONS) # keys eol = modes.index(self.eol) + 1 if eol >= len(modes): eol = 0 @@ -561,63 +530,17 @@ class Miniterm(object): sys.stderr.write('--- EOL: {} ---\n'.format(self.eol.upper())) self.update_transformations() elif c == '\x01': # CTRL+A -> set encoding - sys.stderr.write('\n--- Enter new encoding name [{}]: '.format(self.input_encoding)) - with self.console: - new_encoding = sys.stdin.readline().strip() - if new_encoding: - try: - codecs.lookup(new_encoding) - except LookupError: - sys.stderr.write('--- invalid encoding name: {}\n'.format(new_encoding)) - else: - self.set_rx_encoding(new_encoding) - self.set_tx_encoding(new_encoding) - sys.stderr.write('--- serial input encoding: {}\n'.format(self.input_encoding)) - sys.stderr.write('--- serial output encoding: {}\n'.format(self.output_encoding)) + self.change_encoding() elif c == '\x09': # CTRL+I -> info self.dump_port_settings() #~ elif c == '\x01': # CTRL+A -> cycle escape mode #~ elif c == '\x0c': # CTRL+L -> cycle linefeed mode elif c in 'pP': # P -> change port - with self.console: - try: - port = ask_for_port() - except KeyboardInterrupt: - port = None - if port and port != self.serial.port: - # reader thread needs to be shut down - self._stop_reader() - # save settings - settings = self.serial.getSettingsDict() - try: - new_serial = serial.serial_for_url(port, do_not_open=True) - # restore settings and open - new_serial.applySettingsDict(settings) - new_serial.rts = self.serial.rts - new_serial.dtr = self.serial.dtr - new_serial.open() - new_serial.break_condition = self.serial.break_condition - except Exception as e: - sys.stderr.write('--- ERROR opening new port: {} ---\n'.format(e)) - new_serial.close() - else: - self.serial.close() - self.serial = new_serial - sys.stderr.write('--- Port changed to: {} ---\n'.format(self.serial.port)) - # and restart the reader thread - self._start_reader() + self.change_port() + elif c in 'sS': # S -> suspend / open port temporarily + self.suspend_port() elif c in 'bB': # B -> change baudrate - sys.stderr.write('\n--- Baudrate: ') - sys.stderr.flush() - with self.console: - backup = self.serial.baudrate - try: - self.serial.baudrate = int(sys.stdin.readline().strip()) - except ValueError as e: - sys.stderr.write('--- ERROR setting baudrate: {} ---\n'.format(e)) - self.serial.baudrate = backup - else: - self.dump_port_settings() + self.change_baudrate() elif c == '8': # 8 -> change to 8 bits self.serial.bytesize = serial.EIGHTBITS self.dump_port_settings() @@ -657,6 +580,138 @@ class Miniterm(object): else: sys.stderr.write('--- unknown menu character {} --\n'.format(key_description(c))) + def upload_file(self): + """Ask user for filenname and send its contents""" + sys.stderr.write('\n--- File to upload: ') + sys.stderr.flush() + with self.console: + filename = sys.stdin.readline().rstrip('\r\n') + if filename: + try: + with open(filename, 'rb') as f: + sys.stderr.write('--- Sending file {} ---\n'.format(filename)) + while True: + block = f.read(1024) + if not block: + break + self.serial.write(block) + # Wait for output buffer to drain. + self.serial.flush() + sys.stderr.write('.') # Progress indicator. + sys.stderr.write('\n--- File {} sent ---\n'.format(filename)) + except IOError as e: + sys.stderr.write('--- ERROR opening file {}: {} ---\n'.format(filename, e)) + + def change_filter(self): + """change the i/o transformations""" + sys.stderr.write('\n--- Available Filters:\n') + sys.stderr.write('\n'.join( + '--- {:<10} = {.__doc__}'.format(k, v) + for k, v in sorted(TRANSFORMATIONS.items()))) + sys.stderr.write('\n--- Enter new filter name(s) [{}]: '.format(' '.join(self.filters))) + with self.console: + new_filters = sys.stdin.readline().lower().split() + if new_filters: + for f in new_filters: + if f not in TRANSFORMATIONS: + sys.stderr.write('--- unknown filter: {!r}\n'.format(f)) + break + else: + self.filters = new_filters + self.update_transformations() + sys.stderr.write('--- filters: {}\n'.format(' '.join(self.filters))) + + def change_encoding(self): + """change encoding on the serial port""" + sys.stderr.write('\n--- Enter new encoding name [{}]: '.format(self.input_encoding)) + with self.console: + new_encoding = sys.stdin.readline().strip() + if new_encoding: + try: + codecs.lookup(new_encoding) + except LookupError: + sys.stderr.write('--- invalid encoding name: {}\n'.format(new_encoding)) + else: + self.set_rx_encoding(new_encoding) + self.set_tx_encoding(new_encoding) + sys.stderr.write('--- serial input encoding: {}\n'.format(self.input_encoding)) + sys.stderr.write('--- serial output encoding: {}\n'.format(self.output_encoding)) + + def change_baudrate(self): + """change the baudrate""" + sys.stderr.write('\n--- Baudrate: ') + sys.stderr.flush() + with self.console: + backup = self.serial.baudrate + try: + self.serial.baudrate = int(sys.stdin.readline().strip()) + except ValueError as e: + sys.stderr.write('--- ERROR setting baudrate: {} ---\n'.format(e)) + self.serial.baudrate = backup + else: + self.dump_port_settings() + + def change_port(self): + """Have a conversation with the user to change the serial port""" + with self.console: + try: + port = ask_for_port() + except KeyboardInterrupt: + port = None + if port and port != self.serial.port: + # reader thread needs to be shut down + self._stop_reader() + # save settings + settings = self.serial.getSettingsDict() + try: + new_serial = serial.serial_for_url(port, do_not_open=True) + # restore settings and open + new_serial.applySettingsDict(settings) + new_serial.rts = self.serial.rts + new_serial.dtr = self.serial.dtr + new_serial.open() + new_serial.break_condition = self.serial.break_condition + except Exception as e: + sys.stderr.write('--- ERROR opening new port: {} ---\n'.format(e)) + new_serial.close() + else: + self.serial.close() + self.serial = new_serial + sys.stderr.write('--- Port changed to: {} ---\n'.format(self.serial.port)) + # and restart the reader thread + self._start_reader() + + def suspend_port(self): + """\ + open port temporarily, allow reconnect, exit and port change to get + out of the loop + """ + # reader thread needs to be shut down + self._stop_reader() + self.serial.close() + sys.stderr.write('\n--- Port closed: {} ---\n'.format(self.serial.port)) + do_change_port = False + while not self.serial.is_open: + sys.stderr.write('--- Quit: {exit} | p: port change | any other key to reconnect ---\n'.format( + exit=key_description(self.exit_character))) + k = self.console.getkey() + if k == self.exit_character: + self.stop() # exit app + break + elif k in 'pP': + do_change_port = True + break + try: + self.serial.open() + except Exception as e: + sys.stderr.write('--- ERROR opening port: {} ---\n'.format(e)) + if do_change_port: + self.change_port() + else: + # and restart the reader thread + self._start_reader() + sys.stderr.write('--- Port opened: {} ---\n'.format(self.serial.port)) + def get_help_text(self): """return the help text""" # help text, starts with blank line! @@ -707,123 +762,130 @@ def main(default_port=None, default_baudrate=9600, default_rts=None, default_dtr import argparse parser = argparse.ArgumentParser( - description="Miniterm - A simple terminal program for the serial port.") + description='Miniterm - A simple terminal program for the serial port.') parser.add_argument( - "port", + 'port', nargs='?', - help="serial port name ('-' to show port list)", + help='serial port name ("-" to show port list)', default=default_port) parser.add_argument( - "baudrate", + 'baudrate', nargs='?', type=int, - help="set baud rate, default: %(default)s", + help='set baud rate, default: %(default)s', default=default_baudrate) - group = parser.add_argument_group("port settings") + group = parser.add_argument_group('port settings') group.add_argument( - "--parity", + '--parity', choices=['N', 'E', 'O', 'S', 'M'], type=lambda c: c.upper(), - help="set parity, one of {N E O S M}, default: N", + help='set parity, one of {N E O S M}, default: N', default='N') group.add_argument( - "--rtscts", - action="store_true", - help="enable RTS/CTS flow control (default off)", + '--rtscts', + action='store_true', + help='enable RTS/CTS flow control (default off)', default=False) group.add_argument( - "--xonxoff", - action="store_true", - help="enable software flow control (default off)", + '--xonxoff', + action='store_true', + help='enable software flow control (default off)', default=False) group.add_argument( - "--rts", + '--rts', type=int, - help="set initial RTS line state (possible values: 0, 1)", + help='set initial RTS line state (possible values: 0, 1)', default=default_rts) group.add_argument( - "--dtr", + '--dtr', type=int, - help="set initial DTR line state (possible values: 0, 1)", + help='set initial DTR line state (possible values: 0, 1)', default=default_dtr) group.add_argument( - "--ask", - action="store_true", - help="ask again for port when open fails", + '--non-exclusive', + dest='exclusive', + action='store_false', + help='disable locking for native ports', + default=True) + + group.add_argument( + '--ask', + action='store_true', + help='ask again for port when open fails', default=False) - group = parser.add_argument_group("data handling") + group = parser.add_argument_group('data handling') group.add_argument( - "-e", "--echo", - action="store_true", - help="enable local echo (default off)", + '-e', '--echo', + action='store_true', + help='enable local echo (default off)', default=False) group.add_argument( - "--encoding", - dest="serial_port_encoding", - metavar="CODEC", - help="set the encoding for the serial port (e.g. hexlify, Latin1, UTF-8), default: %(default)s", + '--encoding', + dest='serial_port_encoding', + metavar='CODEC', + help='set the encoding for the serial port (e.g. hexlify, Latin1, UTF-8), default: %(default)s', default='UTF-8') group.add_argument( - "-f", "--filter", - action="append", - metavar="NAME", - help="add text transformation", + '-f', '--filter', + action='append', + metavar='NAME', + help='add text transformation', default=[]) group.add_argument( - "--eol", + '--eol', choices=['CR', 'LF', 'CRLF'], type=lambda c: c.upper(), - help="end of line mode", + help='end of line mode', default='CRLF') group.add_argument( - "--raw", - action="store_true", - help="Do no apply any encodings/transformations", + '--raw', + action='store_true', + help='Do no apply any encodings/transformations', default=False) - group = parser.add_argument_group("hotkeys") + group = parser.add_argument_group('hotkeys') group.add_argument( - "--exit-char", + '--exit-char', type=int, metavar='NUM', - help="Unicode of special character that is used to exit the application, default: %(default)s", + help='Unicode of special character that is used to exit the application, default: %(default)s', default=0x1d) # GS/CTRL+] group.add_argument( - "--menu-char", + '--menu-char', type=int, metavar='NUM', - help="Unicode code of special character that is used to control miniterm (menu), default: %(default)s", + help='Unicode code of special character that is used to control miniterm (menu), default: %(default)s', default=0x14) # Menu: CTRL+T - group = parser.add_argument_group("diagnostics") + group = parser.add_argument_group('diagnostics') group.add_argument( - "-q", "--quiet", - action="store_true", - help="suppress non-error messages", + '-q', '--quiet', + action='store_true', + help='suppress non-error messages', default=False) group.add_argument( - "--develop", - action="store_true", - help="show Python traceback on error", + '--develop', + action='store_true', + help='show Python traceback on error', default=False) args = parser.parse_args() @@ -876,9 +938,12 @@ def main(default_port=None, default_baudrate=9600, default_rts=None, default_dtr sys.stderr.write('--- forcing RTS {}\n'.format('active' if args.rts else 'inactive')) serial_instance.rts = args.rts + if isinstance(serial_instance, serial.Serial): + serial_instance.exclusive = args.exclusive + serial_instance.open() except serial.SerialException as e: - sys.stderr.write('could not open port {}: {}\n'.format(repr(args.port), e)) + sys.stderr.write('could not open port {!r}: {}\n'.format(args.port, e)) if args.develop: raise if not args.ask: @@ -914,7 +979,7 @@ def main(default_port=None, default_baudrate=9600, default_rts=None, default_dtr except KeyboardInterrupt: pass if not args.quiet: - sys.stderr.write("\n--- exit ---\n") + sys.stderr.write('\n--- exit ---\n') miniterm.join() miniterm.close() |