#!/usr/bin/env python # -*- coding: UTF-8 ''' ubxtool -- u-blox configurator and packet decoder usage: ubxtool [OPTIONS] [server[:port[:device]]] ''' # This file is Copyright (c) 2018 by the GPSD project # BSD terms apply: see the file COPYING in the distribution root for details. # # This code runs compatibly under Python 2 and 3.x for x >= 2. # Preserve this property! # # ENVIRONMENT: # Options in the UBXOPTS environment variable will be parsed before # the CLI options. A handy place to put your '-f /dev/ttyXX -s SPEED' # # To see what constellations are enabled: # ubxtool -p GNSS -f /dev/ttyXX # # To disable GALILEO and enable GALILEO: # ubxtool -d GLONASS -f /dev/ttyXX # ubxtool -e GALILEO -f /dev/ttyXX # # To read GPS messages a log file: # ubxtool -v 2 -f test/daemon/ublox-neo-m8n.log from __future__ import absolute_import, print_function, division import binascii # for binascii.hexlify() import getopt # for getopt.getopt(), to parse CLI options import os # for os.environ import socket # for socket.error import stat # for stat.S_ISBLK() import struct # for pack() import sys import time PROG_NAME = 'ubxtool' try: import serial except ImportError: # treat serial as special since it is not part of standard Python sys.stderr.write("%s: failed to import pyserial\n" % PROG_NAME) sys.exit(2) try: import gps except ImportError: # PEP8 says local imports last sys.stderr.write("%s: failed to import gps, check PYTHON_PATH\n" % PROG_NAME) sys.exit(2) gps_version = '3.18' if gps.__version__ != gps_version: sys.stderr.write("%s: ERROR: need gps module version %s, got %s\n" % (PROG_NAME, gps_version, gps.__version__)) sys.exit(1) VERB_QUIET = 0 # quiet VERB_NONE = 1 # just output requested data and some info VERB_DECODE = 2 # decode all messages VERB_INFO = 3 # more info VERB_RAW = 4 # raw info VERB_PROG = 5 # program trace # dictionary to hold all user options opts = { # command to send to GPS, -c 'command': None, # command for -d disable 'disable': None, # command for -e enable 'enable': None, # default input -f file 'input_file_name': None, # default forced wait? -W 'input_forced_wait': False, # default port speed -s 'input_speed': 9600, # default input wait time -w in seconds 'input_wait': 2.0, # optional mode to -p P 'mode': None, # the name of an OAF file, extension .jpo 'oaf_name': None, # poll command -p 'poll': None, # raw log file name 'raw_file': None, # open port read only -r 'read_only': False, # speed to set GPS -S 'set_speed': None, # target gpsd (server:port:device) to connect to 'target': {"server": None, "port": gps.GPSD_PORT, "device": None}, # verbosity level, -v 'verbosity': VERB_NONE, # contents of environment variable UBXOPTS 'progopts': '', } class ubx(object): "class to hold u-blox stuff" # when a statement identifier is received, it is stored here last_statement_identifier = None # expected statement identifier. expect_statement_identifier = False def __init__(self): pass # allowable speeds speeds = (460800, 230400, 153600, 115200, 57600, 38400, 19200, 9600, 4800, 2400, 1200, 600, 300) # UBX Satellite Numbering gnss_id = {0: 'GPS', 1: 'SBAS', 2: 'Galileo', 3: 'BeiDou', 4: 'IMES', 5: 'QZSS', 6: 'GLONASS'} def ack_ack(self, buf): "UBX-ACK-ACK decode" m_len = len(buf) if 2 > m_len: return "Bad Length %s" % m_len u = struct.unpack_from(' m_len: return "Bad Length %s" % m_len u = struct.unpack_from('> 5) & 0x1f, (u[1] >> 10) & 0x1f, u[1] >> 15)) return s def cfg_cfg_mask(self, mask): "decode Mask in UBX-CFG-CFG, return string" s = '' if mask & 0x1: s += 'ioPort ' if mask & 0x2: s += 'msgConf ' if mask & 0x4: s += 'infMsg ' if mask & 0x8: s += 'navConf ' if mask & 0x10: s += 'rxmConf ' if mask & 0x80: # not on M8030 s += 'senConf ' if mask & 0x100: s += 'rinvConf ' if mask & 0x200: s += 'antConf ' if mask & 0x800: s += 'logConf ' if mask & 0x1000: s += 'ftsConf ' return s def cfg_cfg(self, buf): "UBX-CFG-CFG decode" m_len = len(buf) if 12 > m_len: return "Bad Length %s" % m_len if 12 == m_len: u = struct.unpack_from(' m_len: return "Bad Length %s" % m_len u = struct.unpack_from(' m_len: return "Bad Length %s" % m_len values = {0: "Full power", 1: "Balanced", 2: "Interval", 3: "Aggresive with 1Hz", 4: "Aggresive with 2Hz", 5: "Aggresive with 4Hz", 0xff: "Invalid" } u = struct.unpack_from(' m_len: return "Bad Length %s" % m_len u = struct.unpack_from(' m_len: return "Bad Length %s" % m_len u = struct.unpack_from(' m_len: return "Bad Length %s" % m_len u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len u = struct.unpack_from('> 7) & 0x0f gridToGpsDec = ('UTC', 'GPS', 'Glonass', 'BeiDou') syncMode = (u[10] >> 11) & 0x03 s += "gridToGps %s, syncMode %d " % (gridToGpsDec[gridToGps], syncMode) return s def cfg_usb(self, buf): "UBX-CFG-USB decode" m_len = len(buf) if 0 == m_len: return " Poll request" if 108 > m_len: return " Bad Length %s" % m_len u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len substr = buf.split('\0')[0] s = ' swVersion: %s\n' % (substr) substr = buf[30:39] substr = substr.split('\0')[0] s += ' hwVersion: %s' % (substr[30:39]) # extensions?? num_ext = int((m_len - 40) / 30) i = 0 while i < num_ext: loc = 40 + (i * 30) substr = buf[loc:] substr = substr.split('\0')[0] s += '\n extension: %s' % (substr) i += 1 return s mon_ids = {2: {'str': 'IO', 'name': 'UBX-MON-IO'}, 4: {'str': 'VER', 'dec': mon_ver, 'name': 'UBX-MON-VER'}, 6: {'str': 'MSGPP', 'name': 'UBX-MON-MSGPP'}, 7: {'str': 'RXBUF', 'name': 'UBX-MON-RXBUF'}, 8: {'str': 'TXBUF', 'name': 'UBX-MON-TXBUF'}, 9: {'str': 'HW', 'name': 'UBX-MON-HW'}, 0x0b: {'str': 'HW2', 'name': 'UBX-MON-HW2'}, 0x21: {'str': 'RXR', 'name': 'UBX-MON-RXR'}, 0x27: {'str': 'PATCH', 'name': 'UBX-MON-PATCH'}, 0x28: {'str': 'GNSS', 'name': 'UBX-MON-GNSS'}, 0x2e: {'str': 'SMGR', 'name': 'UBX-MON-SMGR'}, } def nav_clock(self, buf): "UBX-NAV-CLOCK decode, Clock Solution" m_len = len(buf) if 0 == m_len: return " Poll request" if 20 > m_len: return " Bad Length %s" % m_len u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len u = struct.unpack_from('> 4)) if 0x40 & u[6]: s += 'diffCorr ' if 0x80 & u[6]: s += 'smoothed ' s += 'orbitSource %u ' % (0x07 & (u[6] >> 8)) if 0x800 & u[6]: s += 'ephAvail ' if 0x1000 & u[6]: s += 'almAvail ' if 0x2000 & u[6]: s += 'anoAvail ' if 0x4000 & u[6]: s += 'aopAvail ' if 0x730000 & u[6]: s += '\n' if 0x10000 & u[6]: s += 'sbasCorrused ' if 0x20000 & u[6]: s += 'rtcmCorrused ' if 0x1000000 & u[6]: s += 'prCorrused ' if 0x2000000 & u[6]: s += 'crCorrused ' if 0x4000000 & u[6]: s += 'doCorrused ' m_len -= 12 i += 1 return s def nav_sbas(self, buf): "UBX-NAV-SBAS decode" m_len = len(buf) if 0 == m_len: return " Poll request" if 12 > m_len: return " Bad Length %s" % m_len u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len u = struct.unpack_from(' m_len: return "Bad Length %s" % m_len u = struct.unpack_from(' m_len: return ".Bad Length %s" % m_len u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len u = struct.unpack_from(' m_len: return " Bad Length %s" % m_len u = struct.unpack_from('> 2) & 0x03 if 0 == raim: s += "RAIM not available" elif 1 == raim: s += "RAIM not active" elif 2 == raim: s += "RAIM active" else: s += "RAIM ??" return s tim_ids = {1: {'str': 'TP', 'dec': tim_tp, 'name': 'UBX-TIM-TP'}, 3: {'str': 'TM2', 'name': 'UBX-TIM-TM2'}, 4: {'str': 'SVIN', 'dec': tim_svin, 'name': 'UBX-TIM-SVIN'}, 6: {'str': 'VRFY', 'name': 'UBX-TIM-VRFY'}, 0x11: {'str': 'DOSC', 'name': 'UBX-TIM-DOSC'}, 0x12: {'str': 'TOS', 'name': 'UBX-TIM-TOS'}, 0x13: {'str': 'SMEAS', 'name': 'UBX-TIM-SMEAS'}, 0x15: {'str': 'VCOCAL', 'name': 'UBX-TIM-VCOCAL'}, 0x16: {'str': 'FCHG', 'name': 'UBX-TIM-FCHG'}, 0x17: {'str': 'HOC', 'name': 'UBX-TIM-HOC'}, } classes = { 0x01: {'str': 'NAV', 'ids': nav_ids}, 0x02: {'str': 'RXM', 'ids': rxm_ids}, 0x04: {'str': 'INF', 'ids': inf_ids}, 0x05: {'str': 'ACK', 'ids': ack_ids}, 0x06: {'str': 'CFG', 'ids': cfg_ids}, 0x09: {'str': 'UPD'}, 0x0A: {'str': 'MON', 'ids': mon_ids}, 0x0B: {'str': 'ATD'}, 0x0D: {'str': 'TIM', 'ids': tim_ids}, 0x10: {'str': 'ESF'}, 0x13: {'str': 'MGA'}, 0x21: {'str': 'LOG'} } def class_id_s(self, m_class, m_id): "Return class and ID numbers as a string." s = 'Class: ' if m_class not in self.classes: s += '%#x ID: %#x' % (m_class, m_id) return s if 'str' in self.classes[m_class]: s_class = self.classes[m_class]['str'] s += '%s(%#x) ' % (s_class, m_class) else: s += '%#x ' % (m_class) if (('ids' in self.classes[m_class] and m_id in self.classes[m_class]['ids'] and 'str' in self.classes[m_class]['ids'][m_id])): s_id = self.classes[m_class]['ids'][m_id]['str'] s += 'ID: %s(%#x)' % (s_id, m_id) else: s += 'ID: %#x' % (m_id) return s def decode_msg(self, out): "Decode one message and then return number of chars consumed" state = 'BASE' consumed = 0 # decode state machine for this_byte in out: consumed += 1 if isinstance(this_byte, str): # a character, probably read from a file c = ord(this_byte) else: # a byte, probably read from a serial port c = int(this_byte) if VERB_RAW <= opts['verbosity']: if (ord(' ') <= c) and (ord('~') >= c): # c is printable print("state: %s char %c (%#x)" % (state, chr(c), c)) else: # c is not printable print("state: %s char %#x" % (state, c)) if 'BASE' == state: # start fresh # place to store 'comments' comment = '' m_class = 0 m_id = 0 m_len = 0 m_raw = bytearray(0) # class, id, len, payload m_payload = bytearray(0) # just the payload m_ck_a = 0 m_ck_b = 0 if 0xb5 == c: # got header 1, mu state = 'HEADER1' if ord('$') == c: # got $, so NMEA? state = 'NMEA' comment = '$' if ord("{") == c: # JSON, treat as comment line state = 'JSON' # start fresh comment = "{" continue if ord("#") == c: # comment line state = 'COMMENT' # start fresh comment = "#" continue if (ord('\n') == c) or (ord('\r') == c): # CR or LF, leftovers return 1 continue if state in ('COMMENT', 'JSON'): # inside comment if ord('\n') == c or ord('\r') == c: # Got newline or linefeed # terminate messages on or # Done, got a full message if gps.polystr('{"class":"ERROR"') in comment: # always print gpsd errors print(comment) elif VERB_DECODE <= opts['verbosity']: print(comment) return consumed else: comment += chr(c) continue if 'NMEA' == state: # getting NMEA payload if (ord('\n') == c) or (ord('\r') == c): # CR or LF, done, got a full message # terminates messages on or if VERB_DECODE <= opts['verbosity']: print(comment + '\n') return consumed else: comment += chr(c) continue if ord('b') == c and 'HEADER1' == state: # got header 2 state = 'HEADER2' continue if 'HEADER2' == state: # got class state = 'CLASS' m_class = c m_raw.extend([c]) continue if 'CLASS' == state: # got ID state = 'ID' m_id = c m_raw.extend([c]) continue if 'ID' == state: # got first length state = 'LEN1' m_len = c m_raw.extend([c]) continue if 'LEN1' == state: # got second length m_raw.extend([c]) m_len += 256 * c if 0 == m_len: # no payload state = 'CSUM1' else: state = 'PAYLOAD' continue if 'PAYLOAD' == state: # getting payload m_raw.extend([c]) m_payload.extend([c]) if len(m_payload) == m_len: state = 'CSUM1' continue if 'CSUM1' == state: # got ck_a state = 'CSUM2' m_ck_a = c continue if 'CSUM2' == state: # ck_b state = 'BASE' m_ck_b = c # check checksum chk = self.checksum(m_raw, len(m_raw)) if (chk[0] != m_ck_a) or (chk[1] != m_ck_b): sys.stderr.write("%s: ERROR checksum failed," "was (%d,%d) s/b (%d, %d)\n" % (PROG_NAME, m_ck_a, m_ck_b, chk[0], chk[1])) s_payload = ''.join('{:02x} '.format(x) for x in m_payload) x_payload = binascii.hexlify(m_payload) if m_class in self.classes: this_class = self.classes[m_class] if 'ids' in this_class: if m_id in this_class['ids']: if 'dec' in this_class['ids'][m_id]: dec = this_class['ids'][m_id]['dec'] s_payload = this_class['ids'][m_id]['name'] s_payload += ':\n' s_payload += dec(self, m_payload) else: s_payload = x_payload if VERB_INFO < opts['verbosity']: print("%s, len: %#x" % (self.class_id_s(m_class, m_id), m_len)) if VERB_RAW < opts['verbosity']: print("payload: %s" % x_payload) print("%s\n" % s_payload) return consumed # give up state = 'BASE' # fell out of loop, no more chars to look at return 0 def checksum(self, msg, m_len): "Calculate u-blox message checksum" # the checksum is calculated over the Message, starting and including # the CLASS field, up until, but excluding, the Checksum Field: ck_a = 0 ck_b = 0 for c in msg[0:m_len]: ck_a += c ck_b += ck_a return [ck_a & 0xff, ck_b & 0xff] def make_pkt(self, m_class, m_id, m_data): "Make a message packet" # always little endian, leader, class, id, length m_len = len(m_data) # build core message msg = bytearray(m_len + 6) struct.pack_into('>= 8 m_data[21] = seconds & 0x0ff seconds >>= 8 m_data[22] = seconds & 0x0ff seconds >>= 8 m_data[23] = seconds & 0x0ff # Survey-in position accuracy limit 500 mm mmeters = 500 m_data[24] = mmeters & 0x0ff mmeters >>= 8 m_data[25] = mmeters & 0x0ff mmeters >>= 8 m_data[26] = seconds & 0x0ff seconds >>= 8 m_data[27] = mmeters & 0x0ff gps_model.gps_send(6, 0x3d, m_data) def send_able_tp(self, able): "dis/enable UBX-TIM-TP Time Pulse" rate = 1 if able else 0 m_data = bytearray([0xd, 0x1, rate]) gps_model.gps_send(6, 1, m_data) def send_cfg_ant(self): "UBX-CFG-ANT, Get Antenna Control Settings" m_data = bytearray(0) gps_model.gps_send(6, 0x13, m_data) def send_cfg_cfg(self, save_clear): "UBX-CFG-CFG, save config" # Save: save_clear = 0 # Clear: save_clear = 1 # basic configs alays available to change: # ioPort, msgConf, infMsg, navConf, rxmConf, senConf cfg1 = 0x9f # rinvConf, antConf, logConf cfg2 = 0x03 m_data = bytearray(13) # clear if 0 == save_clear: # saving m_data[0] = 0 m_data[1] = 0 else: # clearing m_data[0] = cfg1 m_data[1] = cfg2 m_data[2] = 0 # m_data[3] = 0 # # save if 0 == type: # saving m_data[4] = cfg1 m_data[5] = cfg2 else: # clearing m_data[4] = 0 m_data[5] = 0 m_data[6] = 0 # m_data[7] = 0 # # load if False and 0 == type: # saving m_data[8] = 0 m_data[9] = 0 else: # clearing, load it to save a reboot m_data[8] = cfg1 m_data[9] = cfg2 m_data[10] = 0 # m_data[11] = 0 # # deviceMask, where to save it, try all options m_data[12] = 0x17 # devBBR, devFLASH devEEPROM, devSpiFlash gps_model.gps_send(6, 0x9, m_data) def send_cfg_gnss1(self, gnssId, enable): "UBX-CFG-GNSS, set GNSS config" m_data = bytearray(12) m_data[0] = 0 # version 0, msgVer m_data[1] = 0 # read only, numTrkChHw m_data[2] = 0xFF # read only, numTrkChUse m_data[3] = 1 # 1 block follows # block 1 m_data[4] = gnssId # gnssId if 0 == gnssId: # GPS m_data[5] = 8 # resTrkCh m_data[6] = 16 # maxTrkCh if 1 == gnssId: # SBAS m_data[5] = 1 # resTrkCh m_data[6] = 3 # maxTrkCh if 2 == gnssId: # GALILEO m_data[5] = 4 # resTrkCh m_data[6] = 8 # maxTrkCh if 3 == gnssId: # BeiDou m_data[5] = 2 # resTrkCh m_data[6] = 16 # maxTrkCh if 4 == gnssId: # IMES m_data[5] = 0 # resTrkCh m_data[6] = 8 # maxTrkCh if 5 == gnssId: # QZSS m_data[5] = 0 # resTrkCh m_data[6] = 3 # maxTrkCh if 6 == gnssId: # GLONASS m_data[5] = 8 # resTrkCh m_data[6] = 14 # maxTrkCh m_data[7] = 0 # reserved1 m_data[8] = enable # flags m_data[9] = 0 # flagflags, unused if 5 == gnssId: # QZSS m_data[10] = 5 # flags E1OS, L1SAIF else: m_data[10] = 1 # flags E1OS m_data[11] = 1 # flags, unused gps_model.gps_send(6, 0x3e, m_data) def send_cfg_gnss(self): "UBX-CFG-GNSS, set Galileo config decode" m_data = bytearray(0) gps_model.gps_send(6, 0x3e, m_data) def send_cfg_nav5_model(self): "UBX-CFG-NAV5, set dynamic platform model" m_data = bytearray(36) m_data[0] = 1 # just setting dynamic model m_data[1] = 0 # just setting dynamic model m_data[2] = opts["mode"] gps_model.gps_send(6, 0x24, m_data) def send_cfg_msg(self, m_class, m_id, rate=None): "UBX-CFG-MSG, poll, or set, message rates decode" m_data = bytearray(2) m_data[0] = m_class m_data[1] = m_id if rate is not None: m_data.extend([rate]) gps_model.gps_send(6, 1, m_data) def send_cfg_nav5(self): "UBX-CFG-NAV5, get Nav Engine Settings" m_data = bytearray(0) gps_model.gps_send(6, 0x24, m_data) def send_cfg_pms(self): "UBX-CFG-PMS, Get Power Management Settings" if opts["mode"] is not None: m_data = bytearray(8) # set powerSetupValue to mode m_data[1] = opts["mode"] # leave period and onTime zero, which breaks powerSetupValue = 3 else: m_data = bytearray(0) gps_model.gps_send(6, 0x86, m_data) def send_cfg_prt(self): "UBX-CFG-PRT, get I/O Port" m_data = bytearray(0) gps_model.gps_send(6, 0x0, m_data) def send_set_speed(self, speed): "UBX-CFG-PRT, set port" # FIXME! Poll current masks, then adjust speed m_data = bytearray(20) m_data[0] = 1 # port 1, UART 1 # m_data[0] = 3 # port 3, USB m_data[4] = 0xc0 # 8N1 m_data[5] = 0x8 # 8N1 m_data[8] = speed & 0xff m_data[9] = (speed >> 8) & 0xff m_data[10] = (speed >> 16) & 0xff m_data[11] = (speed >> 24) & 0xff m_data[12] = 3 # in, ubx and nmea m_data[14] = 3 # out, ubx and nmea gps_model.gps_send(6, 0, m_data) def send_cfg_rst(self, reset_type): "UBX-CFG-RST, reset" # always do a hardware reset # if on USB, this will disconnect and reconnect, giving you # a new tty. m_data = bytearray(4) m_data[0] = reset_type & 0xff m_data[1] = (reset_type >> 8) & 0xff gps_model.gps_send(6, 0x4, m_data) def send_cfg_tmode2(self): "UBX-CFG-TMODE2, get time mode 2 configuration" m_data = bytearray(0) gps_model.gps_send(6, 0x3d, m_data) def send_cfg_tp5(self): "UBX-CFG-TP5, get time0 decodes, timepulse 0 and 1" m_data = bytearray(0) gps_model.gps_send(6, 0x31, m_data) # and timepulse 1 m_data = bytearray(1) m_data[0] = 1 gps_model.gps_send(6, 0x31, m_data) def send_cfg_usb(self): "UBX-CFG-USB, get USB configuration" m_data = bytearray(0) gps_model.gps_send(6, 0x1b, m_data) def send_mon_ver(self): "UBX-MON-VER get versions" m_data = bytearray(0) gps_model.gps_send(0x0a, 0x04, m_data) def send_nav_posecef(self): "UBX-NAV-POSECEF, poll ECEF position" m_data = bytearray(0) gps_model.gps_send(1, 1, m_data) def send_nav_velecef(self): "UBX-NAV-VELECEF, poll ECEF velocity decode" m_data = bytearray(0) gps_model.gps_send(1, 0x11, m_data) def send_tim_svin(self): "UBX-TIM-SVIN, get survey in data" m_data = bytearray(0) gps_model.gps_send(0x0d, 0x04, m_data) def send_tim_tp(self): "UBX-TIM-TP, get time pulse timedata" m_data = bytearray(0) gps_model.gps_send(0x0d, 0x01, m_data) able_commands = { # en/dis able BeiDou "BEIDOU": {"command": send_able_beidou, "help": "BeiDou"}, # en/dis able basic binary messages "BINARY": {"command": send_able_binary, "help": "basic binary messages"}, # en/dis able ECEF "ECEF": {"command": send_able_ecef, "help": "ECEF"}, # en/dis able GPS "GPS": {"command": send_able_gps, "help": "GPS and QZSS"}, # en/dis able GALILEO "GALILEO": {"command": send_able_galileo, "help": "GALILEO"}, # en/dis able GLONASS "GLONASS": {"command": send_able_glonass, "help": "GLONASS"}, # en/dis able basic NMEA messages "NMEA": {"command": send_able_nmea, "help": "basic NMEA messages"}, # en/dis able RAWX "RAWX": {"command": send_able_rawx, "help": "RAWX measurements"}, # en/dis able SBAS "SBAS": {"command": send_able_sbas, "help": "SBAS"}, # en/dis able TP time pulse message "TP": {"command": send_able_tp, "help": "TP Time Pulse message"}, # en/dis able TMODE2 Survey-in "TMODE2": {"command": send_able_tmode2, "help": "TMODE2"}, } commands = { # UBX-CFG-ANT, poll antenna config decode "ANT": {"command": send_cfg_ant, "help": "UBX-CFG-ANT poll antenna config"}, # UBX-CFG-RST cold boot "COLDBOOT": {"command": send_cfg_rst, "help": "UBS-CFG-RST coldboot the GPS", "opt": 0xfff}, # UBX-CFG-GNSS poll gnss config "GNSS": {"command": send_cfg_gnss, "help": "UBX-CFG-GNSS poll GNSS config"}, # UBX-CFG-RST hot boot "HOTBOOT": {"command": send_cfg_rst, "help": "UBX-CFG-RST hotboot the GPS", "opt": 0}, # UBX-CFG-NAV5 set Dynamic Platform Model "MODEL": {"command": send_cfg_nav5_model, "help": "UBX-CFG-NAV5 set Dynamic Platform Model"}, # UBX-CFG-NAV5, poll Nav Engine Settings "NAV5": {"command": send_cfg_nav5, "help": "UBX-CFG-NAV5 poll Nav Engines settings"}, # UBX-CFG-PMS, poll power management settings "PMS": {"command": send_cfg_pms, "help": "UBX-CFG-PMS poll power management settings"}, # UBX-CFG-PRT, poll I/O port number "PRT": {"command": send_cfg_prt, "help": "UBX-CFG-PRT poll I/O port settings"}, # UBX-CFG-CFG reset config "RESET": {"command": send_cfg_cfg, "help": "UBX-CFG-CFG reset config to defaults", "opt": 1}, # UBX-CFG-CFG save config "SAVE": {"command": send_cfg_cfg, "help": "UBX-CFG-CFG save current config", "opt": 0}, # UBX-TIM-SVIN, get survey in data "SVIN": {"command": send_tim_svin, "help": "UBX-TIM-SVIN get survey in data"}, # UBX-CFG-TMODE2, get time mode 2 config "TMODE2": {"command": send_cfg_tmode2, "help": "UBX-CFG-TMODE2 get time mode 2 config"}, # UBX-TIM-TP, get time pulse timedata "TP": {"command": send_tim_tp, "help": "UBX-TIM-TP get time pulse timedata"}, # UBX-CFG-TP5, get time0 decodes "TP5": {"command": send_cfg_tp5, "help": "UBX-TIM-TP5 get time pulse decodes"}, # UBX-CFG-RST warm boot "WARMBOOT": {"command": send_cfg_rst, "help": "UBX-CFG-RST warmboot the GPS", "opt": 1}, # poll UBX-CFG-USB "USB": {"command": send_cfg_usb, "help": "UBX-CFG-USB get USB config"}, # poll UBX-MON-VER "VER": {"command": send_mon_ver, "help": "UBX-MON-VER get GPS version"}, } # end class ubx class gps_io(object): """All the GPS I/O in one place" Three types of GPS I/O 1. read only from a file 2. read/write through a device 3. read only from a gpsd instance """ out = b'' ser = None input_is_device = False def __init__(self, serial_class): "Initialize class" Serial = serial_class # buffer to hold read data self.out = b'' # open the input: device, file, or gpsd if opts['input_file_name'] is not None: # check if input file is a file or device try: mode = os.stat(opts['input_file_name']).st_mode except OSError: sys.stderr.write('%s: failed to open input file %s\n' % (PROG_NAME, opts['input_file_name'])) sys.exit(1) if stat.S_ISCHR(mode): # character device, need not be read only self.input_is_device = True if ((opts['disable'] or opts['enable'] or opts['poll'] or opts['oaf_name'])): # check that we can write if opts['read_only']: sys.stderr.write('%s: read-only mode, ' 'can not send commands\n' % PROG_NAME) sys.exit(1) if self.input_is_device is False: sys.stderr.write('%s: input is plain file, ' 'can not send commands\n' % PROG_NAME) sys.exit(1) if opts['target']['server'] is not None: # try to open local gpsd try: self.ser = gps.gpscommon(host=None) self.ser.connect(opts['target']['server'], opts['target']['port']) # alias self.ser.write() to self.write_gpsd() self.ser.write = self.write_gpsd # ask for raw, not rare, data data_out = b'?WATCH={' if opts['target']['device'] is not None: # add in the requested device data_out += (b'"device":"' + opts['target']['device'] + b'",') data_out += b'"enable":true,"raw":2}\r\n' if VERB_RAW <= opts['verbosity']: print("sent: ", data_out) self.ser.send(data_out) except socket.error as err: sys.stderr.write('%s: failed to connect to gpsd %s\n' % (PROG_NAME, err)) sys.exit(1) elif self.input_is_device: # configure the serial connections (the parameters refer to # the device you are connecting to) try: self.ser = Serial.Serial( baudrate=opts['input_speed'], # 8N1 is GREIS default bytesize=Serial.EIGHTBITS, parity=Serial.PARITY_NONE, port=opts['input_file_name'], stopbits=Serial.STOPBITS_ONE, # read timeout timeout=0.05, # pyserial Ver 3.0+ changes writeTimeout to write_timeout # just set both write_timeout=0.5, writeTimeout=0.5, ) except Serial.serialutil.SerialException: # this exception happens on bad serial port device name sys.stderr.write('%s: failed to open serial port "%s"\n' '%s: Your computer has the serial ports:\n' % (PROG_NAME, opts['input_file_name'], PROG_NAME)) # print out list of supported ports import serial.tools.list_ports as List_Ports ports = List_Ports.comports() for port in ports: sys.stderr.write(" %s: %s\n" % (port.device, port.description)) sys.exit(1) # flush input buffer, discarding all its contents self.ser.flushInput() else: # Read from a plain file of GREIS messages try: self.ser = open(opts['input_file_name'], 'rb') except IOError: sys.stderr.write('%s: failed to open input %s\n' % (PROG_NAME, opts['input_file_name'])) sys.exit(1) def read(self, read_opts): "Read from device, until timeout or expected message" # are we expecting a certain message? if gps_model.expect_statement_identifier: # assume failure, until we see expected message ret_code = 1 else: # not expecting anything, so OK if we did not see it. ret_code = 0 try: if read_opts['target']['server'] is not None: # gpsd input start = time.clock() while read_opts['input_wait'] > (time.clock() - start): # First priority is to be sure the input buffer is read. # This is to prevent input buffer overuns if 0 < self.ser.waiting(): # We have serial input waiting, get it # No timeout possible # RTCM3 JSON can be over 4.4k long, so go big new_out = self.ser.sock.recv(8192) if raw is not None: # save to raw file raw.write(new_out) self.out += new_out consumed = gps_model.decode_msg(self.out) self.out = self.out[consumed:] if ((gps_model.expect_statement_identifier and (gps_model.expect_statement_identifier == gps_model.last_statement_identifier))): # Got what we were waiting for. Done? ret_code = 0 if not read_opts['input_forced_wait']: # Done break elif self.input_is_device: # input is a serial device start = time.clock() while read_opts['input_wait'] > (time.clock() - start): # First priority is to be sure the input buffer is read. # This is to prevent input buffer overuns # pyserial 3.0 replaces inWaiting() with in_waiting if 0 < self.ser.inWaiting(): # We have serial input waiting, get it # 1024 is comfortably large, almost always the # Read timeout is what causes ser.read() to return new_out = self.ser.read(1024) if raw is not None: # save to raw file raw.write(new_out) self.out += new_out consumed = gps_model.decode_msg(self.out) self.out = self.out[consumed:] if ((gps_model.expect_statement_identifier and (gps_model.expect_statement_identifier == gps_model.last_statement_identifier))): # Got what we were waiting for. Done? ret_code = 0 if not read_opts['input_forced_wait']: # Done break else: # ordinary file, so all read at once self.out += self.ser.read() if raw is not None: # save to raw file raw.write(self.out) while True: consumed = gps_model.decode_msg(self.out) self.out = self.out[consumed:] if 0 >= consumed: break except IOError: # This happens on a good device name, but gpsd already running. # or if USB device unplugged sys.stderr.write('%s: failed to read %s\n' '%s: Is gpsd already holding the port?\n' % (PROG_NAME, read_opts['input_file_name'], PROG_NAME)) return 1 if 0 < ret_code: # did not see the message we were expecting to see sys.stderr.write('%s: waited %0.2f seconds for, ' 'but did not get: %%%s%%\n' % (PROG_NAME, read_opts['input_wait'], gps_model.expect_statement_identifier)) return ret_code def write_gpsd(self, data): "write data to gpsd daemon" # HEXDATA_MAX = 512, from gps.h, The max hex digits can write. # Input data is binary, converting to hex doubles its size. # Limit binary data to length 255, so hex data length less than 510. if 255 < len(data): sys.stderr.write('%s: trying to send %d bytes, max is 255\n' % (PROG_NAME, len(data))) return 1 if opts['target']['device'] is not None: # add in the requested device data_out = b'?DEVICE={"path":"' + opts['target']['device'] + b'",' else: data_out = b'?DEVICE={' # Convert binary data to hex and build the message. data_out += b'"hexdata":"' + binascii.hexlify(data) + b'"}\r\n' if VERB_RAW <= opts['verbosity']: print("sent: ", data_out) self.ser.send(data_out) return 0 # instantiate the GPS class gps_model = ubx() def usage(): "Ouput usage information, and exit" print('usage: %s [-c C] [-f F] [-r] [-p P] [-s S] [-v V]\n' ' [-hV?] [-S S]\n' ' -c C send raw command C to GPS\n' ' -d D disable D\n' ' -e E enable E\n' ' -f F open F as file/device\n' ' default: %s\n' ' -h print help\n' ' -m M optional mode to -p P\n' ' -p P send a prepackaged query P to GPS\n' ' -R R save raw data from GPS in file R\n' ' default: %s\n' ' -r open file/device read only\n' ' -S S set GPS speed to S\n' ' -s S set port speed to S\n' ' default: %s bps\n' ' -w wait time\n' ' default: %s seconds\n' ' -V print version\n' ' -v V Set verbosity level to V, 0 to 4\n' ' default: %s\n' ' -? print help\n' '\n' 'D and E can be one of:' % (PROG_NAME, opts['input_file_name'], opts['raw_file'], opts['input_speed'], opts['input_wait'], opts['verbosity']) ) for item in sorted(gps_model.able_commands.keys()): print(" %-12s %s" % (item, gps_model.able_commands[item]["help"])) print('\nP can be one of:') for item in sorted(gps_model.commands.keys()): print(" %-12s %s" % (item, gps_model.commands[item]["help"])) print('\nOptions can be placed in the UBXOPTS environment variable.\n' 'UBXOPTS is processed before the CLI options.') sys.exit(0) if 'UBXOPTS' in os.environ: # grab the UBXOPTS environment variable for options opts['progopts'] = os.environ['UBXOPTS'] options = opts['progopts'].split(' ') + sys.argv[1:] else: options = sys.argv[1:] try: (options, arguments) = getopt.getopt(options, "?c:d:e:f:hm:rp:s:w:v:R:S:V") except getopt.GetoptError as err: sys.stderr.write("%s: %s\n" "Try '%s -h' for more information.\n" % (PROG_NAME, str(err), PROG_NAME)) sys.exit(2) for (opt, val) in options: if opt == '-c': opts['command'] = val elif opt == '-d': opts['disable'] = val elif opt == '-e': opts['enable'] = val elif opt == '-f': opts['input_file_name'] = val elif opt == '-h' or opt == '-?': usage() elif opt == '-m': opts['mode'] = int(val) elif opt == '-p': opts['poll'] = val elif opt == '-r': opts['read_only'] = True elif opt == '-s': opts['input_speed'] = int(val) if opts['input_speed'] not in gps_model.speeds: sys.stderr.write('%s: -s invalid speed %s\n' % (PROG_NAME, opts['input_speed'])) sys.exit(1) elif opt == '-w': opts['input_wait'] = float(val) elif opt in '-v': opts['verbosity'] = int(val) elif opt in '-R': # raw log file opts['raw_file'] = val elif opt in '-S': opts['set_speed'] = int(val) if opts['set_speed'] not in gps_model.speeds: sys.stderr.write('%s: -S invalid speed %s\n' % (PROG_NAME, opts['set_speed'])) sys.exit(1) elif opt == '-V': # version sys.stderr.write('%s: Version %s\n' % (PROG_NAME, gps_version)) sys.exit(0) if opts['input_file_name'] is None: # no input file given # default to local gpsd opts['target']['server'] = "localhost" opts['target']['port'] = gps.GPSD_PORT opts['target']['device'] = None if arguments: # server[:port[:device]] parts = arguments[0].split(':') opts['target']['server'] = parts[0] if 1 < len(parts): opts['target']['port'] = parts[1] if 2 < len(parts): opts['target']['device'] = parts[2] elif arguments: sys.stderr.write('%s: Both input file and server specified\n' % PROG_NAME) sys.exit(1) if VERB_PROG <= opts['verbosity']: # dump all options print('Options:') for option in sorted(opts): print(" %s: %s" % (option, opts[option])) # done parsing arguments from environment and CLI try: # raw log file requested? raw = None if opts['raw_file']: try: raw = open(opts['raw_file'], 'w') except IOError: sys.stderr.write('%s: failed to open raw file %s\n' % (PROG_NAME, opts['raw_file'])) sys.exit(1) # create the I/O instance io_handle = gps_io(serial) sys.stdout.flush() if opts['disable'] is not None: if VERB_QUIET < opts['verbosity']: sys.stderr.write('%s: disable %s\n' % (PROG_NAME, opts['disable'])) if opts['disable'] in gps_model.able_commands: command = gps_model.able_commands[opts['disable']] command["command"](gps, 0) else: sys.stderr.write('%s: disable %s not found\n' % (PROG_NAME, opts['disable'])) sys.exit(1) elif opts['enable'] is not None: if VERB_QUIET < opts['verbosity']: sys.stderr.write('%s: enable %s\n' % (PROG_NAME, opts['enable'])) if opts['enable'] in gps_model.able_commands: command = gps_model.able_commands[opts['enable']] command["command"](gps, 1) else: sys.stderr.write('%s: enable %s not found\n' % (PROG_NAME, opts['enable'])) sys.exit(1) elif opts['poll'] is not None: if VERB_QUIET < opts['verbosity']: sys.stderr.write('%s: poll %s\n' % (PROG_NAME, opts['poll'])) if 'MODEL' == opts["poll"]: if opts["mode"] is None: opts["mode"] = 0 # default to portable model if opts['poll'] in gps_model.commands: command = gps_model.commands[opts['poll']] if 'opt' in command: command["command"](gps, command["opt"]) else: command["command"](gps) else: sys.stderr.write('%s: poll %s not found\n' % (PROG_NAME, opts['poll'])) sys.exit(1) elif opts['set_speed'] is not None: gps_model.send_set_speed(opts['set_speed']) elif opts['command'] is not None: # zero length is OK to send # add trailing new line opts['command'] += "\n" if VERB_QUIET < opts['verbosity']: sys.stderr.write('%s: command %s\n' % (PROG_NAME, opts['command'])) gps_model.gps_send_raw(opts['command']) exit_code = io_handle.read(opts) if ((VERB_RAW <= opts['verbosity']) and io_handle.out): # dump raw left overs print("Left over data:") print(io_handle.out) sys.stdout.flush() io_handle.ser.close() except KeyboardInterrupt: print('') exit_code = 1 sys.exit(exit_code)