#!/usr/bin/python
"""Interactive OBEX agent for the "org.bluez.obex" D-Bus service.

Registers an agent object on the session bus, asks on the console whether
each incoming transfer should be accepted and where to store it, and prints
a progress line for the transfers it has accepted.

This is a Python 2 script (it relies on ``raw_input`` and ``gobject``).
"""

import gobject

import sys
import dbus
import dbus.service
import dbus.mainloop.glib

# Transfers accepted through Agent.Authorize that have not completed yet.
# Shared by the agent and by the signal handlers defined in the main section.
transfers = []


class Agent(dbus.service.Object):
    """D-Bus object exporting the "org.bluez.obex.Agent" interface."""

    def __init__(self, conn=None, obj_path=None):
        """Export the agent on *conn* at object path *obj_path*."""
        dbus.service.Object.__init__(self, conn, obj_path)
        # True while Authorize is waiting for the user's yes/no answer.
        self.pending_auth = False

    @dbus.service.method("org.bluez.obex.Agent", in_signature="osssii",
                         out_signature="s")
    def Authorize(self, dpath, device, filename, ftype, length, time):
        """Ask the user whether to accept an incoming transfer.

        dpath    -- object path of the transfer being authorized
        device   -- remote device identifier, only shown in the prompt
        filename -- name proposed for the incoming file
        ftype    -- unused here
        length   -- recorded as the transfer's total size
        time     -- unused here

        Returns the destination filename typed by the user.
        Raises dbus.DBusException when the user answers "n" or "N".
        """
        self.pending_auth = True
        # Show the transfer's own object path (dpath), not the agent's.
        print("Authorize (%s, %s, %s) Y/n" % (dpath, device, filename))
        answer = raw_input().strip("\n ")

        if answer in ("n", "N"):
            self.pending_auth = False
            raise dbus.DBusException(
                "org.bluez.obex.Error.Rejected: "
                "Not Autorized")

        print("Full filename (including path):")
        self.pending_auth = False
        transfers.append(Transfer(dpath, filename, 0, length))
        return raw_input().strip("\n ")

    @dbus.service.method("org.bluez.obex.Agent", in_signature="",
                         out_signature="")
    def Cancel(self):
        """Report that the pending authorization was canceled."""
        print("Authorization Canceled")
        self.pending_auth = False


class Transfer(object):
    """Progress bookkeeping for one transfer, keyed by its object path."""

    def __init__(self, dpath, filename=None, transfered=-1, size=-1):
        """Record the transfer path, file name and byte counters."""
        self.dpath = dpath
        self.filename = filename
        self.transfered = transfered
        self.size = size

    def update(self, filename=None, transfered=-1, total=-1):
        """Store new counters; the filename is replaced only if truthy."""
        if filename:
            self.filename = filename
        self.transfered = transfered
        self.size = total

    def cancel(self):
        """Call Cancel() on the transfer's "org.bluez.obex.Transfer" interface.

        Uses the module-level ``bus`` created in the main section.
        """
        transfer_iface = dbus.Interface(
            bus.get_object("org.bluez.obex", self.dpath),
            "org.bluez.obex.Transfer")
        transfer_iface.Cancel()

    def __str__(self):
        """Return "<filename> (<path>) (<percent>%)"."""
        # A zero or negative size (the default is -1) would divide by zero
        # or yield a meaningless ratio, so report 0% in that case.
        if self.size > 0:
            percent = float(self.transfered) / float(self.size) * 100
        else:
            percent = 0.0
        return "%s (%s) (%.2f%%)" % (self.filename, self.dpath, percent)

    __repr__ = __str__


if __name__ == '__main__':
    def new_transfer(dpath):
        """Handle Manager.TransferStarted.

        The Progress receiver is registered once at start-up instead of
        here: adding it on every new transfer stacked duplicate receivers,
        so each Progress signal was handled several times.
        """
        print("new transfer")

    def transfer_completed(dpath, success):
        """Handle Manager.TransferCompleted: report and forget the transfer."""
        print("\ntransfer completed => %s"
              % ("Success" if success else "Fail"))
        # Filter in place so every reference to the shared list sees it.
        transfers[:] = [t for t in transfers if t.dpath != dpath]

    def progress(total, current, dpath):
        """Handle Transfer.Progress: update counters, redraw the status line.

        dpath is the object path of the emitting transfer, supplied through
        the receiver's path_keyword.
        """
        status = []
        for t in transfers:
            if t.dpath == dpath:
                t.update(None, current, total)
            status.append("%s" % (t))
        # "\r" keeps the cursor on the same line so the next update
        # overwrites this one.
        sys.stdout.write(" ".join(status) + "\r")
        sys.stdout.flush()

    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    bus = dbus.SessionBus()
    manager = dbus.Interface(bus.get_object("org.bluez.obex", "/"),
                             "org.bluez.obex.Manager")
    bus.add_signal_receiver(new_transfer,
                            dbus_interface="org.bluez.obex.Manager",
                            signal_name="TransferStarted")
    bus.add_signal_receiver(transfer_completed,
                            dbus_interface="org.bluez.obex.Manager",
                            signal_name="TransferCompleted")
    # Registered exactly once; path_keyword tells the handler which
    # transfer object emitted the signal.
    bus.add_signal_receiver(progress,
                            dbus_interface="org.bluez.obex.Transfer",
                            signal_name="Progress",
                            path_keyword="dpath")

    path = "/test/agent"
    agent = Agent(bus, path)

    mainloop = gobject.MainLoop()

    manager.RegisterAgent(path)
    print("Agent registered")

    # Ctrl-C cancels a pending authorization first, then any active
    # transfers, and only exits when neither is outstanding.
    cont = True
    while cont:
        try:
            mainloop.run()
        except KeyboardInterrupt:
            if agent.pending_auth:
                agent.Cancel()
            elif transfers:
                for a in transfers:
                    a.cancel()
            else:
                cont = False

    # manager.UnregisterAgent(path)
    # print "Agent unregistered"