diff options
Diffstat (limited to 'contrib/zerk')
-rwxr-xr-x | contrib/zerk | 1896 |
1 files changed, 0 insertions, 1896 deletions
diff --git a/contrib/zerk b/contrib/zerk deleted file mode 100755 index 848d216b..00000000 --- a/contrib/zerk +++ /dev/null @@ -1,1896 +0,0 @@ -#!/usr/bin/env python -# -*- coding: UTF-8 -''' -zerk -- GREIS configurator and packet decoder - -usage: zerk [OPTIONS] [server[:port[:device]]] -''' - -# This program conforms to the JAVAD document: -# GNSS Receiver External Interface Specification -# Revised: October 11, 2017 -# -# Hereafter referred to as "the specification" -# -# This file is Copyright (c) 2018 by the GPSD project -# BSD terms apply: see the file COPYING in the distribution root for details. -# -# This code runs compatibly under Python 2 and 3.x for x >= 2. -# Preserve this property! -# -# ENVIRONMENT: -# Options in the ZERKOPTS environment variable will be parsed before -# the CLI options. A handy place to put your '-f /dev/ttyXX -s SPEED' -# -# example usages: -# Coldboot the GPS: zerk -p COLDBOOT -# Print current serial port: zerk -c "print,/cur/term" -# Decode raw log file: zerk -r -f greis-binary.log -v 2 -# Change GPS port speed: zerk -S 230400 -# Watch entire reset cycle: zerk -p RESET -v 2 -w 20 -W -# poll SVs Status: zerk -W -w 2 -v 2 -c "out,,jps/{CS,ES,GS,Is,WS,QS}" -# dump local gpsd data zerk -v 2 -w 5 localhost -# -# TODO: no CRC16 packets handled yet -# TODO: more packet decodes - -from __future__ import absolute_import, print_function, division - -import binascii # for binascii.hexlify() -import getopt # for getopt.getopt(), to parse CLI options -import hashlib # for hashlib.sha1 -import os # for os.environ -import socket # for socket.error -import stat # for stat.S_ISBLK() -import struct # for pack() -import sys -import time -import xml.etree.ElementTree # to parse .jpo files - -PROG_NAME = 'zerk' - -try: - import serial -except ImportError: - # treat serial as special since it is not part of standard Python - sys.stderr.write("%s: failed to import pyserial\n" % PROG_NAME) - sys.exit(2) - -try: - import gps -except ImportError: - # PEP8 says local imports last - sys.stderr.write("%s: failed to import gps, check PYTHON_PATH\n" % - PROG_NAME) - sys.exit(2) - -gps_version = '3.18-dev' -if gps.__version__ != gps_version: - sys.stderr.write("%s: ERROR: need gps module version %s, got %s\n" % - (PROG_NAME, gps_version, gps.__version__)) - sys.exit(1) - - -VERB_QUIET = 0 # quiet -VERB_NONE = 1 # just output requested data and some info -VERB_DECODE = 2 # decode all messages -VERB_INFO = 3 # more info -VERB_RAW = 4 # raw info -VERB_PROG = 5 # program trace - -# dictionary to hold all user options -opts = { - # command to send to GPS, -c - 'command': None, - # command for -d disable - 'disable': None, - # command for -e enable - 'enable': None, - # default input -f file - 'input_file_name': None, - # default forced wait? -W - 'input_forced_wait': False, - # default port speed -s - 'input_speed': 115200, - # default input wait time -w in seconds - 'input_wait': 2.0, - # the name of an OAF file, extension .jpo - 'oaf_name': None, - # poll command -p - 'poll': None, - # raw log file name - 'raw_file': None, - # open port read only -r - 'read_only': False, - # speed to set GPS -S - 'set_speed': None, - # target gpsd (server:port:device) to connect to - 'target': {"server": None, "port": gps.GPSD_PORT, "device": None}, - # verbosity level, -v - 'verbosity': VERB_NONE, - # contents of environment variable ZERKOPTS - 'progopts': '', -} - - -class greis(object): - """A class for working with the GREIS GPS message formats - - This class contains functions to decode messages in the Javad GREIS - "Receiver Input Language" and "Receiver Messages" formats. - """ - - # when a statement identifier is received, it is stored here - last_statement_identifier = None - # expected statement identifier. - expect_statement_identifier = False - # ID of current message as a string - s_id = '' - - def __init__(self): - "Initialize class" - - self.last_statement_identifier = None - self.expect_statement_identifier = False - # last epoch received in [~~] - # epoch == None means never got epoch, epoch == -1 means missing. - self.epoch = None - - def f4_s(self, f): - "convert an '! f4' to a string" - - if gps.isfinite(f): - # yeah, the precision is a guess - return "%.6f" % f - return 'X' - - def f8_s(self, f): - "convert an '! f8' to a string" - - if gps.isfinite(f): - # yeah, the precision is a guess - return "%.4f" % f - return 'X' - - def i1_s(self, i): - "convert an '! i1' to a string" - return 'X' if i == 127 else str(i) - - def i2_s(self, i): - "convert an '! i2' to a string" - return 'X' if i == 32767 else str(i) - - def i4_s(self, i): - "convert an '! i4' to a string" - return 'X' if i == 2147483647 else str(i) - - def u1_s(self, u): - "convert an '! u1' to a string" - return 'X' if u == 255 else str(u) - - def u2_s(self, u): - "convert an '! u2' to a string" - return 'X' if u == 65535 else str(u) - - def u4_s(self, u): - "convert an '! u4' to a string" - return 'X' if u == 4294967295 else str(u) - - def isuchex(self, c): - "Is byte an upper case hex char?" - if 48 <= c and 57 >= c: - # 0 to 9 - return int(c) - 48 - if 65 <= c and 70 >= c: - # A to F - return int(c) - 55 - return -1 - - soltypes = {0: "None", - 1: "3D", - 2: "DGPS", - 3: "RTK float", - 4: "RTK fixed", - 5: "fixed" - } - - # allowable speeds - speeds = (460800, 230400, 153600, 115200, 57600, 38400, 19200, 9600, - 4800, 2400, 1200, 600, 300) - - def msg_c_(self, payload): - "[c?] decode, Smoothing Corrections" - - s = ' smooth' - for i in range(0, len(payload) - 1, 2): - u = struct.unpack_from('<h', payload, i) - s += " " + self.i2_s(u[0]) - - return s + '\n' - - def msg__p(self, payload): - "[?p] decode, Integer Relative Carrier Phases" - - s = ' rcp' - for i in range(0, len(payload) - 1, 4): - u = struct.unpack_from('<l', payload, i) - s += " " + self.i4_s(u[0]) - - return s + '\n' - - def msg__d(self, payload): - "[?d] decode, Relative Doppler" - - s = ' srdp' - for i in range(0, len(payload) - 1, 2): - u = struct.unpack_from('<h', payload, i) - s += " " + self.i2_s(u[0]) - - return s + '\n' - - def msg__r(self, payload): - "[?r] decode, Integer Relative Pseudo-ranges" - - s = ' srdp' - for i in range(0, len(payload) - 1, 2): - u = struct.unpack_from('<h', payload, i) - s += " " + self.i2_s(u[0]) - - return s + '\n' - - def msg__A(self, payload): - "[?A] decode, GPS, GALILEO Almanac" - m_len = len(payload) - - if ('[EA]' == self.s_id) and (49 > m_len): - return " Bad Length %s" % m_len - - u = struct.unpack_from('<BhlBBBfffffffff', payload, 0) - - s = (" sv %u wna %d toa %d healthA %u healthS %u config %u\n" - " af1 %f af0 %f rootA %f ecc %f m0 %f\n" - " omega0 %f argPer %f delf %f omegaDot %f\n" % u) - - if '[EA]' == self.s_id: - u = struct.unpack_from('<H', payload, 46) - s += (" iod %d" % (u[0])) - return s - - def msg__E(self, payload): - "[?E] decode, SNR x 4" - - s = ' cnrX4' - for i in range(0, len(payload) - 1, 1): - u = struct.unpack_from('<B', payload, i) - s += " " + self.u1_s(u[0]) - - return s + '\n' - - def msg_WE(self, payload): - "[WE] decode, SBAS Ephemeris" - - u = struct.unpack_from('<BBBBLdddffffffffLHB', payload, 0) - s = (" waasPrn %u gpsPrn %u iod %u acc %u tod %u\n" - " xg %f yg %f zg %f\n" - " vxg %f vyg %f vzg %f\n" - " vvxg %f vvyg %f vvzg %f\n" - " agf0 %f agf1 %f tow %u wn %u flags %u\n" % u) - - return s - - def msg_r(self, payload): - "[r?] decode, Integer Psudeo Ranges" - - s = ' spr' - for i in range(0, len(payload) - 1, 4): - u = struct.unpack_from('<l', payload, i) - s += " " + self.i4_s(u[0]) - - return s + '\n' - - def msg_AZ(self, payload): - "[AZ] decode, Satellite Azimuths" - - s = " azim" - for i in range(0, len(payload) - 1): - # azimuth/2, 0 to 180 degrees - s += " " + self.u1_s(payload[i]) - - return s + '\n' - - def msg_BP(self, payload): - "[BP] decode" - - u = struct.unpack_from('<f', payload, 0) - return " acc %.3e\n" % u[0] - - def msg_DC(self, payload): - "[DC] decode, P/L1 Doppler" - - s = " dp" - for i in range(0, len(payload) - 1, 4): - u = struct.unpack_from('<L', payload, i) - s += " %d" % (u[0]) - - return s + '\n' - - def msg_DO(self, payload): - "[DO] decode" - - u = struct.unpack_from('<ff', payload, 0) - return " val %.3f sval %.3f\n" % u - - def msg_DP(self, payload): - "[DP] decode" - - u = struct.unpack_from('<fffBfB', payload, 0) - return (" hdop %f vdop %f tdop %f edop %f\n" - " solType %s\n" % - (u[0], u[1], u[2], u[4], self.soltypes[u[3]])) - - def msg_E_(self, payload): - "[E?] decode, SNR" - - s = ' cnr' - for i in range(0, len(payload) - 1): - s += " " + self.u1_s(payload[i]) - - return s + '\n' - - def msg_ET(self, payload): - "[::](ET) decode, Epoch time, end of epoch" - - u = struct.unpack_from('<L', payload, 0) - if ((self.epoch is not None and self.epoch != u[0])): - if -1 == self.epoch: - print("Error: [::](ET) missing [~~](RT)\n") - else: - print("Error: [::](ET) Wrong Epoch %u, should be %u\n" % - (u[0], self.epoch)) - # reset epoch - self.epoch = -1 - return "(ET) tod %u\n" % u[0] - - def msg_EL(self, payload): - "[EL] decode, Satellite Elevations" - - s = " elev" - for i in range(0, len(payload) - 1): - # looking for integer (-90 to 90), not byte - u = struct.unpack_from('<b', payload, i) - s += " " + self.i1_s(u[0]) - - return s + '\n' - - def msg_ER(self, payload): - "[ER] decode, Error messages" - - parts = payload.split(b'%') - if 1 < len(parts): - self.last_statement_identifier = parts[1] - - s_payload = "".join(map(chr, payload)) - print("[ER] %s\n" % s_payload) - return " %s\n" % s_payload - - def msg_EU(self, payload): - "[EU] decode, GALILEO UTC and GPS Time Parameters" - - u = struct.unpack_from('<dfLHbBHbffLHH', payload, 0) - return (" ao %f a1 %f tot %u wnt %u dtls %d dn %u wnlsf %u\n" - " dtlsf %d a0g %f a1g %f t0g %u wn0g %u flags %#x\n" % u) - - def msg_FC(self, payload): - "[FC] [F1] [F2] [F3] [f5] [Fl] decode, Signal Lock Loop Flags" - - s = " flags 0x" - for i in range(0, len(payload) - 1): - u = struct.unpack_from('<H', payload, i) - s += " %2x" % (u[0]) - - return s + '\n' - - def msg__E1(self, payload): - "[?E] decode, BeiDos, GPS, GALILEO, IRNSS Ephemeris " - m_len = len(payload) - # [GE] - if ('[IE]' == self.s_id) and (124 > m_len): - return " Bad Length %s" % m_len - if ('[CN]' == self.s_id) and (132 > m_len): - return " Bad Length %s" % m_len - if ('[EN]' == self.s_id) and (145 > m_len): - return " Bad Length %s" % m_len - - u = struct.unpack_from('<BLBhlbBhfffflhddddddfffffffff', payload, 0) - s = (" sv %u tow %u flags %u iodc %d toc %d ura %d healthS %u\n" - " wn %d tgd %f af2 %f af1 %f af0 %f toe %d\n" - " iode %d rootA %f ecc %f m0 %f omega0 %f\n" - " inc0 %f argPer %f deln %f omegaDot %f\n" - " incDot %f crc %f crs %f cuc %f\n" - " cus %f cic %f cis %f\n" % u) - - if '[EN]' == self.s_id: - u = struct.unpack_from('<fffffBB', payload, 122) - s += (" bgdE1E5a %f bgdE1E5b %f aio %f ai1 %f ai2 %f\n" - " sfi %u navType %u" % u) - if 149 <= m_len: - # DAf0 added in 3.7.0 - u = struct.unpack_from('<f', payload, 144) - s += (" DAf0 %f" % u) - s += '\n' - - if ('[IE]' == self.s_id) and (124 > m_len): - u = struct.unpack_from('<B', payload, 122) - s += (" navType %u\n" % u[0]) - - if ('[CN]' == self.s_id) and (132 > m_len): - u = struct.unpack_from('<fBf', payload, 122) - s += (" tgd2 %f navType %u DAf0 %f\n" % u) - - # TODO: decode length 160 168 - - return s - - def msg_GT(self, payload): - "[GT] decode, GPS Time " - - u = struct.unpack_from('<LH', payload, 0) - return " tow %u wn %d\n" % u - - def msg_ID(self, payload): - "[ID] Ionosphere Delays" - - s = ' delay' - for i in range(0, len(payload) - 1, 4): - u = struct.unpack_from('<f', payload, i) - s += " %s" % self.f4_s(u[0]) - - return s + '\n' - - def msg_IO(self, payload): - "[IO] decode, GPS Ionospheric Parameters" - - u = struct.unpack_from('<LHffffffff', payload, 0) - - return (" tot %d wn %u alpha0 %f alpha1 %f alpha2 %f\n" - " alpha3 %f beta0 %u beta1 %d beta2 %f\n" - " beta3 %f\n" % u) - - def msg_LO(self, payload): - "[LO] decode, undocumented message" - - return " Undocumented message\n" - - def msg_MF(self, payload): - "[MF] Messages Format" - - u = struct.unpack_from('<BBBBBBB', payload, 0) - return (" id %c%c majorVer %c%c minorVer %c%c order %c\n" % - (chr(u[0]), chr(u[1]), chr(u[2]), chr(u[3]), - chr(u[4]), chr(u[5]), chr(u[6]))) - - def msg_PM(self, payload): - "[PM] parameters" - - # PM only seems to work after a coldboot, once - # zerk -v 2 -w 20 -c 'out,,jps/{PM}' -W - return " %s\n" % payload - - def msg_PV(self, payload): - "[PV] decode, Cartesian Position and Velocity" - - u = struct.unpack_from('<dddfffffBB', payload, 0) - return (" x %s y %s z %s sigma %s\n" - " vx %s vy %s vz %s\n" - " vsigma %s soltype %s\n" % - (self.f8_s(u[0]), self.f8_s(u[1]), self.f8_s(u[2]), - self.f4_s(u[3]), self.f4_s(u[4]), self.f4_s(u[5]), - self.f4_s(u[6]), self.f4_s(u[7]), self.soltypes[u[8]])) - - def msg_RD(self, payload): - "[RD] decode, Receiver Date" - - u = struct.unpack_from('<HBBB', payload, 0) - return " year %d month %d day %d base %d\n" % u - - def msg_RE(self, payload): - "[RE] decode" - - parts = payload.split(b'%') - if 1 < len(parts): - # Got a statement identifier (ID), save it? - # Multiline statement if payload ends with comma or left brace - if payload[-1] not in (ord(','), ord('{')): - # yes, this is the end - self.last_statement_identifier = parts[1] - - # Get the message body - part1 = parts[1].split(b',') - - if 'em' == parts[1]: - # Probably no parts[2] - print("Enable Messages %s" % parts[2]) - return " Enable Messages %s\n" % parts[2] - - if 'id' == parts[1]: - print("ID: %s" % parts[2]) - return " ID %s\n" % parts[2] - - if 'opts' == part1[0]: - if 1 < len(part1): - s = "OAF %s: %s" % (part1[1], parts[2]) - else: - s = " OAF: %s" % (parts[2]) - print(s) - return " %s\n" % s - - if 'serial' == parts[1]: - print("SERIAL: %s" % parts[2]) - return " SERIAL %s\n" % parts[2] - - if 'vendor' == parts[1]: - print("VENDOR: %s" % parts[2]) - return " Vendor %s\n" % parts[2] - - if 'ver' == parts[1]: - print("VER: %s" % parts[2]) - return " Version %s\n" % parts[2] - - # unknown statement identifier - s_payload = "".join(map(chr, payload)) - print("RE: %s\n" % s_payload) - - return " %s\n" % s_payload - - def msg_RT(self, payload): - "[~~](RT) decode, Receiver Time, start of epoch" - - if self.epoch is not None and -1 != self.epoch: - print("Error: [~~](RT) missing [::](ET)\n") - - u = struct.unpack_from('<L', payload, 0) - # save start of epoch - self.epoch = u[0] - return "(RT) tod %u\n" % self.epoch - - def msg_S_(self, payload): - "[CS], [ES], [GS], [Is], [WS], [NS], [QS], decode, SVs Status" - - # to poll them all: zerk -W -w 2 -v 2 -c "out,,jps/{CS,ES,GS,Is,WS,QS}" - # TODO, check @checksum - - return "%s" % payload - - def msg_SE(self, payload): - "[SE] decode" - - u = struct.unpack_from('<BBBBB', payload, 0) - return " data 0x %x %x %x %x %x\n" % u - - def msg_SG(self, payload): - "[SG] decode" - - u = struct.unpack_from('<ffffBB', payload, 0) - return (" hpos %s vpos %s hvel %s vvel %s\n" - " soltype %s\n" % - (self.f4_s(u[0]), self.f4_s(u[1]), self.f4_s(u[2]), - self.f4_s(u[3]), self.soltypes[u[4]])) - - def msg_SI(self, payload): - "[SI] decode, Satellite Index, deprecated by Javad, use [SX]" - - # [SX] require 3.7 firmware, we use [SI] to support 3.6 - s = " usi" - for i in range(0, len(payload) - 1): - s += " %d" % payload[i] - - return s + '\n' - - def msg_SP(self, payload): - "[SP] decode, Position Covariance Matrix" - - u = struct.unpack_from('<ffffffffffB', payload, 0) - return (" xx % f yy % f zz % f tt % f xy % f\n" - " xz % f xt % f yz % f yt % f zt % f\n" - " solType %s\n" % - (u[0], u[1], u[2], u[3], u[4], - u[5], u[6], u[7], u[8], u[9], - self.soltypes[u[10]])) - - def msg_SS(self, payload): - "[SS] decode, Satellite Navigation Status" - - s = " ns" - for i in range(0, len(payload) - 2): - s += " %d" % payload[i] - - return (s + '\n solType %s\n' % - self.soltypes[payload[len(payload) - 2]]) - - def msg_ST(self, payload): - "[ST] decode, Solution Time Tag" - - u = struct.unpack_from('<LBB', payload, 0) - return (" time %u ms, soltype %s\n" % - (u[0], self.soltypes[u[1]])) - - def msg_SX(self, payload): - "[SX] decode, Extended Satellite Indices" - - # [SX] require 3.7 firmware - s = " ESI" - for i in range(0, len(payload) - 2, 2): - u = struct.unpack_from('<BB', payload, i) - s += " (%u, %u)" % u - - return s + '\n' - - def msg_TC(self, payload): - "[TC] decode, CA/L1 Continous Tracking Time" - - s = " tt" - for i in range(0, len(payload) - 1, 2): - u = struct.unpack_from('<H', payload, i) - s += " %.2f" % u[0] - - return s + '\n' - - def msg_TO(self, payload): - "[TO] decode, Reference Time to Receiver Time Offset" - - u = struct.unpack_from('<dd', payload, 0) - return " val %.3f sval %.3f\n" % u - - def msg_UO(self, payload): - "[UO] decode, GPS UTC Time Parameters" - - u = struct.unpack_from('<dfLHbBHb', payload, 0) - return (" a0 %f a1 %f tot %d wnt %d dtls %d\n" - " dn %d wnlsf %d dtlsf %d\n" % u) - - def msg_WA(self, payload): - "[WA] decode" - - u = struct.unpack_from('<BBBBLdddfffLH', payload, 0) - return (" waasPrn %d gpsPrn %d if %d healthS %d tod %d\n" - " ECEF %.3f %.3f %.3f, %.3f %.3f %.3f\n" - " tow %d wn %d\n" % u) - - def msg_WU(self, payload): - "[WU] decode, SBAS UTC Time Parameters" - - u = struct.unpack_from('<dfLHbBHbfbLHB', payload, 0) - return (" ao %f a1 %f tot %u wnt %u dtls %d dn %u\n" - "wnlsf %u dtlsf %d utcsi %d tow %u wn %u flags %#x\n" % u) - - # table from message id to respective message decoder. - # Note: id (%id%) is different than ID (statement identifier) - # the id is the first two characters of a GREIS receiver Message - # see section 3.3 of the specification - messages = { - '[0d]': (msg__d, 1), - '[1d]': (msg__d, 1), - '[1E]': (msg__E, 1), - '[1p]': (msg__p, 1), - '[1r]': (msg__r, 1), - '[2d]': (msg__d, 1), - '[2E]': (msg__E, 1), - '[2p]': (msg__p, 1), - '[2r]': (msg__r, 1), - '[3d]': (msg__d, 1), - '[3E]': (msg__E, 1), - '[3p]': (msg__p, 1), - '[3r]': (msg__r, 1), - '[5d]': (msg__d, 1), - '[5E]': (msg__E, 1), - '[5p]': (msg__p, 1), - '[5r]': (msg__r, 1), - '[AZ]': (msg_AZ, 1), - '[BP]': (msg_BP, 5), - '[c1]': (msg_c_, 1), - '[c2]': (msg_c_, 1), - '[c3]': (msg_c_, 1), - '[c5]': (msg_c_, 1), - '[CA]': (msg__A, 47), - '[cc]': (msg_c_, 1), - '[CE]': (msg__E, 1), - '[cl]': (msg_c_, 1), - '[CN]': (msg__E1, 123), - '[cp]': (msg__p, 1), - '[cr]': (msg__r, 1), - '[CS]': (msg_S_, 8), - '[DC]': (msg_DC, 1), - '[DO]': (msg_DO, 6), - '[DP]': (msg_DP, 18), - '[E1]': (msg_E_, 1), - '[E2]': (msg_E_, 1), - '[E3]': (msg_E_, 1), - '[E5]': (msg_E_, 1), - '[EA]': (msg__A, 47), - '[EC]': (msg_E_, 1), - '[El]': (msg_E_, 1), - '[EL]': (msg_EL, 1), - '[EN]': (msg__E1, 123), - '[ER]': (msg_ER, 1), - '[ES]': (msg_S_, 8), - '[EU]': (msg_EU, 40), - '[F1]': (msg_FC, 1), - '[F2]': (msg_FC, 1), - '[F3]': (msg_FC, 1), - '[F5]': (msg_FC, 1), - '[FA]': (msg_FC, 1), - '[FC]': (msg_FC, 1), - '[Fl]': (msg_FC, 1), - '[GA]': (msg__A, 47), - '[GE]': (msg__E1, 123), - '[GS]': (msg_S_, 8), - '[GT]': (msg_GT, 7), - '[IA]': (msg__A, 47), - '[ID]': (msg_ID, 1), - '[IE]': (msg__E1, 123), - '[IO]': (msg_IO, 39), - '[Is]': (msg_S_, 8), - '[ld]': (msg__d, 1), - '[lE]': (msg__E, 1), - '[lp]': (msg__p, 1), - '[lr]': (msg__r, 1), - '[LO]': (msg_LO, 1), - '[MF]': (msg_MF, 9), - '[::]': (msg_ET, 4), - '[~~]': (msg_RT, 4), - '[NS]': (msg_S_, 8), - '[PM]': (msg_PM, 0), - '[PV]': (msg_PV, 46), - '[QA]': (msg__A, 47), - '[QE]': (msg__E1, 123), - '[QS]': (msg_S_, 8), - '[r1]': (msg_r, 1), - '[r2]': (msg_r, 1), - '[r3]': (msg_r, 1), - '[r5]': (msg_r, 1), - '[rc]': (msg_r, 1), - '[RD]': (msg_RD, 6), - '[RE]': (msg_RE, 1), - '[rl]': (msg_r, 1), - '[rx]': (msg_r, 1), - '[SE]': (msg_SE, 6), - '[SG]': (msg_SG, 18), - '[SI]': (msg_SI, 1), - '[SP]': (msg_SP, 42), - '[SS]': (msg_SS, 1), - '[ST]': (msg_ST, 6), - '[SX]': (msg_SX, 1), - '[TC]': (msg_TC, 1), - '[TO]': (msg_TO, 6), - '[UO]': (msg_UO, 24), - '[WA]': (msg_WA, 51), - '[WE]': (msg_WE, 73), - '[WS]': (msg_S_, 8), - '[WU]': (msg_WU, 40), - } - - def decode_msg(self, out): - "Decode one message and then return number of chars consumed" - - state = 'BASE' - consumed = 0 - # raw message, sometimes used for checksum calc - m_raw = bytearray(0) - - # decode state machine - for this_byte in out: - consumed += 1 - if isinstance(this_byte, str): - # a character, probably read from a file - c = ord(this_byte) - else: - # a byte, probably read from a serial port - c = int(this_byte) - - if VERB_RAW <= opts['verbosity']: - if (ord(' ') <= c) and (ord('~') >= c): - # c is printable - print("state: %s char %c (%#x)" % (state, chr(c), c)) - else: - # c is not printable - print("state: %s char %#x" % (state, c)) - - m_raw.extend([c]) - - # parse input stream per GREIS Ref Guide Section 3.3.3 - if 'BASE' == state: - # start fresh - # place to store 'comments' - comment = '' - # message id byte one - m_id1 = 0 - # message id byte two - m_id2 = 0 - # message length as integer - m_len = 0 - # byte array to hold payload, including possible checksum - m_payload = bytearray(0) - m_raw = bytearray(0) - m_raw.extend([c]) - - if (ord('0') <= c) and (ord('~') >= c): - # maybe id 1, '0' to '~' - state = 'ID1' - - # start the grab - m_id1 = c - continue - - if ord("%") == c: - # start of %ID%, Receiver Input Language - # per GREIS Ref Guide Section 2.2 - state = 'RIL' - - # start fresh - comment = "%" - continue - - if ord("$") == c: - # NMEA line, treat as comment - state = 'NMEA' - - # start fresh - comment = "$" - continue - - if ord("#") == c: - # comment line - state = 'COMMENT' - - # start fresh - comment = "#" - continue - - if ord('\n') == c or ord('\r') == c: - # stray newline or linefeed, eat it - return consumed - - # none of the above, stay in BASE - continue - - if state in ('COMMENT', 'JSON', 'RIL'): - # inside comment - if ord('\n') == c or ord('\r') == c: - # Got newline or linefeed - # GREIS terminates messages on <CR> or <LF> - # Done, got a full message - if b'{"class":"ERROR"' in comment: - # always print gpsd errors - print(comment) - elif VERB_DECODE <= opts['verbosity']: - print(comment) - return consumed - else: - comment += chr(c) - continue - - if 'ID1' == state: - # maybe id 2, '0' to '~' - if ord('"') == c: - # technically could be GREIS, but likely JSON - state = 'JSON' - comment += chr(m_id1) + chr(c) - elif (ord('0') <= c) and (ord('~') >= c): - state = 'ID2' - m_id2 = c - else: - state = 'BASE' - continue - - if 'ID2' == state: - # maybe len 1, 'A' to 'F' - x = self.isuchex(c) - if -1 < x: - state = 'LEN1' - m_len = x * 256 - else: - state = 'BASE' - continue - - if 'LEN1' == state: - # maybe len 2, 'A' to 'F' - x = self.isuchex(c) - if -1 < x: - state = 'LEN2' - m_len += x * 16 - else: - state = 'BASE' - continue - - if 'LEN2' == state: - # maybe len 3, 'A' to 'F' - x = self.isuchex(c) - if -1 < x: - state = 'PAYLOAD' - m_len += x - else: - state = 'BASE' - continue - - if 'NMEA' == state: - # inside NMEA - if ord('\n') == c or ord('\r') == c: - # Got newline or linefeed - # done, got a full message - # GREIS terminates messages on <CR> or <LF> - if VERB_DECODE <= opts['verbosity']: - print(comment) - return consumed - else: - comment += chr(c) - continue - - if 'PAYLOAD' == state: - # getting payload - m_payload.extend([c]) - if len(m_payload) < m_len: - continue - - # got entire payload - self.s_id = "[%c%c]" % (chr(m_id1), chr(m_id2)) - if VERB_DECODE <= opts['verbosity']: - x_payload = binascii.hexlify(m_payload) - - # [RE], [ER] and more have no 8-bit checksum - # assume the rest do - if ((self.s_id not in ('[CS]', '[ER]', '[ES]', '[GS]', '[Is]', - '[MF]', '[NS]', '[PM]', '[QS]', '[RE]', - '[WS]') and - not self.checksum_OK(m_raw))): - print("ERROR: Bad checksum\n") - if VERB_DECODE <= opts['verbosity']: - print("DECODE: id: %s len: %d\n" - "DECODE: payload: %s\n" % - (self.s_id, m_len, x_payload)) - # skip it. - return consumed - - if self.s_id in self.messages: - if VERB_INFO <= opts['verbosity']: - print("INFO: id: %s len: %d\n" - "INFO: payload: %s\n" % - (self.s_id, m_len, x_payload)) - - (decode, length) = self.messages[self.s_id] - if m_len < length: - print("DECODE: %s Bad Length %s\n" % - (self.s_id, m_len)) - else: - s = self.s_id + decode(self, m_payload) - if VERB_DECODE <= opts['verbosity']: - print(s) - else: - # unknown message - if VERB_DECODE <= opts['verbosity']: - print("DECODE: Unknown: id: %s len: %d\n" - "DECODE: payload: %s\n" % - (self.s_id, m_len, x_payload)) - return consumed - - # give up - state = 'BASE' - - # fell out of loop, no more chars to look at - return 0 - - def checksum_OK(self, raw_msg): - "Check the i8-bit checksum on a message, return True if good" - - # some packets from the GPS use CRC16, some i8-bit checksum, some none - # only 8-bit checksum done here for now - calc_checksum = self.checksum(raw_msg, len(raw_msg) - 1) - rcode = raw_msg[len(raw_msg)-1] == calc_checksum - if VERB_RAW <= opts['verbosity']: - print("Checksum was %#x, calculated %#x" % - (raw_msg[len(raw_msg)-1], calc_checksum)) - return rcode - - def _rol(self, value): - "rotate a byte left 2 bits" - return ((value << 2) | (value >> 6)) & 0x0ff - - def checksum(self, msg, m_len): - "Calculate GREIS 8-bit checksum" - - # Calculated per section A.1.1 of the specification - # msg may be bytes (incoming messages) or str (outgoing messages) - - ck = 0 - for c in msg[0:m_len]: - if isinstance(c, str): - # a string, make a byte - c = ord(c) - ck = self._rol(ck) ^ c - - return self._rol(ck) & 0x0ff - - def make_pkt(self, m_data): - "Build an output message, always ASCII, add checksum and terminator" - - # build core message - - # no leading spaces, checksum includes the @ - m_data = m_data.lstrip() + b'@' - - chk = self.checksum(m_data, len(m_data)) - - # all commands end with CR and/or LF - return m_data + (b'%02X' % chk) + b'\n' - - def gps_send(self, m_data): - "Send message to GREIS GPS" - - m_all = self.make_pkt(m_data) - if not opts['read_only']: - io_handle.ser.write(m_all) - if VERB_INFO <= opts['verbosity']: - print("sent:", m_all) - self.decode_msg(m_all) - sys.stdout.flush() - - # Table of known options. From table 4-2 of the specification. - oafs = ( - b"_AJM", - b"AUTH", - b"_BLT", - b"_CAN", - b"CDIF", - b"CMRI", - b"CMRO", - b"COMP", - b"COOP", - b"COPN", - b"CORI", - b"_CPH", - b"DEVS", - b"DIST", - b"_DTM", - b"_E5B", - b"_E6_", - b"EDEV", - b"ETHR", - b"EVNT", - b"_FRI", - b"_FRO", - b"_FTP", - b"_GAL", - b"GBAI", - b"GBAO", - b"GCLB", - b"_GEN", - b"_GEO", - b"_GLO", - b"_GPS", - b"_GSM", - b"HTTP", - b"_IMU", - b"INFR", - b"IRIG", - b"IRNS", - b"JPSI", - b"JPSO", - b"_L1C", - b"_L1_", - b"_L2C", - b"_L2_", - b"_L5_", - b"LAT1", - b"LAT2", - b"LAT3", - b"LAT4", - b"LCS2", - b"L_CS", - b"_LIM", - b"LON1", - b"LON2", - b"LON3", - b"LON4", - b"MAGN", - b"_MEM", - b"_MPR", - b"OCTO", - b"OMNI", - b"_PAR", - b"PDIF", - b"_POS", - b"_PPP", - b"_PPS", - b"PRTT", - b"_PTP", - b"QZSS", - b"RAIM", - b"_RAW", - b"RCVT", - b"RM3I", - b"RM3O", - b"RS_A", - b"RS_B", - b"RS_C", - b"RS_D", - b"RTMI", - b"RTMO", - b"SPEC", - b"TCCL", - b"_TCP", - b"TCPO", - b"_TLS", - b"TRST", - b"UDPO", - b"_UHF", - b"_USB", - b"WAAS", - b"WIFI", - b"_WPT", - ) - - def send_able_4hz(self, able): - "enable basic GREIS messages at 4Hz" - - self.expect_statement_identifier = 'greis' - - # the messages we want - # [SX] requires 3.7 firmware, we use [SI] to support 3.6 - messages = b"jps/{RT,UO,GT,PV,SG,DP,SI,EL,AZ,EC,SS,ET}" - - if able: - # Message rate must be an integer multiple of /par/raw/msint - # Default msint is 0.100 seconds, so that must be changed first - self.gps_send(b"%msint%set,/par/raw/msint,250") - - self.gps_send(b"%greis%em,," + messages + b":0.25") - else: - self.gps_send(b"%greis%dm,," + messages) - - def send_able_comp(self, able): - "dis/enable COMPASS, aka BeiDou" - self.expect_statement_identifier = 'cons' - en_dis = b'y' if 1 == able else b'n' - self.gps_send(b"%cons%set,/par/pos/sys/comp," + en_dis) - - def send_able_constellations(self, able): - "dis/enable all constellations" - self.expect_statement_identifier = 'cons7' - en_dis = b'y' if 1 == able else b'n' - self.gps_send(b"%cons1%set,/par/pos/sys/comp," + en_dis) - self.gps_send(b"%cons2%set,/par/pos/sys/gal," + en_dis) - # this will fail on TR-G2H, as it has no GLONASS - self.gps_send(b"%cons3%set,/par/pos/sys/glo," + en_dis) - self.gps_send(b"%cons4%set,/par/pos/sys/gps," + en_dis) - self.gps_send(b"%cons5%set,/par/pos/sys/irnss," + en_dis) - self.gps_send(b"%cons6%set,/par/pos/sys/sbas," + en_dis) - self.gps_send(b"%cons7%set,/par/pos/sys/qzss," + en_dis) - - def send_able_defmsg(self, able): - "dis/enable default messages at 1Hz" - self.expect_statement_identifier = 'defmsg' - if able: - self.gps_send(b"%defmsg%em,,jps/RT,/msg/def:1,jps/ET") - else: - # leave RT and ET to break less? - self.gps_send(b"%defmsg%dm,,/msg/def:1") - - def send_able_gal(self, able): - "dis/enable GALILEO" - self.expect_statement_identifier = 'cons' - en_dis = b'y' if 1 == able else b'n' - self.gps_send(b"%cons%set,/par/pos/sys/gal," + en_dis) - - def send_able_glo(self, able): - "dis/enable GLONASS" - self.expect_statement_identifier = 'cons' - en_dis = b'y' if 1 == able else b'n' - self.gps_send(b"%cons%set,/par/pos/sys/glo," + en_dis) - - def send_able_gps(self, able): - "dis/enable GPS" - self.expect_statement_identifier = 'cons' - en_dis = b'y' if 1 == able else b'n' - self.gps_send(b"%cons%set,/par/pos/sys/gps," + en_dis) - - def send_able_ipr(self, able): - "dis/enable all Integer Psuedo-Range messages" - self.expect_statement_identifier = 'em' - if able: - self.gps_send(b"%em%em,,jps/{rx,rc,r1,r2,r3,r5,rl}:0.25") - else: - self.gps_send(b"%em%dm,,jps/{rx,rc,r1,r2,r3,r5,rl}") - - def send_able_irnss(self, able): - "dis/enable IRNSS" - self.expect_statement_identifier = 'cons' - en_dis = b'y' if 1 == able else b'n' - self.gps_send(b"%cons%set,/par/pos/sys/irnss," + en_dis) - - def send_able_nmea41(self, able): - "dis/enable basic NMEA 4.1e messages at 4Hz" - - self.expect_statement_identifier = 'nmea' - - messages = b"nmea/{GBS,GGA,GSA,GST,GSV,RMC,VTG,ZDA}" - - if able: - # set NMEA version 4.1e - self.gps_send(b"%nmeaver%set,/par/nmea/ver,v4.1e") - - # Message rate must be an integer multiple of /par/raw/msint - # Default msint is 0.100 seconds, so that must be changed first - self.gps_send(b"%msint%set,/par/raw/msint,250") - - # now we can set the messages we want - self.gps_send(b"%nmea%em,," + messages + b":0.25") - else: - # disable - self.gps_send(b"%nmea%dm,," + messages) - - def send_able_sbas(self, able): - "dis/enable SBAS" - self.expect_statement_identifier = 'cons' - en_dis = b'y' if 1 == able else b'n' - self.gps_send(b"%cons%set,/par/pos/sys/sbas," + en_dis) - - def send_able_qzss(self, able): - "dis/enable QZSS" - self.expect_statement_identifier = 'cons' - en_dis = b'y' if 1 == able else b'n' - self.gps_send(b"%cons%set,/par/pos/sys/qzss," + en_dis) - - def send_able_snr(self, able): - "dis/enable all SNR messages, except [EC]" - self.expect_statement_identifier = 'em' - if able: - self.gps_send(b"%em%em,,jps/{E1,E2,E3,E5,El}:0.25") - else: - self.gps_send(b"%em%dm,,jps/{E1,E2,E3,E5,El}") - - able_commands = { - # en/disable basic GREIS messages at 4HZ - "4HZ": {"command": send_able_4hz, - "help": "basic GREIS messages at 4Hz"}, - # en/disable all constellations - "CONS": {"command": send_able_constellations, - "help": "all constellations"}, - # en/disable COMPASS, aka Beidou - "COMPASS": {"command": send_able_comp, - "help": "COMPASS"}, - # en/disable default message set. - "DEFMSG": {"command": send_able_defmsg, - "help": "default message set at 1Hz"}, - # en/disable GALILEO - "GALILEO": {"command": send_able_gal, - "help": "GALILEO"}, - # en/disable GLONASS - "GLONASS": {"command": send_able_glo, - "help": "GLONASS"}, - # en/disable GPS - "GPS": {"command": send_able_gps, - "help": "GPS"}, - # en/disable Integer Psuedo Range messages - "IPR": {"command": send_able_ipr, - "help": "all Integer Psuedo Range messages"}, - # en/disable IRNSS - "IRNSS": {"command": send_able_irnss, - "help": "IRNSS"}, - # en/disable NMEA 4.1e - "NMEA": {"command": send_able_nmea41, - "help": "basic messages NMEA 4.1 at 4Hz"}, - # en/disable SBAS - "SBAS": {"command": send_able_sbas, - "help": "SBAS"}, - # en/disable all SNRs - "SNR": {"command": send_able_snr, - "help": "all SNR messages, except [EC]"}, - # en/disable QZSS - "QZSS": {"command": send_able_qzss, - "help": "QZSS"}, - } - - def send_coldboot(self): - "Delete NVRAM (almanac, ephemeris, location) and restart" - self.expect_statement_identifier = 'coldboot' - self.gps_send(b"%coldboot%init,/dev/nvm/a") - - def send_constellations(self): - "poll all constellations" - self.expect_statement_identifier = 'cons' - self.gps_send(b"%cons%print,/par/pos/sys:on") - - def send_get_id(self): - "get receiver id" - self.expect_statement_identifier = 'id' - self.gps_send(b"%id%print,/par/rcv/id") - - def send_get_oaf(self): - "poll OAF (GPS opts)" - - self.expect_statement_identifier = 'opts,_WPT' - if VERB_RAW <= opts['verbosity']: - # get list of all opts - self.gps_send(b"%opts,list%list,/par/opts") - - # request opts one at a time from canned list - for s in self.oafs: - self.gps_send(b"%%opts,%s%%print,/par/opts/%s" % (s, s)) - - def send_get_serial(self): - "get receiver serial number" - self.expect_statement_identifier = 'serial' - self.gps_send(b"%serial%print,/par/rcv/sn") - - def send_reset(self): - "reset (reboot) the GPS" - self.expect_statement_identifier = 'reset' - self.gps_send(b"%reset%set,/par/reset,y") - - def send_set_dm(self): - "disable all messages" - self.expect_statement_identifier = 'dm' - self.gps_send(b"%dm%dm") - - def send_set_ipr(self): - "poll Integer Psuedo-Range messages" - self.expect_statement_identifier = 'out' - self.gps_send(b"%out%out,,jps/{rx,rc,r1,r2,r3,r5,rl}") - - def send_get_snr(self): - "poll all SNR messages" - # nothing we can wait on, depending on GPS model/configuration - # we may never see some of E2, E3, E5 or El - self.gps_send(b"%out%out,,jps/{EC,E1,E2,E3,E5,El}") - - def send_set_speed(self, set_speed): - "change GPS speed" - self.expect_statement_identifier = 'setspeed' - self.gps_send(b"%%setspeed%%set,/par/cur/term/rate,%d" % - set_speed) - - def send_get_vendor(self): - "get receiver vendor" - self.expect_statement_identifier = 'vendor' - self.gps_send(b"%vendor%print,/par/rcv/vendor") - - def send_get_ver(self): - "get receiver version, per section 4.4.3 of the specification" - self.expect_statement_identifier = 'ver' - self.gps_send(b"%ver%print,/par/rcv/ver") - - # list of canned commands that can be sent to the receiver - commands = { - "COLDBOOT": {"command": send_coldboot, - "help": "cold boot the GPS"}, - "CONS": {"command": send_constellations, - "help": "poll enabled constellations"}, - "DM": {"command": send_set_dm, - "help": "disable all periodic messages"}, - "ID": {"command": send_get_id, - "help": "poll receiver ID"}, - "IPR": {"command": send_set_ipr, - "help": "poll all Integer Psuedo-range messages"}, - "OAF": {"command": send_get_oaf, - "help": "poll all OAF options"}, - "RESET": {"command": send_reset, - "help": "reset (reboot) the GPS"}, - "SERIAL": {"command": send_get_serial, - "help": "poll receiver serial number"}, - "SNR": {"command": send_get_snr, - "help": "poll all SNR messages"}, - "VENDOR": {"command": send_get_vendor, - "help": "poll GPS vendor"}, - "VER": {"command": send_get_ver, - "help": "poll GPS version"}, - } - - -class gps_io(object): - """All the GPS I/O in one place" - - Three types of GPS I/O - 1. read only from a file - 2. read/write through a device - 3. read only from a gpsd instance - """ - - out = b'' - ser = None - input_is_device = False - - def __init__(self, serial_class): - "Initialize class" - - Serial = serial_class - # buffer to hold read data - self.out = b'' - - # open the input: device, file, or gpsd - if opts['input_file_name'] is not None: - # check if input file is a file or device - try: - mode = os.stat(opts['input_file_name']).st_mode - except OSError: - sys.stderr.write('%s: failed to open input file %s\n' % - (PROG_NAME, opts['input_file_name'])) - sys.exit(1) - - if stat.S_ISCHR(mode): - # character device, need not be read only - self.input_is_device = True - - if ((opts['disable'] or opts['enable'] or opts['poll'] or - opts['oaf_name'])): - - # check that we can write - if opts['read_only']: - sys.stderr.write('%s: read-only mode, ' - 'can not send commands\n' % PROG_NAME) - sys.exit(1) - if self.input_is_device is False: - sys.stderr.write('%s: input is plain file, ' - 'can not send commands\n' % PROG_NAME) - sys.exit(1) - - if opts['target']['server'] is not None: - # try to open local gpsd - try: - self.ser = gps.gpscommon(host=None) - self.ser.connect(opts['target']['server'], - opts['target']['port']) - - # alias self.ser.write() to self.write_gpsd() - self.ser.write = self.write_gpsd - # ask for raw, not rare, data - data_out = b'?WATCH={' - if opts['target']['device'] is not None: - # add in the requested device - data_out += (b'"device":"' + opts['target']['device'] + - b'",') - data_out += b'"enable":true,"raw":2}\r\n' - if VERB_RAW <= opts['verbosity']: - print("sent: ", data_out) - self.ser.send(data_out) - except socket.error as err: - sys.stderr.write('%s: failed to connect to gpsd %s\n' % - (PROG_NAME, err)) - sys.exit(1) - - elif self.input_is_device: - # configure the serial connections (the parameters refer to - # the device you are connecting to) - - try: - self.ser = Serial.Serial( - baudrate=opts['input_speed'], - # 8N1 is GREIS default - bytesize=Serial.EIGHTBITS, - parity=Serial.PARITY_NONE, - port=opts['input_file_name'], - stopbits=Serial.STOPBITS_ONE, - # read timeout - timeout=0.05, - # pyserial Ver 3.0+ changes writeTimeout to write_timeout - # just set both - write_timeout=0.5, - writeTimeout=0.5, - ) - except Serial.serialutil.SerialException: - # this exception happens on bad serial port device name - sys.stderr.write('%s: failed to open serial port "%s"\n' - ' Your computer has these serial ports:\n' - % (PROG_NAME, opts['input_file_name'])) - - # print out list of supported ports - import serial.tools.list_ports as List_Ports - ports = List_Ports.comports() - for port in ports: - sys.stderr.write(" %s: %s\n" % - (port.device, port.description)) - sys.exit(1) - - # flush input buffer, discarding all its contents - self.ser.flushInput() - - else: - # Read from a plain file of GREIS messages - try: - self.ser = open(opts['input_file_name'], 'rb') - except IOError: - sys.stderr.write('%s: failed to open input %s\n' % - (PROG_NAME, opts['input_file_name'])) - sys.exit(1) - - def read(self, read_opts): - "Read from device, until timeout or expected message" - - # are we expecting a certain message? - if gps_model.expect_statement_identifier: - # assume failure, until we see expected message - ret_code = 1 - else: - # not expecting anything, so OK if we did not see it. - ret_code = 0 - - try: - if read_opts['target']['server'] is not None: - # gpsd input - start = time.clock() - while read_opts['input_wait'] > (time.clock() - start): - # First priority is to be sure the input buffer is read. - # This is to prevent input buffer overuns - if 0 < self.ser.waiting(): - # We have serial input waiting, get it - # No timeout possible - # RTCM3 JSON can be over 4.4k long, so go big - new_out = self.ser.sock.recv(8192) - if raw is not None: - # save to raw file - raw.write(new_out) - self.out += new_out - - consumed = gps_model.decode_msg(self.out) - self.out = self.out[consumed:] - if ((gps_model.expect_statement_identifier and - (gps_model.expect_statement_identifier == - gps_model.last_statement_identifier))): - # Got what we were waiting for. Done? - ret_code = 0 - if not read_opts['input_forced_wait']: - # Done - break - - elif self.input_is_device: - # input is a serial device - start = time.clock() - while read_opts['input_wait'] > (time.clock() - start): - # First priority is to be sure the input buffer is read. - # This is to prevent input buffer overuns - # pyserial 3.0 replaces inWaiting() with in_waiting - if 0 < self.ser.inWaiting(): - # We have serial input waiting, get it - # 1024 is comfortably large, almost always the - # Read timeout is what causes ser.read() to return - new_out = self.ser.read(1024) - if raw is not None: - # save to raw file - raw.write(new_out) - self.out += new_out - - consumed = gps_model.decode_msg(self.out) - self.out = self.out[consumed:] - if ((gps_model.expect_statement_identifier and - (gps_model.expect_statement_identifier == - gps_model.last_statement_identifier))): - # Got what we were waiting for. Done? - ret_code = 0 - if not read_opts['input_forced_wait']: - # Done - break - else: - # ordinary file, so all read at once - self.out += self.ser.read() - if raw is not None: - # save to raw file - raw.write(self.out) - - while True: - consumed = gps_model.decode_msg(self.out) - self.out = self.out[consumed:] - if 0 >= consumed: - break - - except IOError: - # This happens on a good device name, but gpsd already running. - # or if USB device unplugged - sys.stderr.write('%s: failed to read %s\n' - '%s: Is gpsd already holding the port?\n' - % (PROG_NAME, PROG_NAME, - read_opts['input_file_name'])) - return 1 - - if 0 < ret_code: - # did not see the message we were expecting to see - sys.stderr.write('%s: waited %0.2f seconds for, ' - 'but did not get: %%%s%%\n' - % (PROG_NAME, read_opts['input_wait'], - gps_model.expect_statement_identifier)) - return ret_code - - def write_gpsd(self, data): - "write data to gpsd daemon" - - # HEXDATA_MAX = 512, from gps.h, The max hex digits can write. - # Input data is binary, converting to hex doubles its size. - # Limit binary data to length 255, so hex data length less than 510. - if 255 < len(data): - sys.stderr.write('%s: trying to send %d bytes, max is 255\n' - % (PROG_NAME, len(data))) - return 1 - - if opts['target']['device'] is not None: - # add in the requested device - data_out = b'?DEVICE={"path":"' + opts['target']['device'] + b'",' - else: - data_out = b'?DEVICE={' - - # Convert binary data to hex and build the message. - data_out += b'"hexdata":"' + binascii.hexlify(data) + b'"}\r\n' - if VERB_RAW <= opts['verbosity']: - print("sent: ", data_out) - self.ser.send(data_out) - return 0 - - -def usage(): - "Print usage information, and exit" - - print("usage: %s [-?hrVW] [-c C] [-d D] [-e E] [-f F] [-O O] [-p P]\n" - " [-R R] [-S S] [-s S] [-v V] [-w W]\n" - " [server[:port[:device]]]\n\n" % PROG_NAME) - print('usage: %s [options]\n' - ' -? print this help\n' - ' -c C send command C to GPS\n' - ' -d D disable D\n' - ' -e E enable E\n' - ' -f F open F as file/device\n' - ' default: %s\n' - ' -h print this help\n' - ' -O O send OAF file to GPS\n' - ' -p P send preset GPS command P\n' - ' -R R save raw data from GPS in file R\n' - ' -r open file/device read only\n' - ' default: %s\n' - ' -S S configure GPS speed to S\n' - ' -s S set port speed to S\n' - ' default: %d bps\n' - ' -V print version\n' - ' -v V Set verbosity level to V, 0 to 4\n' - ' default: %d\n' - ' -W force entire wait time, no exit early\n' - ' -w W wait time, exit early on -p result\n' - ' default: %s seconds\n' - ' [server[:port[:device]]] Connect to gpsd\n' - ' default port: 2947\n' - ' default device: None\n' - '\n' - 'D and E can be one of:' % - (PROG_NAME, opts['input_file_name'], opts['raw_file'], - opts['input_speed'], opts['verbosity'], opts['input_wait']) - ) - - # print list of enable/disable commands - for item in sorted(gps_model.able_commands.keys()): - print(" %-12s %s" % (item, gps_model.able_commands[item]["help"])) - - print('\nthe preset GPS command P can be one of:') - - # print list of possible canned commands - for item in sorted(gps_model.commands.keys()): - print(" %-12s %s" % (item, gps_model.commands[item]["help"])) - print('\nOptions can be placed in the ZERKOPTS environment variable.\n' - 'ZERKOPTS is processed before the CLI options.') - sys.exit(0) - - -# create the GREIS instance -gps_model = greis() - -if 'ZERKOPTS' in os.environ: - # grab the ZERKOPTS environment variable for options - opts['progopts'] = os.environ['ZERKOPTS'] - options = opts['progopts'].split(' ') + sys.argv[1:] -else: - options = sys.argv[1:] - -try: - (options, arguments) = getopt.getopt(options, - "?c:d:e:f:hrp:s:w:v:O:R:S:WV") -except getopt.GetoptError as err: - sys.stderr.write("%s: %s\n" - "Try '%s -h' for more information.\n" % - (PROG_NAME, str(err), PROG_NAME)) - sys.exit(2) - -for (opt, val) in options: - if opt == '-c': - # command - opts['command'] = val - elif opt == '-d': - # disable - opts['disable'] = val - elif opt == '-e': - # enable - opts['enable'] = val - elif opt == '-f': - # file input - opts['input_file_name'] = val - elif opt == '-h' or opt == '-?': - # help - usage() - elif opt == '-p': - # preprogrammed command - opts['poll'] = val - elif opt == '-r': - # read only - opts['read_only'] = True - elif opt == '-s': - # serial port speed - opts['input_speed'] = int(val) - if opts['input_speed'] not in gps_model.speeds: - sys.stderr.write('%s: -s invalid speed %s\n' % - (PROG_NAME, opts['input_speed'])) - sys.exit(1) - - elif opt == '-w': - # max wait time, seconds - opts['input_wait'] = float(val) - elif opt in '-v': - # verbosity level - opts['verbosity'] = int(val) - elif opt in '-O': - # OAF .jpo file - opts['oaf_name'] = val - elif opt in '-R': - # raw log file - opts['raw_file'] = val - elif opt in '-S': - # set GPS serial port speed - opts['set_speed'] = int(val) - if opts['set_speed'] not in gps_model.speeds: - sys.stderr.write('%s: -S invalid speed %s\n' % - (PROG_NAME, opts['set_speed'])) - sys.exit(1) - - elif opt == '-W': - # forced wait, no early exit on command completion - opts['input_forced_wait'] = True - elif opt == '-V': - # version - sys.stderr.write('zerk: Version %s\n' % gps_version) - sys.exit(0) - -if opts['input_file_name'] is None: - # no input file given - # default to local gpsd - opts['target']['server'] = "localhost" - opts['target']['port'] = gps.GPSD_PORT - opts['target']['device'] = None - if arguments: - # server[:port[:device]] - arg_parts = arguments[0].split(':') - opts['target']['server'] = arg_parts[0] - if 1 < len(arg_parts): - opts['target']['port'] = arg_parts[1] - if 2 < len(arg_parts): - opts['target']['device'] = arg_parts[2] - -elif arguments: - sys.stderr.write('%s: Both input file and server specified\n' % PROG_NAME) - sys.exit(1) - -if VERB_PROG <= opts['verbosity']: - # dump all options - print('Options:') - for option in sorted(opts): - print(" %s: %s" % (option, opts[option])) - -# done parsing arguments from environment and CLI - -try: - # raw log file requested? - raw = None - if opts['raw_file']: - try: - raw = open(opts['raw_file'], 'w') - except IOError: - sys.stderr.write('%s: failed to open raw file %s\n' % - (PROG_NAME, opts['raw_file'])) - sys.exit(1) - - # create the I/O instance - io_handle = gps_io(serial) - - # keep it simple, only one of -O, -c -d -e or -S - if opts['oaf_name'] is not None: - # parse an OAF file - try: - oaf_root = xml.etree.ElementTree.parse(opts['oaf_name']).getroot() - oaf = dict() - for tag in ('id', 'oaf', 'hash'): - oaf[tag] = oaf_root.find(tag).text - oaf['oaf'] = oaf['oaf'].split('\n') - if VERB_PROG <= opts['verbosity']: - print(oaf) - except xml.etree.ElementTree.ParseError: - sys.stderr.write('%s: failed to parse OAF "%s"\n' - % (PROG_NAME, opts['oaf_name'])) - sys.exit(1) - except IOError: - sys.stderr.write('%s: failed to read OAF "%s"\n' - % (PROG_NAME, opts['oaf_name'])) - sys.exit(1) - - # calculate hash - oaf_s = '\n'.join(oaf['oaf']) - hash_s = hashlib.sha1(oaf_s).hexdigest() - if hash_s != oaf['hash']: - sys.stderr.write('%s: OAF bad hash "%s", s/b %s\n' - % (PROG_NAME, hash_s, oaf['hash'])) - sys.exit(1) - - # TODO: probably should check the ID first... - # TODO: prolly should send one command per handshake - # blasting all commands at once, seems to not work reliably - for command in oaf['oaf']: - time.sleep(0.1) # wait 0.1 seconds each - gps_model.gps_send(command) - # this will detect when it is all done - gps_model.gps_send(b'%DONE%') - gps_model.expect_statement_identifier = 'DONE' - - elif opts['command'] is not None: - # zero length is OK to send - if 1 < len(opts['command']) and '%' != opts['command'][0]: - # add ID, if missing - gps_model.expect_statement_identifier = 'CMD' - opts['command'] = "%CMD%" + opts['command'] - - # add trailing new line - opts['command'] += "\n" - - if VERB_QUIET < opts['verbosity']: - sys.stderr.write('%s: command %s\n' % (PROG_NAME, opts['command'])) - gps_model.gps_send(opts['command']) - - elif opts['disable'] is not None: - if VERB_QUIET < opts['verbosity']: - sys.stderr.write('%s: disable %s\n' % (PROG_NAME, opts['disable'])) - if opts['disable'] in gps_model.able_commands: - command = gps_model.able_commands[opts['disable']] - command["command"](gps_model, 0) - else: - sys.stderr.write('%s: disable %s not found\n' % - (PROG_NAME, opts['disable'])) - sys.exit(1) - - elif opts['enable'] is not None: - if VERB_QUIET < opts['verbosity']: - sys.stderr.write('%s: enable %s\n' % (PROG_NAME, opts['enable'])) - if opts['enable'] in gps_model.able_commands: - command = gps_model.able_commands[opts['enable']] - command["command"](gps_model, 1) - else: - sys.stderr.write('%s: enable %s not found\n' % - (PROG_NAME, opts['enable'])) - sys.exit(1) - - elif opts['poll'] is not None: - if VERB_QUIET < opts['verbosity']: - sys.stderr.write('%s: poll %s\n' % (PROG_NAME, opts['poll'])) - if opts['poll'] in gps_model.commands: - command = gps_model.commands[opts['poll']] - command["command"](gps_model) - else: - sys.stderr.write('%s: poll %s not found\n' % - (PROG_NAME, opts['poll'])) - sys.exit(1) - - elif opts['set_speed'] is not None: - gps_model.send_set_speed(opts['set_speed']) - - exit_code = io_handle.read(opts) - - if ((VERB_RAW <= opts['verbosity']) and io_handle.out): - # dump raw left overs - print("Left over data:") - print(io_handle.out) - - sys.stdout.flush() - io_handle.ser.close() - -except KeyboardInterrupt: - print('') - exit_code = 1 - -sys.exit(exit_code) |