#!/usr/bin/env python
#
# This file is Copyright (c) 2010 by the GPSD project
# BSD terms apply: see the file COPYING in the distribution root for details.
"""
cycle_analyzer - perform cycle analysis on GPS log files

This tool analyzes one or more NMEA or JSON files to determine the cycle
sequence of sentences.  JSON files must be reports from a gpsd driver that
lacks the CYCLE_END_RELIABLE capability and therefore ships every sentence
containing a fix; otherwise the results will be meaningless, because only
the end-of-cycle sentences will show in the JSON.

If a filename argument ends with '.log' and the sentence type in it is not
recognizable, this tool adds '.chk' to the name and tries again, assuming
the latter is a JSON dump.  Thus, invoking it on *.log in a directory full
of check files will do the right thing.

One purpose of this tool is to determine the end-of-cycle sentence that a
binary-protocol device emits, so the result can be patched into the driver
as a CYCLE_END_RELIABLE capability.  To get this, apply the tool to the
JSON output from the driver using the -j switch.  It will ignore everything
but tag and timestamp fields, and will also ignore any NMEA in the file.

Another purpose is to sanity-check the assumptions of the NMEA end-of-cycle
detector.  For this purpose, run without -j; if a device has a regular
reporting cycle with a constant end-of-cycle sentence, this tool will
confirm that.  Otherwise, it will perform various checks attempting to find
an end-of-cycle marker and report on what it finds.

When cycle_analyzer reports a split- or variable-cycle device, some
arguments to the -d switch can dump various analysis stages so you can get
a better idea what is going on.  These are:

  sequence - dump the entire sequence of tag/timestamp pairs from the log
  events   - show how those reduce to event sequences
  bursts   - show how sentences are grouped into bursts
  trim     - show the burst list after the end bursts have been removed

In an event sequence, a '<' is a normal start of cycle where the timestamp
increments.  A '>' is where the timestamp actually *decreases* between a
sentence and the one that follows.  The NMEA cycle-end detector will ignore
the '>' event; it sometimes occurs when GPZDA starts a cycle, but has no
effect on where the actual end of fix reporting is.

If you see a message saying 'cycle-enders ... also occur in mid-cycle',
the device will confuse the NMEA cycle detector, leading to more reports
per cycle than ideal.
"""
# This code runs compatibly under Python 2 and 3.x for x >= 2.
# Preserve this property!
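# Illustrative invocations (the file names below are hypothetical, shown only
# to sketch how the switches documented above combine):
#
#   cycle_analyzer nmea.log              # NMEA cycle analysis of one log
#   cycle_analyzer -d bursts nmea.log    # also dump the burst-grouping stage
#   cycle_analyzer -j -v device.chk      # analyze a JSON dump from a driver
#   cycle_analyzer -s *.log              # batch run; report only problem logs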
from __future__ import absolute_import, print_function, division

import sys, os, getopt, json

from gps.misc import polystr, isotime

verbose = 0
suppress_regular = False
parse_json = False


class analyze_error(BaseException):
    def __init__(self, filename, msg):
        self.filename = filename
        self.msg = msg

    def __repr__(self):
        return '%s: %s' % (self.filename, self.msg)


class event(object):
    def __init__(self, tag, time=0):
        self.tag = tag
        self.time = time

    def __str__(self):
        if self.time == 0:
            return self.tag
        else:
            return self.tag + ":" + self.time
    __repr__ = __str__


def tags(lst):
    return [x.tag for x in lst]


def extract_from_nmea(filename, lineno, line):
    "Extend sequence of tag/timestamp tuples from an NMEA sentence"
    # Comma-field index of the hhmmss timestamp in the sentences we care
    # about; for PASHR the index depends on the subtype in the second field.
    hhmmss = {
        "RMC": 1,
        "GLL": 5,
        "GGA": 1,
        "GBS": 1,
        "PASHR": {"POS": 4},
        }
    fields = line.split(",")
    tag = fields[0]
    if tag.startswith("$GP") or tag.startswith("$IN"):
        tag = tag[3:]
    elif tag[0] == "$":
        tag = tag[1:]
    field = hhmmss.get(tag)
    if isinstance(field, dict):
        field = field.get(fields[1])
    if field:
        timestamp = fields[field]
        return [event(tag, timestamp)]
    else:
        return []


def extract_from_json(filename, lineno, line):
    "Extend sequence of tag/timestamp tuples from a JSON dump of a sentence"
    if not line.startswith("{"):
        return []
    try:
        sentence = json.loads(line)
        if "time" not in sentence:
            return []
        return [event(polystr(sentence["class"]),
                      "%.2f" % isotime(sentence["time"]))]
    except ValueError as e:
        print(line.rstrip(), file=sys.stderr)
        print(repr(e), file=sys.stderr)
        return []


def extract_timestamped_sentences(fp, json_parse=parse_json):
    "Do the basic work of extracting tags and timestamps"
    sequence = []
    lineno = 0
    while True:
        line = polystr(fp.readline())
        if not line:
            break
        lineno += 1
        if line.startswith("#"):
            continue
        if line[0] not in ("$", "!", "{"):
            raise analyze_error(fp.name, "unknown sentence type.")
        if not json_parse and line.startswith("$"):
            sequence += extract_from_nmea(fp.name, lineno, line)
        elif json_parse and line.startswith("{"):
            sequence += extract_from_json(fp.name, lineno, line)
    return sequence


def analyze(sequence, name):
    "Analyze the cycle sequence of a device from its output logs."
    # First, extract tags and timestamps
    regular = False
    if not sequence:
        return
    if "sequence" in stages:
        print("Raw tag/timestamp sequence")
        for e in sequence:
            print(e)
    # Then, do cycle detection
    events = []
    out_of_order = False
    for i in range(len(sequence)):
        this = sequence[i]
        if this.time == "" or float(this.time) == 0:
            continue
        events.append(this)
        if i < len(sequence)-1:
            next = sequence[i+1]
            if float(this.time) < float(next.time):
                events.append(event("<"))
            if float(this.time) > float(next.time):
                events.append(event(">"))
                out_of_order = True
    if out_of_order and verbose:
        sys.stderr.write("%s: has some timestamps out of order.\n" % name)
    if "events" in stages:
        print("Event list:")
        for e in events:
            print(e)
    # Now group events into bursts
    bursts = []
    current = []
    for e in events + [event('<')]:
        if e.tag == '<':
            bursts.append(tuple(current))
            current = []
        else:
            current.append(e)
    if "bursts" in stages:
        print("Burst list:")
        for burst in bursts:
            print(burst)
    # We need 4 cycles because the first and last might be incomplete.
    if tags(events).count("<") < 4:
        sys.stderr.write("%s: has fewer than 4 cycles.\n" % name)
        return
    # First try at detecting a regular cycle
    unequal = False
    for i in range(len(bursts)-1):
        if tags(bursts[i]) != tags(bursts[i+1]):
            unequal = True
            break
    if not unequal:
        # All bursts looked the same
        regular = True
    else:
        # Trim off first and last bursts, which are likely incomplete.
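        # The leading and trailing bursts may be truncated by where logging
        # started and stopped, so comparing them against interior bursts
        # could make a regular device look variable-cycle; drop them before
        # re-running the pairwise comparison.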
        bursts = bursts[1:-1]
        if "trim" in stages:
            print("After trimming:")
            for burst in bursts:
                print(burst)
        # Now the actual clique analysis
        unequal = False
        for i in range(len(bursts)-1):
            if tags(bursts[i]) != tags(bursts[i+1]):
                unequal = True
                break
        if not unequal:
            regular = True
    # Should know now if cycle is regular
    if regular:
        if not suppress_regular:
            print("%s: has a regular cycle %s."
                  % (name, " ".join(tags(bursts[0]))))
    else:
        # If it was not the case that all cycles matched, then we need
        # a minimum of 6 cycles because the first and last might be
        # incomplete, and we need at least 4 cycles in the middle to
        # have two full ones on split-cycle devices like old Garmins.
        if tags(events).count("<") < 6:
            sys.stderr.write("%s: variable-cycle log has fewer than 6 cycles.\n"
                             % name)
            return
        if verbose > 0:
            print("%s: has a split or variable cycle." % name)
        cycle_enders = []
        for burst in bursts:
            if burst[-1].tag not in cycle_enders:
                cycle_enders.append(burst[-1].tag)
        if len(cycle_enders) == 1:
            if not suppress_regular:
                print("%s: has a fixed end-of-cycle sentence %s."
                      % (name, cycle_enders[0]))
        else:
            print("%s: has multiple cycle-enders %s."
                  % (name, " ".join(cycle_enders)))
        # Sanity check
        pathological = []
        for ender in cycle_enders:
            for burst in bursts:
                if (ender in tags(burst)
                        and ender != burst[-1].tag
                        and ender not in pathological):
                    pathological.append(ender)
        if pathological:
            print("%s: cycle-enders %s also occur in mid-cycle!"
                  % (name, " ".join(pathological)))


if __name__ == "__main__":
    stages = ""
    try:
        (options, arguments) = getopt.getopt(sys.argv[1:], "d:jsv")
        for (switch, val) in options:
            if switch == '-d':      # Debug
                stages = val
            elif switch == '-j':    # Interpret JSON, not NMEA
                parse_json = True
            elif switch == '-v':    # Verbose
                verbose += 1
            elif switch == '-s':    # Suppress logs with no problems
                suppress_regular = True
    except getopt.GetoptError as msg:
        print("cycle_analyzer: " + str(msg))
        raise SystemExit(1)

    try:
        if arguments:
            for filename in arguments:
                fp = open(filename, 'rb')
                try:
                    sequence = extract_timestamped_sentences(
                        fp, json_parse=parse_json)
                    analyze(sequence, filename)
                except analyze_error as e:
                    if (filename.endswith(".log")
                            and os.path.exists(filename + ".chk")):
                        fp2 = open(filename + ".chk", "rb")
                        try:
                            sequence = extract_timestamped_sentences(
                                fp2, json_parse=True)
                            analyze(sequence, filename + ".chk")
                        finally:
                            fp2.close()
                    else:
                        print(repr(e), file=sys.stderr)
                fp.close()
        else:
            sequence = extract_timestamped_sentences(
                sys.stdin, json_parse=parse_json)
            analyze(sequence, "standard input")
    except analyze_error as e:
        print(repr(e), file=sys.stderr)
        raise SystemExit(1)
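# Illustrative walk-through (hypothetical tags and timestamps, not taken from
# any real log): a device whose per-second output includes GGA, GLL and RMC
# (all sentences this tool extracts timestamps from) reduces to the event
# sequence
#     GGA:015904 GLL:015904 RMC:015904 < GGA:015905 GLL:015905 RMC:015905 ...
# where '<' marks each timestamp increase.  Splitting on '<' yields one burst
# per cycle; every burst carries the same tag list, so the report would read
# something like "example.log: has a regular cycle GGA GLL RMC."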