diff options
Diffstat (limited to 'pyserial/serial')
-rw-r--r-- | pyserial/serial/.cvsignore | 1 | ||||
-rw-r--r-- | pyserial/serial/__init__.py | 26 | ||||
-rw-r--r-- | pyserial/serial/serialcli.py | 271 | ||||
-rw-r--r-- | pyserial/serial/serialjava.py | 260 | ||||
-rw-r--r-- | pyserial/serial/serialposix.py | 612 | ||||
-rw-r--r-- | pyserial/serial/serialutil.py | 478 | ||||
-rw-r--r-- | pyserial/serial/serialwin32.py | 383 | ||||
-rw-r--r-- | pyserial/serial/sermsdos.py | 200 | ||||
-rw-r--r-- | pyserial/serial/win32.py | 288 |
9 files changed, 0 insertions, 2519 deletions
diff --git a/pyserial/serial/.cvsignore b/pyserial/serial/.cvsignore deleted file mode 100644 index 7e99e36..0000000 --- a/pyserial/serial/.cvsignore +++ /dev/null @@ -1 +0,0 @@ -*.pyc
\ No newline at end of file diff --git a/pyserial/serial/__init__.py b/pyserial/serial/__init__.py deleted file mode 100644 index 6e160e0..0000000 --- a/pyserial/serial/__init__.py +++ /dev/null @@ -1,26 +0,0 @@ -#!/usr/bin/env python - -# portable serial port access with python -# this is a wrapper module for different platform implementations -# -# (C)2001-2002 Chris Liechti <cliechti@gmx.net> -# this is distributed under a free software license, see license.txt - -VERSION = '2.4' - -import sys - -if sys.platform == 'cli': - from serialcli import * -else: - import os - # chose an implementation, depending on os - if os.name == 'nt': #sys.platform == 'win32': - from serialwin32 import * - elif os.name == 'posix': - from serialposix import * - elif os.name == 'java': - from serialjava import * - else: - raise Exception("Sorry: no implementation for your platform ('%s') available" % os.name) - diff --git a/pyserial/serial/serialcli.py b/pyserial/serial/serialcli.py deleted file mode 100644 index d8eaf1e..0000000 --- a/pyserial/serial/serialcli.py +++ /dev/null @@ -1,271 +0,0 @@ -#! python -# Python Serial Port Extension for Win32, Linux, BSD, Jython and .NET/Mono -# serial driver for .NET/Mono (IronPython), .NET >= 2 -# see __init__.py -# -# (C) 2008 Chris Liechti <cliechti@gmx.net> -# this is distributed under a free software license, see license.txt - -import clr -import System -import System.IO.Ports -from serialutil import * - - -def device(portnum): - """Turn a port number into a device name""" - return System.IO.Ports.SerialPort.GetPortNames()[portnum] - - -# must invoke function with byte array, make a helper to convert strings -# to byte arrays -sab = System.Array[System.Byte] -def as_byte_array(string): - return sab([ord(x) for x in string]) # XXX will require adaption when run with a 3.x compatible IronPython - -class IronSerial(SerialBase): - """Serial port implemenation for .NET/Mono.""" - - BAUDRATES = (50, 75, 110, 134, 150, 200, 300, 600, 1200, 1800, 2400, 4800, - 9600, 19200, 38400, 57600, 115200) - - def open(self): - """Open port with current settings. This may throw a SerialException - if the port cannot be opened.""" - if self._port is None: - raise SerialException("Port must be configured before it can be used.") - try: - self._port_handle = System.IO.Ports.SerialPort(self.portstr) - except Exception, msg: - self._port_handle = None - raise SerialException("could not open port %s: %s" % (self.portstr, msg)) - - self._reconfigurePort() - self._port_handle.Open() - self._isOpen = True - if not self._rtscts: - self.setRTS(True) - self.setDTR(True) - self.flushInput() - self.flushOutput() - - def _reconfigurePort(self): - """Set communication parameters on opened port.""" - if not self._port_handle: - raise SerialException("Can only operate on a valid port handle") - - #~ self._port_handle.ReceivedBytesThreshold = 1 - - if self._timeout is None: - self._port_handle.ReadTimeout = System.IO.Ports.SerialPort.InfiniteTimeout - else: - self._port_handle.ReadTimeout = int(self._timeout*1000) - - # if self._timeout != 0 and self._interCharTimeout is not None: - # timeouts = (int(self._interCharTimeout * 1000),) + timeouts[1:] - - if self._writeTimeout is None: - self._port_handle.WriteTimeout = System.IO.Ports.SerialPort.InfiniteTimeout - else: - self._port_handle.WriteTimeout = int(self._writeTimeout*1000) - - - # Setup the connection info. - try: - self._port_handle.BaudRate = self._baudrate - except IOError, e: - # catch errors from illegal baudrate settings - raise ValueError(str(e)) - - if self._bytesize == FIVEBITS: - self._port_handle.DataBits = 5 - elif self._bytesize == SIXBITS: - self._port_handle.DataBits = 6 - elif self._bytesize == SEVENBITS: - self._port_handle.DataBits = 7 - elif self._bytesize == EIGHTBITS: - self._port_handle.DataBits = 8 - else: - raise ValueError("Unsupported number of data bits: %r" % self._bytesize) - - if self._parity == PARITY_NONE: - self._port_handle.Parity = getattr(System.IO.Ports.Parity, 'None') # reserved keyword in Py3k - elif self._parity == PARITY_EVEN: - self._port_handle.Parity = System.IO.Ports.Parity.Even - elif self._parity == PARITY_ODD: - self._port_handle.Parity = System.IO.Ports.Parity.Odd - elif self._parity == PARITY_MARK: - self._port_handle.Parity = System.IO.Ports.Parity.Mark - elif self._parity == PARITY_SPACE: - self._port_handle.Parity = System.IO.Ports.Parity.Space - else: - raise ValueError("Unsupported parity mode: %r" % self._parity) - - if self._stopbits == STOPBITS_ONE: - self._port_handle.StopBits = System.IO.Ports.StopBits.One - elif self._stopbits == STOPBITS_ONE_POINT_FIVE: - self._port_handle.StopBits = System.IO.Ports.StopBits.OnePointFive - elif self._stopbits == STOPBITS_TWO: - self._port_handle.StopBits = System.IO.Ports.StopBits.Two - else: - raise ValueError("Unsupported number of stop bits: %r" % self._stopbits) - - if self._rtscts and self._xonxoff: - self._port_handle.Handshake = System.IO.Ports.Handshake.RequestToSendXOnXOff - elif self._rtscts: - self._port_handle.Handshake = System.IO.Ports.Handshake.RequestToSend - elif self._xonxoff: - self._port_handle.Handshake = System.IO.Ports.Handshake.XOnXOff - else: - self._port_handle.Handshake = getattr(System.IO.Ports.Handshake, 'None') # reserved keyword in Py3k - - #~ def __del__(self): - #~ self.close() - - def close(self): - """Close port""" - if self._isOpen: - if self._port_handle: - try: - self._port_handle.Close() - except System.IO.Ports.InvalidOperationException: - # ignore errors. can happen for unplugged USB serial devices - pass - self._port_handle = None - self._isOpen = False - - def makeDeviceName(self, port): - try: - return device(port) - except TypeError, e: - raise SerialException(str(e)) - - # - - - - - - - - - - - - - - - - - - - - - - - - - - def inWaiting(self): - """Return the number of characters currently in the input buffer.""" - if not self._port_handle: raise portNotOpenError - return self._port_handle.BytesToRead - - def read(self, size=1): - """Read size bytes from the serial port. If a timeout is set it may - return less characters as requested. With no timeout it will block - until the requested number of bytes is read.""" - if not self._port_handle: raise portNotOpenError - # must use single byte reads as this is the only way to read - # without applying encodings - data = bytearray() - while size: - try: - data.append(self._port_handle.ReadByte()) - except System.TimeoutException, e: - break - else: - size -= 1 - return bytes(data) - - def write(self, data): - """Output the given string over the serial port.""" - if not self._port_handle: raise portNotOpenError - if not isinstance(data, (bytes, bytearray)): - raise TypeError('expected %s or bytearray, got %s' % (bytes, type(data))) - try: - # must call overloaded method with byte array argument - # as this is the only one not applying encodings - self._port_handle.Write(as_byte_array(data), 0, len(data)) - except System.TimeoutException, e: - raise writeTimeoutError - return len(data) - - def flushInput(self): - """Clear input buffer, discarding all that is in the buffer.""" - if not self._port_handle: raise portNotOpenError - self._port_handle.DiscardInBuffer() - - def flushOutput(self): - """Clear output buffer, aborting the current output and - discarding all that is in the buffer.""" - if not self._port_handle: raise portNotOpenError - self._port_handle.DiscardOutBuffer() - - def sendBreak(self, duration=0.25): - """Send break condition. Timed, returns to idle state after given duration.""" - if not self._port_handle: raise portNotOpenError - import time - self._port_handle.BreakState = True - time.sleep(duration) - self._port_handle.BreakState = False - - def setBreak(self, level=True): - """Set break: Controls TXD. When active, to transmitting is possible.""" - if not self._port_handle: raise portNotOpenError - self._port_handle.BreakState = bool(level) - - def setRTS(self, level=True): - """Set terminal status line: Request To Send""" - if not self._port_handle: raise portNotOpenError - self._port_handle.RtsEnable = bool(level) - - def setDTR(self, level=True): - """Set terminal status line: Data Terminal Ready""" - if not self._port_handle: raise portNotOpenError - self._port_handle.DtrEnable = bool(level) - - def getCTS(self): - """Read terminal status line: Clear To Send""" - if not self._port_handle: raise portNotOpenError - return self._port_handle.CtsHolding - - def getDSR(self): - """Read terminal status line: Data Set Ready""" - if not self._port_handle: raise portNotOpenError - return self._port_handle.DsrHolding - - def getRI(self): - """Read terminal status line: Ring Indicator""" - if not self._port_handle: raise portNotOpenError - #~ return self._port_handle.XXX - return False #XXX an error would be better - - def getCD(self): - """Read terminal status line: Carrier Detect""" - if not self._port_handle: raise portNotOpenError - return self._port_handle.CDHolding - - # - - platform specific - - - - - # none - - -# assemble Serial class with the platform specific implementation and the base -# for file-like behavior. for Python 2.6 and newer, that provide the new I/O -# library, derive from io.RawIOBase -try: - import io -except ImportError: - # classic version with our own file-like emulation - class Serial(IronSerial, FileLike): - pass -else: - # io library present - class Serial(IronSerial, io.RawIOBase): - pass - - -# Nur Testfunktion!! -if __name__ == '__main__': - import sys - - s = Serial(0) - sys.stdio.write('%s\n' % s) - - s = Serial() - sys.stdio.write('%s\n' % s) - - - s.baudrate = 19200 - s.databits = 7 - s.close() - s.port = 0 - s.open() - sys.stdio.write('%s\n' % s) - diff --git a/pyserial/serial/serialjava.py b/pyserial/serial/serialjava.py deleted file mode 100644 index 2541534..0000000 --- a/pyserial/serial/serialjava.py +++ /dev/null @@ -1,260 +0,0 @@ -#!jython -# -# Python Serial Port Extension for Win32, Linux, BSD, Jython -# module for serial IO for Jython and JavaComm -# see __init__.py -# -# (C) 2002-2008 Chris Liechti <cliechti@gmx.net> -# this is distributed under a free software license, see license.txt - -from serialutil import * - -def my_import(name): - mod = __import__(name) - components = name.split('.') - for comp in components[1:]: - mod = getattr(mod, comp) - return mod - - -def detect_java_comm(names): - """try given list of modules and return that imports""" - for name in names: - try: - mod = my_import(name) - mod.SerialPort - return mod - except (ImportError, AttributeError): - pass - raise ImportError("No Java Communications API implementation found") - - -# Java Communications API implementations -# http://mho.republika.pl/java/comm/ - -comm = detect_java_comm([ - 'javax.comm', # Sun/IBM - 'gnu.io', # RXTX -]) - - -def device(portnumber): - """Turn a port number into a device name""" - enum = comm.CommPortIdentifier.getPortIdentifiers() - ports = [] - while enum.hasMoreElements(): - el = enum.nextElement() - if el.getPortType() == comm.CommPortIdentifier.PORT_SERIAL: - ports.append(el) - return ports[portnumber].getName() - - -class JavaSerial(SerialBase): - """Serial port class, implemented with Java Communications API and - thus usable with jython and the appropriate java extension.""" - - def open(self): - """Open port with current settings. This may throw a SerialException - if the port cannot be opened.""" - if self._port is None: - raise SerialException("Port must be configured before it can be used.") - if type(self._port) == type(''): # strings are taken directly - portId = comm.CommPortIdentifier.getPortIdentifier(self._port) - else: - portId = comm.CommPortIdentifier.getPortIdentifier(device(self._port)) # numbers are transformed to a comport id obj - try: - self.sPort = portId.open("python serial module", 10) - except Exception, msg: - self.sPort = None - raise SerialException("Could not open port: %s" % msg) - self._reconfigurePort() - self._instream = self.sPort.getInputStream() - self._outstream = self.sPort.getOutputStream() - self._isOpen = True - - def _reconfigurePort(self): - """Set communication parameters on opened port.""" - if not self.sPort: - raise SerialException("Can only operate on a valid port handle") - - self.sPort.enableReceiveTimeout(30) - if self._bytesize == FIVEBITS: - jdatabits = comm.SerialPort.DATABITS_5 - elif self._bytesize == SIXBITS: - jdatabits = comm.SerialPort.DATABITS_6 - elif self._bytesize == SEVENBITS: - jdatabits = comm.SerialPort.DATABITS_7 - elif self._bytesize == EIGHTBITS: - jdatabits = comm.SerialPort.DATABITS_8 - else: - raise ValueError("unsupported bytesize: %r" % self._bytesize) - - if self._stopbits == STOPBITS_ONE: - jstopbits = comm.SerialPort.STOPBITS_1 - elif stopbits == STOPBITS_ONE_POINT_FIVE: - self._jstopbits = comm.SerialPort.STOPBITS_1_5 - elif self._stopbits == STOPBITS_TWO: - jstopbits = comm.SerialPort.STOPBITS_2 - else: - raise ValueError("unsupported number of stopbits: %r" % self._stopbits) - - if self._parity == PARITY_NONE: - jparity = comm.SerialPort.PARITY_NONE - elif self._parity == PARITY_EVEN: - jparity = comm.SerialPort.PARITY_EVEN - elif self._parity == PARITY_ODD: - jparity = comm.SerialPort.PARITY_ODD - elif self._parity == PARITY_MARK: - jparity = comm.SerialPort.PARITY_MARK - elif self._parity == PARITY_SPACE: - jparity = comm.SerialPort.PARITY_SPACE - else: - raise ValueError("unsupported parity type: %r" % self._parity) - - jflowin = jflowout = 0 - if self._rtscts: - jflowin |= comm.SerialPort.FLOWCONTROL_RTSCTS_IN - jflowout |= comm.SerialPort.FLOWCONTROL_RTSCTS_OUT - if self._xonxoff: - jflowin |= comm.SerialPort.FLOWCONTROL_XONXOFF_IN - jflowout |= comm.SerialPort.FLOWCONTROL_XONXOFF_OUT - - self.sPort.setSerialPortParams(self._baudrate, jdatabits, jstopbits, jparity) - self.sPort.setFlowControlMode(jflowin | jflowout) - - if self._timeout >= 0: - self.sPort.enableReceiveTimeout(self._timeout*1000) - else: - self.sPort.disableReceiveTimeout() - - def close(self): - """Close port""" - if self._isOpen: - if self.sPort: - self._instream.close() - self._outstream.close() - self.sPort.close() - self.sPort = None - self._isOpen = False - - def makeDeviceName(self, port): - return device(port) - - # - - - - - - - - - - - - - - - - - - - - - - - - - - def inWaiting(self): - """Return the number of characters currently in the input buffer.""" - if not self.sPort: raise portNotOpenError - return self._instream.available() - - def read(self, size=1): - """Read size bytes from the serial port. If a timeout is set it may - return less characters as requested. With no timeout it will block - until the requested number of bytes is read.""" - if not self.sPort: raise portNotOpenError - read = bytearray() - if size > 0: - while len(read) < size: - x = self._instream.read() - if x == -1: - if self.timeout >= 0: - break - else: - read.append(x) - return bytes(read) - - def write(self, data): - """Output the given string over the serial port.""" - if not self.sPort: raise portNotOpenError - if not isinstance(data, (bytes, bytearray)): - raise TypeError('expected %s or bytearray, got %s' % (bytes, type(data))) - self._outstream.write(data) - return len(data) - - def flushInput(self): - """Clear input buffer, discarding all that is in the buffer.""" - if not self.sPort: raise portNotOpenError - self._instream.skip(self._instream.available()) - - def flushOutput(self): - """Clear output buffer, aborting the current output and - discarding all that is in the buffer.""" - if not self.sPort: raise portNotOpenError - self._outstream.flush() - - def sendBreak(self, duration=0.25): - """Send break condition. Timed, returns to idle state after given duration.""" - if not self.sPort: raise portNotOpenError - self.sPort.sendBreak(duration*1000.0) - - def setBreak(self, level=1): - """Set break: Controls TXD. When active, to transmitting is possible.""" - if self.fd is None: raise portNotOpenError - raise SerialException("The setBreak function is not implemented in java.") - - def setRTS(self, level=1): - """Set terminal status line: Request To Send""" - if not self.sPort: raise portNotOpenError - self.sPort.setRTS(level) - - def setDTR(self, level=1): - """Set terminal status line: Data Terminal Ready""" - if not self.sPort: raise portNotOpenError - self.sPort.setDTR(level) - - def getCTS(self): - """Read terminal status line: Clear To Send""" - if not self.sPort: raise portNotOpenError - self.sPort.isCTS() - - def getDSR(self): - """Read terminal status line: Data Set Ready""" - if not self.sPort: raise portNotOpenError - self.sPort.isDSR() - - def getRI(self): - """Read terminal status line: Ring Indicator""" - if not self.sPort: raise portNotOpenError - self.sPort.isRI() - - def getCD(self): - """Read terminal status line: Carrier Detect""" - if not self.sPort: raise portNotOpenError - self.sPort.isCD() - - -# assemble Serial class with the platform specific implementation and the base -# for file-like behavior. for Python 2.6 and newer, that provide the new I/O -# library, derive from io.RawIOBase -try: - import io -except ImportError: - # classic version with our own file-like emulation - class Serial(JavaSerial, FileLike): - pass -else: - # io library present - class Serial(JavaSerial, io.RawIOBase): - pass - - -if __name__ == '__main__': - s = Serial(0, - baudrate=19200, # baudrate - bytesize=EIGHTBITS, # number of databits - parity=PARITY_EVEN, # enable parity checking - stopbits=STOPBITS_ONE, # number of stopbits - timeout=3, # set a timeout value, None for waiting forever - xonxoff=0, # enable software flow control - rtscts=0, # enable RTS/CTS flow control - ) - s.setRTS(1) - s.setDTR(1) - s.flushInput() - s.flushOutput() - s.write('hello') - sys.stdio.write('%r\n' % s.read(5)) - sys.stdio.write('%s\n' % s.inWaiting()) - del s - - diff --git a/pyserial/serial/serialposix.py b/pyserial/serial/serialposix.py deleted file mode 100644 index bbf7fb9..0000000 --- a/pyserial/serial/serialposix.py +++ /dev/null @@ -1,612 +0,0 @@ -#!/usr/bin/env python -# -# Python Serial Port Extension for Win32, Linux, BSD, Jython -# module for serial IO for POSIX compatible systems, like Linux -# see __init__.py -# -# (C) 2001-2009 Chris Liechti <cliechti@gmx.net> -# this is distributed under a free software license, see license.txt -# -# parts based on code from Grant B. Edwards <grante@visi.com>: -# ftp://ftp.visi.com/users/grante/python/PosixSerial.py -# -# references: http://www.easysw.com/~mike/serial/serial.html - -import sys, os, fcntl, termios, struct, select, errno -from serialutil import * - -# Do check the Python version as some constants have moved. -if (sys.hexversion < 0x020100f0): - import TERMIOS -else: - TERMIOS = termios - -if (sys.hexversion < 0x020200f0): - import FCNTL -else: - FCNTL = fcntl - -baudrate_constants = { - 0: 0000000, # hang up - 50: 0000001, - 75: 0000002, - 110: 0000003, - 134: 0000004, - 150: 0000005, - 200: 0000006, - 300: 0000007, - 600: 0000010, - 1200: 0000011, - 1800: 0000012, - 2400: 0000013, - 4800: 0000014, - 9600: 0000015, - 19200: 0000016, - 38400: 0000017, - 57600: 0010001, - 115200: 0010002, - 230400: 0010003, - 460800: 0010004, - 500000: 0010005, - 576000: 0010006, - 921600: 0010007, - 1000000: 0010010, - 1152000: 0010011, - 1500000: 0010012, - 2000000: 0010013, - 2500000: 0010014, - 3000000: 0010015, - 3500000: 0010016, - 4000000: 0010017 -} - -# try to detect the OS so that a device can be selected... -# this code block should supply a device() and set_special_baudrate() function -# for the platform -plat = sys.platform.lower() - -if plat[:5] == 'linux': # Linux (confirmed) - - def device(port): - return '/dev/ttyS%d' % port - - ASYNC_SPD_MASK = 0x1030 - ASYNC_SPD_CUST = 0x0030 - - def set_special_baudrate(port, baudrate): - import array - buf = array.array('i', [0] * 32) - - # get serial_struct - FCNTL.ioctl(port.fd, TERMIOS.TIOCGSERIAL, buf) - - # set custom divisor - buf[6] = buf[7] / baudrate - - # update flags - buf[4] &= ~ASYNC_SPD_MASK - buf[4] |= ASYNC_SPD_CUST - - # set serial_struct - try: - res = FCNTL.ioctl(port.fd, TERMIOS.TIOCSSERIAL, buf) - except IOError: - raise ValueError('Failed to set custom baud rate: %r' % baudrate) - -elif plat == 'cygwin': # cygwin/win32 (confirmed) - - def device(port): - return '/dev/com%d' % (port + 1) - - ASYNC_SPD_MASK = 0x1030 - ASYNC_SPD_CUST = 0x0030 - - # XXX untested! - def set_special_baudrate(port, baudrate): - import array - buf = array.array('i', [0] * 32) - - # get serial_struct - FCNTL.ioctl(port.fd, TERMIOS.TIOCGSERIAL, buf) - - # set custom divisor - buf[6] = buf[7] / baudrate - - # update flags - buf[4] &= ~ASYNC_SPD_MASK - buf[4] |= ASYNC_SPD_CUST - - # set serial_struct - try: - res = FCNTL.ioctl(port.fd, TERMIOS.TIOCSSERIAL, buf) - except IOError: - raise ValueError('Failed to set custom baud rate: %r' % baudrate) - -elif plat == 'openbsd3': # BSD (confirmed) - - def device(port): - return '/dev/ttyp%d' % port - - def set_special_baudrate(port, baudrate): - raise ValueError("sorry don't know how to handle non standard baud rate on this platform") - -elif plat[:3] == 'bsd' or \ - plat[:7] == 'freebsd' or \ - plat[:7] == 'openbsd': # BSD (confirmed for freebsd4: cuaa%d) - - def device(port): - return '/dev/cuad%d' % port - - def set_special_baudrate(port, baudrate): - raise ValueError("sorry don't know how to handle non standard baud rate on this platform") - -elif plat[:6] == 'darwin': # OS X - - version = os.uname()[2].split('.') - # Tiger or above can support arbitrary serial speeds - if int(version[0]) >= 8: - # remove all speeds not supported with TERMIOS so that pyserial never - # attempts to use them directly - for b in baudrate_constants.keys(): - if b > 230400: - del baudrate_constants[b] - - def set_special_baudrate(port, baudrate): - # use IOKit-specific call to set up high speeds - import array, fcntl - buf = array.array('i', [baudrate]) - IOSSIOSPEED = 0x80045402 #_IOW('T', 2, speed_t) - fcntl.ioctl(port.fd, IOSSIOSPEED, buf, 1) - else: # version < 8 - def set_special_baudrate(port, baudrate): - raise ValueError("baud rate not supported") - - def device(port): - return '/dev/cuad%d' % port - - -elif plat[:6] == 'netbsd': # NetBSD 1.6 testing by Erk - - def device(port): - return '/dev/dty%02d' % port - - def set_special_baudrate(port, baudrate): - raise ValueError("sorry don't know how to handle non standard baud rate on this platform") - -elif plat[:4] == 'irix': # IRIX (partially tested) - - def device(port): - return '/dev/ttyf%d' % (port+1) #XXX different device names depending on flow control - - def set_special_baudrate(port, baudrate): - raise ValueError("sorry don't know how to handle non standard baud rate on this platform") - -elif plat[:2] == 'hp': # HP-UX (not tested) - - def device(port): - return '/dev/tty%dp0' % (port+1) - - def set_special_baudrate(port, baudrate): - raise ValueError("sorry don't know how to handle non standard baud rate on this platform") - -elif plat[:5] == 'sunos': # Solaris/SunOS (confirmed) - - def device(port): - return '/dev/tty%c' % (ord('a')+port) - - def set_special_baudrate(port, baudrate): - raise ValueError("sorry don't know how to handle non standard baud rate on this platform") - -elif plat[:3] == 'aix': # AIX - - def device(port): - return '/dev/tty%d' % (port) - - def set_special_baudrate(port, baudrate): - raise ValueError("sorry don't know how to handle non standard baud rate on this platform") - -else: - #platform detection has failed... - sys.stderr.write("""\ -don't know how to number ttys on this system. -! Use an explicit path (eg /dev/ttyS1) or send this information to -! the author of this module: - -sys.platform = %r -os.name = %r -serialposix.py version = %s - -also add the device name of the serial port and where the -counting starts for the first serial port. -e.g. 'first serial port: /dev/ttyS0' -and with a bit luck you can get this module running... -""" % (sys.platform, os.name, VERSION)) - # no exception, just continue with a brave attempt to build a device name - # even if the device name is not correct for the platform it has chances - # to work using a string with the real device name as port parameter. - def device(portum): - return '/dev/ttyS%d' % portnum - def set_special_baudrate(port, baudrate): - raise SerialException("sorry don't know how to handle non standard baud rate on this platform") - #~ raise Exception, "this module does not run on this platform, sorry." - -# whats up with "aix", "beos", .... -# they should work, just need to know the device names. - - -# load some constants for later use. -# try to use values from TERMIOS, use defaults from linux otherwise -TIOCMGET = hasattr(TERMIOS, 'TIOCMGET') and TERMIOS.TIOCMGET or 0x5415 -TIOCMBIS = hasattr(TERMIOS, 'TIOCMBIS') and TERMIOS.TIOCMBIS or 0x5416 -TIOCMBIC = hasattr(TERMIOS, 'TIOCMBIC') and TERMIOS.TIOCMBIC or 0x5417 -TIOCMSET = hasattr(TERMIOS, 'TIOCMSET') and TERMIOS.TIOCMSET or 0x5418 - -#TIOCM_LE = hasattr(TERMIOS, 'TIOCM_LE') and TERMIOS.TIOCM_LE or 0x001 -TIOCM_DTR = hasattr(TERMIOS, 'TIOCM_DTR') and TERMIOS.TIOCM_DTR or 0x002 -TIOCM_RTS = hasattr(TERMIOS, 'TIOCM_RTS') and TERMIOS.TIOCM_RTS or 0x004 -#TIOCM_ST = hasattr(TERMIOS, 'TIOCM_ST') and TERMIOS.TIOCM_ST or 0x008 -#TIOCM_SR = hasattr(TERMIOS, 'TIOCM_SR') and TERMIOS.TIOCM_SR or 0x010 - -TIOCM_CTS = hasattr(TERMIOS, 'TIOCM_CTS') and TERMIOS.TIOCM_CTS or 0x020 -TIOCM_CAR = hasattr(TERMIOS, 'TIOCM_CAR') and TERMIOS.TIOCM_CAR or 0x040 -TIOCM_RNG = hasattr(TERMIOS, 'TIOCM_RNG') and TERMIOS.TIOCM_RNG or 0x080 -TIOCM_DSR = hasattr(TERMIOS, 'TIOCM_DSR') and TERMIOS.TIOCM_DSR or 0x100 -TIOCM_CD = hasattr(TERMIOS, 'TIOCM_CD') and TERMIOS.TIOCM_CD or TIOCM_CAR -TIOCM_RI = hasattr(TERMIOS, 'TIOCM_RI') and TERMIOS.TIOCM_RI or TIOCM_RNG -#TIOCM_OUT1 = hasattr(TERMIOS, 'TIOCM_OUT1') and TERMIOS.TIOCM_OUT1 or 0x2000 -#TIOCM_OUT2 = hasattr(TERMIOS, 'TIOCM_OUT2') and TERMIOS.TIOCM_OUT2 or 0x4000 -TIOCINQ = hasattr(TERMIOS, 'FIONREAD') and TERMIOS.FIONREAD or 0x541B - -TIOCM_zero_str = struct.pack('I', 0) -TIOCM_RTS_str = struct.pack('I', TIOCM_RTS) -TIOCM_DTR_str = struct.pack('I', TIOCM_DTR) - -TIOCSBRK = hasattr(TERMIOS, 'TIOCSBRK') and TERMIOS.TIOCSBRK or 0x5427 -TIOCCBRK = hasattr(TERMIOS, 'TIOCCBRK') and TERMIOS.TIOCCBRK or 0x5428 - - -class PosixSerial(SerialBase): - """Serial port class POSIX implementation. Serial port configuration is - done with termios and fcntl. Runs on Linux and many other Un*x like - systems.""" - - def open(self): - """Open port with current settings. This may throw a SerialException - if the port cannot be opened.""" - self.fd = None - if self._port is None: - raise SerialException("Port must be configured before it can be used.") - # open - try: - self.fd = os.open(self.portstr, os.O_RDWR|os.O_NOCTTY|os.O_NONBLOCK) - except Exception, msg: - self.fd = None - raise SerialException("could not open port %s: %s" % (self._port, msg)) - #~ fcntl.fcntl(self.fd, FCNTL.F_SETFL, 0) # set blocking - - try: - self._reconfigurePort() - except: - try: - os.close(self.fd) - except: - # ignore any exception when closing the port - # also to keep original exception that happened when setting up - pass - self.fd = None - raise - else: - self._isOpen = True - #~ self.flushInput() - - - def _reconfigurePort(self): - """Set communication parameters on opened port.""" - if self.fd is None: - raise SerialException("Can only operate on a valid port handle") - custom_baud = None - - vmin = vtime = 0 # timeout is done via select - if self._interCharTimeout is not None: - vmin = 1 - vtime = int(self._interCharTimeout * 10) - try: - iflag, oflag, cflag, lflag, ispeed, ospeed, cc = termios.tcgetattr(self.fd) - except termios.error, msg: # if a port is nonexistent but has a /dev file, it'll fail here - raise SerialException("Could not configure port: %s" % msg) - # set up raw mode / no echo / binary - cflag |= (TERMIOS.CLOCAL|TERMIOS.CREAD) - lflag &= ~(TERMIOS.ICANON|TERMIOS.ECHO|TERMIOS.ECHOE|TERMIOS.ECHOK|TERMIOS.ECHONL| - TERMIOS.ISIG|TERMIOS.IEXTEN) #|TERMIOS.ECHOPRT - for flag in ('ECHOCTL', 'ECHOKE'): # netbsd workaround for Erk - if hasattr(TERMIOS, flag): - lflag &= ~getattr(TERMIOS, flag) - - oflag &= ~(TERMIOS.OPOST) - iflag &= ~(TERMIOS.INLCR|TERMIOS.IGNCR|TERMIOS.ICRNL|TERMIOS.IGNBRK) - if hasattr(TERMIOS, 'IUCLC'): - iflag &= ~TERMIOS.IUCLC - if hasattr(TERMIOS, 'PARMRK'): - iflag &= ~TERMIOS.PARMRK - - # setup baud rate - try: - ispeed = ospeed = getattr(TERMIOS, 'B%s' % (self._baudrate)) - except AttributeError: - try: - ispeed = ospeed = baudrate_constants[self._baudrate] - except KeyError: - #~ raise ValueError('Invalid baud rate: %r' % self._baudrate) - # may need custom baud rate, it isn't in our list. - ispeed = ospeed = getattr(TERMIOS, 'B38400') - try: - custom_baud = int(self._baudrate) # store for later - except ValueError: - raise ValueError('Invalid baud rate: %r' % self._baudrate) - else: - if custom_baud < 0: - raise ValueError('Invalid baud rate: %r' % self._baudrate) - - # setup char len - cflag &= ~TERMIOS.CSIZE - if self._bytesize == 8: - cflag |= TERMIOS.CS8 - elif self._bytesize == 7: - cflag |= TERMIOS.CS7 - elif self._bytesize == 6: - cflag |= TERMIOS.CS6 - elif self._bytesize == 5: - cflag |= TERMIOS.CS5 - else: - raise ValueError('Invalid char len: %r' % self._bytesize) - # setup stopbits - if self._stopbits == STOPBITS_ONE: - cflag &= ~(TERMIOS.CSTOPB) - elif self._stopbits == STOPBITS_ONE_POINT_FIVE: - cflag |= (TERMIOS.CSTOPB) # XXX same as TWO.. there is no POSIX support for 1.5 - elif self._stopbits == STOPBITS_TWO: - cflag |= (TERMIOS.CSTOPB) - else: - raise ValueError('Invalid stop bit specification: %r' % self._stopbits) - # setup parity - iflag &= ~(TERMIOS.INPCK|TERMIOS.ISTRIP) - if self._parity == PARITY_NONE: - cflag &= ~(TERMIOS.PARENB|TERMIOS.PARODD) - elif self._parity == PARITY_EVEN: - cflag &= ~(TERMIOS.PARODD) - cflag |= (TERMIOS.PARENB) - elif self._parity == PARITY_ODD: - cflag |= (TERMIOS.PARENB|TERMIOS.PARODD) - else: - raise ValueError('Invalid parity: %r' % self._parity) - # setup flow control - # xonxoff - if hasattr(TERMIOS, 'IXANY'): - if self._xonxoff: - iflag |= (TERMIOS.IXON|TERMIOS.IXOFF) #|TERMIOS.IXANY) - else: - iflag &= ~(TERMIOS.IXON|TERMIOS.IXOFF|TERMIOS.IXANY) - else: - if self._xonxoff: - iflag |= (TERMIOS.IXON|TERMIOS.IXOFF) - else: - iflag &= ~(TERMIOS.IXON|TERMIOS.IXOFF) - # rtscts - if hasattr(TERMIOS, 'CRTSCTS'): - if self._rtscts: - cflag |= (TERMIOS.CRTSCTS) - else: - cflag &= ~(TERMIOS.CRTSCTS) - elif hasattr(TERMIOS, 'CNEW_RTSCTS'): # try it with alternate constant name - if self._rtscts: - cflag |= (TERMIOS.CNEW_RTSCTS) - else: - cflag &= ~(TERMIOS.CNEW_RTSCTS) - # XXX should there be a warning if setting up rtscts (and xonxoff etc) fails?? - - # buffer - # vmin "minimal number of characters to be read. = for non blocking" - if vmin < 0 or vmin > 255: - raise ValueError('Invalid vmin: %r ' % vmin) - cc[TERMIOS.VMIN] = vmin - # vtime - if vtime < 0 or vtime > 255: - raise ValueError('Invalid vtime: %r' % vtime) - cc[TERMIOS.VTIME] = vtime - # activate settings - termios.tcsetattr(self.fd, TERMIOS.TCSANOW, [iflag, oflag, cflag, lflag, ispeed, ospeed, cc]) - - # apply custom baud rate, if any - if custom_baud is not None: - set_special_baudrate(self, custom_baud) - - def close(self): - """Close port""" - if self._isOpen: - if self.fd is not None: - os.close(self.fd) - self.fd = None - self._isOpen = False - - def makeDeviceName(self, port): - return device(port) - - # - - - - - - - - - - - - - - - - - - - - - - - - - - def inWaiting(self): - """Return the number of characters currently in the input buffer.""" - #~ s = fcntl.ioctl(self.fd, TERMIOS.FIONREAD, TIOCM_zero_str) - s = fcntl.ioctl(self.fd, TIOCINQ, TIOCM_zero_str) - return struct.unpack('I',s)[0] - - def read(self, size=1): - """Read size bytes from the serial port. If a timeout is set it may - return less characters as requested. With no timeout it will block - until the requested number of bytes is read.""" - if self.fd is None: raise portNotOpenError - read = bytearray() - inp = None - if size > 0: - while len(read) < size: - # print "\tread(): size",size, "have", len(read) #debug - ready,_,_ = select.select([self.fd], [], [], self._timeout) - if not ready: - break # timeout - buf = os.read(self.fd, size - len(read)) - read.extend(buf) - if ((self._timeout is not None and self._timeout >= 0) or - (self._interCharTimeout is not None and self._interCharTimeout > 0)) and not buf: - break # early abort on timeout - return bytes(read) - - def write(self, data): - """Output the given string over the serial port.""" - if self.fd is None: raise portNotOpenError - if not isinstance(data, (bytes, bytearray)): - raise TypeError('expected %s or bytearray, got %s' % (bytes, type(data))) - t = len(data) - d = data - while t > 0: - try: - if self._writeTimeout is not None and self._writeTimeout > 0: - _, ready, _ = select.select([], [self.fd], [], self._writeTimeout) - if not ready: - raise writeTimeoutError - n = os.write(self.fd, d) - if self._writeTimeout is not None and self._writeTimeout > 0: - _, ready, _ = select.select([], [self.fd], [], self._writeTimeout) - if not ready: - raise writeTimeoutError - d = d[n:] - t = t - n - except OSError, v: - if v.errno != errno.EAGAIN: - raise - return len(data) - - def flush(self): - """Flush of file like objects. In this case, wait until all data - is written.""" - self.drainOutput() - - def flushInput(self): - """Clear input buffer, discarding all that is in the buffer.""" - if self.fd is None: - raise portNotOpenError - termios.tcflush(self.fd, TERMIOS.TCIFLUSH) - - def flushOutput(self): - """Clear output buffer, aborting the current output and - discarding all that is in the buffer.""" - if self.fd is None: - raise portNotOpenError - termios.tcflush(self.fd, TERMIOS.TCOFLUSH) - - def sendBreak(self, duration=0.25): - """Send break condition. Timed, returns to idle state after given duration.""" - if self.fd is None: - raise portNotOpenError - termios.tcsendbreak(self.fd, int(duration/0.25)) - - def setBreak(self, level=1): - """Set break: Controls TXD. When active, no transmitting is possible.""" - if self.fd is None: raise portNotOpenError - if level: - fcntl.ioctl(self.fd, TIOCSBRK) - else: - fcntl.ioctl(self.fd, TIOCCBRK) - - def setRTS(self, level=1): - """Set terminal status line: Request To Send""" - if self.fd is None: raise portNotOpenError - if level: - fcntl.ioctl(self.fd, TIOCMBIS, TIOCM_RTS_str) - else: - fcntl.ioctl(self.fd, TIOCMBIC, TIOCM_RTS_str) - - def setDTR(self, level=1): - """Set terminal status line: Data Terminal Ready""" - if self.fd is None: raise portNotOpenError - if level: - fcntl.ioctl(self.fd, TIOCMBIS, TIOCM_DTR_str) - else: - fcntl.ioctl(self.fd, TIOCMBIC, TIOCM_DTR_str) - - def getCTS(self): - """Read terminal status line: Clear To Send""" - if self.fd is None: raise portNotOpenError - s = fcntl.ioctl(self.fd, TIOCMGET, TIOCM_zero_str) - return struct.unpack('I',s)[0] & TIOCM_CTS != 0 - - def getDSR(self): - """Read terminal status line: Data Set Ready""" - if self.fd is None: raise portNotOpenError - s = fcntl.ioctl(self.fd, TIOCMGET, TIOCM_zero_str) - return struct.unpack('I',s)[0] & TIOCM_DSR != 0 - - def getRI(self): - """Read terminal status line: Ring Indicator""" - if self.fd is None: raise portNotOpenError - s = fcntl.ioctl(self.fd, TIOCMGET, TIOCM_zero_str) - return struct.unpack('I',s)[0] & TIOCM_RI != 0 - - def getCD(self): - """Read terminal status line: Carrier Detect""" - if self.fd is None: raise portNotOpenError - s = fcntl.ioctl(self.fd, TIOCMGET, TIOCM_zero_str) - return struct.unpack('I',s)[0] & TIOCM_CD != 0 - - # - - platform specific - - - - - - def drainOutput(self): - """internal - not portable!""" - if self.fd is None: raise portNotOpenError - termios.tcdrain(self.fd) - - def nonblocking(self): - """internal - not portable!""" - if self.fd is None: - raise portNotOpenError - fcntl.fcntl(self.fd, FCNTL.F_SETFL, FCNTL.O_NONBLOCK) - - def fileno(self): - """For easier use of the serial port instance with select. - WARNING: this function is not portable to different platforms!""" - if self.fd is None: raise portNotOpenError - return self.fd - - -# assemble Serial class with the platform specifc implementation and the base -# for file-like behavior. for Python 2.6 and newer, that provide the new I/O -# library, derrive from io.RawIOBase -try: - import io -except ImportError: - # classic version with our own file-like emulation - class Serial(PosixSerial, FileLike): - pass -else: - # io library present - class Serial(PosixSerial, io.RawIOBase): - pass - - -if __name__ == '__main__': - s = Serial(0, - baudrate=19200, # baud rate - bytesize=EIGHTBITS, # number of data bits - parity=PARITY_EVEN, # enable parity checking - stopbits=STOPBITS_ONE, # number of stop bits - timeout=3, # set a timeout value, None for waiting forever - xonxoff=0, # enable software flow control - rtscts=0, # enable RTS/CTS flow control - ) - s.setRTS(1) - s.setDTR(1) - s.flushInput() - s.flushOutput() - s.write('hello') - sys.stdio.write('%r\n' % s.read(5)) - sys.stdio.write('%s\n' % s.inWaiting()) - del s - diff --git a/pyserial/serial/serialutil.py b/pyserial/serial/serialutil.py deleted file mode 100644 index e568aa1..0000000 --- a/pyserial/serial/serialutil.py +++ /dev/null @@ -1,478 +0,0 @@ -#! python -# Python Serial Port Extension for Win32, Linux, BSD, Jython -# see __init__.py -# -# (C) 2001-2009 Chris Liechti <cliechti@gmx.net> -# this is distributed under a free software license, see license.txt - -# compatibility folder Python < 2.6 -try: - bytes - bytearray -except (NameError, AttributeError): - # Python older than 2.6 do not have these types. Like for Python 2.6 they - # should behave like str. For Python older than 3.0 we want to work with - # strings anyway, only later versions have a true bytes type. - bytes = str - # bytearray is a mutable type that is easily turned into an instance of - # bytes - class bytearray(list): - # for bytes(bytearray()) usage - def __str__(self): return ''.join(self) - # append automatically converts integers to characters - def append(self, item): - if isinstance(item, str): - list.append(self, item) - else: - list.append(self, chr(item)) - -# create control bytes, depending on true type of bytes -# all Python versions prior 3.x convert str([17]) to '[17]' instead of '\x11' -if bytes is str: - XON = chr(17) - XOFF = chr(19) -else: - XON = bytes([17]) - XOFF = bytes([19]) - - -PARITY_NONE, PARITY_EVEN, PARITY_ODD, PARITY_MARK, PARITY_SPACE = 'N', 'E', 'O', 'M', 'S' -STOPBITS_ONE, STOPBITS_ONE_POINT_FIVE, STOPBITS_TWO = (1, 1.5, 2) -FIVEBITS, SIXBITS, SEVENBITS, EIGHTBITS = (5, 6, 7, 8) - -PARITY_NAMES = { - PARITY_NONE: 'None', - PARITY_EVEN: 'Even', - PARITY_ODD: 'Odd', - PARITY_MARK: 'Mark', - PARITY_SPACE: 'Space', -} - - -class SerialException(IOError): - """Base class for serial port related exceptions.""" - - -class SerialTimeoutException(SerialException): - """Write timeouts give an exception""" - - -writeTimeoutError = SerialTimeoutException("Write timeout") -portNotOpenError = ValueError('Attempting to use a port that is not open') - - -class FileLike(object): - """An abstract file like class. - - This class implements readline and readlines based on read and - writelines based on write. - This class is used to provide the above functions for to Serial - port objects. - - Note that when the serial port was opened with _NO_ timeout that - readline blocks until it sees a newline (or the specified size is - reached) and that readlines would never return and therefore - refuses to work (it raises an exception in this case)! - """ - - def __init__(self): - self.closed = True - - def close(self): - self.closed = True - - # so that ports are closed when objects are discarded - def __del__(self): - """Destructor. Calls close().""" - # The try/except block is in case this is called at program - # exit time, when it's possible that globals have already been - # deleted, and then the close() call might fail. Since - # there's nothing we can do about such failures and they annoy - # the end users, we suppress the traceback. - try: - self.close() - except: - pass - - def writelines(self, sequence): - for line in sequence: - self.write(line) - - def flush(self): - """flush of file like objects""" - pass - - # iterator for e.g. "for line in Serial(0): ..." usage - def next(self): - line = self.readline() - if not line: raise StopIteration - return line - - def __iter__(self): - return self - - # other functions of file-likes - not used by pySerial - - #~ readinto(b) - - def seek(self, pos, whence=0): - raise IOError("file is not seekable") - - def tell(self): - raise IOError("file is not seekable") - - def truncate(self, n=None): - raise IOError("file is not seekable") - - def isatty(self): - return False - - -class SerialBase(object): - """Serial port base class. Provides __init__ function and properties to - get/set port settings.""" - - # default values, may be overridden in subclasses that do not support all values - BAUDRATES = (50, 75, 110, 134, 150, 200, 300, 600, 1200, 1800, 2400, 4800, - 9600, 19200, 38400, 57600, 115200, 230400, 460800, 500000, - 576000, 921600, 1000000, 1152000, 1500000, 2000000, 2500000, - 3000000, 3500000, 4000000) - BYTESIZES = (FIVEBITS, SIXBITS, SEVENBITS, EIGHTBITS) - PARITIES = (PARITY_NONE, PARITY_EVEN, PARITY_ODD, PARITY_MARK, PARITY_SPACE) - STOPBITS = (STOPBITS_ONE, STOPBITS_ONE_POINT_FIVE, STOPBITS_TWO) - - def __init__(self, - port = None, # number of device, numbering starts at - # zero. if everything fails, the user - # can specify a device string, note - # that this isn't portable anymore - # port will be opened if one is specified - baudrate=9600, # baud rate - bytesize=EIGHTBITS, # number of data bits - parity=PARITY_NONE, # enable parity checking - stopbits=STOPBITS_ONE, # number of stop bits - timeout=None, # set a timeout value, None to wait forever - xonxoff=0, # enable software flow control - rtscts=0, # enable RTS/CTS flow control - writeTimeout=None, # set a timeout for writes - dsrdtr=None, # None: use rtscts setting, dsrdtr override if true or false - interCharTimeout=None # Inter-character timeout, None to disable - ): - """Initialize comm port object. If a port is given, then the port will be - opened immediately. Otherwise a Serial port object in closed state - is returned.""" - - self._isOpen = False - self._port = None # correct value is assigned below through properties - self._baudrate = None # correct value is assigned below through properties - self._bytesize = None # correct value is assigned below through properties - self._parity = None # correct value is assigned below through properties - self._stopbits = None # correct value is assigned below through properties - self._timeout = None # correct value is assigned below through properties - self._writeTimeout = None # correct value is assigned below through properties - self._xonxoff = None # correct value is assigned below through properties - self._rtscts = None # correct value is assigned below through properties - self._dsrdtr = None # correct value is assigned below through properties - self._interCharTimeout = None # correct value is assigned below through properties - - # assign values using get/set methods using the properties feature - self.port = port - self.baudrate = baudrate - self.bytesize = bytesize - self.parity = parity - self.stopbits = stopbits - self.timeout = timeout - self.writeTimeout = writeTimeout - self.xonxoff = xonxoff - self.rtscts = rtscts - self.dsrdtr = dsrdtr - self.interCharTimeout = interCharTimeout - - if port is not None: - self.open() - - def isOpen(self): - """Check if the port is opened.""" - return self._isOpen - - # - - - - - - - - - - - - - - - - - - - - - - - - - - # TODO: these are not really needed as the is the BAUDRATES etc. attribute... - # maybe i remove them before the final release... - - def getSupportedBaudrates(self): - return [(str(b), b) for b in self.BAUDRATES] - - def getSupportedByteSizes(self): - return [(str(b), b) for b in self.BYTESIZES] - - def getSupportedStopbits(self): - return [(str(b), b) for b in self.STOPBITS] - - def getSupportedParities(self): - return [(PARITY_NAMES[b], b) for b in self.PARITIES] - - # - - - - - - - - - - - - - - - - - - - - - - - - - - def setPort(self, port): - """Change the port. The attribute portstr is set to a string that - contains the name of the port.""" - - was_open = self._isOpen - if was_open: self.close() - if port is not None: - if isinstance(port, basestring): - self.portstr = port - else: - self.portstr = self.makeDeviceName(port) - else: - self.portstr = None - self._port = port - self.name = self.portstr - if was_open: self.open() - - def getPort(self): - """Get the current port setting. The value that was passed on init or using - setPort() is passed back. See also the attribute portstr which contains - the name of the port as a string.""" - return self._port - - port = property(getPort, setPort, doc="Port setting") - - - def setBaudrate(self, baudrate): - """Change baud rate. It raises a ValueError if the port is open and the - baud rate is not possible. If the port is closed, then the value is - accepted and the exception is raised when the port is opened.""" - try: - self._baudrate = int(baudrate) - except TypeError: - raise ValueError("Not a valid baudrate: %r" % (baudrate,)) - else: - if self._isOpen: self._reconfigurePort() - - def getBaudrate(self): - """Get the current baud rate setting.""" - return self._baudrate - - baudrate = property(getBaudrate, setBaudrate, doc="Baud rate setting") - - - def setByteSize(self, bytesize): - """Change byte size.""" - if bytesize not in self.BYTESIZES: raise ValueError("Not a valid byte size: %r" % (bytesize,)) - self._bytesize = bytesize - if self._isOpen: self._reconfigurePort() - - def getByteSize(self): - """Get the current byte size setting.""" - return self._bytesize - - bytesize = property(getByteSize, setByteSize, doc="Byte size setting") - - - def setParity(self, parity): - """Change parity setting.""" - if parity not in self.PARITIES: raise ValueError("Not a valid parity: %r" % (parity,)) - self._parity = parity - if self._isOpen: self._reconfigurePort() - - def getParity(self): - """Get the current parity setting.""" - return self._parity - - parity = property(getParity, setParity, doc="Parity setting") - - - def setStopbits(self, stopbits): - """Change stop bits size.""" - if stopbits not in self.STOPBITS: raise ValueError("Not a valid stop bit size: %r" % (stopbits,)) - self._stopbits = stopbits - if self._isOpen: self._reconfigurePort() - - def getStopbits(self): - """Get the current stop bits setting.""" - return self._stopbits - - stopbits = property(getStopbits, setStopbits, doc="Stop bits setting") - - - def setTimeout(self, timeout): - """Change timeout setting.""" - if timeout is not None: - try: - timeout + 1 # test if it's a number, will throw a TypeError if not... - except TypeError: - raise ValueError("Not a valid timeout: %r" % (timeout,)) - if timeout < 0: raise ValueError("Not a valid timeout: %r" % (timeout,)) - self._timeout = timeout - if self._isOpen: self._reconfigurePort() - - def getTimeout(self): - """Get the current timeout setting.""" - return self._timeout - - timeout = property(getTimeout, setTimeout, doc="Timeout setting for read()") - - - def setWriteTimeout(self, timeout): - """Change timeout setting.""" - if timeout is not None: - if timeout < 0: raise ValueError("Not a valid timeout: %r" % (timeout,)) - try: - timeout + 1 #test if it's a number, will throw a TypeError if not... - except TypeError: - raise ValueError("Not a valid timeout: %r" % timeout) - - self._writeTimeout = timeout - if self._isOpen: self._reconfigurePort() - - def getWriteTimeout(self): - """Get the current timeout setting.""" - return self._writeTimeout - - writeTimeout = property(getWriteTimeout, setWriteTimeout, doc="Timeout setting for write()") - - - def setXonXoff(self, xonxoff): - """Change XON/XOFF setting.""" - self._xonxoff = xonxoff - if self._isOpen: self._reconfigurePort() - - def getXonXoff(self): - """Get the current XON/XOFF setting.""" - return self._xonxoff - - xonxoff = property(getXonXoff, setXonXoff, doc="XON/XOFF setting") - - def setRtsCts(self, rtscts): - """Change RTS/CTS flow control setting.""" - self._rtscts = rtscts - if self._isOpen: self._reconfigurePort() - - def getRtsCts(self): - """Get the current RTS/CTS flow control setting.""" - return self._rtscts - - rtscts = property(getRtsCts, setRtsCts, doc="RTS/CTS flow control setting") - - def setDsrDtr(self, dsrdtr=None): - """Change DsrDtr flow control setting.""" - if dsrdtr is None: - # if not set, keep backwards compatibility and follow rtscts setting - self._dsrdtr = self._rtscts - else: - # if defined independently, follow its value - self._dsrdtr = dsrdtr - if self._isOpen: self._reconfigurePort() - - def getDsrDtr(self): - """Get the current DSR/DTR flow control setting.""" - return self._dsrdtr - - dsrdtr = property(getDsrDtr, setDsrDtr, "DSR/DTR flow control setting") - - def setInterCharTimeout(self, interCharTimeout): - """Change inter-character timeout setting.""" - if interCharTimeout is not None: - if interCharTimeout < 0: raise ValueError("Not a valid timeout: %r" % interCharTimeout) - try: - interCharTimeout + 1 # test if it's a number, will throw a TypeError if not... - except TypeError: - raise ValueError("Not a valid timeout: %r" % interCharTimeout) - - self._interCharTimeout = interCharTimeout - if self._isOpen: self._reconfigurePort() - - def getInterCharTimeout(self): - """Get the current inter-character timeout setting.""" - return self._interCharTimeout - - interCharTimeout = property(getInterCharTimeout, setInterCharTimeout, doc="Inter-character timeout setting for read()") - - - # - - - - - - - - - - - - - - - - - - - - - - - - - - def __repr__(self): - """String representation of the current port settings and its state.""" - return "%s<id=0x%x, open=%s>(port=%r, baudrate=%r, bytesize=%r, parity=%r, stopbits=%r, timeout=%r, xonxoff=%r, rtscts=%r, dsrdtr=%r)" % ( - self.__class__.__name__, - id(self), - self._isOpen, - self.portstr, - self.baudrate, - self.bytesize, - self.parity, - self.stopbits, - self.timeout, - self.xonxoff, - self.rtscts, - self.dsrdtr, - ) - - # - - - - - - - - - - - - - - - - - - - - - - - - - - def readline(self, size=None, eol='\n'): - """read a line which is terminated with end-of-line (eol) character - ('\n' by default) or until timeout""" - line = '' - while 1: - c = self.read(1) - if c: - line += c # not very efficient but lines are usually not that long - if c == eol: - break - if size is not None and len(line) >= size: - break - else: - break - return bytes(line) - - def readlines(self, sizehint=None, eol='\n'): - """read a list of lines, until timeout - sizehint is ignored""" - if self.timeout is None: - raise ValueError("Serial port MUST have enabled timeout for this function!") - lines = [] - while 1: - line = self.readline(eol=eol) - if line: - lines.append(line) - if line[-1] != eol: # was the line received with a timeout? - break - else: - break - return lines - - def xreadlines(self, sizehint=None): - """just call readlines - here for compatibility""" - return self.readlines() - - # - - - - - - - - - - - - - - - - - - - - - - - - - # compatibility with io library - - def readable(self): return True - def writable(self): return True - def seekable(self): return False - def readinto(self, b): - data = self.read(len(b)) - n = len(data) - try: - b[:n] = data - except TypeError, err: - import array - if not isinstance(b, array.array): - raise err - b[:n] = array.array('b', data) - return n - - -if __name__ == '__main__': - import sys - s = SerialBase() - sys.stdout.write('port name: %s\n' % s.portstr) - sys.stdout.write('baud rates: %s\n' % s.getSupportedBaudrates()) - sys.stdout.write('byte sizes: %s\n' % s.getSupportedByteSizes()) - sys.stdout.write('parities: %s\n' % s.getSupportedParities()) - sys.stdout.write('stop bits: %s\n' % s.getSupportedStopbits()) - sys.stdout.write('%s\n' % s) diff --git a/pyserial/serial/serialwin32.py b/pyserial/serial/serialwin32.py deleted file mode 100644 index af436bb..0000000 --- a/pyserial/serial/serialwin32.py +++ /dev/null @@ -1,383 +0,0 @@ -#! python -# Python Serial Port Extension for Win32, Linux, BSD, Jython -# serial driver for win32 -# see __init__.py -# -# (C) 2001-2009 Chris Liechti <cliechti@gmx.net> -# this is distributed under a free software license, see license.txt -# -# Initial patch to use ctypes by Giovanni Bajo <rasky@develer.com> - -import ctypes -import win32 - -from serialutil import * - - -def device(portnum): - """Turn a port number into a device name""" - return 'COM%d' % (portnum+1) # numbers are transformed to a string - - -class Win32Serial(SerialBase): - """Serial port implementation for Win32 based on ctypes.""" - - BAUDRATES = (50, 75, 110, 134, 150, 200, 300, 600, 1200, 1800, 2400, 4800, - 9600, 19200, 38400, 57600, 115200) - - def open(self): - """Open port with current settings. This may throw a SerialException - if the port cannot be opened.""" - if self._port is None: - raise SerialException("Port must be configured before it can be used.") - self.hComPort = None - # the "\\.\COMx" format is required for devices other than COM1-COM8 - # not all versions of windows seem to support this properly - # so that the first few ports are used with the DOS device name - port = self.portstr - try: - if port.upper().startswith('COM') and int(port[3:]) > 8: - port = '\\\\.\\' + port - except ValueError: - # for like COMnotanumber - pass - self.hComPort = win32.CreateFile(port, - win32.GENERIC_READ | win32.GENERIC_WRITE, - 0, # exclusive access - None, # no security - win32.OPEN_EXISTING, - win32.FILE_ATTRIBUTE_NORMAL | win32.FILE_FLAG_OVERLAPPED, - 0) - if self.hComPort == win32.INVALID_HANDLE_VALUE: - self.hComPort = None # 'cause __del__ is called anyway - raise SerialException("could not open port %s: %s" % (self.portstr, ctypes.WinError())) - - # Setup a 4k buffer - win32.SetupComm(self.hComPort, 4096, 4096) - - # Save original timeout values: - self._orgTimeouts = win32.COMMTIMEOUTS() - win32.GetCommTimeouts(self.hComPort, ctypes.byref(self._orgTimeouts)) - - self._rtsState = win32.RTS_CONTROL_ENABLE - self._dtrState = win32.DTR_CONTROL_ENABLE - - self._reconfigurePort() - - # Clear buffers: - # Remove anything that was there - win32.PurgeComm(self.hComPort, - win32.PURGE_TXCLEAR | win32.PURGE_TXABORT | - win32.PURGE_RXCLEAR | win32.PURGE_RXABORT) - - self._overlappedRead = win32.OVERLAPPED() - self._overlappedRead.hEvent = win32.CreateEvent(None, 1, 0, None) - self._overlappedWrite = win32.OVERLAPPED() - #~ self._overlappedWrite.hEvent = win32.CreateEvent(None, 1, 0, None) - self._overlappedWrite.hEvent = win32.CreateEvent(None, 0, 0, None) - self._isOpen = True - - def _reconfigurePort(self): - """Set communication parameters on opened port.""" - if not self.hComPort: - raise SerialException("Can only operate on a valid port handle") - - # Set Windows timeout values - # timeouts is a tuple with the following items: - # (ReadIntervalTimeout,ReadTotalTimeoutMultiplier, - # ReadTotalTimeoutConstant,WriteTotalTimeoutMultiplier, - # WriteTotalTimeoutConstant) - if self._timeout is None: - timeouts = (0, 0, 0, 0, 0) - elif self._timeout == 0: - timeouts = (win32.MAXDWORD, 0, 0, 0, 0) - else: - timeouts = (0, 0, int(self._timeout*1000), 0, 0) - if self._timeout != 0 and self._interCharTimeout is not None: - timeouts = (int(self._interCharTimeout * 1000),) + timeouts[1:] - - if self._writeTimeout is None: - pass - elif self._writeTimeout == 0: - timeouts = timeouts[:-2] + (0, win32.MAXDWORD) - else: - timeouts = timeouts[:-2] + (0, int(self._writeTimeout*1000)) - win32.SetCommTimeouts(self.hComPort, ctypes.byref(win32.COMMTIMEOUTS(*timeouts))) - - win32.SetCommMask(self.hComPort, win32.EV_ERR) - - # Setup the connection info. - # Get state and modify it: - comDCB = win32.DCB() - win32.GetCommState(self.hComPort, ctypes.byref(comDCB)) - comDCB.BaudRate = self._baudrate - - if self._bytesize == FIVEBITS: - comDCB.ByteSize = 5 - elif self._bytesize == SIXBITS: - comDCB.ByteSize = 6 - elif self._bytesize == SEVENBITS: - comDCB.ByteSize = 7 - elif self._bytesize == EIGHTBITS: - comDCB.ByteSize = 8 - else: - raise ValueError("Unsupported number of data bits: %r" % self._bytesize) - - if self._parity == PARITY_NONE: - comDCB.Parity = win32.NOPARITY - comDCB.fParity = 0 # Disable Parity Check - elif self._parity == PARITY_EVEN: - comDCB.Parity = win32.EVENPARITY - comDCB.fParity = 1 # Enable Parity Check - elif self._parity == PARITY_ODD: - comDCB.Parity = win32.ODDPARITY - comDCB.fParity = 1 # Enable Parity Check - elif self._parity == PARITY_MARK: - comDCB.Parity = win32.MARKPARITY - comDCB.fParity = 1 # Enable Parity Check - elif self._parity == PARITY_SPACE: - comDCB.Parity = win32.SPACEPARITY - comDCB.fParity = 1 # Enable Parity Check - else: - raise ValueError("Unsupported parity mode: %r" % self._parity) - - if self._stopbits == STOPBITS_ONE: - comDCB.StopBits = win32.ONESTOPBIT - elif self._stopbits == STOPBITS_ONE_POINT_FIVE: - comDCB.StopBits = win32.ONE5STOPBITS - elif self._stopbits == STOPBITS_TWO: - comDCB.StopBits = win32.TWOSTOPBITS - else: - raise ValueError("Unsupported number of stop bits: %r" % self._stopbits) - - comDCB.fBinary = 1 # Enable Binary Transmission - # Char. w/ Parity-Err are replaced with 0xff (if fErrorChar is set to TRUE) - if self._rtscts: - comDCB.fRtsControl = win32.RTS_CONTROL_HANDSHAKE - else: - comDCB.fRtsControl = self._rtsState - if self._dsrdtr: - comDCB.fDtrControl = win32.DTR_CONTROL_HANDSHAKE - else: - comDCB.fDtrControl = self._dtrState - comDCB.fOutxCtsFlow = self._rtscts - comDCB.fOutxDsrFlow = self._dsrdtr - comDCB.fOutX = self._xonxoff - comDCB.fInX = self._xonxoff - comDCB.fNull = 0 - comDCB.fErrorChar = 0 - comDCB.fAbortOnError = 0 - comDCB.XonChar = XON - comDCB.XoffChar = XOFF - - if not win32.SetCommState(self.hComPort, ctypes.byref(comDCB)): - raise ValueError("Cannot configure port, some setting was wrong. Original message: %s" % ctypes.WinError()) - - #~ def __del__(self): - #~ self.close() - - def close(self): - """Close port""" - if self._isOpen: - if self.hComPort: - # Restore original timeout values: - win32.SetCommTimeouts(self.hComPort, self._orgTimeouts) - # Close COM-Port: - win32.CloseHandle(self.hComPort) - win32.CloseHandle(self._overlappedRead.hEvent) - win32.CloseHandle(self._overlappedWrite.hEvent) - self.hComPort = None - self._isOpen = False - - def makeDeviceName(self, port): - return device(port) - - # - - - - - - - - - - - - - - - - - - - - - - - - - - def inWaiting(self): - """Return the number of characters currently in the input buffer.""" - flags = win32.DWORD() - comstat = win32.COMSTAT() - if not win32.ClearCommError(self.hComPort, ctypes.byref(flags), ctypes.byref(comstat)): - raise SerialException('call to ClearCommError failed') - return comstat.cbInQue - - def read(self, size=1): - """Read size bytes from the serial port. If a timeout is set it may - return less characters as requested. With no timeout it will block - until the requested number of bytes is read.""" - if not self.hComPort: raise portNotOpenError - if size > 0: - win32.ResetEvent(self._overlappedRead.hEvent) - flags = win32.DWORD() - comstat = win32.COMSTAT() - if not win32.ClearCommError(self.hComPort, ctypes.byref(flags), ctypes.byref(comstat)): - raise SerialException('call to ClearCommError failed') - if self.timeout == 0: - n = min(comstat.cbInQue, size) - if n > 0: - buf = ctypes.create_string_buffer(n) - rc = win32.DWORD() - err = win32.ReadFile(self.hComPort, buf, size, ctypes.byref(rc), ctypes.byref(self._overlappedRead)) - if not err and win32.GetLastError() != win32.ERROR_IO_PENDING: - raise SerialException("ReadFile failed (%s)" % ctypes.WinError()) - err = win32.WaitForSingleObject(self._overlappedRead.hEvent, win32.INFINITE) - read = buf.raw[:rc.value] - else: - read = bytes() - else: - buf = ctypes.create_string_buffer(size) - rc = win32.DWORD() - err = win32.ReadFile(self.hComPort, buf, size, ctypes.byref(rc), ctypes.byref(self._overlappedRead)) - if not err and win32.GetLastError() != win32.ERROR_IO_PENDING: - raise SerialException("ReadFile failed (%s)" % ctypes.WinError()) - err = win32.GetOverlappedResult(self.hComPort, ctypes.byref(self._overlappedRead), ctypes.byref(rc), True) - read = buf.raw[:rc.value] - else: - read = bytes() - return bytes(read) - - def write(self, data): - """Output the given string over the serial port.""" - if not self.hComPort: raise portNotOpenError - #~ if not isinstance(data, (bytes, bytearray)): - #~ raise TypeError('expected %s or bytearray, got %s' % (bytes, type(data))) - # convert data (needed in case of memoryview instance: Py 3.1 io lib), ctypes doesn't like memoryview - data = bytes(data) - if data: - #~ win32event.ResetEvent(self._overlappedWrite.hEvent) - n = win32.DWORD() - err = win32.WriteFile(self.hComPort, data, len(data), ctypes.byref(n), self._overlappedWrite) - if not err and win32.GetLastError() != win32.ERROR_IO_PENDING: - raise SerialException("WriteFile failed (%s)" % ctypes.WinError()) - # Wait for the write to complete. - #~ win32.WaitForSingleObject(self._overlappedWrite.hEvent, win32.INFINITE) - err = win32.GetOverlappedResult(self.hComPort, self._overlappedWrite, ctypes.byref(n), True) - if n.value != len(data): - raise writeTimeoutError - return n.value - else: - return 0 - - - def flushInput(self): - """Clear input buffer, discarding all that is in the buffer.""" - if not self.hComPort: raise portNotOpenError - win32.PurgeComm(self.hComPort, win32.PURGE_RXCLEAR | win32.PURGE_RXABORT) - - def flushOutput(self): - """Clear output buffer, aborting the current output and - discarding all that is in the buffer.""" - if not self.hComPort: raise portNotOpenError - win32.PurgeComm(self.hComPort, win32.PURGE_TXCLEAR | win32.PURGE_TXABORT) - - def sendBreak(self, duration=0.25): - """Send break condition. Timed, returns to idle state after given duration.""" - if not self.hComPort: raise portNotOpenError - import time - win32.SetCommBreak(self.hComPort) - time.sleep(duration) - win32.ClearCommBreak(self.hComPort) - - def setBreak(self, level=1): - """Set break: Controls TXD. When active, to transmitting is possible.""" - if not self.hComPort: raise portNotOpenError - if level: - win32.SetCommBreak(self.hComPort) - else: - win32.ClearCommBreak(self.hComPort) - - def setRTS(self, level=1): - """Set terminal status line: Request To Send""" - if not self.hComPort: raise portNotOpenError - if level: - self._rtsState = win32.RTS_CONTROL_ENABLE - win32.EscapeCommFunction(self.hComPort, win32.SETRTS) - else: - self._rtsState = win32.RTS_CONTROL_DISABLE - win32.EscapeCommFunction(self.hComPort, win32.CLRRTS) - - def setDTR(self, level=1): - """Set terminal status line: Data Terminal Ready""" - if not self.hComPort: raise portNotOpenError - if level: - self._dtrState = win32.DTR_CONTROL_ENABLE - win32.EscapeCommFunction(self.hComPort, win32.SETDTR) - else: - self._dtrState = win32.DTR_CONTROL_DISABLE - win32.EscapeCommFunction(self.hComPort, win32.CLRDTR) - - def _GetCommModemStatus(self): - stat = win32.DWORD() - win32.GetCommModemStatus(self.hComPort, ctypes.byref(stat)) - return stat.value - - def getCTS(self): - """Read terminal status line: Clear To Send""" - if not self.hComPort: raise portNotOpenError - return win32.MS_CTS_ON & self._GetCommModemStatus() != 0 - - def getDSR(self): - """Read terminal status line: Data Set Ready""" - if not self.hComPort: raise portNotOpenError - return win32.MS_DSR_ON & self._GetCommModemStatus() != 0 - - def getRI(self): - """Read terminal status line: Ring Indicator""" - if not self.hComPort: raise portNotOpenError - return win32.MS_RING_ON & self._GetCommModemStatus() != 0 - - def getCD(self): - """Read terminal status line: Carrier Detect""" - if not self.hComPort: raise portNotOpenError - return win32.MS_RLSD_ON & self._GetCommModemStatus() != 0 - - # - - platform specific - - - - - - def setXON(self, level=True): - """Platform specific - set flow state.""" - if not self.hComPort: raise portNotOpenError - if level: - win32.EscapeCommFunction(self.hComPort, win32.SETXON) - else: - win32.EscapeCommFunction(self.hComPort, win32.SETXOFF) - - def outWaiting(self): - """return how many characters the in the outgoing buffer""" - flags = win32.DWORD() - comstat = win32.COMSTAT() - if not win32.ClearCommError(self.hComPort, ctypes.byref(flags), ctypes.byref(comstat)): - raise SerialException('call to ClearCommError failed') - return comstat.cbOutQue - - -# assemble Serial class with the platform specific implementation and the base -# for file-like behavior. for Python 2.6 and newer, that provide the new I/O -# library, derive from io.RawIOBase -try: - import io -except ImportError: - # classic version with our own file-like emulation - class Serial(Win32Serial, FileLike): - pass -else: - # io library present - class Serial(Win32Serial, io.RawIOBase): - pass - - -# Nur Testfunktion!! -if __name__ == '__main__': - s = Serial(0) - sys.stdout.write("%s\n" % s) - - s = Serial() - sys.stdout.write("%s\n" % s) - - s.baudrate = 19200 - s.databits = 7 - s.close() - s.port = 0 - s.open() - sys.stdout.write("%s\n" % s) - diff --git a/pyserial/serial/sermsdos.py b/pyserial/serial/sermsdos.py deleted file mode 100644 index c3560d0..0000000 --- a/pyserial/serial/sermsdos.py +++ /dev/null @@ -1,200 +0,0 @@ -# sermsdos.py -# -# History: -# -# 3rd September 2002 Dave Haynes -# 1. First defined -# -# Although this code should run under the latest versions of -# Python, on DOS-based platforms such as Windows 95 and 98, -# it has been specifically written to be compatible with -# PyDOS, available at: -# http://www.python.org/ftp/python/wpy/dos.html -# -# PyDOS is a stripped-down version of Python 1.5.2 for -# DOS machines. Therefore, in making changes to this file, -# please respect Python 1.5.2 syntax. In addition, please -# limit the width of this file to 60 characters. -# -# Note also that the modules in PyDOS contain fewer members -# than other versions, so we are restricted to using the -# following: -# -# In module os: -# ------------- -# environ, chdir, getcwd, getpid, umask, fdopen, close, -# dup, dup2, fstat, lseek, open, read, write, O_RDONLY, -# O_WRONLY, O_RDWR, O_APPEND, O_CREAT, O_EXCL, O_TRUNC, -# access, F_OK, R_OK, W_OK, X_OK, chmod, listdir, mkdir, -# remove, rename, renames, rmdir, stat, unlink, utime, -# execl, execle, execlp, execlpe, execvp, execvpe, _exit, -# system. -# -# In module os.path: -# ------------------ -# curdir, pardir, sep, altsep, pathsep, defpath, linesep. -# - -import os -import sys -import string -import serialutil - -BAUD_RATES = { - 110: "11", - 150: "15", - 300: "30", - 600: "60", - 1200: "12", - 2400: "24", - 4800: "48", - 9600: "96", - 19200: "19"} - -(PARITY_NONE, PARITY_EVEN, PARITY_ODD, PARITY_MARK, -PARITY_SPACE) = (0, 1, 2, 3, 4) -(STOPBITS_ONE, STOPBITS_ONEANDAHALF, -STOPBITS_TWO) = (1, 1.5, 2) -FIVEBITS, SIXBITS, SEVENBITS, EIGHTBITS = (5, 6, 7, 8) -(RETURN_ERROR, RETURN_BUSY, RETURN_RETRY, RETURN_READY, -RETURN_NONE) = ('E', 'B', 'P', 'R', 'N') -portNotOpenError = ValueError('port not open') - -def device(portnum): - return 'COM%d' % (portnum+1) - -class Serial(serialutil.FileLike): - """ - port: number of device; numbering starts at - zero. if everything fails, the user can - specify a device string, note that this - isn't portable any more - baudrate: baud rate - bytesize: number of databits - parity: enable parity checking - stopbits: number of stopbits - timeout: set a timeout (None for waiting forever) - xonxoff: enable software flow control - rtscts: enable RTS/CTS flow control - retry: DOS retry mode - """ - def __init__(self, - port, - baudrate = 9600, - bytesize = EIGHTBITS, - parity = PARITY_NONE, - stopbits = STOPBITS_ONE, - timeout = None, - xonxoff = 0, - rtscts = 0, - retry = RETURN_RETRY - ): - - if type(port) == type(''): - # strings are taken directly - self.portstr = port - else: - # numbers are transformed to a string - self.portstr = device(port+1) - - self.baud = BAUD_RATES[baudrate] - self.bytesize = str(bytesize) - - if parity == PARITY_NONE: - self.parity = 'N' - elif parity == PARITY_EVEN: - self.parity = 'E' - elif parity == PARITY_ODD: - self.parity = 'O' - elif parity == PARITY_MARK: - self.parity = 'M' - elif parity == PARITY_SPACE: - self.parity = 'S' - - self.stop = str(stopbits) - self.retry = retry - self.filename = "sermsdos.tmp" - - self._config(self.portstr, self.baud, self.parity, - self.bytesize, self.stop, self.retry, self.filename) - - def __del__(self): - self.close() - - def close(self): - pass - - def _config(self, port, baud, parity, data, stop, retry, - filename): - comString = string.join(("MODE ", port, ":" - , " BAUD= ", baud, " PARITY= ", parity - , " DATA= ", data, " STOP= ", stop, " RETRY= ", - retry, " > ", filename ), '') - os.system(comString) - - def setBaudrate(self, baudrate): - self._config(self.portstr, BAUD_RATES[baudrate], - self.parity, self.bytesize, self.stop, self.retry, - self.filename) - - def inWaiting(self): - """returns the number of bytes waiting to be read""" - raise NotImplementedError - - def read(self, num = 1): - """Read num bytes from serial port""" - handle = os.open(self.portstr, - os.O_RDONLY | os.O_BINARY) - rv = os.read(handle, num) - os.close(handle) - return rv - - def write(self, s): - """Write string to serial port""" - handle = os.open(self.portstr, - os.O_WRONLY | os.O_BINARY) - rv = os.write(handle, s) - os.close(handle) - return rv - - def flushInput(self): - raise NotImplementedError - - def flushOutput(self): - raise NotImplementedError - - def sendBreak(self): - raise NotImplementedError - - def setRTS(self,level=1): - """Set terminal status line""" - raise NotImplementedError - - def setDTR(self,level=1): - """Set terminal status line""" - raise NotImplementedError - - def getCTS(self): - """Eead terminal status line""" - raise NotImplementedError - - def getDSR(self): - """Eead terminal status line""" - raise NotImplementedError - - def getRI(self): - """Eead terminal status line""" - raise NotImplementedError - - def getCD(self): - """Eead terminal status line""" - raise NotImplementedError - - def __repr__(self): - return string.join(( "<Serial>: ", self.portstr - , self.baud, self.parity, self.bytesize, self.stop, - self.retry , self.filename), ' ') - -if __name__ == '__main__': - s = Serial(0) - sys.stdio.write('%s %s\n' % (__name__, s)) diff --git a/pyserial/serial/win32.py b/pyserial/serial/win32.py deleted file mode 100644 index 9b8c118..0000000 --- a/pyserial/serial/win32.py +++ /dev/null @@ -1,288 +0,0 @@ -from ctypes import * -from ctypes.wintypes import HANDLE -from ctypes.wintypes import BOOL -from ctypes.wintypes import LPCWSTR -_stdcall_libraries = {} -_stdcall_libraries['kernel32'] = WinDLL('kernel32') -from ctypes.wintypes import DWORD -from ctypes.wintypes import WORD -from ctypes.wintypes import BYTE - -INVALID_HANDLE_VALUE = HANDLE(-1).value - -class _SECURITY_ATTRIBUTES(Structure): - pass -LPSECURITY_ATTRIBUTES = POINTER(_SECURITY_ATTRIBUTES) - -CreateEventW = _stdcall_libraries['kernel32'].CreateEventW -CreateEventW.restype = HANDLE -CreateEventW.argtypes = [LPSECURITY_ATTRIBUTES, BOOL, BOOL, LPCWSTR] -CreateEvent = CreateEventW # alias - -CreateFileW = _stdcall_libraries['kernel32'].CreateFileW -CreateFileW.restype = HANDLE -CreateFileW.argtypes = [LPCWSTR, DWORD, DWORD, LPSECURITY_ATTRIBUTES, DWORD, DWORD, HANDLE] -CreateFile = CreateFileW # alias - -class _OVERLAPPED(Structure): - pass -OVERLAPPED = _OVERLAPPED - -class _COMSTAT(Structure): - pass -COMSTAT = _COMSTAT - -class _DCB(Structure): - pass -DCB = _DCB - -class _COMMTIMEOUTS(Structure): - pass -COMMTIMEOUTS = _COMMTIMEOUTS - -GetLastError = _stdcall_libraries['kernel32'].GetLastError -GetLastError.restype = DWORD -GetLastError.argtypes = [] - -LPOVERLAPPED = POINTER(_OVERLAPPED) -LPDWORD = POINTER(DWORD) - -GetOverlappedResult = _stdcall_libraries['kernel32'].GetOverlappedResult -GetOverlappedResult.restype = BOOL -GetOverlappedResult.argtypes = [HANDLE, LPOVERLAPPED, LPDWORD, BOOL] - -ResetEvent = _stdcall_libraries['kernel32'].ResetEvent -ResetEvent.restype = BOOL -ResetEvent.argtypes = [HANDLE] - -LPCVOID = c_void_p - -WriteFile = _stdcall_libraries['kernel32'].WriteFile -WriteFile.restype = BOOL -WriteFile.argtypes = [HANDLE, LPCVOID, DWORD, LPDWORD, LPOVERLAPPED] - -LPVOID = c_void_p - -ReadFile = _stdcall_libraries['kernel32'].ReadFile -ReadFile.restype = BOOL -ReadFile.argtypes = [HANDLE, LPVOID, DWORD, LPDWORD, LPOVERLAPPED] - -CloseHandle = _stdcall_libraries['kernel32'].CloseHandle -CloseHandle.restype = BOOL -CloseHandle.argtypes = [HANDLE] - -ClearCommBreak = _stdcall_libraries['kernel32'].ClearCommBreak -ClearCommBreak.restype = BOOL -ClearCommBreak.argtypes = [HANDLE] - -LPCOMSTAT = POINTER(_COMSTAT) - -ClearCommError = _stdcall_libraries['kernel32'].ClearCommError -ClearCommError.restype = BOOL -ClearCommError.argtypes = [HANDLE, LPDWORD, LPCOMSTAT] - -SetupComm = _stdcall_libraries['kernel32'].SetupComm -SetupComm.restype = BOOL -SetupComm.argtypes = [HANDLE, DWORD, DWORD] - -EscapeCommFunction = _stdcall_libraries['kernel32'].EscapeCommFunction -EscapeCommFunction.restype = BOOL -EscapeCommFunction.argtypes = [HANDLE, DWORD] - -GetCommModemStatus = _stdcall_libraries['kernel32'].GetCommModemStatus -GetCommModemStatus.restype = BOOL -GetCommModemStatus.argtypes = [HANDLE, LPDWORD] - -LPDCB = POINTER(_DCB) - -GetCommState = _stdcall_libraries['kernel32'].GetCommState -GetCommState.restype = BOOL -GetCommState.argtypes = [HANDLE, LPDCB] - -LPCOMMTIMEOUTS = POINTER(_COMMTIMEOUTS) - -GetCommTimeouts = _stdcall_libraries['kernel32'].GetCommTimeouts -GetCommTimeouts.restype = BOOL -GetCommTimeouts.argtypes = [HANDLE, LPCOMMTIMEOUTS] - -PurgeComm = _stdcall_libraries['kernel32'].PurgeComm -PurgeComm.restype = BOOL -PurgeComm.argtypes = [HANDLE, DWORD] - -SetCommBreak = _stdcall_libraries['kernel32'].SetCommBreak -SetCommBreak.restype = BOOL -SetCommBreak.argtypes = [HANDLE] - -SetCommMask = _stdcall_libraries['kernel32'].SetCommMask -SetCommMask.restype = BOOL -SetCommMask.argtypes = [HANDLE, DWORD] - -SetCommState = _stdcall_libraries['kernel32'].SetCommState -SetCommState.restype = BOOL -SetCommState.argtypes = [HANDLE, LPDCB] - -SetCommTimeouts = _stdcall_libraries['kernel32'].SetCommTimeouts -SetCommTimeouts.restype = BOOL -SetCommTimeouts.argtypes = [HANDLE, LPCOMMTIMEOUTS] - -WaitForSingleObject = _stdcall_libraries['kernel32'].WaitForSingleObject -WaitForSingleObject.restype = DWORD -WaitForSingleObject.argtypes = [HANDLE, DWORD] - -ONESTOPBIT = 0 # Variable c_int -TWOSTOPBITS = 2 # Variable c_int -ONE5STOPBITS = 1 - -NOPARITY = 0 # Variable c_int -ODDPARITY = 1 # Variable c_int -EVENPARITY = 2 # Variable c_int -MARKPARITY = 3 -SPACEPARITY = 4 - -RTS_CONTROL_HANDSHAKE = 2 # Variable c_int -RTS_CONTROL_DISABLE = 0 # Variable c_int -RTS_CONTROL_ENABLE = 1 # Variable c_int -SETRTS = 3 -CLRRTS = 4 - -DTR_CONTROL_HANDSHAKE = 2 # Variable c_int -DTR_CONTROL_DISABLE = 0 # Variable c_int -DTR_CONTROL_ENABLE = 1 # Variable c_int -SETDTR = 5 -CLRDTR = 6 - -MS_DSR_ON = 32 # Variable c_ulong -EV_RING = 256 # Variable c_int -EV_PERR = 512 # Variable c_int -EV_ERR = 128 # Variable c_int -SETXOFF = 1 # Variable c_int -EV_RXCHAR = 1 # Variable c_int -GENERIC_WRITE = 1073741824 # Variable c_long -PURGE_TXCLEAR = 4 # Variable c_int -FILE_FLAG_OVERLAPPED = 1073741824 # Variable c_int -EV_DSR = 16 # Variable c_int -MAXDWORD = 4294967295L # Variable c_uint -EV_RLSD = 32 # Variable c_int -ERROR_IO_PENDING = 997 # Variable c_long -MS_CTS_ON = 16 # Variable c_ulong -EV_EVENT1 = 2048 # Variable c_int -EV_RX80FULL = 1024 # Variable c_int -PURGE_RXABORT = 2 # Variable c_int -FILE_ATTRIBUTE_NORMAL = 128 # Variable c_int -PURGE_TXABORT = 1 # Variable c_int -SETXON = 2 # Variable c_int -OPEN_EXISTING = 3 # Variable c_int -MS_RING_ON = 64 # Variable c_ulong -EV_TXEMPTY = 4 # Variable c_int -EV_RXFLAG = 2 # Variable c_int -MS_RLSD_ON = 128 # Variable c_ulong -GENERIC_READ = 2147483648L # Variable c_ulong -EV_EVENT2 = 4096 # Variable c_int -EV_CTS = 8 # Variable c_int -EV_BREAK = 64 # Variable c_int -PURGE_RXCLEAR = 8 # Variable c_int -ULONG_PTR = c_ulong -INFINITE = 0xFFFFFFFFL - -class N11_OVERLAPPED4DOLLAR_48E(Union): - pass -class N11_OVERLAPPED4DOLLAR_484DOLLAR_49E(Structure): - pass -N11_OVERLAPPED4DOLLAR_484DOLLAR_49E._fields_ = [ - ('Offset', DWORD), - ('OffsetHigh', DWORD), -] - -PVOID = c_void_p - -N11_OVERLAPPED4DOLLAR_48E._anonymous_ = ['_0'] -N11_OVERLAPPED4DOLLAR_48E._fields_ = [ - ('_0', N11_OVERLAPPED4DOLLAR_484DOLLAR_49E), - ('Pointer', PVOID), -] -_OVERLAPPED._anonymous_ = ['_0'] -_OVERLAPPED._fields_ = [ - ('Internal', ULONG_PTR), - ('InternalHigh', ULONG_PTR), - ('_0', N11_OVERLAPPED4DOLLAR_48E), - ('hEvent', HANDLE), -] -_SECURITY_ATTRIBUTES._fields_ = [ - ('nLength', DWORD), - ('lpSecurityDescriptor', LPVOID), - ('bInheritHandle', BOOL), -] -_COMSTAT._fields_ = [ - ('fCtsHold', DWORD, 1), - ('fDsrHold', DWORD, 1), - ('fRlsdHold', DWORD, 1), - ('fXoffHold', DWORD, 1), - ('fXoffSent', DWORD, 1), - ('fEof', DWORD, 1), - ('fTxim', DWORD, 1), - ('fReserved', DWORD, 25), - ('cbInQue', DWORD), - ('cbOutQue', DWORD), -] -_DCB._fields_ = [ - ('DCBlength', DWORD), - ('BaudRate', DWORD), - ('fBinary', DWORD, 1), - ('fParity', DWORD, 1), - ('fOutxCtsFlow', DWORD, 1), - ('fOutxDsrFlow', DWORD, 1), - ('fDtrControl', DWORD, 2), - ('fDsrSensitivity', DWORD, 1), - ('fTXContinueOnXoff', DWORD, 1), - ('fOutX', DWORD, 1), - ('fInX', DWORD, 1), - ('fErrorChar', DWORD, 1), - ('fNull', DWORD, 1), - ('fRtsControl', DWORD, 2), - ('fAbortOnError', DWORD, 1), - ('fDummy2', DWORD, 17), - ('wReserved', WORD), - ('XonLim', WORD), - ('XoffLim', WORD), - ('ByteSize', BYTE), - ('Parity', BYTE), - ('StopBits', BYTE), - ('XonChar', c_char), - ('XoffChar', c_char), - ('ErrorChar', c_char), - ('EofChar', c_char), - ('EvtChar', c_char), - ('wReserved1', WORD), -] -_COMMTIMEOUTS._fields_ = [ - ('ReadIntervalTimeout', DWORD), - ('ReadTotalTimeoutMultiplier', DWORD), - ('ReadTotalTimeoutConstant', DWORD), - ('WriteTotalTimeoutMultiplier', DWORD), - ('WriteTotalTimeoutConstant', DWORD), -] -__all__ = ['GetLastError', 'MS_CTS_ON', 'FILE_ATTRIBUTE_NORMAL', - 'DTR_CONTROL_ENABLE', '_COMSTAT', 'MS_RLSD_ON', - 'GetOverlappedResult', 'SETXON', 'PURGE_TXABORT', - 'PurgeComm', 'N11_OVERLAPPED4DOLLAR_48E', 'EV_RING', - 'ONESTOPBIT', 'SETXOFF', 'PURGE_RXABORT', 'GetCommState', - 'RTS_CONTROL_ENABLE', '_DCB', 'CreateEvent', - '_COMMTIMEOUTS', '_SECURITY_ATTRIBUTES', 'EV_DSR', - 'EV_PERR', 'EV_RXFLAG', 'OPEN_EXISTING', 'DCB', - 'FILE_FLAG_OVERLAPPED', 'EV_CTS', 'SetupComm', - 'LPOVERLAPPED', 'EV_TXEMPTY', 'ClearCommBreak', - 'LPSECURITY_ATTRIBUTES', 'SetCommBreak', 'SetCommTimeouts', - 'COMMTIMEOUTS', 'ODDPARITY', 'EV_RLSD', - 'GetCommModemStatus', 'EV_EVENT2', 'PURGE_TXCLEAR', - 'EV_BREAK', 'EVENPARITY', 'LPCVOID', 'COMSTAT', 'ReadFile', - 'PVOID', '_OVERLAPPED', 'WriteFile', 'GetCommTimeouts', - 'ResetEvent', 'EV_RXCHAR', 'LPCOMSTAT', 'ClearCommError', - 'ERROR_IO_PENDING', 'EscapeCommFunction', 'GENERIC_READ', - 'RTS_CONTROL_HANDSHAKE', 'OVERLAPPED', - 'DTR_CONTROL_HANDSHAKE', 'PURGE_RXCLEAR', 'GENERIC_WRITE', - 'LPDCB', 'CreateEventW', 'SetCommMask', 'EV_EVENT1', - 'SetCommState', 'LPVOID', 'CreateFileW', 'LPDWORD', - 'EV_RX80FULL', 'TWOSTOPBITS', 'LPCOMMTIMEOUTS', 'MAXDWORD', - 'MS_DSR_ON', 'MS_RING_ON', - 'N11_OVERLAPPED4DOLLAR_484DOLLAR_49E', 'EV_ERR', - 'ULONG_PTR', 'CreateFile', 'NOPARITY', 'CloseHandle'] |