summaryrefslogtreecommitdiff
path: root/enhanced-position-service/dbus/test/test-scripts/test-enhanced-position-service-tk-gui.py
blob: 6ef7fc08a1576599bd48a392ab32eea104d7d51b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
#!/usr/bin/python

"""
**************************************************************************
* @licence app begin@
* SPDX-License-Identifier: MPL-2.0
*
* \copyright Copyright (C) 2014, XS Embedded GmbH
*
* \file test-enhanced-position-service.py
*
* \brief This simple test shows how the enhanced-position-service 
*        can be tested using a python script with a TK based GUI
*        which reacts on DBus Signals
*        It is based on test-enhanced-position-service.py from Marco Residori
*        and the multithreaded Tk approach described in
*        http://bytes.com/topic/python/answers/448559-oddities-tkinter
*
* \author Helmut Schmidt <Helmut.3.Schmidt@continental-corporation.com>
*
* \version 1.0
*
* This Source Code Form is subject to the terms of the
* Mozilla Public License, v. 2.0. If a copy of the MPL was not distributed with
# this file, You can obtain one at http://mozilla.org/MPL/2.0/.
* List of changes:
* <date>, <name>, <description of change>
*
* @licence end@
**************************************************************************
"""

"""
**************************************************************************
*
* Current restrictions
* Stopping the GUI does not always work as expected
* - when the Exit button is clicked, it sometimes hangs for a while
* - it's not possible to stop the GUI by closing the window
*
* Alternative implementation
* - Poll on the queue instead of waiting for event
*   as described in http://effbot.org/zone/tkinter-threads.htm
*   But apparently this does not improve the issue conerning stopping
*
**************************************************************************
"""

#for dbus access
import dbus
import gobject
import dbus.mainloop.glib

#for the TK based GUI (dbus mainloop runs in background thread)
import threading
import Queue
from Tkinter import *

#constants as defined in the Positioning API
LATITUDE  = 0x00000001
LONGITUDE = 0x00000002
ALTITUDE  = 0x00000004
CLIMB     = 0x00000020
SPEED     = 0x00000010
HEADING   = 0x00000008

if __name__ == '__main__':
#According http://dbus.freedesktop.org/doc/dbus-python/doc/tutorial.html#setting-up-an-event-loop
# the main loop must be setup before connecting to the bus.
#Probably this line could be moved further down
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) 


#The following functions depend on global variables which are defined later
#That's ugly, but it works in python
#Probably we can avoid this with using classes in a redesign

#DBus signal receiver
#Sends a custom event to the Tk main loop and puts the data in a queue
def catchall_positioning_signals_handler(changedValues):
    #print ('PositionUpdate')
    #print changedValues
    ## Each time the signal is catched, put the parameter in the queue...
    commQueue.put(changedValues)
    ## ... and generate a custom event on the main window
    try:
        tk_root.event_generate('<<SignalPositionUpdate>>', when='tail')
    ## If it failed, the window has been destoyed: over
    except:
        print ("Cannot send Tk Event, terminating ....");
        stopEvent.set()

# Receive the custom event in the GUI thread:
# Retrieve changed data over dbus and display them  
def signalPositionUpdate(event):
    #print("signalPositionUpdate Event received by GUI thread")
    changedValues = commQueue.get()
    #print changedValues
    position = enhanced_position_interface.GetPositionInfo(changedValues)
    timestamp = position[0]
    #print 'TIMESTAMP:' +str(timestamp)
    data = position[1]
    for key in data:
        if key == LATITUDE:
            labelLat.config(text='LATITUDE:' + str(data[dbus.UInt16(key)]))
        if key == LONGITUDE:
            labelLon.config(text='LONGITUDE:' + str(data[dbus.UInt16(key)]))
        if key == ALTITUDE:
            labelAlt.config(text='ALTITUDE:' + str(data[dbus.UInt16(key)]))
        if key == CLIMB:
            labelClimb.config(text='CLIMB:' + str(data[dbus.UInt16(key)]))
        if key == SPEED:
            labelSpeed.config(text='SPEED:' + str(data[dbus.UInt16(key)]))
        if key == HEADING:
            labelHeading.config(text='HEADING:' + str(data[dbus.UInt16(key)]))
    #It's important to call update_idletasks() to update the GUI
    tk_root.update_idletasks()

#Tasks which have to be done periodically within the dbus thread
def dbus_timeout_periodic():
    #it seems that the dbus thread blocks the Tk GUI thread if we don't call update_idletasks()
    tk_root.update_idletasks()
    #Check whether the termination event is set and terminate dbus thread and Tk GUI
    if stopEvent.is_set():
        print ('stopEvent.is_set()')
        dbus_loop.quit()
        tk_root.quit() # as well call root.destroy() ???
    else:
        gobject.timeout_add(100, dbus_timeout_periodic)

##############Here comes the real main()#################
print ('Enhanced Positioning Test GUI')
print ('== Always use the Exit button to terminate (closing the window may not work) ==')

###Event and Queue for thread synchronization/communication
stopEvent = threading.Event()
commQueue = Queue.Queue()

###Create Tk main window
tk_root = Tk()
tk_root.title("Enhanced Positioning Test GUI")
# Setup the GUI
labelLat = Label(tk_root, width=40, text='LATITUDE: UKNOWN')
labelLat.pack()
labelLon = Label(tk_root, width=40, text='LONGITUDE: UKNOWN')
labelLon.pack()
labelAlt = Label(tk_root, width=40, text='ALTITUDE: UKNOWN')
labelAlt.pack()
labelClimb = Label(tk_root, width=40, text='CLIMB: UKNOWN')
labelClimb .pack()
labelSpeed = Label(tk_root, width=40, text='SPEED: UKNOWN')
labelSpeed.pack()
labelHeading = Label(tk_root, width=40, text='HEADING: UKNOWN')
labelHeading.pack()
bExit = Button(tk_root, text="Exit", command=stopEvent.set)
bExit.pack()
#bind the event to the callback function
tk_root.bind('<<SignalPositionUpdate>>', signalPositionUpdate)

### Setup the DBus
#connect to session bus
bus = dbus.SessionBus()
#some global variables for the dbus used inside the signal handler
enhanced_position = bus.get_object('org.genivi.positioning.EnhancedPosition','/org/genivi/positioning/EnhancedPosition')
enhanced_position_interface = dbus.Interface(enhanced_position, dbus_interface='org.genivi.positioning.EnhancedPosition')
#register the signal handler
bus.add_signal_receiver(catchall_positioning_signals_handler, \
                        dbus_interface = "org.genivi.positioning.EnhancedPosition", \
                        signal_name = "PositionUpdate")
#create the dbus loop (must be global so we can terminate it)
gobject.timeout_add(100, dbus_timeout_periodic)
dbus_loop = gobject.MainLoop()


###Finally: start the dbus thread and then the Tk main loop
dbus_thread=threading.Thread(target=dbus_loop.run)
dbus_thread.start()
tk_root.mainloop()