#!/usr/bin/env python # -*- coding: UTF-8 ''' ubxtool -- u-blox configurator and packet decoder usage: ubxtool [OPTIONS] [server[:port[:device]]] ''' # This file is Copyright (c) 2018 by the GPSD project # BSD terms apply: see the file COPYING in the distribution root for details. # # This code runs compatibly under Python 2 and 3.x for x >= 2. # Preserve this property! # # ENVIRONMENT: # Options in the UBXOPTS environment variable will be parsed before # the CLI options. A handy place to put your '-f /dev/ttyXX -s SPEED' # # To see what constellations are enabled: # ubxtool -p GNSS -f /dev/ttyXX # # To disable GALILEO and enable GALILEO: # ubxtool -d GLONASS -f /dev/ttyXX # ubxtool -e GALILEO -f /dev/ttyXX # # To read GPS messages a log file: # ubxtool -v 2 -f test/daemon/ublox-neo-m8n.log # # References: # [1] IS-GPS-200J from __future__ import absolute_import, print_function, division import binascii # for binascii.hexlify() from functools import reduce # pylint: disable=redefined-builtin import getopt # for getopt.getopt(), to parse CLI options import operator # for or_ import os # for os.environ import socket # for socket.error import stat # for stat.S_ISBLK() import struct # for pack() import sys import time PROG_NAME = 'ubxtool' try: import serial except ImportError: serial = None # Defer complaining until we know we need it. try: import gps except ImportError: # PEP8 says local imports last sys.stderr.write("%s: failed to import gps, check PYTHON_PATH\n" % PROG_NAME) sys.exit(2) gps_version = '3.19-dev' if gps.__version__ != gps_version: sys.stderr.write("%s: ERROR: need gps module version %s, got %s\n" % (PROG_NAME, gps_version, gps.__version__)) sys.exit(1) VERB_QUIET = 0 # quiet VERB_NONE = 1 # just output requested data and some info VERB_DECODE = 2 # decode all messages VERB_INFO = 3 # more info VERB_RAW = 4 # raw info VERB_PROG = 5 # program trace # dictionary to hold all user options opts = { # command to send to GPS, -c 'command': None, # default -x items, up to 64 per call 'del_item': [], # command for -d disable 'disable': None, # command for -e enable 'enable': None, # help requested 'help': None, # default input -f file 'input_file_name': None, # default -g items, up to 64 per call 'get_item': [], # default forced wait? -W 'input_forced_wait': False, # default port speed -s 'input_speed': 9600, # default input wait time -w in seconds 'input_wait': 2.0, # interface for port-related commands 'interface': None, # optional mode to -p P 'mode': None, # the name of an OAF file, extension .jpo 'oaf_name': None, # poll command -p 'poll': None, # protocol version for sent commands # u-blox 5, firmware 4 to 6 is protver 10 to 12 # u-blox 6, firmware 6 to 7 is protver 12 to 13 # u-blox 6, firmware 1 is protver 14 # u-blox 7, firmware 1 is protver 14 # u-blox 8, is protver 15 to 23 # u-blox 9, firmware 1 is protver 27 # u-blox F9T, firmware 2 is protver 29 'protver': 10, # raw log file name 'raw_file': None, # open port read only -r 'read_only': False, # default -z item 'set_item': [], # speed to set GPS -S 'set_speed': None, # target gpsd (server:port:device) to connect to 'target': {"server": None, "port": gps.GPSD_PORT, "device": None}, # verbosity level, -v 'verbosity': VERB_NONE, # contents of environment variable UBXOPTS 'progopts': '', } # I'd like to use pypy module bitstring or bitarray, but # people complain when non stock python modules are used here. def unpack_s11(word, pos): """Grab a signed 11 bits from offset pos of word""" bytes = bytearray(2) bytes[0] = (word >> pos) & 0xff bytes[1] = (word >> (pos + 8)) & 0x07 if 0x04 & bytes[1]: # extend the sign bytes[1] |= 0xf8 u = struct.unpack_from('> 22) & 0xff newword <<= 3 newword |= (word >> 8) & 0x07 return unpack_s11(newword, 0) def unpack_s14(word, pos): """Grab a signed 14 bits from offset pos of word""" bytes = bytearray(2) bytes[0] = (word >> pos) & 0xff bytes[1] = (word >> (pos + 8)) & 0x3f if 0x20 & bytes[1]: # extend the sign bytes[1] |= 0xc0 u = struct.unpack_from('> pos) & 0xff bytes[1] = (word >> (pos + 8)) & 0xff u = struct.unpack_from('> pos) & 0xff bytes[1] = (word >> (pos + 8)) & 0xff u = struct.unpack_from('> pos) & 0xff bytes[1] = (word >> (pos + 8)) & 0xff bytes[2] = (word >> (pos + 16)) & 0x3f bytes[3] = 0 if 0x20 & bytes[2]: # extend the sign bytes[2] |= 0xc0 bytes[3] = 0xff u = struct.unpack_from('> pos) & 0xff bytes[1] = (word >> (pos + 8)) & 0xff bytes[2] = (word >> (pos + 16)) & 0xff bytes[3] = 0 if 0x80 & bytes[2]: # extend the sign bytes[3] = 0xff u = struct.unpack_from('> pos) & 0xff bytes[1] = (word >> (pos + 8)) & 0xff bytes[2] = (word >> (pos + 16)) & 0xff bytes[3] = 0 u = struct.unpack_from('> 6) & 0xff bytes[1] = (word >> 14) & 0xff bytes[2] = (word >> 22) & 0xff bytes[3] = (word1 >> 6) & 0xff u = struct.unpack_from('> 6) & 0xff bytes[1] = (word >> 14) & 0xff bytes[2] = (word >> 22) & 0xff bytes[3] = (word1 >> 6) & 0xff u = struct.unpack_from('> pos) & 0xff u = struct.unpack_from('> pos) & 0xff u = struct.unpack_from('> 28) & 0x07 cfg_type = key_map[key_size] return cfg_type def cfg_by_key(self, key): """Find a config item by key""" for item in self.cfgs: if item[1] == key: return item # not found, build a fake item, guess on decode name = "CFG-%u-%u" % ((key >> 16) & 0xff, ket & 0xff) map = {0: "Z0", 1: "L", 2: "U1", 3: "U2", 4: "U4", 5: "U8", 6: "Z6", 7: "Z7", } size = (key >> 28) & 0x07 item = (name, key, map[size], 1, "Unk", "Unknown") return item def cfg_by_name(self, name): """Find a config item by name""" for item in self.cfgs: if item[0] == name: return item return None id_map = { 0: {"name": "GPS", "sig": {0: "L1C/A", 3: "L2 CL", 4: "L2 CM"}}, 1: {"name": "SBAS", "sig": {0: "L1C/A", 3: "L2 CL", 4: "L2 CM"}}, 2: {"name": "Galileo", "sig": {0: "E1C", 1: "E1 B", 5: "E5 bl", 6: "E5 bQ"}}, 3: {"name": "BeiDou", "sig": {0: "B1I D1", 1: "B1I D2", 2: "B2I D1", 3: "B2I D2"}}, 4: {"name": "IMES", "sig": {0: "L1C/A", 3: "L2 CL", 4: "L2 CM"}}, 5: {"name": "QZSS", "sig": {0: "L1C/A", 4: "L2 CM", 5: "L2 CL"}}, 6: {"name": "GLONASS", "sig": {0: "L1 OF", 2: "L2 OF"}}, } def gnss_s(self, gnssId, svId, sigId): """Verbose decode of gnssId, svId and sigId""" s = '' if gnssId in self.id_map: if "name" not in self.id_map[gnssId]: s = "%d PRN %d sigId %d" % (gnssId, svId, sigId) elif sigId not in self.id_map[gnssId]["sig"]: s = ("%s PRN %d sigId %d" % (self.id_map[gnssId]["name"], svId, sigId)) else: s = ("%s PRN %d sigId %s" % (self.id_map[gnssId]["name"], svId, self.id_map[gnssId]["sig"][sigId])) else: s = "%d PRN %d sigId %d" % (gnssId, svId, sigId) return s def ack_ack(self, buf): """UBX-ACK-ACK decode""" m_len = len(buf) if 2 > m_len: return "Bad Length %s" % m_len u = struct.unpack_from(' m_len: return "Bad Length %s" % m_len u = struct.unpack_from('> 5) & 0x1f, (u[1] >> 10) & 0x1f, u[1] >> 15)) if VERB_DECODE <= opts['verbosity']: s += ('\n flags (%s)' % flag_s(u[0], self.cfg_ant_pins)) return s cfg_cfg_mask = { 0x1: 'ioPort', 0x2: 'msgConf', 0x4: 'infMsg', 0x8: 'navConf', 0x10: 'rxmConf', 0x100: 'senConf', # not on M8030 0x200: 'rinvConf', 0x400: 'antConf', 0x800: 'logConf', 0x1000: 'ftsConf', } cfg_cfg_dev = { 0x1: 'devBBR', 0x2: 'devFlash', 0x4: 'devEEPROM', 0x10: 'devSpiFlash', } def cfg_cfg(self, buf): """UBX-CFG-CFG decode""" m_len = len(buf) if 12 > m_len: return "Bad Length %s" % m_len if 12 == m_len: u = struct.unpack_from(' m_len: return "Bad Length %d" % m_len u = struct.unpack_from(' m_len: return "Bad Length %d" % m_len u = struct.unpack_from(' sat: s += (" %s %s " % (index_s(sat, self.gnss_id), flag_s(u[4], self.cfg_gnss_sig[sat]))) else: s += "Unk " if u[4] & 0x01: s += 'enabled' return s cfg_inf_protid = { 0: "UBX", 1: "NMEA", } def cfg_inf(self, buf): """UBX-CFG-INF decode, Poll configuration for one protocol""" m_len = len(buf) if 1 == m_len: return ("Poll request: %s" % index_s(buf[0], self.cfg_inf_protid)) if 10 < m_len: return "Bad Length %d" % m_len u = struct.unpack_from(' m_len: return "Bad Length %s" % m_len u = struct.unpack_from('> 4, self.utc_std), flag_s(u[0] >> 4, self.cfg_nav5_mask))) return s cfg_navx5_mask1 = { 4: "minMax", 8: "minCno", 0x40: "initial3dfix", 0x200: "wknRoll", 0x400: "ackAid", 0x2000: "ppp", 0x4000: "aop", } cfg_navx5_mask2 = { 0x40: "adr", 0x80: "sigAttenComp", } cfg_navx5_aop = { 1: "useAOP", } def cfg_navx5(self, buf): """UBX-CFG-NAVX5 decode, Navigation Engine Expert Settings""" # deprecated protver 23+ # length == 20 case seems broken? m_len = len(buf) if 0 == m_len: return " Poll request" if 20 > m_len: return " Bad Length %s" % m_len u = struct.unpack_from(' m_len: return "Bad Length %s" % m_len u = struct.unpack_from(' m_len: return "Bad Length %s" % m_len u = struct.unpack_from(' m_len: return "Bad Length %s" % m_len u = struct.unpack_from(' m_len: return "Bad Length %s" % m_len u = struct.unpack_from('> 1 & 0x7F)) s.append(' inProtoMask (%s)\n' ' outProtoMask (%s)' % (flag_s(u[5], self.cfg_prt_proto), flag_s(u[6], self.cfg_prt_proto))) if portid in set([1, 2, 4, 0]): s.append(' flags (%s)' % flag_s(u[7], self.cfg_prt_flags)) return '\n'.join(s) cfg_rate_system = { 0: "UTC", 1: "GPS", 2: "GLONASS", 3: "BeiDou", 4: "Galileo", } def cfg_rate(self, buf): """UBX-CFG-RATE decode, Navigation/Measurement Rate Settings""" m_len = len(buf) if 0 == m_len: return " Poll request" if 6 > m_len: return "Bad Length %s" % m_len u = struct.unpack_from(' m_len: return "Bad Length %s" % m_len bbrvalues = {0: "Hot Start", 1: "Warm Start", 0xffff: "Cold Start", } mode = {0: "Hardware reset", 1: "Software reset", 2: "Software reset (GNSS only)", 4: "Hardware reset, after shutdown", 8: "Controled GNSS stop", 9: "Controled GNSS start", } u = struct.unpack_from(' m_len: return "Bad Length %s" % m_len u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len u = struct.unpack_from(' m_len: return "Bad Length %s" % m_len u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len u = struct.unpack_from('> 7) & 0x0f, self.cfg_tp5_grid), (u[10] >> 11) & 0x03)) return s def cfg_usb(self, buf): """UBX-CFG-USB decode""" m_len = len(buf) if 0 == m_len: return " Poll request" if 108 > m_len: return " Bad Length %s" % m_len u = struct.unpack_from(' m_len: return "Bad Length %s" % m_len # this is a poll options, so does not set min protver u = struct.unpack_from(' m_len: return "Bad Length %s" % m_len u = struct.unpack_from(' opts['protver']: opts['protver'] = 27 # duplicated in cfg_valset() m_len -= 4 i = 4 while 0 < m_len: u = struct.unpack_from(' m_len: return "Bad Length %s" % m_len # this is a poll option, so does not set min protver u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len u = struct.unpack_from(' (8 + (i * 40)): u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len # at least protver 27 if 27 > opts['protver']: opts['protver'] = 27 u = struct.unpack_from(' (i * 20): u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len s = '' for i in range(1, 7): u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len # at least protver 27 if 27 > opts['protver']: opts['protver'] = 27 u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len # first seen in protver 15 u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len rxbuf_name = { 1: " pending ", 2: " usage ", 3: " peakUsage ", } s = '' for i in range(1, 4): u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len if 1 & buf[0]: s = " awake" else: s = " not awake" return s def mon_smgr(self, buf): """UBX-MON-SMGR decode, Synchronization manager status""" m_len = len(buf) if 0 == m_len: return " Poll request" if 16 > m_len: return " Bad Length %s" % m_len u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len rxbuf_name = { 1: " pending ", 2: " usage ", 3: " peakUsage ", } s = '' for i in range(1, 4): u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len substr = buf.split(gps.polybytes('\0'))[0] s = ' swVersion %s\n' % gps.polystr(substr) substr = buf[30:39] substr = substr.split(gps.polybytes('\0'))[0] s += ' hwVersion %s' % gps.polystr(substr) # extensions?? num_ext = int((m_len - 40) / 30) i = 0 while i < num_ext: loc = 40 + (i * 30) substr = buf[loc:] substr = substr.split(gps.polybytes('\0'))[0] s += '\n extension: %s' % gps.polystr(substr) i += 1 return s mon_ids = {2: {'str': 'IO', 'dec': mon_io, 'name': 'UBX-MON-IO'}, 4: {'str': 'VER', 'dec': mon_ver, 'name': 'UBX-MON-VER'}, 6: {'str': 'MSGPP', 'dec': mon_msgpp, 'name': 'UBX-MON-MSGPP'}, 7: {'str': 'RXBUF', 'dec': mon_rxbuf, 'name': 'UBX-MON-RXBUF'}, 8: {'str': 'TXBUF', 'dec': mon_txbuf, 'name': 'UBX-MON-TXBUF'}, 9: {'str': 'HW', 'dec': mon_hw, 'name': 'UBX-MON-HW'}, 0x0b: {'str': 'HW2', 'dec': mon_hw2, 'name': 'UBX-MON-HW2'}, 0x21: {'str': 'RXR', 'dec': mon_rxr, 'name': 'UBX-MON-RXR'}, 0x27: {'str': 'PATCH', 'dec': mon_patch, 'name': 'UBX-MON-PATCH'}, 0x28: {'str': 'GNSS', 'dec': mon_gnss, 'name': 'UBX-MON-GNSS'}, 0x2e: {'str': 'SMGR', 'dec': mon_smgr, 'name': 'UBX-MON-SMGR'}, 0x36: {'str': 'COMMS', 'dec': mon_comms, 'name': 'UBX-MON-COMMS'}, 0x37: {'str': 'HW3', 'dec': mon_hw3, 'name': 'UBX-MON-HW3'}, 0x38: {'str': 'RF', 'dec': mon_rf, 'name': 'UBX-MON-RF'}, } def nav_clock(self, buf): """UBX-NAV-CLOCK decode, Clock Solution""" m_len = len(buf) if 0 == m_len: return " Poll request" if 20 > m_len: return " Bad Length %s" % m_len u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len u = struct.unpack_from('> 4) & 1)) return s def nav_dop(self, buf): """UBX-NAV-DOP decode, Dilution of Precision""" m_len = len(buf) if 0 == m_len: return " Poll request" if 18 > m_len: return " Bad Length %s" % m_len u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len # flags2 is protver 27 u = struct.unpack_from('> 2) & 0x0f, self.nav_pvt_psm), index_s((u[11] >> 6) & 0x04, self.nav_pvt_carr))) return s nav_sat_qual = { 0: "None", 1: "Searching", 2: "Acquired", 3: "Detected", 4: "Code and time locked", 5: "Code, carrier and time locked", 6: "Code, carrier and time locked", 7: "Code, carrier and time locked", } nav_sat_health = { 1: "Healthy", 2: "Unhealthy", } nav_sat_orbit = { 0: "None", 1: "Ephemeris", 2: "Almanac", 3: "AssistNow Offline", 4: "AssistNow Autonomous", 5: "Other", 6: "Other", 7: "Other", } nav_sat_flags = { 8: "svUsed", 0x40: "diffCorr", 0x80: "smoothed", 0x800: "ephAvail", 0x1000: "almAvail", 0x2000: "anoAvail", 0x4000: "aopAvail", 0x10000: "sbasCorrUsed", 0x20000: "rtcmCorrUsed", 0x40000: "slasCorrUsed", 0x100000: "prCorrUsed", 0x200000: "crCorrUsed", 0x400000: "doCorrUsed", } def nav_sat(self, buf): """UBX-NAV-SAT decode""" m_len = len(buf) if 0 == m_len: return " Poll request" if 8 > m_len: return " Bad Length %s" % m_len u = struct.unpack_from('> 4) & 3, self.nav_sat_health), index_s((u[6] >> 8) & 7, self.nav_sat_orbit))) return s nav_sbas_mode = { 0: "Disabled", 1: "Enabled Integrity", 2: "Enabled Testmode", } # sometimes U1 or I1 nav_sbas_sys = { 0: "WAAS", 1: "EGNOS", 2: "MSAS", 3: "GAGAN", 4: "SDCM", # per ICAO Annex 10, v1, Table B-27 16: "GPS", } nav_sbas_service = { 1: "Ranging", 2: "Corrections", 4: "Integrity", 8: "Testmode", } def nav_sbas(self, buf): """UBX-NAV-SBAS decode, SBAS Status Data""" # undocuemnted, but present in protver 27+ m_len = len(buf) if 0 == m_len: return " Poll request" if 12 > m_len: return " Bad Length %s" % m_len u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len u = struct.unpack_from(' m_len: return "Bad Length %s" % m_len u = struct.unpack_from(' m_len: return ".Bad Length %s" % m_len u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len u = struct.unpack_from('> 4, self.utc_std))) return s def nav_velecef(self, buf): """UBX-NAV-VELECEF decode""" m_len = len(buf) if 0 == m_len: return " Poll request" if 20 != m_len: return " Bad Length %s" % m_len u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len # version not here before protver 18, I hope it is zero. u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len # common to Short-RLM and Long-RLM report u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len u = struct.unpack_from('> 24 if 0x8b == preamble: # CNAV msgid = (words[0] >> 12) & 0x3f s += ("\n CNAV: preamble %#x PRN %u msgid %d (%s)\n" % (preamble, (words[0] >> 18) & 0x3f, msgid, index_s(msgid, self.cnav_msgids))) else: # LNAV-L preamble = words[0] >> 22 subframe = (words[1] >> 8) & 0x07 s += ("\n LNAV-L: preamble %#x TLM %#x ISF %u" % (preamble, (words[0] >> 8) & 0xffff, 1 if (words[0] & 0x40) else 0)) s += ("\n TOW %u AF %u ASF %u Subframe %u" % (unpack_u8(words[1], 13) * 6, 1 if (words[0] & 0x1000) else 0, 1 if (words[0] & 0x800) else 0, subframe)) if 1 == subframe: # not well validated decode, possibly wrong... # [1] Figure 20-1 Sheet 1, Table 20-I # WN = GPS week number # TGD = Group Delay Differential # tOC = Time of Clock # af0 = SV Clock Bias Correction Coefficient # af1 = SV Clock Drift Correction Coefficient # af2 = Drift Rate Correction Coefficient ura = (words[2] >> 14) & 0x0f c_on_l2 = (words[2] >> 18) & 0x03 iodc = ((((words[2] >> 6) & 0x03) << 8) | (words[7] >> 24) & 0xff) s += ("\n WN %u Codes on L2 %u (%s) URA %u (%s) " "SVH %#04x IODC %u" % (words[2] >> 20, c_on_l2, index_s(c_on_l2, self.codes_on_l2), ura, index_s(ura, self.ura_meters), (words[2] >> 8) & 0x3f, iodc)) # tOC = Clock Data Reference Time of Week s += ("\n L2 P DF %u TGD %e tOC %u\n" " af2 %e af1 %e af0 %e" % ((words[2] >> 29) & 0x03, unpack_s8(words[6], 6) * (2 ** -31), unpack_u16(words[7], 6) * 16, unpack_s8(words[8], 22) * (2 ** -55), unpack_s16(words[8], 6) * (2 ** -43), unpack_s22(words[9], 8) * (2 ** -31))) elif 2 == subframe: # not well validated decode, possibly wrong... # [1] Figure 20-1 Sheet 1, Tables 20-II and 20-III # IODE = Issue of Data (Ephemeris) # Crs = Amplitude of the Sine Harmonic Correction # Term to the Orbit Radius # Deltan = Mean Motion Difference From Computed Value # M0 = Mean Anomaly at Reference Time # Cuc = Amplitude of the Cosine Harmonic Correction # Term to the Argument of Latitude # e = Eccentricity # Cus = Amplitude of the Sine Harmonic Correction Term # to the Argument of Latitude # sqrtA = Square Root of the Semi-Major Axis # tOE = Reference Time Ephemeris s += ("\n IODE %u Crs %e Deltan %e M0 %e" "\n Cuc %e e %e Cus %e sqrtA %f tOE %u" % (unpack_u8(words[2], 22), unpack_s16(words[2], 6) * (2 ** -5), unpack_s16(words[3], 14) * (2 ** -43), # M0 unpack_s32s(words[4], words[3]) * (2 ** -31), unpack_s16(words[5], 14) * (2 ** -29), unpack_u32s(words[6], words[5]) * (2 ** -33), unpack_s16(words[7], 14) * (2 ** -29), unpack_u32s(words[8], words[7]) * (2 ** -19), unpack_u16(words[9], 14) * 16)) elif 3 == subframe: # not well validated decode, possibly wrong... # [1] Figure 20-1 Sheet 3, Table 20-II, Table 20-III # Cic = Amplitude of the Cosine Harmonic Correction # Term to the Angle of Inclination # Omega0 = Longitude of Ascending Node of Orbit # Plane at Weekly Epoch # Cis = Amplitude of the Sine Harmonic Correction # Term to the Orbit Radius # i0 = Inclination Angle at Reference Time # Crc = Amplitude of the Cosine Harmonic Correction # Term to the Orbit Radius # omega = Argument of Perigee # Omegadot = Rate of Right Ascension # IODE = Issue of Data (Ephemeris) # IODT = Rate of Inclination Angle s += ("\n Cic %e Omega0 %e Cis %e i0 %e" "\n Crc %e omega %e Omegadot %e" "\n IDOE %u IDOT %e" % (unpack_s16(words[2], 14) * (2 ** -29), unpack_s32s(words[3], words[2]) * (2 ** -31), unpack_s16(words[4], 14) * (2 ** -29), unpack_s32s(words[5], words[4]) * (2 ** -31), # Crc unpack_s16(words[6], 14) * (2 ** -5), unpack_s32s(words[7], words[6]) * (2 ** -31), # Omegadot unpack_s24(words[8], 6) * (2 ** -43), unpack_u8(words[9], 22), unpack_s14(words[9], 8) * (2 ** -43))) elif 4 == subframe: # all data in subframe 4 is "reserved", # except for pages 13, 18, 15 # as of 2018, dataid is always 1. svid = (words[2] >> 22) & 0x3f if 0 < svid: page = index_s(svid, self.sbfr4_svid_page) else: # page of zero means the svId that sent it. page = "%d/Self" % svId s += ("\n dataid %u svid %u (page %s)\n" % (words[2] >> 28, svid, page)) if (((2 <= page and 5 >= page) or (5 <= page and 10 >= page))): s += self.almanac(words) elif 13 == page: s += " NWCT" elif 17 == page: s += (" Special messages: " + chr((words[2] >> 14) & 0xff) + chr((words[2] >> 6) & 0xff) + chr((words[3] >> 22) & 0xff) + chr((words[3] >> 14) & 0xff) + chr((words[3] >> 6) & 0xff) + chr((words[4] >> 22) & 0xff) + chr((words[4] >> 14) & 0xff) + chr((words[4] >> 6) & 0xff) + chr((words[5] >> 22) & 0xff) + chr((words[5] >> 14) & 0xff) + chr((words[5] >> 6) & 0xff) + chr((words[6] >> 22) & 0xff) + chr((words[6] >> 14) & 0xff) + chr((words[6] >> 6) & 0xff) + chr((words[7] >> 22) & 0xff) + chr((words[7] >> 14) & 0xff) + chr((words[7] >> 6) & 0xff) + chr((words[8] >> 22) & 0xff) + chr((words[8] >> 14) & 0xff) + chr((words[8] >> 6) & 0xff) + chr((words[9] >> 22) & 0xff) + chr((words[9] >> 14) & 0xff)) elif 18 == page: s += " Ionospheric and UTC data" elif 25 == page: s += " A/S flags" else: s += " Reserved" elif 5 == subframe: svid = (words[2] >> 22) & 0x3f page = index_s(svid, self.sbfr5_svid_page) s += ("\n dataid %u svid %u (page %s)\n" % (words[2] >> 28, svid, page)) if 1 <= page and 24 >= page: s += self.almanac(words) elif 25 == page: s += " A/S flags" else: s += " Reserved" return s def rxm_svsi(self, buf): """UBX-RXM-SVSI decode, SV Status Info""" m_len = len(buf) if 0 == m_len: return " Poll request" if 8 > m_len: return " Bad Length %s" % m_len u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len u = struct.unpack_from('> 2) & 0x03 if 0 == raim: s += "RAIM not available" elif 1 == raim: s += "RAIM not active" elif 2 == raim: s += "RAIM active" else: s += "RAIM ??" return s tim_ids = {1: {'str': 'TP', 'dec': tim_tp, 'name': 'UBX-TIM-TP'}, 3: {'str': 'TM2', 'dec': tim_tm2, 'name': 'UBX-TIM-TM2'}, 4: {'str': 'SVIN', 'dec': tim_svin, 'name': 'UBX-TIM-SVIN'}, 6: {'str': 'VRFY', 'name': 'UBX-TIM-VRFY'}, 0x11: {'str': 'DOSC', 'name': 'UBX-TIM-DOSC'}, 0x12: {'str': 'TOS', 'name': 'UBX-TIM-TOS'}, 0x13: {'str': 'SMEAS', 'name': 'UBX-TIM-SMEAS'}, 0x15: {'str': 'VCOCAL', 'name': 'UBX-TIM-VCOCAL'}, 0x16: {'str': 'FCHG', 'name': 'UBX-TIM-FCHG'}, 0x17: {'str': 'HOC', 'name': 'UBX-TIM-HOC'}, } # UBX-UPD- ?? classes = { 0x01: {'str': 'NAV', 'ids': nav_ids}, 0x02: {'str': 'RXM', 'ids': rxm_ids}, 0x04: {'str': 'INF', 'ids': inf_ids}, 0x05: {'str': 'ACK', 'ids': ack_ids}, 0x06: {'str': 'CFG', 'ids': cfg_ids}, 0x09: {'str': 'UPD'}, 0x0A: {'str': 'MON', 'ids': mon_ids}, 0x0B: {'str': 'ATD'}, 0x0D: {'str': 'TIM', 'ids': tim_ids}, 0x10: {'str': 'ESF'}, 0x13: {'str': 'MGA'}, 0x21: {'str': 'LOG'}, 0x27: {'str': 'SEC', 'ids': sec_ids}, 0x28: {'str': 'HNR'}, 0xf0: {'str': 'NMEA', 'ids': nmea_ids}, 0xf5: {'str': 'RTCM', 'ids': rtcm_ids}, } def class_id_s(self, m_class, m_id): """Return class and ID numbers as a string.""" s = 'Class: ' if m_class not in self.classes: s += '%#x ID: %#x' % (m_class, m_id) return s if 'str' in self.classes[m_class]: s_class = self.classes[m_class]['str'] s += '%s(%#x) ' % (s_class, m_class) else: s += '%#x ' % (m_class) if (('ids' in self.classes[m_class] and m_id in self.classes[m_class]['ids'] and 'str' in self.classes[m_class]['ids'][m_id])): s_id = self.classes[m_class]['ids'][m_id]['str'] s += 'ID: %s(%#x)' % (s_id, m_id) else: s += 'ID: %#x' % (m_id) return s def decode_msg(self, out): """Decode one message and then return number of chars consumed""" state = 'BASE' consumed = 0 # decode state machine for this_byte in out: consumed += 1 if isinstance(this_byte, str): # a character, probably read from a file c = ord(this_byte) else: # a byte, probably read from a serial port c = int(this_byte) if VERB_RAW <= opts['verbosity']: if (ord(' ') <= c) and (ord('~') >= c): # c is printable print("state: %s char %c (%#x)" % (state, chr(c), c)) else: # c is not printable print("state: %s char %#x" % (state, c)) if 'BASE' == state: # start fresh # place to store 'comments' comment = '' m_class = 0 m_id = 0 m_len = 0 m_raw = bytearray(0) # class, id, len, payload m_payload = bytearray(0) # just the payload m_ck_a = 0 m_ck_b = 0 if 0xb5 == c: # got header 1, mu state = 'HEADER1' if ord('$') == c: # got $, so NMEA? state = 'NMEA' comment = '$' if ord("{") == c: # JSON, treat as comment line state = 'JSON' # start fresh comment = "{" continue if ord("#") == c: # comment line state = 'COMMENT' # start fresh comment = "#" continue if 0xd3 == c: # RTCM3 Leader 1 state = 'RTCM3_1' # start fresh comment = "#" continue if (ord('\n') == c) or (ord('\r') == c): # CR or LF, leftovers return 1 continue if state in ('COMMENT', 'JSON'): # inside comment if ord('\n') == c or ord('\r') == c: # Got newline or linefeed # terminate messages on or # Done, got a full message if gps.polystr('{"class":"ERROR"') in comment: # always print gpsd errors print(comment) elif VERB_DECODE <= opts['verbosity']: print(comment) return consumed else: comment += chr(c) continue if 'NMEA' == state: # getting NMEA payload if (ord('\n') == c) or (ord('\r') == c): # CR or LF, done, got a full message # terminates messages on or if VERB_DECODE <= opts['verbosity']: print(comment + '\n') return consumed else: comment += chr(c) continue if 'RTCM3_1' == state: # high 6 bits must be zero, if 0 != (c & 0xfc): state = 'BASE' else: # low 2 bits are MSB of a 10-bit length m_len = c << 8 state = 'RTCM3_2' m_raw.extend([c]) continue if 'RTCM3_2' == state: # 8 bits are LSB of a 10-bit length m_len |= 0xff & c # add 3 for checksum m_len += 3 state = 'RTCM3_PAYLOAD' m_raw.extend([c]) continue if 'RTCM3_PAYLOAD' == state: m_len -= 1 m_raw.extend([c]) m_payload.extend([c]) if 0 == m_len: state = 'BASE' type = m_payload[0] << 4 type |= 0x0f & (m_payload[1] >> 4) if VERB_DECODE <= opts['verbosity']: print("RTCM3 packet: type %d\n" % type) continue if ord('b') == c and 'HEADER1' == state: # got header 2 state = 'HEADER2' continue if 'HEADER2' == state: # got class state = 'CLASS' m_class = c m_raw.extend([c]) continue if 'CLASS' == state: # got ID state = 'ID' m_id = c m_raw.extend([c]) continue if 'ID' == state: # got first length state = 'LEN1' m_len = c m_raw.extend([c]) continue if 'LEN1' == state: # got second length m_raw.extend([c]) m_len += 256 * c if 0 == m_len: # no payload state = 'CSUM1' else: state = 'PAYLOAD' continue if 'PAYLOAD' == state: # getting payload m_raw.extend([c]) m_payload.extend([c]) if len(m_payload) == m_len: state = 'CSUM1' continue if 'CSUM1' == state: # got ck_a state = 'CSUM2' m_ck_a = c continue if 'CSUM2' == state: # ck_b state = 'BASE' m_ck_b = c # check checksum chk = self.checksum(m_raw, len(m_raw)) if (chk[0] != m_ck_a) or (chk[1] != m_ck_b): sys.stderr.write("%s: ERROR checksum failed," "was (%d,%d) s/b (%d, %d)\n" % (PROG_NAME, m_ck_a, m_ck_b, chk[0], chk[1])) s_payload = ''.join('{:02x} '.format(x) for x in m_payload) x_payload = ','.join(['%02x' % x for x in m_payload]) if m_class in self.classes: this_class = self.classes[m_class] if 'ids' in this_class: if m_id in this_class['ids']: if 'dec' in this_class['ids'][m_id]: dec = this_class['ids'][m_id]['dec'] s_payload = this_class['ids'][m_id]['name'] s_payload += ':\n' s_payload += dec(self, m_payload) else: s_payload = ("%s, len %#x, raw %s" % (self.class_id_s(m_class, m_id), m_len, x_payload)) if VERB_INFO <= opts['verbosity']: print("%s, len: %#x" % (self.class_id_s(m_class, m_id), m_len)) print("payload: %s" % x_payload) print("%s\n" % s_payload) return consumed # give up state = 'BASE' # fell out of loop, no more chars to look at return 0 def checksum(self, msg, m_len): """Calculate u-blox message checksum""" # the checksum is calculated over the Message, starting and including # the CLASS field, up until, but excluding, the Checksum Field: ck_a = 0 ck_b = 0 for c in msg[0:m_len]: ck_a += c ck_b += ck_a return [ck_a & 0xff, ck_b & 0xff] def make_pkt(self, m_class, m_id, m_data): """Make a message packet""" # always little endian, leader, class, id, length m_len = len(m_data) # build core message msg = bytearray(m_len + 6) struct.pack_into(' opts['protver']: # UBX-NAV-SOL is ECEF. deprecated in protver 14, gone in protver 27 m_data = bytearray([0x01, 0x06, rate]) gps_model.gps_send(6, 1, m_data) else: # UBX-NAV-PVT m_data = bytearray([0x01, 0x07, rate]) gps_model.gps_send(6, 1, m_data) # UBX-NAV-TIMEGPS # Note: UTC may, or may not be UBX-NAV-TIMEGPS. # depending on UBX-CFG-NAV5 utcStandard # Note: We use TIMEGPS to get the leapS m_data = bytearray([0x01, 0x20, rate]) gps_model.gps_send(6, 1, m_data) # no point doing UBX-NAV-SBAS and UBX-NAV-SVINFO # faster than every 10 seconds if rate: rate_s = 10 else: rate_s = 0 if 27 > opts['protver']: # UBX-NAV-SBAS, gone in protver 27 m_data = bytearray([0x01, 0x32, rate_s]) gps_model.gps_send(6, 1, m_data) # get Satellite Information if 15 > opts['protver']: # UBX-NAV-SVINFO - deprecated in protver 15, gone in 27 m_data = bytearray([0x01, 0x30, rate_s]) gps_model.gps_send(6, 1, m_data) # UBX-NAV-SAT turn it off, if we can m_data = bytearray([0x01, 0x35, 0]) gps_model.gps_send(6, 1, m_data) else: # use UBX-NAV-SAT for protver 15 and up m_data = bytearray([0x01, 0x35, rate_s]) gps_model.gps_send(6, 1, m_data) if 27 > opts['protver']: # UBX-NAV-SVINFO turn it off, if we can m_data = bytearray([0x01, 0x30, 0]) gps_model.gps_send(6, 1, m_data) if 18 <= opts['protver']: # first in u-blox 8 # UBX-NAV-EOE, end of epoch. Good cycle ender m_data = bytearray([0x01, 0x61, rate]) gps_model.gps_send(6, 1, m_data) def send_able_ecef(self, able): """Enable ECEF messages""" # set NAV-POSECEF rate gps_model.send_cfg_msg(1, 1, able) # set NAV-VELECEF rate gps_model.send_cfg_msg(1, 0x11, able) def send_able_gps(self, able): """dis/enable GPS/QZSS""" # GPS and QZSS both on, or both off, together # GPS gps_model.send_cfg_gnss1(0, able) # QZSS gps_model.send_cfg_gnss1(5, able) def send_able_galileo(self, able): """dis/enable GALILEO""" gps_model.send_cfg_gnss1(2, able) def send_able_glonass(self, able): """dis/enable GLONASS""" # Two frequency GPS use BeiDou or GLONASS # disable, then enable gps_model.send_cfg_gnss1(6, able) def send_able_nmea(self, able): """dis/enable basic NMEA messages""" rate = 1 if able else 0 # xxGBS m_data = bytearray([0xf0, 0x09, rate]) gps_model.gps_send(6, 1, m_data) # xxGGA m_data = bytearray([0xf0, 0x00, rate]) gps_model.gps_send(6, 1, m_data) # xxGGL m_data = bytearray([0xf0, 0x01, rate]) gps_model.gps_send(6, 1, m_data) # xxGSA m_data = bytearray([0xf0, 0x02, rate]) gps_model.gps_send(6, 1, m_data) # xxGST m_data = bytearray([0xf0, 0x07, rate]) gps_model.gps_send(6, 1, m_data) # xxGSV m_data = bytearray([0xf0, 0x03, rate]) gps_model.gps_send(6, 1, m_data) # xxRMC m_data = bytearray([0xf0, 0x04, rate]) gps_model.gps_send(6, 1, m_data) # xxVTG m_data = bytearray([0xf0, 0x05, rate]) gps_model.gps_send(6, 1, m_data) # xxZDA m_data = bytearray([0xf0, 0x08, rate]) gps_model.gps_send(6, 1, m_data) def send_able_rawx(self, able): """dis/enable UBX-RXM-RAW/RAWXX""" rate = 1 if able else 0 if 15 > opts['protver']: # u-blox 7 or earlier, use RAW sid = 0x10 else: # u-blox 8 or later, use RAWX sid = 0x15 m_data = bytearray([0x2, sid, rate]) gps_model.gps_send(6, 1, m_data) def send_able_pps(self, able): """dis/enable PPS, using UBX-CFG-TP5""" m_data = bytearray(32) m_data[0] = 0 # tpIdx m_data[1] = 1 # version m_data[2] = 0 # reserved m_data[3] = 0 # reserved m_data[4] = 2 # antCableDelay m_data[5] = 0 # antCableDelay m_data[6] = 0 # rfGroupDelay m_data[7] = 0 # rfGroupDelay m_data[8] = 0x40 # freqPeriod m_data[9] = 0x42 # freqPeriod m_data[10] = 0x0f # freqPeriod m_data[11] = 0 # freqPeriod m_data[12] = 0x40 # freqPeriodLock m_data[13] = 0x42 # freqPeriodLock m_data[14] = 0x0f # freqPeriodLock m_data[15] = 0 # freqPeriodLock m_data[16] = 0 # pulseLenRatio m_data[17] = 0 # pulseLenRatio m_data[18] = 0 # pulseLenRatio m_data[19] = 0 # pulseLenRatio m_data[20] = 0xa0 # pulseLenRatioLock m_data[21] = 0x86 # pulseLenRatioLock m_data[22] = 0x1 # pulseLenRatioLock m_data[23] = 0 # pulseLenRatioLock m_data[24] = 0 # userConfigDelay m_data[25] = 0 # userConfigDelay m_data[26] = 0 # userConfigDelay m_data[27] = 0 # userConfigDelay m_data[28] = 0x77 # flags m_data[29] = 0 # flags m_data[30] = 0 # flags m_data[31] = 0 # flags gps_model.gps_send(6, 0x31, m_data) def send_able_sbas(self, able): """dis/enable SBAS""" gps_model.send_cfg_gnss1(1, able) def send_able_sfrbx(self, able): """dis/enable UBX-RXM-SFRB/SFRBX""" rate = 1 if able else 0 if 15 > opts['protver']: # u-blox 7 or earlier, use SFRB sid = 0x11 else: # u-blox 8 or later, use SFRBX sid = 0x13 m_data = bytearray([0x2, sid, rate]) gps_model.gps_send(6, 1, m_data) def send_able_tmode2(self, able): """SURVEYIN, UBX-CFG-TMODE2, set time mode 2 config""" m_data = bytearray(28) if able: # enable survey-in m_data[0] = 1 # on a NEO-M8T, with good antenna # five minutes, gets about 1 m # ten minutes, gets about 0.9 m # twenty minutes, gets about 0.7 m # one hour, gets about 0.5 m # twelve hours, gets about 0.14 m # Survey-in minimum duration seconds seconds = 300 m_data[20] = seconds & 0x0ff seconds >>= 8 m_data[21] = seconds & 0x0ff seconds >>= 8 m_data[22] = seconds & 0x0ff seconds >>= 8 m_data[23] = seconds & 0x0ff # Survey-in position accuracy limit in mm # make it big, so the duration decides when to end survey mmeters = 50000 m_data[24] = mmeters & 0x0ff mmeters >>= 8 m_data[25] = mmeters & 0x0ff mmeters >>= 8 m_data[26] = seconds & 0x0ff seconds >>= 8 m_data[27] = mmeters & 0x0ff gps_model.gps_send(6, 0x3d, m_data) def send_able_tp(self, able): """dis/enable UBX-TIM-TP Time Pulse""" rate = 1 if able else 0 m_data = bytearray([0xd, 0x1, rate]) gps_model.gps_send(6, 1, m_data) def send_cfg_cfg(self, save_clear): """UBX-CFG-CFG, save config""" # Save: save_clear = 0 # Clear: save_clear = 1 # basic configs always available to change: # ioPort, msgConf, infMsg, navConf, rxmConf cfg1 = 0x1f # senConf, rinvConf, antConf, logConf cfg2 = 0x0f m_data = bytearray(13) # clear # as of protver 27, any bit in clearMask clears all if 0 == save_clear: # saving m_data[0] = 0 m_data[1] = 0 else: # clearing m_data[0] = cfg1 m_data[1] = cfg2 m_data[2] = 0 # m_data[3] = 0 # # save # as of protver 27, any bit in saveMask saves all if 0 == save_clear: # saving m_data[4] = cfg1 m_data[5] = cfg2 else: # clearing m_data[4] = 0 m_data[5] = 0 m_data[6] = 0 # m_data[7] = 0 # # load # as of protver 27, any bit in loadMask loads all if False and 0 == save_clear: # saving m_data[8] = 0 m_data[9] = 0 else: # clearing, load it to save a reboot m_data[8] = cfg1 m_data[9] = cfg2 m_data[10] = 0 # m_data[11] = 0 # # deviceMask, where to save it, try all options m_data[12] = 0x17 # devBBR, devFLASH devEEPROM, devSpiFlash gps_model.gps_send(6, 0x9, m_data) def send_cfg_gnss1(self, gnssId, enable): """UBX-CFG-GNSS, set GNSS config""" m_data = bytearray(12) m_data[0] = 0 # version 0, msgVer m_data[1] = 0 # read only, numTrkChHw m_data[2] = 0xFF # read only, numTrkChUse m_data[3] = 1 # 1 block follows # block 1 m_data[4] = gnssId # gnssId if 0 == gnssId: # GPS m_data[5] = 8 # resTrkCh m_data[6] = 16 # maxTrkCh if 1 == gnssId: # SBAS m_data[5] = 1 # resTrkCh m_data[6] = 3 # maxTrkCh if 2 == gnssId: # GALILEO m_data[5] = 4 # resTrkCh m_data[6] = 8 # maxTrkCh if 3 == gnssId: # BeiDou m_data[5] = 2 # resTrkCh m_data[6] = 16 # maxTrkCh if 4 == gnssId: # IMES m_data[5] = 0 # resTrkCh m_data[6] = 8 # maxTrkCh if 5 == gnssId: # QZSS m_data[5] = 0 # resTrkCh m_data[6] = 3 # maxTrkCh if 6 == gnssId: # GLONASS m_data[5] = 8 # resTrkCh m_data[6] = 14 # maxTrkCh m_data[7] = 0 # reserved1 m_data[8] = enable # flags m_data[9] = 0 # flagflags, unused if 5 == gnssId: # QZSS m_data[10] = 5 # flags E1OS, L1SAIF else: m_data[10] = 1 # flags E1OS m_data[11] = 1 # flags, unused gps_model.gps_send(6, 0x3e, m_data) def poll_cfg_inf(self): """UBX-CFG-INF, poll""" m_data = bytearray(1) m_data[0] = 0 # UBX gps_model.gps_send(6, 0x02, m_data) m_data[0] = 1 # NMEA gps_model.gps_send(6, 0x02, m_data) def send_cfg_nav5_model(self): """UBX-CFG-NAV5, set dynamic platform model""" m_data = bytearray(36) m_data[0] = 1 # just setting dynamic model m_data[1] = 0 # just setting dynamic model m_data[2] = opts["mode"] gps_model.gps_send(6, 0x24, m_data) def send_cfg_msg(self, m_class, m_id, rate=None): """UBX-CFG-MSG, poll, or set, message rates decode""" m_data = bytearray(2) m_data[0] = m_class m_data[1] = m_id if rate is not None: m_data.extend([rate]) gps_model.gps_send(6, 1, m_data) def send_cfg_pms(self): """UBX-CFG-PMS, poll/set Power Management Settings""" if opts["mode"] is not None: m_data = bytearray(8) # set powerSetupValue to mode m_data[1] = opts["mode"] # leave period and onTime zero, which breaks powerSetupValue = 3 else: m_data = bytearray(0) gps_model.gps_send(6, 0x86, m_data) def send_cfg_prt(self): """UBX-CFG-PRT, get I/O Port settings""" port = opts['interface'] if port is None: m_data = bytearray() else: m_data = bytearray([port]) gps_model.gps_send(6, 0x0, m_data) def send_set_speed(self, speed): """"UBX-CFG-PRT, set port""" port = opts['interface'] # FIXME! Determine and use current port as default if port is None: port = 1 # Default to port 1 (UART/UART_1) if port not in set([1, 2]): sys.stderr.write('%s: Invalid UART port - %d\n' % (PROG_NAME, port)) sys.exit(2) # FIXME! Poll current masks, then adjust speed m_data = bytearray(20) m_data[0] = port m_data[4] = 0xc0 # 8N1 m_data[5] = 0x8 # 8N1 m_data[8] = speed & 0xff m_data[9] = (speed >> 8) & 0xff m_data[10] = (speed >> 16) & 0xff m_data[11] = (speed >> 24) & 0xff m_data[12] = 3 # in, ubx and nmea m_data[14] = 3 # out, ubx and nmea gps_model.gps_send(6, 0, m_data) def send_cfg_rst(self, reset_type): """UBX-CFG-RST, reset""" # always do a hardware reset # if on USB, this will disconnect and reconnect, giving you # a new tty. m_data = bytearray(4) m_data[0] = reset_type & 0xff m_data[1] = (reset_type >> 8) & 0xff gps_model.gps_send(6, 0x4, m_data) def send_cfg_tp5(self): """UBX-CFG-TP5, get time0 decodes, timepulse 0 and 1""" m_data = bytearray(0) gps_model.gps_send(6, 0x31, m_data) # and timepulse 1 m_data = bytearray(1) m_data[0] = 1 gps_model.gps_send(6, 0x31, m_data) def send_mon_comms(self): """UBX-MON-COMMS Comm port information""" m_data = bytearray(0) gps_model.gps_send(0x0a, 0x36, m_data) def send_cfg_valdel(self, key): """UBX-CFG-VALDEL, delete config items by key""" m_data = bytearray(4) m_data[0] = 0 # version, 0 = transactionless, 1 = transaction m_data[1] = 6 # 2 = BBR, 4 = flash # can not delete RAM layer! # so options stay set until reset! for key in keys: k_data = bytearray(4) k_data[0] = (key) & 0xff k_data[1] = (key >> 8) & 0xff k_data[2] = (key >> 16) & 0xff k_data[3] = (key >> 24) & 0xff m_data.extend(k_data) gps_model.gps_send(0x06, 0x8c, m_data) def send_cfg_valget(self, keys): """UBX-CFG-VALGET, get config items by key""" m_data = bytearray(4) m_data[0] = 0 # version, 0 = request, 1 = answer m_data[1] = 0 # RAM layer for key in keys: k_data = bytearray(4) k_data[0] = (key) & 0xff k_data[1] = (key >> 8) & 0xff k_data[2] = (key >> 16) & 0xff k_data[3] = (key >> 24) & 0xff m_data.extend(k_data) gps_model.gps_send(0x06, 0x8b, m_data) def send_cfg_valset(self, nvs): """UBX-CFG-VALSET, set config items by key/val pairs""" m_data = bytearray(4) m_data[0] = 0 # version, 0 = request, 1 = transaction m_data[1] = 0x7 # RAM layer, 1=RAM, 2=BBR, 4=Flash for nv in nvs: size = 4 nv_split = nv.split(',') name = nv_split[0] val = nv_split[1] item = gps_model.cfg_by_name(name) key = item[1] val_type = item[2] cfg_type = self.item_to_type(item) size = 4 + cfg_type[0] frmat = cfg_type[1] flavor = cfg_type[2] if 'u' == flavor: val1 = int(val) elif 'i' == flavor: val1 = int(val) elif 'f' == flavor: val1 = float(val) kv_data = bytearray(size) kv_data[0] = (key) & 0xff kv_data[1] = (key >> 8) & 0xff kv_data[2] = (key >> 16) & 0xff kv_data[3] = (key >> 24) & 0xff struct.pack_into(frmat, kv_data, 4, val1) m_data.extend(kv_data) gps_model.gps_send(0x06, 0x8a, m_data) def send_poll(self, m_data): """generic send poll request""" gps_model.gps_send(m_data[0], m_data[1], m_data[2:]) able_commands = { # en/dis able BeiDou "BEIDOU": {"command": send_able_beidou, "help": "BeiDou"}, # en/dis able basic binary messages "BINARY": {"command": send_able_binary, "help": "basic binary messages"}, # en/dis able ECEF "ECEF": {"command": send_able_ecef, "help": "ECEF"}, # en/dis able GPS "GPS": {"command": send_able_gps, "help": "GPS and QZSS"}, # en/dis able GALILEO "GALILEO": {"command": send_able_galileo, "help": "GALILEO"}, # en/dis able GLONASS "GLONASS": {"command": send_able_glonass, "help": "GLONASS"}, # en/dis able basic NMEA messages "NMEA": {"command": send_able_nmea, "help": "basic NMEA messages"}, # en/dis able RAW/RAWX "RAWX": {"command": send_able_rawx, "help": "RAW/RAWX measurements"}, # en/dis able PPS "PPS": {"command": send_able_pps, "help": "PPS on TIMPULSE"}, # en/dis able SBAS "SBAS": {"command": send_able_sbas, "help": "SBAS"}, # en/dis able SFRB/SFRBX "SFRBX": {"command": send_able_sfrbx, "help": "SFRB/SFRBX subframes"}, # en/dis able TP time pulse message "TP": {"command": send_able_tp, "help": "TP Time Pulse message"}, # en/dis able TMODE2 Survey-in "SURVEYIN": {"command": send_able_tmode2, "help": "Survey-in mode with TMODE2"}, } commands = { # UBX-CFG-ANT "CFG-ANT": {"command": send_poll, "opt": [0x06, 0x13], "help": "poll UBX-CFG-ANT antenna config"}, # UBX-CFG-GNSS "CFG-GNSS": {"command": send_poll, "opt": [0x06, 0x3e], "help": "poll UBX-CFG-GNSS GNSS config"}, # UBX-CFG-INF "CFG-INF": {"command": poll_cfg_inf, "help": "poll UBX-CFG-INF Information Message " "Configuration"}, # UBX-CFG-NAV5 "CFG-NAV5": {"command": send_poll, "opt": [0x06, 0x24], "help": "poll UBX-CFG-NAV5 Nav Engines settings"}, # UBX-CFG-NAVX5 "CFG-NAVX5": {"command": send_poll, "opt": [0x06, 0x23], "help": "poll UBX-CFG-NAVX5 Nav Expert Settings"}, # UBX-CFG-NMEA "CFG-NMEA": {"command": send_poll, "opt": [0x06, 0x17], "help": "poll UBX-CFG-NMEA Extended NMEA protocol " "configuration V1"}, # UBX-CFG-ODO "CFG-ODO": {"command": send_poll, "opt": [0x06, 0x1e], "help": "poll UBX-CFG-ODO Odometer, Low-speed COG " "Engine Settings"}, # UBX-CFG-PMS "CFG-PMS": {"command": send_poll, "opt": [0x06, 0x86], "help": "poll UBX-CFG-PMS power management settings"}, # UBX-CFG-PRT "CFG-PRT": {"command": send_cfg_prt, "help": "poll UBX-CFG-PRT I/O port settings"}, # UBX-CFG-RATE "CFG-RATE": {"command": send_poll, "opt": [0x06, 0x08], "help": "poll UBX-CFG-RATE Navigation/Measurement " "Rate Settings"}, # UBX-CFG-RINV "CFG-RINV": {"command": send_poll, "opt": [0x06, 0x34], "help": "poll UBX-CFG-RINV Contents of Remote Inventory"}, # UBX-CFG-SBAS "CFG-SBAS": {"command": send_poll, "opt": [0x06, 0x16], "help": "poll UBX-CFG-SBAS SBAS settings"}, # UBX-CFG-SMGR "CFG-SMGR": {"command": send_poll, "opt": [0x06, 0x62], "help": "poll UBX-CFG-SMGR Synchronization manager " " configuration"}, # UBX-CFG-TMODE2 "CFG-TMODE2": {"command": send_poll, "opt": [0x06, 0x3d], "help": "poll UBX-CFG-TMODE2 time mode 2 config"}, # UBX-CFG-TP5 "CFG-TP5": {"command": send_cfg_tp5, "help": "poll UBX-TIM-TP5 time pulse decodes"}, # UBX-CFG-USB "CFG-USB": {"command": send_poll, "opt": [0x06, 0x1b], "help": "poll UBX-CFG-USB USB config"}, # UBX-CFG-RST "COLDBOOT": {"command": send_cfg_rst, "help": "UBS-CFG-RST coldboot the GPS", "opt": 0xfff}, # UBX-CFG-RST "HOTBOOT": {"command": send_cfg_rst, "help": "UBX-CFG-RST hotboot the GPS", "opt": 0}, # UBX-CFG-NAV5 "MODEL": {"command": send_cfg_nav5_model, "help": "set UBX-CFG-NAV5 Dynamic Platform Model"}, # UBX-MON-COMMS "MON-COMMS": {"command": send_mon_comms, "help": "poll UBX-MON-COMMS Comm port " " information (27+)"}, # UBX-MON-GNSS "MON-GNSS": {"command": send_poll, "opt": [0x0a, 0x28], "help": "poll UBX-MON-GNSS major GNSS selection"}, # UBX-MON-HW "MON-HW": {"command": send_poll, "opt": [0x0a, 0x09], "help": "poll UBX-MON-HW Hardware Status"}, # UBX-MON-HW2 "MON-HW2": {"command": send_poll, "opt": [0x0a, 0x0b], "help": "poll UBX-MON-HW2 Exended Hardware Status"}, # UBX-MON-HW3 "MON-HW3": {"command": send_poll, "opt": [0x0a, 0x37], "help": "poll UBX-MON-HW3 HW I/O pin infromation"}, # UBX-MON-IO "MON-IO": {"command": send_poll, "opt": [0x0a, 0x02], "help": "poll UBX-MON-IO I/O Subsystem Status"}, # UBX-MON-MSGPP "MON-MSGPP": {"command": send_poll, "opt": [0x0a, 0x06], "help": "poll UBX-MON-MSGPP Message Parese and " "Process Status"}, # UBX-MON-PATCH "MON-PATCH": {"command": send_poll, "opt": [0x0a, 0x27], "help": "poll UBX-MON-PATCH Info on Installed Patches"}, # UBX-MON-RF "MON-RF": {"command": send_poll, "opt": [0x0a, 0x38], "help": "poll UBX-MON-RF RF Information"}, # UBX-MON-RXBUF "MON-RXBUF": {"command": send_poll, "opt": [0x0a, 0x07], "help": "poll UBX-MON-RXBUF Receiver Buffer Status"}, # UBX-MON-SMGR "MON-SMGR": {"command": send_poll, "opt": [0x0a, 0x2e], "help": "poll UBX-MON-SMGR Synchronization manager " "configuration"}, # UBX-MON-TXBUF "MON-TXBUF": {"command": send_poll, "opt": [0x0a, 0x08], "help": "poll UBX-MON-TXBUF Transmitter Buffer Status"}, # UBX-MON-VER "MON-VER": {"command": send_poll, "opt": [0x0a, 0x04], "help": "poll UBX-MON-VER GPS version"}, # UBX-NAV-CLOCK "NAV-CLOCK": {"command": send_poll, "opt": [0x01, 0x22], "help": "poll UBX-NAV-CLOCK Clock Solution"}, # UBX-NAV-DGPS "NAV-DGPS": {"command": send_poll, "opt": [0x01, 0x31], "help": "poll UBX-NAV-DGPS DGPS Data Used for NAV"}, # UBX-NAV-DOP "NAV-DOP": {"command": send_poll, "opt": [0x01, 0x04], "help": "poll UBX-NAV-DOP Dilution of Precision"}, # UBX-NAV-GEOFENCE "NAV-GEOFENCE": {"command": send_poll, "opt": [0x01, 0x39], "help": "poll UBX-NAV-GEOFENCE Geofence status"}, # UBX-NAV-HPPOSECEF "NAV-HPPOSECEF": {"command": send_poll, "opt": [0x01, 0x13], "help": "poll UBX-NAV-HPPOSECEF ECEF position"}, # UBX-NAV-HPPOSLLH "NAV-HPPOSLLH": {"command": send_poll, "opt": [0x01, 0x14], "help": "poll UBX-NAV-HPPOSECEF LLH position"}, # UBX-NAV-POSECEF "NAV-POSECEF": {"command": send_poll, "opt": [0x01, 0x01], "help": "poll UBX-NAV-POSECEF ECEF position"}, # UBX-NAV-POSLLH "NAV-POSLLH": {"command": send_poll, "opt": [0x01, 0x02], "help": "poll UBX-NAV-POSLLH LLH position"}, # UBX-NAV-PVT "NAV-PVT": {"command": send_poll, "opt": [0x01, 0x07], "help": "poll UBX-NAV-PVT Navigation Position Velocity " "Time Solution"}, # UBX-NAV-SAT "NAV-SAT": {"command": send_poll, "opt": [0x01, 0x35], "help": "poll UBX-NAV-SAT Satellite Information"}, # UBX-NAV-SBAS "NAV-SBAS": {"command": send_poll, "opt": [0x01, 0x32], "help": "poll UBX-NAV-SBAS SBAS Status Data"}, # UBX-NAV-SIG "NAV-SIG": {"command": send_poll, "opt": [0x01, 0x43], "help": "poll UBX-NAV-SIG Signal Information"}, # UBX-NAV-SOL "NAV-SOL": {"command": send_poll, "opt": [0x01, 0x06], "help": "poll UBX-NAV-SOL Navigation Solution " "Information"}, # UBX-NAV-STATUS "NAV-STATUS": {"command": send_poll, "opt": [0x01, 0x03], "help": "poll UBX-NAV-STATUS Receiver Nav Status"}, # UBX-NAV-TIMEBDS "NAV-TIMEBDS": {"command": send_poll, "opt": [0x01, 0x24], "help": "poll UBX-NAV-TIMEBDS BDS Time Solution"}, # UBX-NAV-TIMEGAL "NAV-TIMEGAL": {"command": send_poll, "opt": [0x01, 0x25], "help": "poll UBX-NAV-TIMEGAL Galileo Time Solution"}, # UBX-NAV-TIMEGLO "NAV-TIMEGLO": {"command": send_poll, "opt": [0x01, 0x23], "help": "poll UBX-NAV-TIMEGLO GLO Time Solution"}, # UBX-NAV-TIMEGPS "NAV-TIMEGPS": {"command": send_poll, "opt": [0x01, 0x20], "help": "poll UBX-NAV-TIMEGPS GPS Time Solution"}, # UBX-NAV-TIMELS "NAV-TIMELS": {"command": send_poll, "opt": [0x01, 0x26], "help": "poll UBX-NAV-TIMELS Leap Second Info"}, # UBX-NAV-TIMEUTC "NAV-TIMEUTC": {"command": send_poll, "opt": [0x01, 0x21], "help": "poll UBX-NAV-TIMEUTC UTC Time Solution"}, # UBX-NAV-VELECEF "NAV-VELECEF": {"command": send_poll, "opt": [0x01, 0x11], "help": "poll UBX-NAV-VELECEF ECEF velocity"}, # UBX-NAV-VELNED "NAV-VELNED": {"command": send_poll, "opt": [0x01, 0x12], "help": "poll UBX-NAV-VELNED NED velocity"}, # UBX-CFG-PMS "PMS": {"command": send_cfg_pms, "help": "set UBX-CFG-PMS power management settings"}, # UBX-RXM-RAWX "RXM-RAWX": {"command": send_poll, "opt": [0x02, 0x15], "help": "poll UBX-RXM-RAWX raw measurement data"}, # UBX-CFG-CFG "RESET": {"command": send_cfg_cfg, "help": "UBX-CFG-CFG reset config to defaults", "opt": 1}, # UBX-CFG-CFG "SAVE": {"command": send_cfg_cfg, "help": "UBX-CFG-CFG save current config", "opt": 0}, # UBX-CFG-SBAS "SEC-UNIQID": {"command": send_poll, "opt": [0x27, 0x03], "help": "poll UBX-SEC-UNIQID Unique chip ID"}, # UBX-TIM-SVIN "TIM-SVIN": {"command": send_poll, "opt": [0x0d, 0x04], "help": "poll UBX-TIM-SVIN survey in data"}, # UBX-TIM-TM2 "TIM-TM2": {"command": send_poll, "opt": [0x0d, 0x03], "help": "poll UBX-TIM-TM2 time mark data"}, # UBX-TIM-TP "TIM-TP": {"command": send_poll, "opt": [0x0d, 0x01], "help": "poll UBX-TIM-TP time pulse timedata"}, # UBX-CFG-RST "WARMBOOT": {"command": send_cfg_rst, "help": "UBX-CFG-RST warmboot the GPS", "opt": 1}, } # end class ubx class gps_io(object): """All the GPS I/O in one place" Three types of GPS I/O 1. read only from a file 2. read/write through a device 3. read only from a gpsd instance """ out = b'' ser = None input_is_device = False def __init__(self): """Initialize class""" Serial = serial Serial_v3 = Serial and Serial.VERSION.split('.')[0] >= '3' # buffer to hold read data self.out = b'' # open the input: device, file, or gpsd if opts['input_file_name'] is not None: # check if input file is a file or device try: mode = os.stat(opts['input_file_name']).st_mode except OSError: sys.stderr.write('%s: failed to open input file %s\n' % (PROG_NAME, opts['input_file_name'])) sys.exit(1) if stat.S_ISCHR(mode): # character device, need not be read only self.input_is_device = True if ((opts['disable'] or opts['enable'] or opts['poll'] or opts['oaf_name'])): # check that we can write if opts['read_only']: sys.stderr.write('%s: read-only mode, ' 'can not send commands\n' % PROG_NAME) sys.exit(1) if self.input_is_device is False: sys.stderr.write('%s: input is plain file, ' 'can not send commands\n' % PROG_NAME) sys.exit(1) if opts['target']['server'] is not None: # try to open local gpsd try: self.ser = gps.gpscommon(host=None) self.ser.connect(opts['target']['server'], opts['target']['port']) # alias self.ser.write() to self.write_gpsd() self.ser.write = self.write_gpsd # ask for raw, not rare, data data_out = b'?WATCH={' if opts['target']['device'] is not None: # add in the requested device data_out += (b'"device":"' + opts['target']['device'] + b'",') data_out += b'"enable":true,"raw":2}\r\n' if VERB_RAW <= opts['verbosity']: print("sent: ", data_out) self.ser.send(data_out) except socket.error as err: sys.stderr.write('%s: failed to connect to gpsd %s\n' % (PROG_NAME, err)) sys.exit(1) elif self.input_is_device: # configure the serial connections (the parameters refer to # the device you are connecting to) # pyserial Ver 3.0+ changes writeTimeout to write_timeout # Using the wrong one causes an error write_timeout_arg = ('write_timeout' if Serial_v3 else 'writeTimeout') try: self.ser = Serial.Serial( baudrate=opts['input_speed'], # 8N1 is UBX default bytesize=Serial.EIGHTBITS, parity=Serial.PARITY_NONE, port=opts['input_file_name'], stopbits=Serial.STOPBITS_ONE, # read timeout timeout=0.05, **{write_timeout_arg: 0.5} ) except AttributeError: sys.stderr.write('%s: failed to import pyserial\n' % PROG_NAME) sys.exit(2) except Serial.serialutil.SerialException: # this exception happens on bad serial port device name sys.stderr.write('%s: failed to open serial port "%s"\n' '%s: Your computer has the serial ports:\n' % (PROG_NAME, opts['input_file_name'], PROG_NAME)) # print out list of supported ports import serial.tools.list_ports as List_Ports ports = List_Ports.comports() for port in ports: sys.stderr.write(" %s: %s\n" % (port.device, port.description)) sys.exit(1) # flush input buffer, discarding all its contents # pyserial 3.0+ deprecates flushInput() in favor of # reset_input_buffer(), but flushInput() is still present. self.ser.flushInput() else: # Read from a plain file of UBX messages try: self.ser = open(opts['input_file_name'], 'rb') except IOError: sys.stderr.write('%s: failed to open input %s\n' % (PROG_NAME, opts['input_file_name'])) sys.exit(1) def read(self, read_opts): """Read from device, until timeout or expected message""" # are we expecting a certain message? if gps_model.expect_statement_identifier: # assume failure, until we see expected message ret_code = 1 else: # not expecting anything, so OK if we did not see it. ret_code = 0 try: if read_opts['target']['server'] is not None: # gpsd input start = time.clock() while read_opts['input_wait'] > (time.clock() - start): # First priority is to be sure the input buffer is read. # This is to prevent input buffer overuns if 0 < self.ser.waiting(): # We have serial input waiting, get it # No timeout possible # RTCM3 JSON can be over 4.4k long, so go big new_out = self.ser.sock.recv(8192) if raw is not None: # save to raw file raw.write(new_out) self.out += new_out consumed = gps_model.decode_msg(self.out) self.out = self.out[consumed:] if ((gps_model.expect_statement_identifier and (gps_model.expect_statement_identifier == gps_model.last_statement_identifier))): # Got what we were waiting for. Done? ret_code = 0 if not read_opts['input_forced_wait']: # Done break elif self.input_is_device: # input is a serial device start = time.clock() while read_opts['input_wait'] > (time.clock() - start): # First priority is to be sure the input buffer is read. # This is to prevent input buffer overuns # pyserial 3.0+ deprecates inWaiting() in favor of # in_waiting, but inWaiting() is still present. if 0 < self.ser.inWaiting(): # We have serial input waiting, get it # 1024 is comfortably large, almost always the # Read timeout is what causes ser.read() to return new_out = self.ser.read(1024) if raw is not None: # save to raw file raw.write(new_out) self.out += new_out consumed = gps_model.decode_msg(self.out) self.out = self.out[consumed:] if ((gps_model.expect_statement_identifier and (gps_model.expect_statement_identifier == gps_model.last_statement_identifier))): # Got what we were waiting for. Done? ret_code = 0 if not read_opts['input_forced_wait']: # Done break else: # ordinary file, so all read at once self.out += self.ser.read() if raw is not None: # save to raw file raw.write(self.out) while True: consumed = gps_model.decode_msg(self.out) self.out = self.out[consumed:] if 0 >= consumed: break except IOError: # This happens on a good device name, but gpsd already running. # or if USB device unplugged sys.stderr.write('%s: failed to read %s\n' '%s: Is gpsd already holding the port?\n' % (PROG_NAME, read_opts['input_file_name'], PROG_NAME)) return 1 if 0 < ret_code: # did not see the message we were expecting to see sys.stderr.write('%s: waited %0.2f seconds for, ' 'but did not get: %%%s%%\n' % (PROG_NAME, read_opts['input_wait'], gps_model.expect_statement_identifier)) return ret_code def write_gpsd(self, data): """write data to gpsd daemon""" # HEXDATA_MAX = 512, from gps.h, The max hex digits can write. # Input data is binary, converting to hex doubles its size. # Limit binary data to length 255, so hex data length less than 510. if 255 < len(data): sys.stderr.write('%s: trying to send %d bytes, max is 255\n' % (PROG_NAME, len(data))) return 1 if opts['target']['device'] is not None: # add in the requested device data_out = b'?DEVICE={"path":"' + opts['target']['device'] + b'",' else: data_out = b'?DEVICE={' # Convert binary data to hex and build the message. data_out += b'"hexdata":"' + binascii.hexlify(data) + b'"}\r\n' if VERB_RAW <= opts['verbosity']: print("sent: ", data_out) self.ser.send(data_out) return 0 # instantiate the GPS class gps_model = ubx() def usage(): """Ouput usage information, and exit""" print('usage: %s [-c C] [-f F] [-r] [-p P] [-s S] [-v V]\n' ' [-hV?] [-S S]\n' ' -c C send raw command C (cls,id...) to GPS\n' ' -d D disable D\n' ' -e E enable E\n' ' -f F open F as file/device\n' ' default: %s\n' ' -g G get config item G\n' ' -h print help, increase -v for extra help\n' ' -i I interface (port) for UBX-CFG-PRT\n' ' -m M optional mode to -p P\n' ' -P P Protocol version for sending commands\n' ' default: %s\n' ' -p P send a prepackaged query P to GPS\n' ' -R R save raw data from GPS in file R\n' ' default: %s\n' ' -r open file/device read only\n' ' -S S set GPS speed to S\n' ' -s S set port speed to S\n' ' default: %s bps\n' ' -w W wait time W before exiting\n' ' default: %s seconds\n' ' -V print version\n' ' -v V Set verbosity level to V, 0 to 4\n' ' default: %s\n' ' -x X delete config item X\n' ' -z Z,z set config item Z to z\n' ' -? print help\n' '\n' % (PROG_NAME, opts['input_file_name'], opts['protver'], opts['raw_file'], opts['input_speed'], opts['input_wait'], opts['verbosity']) ) if VERB_DECODE <= opts['verbosity']: print('D and E can be one of:') for item in sorted(gps_model.able_commands.keys()): print(" %-13s %s" % (item, gps_model.able_commands[item]["help"])) print('\nP can be one of:') for item in sorted(gps_model.commands.keys()): print(" %-13s %s" % (item, gps_model.commands[item]["help"])) print('\n') if VERB_DECODE < opts['verbosity']: print('\nConfiguration items for -g, -x and -z can be one of:') for item in sorted(gps_model.cfgs): print(" %s\n" " %s" % (item[0], item[5])) print('\n') print('Options can be placed in the UBXOPTS environment variable.\n' 'UBXOPTS is processed before the CLI options.') sys.exit(0) if 'UBXOPTS' in os.environ: # grab the UBXOPTS environment variable for options opts['progopts'] = os.environ['UBXOPTS'] options = opts['progopts'].split(' ') + sys.argv[1:] else: options = sys.argv[1:] try: (options, arguments) = getopt.getopt(options, "?c:d:e:f:g:hi:m:rP:p:" "s:w:v:R:S:Vx:z:") except getopt.GetoptError as err: sys.stderr.write("%s: %s\n" "Try '%s -h' for more information.\n" % (PROG_NAME, str(err), PROG_NAME)) sys.exit(2) for (opt, val) in options: if opt == '-c': opts['command'] = val elif opt == '-d': opts['disable'] = val elif opt == '-e': opts['enable'] = val elif opt == '-f': opts['input_file_name'] = val elif opt == '-g': opts['get_item'].append(val) elif opt == '-h' or opt == '-?': opts['help'] = True elif opt == '-i': valnum = gps_model.port_id_map.get(val.upper()) opts['interface'] = valnum if valnum is not None else int(val) elif opt == '-m': opts['mode'] = int(val) elif opt == '-P': opts['protver'] = int(val) if 10 > opts['protver']: opts['protver'] = 10 if 27 < opts['protver']: opts['protver'] = 27 elif opt == '-p': opts['poll'] = val elif opt == '-r': opts['read_only'] = True elif opt == '-s': try: opts['input_speed'] = int(val) except ValueError: sys.stderr.write('%s: -s invalid speed %s\n' % (PROG_NAME, val)) sys.exit(1) if opts['input_speed'] not in gps_model.speeds: sys.stderr.write('%s: -s invalid speed %s\n' % (PROG_NAME, opts['input_speed'])) sys.exit(1) elif opt == '-w': opts['input_wait'] = float(val) elif opt in '-v': opts['verbosity'] = int(val) elif opt in '-R': # raw log file opts['raw_file'] = val elif opt in '-S': opts['set_speed'] = int(val) if opts['set_speed'] not in gps_model.speeds: sys.stderr.write('%s: -S invalid speed %s\n' % (PROG_NAME, opts['set_speed'])) sys.exit(1) elif opt == '-V': # version sys.stderr.write('%s: Version %s\n' % (PROG_NAME, gps_version)) sys.exit(0) elif opt == '-x': opts['del_item'].append(val) elif opt == '-z': opts['set_item'].append(val) if opts['help']: usage() if opts['input_file_name'] is None: # no input file given # default to local gpsd opts['target']['server'] = "localhost" opts['target']['port'] = gps.GPSD_PORT opts['target']['device'] = None if arguments: # server[:port[:device]] parts = arguments[0].split(':') opts['target']['server'] = parts[0] if 1 < len(parts): opts['target']['port'] = parts[1] if 2 < len(parts): opts['target']['device'] = parts[2] elif arguments: sys.stderr.write('%s: Both input file and server specified\n' % PROG_NAME) sys.exit(1) if VERB_PROG <= opts['verbosity']: # dump all options print('Options:') for option in sorted(opts): print(" %s: %s" % (option, opts[option])) # done parsing arguments from environment and CLI try: # raw log file requested? raw = None if opts['raw_file']: try: raw = open(opts['raw_file'], 'w') except IOError: sys.stderr.write('%s: failed to open raw file %s\n' % (PROG_NAME, opts['raw_file'])) sys.exit(1) # create the I/O instance io_handle = gps_io() sys.stdout.flush() if opts['disable'] is not None: if VERB_QUIET < opts['verbosity']: sys.stderr.write('%s: disable %s\n' % (PROG_NAME, opts['disable'])) if opts['disable'] in gps_model.able_commands: command = gps_model.able_commands[opts['disable']] command["command"](gps, 0) else: sys.stderr.write('%s: disable %s not found\n' % (PROG_NAME, opts['disable'])) sys.exit(1) elif opts['enable'] is not None: if VERB_QUIET < opts['verbosity']: sys.stderr.write('%s: enable %s\n' % (PROG_NAME, opts['enable'])) if opts['enable'] in gps_model.able_commands: command = gps_model.able_commands[opts['enable']] command["command"](gps, 1) else: sys.stderr.write('%s: enable %s not found\n' % (PROG_NAME, opts['enable'])) sys.exit(1) elif opts['poll'] is not None: if VERB_QUIET < opts['verbosity']: sys.stderr.write('%s: poll %s\n' % (PROG_NAME, opts['poll'])) if 'MODEL' == opts["poll"]: if opts["mode"] is None: opts["mode"] = 0 # default to portable model if opts['poll'] in gps_model.commands: command = gps_model.commands[opts['poll']] if 'opt' in command: command["command"](gps, command["opt"]) else: command["command"](gps) else: sys.stderr.write('%s: poll %s not found\n' % (PROG_NAME, opts['poll'])) sys.exit(1) elif opts['set_speed'] is not None: gps_model.send_set_speed(opts['set_speed']) elif opts['command'] is not None: cmd_list = opts['command'].split(',') try: cmd_data = [int(v, 16) for v in cmd_list] except ValueError: badarg = True else: data_or = reduce(operator.or_, cmd_data) badarg = data_or != data_or & 0xFF if badarg or len(cmd_list) < 2: sys.stderr.write('%s: Argument format (hex bytes) is' ' class,id[,payload...]\n' % PROG_NAME) sys.exit(1) payload = bytearray(cmd_data[2:]) if VERB_QUIET < opts['verbosity']: sys.stderr.write('%s: command %s\n' % (PROG_NAME, opts['command'])) gps_model.gps_send(cmd_data[0], cmd_data[1], payload) elif opts['del_item']: keys = [] for name in opts['del_item']: item = gps_model.cfg_by_name(name) if item: keys.append(item[1]) else: sys.stderr.write('%s: ERROR: item %s unknown\n' % (PROG_NAME, opts['del_item'])) exit(1) gps_model.send_cfg_valdel(keys) elif opts['get_item']: keys = [] for name in opts['get_item']: item = gps_model.cfg_by_name(name) if item: keys.append(item[1]) else: sys.stderr.write('%s: ERROR: item %s unknown\n' % (PROG_NAME, name)) exit(1) gps_model.send_cfg_valget(keys) elif opts['set_item']: nvs = [] for nv in opts['set_item']: (name, val) = nv.split(',') item = gps_model.cfg_by_name(name) if item: nvs.append(nv) else: sys.stderr.write('%s: ERROR: item %s unknown\n' % (PROG_NAME, opts['set_item'])) exit(1) gps_model.send_cfg_valset(nvs) exit_code = io_handle.read(opts) if ((VERB_RAW <= opts['verbosity']) and io_handle.out): # dump raw left overs print("Left over data:") print(io_handle.out) sys.stdout.flush() io_handle.ser.close() except KeyboardInterrupt: print('') exit_code = 1 sys.exit(exit_code)