summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Liechti <cliechti@gmx.net>2015-09-22 23:13:44 +0200
committerChris Liechti <cliechti@gmx.net>2015-09-22 23:13:44 +0200
commitf565693c462d9987456fe22b44cce1a7b5036711 (patch)
treef47db4bdf720acbc1fe3c55eb1a193f374873124
parent087bee33c811a93bfe24c731ee599ed9d28c1956 (diff)
downloadpyserial-git-f565693c462d9987456fe22b44cce1a7b5036711.tar.gz
fix EOL issues
-rwxr-xr-xexamples/port_publisher.py1140
-rw-r--r--serial/tools/list_ports_linux.py312
-rw-r--r--serial/tools/list_ports_posix.py212
-rw-r--r--serial/tools/list_ports_windows.py688
4 files changed, 1176 insertions, 1176 deletions
diff --git a/examples/port_publisher.py b/examples/port_publisher.py
index d426da0..074a529 100755
--- a/examples/port_publisher.py
+++ b/examples/port_publisher.py
@@ -1,570 +1,570 @@
-#! /usr/bin/env python
-#
-# (C) 2001-2015 Chris Liechti <cliechti@gmx.net>
-#
-# SPDX-License-Identifier: BSD-3-Clause
-"""\
-Multi-port serial<->TCP/IP forwarder.
-- RFC 2217
-- check existence of serial port periodically
-- start/stop forwarders
-- each forwarder creates a server socket and opens the serial port
-- serial ports are opened only once. network connect/disconnect
- does not influence serial port
-- only one client per connection
-"""
-import os
-import select
-import socket
-import sys
-import time
-import traceback
-
-import serial
-import serial.rfc2217
-import serial.tools.list_ports
-
-import dbus
-
-# Try to import the avahi service definitions properly. If the avahi module is
-# not available, fall back to a hard-coded solution that hopefully still works.
-try:
- import avahi
-except ImportError:
- class avahi:
- DBUS_NAME = "org.freedesktop.Avahi"
- DBUS_PATH_SERVER = "/"
- DBUS_INTERFACE_SERVER = "org.freedesktop.Avahi.Server"
- DBUS_INTERFACE_ENTRY_GROUP = DBUS_NAME + ".EntryGroup"
- IF_UNSPEC = -1
- PROTO_UNSPEC, PROTO_INET, PROTO_INET6 = -1, 0, 1
-
-
-class ZeroconfService:
- """\
- A simple class to publish a network service with zeroconf using avahi.
- """
-
- def __init__(self, name, port, stype="_http._tcp",
- domain="", host="", text=""):
- self.name = name
- self.stype = stype
- self.domain = domain
- self.host = host
- self.port = port
- self.text = text
- self.group = None
-
- def publish(self):
- bus = dbus.SystemBus()
- server = dbus.Interface(
- bus.get_object(
- avahi.DBUS_NAME,
- avahi.DBUS_PATH_SERVER
- ),
- avahi.DBUS_INTERFACE_SERVER
- )
-
- g = dbus.Interface(
- bus.get_object(
- avahi.DBUS_NAME,
- server.EntryGroupNew()
- ),
- avahi.DBUS_INTERFACE_ENTRY_GROUP
- )
-
- g.AddService(avahi.IF_UNSPEC, avahi.PROTO_UNSPEC, dbus.UInt32(0),
- self.name, self.stype, self.domain, self.host,
- dbus.UInt16(self.port), self.text)
-
- g.Commit()
- self.group = g
-
- def unpublish(self):
- if self.group is not None:
- self.group.Reset()
- self.group = None
-
- def __str__(self):
- return "%r @ %s:%s (%s)" % (self.name, self.host, self.port, self.stype)
-
-
-class Forwarder(ZeroconfService):
- """\
- Single port serial<->TCP/IP forarder that depends on an external select
- loop.
- - Buffers for serial -> network and network -> serial
- - RFC 2217 state
- - Zeroconf publish/unpublish on open/close.
- """
-
- def __init__(self, device, name, network_port, on_close=None, log=None):
- ZeroconfService.__init__(self, name, network_port, stype='_serial_port._tcp')
- self.alive = False
- self.network_port = network_port
- self.on_close = on_close
- self.log = log
- self.device = device
- self.serial = serial.Serial()
- self.serial.port = device
- self.serial.baudrate = 115200
- self.serial.timeout = 0
- self.socket = None
- self.server_socket = None
- self.rfc2217 = None # instantiate later, when connecting
-
- def __del__(self):
- try:
- if self.alive:
- self.close()
- except:
- pass # XXX errors on shutdown
-
- def open(self):
- """open serial port, start network server and publish service"""
- self.buffer_net2ser = bytearray()
- self.buffer_ser2net = bytearray()
-
- # open serial port
- try:
- self.serial.rts = False
- self.serial.open()
- except Exception as msg:
- self.handle_serial_error(msg)
-
- self.serial_settings_backup = self.serial.get_settings()
-
- # start the socket server
- # XXX add IPv6 support: use getaddrinfo for socket options, bind to multiple sockets?
- # info_list = socket.getaddrinfo(None, port, 0, socket.SOCK_STREAM, 0, socket.AI_PASSIVE)
- self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.server_socket.setsockopt(
- socket.SOL_SOCKET,
- socket.SO_REUSEADDR,
- self.server_socket.getsockopt(
- socket.SOL_SOCKET,
- socket.SO_REUSEADDR
- ) | 1
- )
- self.server_socket.setblocking(0)
- try:
- self.server_socket.bind(('', self.network_port))
- self.server_socket.listen(1)
- except socket.error as msg:
- self.handle_server_error()
- #~ raise
- if self.log is not None:
- self.log.info("%s: Waiting for connection on %s..." % (self.device, self.network_port))
-
- # zeroconfig
- self.publish()
-
- # now we are ready
- self.alive = True
-
- def close(self):
- """Close all resources and unpublish service"""
- if self.log is not None:
- self.log.info("%s: closing..." % (self.device, ))
- self.alive = False
- self.unpublish()
- if self.server_socket:
- self.server_socket.close()
- if self.socket:
- self.handle_disconnect()
- self.serial.close()
- if self.on_close is not None:
- # ensure it is only called once
- callback = self.on_close
- self.on_close = None
- callback(self)
-
- def write(self, data):
- """the write method is used by serial.rfc2217.PortManager. it has to
- write to the network."""
- self.buffer_ser2net += data
-
- def update_select_maps(self, read_map, write_map, error_map):
- """Update dictionaries for select call. insert fd->callback mapping"""
- if self.alive:
- # always handle serial port reads
- read_map[self.serial] = self.handle_serial_read
- error_map[self.serial] = self.handle_serial_error
- # handle serial port writes if buffer is not empty
- if self.buffer_net2ser:
- write_map[self.serial] = self.handle_serial_write
- # handle network
- if self.socket is not None:
- # handle socket if connected
- # only read from network if the internal buffer is not
- # already filled. the TCP flow control will hold back data
- if len(self.buffer_net2ser) < 2048:
- read_map[self.socket] = self.handle_socket_read
- # only check for write readiness when there is data
- if self.buffer_ser2net:
- write_map[self.socket] = self.handle_socket_write
- error_map[self.socket] = self.handle_socket_error
- else:
- # no connection, ensure clear buffer
- self.buffer_ser2net = bytearray()
- # check the server socket
- read_map[self.server_socket] = self.handle_connect
- error_map[self.server_socket] = self.handle_server_error
-
- def handle_serial_read(self):
- """Reading from serial port"""
- try:
- data = os.read(self.serial.fileno(), 1024)
- if data:
- # store data in buffer if there is a client connected
- if self.socket is not None:
- # escape outgoing data when needed (Telnet IAC (0xff) character)
- if self.rfc2217:
- data = serial.to_bytes(self.rfc2217.escape(data))
- self.buffer_ser2net += data
- else:
- self.handle_serial_error()
- except Exception as msg:
- self.handle_serial_error(msg)
-
- def handle_serial_write(self):
- """Writing to serial port"""
- try:
- # write a chunk
- n = os.write(self.serial.fileno(), bytes(self.buffer_net2ser))
- # and see how large that chunk was, remove that from buffer
- self.buffer_net2ser = self.buffer_net2ser[n:]
- except Exception as msg:
- self.handle_serial_error(msg)
-
- def handle_serial_error(self, error=None):
- """Serial port error"""
- # terminate connection
- self.close()
-
- def handle_socket_read(self):
- """Read from socket"""
- try:
- # read a chunk from the serial port
- data = self.socket.recv(1024)
- if data:
- # Process RFC 2217 stuff when enabled
- if self.rfc2217:
- data = serial.to_bytes(self.rfc2217.filter(data))
- # add data to buffer
- self.buffer_net2ser += data
- else:
- # empty read indicates disconnection
- self.handle_disconnect()
- except socket.error:
- self.handle_socket_error()
-
- def handle_socket_write(self):
- """Write to socket"""
- try:
- # write a chunk
- count = self.socket.send(bytes(self.buffer_ser2net))
- # and remove the sent data from the buffer
- self.buffer_ser2net = self.buffer_ser2net[count:]
- except socket.error:
- self.handle_socket_error()
-
- def handle_socket_error(self):
- """Socket connection fails"""
- self.handle_disconnect()
-
- def handle_connect(self):
- """Server socket gets a connection"""
- # accept a connection in any case, close connection
- # below if already busy
- connection, addr = self.server_socket.accept()
- if self.socket is None:
- self.socket = connection
- # More quickly detect bad clients who quit without closing the
- # connection: After 1 second of idle, start sending TCP keep-alive
- # packets every 1 second. If 3 consecutive keep-alive packets
- # fail, assume the client is gone and close the connection.
- self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
- self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 1)
- self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 1)
- self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 3)
- self.socket.setblocking(0)
- self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
- if self.log is not None:
- self.log.warning('%s: Connected by %s:%s' % (self.device, addr[0], addr[1]))
- self.serial.rts = True
- self.serial.dtr = True
- if self.log is not None:
- self.rfc2217 = serial.rfc2217.PortManager(self.serial, self, logger=log.getChild(self.device))
- else:
- self.rfc2217 = serial.rfc2217.PortManager(self.serial, self)
- else:
- # reject connection if there is already one
- connection.close()
- if self.log is not None:
- self.log.warning('%s: Rejecting connect from %s:%s' % (self.device, addr[0], addr[1]))
-
- def handle_server_error(self):
- """Socket server fails"""
- self.close()
-
- def handle_disconnect(self):
- """Socket gets disconnected"""
- # signal disconnected terminal with control lines
- try:
- self.serial.rts = False
- self.serial.dtr = False
- finally:
- # restore original port configuration in case it was changed
- self.serial.apply_settings(self.serial_settings_backup)
- # stop RFC 2217 state machine
- self.rfc2217 = None
- # clear send buffer
- self.buffer_ser2net = bytearray()
- # close network connection
- if self.socket is not None:
- self.socket.close()
- self.socket = None
- if self.log is not None:
- self.log.warning('%s: Disconnected' % self.device)
-
-
-def test():
- service = ZeroconfService(name="TestService", port=3000)
- service.publish()
- raw_input("Press any key to unpublish the service ")
- service.unpublish()
-
-
-if __name__ == '__main__':
- import logging
- import argparse
-
- VERBOSTIY = [
- logging.ERROR, # 0
- logging.WARNING, # 1 (default)
- logging.INFO, # 2
- logging.DEBUG, # 3
- ]
-
- parser = argparse.ArgumentParser(usage="""\
-%(prog)s [options]
-
-Announce the existence of devices using zeroconf and provide
-a TCP/IP <-> serial port gateway (implements RFC 2217).
-
-If running as daemon, write to syslog. Otherwise write to stdout.
-""",
- epilog="""\
-NOTE: no security measures are implemented. Anyone can remotely connect
-to this service over the network.
-
-Only one connection at once, per port, is supported. When the connection is
-terminated, it waits for the next connect.
-""")
-
- group = parser.add_argument_group("serial port settings")
-
- group.add_argument(
- "--ports-regex",
- help="specify a regex to search against the serial devices and their descriptions (default: %(default)s)",
- default='/dev/ttyUSB[0-9]+',
- metavar="REGEX")
-
- group = parser.add_argument_group("network settings")
-
- group.add_argument(
- "--tcp-port",
- dest="base_port",
- help="specify lowest TCP port number (default: %(default)s)",
- default=7000,
- type=int,
- metavar="PORT")
-
- group = parser.add_argument_group("daemon")
-
- group.add_argument(
- "-d", "--daemon",
- dest="daemonize",
- action="store_true",
- help="start as daemon",
- default=False)
-
- group.add_argument(
- "--pidfile",
- help="specify a name for the PID file",
- default=None,
- metavar="FILE")
-
- group = parser.add_argument_group("diagnostics")
-
- group.add_argument(
- "-o", "--logfile",
- help="write messages file instead of stdout",
- default=None,
- metavar="FILE")
-
- group.add_argument(
- "-q", "--quiet",
- dest="verbosity",
- action="store_const",
- const=0,
- help="suppress most diagnostic messages",
- default=1)
-
- group.add_argument(
- "-v", "--verbose",
- dest="verbosity",
- action="count",
- help="increase diagnostic messages")
-
-
- args = parser.parse_args()
-
- # set up logging
- logging.basicConfig(level=VERBOSTIY[min(args.verbosity, len(VERBOSTIY) - 1)])
- log = logging.getLogger('port_publisher')
-
- # redirect output if specified
- if args.logfile is not None:
- class WriteFlushed:
- def __init__(self, fileobj):
- self.fileobj = fileobj
- def write(self, s):
- self.fileobj.write(s)
- self.fileobj.flush()
- def close(self):
- self.fileobj.close()
- sys.stdout = sys.stderr = WriteFlushed(open(args.logfile, 'a'))
- # atexit.register(lambda: sys.stdout.close())
-
- if args.daemonize:
- # if running as daemon is requested, do the fork magic
- # args.quiet = True
- # do the UNIX double-fork magic, see Stevens' "Advanced
- # Programming in the UNIX Environment" for details (ISBN 0201563177)
- try:
- pid = os.fork()
- if pid > 0:
- # exit first parent
- sys.exit(0)
- except OSError as e:
- log.critical("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror))
- sys.exit(1)
-
- # decouple from parent environment
- os.chdir("/") # don't prevent unmounting....
- os.setsid()
- os.umask(0)
-
- # do second fork
- try:
- pid = os.fork()
- if pid > 0:
- # exit from second parent, save eventual PID before
- if args.pidfile is not None:
- open(args.pidfile, 'w').write("%d" % pid)
- sys.exit(0)
- except OSError as e:
- log.critical("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror))
- sys.exit(1)
-
- if args.logfile is None:
- import syslog
- syslog.openlog("serial port publisher")
- # redirect output to syslog
- class WriteToSysLog:
- def __init__(self):
- self.buffer = ''
- def write(self, s):
- self.buffer += s
- if '\n' in self.buffer:
- output, self.buffer = self.buffer.split('\n', 1)
- syslog.syslog(output)
- def flush(self):
- syslog.syslog(self.buffer)
- self.buffer = ''
- def close(self):
- self.flush()
- sys.stdout = sys.stderr = WriteToSysLog()
-
- # ensure the that the daemon runs a normal user, if run as root
- #if os.getuid() == 0:
- # name, passwd, uid, gid, desc, home, shell = pwd.getpwnam('someuser')
- # os.setgid(gid) # set group first
- # os.setuid(uid) # set user
-
- # keep the published stuff in a dictionary
- published = {}
- # get a nice hostname
- hostname = socket.gethostname()
-
- def unpublish(forwarder):
- """when forwarders die, we need to unregister them"""
- try:
- del published[forwarder.device]
- except KeyError:
- pass
- else:
- log.info("unpublish: %s" % (forwarder))
-
- alive = True
- next_check = 0
- # main loop
- while alive:
- try:
- # if it is time, check for serial port devices
- now = time.time()
- if now > next_check:
- next_check = now + 5
- connected = [d for d, p, i in serial.tools.list_ports.grep(args.ports_regex)]
- # Handle devices that are published, but no longer connected
- for device in set(published).difference(connected):
- log.info("unpublish: %s" % (published[device]))
- unpublish(published[device])
- # Handle devices that are connected but not yet published
- for device in set(connected).difference(published):
- # Find the first available port, starting from 7000
- port = args.base_port
- ports_in_use = [f.network_port for f in published.values()]
- while port in ports_in_use:
- port += 1
- published[device] = Forwarder(
- device,
- "%s on %s" % (device, hostname),
- port,
- on_close=unpublish,
- log=log
- )
- log.warning("publish: %s" % (published[device]))
- published[device].open()
-
- # select_start = time.time()
- read_map = {}
- write_map = {}
- error_map = {}
- for publisher in published.values():
- publisher.update_select_maps(read_map, write_map, error_map)
- readers, writers, errors = select.select(
- read_map.keys(),
- write_map.keys(),
- error_map.keys(),
- 5
- )
- # select_end = time.time()
- # print "select used %.3f s" % (select_end - select_start)
- for reader in readers:
- read_map[reader]()
- for writer in writers:
- write_map[writer]()
- for error in errors:
- error_map[error]()
- # print "operation used %.3f s" % (time.time() - select_end)
- except KeyboardInterrupt:
- alive = False
- sys.stdout.write('\n')
- except SystemExit:
- raise
- except:
- #~ raise
- traceback.print_exc()
+#! /usr/bin/env python
+#
+# (C) 2001-2015 Chris Liechti <cliechti@gmx.net>
+#
+# SPDX-License-Identifier: BSD-3-Clause
+"""\
+Multi-port serial<->TCP/IP forwarder.
+- RFC 2217
+- check existence of serial port periodically
+- start/stop forwarders
+- each forwarder creates a server socket and opens the serial port
+- serial ports are opened only once. network connect/disconnect
+ does not influence serial port
+- only one client per connection
+"""
+import os
+import select
+import socket
+import sys
+import time
+import traceback
+
+import serial
+import serial.rfc2217
+import serial.tools.list_ports
+
+import dbus
+
+# Try to import the avahi service definitions properly. If the avahi module is
+# not available, fall back to a hard-coded solution that hopefully still works.
+try:
+ import avahi
+except ImportError:
+ class avahi:
+ DBUS_NAME = "org.freedesktop.Avahi"
+ DBUS_PATH_SERVER = "/"
+ DBUS_INTERFACE_SERVER = "org.freedesktop.Avahi.Server"
+ DBUS_INTERFACE_ENTRY_GROUP = DBUS_NAME + ".EntryGroup"
+ IF_UNSPEC = -1
+ PROTO_UNSPEC, PROTO_INET, PROTO_INET6 = -1, 0, 1
+
+
+class ZeroconfService:
+ """\
+ A simple class to publish a network service with zeroconf using avahi.
+ """
+
+ def __init__(self, name, port, stype="_http._tcp",
+ domain="", host="", text=""):
+ self.name = name
+ self.stype = stype
+ self.domain = domain
+ self.host = host
+ self.port = port
+ self.text = text
+ self.group = None
+
+ def publish(self):
+ bus = dbus.SystemBus()
+ server = dbus.Interface(
+ bus.get_object(
+ avahi.DBUS_NAME,
+ avahi.DBUS_PATH_SERVER
+ ),
+ avahi.DBUS_INTERFACE_SERVER
+ )
+
+ g = dbus.Interface(
+ bus.get_object(
+ avahi.DBUS_NAME,
+ server.EntryGroupNew()
+ ),
+ avahi.DBUS_INTERFACE_ENTRY_GROUP
+ )
+
+ g.AddService(avahi.IF_UNSPEC, avahi.PROTO_UNSPEC, dbus.UInt32(0),
+ self.name, self.stype, self.domain, self.host,
+ dbus.UInt16(self.port), self.text)
+
+ g.Commit()
+ self.group = g
+
+ def unpublish(self):
+ if self.group is not None:
+ self.group.Reset()
+ self.group = None
+
+ def __str__(self):
+ return "%r @ %s:%s (%s)" % (self.name, self.host, self.port, self.stype)
+
+
+class Forwarder(ZeroconfService):
+ """\
+ Single port serial<->TCP/IP forarder that depends on an external select
+ loop.
+ - Buffers for serial -> network and network -> serial
+ - RFC 2217 state
+ - Zeroconf publish/unpublish on open/close.
+ """
+
+ def __init__(self, device, name, network_port, on_close=None, log=None):
+ ZeroconfService.__init__(self, name, network_port, stype='_serial_port._tcp')
+ self.alive = False
+ self.network_port = network_port
+ self.on_close = on_close
+ self.log = log
+ self.device = device
+ self.serial = serial.Serial()
+ self.serial.port = device
+ self.serial.baudrate = 115200
+ self.serial.timeout = 0
+ self.socket = None
+ self.server_socket = None
+ self.rfc2217 = None # instantiate later, when connecting
+
+ def __del__(self):
+ try:
+ if self.alive:
+ self.close()
+ except:
+ pass # XXX errors on shutdown
+
+ def open(self):
+ """open serial port, start network server and publish service"""
+ self.buffer_net2ser = bytearray()
+ self.buffer_ser2net = bytearray()
+
+ # open serial port
+ try:
+ self.serial.rts = False
+ self.serial.open()
+ except Exception as msg:
+ self.handle_serial_error(msg)
+
+ self.serial_settings_backup = self.serial.get_settings()
+
+ # start the socket server
+ # XXX add IPv6 support: use getaddrinfo for socket options, bind to multiple sockets?
+ # info_list = socket.getaddrinfo(None, port, 0, socket.SOCK_STREAM, 0, socket.AI_PASSIVE)
+ self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.server_socket.setsockopt(
+ socket.SOL_SOCKET,
+ socket.SO_REUSEADDR,
+ self.server_socket.getsockopt(
+ socket.SOL_SOCKET,
+ socket.SO_REUSEADDR
+ ) | 1
+ )
+ self.server_socket.setblocking(0)
+ try:
+ self.server_socket.bind(('', self.network_port))
+ self.server_socket.listen(1)
+ except socket.error as msg:
+ self.handle_server_error()
+ #~ raise
+ if self.log is not None:
+ self.log.info("%s: Waiting for connection on %s..." % (self.device, self.network_port))
+
+ # zeroconfig
+ self.publish()
+
+ # now we are ready
+ self.alive = True
+
+ def close(self):
+ """Close all resources and unpublish service"""
+ if self.log is not None:
+ self.log.info("%s: closing..." % (self.device, ))
+ self.alive = False
+ self.unpublish()
+ if self.server_socket:
+ self.server_socket.close()
+ if self.socket:
+ self.handle_disconnect()
+ self.serial.close()
+ if self.on_close is not None:
+ # ensure it is only called once
+ callback = self.on_close
+ self.on_close = None
+ callback(self)
+
+ def write(self, data):
+ """the write method is used by serial.rfc2217.PortManager. it has to
+ write to the network."""
+ self.buffer_ser2net += data
+
+ def update_select_maps(self, read_map, write_map, error_map):
+ """Update dictionaries for select call. insert fd->callback mapping"""
+ if self.alive:
+ # always handle serial port reads
+ read_map[self.serial] = self.handle_serial_read
+ error_map[self.serial] = self.handle_serial_error
+ # handle serial port writes if buffer is not empty
+ if self.buffer_net2ser:
+ write_map[self.serial] = self.handle_serial_write
+ # handle network
+ if self.socket is not None:
+ # handle socket if connected
+ # only read from network if the internal buffer is not
+ # already filled. the TCP flow control will hold back data
+ if len(self.buffer_net2ser) < 2048:
+ read_map[self.socket] = self.handle_socket_read
+ # only check for write readiness when there is data
+ if self.buffer_ser2net:
+ write_map[self.socket] = self.handle_socket_write
+ error_map[self.socket] = self.handle_socket_error
+ else:
+ # no connection, ensure clear buffer
+ self.buffer_ser2net = bytearray()
+ # check the server socket
+ read_map[self.server_socket] = self.handle_connect
+ error_map[self.server_socket] = self.handle_server_error
+
+ def handle_serial_read(self):
+ """Reading from serial port"""
+ try:
+ data = os.read(self.serial.fileno(), 1024)
+ if data:
+ # store data in buffer if there is a client connected
+ if self.socket is not None:
+ # escape outgoing data when needed (Telnet IAC (0xff) character)
+ if self.rfc2217:
+ data = serial.to_bytes(self.rfc2217.escape(data))
+ self.buffer_ser2net += data
+ else:
+ self.handle_serial_error()
+ except Exception as msg:
+ self.handle_serial_error(msg)
+
+ def handle_serial_write(self):
+ """Writing to serial port"""
+ try:
+ # write a chunk
+ n = os.write(self.serial.fileno(), bytes(self.buffer_net2ser))
+ # and see how large that chunk was, remove that from buffer
+ self.buffer_net2ser = self.buffer_net2ser[n:]
+ except Exception as msg:
+ self.handle_serial_error(msg)
+
+ def handle_serial_error(self, error=None):
+ """Serial port error"""
+ # terminate connection
+ self.close()
+
+ def handle_socket_read(self):
+ """Read from socket"""
+ try:
+ # read a chunk from the serial port
+ data = self.socket.recv(1024)
+ if data:
+ # Process RFC 2217 stuff when enabled
+ if self.rfc2217:
+ data = serial.to_bytes(self.rfc2217.filter(data))
+ # add data to buffer
+ self.buffer_net2ser += data
+ else:
+ # empty read indicates disconnection
+ self.handle_disconnect()
+ except socket.error:
+ self.handle_socket_error()
+
+ def handle_socket_write(self):
+ """Write to socket"""
+ try:
+ # write a chunk
+ count = self.socket.send(bytes(self.buffer_ser2net))
+ # and remove the sent data from the buffer
+ self.buffer_ser2net = self.buffer_ser2net[count:]
+ except socket.error:
+ self.handle_socket_error()
+
+ def handle_socket_error(self):
+ """Socket connection fails"""
+ self.handle_disconnect()
+
+ def handle_connect(self):
+ """Server socket gets a connection"""
+ # accept a connection in any case, close connection
+ # below if already busy
+ connection, addr = self.server_socket.accept()
+ if self.socket is None:
+ self.socket = connection
+ # More quickly detect bad clients who quit without closing the
+ # connection: After 1 second of idle, start sending TCP keep-alive
+ # packets every 1 second. If 3 consecutive keep-alive packets
+ # fail, assume the client is gone and close the connection.
+ self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
+ self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 1)
+ self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 1)
+ self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 3)
+ self.socket.setblocking(0)
+ self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+ if self.log is not None:
+ self.log.warning('%s: Connected by %s:%s' % (self.device, addr[0], addr[1]))
+ self.serial.rts = True
+ self.serial.dtr = True
+ if self.log is not None:
+ self.rfc2217 = serial.rfc2217.PortManager(self.serial, self, logger=log.getChild(self.device))
+ else:
+ self.rfc2217 = serial.rfc2217.PortManager(self.serial, self)
+ else:
+ # reject connection if there is already one
+ connection.close()
+ if self.log is not None:
+ self.log.warning('%s: Rejecting connect from %s:%s' % (self.device, addr[0], addr[1]))
+
+ def handle_server_error(self):
+ """Socket server fails"""
+ self.close()
+
+ def handle_disconnect(self):
+ """Socket gets disconnected"""
+ # signal disconnected terminal with control lines
+ try:
+ self.serial.rts = False
+ self.serial.dtr = False
+ finally:
+ # restore original port configuration in case it was changed
+ self.serial.apply_settings(self.serial_settings_backup)
+ # stop RFC 2217 state machine
+ self.rfc2217 = None
+ # clear send buffer
+ self.buffer_ser2net = bytearray()
+ # close network connection
+ if self.socket is not None:
+ self.socket.close()
+ self.socket = None
+ if self.log is not None:
+ self.log.warning('%s: Disconnected' % self.device)
+
+
+def test():
+ service = ZeroconfService(name="TestService", port=3000)
+ service.publish()
+ raw_input("Press any key to unpublish the service ")
+ service.unpublish()
+
+
+if __name__ == '__main__':
+ import logging
+ import argparse
+
+ VERBOSTIY = [
+ logging.ERROR, # 0
+ logging.WARNING, # 1 (default)
+ logging.INFO, # 2
+ logging.DEBUG, # 3
+ ]
+
+ parser = argparse.ArgumentParser(usage="""\
+%(prog)s [options]
+
+Announce the existence of devices using zeroconf and provide
+a TCP/IP <-> serial port gateway (implements RFC 2217).
+
+If running as daemon, write to syslog. Otherwise write to stdout.
+""",
+ epilog="""\
+NOTE: no security measures are implemented. Anyone can remotely connect
+to this service over the network.
+
+Only one connection at once, per port, is supported. When the connection is
+terminated, it waits for the next connect.
+""")
+
+ group = parser.add_argument_group("serial port settings")
+
+ group.add_argument(
+ "--ports-regex",
+ help="specify a regex to search against the serial devices and their descriptions (default: %(default)s)",
+ default='/dev/ttyUSB[0-9]+',
+ metavar="REGEX")
+
+ group = parser.add_argument_group("network settings")
+
+ group.add_argument(
+ "--tcp-port",
+ dest="base_port",
+ help="specify lowest TCP port number (default: %(default)s)",
+ default=7000,
+ type=int,
+ metavar="PORT")
+
+ group = parser.add_argument_group("daemon")
+
+ group.add_argument(
+ "-d", "--daemon",
+ dest="daemonize",
+ action="store_true",
+ help="start as daemon",
+ default=False)
+
+ group.add_argument(
+ "--pidfile",
+ help="specify a name for the PID file",
+ default=None,
+ metavar="FILE")
+
+ group = parser.add_argument_group("diagnostics")
+
+ group.add_argument(
+ "-o", "--logfile",
+ help="write messages file instead of stdout",
+ default=None,
+ metavar="FILE")
+
+ group.add_argument(
+ "-q", "--quiet",
+ dest="verbosity",
+ action="store_const",
+ const=0,
+ help="suppress most diagnostic messages",
+ default=1)
+
+ group.add_argument(
+ "-v", "--verbose",
+ dest="verbosity",
+ action="count",
+ help="increase diagnostic messages")
+
+
+ args = parser.parse_args()
+
+ # set up logging
+ logging.basicConfig(level=VERBOSTIY[min(args.verbosity, len(VERBOSTIY) - 1)])
+ log = logging.getLogger('port_publisher')
+
+ # redirect output if specified
+ if args.logfile is not None:
+ class WriteFlushed:
+ def __init__(self, fileobj):
+ self.fileobj = fileobj
+ def write(self, s):
+ self.fileobj.write(s)
+ self.fileobj.flush()
+ def close(self):
+ self.fileobj.close()
+ sys.stdout = sys.stderr = WriteFlushed(open(args.logfile, 'a'))
+ # atexit.register(lambda: sys.stdout.close())
+
+ if args.daemonize:
+ # if running as daemon is requested, do the fork magic
+ # args.quiet = True
+ # do the UNIX double-fork magic, see Stevens' "Advanced
+ # Programming in the UNIX Environment" for details (ISBN 0201563177)
+ try:
+ pid = os.fork()
+ if pid > 0:
+ # exit first parent
+ sys.exit(0)
+ except OSError as e:
+ log.critical("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror))
+ sys.exit(1)
+
+ # decouple from parent environment
+ os.chdir("/") # don't prevent unmounting....
+ os.setsid()
+ os.umask(0)
+
+ # do second fork
+ try:
+ pid = os.fork()
+ if pid > 0:
+ # exit from second parent, save eventual PID before
+ if args.pidfile is not None:
+ open(args.pidfile, 'w').write("%d" % pid)
+ sys.exit(0)
+ except OSError as e:
+ log.critical("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror))
+ sys.exit(1)
+
+ if args.logfile is None:
+ import syslog
+ syslog.openlog("serial port publisher")
+ # redirect output to syslog
+ class WriteToSysLog:
+ def __init__(self):
+ self.buffer = ''
+ def write(self, s):
+ self.buffer += s
+ if '\n' in self.buffer:
+ output, self.buffer = self.buffer.split('\n', 1)
+ syslog.syslog(output)
+ def flush(self):
+ syslog.syslog(self.buffer)
+ self.buffer = ''
+ def close(self):
+ self.flush()
+ sys.stdout = sys.stderr = WriteToSysLog()
+
+ # ensure the that the daemon runs a normal user, if run as root
+ #if os.getuid() == 0:
+ # name, passwd, uid, gid, desc, home, shell = pwd.getpwnam('someuser')
+ # os.setgid(gid) # set group first
+ # os.setuid(uid) # set user
+
+ # keep the published stuff in a dictionary
+ published = {}
+ # get a nice hostname
+ hostname = socket.gethostname()
+
+ def unpublish(forwarder):
+ """when forwarders die, we need to unregister them"""
+ try:
+ del published[forwarder.device]
+ except KeyError:
+ pass
+ else:
+ log.info("unpublish: %s" % (forwarder))
+
+ alive = True
+ next_check = 0
+ # main loop
+ while alive:
+ try:
+ # if it is time, check for serial port devices
+ now = time.time()
+ if now > next_check:
+ next_check = now + 5
+ connected = [d for d, p, i in serial.tools.list_ports.grep(args.ports_regex)]
+ # Handle devices that are published, but no longer connected
+ for device in set(published).difference(connected):
+ log.info("unpublish: %s" % (published[device]))
+ unpublish(published[device])
+ # Handle devices that are connected but not yet published
+ for device in set(connected).difference(published):
+ # Find the first available port, starting from 7000
+ port = args.base_port
+ ports_in_use = [f.network_port for f in published.values()]
+ while port in ports_in_use:
+ port += 1
+ published[device] = Forwarder(
+ device,
+ "%s on %s" % (device, hostname),
+ port,
+ on_close=unpublish,
+ log=log
+ )
+ log.warning("publish: %s" % (published[device]))
+ published[device].open()
+
+ # select_start = time.time()
+ read_map = {}
+ write_map = {}
+ error_map = {}
+ for publisher in published.values():
+ publisher.update_select_maps(read_map, write_map, error_map)
+ readers, writers, errors = select.select(
+ read_map.keys(),
+ write_map.keys(),
+ error_map.keys(),
+ 5
+ )
+ # select_end = time.time()
+ # print "select used %.3f s" % (select_end - select_start)
+ for reader in readers:
+ read_map[reader]()
+ for writer in writers:
+ write_map[writer]()
+ for error in errors:
+ error_map[error]()
+ # print "operation used %.3f s" % (time.time() - select_end)
+ except KeyboardInterrupt:
+ alive = False
+ sys.stdout.write('\n')
+ except SystemExit:
+ raise
+ except:
+ #~ raise
+ traceback.print_exc()
diff --git a/serial/tools/list_ports_linux.py b/serial/tools/list_ports_linux.py
index f0d570a..4d1d432 100644
--- a/serial/tools/list_ports_linux.py
+++ b/serial/tools/list_ports_linux.py
@@ -1,156 +1,156 @@
-#!/usr/bin/env python
-#
-# portable serial port access with python
-#
-# This is a module that gathers a list of serial ports including details on
-# GNU/Linux systems.
-# The comports function is expected to return an iterable that yields tuples of
-# 3 strings: port name, human readable description and a hardware ID.
-#
-# (C) 2011-2015 Chris Liechti <cliechti@gmx.net>
-#
-# SPDX-License-Identifier: BSD-3-Clause
-
-import glob
-import re
-import os
-
-
-def numsplit(text):
- """\
- Convert string into a list of texts and numbers in order to support a
- natural sorting.
- """
- result = []
- for group in re.split(r'(\d+)', text):
- if group:
- try:
- group = int(group)
- except ValueError:
- pass
- result.append(group)
- return result
-
-
-class SysFS(object):
- """Wrapper for easy sysfs access and device info"""
-
- def __init__(self, dev_path):
- self.dev = dev_path
- self.name = os.path.basename(self.dev)
- self.subsystem = None
- self.device_path = None
- self.usb_device_path = None
- if os.path.exists('/sys/class/tty/%s/device' % (self.name,)):
- self.device_path = os.path.realpath('/sys/class/tty/%s/device' % (self.name,))
- self.subsystem = os.path.basename(os.path.realpath(os.path.join(self.device_path, 'subsystem')))
- if self.subsystem == 'usb-serial':
- self.usb_device_path = os.path.dirname(os.path.dirname(self.device_path))
- elif self.subsystem == 'usb':
- self.usb_device_path = os.path.dirname(self.device_path)
- else:
- self.usb_device_path = None
- #~ print repr(self.__dict__)
- if self.usb_device_path is not None:
- self.vid = self.read_line(self.usb_device_path, 'idVendor').upper()
- self.pid = self.read_line(self.usb_device_path, 'idProduct').upper()
- self.serial = self.read_line(self.usb_device_path, 'serial')
- self.location = os.path.basename(self.usb_device_path)
- else:
- self.vid = None
- self.pid = None
- self.serial = None
- self.location = None
-
- def read_line(self, *args):
- """\
- Helper function to read a single line from a file.
- One or more parameters are allowed, they are joined with os.path.join.
- Returns None on errors..
- """
- try:
- with open(os.path.join(*args)) as f:
- line = f.readline().strip()
- return line
- except IOError:
- return None
-
- def describe(self):
- if self.subsystem == 'usb-serial':
- return self.read_line(self.usb_device_path, 'product')
- elif self.subsystem == 'usb': # CDC/ACM devices
- interface = self.read_line(self.device_path, 'interface')
- if interface is not None:
- return interface
- else:
- return self.read_line(self.usb_device_path, 'product')
- elif self.subsystem == 'pnp': # PCI based devices
- return self.name
- else:
- return 'n/a'
-
- def describe_full(self):
- """Get a human readable string"""
- if self.subsystem == 'usb-serial':
- return '{} - {}'.format(
- self.read_line(self.usb_device_path, 'manufacturer'),
- self.read_line(self.usb_device_path, 'product'),
- )
- elif self.subsystem == 'usb': # CDC/ACM devices
- interface = self.read_line(self.device_path, 'interface')
- return '{} - {}{}'.format(
- self.read_line(self.usb_device_path, 'manufacturer'),
- self.read_line(self.usb_device_path, 'product'),
- ' - {}'.format(interface) if interface is not None else '',
- )
- elif self.subsystem == 'pnp': # PCI based devices
- return self.name
- else:
- return 'n/a'
-
- def hwinfo(self):
- """Get a hardware description string"""
- if self.subsystem in ('usb', 'usb-serial'):
- return 'USB VID:PID={}:{}{}{}'.format(
- self.vid,
- self.pid,
- ' SER={}'.format(self.serial) if self.serial is not None else '',
- ' LOCATION={}'.format(self.location) if self.location is not None else '',
- )
- elif self.subsystem == 'pnp': # PCI based devices
- return self.read_line(self.device_path, 'id')
- else:
- return 'n/a'
-
- def __eq__(self, other):
- return self.dev == other.dev
-
- def __lt__(self, other):
- return numsplit(self.dev) < numsplit(other.dev)
-
- def __getitem__(self, index):
- """Item access: backwards compatible -> (port, desc, hwid)"""
- if index == 0:
- return self.dev
- elif index == 1:
- return self.describe()
- elif index == 2:
- return self.hwinfo()
- else:
- raise IndexError('{} > 2'.format(index))
-
-
-def comports():
- devices = glob.glob('/dev/ttyS*') # built-in serial ports
- devices.extend(glob.glob('/dev/ttyUSB*')) # usb-serial with own driver
- devices.extend(glob.glob('/dev/ttyACM*')) # usb-serial with CDC-ACM profile
- devices.extend(glob.glob('/dev/rfcomm*')) # BT serial devices
- return [info
- for info in [SysFS(d) for d in devices]
- if info.subsystem != "platform"] # hide non-present internal serial ports
-
-# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-# test
-if __name__ == '__main__':
- for port, desc, hwid in sorted(comports()):
- print("%s: %s [%s]" % (port, desc, hwid))
+#!/usr/bin/env python
+#
+# portable serial port access with python
+#
+# This is a module that gathers a list of serial ports including details on
+# GNU/Linux systems.
+# The comports function is expected to return an iterable that yields tuples of
+# 3 strings: port name, human readable description and a hardware ID.
+#
+# (C) 2011-2015 Chris Liechti <cliechti@gmx.net>
+#
+# SPDX-License-Identifier: BSD-3-Clause
+
+import glob
+import re
+import os
+
+
+def numsplit(text):
+ """\
+ Convert string into a list of texts and numbers in order to support a
+ natural sorting.
+ """
+ result = []
+ for group in re.split(r'(\d+)', text):
+ if group:
+ try:
+ group = int(group)
+ except ValueError:
+ pass
+ result.append(group)
+ return result
+
+
+class SysFS(object):
+ """Wrapper for easy sysfs access and device info"""
+
+ def __init__(self, dev_path):
+ self.dev = dev_path
+ self.name = os.path.basename(self.dev)
+ self.subsystem = None
+ self.device_path = None
+ self.usb_device_path = None
+ if os.path.exists('/sys/class/tty/%s/device' % (self.name,)):
+ self.device_path = os.path.realpath('/sys/class/tty/%s/device' % (self.name,))
+ self.subsystem = os.path.basename(os.path.realpath(os.path.join(self.device_path, 'subsystem')))
+ if self.subsystem == 'usb-serial':
+ self.usb_device_path = os.path.dirname(os.path.dirname(self.device_path))
+ elif self.subsystem == 'usb':
+ self.usb_device_path = os.path.dirname(self.device_path)
+ else:
+ self.usb_device_path = None
+ #~ print repr(self.__dict__)
+ if self.usb_device_path is not None:
+ self.vid = self.read_line(self.usb_device_path, 'idVendor').upper()
+ self.pid = self.read_line(self.usb_device_path, 'idProduct').upper()
+ self.serial = self.read_line(self.usb_device_path, 'serial')
+ self.location = os.path.basename(self.usb_device_path)
+ else:
+ self.vid = None
+ self.pid = None
+ self.serial = None
+ self.location = None
+
+ def read_line(self, *args):
+ """\
+ Helper function to read a single line from a file.
+ One or more parameters are allowed, they are joined with os.path.join.
+ Returns None on errors..
+ """
+ try:
+ with open(os.path.join(*args)) as f:
+ line = f.readline().strip()
+ return line
+ except IOError:
+ return None
+
+ def describe(self):
+ if self.subsystem == 'usb-serial':
+ return self.read_line(self.usb_device_path, 'product')
+ elif self.subsystem == 'usb': # CDC/ACM devices
+ interface = self.read_line(self.device_path, 'interface')
+ if interface is not None:
+ return interface
+ else:
+ return self.read_line(self.usb_device_path, 'product')
+ elif self.subsystem == 'pnp': # PCI based devices
+ return self.name
+ else:
+ return 'n/a'
+
+ def describe_full(self):
+ """Get a human readable string"""
+ if self.subsystem == 'usb-serial':
+ return '{} - {}'.format(
+ self.read_line(self.usb_device_path, 'manufacturer'),
+ self.read_line(self.usb_device_path, 'product'),
+ )
+ elif self.subsystem == 'usb': # CDC/ACM devices
+ interface = self.read_line(self.device_path, 'interface')
+ return '{} - {}{}'.format(
+ self.read_line(self.usb_device_path, 'manufacturer'),
+ self.read_line(self.usb_device_path, 'product'),
+ ' - {}'.format(interface) if interface is not None else '',
+ )
+ elif self.subsystem == 'pnp': # PCI based devices
+ return self.name
+ else:
+ return 'n/a'
+
+ def hwinfo(self):
+ """Get a hardware description string"""
+ if self.subsystem in ('usb', 'usb-serial'):
+ return 'USB VID:PID={}:{}{}{}'.format(
+ self.vid,
+ self.pid,
+ ' SER={}'.format(self.serial) if self.serial is not None else '',
+ ' LOCATION={}'.format(self.location) if self.location is not None else '',
+ )
+ elif self.subsystem == 'pnp': # PCI based devices
+ return self.read_line(self.device_path, 'id')
+ else:
+ return 'n/a'
+
+ def __eq__(self, other):
+ return self.dev == other.dev
+
+ def __lt__(self, other):
+ return numsplit(self.dev) < numsplit(other.dev)
+
+ def __getitem__(self, index):
+ """Item access: backwards compatible -> (port, desc, hwid)"""
+ if index == 0:
+ return self.dev
+ elif index == 1:
+ return self.describe()
+ elif index == 2:
+ return self.hwinfo()
+ else:
+ raise IndexError('{} > 2'.format(index))
+
+
+def comports():
+ devices = glob.glob('/dev/ttyS*') # built-in serial ports
+ devices.extend(glob.glob('/dev/ttyUSB*')) # usb-serial with own driver
+ devices.extend(glob.glob('/dev/ttyACM*')) # usb-serial with CDC-ACM profile
+ devices.extend(glob.glob('/dev/rfcomm*')) # BT serial devices
+ return [info
+ for info in [SysFS(d) for d in devices]
+ if info.subsystem != "platform"] # hide non-present internal serial ports
+
+# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+# test
+if __name__ == '__main__':
+ for port, desc, hwid in sorted(comports()):
+ print("%s: %s [%s]" % (port, desc, hwid))
diff --git a/serial/tools/list_ports_posix.py b/serial/tools/list_ports_posix.py
index 9b467c4..0495971 100644
--- a/serial/tools/list_ports_posix.py
+++ b/serial/tools/list_ports_posix.py
@@ -1,106 +1,106 @@
-#!/usr/bin/env python
-
-# portable serial port access with python
-
-# This is a module that gathers a list of serial ports on POSIXy systems.
-# For some specific implementations, see also list_ports_linux, list_ports_osx
-#
-# this is a wrapper module for different platform implementations of the
-# port enumeration feature
-#
-# (C) 2011-2015 Chris Liechti <cliechti@gmx.net>
-#
-# SPDX-License-Identifier: BSD-3-Clause
-
-"""\
-The ``comports`` function is expected to return an iterable that yields tuples
-of 3 strings: port name, human readable description and a hardware ID.
-
-As currently no method is known to get the second two strings easily, they are
-currently just identical to the port name.
-"""
-
-import glob
-import sys
-import os
-
-# try to detect the OS so that a device can be selected...
-plat = sys.platform.lower()
-
-if plat[:5] == 'linux': # Linux (confirmed)
- from serial.tools.list_ports_linux import comports
-
-elif plat == 'cygwin': # cygwin/win32
- # cygwin accepts /dev/com* in many contexts
- # (such as 'open' call, explicit 'ls'), but 'glob.glob'
- # and bare 'ls' do not; so use /dev/ttyS* instead
- def comports():
- devices = glob.glob('/dev/ttyS*')
- return [(d, d, d) for d in devices]
-
-elif plat[:7] == 'openbsd': # OpenBSD
- def comports():
- devices = glob.glob('/dev/cua*')
- return [(d, d, d) for d in devices]
-
-elif plat[:3] == 'bsd' or \
- plat[:7] == 'freebsd':
-
- def comports():
- devices = glob.glob('/dev/cua*[!.init][!.lock]')
- return [(d, d, d) for d in devices]
-
-elif plat[:6] == 'darwin': # OS X (confirmed)
- from serial.tools.list_ports_osx import comports
-
-elif plat[:6] == 'netbsd': # NetBSD
- def comports():
- """scan for available ports. return a list of device names."""
- devices = glob.glob('/dev/dty*')
- return [(d, d, d) for d in devices]
-
-elif plat[:4] == 'irix': # IRIX
- def comports():
- """scan for available ports. return a list of device names."""
- devices = glob.glob('/dev/ttyf*')
- return [(d, d, d) for d in devices]
-
-elif plat[:2] == 'hp': # HP-UX (not tested)
- def comports():
- """scan for available ports. return a list of device names."""
- devices = glob.glob('/dev/tty*p0')
- return [(d, d, d) for d in devices]
-
-elif plat[:5] == 'sunos': # Solaris/SunOS
- def comports():
- """scan for available ports. return a list of device names."""
- devices = glob.glob('/dev/tty*c')
- return [(d, d, d) for d in devices]
-
-elif plat[:3] == 'aix': # AIX
- def comports():
- """scan for available ports. return a list of device names."""
- devices = glob.glob('/dev/tty*')
- return [(d, d, d) for d in devices]
-
-else:
- # platform detection has failed...
- import serial
- sys.stderr.write("""\
-don't know how to enumerate ttys on this system.
-! I you know how the serial ports are named send this information to
-! the author of this module:
-
-sys.platform = %r
-os.name = %r
-pySerial version = %s
-
-also add the naming scheme of the serial ports and with a bit luck you can get
-this module running...
-""" % (sys.platform, os.name, serial.VERSION))
- raise ImportError("Sorry: no implementation for your platform ('%s') available" % (os.name,))
-
-# test
-if __name__ == '__main__':
- for port, desc, hwid in sorted(comports()):
- print("%s: %s [%s]" % (port, desc, hwid))
+#!/usr/bin/env python
+
+# portable serial port access with python
+
+# This is a module that gathers a list of serial ports on POSIXy systems.
+# For some specific implementations, see also list_ports_linux, list_ports_osx
+#
+# this is a wrapper module for different platform implementations of the
+# port enumeration feature
+#
+# (C) 2011-2015 Chris Liechti <cliechti@gmx.net>
+#
+# SPDX-License-Identifier: BSD-3-Clause
+
+"""\
+The ``comports`` function is expected to return an iterable that yields tuples
+of 3 strings: port name, human readable description and a hardware ID.
+
+As currently no method is known to get the second two strings easily, they are
+currently just identical to the port name.
+"""
+
+import glob
+import sys
+import os
+
+# try to detect the OS so that a device can be selected...
+plat = sys.platform.lower()
+
+if plat[:5] == 'linux': # Linux (confirmed)
+ from serial.tools.list_ports_linux import comports
+
+elif plat == 'cygwin': # cygwin/win32
+ # cygwin accepts /dev/com* in many contexts
+ # (such as 'open' call, explicit 'ls'), but 'glob.glob'
+ # and bare 'ls' do not; so use /dev/ttyS* instead
+ def comports():
+ devices = glob.glob('/dev/ttyS*')
+ return [(d, d, d) for d in devices]
+
+elif plat[:7] == 'openbsd': # OpenBSD
+ def comports():
+ devices = glob.glob('/dev/cua*')
+ return [(d, d, d) for d in devices]
+
+elif plat[:3] == 'bsd' or \
+ plat[:7] == 'freebsd':
+
+ def comports():
+ devices = glob.glob('/dev/cua*[!.init][!.lock]')
+ return [(d, d, d) for d in devices]
+
+elif plat[:6] == 'darwin': # OS X (confirmed)
+ from serial.tools.list_ports_osx import comports
+
+elif plat[:6] == 'netbsd': # NetBSD
+ def comports():
+ """scan for available ports. return a list of device names."""
+ devices = glob.glob('/dev/dty*')
+ return [(d, d, d) for d in devices]
+
+elif plat[:4] == 'irix': # IRIX
+ def comports():
+ """scan for available ports. return a list of device names."""
+ devices = glob.glob('/dev/ttyf*')
+ return [(d, d, d) for d in devices]
+
+elif plat[:2] == 'hp': # HP-UX (not tested)
+ def comports():
+ """scan for available ports. return a list of device names."""
+ devices = glob.glob('/dev/tty*p0')
+ return [(d, d, d) for d in devices]
+
+elif plat[:5] == 'sunos': # Solaris/SunOS
+ def comports():
+ """scan for available ports. return a list of device names."""
+ devices = glob.glob('/dev/tty*c')
+ return [(d, d, d) for d in devices]
+
+elif plat[:3] == 'aix': # AIX
+ def comports():
+ """scan for available ports. return a list of device names."""
+ devices = glob.glob('/dev/tty*')
+ return [(d, d, d) for d in devices]
+
+else:
+ # platform detection has failed...
+ import serial
+ sys.stderr.write("""\
+don't know how to enumerate ttys on this system.
+! I you know how the serial ports are named send this information to
+! the author of this module:
+
+sys.platform = %r
+os.name = %r
+pySerial version = %s
+
+also add the naming scheme of the serial ports and with a bit luck you can get
+this module running...
+""" % (sys.platform, os.name, serial.VERSION))
+ raise ImportError("Sorry: no implementation for your platform ('%s') available" % (os.name,))
+
+# test
+if __name__ == '__main__':
+ for port, desc, hwid in sorted(comports()):
+ print("%s: %s [%s]" % (port, desc, hwid))
diff --git a/serial/tools/list_ports_windows.py b/serial/tools/list_ports_windows.py
index e39b086..837ff57 100644
--- a/serial/tools/list_ports_windows.py
+++ b/serial/tools/list_ports_windows.py
@@ -1,344 +1,344 @@
-#! python
-#
-# Enumerate serial ports on Windows including a human readable description
-# and hardware information.
-#
-# (C) 2001-2015 Chris Liechti <cliechti@gmx.net>
-#
-# SPDX-License-Identifier: BSD-3-Clause
-
-import re
-import ctypes
-from ctypes.wintypes import BOOL
-from ctypes.wintypes import HWND
-from ctypes.wintypes import DWORD
-from ctypes.wintypes import WORD
-from ctypes.wintypes import LONG
-from ctypes.wintypes import ULONG
-from ctypes.wintypes import LPCSTR
-from ctypes.wintypes import HKEY
-from ctypes.wintypes import BYTE
-import serial
-from serial.win32 import ULONG_PTR
-
-
-def numsplit(text):
- """\
- Convert string into a list of texts and numbers in order to support a
- natural sorting.
- """
- result = []
- for group in re.split(r'(\d+)', text):
- if group:
- try:
- group = int(group)
- except ValueError:
- pass
- result.append(group)
- return result
-
-
-def ValidHandle(value, func, arguments):
- if value == 0:
- raise ctypes.WinError()
- return value
-
-NULL = 0
-HDEVINFO = ctypes.c_void_p
-PCTSTR = ctypes.c_char_p
-PTSTR = ctypes.c_void_p
-CHAR = ctypes.c_char
-LPDWORD = PDWORD = ctypes.POINTER(DWORD)
-#~ LPBYTE = PBYTE = ctypes.POINTER(BYTE)
-LPBYTE = PBYTE = ctypes.c_void_p # XXX avoids error about types
-
-ACCESS_MASK = DWORD
-REGSAM = ACCESS_MASK
-
-
-def byte_buffer(length):
- """Get a buffer for a string"""
- return (BYTE*length)()
-
-
-def string(buffer):
- s = []
- for c in buffer:
- if c == 0:
- break
- s.append(chr(c & 0xff)) # "& 0xff": hack to convert signed to unsigned
- return ''.join(s)
-
-
-class GUID(ctypes.Structure):
- _fields_ = [
- ('Data1', DWORD),
- ('Data2', WORD),
- ('Data3', WORD),
- ('Data4', BYTE*8),
- ]
-
- def __str__(self):
- return "{%08x-%04x-%04x-%s-%s}" % (
- self.Data1,
- self.Data2,
- self.Data3,
- ''.join(["%02x" % d for d in self.Data4[:2]]),
- ''.join(["%02x" % d for d in self.Data4[2:]]),
- )
-
-
-class SP_DEVINFO_DATA(ctypes.Structure):
- _fields_ = [
- ('cbSize', DWORD),
- ('ClassGuid', GUID),
- ('DevInst', DWORD),
- ('Reserved', ULONG_PTR),
- ]
-
- def __str__(self):
- return "ClassGuid:%s DevInst:%s" % (self.ClassGuid, self.DevInst)
-
-PSP_DEVINFO_DATA = ctypes.POINTER(SP_DEVINFO_DATA)
-
-PSP_DEVICE_INTERFACE_DETAIL_DATA = ctypes.c_void_p
-
-setupapi = ctypes.windll.LoadLibrary("setupapi")
-SetupDiDestroyDeviceInfoList = setupapi.SetupDiDestroyDeviceInfoList
-SetupDiDestroyDeviceInfoList.argtypes = [HDEVINFO]
-SetupDiDestroyDeviceInfoList.restype = BOOL
-
-SetupDiClassGuidsFromName = setupapi.SetupDiClassGuidsFromNameA
-SetupDiClassGuidsFromName.argtypes = [PCTSTR, ctypes.POINTER(GUID), DWORD, PDWORD]
-SetupDiClassGuidsFromName.restype = BOOL
-
-SetupDiEnumDeviceInfo = setupapi.SetupDiEnumDeviceInfo
-SetupDiEnumDeviceInfo.argtypes = [HDEVINFO, DWORD, PSP_DEVINFO_DATA]
-SetupDiEnumDeviceInfo.restype = BOOL
-
-SetupDiGetClassDevs = setupapi.SetupDiGetClassDevsA
-SetupDiGetClassDevs.argtypes = [ctypes.POINTER(GUID), PCTSTR, HWND, DWORD]
-SetupDiGetClassDevs.restype = HDEVINFO
-SetupDiGetClassDevs.errcheck = ValidHandle
-
-SetupDiGetDeviceRegistryProperty = setupapi.SetupDiGetDeviceRegistryPropertyA
-SetupDiGetDeviceRegistryProperty.argtypes = [HDEVINFO, PSP_DEVINFO_DATA, DWORD, PDWORD, PBYTE, DWORD, PDWORD]
-SetupDiGetDeviceRegistryProperty.restype = BOOL
-
-SetupDiGetDeviceInstanceId = setupapi.SetupDiGetDeviceInstanceIdA
-SetupDiGetDeviceInstanceId.argtypes = [HDEVINFO, PSP_DEVINFO_DATA, PTSTR, DWORD, PDWORD]
-SetupDiGetDeviceInstanceId.restype = BOOL
-
-SetupDiOpenDevRegKey = setupapi.SetupDiOpenDevRegKey
-SetupDiOpenDevRegKey.argtypes = [HDEVINFO, PSP_DEVINFO_DATA, DWORD, DWORD, DWORD, REGSAM]
-SetupDiOpenDevRegKey.restype = HKEY
-
-advapi32 = ctypes.windll.LoadLibrary("Advapi32")
-RegCloseKey = advapi32.RegCloseKey
-RegCloseKey.argtypes = [HKEY]
-RegCloseKey.restype = LONG
-
-RegQueryValueEx = advapi32.RegQueryValueExA
-RegQueryValueEx.argtypes = [HKEY, LPCSTR, LPDWORD, LPDWORD, LPBYTE, LPDWORD]
-RegQueryValueEx.restype = LONG
-
-
-DIGCF_PRESENT = 2
-DIGCF_DEVICEINTERFACE = 16
-INVALID_HANDLE_VALUE = 0
-ERROR_INSUFFICIENT_BUFFER = 122
-SPDRP_HARDWAREID = 1
-SPDRP_FRIENDLYNAME = 12
-SPDRP_LOCATION_PATHS = 35
-DICS_FLAG_GLOBAL = 1
-DIREG_DEV = 0x00000001
-KEY_READ = 0x20019
-
-# workaround for compatibility between Python 2.x and 3.x
-Ports = serial.to_bytes([80, 111, 114, 116, 115]) # "Ports"
-PortName = serial.to_bytes([80, 111, 114, 116, 78, 97, 109, 101]) # "PortName"
-
-
-class WinInfo(object):
- """Wrapper for device info"""
-
- def __init__(self, dev):
- self.dev = dev
- self.description = None
- self.hwid = None
- self.vid = None
- self.pid = None
- self.serial = None
- self.location = None
-
- def describe(self):
- """Get a human readable string"""
- return self.description if self.description is not None else 'n/a'
-
- def hwinfo(self):
- """Get a hardware description string"""
- if self.vid is not None:
- return 'USB VID:PID={}:{}{}{}'.format(
- self.vid,
- self.pid,
- ' SER={}'.format(self.serial) if self.serial is not None else '',
- ' LOCATION={}'.format(self.location) if self.location is not None else '',
- )
- else:
- return self.hwid
-
- def __eq__(self, other):
- return self.dev == other.dev
-
- def __lt__(self, other):
- return numsplit(self.dev) < numsplit(other.dev)
-
- def __getitem__(self, index):
- """Item access: backwards compatible -> (port, desc, hwid)"""
- if index == 0:
- return self.dev
- elif index == 1:
- return self.describe()
- elif index == 2:
- return self.hwinfo()
- else:
- raise IndexError('{} > 2'.format(index))
-
-
-def comports():
- GUIDs = (GUID*8)() # so far only seen one used, so hope 8 are enough...
- guids_size = DWORD()
- if not SetupDiClassGuidsFromName(
- Ports,
- GUIDs,
- ctypes.sizeof(GUIDs),
- ctypes.byref(guids_size)):
- raise ctypes.WinError()
-
- # repeat for all possible GUIDs
- for index in range(guids_size.value):
- g_hdi = SetupDiGetClassDevs(
- ctypes.byref(GUIDs[index]),
- None,
- NULL,
- DIGCF_PRESENT) # was DIGCF_PRESENT|DIGCF_DEVICEINTERFACE which misses CDC ports
-
- devinfo = SP_DEVINFO_DATA()
- devinfo.cbSize = ctypes.sizeof(devinfo)
- index = 0
- while SetupDiEnumDeviceInfo(g_hdi, index, ctypes.byref(devinfo)):
- index += 1
-
- # get the real com port name
- hkey = SetupDiOpenDevRegKey(
- g_hdi,
- ctypes.byref(devinfo),
- DICS_FLAG_GLOBAL,
- 0,
- DIREG_DEV, # DIREG_DRV for SW info
- KEY_READ)
- port_name_buffer = byte_buffer(250)
- port_name_length = ULONG(ctypes.sizeof(port_name_buffer))
- RegQueryValueEx(
- hkey,
- PortName,
- None,
- None,
- ctypes.byref(port_name_buffer),
- ctypes.byref(port_name_length))
- RegCloseKey(hkey)
-
- # unfortunately does this method also include parallel ports.
- # we could check for names starting with COM or just exclude LPT
- # and hope that other "unknown" names are serial ports...
- if string(port_name_buffer).startswith('LPT'):
- continue
-
- # hardware ID
- szHardwareID = byte_buffer(250)
- # try to get ID that includes serial number
- if not SetupDiGetDeviceInstanceId(
- g_hdi,
- ctypes.byref(devinfo),
- ctypes.byref(szHardwareID),
- ctypes.sizeof(szHardwareID) - 1,
- None):
- # fall back to more generic hardware ID if that would fail
- if not SetupDiGetDeviceRegistryProperty(
- g_hdi,
- ctypes.byref(devinfo),
- SPDRP_HARDWAREID,
- None,
- ctypes.byref(szHardwareID),
- ctypes.sizeof(szHardwareID) - 1,
- None):
- # Ignore ERROR_INSUFFICIENT_BUFFER
- if ctypes.GetLastError() != ERROR_INSUFFICIENT_BUFFER:
- raise ctypes.WinError()
- # stringify
- szHardwareID_str = string(szHardwareID)
-
- info = WinInfo(string(port_name_buffer))
- info.hwid = szHardwareID_str
-
- # in case of USB, make a more readable string, similar to that form
- # that we also generate on other platforms
- if szHardwareID_str.startswith('USB'):
- m = re.search(r'VID_([0-9a-f]{4})&PID_([0-9a-f]{4})(\\(\w+))?', szHardwareID_str, re.I)
- if m:
- info.vid = m.group(1)
- info.pid = m.group(2)
- if m.group(4):
- info.serial = m.group(4)
- # calculate a location string
- # XXX was empty in tests with (internal) USB3 hub :(
- loc_path_str = byte_buffer(250)
- if SetupDiGetDeviceRegistryProperty(
- g_hdi,
- ctypes.byref(devinfo),
- SPDRP_LOCATION_PATHS,
- None,
- ctypes.byref(loc_path_str),
- ctypes.sizeof(loc_path_str) - 1,
- None):
- #~ print (string(loc_path_str))
- m = re.finditer(r'USBROOT\((\w+)\)|#USB\((\w+)\)', string(loc_path_str))
- location = []
- for g in m:
- if g.group(1):
- location.append('%d' % (int(g.group(1)) + 1))
- else:
- if len(location) > 1:
- location.append('.')
- else:
- location.append('-')
- location.append(g.group(2))
- if location:
- info.location = ''.join(location)
-
- # friendly name
- szFriendlyName = byte_buffer(250)
- if SetupDiGetDeviceRegistryProperty(
- g_hdi,
- ctypes.byref(devinfo),
- SPDRP_FRIENDLYNAME,
- #~ SPDRP_DEVICEDESC,
- None,
- ctypes.byref(szFriendlyName),
- ctypes.sizeof(szFriendlyName) - 1,
- None):
- info.description = string(szFriendlyName)
- #~ else:
- # Ignore ERROR_INSUFFICIENT_BUFFER
- #~ if ctypes.GetLastError() != ERROR_INSUFFICIENT_BUFFER:
- #~ raise IOError("failed to get details for %s (%s)" % (devinfo, szHardwareID.value))
- # ignore errors and still include the port in the list, friendly name will be same as port name
- yield info
- SetupDiDestroyDeviceInfoList(g_hdi)
-
-
-# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-# test
-if __name__ == '__main__':
- for port, desc, hwid in sorted(comports()):
- print("%s: %s [%s]" % (port, desc, hwid))
+#! python
+#
+# Enumerate serial ports on Windows including a human readable description
+# and hardware information.
+#
+# (C) 2001-2015 Chris Liechti <cliechti@gmx.net>
+#
+# SPDX-License-Identifier: BSD-3-Clause
+
+import re
+import ctypes
+from ctypes.wintypes import BOOL
+from ctypes.wintypes import HWND
+from ctypes.wintypes import DWORD
+from ctypes.wintypes import WORD
+from ctypes.wintypes import LONG
+from ctypes.wintypes import ULONG
+from ctypes.wintypes import LPCSTR
+from ctypes.wintypes import HKEY
+from ctypes.wintypes import BYTE
+import serial
+from serial.win32 import ULONG_PTR
+
+
+def numsplit(text):
+ """\
+ Convert string into a list of texts and numbers in order to support a
+ natural sorting.
+ """
+ result = []
+ for group in re.split(r'(\d+)', text):
+ if group:
+ try:
+ group = int(group)
+ except ValueError:
+ pass
+ result.append(group)
+ return result
+
+
+def ValidHandle(value, func, arguments):
+ if value == 0:
+ raise ctypes.WinError()
+ return value
+
+NULL = 0
+HDEVINFO = ctypes.c_void_p
+PCTSTR = ctypes.c_char_p
+PTSTR = ctypes.c_void_p
+CHAR = ctypes.c_char
+LPDWORD = PDWORD = ctypes.POINTER(DWORD)
+#~ LPBYTE = PBYTE = ctypes.POINTER(BYTE)
+LPBYTE = PBYTE = ctypes.c_void_p # XXX avoids error about types
+
+ACCESS_MASK = DWORD
+REGSAM = ACCESS_MASK
+
+
+def byte_buffer(length):
+ """Get a buffer for a string"""
+ return (BYTE*length)()
+
+
+def string(buffer):
+ s = []
+ for c in buffer:
+ if c == 0:
+ break
+ s.append(chr(c & 0xff)) # "& 0xff": hack to convert signed to unsigned
+ return ''.join(s)
+
+
+class GUID(ctypes.Structure):
+ _fields_ = [
+ ('Data1', DWORD),
+ ('Data2', WORD),
+ ('Data3', WORD),
+ ('Data4', BYTE*8),
+ ]
+
+ def __str__(self):
+ return "{%08x-%04x-%04x-%s-%s}" % (
+ self.Data1,
+ self.Data2,
+ self.Data3,
+ ''.join(["%02x" % d for d in self.Data4[:2]]),
+ ''.join(["%02x" % d for d in self.Data4[2:]]),
+ )
+
+
+class SP_DEVINFO_DATA(ctypes.Structure):
+ _fields_ = [
+ ('cbSize', DWORD),
+ ('ClassGuid', GUID),
+ ('DevInst', DWORD),
+ ('Reserved', ULONG_PTR),
+ ]
+
+ def __str__(self):
+ return "ClassGuid:%s DevInst:%s" % (self.ClassGuid, self.DevInst)
+
+PSP_DEVINFO_DATA = ctypes.POINTER(SP_DEVINFO_DATA)
+
+PSP_DEVICE_INTERFACE_DETAIL_DATA = ctypes.c_void_p
+
+setupapi = ctypes.windll.LoadLibrary("setupapi")
+SetupDiDestroyDeviceInfoList = setupapi.SetupDiDestroyDeviceInfoList
+SetupDiDestroyDeviceInfoList.argtypes = [HDEVINFO]
+SetupDiDestroyDeviceInfoList.restype = BOOL
+
+SetupDiClassGuidsFromName = setupapi.SetupDiClassGuidsFromNameA
+SetupDiClassGuidsFromName.argtypes = [PCTSTR, ctypes.POINTER(GUID), DWORD, PDWORD]
+SetupDiClassGuidsFromName.restype = BOOL
+
+SetupDiEnumDeviceInfo = setupapi.SetupDiEnumDeviceInfo
+SetupDiEnumDeviceInfo.argtypes = [HDEVINFO, DWORD, PSP_DEVINFO_DATA]
+SetupDiEnumDeviceInfo.restype = BOOL
+
+SetupDiGetClassDevs = setupapi.SetupDiGetClassDevsA
+SetupDiGetClassDevs.argtypes = [ctypes.POINTER(GUID), PCTSTR, HWND, DWORD]
+SetupDiGetClassDevs.restype = HDEVINFO
+SetupDiGetClassDevs.errcheck = ValidHandle
+
+SetupDiGetDeviceRegistryProperty = setupapi.SetupDiGetDeviceRegistryPropertyA
+SetupDiGetDeviceRegistryProperty.argtypes = [HDEVINFO, PSP_DEVINFO_DATA, DWORD, PDWORD, PBYTE, DWORD, PDWORD]
+SetupDiGetDeviceRegistryProperty.restype = BOOL
+
+SetupDiGetDeviceInstanceId = setupapi.SetupDiGetDeviceInstanceIdA
+SetupDiGetDeviceInstanceId.argtypes = [HDEVINFO, PSP_DEVINFO_DATA, PTSTR, DWORD, PDWORD]
+SetupDiGetDeviceInstanceId.restype = BOOL
+
+SetupDiOpenDevRegKey = setupapi.SetupDiOpenDevRegKey
+SetupDiOpenDevRegKey.argtypes = [HDEVINFO, PSP_DEVINFO_DATA, DWORD, DWORD, DWORD, REGSAM]
+SetupDiOpenDevRegKey.restype = HKEY
+
+advapi32 = ctypes.windll.LoadLibrary("Advapi32")
+RegCloseKey = advapi32.RegCloseKey
+RegCloseKey.argtypes = [HKEY]
+RegCloseKey.restype = LONG
+
+RegQueryValueEx = advapi32.RegQueryValueExA
+RegQueryValueEx.argtypes = [HKEY, LPCSTR, LPDWORD, LPDWORD, LPBYTE, LPDWORD]
+RegQueryValueEx.restype = LONG
+
+
+DIGCF_PRESENT = 2
+DIGCF_DEVICEINTERFACE = 16
+INVALID_HANDLE_VALUE = 0
+ERROR_INSUFFICIENT_BUFFER = 122
+SPDRP_HARDWAREID = 1
+SPDRP_FRIENDLYNAME = 12
+SPDRP_LOCATION_PATHS = 35
+DICS_FLAG_GLOBAL = 1
+DIREG_DEV = 0x00000001
+KEY_READ = 0x20019
+
+# workaround for compatibility between Python 2.x and 3.x
+Ports = serial.to_bytes([80, 111, 114, 116, 115]) # "Ports"
+PortName = serial.to_bytes([80, 111, 114, 116, 78, 97, 109, 101]) # "PortName"
+
+
+class WinInfo(object):
+ """Wrapper for device info"""
+
+ def __init__(self, dev):
+ self.dev = dev
+ self.description = None
+ self.hwid = None
+ self.vid = None
+ self.pid = None
+ self.serial = None
+ self.location = None
+
+ def describe(self):
+ """Get a human readable string"""
+ return self.description if self.description is not None else 'n/a'
+
+ def hwinfo(self):
+ """Get a hardware description string"""
+ if self.vid is not None:
+ return 'USB VID:PID={}:{}{}{}'.format(
+ self.vid,
+ self.pid,
+ ' SER={}'.format(self.serial) if self.serial is not None else '',
+ ' LOCATION={}'.format(self.location) if self.location is not None else '',
+ )
+ else:
+ return self.hwid
+
+ def __eq__(self, other):
+ return self.dev == other.dev
+
+ def __lt__(self, other):
+ return numsplit(self.dev) < numsplit(other.dev)
+
+ def __getitem__(self, index):
+ """Item access: backwards compatible -> (port, desc, hwid)"""
+ if index == 0:
+ return self.dev
+ elif index == 1:
+ return self.describe()
+ elif index == 2:
+ return self.hwinfo()
+ else:
+ raise IndexError('{} > 2'.format(index))
+
+
+def comports():
+ GUIDs = (GUID*8)() # so far only seen one used, so hope 8 are enough...
+ guids_size = DWORD()
+ if not SetupDiClassGuidsFromName(
+ Ports,
+ GUIDs,
+ ctypes.sizeof(GUIDs),
+ ctypes.byref(guids_size)):
+ raise ctypes.WinError()
+
+ # repeat for all possible GUIDs
+ for index in range(guids_size.value):
+ g_hdi = SetupDiGetClassDevs(
+ ctypes.byref(GUIDs[index]),
+ None,
+ NULL,
+ DIGCF_PRESENT) # was DIGCF_PRESENT|DIGCF_DEVICEINTERFACE which misses CDC ports
+
+ devinfo = SP_DEVINFO_DATA()
+ devinfo.cbSize = ctypes.sizeof(devinfo)
+ index = 0
+ while SetupDiEnumDeviceInfo(g_hdi, index, ctypes.byref(devinfo)):
+ index += 1
+
+ # get the real com port name
+ hkey = SetupDiOpenDevRegKey(
+ g_hdi,
+ ctypes.byref(devinfo),
+ DICS_FLAG_GLOBAL,
+ 0,
+ DIREG_DEV, # DIREG_DRV for SW info
+ KEY_READ)
+ port_name_buffer = byte_buffer(250)
+ port_name_length = ULONG(ctypes.sizeof(port_name_buffer))
+ RegQueryValueEx(
+ hkey,
+ PortName,
+ None,
+ None,
+ ctypes.byref(port_name_buffer),
+ ctypes.byref(port_name_length))
+ RegCloseKey(hkey)
+
+ # unfortunately does this method also include parallel ports.
+ # we could check for names starting with COM or just exclude LPT
+ # and hope that other "unknown" names are serial ports...
+ if string(port_name_buffer).startswith('LPT'):
+ continue
+
+ # hardware ID
+ szHardwareID = byte_buffer(250)
+ # try to get ID that includes serial number
+ if not SetupDiGetDeviceInstanceId(
+ g_hdi,
+ ctypes.byref(devinfo),
+ ctypes.byref(szHardwareID),
+ ctypes.sizeof(szHardwareID) - 1,
+ None):
+ # fall back to more generic hardware ID if that would fail
+ if not SetupDiGetDeviceRegistryProperty(
+ g_hdi,
+ ctypes.byref(devinfo),
+ SPDRP_HARDWAREID,
+ None,
+ ctypes.byref(szHardwareID),
+ ctypes.sizeof(szHardwareID) - 1,
+ None):
+ # Ignore ERROR_INSUFFICIENT_BUFFER
+ if ctypes.GetLastError() != ERROR_INSUFFICIENT_BUFFER:
+ raise ctypes.WinError()
+ # stringify
+ szHardwareID_str = string(szHardwareID)
+
+ info = WinInfo(string(port_name_buffer))
+ info.hwid = szHardwareID_str
+
+ # in case of USB, make a more readable string, similar to that form
+ # that we also generate on other platforms
+ if szHardwareID_str.startswith('USB'):
+ m = re.search(r'VID_([0-9a-f]{4})&PID_([0-9a-f]{4})(\\(\w+))?', szHardwareID_str, re.I)
+ if m:
+ info.vid = m.group(1)
+ info.pid = m.group(2)
+ if m.group(4):
+ info.serial = m.group(4)
+ # calculate a location string
+ # XXX was empty in tests with (internal) USB3 hub :(
+ loc_path_str = byte_buffer(250)
+ if SetupDiGetDeviceRegistryProperty(
+ g_hdi,
+ ctypes.byref(devinfo),
+ SPDRP_LOCATION_PATHS,
+ None,
+ ctypes.byref(loc_path_str),
+ ctypes.sizeof(loc_path_str) - 1,
+ None):
+ #~ print (string(loc_path_str))
+ m = re.finditer(r'USBROOT\((\w+)\)|#USB\((\w+)\)', string(loc_path_str))
+ location = []
+ for g in m:
+ if g.group(1):
+ location.append('%d' % (int(g.group(1)) + 1))
+ else:
+ if len(location) > 1:
+ location.append('.')
+ else:
+ location.append('-')
+ location.append(g.group(2))
+ if location:
+ info.location = ''.join(location)
+
+ # friendly name
+ szFriendlyName = byte_buffer(250)
+ if SetupDiGetDeviceRegistryProperty(
+ g_hdi,
+ ctypes.byref(devinfo),
+ SPDRP_FRIENDLYNAME,
+ #~ SPDRP_DEVICEDESC,
+ None,
+ ctypes.byref(szFriendlyName),
+ ctypes.sizeof(szFriendlyName) - 1,
+ None):
+ info.description = string(szFriendlyName)
+ #~ else:
+ # Ignore ERROR_INSUFFICIENT_BUFFER
+ #~ if ctypes.GetLastError() != ERROR_INSUFFICIENT_BUFFER:
+ #~ raise IOError("failed to get details for %s (%s)" % (devinfo, szHardwareID.value))
+ # ignore errors and still include the port in the list, friendly name will be same as port name
+ yield info
+ SetupDiDestroyDeviceInfoList(g_hdi)
+
+
+# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+# test
+if __name__ == '__main__':
+ for port, desc, hwid in sorted(comports()):
+ print("%s: %s [%s]" % (port, desc, hwid))