From bfd7c372c99f0c07da9a3bda4685dc33ee06cd91 Mon Sep 17 00:00:00 2001 From: cliechti Date: Thu, 22 Jul 2010 00:28:51 +0000 Subject: tag for release 2.5 --- examples/enhancedserial.py | 62 ++++ examples/miniterm.py | 584 ++++++++++++++++++++++++++++++++ examples/port_publisher.py | 485 ++++++++++++++++++++++++++ examples/port_publisher.sh | 44 +++ examples/rfc2217_server.py | 204 +++++++++++ examples/scan.py | 30 ++ examples/scanlinux.py | 20 ++ examples/scanwin32.py | 232 +++++++++++++ examples/setup-miniterm-py2exe.py | 26 ++ examples/setup-rfc2217_server-py2exe.py | 24 ++ examples/setup-wxTerminal-py2exe.py | 35 ++ examples/tcp_serial_redirect.py | 326 ++++++++++++++++++ examples/wxSerialConfigDialog.py | 260 ++++++++++++++ examples/wxSerialConfigDialog.wxg | 262 ++++++++++++++ examples/wxTerminal.py | 333 ++++++++++++++++++ examples/wxTerminal.wxg | 127 +++++++ 16 files changed, 3054 insertions(+) create mode 100644 examples/enhancedserial.py create mode 100644 examples/miniterm.py create mode 100644 examples/port_publisher.py create mode 100644 examples/port_publisher.sh create mode 100644 examples/rfc2217_server.py create mode 100644 examples/scan.py create mode 100644 examples/scanlinux.py create mode 100644 examples/scanwin32.py create mode 100644 examples/setup-miniterm-py2exe.py create mode 100644 examples/setup-rfc2217_server-py2exe.py create mode 100644 examples/setup-wxTerminal-py2exe.py create mode 100644 examples/tcp_serial_redirect.py create mode 100644 examples/wxSerialConfigDialog.py create mode 100644 examples/wxSerialConfigDialog.wxg create mode 100644 examples/wxTerminal.py create mode 100644 examples/wxTerminal.wxg (limited to 'examples') diff --git a/examples/enhancedserial.py b/examples/enhancedserial.py new file mode 100644 index 0000000..2c81ae1 --- /dev/null +++ b/examples/enhancedserial.py @@ -0,0 +1,62 @@ +#!/usr/bin/env python +"""Enhanced Serial Port class +part of pyserial (http://pyserial.sf.net) (C)2002 cliechti@gmx.net + +another implementation of the readline and readlines method. +this one should be more efficient because a bunch of characters are read +on each access, but the drawback is that a timeout must be specified to +make it work (enforced by the class __init__). + +this class could be enhanced with a read_until() method and more +like found in the telnetlib. +""" + +from serial import Serial + +class EnhancedSerial(Serial): + def __init__(self, *args, **kwargs): + #ensure that a reasonable timeout is set + timeout = kwargs.get('timeout',0.1) + if timeout < 0.01: timeout = 0.1 + kwargs['timeout'] = timeout + Serial.__init__(self, *args, **kwargs) + self.buf = '' + + def readline(self, maxsize=None, timeout=1): + """maxsize is ignored, timeout in seconds is the max time that is way for a complete line""" + tries = 0 + while 1: + self.buf += self.read(512) + pos = self.buf.find('\n') + if pos >= 0: + line, self.buf = self.buf[:pos+1], self.buf[pos+1:] + return line + tries += 1 + if tries * self.timeout > timeout: + break + line, self.buf = self.buf, '' + return line + + def readlines(self, sizehint=None, timeout=1): + """read all lines that are available. abort after timout + when no more data arrives.""" + lines = [] + while 1: + line = self.readline(timeout=timeout) + if line: + lines.append(line) + if not line or line[-1:] != '\n': + break + return lines + +if __name__=='__main__': + #do some simple tests with a Loopback HW (see test.py for details) + PORT = 0 + #test, only with Loopback HW (shortcut RX/TX pins (3+4 on DSUB 9 and 25) ) + s = EnhancedSerial(PORT) + #write out some test data lines + s.write('\n'.join("hello how are you".split())) + #and read them back + print s.readlines() + #this one should print an empty list + print s.readlines(timeout=0.4) diff --git a/examples/miniterm.py b/examples/miniterm.py new file mode 100644 index 0000000..672fe69 --- /dev/null +++ b/examples/miniterm.py @@ -0,0 +1,584 @@ +#!/usr/bin/env python + +# Very simple serial terminal +# (C)2002-2009 Chris Liechti + +# Input characters are sent directly (only LF -> CR/LF/CRLF translation is +# done), received characters are displayed as is (or escaped trough pythons +# repr, useful for debug purposes) + + +import sys, os, serial, threading + +EXITCHARCTER = '\x1d' # GS/CTRL+] +MENUCHARACTER = '\x14' # Menu: CTRL+T + + +def key_description(character): + """generate a readable description for a key""" + ascii_code = ord(character) + if ascii_code < 32: + return 'Ctrl+%c' % (ord('@') + ascii_code) + else: + return repr(character) + +# help text, starts with blank line! it's a function so that the current values +# for the shortcut keys is used and not the value at program start +def get_help_text(): + return """ +--- pySerial (%(version)s) - miniterm - help +--- +--- %(exit)-8s Exit program +--- %(menu)-8s Menu escape key, followed by: +--- Menu keys: +--- %(itself)-8s Send the menu character itself to remote +--- %(exchar)-8s Send the exit character to remote +--- %(info)-8s Show info +--- %(upload)-8s Upload file (prompt will be shown) +--- Toggles: +--- %(rts)s RTS %(echo)s local echo +--- %(dtr)s DTR %(break)s BREAK +--- %(lfm)s line feed %(repr)s Cycle repr mode +--- +--- Port settings (%(menu)s followed by the following): +--- 7 8 set data bits +--- n e o s m change parity (None, Even, Odd, Space, Mark) +--- 1 2 3 set stop bits (1, 2, 1.5) +--- b change baud rate +--- x X disable/enable software flow control +--- r R disable/enable hardware flow control +""" % { + 'version': getattr(serial, 'VERSION', 'unkown'), + 'exit': key_description(EXITCHARCTER), + 'menu': key_description(MENUCHARACTER), + 'rts': key_description('\x12'), + 'repr': key_description('\x01'), + 'dtr': key_description('\x04'), + 'lfm': key_description('\x0c'), + 'break': key_description('\x02'), + 'echo': key_description('\x05'), + 'info': key_description('\x09'), + 'upload': key_description('\x15'), + 'itself': key_description(MENUCHARACTER), + 'exchar': key_description(EXITCHARCTER), +} + +# first choose a platform dependant way to read single characters from the console +global console + +if os.name == 'nt': + import msvcrt + class Console: + def __init__(self): + pass + + def setup(self): + pass # Do nothing for 'nt' + + def cleanup(self): + pass # Do nothing for 'nt' + + def getkey(self): + while 1: + z = msvcrt.getch() + if z == '\0' or z == '\xe0': # functions keys + msvcrt.getch() + else: + if z == '\r': + return '\n' + return z + + console = Console() + +elif os.name == 'posix': + import termios, sys, os + class Console: + def __init__(self): + self.fd = sys.stdin.fileno() + + def setup(self): + self.old = termios.tcgetattr(self.fd) + new = termios.tcgetattr(self.fd) + new[3] = new[3] & ~termios.ICANON & ~termios.ECHO & ~termios.ISIG + new[6][termios.VMIN] = 1 + new[6][termios.VTIME] = 0 + termios.tcsetattr(self.fd, termios.TCSANOW, new) + + def getkey(self): + c = os.read(self.fd, 1) + return c + + def cleanup(self): + termios.tcsetattr(self.fd, termios.TCSAFLUSH, self.old) + + console = Console() + + def cleanup_console(): + console.cleanup() + + console.setup() + sys.exitfunc = cleanup_console #terminal modes have to be restored on exit... + +else: + raise "Sorry no implementation for your platform (%s) available." % sys.platform + + +CONVERT_CRLF = 2 +CONVERT_CR = 1 +CONVERT_LF = 0 +NEWLINE_CONVERISON_MAP = ('\n', '\r', '\r\n') +LF_MODES = ('LF', 'CR', 'CR/LF') + +REPR_MODES = ('raw', 'some control', 'all control', 'hex') + +class Miniterm: + def __init__(self, port, baudrate, parity, rtscts, xonxoff, echo=False, convert_outgoing=CONVERT_CRLF, repr_mode=0): + try: + self.serial = serial.serial_for_url(port, baudrate, parity=parity, rtscts=rtscts, xonxoff=xonxoff, timeout=1) + except AttributeError: + # happens when the installed pyserial is older than 2.5. use the + # Serial class directly then. + self.serial = serial.Serial(port, baudrate, parity=parity, rtscts=rtscts, xonxoff=xonxoff, timeout=1) + self.echo = echo + self.repr_mode = repr_mode + self.convert_outgoing = convert_outgoing + self.newline = NEWLINE_CONVERISON_MAP[self.convert_outgoing] + self.dtr_state = True + self.rts_state = True + self.break_state = False + + def start(self): + self.alive = True + # start serial->console thread + self.receiver_thread = threading.Thread(target=self.reader) + self.receiver_thread.setDaemon(1) + self.receiver_thread.start() + # enter console->serial loop + self.transmitter_thread = threading.Thread(target=self.writer) + self.transmitter_thread.setDaemon(1) + self.transmitter_thread.start() + + def stop(self): + self.alive = False + + def join(self, transmit_only=False): + self.transmitter_thread.join() + if not transmit_only: + self.receiver_thread.join() + + def dump_port_settings(self): + sys.stderr.write("\n--- Settings: %s %s,%s,%s,%s\n" % ( + self.serial.portstr, + self.serial.baudrate, + self.serial.bytesize, + self.serial.parity, + self.serial.stopbits, + )) + sys.stderr.write('--- RTS %s\n' % (self.rts_state and 'active' or 'inactive')) + sys.stderr.write('--- DTR %s\n' % (self.dtr_state and 'active' or 'inactive')) + sys.stderr.write('--- BREAK %s\n' % (self.break_state and 'active' or 'inactive')) + sys.stderr.write('--- software flow control %s\n' % (self.serial.xonxoff and 'active' or 'inactive')) + sys.stderr.write('--- hardware flow control %s\n' % (self.serial.rtscts and 'active' or 'inactive')) + sys.stderr.write('--- data escaping: %s\n' % (REPR_MODES[self.repr_mode],)) + sys.stderr.write('--- linefeed: %s\n' % (LF_MODES[self.convert_outgoing],)) + try: + sys.stderr.write('--- CTS: %s DSR: %s RI: %s CD: %s\n' % ( + (self.serial.getCTS() and 'active' or 'inactive'), + (self.serial.getDSR() and 'active' or 'inactive'), + (self.serial.getRI() and 'active' or 'inactive'), + (self.serial.getCD() and 'active' or 'inactive'), + )) + except serial.SerialException: + # on RFC 2217 ports it can happen to no modem state notification was + # yet received. ignore this error. + pass + + def reader(self): + """loop and copy serial->console""" + try: + while self.alive: + data = self.serial.read(1) + + if self.repr_mode == 0: + # direct output, just have to care about newline setting + if data == '\r' and self.convert_outgoing == CONVERT_CR: + sys.stdout.write('\n') + else: + sys.stdout.write(data) + elif self.repr_mode == 1: + # escape non-printable, let pass newlines + if self.convert_outgoing == CONVERT_CRLF and data in '\r\n': + if data == '\n': + sys.stdout.write('\n') + elif data == '\r': + pass + elif data == '\n' and self.convert_outgoing == CONVERT_LF: + sys.stdout.write('\n') + elif data == '\r' and self.convert_outgoing == CONVERT_CR: + sys.stdout.write('\n') + else: + sys.stdout.write(repr(data)[1:-1]) + elif self.repr_mode == 2: + # escape all non-printable, including newline + sys.stdout.write(repr(data)[1:-1]) + elif self.repr_mode == 3: + # escape everything (hexdump) + for character in data: + sys.stdout.write("%s " % character.encode('hex')) + sys.stdout.flush() + except serial.SerialException, e: + self.alive = False + # would be nice if the console reader could be interruptted at this + # point... + raise + + + def writer(self): + """loop and copy console->serial until EXITCHARCTER character is + found. when MENUCHARACTER is found, interpret the next key + locally. + """ + menu_active = False + try: + while self.alive: + try: + c = console.getkey() + except KeyboardInterrupt: + c = '\x03' + if menu_active: + if c == MENUCHARACTER or c == EXITCHARCTER: # Menu character again/exit char -> send itself + self.serial.write(c) # send character + if self.echo: + sys.stdout.write(c) + elif c == '\x15': # CTRL+U -> upload file + sys.stderr.write('\n--- File to upload: ') + sys.stderr.flush() + console.cleanup() + filename = sys.stdin.readline().rstrip('\r\n') + if filename: + try: + file = open(filename, 'r') + sys.stderr.write('--- Sending file %s ---\n' % filename) + while True: + line = file.readline().rstrip('\r\n') + if not line: + break + self.serial.write(line) + self.serial.write('\r\n') + # Wait for output buffer to drain. + self.serial.flush() + sys.stderr.write('.') # Progress indicator. + sys.stderr.write('\n--- File %s sent ---\n' % filename) + except IOError, e: + sys.stderr.write('--- ERROR opening file %s: %s ---\n' % (filename, e)) + console.setup() + elif c in '\x08hH?': # CTRL+H, h, H, ? -> Show help + sys.stderr.write(get_help_text()) + elif c == '\x12': # CTRL+R -> Toggle RTS + self.rts_state = not self.rts_state + self.serial.setRTS(self.rts_state) + sys.stderr.write('--- RTS %s ---\n' % (self.rts_state and 'active' or 'inactive')) + elif c == '\x04': # CTRL+D -> Toggle DTR + self.dtr_state = not self.dtr_state + self.serial.setDTR(self.dtr_state) + sys.stderr.write('--- DTR %s ---\n' % (self.dtr_state and 'active' or 'inactive')) + elif c == '\x02': # CTRL+B -> toggle BREAK condition + self.break_state = not self.break_state + self.serial.setBreak(self.break_state) + sys.stderr.write('--- BREAK %s ---\n' % (self.break_state and 'active' or 'inactive')) + elif c == '\x05': # CTRL+E -> toggle local echo + self.echo = not self.echo + sys.stderr.write('--- local echo %s ---\n' % (self.echo and 'active' or 'inactive')) + elif c == '\x09': # CTRL+I -> info + self.dump_port_settings() + elif c == '\x01': # CTRL+A -> cycle escape mode + self.repr_mode += 1 + if self.repr_mode > 3: + self.repr_mode = 0 + sys.stderr.write('--- escape data: %s ---\n' % ( + REPR_MODES[self.repr_mode], + )) + elif c == '\x0c': # CTRL+L -> cycle linefeed mode + self.convert_outgoing += 1 + if self.convert_outgoing > 2: + self.convert_outgoing = 0 + self.newline = NEWLINE_CONVERISON_MAP[self.convert_outgoing] + sys.stderr.write('--- line feed %s ---\n' % ( + LF_MODES[self.convert_outgoing], + )) + #~ elif c in 'pP': # P -> change port XXX reader thread would exit + elif c in 'bB': # B -> change baudrate + sys.stderr.write('\n--- Baudrate: ') + sys.stderr.flush() + console.cleanup() + backup = self.serial.baudrate + try: + self.serial.baudrate = int(sys.stdin.readline().strip()) + except ValueError, e: + sys.stderr.write('--- ERROR setting baudrate: %s ---\n' % (e,)) + self.serial.baudrate = backup + else: + self.dump_port_settings() + console.setup() + elif c == '8': # 8 -> change to 8 bits + self.serial.bytesize = serial.EIGHTBITS + self.dump_port_settings() + elif c == '7': # 7 -> change to 8 bits + self.serial.bytesize = serial.SEVENBITS + self.dump_port_settings() + elif c in 'eE': # E -> change to even parity + self.serial.parity = serial.PARITY_EVEN + self.dump_port_settings() + elif c in 'oO': # O -> change to odd parity + self.serial.parity = serial.PARITY_ODD + self.dump_port_settings() + elif c in 'mM': # M -> change to mark parity + self.serial.parity = serial.PARITY_MARK + self.dump_port_settings() + elif c in 'sS': # S -> change to space parity + self.serial.parity = serial.PARITY_SPACE + self.dump_port_settings() + elif c in 'nN': # N -> change to no parity + self.serial.parity = serial.PARITY_NONE + self.dump_port_settings() + elif c == '1': # 1 -> change to 1 stop bits + self.serial.stopbits = serial.STOPBITS_ONE + self.dump_port_settings() + elif c == '2': # 2 -> change to 2 stop bits + self.serial.stopbits = serial.STOPBITS_TWO + self.dump_port_settings() + elif c == '3': # 3 -> change to 1.5 stop bits + self.serial.stopbits = serial.STOPBITS_ONE_POINT_FIVE + self.dump_port_settings() + elif c in 'xX': # X -> change software flow control + self.serial.xonxoff = (c == 'X') + self.dump_port_settings() + elif c in 'rR': # R -> change hardware flow control + self.serial.rtscts = (c == 'R') + self.dump_port_settings() + else: + sys.stderr.write('--- unknown menu character %s --\n' % key_description(c)) + menu_active = False + elif c == MENUCHARACTER: # next char will be for menu + menu_active = True + elif c == EXITCHARCTER: + self.stop() + break # exit app + elif c == '\n': + self.serial.write(self.newline) # send newline character(s) + if self.echo: + sys.stdout.write(c) # local echo is a real newline in any case + sys.stdout.flush() + else: + self.serial.write(c) # send character + if self.echo: + sys.stdout.write(c) + sys.stdout.flush() + except: + self.alive = False + raise + +def main(): + import optparse + + parser = optparse.OptionParser( + usage = "%prog [options] [port [baudrate]]", + description = "Miniterm - A simple terminal program for the serial port." + ) + + parser.add_option("-p", "--port", + dest = "port", + help = "port, a number (default 0) or a device name (deprecated option)", + default = None + ) + + parser.add_option("-b", "--baud", + dest = "baudrate", + action = "store", + type = 'int', + help = "set baud rate, default %default", + default = 9600 + ) + + parser.add_option("--parity", + dest = "parity", + action = "store", + help = "set parity, one of [N, E, O, S, M], default=N", + default = 'N' + ) + + parser.add_option("-e", "--echo", + dest = "echo", + action = "store_true", + help = "enable local echo (default off)", + default = False + ) + + parser.add_option("--rtscts", + dest = "rtscts", + action = "store_true", + help = "enable RTS/CTS flow control (default off)", + default = False + ) + + parser.add_option("--xonxoff", + dest = "xonxoff", + action = "store_true", + help = "enable software flow control (default off)", + default = False + ) + + parser.add_option("--cr", + dest = "cr", + action = "store_true", + help = "do not send CR+LF, send CR only", + default = False + ) + + parser.add_option("--lf", + dest = "lf", + action = "store_true", + help = "do not send CR+LF, send LF only", + default = False + ) + + parser.add_option("-D", "--debug", + dest = "repr_mode", + action = "count", + help = """debug received data (escape non-printable chars) +--debug can be given multiple times: +0: just print what is received +1: escape non-printable characters, do newlines as unusual +2: escape non-printable characters, newlines too +3: hex dump everything""", + default = 0 + ) + + parser.add_option("--rts", + dest = "rts_state", + action = "store", + type = 'int', + help = "set initial RTS line state (possible values: 0, 1)", + default = None + ) + + parser.add_option("--dtr", + dest = "dtr_state", + action = "store", + type = 'int', + help = "set initial DTR line state (possible values: 0, 1)", + default = None + ) + + parser.add_option("-q", "--quiet", + dest = "quiet", + action = "store_true", + help = "suppress non error messages", + default = False + ) + + parser.add_option("--exit-char", + dest = "exit_char", + action = "store", + type = 'int', + help = "ASCII code of special character that is used to exit the application", + default = 0x1d + ) + + parser.add_option("--menu-char", + dest = "menu_char", + action = "store", + type = 'int', + help = "ASCII code of special character that is used to control miniterm (menu)", + default = 0x14 + ) + + (options, args) = parser.parse_args() + + options.parity = options.parity.upper() + if options.parity not in 'NEOSM': + parser.error("invalid parity") + + if options.cr and options.lf: + parser.error("only one of --cr or --lf can be specified") + + if options.menu_char == options.exit_char: + parser.error('--exit-char can not be the same as --menu-char') + + global EXITCHARCTER, MENUCHARACTER + EXITCHARCTER = chr(options.exit_char) + MENUCHARACTER = chr(options.menu_char) + + port = options.port + baudrate = options.baudrate + if args: + if options.port is not None: + parser.error("no arguments are allowed, options only when --port is given") + port = args.pop(0) + if args: + try: + baudrate = int(args[0]) + except ValueError: + parser.error("baud rate must be a number, not %r" % args[0]) + args.pop(0) + if args: + parser.error("too many arguments") + else: + if port is None: port = 0 + + convert_outgoing = CONVERT_CRLF + if options.cr: + convert_outgoing = CONVERT_CR + elif options.lf: + convert_outgoing = CONVERT_LF + + try: + miniterm = Miniterm( + port, + baudrate, + options.parity, + rtscts=options.rtscts, + xonxoff=options.xonxoff, + echo=options.echo, + convert_outgoing=convert_outgoing, + repr_mode=options.repr_mode, + ) + except serial.SerialException, e: + sys.stderr.write("could not open port %r: %s\n" % (port, e)) + sys.exit(1) + + if not options.quiet: + sys.stderr.write('--- Miniterm on %s: %d,%s,%s,%s ---\n' % ( + miniterm.serial.portstr, + miniterm.serial.baudrate, + miniterm.serial.bytesize, + miniterm.serial.parity, + miniterm.serial.stopbits, + )) + sys.stderr.write('--- Quit: %s | Menu: %s | Help: %s followed by %s ---\n' % ( + key_description(EXITCHARCTER), + key_description(MENUCHARACTER), + key_description(MENUCHARACTER), + key_description('\x08'), + )) + + if options.dtr_state is not None: + if not options.quiet: + sys.stderr.write('--- forcing DTR %s\n' % (options.dtr_state and 'active' or 'inactive')) + miniterm.serial.setDTR(options.dtr_state) + miniterm.dtr_state = options.dtr_state + if options.rts_state is not None: + if not options.quiet: + sys.stderr.write('--- forcing RTS %s\n' % (options.rts_state and 'active' or 'inactive')) + miniterm.serial.setRTS(options.rts_state) + miniterm.rts_state = options.rts_state + + miniterm.start() + miniterm.join(True) + if not options.quiet: + sys.stderr.write("\n--- exit ---\n") + miniterm.join() + + +if __name__ == '__main__': + main() diff --git a/examples/port_publisher.py b/examples/port_publisher.py new file mode 100644 index 0000000..91de063 --- /dev/null +++ b/examples/port_publisher.py @@ -0,0 +1,485 @@ +#! /usr/bin/env python +"""\ +Multi-port serial<->TCP/IP forwarder. +- RFC 2217 +- check existence of serial port periodically +- start/stop forwarders +- each forwarder creates a server socket and opens the serial port +- serial ports are opened only once. network connect/disconnect + does not influence serial port +- only one client per connection +""" +import sys, os, time +import traceback +import socket +import select + +import serial +import serial.rfc2217 + +import avahi +import dbus + +class ZeroconfService: + """\ + A simple class to publish a network service with zeroconf using avahi. + """ + + def __init__(self, name, port, stype="_http._tcp", + domain="", host="", text=""): + self.name = name + self.stype = stype + self.domain = domain + self.host = host + self.port = port + self.text = text + self.group = None + + def publish(self): + bus = dbus.SystemBus() + server = dbus.Interface( + bus.get_object( + avahi.DBUS_NAME, + avahi.DBUS_PATH_SERVER + ), + avahi.DBUS_INTERFACE_SERVER + ) + + g = dbus.Interface( + bus.get_object( + avahi.DBUS_NAME, + server.EntryGroupNew() + ), + avahi.DBUS_INTERFACE_ENTRY_GROUP + ) + + g.AddService(avahi.IF_UNSPEC, avahi.PROTO_UNSPEC, dbus.UInt32(0), + self.name, self.stype, self.domain, self.host, + dbus.UInt16(self.port), self.text) + + g.Commit() + self.group = g + + def unpublish(self): + if self.group is not None: + self.group.Reset() + self.group = None + + def __str__(self): + return "%r @ %s:%s (%s)" % (self.name, self.host, self.port, self.stype) + + + +class Forwarder(ZeroconfService): + """\ + Single port serial<->TCP/IP forarder that depends on an external select + loop. + - Buffers for serial -> network and network -> serial + - RFC 2217 state + - Zeroconf publish/unpublish on open/close. + """ + + def __init__(self, device, name, network_port, on_close=None): + ZeroconfService.__init__(self, name, network_port, stype='_serial_port._tcp') + self.alive = False + self.network_port = network_port + self.on_close = on_close + self.device = device + self.serial = serial.Serial() + self.serial.port = device + self.serial.baudrate = 115200 + self.serial.timeout = 0 + self.socket = None + self.server_socket = None + self.rfc2217 = None # instantiate later, when connecting + + def __del__(self): + try: + if self.alive: self.close() + except: + pass # XXX errors on shutdown + + def open(self): + """open serial port, start network server and publish service""" + self.buffer_net2ser = '' + self.buffer_ser2net = '' + + # open serial port + try: + self.serial.open() + self.serial.setRTS(False) + except Exception, msg: + self.handle_serial_error(msg) + + self.serial_settings_backup = self.serial.getSettingsDict() + + # start the socket server + self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.server_socket.setsockopt( + socket.SOL_SOCKET, + socket.SO_REUSEADDR, + self.server_socket.getsockopt( + socket.SOL_SOCKET, + socket.SO_REUSEADDR + ) | 1 + ) + self.server_socket.setblocking(0) + try: + self.server_socket.bind( ('', self.network_port) ) + self.server_socket.listen(1) + except socket.error, msg: + self.handle_server_error() + #~ raise + if not options.quiet: + print "%s: Waiting for connection on %s..." % (self.device, self.network_port) + + # zeroconfig + self.publish() + + # now we are ready + self.alive = True + + def close(self): + """Close all resources and unpublish service""" + if not options.quiet: + print "%s: closing..." % (self.device, ) + self.alive = False + self.unpublish() + if self.server_socket: self.server_socket.close() + if self.socket: + self.handle_disconnect() + self.serial.close() + if self.on_close is not None: + # ensure it is only called once + callback = self.on_close + self.on_close = None + callback(self) + + def write(self, data): + """the write method is used by serial.rfc2217.PortManager. it has to + write to the network.""" + self.buffer_ser2net += data + + def update_select_maps(self, read_map, write_map, error_map): + """Update dictionaries for select call. insert fd->callback mapping""" + if self.alive: + # always handle serial port reads + read_map[self.serial] = self.handle_serial_read + error_map[self.serial] = self.handle_serial_error + # handle serial port writes if buffer is not empty + if self.buffer_net2ser: + write_map[self.serial] = self.handle_serial_write + # handle network + if self.socket is not None: + # handle socket if connected + # only read from network if the internal buffer is not + # already filled. the TCP flow control will hold back data + if len(self.buffer_net2ser) < 2048: + read_map[self.socket] = self.handle_socket_read + # only check for write readiness when there is data + if self.buffer_ser2net: + write_map[self.socket] = self.handle_socket_write + error_map[self.socket] = self.handle_socket_error + else: + # no connection, ensure clear buffer + self.buffer_ser2net = '' + # check the server socket + read_map[self.server_socket] = self.handle_connect + error_map[self.server_socket] = self.handle_server_error + + + def handle_serial_read(self): + """Reading from serial port""" + try: + data = os.read(self.serial.fileno(), 1024) + if data: + # store data in buffer if there is a client connected + if self.socket is not None: + # escape outgoing data when needed (Telnet IAC (0xff) character) + if self.rfc2217: + data = serial.to_bytes(self.rfc2217.escape(data)) + self.buffer_ser2net += data + else: + self.handle_serial_error() + except Exception, msg: + self.handle_serial_error(msg) + + def handle_serial_write(self): + """Writing to serial port""" + try: + # write a chunk + n = os.write(self.serial.fileno(), self.buffer_net2ser) + # and see how large that chunk was, remove that from buffer + self.buffer_net2ser = self.buffer_net2ser[n:] + except Exception, msg: + self.handle_serial_error(msg) + + def handle_serial_error(self, error=None): + """Serial port error""" + # terminate connection + self.close() + + def handle_socket_read(self): + """Read from socket""" + try: + # read a chunk from the serial port + data = self.socket.recv(1024) + if data: + # Process RFC 2217 stuff when enabled + if self.rfc2217: + data = serial.to_bytes(self.rfc2217.filter(data)) + # add data to buffer + self.buffer_net2ser += data + else: + # empty read indicates disconnection + self.handle_disconnect() + except socket.error: + self.handle_socket_error() + + def handle_socket_write(self): + """Write to socket""" + try: + # write a chunk + count = self.socket.send(self.buffer_ser2net) + # and remove the sent data from the buffer + self.buffer_ser2net = self.buffer_ser2net[count:] + except socket.error: + self.handle_socket_error() + + def handle_socket_error(self): + """Socket connection fails""" + self.handle_disconnect() + + def handle_connect(self): + """Server socket gets a connection""" + # accept a connection in any case, close connection + # below if already busy + connection, addr = self.server_socket.accept() + if self.socket is None: + self.socket = connection + self.socket.setblocking(0) + self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + if not options.quiet: + print '%s: Connected by %s:%s' % (self.device, addr[0], addr[1]) + self.serial.setRTS(True) + self.serial.setDTR(True) + self.rfc2217 = serial.rfc2217.PortManager(self.serial, self) + else: + # reject connection if there is already one + connection.close() + if not options.quiet: + print '%s: Rejecting connect from %s:%s' % (self.device, addr[0], addr[1]) + + def handle_server_error(self): + """Socket server fails""" + self.close() + + def handle_disconnect(self): + """Socket gets disconnected""" + # signal disconnected terminal with control lines + try: + self.serial.setRTS(False) + self.serial.setDTR(False) + finally: + # restore original port configuration in case it was changed + self.serial.applySettingsDict(self.serial_settings_backup) + # stop RFC 2217 state machine + self.rfc2217 = None + # clear send buffer + self.buffer_ser2net = '' + # close network connection + if self.socket is not None: + self.socket.close() + self.socket = None + if not options.quiet: + print '%s: Disconnected' % self.device + + +def test(): + service = ZeroconfService(name="TestService", port=3000) + service.publish() + raw_input("Press any key to unpublish the service ") + service.unpublish() + + +if __name__ == '__main__': + import optparse + + parser = optparse.OptionParser(usage="""\ +%prog [options] + +Announce the existence of devices using zeroconf and provide +a TCP/IP <-> serial port gateway (implements RFC 2217). + +Note that the TCP/IP server is not protected. Everyone can connect +to it! + +If running as daemon, write to syslog. Otherwise write to stdout. +""") + + parser.add_option("-q", "--quiet", dest="quiet", action="store_true", + help="suppress non error messages", default=False) + + parser.add_option("-o", "--logfile", dest="log_file", + help="write messages file instead of stdout", default=None, metavar="FILE") + + parser.add_option("-d", "--daemon", dest="daemonize", action="store_true", + help="start as daemon", default=False) + + parser.add_option("", "--pidfile", dest="pid_file", + help="specify a name for the PID file", default=None, metavar="FILE") + + (options, args) = parser.parse_args() + + # redirect output if specified + if options.log_file is not None: + class WriteFlushed: + def __init__(self, fileobj): + self.fileobj = fileobj + def write(self, s): + self.fileobj.write(s) + self.fileobj.flush() + def close(self): + self.fileobj.close() + sys.stdout = sys.stderr = WriteFlushed(open(options.log_file, 'a')) + # atexit.register(lambda: sys.stdout.close()) + + if options.daemonize: + # if running as daemon is requested, do the fork magic + # options.quiet = True + import pwd + # do the UNIX double-fork magic, see Stevens' "Advanced + # Programming in the UNIX Environment" for details (ISBN 0201563177) + try: + pid = os.fork() + if pid > 0: + # exit first parent + sys.exit(0) + except OSError, e: + sys.stderr.write("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror)) + sys.exit(1) + + # decouple from parent environment + os.chdir("/") # don't prevent unmounting.... + os.setsid() + os.umask(0) + + # do second fork + try: + pid = os.fork() + if pid > 0: + # exit from second parent, print eventual PID before + # print "Daemon PID %d" % pid + if options.pid_file is not None: + open(options.pid_file,'w').write("%d"%pid) + sys.exit(0) + except OSError, e: + sys.stderr.write("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror)) + sys.exit(1) + + if options.log_file is None: + import syslog + syslog.openlog("serial port publisher") + # redirect output to syslog + class WriteToSysLog: + def __init__(self): + self.buffer = '' + def write(self, s): + self.buffer += s + if '\n' in self.buffer: + output, self.buffer = self.buffer.split('\n', 1) + syslog.syslog(output) + def flush(self): + syslog.syslog(self.buffer) + self.buffer = '' + def close(self): + self.flush() + sys.stdout = sys.stderr = WriteToSysLog() + + # ensure the that the daemon runs a normal user, if run as root + #if os.getuid() == 0: + # name, passwd, uid, gid, desc, home, shell = pwd.getpwnam('someuser') + # os.setgid(gid) # set group first + # os.setuid(uid) # set user + + # keep the published stuff in a dictionary + published = {} + # prepare list of device names (hard coded) + device_list = ['/dev/ttyUSB%d' % p for p in range(8)] + # get a nice hostname + hostname = socket.gethostname() + + def unpublish(forwarder): + """when forwarders die, we need to unregister them""" + try: + del published[forwarder.device] + except KeyError: + pass + else: + if not options.quiet: print "unpublish: %s" % (forwarder) + + alive = True + next_check = 0 + # main loop + while alive: + try: + # if it is time, check for serial port devices + now = time.time() + if now > next_check: + next_check = now + 5 + # check each device + for device in device_list: + # if it appeared + if os.path.exists(device): + if device not in published: + num = int(device[-1]) + published[device] = Forwarder( + device, + "%s on %s" % (device, hostname), + 7000+num, + on_close=unpublish + ) + if not options.quiet: print "publish: %s" % (published[device]) + published[device].open() + else: + # or when it disappeared + if device in published: + if not options.quiet: print "unpublish: %s" % (published[device]) + published[device].close() + try: + del published[device] + except KeyError: + pass + + # select_start = time.time() + read_map = {} + write_map = {} + error_map = {} + for publisher in published.values(): + publisher.update_select_maps(read_map, write_map, error_map) + try: + readers, writers, errors = select.select( + read_map.keys(), + write_map.keys(), + error_map.keys(), + 5 + ) + except select.error, err: + if err[0] != EINTR: + raise + # select_end = time.time() + # print "select used %.3f s" % (select_end - select_start) + for reader in readers: + read_map[reader]() + for writer in writers: + write_map[writer]() + for error in errors: + error_map[error]() + # print "operation used %.3f s" % (time.time() - select_end) + except KeyboardInterrupt: + alive = False + except SystemExit: + raise + except: + #~ raise + traceback.print_exc() diff --git a/examples/port_publisher.sh b/examples/port_publisher.sh new file mode 100644 index 0000000..50d4f17 --- /dev/null +++ b/examples/port_publisher.sh @@ -0,0 +1,44 @@ +#! /bin/sh +# daemon starter script +# based on skeleton from Debian GNU/Linux +# cliechti at gmx.net + +PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin +DAEMON=/usr/local/bin/port_publisher.py +NAME=port_publisher +DESC="serial port avahi device publisher" + +test -f $DAEMON || exit 0 + +set -e + +case "$1" in + start) + echo -n "Starting $DESC: " + $DAEMON --daemon --pidfile /var/run/$NAME.pid + echo "$NAME." + ;; + stop) + echo -n "Stopping $DESC: " + start-stop-daemon --stop --quiet --pidfile /var/run/$NAME.pid + # \ --exec $DAEMON + echo "$NAME." + ;; + restart|force-reload) + echo -n "Restarting $DESC: " + start-stop-daemon --stop --quiet --pidfile \ + /var/run/$NAME.pid + # --exec $DAEMON + sleep 1 + $DAEMON --daemon --pidfile /var/run/$NAME.pid + echo "$NAME." + ;; + *) + N=/etc/init.d/$NAME + echo "Usage: $N {start|stop|restart|force-reload}" >&2 + exit 1 + ;; +esac + +exit 0 + diff --git a/examples/rfc2217_server.py b/examples/rfc2217_server.py new file mode 100644 index 0000000..81fb520 --- /dev/null +++ b/examples/rfc2217_server.py @@ -0,0 +1,204 @@ +#!/usr/bin/env python + +# (C) 2009 Chris Liechti +# redirect data from a TCP/IP connection to a serial port and vice versa +# using RFC 2217 + + +import sys +import os +import threading +import time +import socket +import serial +import serial.rfc2217 +import logging + +class Redirector: + def __init__(self, serial_instance, socket, debug=None): + self.serial = serial_instance + self.socket = socket + self._write_lock = threading.Lock() + self.rfc2217 = serial.rfc2217.PortManager( + self.serial, + self, + logger = (debug and logging.getLogger('rfc2217.server')) + ) + self.log = logging.getLogger('redirector') + + def statusline_poller(self): + self.log.debug('status line poll thread started') + while self.alive: + time.sleep(1) + self.rfc2217.check_modem_lines() + self.log.debug('status line poll thread terminated') + + def shortcut(self): + """connect the serial port to the TCP port by copying everything + from one side to the other""" + self.alive = True + self.thread_read = threading.Thread(target=self.reader) + self.thread_read.setDaemon(True) + self.thread_read.setName('serial->socket') + self.thread_read.start() + self.thread_poll = threading.Thread(target=self.statusline_poller) + self.thread_poll.setDaemon(True) + self.thread_poll.setName('status line poll') + self.thread_poll.start() + self.writer() + + def reader(self): + """loop forever and copy serial->socket""" + self.log.debug('reader thread started') + while self.alive: + try: + data = self.serial.read(1) # read one, blocking + n = self.serial.inWaiting() # look if there is more + if n: + data = data + self.serial.read(n) # and get as much as possible + if data: + # escape outgoing data when needed (Telnet IAC (0xff) character) + data = serial.to_bytes(self.rfc2217.escape(data)) + self._write_lock.acquire() + try: + self.socket.sendall(data) # send it over TCP + finally: + self._write_lock.release() + except socket.error, msg: + self.log.error('%s' % (msg,)) + # probably got disconnected + break + self.alive = False + self.log.debug('reader thread terminated') + + def write(self, data): + """thread safe socket write with no data escaping. used to send telnet stuff""" + self._write_lock.acquire() + try: + self.socket.sendall(data) + finally: + self._write_lock.release() + + def writer(self): + """loop forever and copy socket->serial""" + while self.alive: + try: + data = self.socket.recv(1024) + if not data: + break + self.serial.write(serial.to_bytes(self.rfc2217.filter(data))) + except socket.error, msg: + self.log.error('%s' % (msg,)) + # probably got disconnected + break + self.stop() + + def stop(self): + """Stop copying""" + self.log.debug('stopping') + if self.alive: + self.alive = False + self.thread_read.join() + self.thread_poll.join() + + +if __name__ == '__main__': + import optparse + + parser = optparse.OptionParser( + usage = "%prog [options] port", + description = "RFC 2217 Serial to Network (TCP/IP) redirector.", + epilog = """\ +NOTE: no security measures are implemented. Anyone can remotely connect +to this service over the network. + +Only one connection at once is supported. When the connection is terminated +it waits for the next connect. +""") + + parser.add_option("-p", "--localport", + dest = "local_port", + action = "store", + type = 'int', + help = "local TCP port", + default = 2217 + ) + + parser.add_option("-v", "--verbose", + dest = "verbosity", + action = "count", + help = "print more diagnostic messages (option can be given multiple times)", + default = 0 + ) + + (options, args) = parser.parse_args() + + if len(args) != 1: + parser.error('serial port name required as argument') + + if options.verbosity > 3: + options.verbosity = 3 + level = ( + logging.WARNING, + logging.INFO, + logging.DEBUG, + logging.NOTSET, + )[options.verbosity] + logging.basicConfig(level=logging.INFO) + logging.getLogger('root').setLevel(logging.INFO) + logging.getLogger('rfc2217').setLevel(level) + + # connect to serial port + ser = serial.Serial() + ser.port = args[0] + ser.timeout = 3 # required so that the reader thread can exit + + logging.info("RFC 2217 TCP/IP to Serial redirector - type Ctrl-C / BREAK to quit") + + try: + ser.open() + except serial.SerialException, e: + logging.error("Could not open serial port %s: %s" % (ser.portstr, e)) + sys.exit(1) + + logging.info("Serving serial port: %s" % (ser.portstr,)) + settings = ser.getSettingsDict() + # reset contol line as no _remote_ "terminal" has been connected yet + ser.setDTR(False) + ser.setRTS(False) + + srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + srv.bind( ('', options.local_port) ) + srv.listen(1) + logging.info("TCP/IP port: %s" % (options.local_port,)) + while True: + try: + connection, addr = srv.accept() + logging.info('Connected by %s:%s' % (addr[0], addr[1])) + connection.setsockopt( socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + ser.setRTS(True) + ser.setDTR(True) + # enter network <-> serial loop + r = Redirector( + ser, + connection, + options.verbosity > 0 + ) + try: + r.shortcut() + finally: + logging.info('Disconnected') + r.stop() + connection.close() + ser.setDTR(False) + ser.setRTS(False) + # Restore port settings (may have been changed by RFC 2217 capable + # client) + ser.applySettingsDict(settings) + except KeyboardInterrupt: + break + except socket.error, msg: + logging.error('%s' % (msg,)) + + logging.info('--- exit ---') diff --git a/examples/scan.py b/examples/scan.py new file mode 100644 index 0000000..82c5458 --- /dev/null +++ b/examples/scan.py @@ -0,0 +1,30 @@ +#! /usr/bin/env python +"""\ +Scan for serial ports. + +Part of pySerial (http://pyserial.sf.net) +(C) 2002-2003 + +The scan function of this module tries to open each port number +from 0 to 255 and it builds a list of those ports where this was +successful. +""" + +import serial + +def scan(): + """scan for available ports. return a list of tuples (num, name)""" + available = [] + for i in range(256): + try: + s = serial.Serial(i) + available.append( (i, s.portstr)) + s.close() # explicit close 'cause of delayed GC in java + except serial.SerialException: + pass + return available + +if __name__=='__main__': + print "Found ports:" + for n,s in scan(): + print "(%d) %s" % (n,s) diff --git a/examples/scanlinux.py b/examples/scanlinux.py new file mode 100644 index 0000000..7cf6383 --- /dev/null +++ b/examples/scanlinux.py @@ -0,0 +1,20 @@ +#! /usr/bin/env python +"""\ +Scan for serial ports. Linux specific variant that also includes USB/Serial +adapters. + +Part of pySerial (http://pyserial.sf.net) +(C) 2009 +""" + +import serial +import glob + +def scan(): + """scan for available ports. return a list of device names.""" + return glob.glob('/dev/ttyS*') + glob.glob('/dev/ttyUSB*') + +if __name__=='__main__': + print "Found ports:" + for name in scan(): + print name diff --git a/examples/scanwin32.py b/examples/scanwin32.py new file mode 100644 index 0000000..2d3249d --- /dev/null +++ b/examples/scanwin32.py @@ -0,0 +1,232 @@ +import ctypes +import re + +def ValidHandle(value): + if value == 0: + raise ctypes.WinError() + return value + +NULL = 0 +HDEVINFO = ctypes.c_int +BOOL = ctypes.c_int +CHAR = ctypes.c_char +PCTSTR = ctypes.c_char_p +HWND = ctypes.c_uint +DWORD = ctypes.c_ulong +PDWORD = ctypes.POINTER(DWORD) +ULONG = ctypes.c_ulong +ULONG_PTR = ctypes.POINTER(ULONG) +#~ PBYTE = ctypes.c_char_p +PBYTE = ctypes.c_void_p + +class GUID(ctypes.Structure): + _fields_ = [ + ('Data1', ctypes.c_ulong), + ('Data2', ctypes.c_ushort), + ('Data3', ctypes.c_ushort), + ('Data4', ctypes.c_ubyte*8), + ] + def __str__(self): + return "{%08x-%04x-%04x-%s-%s}" % ( + self.Data1, + self.Data2, + self.Data3, + ''.join(["%02x" % d for d in self.Data4[:2]]), + ''.join(["%02x" % d for d in self.Data4[2:]]), + ) + +class SP_DEVINFO_DATA(ctypes.Structure): + _fields_ = [ + ('cbSize', DWORD), + ('ClassGuid', GUID), + ('DevInst', DWORD), + ('Reserved', ULONG_PTR), + ] + def __str__(self): + return "ClassGuid:%s DevInst:%s" % (self.ClassGuid, self.DevInst) +PSP_DEVINFO_DATA = ctypes.POINTER(SP_DEVINFO_DATA) + +class SP_DEVICE_INTERFACE_DATA(ctypes.Structure): + _fields_ = [ + ('cbSize', DWORD), + ('InterfaceClassGuid', GUID), + ('Flags', DWORD), + ('Reserved', ULONG_PTR), + ] + def __str__(self): + return "InterfaceClassGuid:%s Flags:%s" % (self.InterfaceClassGuid, self.Flags) + +PSP_DEVICE_INTERFACE_DATA = ctypes.POINTER(SP_DEVICE_INTERFACE_DATA) + +PSP_DEVICE_INTERFACE_DETAIL_DATA = ctypes.c_void_p + +class dummy(ctypes.Structure): + _fields_=[("d1", DWORD), ("d2", CHAR)] + _pack_ = 1 +SIZEOF_SP_DEVICE_INTERFACE_DETAIL_DATA_A = ctypes.sizeof(dummy) + +SetupDiDestroyDeviceInfoList = ctypes.windll.setupapi.SetupDiDestroyDeviceInfoList +SetupDiDestroyDeviceInfoList.argtypes = [HDEVINFO] +SetupDiDestroyDeviceInfoList.restype = BOOL + +SetupDiGetClassDevs = ctypes.windll.setupapi.SetupDiGetClassDevsA +SetupDiGetClassDevs.argtypes = [ctypes.POINTER(GUID), PCTSTR, HWND, DWORD] +SetupDiGetClassDevs.restype = ValidHandle # HDEVINFO + +SetupDiEnumDeviceInterfaces = ctypes.windll.setupapi.SetupDiEnumDeviceInterfaces +SetupDiEnumDeviceInterfaces.argtypes = [HDEVINFO, PSP_DEVINFO_DATA, ctypes.POINTER(GUID), DWORD, PSP_DEVICE_INTERFACE_DATA] +SetupDiEnumDeviceInterfaces.restype = BOOL + +SetupDiGetDeviceInterfaceDetail = ctypes.windll.setupapi.SetupDiGetDeviceInterfaceDetailA +SetupDiGetDeviceInterfaceDetail.argtypes = [HDEVINFO, PSP_DEVICE_INTERFACE_DATA, PSP_DEVICE_INTERFACE_DETAIL_DATA, DWORD, PDWORD, PSP_DEVINFO_DATA] +SetupDiGetDeviceInterfaceDetail.restype = BOOL + +SetupDiGetDeviceRegistryProperty = ctypes.windll.setupapi.SetupDiGetDeviceRegistryPropertyA +SetupDiGetDeviceRegistryProperty.argtypes = [HDEVINFO, PSP_DEVINFO_DATA, DWORD, PDWORD, PBYTE, DWORD, PDWORD] +SetupDiGetDeviceRegistryProperty.restype = BOOL + + +GUID_CLASS_COMPORT = GUID(0x86e0d1e0L, 0x8089, 0x11d0, + (ctypes.c_ubyte*8)(0x9c, 0xe4, 0x08, 0x00, 0x3e, 0x30, 0x1f, 0x73)) + +DIGCF_PRESENT = 2 +DIGCF_DEVICEINTERFACE = 16 +INVALID_HANDLE_VALUE = 0 +ERROR_INSUFFICIENT_BUFFER = 122 +SPDRP_HARDWAREID = 1 +SPDRP_FRIENDLYNAME = 12 +SPDRP_LOCATION_INFORMATION = 13 +ERROR_NO_MORE_ITEMS = 259 + +def comports(available_only=True): + """This generator scans the device registry for com ports and yields + (order, port, desc, hwid). If available_only is true only return currently + existing ports. Order is a helper to get sorted lists. it can be ignored + otherwise.""" + flags = DIGCF_DEVICEINTERFACE + if available_only: + flags |= DIGCF_PRESENT + g_hdi = SetupDiGetClassDevs(ctypes.byref(GUID_CLASS_COMPORT), None, NULL, flags); + #~ for i in range(256): + for dwIndex in range(256): + did = SP_DEVICE_INTERFACE_DATA() + did.cbSize = ctypes.sizeof(did) + + if not SetupDiEnumDeviceInterfaces( + g_hdi, + None, + ctypes.byref(GUID_CLASS_COMPORT), + dwIndex, + ctypes.byref(did) + ): + if ctypes.GetLastError() != ERROR_NO_MORE_ITEMS: + raise ctypes.WinError() + break + + dwNeeded = DWORD() + # get the size + if not SetupDiGetDeviceInterfaceDetail( + g_hdi, + ctypes.byref(did), + None, 0, ctypes.byref(dwNeeded), + None + ): + # Ignore ERROR_INSUFFICIENT_BUFFER + if ctypes.GetLastError() != ERROR_INSUFFICIENT_BUFFER: + raise ctypes.WinError() + # allocate buffer + class SP_DEVICE_INTERFACE_DETAIL_DATA_A(ctypes.Structure): + _fields_ = [ + ('cbSize', DWORD), + ('DevicePath', CHAR*(dwNeeded.value - ctypes.sizeof(DWORD))), + ] + def __str__(self): + return "DevicePath:%s" % (self.DevicePath,) + idd = SP_DEVICE_INTERFACE_DETAIL_DATA_A() + idd.cbSize = SIZEOF_SP_DEVICE_INTERFACE_DETAIL_DATA_A + devinfo = SP_DEVINFO_DATA() + devinfo.cbSize = ctypes.sizeof(devinfo) + if not SetupDiGetDeviceInterfaceDetail( + g_hdi, + ctypes.byref(did), + ctypes.byref(idd), dwNeeded, None, + ctypes.byref(devinfo) + ): + raise ctypes.WinError() + + # hardware ID + szHardwareID = ctypes.create_string_buffer(250) + if not SetupDiGetDeviceRegistryProperty( + g_hdi, + ctypes.byref(devinfo), + SPDRP_HARDWAREID, + None, + ctypes.byref(szHardwareID), ctypes.sizeof(szHardwareID) - 1, + None + ): + # Ignore ERROR_INSUFFICIENT_BUFFER + if ctypes.GetLastError() != ERROR_INSUFFICIENT_BUFFER: + raise ctypes.WinError() + + # friendly name + szFriendlyName = ctypes.create_string_buffer(1024) + if not SetupDiGetDeviceRegistryProperty( + g_hdi, + ctypes.byref(devinfo), + SPDRP_FRIENDLYNAME, + None, + ctypes.byref(szFriendlyName), ctypes.sizeof(szFriendlyName) - 1, + None + ): + # Ignore ERROR_INSUFFICIENT_BUFFER + if ctypes.GetLastError() != ERROR_INSUFFICIENT_BUFFER: + #~ raise ctypes.WinError() + # not getting friendly name for com0com devices, try something else + szFriendlyName = ctypes.create_string_buffer(1024) + if SetupDiGetDeviceRegistryProperty( + g_hdi, + ctypes.byref(devinfo), + SPDRP_LOCATION_INFORMATION, + None, + ctypes.byref(szFriendlyName), ctypes.sizeof(szFriendlyName) - 1, + None + ): + port_name = "\\\\.\\" + szFriendlyName.value + order = None + else: + port_name = szFriendlyName.value + order = None + else: + try: + m = re.search(r"\((.*?(\d+))\)", szFriendlyName.value) + #~ print szFriendlyName.value, m.groups() + port_name = m.group(1) + order = int(m.group(2)) + except AttributeError, msg: + port_name = szFriendlyName.value + order = None + yield order, port_name, szFriendlyName.value, szHardwareID.value + + SetupDiDestroyDeviceInfoList(g_hdi) + + +if __name__ == '__main__': + import serial + print "-"*78 + print "Serial ports" + print "-"*78 + for order, port, desc, hwid in sorted(comports()): + print "%-10s: %s (%s) ->" % (port, desc, hwid), + try: + serial.Serial(port) # test open + except serial.serialutil.SerialException: + print "can't be openend" + else: + print "Ready" + print + # list of all ports the system knows + print "-"*78 + print "All serial ports (registry)" + print "-"*78 + for order, port, desc, hwid in sorted(comports(False)): + print "%-10s: %s (%s)" % (port, desc, hwid) diff --git a/examples/setup-miniterm-py2exe.py b/examples/setup-miniterm-py2exe.py new file mode 100644 index 0000000..e935cf0 --- /dev/null +++ b/examples/setup-miniterm-py2exe.py @@ -0,0 +1,26 @@ +# setup script for py2exe to create the miniterm.exe +# $Id: setup-miniterm-py2exe.py,v 1.1 2005-09-21 19:51:19 cliechti Exp $ + +from distutils.core import setup +import glob, sys, py2exe, os + +sys.path.append('..') + +sys.argv.extend("py2exe --bundle 1".split()) + +setup( + name='miniterm', + #~ version='0.5', + zipfile=None, + options = {"py2exe": + { + 'dist_dir': 'bin', + 'excludes': ['javax.comm'], + 'compressed': 1, + } + }, + console = [ + #~ "miniterm_exe_wrapper.py", + "miniterm.py", + ], +) diff --git a/examples/setup-rfc2217_server-py2exe.py b/examples/setup-rfc2217_server-py2exe.py new file mode 100644 index 0000000..020427e --- /dev/null +++ b/examples/setup-rfc2217_server-py2exe.py @@ -0,0 +1,24 @@ +# setup script for py2exe to create the miniterm.exe +# $Id$ + +from distutils.core import setup +import glob, sys, py2exe, os + +sys.path.append('..') + +sys.argv.extend("py2exe --bundle 1".split()) + +setup( + name='rfc2217_server', + zipfile=None, + options = {"py2exe": + { + 'dist_dir': 'bin', + 'excludes': ['javax.comm'], + 'compressed': 1, + } + }, + console = [ + "rfc2217_server.py", + ], +) diff --git a/examples/setup-wxTerminal-py2exe.py b/examples/setup-wxTerminal-py2exe.py new file mode 100644 index 0000000..21b9c94 --- /dev/null +++ b/examples/setup-wxTerminal-py2exe.py @@ -0,0 +1,35 @@ +# This is a setup.py example script for the use with py2exe +from distutils.core import setup +import py2exe +import sys, os + +#this script is only useful for py2exe so just run that distutils command. +#that allows to run it with a simple double click. +sys.argv.append('py2exe') + +#get an icon from somewhere.. the python installation should have one: +icon = os.path.join(os.path.dirname(sys.executable), 'py.ico') + +setup( + options = {'py2exe': { + 'excludes': ['javax.comm'], + 'optimize': 2, + 'dist_dir': 'dist', + } + }, + + name = "wxTerminal", + windows = [ + { + 'script': "wxTerminal.py", + 'icon_resources': [(0x0004, icon)] + }, + ], + zipfile = "stuff.lib", + + description = "Simple serial terminal application", + version = "0.1", + author = "Chris Liechti", + author_email = "cliechti@gmx.net", + url = "http://pyserial.sf.net", +) diff --git a/examples/tcp_serial_redirect.py b/examples/tcp_serial_redirect.py new file mode 100644 index 0000000..8900ca9 --- /dev/null +++ b/examples/tcp_serial_redirect.py @@ -0,0 +1,326 @@ +#!/usr/bin/env python + +# (C) 2002-2009 Chris Liechti +# redirect data from a TCP/IP connection to a serial port and vice versa +# requires Python 2.2 'cause socket.sendall is used + + +import sys +import os +import time +import threading +import socket +import codecs +import serial +try: + True +except NameError: + True = 1 + False = 0 + +class Redirector: + def __init__(self, serial_instance, socket, ser_newline=None, net_newline=None, spy=False): + self.serial = serial_instance + self.socket = socket + self.ser_newline = ser_newline + self.net_newline = net_newline + self.spy = spy + self._write_lock = threading.Lock() + + def shortcut(self): + """connect the serial port to the TCP port by copying everything + from one side to the other""" + self.alive = True + self.thread_read = threading.Thread(target=self.reader) + self.thread_read.setDaemon(True) + self.thread_read.setName('serial->socket') + self.thread_read.start() + self.writer() + + def reader(self): + """loop forever and copy serial->socket""" + while self.alive: + try: + data = self.serial.read(1) # read one, blocking + n = self.serial.inWaiting() # look if there is more + if n: + data = data + self.serial.read(n) # and get as much as possible + if data: + # the spy shows what's on the serial port, so log it before converting newlines + if self.spy: + sys.stdout.write(codecs.escape_encode(data)[0]) + sys.stdout.flush() + if self.ser_newline and self.net_newline: + # do the newline conversion + # XXX fails for CR+LF in input when it is cut in half at the begin or end of the string + data = net_newline.join(data.split(ser_newline)) + # escape outgoing data when needed (Telnet IAC (0xff) character) + self._write_lock.acquire() + try: + self.socket.sendall(data) # send it over TCP + finally: + self._write_lock.release() + except socket.error, msg: + sys.stderr.write('ERROR: %s\n' % msg) + # probably got disconnected + break + self.alive = False + + def write(self, data): + """thread safe socket write with no data escaping. used to send telnet stuff""" + self._write_lock.acquire() + try: + self.socket.sendall(data) + finally: + self._write_lock.release() + + def writer(self): + """loop forever and copy socket->serial""" + while self.alive: + try: + data = self.socket.recv(1024) + if not data: + break + if self.ser_newline and self.net_newline: + # do the newline conversion + # XXX fails for CR+LF in input when it is cut in half at the begin or end of the string + data = ser_newline.join(data.split(net_newline)) + self.serial.write(data) # get a bunch of bytes and send them + # the spy shows what's on the serial port, so log it after converting newlines + if self.spy: + sys.stdout.write(codecs.escape_encode(data)[0]) + sys.stdout.flush() + except socket.error, msg: + sys.stderr.write('ERROR: %s\n' % msg) + # probably got disconnected + break + self.alive = False + self.thread_read.join() + + def stop(self): + """Stop copying""" + if self.alive: + self.alive = False + self.thread_read.join() + + +if __name__ == '__main__': + import optparse + + parser = optparse.OptionParser( + usage = "%prog [options] [port [baudrate]]", + description = "Simple Serial to Network (TCP/IP) redirector.", + epilog = """\ +NOTE: no security measures are implemented. Anyone can remotely connect +to this service over the network. + +Only one connection at once is supported. When the connection is terminated +it waits for the next connect. +""") + + parser.add_option("-q", "--quiet", + dest = "quiet", + action = "store_true", + help = "suppress non error messages", + default = False + ) + + parser.add_option("--spy", + dest = "spy", + action = "store_true", + help = "peek at the communication and print all data to the console", + default = False + ) + + group = optparse.OptionGroup(parser, + "Serial Port", + "Serial port settings" + ) + parser.add_option_group(group) + + group.add_option("-p", "--port", + dest = "port", + help = "port, a number (default 0) or a device name", + default = None + ) + + group.add_option("-b", "--baud", + dest = "baudrate", + action = "store", + type = 'int', + help = "set baud rate, default: %default", + default = 9600 + ) + + group.add_option("", "--parity", + dest = "parity", + action = "store", + help = "set parity, one of [N, E, O], default=%default", + default = 'N' + ) + + group.add_option("--rtscts", + dest = "rtscts", + action = "store_true", + help = "enable RTS/CTS flow control (default off)", + default = False + ) + + group.add_option("--xonxoff", + dest = "xonxoff", + action = "store_true", + help = "enable software flow control (default off)", + default = False + ) + + group.add_option("--rts", + dest = "rts_state", + action = "store", + type = 'int', + help = "set initial RTS line state (possible values: 0, 1)", + default = None + ) + + group.add_option("--dtr", + dest = "dtr_state", + action = "store", + type = 'int', + help = "set initial DTR line state (possible values: 0, 1)", + default = None + ) + + group = optparse.OptionGroup(parser, + "Network settings", + "Network configuration." + ) + parser.add_option_group(group) + + group.add_option("-P", "--localport", + dest = "local_port", + action = "store", + type = 'int', + help = "local TCP port", + default = 7777 + ) + + group = optparse.OptionGroup(parser, + "Newline Settings", + "Convert newlines between network and serial port. Conversion is normally disabled and can be enabled by --convert." + ) + parser.add_option_group(group) + + group.add_option("-c", "--convert", + dest = "convert", + action = "store_true", + help = "enable newline conversion (default off)", + default = False + ) + + group.add_option("--net-nl", + dest = "net_newline", + action = "store", + help = "type of newlines that are expected on the network (default: %default)", + default = "LF" + ) + + group.add_option("--ser-nl", + dest = "ser_newline", + action = "store", + help = "type of newlines that are expected on the serial port (default: %default)", + default = "CR+LF" + ) + + (options, args) = parser.parse_args() + + # get port and baud rate from command line arguments or the option switches + port = options.port + baudrate = options.baudrate + if args: + if options.port is not None: + parser.error("no arguments are allowed, options only when --port is given") + port = args.pop(0) + if args: + try: + baudrate = int(args[0]) + except ValueError: + parser.error("baud rate must be a number, not %r" % args[0]) + args.pop(0) + if args: + parser.error("too many arguments") + else: + if port is None: port = 0 + + # check newline modes for network connection + mode = options.net_newline.upper() + if mode == 'CR': + net_newline = '\r' + elif mode == 'LF': + net_newline = '\n' + elif mode == 'CR+LF' or mode == 'CRLF': + net_newline = '\r\n' + else: + parser.error("Invalid value for --net-nl. Valid are 'CR', 'LF' and 'CR+LF'/'CRLF'.") + + # check newline modes for serial connection + mode = options.ser_newline.upper() + if mode == 'CR': + ser_newline = '\r' + elif mode == 'LF': + ser_newline = '\n' + elif mode == 'CR+LF' or mode == 'CRLF': + ser_newline = '\r\n' + else: + parser.error("Invalid value for --ser-nl. Valid are 'CR', 'LF' and 'CR+LF'/'CRLF'.") + + # connect to serial port + ser = serial.Serial() + ser.port = port + ser.baudrate = baudrate + ser.parity = options.parity + ser.rtscts = options.rtscts + ser.xonxoff = options.xonxoff + ser.timeout = 1 # required so that the reader thread can exit + + if not options.quiet: + sys.stderr.write("--- TCP/IP to Serial redirector --- type Ctrl-C / BREAK to quit\n") + sys.stderr.write("--- %s %s,%s,%s,%s ---\n" % (ser.portstr, ser.baudrate, 8, ser.parity, 1)) + + try: + ser.open() + except serial.SerialException, e: + sys.stderr.write("Could not open serial port %s: %s\n" % (ser.portstr, e)) + sys.exit(1) + + if options.rts_state is not None: + ser.setRTS(options.rts_state) + + if options.dtr_state is not None: + ser.setDTR(options.dtr_state) + + srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + srv.bind( ('', options.local_port) ) + srv.listen(1) + while True: + try: + sys.stderr.write("Waiting for connection on %s...\n" % options.local_port) + connection, addr = srv.accept() + sys.stderr.write('Connected by %s\n' % (addr,)) + # enter network <-> serial loop + r = Redirector( + ser, + connection, + options.convert and ser_newline or None, + options.convert and net_newline or None, + options.spy, + ) + r.shortcut() + if options.spy: sys.stdout.write('\n') + sys.stderr.write('Disconnected\n') + connection.close() + except KeyboardInterrupt: + break + except socket.error, msg: + sys.stderr.write('ERROR: %s\n' % msg) + + sys.stderr.write('\n--- exit ---\n') + diff --git a/examples/wxSerialConfigDialog.py b/examples/wxSerialConfigDialog.py new file mode 100644 index 0000000..7085035 --- /dev/null +++ b/examples/wxSerialConfigDialog.py @@ -0,0 +1,260 @@ +#!/usr/bin/env python +# generated by wxGlade 0.3.1 on Thu Oct 02 23:25:44 2003 + +#from wxPython.wx import * +import wx +import serial + +SHOW_BAUDRATE = 1<<0 +SHOW_FORMAT = 1<<1 +SHOW_FLOW = 1<<2 +SHOW_TIMEOUT = 1<<3 +SHOW_ALL = SHOW_BAUDRATE|SHOW_FORMAT|SHOW_FLOW|SHOW_TIMEOUT + +try: + enumerate +except NameError: + def enumerate(sequence): + return zip(range(len(sequence)), sequence) + +class SerialConfigDialog(wx.Dialog): + """Serial Port confiuration dialog, to be used with pyserial 2.0+ + When instantiating a class of this dialog, then the "serial" keyword + argument is mandatory. It is a reference to a serial.Serial instance. + the optional "show" keyword argument can be used to show/hide different + settings. The default is SHOW_ALL which coresponds to + SHOW_BAUDRATE|SHOW_FORMAT|SHOW_FLOW|SHOW_TIMEOUT. All constants can be + found in ths module (not the class).""" + + def __init__(self, *args, **kwds): + #grab the serial keyword and remove it from the dict + self.serial = kwds['serial'] + del kwds['serial'] + self.show = SHOW_ALL + if kwds.has_key('show'): + self.show = kwds['show'] + del kwds['show'] + # begin wxGlade: SerialConfigDialog.__init__ + # end wxGlade + kwds["style"] = wx.DEFAULT_DIALOG_STYLE + wx.Dialog.__init__(self, *args, **kwds) + self.label_2 = wx.StaticText(self, -1, "Port") + self.combo_box_port = wx.ComboBox(self, -1, choices=["dummy1", "dummy2", "dummy3", "dummy4", "dummy5"], style=wx.CB_DROPDOWN) + if self.show & SHOW_BAUDRATE: + self.label_1 = wx.StaticText(self, -1, "Baudrate") + self.choice_baudrate = wx.Choice(self, -1, choices=["choice 1"]) + if self.show & SHOW_FORMAT: + self.label_3 = wx.StaticText(self, -1, "Data Bits") + self.choice_databits = wx.Choice(self, -1, choices=["choice 1"]) + self.label_4 = wx.StaticText(self, -1, "Stop Bits") + self.choice_stopbits = wx.Choice(self, -1, choices=["choice 1"]) + self.label_5 = wx.StaticText(self, -1, "Parity") + self.choice_parity = wx.Choice(self, -1, choices=["choice 1"]) + if self.show & SHOW_TIMEOUT: + self.checkbox_timeout = wx.CheckBox(self, -1, "Use Timeout") + self.text_ctrl_timeout = wx.TextCtrl(self, -1, "") + self.label_6 = wx.StaticText(self, -1, "seconds") + if self.show & SHOW_FLOW: + self.checkbox_rtscts = wx.CheckBox(self, -1, "RTS/CTS") + self.checkbox_xonxoff = wx.CheckBox(self, -1, "Xon/Xoff") + self.button_ok = wx.Button(self, -1, "OK") + self.button_cancel = wx.Button(self, -1, "Cancel") + + self.__set_properties() + self.__do_layout() + #fill in ports and select current setting + index = 0 + self.combo_box_port.Clear() + for n in range(4): + portname = serial.device(n) + self.combo_box_port.Append(portname) + if self.serial.portstr == portname: + index = n + if self.serial.portstr is not None: + self.combo_box_port.SetValue(str(self.serial.portstr)) + else: + self.combo_box_port.SetSelection(index) + if self.show & SHOW_BAUDRATE: + #fill in badrates and select current setting + self.choice_baudrate.Clear() + for n, baudrate in enumerate(self.serial.BAUDRATES): + self.choice_baudrate.Append(str(baudrate)) + if self.serial.baudrate == baudrate: + index = n + self.choice_baudrate.SetSelection(index) + if self.show & SHOW_FORMAT: + #fill in databits and select current setting + self.choice_databits.Clear() + for n, bytesize in enumerate(self.serial.BYTESIZES): + self.choice_databits.Append(str(bytesize)) + if self.serial.bytesize == bytesize: + index = n + self.choice_databits.SetSelection(index) + #fill in stopbits and select current setting + self.choice_stopbits.Clear() + for n, stopbits in enumerate(self.serial.STOPBITS): + self.choice_stopbits.Append(str(stopbits)) + if self.serial.stopbits == stopbits: + index = n + self.choice_stopbits.SetSelection(index) + #fill in parities and select current setting + self.choice_parity.Clear() + for n, parity in enumerate(self.serial.PARITIES): + self.choice_parity.Append(str(serial.PARITY_NAMES[parity])) + if self.serial.parity == parity: + index = n + self.choice_parity.SetSelection(index) + if self.show & SHOW_TIMEOUT: + #set the timeout mode and value + if self.serial.timeout is None: + self.checkbox_timeout.SetValue(False) + self.text_ctrl_timeout.Enable(False) + else: + self.checkbox_timeout.SetValue(True) + self.text_ctrl_timeout.Enable(True) + self.text_ctrl_timeout.SetValue(str(self.serial.timeout)) + if self.show & SHOW_FLOW: + #set the rtscts mode + self.checkbox_rtscts.SetValue(self.serial.rtscts) + #set the rtscts mode + self.checkbox_xonxoff.SetValue(self.serial.xonxoff) + #attach the event handlers + self.__attach_events() + + def __set_properties(self): + # begin wxGlade: SerialConfigDialog.__set_properties + # end wxGlade + self.SetTitle("Serial Port Configuration") + if self.show & SHOW_TIMEOUT: + self.text_ctrl_timeout.Enable(0) + self.button_ok.SetDefault() + + def __do_layout(self): + # begin wxGlade: SerialConfigDialog.__do_layout + # end wxGlade + sizer_2 = wx.BoxSizer(wx.VERTICAL) + sizer_3 = wx.BoxSizer(wx.HORIZONTAL) + sizer_basics = wx.StaticBoxSizer(wx.StaticBox(self, -1, "Basics"), wx.VERTICAL) + sizer_5 = wx.BoxSizer(wx.HORIZONTAL) + sizer_5.Add(self.label_2, 1, wx.ALL|wx.ALIGN_CENTER_VERTICAL, 4) + sizer_5.Add(self.combo_box_port, 1, 0, 0) + sizer_basics.Add(sizer_5, 0, wx.RIGHT|wx.EXPAND, 0) + if self.show & SHOW_BAUDRATE: + sizer_baudrate = wx.BoxSizer(wx.HORIZONTAL) + sizer_baudrate.Add(self.label_1, 1, wx.ALL|wx.ALIGN_CENTER_VERTICAL, 4) + sizer_baudrate.Add(self.choice_baudrate, 1, wx.ALIGN_RIGHT, 0) + sizer_basics.Add(sizer_baudrate, 0, wx.EXPAND, 0) + sizer_2.Add(sizer_basics, 0, wx.EXPAND, 0) + if self.show & SHOW_FORMAT: + sizer_8 = wx.BoxSizer(wx.HORIZONTAL) + sizer_7 = wx.BoxSizer(wx.HORIZONTAL) + sizer_6 = wx.BoxSizer(wx.HORIZONTAL) + sizer_format = wx.StaticBoxSizer(wx.StaticBox(self, -1, "Data Format"), wx.VERTICAL) + sizer_6.Add(self.label_3, 1, wx.ALL|wx.ALIGN_CENTER_VERTICAL, 4) + sizer_6.Add(self.choice_databits, 1, wx.ALIGN_RIGHT, 0) + sizer_format.Add(sizer_6, 0, wx.EXPAND, 0) + sizer_7.Add(self.label_4, 1, wx.ALL|wx.ALIGN_CENTER_VERTICAL, 4) + sizer_7.Add(self.choice_stopbits, 1, wx.ALIGN_RIGHT, 0) + sizer_format.Add(sizer_7, 0, wx.EXPAND, 0) + sizer_8.Add(self.label_5, 1, wx.ALL|wx.ALIGN_CENTER_VERTICAL, 4) + sizer_8.Add(self.choice_parity, 1, wx.ALIGN_RIGHT, 0) + sizer_format.Add(sizer_8, 0, wx.EXPAND, 0) + sizer_2.Add(sizer_format, 0, wx.EXPAND, 0) + if self.show & SHOW_TIMEOUT: + sizer_timeout = wx.StaticBoxSizer(wx.StaticBox(self, -1, "Timeout"), wx.HORIZONTAL) + sizer_timeout.Add(self.checkbox_timeout, 0, wx.ALL|wx.ALIGN_CENTER_VERTICAL, 4) + sizer_timeout.Add(self.text_ctrl_timeout, 0, 0, 0) + sizer_timeout.Add(self.label_6, 0, wx.ALL|wx.ALIGN_CENTER_VERTICAL, 4) + sizer_2.Add(sizer_timeout, 0, 0, 0) + if self.show & SHOW_FLOW: + sizer_flow = wx.StaticBoxSizer(wx.StaticBox(self, -1, "Flow Control"), wx.HORIZONTAL) + sizer_flow.Add(self.checkbox_rtscts, 0, wx.ALL|wx.ALIGN_CENTER_VERTICAL, 4) + sizer_flow.Add(self.checkbox_xonxoff, 0, wx.ALL|wx.ALIGN_CENTER_VERTICAL, 4) + sizer_flow.Add((10,10), 1, wx.EXPAND, 0) + sizer_2.Add(sizer_flow, 0, wx.EXPAND, 0) + sizer_3.Add(self.button_ok, 0, 0, 0) + sizer_3.Add(self.button_cancel, 0, 0, 0) + sizer_2.Add(sizer_3, 0, wx.ALL|wx.ALIGN_RIGHT, 4) + self.SetAutoLayout(1) + self.SetSizer(sizer_2) + sizer_2.Fit(self) + sizer_2.SetSizeHints(self) + self.Layout() + + def __attach_events(self): + wx.EVT_BUTTON(self, self.button_ok.GetId(), self.OnOK) + wx.EVT_BUTTON(self, self.button_cancel.GetId(), self.OnCancel) + if self.show & SHOW_TIMEOUT: + wx.EVT_CHECKBOX(self, self.checkbox_timeout.GetId(), self.OnTimeout) + + def OnOK(self, events): + success = True + self.serial.port = str(self.combo_box_port.GetValue()) + if self.show & SHOW_BAUDRATE: + self.serial.baudrate = self.serial.BAUDRATES[self.choice_baudrate.GetSelection()] + if self.show & SHOW_FORMAT: + self.serial.bytesize = self.serial.BYTESIZES[self.choice_databits.GetSelection()] + self.serial.stopbits = self.serial.STOPBITS[self.choice_stopbits.GetSelection()] + self.serial.parity = self.serial.PARITIES[self.choice_parity.GetSelection()] + if self.show & SHOW_FLOW: + self.serial.rtscts = self.checkbox_rtscts.GetValue() + self.serial.xonxoff = self.checkbox_xonxoff.GetValue() + if self.show & SHOW_TIMEOUT: + if self.checkbox_timeout.GetValue(): + try: + self.serial.timeout = float(self.text_ctrl_timeout.GetValue()) + except ValueError: + dlg = wx.MessageDialog(self, 'Timeout must be a numeric value', + 'Value Error', wx.OK | wx.ICON_ERROR) + dlg.ShowModal() + dlg.Destroy() + success = False + else: + self.serial.timeout = None + if success: + self.EndModal(wx.ID_OK) + + def OnCancel(self, events): + self.EndModal(wx.ID_CANCEL) + + def OnTimeout(self, events): + if self.checkbox_timeout.GetValue(): + self.text_ctrl_timeout.Enable(True) + else: + self.text_ctrl_timeout.Enable(False) + +# end of class SerialConfigDialog + + +class MyApp(wx.App): + """Test code""" + def OnInit(self): + wx.InitAllImageHandlers() + + ser = serial.Serial() + print ser + #loop until cancel is pressed, old values are used as start for the next run + #show the different views, one after the other + #value are kept. + for flags in (SHOW_BAUDRATE, SHOW_FLOW, SHOW_FORMAT, SHOW_TIMEOUT, SHOW_ALL): + dialog_serial_cfg = SerialConfigDialog(None, -1, "", serial=ser, show=flags) + self.SetTopWindow(dialog_serial_cfg) + result = dialog_serial_cfg.ShowModal() + print ser + if result != wx.ID_OK: + break + #the user can play around with the values, CANCEL aborts the loop + while 1: + dialog_serial_cfg = SerialConfigDialog(None, -1, "", serial=ser) + self.SetTopWindow(dialog_serial_cfg) + result = dialog_serial_cfg.ShowModal() + print ser + if result != wx.ID_OK: + break + return 0 + +# end of class MyApp + +if __name__ == "__main__": + app = MyApp(0) + app.MainLoop() diff --git a/examples/wxSerialConfigDialog.wxg b/examples/wxSerialConfigDialog.wxg new file mode 100644 index 0000000..f5e92e0 --- /dev/null +++ b/examples/wxSerialConfigDialog.wxg @@ -0,0 +1,262 @@ + + + + + + + Serial Port Configuration + + wxVERTICAL + + wxEXPAND + 0 + + + wxVERTICAL + + + wxRIGHT|wxEXPAND + 0 + + + wxHORIZONTAL + + wxALL|wxALIGN_CENTER_VERTICAL + 4 + + + 1 + + + + + 0 + + + 0 + + dummy1 + dummy2 + dummy3 + dummy4 + dummy5 + + + + + + + wxEXPAND + 0 + + + wxHORIZONTAL + + wxALL|wxALIGN_CENTER_VERTICAL + 4 + + + 1 + + + + + wxALIGN_RIGHT + 0 + + + 0 + + choice 1 + + + + + + + + + wxEXPAND + 0 + + + wxVERTICAL + + + wxEXPAND + 0 + + + wxHORIZONTAL + + wxALL|wxALIGN_CENTER_VERTICAL + 4 + + + 1 + + + + + wxALIGN_RIGHT + 0 + + + 0 + + choice 1 + + + + + + + wxEXPAND + 0 + + + wxHORIZONTAL + + wxALL|wxALIGN_CENTER_VERTICAL + 4 + + + 1 + + + + + wxALIGN_RIGHT + 0 + + + 0 + + choice 1 + + + + + + + wxEXPAND + 0 + + + wxHORIZONTAL + + wxALL|wxALIGN_CENTER_VERTICAL + 4 + + + 1 + + + + + wxALIGN_RIGHT + 0 + + + 0 + + choice 1 + + + + + + + + + 0 + + + wxHORIZONTAL + + + wxALL|wxALIGN_CENTER_VERTICAL + 4 + + + + + + + 0 + + + 1 + + + + wxALL|wxALIGN_CENTER_VERTICAL + 4 + + + 1 + + + + + + + wxEXPAND + 0 + + + wxHORIZONTAL + + + wxALL|wxALIGN_CENTER_VERTICAL + 4 + + + + + + + wxALL|wxALIGN_CENTER_VERTICAL + 4 + + + + + + + wxEXPAND + 0 + + + 10 + 10 + + + + + + wxALL|wxALIGN_RIGHT + 4 + + + wxHORIZONTAL + + 0 + + + 1 + + + + + 0 + + + + + + + + + + diff --git a/examples/wxTerminal.py b/examples/wxTerminal.py new file mode 100644 index 0000000..646c272 --- /dev/null +++ b/examples/wxTerminal.py @@ -0,0 +1,333 @@ +#!/usr/bin/env python +# generated by wxGlade 0.3.1 on Fri Oct 03 23:23:45 2003 + +#from wxPython.wx import * +import wx +import wxSerialConfigDialog +import serial +import threading + +#---------------------------------------------------------------------- +# Create an own event type, so that GUI updates can be delegated +# this is required as on some platforms only the main thread can +# access the GUI without crashing. wxMutexGuiEnter/wxMutexGuiLeave +# could be used too, but an event is more elegant. + +SERIALRX = wx.NewEventType() +# bind to serial data receive events +EVT_SERIALRX = wx.PyEventBinder(SERIALRX, 0) + +class SerialRxEvent(wx.PyCommandEvent): + eventType = SERIALRX + def __init__(self, windowID, data): + wx.PyCommandEvent.__init__(self, self.eventType, windowID) + self.data = data + + def Clone(self): + self.__class__(self.GetId(), self.data) + +#---------------------------------------------------------------------- + +ID_CLEAR = wx.NewId() +ID_SAVEAS = wx.NewId() +ID_SETTINGS = wx.NewId() +ID_TERM = wx.NewId() +ID_EXIT = wx.NewId() + +NEWLINE_CR = 0 +NEWLINE_LF = 1 +NEWLINE_CRLF = 2 + +class TerminalSetup: + """Placeholder for various terminal settings. Used to pass the + options to the TerminalSettingsDialog.""" + def __init__(self): + self.echo = False + self.unprintable = False + self.newline = NEWLINE_CRLF + +class TerminalSettingsDialog(wx.Dialog): + """Simple dialog with common terminal settings like echo, newline mode.""" + + def __init__(self, *args, **kwds): + self.settings = kwds['settings'] + del kwds['settings'] + # begin wxGlade: TerminalSettingsDialog.__init__ + kwds["style"] = wx.DEFAULT_DIALOG_STYLE + wx.Dialog.__init__(self, *args, **kwds) + self.checkbox_echo = wx.CheckBox(self, -1, "Local Echo") + self.checkbox_unprintable = wx.CheckBox(self, -1, "Show unprintable characters") + self.radio_box_newline = wx.RadioBox(self, -1, "Newline Handling", choices=["CR only", "LF only", "CR+LF"], majorDimension=0, style=wx.RA_SPECIFY_ROWS) + self.button_ok = wx.Button(self, -1, "OK") + self.button_cancel = wx.Button(self, -1, "Cancel") + + self.__set_properties() + self.__do_layout() + # end wxGlade + self.__attach_events() + self.checkbox_echo.SetValue(self.settings.echo) + self.checkbox_unprintable.SetValue(self.settings.unprintable) + self.radio_box_newline.SetSelection(self.settings.newline) + + def __set_properties(self): + # begin wxGlade: TerminalSettingsDialog.__set_properties + self.SetTitle("Terminal Settings") + self.radio_box_newline.SetSelection(0) + self.button_ok.SetDefault() + # end wxGlade + + def __do_layout(self): + # begin wxGlade: TerminalSettingsDialog.__do_layout + sizer_2 = wx.BoxSizer(wx.VERTICAL) + sizer_3 = wx.BoxSizer(wx.HORIZONTAL) + sizer_4 = wx.StaticBoxSizer(wx.StaticBox(self, -1, "Input/Output"), wx.VERTICAL) + sizer_4.Add(self.checkbox_echo, 0, wx.ALL, 4) + sizer_4.Add(self.checkbox_unprintable, 0, wx.ALL, 4) + sizer_4.Add(self.radio_box_newline, 0, 0, 0) + sizer_2.Add(sizer_4, 0, wx.EXPAND, 0) + sizer_3.Add(self.button_ok, 0, 0, 0) + sizer_3.Add(self.button_cancel, 0, 0, 0) + sizer_2.Add(sizer_3, 0, wx.ALL|wx.ALIGN_RIGHT, 4) + self.SetAutoLayout(1) + self.SetSizer(sizer_2) + sizer_2.Fit(self) + sizer_2.SetSizeHints(self) + self.Layout() + # end wxGlade + + def __attach_events(self): + self.Bind(wx.EVT_BUTTON, self.OnOK, id = self.button_ok.GetId()) + self.Bind(wx.EVT_BUTTON, self.OnCancel, id = self.button_cancel.GetId()) + + def OnOK(self, events): + """Update data wil new values and close dialog.""" + self.settings.echo = self.checkbox_echo.GetValue() + self.settings.unprintable = self.checkbox_unprintable.GetValue() + self.settings.newline = self.radio_box_newline.GetSelection() + self.EndModal(wx.ID_OK) + + def OnCancel(self, events): + """Do not update data but close dialog.""" + self.EndModal(wx.ID_CANCEL) + +# end of class TerminalSettingsDialog + + +class TerminalFrame(wx.Frame): + """Simple terminal program for wxPython""" + + def __init__(self, *args, **kwds): + self.serial = serial.Serial() + self.serial.timeout = 0.5 #make sure that the alive event can be checked from time to time + self.settings = TerminalSetup() #placeholder for the settings + self.thread = None + self.alive = threading.Event() + # begin wxGlade: TerminalFrame.__init__ + kwds["style"] = wx.DEFAULT_FRAME_STYLE + wx.Frame.__init__(self, *args, **kwds) + self.text_ctrl_output = wx.TextCtrl(self, -1, "", style=wx.TE_MULTILINE|wx.TE_READONLY) + + # Menu Bar + self.frame_terminal_menubar = wx.MenuBar() + self.SetMenuBar(self.frame_terminal_menubar) + wxglade_tmp_menu = wx.Menu() + wxglade_tmp_menu.Append(ID_CLEAR, "&Clear", "", wx.ITEM_NORMAL) + wxglade_tmp_menu.Append(ID_SAVEAS, "&Save Text As...", "", wx.ITEM_NORMAL) + wxglade_tmp_menu.AppendSeparator() + wxglade_tmp_menu.Append(ID_SETTINGS, "&Port Settings...", "", wx.ITEM_NORMAL) + wxglade_tmp_menu.Append(ID_TERM, "&Terminal Settings...", "", wx.ITEM_NORMAL) + wxglade_tmp_menu.AppendSeparator() + wxglade_tmp_menu.Append(ID_EXIT, "&Exit", "", wx.ITEM_NORMAL) + self.frame_terminal_menubar.Append(wxglade_tmp_menu, "&File") + # Menu Bar end + + self.__set_properties() + self.__do_layout() + # end wxGlade + self.__attach_events() #register events + self.OnPortSettings(None) #call setup dialog on startup, opens port + if not self.alive.isSet(): + self.Close() + + def StartThread(self): + """Start the receiver thread""" + self.thread = threading.Thread(target=self.ComPortThread) + self.thread.setDaemon(1) + self.alive.set() + self.thread.start() + + def StopThread(self): + """Stop the receiver thread, wait util it's finished.""" + if self.thread is not None: + self.alive.clear() #clear alive event for thread + self.thread.join() #wait until thread has finished + self.thread = None + + def __set_properties(self): + # begin wxGlade: TerminalFrame.__set_properties + self.SetTitle("Serial Terminal") + self.SetSize((546, 383)) + # end wxGlade + + def __do_layout(self): + # begin wxGlade: TerminalFrame.__do_layout + sizer_1 = wx.BoxSizer(wx.VERTICAL) + sizer_1.Add(self.text_ctrl_output, 1, wx.EXPAND, 0) + self.SetAutoLayout(1) + self.SetSizer(sizer_1) + self.Layout() + # end wxGlade + + def __attach_events(self): + #register events at the controls + self.Bind(wx.EVT_MENU, self.OnClear, id = ID_CLEAR) + self.Bind(wx.EVT_MENU, self.OnSaveAs, id = ID_SAVEAS) + self.Bind(wx.EVT_MENU, self.OnExit, id = ID_EXIT) + self.Bind(wx.EVT_MENU, self.OnPortSettings, id = ID_SETTINGS) + self.Bind(wx.EVT_MENU, self.OnTermSettings, id = ID_TERM) + self.text_ctrl_output.Bind(wx.EVT_CHAR, self.OnKey) + self.Bind(EVT_SERIALRX, self.OnSerialRead) + self.Bind(wx.EVT_CLOSE, self.OnClose) + + def OnExit(self, event): + """Menu point Exit""" + self.Close() + + def OnClose(self, event): + """Called on application shutdown.""" + self.StopThread() #stop reader thread + self.serial.close() #cleanup + self.Destroy() #close windows, exit app + + def OnSaveAs(self, event): + """Save contents of output window.""" + filename = None + dlg = wx.FileDialog(None, "Save Text As...", ".", "", "Text File|*.txt|All Files|*", wx.SAVE) + if dlg.ShowModal() == wx.ID_OK: + filename = dlg.GetPath() + dlg.Destroy() + + if filename is not None: + f = file(filename, 'w') + text = self.text_ctrl_output.GetValue() + if type(text) == unicode: + text = text.encode("latin1") #hm, is that a good asumption? + f.write(text) + f.close() + + def OnClear(self, event): + """Clear contents of output window.""" + self.text_ctrl_output.Clear() + + def OnPortSettings(self, event=None): + """Show the portsettings dialog. The reader thread is stopped for the + settings change.""" + if event is not None: #will be none when called on startup + self.StopThread() + self.serial.close() + ok = False + while not ok: + dialog_serial_cfg = wxSerialConfigDialog.SerialConfigDialog(None, -1, "", + show=wxSerialConfigDialog.SHOW_BAUDRATE|wxSerialConfigDialog.SHOW_FORMAT|wxSerialConfigDialog.SHOW_FLOW, + serial=self.serial + ) + result = dialog_serial_cfg.ShowModal() + dialog_serial_cfg.Destroy() + #open port if not called on startup, open it on startup and OK too + if result == wx.ID_OK or event is not None: + try: + self.serial.open() + except serial.SerialException, e: + dlg = wx.MessageDialog(None, str(e), "Serial Port Error", wx.OK | wx.ICON_ERROR) + dlg.ShowModal() + dlg.Destroy() + else: + self.StartThread() + self.SetTitle("Serial Terminal on %s [%s, %s%s%s%s%s]" % ( + self.serial.portstr, + self.serial.baudrate, + self.serial.bytesize, + self.serial.parity, + self.serial.stopbits, + self.serial.rtscts and ' RTS/CTS' or '', + self.serial.xonxoff and ' Xon/Xoff' or '', + ) + ) + ok = True + else: + #on startup, dialog aborted + self.alive.clear() + ok = True + + def OnTermSettings(self, event): + """Menu point Terminal Settings. Show the settings dialog + with the current terminal settings""" + dialog = TerminalSettingsDialog(None, -1, "", settings=self.settings) + result = dialog.ShowModal() + dialog.Destroy() + + def OnKey(self, event): + """Key event handler. if the key is in the ASCII range, write it to the serial port. + Newline handling and local echo is also done here.""" + code = event.GetKeyCode() + if code < 256: #is it printable? + if code == 13: #is it a newline? (check for CR which is the RETURN key) + if self.settings.echo: #do echo if needed + self.text_ctrl_output.AppendText('\n') + if self.settings.newline == NEWLINE_CR: + self.serial.write('\r') #send CR + elif self.settings.newline == NEWLINE_LF: + self.serial.write('\n') #send LF + elif self.settings.newline == NEWLINE_CRLF: + self.serial.write('\r\n') #send CR+LF + else: + char = chr(code) + if self.settings.echo: #do echo if needed + self.text_ctrl_output.WriteText(char) + self.serial.write(char) #send the charcater + else: + print "Extra Key:", code + + def OnSerialRead(self, event): + """Handle input from the serial port.""" + text = event.data + if self.settings.unprintable: + text = ''.join([(c >= ' ') and c or '<%d>' % ord(c) for c in text]) + self.text_ctrl_output.AppendText(text) + + def ComPortThread(self): + """Thread that handles the incomming traffic. Does the basic input + transformation (newlines) and generates an SerialRxEvent""" + while self.alive.isSet(): #loop while alive event is true + text = self.serial.read(1) #read one, with timout + if text: #check if not timeout + n = self.serial.inWaiting() #look if there is more to read + if n: + text = text + self.serial.read(n) #get it + #newline transformation + if self.settings.newline == NEWLINE_CR: + text = text.replace('\r', '\n') + elif self.settings.newline == NEWLINE_LF: + pass + elif self.settings.newline == NEWLINE_CRLF: + text = text.replace('\r\n', '\n') + event = SerialRxEvent(self.GetId(), text) + self.GetEventHandler().AddPendingEvent(event) + #~ self.OnSerialRead(text) #output text in window + +# end of class TerminalFrame + + +class MyApp(wx.App): + def OnInit(self): + wx.InitAllImageHandlers() + frame_terminal = TerminalFrame(None, -1, "") + self.SetTopWindow(frame_terminal) + frame_terminal.Show(1) + return 1 + +# end of class MyApp + +if __name__ == "__main__": + app = MyApp(0) + app.MainLoop() diff --git a/examples/wxTerminal.wxg b/examples/wxTerminal.wxg new file mode 100644 index 0000000..183f876 --- /dev/null +++ b/examples/wxTerminal.wxg @@ -0,0 +1,127 @@ + + + + + + + Serial Terminal + 1 + 546, 383 + + wxVERTICAL + + wxEXPAND + 0 + + + + + + + + + + + + ID_CLEAR + + + + ID_SAVEAS + + + + --- + --- + + + + ID_SETTINGS + + + + ID_TERM + + + + --- + + + + ID_EXIT + + + + + + + + Terminal Settings + + wxVERTICAL + + wxEXPAND + 0 + + + wxVERTICAL + + + wxALL + 4 + + + + + + + wxALL + 4 + + + + + + + 0 + + + + 0 + 0 + + + CR only + LF only + CR+LF + + + + + + + wxALL|wxALIGN_RIGHT + 4 + + + wxHORIZONTAL + + 0 + + + 1 + + + + + 0 + + + + + + + + + + -- cgit v1.2.1