diff options
author | cliechti <cliechti@f19166aa-fa4f-0410-85c2-fa1106f25c8a> | 2002-06-04 22:46:50 +0000 |
---|---|---|
committer | cliechti <cliechti@f19166aa-fa4f-0410-85c2-fa1106f25c8a> | 2002-06-04 22:46:50 +0000 |
commit | 1e95baa3c8168a32ced63f10a73e6162bd360c47 (patch) | |
tree | f767c8b16c6d59d5294da2f9a6fc13eb11ce8463 | |
parent | c813b21216cffa615008a0a9c32dcf9dbcc5fb6c (diff) | |
download | pyserial-git-1e95baa3c8168a32ced63f10a73e6162bd360c47.tar.gz |
added another implementation of readline (enhancedserial)
added test ofport without timout
other fixes, typos
-rw-r--r-- | pyserial/examples/enhancedserial.py | 62 | ||||
-rw-r--r-- | pyserial/examples/scan.py | 10 | ||||
-rw-r--r-- | pyserial/examples/test.py | 110 |
3 files changed, 157 insertions, 25 deletions
diff --git a/pyserial/examples/enhancedserial.py b/pyserial/examples/enhancedserial.py new file mode 100644 index 0000000..2c81ae1 --- /dev/null +++ b/pyserial/examples/enhancedserial.py @@ -0,0 +1,62 @@ +#!/usr/bin/env python +"""Enhanced Serial Port class +part of pyserial (http://pyserial.sf.net) (C)2002 cliechti@gmx.net + +another implementation of the readline and readlines method. +this one should be more efficient because a bunch of characters are read +on each access, but the drawback is that a timeout must be specified to +make it work (enforced by the class __init__). + +this class could be enhanced with a read_until() method and more +like found in the telnetlib. +""" + +from serial import Serial + +class EnhancedSerial(Serial): + def __init__(self, *args, **kwargs): + #ensure that a reasonable timeout is set + timeout = kwargs.get('timeout',0.1) + if timeout < 0.01: timeout = 0.1 + kwargs['timeout'] = timeout + Serial.__init__(self, *args, **kwargs) + self.buf = '' + + def readline(self, maxsize=None, timeout=1): + """maxsize is ignored, timeout in seconds is the max time that is way for a complete line""" + tries = 0 + while 1: + self.buf += self.read(512) + pos = self.buf.find('\n') + if pos >= 0: + line, self.buf = self.buf[:pos+1], self.buf[pos+1:] + return line + tries += 1 + if tries * self.timeout > timeout: + break + line, self.buf = self.buf, '' + return line + + def readlines(self, sizehint=None, timeout=1): + """read all lines that are available. abort after timout + when no more data arrives.""" + lines = [] + while 1: + line = self.readline(timeout=timeout) + if line: + lines.append(line) + if not line or line[-1:] != '\n': + break + return lines + +if __name__=='__main__': + #do some simple tests with a Loopback HW (see test.py for details) + PORT = 0 + #test, only with Loopback HW (shortcut RX/TX pins (3+4 on DSUB 9 and 25) ) + s = EnhancedSerial(PORT) + #write out some test data lines + s.write('\n'.join("hello how are you".split())) + #and read them back + print s.readlines() + #this one should print an empty list + print s.readlines(timeout=0.4) diff --git a/pyserial/examples/scan.py b/pyserial/examples/scan.py index 8be17f7..b34aba3 100644 --- a/pyserial/examples/scan.py +++ b/pyserial/examples/scan.py @@ -1,4 +1,12 @@ #!/usr/bin/env python +"""Scan for serial ports +part of pyserial (http://pyserial.sf.net) (C)2002 cliechti@gmx.net + +the scan function of this module tries to open each port number +from 0 to 255 and it builds a list of those ports where this was +successful. +""" + from serial import Serial from serial.serialutil import SerialException @@ -17,4 +25,4 @@ def scan(): if __name__=='__main__': print "Found ports:" for n,s in scan(): - print "(%d) %s" % (n,s)
\ No newline at end of file + print "(%d) %s" % (n,s) diff --git a/pyserial/examples/test.py b/pyserial/examples/test.py index 0c85ba0..6e8ea8e 100644 --- a/pyserial/examples/test.py +++ b/pyserial/examples/test.py @@ -1,26 +1,33 @@ #!/usr/bin/env python -import unittest -import serial, time +"""Some Tests for the serial module. +part of pyserial (http://pyserial.sf.net) (C)2002 cliechti@gmx.net + +intended to be run on different platforms, to ensure portability of +the code. + +for all these tests a simple hardware is required. +Loopback HW adapter: +shortcut these pin pairs on a 9 pole DSUB: (2-3) (4-6) (7-8) + + TX -\ + RX -/ + + RTS -\ + CTS -/ + + DTR -\ + DSR -/ + + GND + RI +""" + +import unittest, threading, time +import serial #of which port should the tests be performed: PORT=0 -#for all these tests a simple hardware is requires. -#Loopback adapter: -#shortcut these pin pairs on a 9 pole DSUB: (2-3) (4-6) (7-8) -# -# TX -\ -# RX -/ -# -# RTS -\ -# CTS -/ -# -# DTR -\ -# DSR -/ -# -# GND -# RI -# class TestNonblocking(unittest.TestCase): """Test with timeouts""" @@ -34,18 +41,19 @@ class TestNonblocking(unittest.TestCase): """After port open, the input buffer must be empty""" self.failUnless(self.s.read(1)=='', "expected empty buffer") def test2_Loopback(self): - """With the Loopback HW, heach sent character should return""" + """With the Loopback HW, each sent character should return. + this is also a test for the binary capability of a port.""" for c in map(chr,range(256)): self.s.write(c) - time.sleep(0.02) #there migh be a small delay until the character is ready + time.sleep(0.02) #there might be a small delay until the character is ready (especialy on win32) self.failUnless(self.s.read(1)==c, "expected an '%s' which was written before" % c) self.failUnless(self.s.read(1)=='', "expected empty buffer after all sent chars are read") def test2_LoopbackTimeout(self): """test the timeout/immediate return, and that partial results are returned""" self.s.write("HELLO") - time.sleep(0.02) #there migh be a small delay until the character is ready + time.sleep(0.02) #there might be a small delay until the character is ready (especialy on win32) #read more characters as are available to run in the timeout - self.failUnless(self.s.read(10)=='HELLO', "expected an 'HELLO' wich was written before") + self.failUnless(self.s.read(10)=='HELLO', "expected an 'HELLO' which was written before") self.failUnless(self.s.read(1)=='', "expected empty buffer after all sent chars are read") @@ -53,8 +61,62 @@ class TestTimeout(TestNonblocking): """Same tests as the NonBlocking ones but this time with timeout""" timeout=1 -class TestForever(unittest.TestCase): - """Tests for a posrt with no timeout""" +class SendEvent(threading.Thread): + def __init__(self, serial, delay=1): + threading.Thread.__init__(self) + self.serial = serial + self.delay = delay + self.x = threading.Event() + self.stopped = 0 + self.start() + def run(self): + time.sleep(self.delay) + if not self.stopped: + self.serial.write("E") + self.x.set() + def isSet(self): + return self.x.isSet() + def stop(self): + self.stopped = 1 + self.x.wait() + +class Test1_Forever(unittest.TestCase): + """Tests a port with no timeout. These tests require that a + character is sent after some time to stop the test, this is done + through the SendEvent class and the Loopback HW.""" + def setUp(self): + self.s = serial.Serial(PORT,timeout=None) + self.event = SendEvent(self.s) + def tearDown(self): + self.event.stop() + self.s.close() + + def test2_ReadEmpty(self): + """After port open, the input buffer must be empty. a character is + sent after some time to terminate the test (SendEvent).""" + c = self.s.read(1) + if not (self.event.isSet() and c =='E'): + self.fail("expected marker") + +class Test2_Forever(unittest.TestCase): + """Tests a port with no timeout""" + def setUp(self): + self.s = serial.Serial(PORT,timeout=None) + def tearDown(self): + self.s.close() + + def test1_inWaitingEmpty(self): + """After port open, the input buffer must be empty""" + self.failUnless(self.s.inWaiting()==0, "expected empty buffer") + + def test2_Loopback(self): + """With the Loopback HW, each sent character should return. + this is also a test for the binary capability of a port.""" + for c in map(chr,range(256)): + self.s.write(c) + time.sleep(0.02) #there might be a small delay until the character is ready (especialy on win32) + self.failUnless(self.s.read(1)==c, "expected an '%s' which was written before" % c) + self.failUnless(self.s.inWaiting()==0, "expected empty buffer after all sent chars are read") class TestDataWires(unittest.TestCase): |