#!/usr/bin/env python # # gpsprobe -- collect send-cycle and performance wstatistics on a GPS from math import * import time, os import gps, gpsd class Baton: "Ship progress indication to stdout or stserr, whichever isn't redirected." def __init__(self, prompt, endmsg=None): if os.isatty(1): self.stream = sys.stdout elif os.isatty(2): self.stream = sys.stderr else: self.stream = None if self.stream: self.stream.write(prompt + "... \010") self.stream.flush() self.count = 0 self.endmsg = endmsg self.time = time.time() return def twirl(self, ch=None): if self.stream is None: return if ch: self.stream.write(ch) else: self.stream.write("-/|\\"[self.count % 4]) self.stream.write("\010") self.count = self.count + 1 self.stream.flush() return def end(self, msg=None): if msg == None: msg = self.endmsg if self.stream: self.stream.write("...(%2.2f sec) %s.\n" % (time.time() - self.time, msg)) return triggers = { "PRWIZCH": "This GPS has a Rockweell Zodiac chipset.\n" } if __name__ == '__main__': import sys, getopt fixes = [] outfile = "gpsprobe.gnuplot" # Process options (options, arguments) = getopt.getopt(sys.argv[1:], "n:o:p:") await = 100; device="/dev/gps" for (switch, val) in options: if (switch == '-n'): await = int(val) elif (switch == '-o'): outfile = val elif (switch == '-p'): device = val intervals = {} last_seen = {} notifications = [] last_command = None def roundoff(n): # Round a time to hundredths of a second return round(n*100) / 100.0 def count(sentence): global intervals, last_seen, last_command baton.twirl() # Toss out everything that doesn't look like well-formed NMEA leader = sentence.split(",")[0] if leader and leader[0] == '$': leader = leader[1:] else: return # Throw out everything but the leader in each GPGSV group if leader == "GPGSV" and last_command == "GPGSV": return last_command = leader # Record timings now = time.time() if not leader in intervals: intervals[leader] = [] if leader in last_seen: intervals[leader].append(roundoff(now - last_seen[leader])) last_seen[leader] = now # Watch for trigger strings for string in triggers.keys(): if sentence.find(string) > -1: notifications.append(triggers[string]) try: # Step one: Gather data dev = gpsd.gpsd(device=device) dev.set_raw_hook(count) try: dev.activate() except IOError: print "GPS is not readable." sys.exit(0) print "Data-gathering will probably take about %d seconds."%(await/3,) baton = Baton("Gathering NMEA data", "done") countdown = await while countdown > 0: status = dev.poll() if status > 0: countdown -= 1 if dev.status > gps.STATUS_NO_FIX and dev.mode > gps.MODE_NO_FIX: fixes.append((dev.latitude, dev.longitude)) baton.end() del last_seen # Step two: get command frequencies and the basic send cycle time frequencies = {} for (key, interval_list) in intervals.items(): frequencies[key] = {} for interval in interval_list: frequencies[key][interval] = frequencies[key].get(interval, 0) + 1 # filter out noise for key in frequencies: distribution = frequencies[key] for interval in distribution.keys(): if distribution[interval] < 2: del distribution[interval] cycles = {} for key in frequencies: distribution = frequencies[key] if len(frequencies[key].values()) == 1: # The value is uniqe after filtering cycles[key] = distribution.keys()[0] else: # Compute the mode maxfreq = 0 for (interval, frequency) in distribution.items(): if distribution[interval] > maxfreq: cycles[key] = interval maxfreq = distribution[interval] for key in cycles: if len(frequencies[key].values()) == 1: if cycles[key] == 1: print "%s: is emitted once a second." % key else: print "%s: is emitted once every %d seconds." % (key, cycles[key]) else: if cycles[key] == 1: print "%s: is probably emitted once a second." % key else: print "%s: is probably emitted once every %d seconds." % (key, cycles[key]) sendcycle = min(*cycles.values()) if sendcycle == 1: print "Send cycle is once per second." else: print "Send cycle is once per %d seconds." % sendcycle # Step three: run an empirical check on uncertainty of position. if len(fixes) == 0: print "No fixes collected, can't estimate accuracy." else: centroid = (sum(map(lambda x:x[0], fixes))/len(fixes), sum(map(lambda x:x[1], fixes))/len(fixes)) print "Centroid of %d fixes is (%f, %f)" % (len(fixes), centroid[0], centroid[1]) # Compute probable error on the assumption that the centroid # is the best approximation we'll get. maxdev = 0 for (lat, lon) in fixes: d = sqrt((lat - centroid[0])**2 + (lon - centroid[1])**2) if d > maxdev: maxdev = d fartherest = (lat, lon) #print "Fix most distant from centroid is (%f, %f)" % (fartherest[0], fartherest[1]) #print "Separation is %f degrees longitude, %f degrees latitude" % \ # (abs(fartherest[0] - centroid[0]), abs(fartherest[1] - centroid[1])) #print "Error box is %f degrees longitude, %f degrees latitude" % \ # (2*abs(fartherest[0] - centroid[0]), 2*abs(fartherest[1] - centroid[1])) print "Probable error is %f meters." % gps.EarthDistance(centroid, fartherest) sys.stdout.write("Writing fix scattergram to %s..." % outfile) fp = open(outfile,"w") fp.write("# This is a gnuplot script generated by gpsprobe.\n") fp.write("set autoscale\n") fp.write('set format x "%2.6f"\n') fp.write('set format y "%2.6f"\n') fp.write('set key below\n') fp.write('plot "-" using 1:2 notitle\n') fp.write("#\n") fp.write("# Lat Lon\n") for (lat, lon) in fixes: fp.write(" %f %f 1\n" % (lat, lon)) fp.write("end\n") fp.write("pause -1\n") fp.close() sys.stdout.write("done.\n") except KeyboardInterrupt: print "Aborted."