summaryrefslogtreecommitdiff
path: root/serial/serialwin32.py
diff options
context:
space:
mode:
Diffstat (limited to 'serial/serialwin32.py')
-rw-r--r--serial/serialwin32.py386
1 files changed, 386 insertions, 0 deletions
diff --git a/serial/serialwin32.py b/serial/serialwin32.py
new file mode 100644
index 0000000..38f71b1
--- /dev/null
+++ b/serial/serialwin32.py
@@ -0,0 +1,386 @@
+#! python
+# Python Serial Port Extension for Win32, Linux, BSD, Jython
+# serial driver for win32
+# see __init__.py
+#
+# (C) 2001-2009 Chris Liechti <cliechti@gmx.net>
+# this is distributed under a free software license, see license.txt
+#
+# Initial patch to use ctypes by Giovanni Bajo <rasky@develer.com>
+
+import ctypes
+import win32
+
+from serialutil import *
+
+
+def device(portnum):
+ """Turn a port number into a device name"""
+ return 'COM%d' % (portnum+1) # numbers are transformed to a string
+
+
+class Win32Serial(SerialBase):
+ """Serial port implementation for Win32 based on ctypes."""
+
+ BAUDRATES = (50, 75, 110, 134, 150, 200, 300, 600, 1200, 1800, 2400, 4800,
+ 9600, 19200, 38400, 57600, 115200)
+
+ def __init__(self, *args, **kwargs):
+ self.hComPort = None
+ SerialBase.__init__(self, *args, **kwargs)
+
+ def open(self):
+ """Open port with current settings. This may throw a SerialException
+ if the port cannot be opened."""
+ if self._port is None:
+ raise SerialException("Port must be configured before it can be used.")
+ # the "\\.\COMx" format is required for devices other than COM1-COM8
+ # not all versions of windows seem to support this properly
+ # so that the first few ports are used with the DOS device name
+ port = self.portstr
+ try:
+ if port.upper().startswith('COM') and int(port[3:]) > 8:
+ port = '\\\\.\\' + port
+ except ValueError:
+ # for like COMnotanumber
+ pass
+ self.hComPort = win32.CreateFile(port,
+ win32.GENERIC_READ | win32.GENERIC_WRITE,
+ 0, # exclusive access
+ None, # no security
+ win32.OPEN_EXISTING,
+ win32.FILE_ATTRIBUTE_NORMAL | win32.FILE_FLAG_OVERLAPPED,
+ 0)
+ if self.hComPort == win32.INVALID_HANDLE_VALUE:
+ self.hComPort = None # 'cause __del__ is called anyway
+ raise SerialException("could not open port %s: %s" % (self.portstr, ctypes.WinError()))
+
+ # Setup a 4k buffer
+ win32.SetupComm(self.hComPort, 4096, 4096)
+
+ # Save original timeout values:
+ self._orgTimeouts = win32.COMMTIMEOUTS()
+ win32.GetCommTimeouts(self.hComPort, ctypes.byref(self._orgTimeouts))
+
+ self._rtsState = win32.RTS_CONTROL_ENABLE
+ self._dtrState = win32.DTR_CONTROL_ENABLE
+
+ self._reconfigurePort()
+
+ # Clear buffers:
+ # Remove anything that was there
+ win32.PurgeComm(self.hComPort,
+ win32.PURGE_TXCLEAR | win32.PURGE_TXABORT |
+ win32.PURGE_RXCLEAR | win32.PURGE_RXABORT)
+
+ self._overlappedRead = win32.OVERLAPPED()
+ self._overlappedRead.hEvent = win32.CreateEvent(None, 1, 0, None)
+ self._overlappedWrite = win32.OVERLAPPED()
+ #~ self._overlappedWrite.hEvent = win32.CreateEvent(None, 1, 0, None)
+ self._overlappedWrite.hEvent = win32.CreateEvent(None, 0, 0, None)
+ self._isOpen = True
+
+ def _reconfigurePort(self):
+ """Set communication parameters on opened port."""
+ if not self.hComPort:
+ raise SerialException("Can only operate on a valid port handle")
+
+ # Set Windows timeout values
+ # timeouts is a tuple with the following items:
+ # (ReadIntervalTimeout,ReadTotalTimeoutMultiplier,
+ # ReadTotalTimeoutConstant,WriteTotalTimeoutMultiplier,
+ # WriteTotalTimeoutConstant)
+ if self._timeout is None:
+ timeouts = (0, 0, 0, 0, 0)
+ elif self._timeout == 0:
+ timeouts = (win32.MAXDWORD, 0, 0, 0, 0)
+ else:
+ timeouts = (0, 0, int(self._timeout*1000), 0, 0)
+ if self._timeout != 0 and self._interCharTimeout is not None:
+ timeouts = (int(self._interCharTimeout * 1000),) + timeouts[1:]
+
+ if self._writeTimeout is None:
+ pass
+ elif self._writeTimeout == 0:
+ timeouts = timeouts[:-2] + (0, win32.MAXDWORD)
+ else:
+ timeouts = timeouts[:-2] + (0, int(self._writeTimeout*1000))
+ win32.SetCommTimeouts(self.hComPort, ctypes.byref(win32.COMMTIMEOUTS(*timeouts)))
+
+ win32.SetCommMask(self.hComPort, win32.EV_ERR)
+
+ # Setup the connection info.
+ # Get state and modify it:
+ comDCB = win32.DCB()
+ win32.GetCommState(self.hComPort, ctypes.byref(comDCB))
+ comDCB.BaudRate = self._baudrate
+
+ if self._bytesize == FIVEBITS:
+ comDCB.ByteSize = 5
+ elif self._bytesize == SIXBITS:
+ comDCB.ByteSize = 6
+ elif self._bytesize == SEVENBITS:
+ comDCB.ByteSize = 7
+ elif self._bytesize == EIGHTBITS:
+ comDCB.ByteSize = 8
+ else:
+ raise ValueError("Unsupported number of data bits: %r" % self._bytesize)
+
+ if self._parity == PARITY_NONE:
+ comDCB.Parity = win32.NOPARITY
+ comDCB.fParity = 0 # Disable Parity Check
+ elif self._parity == PARITY_EVEN:
+ comDCB.Parity = win32.EVENPARITY
+ comDCB.fParity = 1 # Enable Parity Check
+ elif self._parity == PARITY_ODD:
+ comDCB.Parity = win32.ODDPARITY
+ comDCB.fParity = 1 # Enable Parity Check
+ elif self._parity == PARITY_MARK:
+ comDCB.Parity = win32.MARKPARITY
+ comDCB.fParity = 1 # Enable Parity Check
+ elif self._parity == PARITY_SPACE:
+ comDCB.Parity = win32.SPACEPARITY
+ comDCB.fParity = 1 # Enable Parity Check
+ else:
+ raise ValueError("Unsupported parity mode: %r" % self._parity)
+
+ if self._stopbits == STOPBITS_ONE:
+ comDCB.StopBits = win32.ONESTOPBIT
+ elif self._stopbits == STOPBITS_ONE_POINT_FIVE:
+ comDCB.StopBits = win32.ONE5STOPBITS
+ elif self._stopbits == STOPBITS_TWO:
+ comDCB.StopBits = win32.TWOSTOPBITS
+ else:
+ raise ValueError("Unsupported number of stop bits: %r" % self._stopbits)
+
+ comDCB.fBinary = 1 # Enable Binary Transmission
+ # Char. w/ Parity-Err are replaced with 0xff (if fErrorChar is set to TRUE)
+ if self._rtscts:
+ comDCB.fRtsControl = win32.RTS_CONTROL_HANDSHAKE
+ else:
+ comDCB.fRtsControl = self._rtsState
+ if self._dsrdtr:
+ comDCB.fDtrControl = win32.DTR_CONTROL_HANDSHAKE
+ else:
+ comDCB.fDtrControl = self._dtrState
+ comDCB.fOutxCtsFlow = self._rtscts
+ comDCB.fOutxDsrFlow = self._dsrdtr
+ comDCB.fOutX = self._xonxoff
+ comDCB.fInX = self._xonxoff
+ comDCB.fNull = 0
+ comDCB.fErrorChar = 0
+ comDCB.fAbortOnError = 0
+ comDCB.XonChar = XON
+ comDCB.XoffChar = XOFF
+
+ if not win32.SetCommState(self.hComPort, ctypes.byref(comDCB)):
+ raise ValueError("Cannot configure port, some setting was wrong. Original message: %s" % ctypes.WinError())
+
+ #~ def __del__(self):
+ #~ self.close()
+
+ def close(self):
+ """Close port"""
+ if self._isOpen:
+ if self.hComPort:
+ # Restore original timeout values:
+ win32.SetCommTimeouts(self.hComPort, self._orgTimeouts)
+ # Close COM-Port:
+ win32.CloseHandle(self.hComPort)
+ win32.CloseHandle(self._overlappedRead.hEvent)
+ win32.CloseHandle(self._overlappedWrite.hEvent)
+ self.hComPort = None
+ self._isOpen = False
+
+ def makeDeviceName(self, port):
+ return device(port)
+
+ # - - - - - - - - - - - - - - - - - - - - - - - -
+
+ def inWaiting(self):
+ """Return the number of characters currently in the input buffer."""
+ flags = win32.DWORD()
+ comstat = win32.COMSTAT()
+ if not win32.ClearCommError(self.hComPort, ctypes.byref(flags), ctypes.byref(comstat)):
+ raise SerialException('call to ClearCommError failed')
+ return comstat.cbInQue
+
+ def read(self, size=1):
+ """Read size bytes from the serial port. If a timeout is set it may
+ return less characters as requested. With no timeout it will block
+ until the requested number of bytes is read."""
+ if not self.hComPort: raise portNotOpenError
+ if size > 0:
+ win32.ResetEvent(self._overlappedRead.hEvent)
+ flags = win32.DWORD()
+ comstat = win32.COMSTAT()
+ if not win32.ClearCommError(self.hComPort, ctypes.byref(flags), ctypes.byref(comstat)):
+ raise SerialException('call to ClearCommError failed')
+ if self.timeout == 0:
+ n = min(comstat.cbInQue, size)
+ if n > 0:
+ buf = ctypes.create_string_buffer(n)
+ rc = win32.DWORD()
+ err = win32.ReadFile(self.hComPort, buf, n, ctypes.byref(rc), ctypes.byref(self._overlappedRead))
+ if not err and win32.GetLastError() != win32.ERROR_IO_PENDING:
+ raise SerialException("ReadFile failed (%s)" % ctypes.WinError())
+ err = win32.WaitForSingleObject(self._overlappedRead.hEvent, win32.INFINITE)
+ read = buf.raw[:rc.value]
+ else:
+ read = bytes()
+ else:
+ buf = ctypes.create_string_buffer(size)
+ rc = win32.DWORD()
+ err = win32.ReadFile(self.hComPort, buf, size, ctypes.byref(rc), ctypes.byref(self._overlappedRead))
+ if not err and win32.GetLastError() != win32.ERROR_IO_PENDING:
+ raise SerialException("ReadFile failed (%s)" % ctypes.WinError())
+ err = win32.GetOverlappedResult(self.hComPort, ctypes.byref(self._overlappedRead), ctypes.byref(rc), True)
+ read = buf.raw[:rc.value]
+ else:
+ read = bytes()
+ return bytes(read)
+
+ def write(self, data):
+ """Output the given string over the serial port."""
+ if not self.hComPort: raise portNotOpenError
+ #~ if not isinstance(data, (bytes, bytearray)):
+ #~ raise TypeError('expected %s or bytearray, got %s' % (bytes, type(data)))
+ # convert data (needed in case of memoryview instance: Py 3.1 io lib), ctypes doesn't like memoryview
+ data = bytes(data)
+ if data:
+ #~ win32event.ResetEvent(self._overlappedWrite.hEvent)
+ n = win32.DWORD()
+ err = win32.WriteFile(self.hComPort, data, len(data), ctypes.byref(n), self._overlappedWrite)
+ if not err and win32.GetLastError() != win32.ERROR_IO_PENDING:
+ raise SerialException("WriteFile failed (%s)" % ctypes.WinError())
+ # Wait for the write to complete.
+ #~ win32.WaitForSingleObject(self._overlappedWrite.hEvent, win32.INFINITE)
+ err = win32.GetOverlappedResult(self.hComPort, self._overlappedWrite, ctypes.byref(n), True)
+ if n.value != len(data):
+ raise writeTimeoutError
+ return n.value
+ else:
+ return 0
+
+
+ def flushInput(self):
+ """Clear input buffer, discarding all that is in the buffer."""
+ if not self.hComPort: raise portNotOpenError
+ win32.PurgeComm(self.hComPort, win32.PURGE_RXCLEAR | win32.PURGE_RXABORT)
+
+ def flushOutput(self):
+ """Clear output buffer, aborting the current output and
+ discarding all that is in the buffer."""
+ if not self.hComPort: raise portNotOpenError
+ win32.PurgeComm(self.hComPort, win32.PURGE_TXCLEAR | win32.PURGE_TXABORT)
+
+ def sendBreak(self, duration=0.25):
+ """Send break condition. Timed, returns to idle state after given duration."""
+ if not self.hComPort: raise portNotOpenError
+ import time
+ win32.SetCommBreak(self.hComPort)
+ time.sleep(duration)
+ win32.ClearCommBreak(self.hComPort)
+
+ def setBreak(self, level=1):
+ """Set break: Controls TXD. When active, to transmitting is possible."""
+ if not self.hComPort: raise portNotOpenError
+ if level:
+ win32.SetCommBreak(self.hComPort)
+ else:
+ win32.ClearCommBreak(self.hComPort)
+
+ def setRTS(self, level=1):
+ """Set terminal status line: Request To Send"""
+ if not self.hComPort: raise portNotOpenError
+ if level:
+ self._rtsState = win32.RTS_CONTROL_ENABLE
+ win32.EscapeCommFunction(self.hComPort, win32.SETRTS)
+ else:
+ self._rtsState = win32.RTS_CONTROL_DISABLE
+ win32.EscapeCommFunction(self.hComPort, win32.CLRRTS)
+
+ def setDTR(self, level=1):
+ """Set terminal status line: Data Terminal Ready"""
+ if not self.hComPort: raise portNotOpenError
+ if level:
+ self._dtrState = win32.DTR_CONTROL_ENABLE
+ win32.EscapeCommFunction(self.hComPort, win32.SETDTR)
+ else:
+ self._dtrState = win32.DTR_CONTROL_DISABLE
+ win32.EscapeCommFunction(self.hComPort, win32.CLRDTR)
+
+ def _GetCommModemStatus(self):
+ stat = win32.DWORD()
+ win32.GetCommModemStatus(self.hComPort, ctypes.byref(stat))
+ return stat.value
+
+ def getCTS(self):
+ """Read terminal status line: Clear To Send"""
+ if not self.hComPort: raise portNotOpenError
+ return win32.MS_CTS_ON & self._GetCommModemStatus() != 0
+
+ def getDSR(self):
+ """Read terminal status line: Data Set Ready"""
+ if not self.hComPort: raise portNotOpenError
+ return win32.MS_DSR_ON & self._GetCommModemStatus() != 0
+
+ def getRI(self):
+ """Read terminal status line: Ring Indicator"""
+ if not self.hComPort: raise portNotOpenError
+ return win32.MS_RING_ON & self._GetCommModemStatus() != 0
+
+ def getCD(self):
+ """Read terminal status line: Carrier Detect"""
+ if not self.hComPort: raise portNotOpenError
+ return win32.MS_RLSD_ON & self._GetCommModemStatus() != 0
+
+ # - - platform specific - - - -
+
+ def setXON(self, level=True):
+ """Platform specific - set flow state."""
+ if not self.hComPort: raise portNotOpenError
+ if level:
+ win32.EscapeCommFunction(self.hComPort, win32.SETXON)
+ else:
+ win32.EscapeCommFunction(self.hComPort, win32.SETXOFF)
+
+ def outWaiting(self):
+ """return how many characters the in the outgoing buffer"""
+ flags = win32.DWORD()
+ comstat = win32.COMSTAT()
+ if not win32.ClearCommError(self.hComPort, ctypes.byref(flags), ctypes.byref(comstat)):
+ raise SerialException('call to ClearCommError failed')
+ return comstat.cbOutQue
+
+
+# assemble Serial class with the platform specific implementation and the base
+# for file-like behavior. for Python 2.6 and newer, that provide the new I/O
+# library, derive from io.RawIOBase
+try:
+ import io
+except ImportError:
+ # classic version with our own file-like emulation
+ class Serial(Win32Serial, FileLike):
+ pass
+else:
+ # io library present
+ class Serial(Win32Serial, io.RawIOBase):
+ pass
+
+
+# Nur Testfunktion!!
+if __name__ == '__main__':
+ s = Serial(0)
+ sys.stdout.write("%s\n" % s)
+
+ s = Serial()
+ sys.stdout.write("%s\n" % s)
+
+ s.baudrate = 19200
+ s.databits = 7
+ s.close()
+ s.port = 0
+ s.open()
+ sys.stdout.write("%s\n" % s)
+