summaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
Diffstat (limited to 'examples')
-rw-r--r--examples/enhancedserial.py62
-rw-r--r--examples/miniterm.py325
-rw-r--r--examples/scan.py27
-rw-r--r--examples/scanwin32.py190
-rw-r--r--examples/setup-miniterm-py2exe.py25
-rw-r--r--examples/setup_demo.py35
-rw-r--r--examples/tcp_serial_redirect.py176
-rw-r--r--examples/test.py189
-rw-r--r--examples/test_advanced.py169
-rw-r--r--examples/test_high_load.py69
-rw-r--r--examples/wxSerialConfigDialog.py260
-rw-r--r--examples/wxSerialConfigDialog.wxg262
-rw-r--r--examples/wxTerminal.py333
-rw-r--r--examples/wxTerminal.wxg127
14 files changed, 2249 insertions, 0 deletions
diff --git a/examples/enhancedserial.py b/examples/enhancedserial.py
new file mode 100644
index 0000000..2c81ae1
--- /dev/null
+++ b/examples/enhancedserial.py
@@ -0,0 +1,62 @@
+#!/usr/bin/env python
+"""Enhanced Serial Port class
+part of pyserial (http://pyserial.sf.net) (C)2002 cliechti@gmx.net
+
+another implementation of the readline and readlines method.
+this one should be more efficient because a bunch of characters are read
+on each access, but the drawback is that a timeout must be specified to
+make it work (enforced by the class __init__).
+
+this class could be enhanced with a read_until() method and more
+like found in the telnetlib.
+"""
+
+from serial import Serial
+
+class EnhancedSerial(Serial):
+ def __init__(self, *args, **kwargs):
+ #ensure that a reasonable timeout is set
+ timeout = kwargs.get('timeout',0.1)
+ if timeout < 0.01: timeout = 0.1
+ kwargs['timeout'] = timeout
+ Serial.__init__(self, *args, **kwargs)
+ self.buf = ''
+
+ def readline(self, maxsize=None, timeout=1):
+ """maxsize is ignored, timeout in seconds is the max time that is way for a complete line"""
+ tries = 0
+ while 1:
+ self.buf += self.read(512)
+ pos = self.buf.find('\n')
+ if pos >= 0:
+ line, self.buf = self.buf[:pos+1], self.buf[pos+1:]
+ return line
+ tries += 1
+ if tries * self.timeout > timeout:
+ break
+ line, self.buf = self.buf, ''
+ return line
+
+ def readlines(self, sizehint=None, timeout=1):
+ """read all lines that are available. abort after timout
+ when no more data arrives."""
+ lines = []
+ while 1:
+ line = self.readline(timeout=timeout)
+ if line:
+ lines.append(line)
+ if not line or line[-1:] != '\n':
+ break
+ return lines
+
+if __name__=='__main__':
+ #do some simple tests with a Loopback HW (see test.py for details)
+ PORT = 0
+ #test, only with Loopback HW (shortcut RX/TX pins (3+4 on DSUB 9 and 25) )
+ s = EnhancedSerial(PORT)
+ #write out some test data lines
+ s.write('\n'.join("hello how are you".split()))
+ #and read them back
+ print s.readlines()
+ #this one should print an empty list
+ print s.readlines(timeout=0.4)
diff --git a/examples/miniterm.py b/examples/miniterm.py
new file mode 100644
index 0000000..613af3d
--- /dev/null
+++ b/examples/miniterm.py
@@ -0,0 +1,325 @@
+#!/usr/bin/env python
+
+# Very simple serial terminal
+# (C)2002-2006 Chris Liechti <cliechti@gmx.net>
+
+# Input characters are sent directly (only LF -> CR/LF/CRLF translation is
+# done), received characters are displayed as is (or escaped trough pythons
+# repr, useful for debug purposes)
+
+
+import sys, os, serial, threading
+
+EXITCHARCTER = '\x1d' #GS/ctrl+]
+UPLOADCHARACTER = '\x15' # Upload: ctrl+u
+
+#first choose a platform dependant way to read single characters from the console
+global console
+
+if os.name == 'nt':
+ import msvcrt
+ class Console:
+ def __init__(self):
+ pass
+
+ def setup(self):
+ pass # Do nothing for 'nt'
+
+ def cleanup(self):
+ pass # Do nothing for 'nt'
+
+ def getkey():
+ while 1:
+ z = msvcrt.getch()
+ if z == '\0' or z == '\xe0': #functions keys
+ msvcrt.getch()
+ else:
+ if z == '\r':
+ return '\n'
+ return z
+
+ console = Console()
+
+elif os.name == 'posix':
+ import termios, sys, os
+ class Console:
+ def __init__(self):
+ self.fd = sys.stdin.fileno()
+
+ def setup(self):
+ self.old = termios.tcgetattr(self.fd)
+ new = termios.tcgetattr(self.fd)
+ new[3] = new[3] & ~termios.ICANON & ~termios.ECHO & ~termios.ISIG
+ new[6][termios.VMIN] = 1
+ new[6][termios.VTIME] = 0
+ termios.tcsetattr(self.fd, termios.TCSANOW, new)
+ #s = '' # We'll save the characters typed and add them to the pool.
+
+ def getkey(self):
+ c = os.read(self.fd, 1)
+ return c
+
+ def cleanup(self):
+ termios.tcsetattr(self.fd, termios.TCSAFLUSH, self.old)
+
+ console = Console()
+
+ def cleanup_console():
+ console.cleanup()
+
+ console.setup()
+ sys.exitfunc = cleanup_console #terminal modes have to be restored on exit...
+
+else:
+ raise "Sorry no implementation for your platform (%s) available." % sys.platform
+
+CONVERT_CRLF = 2
+CONVERT_CR = 1
+CONVERT_LF = 0
+NEWLINE_CONVERISON_MAP = ('\n', '\r', '\r\n')
+
+class Miniterm:
+ def __init__(self, port, baudrate, parity, rtscts, xonxoff, echo=False, convert_outgoing=CONVERT_CRLF, repr_mode=0):
+ self.serial = serial.Serial(port, baudrate, parity=parity, rtscts=rtscts, xonxoff=xonxoff, timeout=0.7)
+ self.echo = echo
+ self.repr_mode = repr_mode
+ self.convert_outgoing = convert_outgoing
+ self.newline = NEWLINE_CONVERISON_MAP[self.convert_outgoing]
+
+ def start(self):
+ self.alive = True
+ #start serial->console thread
+ self.receiver_thread = threading.Thread(target=self.reader)
+ self.receiver_thread.setDaemon(1)
+ self.receiver_thread.start()
+ #enter console->serial loop
+ self.transmitter_thread = threading.Thread(target=self.writer)
+ self.transmitter_thread.setDaemon(1)
+ self.transmitter_thread.start()
+
+ def stop(self):
+ self.alive = False
+
+ def join(self, transmit_only=False):
+ self.transmitter_thread.join()
+ if not transmit_only:
+ self.receiver_thread.join()
+
+ def reader(self):
+ """loop and copy serial->console"""
+ while self.alive:
+ data = self.serial.read(1)
+
+ if self.repr_mode == 0:
+ # direct output, just have to care about newline setting
+ if data == '\r' and self.convert_outgoing == CONVERT_CR:
+ sys.stdout.write('\n')
+ else:
+ sys.stdout.write(data)
+ elif self.repr_mode == 1:
+ # escape non-printable, let pass newlines
+ if self.convert_outgoing == CONVERT_CRLF and data in '\r\n':
+ if data == '\n':
+ sys.stdout.write('\n')
+ elif data == '\r':
+ pass
+ elif data == '\n' and self.convert_outgoing == CONVERT_LF:
+ sys.stdout.write('\n')
+ elif data == '\r' and self.convert_outgoing == CONVERT_CR:
+ sys.stdout.write('\n')
+ else:
+ sys.stdout.write(repr(data)[1:-1])
+ elif self.repr_mode == 2:
+ # escape all non-printable, including newline
+ sys.stdout.write(repr(data)[1:-1])
+ elif self.repr_mode == 3:
+ # escape everything (hexdump)
+ for character in data:
+ sys.stdout.write("%s " % character.encode('hex'))
+ sys.stdout.flush()
+
+
+ def writer(self):
+ """loop and copy console->serial until EXITCHARCTER character is found"""
+ while self.alive:
+ try:
+ c = console.getkey()
+ except KeyboardInterrupt:
+ c = '\x03'
+ if c == EXITCHARCTER:
+ self.stop()
+ break # exit app
+ elif c == UPLOADCHARACTER: # upload text file
+ sys.stderr.write('\nFile to upload: ')
+ sys.stderr.flush()
+ console.cleanup()
+ filename = sys.stdin.readline().rstrip('\r\n')
+ if filename != '':
+ try:
+ file = open(filename, 'r')
+ sys.stderr.write('Sending file %s ' % filename)
+ while True:
+ line = file.readline().rstrip('\r\n')
+ if not line:
+ break
+ self.serial.write(line)
+ self.serial.write('\r\n')
+ # Wait for output buffer to drain.
+ self.serial.flush()
+ sys.stderr.write('.') # Progress indicator.
+ sys.stderr.write('\nFile %s sent.\n' % filename)
+ except IOError:
+ print 'Error opening file %s' % filename
+ console.setup()
+
+ elif c == '\n':
+ self.serial.write(self.newline) # send newline character(s)
+ if self.echo:
+ sys.stdout.write(c) # local echo is a real newline in any case
+ else:
+ self.serial.write(c) # send character
+ if self.echo:
+ sys.stdout.write(c)
+
+def key_description(character):
+ """generate a readable description for a key"""
+ ascii_code = ord(character)
+ if ascii_code < 32:
+ return 'Ctrl+%c' % (ord('@') + ascii_code)
+ else:
+ return repr(ascii_code)
+
+
+def main():
+ import optparse
+
+ parser = optparse.OptionParser(usage="""\
+%prog [options] [port [baudrate]]
+
+Miniterm - A simple terminal program for the serial port.""")
+
+ parser.add_option("-p", "--port", dest="port",
+ help="port, a number (default 0) or a device name (deprecated option)",
+ default=None)
+
+ parser.add_option("-b", "--baud", dest="baudrate", action="store", type='int',
+ help="set baudrate, default 9600", default=9600)
+
+ parser.add_option("", "--parity", dest="parity", action="store",
+ help="set parity, one of [N, E, O], default=N", default='N')
+
+ parser.add_option("-e", "--echo", dest="echo", action="store_true",
+ help="enable local echo (default off)", default=False)
+
+ parser.add_option("", "--rtscts", dest="rtscts", action="store_true",
+ help="enable RTS/CTS flow control (default off)", default=False)
+
+ parser.add_option("", "--xonxoff", dest="xonxoff", action="store_true",
+ help="enable software flow control (default off)", default=False)
+
+ parser.add_option("", "--cr", dest="cr", action="store_true",
+ help="do not send CR+LF, send CR only", default=False)
+
+ parser.add_option("", "--lf", dest="lf", action="store_true",
+ help="do not send CR+LF, send LF only", default=False)
+
+ parser.add_option("-D", "--debug", dest="repr_mode", action="count",
+ help="""debug received data (escape non-printable chars)
+--debug can be given multiple times:
+0: just print what is received
+1: escape non-printable characters, do newlines as ususal
+2: escape non-printable characters, newlines too
+3: hex dump everything""", default=0)
+
+ parser.add_option("", "--rts", dest="rts_state", action="store", type='int',
+ help="set initial RTS line state (possible values: 0, 1)", default=None)
+
+ parser.add_option("", "--dtr", dest="dtr_state", action="store", type='int',
+ help="set initial DTR line state (possible values: 0, 1)", default=None)
+
+ parser.add_option("-q", "--quiet", dest="quiet", action="store_true",
+ help="suppress non error messages", default=False)
+
+ parser.add_option("", "--exit-char", dest="exit_char", action="store", type='int',
+ help="ASCII code of special charcter that is used to exit the application", default=0x1d)
+
+ parser.add_option("", "--upload-char", dest="upload_char", action="store", type='int',
+ help="ASCII code of special charcter that is used to send a file", default=0x15)
+
+ (options, args) = parser.parse_args()
+
+ if options.cr and options.lf:
+ parser.error("ony one of --cr or --lf can be specified")
+
+ global EXITCHARCTER, UPLOADCHARACTER
+ EXITCHARCTER = chr(options.exit_char)
+ UPLOADCHARACTER = chr(options.upload_char)
+
+ port = options.port
+ baudrate = options.baudrate
+ if args:
+ if options.port is not None:
+ parser.error("no arguments are allowed, options only when --port is given")
+ port = args.pop(0)
+ if args:
+ try:
+ baudrate = int(args[0])
+ except ValueError:
+ parser.error("baudrate must be a number, not %r" % args[0])
+ args.pop(0)
+ if args:
+ parser.error("too many arguments")
+ else:
+ if port is None: port = 0
+
+ convert_outgoing = CONVERT_CRLF
+ if options.cr:
+ convert_outgoing = CONVERT_CR
+ elif options.lf:
+ convert_outgoing = CONVERT_LF
+
+ try:
+ miniterm = Miniterm(
+ port,
+ baudrate,
+ options.parity,
+ rtscts=options.rtscts,
+ xonxoff=options.xonxoff,
+ echo=options.echo,
+ convert_outgoing=convert_outgoing,
+ repr_mode=options.repr_mode,
+ )
+ except serial.SerialException:
+ sys.stderr.write("could not open port %r\n" % port)
+ sys.exit(1)
+
+ if not options.quiet:
+ sys.stderr.write('--- Miniterm on %s: %d,%s,%s,%s. ---\n' % (
+ miniterm.serial.portstr,
+ miniterm.serial.baudrate,
+ miniterm.serial.bytesize,
+ miniterm.serial.parity,
+ miniterm.serial.stopbits,
+ ))
+ sys.stderr.write('--- Quit: %s | Upload: %s ---\n' % (
+ key_description(EXITCHARCTER),
+ key_description(UPLOADCHARACTER)
+ ))
+ if options.dtr_state is not None:
+ if not options.quiet:
+ sys.stderr.write('--- forcing DTR %s\n' % (options.dtr_state and 'active' or 'inactive'))
+ miniterm.serial.setDTR(options.dtr_state)
+ if options.rts_state is not None:
+ if not options.quiet:
+ sys.stderr.write('--- forcing RTS %s\n' % (options.rts_state and 'active' or 'inactive'))
+ miniterm.serial.setRTS(options.rts_state)
+
+ miniterm.start()
+ miniterm.join(True)
+ if not options.quiet:
+ sys.stderr.write("\n--- exit ---\n")
+ miniterm.join()
+
+
+if __name__ == '__main__':
+ main()
diff --git a/examples/scan.py b/examples/scan.py
new file mode 100644
index 0000000..439818c
--- /dev/null
+++ b/examples/scan.py
@@ -0,0 +1,27 @@
+#!/usr/bin/env python
+"""Scan for serial ports.
+Part of pySerial (http://pyserial.sf.net) (C)2002-2003 <cliechti@gmx.net>
+
+The scan function of this module tries to open each port number
+from 0 to 255 and it builds a list of those ports where this was
+successful.
+"""
+
+import serial
+
+def scan():
+ """scan for available ports. return a list of tuples (num, name)"""
+ available = []
+ for i in range(256):
+ try:
+ s = serial.Serial(i)
+ available.append( (i, s.portstr))
+ s.close() #explicit close 'cause of delayed GC in java
+ except serial.SerialException:
+ pass
+ return available
+
+if __name__=='__main__':
+ print "Found ports:"
+ for n,s in scan():
+ print "(%d) %s" % (n,s)
diff --git a/examples/scanwin32.py b/examples/scanwin32.py
new file mode 100644
index 0000000..eb5f544
--- /dev/null
+++ b/examples/scanwin32.py
@@ -0,0 +1,190 @@
+import ctypes
+import re
+
+def ValidHandle(value):
+ if value == 0:
+ raise ctypes.WinError()
+ return value
+
+NULL = 0
+HDEVINFO = ctypes.c_int
+BOOL = ctypes.c_int
+CHAR = ctypes.c_char
+PCTSTR = ctypes.c_char_p
+HWND = ctypes.c_uint
+DWORD = ctypes.c_ulong
+PDWORD = ctypes.POINTER(DWORD)
+ULONG = ctypes.c_ulong
+ULONG_PTR = ctypes.POINTER(ULONG)
+#~ PBYTE = ctypes.c_char_p
+PBYTE = ctypes.c_void_p
+
+class GUID(ctypes.Structure):
+ _fields_ = [
+ ('Data1', ctypes.c_ulong),
+ ('Data2', ctypes.c_ushort),
+ ('Data3', ctypes.c_ushort),
+ ('Data4', ctypes.c_ubyte*8),
+ ]
+ def __str__(self):
+ return "{%08x-%04x-%04x-%s-%s}" % (
+ self.Data1,
+ self.Data2,
+ self.Data3,
+ ''.join(["%02x" % d for d in self.Data4[:2]]),
+ ''.join(["%02x" % d for d in self.Data4[2:]]),
+ )
+
+class SP_DEVINFO_DATA(ctypes.Structure):
+ _fields_ = [
+ ('cbSize', DWORD),
+ ('ClassGuid', GUID),
+ ('DevInst', DWORD),
+ ('Reserved', ULONG_PTR),
+ ]
+ def __str__(self):
+ return "ClassGuid:%s DevInst:%s" % (self.ClassGuid, self.DevInst)
+PSP_DEVINFO_DATA = ctypes.POINTER(SP_DEVINFO_DATA)
+
+class SP_DEVICE_INTERFACE_DATA(ctypes.Structure):
+ _fields_ = [
+ ('cbSize', DWORD),
+ ('InterfaceClassGuid', GUID),
+ ('Flags', DWORD),
+ ('Reserved', ULONG_PTR),
+ ]
+ def __str__(self):
+ return "InterfaceClassGuid:%s Flags:%s" % (self.InterfaceClassGuid, self.Flags)
+PSP_DEVICE_INTERFACE_DATA = ctypes.POINTER(SP_DEVICE_INTERFACE_DATA)
+
+PSP_DEVICE_INTERFACE_DETAIL_DATA = ctypes.c_void_p
+
+SetupDiDestroyDeviceInfoList = ctypes.windll.setupapi.SetupDiDestroyDeviceInfoList
+SetupDiDestroyDeviceInfoList.argtypes = [HDEVINFO]
+SetupDiDestroyDeviceInfoList.restype = BOOL
+
+SetupDiGetClassDevs = ctypes.windll.setupapi.SetupDiGetClassDevsA
+SetupDiGetClassDevs.argtypes = [ctypes.POINTER(GUID), PCTSTR, HWND, DWORD]
+SetupDiGetClassDevs.restype = ValidHandle #HDEVINFO
+
+SetupDiEnumDeviceInterfaces = ctypes.windll.setupapi.SetupDiEnumDeviceInterfaces
+SetupDiEnumDeviceInterfaces.argtypes = [HDEVINFO, PSP_DEVINFO_DATA, ctypes.POINTER(GUID), DWORD, PSP_DEVICE_INTERFACE_DATA]
+SetupDiEnumDeviceInterfaces.restype = BOOL
+
+SetupDiGetDeviceInterfaceDetail = ctypes.windll.setupapi.SetupDiGetDeviceInterfaceDetailA
+SetupDiGetDeviceInterfaceDetail.argtypes = [HDEVINFO, PSP_DEVICE_INTERFACE_DATA, PSP_DEVICE_INTERFACE_DETAIL_DATA, DWORD, PDWORD, PSP_DEVINFO_DATA]
+SetupDiGetDeviceInterfaceDetail.restype = BOOL
+
+SetupDiGetDeviceRegistryProperty = ctypes.windll.setupapi.SetupDiGetDeviceRegistryPropertyA
+SetupDiGetDeviceRegistryProperty.argtypes = [HDEVINFO, PSP_DEVINFO_DATA, DWORD, PDWORD, PBYTE, DWORD, PDWORD]
+SetupDiGetDeviceRegistryProperty.restype = BOOL
+
+
+GUID_CLASS_COMPORT = GUID(0x86e0d1e0L, 0x8089, 0x11d0,
+ (ctypes.c_ubyte*8)(0x9c, 0xe4, 0x08, 0x00, 0x3e, 0x30, 0x1f, 0x73))
+
+DIGCF_PRESENT = 2
+DIGCF_DEVICEINTERFACE = 16
+INVALID_HANDLE_VALUE = 0
+ERROR_INSUFFICIENT_BUFFER = 122
+SPDRP_HARDWAREID = 1
+SPDRP_FRIENDLYNAME = 12
+ERROR_NO_MORE_ITEMS = 259
+
+def comports(available_only=True):
+ """This generator scans the device registry for com ports and yields port, desc, hwid.
+ If available_only is true only return currently existing ports."""
+ flags = DIGCF_DEVICEINTERFACE
+ if available_only:
+ flags |= DIGCF_PRESENT
+ g_hdi = SetupDiGetClassDevs(ctypes.byref(GUID_CLASS_COMPORT), None, NULL, flags);
+ #~ for i in range(256):
+ for dwIndex in range(256):
+ did = SP_DEVICE_INTERFACE_DATA()
+ did.cbSize = ctypes.sizeof(did)
+
+ if not SetupDiEnumDeviceInterfaces(
+ g_hdi,
+ None,
+ ctypes.byref(GUID_CLASS_COMPORT),
+ dwIndex,
+ ctypes.byref(did)
+ ):
+ if ctypes.GetLastError() != ERROR_NO_MORE_ITEMS:
+ raise ctypes.WinError()
+ break
+
+ dwNeeded = DWORD()
+ # get the size
+ if not SetupDiGetDeviceInterfaceDetail(
+ g_hdi,
+ ctypes.byref(did),
+ None, 0, ctypes.byref(dwNeeded),
+ None
+ ):
+ # Ignore ERROR_INSUFFICIENT_BUFFER
+ if ctypes.GetLastError() != ERROR_INSUFFICIENT_BUFFER:
+ raise ctypes.WinError()
+ # allocate buffer
+ class SP_DEVICE_INTERFACE_DETAIL_DATA_A(ctypes.Structure):
+ _fields_ = [
+ ('cbSize', DWORD),
+ ('DevicePath', CHAR*(dwNeeded.value - ctypes.sizeof(DWORD))),
+ ]
+ def __str__(self):
+ return "DevicePath:%s" % (self.DevicePath,)
+ idd = SP_DEVICE_INTERFACE_DETAIL_DATA_A()
+ idd.cbSize = 5
+ devinfo = SP_DEVINFO_DATA()
+ devinfo.cbSize = ctypes.sizeof(devinfo)
+ if not SetupDiGetDeviceInterfaceDetail(
+ g_hdi,
+ ctypes.byref(did),
+ ctypes.byref(idd), dwNeeded, None,
+ ctypes.byref(devinfo)
+ ):
+ raise ctypes.WinError()
+
+ # hardware ID
+ szHardwareID = ctypes.create_string_buffer('\0' * 250)
+ if not SetupDiGetDeviceRegistryProperty(
+ g_hdi,
+ ctypes.byref(devinfo),
+ SPDRP_HARDWAREID,
+ None,
+ ctypes.byref(szHardwareID), ctypes.sizeof(szHardwareID) - 1,
+ None
+ ):
+ # Ignore ERROR_INSUFFICIENT_BUFFER
+ if ctypes.GetLastError() != ERROR_INSUFFICIENT_BUFFER:
+ raise ctypes.WinError()
+
+ # friendly name
+ szFriendlyName = ctypes.create_string_buffer('\0' * 250)
+ if not SetupDiGetDeviceRegistryProperty(
+ g_hdi,
+ ctypes.byref(devinfo),
+ SPDRP_FRIENDLYNAME,
+ None,
+ ctypes.byref(szFriendlyName), ctypes.sizeof(szFriendlyName) - 1,
+ None
+ ):
+ # Ignore ERROR_INSUFFICIENT_BUFFER
+ if ctypes.GetLastError() != ERROR_INSUFFICIENT_BUFFER:
+ raise ctypes.WinError()
+ port_name = re.search(r"\((.*)\)", szFriendlyName.value).group(1)
+ yield port_name, szFriendlyName.value, szHardwareID.value
+
+ SetupDiDestroyDeviceInfoList(g_hdi)
+
+
+if __name__ == '__main__':
+ import serial
+ for port, desc, hwid in comports():
+ print "%s: %s (%s)" % (port, desc, hwid)
+ print " "*10, serial.Serial(port) #test open
+
+ # list of all ports the system knows
+ print "-"*60
+ for port, desc, hwid in comports(False):
+ print "%-10s: %s (%s)" % (port, desc, hwid)
diff --git a/examples/setup-miniterm-py2exe.py b/examples/setup-miniterm-py2exe.py
new file mode 100644
index 0000000..c28457a
--- /dev/null
+++ b/examples/setup-miniterm-py2exe.py
@@ -0,0 +1,25 @@
+# setup script for py2exe to create the miniterm.exe
+# $Id: setup-miniterm-py2exe.py,v 1.1 2005-09-21 19:51:19 cliechti Exp $
+
+from distutils.core import setup
+import glob, sys, py2exe, os
+
+sys.path.append('..')
+
+sys.argv.extend("py2exe --bundle 1".split())
+
+setup(
+ name='miniterm',
+ #~ version='0.5',
+ zipfile=None,
+ options = {"py2exe":
+ {
+ 'dist_dir': 'bin',
+ 'excludes': ['javax.comm'],
+ }
+ },
+ console = [
+ #~ "miniterm_exe_wrapper.py",
+ "miniterm.py",
+ ],
+)
diff --git a/examples/setup_demo.py b/examples/setup_demo.py
new file mode 100644
index 0000000..854c0b9
--- /dev/null
+++ b/examples/setup_demo.py
@@ -0,0 +1,35 @@
+# This is a setup.py example script for the use with py2exe
+from distutils.core import setup
+import py2exe
+import sys, os
+
+#this script is only useful for py2exe so just run that distutils command.
+#that allows to run it with a simple double click.
+sys.argv.append('py2exe')
+
+#get an icon from somewhere.. the python installation should have one:
+icon = os.path.join(os.path.dirname(sys.executable), 'py.ico')
+
+setup(
+ options = {'py2exe': {
+ 'excludes': ['javax.comm'],
+ 'optimize': 2,
+ 'dist_dir': 'dist',
+ }
+ },
+
+ name = "wxTerminal",
+ windows = [
+ {
+ 'script': "wxTerminal.py",
+ 'icon_resources': [(0x0004, icon)]
+ },
+ ],
+ zipfile = "stuff.lib",
+
+ description = "Simple serial terminal application",
+ version = "0.1",
+ author = "Chris Liechti",
+ author_email = "cliechti@gmx.net",
+ url = "http://pyserial.sf.net",
+)
diff --git a/examples/tcp_serial_redirect.py b/examples/tcp_serial_redirect.py
new file mode 100644
index 0000000..4c04294
--- /dev/null
+++ b/examples/tcp_serial_redirect.py
@@ -0,0 +1,176 @@
+#!/usr/bin/env python
+
+# (C) 2002-2006 Chris Liechti <cliechti@gmx.net>
+# redirect data from a TCP/IP connection to a serial port and vice versa
+# requires Python 2.2 'cause socket.sendall is used
+
+
+import sys, os, serial, threading, socket
+
+try:
+ True
+except NameError:
+ True = 1
+ False = 0
+
+class Redirector:
+ def __init__(self, serial, socket):
+ self.serial = serial
+ self.socket = socket
+
+ def shortcut(self):
+ """connect the serial port to the tcp port by copying everything
+ from one side to the other"""
+ self.alive = True
+ self.thread_read = threading.Thread(target=self.reader)
+ self.thread_read.setDaemon(1)
+ self.thread_read.start()
+ self.writer()
+
+ def reader(self):
+ """loop forever and copy serial->socket"""
+ while self.alive:
+ try:
+ data = self.serial.read(1) #read one, blocking
+ n = self.serial.inWaiting() #look if there is more
+ if n:
+ data = data + self.serial.read(n) #and get as much as possible
+ if data:
+ self.socket.sendall(data) #send it over TCP
+ except socket.error, msg:
+ print msg
+ #probably got disconnected
+ break
+ self.alive = False
+
+ def writer(self):
+ """loop forever and copy socket->serial"""
+ while self.alive:
+ try:
+ data = self.socket.recv(1024)
+ if not data:
+ break
+ self.serial.write(data) #get a bunch of bytes and send them
+ except socket.error, msg:
+ print msg
+ #probably got disconnected
+ break
+ self.alive = False
+ self.thread_read.join()
+
+ def stop(self):
+ """Stop copying"""
+ if self.alive:
+ self.alive = False
+ self.thread_read.join()
+
+
+if __name__ == '__main__':
+ import optparse
+
+ parser = optparse.OptionParser(usage="""\
+%prog [options] [port [baudrate]]
+Simple Serial to Network (TCP/IP) redirector.
+
+Note: no security measures are implemeted. Anyone can remotely connect
+to this service over the network.
+Only one connection at once is supported. When the connection is terminated
+it waits for the next connect.
+""")
+ parser.add_option("-p", "--port", dest="port",
+ help="port, a number (default 0) or a device name (deprecated option)",
+ default=None)
+
+ parser.add_option("-b", "--baud", dest="baudrate", action="store", type='int',
+ help="set baudrate, default 9600", default=9600)
+
+ parser.add_option("", "--parity", dest="parity", action="store",
+ help="set parity, one of [N, E, O], default=N", default='N')
+
+ parser.add_option("", "--rtscts", dest="rtscts", action="store_true",
+ help="enable RTS/CTS flow control (default off)", default=False)
+
+ parser.add_option("", "--xonxoff", dest="xonxoff", action="store_true",
+ help="enable software flow control (default off)", default=False)
+
+ parser.add_option("", "--cr", dest="cr", action="store_true",
+ help="do not send CR+LF, send CR only", default=False)
+
+ parser.add_option("", "--lf", dest="lf", action="store_true",
+ help="do not send CR+LF, send LF only", default=False)
+
+ parser.add_option("", "--rts", dest="rts_state", action="store", type='int',
+ help="set initial RTS line state (possible values: 0, 1)", default=None)
+
+ parser.add_option("", "--dtr", dest="dtr_state", action="store", type='int',
+ help="set initial DTR line state (possible values: 0, 1)", default=None)
+
+ parser.add_option("-q", "--quiet", dest="quiet", action="store_true",
+ help="suppress non error messages", default=False)
+
+ parser.add_option("-P", "--localport", dest="local_port", action="store", type='int',
+ help="local TCP port", default=7777)
+
+
+ (options, args) = parser.parse_args()
+
+ port = options.port
+ baudrate = options.baudrate
+ if args:
+ if options.port is not None:
+ parser.error("no arguments are allowed, options only when --port is given")
+ port = args.pop(0)
+ if args:
+ try:
+ baudrate = int(args[0])
+ except ValueError:
+ parser.error("baudrate must be a number, not %r" % args[0])
+ args.pop(0)
+ if args:
+ parser.error("too many arguments")
+ else:
+ if port is None: port = 0
+
+ if options.cr and options.lf:
+ parser.error("ony one of --cr or --lf can be specified")
+
+ ser = serial.Serial()
+ ser.port = port
+ ser.baudrate = baudrate
+ ser.rtscts = options.rtscts
+ ser.xonxoff = options.xonxoff
+ ser.timeout = 1 #required so that the reader thread can exit
+
+ if not options.quiet:
+ print "--- TCP/IP to Serial redirector --- type Ctrl-C / BREAK to quit"
+ print "--- %s %s,%s,%s,%s ---" % (ser.portstr, ser.baudrate, 8, ser.parity, 1)
+
+ try:
+ ser.open()
+ except serial.SerialException, e:
+ print "Could not open serial port %s: %s" % (ser.portstr, e)
+ sys.exit(1)
+
+ if options.rts_state is not None:
+ ser.setRTS(options.rts_state)
+
+ if options.dtr_state is not None:
+ ser.setDTR(options.dtr_state)
+
+ srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ srv.bind( ('', options.local_port) )
+ srv.listen(1)
+ while 1:
+ try:
+ print "Waiting for connection on %s..." % options.local_port
+ connection, addr = srv.accept()
+ print 'Connected by', addr
+ #enter console->serial loop
+ r = Redirector(ser, connection)
+ r.shortcut()
+ print 'Disconnected'
+ connection.close()
+ except socket.error, msg:
+ print msg
+
+ print "\n--- exit ---"
diff --git a/examples/test.py b/examples/test.py
new file mode 100644
index 0000000..a059d1e
--- /dev/null
+++ b/examples/test.py
@@ -0,0 +1,189 @@
+#! /usr/bin/env python
+# Python Serial Port Extension for Win32, Linux, BSD, Jython
+# see __init__.py
+#
+# (C) 2001-2008 Chris Liechti <cliechti@gmx.net>
+# this is distributed under a free software license, see license.txt
+
+"""\
+Some tests for the serial module.
+Part of pyserial (http://pyserial.sf.net) (C)2001-2008 cliechti@gmx.net
+
+Intended to be run on different platforms, to ensure portability of
+the code.
+
+For all these tests a simple hardware is required.
+Loopback HW adapter:
+Shortcut these pin pairs:
+ TX <-> RX
+ RTS <-> CTS
+ DTR <-> DSR
+
+On a 9 pole DSUB these are the pins (2-3) (4-6) (7-8)
+"""
+
+import unittest, threading, time
+import serial
+
+#on which port should the tests be performed:
+PORT=0
+
+
+class Test4_Nonblocking(unittest.TestCase):
+ """Test with timeouts"""
+ timeout=0
+ def setUp(self):
+ self.s = serial.Serial(PORT,timeout=self.timeout)
+ def tearDown(self):
+ self.s.close()
+
+ def test0_Messy(self):
+ """NonBlocking (timeout=0)"""
+ #this is only here to write out the message in verbose mode
+ #because Test3 and Test4 print the same messages
+
+ def test1_ReadEmpty(self):
+ """timeout: After port open, the input buffer must be empty"""
+ self.failUnless(self.s.read(1)=='', "expected empty buffer")
+ def test2_Loopback(self):
+ """timeout: each sent character should return (binary test).
+ this is also a test for the binary capability of a port."""
+ for c in map(chr,range(256)):
+ self.s.write(c)
+ time.sleep(0.02) #there might be a small delay until the character is ready (especialy on win32)
+ self.failUnless(self.s.inWaiting()==1, "expected exactly one character for inWainting()")
+ self.failUnless(self.s.read(1)==c, "expected a '%s' which was written before" % c)
+ self.failUnless(self.s.read(1)=='', "expected empty buffer after all sent chars are read")
+ def test2_LoopbackTimeout(self):
+ """timeout: test the timeout/immediate return.
+ partial results should be returned."""
+ self.s.write("HELLO")
+ time.sleep(0.1) #there might be a small delay until the character is ready (especialy on win32)
+ #read more characters as are available to run in the timeout
+ self.failUnless(self.s.read(10)=='HELLO', "expected the 'HELLO' which was written before")
+ self.failUnless(self.s.read(1)=='', "expected empty buffer after all sent chars are read")
+
+
+class Test3_Timeout(Test4_Nonblocking):
+ """Same tests as the NonBlocking ones but this time with timeout"""
+ timeout=1
+ def test0_Messy(self):
+ """Blocking (timeout=1)"""
+ #this is only here to write out the message in verbose mode
+ #because Test3 and Test4 print the same messages
+
+class SendEvent(threading.Thread):
+ def __init__(self, serial, delay=1):
+ threading.Thread.__init__(self)
+ self.serial = serial
+ self.delay = delay
+ self.x = threading.Event()
+ self.stopped = 0
+ self.start()
+ def run(self):
+ time.sleep(self.delay)
+ if not self.stopped:
+ self.serial.write("E")
+ self.x.set()
+ def isSet(self):
+ return self.x.isSet()
+ def stop(self):
+ self.stopped = 1
+ self.x.wait()
+
+class Test1_Forever(unittest.TestCase):
+ """Tests a port with no timeout. These tests require that a
+ character is sent after some time to stop the test, this is done
+ through the SendEvent class and the Loopback HW."""
+ def setUp(self):
+ self.s = serial.Serial(PORT, timeout=None)
+ self.event = SendEvent(self.s)
+ def tearDown(self):
+ self.event.stop()
+ self.s.close()
+
+ def test2_ReadEmpty(self):
+ """no timeout: after port open, the input buffer must be empty (read).
+ a character is sent after some time to terminate the test (SendEvent)."""
+ c = self.s.read(1)
+ if not (self.event.isSet() and c == 'E'):
+ self.fail("expected marker")
+
+class Test2_Forever(unittest.TestCase):
+ """Tests a port with no timeout"""
+ def setUp(self):
+ self.s = serial.Serial(PORT,timeout=None)
+ def tearDown(self):
+ self.s.close()
+
+ def test1_inWaitingEmpty(self):
+ """no timeout: after port open, the input buffer must be empty (inWaiting)"""
+ self.failUnless(self.s.inWaiting()==0, "expected empty buffer")
+
+ def test2_Loopback(self):
+ """no timeout: each sent character should return (binary test).
+ this is also a test for the binary capability of a port."""
+ for c in map(chr,range(256)):
+ self.s.write(c)
+ time.sleep(0.02) #there might be a small delay until the character is ready (especialy on win32)
+ self.failUnless(self.s.inWaiting()==1, "expected exactly one character for inWainting()")
+ self.failUnless(self.s.read(1)==c, "expected an '%s' which was written before" % c)
+ self.failUnless(self.s.inWaiting()==0, "expected empty buffer after all sent chars are read")
+
+
+class Test0_DataWires(unittest.TestCase):
+ """Test modem control lines"""
+ def setUp(self):
+ self.s = serial.Serial(PORT)
+ def tearDown(self):
+ self.s.close()
+
+ def test1_RTS(self):
+ """Test RTS/CTS"""
+ self.s.setRTS(0)
+ self.failUnless(not self.s.getCTS(), "CTS -> 0")
+ self.s.setRTS(1)
+ self.failUnless(self.s.getCTS(), "CTS -> 1")
+
+ def test2_DTR(self):
+ """Test DTR/DSR"""
+ self.s.setDTR(0)
+ self.failUnless(not self.s.getDSR(), "DSR -> 0")
+ self.s.setDTR(1)
+ self.failUnless(self.s.getDSR(), "DSR -> 1")
+
+ def test3_RI(self):
+ """Test RI"""
+ self.failUnless(not self.s.getRI(), "RI -> 0")
+
+class Test_MoreTimeouts(unittest.TestCase):
+ """Test with timeouts"""
+ def setUp(self):
+ self.s = serial.Serial() #create an closed serial port
+
+ def tearDown(self):
+ self.s.close()
+
+ def test_WriteTimeout(self):
+ """Test write() timeout."""
+ #use xonxoff setting and the loopback adapter to switch traffic on hold
+ self.s.port = PORT
+ self.s.writeTimeout = 1
+ self.s.xonxoff = 1
+ self.s.open()
+ self.s.write(serial.XOFF)
+ time.sleep(0.1) #some systems need a little delay so that they can react on XOFF
+ t1 = time.time()
+ self.failUnlessRaises(serial.SerialTimeoutException, self.s.write, "timeout please"*100)
+ t2 = time.time()
+ self.failUnless( 1 <= (t2-t1) < 2, "Timeout not in the given intervall (%s)" % (t2-t1))
+
+if __name__ == '__main__':
+ import sys
+ print __doc__
+ if len(sys.argv) > 1:
+ PORT = sys.argv[1]
+ print "Testing port", PORT
+ sys.argv[1:] = ['-v']
+ # When this module is executed from the command-line, it runs all its tests
+ unittest.main()
diff --git a/examples/test_advanced.py b/examples/test_advanced.py
new file mode 100644
index 0000000..2702e1c
--- /dev/null
+++ b/examples/test_advanced.py
@@ -0,0 +1,169 @@
+#!/usr/bin/env python
+#needs at least python 2.2.3
+
+#Python Serial Port Extension for Win32, Linux, BSD, Jython
+#see __init__.py
+#
+#(C) 2001-2003 Chris Liechti <cliechti@gmx.net>
+# this is distributed under a free software license, see license.txt
+
+"""Some tests for the serial module.
+Part of pyserial (http://pyserial.sf.net) (C)2002 cliechti@gmx.net
+
+Intended to be run on different platforms, to ensure portability of
+the code.
+
+These tests open a serial port and change all the settings on the fly.
+If the port is realy correctly configured cannot be determined - that
+would require external hardware or a nullmodem cable and an other
+serial port library... Thus it mainly tests that all features are
+correctly implemented and that the interface does what it should.
+"""
+
+import unittest
+import serial
+
+#on which port should the tests be performed:
+PORT=0
+
+class Test_ChangeAttributes(unittest.TestCase):
+ """Test with timeouts"""
+
+ def setUp(self):
+ self.s = serial.Serial() #create an closed serial port
+
+ def tearDown(self):
+ self.s.close()
+
+ def test_PortSetting(self):
+ self.s.port = PORT
+ #portstr has to be set
+ if isinstance(PORT, str):
+ self.failUnlessEqual(self.s.portstr.lower(), PORT.lower())
+ else:
+ self.failUnlessEqual(self.s.portstr, serial.device(PORT))
+ #test internals
+ self.failUnlessEqual(self.s._port, PORT)
+ #test on the fly change
+ self.s.open()
+ self.failUnless(self.s.isOpen())
+ self.s.port = 0
+ self.failUnless(self.s.isOpen())
+ self.failUnlessEqual(self.s.port, 0)
+ self.failUnlessEqual(self.s.portstr, serial.device(0))
+ try:
+ self.s.port = 1
+ except serial.SerialException: #port not available on system
+ pass #cant test on this machine...
+ else:
+ self.failUnless(self.s.isOpen())
+ self.failUnlessEqual(self.s.port, 1)
+ self.failUnlessEqual(self.s.portstr, serial.device(1))
+
+ def test_BaudrateSetting(self):
+ self.s.port = PORT
+ self.s.open()
+ for baudrate in (300, 9600, 19200, 115200):
+ self.s.baudrate = baudrate
+ #test get method
+ self.failUnlessEqual(self.s.baudrate, baudrate)
+ #test internals
+ self.failUnlessEqual(self.s._baudrate, baudrate)
+ #test illegal values
+ for illegal_value in (-300, -1, 'a', None):
+ self.failUnlessRaises(ValueError, self.s.setBaudrate, illegal_value)
+
+ def test_BaudrateSetting2(self):
+ #test illegal values, depending on machine/port some of these may be valid...
+ self.s.port = PORT
+ self.s.open()
+ for illegal_value in (500000,576000,921600,92160):
+ self.failUnlessRaises(ValueError, self.s.setBaudrate, illegal_value)
+
+ def test_BytesizeSetting(self):
+ for bytesize in (5,6,7,8):
+ self.s.bytesize = bytesize
+ #test get method
+ self.failUnlessEqual(self.s.bytesize, bytesize)
+ #test internals
+ self.failUnlessEqual(self.s._bytesize, bytesize)
+ #test illegal values
+ for illegal_value in (0, 1, 3, 4, 9, 10, 'a', None):
+ self.failUnlessRaises(ValueError, self.s.setByteSize, illegal_value)
+
+ def test_ParitySetting(self):
+ for parity in (serial.PARITY_NONE, serial.PARITY_EVEN, serial.PARITY_ODD):
+ self.s.parity = parity
+ #test get method
+ self.failUnlessEqual(self.s.parity, parity)
+ #test internals
+ self.failUnlessEqual(self.s._parity, parity)
+ #test illegal values
+ for illegal_value in (0, 57, 'a', None):
+ self.failUnlessRaises(ValueError, self.s.setParity, illegal_value)
+
+ def test_StopbitsSetting(self):
+ for stopbits in (1, 2):
+ self.s.stopbits = stopbits
+ #test get method
+ self.failUnlessEqual(self.s.stopbits, stopbits)
+ #test internals
+ self.failUnlessEqual(self.s._stopbits, stopbits)
+ #test illegal values
+ for illegal_value in (0, 3, 1.5, 57, 'a', None):
+ self.failUnlessRaises(ValueError, self.s.setStopbits, illegal_value)
+
+ def test_TimeoutSetting(self):
+ for timeout in (None, 0, 1, 3.14159, 10, 1000, 3600):
+ self.s.timeout = timeout
+ #test get method
+ self.failUnlessEqual(self.s.timeout, timeout)
+ #test internals
+ self.failUnlessEqual(self.s._timeout, timeout)
+ #test illegal values
+ for illegal_value in (-1, 'a'):
+ self.failUnlessRaises(ValueError, self.s.setTimeout, illegal_value)
+
+ def test_XonXoffSetting(self):
+ for xonxoff in (True, False):
+ self.s.xonxoff = xonxoff
+ #test get method
+ self.failUnlessEqual(self.s.xonxoff, xonxoff)
+ #test internals
+ self.failUnlessEqual(self.s._xonxoff, xonxoff)
+ #no illegal values here, normal rules for the boolean value of an
+ #object are used thus all objects have a truth value.
+
+ def test_RtsCtsSetting(self):
+ for rtscts in (True, False):
+ self.s.rtscts = rtscts
+ #test get method
+ self.failUnlessEqual(self.s.rtscts, rtscts)
+ #test internals
+ self.failUnlessEqual(self.s._rtscts, rtscts)
+ #no illegal values here, normal rules for the boolean value of an
+ #object are used thus all objects have a truth value.
+
+ def test_UnconfiguredPort(self):
+ #an unconfigured port cannot be opened
+ self.failUnlessRaises(serial.SerialException, self.s.open)
+
+ def test_PortOpenClose(self):
+ self.s.port = PORT
+ for i in range(3):
+ #open the port and check flag
+ self.failUnless(not self.s.isOpen())
+ self.s.open()
+ self.failUnless(self.s.isOpen())
+ self.s.close()
+ self.failUnless(not self.s.isOpen())
+
+if __name__ == '__main__':
+ import sys
+ print __doc__
+ if len(sys.argv) > 1:
+ PORT = sys.argv[1]
+ print "Testing port", PORT
+ sys.argv[1:] = ['-v']
+ # When this module is executed from the command-line, it runs all its tests
+ unittest.main()
diff --git a/examples/test_high_load.py b/examples/test_high_load.py
new file mode 100644
index 0000000..2c37fcf
--- /dev/null
+++ b/examples/test_high_load.py
@@ -0,0 +1,69 @@
+#!/usr/bin/env python
+#Python Serial Port Extension for Win32, Linux, BSD, Jython
+#see __init__.py
+#
+#(C) 2001-2003 Chris Liechti <cliechti@gmx.net>
+# this is distributed under a free software license, see license.txt
+
+"""Some tests for the serial module.
+Part of pyserial (http://pyserial.sf.net) (C)2002-2003 cliechti@gmx.net
+
+Intended to be run on different platforms, to ensure portability of
+the code.
+
+For all these tests a simple hardware is required.
+Loopback HW adapter:
+Shortcut these pin pairs:
+ TX <-> RX
+ RTS <-> CTS
+ DTR <-> DSR
+
+On a 9 pole DSUB these are the pins (2-3) (4-6) (7-8)
+"""
+
+import unittest, threading, time
+import serial
+
+#on which port should the tests be performed:
+PORT=0
+BAUDRATE=115200
+#~ BAUDRATE=9600
+
+
+class TestHighLoad(unittest.TestCase):
+ """Test sending and receiving large amount of data"""
+
+ N = 16
+ #~ N = 1
+
+ def setUp(self):
+ self.s = serial.Serial(PORT,BAUDRATE, timeout=10)
+ def tearDown(self):
+ self.s.close()
+
+ def test0_WriteReadLoopback(self):
+ """Send big strings, write/read order."""
+ for i in range(self.N):
+ q = ''.join(map(chr,range(256)))
+ self.s.write(q)
+ self.failUnless(self.s.read(len(q))==q, "expected a '%s' which was written before" % q)
+ self.failUnless(self.s.inWaiting()==0, "expected empty buffer after all sent chars are read")
+
+ def test1_WriteWriteReadLoopback(self):
+ """Send big strings, multiple write one read."""
+ q = ''.join(map(chr,range(256)))
+ for i in range(self.N):
+ self.s.write(q)
+ read = self.s.read(len(q)*self.N)
+ self.failUnless(read==q*self.N, "expected what was written before. got %d bytes, expected %d" % (len(read), self.N*len(q)))
+ self.failUnless(self.s.inWaiting()==0, "expected empty buffer after all sent chars are read")
+
+if __name__ == '__main__':
+ import sys
+ print __doc__
+ if len(sys.argv) > 1:
+ PORT = sys.argv[1]
+ print "Testing port", PORT
+ sys.argv[1:] = ['-v']
+ # When this module is executed from the command-line, it runs all its tests
+ unittest.main()
diff --git a/examples/wxSerialConfigDialog.py b/examples/wxSerialConfigDialog.py
new file mode 100644
index 0000000..7085035
--- /dev/null
+++ b/examples/wxSerialConfigDialog.py
@@ -0,0 +1,260 @@
+#!/usr/bin/env python
+# generated by wxGlade 0.3.1 on Thu Oct 02 23:25:44 2003
+
+#from wxPython.wx import *
+import wx
+import serial
+
+SHOW_BAUDRATE = 1<<0
+SHOW_FORMAT = 1<<1
+SHOW_FLOW = 1<<2
+SHOW_TIMEOUT = 1<<3
+SHOW_ALL = SHOW_BAUDRATE|SHOW_FORMAT|SHOW_FLOW|SHOW_TIMEOUT
+
+try:
+ enumerate
+except NameError:
+ def enumerate(sequence):
+ return zip(range(len(sequence)), sequence)
+
+class SerialConfigDialog(wx.Dialog):
+ """Serial Port confiuration dialog, to be used with pyserial 2.0+
+ When instantiating a class of this dialog, then the "serial" keyword
+ argument is mandatory. It is a reference to a serial.Serial instance.
+ the optional "show" keyword argument can be used to show/hide different
+ settings. The default is SHOW_ALL which coresponds to
+ SHOW_BAUDRATE|SHOW_FORMAT|SHOW_FLOW|SHOW_TIMEOUT. All constants can be
+ found in ths module (not the class)."""
+
+ def __init__(self, *args, **kwds):
+ #grab the serial keyword and remove it from the dict
+ self.serial = kwds['serial']
+ del kwds['serial']
+ self.show = SHOW_ALL
+ if kwds.has_key('show'):
+ self.show = kwds['show']
+ del kwds['show']
+ # begin wxGlade: SerialConfigDialog.__init__
+ # end wxGlade
+ kwds["style"] = wx.DEFAULT_DIALOG_STYLE
+ wx.Dialog.__init__(self, *args, **kwds)
+ self.label_2 = wx.StaticText(self, -1, "Port")
+ self.combo_box_port = wx.ComboBox(self, -1, choices=["dummy1", "dummy2", "dummy3", "dummy4", "dummy5"], style=wx.CB_DROPDOWN)
+ if self.show & SHOW_BAUDRATE:
+ self.label_1 = wx.StaticText(self, -1, "Baudrate")
+ self.choice_baudrate = wx.Choice(self, -1, choices=["choice 1"])
+ if self.show & SHOW_FORMAT:
+ self.label_3 = wx.StaticText(self, -1, "Data Bits")
+ self.choice_databits = wx.Choice(self, -1, choices=["choice 1"])
+ self.label_4 = wx.StaticText(self, -1, "Stop Bits")
+ self.choice_stopbits = wx.Choice(self, -1, choices=["choice 1"])
+ self.label_5 = wx.StaticText(self, -1, "Parity")
+ self.choice_parity = wx.Choice(self, -1, choices=["choice 1"])
+ if self.show & SHOW_TIMEOUT:
+ self.checkbox_timeout = wx.CheckBox(self, -1, "Use Timeout")
+ self.text_ctrl_timeout = wx.TextCtrl(self, -1, "")
+ self.label_6 = wx.StaticText(self, -1, "seconds")
+ if self.show & SHOW_FLOW:
+ self.checkbox_rtscts = wx.CheckBox(self, -1, "RTS/CTS")
+ self.checkbox_xonxoff = wx.CheckBox(self, -1, "Xon/Xoff")
+ self.button_ok = wx.Button(self, -1, "OK")
+ self.button_cancel = wx.Button(self, -1, "Cancel")
+
+ self.__set_properties()
+ self.__do_layout()
+ #fill in ports and select current setting
+ index = 0
+ self.combo_box_port.Clear()
+ for n in range(4):
+ portname = serial.device(n)
+ self.combo_box_port.Append(portname)
+ if self.serial.portstr == portname:
+ index = n
+ if self.serial.portstr is not None:
+ self.combo_box_port.SetValue(str(self.serial.portstr))
+ else:
+ self.combo_box_port.SetSelection(index)
+ if self.show & SHOW_BAUDRATE:
+ #fill in badrates and select current setting
+ self.choice_baudrate.Clear()
+ for n, baudrate in enumerate(self.serial.BAUDRATES):
+ self.choice_baudrate.Append(str(baudrate))
+ if self.serial.baudrate == baudrate:
+ index = n
+ self.choice_baudrate.SetSelection(index)
+ if self.show & SHOW_FORMAT:
+ #fill in databits and select current setting
+ self.choice_databits.Clear()
+ for n, bytesize in enumerate(self.serial.BYTESIZES):
+ self.choice_databits.Append(str(bytesize))
+ if self.serial.bytesize == bytesize:
+ index = n
+ self.choice_databits.SetSelection(index)
+ #fill in stopbits and select current setting
+ self.choice_stopbits.Clear()
+ for n, stopbits in enumerate(self.serial.STOPBITS):
+ self.choice_stopbits.Append(str(stopbits))
+ if self.serial.stopbits == stopbits:
+ index = n
+ self.choice_stopbits.SetSelection(index)
+ #fill in parities and select current setting
+ self.choice_parity.Clear()
+ for n, parity in enumerate(self.serial.PARITIES):
+ self.choice_parity.Append(str(serial.PARITY_NAMES[parity]))
+ if self.serial.parity == parity:
+ index = n
+ self.choice_parity.SetSelection(index)
+ if self.show & SHOW_TIMEOUT:
+ #set the timeout mode and value
+ if self.serial.timeout is None:
+ self.checkbox_timeout.SetValue(False)
+ self.text_ctrl_timeout.Enable(False)
+ else:
+ self.checkbox_timeout.SetValue(True)
+ self.text_ctrl_timeout.Enable(True)
+ self.text_ctrl_timeout.SetValue(str(self.serial.timeout))
+ if self.show & SHOW_FLOW:
+ #set the rtscts mode
+ self.checkbox_rtscts.SetValue(self.serial.rtscts)
+ #set the rtscts mode
+ self.checkbox_xonxoff.SetValue(self.serial.xonxoff)
+ #attach the event handlers
+ self.__attach_events()
+
+ def __set_properties(self):
+ # begin wxGlade: SerialConfigDialog.__set_properties
+ # end wxGlade
+ self.SetTitle("Serial Port Configuration")
+ if self.show & SHOW_TIMEOUT:
+ self.text_ctrl_timeout.Enable(0)
+ self.button_ok.SetDefault()
+
+ def __do_layout(self):
+ # begin wxGlade: SerialConfigDialog.__do_layout
+ # end wxGlade
+ sizer_2 = wx.BoxSizer(wx.VERTICAL)
+ sizer_3 = wx.BoxSizer(wx.HORIZONTAL)
+ sizer_basics = wx.StaticBoxSizer(wx.StaticBox(self, -1, "Basics"), wx.VERTICAL)
+ sizer_5 = wx.BoxSizer(wx.HORIZONTAL)
+ sizer_5.Add(self.label_2, 1, wx.ALL|wx.ALIGN_CENTER_VERTICAL, 4)
+ sizer_5.Add(self.combo_box_port, 1, 0, 0)
+ sizer_basics.Add(sizer_5, 0, wx.RIGHT|wx.EXPAND, 0)
+ if self.show & SHOW_BAUDRATE:
+ sizer_baudrate = wx.BoxSizer(wx.HORIZONTAL)
+ sizer_baudrate.Add(self.label_1, 1, wx.ALL|wx.ALIGN_CENTER_VERTICAL, 4)
+ sizer_baudrate.Add(self.choice_baudrate, 1, wx.ALIGN_RIGHT, 0)
+ sizer_basics.Add(sizer_baudrate, 0, wx.EXPAND, 0)
+ sizer_2.Add(sizer_basics, 0, wx.EXPAND, 0)
+ if self.show & SHOW_FORMAT:
+ sizer_8 = wx.BoxSizer(wx.HORIZONTAL)
+ sizer_7 = wx.BoxSizer(wx.HORIZONTAL)
+ sizer_6 = wx.BoxSizer(wx.HORIZONTAL)
+ sizer_format = wx.StaticBoxSizer(wx.StaticBox(self, -1, "Data Format"), wx.VERTICAL)
+ sizer_6.Add(self.label_3, 1, wx.ALL|wx.ALIGN_CENTER_VERTICAL, 4)
+ sizer_6.Add(self.choice_databits, 1, wx.ALIGN_RIGHT, 0)
+ sizer_format.Add(sizer_6, 0, wx.EXPAND, 0)
+ sizer_7.Add(self.label_4, 1, wx.ALL|wx.ALIGN_CENTER_VERTICAL, 4)
+ sizer_7.Add(self.choice_stopbits, 1, wx.ALIGN_RIGHT, 0)
+ sizer_format.Add(sizer_7, 0, wx.EXPAND, 0)
+ sizer_8.Add(self.label_5, 1, wx.ALL|wx.ALIGN_CENTER_VERTICAL, 4)
+ sizer_8.Add(self.choice_parity, 1, wx.ALIGN_RIGHT, 0)
+ sizer_format.Add(sizer_8, 0, wx.EXPAND, 0)
+ sizer_2.Add(sizer_format, 0, wx.EXPAND, 0)
+ if self.show & SHOW_TIMEOUT:
+ sizer_timeout = wx.StaticBoxSizer(wx.StaticBox(self, -1, "Timeout"), wx.HORIZONTAL)
+ sizer_timeout.Add(self.checkbox_timeout, 0, wx.ALL|wx.ALIGN_CENTER_VERTICAL, 4)
+ sizer_timeout.Add(self.text_ctrl_timeout, 0, 0, 0)
+ sizer_timeout.Add(self.label_6, 0, wx.ALL|wx.ALIGN_CENTER_VERTICAL, 4)
+ sizer_2.Add(sizer_timeout, 0, 0, 0)
+ if self.show & SHOW_FLOW:
+ sizer_flow = wx.StaticBoxSizer(wx.StaticBox(self, -1, "Flow Control"), wx.HORIZONTAL)
+ sizer_flow.Add(self.checkbox_rtscts, 0, wx.ALL|wx.ALIGN_CENTER_VERTICAL, 4)
+ sizer_flow.Add(self.checkbox_xonxoff, 0, wx.ALL|wx.ALIGN_CENTER_VERTICAL, 4)
+ sizer_flow.Add((10,10), 1, wx.EXPAND, 0)
+ sizer_2.Add(sizer_flow, 0, wx.EXPAND, 0)
+ sizer_3.Add(self.button_ok, 0, 0, 0)
+ sizer_3.Add(self.button_cancel, 0, 0, 0)
+ sizer_2.Add(sizer_3, 0, wx.ALL|wx.ALIGN_RIGHT, 4)
+ self.SetAutoLayout(1)
+ self.SetSizer(sizer_2)
+ sizer_2.Fit(self)
+ sizer_2.SetSizeHints(self)
+ self.Layout()
+
+ def __attach_events(self):
+ wx.EVT_BUTTON(self, self.button_ok.GetId(), self.OnOK)
+ wx.EVT_BUTTON(self, self.button_cancel.GetId(), self.OnCancel)
+ if self.show & SHOW_TIMEOUT:
+ wx.EVT_CHECKBOX(self, self.checkbox_timeout.GetId(), self.OnTimeout)
+
+ def OnOK(self, events):
+ success = True
+ self.serial.port = str(self.combo_box_port.GetValue())
+ if self.show & SHOW_BAUDRATE:
+ self.serial.baudrate = self.serial.BAUDRATES[self.choice_baudrate.GetSelection()]
+ if self.show & SHOW_FORMAT:
+ self.serial.bytesize = self.serial.BYTESIZES[self.choice_databits.GetSelection()]
+ self.serial.stopbits = self.serial.STOPBITS[self.choice_stopbits.GetSelection()]
+ self.serial.parity = self.serial.PARITIES[self.choice_parity.GetSelection()]
+ if self.show & SHOW_FLOW:
+ self.serial.rtscts = self.checkbox_rtscts.GetValue()
+ self.serial.xonxoff = self.checkbox_xonxoff.GetValue()
+ if self.show & SHOW_TIMEOUT:
+ if self.checkbox_timeout.GetValue():
+ try:
+ self.serial.timeout = float(self.text_ctrl_timeout.GetValue())
+ except ValueError:
+ dlg = wx.MessageDialog(self, 'Timeout must be a numeric value',
+ 'Value Error', wx.OK | wx.ICON_ERROR)
+ dlg.ShowModal()
+ dlg.Destroy()
+ success = False
+ else:
+ self.serial.timeout = None
+ if success:
+ self.EndModal(wx.ID_OK)
+
+ def OnCancel(self, events):
+ self.EndModal(wx.ID_CANCEL)
+
+ def OnTimeout(self, events):
+ if self.checkbox_timeout.GetValue():
+ self.text_ctrl_timeout.Enable(True)
+ else:
+ self.text_ctrl_timeout.Enable(False)
+
+# end of class SerialConfigDialog
+
+
+class MyApp(wx.App):
+ """Test code"""
+ def OnInit(self):
+ wx.InitAllImageHandlers()
+
+ ser = serial.Serial()
+ print ser
+ #loop until cancel is pressed, old values are used as start for the next run
+ #show the different views, one after the other
+ #value are kept.
+ for flags in (SHOW_BAUDRATE, SHOW_FLOW, SHOW_FORMAT, SHOW_TIMEOUT, SHOW_ALL):
+ dialog_serial_cfg = SerialConfigDialog(None, -1, "", serial=ser, show=flags)
+ self.SetTopWindow(dialog_serial_cfg)
+ result = dialog_serial_cfg.ShowModal()
+ print ser
+ if result != wx.ID_OK:
+ break
+ #the user can play around with the values, CANCEL aborts the loop
+ while 1:
+ dialog_serial_cfg = SerialConfigDialog(None, -1, "", serial=ser)
+ self.SetTopWindow(dialog_serial_cfg)
+ result = dialog_serial_cfg.ShowModal()
+ print ser
+ if result != wx.ID_OK:
+ break
+ return 0
+
+# end of class MyApp
+
+if __name__ == "__main__":
+ app = MyApp(0)
+ app.MainLoop()
diff --git a/examples/wxSerialConfigDialog.wxg b/examples/wxSerialConfigDialog.wxg
new file mode 100644
index 0000000..f5e92e0
--- /dev/null
+++ b/examples/wxSerialConfigDialog.wxg
@@ -0,0 +1,262 @@
+<?xml version="1.0"?>
+<!-- generated by wxGlade 0.3.1 on Fri Oct 03 01:53:04 2003 -->
+
+<application path="D:\prog\python\pyserial_sf\pyserial\examples\wxSerialConfigDialog.py" name="app" class="MyApp" option="0" language="python" top_window="dialog_serial_cfg" encoding="ISO-8859-1" use_gettext="0" overwrite="0">
+ <object class="SerialConfigDialog" name="dialog_serial_cfg" base="EditDialog">
+ <style>wxDEFAULT_DIALOG_STYLE</style>
+ <title>Serial Port Configuration</title>
+ <object class="wxBoxSizer" name="sizer_2" base="EditBoxSizer">
+ <orient>wxVERTICAL</orient>
+ <object class="sizeritem">
+ <flag>wxEXPAND</flag>
+ <border>0</border>
+ <option>0</option>
+ <object class="wxStaticBoxSizer" name="sizer_basics" base="EditStaticBoxSizer">
+ <orient>wxVERTICAL</orient>
+ <label>Basics</label>
+ <object class="sizeritem">
+ <flag>wxRIGHT|wxEXPAND</flag>
+ <border>0</border>
+ <option>0</option>
+ <object class="wxBoxSizer" name="sizer_5" base="EditBoxSizer">
+ <orient>wxHORIZONTAL</orient>
+ <object class="sizeritem">
+ <flag>wxALL|wxALIGN_CENTER_VERTICAL</flag>
+ <border>4</border>
+ <option>1</option>
+ <object class="wxStaticText" name="label_2" base="EditStaticText">
+ <attribute>1</attribute>
+ <label>Port</label>
+ </object>
+ </object>
+ <object class="sizeritem">
+ <border>0</border>
+ <option>1</option>
+ <object class="wxComboBox" name="combo_box_port" base="EditComboBox">
+ <selection>0</selection>
+ <choices>
+ <choice>dummy1</choice>
+ <choice>dummy2</choice>
+ <choice>dummy3</choice>
+ <choice>dummy4</choice>
+ <choice>dummy5</choice>
+ </choices>
+ </object>
+ </object>
+ </object>
+ </object>
+ <object class="sizeritem">
+ <flag>wxEXPAND</flag>
+ <border>0</border>
+ <option>0</option>
+ <object class="wxBoxSizer" name="sizer_baudrate" base="EditBoxSizer">
+ <orient>wxHORIZONTAL</orient>
+ <object class="sizeritem">
+ <flag>wxALL|wxALIGN_CENTER_VERTICAL</flag>
+ <border>4</border>
+ <option>1</option>
+ <object class="wxStaticText" name="label_1" base="EditStaticText">
+ <attribute>1</attribute>
+ <label>Baudrate</label>
+ </object>
+ </object>
+ <object class="sizeritem">
+ <flag>wxALIGN_RIGHT</flag>
+ <border>0</border>
+ <option>1</option>
+ <object class="wxChoice" name="choice_baudrate" base="EditChoice">
+ <selection>0</selection>
+ <choices>
+ <choice>choice 1</choice>
+ </choices>
+ </object>
+ </object>
+ </object>
+ </object>
+ </object>
+ </object>
+ <object class="sizeritem">
+ <flag>wxEXPAND</flag>
+ <border>0</border>
+ <option>0</option>
+ <object class="wxStaticBoxSizer" name="sizer_format" base="EditStaticBoxSizer">
+ <orient>wxVERTICAL</orient>
+ <label>Data Format</label>
+ <object class="sizeritem">
+ <flag>wxEXPAND</flag>
+ <border>0</border>
+ <option>0</option>
+ <object class="wxBoxSizer" name="sizer_6" base="EditBoxSizer">
+ <orient>wxHORIZONTAL</orient>
+ <object class="sizeritem">
+ <flag>wxALL|wxALIGN_CENTER_VERTICAL</flag>
+ <border>4</border>
+ <option>1</option>
+ <object class="wxStaticText" name="label_3" base="EditStaticText">
+ <attribute>1</attribute>
+ <label>Data Bits</label>
+ </object>
+ </object>
+ <object class="sizeritem">
+ <flag>wxALIGN_RIGHT</flag>
+ <border>0</border>
+ <option>1</option>
+ <object class="wxChoice" name="choice_databits" base="EditChoice">
+ <selection>0</selection>
+ <choices>
+ <choice>choice 1</choice>
+ </choices>
+ </object>
+ </object>
+ </object>
+ </object>
+ <object class="sizeritem">
+ <flag>wxEXPAND</flag>
+ <border>0</border>
+ <option>0</option>
+ <object class="wxBoxSizer" name="sizer_7" base="EditBoxSizer">
+ <orient>wxHORIZONTAL</orient>
+ <object class="sizeritem">
+ <flag>wxALL|wxALIGN_CENTER_VERTICAL</flag>
+ <border>4</border>
+ <option>1</option>
+ <object class="wxStaticText" name="label_4" base="EditStaticText">
+ <attribute>1</attribute>
+ <label>Stop Bits</label>
+ </object>
+ </object>
+ <object class="sizeritem">
+ <flag>wxALIGN_RIGHT</flag>
+ <border>0</border>
+ <option>1</option>
+ <object class="wxChoice" name="choice_stopbits" base="EditChoice">
+ <selection>0</selection>
+ <choices>
+ <choice>choice 1</choice>
+ </choices>
+ </object>
+ </object>
+ </object>
+ </object>
+ <object class="sizeritem">
+ <flag>wxEXPAND</flag>
+ <border>0</border>
+ <option>0</option>
+ <object class="wxBoxSizer" name="sizer_8" base="EditBoxSizer">
+ <orient>wxHORIZONTAL</orient>
+ <object class="sizeritem">
+ <flag>wxALL|wxALIGN_CENTER_VERTICAL</flag>
+ <border>4</border>
+ <option>1</option>
+ <object class="wxStaticText" name="label_5" base="EditStaticText">
+ <attribute>1</attribute>
+ <label>Parity</label>
+ </object>
+ </object>
+ <object class="sizeritem">
+ <flag>wxALIGN_RIGHT</flag>
+ <border>0</border>
+ <option>1</option>
+ <object class="wxChoice" name="choice_parity" base="EditChoice">
+ <selection>0</selection>
+ <choices>
+ <choice>choice 1</choice>
+ </choices>
+ </object>
+ </object>
+ </object>
+ </object>
+ </object>
+ </object>
+ <object class="sizeritem">
+ <border>0</border>
+ <option>0</option>
+ <object class="wxStaticBoxSizer" name="sizer_timeout" base="EditStaticBoxSizer">
+ <orient>wxHORIZONTAL</orient>
+ <label>Timeout</label>
+ <object class="sizeritem">
+ <flag>wxALL|wxALIGN_CENTER_VERTICAL</flag>
+ <border>4</border>
+ <option>0</option>
+ <object class="wxCheckBox" name="checkbox_timeout" base="EditCheckBox">
+ <label>Use Timeout</label>
+ </object>
+ </object>
+ <object class="sizeritem">
+ <border>0</border>
+ <option>0</option>
+ <object class="wxTextCtrl" name="text_ctrl_timeout" base="EditTextCtrl">
+ <disabled>1</disabled>
+ </object>
+ </object>
+ <object class="sizeritem">
+ <flag>wxALL|wxALIGN_CENTER_VERTICAL</flag>
+ <border>4</border>
+ <option>0</option>
+ <object class="wxStaticText" name="label_6" base="EditStaticText">
+ <attribute>1</attribute>
+ <label>seconds</label>
+ </object>
+ </object>
+ </object>
+ </object>
+ <object class="sizeritem">
+ <flag>wxEXPAND</flag>
+ <border>0</border>
+ <option>0</option>
+ <object class="wxStaticBoxSizer" name="sizer_flow" base="EditStaticBoxSizer">
+ <orient>wxHORIZONTAL</orient>
+ <label>Flow Control</label>
+ <object class="sizeritem">
+ <flag>wxALL|wxALIGN_CENTER_VERTICAL</flag>
+ <border>4</border>
+ <option>0</option>
+ <object class="wxCheckBox" name="checkbox_rtscts" base="EditCheckBox">
+ <label>RTS/CTS</label>
+ </object>
+ </object>
+ <object class="sizeritem">
+ <flag>wxALL|wxALIGN_CENTER_VERTICAL</flag>
+ <border>4</border>
+ <option>0</option>
+ <object class="wxCheckBox" name="checkbox_xonxoff" base="EditCheckBox">
+ <label>Xon/Xoff</label>
+ </object>
+ </object>
+ <object class="sizeritem">
+ <flag>wxEXPAND</flag>
+ <border>0</border>
+ <option>1</option>
+ <object class="spacer" name="spacer" base="EditSpacer">
+ <height>10</height>
+ <width>10</width>
+ </object>
+ </object>
+ </object>
+ </object>
+ <object class="sizeritem">
+ <flag>wxALL|wxALIGN_RIGHT</flag>
+ <border>4</border>
+ <option>0</option>
+ <object class="wxBoxSizer" name="sizer_3" base="EditBoxSizer">
+ <orient>wxHORIZONTAL</orient>
+ <object class="sizeritem">
+ <border>0</border>
+ <option>0</option>
+ <object class="wxButton" name="button_ok" base="EditButton">
+ <default>1</default>
+ <label>OK</label>
+ </object>
+ </object>
+ <object class="sizeritem">
+ <border>0</border>
+ <option>0</option>
+ <object class="wxButton" name="button_cancel" base="EditButton">
+ <label>Cancel</label>
+ </object>
+ </object>
+ </object>
+ </object>
+ </object>
+ </object>
+</application>
diff --git a/examples/wxTerminal.py b/examples/wxTerminal.py
new file mode 100644
index 0000000..646c272
--- /dev/null
+++ b/examples/wxTerminal.py
@@ -0,0 +1,333 @@
+#!/usr/bin/env python
+# generated by wxGlade 0.3.1 on Fri Oct 03 23:23:45 2003
+
+#from wxPython.wx import *
+import wx
+import wxSerialConfigDialog
+import serial
+import threading
+
+#----------------------------------------------------------------------
+# Create an own event type, so that GUI updates can be delegated
+# this is required as on some platforms only the main thread can
+# access the GUI without crashing. wxMutexGuiEnter/wxMutexGuiLeave
+# could be used too, but an event is more elegant.
+
+SERIALRX = wx.NewEventType()
+# bind to serial data receive events
+EVT_SERIALRX = wx.PyEventBinder(SERIALRX, 0)
+
+class SerialRxEvent(wx.PyCommandEvent):
+ eventType = SERIALRX
+ def __init__(self, windowID, data):
+ wx.PyCommandEvent.__init__(self, self.eventType, windowID)
+ self.data = data
+
+ def Clone(self):
+ self.__class__(self.GetId(), self.data)
+
+#----------------------------------------------------------------------
+
+ID_CLEAR = wx.NewId()
+ID_SAVEAS = wx.NewId()
+ID_SETTINGS = wx.NewId()
+ID_TERM = wx.NewId()
+ID_EXIT = wx.NewId()
+
+NEWLINE_CR = 0
+NEWLINE_LF = 1
+NEWLINE_CRLF = 2
+
+class TerminalSetup:
+ """Placeholder for various terminal settings. Used to pass the
+ options to the TerminalSettingsDialog."""
+ def __init__(self):
+ self.echo = False
+ self.unprintable = False
+ self.newline = NEWLINE_CRLF
+
+class TerminalSettingsDialog(wx.Dialog):
+ """Simple dialog with common terminal settings like echo, newline mode."""
+
+ def __init__(self, *args, **kwds):
+ self.settings = kwds['settings']
+ del kwds['settings']
+ # begin wxGlade: TerminalSettingsDialog.__init__
+ kwds["style"] = wx.DEFAULT_DIALOG_STYLE
+ wx.Dialog.__init__(self, *args, **kwds)
+ self.checkbox_echo = wx.CheckBox(self, -1, "Local Echo")
+ self.checkbox_unprintable = wx.CheckBox(self, -1, "Show unprintable characters")
+ self.radio_box_newline = wx.RadioBox(self, -1, "Newline Handling", choices=["CR only", "LF only", "CR+LF"], majorDimension=0, style=wx.RA_SPECIFY_ROWS)
+ self.button_ok = wx.Button(self, -1, "OK")
+ self.button_cancel = wx.Button(self, -1, "Cancel")
+
+ self.__set_properties()
+ self.__do_layout()
+ # end wxGlade
+ self.__attach_events()
+ self.checkbox_echo.SetValue(self.settings.echo)
+ self.checkbox_unprintable.SetValue(self.settings.unprintable)
+ self.radio_box_newline.SetSelection(self.settings.newline)
+
+ def __set_properties(self):
+ # begin wxGlade: TerminalSettingsDialog.__set_properties
+ self.SetTitle("Terminal Settings")
+ self.radio_box_newline.SetSelection(0)
+ self.button_ok.SetDefault()
+ # end wxGlade
+
+ def __do_layout(self):
+ # begin wxGlade: TerminalSettingsDialog.__do_layout
+ sizer_2 = wx.BoxSizer(wx.VERTICAL)
+ sizer_3 = wx.BoxSizer(wx.HORIZONTAL)
+ sizer_4 = wx.StaticBoxSizer(wx.StaticBox(self, -1, "Input/Output"), wx.VERTICAL)
+ sizer_4.Add(self.checkbox_echo, 0, wx.ALL, 4)
+ sizer_4.Add(self.checkbox_unprintable, 0, wx.ALL, 4)
+ sizer_4.Add(self.radio_box_newline, 0, 0, 0)
+ sizer_2.Add(sizer_4, 0, wx.EXPAND, 0)
+ sizer_3.Add(self.button_ok, 0, 0, 0)
+ sizer_3.Add(self.button_cancel, 0, 0, 0)
+ sizer_2.Add(sizer_3, 0, wx.ALL|wx.ALIGN_RIGHT, 4)
+ self.SetAutoLayout(1)
+ self.SetSizer(sizer_2)
+ sizer_2.Fit(self)
+ sizer_2.SetSizeHints(self)
+ self.Layout()
+ # end wxGlade
+
+ def __attach_events(self):
+ self.Bind(wx.EVT_BUTTON, self.OnOK, id = self.button_ok.GetId())
+ self.Bind(wx.EVT_BUTTON, self.OnCancel, id = self.button_cancel.GetId())
+
+ def OnOK(self, events):
+ """Update data wil new values and close dialog."""
+ self.settings.echo = self.checkbox_echo.GetValue()
+ self.settings.unprintable = self.checkbox_unprintable.GetValue()
+ self.settings.newline = self.radio_box_newline.GetSelection()
+ self.EndModal(wx.ID_OK)
+
+ def OnCancel(self, events):
+ """Do not update data but close dialog."""
+ self.EndModal(wx.ID_CANCEL)
+
+# end of class TerminalSettingsDialog
+
+
+class TerminalFrame(wx.Frame):
+ """Simple terminal program for wxPython"""
+
+ def __init__(self, *args, **kwds):
+ self.serial = serial.Serial()
+ self.serial.timeout = 0.5 #make sure that the alive event can be checked from time to time
+ self.settings = TerminalSetup() #placeholder for the settings
+ self.thread = None
+ self.alive = threading.Event()
+ # begin wxGlade: TerminalFrame.__init__
+ kwds["style"] = wx.DEFAULT_FRAME_STYLE
+ wx.Frame.__init__(self, *args, **kwds)
+ self.text_ctrl_output = wx.TextCtrl(self, -1, "", style=wx.TE_MULTILINE|wx.TE_READONLY)
+
+ # Menu Bar
+ self.frame_terminal_menubar = wx.MenuBar()
+ self.SetMenuBar(self.frame_terminal_menubar)
+ wxglade_tmp_menu = wx.Menu()
+ wxglade_tmp_menu.Append(ID_CLEAR, "&Clear", "", wx.ITEM_NORMAL)
+ wxglade_tmp_menu.Append(ID_SAVEAS, "&Save Text As...", "", wx.ITEM_NORMAL)
+ wxglade_tmp_menu.AppendSeparator()
+ wxglade_tmp_menu.Append(ID_SETTINGS, "&Port Settings...", "", wx.ITEM_NORMAL)
+ wxglade_tmp_menu.Append(ID_TERM, "&Terminal Settings...", "", wx.ITEM_NORMAL)
+ wxglade_tmp_menu.AppendSeparator()
+ wxglade_tmp_menu.Append(ID_EXIT, "&Exit", "", wx.ITEM_NORMAL)
+ self.frame_terminal_menubar.Append(wxglade_tmp_menu, "&File")
+ # Menu Bar end
+
+ self.__set_properties()
+ self.__do_layout()
+ # end wxGlade
+ self.__attach_events() #register events
+ self.OnPortSettings(None) #call setup dialog on startup, opens port
+ if not self.alive.isSet():
+ self.Close()
+
+ def StartThread(self):
+ """Start the receiver thread"""
+ self.thread = threading.Thread(target=self.ComPortThread)
+ self.thread.setDaemon(1)
+ self.alive.set()
+ self.thread.start()
+
+ def StopThread(self):
+ """Stop the receiver thread, wait util it's finished."""
+ if self.thread is not None:
+ self.alive.clear() #clear alive event for thread
+ self.thread.join() #wait until thread has finished
+ self.thread = None
+
+ def __set_properties(self):
+ # begin wxGlade: TerminalFrame.__set_properties
+ self.SetTitle("Serial Terminal")
+ self.SetSize((546, 383))
+ # end wxGlade
+
+ def __do_layout(self):
+ # begin wxGlade: TerminalFrame.__do_layout
+ sizer_1 = wx.BoxSizer(wx.VERTICAL)
+ sizer_1.Add(self.text_ctrl_output, 1, wx.EXPAND, 0)
+ self.SetAutoLayout(1)
+ self.SetSizer(sizer_1)
+ self.Layout()
+ # end wxGlade
+
+ def __attach_events(self):
+ #register events at the controls
+ self.Bind(wx.EVT_MENU, self.OnClear, id = ID_CLEAR)
+ self.Bind(wx.EVT_MENU, self.OnSaveAs, id = ID_SAVEAS)
+ self.Bind(wx.EVT_MENU, self.OnExit, id = ID_EXIT)
+ self.Bind(wx.EVT_MENU, self.OnPortSettings, id = ID_SETTINGS)
+ self.Bind(wx.EVT_MENU, self.OnTermSettings, id = ID_TERM)
+ self.text_ctrl_output.Bind(wx.EVT_CHAR, self.OnKey)
+ self.Bind(EVT_SERIALRX, self.OnSerialRead)
+ self.Bind(wx.EVT_CLOSE, self.OnClose)
+
+ def OnExit(self, event):
+ """Menu point Exit"""
+ self.Close()
+
+ def OnClose(self, event):
+ """Called on application shutdown."""
+ self.StopThread() #stop reader thread
+ self.serial.close() #cleanup
+ self.Destroy() #close windows, exit app
+
+ def OnSaveAs(self, event):
+ """Save contents of output window."""
+ filename = None
+ dlg = wx.FileDialog(None, "Save Text As...", ".", "", "Text File|*.txt|All Files|*", wx.SAVE)
+ if dlg.ShowModal() == wx.ID_OK:
+ filename = dlg.GetPath()
+ dlg.Destroy()
+
+ if filename is not None:
+ f = file(filename, 'w')
+ text = self.text_ctrl_output.GetValue()
+ if type(text) == unicode:
+ text = text.encode("latin1") #hm, is that a good asumption?
+ f.write(text)
+ f.close()
+
+ def OnClear(self, event):
+ """Clear contents of output window."""
+ self.text_ctrl_output.Clear()
+
+ def OnPortSettings(self, event=None):
+ """Show the portsettings dialog. The reader thread is stopped for the
+ settings change."""
+ if event is not None: #will be none when called on startup
+ self.StopThread()
+ self.serial.close()
+ ok = False
+ while not ok:
+ dialog_serial_cfg = wxSerialConfigDialog.SerialConfigDialog(None, -1, "",
+ show=wxSerialConfigDialog.SHOW_BAUDRATE|wxSerialConfigDialog.SHOW_FORMAT|wxSerialConfigDialog.SHOW_FLOW,
+ serial=self.serial
+ )
+ result = dialog_serial_cfg.ShowModal()
+ dialog_serial_cfg.Destroy()
+ #open port if not called on startup, open it on startup and OK too
+ if result == wx.ID_OK or event is not None:
+ try:
+ self.serial.open()
+ except serial.SerialException, e:
+ dlg = wx.MessageDialog(None, str(e), "Serial Port Error", wx.OK | wx.ICON_ERROR)
+ dlg.ShowModal()
+ dlg.Destroy()
+ else:
+ self.StartThread()
+ self.SetTitle("Serial Terminal on %s [%s, %s%s%s%s%s]" % (
+ self.serial.portstr,
+ self.serial.baudrate,
+ self.serial.bytesize,
+ self.serial.parity,
+ self.serial.stopbits,
+ self.serial.rtscts and ' RTS/CTS' or '',
+ self.serial.xonxoff and ' Xon/Xoff' or '',
+ )
+ )
+ ok = True
+ else:
+ #on startup, dialog aborted
+ self.alive.clear()
+ ok = True
+
+ def OnTermSettings(self, event):
+ """Menu point Terminal Settings. Show the settings dialog
+ with the current terminal settings"""
+ dialog = TerminalSettingsDialog(None, -1, "", settings=self.settings)
+ result = dialog.ShowModal()
+ dialog.Destroy()
+
+ def OnKey(self, event):
+ """Key event handler. if the key is in the ASCII range, write it to the serial port.
+ Newline handling and local echo is also done here."""
+ code = event.GetKeyCode()
+ if code < 256: #is it printable?
+ if code == 13: #is it a newline? (check for CR which is the RETURN key)
+ if self.settings.echo: #do echo if needed
+ self.text_ctrl_output.AppendText('\n')
+ if self.settings.newline == NEWLINE_CR:
+ self.serial.write('\r') #send CR
+ elif self.settings.newline == NEWLINE_LF:
+ self.serial.write('\n') #send LF
+ elif self.settings.newline == NEWLINE_CRLF:
+ self.serial.write('\r\n') #send CR+LF
+ else:
+ char = chr(code)
+ if self.settings.echo: #do echo if needed
+ self.text_ctrl_output.WriteText(char)
+ self.serial.write(char) #send the charcater
+ else:
+ print "Extra Key:", code
+
+ def OnSerialRead(self, event):
+ """Handle input from the serial port."""
+ text = event.data
+ if self.settings.unprintable:
+ text = ''.join([(c >= ' ') and c or '<%d>' % ord(c) for c in text])
+ self.text_ctrl_output.AppendText(text)
+
+ def ComPortThread(self):
+ """Thread that handles the incomming traffic. Does the basic input
+ transformation (newlines) and generates an SerialRxEvent"""
+ while self.alive.isSet(): #loop while alive event is true
+ text = self.serial.read(1) #read one, with timout
+ if text: #check if not timeout
+ n = self.serial.inWaiting() #look if there is more to read
+ if n:
+ text = text + self.serial.read(n) #get it
+ #newline transformation
+ if self.settings.newline == NEWLINE_CR:
+ text = text.replace('\r', '\n')
+ elif self.settings.newline == NEWLINE_LF:
+ pass
+ elif self.settings.newline == NEWLINE_CRLF:
+ text = text.replace('\r\n', '\n')
+ event = SerialRxEvent(self.GetId(), text)
+ self.GetEventHandler().AddPendingEvent(event)
+ #~ self.OnSerialRead(text) #output text in window
+
+# end of class TerminalFrame
+
+
+class MyApp(wx.App):
+ def OnInit(self):
+ wx.InitAllImageHandlers()
+ frame_terminal = TerminalFrame(None, -1, "")
+ self.SetTopWindow(frame_terminal)
+ frame_terminal.Show(1)
+ return 1
+
+# end of class MyApp
+
+if __name__ == "__main__":
+ app = MyApp(0)
+ app.MainLoop()
diff --git a/examples/wxTerminal.wxg b/examples/wxTerminal.wxg
new file mode 100644
index 0000000..183f876
--- /dev/null
+++ b/examples/wxTerminal.wxg
@@ -0,0 +1,127 @@
+<?xml version="1.0"?>
+<!-- generated by wxGlade 0.3.1 on Sat Oct 04 02:41:48 2003 -->
+
+<application path="D:\prog\python\pyserial_sf\pyserial\examples\wxTerminal.py" name="app" class="MyApp" option="0" language="python" top_window="frame_terminal" encoding="ISO-8859-1" use_gettext="0" overwrite="0">
+ <object class="TerminalFrame" name="frame_terminal" base="EditFrame">
+ <style>wxDEFAULT_FRAME_STYLE</style>
+ <title>Serial Terminal</title>
+ <menubar>1</menubar>
+ <size>546, 383</size>
+ <object class="wxBoxSizer" name="sizer_1" base="EditBoxSizer">
+ <orient>wxVERTICAL</orient>
+ <object class="sizeritem">
+ <flag>wxEXPAND</flag>
+ <border>0</border>
+ <option>1</option>
+ <object class="wxTextCtrl" name="text_ctrl_output" base="EditTextCtrl">
+ <style>wxTE_MULTILINE|wxTE_READONLY</style>
+ </object>
+ </object>
+ </object>
+ <object class="wxMenuBar" name="frame_terminal_menubar" base="EditMenuBar">
+ <menus>
+ <menu name="" label="&amp;File">
+ <item>
+ <label>&amp;Clear</label>
+ <id>ID_CLEAR</id>
+ </item>
+ <item>
+ <label>&amp;Save Text As...</label>
+ <id>ID_SAVEAS</id>
+ </item>
+ <item>
+ <label>---</label>
+ <id>---</id>
+ <name>---</name>
+ </item>
+ <item>
+ <label>&amp;Port Settings...</label>
+ <id>ID_SETTINGS</id>
+ </item>
+ <item>
+ <label>&amp;Terminal Settings...</label>
+ <id>ID_TERM</id>
+ </item>
+ <item>
+ <label>---</label>
+ <name>---</name>
+ </item>
+ <item>
+ <label>&amp;Exit</label>
+ <id>ID_EXIT</id>
+ </item>
+ </menu>
+ </menus>
+ </object>
+ </object>
+ <object class="TerminalSettingsDialog" name="dialog_terminal_Settings" base="EditDialog">
+ <style>wxDEFAULT_DIALOG_STYLE</style>
+ <title>Terminal Settings</title>
+ <object class="wxBoxSizer" name="sizer_2" base="EditBoxSizer">
+ <orient>wxVERTICAL</orient>
+ <object class="sizeritem">
+ <flag>wxEXPAND</flag>
+ <border>0</border>
+ <option>0</option>
+ <object class="wxStaticBoxSizer" name="sizer_4" base="EditStaticBoxSizer">
+ <orient>wxVERTICAL</orient>
+ <label>Input/Output</label>
+ <object class="sizeritem">
+ <flag>wxALL</flag>
+ <border>4</border>
+ <option>0</option>
+ <object class="wxCheckBox" name="checkbox_echo" base="EditCheckBox">
+ <label>Local Echo</label>
+ </object>
+ </object>
+ <object class="sizeritem">
+ <flag>wxALL</flag>
+ <border>4</border>
+ <option>0</option>
+ <object class="wxCheckBox" name="checkbox_unprintable" base="EditCheckBox">
+ <label>Show unprintable characters</label>
+ </object>
+ </object>
+ <object class="sizeritem">
+ <border>0</border>
+ <option>0</option>
+ <object class="wxRadioBox" name="radio_box_newline" base="EditRadioBox">
+ <style>wxRA_SPECIFY_ROWS</style>
+ <selection>0</selection>
+ <dimension>0</dimension>
+ <label>Newline Handling</label>
+ <choices>
+ <choice>CR only</choice>
+ <choice>LF only</choice>
+ <choice>CR+LF</choice>
+ </choices>
+ </object>
+ </object>
+ </object>
+ </object>
+ <object class="sizeritem">
+ <flag>wxALL|wxALIGN_RIGHT</flag>
+ <border>4</border>
+ <option>0</option>
+ <object class="wxBoxSizer" name="sizer_3" base="EditBoxSizer">
+ <orient>wxHORIZONTAL</orient>
+ <object class="sizeritem">
+ <border>0</border>
+ <option>0</option>
+ <object class="wxButton" name="button_ok" base="EditButton">
+ <default>1</default>
+ <label>OK</label>
+ </object>
+ </object>
+ <object class="sizeritem">
+ <border>0</border>
+ <option>0</option>
+ <object class="wxButton" name="button_cancel" base="EditButton">
+ <label>Cancel</label>
+ </object>
+ </object>
+ </object>
+ </object>
+ </object>
+ </object>
+</application>