summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Liechti <cliechti@gmx.net>2015-09-18 21:23:42 +0200
committerChris Liechti <cliechti@gmx.net>2015-09-18 21:23:42 +0200
commit6ed12e01f0bfaa53ea96ee5b9ec06a3414ed56e2 (patch)
treeee9349510eb44b430175f12d1137c2750a51765f
parent6d0b952003d012de4d3c5e5166b1b4c0940e29f4 (diff)
downloadpyserial-git-6ed12e01f0bfaa53ea96ee5b9ec06a3414ed56e2.tar.gz
cli: update implementation
- update test to use byte strings (IronPython 2.7 makes str == unicode so the previous data conversion function can't do its job)
-rw-r--r--serial/serialcli.py118
-rw-r--r--test/test.py27
2 files changed, 70 insertions, 75 deletions
diff --git a/serial/serialcli.py b/serial/serialcli.py
index 622f737..02605bc 100644
--- a/serial/serialcli.py
+++ b/serial/serialcli.py
@@ -13,9 +13,9 @@ import System.IO.Ports
from serial.serialutil import *
-def device(portnum):
- """Turn a port number into a device name"""
- return System.IO.Ports.SerialPort.GetPortNames()[portnum]
+#~ def device(portnum):
+ #~ """Turn a port number into a device name"""
+ #~ return System.IO.Ports.SerialPort.GetPortNames()[portnum]
# must invoke function with byte array, make a helper to convert strings
@@ -37,22 +37,22 @@ class Serial(SerialBase):
"""
if self._port is None:
raise SerialException("Port must be configured before it can be used.")
- if self._isOpen:
+ if self.is_open:
raise SerialException("Port is already open.")
try:
self._port_handle = System.IO.Ports.SerialPort(self.portstr)
- except Exception, msg:
+ except Exception as msg:
self._port_handle = None
raise SerialException("could not open port %s: %s" % (self.portstr, msg))
self._reconfigurePort()
self._port_handle.Open()
- self._isOpen = True
+ self.is_open = True
+ if not self._dsrdtr:
+ self._update_dtr_state()
if not self._rtscts:
- self.setRTS(True)
- self.setDTR(True)
- self.flushInput()
- self.flushOutput()
+ self._update_rts_state()
+ self.reset_input_buffer()
def _reconfigurePort(self):
"""Set communication parameters on opened port."""
@@ -69,16 +69,16 @@ class Serial(SerialBase):
# if self._timeout != 0 and self._interCharTimeout is not None:
# timeouts = (int(self._interCharTimeout * 1000),) + timeouts[1:]
- if self._writeTimeout is None:
+ if self._write_timeout is None:
self._port_handle.WriteTimeout = System.IO.Ports.SerialPort.InfiniteTimeout
else:
- self._port_handle.WriteTimeout = int(self._writeTimeout*1000)
+ self._port_handle.WriteTimeout = int(self._write_timeout*1000)
# Setup the connection info.
try:
self._port_handle.BaudRate = self._baudrate
- except IOError, e:
+ except IOError as e:
# catch errors from illegal baudrate settings
raise ValueError(str(e))
@@ -129,7 +129,7 @@ class Serial(SerialBase):
def close(self):
"""Close port"""
- if self._isOpen:
+ if self.is_open:
if self._port_handle:
try:
self._port_handle.Close()
@@ -137,19 +137,15 @@ class Serial(SerialBase):
# ignore errors. can happen for unplugged USB serial devices
pass
self._port_handle = None
- self._isOpen = False
-
- def makeDeviceName(self, port):
- try:
- return device(port)
- except TypeError, e:
- raise SerialException(str(e))
+ self.is_open = False
# - - - - - - - - - - - - - - - - - - - - - - - -
- def inWaiting(self):
+ @property
+ def in_waiting(self):
"""Return the number of characters currently in the input buffer."""
- if not self._port_handle: raise portNotOpenError
+ if not self._port_handle:
+ raise portNotOpenError
return self._port_handle.BytesToRead
def read(self, size=1):
@@ -158,14 +154,15 @@ class Serial(SerialBase):
return less characters as requested. With no timeout it will block
until the requested number of bytes is read.
"""
- if not self._port_handle: raise portNotOpenError
+ if not self._port_handle:
+ raise portNotOpenError
# must use single byte reads as this is the only way to read
# without applying encodings
data = bytearray()
while size:
try:
data.append(self._port_handle.ReadByte())
- except System.TimeoutException, e:
+ except System.TimeoutException as e:
break
else:
size -= 1
@@ -173,77 +170,80 @@ class Serial(SerialBase):
def write(self, data):
"""Output the given string over the serial port."""
- if not self._port_handle: raise portNotOpenError
+ if not self._port_handle:
+ raise portNotOpenError
#~ if not isinstance(data, (bytes, bytearray)):
#~ raise TypeError('expected %s or bytearray, got %s' % (bytes, type(data)))
try:
# must call overloaded method with byte array argument
# as this is the only one not applying encodings
self._port_handle.Write(as_byte_array(data), 0, len(data))
- except System.TimeoutException, e:
+ except System.TimeoutException as e:
raise writeTimeoutError
return len(data)
- def flushInput(self):
+ def reset_input_buffer(self):
"""Clear input buffer, discarding all that is in the buffer."""
- if not self._port_handle: raise portNotOpenError
+ if not self._port_handle:
+ raise portNotOpenError
self._port_handle.DiscardInBuffer()
- def flushOutput(self):
+ def reset_output_buffer(self):
"""\
Clear output buffer, aborting the current output and
discarding all that is in the buffer.
"""
- if not self._port_handle: raise portNotOpenError
+ if not self._port_handle:
+ raise portNotOpenError
self._port_handle.DiscardOutBuffer()
- def sendBreak(self, duration=0.25):
- """\
- Send break condition. Timed, returns to idle state after given
- duration.
- """
- if not self._port_handle: raise portNotOpenError
- import time
- self._port_handle.BreakState = True
- time.sleep(duration)
- self._port_handle.BreakState = False
-
- def setBreak(self, level=True):
+ def _update_break_state(self):
"""
Set break: Controls TXD. When active, to transmitting is possible.
"""
- if not self._port_handle: raise portNotOpenError
- self._port_handle.BreakState = bool(level)
+ if not self._port_handle:
+ raise portNotOpenError
+ self._port_handle.BreakState = bool(self._break_state)
- def setRTS(self, level=True):
+ def _update_rts_state(self):
"""Set terminal status line: Request To Send"""
- if not self._port_handle: raise portNotOpenError
- self._port_handle.RtsEnable = bool(level)
+ if not self._port_handle:
+ raise portNotOpenError
+ self._port_handle.RtsEnable = bool(self._rts_state)
- def setDTR(self, level=True):
+ def _update_dtr_state(self):
"""Set terminal status line: Data Terminal Ready"""
- if not self._port_handle: raise portNotOpenError
- self._port_handle.DtrEnable = bool(level)
+ if not self._port_handle:
+ raise portNotOpenError
+ self._port_handle.DtrEnable = bool(self._dtr_state)
- def getCTS(self):
+ @property
+ def cts(self):
"""Read terminal status line: Clear To Send"""
- if not self._port_handle: raise portNotOpenError
+ if not self._port_handle:
+ raise portNotOpenError
return self._port_handle.CtsHolding
- def getDSR(self):
+ @property
+ def dsr(self):
"""Read terminal status line: Data Set Ready"""
- if not self._port_handle: raise portNotOpenError
+ if not self._port_handle:
+ raise portNotOpenError
return self._port_handle.DsrHolding
- def getRI(self):
+ @property
+ def ri(self):
"""Read terminal status line: Ring Indicator"""
- if not self._port_handle: raise portNotOpenError
+ if not self._port_handle:
+ raise portNotOpenError
#~ return self._port_handle.XXX
return False #XXX an error would be better
- def getCD(self):
+ @property
+ def cd(self):
"""Read terminal status line: Carrier Detect"""
- if not self._port_handle: raise portNotOpenError
+ if not self._port_handle:
+ raise portNotOpenError
return self._port_handle.CDHolding
# - - platform specific - - - -
diff --git a/test/test.py b/test/test.py
index 440a7e4..ea28650 100644
--- a/test/test.py
+++ b/test/test.py
@@ -29,15 +29,10 @@ import sys
import serial
# on which port should the tests be performed:
-PORT = 0
+PORT = 'loop://'
-if sys.version_info >= (3, 0):
- def data(string):
- return bytes(string, 'latin1')
- bytes_0to255 = bytes(range(256))
-else:
- def data(string): return string
- bytes_0to255 = ''.join([chr(x) for x in range(256)])
+# indirection via bytearray b/c bytes(range(256)) does something else in Pyhton 2.7
+bytes_0to255 = bytes(bytearray(range(256)))
def segments(data, size=16):
@@ -62,7 +57,7 @@ class Test4_Nonblocking(unittest.TestCase):
def test1_ReadEmpty(self):
"""timeout: After port open, the input buffer must be empty"""
- self.failUnlessEqual(self.s.read(1), data(''), "expected empty buffer")
+ self.failUnlessEqual(self.s.read(1), b'', "expected empty buffer")
def test2_Loopback(self):
"""timeout: each sent character should return (binary test).
@@ -74,16 +69,16 @@ class Test4_Nonblocking(unittest.TestCase):
time.sleep(0.05)
self.failUnlessEqual(self.s.in_waiting, length, "expected exactly %d character for inWainting()" % length)
self.failUnlessEqual(self.s.read(length), block)#, "expected a %r which was written before" % block)
- self.failUnlessEqual(self.s.read(1), data(''), "expected empty buffer after all sent chars are read")
+ self.failUnlessEqual(self.s.read(1), b'', "expected empty buffer after all sent chars are read")
def test2_LoopbackTimeout(self):
"""timeout: test the timeout/immediate return.
partial results should be returned."""
- self.s.write(data("HELLO"))
+ self.s.write(b"HELLO")
time.sleep(0.1) # there might be a small delay until the character is ready (especially on win32 and rfc2217)
# read more characters as are available to run in the timeout
- self.failUnlessEqual(self.s.read(10), data('HELLO'), "expected the 'HELLO' which was written before")
- self.failUnlessEqual(self.s.read(1), data(''), "expected empty buffer after all sent chars are read")
+ self.failUnlessEqual(self.s.read(10), b'HELLO', "expected the 'HELLO' which was written before")
+ self.failUnlessEqual(self.s.read(1), b'', "expected empty buffer after all sent chars are read")
class Test3_Timeout(Test4_Nonblocking):
@@ -109,7 +104,7 @@ class SendEvent(threading.Thread):
time.sleep(self.delay)
self.x.set()
if not self.stopped:
- self.serial.write(data("E"))
+ self.serial.write(b"E")
self.serial.flush()
def isSet(self):
@@ -135,7 +130,7 @@ class Test1_Forever(unittest.TestCase):
"""no timeout: after port open, the input buffer must be empty (read).
a character is sent after some time to terminate the test (SendEvent)."""
c = self.s.read(1)
- if not (self.event.isSet() and c == data('E')):
+ if not (self.event.isSet() and c == b'E'):
self.fail("expected marker (evt=%r, c=%r)" % (self.event.isSet(), c))
@@ -221,7 +216,7 @@ class Test_MoreTimeouts(unittest.TestCase):
self.s.write(serial.XOFF)
time.sleep(0.5) # some systems need a little delay so that they can react on XOFF
t1 = time.time()
- self.failUnlessRaises(serial.SerialTimeoutException, self.s.write, data("timeout please"*200))
+ self.failUnlessRaises(serial.SerialTimeoutException, self.s.write, b"timeout please"*200)
t2 = time.time()
self.failUnless( 0.9 <= (t2-t1) < 2.1, "Timeout not in the given interval (%s)" % (t2-t1))