diff options
Diffstat (limited to 'examples')
-rw-r--r-- | examples/enhancedserial.py | 62 | ||||
-rw-r--r-- | examples/miniterm.py | 325 | ||||
-rw-r--r-- | examples/scan.py | 27 | ||||
-rw-r--r-- | examples/scanwin32.py | 190 | ||||
-rw-r--r-- | examples/setup-miniterm-py2exe.py | 25 | ||||
-rw-r--r-- | examples/setup_demo.py | 35 | ||||
-rw-r--r-- | examples/tcp_serial_redirect.py | 176 | ||||
-rw-r--r-- | examples/test.py | 189 | ||||
-rw-r--r-- | examples/test_advanced.py | 169 | ||||
-rw-r--r-- | examples/test_high_load.py | 69 | ||||
-rw-r--r-- | examples/wxSerialConfigDialog.py | 260 | ||||
-rw-r--r-- | examples/wxSerialConfigDialog.wxg | 262 | ||||
-rw-r--r-- | examples/wxTerminal.py | 333 | ||||
-rw-r--r-- | examples/wxTerminal.wxg | 127 |
14 files changed, 2249 insertions, 0 deletions
diff --git a/examples/enhancedserial.py b/examples/enhancedserial.py new file mode 100644 index 0000000..2c81ae1 --- /dev/null +++ b/examples/enhancedserial.py @@ -0,0 +1,62 @@ +#!/usr/bin/env python +"""Enhanced Serial Port class +part of pyserial (http://pyserial.sf.net) (C)2002 cliechti@gmx.net + +another implementation of the readline and readlines method. +this one should be more efficient because a bunch of characters are read +on each access, but the drawback is that a timeout must be specified to +make it work (enforced by the class __init__). + +this class could be enhanced with a read_until() method and more +like found in the telnetlib. +""" + +from serial import Serial + +class EnhancedSerial(Serial): + def __init__(self, *args, **kwargs): + #ensure that a reasonable timeout is set + timeout = kwargs.get('timeout',0.1) + if timeout < 0.01: timeout = 0.1 + kwargs['timeout'] = timeout + Serial.__init__(self, *args, **kwargs) + self.buf = '' + + def readline(self, maxsize=None, timeout=1): + """maxsize is ignored, timeout in seconds is the max time that is way for a complete line""" + tries = 0 + while 1: + self.buf += self.read(512) + pos = self.buf.find('\n') + if pos >= 0: + line, self.buf = self.buf[:pos+1], self.buf[pos+1:] + return line + tries += 1 + if tries * self.timeout > timeout: + break + line, self.buf = self.buf, '' + return line + + def readlines(self, sizehint=None, timeout=1): + """read all lines that are available. abort after timout + when no more data arrives.""" + lines = [] + while 1: + line = self.readline(timeout=timeout) + if line: + lines.append(line) + if not line or line[-1:] != '\n': + break + return lines + +if __name__=='__main__': + #do some simple tests with a Loopback HW (see test.py for details) + PORT = 0 + #test, only with Loopback HW (shortcut RX/TX pins (3+4 on DSUB 9 and 25) ) + s = EnhancedSerial(PORT) + #write out some test data lines + s.write('\n'.join("hello how are you".split())) + #and read them back + print s.readlines() + #this one should print an empty list + print s.readlines(timeout=0.4) diff --git a/examples/miniterm.py b/examples/miniterm.py new file mode 100644 index 0000000..613af3d --- /dev/null +++ b/examples/miniterm.py @@ -0,0 +1,325 @@ +#!/usr/bin/env python + +# Very simple serial terminal +# (C)2002-2006 Chris Liechti <cliechti@gmx.net> + +# Input characters are sent directly (only LF -> CR/LF/CRLF translation is +# done), received characters are displayed as is (or escaped trough pythons +# repr, useful for debug purposes) + + +import sys, os, serial, threading + +EXITCHARCTER = '\x1d' #GS/ctrl+] +UPLOADCHARACTER = '\x15' # Upload: ctrl+u + +#first choose a platform dependant way to read single characters from the console +global console + +if os.name == 'nt': + import msvcrt + class Console: + def __init__(self): + pass + + def setup(self): + pass # Do nothing for 'nt' + + def cleanup(self): + pass # Do nothing for 'nt' + + def getkey(): + while 1: + z = msvcrt.getch() + if z == '\0' or z == '\xe0': #functions keys + msvcrt.getch() + else: + if z == '\r': + return '\n' + return z + + console = Console() + +elif os.name == 'posix': + import termios, sys, os + class Console: + def __init__(self): + self.fd = sys.stdin.fileno() + + def setup(self): + self.old = termios.tcgetattr(self.fd) + new = termios.tcgetattr(self.fd) + new[3] = new[3] & ~termios.ICANON & ~termios.ECHO & ~termios.ISIG + new[6][termios.VMIN] = 1 + new[6][termios.VTIME] = 0 + termios.tcsetattr(self.fd, termios.TCSANOW, new) + #s = '' # We'll save the characters typed and add them to the pool. + + def getkey(self): + c = os.read(self.fd, 1) + return c + + def cleanup(self): + termios.tcsetattr(self.fd, termios.TCSAFLUSH, self.old) + + console = Console() + + def cleanup_console(): + console.cleanup() + + console.setup() + sys.exitfunc = cleanup_console #terminal modes have to be restored on exit... + +else: + raise "Sorry no implementation for your platform (%s) available." % sys.platform + +CONVERT_CRLF = 2 +CONVERT_CR = 1 +CONVERT_LF = 0 +NEWLINE_CONVERISON_MAP = ('\n', '\r', '\r\n') + +class Miniterm: + def __init__(self, port, baudrate, parity, rtscts, xonxoff, echo=False, convert_outgoing=CONVERT_CRLF, repr_mode=0): + self.serial = serial.Serial(port, baudrate, parity=parity, rtscts=rtscts, xonxoff=xonxoff, timeout=0.7) + self.echo = echo + self.repr_mode = repr_mode + self.convert_outgoing = convert_outgoing + self.newline = NEWLINE_CONVERISON_MAP[self.convert_outgoing] + + def start(self): + self.alive = True + #start serial->console thread + self.receiver_thread = threading.Thread(target=self.reader) + self.receiver_thread.setDaemon(1) + self.receiver_thread.start() + #enter console->serial loop + self.transmitter_thread = threading.Thread(target=self.writer) + self.transmitter_thread.setDaemon(1) + self.transmitter_thread.start() + + def stop(self): + self.alive = False + + def join(self, transmit_only=False): + self.transmitter_thread.join() + if not transmit_only: + self.receiver_thread.join() + + def reader(self): + """loop and copy serial->console""" + while self.alive: + data = self.serial.read(1) + + if self.repr_mode == 0: + # direct output, just have to care about newline setting + if data == '\r' and self.convert_outgoing == CONVERT_CR: + sys.stdout.write('\n') + else: + sys.stdout.write(data) + elif self.repr_mode == 1: + # escape non-printable, let pass newlines + if self.convert_outgoing == CONVERT_CRLF and data in '\r\n': + if data == '\n': + sys.stdout.write('\n') + elif data == '\r': + pass + elif data == '\n' and self.convert_outgoing == CONVERT_LF: + sys.stdout.write('\n') + elif data == '\r' and self.convert_outgoing == CONVERT_CR: + sys.stdout.write('\n') + else: + sys.stdout.write(repr(data)[1:-1]) + elif self.repr_mode == 2: + # escape all non-printable, including newline + sys.stdout.write(repr(data)[1:-1]) + elif self.repr_mode == 3: + # escape everything (hexdump) + for character in data: + sys.stdout.write("%s " % character.encode('hex')) + sys.stdout.flush() + + + def writer(self): + """loop and copy console->serial until EXITCHARCTER character is found""" + while self.alive: + try: + c = console.getkey() + except KeyboardInterrupt: + c = '\x03' + if c == EXITCHARCTER: + self.stop() + break # exit app + elif c == UPLOADCHARACTER: # upload text file + sys.stderr.write('\nFile to upload: ') + sys.stderr.flush() + console.cleanup() + filename = sys.stdin.readline().rstrip('\r\n') + if filename != '': + try: + file = open(filename, 'r') + sys.stderr.write('Sending file %s ' % filename) + while True: + line = file.readline().rstrip('\r\n') + if not line: + break + self.serial.write(line) + self.serial.write('\r\n') + # Wait for output buffer to drain. + self.serial.flush() + sys.stderr.write('.') # Progress indicator. + sys.stderr.write('\nFile %s sent.\n' % filename) + except IOError: + print 'Error opening file %s' % filename + console.setup() + + elif c == '\n': + self.serial.write(self.newline) # send newline character(s) + if self.echo: + sys.stdout.write(c) # local echo is a real newline in any case + else: + self.serial.write(c) # send character + if self.echo: + sys.stdout.write(c) + +def key_description(character): + """generate a readable description for a key""" + ascii_code = ord(character) + if ascii_code < 32: + return 'Ctrl+%c' % (ord('@') + ascii_code) + else: + return repr(ascii_code) + + +def main(): + import optparse + + parser = optparse.OptionParser(usage="""\ +%prog [options] [port [baudrate]] + +Miniterm - A simple terminal program for the serial port.""") + + parser.add_option("-p", "--port", dest="port", + help="port, a number (default 0) or a device name (deprecated option)", + default=None) + + parser.add_option("-b", "--baud", dest="baudrate", action="store", type='int', + help="set baudrate, default 9600", default=9600) + + parser.add_option("", "--parity", dest="parity", action="store", + help="set parity, one of [N, E, O], default=N", default='N') + + parser.add_option("-e", "--echo", dest="echo", action="store_true", + help="enable local echo (default off)", default=False) + + parser.add_option("", "--rtscts", dest="rtscts", action="store_true", + help="enable RTS/CTS flow control (default off)", default=False) + + parser.add_option("", "--xonxoff", dest="xonxoff", action="store_true", + help="enable software flow control (default off)", default=False) + + parser.add_option("", "--cr", dest="cr", action="store_true", + help="do not send CR+LF, send CR only", default=False) + + parser.add_option("", "--lf", dest="lf", action="store_true", + help="do not send CR+LF, send LF only", default=False) + + parser.add_option("-D", "--debug", dest="repr_mode", action="count", + help="""debug received data (escape non-printable chars) +--debug can be given multiple times: +0: just print what is received +1: escape non-printable characters, do newlines as ususal +2: escape non-printable characters, newlines too +3: hex dump everything""", default=0) + + parser.add_option("", "--rts", dest="rts_state", action="store", type='int', + help="set initial RTS line state (possible values: 0, 1)", default=None) + + parser.add_option("", "--dtr", dest="dtr_state", action="store", type='int', + help="set initial DTR line state (possible values: 0, 1)", default=None) + + parser.add_option("-q", "--quiet", dest="quiet", action="store_true", + help="suppress non error messages", default=False) + + parser.add_option("", "--exit-char", dest="exit_char", action="store", type='int', + help="ASCII code of special charcter that is used to exit the application", default=0x1d) + + parser.add_option("", "--upload-char", dest="upload_char", action="store", type='int', + help="ASCII code of special charcter that is used to send a file", default=0x15) + + (options, args) = parser.parse_args() + + if options.cr and options.lf: + parser.error("ony one of --cr or --lf can be specified") + + global EXITCHARCTER, UPLOADCHARACTER + EXITCHARCTER = chr(options.exit_char) + UPLOADCHARACTER = chr(options.upload_char) + + port = options.port + baudrate = options.baudrate + if args: + if options.port is not None: + parser.error("no arguments are allowed, options only when --port is given") + port = args.pop(0) + if args: + try: + baudrate = int(args[0]) + except ValueError: + parser.error("baudrate must be a number, not %r" % args[0]) + args.pop(0) + if args: + parser.error("too many arguments") + else: + if port is None: port = 0 + + convert_outgoing = CONVERT_CRLF + if options.cr: + convert_outgoing = CONVERT_CR + elif options.lf: + convert_outgoing = CONVERT_LF + + try: + miniterm = Miniterm( + port, + baudrate, + options.parity, + rtscts=options.rtscts, + xonxoff=options.xonxoff, + echo=options.echo, + convert_outgoing=convert_outgoing, + repr_mode=options.repr_mode, + ) + except serial.SerialException: + sys.stderr.write("could not open port %r\n" % port) + sys.exit(1) + + if not options.quiet: + sys.stderr.write('--- Miniterm on %s: %d,%s,%s,%s. ---\n' % ( + miniterm.serial.portstr, + miniterm.serial.baudrate, + miniterm.serial.bytesize, + miniterm.serial.parity, + miniterm.serial.stopbits, + )) + sys.stderr.write('--- Quit: %s | Upload: %s ---\n' % ( + key_description(EXITCHARCTER), + key_description(UPLOADCHARACTER) + )) + if options.dtr_state is not None: + if not options.quiet: + sys.stderr.write('--- forcing DTR %s\n' % (options.dtr_state and 'active' or 'inactive')) + miniterm.serial.setDTR(options.dtr_state) + if options.rts_state is not None: + if not options.quiet: + sys.stderr.write('--- forcing RTS %s\n' % (options.rts_state and 'active' or 'inactive')) + miniterm.serial.setRTS(options.rts_state) + + miniterm.start() + miniterm.join(True) + if not options.quiet: + sys.stderr.write("\n--- exit ---\n") + miniterm.join() + + +if __name__ == '__main__': + main() diff --git a/examples/scan.py b/examples/scan.py new file mode 100644 index 0000000..439818c --- /dev/null +++ b/examples/scan.py @@ -0,0 +1,27 @@ +#!/usr/bin/env python +"""Scan for serial ports. +Part of pySerial (http://pyserial.sf.net) (C)2002-2003 <cliechti@gmx.net> + +The scan function of this module tries to open each port number +from 0 to 255 and it builds a list of those ports where this was +successful. +""" + +import serial + +def scan(): + """scan for available ports. return a list of tuples (num, name)""" + available = [] + for i in range(256): + try: + s = serial.Serial(i) + available.append( (i, s.portstr)) + s.close() #explicit close 'cause of delayed GC in java + except serial.SerialException: + pass + return available + +if __name__=='__main__': + print "Found ports:" + for n,s in scan(): + print "(%d) %s" % (n,s) diff --git a/examples/scanwin32.py b/examples/scanwin32.py new file mode 100644 index 0000000..eb5f544 --- /dev/null +++ b/examples/scanwin32.py @@ -0,0 +1,190 @@ +import ctypes +import re + +def ValidHandle(value): + if value == 0: + raise ctypes.WinError() + return value + +NULL = 0 +HDEVINFO = ctypes.c_int +BOOL = ctypes.c_int +CHAR = ctypes.c_char +PCTSTR = ctypes.c_char_p +HWND = ctypes.c_uint +DWORD = ctypes.c_ulong +PDWORD = ctypes.POINTER(DWORD) +ULONG = ctypes.c_ulong +ULONG_PTR = ctypes.POINTER(ULONG) +#~ PBYTE = ctypes.c_char_p +PBYTE = ctypes.c_void_p + +class GUID(ctypes.Structure): + _fields_ = [ + ('Data1', ctypes.c_ulong), + ('Data2', ctypes.c_ushort), + ('Data3', ctypes.c_ushort), + ('Data4', ctypes.c_ubyte*8), + ] + def __str__(self): + return "{%08x-%04x-%04x-%s-%s}" % ( + self.Data1, + self.Data2, + self.Data3, + ''.join(["%02x" % d for d in self.Data4[:2]]), + ''.join(["%02x" % d for d in self.Data4[2:]]), + ) + +class SP_DEVINFO_DATA(ctypes.Structure): + _fields_ = [ + ('cbSize', DWORD), + ('ClassGuid', GUID), + ('DevInst', DWORD), + ('Reserved', ULONG_PTR), + ] + def __str__(self): + return "ClassGuid:%s DevInst:%s" % (self.ClassGuid, self.DevInst) +PSP_DEVINFO_DATA = ctypes.POINTER(SP_DEVINFO_DATA) + +class SP_DEVICE_INTERFACE_DATA(ctypes.Structure): + _fields_ = [ + ('cbSize', DWORD), + ('InterfaceClassGuid', GUID), + ('Flags', DWORD), + ('Reserved', ULONG_PTR), + ] + def __str__(self): + return "InterfaceClassGuid:%s Flags:%s" % (self.InterfaceClassGuid, self.Flags) +PSP_DEVICE_INTERFACE_DATA = ctypes.POINTER(SP_DEVICE_INTERFACE_DATA) + +PSP_DEVICE_INTERFACE_DETAIL_DATA = ctypes.c_void_p + +SetupDiDestroyDeviceInfoList = ctypes.windll.setupapi.SetupDiDestroyDeviceInfoList +SetupDiDestroyDeviceInfoList.argtypes = [HDEVINFO] +SetupDiDestroyDeviceInfoList.restype = BOOL + +SetupDiGetClassDevs = ctypes.windll.setupapi.SetupDiGetClassDevsA +SetupDiGetClassDevs.argtypes = [ctypes.POINTER(GUID), PCTSTR, HWND, DWORD] +SetupDiGetClassDevs.restype = ValidHandle #HDEVINFO + +SetupDiEnumDeviceInterfaces = ctypes.windll.setupapi.SetupDiEnumDeviceInterfaces +SetupDiEnumDeviceInterfaces.argtypes = [HDEVINFO, PSP_DEVINFO_DATA, ctypes.POINTER(GUID), DWORD, PSP_DEVICE_INTERFACE_DATA] +SetupDiEnumDeviceInterfaces.restype = BOOL + +SetupDiGetDeviceInterfaceDetail = ctypes.windll.setupapi.SetupDiGetDeviceInterfaceDetailA +SetupDiGetDeviceInterfaceDetail.argtypes = [HDEVINFO, PSP_DEVICE_INTERFACE_DATA, PSP_DEVICE_INTERFACE_DETAIL_DATA, DWORD, PDWORD, PSP_DEVINFO_DATA] +SetupDiGetDeviceInterfaceDetail.restype = BOOL + +SetupDiGetDeviceRegistryProperty = ctypes.windll.setupapi.SetupDiGetDeviceRegistryPropertyA +SetupDiGetDeviceRegistryProperty.argtypes = [HDEVINFO, PSP_DEVINFO_DATA, DWORD, PDWORD, PBYTE, DWORD, PDWORD] +SetupDiGetDeviceRegistryProperty.restype = BOOL + + +GUID_CLASS_COMPORT = GUID(0x86e0d1e0L, 0x8089, 0x11d0, + (ctypes.c_ubyte*8)(0x9c, 0xe4, 0x08, 0x00, 0x3e, 0x30, 0x1f, 0x73)) + +DIGCF_PRESENT = 2 +DIGCF_DEVICEINTERFACE = 16 +INVALID_HANDLE_VALUE = 0 +ERROR_INSUFFICIENT_BUFFER = 122 +SPDRP_HARDWAREID = 1 +SPDRP_FRIENDLYNAME = 12 +ERROR_NO_MORE_ITEMS = 259 + +def comports(available_only=True): + """This generator scans the device registry for com ports and yields port, desc, hwid. + If available_only is true only return currently existing ports.""" + flags = DIGCF_DEVICEINTERFACE + if available_only: + flags |= DIGCF_PRESENT + g_hdi = SetupDiGetClassDevs(ctypes.byref(GUID_CLASS_COMPORT), None, NULL, flags); + #~ for i in range(256): + for dwIndex in range(256): + did = SP_DEVICE_INTERFACE_DATA() + did.cbSize = ctypes.sizeof(did) + + if not SetupDiEnumDeviceInterfaces( + g_hdi, + None, + ctypes.byref(GUID_CLASS_COMPORT), + dwIndex, + ctypes.byref(did) + ): + if ctypes.GetLastError() != ERROR_NO_MORE_ITEMS: + raise ctypes.WinError() + break + + dwNeeded = DWORD() + # get the size + if not SetupDiGetDeviceInterfaceDetail( + g_hdi, + ctypes.byref(did), + None, 0, ctypes.byref(dwNeeded), + None + ): + # Ignore ERROR_INSUFFICIENT_BUFFER + if ctypes.GetLastError() != ERROR_INSUFFICIENT_BUFFER: + raise ctypes.WinError() + # allocate buffer + class SP_DEVICE_INTERFACE_DETAIL_DATA_A(ctypes.Structure): + _fields_ = [ + ('cbSize', DWORD), + ('DevicePath', CHAR*(dwNeeded.value - ctypes.sizeof(DWORD))), + ] + def __str__(self): + return "DevicePath:%s" % (self.DevicePath,) + idd = SP_DEVICE_INTERFACE_DETAIL_DATA_A() + idd.cbSize = 5 + devinfo = SP_DEVINFO_DATA() + devinfo.cbSize = ctypes.sizeof(devinfo) + if not SetupDiGetDeviceInterfaceDetail( + g_hdi, + ctypes.byref(did), + ctypes.byref(idd), dwNeeded, None, + ctypes.byref(devinfo) + ): + raise ctypes.WinError() + + # hardware ID + szHardwareID = ctypes.create_string_buffer('\0' * 250) + if not SetupDiGetDeviceRegistryProperty( + g_hdi, + ctypes.byref(devinfo), + SPDRP_HARDWAREID, + None, + ctypes.byref(szHardwareID), ctypes.sizeof(szHardwareID) - 1, + None + ): + # Ignore ERROR_INSUFFICIENT_BUFFER + if ctypes.GetLastError() != ERROR_INSUFFICIENT_BUFFER: + raise ctypes.WinError() + + # friendly name + szFriendlyName = ctypes.create_string_buffer('\0' * 250) + if not SetupDiGetDeviceRegistryProperty( + g_hdi, + ctypes.byref(devinfo), + SPDRP_FRIENDLYNAME, + None, + ctypes.byref(szFriendlyName), ctypes.sizeof(szFriendlyName) - 1, + None + ): + # Ignore ERROR_INSUFFICIENT_BUFFER + if ctypes.GetLastError() != ERROR_INSUFFICIENT_BUFFER: + raise ctypes.WinError() + port_name = re.search(r"\((.*)\)", szFriendlyName.value).group(1) + yield port_name, szFriendlyName.value, szHardwareID.value + + SetupDiDestroyDeviceInfoList(g_hdi) + + +if __name__ == '__main__': + import serial + for port, desc, hwid in comports(): + print "%s: %s (%s)" % (port, desc, hwid) + print " "*10, serial.Serial(port) #test open + + # list of all ports the system knows + print "-"*60 + for port, desc, hwid in comports(False): + print "%-10s: %s (%s)" % (port, desc, hwid) diff --git a/examples/setup-miniterm-py2exe.py b/examples/setup-miniterm-py2exe.py new file mode 100644 index 0000000..c28457a --- /dev/null +++ b/examples/setup-miniterm-py2exe.py @@ -0,0 +1,25 @@ +# setup script for py2exe to create the miniterm.exe +# $Id: setup-miniterm-py2exe.py,v 1.1 2005-09-21 19:51:19 cliechti Exp $ + +from distutils.core import setup +import glob, sys, py2exe, os + +sys.path.append('..') + +sys.argv.extend("py2exe --bundle 1".split()) + +setup( + name='miniterm', + #~ version='0.5', + zipfile=None, + options = {"py2exe": + { + 'dist_dir': 'bin', + 'excludes': ['javax.comm'], + } + }, + console = [ + #~ "miniterm_exe_wrapper.py", + "miniterm.py", + ], +) diff --git a/examples/setup_demo.py b/examples/setup_demo.py new file mode 100644 index 0000000..854c0b9 --- /dev/null +++ b/examples/setup_demo.py @@ -0,0 +1,35 @@ +# This is a setup.py example script for the use with py2exe +from distutils.core import setup +import py2exe +import sys, os + +#this script is only useful for py2exe so just run that distutils command. +#that allows to run it with a simple double click. +sys.argv.append('py2exe') + +#get an icon from somewhere.. the python installation should have one: +icon = os.path.join(os.path.dirname(sys.executable), 'py.ico') + +setup( + options = {'py2exe': { + 'excludes': ['javax.comm'], + 'optimize': 2, + 'dist_dir': 'dist', + } + }, + + name = "wxTerminal", + windows = [ + { + 'script': "wxTerminal.py", + 'icon_resources': [(0x0004, icon)] + }, + ], + zipfile = "stuff.lib", + + description = "Simple serial terminal application", + version = "0.1", + author = "Chris Liechti", + author_email = "cliechti@gmx.net", + url = "http://pyserial.sf.net", +) diff --git a/examples/tcp_serial_redirect.py b/examples/tcp_serial_redirect.py new file mode 100644 index 0000000..4c04294 --- /dev/null +++ b/examples/tcp_serial_redirect.py @@ -0,0 +1,176 @@ +#!/usr/bin/env python + +# (C) 2002-2006 Chris Liechti <cliechti@gmx.net> +# redirect data from a TCP/IP connection to a serial port and vice versa +# requires Python 2.2 'cause socket.sendall is used + + +import sys, os, serial, threading, socket + +try: + True +except NameError: + True = 1 + False = 0 + +class Redirector: + def __init__(self, serial, socket): + self.serial = serial + self.socket = socket + + def shortcut(self): + """connect the serial port to the tcp port by copying everything + from one side to the other""" + self.alive = True + self.thread_read = threading.Thread(target=self.reader) + self.thread_read.setDaemon(1) + self.thread_read.start() + self.writer() + + def reader(self): + """loop forever and copy serial->socket""" + while self.alive: + try: + data = self.serial.read(1) #read one, blocking + n = self.serial.inWaiting() #look if there is more + if n: + data = data + self.serial.read(n) #and get as much as possible + if data: + self.socket.sendall(data) #send it over TCP + except socket.error, msg: + print msg + #probably got disconnected + break + self.alive = False + + def writer(self): + """loop forever and copy socket->serial""" + while self.alive: + try: + data = self.socket.recv(1024) + if not data: + break + self.serial.write(data) #get a bunch of bytes and send them + except socket.error, msg: + print msg + #probably got disconnected + break + self.alive = False + self.thread_read.join() + + def stop(self): + """Stop copying""" + if self.alive: + self.alive = False + self.thread_read.join() + + +if __name__ == '__main__': + import optparse + + parser = optparse.OptionParser(usage="""\ +%prog [options] [port [baudrate]] +Simple Serial to Network (TCP/IP) redirector. + +Note: no security measures are implemeted. Anyone can remotely connect +to this service over the network. +Only one connection at once is supported. When the connection is terminated +it waits for the next connect. +""") + parser.add_option("-p", "--port", dest="port", + help="port, a number (default 0) or a device name (deprecated option)", + default=None) + + parser.add_option("-b", "--baud", dest="baudrate", action="store", type='int', + help="set baudrate, default 9600", default=9600) + + parser.add_option("", "--parity", dest="parity", action="store", + help="set parity, one of [N, E, O], default=N", default='N') + + parser.add_option("", "--rtscts", dest="rtscts", action="store_true", + help="enable RTS/CTS flow control (default off)", default=False) + + parser.add_option("", "--xonxoff", dest="xonxoff", action="store_true", + help="enable software flow control (default off)", default=False) + + parser.add_option("", "--cr", dest="cr", action="store_true", + help="do not send CR+LF, send CR only", default=False) + + parser.add_option("", "--lf", dest="lf", action="store_true", + help="do not send CR+LF, send LF only", default=False) + + parser.add_option("", "--rts", dest="rts_state", action="store", type='int', + help="set initial RTS line state (possible values: 0, 1)", default=None) + + parser.add_option("", "--dtr", dest="dtr_state", action="store", type='int', + help="set initial DTR line state (possible values: 0, 1)", default=None) + + parser.add_option("-q", "--quiet", dest="quiet", action="store_true", + help="suppress non error messages", default=False) + + parser.add_option("-P", "--localport", dest="local_port", action="store", type='int', + help="local TCP port", default=7777) + + + (options, args) = parser.parse_args() + + port = options.port + baudrate = options.baudrate + if args: + if options.port is not None: + parser.error("no arguments are allowed, options only when --port is given") + port = args.pop(0) + if args: + try: + baudrate = int(args[0]) + except ValueError: + parser.error("baudrate must be a number, not %r" % args[0]) + args.pop(0) + if args: + parser.error("too many arguments") + else: + if port is None: port = 0 + + if options.cr and options.lf: + parser.error("ony one of --cr or --lf can be specified") + + ser = serial.Serial() + ser.port = port + ser.baudrate = baudrate + ser.rtscts = options.rtscts + ser.xonxoff = options.xonxoff + ser.timeout = 1 #required so that the reader thread can exit + + if not options.quiet: + print "--- TCP/IP to Serial redirector --- type Ctrl-C / BREAK to quit" + print "--- %s %s,%s,%s,%s ---" % (ser.portstr, ser.baudrate, 8, ser.parity, 1) + + try: + ser.open() + except serial.SerialException, e: + print "Could not open serial port %s: %s" % (ser.portstr, e) + sys.exit(1) + + if options.rts_state is not None: + ser.setRTS(options.rts_state) + + if options.dtr_state is not None: + ser.setDTR(options.dtr_state) + + srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + srv.bind( ('', options.local_port) ) + srv.listen(1) + while 1: + try: + print "Waiting for connection on %s..." % options.local_port + connection, addr = srv.accept() + print 'Connected by', addr + #enter console->serial loop + r = Redirector(ser, connection) + r.shortcut() + print 'Disconnected' + connection.close() + except socket.error, msg: + print msg + + print "\n--- exit ---" diff --git a/examples/test.py b/examples/test.py new file mode 100644 index 0000000..a059d1e --- /dev/null +++ b/examples/test.py @@ -0,0 +1,189 @@ +#! /usr/bin/env python +# Python Serial Port Extension for Win32, Linux, BSD, Jython +# see __init__.py +# +# (C) 2001-2008 Chris Liechti <cliechti@gmx.net> +# this is distributed under a free software license, see license.txt + +"""\ +Some tests for the serial module. +Part of pyserial (http://pyserial.sf.net) (C)2001-2008 cliechti@gmx.net + +Intended to be run on different platforms, to ensure portability of +the code. + +For all these tests a simple hardware is required. +Loopback HW adapter: +Shortcut these pin pairs: + TX <-> RX + RTS <-> CTS + DTR <-> DSR + +On a 9 pole DSUB these are the pins (2-3) (4-6) (7-8) +""" + +import unittest, threading, time +import serial + +#on which port should the tests be performed: +PORT=0 + + +class Test4_Nonblocking(unittest.TestCase): + """Test with timeouts""" + timeout=0 + def setUp(self): + self.s = serial.Serial(PORT,timeout=self.timeout) + def tearDown(self): + self.s.close() + + def test0_Messy(self): + """NonBlocking (timeout=0)""" + #this is only here to write out the message in verbose mode + #because Test3 and Test4 print the same messages + + def test1_ReadEmpty(self): + """timeout: After port open, the input buffer must be empty""" + self.failUnless(self.s.read(1)=='', "expected empty buffer") + def test2_Loopback(self): + """timeout: each sent character should return (binary test). + this is also a test for the binary capability of a port.""" + for c in map(chr,range(256)): + self.s.write(c) + time.sleep(0.02) #there might be a small delay until the character is ready (especialy on win32) + self.failUnless(self.s.inWaiting()==1, "expected exactly one character for inWainting()") + self.failUnless(self.s.read(1)==c, "expected a '%s' which was written before" % c) + self.failUnless(self.s.read(1)=='', "expected empty buffer after all sent chars are read") + def test2_LoopbackTimeout(self): + """timeout: test the timeout/immediate return. + partial results should be returned.""" + self.s.write("HELLO") + time.sleep(0.1) #there might be a small delay until the character is ready (especialy on win32) + #read more characters as are available to run in the timeout + self.failUnless(self.s.read(10)=='HELLO', "expected the 'HELLO' which was written before") + self.failUnless(self.s.read(1)=='', "expected empty buffer after all sent chars are read") + + +class Test3_Timeout(Test4_Nonblocking): + """Same tests as the NonBlocking ones but this time with timeout""" + timeout=1 + def test0_Messy(self): + """Blocking (timeout=1)""" + #this is only here to write out the message in verbose mode + #because Test3 and Test4 print the same messages + +class SendEvent(threading.Thread): + def __init__(self, serial, delay=1): + threading.Thread.__init__(self) + self.serial = serial + self.delay = delay + self.x = threading.Event() + self.stopped = 0 + self.start() + def run(self): + time.sleep(self.delay) + if not self.stopped: + self.serial.write("E") + self.x.set() + def isSet(self): + return self.x.isSet() + def stop(self): + self.stopped = 1 + self.x.wait() + +class Test1_Forever(unittest.TestCase): + """Tests a port with no timeout. These tests require that a + character is sent after some time to stop the test, this is done + through the SendEvent class and the Loopback HW.""" + def setUp(self): + self.s = serial.Serial(PORT, timeout=None) + self.event = SendEvent(self.s) + def tearDown(self): + self.event.stop() + self.s.close() + + def test2_ReadEmpty(self): + """no timeout: after port open, the input buffer must be empty (read). + a character is sent after some time to terminate the test (SendEvent).""" + c = self.s.read(1) + if not (self.event.isSet() and c == 'E'): + self.fail("expected marker") + +class Test2_Forever(unittest.TestCase): + """Tests a port with no timeout""" + def setUp(self): + self.s = serial.Serial(PORT,timeout=None) + def tearDown(self): + self.s.close() + + def test1_inWaitingEmpty(self): + """no timeout: after port open, the input buffer must be empty (inWaiting)""" + self.failUnless(self.s.inWaiting()==0, "expected empty buffer") + + def test2_Loopback(self): + """no timeout: each sent character should return (binary test). + this is also a test for the binary capability of a port.""" + for c in map(chr,range(256)): + self.s.write(c) + time.sleep(0.02) #there might be a small delay until the character is ready (especialy on win32) + self.failUnless(self.s.inWaiting()==1, "expected exactly one character for inWainting()") + self.failUnless(self.s.read(1)==c, "expected an '%s' which was written before" % c) + self.failUnless(self.s.inWaiting()==0, "expected empty buffer after all sent chars are read") + + +class Test0_DataWires(unittest.TestCase): + """Test modem control lines""" + def setUp(self): + self.s = serial.Serial(PORT) + def tearDown(self): + self.s.close() + + def test1_RTS(self): + """Test RTS/CTS""" + self.s.setRTS(0) + self.failUnless(not self.s.getCTS(), "CTS -> 0") + self.s.setRTS(1) + self.failUnless(self.s.getCTS(), "CTS -> 1") + + def test2_DTR(self): + """Test DTR/DSR""" + self.s.setDTR(0) + self.failUnless(not self.s.getDSR(), "DSR -> 0") + self.s.setDTR(1) + self.failUnless(self.s.getDSR(), "DSR -> 1") + + def test3_RI(self): + """Test RI""" + self.failUnless(not self.s.getRI(), "RI -> 0") + +class Test_MoreTimeouts(unittest.TestCase): + """Test with timeouts""" + def setUp(self): + self.s = serial.Serial() #create an closed serial port + + def tearDown(self): + self.s.close() + + def test_WriteTimeout(self): + """Test write() timeout.""" + #use xonxoff setting and the loopback adapter to switch traffic on hold + self.s.port = PORT + self.s.writeTimeout = 1 + self.s.xonxoff = 1 + self.s.open() + self.s.write(serial.XOFF) + time.sleep(0.1) #some systems need a little delay so that they can react on XOFF + t1 = time.time() + self.failUnlessRaises(serial.SerialTimeoutException, self.s.write, "timeout please"*100) + t2 = time.time() + self.failUnless( 1 <= (t2-t1) < 2, "Timeout not in the given intervall (%s)" % (t2-t1)) + +if __name__ == '__main__': + import sys + print __doc__ + if len(sys.argv) > 1: + PORT = sys.argv[1] + print "Testing port", PORT + sys.argv[1:] = ['-v'] + # When this module is executed from the command-line, it runs all its tests + unittest.main() diff --git a/examples/test_advanced.py b/examples/test_advanced.py new file mode 100644 index 0000000..2702e1c --- /dev/null +++ b/examples/test_advanced.py @@ -0,0 +1,169 @@ +#!/usr/bin/env python +#needs at least python 2.2.3 + +#Python Serial Port Extension for Win32, Linux, BSD, Jython +#see __init__.py +# +#(C) 2001-2003 Chris Liechti <cliechti@gmx.net> +# this is distributed under a free software license, see license.txt + +"""Some tests for the serial module. +Part of pyserial (http://pyserial.sf.net) (C)2002 cliechti@gmx.net + +Intended to be run on different platforms, to ensure portability of +the code. + +These tests open a serial port and change all the settings on the fly. +If the port is realy correctly configured cannot be determined - that +would require external hardware or a nullmodem cable and an other +serial port library... Thus it mainly tests that all features are +correctly implemented and that the interface does what it should. +""" + +import unittest +import serial + +#on which port should the tests be performed: +PORT=0 + +class Test_ChangeAttributes(unittest.TestCase): + """Test with timeouts""" + + def setUp(self): + self.s = serial.Serial() #create an closed serial port + + def tearDown(self): + self.s.close() + + def test_PortSetting(self): + self.s.port = PORT + #portstr has to be set + if isinstance(PORT, str): + self.failUnlessEqual(self.s.portstr.lower(), PORT.lower()) + else: + self.failUnlessEqual(self.s.portstr, serial.device(PORT)) + #test internals + self.failUnlessEqual(self.s._port, PORT) + #test on the fly change + self.s.open() + self.failUnless(self.s.isOpen()) + self.s.port = 0 + self.failUnless(self.s.isOpen()) + self.failUnlessEqual(self.s.port, 0) + self.failUnlessEqual(self.s.portstr, serial.device(0)) + try: + self.s.port = 1 + except serial.SerialException: #port not available on system + pass #cant test on this machine... + else: + self.failUnless(self.s.isOpen()) + self.failUnlessEqual(self.s.port, 1) + self.failUnlessEqual(self.s.portstr, serial.device(1)) + + def test_BaudrateSetting(self): + self.s.port = PORT + self.s.open() + for baudrate in (300, 9600, 19200, 115200): + self.s.baudrate = baudrate + #test get method + self.failUnlessEqual(self.s.baudrate, baudrate) + #test internals + self.failUnlessEqual(self.s._baudrate, baudrate) + #test illegal values + for illegal_value in (-300, -1, 'a', None): + self.failUnlessRaises(ValueError, self.s.setBaudrate, illegal_value) + + def test_BaudrateSetting2(self): + #test illegal values, depending on machine/port some of these may be valid... + self.s.port = PORT + self.s.open() + for illegal_value in (500000,576000,921600,92160): + self.failUnlessRaises(ValueError, self.s.setBaudrate, illegal_value) + + def test_BytesizeSetting(self): + for bytesize in (5,6,7,8): + self.s.bytesize = bytesize + #test get method + self.failUnlessEqual(self.s.bytesize, bytesize) + #test internals + self.failUnlessEqual(self.s._bytesize, bytesize) + #test illegal values + for illegal_value in (0, 1, 3, 4, 9, 10, 'a', None): + self.failUnlessRaises(ValueError, self.s.setByteSize, illegal_value) + + def test_ParitySetting(self): + for parity in (serial.PARITY_NONE, serial.PARITY_EVEN, serial.PARITY_ODD): + self.s.parity = parity + #test get method + self.failUnlessEqual(self.s.parity, parity) + #test internals + self.failUnlessEqual(self.s._parity, parity) + #test illegal values + for illegal_value in (0, 57, 'a', None): + self.failUnlessRaises(ValueError, self.s.setParity, illegal_value) + + def test_StopbitsSetting(self): + for stopbits in (1, 2): + self.s.stopbits = stopbits + #test get method + self.failUnlessEqual(self.s.stopbits, stopbits) + #test internals + self.failUnlessEqual(self.s._stopbits, stopbits) + #test illegal values + for illegal_value in (0, 3, 1.5, 57, 'a', None): + self.failUnlessRaises(ValueError, self.s.setStopbits, illegal_value) + + def test_TimeoutSetting(self): + for timeout in (None, 0, 1, 3.14159, 10, 1000, 3600): + self.s.timeout = timeout + #test get method + self.failUnlessEqual(self.s.timeout, timeout) + #test internals + self.failUnlessEqual(self.s._timeout, timeout) + #test illegal values + for illegal_value in (-1, 'a'): + self.failUnlessRaises(ValueError, self.s.setTimeout, illegal_value) + + def test_XonXoffSetting(self): + for xonxoff in (True, False): + self.s.xonxoff = xonxoff + #test get method + self.failUnlessEqual(self.s.xonxoff, xonxoff) + #test internals + self.failUnlessEqual(self.s._xonxoff, xonxoff) + #no illegal values here, normal rules for the boolean value of an + #object are used thus all objects have a truth value. + + def test_RtsCtsSetting(self): + for rtscts in (True, False): + self.s.rtscts = rtscts + #test get method + self.failUnlessEqual(self.s.rtscts, rtscts) + #test internals + self.failUnlessEqual(self.s._rtscts, rtscts) + #no illegal values here, normal rules for the boolean value of an + #object are used thus all objects have a truth value. + + def test_UnconfiguredPort(self): + #an unconfigured port cannot be opened + self.failUnlessRaises(serial.SerialException, self.s.open) + + def test_PortOpenClose(self): + self.s.port = PORT + for i in range(3): + #open the port and check flag + self.failUnless(not self.s.isOpen()) + self.s.open() + self.failUnless(self.s.isOpen()) + self.s.close() + self.failUnless(not self.s.isOpen()) + +if __name__ == '__main__': + import sys + print __doc__ + if len(sys.argv) > 1: + PORT = sys.argv[1] + print "Testing port", PORT + sys.argv[1:] = ['-v'] + # When this module is executed from the command-line, it runs all its tests + unittest.main() diff --git a/examples/test_high_load.py b/examples/test_high_load.py new file mode 100644 index 0000000..2c37fcf --- /dev/null +++ b/examples/test_high_load.py @@ -0,0 +1,69 @@ +#!/usr/bin/env python +#Python Serial Port Extension for Win32, Linux, BSD, Jython +#see __init__.py +# +#(C) 2001-2003 Chris Liechti <cliechti@gmx.net> +# this is distributed under a free software license, see license.txt + +"""Some tests for the serial module. +Part of pyserial (http://pyserial.sf.net) (C)2002-2003 cliechti@gmx.net + +Intended to be run on different platforms, to ensure portability of +the code. + +For all these tests a simple hardware is required. +Loopback HW adapter: +Shortcut these pin pairs: + TX <-> RX + RTS <-> CTS + DTR <-> DSR + +On a 9 pole DSUB these are the pins (2-3) (4-6) (7-8) +""" + +import unittest, threading, time +import serial + +#on which port should the tests be performed: +PORT=0 +BAUDRATE=115200 +#~ BAUDRATE=9600 + + +class TestHighLoad(unittest.TestCase): + """Test sending and receiving large amount of data""" + + N = 16 + #~ N = 1 + + def setUp(self): + self.s = serial.Serial(PORT,BAUDRATE, timeout=10) + def tearDown(self): + self.s.close() + + def test0_WriteReadLoopback(self): + """Send big strings, write/read order.""" + for i in range(self.N): + q = ''.join(map(chr,range(256))) + self.s.write(q) + self.failUnless(self.s.read(len(q))==q, "expected a '%s' which was written before" % q) + self.failUnless(self.s.inWaiting()==0, "expected empty buffer after all sent chars are read") + + def test1_WriteWriteReadLoopback(self): + """Send big strings, multiple write one read.""" + q = ''.join(map(chr,range(256))) + for i in range(self.N): + self.s.write(q) + read = self.s.read(len(q)*self.N) + self.failUnless(read==q*self.N, "expected what was written before. got %d bytes, expected %d" % (len(read), self.N*len(q))) + self.failUnless(self.s.inWaiting()==0, "expected empty buffer after all sent chars are read") + +if __name__ == '__main__': + import sys + print __doc__ + if len(sys.argv) > 1: + PORT = sys.argv[1] + print "Testing port", PORT + sys.argv[1:] = ['-v'] + # When this module is executed from the command-line, it runs all its tests + unittest.main() diff --git a/examples/wxSerialConfigDialog.py b/examples/wxSerialConfigDialog.py new file mode 100644 index 0000000..7085035 --- /dev/null +++ b/examples/wxSerialConfigDialog.py @@ -0,0 +1,260 @@ +#!/usr/bin/env python +# generated by wxGlade 0.3.1 on Thu Oct 02 23:25:44 2003 + +#from wxPython.wx import * +import wx +import serial + +SHOW_BAUDRATE = 1<<0 +SHOW_FORMAT = 1<<1 +SHOW_FLOW = 1<<2 +SHOW_TIMEOUT = 1<<3 +SHOW_ALL = SHOW_BAUDRATE|SHOW_FORMAT|SHOW_FLOW|SHOW_TIMEOUT + +try: + enumerate +except NameError: + def enumerate(sequence): + return zip(range(len(sequence)), sequence) + +class SerialConfigDialog(wx.Dialog): + """Serial Port confiuration dialog, to be used with pyserial 2.0+ + When instantiating a class of this dialog, then the "serial" keyword + argument is mandatory. It is a reference to a serial.Serial instance. + the optional "show" keyword argument can be used to show/hide different + settings. The default is SHOW_ALL which coresponds to + SHOW_BAUDRATE|SHOW_FORMAT|SHOW_FLOW|SHOW_TIMEOUT. All constants can be + found in ths module (not the class).""" + + def __init__(self, *args, **kwds): + #grab the serial keyword and remove it from the dict + self.serial = kwds['serial'] + del kwds['serial'] + self.show = SHOW_ALL + if kwds.has_key('show'): + self.show = kwds['show'] + del kwds['show'] + # begin wxGlade: SerialConfigDialog.__init__ + # end wxGlade + kwds["style"] = wx.DEFAULT_DIALOG_STYLE + wx.Dialog.__init__(self, *args, **kwds) + self.label_2 = wx.StaticText(self, -1, "Port") + self.combo_box_port = wx.ComboBox(self, -1, choices=["dummy1", "dummy2", "dummy3", "dummy4", "dummy5"], style=wx.CB_DROPDOWN) + if self.show & SHOW_BAUDRATE: + self.label_1 = wx.StaticText(self, -1, "Baudrate") + self.choice_baudrate = wx.Choice(self, -1, choices=["choice 1"]) + if self.show & SHOW_FORMAT: + self.label_3 = wx.StaticText(self, -1, "Data Bits") + self.choice_databits = wx.Choice(self, -1, choices=["choice 1"]) + self.label_4 = wx.StaticText(self, -1, "Stop Bits") + self.choice_stopbits = wx.Choice(self, -1, choices=["choice 1"]) + self.label_5 = wx.StaticText(self, -1, "Parity") + self.choice_parity = wx.Choice(self, -1, choices=["choice 1"]) + if self.show & SHOW_TIMEOUT: + self.checkbox_timeout = wx.CheckBox(self, -1, "Use Timeout") + self.text_ctrl_timeout = wx.TextCtrl(self, -1, "") + self.label_6 = wx.StaticText(self, -1, "seconds") + if self.show & SHOW_FLOW: + self.checkbox_rtscts = wx.CheckBox(self, -1, "RTS/CTS") + self.checkbox_xonxoff = wx.CheckBox(self, -1, "Xon/Xoff") + self.button_ok = wx.Button(self, -1, "OK") + self.button_cancel = wx.Button(self, -1, "Cancel") + + self.__set_properties() + self.__do_layout() + #fill in ports and select current setting + index = 0 + self.combo_box_port.Clear() + for n in range(4): + portname = serial.device(n) + self.combo_box_port.Append(portname) + if self.serial.portstr == portname: + index = n + if self.serial.portstr is not None: + self.combo_box_port.SetValue(str(self.serial.portstr)) + else: + self.combo_box_port.SetSelection(index) + if self.show & SHOW_BAUDRATE: + #fill in badrates and select current setting + self.choice_baudrate.Clear() + for n, baudrate in enumerate(self.serial.BAUDRATES): + self.choice_baudrate.Append(str(baudrate)) + if self.serial.baudrate == baudrate: + index = n + self.choice_baudrate.SetSelection(index) + if self.show & SHOW_FORMAT: + #fill in databits and select current setting + self.choice_databits.Clear() + for n, bytesize in enumerate(self.serial.BYTESIZES): + self.choice_databits.Append(str(bytesize)) + if self.serial.bytesize == bytesize: + index = n + self.choice_databits.SetSelection(index) + #fill in stopbits and select current setting + self.choice_stopbits.Clear() + for n, stopbits in enumerate(self.serial.STOPBITS): + self.choice_stopbits.Append(str(stopbits)) + if self.serial.stopbits == stopbits: + index = n + self.choice_stopbits.SetSelection(index) + #fill in parities and select current setting + self.choice_parity.Clear() + for n, parity in enumerate(self.serial.PARITIES): + self.choice_parity.Append(str(serial.PARITY_NAMES[parity])) + if self.serial.parity == parity: + index = n + self.choice_parity.SetSelection(index) + if self.show & SHOW_TIMEOUT: + #set the timeout mode and value + if self.serial.timeout is None: + self.checkbox_timeout.SetValue(False) + self.text_ctrl_timeout.Enable(False) + else: + self.checkbox_timeout.SetValue(True) + self.text_ctrl_timeout.Enable(True) + self.text_ctrl_timeout.SetValue(str(self.serial.timeout)) + if self.show & SHOW_FLOW: + #set the rtscts mode + self.checkbox_rtscts.SetValue(self.serial.rtscts) + #set the rtscts mode + self.checkbox_xonxoff.SetValue(self.serial.xonxoff) + #attach the event handlers + self.__attach_events() + + def __set_properties(self): + # begin wxGlade: SerialConfigDialog.__set_properties + # end wxGlade + self.SetTitle("Serial Port Configuration") + if self.show & SHOW_TIMEOUT: + self.text_ctrl_timeout.Enable(0) + self.button_ok.SetDefault() + + def __do_layout(self): + # begin wxGlade: SerialConfigDialog.__do_layout + # end wxGlade + sizer_2 = wx.BoxSizer(wx.VERTICAL) + sizer_3 = wx.BoxSizer(wx.HORIZONTAL) + sizer_basics = wx.StaticBoxSizer(wx.StaticBox(self, -1, "Basics"), wx.VERTICAL) + sizer_5 = wx.BoxSizer(wx.HORIZONTAL) + sizer_5.Add(self.label_2, 1, wx.ALL|wx.ALIGN_CENTER_VERTICAL, 4) + sizer_5.Add(self.combo_box_port, 1, 0, 0) + sizer_basics.Add(sizer_5, 0, wx.RIGHT|wx.EXPAND, 0) + if self.show & SHOW_BAUDRATE: + sizer_baudrate = wx.BoxSizer(wx.HORIZONTAL) + sizer_baudrate.Add(self.label_1, 1, wx.ALL|wx.ALIGN_CENTER_VERTICAL, 4) + sizer_baudrate.Add(self.choice_baudrate, 1, wx.ALIGN_RIGHT, 0) + sizer_basics.Add(sizer_baudrate, 0, wx.EXPAND, 0) + sizer_2.Add(sizer_basics, 0, wx.EXPAND, 0) + if self.show & SHOW_FORMAT: + sizer_8 = wx.BoxSizer(wx.HORIZONTAL) + sizer_7 = wx.BoxSizer(wx.HORIZONTAL) + sizer_6 = wx.BoxSizer(wx.HORIZONTAL) + sizer_format = wx.StaticBoxSizer(wx.StaticBox(self, -1, "Data Format"), wx.VERTICAL) + sizer_6.Add(self.label_3, 1, wx.ALL|wx.ALIGN_CENTER_VERTICAL, 4) + sizer_6.Add(self.choice_databits, 1, wx.ALIGN_RIGHT, 0) + sizer_format.Add(sizer_6, 0, wx.EXPAND, 0) + sizer_7.Add(self.label_4, 1, wx.ALL|wx.ALIGN_CENTER_VERTICAL, 4) + sizer_7.Add(self.choice_stopbits, 1, wx.ALIGN_RIGHT, 0) + sizer_format.Add(sizer_7, 0, wx.EXPAND, 0) + sizer_8.Add(self.label_5, 1, wx.ALL|wx.ALIGN_CENTER_VERTICAL, 4) + sizer_8.Add(self.choice_parity, 1, wx.ALIGN_RIGHT, 0) + sizer_format.Add(sizer_8, 0, wx.EXPAND, 0) + sizer_2.Add(sizer_format, 0, wx.EXPAND, 0) + if self.show & SHOW_TIMEOUT: + sizer_timeout = wx.StaticBoxSizer(wx.StaticBox(self, -1, "Timeout"), wx.HORIZONTAL) + sizer_timeout.Add(self.checkbox_timeout, 0, wx.ALL|wx.ALIGN_CENTER_VERTICAL, 4) + sizer_timeout.Add(self.text_ctrl_timeout, 0, 0, 0) + sizer_timeout.Add(self.label_6, 0, wx.ALL|wx.ALIGN_CENTER_VERTICAL, 4) + sizer_2.Add(sizer_timeout, 0, 0, 0) + if self.show & SHOW_FLOW: + sizer_flow = wx.StaticBoxSizer(wx.StaticBox(self, -1, "Flow Control"), wx.HORIZONTAL) + sizer_flow.Add(self.checkbox_rtscts, 0, wx.ALL|wx.ALIGN_CENTER_VERTICAL, 4) + sizer_flow.Add(self.checkbox_xonxoff, 0, wx.ALL|wx.ALIGN_CENTER_VERTICAL, 4) + sizer_flow.Add((10,10), 1, wx.EXPAND, 0) + sizer_2.Add(sizer_flow, 0, wx.EXPAND, 0) + sizer_3.Add(self.button_ok, 0, 0, 0) + sizer_3.Add(self.button_cancel, 0, 0, 0) + sizer_2.Add(sizer_3, 0, wx.ALL|wx.ALIGN_RIGHT, 4) + self.SetAutoLayout(1) + self.SetSizer(sizer_2) + sizer_2.Fit(self) + sizer_2.SetSizeHints(self) + self.Layout() + + def __attach_events(self): + wx.EVT_BUTTON(self, self.button_ok.GetId(), self.OnOK) + wx.EVT_BUTTON(self, self.button_cancel.GetId(), self.OnCancel) + if self.show & SHOW_TIMEOUT: + wx.EVT_CHECKBOX(self, self.checkbox_timeout.GetId(), self.OnTimeout) + + def OnOK(self, events): + success = True + self.serial.port = str(self.combo_box_port.GetValue()) + if self.show & SHOW_BAUDRATE: + self.serial.baudrate = self.serial.BAUDRATES[self.choice_baudrate.GetSelection()] + if self.show & SHOW_FORMAT: + self.serial.bytesize = self.serial.BYTESIZES[self.choice_databits.GetSelection()] + self.serial.stopbits = self.serial.STOPBITS[self.choice_stopbits.GetSelection()] + self.serial.parity = self.serial.PARITIES[self.choice_parity.GetSelection()] + if self.show & SHOW_FLOW: + self.serial.rtscts = self.checkbox_rtscts.GetValue() + self.serial.xonxoff = self.checkbox_xonxoff.GetValue() + if self.show & SHOW_TIMEOUT: + if self.checkbox_timeout.GetValue(): + try: + self.serial.timeout = float(self.text_ctrl_timeout.GetValue()) + except ValueError: + dlg = wx.MessageDialog(self, 'Timeout must be a numeric value', + 'Value Error', wx.OK | wx.ICON_ERROR) + dlg.ShowModal() + dlg.Destroy() + success = False + else: + self.serial.timeout = None + if success: + self.EndModal(wx.ID_OK) + + def OnCancel(self, events): + self.EndModal(wx.ID_CANCEL) + + def OnTimeout(self, events): + if self.checkbox_timeout.GetValue(): + self.text_ctrl_timeout.Enable(True) + else: + self.text_ctrl_timeout.Enable(False) + +# end of class SerialConfigDialog + + +class MyApp(wx.App): + """Test code""" + def OnInit(self): + wx.InitAllImageHandlers() + + ser = serial.Serial() + print ser + #loop until cancel is pressed, old values are used as start for the next run + #show the different views, one after the other + #value are kept. + for flags in (SHOW_BAUDRATE, SHOW_FLOW, SHOW_FORMAT, SHOW_TIMEOUT, SHOW_ALL): + dialog_serial_cfg = SerialConfigDialog(None, -1, "", serial=ser, show=flags) + self.SetTopWindow(dialog_serial_cfg) + result = dialog_serial_cfg.ShowModal() + print ser + if result != wx.ID_OK: + break + #the user can play around with the values, CANCEL aborts the loop + while 1: + dialog_serial_cfg = SerialConfigDialog(None, -1, "", serial=ser) + self.SetTopWindow(dialog_serial_cfg) + result = dialog_serial_cfg.ShowModal() + print ser + if result != wx.ID_OK: + break + return 0 + +# end of class MyApp + +if __name__ == "__main__": + app = MyApp(0) + app.MainLoop() diff --git a/examples/wxSerialConfigDialog.wxg b/examples/wxSerialConfigDialog.wxg new file mode 100644 index 0000000..f5e92e0 --- /dev/null +++ b/examples/wxSerialConfigDialog.wxg @@ -0,0 +1,262 @@ +<?xml version="1.0"?> +<!-- generated by wxGlade 0.3.1 on Fri Oct 03 01:53:04 2003 --> + +<application path="D:\prog\python\pyserial_sf\pyserial\examples\wxSerialConfigDialog.py" name="app" class="MyApp" option="0" language="python" top_window="dialog_serial_cfg" encoding="ISO-8859-1" use_gettext="0" overwrite="0"> + <object class="SerialConfigDialog" name="dialog_serial_cfg" base="EditDialog"> + <style>wxDEFAULT_DIALOG_STYLE</style> + <title>Serial Port Configuration</title> + <object class="wxBoxSizer" name="sizer_2" base="EditBoxSizer"> + <orient>wxVERTICAL</orient> + <object class="sizeritem"> + <flag>wxEXPAND</flag> + <border>0</border> + <option>0</option> + <object class="wxStaticBoxSizer" name="sizer_basics" base="EditStaticBoxSizer"> + <orient>wxVERTICAL</orient> + <label>Basics</label> + <object class="sizeritem"> + <flag>wxRIGHT|wxEXPAND</flag> + <border>0</border> + <option>0</option> + <object class="wxBoxSizer" name="sizer_5" base="EditBoxSizer"> + <orient>wxHORIZONTAL</orient> + <object class="sizeritem"> + <flag>wxALL|wxALIGN_CENTER_VERTICAL</flag> + <border>4</border> + <option>1</option> + <object class="wxStaticText" name="label_2" base="EditStaticText"> + <attribute>1</attribute> + <label>Port</label> + </object> + </object> + <object class="sizeritem"> + <border>0</border> + <option>1</option> + <object class="wxComboBox" name="combo_box_port" base="EditComboBox"> + <selection>0</selection> + <choices> + <choice>dummy1</choice> + <choice>dummy2</choice> + <choice>dummy3</choice> + <choice>dummy4</choice> + <choice>dummy5</choice> + </choices> + </object> + </object> + </object> + </object> + <object class="sizeritem"> + <flag>wxEXPAND</flag> + <border>0</border> + <option>0</option> + <object class="wxBoxSizer" name="sizer_baudrate" base="EditBoxSizer"> + <orient>wxHORIZONTAL</orient> + <object class="sizeritem"> + <flag>wxALL|wxALIGN_CENTER_VERTICAL</flag> + <border>4</border> + <option>1</option> + <object class="wxStaticText" name="label_1" base="EditStaticText"> + <attribute>1</attribute> + <label>Baudrate</label> + </object> + </object> + <object class="sizeritem"> + <flag>wxALIGN_RIGHT</flag> + <border>0</border> + <option>1</option> + <object class="wxChoice" name="choice_baudrate" base="EditChoice"> + <selection>0</selection> + <choices> + <choice>choice 1</choice> + </choices> + </object> + </object> + </object> + </object> + </object> + </object> + <object class="sizeritem"> + <flag>wxEXPAND</flag> + <border>0</border> + <option>0</option> + <object class="wxStaticBoxSizer" name="sizer_format" base="EditStaticBoxSizer"> + <orient>wxVERTICAL</orient> + <label>Data Format</label> + <object class="sizeritem"> + <flag>wxEXPAND</flag> + <border>0</border> + <option>0</option> + <object class="wxBoxSizer" name="sizer_6" base="EditBoxSizer"> + <orient>wxHORIZONTAL</orient> + <object class="sizeritem"> + <flag>wxALL|wxALIGN_CENTER_VERTICAL</flag> + <border>4</border> + <option>1</option> + <object class="wxStaticText" name="label_3" base="EditStaticText"> + <attribute>1</attribute> + <label>Data Bits</label> + </object> + </object> + <object class="sizeritem"> + <flag>wxALIGN_RIGHT</flag> + <border>0</border> + <option>1</option> + <object class="wxChoice" name="choice_databits" base="EditChoice"> + <selection>0</selection> + <choices> + <choice>choice 1</choice> + </choices> + </object> + </object> + </object> + </object> + <object class="sizeritem"> + <flag>wxEXPAND</flag> + <border>0</border> + <option>0</option> + <object class="wxBoxSizer" name="sizer_7" base="EditBoxSizer"> + <orient>wxHORIZONTAL</orient> + <object class="sizeritem"> + <flag>wxALL|wxALIGN_CENTER_VERTICAL</flag> + <border>4</border> + <option>1</option> + <object class="wxStaticText" name="label_4" base="EditStaticText"> + <attribute>1</attribute> + <label>Stop Bits</label> + </object> + </object> + <object class="sizeritem"> + <flag>wxALIGN_RIGHT</flag> + <border>0</border> + <option>1</option> + <object class="wxChoice" name="choice_stopbits" base="EditChoice"> + <selection>0</selection> + <choices> + <choice>choice 1</choice> + </choices> + </object> + </object> + </object> + </object> + <object class="sizeritem"> + <flag>wxEXPAND</flag> + <border>0</border> + <option>0</option> + <object class="wxBoxSizer" name="sizer_8" base="EditBoxSizer"> + <orient>wxHORIZONTAL</orient> + <object class="sizeritem"> + <flag>wxALL|wxALIGN_CENTER_VERTICAL</flag> + <border>4</border> + <option>1</option> + <object class="wxStaticText" name="label_5" base="EditStaticText"> + <attribute>1</attribute> + <label>Parity</label> + </object> + </object> + <object class="sizeritem"> + <flag>wxALIGN_RIGHT</flag> + <border>0</border> + <option>1</option> + <object class="wxChoice" name="choice_parity" base="EditChoice"> + <selection>0</selection> + <choices> + <choice>choice 1</choice> + </choices> + </object> + </object> + </object> + </object> + </object> + </object> + <object class="sizeritem"> + <border>0</border> + <option>0</option> + <object class="wxStaticBoxSizer" name="sizer_timeout" base="EditStaticBoxSizer"> + <orient>wxHORIZONTAL</orient> + <label>Timeout</label> + <object class="sizeritem"> + <flag>wxALL|wxALIGN_CENTER_VERTICAL</flag> + <border>4</border> + <option>0</option> + <object class="wxCheckBox" name="checkbox_timeout" base="EditCheckBox"> + <label>Use Timeout</label> + </object> + </object> + <object class="sizeritem"> + <border>0</border> + <option>0</option> + <object class="wxTextCtrl" name="text_ctrl_timeout" base="EditTextCtrl"> + <disabled>1</disabled> + </object> + </object> + <object class="sizeritem"> + <flag>wxALL|wxALIGN_CENTER_VERTICAL</flag> + <border>4</border> + <option>0</option> + <object class="wxStaticText" name="label_6" base="EditStaticText"> + <attribute>1</attribute> + <label>seconds</label> + </object> + </object> + </object> + </object> + <object class="sizeritem"> + <flag>wxEXPAND</flag> + <border>0</border> + <option>0</option> + <object class="wxStaticBoxSizer" name="sizer_flow" base="EditStaticBoxSizer"> + <orient>wxHORIZONTAL</orient> + <label>Flow Control</label> + <object class="sizeritem"> + <flag>wxALL|wxALIGN_CENTER_VERTICAL</flag> + <border>4</border> + <option>0</option> + <object class="wxCheckBox" name="checkbox_rtscts" base="EditCheckBox"> + <label>RTS/CTS</label> + </object> + </object> + <object class="sizeritem"> + <flag>wxALL|wxALIGN_CENTER_VERTICAL</flag> + <border>4</border> + <option>0</option> + <object class="wxCheckBox" name="checkbox_xonxoff" base="EditCheckBox"> + <label>Xon/Xoff</label> + </object> + </object> + <object class="sizeritem"> + <flag>wxEXPAND</flag> + <border>0</border> + <option>1</option> + <object class="spacer" name="spacer" base="EditSpacer"> + <height>10</height> + <width>10</width> + </object> + </object> + </object> + </object> + <object class="sizeritem"> + <flag>wxALL|wxALIGN_RIGHT</flag> + <border>4</border> + <option>0</option> + <object class="wxBoxSizer" name="sizer_3" base="EditBoxSizer"> + <orient>wxHORIZONTAL</orient> + <object class="sizeritem"> + <border>0</border> + <option>0</option> + <object class="wxButton" name="button_ok" base="EditButton"> + <default>1</default> + <label>OK</label> + </object> + </object> + <object class="sizeritem"> + <border>0</border> + <option>0</option> + <object class="wxButton" name="button_cancel" base="EditButton"> + <label>Cancel</label> + </object> + </object> + </object> + </object> + </object> + </object> +</application> diff --git a/examples/wxTerminal.py b/examples/wxTerminal.py new file mode 100644 index 0000000..646c272 --- /dev/null +++ b/examples/wxTerminal.py @@ -0,0 +1,333 @@ +#!/usr/bin/env python +# generated by wxGlade 0.3.1 on Fri Oct 03 23:23:45 2003 + +#from wxPython.wx import * +import wx +import wxSerialConfigDialog +import serial +import threading + +#---------------------------------------------------------------------- +# Create an own event type, so that GUI updates can be delegated +# this is required as on some platforms only the main thread can +# access the GUI without crashing. wxMutexGuiEnter/wxMutexGuiLeave +# could be used too, but an event is more elegant. + +SERIALRX = wx.NewEventType() +# bind to serial data receive events +EVT_SERIALRX = wx.PyEventBinder(SERIALRX, 0) + +class SerialRxEvent(wx.PyCommandEvent): + eventType = SERIALRX + def __init__(self, windowID, data): + wx.PyCommandEvent.__init__(self, self.eventType, windowID) + self.data = data + + def Clone(self): + self.__class__(self.GetId(), self.data) + +#---------------------------------------------------------------------- + +ID_CLEAR = wx.NewId() +ID_SAVEAS = wx.NewId() +ID_SETTINGS = wx.NewId() +ID_TERM = wx.NewId() +ID_EXIT = wx.NewId() + +NEWLINE_CR = 0 +NEWLINE_LF = 1 +NEWLINE_CRLF = 2 + +class TerminalSetup: + """Placeholder for various terminal settings. Used to pass the + options to the TerminalSettingsDialog.""" + def __init__(self): + self.echo = False + self.unprintable = False + self.newline = NEWLINE_CRLF + +class TerminalSettingsDialog(wx.Dialog): + """Simple dialog with common terminal settings like echo, newline mode.""" + + def __init__(self, *args, **kwds): + self.settings = kwds['settings'] + del kwds['settings'] + # begin wxGlade: TerminalSettingsDialog.__init__ + kwds["style"] = wx.DEFAULT_DIALOG_STYLE + wx.Dialog.__init__(self, *args, **kwds) + self.checkbox_echo = wx.CheckBox(self, -1, "Local Echo") + self.checkbox_unprintable = wx.CheckBox(self, -1, "Show unprintable characters") + self.radio_box_newline = wx.RadioBox(self, -1, "Newline Handling", choices=["CR only", "LF only", "CR+LF"], majorDimension=0, style=wx.RA_SPECIFY_ROWS) + self.button_ok = wx.Button(self, -1, "OK") + self.button_cancel = wx.Button(self, -1, "Cancel") + + self.__set_properties() + self.__do_layout() + # end wxGlade + self.__attach_events() + self.checkbox_echo.SetValue(self.settings.echo) + self.checkbox_unprintable.SetValue(self.settings.unprintable) + self.radio_box_newline.SetSelection(self.settings.newline) + + def __set_properties(self): + # begin wxGlade: TerminalSettingsDialog.__set_properties + self.SetTitle("Terminal Settings") + self.radio_box_newline.SetSelection(0) + self.button_ok.SetDefault() + # end wxGlade + + def __do_layout(self): + # begin wxGlade: TerminalSettingsDialog.__do_layout + sizer_2 = wx.BoxSizer(wx.VERTICAL) + sizer_3 = wx.BoxSizer(wx.HORIZONTAL) + sizer_4 = wx.StaticBoxSizer(wx.StaticBox(self, -1, "Input/Output"), wx.VERTICAL) + sizer_4.Add(self.checkbox_echo, 0, wx.ALL, 4) + sizer_4.Add(self.checkbox_unprintable, 0, wx.ALL, 4) + sizer_4.Add(self.radio_box_newline, 0, 0, 0) + sizer_2.Add(sizer_4, 0, wx.EXPAND, 0) + sizer_3.Add(self.button_ok, 0, 0, 0) + sizer_3.Add(self.button_cancel, 0, 0, 0) + sizer_2.Add(sizer_3, 0, wx.ALL|wx.ALIGN_RIGHT, 4) + self.SetAutoLayout(1) + self.SetSizer(sizer_2) + sizer_2.Fit(self) + sizer_2.SetSizeHints(self) + self.Layout() + # end wxGlade + + def __attach_events(self): + self.Bind(wx.EVT_BUTTON, self.OnOK, id = self.button_ok.GetId()) + self.Bind(wx.EVT_BUTTON, self.OnCancel, id = self.button_cancel.GetId()) + + def OnOK(self, events): + """Update data wil new values and close dialog.""" + self.settings.echo = self.checkbox_echo.GetValue() + self.settings.unprintable = self.checkbox_unprintable.GetValue() + self.settings.newline = self.radio_box_newline.GetSelection() + self.EndModal(wx.ID_OK) + + def OnCancel(self, events): + """Do not update data but close dialog.""" + self.EndModal(wx.ID_CANCEL) + +# end of class TerminalSettingsDialog + + +class TerminalFrame(wx.Frame): + """Simple terminal program for wxPython""" + + def __init__(self, *args, **kwds): + self.serial = serial.Serial() + self.serial.timeout = 0.5 #make sure that the alive event can be checked from time to time + self.settings = TerminalSetup() #placeholder for the settings + self.thread = None + self.alive = threading.Event() + # begin wxGlade: TerminalFrame.__init__ + kwds["style"] = wx.DEFAULT_FRAME_STYLE + wx.Frame.__init__(self, *args, **kwds) + self.text_ctrl_output = wx.TextCtrl(self, -1, "", style=wx.TE_MULTILINE|wx.TE_READONLY) + + # Menu Bar + self.frame_terminal_menubar = wx.MenuBar() + self.SetMenuBar(self.frame_terminal_menubar) + wxglade_tmp_menu = wx.Menu() + wxglade_tmp_menu.Append(ID_CLEAR, "&Clear", "", wx.ITEM_NORMAL) + wxglade_tmp_menu.Append(ID_SAVEAS, "&Save Text As...", "", wx.ITEM_NORMAL) + wxglade_tmp_menu.AppendSeparator() + wxglade_tmp_menu.Append(ID_SETTINGS, "&Port Settings...", "", wx.ITEM_NORMAL) + wxglade_tmp_menu.Append(ID_TERM, "&Terminal Settings...", "", wx.ITEM_NORMAL) + wxglade_tmp_menu.AppendSeparator() + wxglade_tmp_menu.Append(ID_EXIT, "&Exit", "", wx.ITEM_NORMAL) + self.frame_terminal_menubar.Append(wxglade_tmp_menu, "&File") + # Menu Bar end + + self.__set_properties() + self.__do_layout() + # end wxGlade + self.__attach_events() #register events + self.OnPortSettings(None) #call setup dialog on startup, opens port + if not self.alive.isSet(): + self.Close() + + def StartThread(self): + """Start the receiver thread""" + self.thread = threading.Thread(target=self.ComPortThread) + self.thread.setDaemon(1) + self.alive.set() + self.thread.start() + + def StopThread(self): + """Stop the receiver thread, wait util it's finished.""" + if self.thread is not None: + self.alive.clear() #clear alive event for thread + self.thread.join() #wait until thread has finished + self.thread = None + + def __set_properties(self): + # begin wxGlade: TerminalFrame.__set_properties + self.SetTitle("Serial Terminal") + self.SetSize((546, 383)) + # end wxGlade + + def __do_layout(self): + # begin wxGlade: TerminalFrame.__do_layout + sizer_1 = wx.BoxSizer(wx.VERTICAL) + sizer_1.Add(self.text_ctrl_output, 1, wx.EXPAND, 0) + self.SetAutoLayout(1) + self.SetSizer(sizer_1) + self.Layout() + # end wxGlade + + def __attach_events(self): + #register events at the controls + self.Bind(wx.EVT_MENU, self.OnClear, id = ID_CLEAR) + self.Bind(wx.EVT_MENU, self.OnSaveAs, id = ID_SAVEAS) + self.Bind(wx.EVT_MENU, self.OnExit, id = ID_EXIT) + self.Bind(wx.EVT_MENU, self.OnPortSettings, id = ID_SETTINGS) + self.Bind(wx.EVT_MENU, self.OnTermSettings, id = ID_TERM) + self.text_ctrl_output.Bind(wx.EVT_CHAR, self.OnKey) + self.Bind(EVT_SERIALRX, self.OnSerialRead) + self.Bind(wx.EVT_CLOSE, self.OnClose) + + def OnExit(self, event): + """Menu point Exit""" + self.Close() + + def OnClose(self, event): + """Called on application shutdown.""" + self.StopThread() #stop reader thread + self.serial.close() #cleanup + self.Destroy() #close windows, exit app + + def OnSaveAs(self, event): + """Save contents of output window.""" + filename = None + dlg = wx.FileDialog(None, "Save Text As...", ".", "", "Text File|*.txt|All Files|*", wx.SAVE) + if dlg.ShowModal() == wx.ID_OK: + filename = dlg.GetPath() + dlg.Destroy() + + if filename is not None: + f = file(filename, 'w') + text = self.text_ctrl_output.GetValue() + if type(text) == unicode: + text = text.encode("latin1") #hm, is that a good asumption? + f.write(text) + f.close() + + def OnClear(self, event): + """Clear contents of output window.""" + self.text_ctrl_output.Clear() + + def OnPortSettings(self, event=None): + """Show the portsettings dialog. The reader thread is stopped for the + settings change.""" + if event is not None: #will be none when called on startup + self.StopThread() + self.serial.close() + ok = False + while not ok: + dialog_serial_cfg = wxSerialConfigDialog.SerialConfigDialog(None, -1, "", + show=wxSerialConfigDialog.SHOW_BAUDRATE|wxSerialConfigDialog.SHOW_FORMAT|wxSerialConfigDialog.SHOW_FLOW, + serial=self.serial + ) + result = dialog_serial_cfg.ShowModal() + dialog_serial_cfg.Destroy() + #open port if not called on startup, open it on startup and OK too + if result == wx.ID_OK or event is not None: + try: + self.serial.open() + except serial.SerialException, e: + dlg = wx.MessageDialog(None, str(e), "Serial Port Error", wx.OK | wx.ICON_ERROR) + dlg.ShowModal() + dlg.Destroy() + else: + self.StartThread() + self.SetTitle("Serial Terminal on %s [%s, %s%s%s%s%s]" % ( + self.serial.portstr, + self.serial.baudrate, + self.serial.bytesize, + self.serial.parity, + self.serial.stopbits, + self.serial.rtscts and ' RTS/CTS' or '', + self.serial.xonxoff and ' Xon/Xoff' or '', + ) + ) + ok = True + else: + #on startup, dialog aborted + self.alive.clear() + ok = True + + def OnTermSettings(self, event): + """Menu point Terminal Settings. Show the settings dialog + with the current terminal settings""" + dialog = TerminalSettingsDialog(None, -1, "", settings=self.settings) + result = dialog.ShowModal() + dialog.Destroy() + + def OnKey(self, event): + """Key event handler. if the key is in the ASCII range, write it to the serial port. + Newline handling and local echo is also done here.""" + code = event.GetKeyCode() + if code < 256: #is it printable? + if code == 13: #is it a newline? (check for CR which is the RETURN key) + if self.settings.echo: #do echo if needed + self.text_ctrl_output.AppendText('\n') + if self.settings.newline == NEWLINE_CR: + self.serial.write('\r') #send CR + elif self.settings.newline == NEWLINE_LF: + self.serial.write('\n') #send LF + elif self.settings.newline == NEWLINE_CRLF: + self.serial.write('\r\n') #send CR+LF + else: + char = chr(code) + if self.settings.echo: #do echo if needed + self.text_ctrl_output.WriteText(char) + self.serial.write(char) #send the charcater + else: + print "Extra Key:", code + + def OnSerialRead(self, event): + """Handle input from the serial port.""" + text = event.data + if self.settings.unprintable: + text = ''.join([(c >= ' ') and c or '<%d>' % ord(c) for c in text]) + self.text_ctrl_output.AppendText(text) + + def ComPortThread(self): + """Thread that handles the incomming traffic. Does the basic input + transformation (newlines) and generates an SerialRxEvent""" + while self.alive.isSet(): #loop while alive event is true + text = self.serial.read(1) #read one, with timout + if text: #check if not timeout + n = self.serial.inWaiting() #look if there is more to read + if n: + text = text + self.serial.read(n) #get it + #newline transformation + if self.settings.newline == NEWLINE_CR: + text = text.replace('\r', '\n') + elif self.settings.newline == NEWLINE_LF: + pass + elif self.settings.newline == NEWLINE_CRLF: + text = text.replace('\r\n', '\n') + event = SerialRxEvent(self.GetId(), text) + self.GetEventHandler().AddPendingEvent(event) + #~ self.OnSerialRead(text) #output text in window + +# end of class TerminalFrame + + +class MyApp(wx.App): + def OnInit(self): + wx.InitAllImageHandlers() + frame_terminal = TerminalFrame(None, -1, "") + self.SetTopWindow(frame_terminal) + frame_terminal.Show(1) + return 1 + +# end of class MyApp + +if __name__ == "__main__": + app = MyApp(0) + app.MainLoop() diff --git a/examples/wxTerminal.wxg b/examples/wxTerminal.wxg new file mode 100644 index 0000000..183f876 --- /dev/null +++ b/examples/wxTerminal.wxg @@ -0,0 +1,127 @@ +<?xml version="1.0"?> +<!-- generated by wxGlade 0.3.1 on Sat Oct 04 02:41:48 2003 --> + +<application path="D:\prog\python\pyserial_sf\pyserial\examples\wxTerminal.py" name="app" class="MyApp" option="0" language="python" top_window="frame_terminal" encoding="ISO-8859-1" use_gettext="0" overwrite="0"> + <object class="TerminalFrame" name="frame_terminal" base="EditFrame"> + <style>wxDEFAULT_FRAME_STYLE</style> + <title>Serial Terminal</title> + <menubar>1</menubar> + <size>546, 383</size> + <object class="wxBoxSizer" name="sizer_1" base="EditBoxSizer"> + <orient>wxVERTICAL</orient> + <object class="sizeritem"> + <flag>wxEXPAND</flag> + <border>0</border> + <option>1</option> + <object class="wxTextCtrl" name="text_ctrl_output" base="EditTextCtrl"> + <style>wxTE_MULTILINE|wxTE_READONLY</style> + </object> + </object> + </object> + <object class="wxMenuBar" name="frame_terminal_menubar" base="EditMenuBar"> + <menus> + <menu name="" label="&File"> + <item> + <label>&Clear</label> + <id>ID_CLEAR</id> + </item> + <item> + <label>&Save Text As...</label> + <id>ID_SAVEAS</id> + </item> + <item> + <label>---</label> + <id>---</id> + <name>---</name> + </item> + <item> + <label>&Port Settings...</label> + <id>ID_SETTINGS</id> + </item> + <item> + <label>&Terminal Settings...</label> + <id>ID_TERM</id> + </item> + <item> + <label>---</label> + <name>---</name> + </item> + <item> + <label>&Exit</label> + <id>ID_EXIT</id> + </item> + </menu> + </menus> + </object> + </object> + <object class="TerminalSettingsDialog" name="dialog_terminal_Settings" base="EditDialog"> + <style>wxDEFAULT_DIALOG_STYLE</style> + <title>Terminal Settings</title> + <object class="wxBoxSizer" name="sizer_2" base="EditBoxSizer"> + <orient>wxVERTICAL</orient> + <object class="sizeritem"> + <flag>wxEXPAND</flag> + <border>0</border> + <option>0</option> + <object class="wxStaticBoxSizer" name="sizer_4" base="EditStaticBoxSizer"> + <orient>wxVERTICAL</orient> + <label>Input/Output</label> + <object class="sizeritem"> + <flag>wxALL</flag> + <border>4</border> + <option>0</option> + <object class="wxCheckBox" name="checkbox_echo" base="EditCheckBox"> + <label>Local Echo</label> + </object> + </object> + <object class="sizeritem"> + <flag>wxALL</flag> + <border>4</border> + <option>0</option> + <object class="wxCheckBox" name="checkbox_unprintable" base="EditCheckBox"> + <label>Show unprintable characters</label> + </object> + </object> + <object class="sizeritem"> + <border>0</border> + <option>0</option> + <object class="wxRadioBox" name="radio_box_newline" base="EditRadioBox"> + <style>wxRA_SPECIFY_ROWS</style> + <selection>0</selection> + <dimension>0</dimension> + <label>Newline Handling</label> + <choices> + <choice>CR only</choice> + <choice>LF only</choice> + <choice>CR+LF</choice> + </choices> + </object> + </object> + </object> + </object> + <object class="sizeritem"> + <flag>wxALL|wxALIGN_RIGHT</flag> + <border>4</border> + <option>0</option> + <object class="wxBoxSizer" name="sizer_3" base="EditBoxSizer"> + <orient>wxHORIZONTAL</orient> + <object class="sizeritem"> + <border>0</border> + <option>0</option> + <object class="wxButton" name="button_ok" base="EditButton"> + <default>1</default> + <label>OK</label> + </object> + </object> + <object class="sizeritem"> + <border>0</border> + <option>0</option> + <object class="wxButton" name="button_cancel" base="EditButton"> + <label>Cancel</label> + </object> + </object> + </object> + </object> + </object> + </object> +</application> |