diff options
-rw-r--r-- | serial/__init__.py | 21 | ||||
-rw-r--r-- | serial/aio.py | 3 | ||||
-rw-r--r-- | serial/rfc2217.py | 136 | ||||
-rw-r--r-- | serial/rs485.py | 8 | ||||
-rw-r--r-- | serial/serialposix.py | 227 | ||||
-rw-r--r-- | serial/serialutil.py | 92 | ||||
-rw-r--r-- | serial/serialwin32.py | 124 | ||||
-rw-r--r-- | serial/tools/hexlify_codec.py | 10 | ||||
-rw-r--r-- | serial/tools/list_ports.py | 23 | ||||
-rw-r--r-- | serial/tools/list_ports_linux.py | 9 | ||||
-rw-r--r-- | serial/tools/list_ports_posix.py | 4 | ||||
-rw-r--r-- | serial/tools/list_ports_windows.py | 40 | ||||
-rw-r--r-- | serial/tools/miniterm.py | 70 | ||||
-rw-r--r-- | serial/urlhandler/protocol_hwgrep.py | 3 | ||||
-rw-r--r-- | serial/urlhandler/protocol_loop.py | 36 | ||||
-rw-r--r-- | serial/urlhandler/protocol_socket.py | 38 | ||||
-rw-r--r-- | serial/urlhandler/protocol_spy.py | 1 | ||||
-rw-r--r-- | serial/win32.py | 107 |
18 files changed, 528 insertions, 424 deletions
diff --git a/serial/__init__.py b/serial/__init__.py index f40ed3e..6d4d6c7 100644 --- a/serial/__init__.py +++ b/serial/__init__.py @@ -7,22 +7,25 @@ # # SPDX-License-Identifier: BSD-3-Clause -VERSION = '3.0a' - import importlib import sys +from serial.serialutil import * +#~ SerialBase, SerialException, to_bytes, iterbytes + +VERSION = '3.0a' + if sys.platform == 'cli': - from serial.serialcli import * + from serial.serialcli import Serial else: import os # chose an implementation, depending on os - if os.name == 'nt': #sys.platform == 'win32': - from serial.serialwin32 import * + if os.name == 'nt': # sys.platform == 'win32': + from serial.serialwin32 import Serial elif os.name == 'posix': - from serial.serialposix import * + from serial.serialposix import Serial, PosixPollSerial elif os.name == 'java': - from serial.serialjava import * + from serial.serialjava import Serial else: raise ImportError("Sorry: no implementation for your platform ('%s') available" % (os.name,)) @@ -31,6 +34,7 @@ protocol_handler_packages = [ 'serial.urlhandler', ] + def serial_for_url(url, *args, **kwargs): """\ Get an instance of the Serial class, depending on port/url. The port is not @@ -48,7 +52,8 @@ def serial_for_url(url, *args, **kwargs): """ # check remove extra parameter to not confuse the Serial class do_open = 'do_not_open' not in kwargs or not kwargs['do_not_open'] - if 'do_not_open' in kwargs: del kwargs['do_not_open'] + if 'do_not_open' in kwargs: + del kwargs['do_not_open'] # the default is to use the native version klass = Serial # 'native' implementation # check port type and get class diff --git a/serial/aio.py b/serial/aio.py index 476959a..7e16545 100644 --- a/serial/aio.py +++ b/serial/aio.py @@ -17,6 +17,8 @@ implementation. It should be possible to get that working though. """ import asyncio import serial +import logger + class SerialTransport(asyncio.Transport): def __init__(self, loop, protocol, serial_instance): @@ -80,6 +82,7 @@ class SerialTransport(asyncio.Transport): #~ def write_eof(self): #~ def abort(self): + @asyncio.coroutine def create_serial_connection(loop, protocol_factory, *args, **kwargs): ser = serial.Serial(*args, **kwargs) diff --git a/serial/rfc2217.py b/serial/rfc2217.py index 24a090c..3ddc33d 100644 --- a/serial/rfc2217.py +++ b/serial/rfc2217.py @@ -17,9 +17,9 @@ # conditional # - write timeout not implemented at all -############################################################################## +# ########################################################################### # observations and issues with servers -#============================================================================= +# =========================================================================== # sredird V2.2.1 # - http://www.ibiblio.org/pub/Linux/system/serial/ sredird-2.2.2.tar.gz # - does not acknowledge SET_CONTROL (RTS/DTR) correctly, always responding @@ -28,11 +28,11 @@ # numbers than 2**32? # - To get the signature [COM_PORT_OPTION 0] has to be sent. # - run a server: while true; do nc -l -p 7000 -c "sredird debug /dev/ttyUSB0 /var/lock/sredir"; done -#============================================================================= +# =========================================================================== # telnetcpcd (untested) # - http://ftp.wayne.edu/kermit/sredird/telnetcpcd-1.09.tar.gz # - To get the signature [COM_PORT_OPTION] w/o data has to be sent. -#============================================================================= +# =========================================================================== # ser2net # - does not negotiate BINARY or COM_PORT_OPTION for his side but at least # acknowledges that the client activates these options @@ -44,7 +44,7 @@ # - To get the signature [COM_PORT_OPTION 0] has to be sent. # - run a server: run ser2net daemon, in /etc/ser2net.conf: # 2000:telnet:0:/dev/ttyS0:9600 remctl banner -############################################################################## +# ########################################################################### # How to identify ports? pySerial might want to support other protocols in the # future, so lets use an URL scheme. @@ -74,7 +74,8 @@ try: except ImportError: import queue as Queue -from serial.serialutil import * +import serial +from serial.serialutil import SerialBase, SerialException, to_bytes, iterbytes, portNotOpenError # port string is expected to be something like this: # rfc2217://host:port @@ -91,22 +92,22 @@ LOGGER_LEVELS = { # telnet protocol characters -SE = b'\xf0' # Subnegotiation End +SE = b'\xf0' # Subnegotiation End NOP = b'\xf1' # No Operation -DM = b'\xf2' # Data Mark +DM = b'\xf2' # Data Mark BRK = b'\xf3' # Break -IP = b'\xf4' # Interrupt process -AO = b'\xf5' # Abort output +IP = b'\xf4' # Interrupt process +AO = b'\xf5' # Abort output AYT = b'\xf6' # Are You There -EC = b'\xf7' # Erase Character -EL = b'\xf8' # Erase Line -GA = b'\xf9' # Go Ahead -SB = b'\xfa' # Subnegotiation Begin +EC = b'\xf7' # Erase Character +EL = b'\xf8' # Erase Line +GA = b'\xf9' # Go Ahead +SB = b'\xfa' # Subnegotiation Begin WILL = b'\xfb' WONT = b'\xfc' -DO = b'\xfd' +DO = b'\xfd' DONT = b'\xfe' -IAC = b'\xff' # Interpret As Command +IAC = b'\xff' # Interpret As Command IAC_DOUBLED = b'\xff\xff' # selected telnet options @@ -204,20 +205,20 @@ PURGE_BOTH_BUFFERS = b'\x03' # Purge both the access server receive data RFC2217_PARITY_MAP = { - PARITY_NONE: 1, - PARITY_ODD: 2, - PARITY_EVEN: 3, - PARITY_MARK: 4, - PARITY_SPACE: 5, + serial.PARITY_NONE: 1, + serial.PARITY_ODD: 2, + serial.PARITY_EVEN: 3, + serial.PARITY_MARK: 4, + serial.PARITY_SPACE: 5, } -RFC2217_REVERSE_PARITY_MAP = dict((v,k) for k,v in RFC2217_PARITY_MAP.items()) +RFC2217_REVERSE_PARITY_MAP = dict((v, k) for k, v in RFC2217_PARITY_MAP.items()) RFC2217_STOPBIT_MAP = { - STOPBITS_ONE: 1, - STOPBITS_ONE_POINT_FIVE: 3, - STOPBITS_TWO: 2, + serial.STOPBITS_ONE: 1, + serial.STOPBITS_ONE_POINT_FIVE: 3, + serial.STOPBITS_TWO: 2, } -RFC2217_REVERSE_STOPBIT_MAP = dict((v,k) for k,v in RFC2217_STOPBIT_MAP.items()) +RFC2217_REVERSE_STOPBIT_MAP = dict((v, k) for k, v in RFC2217_STOPBIT_MAP.items()) # Telnet filter states M_NORMAL = 0 @@ -230,6 +231,7 @@ ACTIVE = 'ACTIVE' INACTIVE = 'INACTIVE' REALLY_INACTIVE = 'REALLY_INACTIVE' + class TelnetOption(object): """Manage a single telnet option, keeps track of DO/DONT WILL/WONT.""" @@ -306,7 +308,8 @@ class TelnetSubnegotiation(object): """ def __init__(self, connection, name, option, ack_option=None): - if ack_option is None: ack_option = option + if ack_option is None: + ack_option = option self.connection = connection self.name = name self.option = option @@ -395,7 +398,7 @@ class Serial(SerialBase): self._socket = None raise SerialException("Could not open port %s: %s" % (self.portstr, msg)) - self._socket.settimeout(5) # XXX good value? + self._socket.settimeout(5) # XXX good value? # use a thread save queue as buffer. it also simplifies implementing # the read timeout @@ -553,7 +556,8 @@ class Serial(SerialBase): raise ValueError('unknown option: %r' % (option,)) # get host and port host, port = parts.hostname, parts.port - if not 0 <= port < 65536: raise ValueError("port not in range 0...65535") + if not 0 <= port < 65536: + raise ValueError("port not in range 0...65535") except ValueError as e: raise SerialException('expected a string in the form "rfc2217://<host>:<port>[?option[&option...]]": %s' % e) return (host, port) @@ -563,7 +567,8 @@ class Serial(SerialBase): @property def in_waiting(self): """Return the number of bytes currently in the input buffer.""" - if not self.is_open: raise portNotOpenError + if not self.is_open: + raise portNotOpenError return self._read_buffer.qsize() def read(self, size=1): @@ -572,14 +577,15 @@ class Serial(SerialBase): return less characters as requested. With no timeout it will block until the requested number of bytes is read. """ - if not self.is_open: raise portNotOpenError + if not self.is_open: + raise portNotOpenError data = bytearray() try: while len(data) < size: if self._thread is None: raise SerialException('connection failed (reader thread died)') data += self._read_buffer.get(True, self._timeout) - except Queue.Empty: # -> timeout + except Queue.Empty: # -> timeout pass return bytes(data) @@ -589,7 +595,8 @@ class Serial(SerialBase): connection is blocked. May raise SerialException if the connection is closed. """ - if not self.is_open: raise portNotOpenError + if not self.is_open: + raise portNotOpenError with self._write_lock: try: self._socket.sendall(to_bytes(data).replace(IAC, IAC_DOUBLED)) @@ -599,7 +606,8 @@ class Serial(SerialBase): def reset_input_buffer(self): """Clear input buffer, discarding all that is in the buffer.""" - if not self.is_open: raise portNotOpenError + if not self.is_open: + raise portNotOpenError self.rfc2217SendPurge(PURGE_RECEIVE_BUFFER) # empty read buffer while self._read_buffer.qsize(): @@ -610,7 +618,8 @@ class Serial(SerialBase): Clear output buffer, aborting the current output and discarding all that is in the buffer. """ - if not self.is_open: raise portNotOpenError + if not self.is_open: + raise portNotOpenError self.rfc2217SendPurge(PURGE_TRANSMIT_BUFFER) def _update_break_state(self): @@ -618,7 +627,8 @@ class Serial(SerialBase): Set break: Controls TXD. When active, to transmitting is possible. """ - if not self.is_open: raise portNotOpenError + if not self.is_open: + raise portNotOpenError if self.logger: self.logger.info('set BREAK to %s' % ('active' if self._break_state else 'inactive')) if self._break_state: @@ -628,7 +638,8 @@ class Serial(SerialBase): def _update_rts_state(self): """Set terminal status line: Request To Send.""" - if not self.is_open: raise portNotOpenError + if not self.is_open: + raise portNotOpenError if self.logger: self.logger.info('set RTS to %s' % ('active' if self._rts_state else 'inactive')) if self._rts_state: @@ -638,7 +649,8 @@ class Serial(SerialBase): def _update_dtr_state(self, level=True): """Set terminal status line: Data Terminal Ready.""" - if not self.is_open: raise portNotOpenError + if not self.is_open: + raise portNotOpenError if self.logger: self.logger.info('set DTR to %s' % ('active' if self._dtr_state else 'inactive')) if self._dtr_state: @@ -649,25 +661,29 @@ class Serial(SerialBase): @property def cts(self): """Read terminal status line: Clear To Send.""" - if not self.is_open: raise portNotOpenError + if not self.is_open: + raise portNotOpenError return bool(self.getModemState() & MODEMSTATE_MASK_CTS) @property def dsr(self): """Read terminal status line: Data Set Ready.""" - if not self.is_open: raise portNotOpenError + if not self.is_open: + raise portNotOpenError return bool(self.getModemState() & MODEMSTATE_MASK_DSR) @property def ri(self): """Read terminal status line: Ring Indicator.""" - if not self.is_open: raise portNotOpenError + if not self.is_open: + raise portNotOpenError return bool(self.getModemState() & MODEMSTATE_MASK_RI) @property def cd(self): """Read terminal status line: Carrier Detect.""" - if not self.is_open: raise portNotOpenError + if not self.is_open: + raise portNotOpenError return bool(self.getModemState() & MODEMSTATE_MASK_CD) # - - - platform specific - - - @@ -692,7 +708,8 @@ class Serial(SerialBase): if self.logger: self.logger.debug("socket error in reader thread: %s" % (e,)) break - if not data: break # lost connection + if not data: + break # lost connection for byte in iterbytes(data): if mode == M_NORMAL: # interpret as command or as data @@ -731,7 +748,7 @@ class Serial(SerialBase): # other telnet commands self._telnetProcessCommand(byte) mode = M_NORMAL - elif mode == M_NEGOTIATE: # DO, DONT, WILL, WONT was received, option now following + elif mode == M_NEGOTIATE: # DO, DONT, WILL, WONT was received, option now following self._telnetNegotiateOption(telnet_command, byte) mode = M_NORMAL finally: @@ -766,16 +783,15 @@ class Serial(SerialBase): if self.logger: self.logger.warning("rejected Telnet option: %r" % (option,)) - def _telnetProcessSubnegotiation(self, suboption): """Process subnegotiation, the data between IAC SB and IAC SE.""" if suboption[0:1] == COM_PORT_OPTION: if suboption[1:2] == SERVER_NOTIFY_LINESTATE and len(suboption) >= 3: - self._linestate = ord(suboption[2:3]) # ensure it is a number + self._linestate = ord(suboption[2:3]) # ensure it is a number if self.logger: self.logger.info("NOTIFY_LINESTATE: %s" % self._linestate) elif suboption[1:2] == SERVER_NOTIFY_MODEMSTATE and len(suboption) >= 3: - self._modemstate = ord(suboption[2:3]) # ensure it is a number + self._modemstate = ord(suboption[2:3]) # ensure it is a number if self.logger: self.logger.info("NOTIFY_MODEMSTATE: %s" % self._modemstate) # update time when we think that a poll would make sense @@ -815,12 +831,12 @@ class Serial(SerialBase): def rfc2217SendPurge(self, value): item = self._rfc2217_options['purge'] - item.set(value) # transmit desired purge type - item.wait(self._network_timeout) # wait for acknowledge from the server + item.set(value) # transmit desired purge type + item.wait(self._network_timeout) # wait for acknowledge from the server def rfc2217SetControl(self, value): item = self._rfc2217_options['control'] - item.set(value) # transmit desired control type + item.set(value) # transmit desired control type if self._ignore_set_control_answer: # answers are ignored when option is set. compatibility mode for # servers that answer, but not the expected one... (or no answer @@ -957,7 +973,7 @@ class PortManager(object): (self.serial.getRI() and MODEMSTATE_MASK_RI) | (self.serial.getCD() and MODEMSTATE_MASK_CD)) # check what has changed - deltas = modemstate ^ (self.last_modemstate or 0) # when last is None -> 0 + deltas = modemstate ^ (self.last_modemstate or 0) # when last is None -> 0 if deltas & MODEMSTATE_MASK_CTS: modemstate |= MODEMSTATE_MASK_CTS_CHANGE if deltas & MODEMSTATE_MASK_DSR: @@ -1053,7 +1069,7 @@ class PortManager(object): # other telnet commands self._telnetProcessCommand(byte) self.mode = M_NORMAL - elif self.mode == M_NEGOTIATE: # DO, DONT, WILL, WONT was received, option now following + elif self.mode == M_NEGOTIATE: # DO, DONT, WILL, WONT was received, option now following self._telnetNegotiateOption(self.telnet_command, byte) self.mode = M_NORMAL @@ -1084,7 +1100,6 @@ class PortManager(object): if self.logger: self.logger.warning("rejected Telnet option: %r" % (option,)) - def _telnetProcessSubnegotiation(self, suboption): """Process subnegotiation, the data between IAC SB and IAC SE.""" if suboption[0:1] == COM_PORT_OPTION: @@ -1179,7 +1194,7 @@ class PortManager(object): elif suboption[2:3] == SET_CONTROL_REQ_BREAK_STATE: if self.logger: self.logger.warning("requested break state - not implemented") - pass # XXX needs cached value + pass # XXX needs cached value elif suboption[2:3] == SET_CONTROL_BREAK_ON: self.serial.setBreak(True) if self.logger: @@ -1193,7 +1208,7 @@ class PortManager(object): elif suboption[2:3] == SET_CONTROL_REQ_DTR: if self.logger: self.logger.warning("requested DTR state - not implemented") - pass # XXX needs cached value + pass # XXX needs cached value elif suboption[2:3] == SET_CONTROL_DTR_ON: self.serial.setDTR(True) if self.logger: @@ -1207,7 +1222,7 @@ class PortManager(object): elif suboption[2:3] == SET_CONTROL_REQ_RTS: if self.logger: self.logger.warning("requested RTS state - not implemented") - pass # XXX needs cached value + pass # XXX needs cached value #~ self.rfc2217SendSubnegotiation(SERVER_SET_CONTROL, SET_CONTROL_RTS_ON) elif suboption[2:3] == SET_CONTROL_RTS_ON: self.serial.setRTS(True) @@ -1245,11 +1260,11 @@ class PortManager(object): self.logger.info("resume") self._remote_suspend_flow = False elif suboption[1:2] == SET_LINESTATE_MASK: - self.linstate_mask = ord(suboption[2:3]) # ensure it is a number + self.linstate_mask = ord(suboption[2:3]) # ensure it is a number if self.logger: self.logger.info("line state mask: 0x%02x" % (self.linstate_mask,)) elif suboption[1:2] == SET_MODEMSTATE_MASK: - self.modemstate_mask = ord(suboption[2:3]) # ensure it is a number + self.modemstate_mask = ord(suboption[2:3]) # ensure it is a number if self.logger: self.logger.info("modem state mask: 0x%02x" % (self.modemstate_mask,)) elif suboption[1:2] == PURGE_DATA: @@ -1286,13 +1301,8 @@ if __name__ == '__main__': s = Serial('rfc2217://localhost:7000', 115200) sys.stdout.write('%s\n' % s) - #~ s.baudrate = 1898 - sys.stdout.write("write...\n") s.write(b"hello\n") s.flush() sys.stdout.write("read: %s\n" % s.read(5)) - - #~ s.baudrate = 19200 - #~ s.databits = 7 s.close() diff --git a/serial/rs485.py b/serial/rs485.py index 6f2145b..1a2d4d4 100644 --- a/serial/rs485.py +++ b/serial/rs485.py @@ -15,8 +15,10 @@ NOTE: Some implementations may only support a subset of the settings. import time import serial + class RS485Settings(object): - def __init__(self, + def __init__( + self, rts_level_for_tx=True, rts_level_for_rx=False, loopback=False, @@ -57,7 +59,6 @@ class RS485(serial.Serial): super(RS485, self).__init__(*args, **kwargs) self._alternate_rs485_settings = None - def write(self, b): """Write to port, controlling RTS before and after transmitting.""" if self._alternate_rs485_settings is not None: @@ -75,7 +76,6 @@ class RS485(serial.Serial): else: super(RS485, self).write(b) - # redirect where the property stores the settings so that underlying Serial # instance does not see them @property @@ -89,5 +89,3 @@ class RS485(serial.Serial): @rs485_mode.setter def rs485_mode(self, rs485_settings): self._alternate_rs485_settings = rs485_settings - - diff --git a/serial/serialposix.py b/serial/serialposix.py index 79eaa66..9064c6a 100644 --- a/serial/serialposix.py +++ b/serial/serialposix.py @@ -21,7 +21,9 @@ import struct import sys import termios import time -from serial.serialutil import * + +import serial +from serial.serialutil import SerialBase, SerialException, to_bytes, portNotOpenError, writeTimeoutError class PlatformSpecificBase(object): @@ -41,7 +43,7 @@ also add the device name of the serial port and where the counting starts for the first serial port. e.g. 'first serial port: /dev/ttyS0' and with a bit luck you can get this module running... -""" % (sys.platform, os.name, VERSION)) +""" % (sys.platform, os.name, serial.VERSION)) raise NotImplementedError('no number-to-device mapping defined on this platform') def _set_special_baudrate(self, baudrate): @@ -55,7 +57,7 @@ and with a bit luck you can get this module running... # for the platform plat = sys.platform.lower() -if plat[:5] == 'linux': # Linux (confirmed) +if plat[:5] == 'linux': # Linux (confirmed) import array # baudrate ioctls @@ -66,11 +68,10 @@ if plat[:5] == 'linux': # Linux (confirmed) # RS485 ioctls TIOCGRS485 = 0x542E TIOCSRS485 = 0x542F - SER_RS485_ENABLED = 0b00000001 - SER_RS485_RTS_ON_SEND = 0b00000010 + SER_RS485_ENABLED = 0b00000001 + SER_RS485_RTS_ON_SEND = 0b00000010 SER_RS485_RTS_AFTER_SEND = 0b00000100 - SER_RS485_RX_DURING_TX = 0b00010000 - + SER_RS485_RX_DURING_TX = 0b00010000 class PlatformSpecific(PlatformSpecificBase): BAUDRATE_CONSTANTS = { @@ -122,12 +123,12 @@ if plat[:5] == 'linux': # Linux (confirmed) buf[9] = buf[10] = baudrate # set serial_struct - res = fcntl.ioctl(self.fd, TCSETS2, buf) + fcntl.ioctl(self.fd, TCSETS2, buf) except IOError as e: raise ValueError('Failed to set custom baud rate (%s): %s' % (baudrate, e)) def _set_rs485_mode(self, rs485_settings): - buf = array.array('i', [0] * 8) # flags, delaytx, delayrx, padding + buf = array.array('i', [0] * 8) # flags, delaytx, delayrx, padding try: fcntl.ioctl(self.fd, TIOCGRS485, buf) if rs485_settings is not None: @@ -147,7 +148,7 @@ if plat[:5] == 'linux': # Linux (confirmed) buf[2] = int(rs485_settings.delay_rts_after_send * 1000) else: buf[0] = 0 # clear SER_RS485_ENABLED - res = fcntl.ioctl(self.fd, TIOCSRS485, buf) + fcntl.ioctl(self.fd, TIOCSRS485, buf) except IOError as e: raise ValueError('Failed to set RS485 mode: %s' % (e,)) @@ -185,7 +186,7 @@ elif plat[:3] == 'bsd' or plat[:7] == 'freebsd': elif plat[:6] == 'darwin': # OS X import array - IOSSIOSPEED = 0x80045402 #_IOW('T', 2, speed_t) + IOSSIOSPEED = 0x80045402 # _IOW('T', 2, speed_t) class PlatformSpecific(PlatformSpecificBase): def number_to_device(self, port_number): @@ -208,7 +209,7 @@ elif plat[:6] == 'netbsd': # NetBSD 1.6 testing by Erk elif plat[:4] == 'irix': # IRIX (partially tested) class PlatformSpecific(PlatformSpecificBase): def number_to_device(self, port_number): - return '/dev/ttyf%d' % (port_number + 1,) #XXX different device names depending on flow control + return '/dev/ttyf%d' % (port_number + 1,) # XXX different device names depending on flow control elif plat[:2] == 'hp': # HP-UX (not tested) class PlatformSpecific(PlatformSpecificBase): @@ -235,44 +236,44 @@ else: # load some constants for later use. # try to use values from termios, use defaults from linux otherwise -TIOCMGET = getattr(termios, 'TIOCMGET', 0x5415) -TIOCMBIS = getattr(termios, 'TIOCMBIS', 0x5416) -TIOCMBIC = getattr(termios, 'TIOCMBIC', 0x5417) -TIOCMSET = getattr(termios, 'TIOCMSET', 0x5418) +TIOCMGET = getattr(termios, 'TIOCMGET', 0x5415) +TIOCMBIS = getattr(termios, 'TIOCMBIS', 0x5416) +TIOCMBIC = getattr(termios, 'TIOCMBIC', 0x5417) +TIOCMSET = getattr(termios, 'TIOCMSET', 0x5418) -#TIOCM_LE = getattr(termios, 'TIOCM_LE', 0x001) +# TIOCM_LE = getattr(termios, 'TIOCM_LE', 0x001) TIOCM_DTR = getattr(termios, 'TIOCM_DTR', 0x002) TIOCM_RTS = getattr(termios, 'TIOCM_RTS', 0x004) -#TIOCM_ST = getattr(termios, 'TIOCM_ST', 0x008) -#TIOCM_SR = getattr(termios, 'TIOCM_SR', 0x010) +# TIOCM_ST = getattr(termios, 'TIOCM_ST', 0x008) +# TIOCM_SR = getattr(termios, 'TIOCM_SR', 0x010) TIOCM_CTS = getattr(termios, 'TIOCM_CTS', 0x020) TIOCM_CAR = getattr(termios, 'TIOCM_CAR', 0x040) TIOCM_RNG = getattr(termios, 'TIOCM_RNG', 0x080) TIOCM_DSR = getattr(termios, 'TIOCM_DSR', 0x100) -TIOCM_CD = getattr(termios, 'TIOCM_CD', TIOCM_CAR) -TIOCM_RI = getattr(termios, 'TIOCM_RI', TIOCM_RNG) -#TIOCM_OUT1 = getattr(termios, 'TIOCM_OUT1', 0x2000) -#TIOCM_OUT2 = getattr(termios, 'TIOCM_OUT2', 0x4000) +TIOCM_CD = getattr(termios, 'TIOCM_CD', TIOCM_CAR) +TIOCM_RI = getattr(termios, 'TIOCM_RI', TIOCM_RNG) +# TIOCM_OUT1 = getattr(termios, 'TIOCM_OUT1', 0x2000) +# TIOCM_OUT2 = getattr(termios, 'TIOCM_OUT2', 0x4000) if hasattr(termios, 'TIOCINQ'): TIOCINQ = termios.TIOCINQ else: TIOCINQ = getattr(termios, 'FIONREAD', 0x541B) -TIOCOUTQ = getattr(termios, 'TIOCOUTQ', 0x5411) +TIOCOUTQ = getattr(termios, 'TIOCOUTQ', 0x5411) TIOCM_zero_str = struct.pack('I', 0) TIOCM_RTS_str = struct.pack('I', TIOCM_RTS) TIOCM_DTR_str = struct.pack('I', TIOCM_DTR) -TIOCSBRK = getattr(termios, 'TIOCSBRK', 0x5427) -TIOCCBRK = getattr(termios, 'TIOCCBRK', 0x5428) +TIOCSBRK = getattr(termios, 'TIOCSBRK', 0x5427) +TIOCCBRK = getattr(termios, 'TIOCCBRK', 0x5428) -CMSPAR = 0o10000000000 # Use "stick" (mark/space) parity +CMSPAR = 0o10000000000 # Use "stick" (mark/space) parity class Serial(SerialBase, PlatformSpecific): """\ - Serial port class POSIX implementation. Serial port configuration is + Serial port class POSIX implementation. Serial port configuration is done with termios and fcntl. Runs on Linux and many other Un*x like systems. """ @@ -288,7 +289,7 @@ class Serial(SerialBase, PlatformSpecific): self.fd = None # open try: - self.fd = os.open(self.portstr, os.O_RDWR|os.O_NOCTTY|os.O_NONBLOCK) + self.fd = os.open(self.portstr, os.O_RDWR | os.O_NOCTTY | os.O_NONBLOCK) except OSError as msg: self.fd = None raise SerialException(msg.errno, "could not open port %s: %s" % (self._port, msg)) @@ -313,7 +314,6 @@ class Serial(SerialBase, PlatformSpecific): self._update_rts_state() self.reset_input_buffer() - def _reconfigure_port(self): """Set communication parameters on opened port.""" if self.fd is None: @@ -330,15 +330,16 @@ class Serial(SerialBase, PlatformSpecific): except termios.error as msg: # if a port is nonexistent but has a /dev file, it'll fail here raise SerialException("Could not configure port: %s" % msg) # set up raw mode / no echo / binary - cflag |= (termios.CLOCAL|termios.CREAD) - lflag &= ~(termios.ICANON|termios.ECHO|termios.ECHOE|termios.ECHOK|termios.ECHONL| - termios.ISIG|termios.IEXTEN) #|termios.ECHOPRT - for flag in ('ECHOCTL', 'ECHOKE'): # netbsd workaround for Erk + cflag |= (termios.CLOCAL | termios.CREAD) + lflag &= ~(termios.ICANON | termios.ECHO | termios.ECHOE | + termios.ECHOK | termios.ECHONL | + termios.ISIG | termios.IEXTEN) # |termios.ECHOPRT + for flag in ('ECHOCTL', 'ECHOKE'): # netbsd workaround for Erk if hasattr(termios, flag): lflag &= ~getattr(termios, flag) - oflag &= ~(termios.OPOST|termios.ONLCR|termios.OCRNL) - iflag &= ~(termios.INLCR|termios.IGNCR|termios.ICRNL|termios.IGNBRK) + oflag &= ~(termios.OPOST | termios.ONLCR | termios.OCRNL) + iflag &= ~(termios.INLCR | termios.IGNCR | termios.ICRNL | termios.IGNBRK) if hasattr(termios, 'IUCLC'): iflag &= ~termios.IUCLC if hasattr(termios, 'PARMRK'): @@ -355,7 +356,7 @@ class Serial(SerialBase, PlatformSpecific): # may need custom baud rate, it isn't in our list. ispeed = ospeed = getattr(termios, 'B38400') try: - custom_baud = int(self._baudrate) # store for later + custom_baud = int(self._baudrate) # store for later except ValueError: raise ValueError('Invalid baud rate: %r' % self._baudrate) else: @@ -375,27 +376,27 @@ class Serial(SerialBase, PlatformSpecific): else: raise ValueError('Invalid char len: %r' % self._bytesize) # setup stop bits - if self._stopbits == STOPBITS_ONE: + if self._stopbits == serial.STOPBITS_ONE: cflag &= ~(termios.CSTOPB) - elif self._stopbits == STOPBITS_ONE_POINT_FIVE: - cflag |= (termios.CSTOPB) # XXX same as TWO.. there is no POSIX support for 1.5 - elif self._stopbits == STOPBITS_TWO: - cflag |= (termios.CSTOPB) + elif self._stopbits == serial.STOPBITS_ONE_POINT_FIVE: + cflag |= (termios.CSTOPB) # XXX same as TWO.. there is no POSIX support for 1.5 + elif self._stopbits == serial.STOPBITS_TWO: + cflag |= (termios.CSTOPB) else: raise ValueError('Invalid stop bit specification: %r' % self._stopbits) # setup parity - iflag &= ~(termios.INPCK|termios.ISTRIP) - if self._parity == PARITY_NONE: - cflag &= ~(termios.PARENB|termios.PARODD) - elif self._parity == PARITY_EVEN: + iflag &= ~(termios.INPCK | termios.ISTRIP) + if self._parity == serial.PARITY_NONE: + cflag &= ~(termios.PARENB | termios.PARODD) + elif self._parity == serial.PARITY_EVEN: cflag &= ~(termios.PARODD) - cflag |= (termios.PARENB) - elif self._parity == PARITY_ODD: - cflag |= (termios.PARENB|termios.PARODD) - elif self._parity == PARITY_MARK and plat[:5] == 'linux': - cflag |= (termios.PARENB|CMSPAR|termios.PARODD) - elif self._parity == PARITY_SPACE and plat[:5] == 'linux': - cflag |= (termios.PARENB|CMSPAR) + cflag |= (termios.PARENB) + elif self._parity == serial.PARITY_ODD: + cflag |= (termios.PARENB | termios.PARODD) + elif self._parity == serial.PARITY_MARK and plat[:5] == 'linux': + cflag |= (termios.PARENB | CMSPAR | termios.PARODD) + elif self._parity == serial.PARITY_SPACE and plat[:5] == 'linux': + cflag |= (termios.PARENB | CMSPAR) cflag &= ~(termios.PARODD) else: raise ValueError('Invalid parity: %r' % self._parity) @@ -403,23 +404,23 @@ class Serial(SerialBase, PlatformSpecific): # xonxoff if hasattr(termios, 'IXANY'): if self._xonxoff: - iflag |= (termios.IXON|termios.IXOFF) #|termios.IXANY) + iflag |= (termios.IXON | termios.IXOFF) # |termios.IXANY) else: - iflag &= ~(termios.IXON|termios.IXOFF|termios.IXANY) + iflag &= ~(termios.IXON | termios.IXOFF | termios.IXANY) else: if self._xonxoff: - iflag |= (termios.IXON|termios.IXOFF) + iflag |= (termios.IXON | termios.IXOFF) else: - iflag &= ~(termios.IXON|termios.IXOFF) + iflag &= ~(termios.IXON | termios.IXOFF) # rtscts if hasattr(termios, 'CRTSCTS'): if self._rtscts: - cflag |= (termios.CRTSCTS) + cflag |= (termios.CRTSCTS) else: cflag &= ~(termios.CRTSCTS) elif hasattr(termios, 'CNEW_RTSCTS'): # try it with alternate constant name if self._rtscts: - cflag |= (termios.CNEW_RTSCTS) + cflag |= (termios.CNEW_RTSCTS) else: cflag &= ~(termios.CNEW_RTSCTS) # XXX should there be a warning if setting up rtscts (and xonxoff etc) fails?? @@ -435,7 +436,10 @@ class Serial(SerialBase, PlatformSpecific): cc[termios.VTIME] = vtime # activate settings if [iflag, oflag, cflag, lflag, ispeed, ospeed, cc] != orig_attr: - termios.tcsetattr(self.fd, termios.TCSANOW, [iflag, oflag, cflag, lflag, ispeed, ospeed, cc]) + termios.tcsetattr( + self.fd, + termios.TCSANOW, + [iflag, oflag, cflag, lflag, ispeed, ospeed, cc]) # apply custom baud rate, if any if custom_baud is not None: @@ -459,7 +463,7 @@ class Serial(SerialBase, PlatformSpecific): """Return the number of bytes currently in the input buffer.""" #~ s = fcntl.ioctl(self.fd, termios.FIONREAD, TIOCM_zero_str) s = fcntl.ioctl(self.fd, TIOCINQ, TIOCM_zero_str) - return struct.unpack('I',s)[0] + return struct.unpack('I', s)[0] # select based implementation, proved to work on many systems def read(self, size=1): @@ -468,18 +472,19 @@ class Serial(SerialBase, PlatformSpecific): return less characters as requested. With no timeout it will block until the requested number of bytes is read. """ - if not self.is_open: raise portNotOpenError + if not self.is_open: + raise portNotOpenError read = bytearray() while len(read) < size: try: - ready,_,_ = select.select([self.fd],[],[], self._timeout) + ready, _, _ = select.select([self.fd], [], [], self._timeout) # If select was used with a timeout, and the timeout occurs, it # returns with empty lists -> thus abort read operation. - # For timeout == 0 (non-blocking operation) also abort when there - # is nothing to read. + # For timeout == 0 (non-blocking operation) also abort when + # there is nothing to read. if not ready: break # timeout - buf = os.read(self.fd, size-len(read)) + buf = os.read(self.fd, size - len(read)) # read should always return some data as select reported it was # ready to read when we get to this point. if not buf: @@ -489,8 +494,8 @@ class Serial(SerialBase, PlatformSpecific): raise SerialException('device reports readiness to read but returned no data (device disconnected or multiple access on port?)') read.extend(buf) except OSError as e: - # this is for Python 3.x where select.error is a subclass of OSError - # ignore EAGAIN errors. all other errors are shown + # this is for Python 3.x where select.error is a subclass of + # OSError ignore EAGAIN errors. all other errors are shown if e.errno != errno.EAGAIN: raise SerialException('read failed: %s' % (e,)) except select.error as e: @@ -503,7 +508,8 @@ class Serial(SerialBase, PlatformSpecific): def write(self, data): """Output the given byte string over the serial port.""" - if not self.is_open: raise portNotOpenError + if not self.is_open: + raise portNotOpenError d = to_bytes(data) tx_len = len(d) if self._write_timeout is not None and self._write_timeout > 0: @@ -541,12 +547,14 @@ class Serial(SerialBase, PlatformSpecific): Flush of file like objects. In this case, wait until all data is written. """ - if not self.is_open: raise portNotOpenError + if not self.is_open: + raise portNotOpenError termios.tcdrain(self.fd) def reset_input_buffer(self): """Clear input buffer, discarding all that is in the buffer.""" - if not self.is_open: raise portNotOpenError + if not self.is_open: + raise portNotOpenError termios.tcflush(self.fd, termios.TCIFLUSH) def reset_output_buffer(self): @@ -554,7 +562,8 @@ class Serial(SerialBase, PlatformSpecific): Clear output buffer, aborting the current output and discarding all that is in the buffer. """ - if not self.is_open: raise portNotOpenError + if not self.is_open: + raise portNotOpenError termios.tcflush(self.fd, termios.TCOFLUSH) def send_break(self, duration=0.25): @@ -562,8 +571,9 @@ class Serial(SerialBase, PlatformSpecific): Send break condition. Timed, returns to idle state after given duration. """ - if not self.is_open: raise portNotOpenError - termios.tcsendbreak(self.fd, int(duration/0.25)) + if not self.is_open: + raise portNotOpenError + termios.tcsendbreak(self.fd, int(duration / 0.25)) def _update_break_state(self): """\ @@ -591,30 +601,34 @@ class Serial(SerialBase, PlatformSpecific): @property def cts(self): """Read terminal status line: Clear To Send""" - if not self.is_open: raise portNotOpenError + if not self.is_open: + raise portNotOpenError s = fcntl.ioctl(self.fd, TIOCMGET, TIOCM_zero_str) - return struct.unpack('I',s)[0] & TIOCM_CTS != 0 + return struct.unpack('I', s)[0] & TIOCM_CTS != 0 @property def dsr(self): """Read terminal status line: Data Set Ready""" - if not self.is_open: raise portNotOpenError + if not self.is_open: + raise portNotOpenError s = fcntl.ioctl(self.fd, TIOCMGET, TIOCM_zero_str) - return struct.unpack('I',s)[0] & TIOCM_DSR != 0 + return struct.unpack('I', s)[0] & TIOCM_DSR != 0 @property def ri(self): """Read terminal status line: Ring Indicator""" - if not self.is_open: raise portNotOpenError + if not self.is_open: + raise portNotOpenError s = fcntl.ioctl(self.fd, TIOCMGET, TIOCM_zero_str) - return struct.unpack('I',s)[0] & TIOCM_RI != 0 + return struct.unpack('I', s)[0] & TIOCM_RI != 0 @property def cd(self): """Read terminal status line: Carrier Detect""" - if not self.is_open: raise portNotOpenError + if not self.is_open: + raise portNotOpenError s = fcntl.ioctl(self.fd, TIOCMGET, TIOCM_zero_str) - return struct.unpack('I',s)[0] & TIOCM_CD != 0 + return struct.unpack('I', s)[0] & TIOCM_CD != 0 # - - platform specific - - - - @@ -623,11 +637,12 @@ class Serial(SerialBase, PlatformSpecific): """Return the number of bytes currently in the output buffer.""" #~ s = fcntl.ioctl(self.fd, termios.FIONREAD, TIOCM_zero_str) s = fcntl.ioctl(self.fd, TIOCOUTQ, TIOCM_zero_str) - return struct.unpack('I',s)[0] + return struct.unpack('I', s)[0] def nonblocking(self): """internal - not portable!""" - if not self.is_open: raise portNotOpenError + if not self.is_open: + raise portNotOpenError fcntl.fcntl(self.fd, fcntl.F_SETFL, os.O_NONBLOCK) def fileno(self): @@ -635,7 +650,8 @@ class Serial(SerialBase, PlatformSpecific): For easier use of the serial port instance with select. WARNING: this function is not portable to different platforms! """ - if not self.is_open: raise portNotOpenError + if not self.is_open: + raise portNotOpenError return self.fd def set_input_flow_control(self, enable=True): @@ -644,7 +660,8 @@ class Serial(SerialBase, PlatformSpecific): This will send XON (true) or XOFF (false) to the other device. WARNING: this function is not portable to different platforms! """ - if not self.is_open: raise portNotOpenError + if not self.is_open: + raise portNotOpenError if enable: termios.tcflow(self.fd, termios.TCION) else: @@ -656,14 +673,14 @@ class Serial(SerialBase, PlatformSpecific): control is enabled. WARNING: this function is not portable to different platforms! """ - if not self.is_open: raise portNotOpenError + if not self.is_open: + raise portNotOpenError if enable: termios.tcflow(self.fd, termios.TCOON) else: termios.tcflow(self.fd, termios.TCOOFF) - class PosixPollSerial(Serial): """\ Poll based read implementation. Not all systems support poll properly. @@ -677,43 +694,43 @@ class PosixPollSerial(Serial): return less characters as requested. With no timeout it will block until the requested number of bytes is read. """ - if self.fd is None: raise portNotOpenError + if self.fd is None: + raise portNotOpenError read = bytearray() poll = select.poll() - poll.register(self.fd, select.POLLIN|select.POLLERR|select.POLLHUP|select.POLLNVAL) + poll.register(self.fd, select.POLLIN | select.POLLERR | select.POLLHUP | select.POLLNVAL) if size > 0: while len(read) < size: # print "\tread(): size",size, "have", len(read) #debug # wait until device becomes ready to read (or something fails) - for fd, event in poll.poll(self._timeout*1000): - if event & (select.POLLERR|select.POLLHUP|select.POLLNVAL): + for fd, event in poll.poll(self._timeout * 1000): + if event & (select.POLLERR | select.POLLHUP | select.POLLNVAL): raise SerialException('device reports error (poll)') # we don't care if it is select.POLLIN or timeout, that's # handled below buf = os.read(self.fd, size - len(read)) read.extend(buf) if ((self._timeout is not None and self._timeout >= 0) or - (self._inter_byte_timeout is not None and self._inter_byte_timeout > 0)) and not buf: + (self._inter_byte_timeout is not None and self._inter_byte_timeout > 0)) and not buf: break # early abort on timeout return bytes(read) if __name__ == '__main__': s = Serial(0, - baudrate=19200, # baud rate - bytesize=EIGHTBITS, # number of data bits - parity=PARITY_EVEN, # enable parity checking - stopbits=STOPBITS_ONE, # number of stop bits - timeout=3, # set a timeout value, None for waiting forever - xonxoff=0, # enable software flow control - rtscts=0, # enable RTS/CTS flow control + baudrate=19200, # baud rate + bytesize=serial.EIGHTBITS, # number of data bits + parity=serial.PARITY_EVEN, # enable parity checking + stopbits=serial.STOPBITS_ONE, # number of stop bits + timeout=3, # set a timeout value, None for waiting forever + xonxoff=0, # enable software flow control + rtscts=0, # enable RTS/CTS flow control ) - s.setRTS(1) - s.setDTR(1) - s.flushInput() - s.flushOutput() + s.rts = True + s.dtr = True + s.reset_input_buffer() + s.reset_output_buffer() s.write('hello') sys.stdout.write('%r\n' % s.read(5)) sys.stdout.write('%s\n' % s.inWaiting()) del s - diff --git a/serial/serialutil.py b/serial/serialutil.py index 26e6059..ffc50a0 100644 --- a/serial/serialutil.py +++ b/serial/serialutil.py @@ -29,13 +29,14 @@ def iterbytes(b): b = b.tobytes() x = 0 while True: - a = b[x:x+1] + a = b[x:x + 1] x += 1 if a: yield a else: break + # all Python versions prior 3.x convert ``str([17])`` to '[17]' instead of '\x11' # so a simple ``bytes(sequence)`` doesn't work for all versions def to_bytes(seq): @@ -69,10 +70,10 @@ STOPBITS_ONE, STOPBITS_ONE_POINT_FIVE, STOPBITS_TWO = (1, 1.5, 2) FIVEBITS, SIXBITS, SEVENBITS, EIGHTBITS = (5, 6, 7, 8) PARITY_NAMES = { - PARITY_NONE: 'None', - PARITY_EVEN: 'Even', - PARITY_ODD: 'Odd', - PARITY_MARK: 'Mark', + PARITY_NONE: 'None', + PARITY_EVEN: 'Even', + PARITY_ODD: 'Odd', + PARITY_MARK: 'Mark', PARITY_SPACE: 'Space', } @@ -105,7 +106,7 @@ class SerialBase(io.RawIOBase): STOPBITS = (STOPBITS_ONE, STOPBITS_ONE_POINT_FIVE, STOPBITS_TWO) def __init__(self, - port = None, # number of device, numbering starts at + port=None, # number of device, numbering starts at # zero. if everything fails, the user # can specify a device string, note # that this isn't portable anymore @@ -113,13 +114,13 @@ class SerialBase(io.RawIOBase): baudrate=9600, # baud rate bytesize=EIGHTBITS, # number of data bits parity=PARITY_NONE, # enable parity checking - stopbits=STOPBITS_ONE, # number of stop bits + stopbits=STOPBITS_ONE, # number of stop bits timeout=None, # set a timeout value, None to wait forever xonxoff=False, # enable software flow control rtscts=False, # enable RTS/CTS flow control write_timeout=None, # set a timeout for writes dsrdtr=False, # None: use rtscts setting, dsrdtr override if True or False - inter_byte_timeout=None # Inter-character timeout, None to disable + inter_byte_timeout=None # Inter-character timeout, None to disable ): """\ Initialize comm port object. If a port is given, then the port will be @@ -127,34 +128,34 @@ class SerialBase(io.RawIOBase): is returned. """ - self.is_open = False - self._port = None # correct value is assigned below through properties - self._baudrate = None # correct value is assigned below through properties - self._bytesize = None # correct value is assigned below through properties - self._parity = None # correct value is assigned below through properties - self._stopbits = None # correct value is assigned below through properties - self._timeout = None # correct value is assigned below through properties - self._write_timeout = None # correct value is assigned below through properties - self._xonxoff = None # correct value is assigned below through properties - self._rtscts = None # correct value is assigned below through properties - self._dsrdtr = None # correct value is assigned below through properties - self._inter_byte_timeout = None # correct value is assigned below through properties - self._rs485_mode = None # disabled by default + self.is_open = False + self._port = None # correct value is assigned below through properties + self._baudrate = None # correct value is assigned below through properties + self._bytesize = None # correct value is assigned below through properties + self._parity = None # correct value is assigned below through properties + self._stopbits = None # correct value is assigned below through properties + self._timeout = None # correct value is assigned below through properties + self._write_timeout = None # correct value is assigned below through properties + self._xonxoff = None # correct value is assigned below through properties + self._rtscts = None # correct value is assigned below through properties + self._dsrdtr = None # correct value is assigned below through properties + self._inter_byte_timeout = None # correct value is assigned below through properties + self._rs485_mode = None # disabled by default self._rts_state = True self._dtr_state = True self._break_state = False # assign values using get/set methods using the properties feature - self.port = port + self.port = port self.baudrate = baudrate self.bytesize = bytesize - self.parity = parity + self.parity = parity self.stopbits = stopbits - self.timeout = timeout + self.timeout = timeout self.write_timeout = write_timeout - self.xonxoff = xonxoff - self.rtscts = rtscts - self.dsrdtr = dsrdtr + self.xonxoff = xonxoff + self.rtscts = rtscts + self.dsrdtr = dsrdtr self.inter_character_timeout = inter_byte_timeout if port is not None: @@ -220,7 +221,8 @@ class SerialBase(io.RawIOBase): @bytesize.setter def bytesize(self, bytesize): """Change byte size.""" - if bytesize not in self.BYTESIZES: raise ValueError("Not a valid byte size: %r" % (bytesize,)) + if bytesize not in self.BYTESIZES: + raise ValueError("Not a valid byte size: %r" % (bytesize,)) self._bytesize = bytesize if self.is_open: self._reconfigure_port() @@ -235,7 +237,8 @@ class SerialBase(io.RawIOBase): @parity.setter def parity(self, parity): """Change parity setting.""" - if parity not in self.PARITIES: raise ValueError("Not a valid parity: %r" % (parity,)) + if parity not in self.PARITIES: + raise ValueError("Not a valid parity: %r" % (parity,)) self._parity = parity if self.is_open: self._reconfigure_port() @@ -250,7 +253,8 @@ class SerialBase(io.RawIOBase): @stopbits.setter def stopbits(self, stopbits): """Change stop bits size.""" - if stopbits not in self.STOPBITS: raise ValueError("Not a valid stop bit size: %r" % (stopbits,)) + if stopbits not in self.STOPBITS: + raise ValueError("Not a valid stop bit size: %r" % (stopbits,)) self._stopbits = stopbits if self.is_open: self._reconfigure_port() @@ -284,9 +288,10 @@ class SerialBase(io.RawIOBase): def write_timeout(self, timeout): """Change timeout setting.""" if timeout is not None: - if timeout < 0: raise ValueError("Not a valid timeout: %r" % (timeout,)) + if timeout < 0: + raise ValueError("Not a valid timeout: %r" % (timeout,)) try: - timeout + 1 #test if it's a number, will throw a TypeError if not... + timeout + 1 # test if it's a number, will throw a TypeError if not... except TypeError: raise ValueError("Not a valid timeout: %r" % timeout) @@ -304,7 +309,8 @@ class SerialBase(io.RawIOBase): def inter_byte_timeout(self, ic_timeout): """Change inter-byte timeout setting.""" if ic_timeout is not None: - if ic_timeout < 0: raise ValueError("Not a valid timeout: %r" % ic_timeout) + if ic_timeout < 0: + raise ValueError("Not a valid timeout: %r" % ic_timeout) try: ic_timeout + 1 # test if it's a number, will throw a TypeError if not... except TypeError: @@ -409,14 +415,15 @@ class SerialBase(io.RawIOBase): # - - - - - - - - - - - - - - - - - - - - - - - - _SAVED_SETTINGS = ('baudrate', 'bytesize', 'parity', 'stopbits', 'xonxoff', - 'dsrdtr', 'rtscts', 'timeout', 'write_timeout', 'inter_character_timeout') + 'dsrdtr', 'rtscts', 'timeout', 'write_timeout', + 'inter_character_timeout') def get_settings(self): """\ Get current port settings as a dictionary. For use with apply_settings(). """ - return dict([(key, getattr(self, '_'+key)) for key in self._SAVED_SETTINGS]) + return dict([(key, getattr(self, '_' + key)) for key in self._SAVED_SETTINGS]) def apply_settings(self, d): """\ @@ -425,7 +432,7 @@ class SerialBase(io.RawIOBase): values will simply left unchanged. """ for key in self._SAVED_SETTINGS: - if key in d and d[key] != getattr(self, '_'+key): # check against internal "_" value + if key in d and d[key] != getattr(self, '_' + key): # check against internal "_" value setattr(self, key, d[key]) # set non "_" value to use properties write function # - - - - - - - - - - - - - - - - - - - - - - - - @@ -451,9 +458,15 @@ class SerialBase(io.RawIOBase): # - - - - - - - - - - - - - - - - - - - - - - - - # compatibility with io library - def readable(self): return True - def writable(self): return True - def seekable(self): return False + def readable(self): + return True + + def writable(self): + return True + + def seekable(self): + return False + def readinto(self, b): data = self.read(len(b)) n = len(data) @@ -568,7 +581,6 @@ class SerialBase(io.RawIOBase): break return bytes(line) - def iread_until(self, *args, **kwargs): """\ Read lines, implemented as generator. It will raise StopIteration on diff --git a/serial/serialwin32.py b/serial/serialwin32.py index 9422373..0a736e7 100644 --- a/serial/serialwin32.py +++ b/serial/serialwin32.py @@ -13,7 +13,8 @@ import ctypes import time from serial import win32 -from serial.serialutil import * +import serial +from serial.serialutil import SerialBase, SerialException, to_bytes, portNotOpenError, writeTimeoutError class Serial(SerialBase): @@ -48,13 +49,14 @@ class Serial(SerialBase): except ValueError: # for like COMnotanumber pass - self._port_handle = win32.CreateFile(port, - win32.GENERIC_READ | win32.GENERIC_WRITE, - 0, # exclusive access - None, # no security - win32.OPEN_EXISTING, - win32.FILE_ATTRIBUTE_NORMAL | win32.FILE_FLAG_OVERLAPPED, - 0) + self._port_handle = win32.CreateFile( + port, + win32.GENERIC_READ | win32.GENERIC_WRITE, + 0, # exclusive access + None, # no security + win32.OPEN_EXISTING, + win32.FILE_ATTRIBUTE_NORMAL | win32.FILE_FLAG_OVERLAPPED, + 0) if self._port_handle == win32.INVALID_HANDLE_VALUE: self._port_handle = None # 'cause __del__ is called anyway raise SerialException("could not open port %r: %r" % (self.portstr, ctypes.WinError())) @@ -77,7 +79,8 @@ class Serial(SerialBase): # Clear buffers: # Remove anything that was there - win32.PurgeComm(self._port_handle, + win32.PurgeComm( + self._port_handle, win32.PURGE_TXCLEAR | win32.PURGE_TXABORT | win32.PURGE_RXCLEAR | win32.PURGE_RXABORT) except: @@ -92,7 +95,6 @@ class Serial(SerialBase): else: self.is_open = True - def _reconfigure_port(self): """Set communication parameters on opened port.""" if not self._port_handle: @@ -108,7 +110,7 @@ class Serial(SerialBase): elif self._timeout == 0: timeouts = (win32.MAXDWORD, 0, 0, 0, 0) else: - timeouts = (0, 0, int(self._timeout*1000), 0, 0) + timeouts = (0, 0, int(self._timeout * 1000), 0, 0) if self._timeout != 0 and self._inter_byte_timeout is not None: timeouts = (int(self._inter_byte_timeout * 1000),) + timeouts[1:] @@ -117,7 +119,7 @@ class Serial(SerialBase): elif self._write_timeout == 0: timeouts = timeouts[:-2] + (0, win32.MAXDWORD) else: - timeouts = timeouts[:-2] + (0, int(self._write_timeout*1000)) + timeouts = timeouts[:-2] + (0, int(self._write_timeout * 1000)) win32.SetCommTimeouts(self._port_handle, ctypes.byref(win32.COMMTIMEOUTS(*timeouts))) win32.SetCommMask(self._port_handle, win32.EV_ERR) @@ -128,45 +130,45 @@ class Serial(SerialBase): win32.GetCommState(self._port_handle, ctypes.byref(comDCB)) comDCB.BaudRate = self._baudrate - if self._bytesize == FIVEBITS: + if self._bytesize == serial.FIVEBITS: comDCB.ByteSize = 5 - elif self._bytesize == SIXBITS: + elif self._bytesize == serial.SIXBITS: comDCB.ByteSize = 6 - elif self._bytesize == SEVENBITS: + elif self._bytesize == serial.SEVENBITS: comDCB.ByteSize = 7 - elif self._bytesize == EIGHTBITS: + elif self._bytesize == serial.EIGHTBITS: comDCB.ByteSize = 8 else: raise ValueError("Unsupported number of data bits: %r" % self._bytesize) - if self._parity == PARITY_NONE: + if self._parity == serial.PARITY_NONE: comDCB.Parity = win32.NOPARITY - comDCB.fParity = 0 # Disable Parity Check - elif self._parity == PARITY_EVEN: + comDCB.fParity = 0 # Disable Parity Check + elif self._parity == serial.PARITY_EVEN: comDCB.Parity = win32.EVENPARITY - comDCB.fParity = 1 # Enable Parity Check - elif self._parity == PARITY_ODD: + comDCB.fParity = 1 # Enable Parity Check + elif self._parity == serial.PARITY_ODD: comDCB.Parity = win32.ODDPARITY - comDCB.fParity = 1 # Enable Parity Check - elif self._parity == PARITY_MARK: + comDCB.fParity = 1 # Enable Parity Check + elif self._parity == serial.PARITY_MARK: comDCB.Parity = win32.MARKPARITY - comDCB.fParity = 1 # Enable Parity Check - elif self._parity == PARITY_SPACE: + comDCB.fParity = 1 # Enable Parity Check + elif self._parity == serial.PARITY_SPACE: comDCB.Parity = win32.SPACEPARITY - comDCB.fParity = 1 # Enable Parity Check + comDCB.fParity = 1 # Enable Parity Check else: raise ValueError("Unsupported parity mode: %r" % self._parity) - if self._stopbits == STOPBITS_ONE: - comDCB.StopBits win32.ONESTOPBIT - elif self._stopbits == STOPBITS_ONE_POINT_FIVE: + if self._stopbits == serial.STOPBITS_ONE: + comDCB.StopBits = win32.ONESTOPBIT + elif self._stopbits == serial.STOPBITS_ONE_POINT_FIVE: comDCB.StopBits = win32.ONE5STOPBITS - elif self._stopbits == STOPBITS_TWO: + elif self._stopbits == serial.STOPBITS_TWO: comDCB.StopBits = win32.TWOSTOPBITS else: raise ValueError("Unsupported number of stop bits: %r" % self._stopbits) - comDCB.fBinary = 1 # Enable Binary Transmission + comDCB.fBinary = 1 # Enable Binary Transmission # Char. w/ Parity-Err are replaced with 0xff (if fErrorChar is set to TRUE) if self._rs485_mode is None: if self._rtscts: @@ -180,38 +182,38 @@ class Serial(SerialBase): if not self._rs485_mode.rts_level_for_tx: raise ValueError( 'Unsupported value for RS485Settings.rts_level_for_tx: %r' % ( - self._rs485_mode.rts_level_for_tx,)) + self._rs485_mode.rts_level_for_tx,)) if self._rs485_mode.rts_level_for_rx: raise ValueError( 'Unsupported value for RS485Settings.rts_level_for_rx: %r' % ( - self._rs485_mode.rts_level_for_rx,)) + self._rs485_mode.rts_level_for_rx,)) if self._rs485_mode.delay_before_tx is not None: raise ValueError( 'Unsupported value for RS485Settings.delay_before_tx: %r' % ( - self._rs485_mode.delay_before_tx,)) + self._rs485_mode.delay_before_tx,)) if self._rs485_mode.delay_before_rx is not None: raise ValueError( 'Unsupported value for RS485Settings.delay_before_rx: %r' % ( - self._rs485_mode.delay_before_rx,)) + self._rs485_mode.delay_before_rx,)) if self._rs485_mode.loopback: raise ValueError( 'Unsupported value for RS485Settings.loopback: %r' % ( - self._rs485_mode.loopback,)) + self._rs485_mode.loopback,)) comDCB.fRtsControl = win32.RTS_CONTROL_TOGGLE comDCB.fOutxCtsFlow = 0 if self._dsrdtr: - comDCB.fDtrControl = win32.DTR_CONTROL_HANDSHAKE + comDCB.fDtrControl = win32.DTR_CONTROL_HANDSHAKE else: - comDCB.fDtrControl = win32.DTR_CONTROL_ENABLE if self._dtr_state else win32.DTR_CONTROL_DISABLE - comDCB.fOutxDsrFlow = self._dsrdtr - comDCB.fOutX = self._xonxoff - comDCB.fInX = self._xonxoff - comDCB.fNull = 0 - comDCB.fErrorChar = 0 - comDCB.fAbortOnError = 0 - comDCB.XonChar = XON - comDCB.XoffChar = XOFF + comDCB.fDtrControl = win32.DTR_CONTROL_ENABLE if self._dtr_state else win32.DTR_CONTROL_DISABLE + comDCB.fOutxDsrFlow = self._dsrdtr + comDCB.fOutX = self._xonxoff + comDCB.fInX = self._xonxoff + comDCB.fNull = 0 + comDCB.fErrorChar = 0 + comDCB.fAbortOnError = 0 + comDCB.XonChar = serial.XON + comDCB.XoffChar = serial.XOFF if not win32.SetCommState(self._port_handle, ctypes.byref(comDCB)): raise ValueError("Cannot configure port, some setting was wrong. Original message: %r" % ctypes.WinError()) @@ -219,7 +221,6 @@ class Serial(SerialBase): #~ def __del__(self): #~ self.close() - def _close(self): """internal close port helper""" if self._port_handle: @@ -257,7 +258,8 @@ class Serial(SerialBase): Read size bytes from the serial port. If a timeout is set it may return less characters as requested. With no timeout it will block until the requested number of bytes is read.""" - if not self._port_handle: raise portNotOpenError + if not self._port_handle: + raise portNotOpenError if size > 0: win32.ResetEvent(self._overlapped_read.hEvent) flags = win32.DWORD() @@ -290,7 +292,8 @@ class Serial(SerialBase): def write(self, data): """Output the given byte string over the serial port.""" - if not self._port_handle: raise portNotOpenError + if not self._port_handle: + raise portNotOpenError #~ if not isinstance(data, (bytes, bytearray)): #~ raise TypeError('expected %s or bytearray, got %s' % (bytes, type(data))) # convert data (needed in case of memoryview instance: Py 3.1 io lib), ctypes doesn't like memoryview @@ -301,7 +304,7 @@ class Serial(SerialBase): err = win32.WriteFile(self._port_handle, data, len(data), ctypes.byref(n), self._overlapped_write) if not err and win32.GetLastError() != win32.ERROR_IO_PENDING: raise SerialException("WriteFile failed (%r)" % ctypes.WinError()) - if self._write_timeout != 0: # if blocking (None) or w/ write timeout (>0) + if self._write_timeout != 0: # if blocking (None) or w/ write timeout (>0) # Wait for the write to complete. #~ win32.WaitForSingleObject(self._overlapped_write.hEvent, win32.INFINITE) err = win32.GetOverlappedResult(self._port_handle, self._overlapped_write, ctypes.byref(n), True) @@ -324,7 +327,8 @@ class Serial(SerialBase): def reset_input_buffer(self): """Clear input buffer, discarding all that is in the buffer.""" - if not self._port_handle: raise portNotOpenError + if not self._port_handle: + raise portNotOpenError win32.PurgeComm(self._port_handle, win32.PURGE_RXCLEAR | win32.PURGE_RXABORT) def reset_output_buffer(self): @@ -332,12 +336,14 @@ class Serial(SerialBase): Clear output buffer, aborting the current output and discarding all that is in the buffer. """ - if not self._port_handle: raise portNotOpenError + if not self._port_handle: + raise portNotOpenError win32.PurgeComm(self._port_handle, win32.PURGE_TXCLEAR | win32.PURGE_TXABORT) def _update_break_state(self): """Set break: Controls TXD. When active, to transmitting is possible.""" - if not self._port_handle: raise portNotOpenError + if not self._port_handle: + raise portNotOpenError if self._break_state: win32.SetCommBreak(self._port_handle) else: @@ -358,7 +364,8 @@ class Serial(SerialBase): win32.EscapeCommFunction(self._port_handle, win32.CLRDTR) def _GetCommModemStatus(self): - if not self._port_handle: raise portNotOpenError + if not self._port_handle: + raise portNotOpenError stat = win32.DWORD() win32.GetCommModemStatus(self._port_handle, ctypes.byref(stat)) return stat.value @@ -390,7 +397,8 @@ class Serial(SerialBase): Recommend a buffer size to the driver (device driver can ignore this value). Must be called before the port is opended. """ - if tx_size is None: tx_size = rx_size + if tx_size is None: + tx_size = rx_size win32.SetupComm(self._port_handle, rx_size, tx_size) def set_output_flow_control(self, enable=True): @@ -400,7 +408,8 @@ class Serial(SerialBase): from the other device and control the transmission accordingly. WARNING: this function is not portable to different platforms! """ - if not self._port_handle: raise portNotOpenError + if not self._port_handle: + raise portNotOpenError if enable: win32.EscapeCommFunction(self._port_handle, win32.SETXON) else: @@ -416,9 +425,9 @@ class Serial(SerialBase): return comstat.cbOutQue - # Nur Testfunktion!! if __name__ == '__main__': + import sys s = Serial(0) sys.stdout.write("%s\n" % s) @@ -431,4 +440,3 @@ if __name__ == '__main__': s.port = 0 s.open() sys.stdout.write("%s\n" % s) - diff --git a/serial/tools/hexlify_codec.py b/serial/tools/hexlify_codec.py index f3ba04f..5b35e8e 100644 --- a/serial/tools/hexlify_codec.py +++ b/serial/tools/hexlify_codec.py @@ -9,18 +9,23 @@ HEXDIGITS = '0123456789ABCDEF' ### Codec APIs + def hex_encode(input, errors='strict'): return (serial.to_bytes([int(h, 16) for h in input.split()]), len(input)) + def hex_decode(input, errors='strict'): return (''.join('{:02X} '.format(b) for b in input), len(input)) + class Codec(codecs.Codec): def encode(self, input, errors='strict'): return serial.to_bytes([int(h, 16) for h in input.split()]) + def decode(self, input, errors='strict'): return ''.join('{:02X} '.format(b) for b in input) + class IncrementalEncoder(codecs.IncrementalEncoder): def __init__(self, errors='strict'): @@ -57,18 +62,21 @@ class IncrementalEncoder(codecs.IncrementalEncoder): self.state = state return serial.to_bytes(encoded) + class IncrementalDecoder(codecs.IncrementalDecoder): def decode(self, input, final=False): return ''.join('{:02X} '.format(b) for b in input) + class StreamWriter(Codec, codecs.StreamWriter): pass + class StreamReader(Codec, codecs.StreamReader): pass -### encodings module API +### encodings module API def getregentry(): return codecs.CodecInfo( name='hexlify', diff --git a/serial/tools/list_ports.py b/serial/tools/list_ports.py index c5896c5..3b32b92 100644 --- a/serial/tools/list_ports.py +++ b/serial/tools/list_ports.py @@ -17,23 +17,24 @@ Additionally a grep function is supplied that can be used to search for ports based on their descriptions or hardware ID. """ -import sys, os, re +import sys +import os +import re # chose an implementation, depending on os #~ if sys.platform == 'cli': #~ else: -import os -# chose an implementation, depending on os -if os.name == 'nt': #sys.platform == 'win32': - from serial.tools.list_ports_windows import * +if os.name == 'nt': # sys.platform == 'win32': + from serial.tools.list_ports_windows import comports elif os.name == 'posix': - from serial.tools.list_ports_posix import * + from serial.tools.list_ports_posix import comports #~ elif os.name == 'java': else: raise ImportError("Sorry: no implementation for your platform ('%s') available" % (os.name,)) # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + def grep(regexp): """\ Search for ports using a regular expression. Port name, description and @@ -52,21 +53,23 @@ def main(): parser = argparse.ArgumentParser(description='Serial port enumeration') - parser.add_argument('regexp', + parser.add_argument( + 'regexp', nargs='?', help='only show ports that match this regex') - parser.add_argument('-v', '--verbose', + parser.add_argument( + '-v', '--verbose', action='store_true', help='show more messages') - parser.add_argument('-q', '--quiet', + parser.add_argument( + '-q', '--quiet', action='store_true', help='suppress all messages') args = parser.parse_args() - hits = 0 # get iteraror w/ or w/o filter if args.regexp: diff --git a/serial/tools/list_ports_linux.py b/serial/tools/list_ports_linux.py index f7491e5..3e37d72 100644 --- a/serial/tools/list_ports_linux.py +++ b/serial/tools/list_ports_linux.py @@ -15,7 +15,7 @@ import glob import os
import re
import subprocess
-import sys
+
def read_command(argv):
"""run a command and return its output"""
@@ -24,6 +24,7 @@ def read_command(argv): except subprocess.SubprocessError as e:
raise IOError('command %r failed: %s' % (argv, e))
+
def read_line(*args):
"""\
Helper function to read a single line from a file.
@@ -37,6 +38,7 @@ def read_line(*args): except IOError:
return None
+
def re_group(regexp, text):
"""search for regexp in text, return 1st group on match"""
m = re.search(regexp, text)
@@ -63,6 +65,7 @@ def usb_sysfs_hw_string(sysfs_path): snr_txt
)
+
def usb_lsusb_string(sysfs_path):
base = os.path.basename(os.path.realpath(sysfs_path))
bus = base.split('-')[0]
@@ -82,6 +85,7 @@ def usb_lsusb_string(sysfs_path): # create descriptions. prefer text from device, fall back to the others
return '%s %s %s' % (iManufacturer or idVendor, iProduct or idProduct, iSerial)
+
def describe(device):
"""\
Get a human readable description.
@@ -106,6 +110,7 @@ def describe(device): return read_line(product_name_file)
return base
+
def hwinfo(device):
"""Try to get a HW identification using sysfs"""
base = os.path.basename(device)
@@ -126,6 +131,7 @@ def hwinfo(device): return usb_sysfs_hw_string(sys_dev_path + '/..')
return 'n/a' # XXX directly remove these from the list?
+
def comports():
devices = glob.glob('/dev/ttyS*') # built-in serial ports
devices.extend(glob.glob('/dev/ttyUSB*')) # usb-serial with own driver
@@ -138,4 +144,3 @@ def comports(): if __name__ == '__main__':
for port, desc, hwid in sorted(comports()):
print("%s: %s [%s]" % (port, desc, hwid))
-
diff --git a/serial/tools/list_ports_posix.py b/serial/tools/list_ports_posix.py index a1a3faf..9a9b05d 100644 --- a/serial/tools/list_ports_posix.py +++ b/serial/tools/list_ports_posix.py @@ -27,7 +27,7 @@ import os # try to detect the OS so that a device can be selected...
plat = sys.platform.lower()
-if plat[:5] == 'linux': # Linux (confirmed)
+if plat[:5] == 'linux': # Linux (confirmed)
from serial.tools.list_ports_linux import comports
elif plat == 'cygwin': # cygwin/win32
@@ -85,6 +85,7 @@ elif plat[:3] == 'aix': # AIX else:
# platform detection has failed...
+ import serial
sys.stderr.write("""\
don't know how to enumerate ttys on this system.
! I you know how the serial ports are named send this information to
@@ -103,4 +104,3 @@ this module running... if __name__ == '__main__':
for port, desc, hwid in sorted(comports()):
print("%s: %s [%s]" % (port, desc, hwid))
-
diff --git a/serial/tools/list_ports_windows.py b/serial/tools/list_ports_windows.py index c285dde..838e85d 100644 --- a/serial/tools/list_ports_windows.py +++ b/serial/tools/list_ports_windows.py @@ -7,17 +7,8 @@ #
# SPDX-License-Identifier: BSD-3-Clause
-import ctypes
import re
-
-def ValidHandle(value, func, arguments):
- if value == 0:
- raise ctypes.WinError()
- return value
-
-import serial
-from serial.win32 import ULONG_PTR, is_64bit
-from ctypes.wintypes import HANDLE
+import ctypes
from ctypes.wintypes import BOOL
from ctypes.wintypes import HWND
from ctypes.wintypes import DWORD
@@ -27,6 +18,14 @@ from ctypes.wintypes import ULONG from ctypes.wintypes import LPCSTR
from ctypes.wintypes import HKEY
from ctypes.wintypes import BYTE
+import serial
+from serial.win32 import ULONG_PTR
+
+
+def ValidHandle(value, func, arguments):
+ if value == 0:
+ raise ctypes.WinError()
+ return value
NULL = 0
HDEVINFO = ctypes.c_void_p
@@ -45,11 +44,13 @@ def byte_buffer(length): """Get a buffer for a string"""
return (BYTE*length)()
+
def string(buffer):
s = []
for c in buffer:
- if c == 0: break
- s.append(chr(c & 0xff)) # "& 0xff": hack to convert signed to unsigned
+ if c == 0:
+ break
+ s.append(chr(c & 0xff)) # "& 0xff": hack to convert signed to unsigned
return ''.join(s)
@@ -60,6 +61,7 @@ class GUID(ctypes.Structure): ('Data3', WORD),
('Data4', BYTE*8),
]
+
def __str__(self):
return "{%08x-%04x-%04x-%s-%s}" % (
self.Data1,
@@ -69,6 +71,7 @@ class GUID(ctypes.Structure): ''.join(["%02x" % d for d in self.Data4[2:]]),
)
+
class SP_DEVINFO_DATA(ctypes.Structure):
_fields_ = [
('cbSize', DWORD),
@@ -76,6 +79,7 @@ class SP_DEVINFO_DATA(ctypes.Structure): ('DevInst', DWORD),
('Reserved', ULONG_PTR),
]
+
def __str__(self):
return "ClassGuid:%s DevInst:%s" % (self.ClassGuid, self.DevInst)
@@ -134,11 +138,12 @@ DIREG_DEV = 0x00000001 KEY_READ = 0x20019
# workaround for compatibility between Python 2.x and 3.x
-Ports = serial.to_bytes([80, 111, 114, 116, 115]) # "Ports"
-PortName = serial.to_bytes([80, 111, 114, 116, 78, 97, 109, 101]) # "PortName"
+Ports = serial.to_bytes([80, 111, 114, 116, 115]) # "Ports"
+PortName = serial.to_bytes([80, 111, 114, 116, 78, 97, 109, 101]) # "PortName"
+
def comports():
- GUIDs = (GUID*8)() # so far only seen one used, so hope 8 are enough...
+ GUIDs = (GUID*8)() # so far only seen one used, so hope 8 are enough...
guids_size = DWORD()
if not SetupDiClassGuidsFromName(
Ports,
@@ -153,7 +158,7 @@ def comports(): ctypes.byref(GUIDs[index]),
None,
NULL,
- DIGCF_PRESENT) # was DIGCF_PRESENT|DIGCF_DEVICEINTERFACE which misses CDC ports
+ DIGCF_PRESENT) # was DIGCF_PRESENT|DIGCF_DEVICEINTERFACE which misses CDC ports
devinfo = SP_DEVINFO_DATA()
devinfo.cbSize = ctypes.sizeof(devinfo)
@@ -244,8 +249,5 @@ def comports(): # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# test
if __name__ == '__main__':
- import serial
-
for port, desc, hwid in sorted(comports()):
print("%s: %s [%s]" % (port, desc, hwid))
-
diff --git a/serial/tools/miniterm.py b/serial/tools/miniterm.py index 513bb2a..711edaf 100644 --- a/serial/tools/miniterm.py +++ b/serial/tools/miniterm.py @@ -189,8 +189,8 @@ class NoTerminal(Transform): REPLACEMENT_MAP = dict((x, 0x2400 + x) for x in range(32) if unichr(x) not in '\r\n\b\t') REPLACEMENT_MAP.update({ - 0x7F: 0x2421, # DEL - 0x9B: 0x2425, # CSI + 0x7F: 0x2421, # DEL + 0x9B: 0x2425, # CSI }) def rx(self, text): @@ -204,9 +204,9 @@ class NoControls(NoTerminal): REPLACEMENT_MAP = dict((x, 0x2400 + x) for x in range(32)) REPLACEMENT_MAP.update({ - 32: 0x2423, # visual space - 0x7F: 0x2421, # DEL - 0x9B: 0x2425, # CSI + 32: 0x2423, # visual space + 0x7F: 0x2421, # DEL + 0x9B: 0x2425, # CSI }) @@ -276,8 +276,8 @@ TRANSFORMATIONS = { 'debug': DebugIO, } -# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - def dump_port_list(): if comports: sys.stderr.write('\n--- Available ports:\n') @@ -313,7 +313,6 @@ class Miniterm(object): self._reader_alive = False self.receiver_thread.join() - def start(self): self.alive = True self._start_reader() @@ -344,7 +343,6 @@ class Miniterm(object): self.output_encoding = encoding self.tx_encoder = codecs.getincrementalencoder(encoding)(errors) - def dump_port_settings(self): sys.stderr.write("\n--- Settings: {p.name} {p.baudrate},{p.bytesize},{p.parity},{p.stopbits}\n".format( p=self.serial)) @@ -386,13 +384,12 @@ class Miniterm(object): for transformation in self.rx_transformations: text = transformation.rx(text) self.console.write(text) - except serial.SerialException as e: + except serial.SerialException: self.alive = False # XXX would be nice if the writer could be interrupted at this # point... to exit completely raise - def writer(self): """\ Loop and copy console->serial until self.exit_character character is @@ -488,7 +485,7 @@ class Miniterm(object): self.update_transformations() sys.stderr.write('--- filters: {}\n'.format(' '.join(self.filters))) elif c == '\x0c': # CTRL+L -> EOL mode - modes = list(EOL_TRANSFORMATIONS) # keys + modes = list(EOL_TRANSFORMATIONS) # keys eol = modes.index(self.eol) + 1 if eol >= len(modes): eol = 0 @@ -637,7 +634,6 @@ class Miniterm(object): ) - # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - # default args can be used to override when calling main() from an other script # e.g to create a miniterm-my-device.py @@ -647,12 +643,14 @@ def main(default_port=None, default_baudrate=9600, default_rts=None, default_dtr parser = argparse.ArgumentParser( description="Miniterm - A simple terminal program for the serial port.") - parser.add_argument("port", + parser.add_argument( + "port", nargs='?', help="serial port name", default=default_port) - parser.add_argument("baudrate", + parser.add_argument( + "baudrate", nargs='?', type=int, help="set baud rate, default: %(default)s", @@ -660,72 +658,84 @@ def main(default_port=None, default_baudrate=9600, default_rts=None, default_dtr group = parser.add_argument_group("port settings") - group.add_argument("--parity", + group.add_argument( + "--parity", choices=['N', 'E', 'O', 'S', 'M'], type=lambda c: c.upper(), help="set parity, one of {N E O S M}, default: N", default='N') - group.add_argument("--rtscts", + group.add_argument( + "--rtscts", action="store_true", help="enable RTS/CTS flow control (default off)", default=False) - group.add_argument("--xonxoff", + group.add_argument( + "--xonxoff", action="store_true", help="enable software flow control (default off)", default=False) - group.add_argument("--rts", + group.add_argument( + "--rts", type=int, help="set initial RTS line state (possible values: 0, 1)", default=default_rts) - group.add_argument("--dtr", + group.add_argument( + "--dtr", type=int, help="set initial DTR line state (possible values: 0, 1)", default=default_dtr) group = parser.add_argument_group("data handling") - group.add_argument("-e", "--echo", + group.add_argument( + "-e", "--echo", action="store_true", help="enable local echo (default off)", default=False) - group.add_argument("--encoding", + group.add_argument( + "--encoding", dest="serial_port_encoding", metavar="CODEC", help="set the encoding for the serial port (e.g. hexlify, Latin1, UTF-8), default: %(default)s", default='UTF-8') - group.add_argument("-f", "--filter", + group.add_argument( + "-f", "--filter", action="append", metavar="NAME", help="add text transformation", default=[]) - group.add_argument("--eol", + group.add_argument( + "--eol", choices=['CR', 'LF', 'CRLF'], type=lambda c: c.upper(), help="end of line mode", default='CRLF') - group.add_argument("--raw", + group.add_argument( + "--raw", action="store_true", help="Do no apply any encodings/transformations", default=False) group = parser.add_argument_group("hotkeys") - group.add_argument("--exit-char", + group.add_argument( + "--exit-char", type=int, metavar='NUM', help="Unicode of special character that is used to exit the application, default: %(default)s", default=0x1d # GS/CTRL+] ) - group.add_argument("--menu-char", + group.add_argument( + "--menu-char", type=int, metavar='NUM', help="Unicode code of special character that is used to control miniterm (menu), default: %(default)s", @@ -734,12 +744,14 @@ def main(default_port=None, default_baudrate=9600, default_rts=None, default_dtr group = parser.add_argument_group("diagnostics") - group.add_argument("-q", "--quiet", + group.add_argument( + "-q", "--quiet", action="store_true", help="suppress non-error messages", default=False) - group.add_argument("--develop", + group.add_argument( + "--develop", action="store_true", help="show Python traceback on error", default=False) @@ -749,7 +761,6 @@ def main(default_port=None, default_baudrate=9600, default_rts=None, default_dtr if args.menu_char == args.exit_char: parser.error('--exit-char can not be the same as --menu-char') - # no port given on command line -> ask user now if args.port is None: dump_port_list() @@ -767,7 +778,6 @@ def main(default_port=None, default_baudrate=9600, default_rts=None, default_dtr else: filters = ['default'] - try: serial_instance = serial.serial_for_url( args.port, diff --git a/serial/urlhandler/protocol_hwgrep.py b/serial/urlhandler/protocol_hwgrep.py index 90626ff..e184ec5 100644 --- a/serial/urlhandler/protocol_hwgrep.py +++ b/serial/urlhandler/protocol_hwgrep.py @@ -20,6 +20,7 @@ try: except NameError: basestring = str # python 3 + class Serial(serial.Serial): """Just inherit the native Serial port implementation and patch the port property.""" @@ -43,8 +44,6 @@ class Serial(serial.Serial): # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - if __name__ == '__main__': - #~ s = Serial('hwgrep://ttyS0') s = Serial(None) s.port = 'hwgrep://ttyS0' print(s) - diff --git a/serial/urlhandler/protocol_loop.py b/serial/urlhandler/protocol_loop.py index 79d2c56..f5104b4 100644 --- a/serial/urlhandler/protocol_loop.py +++ b/serial/urlhandler/protocol_loop.py @@ -17,7 +17,6 @@ # - "debug" print diagnostic messages import logging import numbers -import threading import time try: import urlparse @@ -91,13 +90,6 @@ class Serial(SerialBase): if self.logger: self.logger.info('_reconfigure_port()') - def close(self): - """Close port""" - if self.is_open: - self.is_open = False - # in case of quick reconnects, give the server some time - time.sleep(0.3) - def from_url(self, url): """extract host and port from an URL string""" parts = urlparse.urlsplit(url) @@ -121,7 +113,8 @@ class Serial(SerialBase): @property def in_waiting(self): """Return the number of bytes currently in the input buffer.""" - if not self.is_open: raise portNotOpenError + if not self.is_open: + raise portNotOpenError if self.logger: # attention the logged value can differ from return value in # threaded environments... @@ -134,7 +127,8 @@ class Serial(SerialBase): return less characters as requested. With no timeout it will block until the requested number of bytes is read. """ - if not self.is_open: raise portNotOpenError + if not self.is_open: + raise portNotOpenError if self._timeout is not None: timeout = time.time() + self._timeout else: @@ -142,7 +136,7 @@ class Serial(SerialBase): data = bytearray() while size > 0 and self.is_open: try: - data += self.queue.get(timeout=self._timeout) # XXX inter char timeout + data += self.queue.get(timeout=self._timeout) # XXX inter char timeout except queue.Empty: break else: @@ -159,14 +153,15 @@ class Serial(SerialBase): connection is blocked. May raise SerialException if the connection is closed. """ - if not self.is_open: raise portNotOpenError + if not self.is_open: + raise portNotOpenError data = to_bytes(data) # calculate aprox time that would be used to send the data time_used_to_send = 10.0*len(data) / self._baudrate # when a write timeout is configured check if we would be successful # (not sending anything, not even the part that would have time) if self._write_timeout is not None and time_used_to_send > self._write_timeout: - time.sleep(self._write_timeout) # must wait so that unit test succeeds + time.sleep(self._write_timeout) # must wait so that unit test succeeds raise writeTimeoutError for byte in iterbytes(data): self.queue.put(byte, timeout=self._write_timeout) @@ -174,7 +169,8 @@ class Serial(SerialBase): def reset_input_buffer(self): """Clear input buffer, discarding all that is in the buffer.""" - if not self.is_open: raise portNotOpenError + if not self.is_open: + raise portNotOpenError if self.logger: self.logger.info('reset_input_buffer()') try: @@ -188,7 +184,8 @@ class Serial(SerialBase): Clear output buffer, aborting the current output and discarding all that is in the buffer. """ - if not self.is_open: raise portNotOpenError + if not self.is_open: + raise portNotOpenError if self.logger: self.logger.info('reset_output_buffer()') try: @@ -218,7 +215,8 @@ class Serial(SerialBase): @property def cts(self): """Read terminal status line: Clear To Send""" - if not self.is_open: raise portNotOpenError + if not self.is_open: + raise portNotOpenError if self.logger: self.logger.info('CTS -> state of RTS (%r)' % (self._rts_state,)) return self._rts_state @@ -233,7 +231,8 @@ class Serial(SerialBase): @property def ri(self): """Read terminal status line: Ring Indicator""" - if not self.is_open: raise portNotOpenError + if not self.is_open: + raise portNotOpenError if self.logger: self.logger.info('returning dummy for RI') return False @@ -241,7 +240,8 @@ class Serial(SerialBase): @property def cd(self): """Read terminal status line: Carrier Detect""" - if not self.is_open: raise portNotOpenError + if not self.is_open: + raise portNotOpenError if self.logger: self.logger.info('returning dummy for CD') return True diff --git a/serial/urlhandler/protocol_socket.py b/serial/urlhandler/protocol_socket.py index 99e532f..a4297bb 100644 --- a/serial/urlhandler/protocol_socket.py +++ b/serial/urlhandler/protocol_socket.py @@ -27,7 +27,7 @@ try: except ImportError: import urllib.parse as urlparse -from serial.serialutil import * +from serial.serialutil import SerialBase, SerialException, portNotOpenError, to_bytes # map log level names to constants. used in from_url() LOGGER_LEVELS = { @@ -39,6 +39,7 @@ LOGGER_LEVELS = { POLL_TIMEOUT = 2 + class Serial(SerialBase): """Serial port implementation for plain sockets.""" @@ -61,7 +62,7 @@ class Serial(SerialBase): self._socket = None raise SerialException("Could not open port %s: %s" % (self.portstr, msg)) - self._socket.settimeout(POLL_TIMEOUT) # used for write timeout support :/ + self._socket.settimeout(POLL_TIMEOUT) # used for write timeout support :/ # not that there anything to configure... self._reconfigure_port() @@ -116,7 +117,8 @@ class Serial(SerialBase): raise ValueError('unknown option: %r' % (option,)) # get host and port host, port = parts.hostname, parts.port - if not 0 <= port < 65536: raise ValueError("port not in range 0...65535") + if not 0 <= port < 65536: + raise ValueError("port not in range 0...65535") except ValueError as e: raise SerialException('expected a string in the form "socket://<host>:<port>[?logging={debug|info|warning|error}]": %s' % e) return (host, port) @@ -126,7 +128,8 @@ class Serial(SerialBase): @property def in_waiting(self): """Return the number of bytes currently in the input buffer.""" - if not self.is_open: raise portNotOpenError + if not self.is_open: + raise portNotOpenError # Poll the socket to see if it is ready for reading. # If ready, at least one byte will be to read. lr, lw, lx = select.select([self._socket], [], [], 0) @@ -138,7 +141,8 @@ class Serial(SerialBase): return less characters as requested. With no timeout it will block until the requested number of bytes is read. """ - if not self.is_open: raise portNotOpenError + if not self.is_open: + raise portNotOpenError data = bytearray() if self._timeout is not None: timeout = time.time() + self._timeout @@ -171,7 +175,8 @@ class Serial(SerialBase): connection is blocked. May raise SerialException if the connection is closed. """ - if not self.is_open: raise portNotOpenError + if not self.is_open: + raise portNotOpenError try: self._socket.sendall(to_bytes(data)) except socket.error as e: @@ -181,7 +186,8 @@ class Serial(SerialBase): def reset_input_buffer(self): """Clear input buffer, discarding all that is in the buffer.""" - if not self.is_open: raise portNotOpenError + if not self.is_open: + raise portNotOpenError if self.logger: self.logger.info('ignored reset_input_buffer') @@ -190,7 +196,8 @@ class Serial(SerialBase): Clear output buffer, aborting the current output and discarding all that is in the buffer. """ - if not self.is_open: raise portNotOpenError + if not self.is_open: + raise portNotOpenError if self.logger: self.logger.info('ignored reset_output_buffer') @@ -199,7 +206,8 @@ class Serial(SerialBase): Send break condition. Timed, returns to idle state after given duration. """ - if not self.is_open: raise portNotOpenError + if not self.is_open: + raise portNotOpenError if self.logger: self.logger.info('ignored send_break(%r)' % (duration,)) @@ -222,7 +230,8 @@ class Serial(SerialBase): @property def cts(self): """Read terminal status line: Clear To Send""" - if not self.is_open: raise portNotOpenError + if not self.is_open: + raise portNotOpenError if self.logger: self.logger.info('returning dummy for cts') return True @@ -230,7 +239,8 @@ class Serial(SerialBase): @property def dsr(self): """Read terminal status line: Data Set Ready""" - if not self.is_open: raise portNotOpenError + if not self.is_open: + raise portNotOpenError if self.logger: self.logger.info('returning dummy for dsr') return True @@ -238,7 +248,8 @@ class Serial(SerialBase): @property def ri(self): """Read terminal status line: Ring Indicator""" - if not self.is_open: raise portNotOpenError + if not self.is_open: + raise portNotOpenError if self.logger: self.logger.info('returning dummy for ri') return False @@ -246,7 +257,8 @@ class Serial(SerialBase): @property def cd(self): """Read terminal status line: Carrier Detect""" - if not self.is_open: raise portNotOpenError + if not self.is_open: + raise portNotOpenError if self.logger: self.logger.info('returning dummy for cd)') return True diff --git a/serial/urlhandler/protocol_spy.py b/serial/urlhandler/protocol_spy.py index 9a7c75c..647b760 100644 --- a/serial/urlhandler/protocol_spy.py +++ b/serial/urlhandler/protocol_spy.py @@ -263,4 +263,3 @@ if __name__ == '__main__': s = Serial(None) s.port = 'spy:///dev/ttyS0' print(s) - diff --git a/serial/win32.py b/serial/win32.py index 88a1335..c8dd3e3 100644 --- a/serial/win32.py +++ b/serial/win32.py @@ -8,14 +8,15 @@ from ctypes import * from ctypes.wintypes import HANDLE from ctypes.wintypes import BOOL from ctypes.wintypes import LPCWSTR -_stdcall_libraries = {} -_stdcall_libraries['kernel32'] = WinDLL('kernel32') from ctypes.wintypes import DWORD from ctypes.wintypes import WORD from ctypes.wintypes import BYTE +_stdcall_libraries = {} +_stdcall_libraries['kernel32'] = WinDLL('kernel32') INVALID_HANDLE_VALUE = HANDLE(-1).value + # some details of the windows API differ between 32 and 64 bit systems.. def is_64bit(): """Returns true when running on a 64 bit system""" @@ -45,7 +46,7 @@ except AttributeError: CreateEventA = _stdcall_libraries['kernel32'].CreateEventA CreateEventA.restype = HANDLE CreateEventA.argtypes = [LPSECURITY_ATTRIBUTES, BOOL, BOOL, LPCSTR] - CreateEvent=CreateEventA + CreateEvent = CreateEventA CreateFileA = _stdcall_libraries['kernel32'].CreateFileA CreateFileA.restype = HANDLE @@ -54,27 +55,35 @@ except AttributeError: else: CreateEventW.restype = HANDLE CreateEventW.argtypes = [LPSECURITY_ATTRIBUTES, BOOL, BOOL, LPCWSTR] - CreateEvent = CreateEventW # alias + CreateEvent = CreateEventW # alias CreateFileW = _stdcall_libraries['kernel32'].CreateFileW CreateFileW.restype = HANDLE CreateFileW.argtypes = [LPCWSTR, DWORD, DWORD, LPSECURITY_ATTRIBUTES, DWORD, DWORD, HANDLE] - CreateFile = CreateFileW # alias + CreateFile = CreateFileW # alias + class _OVERLAPPED(Structure): pass + OVERLAPPED = _OVERLAPPED + class _COMSTAT(Structure): pass + COMSTAT = _COMSTAT + class _DCB(Structure): pass + DCB = _DCB + class _COMMTIMEOUTS(Structure): pass + COMMTIMEOUTS = _COMMTIMEOUTS GetLastError = _stdcall_libraries['kernel32'].GetLastError @@ -166,66 +175,70 @@ WaitForSingleObject = _stdcall_libraries['kernel32'].WaitForSingleObject WaitForSingleObject.restype = DWORD WaitForSingleObject.argtypes = [HANDLE, DWORD] -ONESTOPBIT = 0 # Variable c_int -TWOSTOPBITS = 2 # Variable c_int +ONESTOPBIT = 0 # Variable c_int +TWOSTOPBITS = 2 # Variable c_int ONE5STOPBITS = 1 -NOPARITY = 0 # Variable c_int -ODDPARITY = 1 # Variable c_int -EVENPARITY = 2 # Variable c_int +NOPARITY = 0 # Variable c_int +ODDPARITY = 1 # Variable c_int +EVENPARITY = 2 # Variable c_int MARKPARITY = 3 SPACEPARITY = 4 -RTS_CONTROL_HANDSHAKE = 2 # Variable c_int -RTS_CONTROL_DISABLE = 0 # Variable c_int -RTS_CONTROL_ENABLE = 1 # Variable c_int -RTS_CONTROL_TOGGLE = 3 # Variable c_int +RTS_CONTROL_HANDSHAKE = 2 # Variable c_int +RTS_CONTROL_DISABLE = 0 # Variable c_int +RTS_CONTROL_ENABLE = 1 # Variable c_int +RTS_CONTROL_TOGGLE = 3 # Variable c_int SETRTS = 3 CLRRTS = 4 -DTR_CONTROL_HANDSHAKE = 2 # Variable c_int -DTR_CONTROL_DISABLE = 0 # Variable c_int -DTR_CONTROL_ENABLE = 1 # Variable c_int +DTR_CONTROL_HANDSHAKE = 2 # Variable c_int +DTR_CONTROL_DISABLE = 0 # Variable c_int +DTR_CONTROL_ENABLE = 1 # Variable c_int SETDTR = 5 CLRDTR = 6 -MS_DSR_ON = 32 # Variable c_ulong -EV_RING = 256 # Variable c_int -EV_PERR = 512 # Variable c_int -EV_ERR = 128 # Variable c_int -SETXOFF = 1 # Variable c_int -EV_RXCHAR = 1 # Variable c_int -GENERIC_WRITE = 1073741824 # Variable c_long -PURGE_TXCLEAR = 4 # Variable c_int -FILE_FLAG_OVERLAPPED = 1073741824 # Variable c_int -EV_DSR = 16 # Variable c_int -MAXDWORD = 4294967295 # Variable c_uint -EV_RLSD = 32 # Variable c_int -ERROR_IO_PENDING = 997 # Variable c_long -MS_CTS_ON = 16 # Variable c_ulong -EV_EVENT1 = 2048 # Variable c_int -EV_RX80FULL = 1024 # Variable c_int -PURGE_RXABORT = 2 # Variable c_int -FILE_ATTRIBUTE_NORMAL = 128 # Variable c_int -PURGE_TXABORT = 1 # Variable c_int -SETXON = 2 # Variable c_int -OPEN_EXISTING = 3 # Variable c_int -MS_RING_ON = 64 # Variable c_ulong -EV_TXEMPTY = 4 # Variable c_int -EV_RXFLAG = 2 # Variable c_int -MS_RLSD_ON = 128 # Variable c_ulong -GENERIC_READ = 2147483648 # Variable c_ulong -EV_EVENT2 = 4096 # Variable c_int -EV_CTS = 8 # Variable c_int -EV_BREAK = 64 # Variable c_int -PURGE_RXCLEAR = 8 # Variable c_int +MS_DSR_ON = 32 # Variable c_ulong +EV_RING = 256 # Variable c_int +EV_PERR = 512 # Variable c_int +EV_ERR = 128 # Variable c_int +SETXOFF = 1 # Variable c_int +EV_RXCHAR = 1 # Variable c_int +GENERIC_WRITE = 1073741824 # Variable c_long +PURGE_TXCLEAR = 4 # Variable c_int +FILE_FLAG_OVERLAPPED = 1073741824 # Variable c_int +EV_DSR = 16 # Variable c_int +MAXDWORD = 4294967295 # Variable c_uint +EV_RLSD = 32 # Variable c_int +ERROR_IO_PENDING = 997 # Variable c_long +MS_CTS_ON = 16 # Variable c_ulong +EV_EVENT1 = 2048 # Variable c_int +EV_RX80FULL = 1024 # Variable c_int +PURGE_RXABORT = 2 # Variable c_int +FILE_ATTRIBUTE_NORMAL = 128 # Variable c_int +PURGE_TXABORT = 1 # Variable c_int +SETXON = 2 # Variable c_int +OPEN_EXISTING = 3 # Variable c_int +MS_RING_ON = 64 # Variable c_ulong +EV_TXEMPTY = 4 # Variable c_int +EV_RXFLAG = 2 # Variable c_int +MS_RLSD_ON = 128 # Variable c_ulong +GENERIC_READ = 2147483648 # Variable c_ulong +EV_EVENT2 = 4096 # Variable c_int +EV_CTS = 8 # Variable c_int +EV_BREAK = 64 # Variable c_int +PURGE_RXCLEAR = 8 # Variable c_int INFINITE = 0xFFFFFFFF class N11_OVERLAPPED4DOLLAR_48E(Union): pass + + class N11_OVERLAPPED4DOLLAR_484DOLLAR_49E(Structure): pass + + N11_OVERLAPPED4DOLLAR_484DOLLAR_49E._fields_ = [ ('Offset', DWORD), ('OffsetHigh', DWORD), |