summaryrefslogtreecommitdiff
path: root/pyserial/examples
diff options
context:
space:
mode:
Diffstat (limited to 'pyserial/examples')
-rw-r--r--pyserial/examples/enhancedserial.py62
-rw-r--r--pyserial/examples/miniterm.py135
-rw-r--r--pyserial/examples/scan.py28
-rw-r--r--pyserial/examples/tcp_serial_redirect.py119
-rw-r--r--pyserial/examples/test.py159
5 files changed, 0 insertions, 503 deletions
diff --git a/pyserial/examples/enhancedserial.py b/pyserial/examples/enhancedserial.py
deleted file mode 100644
index 2c81ae1..0000000
--- a/pyserial/examples/enhancedserial.py
+++ /dev/null
@@ -1,62 +0,0 @@
-#!/usr/bin/env python
-"""Enhanced Serial Port class
-part of pyserial (http://pyserial.sf.net) (C)2002 cliechti@gmx.net
-
-another implementation of the readline and readlines method.
-this one should be more efficient because a bunch of characters are read
-on each access, but the drawback is that a timeout must be specified to
-make it work (enforced by the class __init__).
-
-this class could be enhanced with a read_until() method and more
-like found in the telnetlib.
-"""
-
-from serial import Serial
-
-class EnhancedSerial(Serial):
- def __init__(self, *args, **kwargs):
- #ensure that a reasonable timeout is set
- timeout = kwargs.get('timeout',0.1)
- if timeout < 0.01: timeout = 0.1
- kwargs['timeout'] = timeout
- Serial.__init__(self, *args, **kwargs)
- self.buf = ''
-
- def readline(self, maxsize=None, timeout=1):
- """maxsize is ignored, timeout in seconds is the max time that is way for a complete line"""
- tries = 0
- while 1:
- self.buf += self.read(512)
- pos = self.buf.find('\n')
- if pos >= 0:
- line, self.buf = self.buf[:pos+1], self.buf[pos+1:]
- return line
- tries += 1
- if tries * self.timeout > timeout:
- break
- line, self.buf = self.buf, ''
- return line
-
- def readlines(self, sizehint=None, timeout=1):
- """read all lines that are available. abort after timout
- when no more data arrives."""
- lines = []
- while 1:
- line = self.readline(timeout=timeout)
- if line:
- lines.append(line)
- if not line or line[-1:] != '\n':
- break
- return lines
-
-if __name__=='__main__':
- #do some simple tests with a Loopback HW (see test.py for details)
- PORT = 0
- #test, only with Loopback HW (shortcut RX/TX pins (3+4 on DSUB 9 and 25) )
- s = EnhancedSerial(PORT)
- #write out some test data lines
- s.write('\n'.join("hello how are you".split()))
- #and read them back
- print s.readlines()
- #this one should print an empty list
- print s.readlines(timeout=0.4)
diff --git a/pyserial/examples/miniterm.py b/pyserial/examples/miniterm.py
deleted file mode 100644
index 43b4fa8..0000000
--- a/pyserial/examples/miniterm.py
+++ /dev/null
@@ -1,135 +0,0 @@
-#!/usr/bin/env python
-
-#very simple serial terminal
-#(C)2002 Chris Liechti >cliecht@gmx.net>
-
-#input characters are sent directly, received characters are displays as is
-#baudrate and echo configuartion is done through globals
-
-
-import sys, os, serial, threading, getopt
-#EXITCHARCTER = '\x1b' #ESC
-EXITCHARCTER = '\x04' #ctrl+d
-
-#first choosea platform dependant way to read single characters from the console
-if os.name == 'nt':
- import msvcrt
- def getkey():
- while 1:
- if echo:
- z = msvcrt.getche()
- else:
- z = msvcrt.getch()
- if z == '\0' or z == '\xe0': #functions keys
- msvcrt.getch()
- else:
- return z
-
-elif os.name == 'posix':
- #XXX: Untested code drrived from the Python FAQ....
- import termios, TERMIOS, sys, os
- fd = sys.stdin.fileno()
- old = termios.tcgetattr(fd)
- new = termios.tcgetattr(fd)
- new[3] = new[3] & ~TERMIOS.ICANON & ~TERMIOS.ECHO
- new[6][TERMIOS.VMIN] = 1
- new[6][TERMIOS.VTIME] = 0
- termios.tcsetattr(fd, TERMIOS.TCSANOW, new)
- s = '' # We'll save the characters typed and add them to the pool.
- def getkey():
- c = os.read(fd, 1)
- if echo: sys.stdout.write(c)
- return c
- def clenaup_console():
- termios.tcsetattr(fd, TERMIOS.TCSAFLUSH, old)
- sys.exitfunc = clenaup_console #terminal modes have to be restored on exit...
-
-else:
- raise "Sorry no implementation for your platform (%s) available." % sys.platform
-
-
-def reader():
- """loop forever and copy serial->console"""
- while 1:
- sys.stdout.write(s.read())
-
-def writer():
- """loop and copy console->serial until EOF character is found"""
- while 1:
- c = getkey()
- if c == EXITCHARCTER: break #exit on esc
- s.write(c) #send character
- if convert_outgoing_cr and c == '\r':
- s.write('\n')
- if echo: sys.stdout.write('\n')
-
-
-#print a short help message
-def usage():
- print >>sys.stderr, """USAGE: %s [options]
- Simple Terminal Programm for the serial port.
-
- options:
- -p, --port=PORT: port, a number, defualt = 0 or a device name
- -b, --baud=BAUD: baudrate, default 9600
- -r, --rtscts: enable RTS/CTS flow control (default off)
- -x, --xonxoff: enable software flow control (default off)
- -e, --echo: enable local echo (default off)
- -c, --cr: disable CR -> CR+LF translation
-
- """ % sys.argv[0]
-
-if __name__ == '__main__':
- #parse command line options
- try:
- opts, args = getopt.getopt(sys.argv[1:],
- "hp:b:rxec",
- ["help", "port=", "baud=", "rtscts", "xonxoff", "echo", "cr"])
- except getopt.GetoptError:
- # print help information and exit:
- usage()
- sys.exit(2)
-
- port = 0
- baudrate = 9600
- echo = 0
- convert_outgoing_cr = 1
- rtscts = 0
- xonxoff = 0
- for o, a in opts:
- if o in ("-h", "--help"): #help text
- usage()
- sys.exit()
- elif o in ("-p", "--port"): #specified port
- try:
- port = int(a)
- except ValueError:
- port = a
- elif o in ("-b", "--baud"): #specified baudrate
- try:
- baudrate = int(a)
- except ValueError:
- raise ValueError, "Baudrate must be a integer number"
- elif o in ("-r", "--rtscts"):
- rtscts = 1
- elif o in ("-x", "--xonxoff"):
- xonxoff = 1
- elif o in ("-e", "--echo"):
- echo = 1
- elif o in ("-c", "--cr"):
- convert_outgoing_cr = 0
-
- try:
- s = serial.Serial(port, baudrate, rtscts=rtscts, xonxoff=xonxoff)
- except:
- print "could not open port"
- sys.exit(1)
- print "--- Miniterm --- type Ctrl-D to quit"
- #start serial->console thread
- r = threading.Thread(target=reader)
- r.setDaemon(1)
- r.start()
- #enter console->serial loop
- writer()
-
- print "\n--- exit ---"
diff --git a/pyserial/examples/scan.py b/pyserial/examples/scan.py
deleted file mode 100644
index b34aba3..0000000
--- a/pyserial/examples/scan.py
+++ /dev/null
@@ -1,28 +0,0 @@
-#!/usr/bin/env python
-"""Scan for serial ports
-part of pyserial (http://pyserial.sf.net) (C)2002 cliechti@gmx.net
-
-the scan function of this module tries to open each port number
-from 0 to 255 and it builds a list of those ports where this was
-successful.
-"""
-
-from serial import Serial
-from serial.serialutil import SerialException
-
-def scan():
- """scan for available ports. return a list of tuples (num, name)"""
- available = []
- for i in range(256):
- try:
- s = Serial(i)
- available.append( (i, s.portstr))
- s.close() #explicit close 'cause of delayed GC in java
- except SerialException:
- pass
- return available
-
-if __name__=='__main__':
- print "Found ports:"
- for n,s in scan():
- print "(%d) %s" % (n,s)
diff --git a/pyserial/examples/tcp_serial_redirect.py b/pyserial/examples/tcp_serial_redirect.py
deleted file mode 100644
index 0816ad8..0000000
--- a/pyserial/examples/tcp_serial_redirect.py
+++ /dev/null
@@ -1,119 +0,0 @@
-#!/usr/bin/env python
-
-#(C)2002 Chris Liechti >cliecht@gmx.net>
-#redirect data from a TCP/IP connection to a serial port and vice versa
-#requires python 2.2 'cause socket.sendall is used
-
-#this program is a hack - do not use it as an example of clean
-#threading programming! it's only an example for pyserial.
-
-import sys, os, serial, threading, getopt, socket, time
-
-def reader():
- """loop forever and copy serial->console"""
- global connection
- while 1:
- try:
- if connection:
- connection.sendall(s.read(s.inWaiting()))
- else:
- time.sleep(0.2) #lower CPU usage...
- except socket.error, msg:
- print msg
- if connection: connection.close()
- connection = None
- except:
- pass
-
-def writer():
- """loop forever and copy console->serial"""
- global connection
- try:
- while 1:
- s.write(connection.recv(1024))
- except socket.error, msg:
- print msg
-
-
-#print a short help message
-def usage():
- print >>sys.stderr, """USAGE: %s [options]
- Simple Terminal Programm for the serial port.
-
- options:
- -p, --port=PORT: serial port, a number, defualt = 0 or a device name
- -b, --baud=BAUD: baudrate, default 9600
- -r, --rtscts: enable RTS/CTS flow control (default off)
- -x, --xonxoff: enable software flow control (default off)
- -P, --localport: TCP/IP port on which to run the server (default 7777)
- """ % sys.argv[0]
-
-if __name__ == '__main__':
- connection = None
-
- #parse command line options
- try:
- opts, args = getopt.getopt(sys.argv[1:],
- "hp:b:rxec",
- ["help", "port=", "baud=", "rtscts", "xonxoff", "echo", "cr"])
- except getopt.GetoptError:
- # print help information and exit:
- usage()
- sys.exit(2)
-
- port = 0
- baudrate = 9600
- rtscts = 0
- xonxoff = 0
- localport = 7777
- for o, a in opts:
- if o in ("-h", "--help"): #help text
- usage()
- sys.exit()
- elif o in ("-p", "--port"): #specified port
- try:
- port = int(a)
- except ValueError:
- port = a
- elif o in ("-b", "--baud"): #specified baudrate
- try:
- baudrate = int(a)
- except ValueError:
- raise ValueError, "Baudrate must be a integer number"
- elif o in ("-r", "--rtscts"):
- rtscts = 1
- elif o in ("-x", "--xonxoff"):
- xonxoff = 1
- elif o in ("-P", "--localport"):
- try:
- localport = int(a)
- except ValueError:
- raise ValueError, "local port must be an integer number"
-
- print "--- TCP/IP to Serial redirector --- type Ctrl-C / BREAK to quit"
- #start serial->tcp/ip thread
- r = threading.Thread(target=reader)
- r.setDaemon(1)
- r.start()
-
- try:
- s = serial.Serial(port, baudrate, rtscts=rtscts, xonxoff=xonxoff)
- except:
- print "could not open port"
- sys.exit(1)
-
- srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- srv.bind( ('', localport) )
- srv.listen(1)
- while 1:
- try:
- connection, addr = srv.accept()
- print 'Connected by', addr
- #enter console->serial loop
- writer()
- except socket.error, msg:
- print msg
- if connection: connection.close()
- connection = None
-
- print "\n--- exit ---"
diff --git a/pyserial/examples/test.py b/pyserial/examples/test.py
deleted file mode 100644
index ec8d13a..0000000
--- a/pyserial/examples/test.py
+++ /dev/null
@@ -1,159 +0,0 @@
-#!/usr/bin/env python
-"""Some Tests for the serial module.
-part of pyserial (http://pyserial.sf.net) (C)2002 cliechti@gmx.net
-
-intended to be run on different platforms, to ensure portability of
-the code.
-
-for all these tests a simple hardware is required.
-Loopback HW adapter:
-shortcut these pin pairs:
- TX <-> RX
- RTS <-> CTS
- DTR <-> DSR
-
-on a 9 pole DSUB these are the pins (2-3) (4-6) (7-8)
-
-"""
-
-import unittest, threading, time
-import serial
-
-#of which port should the tests be performed:
-PORT=0
-
-
-class Test4_Nonblocking(unittest.TestCase):
- """Test with timeouts"""
- timeout=0
- def setUp(self):
- self.s = serial.Serial(PORT,timeout=self.timeout)
- def tearDown(self):
- self.s.close()
-
- def test0_Messy(self):
- """NonBlocking (timeout=0)"""
- #this is only here to write out the message in verbose mode
- #because Test3 and Test4 print the same messages
-
- def test1_ReadEmpty(self):
- """timeout: After port open, the input buffer must be empty"""
- self.failUnless(self.s.read(1)=='', "expected empty buffer")
- def test2_Loopback(self):
- """timeout: each sent character should return (binary test).
- this is also a test for the binary capability of a port."""
- for c in map(chr,range(256)):
- self.s.write(c)
- time.sleep(0.02) #there might be a small delay until the character is ready (especialy on win32)
- self.failUnless(self.s.inWaiting()==1, "expected exactly one character for inWainting()")
- self.failUnless(self.s.read(1)==c, "expected an '%s' which was written before" % c)
- self.failUnless(self.s.read(1)=='', "expected empty buffer after all sent chars are read")
- def test2_LoopbackTimeout(self):
- """timeout: test the timeout/immediate return.
- partial results should be returned."""
- self.s.write("HELLO")
- time.sleep(0.02) #there might be a small delay until the character is ready (especialy on win32)
- #read more characters as are available to run in the timeout
- self.failUnless(self.s.read(10)=='HELLO', "expected an 'HELLO' which was written before")
- self.failUnless(self.s.read(1)=='', "expected empty buffer after all sent chars are read")
-
-
-class Test3_Timeout(Test4_Nonblocking):
- """Same tests as the NonBlocking ones but this time with timeout"""
- timeout=1
- def test0_Messy(self):
- """Blocking (timeout=1)"""
- #this is only here to write out the message in verbose mode
- #because Test3 and Test4 print the same messages
-
-class SendEvent(threading.Thread):
- def __init__(self, serial, delay=1):
- threading.Thread.__init__(self)
- self.serial = serial
- self.delay = delay
- self.x = threading.Event()
- self.stopped = 0
- self.start()
- def run(self):
- time.sleep(self.delay)
- if not self.stopped:
- self.serial.write("E")
- self.x.set()
- def isSet(self):
- return self.x.isSet()
- def stop(self):
- self.stopped = 1
- self.x.wait()
-
-class Test1_Forever(unittest.TestCase):
- """Tests a port with no timeout. These tests require that a
- character is sent after some time to stop the test, this is done
- through the SendEvent class and the Loopback HW."""
- def setUp(self):
- self.s = serial.Serial(PORT,timeout=None)
- self.event = SendEvent(self.s)
- def tearDown(self):
- self.event.stop()
- self.s.close()
-
- def test2_ReadEmpty(self):
- """no timeout: after port open, the input buffer must be empty (read).
- a character is sent after some time to terminate the test (SendEvent)."""
- c = self.s.read(1)
- if not (self.event.isSet() and c =='E'):
- self.fail("expected marker")
-
-class Test2_Forever(unittest.TestCase):
- """Tests a port with no timeout"""
- def setUp(self):
- self.s = serial.Serial(PORT,timeout=None)
- def tearDown(self):
- self.s.close()
-
- def test1_inWaitingEmpty(self):
- """no timeout: after port open, the input buffer must be empty (inWaiting)"""
- self.failUnless(self.s.inWaiting()==0, "expected empty buffer")
-
- def test2_Loopback(self):
- """no timeout: each sent character should return (binary test).
- this is also a test for the binary capability of a port."""
- for c in map(chr,range(256)):
- self.s.write(c)
- time.sleep(0.02) #there might be a small delay until the character is ready (especialy on win32)
- self.failUnless(self.s.inWaiting()==1, "expected exactly one character for inWainting()")
- self.failUnless(self.s.read(1)==c, "expected an '%s' which was written before" % c)
- self.failUnless(self.s.inWaiting()==0, "expected empty buffer after all sent chars are read")
-
-
-class Test0_DataWires(unittest.TestCase):
- """Test modem control lines"""
- def setUp(self):
- self.s = serial.Serial(PORT)
- def tearDown(self):
- self.s.close()
-
- def test1_RTS(self):
- """Test RTS/CTS"""
- self.s.setRTS(0)
- self.failUnless(self.s.getCTS()==0, "CTS -> 0")
- self.s.setRTS(1)
- self.failUnless(self.s.getCTS()==1, "CTS -> 1")
-
- def test2_DTR(self):
- """Test DTR/DSR"""
- self.s.setDTR(0)
- self.failUnless(self.s.getDSR()==0, "DSR -> 0")
- self.s.setDTR(1)
- self.failUnless(self.s.getDSR()==1, "DSR -> 1")
-
- def test3_RI(self):
- """Test RI"""
- self.failUnless(self.s.getRI()==0, "RI -> 0")
-
-if __name__ == '__main__':
- import sys
- print __doc__
- print "testing port", PORT
- sys.argv.append('-v')
- # When this module is executed from the command-line, run all its tests
- unittest.main()