summaryrefslogtreecommitdiff
path: root/gpsfake.py
diff options
context:
space:
mode:
authorEric S. Raymond <esr@thyrsus.com>2005-06-24 14:27:33 +0000
committerEric S. Raymond <esr@thyrsus.com>2005-06-24 14:27:33 +0000
commitb215f07c324f9d1e36fa9366c386dee65c27f371 (patch)
tree603efe12f0bd2f301cd675935404f95a3ff810ea /gpsfake.py
parente5e56cdc77c283b86ddba0b59e78d3bcb9c74e9c (diff)
downloadgpsd-b215f07c324f9d1e36fa9366c386dee65c27f371.tar.gz
Start of TestSession class -- not working yet.
Diffstat (limited to 'gpsfake.py')
-rw-r--r--gpsfake.py45
1 files changed, 36 insertions, 9 deletions
diff --git a/gpsfake.py b/gpsfake.py
index 5ba1b17f..749e95c0 100644
--- a/gpsfake.py
+++ b/gpsfake.py
@@ -2,7 +2,8 @@
gpsfake.py -- classes for creating a controlled test environment around gpsd
"""
-import sys, os, time, signal, pty, termios, socket, string, exceptions
+import sys, os, time, signal, pty, termios
+import string, exceptions, threading, socket
import gps
class PacketError(exceptions.Exception):
@@ -73,7 +74,9 @@ class TestLoad:
class FakeGPS:
"A fake GPS is a pty with a test log ready to be cycled to it."
- def __init__(self, logfp, rate):
+ def __init__(self, logfp, rate=4800):
+ self.stopme = False
+ self.thread = None
baudrates = {
0: termios.B0,
50: termios.B50,
@@ -121,15 +124,21 @@ class FakeGPS:
session.query("w+r+")
session.set_thread_hook(lambda x: self.responses.append(x))
return True
- def feed(self, daemon, go_predicate):
+ def __start(self, go_predicate):
"Feed the contents of the GPS log to the daemon."
i = 0;
- while go_predicate(i, self):
+ while not self.stopme and go_predicate(i, self):
os.write(self.master_fd, self.testload.sentences[i % len(self.testload.sentences)])
- if not daemon.is_alive():
- return False
i += 1
- return True
+ def start(self, go_predicate=lambda i,s: True, thread=False):
+ if thread:
+ self.thread = threading.Thread(self.__start(go_predicate))
+ self.thread.run()
+ else:
+ self.__start(go_predicate)
+ def stop(self):
+ "Stop this fake GPS."
+ self.stopme = True
class DaemonInstance:
"Control a gpsd instance."
@@ -141,9 +150,9 @@ class DaemonInstance:
else:
self.control_socket = "/tmp/gpsfake-%d.sock" % os.getpid()
self.pidfile = "/tmp/gpsfake_pid-%s" % os.getpid()
- def spawn(self, doptions, background=False, prefix=""):
+ def spawn(self, options, background=False, prefix=""):
"Spawn a daemon instance."
- self.spawncmd = "gpsd -N -F %s -P %s %s" % (self.control_socket, self.pidfile, doptions)
+ self.spawncmd = "gpsd -N -F %s -P %s %s" % (self.control_socket, self.pidfile, options)
self.spawncmd = prefix + self.spawncmd.strip()
if background:
self.spawncmd += " &"
@@ -196,4 +205,22 @@ class DaemonInstance:
os.kill(self.pid, signal.SIGTERM)
self.pid = None
+class TestSession:
+ "Manage a session including a daemon with fake GPS and client threads."
+ def __init__(self, prefix, options):
+ "Initialize the test session by launching the daemon."
+ self.daemon = DaemonInstance()
+ self.devices = []
+ self.clients = []
+ self.daemon.spawn(background=True, prefix=prefix, options=options)
+ self.daemon.wait_pid()
+ def add(self, logfile):
+ "Add a simulated GPS being fed by the specified logfile."
+ if not logfile.endswith(".log"):
+ logfile += ".log"
+ newgps = FakeGPS(logfile)
+ self.clients.append(newgps)
+ self.daemon.add_device(newgps.slave)
+ newgps.thread_id = thread
+
# End