diff options
author | Gary E. Miller <gem@rellim.com> | 2018-09-24 15:21:42 -0700 |
---|---|---|
committer | Gary E. Miller <gem@rellim.com> | 2018-09-24 15:21:42 -0700 |
commit | 52a27d71f19563a40270b25ac1d127529e0a2360 (patch) | |
tree | 0c83b7d148f051bc277ba0dba79d4797f98e7d6a /ubxtool | |
parent | b02507f4f5f6c5fe36e3f7609308706818e075a8 (diff) | |
download | gpsd-52a27d71f19563a40270b25ac1d127529e0a2360.tar.gz |
ubxtool/zerk: install these two programs by default.
Diffstat (limited to 'ubxtool')
-rwxr-xr-x | ubxtool | 2377 |
1 files changed, 2377 insertions, 0 deletions
diff --git a/ubxtool b/ubxtool new file mode 100755 index 00000000..b8c80e95 --- /dev/null +++ b/ubxtool @@ -0,0 +1,2377 @@ +#!/usr/bin/env python +# -*- coding: UTF-8 +''' +ubxtool -- u-blox configurator and packet decoder + +usage: ubxtool [OPTIONS] [server[:port[:device]]] +''' + +# This file is Copyright (c) 2018 by the GPSD project +# BSD terms apply: see the file COPYING in the distribution root for details. +# +# This code runs compatibly under Python 2 and 3.x for x >= 2. +# Preserve this property! +# +# ENVIRONMENT: +# Options in the UBXOPTS environment variable will be parsed before +# the CLI options. A handy place to put your '-f /dev/ttyXX -s SPEED' +# +# To see what constellations are enabled: +# ubxtool -p GNSS -f /dev/ttyXX +# +# To disable GALILEO and enable GALILEO: +# ubxtool -d GLONASS -f /dev/ttyXX +# ubxtool -e GALILEO -f /dev/ttyXX +# +# To read GPS messages a log file: +# ubxtool -v 2 -f test/daemon/ublox-neo-m8n.log + +from __future__ import absolute_import, print_function, division + +import binascii # for binascii.hexlify() +import getopt # for getopt.getopt(), to parse CLI options +import os # for os.environ +import socket # for socket.error +import stat # for stat.S_ISBLK() +import struct # for pack() +import sys +import time + +PROG_NAME = 'ubxtool' + +try: + import serial +except ImportError: + # treat serial as special since it is not part of standard Python + sys.stderr.write("%s: failed to import pyserial\n" % PROG_NAME) + sys.exit(2) + +try: + import gps +except ImportError: + # PEP8 says local imports last + sys.stderr.write("%s: failed to import gps, check PYTHON_PATH\n" % + PROG_NAME) + sys.exit(2) + +gps_version = '3.18-dev' +if gps.__version__ != gps_version: + sys.stderr.write("%s: ERROR: need gps module version %s, got %s\n" % + (PROG_NAME, gps_version, gps.__version__)) + sys.exit(1) + + +VERB_QUIET = 0 # quiet +VERB_NONE = 1 # just output requested data and some info +VERB_DECODE = 2 # decode all messages +VERB_INFO = 3 # more info +VERB_RAW = 4 # raw info +VERB_PROG = 5 # program trace + +# dictionary to hold all user options +opts = { + # command to send to GPS, -c + 'command': None, + # command for -d disable + 'disable': None, + # command for -e enable + 'enable': None, + # default input -f file + 'input_file_name': None, + # default forced wait? -W + 'input_forced_wait': False, + # default port speed -s + 'input_speed': 9600, + # default input wait time -w in seconds + 'input_wait': 2.0, + # optional mode to -p P + 'mode': None, + # the name of an OAF file, extension .jpo + 'oaf_name': None, + # poll command -p + 'poll': None, + # raw log file name + 'raw_file': None, + # open port read only -r + 'read_only': False, + # speed to set GPS -S + 'set_speed': None, + # target gpsd (server:port:device) to connect to + 'target': {"server": None, "port": gps.GPSD_PORT, "device": None}, + # verbosity level, -v + 'verbosity': VERB_NONE, + # contents of environment variable UBXOPTS + 'progopts': '', +} + + +class ubx(object): + "class to hold u-blox stuff" + + # when a statement identifier is received, it is stored here + last_statement_identifier = None + # expected statement identifier. + expect_statement_identifier = False + + def __init__(self): + pass + + # allowable speeds + speeds = (460800, 230400, 153600, 115200, 57600, 38400, 19200, 9600, + 4800, 2400, 1200, 600, 300) + + # UBX Satellite Numbering + gnss_id = {0: 'GPS', + 1: 'SBAS', + 2: 'Galileo', + 3: 'BeiDou', + 4: 'IMES', + 5: 'QZSS', + 6: 'GLONASS'} + + def ack_ack(self, buf): + "UBX-ACK-ACK decode" + m_len = len(buf) + if 2 > m_len: + return "Bad Length %s" % m_len + + u = struct.unpack_from('<BB', buf, 0) + return ' ACK to: %s' % self.class_id_s(u[0], u[1]) + + ack_ids = {0: {'str': 'NAK', 'dec': ack_ack, 'name': 'UBX-ACK-NAK'}, + 1: {'str': 'ACK', 'dec': ack_ack, 'name': 'UBX-ACK-ACK'}} + + def cfg_ant(self, buf): + "UBX-CFG-ANT decode" + m_len = len(buf) + if 0 == m_len: + return "Poll request all" + + if 4 > m_len: + return "Bad Length %s" % m_len + + u = struct.unpack_from('<HH', buf, 0) + s = ' flags: %#x pins: %#x (' % u + if u[0] & 0x1: + s += 'svcs ' + if u[0] & 0x2: + s += 'scd ' + if u[0] & 0x4: + s += 'ocd ' + if u[0] & 0x8: + s += 'pdwnOnSCD ' + if u[0] & 0x10: + s += 'recovery ' + s += (')\n pinSwitch: %d, pinSCD: %d, pinOCD: %d reconfig: %d\n' % + (u[1] & 0x1f, (u[1] >> 5) & 0x1f, (u[1] >> 10) & 0x1f, + u[1] >> 15)) + return s + + def cfg_cfg_mask(self, mask): + "decode Mask in UBX-CFG-CFG, return string" + s = '' + if mask & 0x1: + s += 'ioPort ' + if mask & 0x2: + s += 'msgConf ' + if mask & 0x4: + s += 'infMsg ' + if mask & 0x8: + s += 'navConf ' + if mask & 0x10: + s += 'rxmConf ' + if mask & 0x80: + # not on M8030 + s += 'senConf ' + if mask & 0x100: + s += 'rinvConf ' + if mask & 0x200: + s += 'antConf ' + if mask & 0x800: + s += 'logConf ' + if mask & 0x1000: + s += 'ftsConf ' + + return s + + def cfg_cfg(self, buf): + "UBX-CFG-CFG decode" + m_len = len(buf) + if 12 > m_len: + return "Bad Length %s" % m_len + + if 12 == m_len: + u = struct.unpack_from('<LLL', buf, 0) + else: + u = struct.unpack_from('<LLLB', buf, 0) + + s = ' clearMask: %#x (%s)\n' % (u[0], self.cfg_cfg_mask(u[0])) + s += (' saveMask: %#x (%s)\n' % + (u[1], self.cfg_cfg_mask(u[1]))) + s += (' loadMask: %#x (%s)\n' % + (u[2], self.cfg_cfg_mask(u[2]))) + + if 13 <= m_len: + bit_str = '' + if u[3] & 0x1: + bit_str += 'devBBR ' + if u[3] & 0x2: + bit_str += 'devFlash ' + if u[3] & 0x4: + bit_str += 'devEEPROM ' + if u[3] & 0x10: + bit_str += 'devSpiFlash ' + + s += (' deviceMask: %#x (%s)\n' % (u[3], bit_str)) + + return s + + def cfg_gnss(self, buf): + "UBX-CFG-GNSS decode" + m_len = len(buf) + if 0 == m_len: + return "Poll request" + + if m_len < 4: + return "Bad Length %d" % m_len + + u = struct.unpack_from('<BBBB', buf, 0) + s = " Ver: %u ChHw; %x ChUse: %x, Blocks: %x" % u + num_blocks = u[3] + i = 0 + while i < num_blocks: + u = struct.unpack_from('<BBBBBBBB', buf, 4 + (i*8)) + sat = u[0] + if u[0] in self.gnss_id: + s_sat = self.gnss_id[u[0]] + else: + s_sat = u[0] + s += ("\n gnssId: %s TrkCh: %d maxTrCh: %d," + " Flags: %#02x %02x %02x %02x" % + (s_sat, u[1], u[2], u[7], u[6], u[5], u[4])) + if sat in (0, 1): + # gps, sbas + if u[6] & 0x1: + s += '\n L1C/A' + if 2 == sat: + # Galileo + if u[6] & 0x1: + s += '\n E1OS' + if 3 == sat: + # BeiDou + if u[6] & 0x1: + s += '\n B1I' + if 4 == sat: + # IMES + if u[6] & 0x1: + s += '\n L1' + if 5 == sat: + # QZSS + if u[6] & 0x5: + s += '\n' + if u[6] & 0x1: + s += ' L1C/A' + if u[6] & 0x4: + s += ' L1SAIF' + if 6 == sat: + # Glonass + if u[6] & 0x1: + s += '\n L1OF' + if u[4] & 0x01: + s += ' enabled' + + i += 1 + return s + + def cfg_nav5(self, buf): + "UBX-CFG-NAV5 nav Engine Settings" + m_len = len(buf) + if 36 > m_len: + return "Bad Length %s" % m_len + + u = struct.unpack_from('<HBBlLbBHHHHbbbbHHb', buf, 0) + s = (' mask %#x dynModel %u fixmode %d fixedAlt %d FixedAltVar %u\n' + ' minElev %d drLimit %u pDop %u tDop %u pAcc %u tAcc %u\n' + ' staticHoldThresh %u dgpsTimeOut %u cnoThreshNumSVs %u\n' + ' cnoThresh %u res %u staticHoldMacDist %u utcStandard %u' % u) + return s + + def cfg_msg(self, buf): + "UBX-CFG-MSG decode " + m_len = len(buf) + if 2 == m_len: + u = struct.unpack_from('<BB', buf, 0) + return ' Rate request: %s' % self.class_id_s(u[0], u[1]) + + if 3 == m_len: + u = struct.unpack_from('<BBB', buf, 0) + return (' Rate set: %s Rate:%d' % + (self.class_id_s(u[0], u[1]), u[2])) + + if 8 != m_len: + return "Bad Length %s" % m_len + + u = struct.unpack_from('<BBBBBBBB', buf, 0) + s = (' %s Rates: %u %u %u %u %u %u' % + (self.class_id_s(u[0], u[1]), u[2], u[3], u[4], u[5], u[6], u[7])) + return s + + def cfg_pms(self, buf): + "UBX-CFG-PMS decode, Power Mode Setup" + + m_len = len(buf) + if 0 == m_len: + return " Poll request" + + if 8 > m_len: + return "Bad Length %s" % m_len + + values = {0: "Full power", + 1: "Balanced", + 2: "Interval", + 3: "Aggresive with 1Hz", + 4: "Aggresive with 2Hz", + 5: "Aggresive with 4Hz", + 0xff: "Invalid" + } + u = struct.unpack_from('<BBHHBB', buf, 0) + s = (' version: %u powerSetupValue: %u' + ' period: %u onTime: %#x reserved1[%u %u]' % u) + if u[0] in values: + s += "\n powerSetupValue: %s" % values[u[0]] + + return s + + def cfg_prt(self, buf): + "UBX-CFG-PRT decode" + + m_len = len(buf) + if 0 == m_len: + return " Poll request" + + if 1 == m_len: + return " Poll request PortID %d" % buf[0] + + if 20 > m_len: + return "Bad Length %s" % m_len + + u = struct.unpack_from('<BBHLLHHH', buf, 0) + s = (' PortID: %u reserved1 %u\n' + ' txReady: %#x mode: %#x baudRate: %u inProtoMask: %#x' + ' outProtoMask: %#x\n flags: %#x' % u) + return s + + def cfg_sbas(self, buf): + "UBX-CFG-SBAS decode" + + m_len = len(buf) + if 8 > m_len: + return "Bad Length %s" % m_len + + u = struct.unpack_from('<BBBBL', buf, 0) + return (' mode: %#x usage: %#x maxSBAS: %u scanMode2: %#x' + ' scanMode1: %#x' % u) + + def cfg_tmode2(self, buf): + "UBX-CFG-TMODE2 decode, Time Mode Settings 2" + m_len = len(buf) + if 0 == m_len: + return " Poll request" + + if 28 > m_len: + return "Bad Length %s" % m_len + + u = struct.unpack_from('<BBHlllLLL', buf, 0) + s = (' timeMode: %u reserved1: %u usage: %#x\n' + ' ecefXOrLat: %d ecefYOrLon: %d ecefZOrLon: %d\n' + ' fixeedPosAcc %u svinMinDur %u svinAccLimit %u\n' % u) + return s + + def cfg_tp5(self, buf): + "UBX-CFG-TP5 decode, Time Pulse Parameters" + m_len = len(buf) + if 0 == m_len: + return " Poll request tpIdx 0" + + if 1 == m_len: + return " Poll request tpIdx %d" % buf[0] + + if 32 > m_len: + return " Bad Length %s" % m_len + + u = struct.unpack_from('<BBhhhLLLLLL', buf, 0) + s = ('tpIdx: %u, version: %u reserved1[%u %u]\n' + ' antCableDelay: %u rfGroupDelay %u freqPeriod: %u ' + 'freqPeriod %u\n' + ' pulseLenRatio: %u pulseLenRationLock %u userConfigDelay: %u\n' + 'Flags: %#x\n ' % u) + + if 0x01 & u[10]: + s += 'active, ' + else: + s += 'inactive, ' + if 0x02 & u[10]: + s += 'lockGnsFreq, ' + if 0x04 & u[10]: + s += 'lockedOtherSet, ' + if 0x08 & u[10]: + s += 'is frequency, ' + else: + s += 'is period, ' + if 0x10 & u[10]: + s += 'is pulse length\n ' + else: + s += 'is duty cycle\n ' + if 0x20 & u[10]: + s += 'alignToTow, ' + if 0x40 & u[10]: + s += 'rising, ' + else: + s += 'falling, ' + gridToGps = (u[10] >> 7) & 0x0f + gridToGpsDec = ('UTC', 'GPS', 'Glonass', 'BeiDou') + syncMode = (u[10] >> 11) & 0x03 + s += "gridToGps %s, syncMode %d " % (gridToGpsDec[gridToGps], syncMode) + + return s + + def cfg_usb(self, buf): + "UBX-CFG-USB decode" + m_len = len(buf) + if 0 == m_len: + return " Poll request" + + if 108 > m_len: + return " Bad Length %s" % m_len + + u = struct.unpack_from('<HHHHHH', buf, 0) + s = (' vendorID: %#x productID: %#x reserved1[%u %u]' + ' reserved2[%u %u]\n' + ' powerConsumption %u mA flags: %#x ' % u) + if 0x01 & u[5]: + s += "reEnum, " + if 0x02 & u[5]: + s += "self-powered" + else: + s += "bus-powered" + + s += '\nvendorString: %s\n' % gps.polystr(buf[12:43]) + s += 'productString: %s\n' % gps.polystr(buf[44:75]) + s += 'serialNumber: %s' % gps.polystr(buf[76:107]) + return s + + cfg_ids = {0: {'str': 'PRT', 'dec': cfg_prt, 'name': 'UBX-CFG-PRT'}, + 1: {'str': 'MSG', 'dec': cfg_msg, 'name': 'UBX-CFG-MSG'}, + 2: {'str': 'INF', 'name': 'UBX-CFG-INF'}, + 4: {'str': 'RST', 'name': 'UBX-CFG-RST'}, + 6: {'str': 'DAT', 'name': 'UBX-CFG-DAT'}, + 8: {'str': 'RATE', 'name': 'UBX-CFG-RATE'}, + 9: {'str': 'CFG', 'dec': cfg_cfg, 'name': 'UBX-CFG-CFG'}, + 0x11: {'str': 'RXM', 'name': 'UBX-CFG-RXM'}, + 0x13: {'str': 'ANT', 'dec': cfg_ant, 'name': 'UBX-CFG-ANT'}, + 0x16: {'str': 'SBAS', 'dec': cfg_sbas, 'name': 'UBX-CFG-SBAS'}, + 0x17: {'str': 'NMEA', 'name': 'UBX-CFG-NMEA'}, + 0x1b: {'str': 'USB', 'dec': cfg_usb, 'name': 'UBX-CFG-USB'}, + 0x1e: {'str': 'ODO', 'name': 'UBX-CFG-ODO'}, + 0x23: {'str': 'NAVX5', 'name': 'UBX-CFG-NAVX5'}, + 0x24: {'str': 'NAV5', 'dec': cfg_nav5, 'name': 'UBX-CFG-NAV5'}, + 0x31: {'str': 'TP5', 'dec': cfg_tp5, 'name': 'UBX-CFG-TP5'}, + 0x34: {'str': 'RINV', 'name': 'UBX-CFG-RINV'}, + 0x39: {'str': 'ITFM', 'name': 'UBX-CFG-ITFM'}, + 0x3b: {'str': 'PM2', 'name': 'UBX-CFG-PM2'}, + 0x3d: {'str': 'TMODE2', 'dec': cfg_tmode2, + 'name': 'UBX-CFG-TMODE2'}, + 0x3e: {'str': 'GNSS', 'dec': cfg_gnss, 'name': 'UBX-CFG-GNSS'}, + 0x47: {'str': 'LOGFILTER', 'name': 'UBX-CFG-LOGFILTER'}, + 0x53: {'str': 'TXSLOT', 'name': 'UBX-CFG-TXSLOT'}, + 0x57: {'str': 'PWR', 'name': 'UBX-CFG-PWR'}, + 0x5c: {'str': 'HNR', 'name': 'UBX-CFG-HNR'}, + 0x60: {'str': 'ESRC', 'name': 'UBX-CFG-ESRC'}, + 0x61: {'str': 'DOSC', 'name': 'UBX-CFG-OSC'}, + 0x62: {'str': 'SMGR', 'name': 'UBX-CFG-SMGR'}, + 0x69: {'str': 'GEOFENCE', 'name': 'UBX-CFG-GEOFENCE'}, + 0x70: {'str': 'DGNSS', 'name': 'UBX-CFG-DGNSS'}, + 0x71: {'str': 'TMODE3', 'name': 'UBX-CFG-TMODE3'}, + 0x84: {'str': 'FIXSEED', 'name': 'UBX-CFG-FIXSEED'}, + 0x85: {'str': 'DYNSEED', 'name': 'UBX-CFG-DYNSEED'}, + 0x86: {'str': 'PMS', 'dec': cfg_pms, 'name': 'UBX-CFG-PMS'}, + } + + def inf_debug(self, buf): + "UBX-INF-DEBUG decode" + return ' Debug: ' + gps.polystr(buf) + + def inf_error(self, buf): + "UBX-INF-ERROR decode" + return ' Error: ' + gps.polystr(buf) + + def inf_notice(self, buf): + "UBX-INF-NOTICE decode" + return ' Notice: ' + gps.polystr(buf) + + def inf_test(self, buf): + "UBX-INF-TET decode" + return ' Test: ' + gps.polystr(buf) + + def inf_warning(self, buf): + "UBX-INF-WARNING decode" + return ' Warning: ' + gps.polystr(buf) + + inf_ids = {0x0: {'str': 'ERROR', 'dec': inf_error, + 'name': 'UBX-INF-ERROR'}, + 0x1: {'str': 'WARNING', 'dec': inf_warning, + 'name': 'UBX-INF-WARNING'}, + 0x2: {'str': 'NOTICE', 'dec': inf_notice, + 'name': 'UBX-INF-NOTICE'}, + 0x3: {'str': 'TEST', 'dec': inf_test, + 'name': 'UBX-INF-TEST'}, + 0x4: {'str': 'DEBUG', 'dec': inf_debug, + 'name': 'UBX-INF-DEBUG'}, + } + + def mon_ver(self, buf): + "UBX-MON-VER decode" + m_len = len(buf) + if 0 == m_len: + return " Poll request" + + if 40 > m_len: + return " Bad Length %s" % m_len + + substr = buf.split('\0')[0] + s = ' swVersion: %s\n' % (substr) + substr = buf[30:39] + substr = substr.split('\0')[0] + s += ' hwVersion: %s' % (substr[30:39]) + # extensions?? + num_ext = int((m_len - 40) / 30) + i = 0 + while i < num_ext: + loc = 40 + (i * 30) + substr = buf[loc:] + substr = substr.split('\0')[0] + s += '\n extension: %s' % (substr) + i += 1 + return s + + mon_ids = {2: {'str': 'IO', 'name': 'UBX-MON-IO'}, + 4: {'str': 'VER', 'dec': mon_ver, 'name': 'UBX-MON-VER'}, + 6: {'str': 'MSGPP', 'name': 'UBX-MON-MSGPP'}, + 7: {'str': 'RXBUF', 'name': 'UBX-MON-RXBUF'}, + 8: {'str': 'TXBUF', 'name': 'UBX-MON-TXBUF'}, + 9: {'str': 'HW', 'name': 'UBX-MON-HW'}, + 0x0b: {'str': 'HW2', 'name': 'UBX-MON-HW2'}, + 0x21: {'str': 'RXR', 'name': 'UBX-MON-RXR'}, + 0x27: {'str': 'PATCH', 'name': 'UBX-MON-PATCH'}, + 0x28: {'str': 'GNSS', 'name': 'UBX-MON-GNSS'}, + 0x2e: {'str': 'SMGR', 'name': 'UBX-MON-SMGR'}, + } + + def nav_clock(self, buf): + "UBX-NAV-CLOCK decode, Clock Solution" + m_len = len(buf) + if 0 == m_len: + return " Poll request" + + if 20 > m_len: + return " Bad Length %s" % m_len + + u = struct.unpack_from('<LllLL', buf, 0) + return (' iTOW:%d ms, clkB:%d ns clkD:%d ns/s tAcc:%d ns,' + 'fAcc:%d ns/s' % u) + + def nav_dgps(self, buf): + "UBX-NAV-DGPS decode, DGPS Data used for NAV" + m_len = len(buf) + if 0 == m_len: + return " Poll request" + + if 16 > m_len: + return " Bad Length %s" % m_len + + u = struct.unpack_from('<LlhhBBBB', buf, 0) + s = (' iTOW:%d ms, age:%d ms, baseID:%d basehealth:%d numCh:%d\n' + ' status:%#x reserved1[%u %u]' % u) + + m_len -= 16 + i = 0 + while 0 < m_len: + u = struct.unpack_from('<BbHff', buf, 16 + i * 12) + # dunno how to do R4 + s += ('\n svid %3u flags %#4x ageC:%d ms prc:%f prcc:%f' % u) + m_len -= 12 + i += 1 + + return s + + def nav_dop(self, buf): + "UBX-NAV-DOP decode, Dilution of Precision" + m_len = len(buf) + if 0 == m_len: + return " Poll request" + + if 18 > m_len: + return " Bad Length %s" % m_len + + u = struct.unpack_from('<Lhhhhhhh', buf, 0) + s = (' iTOW:%d ms, gDOP:%.2f pDOP:%.2f tDOP:%.2f vDOP:%.2f\n' + ' hDOP:%.2f nDOP:%.2f eDOP:%.2f' % + (u[0], u[1] / 100.0, u[2] / 100.0, u[3] / 100.0, + u[4] / 100.0, u[5] / 100.0, u[6] / 100.0, u[7] / 100.0)) + return s + + def nav_navx5(self, buf): + "UBX-CFG-NAVX5 decode" + + # length == 20 case seems broken? + m_len = len(buf) + if 0 == m_len: + return " Poll request" + + if 20 > m_len: + return " Bad Length %s" % m_len + + u = struct.unpack_from('<HHLHBBBBBHBH', buf, 0) + s = (' version %u mask1 %#x mask2 %#x minSVs %d maxSVs %d minCNO %u\n' + ' iniFix3D %u ackAiding %u wknRollover %u' % + (u[0], u[1], u[2], u[4], u[5], u[6], u[8], u[10], u[11])) + + if 40 <= m_len: + u = struct.unpack_from('<BBHHLHBB', buf, 20) + s = ('\n usePPP %d aopCfg %d aopOrbMaxErr %u useAdr %u' % + (u[0], u[1], u[3], u[7])) + return s + + def nav_posecef(self, buf): + "UBX-NAV-POSECEF decode" + m_len = len(buf) + if 0 == m_len: + return " Poll request" + + if 20 != m_len: + return " Bad Length %s" % m_len + + u = struct.unpack_from('<LlllL', buf, 0) + return ' iTOW:%d ms, ecefX:%d cm Y:%d cm Z:%d cm\n pAcc:%d cm' % u + + def nav_pvt(self, buf): + "UBX-NAV-PVT decode" + m_len = len(buf) + if 0 == m_len: + return " Poll request" + + if 92 > m_len: + return " Bad Length %s" % m_len + + u = struct.unpack_from('<LHBBBBBBLlBBBBllllLLlllllLLHbbbbbblhH', + buf, 0) + s = (' iTOW %d ms, time %d/%d/%d %2d:%2d:%2d valid %#x\n' + ' tAcc %d ns nano %d fixType %d flags %#x flags2 %#x\n' + ' numSV %d lon %.7f lat %.7f height %.3f\n' + ' hMSL %.3f hAcc %.3f' % + (u[0], u[1], u[2], u[3], u[4], u[5], u[6], u[7], + u[8], u[9], u[10], u[11], u[12], + u[13], u[14] * 10e-7, u[15] * 10e-7, u[16] / 1000, + u[17] / 1000, u[18] / 1000)) + return s + + def nav_sat(self, buf): + "UBX-NAV-SAT decode" + m_len = len(buf) + if 0 == m_len: + return " Poll request" + + if 8 > m_len: + return " Bad Length %s" % m_len + + u = struct.unpack_from('<LBB', buf, 0) + s = ' iTOW %d ms, version %d numSvs %d' % u + + m_len -= 8 + i = 0 + while 0 < m_len: + u = struct.unpack_from('<BBBbhhL', buf, 8 + i * 12) + s += ('\n gnssd %d svid %3d cno %2d elev %3d azim %3d prRes %6d' + ' flags %#x\n' % u) + if 0 < u[6]: + s += ' ' + s += 'qualityInd %u ' % (0x07 & u[6]) + if 8 & u[6]: + s += 'svUsed ' + s += 'health %u ' % (0x03 & (u[6] >> 4)) + if 0x40 & u[6]: + s += 'diffCorr ' + if 0x80 & u[6]: + s += 'smoothed ' + s += 'orbitSource %u ' % (0x07 & (u[6] >> 8)) + if 0x800 & u[6]: + s += 'ephAvail ' + if 0x1000 & u[6]: + s += 'almAvail ' + if 0x2000 & u[6]: + s += 'anoAvail ' + if 0x4000 & u[6]: + s += 'aopAvail ' + + if 0x730000 & u[6]: + s += '\n' + if 0x10000 & u[6]: + s += 'sbasCorrused ' + if 0x20000 & u[6]: + s += 'rtcmCorrused ' + if 0x1000000 & u[6]: + s += 'prCorrused ' + if 0x2000000 & u[6]: + s += 'crCorrused ' + if 0x4000000 & u[6]: + s += 'doCorrused ' + m_len -= 12 + i += 1 + + return s + + def nav_sbas(self, buf): + "UBX-NAV-SBAS decode" + m_len = len(buf) + if 0 == m_len: + return " Poll request" + + if 12 > m_len: + return " Bad Length %s" % m_len + + u = struct.unpack_from('<LBBbBb', buf, 0) + s = (' iTOW:%d ms, geo:%u mode:%#x, sys:%#x service:%#x cnt:%d' % u) + + m_len -= 12 + i = 0 + while 0 < m_len: + u = struct.unpack_from('<BBBBBBhhh', buf, 12 + (i * 12)) + s += ('\n svid %3d flags %#4x udre:%#2x svSys:%3d syService:%2d' + ' prc:%3d ic:%3d' % + (u[0], u[1], u[2], u[3], u[4], u[6], u[8])) + if 0x0f & u[4]: + s += '\n svService: ' + if 1 & u[4]: + s += 'Ranging ' + if 2 & u[4]: + s += 'Corrections ' + if 4 & u[4]: + s += 'Integrity ' + if 8 & u[4]: + s += 'Testmode' + m_len -= 12 + i += 1 + + return s + + fix_types = ('None', 'Dead Reckoning', '2D', '3D', 'GPS+DR', 'Surveyed') + + def nav_sol(self, buf): + "UBX-NAV-SOL decode deprecated by u-blox" + m_len = len(buf) + if 0 == m_len: + return " Poll request" + + if 52 > m_len: + return " Bad Length %s" % m_len + + u = struct.unpack_from('<LlhBBlllLlllLHBBBBB', buf, 0) + s = (' iTOW:%u ms, fTOW %u ns, week:%d gpsFix:%d flags:%#x\n' + ' ECEF X:%.3f Y:%.3f Z:%.3f pAcc:%.3f\n' + ' VECEF X:%.3f Y:%.3f Z:%.3f vAcc:%.3f\n' + ' pDOP:%.2f numSV:%d' % + (u[0], u[1], u[2], u[3], u[4], + u[5] / 100.0, u[6] / 100.0, u[7] / 100.0, u[8] / 100.0, + u[9] / 100.0, u[10] / 100.0, u[11] / 100.0, u[12] / 100.0, + u[13] / 100.0, u[15])) + if u[3] < len(self.fix_types): + s += '\n gpsFix: ' + self.fix_types[u[3]] + if 0x0f & u[4]: + s += '\n flags: ' + if 1 & u[4]: + s += 'GPSfixOK ' + if 2 & u[4]: + s += 'DiffSoln ' + if 4 & u[4]: + s += 'WKNSET ' + if 8 & u[4]: + s += 'TOWSET' + return s + + def nav_status(self, buf): + "UBX-NAV-STATUS decode" + m_len = len(buf) + if 0 == m_len: + return " Poll request" + + if 16 > m_len: + return " Bad Length %s" % m_len + + u = struct.unpack_from('<LBBBBLL', buf, 0) + return (' iTOW:%d ms, fix:%d flags:%#x fixstat:%#x flags2:%#x\n' + ' ttff:%d, msss:%d' % u) + + def nav_svin(self, buf): + "UBX-NAV-SVIN decode, Survey-in data" + m_len = len(buf) + if 0 == m_len: + return " Poll request" + + if 40 > m_len: + return " Bad Length %s" % m_len + + u = struct.unpack_from('<BBBBLLlllbbbBLLBB', buf, 0) + return (' version %u reserved1[%u %u %u] iTOW %u dur %u\n' + ' meanX %d meanY %d meanZ %d\n' + ' meanXHP %d meanYHP %d meanZHP %d reserved2 %u meanAcc %u\n' + ' obs %u valid %u active %u\n' % u) + + def nav_svinfo(self, buf): + "UBX-NAV-SVINFO decode" + m_len = len(buf) + if 0 == m_len: + return "Poll request" + + if 8 > m_len: + return "Bad Length %s" % m_len + + u = struct.unpack_from('<Lbb', buf, 0) + s = ' iTOW:%d ms, numCh:%d globalFlags:%d' % u + + m_len -= 8 + i = 0 + while 0 < m_len: + u = struct.unpack_from('<BBBBBbhl', buf, 8 + i * 12) + s += ('\n chn %3d svid %3d flags %#0.2x quality %#x cno %2d' + ' elev %3d azim %3d prRes %6d' % u) + if 0 < u[2]: + s += '\n ' + if 1 & u[2]: + s += 'svUsed ' + if 2 & u[2]: + s += 'diffCorr ' + if 4 & u[2]: + s += 'orbitAvail ' + if 8 & u[2]: + s += 'orbitEph ' + if 0x10 & u[2]: + s += 'unhealthy ' + if 0x20 & u[2]: + s += 'orbitAlm ' + if 0x40 & u[2]: + s += 'orbitAop ' + if 0x80 & u[2]: + s += 'smoothed ' + m_len -= 12 + i += 1 + + return s + + def nav_timebds(self, buf): + "UBX-NAV-TIMEBDS decode" + m_len = len(buf) + if 0 == m_len: + return " Poll request" + + if 20 > m_len: + return ".Bad Length %s" % m_len + + u = struct.unpack_from('<LLlhbBL', buf, 0) + return (' iTOW:%d ms, SOW:%d fSOW:%d week %d leapS:%d Valid:%#x\n' + 'tAcc:%d ns' % u) + + def nav_timegal(self, buf): + "UBX-NAV-TIMEGAL decode" + m_len = len(buf) + if 0 == m_len: + return " Poll request" + + if 20 > m_len: + return " Bad Length %s" % m_len + + u = struct.unpack_from('<LLlhbBL', buf, 0) + return (' iTOW:%d ms, galTOW:%.9f galWno:%d leapS:%d Valid:%#x\n' + ' tAcc:%d ns' % + (u[0], u[1] + (u[2] * 10e-9), u[3], u[4], u[5], u[6])) + + def nav_timegps(self, buf): + "UBX-NAV-TIMEGPS decode" + m_len = len(buf) + if 0 == m_len: + return " Poll request" + + if 16 > m_len: + return " Bad Length %s" % m_len + + u = struct.unpack_from('<LlhbBL', buf, 0) + s = (' iTOW:%u ms, fTOW:%u ns, week:%d leapS:%d valid:%#x tAcc:%d ns' % + u) + if 0x07 & u[4]: + s += '\n valid: ' + if 1 & u[4]: + s += 'towValid ' + if 2 & u[4]: + s += 'weekValid ' + if 4 & u[4]: + s += 'leapValid ' + return s + + def nav_timeutc(self, buf): + "UBX-NAV-TIMEUTC decode" + m_len = len(buf) + if 0 == m_len: + return " Poll request" + + if 20 > m_len: + return " Bad Length %s" % m_len + + u = struct.unpack_from('<LLlHbbbbbb', buf, 0) + return (' iTOW:%d ms, tAcc:%d ns nano:%d ns Time: %d/%d/%d %d:%d:%d\n' + ' valid:%#x' % u) + + def nav_velecef(self, buf): + "UBX-NAV-VELECEF decode" + m_len = len(buf) + if 0 == m_len: + return " Poll request" + + if 20 != m_len: + return " Bad Length %s" % m_len + + u = struct.unpack_from('<LlllL', buf, 0) + return (' iTOW:%d ms,' + ' ecefVX:%.2f m/s VY:%.2f m/s VZ:%.2f m/s vAcc:%.2f m/s' % + (u[0], u[1] / 100.0, u[2] / 100.0, u[3] / 100.0, + u[4] / 100.0)) + + def nav_velned(self, buf): + "UBX-NAV-VELNED decode" + m_len = len(buf) + if 0 == m_len: + return " Poll request" + + if 36 > m_len: + return " Bad Length %s" % m_len + + u = struct.unpack_from('<LlllLLlLL', buf, 0) + return (' iTOW:%d ms,' + ' velN:%d cm/s velE:%d cm/s velD:%d cm/s speed:%d cm/s\n' + ' gspeed:%d cm/s heading:%f cm/s sAcc:%d cm/s cAcc:%d deg' % + (u[0], u[1], u[2], u[3], u[4], u[5], u[6] * 1e-5, + u[7], u[8] * 1e-5)) + + nav_ids = {1: {'str': 'POSECEF', 'dec': nav_posecef, + 'name': 'UBX-NAV-POSECEF'}, + 2: {'str': 'POSLLH', 'name': 'UBX-NAV-POSLLH'}, + 3: {'str': 'STATUS', 'dec': nav_status, + 'name': 'UBX-NAV-STATUS'}, + 0x4: {'str': 'DOP', 'dec': nav_dop, 'name': 'UBX-NAV-DOP'}, + 0x5: {'str': 'ATT', 'name': 'UBX-NAV-ATT'}, + 0x6: {'str': 'SOL', 'dec': nav_sol, 'name': 'UBX-NAV-SOL'}, + 0x7: {'str': 'PVT', 'dec': nav_pvt, 'name': 'UBX-NAV-PVT'}, + 0x9: {'str': 'ODO', 'name': 'UBX-NAV-ODO'}, + 0x10: {'str': 'RESETODO', 'name': 'UBX-NAV-RESETODO'}, + 0x11: {'str': 'VELECEF', 'dec': nav_velecef, + 'name': 'UBX-NAV-VELECEF'}, + 0x12: {'str': 'VELNED', 'dec': nav_velned, + 'name': 'UBX-NAV-VELNED'}, + 0x13: {'str': 'HPPOSECEF', 'name': 'UBX-NAV-HPPOSECEF'}, + 0x14: {'str': 'HPPOSLLH', 'name': 'UBX-NAV-HPPOSLLH'}, + 0x20: {'str': 'TIMEGPS', 'dec': nav_timegps, + 'name': 'UBX-NAV-TIMEGPS'}, + 0x21: {'str': 'TIMEUTC', 'dec': nav_timeutc, + 'name': 'UBX-NAV-TIMEUTC'}, + 0x22: {'str': 'CLOCK', 'dec': nav_clock, + 'name': 'UBX-NAV-CLOCK'}, + 0x23: {'str': 'NAVX5', 'dec': nav_navx5, + 'name': 'UBX-NAV-NAVX5'}, + 0x24: {'str': 'TIMEBDS', 'dec': nav_timebds, + 'name': 'UBX-NAV-TIMEBDS'}, + 0x25: {'str': 'TIMEGAL', 'dec': nav_timegal, + 'name': 'UBX-NAV-TIMEGAL'}, + 0x26: {'str': 'TIMEGLO', 'name': 'UBX-NAV-TIMEGLO'}, + 0x30: {'str': 'SVINFO', 'dec': nav_svinfo, + 'name': 'UBX-NAV-SVINFO'}, + 0x31: {'str': 'DGPS', 'dec': nav_dgps, 'name': 'UBX-NAV-DGPS'}, + 0x32: {'str': 'SBAS', 'dec': nav_sbas, 'name': 'UBX-NAV-SBAS'}, + 0x34: {'str': 'ORB', 'name': 'UBX-NAV-ORB'}, + 0x35: {'str': 'SAT', 'dec': nav_sat, 'name': 'UBX-NAV-SAT'}, + 0x39: {'str': 'GEOFENCE', 'name': 'UBX-NAV-GEOFENCE'}, + 0x3B: {'str': 'SVIN', 'dec': nav_svin, 'name': 'UBX-NAV-SVIN'}, + 0x3C: {'str': 'RELPOSNED', 'name': 'UBX-NAV-RELPOSNED'}, + 0x60: {'str': 'AOPSTATUS', 'name': 'UBX-NAV-AOPSTATUS'}, + 0x61: {'str': 'EOF', 'name': 'UBX-NAV-EOF'}, + } + + def rxm_raw(self, buf): + "UBX-RXM-RAW decode" + m_len = len(buf) + if 0 == m_len: + return " Poll request" + + if 8 > m_len: + return " Bad Length %s" % m_len + + u = struct.unpack_from('<LlhhBBBB', buf, 0) + s = ' iTOW:%d ms weeks:%d numSV:%d' % u + + m_len -= 8 + i = 0 + while 0 < m_len: + u = struct.unpack_from('<ddfBbbB', buf, 8 + i * 24) + s += ('\n cpMes:%f prMes:%f doMes:%f sv:%d mesQI:%d\n' + ' eno:%d lli:%d' % u) + m_len -= 24 + i += 1 + + return s + + def rxm_rawx(self, buf): + "UBX-RXM-RAWX decode" + m_len = len(buf) + if 16 > m_len: + return " Bad Length %s" % m_len + + u = struct.unpack_from('<dHbBBB', buf, 0) + s = (' rcvTow %.3f week %u leapS %d numMeas %u recStat %#x' + ' version %u' % u) + + m_len -= 16 + i = 0 + while 0 < m_len: + u = struct.unpack_from('<ddfBBBBHBBBBB', buf, 16 + i * 32) + s += ('\n prmes %.3f cpMes %.3f doMes %f\n' + ' gnssID %u svId %u %u freqId %u locktime %u cno %u\n' + ' prStdev %u, cpStdev %u doStdev %u trkStat %u' % u) + m_len -= 32 + i += 1 + return s + + def rxm_sfrb(self, buf): + "UBX-RXM-SFRB decode, Subframe Buffer" + m_len = len(buf) + if 0 == m_len: + return " Poll request" + + if 42 > m_len: + return " Bad Length %s" % m_len + + u = struct.unpack_from('<BBLLLLLLLLLL', buf, 0) + s = (' chn:%d s svid %3d\n' + ' dwrd:%08x %08x %08x %08x %08x\n' + ' %08x %08x %08x %08x %08x' % u) + + return s + + def rxm_sfrbx(self, buf): + "UBX-RXM-SFRBX decode, Broadcast Navigation Data Subframe" + m_len = len(buf) + + if 8 > m_len: + return " Bad Length %s" % m_len + + u = struct.unpack_from('<BBBBBBBB', buf, 0) + s = (' gnssId:%u svId %3u reserved1 %u freqId %u numWords %u\n' + ' reserved2 %u version %u reserved3 %u\n' % u) + s += ' dwrd' + for i in range(8, m_len - 1, 4): + u = struct.unpack_from('<L', buf, i) + s += " %08x" % u + + return s + + def rxm_svsi(self, buf): + "UBX-RXM-SVSI decode, SV Status Info" + m_len = len(buf) + if 0 == m_len: + return " Poll request" + + if 8 > m_len: + return " Bad Length %s" % m_len + + u = struct.unpack_from('<LhBB', buf, 0) + s = ' iTOW:%d ns week:%d numVis:%d numSV:%d' % u + + m_len -= 8 + i = 0 + while 0 < m_len: + u = struct.unpack_from('<BBhbB', buf, 8 + i * 6) + s += '\n svid:%3d svFlag:%#x azim:%3d elev:% 3d age:%3d' % u + m_len -= 6 + i += 1 + + return s + + rxm_ids = {0x10: {'str': 'RAW', 'dec': rxm_raw, + 'name': 'UBX-RXM-RAW'}, # obsolete + 0x11: {'str': 'SFRB', 'dec': rxm_sfrb, + 'name': 'UBX-RXM-SFRB'}, + 0x13: {'str': 'SFRBX', 'dec': rxm_sfrbx, + 'name': 'UBX-RXM-SFRBX'}, + 0x14: {'str': 'MEASX', 'name': 'UBX-RXM-MEASX'}, + 0x15: {'str': 'RAWX', 'dec': rxm_rawx, 'name': 'UBX-RXM-RAWX'}, + 0x20: {'str': 'SVSI', 'dec': rxm_svsi, 'name': 'UBX-RXM-SVSI'}, + 0x32: {'str': 'RTCM', 'name': 'UBX-RXM-RTCM'}, + 0x41: {'str': 'PMREQ', 'name': 'UBX-RXM-PMREQ'}, + 0x59: {'str': 'RLM', 'name': 'UBX-RXM-RLM'}, + 0x61: {'str': 'IMES', 'name': 'UBX-RXM-IMES'}, + } + + def tim_svin(self, buf): + "UBX-TIM-SVIN decode, Survey-in data" + m_len = len(buf) + if 0 == m_len: + return " Poll request" + + if 28 > m_len: + return " Bad Length %s" % m_len + + u = struct.unpack_from('<LlllLLBB', buf, 0) + s = (' dur %u meanX %d meanY %d meanZ %d\n' + ' meanV %u obs %u valid %u active %u' % u) + return s + + def tim_tp(self, buf): + "UBX-TIM-TP decode" + m_len = len(buf) + if 0 == m_len: + return " Poll request" + + if 16 > m_len: + return " Bad Length %s" % m_len + + u = struct.unpack_from('<LLlHbb', buf, 0) + s = (' towMS:%u ms, towSubMS:%u qErr:%d ps, week:%d\n' + ' flags:%#x refInfo:%#x\n flags: ' % u) + + if 0x01 & u[4]: + s += "timeBase is UTC, " + else: + s += "timeBase is GNSS, " + if 0x02 & u[4]: + s += "UTC available, " + else: + s += "UTC not available, " + + raim = (u[4] >> 2) & 0x03 + if 0 == raim: + s += "RAIM not available" + elif 1 == raim: + s += "RAIM not active" + elif 2 == raim: + s += "RAIM active" + else: + s += "RAIM ??" + return s + + tim_ids = {1: {'str': 'TP', 'dec': tim_tp, 'name': 'UBX-TIM-TP'}, + 3: {'str': 'TM2', 'name': 'UBX-TIM-TM2'}, + 4: {'str': 'SVIN', 'dec': tim_svin, 'name': 'UBX-TIM-SVIN'}, + 6: {'str': 'VRFY', 'name': 'UBX-TIM-VRFY'}, + 0x11: {'str': 'DOSC', 'name': 'UBX-TIM-DOSC'}, + 0x12: {'str': 'TOS', 'name': 'UBX-TIM-TOS'}, + 0x13: {'str': 'SMEAS', 'name': 'UBX-TIM-SMEAS'}, + 0x15: {'str': 'VCOCAL', 'name': 'UBX-TIM-VCOCAL'}, + 0x16: {'str': 'FCHG', 'name': 'UBX-TIM-FCHG'}, + 0x17: {'str': 'HOC', 'name': 'UBX-TIM-HOC'}, + } + + classes = { + 0x01: {'str': 'NAV', 'ids': nav_ids}, + 0x02: {'str': 'RXM', 'ids': rxm_ids}, + 0x04: {'str': 'INF', 'ids': inf_ids}, + 0x05: {'str': 'ACK', 'ids': ack_ids}, + 0x06: {'str': 'CFG', 'ids': cfg_ids}, + 0x09: {'str': 'UPD'}, + 0x0A: {'str': 'MON', 'ids': mon_ids}, + 0x0B: {'str': 'ATD'}, + 0x0D: {'str': 'TIM', 'ids': tim_ids}, + 0x10: {'str': 'ESF'}, + 0x13: {'str': 'MGA'}, + 0x21: {'str': 'LOG'} + } + + def class_id_s(self, m_class, m_id): + "Return class and ID numbers as a string." + s = 'Class: ' + if m_class not in self.classes: + s += '%#x ID: %#x' % (m_class, m_id) + return s + + if 'str' in self.classes[m_class]: + s_class = self.classes[m_class]['str'] + s += '%s(%#x) ' % (s_class, m_class) + else: + s += '%#x ' % (m_class) + + if (('ids' in self.classes[m_class] and + m_id in self.classes[m_class]['ids'] and + 'str' in self.classes[m_class]['ids'][m_id])): + s_id = self.classes[m_class]['ids'][m_id]['str'] + s += 'ID: %s(%#x)' % (s_id, m_id) + else: + s += 'ID: %#x' % (m_id) + + return s + + def decode_msg(self, out): + "Decode one message and then return number of chars consumed" + + state = 'BASE' + consumed = 0 + + # decode state machine + for this_byte in out: + consumed += 1 + if isinstance(this_byte, str): + # a character, probably read from a file + c = ord(this_byte) + else: + # a byte, probably read from a serial port + c = int(this_byte) + + if VERB_RAW <= opts['verbosity']: + if (ord(' ') <= c) and (ord('~') >= c): + # c is printable + print("state: %s char %c (%#x)" % (state, chr(c), c)) + else: + # c is not printable + print("state: %s char %#x" % (state, c)) + + if 'BASE' == state: + # start fresh + # place to store 'comments' + comment = '' + m_class = 0 + m_id = 0 + m_len = 0 + m_raw = bytearray(0) # class, id, len, payload + m_payload = bytearray(0) # just the payload + m_ck_a = 0 + m_ck_b = 0 + + if 0xb5 == c: + # got header 1, mu + state = 'HEADER1' + + if ord('$') == c: + # got $, so NMEA? + state = 'NMEA' + comment = '$' + + if ord("{") == c: + # JSON, treat as comment line + state = 'JSON' + + # start fresh + comment = "{" + continue + + if ord("#") == c: + # comment line + state = 'COMMENT' + + # start fresh + comment = "#" + continue + + if (ord('\n') == c) or (ord('\r') == c): + # CR or LF, leftovers + return 1 + continue + + if state in ('COMMENT', 'JSON'): + # inside comment + if ord('\n') == c or ord('\r') == c: + # Got newline or linefeed + # terminate messages on <CR> or <LF> + # Done, got a full message + if gps.polystr('{"class":"ERROR"') in comment: + # always print gpsd errors + print(comment) + elif VERB_DECODE <= opts['verbosity']: + print(comment) + return consumed + else: + comment += chr(c) + continue + + if 'NMEA' == state: + # getting NMEA payload + if (ord('\n') == c) or (ord('\r') == c): + # CR or LF, done, got a full message + # terminates messages on <CR> or <LF> + if VERB_DECODE <= opts['verbosity']: + print(comment + '\n') + return consumed + else: + comment += chr(c) + continue + + if ord('b') == c and 'HEADER1' == state: + # got header 2 + state = 'HEADER2' + continue + + if 'HEADER2' == state: + # got class + state = 'CLASS' + m_class = c + m_raw.extend([c]) + continue + + if 'CLASS' == state: + # got ID + state = 'ID' + m_id = c + m_raw.extend([c]) + continue + + if 'ID' == state: + # got first length + state = 'LEN1' + m_len = c + m_raw.extend([c]) + continue + + if 'LEN1' == state: + # got second length + m_raw.extend([c]) + m_len += 256 * c + if 0 == m_len: + # no payload + state = 'CSUM1' + else: + state = 'PAYLOAD' + continue + + if 'PAYLOAD' == state: + # getting payload + m_raw.extend([c]) + m_payload.extend([c]) + if len(m_payload) == m_len: + state = 'CSUM1' + continue + + if 'CSUM1' == state: + # got ck_a + state = 'CSUM2' + m_ck_a = c + continue + + if 'CSUM2' == state: + # ck_b + state = 'BASE' + m_ck_b = c + # check checksum + chk = self.checksum(m_raw, len(m_raw)) + if (chk[0] != m_ck_a) or (chk[1] != m_ck_b): + sys.stderr.write("%s: ERROR checksum failed," + "was (%d,%d) s/b (%d, %d)\n" % + (PROG_NAME, m_ck_a, m_ck_b, + chk[0], chk[1])) + + s_payload = ''.join('{:02x} '.format(x) for x in m_payload) + x_payload = binascii.hexlify(m_payload) + + if m_class in self.classes: + this_class = self.classes[m_class] + if 'ids' in this_class: + if m_id in this_class['ids']: + if 'dec' in this_class['ids'][m_id]: + dec = this_class['ids'][m_id]['dec'] + s_payload = this_class['ids'][m_id]['name'] + s_payload += ':\n' + s_payload += dec(self, m_payload) + else: + s_payload = x_payload + + if VERB_INFO < opts['verbosity']: + print("%s, len: %#x" % + (self.class_id_s(m_class, m_id), m_len)) + if VERB_RAW < opts['verbosity']: + print("payload: %s" % x_payload) + print("%s\n" % s_payload) + return consumed + + # give up + state = 'BASE' + + # fell out of loop, no more chars to look at + return 0 + + def checksum(self, msg, m_len): + "Calculate u-blox message checksum" + # the checksum is calculated over the Message, starting and including + # the CLASS field, up until, but excluding, the Checksum Field: + + ck_a = 0 + ck_b = 0 + for c in msg[0:m_len]: + ck_a += c + ck_b += ck_a + + return [ck_a & 0xff, ck_b & 0xff] + + def make_pkt(self, m_class, m_id, m_data): + "Make a message packet" + # always little endian, leader, class, id, length + m_len = len(m_data) + + # build core message + msg = bytearray(m_len + 6) + struct.pack_into('<BBH', msg, 0, m_class, m_id, m_len) + + # copy payload into message buffer + i = 0 + while i < m_len: + msg[i + 4] = m_data[i] + i += 1 + + # add checksum + chk = self.checksum(msg, m_len + 4) + m_chk = bytearray(2) + struct.pack_into('<BB', m_chk, 0, chk[0], chk[1]) + + header = b"\xb5\x62" + return header + msg[:m_len+4] + m_chk + + def gps_send(self, m_class, m_id, m_data): + "Build, and send, a message to GPS" + m_all = self.make_pkt(m_class, m_id, m_data) + self.gps_send_raw(m_all) + + def gps_send_raw(self, m_all): + "Send a raw message to GPS" + if not opts['read_only']: + io_handle.ser.write(m_all) + if VERB_QUIET < opts['verbosity']: + sys.stdout.write("sent:\n") + if VERB_INFO < opts['verbosity']: + sys.stdout.write(binascii.hexlify(m_all)) + sys.stdout.write("\n") + self.decode_msg(m_all) + sys.stdout.flush() + + def send_able_beidou(self, able): + "dis/enable BeiDou" + # Two frequency GPS use BeiDou or GLONASS + # disable, then enable + gps_model.send_cfg_gnss1(3, able) + + def send_able_binary(self, able): + "dis/enable basic binary messages" + + rate = 1 if able else 0 + + # UBX-NAV-DOP + m_data = bytearray([0x01, 0x04, rate]) + gps_model.gps_send(6, 1, m_data) + + # UBX-NAV-SOL is ECEF. deprecated, use UBX-NAV-PVT instead? + m_data = bytearray([0x01, 0x06, rate]) + gps_model.gps_send(6, 1, m_data) + + # UBX-NAV-TIMEGPS + m_data = bytearray([0x01, 0x20, rate]) + gps_model.gps_send(6, 1, m_data) + + # UBX-NAV-SBAS + m_data = bytearray([0x01, 0x32, 10]) + gps_model.gps_send(6, 1, m_data) + + # UBX-NAV-SVINFO + m_data = bytearray([0x01, 0x30, 10]) + gps_model.gps_send(6, 1, m_data) + + def send_able_ecef(self, able): + "Enable ECEF messages" + # set NAV-POSECEF rate + gps_model.send_cfg_msg(1, 1, able) + # set NAV-VELECEF rate + gps_model.send_cfg_msg(1, 0x11, able) + + def send_able_gps(self, able): + "dis/enable GPS/QZSS" + # GPS and QZSS both on, or both off, together + # GPS + gps_model.send_cfg_gnss1(0, able) + # QZSS + gps_model.send_cfg_gnss1(5, able) + + def send_able_galileo(self, able): + "dis/enable GALILEO" + gps_model.send_cfg_gnss1(2, able) + + def send_able_glonass(self, able): + "dis/enable GLONASS" + # Two frequency GPS use BeiDou or GLONASS + # disable, then enable + gps_model.send_cfg_gnss1(6, able) + + def send_able_nmea(self, able): + "dis/enable basic NMEA messages" + + rate = 1 if able else 0 + + # xxGBS + m_data = bytearray([0xf0, 0x09, rate]) + gps_model.gps_send(6, 1, m_data) + + # xxGGA + m_data = bytearray([0xf0, 0x00, rate]) + gps_model.gps_send(6, 1, m_data) + + # xxGGL + m_data = bytearray([0xf0, 0x01, rate]) + gps_model.gps_send(6, 1, m_data) + + # xxGSA + m_data = bytearray([0xf0, 0x02, rate]) + gps_model.gps_send(6, 1, m_data) + + # xxGST + m_data = bytearray([0xf0, 0x07, rate]) + gps_model.gps_send(6, 1, m_data) + + # xxGSV + m_data = bytearray([0xf0, 0x03, rate]) + gps_model.gps_send(6, 1, m_data) + + # xxRMC + m_data = bytearray([0xf0, 0x04, rate]) + gps_model.gps_send(6, 1, m_data) + + # xxVTG + m_data = bytearray([0xf0, 0x05, rate]) + gps_model.gps_send(6, 1, m_data) + + # xxZDA + m_data = bytearray([0xf0, 0x08, rate]) + gps_model.gps_send(6, 1, m_data) + + def send_able_rawx(self, able): + "dis/enable UBX-RXM-RAWX" + + rate = 1 if able else 0 + m_data = bytearray([0x2, 0x15, rate]) + gps_model.gps_send(6, 1, m_data) + + def send_able_sbas(self, able): + "dis/enable SBAS" + gps_model.send_cfg_gnss1(1, able) + + def send_able_tmode2(self, able): + "UBX-CFG-TMODE2, set time mode 2 config" + + m_data = bytearray(28) + if able: + # enable survey-in + m_data[0] = 1 + + # Survey-in minimum duration seconds 86400 + seconds = 86400 + m_data[20] = seconds & 0x0ff + seconds >>= 8 + m_data[21] = seconds & 0x0ff + seconds >>= 8 + m_data[22] = seconds & 0x0ff + seconds >>= 8 + m_data[23] = seconds & 0x0ff + + # Survey-in position accuracy limit 500 mm + mmeters = 500 + m_data[24] = mmeters & 0x0ff + mmeters >>= 8 + m_data[25] = mmeters & 0x0ff + mmeters >>= 8 + m_data[26] = seconds & 0x0ff + seconds >>= 8 + m_data[27] = mmeters & 0x0ff + gps_model.gps_send(6, 0x3d, m_data) + + def send_able_tp(self, able): + "dis/enable UBX-TIM-TP Time Pulse" + rate = 1 if able else 0 + m_data = bytearray([0xd, 0x1, rate]) + gps_model.gps_send(6, 1, m_data) + + def send_cfg_ant(self): + "UBX-CFG-ANT, Get Antenna Control Settings" + + m_data = bytearray(0) + gps_model.gps_send(6, 0x13, m_data) + + def send_cfg_cfg(self, save_clear): + "UBX-CFG-CFG, save config" + + # Save: save_clear = 0 + # Clear: save_clear = 1 + + # basic configs alays available to change: + # ioPort, msgConf, infMsg, navConf, rxmConf, senConf + cfg1 = 0x9f + # rinvConf, antConf, logConf + cfg2 = 0x03 + + m_data = bytearray(13) + + # clear + if 0 == save_clear: + # saving + m_data[0] = 0 + m_data[1] = 0 + else: + # clearing + m_data[0] = cfg1 + m_data[1] = cfg2 + m_data[2] = 0 # + m_data[3] = 0 # + + # save + if 0 == type: + # saving + m_data[4] = cfg1 + m_data[5] = cfg2 + else: + # clearing + m_data[4] = 0 + m_data[5] = 0 + m_data[6] = 0 # + m_data[7] = 0 # + + # load + if False and 0 == type: + # saving + m_data[8] = 0 + m_data[9] = 0 + else: + # clearing, load it to save a reboot + m_data[8] = cfg1 + m_data[9] = cfg2 + m_data[10] = 0 # + m_data[11] = 0 # + + # deviceMask, where to save it, try all options + m_data[12] = 0x17 # devBBR, devFLASH devEEPROM, devSpiFlash + + gps_model.gps_send(6, 0x9, m_data) + + def send_cfg_gnss1(self, gnssId, enable): + "UBX-CFG-GNSS, set GNSS config" + m_data = bytearray(12) + m_data[0] = 0 # version 0, msgVer + m_data[1] = 0 # read only, numTrkChHw + m_data[2] = 0xFF # read only, numTrkChUse + m_data[3] = 1 # 1 block follows + # block 1 + m_data[4] = gnssId # gnssId + if 0 == gnssId: + # GPS + m_data[5] = 8 # resTrkCh + m_data[6] = 16 # maxTrkCh + if 1 == gnssId: + # SBAS + m_data[5] = 1 # resTrkCh + m_data[6] = 3 # maxTrkCh + if 2 == gnssId: + # GALILEO + m_data[5] = 4 # resTrkCh + m_data[6] = 8 # maxTrkCh + if 3 == gnssId: + # BeiDou + m_data[5] = 2 # resTrkCh + m_data[6] = 16 # maxTrkCh + if 4 == gnssId: + # IMES + m_data[5] = 0 # resTrkCh + m_data[6] = 8 # maxTrkCh + if 5 == gnssId: + # QZSS + m_data[5] = 0 # resTrkCh + m_data[6] = 3 # maxTrkCh + if 6 == gnssId: + # GLONASS + m_data[5] = 8 # resTrkCh + m_data[6] = 14 # maxTrkCh + m_data[7] = 0 # reserved1 + m_data[8] = enable # flags + m_data[9] = 0 # flagflags, unused + if 5 == gnssId: + # QZSS + m_data[10] = 5 # flags E1OS, L1SAIF + else: + m_data[10] = 1 # flags E1OS + m_data[11] = 1 # flags, unused + gps_model.gps_send(6, 0x3e, m_data) + + def send_cfg_gnss(self): + "UBX-CFG-GNSS, set Galileo config decode" + m_data = bytearray(0) + gps_model.gps_send(6, 0x3e, m_data) + + def send_cfg_nav5_model(self): + "UBX-CFG-NAV5, set dynamic platform model" + + m_data = bytearray(36) + m_data[0] = 1 # just setting dynamic model + m_data[1] = 0 # just setting dynamic model + m_data[2] = opts["mode"] + + gps_model.gps_send(6, 0x24, m_data) + + def send_cfg_msg(self, m_class, m_id, rate=None): + "UBX-CFG-MSG, poll, or set, message rates decode" + m_data = bytearray(2) + m_data[0] = m_class + m_data[1] = m_id + if rate is not None: + m_data.extend([rate]) + gps_model.gps_send(6, 1, m_data) + + def send_cfg_nav5(self): + "UBX-CFG-NAV5, get Nav Engine Settings" + m_data = bytearray(0) + gps_model.gps_send(6, 0x24, m_data) + + def send_cfg_pms(self): + "UBX-CFG-PMS, Get Power Management Settings" + + if opts["mode"] is not None: + m_data = bytearray(8) + # set powerSetupValue to mode + m_data[1] = opts["mode"] + # leave period and onTime zero, which breaks powerSetupValue = 3 + else: + m_data = bytearray(0) + + gps_model.gps_send(6, 0x86, m_data) + + def send_cfg_prt(self): + "UBX-CFG-PRT, get I/O Port" + m_data = bytearray(0) + gps_model.gps_send(6, 0x0, m_data) + + def send_set_speed(self, speed): + "UBX-CFG-PRT, set port" + + # FIXME! Poll current masks, then adjust speed + m_data = bytearray(20) + m_data[0] = 1 # port 1, UART 1 + # m_data[0] = 3 # port 3, USB + m_data[4] = 0xc0 # 8N1 + m_data[5] = 0x8 # 8N1 + + m_data[8] = speed & 0xff + m_data[9] = (speed >> 8) & 0xff + m_data[10] = (speed >> 16) & 0xff + m_data[11] = (speed >> 24) & 0xff + + m_data[12] = 3 # in, ubx and nmea + m_data[14] = 3 # out, ubx and nmea + gps_model.gps_send(6, 0, m_data) + + def send_cfg_rst(self, reset_type): + "UBX-CFG-RST, reset" + # always do a hardware reset + # if on USB, this will disconnect and reconnect, giving you + # a new tty. + m_data = bytearray(4) + m_data[0] = reset_type & 0xff + m_data[1] = (reset_type >> 8) & 0xff + gps_model.gps_send(6, 0x4, m_data) + + def send_cfg_tmode2(self): + "UBX-CFG-TMODE2, get time mode 2 configuration" + m_data = bytearray(0) + gps_model.gps_send(6, 0x3d, m_data) + + def send_cfg_tp5(self): + "UBX-CFG-TP5, get time0 decodes, timepulse 0 and 1" + m_data = bytearray(0) + gps_model.gps_send(6, 0x31, m_data) + # and timepulse 1 + m_data = bytearray(1) + m_data[0] = 1 + gps_model.gps_send(6, 0x31, m_data) + + def send_cfg_usb(self): + "UBX-CFG-USB, get USB configuration" + m_data = bytearray(0) + gps_model.gps_send(6, 0x1b, m_data) + + def send_mon_ver(self): + "UBX-MON-VER get versions" + m_data = bytearray(0) + gps_model.gps_send(0x0a, 0x04, m_data) + + def send_nav_posecef(self): + "UBX-NAV-POSECEF, poll ECEF position" + m_data = bytearray(0) + gps_model.gps_send(1, 1, m_data) + + def send_nav_velecef(self): + "UBX-NAV-VELECEF, poll ECEF velocity decode" + m_data = bytearray(0) + gps_model.gps_send(1, 0x11, m_data) + + def send_tim_svin(self): + "UBX-TIM-SVIN, get survey in data" + m_data = bytearray(0) + gps_model.gps_send(0x0d, 0x04, m_data) + + def send_tim_tp(self): + "UBX-TIM-TP, get time pulse timedata" + m_data = bytearray(0) + gps_model.gps_send(0x0d, 0x01, m_data) + + able_commands = { + # en/dis able BeiDou + "BEIDOU": {"command": send_able_beidou, + "help": "BeiDou"}, + # en/dis able basic binary messages + "BINARY": {"command": send_able_binary, + "help": "basic binary messages"}, + # en/dis able ECEF + "ECEF": {"command": send_able_ecef, + "help": "ECEF"}, + # en/dis able GPS + "GPS": {"command": send_able_gps, + "help": "GPS and QZSS"}, + # en/dis able GALILEO + "GALILEO": {"command": send_able_galileo, + "help": "GALILEO"}, + # en/dis able GLONASS + "GLONASS": {"command": send_able_glonass, + "help": "GLONASS"}, + # en/dis able basic NMEA messages + "NMEA": {"command": send_able_nmea, + "help": "basic NMEA messages"}, + # en/dis able RAWX + "RAWX": {"command": send_able_rawx, + "help": "RAWX measurements"}, + # en/dis able SBAS + "SBAS": {"command": send_able_sbas, + "help": "SBAS"}, + # en/dis able TP time pulse message + "TP": {"command": send_able_tp, + "help": "TP Time Pulse message"}, + # en/dis able TMODE2 Survey-in + "TMODE2": {"command": send_able_tmode2, + "help": "TMODE2"}, + } + commands = { + # UBX-CFG-ANT, poll antenna config decode + "ANT": {"command": send_cfg_ant, + "help": "UBX-CFG-ANT poll antenna config"}, + # UBX-CFG-RST cold boot + "COLDBOOT": {"command": send_cfg_rst, + "help": "UBS-CFG-RST coldboot the GPS", + "opt": 0xfff}, + # UBX-CFG-GNSS poll gnss config + "GNSS": {"command": send_cfg_gnss, + "help": "UBX-CFG-GNSS poll GNSS config"}, + # UBX-CFG-RST hot boot + "HOTBOOT": {"command": send_cfg_rst, + "help": "UBX-CFG-RST hotboot the GPS", + "opt": 0}, + # UBX-CFG-NAV5 set Dynamic Platform Model + "MODEL": {"command": send_cfg_nav5_model, + "help": "UBX-CFG-NAV5 set Dynamic Platform Model"}, + # UBX-CFG-NAV5, poll Nav Engine Settings + "NAV5": {"command": send_cfg_nav5, + "help": "UBX-CFG-NAV5 poll Nav Engines settings"}, + # UBX-CFG-PMS, poll power management settings + "PMS": {"command": send_cfg_pms, + "help": "UBX-CFG-PMS poll power management settings"}, + # UBX-CFG-PRT, poll I/O port number + "PRT": {"command": send_cfg_prt, + "help": "UBX-CFG-PRT poll I/O port settings"}, + # UBX-CFG-CFG reset config + "RESET": {"command": send_cfg_cfg, + "help": "UBX-CFG-CFG reset config to defaults", + "opt": 1}, + # UBX-CFG-CFG save config + "SAVE": {"command": send_cfg_cfg, + "help": "UBX-CFG-CFG save current config", + "opt": 0}, + # UBX-TIM-SVIN, get survey in data + "SVIN": {"command": send_tim_svin, + "help": "UBX-TIM-SVIN get survey in data"}, + # UBX-CFG-TMODE2, get time mode 2 config + "TMODE2": {"command": send_cfg_tmode2, + "help": "UBX-CFG-TMODE2 get time mode 2 config"}, + # UBX-TIM-TP, get time pulse timedata + "TP": {"command": send_tim_tp, + "help": "UBX-TIM-TP get time pulse timedata"}, + # UBX-CFG-TP5, get time0 decodes + "TP5": {"command": send_cfg_tp5, + "help": "UBX-TIM-TP5 get time pulse decodes"}, + # UBX-CFG-RST warm boot + "WARMBOOT": {"command": send_cfg_rst, + "help": "UBX-CFG-RST warmboot the GPS", + "opt": 1}, + # poll UBX-CFG-USB + "USB": {"command": send_cfg_usb, + "help": "UBX-CFG-USB get USB config"}, + # poll UBX-MON-VER + "VER": {"command": send_mon_ver, + "help": "UBX-MON-VER get GPS version"}, + } + # end class ubx + + +class gps_io(object): + """All the GPS I/O in one place" + + Three types of GPS I/O + 1. read only from a file + 2. read/write through a device + 3. read only from a gpsd instance + """ + + out = b'' + ser = None + input_is_device = False + + def __init__(self, serial_class): + "Initialize class" + + Serial = serial_class + # buffer to hold read data + self.out = b'' + + # open the input: device, file, or gpsd + if opts['input_file_name'] is not None: + # check if input file is a file or device + try: + mode = os.stat(opts['input_file_name']).st_mode + except OSError: + sys.stderr.write('%s: failed to open input file %s\n' % + (PROG_NAME, opts['input_file_name'])) + sys.exit(1) + + if stat.S_ISCHR(mode): + # character device, need not be read only + self.input_is_device = True + + if ((opts['disable'] or opts['enable'] or opts['poll'] or + opts['oaf_name'])): + + # check that we can write + if opts['read_only']: + sys.stderr.write('%s: read-only mode, ' + 'can not send commands\n' % PROG_NAME) + sys.exit(1) + if self.input_is_device is False: + sys.stderr.write('%s: input is plain file, ' + 'can not send commands\n' % PROG_NAME) + sys.exit(1) + + if opts['target']['server'] is not None: + # try to open local gpsd + try: + self.ser = gps.gpscommon(host=None) + self.ser.connect(opts['target']['server'], + opts['target']['port']) + + # alias self.ser.write() to self.write_gpsd() + self.ser.write = self.write_gpsd + + # ask for raw, not rare, data + data_out = b'?WATCH={' + if opts['target']['device'] is not None: + # add in the requested device + data_out += (b'"device":"' + opts['target']['device'] + + b'",') + data_out += b'"enable":true,"raw":2}\r\n' + if VERB_RAW <= opts['verbosity']: + print("sent: ", data_out) + self.ser.send(data_out) + except socket.error as err: + sys.stderr.write('%s: failed to connect to gpsd %s\n' % + (PROG_NAME, err)) + sys.exit(1) + + elif self.input_is_device: + # configure the serial connections (the parameters refer to + # the device you are connecting to) + + try: + self.ser = Serial.Serial( + baudrate=opts['input_speed'], + # 8N1 is GREIS default + bytesize=Serial.EIGHTBITS, + parity=Serial.PARITY_NONE, + port=opts['input_file_name'], + stopbits=Serial.STOPBITS_ONE, + # read timeout + timeout=0.05, + # pyserial Ver 3.0+ changes writeTimeout to write_timeout + # just set both + write_timeout=0.5, + writeTimeout=0.5, + ) + except Serial.serialutil.SerialException: + # this exception happens on bad serial port device name + sys.stderr.write('%s: failed to open serial port "%s"\n' + '%s: Your computer has the serial ports:\n' % + (PROG_NAME, opts['input_file_name'], + PROG_NAME)) + + # print out list of supported ports + import serial.tools.list_ports as List_Ports + ports = List_Ports.comports() + for port in ports: + sys.stderr.write(" %s: %s\n" % + (port.device, port.description)) + sys.exit(1) + + # flush input buffer, discarding all its contents + self.ser.flushInput() + + else: + # Read from a plain file of GREIS messages + try: + self.ser = open(opts['input_file_name'], 'rb') + except IOError: + sys.stderr.write('%s: failed to open input %s\n' % + (PROG_NAME, opts['input_file_name'])) + sys.exit(1) + + def read(self, read_opts): + "Read from device, until timeout or expected message" + + # are we expecting a certain message? + if gps_model.expect_statement_identifier: + # assume failure, until we see expected message + ret_code = 1 + else: + # not expecting anything, so OK if we did not see it. + ret_code = 0 + + try: + if read_opts['target']['server'] is not None: + # gpsd input + start = time.clock() + while read_opts['input_wait'] > (time.clock() - start): + # First priority is to be sure the input buffer is read. + # This is to prevent input buffer overuns + if 0 < self.ser.waiting(): + # We have serial input waiting, get it + # No timeout possible + # RTCM3 JSON can be over 4.4k long, so go big + new_out = self.ser.sock.recv(8192) + if raw is not None: + # save to raw file + raw.write(new_out) + self.out += new_out + + consumed = gps_model.decode_msg(self.out) + self.out = self.out[consumed:] + if ((gps_model.expect_statement_identifier and + (gps_model.expect_statement_identifier == + gps_model.last_statement_identifier))): + # Got what we were waiting for. Done? + ret_code = 0 + if not read_opts['input_forced_wait']: + # Done + break + + elif self.input_is_device: + # input is a serial device + start = time.clock() + while read_opts['input_wait'] > (time.clock() - start): + # First priority is to be sure the input buffer is read. + # This is to prevent input buffer overuns + # pyserial 3.0 replaces inWaiting() with in_waiting + if 0 < self.ser.inWaiting(): + # We have serial input waiting, get it + # 1024 is comfortably large, almost always the + # Read timeout is what causes ser.read() to return + new_out = self.ser.read(1024) + if raw is not None: + # save to raw file + raw.write(new_out) + self.out += new_out + + consumed = gps_model.decode_msg(self.out) + self.out = self.out[consumed:] + if ((gps_model.expect_statement_identifier and + (gps_model.expect_statement_identifier == + gps_model.last_statement_identifier))): + # Got what we were waiting for. Done? + ret_code = 0 + if not read_opts['input_forced_wait']: + # Done + break + else: + # ordinary file, so all read at once + self.out += self.ser.read() + if raw is not None: + # save to raw file + raw.write(self.out) + + while True: + consumed = gps_model.decode_msg(self.out) + self.out = self.out[consumed:] + if 0 >= consumed: + break + + except IOError: + # This happens on a good device name, but gpsd already running. + # or if USB device unplugged + sys.stderr.write('%s: failed to read %s\n' + '%s: Is gpsd already holding the port?\n' + % (PROG_NAME, read_opts['input_file_name'], + PROG_NAME)) + return 1 + + if 0 < ret_code: + # did not see the message we were expecting to see + sys.stderr.write('%s: waited %0.2f seconds for, ' + 'but did not get: %%%s%%\n' + % (PROG_NAME, read_opts['input_wait'], + gps_model.expect_statement_identifier)) + return ret_code + + def write_gpsd(self, data): + "write data to gpsd daemon" + + # HEXDATA_MAX = 512, from gps.h, The max hex digits can write. + # Input data is binary, converting to hex doubles its size. + # Limit binary data to length 255, so hex data length less than 510. + if 255 < len(data): + sys.stderr.write('%s: trying to send %d bytes, max is 255\n' + % (PROG_NAME, len(data))) + return 1 + + if opts['target']['device'] is not None: + # add in the requested device + data_out = b'?DEVICE={"path":"' + opts['target']['device'] + b'",' + else: + data_out = b'?DEVICE={' + + # Convert binary data to hex and build the message. + data_out += b'"hexdata":"' + binascii.hexlify(data) + b'"}\r\n' + if VERB_RAW <= opts['verbosity']: + print("sent: ", data_out) + self.ser.send(data_out) + return 0 + + +# instantiate the GPS class +gps_model = ubx() + + +def usage(): + "Ouput usage information, and exit" + print('usage: %s [-c C] [-f F] [-r] [-p P] [-s S] [-v V]\n' + ' [-hV?] [-S S]\n' + ' -c C send raw command C to GPS\n' + ' -d D disable D\n' + ' -e E enable E\n' + ' -f F open F as file/device\n' + ' default: %s\n' + ' -h print help\n' + ' -m M optional mode to -p P\n' + ' -p P send a prepackaged query P to GPS\n' + ' -R R save raw data from GPS in file R\n' + ' default: %s\n' + ' -r open file/device read only\n' + ' -S S set GPS speed to S\n' + ' -s S set port speed to S\n' + ' default: %s bps\n' + ' -w wait time\n' + ' default: %s seconds\n' + ' -V print version\n' + ' -v V Set verbosity level to V, 0 to 4\n' + ' default: %s\n' + ' -? print help\n' + '\n' + 'D and E can be one of:' % + (PROG_NAME, opts['input_file_name'], opts['raw_file'], + opts['input_speed'], opts['input_wait'], opts['verbosity']) + ) + + for item in sorted(gps_model.able_commands.keys()): + print(" %-12s %s" % (item, gps_model.able_commands[item]["help"])) + + print('\nP can be one of:') + for item in sorted(gps_model.commands.keys()): + print(" %-12s %s" % (item, gps_model.commands[item]["help"])) + print('\nOptions can be placed in the UBXOPTS environment variable.\n' + 'UBXOPTS is processed before the CLI options.') + sys.exit(0) + + +if 'UBXOPTS' in os.environ: + # grab the UBXOPTS environment variable for options + opts['progopts'] = os.environ['UBXOPTS'] + options = opts['progopts'].split(' ') + sys.argv[1:] +else: + options = sys.argv[1:] + + +try: + (options, arguments) = getopt.getopt(options, "?c:d:e:f:hm:rp:s:w:v:R:S:V") +except getopt.GetoptError as err: + sys.stderr.write("%s: %s\n" + "Try '%s -h' for more information.\n" % + (PROG_NAME, str(err), PROG_NAME)) + sys.exit(2) + +for (opt, val) in options: + if opt == '-c': + opts['command'] = val + elif opt == '-d': + opts['disable'] = val + elif opt == '-e': + opts['enable'] = val + elif opt == '-f': + opts['input_file_name'] = val + elif opt == '-h' or opt == '-?': + usage() + elif opt == '-m': + opts['mode'] = int(val) + elif opt == '-p': + opts['poll'] = val + elif opt == '-r': + opts['read_only'] = True + elif opt == '-s': + opts['input_speed'] = int(val) + if opts['input_speed'] not in gps_model.speeds: + sys.stderr.write('%s: -s invalid speed %s\n' % + (PROG_NAME, opts['input_speed'])) + sys.exit(1) + + elif opt == '-w': + opts['input_wait'] = float(val) + elif opt in '-v': + opts['verbosity'] = int(val) + elif opt in '-R': + # raw log file + opts['raw_file'] = val + elif opt in '-S': + opts['set_speed'] = int(val) + if opts['set_speed'] not in gps_model.speeds: + sys.stderr.write('%s: -S invalid speed %s\n' % + (PROG_NAME, opts['set_speed'])) + sys.exit(1) + + elif opt == '-V': + # version + sys.stderr.write('%s: Version %s\n' % (PROG_NAME, gps_version)) + sys.exit(0) + +if opts['input_file_name'] is None: + # no input file given + # default to local gpsd + opts['target']['server'] = "localhost" + opts['target']['port'] = gps.GPSD_PORT + opts['target']['device'] = None + if arguments: + # server[:port[:device]] + parts = arguments[0].split(':') + opts['target']['server'] = parts[0] + if 1 < len(parts): + opts['target']['port'] = parts[1] + if 2 < len(parts): + opts['target']['device'] = parts[2] + +elif arguments: + sys.stderr.write('%s: Both input file and server specified\n' % PROG_NAME) + sys.exit(1) + +if VERB_PROG <= opts['verbosity']: + # dump all options + print('Options:') + for option in sorted(opts): + print(" %s: %s" % (option, opts[option])) + +# done parsing arguments from environment and CLI + +try: + # raw log file requested? + raw = None + if opts['raw_file']: + try: + raw = open(opts['raw_file'], 'w') + except IOError: + sys.stderr.write('%s: failed to open raw file %s\n' % + (PROG_NAME, opts['raw_file'])) + sys.exit(1) + + # create the I/O instance + io_handle = gps_io(serial) + + sys.stdout.flush() + + if opts['disable'] is not None: + if VERB_QUIET < opts['verbosity']: + sys.stderr.write('%s: disable %s\n' % (PROG_NAME, opts['disable'])) + if opts['disable'] in gps_model.able_commands: + command = gps_model.able_commands[opts['disable']] + command["command"](gps, 0) + else: + sys.stderr.write('%s: disable %s not found\n' % + (PROG_NAME, opts['disable'])) + sys.exit(1) + + elif opts['enable'] is not None: + if VERB_QUIET < opts['verbosity']: + sys.stderr.write('%s: enable %s\n' % (PROG_NAME, opts['enable'])) + if opts['enable'] in gps_model.able_commands: + command = gps_model.able_commands[opts['enable']] + command["command"](gps, 1) + else: + sys.stderr.write('%s: enable %s not found\n' % + (PROG_NAME, opts['enable'])) + sys.exit(1) + + elif opts['poll'] is not None: + if VERB_QUIET < opts['verbosity']: + sys.stderr.write('%s: poll %s\n' % (PROG_NAME, opts['poll'])) + + if 'MODEL' == opts["poll"]: + if opts["mode"] is None: + opts["mode"] = 0 # default to portable model + + if opts['poll'] in gps_model.commands: + command = gps_model.commands[opts['poll']] + if 'opt' in command: + command["command"](gps, command["opt"]) + else: + command["command"](gps) + else: + sys.stderr.write('%s: poll %s not found\n' % + (PROG_NAME, opts['poll'])) + sys.exit(1) + + elif opts['set_speed'] is not None: + gps_model.send_set_speed(opts['set_speed']) + + elif opts['command'] is not None: + # zero length is OK to send + # add trailing new line + opts['command'] += "\n" + + if VERB_QUIET < opts['verbosity']: + sys.stderr.write('%s: command %s\n' % (PROG_NAME, opts['command'])) + gps_model.gps_send_raw(opts['command']) + + exit_code = io_handle.read(opts) + + if ((VERB_RAW <= opts['verbosity']) and io_handle.out): + # dump raw left overs + print("Left over data:") + print(io_handle.out) + + sys.stdout.flush() + io_handle.ser.close() + +except KeyboardInterrupt: + print('') + exit_code = 1 + +sys.exit(exit_code) |