#!/usr/bin/env python # # gpsprobe -- collect send-cycle and performance statistics on a GPS from math import * import time, os import gps, gpsd class Baton: "Ship progress indication to stdout or stserr, whichever isn't redirected." def __init__(self, prompt, endmsg=None): if os.isatty(1): self.stream = sys.stdout elif os.isatty(2): self.stream = sys.stderr else: self.stream = None if self.stream: self.stream.write(prompt + "... \010") self.stream.flush() self.count = 0 self.endmsg = endmsg self.time = time.time() return def twirl(self, ch=None): if self.stream is None: return if ch: self.stream.write(ch) else: self.stream.write("-/|\\"[self.count % 4]) self.stream.write("\010") self.count = self.count + 1 self.stream.flush() return def end(self, msg=None): if msg == None: msg = self.endmsg if self.stream: self.stream.write("...(%2.2f sec) %s.\n" % (time.time() - self.time, msg)) return triggers = { "PRWIZCH": "This GPS has a Rockwell Zodiac chipset.\n" } if __name__ == '__main__': import sys, getopt fixes = [] # Process options (options, arguments) = getopt.getopt(sys.argv[1:], "n:p:") await = 100; device="/dev/gps" for (switch, val) in options: if (switch == '-n'): await = int(val) elif (switch == '-p'): device = val intervals = {} last_seen = {} notifications = [] last_command = None def roundoff(n): # Round a time to hundredths of a second return round(n*100) / 100.0 def register(trait): if ("# " + trait) not in notifications: notifications.append("# " + trait) def count(sentence): global intervals, last_seen, last_command baton.twirl() # Toss out everything that doesn't look like well-formed NMEA fields = sentence.split(",") leader = fields[0] if leader and leader[0] == '$': leader = leader[1:] else: return # Throw out everything but the leader in each GPGSV group if leader == "GPGSV" and last_command == "GPGSV": return last_command = leader # Record timings now = time.time() if not leader in intervals: intervals[leader] = [] if leader in last_seen: intervals[leader].append(roundoff(now - last_seen[leader])) last_seen[leader] = now # Watch for trigger strings for string in triggers.keys(): if sentence.find(string) > -1: register(triggers[string]) if leader == "GPVTG": if fields[2] == 'T': register("GPVTG format indicates NMEA version >= 3.01.\n") else: register("GPVTG format indicates NMEA version < 3.01.\n") if leader == "GPRMC": if len(fields) > 12 and fields[12] in "ADEMSN": register("GPRMC format indicates NMEA version >= 2.3.\n") else: register("GPRMC format indicates NMEA version < 2.3.\n") try: # Step one: Gather data dev = gpsd.gpsd(device=device) baton = Baton("Waiting for %s" % device) while True: baton.twirl() try: dev.activate() break except IOError, e: time.sleep(1) baton.end("acquired") starttime = time.time() dev.set_raw_hook(count) sys.stderr.write("Gathering %d sentences will probably take about %d seconds.\n"%(await, await/3,)) baton = Baton("Looking for first fix", "done") countdown = await while countdown > 0: status = dev.poll() if status > 0: if dev.status > gps.STATUS_NO_FIX and dev.mode > gps.MODE_NO_FIX: if not fixes: fixtime = (time.time()-starttime,) baton.end("got it") baton = Baton("Gathering fixes") fixes.append((dev.latitude, dev.longitude)) countdown -= 1 if '.' in dev.utc: register("This GPS probably has a SiRF-II chipset.\n") baton.end() del last_seen # Step two: get command frequencies and the basic send cycle time frequencies = {} for (key, interval_list) in intervals.items(): frequencies[key] = {} for interval in interval_list: frequencies[key][interval] = frequencies[key].get(interval, 0) + 1 # filter out noise for key in frequencies: distribution = frequencies[key] for interval in distribution.keys(): if distribution[interval] < 2: del distribution[interval] cycles = {} for key in frequencies: distribution = frequencies[key] if len(frequencies[key].values()) == 1: # The value is uniqe after filtering cycles[key] = distribution.keys()[0] else: # Compute the mode maxfreq = 0 for (interval, frequency) in distribution.items(): if distribution[interval] > maxfreq: cycles[key] = interval maxfreq = distribution[interval] print "# This is a gnuplot script generated by gpsprobe at %s\n" % time.asctime() print "# First fix in %f seconds." % fixtime for key in cycles: if len(frequencies[key].values()) == 1: if cycles[key] == 1: print "# %s: is emitted once a second." % key else: print "# %s: is emitted once every %d seconds." % (key, cycles[key]) else: if cycles[key] == 1: print "# %s: is probably emitted once a second." % key else: print "# %s: is probably emitted once every %d seconds." % (key, cycles[key]) sendcycle = min(*cycles.values()) if sendcycle == 1: print "# Send cycle is once per second." else: print "# Send cycle is once per %d seconds." % sendcycle # Step three: print out registered traits sys.stdout.write("".join(notifications) + "\n") # Step four: run an empirical check on uncertainty of position. if len(fixes) == 0: print "# No fixes collected, can't estimate accuracy." else: centroid = (sum(map(lambda x:x[0], fixes))/len(fixes), sum(map(lambda x:x[1], fixes))/len(fixes)) # Sort fixes by distance from centroid def d(a, b): return sqrt((a[0] - b[0])**2 + (a[1] - b[1])**2) fixes.sort(lambda x, y: cmp(d(centroid, x), d(centroid, y))) # Compute CEP(50%) cep_meters = gps.EarthDistance(centroid, fixes[len(fixes)/2]) # Convert fixes to offsets from centroid in meters recentered = map(lambda fix: gps.MeterOffset(centroid, fix), fixes) if centroid[0] < 0: latstring = "%fS" % -centroid[0] elif centroid[0] == 0: latstring = "0" else: latstring = "%fN" % centroid[0] if centroid[1] < 0: lonstring = "%fW" % -centroid[1] elif centroid[1] == 0: lonstring = "0" else: lonstring = "%fE" % centroid[1] sys.stdout.write("set autoscale\n") sys.stdout.write('set key below\n') sys.stdout.write('set key title "%s"\n' % time.asctime()) sys.stdout.write('set size ratio -1\n') sys.stdout.write('set xlabel "Meters east from %s"\n' % lonstring) sys.stdout.write('set ylabel "Meters north from %s"\n' % latstring) sys.stdout.write('cep=%f\n' % d((0,0), recentered[len(fixes)/2])) sys.stdout.write('set parametric\n') sys.stdout.write('set trange [0:2*pi]\n') sys.stdout.write('cx(t, r) = sin(t)*r\n') sys.stdout.write('cy(t, r) = cos(t)*r\n') sys.stdout.write('chlen = cep/20\n') sys.stdout.write("set arrow from -chlen,0 to chlen,0 nohead\n") sys.stdout.write("set arrow from 0,-chlen to 0,chlen nohead\n") sys.stdout.write('plot "-" using 1:2 title "%d GPS fixes", cx(t, cep),cy(t, cep) title "CEP (50%%) = %f meters"\n' % (len(fixes), cep_meters)) sys.stdout.write("#\n") sys.stdout.write("# Lat Lon\n") for (lat, lon) in recentered: sys.stdout.write(" %f %f\n" % (lat, lon)) sys.stdout.write("end\n") except KeyboardInterrupt: print "Aborted."