summaryrefslogtreecommitdiff
path: root/serial/tools
diff options
context:
space:
mode:
Diffstat (limited to 'serial/tools')
-rw-r--r--serial/tools/hexlify_codec.py2
-rw-r--r--serial/tools/list_ports.py15
-rw-r--r--serial/tools/list_ports_common.py36
-rw-r--r--serial/tools/list_ports_linux.py38
-rw-r--r--serial/tools/list_ports_osx.py5
-rw-r--r--serial/tools/list_ports_posix.py34
-rw-r--r--serial/tools/list_ports_windows.py19
-rw-r--r--serial/tools/miniterm.py355
8 files changed, 331 insertions, 173 deletions
diff --git a/serial/tools/hexlify_codec.py b/serial/tools/hexlify_codec.py
index 1371da2..bd8f6b0 100644
--- a/serial/tools/hexlify_codec.py
+++ b/serial/tools/hexlify_codec.py
@@ -18,6 +18,8 @@ Therefore decoding is binary to text and thus converting binary data to hex dump
"""
+from __future__ import absolute_import
+
import codecs
import serial
diff --git a/serial/tools/list_ports.py b/serial/tools/list_ports.py
index 2271dd1..0d7e3d4 100644
--- a/serial/tools/list_ports.py
+++ b/serial/tools/list_ports.py
@@ -16,6 +16,8 @@ Additionally a grep function is supplied that can be used to search for ports
based on their descriptions or hardware ID.
"""
+from __future__ import absolute_import
+
import sys
import os
import re
@@ -34,14 +36,14 @@ else:
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-def grep(regexp):
+def grep(regexp, include_links=False):
"""\
Search for ports using a regular expression. Port name, description and
hardware ID are searched. The function returns an iterable that returns the
same tuples as comport() would do.
"""
r = re.compile(regexp, re.I)
- for info in comports():
+ for info in comports(include_links):
port, desc, hwid = info
if r.search(port) or r.search(desc) or r.search(hwid):
yield info
@@ -73,6 +75,11 @@ def main():
type=int,
help='only output the N-th entry')
+ parser.add_argument(
+ '-s', '--include-links',
+ action='store_true',
+ help='include entries that are symlinks to real devices')
+
args = parser.parse_args()
hits = 0
@@ -80,9 +87,9 @@ def main():
if args.regexp:
if not args.quiet:
sys.stderr.write("Filtered list with regexp: {!r}\n".format(args.regexp))
- iterator = sorted(grep(args.regexp))
+ iterator = sorted(grep(args.regexp, include_links=args.include_links))
else:
- iterator = sorted(comports())
+ iterator = sorted(comports(include_links=args.include_links))
# list them
for n, (port, desc, hwid) in enumerate(iterator, 1):
if args.n is None or args.n == n:
diff --git a/serial/tools/list_ports_common.py b/serial/tools/list_ports_common.py
index df12939..617f3dc 100644
--- a/serial/tools/list_ports_common.py
+++ b/serial/tools/list_ports_common.py
@@ -7,7 +7,13 @@
# (C) 2015 Chris Liechti <cliechti@gmx.net>
#
# SPDX-License-Identifier: BSD-3-Clause
+
+from __future__ import absolute_import
+
import re
+import glob
+import os
+import os.path
def numsplit(text):
@@ -29,9 +35,9 @@ def numsplit(text):
class ListPortInfo(object):
"""Info collection base class for serial ports"""
- def __init__(self, device=None):
+ def __init__(self, device, skip_link_detection=False):
self.device = device
- self.name = None
+ self.name = os.path.basename(device)
self.description = 'n/a'
self.hwid = 'n/a'
# USB specific data
@@ -42,6 +48,9 @@ class ListPortInfo(object):
self.manufacturer = None
self.product = None
self.interface = None
+ # special handling for links
+ if not skip_link_detection and device is not None and os.path.islink(device):
+ self.hwid = 'LINK={}'.format(os.path.realpath(device))
def usb_description(self):
"""return a short string to name the port based on USB info"""
@@ -66,9 +75,16 @@ class ListPortInfo(object):
self.hwid = self.usb_info()
def __eq__(self, other):
- return self.device == other.device
+ return isinstance(other, ListPortInfo) and self.device == other.device
+
+ def __hash__(self):
+ return hash(self.device)
def __lt__(self, other):
+ if not isinstance(other, ListPortInfo):
+ raise TypeError('unorderable types: {}() and {}()'.format(
+ type(self).__name__,
+ type(other).__name__))
return numsplit(self.device) < numsplit(other.device)
def __str__(self):
@@ -85,6 +101,20 @@ class ListPortInfo(object):
else:
raise IndexError('{} > 2'.format(index))
+
+# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+def list_links(devices):
+ """\
+ search all /dev devices and look for symlinks to known ports already
+ listed in devices.
+ """
+ links = []
+ for device in glob.glob('/dev/*'):
+ if os.path.islink(device) and os.path.realpath(device) in devices:
+ links.append(device)
+ return links
+
+
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# test
if __name__ == '__main__':
diff --git a/serial/tools/list_ports_linux.py b/serial/tools/list_ports_linux.py
index 0dfa81f..9346ae9 100644
--- a/serial/tools/list_ports_linux.py
+++ b/serial/tools/list_ports_linux.py
@@ -8,6 +8,8 @@
#
# SPDX-License-Identifier: BSD-3-Clause
+from __future__ import absolute_import
+
import glob
import os
from serial.tools import list_ports_common
@@ -18,7 +20,12 @@ class SysFS(list_ports_common.ListPortInfo):
def __init__(self, device):
super(SysFS, self).__init__(device)
- self.name = os.path.basename(device)
+ # special handling for links
+ if device is not None and os.path.islink(device):
+ device = os.path.realpath(device)
+ is_link = True
+ else:
+ is_link = False
self.usb_device_path = None
if os.path.exists('/sys/class/tty/{}/device'.format(self.name)):
self.device_path = os.path.realpath('/sys/class/tty/{}/device'.format(self.name))
@@ -28,17 +35,28 @@ class SysFS(list_ports_common.ListPortInfo):
self.subsystem = None
# check device type
if self.subsystem == 'usb-serial':
- self.usb_device_path = os.path.dirname(os.path.dirname(self.device_path))
+ self.usb_interface_path = os.path.dirname(self.device_path)
elif self.subsystem == 'usb':
- self.usb_device_path = os.path.dirname(self.device_path)
+ self.usb_interface_path = self.device_path
else:
- self.usb_device_path = None
+ self.usb_interface_path = None
# fill-in info for USB devices
- if self.usb_device_path is not None:
+ if self.usb_interface_path is not None:
+ self.usb_device_path = os.path.dirname(self.usb_interface_path)
+
+ try:
+ num_if = int(self.read_line(self.usb_device_path, 'bNumInterfaces'))
+ except ValueError:
+ num_if = 1
+
self.vid = int(self.read_line(self.usb_device_path, 'idVendor'), 16)
self.pid = int(self.read_line(self.usb_device_path, 'idProduct'), 16)
self.serial_number = self.read_line(self.usb_device_path, 'serial')
- self.location = os.path.basename(self.usb_device_path)
+ if num_if > 1: # multi interface devices like FT4232
+ self.location = os.path.basename(self.usb_interface_path)
+ else:
+ self.location = os.path.basename(self.usb_device_path)
+
self.manufacturer = self.read_line(self.usb_device_path, 'manufacturer')
self.product = self.read_line(self.usb_device_path, 'product')
self.interface = self.read_line(self.device_path, 'interface')
@@ -53,6 +71,9 @@ class SysFS(list_ports_common.ListPortInfo):
self.description = self.name
self.hwid = os.path.basename(self.device_path)
+ if is_link:
+ self.hwid += ' LINK={}'.format(device)
+
def read_line(self, *args):
"""\
Helper function to read a single line from a file.
@@ -67,13 +88,16 @@ class SysFS(list_ports_common.ListPortInfo):
return None
-def comports():
+def comports(include_links=False):
devices = glob.glob('/dev/ttyS*') # built-in serial ports
devices.extend(glob.glob('/dev/ttyUSB*')) # usb-serial with own driver
+ devices.extend(glob.glob('/dev/ttyXRUSB*')) # xr-usb-serial port exar (DELL Edge 3001)
devices.extend(glob.glob('/dev/ttyACM*')) # usb-serial with CDC-ACM profile
devices.extend(glob.glob('/dev/ttyAMA*')) # ARM internal port (raspi)
devices.extend(glob.glob('/dev/rfcomm*')) # BT serial devices
devices.extend(glob.glob('/dev/ttyAP*')) # Advantech multi-port serial controllers
+ if include_links:
+ devices.extend(list_ports_common.list_links(devices))
return [info
for info in [SysFS(d) for d in devices]
if info.subsystem != "platform"] # hide non-present internal serial ports
diff --git a/serial/tools/list_ports_osx.py b/serial/tools/list_ports_osx.py
index 1d57b96..f46a820 100644
--- a/serial/tools/list_ports_osx.py
+++ b/serial/tools/list_ports_osx.py
@@ -21,6 +21,8 @@
# Also see the 'IORegistryExplorer' for an idea of what we are actually searching
+from __future__ import absolute_import
+
import ctypes
import ctypes.util
@@ -227,7 +229,8 @@ def search_for_locationID_in_interfaces(serial_interfaces, locationID):
return None
-def comports():
+def comports(include_links=False):
+ # XXX include_links is currently ignored. are links in /dev even supported here?
# Scan for all iokit serial ports
services = GetIOServicesByType('IOSerialBSDClient')
ports = []
diff --git a/serial/tools/list_ports_posix.py b/serial/tools/list_ports_posix.py
index 6ea4db9..79bc8ed 100644
--- a/serial/tools/list_ports_posix.py
+++ b/serial/tools/list_ports_posix.py
@@ -16,6 +16,8 @@ As currently no method is known to get the second two strings easily, they are
currently just identical to the port name.
"""
+from __future__ import absolute_import
+
import glob
import sys
import os
@@ -34,48 +36,64 @@ elif plat == 'cygwin': # cygwin/win32
# cygwin accepts /dev/com* in many contexts
# (such as 'open' call, explicit 'ls'), but 'glob.glob'
# and bare 'ls' do not; so use /dev/ttyS* instead
- def comports():
+ def comports(include_links=False):
devices = glob.glob('/dev/ttyS*')
+ if include_links:
+ devices.extend(list_ports_common.list_links(devices))
return [list_ports_common.ListPortInfo(d) for d in devices]
elif plat[:7] == 'openbsd': # OpenBSD
- def comports():
+ def comports(include_links=False):
devices = glob.glob('/dev/cua*')
+ if include_links:
+ devices.extend(list_ports_common.list_links(devices))
return [list_ports_common.ListPortInfo(d) for d in devices]
elif plat[:3] == 'bsd' or plat[:7] == 'freebsd':
- def comports():
+ def comports(include_links=False):
devices = glob.glob('/dev/cua*[!.init][!.lock]')
+ if include_links:
+ devices.extend(list_ports_common.list_links(devices))
return [list_ports_common.ListPortInfo(d) for d in devices]
elif plat[:6] == 'netbsd': # NetBSD
- def comports():
+ def comports(include_links=False):
"""scan for available ports. return a list of device names."""
devices = glob.glob('/dev/dty*')
+ if include_links:
+ devices.extend(list_ports_common.list_links(devices))
return [list_ports_common.ListPortInfo(d) for d in devices]
elif plat[:4] == 'irix': # IRIX
- def comports():
+ def comports(include_links=False):
"""scan for available ports. return a list of device names."""
devices = glob.glob('/dev/ttyf*')
+ if include_links:
+ devices.extend(list_ports_common.list_links(devices))
return [list_ports_common.ListPortInfo(d) for d in devices]
elif plat[:2] == 'hp': # HP-UX (not tested)
- def comports():
+ def comports(include_links=False):
"""scan for available ports. return a list of device names."""
devices = glob.glob('/dev/tty*p0')
+ if include_links:
+ devices.extend(list_ports_common.list_links(devices))
return [list_ports_common.ListPortInfo(d) for d in devices]
elif plat[:5] == 'sunos': # Solaris/SunOS
- def comports():
+ def comports(include_links=False):
"""scan for available ports. return a list of device names."""
devices = glob.glob('/dev/tty*c')
+ if include_links:
+ devices.extend(list_ports_common.list_links(devices))
return [list_ports_common.ListPortInfo(d) for d in devices]
elif plat[:3] == 'aix': # AIX
- def comports():
+ def comports(include_links=False):
"""scan for available ports. return a list of device names."""
devices = glob.glob('/dev/tty*')
+ if include_links:
+ devices.extend(list_ports_common.list_links(devices))
return [list_ports_common.ListPortInfo(d) for d in devices]
else:
diff --git a/serial/tools/list_ports_windows.py b/serial/tools/list_ports_windows.py
index 93fa128..19b9499 100644
--- a/serial/tools/list_ports_windows.py
+++ b/serial/tools/list_ports_windows.py
@@ -8,6 +8,8 @@
#
# SPDX-License-Identifier: BSD-3-Clause
+from __future__ import absolute_import
+
# pylint: disable=invalid-name,too-few-public-methods
import re
import ctypes
@@ -113,7 +115,7 @@ RegCloseKey.argtypes = [HKEY]
RegCloseKey.restype = LONG
RegQueryValueEx = advapi32.RegQueryValueExW
-RegQueryValueEx.argtypes = [HKEY, LPCTSTR , LPDWORD, LPDWORD, LPBYTE, LPDWORD]
+RegQueryValueEx.argtypes = [HKEY, LPCTSTR, LPDWORD, LPDWORD, LPBYTE, LPDWORD]
RegQueryValueEx.restype = LONG
@@ -143,6 +145,7 @@ def iterate_comports():
# repeat for all possible GUIDs
for index in range(guids_size.value):
+ bInterfaceNumber = None
g_hdi = SetupDiGetClassDevs(
ctypes.byref(GUIDs[index]),
None,
@@ -205,18 +208,20 @@ def iterate_comports():
# stringify
szHardwareID_str = szHardwareID.value
- info = list_ports_common.ListPortInfo(port_name_buffer.value)
+ info = list_ports_common.ListPortInfo(port_name_buffer.value, skip_link_detection=True)
# in case of USB, make a more readable string, similar to that form
# that we also generate on other platforms
if szHardwareID_str.startswith('USB'):
- m = re.search(r'VID_([0-9a-f]{4})(&PID_([0-9a-f]{4}))?(\\(\w+))?', szHardwareID_str, re.I)
+ m = re.search(r'VID_([0-9a-f]{4})(&PID_([0-9a-f]{4}))?(&MI_(\d{2}))?(\\(\w+))?', szHardwareID_str, re.I)
if m:
info.vid = int(m.group(1), 16)
if m.group(3):
info.pid = int(m.group(3), 16)
if m.group(5):
- info.serial_number = m.group(5)
+ bInterfaceNumber = int(m.group(5))
+ if m.group(7):
+ info.serial_number = m.group(7)
# calculate a location string
loc_path_str = ctypes.create_unicode_buffer(250)
if SetupDiGetDeviceRegistryProperty(
@@ -238,6 +243,10 @@ def iterate_comports():
else:
location.append('-')
location.append(g.group(2))
+ if bInterfaceNumber is not None:
+ location.append(':{}.{}'.format(
+ 'x', # XXX how to determine correct bConfigurationValue?
+ bInterfaceNumber))
if location:
info.location = ''.join(location)
info.hwid = info.usb_info()
@@ -287,7 +296,7 @@ def iterate_comports():
SetupDiDestroyDeviceInfoList(g_hdi)
-def comports():
+def comports(include_links=False):
"""Return a list of info objects about serial ports"""
return list(iterate_comports())
diff --git a/serial/tools/miniterm.py b/serial/tools/miniterm.py
index 14182f0..3b8d5d2 100644
--- a/serial/tools/miniterm.py
+++ b/serial/tools/miniterm.py
@@ -3,10 +3,12 @@
# Very simple serial terminal
#
# This file is part of pySerial. https://github.com/pyserial/pyserial
-# (C)2002-2015 Chris Liechti <cliechti@gmx.net>
+# (C)2002-2017 Chris Liechti <cliechti@gmx.net>
#
# SPDX-License-Identifier: BSD-3-Clause
+from __future__ import absolute_import
+
import codecs
import os
import sys
@@ -275,12 +277,12 @@ class DebugIO(Transform):
"""Print what is sent and received"""
def rx(self, text):
- sys.stderr.write(' [RX:{}] '.format(repr(text)))
+ sys.stderr.write(' [RX:{!r}] '.format(text))
sys.stderr.flush()
return text
def tx(self, text):
- sys.stderr.write(' [TX:{}] '.format(repr(text)))
+ sys.stderr.write(' [TX:{!r}] '.format(text))
sys.stderr.flush()
return text
@@ -315,7 +317,7 @@ def ask_for_port():
sys.stderr.write('\n--- Available ports:\n')
ports = []
for n, (port, desc, hwid) in enumerate(sorted(comports()), 1):
- sys.stderr.write('--- {:2}: {:20} {}\n'.format(n, port, desc))
+ sys.stderr.write('--- {:2}: {:20} {!r}\n'.format(n, port, desc))
ports.append(port)
while True:
port = raw_input('--- Enter port index or full name: ')
@@ -347,8 +349,8 @@ class Miniterm(object):
self.eol = eol
self.filters = filters
self.update_transformations()
- self.exit_character = 0x1d # GS/CTRL+]
- self.menu_character = 0x14 # Menu: CTRL+T
+ self.exit_character = unichr(0x1d) # GS/CTRL+]
+ self.menu_character = unichr(0x14) # Menu: CTRL+T
self.alive = None
self._reader_alive = None
self.receiver_thread = None
@@ -502,25 +504,7 @@ class Miniterm(object):
if self.echo:
self.console.write(c)
elif c == '\x15': # CTRL+U -> upload file
- sys.stderr.write('\n--- File to upload: ')
- sys.stderr.flush()
- with self.console:
- filename = sys.stdin.readline().rstrip('\r\n')
- if filename:
- try:
- with open(filename, 'rb') as f:
- sys.stderr.write('--- Sending file {} ---\n'.format(filename))
- while True:
- block = f.read(1024)
- if not block:
- break
- self.serial.write(block)
- # Wait for output buffer to drain.
- self.serial.flush()
- sys.stderr.write('.') # Progress indicator.
- sys.stderr.write('\n--- File {} sent ---\n'.format(filename))
- except IOError as e:
- sys.stderr.write('--- ERROR opening file {}: {} ---\n'.format(filename, e))
+ self.upload_file()
elif c in '\x08hH?': # CTRL+H, h, H, ? -> Show help
sys.stderr.write(self.get_help_text())
elif c == '\x12': # CTRL+R -> Toggle RTS
@@ -536,24 +520,9 @@ class Miniterm(object):
self.echo = not self.echo
sys.stderr.write('--- local echo {} ---\n'.format('active' if self.echo else 'inactive'))
elif c == '\x06': # CTRL+F -> edit filters
- sys.stderr.write('\n--- Available Filters:\n')
- sys.stderr.write('\n'.join(
- '--- {:<10} = {.__doc__}'.format(k, v)
- for k, v in sorted(TRANSFORMATIONS.items())))
- sys.stderr.write('\n--- Enter new filter name(s) [{}]: '.format(' '.join(self.filters)))
- with self.console:
- new_filters = sys.stdin.readline().lower().split()
- if new_filters:
- for f in new_filters:
- if f not in TRANSFORMATIONS:
- sys.stderr.write('--- unknown filter: {}\n'.format(repr(f)))
- break
- else:
- self.filters = new_filters
- self.update_transformations()
- sys.stderr.write('--- filters: {}\n'.format(' '.join(self.filters)))
+ self.change_filter()
elif c == '\x0c': # CTRL+L -> EOL mode
- modes = list(EOL_TRANSFORMATIONS) # keys
+ modes = list(EOL_TRANSFORMATIONS) # keys
eol = modes.index(self.eol) + 1
if eol >= len(modes):
eol = 0
@@ -561,63 +530,17 @@ class Miniterm(object):
sys.stderr.write('--- EOL: {} ---\n'.format(self.eol.upper()))
self.update_transformations()
elif c == '\x01': # CTRL+A -> set encoding
- sys.stderr.write('\n--- Enter new encoding name [{}]: '.format(self.input_encoding))
- with self.console:
- new_encoding = sys.stdin.readline().strip()
- if new_encoding:
- try:
- codecs.lookup(new_encoding)
- except LookupError:
- sys.stderr.write('--- invalid encoding name: {}\n'.format(new_encoding))
- else:
- self.set_rx_encoding(new_encoding)
- self.set_tx_encoding(new_encoding)
- sys.stderr.write('--- serial input encoding: {}\n'.format(self.input_encoding))
- sys.stderr.write('--- serial output encoding: {}\n'.format(self.output_encoding))
+ self.change_encoding()
elif c == '\x09': # CTRL+I -> info
self.dump_port_settings()
#~ elif c == '\x01': # CTRL+A -> cycle escape mode
#~ elif c == '\x0c': # CTRL+L -> cycle linefeed mode
elif c in 'pP': # P -> change port
- with self.console:
- try:
- port = ask_for_port()
- except KeyboardInterrupt:
- port = None
- if port and port != self.serial.port:
- # reader thread needs to be shut down
- self._stop_reader()
- # save settings
- settings = self.serial.getSettingsDict()
- try:
- new_serial = serial.serial_for_url(port, do_not_open=True)
- # restore settings and open
- new_serial.applySettingsDict(settings)
- new_serial.rts = self.serial.rts
- new_serial.dtr = self.serial.dtr
- new_serial.open()
- new_serial.break_condition = self.serial.break_condition
- except Exception as e:
- sys.stderr.write('--- ERROR opening new port: {} ---\n'.format(e))
- new_serial.close()
- else:
- self.serial.close()
- self.serial = new_serial
- sys.stderr.write('--- Port changed to: {} ---\n'.format(self.serial.port))
- # and restart the reader thread
- self._start_reader()
+ self.change_port()
+ elif c in 'sS': # S -> suspend / open port temporarily
+ self.suspend_port()
elif c in 'bB': # B -> change baudrate
- sys.stderr.write('\n--- Baudrate: ')
- sys.stderr.flush()
- with self.console:
- backup = self.serial.baudrate
- try:
- self.serial.baudrate = int(sys.stdin.readline().strip())
- except ValueError as e:
- sys.stderr.write('--- ERROR setting baudrate: {} ---\n'.format(e))
- self.serial.baudrate = backup
- else:
- self.dump_port_settings()
+ self.change_baudrate()
elif c == '8': # 8 -> change to 8 bits
self.serial.bytesize = serial.EIGHTBITS
self.dump_port_settings()
@@ -657,6 +580,138 @@ class Miniterm(object):
else:
sys.stderr.write('--- unknown menu character {} --\n'.format(key_description(c)))
+ def upload_file(self):
+ """Ask user for filenname and send its contents"""
+ sys.stderr.write('\n--- File to upload: ')
+ sys.stderr.flush()
+ with self.console:
+ filename = sys.stdin.readline().rstrip('\r\n')
+ if filename:
+ try:
+ with open(filename, 'rb') as f:
+ sys.stderr.write('--- Sending file {} ---\n'.format(filename))
+ while True:
+ block = f.read(1024)
+ if not block:
+ break
+ self.serial.write(block)
+ # Wait for output buffer to drain.
+ self.serial.flush()
+ sys.stderr.write('.') # Progress indicator.
+ sys.stderr.write('\n--- File {} sent ---\n'.format(filename))
+ except IOError as e:
+ sys.stderr.write('--- ERROR opening file {}: {} ---\n'.format(filename, e))
+
+ def change_filter(self):
+ """change the i/o transformations"""
+ sys.stderr.write('\n--- Available Filters:\n')
+ sys.stderr.write('\n'.join(
+ '--- {:<10} = {.__doc__}'.format(k, v)
+ for k, v in sorted(TRANSFORMATIONS.items())))
+ sys.stderr.write('\n--- Enter new filter name(s) [{}]: '.format(' '.join(self.filters)))
+ with self.console:
+ new_filters = sys.stdin.readline().lower().split()
+ if new_filters:
+ for f in new_filters:
+ if f not in TRANSFORMATIONS:
+ sys.stderr.write('--- unknown filter: {!r}\n'.format(f))
+ break
+ else:
+ self.filters = new_filters
+ self.update_transformations()
+ sys.stderr.write('--- filters: {}\n'.format(' '.join(self.filters)))
+
+ def change_encoding(self):
+ """change encoding on the serial port"""
+ sys.stderr.write('\n--- Enter new encoding name [{}]: '.format(self.input_encoding))
+ with self.console:
+ new_encoding = sys.stdin.readline().strip()
+ if new_encoding:
+ try:
+ codecs.lookup(new_encoding)
+ except LookupError:
+ sys.stderr.write('--- invalid encoding name: {}\n'.format(new_encoding))
+ else:
+ self.set_rx_encoding(new_encoding)
+ self.set_tx_encoding(new_encoding)
+ sys.stderr.write('--- serial input encoding: {}\n'.format(self.input_encoding))
+ sys.stderr.write('--- serial output encoding: {}\n'.format(self.output_encoding))
+
+ def change_baudrate(self):
+ """change the baudrate"""
+ sys.stderr.write('\n--- Baudrate: ')
+ sys.stderr.flush()
+ with self.console:
+ backup = self.serial.baudrate
+ try:
+ self.serial.baudrate = int(sys.stdin.readline().strip())
+ except ValueError as e:
+ sys.stderr.write('--- ERROR setting baudrate: {} ---\n'.format(e))
+ self.serial.baudrate = backup
+ else:
+ self.dump_port_settings()
+
+ def change_port(self):
+ """Have a conversation with the user to change the serial port"""
+ with self.console:
+ try:
+ port = ask_for_port()
+ except KeyboardInterrupt:
+ port = None
+ if port and port != self.serial.port:
+ # reader thread needs to be shut down
+ self._stop_reader()
+ # save settings
+ settings = self.serial.getSettingsDict()
+ try:
+ new_serial = serial.serial_for_url(port, do_not_open=True)
+ # restore settings and open
+ new_serial.applySettingsDict(settings)
+ new_serial.rts = self.serial.rts
+ new_serial.dtr = self.serial.dtr
+ new_serial.open()
+ new_serial.break_condition = self.serial.break_condition
+ except Exception as e:
+ sys.stderr.write('--- ERROR opening new port: {} ---\n'.format(e))
+ new_serial.close()
+ else:
+ self.serial.close()
+ self.serial = new_serial
+ sys.stderr.write('--- Port changed to: {} ---\n'.format(self.serial.port))
+ # and restart the reader thread
+ self._start_reader()
+
+ def suspend_port(self):
+ """\
+ open port temporarily, allow reconnect, exit and port change to get
+ out of the loop
+ """
+ # reader thread needs to be shut down
+ self._stop_reader()
+ self.serial.close()
+ sys.stderr.write('\n--- Port closed: {} ---\n'.format(self.serial.port))
+ do_change_port = False
+ while not self.serial.is_open:
+ sys.stderr.write('--- Quit: {exit} | p: port change | any other key to reconnect ---\n'.format(
+ exit=key_description(self.exit_character)))
+ k = self.console.getkey()
+ if k == self.exit_character:
+ self.stop() # exit app
+ break
+ elif k in 'pP':
+ do_change_port = True
+ break
+ try:
+ self.serial.open()
+ except Exception as e:
+ sys.stderr.write('--- ERROR opening port: {} ---\n'.format(e))
+ if do_change_port:
+ self.change_port()
+ else:
+ # and restart the reader thread
+ self._start_reader()
+ sys.stderr.write('--- Port opened: {} ---\n'.format(self.serial.port))
+
def get_help_text(self):
"""return the help text"""
# help text, starts with blank line!
@@ -707,123 +762,130 @@ def main(default_port=None, default_baudrate=9600, default_rts=None, default_dtr
import argparse
parser = argparse.ArgumentParser(
- description="Miniterm - A simple terminal program for the serial port.")
+ description='Miniterm - A simple terminal program for the serial port.')
parser.add_argument(
- "port",
+ 'port',
nargs='?',
- help="serial port name ('-' to show port list)",
+ help='serial port name ("-" to show port list)',
default=default_port)
parser.add_argument(
- "baudrate",
+ 'baudrate',
nargs='?',
type=int,
- help="set baud rate, default: %(default)s",
+ help='set baud rate, default: %(default)s',
default=default_baudrate)
- group = parser.add_argument_group("port settings")
+ group = parser.add_argument_group('port settings')
group.add_argument(
- "--parity",
+ '--parity',
choices=['N', 'E', 'O', 'S', 'M'],
type=lambda c: c.upper(),
- help="set parity, one of {N E O S M}, default: N",
+ help='set parity, one of {N E O S M}, default: N',
default='N')
group.add_argument(
- "--rtscts",
- action="store_true",
- help="enable RTS/CTS flow control (default off)",
+ '--rtscts',
+ action='store_true',
+ help='enable RTS/CTS flow control (default off)',
default=False)
group.add_argument(
- "--xonxoff",
- action="store_true",
- help="enable software flow control (default off)",
+ '--xonxoff',
+ action='store_true',
+ help='enable software flow control (default off)',
default=False)
group.add_argument(
- "--rts",
+ '--rts',
type=int,
- help="set initial RTS line state (possible values: 0, 1)",
+ help='set initial RTS line state (possible values: 0, 1)',
default=default_rts)
group.add_argument(
- "--dtr",
+ '--dtr',
type=int,
- help="set initial DTR line state (possible values: 0, 1)",
+ help='set initial DTR line state (possible values: 0, 1)',
default=default_dtr)
group.add_argument(
- "--ask",
- action="store_true",
- help="ask again for port when open fails",
+ '--non-exclusive',
+ dest='exclusive',
+ action='store_false',
+ help='disable locking for native ports',
+ default=True)
+
+ group.add_argument(
+ '--ask',
+ action='store_true',
+ help='ask again for port when open fails',
default=False)
- group = parser.add_argument_group("data handling")
+ group = parser.add_argument_group('data handling')
group.add_argument(
- "-e", "--echo",
- action="store_true",
- help="enable local echo (default off)",
+ '-e', '--echo',
+ action='store_true',
+ help='enable local echo (default off)',
default=False)
group.add_argument(
- "--encoding",
- dest="serial_port_encoding",
- metavar="CODEC",
- help="set the encoding for the serial port (e.g. hexlify, Latin1, UTF-8), default: %(default)s",
+ '--encoding',
+ dest='serial_port_encoding',
+ metavar='CODEC',
+ help='set the encoding for the serial port (e.g. hexlify, Latin1, UTF-8), default: %(default)s',
default='UTF-8')
group.add_argument(
- "-f", "--filter",
- action="append",
- metavar="NAME",
- help="add text transformation",
+ '-f', '--filter',
+ action='append',
+ metavar='NAME',
+ help='add text transformation',
default=[])
group.add_argument(
- "--eol",
+ '--eol',
choices=['CR', 'LF', 'CRLF'],
type=lambda c: c.upper(),
- help="end of line mode",
+ help='end of line mode',
default='CRLF')
group.add_argument(
- "--raw",
- action="store_true",
- help="Do no apply any encodings/transformations",
+ '--raw',
+ action='store_true',
+ help='Do no apply any encodings/transformations',
default=False)
- group = parser.add_argument_group("hotkeys")
+ group = parser.add_argument_group('hotkeys')
group.add_argument(
- "--exit-char",
+ '--exit-char',
type=int,
metavar='NUM',
- help="Unicode of special character that is used to exit the application, default: %(default)s",
+ help='Unicode of special character that is used to exit the application, default: %(default)s',
default=0x1d) # GS/CTRL+]
group.add_argument(
- "--menu-char",
+ '--menu-char',
type=int,
metavar='NUM',
- help="Unicode code of special character that is used to control miniterm (menu), default: %(default)s",
+ help='Unicode code of special character that is used to control miniterm (menu), default: %(default)s',
default=0x14) # Menu: CTRL+T
- group = parser.add_argument_group("diagnostics")
+ group = parser.add_argument_group('diagnostics')
group.add_argument(
- "-q", "--quiet",
- action="store_true",
- help="suppress non-error messages",
+ '-q', '--quiet',
+ action='store_true',
+ help='suppress non-error messages',
default=False)
group.add_argument(
- "--develop",
- action="store_true",
- help="show Python traceback on error",
+ '--develop',
+ action='store_true',
+ help='show Python traceback on error',
default=False)
args = parser.parse_args()
@@ -876,9 +938,12 @@ def main(default_port=None, default_baudrate=9600, default_rts=None, default_dtr
sys.stderr.write('--- forcing RTS {}\n'.format('active' if args.rts else 'inactive'))
serial_instance.rts = args.rts
+ if isinstance(serial_instance, serial.Serial):
+ serial_instance.exclusive = args.exclusive
+
serial_instance.open()
except serial.SerialException as e:
- sys.stderr.write('could not open port {}: {}\n'.format(repr(args.port), e))
+ sys.stderr.write('could not open port {!r}: {}\n'.format(args.port, e))
if args.develop:
raise
if not args.ask:
@@ -914,7 +979,7 @@ def main(default_port=None, default_baudrate=9600, default_rts=None, default_dtr
except KeyboardInterrupt:
pass
if not args.quiet:
- sys.stderr.write("\n--- exit ---\n")
+ sys.stderr.write('\n--- exit ---\n')
miniterm.join()
miniterm.close()