summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Liechti <cliechti@gmx.net>2015-08-30 21:28:04 +0200
committerChris Liechti <cliechti@gmx.net>2015-08-30 21:28:04 +0200
commit033f17c9d5c40fe8b5e55b6ada1d483b612ca092 (patch)
tree5ada169ba7c6a8036357904772075467cda8d4ea
parent518b0d31aafdb9ea52a47eb850490d652af2ad96 (diff)
downloadpyserial-git-033f17c9d5c40fe8b5e55b6ada1d483b612ca092.tar.gz
pep-8 and small cleanups
-rw-r--r--serial/__init__.py21
-rw-r--r--serial/aio.py3
-rw-r--r--serial/rfc2217.py136
-rw-r--r--serial/rs485.py8
-rw-r--r--serial/serialposix.py227
-rw-r--r--serial/serialutil.py92
-rw-r--r--serial/serialwin32.py124
-rw-r--r--serial/tools/hexlify_codec.py10
-rw-r--r--serial/tools/list_ports.py23
-rw-r--r--serial/tools/list_ports_linux.py9
-rw-r--r--serial/tools/list_ports_posix.py4
-rw-r--r--serial/tools/list_ports_windows.py40
-rw-r--r--serial/tools/miniterm.py70
-rw-r--r--serial/urlhandler/protocol_hwgrep.py3
-rw-r--r--serial/urlhandler/protocol_loop.py36
-rw-r--r--serial/urlhandler/protocol_socket.py38
-rw-r--r--serial/urlhandler/protocol_spy.py1
-rw-r--r--serial/win32.py107
18 files changed, 528 insertions, 424 deletions
diff --git a/serial/__init__.py b/serial/__init__.py
index f40ed3e..6d4d6c7 100644
--- a/serial/__init__.py
+++ b/serial/__init__.py
@@ -7,22 +7,25 @@
#
# SPDX-License-Identifier: BSD-3-Clause
-VERSION = '3.0a'
-
import importlib
import sys
+from serial.serialutil import *
+#~ SerialBase, SerialException, to_bytes, iterbytes
+
+VERSION = '3.0a'
+
if sys.platform == 'cli':
- from serial.serialcli import *
+ from serial.serialcli import Serial
else:
import os
# chose an implementation, depending on os
- if os.name == 'nt': #sys.platform == 'win32':
- from serial.serialwin32 import *
+ if os.name == 'nt': # sys.platform == 'win32':
+ from serial.serialwin32 import Serial
elif os.name == 'posix':
- from serial.serialposix import *
+ from serial.serialposix import Serial, PosixPollSerial
elif os.name == 'java':
- from serial.serialjava import *
+ from serial.serialjava import Serial
else:
raise ImportError("Sorry: no implementation for your platform ('%s') available" % (os.name,))
@@ -31,6 +34,7 @@ protocol_handler_packages = [
'serial.urlhandler',
]
+
def serial_for_url(url, *args, **kwargs):
"""\
Get an instance of the Serial class, depending on port/url. The port is not
@@ -48,7 +52,8 @@ def serial_for_url(url, *args, **kwargs):
"""
# check remove extra parameter to not confuse the Serial class
do_open = 'do_not_open' not in kwargs or not kwargs['do_not_open']
- if 'do_not_open' in kwargs: del kwargs['do_not_open']
+ if 'do_not_open' in kwargs:
+ del kwargs['do_not_open']
# the default is to use the native version
klass = Serial # 'native' implementation
# check port type and get class
diff --git a/serial/aio.py b/serial/aio.py
index 476959a..7e16545 100644
--- a/serial/aio.py
+++ b/serial/aio.py
@@ -17,6 +17,8 @@ implementation. It should be possible to get that working though.
"""
import asyncio
import serial
+import logger
+
class SerialTransport(asyncio.Transport):
def __init__(self, loop, protocol, serial_instance):
@@ -80,6 +82,7 @@ class SerialTransport(asyncio.Transport):
#~ def write_eof(self):
#~ def abort(self):
+
@asyncio.coroutine
def create_serial_connection(loop, protocol_factory, *args, **kwargs):
ser = serial.Serial(*args, **kwargs)
diff --git a/serial/rfc2217.py b/serial/rfc2217.py
index 24a090c..3ddc33d 100644
--- a/serial/rfc2217.py
+++ b/serial/rfc2217.py
@@ -17,9 +17,9 @@
# conditional
# - write timeout not implemented at all
-##############################################################################
+# ###########################################################################
# observations and issues with servers
-#=============================================================================
+# ===========================================================================
# sredird V2.2.1
# - http://www.ibiblio.org/pub/Linux/system/serial/ sredird-2.2.2.tar.gz
# - does not acknowledge SET_CONTROL (RTS/DTR) correctly, always responding
@@ -28,11 +28,11 @@
# numbers than 2**32?
# - To get the signature [COM_PORT_OPTION 0] has to be sent.
# - run a server: while true; do nc -l -p 7000 -c "sredird debug /dev/ttyUSB0 /var/lock/sredir"; done
-#=============================================================================
+# ===========================================================================
# telnetcpcd (untested)
# - http://ftp.wayne.edu/kermit/sredird/telnetcpcd-1.09.tar.gz
# - To get the signature [COM_PORT_OPTION] w/o data has to be sent.
-#=============================================================================
+# ===========================================================================
# ser2net
# - does not negotiate BINARY or COM_PORT_OPTION for his side but at least
# acknowledges that the client activates these options
@@ -44,7 +44,7 @@
# - To get the signature [COM_PORT_OPTION 0] has to be sent.
# - run a server: run ser2net daemon, in /etc/ser2net.conf:
# 2000:telnet:0:/dev/ttyS0:9600 remctl banner
-##############################################################################
+# ###########################################################################
# How to identify ports? pySerial might want to support other protocols in the
# future, so lets use an URL scheme.
@@ -74,7 +74,8 @@ try:
except ImportError:
import queue as Queue
-from serial.serialutil import *
+import serial
+from serial.serialutil import SerialBase, SerialException, to_bytes, iterbytes, portNotOpenError
# port string is expected to be something like this:
# rfc2217://host:port
@@ -91,22 +92,22 @@ LOGGER_LEVELS = {
# telnet protocol characters
-SE = b'\xf0' # Subnegotiation End
+SE = b'\xf0' # Subnegotiation End
NOP = b'\xf1' # No Operation
-DM = b'\xf2' # Data Mark
+DM = b'\xf2' # Data Mark
BRK = b'\xf3' # Break
-IP = b'\xf4' # Interrupt process
-AO = b'\xf5' # Abort output
+IP = b'\xf4' # Interrupt process
+AO = b'\xf5' # Abort output
AYT = b'\xf6' # Are You There
-EC = b'\xf7' # Erase Character
-EL = b'\xf8' # Erase Line
-GA = b'\xf9' # Go Ahead
-SB = b'\xfa' # Subnegotiation Begin
+EC = b'\xf7' # Erase Character
+EL = b'\xf8' # Erase Line
+GA = b'\xf9' # Go Ahead
+SB = b'\xfa' # Subnegotiation Begin
WILL = b'\xfb'
WONT = b'\xfc'
-DO = b'\xfd'
+DO = b'\xfd'
DONT = b'\xfe'
-IAC = b'\xff' # Interpret As Command
+IAC = b'\xff' # Interpret As Command
IAC_DOUBLED = b'\xff\xff'
# selected telnet options
@@ -204,20 +205,20 @@ PURGE_BOTH_BUFFERS = b'\x03' # Purge both the access server receive data
RFC2217_PARITY_MAP = {
- PARITY_NONE: 1,
- PARITY_ODD: 2,
- PARITY_EVEN: 3,
- PARITY_MARK: 4,
- PARITY_SPACE: 5,
+ serial.PARITY_NONE: 1,
+ serial.PARITY_ODD: 2,
+ serial.PARITY_EVEN: 3,
+ serial.PARITY_MARK: 4,
+ serial.PARITY_SPACE: 5,
}
-RFC2217_REVERSE_PARITY_MAP = dict((v,k) for k,v in RFC2217_PARITY_MAP.items())
+RFC2217_REVERSE_PARITY_MAP = dict((v, k) for k, v in RFC2217_PARITY_MAP.items())
RFC2217_STOPBIT_MAP = {
- STOPBITS_ONE: 1,
- STOPBITS_ONE_POINT_FIVE: 3,
- STOPBITS_TWO: 2,
+ serial.STOPBITS_ONE: 1,
+ serial.STOPBITS_ONE_POINT_FIVE: 3,
+ serial.STOPBITS_TWO: 2,
}
-RFC2217_REVERSE_STOPBIT_MAP = dict((v,k) for k,v in RFC2217_STOPBIT_MAP.items())
+RFC2217_REVERSE_STOPBIT_MAP = dict((v, k) for k, v in RFC2217_STOPBIT_MAP.items())
# Telnet filter states
M_NORMAL = 0
@@ -230,6 +231,7 @@ ACTIVE = 'ACTIVE'
INACTIVE = 'INACTIVE'
REALLY_INACTIVE = 'REALLY_INACTIVE'
+
class TelnetOption(object):
"""Manage a single telnet option, keeps track of DO/DONT WILL/WONT."""
@@ -306,7 +308,8 @@ class TelnetSubnegotiation(object):
"""
def __init__(self, connection, name, option, ack_option=None):
- if ack_option is None: ack_option = option
+ if ack_option is None:
+ ack_option = option
self.connection = connection
self.name = name
self.option = option
@@ -395,7 +398,7 @@ class Serial(SerialBase):
self._socket = None
raise SerialException("Could not open port %s: %s" % (self.portstr, msg))
- self._socket.settimeout(5) # XXX good value?
+ self._socket.settimeout(5) # XXX good value?
# use a thread save queue as buffer. it also simplifies implementing
# the read timeout
@@ -553,7 +556,8 @@ class Serial(SerialBase):
raise ValueError('unknown option: %r' % (option,))
# get host and port
host, port = parts.hostname, parts.port
- if not 0 <= port < 65536: raise ValueError("port not in range 0...65535")
+ if not 0 <= port < 65536:
+ raise ValueError("port not in range 0...65535")
except ValueError as e:
raise SerialException('expected a string in the form "rfc2217://<host>:<port>[?option[&option...]]": %s' % e)
return (host, port)
@@ -563,7 +567,8 @@ class Serial(SerialBase):
@property
def in_waiting(self):
"""Return the number of bytes currently in the input buffer."""
- if not self.is_open: raise portNotOpenError
+ if not self.is_open:
+ raise portNotOpenError
return self._read_buffer.qsize()
def read(self, size=1):
@@ -572,14 +577,15 @@ class Serial(SerialBase):
return less characters as requested. With no timeout it will block
until the requested number of bytes is read.
"""
- if not self.is_open: raise portNotOpenError
+ if not self.is_open:
+ raise portNotOpenError
data = bytearray()
try:
while len(data) < size:
if self._thread is None:
raise SerialException('connection failed (reader thread died)')
data += self._read_buffer.get(True, self._timeout)
- except Queue.Empty: # -> timeout
+ except Queue.Empty: # -> timeout
pass
return bytes(data)
@@ -589,7 +595,8 @@ class Serial(SerialBase):
connection is blocked. May raise SerialException if the connection is
closed.
"""
- if not self.is_open: raise portNotOpenError
+ if not self.is_open:
+ raise portNotOpenError
with self._write_lock:
try:
self._socket.sendall(to_bytes(data).replace(IAC, IAC_DOUBLED))
@@ -599,7 +606,8 @@ class Serial(SerialBase):
def reset_input_buffer(self):
"""Clear input buffer, discarding all that is in the buffer."""
- if not self.is_open: raise portNotOpenError
+ if not self.is_open:
+ raise portNotOpenError
self.rfc2217SendPurge(PURGE_RECEIVE_BUFFER)
# empty read buffer
while self._read_buffer.qsize():
@@ -610,7 +618,8 @@ class Serial(SerialBase):
Clear output buffer, aborting the current output and
discarding all that is in the buffer.
"""
- if not self.is_open: raise portNotOpenError
+ if not self.is_open:
+ raise portNotOpenError
self.rfc2217SendPurge(PURGE_TRANSMIT_BUFFER)
def _update_break_state(self):
@@ -618,7 +627,8 @@ class Serial(SerialBase):
Set break: Controls TXD. When active, to transmitting is
possible.
"""
- if not self.is_open: raise portNotOpenError
+ if not self.is_open:
+ raise portNotOpenError
if self.logger:
self.logger.info('set BREAK to %s' % ('active' if self._break_state else 'inactive'))
if self._break_state:
@@ -628,7 +638,8 @@ class Serial(SerialBase):
def _update_rts_state(self):
"""Set terminal status line: Request To Send."""
- if not self.is_open: raise portNotOpenError
+ if not self.is_open:
+ raise portNotOpenError
if self.logger:
self.logger.info('set RTS to %s' % ('active' if self._rts_state else 'inactive'))
if self._rts_state:
@@ -638,7 +649,8 @@ class Serial(SerialBase):
def _update_dtr_state(self, level=True):
"""Set terminal status line: Data Terminal Ready."""
- if not self.is_open: raise portNotOpenError
+ if not self.is_open:
+ raise portNotOpenError
if self.logger:
self.logger.info('set DTR to %s' % ('active' if self._dtr_state else 'inactive'))
if self._dtr_state:
@@ -649,25 +661,29 @@ class Serial(SerialBase):
@property
def cts(self):
"""Read terminal status line: Clear To Send."""
- if not self.is_open: raise portNotOpenError
+ if not self.is_open:
+ raise portNotOpenError
return bool(self.getModemState() & MODEMSTATE_MASK_CTS)
@property
def dsr(self):
"""Read terminal status line: Data Set Ready."""
- if not self.is_open: raise portNotOpenError
+ if not self.is_open:
+ raise portNotOpenError
return bool(self.getModemState() & MODEMSTATE_MASK_DSR)
@property
def ri(self):
"""Read terminal status line: Ring Indicator."""
- if not self.is_open: raise portNotOpenError
+ if not self.is_open:
+ raise portNotOpenError
return bool(self.getModemState() & MODEMSTATE_MASK_RI)
@property
def cd(self):
"""Read terminal status line: Carrier Detect."""
- if not self.is_open: raise portNotOpenError
+ if not self.is_open:
+ raise portNotOpenError
return bool(self.getModemState() & MODEMSTATE_MASK_CD)
# - - - platform specific - - -
@@ -692,7 +708,8 @@ class Serial(SerialBase):
if self.logger:
self.logger.debug("socket error in reader thread: %s" % (e,))
break
- if not data: break # lost connection
+ if not data:
+ break # lost connection
for byte in iterbytes(data):
if mode == M_NORMAL:
# interpret as command or as data
@@ -731,7 +748,7 @@ class Serial(SerialBase):
# other telnet commands
self._telnetProcessCommand(byte)
mode = M_NORMAL
- elif mode == M_NEGOTIATE: # DO, DONT, WILL, WONT was received, option now following
+ elif mode == M_NEGOTIATE: # DO, DONT, WILL, WONT was received, option now following
self._telnetNegotiateOption(telnet_command, byte)
mode = M_NORMAL
finally:
@@ -766,16 +783,15 @@ class Serial(SerialBase):
if self.logger:
self.logger.warning("rejected Telnet option: %r" % (option,))
-
def _telnetProcessSubnegotiation(self, suboption):
"""Process subnegotiation, the data between IAC SB and IAC SE."""
if suboption[0:1] == COM_PORT_OPTION:
if suboption[1:2] == SERVER_NOTIFY_LINESTATE and len(suboption) >= 3:
- self._linestate = ord(suboption[2:3]) # ensure it is a number
+ self._linestate = ord(suboption[2:3]) # ensure it is a number
if self.logger:
self.logger.info("NOTIFY_LINESTATE: %s" % self._linestate)
elif suboption[1:2] == SERVER_NOTIFY_MODEMSTATE and len(suboption) >= 3:
- self._modemstate = ord(suboption[2:3]) # ensure it is a number
+ self._modemstate = ord(suboption[2:3]) # ensure it is a number
if self.logger:
self.logger.info("NOTIFY_MODEMSTATE: %s" % self._modemstate)
# update time when we think that a poll would make sense
@@ -815,12 +831,12 @@ class Serial(SerialBase):
def rfc2217SendPurge(self, value):
item = self._rfc2217_options['purge']
- item.set(value) # transmit desired purge type
- item.wait(self._network_timeout) # wait for acknowledge from the server
+ item.set(value) # transmit desired purge type
+ item.wait(self._network_timeout) # wait for acknowledge from the server
def rfc2217SetControl(self, value):
item = self._rfc2217_options['control']
- item.set(value) # transmit desired control type
+ item.set(value) # transmit desired control type
if self._ignore_set_control_answer:
# answers are ignored when option is set. compatibility mode for
# servers that answer, but not the expected one... (or no answer
@@ -957,7 +973,7 @@ class PortManager(object):
(self.serial.getRI() and MODEMSTATE_MASK_RI) |
(self.serial.getCD() and MODEMSTATE_MASK_CD))
# check what has changed
- deltas = modemstate ^ (self.last_modemstate or 0) # when last is None -> 0
+ deltas = modemstate ^ (self.last_modemstate or 0) # when last is None -> 0
if deltas & MODEMSTATE_MASK_CTS:
modemstate |= MODEMSTATE_MASK_CTS_CHANGE
if deltas & MODEMSTATE_MASK_DSR:
@@ -1053,7 +1069,7 @@ class PortManager(object):
# other telnet commands
self._telnetProcessCommand(byte)
self.mode = M_NORMAL
- elif self.mode == M_NEGOTIATE: # DO, DONT, WILL, WONT was received, option now following
+ elif self.mode == M_NEGOTIATE: # DO, DONT, WILL, WONT was received, option now following
self._telnetNegotiateOption(self.telnet_command, byte)
self.mode = M_NORMAL
@@ -1084,7 +1100,6 @@ class PortManager(object):
if self.logger:
self.logger.warning("rejected Telnet option: %r" % (option,))
-
def _telnetProcessSubnegotiation(self, suboption):
"""Process subnegotiation, the data between IAC SB and IAC SE."""
if suboption[0:1] == COM_PORT_OPTION:
@@ -1179,7 +1194,7 @@ class PortManager(object):
elif suboption[2:3] == SET_CONTROL_REQ_BREAK_STATE:
if self.logger:
self.logger.warning("requested break state - not implemented")
- pass # XXX needs cached value
+ pass # XXX needs cached value
elif suboption[2:3] == SET_CONTROL_BREAK_ON:
self.serial.setBreak(True)
if self.logger:
@@ -1193,7 +1208,7 @@ class PortManager(object):
elif suboption[2:3] == SET_CONTROL_REQ_DTR:
if self.logger:
self.logger.warning("requested DTR state - not implemented")
- pass # XXX needs cached value
+ pass # XXX needs cached value
elif suboption[2:3] == SET_CONTROL_DTR_ON:
self.serial.setDTR(True)
if self.logger:
@@ -1207,7 +1222,7 @@ class PortManager(object):
elif suboption[2:3] == SET_CONTROL_REQ_RTS:
if self.logger:
self.logger.warning("requested RTS state - not implemented")
- pass # XXX needs cached value
+ pass # XXX needs cached value
#~ self.rfc2217SendSubnegotiation(SERVER_SET_CONTROL, SET_CONTROL_RTS_ON)
elif suboption[2:3] == SET_CONTROL_RTS_ON:
self.serial.setRTS(True)
@@ -1245,11 +1260,11 @@ class PortManager(object):
self.logger.info("resume")
self._remote_suspend_flow = False
elif suboption[1:2] == SET_LINESTATE_MASK:
- self.linstate_mask = ord(suboption[2:3]) # ensure it is a number
+ self.linstate_mask = ord(suboption[2:3]) # ensure it is a number
if self.logger:
self.logger.info("line state mask: 0x%02x" % (self.linstate_mask,))
elif suboption[1:2] == SET_MODEMSTATE_MASK:
- self.modemstate_mask = ord(suboption[2:3]) # ensure it is a number
+ self.modemstate_mask = ord(suboption[2:3]) # ensure it is a number
if self.logger:
self.logger.info("modem state mask: 0x%02x" % (self.modemstate_mask,))
elif suboption[1:2] == PURGE_DATA:
@@ -1286,13 +1301,8 @@ if __name__ == '__main__':
s = Serial('rfc2217://localhost:7000', 115200)
sys.stdout.write('%s\n' % s)
- #~ s.baudrate = 1898
-
sys.stdout.write("write...\n")
s.write(b"hello\n")
s.flush()
sys.stdout.write("read: %s\n" % s.read(5))
-
- #~ s.baudrate = 19200
- #~ s.databits = 7
s.close()
diff --git a/serial/rs485.py b/serial/rs485.py
index 6f2145b..1a2d4d4 100644
--- a/serial/rs485.py
+++ b/serial/rs485.py
@@ -15,8 +15,10 @@ NOTE: Some implementations may only support a subset of the settings.
import time
import serial
+
class RS485Settings(object):
- def __init__(self,
+ def __init__(
+ self,
rts_level_for_tx=True,
rts_level_for_rx=False,
loopback=False,
@@ -57,7 +59,6 @@ class RS485(serial.Serial):
super(RS485, self).__init__(*args, **kwargs)
self._alternate_rs485_settings = None
-
def write(self, b):
"""Write to port, controlling RTS before and after transmitting."""
if self._alternate_rs485_settings is not None:
@@ -75,7 +76,6 @@ class RS485(serial.Serial):
else:
super(RS485, self).write(b)
-
# redirect where the property stores the settings so that underlying Serial
# instance does not see them
@property
@@ -89,5 +89,3 @@ class RS485(serial.Serial):
@rs485_mode.setter
def rs485_mode(self, rs485_settings):
self._alternate_rs485_settings = rs485_settings
-
-
diff --git a/serial/serialposix.py b/serial/serialposix.py
index 79eaa66..9064c6a 100644
--- a/serial/serialposix.py
+++ b/serial/serialposix.py
@@ -21,7 +21,9 @@ import struct
import sys
import termios
import time
-from serial.serialutil import *
+
+import serial
+from serial.serialutil import SerialBase, SerialException, to_bytes, portNotOpenError, writeTimeoutError
class PlatformSpecificBase(object):
@@ -41,7 +43,7 @@ also add the device name of the serial port and where the
counting starts for the first serial port.
e.g. 'first serial port: /dev/ttyS0'
and with a bit luck you can get this module running...
-""" % (sys.platform, os.name, VERSION))
+""" % (sys.platform, os.name, serial.VERSION))
raise NotImplementedError('no number-to-device mapping defined on this platform')
def _set_special_baudrate(self, baudrate):
@@ -55,7 +57,7 @@ and with a bit luck you can get this module running...
# for the platform
plat = sys.platform.lower()
-if plat[:5] == 'linux': # Linux (confirmed)
+if plat[:5] == 'linux': # Linux (confirmed)
import array
# baudrate ioctls
@@ -66,11 +68,10 @@ if plat[:5] == 'linux': # Linux (confirmed)
# RS485 ioctls
TIOCGRS485 = 0x542E
TIOCSRS485 = 0x542F
- SER_RS485_ENABLED = 0b00000001
- SER_RS485_RTS_ON_SEND = 0b00000010
+ SER_RS485_ENABLED = 0b00000001
+ SER_RS485_RTS_ON_SEND = 0b00000010
SER_RS485_RTS_AFTER_SEND = 0b00000100
- SER_RS485_RX_DURING_TX = 0b00010000
-
+ SER_RS485_RX_DURING_TX = 0b00010000
class PlatformSpecific(PlatformSpecificBase):
BAUDRATE_CONSTANTS = {
@@ -122,12 +123,12 @@ if plat[:5] == 'linux': # Linux (confirmed)
buf[9] = buf[10] = baudrate
# set serial_struct
- res = fcntl.ioctl(self.fd, TCSETS2, buf)
+ fcntl.ioctl(self.fd, TCSETS2, buf)
except IOError as e:
raise ValueError('Failed to set custom baud rate (%s): %s' % (baudrate, e))
def _set_rs485_mode(self, rs485_settings):
- buf = array.array('i', [0] * 8) # flags, delaytx, delayrx, padding
+ buf = array.array('i', [0] * 8) # flags, delaytx, delayrx, padding
try:
fcntl.ioctl(self.fd, TIOCGRS485, buf)
if rs485_settings is not None:
@@ -147,7 +148,7 @@ if plat[:5] == 'linux': # Linux (confirmed)
buf[2] = int(rs485_settings.delay_rts_after_send * 1000)
else:
buf[0] = 0 # clear SER_RS485_ENABLED
- res = fcntl.ioctl(self.fd, TIOCSRS485, buf)
+ fcntl.ioctl(self.fd, TIOCSRS485, buf)
except IOError as e:
raise ValueError('Failed to set RS485 mode: %s' % (e,))
@@ -185,7 +186,7 @@ elif plat[:3] == 'bsd' or plat[:7] == 'freebsd':
elif plat[:6] == 'darwin': # OS X
import array
- IOSSIOSPEED = 0x80045402 #_IOW('T', 2, speed_t)
+ IOSSIOSPEED = 0x80045402 # _IOW('T', 2, speed_t)
class PlatformSpecific(PlatformSpecificBase):
def number_to_device(self, port_number):
@@ -208,7 +209,7 @@ elif plat[:6] == 'netbsd': # NetBSD 1.6 testing by Erk
elif plat[:4] == 'irix': # IRIX (partially tested)
class PlatformSpecific(PlatformSpecificBase):
def number_to_device(self, port_number):
- return '/dev/ttyf%d' % (port_number + 1,) #XXX different device names depending on flow control
+ return '/dev/ttyf%d' % (port_number + 1,) # XXX different device names depending on flow control
elif plat[:2] == 'hp': # HP-UX (not tested)
class PlatformSpecific(PlatformSpecificBase):
@@ -235,44 +236,44 @@ else:
# load some constants for later use.
# try to use values from termios, use defaults from linux otherwise
-TIOCMGET = getattr(termios, 'TIOCMGET', 0x5415)
-TIOCMBIS = getattr(termios, 'TIOCMBIS', 0x5416)
-TIOCMBIC = getattr(termios, 'TIOCMBIC', 0x5417)
-TIOCMSET = getattr(termios, 'TIOCMSET', 0x5418)
+TIOCMGET = getattr(termios, 'TIOCMGET', 0x5415)
+TIOCMBIS = getattr(termios, 'TIOCMBIS', 0x5416)
+TIOCMBIC = getattr(termios, 'TIOCMBIC', 0x5417)
+TIOCMSET = getattr(termios, 'TIOCMSET', 0x5418)
-#TIOCM_LE = getattr(termios, 'TIOCM_LE', 0x001)
+# TIOCM_LE = getattr(termios, 'TIOCM_LE', 0x001)
TIOCM_DTR = getattr(termios, 'TIOCM_DTR', 0x002)
TIOCM_RTS = getattr(termios, 'TIOCM_RTS', 0x004)
-#TIOCM_ST = getattr(termios, 'TIOCM_ST', 0x008)
-#TIOCM_SR = getattr(termios, 'TIOCM_SR', 0x010)
+# TIOCM_ST = getattr(termios, 'TIOCM_ST', 0x008)
+# TIOCM_SR = getattr(termios, 'TIOCM_SR', 0x010)
TIOCM_CTS = getattr(termios, 'TIOCM_CTS', 0x020)
TIOCM_CAR = getattr(termios, 'TIOCM_CAR', 0x040)
TIOCM_RNG = getattr(termios, 'TIOCM_RNG', 0x080)
TIOCM_DSR = getattr(termios, 'TIOCM_DSR', 0x100)
-TIOCM_CD = getattr(termios, 'TIOCM_CD', TIOCM_CAR)
-TIOCM_RI = getattr(termios, 'TIOCM_RI', TIOCM_RNG)
-#TIOCM_OUT1 = getattr(termios, 'TIOCM_OUT1', 0x2000)
-#TIOCM_OUT2 = getattr(termios, 'TIOCM_OUT2', 0x4000)
+TIOCM_CD = getattr(termios, 'TIOCM_CD', TIOCM_CAR)
+TIOCM_RI = getattr(termios, 'TIOCM_RI', TIOCM_RNG)
+# TIOCM_OUT1 = getattr(termios, 'TIOCM_OUT1', 0x2000)
+# TIOCM_OUT2 = getattr(termios, 'TIOCM_OUT2', 0x4000)
if hasattr(termios, 'TIOCINQ'):
TIOCINQ = termios.TIOCINQ
else:
TIOCINQ = getattr(termios, 'FIONREAD', 0x541B)
-TIOCOUTQ = getattr(termios, 'TIOCOUTQ', 0x5411)
+TIOCOUTQ = getattr(termios, 'TIOCOUTQ', 0x5411)
TIOCM_zero_str = struct.pack('I', 0)
TIOCM_RTS_str = struct.pack('I', TIOCM_RTS)
TIOCM_DTR_str = struct.pack('I', TIOCM_DTR)
-TIOCSBRK = getattr(termios, 'TIOCSBRK', 0x5427)
-TIOCCBRK = getattr(termios, 'TIOCCBRK', 0x5428)
+TIOCSBRK = getattr(termios, 'TIOCSBRK', 0x5427)
+TIOCCBRK = getattr(termios, 'TIOCCBRK', 0x5428)
-CMSPAR = 0o10000000000 # Use "stick" (mark/space) parity
+CMSPAR = 0o10000000000 # Use "stick" (mark/space) parity
class Serial(SerialBase, PlatformSpecific):
"""\
- Serial port class POSIX implementation. Serial port configuration is
+ Serial port class POSIX implementation. Serial port configuration is
done with termios and fcntl. Runs on Linux and many other Un*x like
systems.
"""
@@ -288,7 +289,7 @@ class Serial(SerialBase, PlatformSpecific):
self.fd = None
# open
try:
- self.fd = os.open(self.portstr, os.O_RDWR|os.O_NOCTTY|os.O_NONBLOCK)
+ self.fd = os.open(self.portstr, os.O_RDWR | os.O_NOCTTY | os.O_NONBLOCK)
except OSError as msg:
self.fd = None
raise SerialException(msg.errno, "could not open port %s: %s" % (self._port, msg))
@@ -313,7 +314,6 @@ class Serial(SerialBase, PlatformSpecific):
self._update_rts_state()
self.reset_input_buffer()
-
def _reconfigure_port(self):
"""Set communication parameters on opened port."""
if self.fd is None:
@@ -330,15 +330,16 @@ class Serial(SerialBase, PlatformSpecific):
except termios.error as msg: # if a port is nonexistent but has a /dev file, it'll fail here
raise SerialException("Could not configure port: %s" % msg)
# set up raw mode / no echo / binary
- cflag |= (termios.CLOCAL|termios.CREAD)
- lflag &= ~(termios.ICANON|termios.ECHO|termios.ECHOE|termios.ECHOK|termios.ECHONL|
- termios.ISIG|termios.IEXTEN) #|termios.ECHOPRT
- for flag in ('ECHOCTL', 'ECHOKE'): # netbsd workaround for Erk
+ cflag |= (termios.CLOCAL | termios.CREAD)
+ lflag &= ~(termios.ICANON | termios.ECHO | termios.ECHOE |
+ termios.ECHOK | termios.ECHONL |
+ termios.ISIG | termios.IEXTEN) # |termios.ECHOPRT
+ for flag in ('ECHOCTL', 'ECHOKE'): # netbsd workaround for Erk
if hasattr(termios, flag):
lflag &= ~getattr(termios, flag)
- oflag &= ~(termios.OPOST|termios.ONLCR|termios.OCRNL)
- iflag &= ~(termios.INLCR|termios.IGNCR|termios.ICRNL|termios.IGNBRK)
+ oflag &= ~(termios.OPOST | termios.ONLCR | termios.OCRNL)
+ iflag &= ~(termios.INLCR | termios.IGNCR | termios.ICRNL | termios.IGNBRK)
if hasattr(termios, 'IUCLC'):
iflag &= ~termios.IUCLC
if hasattr(termios, 'PARMRK'):
@@ -355,7 +356,7 @@ class Serial(SerialBase, PlatformSpecific):
# may need custom baud rate, it isn't in our list.
ispeed = ospeed = getattr(termios, 'B38400')
try:
- custom_baud = int(self._baudrate) # store for later
+ custom_baud = int(self._baudrate) # store for later
except ValueError:
raise ValueError('Invalid baud rate: %r' % self._baudrate)
else:
@@ -375,27 +376,27 @@ class Serial(SerialBase, PlatformSpecific):
else:
raise ValueError('Invalid char len: %r' % self._bytesize)
# setup stop bits
- if self._stopbits == STOPBITS_ONE:
+ if self._stopbits == serial.STOPBITS_ONE:
cflag &= ~(termios.CSTOPB)
- elif self._stopbits == STOPBITS_ONE_POINT_FIVE:
- cflag |= (termios.CSTOPB) # XXX same as TWO.. there is no POSIX support for 1.5
- elif self._stopbits == STOPBITS_TWO:
- cflag |= (termios.CSTOPB)
+ elif self._stopbits == serial.STOPBITS_ONE_POINT_FIVE:
+ cflag |= (termios.CSTOPB) # XXX same as TWO.. there is no POSIX support for 1.5
+ elif self._stopbits == serial.STOPBITS_TWO:
+ cflag |= (termios.CSTOPB)
else:
raise ValueError('Invalid stop bit specification: %r' % self._stopbits)
# setup parity
- iflag &= ~(termios.INPCK|termios.ISTRIP)
- if self._parity == PARITY_NONE:
- cflag &= ~(termios.PARENB|termios.PARODD)
- elif self._parity == PARITY_EVEN:
+ iflag &= ~(termios.INPCK | termios.ISTRIP)
+ if self._parity == serial.PARITY_NONE:
+ cflag &= ~(termios.PARENB | termios.PARODD)
+ elif self._parity == serial.PARITY_EVEN:
cflag &= ~(termios.PARODD)
- cflag |= (termios.PARENB)
- elif self._parity == PARITY_ODD:
- cflag |= (termios.PARENB|termios.PARODD)
- elif self._parity == PARITY_MARK and plat[:5] == 'linux':
- cflag |= (termios.PARENB|CMSPAR|termios.PARODD)
- elif self._parity == PARITY_SPACE and plat[:5] == 'linux':
- cflag |= (termios.PARENB|CMSPAR)
+ cflag |= (termios.PARENB)
+ elif self._parity == serial.PARITY_ODD:
+ cflag |= (termios.PARENB | termios.PARODD)
+ elif self._parity == serial.PARITY_MARK and plat[:5] == 'linux':
+ cflag |= (termios.PARENB | CMSPAR | termios.PARODD)
+ elif self._parity == serial.PARITY_SPACE and plat[:5] == 'linux':
+ cflag |= (termios.PARENB | CMSPAR)
cflag &= ~(termios.PARODD)
else:
raise ValueError('Invalid parity: %r' % self._parity)
@@ -403,23 +404,23 @@ class Serial(SerialBase, PlatformSpecific):
# xonxoff
if hasattr(termios, 'IXANY'):
if self._xonxoff:
- iflag |= (termios.IXON|termios.IXOFF) #|termios.IXANY)
+ iflag |= (termios.IXON | termios.IXOFF) # |termios.IXANY)
else:
- iflag &= ~(termios.IXON|termios.IXOFF|termios.IXANY)
+ iflag &= ~(termios.IXON | termios.IXOFF | termios.IXANY)
else:
if self._xonxoff:
- iflag |= (termios.IXON|termios.IXOFF)
+ iflag |= (termios.IXON | termios.IXOFF)
else:
- iflag &= ~(termios.IXON|termios.IXOFF)
+ iflag &= ~(termios.IXON | termios.IXOFF)
# rtscts
if hasattr(termios, 'CRTSCTS'):
if self._rtscts:
- cflag |= (termios.CRTSCTS)
+ cflag |= (termios.CRTSCTS)
else:
cflag &= ~(termios.CRTSCTS)
elif hasattr(termios, 'CNEW_RTSCTS'): # try it with alternate constant name
if self._rtscts:
- cflag |= (termios.CNEW_RTSCTS)
+ cflag |= (termios.CNEW_RTSCTS)
else:
cflag &= ~(termios.CNEW_RTSCTS)
# XXX should there be a warning if setting up rtscts (and xonxoff etc) fails??
@@ -435,7 +436,10 @@ class Serial(SerialBase, PlatformSpecific):
cc[termios.VTIME] = vtime
# activate settings
if [iflag, oflag, cflag, lflag, ispeed, ospeed, cc] != orig_attr:
- termios.tcsetattr(self.fd, termios.TCSANOW, [iflag, oflag, cflag, lflag, ispeed, ospeed, cc])
+ termios.tcsetattr(
+ self.fd,
+ termios.TCSANOW,
+ [iflag, oflag, cflag, lflag, ispeed, ospeed, cc])
# apply custom baud rate, if any
if custom_baud is not None:
@@ -459,7 +463,7 @@ class Serial(SerialBase, PlatformSpecific):
"""Return the number of bytes currently in the input buffer."""
#~ s = fcntl.ioctl(self.fd, termios.FIONREAD, TIOCM_zero_str)
s = fcntl.ioctl(self.fd, TIOCINQ, TIOCM_zero_str)
- return struct.unpack('I',s)[0]
+ return struct.unpack('I', s)[0]
# select based implementation, proved to work on many systems
def read(self, size=1):
@@ -468,18 +472,19 @@ class Serial(SerialBase, PlatformSpecific):
return less characters as requested. With no timeout it will block
until the requested number of bytes is read.
"""
- if not self.is_open: raise portNotOpenError
+ if not self.is_open:
+ raise portNotOpenError
read = bytearray()
while len(read) < size:
try:
- ready,_,_ = select.select([self.fd],[],[], self._timeout)
+ ready, _, _ = select.select([self.fd], [], [], self._timeout)
# If select was used with a timeout, and the timeout occurs, it
# returns with empty lists -> thus abort read operation.
- # For timeout == 0 (non-blocking operation) also abort when there
- # is nothing to read.
+ # For timeout == 0 (non-blocking operation) also abort when
+ # there is nothing to read.
if not ready:
break # timeout
- buf = os.read(self.fd, size-len(read))
+ buf = os.read(self.fd, size - len(read))
# read should always return some data as select reported it was
# ready to read when we get to this point.
if not buf:
@@ -489,8 +494,8 @@ class Serial(SerialBase, PlatformSpecific):
raise SerialException('device reports readiness to read but returned no data (device disconnected or multiple access on port?)')
read.extend(buf)
except OSError as e:
- # this is for Python 3.x where select.error is a subclass of OSError
- # ignore EAGAIN errors. all other errors are shown
+ # this is for Python 3.x where select.error is a subclass of
+ # OSError ignore EAGAIN errors. all other errors are shown
if e.errno != errno.EAGAIN:
raise SerialException('read failed: %s' % (e,))
except select.error as e:
@@ -503,7 +508,8 @@ class Serial(SerialBase, PlatformSpecific):
def write(self, data):
"""Output the given byte string over the serial port."""
- if not self.is_open: raise portNotOpenError
+ if not self.is_open:
+ raise portNotOpenError
d = to_bytes(data)
tx_len = len(d)
if self._write_timeout is not None and self._write_timeout > 0:
@@ -541,12 +547,14 @@ class Serial(SerialBase, PlatformSpecific):
Flush of file like objects. In this case, wait until all data
is written.
"""
- if not self.is_open: raise portNotOpenError
+ if not self.is_open:
+ raise portNotOpenError
termios.tcdrain(self.fd)
def reset_input_buffer(self):
"""Clear input buffer, discarding all that is in the buffer."""
- if not self.is_open: raise portNotOpenError
+ if not self.is_open:
+ raise portNotOpenError
termios.tcflush(self.fd, termios.TCIFLUSH)
def reset_output_buffer(self):
@@ -554,7 +562,8 @@ class Serial(SerialBase, PlatformSpecific):
Clear output buffer, aborting the current output and discarding all
that is in the buffer.
"""
- if not self.is_open: raise portNotOpenError
+ if not self.is_open:
+ raise portNotOpenError
termios.tcflush(self.fd, termios.TCOFLUSH)
def send_break(self, duration=0.25):
@@ -562,8 +571,9 @@ class Serial(SerialBase, PlatformSpecific):
Send break condition. Timed, returns to idle state after given
duration.
"""
- if not self.is_open: raise portNotOpenError
- termios.tcsendbreak(self.fd, int(duration/0.25))
+ if not self.is_open:
+ raise portNotOpenError
+ termios.tcsendbreak(self.fd, int(duration / 0.25))
def _update_break_state(self):
"""\
@@ -591,30 +601,34 @@ class Serial(SerialBase, PlatformSpecific):
@property
def cts(self):
"""Read terminal status line: Clear To Send"""
- if not self.is_open: raise portNotOpenError
+ if not self.is_open:
+ raise portNotOpenError
s = fcntl.ioctl(self.fd, TIOCMGET, TIOCM_zero_str)
- return struct.unpack('I',s)[0] & TIOCM_CTS != 0
+ return struct.unpack('I', s)[0] & TIOCM_CTS != 0
@property
def dsr(self):
"""Read terminal status line: Data Set Ready"""
- if not self.is_open: raise portNotOpenError
+ if not self.is_open:
+ raise portNotOpenError
s = fcntl.ioctl(self.fd, TIOCMGET, TIOCM_zero_str)
- return struct.unpack('I',s)[0] & TIOCM_DSR != 0
+ return struct.unpack('I', s)[0] & TIOCM_DSR != 0
@property
def ri(self):
"""Read terminal status line: Ring Indicator"""
- if not self.is_open: raise portNotOpenError
+ if not self.is_open:
+ raise portNotOpenError
s = fcntl.ioctl(self.fd, TIOCMGET, TIOCM_zero_str)
- return struct.unpack('I',s)[0] & TIOCM_RI != 0
+ return struct.unpack('I', s)[0] & TIOCM_RI != 0
@property
def cd(self):
"""Read terminal status line: Carrier Detect"""
- if not self.is_open: raise portNotOpenError
+ if not self.is_open:
+ raise portNotOpenError
s = fcntl.ioctl(self.fd, TIOCMGET, TIOCM_zero_str)
- return struct.unpack('I',s)[0] & TIOCM_CD != 0
+ return struct.unpack('I', s)[0] & TIOCM_CD != 0
# - - platform specific - - - -
@@ -623,11 +637,12 @@ class Serial(SerialBase, PlatformSpecific):
"""Return the number of bytes currently in the output buffer."""
#~ s = fcntl.ioctl(self.fd, termios.FIONREAD, TIOCM_zero_str)
s = fcntl.ioctl(self.fd, TIOCOUTQ, TIOCM_zero_str)
- return struct.unpack('I',s)[0]
+ return struct.unpack('I', s)[0]
def nonblocking(self):
"""internal - not portable!"""
- if not self.is_open: raise portNotOpenError
+ if not self.is_open:
+ raise portNotOpenError
fcntl.fcntl(self.fd, fcntl.F_SETFL, os.O_NONBLOCK)
def fileno(self):
@@ -635,7 +650,8 @@ class Serial(SerialBase, PlatformSpecific):
For easier use of the serial port instance with select.
WARNING: this function is not portable to different platforms!
"""
- if not self.is_open: raise portNotOpenError
+ if not self.is_open:
+ raise portNotOpenError
return self.fd
def set_input_flow_control(self, enable=True):
@@ -644,7 +660,8 @@ class Serial(SerialBase, PlatformSpecific):
This will send XON (true) or XOFF (false) to the other device.
WARNING: this function is not portable to different platforms!
"""
- if not self.is_open: raise portNotOpenError
+ if not self.is_open:
+ raise portNotOpenError
if enable:
termios.tcflow(self.fd, termios.TCION)
else:
@@ -656,14 +673,14 @@ class Serial(SerialBase, PlatformSpecific):
control is enabled.
WARNING: this function is not portable to different platforms!
"""
- if not self.is_open: raise portNotOpenError
+ if not self.is_open:
+ raise portNotOpenError
if enable:
termios.tcflow(self.fd, termios.TCOON)
else:
termios.tcflow(self.fd, termios.TCOOFF)
-
class PosixPollSerial(Serial):
"""\
Poll based read implementation. Not all systems support poll properly.
@@ -677,43 +694,43 @@ class PosixPollSerial(Serial):
return less characters as requested. With no timeout it will block
until the requested number of bytes is read.
"""
- if self.fd is None: raise portNotOpenError
+ if self.fd is None:
+ raise portNotOpenError
read = bytearray()
poll = select.poll()
- poll.register(self.fd, select.POLLIN|select.POLLERR|select.POLLHUP|select.POLLNVAL)
+ poll.register(self.fd, select.POLLIN | select.POLLERR | select.POLLHUP | select.POLLNVAL)
if size > 0:
while len(read) < size:
# print "\tread(): size",size, "have", len(read) #debug
# wait until device becomes ready to read (or something fails)
- for fd, event in poll.poll(self._timeout*1000):
- if event & (select.POLLERR|select.POLLHUP|select.POLLNVAL):
+ for fd, event in poll.poll(self._timeout * 1000):
+ if event & (select.POLLERR | select.POLLHUP | select.POLLNVAL):
raise SerialException('device reports error (poll)')
# we don't care if it is select.POLLIN or timeout, that's
# handled below
buf = os.read(self.fd, size - len(read))
read.extend(buf)
if ((self._timeout is not None and self._timeout >= 0) or
- (self._inter_byte_timeout is not None and self._inter_byte_timeout > 0)) and not buf:
+ (self._inter_byte_timeout is not None and self._inter_byte_timeout > 0)) and not buf:
break # early abort on timeout
return bytes(read)
if __name__ == '__main__':
s = Serial(0,
- baudrate=19200, # baud rate
- bytesize=EIGHTBITS, # number of data bits
- parity=PARITY_EVEN, # enable parity checking
- stopbits=STOPBITS_ONE, # number of stop bits
- timeout=3, # set a timeout value, None for waiting forever
- xonxoff=0, # enable software flow control
- rtscts=0, # enable RTS/CTS flow control
+ baudrate=19200, # baud rate
+ bytesize=serial.EIGHTBITS, # number of data bits
+ parity=serial.PARITY_EVEN, # enable parity checking
+ stopbits=serial.STOPBITS_ONE, # number of stop bits
+ timeout=3, # set a timeout value, None for waiting forever
+ xonxoff=0, # enable software flow control
+ rtscts=0, # enable RTS/CTS flow control
)
- s.setRTS(1)
- s.setDTR(1)
- s.flushInput()
- s.flushOutput()
+ s.rts = True
+ s.dtr = True
+ s.reset_input_buffer()
+ s.reset_output_buffer()
s.write('hello')
sys.stdout.write('%r\n' % s.read(5))
sys.stdout.write('%s\n' % s.inWaiting())
del s
-
diff --git a/serial/serialutil.py b/serial/serialutil.py
index 26e6059..ffc50a0 100644
--- a/serial/serialutil.py
+++ b/serial/serialutil.py
@@ -29,13 +29,14 @@ def iterbytes(b):
b = b.tobytes()
x = 0
while True:
- a = b[x:x+1]
+ a = b[x:x + 1]
x += 1
if a:
yield a
else:
break
+
# all Python versions prior 3.x convert ``str([17])`` to '[17]' instead of '\x11'
# so a simple ``bytes(sequence)`` doesn't work for all versions
def to_bytes(seq):
@@ -69,10 +70,10 @@ STOPBITS_ONE, STOPBITS_ONE_POINT_FIVE, STOPBITS_TWO = (1, 1.5, 2)
FIVEBITS, SIXBITS, SEVENBITS, EIGHTBITS = (5, 6, 7, 8)
PARITY_NAMES = {
- PARITY_NONE: 'None',
- PARITY_EVEN: 'Even',
- PARITY_ODD: 'Odd',
- PARITY_MARK: 'Mark',
+ PARITY_NONE: 'None',
+ PARITY_EVEN: 'Even',
+ PARITY_ODD: 'Odd',
+ PARITY_MARK: 'Mark',
PARITY_SPACE: 'Space',
}
@@ -105,7 +106,7 @@ class SerialBase(io.RawIOBase):
STOPBITS = (STOPBITS_ONE, STOPBITS_ONE_POINT_FIVE, STOPBITS_TWO)
def __init__(self,
- port = None, # number of device, numbering starts at
+ port=None, # number of device, numbering starts at
# zero. if everything fails, the user
# can specify a device string, note
# that this isn't portable anymore
@@ -113,13 +114,13 @@ class SerialBase(io.RawIOBase):
baudrate=9600, # baud rate
bytesize=EIGHTBITS, # number of data bits
parity=PARITY_NONE, # enable parity checking
- stopbits=STOPBITS_ONE, # number of stop bits
+ stopbits=STOPBITS_ONE, # number of stop bits
timeout=None, # set a timeout value, None to wait forever
xonxoff=False, # enable software flow control
rtscts=False, # enable RTS/CTS flow control
write_timeout=None, # set a timeout for writes
dsrdtr=False, # None: use rtscts setting, dsrdtr override if True or False
- inter_byte_timeout=None # Inter-character timeout, None to disable
+ inter_byte_timeout=None # Inter-character timeout, None to disable
):
"""\
Initialize comm port object. If a port is given, then the port will be
@@ -127,34 +128,34 @@ class SerialBase(io.RawIOBase):
is returned.
"""
- self.is_open = False
- self._port = None # correct value is assigned below through properties
- self._baudrate = None # correct value is assigned below through properties
- self._bytesize = None # correct value is assigned below through properties
- self._parity = None # correct value is assigned below through properties
- self._stopbits = None # correct value is assigned below through properties
- self._timeout = None # correct value is assigned below through properties
- self._write_timeout = None # correct value is assigned below through properties
- self._xonxoff = None # correct value is assigned below through properties
- self._rtscts = None # correct value is assigned below through properties
- self._dsrdtr = None # correct value is assigned below through properties
- self._inter_byte_timeout = None # correct value is assigned below through properties
- self._rs485_mode = None # disabled by default
+ self.is_open = False
+ self._port = None # correct value is assigned below through properties
+ self._baudrate = None # correct value is assigned below through properties
+ self._bytesize = None # correct value is assigned below through properties
+ self._parity = None # correct value is assigned below through properties
+ self._stopbits = None # correct value is assigned below through properties
+ self._timeout = None # correct value is assigned below through properties
+ self._write_timeout = None # correct value is assigned below through properties
+ self._xonxoff = None # correct value is assigned below through properties
+ self._rtscts = None # correct value is assigned below through properties
+ self._dsrdtr = None # correct value is assigned below through properties
+ self._inter_byte_timeout = None # correct value is assigned below through properties
+ self._rs485_mode = None # disabled by default
self._rts_state = True
self._dtr_state = True
self._break_state = False
# assign values using get/set methods using the properties feature
- self.port = port
+ self.port = port
self.baudrate = baudrate
self.bytesize = bytesize
- self.parity = parity
+ self.parity = parity
self.stopbits = stopbits
- self.timeout = timeout
+ self.timeout = timeout
self.write_timeout = write_timeout
- self.xonxoff = xonxoff
- self.rtscts = rtscts
- self.dsrdtr = dsrdtr
+ self.xonxoff = xonxoff
+ self.rtscts = rtscts
+ self.dsrdtr = dsrdtr
self.inter_character_timeout = inter_byte_timeout
if port is not None:
@@ -220,7 +221,8 @@ class SerialBase(io.RawIOBase):
@bytesize.setter
def bytesize(self, bytesize):
"""Change byte size."""
- if bytesize not in self.BYTESIZES: raise ValueError("Not a valid byte size: %r" % (bytesize,))
+ if bytesize not in self.BYTESIZES:
+ raise ValueError("Not a valid byte size: %r" % (bytesize,))
self._bytesize = bytesize
if self.is_open:
self._reconfigure_port()
@@ -235,7 +237,8 @@ class SerialBase(io.RawIOBase):
@parity.setter
def parity(self, parity):
"""Change parity setting."""
- if parity not in self.PARITIES: raise ValueError("Not a valid parity: %r" % (parity,))
+ if parity not in self.PARITIES:
+ raise ValueError("Not a valid parity: %r" % (parity,))
self._parity = parity
if self.is_open:
self._reconfigure_port()
@@ -250,7 +253,8 @@ class SerialBase(io.RawIOBase):
@stopbits.setter
def stopbits(self, stopbits):
"""Change stop bits size."""
- if stopbits not in self.STOPBITS: raise ValueError("Not a valid stop bit size: %r" % (stopbits,))
+ if stopbits not in self.STOPBITS:
+ raise ValueError("Not a valid stop bit size: %r" % (stopbits,))
self._stopbits = stopbits
if self.is_open:
self._reconfigure_port()
@@ -284,9 +288,10 @@ class SerialBase(io.RawIOBase):
def write_timeout(self, timeout):
"""Change timeout setting."""
if timeout is not None:
- if timeout < 0: raise ValueError("Not a valid timeout: %r" % (timeout,))
+ if timeout < 0:
+ raise ValueError("Not a valid timeout: %r" % (timeout,))
try:
- timeout + 1 #test if it's a number, will throw a TypeError if not...
+ timeout + 1 # test if it's a number, will throw a TypeError if not...
except TypeError:
raise ValueError("Not a valid timeout: %r" % timeout)
@@ -304,7 +309,8 @@ class SerialBase(io.RawIOBase):
def inter_byte_timeout(self, ic_timeout):
"""Change inter-byte timeout setting."""
if ic_timeout is not None:
- if ic_timeout < 0: raise ValueError("Not a valid timeout: %r" % ic_timeout)
+ if ic_timeout < 0:
+ raise ValueError("Not a valid timeout: %r" % ic_timeout)
try:
ic_timeout + 1 # test if it's a number, will throw a TypeError if not...
except TypeError:
@@ -409,14 +415,15 @@ class SerialBase(io.RawIOBase):
# - - - - - - - - - - - - - - - - - - - - - - - -
_SAVED_SETTINGS = ('baudrate', 'bytesize', 'parity', 'stopbits', 'xonxoff',
- 'dsrdtr', 'rtscts', 'timeout', 'write_timeout', 'inter_character_timeout')
+ 'dsrdtr', 'rtscts', 'timeout', 'write_timeout',
+ 'inter_character_timeout')
def get_settings(self):
"""\
Get current port settings as a dictionary. For use with
apply_settings().
"""
- return dict([(key, getattr(self, '_'+key)) for key in self._SAVED_SETTINGS])
+ return dict([(key, getattr(self, '_' + key)) for key in self._SAVED_SETTINGS])
def apply_settings(self, d):
"""\
@@ -425,7 +432,7 @@ class SerialBase(io.RawIOBase):
values will simply left unchanged.
"""
for key in self._SAVED_SETTINGS:
- if key in d and d[key] != getattr(self, '_'+key): # check against internal "_" value
+ if key in d and d[key] != getattr(self, '_' + key): # check against internal "_" value
setattr(self, key, d[key]) # set non "_" value to use properties write function
# - - - - - - - - - - - - - - - - - - - - - - - -
@@ -451,9 +458,15 @@ class SerialBase(io.RawIOBase):
# - - - - - - - - - - - - - - - - - - - - - - - -
# compatibility with io library
- def readable(self): return True
- def writable(self): return True
- def seekable(self): return False
+ def readable(self):
+ return True
+
+ def writable(self):
+ return True
+
+ def seekable(self):
+ return False
+
def readinto(self, b):
data = self.read(len(b))
n = len(data)
@@ -568,7 +581,6 @@ class SerialBase(io.RawIOBase):
break
return bytes(line)
-
def iread_until(self, *args, **kwargs):
"""\
Read lines, implemented as generator. It will raise StopIteration on
diff --git a/serial/serialwin32.py b/serial/serialwin32.py
index 9422373..0a736e7 100644
--- a/serial/serialwin32.py
+++ b/serial/serialwin32.py
@@ -13,7 +13,8 @@ import ctypes
import time
from serial import win32
-from serial.serialutil import *
+import serial
+from serial.serialutil import SerialBase, SerialException, to_bytes, portNotOpenError, writeTimeoutError
class Serial(SerialBase):
@@ -48,13 +49,14 @@ class Serial(SerialBase):
except ValueError:
# for like COMnotanumber
pass
- self._port_handle = win32.CreateFile(port,
- win32.GENERIC_READ | win32.GENERIC_WRITE,
- 0, # exclusive access
- None, # no security
- win32.OPEN_EXISTING,
- win32.FILE_ATTRIBUTE_NORMAL | win32.FILE_FLAG_OVERLAPPED,
- 0)
+ self._port_handle = win32.CreateFile(
+ port,
+ win32.GENERIC_READ | win32.GENERIC_WRITE,
+ 0, # exclusive access
+ None, # no security
+ win32.OPEN_EXISTING,
+ win32.FILE_ATTRIBUTE_NORMAL | win32.FILE_FLAG_OVERLAPPED,
+ 0)
if self._port_handle == win32.INVALID_HANDLE_VALUE:
self._port_handle = None # 'cause __del__ is called anyway
raise SerialException("could not open port %r: %r" % (self.portstr, ctypes.WinError()))
@@ -77,7 +79,8 @@ class Serial(SerialBase):
# Clear buffers:
# Remove anything that was there
- win32.PurgeComm(self._port_handle,
+ win32.PurgeComm(
+ self._port_handle,
win32.PURGE_TXCLEAR | win32.PURGE_TXABORT |
win32.PURGE_RXCLEAR | win32.PURGE_RXABORT)
except:
@@ -92,7 +95,6 @@ class Serial(SerialBase):
else:
self.is_open = True
-
def _reconfigure_port(self):
"""Set communication parameters on opened port."""
if not self._port_handle:
@@ -108,7 +110,7 @@ class Serial(SerialBase):
elif self._timeout == 0:
timeouts = (win32.MAXDWORD, 0, 0, 0, 0)
else:
- timeouts = (0, 0, int(self._timeout*1000), 0, 0)
+ timeouts = (0, 0, int(self._timeout * 1000), 0, 0)
if self._timeout != 0 and self._inter_byte_timeout is not None:
timeouts = (int(self._inter_byte_timeout * 1000),) + timeouts[1:]
@@ -117,7 +119,7 @@ class Serial(SerialBase):
elif self._write_timeout == 0:
timeouts = timeouts[:-2] + (0, win32.MAXDWORD)
else:
- timeouts = timeouts[:-2] + (0, int(self._write_timeout*1000))
+ timeouts = timeouts[:-2] + (0, int(self._write_timeout * 1000))
win32.SetCommTimeouts(self._port_handle, ctypes.byref(win32.COMMTIMEOUTS(*timeouts)))
win32.SetCommMask(self._port_handle, win32.EV_ERR)
@@ -128,45 +130,45 @@ class Serial(SerialBase):
win32.GetCommState(self._port_handle, ctypes.byref(comDCB))
comDCB.BaudRate = self._baudrate
- if self._bytesize == FIVEBITS:
+ if self._bytesize == serial.FIVEBITS:
comDCB.ByteSize = 5
- elif self._bytesize == SIXBITS:
+ elif self._bytesize == serial.SIXBITS:
comDCB.ByteSize = 6
- elif self._bytesize == SEVENBITS:
+ elif self._bytesize == serial.SEVENBITS:
comDCB.ByteSize = 7
- elif self._bytesize == EIGHTBITS:
+ elif self._bytesize == serial.EIGHTBITS:
comDCB.ByteSize = 8
else:
raise ValueError("Unsupported number of data bits: %r" % self._bytesize)
- if self._parity == PARITY_NONE:
+ if self._parity == serial.PARITY_NONE:
comDCB.Parity = win32.NOPARITY
- comDCB.fParity = 0 # Disable Parity Check
- elif self._parity == PARITY_EVEN:
+ comDCB.fParity = 0 # Disable Parity Check
+ elif self._parity == serial.PARITY_EVEN:
comDCB.Parity = win32.EVENPARITY
- comDCB.fParity = 1 # Enable Parity Check
- elif self._parity == PARITY_ODD:
+ comDCB.fParity = 1 # Enable Parity Check
+ elif self._parity == serial.PARITY_ODD:
comDCB.Parity = win32.ODDPARITY
- comDCB.fParity = 1 # Enable Parity Check
- elif self._parity == PARITY_MARK:
+ comDCB.fParity = 1 # Enable Parity Check
+ elif self._parity == serial.PARITY_MARK:
comDCB.Parity = win32.MARKPARITY
- comDCB.fParity = 1 # Enable Parity Check
- elif self._parity == PARITY_SPACE:
+ comDCB.fParity = 1 # Enable Parity Check
+ elif self._parity == serial.PARITY_SPACE:
comDCB.Parity = win32.SPACEPARITY
- comDCB.fParity = 1 # Enable Parity Check
+ comDCB.fParity = 1 # Enable Parity Check
else:
raise ValueError("Unsupported parity mode: %r" % self._parity)
- if self._stopbits == STOPBITS_ONE:
- comDCB.StopBits win32.ONESTOPBIT
- elif self._stopbits == STOPBITS_ONE_POINT_FIVE:
+ if self._stopbits == serial.STOPBITS_ONE:
+ comDCB.StopBits = win32.ONESTOPBIT
+ elif self._stopbits == serial.STOPBITS_ONE_POINT_FIVE:
comDCB.StopBits = win32.ONE5STOPBITS
- elif self._stopbits == STOPBITS_TWO:
+ elif self._stopbits == serial.STOPBITS_TWO:
comDCB.StopBits = win32.TWOSTOPBITS
else:
raise ValueError("Unsupported number of stop bits: %r" % self._stopbits)
- comDCB.fBinary = 1 # Enable Binary Transmission
+ comDCB.fBinary = 1 # Enable Binary Transmission
# Char. w/ Parity-Err are replaced with 0xff (if fErrorChar is set to TRUE)
if self._rs485_mode is None:
if self._rtscts:
@@ -180,38 +182,38 @@ class Serial(SerialBase):
if not self._rs485_mode.rts_level_for_tx:
raise ValueError(
'Unsupported value for RS485Settings.rts_level_for_tx: %r' % (
- self._rs485_mode.rts_level_for_tx,))
+ self._rs485_mode.rts_level_for_tx,))
if self._rs485_mode.rts_level_for_rx:
raise ValueError(
'Unsupported value for RS485Settings.rts_level_for_rx: %r' % (
- self._rs485_mode.rts_level_for_rx,))
+ self._rs485_mode.rts_level_for_rx,))
if self._rs485_mode.delay_before_tx is not None:
raise ValueError(
'Unsupported value for RS485Settings.delay_before_tx: %r' % (
- self._rs485_mode.delay_before_tx,))
+ self._rs485_mode.delay_before_tx,))
if self._rs485_mode.delay_before_rx is not None:
raise ValueError(
'Unsupported value for RS485Settings.delay_before_rx: %r' % (
- self._rs485_mode.delay_before_rx,))
+ self._rs485_mode.delay_before_rx,))
if self._rs485_mode.loopback:
raise ValueError(
'Unsupported value for RS485Settings.loopback: %r' % (
- self._rs485_mode.loopback,))
+ self._rs485_mode.loopback,))
comDCB.fRtsControl = win32.RTS_CONTROL_TOGGLE
comDCB.fOutxCtsFlow = 0
if self._dsrdtr:
- comDCB.fDtrControl = win32.DTR_CONTROL_HANDSHAKE
+ comDCB.fDtrControl = win32.DTR_CONTROL_HANDSHAKE
else:
- comDCB.fDtrControl = win32.DTR_CONTROL_ENABLE if self._dtr_state else win32.DTR_CONTROL_DISABLE
- comDCB.fOutxDsrFlow = self._dsrdtr
- comDCB.fOutX = self._xonxoff
- comDCB.fInX = self._xonxoff
- comDCB.fNull = 0
- comDCB.fErrorChar = 0
- comDCB.fAbortOnError = 0
- comDCB.XonChar = XON
- comDCB.XoffChar = XOFF
+ comDCB.fDtrControl = win32.DTR_CONTROL_ENABLE if self._dtr_state else win32.DTR_CONTROL_DISABLE
+ comDCB.fOutxDsrFlow = self._dsrdtr
+ comDCB.fOutX = self._xonxoff
+ comDCB.fInX = self._xonxoff
+ comDCB.fNull = 0
+ comDCB.fErrorChar = 0
+ comDCB.fAbortOnError = 0
+ comDCB.XonChar = serial.XON
+ comDCB.XoffChar = serial.XOFF
if not win32.SetCommState(self._port_handle, ctypes.byref(comDCB)):
raise ValueError("Cannot configure port, some setting was wrong. Original message: %r" % ctypes.WinError())
@@ -219,7 +221,6 @@ class Serial(SerialBase):
#~ def __del__(self):
#~ self.close()
-
def _close(self):
"""internal close port helper"""
if self._port_handle:
@@ -257,7 +258,8 @@ class Serial(SerialBase):
Read size bytes from the serial port. If a timeout is set it may
return less characters as requested. With no timeout it will block
until the requested number of bytes is read."""
- if not self._port_handle: raise portNotOpenError
+ if not self._port_handle:
+ raise portNotOpenError
if size > 0:
win32.ResetEvent(self._overlapped_read.hEvent)
flags = win32.DWORD()
@@ -290,7 +292,8 @@ class Serial(SerialBase):
def write(self, data):
"""Output the given byte string over the serial port."""
- if not self._port_handle: raise portNotOpenError
+ if not self._port_handle:
+ raise portNotOpenError
#~ if not isinstance(data, (bytes, bytearray)):
#~ raise TypeError('expected %s or bytearray, got %s' % (bytes, type(data)))
# convert data (needed in case of memoryview instance: Py 3.1 io lib), ctypes doesn't like memoryview
@@ -301,7 +304,7 @@ class Serial(SerialBase):
err = win32.WriteFile(self._port_handle, data, len(data), ctypes.byref(n), self._overlapped_write)
if not err and win32.GetLastError() != win32.ERROR_IO_PENDING:
raise SerialException("WriteFile failed (%r)" % ctypes.WinError())
- if self._write_timeout != 0: # if blocking (None) or w/ write timeout (>0)
+ if self._write_timeout != 0: # if blocking (None) or w/ write timeout (>0)
# Wait for the write to complete.
#~ win32.WaitForSingleObject(self._overlapped_write.hEvent, win32.INFINITE)
err = win32.GetOverlappedResult(self._port_handle, self._overlapped_write, ctypes.byref(n), True)
@@ -324,7 +327,8 @@ class Serial(SerialBase):
def reset_input_buffer(self):
"""Clear input buffer, discarding all that is in the buffer."""
- if not self._port_handle: raise portNotOpenError
+ if not self._port_handle:
+ raise portNotOpenError
win32.PurgeComm(self._port_handle, win32.PURGE_RXCLEAR | win32.PURGE_RXABORT)
def reset_output_buffer(self):
@@ -332,12 +336,14 @@ class Serial(SerialBase):
Clear output buffer, aborting the current output and discarding all
that is in the buffer.
"""
- if not self._port_handle: raise portNotOpenError
+ if not self._port_handle:
+ raise portNotOpenError
win32.PurgeComm(self._port_handle, win32.PURGE_TXCLEAR | win32.PURGE_TXABORT)
def _update_break_state(self):
"""Set break: Controls TXD. When active, to transmitting is possible."""
- if not self._port_handle: raise portNotOpenError
+ if not self._port_handle:
+ raise portNotOpenError
if self._break_state:
win32.SetCommBreak(self._port_handle)
else:
@@ -358,7 +364,8 @@ class Serial(SerialBase):
win32.EscapeCommFunction(self._port_handle, win32.CLRDTR)
def _GetCommModemStatus(self):
- if not self._port_handle: raise portNotOpenError
+ if not self._port_handle:
+ raise portNotOpenError
stat = win32.DWORD()
win32.GetCommModemStatus(self._port_handle, ctypes.byref(stat))
return stat.value
@@ -390,7 +397,8 @@ class Serial(SerialBase):
Recommend a buffer size to the driver (device driver can ignore this
value). Must be called before the port is opended.
"""
- if tx_size is None: tx_size = rx_size
+ if tx_size is None:
+ tx_size = rx_size
win32.SetupComm(self._port_handle, rx_size, tx_size)
def set_output_flow_control(self, enable=True):
@@ -400,7 +408,8 @@ class Serial(SerialBase):
from the other device and control the transmission accordingly.
WARNING: this function is not portable to different platforms!
"""
- if not self._port_handle: raise portNotOpenError
+ if not self._port_handle:
+ raise portNotOpenError
if enable:
win32.EscapeCommFunction(self._port_handle, win32.SETXON)
else:
@@ -416,9 +425,9 @@ class Serial(SerialBase):
return comstat.cbOutQue
-
# Nur Testfunktion!!
if __name__ == '__main__':
+ import sys
s = Serial(0)
sys.stdout.write("%s\n" % s)
@@ -431,4 +440,3 @@ if __name__ == '__main__':
s.port = 0
s.open()
sys.stdout.write("%s\n" % s)
-
diff --git a/serial/tools/hexlify_codec.py b/serial/tools/hexlify_codec.py
index f3ba04f..5b35e8e 100644
--- a/serial/tools/hexlify_codec.py
+++ b/serial/tools/hexlify_codec.py
@@ -9,18 +9,23 @@ HEXDIGITS = '0123456789ABCDEF'
### Codec APIs
+
def hex_encode(input, errors='strict'):
return (serial.to_bytes([int(h, 16) for h in input.split()]), len(input))
+
def hex_decode(input, errors='strict'):
return (''.join('{:02X} '.format(b) for b in input), len(input))
+
class Codec(codecs.Codec):
def encode(self, input, errors='strict'):
return serial.to_bytes([int(h, 16) for h in input.split()])
+
def decode(self, input, errors='strict'):
return ''.join('{:02X} '.format(b) for b in input)
+
class IncrementalEncoder(codecs.IncrementalEncoder):
def __init__(self, errors='strict'):
@@ -57,18 +62,21 @@ class IncrementalEncoder(codecs.IncrementalEncoder):
self.state = state
return serial.to_bytes(encoded)
+
class IncrementalDecoder(codecs.IncrementalDecoder):
def decode(self, input, final=False):
return ''.join('{:02X} '.format(b) for b in input)
+
class StreamWriter(Codec, codecs.StreamWriter):
pass
+
class StreamReader(Codec, codecs.StreamReader):
pass
-### encodings module API
+### encodings module API
def getregentry():
return codecs.CodecInfo(
name='hexlify',
diff --git a/serial/tools/list_ports.py b/serial/tools/list_ports.py
index c5896c5..3b32b92 100644
--- a/serial/tools/list_ports.py
+++ b/serial/tools/list_ports.py
@@ -17,23 +17,24 @@ Additionally a grep function is supplied that can be used to search for ports
based on their descriptions or hardware ID.
"""
-import sys, os, re
+import sys
+import os
+import re
# chose an implementation, depending on os
#~ if sys.platform == 'cli':
#~ else:
-import os
-# chose an implementation, depending on os
-if os.name == 'nt': #sys.platform == 'win32':
- from serial.tools.list_ports_windows import *
+if os.name == 'nt': # sys.platform == 'win32':
+ from serial.tools.list_ports_windows import comports
elif os.name == 'posix':
- from serial.tools.list_ports_posix import *
+ from serial.tools.list_ports_posix import comports
#~ elif os.name == 'java':
else:
raise ImportError("Sorry: no implementation for your platform ('%s') available" % (os.name,))
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+
def grep(regexp):
"""\
Search for ports using a regular expression. Port name, description and
@@ -52,21 +53,23 @@ def main():
parser = argparse.ArgumentParser(description='Serial port enumeration')
- parser.add_argument('regexp',
+ parser.add_argument(
+ 'regexp',
nargs='?',
help='only show ports that match this regex')
- parser.add_argument('-v', '--verbose',
+ parser.add_argument(
+ '-v', '--verbose',
action='store_true',
help='show more messages')
- parser.add_argument('-q', '--quiet',
+ parser.add_argument(
+ '-q', '--quiet',
action='store_true',
help='suppress all messages')
args = parser.parse_args()
-
hits = 0
# get iteraror w/ or w/o filter
if args.regexp:
diff --git a/serial/tools/list_ports_linux.py b/serial/tools/list_ports_linux.py
index f7491e5..3e37d72 100644
--- a/serial/tools/list_ports_linux.py
+++ b/serial/tools/list_ports_linux.py
@@ -15,7 +15,7 @@ import glob
import os
import re
import subprocess
-import sys
+
def read_command(argv):
"""run a command and return its output"""
@@ -24,6 +24,7 @@ def read_command(argv):
except subprocess.SubprocessError as e:
raise IOError('command %r failed: %s' % (argv, e))
+
def read_line(*args):
"""\
Helper function to read a single line from a file.
@@ -37,6 +38,7 @@ def read_line(*args):
except IOError:
return None
+
def re_group(regexp, text):
"""search for regexp in text, return 1st group on match"""
m = re.search(regexp, text)
@@ -63,6 +65,7 @@ def usb_sysfs_hw_string(sysfs_path):
snr_txt
)
+
def usb_lsusb_string(sysfs_path):
base = os.path.basename(os.path.realpath(sysfs_path))
bus = base.split('-')[0]
@@ -82,6 +85,7 @@ def usb_lsusb_string(sysfs_path):
# create descriptions. prefer text from device, fall back to the others
return '%s %s %s' % (iManufacturer or idVendor, iProduct or idProduct, iSerial)
+
def describe(device):
"""\
Get a human readable description.
@@ -106,6 +110,7 @@ def describe(device):
return read_line(product_name_file)
return base
+
def hwinfo(device):
"""Try to get a HW identification using sysfs"""
base = os.path.basename(device)
@@ -126,6 +131,7 @@ def hwinfo(device):
return usb_sysfs_hw_string(sys_dev_path + '/..')
return 'n/a' # XXX directly remove these from the list?
+
def comports():
devices = glob.glob('/dev/ttyS*') # built-in serial ports
devices.extend(glob.glob('/dev/ttyUSB*')) # usb-serial with own driver
@@ -138,4 +144,3 @@ def comports():
if __name__ == '__main__':
for port, desc, hwid in sorted(comports()):
print("%s: %s [%s]" % (port, desc, hwid))
-
diff --git a/serial/tools/list_ports_posix.py b/serial/tools/list_ports_posix.py
index a1a3faf..9a9b05d 100644
--- a/serial/tools/list_ports_posix.py
+++ b/serial/tools/list_ports_posix.py
@@ -27,7 +27,7 @@ import os
# try to detect the OS so that a device can be selected...
plat = sys.platform.lower()
-if plat[:5] == 'linux': # Linux (confirmed)
+if plat[:5] == 'linux': # Linux (confirmed)
from serial.tools.list_ports_linux import comports
elif plat == 'cygwin': # cygwin/win32
@@ -85,6 +85,7 @@ elif plat[:3] == 'aix': # AIX
else:
# platform detection has failed...
+ import serial
sys.stderr.write("""\
don't know how to enumerate ttys on this system.
! I you know how the serial ports are named send this information to
@@ -103,4 +104,3 @@ this module running...
if __name__ == '__main__':
for port, desc, hwid in sorted(comports()):
print("%s: %s [%s]" % (port, desc, hwid))
-
diff --git a/serial/tools/list_ports_windows.py b/serial/tools/list_ports_windows.py
index c285dde..838e85d 100644
--- a/serial/tools/list_ports_windows.py
+++ b/serial/tools/list_ports_windows.py
@@ -7,17 +7,8 @@
#
# SPDX-License-Identifier: BSD-3-Clause
-import ctypes
import re
-
-def ValidHandle(value, func, arguments):
- if value == 0:
- raise ctypes.WinError()
- return value
-
-import serial
-from serial.win32 import ULONG_PTR, is_64bit
-from ctypes.wintypes import HANDLE
+import ctypes
from ctypes.wintypes import BOOL
from ctypes.wintypes import HWND
from ctypes.wintypes import DWORD
@@ -27,6 +18,14 @@ from ctypes.wintypes import ULONG
from ctypes.wintypes import LPCSTR
from ctypes.wintypes import HKEY
from ctypes.wintypes import BYTE
+import serial
+from serial.win32 import ULONG_PTR
+
+
+def ValidHandle(value, func, arguments):
+ if value == 0:
+ raise ctypes.WinError()
+ return value
NULL = 0
HDEVINFO = ctypes.c_void_p
@@ -45,11 +44,13 @@ def byte_buffer(length):
"""Get a buffer for a string"""
return (BYTE*length)()
+
def string(buffer):
s = []
for c in buffer:
- if c == 0: break
- s.append(chr(c & 0xff)) # "& 0xff": hack to convert signed to unsigned
+ if c == 0:
+ break
+ s.append(chr(c & 0xff)) # "& 0xff": hack to convert signed to unsigned
return ''.join(s)
@@ -60,6 +61,7 @@ class GUID(ctypes.Structure):
('Data3', WORD),
('Data4', BYTE*8),
]
+
def __str__(self):
return "{%08x-%04x-%04x-%s-%s}" % (
self.Data1,
@@ -69,6 +71,7 @@ class GUID(ctypes.Structure):
''.join(["%02x" % d for d in self.Data4[2:]]),
)
+
class SP_DEVINFO_DATA(ctypes.Structure):
_fields_ = [
('cbSize', DWORD),
@@ -76,6 +79,7 @@ class SP_DEVINFO_DATA(ctypes.Structure):
('DevInst', DWORD),
('Reserved', ULONG_PTR),
]
+
def __str__(self):
return "ClassGuid:%s DevInst:%s" % (self.ClassGuid, self.DevInst)
@@ -134,11 +138,12 @@ DIREG_DEV = 0x00000001
KEY_READ = 0x20019
# workaround for compatibility between Python 2.x and 3.x
-Ports = serial.to_bytes([80, 111, 114, 116, 115]) # "Ports"
-PortName = serial.to_bytes([80, 111, 114, 116, 78, 97, 109, 101]) # "PortName"
+Ports = serial.to_bytes([80, 111, 114, 116, 115]) # "Ports"
+PortName = serial.to_bytes([80, 111, 114, 116, 78, 97, 109, 101]) # "PortName"
+
def comports():
- GUIDs = (GUID*8)() # so far only seen one used, so hope 8 are enough...
+ GUIDs = (GUID*8)() # so far only seen one used, so hope 8 are enough...
guids_size = DWORD()
if not SetupDiClassGuidsFromName(
Ports,
@@ -153,7 +158,7 @@ def comports():
ctypes.byref(GUIDs[index]),
None,
NULL,
- DIGCF_PRESENT) # was DIGCF_PRESENT|DIGCF_DEVICEINTERFACE which misses CDC ports
+ DIGCF_PRESENT) # was DIGCF_PRESENT|DIGCF_DEVICEINTERFACE which misses CDC ports
devinfo = SP_DEVINFO_DATA()
devinfo.cbSize = ctypes.sizeof(devinfo)
@@ -244,8 +249,5 @@ def comports():
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# test
if __name__ == '__main__':
- import serial
-
for port, desc, hwid in sorted(comports()):
print("%s: %s [%s]" % (port, desc, hwid))
-
diff --git a/serial/tools/miniterm.py b/serial/tools/miniterm.py
index 513bb2a..711edaf 100644
--- a/serial/tools/miniterm.py
+++ b/serial/tools/miniterm.py
@@ -189,8 +189,8 @@ class NoTerminal(Transform):
REPLACEMENT_MAP = dict((x, 0x2400 + x) for x in range(32) if unichr(x) not in '\r\n\b\t')
REPLACEMENT_MAP.update({
- 0x7F: 0x2421, # DEL
- 0x9B: 0x2425, # CSI
+ 0x7F: 0x2421, # DEL
+ 0x9B: 0x2425, # CSI
})
def rx(self, text):
@@ -204,9 +204,9 @@ class NoControls(NoTerminal):
REPLACEMENT_MAP = dict((x, 0x2400 + x) for x in range(32))
REPLACEMENT_MAP.update({
- 32: 0x2423, # visual space
- 0x7F: 0x2421, # DEL
- 0x9B: 0x2425, # CSI
+ 32: 0x2423, # visual space
+ 0x7F: 0x2421, # DEL
+ 0x9B: 0x2425, # CSI
})
@@ -276,8 +276,8 @@ TRANSFORMATIONS = {
'debug': DebugIO,
}
-# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def dump_port_list():
if comports:
sys.stderr.write('\n--- Available ports:\n')
@@ -313,7 +313,6 @@ class Miniterm(object):
self._reader_alive = False
self.receiver_thread.join()
-
def start(self):
self.alive = True
self._start_reader()
@@ -344,7 +343,6 @@ class Miniterm(object):
self.output_encoding = encoding
self.tx_encoder = codecs.getincrementalencoder(encoding)(errors)
-
def dump_port_settings(self):
sys.stderr.write("\n--- Settings: {p.name} {p.baudrate},{p.bytesize},{p.parity},{p.stopbits}\n".format(
p=self.serial))
@@ -386,13 +384,12 @@ class Miniterm(object):
for transformation in self.rx_transformations:
text = transformation.rx(text)
self.console.write(text)
- except serial.SerialException as e:
+ except serial.SerialException:
self.alive = False
# XXX would be nice if the writer could be interrupted at this
# point... to exit completely
raise
-
def writer(self):
"""\
Loop and copy console->serial until self.exit_character character is
@@ -488,7 +485,7 @@ class Miniterm(object):
self.update_transformations()
sys.stderr.write('--- filters: {}\n'.format(' '.join(self.filters)))
elif c == '\x0c': # CTRL+L -> EOL mode
- modes = list(EOL_TRANSFORMATIONS) # keys
+ modes = list(EOL_TRANSFORMATIONS) # keys
eol = modes.index(self.eol) + 1
if eol >= len(modes):
eol = 0
@@ -637,7 +634,6 @@ class Miniterm(object):
)
-
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# default args can be used to override when calling main() from an other script
# e.g to create a miniterm-my-device.py
@@ -647,12 +643,14 @@ def main(default_port=None, default_baudrate=9600, default_rts=None, default_dtr
parser = argparse.ArgumentParser(
description="Miniterm - A simple terminal program for the serial port.")
- parser.add_argument("port",
+ parser.add_argument(
+ "port",
nargs='?',
help="serial port name",
default=default_port)
- parser.add_argument("baudrate",
+ parser.add_argument(
+ "baudrate",
nargs='?',
type=int,
help="set baud rate, default: %(default)s",
@@ -660,72 +658,84 @@ def main(default_port=None, default_baudrate=9600, default_rts=None, default_dtr
group = parser.add_argument_group("port settings")
- group.add_argument("--parity",
+ group.add_argument(
+ "--parity",
choices=['N', 'E', 'O', 'S', 'M'],
type=lambda c: c.upper(),
help="set parity, one of {N E O S M}, default: N",
default='N')
- group.add_argument("--rtscts",
+ group.add_argument(
+ "--rtscts",
action="store_true",
help="enable RTS/CTS flow control (default off)",
default=False)
- group.add_argument("--xonxoff",
+ group.add_argument(
+ "--xonxoff",
action="store_true",
help="enable software flow control (default off)",
default=False)
- group.add_argument("--rts",
+ group.add_argument(
+ "--rts",
type=int,
help="set initial RTS line state (possible values: 0, 1)",
default=default_rts)
- group.add_argument("--dtr",
+ group.add_argument(
+ "--dtr",
type=int,
help="set initial DTR line state (possible values: 0, 1)",
default=default_dtr)
group = parser.add_argument_group("data handling")
- group.add_argument("-e", "--echo",
+ group.add_argument(
+ "-e", "--echo",
action="store_true",
help="enable local echo (default off)",
default=False)
- group.add_argument("--encoding",
+ group.add_argument(
+ "--encoding",
dest="serial_port_encoding",
metavar="CODEC",
help="set the encoding for the serial port (e.g. hexlify, Latin1, UTF-8), default: %(default)s",
default='UTF-8')
- group.add_argument("-f", "--filter",
+ group.add_argument(
+ "-f", "--filter",
action="append",
metavar="NAME",
help="add text transformation",
default=[])
- group.add_argument("--eol",
+ group.add_argument(
+ "--eol",
choices=['CR', 'LF', 'CRLF'],
type=lambda c: c.upper(),
help="end of line mode",
default='CRLF')
- group.add_argument("--raw",
+ group.add_argument(
+ "--raw",
action="store_true",
help="Do no apply any encodings/transformations",
default=False)
group = parser.add_argument_group("hotkeys")
- group.add_argument("--exit-char",
+ group.add_argument(
+ "--exit-char",
type=int,
metavar='NUM',
help="Unicode of special character that is used to exit the application, default: %(default)s",
default=0x1d # GS/CTRL+]
)
- group.add_argument("--menu-char",
+ group.add_argument(
+ "--menu-char",
type=int,
metavar='NUM',
help="Unicode code of special character that is used to control miniterm (menu), default: %(default)s",
@@ -734,12 +744,14 @@ def main(default_port=None, default_baudrate=9600, default_rts=None, default_dtr
group = parser.add_argument_group("diagnostics")
- group.add_argument("-q", "--quiet",
+ group.add_argument(
+ "-q", "--quiet",
action="store_true",
help="suppress non-error messages",
default=False)
- group.add_argument("--develop",
+ group.add_argument(
+ "--develop",
action="store_true",
help="show Python traceback on error",
default=False)
@@ -749,7 +761,6 @@ def main(default_port=None, default_baudrate=9600, default_rts=None, default_dtr
if args.menu_char == args.exit_char:
parser.error('--exit-char can not be the same as --menu-char')
-
# no port given on command line -> ask user now
if args.port is None:
dump_port_list()
@@ -767,7 +778,6 @@ def main(default_port=None, default_baudrate=9600, default_rts=None, default_dtr
else:
filters = ['default']
-
try:
serial_instance = serial.serial_for_url(
args.port,
diff --git a/serial/urlhandler/protocol_hwgrep.py b/serial/urlhandler/protocol_hwgrep.py
index 90626ff..e184ec5 100644
--- a/serial/urlhandler/protocol_hwgrep.py
+++ b/serial/urlhandler/protocol_hwgrep.py
@@ -20,6 +20,7 @@ try:
except NameError:
basestring = str # python 3
+
class Serial(serial.Serial):
"""Just inherit the native Serial port implementation and patch the port property."""
@@ -43,8 +44,6 @@ class Serial(serial.Serial):
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
if __name__ == '__main__':
- #~ s = Serial('hwgrep://ttyS0')
s = Serial(None)
s.port = 'hwgrep://ttyS0'
print(s)
-
diff --git a/serial/urlhandler/protocol_loop.py b/serial/urlhandler/protocol_loop.py
index 79d2c56..f5104b4 100644
--- a/serial/urlhandler/protocol_loop.py
+++ b/serial/urlhandler/protocol_loop.py
@@ -17,7 +17,6 @@
# - "debug" print diagnostic messages
import logging
import numbers
-import threading
import time
try:
import urlparse
@@ -91,13 +90,6 @@ class Serial(SerialBase):
if self.logger:
self.logger.info('_reconfigure_port()')
- def close(self):
- """Close port"""
- if self.is_open:
- self.is_open = False
- # in case of quick reconnects, give the server some time
- time.sleep(0.3)
-
def from_url(self, url):
"""extract host and port from an URL string"""
parts = urlparse.urlsplit(url)
@@ -121,7 +113,8 @@ class Serial(SerialBase):
@property
def in_waiting(self):
"""Return the number of bytes currently in the input buffer."""
- if not self.is_open: raise portNotOpenError
+ if not self.is_open:
+ raise portNotOpenError
if self.logger:
# attention the logged value can differ from return value in
# threaded environments...
@@ -134,7 +127,8 @@ class Serial(SerialBase):
return less characters as requested. With no timeout it will block
until the requested number of bytes is read.
"""
- if not self.is_open: raise portNotOpenError
+ if not self.is_open:
+ raise portNotOpenError
if self._timeout is not None:
timeout = time.time() + self._timeout
else:
@@ -142,7 +136,7 @@ class Serial(SerialBase):
data = bytearray()
while size > 0 and self.is_open:
try:
- data += self.queue.get(timeout=self._timeout) # XXX inter char timeout
+ data += self.queue.get(timeout=self._timeout) # XXX inter char timeout
except queue.Empty:
break
else:
@@ -159,14 +153,15 @@ class Serial(SerialBase):
connection is blocked. May raise SerialException if the connection is
closed.
"""
- if not self.is_open: raise portNotOpenError
+ if not self.is_open:
+ raise portNotOpenError
data = to_bytes(data)
# calculate aprox time that would be used to send the data
time_used_to_send = 10.0*len(data) / self._baudrate
# when a write timeout is configured check if we would be successful
# (not sending anything, not even the part that would have time)
if self._write_timeout is not None and time_used_to_send > self._write_timeout:
- time.sleep(self._write_timeout) # must wait so that unit test succeeds
+ time.sleep(self._write_timeout) # must wait so that unit test succeeds
raise writeTimeoutError
for byte in iterbytes(data):
self.queue.put(byte, timeout=self._write_timeout)
@@ -174,7 +169,8 @@ class Serial(SerialBase):
def reset_input_buffer(self):
"""Clear input buffer, discarding all that is in the buffer."""
- if not self.is_open: raise portNotOpenError
+ if not self.is_open:
+ raise portNotOpenError
if self.logger:
self.logger.info('reset_input_buffer()')
try:
@@ -188,7 +184,8 @@ class Serial(SerialBase):
Clear output buffer, aborting the current output and
discarding all that is in the buffer.
"""
- if not self.is_open: raise portNotOpenError
+ if not self.is_open:
+ raise portNotOpenError
if self.logger:
self.logger.info('reset_output_buffer()')
try:
@@ -218,7 +215,8 @@ class Serial(SerialBase):
@property
def cts(self):
"""Read terminal status line: Clear To Send"""
- if not self.is_open: raise portNotOpenError
+ if not self.is_open:
+ raise portNotOpenError
if self.logger:
self.logger.info('CTS -> state of RTS (%r)' % (self._rts_state,))
return self._rts_state
@@ -233,7 +231,8 @@ class Serial(SerialBase):
@property
def ri(self):
"""Read terminal status line: Ring Indicator"""
- if not self.is_open: raise portNotOpenError
+ if not self.is_open:
+ raise portNotOpenError
if self.logger:
self.logger.info('returning dummy for RI')
return False
@@ -241,7 +240,8 @@ class Serial(SerialBase):
@property
def cd(self):
"""Read terminal status line: Carrier Detect"""
- if not self.is_open: raise portNotOpenError
+ if not self.is_open:
+ raise portNotOpenError
if self.logger:
self.logger.info('returning dummy for CD')
return True
diff --git a/serial/urlhandler/protocol_socket.py b/serial/urlhandler/protocol_socket.py
index 99e532f..a4297bb 100644
--- a/serial/urlhandler/protocol_socket.py
+++ b/serial/urlhandler/protocol_socket.py
@@ -27,7 +27,7 @@ try:
except ImportError:
import urllib.parse as urlparse
-from serial.serialutil import *
+from serial.serialutil import SerialBase, SerialException, portNotOpenError, to_bytes
# map log level names to constants. used in from_url()
LOGGER_LEVELS = {
@@ -39,6 +39,7 @@ LOGGER_LEVELS = {
POLL_TIMEOUT = 2
+
class Serial(SerialBase):
"""Serial port implementation for plain sockets."""
@@ -61,7 +62,7 @@ class Serial(SerialBase):
self._socket = None
raise SerialException("Could not open port %s: %s" % (self.portstr, msg))
- self._socket.settimeout(POLL_TIMEOUT) # used for write timeout support :/
+ self._socket.settimeout(POLL_TIMEOUT) # used for write timeout support :/
# not that there anything to configure...
self._reconfigure_port()
@@ -116,7 +117,8 @@ class Serial(SerialBase):
raise ValueError('unknown option: %r' % (option,))
# get host and port
host, port = parts.hostname, parts.port
- if not 0 <= port < 65536: raise ValueError("port not in range 0...65535")
+ if not 0 <= port < 65536:
+ raise ValueError("port not in range 0...65535")
except ValueError as e:
raise SerialException('expected a string in the form "socket://<host>:<port>[?logging={debug|info|warning|error}]": %s' % e)
return (host, port)
@@ -126,7 +128,8 @@ class Serial(SerialBase):
@property
def in_waiting(self):
"""Return the number of bytes currently in the input buffer."""
- if not self.is_open: raise portNotOpenError
+ if not self.is_open:
+ raise portNotOpenError
# Poll the socket to see if it is ready for reading.
# If ready, at least one byte will be to read.
lr, lw, lx = select.select([self._socket], [], [], 0)
@@ -138,7 +141,8 @@ class Serial(SerialBase):
return less characters as requested. With no timeout it will block
until the requested number of bytes is read.
"""
- if not self.is_open: raise portNotOpenError
+ if not self.is_open:
+ raise portNotOpenError
data = bytearray()
if self._timeout is not None:
timeout = time.time() + self._timeout
@@ -171,7 +175,8 @@ class Serial(SerialBase):
connection is blocked. May raise SerialException if the connection is
closed.
"""
- if not self.is_open: raise portNotOpenError
+ if not self.is_open:
+ raise portNotOpenError
try:
self._socket.sendall(to_bytes(data))
except socket.error as e:
@@ -181,7 +186,8 @@ class Serial(SerialBase):
def reset_input_buffer(self):
"""Clear input buffer, discarding all that is in the buffer."""
- if not self.is_open: raise portNotOpenError
+ if not self.is_open:
+ raise portNotOpenError
if self.logger:
self.logger.info('ignored reset_input_buffer')
@@ -190,7 +196,8 @@ class Serial(SerialBase):
Clear output buffer, aborting the current output and
discarding all that is in the buffer.
"""
- if not self.is_open: raise portNotOpenError
+ if not self.is_open:
+ raise portNotOpenError
if self.logger:
self.logger.info('ignored reset_output_buffer')
@@ -199,7 +206,8 @@ class Serial(SerialBase):
Send break condition. Timed, returns to idle state after given
duration.
"""
- if not self.is_open: raise portNotOpenError
+ if not self.is_open:
+ raise portNotOpenError
if self.logger:
self.logger.info('ignored send_break(%r)' % (duration,))
@@ -222,7 +230,8 @@ class Serial(SerialBase):
@property
def cts(self):
"""Read terminal status line: Clear To Send"""
- if not self.is_open: raise portNotOpenError
+ if not self.is_open:
+ raise portNotOpenError
if self.logger:
self.logger.info('returning dummy for cts')
return True
@@ -230,7 +239,8 @@ class Serial(SerialBase):
@property
def dsr(self):
"""Read terminal status line: Data Set Ready"""
- if not self.is_open: raise portNotOpenError
+ if not self.is_open:
+ raise portNotOpenError
if self.logger:
self.logger.info('returning dummy for dsr')
return True
@@ -238,7 +248,8 @@ class Serial(SerialBase):
@property
def ri(self):
"""Read terminal status line: Ring Indicator"""
- if not self.is_open: raise portNotOpenError
+ if not self.is_open:
+ raise portNotOpenError
if self.logger:
self.logger.info('returning dummy for ri')
return False
@@ -246,7 +257,8 @@ class Serial(SerialBase):
@property
def cd(self):
"""Read terminal status line: Carrier Detect"""
- if not self.is_open: raise portNotOpenError
+ if not self.is_open:
+ raise portNotOpenError
if self.logger:
self.logger.info('returning dummy for cd)')
return True
diff --git a/serial/urlhandler/protocol_spy.py b/serial/urlhandler/protocol_spy.py
index 9a7c75c..647b760 100644
--- a/serial/urlhandler/protocol_spy.py
+++ b/serial/urlhandler/protocol_spy.py
@@ -263,4 +263,3 @@ if __name__ == '__main__':
s = Serial(None)
s.port = 'spy:///dev/ttyS0'
print(s)
-
diff --git a/serial/win32.py b/serial/win32.py
index 88a1335..c8dd3e3 100644
--- a/serial/win32.py
+++ b/serial/win32.py
@@ -8,14 +8,15 @@ from ctypes import *
from ctypes.wintypes import HANDLE
from ctypes.wintypes import BOOL
from ctypes.wintypes import LPCWSTR
-_stdcall_libraries = {}
-_stdcall_libraries['kernel32'] = WinDLL('kernel32')
from ctypes.wintypes import DWORD
from ctypes.wintypes import WORD
from ctypes.wintypes import BYTE
+_stdcall_libraries = {}
+_stdcall_libraries['kernel32'] = WinDLL('kernel32')
INVALID_HANDLE_VALUE = HANDLE(-1).value
+
# some details of the windows API differ between 32 and 64 bit systems..
def is_64bit():
"""Returns true when running on a 64 bit system"""
@@ -45,7 +46,7 @@ except AttributeError:
CreateEventA = _stdcall_libraries['kernel32'].CreateEventA
CreateEventA.restype = HANDLE
CreateEventA.argtypes = [LPSECURITY_ATTRIBUTES, BOOL, BOOL, LPCSTR]
- CreateEvent=CreateEventA
+ CreateEvent = CreateEventA
CreateFileA = _stdcall_libraries['kernel32'].CreateFileA
CreateFileA.restype = HANDLE
@@ -54,27 +55,35 @@ except AttributeError:
else:
CreateEventW.restype = HANDLE
CreateEventW.argtypes = [LPSECURITY_ATTRIBUTES, BOOL, BOOL, LPCWSTR]
- CreateEvent = CreateEventW # alias
+ CreateEvent = CreateEventW # alias
CreateFileW = _stdcall_libraries['kernel32'].CreateFileW
CreateFileW.restype = HANDLE
CreateFileW.argtypes = [LPCWSTR, DWORD, DWORD, LPSECURITY_ATTRIBUTES, DWORD, DWORD, HANDLE]
- CreateFile = CreateFileW # alias
+ CreateFile = CreateFileW # alias
+
class _OVERLAPPED(Structure):
pass
+
OVERLAPPED = _OVERLAPPED
+
class _COMSTAT(Structure):
pass
+
COMSTAT = _COMSTAT
+
class _DCB(Structure):
pass
+
DCB = _DCB
+
class _COMMTIMEOUTS(Structure):
pass
+
COMMTIMEOUTS = _COMMTIMEOUTS
GetLastError = _stdcall_libraries['kernel32'].GetLastError
@@ -166,66 +175,70 @@ WaitForSingleObject = _stdcall_libraries['kernel32'].WaitForSingleObject
WaitForSingleObject.restype = DWORD
WaitForSingleObject.argtypes = [HANDLE, DWORD]
-ONESTOPBIT = 0 # Variable c_int
-TWOSTOPBITS = 2 # Variable c_int
+ONESTOPBIT = 0 # Variable c_int
+TWOSTOPBITS = 2 # Variable c_int
ONE5STOPBITS = 1
-NOPARITY = 0 # Variable c_int
-ODDPARITY = 1 # Variable c_int
-EVENPARITY = 2 # Variable c_int
+NOPARITY = 0 # Variable c_int
+ODDPARITY = 1 # Variable c_int
+EVENPARITY = 2 # Variable c_int
MARKPARITY = 3
SPACEPARITY = 4
-RTS_CONTROL_HANDSHAKE = 2 # Variable c_int
-RTS_CONTROL_DISABLE = 0 # Variable c_int
-RTS_CONTROL_ENABLE = 1 # Variable c_int
-RTS_CONTROL_TOGGLE = 3 # Variable c_int
+RTS_CONTROL_HANDSHAKE = 2 # Variable c_int
+RTS_CONTROL_DISABLE = 0 # Variable c_int
+RTS_CONTROL_ENABLE = 1 # Variable c_int
+RTS_CONTROL_TOGGLE = 3 # Variable c_int
SETRTS = 3
CLRRTS = 4
-DTR_CONTROL_HANDSHAKE = 2 # Variable c_int
-DTR_CONTROL_DISABLE = 0 # Variable c_int
-DTR_CONTROL_ENABLE = 1 # Variable c_int
+DTR_CONTROL_HANDSHAKE = 2 # Variable c_int
+DTR_CONTROL_DISABLE = 0 # Variable c_int
+DTR_CONTROL_ENABLE = 1 # Variable c_int
SETDTR = 5
CLRDTR = 6
-MS_DSR_ON = 32 # Variable c_ulong
-EV_RING = 256 # Variable c_int
-EV_PERR = 512 # Variable c_int
-EV_ERR = 128 # Variable c_int
-SETXOFF = 1 # Variable c_int
-EV_RXCHAR = 1 # Variable c_int
-GENERIC_WRITE = 1073741824 # Variable c_long
-PURGE_TXCLEAR = 4 # Variable c_int
-FILE_FLAG_OVERLAPPED = 1073741824 # Variable c_int
-EV_DSR = 16 # Variable c_int
-MAXDWORD = 4294967295 # Variable c_uint
-EV_RLSD = 32 # Variable c_int
-ERROR_IO_PENDING = 997 # Variable c_long
-MS_CTS_ON = 16 # Variable c_ulong
-EV_EVENT1 = 2048 # Variable c_int
-EV_RX80FULL = 1024 # Variable c_int
-PURGE_RXABORT = 2 # Variable c_int
-FILE_ATTRIBUTE_NORMAL = 128 # Variable c_int
-PURGE_TXABORT = 1 # Variable c_int
-SETXON = 2 # Variable c_int
-OPEN_EXISTING = 3 # Variable c_int
-MS_RING_ON = 64 # Variable c_ulong
-EV_TXEMPTY = 4 # Variable c_int
-EV_RXFLAG = 2 # Variable c_int
-MS_RLSD_ON = 128 # Variable c_ulong
-GENERIC_READ = 2147483648 # Variable c_ulong
-EV_EVENT2 = 4096 # Variable c_int
-EV_CTS = 8 # Variable c_int
-EV_BREAK = 64 # Variable c_int
-PURGE_RXCLEAR = 8 # Variable c_int
+MS_DSR_ON = 32 # Variable c_ulong
+EV_RING = 256 # Variable c_int
+EV_PERR = 512 # Variable c_int
+EV_ERR = 128 # Variable c_int
+SETXOFF = 1 # Variable c_int
+EV_RXCHAR = 1 # Variable c_int
+GENERIC_WRITE = 1073741824 # Variable c_long
+PURGE_TXCLEAR = 4 # Variable c_int
+FILE_FLAG_OVERLAPPED = 1073741824 # Variable c_int
+EV_DSR = 16 # Variable c_int
+MAXDWORD = 4294967295 # Variable c_uint
+EV_RLSD = 32 # Variable c_int
+ERROR_IO_PENDING = 997 # Variable c_long
+MS_CTS_ON = 16 # Variable c_ulong
+EV_EVENT1 = 2048 # Variable c_int
+EV_RX80FULL = 1024 # Variable c_int
+PURGE_RXABORT = 2 # Variable c_int
+FILE_ATTRIBUTE_NORMAL = 128 # Variable c_int
+PURGE_TXABORT = 1 # Variable c_int
+SETXON = 2 # Variable c_int
+OPEN_EXISTING = 3 # Variable c_int
+MS_RING_ON = 64 # Variable c_ulong
+EV_TXEMPTY = 4 # Variable c_int
+EV_RXFLAG = 2 # Variable c_int
+MS_RLSD_ON = 128 # Variable c_ulong
+GENERIC_READ = 2147483648 # Variable c_ulong
+EV_EVENT2 = 4096 # Variable c_int
+EV_CTS = 8 # Variable c_int
+EV_BREAK = 64 # Variable c_int
+PURGE_RXCLEAR = 8 # Variable c_int
INFINITE = 0xFFFFFFFF
class N11_OVERLAPPED4DOLLAR_48E(Union):
pass
+
+
class N11_OVERLAPPED4DOLLAR_484DOLLAR_49E(Structure):
pass
+
+
N11_OVERLAPPED4DOLLAR_484DOLLAR_49E._fields_ = [
('Offset', DWORD),
('OffsetHigh', DWORD),