summaryrefslogtreecommitdiff
path: root/pyserial/serial/serialwin32.py
diff options
context:
space:
mode:
Diffstat (limited to 'pyserial/serial/serialwin32.py')
-rw-r--r--pyserial/serial/serialwin32.py383
1 files changed, 0 insertions, 383 deletions
diff --git a/pyserial/serial/serialwin32.py b/pyserial/serial/serialwin32.py
deleted file mode 100644
index af436bb..0000000
--- a/pyserial/serial/serialwin32.py
+++ /dev/null
@@ -1,383 +0,0 @@
-#! python
-# Python Serial Port Extension for Win32, Linux, BSD, Jython
-# serial driver for win32
-# see __init__.py
-#
-# (C) 2001-2009 Chris Liechti <cliechti@gmx.net>
-# this is distributed under a free software license, see license.txt
-#
-# Initial patch to use ctypes by Giovanni Bajo <rasky@develer.com>
-
-import ctypes
-import win32
-
-from serialutil import *
-
-
-def device(portnum):
- """Turn a port number into a device name"""
- return 'COM%d' % (portnum+1) # numbers are transformed to a string
-
-
-class Win32Serial(SerialBase):
- """Serial port implementation for Win32 based on ctypes."""
-
- BAUDRATES = (50, 75, 110, 134, 150, 200, 300, 600, 1200, 1800, 2400, 4800,
- 9600, 19200, 38400, 57600, 115200)
-
- def open(self):
- """Open port with current settings. This may throw a SerialException
- if the port cannot be opened."""
- if self._port is None:
- raise SerialException("Port must be configured before it can be used.")
- self.hComPort = None
- # the "\\.\COMx" format is required for devices other than COM1-COM8
- # not all versions of windows seem to support this properly
- # so that the first few ports are used with the DOS device name
- port = self.portstr
- try:
- if port.upper().startswith('COM') and int(port[3:]) > 8:
- port = '\\\\.\\' + port
- except ValueError:
- # for like COMnotanumber
- pass
- self.hComPort = win32.CreateFile(port,
- win32.GENERIC_READ | win32.GENERIC_WRITE,
- 0, # exclusive access
- None, # no security
- win32.OPEN_EXISTING,
- win32.FILE_ATTRIBUTE_NORMAL | win32.FILE_FLAG_OVERLAPPED,
- 0)
- if self.hComPort == win32.INVALID_HANDLE_VALUE:
- self.hComPort = None # 'cause __del__ is called anyway
- raise SerialException("could not open port %s: %s" % (self.portstr, ctypes.WinError()))
-
- # Setup a 4k buffer
- win32.SetupComm(self.hComPort, 4096, 4096)
-
- # Save original timeout values:
- self._orgTimeouts = win32.COMMTIMEOUTS()
- win32.GetCommTimeouts(self.hComPort, ctypes.byref(self._orgTimeouts))
-
- self._rtsState = win32.RTS_CONTROL_ENABLE
- self._dtrState = win32.DTR_CONTROL_ENABLE
-
- self._reconfigurePort()
-
- # Clear buffers:
- # Remove anything that was there
- win32.PurgeComm(self.hComPort,
- win32.PURGE_TXCLEAR | win32.PURGE_TXABORT |
- win32.PURGE_RXCLEAR | win32.PURGE_RXABORT)
-
- self._overlappedRead = win32.OVERLAPPED()
- self._overlappedRead.hEvent = win32.CreateEvent(None, 1, 0, None)
- self._overlappedWrite = win32.OVERLAPPED()
- #~ self._overlappedWrite.hEvent = win32.CreateEvent(None, 1, 0, None)
- self._overlappedWrite.hEvent = win32.CreateEvent(None, 0, 0, None)
- self._isOpen = True
-
- def _reconfigurePort(self):
- """Set communication parameters on opened port."""
- if not self.hComPort:
- raise SerialException("Can only operate on a valid port handle")
-
- # Set Windows timeout values
- # timeouts is a tuple with the following items:
- # (ReadIntervalTimeout,ReadTotalTimeoutMultiplier,
- # ReadTotalTimeoutConstant,WriteTotalTimeoutMultiplier,
- # WriteTotalTimeoutConstant)
- if self._timeout is None:
- timeouts = (0, 0, 0, 0, 0)
- elif self._timeout == 0:
- timeouts = (win32.MAXDWORD, 0, 0, 0, 0)
- else:
- timeouts = (0, 0, int(self._timeout*1000), 0, 0)
- if self._timeout != 0 and self._interCharTimeout is not None:
- timeouts = (int(self._interCharTimeout * 1000),) + timeouts[1:]
-
- if self._writeTimeout is None:
- pass
- elif self._writeTimeout == 0:
- timeouts = timeouts[:-2] + (0, win32.MAXDWORD)
- else:
- timeouts = timeouts[:-2] + (0, int(self._writeTimeout*1000))
- win32.SetCommTimeouts(self.hComPort, ctypes.byref(win32.COMMTIMEOUTS(*timeouts)))
-
- win32.SetCommMask(self.hComPort, win32.EV_ERR)
-
- # Setup the connection info.
- # Get state and modify it:
- comDCB = win32.DCB()
- win32.GetCommState(self.hComPort, ctypes.byref(comDCB))
- comDCB.BaudRate = self._baudrate
-
- if self._bytesize == FIVEBITS:
- comDCB.ByteSize = 5
- elif self._bytesize == SIXBITS:
- comDCB.ByteSize = 6
- elif self._bytesize == SEVENBITS:
- comDCB.ByteSize = 7
- elif self._bytesize == EIGHTBITS:
- comDCB.ByteSize = 8
- else:
- raise ValueError("Unsupported number of data bits: %r" % self._bytesize)
-
- if self._parity == PARITY_NONE:
- comDCB.Parity = win32.NOPARITY
- comDCB.fParity = 0 # Disable Parity Check
- elif self._parity == PARITY_EVEN:
- comDCB.Parity = win32.EVENPARITY
- comDCB.fParity = 1 # Enable Parity Check
- elif self._parity == PARITY_ODD:
- comDCB.Parity = win32.ODDPARITY
- comDCB.fParity = 1 # Enable Parity Check
- elif self._parity == PARITY_MARK:
- comDCB.Parity = win32.MARKPARITY
- comDCB.fParity = 1 # Enable Parity Check
- elif self._parity == PARITY_SPACE:
- comDCB.Parity = win32.SPACEPARITY
- comDCB.fParity = 1 # Enable Parity Check
- else:
- raise ValueError("Unsupported parity mode: %r" % self._parity)
-
- if self._stopbits == STOPBITS_ONE:
- comDCB.StopBits = win32.ONESTOPBIT
- elif self._stopbits == STOPBITS_ONE_POINT_FIVE:
- comDCB.StopBits = win32.ONE5STOPBITS
- elif self._stopbits == STOPBITS_TWO:
- comDCB.StopBits = win32.TWOSTOPBITS
- else:
- raise ValueError("Unsupported number of stop bits: %r" % self._stopbits)
-
- comDCB.fBinary = 1 # Enable Binary Transmission
- # Char. w/ Parity-Err are replaced with 0xff (if fErrorChar is set to TRUE)
- if self._rtscts:
- comDCB.fRtsControl = win32.RTS_CONTROL_HANDSHAKE
- else:
- comDCB.fRtsControl = self._rtsState
- if self._dsrdtr:
- comDCB.fDtrControl = win32.DTR_CONTROL_HANDSHAKE
- else:
- comDCB.fDtrControl = self._dtrState
- comDCB.fOutxCtsFlow = self._rtscts
- comDCB.fOutxDsrFlow = self._dsrdtr
- comDCB.fOutX = self._xonxoff
- comDCB.fInX = self._xonxoff
- comDCB.fNull = 0
- comDCB.fErrorChar = 0
- comDCB.fAbortOnError = 0
- comDCB.XonChar = XON
- comDCB.XoffChar = XOFF
-
- if not win32.SetCommState(self.hComPort, ctypes.byref(comDCB)):
- raise ValueError("Cannot configure port, some setting was wrong. Original message: %s" % ctypes.WinError())
-
- #~ def __del__(self):
- #~ self.close()
-
- def close(self):
- """Close port"""
- if self._isOpen:
- if self.hComPort:
- # Restore original timeout values:
- win32.SetCommTimeouts(self.hComPort, self._orgTimeouts)
- # Close COM-Port:
- win32.CloseHandle(self.hComPort)
- win32.CloseHandle(self._overlappedRead.hEvent)
- win32.CloseHandle(self._overlappedWrite.hEvent)
- self.hComPort = None
- self._isOpen = False
-
- def makeDeviceName(self, port):
- return device(port)
-
- # - - - - - - - - - - - - - - - - - - - - - - - -
-
- def inWaiting(self):
- """Return the number of characters currently in the input buffer."""
- flags = win32.DWORD()
- comstat = win32.COMSTAT()
- if not win32.ClearCommError(self.hComPort, ctypes.byref(flags), ctypes.byref(comstat)):
- raise SerialException('call to ClearCommError failed')
- return comstat.cbInQue
-
- def read(self, size=1):
- """Read size bytes from the serial port. If a timeout is set it may
- return less characters as requested. With no timeout it will block
- until the requested number of bytes is read."""
- if not self.hComPort: raise portNotOpenError
- if size > 0:
- win32.ResetEvent(self._overlappedRead.hEvent)
- flags = win32.DWORD()
- comstat = win32.COMSTAT()
- if not win32.ClearCommError(self.hComPort, ctypes.byref(flags), ctypes.byref(comstat)):
- raise SerialException('call to ClearCommError failed')
- if self.timeout == 0:
- n = min(comstat.cbInQue, size)
- if n > 0:
- buf = ctypes.create_string_buffer(n)
- rc = win32.DWORD()
- err = win32.ReadFile(self.hComPort, buf, size, ctypes.byref(rc), ctypes.byref(self._overlappedRead))
- if not err and win32.GetLastError() != win32.ERROR_IO_PENDING:
- raise SerialException("ReadFile failed (%s)" % ctypes.WinError())
- err = win32.WaitForSingleObject(self._overlappedRead.hEvent, win32.INFINITE)
- read = buf.raw[:rc.value]
- else:
- read = bytes()
- else:
- buf = ctypes.create_string_buffer(size)
- rc = win32.DWORD()
- err = win32.ReadFile(self.hComPort, buf, size, ctypes.byref(rc), ctypes.byref(self._overlappedRead))
- if not err and win32.GetLastError() != win32.ERROR_IO_PENDING:
- raise SerialException("ReadFile failed (%s)" % ctypes.WinError())
- err = win32.GetOverlappedResult(self.hComPort, ctypes.byref(self._overlappedRead), ctypes.byref(rc), True)
- read = buf.raw[:rc.value]
- else:
- read = bytes()
- return bytes(read)
-
- def write(self, data):
- """Output the given string over the serial port."""
- if not self.hComPort: raise portNotOpenError
- #~ if not isinstance(data, (bytes, bytearray)):
- #~ raise TypeError('expected %s or bytearray, got %s' % (bytes, type(data)))
- # convert data (needed in case of memoryview instance: Py 3.1 io lib), ctypes doesn't like memoryview
- data = bytes(data)
- if data:
- #~ win32event.ResetEvent(self._overlappedWrite.hEvent)
- n = win32.DWORD()
- err = win32.WriteFile(self.hComPort, data, len(data), ctypes.byref(n), self._overlappedWrite)
- if not err and win32.GetLastError() != win32.ERROR_IO_PENDING:
- raise SerialException("WriteFile failed (%s)" % ctypes.WinError())
- # Wait for the write to complete.
- #~ win32.WaitForSingleObject(self._overlappedWrite.hEvent, win32.INFINITE)
- err = win32.GetOverlappedResult(self.hComPort, self._overlappedWrite, ctypes.byref(n), True)
- if n.value != len(data):
- raise writeTimeoutError
- return n.value
- else:
- return 0
-
-
- def flushInput(self):
- """Clear input buffer, discarding all that is in the buffer."""
- if not self.hComPort: raise portNotOpenError
- win32.PurgeComm(self.hComPort, win32.PURGE_RXCLEAR | win32.PURGE_RXABORT)
-
- def flushOutput(self):
- """Clear output buffer, aborting the current output and
- discarding all that is in the buffer."""
- if not self.hComPort: raise portNotOpenError
- win32.PurgeComm(self.hComPort, win32.PURGE_TXCLEAR | win32.PURGE_TXABORT)
-
- def sendBreak(self, duration=0.25):
- """Send break condition. Timed, returns to idle state after given duration."""
- if not self.hComPort: raise portNotOpenError
- import time
- win32.SetCommBreak(self.hComPort)
- time.sleep(duration)
- win32.ClearCommBreak(self.hComPort)
-
- def setBreak(self, level=1):
- """Set break: Controls TXD. When active, to transmitting is possible."""
- if not self.hComPort: raise portNotOpenError
- if level:
- win32.SetCommBreak(self.hComPort)
- else:
- win32.ClearCommBreak(self.hComPort)
-
- def setRTS(self, level=1):
- """Set terminal status line: Request To Send"""
- if not self.hComPort: raise portNotOpenError
- if level:
- self._rtsState = win32.RTS_CONTROL_ENABLE
- win32.EscapeCommFunction(self.hComPort, win32.SETRTS)
- else:
- self._rtsState = win32.RTS_CONTROL_DISABLE
- win32.EscapeCommFunction(self.hComPort, win32.CLRRTS)
-
- def setDTR(self, level=1):
- """Set terminal status line: Data Terminal Ready"""
- if not self.hComPort: raise portNotOpenError
- if level:
- self._dtrState = win32.DTR_CONTROL_ENABLE
- win32.EscapeCommFunction(self.hComPort, win32.SETDTR)
- else:
- self._dtrState = win32.DTR_CONTROL_DISABLE
- win32.EscapeCommFunction(self.hComPort, win32.CLRDTR)
-
- def _GetCommModemStatus(self):
- stat = win32.DWORD()
- win32.GetCommModemStatus(self.hComPort, ctypes.byref(stat))
- return stat.value
-
- def getCTS(self):
- """Read terminal status line: Clear To Send"""
- if not self.hComPort: raise portNotOpenError
- return win32.MS_CTS_ON & self._GetCommModemStatus() != 0
-
- def getDSR(self):
- """Read terminal status line: Data Set Ready"""
- if not self.hComPort: raise portNotOpenError
- return win32.MS_DSR_ON & self._GetCommModemStatus() != 0
-
- def getRI(self):
- """Read terminal status line: Ring Indicator"""
- if not self.hComPort: raise portNotOpenError
- return win32.MS_RING_ON & self._GetCommModemStatus() != 0
-
- def getCD(self):
- """Read terminal status line: Carrier Detect"""
- if not self.hComPort: raise portNotOpenError
- return win32.MS_RLSD_ON & self._GetCommModemStatus() != 0
-
- # - - platform specific - - - -
-
- def setXON(self, level=True):
- """Platform specific - set flow state."""
- if not self.hComPort: raise portNotOpenError
- if level:
- win32.EscapeCommFunction(self.hComPort, win32.SETXON)
- else:
- win32.EscapeCommFunction(self.hComPort, win32.SETXOFF)
-
- def outWaiting(self):
- """return how many characters the in the outgoing buffer"""
- flags = win32.DWORD()
- comstat = win32.COMSTAT()
- if not win32.ClearCommError(self.hComPort, ctypes.byref(flags), ctypes.byref(comstat)):
- raise SerialException('call to ClearCommError failed')
- return comstat.cbOutQue
-
-
-# assemble Serial class with the platform specific implementation and the base
-# for file-like behavior. for Python 2.6 and newer, that provide the new I/O
-# library, derive from io.RawIOBase
-try:
- import io
-except ImportError:
- # classic version with our own file-like emulation
- class Serial(Win32Serial, FileLike):
- pass
-else:
- # io library present
- class Serial(Win32Serial, io.RawIOBase):
- pass
-
-
-# Nur Testfunktion!!
-if __name__ == '__main__':
- s = Serial(0)
- sys.stdout.write("%s\n" % s)
-
- s = Serial()
- sys.stdout.write("%s\n" % s)
-
- s.baudrate = 19200
- s.databits = 7
- s.close()
- s.port = 0
- s.open()
- sys.stdout.write("%s\n" % s)
-