summaryrefslogtreecommitdiff
path: root/examples/enhancedserial.py
diff options
context:
space:
mode:
Diffstat (limited to 'examples/enhancedserial.py')
-rw-r--r--examples/enhancedserial.py62
1 files changed, 62 insertions, 0 deletions
diff --git a/examples/enhancedserial.py b/examples/enhancedserial.py
new file mode 100644
index 0000000..2c81ae1
--- /dev/null
+++ b/examples/enhancedserial.py
@@ -0,0 +1,62 @@
+#!/usr/bin/env python
+"""Enhanced Serial Port class
+part of pyserial (http://pyserial.sf.net) (C)2002 cliechti@gmx.net
+
+another implementation of the readline and readlines method.
+this one should be more efficient because a bunch of characters are read
+on each access, but the drawback is that a timeout must be specified to
+make it work (enforced by the class __init__).
+
+this class could be enhanced with a read_until() method and more
+like found in the telnetlib.
+"""
+
+from serial import Serial
+
+class EnhancedSerial(Serial):
+ def __init__(self, *args, **kwargs):
+ #ensure that a reasonable timeout is set
+ timeout = kwargs.get('timeout',0.1)
+ if timeout < 0.01: timeout = 0.1
+ kwargs['timeout'] = timeout
+ Serial.__init__(self, *args, **kwargs)
+ self.buf = ''
+
+ def readline(self, maxsize=None, timeout=1):
+ """maxsize is ignored, timeout in seconds is the max time that is way for a complete line"""
+ tries = 0
+ while 1:
+ self.buf += self.read(512)
+ pos = self.buf.find('\n')
+ if pos >= 0:
+ line, self.buf = self.buf[:pos+1], self.buf[pos+1:]
+ return line
+ tries += 1
+ if tries * self.timeout > timeout:
+ break
+ line, self.buf = self.buf, ''
+ return line
+
+ def readlines(self, sizehint=None, timeout=1):
+ """read all lines that are available. abort after timout
+ when no more data arrives."""
+ lines = []
+ while 1:
+ line = self.readline(timeout=timeout)
+ if line:
+ lines.append(line)
+ if not line or line[-1:] != '\n':
+ break
+ return lines
+
+if __name__=='__main__':
+ #do some simple tests with a Loopback HW (see test.py for details)
+ PORT = 0
+ #test, only with Loopback HW (shortcut RX/TX pins (3+4 on DSUB 9 and 25) )
+ s = EnhancedSerial(PORT)
+ #write out some test data lines
+ s.write('\n'.join("hello how are you".split()))
+ #and read them back
+ print s.readlines()
+ #this one should print an empty list
+ print s.readlines(timeout=0.4)