diff options
author | Chris Liechti <cliechti@gmx.net> | 2015-09-18 21:23:42 +0200 |
---|---|---|
committer | Chris Liechti <cliechti@gmx.net> | 2015-09-18 21:23:42 +0200 |
commit | 6ed12e01f0bfaa53ea96ee5b9ec06a3414ed56e2 (patch) | |
tree | ee9349510eb44b430175f12d1137c2750a51765f | |
parent | 6d0b952003d012de4d3c5e5166b1b4c0940e29f4 (diff) | |
download | pyserial-git-6ed12e01f0bfaa53ea96ee5b9ec06a3414ed56e2.tar.gz |
cli: update implementation
- update test to use byte strings (IronPython 2.7 makes str == unicode so the previous data conversion function can't do its job)
-rw-r--r-- | serial/serialcli.py | 118 | ||||
-rw-r--r-- | test/test.py | 27 |
2 files changed, 70 insertions, 75 deletions
diff --git a/serial/serialcli.py b/serial/serialcli.py index 622f737..02605bc 100644 --- a/serial/serialcli.py +++ b/serial/serialcli.py @@ -13,9 +13,9 @@ import System.IO.Ports from serial.serialutil import * -def device(portnum): - """Turn a port number into a device name""" - return System.IO.Ports.SerialPort.GetPortNames()[portnum] +#~ def device(portnum): + #~ """Turn a port number into a device name""" + #~ return System.IO.Ports.SerialPort.GetPortNames()[portnum] # must invoke function with byte array, make a helper to convert strings @@ -37,22 +37,22 @@ class Serial(SerialBase): """ if self._port is None: raise SerialException("Port must be configured before it can be used.") - if self._isOpen: + if self.is_open: raise SerialException("Port is already open.") try: self._port_handle = System.IO.Ports.SerialPort(self.portstr) - except Exception, msg: + except Exception as msg: self._port_handle = None raise SerialException("could not open port %s: %s" % (self.portstr, msg)) self._reconfigurePort() self._port_handle.Open() - self._isOpen = True + self.is_open = True + if not self._dsrdtr: + self._update_dtr_state() if not self._rtscts: - self.setRTS(True) - self.setDTR(True) - self.flushInput() - self.flushOutput() + self._update_rts_state() + self.reset_input_buffer() def _reconfigurePort(self): """Set communication parameters on opened port.""" @@ -69,16 +69,16 @@ class Serial(SerialBase): # if self._timeout != 0 and self._interCharTimeout is not None: # timeouts = (int(self._interCharTimeout * 1000),) + timeouts[1:] - if self._writeTimeout is None: + if self._write_timeout is None: self._port_handle.WriteTimeout = System.IO.Ports.SerialPort.InfiniteTimeout else: - self._port_handle.WriteTimeout = int(self._writeTimeout*1000) + self._port_handle.WriteTimeout = int(self._write_timeout*1000) # Setup the connection info. try: self._port_handle.BaudRate = self._baudrate - except IOError, e: + except IOError as e: # catch errors from illegal baudrate settings raise ValueError(str(e)) @@ -129,7 +129,7 @@ class Serial(SerialBase): def close(self): """Close port""" - if self._isOpen: + if self.is_open: if self._port_handle: try: self._port_handle.Close() @@ -137,19 +137,15 @@ class Serial(SerialBase): # ignore errors. can happen for unplugged USB serial devices pass self._port_handle = None - self._isOpen = False - - def makeDeviceName(self, port): - try: - return device(port) - except TypeError, e: - raise SerialException(str(e)) + self.is_open = False # - - - - - - - - - - - - - - - - - - - - - - - - - def inWaiting(self): + @property + def in_waiting(self): """Return the number of characters currently in the input buffer.""" - if not self._port_handle: raise portNotOpenError + if not self._port_handle: + raise portNotOpenError return self._port_handle.BytesToRead def read(self, size=1): @@ -158,14 +154,15 @@ class Serial(SerialBase): return less characters as requested. With no timeout it will block until the requested number of bytes is read. """ - if not self._port_handle: raise portNotOpenError + if not self._port_handle: + raise portNotOpenError # must use single byte reads as this is the only way to read # without applying encodings data = bytearray() while size: try: data.append(self._port_handle.ReadByte()) - except System.TimeoutException, e: + except System.TimeoutException as e: break else: size -= 1 @@ -173,77 +170,80 @@ class Serial(SerialBase): def write(self, data): """Output the given string over the serial port.""" - if not self._port_handle: raise portNotOpenError + if not self._port_handle: + raise portNotOpenError #~ if not isinstance(data, (bytes, bytearray)): #~ raise TypeError('expected %s or bytearray, got %s' % (bytes, type(data))) try: # must call overloaded method with byte array argument # as this is the only one not applying encodings self._port_handle.Write(as_byte_array(data), 0, len(data)) - except System.TimeoutException, e: + except System.TimeoutException as e: raise writeTimeoutError return len(data) - def flushInput(self): + def reset_input_buffer(self): """Clear input buffer, discarding all that is in the buffer.""" - if not self._port_handle: raise portNotOpenError + if not self._port_handle: + raise portNotOpenError self._port_handle.DiscardInBuffer() - def flushOutput(self): + def reset_output_buffer(self): """\ Clear output buffer, aborting the current output and discarding all that is in the buffer. """ - if not self._port_handle: raise portNotOpenError + if not self._port_handle: + raise portNotOpenError self._port_handle.DiscardOutBuffer() - def sendBreak(self, duration=0.25): - """\ - Send break condition. Timed, returns to idle state after given - duration. - """ - if not self._port_handle: raise portNotOpenError - import time - self._port_handle.BreakState = True - time.sleep(duration) - self._port_handle.BreakState = False - - def setBreak(self, level=True): + def _update_break_state(self): """ Set break: Controls TXD. When active, to transmitting is possible. """ - if not self._port_handle: raise portNotOpenError - self._port_handle.BreakState = bool(level) + if not self._port_handle: + raise portNotOpenError + self._port_handle.BreakState = bool(self._break_state) - def setRTS(self, level=True): + def _update_rts_state(self): """Set terminal status line: Request To Send""" - if not self._port_handle: raise portNotOpenError - self._port_handle.RtsEnable = bool(level) + if not self._port_handle: + raise portNotOpenError + self._port_handle.RtsEnable = bool(self._rts_state) - def setDTR(self, level=True): + def _update_dtr_state(self): """Set terminal status line: Data Terminal Ready""" - if not self._port_handle: raise portNotOpenError - self._port_handle.DtrEnable = bool(level) + if not self._port_handle: + raise portNotOpenError + self._port_handle.DtrEnable = bool(self._dtr_state) - def getCTS(self): + @property + def cts(self): """Read terminal status line: Clear To Send""" - if not self._port_handle: raise portNotOpenError + if not self._port_handle: + raise portNotOpenError return self._port_handle.CtsHolding - def getDSR(self): + @property + def dsr(self): """Read terminal status line: Data Set Ready""" - if not self._port_handle: raise portNotOpenError + if not self._port_handle: + raise portNotOpenError return self._port_handle.DsrHolding - def getRI(self): + @property + def ri(self): """Read terminal status line: Ring Indicator""" - if not self._port_handle: raise portNotOpenError + if not self._port_handle: + raise portNotOpenError #~ return self._port_handle.XXX return False #XXX an error would be better - def getCD(self): + @property + def cd(self): """Read terminal status line: Carrier Detect""" - if not self._port_handle: raise portNotOpenError + if not self._port_handle: + raise portNotOpenError return self._port_handle.CDHolding # - - platform specific - - - - diff --git a/test/test.py b/test/test.py index 440a7e4..ea28650 100644 --- a/test/test.py +++ b/test/test.py @@ -29,15 +29,10 @@ import sys import serial # on which port should the tests be performed: -PORT = 0 +PORT = 'loop://' -if sys.version_info >= (3, 0): - def data(string): - return bytes(string, 'latin1') - bytes_0to255 = bytes(range(256)) -else: - def data(string): return string - bytes_0to255 = ''.join([chr(x) for x in range(256)]) +# indirection via bytearray b/c bytes(range(256)) does something else in Pyhton 2.7 +bytes_0to255 = bytes(bytearray(range(256))) def segments(data, size=16): @@ -62,7 +57,7 @@ class Test4_Nonblocking(unittest.TestCase): def test1_ReadEmpty(self): """timeout: After port open, the input buffer must be empty""" - self.failUnlessEqual(self.s.read(1), data(''), "expected empty buffer") + self.failUnlessEqual(self.s.read(1), b'', "expected empty buffer") def test2_Loopback(self): """timeout: each sent character should return (binary test). @@ -74,16 +69,16 @@ class Test4_Nonblocking(unittest.TestCase): time.sleep(0.05) self.failUnlessEqual(self.s.in_waiting, length, "expected exactly %d character for inWainting()" % length) self.failUnlessEqual(self.s.read(length), block)#, "expected a %r which was written before" % block) - self.failUnlessEqual(self.s.read(1), data(''), "expected empty buffer after all sent chars are read") + self.failUnlessEqual(self.s.read(1), b'', "expected empty buffer after all sent chars are read") def test2_LoopbackTimeout(self): """timeout: test the timeout/immediate return. partial results should be returned.""" - self.s.write(data("HELLO")) + self.s.write(b"HELLO") time.sleep(0.1) # there might be a small delay until the character is ready (especially on win32 and rfc2217) # read more characters as are available to run in the timeout - self.failUnlessEqual(self.s.read(10), data('HELLO'), "expected the 'HELLO' which was written before") - self.failUnlessEqual(self.s.read(1), data(''), "expected empty buffer after all sent chars are read") + self.failUnlessEqual(self.s.read(10), b'HELLO', "expected the 'HELLO' which was written before") + self.failUnlessEqual(self.s.read(1), b'', "expected empty buffer after all sent chars are read") class Test3_Timeout(Test4_Nonblocking): @@ -109,7 +104,7 @@ class SendEvent(threading.Thread): time.sleep(self.delay) self.x.set() if not self.stopped: - self.serial.write(data("E")) + self.serial.write(b"E") self.serial.flush() def isSet(self): @@ -135,7 +130,7 @@ class Test1_Forever(unittest.TestCase): """no timeout: after port open, the input buffer must be empty (read). a character is sent after some time to terminate the test (SendEvent).""" c = self.s.read(1) - if not (self.event.isSet() and c == data('E')): + if not (self.event.isSet() and c == b'E'): self.fail("expected marker (evt=%r, c=%r)" % (self.event.isSet(), c)) @@ -221,7 +216,7 @@ class Test_MoreTimeouts(unittest.TestCase): self.s.write(serial.XOFF) time.sleep(0.5) # some systems need a little delay so that they can react on XOFF t1 = time.time() - self.failUnlessRaises(serial.SerialTimeoutException, self.s.write, data("timeout please"*200)) + self.failUnlessRaises(serial.SerialTimeoutException, self.s.write, b"timeout please"*200) t2 = time.time() self.failUnless( 0.9 <= (t2-t1) < 2.1, "Timeout not in the given interval (%s)" % (t2-t1)) |