From 52a27d71f19563a40270b25ac1d127529e0a2360 Mon Sep 17 00:00:00 2001 From: "Gary E. Miller" Date: Mon, 24 Sep 2018 15:21:42 -0700 Subject: ubxtool/zerk: install these two programs by default. --- SConstruct | 2 +- contrib/ubxtool | 2377 ------------------------------------------------------- contrib/zerk | 1896 -------------------------------------------- ubxtool | 2377 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ zerk | 1896 ++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 4274 insertions(+), 4274 deletions(-) delete mode 100755 contrib/ubxtool delete mode 100755 contrib/zerk create mode 100755 ubxtool create mode 100755 zerk diff --git a/SConstruct b/SConstruct index a4fde821..0fbad748 100644 --- a/SConstruct +++ b/SConstruct @@ -1364,7 +1364,7 @@ if not env['python']: python_targets = [] python_progs = [] else: - python_progs = ["gegps", "gpscat", "gpsfake", "gpsprof"] + python_progs = ["gegps", "gpscat", "gpsfake", "gpsprof", "ubxtool", "zerk"] if env['xgps']: # check for pycairo diff --git a/contrib/ubxtool b/contrib/ubxtool deleted file mode 100755 index b8c80e95..00000000 --- a/contrib/ubxtool +++ /dev/null @@ -1,2377 +0,0 @@ -#!/usr/bin/env python -# -*- coding: UTF-8 -''' -ubxtool -- u-blox configurator and packet decoder - -usage: ubxtool [OPTIONS] [server[:port[:device]]] -''' - -# This file is Copyright (c) 2018 by the GPSD project -# BSD terms apply: see the file COPYING in the distribution root for details. -# -# This code runs compatibly under Python 2 and 3.x for x >= 2. -# Preserve this property! -# -# ENVIRONMENT: -# Options in the UBXOPTS environment variable will be parsed before -# the CLI options. A handy place to put your '-f /dev/ttyXX -s SPEED' -# -# To see what constellations are enabled: -# ubxtool -p GNSS -f /dev/ttyXX -# -# To disable GALILEO and enable GALILEO: -# ubxtool -d GLONASS -f /dev/ttyXX -# ubxtool -e GALILEO -f /dev/ttyXX -# -# To read GPS messages a log file: -# ubxtool -v 2 -f test/daemon/ublox-neo-m8n.log - -from __future__ import absolute_import, print_function, division - -import binascii # for binascii.hexlify() -import getopt # for getopt.getopt(), to parse CLI options -import os # for os.environ -import socket # for socket.error -import stat # for stat.S_ISBLK() -import struct # for pack() -import sys -import time - -PROG_NAME = 'ubxtool' - -try: - import serial -except ImportError: - # treat serial as special since it is not part of standard Python - sys.stderr.write("%s: failed to import pyserial\n" % PROG_NAME) - sys.exit(2) - -try: - import gps -except ImportError: - # PEP8 says local imports last - sys.stderr.write("%s: failed to import gps, check PYTHON_PATH\n" % - PROG_NAME) - sys.exit(2) - -gps_version = '3.18-dev' -if gps.__version__ != gps_version: - sys.stderr.write("%s: ERROR: need gps module version %s, got %s\n" % - (PROG_NAME, gps_version, gps.__version__)) - sys.exit(1) - - -VERB_QUIET = 0 # quiet -VERB_NONE = 1 # just output requested data and some info -VERB_DECODE = 2 # decode all messages -VERB_INFO = 3 # more info -VERB_RAW = 4 # raw info -VERB_PROG = 5 # program trace - -# dictionary to hold all user options -opts = { - # command to send to GPS, -c - 'command': None, - # command for -d disable - 'disable': None, - # command for -e enable - 'enable': None, - # default input -f file - 'input_file_name': None, - # default forced wait? -W - 'input_forced_wait': False, - # default port speed -s - 'input_speed': 9600, - # default input wait time -w in seconds - 'input_wait': 2.0, - # optional mode to -p P - 'mode': None, - # the name of an OAF file, extension .jpo - 'oaf_name': None, - # poll command -p - 'poll': None, - # raw log file name - 'raw_file': None, - # open port read only -r - 'read_only': False, - # speed to set GPS -S - 'set_speed': None, - # target gpsd (server:port:device) to connect to - 'target': {"server": None, "port": gps.GPSD_PORT, "device": None}, - # verbosity level, -v - 'verbosity': VERB_NONE, - # contents of environment variable UBXOPTS - 'progopts': '', -} - - -class ubx(object): - "class to hold u-blox stuff" - - # when a statement identifier is received, it is stored here - last_statement_identifier = None - # expected statement identifier. - expect_statement_identifier = False - - def __init__(self): - pass - - # allowable speeds - speeds = (460800, 230400, 153600, 115200, 57600, 38400, 19200, 9600, - 4800, 2400, 1200, 600, 300) - - # UBX Satellite Numbering - gnss_id = {0: 'GPS', - 1: 'SBAS', - 2: 'Galileo', - 3: 'BeiDou', - 4: 'IMES', - 5: 'QZSS', - 6: 'GLONASS'} - - def ack_ack(self, buf): - "UBX-ACK-ACK decode" - m_len = len(buf) - if 2 > m_len: - return "Bad Length %s" % m_len - - u = struct.unpack_from(' m_len: - return "Bad Length %s" % m_len - - u = struct.unpack_from('> 5) & 0x1f, (u[1] >> 10) & 0x1f, - u[1] >> 15)) - return s - - def cfg_cfg_mask(self, mask): - "decode Mask in UBX-CFG-CFG, return string" - s = '' - if mask & 0x1: - s += 'ioPort ' - if mask & 0x2: - s += 'msgConf ' - if mask & 0x4: - s += 'infMsg ' - if mask & 0x8: - s += 'navConf ' - if mask & 0x10: - s += 'rxmConf ' - if mask & 0x80: - # not on M8030 - s += 'senConf ' - if mask & 0x100: - s += 'rinvConf ' - if mask & 0x200: - s += 'antConf ' - if mask & 0x800: - s += 'logConf ' - if mask & 0x1000: - s += 'ftsConf ' - - return s - - def cfg_cfg(self, buf): - "UBX-CFG-CFG decode" - m_len = len(buf) - if 12 > m_len: - return "Bad Length %s" % m_len - - if 12 == m_len: - u = struct.unpack_from(' m_len: - return "Bad Length %s" % m_len - - u = struct.unpack_from(' m_len: - return "Bad Length %s" % m_len - - values = {0: "Full power", - 1: "Balanced", - 2: "Interval", - 3: "Aggresive with 1Hz", - 4: "Aggresive with 2Hz", - 5: "Aggresive with 4Hz", - 0xff: "Invalid" - } - u = struct.unpack_from(' m_len: - return "Bad Length %s" % m_len - - u = struct.unpack_from(' m_len: - return "Bad Length %s" % m_len - - u = struct.unpack_from(' m_len: - return "Bad Length %s" % m_len - - u = struct.unpack_from(' m_len: - return " Bad Length %s" % m_len - - u = struct.unpack_from('> 7) & 0x0f - gridToGpsDec = ('UTC', 'GPS', 'Glonass', 'BeiDou') - syncMode = (u[10] >> 11) & 0x03 - s += "gridToGps %s, syncMode %d " % (gridToGpsDec[gridToGps], syncMode) - - return s - - def cfg_usb(self, buf): - "UBX-CFG-USB decode" - m_len = len(buf) - if 0 == m_len: - return " Poll request" - - if 108 > m_len: - return " Bad Length %s" % m_len - - u = struct.unpack_from(' m_len: - return " Bad Length %s" % m_len - - substr = buf.split('\0')[0] - s = ' swVersion: %s\n' % (substr) - substr = buf[30:39] - substr = substr.split('\0')[0] - s += ' hwVersion: %s' % (substr[30:39]) - # extensions?? - num_ext = int((m_len - 40) / 30) - i = 0 - while i < num_ext: - loc = 40 + (i * 30) - substr = buf[loc:] - substr = substr.split('\0')[0] - s += '\n extension: %s' % (substr) - i += 1 - return s - - mon_ids = {2: {'str': 'IO', 'name': 'UBX-MON-IO'}, - 4: {'str': 'VER', 'dec': mon_ver, 'name': 'UBX-MON-VER'}, - 6: {'str': 'MSGPP', 'name': 'UBX-MON-MSGPP'}, - 7: {'str': 'RXBUF', 'name': 'UBX-MON-RXBUF'}, - 8: {'str': 'TXBUF', 'name': 'UBX-MON-TXBUF'}, - 9: {'str': 'HW', 'name': 'UBX-MON-HW'}, - 0x0b: {'str': 'HW2', 'name': 'UBX-MON-HW2'}, - 0x21: {'str': 'RXR', 'name': 'UBX-MON-RXR'}, - 0x27: {'str': 'PATCH', 'name': 'UBX-MON-PATCH'}, - 0x28: {'str': 'GNSS', 'name': 'UBX-MON-GNSS'}, - 0x2e: {'str': 'SMGR', 'name': 'UBX-MON-SMGR'}, - } - - def nav_clock(self, buf): - "UBX-NAV-CLOCK decode, Clock Solution" - m_len = len(buf) - if 0 == m_len: - return " Poll request" - - if 20 > m_len: - return " Bad Length %s" % m_len - - u = struct.unpack_from(' m_len: - return " Bad Length %s" % m_len - - u = struct.unpack_from(' m_len: - return " Bad Length %s" % m_len - - u = struct.unpack_from(' m_len: - return " Bad Length %s" % m_len - - u = struct.unpack_from(' m_len: - return " Bad Length %s" % m_len - - u = struct.unpack_from(' m_len: - return " Bad Length %s" % m_len - - u = struct.unpack_from('> 4)) - if 0x40 & u[6]: - s += 'diffCorr ' - if 0x80 & u[6]: - s += 'smoothed ' - s += 'orbitSource %u ' % (0x07 & (u[6] >> 8)) - if 0x800 & u[6]: - s += 'ephAvail ' - if 0x1000 & u[6]: - s += 'almAvail ' - if 0x2000 & u[6]: - s += 'anoAvail ' - if 0x4000 & u[6]: - s += 'aopAvail ' - - if 0x730000 & u[6]: - s += '\n' - if 0x10000 & u[6]: - s += 'sbasCorrused ' - if 0x20000 & u[6]: - s += 'rtcmCorrused ' - if 0x1000000 & u[6]: - s += 'prCorrused ' - if 0x2000000 & u[6]: - s += 'crCorrused ' - if 0x4000000 & u[6]: - s += 'doCorrused ' - m_len -= 12 - i += 1 - - return s - - def nav_sbas(self, buf): - "UBX-NAV-SBAS decode" - m_len = len(buf) - if 0 == m_len: - return " Poll request" - - if 12 > m_len: - return " Bad Length %s" % m_len - - u = struct.unpack_from(' m_len: - return " Bad Length %s" % m_len - - u = struct.unpack_from(' m_len: - return " Bad Length %s" % m_len - - u = struct.unpack_from(' m_len: - return " Bad Length %s" % m_len - - u = struct.unpack_from(' m_len: - return "Bad Length %s" % m_len - - u = struct.unpack_from(' m_len: - return ".Bad Length %s" % m_len - - u = struct.unpack_from(' m_len: - return " Bad Length %s" % m_len - - u = struct.unpack_from(' m_len: - return " Bad Length %s" % m_len - - u = struct.unpack_from(' m_len: - return " Bad Length %s" % m_len - - u = struct.unpack_from(' m_len: - return " Bad Length %s" % m_len - - u = struct.unpack_from(' m_len: - return " Bad Length %s" % m_len - - u = struct.unpack_from(' m_len: - return " Bad Length %s" % m_len - - u = struct.unpack_from(' m_len: - return " Bad Length %s" % m_len - - u = struct.unpack_from(' m_len: - return " Bad Length %s" % m_len - - u = struct.unpack_from(' m_len: - return " Bad Length %s" % m_len - - u = struct.unpack_from(' m_len: - return " Bad Length %s" % m_len - - u = struct.unpack_from(' m_len: - return " Bad Length %s" % m_len - - u = struct.unpack_from('> 2) & 0x03 - if 0 == raim: - s += "RAIM not available" - elif 1 == raim: - s += "RAIM not active" - elif 2 == raim: - s += "RAIM active" - else: - s += "RAIM ??" - return s - - tim_ids = {1: {'str': 'TP', 'dec': tim_tp, 'name': 'UBX-TIM-TP'}, - 3: {'str': 'TM2', 'name': 'UBX-TIM-TM2'}, - 4: {'str': 'SVIN', 'dec': tim_svin, 'name': 'UBX-TIM-SVIN'}, - 6: {'str': 'VRFY', 'name': 'UBX-TIM-VRFY'}, - 0x11: {'str': 'DOSC', 'name': 'UBX-TIM-DOSC'}, - 0x12: {'str': 'TOS', 'name': 'UBX-TIM-TOS'}, - 0x13: {'str': 'SMEAS', 'name': 'UBX-TIM-SMEAS'}, - 0x15: {'str': 'VCOCAL', 'name': 'UBX-TIM-VCOCAL'}, - 0x16: {'str': 'FCHG', 'name': 'UBX-TIM-FCHG'}, - 0x17: {'str': 'HOC', 'name': 'UBX-TIM-HOC'}, - } - - classes = { - 0x01: {'str': 'NAV', 'ids': nav_ids}, - 0x02: {'str': 'RXM', 'ids': rxm_ids}, - 0x04: {'str': 'INF', 'ids': inf_ids}, - 0x05: {'str': 'ACK', 'ids': ack_ids}, - 0x06: {'str': 'CFG', 'ids': cfg_ids}, - 0x09: {'str': 'UPD'}, - 0x0A: {'str': 'MON', 'ids': mon_ids}, - 0x0B: {'str': 'ATD'}, - 0x0D: {'str': 'TIM', 'ids': tim_ids}, - 0x10: {'str': 'ESF'}, - 0x13: {'str': 'MGA'}, - 0x21: {'str': 'LOG'} - } - - def class_id_s(self, m_class, m_id): - "Return class and ID numbers as a string." - s = 'Class: ' - if m_class not in self.classes: - s += '%#x ID: %#x' % (m_class, m_id) - return s - - if 'str' in self.classes[m_class]: - s_class = self.classes[m_class]['str'] - s += '%s(%#x) ' % (s_class, m_class) - else: - s += '%#x ' % (m_class) - - if (('ids' in self.classes[m_class] and - m_id in self.classes[m_class]['ids'] and - 'str' in self.classes[m_class]['ids'][m_id])): - s_id = self.classes[m_class]['ids'][m_id]['str'] - s += 'ID: %s(%#x)' % (s_id, m_id) - else: - s += 'ID: %#x' % (m_id) - - return s - - def decode_msg(self, out): - "Decode one message and then return number of chars consumed" - - state = 'BASE' - consumed = 0 - - # decode state machine - for this_byte in out: - consumed += 1 - if isinstance(this_byte, str): - # a character, probably read from a file - c = ord(this_byte) - else: - # a byte, probably read from a serial port - c = int(this_byte) - - if VERB_RAW <= opts['verbosity']: - if (ord(' ') <= c) and (ord('~') >= c): - # c is printable - print("state: %s char %c (%#x)" % (state, chr(c), c)) - else: - # c is not printable - print("state: %s char %#x" % (state, c)) - - if 'BASE' == state: - # start fresh - # place to store 'comments' - comment = '' - m_class = 0 - m_id = 0 - m_len = 0 - m_raw = bytearray(0) # class, id, len, payload - m_payload = bytearray(0) # just the payload - m_ck_a = 0 - m_ck_b = 0 - - if 0xb5 == c: - # got header 1, mu - state = 'HEADER1' - - if ord('$') == c: - # got $, so NMEA? - state = 'NMEA' - comment = '$' - - if ord("{") == c: - # JSON, treat as comment line - state = 'JSON' - - # start fresh - comment = "{" - continue - - if ord("#") == c: - # comment line - state = 'COMMENT' - - # start fresh - comment = "#" - continue - - if (ord('\n') == c) or (ord('\r') == c): - # CR or LF, leftovers - return 1 - continue - - if state in ('COMMENT', 'JSON'): - # inside comment - if ord('\n') == c or ord('\r') == c: - # Got newline or linefeed - # terminate messages on or - # Done, got a full message - if gps.polystr('{"class":"ERROR"') in comment: - # always print gpsd errors - print(comment) - elif VERB_DECODE <= opts['verbosity']: - print(comment) - return consumed - else: - comment += chr(c) - continue - - if 'NMEA' == state: - # getting NMEA payload - if (ord('\n') == c) or (ord('\r') == c): - # CR or LF, done, got a full message - # terminates messages on or - if VERB_DECODE <= opts['verbosity']: - print(comment + '\n') - return consumed - else: - comment += chr(c) - continue - - if ord('b') == c and 'HEADER1' == state: - # got header 2 - state = 'HEADER2' - continue - - if 'HEADER2' == state: - # got class - state = 'CLASS' - m_class = c - m_raw.extend([c]) - continue - - if 'CLASS' == state: - # got ID - state = 'ID' - m_id = c - m_raw.extend([c]) - continue - - if 'ID' == state: - # got first length - state = 'LEN1' - m_len = c - m_raw.extend([c]) - continue - - if 'LEN1' == state: - # got second length - m_raw.extend([c]) - m_len += 256 * c - if 0 == m_len: - # no payload - state = 'CSUM1' - else: - state = 'PAYLOAD' - continue - - if 'PAYLOAD' == state: - # getting payload - m_raw.extend([c]) - m_payload.extend([c]) - if len(m_payload) == m_len: - state = 'CSUM1' - continue - - if 'CSUM1' == state: - # got ck_a - state = 'CSUM2' - m_ck_a = c - continue - - if 'CSUM2' == state: - # ck_b - state = 'BASE' - m_ck_b = c - # check checksum - chk = self.checksum(m_raw, len(m_raw)) - if (chk[0] != m_ck_a) or (chk[1] != m_ck_b): - sys.stderr.write("%s: ERROR checksum failed," - "was (%d,%d) s/b (%d, %d)\n" % - (PROG_NAME, m_ck_a, m_ck_b, - chk[0], chk[1])) - - s_payload = ''.join('{:02x} '.format(x) for x in m_payload) - x_payload = binascii.hexlify(m_payload) - - if m_class in self.classes: - this_class = self.classes[m_class] - if 'ids' in this_class: - if m_id in this_class['ids']: - if 'dec' in this_class['ids'][m_id]: - dec = this_class['ids'][m_id]['dec'] - s_payload = this_class['ids'][m_id]['name'] - s_payload += ':\n' - s_payload += dec(self, m_payload) - else: - s_payload = x_payload - - if VERB_INFO < opts['verbosity']: - print("%s, len: %#x" % - (self.class_id_s(m_class, m_id), m_len)) - if VERB_RAW < opts['verbosity']: - print("payload: %s" % x_payload) - print("%s\n" % s_payload) - return consumed - - # give up - state = 'BASE' - - # fell out of loop, no more chars to look at - return 0 - - def checksum(self, msg, m_len): - "Calculate u-blox message checksum" - # the checksum is calculated over the Message, starting and including - # the CLASS field, up until, but excluding, the Checksum Field: - - ck_a = 0 - ck_b = 0 - for c in msg[0:m_len]: - ck_a += c - ck_b += ck_a - - return [ck_a & 0xff, ck_b & 0xff] - - def make_pkt(self, m_class, m_id, m_data): - "Make a message packet" - # always little endian, leader, class, id, length - m_len = len(m_data) - - # build core message - msg = bytearray(m_len + 6) - struct.pack_into('>= 8 - m_data[21] = seconds & 0x0ff - seconds >>= 8 - m_data[22] = seconds & 0x0ff - seconds >>= 8 - m_data[23] = seconds & 0x0ff - - # Survey-in position accuracy limit 500 mm - mmeters = 500 - m_data[24] = mmeters & 0x0ff - mmeters >>= 8 - m_data[25] = mmeters & 0x0ff - mmeters >>= 8 - m_data[26] = seconds & 0x0ff - seconds >>= 8 - m_data[27] = mmeters & 0x0ff - gps_model.gps_send(6, 0x3d, m_data) - - def send_able_tp(self, able): - "dis/enable UBX-TIM-TP Time Pulse" - rate = 1 if able else 0 - m_data = bytearray([0xd, 0x1, rate]) - gps_model.gps_send(6, 1, m_data) - - def send_cfg_ant(self): - "UBX-CFG-ANT, Get Antenna Control Settings" - - m_data = bytearray(0) - gps_model.gps_send(6, 0x13, m_data) - - def send_cfg_cfg(self, save_clear): - "UBX-CFG-CFG, save config" - - # Save: save_clear = 0 - # Clear: save_clear = 1 - - # basic configs alays available to change: - # ioPort, msgConf, infMsg, navConf, rxmConf, senConf - cfg1 = 0x9f - # rinvConf, antConf, logConf - cfg2 = 0x03 - - m_data = bytearray(13) - - # clear - if 0 == save_clear: - # saving - m_data[0] = 0 - m_data[1] = 0 - else: - # clearing - m_data[0] = cfg1 - m_data[1] = cfg2 - m_data[2] = 0 # - m_data[3] = 0 # - - # save - if 0 == type: - # saving - m_data[4] = cfg1 - m_data[5] = cfg2 - else: - # clearing - m_data[4] = 0 - m_data[5] = 0 - m_data[6] = 0 # - m_data[7] = 0 # - - # load - if False and 0 == type: - # saving - m_data[8] = 0 - m_data[9] = 0 - else: - # clearing, load it to save a reboot - m_data[8] = cfg1 - m_data[9] = cfg2 - m_data[10] = 0 # - m_data[11] = 0 # - - # deviceMask, where to save it, try all options - m_data[12] = 0x17 # devBBR, devFLASH devEEPROM, devSpiFlash - - gps_model.gps_send(6, 0x9, m_data) - - def send_cfg_gnss1(self, gnssId, enable): - "UBX-CFG-GNSS, set GNSS config" - m_data = bytearray(12) - m_data[0] = 0 # version 0, msgVer - m_data[1] = 0 # read only, numTrkChHw - m_data[2] = 0xFF # read only, numTrkChUse - m_data[3] = 1 # 1 block follows - # block 1 - m_data[4] = gnssId # gnssId - if 0 == gnssId: - # GPS - m_data[5] = 8 # resTrkCh - m_data[6] = 16 # maxTrkCh - if 1 == gnssId: - # SBAS - m_data[5] = 1 # resTrkCh - m_data[6] = 3 # maxTrkCh - if 2 == gnssId: - # GALILEO - m_data[5] = 4 # resTrkCh - m_data[6] = 8 # maxTrkCh - if 3 == gnssId: - # BeiDou - m_data[5] = 2 # resTrkCh - m_data[6] = 16 # maxTrkCh - if 4 == gnssId: - # IMES - m_data[5] = 0 # resTrkCh - m_data[6] = 8 # maxTrkCh - if 5 == gnssId: - # QZSS - m_data[5] = 0 # resTrkCh - m_data[6] = 3 # maxTrkCh - if 6 == gnssId: - # GLONASS - m_data[5] = 8 # resTrkCh - m_data[6] = 14 # maxTrkCh - m_data[7] = 0 # reserved1 - m_data[8] = enable # flags - m_data[9] = 0 # flagflags, unused - if 5 == gnssId: - # QZSS - m_data[10] = 5 # flags E1OS, L1SAIF - else: - m_data[10] = 1 # flags E1OS - m_data[11] = 1 # flags, unused - gps_model.gps_send(6, 0x3e, m_data) - - def send_cfg_gnss(self): - "UBX-CFG-GNSS, set Galileo config decode" - m_data = bytearray(0) - gps_model.gps_send(6, 0x3e, m_data) - - def send_cfg_nav5_model(self): - "UBX-CFG-NAV5, set dynamic platform model" - - m_data = bytearray(36) - m_data[0] = 1 # just setting dynamic model - m_data[1] = 0 # just setting dynamic model - m_data[2] = opts["mode"] - - gps_model.gps_send(6, 0x24, m_data) - - def send_cfg_msg(self, m_class, m_id, rate=None): - "UBX-CFG-MSG, poll, or set, message rates decode" - m_data = bytearray(2) - m_data[0] = m_class - m_data[1] = m_id - if rate is not None: - m_data.extend([rate]) - gps_model.gps_send(6, 1, m_data) - - def send_cfg_nav5(self): - "UBX-CFG-NAV5, get Nav Engine Settings" - m_data = bytearray(0) - gps_model.gps_send(6, 0x24, m_data) - - def send_cfg_pms(self): - "UBX-CFG-PMS, Get Power Management Settings" - - if opts["mode"] is not None: - m_data = bytearray(8) - # set powerSetupValue to mode - m_data[1] = opts["mode"] - # leave period and onTime zero, which breaks powerSetupValue = 3 - else: - m_data = bytearray(0) - - gps_model.gps_send(6, 0x86, m_data) - - def send_cfg_prt(self): - "UBX-CFG-PRT, get I/O Port" - m_data = bytearray(0) - gps_model.gps_send(6, 0x0, m_data) - - def send_set_speed(self, speed): - "UBX-CFG-PRT, set port" - - # FIXME! Poll current masks, then adjust speed - m_data = bytearray(20) - m_data[0] = 1 # port 1, UART 1 - # m_data[0] = 3 # port 3, USB - m_data[4] = 0xc0 # 8N1 - m_data[5] = 0x8 # 8N1 - - m_data[8] = speed & 0xff - m_data[9] = (speed >> 8) & 0xff - m_data[10] = (speed >> 16) & 0xff - m_data[11] = (speed >> 24) & 0xff - - m_data[12] = 3 # in, ubx and nmea - m_data[14] = 3 # out, ubx and nmea - gps_model.gps_send(6, 0, m_data) - - def send_cfg_rst(self, reset_type): - "UBX-CFG-RST, reset" - # always do a hardware reset - # if on USB, this will disconnect and reconnect, giving you - # a new tty. - m_data = bytearray(4) - m_data[0] = reset_type & 0xff - m_data[1] = (reset_type >> 8) & 0xff - gps_model.gps_send(6, 0x4, m_data) - - def send_cfg_tmode2(self): - "UBX-CFG-TMODE2, get time mode 2 configuration" - m_data = bytearray(0) - gps_model.gps_send(6, 0x3d, m_data) - - def send_cfg_tp5(self): - "UBX-CFG-TP5, get time0 decodes, timepulse 0 and 1" - m_data = bytearray(0) - gps_model.gps_send(6, 0x31, m_data) - # and timepulse 1 - m_data = bytearray(1) - m_data[0] = 1 - gps_model.gps_send(6, 0x31, m_data) - - def send_cfg_usb(self): - "UBX-CFG-USB, get USB configuration" - m_data = bytearray(0) - gps_model.gps_send(6, 0x1b, m_data) - - def send_mon_ver(self): - "UBX-MON-VER get versions" - m_data = bytearray(0) - gps_model.gps_send(0x0a, 0x04, m_data) - - def send_nav_posecef(self): - "UBX-NAV-POSECEF, poll ECEF position" - m_data = bytearray(0) - gps_model.gps_send(1, 1, m_data) - - def send_nav_velecef(self): - "UBX-NAV-VELECEF, poll ECEF velocity decode" - m_data = bytearray(0) - gps_model.gps_send(1, 0x11, m_data) - - def send_tim_svin(self): - "UBX-TIM-SVIN, get survey in data" - m_data = bytearray(0) - gps_model.gps_send(0x0d, 0x04, m_data) - - def send_tim_tp(self): - "UBX-TIM-TP, get time pulse timedata" - m_data = bytearray(0) - gps_model.gps_send(0x0d, 0x01, m_data) - - able_commands = { - # en/dis able BeiDou - "BEIDOU": {"command": send_able_beidou, - "help": "BeiDou"}, - # en/dis able basic binary messages - "BINARY": {"command": send_able_binary, - "help": "basic binary messages"}, - # en/dis able ECEF - "ECEF": {"command": send_able_ecef, - "help": "ECEF"}, - # en/dis able GPS - "GPS": {"command": send_able_gps, - "help": "GPS and QZSS"}, - # en/dis able GALILEO - "GALILEO": {"command": send_able_galileo, - "help": "GALILEO"}, - # en/dis able GLONASS - "GLONASS": {"command": send_able_glonass, - "help": "GLONASS"}, - # en/dis able basic NMEA messages - "NMEA": {"command": send_able_nmea, - "help": "basic NMEA messages"}, - # en/dis able RAWX - "RAWX": {"command": send_able_rawx, - "help": "RAWX measurements"}, - # en/dis able SBAS - "SBAS": {"command": send_able_sbas, - "help": "SBAS"}, - # en/dis able TP time pulse message - "TP": {"command": send_able_tp, - "help": "TP Time Pulse message"}, - # en/dis able TMODE2 Survey-in - "TMODE2": {"command": send_able_tmode2, - "help": "TMODE2"}, - } - commands = { - # UBX-CFG-ANT, poll antenna config decode - "ANT": {"command": send_cfg_ant, - "help": "UBX-CFG-ANT poll antenna config"}, - # UBX-CFG-RST cold boot - "COLDBOOT": {"command": send_cfg_rst, - "help": "UBS-CFG-RST coldboot the GPS", - "opt": 0xfff}, - # UBX-CFG-GNSS poll gnss config - "GNSS": {"command": send_cfg_gnss, - "help": "UBX-CFG-GNSS poll GNSS config"}, - # UBX-CFG-RST hot boot - "HOTBOOT": {"command": send_cfg_rst, - "help": "UBX-CFG-RST hotboot the GPS", - "opt": 0}, - # UBX-CFG-NAV5 set Dynamic Platform Model - "MODEL": {"command": send_cfg_nav5_model, - "help": "UBX-CFG-NAV5 set Dynamic Platform Model"}, - # UBX-CFG-NAV5, poll Nav Engine Settings - "NAV5": {"command": send_cfg_nav5, - "help": "UBX-CFG-NAV5 poll Nav Engines settings"}, - # UBX-CFG-PMS, poll power management settings - "PMS": {"command": send_cfg_pms, - "help": "UBX-CFG-PMS poll power management settings"}, - # UBX-CFG-PRT, poll I/O port number - "PRT": {"command": send_cfg_prt, - "help": "UBX-CFG-PRT poll I/O port settings"}, - # UBX-CFG-CFG reset config - "RESET": {"command": send_cfg_cfg, - "help": "UBX-CFG-CFG reset config to defaults", - "opt": 1}, - # UBX-CFG-CFG save config - "SAVE": {"command": send_cfg_cfg, - "help": "UBX-CFG-CFG save current config", - "opt": 0}, - # UBX-TIM-SVIN, get survey in data - "SVIN": {"command": send_tim_svin, - "help": "UBX-TIM-SVIN get survey in data"}, - # UBX-CFG-TMODE2, get time mode 2 config - "TMODE2": {"command": send_cfg_tmode2, - "help": "UBX-CFG-TMODE2 get time mode 2 config"}, - # UBX-TIM-TP, get time pulse timedata - "TP": {"command": send_tim_tp, - "help": "UBX-TIM-TP get time pulse timedata"}, - # UBX-CFG-TP5, get time0 decodes - "TP5": {"command": send_cfg_tp5, - "help": "UBX-TIM-TP5 get time pulse decodes"}, - # UBX-CFG-RST warm boot - "WARMBOOT": {"command": send_cfg_rst, - "help": "UBX-CFG-RST warmboot the GPS", - "opt": 1}, - # poll UBX-CFG-USB - "USB": {"command": send_cfg_usb, - "help": "UBX-CFG-USB get USB config"}, - # poll UBX-MON-VER - "VER": {"command": send_mon_ver, - "help": "UBX-MON-VER get GPS version"}, - } - # end class ubx - - -class gps_io(object): - """All the GPS I/O in one place" - - Three types of GPS I/O - 1. read only from a file - 2. read/write through a device - 3. read only from a gpsd instance - """ - - out = b'' - ser = None - input_is_device = False - - def __init__(self, serial_class): - "Initialize class" - - Serial = serial_class - # buffer to hold read data - self.out = b'' - - # open the input: device, file, or gpsd - if opts['input_file_name'] is not None: - # check if input file is a file or device - try: - mode = os.stat(opts['input_file_name']).st_mode - except OSError: - sys.stderr.write('%s: failed to open input file %s\n' % - (PROG_NAME, opts['input_file_name'])) - sys.exit(1) - - if stat.S_ISCHR(mode): - # character device, need not be read only - self.input_is_device = True - - if ((opts['disable'] or opts['enable'] or opts['poll'] or - opts['oaf_name'])): - - # check that we can write - if opts['read_only']: - sys.stderr.write('%s: read-only mode, ' - 'can not send commands\n' % PROG_NAME) - sys.exit(1) - if self.input_is_device is False: - sys.stderr.write('%s: input is plain file, ' - 'can not send commands\n' % PROG_NAME) - sys.exit(1) - - if opts['target']['server'] is not None: - # try to open local gpsd - try: - self.ser = gps.gpscommon(host=None) - self.ser.connect(opts['target']['server'], - opts['target']['port']) - - # alias self.ser.write() to self.write_gpsd() - self.ser.write = self.write_gpsd - - # ask for raw, not rare, data - data_out = b'?WATCH={' - if opts['target']['device'] is not None: - # add in the requested device - data_out += (b'"device":"' + opts['target']['device'] + - b'",') - data_out += b'"enable":true,"raw":2}\r\n' - if VERB_RAW <= opts['verbosity']: - print("sent: ", data_out) - self.ser.send(data_out) - except socket.error as err: - sys.stderr.write('%s: failed to connect to gpsd %s\n' % - (PROG_NAME, err)) - sys.exit(1) - - elif self.input_is_device: - # configure the serial connections (the parameters refer to - # the device you are connecting to) - - try: - self.ser = Serial.Serial( - baudrate=opts['input_speed'], - # 8N1 is GREIS default - bytesize=Serial.EIGHTBITS, - parity=Serial.PARITY_NONE, - port=opts['input_file_name'], - stopbits=Serial.STOPBITS_ONE, - # read timeout - timeout=0.05, - # pyserial Ver 3.0+ changes writeTimeout to write_timeout - # just set both - write_timeout=0.5, - writeTimeout=0.5, - ) - except Serial.serialutil.SerialException: - # this exception happens on bad serial port device name - sys.stderr.write('%s: failed to open serial port "%s"\n' - '%s: Your computer has the serial ports:\n' % - (PROG_NAME, opts['input_file_name'], - PROG_NAME)) - - # print out list of supported ports - import serial.tools.list_ports as List_Ports - ports = List_Ports.comports() - for port in ports: - sys.stderr.write(" %s: %s\n" % - (port.device, port.description)) - sys.exit(1) - - # flush input buffer, discarding all its contents - self.ser.flushInput() - - else: - # Read from a plain file of GREIS messages - try: - self.ser = open(opts['input_file_name'], 'rb') - except IOError: - sys.stderr.write('%s: failed to open input %s\n' % - (PROG_NAME, opts['input_file_name'])) - sys.exit(1) - - def read(self, read_opts): - "Read from device, until timeout or expected message" - - # are we expecting a certain message? - if gps_model.expect_statement_identifier: - # assume failure, until we see expected message - ret_code = 1 - else: - # not expecting anything, so OK if we did not see it. - ret_code = 0 - - try: - if read_opts['target']['server'] is not None: - # gpsd input - start = time.clock() - while read_opts['input_wait'] > (time.clock() - start): - # First priority is to be sure the input buffer is read. - # This is to prevent input buffer overuns - if 0 < self.ser.waiting(): - # We have serial input waiting, get it - # No timeout possible - # RTCM3 JSON can be over 4.4k long, so go big - new_out = self.ser.sock.recv(8192) - if raw is not None: - # save to raw file - raw.write(new_out) - self.out += new_out - - consumed = gps_model.decode_msg(self.out) - self.out = self.out[consumed:] - if ((gps_model.expect_statement_identifier and - (gps_model.expect_statement_identifier == - gps_model.last_statement_identifier))): - # Got what we were waiting for. Done? - ret_code = 0 - if not read_opts['input_forced_wait']: - # Done - break - - elif self.input_is_device: - # input is a serial device - start = time.clock() - while read_opts['input_wait'] > (time.clock() - start): - # First priority is to be sure the input buffer is read. - # This is to prevent input buffer overuns - # pyserial 3.0 replaces inWaiting() with in_waiting - if 0 < self.ser.inWaiting(): - # We have serial input waiting, get it - # 1024 is comfortably large, almost always the - # Read timeout is what causes ser.read() to return - new_out = self.ser.read(1024) - if raw is not None: - # save to raw file - raw.write(new_out) - self.out += new_out - - consumed = gps_model.decode_msg(self.out) - self.out = self.out[consumed:] - if ((gps_model.expect_statement_identifier and - (gps_model.expect_statement_identifier == - gps_model.last_statement_identifier))): - # Got what we were waiting for. Done? - ret_code = 0 - if not read_opts['input_forced_wait']: - # Done - break - else: - # ordinary file, so all read at once - self.out += self.ser.read() - if raw is not None: - # save to raw file - raw.write(self.out) - - while True: - consumed = gps_model.decode_msg(self.out) - self.out = self.out[consumed:] - if 0 >= consumed: - break - - except IOError: - # This happens on a good device name, but gpsd already running. - # or if USB device unplugged - sys.stderr.write('%s: failed to read %s\n' - '%s: Is gpsd already holding the port?\n' - % (PROG_NAME, read_opts['input_file_name'], - PROG_NAME)) - return 1 - - if 0 < ret_code: - # did not see the message we were expecting to see - sys.stderr.write('%s: waited %0.2f seconds for, ' - 'but did not get: %%%s%%\n' - % (PROG_NAME, read_opts['input_wait'], - gps_model.expect_statement_identifier)) - return ret_code - - def write_gpsd(self, data): - "write data to gpsd daemon" - - # HEXDATA_MAX = 512, from gps.h, The max hex digits can write. - # Input data is binary, converting to hex doubles its size. - # Limit binary data to length 255, so hex data length less than 510. - if 255 < len(data): - sys.stderr.write('%s: trying to send %d bytes, max is 255\n' - % (PROG_NAME, len(data))) - return 1 - - if opts['target']['device'] is not None: - # add in the requested device - data_out = b'?DEVICE={"path":"' + opts['target']['device'] + b'",' - else: - data_out = b'?DEVICE={' - - # Convert binary data to hex and build the message. - data_out += b'"hexdata":"' + binascii.hexlify(data) + b'"}\r\n' - if VERB_RAW <= opts['verbosity']: - print("sent: ", data_out) - self.ser.send(data_out) - return 0 - - -# instantiate the GPS class -gps_model = ubx() - - -def usage(): - "Ouput usage information, and exit" - print('usage: %s [-c C] [-f F] [-r] [-p P] [-s S] [-v V]\n' - ' [-hV?] [-S S]\n' - ' -c C send raw command C to GPS\n' - ' -d D disable D\n' - ' -e E enable E\n' - ' -f F open F as file/device\n' - ' default: %s\n' - ' -h print help\n' - ' -m M optional mode to -p P\n' - ' -p P send a prepackaged query P to GPS\n' - ' -R R save raw data from GPS in file R\n' - ' default: %s\n' - ' -r open file/device read only\n' - ' -S S set GPS speed to S\n' - ' -s S set port speed to S\n' - ' default: %s bps\n' - ' -w wait time\n' - ' default: %s seconds\n' - ' -V print version\n' - ' -v V Set verbosity level to V, 0 to 4\n' - ' default: %s\n' - ' -? print help\n' - '\n' - 'D and E can be one of:' % - (PROG_NAME, opts['input_file_name'], opts['raw_file'], - opts['input_speed'], opts['input_wait'], opts['verbosity']) - ) - - for item in sorted(gps_model.able_commands.keys()): - print(" %-12s %s" % (item, gps_model.able_commands[item]["help"])) - - print('\nP can be one of:') - for item in sorted(gps_model.commands.keys()): - print(" %-12s %s" % (item, gps_model.commands[item]["help"])) - print('\nOptions can be placed in the UBXOPTS environment variable.\n' - 'UBXOPTS is processed before the CLI options.') - sys.exit(0) - - -if 'UBXOPTS' in os.environ: - # grab the UBXOPTS environment variable for options - opts['progopts'] = os.environ['UBXOPTS'] - options = opts['progopts'].split(' ') + sys.argv[1:] -else: - options = sys.argv[1:] - - -try: - (options, arguments) = getopt.getopt(options, "?c:d:e:f:hm:rp:s:w:v:R:S:V") -except getopt.GetoptError as err: - sys.stderr.write("%s: %s\n" - "Try '%s -h' for more information.\n" % - (PROG_NAME, str(err), PROG_NAME)) - sys.exit(2) - -for (opt, val) in options: - if opt == '-c': - opts['command'] = val - elif opt == '-d': - opts['disable'] = val - elif opt == '-e': - opts['enable'] = val - elif opt == '-f': - opts['input_file_name'] = val - elif opt == '-h' or opt == '-?': - usage() - elif opt == '-m': - opts['mode'] = int(val) - elif opt == '-p': - opts['poll'] = val - elif opt == '-r': - opts['read_only'] = True - elif opt == '-s': - opts['input_speed'] = int(val) - if opts['input_speed'] not in gps_model.speeds: - sys.stderr.write('%s: -s invalid speed %s\n' % - (PROG_NAME, opts['input_speed'])) - sys.exit(1) - - elif opt == '-w': - opts['input_wait'] = float(val) - elif opt in '-v': - opts['verbosity'] = int(val) - elif opt in '-R': - # raw log file - opts['raw_file'] = val - elif opt in '-S': - opts['set_speed'] = int(val) - if opts['set_speed'] not in gps_model.speeds: - sys.stderr.write('%s: -S invalid speed %s\n' % - (PROG_NAME, opts['set_speed'])) - sys.exit(1) - - elif opt == '-V': - # version - sys.stderr.write('%s: Version %s\n' % (PROG_NAME, gps_version)) - sys.exit(0) - -if opts['input_file_name'] is None: - # no input file given - # default to local gpsd - opts['target']['server'] = "localhost" - opts['target']['port'] = gps.GPSD_PORT - opts['target']['device'] = None - if arguments: - # server[:port[:device]] - parts = arguments[0].split(':') - opts['target']['server'] = parts[0] - if 1 < len(parts): - opts['target']['port'] = parts[1] - if 2 < len(parts): - opts['target']['device'] = parts[2] - -elif arguments: - sys.stderr.write('%s: Both input file and server specified\n' % PROG_NAME) - sys.exit(1) - -if VERB_PROG <= opts['verbosity']: - # dump all options - print('Options:') - for option in sorted(opts): - print(" %s: %s" % (option, opts[option])) - -# done parsing arguments from environment and CLI - -try: - # raw log file requested? - raw = None - if opts['raw_file']: - try: - raw = open(opts['raw_file'], 'w') - except IOError: - sys.stderr.write('%s: failed to open raw file %s\n' % - (PROG_NAME, opts['raw_file'])) - sys.exit(1) - - # create the I/O instance - io_handle = gps_io(serial) - - sys.stdout.flush() - - if opts['disable'] is not None: - if VERB_QUIET < opts['verbosity']: - sys.stderr.write('%s: disable %s\n' % (PROG_NAME, opts['disable'])) - if opts['disable'] in gps_model.able_commands: - command = gps_model.able_commands[opts['disable']] - command["command"](gps, 0) - else: - sys.stderr.write('%s: disable %s not found\n' % - (PROG_NAME, opts['disable'])) - sys.exit(1) - - elif opts['enable'] is not None: - if VERB_QUIET < opts['verbosity']: - sys.stderr.write('%s: enable %s\n' % (PROG_NAME, opts['enable'])) - if opts['enable'] in gps_model.able_commands: - command = gps_model.able_commands[opts['enable']] - command["command"](gps, 1) - else: - sys.stderr.write('%s: enable %s not found\n' % - (PROG_NAME, opts['enable'])) - sys.exit(1) - - elif opts['poll'] is not None: - if VERB_QUIET < opts['verbosity']: - sys.stderr.write('%s: poll %s\n' % (PROG_NAME, opts['poll'])) - - if 'MODEL' == opts["poll"]: - if opts["mode"] is None: - opts["mode"] = 0 # default to portable model - - if opts['poll'] in gps_model.commands: - command = gps_model.commands[opts['poll']] - if 'opt' in command: - command["command"](gps, command["opt"]) - else: - command["command"](gps) - else: - sys.stderr.write('%s: poll %s not found\n' % - (PROG_NAME, opts['poll'])) - sys.exit(1) - - elif opts['set_speed'] is not None: - gps_model.send_set_speed(opts['set_speed']) - - elif opts['command'] is not None: - # zero length is OK to send - # add trailing new line - opts['command'] += "\n" - - if VERB_QUIET < opts['verbosity']: - sys.stderr.write('%s: command %s\n' % (PROG_NAME, opts['command'])) - gps_model.gps_send_raw(opts['command']) - - exit_code = io_handle.read(opts) - - if ((VERB_RAW <= opts['verbosity']) and io_handle.out): - # dump raw left overs - print("Left over data:") - print(io_handle.out) - - sys.stdout.flush() - io_handle.ser.close() - -except KeyboardInterrupt: - print('') - exit_code = 1 - -sys.exit(exit_code) diff --git a/contrib/zerk b/contrib/zerk deleted file mode 100755 index 848d216b..00000000 --- a/contrib/zerk +++ /dev/null @@ -1,1896 +0,0 @@ -#!/usr/bin/env python -# -*- coding: UTF-8 -''' -zerk -- GREIS configurator and packet decoder - -usage: zerk [OPTIONS] [server[:port[:device]]] -''' - -# This program conforms to the JAVAD document: -# GNSS Receiver External Interface Specification -# Revised: October 11, 2017 -# -# Hereafter referred to as "the specification" -# -# This file is Copyright (c) 2018 by the GPSD project -# BSD terms apply: see the file COPYING in the distribution root for details. -# -# This code runs compatibly under Python 2 and 3.x for x >= 2. -# Preserve this property! -# -# ENVIRONMENT: -# Options in the ZERKOPTS environment variable will be parsed before -# the CLI options. A handy place to put your '-f /dev/ttyXX -s SPEED' -# -# example usages: -# Coldboot the GPS: zerk -p COLDBOOT -# Print current serial port: zerk -c "print,/cur/term" -# Decode raw log file: zerk -r -f greis-binary.log -v 2 -# Change GPS port speed: zerk -S 230400 -# Watch entire reset cycle: zerk -p RESET -v 2 -w 20 -W -# poll SVs Status: zerk -W -w 2 -v 2 -c "out,,jps/{CS,ES,GS,Is,WS,QS}" -# dump local gpsd data zerk -v 2 -w 5 localhost -# -# TODO: no CRC16 packets handled yet -# TODO: more packet decodes - -from __future__ import absolute_import, print_function, division - -import binascii # for binascii.hexlify() -import getopt # for getopt.getopt(), to parse CLI options -import hashlib # for hashlib.sha1 -import os # for os.environ -import socket # for socket.error -import stat # for stat.S_ISBLK() -import struct # for pack() -import sys -import time -import xml.etree.ElementTree # to parse .jpo files - -PROG_NAME = 'zerk' - -try: - import serial -except ImportError: - # treat serial as special since it is not part of standard Python - sys.stderr.write("%s: failed to import pyserial\n" % PROG_NAME) - sys.exit(2) - -try: - import gps -except ImportError: - # PEP8 says local imports last - sys.stderr.write("%s: failed to import gps, check PYTHON_PATH\n" % - PROG_NAME) - sys.exit(2) - -gps_version = '3.18-dev' -if gps.__version__ != gps_version: - sys.stderr.write("%s: ERROR: need gps module version %s, got %s\n" % - (PROG_NAME, gps_version, gps.__version__)) - sys.exit(1) - - -VERB_QUIET = 0 # quiet -VERB_NONE = 1 # just output requested data and some info -VERB_DECODE = 2 # decode all messages -VERB_INFO = 3 # more info -VERB_RAW = 4 # raw info -VERB_PROG = 5 # program trace - -# dictionary to hold all user options -opts = { - # command to send to GPS, -c - 'command': None, - # command for -d disable - 'disable': None, - # command for -e enable - 'enable': None, - # default input -f file - 'input_file_name': None, - # default forced wait? -W - 'input_forced_wait': False, - # default port speed -s - 'input_speed': 115200, - # default input wait time -w in seconds - 'input_wait': 2.0, - # the name of an OAF file, extension .jpo - 'oaf_name': None, - # poll command -p - 'poll': None, - # raw log file name - 'raw_file': None, - # open port read only -r - 'read_only': False, - # speed to set GPS -S - 'set_speed': None, - # target gpsd (server:port:device) to connect to - 'target': {"server": None, "port": gps.GPSD_PORT, "device": None}, - # verbosity level, -v - 'verbosity': VERB_NONE, - # contents of environment variable ZERKOPTS - 'progopts': '', -} - - -class greis(object): - """A class for working with the GREIS GPS message formats - - This class contains functions to decode messages in the Javad GREIS - "Receiver Input Language" and "Receiver Messages" formats. - """ - - # when a statement identifier is received, it is stored here - last_statement_identifier = None - # expected statement identifier. - expect_statement_identifier = False - # ID of current message as a string - s_id = '' - - def __init__(self): - "Initialize class" - - self.last_statement_identifier = None - self.expect_statement_identifier = False - # last epoch received in [~~] - # epoch == None means never got epoch, epoch == -1 means missing. - self.epoch = None - - def f4_s(self, f): - "convert an '! f4' to a string" - - if gps.isfinite(f): - # yeah, the precision is a guess - return "%.6f" % f - return 'X' - - def f8_s(self, f): - "convert an '! f8' to a string" - - if gps.isfinite(f): - # yeah, the precision is a guess - return "%.4f" % f - return 'X' - - def i1_s(self, i): - "convert an '! i1' to a string" - return 'X' if i == 127 else str(i) - - def i2_s(self, i): - "convert an '! i2' to a string" - return 'X' if i == 32767 else str(i) - - def i4_s(self, i): - "convert an '! i4' to a string" - return 'X' if i == 2147483647 else str(i) - - def u1_s(self, u): - "convert an '! u1' to a string" - return 'X' if u == 255 else str(u) - - def u2_s(self, u): - "convert an '! u2' to a string" - return 'X' if u == 65535 else str(u) - - def u4_s(self, u): - "convert an '! u4' to a string" - return 'X' if u == 4294967295 else str(u) - - def isuchex(self, c): - "Is byte an upper case hex char?" - if 48 <= c and 57 >= c: - # 0 to 9 - return int(c) - 48 - if 65 <= c and 70 >= c: - # A to F - return int(c) - 55 - return -1 - - soltypes = {0: "None", - 1: "3D", - 2: "DGPS", - 3: "RTK float", - 4: "RTK fixed", - 5: "fixed" - } - - # allowable speeds - speeds = (460800, 230400, 153600, 115200, 57600, 38400, 19200, 9600, - 4800, 2400, 1200, 600, 300) - - def msg_c_(self, payload): - "[c?] decode, Smoothing Corrections" - - s = ' smooth' - for i in range(0, len(payload) - 1, 2): - u = struct.unpack_from(' m_len): - return " Bad Length %s" % m_len - - u = struct.unpack_from(' m_len): - return " Bad Length %s" % m_len - if ('[CN]' == self.s_id) and (132 > m_len): - return " Bad Length %s" % m_len - if ('[EN]' == self.s_id) and (145 > m_len): - return " Bad Length %s" % m_len - - u = struct.unpack_from(' m_len): - u = struct.unpack_from(' m_len): - u = struct.unpack_from('= c): - # c is printable - print("state: %s char %c (%#x)" % (state, chr(c), c)) - else: - # c is not printable - print("state: %s char %#x" % (state, c)) - - m_raw.extend([c]) - - # parse input stream per GREIS Ref Guide Section 3.3.3 - if 'BASE' == state: - # start fresh - # place to store 'comments' - comment = '' - # message id byte one - m_id1 = 0 - # message id byte two - m_id2 = 0 - # message length as integer - m_len = 0 - # byte array to hold payload, including possible checksum - m_payload = bytearray(0) - m_raw = bytearray(0) - m_raw.extend([c]) - - if (ord('0') <= c) and (ord('~') >= c): - # maybe id 1, '0' to '~' - state = 'ID1' - - # start the grab - m_id1 = c - continue - - if ord("%") == c: - # start of %ID%, Receiver Input Language - # per GREIS Ref Guide Section 2.2 - state = 'RIL' - - # start fresh - comment = "%" - continue - - if ord("$") == c: - # NMEA line, treat as comment - state = 'NMEA' - - # start fresh - comment = "$" - continue - - if ord("#") == c: - # comment line - state = 'COMMENT' - - # start fresh - comment = "#" - continue - - if ord('\n') == c or ord('\r') == c: - # stray newline or linefeed, eat it - return consumed - - # none of the above, stay in BASE - continue - - if state in ('COMMENT', 'JSON', 'RIL'): - # inside comment - if ord('\n') == c or ord('\r') == c: - # Got newline or linefeed - # GREIS terminates messages on or - # Done, got a full message - if b'{"class":"ERROR"' in comment: - # always print gpsd errors - print(comment) - elif VERB_DECODE <= opts['verbosity']: - print(comment) - return consumed - else: - comment += chr(c) - continue - - if 'ID1' == state: - # maybe id 2, '0' to '~' - if ord('"') == c: - # technically could be GREIS, but likely JSON - state = 'JSON' - comment += chr(m_id1) + chr(c) - elif (ord('0') <= c) and (ord('~') >= c): - state = 'ID2' - m_id2 = c - else: - state = 'BASE' - continue - - if 'ID2' == state: - # maybe len 1, 'A' to 'F' - x = self.isuchex(c) - if -1 < x: - state = 'LEN1' - m_len = x * 256 - else: - state = 'BASE' - continue - - if 'LEN1' == state: - # maybe len 2, 'A' to 'F' - x = self.isuchex(c) - if -1 < x: - state = 'LEN2' - m_len += x * 16 - else: - state = 'BASE' - continue - - if 'LEN2' == state: - # maybe len 3, 'A' to 'F' - x = self.isuchex(c) - if -1 < x: - state = 'PAYLOAD' - m_len += x - else: - state = 'BASE' - continue - - if 'NMEA' == state: - # inside NMEA - if ord('\n') == c or ord('\r') == c: - # Got newline or linefeed - # done, got a full message - # GREIS terminates messages on or - if VERB_DECODE <= opts['verbosity']: - print(comment) - return consumed - else: - comment += chr(c) - continue - - if 'PAYLOAD' == state: - # getting payload - m_payload.extend([c]) - if len(m_payload) < m_len: - continue - - # got entire payload - self.s_id = "[%c%c]" % (chr(m_id1), chr(m_id2)) - if VERB_DECODE <= opts['verbosity']: - x_payload = binascii.hexlify(m_payload) - - # [RE], [ER] and more have no 8-bit checksum - # assume the rest do - if ((self.s_id not in ('[CS]', '[ER]', '[ES]', '[GS]', '[Is]', - '[MF]', '[NS]', '[PM]', '[QS]', '[RE]', - '[WS]') and - not self.checksum_OK(m_raw))): - print("ERROR: Bad checksum\n") - if VERB_DECODE <= opts['verbosity']: - print("DECODE: id: %s len: %d\n" - "DECODE: payload: %s\n" % - (self.s_id, m_len, x_payload)) - # skip it. - return consumed - - if self.s_id in self.messages: - if VERB_INFO <= opts['verbosity']: - print("INFO: id: %s len: %d\n" - "INFO: payload: %s\n" % - (self.s_id, m_len, x_payload)) - - (decode, length) = self.messages[self.s_id] - if m_len < length: - print("DECODE: %s Bad Length %s\n" % - (self.s_id, m_len)) - else: - s = self.s_id + decode(self, m_payload) - if VERB_DECODE <= opts['verbosity']: - print(s) - else: - # unknown message - if VERB_DECODE <= opts['verbosity']: - print("DECODE: Unknown: id: %s len: %d\n" - "DECODE: payload: %s\n" % - (self.s_id, m_len, x_payload)) - return consumed - - # give up - state = 'BASE' - - # fell out of loop, no more chars to look at - return 0 - - def checksum_OK(self, raw_msg): - "Check the i8-bit checksum on a message, return True if good" - - # some packets from the GPS use CRC16, some i8-bit checksum, some none - # only 8-bit checksum done here for now - calc_checksum = self.checksum(raw_msg, len(raw_msg) - 1) - rcode = raw_msg[len(raw_msg)-1] == calc_checksum - if VERB_RAW <= opts['verbosity']: - print("Checksum was %#x, calculated %#x" % - (raw_msg[len(raw_msg)-1], calc_checksum)) - return rcode - - def _rol(self, value): - "rotate a byte left 2 bits" - return ((value << 2) | (value >> 6)) & 0x0ff - - def checksum(self, msg, m_len): - "Calculate GREIS 8-bit checksum" - - # Calculated per section A.1.1 of the specification - # msg may be bytes (incoming messages) or str (outgoing messages) - - ck = 0 - for c in msg[0:m_len]: - if isinstance(c, str): - # a string, make a byte - c = ord(c) - ck = self._rol(ck) ^ c - - return self._rol(ck) & 0x0ff - - def make_pkt(self, m_data): - "Build an output message, always ASCII, add checksum and terminator" - - # build core message - - # no leading spaces, checksum includes the @ - m_data = m_data.lstrip() + b'@' - - chk = self.checksum(m_data, len(m_data)) - - # all commands end with CR and/or LF - return m_data + (b'%02X' % chk) + b'\n' - - def gps_send(self, m_data): - "Send message to GREIS GPS" - - m_all = self.make_pkt(m_data) - if not opts['read_only']: - io_handle.ser.write(m_all) - if VERB_INFO <= opts['verbosity']: - print("sent:", m_all) - self.decode_msg(m_all) - sys.stdout.flush() - - # Table of known options. From table 4-2 of the specification. - oafs = ( - b"_AJM", - b"AUTH", - b"_BLT", - b"_CAN", - b"CDIF", - b"CMRI", - b"CMRO", - b"COMP", - b"COOP", - b"COPN", - b"CORI", - b"_CPH", - b"DEVS", - b"DIST", - b"_DTM", - b"_E5B", - b"_E6_", - b"EDEV", - b"ETHR", - b"EVNT", - b"_FRI", - b"_FRO", - b"_FTP", - b"_GAL", - b"GBAI", - b"GBAO", - b"GCLB", - b"_GEN", - b"_GEO", - b"_GLO", - b"_GPS", - b"_GSM", - b"HTTP", - b"_IMU", - b"INFR", - b"IRIG", - b"IRNS", - b"JPSI", - b"JPSO", - b"_L1C", - b"_L1_", - b"_L2C", - b"_L2_", - b"_L5_", - b"LAT1", - b"LAT2", - b"LAT3", - b"LAT4", - b"LCS2", - b"L_CS", - b"_LIM", - b"LON1", - b"LON2", - b"LON3", - b"LON4", - b"MAGN", - b"_MEM", - b"_MPR", - b"OCTO", - b"OMNI", - b"_PAR", - b"PDIF", - b"_POS", - b"_PPP", - b"_PPS", - b"PRTT", - b"_PTP", - b"QZSS", - b"RAIM", - b"_RAW", - b"RCVT", - b"RM3I", - b"RM3O", - b"RS_A", - b"RS_B", - b"RS_C", - b"RS_D", - b"RTMI", - b"RTMO", - b"SPEC", - b"TCCL", - b"_TCP", - b"TCPO", - b"_TLS", - b"TRST", - b"UDPO", - b"_UHF", - b"_USB", - b"WAAS", - b"WIFI", - b"_WPT", - ) - - def send_able_4hz(self, able): - "enable basic GREIS messages at 4Hz" - - self.expect_statement_identifier = 'greis' - - # the messages we want - # [SX] requires 3.7 firmware, we use [SI] to support 3.6 - messages = b"jps/{RT,UO,GT,PV,SG,DP,SI,EL,AZ,EC,SS,ET}" - - if able: - # Message rate must be an integer multiple of /par/raw/msint - # Default msint is 0.100 seconds, so that must be changed first - self.gps_send(b"%msint%set,/par/raw/msint,250") - - self.gps_send(b"%greis%em,," + messages + b":0.25") - else: - self.gps_send(b"%greis%dm,," + messages) - - def send_able_comp(self, able): - "dis/enable COMPASS, aka BeiDou" - self.expect_statement_identifier = 'cons' - en_dis = b'y' if 1 == able else b'n' - self.gps_send(b"%cons%set,/par/pos/sys/comp," + en_dis) - - def send_able_constellations(self, able): - "dis/enable all constellations" - self.expect_statement_identifier = 'cons7' - en_dis = b'y' if 1 == able else b'n' - self.gps_send(b"%cons1%set,/par/pos/sys/comp," + en_dis) - self.gps_send(b"%cons2%set,/par/pos/sys/gal," + en_dis) - # this will fail on TR-G2H, as it has no GLONASS - self.gps_send(b"%cons3%set,/par/pos/sys/glo," + en_dis) - self.gps_send(b"%cons4%set,/par/pos/sys/gps," + en_dis) - self.gps_send(b"%cons5%set,/par/pos/sys/irnss," + en_dis) - self.gps_send(b"%cons6%set,/par/pos/sys/sbas," + en_dis) - self.gps_send(b"%cons7%set,/par/pos/sys/qzss," + en_dis) - - def send_able_defmsg(self, able): - "dis/enable default messages at 1Hz" - self.expect_statement_identifier = 'defmsg' - if able: - self.gps_send(b"%defmsg%em,,jps/RT,/msg/def:1,jps/ET") - else: - # leave RT and ET to break less? - self.gps_send(b"%defmsg%dm,,/msg/def:1") - - def send_able_gal(self, able): - "dis/enable GALILEO" - self.expect_statement_identifier = 'cons' - en_dis = b'y' if 1 == able else b'n' - self.gps_send(b"%cons%set,/par/pos/sys/gal," + en_dis) - - def send_able_glo(self, able): - "dis/enable GLONASS" - self.expect_statement_identifier = 'cons' - en_dis = b'y' if 1 == able else b'n' - self.gps_send(b"%cons%set,/par/pos/sys/glo," + en_dis) - - def send_able_gps(self, able): - "dis/enable GPS" - self.expect_statement_identifier = 'cons' - en_dis = b'y' if 1 == able else b'n' - self.gps_send(b"%cons%set,/par/pos/sys/gps," + en_dis) - - def send_able_ipr(self, able): - "dis/enable all Integer Psuedo-Range messages" - self.expect_statement_identifier = 'em' - if able: - self.gps_send(b"%em%em,,jps/{rx,rc,r1,r2,r3,r5,rl}:0.25") - else: - self.gps_send(b"%em%dm,,jps/{rx,rc,r1,r2,r3,r5,rl}") - - def send_able_irnss(self, able): - "dis/enable IRNSS" - self.expect_statement_identifier = 'cons' - en_dis = b'y' if 1 == able else b'n' - self.gps_send(b"%cons%set,/par/pos/sys/irnss," + en_dis) - - def send_able_nmea41(self, able): - "dis/enable basic NMEA 4.1e messages at 4Hz" - - self.expect_statement_identifier = 'nmea' - - messages = b"nmea/{GBS,GGA,GSA,GST,GSV,RMC,VTG,ZDA}" - - if able: - # set NMEA version 4.1e - self.gps_send(b"%nmeaver%set,/par/nmea/ver,v4.1e") - - # Message rate must be an integer multiple of /par/raw/msint - # Default msint is 0.100 seconds, so that must be changed first - self.gps_send(b"%msint%set,/par/raw/msint,250") - - # now we can set the messages we want - self.gps_send(b"%nmea%em,," + messages + b":0.25") - else: - # disable - self.gps_send(b"%nmea%dm,," + messages) - - def send_able_sbas(self, able): - "dis/enable SBAS" - self.expect_statement_identifier = 'cons' - en_dis = b'y' if 1 == able else b'n' - self.gps_send(b"%cons%set,/par/pos/sys/sbas," + en_dis) - - def send_able_qzss(self, able): - "dis/enable QZSS" - self.expect_statement_identifier = 'cons' - en_dis = b'y' if 1 == able else b'n' - self.gps_send(b"%cons%set,/par/pos/sys/qzss," + en_dis) - - def send_able_snr(self, able): - "dis/enable all SNR messages, except [EC]" - self.expect_statement_identifier = 'em' - if able: - self.gps_send(b"%em%em,,jps/{E1,E2,E3,E5,El}:0.25") - else: - self.gps_send(b"%em%dm,,jps/{E1,E2,E3,E5,El}") - - able_commands = { - # en/disable basic GREIS messages at 4HZ - "4HZ": {"command": send_able_4hz, - "help": "basic GREIS messages at 4Hz"}, - # en/disable all constellations - "CONS": {"command": send_able_constellations, - "help": "all constellations"}, - # en/disable COMPASS, aka Beidou - "COMPASS": {"command": send_able_comp, - "help": "COMPASS"}, - # en/disable default message set. - "DEFMSG": {"command": send_able_defmsg, - "help": "default message set at 1Hz"}, - # en/disable GALILEO - "GALILEO": {"command": send_able_gal, - "help": "GALILEO"}, - # en/disable GLONASS - "GLONASS": {"command": send_able_glo, - "help": "GLONASS"}, - # en/disable GPS - "GPS": {"command": send_able_gps, - "help": "GPS"}, - # en/disable Integer Psuedo Range messages - "IPR": {"command": send_able_ipr, - "help": "all Integer Psuedo Range messages"}, - # en/disable IRNSS - "IRNSS": {"command": send_able_irnss, - "help": "IRNSS"}, - # en/disable NMEA 4.1e - "NMEA": {"command": send_able_nmea41, - "help": "basic messages NMEA 4.1 at 4Hz"}, - # en/disable SBAS - "SBAS": {"command": send_able_sbas, - "help": "SBAS"}, - # en/disable all SNRs - "SNR": {"command": send_able_snr, - "help": "all SNR messages, except [EC]"}, - # en/disable QZSS - "QZSS": {"command": send_able_qzss, - "help": "QZSS"}, - } - - def send_coldboot(self): - "Delete NVRAM (almanac, ephemeris, location) and restart" - self.expect_statement_identifier = 'coldboot' - self.gps_send(b"%coldboot%init,/dev/nvm/a") - - def send_constellations(self): - "poll all constellations" - self.expect_statement_identifier = 'cons' - self.gps_send(b"%cons%print,/par/pos/sys:on") - - def send_get_id(self): - "get receiver id" - self.expect_statement_identifier = 'id' - self.gps_send(b"%id%print,/par/rcv/id") - - def send_get_oaf(self): - "poll OAF (GPS opts)" - - self.expect_statement_identifier = 'opts,_WPT' - if VERB_RAW <= opts['verbosity']: - # get list of all opts - self.gps_send(b"%opts,list%list,/par/opts") - - # request opts one at a time from canned list - for s in self.oafs: - self.gps_send(b"%%opts,%s%%print,/par/opts/%s" % (s, s)) - - def send_get_serial(self): - "get receiver serial number" - self.expect_statement_identifier = 'serial' - self.gps_send(b"%serial%print,/par/rcv/sn") - - def send_reset(self): - "reset (reboot) the GPS" - self.expect_statement_identifier = 'reset' - self.gps_send(b"%reset%set,/par/reset,y") - - def send_set_dm(self): - "disable all messages" - self.expect_statement_identifier = 'dm' - self.gps_send(b"%dm%dm") - - def send_set_ipr(self): - "poll Integer Psuedo-Range messages" - self.expect_statement_identifier = 'out' - self.gps_send(b"%out%out,,jps/{rx,rc,r1,r2,r3,r5,rl}") - - def send_get_snr(self): - "poll all SNR messages" - # nothing we can wait on, depending on GPS model/configuration - # we may never see some of E2, E3, E5 or El - self.gps_send(b"%out%out,,jps/{EC,E1,E2,E3,E5,El}") - - def send_set_speed(self, set_speed): - "change GPS speed" - self.expect_statement_identifier = 'setspeed' - self.gps_send(b"%%setspeed%%set,/par/cur/term/rate,%d" % - set_speed) - - def send_get_vendor(self): - "get receiver vendor" - self.expect_statement_identifier = 'vendor' - self.gps_send(b"%vendor%print,/par/rcv/vendor") - - def send_get_ver(self): - "get receiver version, per section 4.4.3 of the specification" - self.expect_statement_identifier = 'ver' - self.gps_send(b"%ver%print,/par/rcv/ver") - - # list of canned commands that can be sent to the receiver - commands = { - "COLDBOOT": {"command": send_coldboot, - "help": "cold boot the GPS"}, - "CONS": {"command": send_constellations, - "help": "poll enabled constellations"}, - "DM": {"command": send_set_dm, - "help": "disable all periodic messages"}, - "ID": {"command": send_get_id, - "help": "poll receiver ID"}, - "IPR": {"command": send_set_ipr, - "help": "poll all Integer Psuedo-range messages"}, - "OAF": {"command": send_get_oaf, - "help": "poll all OAF options"}, - "RESET": {"command": send_reset, - "help": "reset (reboot) the GPS"}, - "SERIAL": {"command": send_get_serial, - "help": "poll receiver serial number"}, - "SNR": {"command": send_get_snr, - "help": "poll all SNR messages"}, - "VENDOR": {"command": send_get_vendor, - "help": "poll GPS vendor"}, - "VER": {"command": send_get_ver, - "help": "poll GPS version"}, - } - - -class gps_io(object): - """All the GPS I/O in one place" - - Three types of GPS I/O - 1. read only from a file - 2. read/write through a device - 3. read only from a gpsd instance - """ - - out = b'' - ser = None - input_is_device = False - - def __init__(self, serial_class): - "Initialize class" - - Serial = serial_class - # buffer to hold read data - self.out = b'' - - # open the input: device, file, or gpsd - if opts['input_file_name'] is not None: - # check if input file is a file or device - try: - mode = os.stat(opts['input_file_name']).st_mode - except OSError: - sys.stderr.write('%s: failed to open input file %s\n' % - (PROG_NAME, opts['input_file_name'])) - sys.exit(1) - - if stat.S_ISCHR(mode): - # character device, need not be read only - self.input_is_device = True - - if ((opts['disable'] or opts['enable'] or opts['poll'] or - opts['oaf_name'])): - - # check that we can write - if opts['read_only']: - sys.stderr.write('%s: read-only mode, ' - 'can not send commands\n' % PROG_NAME) - sys.exit(1) - if self.input_is_device is False: - sys.stderr.write('%s: input is plain file, ' - 'can not send commands\n' % PROG_NAME) - sys.exit(1) - - if opts['target']['server'] is not None: - # try to open local gpsd - try: - self.ser = gps.gpscommon(host=None) - self.ser.connect(opts['target']['server'], - opts['target']['port']) - - # alias self.ser.write() to self.write_gpsd() - self.ser.write = self.write_gpsd - # ask for raw, not rare, data - data_out = b'?WATCH={' - if opts['target']['device'] is not None: - # add in the requested device - data_out += (b'"device":"' + opts['target']['device'] + - b'",') - data_out += b'"enable":true,"raw":2}\r\n' - if VERB_RAW <= opts['verbosity']: - print("sent: ", data_out) - self.ser.send(data_out) - except socket.error as err: - sys.stderr.write('%s: failed to connect to gpsd %s\n' % - (PROG_NAME, err)) - sys.exit(1) - - elif self.input_is_device: - # configure the serial connections (the parameters refer to - # the device you are connecting to) - - try: - self.ser = Serial.Serial( - baudrate=opts['input_speed'], - # 8N1 is GREIS default - bytesize=Serial.EIGHTBITS, - parity=Serial.PARITY_NONE, - port=opts['input_file_name'], - stopbits=Serial.STOPBITS_ONE, - # read timeout - timeout=0.05, - # pyserial Ver 3.0+ changes writeTimeout to write_timeout - # just set both - write_timeout=0.5, - writeTimeout=0.5, - ) - except Serial.serialutil.SerialException: - # this exception happens on bad serial port device name - sys.stderr.write('%s: failed to open serial port "%s"\n' - ' Your computer has these serial ports:\n' - % (PROG_NAME, opts['input_file_name'])) - - # print out list of supported ports - import serial.tools.list_ports as List_Ports - ports = List_Ports.comports() - for port in ports: - sys.stderr.write(" %s: %s\n" % - (port.device, port.description)) - sys.exit(1) - - # flush input buffer, discarding all its contents - self.ser.flushInput() - - else: - # Read from a plain file of GREIS messages - try: - self.ser = open(opts['input_file_name'], 'rb') - except IOError: - sys.stderr.write('%s: failed to open input %s\n' % - (PROG_NAME, opts['input_file_name'])) - sys.exit(1) - - def read(self, read_opts): - "Read from device, until timeout or expected message" - - # are we expecting a certain message? - if gps_model.expect_statement_identifier: - # assume failure, until we see expected message - ret_code = 1 - else: - # not expecting anything, so OK if we did not see it. - ret_code = 0 - - try: - if read_opts['target']['server'] is not None: - # gpsd input - start = time.clock() - while read_opts['input_wait'] > (time.clock() - start): - # First priority is to be sure the input buffer is read. - # This is to prevent input buffer overuns - if 0 < self.ser.waiting(): - # We have serial input waiting, get it - # No timeout possible - # RTCM3 JSON can be over 4.4k long, so go big - new_out = self.ser.sock.recv(8192) - if raw is not None: - # save to raw file - raw.write(new_out) - self.out += new_out - - consumed = gps_model.decode_msg(self.out) - self.out = self.out[consumed:] - if ((gps_model.expect_statement_identifier and - (gps_model.expect_statement_identifier == - gps_model.last_statement_identifier))): - # Got what we were waiting for. Done? - ret_code = 0 - if not read_opts['input_forced_wait']: - # Done - break - - elif self.input_is_device: - # input is a serial device - start = time.clock() - while read_opts['input_wait'] > (time.clock() - start): - # First priority is to be sure the input buffer is read. - # This is to prevent input buffer overuns - # pyserial 3.0 replaces inWaiting() with in_waiting - if 0 < self.ser.inWaiting(): - # We have serial input waiting, get it - # 1024 is comfortably large, almost always the - # Read timeout is what causes ser.read() to return - new_out = self.ser.read(1024) - if raw is not None: - # save to raw file - raw.write(new_out) - self.out += new_out - - consumed = gps_model.decode_msg(self.out) - self.out = self.out[consumed:] - if ((gps_model.expect_statement_identifier and - (gps_model.expect_statement_identifier == - gps_model.last_statement_identifier))): - # Got what we were waiting for. Done? - ret_code = 0 - if not read_opts['input_forced_wait']: - # Done - break - else: - # ordinary file, so all read at once - self.out += self.ser.read() - if raw is not None: - # save to raw file - raw.write(self.out) - - while True: - consumed = gps_model.decode_msg(self.out) - self.out = self.out[consumed:] - if 0 >= consumed: - break - - except IOError: - # This happens on a good device name, but gpsd already running. - # or if USB device unplugged - sys.stderr.write('%s: failed to read %s\n' - '%s: Is gpsd already holding the port?\n' - % (PROG_NAME, PROG_NAME, - read_opts['input_file_name'])) - return 1 - - if 0 < ret_code: - # did not see the message we were expecting to see - sys.stderr.write('%s: waited %0.2f seconds for, ' - 'but did not get: %%%s%%\n' - % (PROG_NAME, read_opts['input_wait'], - gps_model.expect_statement_identifier)) - return ret_code - - def write_gpsd(self, data): - "write data to gpsd daemon" - - # HEXDATA_MAX = 512, from gps.h, The max hex digits can write. - # Input data is binary, converting to hex doubles its size. - # Limit binary data to length 255, so hex data length less than 510. - if 255 < len(data): - sys.stderr.write('%s: trying to send %d bytes, max is 255\n' - % (PROG_NAME, len(data))) - return 1 - - if opts['target']['device'] is not None: - # add in the requested device - data_out = b'?DEVICE={"path":"' + opts['target']['device'] + b'",' - else: - data_out = b'?DEVICE={' - - # Convert binary data to hex and build the message. - data_out += b'"hexdata":"' + binascii.hexlify(data) + b'"}\r\n' - if VERB_RAW <= opts['verbosity']: - print("sent: ", data_out) - self.ser.send(data_out) - return 0 - - -def usage(): - "Print usage information, and exit" - - print("usage: %s [-?hrVW] [-c C] [-d D] [-e E] [-f F] [-O O] [-p P]\n" - " [-R R] [-S S] [-s S] [-v V] [-w W]\n" - " [server[:port[:device]]]\n\n" % PROG_NAME) - print('usage: %s [options]\n' - ' -? print this help\n' - ' -c C send command C to GPS\n' - ' -d D disable D\n' - ' -e E enable E\n' - ' -f F open F as file/device\n' - ' default: %s\n' - ' -h print this help\n' - ' -O O send OAF file to GPS\n' - ' -p P send preset GPS command P\n' - ' -R R save raw data from GPS in file R\n' - ' -r open file/device read only\n' - ' default: %s\n' - ' -S S configure GPS speed to S\n' - ' -s S set port speed to S\n' - ' default: %d bps\n' - ' -V print version\n' - ' -v V Set verbosity level to V, 0 to 4\n' - ' default: %d\n' - ' -W force entire wait time, no exit early\n' - ' -w W wait time, exit early on -p result\n' - ' default: %s seconds\n' - ' [server[:port[:device]]] Connect to gpsd\n' - ' default port: 2947\n' - ' default device: None\n' - '\n' - 'D and E can be one of:' % - (PROG_NAME, opts['input_file_name'], opts['raw_file'], - opts['input_speed'], opts['verbosity'], opts['input_wait']) - ) - - # print list of enable/disable commands - for item in sorted(gps_model.able_commands.keys()): - print(" %-12s %s" % (item, gps_model.able_commands[item]["help"])) - - print('\nthe preset GPS command P can be one of:') - - # print list of possible canned commands - for item in sorted(gps_model.commands.keys()): - print(" %-12s %s" % (item, gps_model.commands[item]["help"])) - print('\nOptions can be placed in the ZERKOPTS environment variable.\n' - 'ZERKOPTS is processed before the CLI options.') - sys.exit(0) - - -# create the GREIS instance -gps_model = greis() - -if 'ZERKOPTS' in os.environ: - # grab the ZERKOPTS environment variable for options - opts['progopts'] = os.environ['ZERKOPTS'] - options = opts['progopts'].split(' ') + sys.argv[1:] -else: - options = sys.argv[1:] - -try: - (options, arguments) = getopt.getopt(options, - "?c:d:e:f:hrp:s:w:v:O:R:S:WV") -except getopt.GetoptError as err: - sys.stderr.write("%s: %s\n" - "Try '%s -h' for more information.\n" % - (PROG_NAME, str(err), PROG_NAME)) - sys.exit(2) - -for (opt, val) in options: - if opt == '-c': - # command - opts['command'] = val - elif opt == '-d': - # disable - opts['disable'] = val - elif opt == '-e': - # enable - opts['enable'] = val - elif opt == '-f': - # file input - opts['input_file_name'] = val - elif opt == '-h' or opt == '-?': - # help - usage() - elif opt == '-p': - # preprogrammed command - opts['poll'] = val - elif opt == '-r': - # read only - opts['read_only'] = True - elif opt == '-s': - # serial port speed - opts['input_speed'] = int(val) - if opts['input_speed'] not in gps_model.speeds: - sys.stderr.write('%s: -s invalid speed %s\n' % - (PROG_NAME, opts['input_speed'])) - sys.exit(1) - - elif opt == '-w': - # max wait time, seconds - opts['input_wait'] = float(val) - elif opt in '-v': - # verbosity level - opts['verbosity'] = int(val) - elif opt in '-O': - # OAF .jpo file - opts['oaf_name'] = val - elif opt in '-R': - # raw log file - opts['raw_file'] = val - elif opt in '-S': - # set GPS serial port speed - opts['set_speed'] = int(val) - if opts['set_speed'] not in gps_model.speeds: - sys.stderr.write('%s: -S invalid speed %s\n' % - (PROG_NAME, opts['set_speed'])) - sys.exit(1) - - elif opt == '-W': - # forced wait, no early exit on command completion - opts['input_forced_wait'] = True - elif opt == '-V': - # version - sys.stderr.write('zerk: Version %s\n' % gps_version) - sys.exit(0) - -if opts['input_file_name'] is None: - # no input file given - # default to local gpsd - opts['target']['server'] = "localhost" - opts['target']['port'] = gps.GPSD_PORT - opts['target']['device'] = None - if arguments: - # server[:port[:device]] - arg_parts = arguments[0].split(':') - opts['target']['server'] = arg_parts[0] - if 1 < len(arg_parts): - opts['target']['port'] = arg_parts[1] - if 2 < len(arg_parts): - opts['target']['device'] = arg_parts[2] - -elif arguments: - sys.stderr.write('%s: Both input file and server specified\n' % PROG_NAME) - sys.exit(1) - -if VERB_PROG <= opts['verbosity']: - # dump all options - print('Options:') - for option in sorted(opts): - print(" %s: %s" % (option, opts[option])) - -# done parsing arguments from environment and CLI - -try: - # raw log file requested? - raw = None - if opts['raw_file']: - try: - raw = open(opts['raw_file'], 'w') - except IOError: - sys.stderr.write('%s: failed to open raw file %s\n' % - (PROG_NAME, opts['raw_file'])) - sys.exit(1) - - # create the I/O instance - io_handle = gps_io(serial) - - # keep it simple, only one of -O, -c -d -e or -S - if opts['oaf_name'] is not None: - # parse an OAF file - try: - oaf_root = xml.etree.ElementTree.parse(opts['oaf_name']).getroot() - oaf = dict() - for tag in ('id', 'oaf', 'hash'): - oaf[tag] = oaf_root.find(tag).text - oaf['oaf'] = oaf['oaf'].split('\n') - if VERB_PROG <= opts['verbosity']: - print(oaf) - except xml.etree.ElementTree.ParseError: - sys.stderr.write('%s: failed to parse OAF "%s"\n' - % (PROG_NAME, opts['oaf_name'])) - sys.exit(1) - except IOError: - sys.stderr.write('%s: failed to read OAF "%s"\n' - % (PROG_NAME, opts['oaf_name'])) - sys.exit(1) - - # calculate hash - oaf_s = '\n'.join(oaf['oaf']) - hash_s = hashlib.sha1(oaf_s).hexdigest() - if hash_s != oaf['hash']: - sys.stderr.write('%s: OAF bad hash "%s", s/b %s\n' - % (PROG_NAME, hash_s, oaf['hash'])) - sys.exit(1) - - # TODO: probably should check the ID first... - # TODO: prolly should send one command per handshake - # blasting all commands at once, seems to not work reliably - for command in oaf['oaf']: - time.sleep(0.1) # wait 0.1 seconds each - gps_model.gps_send(command) - # this will detect when it is all done - gps_model.gps_send(b'%DONE%') - gps_model.expect_statement_identifier = 'DONE' - - elif opts['command'] is not None: - # zero length is OK to send - if 1 < len(opts['command']) and '%' != opts['command'][0]: - # add ID, if missing - gps_model.expect_statement_identifier = 'CMD' - opts['command'] = "%CMD%" + opts['command'] - - # add trailing new line - opts['command'] += "\n" - - if VERB_QUIET < opts['verbosity']: - sys.stderr.write('%s: command %s\n' % (PROG_NAME, opts['command'])) - gps_model.gps_send(opts['command']) - - elif opts['disable'] is not None: - if VERB_QUIET < opts['verbosity']: - sys.stderr.write('%s: disable %s\n' % (PROG_NAME, opts['disable'])) - if opts['disable'] in gps_model.able_commands: - command = gps_model.able_commands[opts['disable']] - command["command"](gps_model, 0) - else: - sys.stderr.write('%s: disable %s not found\n' % - (PROG_NAME, opts['disable'])) - sys.exit(1) - - elif opts['enable'] is not None: - if VERB_QUIET < opts['verbosity']: - sys.stderr.write('%s: enable %s\n' % (PROG_NAME, opts['enable'])) - if opts['enable'] in gps_model.able_commands: - command = gps_model.able_commands[opts['enable']] - command["command"](gps_model, 1) - else: - sys.stderr.write('%s: enable %s not found\n' % - (PROG_NAME, opts['enable'])) - sys.exit(1) - - elif opts['poll'] is not None: - if VERB_QUIET < opts['verbosity']: - sys.stderr.write('%s: poll %s\n' % (PROG_NAME, opts['poll'])) - if opts['poll'] in gps_model.commands: - command = gps_model.commands[opts['poll']] - command["command"](gps_model) - else: - sys.stderr.write('%s: poll %s not found\n' % - (PROG_NAME, opts['poll'])) - sys.exit(1) - - elif opts['set_speed'] is not None: - gps_model.send_set_speed(opts['set_speed']) - - exit_code = io_handle.read(opts) - - if ((VERB_RAW <= opts['verbosity']) and io_handle.out): - # dump raw left overs - print("Left over data:") - print(io_handle.out) - - sys.stdout.flush() - io_handle.ser.close() - -except KeyboardInterrupt: - print('') - exit_code = 1 - -sys.exit(exit_code) diff --git a/ubxtool b/ubxtool new file mode 100755 index 00000000..b8c80e95 --- /dev/null +++ b/ubxtool @@ -0,0 +1,2377 @@ +#!/usr/bin/env python +# -*- coding: UTF-8 +''' +ubxtool -- u-blox configurator and packet decoder + +usage: ubxtool [OPTIONS] [server[:port[:device]]] +''' + +# This file is Copyright (c) 2018 by the GPSD project +# BSD terms apply: see the file COPYING in the distribution root for details. +# +# This code runs compatibly under Python 2 and 3.x for x >= 2. +# Preserve this property! +# +# ENVIRONMENT: +# Options in the UBXOPTS environment variable will be parsed before +# the CLI options. A handy place to put your '-f /dev/ttyXX -s SPEED' +# +# To see what constellations are enabled: +# ubxtool -p GNSS -f /dev/ttyXX +# +# To disable GALILEO and enable GALILEO: +# ubxtool -d GLONASS -f /dev/ttyXX +# ubxtool -e GALILEO -f /dev/ttyXX +# +# To read GPS messages a log file: +# ubxtool -v 2 -f test/daemon/ublox-neo-m8n.log + +from __future__ import absolute_import, print_function, division + +import binascii # for binascii.hexlify() +import getopt # for getopt.getopt(), to parse CLI options +import os # for os.environ +import socket # for socket.error +import stat # for stat.S_ISBLK() +import struct # for pack() +import sys +import time + +PROG_NAME = 'ubxtool' + +try: + import serial +except ImportError: + # treat serial as special since it is not part of standard Python + sys.stderr.write("%s: failed to import pyserial\n" % PROG_NAME) + sys.exit(2) + +try: + import gps +except ImportError: + # PEP8 says local imports last + sys.stderr.write("%s: failed to import gps, check PYTHON_PATH\n" % + PROG_NAME) + sys.exit(2) + +gps_version = '3.18-dev' +if gps.__version__ != gps_version: + sys.stderr.write("%s: ERROR: need gps module version %s, got %s\n" % + (PROG_NAME, gps_version, gps.__version__)) + sys.exit(1) + + +VERB_QUIET = 0 # quiet +VERB_NONE = 1 # just output requested data and some info +VERB_DECODE = 2 # decode all messages +VERB_INFO = 3 # more info +VERB_RAW = 4 # raw info +VERB_PROG = 5 # program trace + +# dictionary to hold all user options +opts = { + # command to send to GPS, -c + 'command': None, + # command for -d disable + 'disable': None, + # command for -e enable + 'enable': None, + # default input -f file + 'input_file_name': None, + # default forced wait? -W + 'input_forced_wait': False, + # default port speed -s + 'input_speed': 9600, + # default input wait time -w in seconds + 'input_wait': 2.0, + # optional mode to -p P + 'mode': None, + # the name of an OAF file, extension .jpo + 'oaf_name': None, + # poll command -p + 'poll': None, + # raw log file name + 'raw_file': None, + # open port read only -r + 'read_only': False, + # speed to set GPS -S + 'set_speed': None, + # target gpsd (server:port:device) to connect to + 'target': {"server": None, "port": gps.GPSD_PORT, "device": None}, + # verbosity level, -v + 'verbosity': VERB_NONE, + # contents of environment variable UBXOPTS + 'progopts': '', +} + + +class ubx(object): + "class to hold u-blox stuff" + + # when a statement identifier is received, it is stored here + last_statement_identifier = None + # expected statement identifier. + expect_statement_identifier = False + + def __init__(self): + pass + + # allowable speeds + speeds = (460800, 230400, 153600, 115200, 57600, 38400, 19200, 9600, + 4800, 2400, 1200, 600, 300) + + # UBX Satellite Numbering + gnss_id = {0: 'GPS', + 1: 'SBAS', + 2: 'Galileo', + 3: 'BeiDou', + 4: 'IMES', + 5: 'QZSS', + 6: 'GLONASS'} + + def ack_ack(self, buf): + "UBX-ACK-ACK decode" + m_len = len(buf) + if 2 > m_len: + return "Bad Length %s" % m_len + + u = struct.unpack_from(' m_len: + return "Bad Length %s" % m_len + + u = struct.unpack_from('> 5) & 0x1f, (u[1] >> 10) & 0x1f, + u[1] >> 15)) + return s + + def cfg_cfg_mask(self, mask): + "decode Mask in UBX-CFG-CFG, return string" + s = '' + if mask & 0x1: + s += 'ioPort ' + if mask & 0x2: + s += 'msgConf ' + if mask & 0x4: + s += 'infMsg ' + if mask & 0x8: + s += 'navConf ' + if mask & 0x10: + s += 'rxmConf ' + if mask & 0x80: + # not on M8030 + s += 'senConf ' + if mask & 0x100: + s += 'rinvConf ' + if mask & 0x200: + s += 'antConf ' + if mask & 0x800: + s += 'logConf ' + if mask & 0x1000: + s += 'ftsConf ' + + return s + + def cfg_cfg(self, buf): + "UBX-CFG-CFG decode" + m_len = len(buf) + if 12 > m_len: + return "Bad Length %s" % m_len + + if 12 == m_len: + u = struct.unpack_from(' m_len: + return "Bad Length %s" % m_len + + u = struct.unpack_from(' m_len: + return "Bad Length %s" % m_len + + values = {0: "Full power", + 1: "Balanced", + 2: "Interval", + 3: "Aggresive with 1Hz", + 4: "Aggresive with 2Hz", + 5: "Aggresive with 4Hz", + 0xff: "Invalid" + } + u = struct.unpack_from(' m_len: + return "Bad Length %s" % m_len + + u = struct.unpack_from(' m_len: + return "Bad Length %s" % m_len + + u = struct.unpack_from(' m_len: + return "Bad Length %s" % m_len + + u = struct.unpack_from(' m_len: + return " Bad Length %s" % m_len + + u = struct.unpack_from('> 7) & 0x0f + gridToGpsDec = ('UTC', 'GPS', 'Glonass', 'BeiDou') + syncMode = (u[10] >> 11) & 0x03 + s += "gridToGps %s, syncMode %d " % (gridToGpsDec[gridToGps], syncMode) + + return s + + def cfg_usb(self, buf): + "UBX-CFG-USB decode" + m_len = len(buf) + if 0 == m_len: + return " Poll request" + + if 108 > m_len: + return " Bad Length %s" % m_len + + u = struct.unpack_from(' m_len: + return " Bad Length %s" % m_len + + substr = buf.split('\0')[0] + s = ' swVersion: %s\n' % (substr) + substr = buf[30:39] + substr = substr.split('\0')[0] + s += ' hwVersion: %s' % (substr[30:39]) + # extensions?? + num_ext = int((m_len - 40) / 30) + i = 0 + while i < num_ext: + loc = 40 + (i * 30) + substr = buf[loc:] + substr = substr.split('\0')[0] + s += '\n extension: %s' % (substr) + i += 1 + return s + + mon_ids = {2: {'str': 'IO', 'name': 'UBX-MON-IO'}, + 4: {'str': 'VER', 'dec': mon_ver, 'name': 'UBX-MON-VER'}, + 6: {'str': 'MSGPP', 'name': 'UBX-MON-MSGPP'}, + 7: {'str': 'RXBUF', 'name': 'UBX-MON-RXBUF'}, + 8: {'str': 'TXBUF', 'name': 'UBX-MON-TXBUF'}, + 9: {'str': 'HW', 'name': 'UBX-MON-HW'}, + 0x0b: {'str': 'HW2', 'name': 'UBX-MON-HW2'}, + 0x21: {'str': 'RXR', 'name': 'UBX-MON-RXR'}, + 0x27: {'str': 'PATCH', 'name': 'UBX-MON-PATCH'}, + 0x28: {'str': 'GNSS', 'name': 'UBX-MON-GNSS'}, + 0x2e: {'str': 'SMGR', 'name': 'UBX-MON-SMGR'}, + } + + def nav_clock(self, buf): + "UBX-NAV-CLOCK decode, Clock Solution" + m_len = len(buf) + if 0 == m_len: + return " Poll request" + + if 20 > m_len: + return " Bad Length %s" % m_len + + u = struct.unpack_from(' m_len: + return " Bad Length %s" % m_len + + u = struct.unpack_from(' m_len: + return " Bad Length %s" % m_len + + u = struct.unpack_from(' m_len: + return " Bad Length %s" % m_len + + u = struct.unpack_from(' m_len: + return " Bad Length %s" % m_len + + u = struct.unpack_from(' m_len: + return " Bad Length %s" % m_len + + u = struct.unpack_from('> 4)) + if 0x40 & u[6]: + s += 'diffCorr ' + if 0x80 & u[6]: + s += 'smoothed ' + s += 'orbitSource %u ' % (0x07 & (u[6] >> 8)) + if 0x800 & u[6]: + s += 'ephAvail ' + if 0x1000 & u[6]: + s += 'almAvail ' + if 0x2000 & u[6]: + s += 'anoAvail ' + if 0x4000 & u[6]: + s += 'aopAvail ' + + if 0x730000 & u[6]: + s += '\n' + if 0x10000 & u[6]: + s += 'sbasCorrused ' + if 0x20000 & u[6]: + s += 'rtcmCorrused ' + if 0x1000000 & u[6]: + s += 'prCorrused ' + if 0x2000000 & u[6]: + s += 'crCorrused ' + if 0x4000000 & u[6]: + s += 'doCorrused ' + m_len -= 12 + i += 1 + + return s + + def nav_sbas(self, buf): + "UBX-NAV-SBAS decode" + m_len = len(buf) + if 0 == m_len: + return " Poll request" + + if 12 > m_len: + return " Bad Length %s" % m_len + + u = struct.unpack_from(' m_len: + return " Bad Length %s" % m_len + + u = struct.unpack_from(' m_len: + return " Bad Length %s" % m_len + + u = struct.unpack_from(' m_len: + return " Bad Length %s" % m_len + + u = struct.unpack_from(' m_len: + return "Bad Length %s" % m_len + + u = struct.unpack_from(' m_len: + return ".Bad Length %s" % m_len + + u = struct.unpack_from(' m_len: + return " Bad Length %s" % m_len + + u = struct.unpack_from(' m_len: + return " Bad Length %s" % m_len + + u = struct.unpack_from(' m_len: + return " Bad Length %s" % m_len + + u = struct.unpack_from(' m_len: + return " Bad Length %s" % m_len + + u = struct.unpack_from(' m_len: + return " Bad Length %s" % m_len + + u = struct.unpack_from(' m_len: + return " Bad Length %s" % m_len + + u = struct.unpack_from(' m_len: + return " Bad Length %s" % m_len + + u = struct.unpack_from(' m_len: + return " Bad Length %s" % m_len + + u = struct.unpack_from(' m_len: + return " Bad Length %s" % m_len + + u = struct.unpack_from(' m_len: + return " Bad Length %s" % m_len + + u = struct.unpack_from(' m_len: + return " Bad Length %s" % m_len + + u = struct.unpack_from('> 2) & 0x03 + if 0 == raim: + s += "RAIM not available" + elif 1 == raim: + s += "RAIM not active" + elif 2 == raim: + s += "RAIM active" + else: + s += "RAIM ??" + return s + + tim_ids = {1: {'str': 'TP', 'dec': tim_tp, 'name': 'UBX-TIM-TP'}, + 3: {'str': 'TM2', 'name': 'UBX-TIM-TM2'}, + 4: {'str': 'SVIN', 'dec': tim_svin, 'name': 'UBX-TIM-SVIN'}, + 6: {'str': 'VRFY', 'name': 'UBX-TIM-VRFY'}, + 0x11: {'str': 'DOSC', 'name': 'UBX-TIM-DOSC'}, + 0x12: {'str': 'TOS', 'name': 'UBX-TIM-TOS'}, + 0x13: {'str': 'SMEAS', 'name': 'UBX-TIM-SMEAS'}, + 0x15: {'str': 'VCOCAL', 'name': 'UBX-TIM-VCOCAL'}, + 0x16: {'str': 'FCHG', 'name': 'UBX-TIM-FCHG'}, + 0x17: {'str': 'HOC', 'name': 'UBX-TIM-HOC'}, + } + + classes = { + 0x01: {'str': 'NAV', 'ids': nav_ids}, + 0x02: {'str': 'RXM', 'ids': rxm_ids}, + 0x04: {'str': 'INF', 'ids': inf_ids}, + 0x05: {'str': 'ACK', 'ids': ack_ids}, + 0x06: {'str': 'CFG', 'ids': cfg_ids}, + 0x09: {'str': 'UPD'}, + 0x0A: {'str': 'MON', 'ids': mon_ids}, + 0x0B: {'str': 'ATD'}, + 0x0D: {'str': 'TIM', 'ids': tim_ids}, + 0x10: {'str': 'ESF'}, + 0x13: {'str': 'MGA'}, + 0x21: {'str': 'LOG'} + } + + def class_id_s(self, m_class, m_id): + "Return class and ID numbers as a string." + s = 'Class: ' + if m_class not in self.classes: + s += '%#x ID: %#x' % (m_class, m_id) + return s + + if 'str' in self.classes[m_class]: + s_class = self.classes[m_class]['str'] + s += '%s(%#x) ' % (s_class, m_class) + else: + s += '%#x ' % (m_class) + + if (('ids' in self.classes[m_class] and + m_id in self.classes[m_class]['ids'] and + 'str' in self.classes[m_class]['ids'][m_id])): + s_id = self.classes[m_class]['ids'][m_id]['str'] + s += 'ID: %s(%#x)' % (s_id, m_id) + else: + s += 'ID: %#x' % (m_id) + + return s + + def decode_msg(self, out): + "Decode one message and then return number of chars consumed" + + state = 'BASE' + consumed = 0 + + # decode state machine + for this_byte in out: + consumed += 1 + if isinstance(this_byte, str): + # a character, probably read from a file + c = ord(this_byte) + else: + # a byte, probably read from a serial port + c = int(this_byte) + + if VERB_RAW <= opts['verbosity']: + if (ord(' ') <= c) and (ord('~') >= c): + # c is printable + print("state: %s char %c (%#x)" % (state, chr(c), c)) + else: + # c is not printable + print("state: %s char %#x" % (state, c)) + + if 'BASE' == state: + # start fresh + # place to store 'comments' + comment = '' + m_class = 0 + m_id = 0 + m_len = 0 + m_raw = bytearray(0) # class, id, len, payload + m_payload = bytearray(0) # just the payload + m_ck_a = 0 + m_ck_b = 0 + + if 0xb5 == c: + # got header 1, mu + state = 'HEADER1' + + if ord('$') == c: + # got $, so NMEA? + state = 'NMEA' + comment = '$' + + if ord("{") == c: + # JSON, treat as comment line + state = 'JSON' + + # start fresh + comment = "{" + continue + + if ord("#") == c: + # comment line + state = 'COMMENT' + + # start fresh + comment = "#" + continue + + if (ord('\n') == c) or (ord('\r') == c): + # CR or LF, leftovers + return 1 + continue + + if state in ('COMMENT', 'JSON'): + # inside comment + if ord('\n') == c or ord('\r') == c: + # Got newline or linefeed + # terminate messages on or + # Done, got a full message + if gps.polystr('{"class":"ERROR"') in comment: + # always print gpsd errors + print(comment) + elif VERB_DECODE <= opts['verbosity']: + print(comment) + return consumed + else: + comment += chr(c) + continue + + if 'NMEA' == state: + # getting NMEA payload + if (ord('\n') == c) or (ord('\r') == c): + # CR or LF, done, got a full message + # terminates messages on or + if VERB_DECODE <= opts['verbosity']: + print(comment + '\n') + return consumed + else: + comment += chr(c) + continue + + if ord('b') == c and 'HEADER1' == state: + # got header 2 + state = 'HEADER2' + continue + + if 'HEADER2' == state: + # got class + state = 'CLASS' + m_class = c + m_raw.extend([c]) + continue + + if 'CLASS' == state: + # got ID + state = 'ID' + m_id = c + m_raw.extend([c]) + continue + + if 'ID' == state: + # got first length + state = 'LEN1' + m_len = c + m_raw.extend([c]) + continue + + if 'LEN1' == state: + # got second length + m_raw.extend([c]) + m_len += 256 * c + if 0 == m_len: + # no payload + state = 'CSUM1' + else: + state = 'PAYLOAD' + continue + + if 'PAYLOAD' == state: + # getting payload + m_raw.extend([c]) + m_payload.extend([c]) + if len(m_payload) == m_len: + state = 'CSUM1' + continue + + if 'CSUM1' == state: + # got ck_a + state = 'CSUM2' + m_ck_a = c + continue + + if 'CSUM2' == state: + # ck_b + state = 'BASE' + m_ck_b = c + # check checksum + chk = self.checksum(m_raw, len(m_raw)) + if (chk[0] != m_ck_a) or (chk[1] != m_ck_b): + sys.stderr.write("%s: ERROR checksum failed," + "was (%d,%d) s/b (%d, %d)\n" % + (PROG_NAME, m_ck_a, m_ck_b, + chk[0], chk[1])) + + s_payload = ''.join('{:02x} '.format(x) for x in m_payload) + x_payload = binascii.hexlify(m_payload) + + if m_class in self.classes: + this_class = self.classes[m_class] + if 'ids' in this_class: + if m_id in this_class['ids']: + if 'dec' in this_class['ids'][m_id]: + dec = this_class['ids'][m_id]['dec'] + s_payload = this_class['ids'][m_id]['name'] + s_payload += ':\n' + s_payload += dec(self, m_payload) + else: + s_payload = x_payload + + if VERB_INFO < opts['verbosity']: + print("%s, len: %#x" % + (self.class_id_s(m_class, m_id), m_len)) + if VERB_RAW < opts['verbosity']: + print("payload: %s" % x_payload) + print("%s\n" % s_payload) + return consumed + + # give up + state = 'BASE' + + # fell out of loop, no more chars to look at + return 0 + + def checksum(self, msg, m_len): + "Calculate u-blox message checksum" + # the checksum is calculated over the Message, starting and including + # the CLASS field, up until, but excluding, the Checksum Field: + + ck_a = 0 + ck_b = 0 + for c in msg[0:m_len]: + ck_a += c + ck_b += ck_a + + return [ck_a & 0xff, ck_b & 0xff] + + def make_pkt(self, m_class, m_id, m_data): + "Make a message packet" + # always little endian, leader, class, id, length + m_len = len(m_data) + + # build core message + msg = bytearray(m_len + 6) + struct.pack_into('>= 8 + m_data[21] = seconds & 0x0ff + seconds >>= 8 + m_data[22] = seconds & 0x0ff + seconds >>= 8 + m_data[23] = seconds & 0x0ff + + # Survey-in position accuracy limit 500 mm + mmeters = 500 + m_data[24] = mmeters & 0x0ff + mmeters >>= 8 + m_data[25] = mmeters & 0x0ff + mmeters >>= 8 + m_data[26] = seconds & 0x0ff + seconds >>= 8 + m_data[27] = mmeters & 0x0ff + gps_model.gps_send(6, 0x3d, m_data) + + def send_able_tp(self, able): + "dis/enable UBX-TIM-TP Time Pulse" + rate = 1 if able else 0 + m_data = bytearray([0xd, 0x1, rate]) + gps_model.gps_send(6, 1, m_data) + + def send_cfg_ant(self): + "UBX-CFG-ANT, Get Antenna Control Settings" + + m_data = bytearray(0) + gps_model.gps_send(6, 0x13, m_data) + + def send_cfg_cfg(self, save_clear): + "UBX-CFG-CFG, save config" + + # Save: save_clear = 0 + # Clear: save_clear = 1 + + # basic configs alays available to change: + # ioPort, msgConf, infMsg, navConf, rxmConf, senConf + cfg1 = 0x9f + # rinvConf, antConf, logConf + cfg2 = 0x03 + + m_data = bytearray(13) + + # clear + if 0 == save_clear: + # saving + m_data[0] = 0 + m_data[1] = 0 + else: + # clearing + m_data[0] = cfg1 + m_data[1] = cfg2 + m_data[2] = 0 # + m_data[3] = 0 # + + # save + if 0 == type: + # saving + m_data[4] = cfg1 + m_data[5] = cfg2 + else: + # clearing + m_data[4] = 0 + m_data[5] = 0 + m_data[6] = 0 # + m_data[7] = 0 # + + # load + if False and 0 == type: + # saving + m_data[8] = 0 + m_data[9] = 0 + else: + # clearing, load it to save a reboot + m_data[8] = cfg1 + m_data[9] = cfg2 + m_data[10] = 0 # + m_data[11] = 0 # + + # deviceMask, where to save it, try all options + m_data[12] = 0x17 # devBBR, devFLASH devEEPROM, devSpiFlash + + gps_model.gps_send(6, 0x9, m_data) + + def send_cfg_gnss1(self, gnssId, enable): + "UBX-CFG-GNSS, set GNSS config" + m_data = bytearray(12) + m_data[0] = 0 # version 0, msgVer + m_data[1] = 0 # read only, numTrkChHw + m_data[2] = 0xFF # read only, numTrkChUse + m_data[3] = 1 # 1 block follows + # block 1 + m_data[4] = gnssId # gnssId + if 0 == gnssId: + # GPS + m_data[5] = 8 # resTrkCh + m_data[6] = 16 # maxTrkCh + if 1 == gnssId: + # SBAS + m_data[5] = 1 # resTrkCh + m_data[6] = 3 # maxTrkCh + if 2 == gnssId: + # GALILEO + m_data[5] = 4 # resTrkCh + m_data[6] = 8 # maxTrkCh + if 3 == gnssId: + # BeiDou + m_data[5] = 2 # resTrkCh + m_data[6] = 16 # maxTrkCh + if 4 == gnssId: + # IMES + m_data[5] = 0 # resTrkCh + m_data[6] = 8 # maxTrkCh + if 5 == gnssId: + # QZSS + m_data[5] = 0 # resTrkCh + m_data[6] = 3 # maxTrkCh + if 6 == gnssId: + # GLONASS + m_data[5] = 8 # resTrkCh + m_data[6] = 14 # maxTrkCh + m_data[7] = 0 # reserved1 + m_data[8] = enable # flags + m_data[9] = 0 # flagflags, unused + if 5 == gnssId: + # QZSS + m_data[10] = 5 # flags E1OS, L1SAIF + else: + m_data[10] = 1 # flags E1OS + m_data[11] = 1 # flags, unused + gps_model.gps_send(6, 0x3e, m_data) + + def send_cfg_gnss(self): + "UBX-CFG-GNSS, set Galileo config decode" + m_data = bytearray(0) + gps_model.gps_send(6, 0x3e, m_data) + + def send_cfg_nav5_model(self): + "UBX-CFG-NAV5, set dynamic platform model" + + m_data = bytearray(36) + m_data[0] = 1 # just setting dynamic model + m_data[1] = 0 # just setting dynamic model + m_data[2] = opts["mode"] + + gps_model.gps_send(6, 0x24, m_data) + + def send_cfg_msg(self, m_class, m_id, rate=None): + "UBX-CFG-MSG, poll, or set, message rates decode" + m_data = bytearray(2) + m_data[0] = m_class + m_data[1] = m_id + if rate is not None: + m_data.extend([rate]) + gps_model.gps_send(6, 1, m_data) + + def send_cfg_nav5(self): + "UBX-CFG-NAV5, get Nav Engine Settings" + m_data = bytearray(0) + gps_model.gps_send(6, 0x24, m_data) + + def send_cfg_pms(self): + "UBX-CFG-PMS, Get Power Management Settings" + + if opts["mode"] is not None: + m_data = bytearray(8) + # set powerSetupValue to mode + m_data[1] = opts["mode"] + # leave period and onTime zero, which breaks powerSetupValue = 3 + else: + m_data = bytearray(0) + + gps_model.gps_send(6, 0x86, m_data) + + def send_cfg_prt(self): + "UBX-CFG-PRT, get I/O Port" + m_data = bytearray(0) + gps_model.gps_send(6, 0x0, m_data) + + def send_set_speed(self, speed): + "UBX-CFG-PRT, set port" + + # FIXME! Poll current masks, then adjust speed + m_data = bytearray(20) + m_data[0] = 1 # port 1, UART 1 + # m_data[0] = 3 # port 3, USB + m_data[4] = 0xc0 # 8N1 + m_data[5] = 0x8 # 8N1 + + m_data[8] = speed & 0xff + m_data[9] = (speed >> 8) & 0xff + m_data[10] = (speed >> 16) & 0xff + m_data[11] = (speed >> 24) & 0xff + + m_data[12] = 3 # in, ubx and nmea + m_data[14] = 3 # out, ubx and nmea + gps_model.gps_send(6, 0, m_data) + + def send_cfg_rst(self, reset_type): + "UBX-CFG-RST, reset" + # always do a hardware reset + # if on USB, this will disconnect and reconnect, giving you + # a new tty. + m_data = bytearray(4) + m_data[0] = reset_type & 0xff + m_data[1] = (reset_type >> 8) & 0xff + gps_model.gps_send(6, 0x4, m_data) + + def send_cfg_tmode2(self): + "UBX-CFG-TMODE2, get time mode 2 configuration" + m_data = bytearray(0) + gps_model.gps_send(6, 0x3d, m_data) + + def send_cfg_tp5(self): + "UBX-CFG-TP5, get time0 decodes, timepulse 0 and 1" + m_data = bytearray(0) + gps_model.gps_send(6, 0x31, m_data) + # and timepulse 1 + m_data = bytearray(1) + m_data[0] = 1 + gps_model.gps_send(6, 0x31, m_data) + + def send_cfg_usb(self): + "UBX-CFG-USB, get USB configuration" + m_data = bytearray(0) + gps_model.gps_send(6, 0x1b, m_data) + + def send_mon_ver(self): + "UBX-MON-VER get versions" + m_data = bytearray(0) + gps_model.gps_send(0x0a, 0x04, m_data) + + def send_nav_posecef(self): + "UBX-NAV-POSECEF, poll ECEF position" + m_data = bytearray(0) + gps_model.gps_send(1, 1, m_data) + + def send_nav_velecef(self): + "UBX-NAV-VELECEF, poll ECEF velocity decode" + m_data = bytearray(0) + gps_model.gps_send(1, 0x11, m_data) + + def send_tim_svin(self): + "UBX-TIM-SVIN, get survey in data" + m_data = bytearray(0) + gps_model.gps_send(0x0d, 0x04, m_data) + + def send_tim_tp(self): + "UBX-TIM-TP, get time pulse timedata" + m_data = bytearray(0) + gps_model.gps_send(0x0d, 0x01, m_data) + + able_commands = { + # en/dis able BeiDou + "BEIDOU": {"command": send_able_beidou, + "help": "BeiDou"}, + # en/dis able basic binary messages + "BINARY": {"command": send_able_binary, + "help": "basic binary messages"}, + # en/dis able ECEF + "ECEF": {"command": send_able_ecef, + "help": "ECEF"}, + # en/dis able GPS + "GPS": {"command": send_able_gps, + "help": "GPS and QZSS"}, + # en/dis able GALILEO + "GALILEO": {"command": send_able_galileo, + "help": "GALILEO"}, + # en/dis able GLONASS + "GLONASS": {"command": send_able_glonass, + "help": "GLONASS"}, + # en/dis able basic NMEA messages + "NMEA": {"command": send_able_nmea, + "help": "basic NMEA messages"}, + # en/dis able RAWX + "RAWX": {"command": send_able_rawx, + "help": "RAWX measurements"}, + # en/dis able SBAS + "SBAS": {"command": send_able_sbas, + "help": "SBAS"}, + # en/dis able TP time pulse message + "TP": {"command": send_able_tp, + "help": "TP Time Pulse message"}, + # en/dis able TMODE2 Survey-in + "TMODE2": {"command": send_able_tmode2, + "help": "TMODE2"}, + } + commands = { + # UBX-CFG-ANT, poll antenna config decode + "ANT": {"command": send_cfg_ant, + "help": "UBX-CFG-ANT poll antenna config"}, + # UBX-CFG-RST cold boot + "COLDBOOT": {"command": send_cfg_rst, + "help": "UBS-CFG-RST coldboot the GPS", + "opt": 0xfff}, + # UBX-CFG-GNSS poll gnss config + "GNSS": {"command": send_cfg_gnss, + "help": "UBX-CFG-GNSS poll GNSS config"}, + # UBX-CFG-RST hot boot + "HOTBOOT": {"command": send_cfg_rst, + "help": "UBX-CFG-RST hotboot the GPS", + "opt": 0}, + # UBX-CFG-NAV5 set Dynamic Platform Model + "MODEL": {"command": send_cfg_nav5_model, + "help": "UBX-CFG-NAV5 set Dynamic Platform Model"}, + # UBX-CFG-NAV5, poll Nav Engine Settings + "NAV5": {"command": send_cfg_nav5, + "help": "UBX-CFG-NAV5 poll Nav Engines settings"}, + # UBX-CFG-PMS, poll power management settings + "PMS": {"command": send_cfg_pms, + "help": "UBX-CFG-PMS poll power management settings"}, + # UBX-CFG-PRT, poll I/O port number + "PRT": {"command": send_cfg_prt, + "help": "UBX-CFG-PRT poll I/O port settings"}, + # UBX-CFG-CFG reset config + "RESET": {"command": send_cfg_cfg, + "help": "UBX-CFG-CFG reset config to defaults", + "opt": 1}, + # UBX-CFG-CFG save config + "SAVE": {"command": send_cfg_cfg, + "help": "UBX-CFG-CFG save current config", + "opt": 0}, + # UBX-TIM-SVIN, get survey in data + "SVIN": {"command": send_tim_svin, + "help": "UBX-TIM-SVIN get survey in data"}, + # UBX-CFG-TMODE2, get time mode 2 config + "TMODE2": {"command": send_cfg_tmode2, + "help": "UBX-CFG-TMODE2 get time mode 2 config"}, + # UBX-TIM-TP, get time pulse timedata + "TP": {"command": send_tim_tp, + "help": "UBX-TIM-TP get time pulse timedata"}, + # UBX-CFG-TP5, get time0 decodes + "TP5": {"command": send_cfg_tp5, + "help": "UBX-TIM-TP5 get time pulse decodes"}, + # UBX-CFG-RST warm boot + "WARMBOOT": {"command": send_cfg_rst, + "help": "UBX-CFG-RST warmboot the GPS", + "opt": 1}, + # poll UBX-CFG-USB + "USB": {"command": send_cfg_usb, + "help": "UBX-CFG-USB get USB config"}, + # poll UBX-MON-VER + "VER": {"command": send_mon_ver, + "help": "UBX-MON-VER get GPS version"}, + } + # end class ubx + + +class gps_io(object): + """All the GPS I/O in one place" + + Three types of GPS I/O + 1. read only from a file + 2. read/write through a device + 3. read only from a gpsd instance + """ + + out = b'' + ser = None + input_is_device = False + + def __init__(self, serial_class): + "Initialize class" + + Serial = serial_class + # buffer to hold read data + self.out = b'' + + # open the input: device, file, or gpsd + if opts['input_file_name'] is not None: + # check if input file is a file or device + try: + mode = os.stat(opts['input_file_name']).st_mode + except OSError: + sys.stderr.write('%s: failed to open input file %s\n' % + (PROG_NAME, opts['input_file_name'])) + sys.exit(1) + + if stat.S_ISCHR(mode): + # character device, need not be read only + self.input_is_device = True + + if ((opts['disable'] or opts['enable'] or opts['poll'] or + opts['oaf_name'])): + + # check that we can write + if opts['read_only']: + sys.stderr.write('%s: read-only mode, ' + 'can not send commands\n' % PROG_NAME) + sys.exit(1) + if self.input_is_device is False: + sys.stderr.write('%s: input is plain file, ' + 'can not send commands\n' % PROG_NAME) + sys.exit(1) + + if opts['target']['server'] is not None: + # try to open local gpsd + try: + self.ser = gps.gpscommon(host=None) + self.ser.connect(opts['target']['server'], + opts['target']['port']) + + # alias self.ser.write() to self.write_gpsd() + self.ser.write = self.write_gpsd + + # ask for raw, not rare, data + data_out = b'?WATCH={' + if opts['target']['device'] is not None: + # add in the requested device + data_out += (b'"device":"' + opts['target']['device'] + + b'",') + data_out += b'"enable":true,"raw":2}\r\n' + if VERB_RAW <= opts['verbosity']: + print("sent: ", data_out) + self.ser.send(data_out) + except socket.error as err: + sys.stderr.write('%s: failed to connect to gpsd %s\n' % + (PROG_NAME, err)) + sys.exit(1) + + elif self.input_is_device: + # configure the serial connections (the parameters refer to + # the device you are connecting to) + + try: + self.ser = Serial.Serial( + baudrate=opts['input_speed'], + # 8N1 is GREIS default + bytesize=Serial.EIGHTBITS, + parity=Serial.PARITY_NONE, + port=opts['input_file_name'], + stopbits=Serial.STOPBITS_ONE, + # read timeout + timeout=0.05, + # pyserial Ver 3.0+ changes writeTimeout to write_timeout + # just set both + write_timeout=0.5, + writeTimeout=0.5, + ) + except Serial.serialutil.SerialException: + # this exception happens on bad serial port device name + sys.stderr.write('%s: failed to open serial port "%s"\n' + '%s: Your computer has the serial ports:\n' % + (PROG_NAME, opts['input_file_name'], + PROG_NAME)) + + # print out list of supported ports + import serial.tools.list_ports as List_Ports + ports = List_Ports.comports() + for port in ports: + sys.stderr.write(" %s: %s\n" % + (port.device, port.description)) + sys.exit(1) + + # flush input buffer, discarding all its contents + self.ser.flushInput() + + else: + # Read from a plain file of GREIS messages + try: + self.ser = open(opts['input_file_name'], 'rb') + except IOError: + sys.stderr.write('%s: failed to open input %s\n' % + (PROG_NAME, opts['input_file_name'])) + sys.exit(1) + + def read(self, read_opts): + "Read from device, until timeout or expected message" + + # are we expecting a certain message? + if gps_model.expect_statement_identifier: + # assume failure, until we see expected message + ret_code = 1 + else: + # not expecting anything, so OK if we did not see it. + ret_code = 0 + + try: + if read_opts['target']['server'] is not None: + # gpsd input + start = time.clock() + while read_opts['input_wait'] > (time.clock() - start): + # First priority is to be sure the input buffer is read. + # This is to prevent input buffer overuns + if 0 < self.ser.waiting(): + # We have serial input waiting, get it + # No timeout possible + # RTCM3 JSON can be over 4.4k long, so go big + new_out = self.ser.sock.recv(8192) + if raw is not None: + # save to raw file + raw.write(new_out) + self.out += new_out + + consumed = gps_model.decode_msg(self.out) + self.out = self.out[consumed:] + if ((gps_model.expect_statement_identifier and + (gps_model.expect_statement_identifier == + gps_model.last_statement_identifier))): + # Got what we were waiting for. Done? + ret_code = 0 + if not read_opts['input_forced_wait']: + # Done + break + + elif self.input_is_device: + # input is a serial device + start = time.clock() + while read_opts['input_wait'] > (time.clock() - start): + # First priority is to be sure the input buffer is read. + # This is to prevent input buffer overuns + # pyserial 3.0 replaces inWaiting() with in_waiting + if 0 < self.ser.inWaiting(): + # We have serial input waiting, get it + # 1024 is comfortably large, almost always the + # Read timeout is what causes ser.read() to return + new_out = self.ser.read(1024) + if raw is not None: + # save to raw file + raw.write(new_out) + self.out += new_out + + consumed = gps_model.decode_msg(self.out) + self.out = self.out[consumed:] + if ((gps_model.expect_statement_identifier and + (gps_model.expect_statement_identifier == + gps_model.last_statement_identifier))): + # Got what we were waiting for. Done? + ret_code = 0 + if not read_opts['input_forced_wait']: + # Done + break + else: + # ordinary file, so all read at once + self.out += self.ser.read() + if raw is not None: + # save to raw file + raw.write(self.out) + + while True: + consumed = gps_model.decode_msg(self.out) + self.out = self.out[consumed:] + if 0 >= consumed: + break + + except IOError: + # This happens on a good device name, but gpsd already running. + # or if USB device unplugged + sys.stderr.write('%s: failed to read %s\n' + '%s: Is gpsd already holding the port?\n' + % (PROG_NAME, read_opts['input_file_name'], + PROG_NAME)) + return 1 + + if 0 < ret_code: + # did not see the message we were expecting to see + sys.stderr.write('%s: waited %0.2f seconds for, ' + 'but did not get: %%%s%%\n' + % (PROG_NAME, read_opts['input_wait'], + gps_model.expect_statement_identifier)) + return ret_code + + def write_gpsd(self, data): + "write data to gpsd daemon" + + # HEXDATA_MAX = 512, from gps.h, The max hex digits can write. + # Input data is binary, converting to hex doubles its size. + # Limit binary data to length 255, so hex data length less than 510. + if 255 < len(data): + sys.stderr.write('%s: trying to send %d bytes, max is 255\n' + % (PROG_NAME, len(data))) + return 1 + + if opts['target']['device'] is not None: + # add in the requested device + data_out = b'?DEVICE={"path":"' + opts['target']['device'] + b'",' + else: + data_out = b'?DEVICE={' + + # Convert binary data to hex and build the message. + data_out += b'"hexdata":"' + binascii.hexlify(data) + b'"}\r\n' + if VERB_RAW <= opts['verbosity']: + print("sent: ", data_out) + self.ser.send(data_out) + return 0 + + +# instantiate the GPS class +gps_model = ubx() + + +def usage(): + "Ouput usage information, and exit" + print('usage: %s [-c C] [-f F] [-r] [-p P] [-s S] [-v V]\n' + ' [-hV?] [-S S]\n' + ' -c C send raw command C to GPS\n' + ' -d D disable D\n' + ' -e E enable E\n' + ' -f F open F as file/device\n' + ' default: %s\n' + ' -h print help\n' + ' -m M optional mode to -p P\n' + ' -p P send a prepackaged query P to GPS\n' + ' -R R save raw data from GPS in file R\n' + ' default: %s\n' + ' -r open file/device read only\n' + ' -S S set GPS speed to S\n' + ' -s S set port speed to S\n' + ' default: %s bps\n' + ' -w wait time\n' + ' default: %s seconds\n' + ' -V print version\n' + ' -v V Set verbosity level to V, 0 to 4\n' + ' default: %s\n' + ' -? print help\n' + '\n' + 'D and E can be one of:' % + (PROG_NAME, opts['input_file_name'], opts['raw_file'], + opts['input_speed'], opts['input_wait'], opts['verbosity']) + ) + + for item in sorted(gps_model.able_commands.keys()): + print(" %-12s %s" % (item, gps_model.able_commands[item]["help"])) + + print('\nP can be one of:') + for item in sorted(gps_model.commands.keys()): + print(" %-12s %s" % (item, gps_model.commands[item]["help"])) + print('\nOptions can be placed in the UBXOPTS environment variable.\n' + 'UBXOPTS is processed before the CLI options.') + sys.exit(0) + + +if 'UBXOPTS' in os.environ: + # grab the UBXOPTS environment variable for options + opts['progopts'] = os.environ['UBXOPTS'] + options = opts['progopts'].split(' ') + sys.argv[1:] +else: + options = sys.argv[1:] + + +try: + (options, arguments) = getopt.getopt(options, "?c:d:e:f:hm:rp:s:w:v:R:S:V") +except getopt.GetoptError as err: + sys.stderr.write("%s: %s\n" + "Try '%s -h' for more information.\n" % + (PROG_NAME, str(err), PROG_NAME)) + sys.exit(2) + +for (opt, val) in options: + if opt == '-c': + opts['command'] = val + elif opt == '-d': + opts['disable'] = val + elif opt == '-e': + opts['enable'] = val + elif opt == '-f': + opts['input_file_name'] = val + elif opt == '-h' or opt == '-?': + usage() + elif opt == '-m': + opts['mode'] = int(val) + elif opt == '-p': + opts['poll'] = val + elif opt == '-r': + opts['read_only'] = True + elif opt == '-s': + opts['input_speed'] = int(val) + if opts['input_speed'] not in gps_model.speeds: + sys.stderr.write('%s: -s invalid speed %s\n' % + (PROG_NAME, opts['input_speed'])) + sys.exit(1) + + elif opt == '-w': + opts['input_wait'] = float(val) + elif opt in '-v': + opts['verbosity'] = int(val) + elif opt in '-R': + # raw log file + opts['raw_file'] = val + elif opt in '-S': + opts['set_speed'] = int(val) + if opts['set_speed'] not in gps_model.speeds: + sys.stderr.write('%s: -S invalid speed %s\n' % + (PROG_NAME, opts['set_speed'])) + sys.exit(1) + + elif opt == '-V': + # version + sys.stderr.write('%s: Version %s\n' % (PROG_NAME, gps_version)) + sys.exit(0) + +if opts['input_file_name'] is None: + # no input file given + # default to local gpsd + opts['target']['server'] = "localhost" + opts['target']['port'] = gps.GPSD_PORT + opts['target']['device'] = None + if arguments: + # server[:port[:device]] + parts = arguments[0].split(':') + opts['target']['server'] = parts[0] + if 1 < len(parts): + opts['target']['port'] = parts[1] + if 2 < len(parts): + opts['target']['device'] = parts[2] + +elif arguments: + sys.stderr.write('%s: Both input file and server specified\n' % PROG_NAME) + sys.exit(1) + +if VERB_PROG <= opts['verbosity']: + # dump all options + print('Options:') + for option in sorted(opts): + print(" %s: %s" % (option, opts[option])) + +# done parsing arguments from environment and CLI + +try: + # raw log file requested? + raw = None + if opts['raw_file']: + try: + raw = open(opts['raw_file'], 'w') + except IOError: + sys.stderr.write('%s: failed to open raw file %s\n' % + (PROG_NAME, opts['raw_file'])) + sys.exit(1) + + # create the I/O instance + io_handle = gps_io(serial) + + sys.stdout.flush() + + if opts['disable'] is not None: + if VERB_QUIET < opts['verbosity']: + sys.stderr.write('%s: disable %s\n' % (PROG_NAME, opts['disable'])) + if opts['disable'] in gps_model.able_commands: + command = gps_model.able_commands[opts['disable']] + command["command"](gps, 0) + else: + sys.stderr.write('%s: disable %s not found\n' % + (PROG_NAME, opts['disable'])) + sys.exit(1) + + elif opts['enable'] is not None: + if VERB_QUIET < opts['verbosity']: + sys.stderr.write('%s: enable %s\n' % (PROG_NAME, opts['enable'])) + if opts['enable'] in gps_model.able_commands: + command = gps_model.able_commands[opts['enable']] + command["command"](gps, 1) + else: + sys.stderr.write('%s: enable %s not found\n' % + (PROG_NAME, opts['enable'])) + sys.exit(1) + + elif opts['poll'] is not None: + if VERB_QUIET < opts['verbosity']: + sys.stderr.write('%s: poll %s\n' % (PROG_NAME, opts['poll'])) + + if 'MODEL' == opts["poll"]: + if opts["mode"] is None: + opts["mode"] = 0 # default to portable model + + if opts['poll'] in gps_model.commands: + command = gps_model.commands[opts['poll']] + if 'opt' in command: + command["command"](gps, command["opt"]) + else: + command["command"](gps) + else: + sys.stderr.write('%s: poll %s not found\n' % + (PROG_NAME, opts['poll'])) + sys.exit(1) + + elif opts['set_speed'] is not None: + gps_model.send_set_speed(opts['set_speed']) + + elif opts['command'] is not None: + # zero length is OK to send + # add trailing new line + opts['command'] += "\n" + + if VERB_QUIET < opts['verbosity']: + sys.stderr.write('%s: command %s\n' % (PROG_NAME, opts['command'])) + gps_model.gps_send_raw(opts['command']) + + exit_code = io_handle.read(opts) + + if ((VERB_RAW <= opts['verbosity']) and io_handle.out): + # dump raw left overs + print("Left over data:") + print(io_handle.out) + + sys.stdout.flush() + io_handle.ser.close() + +except KeyboardInterrupt: + print('') + exit_code = 1 + +sys.exit(exit_code) diff --git a/zerk b/zerk new file mode 100755 index 00000000..848d216b --- /dev/null +++ b/zerk @@ -0,0 +1,1896 @@ +#!/usr/bin/env python +# -*- coding: UTF-8 +''' +zerk -- GREIS configurator and packet decoder + +usage: zerk [OPTIONS] [server[:port[:device]]] +''' + +# This program conforms to the JAVAD document: +# GNSS Receiver External Interface Specification +# Revised: October 11, 2017 +# +# Hereafter referred to as "the specification" +# +# This file is Copyright (c) 2018 by the GPSD project +# BSD terms apply: see the file COPYING in the distribution root for details. +# +# This code runs compatibly under Python 2 and 3.x for x >= 2. +# Preserve this property! +# +# ENVIRONMENT: +# Options in the ZERKOPTS environment variable will be parsed before +# the CLI options. A handy place to put your '-f /dev/ttyXX -s SPEED' +# +# example usages: +# Coldboot the GPS: zerk -p COLDBOOT +# Print current serial port: zerk -c "print,/cur/term" +# Decode raw log file: zerk -r -f greis-binary.log -v 2 +# Change GPS port speed: zerk -S 230400 +# Watch entire reset cycle: zerk -p RESET -v 2 -w 20 -W +# poll SVs Status: zerk -W -w 2 -v 2 -c "out,,jps/{CS,ES,GS,Is,WS,QS}" +# dump local gpsd data zerk -v 2 -w 5 localhost +# +# TODO: no CRC16 packets handled yet +# TODO: more packet decodes + +from __future__ import absolute_import, print_function, division + +import binascii # for binascii.hexlify() +import getopt # for getopt.getopt(), to parse CLI options +import hashlib # for hashlib.sha1 +import os # for os.environ +import socket # for socket.error +import stat # for stat.S_ISBLK() +import struct # for pack() +import sys +import time +import xml.etree.ElementTree # to parse .jpo files + +PROG_NAME = 'zerk' + +try: + import serial +except ImportError: + # treat serial as special since it is not part of standard Python + sys.stderr.write("%s: failed to import pyserial\n" % PROG_NAME) + sys.exit(2) + +try: + import gps +except ImportError: + # PEP8 says local imports last + sys.stderr.write("%s: failed to import gps, check PYTHON_PATH\n" % + PROG_NAME) + sys.exit(2) + +gps_version = '3.18-dev' +if gps.__version__ != gps_version: + sys.stderr.write("%s: ERROR: need gps module version %s, got %s\n" % + (PROG_NAME, gps_version, gps.__version__)) + sys.exit(1) + + +VERB_QUIET = 0 # quiet +VERB_NONE = 1 # just output requested data and some info +VERB_DECODE = 2 # decode all messages +VERB_INFO = 3 # more info +VERB_RAW = 4 # raw info +VERB_PROG = 5 # program trace + +# dictionary to hold all user options +opts = { + # command to send to GPS, -c + 'command': None, + # command for -d disable + 'disable': None, + # command for -e enable + 'enable': None, + # default input -f file + 'input_file_name': None, + # default forced wait? -W + 'input_forced_wait': False, + # default port speed -s + 'input_speed': 115200, + # default input wait time -w in seconds + 'input_wait': 2.0, + # the name of an OAF file, extension .jpo + 'oaf_name': None, + # poll command -p + 'poll': None, + # raw log file name + 'raw_file': None, + # open port read only -r + 'read_only': False, + # speed to set GPS -S + 'set_speed': None, + # target gpsd (server:port:device) to connect to + 'target': {"server": None, "port": gps.GPSD_PORT, "device": None}, + # verbosity level, -v + 'verbosity': VERB_NONE, + # contents of environment variable ZERKOPTS + 'progopts': '', +} + + +class greis(object): + """A class for working with the GREIS GPS message formats + + This class contains functions to decode messages in the Javad GREIS + "Receiver Input Language" and "Receiver Messages" formats. + """ + + # when a statement identifier is received, it is stored here + last_statement_identifier = None + # expected statement identifier. + expect_statement_identifier = False + # ID of current message as a string + s_id = '' + + def __init__(self): + "Initialize class" + + self.last_statement_identifier = None + self.expect_statement_identifier = False + # last epoch received in [~~] + # epoch == None means never got epoch, epoch == -1 means missing. + self.epoch = None + + def f4_s(self, f): + "convert an '! f4' to a string" + + if gps.isfinite(f): + # yeah, the precision is a guess + return "%.6f" % f + return 'X' + + def f8_s(self, f): + "convert an '! f8' to a string" + + if gps.isfinite(f): + # yeah, the precision is a guess + return "%.4f" % f + return 'X' + + def i1_s(self, i): + "convert an '! i1' to a string" + return 'X' if i == 127 else str(i) + + def i2_s(self, i): + "convert an '! i2' to a string" + return 'X' if i == 32767 else str(i) + + def i4_s(self, i): + "convert an '! i4' to a string" + return 'X' if i == 2147483647 else str(i) + + def u1_s(self, u): + "convert an '! u1' to a string" + return 'X' if u == 255 else str(u) + + def u2_s(self, u): + "convert an '! u2' to a string" + return 'X' if u == 65535 else str(u) + + def u4_s(self, u): + "convert an '! u4' to a string" + return 'X' if u == 4294967295 else str(u) + + def isuchex(self, c): + "Is byte an upper case hex char?" + if 48 <= c and 57 >= c: + # 0 to 9 + return int(c) - 48 + if 65 <= c and 70 >= c: + # A to F + return int(c) - 55 + return -1 + + soltypes = {0: "None", + 1: "3D", + 2: "DGPS", + 3: "RTK float", + 4: "RTK fixed", + 5: "fixed" + } + + # allowable speeds + speeds = (460800, 230400, 153600, 115200, 57600, 38400, 19200, 9600, + 4800, 2400, 1200, 600, 300) + + def msg_c_(self, payload): + "[c?] decode, Smoothing Corrections" + + s = ' smooth' + for i in range(0, len(payload) - 1, 2): + u = struct.unpack_from(' m_len): + return " Bad Length %s" % m_len + + u = struct.unpack_from(' m_len): + return " Bad Length %s" % m_len + if ('[CN]' == self.s_id) and (132 > m_len): + return " Bad Length %s" % m_len + if ('[EN]' == self.s_id) and (145 > m_len): + return " Bad Length %s" % m_len + + u = struct.unpack_from(' m_len): + u = struct.unpack_from(' m_len): + u = struct.unpack_from('= c): + # c is printable + print("state: %s char %c (%#x)" % (state, chr(c), c)) + else: + # c is not printable + print("state: %s char %#x" % (state, c)) + + m_raw.extend([c]) + + # parse input stream per GREIS Ref Guide Section 3.3.3 + if 'BASE' == state: + # start fresh + # place to store 'comments' + comment = '' + # message id byte one + m_id1 = 0 + # message id byte two + m_id2 = 0 + # message length as integer + m_len = 0 + # byte array to hold payload, including possible checksum + m_payload = bytearray(0) + m_raw = bytearray(0) + m_raw.extend([c]) + + if (ord('0') <= c) and (ord('~') >= c): + # maybe id 1, '0' to '~' + state = 'ID1' + + # start the grab + m_id1 = c + continue + + if ord("%") == c: + # start of %ID%, Receiver Input Language + # per GREIS Ref Guide Section 2.2 + state = 'RIL' + + # start fresh + comment = "%" + continue + + if ord("$") == c: + # NMEA line, treat as comment + state = 'NMEA' + + # start fresh + comment = "$" + continue + + if ord("#") == c: + # comment line + state = 'COMMENT' + + # start fresh + comment = "#" + continue + + if ord('\n') == c or ord('\r') == c: + # stray newline or linefeed, eat it + return consumed + + # none of the above, stay in BASE + continue + + if state in ('COMMENT', 'JSON', 'RIL'): + # inside comment + if ord('\n') == c or ord('\r') == c: + # Got newline or linefeed + # GREIS terminates messages on or + # Done, got a full message + if b'{"class":"ERROR"' in comment: + # always print gpsd errors + print(comment) + elif VERB_DECODE <= opts['verbosity']: + print(comment) + return consumed + else: + comment += chr(c) + continue + + if 'ID1' == state: + # maybe id 2, '0' to '~' + if ord('"') == c: + # technically could be GREIS, but likely JSON + state = 'JSON' + comment += chr(m_id1) + chr(c) + elif (ord('0') <= c) and (ord('~') >= c): + state = 'ID2' + m_id2 = c + else: + state = 'BASE' + continue + + if 'ID2' == state: + # maybe len 1, 'A' to 'F' + x = self.isuchex(c) + if -1 < x: + state = 'LEN1' + m_len = x * 256 + else: + state = 'BASE' + continue + + if 'LEN1' == state: + # maybe len 2, 'A' to 'F' + x = self.isuchex(c) + if -1 < x: + state = 'LEN2' + m_len += x * 16 + else: + state = 'BASE' + continue + + if 'LEN2' == state: + # maybe len 3, 'A' to 'F' + x = self.isuchex(c) + if -1 < x: + state = 'PAYLOAD' + m_len += x + else: + state = 'BASE' + continue + + if 'NMEA' == state: + # inside NMEA + if ord('\n') == c or ord('\r') == c: + # Got newline or linefeed + # done, got a full message + # GREIS terminates messages on or + if VERB_DECODE <= opts['verbosity']: + print(comment) + return consumed + else: + comment += chr(c) + continue + + if 'PAYLOAD' == state: + # getting payload + m_payload.extend([c]) + if len(m_payload) < m_len: + continue + + # got entire payload + self.s_id = "[%c%c]" % (chr(m_id1), chr(m_id2)) + if VERB_DECODE <= opts['verbosity']: + x_payload = binascii.hexlify(m_payload) + + # [RE], [ER] and more have no 8-bit checksum + # assume the rest do + if ((self.s_id not in ('[CS]', '[ER]', '[ES]', '[GS]', '[Is]', + '[MF]', '[NS]', '[PM]', '[QS]', '[RE]', + '[WS]') and + not self.checksum_OK(m_raw))): + print("ERROR: Bad checksum\n") + if VERB_DECODE <= opts['verbosity']: + print("DECODE: id: %s len: %d\n" + "DECODE: payload: %s\n" % + (self.s_id, m_len, x_payload)) + # skip it. + return consumed + + if self.s_id in self.messages: + if VERB_INFO <= opts['verbosity']: + print("INFO: id: %s len: %d\n" + "INFO: payload: %s\n" % + (self.s_id, m_len, x_payload)) + + (decode, length) = self.messages[self.s_id] + if m_len < length: + print("DECODE: %s Bad Length %s\n" % + (self.s_id, m_len)) + else: + s = self.s_id + decode(self, m_payload) + if VERB_DECODE <= opts['verbosity']: + print(s) + else: + # unknown message + if VERB_DECODE <= opts['verbosity']: + print("DECODE: Unknown: id: %s len: %d\n" + "DECODE: payload: %s\n" % + (self.s_id, m_len, x_payload)) + return consumed + + # give up + state = 'BASE' + + # fell out of loop, no more chars to look at + return 0 + + def checksum_OK(self, raw_msg): + "Check the i8-bit checksum on a message, return True if good" + + # some packets from the GPS use CRC16, some i8-bit checksum, some none + # only 8-bit checksum done here for now + calc_checksum = self.checksum(raw_msg, len(raw_msg) - 1) + rcode = raw_msg[len(raw_msg)-1] == calc_checksum + if VERB_RAW <= opts['verbosity']: + print("Checksum was %#x, calculated %#x" % + (raw_msg[len(raw_msg)-1], calc_checksum)) + return rcode + + def _rol(self, value): + "rotate a byte left 2 bits" + return ((value << 2) | (value >> 6)) & 0x0ff + + def checksum(self, msg, m_len): + "Calculate GREIS 8-bit checksum" + + # Calculated per section A.1.1 of the specification + # msg may be bytes (incoming messages) or str (outgoing messages) + + ck = 0 + for c in msg[0:m_len]: + if isinstance(c, str): + # a string, make a byte + c = ord(c) + ck = self._rol(ck) ^ c + + return self._rol(ck) & 0x0ff + + def make_pkt(self, m_data): + "Build an output message, always ASCII, add checksum and terminator" + + # build core message + + # no leading spaces, checksum includes the @ + m_data = m_data.lstrip() + b'@' + + chk = self.checksum(m_data, len(m_data)) + + # all commands end with CR and/or LF + return m_data + (b'%02X' % chk) + b'\n' + + def gps_send(self, m_data): + "Send message to GREIS GPS" + + m_all = self.make_pkt(m_data) + if not opts['read_only']: + io_handle.ser.write(m_all) + if VERB_INFO <= opts['verbosity']: + print("sent:", m_all) + self.decode_msg(m_all) + sys.stdout.flush() + + # Table of known options. From table 4-2 of the specification. + oafs = ( + b"_AJM", + b"AUTH", + b"_BLT", + b"_CAN", + b"CDIF", + b"CMRI", + b"CMRO", + b"COMP", + b"COOP", + b"COPN", + b"CORI", + b"_CPH", + b"DEVS", + b"DIST", + b"_DTM", + b"_E5B", + b"_E6_", + b"EDEV", + b"ETHR", + b"EVNT", + b"_FRI", + b"_FRO", + b"_FTP", + b"_GAL", + b"GBAI", + b"GBAO", + b"GCLB", + b"_GEN", + b"_GEO", + b"_GLO", + b"_GPS", + b"_GSM", + b"HTTP", + b"_IMU", + b"INFR", + b"IRIG", + b"IRNS", + b"JPSI", + b"JPSO", + b"_L1C", + b"_L1_", + b"_L2C", + b"_L2_", + b"_L5_", + b"LAT1", + b"LAT2", + b"LAT3", + b"LAT4", + b"LCS2", + b"L_CS", + b"_LIM", + b"LON1", + b"LON2", + b"LON3", + b"LON4", + b"MAGN", + b"_MEM", + b"_MPR", + b"OCTO", + b"OMNI", + b"_PAR", + b"PDIF", + b"_POS", + b"_PPP", + b"_PPS", + b"PRTT", + b"_PTP", + b"QZSS", + b"RAIM", + b"_RAW", + b"RCVT", + b"RM3I", + b"RM3O", + b"RS_A", + b"RS_B", + b"RS_C", + b"RS_D", + b"RTMI", + b"RTMO", + b"SPEC", + b"TCCL", + b"_TCP", + b"TCPO", + b"_TLS", + b"TRST", + b"UDPO", + b"_UHF", + b"_USB", + b"WAAS", + b"WIFI", + b"_WPT", + ) + + def send_able_4hz(self, able): + "enable basic GREIS messages at 4Hz" + + self.expect_statement_identifier = 'greis' + + # the messages we want + # [SX] requires 3.7 firmware, we use [SI] to support 3.6 + messages = b"jps/{RT,UO,GT,PV,SG,DP,SI,EL,AZ,EC,SS,ET}" + + if able: + # Message rate must be an integer multiple of /par/raw/msint + # Default msint is 0.100 seconds, so that must be changed first + self.gps_send(b"%msint%set,/par/raw/msint,250") + + self.gps_send(b"%greis%em,," + messages + b":0.25") + else: + self.gps_send(b"%greis%dm,," + messages) + + def send_able_comp(self, able): + "dis/enable COMPASS, aka BeiDou" + self.expect_statement_identifier = 'cons' + en_dis = b'y' if 1 == able else b'n' + self.gps_send(b"%cons%set,/par/pos/sys/comp," + en_dis) + + def send_able_constellations(self, able): + "dis/enable all constellations" + self.expect_statement_identifier = 'cons7' + en_dis = b'y' if 1 == able else b'n' + self.gps_send(b"%cons1%set,/par/pos/sys/comp," + en_dis) + self.gps_send(b"%cons2%set,/par/pos/sys/gal," + en_dis) + # this will fail on TR-G2H, as it has no GLONASS + self.gps_send(b"%cons3%set,/par/pos/sys/glo," + en_dis) + self.gps_send(b"%cons4%set,/par/pos/sys/gps," + en_dis) + self.gps_send(b"%cons5%set,/par/pos/sys/irnss," + en_dis) + self.gps_send(b"%cons6%set,/par/pos/sys/sbas," + en_dis) + self.gps_send(b"%cons7%set,/par/pos/sys/qzss," + en_dis) + + def send_able_defmsg(self, able): + "dis/enable default messages at 1Hz" + self.expect_statement_identifier = 'defmsg' + if able: + self.gps_send(b"%defmsg%em,,jps/RT,/msg/def:1,jps/ET") + else: + # leave RT and ET to break less? + self.gps_send(b"%defmsg%dm,,/msg/def:1") + + def send_able_gal(self, able): + "dis/enable GALILEO" + self.expect_statement_identifier = 'cons' + en_dis = b'y' if 1 == able else b'n' + self.gps_send(b"%cons%set,/par/pos/sys/gal," + en_dis) + + def send_able_glo(self, able): + "dis/enable GLONASS" + self.expect_statement_identifier = 'cons' + en_dis = b'y' if 1 == able else b'n' + self.gps_send(b"%cons%set,/par/pos/sys/glo," + en_dis) + + def send_able_gps(self, able): + "dis/enable GPS" + self.expect_statement_identifier = 'cons' + en_dis = b'y' if 1 == able else b'n' + self.gps_send(b"%cons%set,/par/pos/sys/gps," + en_dis) + + def send_able_ipr(self, able): + "dis/enable all Integer Psuedo-Range messages" + self.expect_statement_identifier = 'em' + if able: + self.gps_send(b"%em%em,,jps/{rx,rc,r1,r2,r3,r5,rl}:0.25") + else: + self.gps_send(b"%em%dm,,jps/{rx,rc,r1,r2,r3,r5,rl}") + + def send_able_irnss(self, able): + "dis/enable IRNSS" + self.expect_statement_identifier = 'cons' + en_dis = b'y' if 1 == able else b'n' + self.gps_send(b"%cons%set,/par/pos/sys/irnss," + en_dis) + + def send_able_nmea41(self, able): + "dis/enable basic NMEA 4.1e messages at 4Hz" + + self.expect_statement_identifier = 'nmea' + + messages = b"nmea/{GBS,GGA,GSA,GST,GSV,RMC,VTG,ZDA}" + + if able: + # set NMEA version 4.1e + self.gps_send(b"%nmeaver%set,/par/nmea/ver,v4.1e") + + # Message rate must be an integer multiple of /par/raw/msint + # Default msint is 0.100 seconds, so that must be changed first + self.gps_send(b"%msint%set,/par/raw/msint,250") + + # now we can set the messages we want + self.gps_send(b"%nmea%em,," + messages + b":0.25") + else: + # disable + self.gps_send(b"%nmea%dm,," + messages) + + def send_able_sbas(self, able): + "dis/enable SBAS" + self.expect_statement_identifier = 'cons' + en_dis = b'y' if 1 == able else b'n' + self.gps_send(b"%cons%set,/par/pos/sys/sbas," + en_dis) + + def send_able_qzss(self, able): + "dis/enable QZSS" + self.expect_statement_identifier = 'cons' + en_dis = b'y' if 1 == able else b'n' + self.gps_send(b"%cons%set,/par/pos/sys/qzss," + en_dis) + + def send_able_snr(self, able): + "dis/enable all SNR messages, except [EC]" + self.expect_statement_identifier = 'em' + if able: + self.gps_send(b"%em%em,,jps/{E1,E2,E3,E5,El}:0.25") + else: + self.gps_send(b"%em%dm,,jps/{E1,E2,E3,E5,El}") + + able_commands = { + # en/disable basic GREIS messages at 4HZ + "4HZ": {"command": send_able_4hz, + "help": "basic GREIS messages at 4Hz"}, + # en/disable all constellations + "CONS": {"command": send_able_constellations, + "help": "all constellations"}, + # en/disable COMPASS, aka Beidou + "COMPASS": {"command": send_able_comp, + "help": "COMPASS"}, + # en/disable default message set. + "DEFMSG": {"command": send_able_defmsg, + "help": "default message set at 1Hz"}, + # en/disable GALILEO + "GALILEO": {"command": send_able_gal, + "help": "GALILEO"}, + # en/disable GLONASS + "GLONASS": {"command": send_able_glo, + "help": "GLONASS"}, + # en/disable GPS + "GPS": {"command": send_able_gps, + "help": "GPS"}, + # en/disable Integer Psuedo Range messages + "IPR": {"command": send_able_ipr, + "help": "all Integer Psuedo Range messages"}, + # en/disable IRNSS + "IRNSS": {"command": send_able_irnss, + "help": "IRNSS"}, + # en/disable NMEA 4.1e + "NMEA": {"command": send_able_nmea41, + "help": "basic messages NMEA 4.1 at 4Hz"}, + # en/disable SBAS + "SBAS": {"command": send_able_sbas, + "help": "SBAS"}, + # en/disable all SNRs + "SNR": {"command": send_able_snr, + "help": "all SNR messages, except [EC]"}, + # en/disable QZSS + "QZSS": {"command": send_able_qzss, + "help": "QZSS"}, + } + + def send_coldboot(self): + "Delete NVRAM (almanac, ephemeris, location) and restart" + self.expect_statement_identifier = 'coldboot' + self.gps_send(b"%coldboot%init,/dev/nvm/a") + + def send_constellations(self): + "poll all constellations" + self.expect_statement_identifier = 'cons' + self.gps_send(b"%cons%print,/par/pos/sys:on") + + def send_get_id(self): + "get receiver id" + self.expect_statement_identifier = 'id' + self.gps_send(b"%id%print,/par/rcv/id") + + def send_get_oaf(self): + "poll OAF (GPS opts)" + + self.expect_statement_identifier = 'opts,_WPT' + if VERB_RAW <= opts['verbosity']: + # get list of all opts + self.gps_send(b"%opts,list%list,/par/opts") + + # request opts one at a time from canned list + for s in self.oafs: + self.gps_send(b"%%opts,%s%%print,/par/opts/%s" % (s, s)) + + def send_get_serial(self): + "get receiver serial number" + self.expect_statement_identifier = 'serial' + self.gps_send(b"%serial%print,/par/rcv/sn") + + def send_reset(self): + "reset (reboot) the GPS" + self.expect_statement_identifier = 'reset' + self.gps_send(b"%reset%set,/par/reset,y") + + def send_set_dm(self): + "disable all messages" + self.expect_statement_identifier = 'dm' + self.gps_send(b"%dm%dm") + + def send_set_ipr(self): + "poll Integer Psuedo-Range messages" + self.expect_statement_identifier = 'out' + self.gps_send(b"%out%out,,jps/{rx,rc,r1,r2,r3,r5,rl}") + + def send_get_snr(self): + "poll all SNR messages" + # nothing we can wait on, depending on GPS model/configuration + # we may never see some of E2, E3, E5 or El + self.gps_send(b"%out%out,,jps/{EC,E1,E2,E3,E5,El}") + + def send_set_speed(self, set_speed): + "change GPS speed" + self.expect_statement_identifier = 'setspeed' + self.gps_send(b"%%setspeed%%set,/par/cur/term/rate,%d" % + set_speed) + + def send_get_vendor(self): + "get receiver vendor" + self.expect_statement_identifier = 'vendor' + self.gps_send(b"%vendor%print,/par/rcv/vendor") + + def send_get_ver(self): + "get receiver version, per section 4.4.3 of the specification" + self.expect_statement_identifier = 'ver' + self.gps_send(b"%ver%print,/par/rcv/ver") + + # list of canned commands that can be sent to the receiver + commands = { + "COLDBOOT": {"command": send_coldboot, + "help": "cold boot the GPS"}, + "CONS": {"command": send_constellations, + "help": "poll enabled constellations"}, + "DM": {"command": send_set_dm, + "help": "disable all periodic messages"}, + "ID": {"command": send_get_id, + "help": "poll receiver ID"}, + "IPR": {"command": send_set_ipr, + "help": "poll all Integer Psuedo-range messages"}, + "OAF": {"command": send_get_oaf, + "help": "poll all OAF options"}, + "RESET": {"command": send_reset, + "help": "reset (reboot) the GPS"}, + "SERIAL": {"command": send_get_serial, + "help": "poll receiver serial number"}, + "SNR": {"command": send_get_snr, + "help": "poll all SNR messages"}, + "VENDOR": {"command": send_get_vendor, + "help": "poll GPS vendor"}, + "VER": {"command": send_get_ver, + "help": "poll GPS version"}, + } + + +class gps_io(object): + """All the GPS I/O in one place" + + Three types of GPS I/O + 1. read only from a file + 2. read/write through a device + 3. read only from a gpsd instance + """ + + out = b'' + ser = None + input_is_device = False + + def __init__(self, serial_class): + "Initialize class" + + Serial = serial_class + # buffer to hold read data + self.out = b'' + + # open the input: device, file, or gpsd + if opts['input_file_name'] is not None: + # check if input file is a file or device + try: + mode = os.stat(opts['input_file_name']).st_mode + except OSError: + sys.stderr.write('%s: failed to open input file %s\n' % + (PROG_NAME, opts['input_file_name'])) + sys.exit(1) + + if stat.S_ISCHR(mode): + # character device, need not be read only + self.input_is_device = True + + if ((opts['disable'] or opts['enable'] or opts['poll'] or + opts['oaf_name'])): + + # check that we can write + if opts['read_only']: + sys.stderr.write('%s: read-only mode, ' + 'can not send commands\n' % PROG_NAME) + sys.exit(1) + if self.input_is_device is False: + sys.stderr.write('%s: input is plain file, ' + 'can not send commands\n' % PROG_NAME) + sys.exit(1) + + if opts['target']['server'] is not None: + # try to open local gpsd + try: + self.ser = gps.gpscommon(host=None) + self.ser.connect(opts['target']['server'], + opts['target']['port']) + + # alias self.ser.write() to self.write_gpsd() + self.ser.write = self.write_gpsd + # ask for raw, not rare, data + data_out = b'?WATCH={' + if opts['target']['device'] is not None: + # add in the requested device + data_out += (b'"device":"' + opts['target']['device'] + + b'",') + data_out += b'"enable":true,"raw":2}\r\n' + if VERB_RAW <= opts['verbosity']: + print("sent: ", data_out) + self.ser.send(data_out) + except socket.error as err: + sys.stderr.write('%s: failed to connect to gpsd %s\n' % + (PROG_NAME, err)) + sys.exit(1) + + elif self.input_is_device: + # configure the serial connections (the parameters refer to + # the device you are connecting to) + + try: + self.ser = Serial.Serial( + baudrate=opts['input_speed'], + # 8N1 is GREIS default + bytesize=Serial.EIGHTBITS, + parity=Serial.PARITY_NONE, + port=opts['input_file_name'], + stopbits=Serial.STOPBITS_ONE, + # read timeout + timeout=0.05, + # pyserial Ver 3.0+ changes writeTimeout to write_timeout + # just set both + write_timeout=0.5, + writeTimeout=0.5, + ) + except Serial.serialutil.SerialException: + # this exception happens on bad serial port device name + sys.stderr.write('%s: failed to open serial port "%s"\n' + ' Your computer has these serial ports:\n' + % (PROG_NAME, opts['input_file_name'])) + + # print out list of supported ports + import serial.tools.list_ports as List_Ports + ports = List_Ports.comports() + for port in ports: + sys.stderr.write(" %s: %s\n" % + (port.device, port.description)) + sys.exit(1) + + # flush input buffer, discarding all its contents + self.ser.flushInput() + + else: + # Read from a plain file of GREIS messages + try: + self.ser = open(opts['input_file_name'], 'rb') + except IOError: + sys.stderr.write('%s: failed to open input %s\n' % + (PROG_NAME, opts['input_file_name'])) + sys.exit(1) + + def read(self, read_opts): + "Read from device, until timeout or expected message" + + # are we expecting a certain message? + if gps_model.expect_statement_identifier: + # assume failure, until we see expected message + ret_code = 1 + else: + # not expecting anything, so OK if we did not see it. + ret_code = 0 + + try: + if read_opts['target']['server'] is not None: + # gpsd input + start = time.clock() + while read_opts['input_wait'] > (time.clock() - start): + # First priority is to be sure the input buffer is read. + # This is to prevent input buffer overuns + if 0 < self.ser.waiting(): + # We have serial input waiting, get it + # No timeout possible + # RTCM3 JSON can be over 4.4k long, so go big + new_out = self.ser.sock.recv(8192) + if raw is not None: + # save to raw file + raw.write(new_out) + self.out += new_out + + consumed = gps_model.decode_msg(self.out) + self.out = self.out[consumed:] + if ((gps_model.expect_statement_identifier and + (gps_model.expect_statement_identifier == + gps_model.last_statement_identifier))): + # Got what we were waiting for. Done? + ret_code = 0 + if not read_opts['input_forced_wait']: + # Done + break + + elif self.input_is_device: + # input is a serial device + start = time.clock() + while read_opts['input_wait'] > (time.clock() - start): + # First priority is to be sure the input buffer is read. + # This is to prevent input buffer overuns + # pyserial 3.0 replaces inWaiting() with in_waiting + if 0 < self.ser.inWaiting(): + # We have serial input waiting, get it + # 1024 is comfortably large, almost always the + # Read timeout is what causes ser.read() to return + new_out = self.ser.read(1024) + if raw is not None: + # save to raw file + raw.write(new_out) + self.out += new_out + + consumed = gps_model.decode_msg(self.out) + self.out = self.out[consumed:] + if ((gps_model.expect_statement_identifier and + (gps_model.expect_statement_identifier == + gps_model.last_statement_identifier))): + # Got what we were waiting for. Done? + ret_code = 0 + if not read_opts['input_forced_wait']: + # Done + break + else: + # ordinary file, so all read at once + self.out += self.ser.read() + if raw is not None: + # save to raw file + raw.write(self.out) + + while True: + consumed = gps_model.decode_msg(self.out) + self.out = self.out[consumed:] + if 0 >= consumed: + break + + except IOError: + # This happens on a good device name, but gpsd already running. + # or if USB device unplugged + sys.stderr.write('%s: failed to read %s\n' + '%s: Is gpsd already holding the port?\n' + % (PROG_NAME, PROG_NAME, + read_opts['input_file_name'])) + return 1 + + if 0 < ret_code: + # did not see the message we were expecting to see + sys.stderr.write('%s: waited %0.2f seconds for, ' + 'but did not get: %%%s%%\n' + % (PROG_NAME, read_opts['input_wait'], + gps_model.expect_statement_identifier)) + return ret_code + + def write_gpsd(self, data): + "write data to gpsd daemon" + + # HEXDATA_MAX = 512, from gps.h, The max hex digits can write. + # Input data is binary, converting to hex doubles its size. + # Limit binary data to length 255, so hex data length less than 510. + if 255 < len(data): + sys.stderr.write('%s: trying to send %d bytes, max is 255\n' + % (PROG_NAME, len(data))) + return 1 + + if opts['target']['device'] is not None: + # add in the requested device + data_out = b'?DEVICE={"path":"' + opts['target']['device'] + b'",' + else: + data_out = b'?DEVICE={' + + # Convert binary data to hex and build the message. + data_out += b'"hexdata":"' + binascii.hexlify(data) + b'"}\r\n' + if VERB_RAW <= opts['verbosity']: + print("sent: ", data_out) + self.ser.send(data_out) + return 0 + + +def usage(): + "Print usage information, and exit" + + print("usage: %s [-?hrVW] [-c C] [-d D] [-e E] [-f F] [-O O] [-p P]\n" + " [-R R] [-S S] [-s S] [-v V] [-w W]\n" + " [server[:port[:device]]]\n\n" % PROG_NAME) + print('usage: %s [options]\n' + ' -? print this help\n' + ' -c C send command C to GPS\n' + ' -d D disable D\n' + ' -e E enable E\n' + ' -f F open F as file/device\n' + ' default: %s\n' + ' -h print this help\n' + ' -O O send OAF file to GPS\n' + ' -p P send preset GPS command P\n' + ' -R R save raw data from GPS in file R\n' + ' -r open file/device read only\n' + ' default: %s\n' + ' -S S configure GPS speed to S\n' + ' -s S set port speed to S\n' + ' default: %d bps\n' + ' -V print version\n' + ' -v V Set verbosity level to V, 0 to 4\n' + ' default: %d\n' + ' -W force entire wait time, no exit early\n' + ' -w W wait time, exit early on -p result\n' + ' default: %s seconds\n' + ' [server[:port[:device]]] Connect to gpsd\n' + ' default port: 2947\n' + ' default device: None\n' + '\n' + 'D and E can be one of:' % + (PROG_NAME, opts['input_file_name'], opts['raw_file'], + opts['input_speed'], opts['verbosity'], opts['input_wait']) + ) + + # print list of enable/disable commands + for item in sorted(gps_model.able_commands.keys()): + print(" %-12s %s" % (item, gps_model.able_commands[item]["help"])) + + print('\nthe preset GPS command P can be one of:') + + # print list of possible canned commands + for item in sorted(gps_model.commands.keys()): + print(" %-12s %s" % (item, gps_model.commands[item]["help"])) + print('\nOptions can be placed in the ZERKOPTS environment variable.\n' + 'ZERKOPTS is processed before the CLI options.') + sys.exit(0) + + +# create the GREIS instance +gps_model = greis() + +if 'ZERKOPTS' in os.environ: + # grab the ZERKOPTS environment variable for options + opts['progopts'] = os.environ['ZERKOPTS'] + options = opts['progopts'].split(' ') + sys.argv[1:] +else: + options = sys.argv[1:] + +try: + (options, arguments) = getopt.getopt(options, + "?c:d:e:f:hrp:s:w:v:O:R:S:WV") +except getopt.GetoptError as err: + sys.stderr.write("%s: %s\n" + "Try '%s -h' for more information.\n" % + (PROG_NAME, str(err), PROG_NAME)) + sys.exit(2) + +for (opt, val) in options: + if opt == '-c': + # command + opts['command'] = val + elif opt == '-d': + # disable + opts['disable'] = val + elif opt == '-e': + # enable + opts['enable'] = val + elif opt == '-f': + # file input + opts['input_file_name'] = val + elif opt == '-h' or opt == '-?': + # help + usage() + elif opt == '-p': + # preprogrammed command + opts['poll'] = val + elif opt == '-r': + # read only + opts['read_only'] = True + elif opt == '-s': + # serial port speed + opts['input_speed'] = int(val) + if opts['input_speed'] not in gps_model.speeds: + sys.stderr.write('%s: -s invalid speed %s\n' % + (PROG_NAME, opts['input_speed'])) + sys.exit(1) + + elif opt == '-w': + # max wait time, seconds + opts['input_wait'] = float(val) + elif opt in '-v': + # verbosity level + opts['verbosity'] = int(val) + elif opt in '-O': + # OAF .jpo file + opts['oaf_name'] = val + elif opt in '-R': + # raw log file + opts['raw_file'] = val + elif opt in '-S': + # set GPS serial port speed + opts['set_speed'] = int(val) + if opts['set_speed'] not in gps_model.speeds: + sys.stderr.write('%s: -S invalid speed %s\n' % + (PROG_NAME, opts['set_speed'])) + sys.exit(1) + + elif opt == '-W': + # forced wait, no early exit on command completion + opts['input_forced_wait'] = True + elif opt == '-V': + # version + sys.stderr.write('zerk: Version %s\n' % gps_version) + sys.exit(0) + +if opts['input_file_name'] is None: + # no input file given + # default to local gpsd + opts['target']['server'] = "localhost" + opts['target']['port'] = gps.GPSD_PORT + opts['target']['device'] = None + if arguments: + # server[:port[:device]] + arg_parts = arguments[0].split(':') + opts['target']['server'] = arg_parts[0] + if 1 < len(arg_parts): + opts['target']['port'] = arg_parts[1] + if 2 < len(arg_parts): + opts['target']['device'] = arg_parts[2] + +elif arguments: + sys.stderr.write('%s: Both input file and server specified\n' % PROG_NAME) + sys.exit(1) + +if VERB_PROG <= opts['verbosity']: + # dump all options + print('Options:') + for option in sorted(opts): + print(" %s: %s" % (option, opts[option])) + +# done parsing arguments from environment and CLI + +try: + # raw log file requested? + raw = None + if opts['raw_file']: + try: + raw = open(opts['raw_file'], 'w') + except IOError: + sys.stderr.write('%s: failed to open raw file %s\n' % + (PROG_NAME, opts['raw_file'])) + sys.exit(1) + + # create the I/O instance + io_handle = gps_io(serial) + + # keep it simple, only one of -O, -c -d -e or -S + if opts['oaf_name'] is not None: + # parse an OAF file + try: + oaf_root = xml.etree.ElementTree.parse(opts['oaf_name']).getroot() + oaf = dict() + for tag in ('id', 'oaf', 'hash'): + oaf[tag] = oaf_root.find(tag).text + oaf['oaf'] = oaf['oaf'].split('\n') + if VERB_PROG <= opts['verbosity']: + print(oaf) + except xml.etree.ElementTree.ParseError: + sys.stderr.write('%s: failed to parse OAF "%s"\n' + % (PROG_NAME, opts['oaf_name'])) + sys.exit(1) + except IOError: + sys.stderr.write('%s: failed to read OAF "%s"\n' + % (PROG_NAME, opts['oaf_name'])) + sys.exit(1) + + # calculate hash + oaf_s = '\n'.join(oaf['oaf']) + hash_s = hashlib.sha1(oaf_s).hexdigest() + if hash_s != oaf['hash']: + sys.stderr.write('%s: OAF bad hash "%s", s/b %s\n' + % (PROG_NAME, hash_s, oaf['hash'])) + sys.exit(1) + + # TODO: probably should check the ID first... + # TODO: prolly should send one command per handshake + # blasting all commands at once, seems to not work reliably + for command in oaf['oaf']: + time.sleep(0.1) # wait 0.1 seconds each + gps_model.gps_send(command) + # this will detect when it is all done + gps_model.gps_send(b'%DONE%') + gps_model.expect_statement_identifier = 'DONE' + + elif opts['command'] is not None: + # zero length is OK to send + if 1 < len(opts['command']) and '%' != opts['command'][0]: + # add ID, if missing + gps_model.expect_statement_identifier = 'CMD' + opts['command'] = "%CMD%" + opts['command'] + + # add trailing new line + opts['command'] += "\n" + + if VERB_QUIET < opts['verbosity']: + sys.stderr.write('%s: command %s\n' % (PROG_NAME, opts['command'])) + gps_model.gps_send(opts['command']) + + elif opts['disable'] is not None: + if VERB_QUIET < opts['verbosity']: + sys.stderr.write('%s: disable %s\n' % (PROG_NAME, opts['disable'])) + if opts['disable'] in gps_model.able_commands: + command = gps_model.able_commands[opts['disable']] + command["command"](gps_model, 0) + else: + sys.stderr.write('%s: disable %s not found\n' % + (PROG_NAME, opts['disable'])) + sys.exit(1) + + elif opts['enable'] is not None: + if VERB_QUIET < opts['verbosity']: + sys.stderr.write('%s: enable %s\n' % (PROG_NAME, opts['enable'])) + if opts['enable'] in gps_model.able_commands: + command = gps_model.able_commands[opts['enable']] + command["command"](gps_model, 1) + else: + sys.stderr.write('%s: enable %s not found\n' % + (PROG_NAME, opts['enable'])) + sys.exit(1) + + elif opts['poll'] is not None: + if VERB_QUIET < opts['verbosity']: + sys.stderr.write('%s: poll %s\n' % (PROG_NAME, opts['poll'])) + if opts['poll'] in gps_model.commands: + command = gps_model.commands[opts['poll']] + command["command"](gps_model) + else: + sys.stderr.write('%s: poll %s not found\n' % + (PROG_NAME, opts['poll'])) + sys.exit(1) + + elif opts['set_speed'] is not None: + gps_model.send_set_speed(opts['set_speed']) + + exit_code = io_handle.read(opts) + + if ((VERB_RAW <= opts['verbosity']) and io_handle.out): + # dump raw left overs + print("Left over data:") + print(io_handle.out) + + sys.stdout.flush() + io_handle.ser.close() + +except KeyboardInterrupt: + print('') + exit_code = 1 + +sys.exit(exit_code) -- cgit v1.2.1