#!/usr/bin/env python # # gpsprobe -- collect send-cycle and performance statistics on a GPS from math import * import time, os import gps, gpsd, curses.ascii class Baton: "Ship progress indication to stderr." def __init__(self, prompt, endmsg=None): self.stream = sys.stderr self.stream.write(prompt + "... \010") self.stream.flush() self.count = 0 self.endmsg = endmsg self.time = time.time() return def twirl(self, ch=None): if self.stream is None: return if ch: self.stream.write(ch) else: self.stream.write("-/|\\"[self.count % 4]) self.stream.write("\010") self.count = self.count + 1 self.stream.flush() return def end(self, msg=None): if msg == None: msg = self.endmsg if self.stream: self.stream.write("...(%2.2f sec) %s.\n" % (time.time() - self.time, msg)) return def looks_like_NMEA(data): if data.find("$GP") == -1: return False #sys.stderr.write("[%s]" % data) # It's OK to have leading garbage, but not trailing garbage. # Leading garbage may just mean the device hasn't settled yet. # Trailing garbage means that the data accidentally looked like # NMEA or that old data that really was NMEA happened to be sitting # in the TTY buffer unread, but the new data we read is not # sentences. while not (curses.ascii.isprint(data[0]) or curses.ascii.isspace(data[0])): data = data[1:] if not data: return False #sys.stderr.write("(%s)" % data) binary = filter(lambda x: not (curses.ascii.isprint(x) or curses.ascii.isspace(x)), data) #sys.stderr.write("{"+repr(data)+"}") return not binary def looks_like_SiRF(data): header = data.find("\xa0\xa2") if header == -1: return False # We need a position/velocity/time message, SiRF type 0x02 if len(data) < header+5 or data[header+4] != chr(02): return False return True triggers = { "PRWIZCH": "# This GPS has a Rockwell Zodiac chipset.\n" } GE301 = "# GPVTG format indicates NMEA version >= 3.01.\n" if __name__ == '__main__': import sys, getopt fixes = [] # Process options (options, arguments) = getopt.getopt(sys.argv[1:], "hn:") await = 100; device="/dev/gps" for (switch, val) in options: if (switch == '-n'): await = int(val) elif (switch == '-h'): sys.stderr.write("usage: gpsprobe [-h] [-n samplecount] [device]\n") if arguments: device = arguments[0] intervals = {} last_seen = {} notifications = [] last_command = None def roundoff(n): # Round a time to hundredths of a second return round(n*100) / 100.0 def register(trait): if (trait) not in notifications: notifications.append(trait) def count(sentence): global intervals, last_seen, last_command baton.twirl() # Toss out everything that doesn't look like well-formed NMEA fields = sentence.split(",") leader = fields[0] if leader and leader[0] == '$': leader = leader[1:] else: return # Throw out everything but the leader in each GPGSV group if leader == "GPGSV" and last_command == "GPGSV": return last_command = leader # Record timings now = time.time() if not leader in intervals: intervals[leader] = [] if leader in last_seen: intervals[leader].append(roundoff(now - last_seen[leader])) last_seen[leader] = now # Watch for trigger strings for string in triggers.keys(): if sentence.find(string) > -1: register(triggers[string]) if leader == "GPVTG": if fields[2] == 'T': register(GE301) else: register("GPVTG format indicates NMEA version < 3.01.\n") if leader == "GPRMC": if len(fields) > 12 and fields[12] in "ADEMSN": register("GPRMC format indicates NMEA version >= 2.3.\n") else: register("GPRMC format indicates NMEA version < 2.3.\n") try: # Step one: Check that we have read permission on the device if not os.access(device, os.R_OK|os.W_OK): sys.stderr.write(device + " nonexistent or inaccessible.\n") sys.exit(0) # Step two: Open and sync up with the device dev = gpsd.gpsd(device=device) if dev.activate() > -1: sys.stderr.write("Found device at %dbps.\n" % dev.bps) else: sys.stderr.write("Couldn't acquire the device.\n") sys.exit(0) # Step three: Gather data starttime = time.time() dev.set_raw_hook(count) sys.stderr.write("Gathering %d sentences will probably take about %d seconds.\n"%(await, (await*4800)/(3*dev.bps),)) countdown = await baton = Baton("Gathering sentences", "done") while countdown > 0: if dev.poll() > 0 and dev.latlon_stamp.seen(): if not fixes: fixtime = (time.time()-starttime,) fixes.append((dev.latitude, dev.longitude)) baton.twirl() countdown -= 1 baton.end() del last_seen # Step three: get command frequencies and the basic send cycle time frequencies = {} for (key, interval_list) in intervals.items(): frequencies[key] = {} for interval in interval_list: frequencies[key][interval] = frequencies[key].get(interval, 0) + 1 # filter out noise for key in frequencies: distribution = frequencies[key] for interval in distribution.keys(): if distribution[interval] < 2: del distribution[interval] cycles = {} for key in frequencies: distribution = frequencies[key] if len(frequencies[key].values()) == 1: # The value is uniqe after filtering cycles[key] = distribution.keys()[0] else: # Compute the mode maxfreq = 0 for (interval, frequency) in distribution.items(): if distribution[interval] > maxfreq: cycles[key] = interval maxfreq = distribution[interval] print "First fix in %f seconds." % fixtime for key in cycles: if len(frequencies[key].values()) == 1: if cycles[key] == 1: print "%s: is emitted once a second." % key else: print "%s: is emitted once every %d seconds." % (key, cycles[key]) else: if cycles[key] == 1: print "%s: is probably emitted once a second." % key else: print "%s: is probably emitted once every %d seconds." % (key, cycles[key]) sendcycle = min(*cycles.values()) if sendcycle == 1: print "Send cycle is once per second." else: print "Send cycle is once per %d seconds." % sendcycle # Step four: print out registered traits sys.stdout.write("".join(notifications) + "\n") except KeyboardInterrupt: print "Aborted."