diff options
Diffstat (limited to 'serial/tools')
-rw-r--r-- | serial/tools/hexlify_codec.py | 2 | ||||
-rw-r--r-- | serial/tools/list_ports.py | 15 | ||||
-rw-r--r-- | serial/tools/list_ports_common.py | 36 | ||||
-rw-r--r-- | serial/tools/list_ports_linux.py | 38 | ||||
-rw-r--r-- | serial/tools/list_ports_osx.py | 5 | ||||
-rw-r--r-- | serial/tools/list_ports_posix.py | 34 | ||||
-rw-r--r-- | serial/tools/list_ports_windows.py | 19 | ||||
-rw-r--r-- | serial/tools/miniterm.py | 355 |
8 files changed, 331 insertions, 173 deletions
diff --git a/serial/tools/hexlify_codec.py b/serial/tools/hexlify_codec.py index 1371da2..bd8f6b0 100644 --- a/serial/tools/hexlify_codec.py +++ b/serial/tools/hexlify_codec.py @@ -18,6 +18,8 @@ Therefore decoding is binary to text and thus converting binary data to hex dump """ +from __future__ import absolute_import + import codecs import serial diff --git a/serial/tools/list_ports.py b/serial/tools/list_ports.py index 2271dd1..0d7e3d4 100644 --- a/serial/tools/list_ports.py +++ b/serial/tools/list_ports.py @@ -16,6 +16,8 @@ Additionally a grep function is supplied that can be used to search for ports based on their descriptions or hardware ID. """ +from __future__ import absolute_import + import sys import os import re @@ -34,14 +36,14 @@ else: # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -def grep(regexp): +def grep(regexp, include_links=False): """\ Search for ports using a regular expression. Port name, description and hardware ID are searched. The function returns an iterable that returns the same tuples as comport() would do. """ r = re.compile(regexp, re.I) - for info in comports(): + for info in comports(include_links): port, desc, hwid = info if r.search(port) or r.search(desc) or r.search(hwid): yield info @@ -73,6 +75,11 @@ def main(): type=int, help='only output the N-th entry') + parser.add_argument( + '-s', '--include-links', + action='store_true', + help='include entries that are symlinks to real devices') + args = parser.parse_args() hits = 0 @@ -80,9 +87,9 @@ def main(): if args.regexp: if not args.quiet: sys.stderr.write("Filtered list with regexp: {!r}\n".format(args.regexp)) - iterator = sorted(grep(args.regexp)) + iterator = sorted(grep(args.regexp, include_links=args.include_links)) else: - iterator = sorted(comports()) + iterator = sorted(comports(include_links=args.include_links)) # list them for n, (port, desc, hwid) in enumerate(iterator, 1): if args.n is None or args.n == n: diff --git a/serial/tools/list_ports_common.py b/serial/tools/list_ports_common.py index df12939..617f3dc 100644 --- a/serial/tools/list_ports_common.py +++ b/serial/tools/list_ports_common.py @@ -7,7 +7,13 @@ # (C) 2015 Chris Liechti <cliechti@gmx.net> # # SPDX-License-Identifier: BSD-3-Clause + +from __future__ import absolute_import + import re +import glob +import os +import os.path def numsplit(text): @@ -29,9 +35,9 @@ def numsplit(text): class ListPortInfo(object): """Info collection base class for serial ports""" - def __init__(self, device=None): + def __init__(self, device, skip_link_detection=False): self.device = device - self.name = None + self.name = os.path.basename(device) self.description = 'n/a' self.hwid = 'n/a' # USB specific data @@ -42,6 +48,9 @@ class ListPortInfo(object): self.manufacturer = None self.product = None self.interface = None + # special handling for links + if not skip_link_detection and device is not None and os.path.islink(device): + self.hwid = 'LINK={}'.format(os.path.realpath(device)) def usb_description(self): """return a short string to name the port based on USB info""" @@ -66,9 +75,16 @@ class ListPortInfo(object): self.hwid = self.usb_info() def __eq__(self, other): - return self.device == other.device + return isinstance(other, ListPortInfo) and self.device == other.device + + def __hash__(self): + return hash(self.device) def __lt__(self, other): + if not isinstance(other, ListPortInfo): + raise TypeError('unorderable types: {}() and {}()'.format( + type(self).__name__, + type(other).__name__)) return numsplit(self.device) < numsplit(other.device) def __str__(self): @@ -85,6 +101,20 @@ class ListPortInfo(object): else: raise IndexError('{} > 2'.format(index)) + +# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +def list_links(devices): + """\ + search all /dev devices and look for symlinks to known ports already + listed in devices. + """ + links = [] + for device in glob.glob('/dev/*'): + if os.path.islink(device) and os.path.realpath(device) in devices: + links.append(device) + return links + + # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - # test if __name__ == '__main__': diff --git a/serial/tools/list_ports_linux.py b/serial/tools/list_ports_linux.py index 0dfa81f..9346ae9 100644 --- a/serial/tools/list_ports_linux.py +++ b/serial/tools/list_ports_linux.py @@ -8,6 +8,8 @@ # # SPDX-License-Identifier: BSD-3-Clause +from __future__ import absolute_import + import glob import os from serial.tools import list_ports_common @@ -18,7 +20,12 @@ class SysFS(list_ports_common.ListPortInfo): def __init__(self, device): super(SysFS, self).__init__(device) - self.name = os.path.basename(device) + # special handling for links + if device is not None and os.path.islink(device): + device = os.path.realpath(device) + is_link = True + else: + is_link = False self.usb_device_path = None if os.path.exists('/sys/class/tty/{}/device'.format(self.name)): self.device_path = os.path.realpath('/sys/class/tty/{}/device'.format(self.name)) @@ -28,17 +35,28 @@ class SysFS(list_ports_common.ListPortInfo): self.subsystem = None # check device type if self.subsystem == 'usb-serial': - self.usb_device_path = os.path.dirname(os.path.dirname(self.device_path)) + self.usb_interface_path = os.path.dirname(self.device_path) elif self.subsystem == 'usb': - self.usb_device_path = os.path.dirname(self.device_path) + self.usb_interface_path = self.device_path else: - self.usb_device_path = None + self.usb_interface_path = None # fill-in info for USB devices - if self.usb_device_path is not None: + if self.usb_interface_path is not None: + self.usb_device_path = os.path.dirname(self.usb_interface_path) + + try: + num_if = int(self.read_line(self.usb_device_path, 'bNumInterfaces')) + except ValueError: + num_if = 1 + self.vid = int(self.read_line(self.usb_device_path, 'idVendor'), 16) self.pid = int(self.read_line(self.usb_device_path, 'idProduct'), 16) self.serial_number = self.read_line(self.usb_device_path, 'serial') - self.location = os.path.basename(self.usb_device_path) + if num_if > 1: # multi interface devices like FT4232 + self.location = os.path.basename(self.usb_interface_path) + else: + self.location = os.path.basename(self.usb_device_path) + self.manufacturer = self.read_line(self.usb_device_path, 'manufacturer') self.product = self.read_line(self.usb_device_path, 'product') self.interface = self.read_line(self.device_path, 'interface') @@ -53,6 +71,9 @@ class SysFS(list_ports_common.ListPortInfo): self.description = self.name self.hwid = os.path.basename(self.device_path) + if is_link: + self.hwid += ' LINK={}'.format(device) + def read_line(self, *args): """\ Helper function to read a single line from a file. @@ -67,13 +88,16 @@ class SysFS(list_ports_common.ListPortInfo): return None -def comports(): +def comports(include_links=False): devices = glob.glob('/dev/ttyS*') # built-in serial ports devices.extend(glob.glob('/dev/ttyUSB*')) # usb-serial with own driver + devices.extend(glob.glob('/dev/ttyXRUSB*')) # xr-usb-serial port exar (DELL Edge 3001) devices.extend(glob.glob('/dev/ttyACM*')) # usb-serial with CDC-ACM profile devices.extend(glob.glob('/dev/ttyAMA*')) # ARM internal port (raspi) devices.extend(glob.glob('/dev/rfcomm*')) # BT serial devices devices.extend(glob.glob('/dev/ttyAP*')) # Advantech multi-port serial controllers + if include_links: + devices.extend(list_ports_common.list_links(devices)) return [info for info in [SysFS(d) for d in devices] if info.subsystem != "platform"] # hide non-present internal serial ports diff --git a/serial/tools/list_ports_osx.py b/serial/tools/list_ports_osx.py index 1d57b96..f46a820 100644 --- a/serial/tools/list_ports_osx.py +++ b/serial/tools/list_ports_osx.py @@ -21,6 +21,8 @@ # Also see the 'IORegistryExplorer' for an idea of what we are actually searching +from __future__ import absolute_import + import ctypes import ctypes.util @@ -227,7 +229,8 @@ def search_for_locationID_in_interfaces(serial_interfaces, locationID): return None -def comports(): +def comports(include_links=False): + # XXX include_links is currently ignored. are links in /dev even supported here? # Scan for all iokit serial ports services = GetIOServicesByType('IOSerialBSDClient') ports = [] diff --git a/serial/tools/list_ports_posix.py b/serial/tools/list_ports_posix.py index 6ea4db9..79bc8ed 100644 --- a/serial/tools/list_ports_posix.py +++ b/serial/tools/list_ports_posix.py @@ -16,6 +16,8 @@ As currently no method is known to get the second two strings easily, they are currently just identical to the port name. """ +from __future__ import absolute_import + import glob import sys import os @@ -34,48 +36,64 @@ elif plat == 'cygwin': # cygwin/win32 # cygwin accepts /dev/com* in many contexts # (such as 'open' call, explicit 'ls'), but 'glob.glob' # and bare 'ls' do not; so use /dev/ttyS* instead - def comports(): + def comports(include_links=False): devices = glob.glob('/dev/ttyS*') + if include_links: + devices.extend(list_ports_common.list_links(devices)) return [list_ports_common.ListPortInfo(d) for d in devices] elif plat[:7] == 'openbsd': # OpenBSD - def comports(): + def comports(include_links=False): devices = glob.glob('/dev/cua*') + if include_links: + devices.extend(list_ports_common.list_links(devices)) return [list_ports_common.ListPortInfo(d) for d in devices] elif plat[:3] == 'bsd' or plat[:7] == 'freebsd': - def comports(): + def comports(include_links=False): devices = glob.glob('/dev/cua*[!.init][!.lock]') + if include_links: + devices.extend(list_ports_common.list_links(devices)) return [list_ports_common.ListPortInfo(d) for d in devices] elif plat[:6] == 'netbsd': # NetBSD - def comports(): + def comports(include_links=False): """scan for available ports. return a list of device names.""" devices = glob.glob('/dev/dty*') + if include_links: + devices.extend(list_ports_common.list_links(devices)) return [list_ports_common.ListPortInfo(d) for d in devices] elif plat[:4] == 'irix': # IRIX - def comports(): + def comports(include_links=False): """scan for available ports. return a list of device names.""" devices = glob.glob('/dev/ttyf*') + if include_links: + devices.extend(list_ports_common.list_links(devices)) return [list_ports_common.ListPortInfo(d) for d in devices] elif plat[:2] == 'hp': # HP-UX (not tested) - def comports(): + def comports(include_links=False): """scan for available ports. return a list of device names.""" devices = glob.glob('/dev/tty*p0') + if include_links: + devices.extend(list_ports_common.list_links(devices)) return [list_ports_common.ListPortInfo(d) for d in devices] elif plat[:5] == 'sunos': # Solaris/SunOS - def comports(): + def comports(include_links=False): """scan for available ports. return a list of device names.""" devices = glob.glob('/dev/tty*c') + if include_links: + devices.extend(list_ports_common.list_links(devices)) return [list_ports_common.ListPortInfo(d) for d in devices] elif plat[:3] == 'aix': # AIX - def comports(): + def comports(include_links=False): """scan for available ports. return a list of device names.""" devices = glob.glob('/dev/tty*') + if include_links: + devices.extend(list_ports_common.list_links(devices)) return [list_ports_common.ListPortInfo(d) for d in devices] else: diff --git a/serial/tools/list_ports_windows.py b/serial/tools/list_ports_windows.py index 93fa128..19b9499 100644 --- a/serial/tools/list_ports_windows.py +++ b/serial/tools/list_ports_windows.py @@ -8,6 +8,8 @@ # # SPDX-License-Identifier: BSD-3-Clause +from __future__ import absolute_import + # pylint: disable=invalid-name,too-few-public-methods import re import ctypes @@ -113,7 +115,7 @@ RegCloseKey.argtypes = [HKEY] RegCloseKey.restype = LONG RegQueryValueEx = advapi32.RegQueryValueExW -RegQueryValueEx.argtypes = [HKEY, LPCTSTR , LPDWORD, LPDWORD, LPBYTE, LPDWORD] +RegQueryValueEx.argtypes = [HKEY, LPCTSTR, LPDWORD, LPDWORD, LPBYTE, LPDWORD] RegQueryValueEx.restype = LONG @@ -143,6 +145,7 @@ def iterate_comports(): # repeat for all possible GUIDs for index in range(guids_size.value): + bInterfaceNumber = None g_hdi = SetupDiGetClassDevs( ctypes.byref(GUIDs[index]), None, @@ -205,18 +208,20 @@ def iterate_comports(): # stringify szHardwareID_str = szHardwareID.value - info = list_ports_common.ListPortInfo(port_name_buffer.value) + info = list_ports_common.ListPortInfo(port_name_buffer.value, skip_link_detection=True) # in case of USB, make a more readable string, similar to that form # that we also generate on other platforms if szHardwareID_str.startswith('USB'): - m = re.search(r'VID_([0-9a-f]{4})(&PID_([0-9a-f]{4}))?(\\(\w+))?', szHardwareID_str, re.I) + m = re.search(r'VID_([0-9a-f]{4})(&PID_([0-9a-f]{4}))?(&MI_(\d{2}))?(\\(\w+))?', szHardwareID_str, re.I) if m: info.vid = int(m.group(1), 16) if m.group(3): info.pid = int(m.group(3), 16) if m.group(5): - info.serial_number = m.group(5) + bInterfaceNumber = int(m.group(5)) + if m.group(7): + info.serial_number = m.group(7) # calculate a location string loc_path_str = ctypes.create_unicode_buffer(250) if SetupDiGetDeviceRegistryProperty( @@ -238,6 +243,10 @@ def iterate_comports(): else: location.append('-') location.append(g.group(2)) + if bInterfaceNumber is not None: + location.append(':{}.{}'.format( + 'x', # XXX how to determine correct bConfigurationValue? + bInterfaceNumber)) if location: info.location = ''.join(location) info.hwid = info.usb_info() @@ -287,7 +296,7 @@ def iterate_comports(): SetupDiDestroyDeviceInfoList(g_hdi) -def comports(): +def comports(include_links=False): """Return a list of info objects about serial ports""" return list(iterate_comports()) diff --git a/serial/tools/miniterm.py b/serial/tools/miniterm.py index 14182f0..3b8d5d2 100644 --- a/serial/tools/miniterm.py +++ b/serial/tools/miniterm.py @@ -3,10 +3,12 @@ # Very simple serial terminal # # This file is part of pySerial. https://github.com/pyserial/pyserial -# (C)2002-2015 Chris Liechti <cliechti@gmx.net> +# (C)2002-2017 Chris Liechti <cliechti@gmx.net> # # SPDX-License-Identifier: BSD-3-Clause +from __future__ import absolute_import + import codecs import os import sys @@ -275,12 +277,12 @@ class DebugIO(Transform): """Print what is sent and received""" def rx(self, text): - sys.stderr.write(' [RX:{}] '.format(repr(text))) + sys.stderr.write(' [RX:{!r}] '.format(text)) sys.stderr.flush() return text def tx(self, text): - sys.stderr.write(' [TX:{}] '.format(repr(text))) + sys.stderr.write(' [TX:{!r}] '.format(text)) sys.stderr.flush() return text @@ -315,7 +317,7 @@ def ask_for_port(): sys.stderr.write('\n--- Available ports:\n') ports = [] for n, (port, desc, hwid) in enumerate(sorted(comports()), 1): - sys.stderr.write('--- {:2}: {:20} {}\n'.format(n, port, desc)) + sys.stderr.write('--- {:2}: {:20} {!r}\n'.format(n, port, desc)) ports.append(port) while True: port = raw_input('--- Enter port index or full name: ') @@ -347,8 +349,8 @@ class Miniterm(object): self.eol = eol self.filters = filters self.update_transformations() - self.exit_character = 0x1d # GS/CTRL+] - self.menu_character = 0x14 # Menu: CTRL+T + self.exit_character = unichr(0x1d) # GS/CTRL+] + self.menu_character = unichr(0x14) # Menu: CTRL+T self.alive = None self._reader_alive = None self.receiver_thread = None @@ -502,25 +504,7 @@ class Miniterm(object): if self.echo: self.console.write(c) elif c == '\x15': # CTRL+U -> upload file - sys.stderr.write('\n--- File to upload: ') - sys.stderr.flush() - with self.console: - filename = sys.stdin.readline().rstrip('\r\n') - if filename: - try: - with open(filename, 'rb') as f: - sys.stderr.write('--- Sending file {} ---\n'.format(filename)) - while True: - block = f.read(1024) - if not block: - break - self.serial.write(block) - # Wait for output buffer to drain. - self.serial.flush() - sys.stderr.write('.') # Progress indicator. - sys.stderr.write('\n--- File {} sent ---\n'.format(filename)) - except IOError as e: - sys.stderr.write('--- ERROR opening file {}: {} ---\n'.format(filename, e)) + self.upload_file() elif c in '\x08hH?': # CTRL+H, h, H, ? -> Show help sys.stderr.write(self.get_help_text()) elif c == '\x12': # CTRL+R -> Toggle RTS @@ -536,24 +520,9 @@ class Miniterm(object): self.echo = not self.echo sys.stderr.write('--- local echo {} ---\n'.format('active' if self.echo else 'inactive')) elif c == '\x06': # CTRL+F -> edit filters - sys.stderr.write('\n--- Available Filters:\n') - sys.stderr.write('\n'.join( - '--- {:<10} = {.__doc__}'.format(k, v) - for k, v in sorted(TRANSFORMATIONS.items()))) - sys.stderr.write('\n--- Enter new filter name(s) [{}]: '.format(' '.join(self.filters))) - with self.console: - new_filters = sys.stdin.readline().lower().split() - if new_filters: - for f in new_filters: - if f not in TRANSFORMATIONS: - sys.stderr.write('--- unknown filter: {}\n'.format(repr(f))) - break - else: - self.filters = new_filters - self.update_transformations() - sys.stderr.write('--- filters: {}\n'.format(' '.join(self.filters))) + self.change_filter() elif c == '\x0c': # CTRL+L -> EOL mode - modes = list(EOL_TRANSFORMATIONS) # keys + modes = list(EOL_TRANSFORMATIONS) # keys eol = modes.index(self.eol) + 1 if eol >= len(modes): eol = 0 @@ -561,63 +530,17 @@ class Miniterm(object): sys.stderr.write('--- EOL: {} ---\n'.format(self.eol.upper())) self.update_transformations() elif c == '\x01': # CTRL+A -> set encoding - sys.stderr.write('\n--- Enter new encoding name [{}]: '.format(self.input_encoding)) - with self.console: - new_encoding = sys.stdin.readline().strip() - if new_encoding: - try: - codecs.lookup(new_encoding) - except LookupError: - sys.stderr.write('--- invalid encoding name: {}\n'.format(new_encoding)) - else: - self.set_rx_encoding(new_encoding) - self.set_tx_encoding(new_encoding) - sys.stderr.write('--- serial input encoding: {}\n'.format(self.input_encoding)) - sys.stderr.write('--- serial output encoding: {}\n'.format(self.output_encoding)) + self.change_encoding() elif c == '\x09': # CTRL+I -> info self.dump_port_settings() #~ elif c == '\x01': # CTRL+A -> cycle escape mode #~ elif c == '\x0c': # CTRL+L -> cycle linefeed mode elif c in 'pP': # P -> change port - with self.console: - try: - port = ask_for_port() - except KeyboardInterrupt: - port = None - if port and port != self.serial.port: - # reader thread needs to be shut down - self._stop_reader() - # save settings - settings = self.serial.getSettingsDict() - try: - new_serial = serial.serial_for_url(port, do_not_open=True) - # restore settings and open - new_serial.applySettingsDict(settings) - new_serial.rts = self.serial.rts - new_serial.dtr = self.serial.dtr - new_serial.open() - new_serial.break_condition = self.serial.break_condition - except Exception as e: - sys.stderr.write('--- ERROR opening new port: {} ---\n'.format(e)) - new_serial.close() - else: - self.serial.close() - self.serial = new_serial - sys.stderr.write('--- Port changed to: {} ---\n'.format(self.serial.port)) - # and restart the reader thread - self._start_reader() + self.change_port() + elif c in 'sS': # S -> suspend / open port temporarily + self.suspend_port() elif c in 'bB': # B -> change baudrate - sys.stderr.write('\n--- Baudrate: ') - sys.stderr.flush() - with self.console: - backup = self.serial.baudrate - try: - self.serial.baudrate = int(sys.stdin.readline().strip()) - except ValueError as e: - sys.stderr.write('--- ERROR setting baudrate: {} ---\n'.format(e)) - self.serial.baudrate = backup - else: - self.dump_port_settings() + self.change_baudrate() elif c == '8': # 8 -> change to 8 bits self.serial.bytesize = serial.EIGHTBITS self.dump_port_settings() @@ -657,6 +580,138 @@ class Miniterm(object): else: sys.stderr.write('--- unknown menu character {} --\n'.format(key_description(c))) + def upload_file(self): + """Ask user for filenname and send its contents""" + sys.stderr.write('\n--- File to upload: ') + sys.stderr.flush() + with self.console: + filename = sys.stdin.readline().rstrip('\r\n') + if filename: + try: + with open(filename, 'rb') as f: + sys.stderr.write('--- Sending file {} ---\n'.format(filename)) + while True: + block = f.read(1024) + if not block: + break + self.serial.write(block) + # Wait for output buffer to drain. + self.serial.flush() + sys.stderr.write('.') # Progress indicator. + sys.stderr.write('\n--- File {} sent ---\n'.format(filename)) + except IOError as e: + sys.stderr.write('--- ERROR opening file {}: {} ---\n'.format(filename, e)) + + def change_filter(self): + """change the i/o transformations""" + sys.stderr.write('\n--- Available Filters:\n') + sys.stderr.write('\n'.join( + '--- {:<10} = {.__doc__}'.format(k, v) + for k, v in sorted(TRANSFORMATIONS.items()))) + sys.stderr.write('\n--- Enter new filter name(s) [{}]: '.format(' '.join(self.filters))) + with self.console: + new_filters = sys.stdin.readline().lower().split() + if new_filters: + for f in new_filters: + if f not in TRANSFORMATIONS: + sys.stderr.write('--- unknown filter: {!r}\n'.format(f)) + break + else: + self.filters = new_filters + self.update_transformations() + sys.stderr.write('--- filters: {}\n'.format(' '.join(self.filters))) + + def change_encoding(self): + """change encoding on the serial port""" + sys.stderr.write('\n--- Enter new encoding name [{}]: '.format(self.input_encoding)) + with self.console: + new_encoding = sys.stdin.readline().strip() + if new_encoding: + try: + codecs.lookup(new_encoding) + except LookupError: + sys.stderr.write('--- invalid encoding name: {}\n'.format(new_encoding)) + else: + self.set_rx_encoding(new_encoding) + self.set_tx_encoding(new_encoding) + sys.stderr.write('--- serial input encoding: {}\n'.format(self.input_encoding)) + sys.stderr.write('--- serial output encoding: {}\n'.format(self.output_encoding)) + + def change_baudrate(self): + """change the baudrate""" + sys.stderr.write('\n--- Baudrate: ') + sys.stderr.flush() + with self.console: + backup = self.serial.baudrate + try: + self.serial.baudrate = int(sys.stdin.readline().strip()) + except ValueError as e: + sys.stderr.write('--- ERROR setting baudrate: {} ---\n'.format(e)) + self.serial.baudrate = backup + else: + self.dump_port_settings() + + def change_port(self): + """Have a conversation with the user to change the serial port""" + with self.console: + try: + port = ask_for_port() + except KeyboardInterrupt: + port = None + if port and port != self.serial.port: + # reader thread needs to be shut down + self._stop_reader() + # save settings + settings = self.serial.getSettingsDict() + try: + new_serial = serial.serial_for_url(port, do_not_open=True) + # restore settings and open + new_serial.applySettingsDict(settings) + new_serial.rts = self.serial.rts + new_serial.dtr = self.serial.dtr + new_serial.open() + new_serial.break_condition = self.serial.break_condition + except Exception as e: + sys.stderr.write('--- ERROR opening new port: {} ---\n'.format(e)) + new_serial.close() + else: + self.serial.close() + self.serial = new_serial + sys.stderr.write('--- Port changed to: {} ---\n'.format(self.serial.port)) + # and restart the reader thread + self._start_reader() + + def suspend_port(self): + """\ + open port temporarily, allow reconnect, exit and port change to get + out of the loop + """ + # reader thread needs to be shut down + self._stop_reader() + self.serial.close() + sys.stderr.write('\n--- Port closed: {} ---\n'.format(self.serial.port)) + do_change_port = False + while not self.serial.is_open: + sys.stderr.write('--- Quit: {exit} | p: port change | any other key to reconnect ---\n'.format( + exit=key_description(self.exit_character))) + k = self.console.getkey() + if k == self.exit_character: + self.stop() # exit app + break + elif k in 'pP': + do_change_port = True + break + try: + self.serial.open() + except Exception as e: + sys.stderr.write('--- ERROR opening port: {} ---\n'.format(e)) + if do_change_port: + self.change_port() + else: + # and restart the reader thread + self._start_reader() + sys.stderr.write('--- Port opened: {} ---\n'.format(self.serial.port)) + def get_help_text(self): """return the help text""" # help text, starts with blank line! @@ -707,123 +762,130 @@ def main(default_port=None, default_baudrate=9600, default_rts=None, default_dtr import argparse parser = argparse.ArgumentParser( - description="Miniterm - A simple terminal program for the serial port.") + description='Miniterm - A simple terminal program for the serial port.') parser.add_argument( - "port", + 'port', nargs='?', - help="serial port name ('-' to show port list)", + help='serial port name ("-" to show port list)', default=default_port) parser.add_argument( - "baudrate", + 'baudrate', nargs='?', type=int, - help="set baud rate, default: %(default)s", + help='set baud rate, default: %(default)s', default=default_baudrate) - group = parser.add_argument_group("port settings") + group = parser.add_argument_group('port settings') group.add_argument( - "--parity", + '--parity', choices=['N', 'E', 'O', 'S', 'M'], type=lambda c: c.upper(), - help="set parity, one of {N E O S M}, default: N", + help='set parity, one of {N E O S M}, default: N', default='N') group.add_argument( - "--rtscts", - action="store_true", - help="enable RTS/CTS flow control (default off)", + '--rtscts', + action='store_true', + help='enable RTS/CTS flow control (default off)', default=False) group.add_argument( - "--xonxoff", - action="store_true", - help="enable software flow control (default off)", + '--xonxoff', + action='store_true', + help='enable software flow control (default off)', default=False) group.add_argument( - "--rts", + '--rts', type=int, - help="set initial RTS line state (possible values: 0, 1)", + help='set initial RTS line state (possible values: 0, 1)', default=default_rts) group.add_argument( - "--dtr", + '--dtr', type=int, - help="set initial DTR line state (possible values: 0, 1)", + help='set initial DTR line state (possible values: 0, 1)', default=default_dtr) group.add_argument( - "--ask", - action="store_true", - help="ask again for port when open fails", + '--non-exclusive', + dest='exclusive', + action='store_false', + help='disable locking for native ports', + default=True) + + group.add_argument( + '--ask', + action='store_true', + help='ask again for port when open fails', default=False) - group = parser.add_argument_group("data handling") + group = parser.add_argument_group('data handling') group.add_argument( - "-e", "--echo", - action="store_true", - help="enable local echo (default off)", + '-e', '--echo', + action='store_true', + help='enable local echo (default off)', default=False) group.add_argument( - "--encoding", - dest="serial_port_encoding", - metavar="CODEC", - help="set the encoding for the serial port (e.g. hexlify, Latin1, UTF-8), default: %(default)s", + '--encoding', + dest='serial_port_encoding', + metavar='CODEC', + help='set the encoding for the serial port (e.g. hexlify, Latin1, UTF-8), default: %(default)s', default='UTF-8') group.add_argument( - "-f", "--filter", - action="append", - metavar="NAME", - help="add text transformation", + '-f', '--filter', + action='append', + metavar='NAME', + help='add text transformation', default=[]) group.add_argument( - "--eol", + '--eol', choices=['CR', 'LF', 'CRLF'], type=lambda c: c.upper(), - help="end of line mode", + help='end of line mode', default='CRLF') group.add_argument( - "--raw", - action="store_true", - help="Do no apply any encodings/transformations", + '--raw', + action='store_true', + help='Do no apply any encodings/transformations', default=False) - group = parser.add_argument_group("hotkeys") + group = parser.add_argument_group('hotkeys') group.add_argument( - "--exit-char", + '--exit-char', type=int, metavar='NUM', - help="Unicode of special character that is used to exit the application, default: %(default)s", + help='Unicode of special character that is used to exit the application, default: %(default)s', default=0x1d) # GS/CTRL+] group.add_argument( - "--menu-char", + '--menu-char', type=int, metavar='NUM', - help="Unicode code of special character that is used to control miniterm (menu), default: %(default)s", + help='Unicode code of special character that is used to control miniterm (menu), default: %(default)s', default=0x14) # Menu: CTRL+T - group = parser.add_argument_group("diagnostics") + group = parser.add_argument_group('diagnostics') group.add_argument( - "-q", "--quiet", - action="store_true", - help="suppress non-error messages", + '-q', '--quiet', + action='store_true', + help='suppress non-error messages', default=False) group.add_argument( - "--develop", - action="store_true", - help="show Python traceback on error", + '--develop', + action='store_true', + help='show Python traceback on error', default=False) args = parser.parse_args() @@ -876,9 +938,12 @@ def main(default_port=None, default_baudrate=9600, default_rts=None, default_dtr sys.stderr.write('--- forcing RTS {}\n'.format('active' if args.rts else 'inactive')) serial_instance.rts = args.rts + if isinstance(serial_instance, serial.Serial): + serial_instance.exclusive = args.exclusive + serial_instance.open() except serial.SerialException as e: - sys.stderr.write('could not open port {}: {}\n'.format(repr(args.port), e)) + sys.stderr.write('could not open port {!r}: {}\n'.format(args.port, e)) if args.develop: raise if not args.ask: @@ -914,7 +979,7 @@ def main(default_port=None, default_baudrate=9600, default_rts=None, default_dtr except KeyboardInterrupt: pass if not args.quiet: - sys.stderr.write("\n--- exit ---\n") + sys.stderr.write('\n--- exit ---\n') miniterm.join() miniterm.close() |