#!/usr/bin/python import gobject import sys import os import dbus import dbus.service import dbus.mainloop.glib class Transfer: def __init__(self, callback_func): self.callback_func = callback_func self.path = None self.filename = None class PbapClient: def __init__(self, session_path): self.transfers = 0 self.props = dict() self.flush_func = None bus = dbus.SessionBus() obj = bus.get_object("org.bluez.obex.client", session_path) self.session = dbus.Interface(obj, "org.bluez.obex.Session") self.pbap = dbus.Interface(obj, "org.bluez.obex.PhonebookAccess") bus.add_signal_receiver(self.transfer_complete, dbus_interface="org.bluez.obex.Transfer", signal_name="Complete", path_keyword="path") bus.add_signal_receiver(self.transfer_error, dbus_interface="org.bluez.obex.Transfer", signal_name="Error", path_keyword="path") def register(self, reply, transfer): (path, properties) = reply transfer.path = path transfer.filename = properties["Filename"] self.props[path] = transfer print "Transfer created: %s (file %s)" % (path, transfer.filename) def error(self, err): print err mainloop.quit() def transfer_complete(self, path): req = self.props.get(path) if req == None: return self.transfers -= 1 print "Transfer %s finished" % path f = open(req.filename, "r") os.remove(req.filename) lines = f.readlines() del self.props[path] req.callback_func(lines) if (len(self.props) == 0) and (self.transfers == 0): if self.flush_func != None: f = self.flush_func self.flush_func = None f() def transfer_error(self, code, message, path): req = self.props.get(path) if req == None: return print "Transfer finished with error %s: %s" % (code, message) mainloop.quit() def pull(self, vcard, func): req = Transfer(func) self.pbap.Pull(vcard, "", reply_handler=lambda r: self.register(r, req), error_handler=self.error) self.transfers += 1 def pull_all(self, func): req = Transfer(func) self.pbap.PullAll("", reply_handler=lambda r: self.register(r, req), error_handler=self.error) self.transfers += 1 def flush_transfers(self, func): if (len(self.props) == 0) and (self.transfers == 0): return self.flush_func = func def interface(self): return self.pbap if __name__ == '__main__': dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) bus = dbus.SessionBus() mainloop = gobject.MainLoop() client = dbus.Interface(bus.get_object("org.bluez.obex.client", "/"), "org.bluez.obex.Client") if (len(sys.argv) < 2): print "Usage: %s " % (sys.argv[0]) sys.exit(1) print "Creating Session" session_path = client.CreateSession(sys.argv[1], { "Target": "PBAP" }) pbap_client = PbapClient(session_path) def process_result(lines, header): if header != None: print header for line in lines: print line, print def test_paths(paths): if len(paths) == 0: print print "FINISHED" mainloop.quit() return path = paths[0] print "\n--- Select Phonebook %s ---\n" % (path) pbap_client.interface().Select("int", path) print "\n--- GetSize ---\n" ret = pbap_client.interface().GetSize() print "Size = %d\n" % (ret) print "\n--- List vCard ---\n" ret = pbap_client.interface().List() for item in ret: print "%s : %s" % (item[0], item[1]) pbap_client.interface().SetFormat("vcard30") pbap_client.interface().SetFilter(["VERSION", "FN", "TEL"]); pbap_client.pull(item[0], lambda x: process_result(x, None)) pbap_client.interface().SetFormat("vcard30") pbap_client.interface().SetFilter(["VERSION", "FN", "TEL"]); pbap_client.pull_all(lambda x: process_result(x, "\n--- PullAll ---\n")) pbap_client.flush_transfers(lambda: test_paths(paths[1:])) test_paths(["PB", "ICH", "OCH", "MCH", "CCH"]) mainloop.run()