#!/usr/bin/env python # # gpsprobe -- collect send-cycle and performance statistics on a GPS from math import * import time, os import gps, gpsd, curses.ascii class Baton: "Ship progress indication to stderr." def __init__(self, prompt, endmsg=None): self.stream = sys.stderr self.stream.write(prompt + "... \010") self.stream.flush() self.count = 0 self.endmsg = endmsg self.time = time.time() return def twirl(self, ch=None): if self.stream is None: return if ch: self.stream.write(ch) else: self.stream.write("-/|\\"[self.count % 4]) self.stream.write("\010") self.count = self.count + 1 self.stream.flush() return def end(self, msg=None): if msg == None: msg = self.endmsg if self.stream: self.stream.write("...(%2.2f sec) %s.\n" % (time.time() - self.time, msg)) return def looks_like_NMEA(data): if data.find("$GP") == -1: return False #sys.stderr.write("[%s]" % data) # It's OK to have leading garbage, but not trailing garbage. # Leading garbage may just mean the device hasn't settled yet. # Trailing garbage means that the data accidentally looked like # NMEA or that old data that really was NMEA happened to be sitting # in the TTY buffer unread, but the new data we read is not # sentences. while not (curses.ascii.isprint(data[0]) or curses.ascii.isspace(data[0])): data = data[1:] if not data: return False #sys.stderr.write("(%s)" % data) binary = filter(lambda x: not (curses.ascii.isprint(x) or curses.ascii.isspace(x)), data) #sys.stderr.write("{"+repr(data)+"}") return not binary def looks_like_SiRF(data): header = data.find("\xa0\xa2") if header == -1: return False # We need a position/velocity/time message, SiRF type 0x02 if len(data) < header+5 or data[header+4] != chr(02): return False return True triggers = { "PRWIZCH": "# This GPS has a Rockwell Zodiac chipset.\n" } SIRF = "# This GPS probably has a SiRF-II or Evermore chipset.\n" GE301 = "# GPVTG format indicates NMEA version >= 3.01.\n" if __name__ == '__main__': import sys, getopt fixes = [] # Process options (options, arguments) = getopt.getopt(sys.argv[1:], "b:hn:") await = 100; device="/dev/gps" bpslist = (4800, 9600, 19200, 38400,) for (switch, val) in options: if (switch == '-b'): bpslist = (int(val),) elif (switch == '-n'): await = int(val) elif (switch == '-h'): sys.stderr.write("usage: gpsprobe [-h] [-n samplecount] [-b bps] [device]\n") if arguments: device = arguments[0] intervals = {} last_seen = {} notifications = [] last_command = None def roundoff(n): # Round a time to hundredths of a second return round(n*100) / 100.0 def register(trait): if (trait) not in notifications: notifications.append(trait) def count(sentence): global intervals, last_seen, last_command baton.twirl() # Toss out everything that doesn't look like well-formed NMEA fields = sentence.split(",") leader = fields[0] if leader and leader[0] == '$': leader = leader[1:] else: return # Throw out everything but the leader in each GPGSV group if leader == "GPGSV" and last_command == "GPGSV": return last_command = leader # Record timings now = time.time() if not leader in intervals: intervals[leader] = [] if leader in last_seen: intervals[leader].append(roundoff(now - last_seen[leader])) last_seen[leader] = now # Watch for trigger strings for string in triggers.keys(): if sentence.find(string) > -1: register(triggers[string]) if leader == "GPVTG": if fields[2] == 'T': register(GE301) else: register("# GPVTG format indicates NMEA version < 3.01.\n") if leader == "GPRMC": if len(fields) > 12 and fields[12] in "ADEMSN": register("# GPRMC format indicates NMEA version >= 2.3.\n") else: register("# GPRMC format indicates NMEA version < 2.3.\n") try: # Step one: Check that we have read permission on the device if not os.access(device, os.R_OK|os.W_OK): sys.stderr.write(device + " nonexistent or inaccessible.\n") sys.exit(0) # Step two: Open and sync up with the device for baudrate in bpslist: dev = gpsd.gpsd(device=device, bps=baudrate) # First, acquire the device baton = Baton("Trying %s at %dbps" % (device, baudrate)) starttime = time.time() while True: baton.twirl() try: dev.activate() break except IOError, e: time.sleep(1) baton.end("opened") # Now check to see if we get recognizable NMEA from it baton = Baton("Looking for NMEA sentences") starttime = time.time() time.sleep(2) # Give the device time to settle data = dev.rawread() sys.stderr.write("%d chars seen" % len(data)) baton.twirl() if data and looks_like_NMEA(data): baton.end("found NMEA") protocol = "# This is a NMEA device.\n" \ "# Use gpsd's default -T n driver." break elif data and looks_like_SiRF(data): sys.stderr.write("found SiRF data header...switching...") dev.rawwrite(SiRF().to_NMEA(baudrate)) time.sleep(2) data = dev.rawread() sys.stderr.write("%d more chars seen" % len(data)) baton.twirl() if data and looks_like_NMEA(data): baton.end("switch to NMEA successful") protocol = "# This is a SiRF device that can be switched to NMEA." break else: baton.end("no protocol recognized") dev.close() else: sys.stderr.write("Couldn't acquire the device.\n") sys.exit(0) # Step three: Gather data starttime = time.time() dev.set_raw_hook(count) sys.stderr.write("Gathering %d sentences will probably take about %d seconds.\n"%(await, await/3,)) baton = Baton("Looking for first fix", "done") countdown = await while countdown > 0: status = dev.poll() if status > 0: if dev.status > gps.STATUS_NO_FIX and dev.mode > gps.MODE_NO_FIX: if not fixes: fixtime = (time.time()-starttime,) baton.end("got it") baton = Baton("Gathering fixes") fixes.append((dev.latitude, dev.longitude)) countdown -= 1 if '.' in dev.utc: register(SIRF) baton.end() del last_seen # Step three: get command frequencies and the basic send cycle time frequencies = {} for (key, interval_list) in intervals.items(): frequencies[key] = {} for interval in interval_list: frequencies[key][interval] = frequencies[key].get(interval, 0) + 1 # filter out noise for key in frequencies: distribution = frequencies[key] for interval in distribution.keys(): if distribution[interval] < 2: del distribution[interval] cycles = {} for key in frequencies: distribution = frequencies[key] if len(frequencies[key].values()) == 1: # The value is uniqe after filtering cycles[key] = distribution.keys()[0] else: # Compute the mode maxfreq = 0 for (interval, frequency) in distribution.items(): if distribution[interval] > maxfreq: cycles[key] = interval maxfreq = distribution[interval] print "# This is a gnuplot script generated by gpsprobe at %s\n" % time.asctime() print protocol print "# First fix in %f seconds." % fixtime for key in cycles: if len(frequencies[key].values()) == 1: if cycles[key] == 1: print "# %s: is emitted once a second." % key else: print "# %s: is emitted once every %d seconds." % (key, cycles[key]) else: if cycles[key] == 1: print "# %s: is probably emitted once a second." % key else: print "# %s: is probably emitted once every %d seconds." % (key, cycles[key]) sendcycle = min(*cycles.values()) if sendcycle == 1: print "# Send cycle is once per second." else: print "# Send cycle is once per %d seconds." % sendcycle # SiRF-II speaks 2.2, but with the 3.01 VTG format if SIRF in notifications and GE301 in notifications: notifications.remove(GE301) # Step four: print out registered traits sys.stdout.write("".join(notifications) + "\n") # Step five: run an empirical check on uncertainty of position. if len(fixes) == 0: print "# No fixes collected, can't estimate accuracy." else: centroid = (sum(map(lambda x:x[0], fixes))/len(fixes), sum(map(lambda x:x[1], fixes))/len(fixes)) # Sort fixes by distance from centroid def d(a, b): return sqrt((a[0] - b[0])**2 + (a[1] - b[1])**2) fixes.sort(lambda x, y: cmp(d(centroid, x), d(centroid, y))) # Compute CEP(50%) cep_meters = gps.EarthDistance(centroid, fixes[len(fixes)/2]) # Convert fixes to offsets from centroid in meters recentered = map(lambda fix: gps.MeterOffset(centroid, fix), fixes) if centroid[0] < 0: latstring = "%fS" % -centroid[0] elif centroid[0] == 0: latstring = "0" else: latstring = "%fN" % centroid[0] if centroid[1] < 0: lonstring = "%fW" % -centroid[1] elif centroid[1] == 0: lonstring = "0" else: lonstring = "%fE" % centroid[1] sys.stdout.write("set autoscale\n") sys.stdout.write('set key below\n') sys.stdout.write('set key title "%s"\n' % time.asctime()) sys.stdout.write('set size ratio -1\n') sys.stdout.write('set style line 3 pt 2 # Looks good on X11\n') sys.stdout.write('set xlabel "Meters east from %s"\n' % lonstring) sys.stdout.write('set ylabel "Meters north from %s"\n' % latstring) sys.stdout.write('cep=%f\n' % d((0,0), recentered[len(fixes)/2])) sys.stdout.write('set parametric\n') sys.stdout.write('set trange [0:2*pi]\n') sys.stdout.write('cx(t, r) = sin(t)*r\n') sys.stdout.write('cy(t, r) = cos(t)*r\n') sys.stdout.write('chlen = cep/20\n') sys.stdout.write("set arrow from -chlen,0 to chlen,0 nohead\n") sys.stdout.write("set arrow from 0,-chlen to 0,chlen nohead\n") sys.stdout.write('plot cx(t, cep),cy(t, cep) title "CEP (50%%) = %f meters", "-" using 1:2 with points ls 3 title "%d GPS fixes"\n' % (cep_meters, len(fixes))) sys.stdout.write("#\n") sys.stdout.write("# Lat Lon\n") for (lat, lon) in recentered: sys.stdout.write(" %f %f\n" % (lat, lon)) sys.stdout.write("end\n") except KeyboardInterrupt: print "Aborted."