diff options
author | Chris Liechti <cliechti@gmx.net> | 2015-09-22 23:13:44 +0200 |
---|---|---|
committer | Chris Liechti <cliechti@gmx.net> | 2015-09-22 23:13:44 +0200 |
commit | f565693c462d9987456fe22b44cce1a7b5036711 (patch) | |
tree | f47db4bdf720acbc1fe3c55eb1a193f374873124 | |
parent | 087bee33c811a93bfe24c731ee599ed9d28c1956 (diff) | |
download | pyserial-git-f565693c462d9987456fe22b44cce1a7b5036711.tar.gz |
fix EOL issues
-rwxr-xr-x | examples/port_publisher.py | 1140 | ||||
-rw-r--r-- | serial/tools/list_ports_linux.py | 312 | ||||
-rw-r--r-- | serial/tools/list_ports_posix.py | 212 | ||||
-rw-r--r-- | serial/tools/list_ports_windows.py | 688 |
4 files changed, 1176 insertions, 1176 deletions
diff --git a/examples/port_publisher.py b/examples/port_publisher.py index d426da0..074a529 100755 --- a/examples/port_publisher.py +++ b/examples/port_publisher.py @@ -1,570 +1,570 @@ -#! /usr/bin/env python
-#
-# (C) 2001-2015 Chris Liechti <cliechti@gmx.net>
-#
-# SPDX-License-Identifier: BSD-3-Clause
-"""\
-Multi-port serial<->TCP/IP forwarder.
-- RFC 2217
-- check existence of serial port periodically
-- start/stop forwarders
-- each forwarder creates a server socket and opens the serial port
-- serial ports are opened only once. network connect/disconnect
- does not influence serial port
-- only one client per connection
-"""
-import os
-import select
-import socket
-import sys
-import time
-import traceback
-
-import serial
-import serial.rfc2217
-import serial.tools.list_ports
-
-import dbus
-
-# Try to import the avahi service definitions properly. If the avahi module is
-# not available, fall back to a hard-coded solution that hopefully still works.
-try:
- import avahi
-except ImportError:
- class avahi:
- DBUS_NAME = "org.freedesktop.Avahi"
- DBUS_PATH_SERVER = "/"
- DBUS_INTERFACE_SERVER = "org.freedesktop.Avahi.Server"
- DBUS_INTERFACE_ENTRY_GROUP = DBUS_NAME + ".EntryGroup"
- IF_UNSPEC = -1
- PROTO_UNSPEC, PROTO_INET, PROTO_INET6 = -1, 0, 1
-
-
-class ZeroconfService:
- """\
- A simple class to publish a network service with zeroconf using avahi.
- """
-
- def __init__(self, name, port, stype="_http._tcp",
- domain="", host="", text=""):
- self.name = name
- self.stype = stype
- self.domain = domain
- self.host = host
- self.port = port
- self.text = text
- self.group = None
-
- def publish(self):
- bus = dbus.SystemBus()
- server = dbus.Interface(
- bus.get_object(
- avahi.DBUS_NAME,
- avahi.DBUS_PATH_SERVER
- ),
- avahi.DBUS_INTERFACE_SERVER
- )
-
- g = dbus.Interface(
- bus.get_object(
- avahi.DBUS_NAME,
- server.EntryGroupNew()
- ),
- avahi.DBUS_INTERFACE_ENTRY_GROUP
- )
-
- g.AddService(avahi.IF_UNSPEC, avahi.PROTO_UNSPEC, dbus.UInt32(0),
- self.name, self.stype, self.domain, self.host,
- dbus.UInt16(self.port), self.text)
-
- g.Commit()
- self.group = g
-
- def unpublish(self):
- if self.group is not None:
- self.group.Reset()
- self.group = None
-
- def __str__(self):
- return "%r @ %s:%s (%s)" % (self.name, self.host, self.port, self.stype)
-
-
-class Forwarder(ZeroconfService):
- """\
- Single port serial<->TCP/IP forarder that depends on an external select
- loop.
- - Buffers for serial -> network and network -> serial
- - RFC 2217 state
- - Zeroconf publish/unpublish on open/close.
- """
-
- def __init__(self, device, name, network_port, on_close=None, log=None):
- ZeroconfService.__init__(self, name, network_port, stype='_serial_port._tcp')
- self.alive = False
- self.network_port = network_port
- self.on_close = on_close
- self.log = log
- self.device = device
- self.serial = serial.Serial()
- self.serial.port = device
- self.serial.baudrate = 115200
- self.serial.timeout = 0
- self.socket = None
- self.server_socket = None
- self.rfc2217 = None # instantiate later, when connecting
-
- def __del__(self):
- try:
- if self.alive:
- self.close()
- except:
- pass # XXX errors on shutdown
-
- def open(self):
- """open serial port, start network server and publish service"""
- self.buffer_net2ser = bytearray()
- self.buffer_ser2net = bytearray()
-
- # open serial port
- try:
- self.serial.rts = False
- self.serial.open()
- except Exception as msg:
- self.handle_serial_error(msg)
-
- self.serial_settings_backup = self.serial.get_settings()
-
- # start the socket server
- # XXX add IPv6 support: use getaddrinfo for socket options, bind to multiple sockets?
- # info_list = socket.getaddrinfo(None, port, 0, socket.SOCK_STREAM, 0, socket.AI_PASSIVE)
- self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.server_socket.setsockopt(
- socket.SOL_SOCKET,
- socket.SO_REUSEADDR,
- self.server_socket.getsockopt(
- socket.SOL_SOCKET,
- socket.SO_REUSEADDR
- ) | 1
- )
- self.server_socket.setblocking(0)
- try:
- self.server_socket.bind(('', self.network_port))
- self.server_socket.listen(1)
- except socket.error as msg:
- self.handle_server_error()
- #~ raise
- if self.log is not None:
- self.log.info("%s: Waiting for connection on %s..." % (self.device, self.network_port))
-
- # zeroconfig
- self.publish()
-
- # now we are ready
- self.alive = True
-
- def close(self):
- """Close all resources and unpublish service"""
- if self.log is not None:
- self.log.info("%s: closing..." % (self.device, ))
- self.alive = False
- self.unpublish()
- if self.server_socket:
- self.server_socket.close()
- if self.socket:
- self.handle_disconnect()
- self.serial.close()
- if self.on_close is not None:
- # ensure it is only called once
- callback = self.on_close
- self.on_close = None
- callback(self)
-
- def write(self, data):
- """the write method is used by serial.rfc2217.PortManager. it has to
- write to the network."""
- self.buffer_ser2net += data
-
- def update_select_maps(self, read_map, write_map, error_map):
- """Update dictionaries for select call. insert fd->callback mapping"""
- if self.alive:
- # always handle serial port reads
- read_map[self.serial] = self.handle_serial_read
- error_map[self.serial] = self.handle_serial_error
- # handle serial port writes if buffer is not empty
- if self.buffer_net2ser:
- write_map[self.serial] = self.handle_serial_write
- # handle network
- if self.socket is not None:
- # handle socket if connected
- # only read from network if the internal buffer is not
- # already filled. the TCP flow control will hold back data
- if len(self.buffer_net2ser) < 2048:
- read_map[self.socket] = self.handle_socket_read
- # only check for write readiness when there is data
- if self.buffer_ser2net:
- write_map[self.socket] = self.handle_socket_write
- error_map[self.socket] = self.handle_socket_error
- else:
- # no connection, ensure clear buffer
- self.buffer_ser2net = bytearray()
- # check the server socket
- read_map[self.server_socket] = self.handle_connect
- error_map[self.server_socket] = self.handle_server_error
-
- def handle_serial_read(self):
- """Reading from serial port"""
- try:
- data = os.read(self.serial.fileno(), 1024)
- if data:
- # store data in buffer if there is a client connected
- if self.socket is not None:
- # escape outgoing data when needed (Telnet IAC (0xff) character)
- if self.rfc2217:
- data = serial.to_bytes(self.rfc2217.escape(data))
- self.buffer_ser2net += data
- else:
- self.handle_serial_error()
- except Exception as msg:
- self.handle_serial_error(msg)
-
- def handle_serial_write(self):
- """Writing to serial port"""
- try:
- # write a chunk
- n = os.write(self.serial.fileno(), bytes(self.buffer_net2ser))
- # and see how large that chunk was, remove that from buffer
- self.buffer_net2ser = self.buffer_net2ser[n:]
- except Exception as msg:
- self.handle_serial_error(msg)
-
- def handle_serial_error(self, error=None):
- """Serial port error"""
- # terminate connection
- self.close()
-
- def handle_socket_read(self):
- """Read from socket"""
- try:
- # read a chunk from the serial port
- data = self.socket.recv(1024)
- if data:
- # Process RFC 2217 stuff when enabled
- if self.rfc2217:
- data = serial.to_bytes(self.rfc2217.filter(data))
- # add data to buffer
- self.buffer_net2ser += data
- else:
- # empty read indicates disconnection
- self.handle_disconnect()
- except socket.error:
- self.handle_socket_error()
-
- def handle_socket_write(self):
- """Write to socket"""
- try:
- # write a chunk
- count = self.socket.send(bytes(self.buffer_ser2net))
- # and remove the sent data from the buffer
- self.buffer_ser2net = self.buffer_ser2net[count:]
- except socket.error:
- self.handle_socket_error()
-
- def handle_socket_error(self):
- """Socket connection fails"""
- self.handle_disconnect()
-
- def handle_connect(self):
- """Server socket gets a connection"""
- # accept a connection in any case, close connection
- # below if already busy
- connection, addr = self.server_socket.accept()
- if self.socket is None:
- self.socket = connection
- # More quickly detect bad clients who quit without closing the
- # connection: After 1 second of idle, start sending TCP keep-alive
- # packets every 1 second. If 3 consecutive keep-alive packets
- # fail, assume the client is gone and close the connection.
- self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
- self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 1)
- self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 1)
- self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 3)
- self.socket.setblocking(0)
- self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
- if self.log is not None:
- self.log.warning('%s: Connected by %s:%s' % (self.device, addr[0], addr[1]))
- self.serial.rts = True
- self.serial.dtr = True
- if self.log is not None:
- self.rfc2217 = serial.rfc2217.PortManager(self.serial, self, logger=log.getChild(self.device))
- else:
- self.rfc2217 = serial.rfc2217.PortManager(self.serial, self)
- else:
- # reject connection if there is already one
- connection.close()
- if self.log is not None:
- self.log.warning('%s: Rejecting connect from %s:%s' % (self.device, addr[0], addr[1]))
-
- def handle_server_error(self):
- """Socket server fails"""
- self.close()
-
- def handle_disconnect(self):
- """Socket gets disconnected"""
- # signal disconnected terminal with control lines
- try:
- self.serial.rts = False
- self.serial.dtr = False
- finally:
- # restore original port configuration in case it was changed
- self.serial.apply_settings(self.serial_settings_backup)
- # stop RFC 2217 state machine
- self.rfc2217 = None
- # clear send buffer
- self.buffer_ser2net = bytearray()
- # close network connection
- if self.socket is not None:
- self.socket.close()
- self.socket = None
- if self.log is not None:
- self.log.warning('%s: Disconnected' % self.device)
-
-
-def test():
- service = ZeroconfService(name="TestService", port=3000)
- service.publish()
- raw_input("Press any key to unpublish the service ")
- service.unpublish()
-
-
-if __name__ == '__main__':
- import logging
- import argparse
-
- VERBOSTIY = [
- logging.ERROR, # 0
- logging.WARNING, # 1 (default)
- logging.INFO, # 2
- logging.DEBUG, # 3
- ]
-
- parser = argparse.ArgumentParser(usage="""\
-%(prog)s [options]
-
-Announce the existence of devices using zeroconf and provide
-a TCP/IP <-> serial port gateway (implements RFC 2217).
-
-If running as daemon, write to syslog. Otherwise write to stdout.
-""",
- epilog="""\
-NOTE: no security measures are implemented. Anyone can remotely connect
-to this service over the network.
-
-Only one connection at once, per port, is supported. When the connection is
-terminated, it waits for the next connect.
-""")
-
- group = parser.add_argument_group("serial port settings")
-
- group.add_argument(
- "--ports-regex",
- help="specify a regex to search against the serial devices and their descriptions (default: %(default)s)",
- default='/dev/ttyUSB[0-9]+',
- metavar="REGEX")
-
- group = parser.add_argument_group("network settings")
-
- group.add_argument(
- "--tcp-port",
- dest="base_port",
- help="specify lowest TCP port number (default: %(default)s)",
- default=7000,
- type=int,
- metavar="PORT")
-
- group = parser.add_argument_group("daemon")
-
- group.add_argument(
- "-d", "--daemon",
- dest="daemonize",
- action="store_true",
- help="start as daemon",
- default=False)
-
- group.add_argument(
- "--pidfile",
- help="specify a name for the PID file",
- default=None,
- metavar="FILE")
-
- group = parser.add_argument_group("diagnostics")
-
- group.add_argument(
- "-o", "--logfile",
- help="write messages file instead of stdout",
- default=None,
- metavar="FILE")
-
- group.add_argument(
- "-q", "--quiet",
- dest="verbosity",
- action="store_const",
- const=0,
- help="suppress most diagnostic messages",
- default=1)
-
- group.add_argument(
- "-v", "--verbose",
- dest="verbosity",
- action="count",
- help="increase diagnostic messages")
-
-
- args = parser.parse_args()
-
- # set up logging
- logging.basicConfig(level=VERBOSTIY[min(args.verbosity, len(VERBOSTIY) - 1)])
- log = logging.getLogger('port_publisher')
-
- # redirect output if specified
- if args.logfile is not None:
- class WriteFlushed:
- def __init__(self, fileobj):
- self.fileobj = fileobj
- def write(self, s):
- self.fileobj.write(s)
- self.fileobj.flush()
- def close(self):
- self.fileobj.close()
- sys.stdout = sys.stderr = WriteFlushed(open(args.logfile, 'a'))
- # atexit.register(lambda: sys.stdout.close())
-
- if args.daemonize:
- # if running as daemon is requested, do the fork magic
- # args.quiet = True
- # do the UNIX double-fork magic, see Stevens' "Advanced
- # Programming in the UNIX Environment" for details (ISBN 0201563177)
- try:
- pid = os.fork()
- if pid > 0:
- # exit first parent
- sys.exit(0)
- except OSError as e:
- log.critical("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror))
- sys.exit(1)
-
- # decouple from parent environment
- os.chdir("/") # don't prevent unmounting....
- os.setsid()
- os.umask(0)
-
- # do second fork
- try:
- pid = os.fork()
- if pid > 0:
- # exit from second parent, save eventual PID before
- if args.pidfile is not None:
- open(args.pidfile, 'w').write("%d" % pid)
- sys.exit(0)
- except OSError as e:
- log.critical("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror))
- sys.exit(1)
-
- if args.logfile is None:
- import syslog
- syslog.openlog("serial port publisher")
- # redirect output to syslog
- class WriteToSysLog:
- def __init__(self):
- self.buffer = ''
- def write(self, s):
- self.buffer += s
- if '\n' in self.buffer:
- output, self.buffer = self.buffer.split('\n', 1)
- syslog.syslog(output)
- def flush(self):
- syslog.syslog(self.buffer)
- self.buffer = ''
- def close(self):
- self.flush()
- sys.stdout = sys.stderr = WriteToSysLog()
-
- # ensure the that the daemon runs a normal user, if run as root
- #if os.getuid() == 0:
- # name, passwd, uid, gid, desc, home, shell = pwd.getpwnam('someuser')
- # os.setgid(gid) # set group first
- # os.setuid(uid) # set user
-
- # keep the published stuff in a dictionary
- published = {}
- # get a nice hostname
- hostname = socket.gethostname()
-
- def unpublish(forwarder):
- """when forwarders die, we need to unregister them"""
- try:
- del published[forwarder.device]
- except KeyError:
- pass
- else:
- log.info("unpublish: %s" % (forwarder))
-
- alive = True
- next_check = 0
- # main loop
- while alive:
- try:
- # if it is time, check for serial port devices
- now = time.time()
- if now > next_check:
- next_check = now + 5
- connected = [d for d, p, i in serial.tools.list_ports.grep(args.ports_regex)]
- # Handle devices that are published, but no longer connected
- for device in set(published).difference(connected):
- log.info("unpublish: %s" % (published[device]))
- unpublish(published[device])
- # Handle devices that are connected but not yet published
- for device in set(connected).difference(published):
- # Find the first available port, starting from 7000
- port = args.base_port
- ports_in_use = [f.network_port for f in published.values()]
- while port in ports_in_use:
- port += 1
- published[device] = Forwarder(
- device,
- "%s on %s" % (device, hostname),
- port,
- on_close=unpublish,
- log=log
- )
- log.warning("publish: %s" % (published[device]))
- published[device].open()
-
- # select_start = time.time()
- read_map = {}
- write_map = {}
- error_map = {}
- for publisher in published.values():
- publisher.update_select_maps(read_map, write_map, error_map)
- readers, writers, errors = select.select(
- read_map.keys(),
- write_map.keys(),
- error_map.keys(),
- 5
- )
- # select_end = time.time()
- # print "select used %.3f s" % (select_end - select_start)
- for reader in readers:
- read_map[reader]()
- for writer in writers:
- write_map[writer]()
- for error in errors:
- error_map[error]()
- # print "operation used %.3f s" % (time.time() - select_end)
- except KeyboardInterrupt:
- alive = False
- sys.stdout.write('\n')
- except SystemExit:
- raise
- except:
- #~ raise
- traceback.print_exc()
+#! /usr/bin/env python +# +# (C) 2001-2015 Chris Liechti <cliechti@gmx.net> +# +# SPDX-License-Identifier: BSD-3-Clause +"""\ +Multi-port serial<->TCP/IP forwarder. +- RFC 2217 +- check existence of serial port periodically +- start/stop forwarders +- each forwarder creates a server socket and opens the serial port +- serial ports are opened only once. network connect/disconnect + does not influence serial port +- only one client per connection +""" +import os +import select +import socket +import sys +import time +import traceback + +import serial +import serial.rfc2217 +import serial.tools.list_ports + +import dbus + +# Try to import the avahi service definitions properly. If the avahi module is +# not available, fall back to a hard-coded solution that hopefully still works. +try: + import avahi +except ImportError: + class avahi: + DBUS_NAME = "org.freedesktop.Avahi" + DBUS_PATH_SERVER = "/" + DBUS_INTERFACE_SERVER = "org.freedesktop.Avahi.Server" + DBUS_INTERFACE_ENTRY_GROUP = DBUS_NAME + ".EntryGroup" + IF_UNSPEC = -1 + PROTO_UNSPEC, PROTO_INET, PROTO_INET6 = -1, 0, 1 + + +class ZeroconfService: + """\ + A simple class to publish a network service with zeroconf using avahi. + """ + + def __init__(self, name, port, stype="_http._tcp", + domain="", host="", text=""): + self.name = name + self.stype = stype + self.domain = domain + self.host = host + self.port = port + self.text = text + self.group = None + + def publish(self): + bus = dbus.SystemBus() + server = dbus.Interface( + bus.get_object( + avahi.DBUS_NAME, + avahi.DBUS_PATH_SERVER + ), + avahi.DBUS_INTERFACE_SERVER + ) + + g = dbus.Interface( + bus.get_object( + avahi.DBUS_NAME, + server.EntryGroupNew() + ), + avahi.DBUS_INTERFACE_ENTRY_GROUP + ) + + g.AddService(avahi.IF_UNSPEC, avahi.PROTO_UNSPEC, dbus.UInt32(0), + self.name, self.stype, self.domain, self.host, + dbus.UInt16(self.port), self.text) + + g.Commit() + self.group = g + + def unpublish(self): + if self.group is not None: + self.group.Reset() + self.group = None + + def __str__(self): + return "%r @ %s:%s (%s)" % (self.name, self.host, self.port, self.stype) + + +class Forwarder(ZeroconfService): + """\ + Single port serial<->TCP/IP forarder that depends on an external select + loop. + - Buffers for serial -> network and network -> serial + - RFC 2217 state + - Zeroconf publish/unpublish on open/close. + """ + + def __init__(self, device, name, network_port, on_close=None, log=None): + ZeroconfService.__init__(self, name, network_port, stype='_serial_port._tcp') + self.alive = False + self.network_port = network_port + self.on_close = on_close + self.log = log + self.device = device + self.serial = serial.Serial() + self.serial.port = device + self.serial.baudrate = 115200 + self.serial.timeout = 0 + self.socket = None + self.server_socket = None + self.rfc2217 = None # instantiate later, when connecting + + def __del__(self): + try: + if self.alive: + self.close() + except: + pass # XXX errors on shutdown + + def open(self): + """open serial port, start network server and publish service""" + self.buffer_net2ser = bytearray() + self.buffer_ser2net = bytearray() + + # open serial port + try: + self.serial.rts = False + self.serial.open() + except Exception as msg: + self.handle_serial_error(msg) + + self.serial_settings_backup = self.serial.get_settings() + + # start the socket server + # XXX add IPv6 support: use getaddrinfo for socket options, bind to multiple sockets? + # info_list = socket.getaddrinfo(None, port, 0, socket.SOCK_STREAM, 0, socket.AI_PASSIVE) + self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.server_socket.setsockopt( + socket.SOL_SOCKET, + socket.SO_REUSEADDR, + self.server_socket.getsockopt( + socket.SOL_SOCKET, + socket.SO_REUSEADDR + ) | 1 + ) + self.server_socket.setblocking(0) + try: + self.server_socket.bind(('', self.network_port)) + self.server_socket.listen(1) + except socket.error as msg: + self.handle_server_error() + #~ raise + if self.log is not None: + self.log.info("%s: Waiting for connection on %s..." % (self.device, self.network_port)) + + # zeroconfig + self.publish() + + # now we are ready + self.alive = True + + def close(self): + """Close all resources and unpublish service""" + if self.log is not None: + self.log.info("%s: closing..." % (self.device, )) + self.alive = False + self.unpublish() + if self.server_socket: + self.server_socket.close() + if self.socket: + self.handle_disconnect() + self.serial.close() + if self.on_close is not None: + # ensure it is only called once + callback = self.on_close + self.on_close = None + callback(self) + + def write(self, data): + """the write method is used by serial.rfc2217.PortManager. it has to + write to the network.""" + self.buffer_ser2net += data + + def update_select_maps(self, read_map, write_map, error_map): + """Update dictionaries for select call. insert fd->callback mapping""" + if self.alive: + # always handle serial port reads + read_map[self.serial] = self.handle_serial_read + error_map[self.serial] = self.handle_serial_error + # handle serial port writes if buffer is not empty + if self.buffer_net2ser: + write_map[self.serial] = self.handle_serial_write + # handle network + if self.socket is not None: + # handle socket if connected + # only read from network if the internal buffer is not + # already filled. the TCP flow control will hold back data + if len(self.buffer_net2ser) < 2048: + read_map[self.socket] = self.handle_socket_read + # only check for write readiness when there is data + if self.buffer_ser2net: + write_map[self.socket] = self.handle_socket_write + error_map[self.socket] = self.handle_socket_error + else: + # no connection, ensure clear buffer + self.buffer_ser2net = bytearray() + # check the server socket + read_map[self.server_socket] = self.handle_connect + error_map[self.server_socket] = self.handle_server_error + + def handle_serial_read(self): + """Reading from serial port""" + try: + data = os.read(self.serial.fileno(), 1024) + if data: + # store data in buffer if there is a client connected + if self.socket is not None: + # escape outgoing data when needed (Telnet IAC (0xff) character) + if self.rfc2217: + data = serial.to_bytes(self.rfc2217.escape(data)) + self.buffer_ser2net += data + else: + self.handle_serial_error() + except Exception as msg: + self.handle_serial_error(msg) + + def handle_serial_write(self): + """Writing to serial port""" + try: + # write a chunk + n = os.write(self.serial.fileno(), bytes(self.buffer_net2ser)) + # and see how large that chunk was, remove that from buffer + self.buffer_net2ser = self.buffer_net2ser[n:] + except Exception as msg: + self.handle_serial_error(msg) + + def handle_serial_error(self, error=None): + """Serial port error""" + # terminate connection + self.close() + + def handle_socket_read(self): + """Read from socket""" + try: + # read a chunk from the serial port + data = self.socket.recv(1024) + if data: + # Process RFC 2217 stuff when enabled + if self.rfc2217: + data = serial.to_bytes(self.rfc2217.filter(data)) + # add data to buffer + self.buffer_net2ser += data + else: + # empty read indicates disconnection + self.handle_disconnect() + except socket.error: + self.handle_socket_error() + + def handle_socket_write(self): + """Write to socket""" + try: + # write a chunk + count = self.socket.send(bytes(self.buffer_ser2net)) + # and remove the sent data from the buffer + self.buffer_ser2net = self.buffer_ser2net[count:] + except socket.error: + self.handle_socket_error() + + def handle_socket_error(self): + """Socket connection fails""" + self.handle_disconnect() + + def handle_connect(self): + """Server socket gets a connection""" + # accept a connection in any case, close connection + # below if already busy + connection, addr = self.server_socket.accept() + if self.socket is None: + self.socket = connection + # More quickly detect bad clients who quit without closing the + # connection: After 1 second of idle, start sending TCP keep-alive + # packets every 1 second. If 3 consecutive keep-alive packets + # fail, assume the client is gone and close the connection. + self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) + self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 1) + self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 1) + self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 3) + self.socket.setblocking(0) + self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + if self.log is not None: + self.log.warning('%s: Connected by %s:%s' % (self.device, addr[0], addr[1])) + self.serial.rts = True + self.serial.dtr = True + if self.log is not None: + self.rfc2217 = serial.rfc2217.PortManager(self.serial, self, logger=log.getChild(self.device)) + else: + self.rfc2217 = serial.rfc2217.PortManager(self.serial, self) + else: + # reject connection if there is already one + connection.close() + if self.log is not None: + self.log.warning('%s: Rejecting connect from %s:%s' % (self.device, addr[0], addr[1])) + + def handle_server_error(self): + """Socket server fails""" + self.close() + + def handle_disconnect(self): + """Socket gets disconnected""" + # signal disconnected terminal with control lines + try: + self.serial.rts = False + self.serial.dtr = False + finally: + # restore original port configuration in case it was changed + self.serial.apply_settings(self.serial_settings_backup) + # stop RFC 2217 state machine + self.rfc2217 = None + # clear send buffer + self.buffer_ser2net = bytearray() + # close network connection + if self.socket is not None: + self.socket.close() + self.socket = None + if self.log is not None: + self.log.warning('%s: Disconnected' % self.device) + + +def test(): + service = ZeroconfService(name="TestService", port=3000) + service.publish() + raw_input("Press any key to unpublish the service ") + service.unpublish() + + +if __name__ == '__main__': + import logging + import argparse + + VERBOSTIY = [ + logging.ERROR, # 0 + logging.WARNING, # 1 (default) + logging.INFO, # 2 + logging.DEBUG, # 3 + ] + + parser = argparse.ArgumentParser(usage="""\ +%(prog)s [options] + +Announce the existence of devices using zeroconf and provide +a TCP/IP <-> serial port gateway (implements RFC 2217). + +If running as daemon, write to syslog. Otherwise write to stdout. +""", + epilog="""\ +NOTE: no security measures are implemented. Anyone can remotely connect +to this service over the network. + +Only one connection at once, per port, is supported. When the connection is +terminated, it waits for the next connect. +""") + + group = parser.add_argument_group("serial port settings") + + group.add_argument( + "--ports-regex", + help="specify a regex to search against the serial devices and their descriptions (default: %(default)s)", + default='/dev/ttyUSB[0-9]+', + metavar="REGEX") + + group = parser.add_argument_group("network settings") + + group.add_argument( + "--tcp-port", + dest="base_port", + help="specify lowest TCP port number (default: %(default)s)", + default=7000, + type=int, + metavar="PORT") + + group = parser.add_argument_group("daemon") + + group.add_argument( + "-d", "--daemon", + dest="daemonize", + action="store_true", + help="start as daemon", + default=False) + + group.add_argument( + "--pidfile", + help="specify a name for the PID file", + default=None, + metavar="FILE") + + group = parser.add_argument_group("diagnostics") + + group.add_argument( + "-o", "--logfile", + help="write messages file instead of stdout", + default=None, + metavar="FILE") + + group.add_argument( + "-q", "--quiet", + dest="verbosity", + action="store_const", + const=0, + help="suppress most diagnostic messages", + default=1) + + group.add_argument( + "-v", "--verbose", + dest="verbosity", + action="count", + help="increase diagnostic messages") + + + args = parser.parse_args() + + # set up logging + logging.basicConfig(level=VERBOSTIY[min(args.verbosity, len(VERBOSTIY) - 1)]) + log = logging.getLogger('port_publisher') + + # redirect output if specified + if args.logfile is not None: + class WriteFlushed: + def __init__(self, fileobj): + self.fileobj = fileobj + def write(self, s): + self.fileobj.write(s) + self.fileobj.flush() + def close(self): + self.fileobj.close() + sys.stdout = sys.stderr = WriteFlushed(open(args.logfile, 'a')) + # atexit.register(lambda: sys.stdout.close()) + + if args.daemonize: + # if running as daemon is requested, do the fork magic + # args.quiet = True + # do the UNIX double-fork magic, see Stevens' "Advanced + # Programming in the UNIX Environment" for details (ISBN 0201563177) + try: + pid = os.fork() + if pid > 0: + # exit first parent + sys.exit(0) + except OSError as e: + log.critical("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror)) + sys.exit(1) + + # decouple from parent environment + os.chdir("/") # don't prevent unmounting.... + os.setsid() + os.umask(0) + + # do second fork + try: + pid = os.fork() + if pid > 0: + # exit from second parent, save eventual PID before + if args.pidfile is not None: + open(args.pidfile, 'w').write("%d" % pid) + sys.exit(0) + except OSError as e: + log.critical("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror)) + sys.exit(1) + + if args.logfile is None: + import syslog + syslog.openlog("serial port publisher") + # redirect output to syslog + class WriteToSysLog: + def __init__(self): + self.buffer = '' + def write(self, s): + self.buffer += s + if '\n' in self.buffer: + output, self.buffer = self.buffer.split('\n', 1) + syslog.syslog(output) + def flush(self): + syslog.syslog(self.buffer) + self.buffer = '' + def close(self): + self.flush() + sys.stdout = sys.stderr = WriteToSysLog() + + # ensure the that the daemon runs a normal user, if run as root + #if os.getuid() == 0: + # name, passwd, uid, gid, desc, home, shell = pwd.getpwnam('someuser') + # os.setgid(gid) # set group first + # os.setuid(uid) # set user + + # keep the published stuff in a dictionary + published = {} + # get a nice hostname + hostname = socket.gethostname() + + def unpublish(forwarder): + """when forwarders die, we need to unregister them""" + try: + del published[forwarder.device] + except KeyError: + pass + else: + log.info("unpublish: %s" % (forwarder)) + + alive = True + next_check = 0 + # main loop + while alive: + try: + # if it is time, check for serial port devices + now = time.time() + if now > next_check: + next_check = now + 5 + connected = [d for d, p, i in serial.tools.list_ports.grep(args.ports_regex)] + # Handle devices that are published, but no longer connected + for device in set(published).difference(connected): + log.info("unpublish: %s" % (published[device])) + unpublish(published[device]) + # Handle devices that are connected but not yet published + for device in set(connected).difference(published): + # Find the first available port, starting from 7000 + port = args.base_port + ports_in_use = [f.network_port for f in published.values()] + while port in ports_in_use: + port += 1 + published[device] = Forwarder( + device, + "%s on %s" % (device, hostname), + port, + on_close=unpublish, + log=log + ) + log.warning("publish: %s" % (published[device])) + published[device].open() + + # select_start = time.time() + read_map = {} + write_map = {} + error_map = {} + for publisher in published.values(): + publisher.update_select_maps(read_map, write_map, error_map) + readers, writers, errors = select.select( + read_map.keys(), + write_map.keys(), + error_map.keys(), + 5 + ) + # select_end = time.time() + # print "select used %.3f s" % (select_end - select_start) + for reader in readers: + read_map[reader]() + for writer in writers: + write_map[writer]() + for error in errors: + error_map[error]() + # print "operation used %.3f s" % (time.time() - select_end) + except KeyboardInterrupt: + alive = False + sys.stdout.write('\n') + except SystemExit: + raise + except: + #~ raise + traceback.print_exc() diff --git a/serial/tools/list_ports_linux.py b/serial/tools/list_ports_linux.py index f0d570a..4d1d432 100644 --- a/serial/tools/list_ports_linux.py +++ b/serial/tools/list_ports_linux.py @@ -1,156 +1,156 @@ -#!/usr/bin/env python
-#
-# portable serial port access with python
-#
-# This is a module that gathers a list of serial ports including details on
-# GNU/Linux systems.
-# The comports function is expected to return an iterable that yields tuples of
-# 3 strings: port name, human readable description and a hardware ID.
-#
-# (C) 2011-2015 Chris Liechti <cliechti@gmx.net>
-#
-# SPDX-License-Identifier: BSD-3-Clause
-
-import glob
-import re
-import os
-
-
-def numsplit(text):
- """\
- Convert string into a list of texts and numbers in order to support a
- natural sorting.
- """
- result = []
- for group in re.split(r'(\d+)', text):
- if group:
- try:
- group = int(group)
- except ValueError:
- pass
- result.append(group)
- return result
-
-
-class SysFS(object):
- """Wrapper for easy sysfs access and device info"""
-
- def __init__(self, dev_path):
- self.dev = dev_path
- self.name = os.path.basename(self.dev)
- self.subsystem = None
- self.device_path = None
- self.usb_device_path = None
- if os.path.exists('/sys/class/tty/%s/device' % (self.name,)):
- self.device_path = os.path.realpath('/sys/class/tty/%s/device' % (self.name,))
- self.subsystem = os.path.basename(os.path.realpath(os.path.join(self.device_path, 'subsystem')))
- if self.subsystem == 'usb-serial':
- self.usb_device_path = os.path.dirname(os.path.dirname(self.device_path))
- elif self.subsystem == 'usb':
- self.usb_device_path = os.path.dirname(self.device_path)
- else:
- self.usb_device_path = None
- #~ print repr(self.__dict__)
- if self.usb_device_path is not None:
- self.vid = self.read_line(self.usb_device_path, 'idVendor').upper()
- self.pid = self.read_line(self.usb_device_path, 'idProduct').upper()
- self.serial = self.read_line(self.usb_device_path, 'serial')
- self.location = os.path.basename(self.usb_device_path)
- else:
- self.vid = None
- self.pid = None
- self.serial = None
- self.location = None
-
- def read_line(self, *args):
- """\
- Helper function to read a single line from a file.
- One or more parameters are allowed, they are joined with os.path.join.
- Returns None on errors..
- """
- try:
- with open(os.path.join(*args)) as f:
- line = f.readline().strip()
- return line
- except IOError:
- return None
-
- def describe(self):
- if self.subsystem == 'usb-serial':
- return self.read_line(self.usb_device_path, 'product')
- elif self.subsystem == 'usb': # CDC/ACM devices
- interface = self.read_line(self.device_path, 'interface')
- if interface is not None:
- return interface
- else:
- return self.read_line(self.usb_device_path, 'product')
- elif self.subsystem == 'pnp': # PCI based devices
- return self.name
- else:
- return 'n/a'
-
- def describe_full(self):
- """Get a human readable string"""
- if self.subsystem == 'usb-serial':
- return '{} - {}'.format(
- self.read_line(self.usb_device_path, 'manufacturer'),
- self.read_line(self.usb_device_path, 'product'),
- )
- elif self.subsystem == 'usb': # CDC/ACM devices
- interface = self.read_line(self.device_path, 'interface')
- return '{} - {}{}'.format(
- self.read_line(self.usb_device_path, 'manufacturer'),
- self.read_line(self.usb_device_path, 'product'),
- ' - {}'.format(interface) if interface is not None else '',
- )
- elif self.subsystem == 'pnp': # PCI based devices
- return self.name
- else:
- return 'n/a'
-
- def hwinfo(self):
- """Get a hardware description string"""
- if self.subsystem in ('usb', 'usb-serial'):
- return 'USB VID:PID={}:{}{}{}'.format(
- self.vid,
- self.pid,
- ' SER={}'.format(self.serial) if self.serial is not None else '',
- ' LOCATION={}'.format(self.location) if self.location is not None else '',
- )
- elif self.subsystem == 'pnp': # PCI based devices
- return self.read_line(self.device_path, 'id')
- else:
- return 'n/a'
-
- def __eq__(self, other):
- return self.dev == other.dev
-
- def __lt__(self, other):
- return numsplit(self.dev) < numsplit(other.dev)
-
- def __getitem__(self, index):
- """Item access: backwards compatible -> (port, desc, hwid)"""
- if index == 0:
- return self.dev
- elif index == 1:
- return self.describe()
- elif index == 2:
- return self.hwinfo()
- else:
- raise IndexError('{} > 2'.format(index))
-
-
-def comports():
- devices = glob.glob('/dev/ttyS*') # built-in serial ports
- devices.extend(glob.glob('/dev/ttyUSB*')) # usb-serial with own driver
- devices.extend(glob.glob('/dev/ttyACM*')) # usb-serial with CDC-ACM profile
- devices.extend(glob.glob('/dev/rfcomm*')) # BT serial devices
- return [info
- for info in [SysFS(d) for d in devices]
- if info.subsystem != "platform"] # hide non-present internal serial ports
-
-# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-# test
-if __name__ == '__main__':
- for port, desc, hwid in sorted(comports()):
- print("%s: %s [%s]" % (port, desc, hwid))
+#!/usr/bin/env python +# +# portable serial port access with python +# +# This is a module that gathers a list of serial ports including details on +# GNU/Linux systems. +# The comports function is expected to return an iterable that yields tuples of +# 3 strings: port name, human readable description and a hardware ID. +# +# (C) 2011-2015 Chris Liechti <cliechti@gmx.net> +# +# SPDX-License-Identifier: BSD-3-Clause + +import glob +import re +import os + + +def numsplit(text): + """\ + Convert string into a list of texts and numbers in order to support a + natural sorting. + """ + result = [] + for group in re.split(r'(\d+)', text): + if group: + try: + group = int(group) + except ValueError: + pass + result.append(group) + return result + + +class SysFS(object): + """Wrapper for easy sysfs access and device info""" + + def __init__(self, dev_path): + self.dev = dev_path + self.name = os.path.basename(self.dev) + self.subsystem = None + self.device_path = None + self.usb_device_path = None + if os.path.exists('/sys/class/tty/%s/device' % (self.name,)): + self.device_path = os.path.realpath('/sys/class/tty/%s/device' % (self.name,)) + self.subsystem = os.path.basename(os.path.realpath(os.path.join(self.device_path, 'subsystem'))) + if self.subsystem == 'usb-serial': + self.usb_device_path = os.path.dirname(os.path.dirname(self.device_path)) + elif self.subsystem == 'usb': + self.usb_device_path = os.path.dirname(self.device_path) + else: + self.usb_device_path = None + #~ print repr(self.__dict__) + if self.usb_device_path is not None: + self.vid = self.read_line(self.usb_device_path, 'idVendor').upper() + self.pid = self.read_line(self.usb_device_path, 'idProduct').upper() + self.serial = self.read_line(self.usb_device_path, 'serial') + self.location = os.path.basename(self.usb_device_path) + else: + self.vid = None + self.pid = None + self.serial = None + self.location = None + + def read_line(self, *args): + """\ + Helper function to read a single line from a file. + One or more parameters are allowed, they are joined with os.path.join. + Returns None on errors.. + """ + try: + with open(os.path.join(*args)) as f: + line = f.readline().strip() + return line + except IOError: + return None + + def describe(self): + if self.subsystem == 'usb-serial': + return self.read_line(self.usb_device_path, 'product') + elif self.subsystem == 'usb': # CDC/ACM devices + interface = self.read_line(self.device_path, 'interface') + if interface is not None: + return interface + else: + return self.read_line(self.usb_device_path, 'product') + elif self.subsystem == 'pnp': # PCI based devices + return self.name + else: + return 'n/a' + + def describe_full(self): + """Get a human readable string""" + if self.subsystem == 'usb-serial': + return '{} - {}'.format( + self.read_line(self.usb_device_path, 'manufacturer'), + self.read_line(self.usb_device_path, 'product'), + ) + elif self.subsystem == 'usb': # CDC/ACM devices + interface = self.read_line(self.device_path, 'interface') + return '{} - {}{}'.format( + self.read_line(self.usb_device_path, 'manufacturer'), + self.read_line(self.usb_device_path, 'product'), + ' - {}'.format(interface) if interface is not None else '', + ) + elif self.subsystem == 'pnp': # PCI based devices + return self.name + else: + return 'n/a' + + def hwinfo(self): + """Get a hardware description string""" + if self.subsystem in ('usb', 'usb-serial'): + return 'USB VID:PID={}:{}{}{}'.format( + self.vid, + self.pid, + ' SER={}'.format(self.serial) if self.serial is not None else '', + ' LOCATION={}'.format(self.location) if self.location is not None else '', + ) + elif self.subsystem == 'pnp': # PCI based devices + return self.read_line(self.device_path, 'id') + else: + return 'n/a' + + def __eq__(self, other): + return self.dev == other.dev + + def __lt__(self, other): + return numsplit(self.dev) < numsplit(other.dev) + + def __getitem__(self, index): + """Item access: backwards compatible -> (port, desc, hwid)""" + if index == 0: + return self.dev + elif index == 1: + return self.describe() + elif index == 2: + return self.hwinfo() + else: + raise IndexError('{} > 2'.format(index)) + + +def comports(): + devices = glob.glob('/dev/ttyS*') # built-in serial ports + devices.extend(glob.glob('/dev/ttyUSB*')) # usb-serial with own driver + devices.extend(glob.glob('/dev/ttyACM*')) # usb-serial with CDC-ACM profile + devices.extend(glob.glob('/dev/rfcomm*')) # BT serial devices + return [info + for info in [SysFS(d) for d in devices] + if info.subsystem != "platform"] # hide non-present internal serial ports + +# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +# test +if __name__ == '__main__': + for port, desc, hwid in sorted(comports()): + print("%s: %s [%s]" % (port, desc, hwid)) diff --git a/serial/tools/list_ports_posix.py b/serial/tools/list_ports_posix.py index 9b467c4..0495971 100644 --- a/serial/tools/list_ports_posix.py +++ b/serial/tools/list_ports_posix.py @@ -1,106 +1,106 @@ -#!/usr/bin/env python
-
-# portable serial port access with python
-
-# This is a module that gathers a list of serial ports on POSIXy systems.
-# For some specific implementations, see also list_ports_linux, list_ports_osx
-#
-# this is a wrapper module for different platform implementations of the
-# port enumeration feature
-#
-# (C) 2011-2015 Chris Liechti <cliechti@gmx.net>
-#
-# SPDX-License-Identifier: BSD-3-Clause
-
-"""\
-The ``comports`` function is expected to return an iterable that yields tuples
-of 3 strings: port name, human readable description and a hardware ID.
-
-As currently no method is known to get the second two strings easily, they are
-currently just identical to the port name.
-"""
-
-import glob
-import sys
-import os
-
-# try to detect the OS so that a device can be selected...
-plat = sys.platform.lower()
-
-if plat[:5] == 'linux': # Linux (confirmed)
- from serial.tools.list_ports_linux import comports
-
-elif plat == 'cygwin': # cygwin/win32
- # cygwin accepts /dev/com* in many contexts
- # (such as 'open' call, explicit 'ls'), but 'glob.glob'
- # and bare 'ls' do not; so use /dev/ttyS* instead
- def comports():
- devices = glob.glob('/dev/ttyS*')
- return [(d, d, d) for d in devices]
-
-elif plat[:7] == 'openbsd': # OpenBSD
- def comports():
- devices = glob.glob('/dev/cua*')
- return [(d, d, d) for d in devices]
-
-elif plat[:3] == 'bsd' or \
- plat[:7] == 'freebsd':
-
- def comports():
- devices = glob.glob('/dev/cua*[!.init][!.lock]')
- return [(d, d, d) for d in devices]
-
-elif plat[:6] == 'darwin': # OS X (confirmed)
- from serial.tools.list_ports_osx import comports
-
-elif plat[:6] == 'netbsd': # NetBSD
- def comports():
- """scan for available ports. return a list of device names."""
- devices = glob.glob('/dev/dty*')
- return [(d, d, d) for d in devices]
-
-elif plat[:4] == 'irix': # IRIX
- def comports():
- """scan for available ports. return a list of device names."""
- devices = glob.glob('/dev/ttyf*')
- return [(d, d, d) for d in devices]
-
-elif plat[:2] == 'hp': # HP-UX (not tested)
- def comports():
- """scan for available ports. return a list of device names."""
- devices = glob.glob('/dev/tty*p0')
- return [(d, d, d) for d in devices]
-
-elif plat[:5] == 'sunos': # Solaris/SunOS
- def comports():
- """scan for available ports. return a list of device names."""
- devices = glob.glob('/dev/tty*c')
- return [(d, d, d) for d in devices]
-
-elif plat[:3] == 'aix': # AIX
- def comports():
- """scan for available ports. return a list of device names."""
- devices = glob.glob('/dev/tty*')
- return [(d, d, d) for d in devices]
-
-else:
- # platform detection has failed...
- import serial
- sys.stderr.write("""\
-don't know how to enumerate ttys on this system.
-! I you know how the serial ports are named send this information to
-! the author of this module:
-
-sys.platform = %r
-os.name = %r
-pySerial version = %s
-
-also add the naming scheme of the serial ports and with a bit luck you can get
-this module running...
-""" % (sys.platform, os.name, serial.VERSION))
- raise ImportError("Sorry: no implementation for your platform ('%s') available" % (os.name,))
-
-# test
-if __name__ == '__main__':
- for port, desc, hwid in sorted(comports()):
- print("%s: %s [%s]" % (port, desc, hwid))
+#!/usr/bin/env python + +# portable serial port access with python + +# This is a module that gathers a list of serial ports on POSIXy systems. +# For some specific implementations, see also list_ports_linux, list_ports_osx +# +# this is a wrapper module for different platform implementations of the +# port enumeration feature +# +# (C) 2011-2015 Chris Liechti <cliechti@gmx.net> +# +# SPDX-License-Identifier: BSD-3-Clause + +"""\ +The ``comports`` function is expected to return an iterable that yields tuples +of 3 strings: port name, human readable description and a hardware ID. + +As currently no method is known to get the second two strings easily, they are +currently just identical to the port name. +""" + +import glob +import sys +import os + +# try to detect the OS so that a device can be selected... +plat = sys.platform.lower() + +if plat[:5] == 'linux': # Linux (confirmed) + from serial.tools.list_ports_linux import comports + +elif plat == 'cygwin': # cygwin/win32 + # cygwin accepts /dev/com* in many contexts + # (such as 'open' call, explicit 'ls'), but 'glob.glob' + # and bare 'ls' do not; so use /dev/ttyS* instead + def comports(): + devices = glob.glob('/dev/ttyS*') + return [(d, d, d) for d in devices] + +elif plat[:7] == 'openbsd': # OpenBSD + def comports(): + devices = glob.glob('/dev/cua*') + return [(d, d, d) for d in devices] + +elif plat[:3] == 'bsd' or \ + plat[:7] == 'freebsd': + + def comports(): + devices = glob.glob('/dev/cua*[!.init][!.lock]') + return [(d, d, d) for d in devices] + +elif plat[:6] == 'darwin': # OS X (confirmed) + from serial.tools.list_ports_osx import comports + +elif plat[:6] == 'netbsd': # NetBSD + def comports(): + """scan for available ports. return a list of device names.""" + devices = glob.glob('/dev/dty*') + return [(d, d, d) for d in devices] + +elif plat[:4] == 'irix': # IRIX + def comports(): + """scan for available ports. return a list of device names.""" + devices = glob.glob('/dev/ttyf*') + return [(d, d, d) for d in devices] + +elif plat[:2] == 'hp': # HP-UX (not tested) + def comports(): + """scan for available ports. return a list of device names.""" + devices = glob.glob('/dev/tty*p0') + return [(d, d, d) for d in devices] + +elif plat[:5] == 'sunos': # Solaris/SunOS + def comports(): + """scan for available ports. return a list of device names.""" + devices = glob.glob('/dev/tty*c') + return [(d, d, d) for d in devices] + +elif plat[:3] == 'aix': # AIX + def comports(): + """scan for available ports. return a list of device names.""" + devices = glob.glob('/dev/tty*') + return [(d, d, d) for d in devices] + +else: + # platform detection has failed... + import serial + sys.stderr.write("""\ +don't know how to enumerate ttys on this system. +! I you know how the serial ports are named send this information to +! the author of this module: + +sys.platform = %r +os.name = %r +pySerial version = %s + +also add the naming scheme of the serial ports and with a bit luck you can get +this module running... +""" % (sys.platform, os.name, serial.VERSION)) + raise ImportError("Sorry: no implementation for your platform ('%s') available" % (os.name,)) + +# test +if __name__ == '__main__': + for port, desc, hwid in sorted(comports()): + print("%s: %s [%s]" % (port, desc, hwid)) diff --git a/serial/tools/list_ports_windows.py b/serial/tools/list_ports_windows.py index e39b086..837ff57 100644 --- a/serial/tools/list_ports_windows.py +++ b/serial/tools/list_ports_windows.py @@ -1,344 +1,344 @@ -#! python
-#
-# Enumerate serial ports on Windows including a human readable description
-# and hardware information.
-#
-# (C) 2001-2015 Chris Liechti <cliechti@gmx.net>
-#
-# SPDX-License-Identifier: BSD-3-Clause
-
-import re
-import ctypes
-from ctypes.wintypes import BOOL
-from ctypes.wintypes import HWND
-from ctypes.wintypes import DWORD
-from ctypes.wintypes import WORD
-from ctypes.wintypes import LONG
-from ctypes.wintypes import ULONG
-from ctypes.wintypes import LPCSTR
-from ctypes.wintypes import HKEY
-from ctypes.wintypes import BYTE
-import serial
-from serial.win32 import ULONG_PTR
-
-
-def numsplit(text):
- """\
- Convert string into a list of texts and numbers in order to support a
- natural sorting.
- """
- result = []
- for group in re.split(r'(\d+)', text):
- if group:
- try:
- group = int(group)
- except ValueError:
- pass
- result.append(group)
- return result
-
-
-def ValidHandle(value, func, arguments):
- if value == 0:
- raise ctypes.WinError()
- return value
-
-NULL = 0
-HDEVINFO = ctypes.c_void_p
-PCTSTR = ctypes.c_char_p
-PTSTR = ctypes.c_void_p
-CHAR = ctypes.c_char
-LPDWORD = PDWORD = ctypes.POINTER(DWORD)
-#~ LPBYTE = PBYTE = ctypes.POINTER(BYTE)
-LPBYTE = PBYTE = ctypes.c_void_p # XXX avoids error about types
-
-ACCESS_MASK = DWORD
-REGSAM = ACCESS_MASK
-
-
-def byte_buffer(length):
- """Get a buffer for a string"""
- return (BYTE*length)()
-
-
-def string(buffer):
- s = []
- for c in buffer:
- if c == 0:
- break
- s.append(chr(c & 0xff)) # "& 0xff": hack to convert signed to unsigned
- return ''.join(s)
-
-
-class GUID(ctypes.Structure):
- _fields_ = [
- ('Data1', DWORD),
- ('Data2', WORD),
- ('Data3', WORD),
- ('Data4', BYTE*8),
- ]
-
- def __str__(self):
- return "{%08x-%04x-%04x-%s-%s}" % (
- self.Data1,
- self.Data2,
- self.Data3,
- ''.join(["%02x" % d for d in self.Data4[:2]]),
- ''.join(["%02x" % d for d in self.Data4[2:]]),
- )
-
-
-class SP_DEVINFO_DATA(ctypes.Structure):
- _fields_ = [
- ('cbSize', DWORD),
- ('ClassGuid', GUID),
- ('DevInst', DWORD),
- ('Reserved', ULONG_PTR),
- ]
-
- def __str__(self):
- return "ClassGuid:%s DevInst:%s" % (self.ClassGuid, self.DevInst)
-
-PSP_DEVINFO_DATA = ctypes.POINTER(SP_DEVINFO_DATA)
-
-PSP_DEVICE_INTERFACE_DETAIL_DATA = ctypes.c_void_p
-
-setupapi = ctypes.windll.LoadLibrary("setupapi")
-SetupDiDestroyDeviceInfoList = setupapi.SetupDiDestroyDeviceInfoList
-SetupDiDestroyDeviceInfoList.argtypes = [HDEVINFO]
-SetupDiDestroyDeviceInfoList.restype = BOOL
-
-SetupDiClassGuidsFromName = setupapi.SetupDiClassGuidsFromNameA
-SetupDiClassGuidsFromName.argtypes = [PCTSTR, ctypes.POINTER(GUID), DWORD, PDWORD]
-SetupDiClassGuidsFromName.restype = BOOL
-
-SetupDiEnumDeviceInfo = setupapi.SetupDiEnumDeviceInfo
-SetupDiEnumDeviceInfo.argtypes = [HDEVINFO, DWORD, PSP_DEVINFO_DATA]
-SetupDiEnumDeviceInfo.restype = BOOL
-
-SetupDiGetClassDevs = setupapi.SetupDiGetClassDevsA
-SetupDiGetClassDevs.argtypes = [ctypes.POINTER(GUID), PCTSTR, HWND, DWORD]
-SetupDiGetClassDevs.restype = HDEVINFO
-SetupDiGetClassDevs.errcheck = ValidHandle
-
-SetupDiGetDeviceRegistryProperty = setupapi.SetupDiGetDeviceRegistryPropertyA
-SetupDiGetDeviceRegistryProperty.argtypes = [HDEVINFO, PSP_DEVINFO_DATA, DWORD, PDWORD, PBYTE, DWORD, PDWORD]
-SetupDiGetDeviceRegistryProperty.restype = BOOL
-
-SetupDiGetDeviceInstanceId = setupapi.SetupDiGetDeviceInstanceIdA
-SetupDiGetDeviceInstanceId.argtypes = [HDEVINFO, PSP_DEVINFO_DATA, PTSTR, DWORD, PDWORD]
-SetupDiGetDeviceInstanceId.restype = BOOL
-
-SetupDiOpenDevRegKey = setupapi.SetupDiOpenDevRegKey
-SetupDiOpenDevRegKey.argtypes = [HDEVINFO, PSP_DEVINFO_DATA, DWORD, DWORD, DWORD, REGSAM]
-SetupDiOpenDevRegKey.restype = HKEY
-
-advapi32 = ctypes.windll.LoadLibrary("Advapi32")
-RegCloseKey = advapi32.RegCloseKey
-RegCloseKey.argtypes = [HKEY]
-RegCloseKey.restype = LONG
-
-RegQueryValueEx = advapi32.RegQueryValueExA
-RegQueryValueEx.argtypes = [HKEY, LPCSTR, LPDWORD, LPDWORD, LPBYTE, LPDWORD]
-RegQueryValueEx.restype = LONG
-
-
-DIGCF_PRESENT = 2
-DIGCF_DEVICEINTERFACE = 16
-INVALID_HANDLE_VALUE = 0
-ERROR_INSUFFICIENT_BUFFER = 122
-SPDRP_HARDWAREID = 1
-SPDRP_FRIENDLYNAME = 12
-SPDRP_LOCATION_PATHS = 35
-DICS_FLAG_GLOBAL = 1
-DIREG_DEV = 0x00000001
-KEY_READ = 0x20019
-
-# workaround for compatibility between Python 2.x and 3.x
-Ports = serial.to_bytes([80, 111, 114, 116, 115]) # "Ports"
-PortName = serial.to_bytes([80, 111, 114, 116, 78, 97, 109, 101]) # "PortName"
-
-
-class WinInfo(object):
- """Wrapper for device info"""
-
- def __init__(self, dev):
- self.dev = dev
- self.description = None
- self.hwid = None
- self.vid = None
- self.pid = None
- self.serial = None
- self.location = None
-
- def describe(self):
- """Get a human readable string"""
- return self.description if self.description is not None else 'n/a'
-
- def hwinfo(self):
- """Get a hardware description string"""
- if self.vid is not None:
- return 'USB VID:PID={}:{}{}{}'.format(
- self.vid,
- self.pid,
- ' SER={}'.format(self.serial) if self.serial is not None else '',
- ' LOCATION={}'.format(self.location) if self.location is not None else '',
- )
- else:
- return self.hwid
-
- def __eq__(self, other):
- return self.dev == other.dev
-
- def __lt__(self, other):
- return numsplit(self.dev) < numsplit(other.dev)
-
- def __getitem__(self, index):
- """Item access: backwards compatible -> (port, desc, hwid)"""
- if index == 0:
- return self.dev
- elif index == 1:
- return self.describe()
- elif index == 2:
- return self.hwinfo()
- else:
- raise IndexError('{} > 2'.format(index))
-
-
-def comports():
- GUIDs = (GUID*8)() # so far only seen one used, so hope 8 are enough...
- guids_size = DWORD()
- if not SetupDiClassGuidsFromName(
- Ports,
- GUIDs,
- ctypes.sizeof(GUIDs),
- ctypes.byref(guids_size)):
- raise ctypes.WinError()
-
- # repeat for all possible GUIDs
- for index in range(guids_size.value):
- g_hdi = SetupDiGetClassDevs(
- ctypes.byref(GUIDs[index]),
- None,
- NULL,
- DIGCF_PRESENT) # was DIGCF_PRESENT|DIGCF_DEVICEINTERFACE which misses CDC ports
-
- devinfo = SP_DEVINFO_DATA()
- devinfo.cbSize = ctypes.sizeof(devinfo)
- index = 0
- while SetupDiEnumDeviceInfo(g_hdi, index, ctypes.byref(devinfo)):
- index += 1
-
- # get the real com port name
- hkey = SetupDiOpenDevRegKey(
- g_hdi,
- ctypes.byref(devinfo),
- DICS_FLAG_GLOBAL,
- 0,
- DIREG_DEV, # DIREG_DRV for SW info
- KEY_READ)
- port_name_buffer = byte_buffer(250)
- port_name_length = ULONG(ctypes.sizeof(port_name_buffer))
- RegQueryValueEx(
- hkey,
- PortName,
- None,
- None,
- ctypes.byref(port_name_buffer),
- ctypes.byref(port_name_length))
- RegCloseKey(hkey)
-
- # unfortunately does this method also include parallel ports.
- # we could check for names starting with COM or just exclude LPT
- # and hope that other "unknown" names are serial ports...
- if string(port_name_buffer).startswith('LPT'):
- continue
-
- # hardware ID
- szHardwareID = byte_buffer(250)
- # try to get ID that includes serial number
- if not SetupDiGetDeviceInstanceId(
- g_hdi,
- ctypes.byref(devinfo),
- ctypes.byref(szHardwareID),
- ctypes.sizeof(szHardwareID) - 1,
- None):
- # fall back to more generic hardware ID if that would fail
- if not SetupDiGetDeviceRegistryProperty(
- g_hdi,
- ctypes.byref(devinfo),
- SPDRP_HARDWAREID,
- None,
- ctypes.byref(szHardwareID),
- ctypes.sizeof(szHardwareID) - 1,
- None):
- # Ignore ERROR_INSUFFICIENT_BUFFER
- if ctypes.GetLastError() != ERROR_INSUFFICIENT_BUFFER:
- raise ctypes.WinError()
- # stringify
- szHardwareID_str = string(szHardwareID)
-
- info = WinInfo(string(port_name_buffer))
- info.hwid = szHardwareID_str
-
- # in case of USB, make a more readable string, similar to that form
- # that we also generate on other platforms
- if szHardwareID_str.startswith('USB'):
- m = re.search(r'VID_([0-9a-f]{4})&PID_([0-9a-f]{4})(\\(\w+))?', szHardwareID_str, re.I)
- if m:
- info.vid = m.group(1)
- info.pid = m.group(2)
- if m.group(4):
- info.serial = m.group(4)
- # calculate a location string
- # XXX was empty in tests with (internal) USB3 hub :(
- loc_path_str = byte_buffer(250)
- if SetupDiGetDeviceRegistryProperty(
- g_hdi,
- ctypes.byref(devinfo),
- SPDRP_LOCATION_PATHS,
- None,
- ctypes.byref(loc_path_str),
- ctypes.sizeof(loc_path_str) - 1,
- None):
- #~ print (string(loc_path_str))
- m = re.finditer(r'USBROOT\((\w+)\)|#USB\((\w+)\)', string(loc_path_str))
- location = []
- for g in m:
- if g.group(1):
- location.append('%d' % (int(g.group(1)) + 1))
- else:
- if len(location) > 1:
- location.append('.')
- else:
- location.append('-')
- location.append(g.group(2))
- if location:
- info.location = ''.join(location)
-
- # friendly name
- szFriendlyName = byte_buffer(250)
- if SetupDiGetDeviceRegistryProperty(
- g_hdi,
- ctypes.byref(devinfo),
- SPDRP_FRIENDLYNAME,
- #~ SPDRP_DEVICEDESC,
- None,
- ctypes.byref(szFriendlyName),
- ctypes.sizeof(szFriendlyName) - 1,
- None):
- info.description = string(szFriendlyName)
- #~ else:
- # Ignore ERROR_INSUFFICIENT_BUFFER
- #~ if ctypes.GetLastError() != ERROR_INSUFFICIENT_BUFFER:
- #~ raise IOError("failed to get details for %s (%s)" % (devinfo, szHardwareID.value))
- # ignore errors and still include the port in the list, friendly name will be same as port name
- yield info
- SetupDiDestroyDeviceInfoList(g_hdi)
-
-
-# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-# test
-if __name__ == '__main__':
- for port, desc, hwid in sorted(comports()):
- print("%s: %s [%s]" % (port, desc, hwid))
+#! python +# +# Enumerate serial ports on Windows including a human readable description +# and hardware information. +# +# (C) 2001-2015 Chris Liechti <cliechti@gmx.net> +# +# SPDX-License-Identifier: BSD-3-Clause + +import re +import ctypes +from ctypes.wintypes import BOOL +from ctypes.wintypes import HWND +from ctypes.wintypes import DWORD +from ctypes.wintypes import WORD +from ctypes.wintypes import LONG +from ctypes.wintypes import ULONG +from ctypes.wintypes import LPCSTR +from ctypes.wintypes import HKEY +from ctypes.wintypes import BYTE +import serial +from serial.win32 import ULONG_PTR + + +def numsplit(text): + """\ + Convert string into a list of texts and numbers in order to support a + natural sorting. + """ + result = [] + for group in re.split(r'(\d+)', text): + if group: + try: + group = int(group) + except ValueError: + pass + result.append(group) + return result + + +def ValidHandle(value, func, arguments): + if value == 0: + raise ctypes.WinError() + return value + +NULL = 0 +HDEVINFO = ctypes.c_void_p +PCTSTR = ctypes.c_char_p +PTSTR = ctypes.c_void_p +CHAR = ctypes.c_char +LPDWORD = PDWORD = ctypes.POINTER(DWORD) +#~ LPBYTE = PBYTE = ctypes.POINTER(BYTE) +LPBYTE = PBYTE = ctypes.c_void_p # XXX avoids error about types + +ACCESS_MASK = DWORD +REGSAM = ACCESS_MASK + + +def byte_buffer(length): + """Get a buffer for a string""" + return (BYTE*length)() + + +def string(buffer): + s = [] + for c in buffer: + if c == 0: + break + s.append(chr(c & 0xff)) # "& 0xff": hack to convert signed to unsigned + return ''.join(s) + + +class GUID(ctypes.Structure): + _fields_ = [ + ('Data1', DWORD), + ('Data2', WORD), + ('Data3', WORD), + ('Data4', BYTE*8), + ] + + def __str__(self): + return "{%08x-%04x-%04x-%s-%s}" % ( + self.Data1, + self.Data2, + self.Data3, + ''.join(["%02x" % d for d in self.Data4[:2]]), + ''.join(["%02x" % d for d in self.Data4[2:]]), + ) + + +class SP_DEVINFO_DATA(ctypes.Structure): + _fields_ = [ + ('cbSize', DWORD), + ('ClassGuid', GUID), + ('DevInst', DWORD), + ('Reserved', ULONG_PTR), + ] + + def __str__(self): + return "ClassGuid:%s DevInst:%s" % (self.ClassGuid, self.DevInst) + +PSP_DEVINFO_DATA = ctypes.POINTER(SP_DEVINFO_DATA) + +PSP_DEVICE_INTERFACE_DETAIL_DATA = ctypes.c_void_p + +setupapi = ctypes.windll.LoadLibrary("setupapi") +SetupDiDestroyDeviceInfoList = setupapi.SetupDiDestroyDeviceInfoList +SetupDiDestroyDeviceInfoList.argtypes = [HDEVINFO] +SetupDiDestroyDeviceInfoList.restype = BOOL + +SetupDiClassGuidsFromName = setupapi.SetupDiClassGuidsFromNameA +SetupDiClassGuidsFromName.argtypes = [PCTSTR, ctypes.POINTER(GUID), DWORD, PDWORD] +SetupDiClassGuidsFromName.restype = BOOL + +SetupDiEnumDeviceInfo = setupapi.SetupDiEnumDeviceInfo +SetupDiEnumDeviceInfo.argtypes = [HDEVINFO, DWORD, PSP_DEVINFO_DATA] +SetupDiEnumDeviceInfo.restype = BOOL + +SetupDiGetClassDevs = setupapi.SetupDiGetClassDevsA +SetupDiGetClassDevs.argtypes = [ctypes.POINTER(GUID), PCTSTR, HWND, DWORD] +SetupDiGetClassDevs.restype = HDEVINFO +SetupDiGetClassDevs.errcheck = ValidHandle + +SetupDiGetDeviceRegistryProperty = setupapi.SetupDiGetDeviceRegistryPropertyA +SetupDiGetDeviceRegistryProperty.argtypes = [HDEVINFO, PSP_DEVINFO_DATA, DWORD, PDWORD, PBYTE, DWORD, PDWORD] +SetupDiGetDeviceRegistryProperty.restype = BOOL + +SetupDiGetDeviceInstanceId = setupapi.SetupDiGetDeviceInstanceIdA +SetupDiGetDeviceInstanceId.argtypes = [HDEVINFO, PSP_DEVINFO_DATA, PTSTR, DWORD, PDWORD] +SetupDiGetDeviceInstanceId.restype = BOOL + +SetupDiOpenDevRegKey = setupapi.SetupDiOpenDevRegKey +SetupDiOpenDevRegKey.argtypes = [HDEVINFO, PSP_DEVINFO_DATA, DWORD, DWORD, DWORD, REGSAM] +SetupDiOpenDevRegKey.restype = HKEY + +advapi32 = ctypes.windll.LoadLibrary("Advapi32") +RegCloseKey = advapi32.RegCloseKey +RegCloseKey.argtypes = [HKEY] +RegCloseKey.restype = LONG + +RegQueryValueEx = advapi32.RegQueryValueExA +RegQueryValueEx.argtypes = [HKEY, LPCSTR, LPDWORD, LPDWORD, LPBYTE, LPDWORD] +RegQueryValueEx.restype = LONG + + +DIGCF_PRESENT = 2 +DIGCF_DEVICEINTERFACE = 16 +INVALID_HANDLE_VALUE = 0 +ERROR_INSUFFICIENT_BUFFER = 122 +SPDRP_HARDWAREID = 1 +SPDRP_FRIENDLYNAME = 12 +SPDRP_LOCATION_PATHS = 35 +DICS_FLAG_GLOBAL = 1 +DIREG_DEV = 0x00000001 +KEY_READ = 0x20019 + +# workaround for compatibility between Python 2.x and 3.x +Ports = serial.to_bytes([80, 111, 114, 116, 115]) # "Ports" +PortName = serial.to_bytes([80, 111, 114, 116, 78, 97, 109, 101]) # "PortName" + + +class WinInfo(object): + """Wrapper for device info""" + + def __init__(self, dev): + self.dev = dev + self.description = None + self.hwid = None + self.vid = None + self.pid = None + self.serial = None + self.location = None + + def describe(self): + """Get a human readable string""" + return self.description if self.description is not None else 'n/a' + + def hwinfo(self): + """Get a hardware description string""" + if self.vid is not None: + return 'USB VID:PID={}:{}{}{}'.format( + self.vid, + self.pid, + ' SER={}'.format(self.serial) if self.serial is not None else '', + ' LOCATION={}'.format(self.location) if self.location is not None else '', + ) + else: + return self.hwid + + def __eq__(self, other): + return self.dev == other.dev + + def __lt__(self, other): + return numsplit(self.dev) < numsplit(other.dev) + + def __getitem__(self, index): + """Item access: backwards compatible -> (port, desc, hwid)""" + if index == 0: + return self.dev + elif index == 1: + return self.describe() + elif index == 2: + return self.hwinfo() + else: + raise IndexError('{} > 2'.format(index)) + + +def comports(): + GUIDs = (GUID*8)() # so far only seen one used, so hope 8 are enough... + guids_size = DWORD() + if not SetupDiClassGuidsFromName( + Ports, + GUIDs, + ctypes.sizeof(GUIDs), + ctypes.byref(guids_size)): + raise ctypes.WinError() + + # repeat for all possible GUIDs + for index in range(guids_size.value): + g_hdi = SetupDiGetClassDevs( + ctypes.byref(GUIDs[index]), + None, + NULL, + DIGCF_PRESENT) # was DIGCF_PRESENT|DIGCF_DEVICEINTERFACE which misses CDC ports + + devinfo = SP_DEVINFO_DATA() + devinfo.cbSize = ctypes.sizeof(devinfo) + index = 0 + while SetupDiEnumDeviceInfo(g_hdi, index, ctypes.byref(devinfo)): + index += 1 + + # get the real com port name + hkey = SetupDiOpenDevRegKey( + g_hdi, + ctypes.byref(devinfo), + DICS_FLAG_GLOBAL, + 0, + DIREG_DEV, # DIREG_DRV for SW info + KEY_READ) + port_name_buffer = byte_buffer(250) + port_name_length = ULONG(ctypes.sizeof(port_name_buffer)) + RegQueryValueEx( + hkey, + PortName, + None, + None, + ctypes.byref(port_name_buffer), + ctypes.byref(port_name_length)) + RegCloseKey(hkey) + + # unfortunately does this method also include parallel ports. + # we could check for names starting with COM or just exclude LPT + # and hope that other "unknown" names are serial ports... + if string(port_name_buffer).startswith('LPT'): + continue + + # hardware ID + szHardwareID = byte_buffer(250) + # try to get ID that includes serial number + if not SetupDiGetDeviceInstanceId( + g_hdi, + ctypes.byref(devinfo), + ctypes.byref(szHardwareID), + ctypes.sizeof(szHardwareID) - 1, + None): + # fall back to more generic hardware ID if that would fail + if not SetupDiGetDeviceRegistryProperty( + g_hdi, + ctypes.byref(devinfo), + SPDRP_HARDWAREID, + None, + ctypes.byref(szHardwareID), + ctypes.sizeof(szHardwareID) - 1, + None): + # Ignore ERROR_INSUFFICIENT_BUFFER + if ctypes.GetLastError() != ERROR_INSUFFICIENT_BUFFER: + raise ctypes.WinError() + # stringify + szHardwareID_str = string(szHardwareID) + + info = WinInfo(string(port_name_buffer)) + info.hwid = szHardwareID_str + + # in case of USB, make a more readable string, similar to that form + # that we also generate on other platforms + if szHardwareID_str.startswith('USB'): + m = re.search(r'VID_([0-9a-f]{4})&PID_([0-9a-f]{4})(\\(\w+))?', szHardwareID_str, re.I) + if m: + info.vid = m.group(1) + info.pid = m.group(2) + if m.group(4): + info.serial = m.group(4) + # calculate a location string + # XXX was empty in tests with (internal) USB3 hub :( + loc_path_str = byte_buffer(250) + if SetupDiGetDeviceRegistryProperty( + g_hdi, + ctypes.byref(devinfo), + SPDRP_LOCATION_PATHS, + None, + ctypes.byref(loc_path_str), + ctypes.sizeof(loc_path_str) - 1, + None): + #~ print (string(loc_path_str)) + m = re.finditer(r'USBROOT\((\w+)\)|#USB\((\w+)\)', string(loc_path_str)) + location = [] + for g in m: + if g.group(1): + location.append('%d' % (int(g.group(1)) + 1)) + else: + if len(location) > 1: + location.append('.') + else: + location.append('-') + location.append(g.group(2)) + if location: + info.location = ''.join(location) + + # friendly name + szFriendlyName = byte_buffer(250) + if SetupDiGetDeviceRegistryProperty( + g_hdi, + ctypes.byref(devinfo), + SPDRP_FRIENDLYNAME, + #~ SPDRP_DEVICEDESC, + None, + ctypes.byref(szFriendlyName), + ctypes.sizeof(szFriendlyName) - 1, + None): + info.description = string(szFriendlyName) + #~ else: + # Ignore ERROR_INSUFFICIENT_BUFFER + #~ if ctypes.GetLastError() != ERROR_INSUFFICIENT_BUFFER: + #~ raise IOError("failed to get details for %s (%s)" % (devinfo, szHardwareID.value)) + # ignore errors and still include the port in the list, friendly name will be same as port name + yield info + SetupDiDestroyDeviceInfoList(g_hdi) + + +# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +# test +if __name__ == '__main__': + for port, desc, hwid in sorted(comports()): + print("%s: %s [%s]" % (port, desc, hwid)) |