#!/usr/bin/env python # # gpsfake -- test harness for gpsd # # Simulates one or more GPSes, playing back logfiles. # Most of the logic for this now lives in gps.fake, # factored out so we can write other test programs with it. # # This file is Copyright (c) 2010 by the GPSD project # BSD terms apply: see the file COPYING in the distribution root for details. import sys, os, time, getopt, socket, random, pty, platform import gps, gps.fake as gpsfake # The "as" pacifies pychecker class Baton: "Ship progress indications to stderr." # By setting this > 1 we reduce the frequency of the twirl # and speed up test runs. Should be relatively prime to the # nunber of baton states, otherwise it will cause beat artifacts # in the twirling. SPINNER_INTERVAL = 11 def __init__(self, prompt, endmsg=None): self.stream = sys.stderr self.stream.write(prompt + "...") if os.isatty(self.stream.fileno()): self.stream.write(" \b") self.stream.flush() self.count = 0 self.endmsg = endmsg self.time = time.time() return def twirl(self, ch=None): if self.stream is None: return if os.isatty(self.stream.fileno()): if ch: self.stream.write(ch) self.stream.flush() elif self.count % Baton.SPINNER_INTERVAL == 0: self.stream.write("-/|\\"[self.count % 4]) self.stream.write("\b") self.stream.flush() self.count = self.count + 1 return def end(self, msg=None): if msg == None: msg = self.endmsg if self.stream: self.stream.write("...(%2.2f sec) %s.\n" % (time.time() - self.time, msg)) return def fakeport(): "Find a port that isn't in use to bind to." # Grab a random port from IANA's unassigned/private range. Not perfect, # but at least less than in 16000 chance of a pair collision. It would # be very hard to get this deterministically correct, since there would # always be a window between port allocation and when the daemon picked # it up. We'd need to do some kind of semaphore, etc., and even that # wouldn't prevent collisions with other apps using the range. return random.randint(49152, 65535) def hexdump(s): rep = "" for c in s: rep += "%02x" % ord(c) return rep def fakehook(linenumber, fakegps): if len(fakegps.testload.sentences) == 0: print >>sys.stderr, "fakegps: no sentences in test load." raise SystemExit, 1 if linenumber % len(fakegps.testload.sentences) == 0: if singleshot and linenumber > 0: return False if progress: baton.twirl('*\b') elif not singleshot: sys.stderr.write("gpsfake: log cycle of %s begins.\n" % fakegps.testload.name) time.sleep(cycle) if linedump and fakegps.testload.legend: ml = fakegps.testload.sentences[linenumber % len(fakegps.testload.sentences)].strip() if not fakegps.testload.textual: ml = hexdump(ml) announce = fakegps.testload.legend % (linenumber % len(fakegps.testload.sentences) + 1) + ml if promptme: raw_input(announce + "? ") else: print announce if progress: baton.twirl() return True if __name__ == '__main__': try: (options, arguments) = getopt.getopt(sys.argv[1:], "1bc:D:fghilm:no:pP:r:s:StTuvx") except getopt.GetoptError, msg: print "gpsfake: " + str(msg) raise SystemExit, 1 port = None progress = False cycle = 0 monitor = "" speed = 4800 linedump = False predump = False pipe = False singleshot = False promptme = False client_init = '?WATCH={"json":true,"nmea":true}' doptions = "" tcp = False udp = False verbose = 0 slow = False for (switch, val) in options: if (switch == '-1'): singleshot = True elif (switch == '-f'): port = fakeport() elif (switch == '-b'): progress = True elif (switch == '-c'): cycle = float(val) elif (switch == '-D'): doptions += " -D " + val elif (switch == '-g'): monitor = "xterm -e gdb -tui --args " elif (switch == '-i'): linedump = promptme = True elif (switch == '-l'): linedump = True elif (switch == '-m'): monitor = val + " " elif (switch == '-n'): doptions += " -n" elif (switch == '-x'): predump = True elif (switch == '-o'): doptions = val elif (switch == '-p'): pipe = True elif (switch == '-P'): port = int(val) elif (switch == '-r'): client_init = val elif (switch == '-s'): speed = int(val) elif (switch == '-S'): slow = True elif (switch == '-t'): tcp = True elif (switch == '-T'): sys.stdout.write("sys %s platform %s: WRITE_PAD = %.f CLOSE_DELAY = %.2f\n" % (sys.platform, platform.platform(), gpsfake.WRITE_PAD, gpsfake.CLOSE_DELAY)) raise SystemExit,0 elif (switch == '-u'): udp = True elif (switch == '-v'): verbose += 1 elif (switch == '-h'): sys.stderr.write("usage: gpsfake [-h] [-l] [-m monitor] [--D debug] [-o options] [-p] [-s speed] [-S] [-c cycle] [-b] logfile\n") raise SystemExit,0 try: pty.openpty() except Exception: print >>sys.stderr, "gpsfake: ptys not available, falling back to UDP." udp = True if not arguments: print >>sys.stderr, "gpsfake: requires at least one logfile argument." raise SystemExit, 1 if progress: baton = Baton("Processing %s" % ",".join(arguments), "done") else: print >>sys.stderr, "Processing %s" % ",".join(arguments) test = gpsfake.TestSession(prefix=monitor, port=port, options=doptions, tcp=tcp, udp=udp, verbose=verbose, predump=predump, slow=slow) if pipe: test.reporter = sys.stdout.write if verbose: progress = False test.progress = sys.stdout.write test.spawn() try: for logfile in arguments: try: test.gps_add(logfile, speed=speed, pred=fakehook) except gpsfake.TestLoadError, e: sys.stderr.write("gpsfake: " + e.msg + "\n") raise SystemExit, 1 except gpsfake.PacketError, e: sys.stderr.write("gpsfake: " + e.msg + "\n") raise SystemExit, 1 except gpsfake.DaemonError, e: sys.stderr.write("gpsfake: " + e.msg + "\n") raise SystemExit, 1 except IOError, e: if e.filename is None: sys.stderr.write("gpsfake: unknown internal I/O error %s\n" % e) else: sys.stderr.write("gpsfake: no such file as %s or file unreadable\n"%e.filename) raise SystemExit, 1 except OSError: sys.stderr.write("gpsfake: can't open pty.\n") raise SystemExit, 1 try: if pipe: test.client_add(client_init + "\n") # Give daemon time to get ready for the feeds. # Without a delay here there's a window for test # sentences to arrive before the watch takes effect. # This needs to increase if leading sentences in # test loads aren't being processed. time.sleep(1) test.run() except socket.error, msg: sys.stderr.write("gpsfake: socket error %s.\n" % msg) raise SystemExit, 1 except gps.client.json_error, e: sys.stderr.write("gpsfake: JSON error on line %s is %s.\n" % (repr(e.data), e.explanation)) raise SystemExit, 1 except KeyboardInterrupt: sys.stderr.write("gpsfake: aborted\n") raise SystemExit, 1 finally: test.cleanup() if progress: baton.end() # The following sets edit modes for GNU EMACS # Local Variables: # mode:python # End: