#!/usr/bin/env python # -*- coding: UTF-8 ''' xgps -- test client for gpsd usage: xgps [-?] [-D level] [-h] [-l degmfmt] [-r rotation] [-u units] [-V] [server[:port[:device]]] -? Print help and exit. -D lvl Set debug level to lvl -h Print help and exit. -l {d|m|s} Select lat/lon format d = DD.dddddd (default) m = DD MM.mmmm' s = DD MM' SS.sss" -r rotation Set rotation -u units Set units -V Print version and exit. ''' # This file is Copyright (c) 2010 by the GPSD project # SPDX-License-Identifier: BSD-2-clause # # This code runs compatibly under Python 2 and 3.x for x >= 2. # Preserve this property! from __future__ import absolute_import, print_function, division import cairo import math import os import socket import sys import time # Gtk3 imports. Gtk3 requires the require_version(), which then causes # pylint to complain about the subsequent "non-top" imports. import gi try: gi.require_version('Gtk', '3.0') except ValueError: # Gtk2 may be installed, has no equire_version() sys.stderr.write("xgps: ERROR Unsupported Gtk version\n") exit(1) from gi.repository import GObject # pylint: disable=wrong-import-position from gi.repository import Gtk # pylint: disable=wrong-import-position from gi.repository import Gdk # pylint: disable=wrong-import-position from gi.repository import Pango # pylint: disable=wrong-import-position # pylint wants local modules last try: import gps import gps.clienthelpers except ImportError as e: sys.stderr.write( "xgps: can't load Python gps libraries -- check PYTHONPATH.\n") sys.stderr.write("%s\n" % e) sys.exit(1) gps_version = '3.19-dev' if gps.__version__ != gps_version: sys.stderr.write("xgps: ERROR: need gps module version %s, got %s\n" % (gps_version, gps.__version__)) sys.exit(1) gui_about = '''\ This is xgps, a test client for the gpsd daemon. By Eric S. Raymond for the GPSD project, December 2009 ''' # MAXCHANNELS, from gps.h, currently 120 MAXCHANNELS = 120 # MAXCHANDISP, max channels to display # Use our own MAXCHANDISP value, due to the tradeoff between max sats and # the window size. Ideally, this should be dynamic. MAXCHANDISP = 28 # how to sort the Satellite List # some of ("PRN","el","az","ss","used") with optional '-' to reverse sort # by default, used at the top, then sort PRN SKY_VIEW_SORT_FIELDS = ('-used', 'PRN') # Each GNSS constellation reuses the same PRNs. To differentiate they are # all mushed into the PRN. Different GPS mush differently. gpsd should # have untangled and put in gnssid:svid def gnssid_str(sat): "convert gnssid:svid to string" # gnssid:svid appeared in gpsd 3.18 # allow for old servers if 'gnssid' not in sat or 'svid' not in sat: return ' ' if 0 >= sat.svid: return ' ' if 0 == sat.gnssid: return 'GP' if 1 == sat.gnssid: return 'SB' if 2 == sat.gnssid: return 'GA' if 3 == sat.gnssid: return 'BD' if 4 == sat.gnssid: return 'IM' if 5 == sat.gnssid: return 'QZ' if 6 == sat.gnssid: return 'GL' return ' ' class unit_adjustments(object): "Encapsulate adjustments for unit systems." def __init__(self, units=None): "Initialize class unit_adjustments" self.altfactor = gps.METERS_TO_FEET self.altunits = "ft" self.speedfactor = gps.MPS_TO_MPH self.speedunits = "mph" if units is None: units = gps.clienthelpers.gpsd_units() if units in (gps.clienthelpers.unspecified, gps.clienthelpers.imperial, "imperial", "i"): pass elif units in (gps.clienthelpers.nautical, "nautical", "n"): self.altfactor = gps.METERS_TO_FEET self.altunits = "ft" self.speedfactor = gps.MPS_TO_KNOTS self.speedunits = "knots" elif units in (gps.clienthelpers.metric, "metric", "m"): self.altfactor = 1.0 self.altunits = "m" self.speedfactor = gps.MPS_TO_KPH self.speedunits = "kph" else: raise ValueError # Should never happen def fit_to_grid(x, y, line_width): "Adjust coordinates to produce sharp lines." if line_width % 1.0 != 0: # Can't have sharp lines for non-integral line widths. return float(x), float(y) # Be consistent about returning floats if line_width % 2 == 0: # Round to a pixel corner. return round(x), round(y) # Round to a pixel center. return int(x) + 0.5, int(y) + 0.5 def fit_circle_to_grid(x, y, radius, line_width): """Adjust circle coordinates and radius to produce sharp horizontal and vertical tangents.""" r = radius x1, y1 = fit_to_grid(x - r, y - r, line_width) x2, y2 = fit_to_grid(x + r, y + r, line_width) x, y = (x1 + x2) / 2, (y1 + y2) / 2 r = (x2 - x1 + y2 - y1) / 4 return x, y, r class SkyView(Gtk.DrawingArea): "Satellite skyview, encapsulates pygtk's draw-on-expose behavior." # See HORIZON_PAD = 50 # How much whitespace to leave around horizon SAT_RADIUS = 5 # Diameter of satellite circle def __init__(self, rotation=0.0): "Initialize class SkyView" Gtk.DrawingArea.__init__(self) # GObject.GObject.__init__(self) self.set_size_request(400, 400) self.cr = None # New cairo context for each expose event self.step_of_grid = 45 # default step of polar grid self.connect('size-allocate', self.on_size_allocate) self.connect('draw', self.on_draw) self.satellites = [] self.sat_xy = [] self.center_x = self.center_y = self.radius = None self.rotate = rotation self.connect('motion_notify_event', self.popup) self.popover = None self.pop_xy = (None, None) def popdown(self): "See if need to popdown the sat details" if self.popover: self.popover.popdown() self.popover = None self.pop_xy = (None, None) def popup(self, skyview, event): "See if need to popup the sat details" for (x, y, sat) in self.sat_xy: if ((SkyView.SAT_RADIUS >= abs(x - event.x) and SkyView.SAT_RADIUS >= abs(y - event.y))): # got a sat match under the mouse # print((x, y)) if ((self.pop_xy[0] and self.pop_xy[1] and self.pop_xy == (int(x), int(y)))): # popup already up here, ignore event # print("(%d, %d)" % (x, y)) return if self.popover: # remove any old, no longer current popup # this never happens? self.popdown() # mouse is over a satellite, do popup self.pop_xy = (int(x), int(y)) self.popover = Gtk.Popover() if "gnssid" in sat and "svid" in sat: # gnssid:svid in gpsd 3.18 and up constellation = gnssid_str(sat) gnss_str = "%s:%d\n" % (constellation, sat.svid) else: gnss_str = '' s = ("PRN %d\n" "%s" "Elevation %3d\n" "Azimuth %3d\n" "SNR %3d\n" "Used %8s" % (sat.PRN, gnss_str, sat.el, sat.az, sat.ss, 'Yes' if sat.used else 'No')) label = Gtk.Label(s) rectangle = Gdk.Rectangle() rectangle.x = x - 25 rectangle.y = y - 25 rectangle.width = 50 rectangle.height = 50 self.popover.set_modal(False) self.popover.set_relative_to(self) self.popover.set_position(Gtk.PositionType.TOP) self.popover.set_pointing_to(rectangle) self.popover.add(label) self.popover.popup() self.popover.show_all() # remove popup after 5 seconds GObject.timeout_add(5000, self.popdown) return if self.popover: # remove any old, no longer current popup # this never happens? self.popdown() def on_size_allocate(self, _unused, allocation): "Adjust SkyView on size change" width = allocation.width height = allocation.height x = width // 2 y = height // 2 r = (min(width, height) - SkyView.HORIZON_PAD) // 2 x, y, r = fit_circle_to_grid(x, y, r, 1) self.center_x = x self.center_y = y self.radius = r def set_color(self, r, g, b): """Set foreground color for drawing. rgb: 0 to 255""" # Gdk.color_parse() deprecated in GDK 3.14 # gdkcolor = Gdk.color_parse(spec) r = r / 255.0 g = g / 255.0 b = b / 255.0 self.cr.set_source_rgb(r, g, b) def draw_circle(self, x, y, radius, filled=False): "Draw a circle centered on the specified midpoint." lw = self.cr.get_line_width() r = int(2 * radius + 0.5) // 2 x, y, r = fit_circle_to_grid(x, y, radius, lw) self.cr.arc(x, y, r, 0, math.pi * 2.0) self.cr.close_path() if filled: self.cr.fill() else: self.cr.stroke() def draw_line(self, x1, y1, x2, y2): "Draw a line between specified points." lw = self.cr.get_line_width() x1, y1 = fit_to_grid(x1, y1, lw) x2, y2 = fit_to_grid(x2, y2, lw) self.cr.move_to(x1, y1) self.cr.line_to(x2, y2) self.cr.stroke() def draw_square(self, x, y, radius, filled, flip): "Draw a square centered on the specified midpoint." lw = self.cr.get_line_width() if 0 == flip: x1, y1 = fit_to_grid(x - radius, y - radius, lw) x2, y2 = fit_to_grid(x + radius, y + radius, lw) self.cr.rectangle(x1, y1, x2 - x1, y2 - y1) else: self.cr.move_to(x, y + radius) self.cr.line_to(x + radius, y) self.cr.line_to(x, y - radius) self.cr.line_to(x - radius, y) self.cr.close_path() if filled: self.cr.fill() else: self.cr.stroke() def draw_string(self, x, y, text, centered=True): "Draw a text on the skyview." self.cr.select_font_face("Sans", cairo.FONT_SLANT_NORMAL, cairo.FONT_WEIGHT_BOLD) self.cr.set_font_size(10) if centered: extents = self.cr.text_extents(text) # width / 2 + x_bearing x -= extents[2] / 2 + extents[0] # height / 2 + y_bearing y -= extents[3] / 2 + extents[1] self.cr.move_to(x, y) self.cr.show_text(text) self.cr.new_path() def draw_triangle(self, x, y, radius, filled, flip): "Draw a triangle centered on the specified midpoint." lw = self.cr.get_line_width() if 0 == flip or 1 == flip: if 0 == flip: # down ytop = y + radius ybot = y - radius elif 1 == flip: # up ytop = y - radius ybot = y + radius x1, y1 = fit_to_grid(x, ytop, lw) x2, y2 = fit_to_grid(x + radius, ybot, lw) x3, y3 = fit_to_grid(x - radius, ybot, lw) else: # right ytop = y + radius ybot = y - radius x1, y1 = fit_to_grid(x - radius, ytop, lw) x2, y2 = fit_to_grid(x - radius, ybot, lw) x3, y3 = fit_to_grid(x + radius, y, lw) self.cr.move_to(x1, y1) self.cr.line_to(x2, y2) self.cr.line_to(x3, y3) self.cr.close_path() if filled: self.cr.fill() else: self.cr.stroke() def pol2cart(self, az, el): "Polar to Cartesian coordinates within the horizon circle." az = (az - self.rotate) % 360.0 az *= (math.pi / 180) # Degrees to radians # Exact spherical projection would be like this: # el = sin((90.0 - el) * DEG_2_RAD); el = ((90.0 - el) / 90.0) xout = self.center_x + math.sin(az) * el * self.radius yout = self.center_y - math.cos(az) * el * self.radius return (xout, yout) def on_draw(self, widget, _unused): "Draw the skyview" self.cr = widget.get_window().cairo_create() self.cr.set_line_width(1) self.cr.set_source_rgb(0, 0, 0) self.cr.paint() self.cr.set_source_rgb(1, 1, 1) # The zenith marker self.draw_circle(self.center_x, self.center_y, 6, filled=False) # The horizon circle if self.step_of_grid == 45: # The circle corresponding to 45 degrees elevation. # There are two ways we could plot this. Projecting the sphere # on the display plane, the circle would have a diameter of # sin(45) ~ 0.7. But the naive linear mapping, just splitting # the horizon diameter in half, seems to work better visually. self.draw_circle(self.center_x, self.center_y, self.radius / 2, filled=False) elif self.step_of_grid == 30: self.draw_circle(self.center_x, self.center_y, self.radius * 2 / 3, filled=False) self.draw_circle(self.center_x, self.center_y, self.radius / 3, filled=False) self.draw_circle(self.center_x, self.center_y, self.radius, filled=False) (x1, y1) = self.pol2cart(0, 0) (x2, y2) = self.pol2cart(180, 0) self.draw_line(x1, y1, x2, y2) (x1, y1) = self.pol2cart(90, 0) (x2, y2) = self.pol2cart(270, 0) self.draw_line(x1, y1, x2, y2) # The compass-point letters (x, y) = self.pol2cart(0, -5) self.draw_string(x, y, "N") (x, y) = self.pol2cart(90, -5) self.draw_string(x, y, "E") (x, y) = self.pol2cart(180, -5) self.draw_string(x, y, "S") (x, y) = self.pol2cart(270, -5) self.draw_string(x, y, "W") # place an invisible space above to allow sats below horizon (x, y) = self.pol2cart(0, -10) self.draw_string(x, y, "") # The satellites self.cr.set_line_width(2) self.sat_xy = [] for sat in self.satellites: if not 1 <= sat.PRN <= 437: # Bad PRN, skip. NMEA uses up to 437 continue if not 0 <= sat.az <= 359: # Bad azimuth, skip. continue if not -10 <= sat.el <= 90: # Bad elevation, skip. Allow just below horizon continue # The Navika-100 reports el/az of 0/0 for SBAS satellites, # causing them to appear inappropriately at the "north point". # Although this value isn't technically illegal (and hence not # filtered above), excluding this one specific case has a very # low probability of excluding legitimate cases, while avoiding # the improper display in this case. # Note that this only excludes them from the map, not the list. if sat.az == 0 and sat.el == 0: continue (x, y) = self.pol2cart(sat.az, sat.el) # colorize by signal to noise ratio # RINEX 3 uses 9 steps: 1 to 9. Corresponding to # <12, 12-17, 18-23, 24-29, 30-35, 36-41, 42-47, 48-53, >= 54 if sat.ss < 12: self.set_color(190, 190, 190) # gray elif sat.ss < 30: self.set_color(255, 0, 0) # red elif sat.ss < 36: # RINEX 3 says 30 is "threshold for good tracking" self.set_color(255, 255, 0) # yellow elif sat.ss < 42: self.set_color(0, 205, 0) # green3 else: self.set_color(0, 255, 180) # green and some blue # shape by constellation constellation = gnssid_str(sat) if constellation in ('GP', ' '): self.draw_circle(x, y, SkyView.SAT_RADIUS, sat.used) elif constellation == 'SB': self.draw_square(x, y, SkyView.SAT_RADIUS, sat.used, 0) elif constellation == 'GA': self.draw_triangle(x, y, SkyView.SAT_RADIUS, sat.used, 0) elif constellation == 'BD': self.draw_triangle(x, y, SkyView.SAT_RADIUS, sat.used, 1) elif constellation == 'GL': self.draw_square(x, y, SkyView.SAT_RADIUS, sat.used, 1) else: # QZSS, IMES, unknown or other self.draw_triangle(x, y, SkyView.SAT_RADIUS, sat.used, 2) self.sat_xy.append((x, y, sat)) self.cr.set_source_rgb(1, 1, 1) self.draw_string(x + SkyView.SAT_RADIUS, y + (SkyView.SAT_RADIUS * 2), str(sat.PRN), centered=False) self.cr = None def redraw(self, satellites): "Redraw the skyview." self.satellites = satellites self.queue_draw() class NoiseView(object): "Encapsulate view object for watching noise statistics." COLUMNS = 2 ROWS = 4 noisefields = ( # First column ("Time", "time"), ("Latitude", "lat"), ("Longitude", "lon"), ("Altitude", "alt"), # Second column ("RMS", "rms"), ("Major", "major"), ("Minor", "minor"), ("Orient", "orient"), ) def __init__(self): "Initialize class NoiseView" self.widget = Gtk.Table(NoiseView.COLUMNS, NoiseView.ROWS, False) self.noisewidgets = [] for i in range(len(NoiseView.noisefields)): colbase = (i // NoiseView.ROWS) * 2 label = Gtk.Label(label=NoiseView.noisefields[i][0] + ": ") # Wacky way to force right alignment label.set_alignment(xalign=1, yalign=0.5) self.widget.attach( label, colbase, colbase + 1, i % NoiseView.ROWS, i % NoiseView.ROWS + 1) entry = Gtk.Entry() # The right size for the ISO8601 timestamp entry.modify_font(Pango.FontDescription("Monospace 10")) entry.set_width_chars(24) entry.set_text("n/a") self.widget.attach( entry, colbase + 1, colbase + 2, i % NoiseView.ROWS, i % NoiseView.ROWS + 1) self.noisewidgets.append((NoiseView.noisefields[i][1], entry)) def update(self, noise): "Update the GPGST data fields." for (attrname, widget) in self.noisewidgets: if hasattr(noise, attrname): widget.set_text(str(getattr(noise, attrname))) else: widget.set_text("n/a") class MaidenheadView(object): "Encapsulate view object for watching Maidenhead grid location." def __init__(self): "Initialize class MaidenheadView" self.widget = Gtk.Entry() self.widget.set_editable(False) def update(self, tpv): "Update class MaidenheadView" if ((tpv.mode >= gps.MODE_2D and hasattr(tpv, "lat") and hasattr(tpv, "lon"))): self.widget.set_text(gps.clienthelpers.maidenhead(tpv.lat, tpv.lon)) else: self.widget.set_text("n/a") class AISView(object): "Encapsulate store and view objects for watching AIS data." AIS_ENTRIES = 10 DWELLTIME = 360 def __init__(self, deg_type): "Initialize the store and view." self.deg_type = deg_type self.name_to_mmsi = {} self.named = {} self.store = Gtk.ListStore(str, str, str, str, str, str) self.widget = Gtk.ScrolledWindow() self.widget.set_policy(Gtk.PolicyType.AUTOMATIC, Gtk.PolicyType.AUTOMATIC) self.view = Gtk.TreeView(model=self.store) self.widget.set_size_request(-1, 300) self.widget.add_with_viewport(self.view) for (i, label) in enumerate(('#', 'Name:', 'Callsign:', 'Destination:', "Lat/Lon:", "Information")): column = Gtk.TreeViewColumn(label) renderer = Gtk.CellRendererText() column.pack_start(renderer, expand=True) column.add_attribute(renderer, 'text', i) self.view.append_column(column) def enter(self, ais, name): "Add a named object (ship or station) to the store." if ais.mmsi in self.named: return False ais.entry_time = time.time() self.named[ais.mmsi] = ais self.name_to_mmsi[name] = ais.mmsi # Garbage-collect old entries try: for i in range(len(self.store)): here = self.store.get_iter(i) name = self.store.get_value(here, 1) mmsi = self.name_to_mmsi[name] if ((self.named[mmsi].entry_time < time.time() - AISView.DWELLTIME)): del self.named[mmsi] if name in self.name_to_mmsi: del self.name_to_mmsi[name] self.store.remove(here) except (ValueError, KeyError): # Invalid TreeIters throw these pass return True def latlon(self, lat, lon): "Latitude/longitude display in nice format." if lat < 0: latsuff = "S" elif lat > 0: latsuff = "N" else: latsuff = "" lat = gps.clienthelpers.deg_to_str(self.deg_type, lat) if lon < 0: lonsuff = "W" elif lon > 0: lonsuff = "E" else: lonsuff = "" lon = gps.clienthelpers.deg_to_str(gps.clienthelpers.deg_ddmmss, lon) return lat + latsuff + "/" + lon + lonsuff def update(self, ais): "Update the AIS data fields." if ais.type in (1, 2, 3, 18): if ais.mmsi in self.named: for i in range(len(self.store)): here = self.store.get_iter(i) name = self.store.get_value(here, 1) if name in self.name_to_mmsi: mmsi = self.name_to_mmsi[name] if mmsi == ais.mmsi: latlon = self.latlon(ais.lat, ais.lon) self.store.set_value(here, 4, latlon) elif ais.type == 4: if self.enter(ais, ais.mmsi): where = self.latlon(ais.lat, ais.lon) self.store.prepend( (ais.type, ais.mmsi, "(shore)", ais.timestamp, where, ais.epfd_text)) elif ais.type == 5: if self.enter(ais, ais.shipname): self.store.prepend( (ais.type, ais.shipname, ais.callsign, ais.destination, "", ais.shiptype)) elif ais.type == 12: sender = ais.mmsi if sender in self.named: sender = self.named[sender].shipname recipient = ais.dest_mmsi if ((recipient in self.named and hasattr(self.named[recipient], "shipname"))): recipient = self.named[recipient].shipname self.store.prepend( (ais.type, sender, "", recipient, "", ais.text)) elif ais.type == 14: sender = ais.mmsi if sender in self.named: sender = self.named[sender].shipname self.store.prepend( (ais.type, sender, "", "(broadcast)", "", ais.text)) elif ais.type in (19, 24): if self.enter(ais, ais.shipname): self.store.prepend( (ais.type, ais.shipname, "(class B)", "", "", ais.shiptype_text)) elif ais.type == 21: if self.enter(ais, ais.name): where = self.latlon(ais.lat, ais.lon) self.store.prepend( (ais.type, ais.name, "(%s navaid)" % ais.epfd_text, "", where, ais.aid_type_text)) class Base(object): "Base class for all the output" COLUMNS = 3 ROWS = 7 gpsfields = ( # First column ("Time", lambda s, r: s.update_time(r)), ("Latitude", lambda s, r: s.update_latitude(r)), ("Longitude", lambda s, r: s.update_longitude(r)), ("Altitude", lambda s, r: s.update_altitude(r)), ("Speed", lambda s, r: s.update_speed(r)), ("Climb", lambda s, r: s.update_climb(r)), ("Track", lambda s, r: s.update_track(r)), # Second column ("Status", lambda s, r: s.update_status(r)), ("EPX", lambda s, r: s.update_err(r, "epx")), ("EPY", lambda s, r: s.update_err(r, "epy")), ("EPV", lambda s, r: s.update_err(r, "epv")), ("EPS", lambda s, r: s.update_err_speed(r, "eps")), ("EPC", lambda s, r: s.update_err_speed(r, "epc")), ("EPD", lambda s, r: s.update_err_degrees(r, "epd")), # third column ("ECEF X", lambda s, r: s.update_ecef(r, "ecefx")), ("ECEF Y", lambda s, r: s.update_ecef(r, "ecefy")), ("ECEF Z", lambda s, r: s.update_ecef(r, "ecefz")), ("ECEF VX", lambda s, r: s.update_ecef(r, "ecefvx", "/s")), ("ECEF VY", lambda s, r: s.update_ecef(r, "ecefvy", "/s")), ("ECEF VZ", lambda s, r: s.update_ecef(r, "ecefvz", "/s")), ("ECEF pAcc", lambda s, r: s.update_ecef(r, "ecefpAcc")), # fourth column ("Sats", lambda s, r: s.update_seen(r)), ("XDOP", lambda s, r: s.update_dop(r, "xdop")), ("YDOP", lambda s, r: s.update_dop(r, "ydop")), ("VDOP", lambda s, r: s.update_dop(r, "vdop")), ("HDOP", lambda s, r: s.update_dop(r, "hdop")), ("TDOP", lambda s, r: s.update_dop(r, "tdop")), ("GDOP", lambda s, r: s.update_dop(r, "gdop")), ) def __init__(self, deg_type, rotation=0.0, title=""): "Initialize class Base" self.deg_type = deg_type self.rotate = rotation self.conversions = unit_adjustments() self.saved_mode = -1 self.ais_latch = False self.noise_latch = False self.last_transition = 0.0 self.daemon = None self.device = None self.window = Gtk.Window(Gtk.WindowType.TOPLEVEL) if not self.window.get_display(): raise Exception("Can't open display") if title: title = " " + title self.window.set_title("xgps" + title) self.window.connect("delete-event", self.delete_event) self.window.set_resizable(False) vbox = Gtk.VBox(False, 0) self.window.add(vbox) self.window.connect("destroy", lambda _unused: Gtk.main_quit()) self.uimanager = Gtk.UIManager() self.accelgroup = self.uimanager.get_accel_group() self.window.add_accel_group(self.accelgroup) self.actiongroup = Gtk.ActionGroup('xgps') self.actiongroup.add_actions( [('Quit', Gtk.STOCK_QUIT, '_Quit', None, 'Quit the Program', lambda _unused: Gtk.main_quit()), ('File', None, '_File'), ('View', None, '_View'), ('Units', None, '_Units'), ('Step of grid', None, '_Step of grid')]) self.actiongroup.add_toggle_actions( [('Skyview', None, '_Skyview', 's', 'Enable Skyview', self.view_toggle), ('Responses', None, '_Responses', 'r', 'Enable Response Reports', self.view_toggle), ('GPS', None, '_GPS Data', 'g', 'Enable GPS Data', self.view_toggle), ('Noise', None, '_Noise Statistics', 'n', 'Enable Noise Statistics', self.view_toggle), ('Maidenhead', None, '_Maidenhead', 'm', 'Enable Maidenhead locator', self.view_toggle), ('AIS', None, '_AIS Data', 'a', 'Enable AIS Data', self.view_toggle), ]) self.actiongroup.add_radio_actions( [('Imperial', None, '_Imperial', 'i', 'Imperial units', 0), ('Nautical', None, '_Nautical', 'n', 'Nautical units', 1), ('Metric', None, '_Metric', 'm', 'Metric Units', 2), ], 0, lambda a, _unused: self.set_units( ['i', 'n', 'm'][a.get_current_value()])) self.actiongroup.add_radio_actions( [('30deg', None, '30°', None, '30°', 30), ('45deg', None, '45°', None, '45°', 45), ('Off', None, 'Off', None, 'Off', 0), ], 45, lambda a, _unused: self.set_step_of_grid( a.get_current_value())) self.uimanager.insert_action_group(self.actiongroup, 0) self.uimanager.add_ui_from_string(''' ''') self.uimanager.get_widget('/MenuBar/View/Skyview').set_active(True) self.uimanager.get_widget('/MenuBar/View/Responses').set_active(True) self.uimanager.get_widget('/MenuBar/View/GPS').set_active(True) self.uimanager.get_widget('/MenuBar/View/Noise').set_active(True) self.uimanager.get_widget('/MenuBar/View/Maidenhead').set_active(True) self.uimanager.get_widget('/MenuBar/View/AIS').set_active(True) menubar = self.uimanager.get_widget('/MenuBar') vbox.pack_start(menubar, expand=False, fill=True, padding=0) self.satbox = Gtk.HBox(False, 0) vbox.add(self.satbox) skyframe = Gtk.Frame(label="Satellite List") self.satbox.add(skyframe) self.satlist = Gtk.ListStore(str, str, str, str, str, str) view = Gtk.TreeView(model=self.satlist) for (i, label) in enumerate(('', 'PRN', 'Elev', 'Azim', 'SNR', 'Used')): column = Gtk.TreeViewColumn(label) renderer = Gtk.CellRendererText(xalign=1.0) column.pack_start(renderer, expand=True) column.add_attribute(renderer, 'text', i) view.append_column(column) self.row_iters = [] for i in range(MAXCHANDISP): self.satlist.append(["", "", "", "", "", ""]) self.row_iters.append(self.satlist.get_iter(i)) skyframe.add(view) viewframe = Gtk.Frame(label="Skyview") self.satbox.add(viewframe) self.skyview = SkyView(self.rotate) try: # mouseovers fail with remote DISPLAY self.skyview.set_property('events', Gdk.EventMask.POINTER_MOTION_MASK) except NotImplementedError: # keep going anyway, w/o popups sys.stderr.write("xgps: WARNING: failed to grab mouse events, " "popups disabled\n") pass viewframe.add(self.skyview) self.rawdisplay = Gtk.Entry() self.rawdisplay.set_editable(False) vbox.add(self.rawdisplay) self.dataframe = Gtk.Frame(label="GPS data") datatable = Gtk.Table(Base.COLUMNS, Base.ROWS, False) self.dataframe.add(datatable) gpswidgets = [] for i in range(len(Base.gpsfields)): colbase = (i // Base.ROWS) * 2 label = Gtk.Label(label=Base.gpsfields[i][0] + ": ") # Wacky way to force right alignment label.set_alignment(xalign=1, yalign=0.5) datatable.attach(label, colbase, colbase + 1, i % Base.ROWS, i % Base.ROWS + 1) entry = Gtk.Entry() # The right size for the ISO8601 timestamp entry.modify_font(Pango.FontDescription("Monospace 10")) entry.set_width_chars(24) entry.set_text("n/a") datatable.attach(entry, colbase + 1, colbase + 2, i % Base.ROWS, i % Base.ROWS + 1) gpswidgets.append(entry) vbox.add(self.dataframe) self.noisebox = Gtk.HBox(False, 0) vbox.add(self.noisebox) noiseframe = Gtk.Frame(label="Noise Statistics") self.noisebox.add(noiseframe) self.noiseview = NoiseView() noiseframe.add(self.noiseview.widget) self.gsbox = Gtk.HBox(False, 0) vbox.add(self.gsbox) gsframe = Gtk.Frame(label="Maidenhead Grid Square") self.gsbox.add(gsframe) self.gsview = MaidenheadView() gsframe.add(self.gsview.widget) self.aisbox = Gtk.HBox(False, 0) vbox.add(self.aisbox) aisframe = Gtk.Frame(label="AIS Data") self.aisbox.add(aisframe) self.aisview = AISView(self.deg_type) aisframe.add(self.aisview.widget) self.window.show_all() # Hide the Noise Statistics window until user selects it. self.uimanager.get_widget('/MenuBar/View/Noise').set_active(False) self.noisebox.hide() # Hide the Maidenhead window until user selects it. self.uimanager.get_widget('/MenuBar/View/Maidenhead').set_active(False) self.gsbox.hide() # Hide the AIS window until user selects it. self.uimanager.get_widget('/MenuBar/View/AIS').set_active(False) self.aisbox.hide() self.view_name_to_widget = { "Skyview": self.satbox, "Responses": self.rawdisplay, "GPS": self.dataframe, "Noise": self.noisebox, "Maidenhead": self.gsbox, "AIS": self.aisbox} # Discard field labels and associate data hooks with their widgets Base.gpsfields = [(label_hook_widget[0][1], label_hook_widget[1]) for label_hook_widget in zip(Base.gpsfields, gpswidgets)] def view_toggle(self, action): "Toggle widget view" # print("View toggle:", action.get_active(), action.get_name()) if hasattr(self, 'view_name_to_widget'): if action.get_active(): self.view_name_to_widget[action.get_name()].show() else: self.view_name_to_widget[action.get_name()].hide() # The effect we're after is to make the top-level window # resize itself to fit when we show or hide widgets. # This is undocumented magic to do that. self.window.resize(1, 1) def set_satlist_field(self, row, column, value): "Set a specified field in the satellite list." try: self.satlist.set_value(self.row_iters[row], column, str(value)) except IndexError: sys.stderr.write("xgps: channel = %d, MAXCHANDISP = %d\n" % (row, MAXCHANDISP)) def delete_event(self, _widget, _event, _data=None): "Say goodbye nicely" Gtk.main_quit() return False # State updates def update_time(self, data): "Update time" if hasattr(data, "time"): # str() just in case we get an old-style float. return str(data.time) return "n/a" def update_latitude(self, data): "Update latitude" if data.mode >= gps.MODE_2D and hasattr(data, "lat"): lat = gps.clienthelpers.deg_to_str(self.deg_type, data.lat) if data.lat < 0: ns = 'S' else: ns = 'N' return "%14s %s" % (lat, ns) return "n/a" def update_longitude(self, data): "Update longitude" if data.mode >= gps.MODE_2D and hasattr(data, "lon"): lon = gps.clienthelpers.deg_to_str(self.deg_type, data.lon) if data.lon < 0: ew = 'W' else: ew = 'E' return "%14s %s" % (lon, ew) return "n/a" def update_altitude(self, data): "Update altitude" if data.mode >= gps.MODE_3D and hasattr(data, "alt"): return "%9.3f %s" % ( data.alt * self.conversions.altfactor, self.conversions.altunits) return "n/a" def update_speed(self, data): "Update speed" if hasattr(data, "speed"): return "%9.3f %s" % ( data.speed * self.conversions.speedfactor, self.conversions.speedunits) return "n/a" def update_climb(self, data): "Update climb" if hasattr(data, "climb"): return "%9.3f %s" % ( data.climb * self.conversions.speedfactor, self.conversions.speedunits) return "n/a" def update_track(self, data): "Update track" if hasattr(data, "track"): return "%14s °" % ( gps.clienthelpers.deg_to_str(self.deg_type, data.track)) return "n/a" def update_seen(self, data): "Update sats seen" # update sats seen/used in the GPS Data window if hasattr(data, 'satellites_seen'): return ("Seen %d, Used %d" % ( getattr(data, 'satellites_seen'), getattr(data, 'satellites_used'))) return "n/a" def update_dop(self, data, doptype): "update a DOP in the GPS Data window" if hasattr(data, doptype): return "%5.2f" % getattr(data, doptype) return "n/a" def update_ecef(self, data, eceftype, speedunit=''): "update a ECEF in the GPS Data window" if hasattr(data, eceftype): value = getattr(data, eceftype) return ("% 14.3f %s%s" % (value * self.conversions.altfactor, self.conversions.altunits, speedunit)) return "n/a" def update_err(self, data, errtype): "update a error estimate in the GPS Data window" if hasattr(data, errtype): return "%8.3f %s" % ( getattr(data, errtype) * self.conversions.altfactor, self.conversions.altunits) return "n/a" def update_err_speed(self, data, errtype): "update speed error estimate in the GPS Data window" if hasattr(data, errtype): return "%8.3f %s" % ( getattr(data, errtype) * self.conversions.speedfactor, self.conversions.speedunits) return "n/a" def update_err_degrees(self, data, errtype): "update heading error estimate in the GPS Data window" if hasattr(data, errtype): return "%8.3f °" % (getattr(data, errtype)) return "n/a" def update_status(self, data): "Update the status window" if data.mode == gps.MODE_2D: status = "2D FIX" elif data.mode == gps.MODE_3D: status = "3D FIX" else: status = "NO FIX" if hasattr(data, 'status') and data.status == gps.STATUS_DGPS_FIX: status += " DIFF" if data.mode != self.saved_mode: self.last_transition = time.time() self.saved_mode = data.mode return status + " (%d secs)" % (time.time() - self.last_transition) def update_gpsdata(self, tpv): "Update the GPS data fields." # the first 21 fields are updated using TPV data # the next 7 fields are updated using SKY data for (hook, widget) in Base.gpsfields[:21]: if hook: # Remove this guard when we have all hooks widget.set_text(hook(self, tpv)) self.gsview.update(tpv) def update_version(self, ver): "Update the Version" if ver.release != gps_version: sys.stderr.write("%s: WARNING gpsd version %s different than " "expected %s\n" % (sys.argv[0], ver.release, gps_version)) if ((ver.proto_major != gps.api_major_version or ver.proto_minor != gps.api_minor_version)): sys.stderr.write("%s: WARNING API version %s.%s different than " "expected %s.%s\n" % (sys.argv[0], ver.proto_major, ver.proto_minor, gps.api_major_version, gps.api_minor_version)) def _int_to_str(self, value, min_val, max_val): "test val in range min to max, or return" if min_val <= value <= max_val: return '%3d' % value return 'n/a' def update_skyview(self, data): "Update the satellite list and skyview." data.satellites_seen = 0 data.satellites_used = 0 if hasattr(data, 'satellites'): satellites = data.satellites for fld in reversed(SKY_VIEW_SORT_FIELDS): rev = (fld[0] == '-') if rev: fld = fld[1:] satellites = sorted( satellites[:], key=lambda x: x[fld], reverse=rev) for (i, satellite) in enumerate(satellites): yesno = 'N' data.satellites_seen += 1 if satellite.used: yesno = 'Y' data.satellites_used += 1 if i >= MAXCHANDISP: # more than can be displaced continue self.set_satlist_field(i, 0, gnssid_str(satellite)) # NMEA uses PRN up to 437 self.set_satlist_field(i, 1, self._int_to_str(satellite.PRN, 1, 437)) # allow satellites 10 degree below horizon self.set_satlist_field(i, 2, self._int_to_str(satellite.el, -10, 90)) self.set_satlist_field(i, 3, self._int_to_str(satellite.az, 0, 359)) self.set_satlist_field(i, 4, "%3d" % satellite.ss) self.set_satlist_field(i, 5, yesno) # clear rest of the list for i in range(data.satellites_seen, MAXCHANDISP): for j in range(0, 6): self.set_satlist_field(i, j, "") else: # clear all of the list for i in range(0, MAXCHANDISP): for j in range(0, 6): self.set_satlist_field(i, j, "") satellites = () # repaint Skyview self.skyview.redraw(satellites) # the first 21 fields are updated using TPV data # the next 7 fields are updated using SKY data for (hook, widget) in Base.gpsfields[21:28]: if hook: # Remove this guard when we have all hooks widget.set_text(hook(self, data)) # Preferences def set_units(self, system): "Change the display units." self.conversions = unit_adjustments(system) def set_step_of_grid(self, system): "Change the step of grid." self.skyview.step_of_grid = system # I/O monitoring and gtk housekeeping def watch(self, daem, dev): "Set up monitoring of a daemon instance." self.daemon = daem self.device = dev GObject.io_add_watch(daemon.sock, GObject.IO_IN, self.handle_response) GObject.io_add_watch(daemon.sock, GObject.IO_ERR, self.handle_hangup) GObject.io_add_watch(daemon.sock, GObject.IO_HUP, self.handle_hangup) def handle_response(self, source, condition): "Handle ordinary I/O ready condition from the daemon." if self.daemon.read() == -1: self.handle_hangup(source, condition) if self.daemon.valid & gps.PACKET_SET: if ((self.device and "device" in self.daemon.data and self.device != self.daemon.data["device"])): return True self.rawdisplay.set_text(self.daemon.response.strip()) if self.daemon.data["class"] == "VERSION": self.update_version(self.daemon.version) elif self.daemon.data["class"] == "SKY": self.update_skyview(self.daemon.data) elif self.daemon.data["class"] == "TPV": self.update_gpsdata(self.daemon.data) elif self.daemon.data["class"] == "GST": self.noiseview.update(self.daemon.data) if not self.noise_latch: self.noise_latch = True self.uimanager.get_widget( '/MenuBar/View/Noise').set_active(True) self.noisebox.show() elif self.daemon.data["class"] == "AIS": self.aisview.update(self.daemon.data) if not self.ais_latch: self.ais_latch = True self.uimanager.get_widget( '/MenuBar/View/AIS').set_active(True) self.aisbox.show() return True def handle_hangup(self, _source, _condition): "Handle hangup condition from the daemon." win = Gtk.MessageDialog(parent=self.window, type=Gtk.MessageType.ERROR, flags=Gtk.DialogFlags.DESTROY_WITH_PARENT, buttons=Gtk.ButtonsType.CANCEL) win.connect("destroy", lambda _unused: Gtk.main_quit()) win.set_markup("gpsd has stopped sending data.") win.run() Gtk.main_quit() return True def main(self): "The main routine" Gtk.main() if __name__ == "__main__": try: import getopt (options, arguments) = getopt.getopt(sys.argv[1:], "D:hl:u:r:V?", ['verbose']) debug = 0 degreefmt = 'd' unit_system = None rotate = 0.0 for (opt, val) in options: if opt in '-D': debug = int(val) elif opt == '-l': degreeformat = val elif opt == '-u': unit_system = val elif opt == '-r': try: rotate = float(val) except ValueError: rotate = 0.0 elif opt in ('-?', '-h', '--help'): print(__doc__) sys.exit(0) elif opt == '-V': sys.stderr.write("xgps: Version %s\n" % gps_version) sys.exit(0) degreefmt = {'d': gps.clienthelpers.deg_dd, 'm': gps.clienthelpers.deg_ddmm, 's': gps.clienthelpers.deg_ddmmss}[degreefmt] (host, port, device) = ("localhost", gps.GPSD_PORT, None) if arguments: args = arguments[0].split(":") if len(args) >= 1 and args[0]: host = args[0] if len(args) >= 2 and args[1]: port = args[1] if len(args) >= 3: device = args[2] target = ":".join(arguments[0:]) else: target = "" if 'DISPLAY' not in os.environ: sys.stderr.write("xgps: ERROR: DISPLAY not set\n") exit(1) base = Base(deg_type=degreefmt, rotation=rotate, title=target) base.set_units(unit_system) try: sys.stderr.write("xgps: host %s port %s\n" % (host, port)) daemon = gps.gps(host=host, port=port, mode=(gps.WATCH_ENABLE | gps.WATCH_JSON | gps.WATCH_SCALED), verbose=debug) base.watch(daemon, device) base.main() except socket.error: w = Gtk.MessageDialog(parent=base.window, type=Gtk.MessageType.ERROR, flags=Gtk.DialogFlags.DESTROY_WITH_PARENT, buttons=Gtk.ButtonsType.CANCEL) w.set_markup("gpsd is not running.") w.run() w.destroy() except KeyboardInterrupt: pass