#!/usr/bin/env python
#
# gpsfake -- test harness for gpsd
#
# Simulates a GPS, playing back a logfile
#
# Written to run unchanged under Python 2.6+ and Python 3: packets are
# handled as bytes from the logfile all the way to the pty.
import getopt
import os
import pty
import signal
import socket
import string
import sys
import tempfile
import termios
import time

# The gps client module is imported inside main(), and only when -p is
# given, because pipe mode is the only code path that references it.

USAGE = "usage: gpsfake [-h] [-l] [-n] [-o options] [-p] [-s speed] [-v] [-c cycle] logfile\n"

try:
    _read_line = raw_input      # Python 2
except NameError:
    _read_line = input          # Python 3


class Baton:
    "Ship progress indication to stderr."

    def __init__(self, prompt, endmsg=None):
        """Write the prompt to stderr and start the elapsed-time clock.

        prompt -- text written immediately, followed by "... " and a backspace.
        endmsg -- default message for end() when it is called without one.
        """
        self.stream = sys.stderr
        self.stream.write(prompt + "... \010")
        self.stream.flush()
        self.count = 0
        self.endmsg = endmsg
        self.time = time.time()

    def twirl(self, ch=None):
        """Advance the indicator by one step.

        With ch, write ch as-is; otherwise write the next of the four
        spinner characters followed by a backspace so the following step
        overwrites it.  Does nothing when the stream is None.
        """
        if self.stream is None:
            return
        if ch:
            self.stream.write(ch)
        else:
            self.stream.write("-/|\\"[self.count % 4])
            self.stream.write("\010")
        self.count = self.count + 1
        self.stream.flush()

    def end(self, msg=None):
        "Write the elapsed time and msg (default: the endmsg given at creation)."
        if msg is None:
            msg = self.endmsg
        if self.stream:
            self.stream.write("...(%2.2f sec) %s.\n" % (time.time() - self.time, msg))


def packet_get(fp):
    """Grab a packet. Unlike the daemon's state machine, this assumes no noise.

    fp must be a file object opened in binary mode.  Returns the raw bytes
    of the next packet, or None at end of file.  A packet is either a
    '$'-led line (NMEA) or a 0xa0 0xa2-led SiRF frame.

    Raises ValueError on an unrecognized leader or a SiRF header that is
    cut off before its two length bytes.
    """
    first = fp.read(1)
    if not first:
        return None
    elif first == b'$':                         # NMEA packet
        return b"$" + fp.readline()
    second = fp.read(1)
    if first == b'\xa0' and second == b'\xa2':  # SiRF packet
        third = fp.read(1)
        fourth = fp.read(1)
        if not third or not fourth:
            # ord() on an empty read would raise TypeError, which the
            # caller's ValueError handler would not catch.
            raise ValueError("truncated SiRF packet header")
        length = (ord(third) << 8) | ord(fourth)
        # The original reads 4 bytes beyond the declared length; presumably
        # the 2-byte checksum and 2-byte trailer.
        return b"\xa0\xa2" + third + fourth + fp.read(length + 4)
    else:
        raise ValueError("unknown packet type, leader %s, (0x%x)" % (first, ord(first)))


def _load_packets(logfile):
    "Return the list of packets in logfile, skipping leading '#' comment lines."
    packets = []
    with open(logfile, "rb") as logfp:
        # Skip the comment header
        while True:
            first = logfp.read(1)
            if first == b"#":
                logfp.readline()
            else:
                if first:
                    logfp.seek(-1, 1)   # Must be a real file, not stdin
                break
        # Grab the packets
        while True:
            packet = packet_get(logfp)
            if not packet:
                break
            packets.append(packet)
    return packets


def _configure_tty(fd, baud):
    "Put the tty open on fd into raw 8-bit mode at the given termios baud constant."
    raw = termios.tcgetattr(fd)
    raw[0] = 0                                      # iflag
    raw[1] = termios.ONLCR                          # oflag
    raw[2] &= ~(termios.PARENB | termios.CRTSCTS)   # cflag
    raw[2] |= (termios.CSIZE & termios.CS8)         # cflag
    raw[2] |= termios.CREAD | termios.CLOCAL        # cflag
    raw[3] = 0                                      # lflag
    raw[4] = raw[5] = baud                          # ispeed, ospeed
    termios.tcsetattr(fd, termios.TCSANOW, raw)


def _as_text(data):
    "Return packet bytes as a native string suitable for display."
    if isinstance(data, str):
        return data
    return data.decode("latin-1")


def main():
    """Parse the command line, start gpsd if requested, and replay the log.

    Options:
      -c cycle    delay in seconds before each packet is written (default 1)
      -h          print usage and exit
      -l          print a legend line for every packet written
      -n          do not spawn gpsd; prompt the user to launch it instead
      -o options  extra options inserted into the gpsd command line
      -p          pipe mode: 0.1 s cycle, one pass through the log, daemon
                  output forwarded to stdout through the gps client module
      -s speed    baud rate for the pty (default 4800)
      -v          verbose
    """
    try:
        (options, arguments) = getopt.getopt(sys.argv[1:], "c:hlno:ps:v")
    except getopt.GetoptError as err:
        sys.stderr.write("gpsfake: %s\n" % err)
        sys.stderr.write(USAGE)
        sys.exit(1)
    cycle = 1
    speed = 4800
    spawn = True
    linedump = False
    pipe = False
    verbose = False
    doptions = ""
    for (switch, val) in options:
        if switch == '-c':
            cycle = float(val)
        elif switch == '-l':
            linedump = True
        elif switch == '-n':
            spawn = False
        elif switch == '-o':
            doptions = val
        elif switch == '-p':
            pipe = True
            cycle = 0.1
        elif switch == '-s':
            speed = int(val)
        elif switch == '-v':
            verbose = True
        elif switch == '-h':
            sys.stderr.write(USAGE)
            sys.exit(0)
    if not arguments:
        sys.stderr.write("gpsfake: no logfile specified.\n")
        sys.stderr.write(USAGE)
        sys.exit(1)
    logfile = arguments[0]

    if pipe:
        # Imported before anything is spawned so a missing client module
        # cannot leave a stray daemon behind.
        import gps

    try:
        (master_fd, slave_fd) = pty.openpty()
    except OSError:
        sys.stderr.write("gpsfake: can't open pty.\n")
        sys.exit(1)
    slave = os.ttyname(slave_fd)

    pid = 0
    # NOTE(review): predictable pidfile name in /tmp; gpsd writes it, so a
    # safer location would have to be coordinated with the daemon.
    pidfile = "/tmp/gpsfake_pid-%s" % os.getpid()
    spawncmd = "gpsd -N -P %s %s %s" % (pidfile, doptions, slave)
    spawncmd = spawncmd.strip()
    if not spawn:
        _read_line("gpsfake: launch '%s' and press enter..." % spawncmd)
    elif os.system(spawncmd + " &"):
        sys.stderr.write("gpsfake: '%s' failed.\n" % spawncmd)
        sys.exit(1)
    else:
        time.sleep(1)   # Time for pidfile to get written.
        if not pipe or verbose:
            sys.stderr.write("gpsfake: '%s' launch OK.\n" % spawncmd)
        try:
            with open(pidfile) as fp:
                pid = int(fp.read())
        except (IOError, ValueError):
            sys.stderr.write("gpsfake: can't read gpsd pid from %s.\n" % pidfile)
            sys.exit(1)
        os.remove(pidfile)

    try:
        try:
            sentences = _load_packets(logfile)
        except ValueError as msg:
            sys.stderr.write(repr(msg) + "\n")
            raise SystemExit(1)
        if not sentences:
            sys.stderr.write("gpsfake: no packets found in %s.\n" % logfile)
            raise SystemExit(1)

        # Look at the first packet to grok the GPS type
        leader = sentences[0][:1]
        if leader == b'$':
            packtype = "NMEA"
            legend = "gpsfake: line %d "
            textual = True
        elif leader == b'\xa0':
            packtype = "SiRF-II binary"
            legend = "gpsfake: packet %d"
            textual = False
        else:
            sys.stdout.write("gpsfake: unknown log type (not NMEA or SiRF) can't handle it!\n")
            return
        if verbose:
            sys.stderr.write("gpsfake: interpreting as %s packets\n" % packtype)

        # Feed the daemon
        try:
            # Look the constant up by name rather than eval()ing user input.
            baud = getattr(termios, "B%d" % speed)
        except AttributeError:
            sys.stdout.write("gpsfake: illegal baud rate.\n")
            sys.exit(1)
        # The slave side is already open as slave_fd, so configure that
        # descriptor directly instead of opening the device a second time.
        _configure_tty(slave_fd, baud)

        baton = None
        if pipe:
            try:
                session = gps.gps()
            except socket.error:
                sys.stderr.write("gpsfake: gpsd unreachable.\n")
                raise SystemExit(1)
            session.query("w+r+")
            session.set_thread_hook(lambda x: sys.stdout.write(x))
            baton = Baton("Processing %s" % logfile, "done")

        try:
            total = len(sentences)
            i = 0
            while True:
                index = i % total
                if index == 0:
                    # Pipe mode makes exactly one pass; otherwise loop forever.
                    if pipe and i > 0:
                        break
                    if not pipe:
                        sys.stderr.write("gpsfake: log cycle begins.\n")
                time.sleep(cycle)
                sentence = sentences[index]
                if linedump:
                    if textual:
                        ml = _as_text(sentence.strip())
                    else:
                        ml = ""
                    sys.stdout.write(legend % (index + 1) + ml + "\n")
                os.write(master_fd, sentence)
                if pipe:
                    baton.twirl()
                if spawn:
                    # Signal 0 only checks that the daemon still exists.
                    try:
                        os.kill(pid, 0)
                    except OSError:
                        pid = 0
                        sys.stdout.write("gpsfake: gpsd is gone.\n")
                        break
                i += 1
            if pipe:
                baton.end()
        except KeyboardInterrupt:
            pass
    finally:
        # Shut down the daemon we started, unless it has already gone away.
        if spawn and pid:
            os.kill(pid, signal.SIGTERM)


if __name__ == "__main__":
    main()