#!/usr/bin/python # SPDX-License-Identifier: LGPL-2.1-or-later from __future__ import absolute_import, print_function, unicode_literals import sys import dbus import dbus.service import dbus.mainloop.glib try: from gi.repository import GObject except ImportError: import gobject as GObject BUS_NAME = 'org.bluez.obex' PATH = '/org/bluez/obex' AGENT_MANAGER_INTERFACE = 'org.bluez.obex.AgentManager1' AGENT_INTERFACE = 'org.bluez.obex.Agent1' TRANSFER_INTERFACE = 'org.bluez.obex.Transfer1' def ask(prompt): try: return raw_input(prompt) except: return input(prompt) class Agent(dbus.service.Object): def __init__(self, conn=None, obj_path=None): dbus.service.Object.__init__(self, conn, obj_path) self.pending_auth = False @dbus.service.method(AGENT_INTERFACE, in_signature="o", out_signature="s") def AuthorizePush(self, path): transfer = dbus.Interface(bus.get_object(BUS_NAME, path), 'org.freedesktop.DBus.Properties') properties = transfer.GetAll(TRANSFER_INTERFACE); self.pending_auth = True auth = ask("Authorize (%s, %s) (Y/n):" % (path, properties['Name'])) if auth == "n" or auth == "N": self.pending_auth = False raise dbus.DBusException( "org.bluez.obex.Error.Rejected: " "Not Authorized") self.pending_auth = False return properties['Name'] @dbus.service.method(AGENT_INTERFACE, in_signature="", out_signature="") def Cancel(self): print("Authorization Canceled") self.pending_auth = False if __name__ == '__main__': dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) bus = dbus.SessionBus() manager = dbus.Interface(bus.get_object(BUS_NAME, PATH), AGENT_MANAGER_INTERFACE) path = "/test/agent" agent = Agent(bus, path) mainloop = GObject.MainLoop() manager.RegisterAgent(path) print("Agent registered") cont = True while cont: try: mainloop.run() except KeyboardInterrupt: if agent.pending_auth: agent.Cancel() elif len(transfers) > 0: for a in transfers: a.cancel() else: cont = False # manager.UnregisterAgent(path) # print "Agent unregistered"