diff options
Diffstat (limited to 'pyserial/examples')
-rw-r--r-- | pyserial/examples/enhancedserial.py | 62 | ||||
-rw-r--r-- | pyserial/examples/miniterm.py | 562 | ||||
-rw-r--r-- | pyserial/examples/port_publisher.py | 451 | ||||
-rw-r--r-- | pyserial/examples/port_publisher.sh | 44 | ||||
-rw-r--r-- | pyserial/examples/scan.py | 30 | ||||
-rw-r--r-- | pyserial/examples/scanlinux.py | 20 | ||||
-rw-r--r-- | pyserial/examples/scanwin32.py | 200 | ||||
-rw-r--r-- | pyserial/examples/setup-miniterm-py2exe.py | 26 | ||||
-rw-r--r-- | pyserial/examples/setup_demo.py | 35 | ||||
-rw-r--r-- | pyserial/examples/tcp_serial_redirect.py | 305 | ||||
-rw-r--r-- | pyserial/examples/test.py | 216 | ||||
-rw-r--r-- | pyserial/examples/test_advanced.py | 173 | ||||
-rw-r--r-- | pyserial/examples/test_high_load.py | 76 | ||||
-rw-r--r-- | pyserial/examples/test_iolib.py | 78 | ||||
-rw-r--r-- | pyserial/examples/wxSerialConfigDialog.py | 260 | ||||
-rw-r--r-- | pyserial/examples/wxSerialConfigDialog.wxg | 262 | ||||
-rw-r--r-- | pyserial/examples/wxTerminal.py | 333 | ||||
-rw-r--r-- | pyserial/examples/wxTerminal.wxg | 127 |
18 files changed, 0 insertions, 3260 deletions
diff --git a/pyserial/examples/enhancedserial.py b/pyserial/examples/enhancedserial.py deleted file mode 100644 index 2c81ae1..0000000 --- a/pyserial/examples/enhancedserial.py +++ /dev/null @@ -1,62 +0,0 @@ -#!/usr/bin/env python -"""Enhanced Serial Port class -part of pyserial (http://pyserial.sf.net) (C)2002 cliechti@gmx.net - -another implementation of the readline and readlines method. -this one should be more efficient because a bunch of characters are read -on each access, but the drawback is that a timeout must be specified to -make it work (enforced by the class __init__). - -this class could be enhanced with a read_until() method and more -like found in the telnetlib. -""" - -from serial import Serial - -class EnhancedSerial(Serial): - def __init__(self, *args, **kwargs): - #ensure that a reasonable timeout is set - timeout = kwargs.get('timeout',0.1) - if timeout < 0.01: timeout = 0.1 - kwargs['timeout'] = timeout - Serial.__init__(self, *args, **kwargs) - self.buf = '' - - def readline(self, maxsize=None, timeout=1): - """maxsize is ignored, timeout in seconds is the max time that is way for a complete line""" - tries = 0 - while 1: - self.buf += self.read(512) - pos = self.buf.find('\n') - if pos >= 0: - line, self.buf = self.buf[:pos+1], self.buf[pos+1:] - return line - tries += 1 - if tries * self.timeout > timeout: - break - line, self.buf = self.buf, '' - return line - - def readlines(self, sizehint=None, timeout=1): - """read all lines that are available. abort after timout - when no more data arrives.""" - lines = [] - while 1: - line = self.readline(timeout=timeout) - if line: - lines.append(line) - if not line or line[-1:] != '\n': - break - return lines - -if __name__=='__main__': - #do some simple tests with a Loopback HW (see test.py for details) - PORT = 0 - #test, only with Loopback HW (shortcut RX/TX pins (3+4 on DSUB 9 and 25) ) - s = EnhancedSerial(PORT) - #write out some test data lines - s.write('\n'.join("hello how are you".split())) - #and read them back - print s.readlines() - #this one should print an empty list - print s.readlines(timeout=0.4) diff --git a/pyserial/examples/miniterm.py b/pyserial/examples/miniterm.py deleted file mode 100644 index 48d07a7..0000000 --- a/pyserial/examples/miniterm.py +++ /dev/null @@ -1,562 +0,0 @@ -#!/usr/bin/env python - -# Very simple serial terminal -# (C)2002-2009 Chris Liechti <cliechti@gmx.net> - -# Input characters are sent directly (only LF -> CR/LF/CRLF translation is -# done), received characters are displayed as is (or escaped trough pythons -# repr, useful for debug purposes) - - -import sys, os, serial, threading - -EXITCHARCTER = '\x1d' # GS/CTRL+] -MENUCHARACTER = '\x14' # Menu: CTRL+T - - -def key_description(character): - """generate a readable description for a key""" - ascii_code = ord(character) - if ascii_code < 32: - return 'Ctrl+%c' % (ord('@') + ascii_code) - else: - return repr(character) - -# help text, starts with blank line! it's a function so that the current values -# for the shortcut keys is used and not the value at program start -def get_help_text(): - return """ ---- pySerial - miniterm - help ---- ---- %(exit)-8s Exit program ---- %(menu)-8s Menu escape key, followed by: ---- Menu keys: ---- %(itself)-8s Send the menu character itself to remote ---- %(exchar)-8s Send the exit character to remote ---- %(info)-8s Show info ---- %(upload)-8s Upload file (prompt will be shown) ---- Toggles: ---- %(rts)s RTS %(echo)s local echo ---- %(dtr)s DTR %(break)s BREAK ---- %(lfm)s line feed %(repr)s Cycle repr mode ---- ---- Port settings (%(menu)s followed by the following): ---- 7 8 set data bits ---- n e o s m change parity (None, Even, Odd, Space, Mark) ---- 1 2 3 set stop bits (1, 2, 1.5) ---- b change baud rate ---- x X disable/enable software flow control ---- r R disable/enable hardware flow control -""" % { - 'exit': key_description(EXITCHARCTER), - 'menu': key_description(MENUCHARACTER), - 'rts': key_description('\x12'), - 'repr': key_description('\x01'), - 'dtr': key_description('\x04'), - 'lfm': key_description('\x0c'), - 'break': key_description('\x02'), - 'echo': key_description('\x05'), - 'info': key_description('\x09'), - 'upload': key_description('\x15'), - 'itself': key_description(MENUCHARACTER), - 'exchar': key_description(EXITCHARCTER), -} - -# first choose a platform dependant way to read single characters from the console -global console - -if os.name == 'nt': - import msvcrt - class Console: - def __init__(self): - pass - - def setup(self): - pass # Do nothing for 'nt' - - def cleanup(self): - pass # Do nothing for 'nt' - - def getkey(self): - while 1: - z = msvcrt.getch() - if z == '\0' or z == '\xe0': #functions keys - msvcrt.getch() - else: - if z == '\r': - return '\n' - return z - - console = Console() - -elif os.name == 'posix': - import termios, sys, os - class Console: - def __init__(self): - self.fd = sys.stdin.fileno() - - def setup(self): - self.old = termios.tcgetattr(self.fd) - new = termios.tcgetattr(self.fd) - new[3] = new[3] & ~termios.ICANON & ~termios.ECHO & ~termios.ISIG - new[6][termios.VMIN] = 1 - new[6][termios.VTIME] = 0 - termios.tcsetattr(self.fd, termios.TCSANOW, new) - #s = '' # We'll save the characters typed and add them to the pool. - - def getkey(self): - c = os.read(self.fd, 1) - return c - - def cleanup(self): - termios.tcsetattr(self.fd, termios.TCSAFLUSH, self.old) - - console = Console() - - def cleanup_console(): - console.cleanup() - - console.setup() - sys.exitfunc = cleanup_console #terminal modes have to be restored on exit... - -else: - raise "Sorry no implementation for your platform (%s) available." % sys.platform - - -CONVERT_CRLF = 2 -CONVERT_CR = 1 -CONVERT_LF = 0 -NEWLINE_CONVERISON_MAP = ('\n', '\r', '\r\n') -LF_MODES = ('LF', 'CR', 'CR/LF') - -REPR_MODES = ('raw', 'some control', 'all control', 'hex') - -class Miniterm: - def __init__(self, port, baudrate, parity, rtscts, xonxoff, echo=False, convert_outgoing=CONVERT_CRLF, repr_mode=0): - self.serial = serial.Serial(port, baudrate, parity=parity, rtscts=rtscts, xonxoff=xonxoff, timeout=0.7) - self.echo = echo - self.repr_mode = repr_mode - self.convert_outgoing = convert_outgoing - self.newline = NEWLINE_CONVERISON_MAP[self.convert_outgoing] - self.dtr_state = True - self.rts_state = True - self.break_state = False - - def start(self): - self.alive = True - # start serial->console thread - self.receiver_thread = threading.Thread(target=self.reader) - self.receiver_thread.setDaemon(1) - self.receiver_thread.start() - # enter console->serial loop - self.transmitter_thread = threading.Thread(target=self.writer) - self.transmitter_thread.setDaemon(1) - self.transmitter_thread.start() - - def stop(self): - self.alive = False - - def join(self, transmit_only=False): - self.transmitter_thread.join() - if not transmit_only: - self.receiver_thread.join() - - def dump_port_settings(self): - sys.stderr.write("\n--- Settings: %s %s,%s,%s,%s\n" % ( - self.serial.portstr, - self.serial.baudrate, - self.serial.bytesize, - self.serial.parity, - self.serial.stopbits, - )) - sys.stderr.write('--- RTS %s\n' % (self.rts_state and 'active' or 'inactive')) - sys.stderr.write('--- DTR %s\n' % (self.dtr_state and 'active' or 'inactive')) - sys.stderr.write('--- BREAK %s\n' % (self.break_state and 'active' or 'inactive')) - sys.stderr.write('--- software flow control %s\n' % (self.serial.xonxoff and 'active' or 'inactive')) - sys.stderr.write('--- hardware flow control %s\n' % (self.serial.rtscts and 'active' or 'inactive')) - sys.stderr.write('--- data escaping: %s\n' % (REPR_MODES[self.repr_mode],)) - sys.stderr.write('--- linefeed: %s\n' % (LF_MODES[self.convert_outgoing],)) - - def reader(self): - """loop and copy serial->console""" - while self.alive: - data = self.serial.read(1) - - if self.repr_mode == 0: - # direct output, just have to care about newline setting - if data == '\r' and self.convert_outgoing == CONVERT_CR: - sys.stdout.write('\n') - else: - sys.stdout.write(data) - elif self.repr_mode == 1: - # escape non-printable, let pass newlines - if self.convert_outgoing == CONVERT_CRLF and data in '\r\n': - if data == '\n': - sys.stdout.write('\n') - elif data == '\r': - pass - elif data == '\n' and self.convert_outgoing == CONVERT_LF: - sys.stdout.write('\n') - elif data == '\r' and self.convert_outgoing == CONVERT_CR: - sys.stdout.write('\n') - else: - sys.stdout.write(repr(data)[1:-1]) - elif self.repr_mode == 2: - # escape all non-printable, including newline - sys.stdout.write(repr(data)[1:-1]) - elif self.repr_mode == 3: - # escape everything (hexdump) - for character in data: - sys.stdout.write("%s " % character.encode('hex')) - sys.stdout.flush() - - - def writer(self): - """loop and copy console->serial until EXITCHARCTER character is - found. when MENUCHARACTER is found, interpret the next key - locally. - """ - menu_active = False - try: - while self.alive: - try: - c = console.getkey() - except KeyboardInterrupt: - c = '\x03' - if menu_active: - if c == MENUCHARACTER or c == EXITCHARCTER: # Menu character again/exit char -> send itself - self.serial.write(c) # send character - if self.echo: - sys.stdout.write(c) - elif c == '\x15': # CTRL+U -> upload file - sys.stderr.write('\n--- File to upload: ') - sys.stderr.flush() - console.cleanup() - filename = sys.stdin.readline().rstrip('\r\n') - if filename: - try: - file = open(filename, 'r') - sys.stderr.write('--- Sending file %s ---\n' % filename) - while True: - line = file.readline().rstrip('\r\n') - if not line: - break - self.serial.write(line) - self.serial.write('\r\n') - # Wait for output buffer to drain. - self.serial.flush() - sys.stderr.write('.') # Progress indicator. - sys.stderr.write('\n--- File %s sent ---\n' % filename) - except IOError, e: - sys.stderr.write('--- ERROR opening file %s: %s ---\n' % (filename, e)) - console.setup() - elif c in '\x08hH?': # CTRL+H, h, H, ? -> Show help - sys.stderr.write(get_help_text()) - elif c == '\x12': # CTRL+R -> Toggle RTS - self.rts_state = not self.rts_state - self.serial.setRTS(self.rts_state) - sys.stderr.write('--- RTS %s ---\n' % (self.rts_state and 'active' or 'inactive')) - elif c == '\x04': # CTRL+D -> Toggle DTR - self.dtr_state = not self.dtr_state - self.serial.setDTR(self.dtr_state) - sys.stderr.write('--- DTR %s ---\n' % (self.dtr_state and 'active' or 'inactive')) - elif c == '\x02': # CTRL+B -> toggle BREAK condition - self.break_state = not self.break_state - self.serial.setBreak(self.break_state) - sys.stderr.write('--- BREAK %s ---\n' % (self.break_state and 'active' or 'inactive')) - elif c == '\x05': # CTRL+E -> toggle local echo - self.echo = not self.echo - sys.stderr.write('--- local echo %s ---\n' % (self.echo and 'active' or 'inactive')) - elif c == '\x09': # CTRL+I -> info - self.dump_port_settings() - elif c == '\x01': # CTRL+A -> cycle escape mode - self.repr_mode += 1 - if self.repr_mode > 3: - self.repr_mode = 0 - sys.stderr.write('--- escape data: %s ---\n' % ( - REPR_MODES[self.repr_mode], - )) - elif c == '\x0c': # CTRL+L -> cycle linefeed mode - self.convert_outgoing += 1 - if self.convert_outgoing > 2: - self.convert_outgoing = 0 - self.newline = NEWLINE_CONVERISON_MAP[self.convert_outgoing] - sys.stderr.write('--- line feed %s ---\n' % ( - LF_MODES[self.convert_outgoing], - )) - #~ elif c in 'pP': # P -> change port XXX reader thread would exit - elif c in 'bB': # B -> change baudrate - sys.stderr.write('\n--- Baudrate: ') - sys.stderr.flush() - console.cleanup() - backup = self.serial.baudrate - try: - self.serial.baudrate = int(sys.stdin.readline().strip()) - except ValueError, e: - sys.stderr.write('--- ERROR setting baudrate: %s ---\n' % (e,)) - self.serial.baudrate = backup - else: - self.dump_port_settings() - console.setup() - elif c == '8': # 8 -> change to 8 bits - self.serial.bytesize = serial.EIGHTBITS - self.dump_port_settings() - elif c == '7': # 7 -> change to 8 bits - self.serial.bytesize = serial.SEVENBITS - self.dump_port_settings() - elif c in 'eE': # E -> change to even parity - self.serial.parity = serial.PARITY_EVEN - self.dump_port_settings() - elif c in 'oO': # O -> change to odd parity - self.serial.parity = serial.PARITY_ODD - self.dump_port_settings() - elif c in 'mM': # M -> change to mark parity - self.serial.parity = serial.PARITY_MARK - self.dump_port_settings() - elif c in 'sS': # S -> change to space parity - self.serial.parity = serial.PARITY_SPACE - self.dump_port_settings() - elif c in 'nN': # N -> change to no parity - self.serial.parity = serial.PARITY_NONE - self.dump_port_settings() - elif c == '1': # 1 -> change to 1 stop bits - self.serial.stopbits = serial.STOPBITS_ONE - self.dump_port_settings() - elif c == '2': # 2 -> change to 2 stop bits - self.serial.stopbits = serial.STOPBITS_TWO - self.dump_port_settings() - elif c == '3': # 3 -> change to 1.5 stop bits - self.serial.stopbits = serial.STOPBITS_ONE_POINT_FIVE - self.dump_port_settings() - elif c in 'xX': # X -> change software flow control - self.serial.xonxoff = (c == 'X') - self.dump_port_settings() - elif c in 'rR': # R -> change hardware flow control - self.serial.rtscts = (c == 'R') - self.dump_port_settings() - else: - sys.stderr.write('--- unknown menu character %s --\n' % key_description(c)) - menu_active = False - elif c == MENUCHARACTER: # next char will be for menu - menu_active = True - elif c == EXITCHARCTER: - self.stop() - break # exit app - elif c == '\n': - self.serial.write(self.newline) # send newline character(s) - if self.echo: - sys.stdout.write(c) # local echo is a real newline in any case - sys.stdout.flush() - else: - self.serial.write(c) # send character - if self.echo: - sys.stdout.write(c) - sys.stdout.flush() - except: - self.alive = False - raise - -def main(): - import optparse - - parser = optparse.OptionParser( - usage = "%prog [options] [port [baudrate]]", - description = "Miniterm - A simple terminal program for the serial port." - ) - - parser.add_option("-p", "--port", - dest = "port", - help = "port, a number (default 0) or a device name (deprecated option)", - default = None - ) - - parser.add_option("-b", "--baud", - dest = "baudrate", - action = "store", - type = 'int', - help = "set baud rate, default %default", - default = 9600 - ) - - parser.add_option("--parity", - dest = "parity", - action = "store", - help = "set parity, one of [N, E, O, S, M], default=N", - default = 'N' - ) - - parser.add_option("-e", "--echo", - dest = "echo", - action = "store_true", - help = "enable local echo (default off)", - default = False - ) - - parser.add_option("--rtscts", - dest = "rtscts", - action = "store_true", - help = "enable RTS/CTS flow control (default off)", - default = False - ) - - parser.add_option("--xonxoff", - dest = "xonxoff", - action = "store_true", - help = "enable software flow control (default off)", - default = False - ) - - parser.add_option("--cr", - dest = "cr", - action = "store_true", - help = "do not send CR+LF, send CR only", - default = False - ) - - parser.add_option("--lf", - dest = "lf", - action = "store_true", - help = "do not send CR+LF, send LF only", - default = False - ) - - parser.add_option("-D", "--debug", - dest = "repr_mode", - action = "count", - help = """debug received data (escape non-printable chars) ---debug can be given multiple times: -0: just print what is received -1: escape non-printable characters, do newlines as unusual -2: escape non-printable characters, newlines too -3: hex dump everything""", - default = 0 - ) - - parser.add_option("--rts", - dest = "rts_state", - action = "store", - type = 'int', - help = "set initial RTS line state (possible values: 0, 1)", - default = None - ) - - parser.add_option("--dtr", - dest = "dtr_state", - action = "store", - type = 'int', - help = "set initial DTR line state (possible values: 0, 1)", - default = None - ) - - parser.add_option("-q", "--quiet", - dest = "quiet", - action = "store_true", - help = "suppress non error messages", - default = False - ) - - parser.add_option("--exit-char", - dest = "exit_char", - action = "store", - type = 'int', - help = "ASCII code of special character that is used to exit the application", - default = 0x1d - ) - - parser.add_option("--menu-char", - dest = "menu_char", - action = "store", - type = 'int', - help = "ASCII code of special character that is used to control miniterm (menu)", - default = 0x14 - ) - - (options, args) = parser.parse_args() - - options.parity = options.parity.upper() - if options.parity not in 'NEOSM': - parser.error("invalid parity") - - if options.cr and options.lf: - parser.error("only one of --cr or --lf can be specified") - - if options.dtr_state is not None and options.rts_state is not None and options.dtr_state == options.rts_state: - parser.error('--exit-char can not be the same as --menu-char') - - global EXITCHARCTER, MENUCHARACTER - EXITCHARCTER = chr(options.exit_char) - MENUCHARACTER = chr(options.menu_char) - - port = options.port - baudrate = options.baudrate - if args: - if options.port is not None: - parser.error("no arguments are allowed, options only when --port is given") - port = args.pop(0) - if args: - try: - baudrate = int(args[0]) - except ValueError: - parser.error("baud rate must be a number, not %r" % args[0]) - args.pop(0) - if args: - parser.error("too many arguments") - else: - if port is None: port = 0 - - convert_outgoing = CONVERT_CRLF - if options.cr: - convert_outgoing = CONVERT_CR - elif options.lf: - convert_outgoing = CONVERT_LF - - try: - miniterm = Miniterm( - port, - baudrate, - options.parity, - rtscts=options.rtscts, - xonxoff=options.xonxoff, - echo=options.echo, - convert_outgoing=convert_outgoing, - repr_mode=options.repr_mode, - ) - except serial.SerialException: - sys.stderr.write("could not open port %r\n" % port) - sys.exit(1) - - if not options.quiet: - sys.stderr.write('--- Miniterm on %s: %d,%s,%s,%s ---\n' % ( - miniterm.serial.portstr, - miniterm.serial.baudrate, - miniterm.serial.bytesize, - miniterm.serial.parity, - miniterm.serial.stopbits, - )) - sys.stderr.write('--- Quit: %s | Menu: %s | Help: %s followed by %s ---\n' % ( - key_description(EXITCHARCTER), - key_description(MENUCHARACTER), - key_description(MENUCHARACTER), - key_description('\x08'), - )) - - if options.dtr_state is not None: - if not options.quiet: - sys.stderr.write('--- forcing DTR %s\n' % (options.dtr_state and 'active' or 'inactive')) - miniterm.serial.setDTR(options.dtr_state) - miniterm.dtr_state = options.dtr_state - if options.rts_state is not None: - if not options.quiet: - sys.stderr.write('--- forcing RTS %s\n' % (options.rts_state and 'active' or 'inactive')) - miniterm.serial.setRTS(options.rts_state) - miniterm.rts_state = options.rts_state - - miniterm.start() - miniterm.join(True) - if not options.quiet: - sys.stderr.write("\n--- exit ---\n") - miniterm.join() - - -if __name__ == '__main__': - main() diff --git a/pyserial/examples/port_publisher.py b/pyserial/examples/port_publisher.py deleted file mode 100644 index e695a98..0000000 --- a/pyserial/examples/port_publisher.py +++ /dev/null @@ -1,451 +0,0 @@ -#! /usr/bin/env python
-"""\
-Multi-port serial<->TCP/IP forwarder.
-- check existence of serial port periodically
-- start/stop forwarders
-- each forwarder creates a server socket and opens the serial port
-- serial ports are opened only once. network connect/disconnect
- does not influence serial port
-- only one client per connection
-"""
-import sys, os, time
-import socket
-import select
-import serial
-import traceback
-import avahi
-import dbus
-
-class ZeroconfService:
- """\
- A simple class to publish a network service with zeroconf using avahi.
- """
-
- def __init__(self, name, port, stype="_http._tcp",
- domain="", host="", text=""):
- self.name = name
- self.stype = stype
- self.domain = domain
- self.host = host
- self.port = port
- self.text = text
- self.group = None
-
- def publish(self):
- bus = dbus.SystemBus()
- server = dbus.Interface(
- bus.get_object(
- avahi.DBUS_NAME,
- avahi.DBUS_PATH_SERVER
- ),
- avahi.DBUS_INTERFACE_SERVER
- )
-
- g = dbus.Interface(
- bus.get_object(
- avahi.DBUS_NAME,
- server.EntryGroupNew()
- ),
- avahi.DBUS_INTERFACE_ENTRY_GROUP
- )
-
- g.AddService(avahi.IF_UNSPEC, avahi.PROTO_UNSPEC, dbus.UInt32(0),
- self.name, self.stype, self.domain, self.host,
- dbus.UInt16(self.port), self.text)
-
- g.Commit()
- self.group = g
-
- def unpublish(self):
- if self.group is not None:
- self.group.Reset()
- self.group = None
-
- def __str__(self):
- return "%r @ %s:%s (%s)" % (self.name, self.host, self.port, self.stype)
-
-
-
-class Forwarder(ZeroconfService):
- """\
- Single port serial<->TCP/IP forarder that depends on an external select
- loop. Zeroconf publish/unpublish on open/close.
- """
-
- def __init__(self, device, name, network_port, on_close=None):
- ZeroconfService.__init__(self, name, network_port, stype='_serial_port._tcp')
- self.alive = False
- self.network_port = network_port
- self.on_close = on_close
- self.device = device
- self.serial = serial.Serial()
- self.serial.port = device
- self.serial.baudrate = 115200
- self.serial.timeout = 0
- self.socket = None
- self.server_socket = None
-
- def __del__(self):
- try:
- if self.alive: self.close()
- except:
- pass # XXX errors on shutdown
-
- def open(self):
- """open serial port, start network server and publish service"""
- self.buffer_net2ser = ''
- self.buffer_ser2net = ''
-
- # open serial port
- try:
- self.serial.open()
- self.serial.setRTS(False)
- except Exception, msg:
- self.handle_serial_error(msg)
-
- # start the socket server
- self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.server_socket.setsockopt(
- socket.SOL_SOCKET,
- socket.SO_REUSEADDR,
- self.server_socket.getsockopt(
- socket.SOL_SOCKET,
- socket.SO_REUSEADDR
- ) | 1
- )
- self.server_socket.setblocking(0)
- try:
- self.server_socket.bind( ('', self.network_port) )
- self.server_socket.listen(1)
- except socket.error, msg:
- self.handle_server_error()
- #~ raise
- if not options.quiet:
- print "%s: Waiting for connection on %s..." % (self.device, self.network_port)
-
- # zeroconfig
- self.publish()
-
- # now we are ready
- self.alive = True
-
- def close(self):
- """Close all resources and unpublish service"""
- if not options.quiet:
- print "%s: closing..." % (self.device, )
- self.alive = False
- self.unpublish()
- if self.server_socket: self.server_socket.close()
- if self.socket:
- self.handle_disconnect()
- self.serial.close()
- if self.on_close is not None:
- # ensure it is only called once
- callback = self.on_close
- self.on_close = None
- callback(self)
-
- def update_select_maps(self, read_map, write_map, error_map):
- """Update dictionaries for select call. insert fd->callback mapping"""
- if self.alive:
- # always handle serial port reads
- read_map[self.serial] = self.handle_serial_read
- error_map[self.serial] = self.handle_serial_error
- # handle serial port writes if buffer is not empty
- if self.buffer_net2ser:
- write_map[self.serial] = self.handle_serial_write
- # handle network
- if self.socket is not None:
- # handle socket if connected
- # only read from network if the internal buffer is not
- # already filled. the TCP flow control will hold back data
- if len(self.buffer_net2ser) < 2048:
- read_map[self.socket] = self.handle_socket_read
- # only check for write readiness when there is data
- if self.buffer_ser2net:
- write_map[self.socket] = self.handle_socket_write
- error_map[self.socket] = self.handle_socket_error
- else:
- # no connection, ensure clear buffer
- self.buffer_ser2net = ''
- # check the server socket
- read_map[self.server_socket] = self.handle_connect
- error_map[self.server_socket] = self.handle_server_error
-
-
- def handle_serial_read(self):
- """Reading from serial port"""
- try:
- data = os.read(self.serial.fileno(), 1024)
- if data:
- # store data in buffer if there is a client connected
- if self.socket is not None:
- self.buffer_ser2net += data
- else:
- self.handle_serial_error()
- except Exception, msg:
- self.handle_serial_error(msg)
-
- def handle_serial_write(self):
- """Writing to serial port"""
- try:
- # write a chunk
- n = os.write(self.serial.fileno(), self.buffer_net2ser)
- # and see how large that chunk was, remove that from buffer
- self.buffer_net2ser = self.buffer_net2ser[n:]
- except Exception, msg:
- self.handle_serial_error(msg)
-
- def handle_serial_error(self, error=None):
- """Serial port error"""
- # terminate connection
- self.close()
-
- def handle_socket_read(self):
- """Read from socket"""
- try:
- # read a chunk from the serial port
- data = self.socket.recv(1024)
- if data:
- # add data to buffer
- self.buffer_net2ser += data
- else:
- # empty read indicates disconnection
- self.handle_disconnect()
- except socket.error:
- self.handle_socket_error()
-
- def handle_socket_write(self):
- """Write to socket"""
- try:
- # write a chunk
- count = self.socket.send(self.buffer_ser2net)
- # and remove the sent data from the buffer
- self.buffer_ser2net = self.buffer_ser2net[count:]
- except socket.error:
- self.handle_socket_error()
-
- def handle_socket_error(self):
- """Socket connection fails"""
- self.handle_disconnect()
-
- def handle_connect(self):
- """Server socket gets a connection"""
- # accept a connection in any case, close connection
- # below if already busy
- connection, addr = self.server_socket.accept()
- if self.socket is None:
- self.socket = connection
- self.socket.setblocking(0)
- if not options.quiet:
- print '%s: Connected by %s:%s' % (self.device, addr[0], addr[1])
- else:
- # reject connection if there is already one
- connection.close()
- if not options.quiet:
- print '%s: Rejecting connect from %s:%s' % (self.device, addr[0], addr[1])
-
- def handle_server_error(self):
- """Socker server fails"""
- self.close()
-
- def handle_disconnect(self):
- """Socket gets disconnected"""
- # clear send buffer
- self.buffer_ser2net = ''
- # close network connection
- if self.socket is not None:
- self.socket.close()
- self.socket = None
- if not options.quiet:
- print '%s: Disconnected' % self.device
-
-
-def test():
- service = ZeroconfService(name="TestService", port=3000)
- service.publish()
- raw_input("Press any key to unpublish the service ")
- service.unpublish()
-
-
-if __name__ == '__main__':
- import optparse
-
- parser = optparse.OptionParser(usage="""\
-%prog [options]
-
-Announce the existence of devices using zeroconf and provide
-a TCP/IP <-> serial port gateway.
-
-Note that the TCP/IP server is not protected. Everyone can connect
-to it!
-
-If running as daemon, write to syslog. Otherwise write to stdout.
-""")
-
- parser.add_option("-q", "--quiet", dest="quiet", action="store_true",
- help="suppress non error messages", default=False)
-
- parser.add_option("-o", "--logfile", dest="log_file",
- help="write messages file instead of stdout", default=None, metavar="FILE")
-
- parser.add_option("-d", "--daemon", dest="daemonize", action="store_true",
- help="start as daemon", default=False)
-
- parser.add_option("", "--pidfile", dest="pid_file",
- help="specify a name for the PID file", default=None, metavar="FILE")
-
- (options, args) = parser.parse_args()
-
- # redirect output if specified
- if options.log_file is not None:
- class WriteFlushed:
- def __init__(self, fileobj):
- self.fileobj = fileobj
- def write(self, s):
- self.fileobj.write(s)
- self.fileobj.flush()
- def close(self):
- self.fileobj.close()
- sys.stdout = sys.stderr = WriteFlushed(open(options.log_file, 'a'))
- # atexit.register(lambda: sys.stdout.close())
-
- if options.daemonize:
- # if running as daemon is requested, do the fork magic
- # options.quiet = True
- import pwd
- # do the UNIX double-fork magic, see Stevens' "Advanced
- # Programming in the UNIX Environment" for details (ISBN 0201563177)
- try:
- pid = os.fork()
- if pid > 0:
- # exit first parent
- sys.exit(0)
- except OSError, e:
- sys.stderr.write("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror))
- sys.exit(1)
-
- # decouple from parent environment
- os.chdir("/") # don't prevent unmounting....
- os.setsid()
- os.umask(0)
-
- # do second fork
- try:
- pid = os.fork()
- if pid > 0:
- # exit from second parent, print eventual PID before
- # print "Daemon PID %d" % pid
- if options.pid_file is not None:
- open(options.pid_file,'w').write("%d"%pid)
- sys.exit(0)
- except OSError, e:
- sys.stderr.write("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror))
- sys.exit(1)
-
- if options.log_file is None:
- import syslog
- syslog.openlog("serial port publisher")
- # redirect output to syslog
- class WriteToSysLog:
- def __init__(self):
- self.buffer = ''
- def write(self, s):
- self.buffer += s
- if '\n' in self.buffer:
- output, self.buffer = self.buffer.split('\n', 1)
- syslog.syslog(output)
- def flush(self):
- syslog.syslog(self.buffer)
- self.buffer = ''
- def close(self):
- self.flush()
- sys.stdout = sys.stderr = WriteToSysLog()
-
- # ensure the that the daemon runs a normal user, if run as root
- #if os.getuid() == 0:
- # name, passwd, uid, gid, desc, home, shell = pwd.getpwnam('someuser')
- # os.setgid(gid) # set group first
- # os.setuid(uid) # set user
-
- # keep the published stuff in a dictionary
- published = {}
- # prepare list of device names (hard coded)
- device_list = ['/dev/ttyUSB%d' % p for p in range(8)]
- # get a nice hostname
- hostname = socket.gethostname()
-
- def unpublish(forwarder):
- """when forwarders die, we need to unregister them"""
- try:
- del published[forwarder.device]
- except KeyError:
- pass
- else:
- if not options.quiet: print "unpublish: %s" % (forwarder)
-
- alive = True
- next_check = 0
- # main loop
- while alive:
- try:
- # if it is time, check for serial port devices
- now = time.time()
- if now > next_check:
- next_check = now + 5
- # check each device
- for device in device_list:
- # if it appeared
- if os.path.exists(device):
- if device not in published:
- num = int(device[-1])
- published[device] = Forwarder(
- device,
- "%s on %s" % (device, hostname),
- 7000+num,
- on_close=unpublish
- )
- if not options.quiet: print "publish: %s" % (published[device])
- published[device].open()
- else:
- # or when it disappeared
- if device in published:
- if not options.quiet: print "unpublish: %s" % (published[device])
- published[device].close()
- try:
- del published[device]
- except KeyError:
- pass
-
- # select_start = time.time()
- read_map = {}
- write_map = {}
- error_map = {}
- for publisher in published.values():
- publisher.update_select_maps(read_map, write_map, error_map)
- try:
- readers, writers, errors = select.select(
- read_map.keys(),
- write_map.keys(),
- error_map.keys(),
- 5
- )
- except select.error, err:
- if err[0] != EINTR:
- raise
- # select_end = time.time()
- # print "select used %.3f s" % (select_end - select_start)
- for reader in readers:
- read_map[reader]()
- for writer in writers:
- write_map[writer]()
- for error in errors:
- error_map[error]()
- # print "operation used %.3f s" % (time.time() - select_end)
- except KeyboardInterrupt:
- alive = False
- except SystemExit:
- raise
- except:
- raise
- traceback.print_exc()
diff --git a/pyserial/examples/port_publisher.sh b/pyserial/examples/port_publisher.sh deleted file mode 100644 index 50d4f17..0000000 --- a/pyserial/examples/port_publisher.sh +++ /dev/null @@ -1,44 +0,0 @@ -#! /bin/sh -# daemon starter script -# based on skeleton from Debian GNU/Linux -# cliechti at gmx.net - -PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin -DAEMON=/usr/local/bin/port_publisher.py -NAME=port_publisher -DESC="serial port avahi device publisher" - -test -f $DAEMON || exit 0 - -set -e - -case "$1" in - start) - echo -n "Starting $DESC: " - $DAEMON --daemon --pidfile /var/run/$NAME.pid - echo "$NAME." - ;; - stop) - echo -n "Stopping $DESC: " - start-stop-daemon --stop --quiet --pidfile /var/run/$NAME.pid - # \ --exec $DAEMON - echo "$NAME." - ;; - restart|force-reload) - echo -n "Restarting $DESC: " - start-stop-daemon --stop --quiet --pidfile \ - /var/run/$NAME.pid - # --exec $DAEMON - sleep 1 - $DAEMON --daemon --pidfile /var/run/$NAME.pid - echo "$NAME." - ;; - *) - N=/etc/init.d/$NAME - echo "Usage: $N {start|stop|restart|force-reload}" >&2 - exit 1 - ;; -esac - -exit 0 - diff --git a/pyserial/examples/scan.py b/pyserial/examples/scan.py deleted file mode 100644 index 82c5458..0000000 --- a/pyserial/examples/scan.py +++ /dev/null @@ -1,30 +0,0 @@ -#! /usr/bin/env python -"""\ -Scan for serial ports. - -Part of pySerial (http://pyserial.sf.net) -(C) 2002-2003 <cliechti@gmx.net> - -The scan function of this module tries to open each port number -from 0 to 255 and it builds a list of those ports where this was -successful. -""" - -import serial - -def scan(): - """scan for available ports. return a list of tuples (num, name)""" - available = [] - for i in range(256): - try: - s = serial.Serial(i) - available.append( (i, s.portstr)) - s.close() # explicit close 'cause of delayed GC in java - except serial.SerialException: - pass - return available - -if __name__=='__main__': - print "Found ports:" - for n,s in scan(): - print "(%d) %s" % (n,s) diff --git a/pyserial/examples/scanlinux.py b/pyserial/examples/scanlinux.py deleted file mode 100644 index 7cf6383..0000000 --- a/pyserial/examples/scanlinux.py +++ /dev/null @@ -1,20 +0,0 @@ -#! /usr/bin/env python -"""\ -Scan for serial ports. Linux specific variant that also includes USB/Serial -adapters. - -Part of pySerial (http://pyserial.sf.net) -(C) 2009 <cliechti@gmx.net> -""" - -import serial -import glob - -def scan(): - """scan for available ports. return a list of device names.""" - return glob.glob('/dev/ttyS*') + glob.glob('/dev/ttyUSB*') - -if __name__=='__main__': - print "Found ports:" - for name in scan(): - print name diff --git a/pyserial/examples/scanwin32.py b/pyserial/examples/scanwin32.py deleted file mode 100644 index 6d58d89..0000000 --- a/pyserial/examples/scanwin32.py +++ /dev/null @@ -1,200 +0,0 @@ -import ctypes -import re - -def ValidHandle(value): - if value == 0: - raise ctypes.WinError() - return value - -NULL = 0 -HDEVINFO = ctypes.c_int -BOOL = ctypes.c_int -CHAR = ctypes.c_char -PCTSTR = ctypes.c_char_p -HWND = ctypes.c_uint -DWORD = ctypes.c_ulong -PDWORD = ctypes.POINTER(DWORD) -ULONG = ctypes.c_ulong -ULONG_PTR = ctypes.POINTER(ULONG) -#~ PBYTE = ctypes.c_char_p -PBYTE = ctypes.c_void_p - -class GUID(ctypes.Structure): - _fields_ = [ - ('Data1', ctypes.c_ulong), - ('Data2', ctypes.c_ushort), - ('Data3', ctypes.c_ushort), - ('Data4', ctypes.c_ubyte*8), - ] - def __str__(self): - return "{%08x-%04x-%04x-%s-%s}" % ( - self.Data1, - self.Data2, - self.Data3, - ''.join(["%02x" % d for d in self.Data4[:2]]), - ''.join(["%02x" % d for d in self.Data4[2:]]), - ) - -class SP_DEVINFO_DATA(ctypes.Structure): - _fields_ = [ - ('cbSize', DWORD), - ('ClassGuid', GUID), - ('DevInst', DWORD), - ('Reserved', ULONG_PTR), - ] - def __str__(self): - return "ClassGuid:%s DevInst:%s" % (self.ClassGuid, self.DevInst) -PSP_DEVINFO_DATA = ctypes.POINTER(SP_DEVINFO_DATA) - -class SP_DEVICE_INTERFACE_DATA(ctypes.Structure): - _fields_ = [ - ('cbSize', DWORD), - ('InterfaceClassGuid', GUID), - ('Flags', DWORD), - ('Reserved', ULONG_PTR), - ] - def __str__(self): - return "InterfaceClassGuid:%s Flags:%s" % (self.InterfaceClassGuid, self.Flags) - -PSP_DEVICE_INTERFACE_DATA = ctypes.POINTER(SP_DEVICE_INTERFACE_DATA) - -PSP_DEVICE_INTERFACE_DETAIL_DATA = ctypes.c_void_p - -class dummy(ctypes.Structure): - _fields_=[("d1",ctypes.DWORD), ("d2",ctypes.CHAR)] -SIZEOF_SP_DEVICE_INTERFACE_DETAIL_DATA_A = ctypes.sizeof(dummy) - -SetupDiDestroyDeviceInfoList = ctypes.windll.setupapi.SetupDiDestroyDeviceInfoList -SetupDiDestroyDeviceInfoList.argtypes = [HDEVINFO] -SetupDiDestroyDeviceInfoList.restype = BOOL - -SetupDiGetClassDevs = ctypes.windll.setupapi.SetupDiGetClassDevsA -SetupDiGetClassDevs.argtypes = [ctypes.POINTER(GUID), PCTSTR, HWND, DWORD] -SetupDiGetClassDevs.restype = ValidHandle # HDEVINFO - -SetupDiEnumDeviceInterfaces = ctypes.windll.setupapi.SetupDiEnumDeviceInterfaces -SetupDiEnumDeviceInterfaces.argtypes = [HDEVINFO, PSP_DEVINFO_DATA, ctypes.POINTER(GUID), DWORD, PSP_DEVICE_INTERFACE_DATA] -SetupDiEnumDeviceInterfaces.restype = BOOL - -SetupDiGetDeviceInterfaceDetail = ctypes.windll.setupapi.SetupDiGetDeviceInterfaceDetailA -SetupDiGetDeviceInterfaceDetail.argtypes = [HDEVINFO, PSP_DEVICE_INTERFACE_DATA, PSP_DEVICE_INTERFACE_DETAIL_DATA, DWORD, PDWORD, PSP_DEVINFO_DATA] -SetupDiGetDeviceInterfaceDetail.restype = BOOL - -SetupDiGetDeviceRegistryProperty = ctypes.windll.setupapi.SetupDiGetDeviceRegistryPropertyA -SetupDiGetDeviceRegistryProperty.argtypes = [HDEVINFO, PSP_DEVINFO_DATA, DWORD, PDWORD, PBYTE, DWORD, PDWORD] -SetupDiGetDeviceRegistryProperty.restype = BOOL - - -GUID_CLASS_COMPORT = GUID(0x86e0d1e0L, 0x8089, 0x11d0, - (ctypes.c_ubyte*8)(0x9c, 0xe4, 0x08, 0x00, 0x3e, 0x30, 0x1f, 0x73)) - -DIGCF_PRESENT = 2 -DIGCF_DEVICEINTERFACE = 16 -INVALID_HANDLE_VALUE = 0 -ERROR_INSUFFICIENT_BUFFER = 122 -SPDRP_HARDWAREID = 1 -SPDRP_FRIENDLYNAME = 12 -ERROR_NO_MORE_ITEMS = 259 - -def comports(available_only=True): - """This generator scans the device registry for com ports and yields port, desc, hwid. - If available_only is true only return currently existing ports.""" - flags = DIGCF_DEVICEINTERFACE - if available_only: - flags |= DIGCF_PRESENT - g_hdi = SetupDiGetClassDevs(ctypes.byref(GUID_CLASS_COMPORT), None, NULL, flags); - #~ for i in range(256): - for dwIndex in range(256): - did = SP_DEVICE_INTERFACE_DATA() - did.cbSize = ctypes.sizeof(did) - - if not SetupDiEnumDeviceInterfaces( - g_hdi, - None, - ctypes.byref(GUID_CLASS_COMPORT), - dwIndex, - ctypes.byref(did) - ): - if ctypes.GetLastError() != ERROR_NO_MORE_ITEMS: - raise ctypes.WinError() - break - - dwNeeded = DWORD() - # get the size - if not SetupDiGetDeviceInterfaceDetail( - g_hdi, - ctypes.byref(did), - None, 0, ctypes.byref(dwNeeded), - None - ): - # Ignore ERROR_INSUFFICIENT_BUFFER - if ctypes.GetLastError() != ERROR_INSUFFICIENT_BUFFER: - raise ctypes.WinError() - # allocate buffer - class SP_DEVICE_INTERFACE_DETAIL_DATA_A(ctypes.Structure): - _fields_ = [ - ('cbSize', DWORD), - ('DevicePath', CHAR*(dwNeeded.value - ctypes.sizeof(DWORD))), - ] - def __str__(self): - return "DevicePath:%s" % (self.DevicePath,) - idd = SP_DEVICE_INTERFACE_DETAIL_DATA_A() - idd.cbSize = SIZEOF_SP_DEVICE_INTERFACE_DETAIL_DATA_A - devinfo = SP_DEVINFO_DATA() - devinfo.cbSize = ctypes.sizeof(devinfo) - if not SetupDiGetDeviceInterfaceDetail( - g_hdi, - ctypes.byref(did), - ctypes.byref(idd), dwNeeded, None, - ctypes.byref(devinfo) - ): - raise ctypes.WinError() - - # hardware ID - szHardwareID = ctypes.create_string_buffer('\0' * 250) - if not SetupDiGetDeviceRegistryProperty( - g_hdi, - ctypes.byref(devinfo), - SPDRP_HARDWAREID, - None, - ctypes.byref(szHardwareID), ctypes.sizeof(szHardwareID) - 1, - None - ): - # Ignore ERROR_INSUFFICIENT_BUFFER - if ctypes.GetLastError() != ERROR_INSUFFICIENT_BUFFER: - raise ctypes.WinError() - - # friendly name - szFriendlyName = ctypes.create_string_buffer('\0' * 250) - if not SetupDiGetDeviceRegistryProperty( - g_hdi, - ctypes.byref(devinfo), - SPDRP_FRIENDLYNAME, - None, - ctypes.byref(szFriendlyName), ctypes.sizeof(szFriendlyName) - 1, - None - ): - # Ignore ERROR_INSUFFICIENT_BUFFER - if ctypes.GetLastError() != ERROR_INSUFFICIENT_BUFFER: - raise ctypes.WinError() - try: - port_name = re.search(r"\((.*)\)", szFriendlyName.value).group(1) - except AttributeError,msg: - port_name = "???" - yield port_name, szFriendlyName.value, szHardwareID.value - - SetupDiDestroyDeviceInfoList(g_hdi) - - -if __name__ == '__main__': - import serial - for port, desc, hwid in comports(): - try: - print "%s: %s (%s)" % (port, desc, hwid) - print " "*10, serial.Serial(port) #test open - except serial.serialutil.SerialException: - print "can't be openend." - # list of all ports the system knows - print "-"*60 - for port, desc, hwid in comports(False): - print "%-10s: %s (%s)" % (port, desc, hwid) diff --git a/pyserial/examples/setup-miniterm-py2exe.py b/pyserial/examples/setup-miniterm-py2exe.py deleted file mode 100644 index e935cf0..0000000 --- a/pyserial/examples/setup-miniterm-py2exe.py +++ /dev/null @@ -1,26 +0,0 @@ -# setup script for py2exe to create the miniterm.exe -# $Id: setup-miniterm-py2exe.py,v 1.1 2005-09-21 19:51:19 cliechti Exp $ - -from distutils.core import setup -import glob, sys, py2exe, os - -sys.path.append('..') - -sys.argv.extend("py2exe --bundle 1".split()) - -setup( - name='miniterm', - #~ version='0.5', - zipfile=None, - options = {"py2exe": - { - 'dist_dir': 'bin', - 'excludes': ['javax.comm'], - 'compressed': 1, - } - }, - console = [ - #~ "miniterm_exe_wrapper.py", - "miniterm.py", - ], -) diff --git a/pyserial/examples/setup_demo.py b/pyserial/examples/setup_demo.py deleted file mode 100644 index 854c0b9..0000000 --- a/pyserial/examples/setup_demo.py +++ /dev/null @@ -1,35 +0,0 @@ -# This is a setup.py example script for the use with py2exe -from distutils.core import setup -import py2exe -import sys, os - -#this script is only useful for py2exe so just run that distutils command. -#that allows to run it with a simple double click. -sys.argv.append('py2exe') - -#get an icon from somewhere.. the python installation should have one: -icon = os.path.join(os.path.dirname(sys.executable), 'py.ico') - -setup( - options = {'py2exe': { - 'excludes': ['javax.comm'], - 'optimize': 2, - 'dist_dir': 'dist', - } - }, - - name = "wxTerminal", - windows = [ - { - 'script': "wxTerminal.py", - 'icon_resources': [(0x0004, icon)] - }, - ], - zipfile = "stuff.lib", - - description = "Simple serial terminal application", - version = "0.1", - author = "Chris Liechti", - author_email = "cliechti@gmx.net", - url = "http://pyserial.sf.net", -) diff --git a/pyserial/examples/tcp_serial_redirect.py b/pyserial/examples/tcp_serial_redirect.py deleted file mode 100644 index e62d8a5..0000000 --- a/pyserial/examples/tcp_serial_redirect.py +++ /dev/null @@ -1,305 +0,0 @@ -#!/usr/bin/env python - -# (C) 2002-2009 Chris Liechti <cliechti@gmx.net> -# redirect data from a TCP/IP connection to a serial port and vice versa -# requires Python 2.2 'cause socket.sendall is used - - -import sys, os, serial, threading, socket, codecs - -try: - True -except NameError: - True = 1 - False = 0 - -class Redirector: - def __init__(self, serial, socket, ser_newline=None, net_newline=None, spy=False): - self.serial = serial - self.socket = socket - self.ser_newline = ser_newline - self.net_newline = net_newline - self.spy = spy - - def shortcut(self): - """connect the serial port to the tcp port by copying everything - from one side to the other""" - self.alive = True - self.thread_read = threading.Thread(target=self.reader) - self.thread_read.setDaemon(1) - self.thread_read.start() - self.writer() - - def reader(self): - """loop forever and copy serial->socket""" - while self.alive: - try: - data = self.serial.read(1) # read one, blocking - n = self.serial.inWaiting() # look if there is more - if n: - data = data + self.serial.read(n) # and get as much as possible - if data: - # the spy shows what's on the serial port, so log it before converting newlines - if self.spy: - sys.stdout.write(codecs.escape_encode(data)[0]) - sys.stdout.flush() - if self.ser_newline and self.net_newline: - # do the newline conversion - # XXX fails for CR+LF in input when it is cut in half at the begin or end of the string - data = net_newline.join(data.split(ser_newline)) - self.socket.sendall(data) # send it over TCP - except socket.error, msg: - sys.stderr.write('ERROR: %s\n' % msg) - # probably got disconnected - break - self.alive = False - - def writer(self): - """loop forever and copy socket->serial""" - while self.alive: - try: - data = self.socket.recv(1024) - if not data: - break - if self.ser_newline and self.net_newline: - # do the newline conversion - # XXX fails for CR+LF in input when it is cut in half at the begin or end of the string - data = ser_newline.join(data.split(net_newline)) - self.serial.write(data) # get a bunch of bytes and send them - # the spy shows what's on the serial port, so log it after converting newlines - if self.spy: - sys.stdout.write(codecs.escape_encode(data)[0]) - sys.stdout.flush() - except socket.error, msg: - sys.stderr.write('ERROR: %s\n' % msg) - # probably got disconnected - break - self.alive = False - self.thread_read.join() - - def stop(self): - """Stop copying""" - if self.alive: - self.alive = False - self.thread_read.join() - - -if __name__ == '__main__': - import optparse - - parser = optparse.OptionParser( - usage = "%prog [options] [port [baudrate]]", - description = "Simple Serial to Network (TCP/IP) redirector.", - epilog = """\ -NOTE: no security measures are implemented. Anyone can remotely connect -to this service over the network. - -Only one connection at once is supported. When the connection is terminated -it waits for the next connect. -""") - - parser.add_option("-q", "--quiet", - dest = "quiet", - action = "store_true", - help = "suppress non error messages", - default = False - ) - - parser.add_option("--spy", - dest = "spy", - action = "store_true", - help = "peek at the communication and print all data to the console", - default = False - ) - - group = optparse.OptionGroup(parser, - "Serial Port", - "Serial port settings" - ) - parser.add_option_group(group) - - group.add_option("-p", "--port", - dest = "port", - help = "port, a number (default 0) or a device name", - default = None - ) - - group.add_option("-b", "--baud", - dest = "baudrate", - action = "store", - type = 'int', - help = "set baud rate, default: %default", - default = 9600 - ) - - group.add_option("", "--parity", - dest = "parity", - action = "store", - help = "set parity, one of [N, E, O], default=%default", - default = 'N' - ) - - group.add_option("--rtscts", - dest = "rtscts", - action = "store_true", - help = "enable RTS/CTS flow control (default off)", - default = False - ) - - group.add_option("--xonxoff", - dest = "xonxoff", - action = "store_true", - help = "enable software flow control (default off)", - default = False - ) - - group.add_option("--rts", - dest = "rts_state", - action = "store", - type = 'int', - help = "set initial RTS line state (possible values: 0, 1)", - default = None - ) - - group.add_option("--dtr", - dest = "dtr_state", - action = "store", - type = 'int', - help = "set initial DTR line state (possible values: 0, 1)", - default = None - ) - - group = optparse.OptionGroup(parser, - "Network settings", - "Network configuration." - ) - parser.add_option_group(group) - - group.add_option("-P", "--localport", - dest = "local_port", - action = "store", - type = 'int', - help = "local TCP port", - default = 7777 - ) - - group = optparse.OptionGroup(parser, - "Newline Settings", - "Convert newlines between network and serial port. Conversion is normally disabled and can be enabled by --convert." - ) - parser.add_option_group(group) - - group.add_option("-c", "--convert", - dest = "convert", - action = "store_true", - help = "enable newline conversion (default off)", - default = False - ) - - group.add_option("--net-nl", - dest = "net_newline", - action = "store", - help = "type of newlines that are expected on the network (default: %default)", - default = "LF" - ) - - group.add_option("--ser-nl", - dest = "ser_newline", - action = "store", - help = "type of newlines that are expected on the serial port (default: %default)", - default = "CR+LF" - ) - - (options, args) = parser.parse_args() - - # get port and baud rate from command line arguments or the option switches - port = options.port - baudrate = options.baudrate - if args: - if options.port is not None: - parser.error("no arguments are allowed, options only when --port is given") - port = args.pop(0) - if args: - try: - baudrate = int(args[0]) - except ValueError: - parser.error("baud rate must be a number, not %r" % args[0]) - args.pop(0) - if args: - parser.error("too many arguments") - else: - if port is None: port = 0 - - # check newline modes for network connection - mode = options.net_newline.upper() - if mode == 'CR': - net_newline = '\r' - elif mode == 'LF': - net_newline = '\n' - elif mode == 'CR+LF' or mode == 'CRLF': - net_newline = '\r\n' - else: - parser.error("Invalid value for --net-nl. Valid are 'CR', 'LF' and 'CR+LF'/'CRLF'.") - - # check newline modes for serial connection - mode = options.ser_newline.upper() - if mode == 'CR': - ser_newline = '\r' - elif mode == 'LF': - ser_newline = '\n' - elif mode == 'CR+LF' or mode == 'CRLF': - ser_newline = '\r\n' - else: - parser.error("Invalid value for --ser-nl. Valid are 'CR', 'LF' and 'CR+LF'/'CRLF'.") - - # connect to serial port - ser = serial.Serial() - ser.port = port - ser.baudrate = baudrate - ser.parity = options.parity - ser.rtscts = options.rtscts - ser.xonxoff = options.xonxoff - ser.timeout = 1 # required so that the reader thread can exit - - if not options.quiet: - sys.stderr.write("--- TCP/IP to Serial redirector --- type Ctrl-C / BREAK to quit\n") - sys.stderr.write("--- %s %s,%s,%s,%s ---\n" % (ser.portstr, ser.baudrate, 8, ser.parity, 1)) - - try: - ser.open() - except serial.SerialException, e: - sys.stderr.write("Could not open serial port %s: %s\n" % (ser.portstr, e)) - sys.exit(1) - - if options.rts_state is not None: - ser.setRTS(options.rts_state) - - if options.dtr_state is not None: - ser.setDTR(options.dtr_state) - - srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - srv.bind( ('', options.local_port) ) - srv.listen(1) - while 1: - try: - sys.stderr.write("Waiting for connection on %s...\n" % options.local_port) - connection, addr = srv.accept() - sys.stderr.write('Connected by %s\n' % (addr,)) - # enter console->serial loop - r = Redirector( - ser, - connection, - options.convert and ser_newline or None, - options.convert and net_newline or None, - options.spy, - ) - r.shortcut() - if options.spy: sys.stdout.write('\n') - sys.stderr.write('Disconnected\n') - connection.close() - except socket.error, msg: - sys.stderr.write('ERROR: %s\n' % msg) - - sys.stderr.write('\n--- exit ---\n') - diff --git a/pyserial/examples/test.py b/pyserial/examples/test.py deleted file mode 100644 index 666275f..0000000 --- a/pyserial/examples/test.py +++ /dev/null @@ -1,216 +0,0 @@ -#! /usr/bin/env python -# Python Serial Port Extension for Win32, Linux, BSD, Jython -# see __init__.py -# -# (C) 2001-2008 Chris Liechti <cliechti@gmx.net> -# this is distributed under a free software license, see license.txt - -"""\ -Some tests for the serial module. -Part of pyserial (http://pyserial.sf.net) (C)2001-2008 cliechti@gmx.net - -Intended to be run on different platforms, to ensure portability of -the code. - -For all these tests a simple hardware is required. -Loopback HW adapter: -Shortcut these pin pairs: - TX <-> RX - RTS <-> CTS - DTR <-> DSR - -On a 9 pole DSUB these are the pins (2-3) (4-6) (7-8) -""" - -import unittest, threading, time -import sys -import serial - -# on which port should the tests be performed: -PORT=0 - -if sys.version_info >= (3, 0): - def data(string): - return bytes(string, 'latin1') - bytes_0to255 = [bytes([x]) for x in range(256)] -else: - def data(string): return string - bytes_0to255 = [chr(x) for x in range(256)] - - -class Test4_Nonblocking(unittest.TestCase): - """Test with timeouts""" - timeout = 0 - - def setUp(self): - self.s = serial.Serial(PORT, timeout=self.timeout) - - def tearDown(self): - self.s.close() - - def test0_Messy(self): - """NonBlocking (timeout=0)""" - # this is only here to write out the message in verbose mode - # because Test3 and Test4 print the same messages - - def test1_ReadEmpty(self): - """timeout: After port open, the input buffer must be empty""" - self.failUnlessEqual(self.s.read(1), data(''), "expected empty buffer") - - def test2_Loopback(self): - """timeout: each sent character should return (binary test). - this is also a test for the binary capability of a port.""" - for c in bytes_0to255: - self.s.write(c) - # there might be a small delay until the character is ready (especially on win32) - time.sleep(0.02) - self.failUnlessEqual(self.s.inWaiting(), 1, "expected exactly one character for inWainting()") - self.failUnlessEqual(self.s.read(1), c, "expected a '%s' which was written before" % c) - self.failUnlessEqual(self.s.read(1), data(''), "expected empty buffer after all sent chars are read") - - def test2_LoopbackTimeout(self): - """timeout: test the timeout/immediate return. - partial results should be returned.""" - self.s.write(data("HELLO")) - time.sleep(0.1) # there might be a small delay until the character is ready (especially on win32) - # read more characters as are available to run in the timeout - self.failUnlessEqual(self.s.read(10), data('HELLO'), "expected the 'HELLO' which was written before") - self.failUnlessEqual(self.s.read(1), data(''), "expected empty buffer after all sent chars are read") - - -class Test3_Timeout(Test4_Nonblocking): - """Same tests as the NonBlocking ones but this time with timeout""" - timeout = 1 - - def test0_Messy(self): - """Blocking (timeout=1)""" - # this is only here to write out the message in verbose mode - # because Test3 and Test4 print the same messages - -class SendEvent(threading.Thread): - def __init__(self, serial, delay=1): - threading.Thread.__init__(self) - self.serial = serial - self.delay = delay - self.x = threading.Event() - self.stopped = 0 - self.start() - - def run(self): - time.sleep(self.delay) - if not self.stopped: - self.serial.write(data("E")) - self.x.set() - - def isSet(self): - return self.x.isSet() - - def stop(self): - self.stopped = 1 - self.x.wait() - -class Test1_Forever(unittest.TestCase): - """Tests a port with no timeout. These tests require that a - character is sent after some time to stop the test, this is done - through the SendEvent class and the Loopback HW.""" - def setUp(self): - self.s = serial.Serial(PORT, timeout=None) - self.event = SendEvent(self.s) - - def tearDown(self): - self.event.stop() - self.s.close() - - def test2_ReadEmpty(self): - """no timeout: after port open, the input buffer must be empty (read). - a character is sent after some time to terminate the test (SendEvent).""" - c = self.s.read(1) - if not (self.event.isSet() and c == data('E')): - self.fail("expected marker") - -class Test2_Forever(unittest.TestCase): - """Tests a port with no timeout""" - def setUp(self): - self.s = serial.Serial(PORT, timeout=None) - - def tearDown(self): - self.s.close() - - def test1_inWaitingEmpty(self): - """no timeout: after port open, the input buffer must be empty (inWaiting)""" - self.failUnlessEqual(self.s.inWaiting(), 0, "expected empty buffer") - - def test2_Loopback(self): - """no timeout: each sent character should return (binary test). - this is also a test for the binary capability of a port.""" - for c in bytes_0to255: - self.s.write(c) - # there might be a small delay until the character is ready (especially on win32) - time.sleep(0.02) - self.failUnlessEqual(self.s.inWaiting(), 1, "expected exactly one character for inWainting()") - self.failUnlessEqual(self.s.read(1), c, "expected an '%s' which was written before" % c) - self.failUnlessEqual(self.s.inWaiting(), 0, "expected empty buffer after all sent chars are read") - - -class Test0_DataWires(unittest.TestCase): - """Test modem control lines""" - def setUp(self): - self.s = serial.Serial(PORT) - - def tearDown(self): - self.s.close() - - def test1_RTS(self): - """Test RTS/CTS""" - self.s.setRTS(0) - time.sleep(0.1) - self.failUnless(not self.s.getCTS(), "CTS -> 0") - self.s.setRTS(1) - time.sleep(0.1) - self.failUnless(self.s.getCTS(), "CTS -> 1") - - def test2_DTR(self): - """Test DTR/DSR""" - self.s.setDTR(0) - time.sleep(0.1) - self.failUnless(not self.s.getDSR(), "DSR -> 0") - self.s.setDTR(1) - time.sleep(0.1) - self.failUnless(self.s.getDSR(), "DSR -> 1") - - def test3_RI(self): - """Test RI""" - self.failUnless(not self.s.getRI(), "RI -> 0") - -class Test_MoreTimeouts(unittest.TestCase): - """Test with timeouts""" - def setUp(self): - self.s = serial.Serial() # create an closed serial port - - def tearDown(self): - self.s.close() - - def test_WriteTimeout(self): - """Test write() timeout.""" - # use xonxoff setting and the loop-back adapter to switch traffic on hold - self.s.port = PORT - self.s.writeTimeout = 1 - self.s.xonxoff = 1 - self.s.open() - self.s.write(serial.XOFF) - time.sleep(0.5) # some systems need a little delay so that they can react on XOFF - t1 = time.time() - self.failUnlessRaises(serial.SerialTimeoutException, self.s.write, data("timeout please"*100)) - t2 = time.time() - self.failUnless( 0.9 <= (t2-t1) < 2.1, "Timeout not in the given interval (%s)" % (t2-t1)) - - -if __name__ == '__main__': - import sys - sys.stdout.write(__doc__) - if len(sys.argv) > 1: - PORT = sys.argv[1] - sys.stdout.write("Testing port: %r\n" % PORT) - sys.argv[1:] = ['-v'] - # When this module is executed from the command-line, it runs all its tests - unittest.main() diff --git a/pyserial/examples/test_advanced.py b/pyserial/examples/test_advanced.py deleted file mode 100644 index a19361c..0000000 --- a/pyserial/examples/test_advanced.py +++ /dev/null @@ -1,173 +0,0 @@ -#!/usr/bin/env python -# needs at least python 2.2.3 - -# Python Serial Port Extension for Win32, Linux, BSD, Jython -# see __init__.py -# -# (C) 2001-2003 Chris Liechti <cliechti@gmx.net> -# this is distributed under a free software license, see license.txt - -"""\ -Some tests for the serial module. -Part of pyserial (http://pyserial.sf.net) (C)2002 cliechti@gmx.net - -Intended to be run on different platforms, to ensure portability of -the code. - -These tests open a serial port and change all the settings on the fly. -If the port is really correctly configured cannot be determined - that -would require external hardware or a nullmodem cable and an other -serial port library... Thus it mainly tests that all features are -correctly implemented and that the interface does what it should. - -""" - -import unittest -import serial - -# on which port should the tests be performed: -PORT=0 - -class Test_ChangeAttributes(unittest.TestCase): - """Test with timeouts""" - - def setUp(self): - self.s = serial.Serial() # create a closed serial port - - def tearDown(self): - self.s.close() - - def test_PortSetting(self): - self.s.port = PORT - # portstr has to be set - if isinstance(PORT, str): - self.failUnlessEqual(self.s.portstr.lower(), PORT.lower()) - else: - self.failUnlessEqual(self.s.portstr, serial.device(PORT)) - # test internals - self.failUnlessEqual(self.s._port, PORT) - # test on the fly change - self.s.open() - self.failUnless(self.s.isOpen()) - self.s.port = 0 - self.failUnless(self.s.isOpen()) - self.failUnlessEqual(self.s.port, 0) - self.failUnlessEqual(self.s.portstr, serial.device(0)) - try: - self.s.port = 1 - except serial.SerialException: # port not available on system - pass # can't test on this machine... - else: - self.failUnless(self.s.isOpen()) - self.failUnlessEqual(self.s.port, 1) - self.failUnlessEqual(self.s.portstr, serial.device(1)) - - def test_BaudrateSetting(self): - self.s.port = PORT - self.s.open() - for baudrate in (300, 9600, 19200, 115200): - self.s.baudrate = baudrate - # test get method - self.failUnlessEqual(self.s.baudrate, baudrate) - # test internals - self.failUnlessEqual(self.s._baudrate, baudrate) - # test illegal values - for illegal_value in (-300, -1, 'a', None): - self.failUnlessRaises(ValueError, self.s.setBaudrate, illegal_value) - - # skip this test as pyserial now tries to set even non standard baud rates. - # therefore the test can not choose a value that fails on any system. - def disabled_test_BaudrateSetting2(self): - # test illegal values, depending on machine/port some of these may be valid... - self.s.port = PORT - self.s.open() - for illegal_value in (500000,576000,921600,92160): - self.failUnlessRaises(ValueError, self.s.setBaudrate, illegal_value) - - def test_BytesizeSetting(self): - for bytesize in (5,6,7,8): - self.s.bytesize = bytesize - # test get method - self.failUnlessEqual(self.s.bytesize, bytesize) - # test internals - self.failUnlessEqual(self.s._bytesize, bytesize) - # test illegal values - for illegal_value in (0, 1, 3, 4, 9, 10, 'a', None): - self.failUnlessRaises(ValueError, self.s.setByteSize, illegal_value) - - def test_ParitySetting(self): - for parity in (serial.PARITY_NONE, serial.PARITY_EVEN, serial.PARITY_ODD): - self.s.parity = parity - # test get method - self.failUnlessEqual(self.s.parity, parity) - # test internals - self.failUnlessEqual(self.s._parity, parity) - # test illegal values - for illegal_value in (0, 57, 'a', None): - self.failUnlessRaises(ValueError, self.s.setParity, illegal_value) - - def test_StopbitsSetting(self): - for stopbits in (1, 2): - self.s.stopbits = stopbits - # test get method - self.failUnlessEqual(self.s.stopbits, stopbits) - # test internals - self.failUnlessEqual(self.s._stopbits, stopbits) - # test illegal values - for illegal_value in (0, 3, 2.5, 57, 'a', None): - self.failUnlessRaises(ValueError, self.s.setStopbits, illegal_value) - - def test_TimeoutSetting(self): - for timeout in (None, 0, 1, 3.14159, 10, 1000, 3600): - self.s.timeout = timeout - # test get method - self.failUnlessEqual(self.s.timeout, timeout) - # test internals - self.failUnlessEqual(self.s._timeout, timeout) - # test illegal values - for illegal_value in (-1, 'a'): - self.failUnlessRaises(ValueError, self.s.setTimeout, illegal_value) - - def test_XonXoffSetting(self): - for xonxoff in (True, False): - self.s.xonxoff = xonxoff - # test get method - self.failUnlessEqual(self.s.xonxoff, xonxoff) - # test internals - self.failUnlessEqual(self.s._xonxoff, xonxoff) - # no illegal values here, normal rules for the boolean value of an - # object are used thus all objects have a truth value. - - def test_RtsCtsSetting(self): - for rtscts in (True, False): - self.s.rtscts = rtscts - # test get method - self.failUnlessEqual(self.s.rtscts, rtscts) - # test internals - self.failUnlessEqual(self.s._rtscts, rtscts) - # no illegal values here, normal rules for the boolean value of an - # object are used thus all objects have a truth value. - - def test_UnconfiguredPort(self): - # an unconfigured port cannot be opened - self.failUnlessRaises(serial.SerialException, self.s.open) - - def test_PortOpenClose(self): - self.s.port = PORT - for i in range(3): - # open the port and check flag - self.failUnless(not self.s.isOpen()) - self.s.open() - self.failUnless(self.s.isOpen()) - self.s.close() - self.failUnless(not self.s.isOpen()) - -if __name__ == '__main__': - import sys - sys.stdout.write(__doc__) - if len(sys.argv) > 1: - PORT = sys.argv[1] - sys.stdout.write("Testing port: %r\n" % PORT) - sys.argv[1:] = ['-v'] - # When this module is executed from the command-line, it runs all its tests - unittest.main() diff --git a/pyserial/examples/test_high_load.py b/pyserial/examples/test_high_load.py deleted file mode 100644 index a4d8e04..0000000 --- a/pyserial/examples/test_high_load.py +++ /dev/null @@ -1,76 +0,0 @@ -#!/usr/bin/env python -#Python Serial Port Extension for Win32, Linux, BSD, Jython -#see __init__.py -# -#(C) 2001-2003 Chris Liechti <cliechti@gmx.net> -# this is distributed under a free software license, see license.txt - -"""Some tests for the serial module. -Part of pyserial (http://pyserial.sf.net) (C)2002-2003 cliechti@gmx.net - -Intended to be run on different platforms, to ensure portability of -the code. - -For all these tests a simple hardware is required. -Loopback HW adapter: -Shortcut these pin pairs: - TX <-> RX - RTS <-> CTS - DTR <-> DSR - -On a 9 pole DSUB these are the pins (2-3) (4-6) (7-8) -""" - -import unittest, threading, time -import sys -import serial - -#on which port should the tests be performed: -PORT = 0 -BAUDRATE = 115200 -#~ BAUDRATE=9600 - -if sys.version_info >= (2, 6): - bytes_0to255 = bytes(range(256)) -else: - def data(string): return string - bytes_0to255 = ''.join([chr(x) for x in range(256)]) - - -class TestHighLoad(unittest.TestCase): - """Test sending and receiving large amount of data""" - - N = 16 - #~ N = 1 - - def setUp(self): - self.s = serial.Serial(PORT,BAUDRATE, timeout=10) - def tearDown(self): - self.s.close() - - def test0_WriteReadLoopback(self): - """Send big strings, write/read order.""" - for i in range(self.N): - q = bytes_0to255 - self.s.write(q) - self.failUnless(self.s.read(len(q))==q, "expected a '%s' which was written before" % q) - self.failUnless(self.s.inWaiting()==0, "expected empty buffer after all sent chars are read") - - def test1_WriteWriteReadLoopback(self): - """Send big strings, multiple write one read.""" - q = bytes_0to255 - for i in range(self.N): - self.s.write(q) - read = self.s.read(len(q)*self.N) - self.failUnless(read==q*self.N, "expected what was written before. got %d bytes, expected %d" % (len(read), self.N*len(q))) - self.failUnless(self.s.inWaiting()==0, "expected empty buffer after all sent chars are read") - -if __name__ == '__main__': - import sys - sys.stdout.write(__doc__) - if len(sys.argv) > 1: - PORT = sys.argv[1] - sys.stdout.write("Testing port: %r\n" % PORT) - sys.argv[1:] = ['-v'] - # When this module is executed from the command-line, it runs all its tests - unittest.main() diff --git a/pyserial/examples/test_iolib.py b/pyserial/examples/test_iolib.py deleted file mode 100644 index e740a4c..0000000 --- a/pyserial/examples/test_iolib.py +++ /dev/null @@ -1,78 +0,0 @@ -##! /usr/bin/env python -# Python Serial Port Extension for Win32, Linux, BSD, Jython -# see __init__.py -# -# (C) 2001-2008 Chris Liechti <cliechti@gmx.net> -# this is distributed under a free software license, see license.txt - -"""\ -Some tests for the serial module. -Part of pyserial (http://pyserial.sf.net) (C)2001-2009 cliechti@gmx.net - -Intended to be run on different platforms, to ensure portability of -the code. - -This modules contains test for the interaction between Serial and the io -library. This only works on Python 2.6+ that introduced the io library. - -For all these tests a simple hardware is required. -Loopback HW adapter: -Shortcut these pin pairs: - TX <-> RX - RTS <-> CTS - DTR <-> DSR - -On a 9 pole DSUB these are the pins (2-3) (4-6) (7-8) -""" - -import unittest -import sys - -if sys.version_info < (2, 6): - sys.stderr.write("""\ -============================================================================== -WARNING: this test is intended for Python 2.6 and newer where the io library -is available. This seems to be an older version of Python running. -Continuing anyway... -============================================================================== -""") - -import io -import serial - -# trick to make that this test run under 2.6 and 3.x without modification. -# problem is, io library on 2.6 does NOT accept type 'str' and 3.x doesn't -# like u'nicode' strings with the prefix and it is not providing an unicode -# function ('str' is now what 'unicode' used to be) -if sys.version_info >= (3, 0): - def unicode(x): return x - - -# on which port should the tests be performed: -PORT = 0 - -class Test_SerialAndIO(unittest.TestCase): - - def setUp(self): - self.s = serial.Serial(PORT, timeout=1) - self.io = io.TextIOWrapper(io.BufferedRWPair(self.s, self.s)) - - def tearDown(self): - self.s.close() - - def test_hello_raw(self): - self.io.write(unicode("hello\n")) - self.io.flush() # it is buffering. required to get the data out - hello = self.io.readline() - self.failUnlessEqual(hello, unicode("hello\n")) - - -if __name__ == '__main__': - import sys - sys.stdout.write(__doc__) - if len(sys.argv) > 1: - PORT = sys.argv[1] - sys.stdout.write("Testing port: %r\n" % PORT) - sys.argv[1:] = ['-v'] - # When this module is executed from the command-line, it runs all its tests - unittest.main() diff --git a/pyserial/examples/wxSerialConfigDialog.py b/pyserial/examples/wxSerialConfigDialog.py deleted file mode 100644 index 7085035..0000000 --- a/pyserial/examples/wxSerialConfigDialog.py +++ /dev/null @@ -1,260 +0,0 @@ -#!/usr/bin/env python -# generated by wxGlade 0.3.1 on Thu Oct 02 23:25:44 2003 - -#from wxPython.wx import * -import wx -import serial - -SHOW_BAUDRATE = 1<<0 -SHOW_FORMAT = 1<<1 -SHOW_FLOW = 1<<2 -SHOW_TIMEOUT = 1<<3 -SHOW_ALL = SHOW_BAUDRATE|SHOW_FORMAT|SHOW_FLOW|SHOW_TIMEOUT - -try: - enumerate -except NameError: - def enumerate(sequence): - return zip(range(len(sequence)), sequence) - -class SerialConfigDialog(wx.Dialog): - """Serial Port confiuration dialog, to be used with pyserial 2.0+ - When instantiating a class of this dialog, then the "serial" keyword - argument is mandatory. It is a reference to a serial.Serial instance. - the optional "show" keyword argument can be used to show/hide different - settings. The default is SHOW_ALL which coresponds to - SHOW_BAUDRATE|SHOW_FORMAT|SHOW_FLOW|SHOW_TIMEOUT. All constants can be - found in ths module (not the class).""" - - def __init__(self, *args, **kwds): - #grab the serial keyword and remove it from the dict - self.serial = kwds['serial'] - del kwds['serial'] - self.show = SHOW_ALL - if kwds.has_key('show'): - self.show = kwds['show'] - del kwds['show'] - # begin wxGlade: SerialConfigDialog.__init__ - # end wxGlade - kwds["style"] = wx.DEFAULT_DIALOG_STYLE - wx.Dialog.__init__(self, *args, **kwds) - self.label_2 = wx.StaticText(self, -1, "Port") - self.combo_box_port = wx.ComboBox(self, -1, choices=["dummy1", "dummy2", "dummy3", "dummy4", "dummy5"], style=wx.CB_DROPDOWN) - if self.show & SHOW_BAUDRATE: - self.label_1 = wx.StaticText(self, -1, "Baudrate") - self.choice_baudrate = wx.Choice(self, -1, choices=["choice 1"]) - if self.show & SHOW_FORMAT: - self.label_3 = wx.StaticText(self, -1, "Data Bits") - self.choice_databits = wx.Choice(self, -1, choices=["choice 1"]) - self.label_4 = wx.StaticText(self, -1, "Stop Bits") - self.choice_stopbits = wx.Choice(self, -1, choices=["choice 1"]) - self.label_5 = wx.StaticText(self, -1, "Parity") - self.choice_parity = wx.Choice(self, -1, choices=["choice 1"]) - if self.show & SHOW_TIMEOUT: - self.checkbox_timeout = wx.CheckBox(self, -1, "Use Timeout") - self.text_ctrl_timeout = wx.TextCtrl(self, -1, "") - self.label_6 = wx.StaticText(self, -1, "seconds") - if self.show & SHOW_FLOW: - self.checkbox_rtscts = wx.CheckBox(self, -1, "RTS/CTS") - self.checkbox_xonxoff = wx.CheckBox(self, -1, "Xon/Xoff") - self.button_ok = wx.Button(self, -1, "OK") - self.button_cancel = wx.Button(self, -1, "Cancel") - - self.__set_properties() - self.__do_layout() - #fill in ports and select current setting - index = 0 - self.combo_box_port.Clear() - for n in range(4): - portname = serial.device(n) - self.combo_box_port.Append(portname) - if self.serial.portstr == portname: - index = n - if self.serial.portstr is not None: - self.combo_box_port.SetValue(str(self.serial.portstr)) - else: - self.combo_box_port.SetSelection(index) - if self.show & SHOW_BAUDRATE: - #fill in badrates and select current setting - self.choice_baudrate.Clear() - for n, baudrate in enumerate(self.serial.BAUDRATES): - self.choice_baudrate.Append(str(baudrate)) - if self.serial.baudrate == baudrate: - index = n - self.choice_baudrate.SetSelection(index) - if self.show & SHOW_FORMAT: - #fill in databits and select current setting - self.choice_databits.Clear() - for n, bytesize in enumerate(self.serial.BYTESIZES): - self.choice_databits.Append(str(bytesize)) - if self.serial.bytesize == bytesize: - index = n - self.choice_databits.SetSelection(index) - #fill in stopbits and select current setting - self.choice_stopbits.Clear() - for n, stopbits in enumerate(self.serial.STOPBITS): - self.choice_stopbits.Append(str(stopbits)) - if self.serial.stopbits == stopbits: - index = n - self.choice_stopbits.SetSelection(index) - #fill in parities and select current setting - self.choice_parity.Clear() - for n, parity in enumerate(self.serial.PARITIES): - self.choice_parity.Append(str(serial.PARITY_NAMES[parity])) - if self.serial.parity == parity: - index = n - self.choice_parity.SetSelection(index) - if self.show & SHOW_TIMEOUT: - #set the timeout mode and value - if self.serial.timeout is None: - self.checkbox_timeout.SetValue(False) - self.text_ctrl_timeout.Enable(False) - else: - self.checkbox_timeout.SetValue(True) - self.text_ctrl_timeout.Enable(True) - self.text_ctrl_timeout.SetValue(str(self.serial.timeout)) - if self.show & SHOW_FLOW: - #set the rtscts mode - self.checkbox_rtscts.SetValue(self.serial.rtscts) - #set the rtscts mode - self.checkbox_xonxoff.SetValue(self.serial.xonxoff) - #attach the event handlers - self.__attach_events() - - def __set_properties(self): - # begin wxGlade: SerialConfigDialog.__set_properties - # end wxGlade - self.SetTitle("Serial Port Configuration") - if self.show & SHOW_TIMEOUT: - self.text_ctrl_timeout.Enable(0) - self.button_ok.SetDefault() - - def __do_layout(self): - # begin wxGlade: SerialConfigDialog.__do_layout - # end wxGlade - sizer_2 = wx.BoxSizer(wx.VERTICAL) - sizer_3 = wx.BoxSizer(wx.HORIZONTAL) - sizer_basics = wx.StaticBoxSizer(wx.StaticBox(self, -1, "Basics"), wx.VERTICAL) - sizer_5 = wx.BoxSizer(wx.HORIZONTAL) - sizer_5.Add(self.label_2, 1, wx.ALL|wx.ALIGN_CENTER_VERTICAL, 4) - sizer_5.Add(self.combo_box_port, 1, 0, 0) - sizer_basics.Add(sizer_5, 0, wx.RIGHT|wx.EXPAND, 0) - if self.show & SHOW_BAUDRATE: - sizer_baudrate = wx.BoxSizer(wx.HORIZONTAL) - sizer_baudrate.Add(self.label_1, 1, wx.ALL|wx.ALIGN_CENTER_VERTICAL, 4) - sizer_baudrate.Add(self.choice_baudrate, 1, wx.ALIGN_RIGHT, 0) - sizer_basics.Add(sizer_baudrate, 0, wx.EXPAND, 0) - sizer_2.Add(sizer_basics, 0, wx.EXPAND, 0) - if self.show & SHOW_FORMAT: - sizer_8 = wx.BoxSizer(wx.HORIZONTAL) - sizer_7 = wx.BoxSizer(wx.HORIZONTAL) - sizer_6 = wx.BoxSizer(wx.HORIZONTAL) - sizer_format = wx.StaticBoxSizer(wx.StaticBox(self, -1, "Data Format"), wx.VERTICAL) - sizer_6.Add(self.label_3, 1, wx.ALL|wx.ALIGN_CENTER_VERTICAL, 4) - sizer_6.Add(self.choice_databits, 1, wx.ALIGN_RIGHT, 0) - sizer_format.Add(sizer_6, 0, wx.EXPAND, 0) - sizer_7.Add(self.label_4, 1, wx.ALL|wx.ALIGN_CENTER_VERTICAL, 4) - sizer_7.Add(self.choice_stopbits, 1, wx.ALIGN_RIGHT, 0) - sizer_format.Add(sizer_7, 0, wx.EXPAND, 0) - sizer_8.Add(self.label_5, 1, wx.ALL|wx.ALIGN_CENTER_VERTICAL, 4) - sizer_8.Add(self.choice_parity, 1, wx.ALIGN_RIGHT, 0) - sizer_format.Add(sizer_8, 0, wx.EXPAND, 0) - sizer_2.Add(sizer_format, 0, wx.EXPAND, 0) - if self.show & SHOW_TIMEOUT: - sizer_timeout = wx.StaticBoxSizer(wx.StaticBox(self, -1, "Timeout"), wx.HORIZONTAL) - sizer_timeout.Add(self.checkbox_timeout, 0, wx.ALL|wx.ALIGN_CENTER_VERTICAL, 4) - sizer_timeout.Add(self.text_ctrl_timeout, 0, 0, 0) - sizer_timeout.Add(self.label_6, 0, wx.ALL|wx.ALIGN_CENTER_VERTICAL, 4) - sizer_2.Add(sizer_timeout, 0, 0, 0) - if self.show & SHOW_FLOW: - sizer_flow = wx.StaticBoxSizer(wx.StaticBox(self, -1, "Flow Control"), wx.HORIZONTAL) - sizer_flow.Add(self.checkbox_rtscts, 0, wx.ALL|wx.ALIGN_CENTER_VERTICAL, 4) - sizer_flow.Add(self.checkbox_xonxoff, 0, wx.ALL|wx.ALIGN_CENTER_VERTICAL, 4) - sizer_flow.Add((10,10), 1, wx.EXPAND, 0) - sizer_2.Add(sizer_flow, 0, wx.EXPAND, 0) - sizer_3.Add(self.button_ok, 0, 0, 0) - sizer_3.Add(self.button_cancel, 0, 0, 0) - sizer_2.Add(sizer_3, 0, wx.ALL|wx.ALIGN_RIGHT, 4) - self.SetAutoLayout(1) - self.SetSizer(sizer_2) - sizer_2.Fit(self) - sizer_2.SetSizeHints(self) - self.Layout() - - def __attach_events(self): - wx.EVT_BUTTON(self, self.button_ok.GetId(), self.OnOK) - wx.EVT_BUTTON(self, self.button_cancel.GetId(), self.OnCancel) - if self.show & SHOW_TIMEOUT: - wx.EVT_CHECKBOX(self, self.checkbox_timeout.GetId(), self.OnTimeout) - - def OnOK(self, events): - success = True - self.serial.port = str(self.combo_box_port.GetValue()) - if self.show & SHOW_BAUDRATE: - self.serial.baudrate = self.serial.BAUDRATES[self.choice_baudrate.GetSelection()] - if self.show & SHOW_FORMAT: - self.serial.bytesize = self.serial.BYTESIZES[self.choice_databits.GetSelection()] - self.serial.stopbits = self.serial.STOPBITS[self.choice_stopbits.GetSelection()] - self.serial.parity = self.serial.PARITIES[self.choice_parity.GetSelection()] - if self.show & SHOW_FLOW: - self.serial.rtscts = self.checkbox_rtscts.GetValue() - self.serial.xonxoff = self.checkbox_xonxoff.GetValue() - if self.show & SHOW_TIMEOUT: - if self.checkbox_timeout.GetValue(): - try: - self.serial.timeout = float(self.text_ctrl_timeout.GetValue()) - except ValueError: - dlg = wx.MessageDialog(self, 'Timeout must be a numeric value', - 'Value Error', wx.OK | wx.ICON_ERROR) - dlg.ShowModal() - dlg.Destroy() - success = False - else: - self.serial.timeout = None - if success: - self.EndModal(wx.ID_OK) - - def OnCancel(self, events): - self.EndModal(wx.ID_CANCEL) - - def OnTimeout(self, events): - if self.checkbox_timeout.GetValue(): - self.text_ctrl_timeout.Enable(True) - else: - self.text_ctrl_timeout.Enable(False) - -# end of class SerialConfigDialog - - -class MyApp(wx.App): - """Test code""" - def OnInit(self): - wx.InitAllImageHandlers() - - ser = serial.Serial() - print ser - #loop until cancel is pressed, old values are used as start for the next run - #show the different views, one after the other - #value are kept. - for flags in (SHOW_BAUDRATE, SHOW_FLOW, SHOW_FORMAT, SHOW_TIMEOUT, SHOW_ALL): - dialog_serial_cfg = SerialConfigDialog(None, -1, "", serial=ser, show=flags) - self.SetTopWindow(dialog_serial_cfg) - result = dialog_serial_cfg.ShowModal() - print ser - if result != wx.ID_OK: - break - #the user can play around with the values, CANCEL aborts the loop - while 1: - dialog_serial_cfg = SerialConfigDialog(None, -1, "", serial=ser) - self.SetTopWindow(dialog_serial_cfg) - result = dialog_serial_cfg.ShowModal() - print ser - if result != wx.ID_OK: - break - return 0 - -# end of class MyApp - -if __name__ == "__main__": - app = MyApp(0) - app.MainLoop() diff --git a/pyserial/examples/wxSerialConfigDialog.wxg b/pyserial/examples/wxSerialConfigDialog.wxg deleted file mode 100644 index f5e92e0..0000000 --- a/pyserial/examples/wxSerialConfigDialog.wxg +++ /dev/null @@ -1,262 +0,0 @@ -<?xml version="1.0"?> -<!-- generated by wxGlade 0.3.1 on Fri Oct 03 01:53:04 2003 --> - -<application path="D:\prog\python\pyserial_sf\pyserial\examples\wxSerialConfigDialog.py" name="app" class="MyApp" option="0" language="python" top_window="dialog_serial_cfg" encoding="ISO-8859-1" use_gettext="0" overwrite="0"> - <object class="SerialConfigDialog" name="dialog_serial_cfg" base="EditDialog"> - <style>wxDEFAULT_DIALOG_STYLE</style> - <title>Serial Port Configuration</title> - <object class="wxBoxSizer" name="sizer_2" base="EditBoxSizer"> - <orient>wxVERTICAL</orient> - <object class="sizeritem"> - <flag>wxEXPAND</flag> - <border>0</border> - <option>0</option> - <object class="wxStaticBoxSizer" name="sizer_basics" base="EditStaticBoxSizer"> - <orient>wxVERTICAL</orient> - <label>Basics</label> - <object class="sizeritem"> - <flag>wxRIGHT|wxEXPAND</flag> - <border>0</border> - <option>0</option> - <object class="wxBoxSizer" name="sizer_5" base="EditBoxSizer"> - <orient>wxHORIZONTAL</orient> - <object class="sizeritem"> - <flag>wxALL|wxALIGN_CENTER_VERTICAL</flag> - <border>4</border> - <option>1</option> - <object class="wxStaticText" name="label_2" base="EditStaticText"> - <attribute>1</attribute> - <label>Port</label> - </object> - </object> - <object class="sizeritem"> - <border>0</border> - <option>1</option> - <object class="wxComboBox" name="combo_box_port" base="EditComboBox"> - <selection>0</selection> - <choices> - <choice>dummy1</choice> - <choice>dummy2</choice> - <choice>dummy3</choice> - <choice>dummy4</choice> - <choice>dummy5</choice> - </choices> - </object> - </object> - </object> - </object> - <object class="sizeritem"> - <flag>wxEXPAND</flag> - <border>0</border> - <option>0</option> - <object class="wxBoxSizer" name="sizer_baudrate" base="EditBoxSizer"> - <orient>wxHORIZONTAL</orient> - <object class="sizeritem"> - <flag>wxALL|wxALIGN_CENTER_VERTICAL</flag> - <border>4</border> - <option>1</option> - <object class="wxStaticText" name="label_1" base="EditStaticText"> - <attribute>1</attribute> - <label>Baudrate</label> - </object> - </object> - <object class="sizeritem"> - <flag>wxALIGN_RIGHT</flag> - <border>0</border> - <option>1</option> - <object class="wxChoice" name="choice_baudrate" base="EditChoice"> - <selection>0</selection> - <choices> - <choice>choice 1</choice> - </choices> - </object> - </object> - </object> - </object> - </object> - </object> - <object class="sizeritem"> - <flag>wxEXPAND</flag> - <border>0</border> - <option>0</option> - <object class="wxStaticBoxSizer" name="sizer_format" base="EditStaticBoxSizer"> - <orient>wxVERTICAL</orient> - <label>Data Format</label> - <object class="sizeritem"> - <flag>wxEXPAND</flag> - <border>0</border> - <option>0</option> - <object class="wxBoxSizer" name="sizer_6" base="EditBoxSizer"> - <orient>wxHORIZONTAL</orient> - <object class="sizeritem"> - <flag>wxALL|wxALIGN_CENTER_VERTICAL</flag> - <border>4</border> - <option>1</option> - <object class="wxStaticText" name="label_3" base="EditStaticText"> - <attribute>1</attribute> - <label>Data Bits</label> - </object> - </object> - <object class="sizeritem"> - <flag>wxALIGN_RIGHT</flag> - <border>0</border> - <option>1</option> - <object class="wxChoice" name="choice_databits" base="EditChoice"> - <selection>0</selection> - <choices> - <choice>choice 1</choice> - </choices> - </object> - </object> - </object> - </object> - <object class="sizeritem"> - <flag>wxEXPAND</flag> - <border>0</border> - <option>0</option> - <object class="wxBoxSizer" name="sizer_7" base="EditBoxSizer"> - <orient>wxHORIZONTAL</orient> - <object class="sizeritem"> - <flag>wxALL|wxALIGN_CENTER_VERTICAL</flag> - <border>4</border> - <option>1</option> - <object class="wxStaticText" name="label_4" base="EditStaticText"> - <attribute>1</attribute> - <label>Stop Bits</label> - </object> - </object> - <object class="sizeritem"> - <flag>wxALIGN_RIGHT</flag> - <border>0</border> - <option>1</option> - <object class="wxChoice" name="choice_stopbits" base="EditChoice"> - <selection>0</selection> - <choices> - <choice>choice 1</choice> - </choices> - </object> - </object> - </object> - </object> - <object class="sizeritem"> - <flag>wxEXPAND</flag> - <border>0</border> - <option>0</option> - <object class="wxBoxSizer" name="sizer_8" base="EditBoxSizer"> - <orient>wxHORIZONTAL</orient> - <object class="sizeritem"> - <flag>wxALL|wxALIGN_CENTER_VERTICAL</flag> - <border>4</border> - <option>1</option> - <object class="wxStaticText" name="label_5" base="EditStaticText"> - <attribute>1</attribute> - <label>Parity</label> - </object> - </object> - <object class="sizeritem"> - <flag>wxALIGN_RIGHT</flag> - <border>0</border> - <option>1</option> - <object class="wxChoice" name="choice_parity" base="EditChoice"> - <selection>0</selection> - <choices> - <choice>choice 1</choice> - </choices> - </object> - </object> - </object> - </object> - </object> - </object> - <object class="sizeritem"> - <border>0</border> - <option>0</option> - <object class="wxStaticBoxSizer" name="sizer_timeout" base="EditStaticBoxSizer"> - <orient>wxHORIZONTAL</orient> - <label>Timeout</label> - <object class="sizeritem"> - <flag>wxALL|wxALIGN_CENTER_VERTICAL</flag> - <border>4</border> - <option>0</option> - <object class="wxCheckBox" name="checkbox_timeout" base="EditCheckBox"> - <label>Use Timeout</label> - </object> - </object> - <object class="sizeritem"> - <border>0</border> - <option>0</option> - <object class="wxTextCtrl" name="text_ctrl_timeout" base="EditTextCtrl"> - <disabled>1</disabled> - </object> - </object> - <object class="sizeritem"> - <flag>wxALL|wxALIGN_CENTER_VERTICAL</flag> - <border>4</border> - <option>0</option> - <object class="wxStaticText" name="label_6" base="EditStaticText"> - <attribute>1</attribute> - <label>seconds</label> - </object> - </object> - </object> - </object> - <object class="sizeritem"> - <flag>wxEXPAND</flag> - <border>0</border> - <option>0</option> - <object class="wxStaticBoxSizer" name="sizer_flow" base="EditStaticBoxSizer"> - <orient>wxHORIZONTAL</orient> - <label>Flow Control</label> - <object class="sizeritem"> - <flag>wxALL|wxALIGN_CENTER_VERTICAL</flag> - <border>4</border> - <option>0</option> - <object class="wxCheckBox" name="checkbox_rtscts" base="EditCheckBox"> - <label>RTS/CTS</label> - </object> - </object> - <object class="sizeritem"> - <flag>wxALL|wxALIGN_CENTER_VERTICAL</flag> - <border>4</border> - <option>0</option> - <object class="wxCheckBox" name="checkbox_xonxoff" base="EditCheckBox"> - <label>Xon/Xoff</label> - </object> - </object> - <object class="sizeritem"> - <flag>wxEXPAND</flag> - <border>0</border> - <option>1</option> - <object class="spacer" name="spacer" base="EditSpacer"> - <height>10</height> - <width>10</width> - </object> - </object> - </object> - </object> - <object class="sizeritem"> - <flag>wxALL|wxALIGN_RIGHT</flag> - <border>4</border> - <option>0</option> - <object class="wxBoxSizer" name="sizer_3" base="EditBoxSizer"> - <orient>wxHORIZONTAL</orient> - <object class="sizeritem"> - <border>0</border> - <option>0</option> - <object class="wxButton" name="button_ok" base="EditButton"> - <default>1</default> - <label>OK</label> - </object> - </object> - <object class="sizeritem"> - <border>0</border> - <option>0</option> - <object class="wxButton" name="button_cancel" base="EditButton"> - <label>Cancel</label> - </object> - </object> - </object> - </object> - </object> - </object> -</application> diff --git a/pyserial/examples/wxTerminal.py b/pyserial/examples/wxTerminal.py deleted file mode 100644 index 646c272..0000000 --- a/pyserial/examples/wxTerminal.py +++ /dev/null @@ -1,333 +0,0 @@ -#!/usr/bin/env python -# generated by wxGlade 0.3.1 on Fri Oct 03 23:23:45 2003 - -#from wxPython.wx import * -import wx -import wxSerialConfigDialog -import serial -import threading - -#---------------------------------------------------------------------- -# Create an own event type, so that GUI updates can be delegated -# this is required as on some platforms only the main thread can -# access the GUI without crashing. wxMutexGuiEnter/wxMutexGuiLeave -# could be used too, but an event is more elegant. - -SERIALRX = wx.NewEventType() -# bind to serial data receive events -EVT_SERIALRX = wx.PyEventBinder(SERIALRX, 0) - -class SerialRxEvent(wx.PyCommandEvent): - eventType = SERIALRX - def __init__(self, windowID, data): - wx.PyCommandEvent.__init__(self, self.eventType, windowID) - self.data = data - - def Clone(self): - self.__class__(self.GetId(), self.data) - -#---------------------------------------------------------------------- - -ID_CLEAR = wx.NewId() -ID_SAVEAS = wx.NewId() -ID_SETTINGS = wx.NewId() -ID_TERM = wx.NewId() -ID_EXIT = wx.NewId() - -NEWLINE_CR = 0 -NEWLINE_LF = 1 -NEWLINE_CRLF = 2 - -class TerminalSetup: - """Placeholder for various terminal settings. Used to pass the - options to the TerminalSettingsDialog.""" - def __init__(self): - self.echo = False - self.unprintable = False - self.newline = NEWLINE_CRLF - -class TerminalSettingsDialog(wx.Dialog): - """Simple dialog with common terminal settings like echo, newline mode.""" - - def __init__(self, *args, **kwds): - self.settings = kwds['settings'] - del kwds['settings'] - # begin wxGlade: TerminalSettingsDialog.__init__ - kwds["style"] = wx.DEFAULT_DIALOG_STYLE - wx.Dialog.__init__(self, *args, **kwds) - self.checkbox_echo = wx.CheckBox(self, -1, "Local Echo") - self.checkbox_unprintable = wx.CheckBox(self, -1, "Show unprintable characters") - self.radio_box_newline = wx.RadioBox(self, -1, "Newline Handling", choices=["CR only", "LF only", "CR+LF"], majorDimension=0, style=wx.RA_SPECIFY_ROWS) - self.button_ok = wx.Button(self, -1, "OK") - self.button_cancel = wx.Button(self, -1, "Cancel") - - self.__set_properties() - self.__do_layout() - # end wxGlade - self.__attach_events() - self.checkbox_echo.SetValue(self.settings.echo) - self.checkbox_unprintable.SetValue(self.settings.unprintable) - self.radio_box_newline.SetSelection(self.settings.newline) - - def __set_properties(self): - # begin wxGlade: TerminalSettingsDialog.__set_properties - self.SetTitle("Terminal Settings") - self.radio_box_newline.SetSelection(0) - self.button_ok.SetDefault() - # end wxGlade - - def __do_layout(self): - # begin wxGlade: TerminalSettingsDialog.__do_layout - sizer_2 = wx.BoxSizer(wx.VERTICAL) - sizer_3 = wx.BoxSizer(wx.HORIZONTAL) - sizer_4 = wx.StaticBoxSizer(wx.StaticBox(self, -1, "Input/Output"), wx.VERTICAL) - sizer_4.Add(self.checkbox_echo, 0, wx.ALL, 4) - sizer_4.Add(self.checkbox_unprintable, 0, wx.ALL, 4) - sizer_4.Add(self.radio_box_newline, 0, 0, 0) - sizer_2.Add(sizer_4, 0, wx.EXPAND, 0) - sizer_3.Add(self.button_ok, 0, 0, 0) - sizer_3.Add(self.button_cancel, 0, 0, 0) - sizer_2.Add(sizer_3, 0, wx.ALL|wx.ALIGN_RIGHT, 4) - self.SetAutoLayout(1) - self.SetSizer(sizer_2) - sizer_2.Fit(self) - sizer_2.SetSizeHints(self) - self.Layout() - # end wxGlade - - def __attach_events(self): - self.Bind(wx.EVT_BUTTON, self.OnOK, id = self.button_ok.GetId()) - self.Bind(wx.EVT_BUTTON, self.OnCancel, id = self.button_cancel.GetId()) - - def OnOK(self, events): - """Update data wil new values and close dialog.""" - self.settings.echo = self.checkbox_echo.GetValue() - self.settings.unprintable = self.checkbox_unprintable.GetValue() - self.settings.newline = self.radio_box_newline.GetSelection() - self.EndModal(wx.ID_OK) - - def OnCancel(self, events): - """Do not update data but close dialog.""" - self.EndModal(wx.ID_CANCEL) - -# end of class TerminalSettingsDialog - - -class TerminalFrame(wx.Frame): - """Simple terminal program for wxPython""" - - def __init__(self, *args, **kwds): - self.serial = serial.Serial() - self.serial.timeout = 0.5 #make sure that the alive event can be checked from time to time - self.settings = TerminalSetup() #placeholder for the settings - self.thread = None - self.alive = threading.Event() - # begin wxGlade: TerminalFrame.__init__ - kwds["style"] = wx.DEFAULT_FRAME_STYLE - wx.Frame.__init__(self, *args, **kwds) - self.text_ctrl_output = wx.TextCtrl(self, -1, "", style=wx.TE_MULTILINE|wx.TE_READONLY) - - # Menu Bar - self.frame_terminal_menubar = wx.MenuBar() - self.SetMenuBar(self.frame_terminal_menubar) - wxglade_tmp_menu = wx.Menu() - wxglade_tmp_menu.Append(ID_CLEAR, "&Clear", "", wx.ITEM_NORMAL) - wxglade_tmp_menu.Append(ID_SAVEAS, "&Save Text As...", "", wx.ITEM_NORMAL) - wxglade_tmp_menu.AppendSeparator() - wxglade_tmp_menu.Append(ID_SETTINGS, "&Port Settings...", "", wx.ITEM_NORMAL) - wxglade_tmp_menu.Append(ID_TERM, "&Terminal Settings...", "", wx.ITEM_NORMAL) - wxglade_tmp_menu.AppendSeparator() - wxglade_tmp_menu.Append(ID_EXIT, "&Exit", "", wx.ITEM_NORMAL) - self.frame_terminal_menubar.Append(wxglade_tmp_menu, "&File") - # Menu Bar end - - self.__set_properties() - self.__do_layout() - # end wxGlade - self.__attach_events() #register events - self.OnPortSettings(None) #call setup dialog on startup, opens port - if not self.alive.isSet(): - self.Close() - - def StartThread(self): - """Start the receiver thread""" - self.thread = threading.Thread(target=self.ComPortThread) - self.thread.setDaemon(1) - self.alive.set() - self.thread.start() - - def StopThread(self): - """Stop the receiver thread, wait util it's finished.""" - if self.thread is not None: - self.alive.clear() #clear alive event for thread - self.thread.join() #wait until thread has finished - self.thread = None - - def __set_properties(self): - # begin wxGlade: TerminalFrame.__set_properties - self.SetTitle("Serial Terminal") - self.SetSize((546, 383)) - # end wxGlade - - def __do_layout(self): - # begin wxGlade: TerminalFrame.__do_layout - sizer_1 = wx.BoxSizer(wx.VERTICAL) - sizer_1.Add(self.text_ctrl_output, 1, wx.EXPAND, 0) - self.SetAutoLayout(1) - self.SetSizer(sizer_1) - self.Layout() - # end wxGlade - - def __attach_events(self): - #register events at the controls - self.Bind(wx.EVT_MENU, self.OnClear, id = ID_CLEAR) - self.Bind(wx.EVT_MENU, self.OnSaveAs, id = ID_SAVEAS) - self.Bind(wx.EVT_MENU, self.OnExit, id = ID_EXIT) - self.Bind(wx.EVT_MENU, self.OnPortSettings, id = ID_SETTINGS) - self.Bind(wx.EVT_MENU, self.OnTermSettings, id = ID_TERM) - self.text_ctrl_output.Bind(wx.EVT_CHAR, self.OnKey) - self.Bind(EVT_SERIALRX, self.OnSerialRead) - self.Bind(wx.EVT_CLOSE, self.OnClose) - - def OnExit(self, event): - """Menu point Exit""" - self.Close() - - def OnClose(self, event): - """Called on application shutdown.""" - self.StopThread() #stop reader thread - self.serial.close() #cleanup - self.Destroy() #close windows, exit app - - def OnSaveAs(self, event): - """Save contents of output window.""" - filename = None - dlg = wx.FileDialog(None, "Save Text As...", ".", "", "Text File|*.txt|All Files|*", wx.SAVE) - if dlg.ShowModal() == wx.ID_OK: - filename = dlg.GetPath() - dlg.Destroy() - - if filename is not None: - f = file(filename, 'w') - text = self.text_ctrl_output.GetValue() - if type(text) == unicode: - text = text.encode("latin1") #hm, is that a good asumption? - f.write(text) - f.close() - - def OnClear(self, event): - """Clear contents of output window.""" - self.text_ctrl_output.Clear() - - def OnPortSettings(self, event=None): - """Show the portsettings dialog. The reader thread is stopped for the - settings change.""" - if event is not None: #will be none when called on startup - self.StopThread() - self.serial.close() - ok = False - while not ok: - dialog_serial_cfg = wxSerialConfigDialog.SerialConfigDialog(None, -1, "", - show=wxSerialConfigDialog.SHOW_BAUDRATE|wxSerialConfigDialog.SHOW_FORMAT|wxSerialConfigDialog.SHOW_FLOW, - serial=self.serial - ) - result = dialog_serial_cfg.ShowModal() - dialog_serial_cfg.Destroy() - #open port if not called on startup, open it on startup and OK too - if result == wx.ID_OK or event is not None: - try: - self.serial.open() - except serial.SerialException, e: - dlg = wx.MessageDialog(None, str(e), "Serial Port Error", wx.OK | wx.ICON_ERROR) - dlg.ShowModal() - dlg.Destroy() - else: - self.StartThread() - self.SetTitle("Serial Terminal on %s [%s, %s%s%s%s%s]" % ( - self.serial.portstr, - self.serial.baudrate, - self.serial.bytesize, - self.serial.parity, - self.serial.stopbits, - self.serial.rtscts and ' RTS/CTS' or '', - self.serial.xonxoff and ' Xon/Xoff' or '', - ) - ) - ok = True - else: - #on startup, dialog aborted - self.alive.clear() - ok = True - - def OnTermSettings(self, event): - """Menu point Terminal Settings. Show the settings dialog - with the current terminal settings""" - dialog = TerminalSettingsDialog(None, -1, "", settings=self.settings) - result = dialog.ShowModal() - dialog.Destroy() - - def OnKey(self, event): - """Key event handler. if the key is in the ASCII range, write it to the serial port. - Newline handling and local echo is also done here.""" - code = event.GetKeyCode() - if code < 256: #is it printable? - if code == 13: #is it a newline? (check for CR which is the RETURN key) - if self.settings.echo: #do echo if needed - self.text_ctrl_output.AppendText('\n') - if self.settings.newline == NEWLINE_CR: - self.serial.write('\r') #send CR - elif self.settings.newline == NEWLINE_LF: - self.serial.write('\n') #send LF - elif self.settings.newline == NEWLINE_CRLF: - self.serial.write('\r\n') #send CR+LF - else: - char = chr(code) - if self.settings.echo: #do echo if needed - self.text_ctrl_output.WriteText(char) - self.serial.write(char) #send the charcater - else: - print "Extra Key:", code - - def OnSerialRead(self, event): - """Handle input from the serial port.""" - text = event.data - if self.settings.unprintable: - text = ''.join([(c >= ' ') and c or '<%d>' % ord(c) for c in text]) - self.text_ctrl_output.AppendText(text) - - def ComPortThread(self): - """Thread that handles the incomming traffic. Does the basic input - transformation (newlines) and generates an SerialRxEvent""" - while self.alive.isSet(): #loop while alive event is true - text = self.serial.read(1) #read one, with timout - if text: #check if not timeout - n = self.serial.inWaiting() #look if there is more to read - if n: - text = text + self.serial.read(n) #get it - #newline transformation - if self.settings.newline == NEWLINE_CR: - text = text.replace('\r', '\n') - elif self.settings.newline == NEWLINE_LF: - pass - elif self.settings.newline == NEWLINE_CRLF: - text = text.replace('\r\n', '\n') - event = SerialRxEvent(self.GetId(), text) - self.GetEventHandler().AddPendingEvent(event) - #~ self.OnSerialRead(text) #output text in window - -# end of class TerminalFrame - - -class MyApp(wx.App): - def OnInit(self): - wx.InitAllImageHandlers() - frame_terminal = TerminalFrame(None, -1, "") - self.SetTopWindow(frame_terminal) - frame_terminal.Show(1) - return 1 - -# end of class MyApp - -if __name__ == "__main__": - app = MyApp(0) - app.MainLoop() diff --git a/pyserial/examples/wxTerminal.wxg b/pyserial/examples/wxTerminal.wxg deleted file mode 100644 index 183f876..0000000 --- a/pyserial/examples/wxTerminal.wxg +++ /dev/null @@ -1,127 +0,0 @@ -<?xml version="1.0"?> -<!-- generated by wxGlade 0.3.1 on Sat Oct 04 02:41:48 2003 --> - -<application path="D:\prog\python\pyserial_sf\pyserial\examples\wxTerminal.py" name="app" class="MyApp" option="0" language="python" top_window="frame_terminal" encoding="ISO-8859-1" use_gettext="0" overwrite="0"> - <object class="TerminalFrame" name="frame_terminal" base="EditFrame"> - <style>wxDEFAULT_FRAME_STYLE</style> - <title>Serial Terminal</title> - <menubar>1</menubar> - <size>546, 383</size> - <object class="wxBoxSizer" name="sizer_1" base="EditBoxSizer"> - <orient>wxVERTICAL</orient> - <object class="sizeritem"> - <flag>wxEXPAND</flag> - <border>0</border> - <option>1</option> - <object class="wxTextCtrl" name="text_ctrl_output" base="EditTextCtrl"> - <style>wxTE_MULTILINE|wxTE_READONLY</style> - </object> - </object> - </object> - <object class="wxMenuBar" name="frame_terminal_menubar" base="EditMenuBar"> - <menus> - <menu name="" label="&File"> - <item> - <label>&Clear</label> - <id>ID_CLEAR</id> - </item> - <item> - <label>&Save Text As...</label> - <id>ID_SAVEAS</id> - </item> - <item> - <label>---</label> - <id>---</id> - <name>---</name> - </item> - <item> - <label>&Port Settings...</label> - <id>ID_SETTINGS</id> - </item> - <item> - <label>&Terminal Settings...</label> - <id>ID_TERM</id> - </item> - <item> - <label>---</label> - <name>---</name> - </item> - <item> - <label>&Exit</label> - <id>ID_EXIT</id> - </item> - </menu> - </menus> - </object> - </object> - <object class="TerminalSettingsDialog" name="dialog_terminal_Settings" base="EditDialog"> - <style>wxDEFAULT_DIALOG_STYLE</style> - <title>Terminal Settings</title> - <object class="wxBoxSizer" name="sizer_2" base="EditBoxSizer"> - <orient>wxVERTICAL</orient> - <object class="sizeritem"> - <flag>wxEXPAND</flag> - <border>0</border> - <option>0</option> - <object class="wxStaticBoxSizer" name="sizer_4" base="EditStaticBoxSizer"> - <orient>wxVERTICAL</orient> - <label>Input/Output</label> - <object class="sizeritem"> - <flag>wxALL</flag> - <border>4</border> - <option>0</option> - <object class="wxCheckBox" name="checkbox_echo" base="EditCheckBox"> - <label>Local Echo</label> - </object> - </object> - <object class="sizeritem"> - <flag>wxALL</flag> - <border>4</border> - <option>0</option> - <object class="wxCheckBox" name="checkbox_unprintable" base="EditCheckBox"> - <label>Show unprintable characters</label> - </object> - </object> - <object class="sizeritem"> - <border>0</border> - <option>0</option> - <object class="wxRadioBox" name="radio_box_newline" base="EditRadioBox"> - <style>wxRA_SPECIFY_ROWS</style> - <selection>0</selection> - <dimension>0</dimension> - <label>Newline Handling</label> - <choices> - <choice>CR only</choice> - <choice>LF only</choice> - <choice>CR+LF</choice> - </choices> - </object> - </object> - </object> - </object> - <object class="sizeritem"> - <flag>wxALL|wxALIGN_RIGHT</flag> - <border>4</border> - <option>0</option> - <object class="wxBoxSizer" name="sizer_3" base="EditBoxSizer"> - <orient>wxHORIZONTAL</orient> - <object class="sizeritem"> - <border>0</border> - <option>0</option> - <object class="wxButton" name="button_ok" base="EditButton"> - <default>1</default> - <label>OK</label> - </object> - </object> - <object class="sizeritem"> - <border>0</border> - <option>0</option> - <object class="wxButton" name="button_cancel" base="EditButton"> - <label>Cancel</label> - </object> - </object> - </object> - </object> - </object> - </object> -</application> |