summaryrefslogtreecommitdiff
path: root/gpsfake.py
diff options
context:
space:
mode:
authorEric S. Raymond <esr@thyrsus.com>2009-01-12 21:23:39 +0000
committerEric S. Raymond <esr@thyrsus.com>2009-01-12 21:23:39 +0000
commit582c3bddadacba8ee0535bd5b3431748e0444d15 (patch)
tree434b11364c4e4c06c89233a1c55656696f8511b8 /gpsfake.py
parentf1c56f14c18357ca5b19831b19cf0348b142cdee (diff)
downloadgpsd-582c3bddadacba8ee0535bd5b3431748e0444d15.tar.gz
Make gpsfake use the C packet-getter...
...so it will no longer need to have its own duplicate knowledge of packet types wired in.
Diffstat (limited to 'gpsfake.py')
-rw-r--r--gpsfake.py173
1 files changed, 28 insertions, 145 deletions
diff --git a/gpsfake.py b/gpsfake.py
index 2ebd3fcf..baf6d84f 100644
--- a/gpsfake.py
+++ b/gpsfake.py
@@ -69,7 +69,7 @@ the run method in a subthread, with locking of critical regions.
"""
import sys, os, time, signal, pty, termios # fcntl, array, struct
import exceptions, threading, socket
-import gps
+import gps, gpspacket
# Define a per-character delay on writes so we won't spam the buffers
# in the pty layer or gpsd itself. The magic number here has to be
@@ -87,25 +87,27 @@ class TestLoadError(exceptions.Exception):
class TestLoad:
"Digest a logfile into a list of sentences we can cycle through."
def __init__(self, logfp, predump=False):
- self.sentences = [] # This and .packtype are the interesting bits
+ self.sentences = [] # This is the interesting part
self.logfp = logfp
self.predump = predump
self.logfile = logfp.name
self.type = None
self.serial = None
- # Skip the comment header
+ # Grab the packets
+ getter = gpspacket.new()
+ #gpspacket.register_report(reporter)
+ type_latch = None
while True:
- first = logfp.read(1)
- self.first = first;
- if first == "#":
- line = logfp.readline()
- if line.strip().startswith("Type:"):
- if line.find("RTCM") > -1:
- self.type = "RTCM"
- if "Serial:" in line:
- line = line[1:].strip()
+ (ptype, packet) = getter.get(logfp.fileno())
+ if ptype == gpspacket.BAD_PACKET:
+ break
+ elif ptype == gpspacket.EMPTY_PACKET:
+ break
+ elif ptype == gpspacket.COMMENT_PACKET:
+ if "Serial:" in packet:
+ packet = packet[1:].strip()
try:
- (xx, baud, params) = line.split()
+ (xx, baud, params) = packet.split()
baud = int(baud)
if params[0] in ('7', '8'):
databits = int(params[0])
@@ -125,137 +127,17 @@ class TestLoad:
self.serial = (baud, databits, parity, stopbits)
else:
- break
- # Grab the packets
- while True:
- packet = self.packet_get()
- if self.predump:
- print `packet`
- if not packet or packet == "\n":
- break
- elif packet[0] != "#":
+ if type_latch is None:
+ type_latch = ptype
+ if self.predump:
+ print `packet`
self.sentences.append(packet)
# Look at the first packet to grok the GPS type
- if self.sentences[0][0] == '$':
- self.packtype = "NMEA"
+ self.textual = (type_latch == gpspacket.NMEA_PACKET)
+ if self.textual:
self.legend = "gpsfake: line %d: "
- self.idoffset = None
- self.textual = True
- elif self.sentences[0][0] == '\xff':
- self.packtype = "Zodiac binary"
- self.legend = "gpsfake: packet %d: "
- self.idoffset = None
- self.textual = True
- elif self.sentences[0][0] == '\xa0':
- self.packtype = "SiRF binary"
- self.legend = "gpsfake: packet %d: "
- self.idoffset = 3
- self.textual = False
- elif self.sentences[0][0] == '\x02':
- self.packtype = "Navcom binary"
- self.legend = "gpsfake: packet %d"
- self.textual = False
- elif self.sentences[0][0] == '\x10':
- self.packtype = "TSIP binary"
- self.legend = "gpsfake: packet %d: "
- self.idoffset = 1
- self.textual = False
- elif self.sentences[0][0] == '\xb5':
- self.packtype = "uBlox"
- self.legend = "gpsfake: packet %d: "
- self.idoffset = None
- self.textual = False
- elif self.sentences[0][0] == '\x3c':
- self.packtype = "iTrax"
- self.legend = "gpsfake: packet %d: "
- self.idoffset = None
- self.textual = False
- elif self.type == "RTCM":
- self.packtype = "RTCM"
- self.legend = None
- self.idoffset = None
- self.textual = False
- else:
- sys.stderr.write("gpsfake: unknown log type (not NMEA or SiRF) can't handle it!\n")
- self.sentences = None
- def packet_get(self):
- "Grab a packet. Unlike the daemon's state machine, this assumes no noise."
- if self.first == '':
- first = self.logfp.read(1)
- else:
- first=self.first
- self.first=''
- if not first:
- return None
- elif self.type == "RTCM":
- return first
- elif first == '#': # Comment
- return "#" + self.logfp.readline()
- elif first == '$': # NMEA packet
- return "$" + self.logfp.readline()
- second = self.logfp.read(1)
- if first == '\xa0' and second == '\xa2': # SiRF packet
- third = self.logfp.read(1)
- fourth = self.logfp.read(1)
- length = (ord(third) << 8) | ord(fourth)
- return "\xa0\xa2" + third + fourth + self.logfp.read(length+4)
- elif first == '\xff' and second == '\x81': # Zodiac
- third = self.logfp.read(1)
- fourth = self.logfp.read(1)
- fifth = self.logfp.read(1)
- sixth = self.logfp.read(1)
- #id = ord(third) | (ord(fourth) << 8)
- ndata = ord(fifth) | (ord(sixth) << 8)
- return "\xff\x81" + third + fourth + fifth + sixth + self.logfp.read(2*ndata+6)
- elif first == '\x02' and second == '\x99': # Navcom
- third = self.logfp.read(1)
- fourth = self.logfp.read(1)
- fifth = self.logfp.read(1)
- sixth = self.logfp.read(1)
- #id = ord(fourth)
- ndata = ord(fifth) | (ord(sixth) << 8)
- return "\x02\x99\x66" + fourth + fifth + sixth + self.logfp.read(ndata-2)
- elif first == '\x10': # TSIP
- packet = first + second
- delcnt = 0
- while True:
- next = self.logfp.read(1)
- if not next:
- return ''
- packet += next
- if next == '\x10':
- delcnt += 1
- elif next == '\x03':
- if delcnt % 2:
- break
- else:
- delcnt = 0
- return packet
- elif first == '\xb5' and second == '\x62': # ubx
- third = self.logfp.read(1)
- fourth = self.logfp.read(1)
- fifth = self.logfp.read(1)
- sixth = self.logfp.read(1)
- # classid = third
- # messageid = fourth
- ndata = ord(fifth) | (ord(sixth) << 8)
- return "\xb5\x62" + third + fourth + fifth + sixth + self.logfp.read(ndata+2)
- elif first == '\x3c' and second == '\x21': # italk
- third = self.logfp.read(1)
- fourth = self.logfp.read(1)
- fifth = self.logfp.read(1)
- sixth = self.logfp.read(1)
- seventh = self.logfp.read(1)
- # srcnode = third
- # dstnode = fourth
- # messageid = fifth
- # transaction = sixth
- ndata = (ord(seventh)+1)*2 + 1
- return "\x3c\x21" + third + fourth + fifth + sixth + seventh + self.logfp.read(ndata)
- elif first == "\n": # Use this to ignore trailing EOF on logs
- return "\n"
else:
- raise PacketError("unknown packet type, leader %s (0x%x)" % (`first`, ord(first)))
+ self.legend = "gpsfake: packet %d"
class PacketError(exceptions.Exception):
def __init__(self, msg):
@@ -265,8 +147,8 @@ class FakeGPS:
"A fake GPS is a pty with a test log ready to be cycled to it."
def __init__(self, logfp,
speed=4800, databits=8, parity='N', stopbits=1,
- predump=False, reporter=None):
- self.reporter = reporter
+ predump=False, progress=None):
+ self.progress = progress
self.go_predicate = lambda: True
self.readers = 0
self.index = 0
@@ -298,6 +180,7 @@ class FakeGPS:
logfp = open(logfp, "r");
self.name = logfp.name
self.testload = TestLoad(logfp, predump)
+ self.progress("gpsfake: %s provides %d sentences\n" % (self.name, len(self.testload.sentences)))
# FIXME: explicit arguments should probably override this
#if self.testload.serial:
# (speed, databits, parity, stopbits) = self.testload.serial
@@ -342,8 +225,8 @@ class FakeGPS:
"Feed a line from the contents of the GPS log to the daemon."
line = self.testload.sentences[self.index % len(self.testload.sentences)]
os.write(self.master_fd, line)
- if self.reporter:
- self.reporter("%s feeds %d=%s\n" % (self.name, len(line), `line`))
+ if self.progress:
+ self.progress("gpsfake: %s feeds %d=%s\n" % (self.name, len(line), `line`))
time.sleep((WRITE_PAD * len(line)) / self.speed)
self.index += 1
@@ -484,7 +367,7 @@ class TestSession:
self.progress("gpsfake: gps_add(%s, %d)\n" % (logfile, speed))
if logfile not in self.fakegpslist:
newgps = FakeGPS(logfile, speed=speed, predump=self.predump,
- reporter=self.reporter)
+ progress=self.progress)
if pred:
newgps.go_predicate = pred
elif self.default_predicate: