diff options
author | Chris Liechti <cliechti@gmx.net> | 2015-08-30 21:28:04 +0200 |
---|---|---|
committer | Chris Liechti <cliechti@gmx.net> | 2015-08-30 21:28:04 +0200 |
commit | 033f17c9d5c40fe8b5e55b6ada1d483b612ca092 (patch) | |
tree | 5ada169ba7c6a8036357904772075467cda8d4ea /serial/serialposix.py | |
parent | 518b0d31aafdb9ea52a47eb850490d652af2ad96 (diff) | |
download | pyserial-git-033f17c9d5c40fe8b5e55b6ada1d483b612ca092.tar.gz |
pep-8 and small cleanups
Diffstat (limited to 'serial/serialposix.py')
-rw-r--r-- | serial/serialposix.py | 227 |
1 files changed, 122 insertions, 105 deletions
diff --git a/serial/serialposix.py b/serial/serialposix.py index 79eaa66..9064c6a 100644 --- a/serial/serialposix.py +++ b/serial/serialposix.py @@ -21,7 +21,9 @@ import struct import sys import termios import time -from serial.serialutil import * + +import serial +from serial.serialutil import SerialBase, SerialException, to_bytes, portNotOpenError, writeTimeoutError class PlatformSpecificBase(object): @@ -41,7 +43,7 @@ also add the device name of the serial port and where the counting starts for the first serial port. e.g. 'first serial port: /dev/ttyS0' and with a bit luck you can get this module running... -""" % (sys.platform, os.name, VERSION)) +""" % (sys.platform, os.name, serial.VERSION)) raise NotImplementedError('no number-to-device mapping defined on this platform') def _set_special_baudrate(self, baudrate): @@ -55,7 +57,7 @@ and with a bit luck you can get this module running... # for the platform plat = sys.platform.lower() -if plat[:5] == 'linux': # Linux (confirmed) +if plat[:5] == 'linux': # Linux (confirmed) import array # baudrate ioctls @@ -66,11 +68,10 @@ if plat[:5] == 'linux': # Linux (confirmed) # RS485 ioctls TIOCGRS485 = 0x542E TIOCSRS485 = 0x542F - SER_RS485_ENABLED = 0b00000001 - SER_RS485_RTS_ON_SEND = 0b00000010 + SER_RS485_ENABLED = 0b00000001 + SER_RS485_RTS_ON_SEND = 0b00000010 SER_RS485_RTS_AFTER_SEND = 0b00000100 - SER_RS485_RX_DURING_TX = 0b00010000 - + SER_RS485_RX_DURING_TX = 0b00010000 class PlatformSpecific(PlatformSpecificBase): BAUDRATE_CONSTANTS = { @@ -122,12 +123,12 @@ if plat[:5] == 'linux': # Linux (confirmed) buf[9] = buf[10] = baudrate # set serial_struct - res = fcntl.ioctl(self.fd, TCSETS2, buf) + fcntl.ioctl(self.fd, TCSETS2, buf) except IOError as e: raise ValueError('Failed to set custom baud rate (%s): %s' % (baudrate, e)) def _set_rs485_mode(self, rs485_settings): - buf = array.array('i', [0] * 8) # flags, delaytx, delayrx, padding + buf = array.array('i', [0] * 8) # flags, delaytx, delayrx, padding try: fcntl.ioctl(self.fd, TIOCGRS485, buf) if rs485_settings is not None: @@ -147,7 +148,7 @@ if plat[:5] == 'linux': # Linux (confirmed) buf[2] = int(rs485_settings.delay_rts_after_send * 1000) else: buf[0] = 0 # clear SER_RS485_ENABLED - res = fcntl.ioctl(self.fd, TIOCSRS485, buf) + fcntl.ioctl(self.fd, TIOCSRS485, buf) except IOError as e: raise ValueError('Failed to set RS485 mode: %s' % (e,)) @@ -185,7 +186,7 @@ elif plat[:3] == 'bsd' or plat[:7] == 'freebsd': elif plat[:6] == 'darwin': # OS X import array - IOSSIOSPEED = 0x80045402 #_IOW('T', 2, speed_t) + IOSSIOSPEED = 0x80045402 # _IOW('T', 2, speed_t) class PlatformSpecific(PlatformSpecificBase): def number_to_device(self, port_number): @@ -208,7 +209,7 @@ elif plat[:6] == 'netbsd': # NetBSD 1.6 testing by Erk elif plat[:4] == 'irix': # IRIX (partially tested) class PlatformSpecific(PlatformSpecificBase): def number_to_device(self, port_number): - return '/dev/ttyf%d' % (port_number + 1,) #XXX different device names depending on flow control + return '/dev/ttyf%d' % (port_number + 1,) # XXX different device names depending on flow control elif plat[:2] == 'hp': # HP-UX (not tested) class PlatformSpecific(PlatformSpecificBase): @@ -235,44 +236,44 @@ else: # load some constants for later use. # try to use values from termios, use defaults from linux otherwise -TIOCMGET = getattr(termios, 'TIOCMGET', 0x5415) -TIOCMBIS = getattr(termios, 'TIOCMBIS', 0x5416) -TIOCMBIC = getattr(termios, 'TIOCMBIC', 0x5417) -TIOCMSET = getattr(termios, 'TIOCMSET', 0x5418) +TIOCMGET = getattr(termios, 'TIOCMGET', 0x5415) +TIOCMBIS = getattr(termios, 'TIOCMBIS', 0x5416) +TIOCMBIC = getattr(termios, 'TIOCMBIC', 0x5417) +TIOCMSET = getattr(termios, 'TIOCMSET', 0x5418) -#TIOCM_LE = getattr(termios, 'TIOCM_LE', 0x001) +# TIOCM_LE = getattr(termios, 'TIOCM_LE', 0x001) TIOCM_DTR = getattr(termios, 'TIOCM_DTR', 0x002) TIOCM_RTS = getattr(termios, 'TIOCM_RTS', 0x004) -#TIOCM_ST = getattr(termios, 'TIOCM_ST', 0x008) -#TIOCM_SR = getattr(termios, 'TIOCM_SR', 0x010) +# TIOCM_ST = getattr(termios, 'TIOCM_ST', 0x008) +# TIOCM_SR = getattr(termios, 'TIOCM_SR', 0x010) TIOCM_CTS = getattr(termios, 'TIOCM_CTS', 0x020) TIOCM_CAR = getattr(termios, 'TIOCM_CAR', 0x040) TIOCM_RNG = getattr(termios, 'TIOCM_RNG', 0x080) TIOCM_DSR = getattr(termios, 'TIOCM_DSR', 0x100) -TIOCM_CD = getattr(termios, 'TIOCM_CD', TIOCM_CAR) -TIOCM_RI = getattr(termios, 'TIOCM_RI', TIOCM_RNG) -#TIOCM_OUT1 = getattr(termios, 'TIOCM_OUT1', 0x2000) -#TIOCM_OUT2 = getattr(termios, 'TIOCM_OUT2', 0x4000) +TIOCM_CD = getattr(termios, 'TIOCM_CD', TIOCM_CAR) +TIOCM_RI = getattr(termios, 'TIOCM_RI', TIOCM_RNG) +# TIOCM_OUT1 = getattr(termios, 'TIOCM_OUT1', 0x2000) +# TIOCM_OUT2 = getattr(termios, 'TIOCM_OUT2', 0x4000) if hasattr(termios, 'TIOCINQ'): TIOCINQ = termios.TIOCINQ else: TIOCINQ = getattr(termios, 'FIONREAD', 0x541B) -TIOCOUTQ = getattr(termios, 'TIOCOUTQ', 0x5411) +TIOCOUTQ = getattr(termios, 'TIOCOUTQ', 0x5411) TIOCM_zero_str = struct.pack('I', 0) TIOCM_RTS_str = struct.pack('I', TIOCM_RTS) TIOCM_DTR_str = struct.pack('I', TIOCM_DTR) -TIOCSBRK = getattr(termios, 'TIOCSBRK', 0x5427) -TIOCCBRK = getattr(termios, 'TIOCCBRK', 0x5428) +TIOCSBRK = getattr(termios, 'TIOCSBRK', 0x5427) +TIOCCBRK = getattr(termios, 'TIOCCBRK', 0x5428) -CMSPAR = 0o10000000000 # Use "stick" (mark/space) parity +CMSPAR = 0o10000000000 # Use "stick" (mark/space) parity class Serial(SerialBase, PlatformSpecific): """\ - Serial port class POSIX implementation. Serial port configuration is + Serial port class POSIX implementation. Serial port configuration is done with termios and fcntl. Runs on Linux and many other Un*x like systems. """ @@ -288,7 +289,7 @@ class Serial(SerialBase, PlatformSpecific): self.fd = None # open try: - self.fd = os.open(self.portstr, os.O_RDWR|os.O_NOCTTY|os.O_NONBLOCK) + self.fd = os.open(self.portstr, os.O_RDWR | os.O_NOCTTY | os.O_NONBLOCK) except OSError as msg: self.fd = None raise SerialException(msg.errno, "could not open port %s: %s" % (self._port, msg)) @@ -313,7 +314,6 @@ class Serial(SerialBase, PlatformSpecific): self._update_rts_state() self.reset_input_buffer() - def _reconfigure_port(self): """Set communication parameters on opened port.""" if self.fd is None: @@ -330,15 +330,16 @@ class Serial(SerialBase, PlatformSpecific): except termios.error as msg: # if a port is nonexistent but has a /dev file, it'll fail here raise SerialException("Could not configure port: %s" % msg) # set up raw mode / no echo / binary - cflag |= (termios.CLOCAL|termios.CREAD) - lflag &= ~(termios.ICANON|termios.ECHO|termios.ECHOE|termios.ECHOK|termios.ECHONL| - termios.ISIG|termios.IEXTEN) #|termios.ECHOPRT - for flag in ('ECHOCTL', 'ECHOKE'): # netbsd workaround for Erk + cflag |= (termios.CLOCAL | termios.CREAD) + lflag &= ~(termios.ICANON | termios.ECHO | termios.ECHOE | + termios.ECHOK | termios.ECHONL | + termios.ISIG | termios.IEXTEN) # |termios.ECHOPRT + for flag in ('ECHOCTL', 'ECHOKE'): # netbsd workaround for Erk if hasattr(termios, flag): lflag &= ~getattr(termios, flag) - oflag &= ~(termios.OPOST|termios.ONLCR|termios.OCRNL) - iflag &= ~(termios.INLCR|termios.IGNCR|termios.ICRNL|termios.IGNBRK) + oflag &= ~(termios.OPOST | termios.ONLCR | termios.OCRNL) + iflag &= ~(termios.INLCR | termios.IGNCR | termios.ICRNL | termios.IGNBRK) if hasattr(termios, 'IUCLC'): iflag &= ~termios.IUCLC if hasattr(termios, 'PARMRK'): @@ -355,7 +356,7 @@ class Serial(SerialBase, PlatformSpecific): # may need custom baud rate, it isn't in our list. ispeed = ospeed = getattr(termios, 'B38400') try: - custom_baud = int(self._baudrate) # store for later + custom_baud = int(self._baudrate) # store for later except ValueError: raise ValueError('Invalid baud rate: %r' % self._baudrate) else: @@ -375,27 +376,27 @@ class Serial(SerialBase, PlatformSpecific): else: raise ValueError('Invalid char len: %r' % self._bytesize) # setup stop bits - if self._stopbits == STOPBITS_ONE: + if self._stopbits == serial.STOPBITS_ONE: cflag &= ~(termios.CSTOPB) - elif self._stopbits == STOPBITS_ONE_POINT_FIVE: - cflag |= (termios.CSTOPB) # XXX same as TWO.. there is no POSIX support for 1.5 - elif self._stopbits == STOPBITS_TWO: - cflag |= (termios.CSTOPB) + elif self._stopbits == serial.STOPBITS_ONE_POINT_FIVE: + cflag |= (termios.CSTOPB) # XXX same as TWO.. there is no POSIX support for 1.5 + elif self._stopbits == serial.STOPBITS_TWO: + cflag |= (termios.CSTOPB) else: raise ValueError('Invalid stop bit specification: %r' % self._stopbits) # setup parity - iflag &= ~(termios.INPCK|termios.ISTRIP) - if self._parity == PARITY_NONE: - cflag &= ~(termios.PARENB|termios.PARODD) - elif self._parity == PARITY_EVEN: + iflag &= ~(termios.INPCK | termios.ISTRIP) + if self._parity == serial.PARITY_NONE: + cflag &= ~(termios.PARENB | termios.PARODD) + elif self._parity == serial.PARITY_EVEN: cflag &= ~(termios.PARODD) - cflag |= (termios.PARENB) - elif self._parity == PARITY_ODD: - cflag |= (termios.PARENB|termios.PARODD) - elif self._parity == PARITY_MARK and plat[:5] == 'linux': - cflag |= (termios.PARENB|CMSPAR|termios.PARODD) - elif self._parity == PARITY_SPACE and plat[:5] == 'linux': - cflag |= (termios.PARENB|CMSPAR) + cflag |= (termios.PARENB) + elif self._parity == serial.PARITY_ODD: + cflag |= (termios.PARENB | termios.PARODD) + elif self._parity == serial.PARITY_MARK and plat[:5] == 'linux': + cflag |= (termios.PARENB | CMSPAR | termios.PARODD) + elif self._parity == serial.PARITY_SPACE and plat[:5] == 'linux': + cflag |= (termios.PARENB | CMSPAR) cflag &= ~(termios.PARODD) else: raise ValueError('Invalid parity: %r' % self._parity) @@ -403,23 +404,23 @@ class Serial(SerialBase, PlatformSpecific): # xonxoff if hasattr(termios, 'IXANY'): if self._xonxoff: - iflag |= (termios.IXON|termios.IXOFF) #|termios.IXANY) + iflag |= (termios.IXON | termios.IXOFF) # |termios.IXANY) else: - iflag &= ~(termios.IXON|termios.IXOFF|termios.IXANY) + iflag &= ~(termios.IXON | termios.IXOFF | termios.IXANY) else: if self._xonxoff: - iflag |= (termios.IXON|termios.IXOFF) + iflag |= (termios.IXON | termios.IXOFF) else: - iflag &= ~(termios.IXON|termios.IXOFF) + iflag &= ~(termios.IXON | termios.IXOFF) # rtscts if hasattr(termios, 'CRTSCTS'): if self._rtscts: - cflag |= (termios.CRTSCTS) + cflag |= (termios.CRTSCTS) else: cflag &= ~(termios.CRTSCTS) elif hasattr(termios, 'CNEW_RTSCTS'): # try it with alternate constant name if self._rtscts: - cflag |= (termios.CNEW_RTSCTS) + cflag |= (termios.CNEW_RTSCTS) else: cflag &= ~(termios.CNEW_RTSCTS) # XXX should there be a warning if setting up rtscts (and xonxoff etc) fails?? @@ -435,7 +436,10 @@ class Serial(SerialBase, PlatformSpecific): cc[termios.VTIME] = vtime # activate settings if [iflag, oflag, cflag, lflag, ispeed, ospeed, cc] != orig_attr: - termios.tcsetattr(self.fd, termios.TCSANOW, [iflag, oflag, cflag, lflag, ispeed, ospeed, cc]) + termios.tcsetattr( + self.fd, + termios.TCSANOW, + [iflag, oflag, cflag, lflag, ispeed, ospeed, cc]) # apply custom baud rate, if any if custom_baud is not None: @@ -459,7 +463,7 @@ class Serial(SerialBase, PlatformSpecific): """Return the number of bytes currently in the input buffer.""" #~ s = fcntl.ioctl(self.fd, termios.FIONREAD, TIOCM_zero_str) s = fcntl.ioctl(self.fd, TIOCINQ, TIOCM_zero_str) - return struct.unpack('I',s)[0] + return struct.unpack('I', s)[0] # select based implementation, proved to work on many systems def read(self, size=1): @@ -468,18 +472,19 @@ class Serial(SerialBase, PlatformSpecific): return less characters as requested. With no timeout it will block until the requested number of bytes is read. """ - if not self.is_open: raise portNotOpenError + if not self.is_open: + raise portNotOpenError read = bytearray() while len(read) < size: try: - ready,_,_ = select.select([self.fd],[],[], self._timeout) + ready, _, _ = select.select([self.fd], [], [], self._timeout) # If select was used with a timeout, and the timeout occurs, it # returns with empty lists -> thus abort read operation. - # For timeout == 0 (non-blocking operation) also abort when there - # is nothing to read. + # For timeout == 0 (non-blocking operation) also abort when + # there is nothing to read. if not ready: break # timeout - buf = os.read(self.fd, size-len(read)) + buf = os.read(self.fd, size - len(read)) # read should always return some data as select reported it was # ready to read when we get to this point. if not buf: @@ -489,8 +494,8 @@ class Serial(SerialBase, PlatformSpecific): raise SerialException('device reports readiness to read but returned no data (device disconnected or multiple access on port?)') read.extend(buf) except OSError as e: - # this is for Python 3.x where select.error is a subclass of OSError - # ignore EAGAIN errors. all other errors are shown + # this is for Python 3.x where select.error is a subclass of + # OSError ignore EAGAIN errors. all other errors are shown if e.errno != errno.EAGAIN: raise SerialException('read failed: %s' % (e,)) except select.error as e: @@ -503,7 +508,8 @@ class Serial(SerialBase, PlatformSpecific): def write(self, data): """Output the given byte string over the serial port.""" - if not self.is_open: raise portNotOpenError + if not self.is_open: + raise portNotOpenError d = to_bytes(data) tx_len = len(d) if self._write_timeout is not None and self._write_timeout > 0: @@ -541,12 +547,14 @@ class Serial(SerialBase, PlatformSpecific): Flush of file like objects. In this case, wait until all data is written. """ - if not self.is_open: raise portNotOpenError + if not self.is_open: + raise portNotOpenError termios.tcdrain(self.fd) def reset_input_buffer(self): """Clear input buffer, discarding all that is in the buffer.""" - if not self.is_open: raise portNotOpenError + if not self.is_open: + raise portNotOpenError termios.tcflush(self.fd, termios.TCIFLUSH) def reset_output_buffer(self): @@ -554,7 +562,8 @@ class Serial(SerialBase, PlatformSpecific): Clear output buffer, aborting the current output and discarding all that is in the buffer. """ - if not self.is_open: raise portNotOpenError + if not self.is_open: + raise portNotOpenError termios.tcflush(self.fd, termios.TCOFLUSH) def send_break(self, duration=0.25): @@ -562,8 +571,9 @@ class Serial(SerialBase, PlatformSpecific): Send break condition. Timed, returns to idle state after given duration. """ - if not self.is_open: raise portNotOpenError - termios.tcsendbreak(self.fd, int(duration/0.25)) + if not self.is_open: + raise portNotOpenError + termios.tcsendbreak(self.fd, int(duration / 0.25)) def _update_break_state(self): """\ @@ -591,30 +601,34 @@ class Serial(SerialBase, PlatformSpecific): @property def cts(self): """Read terminal status line: Clear To Send""" - if not self.is_open: raise portNotOpenError + if not self.is_open: + raise portNotOpenError s = fcntl.ioctl(self.fd, TIOCMGET, TIOCM_zero_str) - return struct.unpack('I',s)[0] & TIOCM_CTS != 0 + return struct.unpack('I', s)[0] & TIOCM_CTS != 0 @property def dsr(self): """Read terminal status line: Data Set Ready""" - if not self.is_open: raise portNotOpenError + if not self.is_open: + raise portNotOpenError s = fcntl.ioctl(self.fd, TIOCMGET, TIOCM_zero_str) - return struct.unpack('I',s)[0] & TIOCM_DSR != 0 + return struct.unpack('I', s)[0] & TIOCM_DSR != 0 @property def ri(self): """Read terminal status line: Ring Indicator""" - if not self.is_open: raise portNotOpenError + if not self.is_open: + raise portNotOpenError s = fcntl.ioctl(self.fd, TIOCMGET, TIOCM_zero_str) - return struct.unpack('I',s)[0] & TIOCM_RI != 0 + return struct.unpack('I', s)[0] & TIOCM_RI != 0 @property def cd(self): """Read terminal status line: Carrier Detect""" - if not self.is_open: raise portNotOpenError + if not self.is_open: + raise portNotOpenError s = fcntl.ioctl(self.fd, TIOCMGET, TIOCM_zero_str) - return struct.unpack('I',s)[0] & TIOCM_CD != 0 + return struct.unpack('I', s)[0] & TIOCM_CD != 0 # - - platform specific - - - - @@ -623,11 +637,12 @@ class Serial(SerialBase, PlatformSpecific): """Return the number of bytes currently in the output buffer.""" #~ s = fcntl.ioctl(self.fd, termios.FIONREAD, TIOCM_zero_str) s = fcntl.ioctl(self.fd, TIOCOUTQ, TIOCM_zero_str) - return struct.unpack('I',s)[0] + return struct.unpack('I', s)[0] def nonblocking(self): """internal - not portable!""" - if not self.is_open: raise portNotOpenError + if not self.is_open: + raise portNotOpenError fcntl.fcntl(self.fd, fcntl.F_SETFL, os.O_NONBLOCK) def fileno(self): @@ -635,7 +650,8 @@ class Serial(SerialBase, PlatformSpecific): For easier use of the serial port instance with select. WARNING: this function is not portable to different platforms! """ - if not self.is_open: raise portNotOpenError + if not self.is_open: + raise portNotOpenError return self.fd def set_input_flow_control(self, enable=True): @@ -644,7 +660,8 @@ class Serial(SerialBase, PlatformSpecific): This will send XON (true) or XOFF (false) to the other device. WARNING: this function is not portable to different platforms! """ - if not self.is_open: raise portNotOpenError + if not self.is_open: + raise portNotOpenError if enable: termios.tcflow(self.fd, termios.TCION) else: @@ -656,14 +673,14 @@ class Serial(SerialBase, PlatformSpecific): control is enabled. WARNING: this function is not portable to different platforms! """ - if not self.is_open: raise portNotOpenError + if not self.is_open: + raise portNotOpenError if enable: termios.tcflow(self.fd, termios.TCOON) else: termios.tcflow(self.fd, termios.TCOOFF) - class PosixPollSerial(Serial): """\ Poll based read implementation. Not all systems support poll properly. @@ -677,43 +694,43 @@ class PosixPollSerial(Serial): return less characters as requested. With no timeout it will block until the requested number of bytes is read. """ - if self.fd is None: raise portNotOpenError + if self.fd is None: + raise portNotOpenError read = bytearray() poll = select.poll() - poll.register(self.fd, select.POLLIN|select.POLLERR|select.POLLHUP|select.POLLNVAL) + poll.register(self.fd, select.POLLIN | select.POLLERR | select.POLLHUP | select.POLLNVAL) if size > 0: while len(read) < size: # print "\tread(): size",size, "have", len(read) #debug # wait until device becomes ready to read (or something fails) - for fd, event in poll.poll(self._timeout*1000): - if event & (select.POLLERR|select.POLLHUP|select.POLLNVAL): + for fd, event in poll.poll(self._timeout * 1000): + if event & (select.POLLERR | select.POLLHUP | select.POLLNVAL): raise SerialException('device reports error (poll)') # we don't care if it is select.POLLIN or timeout, that's # handled below buf = os.read(self.fd, size - len(read)) read.extend(buf) if ((self._timeout is not None and self._timeout >= 0) or - (self._inter_byte_timeout is not None and self._inter_byte_timeout > 0)) and not buf: + (self._inter_byte_timeout is not None and self._inter_byte_timeout > 0)) and not buf: break # early abort on timeout return bytes(read) if __name__ == '__main__': s = Serial(0, - baudrate=19200, # baud rate - bytesize=EIGHTBITS, # number of data bits - parity=PARITY_EVEN, # enable parity checking - stopbits=STOPBITS_ONE, # number of stop bits - timeout=3, # set a timeout value, None for waiting forever - xonxoff=0, # enable software flow control - rtscts=0, # enable RTS/CTS flow control + baudrate=19200, # baud rate + bytesize=serial.EIGHTBITS, # number of data bits + parity=serial.PARITY_EVEN, # enable parity checking + stopbits=serial.STOPBITS_ONE, # number of stop bits + timeout=3, # set a timeout value, None for waiting forever + xonxoff=0, # enable software flow control + rtscts=0, # enable RTS/CTS flow control ) - s.setRTS(1) - s.setDTR(1) - s.flushInput() - s.flushOutput() + s.rts = True + s.dtr = True + s.reset_input_buffer() + s.reset_output_buffer() s.write('hello') sys.stdout.write('%r\n' % s.read(5)) sys.stdout.write('%s\n' % s.inWaiting()) del s - |