diff options
author | Eric S. Raymond <esr@thyrsus.com> | 2009-01-12 21:23:39 +0000 |
---|---|---|
committer | Eric S. Raymond <esr@thyrsus.com> | 2009-01-12 21:23:39 +0000 |
commit | 582c3bddadacba8ee0535bd5b3431748e0444d15 (patch) | |
tree | 434b11364c4e4c06c89233a1c55656696f8511b8 /gpsfake.py | |
parent | f1c56f14c18357ca5b19831b19cf0348b142cdee (diff) | |
download | gpsd-582c3bddadacba8ee0535bd5b3431748e0444d15.tar.gz |
Make gpsfake use the C packet-getter...
...so it will no longer need to have its own duplicate knowledge of
packet types wired in.
Diffstat (limited to 'gpsfake.py')
-rw-r--r-- | gpsfake.py | 173 |
1 files changed, 28 insertions, 145 deletions
@@ -69,7 +69,7 @@ the run method in a subthread, with locking of critical regions. """ import sys, os, time, signal, pty, termios # fcntl, array, struct import exceptions, threading, socket -import gps +import gps, gpspacket # Define a per-character delay on writes so we won't spam the buffers # in the pty layer or gpsd itself. The magic number here has to be @@ -87,25 +87,27 @@ class TestLoadError(exceptions.Exception): class TestLoad: "Digest a logfile into a list of sentences we can cycle through." def __init__(self, logfp, predump=False): - self.sentences = [] # This and .packtype are the interesting bits + self.sentences = [] # This is the interesting part self.logfp = logfp self.predump = predump self.logfile = logfp.name self.type = None self.serial = None - # Skip the comment header + # Grab the packets + getter = gpspacket.new() + #gpspacket.register_report(reporter) + type_latch = None while True: - first = logfp.read(1) - self.first = first; - if first == "#": - line = logfp.readline() - if line.strip().startswith("Type:"): - if line.find("RTCM") > -1: - self.type = "RTCM" - if "Serial:" in line: - line = line[1:].strip() + (ptype, packet) = getter.get(logfp.fileno()) + if ptype == gpspacket.BAD_PACKET: + break + elif ptype == gpspacket.EMPTY_PACKET: + break + elif ptype == gpspacket.COMMENT_PACKET: + if "Serial:" in packet: + packet = packet[1:].strip() try: - (xx, baud, params) = line.split() + (xx, baud, params) = packet.split() baud = int(baud) if params[0] in ('7', '8'): databits = int(params[0]) @@ -125,137 +127,17 @@ class TestLoad: self.serial = (baud, databits, parity, stopbits) else: - break - # Grab the packets - while True: - packet = self.packet_get() - if self.predump: - print `packet` - if not packet or packet == "\n": - break - elif packet[0] != "#": + if type_latch is None: + type_latch = ptype + if self.predump: + print `packet` self.sentences.append(packet) # Look at the first packet to grok the GPS type - if self.sentences[0][0] == '$': - self.packtype = "NMEA" + self.textual = (type_latch == gpspacket.NMEA_PACKET) + if self.textual: self.legend = "gpsfake: line %d: " - self.idoffset = None - self.textual = True - elif self.sentences[0][0] == '\xff': - self.packtype = "Zodiac binary" - self.legend = "gpsfake: packet %d: " - self.idoffset = None - self.textual = True - elif self.sentences[0][0] == '\xa0': - self.packtype = "SiRF binary" - self.legend = "gpsfake: packet %d: " - self.idoffset = 3 - self.textual = False - elif self.sentences[0][0] == '\x02': - self.packtype = "Navcom binary" - self.legend = "gpsfake: packet %d" - self.textual = False - elif self.sentences[0][0] == '\x10': - self.packtype = "TSIP binary" - self.legend = "gpsfake: packet %d: " - self.idoffset = 1 - self.textual = False - elif self.sentences[0][0] == '\xb5': - self.packtype = "uBlox" - self.legend = "gpsfake: packet %d: " - self.idoffset = None - self.textual = False - elif self.sentences[0][0] == '\x3c': - self.packtype = "iTrax" - self.legend = "gpsfake: packet %d: " - self.idoffset = None - self.textual = False - elif self.type == "RTCM": - self.packtype = "RTCM" - self.legend = None - self.idoffset = None - self.textual = False - else: - sys.stderr.write("gpsfake: unknown log type (not NMEA or SiRF) can't handle it!\n") - self.sentences = None - def packet_get(self): - "Grab a packet. Unlike the daemon's state machine, this assumes no noise." - if self.first == '': - first = self.logfp.read(1) - else: - first=self.first - self.first='' - if not first: - return None - elif self.type == "RTCM": - return first - elif first == '#': # Comment - return "#" + self.logfp.readline() - elif first == '$': # NMEA packet - return "$" + self.logfp.readline() - second = self.logfp.read(1) - if first == '\xa0' and second == '\xa2': # SiRF packet - third = self.logfp.read(1) - fourth = self.logfp.read(1) - length = (ord(third) << 8) | ord(fourth) - return "\xa0\xa2" + third + fourth + self.logfp.read(length+4) - elif first == '\xff' and second == '\x81': # Zodiac - third = self.logfp.read(1) - fourth = self.logfp.read(1) - fifth = self.logfp.read(1) - sixth = self.logfp.read(1) - #id = ord(third) | (ord(fourth) << 8) - ndata = ord(fifth) | (ord(sixth) << 8) - return "\xff\x81" + third + fourth + fifth + sixth + self.logfp.read(2*ndata+6) - elif first == '\x02' and second == '\x99': # Navcom - third = self.logfp.read(1) - fourth = self.logfp.read(1) - fifth = self.logfp.read(1) - sixth = self.logfp.read(1) - #id = ord(fourth) - ndata = ord(fifth) | (ord(sixth) << 8) - return "\x02\x99\x66" + fourth + fifth + sixth + self.logfp.read(ndata-2) - elif first == '\x10': # TSIP - packet = first + second - delcnt = 0 - while True: - next = self.logfp.read(1) - if not next: - return '' - packet += next - if next == '\x10': - delcnt += 1 - elif next == '\x03': - if delcnt % 2: - break - else: - delcnt = 0 - return packet - elif first == '\xb5' and second == '\x62': # ubx - third = self.logfp.read(1) - fourth = self.logfp.read(1) - fifth = self.logfp.read(1) - sixth = self.logfp.read(1) - # classid = third - # messageid = fourth - ndata = ord(fifth) | (ord(sixth) << 8) - return "\xb5\x62" + third + fourth + fifth + sixth + self.logfp.read(ndata+2) - elif first == '\x3c' and second == '\x21': # italk - third = self.logfp.read(1) - fourth = self.logfp.read(1) - fifth = self.logfp.read(1) - sixth = self.logfp.read(1) - seventh = self.logfp.read(1) - # srcnode = third - # dstnode = fourth - # messageid = fifth - # transaction = sixth - ndata = (ord(seventh)+1)*2 + 1 - return "\x3c\x21" + third + fourth + fifth + sixth + seventh + self.logfp.read(ndata) - elif first == "\n": # Use this to ignore trailing EOF on logs - return "\n" else: - raise PacketError("unknown packet type, leader %s (0x%x)" % (`first`, ord(first))) + self.legend = "gpsfake: packet %d" class PacketError(exceptions.Exception): def __init__(self, msg): @@ -265,8 +147,8 @@ class FakeGPS: "A fake GPS is a pty with a test log ready to be cycled to it." def __init__(self, logfp, speed=4800, databits=8, parity='N', stopbits=1, - predump=False, reporter=None): - self.reporter = reporter + predump=False, progress=None): + self.progress = progress self.go_predicate = lambda: True self.readers = 0 self.index = 0 @@ -298,6 +180,7 @@ class FakeGPS: logfp = open(logfp, "r"); self.name = logfp.name self.testload = TestLoad(logfp, predump) + self.progress("gpsfake: %s provides %d sentences\n" % (self.name, len(self.testload.sentences))) # FIXME: explicit arguments should probably override this #if self.testload.serial: # (speed, databits, parity, stopbits) = self.testload.serial @@ -342,8 +225,8 @@ class FakeGPS: "Feed a line from the contents of the GPS log to the daemon." line = self.testload.sentences[self.index % len(self.testload.sentences)] os.write(self.master_fd, line) - if self.reporter: - self.reporter("%s feeds %d=%s\n" % (self.name, len(line), `line`)) + if self.progress: + self.progress("gpsfake: %s feeds %d=%s\n" % (self.name, len(line), `line`)) time.sleep((WRITE_PAD * len(line)) / self.speed) self.index += 1 @@ -484,7 +367,7 @@ class TestSession: self.progress("gpsfake: gps_add(%s, %d)\n" % (logfile, speed)) if logfile not in self.fakegpslist: newgps = FakeGPS(logfile, speed=speed, predump=self.predump, - reporter=self.reporter) + progress=self.progress) if pred: newgps.go_predicate = pred elif self.default_predicate: |