#!/usr/bin/env python # -*- coding: UTF-8 ''' zerk -- GREIS configurator and packet decoder usage: zerk [OPTIONS] [server[:port[:device]]] ''' # This program conforms to the JAVAD document: # GNSS Receiver External Interface Specification # Revised: October 11, 2017 # # Hereafter referred to as "the specification" # # This file is Copyright (c) 2018 by the GPSD project # BSD terms apply: see the file COPYING in the distribution root for details. # # This code runs compatibly under Python 2 and 3.x for x >= 2. # Preserve this property! # # ENVIRONMENT: # Options in the ZERKOPTS environment variable will be parsed before # the CLI options. A handy place to put your '-f /dev/ttyXX -s SPEED' # # example usages: # Coldboot the GPS: zerk -p COLDBOOT # Print current serial port: zerk -c "print,/cur/term" # Decode raw log file: zerk -r -f greis-binary.log -v 2 # Change GPS port speed: zerk -S 230400 # Watch entire reset cycle: zerk -p RESET -v 2 -w 20 -W # poll SVs Status: zerk -W -w 2 -v 2 -c "out,,jps/{CS,ES,GS,Is,WS,QS}" # dump local gpsd data zerk -v 2 -w 5 localhost # # TODO: no CRC16 packets handled yet # TODO: more packet decodes from __future__ import absolute_import, print_function, division import binascii # for binascii.hexlify() import getopt # for getopt.getopt(), to parse CLI options import hashlib # for hashlib.sha1 import os # for os.environ import socket # for socket.error import stat # for stat.S_ISBLK() import struct # for pack() import sys import time import xml.etree.ElementTree # to parse .jpo files PROG_NAME = 'zerk' try: import serial except ImportError: serial = None # Defer complaining until we know we need it. try: import gps import gps.misc # for polybyte() polystr() except ImportError: # PEP8 says local imports last sys.stderr.write("%s: failed to import gps, check PYTHON_PATH\n" % PROG_NAME) sys.exit(2) gps_version = '3.19-dev' if gps.__version__ != gps_version: sys.stderr.write("%s: ERROR: need gps module version %s, got %s\n" % (PROG_NAME, gps_version, gps.__version__)) sys.exit(1) VERB_QUIET = 0 # quiet VERB_NONE = 1 # just output requested data and some info VERB_DECODE = 2 # decode all messages VERB_INFO = 3 # more info VERB_RAW = 4 # raw info VERB_PROG = 5 # program trace # dictionary to hold all user options opts = { # command to send to GPS, -c 'command': None, # command for -d disable 'disable': None, # command for -e enable 'enable': None, # default input -f file 'input_file_name': None, # default forced wait? -W 'input_forced_wait': False, # default port speed -s 'input_speed': 115200, # default input wait time -w in seconds 'input_wait': 2.0, # the name of an OAF file, extension .jpo 'oaf_name': None, # poll command -p 'poll': None, # raw log file name 'raw_file': None, # open port read only -r 'read_only': False, # speed to set GPS -S 'set_speed': None, # target gpsd (server:port:device) to connect to 'target': {"server": None, "port": gps.GPSD_PORT, "device": None}, # verbosity level, -v 'verbosity': VERB_NONE, # contents of environment variable ZERKOPTS 'progopts': '', } class greis(object): """A class for working with the GREIS GPS message formats This class contains functions to decode messages in the Javad GREIS "Receiver Input Language" and "Receiver Messages" formats. """ # when a statement identifier is received, it is stored here last_statement_identifier = None # expected statement identifier. expect_statement_identifier = False # ID of current message as a string s_id = '' def __init__(self): "Initialize class" self.last_statement_identifier = None self.expect_statement_identifier = False # last epoch received in [~~] # epoch == None means never got epoch, epoch == -1 means missing. self.epoch = None def f4_s(self, f): "convert an '! f4' to a string" if gps.isfinite(f): # yeah, the precision is a guess return "%.6f" % f return 'X' def f8_s(self, f): "convert an '! f8' to a string" if gps.isfinite(f): # yeah, the precision is a guess return "%.4f" % f return 'X' def i1_s(self, i): "convert an '! i1' to a string" return 'X' if i == 127 else str(i) def i2_s(self, i): "convert an '! i2' to a string" return 'X' if i == 32767 else str(i) def i4_s(self, i): "convert an '! i4' to a string" return 'X' if i == 2147483647 else str(i) def u1_s(self, u): "convert an '! u1' to a string" return 'X' if u == 255 else str(u) def u2_s(self, u): "convert an '! u2' to a string" return 'X' if u == 65535 else str(u) def u4_s(self, u): "convert an '! u4' to a string" return 'X' if u == 4294967295 else str(u) def isuchex(self, c): "Is byte an upper case hex char?" if 48 <= c and 57 >= c: # 0 to 9 return int(c) - 48 if 65 <= c and 70 >= c: # A to F return int(c) - 55 return -1 soltypes = {0: "None", 1: "3D", 2: "DGPS", 3: "RTK float", 4: "RTK fixed", 5: "fixed" } # allowable speeds speeds = (460800, 230400, 153600, 115200, 57600, 38400, 19200, 9600, 4800, 2400, 1200, 600, 300) def msg_c_(self, payload): "[c?] decode, Smoothing Corrections" s = ' smooth' for i in range(0, len(payload) - 1, 2): u = struct.unpack_from(' m_len): return " Bad Length %s" % m_len u = struct.unpack_from(' m_len): return " Bad Length %s" % m_len if ('[CN]' == self.s_id) and (132 > m_len): return " Bad Length %s" % m_len if ('[EN]' == self.s_id) and (145 > m_len): return " Bad Length %s" % m_len u = struct.unpack_from(' m_len): u = struct.unpack_from(' m_len): u = struct.unpack_from('= c): # c is printable print("state: %s char %c (%#x)" % (state, chr(c), c)) else: # c is not printable print("state: %s char %#x" % (state, c)) m_raw.extend([c]) # parse input stream per GREIS Ref Guide Section 3.3.3 if 'BASE' == state: # start fresh # place to store 'comments' comment = '' # message id byte one m_id1 = 0 # message id byte two m_id2 = 0 # message length as integer m_len = 0 # byte array to hold payload, including possible checksum m_payload = bytearray(0) m_raw = bytearray(0) m_raw.extend([c]) if (ord('0') <= c) and (ord('~') >= c): # maybe id 1, '0' to '~' state = 'ID1' # start the grab m_id1 = c continue if ord("%") == c: # start of %ID%, Receiver Input Language # per GREIS Ref Guide Section 2.2 state = 'RIL' # start fresh comment = "%" continue if ord("$") == c: # NMEA line, treat as comment state = 'NMEA' # start fresh comment = "$" continue if ord("#") == c: # comment line state = 'COMMENT' # start fresh comment = "#" continue if ord('\n') == c or ord('\r') == c: # stray newline or linefeed, eat it return consumed # none of the above, stay in BASE continue if state in ('COMMENT', 'JSON', 'RIL'): # inside comment if ord('\n') == c or ord('\r') == c: # Got newline or linefeed # GREIS terminates messages on or # Done, got a full message if '{"class":"ERROR"' in comment: # always print gpsd errors print(comment) elif VERB_DECODE <= opts['verbosity']: print(comment) return consumed else: comment += chr(c) continue if 'ID1' == state: # maybe id 2, '0' to '~' if ord('"') == c: # technically could be GREIS, but likely JSON state = 'JSON' comment += chr(m_id1) + chr(c) elif (ord('0') <= c) and (ord('~') >= c): state = 'ID2' m_id2 = c else: state = 'BASE' continue if 'ID2' == state: # maybe len 1, 'A' to 'F' x = self.isuchex(c) if -1 < x: state = 'LEN1' m_len = x * 256 else: state = 'BASE' continue if 'LEN1' == state: # maybe len 2, 'A' to 'F' x = self.isuchex(c) if -1 < x: state = 'LEN2' m_len += x * 16 else: state = 'BASE' continue if 'LEN2' == state: # maybe len 3, 'A' to 'F' x = self.isuchex(c) if -1 < x: state = 'PAYLOAD' m_len += x else: state = 'BASE' continue if 'NMEA' == state: # inside NMEA if ord('\n') == c or ord('\r') == c: # Got newline or linefeed # done, got a full message # GREIS terminates messages on or if VERB_DECODE <= opts['verbosity']: print(comment) return consumed else: comment += chr(c) continue if 'PAYLOAD' == state: # getting payload m_payload.extend([c]) if len(m_payload) < m_len: continue # got entire payload self.s_id = "[%c%c]" % (chr(m_id1), chr(m_id2)) if VERB_DECODE <= opts['verbosity']: x_payload = binascii.hexlify(m_payload) # [RE], [ER] and more have no 8-bit checksum # assume the rest do if ((self.s_id not in ('[CS]', '[ER]', '[ES]', '[GS]', '[Is]', '[MF]', '[NS]', '[PM]', '[QS]', '[RE]', '[WS]') and not self.checksum_OK(m_raw))): print("ERROR: Bad checksum\n") if VERB_DECODE <= opts['verbosity']: print("DECODE: id: %s len: %d\n" "DECODE: payload: %s\n" % (self.s_id, m_len, x_payload)) # skip it. return consumed if self.s_id in self.messages: if VERB_INFO <= opts['verbosity']: print("INFO: id: %s len: %d\n" "INFO: payload: %s\n" % (self.s_id, m_len, x_payload)) (decode, length) = self.messages[self.s_id] if m_len < length: print("DECODE: %s Bad Length %s\n" % (self.s_id, m_len)) else: s = self.s_id + decode(self, m_payload) if VERB_DECODE <= opts['verbosity']: print(s) else: # unknown message if VERB_DECODE <= opts['verbosity']: print("DECODE: Unknown: id: %s len: %d\n" "DECODE: payload: %s\n" % (self.s_id, m_len, x_payload)) return consumed # give up state = 'BASE' # fell out of loop, no more chars to look at return 0 def checksum_OK(self, raw_msg): "Check the i8-bit checksum on a message, return True if good" # some packets from the GPS use CRC16, some i8-bit checksum, some none # only 8-bit checksum done here for now calc_checksum = self.checksum(raw_msg, len(raw_msg) - 1) rcode = raw_msg[len(raw_msg) - 1] == calc_checksum if VERB_RAW <= opts['verbosity']: print("Checksum was %#x, calculated %#x" % (raw_msg[len(raw_msg) - 1], calc_checksum)) return rcode def _rol(self, value): "rotate a byte left 2 bits" return ((value << 2) | (value >> 6)) & 0x0ff def checksum(self, msg, m_len): "Calculate GREIS 8-bit checksum" # Calculated per section A.1.1 of the specification # msg may be bytes (incoming messages) or str (outgoing messages) ck = 0 for c in msg[0:m_len]: if isinstance(c, str): # a string, make a byte c = ord(c) ck = self._rol(ck) ^ c return self._rol(ck) & 0x0ff def make_pkt(self, m_data): "Build an output message, always ASCII, add checksum and terminator" # build core message # no leading spaces, checksum includes the @ m_data = m_data.lstrip() + b'@' chk = self.checksum(m_data, len(m_data)) # all commands end with CR and/or LF return m_data + (b'%02X' % chk) + b'\n' def gps_send(self, m_data): "Send message to GREIS GPS" m_all = self.make_pkt(m_data) if not opts['read_only']: io_handle.ser.write(m_all) if VERB_INFO <= opts['verbosity']: print("sent:", m_all) self.decode_msg(m_all) sys.stdout.flush() # Table of known options. From table 4-2 of the specification. oafs = ( b"_AJM", b"AUTH", b"_BLT", b"_CAN", b"CDIF", b"CMRI", b"CMRO", b"COMP", b"COOP", b"COPN", b"CORI", b"_CPH", b"DEVS", b"DIST", b"_DTM", b"_E5B", b"_E6_", b"EDEV", b"ETHR", b"EVNT", b"_FRI", b"_FRO", b"_FTP", b"_GAL", b"GBAI", b"GBAO", b"GCLB", b"_GEN", b"_GEO", b"_GLO", b"_GPS", b"_GSM", b"HTTP", b"_IMU", b"INFR", b"IRIG", b"IRNS", b"JPSI", b"JPSO", b"_L1C", b"_L1_", b"_L2C", b"_L2_", b"_L5_", b"LAT1", b"LAT2", b"LAT3", b"LAT4", b"LCS2", b"L_CS", b"_LIM", b"LON1", b"LON2", b"LON3", b"LON4", b"MAGN", b"_MEM", b"_MPR", b"OCTO", b"OMNI", b"_PAR", b"PDIF", b"_POS", b"_PPP", b"_PPS", b"PRTT", b"_PTP", b"QZSS", b"RAIM", b"_RAW", b"RCVT", b"RM3I", b"RM3O", b"RS_A", b"RS_B", b"RS_C", b"RS_D", b"RTMI", b"RTMO", b"SPEC", b"TCCL", b"_TCP", b"TCPO", b"_TLS", b"TRST", b"UDPO", b"_UHF", b"_USB", b"WAAS", b"WIFI", b"_WPT", ) def send_able_4hz(self, able): "enable basic GREIS messages at 4Hz" self.expect_statement_identifier = 'greis' # the messages we want # [SX] requires 3.7 firmware, we use [SI] to support 3.6 messages = b"jps/{RT,UO,GT,PV,SG,DP,SI,EL,AZ,EC,SS,ET}" if able: # Message rate must be an integer multiple of /par/raw/msint # Default msint is 0.100 seconds, so that must be changed first self.gps_send(b"%msint%set,/par/raw/msint,250") self.gps_send(b"%greis%em,," + messages + b":0.25") else: self.gps_send(b"%greis%dm,," + messages) def send_able_comp(self, able): "dis/enable COMPASS, aka BeiDou" self.expect_statement_identifier = 'cons' en_dis = b'y' if 1 == able else b'n' self.gps_send(b"%cons%set,/par/pos/sys/comp," + en_dis) def send_able_constellations(self, able): "dis/enable all constellations" self.expect_statement_identifier = 'cons7' en_dis = b'y' if 1 == able else b'n' self.gps_send(b"%cons1%set,/par/pos/sys/comp," + en_dis) self.gps_send(b"%cons2%set,/par/pos/sys/gal," + en_dis) # this will fail on TR-G2H, as it has no GLONASS self.gps_send(b"%cons3%set,/par/pos/sys/glo," + en_dis) self.gps_send(b"%cons4%set,/par/pos/sys/gps," + en_dis) self.gps_send(b"%cons5%set,/par/pos/sys/irnss," + en_dis) self.gps_send(b"%cons6%set,/par/pos/sys/sbas," + en_dis) self.gps_send(b"%cons7%set,/par/pos/sys/qzss," + en_dis) def send_able_defmsg(self, able): "dis/enable default messages at 1Hz" self.expect_statement_identifier = 'defmsg' if able: self.gps_send(b"%defmsg%em,,jps/RT,/msg/def:1,jps/ET") else: # leave RT and ET to break less? self.gps_send(b"%defmsg%dm,,/msg/def:1") def send_able_gal(self, able): "dis/enable GALILEO" self.expect_statement_identifier = 'cons' en_dis = b'y' if 1 == able else b'n' self.gps_send(b"%cons%set,/par/pos/sys/gal," + en_dis) def send_able_glo(self, able): "dis/enable GLONASS" self.expect_statement_identifier = 'cons' en_dis = b'y' if 1 == able else b'n' self.gps_send(b"%cons%set,/par/pos/sys/glo," + en_dis) def send_able_gps(self, able): "dis/enable GPS" self.expect_statement_identifier = 'cons' en_dis = b'y' if 1 == able else b'n' self.gps_send(b"%cons%set,/par/pos/sys/gps," + en_dis) def send_able_ipr(self, able): "dis/enable all Integer Psuedo-Range messages" self.expect_statement_identifier = 'em' if able: self.gps_send(b"%em%em,,jps/{rx,rc,r1,r2,r3,r5,rl}:0.25") else: self.gps_send(b"%em%dm,,jps/{rx,rc,r1,r2,r3,r5,rl}") def send_able_irnss(self, able): "dis/enable IRNSS" self.expect_statement_identifier = 'cons' en_dis = b'y' if 1 == able else b'n' self.gps_send(b"%cons%set,/par/pos/sys/irnss," + en_dis) def send_able_nmea41(self, able): "dis/enable basic NMEA 4.1e messages at 4Hz" self.expect_statement_identifier = 'nmea' messages = b"nmea/{GBS,GGA,GSA,GST,GSV,RMC,VTG,ZDA}" if able: # set NMEA version 4.1e self.gps_send(b"%nmeaver%set,/par/nmea/ver,v4.1e") # Message rate must be an integer multiple of /par/raw/msint # Default msint is 0.100 seconds, so that must be changed first self.gps_send(b"%msint%set,/par/raw/msint,250") # now we can set the messages we want self.gps_send(b"%nmea%em,," + messages + b":0.25") else: # disable self.gps_send(b"%nmea%dm,," + messages) def send_able_raw(self, able): """dis/enable Raw mode messages""" self.expect_statement_identifier = 'raw' messages = (b"jps/{RT,UO,GT,PV,SG,DP,SI,EL,AZ,EC,SS," b"PC,P1,P2,P3,P5,Pl," b"RC,R1,R2,R3,R5,Rl," b"DC,D1,D2,D3,D5,Dl," b"ET}") if able: self.gps_send(b"%raw%em,," + messages + b":1") else: self.gps_send(b"%raw%dm,," + messages) def send_able_sbas(self, able): "dis/enable SBAS" self.expect_statement_identifier = 'cons' en_dis = b'y' if 1 == able else b'n' self.gps_send(b"%cons%set,/par/pos/sys/sbas," + en_dis) def send_able_qzss(self, able): "dis/enable QZSS" self.expect_statement_identifier = 'cons' en_dis = b'y' if 1 == able else b'n' self.gps_send(b"%cons%set,/par/pos/sys/qzss," + en_dis) def send_able_snr(self, able): "dis/enable all SNR messages, except [EC]" self.expect_statement_identifier = 'em' if able: self.gps_send(b"%em%em,,jps/{E1,E2,E3,E5,El}:0.25") else: self.gps_send(b"%em%dm,,jps/{E1,E2,E3,E5,El}") able_commands = { # en/disable basic GREIS messages at 4HZ "4HZ": {"command": send_able_4hz, "help": "basic GREIS messages at 4Hz"}, # en/disable all constellations "CONS": {"command": send_able_constellations, "help": "all constellations"}, # en/disable COMPASS, aka Beidou "COMPASS": {"command": send_able_comp, "help": "COMPASS"}, # en/disable default message set. "DEFMSG": {"command": send_able_defmsg, "help": "default message set at 1Hz"}, # en/disable GALILEO "GALILEO": {"command": send_able_gal, "help": "GALILEO"}, # en/disable GLONASS "GLONASS": {"command": send_able_glo, "help": "GLONASS"}, # en/disable GPS "GPS": {"command": send_able_gps, "help": "GPS"}, # en/disable Integer Psuedo Range messages "IPR": {"command": send_able_ipr, "help": "all Integer Psuedo Range messages"}, # en/disable IRNSS "IRNSS": {"command": send_able_irnss, "help": "IRNSS"}, # en/disable NMEA 4.1e "NMEA": {"command": send_able_nmea41, "help": "basic messages NMEA 4.1 at 4Hz"}, # en/disable Psuedo Range, Carrier Phase and Doppler messages "RAW": {"command": send_able_raw, "help": "Raw mode messages"}, # en/disable SBAS "SBAS": {"command": send_able_sbas, "help": "SBAS"}, # en/disable all SNRs "SNR": {"command": send_able_snr, "help": "all SNR messages, except [EC]"}, # en/disable QZSS "QZSS": {"command": send_able_qzss, "help": "QZSS"}, } def send_coldboot(self): "Delete NVRAM (almanac, ephemeris, location) and restart" self.expect_statement_identifier = 'coldboot' self.gps_send(b"%coldboot%init,/dev/nvm/a") def send_constellations(self): "poll all constellations" self.expect_statement_identifier = 'cons' self.gps_send(b"%cons%print,/par/pos/sys:on") def send_get_id(self): "get receiver id" self.expect_statement_identifier = 'id' self.gps_send(b"%id%print,/par/rcv/id") def send_get_oaf(self): "poll OAF (GPS opts)" self.expect_statement_identifier = 'opts,_WPT' if VERB_RAW <= opts['verbosity']: # get list of all opts self.gps_send(b"%opts,list%list,/par/opts") # request opts one at a time from canned list for s in self.oafs: self.gps_send(b"%%opts,%s%%print,/par/opts/%s" % (s, s)) def send_get_serial(self): "get receiver serial number" self.expect_statement_identifier = 'serial' self.gps_send(b"%serial%print,/par/rcv/sn") def send_reset(self): "reset (reboot) the GPS" self.expect_statement_identifier = 'reset' self.gps_send(b"%reset%set,/par/reset,y") def send_set_dm(self): "disable all messages" self.expect_statement_identifier = 'dm' self.gps_send(b"%dm%dm") def send_set_ipr(self): "poll Integer Psuedo-Range messages" self.expect_statement_identifier = 'out' self.gps_send(b"%out%out,,jps/{rx,rc,r1,r2,r3,r5,rl}") def send_get_snr(self): "poll all SNR messages" # nothing we can wait on, depending on GPS model/configuration # we may never see some of E2, E3, E5 or El self.gps_send(b"%out%out,,jps/{EC,E1,E2,E3,E5,El}") def send_set_speed(self, set_speed): "change GPS speed" self.expect_statement_identifier = 'setspeed' self.gps_send(b"%%setspeed%%set,/par/cur/term/rate,%d" % set_speed) def send_get_vendor(self): "get receiver vendor" self.expect_statement_identifier = 'vendor' self.gps_send(b"%vendor%print,/par/rcv/vendor") def send_get_ver(self): "get receiver version, per section 4.4.3 of the specification" self.expect_statement_identifier = 'ver' self.gps_send(b"%ver%print,/par/rcv/ver") # list of canned commands that can be sent to the receiver commands = { "COLDBOOT": {"command": send_coldboot, "help": "cold boot the GPS"}, "CONS": {"command": send_constellations, "help": "poll enabled constellations"}, "DM": {"command": send_set_dm, "help": "disable all periodic messages"}, "ID": {"command": send_get_id, "help": "poll receiver ID"}, "IPR": {"command": send_set_ipr, "help": "poll all Integer Psuedo-range messages"}, "OAF": {"command": send_get_oaf, "help": "poll all OAF options"}, "RESET": {"command": send_reset, "help": "reset (reboot) the GPS"}, "SERIAL": {"command": send_get_serial, "help": "poll receiver serial number"}, "SNR": {"command": send_get_snr, "help": "poll all SNR messages"}, "VENDOR": {"command": send_get_vendor, "help": "poll GPS vendor"}, "VER": {"command": send_get_ver, "help": "poll GPS version"}, } class gps_io(object): """All the GPS I/O in one place" Three types of GPS I/O 1. read only from a file 2. read/write through a device 3. read only from a gpsd instance """ out = b'' ser = None input_is_device = False def __init__(self): "Initialize class" Serial = serial Serial_v3 = Serial and Serial.VERSION.split('.')[0] >= '3' # buffer to hold read data self.out = b'' # open the input: device, file, or gpsd if opts['input_file_name'] is not None: # check if input file is a file or device try: mode = os.stat(opts['input_file_name']).st_mode except OSError: sys.stderr.write('%s: failed to open input file %s\n' % (PROG_NAME, opts['input_file_name'])) sys.exit(1) if stat.S_ISCHR(mode): # character device, need not be read only self.input_is_device = True if ((opts['disable'] or opts['enable'] or opts['poll'] or opts['oaf_name'])): # check that we can write if opts['read_only']: sys.stderr.write('%s: read-only mode, ' 'can not send commands\n' % PROG_NAME) sys.exit(1) if self.input_is_device is False: sys.stderr.write('%s: input is plain file, ' 'can not send commands\n' % PROG_NAME) sys.exit(1) if opts['target']['server'] is not None: # try to open local gpsd try: self.ser = gps.gpscommon(host=None) self.ser.connect(opts['target']['server'], opts['target']['port']) # alias self.ser.write() to self.write_gpsd() self.ser.write = self.write_gpsd # ask for raw, not rare, data data_out = b'?WATCH={' if opts['target']['device'] is not None: # add in the requested device data_out += (b'"device":"' + opts['target']['device'] + b'",') data_out += b'"enable":true,"raw":2}\r\n' if VERB_RAW <= opts['verbosity']: print("sent: ", data_out) self.ser.send(data_out) except socket.error as err: sys.stderr.write('%s: failed to connect to gpsd %s\n' % (PROG_NAME, err)) sys.exit(1) elif self.input_is_device: # configure the serial connections (the parameters refer to # the device you are connecting to) # pyserial Ver 3.0+ changes writeTimeout to write_timeout # Using the wrong one causes an error write_timeout_arg = ('write_timeout' if Serial_v3 else 'writeTimeout') try: self.ser = Serial.Serial( baudrate=opts['input_speed'], # 8N1 is GREIS default bytesize=Serial.EIGHTBITS, parity=Serial.PARITY_NONE, port=opts['input_file_name'], stopbits=Serial.STOPBITS_ONE, # read timeout timeout=0.05, **{write_timeout_arg: 0.5} ) except AttributeError: sys.stderr.write('%s: failed to import pyserial\n' % PROG_NAME) sys.exit(2) except Serial.serialutil.SerialException: # this exception happens on bad serial port device name sys.stderr.write('%s: failed to open serial port "%s"\n' ' Your computer has these serial ports:\n' % (PROG_NAME, opts['input_file_name'])) # print out list of supported ports import serial.tools.list_ports as List_Ports ports = List_Ports.comports() for port in ports: sys.stderr.write(" %s: %s\n" % (port.device, port.description)) sys.exit(1) # flush input buffer, discarding all its contents # pyserial 3.0+ deprecates flushInput() in favor of # reset_input_buffer(), but flushInput() is still present. self.ser.flushInput() else: # Read from a plain file of GREIS messages try: self.ser = open(opts['input_file_name'], 'rb') except IOError: sys.stderr.write('%s: failed to open input %s\n' % (PROG_NAME, opts['input_file_name'])) sys.exit(1) def read(self, read_opts): "Read from device, until timeout or expected message" # are we expecting a certain message? if gps_model.expect_statement_identifier: # assume failure, until we see expected message ret_code = 1 else: # not expecting anything, so OK if we did not see it. ret_code = 0 try: if read_opts['target']['server'] is not None: # gpsd input start = time.clock() while read_opts['input_wait'] > (time.clock() - start): # First priority is to be sure the input buffer is read. # This is to prevent input buffer overuns if 0 < self.ser.waiting(): # We have serial input waiting, get it # No timeout possible # RTCM3 JSON can be over 4.4k long, so go big new_out = self.ser.sock.recv(8192) if raw is not None: # save to raw file raw.write(new_out) self.out += new_out consumed = gps_model.decode_msg(self.out) self.out = self.out[consumed:] if ((gps_model.expect_statement_identifier and (gps_model.expect_statement_identifier == gps_model.last_statement_identifier))): # Got what we were waiting for. Done? ret_code = 0 if not read_opts['input_forced_wait']: # Done break elif self.input_is_device: # input is a serial device start = time.clock() while read_opts['input_wait'] > (time.clock() - start): # First priority is to be sure the input buffer is read. # This is to prevent input buffer overuns # pyserial 3.0+ deprecates inWaiting() in favor of # in_waiting, but inWaiting() is still present. if 0 < self.ser.inWaiting(): # We have serial input waiting, get it # 1024 is comfortably large, almost always the # Read timeout is what causes ser.read() to return new_out = self.ser.read(1024) if raw is not None: # save to raw file raw.write(new_out) self.out += new_out consumed = gps_model.decode_msg(self.out) self.out = self.out[consumed:] if ((gps_model.expect_statement_identifier and (gps_model.expect_statement_identifier == gps_model.last_statement_identifier))): # Got what we were waiting for. Done? ret_code = 0 if not read_opts['input_forced_wait']: # Done break else: # ordinary file, so all read at once self.out += self.ser.read() if raw is not None: # save to raw file raw.write(self.out) while True: consumed = gps_model.decode_msg(self.out) self.out = self.out[consumed:] if 0 >= consumed: break except IOError: # This happens on a good device name, but gpsd already running. # or if USB device unplugged sys.stderr.write('%s: failed to read %s\n' '%s: Is gpsd already holding the port?\n' % (PROG_NAME, PROG_NAME, read_opts['input_file_name'])) return 1 if 0 < ret_code: # did not see the message we were expecting to see sys.stderr.write('%s: waited %0.2f seconds for, ' 'but did not get: %%%s%%\n' % (PROG_NAME, read_opts['input_wait'], gps_model.expect_statement_identifier)) return ret_code def write_gpsd(self, data): "write data to gpsd daemon" # HEXDATA_MAX = 512, from gps.h, The max hex digits can write. # Input data is binary, converting to hex doubles its size. # Limit binary data to length 255, so hex data length less than 510. if 255 < len(data): sys.stderr.write('%s: trying to send %d bytes, max is 255\n' % (PROG_NAME, len(data))) return 1 if opts['target']['device'] is not None: # add in the requested device data_out = b'?DEVICE={"path":"' + opts['target']['device'] + b'",' else: data_out = b'?DEVICE={' # Convert binary data to hex and build the message. data_out += b'"hexdata":"' + binascii.hexlify(data) + b'"}\r\n' if VERB_RAW <= opts['verbosity']: print("sent: ", data_out) self.ser.send(data_out) return 0 def usage(): "Print usage information, and exit" print("usage: %s [-?hrVW] [-c C] [-d D] [-e E] [-f F] [-O O] [-p P]\n" " [-R R] [-S S] [-s S] [-v V] [-w W]\n" " [server[:port[:device]]]\n\n" % PROG_NAME) print('usage: %s [options]\n' ' -? print this help\n' ' -c C send command C to GPS\n' ' -d D disable D\n' ' -e E enable E\n' ' -f F open F as file/device\n' ' default: %s\n' ' -h print this help\n' ' -O O send OAF file to GPS\n' ' -p P send preset GPS command P\n' ' -R R save raw data from GPS in file R\n' ' -r open file/device read only\n' ' default: %s\n' ' -S S configure GPS speed to S\n' ' -s S set port speed to S\n' ' default: %d bps\n' ' -V print version\n' ' -v V Set verbosity level to V, 0 to 4\n' ' default: %d\n' ' -W force entire wait time, no exit early\n' ' -w W wait time, exit early on -p result\n' ' default: %s seconds\n' ' [server[:port[:device]]] Connect to gpsd\n' ' default port: 2947\n' ' default device: None\n' '\n' 'D and E can be one of:' % (PROG_NAME, opts['input_file_name'], opts['raw_file'], opts['input_speed'], opts['verbosity'], opts['input_wait']) ) # print list of enable/disable commands for item in sorted(gps_model.able_commands.keys()): print(" %-12s %s" % (item, gps_model.able_commands[item]["help"])) print('\nthe preset GPS command P can be one of:') # print list of possible canned commands for item in sorted(gps_model.commands.keys()): print(" %-12s %s" % (item, gps_model.commands[item]["help"])) print('\nOptions can be placed in the ZERKOPTS environment variable.\n' 'ZERKOPTS is processed before the CLI options.') sys.exit(0) # create the GREIS instance gps_model = greis() if 'ZERKOPTS' in os.environ: # grab the ZERKOPTS environment variable for options opts['progopts'] = os.environ['ZERKOPTS'] options = opts['progopts'].split(' ') + sys.argv[1:] else: options = sys.argv[1:] try: (options, arguments) = getopt.getopt(options, "?c:d:e:f:hrp:s:w:v:O:R:S:WV") except getopt.GetoptError as err: sys.stderr.write("%s: %s\n" "Try '%s -h' for more information.\n" % (PROG_NAME, str(err), PROG_NAME)) sys.exit(2) for (opt, val) in options: if opt == '-c': # command opts['command'] = val elif opt == '-d': # disable opts['disable'] = val elif opt == '-e': # enable opts['enable'] = val elif opt == '-f': # file input opts['input_file_name'] = val elif opt == '-h' or opt == '-?': # help usage() elif opt == '-p': # preprogrammed command opts['poll'] = val elif opt == '-r': # read only opts['read_only'] = True elif opt == '-s': # serial port speed opts['input_speed'] = int(val) if opts['input_speed'] not in gps_model.speeds: sys.stderr.write('%s: -s invalid speed %s\n' % (PROG_NAME, opts['input_speed'])) sys.exit(1) elif opt == '-w': # max wait time, seconds opts['input_wait'] = float(val) elif opt in '-v': # verbosity level opts['verbosity'] = int(val) elif opt in '-O': # OAF .jpo file opts['oaf_name'] = val elif opt in '-R': # raw log file opts['raw_file'] = val elif opt in '-S': # set GPS serial port speed opts['set_speed'] = int(val) if opts['set_speed'] not in gps_model.speeds: sys.stderr.write('%s: -S invalid speed %s\n' % (PROG_NAME, opts['set_speed'])) sys.exit(1) elif opt == '-W': # forced wait, no early exit on command completion opts['input_forced_wait'] = True elif opt == '-V': # version sys.stderr.write('zerk: Version %s\n' % gps_version) sys.exit(0) if opts['input_file_name'] is None: # no input file given # default to local gpsd opts['target']['server'] = "localhost" opts['target']['port'] = gps.GPSD_PORT opts['target']['device'] = None if arguments: # server[:port[:device]] arg_parts = arguments[0].split(':') opts['target']['server'] = arg_parts[0] if 1 < len(arg_parts): opts['target']['port'] = arg_parts[1] if 2 < len(arg_parts): opts['target']['device'] = arg_parts[2] elif arguments: sys.stderr.write('%s: Both input file and server specified\n' % PROG_NAME) sys.exit(1) if VERB_PROG <= opts['verbosity']: # dump all options print('Options:') for option in sorted(opts): print(" %s: %s" % (option, opts[option])) # done parsing arguments from environment and CLI try: # raw log file requested? raw = None if opts['raw_file']: try: raw = open(opts['raw_file'], 'w') except IOError: sys.stderr.write('%s: failed to open raw file %s\n' % (PROG_NAME, opts['raw_file'])) sys.exit(1) # create the I/O instance io_handle = gps_io() # keep it simple, only one of -O, -c -d -e or -S if opts['oaf_name'] is not None: # parse an OAF file try: oaf_root = xml.etree.ElementTree.parse(opts['oaf_name']).getroot() oaf = dict() for tag in ('id', 'oaf', 'hash'): oaf[tag] = oaf_root.find(tag).text oaf['oaf'] = oaf['oaf'].split('\n') if VERB_PROG <= opts['verbosity']: print(oaf) except xml.etree.ElementTree.ParseError: sys.stderr.write('%s: failed to parse OAF "%s"\n' % (PROG_NAME, opts['oaf_name'])) sys.exit(1) except IOError: sys.stderr.write('%s: failed to read OAF "%s"\n' % (PROG_NAME, opts['oaf_name'])) sys.exit(1) # calculate hash oaf_s = '\n'.join(oaf['oaf']) hash_s = hashlib.sha1(oaf_s).hexdigest() if hash_s != oaf['hash']: sys.stderr.write('%s: OAF bad hash "%s", s/b %s\n' % (PROG_NAME, hash_s, oaf['hash'])) sys.exit(1) # TODO: probably should check the ID first... # TODO: prolly should send one command per handshake # blasting all commands at once, seems to not work reliably for command in oaf['oaf']: time.sleep(0.1) # wait 0.1 seconds each gps_model.gps_send(command) # this will detect when it is all done gps_model.gps_send(b'%DONE%') gps_model.expect_statement_identifier = 'DONE' elif opts['command'] is not None: # zero length is OK to send if 1 < len(opts['command']) and '%' != opts['command'][0]: # add ID, if missing gps_model.expect_statement_identifier = 'CMD' opts['command'] = "%CMD%" + opts['command'] # add trailing new line opts['command'] += "\n" if VERB_QUIET < opts['verbosity']: sys.stderr.write('%s: command %s\n' % (PROG_NAME, opts['command'])) gps_model.gps_send(opts['command']) elif opts['disable'] is not None: if VERB_QUIET < opts['verbosity']: sys.stderr.write('%s: disable %s\n' % (PROG_NAME, opts['disable'])) if opts['disable'] in gps_model.able_commands: command = gps_model.able_commands[opts['disable']] command["command"](gps_model, 0) else: sys.stderr.write('%s: disable %s not found\n' % (PROG_NAME, opts['disable'])) sys.exit(1) elif opts['enable'] is not None: if VERB_QUIET < opts['verbosity']: sys.stderr.write('%s: enable %s\n' % (PROG_NAME, opts['enable'])) if opts['enable'] in gps_model.able_commands: command = gps_model.able_commands[opts['enable']] command["command"](gps_model, 1) else: sys.stderr.write('%s: enable %s not found\n' % (PROG_NAME, opts['enable'])) sys.exit(1) elif opts['poll'] is not None: if VERB_QUIET < opts['verbosity']: sys.stderr.write('%s: poll %s\n' % (PROG_NAME, opts['poll'])) if opts['poll'] in gps_model.commands: command = gps_model.commands[opts['poll']] command["command"](gps_model) else: sys.stderr.write('%s: poll %s not found\n' % (PROG_NAME, opts['poll'])) sys.exit(1) elif opts['set_speed'] is not None: gps_model.send_set_speed(opts['set_speed']) exit_code = io_handle.read(opts) if ((VERB_RAW <= opts['verbosity']) and io_handle.out): # dump raw left overs print("Left over data:") print(io_handle.out) sys.stdout.flush() io_handle.ser.close() except KeyboardInterrupt: print('') exit_code = 1 sys.exit(exit_code)