summaryrefslogtreecommitdiff
path: root/pyserial/serial
diff options
context:
space:
mode:
Diffstat (limited to 'pyserial/serial')
-rw-r--r--pyserial/serial/.cvsignore1
-rw-r--r--pyserial/serial/__init__.py26
-rw-r--r--pyserial/serial/serialcli.py271
-rw-r--r--pyserial/serial/serialjava.py260
-rw-r--r--pyserial/serial/serialposix.py612
-rw-r--r--pyserial/serial/serialutil.py478
-rw-r--r--pyserial/serial/serialwin32.py383
-rw-r--r--pyserial/serial/sermsdos.py200
-rw-r--r--pyserial/serial/win32.py288
9 files changed, 0 insertions, 2519 deletions
diff --git a/pyserial/serial/.cvsignore b/pyserial/serial/.cvsignore
deleted file mode 100644
index 7e99e36..0000000
--- a/pyserial/serial/.cvsignore
+++ /dev/null
@@ -1 +0,0 @@
-*.pyc \ No newline at end of file
diff --git a/pyserial/serial/__init__.py b/pyserial/serial/__init__.py
deleted file mode 100644
index 6e160e0..0000000
--- a/pyserial/serial/__init__.py
+++ /dev/null
@@ -1,26 +0,0 @@
-#!/usr/bin/env python
-
-# portable serial port access with python
-# this is a wrapper module for different platform implementations
-#
-# (C)2001-2002 Chris Liechti <cliechti@gmx.net>
-# this is distributed under a free software license, see license.txt
-
-VERSION = '2.4'
-
-import sys
-
-if sys.platform == 'cli':
- from serialcli import *
-else:
- import os
- # chose an implementation, depending on os
- if os.name == 'nt': #sys.platform == 'win32':
- from serialwin32 import *
- elif os.name == 'posix':
- from serialposix import *
- elif os.name == 'java':
- from serialjava import *
- else:
- raise Exception("Sorry: no implementation for your platform ('%s') available" % os.name)
-
diff --git a/pyserial/serial/serialcli.py b/pyserial/serial/serialcli.py
deleted file mode 100644
index d8eaf1e..0000000
--- a/pyserial/serial/serialcli.py
+++ /dev/null
@@ -1,271 +0,0 @@
-#! python
-# Python Serial Port Extension for Win32, Linux, BSD, Jython and .NET/Mono
-# serial driver for .NET/Mono (IronPython), .NET >= 2
-# see __init__.py
-#
-# (C) 2008 Chris Liechti <cliechti@gmx.net>
-# this is distributed under a free software license, see license.txt
-
-import clr
-import System
-import System.IO.Ports
-from serialutil import *
-
-
-def device(portnum):
- """Turn a port number into a device name"""
- return System.IO.Ports.SerialPort.GetPortNames()[portnum]
-
-
-# must invoke function with byte array, make a helper to convert strings
-# to byte arrays
-sab = System.Array[System.Byte]
-def as_byte_array(string):
- return sab([ord(x) for x in string]) # XXX will require adaption when run with a 3.x compatible IronPython
-
-class IronSerial(SerialBase):
- """Serial port implemenation for .NET/Mono."""
-
- BAUDRATES = (50, 75, 110, 134, 150, 200, 300, 600, 1200, 1800, 2400, 4800,
- 9600, 19200, 38400, 57600, 115200)
-
- def open(self):
- """Open port with current settings. This may throw a SerialException
- if the port cannot be opened."""
- if self._port is None:
- raise SerialException("Port must be configured before it can be used.")
- try:
- self._port_handle = System.IO.Ports.SerialPort(self.portstr)
- except Exception, msg:
- self._port_handle = None
- raise SerialException("could not open port %s: %s" % (self.portstr, msg))
-
- self._reconfigurePort()
- self._port_handle.Open()
- self._isOpen = True
- if not self._rtscts:
- self.setRTS(True)
- self.setDTR(True)
- self.flushInput()
- self.flushOutput()
-
- def _reconfigurePort(self):
- """Set communication parameters on opened port."""
- if not self._port_handle:
- raise SerialException("Can only operate on a valid port handle")
-
- #~ self._port_handle.ReceivedBytesThreshold = 1
-
- if self._timeout is None:
- self._port_handle.ReadTimeout = System.IO.Ports.SerialPort.InfiniteTimeout
- else:
- self._port_handle.ReadTimeout = int(self._timeout*1000)
-
- # if self._timeout != 0 and self._interCharTimeout is not None:
- # timeouts = (int(self._interCharTimeout * 1000),) + timeouts[1:]
-
- if self._writeTimeout is None:
- self._port_handle.WriteTimeout = System.IO.Ports.SerialPort.InfiniteTimeout
- else:
- self._port_handle.WriteTimeout = int(self._writeTimeout*1000)
-
-
- # Setup the connection info.
- try:
- self._port_handle.BaudRate = self._baudrate
- except IOError, e:
- # catch errors from illegal baudrate settings
- raise ValueError(str(e))
-
- if self._bytesize == FIVEBITS:
- self._port_handle.DataBits = 5
- elif self._bytesize == SIXBITS:
- self._port_handle.DataBits = 6
- elif self._bytesize == SEVENBITS:
- self._port_handle.DataBits = 7
- elif self._bytesize == EIGHTBITS:
- self._port_handle.DataBits = 8
- else:
- raise ValueError("Unsupported number of data bits: %r" % self._bytesize)
-
- if self._parity == PARITY_NONE:
- self._port_handle.Parity = getattr(System.IO.Ports.Parity, 'None') # reserved keyword in Py3k
- elif self._parity == PARITY_EVEN:
- self._port_handle.Parity = System.IO.Ports.Parity.Even
- elif self._parity == PARITY_ODD:
- self._port_handle.Parity = System.IO.Ports.Parity.Odd
- elif self._parity == PARITY_MARK:
- self._port_handle.Parity = System.IO.Ports.Parity.Mark
- elif self._parity == PARITY_SPACE:
- self._port_handle.Parity = System.IO.Ports.Parity.Space
- else:
- raise ValueError("Unsupported parity mode: %r" % self._parity)
-
- if self._stopbits == STOPBITS_ONE:
- self._port_handle.StopBits = System.IO.Ports.StopBits.One
- elif self._stopbits == STOPBITS_ONE_POINT_FIVE:
- self._port_handle.StopBits = System.IO.Ports.StopBits.OnePointFive
- elif self._stopbits == STOPBITS_TWO:
- self._port_handle.StopBits = System.IO.Ports.StopBits.Two
- else:
- raise ValueError("Unsupported number of stop bits: %r" % self._stopbits)
-
- if self._rtscts and self._xonxoff:
- self._port_handle.Handshake = System.IO.Ports.Handshake.RequestToSendXOnXOff
- elif self._rtscts:
- self._port_handle.Handshake = System.IO.Ports.Handshake.RequestToSend
- elif self._xonxoff:
- self._port_handle.Handshake = System.IO.Ports.Handshake.XOnXOff
- else:
- self._port_handle.Handshake = getattr(System.IO.Ports.Handshake, 'None') # reserved keyword in Py3k
-
- #~ def __del__(self):
- #~ self.close()
-
- def close(self):
- """Close port"""
- if self._isOpen:
- if self._port_handle:
- try:
- self._port_handle.Close()
- except System.IO.Ports.InvalidOperationException:
- # ignore errors. can happen for unplugged USB serial devices
- pass
- self._port_handle = None
- self._isOpen = False
-
- def makeDeviceName(self, port):
- try:
- return device(port)
- except TypeError, e:
- raise SerialException(str(e))
-
- # - - - - - - - - - - - - - - - - - - - - - - - -
-
- def inWaiting(self):
- """Return the number of characters currently in the input buffer."""
- if not self._port_handle: raise portNotOpenError
- return self._port_handle.BytesToRead
-
- def read(self, size=1):
- """Read size bytes from the serial port. If a timeout is set it may
- return less characters as requested. With no timeout it will block
- until the requested number of bytes is read."""
- if not self._port_handle: raise portNotOpenError
- # must use single byte reads as this is the only way to read
- # without applying encodings
- data = bytearray()
- while size:
- try:
- data.append(self._port_handle.ReadByte())
- except System.TimeoutException, e:
- break
- else:
- size -= 1
- return bytes(data)
-
- def write(self, data):
- """Output the given string over the serial port."""
- if not self._port_handle: raise portNotOpenError
- if not isinstance(data, (bytes, bytearray)):
- raise TypeError('expected %s or bytearray, got %s' % (bytes, type(data)))
- try:
- # must call overloaded method with byte array argument
- # as this is the only one not applying encodings
- self._port_handle.Write(as_byte_array(data), 0, len(data))
- except System.TimeoutException, e:
- raise writeTimeoutError
- return len(data)
-
- def flushInput(self):
- """Clear input buffer, discarding all that is in the buffer."""
- if not self._port_handle: raise portNotOpenError
- self._port_handle.DiscardInBuffer()
-
- def flushOutput(self):
- """Clear output buffer, aborting the current output and
- discarding all that is in the buffer."""
- if not self._port_handle: raise portNotOpenError
- self._port_handle.DiscardOutBuffer()
-
- def sendBreak(self, duration=0.25):
- """Send break condition. Timed, returns to idle state after given duration."""
- if not self._port_handle: raise portNotOpenError
- import time
- self._port_handle.BreakState = True
- time.sleep(duration)
- self._port_handle.BreakState = False
-
- def setBreak(self, level=True):
- """Set break: Controls TXD. When active, to transmitting is possible."""
- if not self._port_handle: raise portNotOpenError
- self._port_handle.BreakState = bool(level)
-
- def setRTS(self, level=True):
- """Set terminal status line: Request To Send"""
- if not self._port_handle: raise portNotOpenError
- self._port_handle.RtsEnable = bool(level)
-
- def setDTR(self, level=True):
- """Set terminal status line: Data Terminal Ready"""
- if not self._port_handle: raise portNotOpenError
- self._port_handle.DtrEnable = bool(level)
-
- def getCTS(self):
- """Read terminal status line: Clear To Send"""
- if not self._port_handle: raise portNotOpenError
- return self._port_handle.CtsHolding
-
- def getDSR(self):
- """Read terminal status line: Data Set Ready"""
- if not self._port_handle: raise portNotOpenError
- return self._port_handle.DsrHolding
-
- def getRI(self):
- """Read terminal status line: Ring Indicator"""
- if not self._port_handle: raise portNotOpenError
- #~ return self._port_handle.XXX
- return False #XXX an error would be better
-
- def getCD(self):
- """Read terminal status line: Carrier Detect"""
- if not self._port_handle: raise portNotOpenError
- return self._port_handle.CDHolding
-
- # - - platform specific - - - -
- # none
-
-
-# assemble Serial class with the platform specific implementation and the base
-# for file-like behavior. for Python 2.6 and newer, that provide the new I/O
-# library, derive from io.RawIOBase
-try:
- import io
-except ImportError:
- # classic version with our own file-like emulation
- class Serial(IronSerial, FileLike):
- pass
-else:
- # io library present
- class Serial(IronSerial, io.RawIOBase):
- pass
-
-
-# Nur Testfunktion!!
-if __name__ == '__main__':
- import sys
-
- s = Serial(0)
- sys.stdio.write('%s\n' % s)
-
- s = Serial()
- sys.stdio.write('%s\n' % s)
-
-
- s.baudrate = 19200
- s.databits = 7
- s.close()
- s.port = 0
- s.open()
- sys.stdio.write('%s\n' % s)
-
diff --git a/pyserial/serial/serialjava.py b/pyserial/serial/serialjava.py
deleted file mode 100644
index 2541534..0000000
--- a/pyserial/serial/serialjava.py
+++ /dev/null
@@ -1,260 +0,0 @@
-#!jython
-#
-# Python Serial Port Extension for Win32, Linux, BSD, Jython
-# module for serial IO for Jython and JavaComm
-# see __init__.py
-#
-# (C) 2002-2008 Chris Liechti <cliechti@gmx.net>
-# this is distributed under a free software license, see license.txt
-
-from serialutil import *
-
-def my_import(name):
- mod = __import__(name)
- components = name.split('.')
- for comp in components[1:]:
- mod = getattr(mod, comp)
- return mod
-
-
-def detect_java_comm(names):
- """try given list of modules and return that imports"""
- for name in names:
- try:
- mod = my_import(name)
- mod.SerialPort
- return mod
- except (ImportError, AttributeError):
- pass
- raise ImportError("No Java Communications API implementation found")
-
-
-# Java Communications API implementations
-# http://mho.republika.pl/java/comm/
-
-comm = detect_java_comm([
- 'javax.comm', # Sun/IBM
- 'gnu.io', # RXTX
-])
-
-
-def device(portnumber):
- """Turn a port number into a device name"""
- enum = comm.CommPortIdentifier.getPortIdentifiers()
- ports = []
- while enum.hasMoreElements():
- el = enum.nextElement()
- if el.getPortType() == comm.CommPortIdentifier.PORT_SERIAL:
- ports.append(el)
- return ports[portnumber].getName()
-
-
-class JavaSerial(SerialBase):
- """Serial port class, implemented with Java Communications API and
- thus usable with jython and the appropriate java extension."""
-
- def open(self):
- """Open port with current settings. This may throw a SerialException
- if the port cannot be opened."""
- if self._port is None:
- raise SerialException("Port must be configured before it can be used.")
- if type(self._port) == type(''): # strings are taken directly
- portId = comm.CommPortIdentifier.getPortIdentifier(self._port)
- else:
- portId = comm.CommPortIdentifier.getPortIdentifier(device(self._port)) # numbers are transformed to a comport id obj
- try:
- self.sPort = portId.open("python serial module", 10)
- except Exception, msg:
- self.sPort = None
- raise SerialException("Could not open port: %s" % msg)
- self._reconfigurePort()
- self._instream = self.sPort.getInputStream()
- self._outstream = self.sPort.getOutputStream()
- self._isOpen = True
-
- def _reconfigurePort(self):
- """Set communication parameters on opened port."""
- if not self.sPort:
- raise SerialException("Can only operate on a valid port handle")
-
- self.sPort.enableReceiveTimeout(30)
- if self._bytesize == FIVEBITS:
- jdatabits = comm.SerialPort.DATABITS_5
- elif self._bytesize == SIXBITS:
- jdatabits = comm.SerialPort.DATABITS_6
- elif self._bytesize == SEVENBITS:
- jdatabits = comm.SerialPort.DATABITS_7
- elif self._bytesize == EIGHTBITS:
- jdatabits = comm.SerialPort.DATABITS_8
- else:
- raise ValueError("unsupported bytesize: %r" % self._bytesize)
-
- if self._stopbits == STOPBITS_ONE:
- jstopbits = comm.SerialPort.STOPBITS_1
- elif stopbits == STOPBITS_ONE_POINT_FIVE:
- self._jstopbits = comm.SerialPort.STOPBITS_1_5
- elif self._stopbits == STOPBITS_TWO:
- jstopbits = comm.SerialPort.STOPBITS_2
- else:
- raise ValueError("unsupported number of stopbits: %r" % self._stopbits)
-
- if self._parity == PARITY_NONE:
- jparity = comm.SerialPort.PARITY_NONE
- elif self._parity == PARITY_EVEN:
- jparity = comm.SerialPort.PARITY_EVEN
- elif self._parity == PARITY_ODD:
- jparity = comm.SerialPort.PARITY_ODD
- elif self._parity == PARITY_MARK:
- jparity = comm.SerialPort.PARITY_MARK
- elif self._parity == PARITY_SPACE:
- jparity = comm.SerialPort.PARITY_SPACE
- else:
- raise ValueError("unsupported parity type: %r" % self._parity)
-
- jflowin = jflowout = 0
- if self._rtscts:
- jflowin |= comm.SerialPort.FLOWCONTROL_RTSCTS_IN
- jflowout |= comm.SerialPort.FLOWCONTROL_RTSCTS_OUT
- if self._xonxoff:
- jflowin |= comm.SerialPort.FLOWCONTROL_XONXOFF_IN
- jflowout |= comm.SerialPort.FLOWCONTROL_XONXOFF_OUT
-
- self.sPort.setSerialPortParams(self._baudrate, jdatabits, jstopbits, jparity)
- self.sPort.setFlowControlMode(jflowin | jflowout)
-
- if self._timeout >= 0:
- self.sPort.enableReceiveTimeout(self._timeout*1000)
- else:
- self.sPort.disableReceiveTimeout()
-
- def close(self):
- """Close port"""
- if self._isOpen:
- if self.sPort:
- self._instream.close()
- self._outstream.close()
- self.sPort.close()
- self.sPort = None
- self._isOpen = False
-
- def makeDeviceName(self, port):
- return device(port)
-
- # - - - - - - - - - - - - - - - - - - - - - - - -
-
- def inWaiting(self):
- """Return the number of characters currently in the input buffer."""
- if not self.sPort: raise portNotOpenError
- return self._instream.available()
-
- def read(self, size=1):
- """Read size bytes from the serial port. If a timeout is set it may
- return less characters as requested. With no timeout it will block
- until the requested number of bytes is read."""
- if not self.sPort: raise portNotOpenError
- read = bytearray()
- if size > 0:
- while len(read) < size:
- x = self._instream.read()
- if x == -1:
- if self.timeout >= 0:
- break
- else:
- read.append(x)
- return bytes(read)
-
- def write(self, data):
- """Output the given string over the serial port."""
- if not self.sPort: raise portNotOpenError
- if not isinstance(data, (bytes, bytearray)):
- raise TypeError('expected %s or bytearray, got %s' % (bytes, type(data)))
- self._outstream.write(data)
- return len(data)
-
- def flushInput(self):
- """Clear input buffer, discarding all that is in the buffer."""
- if not self.sPort: raise portNotOpenError
- self._instream.skip(self._instream.available())
-
- def flushOutput(self):
- """Clear output buffer, aborting the current output and
- discarding all that is in the buffer."""
- if not self.sPort: raise portNotOpenError
- self._outstream.flush()
-
- def sendBreak(self, duration=0.25):
- """Send break condition. Timed, returns to idle state after given duration."""
- if not self.sPort: raise portNotOpenError
- self.sPort.sendBreak(duration*1000.0)
-
- def setBreak(self, level=1):
- """Set break: Controls TXD. When active, to transmitting is possible."""
- if self.fd is None: raise portNotOpenError
- raise SerialException("The setBreak function is not implemented in java.")
-
- def setRTS(self, level=1):
- """Set terminal status line: Request To Send"""
- if not self.sPort: raise portNotOpenError
- self.sPort.setRTS(level)
-
- def setDTR(self, level=1):
- """Set terminal status line: Data Terminal Ready"""
- if not self.sPort: raise portNotOpenError
- self.sPort.setDTR(level)
-
- def getCTS(self):
- """Read terminal status line: Clear To Send"""
- if not self.sPort: raise portNotOpenError
- self.sPort.isCTS()
-
- def getDSR(self):
- """Read terminal status line: Data Set Ready"""
- if not self.sPort: raise portNotOpenError
- self.sPort.isDSR()
-
- def getRI(self):
- """Read terminal status line: Ring Indicator"""
- if not self.sPort: raise portNotOpenError
- self.sPort.isRI()
-
- def getCD(self):
- """Read terminal status line: Carrier Detect"""
- if not self.sPort: raise portNotOpenError
- self.sPort.isCD()
-
-
-# assemble Serial class with the platform specific implementation and the base
-# for file-like behavior. for Python 2.6 and newer, that provide the new I/O
-# library, derive from io.RawIOBase
-try:
- import io
-except ImportError:
- # classic version with our own file-like emulation
- class Serial(JavaSerial, FileLike):
- pass
-else:
- # io library present
- class Serial(JavaSerial, io.RawIOBase):
- pass
-
-
-if __name__ == '__main__':
- s = Serial(0,
- baudrate=19200, # baudrate
- bytesize=EIGHTBITS, # number of databits
- parity=PARITY_EVEN, # enable parity checking
- stopbits=STOPBITS_ONE, # number of stopbits
- timeout=3, # set a timeout value, None for waiting forever
- xonxoff=0, # enable software flow control
- rtscts=0, # enable RTS/CTS flow control
- )
- s.setRTS(1)
- s.setDTR(1)
- s.flushInput()
- s.flushOutput()
- s.write('hello')
- sys.stdio.write('%r\n' % s.read(5))
- sys.stdio.write('%s\n' % s.inWaiting())
- del s
-
-
diff --git a/pyserial/serial/serialposix.py b/pyserial/serial/serialposix.py
deleted file mode 100644
index bbf7fb9..0000000
--- a/pyserial/serial/serialposix.py
+++ /dev/null
@@ -1,612 +0,0 @@
-#!/usr/bin/env python
-#
-# Python Serial Port Extension for Win32, Linux, BSD, Jython
-# module for serial IO for POSIX compatible systems, like Linux
-# see __init__.py
-#
-# (C) 2001-2009 Chris Liechti <cliechti@gmx.net>
-# this is distributed under a free software license, see license.txt
-#
-# parts based on code from Grant B. Edwards <grante@visi.com>:
-# ftp://ftp.visi.com/users/grante/python/PosixSerial.py
-#
-# references: http://www.easysw.com/~mike/serial/serial.html
-
-import sys, os, fcntl, termios, struct, select, errno
-from serialutil import *
-
-# Do check the Python version as some constants have moved.
-if (sys.hexversion < 0x020100f0):
- import TERMIOS
-else:
- TERMIOS = termios
-
-if (sys.hexversion < 0x020200f0):
- import FCNTL
-else:
- FCNTL = fcntl
-
-baudrate_constants = {
- 0: 0000000, # hang up
- 50: 0000001,
- 75: 0000002,
- 110: 0000003,
- 134: 0000004,
- 150: 0000005,
- 200: 0000006,
- 300: 0000007,
- 600: 0000010,
- 1200: 0000011,
- 1800: 0000012,
- 2400: 0000013,
- 4800: 0000014,
- 9600: 0000015,
- 19200: 0000016,
- 38400: 0000017,
- 57600: 0010001,
- 115200: 0010002,
- 230400: 0010003,
- 460800: 0010004,
- 500000: 0010005,
- 576000: 0010006,
- 921600: 0010007,
- 1000000: 0010010,
- 1152000: 0010011,
- 1500000: 0010012,
- 2000000: 0010013,
- 2500000: 0010014,
- 3000000: 0010015,
- 3500000: 0010016,
- 4000000: 0010017
-}
-
-# try to detect the OS so that a device can be selected...
-# this code block should supply a device() and set_special_baudrate() function
-# for the platform
-plat = sys.platform.lower()
-
-if plat[:5] == 'linux': # Linux (confirmed)
-
- def device(port):
- return '/dev/ttyS%d' % port
-
- ASYNC_SPD_MASK = 0x1030
- ASYNC_SPD_CUST = 0x0030
-
- def set_special_baudrate(port, baudrate):
- import array
- buf = array.array('i', [0] * 32)
-
- # get serial_struct
- FCNTL.ioctl(port.fd, TERMIOS.TIOCGSERIAL, buf)
-
- # set custom divisor
- buf[6] = buf[7] / baudrate
-
- # update flags
- buf[4] &= ~ASYNC_SPD_MASK
- buf[4] |= ASYNC_SPD_CUST
-
- # set serial_struct
- try:
- res = FCNTL.ioctl(port.fd, TERMIOS.TIOCSSERIAL, buf)
- except IOError:
- raise ValueError('Failed to set custom baud rate: %r' % baudrate)
-
-elif plat == 'cygwin': # cygwin/win32 (confirmed)
-
- def device(port):
- return '/dev/com%d' % (port + 1)
-
- ASYNC_SPD_MASK = 0x1030
- ASYNC_SPD_CUST = 0x0030
-
- # XXX untested!
- def set_special_baudrate(port, baudrate):
- import array
- buf = array.array('i', [0] * 32)
-
- # get serial_struct
- FCNTL.ioctl(port.fd, TERMIOS.TIOCGSERIAL, buf)
-
- # set custom divisor
- buf[6] = buf[7] / baudrate
-
- # update flags
- buf[4] &= ~ASYNC_SPD_MASK
- buf[4] |= ASYNC_SPD_CUST
-
- # set serial_struct
- try:
- res = FCNTL.ioctl(port.fd, TERMIOS.TIOCSSERIAL, buf)
- except IOError:
- raise ValueError('Failed to set custom baud rate: %r' % baudrate)
-
-elif plat == 'openbsd3': # BSD (confirmed)
-
- def device(port):
- return '/dev/ttyp%d' % port
-
- def set_special_baudrate(port, baudrate):
- raise ValueError("sorry don't know how to handle non standard baud rate on this platform")
-
-elif plat[:3] == 'bsd' or \
- plat[:7] == 'freebsd' or \
- plat[:7] == 'openbsd': # BSD (confirmed for freebsd4: cuaa%d)
-
- def device(port):
- return '/dev/cuad%d' % port
-
- def set_special_baudrate(port, baudrate):
- raise ValueError("sorry don't know how to handle non standard baud rate on this platform")
-
-elif plat[:6] == 'darwin': # OS X
-
- version = os.uname()[2].split('.')
- # Tiger or above can support arbitrary serial speeds
- if int(version[0]) >= 8:
- # remove all speeds not supported with TERMIOS so that pyserial never
- # attempts to use them directly
- for b in baudrate_constants.keys():
- if b > 230400:
- del baudrate_constants[b]
-
- def set_special_baudrate(port, baudrate):
- # use IOKit-specific call to set up high speeds
- import array, fcntl
- buf = array.array('i', [baudrate])
- IOSSIOSPEED = 0x80045402 #_IOW('T', 2, speed_t)
- fcntl.ioctl(port.fd, IOSSIOSPEED, buf, 1)
- else: # version < 8
- def set_special_baudrate(port, baudrate):
- raise ValueError("baud rate not supported")
-
- def device(port):
- return '/dev/cuad%d' % port
-
-
-elif plat[:6] == 'netbsd': # NetBSD 1.6 testing by Erk
-
- def device(port):
- return '/dev/dty%02d' % port
-
- def set_special_baudrate(port, baudrate):
- raise ValueError("sorry don't know how to handle non standard baud rate on this platform")
-
-elif plat[:4] == 'irix': # IRIX (partially tested)
-
- def device(port):
- return '/dev/ttyf%d' % (port+1) #XXX different device names depending on flow control
-
- def set_special_baudrate(port, baudrate):
- raise ValueError("sorry don't know how to handle non standard baud rate on this platform")
-
-elif plat[:2] == 'hp': # HP-UX (not tested)
-
- def device(port):
- return '/dev/tty%dp0' % (port+1)
-
- def set_special_baudrate(port, baudrate):
- raise ValueError("sorry don't know how to handle non standard baud rate on this platform")
-
-elif plat[:5] == 'sunos': # Solaris/SunOS (confirmed)
-
- def device(port):
- return '/dev/tty%c' % (ord('a')+port)
-
- def set_special_baudrate(port, baudrate):
- raise ValueError("sorry don't know how to handle non standard baud rate on this platform")
-
-elif plat[:3] == 'aix': # AIX
-
- def device(port):
- return '/dev/tty%d' % (port)
-
- def set_special_baudrate(port, baudrate):
- raise ValueError("sorry don't know how to handle non standard baud rate on this platform")
-
-else:
- #platform detection has failed...
- sys.stderr.write("""\
-don't know how to number ttys on this system.
-! Use an explicit path (eg /dev/ttyS1) or send this information to
-! the author of this module:
-
-sys.platform = %r
-os.name = %r
-serialposix.py version = %s
-
-also add the device name of the serial port and where the
-counting starts for the first serial port.
-e.g. 'first serial port: /dev/ttyS0'
-and with a bit luck you can get this module running...
-""" % (sys.platform, os.name, VERSION))
- # no exception, just continue with a brave attempt to build a device name
- # even if the device name is not correct for the platform it has chances
- # to work using a string with the real device name as port parameter.
- def device(portum):
- return '/dev/ttyS%d' % portnum
- def set_special_baudrate(port, baudrate):
- raise SerialException("sorry don't know how to handle non standard baud rate on this platform")
- #~ raise Exception, "this module does not run on this platform, sorry."
-
-# whats up with "aix", "beos", ....
-# they should work, just need to know the device names.
-
-
-# load some constants for later use.
-# try to use values from TERMIOS, use defaults from linux otherwise
-TIOCMGET = hasattr(TERMIOS, 'TIOCMGET') and TERMIOS.TIOCMGET or 0x5415
-TIOCMBIS = hasattr(TERMIOS, 'TIOCMBIS') and TERMIOS.TIOCMBIS or 0x5416
-TIOCMBIC = hasattr(TERMIOS, 'TIOCMBIC') and TERMIOS.TIOCMBIC or 0x5417
-TIOCMSET = hasattr(TERMIOS, 'TIOCMSET') and TERMIOS.TIOCMSET or 0x5418
-
-#TIOCM_LE = hasattr(TERMIOS, 'TIOCM_LE') and TERMIOS.TIOCM_LE or 0x001
-TIOCM_DTR = hasattr(TERMIOS, 'TIOCM_DTR') and TERMIOS.TIOCM_DTR or 0x002
-TIOCM_RTS = hasattr(TERMIOS, 'TIOCM_RTS') and TERMIOS.TIOCM_RTS or 0x004
-#TIOCM_ST = hasattr(TERMIOS, 'TIOCM_ST') and TERMIOS.TIOCM_ST or 0x008
-#TIOCM_SR = hasattr(TERMIOS, 'TIOCM_SR') and TERMIOS.TIOCM_SR or 0x010
-
-TIOCM_CTS = hasattr(TERMIOS, 'TIOCM_CTS') and TERMIOS.TIOCM_CTS or 0x020
-TIOCM_CAR = hasattr(TERMIOS, 'TIOCM_CAR') and TERMIOS.TIOCM_CAR or 0x040
-TIOCM_RNG = hasattr(TERMIOS, 'TIOCM_RNG') and TERMIOS.TIOCM_RNG or 0x080
-TIOCM_DSR = hasattr(TERMIOS, 'TIOCM_DSR') and TERMIOS.TIOCM_DSR or 0x100
-TIOCM_CD = hasattr(TERMIOS, 'TIOCM_CD') and TERMIOS.TIOCM_CD or TIOCM_CAR
-TIOCM_RI = hasattr(TERMIOS, 'TIOCM_RI') and TERMIOS.TIOCM_RI or TIOCM_RNG
-#TIOCM_OUT1 = hasattr(TERMIOS, 'TIOCM_OUT1') and TERMIOS.TIOCM_OUT1 or 0x2000
-#TIOCM_OUT2 = hasattr(TERMIOS, 'TIOCM_OUT2') and TERMIOS.TIOCM_OUT2 or 0x4000
-TIOCINQ = hasattr(TERMIOS, 'FIONREAD') and TERMIOS.FIONREAD or 0x541B
-
-TIOCM_zero_str = struct.pack('I', 0)
-TIOCM_RTS_str = struct.pack('I', TIOCM_RTS)
-TIOCM_DTR_str = struct.pack('I', TIOCM_DTR)
-
-TIOCSBRK = hasattr(TERMIOS, 'TIOCSBRK') and TERMIOS.TIOCSBRK or 0x5427
-TIOCCBRK = hasattr(TERMIOS, 'TIOCCBRK') and TERMIOS.TIOCCBRK or 0x5428
-
-
-class PosixSerial(SerialBase):
- """Serial port class POSIX implementation. Serial port configuration is
- done with termios and fcntl. Runs on Linux and many other Un*x like
- systems."""
-
- def open(self):
- """Open port with current settings. This may throw a SerialException
- if the port cannot be opened."""
- self.fd = None
- if self._port is None:
- raise SerialException("Port must be configured before it can be used.")
- # open
- try:
- self.fd = os.open(self.portstr, os.O_RDWR|os.O_NOCTTY|os.O_NONBLOCK)
- except Exception, msg:
- self.fd = None
- raise SerialException("could not open port %s: %s" % (self._port, msg))
- #~ fcntl.fcntl(self.fd, FCNTL.F_SETFL, 0) # set blocking
-
- try:
- self._reconfigurePort()
- except:
- try:
- os.close(self.fd)
- except:
- # ignore any exception when closing the port
- # also to keep original exception that happened when setting up
- pass
- self.fd = None
- raise
- else:
- self._isOpen = True
- #~ self.flushInput()
-
-
- def _reconfigurePort(self):
- """Set communication parameters on opened port."""
- if self.fd is None:
- raise SerialException("Can only operate on a valid port handle")
- custom_baud = None
-
- vmin = vtime = 0 # timeout is done via select
- if self._interCharTimeout is not None:
- vmin = 1
- vtime = int(self._interCharTimeout * 10)
- try:
- iflag, oflag, cflag, lflag, ispeed, ospeed, cc = termios.tcgetattr(self.fd)
- except termios.error, msg: # if a port is nonexistent but has a /dev file, it'll fail here
- raise SerialException("Could not configure port: %s" % msg)
- # set up raw mode / no echo / binary
- cflag |= (TERMIOS.CLOCAL|TERMIOS.CREAD)
- lflag &= ~(TERMIOS.ICANON|TERMIOS.ECHO|TERMIOS.ECHOE|TERMIOS.ECHOK|TERMIOS.ECHONL|
- TERMIOS.ISIG|TERMIOS.IEXTEN) #|TERMIOS.ECHOPRT
- for flag in ('ECHOCTL', 'ECHOKE'): # netbsd workaround for Erk
- if hasattr(TERMIOS, flag):
- lflag &= ~getattr(TERMIOS, flag)
-
- oflag &= ~(TERMIOS.OPOST)
- iflag &= ~(TERMIOS.INLCR|TERMIOS.IGNCR|TERMIOS.ICRNL|TERMIOS.IGNBRK)
- if hasattr(TERMIOS, 'IUCLC'):
- iflag &= ~TERMIOS.IUCLC
- if hasattr(TERMIOS, 'PARMRK'):
- iflag &= ~TERMIOS.PARMRK
-
- # setup baud rate
- try:
- ispeed = ospeed = getattr(TERMIOS, 'B%s' % (self._baudrate))
- except AttributeError:
- try:
- ispeed = ospeed = baudrate_constants[self._baudrate]
- except KeyError:
- #~ raise ValueError('Invalid baud rate: %r' % self._baudrate)
- # may need custom baud rate, it isn't in our list.
- ispeed = ospeed = getattr(TERMIOS, 'B38400')
- try:
- custom_baud = int(self._baudrate) # store for later
- except ValueError:
- raise ValueError('Invalid baud rate: %r' % self._baudrate)
- else:
- if custom_baud < 0:
- raise ValueError('Invalid baud rate: %r' % self._baudrate)
-
- # setup char len
- cflag &= ~TERMIOS.CSIZE
- if self._bytesize == 8:
- cflag |= TERMIOS.CS8
- elif self._bytesize == 7:
- cflag |= TERMIOS.CS7
- elif self._bytesize == 6:
- cflag |= TERMIOS.CS6
- elif self._bytesize == 5:
- cflag |= TERMIOS.CS5
- else:
- raise ValueError('Invalid char len: %r' % self._bytesize)
- # setup stopbits
- if self._stopbits == STOPBITS_ONE:
- cflag &= ~(TERMIOS.CSTOPB)
- elif self._stopbits == STOPBITS_ONE_POINT_FIVE:
- cflag |= (TERMIOS.CSTOPB) # XXX same as TWO.. there is no POSIX support for 1.5
- elif self._stopbits == STOPBITS_TWO:
- cflag |= (TERMIOS.CSTOPB)
- else:
- raise ValueError('Invalid stop bit specification: %r' % self._stopbits)
- # setup parity
- iflag &= ~(TERMIOS.INPCK|TERMIOS.ISTRIP)
- if self._parity == PARITY_NONE:
- cflag &= ~(TERMIOS.PARENB|TERMIOS.PARODD)
- elif self._parity == PARITY_EVEN:
- cflag &= ~(TERMIOS.PARODD)
- cflag |= (TERMIOS.PARENB)
- elif self._parity == PARITY_ODD:
- cflag |= (TERMIOS.PARENB|TERMIOS.PARODD)
- else:
- raise ValueError('Invalid parity: %r' % self._parity)
- # setup flow control
- # xonxoff
- if hasattr(TERMIOS, 'IXANY'):
- if self._xonxoff:
- iflag |= (TERMIOS.IXON|TERMIOS.IXOFF) #|TERMIOS.IXANY)
- else:
- iflag &= ~(TERMIOS.IXON|TERMIOS.IXOFF|TERMIOS.IXANY)
- else:
- if self._xonxoff:
- iflag |= (TERMIOS.IXON|TERMIOS.IXOFF)
- else:
- iflag &= ~(TERMIOS.IXON|TERMIOS.IXOFF)
- # rtscts
- if hasattr(TERMIOS, 'CRTSCTS'):
- if self._rtscts:
- cflag |= (TERMIOS.CRTSCTS)
- else:
- cflag &= ~(TERMIOS.CRTSCTS)
- elif hasattr(TERMIOS, 'CNEW_RTSCTS'): # try it with alternate constant name
- if self._rtscts:
- cflag |= (TERMIOS.CNEW_RTSCTS)
- else:
- cflag &= ~(TERMIOS.CNEW_RTSCTS)
- # XXX should there be a warning if setting up rtscts (and xonxoff etc) fails??
-
- # buffer
- # vmin "minimal number of characters to be read. = for non blocking"
- if vmin < 0 or vmin > 255:
- raise ValueError('Invalid vmin: %r ' % vmin)
- cc[TERMIOS.VMIN] = vmin
- # vtime
- if vtime < 0 or vtime > 255:
- raise ValueError('Invalid vtime: %r' % vtime)
- cc[TERMIOS.VTIME] = vtime
- # activate settings
- termios.tcsetattr(self.fd, TERMIOS.TCSANOW, [iflag, oflag, cflag, lflag, ispeed, ospeed, cc])
-
- # apply custom baud rate, if any
- if custom_baud is not None:
- set_special_baudrate(self, custom_baud)
-
- def close(self):
- """Close port"""
- if self._isOpen:
- if self.fd is not None:
- os.close(self.fd)
- self.fd = None
- self._isOpen = False
-
- def makeDeviceName(self, port):
- return device(port)
-
- # - - - - - - - - - - - - - - - - - - - - - - - -
-
- def inWaiting(self):
- """Return the number of characters currently in the input buffer."""
- #~ s = fcntl.ioctl(self.fd, TERMIOS.FIONREAD, TIOCM_zero_str)
- s = fcntl.ioctl(self.fd, TIOCINQ, TIOCM_zero_str)
- return struct.unpack('I',s)[0]
-
- def read(self, size=1):
- """Read size bytes from the serial port. If a timeout is set it may
- return less characters as requested. With no timeout it will block
- until the requested number of bytes is read."""
- if self.fd is None: raise portNotOpenError
- read = bytearray()
- inp = None
- if size > 0:
- while len(read) < size:
- # print "\tread(): size",size, "have", len(read) #debug
- ready,_,_ = select.select([self.fd], [], [], self._timeout)
- if not ready:
- break # timeout
- buf = os.read(self.fd, size - len(read))
- read.extend(buf)
- if ((self._timeout is not None and self._timeout >= 0) or
- (self._interCharTimeout is not None and self._interCharTimeout > 0)) and not buf:
- break # early abort on timeout
- return bytes(read)
-
- def write(self, data):
- """Output the given string over the serial port."""
- if self.fd is None: raise portNotOpenError
- if not isinstance(data, (bytes, bytearray)):
- raise TypeError('expected %s or bytearray, got %s' % (bytes, type(data)))
- t = len(data)
- d = data
- while t > 0:
- try:
- if self._writeTimeout is not None and self._writeTimeout > 0:
- _, ready, _ = select.select([], [self.fd], [], self._writeTimeout)
- if not ready:
- raise writeTimeoutError
- n = os.write(self.fd, d)
- if self._writeTimeout is not None and self._writeTimeout > 0:
- _, ready, _ = select.select([], [self.fd], [], self._writeTimeout)
- if not ready:
- raise writeTimeoutError
- d = d[n:]
- t = t - n
- except OSError, v:
- if v.errno != errno.EAGAIN:
- raise
- return len(data)
-
- def flush(self):
- """Flush of file like objects. In this case, wait until all data
- is written."""
- self.drainOutput()
-
- def flushInput(self):
- """Clear input buffer, discarding all that is in the buffer."""
- if self.fd is None:
- raise portNotOpenError
- termios.tcflush(self.fd, TERMIOS.TCIFLUSH)
-
- def flushOutput(self):
- """Clear output buffer, aborting the current output and
- discarding all that is in the buffer."""
- if self.fd is None:
- raise portNotOpenError
- termios.tcflush(self.fd, TERMIOS.TCOFLUSH)
-
- def sendBreak(self, duration=0.25):
- """Send break condition. Timed, returns to idle state after given duration."""
- if self.fd is None:
- raise portNotOpenError
- termios.tcsendbreak(self.fd, int(duration/0.25))
-
- def setBreak(self, level=1):
- """Set break: Controls TXD. When active, no transmitting is possible."""
- if self.fd is None: raise portNotOpenError
- if level:
- fcntl.ioctl(self.fd, TIOCSBRK)
- else:
- fcntl.ioctl(self.fd, TIOCCBRK)
-
- def setRTS(self, level=1):
- """Set terminal status line: Request To Send"""
- if self.fd is None: raise portNotOpenError
- if level:
- fcntl.ioctl(self.fd, TIOCMBIS, TIOCM_RTS_str)
- else:
- fcntl.ioctl(self.fd, TIOCMBIC, TIOCM_RTS_str)
-
- def setDTR(self, level=1):
- """Set terminal status line: Data Terminal Ready"""
- if self.fd is None: raise portNotOpenError
- if level:
- fcntl.ioctl(self.fd, TIOCMBIS, TIOCM_DTR_str)
- else:
- fcntl.ioctl(self.fd, TIOCMBIC, TIOCM_DTR_str)
-
- def getCTS(self):
- """Read terminal status line: Clear To Send"""
- if self.fd is None: raise portNotOpenError
- s = fcntl.ioctl(self.fd, TIOCMGET, TIOCM_zero_str)
- return struct.unpack('I',s)[0] & TIOCM_CTS != 0
-
- def getDSR(self):
- """Read terminal status line: Data Set Ready"""
- if self.fd is None: raise portNotOpenError
- s = fcntl.ioctl(self.fd, TIOCMGET, TIOCM_zero_str)
- return struct.unpack('I',s)[0] & TIOCM_DSR != 0
-
- def getRI(self):
- """Read terminal status line: Ring Indicator"""
- if self.fd is None: raise portNotOpenError
- s = fcntl.ioctl(self.fd, TIOCMGET, TIOCM_zero_str)
- return struct.unpack('I',s)[0] & TIOCM_RI != 0
-
- def getCD(self):
- """Read terminal status line: Carrier Detect"""
- if self.fd is None: raise portNotOpenError
- s = fcntl.ioctl(self.fd, TIOCMGET, TIOCM_zero_str)
- return struct.unpack('I',s)[0] & TIOCM_CD != 0
-
- # - - platform specific - - - -
-
- def drainOutput(self):
- """internal - not portable!"""
- if self.fd is None: raise portNotOpenError
- termios.tcdrain(self.fd)
-
- def nonblocking(self):
- """internal - not portable!"""
- if self.fd is None:
- raise portNotOpenError
- fcntl.fcntl(self.fd, FCNTL.F_SETFL, FCNTL.O_NONBLOCK)
-
- def fileno(self):
- """For easier use of the serial port instance with select.
- WARNING: this function is not portable to different platforms!"""
- if self.fd is None: raise portNotOpenError
- return self.fd
-
-
-# assemble Serial class with the platform specifc implementation and the base
-# for file-like behavior. for Python 2.6 and newer, that provide the new I/O
-# library, derrive from io.RawIOBase
-try:
- import io
-except ImportError:
- # classic version with our own file-like emulation
- class Serial(PosixSerial, FileLike):
- pass
-else:
- # io library present
- class Serial(PosixSerial, io.RawIOBase):
- pass
-
-
-if __name__ == '__main__':
- s = Serial(0,
- baudrate=19200, # baud rate
- bytesize=EIGHTBITS, # number of data bits
- parity=PARITY_EVEN, # enable parity checking
- stopbits=STOPBITS_ONE, # number of stop bits
- timeout=3, # set a timeout value, None for waiting forever
- xonxoff=0, # enable software flow control
- rtscts=0, # enable RTS/CTS flow control
- )
- s.setRTS(1)
- s.setDTR(1)
- s.flushInput()
- s.flushOutput()
- s.write('hello')
- sys.stdio.write('%r\n' % s.read(5))
- sys.stdio.write('%s\n' % s.inWaiting())
- del s
-
diff --git a/pyserial/serial/serialutil.py b/pyserial/serial/serialutil.py
deleted file mode 100644
index e568aa1..0000000
--- a/pyserial/serial/serialutil.py
+++ /dev/null
@@ -1,478 +0,0 @@
-#! python
-# Python Serial Port Extension for Win32, Linux, BSD, Jython
-# see __init__.py
-#
-# (C) 2001-2009 Chris Liechti <cliechti@gmx.net>
-# this is distributed under a free software license, see license.txt
-
-# compatibility folder Python < 2.6
-try:
- bytes
- bytearray
-except (NameError, AttributeError):
- # Python older than 2.6 do not have these types. Like for Python 2.6 they
- # should behave like str. For Python older than 3.0 we want to work with
- # strings anyway, only later versions have a true bytes type.
- bytes = str
- # bytearray is a mutable type that is easily turned into an instance of
- # bytes
- class bytearray(list):
- # for bytes(bytearray()) usage
- def __str__(self): return ''.join(self)
- # append automatically converts integers to characters
- def append(self, item):
- if isinstance(item, str):
- list.append(self, item)
- else:
- list.append(self, chr(item))
-
-# create control bytes, depending on true type of bytes
-# all Python versions prior 3.x convert str([17]) to '[17]' instead of '\x11'
-if bytes is str:
- XON = chr(17)
- XOFF = chr(19)
-else:
- XON = bytes([17])
- XOFF = bytes([19])
-
-
-PARITY_NONE, PARITY_EVEN, PARITY_ODD, PARITY_MARK, PARITY_SPACE = 'N', 'E', 'O', 'M', 'S'
-STOPBITS_ONE, STOPBITS_ONE_POINT_FIVE, STOPBITS_TWO = (1, 1.5, 2)
-FIVEBITS, SIXBITS, SEVENBITS, EIGHTBITS = (5, 6, 7, 8)
-
-PARITY_NAMES = {
- PARITY_NONE: 'None',
- PARITY_EVEN: 'Even',
- PARITY_ODD: 'Odd',
- PARITY_MARK: 'Mark',
- PARITY_SPACE: 'Space',
-}
-
-
-class SerialException(IOError):
- """Base class for serial port related exceptions."""
-
-
-class SerialTimeoutException(SerialException):
- """Write timeouts give an exception"""
-
-
-writeTimeoutError = SerialTimeoutException("Write timeout")
-portNotOpenError = ValueError('Attempting to use a port that is not open')
-
-
-class FileLike(object):
- """An abstract file like class.
-
- This class implements readline and readlines based on read and
- writelines based on write.
- This class is used to provide the above functions for to Serial
- port objects.
-
- Note that when the serial port was opened with _NO_ timeout that
- readline blocks until it sees a newline (or the specified size is
- reached) and that readlines would never return and therefore
- refuses to work (it raises an exception in this case)!
- """
-
- def __init__(self):
- self.closed = True
-
- def close(self):
- self.closed = True
-
- # so that ports are closed when objects are discarded
- def __del__(self):
- """Destructor. Calls close()."""
- # The try/except block is in case this is called at program
- # exit time, when it's possible that globals have already been
- # deleted, and then the close() call might fail. Since
- # there's nothing we can do about such failures and they annoy
- # the end users, we suppress the traceback.
- try:
- self.close()
- except:
- pass
-
- def writelines(self, sequence):
- for line in sequence:
- self.write(line)
-
- def flush(self):
- """flush of file like objects"""
- pass
-
- # iterator for e.g. "for line in Serial(0): ..." usage
- def next(self):
- line = self.readline()
- if not line: raise StopIteration
- return line
-
- def __iter__(self):
- return self
-
- # other functions of file-likes - not used by pySerial
-
- #~ readinto(b)
-
- def seek(self, pos, whence=0):
- raise IOError("file is not seekable")
-
- def tell(self):
- raise IOError("file is not seekable")
-
- def truncate(self, n=None):
- raise IOError("file is not seekable")
-
- def isatty(self):
- return False
-
-
-class SerialBase(object):
- """Serial port base class. Provides __init__ function and properties to
- get/set port settings."""
-
- # default values, may be overridden in subclasses that do not support all values
- BAUDRATES = (50, 75, 110, 134, 150, 200, 300, 600, 1200, 1800, 2400, 4800,
- 9600, 19200, 38400, 57600, 115200, 230400, 460800, 500000,
- 576000, 921600, 1000000, 1152000, 1500000, 2000000, 2500000,
- 3000000, 3500000, 4000000)
- BYTESIZES = (FIVEBITS, SIXBITS, SEVENBITS, EIGHTBITS)
- PARITIES = (PARITY_NONE, PARITY_EVEN, PARITY_ODD, PARITY_MARK, PARITY_SPACE)
- STOPBITS = (STOPBITS_ONE, STOPBITS_ONE_POINT_FIVE, STOPBITS_TWO)
-
- def __init__(self,
- port = None, # number of device, numbering starts at
- # zero. if everything fails, the user
- # can specify a device string, note
- # that this isn't portable anymore
- # port will be opened if one is specified
- baudrate=9600, # baud rate
- bytesize=EIGHTBITS, # number of data bits
- parity=PARITY_NONE, # enable parity checking
- stopbits=STOPBITS_ONE, # number of stop bits
- timeout=None, # set a timeout value, None to wait forever
- xonxoff=0, # enable software flow control
- rtscts=0, # enable RTS/CTS flow control
- writeTimeout=None, # set a timeout for writes
- dsrdtr=None, # None: use rtscts setting, dsrdtr override if true or false
- interCharTimeout=None # Inter-character timeout, None to disable
- ):
- """Initialize comm port object. If a port is given, then the port will be
- opened immediately. Otherwise a Serial port object in closed state
- is returned."""
-
- self._isOpen = False
- self._port = None # correct value is assigned below through properties
- self._baudrate = None # correct value is assigned below through properties
- self._bytesize = None # correct value is assigned below through properties
- self._parity = None # correct value is assigned below through properties
- self._stopbits = None # correct value is assigned below through properties
- self._timeout = None # correct value is assigned below through properties
- self._writeTimeout = None # correct value is assigned below through properties
- self._xonxoff = None # correct value is assigned below through properties
- self._rtscts = None # correct value is assigned below through properties
- self._dsrdtr = None # correct value is assigned below through properties
- self._interCharTimeout = None # correct value is assigned below through properties
-
- # assign values using get/set methods using the properties feature
- self.port = port
- self.baudrate = baudrate
- self.bytesize = bytesize
- self.parity = parity
- self.stopbits = stopbits
- self.timeout = timeout
- self.writeTimeout = writeTimeout
- self.xonxoff = xonxoff
- self.rtscts = rtscts
- self.dsrdtr = dsrdtr
- self.interCharTimeout = interCharTimeout
-
- if port is not None:
- self.open()
-
- def isOpen(self):
- """Check if the port is opened."""
- return self._isOpen
-
- # - - - - - - - - - - - - - - - - - - - - - - - -
-
- # TODO: these are not really needed as the is the BAUDRATES etc. attribute...
- # maybe i remove them before the final release...
-
- def getSupportedBaudrates(self):
- return [(str(b), b) for b in self.BAUDRATES]
-
- def getSupportedByteSizes(self):
- return [(str(b), b) for b in self.BYTESIZES]
-
- def getSupportedStopbits(self):
- return [(str(b), b) for b in self.STOPBITS]
-
- def getSupportedParities(self):
- return [(PARITY_NAMES[b], b) for b in self.PARITIES]
-
- # - - - - - - - - - - - - - - - - - - - - - - - -
-
- def setPort(self, port):
- """Change the port. The attribute portstr is set to a string that
- contains the name of the port."""
-
- was_open = self._isOpen
- if was_open: self.close()
- if port is not None:
- if isinstance(port, basestring):
- self.portstr = port
- else:
- self.portstr = self.makeDeviceName(port)
- else:
- self.portstr = None
- self._port = port
- self.name = self.portstr
- if was_open: self.open()
-
- def getPort(self):
- """Get the current port setting. The value that was passed on init or using
- setPort() is passed back. See also the attribute portstr which contains
- the name of the port as a string."""
- return self._port
-
- port = property(getPort, setPort, doc="Port setting")
-
-
- def setBaudrate(self, baudrate):
- """Change baud rate. It raises a ValueError if the port is open and the
- baud rate is not possible. If the port is closed, then the value is
- accepted and the exception is raised when the port is opened."""
- try:
- self._baudrate = int(baudrate)
- except TypeError:
- raise ValueError("Not a valid baudrate: %r" % (baudrate,))
- else:
- if self._isOpen: self._reconfigurePort()
-
- def getBaudrate(self):
- """Get the current baud rate setting."""
- return self._baudrate
-
- baudrate = property(getBaudrate, setBaudrate, doc="Baud rate setting")
-
-
- def setByteSize(self, bytesize):
- """Change byte size."""
- if bytesize not in self.BYTESIZES: raise ValueError("Not a valid byte size: %r" % (bytesize,))
- self._bytesize = bytesize
- if self._isOpen: self._reconfigurePort()
-
- def getByteSize(self):
- """Get the current byte size setting."""
- return self._bytesize
-
- bytesize = property(getByteSize, setByteSize, doc="Byte size setting")
-
-
- def setParity(self, parity):
- """Change parity setting."""
- if parity not in self.PARITIES: raise ValueError("Not a valid parity: %r" % (parity,))
- self._parity = parity
- if self._isOpen: self._reconfigurePort()
-
- def getParity(self):
- """Get the current parity setting."""
- return self._parity
-
- parity = property(getParity, setParity, doc="Parity setting")
-
-
- def setStopbits(self, stopbits):
- """Change stop bits size."""
- if stopbits not in self.STOPBITS: raise ValueError("Not a valid stop bit size: %r" % (stopbits,))
- self._stopbits = stopbits
- if self._isOpen: self._reconfigurePort()
-
- def getStopbits(self):
- """Get the current stop bits setting."""
- return self._stopbits
-
- stopbits = property(getStopbits, setStopbits, doc="Stop bits setting")
-
-
- def setTimeout(self, timeout):
- """Change timeout setting."""
- if timeout is not None:
- try:
- timeout + 1 # test if it's a number, will throw a TypeError if not...
- except TypeError:
- raise ValueError("Not a valid timeout: %r" % (timeout,))
- if timeout < 0: raise ValueError("Not a valid timeout: %r" % (timeout,))
- self._timeout = timeout
- if self._isOpen: self._reconfigurePort()
-
- def getTimeout(self):
- """Get the current timeout setting."""
- return self._timeout
-
- timeout = property(getTimeout, setTimeout, doc="Timeout setting for read()")
-
-
- def setWriteTimeout(self, timeout):
- """Change timeout setting."""
- if timeout is not None:
- if timeout < 0: raise ValueError("Not a valid timeout: %r" % (timeout,))
- try:
- timeout + 1 #test if it's a number, will throw a TypeError if not...
- except TypeError:
- raise ValueError("Not a valid timeout: %r" % timeout)
-
- self._writeTimeout = timeout
- if self._isOpen: self._reconfigurePort()
-
- def getWriteTimeout(self):
- """Get the current timeout setting."""
- return self._writeTimeout
-
- writeTimeout = property(getWriteTimeout, setWriteTimeout, doc="Timeout setting for write()")
-
-
- def setXonXoff(self, xonxoff):
- """Change XON/XOFF setting."""
- self._xonxoff = xonxoff
- if self._isOpen: self._reconfigurePort()
-
- def getXonXoff(self):
- """Get the current XON/XOFF setting."""
- return self._xonxoff
-
- xonxoff = property(getXonXoff, setXonXoff, doc="XON/XOFF setting")
-
- def setRtsCts(self, rtscts):
- """Change RTS/CTS flow control setting."""
- self._rtscts = rtscts
- if self._isOpen: self._reconfigurePort()
-
- def getRtsCts(self):
- """Get the current RTS/CTS flow control setting."""
- return self._rtscts
-
- rtscts = property(getRtsCts, setRtsCts, doc="RTS/CTS flow control setting")
-
- def setDsrDtr(self, dsrdtr=None):
- """Change DsrDtr flow control setting."""
- if dsrdtr is None:
- # if not set, keep backwards compatibility and follow rtscts setting
- self._dsrdtr = self._rtscts
- else:
- # if defined independently, follow its value
- self._dsrdtr = dsrdtr
- if self._isOpen: self._reconfigurePort()
-
- def getDsrDtr(self):
- """Get the current DSR/DTR flow control setting."""
- return self._dsrdtr
-
- dsrdtr = property(getDsrDtr, setDsrDtr, "DSR/DTR flow control setting")
-
- def setInterCharTimeout(self, interCharTimeout):
- """Change inter-character timeout setting."""
- if interCharTimeout is not None:
- if interCharTimeout < 0: raise ValueError("Not a valid timeout: %r" % interCharTimeout)
- try:
- interCharTimeout + 1 # test if it's a number, will throw a TypeError if not...
- except TypeError:
- raise ValueError("Not a valid timeout: %r" % interCharTimeout)
-
- self._interCharTimeout = interCharTimeout
- if self._isOpen: self._reconfigurePort()
-
- def getInterCharTimeout(self):
- """Get the current inter-character timeout setting."""
- return self._interCharTimeout
-
- interCharTimeout = property(getInterCharTimeout, setInterCharTimeout, doc="Inter-character timeout setting for read()")
-
-
- # - - - - - - - - - - - - - - - - - - - - - - - -
-
- def __repr__(self):
- """String representation of the current port settings and its state."""
- return "%s<id=0x%x, open=%s>(port=%r, baudrate=%r, bytesize=%r, parity=%r, stopbits=%r, timeout=%r, xonxoff=%r, rtscts=%r, dsrdtr=%r)" % (
- self.__class__.__name__,
- id(self),
- self._isOpen,
- self.portstr,
- self.baudrate,
- self.bytesize,
- self.parity,
- self.stopbits,
- self.timeout,
- self.xonxoff,
- self.rtscts,
- self.dsrdtr,
- )
-
- # - - - - - - - - - - - - - - - - - - - - - - - -
-
- def readline(self, size=None, eol='\n'):
- """read a line which is terminated with end-of-line (eol) character
- ('\n' by default) or until timeout"""
- line = ''
- while 1:
- c = self.read(1)
- if c:
- line += c # not very efficient but lines are usually not that long
- if c == eol:
- break
- if size is not None and len(line) >= size:
- break
- else:
- break
- return bytes(line)
-
- def readlines(self, sizehint=None, eol='\n'):
- """read a list of lines, until timeout
- sizehint is ignored"""
- if self.timeout is None:
- raise ValueError("Serial port MUST have enabled timeout for this function!")
- lines = []
- while 1:
- line = self.readline(eol=eol)
- if line:
- lines.append(line)
- if line[-1] != eol: # was the line received with a timeout?
- break
- else:
- break
- return lines
-
- def xreadlines(self, sizehint=None):
- """just call readlines - here for compatibility"""
- return self.readlines()
-
- # - - - - - - - - - - - - - - - - - - - - - - - -
- # compatibility with io library
-
- def readable(self): return True
- def writable(self): return True
- def seekable(self): return False
- def readinto(self, b):
- data = self.read(len(b))
- n = len(data)
- try:
- b[:n] = data
- except TypeError, err:
- import array
- if not isinstance(b, array.array):
- raise err
- b[:n] = array.array('b', data)
- return n
-
-
-if __name__ == '__main__':
- import sys
- s = SerialBase()
- sys.stdout.write('port name: %s\n' % s.portstr)
- sys.stdout.write('baud rates: %s\n' % s.getSupportedBaudrates())
- sys.stdout.write('byte sizes: %s\n' % s.getSupportedByteSizes())
- sys.stdout.write('parities: %s\n' % s.getSupportedParities())
- sys.stdout.write('stop bits: %s\n' % s.getSupportedStopbits())
- sys.stdout.write('%s\n' % s)
diff --git a/pyserial/serial/serialwin32.py b/pyserial/serial/serialwin32.py
deleted file mode 100644
index af436bb..0000000
--- a/pyserial/serial/serialwin32.py
+++ /dev/null
@@ -1,383 +0,0 @@
-#! python
-# Python Serial Port Extension for Win32, Linux, BSD, Jython
-# serial driver for win32
-# see __init__.py
-#
-# (C) 2001-2009 Chris Liechti <cliechti@gmx.net>
-# this is distributed under a free software license, see license.txt
-#
-# Initial patch to use ctypes by Giovanni Bajo <rasky@develer.com>
-
-import ctypes
-import win32
-
-from serialutil import *
-
-
-def device(portnum):
- """Turn a port number into a device name"""
- return 'COM%d' % (portnum+1) # numbers are transformed to a string
-
-
-class Win32Serial(SerialBase):
- """Serial port implementation for Win32 based on ctypes."""
-
- BAUDRATES = (50, 75, 110, 134, 150, 200, 300, 600, 1200, 1800, 2400, 4800,
- 9600, 19200, 38400, 57600, 115200)
-
- def open(self):
- """Open port with current settings. This may throw a SerialException
- if the port cannot be opened."""
- if self._port is None:
- raise SerialException("Port must be configured before it can be used.")
- self.hComPort = None
- # the "\\.\COMx" format is required for devices other than COM1-COM8
- # not all versions of windows seem to support this properly
- # so that the first few ports are used with the DOS device name
- port = self.portstr
- try:
- if port.upper().startswith('COM') and int(port[3:]) > 8:
- port = '\\\\.\\' + port
- except ValueError:
- # for like COMnotanumber
- pass
- self.hComPort = win32.CreateFile(port,
- win32.GENERIC_READ | win32.GENERIC_WRITE,
- 0, # exclusive access
- None, # no security
- win32.OPEN_EXISTING,
- win32.FILE_ATTRIBUTE_NORMAL | win32.FILE_FLAG_OVERLAPPED,
- 0)
- if self.hComPort == win32.INVALID_HANDLE_VALUE:
- self.hComPort = None # 'cause __del__ is called anyway
- raise SerialException("could not open port %s: %s" % (self.portstr, ctypes.WinError()))
-
- # Setup a 4k buffer
- win32.SetupComm(self.hComPort, 4096, 4096)
-
- # Save original timeout values:
- self._orgTimeouts = win32.COMMTIMEOUTS()
- win32.GetCommTimeouts(self.hComPort, ctypes.byref(self._orgTimeouts))
-
- self._rtsState = win32.RTS_CONTROL_ENABLE
- self._dtrState = win32.DTR_CONTROL_ENABLE
-
- self._reconfigurePort()
-
- # Clear buffers:
- # Remove anything that was there
- win32.PurgeComm(self.hComPort,
- win32.PURGE_TXCLEAR | win32.PURGE_TXABORT |
- win32.PURGE_RXCLEAR | win32.PURGE_RXABORT)
-
- self._overlappedRead = win32.OVERLAPPED()
- self._overlappedRead.hEvent = win32.CreateEvent(None, 1, 0, None)
- self._overlappedWrite = win32.OVERLAPPED()
- #~ self._overlappedWrite.hEvent = win32.CreateEvent(None, 1, 0, None)
- self._overlappedWrite.hEvent = win32.CreateEvent(None, 0, 0, None)
- self._isOpen = True
-
- def _reconfigurePort(self):
- """Set communication parameters on opened port."""
- if not self.hComPort:
- raise SerialException("Can only operate on a valid port handle")
-
- # Set Windows timeout values
- # timeouts is a tuple with the following items:
- # (ReadIntervalTimeout,ReadTotalTimeoutMultiplier,
- # ReadTotalTimeoutConstant,WriteTotalTimeoutMultiplier,
- # WriteTotalTimeoutConstant)
- if self._timeout is None:
- timeouts = (0, 0, 0, 0, 0)
- elif self._timeout == 0:
- timeouts = (win32.MAXDWORD, 0, 0, 0, 0)
- else:
- timeouts = (0, 0, int(self._timeout*1000), 0, 0)
- if self._timeout != 0 and self._interCharTimeout is not None:
- timeouts = (int(self._interCharTimeout * 1000),) + timeouts[1:]
-
- if self._writeTimeout is None:
- pass
- elif self._writeTimeout == 0:
- timeouts = timeouts[:-2] + (0, win32.MAXDWORD)
- else:
- timeouts = timeouts[:-2] + (0, int(self._writeTimeout*1000))
- win32.SetCommTimeouts(self.hComPort, ctypes.byref(win32.COMMTIMEOUTS(*timeouts)))
-
- win32.SetCommMask(self.hComPort, win32.EV_ERR)
-
- # Setup the connection info.
- # Get state and modify it:
- comDCB = win32.DCB()
- win32.GetCommState(self.hComPort, ctypes.byref(comDCB))
- comDCB.BaudRate = self._baudrate
-
- if self._bytesize == FIVEBITS:
- comDCB.ByteSize = 5
- elif self._bytesize == SIXBITS:
- comDCB.ByteSize = 6
- elif self._bytesize == SEVENBITS:
- comDCB.ByteSize = 7
- elif self._bytesize == EIGHTBITS:
- comDCB.ByteSize = 8
- else:
- raise ValueError("Unsupported number of data bits: %r" % self._bytesize)
-
- if self._parity == PARITY_NONE:
- comDCB.Parity = win32.NOPARITY
- comDCB.fParity = 0 # Disable Parity Check
- elif self._parity == PARITY_EVEN:
- comDCB.Parity = win32.EVENPARITY
- comDCB.fParity = 1 # Enable Parity Check
- elif self._parity == PARITY_ODD:
- comDCB.Parity = win32.ODDPARITY
- comDCB.fParity = 1 # Enable Parity Check
- elif self._parity == PARITY_MARK:
- comDCB.Parity = win32.MARKPARITY
- comDCB.fParity = 1 # Enable Parity Check
- elif self._parity == PARITY_SPACE:
- comDCB.Parity = win32.SPACEPARITY
- comDCB.fParity = 1 # Enable Parity Check
- else:
- raise ValueError("Unsupported parity mode: %r" % self._parity)
-
- if self._stopbits == STOPBITS_ONE:
- comDCB.StopBits = win32.ONESTOPBIT
- elif self._stopbits == STOPBITS_ONE_POINT_FIVE:
- comDCB.StopBits = win32.ONE5STOPBITS
- elif self._stopbits == STOPBITS_TWO:
- comDCB.StopBits = win32.TWOSTOPBITS
- else:
- raise ValueError("Unsupported number of stop bits: %r" % self._stopbits)
-
- comDCB.fBinary = 1 # Enable Binary Transmission
- # Char. w/ Parity-Err are replaced with 0xff (if fErrorChar is set to TRUE)
- if self._rtscts:
- comDCB.fRtsControl = win32.RTS_CONTROL_HANDSHAKE
- else:
- comDCB.fRtsControl = self._rtsState
- if self._dsrdtr:
- comDCB.fDtrControl = win32.DTR_CONTROL_HANDSHAKE
- else:
- comDCB.fDtrControl = self._dtrState
- comDCB.fOutxCtsFlow = self._rtscts
- comDCB.fOutxDsrFlow = self._dsrdtr
- comDCB.fOutX = self._xonxoff
- comDCB.fInX = self._xonxoff
- comDCB.fNull = 0
- comDCB.fErrorChar = 0
- comDCB.fAbortOnError = 0
- comDCB.XonChar = XON
- comDCB.XoffChar = XOFF
-
- if not win32.SetCommState(self.hComPort, ctypes.byref(comDCB)):
- raise ValueError("Cannot configure port, some setting was wrong. Original message: %s" % ctypes.WinError())
-
- #~ def __del__(self):
- #~ self.close()
-
- def close(self):
- """Close port"""
- if self._isOpen:
- if self.hComPort:
- # Restore original timeout values:
- win32.SetCommTimeouts(self.hComPort, self._orgTimeouts)
- # Close COM-Port:
- win32.CloseHandle(self.hComPort)
- win32.CloseHandle(self._overlappedRead.hEvent)
- win32.CloseHandle(self._overlappedWrite.hEvent)
- self.hComPort = None
- self._isOpen = False
-
- def makeDeviceName(self, port):
- return device(port)
-
- # - - - - - - - - - - - - - - - - - - - - - - - -
-
- def inWaiting(self):
- """Return the number of characters currently in the input buffer."""
- flags = win32.DWORD()
- comstat = win32.COMSTAT()
- if not win32.ClearCommError(self.hComPort, ctypes.byref(flags), ctypes.byref(comstat)):
- raise SerialException('call to ClearCommError failed')
- return comstat.cbInQue
-
- def read(self, size=1):
- """Read size bytes from the serial port. If a timeout is set it may
- return less characters as requested. With no timeout it will block
- until the requested number of bytes is read."""
- if not self.hComPort: raise portNotOpenError
- if size > 0:
- win32.ResetEvent(self._overlappedRead.hEvent)
- flags = win32.DWORD()
- comstat = win32.COMSTAT()
- if not win32.ClearCommError(self.hComPort, ctypes.byref(flags), ctypes.byref(comstat)):
- raise SerialException('call to ClearCommError failed')
- if self.timeout == 0:
- n = min(comstat.cbInQue, size)
- if n > 0:
- buf = ctypes.create_string_buffer(n)
- rc = win32.DWORD()
- err = win32.ReadFile(self.hComPort, buf, size, ctypes.byref(rc), ctypes.byref(self._overlappedRead))
- if not err and win32.GetLastError() != win32.ERROR_IO_PENDING:
- raise SerialException("ReadFile failed (%s)" % ctypes.WinError())
- err = win32.WaitForSingleObject(self._overlappedRead.hEvent, win32.INFINITE)
- read = buf.raw[:rc.value]
- else:
- read = bytes()
- else:
- buf = ctypes.create_string_buffer(size)
- rc = win32.DWORD()
- err = win32.ReadFile(self.hComPort, buf, size, ctypes.byref(rc), ctypes.byref(self._overlappedRead))
- if not err and win32.GetLastError() != win32.ERROR_IO_PENDING:
- raise SerialException("ReadFile failed (%s)" % ctypes.WinError())
- err = win32.GetOverlappedResult(self.hComPort, ctypes.byref(self._overlappedRead), ctypes.byref(rc), True)
- read = buf.raw[:rc.value]
- else:
- read = bytes()
- return bytes(read)
-
- def write(self, data):
- """Output the given string over the serial port."""
- if not self.hComPort: raise portNotOpenError
- #~ if not isinstance(data, (bytes, bytearray)):
- #~ raise TypeError('expected %s or bytearray, got %s' % (bytes, type(data)))
- # convert data (needed in case of memoryview instance: Py 3.1 io lib), ctypes doesn't like memoryview
- data = bytes(data)
- if data:
- #~ win32event.ResetEvent(self._overlappedWrite.hEvent)
- n = win32.DWORD()
- err = win32.WriteFile(self.hComPort, data, len(data), ctypes.byref(n), self._overlappedWrite)
- if not err and win32.GetLastError() != win32.ERROR_IO_PENDING:
- raise SerialException("WriteFile failed (%s)" % ctypes.WinError())
- # Wait for the write to complete.
- #~ win32.WaitForSingleObject(self._overlappedWrite.hEvent, win32.INFINITE)
- err = win32.GetOverlappedResult(self.hComPort, self._overlappedWrite, ctypes.byref(n), True)
- if n.value != len(data):
- raise writeTimeoutError
- return n.value
- else:
- return 0
-
-
- def flushInput(self):
- """Clear input buffer, discarding all that is in the buffer."""
- if not self.hComPort: raise portNotOpenError
- win32.PurgeComm(self.hComPort, win32.PURGE_RXCLEAR | win32.PURGE_RXABORT)
-
- def flushOutput(self):
- """Clear output buffer, aborting the current output and
- discarding all that is in the buffer."""
- if not self.hComPort: raise portNotOpenError
- win32.PurgeComm(self.hComPort, win32.PURGE_TXCLEAR | win32.PURGE_TXABORT)
-
- def sendBreak(self, duration=0.25):
- """Send break condition. Timed, returns to idle state after given duration."""
- if not self.hComPort: raise portNotOpenError
- import time
- win32.SetCommBreak(self.hComPort)
- time.sleep(duration)
- win32.ClearCommBreak(self.hComPort)
-
- def setBreak(self, level=1):
- """Set break: Controls TXD. When active, to transmitting is possible."""
- if not self.hComPort: raise portNotOpenError
- if level:
- win32.SetCommBreak(self.hComPort)
- else:
- win32.ClearCommBreak(self.hComPort)
-
- def setRTS(self, level=1):
- """Set terminal status line: Request To Send"""
- if not self.hComPort: raise portNotOpenError
- if level:
- self._rtsState = win32.RTS_CONTROL_ENABLE
- win32.EscapeCommFunction(self.hComPort, win32.SETRTS)
- else:
- self._rtsState = win32.RTS_CONTROL_DISABLE
- win32.EscapeCommFunction(self.hComPort, win32.CLRRTS)
-
- def setDTR(self, level=1):
- """Set terminal status line: Data Terminal Ready"""
- if not self.hComPort: raise portNotOpenError
- if level:
- self._dtrState = win32.DTR_CONTROL_ENABLE
- win32.EscapeCommFunction(self.hComPort, win32.SETDTR)
- else:
- self._dtrState = win32.DTR_CONTROL_DISABLE
- win32.EscapeCommFunction(self.hComPort, win32.CLRDTR)
-
- def _GetCommModemStatus(self):
- stat = win32.DWORD()
- win32.GetCommModemStatus(self.hComPort, ctypes.byref(stat))
- return stat.value
-
- def getCTS(self):
- """Read terminal status line: Clear To Send"""
- if not self.hComPort: raise portNotOpenError
- return win32.MS_CTS_ON & self._GetCommModemStatus() != 0
-
- def getDSR(self):
- """Read terminal status line: Data Set Ready"""
- if not self.hComPort: raise portNotOpenError
- return win32.MS_DSR_ON & self._GetCommModemStatus() != 0
-
- def getRI(self):
- """Read terminal status line: Ring Indicator"""
- if not self.hComPort: raise portNotOpenError
- return win32.MS_RING_ON & self._GetCommModemStatus() != 0
-
- def getCD(self):
- """Read terminal status line: Carrier Detect"""
- if not self.hComPort: raise portNotOpenError
- return win32.MS_RLSD_ON & self._GetCommModemStatus() != 0
-
- # - - platform specific - - - -
-
- def setXON(self, level=True):
- """Platform specific - set flow state."""
- if not self.hComPort: raise portNotOpenError
- if level:
- win32.EscapeCommFunction(self.hComPort, win32.SETXON)
- else:
- win32.EscapeCommFunction(self.hComPort, win32.SETXOFF)
-
- def outWaiting(self):
- """return how many characters the in the outgoing buffer"""
- flags = win32.DWORD()
- comstat = win32.COMSTAT()
- if not win32.ClearCommError(self.hComPort, ctypes.byref(flags), ctypes.byref(comstat)):
- raise SerialException('call to ClearCommError failed')
- return comstat.cbOutQue
-
-
-# assemble Serial class with the platform specific implementation and the base
-# for file-like behavior. for Python 2.6 and newer, that provide the new I/O
-# library, derive from io.RawIOBase
-try:
- import io
-except ImportError:
- # classic version with our own file-like emulation
- class Serial(Win32Serial, FileLike):
- pass
-else:
- # io library present
- class Serial(Win32Serial, io.RawIOBase):
- pass
-
-
-# Nur Testfunktion!!
-if __name__ == '__main__':
- s = Serial(0)
- sys.stdout.write("%s\n" % s)
-
- s = Serial()
- sys.stdout.write("%s\n" % s)
-
- s.baudrate = 19200
- s.databits = 7
- s.close()
- s.port = 0
- s.open()
- sys.stdout.write("%s\n" % s)
-
diff --git a/pyserial/serial/sermsdos.py b/pyserial/serial/sermsdos.py
deleted file mode 100644
index c3560d0..0000000
--- a/pyserial/serial/sermsdos.py
+++ /dev/null
@@ -1,200 +0,0 @@
-# sermsdos.py
-#
-# History:
-#
-# 3rd September 2002 Dave Haynes
-# 1. First defined
-#
-# Although this code should run under the latest versions of
-# Python, on DOS-based platforms such as Windows 95 and 98,
-# it has been specifically written to be compatible with
-# PyDOS, available at:
-# http://www.python.org/ftp/python/wpy/dos.html
-#
-# PyDOS is a stripped-down version of Python 1.5.2 for
-# DOS machines. Therefore, in making changes to this file,
-# please respect Python 1.5.2 syntax. In addition, please
-# limit the width of this file to 60 characters.
-#
-# Note also that the modules in PyDOS contain fewer members
-# than other versions, so we are restricted to using the
-# following:
-#
-# In module os:
-# -------------
-# environ, chdir, getcwd, getpid, umask, fdopen, close,
-# dup, dup2, fstat, lseek, open, read, write, O_RDONLY,
-# O_WRONLY, O_RDWR, O_APPEND, O_CREAT, O_EXCL, O_TRUNC,
-# access, F_OK, R_OK, W_OK, X_OK, chmod, listdir, mkdir,
-# remove, rename, renames, rmdir, stat, unlink, utime,
-# execl, execle, execlp, execlpe, execvp, execvpe, _exit,
-# system.
-#
-# In module os.path:
-# ------------------
-# curdir, pardir, sep, altsep, pathsep, defpath, linesep.
-#
-
-import os
-import sys
-import string
-import serialutil
-
-BAUD_RATES = {
- 110: "11",
- 150: "15",
- 300: "30",
- 600: "60",
- 1200: "12",
- 2400: "24",
- 4800: "48",
- 9600: "96",
- 19200: "19"}
-
-(PARITY_NONE, PARITY_EVEN, PARITY_ODD, PARITY_MARK,
-PARITY_SPACE) = (0, 1, 2, 3, 4)
-(STOPBITS_ONE, STOPBITS_ONEANDAHALF,
-STOPBITS_TWO) = (1, 1.5, 2)
-FIVEBITS, SIXBITS, SEVENBITS, EIGHTBITS = (5, 6, 7, 8)
-(RETURN_ERROR, RETURN_BUSY, RETURN_RETRY, RETURN_READY,
-RETURN_NONE) = ('E', 'B', 'P', 'R', 'N')
-portNotOpenError = ValueError('port not open')
-
-def device(portnum):
- return 'COM%d' % (portnum+1)
-
-class Serial(serialutil.FileLike):
- """
- port: number of device; numbering starts at
- zero. if everything fails, the user can
- specify a device string, note that this
- isn't portable any more
- baudrate: baud rate
- bytesize: number of databits
- parity: enable parity checking
- stopbits: number of stopbits
- timeout: set a timeout (None for waiting forever)
- xonxoff: enable software flow control
- rtscts: enable RTS/CTS flow control
- retry: DOS retry mode
- """
- def __init__(self,
- port,
- baudrate = 9600,
- bytesize = EIGHTBITS,
- parity = PARITY_NONE,
- stopbits = STOPBITS_ONE,
- timeout = None,
- xonxoff = 0,
- rtscts = 0,
- retry = RETURN_RETRY
- ):
-
- if type(port) == type(''):
- # strings are taken directly
- self.portstr = port
- else:
- # numbers are transformed to a string
- self.portstr = device(port+1)
-
- self.baud = BAUD_RATES[baudrate]
- self.bytesize = str(bytesize)
-
- if parity == PARITY_NONE:
- self.parity = 'N'
- elif parity == PARITY_EVEN:
- self.parity = 'E'
- elif parity == PARITY_ODD:
- self.parity = 'O'
- elif parity == PARITY_MARK:
- self.parity = 'M'
- elif parity == PARITY_SPACE:
- self.parity = 'S'
-
- self.stop = str(stopbits)
- self.retry = retry
- self.filename = "sermsdos.tmp"
-
- self._config(self.portstr, self.baud, self.parity,
- self.bytesize, self.stop, self.retry, self.filename)
-
- def __del__(self):
- self.close()
-
- def close(self):
- pass
-
- def _config(self, port, baud, parity, data, stop, retry,
- filename):
- comString = string.join(("MODE ", port, ":"
- , " BAUD= ", baud, " PARITY= ", parity
- , " DATA= ", data, " STOP= ", stop, " RETRY= ",
- retry, " > ", filename ), '')
- os.system(comString)
-
- def setBaudrate(self, baudrate):
- self._config(self.portstr, BAUD_RATES[baudrate],
- self.parity, self.bytesize, self.stop, self.retry,
- self.filename)
-
- def inWaiting(self):
- """returns the number of bytes waiting to be read"""
- raise NotImplementedError
-
- def read(self, num = 1):
- """Read num bytes from serial port"""
- handle = os.open(self.portstr,
- os.O_RDONLY | os.O_BINARY)
- rv = os.read(handle, num)
- os.close(handle)
- return rv
-
- def write(self, s):
- """Write string to serial port"""
- handle = os.open(self.portstr,
- os.O_WRONLY | os.O_BINARY)
- rv = os.write(handle, s)
- os.close(handle)
- return rv
-
- def flushInput(self):
- raise NotImplementedError
-
- def flushOutput(self):
- raise NotImplementedError
-
- def sendBreak(self):
- raise NotImplementedError
-
- def setRTS(self,level=1):
- """Set terminal status line"""
- raise NotImplementedError
-
- def setDTR(self,level=1):
- """Set terminal status line"""
- raise NotImplementedError
-
- def getCTS(self):
- """Eead terminal status line"""
- raise NotImplementedError
-
- def getDSR(self):
- """Eead terminal status line"""
- raise NotImplementedError
-
- def getRI(self):
- """Eead terminal status line"""
- raise NotImplementedError
-
- def getCD(self):
- """Eead terminal status line"""
- raise NotImplementedError
-
- def __repr__(self):
- return string.join(( "<Serial>: ", self.portstr
- , self.baud, self.parity, self.bytesize, self.stop,
- self.retry , self.filename), ' ')
-
-if __name__ == '__main__':
- s = Serial(0)
- sys.stdio.write('%s %s\n' % (__name__, s))
diff --git a/pyserial/serial/win32.py b/pyserial/serial/win32.py
deleted file mode 100644
index 9b8c118..0000000
--- a/pyserial/serial/win32.py
+++ /dev/null
@@ -1,288 +0,0 @@
-from ctypes import *
-from ctypes.wintypes import HANDLE
-from ctypes.wintypes import BOOL
-from ctypes.wintypes import LPCWSTR
-_stdcall_libraries = {}
-_stdcall_libraries['kernel32'] = WinDLL('kernel32')
-from ctypes.wintypes import DWORD
-from ctypes.wintypes import WORD
-from ctypes.wintypes import BYTE
-
-INVALID_HANDLE_VALUE = HANDLE(-1).value
-
-class _SECURITY_ATTRIBUTES(Structure):
- pass
-LPSECURITY_ATTRIBUTES = POINTER(_SECURITY_ATTRIBUTES)
-
-CreateEventW = _stdcall_libraries['kernel32'].CreateEventW
-CreateEventW.restype = HANDLE
-CreateEventW.argtypes = [LPSECURITY_ATTRIBUTES, BOOL, BOOL, LPCWSTR]
-CreateEvent = CreateEventW # alias
-
-CreateFileW = _stdcall_libraries['kernel32'].CreateFileW
-CreateFileW.restype = HANDLE
-CreateFileW.argtypes = [LPCWSTR, DWORD, DWORD, LPSECURITY_ATTRIBUTES, DWORD, DWORD, HANDLE]
-CreateFile = CreateFileW # alias
-
-class _OVERLAPPED(Structure):
- pass
-OVERLAPPED = _OVERLAPPED
-
-class _COMSTAT(Structure):
- pass
-COMSTAT = _COMSTAT
-
-class _DCB(Structure):
- pass
-DCB = _DCB
-
-class _COMMTIMEOUTS(Structure):
- pass
-COMMTIMEOUTS = _COMMTIMEOUTS
-
-GetLastError = _stdcall_libraries['kernel32'].GetLastError
-GetLastError.restype = DWORD
-GetLastError.argtypes = []
-
-LPOVERLAPPED = POINTER(_OVERLAPPED)
-LPDWORD = POINTER(DWORD)
-
-GetOverlappedResult = _stdcall_libraries['kernel32'].GetOverlappedResult
-GetOverlappedResult.restype = BOOL
-GetOverlappedResult.argtypes = [HANDLE, LPOVERLAPPED, LPDWORD, BOOL]
-
-ResetEvent = _stdcall_libraries['kernel32'].ResetEvent
-ResetEvent.restype = BOOL
-ResetEvent.argtypes = [HANDLE]
-
-LPCVOID = c_void_p
-
-WriteFile = _stdcall_libraries['kernel32'].WriteFile
-WriteFile.restype = BOOL
-WriteFile.argtypes = [HANDLE, LPCVOID, DWORD, LPDWORD, LPOVERLAPPED]
-
-LPVOID = c_void_p
-
-ReadFile = _stdcall_libraries['kernel32'].ReadFile
-ReadFile.restype = BOOL
-ReadFile.argtypes = [HANDLE, LPVOID, DWORD, LPDWORD, LPOVERLAPPED]
-
-CloseHandle = _stdcall_libraries['kernel32'].CloseHandle
-CloseHandle.restype = BOOL
-CloseHandle.argtypes = [HANDLE]
-
-ClearCommBreak = _stdcall_libraries['kernel32'].ClearCommBreak
-ClearCommBreak.restype = BOOL
-ClearCommBreak.argtypes = [HANDLE]
-
-LPCOMSTAT = POINTER(_COMSTAT)
-
-ClearCommError = _stdcall_libraries['kernel32'].ClearCommError
-ClearCommError.restype = BOOL
-ClearCommError.argtypes = [HANDLE, LPDWORD, LPCOMSTAT]
-
-SetupComm = _stdcall_libraries['kernel32'].SetupComm
-SetupComm.restype = BOOL
-SetupComm.argtypes = [HANDLE, DWORD, DWORD]
-
-EscapeCommFunction = _stdcall_libraries['kernel32'].EscapeCommFunction
-EscapeCommFunction.restype = BOOL
-EscapeCommFunction.argtypes = [HANDLE, DWORD]
-
-GetCommModemStatus = _stdcall_libraries['kernel32'].GetCommModemStatus
-GetCommModemStatus.restype = BOOL
-GetCommModemStatus.argtypes = [HANDLE, LPDWORD]
-
-LPDCB = POINTER(_DCB)
-
-GetCommState = _stdcall_libraries['kernel32'].GetCommState
-GetCommState.restype = BOOL
-GetCommState.argtypes = [HANDLE, LPDCB]
-
-LPCOMMTIMEOUTS = POINTER(_COMMTIMEOUTS)
-
-GetCommTimeouts = _stdcall_libraries['kernel32'].GetCommTimeouts
-GetCommTimeouts.restype = BOOL
-GetCommTimeouts.argtypes = [HANDLE, LPCOMMTIMEOUTS]
-
-PurgeComm = _stdcall_libraries['kernel32'].PurgeComm
-PurgeComm.restype = BOOL
-PurgeComm.argtypes = [HANDLE, DWORD]
-
-SetCommBreak = _stdcall_libraries['kernel32'].SetCommBreak
-SetCommBreak.restype = BOOL
-SetCommBreak.argtypes = [HANDLE]
-
-SetCommMask = _stdcall_libraries['kernel32'].SetCommMask
-SetCommMask.restype = BOOL
-SetCommMask.argtypes = [HANDLE, DWORD]
-
-SetCommState = _stdcall_libraries['kernel32'].SetCommState
-SetCommState.restype = BOOL
-SetCommState.argtypes = [HANDLE, LPDCB]
-
-SetCommTimeouts = _stdcall_libraries['kernel32'].SetCommTimeouts
-SetCommTimeouts.restype = BOOL
-SetCommTimeouts.argtypes = [HANDLE, LPCOMMTIMEOUTS]
-
-WaitForSingleObject = _stdcall_libraries['kernel32'].WaitForSingleObject
-WaitForSingleObject.restype = DWORD
-WaitForSingleObject.argtypes = [HANDLE, DWORD]
-
-ONESTOPBIT = 0 # Variable c_int
-TWOSTOPBITS = 2 # Variable c_int
-ONE5STOPBITS = 1
-
-NOPARITY = 0 # Variable c_int
-ODDPARITY = 1 # Variable c_int
-EVENPARITY = 2 # Variable c_int
-MARKPARITY = 3
-SPACEPARITY = 4
-
-RTS_CONTROL_HANDSHAKE = 2 # Variable c_int
-RTS_CONTROL_DISABLE = 0 # Variable c_int
-RTS_CONTROL_ENABLE = 1 # Variable c_int
-SETRTS = 3
-CLRRTS = 4
-
-DTR_CONTROL_HANDSHAKE = 2 # Variable c_int
-DTR_CONTROL_DISABLE = 0 # Variable c_int
-DTR_CONTROL_ENABLE = 1 # Variable c_int
-SETDTR = 5
-CLRDTR = 6
-
-MS_DSR_ON = 32 # Variable c_ulong
-EV_RING = 256 # Variable c_int
-EV_PERR = 512 # Variable c_int
-EV_ERR = 128 # Variable c_int
-SETXOFF = 1 # Variable c_int
-EV_RXCHAR = 1 # Variable c_int
-GENERIC_WRITE = 1073741824 # Variable c_long
-PURGE_TXCLEAR = 4 # Variable c_int
-FILE_FLAG_OVERLAPPED = 1073741824 # Variable c_int
-EV_DSR = 16 # Variable c_int
-MAXDWORD = 4294967295L # Variable c_uint
-EV_RLSD = 32 # Variable c_int
-ERROR_IO_PENDING = 997 # Variable c_long
-MS_CTS_ON = 16 # Variable c_ulong
-EV_EVENT1 = 2048 # Variable c_int
-EV_RX80FULL = 1024 # Variable c_int
-PURGE_RXABORT = 2 # Variable c_int
-FILE_ATTRIBUTE_NORMAL = 128 # Variable c_int
-PURGE_TXABORT = 1 # Variable c_int
-SETXON = 2 # Variable c_int
-OPEN_EXISTING = 3 # Variable c_int
-MS_RING_ON = 64 # Variable c_ulong
-EV_TXEMPTY = 4 # Variable c_int
-EV_RXFLAG = 2 # Variable c_int
-MS_RLSD_ON = 128 # Variable c_ulong
-GENERIC_READ = 2147483648L # Variable c_ulong
-EV_EVENT2 = 4096 # Variable c_int
-EV_CTS = 8 # Variable c_int
-EV_BREAK = 64 # Variable c_int
-PURGE_RXCLEAR = 8 # Variable c_int
-ULONG_PTR = c_ulong
-INFINITE = 0xFFFFFFFFL
-
-class N11_OVERLAPPED4DOLLAR_48E(Union):
- pass
-class N11_OVERLAPPED4DOLLAR_484DOLLAR_49E(Structure):
- pass
-N11_OVERLAPPED4DOLLAR_484DOLLAR_49E._fields_ = [
- ('Offset', DWORD),
- ('OffsetHigh', DWORD),
-]
-
-PVOID = c_void_p
-
-N11_OVERLAPPED4DOLLAR_48E._anonymous_ = ['_0']
-N11_OVERLAPPED4DOLLAR_48E._fields_ = [
- ('_0', N11_OVERLAPPED4DOLLAR_484DOLLAR_49E),
- ('Pointer', PVOID),
-]
-_OVERLAPPED._anonymous_ = ['_0']
-_OVERLAPPED._fields_ = [
- ('Internal', ULONG_PTR),
- ('InternalHigh', ULONG_PTR),
- ('_0', N11_OVERLAPPED4DOLLAR_48E),
- ('hEvent', HANDLE),
-]
-_SECURITY_ATTRIBUTES._fields_ = [
- ('nLength', DWORD),
- ('lpSecurityDescriptor', LPVOID),
- ('bInheritHandle', BOOL),
-]
-_COMSTAT._fields_ = [
- ('fCtsHold', DWORD, 1),
- ('fDsrHold', DWORD, 1),
- ('fRlsdHold', DWORD, 1),
- ('fXoffHold', DWORD, 1),
- ('fXoffSent', DWORD, 1),
- ('fEof', DWORD, 1),
- ('fTxim', DWORD, 1),
- ('fReserved', DWORD, 25),
- ('cbInQue', DWORD),
- ('cbOutQue', DWORD),
-]
-_DCB._fields_ = [
- ('DCBlength', DWORD),
- ('BaudRate', DWORD),
- ('fBinary', DWORD, 1),
- ('fParity', DWORD, 1),
- ('fOutxCtsFlow', DWORD, 1),
- ('fOutxDsrFlow', DWORD, 1),
- ('fDtrControl', DWORD, 2),
- ('fDsrSensitivity', DWORD, 1),
- ('fTXContinueOnXoff', DWORD, 1),
- ('fOutX', DWORD, 1),
- ('fInX', DWORD, 1),
- ('fErrorChar', DWORD, 1),
- ('fNull', DWORD, 1),
- ('fRtsControl', DWORD, 2),
- ('fAbortOnError', DWORD, 1),
- ('fDummy2', DWORD, 17),
- ('wReserved', WORD),
- ('XonLim', WORD),
- ('XoffLim', WORD),
- ('ByteSize', BYTE),
- ('Parity', BYTE),
- ('StopBits', BYTE),
- ('XonChar', c_char),
- ('XoffChar', c_char),
- ('ErrorChar', c_char),
- ('EofChar', c_char),
- ('EvtChar', c_char),
- ('wReserved1', WORD),
-]
-_COMMTIMEOUTS._fields_ = [
- ('ReadIntervalTimeout', DWORD),
- ('ReadTotalTimeoutMultiplier', DWORD),
- ('ReadTotalTimeoutConstant', DWORD),
- ('WriteTotalTimeoutMultiplier', DWORD),
- ('WriteTotalTimeoutConstant', DWORD),
-]
-__all__ = ['GetLastError', 'MS_CTS_ON', 'FILE_ATTRIBUTE_NORMAL',
- 'DTR_CONTROL_ENABLE', '_COMSTAT', 'MS_RLSD_ON',
- 'GetOverlappedResult', 'SETXON', 'PURGE_TXABORT',
- 'PurgeComm', 'N11_OVERLAPPED4DOLLAR_48E', 'EV_RING',
- 'ONESTOPBIT', 'SETXOFF', 'PURGE_RXABORT', 'GetCommState',
- 'RTS_CONTROL_ENABLE', '_DCB', 'CreateEvent',
- '_COMMTIMEOUTS', '_SECURITY_ATTRIBUTES', 'EV_DSR',
- 'EV_PERR', 'EV_RXFLAG', 'OPEN_EXISTING', 'DCB',
- 'FILE_FLAG_OVERLAPPED', 'EV_CTS', 'SetupComm',
- 'LPOVERLAPPED', 'EV_TXEMPTY', 'ClearCommBreak',
- 'LPSECURITY_ATTRIBUTES', 'SetCommBreak', 'SetCommTimeouts',
- 'COMMTIMEOUTS', 'ODDPARITY', 'EV_RLSD',
- 'GetCommModemStatus', 'EV_EVENT2', 'PURGE_TXCLEAR',
- 'EV_BREAK', 'EVENPARITY', 'LPCVOID', 'COMSTAT', 'ReadFile',
- 'PVOID', '_OVERLAPPED', 'WriteFile', 'GetCommTimeouts',
- 'ResetEvent', 'EV_RXCHAR', 'LPCOMSTAT', 'ClearCommError',
- 'ERROR_IO_PENDING', 'EscapeCommFunction', 'GENERIC_READ',
- 'RTS_CONTROL_HANDSHAKE', 'OVERLAPPED',
- 'DTR_CONTROL_HANDSHAKE', 'PURGE_RXCLEAR', 'GENERIC_WRITE',
- 'LPDCB', 'CreateEventW', 'SetCommMask', 'EV_EVENT1',
- 'SetCommState', 'LPVOID', 'CreateFileW', 'LPDWORD',
- 'EV_RX80FULL', 'TWOSTOPBITS', 'LPCOMMTIMEOUTS', 'MAXDWORD',
- 'MS_DSR_ON', 'MS_RING_ON',
- 'N11_OVERLAPPED4DOLLAR_484DOLLAR_49E', 'EV_ERR',
- 'ULONG_PTR', 'CreateFile', 'NOPARITY', 'CloseHandle']