#!/usr/bin/env python3 # SPDX-License-Identifier: LGPL-2.1-or-later import dbus import dbus.exceptions import dbus.mainloop.glib import dbus.service try: from gi.repository import GObject except ImportError: import gobject as GObject import sys mainloop = None app = None bus = None BLUEZ_SERVICE_NAME = 'org.bluez' DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager' DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties' BATTERY_PROVIDER_MANAGER_IFACE = 'org.bluez.BatteryProviderManager1' BATTERY_PROVIDER_IFACE = 'org.bluez.BatteryProvider1' BATTERY_PROVIDER_PATH = '/path/to/provider' BATTERY_PATH1 = '11_11_11_11_11_11' BATTERY_PATH2 = '22_22_22_22_22_22' BATTERY_PATH3 = '33_33_33_33_33_33' class InvalidArgsException(dbus.exceptions.DBusException): _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs' class Application(dbus.service.Object): def __init__(self, bus): self.path = BATTERY_PROVIDER_PATH self.services = [] self.batteries = [] dbus.service.Object.__init__(self, bus, self.path) def get_path(self): return dbus.ObjectPath(self.path) def add_battery(self, battery): self.batteries.append(battery) self.InterfacesAdded(battery.get_path(), battery.get_properties()) GObject.timeout_add(1000, drain_battery, battery) def remove_battery(self, battery): self.batteries.remove(battery) self.InterfacesRemoved(battery.get_path(), [BATTERY_PROVIDER_IFACE]) @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}') def GetManagedObjects(self): response = {} print('GetManagedObjects called') for battery in self.batteries: response[battery.get_path()] = battery.get_properties() return response @dbus.service.signal(DBUS_OM_IFACE, signature='oa{sa{sv}}') def InterfacesAdded(self, object_path, interfaces_and_properties): return @dbus.service.signal(DBUS_OM_IFACE, signature='oas') def InterfacesRemoved(self, object_path, interfaces): return class Battery(dbus.service.Object): """ org.bluez.BatteryProvider1 interface implementation """ def __init__(self, bus, dev, percentage, source = None): self.path = BATTERY_PROVIDER_PATH + '/dev_' + dev self.dev_path = '/org/bluez/hci0/dev_' + dev self.bus = bus self.percentage = percentage self.source = source dbus.service.Object.__init__(self, bus, self.path) def get_battery_properties(self): properties = {} if self.percentage != None: properties['Percentage'] = dbus.Byte(self.percentage) if self.source != None: properties['Source'] = self.source properties['Device'] = dbus.ObjectPath(self.dev_path) return properties def get_properties(self): return { BATTERY_PROVIDER_IFACE: self.get_battery_properties() } def get_path(self): return dbus.ObjectPath(self.path) def set_percentage(self, percentage): if percentage < 0 or percentage > 100: print('percentage not valid') return self.percentage = percentage print('battery %s percentage %d' % (self.path, self.percentage)) self.PropertiesChanged( BATTERY_PROVIDER_IFACE, self.get_battery_properties()) @dbus.service.method(DBUS_PROP_IFACE, in_signature='s', out_signature='a{sv}') def GetAll(self, interface): if interface != BATTERY_PROVIDER_IFACE: raise InvalidArgsException() return self.get_properties()[BATTERY_PROVIDER_IFACE] @dbus.service.signal(DBUS_PROP_IFACE, signature='sa{sv}') def PropertiesChanged(self, interface, properties): return def add_late_battery(): app.add_battery(Battery(bus, BATTERY_PATH3, 70, 'Protocol 2')) def drain_battery(battery): new_percentage = 100 if battery.percentage != None: new_percentage = battery.percentage - 5 if new_percentage < 0: new_percentage = 0 battery.set_percentage(new_percentage) if new_percentage <= 0: return False return True def register_provider_cb(): print('Battery Provider registered') # Battery added early right after RegisterBatteryProvider succeeds app.add_battery(Battery(bus, BATTERY_PATH2, None)) # Battery added later GObject.timeout_add(5000, add_late_battery) def register_provider_error_cb(error): print('Failed to register Battery Provider: ' + str(error)) mainloop.quit() def find_manager(bus): remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE) objects = remote_om.GetManagedObjects() for o, props in objects.items(): if BATTERY_PROVIDER_MANAGER_IFACE in props.keys(): return o return None def unregister_provider_cb(): print('Battery Provider unregistered') def unregister_provider_error_cb(error): print('Failed to unregister Battery Provider: ' + str(error)) def unregister_battery_provider(battery_provider_manager): battery_provider_manager.UnregisterBatteryProvider(BATTERY_PROVIDER_PATH, reply_handler=unregister_provider_cb, error_handler=unregister_provider_error_cb) def remove_battery(app, battery): app.remove_battery(battery) """ Simulates an application registering to BlueZ as a Battery Provider providing fake batteries drained periodically. """ def main(): global mainloop, bus, app dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) bus = dbus.SystemBus() manager_path = find_manager(bus) if not manager_path: print('BatteryProviderManager1 interface not found') return print('BatteryProviderManager1 path = ', manager_path) battery_provider_manager = dbus.Interface( bus.get_object(BLUEZ_SERVICE_NAME, manager_path), BATTERY_PROVIDER_MANAGER_IFACE) app = Application(bus) # Battery pre-added before RegisterBatteryProvider battery1 = Battery(bus, BATTERY_PATH1, 87, 'Protocol 1') app.add_battery(battery1) mainloop = GObject.MainLoop() print('Registering Battery Provider...') battery_provider_manager.RegisterBatteryProvider(BATTERY_PROVIDER_PATH, reply_handler=register_provider_cb, error_handler=register_provider_error_cb) # Unregister the Battery Provider after an arbitrary amount of time GObject.timeout_add( 12000, unregister_battery_provider, battery_provider_manager) # Simulate battery removal by a provider GObject.timeout_add(8000, remove_battery, app, battery1) mainloop.run() if __name__ == '__main__': main()