diff options
author | Eric S. Raymond <esr@thyrsus.com> | 2005-06-24 14:27:33 +0000 |
---|---|---|
committer | Eric S. Raymond <esr@thyrsus.com> | 2005-06-24 14:27:33 +0000 |
commit | b215f07c324f9d1e36fa9366c386dee65c27f371 (patch) | |
tree | 603efe12f0bd2f301cd675935404f95a3ff810ea /gpsfake.py | |
parent | e5e56cdc77c283b86ddba0b59e78d3bcb9c74e9c (diff) | |
download | gpsd-b215f07c324f9d1e36fa9366c386dee65c27f371.tar.gz |
Start of TestSession class -- not working yet.
Diffstat (limited to 'gpsfake.py')
-rw-r--r-- | gpsfake.py | 45 |
1 files changed, 36 insertions, 9 deletions
@@ -2,7 +2,8 @@ gpsfake.py -- classes for creating a controlled test environment around gpsd """ -import sys, os, time, signal, pty, termios, socket, string, exceptions +import sys, os, time, signal, pty, termios +import string, exceptions, threading, socket import gps class PacketError(exceptions.Exception): @@ -73,7 +74,9 @@ class TestLoad: class FakeGPS: "A fake GPS is a pty with a test log ready to be cycled to it." - def __init__(self, logfp, rate): + def __init__(self, logfp, rate=4800): + self.stopme = False + self.thread = None baudrates = { 0: termios.B0, 50: termios.B50, @@ -121,15 +124,21 @@ class FakeGPS: session.query("w+r+") session.set_thread_hook(lambda x: self.responses.append(x)) return True - def feed(self, daemon, go_predicate): + def __start(self, go_predicate): "Feed the contents of the GPS log to the daemon." i = 0; - while go_predicate(i, self): + while not self.stopme and go_predicate(i, self): os.write(self.master_fd, self.testload.sentences[i % len(self.testload.sentences)]) - if not daemon.is_alive(): - return False i += 1 - return True + def start(self, go_predicate=lambda i,s: True, thread=False): + if thread: + self.thread = threading.Thread(self.__start(go_predicate)) + self.thread.run() + else: + self.__start(go_predicate) + def stop(self): + "Stop this fake GPS." + self.stopme = True class DaemonInstance: "Control a gpsd instance." @@ -141,9 +150,9 @@ class DaemonInstance: else: self.control_socket = "/tmp/gpsfake-%d.sock" % os.getpid() self.pidfile = "/tmp/gpsfake_pid-%s" % os.getpid() - def spawn(self, doptions, background=False, prefix=""): + def spawn(self, options, background=False, prefix=""): "Spawn a daemon instance." - self.spawncmd = "gpsd -N -F %s -P %s %s" % (self.control_socket, self.pidfile, doptions) + self.spawncmd = "gpsd -N -F %s -P %s %s" % (self.control_socket, self.pidfile, options) self.spawncmd = prefix + self.spawncmd.strip() if background: self.spawncmd += " &" @@ -196,4 +205,22 @@ class DaemonInstance: os.kill(self.pid, signal.SIGTERM) self.pid = None +class TestSession: + "Manage a session including a daemon with fake GPS and client threads." + def __init__(self, prefix, options): + "Initialize the test session by launching the daemon." + self.daemon = DaemonInstance() + self.devices = [] + self.clients = [] + self.daemon.spawn(background=True, prefix=prefix, options=options) + self.daemon.wait_pid() + def add(self, logfile): + "Add a simulated GPS being fed by the specified logfile." + if not logfile.endswith(".log"): + logfile += ".log" + newgps = FakeGPS(logfile) + self.clients.append(newgps) + self.daemon.add_device(newgps.slave) + newgps.thread_id = thread + # End |