summaryrefslogtreecommitdiff
path: root/gpssim.py
blob: 51319812638b15c8e988ec59dac025b21562429b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
# This file is Copyright (c) 2010 by the GPSD project
# SPDX-License-Identifier: BSD-2-clause
"""
A GPS simulator.

This is proof-of-concept code, not production ready; some functions are stubs.
"""
import math
import random
import sys
import time

import gps
import gpslib

# First, the mathematics.  We simulate a moving viewpoint on the Earth
# and a satellite with specified orbital elements in the sky.


class ksv(object):
    """Kinematic state vector.

    Attributes (all set from the constructor arguments of the same name):
    time   -- seconds from epoch
    lat    -- latitude, decimal degrees
    lon    -- longitude, decimal degrees
    alt    -- altitude, meters
    course -- degrees from true North
    speed  -- horizontal speed, meters per second
    climb  -- vertical speed, meters per second
    h_acc  -- horizontal acceleration, meters per second per second
    v_acc  -- vertical acceleration, meters per second per second
    """

    # Mean Earth radius in meters (spherical model).  Used to turn a ground
    # distance in meters into the angular distance the rhumb formula wants.
    EARTH_RADIUS = 6371008.8

    def __init__(self, time=0, lat=0, lon=0, alt=0, course=0,
                 speed=0, climb=0, h_acc=0, v_acc=0):
        self.time = time        # Seconds from epoch
        self.lat = lat          # Decimal degrees
        self.lon = lon          # Decimal degrees
        self.alt = alt          # Meters
        self.course = course    # Degrees from true North
        self.speed = speed      # Meters per second
        self.climb = climb      # Meters per second
        self.h_acc = h_acc      # Meters per second per second
        self.v_acc = v_acc      # Meters per second per second

    def next(self, quantum=1):
        """Advance the state in place by quantum seconds.

        The position moves along a rhumb line (constant course) on a
        spherical Earth; speed and climb are integrated with constant
        acceleration over the step.  Returns None.

        Raises ValueError if the step would reach or cross a pole; in
        that case the state is left unmodified.
        """
        # Mean velocities over the step under constant acceleration.
        # The float divisor keeps integer inputs from truncating.
        avspeed = self.speed + self.h_acc * quantum / 2.0
        avclimb = self.climb + self.v_acc * quantum / 2.0
        # The formula works on angular distance, so convert from meters.
        distance = avspeed * quantum / self.EARTH_RADIUS
        # Formula from <http://williams.best.vwh.net/avform.htm#Rhumb>
        # Initial point cannot be a pole, but GPS doesn't work at high
        # latitudes anyway so it would be OK to fail there.
        # Seems to assume a spherical Earth, which means it's going
        # to have a slight inaccuracy rising towards the poles.
        # The if/then avoids 0/0 indeterminacies on E-W courses.
        tc = math.radians(self.course)
        lat1 = math.radians(self.lat)
        lon1 = math.radians(self.lon)
        lat = lat1 + distance * math.cos(tc)
        if abs(lat) >= math.pi / 2:
            raise ValueError("rhumb line step reaches or crosses a pole")
        if abs(lat - lat1) < math.sqrt(1e-15):
            q = math.cos(lat1)
        else:
            dphi = math.log(math.tan(lat / 2 + math.pi / 4) /
                            math.tan(lat1 / 2 + math.pi / 4))
            q = (lat - lat1) / dphi
        # The formulary counts West longitude as positive; here East is
        # positive, hence the sign of dlon is the opposite of the reference.
        dlon = distance * math.sin(tc) / q
        # Python's % yields a non-negative result for a positive modulus,
        # so this wraps the longitude into [-pi, pi).
        lon = (lon1 + dlon + math.pi) % (2 * math.pi) - math.pi
        # Commit everything only once the new position is known to be valid.
        self.time += quantum
        self.alt += avclimb * quantum
        self.speed += self.h_acc * quantum
        self.climb += self.v_acc * quantum
        self.lon = math.degrees(lon)
        self.lat = math.degrees(lat)

    # gpssim.go() advances the state with the builtin next(), which looks
    # up __next__ on Python 3; keep both spellings working.
    __next__ = next

# Satellite orbital elements are available at:
# <http://www.ngs.noaa.gov/orbits/>
# Orbital theory at:
# <http://www.wolffdata.se/gps/gpshtml/anomalies.html>


class satellite(object):
    """Placeholder for the orbital elements of one satellite (stub)."""

    def __init__(self, prn):
        # Only the PRN identifier is recorded for now.
        self.prn = prn

    def position(self, time):
        """Meant to return right ascension and declination of the satellite.

        Not implemented yet: the argument is ignored and None is returned.
        """
        return None

# Next, the command interpreter.  This is an object that takes an
# input source in the track description language, interprets it into
# samples that might be reported by a GPS, and calls a reporting
# class to generate output.


class gpssimException(Exception):
    """Error found while interpreting a track description.

    message  -- human-readable description of the problem
    filename -- name of the input being interpreted (may be None)
    lineno   -- line number at which the problem was detected
    """

    def __init__(self, message, filename, lineno):
        # Pass the details up so that args and repr() are informative.
        Exception.__init__(self, message, filename, lineno)
        self.message = message
        self.filename = filename
        self.lineno = lineno

    def __str__(self):
        "Format as '\"file\", line: message'."
        return '"%s", %d: %s' % (self.filename, self.lineno, self.message)


class gpssim(object):
    "Simulate a moving sensor, with skyview."
    active_PRNs = list(range(1, 24 + 1)) + [134, ]

    # Keyword -> numeric code tables for the "status" and "mode" directives.
    _status_codes = {"no_fix": 0, "fix": 1, "dgps_fix": 2}
    _mode_codes = {"no_fix": 1, "2d": 2, "3d": 3}

    def __init__(self, outfmt):
        """Create a simulation that reports through outfmt.

        outfmt is a reporting object providing report(sim) -> str, or a
        reporting class, which is instantiated with no arguments.
        """
        self.ksv = ksv()
        self.ephemeris = {}
        # This sets up satellites at random.  Not really what we want.
        # Each entry is a pair of random integers (presumably elevation
        # and azimuth in degrees -- TODO confirm).
        self.skyview = {}
        for prn in gpssim.active_PRNs:
            self.skyview[prn] = (random.randint(-60, +61),
                                 random.randint(0, 359))
        self.have_ephemeris = False
        self.channels = {}
        self.outfmt = outfmt
        # go() reports through self.gpstype; accept a class or an instance.
        self.gpstype = outfmt() if isinstance(outfmt, type) else outfmt
        # Default sink so that go() works even without filter().
        self.output = sys.stdout
        self.status = gps.STATUS_NO_FIX
        self.mode = gps.MODE_NO_FIX
        self.validity = "V"
        self.satellites_used = 0
        self.filename = None
        self.lineno = 0

    def _lookup(self, table, code, what):
        "Map a keyword to its numeric code, or raise gpssimException."
        try:
            return table[code.lower()]
        except KeyError:
            raise gpssimException("invalid %s code '%s'" % (what, code),
                                  self.filename, self.lineno)

    def _interpret(self, line):
        "Execute the directive on one line; line counting is done by caller."
        # Drop the comment, if any, then surrounding whitespace.
        line = line.split("#", 1)[0].strip()
        if not line:
            return
        fields = line.split()
        command = fields[0]
        args = fields[1:]
        seconds = None
        try:
            if command == "time":
                self.ksv.time = gps.isotime(args[0])
            elif command == "location":
                (self.ksv.lat, self.ksv.lon, self.ksv.alt) = \
                    [float(arg) for arg in args]
            elif command == "course":
                self.ksv.course = float(args[0])
            elif command == "speed":
                self.ksv.speed = float(args[0])
            elif command == "climb":
                self.ksv.climb = float(args[0])
            elif command == "acceleration":
                (self.ksv.h_acc, self.ksv.v_acc) = \
                    [float(arg) for arg in args]
            elif command == "snr":
                self.channels[int(args[0])] = float(args[1])
            elif command == "go":
                seconds = int(args[0])
            elif command == "status":
                self.status = self._lookup(self._status_codes, args[0],
                                           "status")
            elif command == "mode":
                self.mode = self._lookup(self._mode_codes, args[0], "mode")
            elif command == "satellites":
                self.satellites_used = int(args[0])
            elif command == "validity":
                self.validity = args[0]
            else:
                raise gpssimException("unknown command '%s'" % command,
                                      self.filename, self.lineno)
        except (IndexError, ValueError):
            # Missing, surplus or non-numeric arguments.
            raise gpssimException("malformed '%s' directive" % command,
                                  self.filename, self.lineno)
        # FIX-ME: add syntax for ephemeris elements
        # Run the simulation outside the try block so that errors raised
        # while simulating are not mistaken for argument errors.
        if seconds is not None:
            self.go(seconds)

    def parse_tdl(self, line):
        """Interpret one TDL directive.

        Text from a '#' to the end of the line is a comment; blank lines
        are ignored.  Recognized directives: time, location, course,
        speed, climb, acceleration, snr, go, status, mode, satellites,
        validity.

        Raises gpssimException, tagged with the current file name and
        line number, on an unknown directive, an invalid status/mode
        keyword, or malformed arguments.  The line counter is advanced
        for every line, including blank and failing ones.
        """
        try:
            self._interpret(line)
        finally:
            self.lineno += 1

    def filter(self, inp, outp):
        """Make this a filter for file-like objects.

        Each line of inp is interpreted as a TDL directive; reports
        produced by "go" directives are written to outp.
        """
        # Not every file-like object has a name (e.g. in-memory streams).
        self.filename = getattr(inp, "name", "<input>")
        self.lineno = 1
        self.output = outp
        for line in inp:
            self.parse_tdl(line)

    def go(self, seconds):
        """Run the simulation for a specified number of seconds.

        Advances the kinematic state one second at a time and writes one
        report to the current output after every step.
        """
        for _ in range(seconds):
            self.ksv.next()
            if self.have_ephemeris:
                self.skyview = {}
                for (prn, sat) in self.ephemeris.items():
                    # Use the simulation clock rather than the loop index,
                    # which would restart from zero on every "go".
                    self.skyview[prn] = sat.position(self.ksv.time)
            self.output.write(self.gpstype.report(self))

# Reporting classes need to have a report() method returning a string
# that is a sentence (or possibly several sentences) reporting the
# state of the simulation.  Presently we have only one, for NMEA
# devices, but the point of the architecture is so that we could simulate
# others - SirF, Evermore, whatever.


# Multiply a speed in meters per second by this factor to get knots.
MPS_TO_KNOTS = 1.9438445      # Meters per second to knots


class NMEA(object):
    """NMEA output generator.

    self.sentences lists what report() emits: each entry is either the
    name of a sentence method ("RMC", "GGA", ...) or an (interval, name)
    tuple for a sentence emitted only on every interval-th report.
    """

    def __init__(self):
        self.sentences = ("RMC", "GGA",)
        self.counter = 0    # Number of reports generated so far

    def add_checksum(self, mstr):
        """Concatenate NMEA checksum and trailer to a string.

        The checksum is the XOR of all characters, not counting a
        leading "$", written as two uppercase hex digits after "*".
        """
        payload = mstr[1:] if mstr.startswith("$") else mstr
        csum = 0
        for c in payload:
            csum ^= ord(c)
        return mstr + "*%02X\r\n" % csum

    def degtodm(self, angle):
        "Decimal degrees to GPS-style, degrees first followed by minutes."
        # Take both parts from modf() so that they agree for negative
        # angles too (floor() rounds those away from zero).
        (fraction, integer) = math.modf(angle)
        return integer * 100 + fraction * 60

    def _position(self, sim):
        "Return (latitude, N/S, longitude, E/W) ready for formatting."
        return (self.degtodm(abs(sim.ksv.lat)), "SN"[sim.ksv.lat > 0],
                self.degtodm(abs(sim.ksv.lon)), "WE"[sim.ksv.lon > 0])

    def GGA(self, sim):
        "Emit GGA sentence describing the simulation state."
        tm = time.gmtime(sim.ksv.time)
        gga = "$GPGGA,%02d%02d%02d,%09.4f,%c,%010.4f,%c,%d,%02d," % (
            (tm.tm_hour, tm.tm_min, tm.tm_sec) +
            self._position(sim) +
            (sim.status, sim.satellites_used))
        # HDOP calculation goes here
        gga += ","
        # Altitude and its unit are two fields; both stay empty without
        # a 3D fix so that the following fields keep their positions.
        if sim.mode == gps.MODE_3D:
            gga += "%.1f,M," % sim.ksv.alt
        else:
            gga += ",,"
        gga += "%.3f,M," % gpslib.wg84_separation(sim.ksv.lat, sim.ksv.lon)
        # Time in seconds since last DGPS update goes here
        gga += ","
        # DGPS station ID goes here
        return self.add_checksum(gga)

    def GLL(self, sim):
        "Emit GLL sentence describing the simulation state."
        tm = time.gmtime(sim.ksv.time)
        gll = "$GPGLL,%09.4f,%c,%010.4f,%c,%02d%02d%02d,%s," % (
            self._position(sim) +
            (tm.tm_hour, tm.tm_min, tm.tm_sec, sim.validity))
        # FAA mode indicator could go after these fields.
        return self.add_checksum(gll)

    def RMC(self, sim):
        "Emit RMC sentence describing the simulation state."
        tm = time.gmtime(sim.ksv.time)
        rmc = ("$GPRMC,%02d%02d%02d,%s,%09.4f,%c,%010.4f,%c,"
               "%.1f,%.1f,%02d%02d%02d,") % (
            (tm.tm_hour, tm.tm_min, tm.tm_sec, sim.validity) +
            self._position(sim) +
            # Speed over ground in knots, then course in degrees true.
            (sim.ksv.speed * MPS_TO_KNOTS, sim.ksv.course,
             tm.tm_mday, tm.tm_mon, tm.tm_year % 100))
        # Magnetic variation goes here
        # rmc += "%3.2f,M," % mag_var
        rmc += ",,"
        # FAA mode goes here
        return self.add_checksum(rmc)

    def ZDA(self, sim):
        "Emit ZDA sentence describing the simulation state."
        tm = time.gmtime(sim.ksv.time)
        zda = "$GPZDA,%02d%02d%02d,%02d,%02d,%04d" % (
            tm.tm_hour,
            tm.tm_min,
            tm.tm_sec,
            tm.tm_mday,
            tm.tm_mon,
            tm.tm_year, )
        # Local zone description, 00 to +- 13 hours, goes here
        zda += ","
        # Local zone minutes description goes here
        zda += ","
        return self.add_checksum(zda)

    def report(self, sim):
        """Report the simulation state.

        Returns the concatenation of all sentences due on this call and
        advances the report counter.
        """
        out = ""
        for sentence in self.sentences:
            if isinstance(sentence, tuple):
                (interval, sentence) = sentence
                # Skip unless the counter is a multiple of the interval.
                if self.counter % interval:
                    continue
            out += getattr(self, sentence)(sim)
        self.counter += 1
        return out

# The very simple main line.


if __name__ == "__main__":
    # Read track description language from standard input and write the
    # simulated sentences to standard output.
    try:
        # The simulator needs a reporting object, not the bare class.
        gpssim(NMEA()).filter(sys.stdin, sys.stdout)
    except gpssimException as e:
        # str() gives the formatted diagnostic; exit nonzero on failure.
        sys.stderr.write(str(e) + "\n")
        sys.exit(1)

# gpssim.py ends here.