summaryrefslogtreecommitdiff
path: root/gps
diff options
context:
space:
mode:
authorEric S. Raymond <esr@thyrsus.com>2010-04-25 20:34:50 -0400
committerEric S. Raymond <esr@thyrsus.com>2010-04-25 20:34:50 -0400
commitff545a93f5d68daa35925e966de7ae57a807ba5b (patch)
tree7b4fb2119420f5bd0234ea6a3f16fd79787683a9 /gps
parent7f78685201d1322a88be795406d9e3ab8d561e95 (diff)
downloadgpsd-ff545a93f5d68daa35925e966de7ae57a807ba5b.tar.gz
The gpsfake.py module can broadcacst on UDP rather than using a pty.
This is a step towards making UDP data sources work.
Diffstat (limited to 'gps')
-rw-r--r--gps/fake.py111
1 files changed, 74 insertions, 37 deletions
diff --git a/gps/fake.py b/gps/fake.py
index 39d2079d..2d709b72 100644
--- a/gps/fake.py
+++ b/gps/fake.py
@@ -86,10 +86,14 @@ class TestLoad:
"Digest a logfile into a list of sentences we can cycle through."
def __init__(self, logfp, predump=False):
self.sentences = [] # This is the interesting part
+ if type(logfp) == type(""):
+ logfp = open(logfp, "r");
+ self.name = logfp.name
self.logfp = logfp
self.predump = predump
self.logfile = logfp.name
self.type = None
+ self.sourcetype = "pty"
self.serial = None
# Grab the packets
getter = sniffer.new()
@@ -123,9 +127,10 @@ class TestLoad:
raise ValueError
except (ValueError, IndexError):
raise TestLoadError("bad serial-parameter spec in %s"%\
- logfp.name)
-
+ logfp.name)
self.serial = (baud, databits, parity, stopbits)
+ elif "UDP" in packet:
+ self.sourcetype = "UDP"
elif "%" in packet:
# Pass through for later interpretation
self.sentences.append(packet)
@@ -147,16 +152,35 @@ class PacketError(exceptions.Exception):
self.msg = msg
class FakeGPS:
- "A fake GPS is a pty with a test log ready to be cycled to it."
- def __init__(self, logfp,
- speed=4800, databits=8, parity='N', stopbits=1,
- predump=False, progress=None):
+ def __init__(self, testload, progress=None):
+ self.testload = testload
self.progress = progress
self.go_predicate = lambda: True
self.readers = 0
self.index = 0
+ self.progress("gpsfake: %s provides %d sentences\n" % (self.testload.name, len(self.testload.sentences)))
+
+ def feed(self):
+ "Feed a line from the contents of the GPS log to the daemon."
+ line = self.testload.sentences[self.index % len(self.testload.sentences)]
+ if "%Delay:" in line:
+ # Delay specified number of seconds
+ delay = line.split()[1]
+ time.sleep(int(delay))
+ # self.write has to be set by the derived class
+ self.write(line)
+ if self.progress:
+ self.progress("gpsfake: %s feeds %d=%s\n" % (self.testload.name, len(line), `line`))
+ time.sleep(WRITE_PAD)
+ self.index += 1
+
+class FakePTY(FakeGPS):
+ "A FakePTY is a pty with a test log ready to be cycled to it."
+ def __init__(self, testload,
+ speed=4800, databits=8, parity='N', stopbits=1,
+ progress=None):
+ FakeGPS.__init__(self, testload, progress)
self.speed = speed
- self.name = None
baudrates = {
0: termios.B0,
50: termios.B50,
@@ -179,16 +203,11 @@ class FakeGPS:
230400: termios.B230400,
}
speed = baudrates[speed] # Throw an error if the speed isn't legal
- if type(logfp) == type(""):
- logfp = open(logfp, "r");
- self.name = logfp.name
- self.testload = TestLoad(logfp, predump)
- self.progress("gpsfake: %s provides %d sentences\n" % (self.name, len(self.testload.sentences)))
# FIXME: explicit arguments should probably override this
#if self.testload.serial:
# (speed, databits, parity, stopbits) = self.testload.serial
- (self.master_fd, self.slave_fd) = pty.openpty()
- self.slave = os.ttyname(self.slave_fd)
+ (self.fd, self.slave_fd) = pty.openpty()
+ self.byname = os.ttyname(self.slave_fd)
(iflag, oflag, cflag, lflag, ispeed, ospeed, cc) = termios.tcgetattr(self.slave_fd)
cc[termios.VMIN] = 1
cflag &= ~(termios.PARENB | termios.PARODD | termios.CRTSCTS)
@@ -214,7 +233,7 @@ class FakeGPS:
def read(self):
"Discard control strings written by gpsd."
# A tcflush implementation works on Linux but fails on OpenBSD 4.
- termios.tcflush(self.master_fd, termios.TCIFLUSH)
+ termios.tcflush(self.fd, termios.TCIFLUSH)
# Alas, the FIONREAD version also works on Linux and fails on OpenBSD.
#try:
# buf = array.array('i', [0])
@@ -224,22 +243,34 @@ class FakeGPS:
#except IOError:
# pass
- def feed(self):
- "Feed a line from the contents of the GPS log to the daemon."
- line = self.testload.sentences[self.index % len(self.testload.sentences)]
- if "%Delay:" in line:
- # Delay specified number of seconds
- delay = line.split()[1]
- time.sleep(int(delay))
- os.write(self.master_fd, line)
- if self.progress:
- self.progress("gpsfake: %s feeds %d=%s\n" % (self.name, len(line), `line`))
- time.sleep(WRITE_PAD)
- self.index += 1
+ def write(self, line):
+ os.write(self.fd, line)
def drain(self):
"Wait for the associated device to drain (e.g. before closing)."
- termios.tcdrain(self.master_fd)
+ termios.tcdrain(self.fd)
+
+class FakeUDP(FakeGPS):
+ "A UDP broadcaster with a test log ready to be cycled to it."
+ def __init__(self, testload,
+ ipaddr, port,
+ progress=None):
+ FakeGPS.__init__(self, testload, progress)
+ self.ipaddr = ipaddr
+ self.port = port
+ self.byname = "udp://" + ipaddr + ":" + port
+ self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+
+ def read(self):
+ "Discard control strings written by gpsd."
+ pass
+
+ def write(self, line):
+ self.sock.sendto(line, (self.ipaddr, int(self.port)))
+
+ def drain(self):
+ "Wait for the associated device to drain (e.g. before closing)."
+ self.sock.shutdown(socket.SHUT_RDWR)
class DaemonError(exceptions.Exception):
def __init__(self, msg):
@@ -384,21 +415,27 @@ class TestSession:
def set_predicate(self, pred):
"Set a default go predicate for the session."
self.default_predicate = pred
- def gps_add(self, logfile, speed=4800, pred=None):
+ def gps_add(self, logfile, speed=19200, pred=None):
"Add a simulated GPS being fed by the specified logfile."
self.progress("gpsfake: gps_add(%s, %d)\n" % (logfile, speed))
if logfile not in self.fakegpslist:
- newgps = FakeGPS(logfile, speed=speed, predump=self.predump,
- progress=self.progress)
+ testload = TestLoad(logfile, predump=self.predump)
+ if testload.sourcetype == "UDP":
+ # FIXME: As yet, no entries for these on the fakegps list
+ newgps = FakeUDP(testload, ipaddr="127.0.1.255", port="5000",
+ progress=self.progress)
+ else:
+ newgps = FakePTY(testload, speed=speed,
+ progress=self.progress)
if pred:
newgps.go_predicate = pred
elif self.default_predicate:
newgps.go_predicate = self.default_predicate
- self.fakegpslist[newgps.slave] = newgps
+ self.fakegpslist[newgps.byname] = newgps
self.append(newgps)
newgps.exhausted = 0
- self.daemon.add_device(newgps.slave)
- return newgps.slave
+ self.daemon.add_device(newgps.byname)
+ return newgps.byname
def gps_remove(self, name):
"Remove a simulated GPS from the daemon's search list."
self.progress("gpsfake: gps_remove(%s)\n" % name)
@@ -461,12 +498,12 @@ class TestSession:
# before removing it. This should give its subscribers time
# to get gpsd's response before we call cleanup()
if chosen.exhausted and (time.time() - chosen.exhausted > TestSession.CLOSE_DELAY):
- self.gps_remove(chosen.slave)
- self.progress("gpsfake: GPS %s removed\n" % chosen.slave)
+ self.gps_remove(chosen.byname)
+ self.progress("gpsfake: GPS %s removed\n" % chosen.byname)
elif not chosen.go_predicate(chosen.index, chosen):
if chosen.exhausted == 0:
chosen.exhausted = time.time()
- self.progress("gpsfake: GPS %s ran out of input\n" % chosen.slave)
+ self.progress("gpsfake: GPS %s ran out of input\n" % chosen.byname)
else:
chosen.feed()
elif isinstance(chosen, gps.gps):