# -*- coding: utf-8 -*-
from __future__ import print_function
from gi.repository import GObject, Gtk, Gio
from dfeet.uiloader import UILoader
from dfeet.introspection import AddressInfo
from dfeet.wnck_utils import IconTable
class BusNameBox(Gtk.VBox):
    """Widget representing one bus name (eg 'org.freedesktop.NetworkManager').

    The box shows an icon next to two labels: the bus name and an info line
    ("activatable: ..., pid: ..., cmd: ..."). The info line and the icon are
    rebuilt whenever the ``activatable`` or ``process_id`` property is set.
    """

    def __init__(self, bus_name):
        """Build the widget tree for `bus_name` and show it."""
        super(BusNameBox, self).__init__(spacing=5, expand=True)
        self.__bus_name = bus_name
        self.__process_id = 0
        self.__command_line = ''
        self.__activatable = False
        self.__icon_table = IconTable.get_instance()
        self.__icon_image = Gtk.Image.new_from_pixbuf(self.__icon_table.default_icon)

        self.__hbox = Gtk.HBox(spacing=5, halign=Gtk.Align.START)
        self.pack_start(self.__hbox, True, True, 0)
        # icon
        self.__hbox.pack_start(self.__icon_image, True, True, 0)
        # other information
        self.__vbox_right = Gtk.VBox(spacing=5, expand=True)
        self.__hbox.pack_start(self.__vbox_right, True, True, 0)

        # first element: the bus name
        self.__label_bus_name = Gtk.Label()
        self.__label_bus_name.set_markup("{0}".format(self.__bus_name))
        self.__label_bus_name.set_halign(Gtk.Align.START)
        self.__vbox_right.pack_start(self.__label_bus_name, True, True, 0)

        # second element: the info line (filled by __update_widget)
        self.__label_info = Gtk.Label()
        self.__label_info.set_halign(Gtk.Align.START)
        self.__vbox_right.pack_start(self.__label_info, True, True, 0)

        # separator for the boxes
        self.pack_end(Gtk.Separator(orientation=Gtk.Orientation.HORIZONTAL), True, True, 0)

        # update widget information
        self.__update_widget()
        self.show_all()

    def __update_widget(self):
        """Refresh the info label and the icon from the current state."""
        info = ["activatable: yes" if self.__activatable else "activatable: no"]
        if self.__process_id:
            info.append("pid: {0}".format(self.__process_id))
        if self.__command_line:
            info.append("cmd: {0}".format(self.__command_line))

        # always recompute the icon so it falls back to the default one
        # when the process id is cleared or no icon is known for it
        app_map = self.__icon_table.app_map
        if self.__process_id in app_map:
            icon = app_map[self.__process_id]
        else:
            icon = self.__icon_table.default_icon
        self.__icon_image.set_from_pixbuf(icon)

        # The text carries no markup and the command line is arbitrary
        # process data; set it as plain text so characters such as '&' or
        # '<' cannot produce invalid markup.
        self.__label_info.set_text(", ".join(info))

    def __repr__(self):
        return "%s (pid: %s)" % (self.__bus_name, self.__process_id)

    def __update_command_line(self):
        """Read the command line of the current process id from /proc.

        The command line is cleared when no process id is set. Errors from
        opening or reading the file are not handled here; they propagate to
        the caller.
        """
        if self.__process_id > 0:
            procpath = '/proc/' + str(self.__process_id) + '/cmdline'
            with open(procpath, 'r') as f:
                # read the whole file (an argument may contain a newline),
                # turn the NUL separators into spaces and drop the trailing
                # blank left by the final NUL
                self.__command_line = f.read().replace('\0', ' ').rstrip()
        else:
            self.__command_line = ''

    @property
    def bus_name(self):
        """the bus name this box represents"""
        return self.__bus_name

    @property
    def activatable(self):
        """whether the bus name is shown as activatable"""
        return self.__activatable

    @activatable.setter
    def activatable(self, act_new):
        self.__activatable = act_new
        # update the shown widget
        self.__update_widget()

    @property
    def process_id(self):
        """process id of the bus name owner (0 if unknown)"""
        return self.__process_id

    @process_id.setter
    def process_id(self, process_id_new):
        self.__process_id = process_id_new
        try:
            self.__update_command_line()
        except (IOError, OSError, ValueError):
            # best effort only: the file may be missing, unreadable or
            # (ValueError) not decodable as text
            self.__command_line = ''
        # update the shown widget
        self.__update_widget()
class BusWatch(object):
    """Watch a given bus and list its bus names in a Gtk.ListBox.

    The list is filled from the ListActivatableNames and ListNames replies
    of 'org.freedesktop.DBus' and kept up to date through the
    NameOwnerChanged signal. Selecting a row packs an AddressInfo
    introspection box for that bus name into the main widget.
    """

    def __init__(self, data_dir, bus_address):
        """Load the UI, connect to the bus and request the name lists.

        Args:
            data_dir: passed to UILoader and AddressInfo.
            bus_address: Gio.BusType.SYSTEM, Gio.BusType.SESSION or a
                D-Bus address.

        Raises:
            ValueError: if `bus_address` is neither of the two bus types
                nor accepted by Gio.dbus_is_supported_address().
        """
        self.__data_dir = data_dir
        self.__bus_address = bus_address
        # Filled by the ListActivatableNames reply. Start with an empty list
        # because NameOwnerChanged signals and the ListNames reply read this
        # attribute and may be handled before that reply arrives.
        self.__activatable_names = []
        # introspection info of the currently selected bus name (if any)
        self.__addr_info = None

        # setup UI
        ui = UILoader(self.__data_dir, UILoader.UI_BUS)
        self.__box_bus = ui.get_root_widget()
        self.__scrolledwindow_listbox = ui.get_widget("scrolledwindow_listbox")
        self.__bus_name_filter = ui.get_widget('entry_filter')

        # create a listbox for all the busnames
        self.__listbox = Gtk.ListBox(hexpand=True, vexpand=True, expand=True)
        self.__listbox.set_sort_func(self.__listbox_sort_by_name, None)
        self.__listbox.set_filter_func(self.__listbox_filter_by_name, None)
        self.__scrolledwindow_listbox.add(self.__listbox)
        self.__scrolledwindow_listbox.show_all()

        # setup the bus connection
        if self.__bus_address in (Gio.BusType.SYSTEM, Gio.BusType.SESSION):
            # TODO: do this async
            self.connection = Gio.bus_get_sync(self.__bus_address, None)
        elif Gio.dbus_is_supported_address(self.__bus_address):
            # TODO: do this async
            self.connection = Gio.DBusConnection.new_for_address_sync(
                self.__bus_address,
                Gio.DBusConnectionFlags.AUTHENTICATION_CLIENT |
                Gio.DBusConnectionFlags.MESSAGE_BUS_CONNECTION,
                None, None)
        else:
            raise ValueError("Invalid bus address '{0}'".format(self.__bus_address))

        # setup signals
        self.connection.signal_subscribe(None, "org.freedesktop.DBus", "NameOwnerChanged",
                                         None, None, 0, self.__name_owner_changed_cb, None)

        # refilter if someone wants to filter the busbox list
        self.__bus_name_filter.connect("changed",
                                       self.__bus_name_filter_changed_cb)

        # change bus detail tree if a different bus is selected
        self.__listbox.connect("row-selected",
                               self.__listbox_row_selected_cb)

        # TODO: do this async
        self.bus_proxy = Gio.DBusProxy.new_sync(self.connection,
                                                Gio.DBusProxyFlags.NONE,
                                                None,
                                                'org.freedesktop.DBus',
                                                '/org/freedesktop/DBus',
                                                'org.freedesktop.DBus', None)

        # get a list with activatable names
        self.bus_proxy.ListActivatableNames('()',
                                            result_handler=self.__list_act_names_handler,
                                            error_handler=self.__list_act_names_error_handler)

        # list all names
        self.bus_proxy.ListNames('()',
                                 result_handler=self.__list_names_handler,
                                 error_handler=self.__list_names_error_handler)

    @property
    def box_bus(self):
        """the main widget for the bus"""
        return self.__box_bus

    def __bus_name_filter_changed_cb(self, bus_name_filter):
        """someone typed something in the searchbox - refilter"""
        self.__listbox.invalidate_filter()

    def __listbox_row_selected_cb(self, listbox, listbox_row):
        """someone selected a different row of the listbox"""
        children = self.box_bus.get_children()
        # never remove first element - that's the listbox with the busnames
        if len(children) > 1:
            self.box_bus.remove(children[-1])
        # drop the reference to the previously shown introspection info
        self.__addr_info = None

        # get the selected busname
        if listbox_row is not None:
            bus_name_box = listbox_row.get_children()[0]
            # add the introspection info for the selected bus name
            self.__addr_info = AddressInfo(
                self.__data_dir, self.__bus_address, bus_name_box.bus_name,
                connection_is_bus=True)
            self.box_bus.pack_end(self.__addr_info.introspect_box, True, True, 0)
        self.box_bus.show_all()

    def __name_owner_changed_cb(self, connection, sender_name,
                                object_path, interface_name, signal_name,
                                parameters, user_data):
        """bus name added or removed

        `parameters` carries the bus name, the old owner and the new owner
        (in that order); an empty owner string means "no owner".
        """
        bus_name = parameters[0]
        old_owner = parameters[1]
        new_owner = parameters[2]

        if bus_name.startswith(':'):
            # name starting with ':': no old owner means it just appeared
            if not old_owner:
                self.__listbox_add_bus_name(BusNameBox(bus_name))
            else:
                self.__listbox_remove_bus_name(bus_name)
        elif new_owner:
            # name acquired or handed over to another owner: add it or
            # refresh the existing row. It must not be removed afterwards
            # just because there also was an old owner.
            self.__listbox_add_bus_name(BusNameBox(bus_name))
        elif old_owner:
            # name released and nobody took it over
            self.__listbox_remove_bus_name(bus_name)

    def __listbox_find_bus_name(self, bus_name):
        """find the given busname in the listbox or return None if not found"""
        for row in self.__listbox.get_children():
            if row.get_children()[0].bus_name == bus_name:
                return row
        # busname not found
        return None

    def __listbox_remove_bus_name(self, bus_name):
        """remove the given busname from the listbox

        An activatable bus name is added again right away with a fresh box.
        """
        row = self.__listbox_find_bus_name(bus_name)
        if row is None:
            print("can not remove busname '{0}'. busname not found".format(bus_name))
            return
        self.__listbox.remove(row)
        # if bus is activatable, add the bus name again
        if bus_name in self.__activatable_names:
            self.__listbox_add_bus_name(BusNameBox(bus_name))

    def __listbox_add_bus_name(self, bus_name_box):
        """add the given busnamebox to the listbox and update the info

        If the bus name is already listed, the existing box is updated and
        the given one is not added.
        """
        # first check if busname is already listed
        # ie an activatable (but inactive) busname
        row = self.__listbox_find_bus_name(bus_name_box.bus_name)
        if row is not None:
            # bus name is already in the list - use this
            bus_name_box = row.get_children()[0]
        else:
            # add busnamebox to the list
            self.__listbox.add(bus_name_box)

        # update bus info stuff
        self.bus_proxy.GetConnectionUnixProcessID(
            '(s)', bus_name_box.bus_name,
            result_handler=self.__get_unix_process_id_cb,
            error_handler=self.__get_unix_process_id_error_cb,
            user_data=bus_name_box)
        # check if bus name is dbus activatable
        bus_name_box.activatable = bus_name_box.bus_name in self.__activatable_names

    def __list_names_handler(self, obj, names, userdata):
        """ListNames reply: add every returned bus name to the listbox"""
        for name in names:
            self.__listbox_add_bus_name(BusNameBox(name))

    def __list_names_error_handler(self, obj, error, userdata):
        """ListNames failed: report the error"""
        print("error getting bus names: %s" % str(error))

    def __list_act_names_handler(self, obj, act_names, userdata):
        """ListActivatableNames reply: remember and list the names"""
        # remember the activatable bus names
        self.__activatable_names = act_names
        # add all activatable bus names to the list
        for name in act_names:
            self.__listbox_add_bus_name(BusNameBox(name))

    def __list_act_names_error_handler(self, obj, error, userdata):
        """ListActivatableNames failed: continue without activatable names"""
        self.__activatable_names = []
        print("error getting activatable names: %s" % str(error))

    def __get_unix_process_id_cb(self, obj, pid, bus_name_box):
        """GetConnectionUnixProcessID reply: store the pid in the box"""
        bus_name_box.process_id = pid

    def __get_unix_process_id_error_cb(self, obj, error, bus_name_box):
        """GetConnectionUnixProcessID failed: reset the pid of the box"""
        bus_name_box.process_id = 0

    def __listbox_filter_by_name(self, row, user_data):
        """filter function for listbox (case-insensitive substring match)"""
        needle = self.__bus_name_filter.get_text().lower()
        return needle in row.get_children()[0].bus_name.lower()

    def __listbox_sort_by_name(self, row1, row2, user_data):
        """sort function for listbox

        Names starting with ':' sort after all other names. Two such names
        are compared by their numeric parts, other names by their
        dot-separated parts. Returns -1, 0 or 1.
        """
        name1 = row1.get_children()[0].bus_name
        name2 = row2.get_children()[0].bus_name
        unique1 = name1.startswith(':')
        unique2 = name2.startswith(':')

        if unique1 != unique2:
            return 1 if unique1 else -1

        if unique1:
            # convert to integers if comparing two unique names
            try:
                key1 = tuple(int(part) for part in name1[1:].split('.'))
                key2 = tuple(int(part) for part in name2[1:].split('.'))
            except ValueError:
                # not purely numeric: compare the plain strings instead of
                # raising from inside the sort callback
                key1, key2 = name1, name2
        else:
            key1 = name1.split('.')
            key2 = name2.split('.')

        return (key1 > key2) - (key1 < key2)
if __name__ == "__main__":
    # for debugging: show the BusWatch widget of one bus in a bare window
    import argparse

    parser = argparse.ArgumentParser(description='show a given bus address')
    parser.add_argument('addr')
    # BusWatch needs a data directory as its first argument
    parser.add_argument('data_dir', help='data directory passed to BusWatch')
    p = parser.parse_args()

    # 'system' and 'session' select the well-known buses; anything else is
    # handed to BusWatch unchanged as a bus address
    if p.addr.lower() == 'system':
        addr = Gio.BusType.SYSTEM
    elif p.addr.lower() == 'session':
        addr = Gio.BusType.SESSION
    else:
        addr = p.addr

    bw = BusWatch(p.data_dir, addr)

    win = Gtk.Window()
    win.connect("delete-event", Gtk.main_quit)
    win.set_default_size(1024, 768)
    # box_bus is the main widget exposed by BusWatch
    win.add(bw.box_bus)
    win.show_all()
    try:
        Gtk.main()
    except (KeyboardInterrupt, SystemExit):
        Gtk.main_quit()