#!/usr/bin/python from __future__ import absolute_import, print_function, unicode_literals from gi.repository import GObject import sys import dbus import dbus.service import dbus.mainloop.glib from optparse import OptionParser import bluezutils BUS_NAME = 'org.bluez' AGENT_INTERFACE = 'org.bluez.Agent1' AGENT_PATH = "/test/agent" bus = None device_obj = None dev_path = None def ask(prompt): try: return raw_input(prompt) except: return input(prompt) def set_trusted(path): props = dbus.Interface(bus.get_object("org.bluez", path), "org.freedesktop.DBus.Properties") props.Set("org.bluez.Device1", "Trusted", True) def dev_connect(path): dev = dbus.Interface(bus.get_object("org.bluez", path), "org.bluez.Device1") dev.Connect() class Rejected(dbus.DBusException): _dbus_error_name = "org.bluez.Error.Rejected" class Agent(dbus.service.Object): exit_on_release = True def set_exit_on_release(self, exit_on_release): self.exit_on_release = exit_on_release @dbus.service.method(AGENT_INTERFACE, in_signature="", out_signature="") def Release(self): print("Release") if self.exit_on_release: mainloop.quit() @dbus.service.method(AGENT_INTERFACE, in_signature="os", out_signature="") def AuthorizeService(self, device, uuid): print("AuthorizeService (%s, %s)" % (device, uuid)) authorize = ask("Authorize connection (yes/no): ") if (authorize == "yes"): return raise Rejected("Connection rejected by user") @dbus.service.method(AGENT_INTERFACE, in_signature="o", out_signature="s") def RequestPinCode(self, device): print("RequestPinCode (%s)" % (device)) set_trusted(device) return ask("Enter PIN Code: ") @dbus.service.method(AGENT_INTERFACE, in_signature="o", out_signature="u") def RequestPasskey(self, device): print("RequestPasskey (%s)" % (device)) set_trusted(device) passkey = ask("Enter passkey: ") return dbus.UInt32(passkey) @dbus.service.method(AGENT_INTERFACE, in_signature="ouq", out_signature="") def DisplayPasskey(self, device, passkey, entered): print("DisplayPasskey (%s, %06u entered %u)" % (device, passkey, entered)) @dbus.service.method(AGENT_INTERFACE, in_signature="os", out_signature="") def DisplayPinCode(self, device, pincode): print("DisplayPinCode (%s, %s)" % (device, pincode)) @dbus.service.method(AGENT_INTERFACE, in_signature="ou", out_signature="") def RequestConfirmation(self, device, passkey): print("RequestConfirmation (%s, %06d)" % (device, passkey)) confirm = ask("Confirm passkey (yes/no): ") if (confirm == "yes"): set_trusted(device) return raise Rejected("Passkey doesn't match") @dbus.service.method(AGENT_INTERFACE, in_signature="o", out_signature="") def RequestAuthorization(self, device): print("RequestAuthorization (%s)" % (device)) auth = ask("Authorize? (yes/no): ") if (auth == "yes"): return raise Rejected("Pairing rejected") @dbus.service.method(AGENT_INTERFACE, in_signature="", out_signature="") def Cancel(self): print("Cancel") def pair_reply(): print("Device paired") set_trusted(dev_path) dev_connect(dev_path) mainloop.quit() def pair_error(error): err_name = error.get_dbus_name() if err_name == "org.freedesktop.DBus.Error.NoReply" and device_obj: print("Timed out. Cancelling pairing") device_obj.CancelPairing() else: print("Creating device failed: %s" % (error)) mainloop.quit() if __name__ == '__main__': dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) bus = dbus.SystemBus() capability = "KeyboardDisplay" parser = OptionParser() parser.add_option("-i", "--adapter", action="store", type="string", dest="adapter_pattern", default=None) parser.add_option("-c", "--capability", action="store", type="string", dest="capability") parser.add_option("-t", "--timeout", action="store", type="int", dest="timeout", default=60000) (options, args) = parser.parse_args() if options.capability: capability = options.capability path = "/test/agent" agent = Agent(bus, path) mainloop = GObject.MainLoop() obj = bus.get_object(BUS_NAME, "/org/bluez"); manager = dbus.Interface(obj, "org.bluez.AgentManager1") manager.RegisterAgent(path, capability) print("Agent registered") # Fix-up old style invocation (BlueZ 4) if len(args) > 0 and args[0].startswith("hci"): options.adapter_pattern = args[0] del args[:1] if len(args) > 0: device = bluezutils.find_device(args[0], options.adapter_pattern) dev_path = device.object_path agent.set_exit_on_release(False) device.Pair(reply_handler=pair_reply, error_handler=pair_error, timeout=60000) device_obj = device else: manager.RequestDefaultAgent(path) mainloop.run() #adapter.UnregisterAgent(path) #print("Agent unregistered")