summaryrefslogtreecommitdiff
path: root/pyserial/examples
diff options
context:
space:
mode:
Diffstat (limited to 'pyserial/examples')
-rw-r--r--pyserial/examples/enhancedserial.py62
-rw-r--r--pyserial/examples/miniterm.py562
-rw-r--r--pyserial/examples/port_publisher.py451
-rw-r--r--pyserial/examples/port_publisher.sh44
-rw-r--r--pyserial/examples/scan.py30
-rw-r--r--pyserial/examples/scanlinux.py20
-rw-r--r--pyserial/examples/scanwin32.py200
-rw-r--r--pyserial/examples/setup-miniterm-py2exe.py26
-rw-r--r--pyserial/examples/setup_demo.py35
-rw-r--r--pyserial/examples/tcp_serial_redirect.py305
-rw-r--r--pyserial/examples/test.py216
-rw-r--r--pyserial/examples/test_advanced.py173
-rw-r--r--pyserial/examples/test_high_load.py76
-rw-r--r--pyserial/examples/test_iolib.py78
-rw-r--r--pyserial/examples/wxSerialConfigDialog.py260
-rw-r--r--pyserial/examples/wxSerialConfigDialog.wxg262
-rw-r--r--pyserial/examples/wxTerminal.py333
-rw-r--r--pyserial/examples/wxTerminal.wxg127
18 files changed, 0 insertions, 3260 deletions
diff --git a/pyserial/examples/enhancedserial.py b/pyserial/examples/enhancedserial.py
deleted file mode 100644
index 2c81ae1..0000000
--- a/pyserial/examples/enhancedserial.py
+++ /dev/null
@@ -1,62 +0,0 @@
-#!/usr/bin/env python
-"""Enhanced Serial Port class
-part of pyserial (http://pyserial.sf.net) (C)2002 cliechti@gmx.net
-
-another implementation of the readline and readlines method.
-this one should be more efficient because a bunch of characters are read
-on each access, but the drawback is that a timeout must be specified to
-make it work (enforced by the class __init__).
-
-this class could be enhanced with a read_until() method and more
-like found in the telnetlib.
-"""
-
-from serial import Serial
-
-class EnhancedSerial(Serial):
- def __init__(self, *args, **kwargs):
- #ensure that a reasonable timeout is set
- timeout = kwargs.get('timeout',0.1)
- if timeout < 0.01: timeout = 0.1
- kwargs['timeout'] = timeout
- Serial.__init__(self, *args, **kwargs)
- self.buf = ''
-
- def readline(self, maxsize=None, timeout=1):
- """maxsize is ignored, timeout in seconds is the max time that is way for a complete line"""
- tries = 0
- while 1:
- self.buf += self.read(512)
- pos = self.buf.find('\n')
- if pos >= 0:
- line, self.buf = self.buf[:pos+1], self.buf[pos+1:]
- return line
- tries += 1
- if tries * self.timeout > timeout:
- break
- line, self.buf = self.buf, ''
- return line
-
- def readlines(self, sizehint=None, timeout=1):
- """read all lines that are available. abort after timout
- when no more data arrives."""
- lines = []
- while 1:
- line = self.readline(timeout=timeout)
- if line:
- lines.append(line)
- if not line or line[-1:] != '\n':
- break
- return lines
-
-if __name__=='__main__':
- #do some simple tests with a Loopback HW (see test.py for details)
- PORT = 0
- #test, only with Loopback HW (shortcut RX/TX pins (3+4 on DSUB 9 and 25) )
- s = EnhancedSerial(PORT)
- #write out some test data lines
- s.write('\n'.join("hello how are you".split()))
- #and read them back
- print s.readlines()
- #this one should print an empty list
- print s.readlines(timeout=0.4)
diff --git a/pyserial/examples/miniterm.py b/pyserial/examples/miniterm.py
deleted file mode 100644
index 48d07a7..0000000
--- a/pyserial/examples/miniterm.py
+++ /dev/null
@@ -1,562 +0,0 @@
-#!/usr/bin/env python
-
-# Very simple serial terminal
-# (C)2002-2009 Chris Liechti <cliechti@gmx.net>
-
-# Input characters are sent directly (only LF -> CR/LF/CRLF translation is
-# done), received characters are displayed as is (or escaped trough pythons
-# repr, useful for debug purposes)
-
-
-import sys, os, serial, threading
-
-EXITCHARCTER = '\x1d' # GS/CTRL+]
-MENUCHARACTER = '\x14' # Menu: CTRL+T
-
-
-def key_description(character):
- """generate a readable description for a key"""
- ascii_code = ord(character)
- if ascii_code < 32:
- return 'Ctrl+%c' % (ord('@') + ascii_code)
- else:
- return repr(character)
-
-# help text, starts with blank line! it's a function so that the current values
-# for the shortcut keys is used and not the value at program start
-def get_help_text():
- return """
---- pySerial - miniterm - help
----
---- %(exit)-8s Exit program
---- %(menu)-8s Menu escape key, followed by:
---- Menu keys:
---- %(itself)-8s Send the menu character itself to remote
---- %(exchar)-8s Send the exit character to remote
---- %(info)-8s Show info
---- %(upload)-8s Upload file (prompt will be shown)
---- Toggles:
---- %(rts)s RTS %(echo)s local echo
---- %(dtr)s DTR %(break)s BREAK
---- %(lfm)s line feed %(repr)s Cycle repr mode
----
---- Port settings (%(menu)s followed by the following):
---- 7 8 set data bits
---- n e o s m change parity (None, Even, Odd, Space, Mark)
---- 1 2 3 set stop bits (1, 2, 1.5)
---- b change baud rate
---- x X disable/enable software flow control
---- r R disable/enable hardware flow control
-""" % {
- 'exit': key_description(EXITCHARCTER),
- 'menu': key_description(MENUCHARACTER),
- 'rts': key_description('\x12'),
- 'repr': key_description('\x01'),
- 'dtr': key_description('\x04'),
- 'lfm': key_description('\x0c'),
- 'break': key_description('\x02'),
- 'echo': key_description('\x05'),
- 'info': key_description('\x09'),
- 'upload': key_description('\x15'),
- 'itself': key_description(MENUCHARACTER),
- 'exchar': key_description(EXITCHARCTER),
-}
-
-# first choose a platform dependant way to read single characters from the console
-global console
-
-if os.name == 'nt':
- import msvcrt
- class Console:
- def __init__(self):
- pass
-
- def setup(self):
- pass # Do nothing for 'nt'
-
- def cleanup(self):
- pass # Do nothing for 'nt'
-
- def getkey(self):
- while 1:
- z = msvcrt.getch()
- if z == '\0' or z == '\xe0': #functions keys
- msvcrt.getch()
- else:
- if z == '\r':
- return '\n'
- return z
-
- console = Console()
-
-elif os.name == 'posix':
- import termios, sys, os
- class Console:
- def __init__(self):
- self.fd = sys.stdin.fileno()
-
- def setup(self):
- self.old = termios.tcgetattr(self.fd)
- new = termios.tcgetattr(self.fd)
- new[3] = new[3] & ~termios.ICANON & ~termios.ECHO & ~termios.ISIG
- new[6][termios.VMIN] = 1
- new[6][termios.VTIME] = 0
- termios.tcsetattr(self.fd, termios.TCSANOW, new)
- #s = '' # We'll save the characters typed and add them to the pool.
-
- def getkey(self):
- c = os.read(self.fd, 1)
- return c
-
- def cleanup(self):
- termios.tcsetattr(self.fd, termios.TCSAFLUSH, self.old)
-
- console = Console()
-
- def cleanup_console():
- console.cleanup()
-
- console.setup()
- sys.exitfunc = cleanup_console #terminal modes have to be restored on exit...
-
-else:
- raise "Sorry no implementation for your platform (%s) available." % sys.platform
-
-
-CONVERT_CRLF = 2
-CONVERT_CR = 1
-CONVERT_LF = 0
-NEWLINE_CONVERISON_MAP = ('\n', '\r', '\r\n')
-LF_MODES = ('LF', 'CR', 'CR/LF')
-
-REPR_MODES = ('raw', 'some control', 'all control', 'hex')
-
-class Miniterm:
- def __init__(self, port, baudrate, parity, rtscts, xonxoff, echo=False, convert_outgoing=CONVERT_CRLF, repr_mode=0):
- self.serial = serial.Serial(port, baudrate, parity=parity, rtscts=rtscts, xonxoff=xonxoff, timeout=0.7)
- self.echo = echo
- self.repr_mode = repr_mode
- self.convert_outgoing = convert_outgoing
- self.newline = NEWLINE_CONVERISON_MAP[self.convert_outgoing]
- self.dtr_state = True
- self.rts_state = True
- self.break_state = False
-
- def start(self):
- self.alive = True
- # start serial->console thread
- self.receiver_thread = threading.Thread(target=self.reader)
- self.receiver_thread.setDaemon(1)
- self.receiver_thread.start()
- # enter console->serial loop
- self.transmitter_thread = threading.Thread(target=self.writer)
- self.transmitter_thread.setDaemon(1)
- self.transmitter_thread.start()
-
- def stop(self):
- self.alive = False
-
- def join(self, transmit_only=False):
- self.transmitter_thread.join()
- if not transmit_only:
- self.receiver_thread.join()
-
- def dump_port_settings(self):
- sys.stderr.write("\n--- Settings: %s %s,%s,%s,%s\n" % (
- self.serial.portstr,
- self.serial.baudrate,
- self.serial.bytesize,
- self.serial.parity,
- self.serial.stopbits,
- ))
- sys.stderr.write('--- RTS %s\n' % (self.rts_state and 'active' or 'inactive'))
- sys.stderr.write('--- DTR %s\n' % (self.dtr_state and 'active' or 'inactive'))
- sys.stderr.write('--- BREAK %s\n' % (self.break_state and 'active' or 'inactive'))
- sys.stderr.write('--- software flow control %s\n' % (self.serial.xonxoff and 'active' or 'inactive'))
- sys.stderr.write('--- hardware flow control %s\n' % (self.serial.rtscts and 'active' or 'inactive'))
- sys.stderr.write('--- data escaping: %s\n' % (REPR_MODES[self.repr_mode],))
- sys.stderr.write('--- linefeed: %s\n' % (LF_MODES[self.convert_outgoing],))
-
- def reader(self):
- """loop and copy serial->console"""
- while self.alive:
- data = self.serial.read(1)
-
- if self.repr_mode == 0:
- # direct output, just have to care about newline setting
- if data == '\r' and self.convert_outgoing == CONVERT_CR:
- sys.stdout.write('\n')
- else:
- sys.stdout.write(data)
- elif self.repr_mode == 1:
- # escape non-printable, let pass newlines
- if self.convert_outgoing == CONVERT_CRLF and data in '\r\n':
- if data == '\n':
- sys.stdout.write('\n')
- elif data == '\r':
- pass
- elif data == '\n' and self.convert_outgoing == CONVERT_LF:
- sys.stdout.write('\n')
- elif data == '\r' and self.convert_outgoing == CONVERT_CR:
- sys.stdout.write('\n')
- else:
- sys.stdout.write(repr(data)[1:-1])
- elif self.repr_mode == 2:
- # escape all non-printable, including newline
- sys.stdout.write(repr(data)[1:-1])
- elif self.repr_mode == 3:
- # escape everything (hexdump)
- for character in data:
- sys.stdout.write("%s " % character.encode('hex'))
- sys.stdout.flush()
-
-
- def writer(self):
- """loop and copy console->serial until EXITCHARCTER character is
- found. when MENUCHARACTER is found, interpret the next key
- locally.
- """
- menu_active = False
- try:
- while self.alive:
- try:
- c = console.getkey()
- except KeyboardInterrupt:
- c = '\x03'
- if menu_active:
- if c == MENUCHARACTER or c == EXITCHARCTER: # Menu character again/exit char -> send itself
- self.serial.write(c) # send character
- if self.echo:
- sys.stdout.write(c)
- elif c == '\x15': # CTRL+U -> upload file
- sys.stderr.write('\n--- File to upload: ')
- sys.stderr.flush()
- console.cleanup()
- filename = sys.stdin.readline().rstrip('\r\n')
- if filename:
- try:
- file = open(filename, 'r')
- sys.stderr.write('--- Sending file %s ---\n' % filename)
- while True:
- line = file.readline().rstrip('\r\n')
- if not line:
- break
- self.serial.write(line)
- self.serial.write('\r\n')
- # Wait for output buffer to drain.
- self.serial.flush()
- sys.stderr.write('.') # Progress indicator.
- sys.stderr.write('\n--- File %s sent ---\n' % filename)
- except IOError, e:
- sys.stderr.write('--- ERROR opening file %s: %s ---\n' % (filename, e))
- console.setup()
- elif c in '\x08hH?': # CTRL+H, h, H, ? -> Show help
- sys.stderr.write(get_help_text())
- elif c == '\x12': # CTRL+R -> Toggle RTS
- self.rts_state = not self.rts_state
- self.serial.setRTS(self.rts_state)
- sys.stderr.write('--- RTS %s ---\n' % (self.rts_state and 'active' or 'inactive'))
- elif c == '\x04': # CTRL+D -> Toggle DTR
- self.dtr_state = not self.dtr_state
- self.serial.setDTR(self.dtr_state)
- sys.stderr.write('--- DTR %s ---\n' % (self.dtr_state and 'active' or 'inactive'))
- elif c == '\x02': # CTRL+B -> toggle BREAK condition
- self.break_state = not self.break_state
- self.serial.setBreak(self.break_state)
- sys.stderr.write('--- BREAK %s ---\n' % (self.break_state and 'active' or 'inactive'))
- elif c == '\x05': # CTRL+E -> toggle local echo
- self.echo = not self.echo
- sys.stderr.write('--- local echo %s ---\n' % (self.echo and 'active' or 'inactive'))
- elif c == '\x09': # CTRL+I -> info
- self.dump_port_settings()
- elif c == '\x01': # CTRL+A -> cycle escape mode
- self.repr_mode += 1
- if self.repr_mode > 3:
- self.repr_mode = 0
- sys.stderr.write('--- escape data: %s ---\n' % (
- REPR_MODES[self.repr_mode],
- ))
- elif c == '\x0c': # CTRL+L -> cycle linefeed mode
- self.convert_outgoing += 1
- if self.convert_outgoing > 2:
- self.convert_outgoing = 0
- self.newline = NEWLINE_CONVERISON_MAP[self.convert_outgoing]
- sys.stderr.write('--- line feed %s ---\n' % (
- LF_MODES[self.convert_outgoing],
- ))
- #~ elif c in 'pP': # P -> change port XXX reader thread would exit
- elif c in 'bB': # B -> change baudrate
- sys.stderr.write('\n--- Baudrate: ')
- sys.stderr.flush()
- console.cleanup()
- backup = self.serial.baudrate
- try:
- self.serial.baudrate = int(sys.stdin.readline().strip())
- except ValueError, e:
- sys.stderr.write('--- ERROR setting baudrate: %s ---\n' % (e,))
- self.serial.baudrate = backup
- else:
- self.dump_port_settings()
- console.setup()
- elif c == '8': # 8 -> change to 8 bits
- self.serial.bytesize = serial.EIGHTBITS
- self.dump_port_settings()
- elif c == '7': # 7 -> change to 8 bits
- self.serial.bytesize = serial.SEVENBITS
- self.dump_port_settings()
- elif c in 'eE': # E -> change to even parity
- self.serial.parity = serial.PARITY_EVEN
- self.dump_port_settings()
- elif c in 'oO': # O -> change to odd parity
- self.serial.parity = serial.PARITY_ODD
- self.dump_port_settings()
- elif c in 'mM': # M -> change to mark parity
- self.serial.parity = serial.PARITY_MARK
- self.dump_port_settings()
- elif c in 'sS': # S -> change to space parity
- self.serial.parity = serial.PARITY_SPACE
- self.dump_port_settings()
- elif c in 'nN': # N -> change to no parity
- self.serial.parity = serial.PARITY_NONE
- self.dump_port_settings()
- elif c == '1': # 1 -> change to 1 stop bits
- self.serial.stopbits = serial.STOPBITS_ONE
- self.dump_port_settings()
- elif c == '2': # 2 -> change to 2 stop bits
- self.serial.stopbits = serial.STOPBITS_TWO
- self.dump_port_settings()
- elif c == '3': # 3 -> change to 1.5 stop bits
- self.serial.stopbits = serial.STOPBITS_ONE_POINT_FIVE
- self.dump_port_settings()
- elif c in 'xX': # X -> change software flow control
- self.serial.xonxoff = (c == 'X')
- self.dump_port_settings()
- elif c in 'rR': # R -> change hardware flow control
- self.serial.rtscts = (c == 'R')
- self.dump_port_settings()
- else:
- sys.stderr.write('--- unknown menu character %s --\n' % key_description(c))
- menu_active = False
- elif c == MENUCHARACTER: # next char will be for menu
- menu_active = True
- elif c == EXITCHARCTER:
- self.stop()
- break # exit app
- elif c == '\n':
- self.serial.write(self.newline) # send newline character(s)
- if self.echo:
- sys.stdout.write(c) # local echo is a real newline in any case
- sys.stdout.flush()
- else:
- self.serial.write(c) # send character
- if self.echo:
- sys.stdout.write(c)
- sys.stdout.flush()
- except:
- self.alive = False
- raise
-
-def main():
- import optparse
-
- parser = optparse.OptionParser(
- usage = "%prog [options] [port [baudrate]]",
- description = "Miniterm - A simple terminal program for the serial port."
- )
-
- parser.add_option("-p", "--port",
- dest = "port",
- help = "port, a number (default 0) or a device name (deprecated option)",
- default = None
- )
-
- parser.add_option("-b", "--baud",
- dest = "baudrate",
- action = "store",
- type = 'int',
- help = "set baud rate, default %default",
- default = 9600
- )
-
- parser.add_option("--parity",
- dest = "parity",
- action = "store",
- help = "set parity, one of [N, E, O, S, M], default=N",
- default = 'N'
- )
-
- parser.add_option("-e", "--echo",
- dest = "echo",
- action = "store_true",
- help = "enable local echo (default off)",
- default = False
- )
-
- parser.add_option("--rtscts",
- dest = "rtscts",
- action = "store_true",
- help = "enable RTS/CTS flow control (default off)",
- default = False
- )
-
- parser.add_option("--xonxoff",
- dest = "xonxoff",
- action = "store_true",
- help = "enable software flow control (default off)",
- default = False
- )
-
- parser.add_option("--cr",
- dest = "cr",
- action = "store_true",
- help = "do not send CR+LF, send CR only",
- default = False
- )
-
- parser.add_option("--lf",
- dest = "lf",
- action = "store_true",
- help = "do not send CR+LF, send LF only",
- default = False
- )
-
- parser.add_option("-D", "--debug",
- dest = "repr_mode",
- action = "count",
- help = """debug received data (escape non-printable chars)
---debug can be given multiple times:
-0: just print what is received
-1: escape non-printable characters, do newlines as unusual
-2: escape non-printable characters, newlines too
-3: hex dump everything""",
- default = 0
- )
-
- parser.add_option("--rts",
- dest = "rts_state",
- action = "store",
- type = 'int',
- help = "set initial RTS line state (possible values: 0, 1)",
- default = None
- )
-
- parser.add_option("--dtr",
- dest = "dtr_state",
- action = "store",
- type = 'int',
- help = "set initial DTR line state (possible values: 0, 1)",
- default = None
- )
-
- parser.add_option("-q", "--quiet",
- dest = "quiet",
- action = "store_true",
- help = "suppress non error messages",
- default = False
- )
-
- parser.add_option("--exit-char",
- dest = "exit_char",
- action = "store",
- type = 'int',
- help = "ASCII code of special character that is used to exit the application",
- default = 0x1d
- )
-
- parser.add_option("--menu-char",
- dest = "menu_char",
- action = "store",
- type = 'int',
- help = "ASCII code of special character that is used to control miniterm (menu)",
- default = 0x14
- )
-
- (options, args) = parser.parse_args()
-
- options.parity = options.parity.upper()
- if options.parity not in 'NEOSM':
- parser.error("invalid parity")
-
- if options.cr and options.lf:
- parser.error("only one of --cr or --lf can be specified")
-
- if options.dtr_state is not None and options.rts_state is not None and options.dtr_state == options.rts_state:
- parser.error('--exit-char can not be the same as --menu-char')
-
- global EXITCHARCTER, MENUCHARACTER
- EXITCHARCTER = chr(options.exit_char)
- MENUCHARACTER = chr(options.menu_char)
-
- port = options.port
- baudrate = options.baudrate
- if args:
- if options.port is not None:
- parser.error("no arguments are allowed, options only when --port is given")
- port = args.pop(0)
- if args:
- try:
- baudrate = int(args[0])
- except ValueError:
- parser.error("baud rate must be a number, not %r" % args[0])
- args.pop(0)
- if args:
- parser.error("too many arguments")
- else:
- if port is None: port = 0
-
- convert_outgoing = CONVERT_CRLF
- if options.cr:
- convert_outgoing = CONVERT_CR
- elif options.lf:
- convert_outgoing = CONVERT_LF
-
- try:
- miniterm = Miniterm(
- port,
- baudrate,
- options.parity,
- rtscts=options.rtscts,
- xonxoff=options.xonxoff,
- echo=options.echo,
- convert_outgoing=convert_outgoing,
- repr_mode=options.repr_mode,
- )
- except serial.SerialException:
- sys.stderr.write("could not open port %r\n" % port)
- sys.exit(1)
-
- if not options.quiet:
- sys.stderr.write('--- Miniterm on %s: %d,%s,%s,%s ---\n' % (
- miniterm.serial.portstr,
- miniterm.serial.baudrate,
- miniterm.serial.bytesize,
- miniterm.serial.parity,
- miniterm.serial.stopbits,
- ))
- sys.stderr.write('--- Quit: %s | Menu: %s | Help: %s followed by %s ---\n' % (
- key_description(EXITCHARCTER),
- key_description(MENUCHARACTER),
- key_description(MENUCHARACTER),
- key_description('\x08'),
- ))
-
- if options.dtr_state is not None:
- if not options.quiet:
- sys.stderr.write('--- forcing DTR %s\n' % (options.dtr_state and 'active' or 'inactive'))
- miniterm.serial.setDTR(options.dtr_state)
- miniterm.dtr_state = options.dtr_state
- if options.rts_state is not None:
- if not options.quiet:
- sys.stderr.write('--- forcing RTS %s\n' % (options.rts_state and 'active' or 'inactive'))
- miniterm.serial.setRTS(options.rts_state)
- miniterm.rts_state = options.rts_state
-
- miniterm.start()
- miniterm.join(True)
- if not options.quiet:
- sys.stderr.write("\n--- exit ---\n")
- miniterm.join()
-
-
-if __name__ == '__main__':
- main()
diff --git a/pyserial/examples/port_publisher.py b/pyserial/examples/port_publisher.py
deleted file mode 100644
index e695a98..0000000
--- a/pyserial/examples/port_publisher.py
+++ /dev/null
@@ -1,451 +0,0 @@
-#! /usr/bin/env python
-"""\
-Multi-port serial<->TCP/IP forwarder.
-- check existence of serial port periodically
-- start/stop forwarders
-- each forwarder creates a server socket and opens the serial port
-- serial ports are opened only once. network connect/disconnect
- does not influence serial port
-- only one client per connection
-"""
-import sys, os, time
-import socket
-import select
-import serial
-import traceback
-import avahi
-import dbus
-
-class ZeroconfService:
- """\
- A simple class to publish a network service with zeroconf using avahi.
- """
-
- def __init__(self, name, port, stype="_http._tcp",
- domain="", host="", text=""):
- self.name = name
- self.stype = stype
- self.domain = domain
- self.host = host
- self.port = port
- self.text = text
- self.group = None
-
- def publish(self):
- bus = dbus.SystemBus()
- server = dbus.Interface(
- bus.get_object(
- avahi.DBUS_NAME,
- avahi.DBUS_PATH_SERVER
- ),
- avahi.DBUS_INTERFACE_SERVER
- )
-
- g = dbus.Interface(
- bus.get_object(
- avahi.DBUS_NAME,
- server.EntryGroupNew()
- ),
- avahi.DBUS_INTERFACE_ENTRY_GROUP
- )
-
- g.AddService(avahi.IF_UNSPEC, avahi.PROTO_UNSPEC, dbus.UInt32(0),
- self.name, self.stype, self.domain, self.host,
- dbus.UInt16(self.port), self.text)
-
- g.Commit()
- self.group = g
-
- def unpublish(self):
- if self.group is not None:
- self.group.Reset()
- self.group = None
-
- def __str__(self):
- return "%r @ %s:%s (%s)" % (self.name, self.host, self.port, self.stype)
-
-
-
-class Forwarder(ZeroconfService):
- """\
- Single port serial<->TCP/IP forarder that depends on an external select
- loop. Zeroconf publish/unpublish on open/close.
- """
-
- def __init__(self, device, name, network_port, on_close=None):
- ZeroconfService.__init__(self, name, network_port, stype='_serial_port._tcp')
- self.alive = False
- self.network_port = network_port
- self.on_close = on_close
- self.device = device
- self.serial = serial.Serial()
- self.serial.port = device
- self.serial.baudrate = 115200
- self.serial.timeout = 0
- self.socket = None
- self.server_socket = None
-
- def __del__(self):
- try:
- if self.alive: self.close()
- except:
- pass # XXX errors on shutdown
-
- def open(self):
- """open serial port, start network server and publish service"""
- self.buffer_net2ser = ''
- self.buffer_ser2net = ''
-
- # open serial port
- try:
- self.serial.open()
- self.serial.setRTS(False)
- except Exception, msg:
- self.handle_serial_error(msg)
-
- # start the socket server
- self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.server_socket.setsockopt(
- socket.SOL_SOCKET,
- socket.SO_REUSEADDR,
- self.server_socket.getsockopt(
- socket.SOL_SOCKET,
- socket.SO_REUSEADDR
- ) | 1
- )
- self.server_socket.setblocking(0)
- try:
- self.server_socket.bind( ('', self.network_port) )
- self.server_socket.listen(1)
- except socket.error, msg:
- self.handle_server_error()
- #~ raise
- if not options.quiet:
- print "%s: Waiting for connection on %s..." % (self.device, self.network_port)
-
- # zeroconfig
- self.publish()
-
- # now we are ready
- self.alive = True
-
- def close(self):
- """Close all resources and unpublish service"""
- if not options.quiet:
- print "%s: closing..." % (self.device, )
- self.alive = False
- self.unpublish()
- if self.server_socket: self.server_socket.close()
- if self.socket:
- self.handle_disconnect()
- self.serial.close()
- if self.on_close is not None:
- # ensure it is only called once
- callback = self.on_close
- self.on_close = None
- callback(self)
-
- def update_select_maps(self, read_map, write_map, error_map):
- """Update dictionaries for select call. insert fd->callback mapping"""
- if self.alive:
- # always handle serial port reads
- read_map[self.serial] = self.handle_serial_read
- error_map[self.serial] = self.handle_serial_error
- # handle serial port writes if buffer is not empty
- if self.buffer_net2ser:
- write_map[self.serial] = self.handle_serial_write
- # handle network
- if self.socket is not None:
- # handle socket if connected
- # only read from network if the internal buffer is not
- # already filled. the TCP flow control will hold back data
- if len(self.buffer_net2ser) < 2048:
- read_map[self.socket] = self.handle_socket_read
- # only check for write readiness when there is data
- if self.buffer_ser2net:
- write_map[self.socket] = self.handle_socket_write
- error_map[self.socket] = self.handle_socket_error
- else:
- # no connection, ensure clear buffer
- self.buffer_ser2net = ''
- # check the server socket
- read_map[self.server_socket] = self.handle_connect
- error_map[self.server_socket] = self.handle_server_error
-
-
- def handle_serial_read(self):
- """Reading from serial port"""
- try:
- data = os.read(self.serial.fileno(), 1024)
- if data:
- # store data in buffer if there is a client connected
- if self.socket is not None:
- self.buffer_ser2net += data
- else:
- self.handle_serial_error()
- except Exception, msg:
- self.handle_serial_error(msg)
-
- def handle_serial_write(self):
- """Writing to serial port"""
- try:
- # write a chunk
- n = os.write(self.serial.fileno(), self.buffer_net2ser)
- # and see how large that chunk was, remove that from buffer
- self.buffer_net2ser = self.buffer_net2ser[n:]
- except Exception, msg:
- self.handle_serial_error(msg)
-
- def handle_serial_error(self, error=None):
- """Serial port error"""
- # terminate connection
- self.close()
-
- def handle_socket_read(self):
- """Read from socket"""
- try:
- # read a chunk from the serial port
- data = self.socket.recv(1024)
- if data:
- # add data to buffer
- self.buffer_net2ser += data
- else:
- # empty read indicates disconnection
- self.handle_disconnect()
- except socket.error:
- self.handle_socket_error()
-
- def handle_socket_write(self):
- """Write to socket"""
- try:
- # write a chunk
- count = self.socket.send(self.buffer_ser2net)
- # and remove the sent data from the buffer
- self.buffer_ser2net = self.buffer_ser2net[count:]
- except socket.error:
- self.handle_socket_error()
-
- def handle_socket_error(self):
- """Socket connection fails"""
- self.handle_disconnect()
-
- def handle_connect(self):
- """Server socket gets a connection"""
- # accept a connection in any case, close connection
- # below if already busy
- connection, addr = self.server_socket.accept()
- if self.socket is None:
- self.socket = connection
- self.socket.setblocking(0)
- if not options.quiet:
- print '%s: Connected by %s:%s' % (self.device, addr[0], addr[1])
- else:
- # reject connection if there is already one
- connection.close()
- if not options.quiet:
- print '%s: Rejecting connect from %s:%s' % (self.device, addr[0], addr[1])
-
- def handle_server_error(self):
- """Socker server fails"""
- self.close()
-
- def handle_disconnect(self):
- """Socket gets disconnected"""
- # clear send buffer
- self.buffer_ser2net = ''
- # close network connection
- if self.socket is not None:
- self.socket.close()
- self.socket = None
- if not options.quiet:
- print '%s: Disconnected' % self.device
-
-
-def test():
- service = ZeroconfService(name="TestService", port=3000)
- service.publish()
- raw_input("Press any key to unpublish the service ")
- service.unpublish()
-
-
-if __name__ == '__main__':
- import optparse
-
- parser = optparse.OptionParser(usage="""\
-%prog [options]
-
-Announce the existence of devices using zeroconf and provide
-a TCP/IP <-> serial port gateway.
-
-Note that the TCP/IP server is not protected. Everyone can connect
-to it!
-
-If running as daemon, write to syslog. Otherwise write to stdout.
-""")
-
- parser.add_option("-q", "--quiet", dest="quiet", action="store_true",
- help="suppress non error messages", default=False)
-
- parser.add_option("-o", "--logfile", dest="log_file",
- help="write messages file instead of stdout", default=None, metavar="FILE")
-
- parser.add_option("-d", "--daemon", dest="daemonize", action="store_true",
- help="start as daemon", default=False)
-
- parser.add_option("", "--pidfile", dest="pid_file",
- help="specify a name for the PID file", default=None, metavar="FILE")
-
- (options, args) = parser.parse_args()
-
- # redirect output if specified
- if options.log_file is not None:
- class WriteFlushed:
- def __init__(self, fileobj):
- self.fileobj = fileobj
- def write(self, s):
- self.fileobj.write(s)
- self.fileobj.flush()
- def close(self):
- self.fileobj.close()
- sys.stdout = sys.stderr = WriteFlushed(open(options.log_file, 'a'))
- # atexit.register(lambda: sys.stdout.close())
-
- if options.daemonize:
- # if running as daemon is requested, do the fork magic
- # options.quiet = True
- import pwd
- # do the UNIX double-fork magic, see Stevens' "Advanced
- # Programming in the UNIX Environment" for details (ISBN 0201563177)
- try:
- pid = os.fork()
- if pid > 0:
- # exit first parent
- sys.exit(0)
- except OSError, e:
- sys.stderr.write("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror))
- sys.exit(1)
-
- # decouple from parent environment
- os.chdir("/") # don't prevent unmounting....
- os.setsid()
- os.umask(0)
-
- # do second fork
- try:
- pid = os.fork()
- if pid > 0:
- # exit from second parent, print eventual PID before
- # print "Daemon PID %d" % pid
- if options.pid_file is not None:
- open(options.pid_file,'w').write("%d"%pid)
- sys.exit(0)
- except OSError, e:
- sys.stderr.write("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror))
- sys.exit(1)
-
- if options.log_file is None:
- import syslog
- syslog.openlog("serial port publisher")
- # redirect output to syslog
- class WriteToSysLog:
- def __init__(self):
- self.buffer = ''
- def write(self, s):
- self.buffer += s
- if '\n' in self.buffer:
- output, self.buffer = self.buffer.split('\n', 1)
- syslog.syslog(output)
- def flush(self):
- syslog.syslog(self.buffer)
- self.buffer = ''
- def close(self):
- self.flush()
- sys.stdout = sys.stderr = WriteToSysLog()
-
- # ensure the that the daemon runs a normal user, if run as root
- #if os.getuid() == 0:
- # name, passwd, uid, gid, desc, home, shell = pwd.getpwnam('someuser')
- # os.setgid(gid) # set group first
- # os.setuid(uid) # set user
-
- # keep the published stuff in a dictionary
- published = {}
- # prepare list of device names (hard coded)
- device_list = ['/dev/ttyUSB%d' % p for p in range(8)]
- # get a nice hostname
- hostname = socket.gethostname()
-
- def unpublish(forwarder):
- """when forwarders die, we need to unregister them"""
- try:
- del published[forwarder.device]
- except KeyError:
- pass
- else:
- if not options.quiet: print "unpublish: %s" % (forwarder)
-
- alive = True
- next_check = 0
- # main loop
- while alive:
- try:
- # if it is time, check for serial port devices
- now = time.time()
- if now > next_check:
- next_check = now + 5
- # check each device
- for device in device_list:
- # if it appeared
- if os.path.exists(device):
- if device not in published:
- num = int(device[-1])
- published[device] = Forwarder(
- device,
- "%s on %s" % (device, hostname),
- 7000+num,
- on_close=unpublish
- )
- if not options.quiet: print "publish: %s" % (published[device])
- published[device].open()
- else:
- # or when it disappeared
- if device in published:
- if not options.quiet: print "unpublish: %s" % (published[device])
- published[device].close()
- try:
- del published[device]
- except KeyError:
- pass
-
- # select_start = time.time()
- read_map = {}
- write_map = {}
- error_map = {}
- for publisher in published.values():
- publisher.update_select_maps(read_map, write_map, error_map)
- try:
- readers, writers, errors = select.select(
- read_map.keys(),
- write_map.keys(),
- error_map.keys(),
- 5
- )
- except select.error, err:
- if err[0] != EINTR:
- raise
- # select_end = time.time()
- # print "select used %.3f s" % (select_end - select_start)
- for reader in readers:
- read_map[reader]()
- for writer in writers:
- write_map[writer]()
- for error in errors:
- error_map[error]()
- # print "operation used %.3f s" % (time.time() - select_end)
- except KeyboardInterrupt:
- alive = False
- except SystemExit:
- raise
- except:
- raise
- traceback.print_exc()
diff --git a/pyserial/examples/port_publisher.sh b/pyserial/examples/port_publisher.sh
deleted file mode 100644
index 50d4f17..0000000
--- a/pyserial/examples/port_publisher.sh
+++ /dev/null
@@ -1,44 +0,0 @@
-#! /bin/sh
-# daemon starter script
-# based on skeleton from Debian GNU/Linux
-# cliechti at gmx.net
-
-PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin
-DAEMON=/usr/local/bin/port_publisher.py
-NAME=port_publisher
-DESC="serial port avahi device publisher"
-
-test -f $DAEMON || exit 0
-
-set -e
-
-case "$1" in
- start)
- echo -n "Starting $DESC: "
- $DAEMON --daemon --pidfile /var/run/$NAME.pid
- echo "$NAME."
- ;;
- stop)
- echo -n "Stopping $DESC: "
- start-stop-daemon --stop --quiet --pidfile /var/run/$NAME.pid
- # \ --exec $DAEMON
- echo "$NAME."
- ;;
- restart|force-reload)
- echo -n "Restarting $DESC: "
- start-stop-daemon --stop --quiet --pidfile \
- /var/run/$NAME.pid
- # --exec $DAEMON
- sleep 1
- $DAEMON --daemon --pidfile /var/run/$NAME.pid
- echo "$NAME."
- ;;
- *)
- N=/etc/init.d/$NAME
- echo "Usage: $N {start|stop|restart|force-reload}" >&2
- exit 1
- ;;
-esac
-
-exit 0
-
diff --git a/pyserial/examples/scan.py b/pyserial/examples/scan.py
deleted file mode 100644
index 82c5458..0000000
--- a/pyserial/examples/scan.py
+++ /dev/null
@@ -1,30 +0,0 @@
-#! /usr/bin/env python
-"""\
-Scan for serial ports.
-
-Part of pySerial (http://pyserial.sf.net)
-(C) 2002-2003 <cliechti@gmx.net>
-
-The scan function of this module tries to open each port number
-from 0 to 255 and it builds a list of those ports where this was
-successful.
-"""
-
-import serial
-
-def scan():
- """scan for available ports. return a list of tuples (num, name)"""
- available = []
- for i in range(256):
- try:
- s = serial.Serial(i)
- available.append( (i, s.portstr))
- s.close() # explicit close 'cause of delayed GC in java
- except serial.SerialException:
- pass
- return available
-
-if __name__=='__main__':
- print "Found ports:"
- for n,s in scan():
- print "(%d) %s" % (n,s)
diff --git a/pyserial/examples/scanlinux.py b/pyserial/examples/scanlinux.py
deleted file mode 100644
index 7cf6383..0000000
--- a/pyserial/examples/scanlinux.py
+++ /dev/null
@@ -1,20 +0,0 @@
-#! /usr/bin/env python
-"""\
-Scan for serial ports. Linux specific variant that also includes USB/Serial
-adapters.
-
-Part of pySerial (http://pyserial.sf.net)
-(C) 2009 <cliechti@gmx.net>
-"""
-
-import serial
-import glob
-
-def scan():
- """scan for available ports. return a list of device names."""
- return glob.glob('/dev/ttyS*') + glob.glob('/dev/ttyUSB*')
-
-if __name__=='__main__':
- print "Found ports:"
- for name in scan():
- print name
diff --git a/pyserial/examples/scanwin32.py b/pyserial/examples/scanwin32.py
deleted file mode 100644
index 6d58d89..0000000
--- a/pyserial/examples/scanwin32.py
+++ /dev/null
@@ -1,200 +0,0 @@
-import ctypes
-import re
-
-def ValidHandle(value):
- if value == 0:
- raise ctypes.WinError()
- return value
-
-NULL = 0
-HDEVINFO = ctypes.c_int
-BOOL = ctypes.c_int
-CHAR = ctypes.c_char
-PCTSTR = ctypes.c_char_p
-HWND = ctypes.c_uint
-DWORD = ctypes.c_ulong
-PDWORD = ctypes.POINTER(DWORD)
-ULONG = ctypes.c_ulong
-ULONG_PTR = ctypes.POINTER(ULONG)
-#~ PBYTE = ctypes.c_char_p
-PBYTE = ctypes.c_void_p
-
-class GUID(ctypes.Structure):
- _fields_ = [
- ('Data1', ctypes.c_ulong),
- ('Data2', ctypes.c_ushort),
- ('Data3', ctypes.c_ushort),
- ('Data4', ctypes.c_ubyte*8),
- ]
- def __str__(self):
- return "{%08x-%04x-%04x-%s-%s}" % (
- self.Data1,
- self.Data2,
- self.Data3,
- ''.join(["%02x" % d for d in self.Data4[:2]]),
- ''.join(["%02x" % d for d in self.Data4[2:]]),
- )
-
-class SP_DEVINFO_DATA(ctypes.Structure):
- _fields_ = [
- ('cbSize', DWORD),
- ('ClassGuid', GUID),
- ('DevInst', DWORD),
- ('Reserved', ULONG_PTR),
- ]
- def __str__(self):
- return "ClassGuid:%s DevInst:%s" % (self.ClassGuid, self.DevInst)
-PSP_DEVINFO_DATA = ctypes.POINTER(SP_DEVINFO_DATA)
-
-class SP_DEVICE_INTERFACE_DATA(ctypes.Structure):
- _fields_ = [
- ('cbSize', DWORD),
- ('InterfaceClassGuid', GUID),
- ('Flags', DWORD),
- ('Reserved', ULONG_PTR),
- ]
- def __str__(self):
- return "InterfaceClassGuid:%s Flags:%s" % (self.InterfaceClassGuid, self.Flags)
-
-PSP_DEVICE_INTERFACE_DATA = ctypes.POINTER(SP_DEVICE_INTERFACE_DATA)
-
-PSP_DEVICE_INTERFACE_DETAIL_DATA = ctypes.c_void_p
-
-class dummy(ctypes.Structure):
- _fields_=[("d1",ctypes.DWORD), ("d2",ctypes.CHAR)]
-SIZEOF_SP_DEVICE_INTERFACE_DETAIL_DATA_A = ctypes.sizeof(dummy)
-
-SetupDiDestroyDeviceInfoList = ctypes.windll.setupapi.SetupDiDestroyDeviceInfoList
-SetupDiDestroyDeviceInfoList.argtypes = [HDEVINFO]
-SetupDiDestroyDeviceInfoList.restype = BOOL
-
-SetupDiGetClassDevs = ctypes.windll.setupapi.SetupDiGetClassDevsA
-SetupDiGetClassDevs.argtypes = [ctypes.POINTER(GUID), PCTSTR, HWND, DWORD]
-SetupDiGetClassDevs.restype = ValidHandle # HDEVINFO
-
-SetupDiEnumDeviceInterfaces = ctypes.windll.setupapi.SetupDiEnumDeviceInterfaces
-SetupDiEnumDeviceInterfaces.argtypes = [HDEVINFO, PSP_DEVINFO_DATA, ctypes.POINTER(GUID), DWORD, PSP_DEVICE_INTERFACE_DATA]
-SetupDiEnumDeviceInterfaces.restype = BOOL
-
-SetupDiGetDeviceInterfaceDetail = ctypes.windll.setupapi.SetupDiGetDeviceInterfaceDetailA
-SetupDiGetDeviceInterfaceDetail.argtypes = [HDEVINFO, PSP_DEVICE_INTERFACE_DATA, PSP_DEVICE_INTERFACE_DETAIL_DATA, DWORD, PDWORD, PSP_DEVINFO_DATA]
-SetupDiGetDeviceInterfaceDetail.restype = BOOL
-
-SetupDiGetDeviceRegistryProperty = ctypes.windll.setupapi.SetupDiGetDeviceRegistryPropertyA
-SetupDiGetDeviceRegistryProperty.argtypes = [HDEVINFO, PSP_DEVINFO_DATA, DWORD, PDWORD, PBYTE, DWORD, PDWORD]
-SetupDiGetDeviceRegistryProperty.restype = BOOL
-
-
-GUID_CLASS_COMPORT = GUID(0x86e0d1e0L, 0x8089, 0x11d0,
- (ctypes.c_ubyte*8)(0x9c, 0xe4, 0x08, 0x00, 0x3e, 0x30, 0x1f, 0x73))
-
-DIGCF_PRESENT = 2
-DIGCF_DEVICEINTERFACE = 16
-INVALID_HANDLE_VALUE = 0
-ERROR_INSUFFICIENT_BUFFER = 122
-SPDRP_HARDWAREID = 1
-SPDRP_FRIENDLYNAME = 12
-ERROR_NO_MORE_ITEMS = 259
-
-def comports(available_only=True):
- """This generator scans the device registry for com ports and yields port, desc, hwid.
- If available_only is true only return currently existing ports."""
- flags = DIGCF_DEVICEINTERFACE
- if available_only:
- flags |= DIGCF_PRESENT
- g_hdi = SetupDiGetClassDevs(ctypes.byref(GUID_CLASS_COMPORT), None, NULL, flags);
- #~ for i in range(256):
- for dwIndex in range(256):
- did = SP_DEVICE_INTERFACE_DATA()
- did.cbSize = ctypes.sizeof(did)
-
- if not SetupDiEnumDeviceInterfaces(
- g_hdi,
- None,
- ctypes.byref(GUID_CLASS_COMPORT),
- dwIndex,
- ctypes.byref(did)
- ):
- if ctypes.GetLastError() != ERROR_NO_MORE_ITEMS:
- raise ctypes.WinError()
- break
-
- dwNeeded = DWORD()
- # get the size
- if not SetupDiGetDeviceInterfaceDetail(
- g_hdi,
- ctypes.byref(did),
- None, 0, ctypes.byref(dwNeeded),
- None
- ):
- # Ignore ERROR_INSUFFICIENT_BUFFER
- if ctypes.GetLastError() != ERROR_INSUFFICIENT_BUFFER:
- raise ctypes.WinError()
- # allocate buffer
- class SP_DEVICE_INTERFACE_DETAIL_DATA_A(ctypes.Structure):
- _fields_ = [
- ('cbSize', DWORD),
- ('DevicePath', CHAR*(dwNeeded.value - ctypes.sizeof(DWORD))),
- ]
- def __str__(self):
- return "DevicePath:%s" % (self.DevicePath,)
- idd = SP_DEVICE_INTERFACE_DETAIL_DATA_A()
- idd.cbSize = SIZEOF_SP_DEVICE_INTERFACE_DETAIL_DATA_A
- devinfo = SP_DEVINFO_DATA()
- devinfo.cbSize = ctypes.sizeof(devinfo)
- if not SetupDiGetDeviceInterfaceDetail(
- g_hdi,
- ctypes.byref(did),
- ctypes.byref(idd), dwNeeded, None,
- ctypes.byref(devinfo)
- ):
- raise ctypes.WinError()
-
- # hardware ID
- szHardwareID = ctypes.create_string_buffer('\0' * 250)
- if not SetupDiGetDeviceRegistryProperty(
- g_hdi,
- ctypes.byref(devinfo),
- SPDRP_HARDWAREID,
- None,
- ctypes.byref(szHardwareID), ctypes.sizeof(szHardwareID) - 1,
- None
- ):
- # Ignore ERROR_INSUFFICIENT_BUFFER
- if ctypes.GetLastError() != ERROR_INSUFFICIENT_BUFFER:
- raise ctypes.WinError()
-
- # friendly name
- szFriendlyName = ctypes.create_string_buffer('\0' * 250)
- if not SetupDiGetDeviceRegistryProperty(
- g_hdi,
- ctypes.byref(devinfo),
- SPDRP_FRIENDLYNAME,
- None,
- ctypes.byref(szFriendlyName), ctypes.sizeof(szFriendlyName) - 1,
- None
- ):
- # Ignore ERROR_INSUFFICIENT_BUFFER
- if ctypes.GetLastError() != ERROR_INSUFFICIENT_BUFFER:
- raise ctypes.WinError()
- try:
- port_name = re.search(r"\((.*)\)", szFriendlyName.value).group(1)
- except AttributeError,msg:
- port_name = "???"
- yield port_name, szFriendlyName.value, szHardwareID.value
-
- SetupDiDestroyDeviceInfoList(g_hdi)
-
-
-if __name__ == '__main__':
- import serial
- for port, desc, hwid in comports():
- try:
- print "%s: %s (%s)" % (port, desc, hwid)
- print " "*10, serial.Serial(port) #test open
- except serial.serialutil.SerialException:
- print "can't be openend."
- # list of all ports the system knows
- print "-"*60
- for port, desc, hwid in comports(False):
- print "%-10s: %s (%s)" % (port, desc, hwid)
diff --git a/pyserial/examples/setup-miniterm-py2exe.py b/pyserial/examples/setup-miniterm-py2exe.py
deleted file mode 100644
index e935cf0..0000000
--- a/pyserial/examples/setup-miniterm-py2exe.py
+++ /dev/null
@@ -1,26 +0,0 @@
-# setup script for py2exe to create the miniterm.exe
-# $Id: setup-miniterm-py2exe.py,v 1.1 2005-09-21 19:51:19 cliechti Exp $
-
-from distutils.core import setup
-import glob, sys, py2exe, os
-
-sys.path.append('..')
-
-sys.argv.extend("py2exe --bundle 1".split())
-
-setup(
- name='miniterm',
- #~ version='0.5',
- zipfile=None,
- options = {"py2exe":
- {
- 'dist_dir': 'bin',
- 'excludes': ['javax.comm'],
- 'compressed': 1,
- }
- },
- console = [
- #~ "miniterm_exe_wrapper.py",
- "miniterm.py",
- ],
-)
diff --git a/pyserial/examples/setup_demo.py b/pyserial/examples/setup_demo.py
deleted file mode 100644
index 854c0b9..0000000
--- a/pyserial/examples/setup_demo.py
+++ /dev/null
@@ -1,35 +0,0 @@
-# This is a setup.py example script for the use with py2exe
-from distutils.core import setup
-import py2exe
-import sys, os
-
-#this script is only useful for py2exe so just run that distutils command.
-#that allows to run it with a simple double click.
-sys.argv.append('py2exe')
-
-#get an icon from somewhere.. the python installation should have one:
-icon = os.path.join(os.path.dirname(sys.executable), 'py.ico')
-
-setup(
- options = {'py2exe': {
- 'excludes': ['javax.comm'],
- 'optimize': 2,
- 'dist_dir': 'dist',
- }
- },
-
- name = "wxTerminal",
- windows = [
- {
- 'script': "wxTerminal.py",
- 'icon_resources': [(0x0004, icon)]
- },
- ],
- zipfile = "stuff.lib",
-
- description = "Simple serial terminal application",
- version = "0.1",
- author = "Chris Liechti",
- author_email = "cliechti@gmx.net",
- url = "http://pyserial.sf.net",
-)
diff --git a/pyserial/examples/tcp_serial_redirect.py b/pyserial/examples/tcp_serial_redirect.py
deleted file mode 100644
index e62d8a5..0000000
--- a/pyserial/examples/tcp_serial_redirect.py
+++ /dev/null
@@ -1,305 +0,0 @@
-#!/usr/bin/env python
-
-# (C) 2002-2009 Chris Liechti <cliechti@gmx.net>
-# redirect data from a TCP/IP connection to a serial port and vice versa
-# requires Python 2.2 'cause socket.sendall is used
-
-
-import sys, os, serial, threading, socket, codecs
-
-try:
- True
-except NameError:
- True = 1
- False = 0
-
-class Redirector:
- def __init__(self, serial, socket, ser_newline=None, net_newline=None, spy=False):
- self.serial = serial
- self.socket = socket
- self.ser_newline = ser_newline
- self.net_newline = net_newline
- self.spy = spy
-
- def shortcut(self):
- """connect the serial port to the tcp port by copying everything
- from one side to the other"""
- self.alive = True
- self.thread_read = threading.Thread(target=self.reader)
- self.thread_read.setDaemon(1)
- self.thread_read.start()
- self.writer()
-
- def reader(self):
- """loop forever and copy serial->socket"""
- while self.alive:
- try:
- data = self.serial.read(1) # read one, blocking
- n = self.serial.inWaiting() # look if there is more
- if n:
- data = data + self.serial.read(n) # and get as much as possible
- if data:
- # the spy shows what's on the serial port, so log it before converting newlines
- if self.spy:
- sys.stdout.write(codecs.escape_encode(data)[0])
- sys.stdout.flush()
- if self.ser_newline and self.net_newline:
- # do the newline conversion
- # XXX fails for CR+LF in input when it is cut in half at the begin or end of the string
- data = net_newline.join(data.split(ser_newline))
- self.socket.sendall(data) # send it over TCP
- except socket.error, msg:
- sys.stderr.write('ERROR: %s\n' % msg)
- # probably got disconnected
- break
- self.alive = False
-
- def writer(self):
- """loop forever and copy socket->serial"""
- while self.alive:
- try:
- data = self.socket.recv(1024)
- if not data:
- break
- if self.ser_newline and self.net_newline:
- # do the newline conversion
- # XXX fails for CR+LF in input when it is cut in half at the begin or end of the string
- data = ser_newline.join(data.split(net_newline))
- self.serial.write(data) # get a bunch of bytes and send them
- # the spy shows what's on the serial port, so log it after converting newlines
- if self.spy:
- sys.stdout.write(codecs.escape_encode(data)[0])
- sys.stdout.flush()
- except socket.error, msg:
- sys.stderr.write('ERROR: %s\n' % msg)
- # probably got disconnected
- break
- self.alive = False
- self.thread_read.join()
-
- def stop(self):
- """Stop copying"""
- if self.alive:
- self.alive = False
- self.thread_read.join()
-
-
-if __name__ == '__main__':
- import optparse
-
- parser = optparse.OptionParser(
- usage = "%prog [options] [port [baudrate]]",
- description = "Simple Serial to Network (TCP/IP) redirector.",
- epilog = """\
-NOTE: no security measures are implemented. Anyone can remotely connect
-to this service over the network.
-
-Only one connection at once is supported. When the connection is terminated
-it waits for the next connect.
-""")
-
- parser.add_option("-q", "--quiet",
- dest = "quiet",
- action = "store_true",
- help = "suppress non error messages",
- default = False
- )
-
- parser.add_option("--spy",
- dest = "spy",
- action = "store_true",
- help = "peek at the communication and print all data to the console",
- default = False
- )
-
- group = optparse.OptionGroup(parser,
- "Serial Port",
- "Serial port settings"
- )
- parser.add_option_group(group)
-
- group.add_option("-p", "--port",
- dest = "port",
- help = "port, a number (default 0) or a device name",
- default = None
- )
-
- group.add_option("-b", "--baud",
- dest = "baudrate",
- action = "store",
- type = 'int',
- help = "set baud rate, default: %default",
- default = 9600
- )
-
- group.add_option("", "--parity",
- dest = "parity",
- action = "store",
- help = "set parity, one of [N, E, O], default=%default",
- default = 'N'
- )
-
- group.add_option("--rtscts",
- dest = "rtscts",
- action = "store_true",
- help = "enable RTS/CTS flow control (default off)",
- default = False
- )
-
- group.add_option("--xonxoff",
- dest = "xonxoff",
- action = "store_true",
- help = "enable software flow control (default off)",
- default = False
- )
-
- group.add_option("--rts",
- dest = "rts_state",
- action = "store",
- type = 'int',
- help = "set initial RTS line state (possible values: 0, 1)",
- default = None
- )
-
- group.add_option("--dtr",
- dest = "dtr_state",
- action = "store",
- type = 'int',
- help = "set initial DTR line state (possible values: 0, 1)",
- default = None
- )
-
- group = optparse.OptionGroup(parser,
- "Network settings",
- "Network configuration."
- )
- parser.add_option_group(group)
-
- group.add_option("-P", "--localport",
- dest = "local_port",
- action = "store",
- type = 'int',
- help = "local TCP port",
- default = 7777
- )
-
- group = optparse.OptionGroup(parser,
- "Newline Settings",
- "Convert newlines between network and serial port. Conversion is normally disabled and can be enabled by --convert."
- )
- parser.add_option_group(group)
-
- group.add_option("-c", "--convert",
- dest = "convert",
- action = "store_true",
- help = "enable newline conversion (default off)",
- default = False
- )
-
- group.add_option("--net-nl",
- dest = "net_newline",
- action = "store",
- help = "type of newlines that are expected on the network (default: %default)",
- default = "LF"
- )
-
- group.add_option("--ser-nl",
- dest = "ser_newline",
- action = "store",
- help = "type of newlines that are expected on the serial port (default: %default)",
- default = "CR+LF"
- )
-
- (options, args) = parser.parse_args()
-
- # get port and baud rate from command line arguments or the option switches
- port = options.port
- baudrate = options.baudrate
- if args:
- if options.port is not None:
- parser.error("no arguments are allowed, options only when --port is given")
- port = args.pop(0)
- if args:
- try:
- baudrate = int(args[0])
- except ValueError:
- parser.error("baud rate must be a number, not %r" % args[0])
- args.pop(0)
- if args:
- parser.error("too many arguments")
- else:
- if port is None: port = 0
-
- # check newline modes for network connection
- mode = options.net_newline.upper()
- if mode == 'CR':
- net_newline = '\r'
- elif mode == 'LF':
- net_newline = '\n'
- elif mode == 'CR+LF' or mode == 'CRLF':
- net_newline = '\r\n'
- else:
- parser.error("Invalid value for --net-nl. Valid are 'CR', 'LF' and 'CR+LF'/'CRLF'.")
-
- # check newline modes for serial connection
- mode = options.ser_newline.upper()
- if mode == 'CR':
- ser_newline = '\r'
- elif mode == 'LF':
- ser_newline = '\n'
- elif mode == 'CR+LF' or mode == 'CRLF':
- ser_newline = '\r\n'
- else:
- parser.error("Invalid value for --ser-nl. Valid are 'CR', 'LF' and 'CR+LF'/'CRLF'.")
-
- # connect to serial port
- ser = serial.Serial()
- ser.port = port
- ser.baudrate = baudrate
- ser.parity = options.parity
- ser.rtscts = options.rtscts
- ser.xonxoff = options.xonxoff
- ser.timeout = 1 # required so that the reader thread can exit
-
- if not options.quiet:
- sys.stderr.write("--- TCP/IP to Serial redirector --- type Ctrl-C / BREAK to quit\n")
- sys.stderr.write("--- %s %s,%s,%s,%s ---\n" % (ser.portstr, ser.baudrate, 8, ser.parity, 1))
-
- try:
- ser.open()
- except serial.SerialException, e:
- sys.stderr.write("Could not open serial port %s: %s\n" % (ser.portstr, e))
- sys.exit(1)
-
- if options.rts_state is not None:
- ser.setRTS(options.rts_state)
-
- if options.dtr_state is not None:
- ser.setDTR(options.dtr_state)
-
- srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- srv.bind( ('', options.local_port) )
- srv.listen(1)
- while 1:
- try:
- sys.stderr.write("Waiting for connection on %s...\n" % options.local_port)
- connection, addr = srv.accept()
- sys.stderr.write('Connected by %s\n' % (addr,))
- # enter console->serial loop
- r = Redirector(
- ser,
- connection,
- options.convert and ser_newline or None,
- options.convert and net_newline or None,
- options.spy,
- )
- r.shortcut()
- if options.spy: sys.stdout.write('\n')
- sys.stderr.write('Disconnected\n')
- connection.close()
- except socket.error, msg:
- sys.stderr.write('ERROR: %s\n' % msg)
-
- sys.stderr.write('\n--- exit ---\n')
-
diff --git a/pyserial/examples/test.py b/pyserial/examples/test.py
deleted file mode 100644
index 666275f..0000000
--- a/pyserial/examples/test.py
+++ /dev/null
@@ -1,216 +0,0 @@
-#! /usr/bin/env python
-# Python Serial Port Extension for Win32, Linux, BSD, Jython
-# see __init__.py
-#
-# (C) 2001-2008 Chris Liechti <cliechti@gmx.net>
-# this is distributed under a free software license, see license.txt
-
-"""\
-Some tests for the serial module.
-Part of pyserial (http://pyserial.sf.net) (C)2001-2008 cliechti@gmx.net
-
-Intended to be run on different platforms, to ensure portability of
-the code.
-
-For all these tests a simple hardware is required.
-Loopback HW adapter:
-Shortcut these pin pairs:
- TX <-> RX
- RTS <-> CTS
- DTR <-> DSR
-
-On a 9 pole DSUB these are the pins (2-3) (4-6) (7-8)
-"""
-
-import unittest, threading, time
-import sys
-import serial
-
-# on which port should the tests be performed:
-PORT=0
-
-if sys.version_info >= (3, 0):
- def data(string):
- return bytes(string, 'latin1')
- bytes_0to255 = [bytes([x]) for x in range(256)]
-else:
- def data(string): return string
- bytes_0to255 = [chr(x) for x in range(256)]
-
-
-class Test4_Nonblocking(unittest.TestCase):
- """Test with timeouts"""
- timeout = 0
-
- def setUp(self):
- self.s = serial.Serial(PORT, timeout=self.timeout)
-
- def tearDown(self):
- self.s.close()
-
- def test0_Messy(self):
- """NonBlocking (timeout=0)"""
- # this is only here to write out the message in verbose mode
- # because Test3 and Test4 print the same messages
-
- def test1_ReadEmpty(self):
- """timeout: After port open, the input buffer must be empty"""
- self.failUnlessEqual(self.s.read(1), data(''), "expected empty buffer")
-
- def test2_Loopback(self):
- """timeout: each sent character should return (binary test).
- this is also a test for the binary capability of a port."""
- for c in bytes_0to255:
- self.s.write(c)
- # there might be a small delay until the character is ready (especially on win32)
- time.sleep(0.02)
- self.failUnlessEqual(self.s.inWaiting(), 1, "expected exactly one character for inWainting()")
- self.failUnlessEqual(self.s.read(1), c, "expected a '%s' which was written before" % c)
- self.failUnlessEqual(self.s.read(1), data(''), "expected empty buffer after all sent chars are read")
-
- def test2_LoopbackTimeout(self):
- """timeout: test the timeout/immediate return.
- partial results should be returned."""
- self.s.write(data("HELLO"))
- time.sleep(0.1) # there might be a small delay until the character is ready (especially on win32)
- # read more characters as are available to run in the timeout
- self.failUnlessEqual(self.s.read(10), data('HELLO'), "expected the 'HELLO' which was written before")
- self.failUnlessEqual(self.s.read(1), data(''), "expected empty buffer after all sent chars are read")
-
-
-class Test3_Timeout(Test4_Nonblocking):
- """Same tests as the NonBlocking ones but this time with timeout"""
- timeout = 1
-
- def test0_Messy(self):
- """Blocking (timeout=1)"""
- # this is only here to write out the message in verbose mode
- # because Test3 and Test4 print the same messages
-
-class SendEvent(threading.Thread):
- def __init__(self, serial, delay=1):
- threading.Thread.__init__(self)
- self.serial = serial
- self.delay = delay
- self.x = threading.Event()
- self.stopped = 0
- self.start()
-
- def run(self):
- time.sleep(self.delay)
- if not self.stopped:
- self.serial.write(data("E"))
- self.x.set()
-
- def isSet(self):
- return self.x.isSet()
-
- def stop(self):
- self.stopped = 1
- self.x.wait()
-
-class Test1_Forever(unittest.TestCase):
- """Tests a port with no timeout. These tests require that a
- character is sent after some time to stop the test, this is done
- through the SendEvent class and the Loopback HW."""
- def setUp(self):
- self.s = serial.Serial(PORT, timeout=None)
- self.event = SendEvent(self.s)
-
- def tearDown(self):
- self.event.stop()
- self.s.close()
-
- def test2_ReadEmpty(self):
- """no timeout: after port open, the input buffer must be empty (read).
- a character is sent after some time to terminate the test (SendEvent)."""
- c = self.s.read(1)
- if not (self.event.isSet() and c == data('E')):
- self.fail("expected marker")
-
-class Test2_Forever(unittest.TestCase):
- """Tests a port with no timeout"""
- def setUp(self):
- self.s = serial.Serial(PORT, timeout=None)
-
- def tearDown(self):
- self.s.close()
-
- def test1_inWaitingEmpty(self):
- """no timeout: after port open, the input buffer must be empty (inWaiting)"""
- self.failUnlessEqual(self.s.inWaiting(), 0, "expected empty buffer")
-
- def test2_Loopback(self):
- """no timeout: each sent character should return (binary test).
- this is also a test for the binary capability of a port."""
- for c in bytes_0to255:
- self.s.write(c)
- # there might be a small delay until the character is ready (especially on win32)
- time.sleep(0.02)
- self.failUnlessEqual(self.s.inWaiting(), 1, "expected exactly one character for inWainting()")
- self.failUnlessEqual(self.s.read(1), c, "expected an '%s' which was written before" % c)
- self.failUnlessEqual(self.s.inWaiting(), 0, "expected empty buffer after all sent chars are read")
-
-
-class Test0_DataWires(unittest.TestCase):
- """Test modem control lines"""
- def setUp(self):
- self.s = serial.Serial(PORT)
-
- def tearDown(self):
- self.s.close()
-
- def test1_RTS(self):
- """Test RTS/CTS"""
- self.s.setRTS(0)
- time.sleep(0.1)
- self.failUnless(not self.s.getCTS(), "CTS -> 0")
- self.s.setRTS(1)
- time.sleep(0.1)
- self.failUnless(self.s.getCTS(), "CTS -> 1")
-
- def test2_DTR(self):
- """Test DTR/DSR"""
- self.s.setDTR(0)
- time.sleep(0.1)
- self.failUnless(not self.s.getDSR(), "DSR -> 0")
- self.s.setDTR(1)
- time.sleep(0.1)
- self.failUnless(self.s.getDSR(), "DSR -> 1")
-
- def test3_RI(self):
- """Test RI"""
- self.failUnless(not self.s.getRI(), "RI -> 0")
-
-class Test_MoreTimeouts(unittest.TestCase):
- """Test with timeouts"""
- def setUp(self):
- self.s = serial.Serial() # create an closed serial port
-
- def tearDown(self):
- self.s.close()
-
- def test_WriteTimeout(self):
- """Test write() timeout."""
- # use xonxoff setting and the loop-back adapter to switch traffic on hold
- self.s.port = PORT
- self.s.writeTimeout = 1
- self.s.xonxoff = 1
- self.s.open()
- self.s.write(serial.XOFF)
- time.sleep(0.5) # some systems need a little delay so that they can react on XOFF
- t1 = time.time()
- self.failUnlessRaises(serial.SerialTimeoutException, self.s.write, data("timeout please"*100))
- t2 = time.time()
- self.failUnless( 0.9 <= (t2-t1) < 2.1, "Timeout not in the given interval (%s)" % (t2-t1))
-
-
-if __name__ == '__main__':
- import sys
- sys.stdout.write(__doc__)
- if len(sys.argv) > 1:
- PORT = sys.argv[1]
- sys.stdout.write("Testing port: %r\n" % PORT)
- sys.argv[1:] = ['-v']
- # When this module is executed from the command-line, it runs all its tests
- unittest.main()
diff --git a/pyserial/examples/test_advanced.py b/pyserial/examples/test_advanced.py
deleted file mode 100644
index a19361c..0000000
--- a/pyserial/examples/test_advanced.py
+++ /dev/null
@@ -1,173 +0,0 @@
-#!/usr/bin/env python
-# needs at least python 2.2.3
-
-# Python Serial Port Extension for Win32, Linux, BSD, Jython
-# see __init__.py
-#
-# (C) 2001-2003 Chris Liechti <cliechti@gmx.net>
-# this is distributed under a free software license, see license.txt
-
-"""\
-Some tests for the serial module.
-Part of pyserial (http://pyserial.sf.net) (C)2002 cliechti@gmx.net
-
-Intended to be run on different platforms, to ensure portability of
-the code.
-
-These tests open a serial port and change all the settings on the fly.
-If the port is really correctly configured cannot be determined - that
-would require external hardware or a nullmodem cable and an other
-serial port library... Thus it mainly tests that all features are
-correctly implemented and that the interface does what it should.
-
-"""
-
-import unittest
-import serial
-
-# on which port should the tests be performed:
-PORT=0
-
-class Test_ChangeAttributes(unittest.TestCase):
- """Test with timeouts"""
-
- def setUp(self):
- self.s = serial.Serial() # create a closed serial port
-
- def tearDown(self):
- self.s.close()
-
- def test_PortSetting(self):
- self.s.port = PORT
- # portstr has to be set
- if isinstance(PORT, str):
- self.failUnlessEqual(self.s.portstr.lower(), PORT.lower())
- else:
- self.failUnlessEqual(self.s.portstr, serial.device(PORT))
- # test internals
- self.failUnlessEqual(self.s._port, PORT)
- # test on the fly change
- self.s.open()
- self.failUnless(self.s.isOpen())
- self.s.port = 0
- self.failUnless(self.s.isOpen())
- self.failUnlessEqual(self.s.port, 0)
- self.failUnlessEqual(self.s.portstr, serial.device(0))
- try:
- self.s.port = 1
- except serial.SerialException: # port not available on system
- pass # can't test on this machine...
- else:
- self.failUnless(self.s.isOpen())
- self.failUnlessEqual(self.s.port, 1)
- self.failUnlessEqual(self.s.portstr, serial.device(1))
-
- def test_BaudrateSetting(self):
- self.s.port = PORT
- self.s.open()
- for baudrate in (300, 9600, 19200, 115200):
- self.s.baudrate = baudrate
- # test get method
- self.failUnlessEqual(self.s.baudrate, baudrate)
- # test internals
- self.failUnlessEqual(self.s._baudrate, baudrate)
- # test illegal values
- for illegal_value in (-300, -1, 'a', None):
- self.failUnlessRaises(ValueError, self.s.setBaudrate, illegal_value)
-
- # skip this test as pyserial now tries to set even non standard baud rates.
- # therefore the test can not choose a value that fails on any system.
- def disabled_test_BaudrateSetting2(self):
- # test illegal values, depending on machine/port some of these may be valid...
- self.s.port = PORT
- self.s.open()
- for illegal_value in (500000,576000,921600,92160):
- self.failUnlessRaises(ValueError, self.s.setBaudrate, illegal_value)
-
- def test_BytesizeSetting(self):
- for bytesize in (5,6,7,8):
- self.s.bytesize = bytesize
- # test get method
- self.failUnlessEqual(self.s.bytesize, bytesize)
- # test internals
- self.failUnlessEqual(self.s._bytesize, bytesize)
- # test illegal values
- for illegal_value in (0, 1, 3, 4, 9, 10, 'a', None):
- self.failUnlessRaises(ValueError, self.s.setByteSize, illegal_value)
-
- def test_ParitySetting(self):
- for parity in (serial.PARITY_NONE, serial.PARITY_EVEN, serial.PARITY_ODD):
- self.s.parity = parity
- # test get method
- self.failUnlessEqual(self.s.parity, parity)
- # test internals
- self.failUnlessEqual(self.s._parity, parity)
- # test illegal values
- for illegal_value in (0, 57, 'a', None):
- self.failUnlessRaises(ValueError, self.s.setParity, illegal_value)
-
- def test_StopbitsSetting(self):
- for stopbits in (1, 2):
- self.s.stopbits = stopbits
- # test get method
- self.failUnlessEqual(self.s.stopbits, stopbits)
- # test internals
- self.failUnlessEqual(self.s._stopbits, stopbits)
- # test illegal values
- for illegal_value in (0, 3, 2.5, 57, 'a', None):
- self.failUnlessRaises(ValueError, self.s.setStopbits, illegal_value)
-
- def test_TimeoutSetting(self):
- for timeout in (None, 0, 1, 3.14159, 10, 1000, 3600):
- self.s.timeout = timeout
- # test get method
- self.failUnlessEqual(self.s.timeout, timeout)
- # test internals
- self.failUnlessEqual(self.s._timeout, timeout)
- # test illegal values
- for illegal_value in (-1, 'a'):
- self.failUnlessRaises(ValueError, self.s.setTimeout, illegal_value)
-
- def test_XonXoffSetting(self):
- for xonxoff in (True, False):
- self.s.xonxoff = xonxoff
- # test get method
- self.failUnlessEqual(self.s.xonxoff, xonxoff)
- # test internals
- self.failUnlessEqual(self.s._xonxoff, xonxoff)
- # no illegal values here, normal rules for the boolean value of an
- # object are used thus all objects have a truth value.
-
- def test_RtsCtsSetting(self):
- for rtscts in (True, False):
- self.s.rtscts = rtscts
- # test get method
- self.failUnlessEqual(self.s.rtscts, rtscts)
- # test internals
- self.failUnlessEqual(self.s._rtscts, rtscts)
- # no illegal values here, normal rules for the boolean value of an
- # object are used thus all objects have a truth value.
-
- def test_UnconfiguredPort(self):
- # an unconfigured port cannot be opened
- self.failUnlessRaises(serial.SerialException, self.s.open)
-
- def test_PortOpenClose(self):
- self.s.port = PORT
- for i in range(3):
- # open the port and check flag
- self.failUnless(not self.s.isOpen())
- self.s.open()
- self.failUnless(self.s.isOpen())
- self.s.close()
- self.failUnless(not self.s.isOpen())
-
-if __name__ == '__main__':
- import sys
- sys.stdout.write(__doc__)
- if len(sys.argv) > 1:
- PORT = sys.argv[1]
- sys.stdout.write("Testing port: %r\n" % PORT)
- sys.argv[1:] = ['-v']
- # When this module is executed from the command-line, it runs all its tests
- unittest.main()
diff --git a/pyserial/examples/test_high_load.py b/pyserial/examples/test_high_load.py
deleted file mode 100644
index a4d8e04..0000000
--- a/pyserial/examples/test_high_load.py
+++ /dev/null
@@ -1,76 +0,0 @@
-#!/usr/bin/env python
-#Python Serial Port Extension for Win32, Linux, BSD, Jython
-#see __init__.py
-#
-#(C) 2001-2003 Chris Liechti <cliechti@gmx.net>
-# this is distributed under a free software license, see license.txt
-
-"""Some tests for the serial module.
-Part of pyserial (http://pyserial.sf.net) (C)2002-2003 cliechti@gmx.net
-
-Intended to be run on different platforms, to ensure portability of
-the code.
-
-For all these tests a simple hardware is required.
-Loopback HW adapter:
-Shortcut these pin pairs:
- TX <-> RX
- RTS <-> CTS
- DTR <-> DSR
-
-On a 9 pole DSUB these are the pins (2-3) (4-6) (7-8)
-"""
-
-import unittest, threading, time
-import sys
-import serial
-
-#on which port should the tests be performed:
-PORT = 0
-BAUDRATE = 115200
-#~ BAUDRATE=9600
-
-if sys.version_info >= (2, 6):
- bytes_0to255 = bytes(range(256))
-else:
- def data(string): return string
- bytes_0to255 = ''.join([chr(x) for x in range(256)])
-
-
-class TestHighLoad(unittest.TestCase):
- """Test sending and receiving large amount of data"""
-
- N = 16
- #~ N = 1
-
- def setUp(self):
- self.s = serial.Serial(PORT,BAUDRATE, timeout=10)
- def tearDown(self):
- self.s.close()
-
- def test0_WriteReadLoopback(self):
- """Send big strings, write/read order."""
- for i in range(self.N):
- q = bytes_0to255
- self.s.write(q)
- self.failUnless(self.s.read(len(q))==q, "expected a '%s' which was written before" % q)
- self.failUnless(self.s.inWaiting()==0, "expected empty buffer after all sent chars are read")
-
- def test1_WriteWriteReadLoopback(self):
- """Send big strings, multiple write one read."""
- q = bytes_0to255
- for i in range(self.N):
- self.s.write(q)
- read = self.s.read(len(q)*self.N)
- self.failUnless(read==q*self.N, "expected what was written before. got %d bytes, expected %d" % (len(read), self.N*len(q)))
- self.failUnless(self.s.inWaiting()==0, "expected empty buffer after all sent chars are read")
-
-if __name__ == '__main__':
- import sys
- sys.stdout.write(__doc__)
- if len(sys.argv) > 1:
- PORT = sys.argv[1]
- sys.stdout.write("Testing port: %r\n" % PORT)
- sys.argv[1:] = ['-v']
- # When this module is executed from the command-line, it runs all its tests
- unittest.main()
diff --git a/pyserial/examples/test_iolib.py b/pyserial/examples/test_iolib.py
deleted file mode 100644
index e740a4c..0000000
--- a/pyserial/examples/test_iolib.py
+++ /dev/null
@@ -1,78 +0,0 @@
-##! /usr/bin/env python
-# Python Serial Port Extension for Win32, Linux, BSD, Jython
-# see __init__.py
-#
-# (C) 2001-2008 Chris Liechti <cliechti@gmx.net>
-# this is distributed under a free software license, see license.txt
-
-"""\
-Some tests for the serial module.
-Part of pyserial (http://pyserial.sf.net) (C)2001-2009 cliechti@gmx.net
-
-Intended to be run on different platforms, to ensure portability of
-the code.
-
-This modules contains test for the interaction between Serial and the io
-library. This only works on Python 2.6+ that introduced the io library.
-
-For all these tests a simple hardware is required.
-Loopback HW adapter:
-Shortcut these pin pairs:
- TX <-> RX
- RTS <-> CTS
- DTR <-> DSR
-
-On a 9 pole DSUB these are the pins (2-3) (4-6) (7-8)
-"""
-
-import unittest
-import sys
-
-if sys.version_info < (2, 6):
- sys.stderr.write("""\
-==============================================================================
-WARNING: this test is intended for Python 2.6 and newer where the io library
-is available. This seems to be an older version of Python running.
-Continuing anyway...
-==============================================================================
-""")
-
-import io
-import serial
-
-# trick to make that this test run under 2.6 and 3.x without modification.
-# problem is, io library on 2.6 does NOT accept type 'str' and 3.x doesn't
-# like u'nicode' strings with the prefix and it is not providing an unicode
-# function ('str' is now what 'unicode' used to be)
-if sys.version_info >= (3, 0):
- def unicode(x): return x
-
-
-# on which port should the tests be performed:
-PORT = 0
-
-class Test_SerialAndIO(unittest.TestCase):
-
- def setUp(self):
- self.s = serial.Serial(PORT, timeout=1)
- self.io = io.TextIOWrapper(io.BufferedRWPair(self.s, self.s))
-
- def tearDown(self):
- self.s.close()
-
- def test_hello_raw(self):
- self.io.write(unicode("hello\n"))
- self.io.flush() # it is buffering. required to get the data out
- hello = self.io.readline()
- self.failUnlessEqual(hello, unicode("hello\n"))
-
-
-if __name__ == '__main__':
- import sys
- sys.stdout.write(__doc__)
- if len(sys.argv) > 1:
- PORT = sys.argv[1]
- sys.stdout.write("Testing port: %r\n" % PORT)
- sys.argv[1:] = ['-v']
- # When this module is executed from the command-line, it runs all its tests
- unittest.main()
diff --git a/pyserial/examples/wxSerialConfigDialog.py b/pyserial/examples/wxSerialConfigDialog.py
deleted file mode 100644
index 7085035..0000000
--- a/pyserial/examples/wxSerialConfigDialog.py
+++ /dev/null
@@ -1,260 +0,0 @@
-#!/usr/bin/env python
-# generated by wxGlade 0.3.1 on Thu Oct 02 23:25:44 2003
-
-#from wxPython.wx import *
-import wx
-import serial
-
-SHOW_BAUDRATE = 1<<0
-SHOW_FORMAT = 1<<1
-SHOW_FLOW = 1<<2
-SHOW_TIMEOUT = 1<<3
-SHOW_ALL = SHOW_BAUDRATE|SHOW_FORMAT|SHOW_FLOW|SHOW_TIMEOUT
-
-try:
- enumerate
-except NameError:
- def enumerate(sequence):
- return zip(range(len(sequence)), sequence)
-
-class SerialConfigDialog(wx.Dialog):
- """Serial Port confiuration dialog, to be used with pyserial 2.0+
- When instantiating a class of this dialog, then the "serial" keyword
- argument is mandatory. It is a reference to a serial.Serial instance.
- the optional "show" keyword argument can be used to show/hide different
- settings. The default is SHOW_ALL which coresponds to
- SHOW_BAUDRATE|SHOW_FORMAT|SHOW_FLOW|SHOW_TIMEOUT. All constants can be
- found in ths module (not the class)."""
-
- def __init__(self, *args, **kwds):
- #grab the serial keyword and remove it from the dict
- self.serial = kwds['serial']
- del kwds['serial']
- self.show = SHOW_ALL
- if kwds.has_key('show'):
- self.show = kwds['show']
- del kwds['show']
- # begin wxGlade: SerialConfigDialog.__init__
- # end wxGlade
- kwds["style"] = wx.DEFAULT_DIALOG_STYLE
- wx.Dialog.__init__(self, *args, **kwds)
- self.label_2 = wx.StaticText(self, -1, "Port")
- self.combo_box_port = wx.ComboBox(self, -1, choices=["dummy1", "dummy2", "dummy3", "dummy4", "dummy5"], style=wx.CB_DROPDOWN)
- if self.show & SHOW_BAUDRATE:
- self.label_1 = wx.StaticText(self, -1, "Baudrate")
- self.choice_baudrate = wx.Choice(self, -1, choices=["choice 1"])
- if self.show & SHOW_FORMAT:
- self.label_3 = wx.StaticText(self, -1, "Data Bits")
- self.choice_databits = wx.Choice(self, -1, choices=["choice 1"])
- self.label_4 = wx.StaticText(self, -1, "Stop Bits")
- self.choice_stopbits = wx.Choice(self, -1, choices=["choice 1"])
- self.label_5 = wx.StaticText(self, -1, "Parity")
- self.choice_parity = wx.Choice(self, -1, choices=["choice 1"])
- if self.show & SHOW_TIMEOUT:
- self.checkbox_timeout = wx.CheckBox(self, -1, "Use Timeout")
- self.text_ctrl_timeout = wx.TextCtrl(self, -1, "")
- self.label_6 = wx.StaticText(self, -1, "seconds")
- if self.show & SHOW_FLOW:
- self.checkbox_rtscts = wx.CheckBox(self, -1, "RTS/CTS")
- self.checkbox_xonxoff = wx.CheckBox(self, -1, "Xon/Xoff")
- self.button_ok = wx.Button(self, -1, "OK")
- self.button_cancel = wx.Button(self, -1, "Cancel")
-
- self.__set_properties()
- self.__do_layout()
- #fill in ports and select current setting
- index = 0
- self.combo_box_port.Clear()
- for n in range(4):
- portname = serial.device(n)
- self.combo_box_port.Append(portname)
- if self.serial.portstr == portname:
- index = n
- if self.serial.portstr is not None:
- self.combo_box_port.SetValue(str(self.serial.portstr))
- else:
- self.combo_box_port.SetSelection(index)
- if self.show & SHOW_BAUDRATE:
- #fill in badrates and select current setting
- self.choice_baudrate.Clear()
- for n, baudrate in enumerate(self.serial.BAUDRATES):
- self.choice_baudrate.Append(str(baudrate))
- if self.serial.baudrate == baudrate:
- index = n
- self.choice_baudrate.SetSelection(index)
- if self.show & SHOW_FORMAT:
- #fill in databits and select current setting
- self.choice_databits.Clear()
- for n, bytesize in enumerate(self.serial.BYTESIZES):
- self.choice_databits.Append(str(bytesize))
- if self.serial.bytesize == bytesize:
- index = n
- self.choice_databits.SetSelection(index)
- #fill in stopbits and select current setting
- self.choice_stopbits.Clear()
- for n, stopbits in enumerate(self.serial.STOPBITS):
- self.choice_stopbits.Append(str(stopbits))
- if self.serial.stopbits == stopbits:
- index = n
- self.choice_stopbits.SetSelection(index)
- #fill in parities and select current setting
- self.choice_parity.Clear()
- for n, parity in enumerate(self.serial.PARITIES):
- self.choice_parity.Append(str(serial.PARITY_NAMES[parity]))
- if self.serial.parity == parity:
- index = n
- self.choice_parity.SetSelection(index)
- if self.show & SHOW_TIMEOUT:
- #set the timeout mode and value
- if self.serial.timeout is None:
- self.checkbox_timeout.SetValue(False)
- self.text_ctrl_timeout.Enable(False)
- else:
- self.checkbox_timeout.SetValue(True)
- self.text_ctrl_timeout.Enable(True)
- self.text_ctrl_timeout.SetValue(str(self.serial.timeout))
- if self.show & SHOW_FLOW:
- #set the rtscts mode
- self.checkbox_rtscts.SetValue(self.serial.rtscts)
- #set the rtscts mode
- self.checkbox_xonxoff.SetValue(self.serial.xonxoff)
- #attach the event handlers
- self.__attach_events()
-
- def __set_properties(self):
- # begin wxGlade: SerialConfigDialog.__set_properties
- # end wxGlade
- self.SetTitle("Serial Port Configuration")
- if self.show & SHOW_TIMEOUT:
- self.text_ctrl_timeout.Enable(0)
- self.button_ok.SetDefault()
-
- def __do_layout(self):
- # begin wxGlade: SerialConfigDialog.__do_layout
- # end wxGlade
- sizer_2 = wx.BoxSizer(wx.VERTICAL)
- sizer_3 = wx.BoxSizer(wx.HORIZONTAL)
- sizer_basics = wx.StaticBoxSizer(wx.StaticBox(self, -1, "Basics"), wx.VERTICAL)
- sizer_5 = wx.BoxSizer(wx.HORIZONTAL)
- sizer_5.Add(self.label_2, 1, wx.ALL|wx.ALIGN_CENTER_VERTICAL, 4)
- sizer_5.Add(self.combo_box_port, 1, 0, 0)
- sizer_basics.Add(sizer_5, 0, wx.RIGHT|wx.EXPAND, 0)
- if self.show & SHOW_BAUDRATE:
- sizer_baudrate = wx.BoxSizer(wx.HORIZONTAL)
- sizer_baudrate.Add(self.label_1, 1, wx.ALL|wx.ALIGN_CENTER_VERTICAL, 4)
- sizer_baudrate.Add(self.choice_baudrate, 1, wx.ALIGN_RIGHT, 0)
- sizer_basics.Add(sizer_baudrate, 0, wx.EXPAND, 0)
- sizer_2.Add(sizer_basics, 0, wx.EXPAND, 0)
- if self.show & SHOW_FORMAT:
- sizer_8 = wx.BoxSizer(wx.HORIZONTAL)
- sizer_7 = wx.BoxSizer(wx.HORIZONTAL)
- sizer_6 = wx.BoxSizer(wx.HORIZONTAL)
- sizer_format = wx.StaticBoxSizer(wx.StaticBox(self, -1, "Data Format"), wx.VERTICAL)
- sizer_6.Add(self.label_3, 1, wx.ALL|wx.ALIGN_CENTER_VERTICAL, 4)
- sizer_6.Add(self.choice_databits, 1, wx.ALIGN_RIGHT, 0)
- sizer_format.Add(sizer_6, 0, wx.EXPAND, 0)
- sizer_7.Add(self.label_4, 1, wx.ALL|wx.ALIGN_CENTER_VERTICAL, 4)
- sizer_7.Add(self.choice_stopbits, 1, wx.ALIGN_RIGHT, 0)
- sizer_format.Add(sizer_7, 0, wx.EXPAND, 0)
- sizer_8.Add(self.label_5, 1, wx.ALL|wx.ALIGN_CENTER_VERTICAL, 4)
- sizer_8.Add(self.choice_parity, 1, wx.ALIGN_RIGHT, 0)
- sizer_format.Add(sizer_8, 0, wx.EXPAND, 0)
- sizer_2.Add(sizer_format, 0, wx.EXPAND, 0)
- if self.show & SHOW_TIMEOUT:
- sizer_timeout = wx.StaticBoxSizer(wx.StaticBox(self, -1, "Timeout"), wx.HORIZONTAL)
- sizer_timeout.Add(self.checkbox_timeout, 0, wx.ALL|wx.ALIGN_CENTER_VERTICAL, 4)
- sizer_timeout.Add(self.text_ctrl_timeout, 0, 0, 0)
- sizer_timeout.Add(self.label_6, 0, wx.ALL|wx.ALIGN_CENTER_VERTICAL, 4)
- sizer_2.Add(sizer_timeout, 0, 0, 0)
- if self.show & SHOW_FLOW:
- sizer_flow = wx.StaticBoxSizer(wx.StaticBox(self, -1, "Flow Control"), wx.HORIZONTAL)
- sizer_flow.Add(self.checkbox_rtscts, 0, wx.ALL|wx.ALIGN_CENTER_VERTICAL, 4)
- sizer_flow.Add(self.checkbox_xonxoff, 0, wx.ALL|wx.ALIGN_CENTER_VERTICAL, 4)
- sizer_flow.Add((10,10), 1, wx.EXPAND, 0)
- sizer_2.Add(sizer_flow, 0, wx.EXPAND, 0)
- sizer_3.Add(self.button_ok, 0, 0, 0)
- sizer_3.Add(self.button_cancel, 0, 0, 0)
- sizer_2.Add(sizer_3, 0, wx.ALL|wx.ALIGN_RIGHT, 4)
- self.SetAutoLayout(1)
- self.SetSizer(sizer_2)
- sizer_2.Fit(self)
- sizer_2.SetSizeHints(self)
- self.Layout()
-
- def __attach_events(self):
- wx.EVT_BUTTON(self, self.button_ok.GetId(), self.OnOK)
- wx.EVT_BUTTON(self, self.button_cancel.GetId(), self.OnCancel)
- if self.show & SHOW_TIMEOUT:
- wx.EVT_CHECKBOX(self, self.checkbox_timeout.GetId(), self.OnTimeout)
-
- def OnOK(self, events):
- success = True
- self.serial.port = str(self.combo_box_port.GetValue())
- if self.show & SHOW_BAUDRATE:
- self.serial.baudrate = self.serial.BAUDRATES[self.choice_baudrate.GetSelection()]
- if self.show & SHOW_FORMAT:
- self.serial.bytesize = self.serial.BYTESIZES[self.choice_databits.GetSelection()]
- self.serial.stopbits = self.serial.STOPBITS[self.choice_stopbits.GetSelection()]
- self.serial.parity = self.serial.PARITIES[self.choice_parity.GetSelection()]
- if self.show & SHOW_FLOW:
- self.serial.rtscts = self.checkbox_rtscts.GetValue()
- self.serial.xonxoff = self.checkbox_xonxoff.GetValue()
- if self.show & SHOW_TIMEOUT:
- if self.checkbox_timeout.GetValue():
- try:
- self.serial.timeout = float(self.text_ctrl_timeout.GetValue())
- except ValueError:
- dlg = wx.MessageDialog(self, 'Timeout must be a numeric value',
- 'Value Error', wx.OK | wx.ICON_ERROR)
- dlg.ShowModal()
- dlg.Destroy()
- success = False
- else:
- self.serial.timeout = None
- if success:
- self.EndModal(wx.ID_OK)
-
- def OnCancel(self, events):
- self.EndModal(wx.ID_CANCEL)
-
- def OnTimeout(self, events):
- if self.checkbox_timeout.GetValue():
- self.text_ctrl_timeout.Enable(True)
- else:
- self.text_ctrl_timeout.Enable(False)
-
-# end of class SerialConfigDialog
-
-
-class MyApp(wx.App):
- """Test code"""
- def OnInit(self):
- wx.InitAllImageHandlers()
-
- ser = serial.Serial()
- print ser
- #loop until cancel is pressed, old values are used as start for the next run
- #show the different views, one after the other
- #value are kept.
- for flags in (SHOW_BAUDRATE, SHOW_FLOW, SHOW_FORMAT, SHOW_TIMEOUT, SHOW_ALL):
- dialog_serial_cfg = SerialConfigDialog(None, -1, "", serial=ser, show=flags)
- self.SetTopWindow(dialog_serial_cfg)
- result = dialog_serial_cfg.ShowModal()
- print ser
- if result != wx.ID_OK:
- break
- #the user can play around with the values, CANCEL aborts the loop
- while 1:
- dialog_serial_cfg = SerialConfigDialog(None, -1, "", serial=ser)
- self.SetTopWindow(dialog_serial_cfg)
- result = dialog_serial_cfg.ShowModal()
- print ser
- if result != wx.ID_OK:
- break
- return 0
-
-# end of class MyApp
-
-if __name__ == "__main__":
- app = MyApp(0)
- app.MainLoop()
diff --git a/pyserial/examples/wxSerialConfigDialog.wxg b/pyserial/examples/wxSerialConfigDialog.wxg
deleted file mode 100644
index f5e92e0..0000000
--- a/pyserial/examples/wxSerialConfigDialog.wxg
+++ /dev/null
@@ -1,262 +0,0 @@
-<?xml version="1.0"?>
-<!-- generated by wxGlade 0.3.1 on Fri Oct 03 01:53:04 2003 -->
-
-<application path="D:\prog\python\pyserial_sf\pyserial\examples\wxSerialConfigDialog.py" name="app" class="MyApp" option="0" language="python" top_window="dialog_serial_cfg" encoding="ISO-8859-1" use_gettext="0" overwrite="0">
- <object class="SerialConfigDialog" name="dialog_serial_cfg" base="EditDialog">
- <style>wxDEFAULT_DIALOG_STYLE</style>
- <title>Serial Port Configuration</title>
- <object class="wxBoxSizer" name="sizer_2" base="EditBoxSizer">
- <orient>wxVERTICAL</orient>
- <object class="sizeritem">
- <flag>wxEXPAND</flag>
- <border>0</border>
- <option>0</option>
- <object class="wxStaticBoxSizer" name="sizer_basics" base="EditStaticBoxSizer">
- <orient>wxVERTICAL</orient>
- <label>Basics</label>
- <object class="sizeritem">
- <flag>wxRIGHT|wxEXPAND</flag>
- <border>0</border>
- <option>0</option>
- <object class="wxBoxSizer" name="sizer_5" base="EditBoxSizer">
- <orient>wxHORIZONTAL</orient>
- <object class="sizeritem">
- <flag>wxALL|wxALIGN_CENTER_VERTICAL</flag>
- <border>4</border>
- <option>1</option>
- <object class="wxStaticText" name="label_2" base="EditStaticText">
- <attribute>1</attribute>
- <label>Port</label>
- </object>
- </object>
- <object class="sizeritem">
- <border>0</border>
- <option>1</option>
- <object class="wxComboBox" name="combo_box_port" base="EditComboBox">
- <selection>0</selection>
- <choices>
- <choice>dummy1</choice>
- <choice>dummy2</choice>
- <choice>dummy3</choice>
- <choice>dummy4</choice>
- <choice>dummy5</choice>
- </choices>
- </object>
- </object>
- </object>
- </object>
- <object class="sizeritem">
- <flag>wxEXPAND</flag>
- <border>0</border>
- <option>0</option>
- <object class="wxBoxSizer" name="sizer_baudrate" base="EditBoxSizer">
- <orient>wxHORIZONTAL</orient>
- <object class="sizeritem">
- <flag>wxALL|wxALIGN_CENTER_VERTICAL</flag>
- <border>4</border>
- <option>1</option>
- <object class="wxStaticText" name="label_1" base="EditStaticText">
- <attribute>1</attribute>
- <label>Baudrate</label>
- </object>
- </object>
- <object class="sizeritem">
- <flag>wxALIGN_RIGHT</flag>
- <border>0</border>
- <option>1</option>
- <object class="wxChoice" name="choice_baudrate" base="EditChoice">
- <selection>0</selection>
- <choices>
- <choice>choice 1</choice>
- </choices>
- </object>
- </object>
- </object>
- </object>
- </object>
- </object>
- <object class="sizeritem">
- <flag>wxEXPAND</flag>
- <border>0</border>
- <option>0</option>
- <object class="wxStaticBoxSizer" name="sizer_format" base="EditStaticBoxSizer">
- <orient>wxVERTICAL</orient>
- <label>Data Format</label>
- <object class="sizeritem">
- <flag>wxEXPAND</flag>
- <border>0</border>
- <option>0</option>
- <object class="wxBoxSizer" name="sizer_6" base="EditBoxSizer">
- <orient>wxHORIZONTAL</orient>
- <object class="sizeritem">
- <flag>wxALL|wxALIGN_CENTER_VERTICAL</flag>
- <border>4</border>
- <option>1</option>
- <object class="wxStaticText" name="label_3" base="EditStaticText">
- <attribute>1</attribute>
- <label>Data Bits</label>
- </object>
- </object>
- <object class="sizeritem">
- <flag>wxALIGN_RIGHT</flag>
- <border>0</border>
- <option>1</option>
- <object class="wxChoice" name="choice_databits" base="EditChoice">
- <selection>0</selection>
- <choices>
- <choice>choice 1</choice>
- </choices>
- </object>
- </object>
- </object>
- </object>
- <object class="sizeritem">
- <flag>wxEXPAND</flag>
- <border>0</border>
- <option>0</option>
- <object class="wxBoxSizer" name="sizer_7" base="EditBoxSizer">
- <orient>wxHORIZONTAL</orient>
- <object class="sizeritem">
- <flag>wxALL|wxALIGN_CENTER_VERTICAL</flag>
- <border>4</border>
- <option>1</option>
- <object class="wxStaticText" name="label_4" base="EditStaticText">
- <attribute>1</attribute>
- <label>Stop Bits</label>
- </object>
- </object>
- <object class="sizeritem">
- <flag>wxALIGN_RIGHT</flag>
- <border>0</border>
- <option>1</option>
- <object class="wxChoice" name="choice_stopbits" base="EditChoice">
- <selection>0</selection>
- <choices>
- <choice>choice 1</choice>
- </choices>
- </object>
- </object>
- </object>
- </object>
- <object class="sizeritem">
- <flag>wxEXPAND</flag>
- <border>0</border>
- <option>0</option>
- <object class="wxBoxSizer" name="sizer_8" base="EditBoxSizer">
- <orient>wxHORIZONTAL</orient>
- <object class="sizeritem">
- <flag>wxALL|wxALIGN_CENTER_VERTICAL</flag>
- <border>4</border>
- <option>1</option>
- <object class="wxStaticText" name="label_5" base="EditStaticText">
- <attribute>1</attribute>
- <label>Parity</label>
- </object>
- </object>
- <object class="sizeritem">
- <flag>wxALIGN_RIGHT</flag>
- <border>0</border>
- <option>1</option>
- <object class="wxChoice" name="choice_parity" base="EditChoice">
- <selection>0</selection>
- <choices>
- <choice>choice 1</choice>
- </choices>
- </object>
- </object>
- </object>
- </object>
- </object>
- </object>
- <object class="sizeritem">
- <border>0</border>
- <option>0</option>
- <object class="wxStaticBoxSizer" name="sizer_timeout" base="EditStaticBoxSizer">
- <orient>wxHORIZONTAL</orient>
- <label>Timeout</label>
- <object class="sizeritem">
- <flag>wxALL|wxALIGN_CENTER_VERTICAL</flag>
- <border>4</border>
- <option>0</option>
- <object class="wxCheckBox" name="checkbox_timeout" base="EditCheckBox">
- <label>Use Timeout</label>
- </object>
- </object>
- <object class="sizeritem">
- <border>0</border>
- <option>0</option>
- <object class="wxTextCtrl" name="text_ctrl_timeout" base="EditTextCtrl">
- <disabled>1</disabled>
- </object>
- </object>
- <object class="sizeritem">
- <flag>wxALL|wxALIGN_CENTER_VERTICAL</flag>
- <border>4</border>
- <option>0</option>
- <object class="wxStaticText" name="label_6" base="EditStaticText">
- <attribute>1</attribute>
- <label>seconds</label>
- </object>
- </object>
- </object>
- </object>
- <object class="sizeritem">
- <flag>wxEXPAND</flag>
- <border>0</border>
- <option>0</option>
- <object class="wxStaticBoxSizer" name="sizer_flow" base="EditStaticBoxSizer">
- <orient>wxHORIZONTAL</orient>
- <label>Flow Control</label>
- <object class="sizeritem">
- <flag>wxALL|wxALIGN_CENTER_VERTICAL</flag>
- <border>4</border>
- <option>0</option>
- <object class="wxCheckBox" name="checkbox_rtscts" base="EditCheckBox">
- <label>RTS/CTS</label>
- </object>
- </object>
- <object class="sizeritem">
- <flag>wxALL|wxALIGN_CENTER_VERTICAL</flag>
- <border>4</border>
- <option>0</option>
- <object class="wxCheckBox" name="checkbox_xonxoff" base="EditCheckBox">
- <label>Xon/Xoff</label>
- </object>
- </object>
- <object class="sizeritem">
- <flag>wxEXPAND</flag>
- <border>0</border>
- <option>1</option>
- <object class="spacer" name="spacer" base="EditSpacer">
- <height>10</height>
- <width>10</width>
- </object>
- </object>
- </object>
- </object>
- <object class="sizeritem">
- <flag>wxALL|wxALIGN_RIGHT</flag>
- <border>4</border>
- <option>0</option>
- <object class="wxBoxSizer" name="sizer_3" base="EditBoxSizer">
- <orient>wxHORIZONTAL</orient>
- <object class="sizeritem">
- <border>0</border>
- <option>0</option>
- <object class="wxButton" name="button_ok" base="EditButton">
- <default>1</default>
- <label>OK</label>
- </object>
- </object>
- <object class="sizeritem">
- <border>0</border>
- <option>0</option>
- <object class="wxButton" name="button_cancel" base="EditButton">
- <label>Cancel</label>
- </object>
- </object>
- </object>
- </object>
- </object>
- </object>
-</application>
diff --git a/pyserial/examples/wxTerminal.py b/pyserial/examples/wxTerminal.py
deleted file mode 100644
index 646c272..0000000
--- a/pyserial/examples/wxTerminal.py
+++ /dev/null
@@ -1,333 +0,0 @@
-#!/usr/bin/env python
-# generated by wxGlade 0.3.1 on Fri Oct 03 23:23:45 2003
-
-#from wxPython.wx import *
-import wx
-import wxSerialConfigDialog
-import serial
-import threading
-
-#----------------------------------------------------------------------
-# Create an own event type, so that GUI updates can be delegated
-# this is required as on some platforms only the main thread can
-# access the GUI without crashing. wxMutexGuiEnter/wxMutexGuiLeave
-# could be used too, but an event is more elegant.
-
-SERIALRX = wx.NewEventType()
-# bind to serial data receive events
-EVT_SERIALRX = wx.PyEventBinder(SERIALRX, 0)
-
-class SerialRxEvent(wx.PyCommandEvent):
- eventType = SERIALRX
- def __init__(self, windowID, data):
- wx.PyCommandEvent.__init__(self, self.eventType, windowID)
- self.data = data
-
- def Clone(self):
- self.__class__(self.GetId(), self.data)
-
-#----------------------------------------------------------------------
-
-ID_CLEAR = wx.NewId()
-ID_SAVEAS = wx.NewId()
-ID_SETTINGS = wx.NewId()
-ID_TERM = wx.NewId()
-ID_EXIT = wx.NewId()
-
-NEWLINE_CR = 0
-NEWLINE_LF = 1
-NEWLINE_CRLF = 2
-
-class TerminalSetup:
- """Placeholder for various terminal settings. Used to pass the
- options to the TerminalSettingsDialog."""
- def __init__(self):
- self.echo = False
- self.unprintable = False
- self.newline = NEWLINE_CRLF
-
-class TerminalSettingsDialog(wx.Dialog):
- """Simple dialog with common terminal settings like echo, newline mode."""
-
- def __init__(self, *args, **kwds):
- self.settings = kwds['settings']
- del kwds['settings']
- # begin wxGlade: TerminalSettingsDialog.__init__
- kwds["style"] = wx.DEFAULT_DIALOG_STYLE
- wx.Dialog.__init__(self, *args, **kwds)
- self.checkbox_echo = wx.CheckBox(self, -1, "Local Echo")
- self.checkbox_unprintable = wx.CheckBox(self, -1, "Show unprintable characters")
- self.radio_box_newline = wx.RadioBox(self, -1, "Newline Handling", choices=["CR only", "LF only", "CR+LF"], majorDimension=0, style=wx.RA_SPECIFY_ROWS)
- self.button_ok = wx.Button(self, -1, "OK")
- self.button_cancel = wx.Button(self, -1, "Cancel")
-
- self.__set_properties()
- self.__do_layout()
- # end wxGlade
- self.__attach_events()
- self.checkbox_echo.SetValue(self.settings.echo)
- self.checkbox_unprintable.SetValue(self.settings.unprintable)
- self.radio_box_newline.SetSelection(self.settings.newline)
-
- def __set_properties(self):
- # begin wxGlade: TerminalSettingsDialog.__set_properties
- self.SetTitle("Terminal Settings")
- self.radio_box_newline.SetSelection(0)
- self.button_ok.SetDefault()
- # end wxGlade
-
- def __do_layout(self):
- # begin wxGlade: TerminalSettingsDialog.__do_layout
- sizer_2 = wx.BoxSizer(wx.VERTICAL)
- sizer_3 = wx.BoxSizer(wx.HORIZONTAL)
- sizer_4 = wx.StaticBoxSizer(wx.StaticBox(self, -1, "Input/Output"), wx.VERTICAL)
- sizer_4.Add(self.checkbox_echo, 0, wx.ALL, 4)
- sizer_4.Add(self.checkbox_unprintable, 0, wx.ALL, 4)
- sizer_4.Add(self.radio_box_newline, 0, 0, 0)
- sizer_2.Add(sizer_4, 0, wx.EXPAND, 0)
- sizer_3.Add(self.button_ok, 0, 0, 0)
- sizer_3.Add(self.button_cancel, 0, 0, 0)
- sizer_2.Add(sizer_3, 0, wx.ALL|wx.ALIGN_RIGHT, 4)
- self.SetAutoLayout(1)
- self.SetSizer(sizer_2)
- sizer_2.Fit(self)
- sizer_2.SetSizeHints(self)
- self.Layout()
- # end wxGlade
-
- def __attach_events(self):
- self.Bind(wx.EVT_BUTTON, self.OnOK, id = self.button_ok.GetId())
- self.Bind(wx.EVT_BUTTON, self.OnCancel, id = self.button_cancel.GetId())
-
- def OnOK(self, events):
- """Update data wil new values and close dialog."""
- self.settings.echo = self.checkbox_echo.GetValue()
- self.settings.unprintable = self.checkbox_unprintable.GetValue()
- self.settings.newline = self.radio_box_newline.GetSelection()
- self.EndModal(wx.ID_OK)
-
- def OnCancel(self, events):
- """Do not update data but close dialog."""
- self.EndModal(wx.ID_CANCEL)
-
-# end of class TerminalSettingsDialog
-
-
-class TerminalFrame(wx.Frame):
- """Simple terminal program for wxPython"""
-
- def __init__(self, *args, **kwds):
- self.serial = serial.Serial()
- self.serial.timeout = 0.5 #make sure that the alive event can be checked from time to time
- self.settings = TerminalSetup() #placeholder for the settings
- self.thread = None
- self.alive = threading.Event()
- # begin wxGlade: TerminalFrame.__init__
- kwds["style"] = wx.DEFAULT_FRAME_STYLE
- wx.Frame.__init__(self, *args, **kwds)
- self.text_ctrl_output = wx.TextCtrl(self, -1, "", style=wx.TE_MULTILINE|wx.TE_READONLY)
-
- # Menu Bar
- self.frame_terminal_menubar = wx.MenuBar()
- self.SetMenuBar(self.frame_terminal_menubar)
- wxglade_tmp_menu = wx.Menu()
- wxglade_tmp_menu.Append(ID_CLEAR, "&Clear", "", wx.ITEM_NORMAL)
- wxglade_tmp_menu.Append(ID_SAVEAS, "&Save Text As...", "", wx.ITEM_NORMAL)
- wxglade_tmp_menu.AppendSeparator()
- wxglade_tmp_menu.Append(ID_SETTINGS, "&Port Settings...", "", wx.ITEM_NORMAL)
- wxglade_tmp_menu.Append(ID_TERM, "&Terminal Settings...", "", wx.ITEM_NORMAL)
- wxglade_tmp_menu.AppendSeparator()
- wxglade_tmp_menu.Append(ID_EXIT, "&Exit", "", wx.ITEM_NORMAL)
- self.frame_terminal_menubar.Append(wxglade_tmp_menu, "&File")
- # Menu Bar end
-
- self.__set_properties()
- self.__do_layout()
- # end wxGlade
- self.__attach_events() #register events
- self.OnPortSettings(None) #call setup dialog on startup, opens port
- if not self.alive.isSet():
- self.Close()
-
- def StartThread(self):
- """Start the receiver thread"""
- self.thread = threading.Thread(target=self.ComPortThread)
- self.thread.setDaemon(1)
- self.alive.set()
- self.thread.start()
-
- def StopThread(self):
- """Stop the receiver thread, wait util it's finished."""
- if self.thread is not None:
- self.alive.clear() #clear alive event for thread
- self.thread.join() #wait until thread has finished
- self.thread = None
-
- def __set_properties(self):
- # begin wxGlade: TerminalFrame.__set_properties
- self.SetTitle("Serial Terminal")
- self.SetSize((546, 383))
- # end wxGlade
-
- def __do_layout(self):
- # begin wxGlade: TerminalFrame.__do_layout
- sizer_1 = wx.BoxSizer(wx.VERTICAL)
- sizer_1.Add(self.text_ctrl_output, 1, wx.EXPAND, 0)
- self.SetAutoLayout(1)
- self.SetSizer(sizer_1)
- self.Layout()
- # end wxGlade
-
- def __attach_events(self):
- #register events at the controls
- self.Bind(wx.EVT_MENU, self.OnClear, id = ID_CLEAR)
- self.Bind(wx.EVT_MENU, self.OnSaveAs, id = ID_SAVEAS)
- self.Bind(wx.EVT_MENU, self.OnExit, id = ID_EXIT)
- self.Bind(wx.EVT_MENU, self.OnPortSettings, id = ID_SETTINGS)
- self.Bind(wx.EVT_MENU, self.OnTermSettings, id = ID_TERM)
- self.text_ctrl_output.Bind(wx.EVT_CHAR, self.OnKey)
- self.Bind(EVT_SERIALRX, self.OnSerialRead)
- self.Bind(wx.EVT_CLOSE, self.OnClose)
-
- def OnExit(self, event):
- """Menu point Exit"""
- self.Close()
-
- def OnClose(self, event):
- """Called on application shutdown."""
- self.StopThread() #stop reader thread
- self.serial.close() #cleanup
- self.Destroy() #close windows, exit app
-
- def OnSaveAs(self, event):
- """Save contents of output window."""
- filename = None
- dlg = wx.FileDialog(None, "Save Text As...", ".", "", "Text File|*.txt|All Files|*", wx.SAVE)
- if dlg.ShowModal() == wx.ID_OK:
- filename = dlg.GetPath()
- dlg.Destroy()
-
- if filename is not None:
- f = file(filename, 'w')
- text = self.text_ctrl_output.GetValue()
- if type(text) == unicode:
- text = text.encode("latin1") #hm, is that a good asumption?
- f.write(text)
- f.close()
-
- def OnClear(self, event):
- """Clear contents of output window."""
- self.text_ctrl_output.Clear()
-
- def OnPortSettings(self, event=None):
- """Show the portsettings dialog. The reader thread is stopped for the
- settings change."""
- if event is not None: #will be none when called on startup
- self.StopThread()
- self.serial.close()
- ok = False
- while not ok:
- dialog_serial_cfg = wxSerialConfigDialog.SerialConfigDialog(None, -1, "",
- show=wxSerialConfigDialog.SHOW_BAUDRATE|wxSerialConfigDialog.SHOW_FORMAT|wxSerialConfigDialog.SHOW_FLOW,
- serial=self.serial
- )
- result = dialog_serial_cfg.ShowModal()
- dialog_serial_cfg.Destroy()
- #open port if not called on startup, open it on startup and OK too
- if result == wx.ID_OK or event is not None:
- try:
- self.serial.open()
- except serial.SerialException, e:
- dlg = wx.MessageDialog(None, str(e), "Serial Port Error", wx.OK | wx.ICON_ERROR)
- dlg.ShowModal()
- dlg.Destroy()
- else:
- self.StartThread()
- self.SetTitle("Serial Terminal on %s [%s, %s%s%s%s%s]" % (
- self.serial.portstr,
- self.serial.baudrate,
- self.serial.bytesize,
- self.serial.parity,
- self.serial.stopbits,
- self.serial.rtscts and ' RTS/CTS' or '',
- self.serial.xonxoff and ' Xon/Xoff' or '',
- )
- )
- ok = True
- else:
- #on startup, dialog aborted
- self.alive.clear()
- ok = True
-
- def OnTermSettings(self, event):
- """Menu point Terminal Settings. Show the settings dialog
- with the current terminal settings"""
- dialog = TerminalSettingsDialog(None, -1, "", settings=self.settings)
- result = dialog.ShowModal()
- dialog.Destroy()
-
- def OnKey(self, event):
- """Key event handler. if the key is in the ASCII range, write it to the serial port.
- Newline handling and local echo is also done here."""
- code = event.GetKeyCode()
- if code < 256: #is it printable?
- if code == 13: #is it a newline? (check for CR which is the RETURN key)
- if self.settings.echo: #do echo if needed
- self.text_ctrl_output.AppendText('\n')
- if self.settings.newline == NEWLINE_CR:
- self.serial.write('\r') #send CR
- elif self.settings.newline == NEWLINE_LF:
- self.serial.write('\n') #send LF
- elif self.settings.newline == NEWLINE_CRLF:
- self.serial.write('\r\n') #send CR+LF
- else:
- char = chr(code)
- if self.settings.echo: #do echo if needed
- self.text_ctrl_output.WriteText(char)
- self.serial.write(char) #send the charcater
- else:
- print "Extra Key:", code
-
- def OnSerialRead(self, event):
- """Handle input from the serial port."""
- text = event.data
- if self.settings.unprintable:
- text = ''.join([(c >= ' ') and c or '<%d>' % ord(c) for c in text])
- self.text_ctrl_output.AppendText(text)
-
- def ComPortThread(self):
- """Thread that handles the incomming traffic. Does the basic input
- transformation (newlines) and generates an SerialRxEvent"""
- while self.alive.isSet(): #loop while alive event is true
- text = self.serial.read(1) #read one, with timout
- if text: #check if not timeout
- n = self.serial.inWaiting() #look if there is more to read
- if n:
- text = text + self.serial.read(n) #get it
- #newline transformation
- if self.settings.newline == NEWLINE_CR:
- text = text.replace('\r', '\n')
- elif self.settings.newline == NEWLINE_LF:
- pass
- elif self.settings.newline == NEWLINE_CRLF:
- text = text.replace('\r\n', '\n')
- event = SerialRxEvent(self.GetId(), text)
- self.GetEventHandler().AddPendingEvent(event)
- #~ self.OnSerialRead(text) #output text in window
-
-# end of class TerminalFrame
-
-
-class MyApp(wx.App):
- def OnInit(self):
- wx.InitAllImageHandlers()
- frame_terminal = TerminalFrame(None, -1, "")
- self.SetTopWindow(frame_terminal)
- frame_terminal.Show(1)
- return 1
-
-# end of class MyApp
-
-if __name__ == "__main__":
- app = MyApp(0)
- app.MainLoop()
diff --git a/pyserial/examples/wxTerminal.wxg b/pyserial/examples/wxTerminal.wxg
deleted file mode 100644
index 183f876..0000000
--- a/pyserial/examples/wxTerminal.wxg
+++ /dev/null
@@ -1,127 +0,0 @@
-<?xml version="1.0"?>
-<!-- generated by wxGlade 0.3.1 on Sat Oct 04 02:41:48 2003 -->
-
-<application path="D:\prog\python\pyserial_sf\pyserial\examples\wxTerminal.py" name="app" class="MyApp" option="0" language="python" top_window="frame_terminal" encoding="ISO-8859-1" use_gettext="0" overwrite="0">
- <object class="TerminalFrame" name="frame_terminal" base="EditFrame">
- <style>wxDEFAULT_FRAME_STYLE</style>
- <title>Serial Terminal</title>
- <menubar>1</menubar>
- <size>546, 383</size>
- <object class="wxBoxSizer" name="sizer_1" base="EditBoxSizer">
- <orient>wxVERTICAL</orient>
- <object class="sizeritem">
- <flag>wxEXPAND</flag>
- <border>0</border>
- <option>1</option>
- <object class="wxTextCtrl" name="text_ctrl_output" base="EditTextCtrl">
- <style>wxTE_MULTILINE|wxTE_READONLY</style>
- </object>
- </object>
- </object>
- <object class="wxMenuBar" name="frame_terminal_menubar" base="EditMenuBar">
- <menus>
- <menu name="" label="&amp;File">
- <item>
- <label>&amp;Clear</label>
- <id>ID_CLEAR</id>
- </item>
- <item>
- <label>&amp;Save Text As...</label>
- <id>ID_SAVEAS</id>
- </item>
- <item>
- <label>---</label>
- <id>---</id>
- <name>---</name>
- </item>
- <item>
- <label>&amp;Port Settings...</label>
- <id>ID_SETTINGS</id>
- </item>
- <item>
- <label>&amp;Terminal Settings...</label>
- <id>ID_TERM</id>
- </item>
- <item>
- <label>---</label>
- <name>---</name>
- </item>
- <item>
- <label>&amp;Exit</label>
- <id>ID_EXIT</id>
- </item>
- </menu>
- </menus>
- </object>
- </object>
- <object class="TerminalSettingsDialog" name="dialog_terminal_Settings" base="EditDialog">
- <style>wxDEFAULT_DIALOG_STYLE</style>
- <title>Terminal Settings</title>
- <object class="wxBoxSizer" name="sizer_2" base="EditBoxSizer">
- <orient>wxVERTICAL</orient>
- <object class="sizeritem">
- <flag>wxEXPAND</flag>
- <border>0</border>
- <option>0</option>
- <object class="wxStaticBoxSizer" name="sizer_4" base="EditStaticBoxSizer">
- <orient>wxVERTICAL</orient>
- <label>Input/Output</label>
- <object class="sizeritem">
- <flag>wxALL</flag>
- <border>4</border>
- <option>0</option>
- <object class="wxCheckBox" name="checkbox_echo" base="EditCheckBox">
- <label>Local Echo</label>
- </object>
- </object>
- <object class="sizeritem">
- <flag>wxALL</flag>
- <border>4</border>
- <option>0</option>
- <object class="wxCheckBox" name="checkbox_unprintable" base="EditCheckBox">
- <label>Show unprintable characters</label>
- </object>
- </object>
- <object class="sizeritem">
- <border>0</border>
- <option>0</option>
- <object class="wxRadioBox" name="radio_box_newline" base="EditRadioBox">
- <style>wxRA_SPECIFY_ROWS</style>
- <selection>0</selection>
- <dimension>0</dimension>
- <label>Newline Handling</label>
- <choices>
- <choice>CR only</choice>
- <choice>LF only</choice>
- <choice>CR+LF</choice>
- </choices>
- </object>
- </object>
- </object>
- </object>
- <object class="sizeritem">
- <flag>wxALL|wxALIGN_RIGHT</flag>
- <border>4</border>
- <option>0</option>
- <object class="wxBoxSizer" name="sizer_3" base="EditBoxSizer">
- <orient>wxHORIZONTAL</orient>
- <object class="sizeritem">
- <border>0</border>
- <option>0</option>
- <object class="wxButton" name="button_ok" base="EditButton">
- <default>1</default>
- <label>OK</label>
- </object>
- </object>
- <object class="sizeritem">
- <border>0</border>
- <option>0</option>
- <object class="wxButton" name="button_cancel" base="EditButton">
- <label>Cancel</label>
- </object>
- </object>
- </object>
- </object>
- </object>
- </object>
-</application>