diff options
author | cliechti <cliechti@f19166aa-fa4f-0410-85c2-fa1106f25c8a> | 2002-06-04 22:46:50 +0000 |
---|---|---|
committer | cliechti <cliechti@f19166aa-fa4f-0410-85c2-fa1106f25c8a> | 2002-06-04 22:46:50 +0000 |
commit | 7b88c5d4895cc67bf1f9f6518e3bb6cfd1a7ce8b (patch) | |
tree | 1443de1b28e35e2ee2f06412c54db3cad7949050 /examples/enhancedserial.py | |
parent | c5eb604b41ea55c3f8ba02aa00e8f4a9f54a7aa2 (diff) | |
download | pyserial-7b88c5d4895cc67bf1f9f6518e3bb6cfd1a7ce8b.tar.gz |
added another implementation of readline (enhancedserial)
added test ofport without timout
other fixes, typos
git-svn-id: http://svn.code.sf.net/p/pyserial/code/trunk/pyserial@38 f19166aa-fa4f-0410-85c2-fa1106f25c8a
Diffstat (limited to 'examples/enhancedserial.py')
-rw-r--r-- | examples/enhancedserial.py | 62 |
1 files changed, 62 insertions, 0 deletions
diff --git a/examples/enhancedserial.py b/examples/enhancedserial.py new file mode 100644 index 0000000..2c81ae1 --- /dev/null +++ b/examples/enhancedserial.py @@ -0,0 +1,62 @@ +#!/usr/bin/env python +"""Enhanced Serial Port class +part of pyserial (http://pyserial.sf.net) (C)2002 cliechti@gmx.net + +another implementation of the readline and readlines method. +this one should be more efficient because a bunch of characters are read +on each access, but the drawback is that a timeout must be specified to +make it work (enforced by the class __init__). + +this class could be enhanced with a read_until() method and more +like found in the telnetlib. +""" + +from serial import Serial + +class EnhancedSerial(Serial): + def __init__(self, *args, **kwargs): + #ensure that a reasonable timeout is set + timeout = kwargs.get('timeout',0.1) + if timeout < 0.01: timeout = 0.1 + kwargs['timeout'] = timeout + Serial.__init__(self, *args, **kwargs) + self.buf = '' + + def readline(self, maxsize=None, timeout=1): + """maxsize is ignored, timeout in seconds is the max time that is way for a complete line""" + tries = 0 + while 1: + self.buf += self.read(512) + pos = self.buf.find('\n') + if pos >= 0: + line, self.buf = self.buf[:pos+1], self.buf[pos+1:] + return line + tries += 1 + if tries * self.timeout > timeout: + break + line, self.buf = self.buf, '' + return line + + def readlines(self, sizehint=None, timeout=1): + """read all lines that are available. abort after timout + when no more data arrives.""" + lines = [] + while 1: + line = self.readline(timeout=timeout) + if line: + lines.append(line) + if not line or line[-1:] != '\n': + break + return lines + +if __name__=='__main__': + #do some simple tests with a Loopback HW (see test.py for details) + PORT = 0 + #test, only with Loopback HW (shortcut RX/TX pins (3+4 on DSUB 9 and 25) ) + s = EnhancedSerial(PORT) + #write out some test data lines + s.write('\n'.join("hello how are you".split())) + #and read them back + print s.readlines() + #this one should print an empty list + print s.readlines(timeout=0.4) |