summaryrefslogtreecommitdiff
path: root/src/dfeet/executemethoddialog.py
blob: aa80da8fee6c7cb5e9853687c3df91caafc0f526 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# -*- coding: utf-8 -*-
import time
from pprint import pformat
from gi.repository import GLib, Gio, Gtk

from dfeet.uiloader import UILoader


class ExecuteMethodDialog:
    def __init__(self, data_dir, connection, connection_is_bus, bus_name, method_obj, parent_window):
        signal_dict = {
            'execute_dbus_method_cb': self.execute_cb,
            'execute_dialog_close_cb': self.close_cb
            }

        self.connection = connection
        self.connection_is_bus = connection_is_bus
        self.bus_name = bus_name
        self.method_obj = method_obj

        ui = UILoader(data_dir, UILoader.UI_EXECUTEDIALOG)
        self.dialog = ui.get_root_widget()
        self.dialog.set_transient_for(parent_window)
        self.label_method_name = ui.get_widget('label_method_name')
        self.label_object_path = ui.get_widget('label_object_path')
        self.label_interface = ui.get_widget('label_interface')
        self.notebook = ui.get_widget('notebook1')
        self.parameter_textview = ui.get_widget('parametertextview1')
        self.source_textview = ui.get_widget('sourcetextview1')
        self.prettyprint_textview = ui.get_widget('prettyprinttextview1')
        self.method_execution_count_spinbutton = ui.get_widget('method_exec_count_spinbutton')
        self.label_avg = ui.get_widget('label_avg')
        self.label_min = ui.get_widget('label_min')
        self.label_max = ui.get_widget('label_max')
        ui.connect_signals(signal_dict)
        self.label_method_name.set_markup("%s" % (self.method_obj.markup_str))
        self.label_object_path.set_markup("%s" % (self.method_obj.object_path))
        self.label_interface.set_markup("%s" % (self.method_obj.iface_info.name))

    def execute_cb(self, widget):
        # get given parameters
        buf = self.parameter_textview.get_buffer()
        params = buf.get_text(buf.get_start_iter(),
                              buf.get_end_iter(), False)

        # reset the statistics stuff
        self.label_avg.set_text("")
        self.label_min.set_text("")
        self.label_max.set_text("")
        user_data = {
            'avg': 0,
            'count': 0,
            }

        try:
            # build a GVariant
            if params:
                params = "(" + params + ",)"
                params_code = '(' + self.method_obj.in_args_code + ')'
                params_gvariant = GLib.Variant(params_code, eval(params))
            else:
                params_gvariant = None

            if self.connection_is_bus:
                proxy = Gio.DBusProxy.new_sync(self.connection,
                                               Gio.DBusProxyFlags.NONE,
                                               None,
                                               self.bus_name,
                                               self.method_obj.object_path,
                                               self.method_obj.iface_info.name,
                                               None)
                # call the function
                for i in range(0, self.method_execution_count_spinbutton.get_value_as_int()):
                    user_data['method_call_time_start'] = time.time()
                    proxy.call(
                        self.method_obj.method_info.name, params_gvariant,
                        Gio.DBusCallFlags.NONE, -1, None, self.method_connection_bus_cb, user_data)
            else:
                # FIXME: implement p2p connection execution
                raise Exception("Function execution on p2p connections not yet implemented")
                # self.connection.call(
                # None, object_path, self.method_obj.iface_obj.iface_info.name,
                # self.method_obj.method_info.name, params_gvariant,
                # GLib.VariantType.new("(s)"), Gio.DBusCallFlags.NONE, -1, None)

        except Exception as e:
            # output the exception
            self.source_textview.get_buffer().set_text(str(e))
            self.prettyprint_textview.get_buffer().set_text(pformat(str(e)))

    def method_connection_bus_cb(self, proxy, res_async, user_data):
        """async callback for executed method"""
        try:
            # get the result from the dbus method call
            result = proxy.call_finish(res_async)
            # remember the needed time for the method call
            method_call_time_end = time.time()
            method_call_time_needed = method_call_time_end - user_data['method_call_time_start']

            # update avg, min, max
            user_data['avg'] += method_call_time_needed
            user_data['count'] += 1
            self.label_avg.set_text("%.4f" % (float(user_data['avg'] / user_data['count'])))
            self.label_min.set_text(
                "%.4f" % min(float(self.label_min.get_text() or "999"), method_call_time_needed))
            self.label_max.set_text(
                "%.4f" % max(float(self.label_max.get_text() or "0"), method_call_time_needed))

            # output result
            if result:
                self.source_textview.get_buffer().set_text(str(result))
                lines = [pformat(x) for x in result.unpack()]
                self.prettyprint_textview.get_buffer().set_text(',\n'.join(lines))
            else:
                self.prettyprint_textview.get_buffer().set_text(
                    'This method did not return anything')
        except Exception as e:
            # output the exception
            self.source_textview.get_buffer().set_text(str(e))
            self.prettyprint_textview.get_buffer().set_text(pformat(str(e)))

    def run(self):
        response = self.dialog.run()
        if response == Gtk.ResponseType.DELETE_EVENT or response == Gtk.ResponseType.CLOSE:
            self.dialog.destroy()

    def close_cb(self, widget):
        self.dialog.destroy()