summaryrefslogtreecommitdiff
path: root/contrib
diff options
context:
space:
mode:
authorGary E. Miller <gem@rellim.com>2018-09-24 15:21:42 -0700
committerGary E. Miller <gem@rellim.com>2018-09-24 15:21:42 -0700
commit52a27d71f19563a40270b25ac1d127529e0a2360 (patch)
tree0c83b7d148f051bc277ba0dba79d4797f98e7d6a /contrib
parentb02507f4f5f6c5fe36e3f7609308706818e075a8 (diff)
downloadgpsd-52a27d71f19563a40270b25ac1d127529e0a2360.tar.gz
ubxtool/zerk: install these two programs by default.
Diffstat (limited to 'contrib')
-rwxr-xr-xcontrib/ubxtool2377
-rwxr-xr-xcontrib/zerk1896
2 files changed, 0 insertions, 4273 deletions
diff --git a/contrib/ubxtool b/contrib/ubxtool
deleted file mode 100755
index b8c80e95..00000000
--- a/contrib/ubxtool
+++ /dev/null
@@ -1,2377 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: UTF-8
-'''
-ubxtool -- u-blox configurator and packet decoder
-
-usage: ubxtool [OPTIONS] [server[:port[:device]]]
-'''
-
-# This file is Copyright (c) 2018 by the GPSD project
-# BSD terms apply: see the file COPYING in the distribution root for details.
-#
-# This code runs compatibly under Python 2 and 3.x for x >= 2.
-# Preserve this property!
-#
-# ENVIRONMENT:
-# Options in the UBXOPTS environment variable will be parsed before
-# the CLI options. A handy place to put your '-f /dev/ttyXX -s SPEED'
-#
-# To see what constellations are enabled:
-# ubxtool -p GNSS -f /dev/ttyXX
-#
-# To disable GALILEO and enable GALILEO:
-# ubxtool -d GLONASS -f /dev/ttyXX
-# ubxtool -e GALILEO -f /dev/ttyXX
-#
-# To read GPS messages a log file:
-# ubxtool -v 2 -f test/daemon/ublox-neo-m8n.log
-
-from __future__ import absolute_import, print_function, division
-
-import binascii # for binascii.hexlify()
-import getopt # for getopt.getopt(), to parse CLI options
-import os # for os.environ
-import socket # for socket.error
-import stat # for stat.S_ISBLK()
-import struct # for pack()
-import sys
-import time
-
-PROG_NAME = 'ubxtool'
-
-try:
- import serial
-except ImportError:
- # treat serial as special since it is not part of standard Python
- sys.stderr.write("%s: failed to import pyserial\n" % PROG_NAME)
- sys.exit(2)
-
-try:
- import gps
-except ImportError:
- # PEP8 says local imports last
- sys.stderr.write("%s: failed to import gps, check PYTHON_PATH\n" %
- PROG_NAME)
- sys.exit(2)
-
-gps_version = '3.18-dev'
-if gps.__version__ != gps_version:
- sys.stderr.write("%s: ERROR: need gps module version %s, got %s\n" %
- (PROG_NAME, gps_version, gps.__version__))
- sys.exit(1)
-
-
-VERB_QUIET = 0 # quiet
-VERB_NONE = 1 # just output requested data and some info
-VERB_DECODE = 2 # decode all messages
-VERB_INFO = 3 # more info
-VERB_RAW = 4 # raw info
-VERB_PROG = 5 # program trace
-
-# dictionary to hold all user options
-opts = {
- # command to send to GPS, -c
- 'command': None,
- # command for -d disable
- 'disable': None,
- # command for -e enable
- 'enable': None,
- # default input -f file
- 'input_file_name': None,
- # default forced wait? -W
- 'input_forced_wait': False,
- # default port speed -s
- 'input_speed': 9600,
- # default input wait time -w in seconds
- 'input_wait': 2.0,
- # optional mode to -p P
- 'mode': None,
- # the name of an OAF file, extension .jpo
- 'oaf_name': None,
- # poll command -p
- 'poll': None,
- # raw log file name
- 'raw_file': None,
- # open port read only -r
- 'read_only': False,
- # speed to set GPS -S
- 'set_speed': None,
- # target gpsd (server:port:device) to connect to
- 'target': {"server": None, "port": gps.GPSD_PORT, "device": None},
- # verbosity level, -v
- 'verbosity': VERB_NONE,
- # contents of environment variable UBXOPTS
- 'progopts': '',
-}
-
-
-class ubx(object):
- "class to hold u-blox stuff"
-
- # when a statement identifier is received, it is stored here
- last_statement_identifier = None
- # expected statement identifier.
- expect_statement_identifier = False
-
- def __init__(self):
- pass
-
- # allowable speeds
- speeds = (460800, 230400, 153600, 115200, 57600, 38400, 19200, 9600,
- 4800, 2400, 1200, 600, 300)
-
- # UBX Satellite Numbering
- gnss_id = {0: 'GPS',
- 1: 'SBAS',
- 2: 'Galileo',
- 3: 'BeiDou',
- 4: 'IMES',
- 5: 'QZSS',
- 6: 'GLONASS'}
-
- def ack_ack(self, buf):
- "UBX-ACK-ACK decode"
- m_len = len(buf)
- if 2 > m_len:
- return "Bad Length %s" % m_len
-
- u = struct.unpack_from('<BB', buf, 0)
- return ' ACK to: %s' % self.class_id_s(u[0], u[1])
-
- ack_ids = {0: {'str': 'NAK', 'dec': ack_ack, 'name': 'UBX-ACK-NAK'},
- 1: {'str': 'ACK', 'dec': ack_ack, 'name': 'UBX-ACK-ACK'}}
-
- def cfg_ant(self, buf):
- "UBX-CFG-ANT decode"
- m_len = len(buf)
- if 0 == m_len:
- return "Poll request all"
-
- if 4 > m_len:
- return "Bad Length %s" % m_len
-
- u = struct.unpack_from('<HH', buf, 0)
- s = ' flags: %#x pins: %#x (' % u
- if u[0] & 0x1:
- s += 'svcs '
- if u[0] & 0x2:
- s += 'scd '
- if u[0] & 0x4:
- s += 'ocd '
- if u[0] & 0x8:
- s += 'pdwnOnSCD '
- if u[0] & 0x10:
- s += 'recovery '
- s += (')\n pinSwitch: %d, pinSCD: %d, pinOCD: %d reconfig: %d\n' %
- (u[1] & 0x1f, (u[1] >> 5) & 0x1f, (u[1] >> 10) & 0x1f,
- u[1] >> 15))
- return s
-
- def cfg_cfg_mask(self, mask):
- "decode Mask in UBX-CFG-CFG, return string"
- s = ''
- if mask & 0x1:
- s += 'ioPort '
- if mask & 0x2:
- s += 'msgConf '
- if mask & 0x4:
- s += 'infMsg '
- if mask & 0x8:
- s += 'navConf '
- if mask & 0x10:
- s += 'rxmConf '
- if mask & 0x80:
- # not on M8030
- s += 'senConf '
- if mask & 0x100:
- s += 'rinvConf '
- if mask & 0x200:
- s += 'antConf '
- if mask & 0x800:
- s += 'logConf '
- if mask & 0x1000:
- s += 'ftsConf '
-
- return s
-
- def cfg_cfg(self, buf):
- "UBX-CFG-CFG decode"
- m_len = len(buf)
- if 12 > m_len:
- return "Bad Length %s" % m_len
-
- if 12 == m_len:
- u = struct.unpack_from('<LLL', buf, 0)
- else:
- u = struct.unpack_from('<LLLB', buf, 0)
-
- s = ' clearMask: %#x (%s)\n' % (u[0], self.cfg_cfg_mask(u[0]))
- s += (' saveMask: %#x (%s)\n' %
- (u[1], self.cfg_cfg_mask(u[1])))
- s += (' loadMask: %#x (%s)\n' %
- (u[2], self.cfg_cfg_mask(u[2])))
-
- if 13 <= m_len:
- bit_str = ''
- if u[3] & 0x1:
- bit_str += 'devBBR '
- if u[3] & 0x2:
- bit_str += 'devFlash '
- if u[3] & 0x4:
- bit_str += 'devEEPROM '
- if u[3] & 0x10:
- bit_str += 'devSpiFlash '
-
- s += (' deviceMask: %#x (%s)\n' % (u[3], bit_str))
-
- return s
-
- def cfg_gnss(self, buf):
- "UBX-CFG-GNSS decode"
- m_len = len(buf)
- if 0 == m_len:
- return "Poll request"
-
- if m_len < 4:
- return "Bad Length %d" % m_len
-
- u = struct.unpack_from('<BBBB', buf, 0)
- s = " Ver: %u ChHw; %x ChUse: %x, Blocks: %x" % u
- num_blocks = u[3]
- i = 0
- while i < num_blocks:
- u = struct.unpack_from('<BBBBBBBB', buf, 4 + (i*8))
- sat = u[0]
- if u[0] in self.gnss_id:
- s_sat = self.gnss_id[u[0]]
- else:
- s_sat = u[0]
- s += ("\n gnssId: %s TrkCh: %d maxTrCh: %d,"
- " Flags: %#02x %02x %02x %02x" %
- (s_sat, u[1], u[2], u[7], u[6], u[5], u[4]))
- if sat in (0, 1):
- # gps, sbas
- if u[6] & 0x1:
- s += '\n L1C/A'
- if 2 == sat:
- # Galileo
- if u[6] & 0x1:
- s += '\n E1OS'
- if 3 == sat:
- # BeiDou
- if u[6] & 0x1:
- s += '\n B1I'
- if 4 == sat:
- # IMES
- if u[6] & 0x1:
- s += '\n L1'
- if 5 == sat:
- # QZSS
- if u[6] & 0x5:
- s += '\n'
- if u[6] & 0x1:
- s += ' L1C/A'
- if u[6] & 0x4:
- s += ' L1SAIF'
- if 6 == sat:
- # Glonass
- if u[6] & 0x1:
- s += '\n L1OF'
- if u[4] & 0x01:
- s += ' enabled'
-
- i += 1
- return s
-
- def cfg_nav5(self, buf):
- "UBX-CFG-NAV5 nav Engine Settings"
- m_len = len(buf)
- if 36 > m_len:
- return "Bad Length %s" % m_len
-
- u = struct.unpack_from('<HBBlLbBHHHHbbbbHHb', buf, 0)
- s = (' mask %#x dynModel %u fixmode %d fixedAlt %d FixedAltVar %u\n'
- ' minElev %d drLimit %u pDop %u tDop %u pAcc %u tAcc %u\n'
- ' staticHoldThresh %u dgpsTimeOut %u cnoThreshNumSVs %u\n'
- ' cnoThresh %u res %u staticHoldMacDist %u utcStandard %u' % u)
- return s
-
- def cfg_msg(self, buf):
- "UBX-CFG-MSG decode "
- m_len = len(buf)
- if 2 == m_len:
- u = struct.unpack_from('<BB', buf, 0)
- return ' Rate request: %s' % self.class_id_s(u[0], u[1])
-
- if 3 == m_len:
- u = struct.unpack_from('<BBB', buf, 0)
- return (' Rate set: %s Rate:%d' %
- (self.class_id_s(u[0], u[1]), u[2]))
-
- if 8 != m_len:
- return "Bad Length %s" % m_len
-
- u = struct.unpack_from('<BBBBBBBB', buf, 0)
- s = (' %s Rates: %u %u %u %u %u %u' %
- (self.class_id_s(u[0], u[1]), u[2], u[3], u[4], u[5], u[6], u[7]))
- return s
-
- def cfg_pms(self, buf):
- "UBX-CFG-PMS decode, Power Mode Setup"
-
- m_len = len(buf)
- if 0 == m_len:
- return " Poll request"
-
- if 8 > m_len:
- return "Bad Length %s" % m_len
-
- values = {0: "Full power",
- 1: "Balanced",
- 2: "Interval",
- 3: "Aggresive with 1Hz",
- 4: "Aggresive with 2Hz",
- 5: "Aggresive with 4Hz",
- 0xff: "Invalid"
- }
- u = struct.unpack_from('<BBHHBB', buf, 0)
- s = (' version: %u powerSetupValue: %u'
- ' period: %u onTime: %#x reserved1[%u %u]' % u)
- if u[0] in values:
- s += "\n powerSetupValue: %s" % values[u[0]]
-
- return s
-
- def cfg_prt(self, buf):
- "UBX-CFG-PRT decode"
-
- m_len = len(buf)
- if 0 == m_len:
- return " Poll request"
-
- if 1 == m_len:
- return " Poll request PortID %d" % buf[0]
-
- if 20 > m_len:
- return "Bad Length %s" % m_len
-
- u = struct.unpack_from('<BBHLLHHH', buf, 0)
- s = (' PortID: %u reserved1 %u\n'
- ' txReady: %#x mode: %#x baudRate: %u inProtoMask: %#x'
- ' outProtoMask: %#x\n flags: %#x' % u)
- return s
-
- def cfg_sbas(self, buf):
- "UBX-CFG-SBAS decode"
-
- m_len = len(buf)
- if 8 > m_len:
- return "Bad Length %s" % m_len
-
- u = struct.unpack_from('<BBBBL', buf, 0)
- return (' mode: %#x usage: %#x maxSBAS: %u scanMode2: %#x'
- ' scanMode1: %#x' % u)
-
- def cfg_tmode2(self, buf):
- "UBX-CFG-TMODE2 decode, Time Mode Settings 2"
- m_len = len(buf)
- if 0 == m_len:
- return " Poll request"
-
- if 28 > m_len:
- return "Bad Length %s" % m_len
-
- u = struct.unpack_from('<BBHlllLLL', buf, 0)
- s = (' timeMode: %u reserved1: %u usage: %#x\n'
- ' ecefXOrLat: %d ecefYOrLon: %d ecefZOrLon: %d\n'
- ' fixeedPosAcc %u svinMinDur %u svinAccLimit %u\n' % u)
- return s
-
- def cfg_tp5(self, buf):
- "UBX-CFG-TP5 decode, Time Pulse Parameters"
- m_len = len(buf)
- if 0 == m_len:
- return " Poll request tpIdx 0"
-
- if 1 == m_len:
- return " Poll request tpIdx %d" % buf[0]
-
- if 32 > m_len:
- return " Bad Length %s" % m_len
-
- u = struct.unpack_from('<BBhhhLLLLLL', buf, 0)
- s = ('tpIdx: %u, version: %u reserved1[%u %u]\n'
- ' antCableDelay: %u rfGroupDelay %u freqPeriod: %u '
- 'freqPeriod %u\n'
- ' pulseLenRatio: %u pulseLenRationLock %u userConfigDelay: %u\n'
- 'Flags: %#x\n ' % u)
-
- if 0x01 & u[10]:
- s += 'active, '
- else:
- s += 'inactive, '
- if 0x02 & u[10]:
- s += 'lockGnsFreq, '
- if 0x04 & u[10]:
- s += 'lockedOtherSet, '
- if 0x08 & u[10]:
- s += 'is frequency, '
- else:
- s += 'is period, '
- if 0x10 & u[10]:
- s += 'is pulse length\n '
- else:
- s += 'is duty cycle\n '
- if 0x20 & u[10]:
- s += 'alignToTow, '
- if 0x40 & u[10]:
- s += 'rising, '
- else:
- s += 'falling, '
- gridToGps = (u[10] >> 7) & 0x0f
- gridToGpsDec = ('UTC', 'GPS', 'Glonass', 'BeiDou')
- syncMode = (u[10] >> 11) & 0x03
- s += "gridToGps %s, syncMode %d " % (gridToGpsDec[gridToGps], syncMode)
-
- return s
-
- def cfg_usb(self, buf):
- "UBX-CFG-USB decode"
- m_len = len(buf)
- if 0 == m_len:
- return " Poll request"
-
- if 108 > m_len:
- return " Bad Length %s" % m_len
-
- u = struct.unpack_from('<HHHHHH', buf, 0)
- s = (' vendorID: %#x productID: %#x reserved1[%u %u]'
- ' reserved2[%u %u]\n'
- ' powerConsumption %u mA flags: %#x ' % u)
- if 0x01 & u[5]:
- s += "reEnum, "
- if 0x02 & u[5]:
- s += "self-powered"
- else:
- s += "bus-powered"
-
- s += '\nvendorString: %s\n' % gps.polystr(buf[12:43])
- s += 'productString: %s\n' % gps.polystr(buf[44:75])
- s += 'serialNumber: %s' % gps.polystr(buf[76:107])
- return s
-
- cfg_ids = {0: {'str': 'PRT', 'dec': cfg_prt, 'name': 'UBX-CFG-PRT'},
- 1: {'str': 'MSG', 'dec': cfg_msg, 'name': 'UBX-CFG-MSG'},
- 2: {'str': 'INF', 'name': 'UBX-CFG-INF'},
- 4: {'str': 'RST', 'name': 'UBX-CFG-RST'},
- 6: {'str': 'DAT', 'name': 'UBX-CFG-DAT'},
- 8: {'str': 'RATE', 'name': 'UBX-CFG-RATE'},
- 9: {'str': 'CFG', 'dec': cfg_cfg, 'name': 'UBX-CFG-CFG'},
- 0x11: {'str': 'RXM', 'name': 'UBX-CFG-RXM'},
- 0x13: {'str': 'ANT', 'dec': cfg_ant, 'name': 'UBX-CFG-ANT'},
- 0x16: {'str': 'SBAS', 'dec': cfg_sbas, 'name': 'UBX-CFG-SBAS'},
- 0x17: {'str': 'NMEA', 'name': 'UBX-CFG-NMEA'},
- 0x1b: {'str': 'USB', 'dec': cfg_usb, 'name': 'UBX-CFG-USB'},
- 0x1e: {'str': 'ODO', 'name': 'UBX-CFG-ODO'},
- 0x23: {'str': 'NAVX5', 'name': 'UBX-CFG-NAVX5'},
- 0x24: {'str': 'NAV5', 'dec': cfg_nav5, 'name': 'UBX-CFG-NAV5'},
- 0x31: {'str': 'TP5', 'dec': cfg_tp5, 'name': 'UBX-CFG-TP5'},
- 0x34: {'str': 'RINV', 'name': 'UBX-CFG-RINV'},
- 0x39: {'str': 'ITFM', 'name': 'UBX-CFG-ITFM'},
- 0x3b: {'str': 'PM2', 'name': 'UBX-CFG-PM2'},
- 0x3d: {'str': 'TMODE2', 'dec': cfg_tmode2,
- 'name': 'UBX-CFG-TMODE2'},
- 0x3e: {'str': 'GNSS', 'dec': cfg_gnss, 'name': 'UBX-CFG-GNSS'},
- 0x47: {'str': 'LOGFILTER', 'name': 'UBX-CFG-LOGFILTER'},
- 0x53: {'str': 'TXSLOT', 'name': 'UBX-CFG-TXSLOT'},
- 0x57: {'str': 'PWR', 'name': 'UBX-CFG-PWR'},
- 0x5c: {'str': 'HNR', 'name': 'UBX-CFG-HNR'},
- 0x60: {'str': 'ESRC', 'name': 'UBX-CFG-ESRC'},
- 0x61: {'str': 'DOSC', 'name': 'UBX-CFG-OSC'},
- 0x62: {'str': 'SMGR', 'name': 'UBX-CFG-SMGR'},
- 0x69: {'str': 'GEOFENCE', 'name': 'UBX-CFG-GEOFENCE'},
- 0x70: {'str': 'DGNSS', 'name': 'UBX-CFG-DGNSS'},
- 0x71: {'str': 'TMODE3', 'name': 'UBX-CFG-TMODE3'},
- 0x84: {'str': 'FIXSEED', 'name': 'UBX-CFG-FIXSEED'},
- 0x85: {'str': 'DYNSEED', 'name': 'UBX-CFG-DYNSEED'},
- 0x86: {'str': 'PMS', 'dec': cfg_pms, 'name': 'UBX-CFG-PMS'},
- }
-
- def inf_debug(self, buf):
- "UBX-INF-DEBUG decode"
- return ' Debug: ' + gps.polystr(buf)
-
- def inf_error(self, buf):
- "UBX-INF-ERROR decode"
- return ' Error: ' + gps.polystr(buf)
-
- def inf_notice(self, buf):
- "UBX-INF-NOTICE decode"
- return ' Notice: ' + gps.polystr(buf)
-
- def inf_test(self, buf):
- "UBX-INF-TET decode"
- return ' Test: ' + gps.polystr(buf)
-
- def inf_warning(self, buf):
- "UBX-INF-WARNING decode"
- return ' Warning: ' + gps.polystr(buf)
-
- inf_ids = {0x0: {'str': 'ERROR', 'dec': inf_error,
- 'name': 'UBX-INF-ERROR'},
- 0x1: {'str': 'WARNING', 'dec': inf_warning,
- 'name': 'UBX-INF-WARNING'},
- 0x2: {'str': 'NOTICE', 'dec': inf_notice,
- 'name': 'UBX-INF-NOTICE'},
- 0x3: {'str': 'TEST', 'dec': inf_test,
- 'name': 'UBX-INF-TEST'},
- 0x4: {'str': 'DEBUG', 'dec': inf_debug,
- 'name': 'UBX-INF-DEBUG'},
- }
-
- def mon_ver(self, buf):
- "UBX-MON-VER decode"
- m_len = len(buf)
- if 0 == m_len:
- return " Poll request"
-
- if 40 > m_len:
- return " Bad Length %s" % m_len
-
- substr = buf.split('\0')[0]
- s = ' swVersion: %s\n' % (substr)
- substr = buf[30:39]
- substr = substr.split('\0')[0]
- s += ' hwVersion: %s' % (substr[30:39])
- # extensions??
- num_ext = int((m_len - 40) / 30)
- i = 0
- while i < num_ext:
- loc = 40 + (i * 30)
- substr = buf[loc:]
- substr = substr.split('\0')[0]
- s += '\n extension: %s' % (substr)
- i += 1
- return s
-
- mon_ids = {2: {'str': 'IO', 'name': 'UBX-MON-IO'},
- 4: {'str': 'VER', 'dec': mon_ver, 'name': 'UBX-MON-VER'},
- 6: {'str': 'MSGPP', 'name': 'UBX-MON-MSGPP'},
- 7: {'str': 'RXBUF', 'name': 'UBX-MON-RXBUF'},
- 8: {'str': 'TXBUF', 'name': 'UBX-MON-TXBUF'},
- 9: {'str': 'HW', 'name': 'UBX-MON-HW'},
- 0x0b: {'str': 'HW2', 'name': 'UBX-MON-HW2'},
- 0x21: {'str': 'RXR', 'name': 'UBX-MON-RXR'},
- 0x27: {'str': 'PATCH', 'name': 'UBX-MON-PATCH'},
- 0x28: {'str': 'GNSS', 'name': 'UBX-MON-GNSS'},
- 0x2e: {'str': 'SMGR', 'name': 'UBX-MON-SMGR'},
- }
-
- def nav_clock(self, buf):
- "UBX-NAV-CLOCK decode, Clock Solution"
- m_len = len(buf)
- if 0 == m_len:
- return " Poll request"
-
- if 20 > m_len:
- return " Bad Length %s" % m_len
-
- u = struct.unpack_from('<LllLL', buf, 0)
- return (' iTOW:%d ms, clkB:%d ns clkD:%d ns/s tAcc:%d ns,'
- 'fAcc:%d ns/s' % u)
-
- def nav_dgps(self, buf):
- "UBX-NAV-DGPS decode, DGPS Data used for NAV"
- m_len = len(buf)
- if 0 == m_len:
- return " Poll request"
-
- if 16 > m_len:
- return " Bad Length %s" % m_len
-
- u = struct.unpack_from('<LlhhBBBB', buf, 0)
- s = (' iTOW:%d ms, age:%d ms, baseID:%d basehealth:%d numCh:%d\n'
- ' status:%#x reserved1[%u %u]' % u)
-
- m_len -= 16
- i = 0
- while 0 < m_len:
- u = struct.unpack_from('<BbHff', buf, 16 + i * 12)
- # dunno how to do R4
- s += ('\n svid %3u flags %#4x ageC:%d ms prc:%f prcc:%f' % u)
- m_len -= 12
- i += 1
-
- return s
-
- def nav_dop(self, buf):
- "UBX-NAV-DOP decode, Dilution of Precision"
- m_len = len(buf)
- if 0 == m_len:
- return " Poll request"
-
- if 18 > m_len:
- return " Bad Length %s" % m_len
-
- u = struct.unpack_from('<Lhhhhhhh', buf, 0)
- s = (' iTOW:%d ms, gDOP:%.2f pDOP:%.2f tDOP:%.2f vDOP:%.2f\n'
- ' hDOP:%.2f nDOP:%.2f eDOP:%.2f' %
- (u[0], u[1] / 100.0, u[2] / 100.0, u[3] / 100.0,
- u[4] / 100.0, u[5] / 100.0, u[6] / 100.0, u[7] / 100.0))
- return s
-
- def nav_navx5(self, buf):
- "UBX-CFG-NAVX5 decode"
-
- # length == 20 case seems broken?
- m_len = len(buf)
- if 0 == m_len:
- return " Poll request"
-
- if 20 > m_len:
- return " Bad Length %s" % m_len
-
- u = struct.unpack_from('<HHLHBBBBBHBH', buf, 0)
- s = (' version %u mask1 %#x mask2 %#x minSVs %d maxSVs %d minCNO %u\n'
- ' iniFix3D %u ackAiding %u wknRollover %u' %
- (u[0], u[1], u[2], u[4], u[5], u[6], u[8], u[10], u[11]))
-
- if 40 <= m_len:
- u = struct.unpack_from('<BBHHLHBB', buf, 20)
- s = ('\n usePPP %d aopCfg %d aopOrbMaxErr %u useAdr %u' %
- (u[0], u[1], u[3], u[7]))
- return s
-
- def nav_posecef(self, buf):
- "UBX-NAV-POSECEF decode"
- m_len = len(buf)
- if 0 == m_len:
- return " Poll request"
-
- if 20 != m_len:
- return " Bad Length %s" % m_len
-
- u = struct.unpack_from('<LlllL', buf, 0)
- return ' iTOW:%d ms, ecefX:%d cm Y:%d cm Z:%d cm\n pAcc:%d cm' % u
-
- def nav_pvt(self, buf):
- "UBX-NAV-PVT decode"
- m_len = len(buf)
- if 0 == m_len:
- return " Poll request"
-
- if 92 > m_len:
- return " Bad Length %s" % m_len
-
- u = struct.unpack_from('<LHBBBBBBLlBBBBllllLLlllllLLHbbbbbblhH',
- buf, 0)
- s = (' iTOW %d ms, time %d/%d/%d %2d:%2d:%2d valid %#x\n'
- ' tAcc %d ns nano %d fixType %d flags %#x flags2 %#x\n'
- ' numSV %d lon %.7f lat %.7f height %.3f\n'
- ' hMSL %.3f hAcc %.3f' %
- (u[0], u[1], u[2], u[3], u[4], u[5], u[6], u[7],
- u[8], u[9], u[10], u[11], u[12],
- u[13], u[14] * 10e-7, u[15] * 10e-7, u[16] / 1000,
- u[17] / 1000, u[18] / 1000))
- return s
-
- def nav_sat(self, buf):
- "UBX-NAV-SAT decode"
- m_len = len(buf)
- if 0 == m_len:
- return " Poll request"
-
- if 8 > m_len:
- return " Bad Length %s" % m_len
-
- u = struct.unpack_from('<LBB', buf, 0)
- s = ' iTOW %d ms, version %d numSvs %d' % u
-
- m_len -= 8
- i = 0
- while 0 < m_len:
- u = struct.unpack_from('<BBBbhhL', buf, 8 + i * 12)
- s += ('\n gnssd %d svid %3d cno %2d elev %3d azim %3d prRes %6d'
- ' flags %#x\n' % u)
- if 0 < u[6]:
- s += ' '
- s += 'qualityInd %u ' % (0x07 & u[6])
- if 8 & u[6]:
- s += 'svUsed '
- s += 'health %u ' % (0x03 & (u[6] >> 4))
- if 0x40 & u[6]:
- s += 'diffCorr '
- if 0x80 & u[6]:
- s += 'smoothed '
- s += 'orbitSource %u ' % (0x07 & (u[6] >> 8))
- if 0x800 & u[6]:
- s += 'ephAvail '
- if 0x1000 & u[6]:
- s += 'almAvail '
- if 0x2000 & u[6]:
- s += 'anoAvail '
- if 0x4000 & u[6]:
- s += 'aopAvail '
-
- if 0x730000 & u[6]:
- s += '\n'
- if 0x10000 & u[6]:
- s += 'sbasCorrused '
- if 0x20000 & u[6]:
- s += 'rtcmCorrused '
- if 0x1000000 & u[6]:
- s += 'prCorrused '
- if 0x2000000 & u[6]:
- s += 'crCorrused '
- if 0x4000000 & u[6]:
- s += 'doCorrused '
- m_len -= 12
- i += 1
-
- return s
-
- def nav_sbas(self, buf):
- "UBX-NAV-SBAS decode"
- m_len = len(buf)
- if 0 == m_len:
- return " Poll request"
-
- if 12 > m_len:
- return " Bad Length %s" % m_len
-
- u = struct.unpack_from('<LBBbBb', buf, 0)
- s = (' iTOW:%d ms, geo:%u mode:%#x, sys:%#x service:%#x cnt:%d' % u)
-
- m_len -= 12
- i = 0
- while 0 < m_len:
- u = struct.unpack_from('<BBBBBBhhh', buf, 12 + (i * 12))
- s += ('\n svid %3d flags %#4x udre:%#2x svSys:%3d syService:%2d'
- ' prc:%3d ic:%3d' %
- (u[0], u[1], u[2], u[3], u[4], u[6], u[8]))
- if 0x0f & u[4]:
- s += '\n svService: '
- if 1 & u[4]:
- s += 'Ranging '
- if 2 & u[4]:
- s += 'Corrections '
- if 4 & u[4]:
- s += 'Integrity '
- if 8 & u[4]:
- s += 'Testmode'
- m_len -= 12
- i += 1
-
- return s
-
- fix_types = ('None', 'Dead Reckoning', '2D', '3D', 'GPS+DR', 'Surveyed')
-
- def nav_sol(self, buf):
- "UBX-NAV-SOL decode deprecated by u-blox"
- m_len = len(buf)
- if 0 == m_len:
- return " Poll request"
-
- if 52 > m_len:
- return " Bad Length %s" % m_len
-
- u = struct.unpack_from('<LlhBBlllLlllLHBBBBB', buf, 0)
- s = (' iTOW:%u ms, fTOW %u ns, week:%d gpsFix:%d flags:%#x\n'
- ' ECEF X:%.3f Y:%.3f Z:%.3f pAcc:%.3f\n'
- ' VECEF X:%.3f Y:%.3f Z:%.3f vAcc:%.3f\n'
- ' pDOP:%.2f numSV:%d' %
- (u[0], u[1], u[2], u[3], u[4],
- u[5] / 100.0, u[6] / 100.0, u[7] / 100.0, u[8] / 100.0,
- u[9] / 100.0, u[10] / 100.0, u[11] / 100.0, u[12] / 100.0,
- u[13] / 100.0, u[15]))
- if u[3] < len(self.fix_types):
- s += '\n gpsFix: ' + self.fix_types[u[3]]
- if 0x0f & u[4]:
- s += '\n flags: '
- if 1 & u[4]:
- s += 'GPSfixOK '
- if 2 & u[4]:
- s += 'DiffSoln '
- if 4 & u[4]:
- s += 'WKNSET '
- if 8 & u[4]:
- s += 'TOWSET'
- return s
-
- def nav_status(self, buf):
- "UBX-NAV-STATUS decode"
- m_len = len(buf)
- if 0 == m_len:
- return " Poll request"
-
- if 16 > m_len:
- return " Bad Length %s" % m_len
-
- u = struct.unpack_from('<LBBBBLL', buf, 0)
- return (' iTOW:%d ms, fix:%d flags:%#x fixstat:%#x flags2:%#x\n'
- ' ttff:%d, msss:%d' % u)
-
- def nav_svin(self, buf):
- "UBX-NAV-SVIN decode, Survey-in data"
- m_len = len(buf)
- if 0 == m_len:
- return " Poll request"
-
- if 40 > m_len:
- return " Bad Length %s" % m_len
-
- u = struct.unpack_from('<BBBBLLlllbbbBLLBB', buf, 0)
- return (' version %u reserved1[%u %u %u] iTOW %u dur %u\n'
- ' meanX %d meanY %d meanZ %d\n'
- ' meanXHP %d meanYHP %d meanZHP %d reserved2 %u meanAcc %u\n'
- ' obs %u valid %u active %u\n' % u)
-
- def nav_svinfo(self, buf):
- "UBX-NAV-SVINFO decode"
- m_len = len(buf)
- if 0 == m_len:
- return "Poll request"
-
- if 8 > m_len:
- return "Bad Length %s" % m_len
-
- u = struct.unpack_from('<Lbb', buf, 0)
- s = ' iTOW:%d ms, numCh:%d globalFlags:%d' % u
-
- m_len -= 8
- i = 0
- while 0 < m_len:
- u = struct.unpack_from('<BBBBBbhl', buf, 8 + i * 12)
- s += ('\n chn %3d svid %3d flags %#0.2x quality %#x cno %2d'
- ' elev %3d azim %3d prRes %6d' % u)
- if 0 < u[2]:
- s += '\n '
- if 1 & u[2]:
- s += 'svUsed '
- if 2 & u[2]:
- s += 'diffCorr '
- if 4 & u[2]:
- s += 'orbitAvail '
- if 8 & u[2]:
- s += 'orbitEph '
- if 0x10 & u[2]:
- s += 'unhealthy '
- if 0x20 & u[2]:
- s += 'orbitAlm '
- if 0x40 & u[2]:
- s += 'orbitAop '
- if 0x80 & u[2]:
- s += 'smoothed '
- m_len -= 12
- i += 1
-
- return s
-
- def nav_timebds(self, buf):
- "UBX-NAV-TIMEBDS decode"
- m_len = len(buf)
- if 0 == m_len:
- return " Poll request"
-
- if 20 > m_len:
- return ".Bad Length %s" % m_len
-
- u = struct.unpack_from('<LLlhbBL', buf, 0)
- return (' iTOW:%d ms, SOW:%d fSOW:%d week %d leapS:%d Valid:%#x\n'
- 'tAcc:%d ns' % u)
-
- def nav_timegal(self, buf):
- "UBX-NAV-TIMEGAL decode"
- m_len = len(buf)
- if 0 == m_len:
- return " Poll request"
-
- if 20 > m_len:
- return " Bad Length %s" % m_len
-
- u = struct.unpack_from('<LLlhbBL', buf, 0)
- return (' iTOW:%d ms, galTOW:%.9f galWno:%d leapS:%d Valid:%#x\n'
- ' tAcc:%d ns' %
- (u[0], u[1] + (u[2] * 10e-9), u[3], u[4], u[5], u[6]))
-
- def nav_timegps(self, buf):
- "UBX-NAV-TIMEGPS decode"
- m_len = len(buf)
- if 0 == m_len:
- return " Poll request"
-
- if 16 > m_len:
- return " Bad Length %s" % m_len
-
- u = struct.unpack_from('<LlhbBL', buf, 0)
- s = (' iTOW:%u ms, fTOW:%u ns, week:%d leapS:%d valid:%#x tAcc:%d ns' %
- u)
- if 0x07 & u[4]:
- s += '\n valid: '
- if 1 & u[4]:
- s += 'towValid '
- if 2 & u[4]:
- s += 'weekValid '
- if 4 & u[4]:
- s += 'leapValid '
- return s
-
- def nav_timeutc(self, buf):
- "UBX-NAV-TIMEUTC decode"
- m_len = len(buf)
- if 0 == m_len:
- return " Poll request"
-
- if 20 > m_len:
- return " Bad Length %s" % m_len
-
- u = struct.unpack_from('<LLlHbbbbbb', buf, 0)
- return (' iTOW:%d ms, tAcc:%d ns nano:%d ns Time: %d/%d/%d %d:%d:%d\n'
- ' valid:%#x' % u)
-
- def nav_velecef(self, buf):
- "UBX-NAV-VELECEF decode"
- m_len = len(buf)
- if 0 == m_len:
- return " Poll request"
-
- if 20 != m_len:
- return " Bad Length %s" % m_len
-
- u = struct.unpack_from('<LlllL', buf, 0)
- return (' iTOW:%d ms,'
- ' ecefVX:%.2f m/s VY:%.2f m/s VZ:%.2f m/s vAcc:%.2f m/s' %
- (u[0], u[1] / 100.0, u[2] / 100.0, u[3] / 100.0,
- u[4] / 100.0))
-
- def nav_velned(self, buf):
- "UBX-NAV-VELNED decode"
- m_len = len(buf)
- if 0 == m_len:
- return " Poll request"
-
- if 36 > m_len:
- return " Bad Length %s" % m_len
-
- u = struct.unpack_from('<LlllLLlLL', buf, 0)
- return (' iTOW:%d ms,'
- ' velN:%d cm/s velE:%d cm/s velD:%d cm/s speed:%d cm/s\n'
- ' gspeed:%d cm/s heading:%f cm/s sAcc:%d cm/s cAcc:%d deg' %
- (u[0], u[1], u[2], u[3], u[4], u[5], u[6] * 1e-5,
- u[7], u[8] * 1e-5))
-
- nav_ids = {1: {'str': 'POSECEF', 'dec': nav_posecef,
- 'name': 'UBX-NAV-POSECEF'},
- 2: {'str': 'POSLLH', 'name': 'UBX-NAV-POSLLH'},
- 3: {'str': 'STATUS', 'dec': nav_status,
- 'name': 'UBX-NAV-STATUS'},
- 0x4: {'str': 'DOP', 'dec': nav_dop, 'name': 'UBX-NAV-DOP'},
- 0x5: {'str': 'ATT', 'name': 'UBX-NAV-ATT'},
- 0x6: {'str': 'SOL', 'dec': nav_sol, 'name': 'UBX-NAV-SOL'},
- 0x7: {'str': 'PVT', 'dec': nav_pvt, 'name': 'UBX-NAV-PVT'},
- 0x9: {'str': 'ODO', 'name': 'UBX-NAV-ODO'},
- 0x10: {'str': 'RESETODO', 'name': 'UBX-NAV-RESETODO'},
- 0x11: {'str': 'VELECEF', 'dec': nav_velecef,
- 'name': 'UBX-NAV-VELECEF'},
- 0x12: {'str': 'VELNED', 'dec': nav_velned,
- 'name': 'UBX-NAV-VELNED'},
- 0x13: {'str': 'HPPOSECEF', 'name': 'UBX-NAV-HPPOSECEF'},
- 0x14: {'str': 'HPPOSLLH', 'name': 'UBX-NAV-HPPOSLLH'},
- 0x20: {'str': 'TIMEGPS', 'dec': nav_timegps,
- 'name': 'UBX-NAV-TIMEGPS'},
- 0x21: {'str': 'TIMEUTC', 'dec': nav_timeutc,
- 'name': 'UBX-NAV-TIMEUTC'},
- 0x22: {'str': 'CLOCK', 'dec': nav_clock,
- 'name': 'UBX-NAV-CLOCK'},
- 0x23: {'str': 'NAVX5', 'dec': nav_navx5,
- 'name': 'UBX-NAV-NAVX5'},
- 0x24: {'str': 'TIMEBDS', 'dec': nav_timebds,
- 'name': 'UBX-NAV-TIMEBDS'},
- 0x25: {'str': 'TIMEGAL', 'dec': nav_timegal,
- 'name': 'UBX-NAV-TIMEGAL'},
- 0x26: {'str': 'TIMEGLO', 'name': 'UBX-NAV-TIMEGLO'},
- 0x30: {'str': 'SVINFO', 'dec': nav_svinfo,
- 'name': 'UBX-NAV-SVINFO'},
- 0x31: {'str': 'DGPS', 'dec': nav_dgps, 'name': 'UBX-NAV-DGPS'},
- 0x32: {'str': 'SBAS', 'dec': nav_sbas, 'name': 'UBX-NAV-SBAS'},
- 0x34: {'str': 'ORB', 'name': 'UBX-NAV-ORB'},
- 0x35: {'str': 'SAT', 'dec': nav_sat, 'name': 'UBX-NAV-SAT'},
- 0x39: {'str': 'GEOFENCE', 'name': 'UBX-NAV-GEOFENCE'},
- 0x3B: {'str': 'SVIN', 'dec': nav_svin, 'name': 'UBX-NAV-SVIN'},
- 0x3C: {'str': 'RELPOSNED', 'name': 'UBX-NAV-RELPOSNED'},
- 0x60: {'str': 'AOPSTATUS', 'name': 'UBX-NAV-AOPSTATUS'},
- 0x61: {'str': 'EOF', 'name': 'UBX-NAV-EOF'},
- }
-
- def rxm_raw(self, buf):
- "UBX-RXM-RAW decode"
- m_len = len(buf)
- if 0 == m_len:
- return " Poll request"
-
- if 8 > m_len:
- return " Bad Length %s" % m_len
-
- u = struct.unpack_from('<LlhhBBBB', buf, 0)
- s = ' iTOW:%d ms weeks:%d numSV:%d' % u
-
- m_len -= 8
- i = 0
- while 0 < m_len:
- u = struct.unpack_from('<ddfBbbB', buf, 8 + i * 24)
- s += ('\n cpMes:%f prMes:%f doMes:%f sv:%d mesQI:%d\n'
- ' eno:%d lli:%d' % u)
- m_len -= 24
- i += 1
-
- return s
-
- def rxm_rawx(self, buf):
- "UBX-RXM-RAWX decode"
- m_len = len(buf)
- if 16 > m_len:
- return " Bad Length %s" % m_len
-
- u = struct.unpack_from('<dHbBBB', buf, 0)
- s = (' rcvTow %.3f week %u leapS %d numMeas %u recStat %#x'
- ' version %u' % u)
-
- m_len -= 16
- i = 0
- while 0 < m_len:
- u = struct.unpack_from('<ddfBBBBHBBBBB', buf, 16 + i * 32)
- s += ('\n prmes %.3f cpMes %.3f doMes %f\n'
- ' gnssID %u svId %u %u freqId %u locktime %u cno %u\n'
- ' prStdev %u, cpStdev %u doStdev %u trkStat %u' % u)
- m_len -= 32
- i += 1
- return s
-
- def rxm_sfrb(self, buf):
- "UBX-RXM-SFRB decode, Subframe Buffer"
- m_len = len(buf)
- if 0 == m_len:
- return " Poll request"
-
- if 42 > m_len:
- return " Bad Length %s" % m_len
-
- u = struct.unpack_from('<BBLLLLLLLLLL', buf, 0)
- s = (' chn:%d s svid %3d\n'
- ' dwrd:%08x %08x %08x %08x %08x\n'
- ' %08x %08x %08x %08x %08x' % u)
-
- return s
-
- def rxm_sfrbx(self, buf):
- "UBX-RXM-SFRBX decode, Broadcast Navigation Data Subframe"
- m_len = len(buf)
-
- if 8 > m_len:
- return " Bad Length %s" % m_len
-
- u = struct.unpack_from('<BBBBBBBB', buf, 0)
- s = (' gnssId:%u svId %3u reserved1 %u freqId %u numWords %u\n'
- ' reserved2 %u version %u reserved3 %u\n' % u)
- s += ' dwrd'
- for i in range(8, m_len - 1, 4):
- u = struct.unpack_from('<L', buf, i)
- s += " %08x" % u
-
- return s
-
- def rxm_svsi(self, buf):
- "UBX-RXM-SVSI decode, SV Status Info"
- m_len = len(buf)
- if 0 == m_len:
- return " Poll request"
-
- if 8 > m_len:
- return " Bad Length %s" % m_len
-
- u = struct.unpack_from('<LhBB', buf, 0)
- s = ' iTOW:%d ns week:%d numVis:%d numSV:%d' % u
-
- m_len -= 8
- i = 0
- while 0 < m_len:
- u = struct.unpack_from('<BBhbB', buf, 8 + i * 6)
- s += '\n svid:%3d svFlag:%#x azim:%3d elev:% 3d age:%3d' % u
- m_len -= 6
- i += 1
-
- return s
-
- rxm_ids = {0x10: {'str': 'RAW', 'dec': rxm_raw,
- 'name': 'UBX-RXM-RAW'}, # obsolete
- 0x11: {'str': 'SFRB', 'dec': rxm_sfrb,
- 'name': 'UBX-RXM-SFRB'},
- 0x13: {'str': 'SFRBX', 'dec': rxm_sfrbx,
- 'name': 'UBX-RXM-SFRBX'},
- 0x14: {'str': 'MEASX', 'name': 'UBX-RXM-MEASX'},
- 0x15: {'str': 'RAWX', 'dec': rxm_rawx, 'name': 'UBX-RXM-RAWX'},
- 0x20: {'str': 'SVSI', 'dec': rxm_svsi, 'name': 'UBX-RXM-SVSI'},
- 0x32: {'str': 'RTCM', 'name': 'UBX-RXM-RTCM'},
- 0x41: {'str': 'PMREQ', 'name': 'UBX-RXM-PMREQ'},
- 0x59: {'str': 'RLM', 'name': 'UBX-RXM-RLM'},
- 0x61: {'str': 'IMES', 'name': 'UBX-RXM-IMES'},
- }
-
- def tim_svin(self, buf):
- "UBX-TIM-SVIN decode, Survey-in data"
- m_len = len(buf)
- if 0 == m_len:
- return " Poll request"
-
- if 28 > m_len:
- return " Bad Length %s" % m_len
-
- u = struct.unpack_from('<LlllLLBB', buf, 0)
- s = (' dur %u meanX %d meanY %d meanZ %d\n'
- ' meanV %u obs %u valid %u active %u' % u)
- return s
-
- def tim_tp(self, buf):
- "UBX-TIM-TP decode"
- m_len = len(buf)
- if 0 == m_len:
- return " Poll request"
-
- if 16 > m_len:
- return " Bad Length %s" % m_len
-
- u = struct.unpack_from('<LLlHbb', buf, 0)
- s = (' towMS:%u ms, towSubMS:%u qErr:%d ps, week:%d\n'
- ' flags:%#x refInfo:%#x\n flags: ' % u)
-
- if 0x01 & u[4]:
- s += "timeBase is UTC, "
- else:
- s += "timeBase is GNSS, "
- if 0x02 & u[4]:
- s += "UTC available, "
- else:
- s += "UTC not available, "
-
- raim = (u[4] >> 2) & 0x03
- if 0 == raim:
- s += "RAIM not available"
- elif 1 == raim:
- s += "RAIM not active"
- elif 2 == raim:
- s += "RAIM active"
- else:
- s += "RAIM ??"
- return s
-
- tim_ids = {1: {'str': 'TP', 'dec': tim_tp, 'name': 'UBX-TIM-TP'},
- 3: {'str': 'TM2', 'name': 'UBX-TIM-TM2'},
- 4: {'str': 'SVIN', 'dec': tim_svin, 'name': 'UBX-TIM-SVIN'},
- 6: {'str': 'VRFY', 'name': 'UBX-TIM-VRFY'},
- 0x11: {'str': 'DOSC', 'name': 'UBX-TIM-DOSC'},
- 0x12: {'str': 'TOS', 'name': 'UBX-TIM-TOS'},
- 0x13: {'str': 'SMEAS', 'name': 'UBX-TIM-SMEAS'},
- 0x15: {'str': 'VCOCAL', 'name': 'UBX-TIM-VCOCAL'},
- 0x16: {'str': 'FCHG', 'name': 'UBX-TIM-FCHG'},
- 0x17: {'str': 'HOC', 'name': 'UBX-TIM-HOC'},
- }
-
- classes = {
- 0x01: {'str': 'NAV', 'ids': nav_ids},
- 0x02: {'str': 'RXM', 'ids': rxm_ids},
- 0x04: {'str': 'INF', 'ids': inf_ids},
- 0x05: {'str': 'ACK', 'ids': ack_ids},
- 0x06: {'str': 'CFG', 'ids': cfg_ids},
- 0x09: {'str': 'UPD'},
- 0x0A: {'str': 'MON', 'ids': mon_ids},
- 0x0B: {'str': 'ATD'},
- 0x0D: {'str': 'TIM', 'ids': tim_ids},
- 0x10: {'str': 'ESF'},
- 0x13: {'str': 'MGA'},
- 0x21: {'str': 'LOG'}
- }
-
- def class_id_s(self, m_class, m_id):
- "Return class and ID numbers as a string."
- s = 'Class: '
- if m_class not in self.classes:
- s += '%#x ID: %#x' % (m_class, m_id)
- return s
-
- if 'str' in self.classes[m_class]:
- s_class = self.classes[m_class]['str']
- s += '%s(%#x) ' % (s_class, m_class)
- else:
- s += '%#x ' % (m_class)
-
- if (('ids' in self.classes[m_class] and
- m_id in self.classes[m_class]['ids'] and
- 'str' in self.classes[m_class]['ids'][m_id])):
- s_id = self.classes[m_class]['ids'][m_id]['str']
- s += 'ID: %s(%#x)' % (s_id, m_id)
- else:
- s += 'ID: %#x' % (m_id)
-
- return s
-
- def decode_msg(self, out):
- "Decode one message and then return number of chars consumed"
-
- state = 'BASE'
- consumed = 0
-
- # decode state machine
- for this_byte in out:
- consumed += 1
- if isinstance(this_byte, str):
- # a character, probably read from a file
- c = ord(this_byte)
- else:
- # a byte, probably read from a serial port
- c = int(this_byte)
-
- if VERB_RAW <= opts['verbosity']:
- if (ord(' ') <= c) and (ord('~') >= c):
- # c is printable
- print("state: %s char %c (%#x)" % (state, chr(c), c))
- else:
- # c is not printable
- print("state: %s char %#x" % (state, c))
-
- if 'BASE' == state:
- # start fresh
- # place to store 'comments'
- comment = ''
- m_class = 0
- m_id = 0
- m_len = 0
- m_raw = bytearray(0) # class, id, len, payload
- m_payload = bytearray(0) # just the payload
- m_ck_a = 0
- m_ck_b = 0
-
- if 0xb5 == c:
- # got header 1, mu
- state = 'HEADER1'
-
- if ord('$') == c:
- # got $, so NMEA?
- state = 'NMEA'
- comment = '$'
-
- if ord("{") == c:
- # JSON, treat as comment line
- state = 'JSON'
-
- # start fresh
- comment = "{"
- continue
-
- if ord("#") == c:
- # comment line
- state = 'COMMENT'
-
- # start fresh
- comment = "#"
- continue
-
- if (ord('\n') == c) or (ord('\r') == c):
- # CR or LF, leftovers
- return 1
- continue
-
- if state in ('COMMENT', 'JSON'):
- # inside comment
- if ord('\n') == c or ord('\r') == c:
- # Got newline or linefeed
- # terminate messages on <CR> or <LF>
- # Done, got a full message
- if gps.polystr('{"class":"ERROR"') in comment:
- # always print gpsd errors
- print(comment)
- elif VERB_DECODE <= opts['verbosity']:
- print(comment)
- return consumed
- else:
- comment += chr(c)
- continue
-
- if 'NMEA' == state:
- # getting NMEA payload
- if (ord('\n') == c) or (ord('\r') == c):
- # CR or LF, done, got a full message
- # terminates messages on <CR> or <LF>
- if VERB_DECODE <= opts['verbosity']:
- print(comment + '\n')
- return consumed
- else:
- comment += chr(c)
- continue
-
- if ord('b') == c and 'HEADER1' == state:
- # got header 2
- state = 'HEADER2'
- continue
-
- if 'HEADER2' == state:
- # got class
- state = 'CLASS'
- m_class = c
- m_raw.extend([c])
- continue
-
- if 'CLASS' == state:
- # got ID
- state = 'ID'
- m_id = c
- m_raw.extend([c])
- continue
-
- if 'ID' == state:
- # got first length
- state = 'LEN1'
- m_len = c
- m_raw.extend([c])
- continue
-
- if 'LEN1' == state:
- # got second length
- m_raw.extend([c])
- m_len += 256 * c
- if 0 == m_len:
- # no payload
- state = 'CSUM1'
- else:
- state = 'PAYLOAD'
- continue
-
- if 'PAYLOAD' == state:
- # getting payload
- m_raw.extend([c])
- m_payload.extend([c])
- if len(m_payload) == m_len:
- state = 'CSUM1'
- continue
-
- if 'CSUM1' == state:
- # got ck_a
- state = 'CSUM2'
- m_ck_a = c
- continue
-
- if 'CSUM2' == state:
- # ck_b
- state = 'BASE'
- m_ck_b = c
- # check checksum
- chk = self.checksum(m_raw, len(m_raw))
- if (chk[0] != m_ck_a) or (chk[1] != m_ck_b):
- sys.stderr.write("%s: ERROR checksum failed,"
- "was (%d,%d) s/b (%d, %d)\n" %
- (PROG_NAME, m_ck_a, m_ck_b,
- chk[0], chk[1]))
-
- s_payload = ''.join('{:02x} '.format(x) for x in m_payload)
- x_payload = binascii.hexlify(m_payload)
-
- if m_class in self.classes:
- this_class = self.classes[m_class]
- if 'ids' in this_class:
- if m_id in this_class['ids']:
- if 'dec' in this_class['ids'][m_id]:
- dec = this_class['ids'][m_id]['dec']
- s_payload = this_class['ids'][m_id]['name']
- s_payload += ':\n'
- s_payload += dec(self, m_payload)
- else:
- s_payload = x_payload
-
- if VERB_INFO < opts['verbosity']:
- print("%s, len: %#x" %
- (self.class_id_s(m_class, m_id), m_len))
- if VERB_RAW < opts['verbosity']:
- print("payload: %s" % x_payload)
- print("%s\n" % s_payload)
- return consumed
-
- # give up
- state = 'BASE'
-
- # fell out of loop, no more chars to look at
- return 0
-
- def checksum(self, msg, m_len):
- "Calculate u-blox message checksum"
- # the checksum is calculated over the Message, starting and including
- # the CLASS field, up until, but excluding, the Checksum Field:
-
- ck_a = 0
- ck_b = 0
- for c in msg[0:m_len]:
- ck_a += c
- ck_b += ck_a
-
- return [ck_a & 0xff, ck_b & 0xff]
-
- def make_pkt(self, m_class, m_id, m_data):
- "Make a message packet"
- # always little endian, leader, class, id, length
- m_len = len(m_data)
-
- # build core message
- msg = bytearray(m_len + 6)
- struct.pack_into('<BBH', msg, 0, m_class, m_id, m_len)
-
- # copy payload into message buffer
- i = 0
- while i < m_len:
- msg[i + 4] = m_data[i]
- i += 1
-
- # add checksum
- chk = self.checksum(msg, m_len + 4)
- m_chk = bytearray(2)
- struct.pack_into('<BB', m_chk, 0, chk[0], chk[1])
-
- header = b"\xb5\x62"
- return header + msg[:m_len+4] + m_chk
-
- def gps_send(self, m_class, m_id, m_data):
- "Build, and send, a message to GPS"
- m_all = self.make_pkt(m_class, m_id, m_data)
- self.gps_send_raw(m_all)
-
- def gps_send_raw(self, m_all):
- "Send a raw message to GPS"
- if not opts['read_only']:
- io_handle.ser.write(m_all)
- if VERB_QUIET < opts['verbosity']:
- sys.stdout.write("sent:\n")
- if VERB_INFO < opts['verbosity']:
- sys.stdout.write(binascii.hexlify(m_all))
- sys.stdout.write("\n")
- self.decode_msg(m_all)
- sys.stdout.flush()
-
- def send_able_beidou(self, able):
- "dis/enable BeiDou"
- # Two frequency GPS use BeiDou or GLONASS
- # disable, then enable
- gps_model.send_cfg_gnss1(3, able)
-
- def send_able_binary(self, able):
- "dis/enable basic binary messages"
-
- rate = 1 if able else 0
-
- # UBX-NAV-DOP
- m_data = bytearray([0x01, 0x04, rate])
- gps_model.gps_send(6, 1, m_data)
-
- # UBX-NAV-SOL is ECEF. deprecated, use UBX-NAV-PVT instead?
- m_data = bytearray([0x01, 0x06, rate])
- gps_model.gps_send(6, 1, m_data)
-
- # UBX-NAV-TIMEGPS
- m_data = bytearray([0x01, 0x20, rate])
- gps_model.gps_send(6, 1, m_data)
-
- # UBX-NAV-SBAS
- m_data = bytearray([0x01, 0x32, 10])
- gps_model.gps_send(6, 1, m_data)
-
- # UBX-NAV-SVINFO
- m_data = bytearray([0x01, 0x30, 10])
- gps_model.gps_send(6, 1, m_data)
-
- def send_able_ecef(self, able):
- "Enable ECEF messages"
- # set NAV-POSECEF rate
- gps_model.send_cfg_msg(1, 1, able)
- # set NAV-VELECEF rate
- gps_model.send_cfg_msg(1, 0x11, able)
-
- def send_able_gps(self, able):
- "dis/enable GPS/QZSS"
- # GPS and QZSS both on, or both off, together
- # GPS
- gps_model.send_cfg_gnss1(0, able)
- # QZSS
- gps_model.send_cfg_gnss1(5, able)
-
- def send_able_galileo(self, able):
- "dis/enable GALILEO"
- gps_model.send_cfg_gnss1(2, able)
-
- def send_able_glonass(self, able):
- "dis/enable GLONASS"
- # Two frequency GPS use BeiDou or GLONASS
- # disable, then enable
- gps_model.send_cfg_gnss1(6, able)
-
- def send_able_nmea(self, able):
- "dis/enable basic NMEA messages"
-
- rate = 1 if able else 0
-
- # xxGBS
- m_data = bytearray([0xf0, 0x09, rate])
- gps_model.gps_send(6, 1, m_data)
-
- # xxGGA
- m_data = bytearray([0xf0, 0x00, rate])
- gps_model.gps_send(6, 1, m_data)
-
- # xxGGL
- m_data = bytearray([0xf0, 0x01, rate])
- gps_model.gps_send(6, 1, m_data)
-
- # xxGSA
- m_data = bytearray([0xf0, 0x02, rate])
- gps_model.gps_send(6, 1, m_data)
-
- # xxGST
- m_data = bytearray([0xf0, 0x07, rate])
- gps_model.gps_send(6, 1, m_data)
-
- # xxGSV
- m_data = bytearray([0xf0, 0x03, rate])
- gps_model.gps_send(6, 1, m_data)
-
- # xxRMC
- m_data = bytearray([0xf0, 0x04, rate])
- gps_model.gps_send(6, 1, m_data)
-
- # xxVTG
- m_data = bytearray([0xf0, 0x05, rate])
- gps_model.gps_send(6, 1, m_data)
-
- # xxZDA
- m_data = bytearray([0xf0, 0x08, rate])
- gps_model.gps_send(6, 1, m_data)
-
- def send_able_rawx(self, able):
- "dis/enable UBX-RXM-RAWX"
-
- rate = 1 if able else 0
- m_data = bytearray([0x2, 0x15, rate])
- gps_model.gps_send(6, 1, m_data)
-
- def send_able_sbas(self, able):
- "dis/enable SBAS"
- gps_model.send_cfg_gnss1(1, able)
-
- def send_able_tmode2(self, able):
- "UBX-CFG-TMODE2, set time mode 2 config"
-
- m_data = bytearray(28)
- if able:
- # enable survey-in
- m_data[0] = 1
-
- # Survey-in minimum duration seconds 86400
- seconds = 86400
- m_data[20] = seconds & 0x0ff
- seconds >>= 8
- m_data[21] = seconds & 0x0ff
- seconds >>= 8
- m_data[22] = seconds & 0x0ff
- seconds >>= 8
- m_data[23] = seconds & 0x0ff
-
- # Survey-in position accuracy limit 500 mm
- mmeters = 500
- m_data[24] = mmeters & 0x0ff
- mmeters >>= 8
- m_data[25] = mmeters & 0x0ff
- mmeters >>= 8
- m_data[26] = seconds & 0x0ff
- seconds >>= 8
- m_data[27] = mmeters & 0x0ff
- gps_model.gps_send(6, 0x3d, m_data)
-
- def send_able_tp(self, able):
- "dis/enable UBX-TIM-TP Time Pulse"
- rate = 1 if able else 0
- m_data = bytearray([0xd, 0x1, rate])
- gps_model.gps_send(6, 1, m_data)
-
- def send_cfg_ant(self):
- "UBX-CFG-ANT, Get Antenna Control Settings"
-
- m_data = bytearray(0)
- gps_model.gps_send(6, 0x13, m_data)
-
- def send_cfg_cfg(self, save_clear):
- "UBX-CFG-CFG, save config"
-
- # Save: save_clear = 0
- # Clear: save_clear = 1
-
- # basic configs alays available to change:
- # ioPort, msgConf, infMsg, navConf, rxmConf, senConf
- cfg1 = 0x9f
- # rinvConf, antConf, logConf
- cfg2 = 0x03
-
- m_data = bytearray(13)
-
- # clear
- if 0 == save_clear:
- # saving
- m_data[0] = 0
- m_data[1] = 0
- else:
- # clearing
- m_data[0] = cfg1
- m_data[1] = cfg2
- m_data[2] = 0 #
- m_data[3] = 0 #
-
- # save
- if 0 == type:
- # saving
- m_data[4] = cfg1
- m_data[5] = cfg2
- else:
- # clearing
- m_data[4] = 0
- m_data[5] = 0
- m_data[6] = 0 #
- m_data[7] = 0 #
-
- # load
- if False and 0 == type:
- # saving
- m_data[8] = 0
- m_data[9] = 0
- else:
- # clearing, load it to save a reboot
- m_data[8] = cfg1
- m_data[9] = cfg2
- m_data[10] = 0 #
- m_data[11] = 0 #
-
- # deviceMask, where to save it, try all options
- m_data[12] = 0x17 # devBBR, devFLASH devEEPROM, devSpiFlash
-
- gps_model.gps_send(6, 0x9, m_data)
-
- def send_cfg_gnss1(self, gnssId, enable):
- "UBX-CFG-GNSS, set GNSS config"
- m_data = bytearray(12)
- m_data[0] = 0 # version 0, msgVer
- m_data[1] = 0 # read only, numTrkChHw
- m_data[2] = 0xFF # read only, numTrkChUse
- m_data[3] = 1 # 1 block follows
- # block 1
- m_data[4] = gnssId # gnssId
- if 0 == gnssId:
- # GPS
- m_data[5] = 8 # resTrkCh
- m_data[6] = 16 # maxTrkCh
- if 1 == gnssId:
- # SBAS
- m_data[5] = 1 # resTrkCh
- m_data[6] = 3 # maxTrkCh
- if 2 == gnssId:
- # GALILEO
- m_data[5] = 4 # resTrkCh
- m_data[6] = 8 # maxTrkCh
- if 3 == gnssId:
- # BeiDou
- m_data[5] = 2 # resTrkCh
- m_data[6] = 16 # maxTrkCh
- if 4 == gnssId:
- # IMES
- m_data[5] = 0 # resTrkCh
- m_data[6] = 8 # maxTrkCh
- if 5 == gnssId:
- # QZSS
- m_data[5] = 0 # resTrkCh
- m_data[6] = 3 # maxTrkCh
- if 6 == gnssId:
- # GLONASS
- m_data[5] = 8 # resTrkCh
- m_data[6] = 14 # maxTrkCh
- m_data[7] = 0 # reserved1
- m_data[8] = enable # flags
- m_data[9] = 0 # flagflags, unused
- if 5 == gnssId:
- # QZSS
- m_data[10] = 5 # flags E1OS, L1SAIF
- else:
- m_data[10] = 1 # flags E1OS
- m_data[11] = 1 # flags, unused
- gps_model.gps_send(6, 0x3e, m_data)
-
- def send_cfg_gnss(self):
- "UBX-CFG-GNSS, set Galileo config decode"
- m_data = bytearray(0)
- gps_model.gps_send(6, 0x3e, m_data)
-
- def send_cfg_nav5_model(self):
- "UBX-CFG-NAV5, set dynamic platform model"
-
- m_data = bytearray(36)
- m_data[0] = 1 # just setting dynamic model
- m_data[1] = 0 # just setting dynamic model
- m_data[2] = opts["mode"]
-
- gps_model.gps_send(6, 0x24, m_data)
-
- def send_cfg_msg(self, m_class, m_id, rate=None):
- "UBX-CFG-MSG, poll, or set, message rates decode"
- m_data = bytearray(2)
- m_data[0] = m_class
- m_data[1] = m_id
- if rate is not None:
- m_data.extend([rate])
- gps_model.gps_send(6, 1, m_data)
-
- def send_cfg_nav5(self):
- "UBX-CFG-NAV5, get Nav Engine Settings"
- m_data = bytearray(0)
- gps_model.gps_send(6, 0x24, m_data)
-
- def send_cfg_pms(self):
- "UBX-CFG-PMS, Get Power Management Settings"
-
- if opts["mode"] is not None:
- m_data = bytearray(8)
- # set powerSetupValue to mode
- m_data[1] = opts["mode"]
- # leave period and onTime zero, which breaks powerSetupValue = 3
- else:
- m_data = bytearray(0)
-
- gps_model.gps_send(6, 0x86, m_data)
-
- def send_cfg_prt(self):
- "UBX-CFG-PRT, get I/O Port"
- m_data = bytearray(0)
- gps_model.gps_send(6, 0x0, m_data)
-
- def send_set_speed(self, speed):
- "UBX-CFG-PRT, set port"
-
- # FIXME! Poll current masks, then adjust speed
- m_data = bytearray(20)
- m_data[0] = 1 # port 1, UART 1
- # m_data[0] = 3 # port 3, USB
- m_data[4] = 0xc0 # 8N1
- m_data[5] = 0x8 # 8N1
-
- m_data[8] = speed & 0xff
- m_data[9] = (speed >> 8) & 0xff
- m_data[10] = (speed >> 16) & 0xff
- m_data[11] = (speed >> 24) & 0xff
-
- m_data[12] = 3 # in, ubx and nmea
- m_data[14] = 3 # out, ubx and nmea
- gps_model.gps_send(6, 0, m_data)
-
- def send_cfg_rst(self, reset_type):
- "UBX-CFG-RST, reset"
- # always do a hardware reset
- # if on USB, this will disconnect and reconnect, giving you
- # a new tty.
- m_data = bytearray(4)
- m_data[0] = reset_type & 0xff
- m_data[1] = (reset_type >> 8) & 0xff
- gps_model.gps_send(6, 0x4, m_data)
-
- def send_cfg_tmode2(self):
- "UBX-CFG-TMODE2, get time mode 2 configuration"
- m_data = bytearray(0)
- gps_model.gps_send(6, 0x3d, m_data)
-
- def send_cfg_tp5(self):
- "UBX-CFG-TP5, get time0 decodes, timepulse 0 and 1"
- m_data = bytearray(0)
- gps_model.gps_send(6, 0x31, m_data)
- # and timepulse 1
- m_data = bytearray(1)
- m_data[0] = 1
- gps_model.gps_send(6, 0x31, m_data)
-
- def send_cfg_usb(self):
- "UBX-CFG-USB, get USB configuration"
- m_data = bytearray(0)
- gps_model.gps_send(6, 0x1b, m_data)
-
- def send_mon_ver(self):
- "UBX-MON-VER get versions"
- m_data = bytearray(0)
- gps_model.gps_send(0x0a, 0x04, m_data)
-
- def send_nav_posecef(self):
- "UBX-NAV-POSECEF, poll ECEF position"
- m_data = bytearray(0)
- gps_model.gps_send(1, 1, m_data)
-
- def send_nav_velecef(self):
- "UBX-NAV-VELECEF, poll ECEF velocity decode"
- m_data = bytearray(0)
- gps_model.gps_send(1, 0x11, m_data)
-
- def send_tim_svin(self):
- "UBX-TIM-SVIN, get survey in data"
- m_data = bytearray(0)
- gps_model.gps_send(0x0d, 0x04, m_data)
-
- def send_tim_tp(self):
- "UBX-TIM-TP, get time pulse timedata"
- m_data = bytearray(0)
- gps_model.gps_send(0x0d, 0x01, m_data)
-
- able_commands = {
- # en/dis able BeiDou
- "BEIDOU": {"command": send_able_beidou,
- "help": "BeiDou"},
- # en/dis able basic binary messages
- "BINARY": {"command": send_able_binary,
- "help": "basic binary messages"},
- # en/dis able ECEF
- "ECEF": {"command": send_able_ecef,
- "help": "ECEF"},
- # en/dis able GPS
- "GPS": {"command": send_able_gps,
- "help": "GPS and QZSS"},
- # en/dis able GALILEO
- "GALILEO": {"command": send_able_galileo,
- "help": "GALILEO"},
- # en/dis able GLONASS
- "GLONASS": {"command": send_able_glonass,
- "help": "GLONASS"},
- # en/dis able basic NMEA messages
- "NMEA": {"command": send_able_nmea,
- "help": "basic NMEA messages"},
- # en/dis able RAWX
- "RAWX": {"command": send_able_rawx,
- "help": "RAWX measurements"},
- # en/dis able SBAS
- "SBAS": {"command": send_able_sbas,
- "help": "SBAS"},
- # en/dis able TP time pulse message
- "TP": {"command": send_able_tp,
- "help": "TP Time Pulse message"},
- # en/dis able TMODE2 Survey-in
- "TMODE2": {"command": send_able_tmode2,
- "help": "TMODE2"},
- }
- commands = {
- # UBX-CFG-ANT, poll antenna config decode
- "ANT": {"command": send_cfg_ant,
- "help": "UBX-CFG-ANT poll antenna config"},
- # UBX-CFG-RST cold boot
- "COLDBOOT": {"command": send_cfg_rst,
- "help": "UBS-CFG-RST coldboot the GPS",
- "opt": 0xfff},
- # UBX-CFG-GNSS poll gnss config
- "GNSS": {"command": send_cfg_gnss,
- "help": "UBX-CFG-GNSS poll GNSS config"},
- # UBX-CFG-RST hot boot
- "HOTBOOT": {"command": send_cfg_rst,
- "help": "UBX-CFG-RST hotboot the GPS",
- "opt": 0},
- # UBX-CFG-NAV5 set Dynamic Platform Model
- "MODEL": {"command": send_cfg_nav5_model,
- "help": "UBX-CFG-NAV5 set Dynamic Platform Model"},
- # UBX-CFG-NAV5, poll Nav Engine Settings
- "NAV5": {"command": send_cfg_nav5,
- "help": "UBX-CFG-NAV5 poll Nav Engines settings"},
- # UBX-CFG-PMS, poll power management settings
- "PMS": {"command": send_cfg_pms,
- "help": "UBX-CFG-PMS poll power management settings"},
- # UBX-CFG-PRT, poll I/O port number
- "PRT": {"command": send_cfg_prt,
- "help": "UBX-CFG-PRT poll I/O port settings"},
- # UBX-CFG-CFG reset config
- "RESET": {"command": send_cfg_cfg,
- "help": "UBX-CFG-CFG reset config to defaults",
- "opt": 1},
- # UBX-CFG-CFG save config
- "SAVE": {"command": send_cfg_cfg,
- "help": "UBX-CFG-CFG save current config",
- "opt": 0},
- # UBX-TIM-SVIN, get survey in data
- "SVIN": {"command": send_tim_svin,
- "help": "UBX-TIM-SVIN get survey in data"},
- # UBX-CFG-TMODE2, get time mode 2 config
- "TMODE2": {"command": send_cfg_tmode2,
- "help": "UBX-CFG-TMODE2 get time mode 2 config"},
- # UBX-TIM-TP, get time pulse timedata
- "TP": {"command": send_tim_tp,
- "help": "UBX-TIM-TP get time pulse timedata"},
- # UBX-CFG-TP5, get time0 decodes
- "TP5": {"command": send_cfg_tp5,
- "help": "UBX-TIM-TP5 get time pulse decodes"},
- # UBX-CFG-RST warm boot
- "WARMBOOT": {"command": send_cfg_rst,
- "help": "UBX-CFG-RST warmboot the GPS",
- "opt": 1},
- # poll UBX-CFG-USB
- "USB": {"command": send_cfg_usb,
- "help": "UBX-CFG-USB get USB config"},
- # poll UBX-MON-VER
- "VER": {"command": send_mon_ver,
- "help": "UBX-MON-VER get GPS version"},
- }
- # end class ubx
-
-
-class gps_io(object):
- """All the GPS I/O in one place"
-
- Three types of GPS I/O
- 1. read only from a file
- 2. read/write through a device
- 3. read only from a gpsd instance
- """
-
- out = b''
- ser = None
- input_is_device = False
-
- def __init__(self, serial_class):
- "Initialize class"
-
- Serial = serial_class
- # buffer to hold read data
- self.out = b''
-
- # open the input: device, file, or gpsd
- if opts['input_file_name'] is not None:
- # check if input file is a file or device
- try:
- mode = os.stat(opts['input_file_name']).st_mode
- except OSError:
- sys.stderr.write('%s: failed to open input file %s\n' %
- (PROG_NAME, opts['input_file_name']))
- sys.exit(1)
-
- if stat.S_ISCHR(mode):
- # character device, need not be read only
- self.input_is_device = True
-
- if ((opts['disable'] or opts['enable'] or opts['poll'] or
- opts['oaf_name'])):
-
- # check that we can write
- if opts['read_only']:
- sys.stderr.write('%s: read-only mode, '
- 'can not send commands\n' % PROG_NAME)
- sys.exit(1)
- if self.input_is_device is False:
- sys.stderr.write('%s: input is plain file, '
- 'can not send commands\n' % PROG_NAME)
- sys.exit(1)
-
- if opts['target']['server'] is not None:
- # try to open local gpsd
- try:
- self.ser = gps.gpscommon(host=None)
- self.ser.connect(opts['target']['server'],
- opts['target']['port'])
-
- # alias self.ser.write() to self.write_gpsd()
- self.ser.write = self.write_gpsd
-
- # ask for raw, not rare, data
- data_out = b'?WATCH={'
- if opts['target']['device'] is not None:
- # add in the requested device
- data_out += (b'"device":"' + opts['target']['device'] +
- b'",')
- data_out += b'"enable":true,"raw":2}\r\n'
- if VERB_RAW <= opts['verbosity']:
- print("sent: ", data_out)
- self.ser.send(data_out)
- except socket.error as err:
- sys.stderr.write('%s: failed to connect to gpsd %s\n' %
- (PROG_NAME, err))
- sys.exit(1)
-
- elif self.input_is_device:
- # configure the serial connections (the parameters refer to
- # the device you are connecting to)
-
- try:
- self.ser = Serial.Serial(
- baudrate=opts['input_speed'],
- # 8N1 is GREIS default
- bytesize=Serial.EIGHTBITS,
- parity=Serial.PARITY_NONE,
- port=opts['input_file_name'],
- stopbits=Serial.STOPBITS_ONE,
- # read timeout
- timeout=0.05,
- # pyserial Ver 3.0+ changes writeTimeout to write_timeout
- # just set both
- write_timeout=0.5,
- writeTimeout=0.5,
- )
- except Serial.serialutil.SerialException:
- # this exception happens on bad serial port device name
- sys.stderr.write('%s: failed to open serial port "%s"\n'
- '%s: Your computer has the serial ports:\n' %
- (PROG_NAME, opts['input_file_name'],
- PROG_NAME))
-
- # print out list of supported ports
- import serial.tools.list_ports as List_Ports
- ports = List_Ports.comports()
- for port in ports:
- sys.stderr.write(" %s: %s\n" %
- (port.device, port.description))
- sys.exit(1)
-
- # flush input buffer, discarding all its contents
- self.ser.flushInput()
-
- else:
- # Read from a plain file of GREIS messages
- try:
- self.ser = open(opts['input_file_name'], 'rb')
- except IOError:
- sys.stderr.write('%s: failed to open input %s\n' %
- (PROG_NAME, opts['input_file_name']))
- sys.exit(1)
-
- def read(self, read_opts):
- "Read from device, until timeout or expected message"
-
- # are we expecting a certain message?
- if gps_model.expect_statement_identifier:
- # assume failure, until we see expected message
- ret_code = 1
- else:
- # not expecting anything, so OK if we did not see it.
- ret_code = 0
-
- try:
- if read_opts['target']['server'] is not None:
- # gpsd input
- start = time.clock()
- while read_opts['input_wait'] > (time.clock() - start):
- # First priority is to be sure the input buffer is read.
- # This is to prevent input buffer overuns
- if 0 < self.ser.waiting():
- # We have serial input waiting, get it
- # No timeout possible
- # RTCM3 JSON can be over 4.4k long, so go big
- new_out = self.ser.sock.recv(8192)
- if raw is not None:
- # save to raw file
- raw.write(new_out)
- self.out += new_out
-
- consumed = gps_model.decode_msg(self.out)
- self.out = self.out[consumed:]
- if ((gps_model.expect_statement_identifier and
- (gps_model.expect_statement_identifier ==
- gps_model.last_statement_identifier))):
- # Got what we were waiting for. Done?
- ret_code = 0
- if not read_opts['input_forced_wait']:
- # Done
- break
-
- elif self.input_is_device:
- # input is a serial device
- start = time.clock()
- while read_opts['input_wait'] > (time.clock() - start):
- # First priority is to be sure the input buffer is read.
- # This is to prevent input buffer overuns
- # pyserial 3.0 replaces inWaiting() with in_waiting
- if 0 < self.ser.inWaiting():
- # We have serial input waiting, get it
- # 1024 is comfortably large, almost always the
- # Read timeout is what causes ser.read() to return
- new_out = self.ser.read(1024)
- if raw is not None:
- # save to raw file
- raw.write(new_out)
- self.out += new_out
-
- consumed = gps_model.decode_msg(self.out)
- self.out = self.out[consumed:]
- if ((gps_model.expect_statement_identifier and
- (gps_model.expect_statement_identifier ==
- gps_model.last_statement_identifier))):
- # Got what we were waiting for. Done?
- ret_code = 0
- if not read_opts['input_forced_wait']:
- # Done
- break
- else:
- # ordinary file, so all read at once
- self.out += self.ser.read()
- if raw is not None:
- # save to raw file
- raw.write(self.out)
-
- while True:
- consumed = gps_model.decode_msg(self.out)
- self.out = self.out[consumed:]
- if 0 >= consumed:
- break
-
- except IOError:
- # This happens on a good device name, but gpsd already running.
- # or if USB device unplugged
- sys.stderr.write('%s: failed to read %s\n'
- '%s: Is gpsd already holding the port?\n'
- % (PROG_NAME, read_opts['input_file_name'],
- PROG_NAME))
- return 1
-
- if 0 < ret_code:
- # did not see the message we were expecting to see
- sys.stderr.write('%s: waited %0.2f seconds for, '
- 'but did not get: %%%s%%\n'
- % (PROG_NAME, read_opts['input_wait'],
- gps_model.expect_statement_identifier))
- return ret_code
-
- def write_gpsd(self, data):
- "write data to gpsd daemon"
-
- # HEXDATA_MAX = 512, from gps.h, The max hex digits can write.
- # Input data is binary, converting to hex doubles its size.
- # Limit binary data to length 255, so hex data length less than 510.
- if 255 < len(data):
- sys.stderr.write('%s: trying to send %d bytes, max is 255\n'
- % (PROG_NAME, len(data)))
- return 1
-
- if opts['target']['device'] is not None:
- # add in the requested device
- data_out = b'?DEVICE={"path":"' + opts['target']['device'] + b'",'
- else:
- data_out = b'?DEVICE={'
-
- # Convert binary data to hex and build the message.
- data_out += b'"hexdata":"' + binascii.hexlify(data) + b'"}\r\n'
- if VERB_RAW <= opts['verbosity']:
- print("sent: ", data_out)
- self.ser.send(data_out)
- return 0
-
-
-# instantiate the GPS class
-gps_model = ubx()
-
-
-def usage():
- "Ouput usage information, and exit"
- print('usage: %s [-c C] [-f F] [-r] [-p P] [-s S] [-v V]\n'
- ' [-hV?] [-S S]\n'
- ' -c C send raw command C to GPS\n'
- ' -d D disable D\n'
- ' -e E enable E\n'
- ' -f F open F as file/device\n'
- ' default: %s\n'
- ' -h print help\n'
- ' -m M optional mode to -p P\n'
- ' -p P send a prepackaged query P to GPS\n'
- ' -R R save raw data from GPS in file R\n'
- ' default: %s\n'
- ' -r open file/device read only\n'
- ' -S S set GPS speed to S\n'
- ' -s S set port speed to S\n'
- ' default: %s bps\n'
- ' -w wait time\n'
- ' default: %s seconds\n'
- ' -V print version\n'
- ' -v V Set verbosity level to V, 0 to 4\n'
- ' default: %s\n'
- ' -? print help\n'
- '\n'
- 'D and E can be one of:' %
- (PROG_NAME, opts['input_file_name'], opts['raw_file'],
- opts['input_speed'], opts['input_wait'], opts['verbosity'])
- )
-
- for item in sorted(gps_model.able_commands.keys()):
- print(" %-12s %s" % (item, gps_model.able_commands[item]["help"]))
-
- print('\nP can be one of:')
- for item in sorted(gps_model.commands.keys()):
- print(" %-12s %s" % (item, gps_model.commands[item]["help"]))
- print('\nOptions can be placed in the UBXOPTS environment variable.\n'
- 'UBXOPTS is processed before the CLI options.')
- sys.exit(0)
-
-
-if 'UBXOPTS' in os.environ:
- # grab the UBXOPTS environment variable for options
- opts['progopts'] = os.environ['UBXOPTS']
- options = opts['progopts'].split(' ') + sys.argv[1:]
-else:
- options = sys.argv[1:]
-
-
-try:
- (options, arguments) = getopt.getopt(options, "?c:d:e:f:hm:rp:s:w:v:R:S:V")
-except getopt.GetoptError as err:
- sys.stderr.write("%s: %s\n"
- "Try '%s -h' for more information.\n" %
- (PROG_NAME, str(err), PROG_NAME))
- sys.exit(2)
-
-for (opt, val) in options:
- if opt == '-c':
- opts['command'] = val
- elif opt == '-d':
- opts['disable'] = val
- elif opt == '-e':
- opts['enable'] = val
- elif opt == '-f':
- opts['input_file_name'] = val
- elif opt == '-h' or opt == '-?':
- usage()
- elif opt == '-m':
- opts['mode'] = int(val)
- elif opt == '-p':
- opts['poll'] = val
- elif opt == '-r':
- opts['read_only'] = True
- elif opt == '-s':
- opts['input_speed'] = int(val)
- if opts['input_speed'] not in gps_model.speeds:
- sys.stderr.write('%s: -s invalid speed %s\n' %
- (PROG_NAME, opts['input_speed']))
- sys.exit(1)
-
- elif opt == '-w':
- opts['input_wait'] = float(val)
- elif opt in '-v':
- opts['verbosity'] = int(val)
- elif opt in '-R':
- # raw log file
- opts['raw_file'] = val
- elif opt in '-S':
- opts['set_speed'] = int(val)
- if opts['set_speed'] not in gps_model.speeds:
- sys.stderr.write('%s: -S invalid speed %s\n' %
- (PROG_NAME, opts['set_speed']))
- sys.exit(1)
-
- elif opt == '-V':
- # version
- sys.stderr.write('%s: Version %s\n' % (PROG_NAME, gps_version))
- sys.exit(0)
-
-if opts['input_file_name'] is None:
- # no input file given
- # default to local gpsd
- opts['target']['server'] = "localhost"
- opts['target']['port'] = gps.GPSD_PORT
- opts['target']['device'] = None
- if arguments:
- # server[:port[:device]]
- parts = arguments[0].split(':')
- opts['target']['server'] = parts[0]
- if 1 < len(parts):
- opts['target']['port'] = parts[1]
- if 2 < len(parts):
- opts['target']['device'] = parts[2]
-
-elif arguments:
- sys.stderr.write('%s: Both input file and server specified\n' % PROG_NAME)
- sys.exit(1)
-
-if VERB_PROG <= opts['verbosity']:
- # dump all options
- print('Options:')
- for option in sorted(opts):
- print(" %s: %s" % (option, opts[option]))
-
-# done parsing arguments from environment and CLI
-
-try:
- # raw log file requested?
- raw = None
- if opts['raw_file']:
- try:
- raw = open(opts['raw_file'], 'w')
- except IOError:
- sys.stderr.write('%s: failed to open raw file %s\n' %
- (PROG_NAME, opts['raw_file']))
- sys.exit(1)
-
- # create the I/O instance
- io_handle = gps_io(serial)
-
- sys.stdout.flush()
-
- if opts['disable'] is not None:
- if VERB_QUIET < opts['verbosity']:
- sys.stderr.write('%s: disable %s\n' % (PROG_NAME, opts['disable']))
- if opts['disable'] in gps_model.able_commands:
- command = gps_model.able_commands[opts['disable']]
- command["command"](gps, 0)
- else:
- sys.stderr.write('%s: disable %s not found\n' %
- (PROG_NAME, opts['disable']))
- sys.exit(1)
-
- elif opts['enable'] is not None:
- if VERB_QUIET < opts['verbosity']:
- sys.stderr.write('%s: enable %s\n' % (PROG_NAME, opts['enable']))
- if opts['enable'] in gps_model.able_commands:
- command = gps_model.able_commands[opts['enable']]
- command["command"](gps, 1)
- else:
- sys.stderr.write('%s: enable %s not found\n' %
- (PROG_NAME, opts['enable']))
- sys.exit(1)
-
- elif opts['poll'] is not None:
- if VERB_QUIET < opts['verbosity']:
- sys.stderr.write('%s: poll %s\n' % (PROG_NAME, opts['poll']))
-
- if 'MODEL' == opts["poll"]:
- if opts["mode"] is None:
- opts["mode"] = 0 # default to portable model
-
- if opts['poll'] in gps_model.commands:
- command = gps_model.commands[opts['poll']]
- if 'opt' in command:
- command["command"](gps, command["opt"])
- else:
- command["command"](gps)
- else:
- sys.stderr.write('%s: poll %s not found\n' %
- (PROG_NAME, opts['poll']))
- sys.exit(1)
-
- elif opts['set_speed'] is not None:
- gps_model.send_set_speed(opts['set_speed'])
-
- elif opts['command'] is not None:
- # zero length is OK to send
- # add trailing new line
- opts['command'] += "\n"
-
- if VERB_QUIET < opts['verbosity']:
- sys.stderr.write('%s: command %s\n' % (PROG_NAME, opts['command']))
- gps_model.gps_send_raw(opts['command'])
-
- exit_code = io_handle.read(opts)
-
- if ((VERB_RAW <= opts['verbosity']) and io_handle.out):
- # dump raw left overs
- print("Left over data:")
- print(io_handle.out)
-
- sys.stdout.flush()
- io_handle.ser.close()
-
-except KeyboardInterrupt:
- print('')
- exit_code = 1
-
-sys.exit(exit_code)
diff --git a/contrib/zerk b/contrib/zerk
deleted file mode 100755
index 848d216b..00000000
--- a/contrib/zerk
+++ /dev/null
@@ -1,1896 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: UTF-8
-'''
-zerk -- GREIS configurator and packet decoder
-
-usage: zerk [OPTIONS] [server[:port[:device]]]
-'''
-
-# This program conforms to the JAVAD document:
-# GNSS Receiver External Interface Specification
-# Revised: October 11, 2017
-#
-# Hereafter referred to as "the specification"
-#
-# This file is Copyright (c) 2018 by the GPSD project
-# BSD terms apply: see the file COPYING in the distribution root for details.
-#
-# This code runs compatibly under Python 2 and 3.x for x >= 2.
-# Preserve this property!
-#
-# ENVIRONMENT:
-# Options in the ZERKOPTS environment variable will be parsed before
-# the CLI options. A handy place to put your '-f /dev/ttyXX -s SPEED'
-#
-# example usages:
-# Coldboot the GPS: zerk -p COLDBOOT
-# Print current serial port: zerk -c "print,/cur/term"
-# Decode raw log file: zerk -r -f greis-binary.log -v 2
-# Change GPS port speed: zerk -S 230400
-# Watch entire reset cycle: zerk -p RESET -v 2 -w 20 -W
-# poll SVs Status: zerk -W -w 2 -v 2 -c "out,,jps/{CS,ES,GS,Is,WS,QS}"
-# dump local gpsd data zerk -v 2 -w 5 localhost
-#
-# TODO: no CRC16 packets handled yet
-# TODO: more packet decodes
-
-from __future__ import absolute_import, print_function, division
-
-import binascii # for binascii.hexlify()
-import getopt # for getopt.getopt(), to parse CLI options
-import hashlib # for hashlib.sha1
-import os # for os.environ
-import socket # for socket.error
-import stat # for stat.S_ISBLK()
-import struct # for pack()
-import sys
-import time
-import xml.etree.ElementTree # to parse .jpo files
-
-PROG_NAME = 'zerk'
-
-try:
- import serial
-except ImportError:
- # treat serial as special since it is not part of standard Python
- sys.stderr.write("%s: failed to import pyserial\n" % PROG_NAME)
- sys.exit(2)
-
-try:
- import gps
-except ImportError:
- # PEP8 says local imports last
- sys.stderr.write("%s: failed to import gps, check PYTHON_PATH\n" %
- PROG_NAME)
- sys.exit(2)
-
-gps_version = '3.18-dev'
-if gps.__version__ != gps_version:
- sys.stderr.write("%s: ERROR: need gps module version %s, got %s\n" %
- (PROG_NAME, gps_version, gps.__version__))
- sys.exit(1)
-
-
-VERB_QUIET = 0 # quiet
-VERB_NONE = 1 # just output requested data and some info
-VERB_DECODE = 2 # decode all messages
-VERB_INFO = 3 # more info
-VERB_RAW = 4 # raw info
-VERB_PROG = 5 # program trace
-
-# dictionary to hold all user options
-opts = {
- # command to send to GPS, -c
- 'command': None,
- # command for -d disable
- 'disable': None,
- # command for -e enable
- 'enable': None,
- # default input -f file
- 'input_file_name': None,
- # default forced wait? -W
- 'input_forced_wait': False,
- # default port speed -s
- 'input_speed': 115200,
- # default input wait time -w in seconds
- 'input_wait': 2.0,
- # the name of an OAF file, extension .jpo
- 'oaf_name': None,
- # poll command -p
- 'poll': None,
- # raw log file name
- 'raw_file': None,
- # open port read only -r
- 'read_only': False,
- # speed to set GPS -S
- 'set_speed': None,
- # target gpsd (server:port:device) to connect to
- 'target': {"server": None, "port": gps.GPSD_PORT, "device": None},
- # verbosity level, -v
- 'verbosity': VERB_NONE,
- # contents of environment variable ZERKOPTS
- 'progopts': '',
-}
-
-
-class greis(object):
- """A class for working with the GREIS GPS message formats
-
- This class contains functions to decode messages in the Javad GREIS
- "Receiver Input Language" and "Receiver Messages" formats.
- """
-
- # when a statement identifier is received, it is stored here
- last_statement_identifier = None
- # expected statement identifier.
- expect_statement_identifier = False
- # ID of current message as a string
- s_id = ''
-
- def __init__(self):
- "Initialize class"
-
- self.last_statement_identifier = None
- self.expect_statement_identifier = False
- # last epoch received in [~~]
- # epoch == None means never got epoch, epoch == -1 means missing.
- self.epoch = None
-
- def f4_s(self, f):
- "convert an '! f4' to a string"
-
- if gps.isfinite(f):
- # yeah, the precision is a guess
- return "%.6f" % f
- return 'X'
-
- def f8_s(self, f):
- "convert an '! f8' to a string"
-
- if gps.isfinite(f):
- # yeah, the precision is a guess
- return "%.4f" % f
- return 'X'
-
- def i1_s(self, i):
- "convert an '! i1' to a string"
- return 'X' if i == 127 else str(i)
-
- def i2_s(self, i):
- "convert an '! i2' to a string"
- return 'X' if i == 32767 else str(i)
-
- def i4_s(self, i):
- "convert an '! i4' to a string"
- return 'X' if i == 2147483647 else str(i)
-
- def u1_s(self, u):
- "convert an '! u1' to a string"
- return 'X' if u == 255 else str(u)
-
- def u2_s(self, u):
- "convert an '! u2' to a string"
- return 'X' if u == 65535 else str(u)
-
- def u4_s(self, u):
- "convert an '! u4' to a string"
- return 'X' if u == 4294967295 else str(u)
-
- def isuchex(self, c):
- "Is byte an upper case hex char?"
- if 48 <= c and 57 >= c:
- # 0 to 9
- return int(c) - 48
- if 65 <= c and 70 >= c:
- # A to F
- return int(c) - 55
- return -1
-
- soltypes = {0: "None",
- 1: "3D",
- 2: "DGPS",
- 3: "RTK float",
- 4: "RTK fixed",
- 5: "fixed"
- }
-
- # allowable speeds
- speeds = (460800, 230400, 153600, 115200, 57600, 38400, 19200, 9600,
- 4800, 2400, 1200, 600, 300)
-
- def msg_c_(self, payload):
- "[c?] decode, Smoothing Corrections"
-
- s = ' smooth'
- for i in range(0, len(payload) - 1, 2):
- u = struct.unpack_from('<h', payload, i)
- s += " " + self.i2_s(u[0])
-
- return s + '\n'
-
- def msg__p(self, payload):
- "[?p] decode, Integer Relative Carrier Phases"
-
- s = ' rcp'
- for i in range(0, len(payload) - 1, 4):
- u = struct.unpack_from('<l', payload, i)
- s += " " + self.i4_s(u[0])
-
- return s + '\n'
-
- def msg__d(self, payload):
- "[?d] decode, Relative Doppler"
-
- s = ' srdp'
- for i in range(0, len(payload) - 1, 2):
- u = struct.unpack_from('<h', payload, i)
- s += " " + self.i2_s(u[0])
-
- return s + '\n'
-
- def msg__r(self, payload):
- "[?r] decode, Integer Relative Pseudo-ranges"
-
- s = ' srdp'
- for i in range(0, len(payload) - 1, 2):
- u = struct.unpack_from('<h', payload, i)
- s += " " + self.i2_s(u[0])
-
- return s + '\n'
-
- def msg__A(self, payload):
- "[?A] decode, GPS, GALILEO Almanac"
- m_len = len(payload)
-
- if ('[EA]' == self.s_id) and (49 > m_len):
- return " Bad Length %s" % m_len
-
- u = struct.unpack_from('<BhlBBBfffffffff', payload, 0)
-
- s = (" sv %u wna %d toa %d healthA %u healthS %u config %u\n"
- " af1 %f af0 %f rootA %f ecc %f m0 %f\n"
- " omega0 %f argPer %f delf %f omegaDot %f\n" % u)
-
- if '[EA]' == self.s_id:
- u = struct.unpack_from('<H', payload, 46)
- s += (" iod %d" % (u[0]))
- return s
-
- def msg__E(self, payload):
- "[?E] decode, SNR x 4"
-
- s = ' cnrX4'
- for i in range(0, len(payload) - 1, 1):
- u = struct.unpack_from('<B', payload, i)
- s += " " + self.u1_s(u[0])
-
- return s + '\n'
-
- def msg_WE(self, payload):
- "[WE] decode, SBAS Ephemeris"
-
- u = struct.unpack_from('<BBBBLdddffffffffLHB', payload, 0)
- s = (" waasPrn %u gpsPrn %u iod %u acc %u tod %u\n"
- " xg %f yg %f zg %f\n"
- " vxg %f vyg %f vzg %f\n"
- " vvxg %f vvyg %f vvzg %f\n"
- " agf0 %f agf1 %f tow %u wn %u flags %u\n" % u)
-
- return s
-
- def msg_r(self, payload):
- "[r?] decode, Integer Psudeo Ranges"
-
- s = ' spr'
- for i in range(0, len(payload) - 1, 4):
- u = struct.unpack_from('<l', payload, i)
- s += " " + self.i4_s(u[0])
-
- return s + '\n'
-
- def msg_AZ(self, payload):
- "[AZ] decode, Satellite Azimuths"
-
- s = " azim"
- for i in range(0, len(payload) - 1):
- # azimuth/2, 0 to 180 degrees
- s += " " + self.u1_s(payload[i])
-
- return s + '\n'
-
- def msg_BP(self, payload):
- "[BP] decode"
-
- u = struct.unpack_from('<f', payload, 0)
- return " acc %.3e\n" % u[0]
-
- def msg_DC(self, payload):
- "[DC] decode, P/L1 Doppler"
-
- s = " dp"
- for i in range(0, len(payload) - 1, 4):
- u = struct.unpack_from('<L', payload, i)
- s += " %d" % (u[0])
-
- return s + '\n'
-
- def msg_DO(self, payload):
- "[DO] decode"
-
- u = struct.unpack_from('<ff', payload, 0)
- return " val %.3f sval %.3f\n" % u
-
- def msg_DP(self, payload):
- "[DP] decode"
-
- u = struct.unpack_from('<fffBfB', payload, 0)
- return (" hdop %f vdop %f tdop %f edop %f\n"
- " solType %s\n" %
- (u[0], u[1], u[2], u[4], self.soltypes[u[3]]))
-
- def msg_E_(self, payload):
- "[E?] decode, SNR"
-
- s = ' cnr'
- for i in range(0, len(payload) - 1):
- s += " " + self.u1_s(payload[i])
-
- return s + '\n'
-
- def msg_ET(self, payload):
- "[::](ET) decode, Epoch time, end of epoch"
-
- u = struct.unpack_from('<L', payload, 0)
- if ((self.epoch is not None and self.epoch != u[0])):
- if -1 == self.epoch:
- print("Error: [::](ET) missing [~~](RT)\n")
- else:
- print("Error: [::](ET) Wrong Epoch %u, should be %u\n" %
- (u[0], self.epoch))
- # reset epoch
- self.epoch = -1
- return "(ET) tod %u\n" % u[0]
-
- def msg_EL(self, payload):
- "[EL] decode, Satellite Elevations"
-
- s = " elev"
- for i in range(0, len(payload) - 1):
- # looking for integer (-90 to 90), not byte
- u = struct.unpack_from('<b', payload, i)
- s += " " + self.i1_s(u[0])
-
- return s + '\n'
-
- def msg_ER(self, payload):
- "[ER] decode, Error messages"
-
- parts = payload.split(b'%')
- if 1 < len(parts):
- self.last_statement_identifier = parts[1]
-
- s_payload = "".join(map(chr, payload))
- print("[ER] %s\n" % s_payload)
- return " %s\n" % s_payload
-
- def msg_EU(self, payload):
- "[EU] decode, GALILEO UTC and GPS Time Parameters"
-
- u = struct.unpack_from('<dfLHbBHbffLHH', payload, 0)
- return (" ao %f a1 %f tot %u wnt %u dtls %d dn %u wnlsf %u\n"
- " dtlsf %d a0g %f a1g %f t0g %u wn0g %u flags %#x\n" % u)
-
- def msg_FC(self, payload):
- "[FC] [F1] [F2] [F3] [f5] [Fl] decode, Signal Lock Loop Flags"
-
- s = " flags 0x"
- for i in range(0, len(payload) - 1):
- u = struct.unpack_from('<H', payload, i)
- s += " %2x" % (u[0])
-
- return s + '\n'
-
- def msg__E1(self, payload):
- "[?E] decode, BeiDos, GPS, GALILEO, IRNSS Ephemeris "
- m_len = len(payload)
- # [GE]
- if ('[IE]' == self.s_id) and (124 > m_len):
- return " Bad Length %s" % m_len
- if ('[CN]' == self.s_id) and (132 > m_len):
- return " Bad Length %s" % m_len
- if ('[EN]' == self.s_id) and (145 > m_len):
- return " Bad Length %s" % m_len
-
- u = struct.unpack_from('<BLBhlbBhfffflhddddddfffffffff', payload, 0)
- s = (" sv %u tow %u flags %u iodc %d toc %d ura %d healthS %u\n"
- " wn %d tgd %f af2 %f af1 %f af0 %f toe %d\n"
- " iode %d rootA %f ecc %f m0 %f omega0 %f\n"
- " inc0 %f argPer %f deln %f omegaDot %f\n"
- " incDot %f crc %f crs %f cuc %f\n"
- " cus %f cic %f cis %f\n" % u)
-
- if '[EN]' == self.s_id:
- u = struct.unpack_from('<fffffBB', payload, 122)
- s += (" bgdE1E5a %f bgdE1E5b %f aio %f ai1 %f ai2 %f\n"
- " sfi %u navType %u" % u)
- if 149 <= m_len:
- # DAf0 added in 3.7.0
- u = struct.unpack_from('<f', payload, 144)
- s += (" DAf0 %f" % u)
- s += '\n'
-
- if ('[IE]' == self.s_id) and (124 > m_len):
- u = struct.unpack_from('<B', payload, 122)
- s += (" navType %u\n" % u[0])
-
- if ('[CN]' == self.s_id) and (132 > m_len):
- u = struct.unpack_from('<fBf', payload, 122)
- s += (" tgd2 %f navType %u DAf0 %f\n" % u)
-
- # TODO: decode length 160 168
-
- return s
-
- def msg_GT(self, payload):
- "[GT] decode, GPS Time "
-
- u = struct.unpack_from('<LH', payload, 0)
- return " tow %u wn %d\n" % u
-
- def msg_ID(self, payload):
- "[ID] Ionosphere Delays"
-
- s = ' delay'
- for i in range(0, len(payload) - 1, 4):
- u = struct.unpack_from('<f', payload, i)
- s += " %s" % self.f4_s(u[0])
-
- return s + '\n'
-
- def msg_IO(self, payload):
- "[IO] decode, GPS Ionospheric Parameters"
-
- u = struct.unpack_from('<LHffffffff', payload, 0)
-
- return (" tot %d wn %u alpha0 %f alpha1 %f alpha2 %f\n"
- " alpha3 %f beta0 %u beta1 %d beta2 %f\n"
- " beta3 %f\n" % u)
-
- def msg_LO(self, payload):
- "[LO] decode, undocumented message"
-
- return " Undocumented message\n"
-
- def msg_MF(self, payload):
- "[MF] Messages Format"
-
- u = struct.unpack_from('<BBBBBBB', payload, 0)
- return (" id %c%c majorVer %c%c minorVer %c%c order %c\n" %
- (chr(u[0]), chr(u[1]), chr(u[2]), chr(u[3]),
- chr(u[4]), chr(u[5]), chr(u[6])))
-
- def msg_PM(self, payload):
- "[PM] parameters"
-
- # PM only seems to work after a coldboot, once
- # zerk -v 2 -w 20 -c 'out,,jps/{PM}' -W
- return " %s\n" % payload
-
- def msg_PV(self, payload):
- "[PV] decode, Cartesian Position and Velocity"
-
- u = struct.unpack_from('<dddfffffBB', payload, 0)
- return (" x %s y %s z %s sigma %s\n"
- " vx %s vy %s vz %s\n"
- " vsigma %s soltype %s\n" %
- (self.f8_s(u[0]), self.f8_s(u[1]), self.f8_s(u[2]),
- self.f4_s(u[3]), self.f4_s(u[4]), self.f4_s(u[5]),
- self.f4_s(u[6]), self.f4_s(u[7]), self.soltypes[u[8]]))
-
- def msg_RD(self, payload):
- "[RD] decode, Receiver Date"
-
- u = struct.unpack_from('<HBBB', payload, 0)
- return " year %d month %d day %d base %d\n" % u
-
- def msg_RE(self, payload):
- "[RE] decode"
-
- parts = payload.split(b'%')
- if 1 < len(parts):
- # Got a statement identifier (ID), save it?
- # Multiline statement if payload ends with comma or left brace
- if payload[-1] not in (ord(','), ord('{')):
- # yes, this is the end
- self.last_statement_identifier = parts[1]
-
- # Get the message body
- part1 = parts[1].split(b',')
-
- if 'em' == parts[1]:
- # Probably no parts[2]
- print("Enable Messages %s" % parts[2])
- return " Enable Messages %s\n" % parts[2]
-
- if 'id' == parts[1]:
- print("ID: %s" % parts[2])
- return " ID %s\n" % parts[2]
-
- if 'opts' == part1[0]:
- if 1 < len(part1):
- s = "OAF %s: %s" % (part1[1], parts[2])
- else:
- s = " OAF: %s" % (parts[2])
- print(s)
- return " %s\n" % s
-
- if 'serial' == parts[1]:
- print("SERIAL: %s" % parts[2])
- return " SERIAL %s\n" % parts[2]
-
- if 'vendor' == parts[1]:
- print("VENDOR: %s" % parts[2])
- return " Vendor %s\n" % parts[2]
-
- if 'ver' == parts[1]:
- print("VER: %s" % parts[2])
- return " Version %s\n" % parts[2]
-
- # unknown statement identifier
- s_payload = "".join(map(chr, payload))
- print("RE: %s\n" % s_payload)
-
- return " %s\n" % s_payload
-
- def msg_RT(self, payload):
- "[~~](RT) decode, Receiver Time, start of epoch"
-
- if self.epoch is not None and -1 != self.epoch:
- print("Error: [~~](RT) missing [::](ET)\n")
-
- u = struct.unpack_from('<L', payload, 0)
- # save start of epoch
- self.epoch = u[0]
- return "(RT) tod %u\n" % self.epoch
-
- def msg_S_(self, payload):
- "[CS], [ES], [GS], [Is], [WS], [NS], [QS], decode, SVs Status"
-
- # to poll them all: zerk -W -w 2 -v 2 -c "out,,jps/{CS,ES,GS,Is,WS,QS}"
- # TODO, check @checksum
-
- return "%s" % payload
-
- def msg_SE(self, payload):
- "[SE] decode"
-
- u = struct.unpack_from('<BBBBB', payload, 0)
- return " data 0x %x %x %x %x %x\n" % u
-
- def msg_SG(self, payload):
- "[SG] decode"
-
- u = struct.unpack_from('<ffffBB', payload, 0)
- return (" hpos %s vpos %s hvel %s vvel %s\n"
- " soltype %s\n" %
- (self.f4_s(u[0]), self.f4_s(u[1]), self.f4_s(u[2]),
- self.f4_s(u[3]), self.soltypes[u[4]]))
-
- def msg_SI(self, payload):
- "[SI] decode, Satellite Index, deprecated by Javad, use [SX]"
-
- # [SX] require 3.7 firmware, we use [SI] to support 3.6
- s = " usi"
- for i in range(0, len(payload) - 1):
- s += " %d" % payload[i]
-
- return s + '\n'
-
- def msg_SP(self, payload):
- "[SP] decode, Position Covariance Matrix"
-
- u = struct.unpack_from('<ffffffffffB', payload, 0)
- return (" xx % f yy % f zz % f tt % f xy % f\n"
- " xz % f xt % f yz % f yt % f zt % f\n"
- " solType %s\n" %
- (u[0], u[1], u[2], u[3], u[4],
- u[5], u[6], u[7], u[8], u[9],
- self.soltypes[u[10]]))
-
- def msg_SS(self, payload):
- "[SS] decode, Satellite Navigation Status"
-
- s = " ns"
- for i in range(0, len(payload) - 2):
- s += " %d" % payload[i]
-
- return (s + '\n solType %s\n' %
- self.soltypes[payload[len(payload) - 2]])
-
- def msg_ST(self, payload):
- "[ST] decode, Solution Time Tag"
-
- u = struct.unpack_from('<LBB', payload, 0)
- return (" time %u ms, soltype %s\n" %
- (u[0], self.soltypes[u[1]]))
-
- def msg_SX(self, payload):
- "[SX] decode, Extended Satellite Indices"
-
- # [SX] require 3.7 firmware
- s = " ESI"
- for i in range(0, len(payload) - 2, 2):
- u = struct.unpack_from('<BB', payload, i)
- s += " (%u, %u)" % u
-
- return s + '\n'
-
- def msg_TC(self, payload):
- "[TC] decode, CA/L1 Continous Tracking Time"
-
- s = " tt"
- for i in range(0, len(payload) - 1, 2):
- u = struct.unpack_from('<H', payload, i)
- s += " %.2f" % u[0]
-
- return s + '\n'
-
- def msg_TO(self, payload):
- "[TO] decode, Reference Time to Receiver Time Offset"
-
- u = struct.unpack_from('<dd', payload, 0)
- return " val %.3f sval %.3f\n" % u
-
- def msg_UO(self, payload):
- "[UO] decode, GPS UTC Time Parameters"
-
- u = struct.unpack_from('<dfLHbBHb', payload, 0)
- return (" a0 %f a1 %f tot %d wnt %d dtls %d\n"
- " dn %d wnlsf %d dtlsf %d\n" % u)
-
- def msg_WA(self, payload):
- "[WA] decode"
-
- u = struct.unpack_from('<BBBBLdddfffLH', payload, 0)
- return (" waasPrn %d gpsPrn %d if %d healthS %d tod %d\n"
- " ECEF %.3f %.3f %.3f, %.3f %.3f %.3f\n"
- " tow %d wn %d\n" % u)
-
- def msg_WU(self, payload):
- "[WU] decode, SBAS UTC Time Parameters"
-
- u = struct.unpack_from('<dfLHbBHbfbLHB', payload, 0)
- return (" ao %f a1 %f tot %u wnt %u dtls %d dn %u\n"
- "wnlsf %u dtlsf %d utcsi %d tow %u wn %u flags %#x\n" % u)
-
- # table from message id to respective message decoder.
- # Note: id (%id%) is different than ID (statement identifier)
- # the id is the first two characters of a GREIS receiver Message
- # see section 3.3 of the specification
- messages = {
- '[0d]': (msg__d, 1),
- '[1d]': (msg__d, 1),
- '[1E]': (msg__E, 1),
- '[1p]': (msg__p, 1),
- '[1r]': (msg__r, 1),
- '[2d]': (msg__d, 1),
- '[2E]': (msg__E, 1),
- '[2p]': (msg__p, 1),
- '[2r]': (msg__r, 1),
- '[3d]': (msg__d, 1),
- '[3E]': (msg__E, 1),
- '[3p]': (msg__p, 1),
- '[3r]': (msg__r, 1),
- '[5d]': (msg__d, 1),
- '[5E]': (msg__E, 1),
- '[5p]': (msg__p, 1),
- '[5r]': (msg__r, 1),
- '[AZ]': (msg_AZ, 1),
- '[BP]': (msg_BP, 5),
- '[c1]': (msg_c_, 1),
- '[c2]': (msg_c_, 1),
- '[c3]': (msg_c_, 1),
- '[c5]': (msg_c_, 1),
- '[CA]': (msg__A, 47),
- '[cc]': (msg_c_, 1),
- '[CE]': (msg__E, 1),
- '[cl]': (msg_c_, 1),
- '[CN]': (msg__E1, 123),
- '[cp]': (msg__p, 1),
- '[cr]': (msg__r, 1),
- '[CS]': (msg_S_, 8),
- '[DC]': (msg_DC, 1),
- '[DO]': (msg_DO, 6),
- '[DP]': (msg_DP, 18),
- '[E1]': (msg_E_, 1),
- '[E2]': (msg_E_, 1),
- '[E3]': (msg_E_, 1),
- '[E5]': (msg_E_, 1),
- '[EA]': (msg__A, 47),
- '[EC]': (msg_E_, 1),
- '[El]': (msg_E_, 1),
- '[EL]': (msg_EL, 1),
- '[EN]': (msg__E1, 123),
- '[ER]': (msg_ER, 1),
- '[ES]': (msg_S_, 8),
- '[EU]': (msg_EU, 40),
- '[F1]': (msg_FC, 1),
- '[F2]': (msg_FC, 1),
- '[F3]': (msg_FC, 1),
- '[F5]': (msg_FC, 1),
- '[FA]': (msg_FC, 1),
- '[FC]': (msg_FC, 1),
- '[Fl]': (msg_FC, 1),
- '[GA]': (msg__A, 47),
- '[GE]': (msg__E1, 123),
- '[GS]': (msg_S_, 8),
- '[GT]': (msg_GT, 7),
- '[IA]': (msg__A, 47),
- '[ID]': (msg_ID, 1),
- '[IE]': (msg__E1, 123),
- '[IO]': (msg_IO, 39),
- '[Is]': (msg_S_, 8),
- '[ld]': (msg__d, 1),
- '[lE]': (msg__E, 1),
- '[lp]': (msg__p, 1),
- '[lr]': (msg__r, 1),
- '[LO]': (msg_LO, 1),
- '[MF]': (msg_MF, 9),
- '[::]': (msg_ET, 4),
- '[~~]': (msg_RT, 4),
- '[NS]': (msg_S_, 8),
- '[PM]': (msg_PM, 0),
- '[PV]': (msg_PV, 46),
- '[QA]': (msg__A, 47),
- '[QE]': (msg__E1, 123),
- '[QS]': (msg_S_, 8),
- '[r1]': (msg_r, 1),
- '[r2]': (msg_r, 1),
- '[r3]': (msg_r, 1),
- '[r5]': (msg_r, 1),
- '[rc]': (msg_r, 1),
- '[RD]': (msg_RD, 6),
- '[RE]': (msg_RE, 1),
- '[rl]': (msg_r, 1),
- '[rx]': (msg_r, 1),
- '[SE]': (msg_SE, 6),
- '[SG]': (msg_SG, 18),
- '[SI]': (msg_SI, 1),
- '[SP]': (msg_SP, 42),
- '[SS]': (msg_SS, 1),
- '[ST]': (msg_ST, 6),
- '[SX]': (msg_SX, 1),
- '[TC]': (msg_TC, 1),
- '[TO]': (msg_TO, 6),
- '[UO]': (msg_UO, 24),
- '[WA]': (msg_WA, 51),
- '[WE]': (msg_WE, 73),
- '[WS]': (msg_S_, 8),
- '[WU]': (msg_WU, 40),
- }
-
- def decode_msg(self, out):
- "Decode one message and then return number of chars consumed"
-
- state = 'BASE'
- consumed = 0
- # raw message, sometimes used for checksum calc
- m_raw = bytearray(0)
-
- # decode state machine
- for this_byte in out:
- consumed += 1
- if isinstance(this_byte, str):
- # a character, probably read from a file
- c = ord(this_byte)
- else:
- # a byte, probably read from a serial port
- c = int(this_byte)
-
- if VERB_RAW <= opts['verbosity']:
- if (ord(' ') <= c) and (ord('~') >= c):
- # c is printable
- print("state: %s char %c (%#x)" % (state, chr(c), c))
- else:
- # c is not printable
- print("state: %s char %#x" % (state, c))
-
- m_raw.extend([c])
-
- # parse input stream per GREIS Ref Guide Section 3.3.3
- if 'BASE' == state:
- # start fresh
- # place to store 'comments'
- comment = ''
- # message id byte one
- m_id1 = 0
- # message id byte two
- m_id2 = 0
- # message length as integer
- m_len = 0
- # byte array to hold payload, including possible checksum
- m_payload = bytearray(0)
- m_raw = bytearray(0)
- m_raw.extend([c])
-
- if (ord('0') <= c) and (ord('~') >= c):
- # maybe id 1, '0' to '~'
- state = 'ID1'
-
- # start the grab
- m_id1 = c
- continue
-
- if ord("%") == c:
- # start of %ID%, Receiver Input Language
- # per GREIS Ref Guide Section 2.2
- state = 'RIL'
-
- # start fresh
- comment = "%"
- continue
-
- if ord("$") == c:
- # NMEA line, treat as comment
- state = 'NMEA'
-
- # start fresh
- comment = "$"
- continue
-
- if ord("#") == c:
- # comment line
- state = 'COMMENT'
-
- # start fresh
- comment = "#"
- continue
-
- if ord('\n') == c or ord('\r') == c:
- # stray newline or linefeed, eat it
- return consumed
-
- # none of the above, stay in BASE
- continue
-
- if state in ('COMMENT', 'JSON', 'RIL'):
- # inside comment
- if ord('\n') == c or ord('\r') == c:
- # Got newline or linefeed
- # GREIS terminates messages on <CR> or <LF>
- # Done, got a full message
- if b'{"class":"ERROR"' in comment:
- # always print gpsd errors
- print(comment)
- elif VERB_DECODE <= opts['verbosity']:
- print(comment)
- return consumed
- else:
- comment += chr(c)
- continue
-
- if 'ID1' == state:
- # maybe id 2, '0' to '~'
- if ord('"') == c:
- # technically could be GREIS, but likely JSON
- state = 'JSON'
- comment += chr(m_id1) + chr(c)
- elif (ord('0') <= c) and (ord('~') >= c):
- state = 'ID2'
- m_id2 = c
- else:
- state = 'BASE'
- continue
-
- if 'ID2' == state:
- # maybe len 1, 'A' to 'F'
- x = self.isuchex(c)
- if -1 < x:
- state = 'LEN1'
- m_len = x * 256
- else:
- state = 'BASE'
- continue
-
- if 'LEN1' == state:
- # maybe len 2, 'A' to 'F'
- x = self.isuchex(c)
- if -1 < x:
- state = 'LEN2'
- m_len += x * 16
- else:
- state = 'BASE'
- continue
-
- if 'LEN2' == state:
- # maybe len 3, 'A' to 'F'
- x = self.isuchex(c)
- if -1 < x:
- state = 'PAYLOAD'
- m_len += x
- else:
- state = 'BASE'
- continue
-
- if 'NMEA' == state:
- # inside NMEA
- if ord('\n') == c or ord('\r') == c:
- # Got newline or linefeed
- # done, got a full message
- # GREIS terminates messages on <CR> or <LF>
- if VERB_DECODE <= opts['verbosity']:
- print(comment)
- return consumed
- else:
- comment += chr(c)
- continue
-
- if 'PAYLOAD' == state:
- # getting payload
- m_payload.extend([c])
- if len(m_payload) < m_len:
- continue
-
- # got entire payload
- self.s_id = "[%c%c]" % (chr(m_id1), chr(m_id2))
- if VERB_DECODE <= opts['verbosity']:
- x_payload = binascii.hexlify(m_payload)
-
- # [RE], [ER] and more have no 8-bit checksum
- # assume the rest do
- if ((self.s_id not in ('[CS]', '[ER]', '[ES]', '[GS]', '[Is]',
- '[MF]', '[NS]', '[PM]', '[QS]', '[RE]',
- '[WS]') and
- not self.checksum_OK(m_raw))):
- print("ERROR: Bad checksum\n")
- if VERB_DECODE <= opts['verbosity']:
- print("DECODE: id: %s len: %d\n"
- "DECODE: payload: %s\n" %
- (self.s_id, m_len, x_payload))
- # skip it.
- return consumed
-
- if self.s_id in self.messages:
- if VERB_INFO <= opts['verbosity']:
- print("INFO: id: %s len: %d\n"
- "INFO: payload: %s\n" %
- (self.s_id, m_len, x_payload))
-
- (decode, length) = self.messages[self.s_id]
- if m_len < length:
- print("DECODE: %s Bad Length %s\n" %
- (self.s_id, m_len))
- else:
- s = self.s_id + decode(self, m_payload)
- if VERB_DECODE <= opts['verbosity']:
- print(s)
- else:
- # unknown message
- if VERB_DECODE <= opts['verbosity']:
- print("DECODE: Unknown: id: %s len: %d\n"
- "DECODE: payload: %s\n" %
- (self.s_id, m_len, x_payload))
- return consumed
-
- # give up
- state = 'BASE'
-
- # fell out of loop, no more chars to look at
- return 0
-
- def checksum_OK(self, raw_msg):
- "Check the i8-bit checksum on a message, return True if good"
-
- # some packets from the GPS use CRC16, some i8-bit checksum, some none
- # only 8-bit checksum done here for now
- calc_checksum = self.checksum(raw_msg, len(raw_msg) - 1)
- rcode = raw_msg[len(raw_msg)-1] == calc_checksum
- if VERB_RAW <= opts['verbosity']:
- print("Checksum was %#x, calculated %#x" %
- (raw_msg[len(raw_msg)-1], calc_checksum))
- return rcode
-
- def _rol(self, value):
- "rotate a byte left 2 bits"
- return ((value << 2) | (value >> 6)) & 0x0ff
-
- def checksum(self, msg, m_len):
- "Calculate GREIS 8-bit checksum"
-
- # Calculated per section A.1.1 of the specification
- # msg may be bytes (incoming messages) or str (outgoing messages)
-
- ck = 0
- for c in msg[0:m_len]:
- if isinstance(c, str):
- # a string, make a byte
- c = ord(c)
- ck = self._rol(ck) ^ c
-
- return self._rol(ck) & 0x0ff
-
- def make_pkt(self, m_data):
- "Build an output message, always ASCII, add checksum and terminator"
-
- # build core message
-
- # no leading spaces, checksum includes the @
- m_data = m_data.lstrip() + b'@'
-
- chk = self.checksum(m_data, len(m_data))
-
- # all commands end with CR and/or LF
- return m_data + (b'%02X' % chk) + b'\n'
-
- def gps_send(self, m_data):
- "Send message to GREIS GPS"
-
- m_all = self.make_pkt(m_data)
- if not opts['read_only']:
- io_handle.ser.write(m_all)
- if VERB_INFO <= opts['verbosity']:
- print("sent:", m_all)
- self.decode_msg(m_all)
- sys.stdout.flush()
-
- # Table of known options. From table 4-2 of the specification.
- oafs = (
- b"_AJM",
- b"AUTH",
- b"_BLT",
- b"_CAN",
- b"CDIF",
- b"CMRI",
- b"CMRO",
- b"COMP",
- b"COOP",
- b"COPN",
- b"CORI",
- b"_CPH",
- b"DEVS",
- b"DIST",
- b"_DTM",
- b"_E5B",
- b"_E6_",
- b"EDEV",
- b"ETHR",
- b"EVNT",
- b"_FRI",
- b"_FRO",
- b"_FTP",
- b"_GAL",
- b"GBAI",
- b"GBAO",
- b"GCLB",
- b"_GEN",
- b"_GEO",
- b"_GLO",
- b"_GPS",
- b"_GSM",
- b"HTTP",
- b"_IMU",
- b"INFR",
- b"IRIG",
- b"IRNS",
- b"JPSI",
- b"JPSO",
- b"_L1C",
- b"_L1_",
- b"_L2C",
- b"_L2_",
- b"_L5_",
- b"LAT1",
- b"LAT2",
- b"LAT3",
- b"LAT4",
- b"LCS2",
- b"L_CS",
- b"_LIM",
- b"LON1",
- b"LON2",
- b"LON3",
- b"LON4",
- b"MAGN",
- b"_MEM",
- b"_MPR",
- b"OCTO",
- b"OMNI",
- b"_PAR",
- b"PDIF",
- b"_POS",
- b"_PPP",
- b"_PPS",
- b"PRTT",
- b"_PTP",
- b"QZSS",
- b"RAIM",
- b"_RAW",
- b"RCVT",
- b"RM3I",
- b"RM3O",
- b"RS_A",
- b"RS_B",
- b"RS_C",
- b"RS_D",
- b"RTMI",
- b"RTMO",
- b"SPEC",
- b"TCCL",
- b"_TCP",
- b"TCPO",
- b"_TLS",
- b"TRST",
- b"UDPO",
- b"_UHF",
- b"_USB",
- b"WAAS",
- b"WIFI",
- b"_WPT",
- )
-
- def send_able_4hz(self, able):
- "enable basic GREIS messages at 4Hz"
-
- self.expect_statement_identifier = 'greis'
-
- # the messages we want
- # [SX] requires 3.7 firmware, we use [SI] to support 3.6
- messages = b"jps/{RT,UO,GT,PV,SG,DP,SI,EL,AZ,EC,SS,ET}"
-
- if able:
- # Message rate must be an integer multiple of /par/raw/msint
- # Default msint is 0.100 seconds, so that must be changed first
- self.gps_send(b"%msint%set,/par/raw/msint,250")
-
- self.gps_send(b"%greis%em,," + messages + b":0.25")
- else:
- self.gps_send(b"%greis%dm,," + messages)
-
- def send_able_comp(self, able):
- "dis/enable COMPASS, aka BeiDou"
- self.expect_statement_identifier = 'cons'
- en_dis = b'y' if 1 == able else b'n'
- self.gps_send(b"%cons%set,/par/pos/sys/comp," + en_dis)
-
- def send_able_constellations(self, able):
- "dis/enable all constellations"
- self.expect_statement_identifier = 'cons7'
- en_dis = b'y' if 1 == able else b'n'
- self.gps_send(b"%cons1%set,/par/pos/sys/comp," + en_dis)
- self.gps_send(b"%cons2%set,/par/pos/sys/gal," + en_dis)
- # this will fail on TR-G2H, as it has no GLONASS
- self.gps_send(b"%cons3%set,/par/pos/sys/glo," + en_dis)
- self.gps_send(b"%cons4%set,/par/pos/sys/gps," + en_dis)
- self.gps_send(b"%cons5%set,/par/pos/sys/irnss," + en_dis)
- self.gps_send(b"%cons6%set,/par/pos/sys/sbas," + en_dis)
- self.gps_send(b"%cons7%set,/par/pos/sys/qzss," + en_dis)
-
- def send_able_defmsg(self, able):
- "dis/enable default messages at 1Hz"
- self.expect_statement_identifier = 'defmsg'
- if able:
- self.gps_send(b"%defmsg%em,,jps/RT,/msg/def:1,jps/ET")
- else:
- # leave RT and ET to break less?
- self.gps_send(b"%defmsg%dm,,/msg/def:1")
-
- def send_able_gal(self, able):
- "dis/enable GALILEO"
- self.expect_statement_identifier = 'cons'
- en_dis = b'y' if 1 == able else b'n'
- self.gps_send(b"%cons%set,/par/pos/sys/gal," + en_dis)
-
- def send_able_glo(self, able):
- "dis/enable GLONASS"
- self.expect_statement_identifier = 'cons'
- en_dis = b'y' if 1 == able else b'n'
- self.gps_send(b"%cons%set,/par/pos/sys/glo," + en_dis)
-
- def send_able_gps(self, able):
- "dis/enable GPS"
- self.expect_statement_identifier = 'cons'
- en_dis = b'y' if 1 == able else b'n'
- self.gps_send(b"%cons%set,/par/pos/sys/gps," + en_dis)
-
- def send_able_ipr(self, able):
- "dis/enable all Integer Psuedo-Range messages"
- self.expect_statement_identifier = 'em'
- if able:
- self.gps_send(b"%em%em,,jps/{rx,rc,r1,r2,r3,r5,rl}:0.25")
- else:
- self.gps_send(b"%em%dm,,jps/{rx,rc,r1,r2,r3,r5,rl}")
-
- def send_able_irnss(self, able):
- "dis/enable IRNSS"
- self.expect_statement_identifier = 'cons'
- en_dis = b'y' if 1 == able else b'n'
- self.gps_send(b"%cons%set,/par/pos/sys/irnss," + en_dis)
-
- def send_able_nmea41(self, able):
- "dis/enable basic NMEA 4.1e messages at 4Hz"
-
- self.expect_statement_identifier = 'nmea'
-
- messages = b"nmea/{GBS,GGA,GSA,GST,GSV,RMC,VTG,ZDA}"
-
- if able:
- # set NMEA version 4.1e
- self.gps_send(b"%nmeaver%set,/par/nmea/ver,v4.1e")
-
- # Message rate must be an integer multiple of /par/raw/msint
- # Default msint is 0.100 seconds, so that must be changed first
- self.gps_send(b"%msint%set,/par/raw/msint,250")
-
- # now we can set the messages we want
- self.gps_send(b"%nmea%em,," + messages + b":0.25")
- else:
- # disable
- self.gps_send(b"%nmea%dm,," + messages)
-
- def send_able_sbas(self, able):
- "dis/enable SBAS"
- self.expect_statement_identifier = 'cons'
- en_dis = b'y' if 1 == able else b'n'
- self.gps_send(b"%cons%set,/par/pos/sys/sbas," + en_dis)
-
- def send_able_qzss(self, able):
- "dis/enable QZSS"
- self.expect_statement_identifier = 'cons'
- en_dis = b'y' if 1 == able else b'n'
- self.gps_send(b"%cons%set,/par/pos/sys/qzss," + en_dis)
-
- def send_able_snr(self, able):
- "dis/enable all SNR messages, except [EC]"
- self.expect_statement_identifier = 'em'
- if able:
- self.gps_send(b"%em%em,,jps/{E1,E2,E3,E5,El}:0.25")
- else:
- self.gps_send(b"%em%dm,,jps/{E1,E2,E3,E5,El}")
-
- able_commands = {
- # en/disable basic GREIS messages at 4HZ
- "4HZ": {"command": send_able_4hz,
- "help": "basic GREIS messages at 4Hz"},
- # en/disable all constellations
- "CONS": {"command": send_able_constellations,
- "help": "all constellations"},
- # en/disable COMPASS, aka Beidou
- "COMPASS": {"command": send_able_comp,
- "help": "COMPASS"},
- # en/disable default message set.
- "DEFMSG": {"command": send_able_defmsg,
- "help": "default message set at 1Hz"},
- # en/disable GALILEO
- "GALILEO": {"command": send_able_gal,
- "help": "GALILEO"},
- # en/disable GLONASS
- "GLONASS": {"command": send_able_glo,
- "help": "GLONASS"},
- # en/disable GPS
- "GPS": {"command": send_able_gps,
- "help": "GPS"},
- # en/disable Integer Psuedo Range messages
- "IPR": {"command": send_able_ipr,
- "help": "all Integer Psuedo Range messages"},
- # en/disable IRNSS
- "IRNSS": {"command": send_able_irnss,
- "help": "IRNSS"},
- # en/disable NMEA 4.1e
- "NMEA": {"command": send_able_nmea41,
- "help": "basic messages NMEA 4.1 at 4Hz"},
- # en/disable SBAS
- "SBAS": {"command": send_able_sbas,
- "help": "SBAS"},
- # en/disable all SNRs
- "SNR": {"command": send_able_snr,
- "help": "all SNR messages, except [EC]"},
- # en/disable QZSS
- "QZSS": {"command": send_able_qzss,
- "help": "QZSS"},
- }
-
- def send_coldboot(self):
- "Delete NVRAM (almanac, ephemeris, location) and restart"
- self.expect_statement_identifier = 'coldboot'
- self.gps_send(b"%coldboot%init,/dev/nvm/a")
-
- def send_constellations(self):
- "poll all constellations"
- self.expect_statement_identifier = 'cons'
- self.gps_send(b"%cons%print,/par/pos/sys:on")
-
- def send_get_id(self):
- "get receiver id"
- self.expect_statement_identifier = 'id'
- self.gps_send(b"%id%print,/par/rcv/id")
-
- def send_get_oaf(self):
- "poll OAF (GPS opts)"
-
- self.expect_statement_identifier = 'opts,_WPT'
- if VERB_RAW <= opts['verbosity']:
- # get list of all opts
- self.gps_send(b"%opts,list%list,/par/opts")
-
- # request opts one at a time from canned list
- for s in self.oafs:
- self.gps_send(b"%%opts,%s%%print,/par/opts/%s" % (s, s))
-
- def send_get_serial(self):
- "get receiver serial number"
- self.expect_statement_identifier = 'serial'
- self.gps_send(b"%serial%print,/par/rcv/sn")
-
- def send_reset(self):
- "reset (reboot) the GPS"
- self.expect_statement_identifier = 'reset'
- self.gps_send(b"%reset%set,/par/reset,y")
-
- def send_set_dm(self):
- "disable all messages"
- self.expect_statement_identifier = 'dm'
- self.gps_send(b"%dm%dm")
-
- def send_set_ipr(self):
- "poll Integer Psuedo-Range messages"
- self.expect_statement_identifier = 'out'
- self.gps_send(b"%out%out,,jps/{rx,rc,r1,r2,r3,r5,rl}")
-
- def send_get_snr(self):
- "poll all SNR messages"
- # nothing we can wait on, depending on GPS model/configuration
- # we may never see some of E2, E3, E5 or El
- self.gps_send(b"%out%out,,jps/{EC,E1,E2,E3,E5,El}")
-
- def send_set_speed(self, set_speed):
- "change GPS speed"
- self.expect_statement_identifier = 'setspeed'
- self.gps_send(b"%%setspeed%%set,/par/cur/term/rate,%d" %
- set_speed)
-
- def send_get_vendor(self):
- "get receiver vendor"
- self.expect_statement_identifier = 'vendor'
- self.gps_send(b"%vendor%print,/par/rcv/vendor")
-
- def send_get_ver(self):
- "get receiver version, per section 4.4.3 of the specification"
- self.expect_statement_identifier = 'ver'
- self.gps_send(b"%ver%print,/par/rcv/ver")
-
- # list of canned commands that can be sent to the receiver
- commands = {
- "COLDBOOT": {"command": send_coldboot,
- "help": "cold boot the GPS"},
- "CONS": {"command": send_constellations,
- "help": "poll enabled constellations"},
- "DM": {"command": send_set_dm,
- "help": "disable all periodic messages"},
- "ID": {"command": send_get_id,
- "help": "poll receiver ID"},
- "IPR": {"command": send_set_ipr,
- "help": "poll all Integer Psuedo-range messages"},
- "OAF": {"command": send_get_oaf,
- "help": "poll all OAF options"},
- "RESET": {"command": send_reset,
- "help": "reset (reboot) the GPS"},
- "SERIAL": {"command": send_get_serial,
- "help": "poll receiver serial number"},
- "SNR": {"command": send_get_snr,
- "help": "poll all SNR messages"},
- "VENDOR": {"command": send_get_vendor,
- "help": "poll GPS vendor"},
- "VER": {"command": send_get_ver,
- "help": "poll GPS version"},
- }
-
-
-class gps_io(object):
- """All the GPS I/O in one place"
-
- Three types of GPS I/O
- 1. read only from a file
- 2. read/write through a device
- 3. read only from a gpsd instance
- """
-
- out = b''
- ser = None
- input_is_device = False
-
- def __init__(self, serial_class):
- "Initialize class"
-
- Serial = serial_class
- # buffer to hold read data
- self.out = b''
-
- # open the input: device, file, or gpsd
- if opts['input_file_name'] is not None:
- # check if input file is a file or device
- try:
- mode = os.stat(opts['input_file_name']).st_mode
- except OSError:
- sys.stderr.write('%s: failed to open input file %s\n' %
- (PROG_NAME, opts['input_file_name']))
- sys.exit(1)
-
- if stat.S_ISCHR(mode):
- # character device, need not be read only
- self.input_is_device = True
-
- if ((opts['disable'] or opts['enable'] or opts['poll'] or
- opts['oaf_name'])):
-
- # check that we can write
- if opts['read_only']:
- sys.stderr.write('%s: read-only mode, '
- 'can not send commands\n' % PROG_NAME)
- sys.exit(1)
- if self.input_is_device is False:
- sys.stderr.write('%s: input is plain file, '
- 'can not send commands\n' % PROG_NAME)
- sys.exit(1)
-
- if opts['target']['server'] is not None:
- # try to open local gpsd
- try:
- self.ser = gps.gpscommon(host=None)
- self.ser.connect(opts['target']['server'],
- opts['target']['port'])
-
- # alias self.ser.write() to self.write_gpsd()
- self.ser.write = self.write_gpsd
- # ask for raw, not rare, data
- data_out = b'?WATCH={'
- if opts['target']['device'] is not None:
- # add in the requested device
- data_out += (b'"device":"' + opts['target']['device'] +
- b'",')
- data_out += b'"enable":true,"raw":2}\r\n'
- if VERB_RAW <= opts['verbosity']:
- print("sent: ", data_out)
- self.ser.send(data_out)
- except socket.error as err:
- sys.stderr.write('%s: failed to connect to gpsd %s\n' %
- (PROG_NAME, err))
- sys.exit(1)
-
- elif self.input_is_device:
- # configure the serial connections (the parameters refer to
- # the device you are connecting to)
-
- try:
- self.ser = Serial.Serial(
- baudrate=opts['input_speed'],
- # 8N1 is GREIS default
- bytesize=Serial.EIGHTBITS,
- parity=Serial.PARITY_NONE,
- port=opts['input_file_name'],
- stopbits=Serial.STOPBITS_ONE,
- # read timeout
- timeout=0.05,
- # pyserial Ver 3.0+ changes writeTimeout to write_timeout
- # just set both
- write_timeout=0.5,
- writeTimeout=0.5,
- )
- except Serial.serialutil.SerialException:
- # this exception happens on bad serial port device name
- sys.stderr.write('%s: failed to open serial port "%s"\n'
- ' Your computer has these serial ports:\n'
- % (PROG_NAME, opts['input_file_name']))
-
- # print out list of supported ports
- import serial.tools.list_ports as List_Ports
- ports = List_Ports.comports()
- for port in ports:
- sys.stderr.write(" %s: %s\n" %
- (port.device, port.description))
- sys.exit(1)
-
- # flush input buffer, discarding all its contents
- self.ser.flushInput()
-
- else:
- # Read from a plain file of GREIS messages
- try:
- self.ser = open(opts['input_file_name'], 'rb')
- except IOError:
- sys.stderr.write('%s: failed to open input %s\n' %
- (PROG_NAME, opts['input_file_name']))
- sys.exit(1)
-
- def read(self, read_opts):
- "Read from device, until timeout or expected message"
-
- # are we expecting a certain message?
- if gps_model.expect_statement_identifier:
- # assume failure, until we see expected message
- ret_code = 1
- else:
- # not expecting anything, so OK if we did not see it.
- ret_code = 0
-
- try:
- if read_opts['target']['server'] is not None:
- # gpsd input
- start = time.clock()
- while read_opts['input_wait'] > (time.clock() - start):
- # First priority is to be sure the input buffer is read.
- # This is to prevent input buffer overuns
- if 0 < self.ser.waiting():
- # We have serial input waiting, get it
- # No timeout possible
- # RTCM3 JSON can be over 4.4k long, so go big
- new_out = self.ser.sock.recv(8192)
- if raw is not None:
- # save to raw file
- raw.write(new_out)
- self.out += new_out
-
- consumed = gps_model.decode_msg(self.out)
- self.out = self.out[consumed:]
- if ((gps_model.expect_statement_identifier and
- (gps_model.expect_statement_identifier ==
- gps_model.last_statement_identifier))):
- # Got what we were waiting for. Done?
- ret_code = 0
- if not read_opts['input_forced_wait']:
- # Done
- break
-
- elif self.input_is_device:
- # input is a serial device
- start = time.clock()
- while read_opts['input_wait'] > (time.clock() - start):
- # First priority is to be sure the input buffer is read.
- # This is to prevent input buffer overuns
- # pyserial 3.0 replaces inWaiting() with in_waiting
- if 0 < self.ser.inWaiting():
- # We have serial input waiting, get it
- # 1024 is comfortably large, almost always the
- # Read timeout is what causes ser.read() to return
- new_out = self.ser.read(1024)
- if raw is not None:
- # save to raw file
- raw.write(new_out)
- self.out += new_out
-
- consumed = gps_model.decode_msg(self.out)
- self.out = self.out[consumed:]
- if ((gps_model.expect_statement_identifier and
- (gps_model.expect_statement_identifier ==
- gps_model.last_statement_identifier))):
- # Got what we were waiting for. Done?
- ret_code = 0
- if not read_opts['input_forced_wait']:
- # Done
- break
- else:
- # ordinary file, so all read at once
- self.out += self.ser.read()
- if raw is not None:
- # save to raw file
- raw.write(self.out)
-
- while True:
- consumed = gps_model.decode_msg(self.out)
- self.out = self.out[consumed:]
- if 0 >= consumed:
- break
-
- except IOError:
- # This happens on a good device name, but gpsd already running.
- # or if USB device unplugged
- sys.stderr.write('%s: failed to read %s\n'
- '%s: Is gpsd already holding the port?\n'
- % (PROG_NAME, PROG_NAME,
- read_opts['input_file_name']))
- return 1
-
- if 0 < ret_code:
- # did not see the message we were expecting to see
- sys.stderr.write('%s: waited %0.2f seconds for, '
- 'but did not get: %%%s%%\n'
- % (PROG_NAME, read_opts['input_wait'],
- gps_model.expect_statement_identifier))
- return ret_code
-
- def write_gpsd(self, data):
- "write data to gpsd daemon"
-
- # HEXDATA_MAX = 512, from gps.h, The max hex digits can write.
- # Input data is binary, converting to hex doubles its size.
- # Limit binary data to length 255, so hex data length less than 510.
- if 255 < len(data):
- sys.stderr.write('%s: trying to send %d bytes, max is 255\n'
- % (PROG_NAME, len(data)))
- return 1
-
- if opts['target']['device'] is not None:
- # add in the requested device
- data_out = b'?DEVICE={"path":"' + opts['target']['device'] + b'",'
- else:
- data_out = b'?DEVICE={'
-
- # Convert binary data to hex and build the message.
- data_out += b'"hexdata":"' + binascii.hexlify(data) + b'"}\r\n'
- if VERB_RAW <= opts['verbosity']:
- print("sent: ", data_out)
- self.ser.send(data_out)
- return 0
-
-
-def usage():
- "Print usage information, and exit"
-
- print("usage: %s [-?hrVW] [-c C] [-d D] [-e E] [-f F] [-O O] [-p P]\n"
- " [-R R] [-S S] [-s S] [-v V] [-w W]\n"
- " [server[:port[:device]]]\n\n" % PROG_NAME)
- print('usage: %s [options]\n'
- ' -? print this help\n'
- ' -c C send command C to GPS\n'
- ' -d D disable D\n'
- ' -e E enable E\n'
- ' -f F open F as file/device\n'
- ' default: %s\n'
- ' -h print this help\n'
- ' -O O send OAF file to GPS\n'
- ' -p P send preset GPS command P\n'
- ' -R R save raw data from GPS in file R\n'
- ' -r open file/device read only\n'
- ' default: %s\n'
- ' -S S configure GPS speed to S\n'
- ' -s S set port speed to S\n'
- ' default: %d bps\n'
- ' -V print version\n'
- ' -v V Set verbosity level to V, 0 to 4\n'
- ' default: %d\n'
- ' -W force entire wait time, no exit early\n'
- ' -w W wait time, exit early on -p result\n'
- ' default: %s seconds\n'
- ' [server[:port[:device]]] Connect to gpsd\n'
- ' default port: 2947\n'
- ' default device: None\n'
- '\n'
- 'D and E can be one of:' %
- (PROG_NAME, opts['input_file_name'], opts['raw_file'],
- opts['input_speed'], opts['verbosity'], opts['input_wait'])
- )
-
- # print list of enable/disable commands
- for item in sorted(gps_model.able_commands.keys()):
- print(" %-12s %s" % (item, gps_model.able_commands[item]["help"]))
-
- print('\nthe preset GPS command P can be one of:')
-
- # print list of possible canned commands
- for item in sorted(gps_model.commands.keys()):
- print(" %-12s %s" % (item, gps_model.commands[item]["help"]))
- print('\nOptions can be placed in the ZERKOPTS environment variable.\n'
- 'ZERKOPTS is processed before the CLI options.')
- sys.exit(0)
-
-
-# create the GREIS instance
-gps_model = greis()
-
-if 'ZERKOPTS' in os.environ:
- # grab the ZERKOPTS environment variable for options
- opts['progopts'] = os.environ['ZERKOPTS']
- options = opts['progopts'].split(' ') + sys.argv[1:]
-else:
- options = sys.argv[1:]
-
-try:
- (options, arguments) = getopt.getopt(options,
- "?c:d:e:f:hrp:s:w:v:O:R:S:WV")
-except getopt.GetoptError as err:
- sys.stderr.write("%s: %s\n"
- "Try '%s -h' for more information.\n" %
- (PROG_NAME, str(err), PROG_NAME))
- sys.exit(2)
-
-for (opt, val) in options:
- if opt == '-c':
- # command
- opts['command'] = val
- elif opt == '-d':
- # disable
- opts['disable'] = val
- elif opt == '-e':
- # enable
- opts['enable'] = val
- elif opt == '-f':
- # file input
- opts['input_file_name'] = val
- elif opt == '-h' or opt == '-?':
- # help
- usage()
- elif opt == '-p':
- # preprogrammed command
- opts['poll'] = val
- elif opt == '-r':
- # read only
- opts['read_only'] = True
- elif opt == '-s':
- # serial port speed
- opts['input_speed'] = int(val)
- if opts['input_speed'] not in gps_model.speeds:
- sys.stderr.write('%s: -s invalid speed %s\n' %
- (PROG_NAME, opts['input_speed']))
- sys.exit(1)
-
- elif opt == '-w':
- # max wait time, seconds
- opts['input_wait'] = float(val)
- elif opt in '-v':
- # verbosity level
- opts['verbosity'] = int(val)
- elif opt in '-O':
- # OAF .jpo file
- opts['oaf_name'] = val
- elif opt in '-R':
- # raw log file
- opts['raw_file'] = val
- elif opt in '-S':
- # set GPS serial port speed
- opts['set_speed'] = int(val)
- if opts['set_speed'] not in gps_model.speeds:
- sys.stderr.write('%s: -S invalid speed %s\n' %
- (PROG_NAME, opts['set_speed']))
- sys.exit(1)
-
- elif opt == '-W':
- # forced wait, no early exit on command completion
- opts['input_forced_wait'] = True
- elif opt == '-V':
- # version
- sys.stderr.write('zerk: Version %s\n' % gps_version)
- sys.exit(0)
-
-if opts['input_file_name'] is None:
- # no input file given
- # default to local gpsd
- opts['target']['server'] = "localhost"
- opts['target']['port'] = gps.GPSD_PORT
- opts['target']['device'] = None
- if arguments:
- # server[:port[:device]]
- arg_parts = arguments[0].split(':')
- opts['target']['server'] = arg_parts[0]
- if 1 < len(arg_parts):
- opts['target']['port'] = arg_parts[1]
- if 2 < len(arg_parts):
- opts['target']['device'] = arg_parts[2]
-
-elif arguments:
- sys.stderr.write('%s: Both input file and server specified\n' % PROG_NAME)
- sys.exit(1)
-
-if VERB_PROG <= opts['verbosity']:
- # dump all options
- print('Options:')
- for option in sorted(opts):
- print(" %s: %s" % (option, opts[option]))
-
-# done parsing arguments from environment and CLI
-
-try:
- # raw log file requested?
- raw = None
- if opts['raw_file']:
- try:
- raw = open(opts['raw_file'], 'w')
- except IOError:
- sys.stderr.write('%s: failed to open raw file %s\n' %
- (PROG_NAME, opts['raw_file']))
- sys.exit(1)
-
- # create the I/O instance
- io_handle = gps_io(serial)
-
- # keep it simple, only one of -O, -c -d -e or -S
- if opts['oaf_name'] is not None:
- # parse an OAF file
- try:
- oaf_root = xml.etree.ElementTree.parse(opts['oaf_name']).getroot()
- oaf = dict()
- for tag in ('id', 'oaf', 'hash'):
- oaf[tag] = oaf_root.find(tag).text
- oaf['oaf'] = oaf['oaf'].split('\n')
- if VERB_PROG <= opts['verbosity']:
- print(oaf)
- except xml.etree.ElementTree.ParseError:
- sys.stderr.write('%s: failed to parse OAF "%s"\n'
- % (PROG_NAME, opts['oaf_name']))
- sys.exit(1)
- except IOError:
- sys.stderr.write('%s: failed to read OAF "%s"\n'
- % (PROG_NAME, opts['oaf_name']))
- sys.exit(1)
-
- # calculate hash
- oaf_s = '\n'.join(oaf['oaf'])
- hash_s = hashlib.sha1(oaf_s).hexdigest()
- if hash_s != oaf['hash']:
- sys.stderr.write('%s: OAF bad hash "%s", s/b %s\n'
- % (PROG_NAME, hash_s, oaf['hash']))
- sys.exit(1)
-
- # TODO: probably should check the ID first...
- # TODO: prolly should send one command per handshake
- # blasting all commands at once, seems to not work reliably
- for command in oaf['oaf']:
- time.sleep(0.1) # wait 0.1 seconds each
- gps_model.gps_send(command)
- # this will detect when it is all done
- gps_model.gps_send(b'%DONE%')
- gps_model.expect_statement_identifier = 'DONE'
-
- elif opts['command'] is not None:
- # zero length is OK to send
- if 1 < len(opts['command']) and '%' != opts['command'][0]:
- # add ID, if missing
- gps_model.expect_statement_identifier = 'CMD'
- opts['command'] = "%CMD%" + opts['command']
-
- # add trailing new line
- opts['command'] += "\n"
-
- if VERB_QUIET < opts['verbosity']:
- sys.stderr.write('%s: command %s\n' % (PROG_NAME, opts['command']))
- gps_model.gps_send(opts['command'])
-
- elif opts['disable'] is not None:
- if VERB_QUIET < opts['verbosity']:
- sys.stderr.write('%s: disable %s\n' % (PROG_NAME, opts['disable']))
- if opts['disable'] in gps_model.able_commands:
- command = gps_model.able_commands[opts['disable']]
- command["command"](gps_model, 0)
- else:
- sys.stderr.write('%s: disable %s not found\n' %
- (PROG_NAME, opts['disable']))
- sys.exit(1)
-
- elif opts['enable'] is not None:
- if VERB_QUIET < opts['verbosity']:
- sys.stderr.write('%s: enable %s\n' % (PROG_NAME, opts['enable']))
- if opts['enable'] in gps_model.able_commands:
- command = gps_model.able_commands[opts['enable']]
- command["command"](gps_model, 1)
- else:
- sys.stderr.write('%s: enable %s not found\n' %
- (PROG_NAME, opts['enable']))
- sys.exit(1)
-
- elif opts['poll'] is not None:
- if VERB_QUIET < opts['verbosity']:
- sys.stderr.write('%s: poll %s\n' % (PROG_NAME, opts['poll']))
- if opts['poll'] in gps_model.commands:
- command = gps_model.commands[opts['poll']]
- command["command"](gps_model)
- else:
- sys.stderr.write('%s: poll %s not found\n' %
- (PROG_NAME, opts['poll']))
- sys.exit(1)
-
- elif opts['set_speed'] is not None:
- gps_model.send_set_speed(opts['set_speed'])
-
- exit_code = io_handle.read(opts)
-
- if ((VERB_RAW <= opts['verbosity']) and io_handle.out):
- # dump raw left overs
- print("Left over data:")
- print(io_handle.out)
-
- sys.stdout.flush()
- io_handle.ser.close()
-
-except KeyboardInterrupt:
- print('')
- exit_code = 1
-
-sys.exit(exit_code)