summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Liechti <cliechti@gmx.net>2015-08-19 03:07:05 +0200
committerChris Liechti <cliechti@gmx.net>2015-08-19 03:07:05 +0200
commit730e9f202daa0b5e7ad01e47c8e18ca103c80de5 (patch)
treed75bf7ffa21638f80f9a2259b20c3bbd3368c598
parent6c8887c4bf01caf38f7fed4cf03245520068909e (diff)
downloadpyserial-git-730e9f202daa0b5e7ad01e47c8e18ca103c80de5.tar.gz
spy: default format is now a hexdump, also log control line access
-rw-r--r--serial/urlhandler/protocol_spy.py176
1 files changed, 151 insertions, 25 deletions
diff --git a/serial/urlhandler/protocol_spy.py b/serial/urlhandler/protocol_spy.py
index 06620ed..3db8773 100644
--- a/serial/urlhandler/protocol_spy.py
+++ b/serial/urlhandler/protocol_spy.py
@@ -16,13 +16,15 @@
# options:
# - dev=X a file or device to write to
# - color use escape code to colorize output
-# - hex hex encode the output
+# - raw forward raw bytes instead of hexdump
#
# example:
# redirect output to an other terminal window on Posix (Linux):
-# python -m serial.tools.miniterm spy:///dev/ttyUSB0?dev=/dev/pts/12\&color
+# python -m serial.tools.miniterm spy:///dev/ttyUSB0?dev=/dev/pts/14\&color
import sys
+import time
+
import serial
try:
@@ -30,16 +32,100 @@ try:
except ImportError:
import urllib.parse as urlparse
+
+def sixteen(data):
+ n = 0
+ for b in serial.iterbytes(data):
+ yield ('{:02X} '.format(ord(b)), b if b' ' <= b < b'\x7f' else b'.')
+ n += 1
+ if n == 8:
+ yield ' ', ' '
+ elif n > 16:
+ yield None, None
+ n = 0
+ while n < 16:
+ yield (' ', ' ')
+ n += 1
+ yield None, None
+
+
+
+def hexdump(data):
+ values = []
+ ascii = []
+ for h, a in sixteen(data):
+ if h is None:
+ yield ' '.join([
+ ''.join(values),
+ ''.join(ascii)])
+ del values[:]
+ del ascii[:]
+ else:
+ values.append(h)
+ ascii.append(a)
+
+
+
+class FormatRaw(object):
+ def __init__(self, output, color):
+ self.output = output
+ self.color = color
+ self.rx_color = '\x1b[32m'
+ self.tx_color = '\x1b[31m'
+
+ def rx(self, data):
+ if self.color:
+ self.output.write(self.rx_color)
+ self.output.write(data)
+ self.output.flush()
+
+ def tx(self, data):
+ if self.color:
+ self.output.write(self.tx_color)
+ self.output.write(data)
+ self.output.flush()
+
+ def control(self, name, value):
+ pass
+
+
+class FormatHexdump(object):
+ def __init__(self, output, color):
+ self.start_time = time.time()
+ self.output = output
+ self.color = color
+ self.rx_color = '\x1b[32m'
+ self.tx_color = '\x1b[31m'
+ self.control_color = '\x1b[37m'
+
+ def write_line(self, timestamp, label, value):
+ self.output.write('{:010.3f} {:4} {}\n'.format(timestamp, label, value))
+ self.output.flush()
+
+ def rx(self, data):
+ if self.color:
+ self.output.write(self.rx_color)
+ for row in hexdump(data):
+ self.write_line(time.time() - self.start_time, 'RX', row)
+
+ def tx(self, data):
+ if self.color:
+ self.output.write(self.tx_color)
+ for row in hexdump(data):
+ self.write_line(time.time() - self.start_time, 'TX', row)
+
+ def control(self, name, value):
+ if self.color:
+ self.output.write(self.control_color)
+ self.write_line(time.time() - self.start_time, name, value)
+
+
class Serial(serial.Serial):
"""Just inherit the native Serial port implementation and patch the port property."""
def __init__(self, *args, **kwargs):
super(Serial, self).__init__(*args, **kwargs)
- self.output = sys.stderr
- self.hexlify = False
- self.color = False
- self.rx_color = '\x1b[32m'
- self.tx_color = '\x1b[31m'
+ self.formatter = None
@serial.Serial.port.setter
def port(self, value):
@@ -52,38 +138,78 @@ class Serial(serial.Serial):
if parts.scheme != 'spy':
raise serial.SerialException('expected a string in the form "spy://port[?option[=value][&option[=value]]]": not starting with spy:// (%r)' % (parts.scheme,))
# process options now, directly altering self
+ formatter = FormatHexdump
+ color = False
+ output = sys.stderr
for option, values in urlparse.parse_qs(parts.query, True).items():
if option == 'dev':
- self.output = open(values[0], 'w')
+ output = open(values[0], 'w')
elif option == 'color':
- self.color = True
- elif option == 'hex':
- self.hexlify = True
+ color = True
+ elif option == 'raw':
+ formatter = FormatRaw
+ self.formatter = formatter(output, color)
return ''.join([parts.netloc, parts.path])
def write(self, tx):
- if self.color:
- self.output.write(self.tx_color)
- if self.hexlify:
- self.output.write(tx.encode('hex'))
- else:
- self.output.write(tx)
- self.output.flush()
+ self.formatter.tx(tx)
return super(Serial, self).write(tx)
def read(self, size=1):
rx = super(Serial, self).read(size)
if rx:
- if self.color:
- self.output.write(self.rx_color)
- if self.hexlify:
- self.output.write(rx.encode('hex'))
- else:
- self.output.write(rx)
- self.output.flush()
+ self.formatter.rx(rx)
return rx
+ def flush(self):
+ self.formatter.control('FLSH', 'flush')
+ super(Serial, self).flush()
+
+ def flushInput(self):
+ self.formatter.control('FLSH', 'flushInput')
+ super(Serial, self).flush()
+
+ def flushOutput(self):
+ self.formatter.control('FLSH', 'flushOutput')
+ super(Serial, self).flushOutput()
+
+ def sendBreak(self, duration=0.25):
+ self.formatter.control('FLSH', 'sendBreak')
+ super(Serial, self).sendBreak(duration)
+
+ def setBreak(self, level=1):
+ self.formatter.control('BRK', 'active' if level else 'inactive')
+ super(Serial, self).setBreak(level)
+
+ def setRTS(self, level=1):
+ self.formatter.control('RTS', 'active' if level else 'inactive')
+ super(Serial, self).setRTS(level)
+
+ def setDTR(self, level=1):
+ self.formatter.control('DTR', 'active' if level else 'inactive')
+ super(Serial, self).setDTR(level)
+
+ def getCTS(self):
+ level = super(Serial, self).getCTS()
+ self.formatter.control('CTS', 'active' if level else 'inactive')
+ return level
+
+ def getDSR(self):
+ level = super(Serial, self).getDSR()
+ self.formatter.control('DSR', 'active' if level else 'inactive')
+ return level
+
+ def getRI(self):
+ level = super(Serial, self).getRI()
+ self.formatter.control('RI', 'active' if level else 'inactive')
+ return level
+
+ def getCD(self):
+ self.formatter.control('CD', 'active' if level else 'inactive')
+ level = super(Serial, self).getCD()
+ return level
+
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
if __name__ == '__main__':
s = Serial(None)