summaryrefslogtreecommitdiff
path: root/gpsfake.py
diff options
context:
space:
mode:
authorEric S. Raymond <esr@thyrsus.com>2006-12-06 06:51:54 +0000
committerEric S. Raymond <esr@thyrsus.com>2006-12-06 06:51:54 +0000
commitf5ec11f4e2fd45c70a61e0bab18a0c4495278b07 (patch)
tree7f7d97f9b1626c34b924badb4a12eb4c86d9d1aa /gpsfake.py
parent5ed1579a91e54405d9ba96276ddff21f96cd5e94 (diff)
downloadgpsd-f5ec11f4e2fd45c70a61e0bab18a0c4495278b07.tar.gz
Eliminate threading from gpsfake.py.
The TestSession class now schedules events itself. As a bonus, the new code is simpler and doesn't require system-dependent primitives that have to be implemented with /proc or fuser. There actually still is a threaded mode, untested, for use with valgrind-audit. But gpsfake doesn't need it.
Diffstat (limited to 'gpsfake.py')
-rw-r--r--gpsfake.py279
1 files changed, 128 insertions, 151 deletions
diff --git a/gpsfake.py b/gpsfake.py
index 448f2dfc..1a3fc837 100644
--- a/gpsfake.py
+++ b/gpsfake.py
@@ -25,7 +25,7 @@ devices or clients attached, connected to a control socket.
TestSession has methods to attach and detch fake GPSes. The
TestSession class simulates GPS devices for you with objects composed
-from a pty and a thread that cycles sentences into the master side
+from a pty and a class instance that cycles sentences into the master side
from some specified logfile; gpsd reads the slave side. A fake GPS is
identified by the string naming its slave device.
@@ -44,74 +44,33 @@ TestSession does not currently capture the daemon's log output. It is
run with -N, so the output will go to stderr (along with, for example,
Valgrind notifications).
-Your test code should be wrapped in a try/finally block that calls the
-TestSession cleanup() method; this will ensure that any stray threads
-are properly terminated. If you do anything with the SIGINT, SIGQUIT,
-or SIGTERM signals, ensure that they call the TestSession.killall()
-method; otherwise your code will fail to clean up after itself when
-interrupted.
-
Each FakeGPS instance tries to packetize the data from the logfile it
is initialized with. It looks for packet headers associated with common
packet types such as NMEA, SiRF, TSIP, and Zodiac. Additionally, the Type
header in a logfile can be used to force the packet type, notably to RTCM
which is fed to the daemon character by character,
-There are some limitations. Due to indeterminacy in thread timings, it
-is not guaranteed that runs with identical options will present
-exactly the same sentences to the daemon at the same times from start.
+The TestSession code maintains a run queue of FakeGPS and gps.gs (client-
+session) objects. It repeatedly cycles through the run queue. For each
+client session object in the queue, it tries to read data from gpsd. For
+each fake GPS, it sends one line of stored data. When a fake-GPS's
+go predicate becomes false, the fake GPS is removed from the run queue.
+
+There are two ways to use this code. The more deterministic is
+non-threaded mode: set up your client sessions and fake GPS devices,
+then call the run() method. The run() method will terminate when
+there are no more objects in the run queue. Note, you must have
+created at least one fake client or fake GPS before calling run(),
+otherwise it will terminate immediately.
-This code requires that you either be running Linux with a /proc
-filesystem or that fstat(1) be available.
+To allow for adding and removing clients while the test is running,
+run in threaded mode by calling the start() method. This simply calls
+the run method in a subthread, with locking of critical regions.
"""
import sys, os, time, signal, pty, termios
import string, exceptions, threading, socket, commands
import gps
-### System-dependent code begins here
-#
-
-def proc_has_file_open(pid, file):
- "Does the given process have the specified file open?"
- if os.path.exists("/proc"):
- # Fast, avoids loading an external utility
- d = "/proc/%s/fd/" % pid
- try:
- for fd in os.listdir(d):
- if os.readlink(d+fd) == file:
- return True
- except OSError:
- pass
- return False
- else:
- # Slower, but portable to OpenBSD etc.
- (status, output) = \
- commands.getstatusoutput("fstat -p %d %s"%(pid,file))
- return status == 0 and output
-
-def proc_fd_set(pid):
- "Return the set of file descriptors currently opened by the process."
- if os.path.exists("/proc"):
- # Fast, avoids loading an external utility
- fds = map(int, os.listdir("/proc/%d/fd" % self.pid))
- # I wish I knew what the entries above 1000 in Linux /proc/*/fd mean...
- return filter(lambda x: x < 1000, fds)
- else:
- # Slower, but portable to OpenBSD etc.
- (status, output) = \
- commands.getstatusoutput("fstat -p %d"%pid)
- if status != 0:
- return []
- # Extract the f entries
- lst = filter(lambda s: s.startswith("f"), output.split("\n"))
- # Strip off the f
- lst = map(lambda s: s[1:], lst)
- # Filter for digits
- return map(int, filter(lambda s: s[0] in "0123456789", lst))
-
-#
-### System-dependent code ends here
-
class TestLoadError(exceptions.Exception):
def __init__(self, msg):
self.msg = msg
@@ -252,7 +211,6 @@ class FakeGPS:
self.verbose = verbose
self.go_predicate = lambda: True
self.readers = 0
- self.thread = None
self.index = 0
baudrates = {
0: termios.B0,
@@ -307,43 +265,11 @@ class FakeGPS:
ispeed = ospeed = speed
termios.tcsetattr(ttyfp.fileno(), termios.TCSANOW,
[iflag, oflag, cflag, lflag, ispeed, ospeed, cc])
- def slave_is_open(self, pid):
- "Is the slave device of this pty opened by the specified process?"
- if self.verbose:
- sys.stderr.write("slave_is_open() begins")
- isopen = proc_has_file_open(pid, self.slave)
- if self.verbose:
- sys.stderr.write("slave_is_open() ends")
- return isopen
- def __feed(self):
- "Feed the contents of the GPS log to the daemon."
- while self.readers and \
- self.daemon.is_alive() and \
- self.go_predicate(self.index, self):
- os.write(self.master_fd, self.testload.sentences[self.index % len(self.testload.sentences)])
- self.index += 1
- # Voodoo programming -- try to avoid the race condition where the
- # daemon sometimes doesn't see the last few lines of logs.
- time.sleep(0.1)
- def start(self, daemon, thread=False):
- "Increment pseudodevice's reader count, starting it if necessary."
- self.daemon = daemon
- self.readers += 1
- if self.readers == 1:
- self.thread = threading.Thread(target=self.__feed)
- while not self.slave_is_open(daemon.pid):
- time.sleep(0.01);
- if thread:
- self.thread.start() # Run asynchronously
- else:
- self.thread.run() # Run synchronously
- def release(self):
- "Decrement pseudodevice's reader count; it will stop when count==0."
- if self.readers > 0:
- self.readers -= 1
- def stop(self):
- "Zero pseudodevice's reader count; it will stop."
- self.readers = 0
+ def feed(self):
+ "Feed a line from the contents of the GPS log to the daemon."
+ line = self.testload.sentences[self.index % len(self.testload.sentences)]
+ os.write(self.master_fd, line)
+ self.index += 1
class DaemonError(exceptions.Exception):
def __init__(self, msg):
@@ -432,38 +358,34 @@ class TestSessionError(exceptions.Exception):
self.msg = msg
class TestSession:
- "Manage a session including a daemon with fake GPS and client threads."
+ "Manage a session including a daemon with fake GPSes and clients."
+ CLOSE_DELAY = 1
def __init__(self, prefix=None, port=None, options=None, verbose=False):
"Initialize the test session by launching the daemon."
self.verbose = verbose
self.daemon = DaemonInstance()
self.fakegpslist = {}
- self.clients = []
self.client_id = 0
+ self.readers = 0
+ self.writers = 0
+ self.runqueue = []
+ self.index = 0
if port:
self.port = port
else:
self.port = gps.GPSD_PORT
- self.reporter = lambda x: None
self.progress = lambda x: None
+ self.reporter = lambda x: None
for sig in (signal.SIGQUIT, signal.SIGINT, signal.SIGTERM):
- signal.signal(sig, lambda signal, frame: self.killall())
+ signal.signal(sig, lambda signal, frame: self.cleanup())
self.daemon.spawn(background=True, prefix=prefix, port=self.port, options=options)
self.daemon.wait_pid()
self.default_predicate = None
self.fd_set = []
- self.sanity_check()
+ self.threadlock = None
def set_predicate(self, pred):
"Set a default go predicate for the session."
self.default_predicate = pred
- def sanity_check(self):
- try:
- now = proc_fd_set(self.daemon.pid)
- if now != self.fd_set:
- self.progress("File descriptors: %s\n" % now)
- self.fd_set = now
- except:
- self.progress("Sanity check not working -- port fd_set()\n")
def gps_add(self, logfile, speed=4800, pred=None):
"Add a simulated GPS being fed by the specified logfile."
self.progress("gpsfake: gps_add(%s, %d)\n" % (logfile, speed))
@@ -474,56 +396,48 @@ class TestSession:
elif self.default_predicate:
newgps.go_predicate = self.default_predicate
self.fakegpslist[newgps.slave] = newgps
+ self.append(newgps)
+ newgps.exhausted = 0
self.daemon.add_device(newgps.slave)
- self.fakegpslist[newgps.slave].start(self.daemon, thread=True)
- self.sanity_check()
return newgps.slave
def gps_remove(self, name):
- "Remove a simulated GPS from the daeon's search list."
+ "Remove a simulated GPS from the daemon's search list."
self.progress("gpsfake: gps_remove(%s)\n" % name)
- self.fakegpslist[name].stop()
+ self.remove(self.fakegpslist[name])
self.daemon.remove_device(name)
- self.sanity_check()
+ del self.fakegpslist[name]
def client_add(self, commands):
"Initiate a client session and force connection to a fake GPS."
self.progress("gpsfake: client_add()\n")
newclient = gps.gps(port=self.port)
- self.clients.append(newclient)
+ self.append(newclient)
newclient.id = self.client_id + 1
self.client_id += 1
self.progress("gpsfake: client %d has %s\n" % (self.client_id,newclient.device))
- newclient.set_thread_hook(lambda x: self.reporter(x))
if commands:
- newclient.query(commands)
- self.sanity_check()
+ self.initialize(newclient, commands),
return newclient.id
def client_query(self, id, commands):
- "Ship a command down a client channel, accept a response."
+ "Ship a command to a client channel, get a response (threaded mode only)."
self.progress("gpsfake: client_query(%d, %s)\n" % (id, `commands`))
- for client in self.clients:
- if client.id == id:
+ for object in self.runqueue:
+ if isinstance(object, gps.gps) and object.id == id:
client.query(commands)
return client.response
- self.sanity_check()
return None
def client_remove(self, id):
"Terminate a client session."
self.progress("gpsfake: client_remove(%d)\n" % id)
- for client in self.clients:
- if client.id == id:
- self.fakegpslist[client.device].release()
- self.clients.remove(client)
- del client
- self.sanity_check()
+ for object in self.runqueue:
+ if isinstance(object, gps.gps) and object.id == id:
+ self.remove(object)
return True
else:
- self.sanity_check()
return False
def wait(self, seconds):
"Wait, doing nothing."
self.progress("gpsfake: wait(%d)\n" % seconds)
time.sleep(seconds)
- self.sanity_check()
def gather(self, seconds):
"Wait, doing nothing but watching for sentences."
self.progress("gpsfake: gather(%d)\n" % seconds)
@@ -531,29 +445,92 @@ class TestSession:
time.sleep(seconds)
#if self.timings.c_recv_time <= mark:
# TestSessionError("no sentences received\n")
- self.sanity_check()
- def gps_count(self):
- "Return the number of GPSes active in this session"
- tc = 0
- for fakegps in self.fakegpslist.values():
- if fakegps.thread and fakegps.thread.isAlive():
- tc += 1
- return tc
def cleanup(self):
- "Wait for all threads to end and kill the daemon."
+ "We're done, kill the daemon."
self.progress("gpsfake: cleanup()\n")
- while self.gps_count():
- time.sleep(0.1)
- # Voodoo programming -- try to avoid the race condition where the
- # daemon sometimes doesn't see the last few lines of logs.
- time.sleep(0.1)
- self.daemon.kill()
- def killall(self):
- "Kill all fake-GPS threads and the daemon."
- self.progress("gpsfake: killall()\n")
- for fakegps in self.fakegpslist.values():
- if fakegps.thread and fakegps.thread.isAlive():
- fakegps.stop()
- self.daemon.kill()
+ if self.daemon:
+ self.daemon.kill()
+ self.daemon = None
+ def run(self):
+ "Run the tests."
+ try:
+ while self.daemon:
+ had_output = False
+ chosen = self.choose()
+ if isinstance(chosen, FakeGPS):
+ # Delay a few seconds after a GPS source is exhauseted
+ # to remove it. This should give its subscribers time
+ # to get gpsd's response before we call cleanup
+ if chosen.exhausted and (time.time() - chosen.exhausted > TestSession.CLOSE_DELAY):
+ self.remove(chosen)
+ self.progress("gpsfake: GPS %s removed\n" % chosen.slave)
+ elif not chosen.go_predicate(chosen.index, chosen):
+ if chosen.exhausted == 0:
+ chosen.exhausted = time.time()
+ self.progress("gpsfake: GPS %s ran out of input\n" % chosen.slave)
+ else:
+ chosen.feed()
+ elif isinstance(chosen, gps.gps):
+ if chosen.enqueued:
+ chosen.send(chosen.enqueued)
+ chosen.enqueued = ""
+ while chosen.waiting():
+ chosen.poll()
+ self.reporter(chosen.response)
+ had_output = True
+ else:
+ raise TestSessionError("test object of unknown type")
+ if not self.writers and not had_output:
+ break
+ finally:
+ self.cleanup()
+
+ # All knowledge about locks and threading is below this line,
+ # except for the bare fact that self.threadlock is set to None
+ # in the class init method.
+
+ def append(self, obj):
+ "Add a producer or consumer to the object list."
+ if self.threadlock:
+ self.threadlock.acquire()
+ self.runqueue.append(obj)
+ if isinstance(obj, FakeGPS):
+ self.writers += 1
+ elif isinstance(obj, gps.gps):
+ self.readers += 1
+ if self.threadlock:
+ self.threadlock.release()
+ def remove(self, obj):
+ "Remove a producer or consumer from the object list."
+ if self.threadlock:
+ self.threadlock.acquire()
+ self.runqueue.remove(obj)
+ if isinstance(obj, FakeGPS):
+ self.writers -= 1
+ elif isinstance(obj, gps.gps):
+ self.readers -= 1
+ self.index = min(len(self.runqueue)-1, self.index)
+ if self.threadlock:
+ self.threadlock.release()
+ def choose(self):
+ "Atomically get the next object scheduled to do something."
+ if self.threadlock:
+ self.threadlock.acquire()
+ chosen = self.index
+ self.index += 1
+ self.index %= len(self.runqueue)
+ if self.threadlock:
+ self.threadlock.release()
+ return self.runqueue[chosen]
+ def initialize(self, client, commands):
+ "Arrange for client to ship specified commands when it goes active."
+ client.enqueued = ""
+ if not self.threadlock:
+ client.query(commands)
+ else:
+ client.enqueued = commands
+ def start(self):
+ self.threadlock = threading.Lock()
+ threading.Thread(target=self.run)
# End