summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorcliechti <cliechti@f19166aa-fa4f-0410-85c2-fa1106f25c8a>2002-06-04 22:46:50 +0000
committercliechti <cliechti@f19166aa-fa4f-0410-85c2-fa1106f25c8a>2002-06-04 22:46:50 +0000
commit1e95baa3c8168a32ced63f10a73e6162bd360c47 (patch)
treef767c8b16c6d59d5294da2f9a6fc13eb11ce8463
parentc813b21216cffa615008a0a9c32dcf9dbcc5fb6c (diff)
downloadpyserial-git-1e95baa3c8168a32ced63f10a73e6162bd360c47.tar.gz
added another implementation of readline (enhancedserial)
added test ofport without timout other fixes, typos
-rw-r--r--pyserial/examples/enhancedserial.py62
-rw-r--r--pyserial/examples/scan.py10
-rw-r--r--pyserial/examples/test.py110
3 files changed, 157 insertions, 25 deletions
diff --git a/pyserial/examples/enhancedserial.py b/pyserial/examples/enhancedserial.py
new file mode 100644
index 0000000..2c81ae1
--- /dev/null
+++ b/pyserial/examples/enhancedserial.py
@@ -0,0 +1,62 @@
+#!/usr/bin/env python
+"""Enhanced Serial Port class
+part of pyserial (http://pyserial.sf.net) (C)2002 cliechti@gmx.net
+
+another implementation of the readline and readlines method.
+this one should be more efficient because a bunch of characters are read
+on each access, but the drawback is that a timeout must be specified to
+make it work (enforced by the class __init__).
+
+this class could be enhanced with a read_until() method and more
+like found in the telnetlib.
+"""
+
+from serial import Serial
+
+class EnhancedSerial(Serial):
+ def __init__(self, *args, **kwargs):
+ #ensure that a reasonable timeout is set
+ timeout = kwargs.get('timeout',0.1)
+ if timeout < 0.01: timeout = 0.1
+ kwargs['timeout'] = timeout
+ Serial.__init__(self, *args, **kwargs)
+ self.buf = ''
+
+ def readline(self, maxsize=None, timeout=1):
+ """maxsize is ignored, timeout in seconds is the max time that is way for a complete line"""
+ tries = 0
+ while 1:
+ self.buf += self.read(512)
+ pos = self.buf.find('\n')
+ if pos >= 0:
+ line, self.buf = self.buf[:pos+1], self.buf[pos+1:]
+ return line
+ tries += 1
+ if tries * self.timeout > timeout:
+ break
+ line, self.buf = self.buf, ''
+ return line
+
+ def readlines(self, sizehint=None, timeout=1):
+ """read all lines that are available. abort after timout
+ when no more data arrives."""
+ lines = []
+ while 1:
+ line = self.readline(timeout=timeout)
+ if line:
+ lines.append(line)
+ if not line or line[-1:] != '\n':
+ break
+ return lines
+
+if __name__=='__main__':
+ #do some simple tests with a Loopback HW (see test.py for details)
+ PORT = 0
+ #test, only with Loopback HW (shortcut RX/TX pins (3+4 on DSUB 9 and 25) )
+ s = EnhancedSerial(PORT)
+ #write out some test data lines
+ s.write('\n'.join("hello how are you".split()))
+ #and read them back
+ print s.readlines()
+ #this one should print an empty list
+ print s.readlines(timeout=0.4)
diff --git a/pyserial/examples/scan.py b/pyserial/examples/scan.py
index 8be17f7..b34aba3 100644
--- a/pyserial/examples/scan.py
+++ b/pyserial/examples/scan.py
@@ -1,4 +1,12 @@
#!/usr/bin/env python
+"""Scan for serial ports
+part of pyserial (http://pyserial.sf.net) (C)2002 cliechti@gmx.net
+
+the scan function of this module tries to open each port number
+from 0 to 255 and it builds a list of those ports where this was
+successful.
+"""
+
from serial import Serial
from serial.serialutil import SerialException
@@ -17,4 +25,4 @@ def scan():
if __name__=='__main__':
print "Found ports:"
for n,s in scan():
- print "(%d) %s" % (n,s) \ No newline at end of file
+ print "(%d) %s" % (n,s)
diff --git a/pyserial/examples/test.py b/pyserial/examples/test.py
index 0c85ba0..6e8ea8e 100644
--- a/pyserial/examples/test.py
+++ b/pyserial/examples/test.py
@@ -1,26 +1,33 @@
#!/usr/bin/env python
-import unittest
-import serial, time
+"""Some Tests for the serial module.
+part of pyserial (http://pyserial.sf.net) (C)2002 cliechti@gmx.net
+
+intended to be run on different platforms, to ensure portability of
+the code.
+
+for all these tests a simple hardware is required.
+Loopback HW adapter:
+shortcut these pin pairs on a 9 pole DSUB: (2-3) (4-6) (7-8)
+
+ TX -\
+ RX -/
+
+ RTS -\
+ CTS -/
+
+ DTR -\
+ DSR -/
+
+ GND
+ RI
+"""
+
+import unittest, threading, time
+import serial
#of which port should the tests be performed:
PORT=0
-#for all these tests a simple hardware is requires.
-#Loopback adapter:
-#shortcut these pin pairs on a 9 pole DSUB: (2-3) (4-6) (7-8)
-#
-# TX -\
-# RX -/
-#
-# RTS -\
-# CTS -/
-#
-# DTR -\
-# DSR -/
-#
-# GND
-# RI
-#
class TestNonblocking(unittest.TestCase):
"""Test with timeouts"""
@@ -34,18 +41,19 @@ class TestNonblocking(unittest.TestCase):
"""After port open, the input buffer must be empty"""
self.failUnless(self.s.read(1)=='', "expected empty buffer")
def test2_Loopback(self):
- """With the Loopback HW, heach sent character should return"""
+ """With the Loopback HW, each sent character should return.
+ this is also a test for the binary capability of a port."""
for c in map(chr,range(256)):
self.s.write(c)
- time.sleep(0.02) #there migh be a small delay until the character is ready
+ time.sleep(0.02) #there might be a small delay until the character is ready (especialy on win32)
self.failUnless(self.s.read(1)==c, "expected an '%s' which was written before" % c)
self.failUnless(self.s.read(1)=='', "expected empty buffer after all sent chars are read")
def test2_LoopbackTimeout(self):
"""test the timeout/immediate return, and that partial results are returned"""
self.s.write("HELLO")
- time.sleep(0.02) #there migh be a small delay until the character is ready
+ time.sleep(0.02) #there might be a small delay until the character is ready (especialy on win32)
#read more characters as are available to run in the timeout
- self.failUnless(self.s.read(10)=='HELLO', "expected an 'HELLO' wich was written before")
+ self.failUnless(self.s.read(10)=='HELLO', "expected an 'HELLO' which was written before")
self.failUnless(self.s.read(1)=='', "expected empty buffer after all sent chars are read")
@@ -53,8 +61,62 @@ class TestTimeout(TestNonblocking):
"""Same tests as the NonBlocking ones but this time with timeout"""
timeout=1
-class TestForever(unittest.TestCase):
- """Tests for a posrt with no timeout"""
+class SendEvent(threading.Thread):
+ def __init__(self, serial, delay=1):
+ threading.Thread.__init__(self)
+ self.serial = serial
+ self.delay = delay
+ self.x = threading.Event()
+ self.stopped = 0
+ self.start()
+ def run(self):
+ time.sleep(self.delay)
+ if not self.stopped:
+ self.serial.write("E")
+ self.x.set()
+ def isSet(self):
+ return self.x.isSet()
+ def stop(self):
+ self.stopped = 1
+ self.x.wait()
+
+class Test1_Forever(unittest.TestCase):
+ """Tests a port with no timeout. These tests require that a
+ character is sent after some time to stop the test, this is done
+ through the SendEvent class and the Loopback HW."""
+ def setUp(self):
+ self.s = serial.Serial(PORT,timeout=None)
+ self.event = SendEvent(self.s)
+ def tearDown(self):
+ self.event.stop()
+ self.s.close()
+
+ def test2_ReadEmpty(self):
+ """After port open, the input buffer must be empty. a character is
+ sent after some time to terminate the test (SendEvent)."""
+ c = self.s.read(1)
+ if not (self.event.isSet() and c =='E'):
+ self.fail("expected marker")
+
+class Test2_Forever(unittest.TestCase):
+ """Tests a port with no timeout"""
+ def setUp(self):
+ self.s = serial.Serial(PORT,timeout=None)
+ def tearDown(self):
+ self.s.close()
+
+ def test1_inWaitingEmpty(self):
+ """After port open, the input buffer must be empty"""
+ self.failUnless(self.s.inWaiting()==0, "expected empty buffer")
+
+ def test2_Loopback(self):
+ """With the Loopback HW, each sent character should return.
+ this is also a test for the binary capability of a port."""
+ for c in map(chr,range(256)):
+ self.s.write(c)
+ time.sleep(0.02) #there might be a small delay until the character is ready (especialy on win32)
+ self.failUnless(self.s.read(1)==c, "expected an '%s' which was written before" % c)
+ self.failUnless(self.s.inWaiting()==0, "expected empty buffer after all sent chars are read")
class TestDataWires(unittest.TestCase):