diff options
Diffstat (limited to 'pyserial/examples')
-rw-r--r-- | pyserial/examples/enhancedserial.py | 62 | ||||
-rw-r--r-- | pyserial/examples/miniterm.py | 135 | ||||
-rw-r--r-- | pyserial/examples/scan.py | 28 | ||||
-rw-r--r-- | pyserial/examples/tcp_serial_redirect.py | 119 | ||||
-rw-r--r-- | pyserial/examples/test.py | 159 |
5 files changed, 0 insertions, 503 deletions
diff --git a/pyserial/examples/enhancedserial.py b/pyserial/examples/enhancedserial.py deleted file mode 100644 index 2c81ae1..0000000 --- a/pyserial/examples/enhancedserial.py +++ /dev/null @@ -1,62 +0,0 @@ -#!/usr/bin/env python -"""Enhanced Serial Port class -part of pyserial (http://pyserial.sf.net) (C)2002 cliechti@gmx.net - -another implementation of the readline and readlines method. -this one should be more efficient because a bunch of characters are read -on each access, but the drawback is that a timeout must be specified to -make it work (enforced by the class __init__). - -this class could be enhanced with a read_until() method and more -like found in the telnetlib. -""" - -from serial import Serial - -class EnhancedSerial(Serial): - def __init__(self, *args, **kwargs): - #ensure that a reasonable timeout is set - timeout = kwargs.get('timeout',0.1) - if timeout < 0.01: timeout = 0.1 - kwargs['timeout'] = timeout - Serial.__init__(self, *args, **kwargs) - self.buf = '' - - def readline(self, maxsize=None, timeout=1): - """maxsize is ignored, timeout in seconds is the max time that is way for a complete line""" - tries = 0 - while 1: - self.buf += self.read(512) - pos = self.buf.find('\n') - if pos >= 0: - line, self.buf = self.buf[:pos+1], self.buf[pos+1:] - return line - tries += 1 - if tries * self.timeout > timeout: - break - line, self.buf = self.buf, '' - return line - - def readlines(self, sizehint=None, timeout=1): - """read all lines that are available. abort after timout - when no more data arrives.""" - lines = [] - while 1: - line = self.readline(timeout=timeout) - if line: - lines.append(line) - if not line or line[-1:] != '\n': - break - return lines - -if __name__=='__main__': - #do some simple tests with a Loopback HW (see test.py for details) - PORT = 0 - #test, only with Loopback HW (shortcut RX/TX pins (3+4 on DSUB 9 and 25) ) - s = EnhancedSerial(PORT) - #write out some test data lines - s.write('\n'.join("hello how are you".split())) - #and read them back - print s.readlines() - #this one should print an empty list - print s.readlines(timeout=0.4) diff --git a/pyserial/examples/miniterm.py b/pyserial/examples/miniterm.py deleted file mode 100644 index 43b4fa8..0000000 --- a/pyserial/examples/miniterm.py +++ /dev/null @@ -1,135 +0,0 @@ -#!/usr/bin/env python - -#very simple serial terminal -#(C)2002 Chris Liechti >cliecht@gmx.net> - -#input characters are sent directly, received characters are displays as is -#baudrate and echo configuartion is done through globals - - -import sys, os, serial, threading, getopt -#EXITCHARCTER = '\x1b' #ESC -EXITCHARCTER = '\x04' #ctrl+d - -#first choosea platform dependant way to read single characters from the console -if os.name == 'nt': - import msvcrt - def getkey(): - while 1: - if echo: - z = msvcrt.getche() - else: - z = msvcrt.getch() - if z == '\0' or z == '\xe0': #functions keys - msvcrt.getch() - else: - return z - -elif os.name == 'posix': - #XXX: Untested code drrived from the Python FAQ.... - import termios, TERMIOS, sys, os - fd = sys.stdin.fileno() - old = termios.tcgetattr(fd) - new = termios.tcgetattr(fd) - new[3] = new[3] & ~TERMIOS.ICANON & ~TERMIOS.ECHO - new[6][TERMIOS.VMIN] = 1 - new[6][TERMIOS.VTIME] = 0 - termios.tcsetattr(fd, TERMIOS.TCSANOW, new) - s = '' # We'll save the characters typed and add them to the pool. - def getkey(): - c = os.read(fd, 1) - if echo: sys.stdout.write(c) - return c - def clenaup_console(): - termios.tcsetattr(fd, TERMIOS.TCSAFLUSH, old) - sys.exitfunc = clenaup_console #terminal modes have to be restored on exit... - -else: - raise "Sorry no implementation for your platform (%s) available." % sys.platform - - -def reader(): - """loop forever and copy serial->console""" - while 1: - sys.stdout.write(s.read()) - -def writer(): - """loop and copy console->serial until EOF character is found""" - while 1: - c = getkey() - if c == EXITCHARCTER: break #exit on esc - s.write(c) #send character - if convert_outgoing_cr and c == '\r': - s.write('\n') - if echo: sys.stdout.write('\n') - - -#print a short help message -def usage(): - print >>sys.stderr, """USAGE: %s [options] - Simple Terminal Programm for the serial port. - - options: - -p, --port=PORT: port, a number, defualt = 0 or a device name - -b, --baud=BAUD: baudrate, default 9600 - -r, --rtscts: enable RTS/CTS flow control (default off) - -x, --xonxoff: enable software flow control (default off) - -e, --echo: enable local echo (default off) - -c, --cr: disable CR -> CR+LF translation - - """ % sys.argv[0] - -if __name__ == '__main__': - #parse command line options - try: - opts, args = getopt.getopt(sys.argv[1:], - "hp:b:rxec", - ["help", "port=", "baud=", "rtscts", "xonxoff", "echo", "cr"]) - except getopt.GetoptError: - # print help information and exit: - usage() - sys.exit(2) - - port = 0 - baudrate = 9600 - echo = 0 - convert_outgoing_cr = 1 - rtscts = 0 - xonxoff = 0 - for o, a in opts: - if o in ("-h", "--help"): #help text - usage() - sys.exit() - elif o in ("-p", "--port"): #specified port - try: - port = int(a) - except ValueError: - port = a - elif o in ("-b", "--baud"): #specified baudrate - try: - baudrate = int(a) - except ValueError: - raise ValueError, "Baudrate must be a integer number" - elif o in ("-r", "--rtscts"): - rtscts = 1 - elif o in ("-x", "--xonxoff"): - xonxoff = 1 - elif o in ("-e", "--echo"): - echo = 1 - elif o in ("-c", "--cr"): - convert_outgoing_cr = 0 - - try: - s = serial.Serial(port, baudrate, rtscts=rtscts, xonxoff=xonxoff) - except: - print "could not open port" - sys.exit(1) - print "--- Miniterm --- type Ctrl-D to quit" - #start serial->console thread - r = threading.Thread(target=reader) - r.setDaemon(1) - r.start() - #enter console->serial loop - writer() - - print "\n--- exit ---" diff --git a/pyserial/examples/scan.py b/pyserial/examples/scan.py deleted file mode 100644 index b34aba3..0000000 --- a/pyserial/examples/scan.py +++ /dev/null @@ -1,28 +0,0 @@ -#!/usr/bin/env python -"""Scan for serial ports -part of pyserial (http://pyserial.sf.net) (C)2002 cliechti@gmx.net - -the scan function of this module tries to open each port number -from 0 to 255 and it builds a list of those ports where this was -successful. -""" - -from serial import Serial -from serial.serialutil import SerialException - -def scan(): - """scan for available ports. return a list of tuples (num, name)""" - available = [] - for i in range(256): - try: - s = Serial(i) - available.append( (i, s.portstr)) - s.close() #explicit close 'cause of delayed GC in java - except SerialException: - pass - return available - -if __name__=='__main__': - print "Found ports:" - for n,s in scan(): - print "(%d) %s" % (n,s) diff --git a/pyserial/examples/tcp_serial_redirect.py b/pyserial/examples/tcp_serial_redirect.py deleted file mode 100644 index 0816ad8..0000000 --- a/pyserial/examples/tcp_serial_redirect.py +++ /dev/null @@ -1,119 +0,0 @@ -#!/usr/bin/env python - -#(C)2002 Chris Liechti >cliecht@gmx.net> -#redirect data from a TCP/IP connection to a serial port and vice versa -#requires python 2.2 'cause socket.sendall is used - -#this program is a hack - do not use it as an example of clean -#threading programming! it's only an example for pyserial. - -import sys, os, serial, threading, getopt, socket, time - -def reader(): - """loop forever and copy serial->console""" - global connection - while 1: - try: - if connection: - connection.sendall(s.read(s.inWaiting())) - else: - time.sleep(0.2) #lower CPU usage... - except socket.error, msg: - print msg - if connection: connection.close() - connection = None - except: - pass - -def writer(): - """loop forever and copy console->serial""" - global connection - try: - while 1: - s.write(connection.recv(1024)) - except socket.error, msg: - print msg - - -#print a short help message -def usage(): - print >>sys.stderr, """USAGE: %s [options] - Simple Terminal Programm for the serial port. - - options: - -p, --port=PORT: serial port, a number, defualt = 0 or a device name - -b, --baud=BAUD: baudrate, default 9600 - -r, --rtscts: enable RTS/CTS flow control (default off) - -x, --xonxoff: enable software flow control (default off) - -P, --localport: TCP/IP port on which to run the server (default 7777) - """ % sys.argv[0] - -if __name__ == '__main__': - connection = None - - #parse command line options - try: - opts, args = getopt.getopt(sys.argv[1:], - "hp:b:rxec", - ["help", "port=", "baud=", "rtscts", "xonxoff", "echo", "cr"]) - except getopt.GetoptError: - # print help information and exit: - usage() - sys.exit(2) - - port = 0 - baudrate = 9600 - rtscts = 0 - xonxoff = 0 - localport = 7777 - for o, a in opts: - if o in ("-h", "--help"): #help text - usage() - sys.exit() - elif o in ("-p", "--port"): #specified port - try: - port = int(a) - except ValueError: - port = a - elif o in ("-b", "--baud"): #specified baudrate - try: - baudrate = int(a) - except ValueError: - raise ValueError, "Baudrate must be a integer number" - elif o in ("-r", "--rtscts"): - rtscts = 1 - elif o in ("-x", "--xonxoff"): - xonxoff = 1 - elif o in ("-P", "--localport"): - try: - localport = int(a) - except ValueError: - raise ValueError, "local port must be an integer number" - - print "--- TCP/IP to Serial redirector --- type Ctrl-C / BREAK to quit" - #start serial->tcp/ip thread - r = threading.Thread(target=reader) - r.setDaemon(1) - r.start() - - try: - s = serial.Serial(port, baudrate, rtscts=rtscts, xonxoff=xonxoff) - except: - print "could not open port" - sys.exit(1) - - srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - srv.bind( ('', localport) ) - srv.listen(1) - while 1: - try: - connection, addr = srv.accept() - print 'Connected by', addr - #enter console->serial loop - writer() - except socket.error, msg: - print msg - if connection: connection.close() - connection = None - - print "\n--- exit ---" diff --git a/pyserial/examples/test.py b/pyserial/examples/test.py deleted file mode 100644 index ec8d13a..0000000 --- a/pyserial/examples/test.py +++ /dev/null @@ -1,159 +0,0 @@ -#!/usr/bin/env python -"""Some Tests for the serial module. -part of pyserial (http://pyserial.sf.net) (C)2002 cliechti@gmx.net - -intended to be run on different platforms, to ensure portability of -the code. - -for all these tests a simple hardware is required. -Loopback HW adapter: -shortcut these pin pairs: - TX <-> RX - RTS <-> CTS - DTR <-> DSR - -on a 9 pole DSUB these are the pins (2-3) (4-6) (7-8) - -""" - -import unittest, threading, time -import serial - -#of which port should the tests be performed: -PORT=0 - - -class Test4_Nonblocking(unittest.TestCase): - """Test with timeouts""" - timeout=0 - def setUp(self): - self.s = serial.Serial(PORT,timeout=self.timeout) - def tearDown(self): - self.s.close() - - def test0_Messy(self): - """NonBlocking (timeout=0)""" - #this is only here to write out the message in verbose mode - #because Test3 and Test4 print the same messages - - def test1_ReadEmpty(self): - """timeout: After port open, the input buffer must be empty""" - self.failUnless(self.s.read(1)=='', "expected empty buffer") - def test2_Loopback(self): - """timeout: each sent character should return (binary test). - this is also a test for the binary capability of a port.""" - for c in map(chr,range(256)): - self.s.write(c) - time.sleep(0.02) #there might be a small delay until the character is ready (especialy on win32) - self.failUnless(self.s.inWaiting()==1, "expected exactly one character for inWainting()") - self.failUnless(self.s.read(1)==c, "expected an '%s' which was written before" % c) - self.failUnless(self.s.read(1)=='', "expected empty buffer after all sent chars are read") - def test2_LoopbackTimeout(self): - """timeout: test the timeout/immediate return. - partial results should be returned.""" - self.s.write("HELLO") - time.sleep(0.02) #there might be a small delay until the character is ready (especialy on win32) - #read more characters as are available to run in the timeout - self.failUnless(self.s.read(10)=='HELLO', "expected an 'HELLO' which was written before") - self.failUnless(self.s.read(1)=='', "expected empty buffer after all sent chars are read") - - -class Test3_Timeout(Test4_Nonblocking): - """Same tests as the NonBlocking ones but this time with timeout""" - timeout=1 - def test0_Messy(self): - """Blocking (timeout=1)""" - #this is only here to write out the message in verbose mode - #because Test3 and Test4 print the same messages - -class SendEvent(threading.Thread): - def __init__(self, serial, delay=1): - threading.Thread.__init__(self) - self.serial = serial - self.delay = delay - self.x = threading.Event() - self.stopped = 0 - self.start() - def run(self): - time.sleep(self.delay) - if not self.stopped: - self.serial.write("E") - self.x.set() - def isSet(self): - return self.x.isSet() - def stop(self): - self.stopped = 1 - self.x.wait() - -class Test1_Forever(unittest.TestCase): - """Tests a port with no timeout. These tests require that a - character is sent after some time to stop the test, this is done - through the SendEvent class and the Loopback HW.""" - def setUp(self): - self.s = serial.Serial(PORT,timeout=None) - self.event = SendEvent(self.s) - def tearDown(self): - self.event.stop() - self.s.close() - - def test2_ReadEmpty(self): - """no timeout: after port open, the input buffer must be empty (read). - a character is sent after some time to terminate the test (SendEvent).""" - c = self.s.read(1) - if not (self.event.isSet() and c =='E'): - self.fail("expected marker") - -class Test2_Forever(unittest.TestCase): - """Tests a port with no timeout""" - def setUp(self): - self.s = serial.Serial(PORT,timeout=None) - def tearDown(self): - self.s.close() - - def test1_inWaitingEmpty(self): - """no timeout: after port open, the input buffer must be empty (inWaiting)""" - self.failUnless(self.s.inWaiting()==0, "expected empty buffer") - - def test2_Loopback(self): - """no timeout: each sent character should return (binary test). - this is also a test for the binary capability of a port.""" - for c in map(chr,range(256)): - self.s.write(c) - time.sleep(0.02) #there might be a small delay until the character is ready (especialy on win32) - self.failUnless(self.s.inWaiting()==1, "expected exactly one character for inWainting()") - self.failUnless(self.s.read(1)==c, "expected an '%s' which was written before" % c) - self.failUnless(self.s.inWaiting()==0, "expected empty buffer after all sent chars are read") - - -class Test0_DataWires(unittest.TestCase): - """Test modem control lines""" - def setUp(self): - self.s = serial.Serial(PORT) - def tearDown(self): - self.s.close() - - def test1_RTS(self): - """Test RTS/CTS""" - self.s.setRTS(0) - self.failUnless(self.s.getCTS()==0, "CTS -> 0") - self.s.setRTS(1) - self.failUnless(self.s.getCTS()==1, "CTS -> 1") - - def test2_DTR(self): - """Test DTR/DSR""" - self.s.setDTR(0) - self.failUnless(self.s.getDSR()==0, "DSR -> 0") - self.s.setDTR(1) - self.failUnless(self.s.getDSR()==1, "DSR -> 1") - - def test3_RI(self): - """Test RI""" - self.failUnless(self.s.getRI()==0, "RI -> 0") - -if __name__ == '__main__': - import sys - print __doc__ - print "testing port", PORT - sys.argv.append('-v') - # When this module is executed from the command-line, run all its tests - unittest.main() |