summaryrefslogtreecommitdiff
path: root/test/pbap-client
diff options
context:
space:
mode:
authorMarcel Holtmann <marcel@holtmann.org>2012-12-10 23:01:18 +0100
committerMarcel Holtmann <marcel@holtmann.org>2012-12-10 23:01:18 +0100
commitf37facb3bd7c53d3b7629cb53b22a05a952bfa27 (patch)
tree8ae884f60c8a9b788353d9049de23084782ccc95 /test/pbap-client
parent7ca701a6120778fb65905f02305963b79ff6d8de (diff)
downloadbluez-f37facb3bd7c53d3b7629cb53b22a05a952bfa27.tar.gz
test: Add test scripts from obexd repository
Diffstat (limited to 'test/pbap-client')
-rwxr-xr-xtest/pbap-client157
1 files changed, 157 insertions, 0 deletions
diff --git a/test/pbap-client b/test/pbap-client
new file mode 100755
index 000000000..fbe930c9b
--- /dev/null
+++ b/test/pbap-client
@@ -0,0 +1,157 @@
+#!/usr/bin/python
+
+import gobject
+
+import sys
+import os
+import dbus
+import dbus.service
+import dbus.mainloop.glib
+
+class Transfer:
+ def __init__(self, callback_func):
+ self.callback_func = callback_func
+ self.path = None
+ self.filename = None
+
+class PbapClient:
+ def __init__(self, session_path):
+ self.transfers = 0
+ self.props = dict()
+ self.flush_func = None
+ bus = dbus.SessionBus()
+ obj = bus.get_object("org.bluez.obex.client", session_path)
+ self.session = dbus.Interface(obj, "org.bluez.obex.Session")
+ self.pbap = dbus.Interface(obj,
+ "org.bluez.obex.PhonebookAccess")
+ bus.add_signal_receiver(self.transfer_complete,
+ dbus_interface="org.bluez.obex.Transfer",
+ signal_name="Complete",
+ path_keyword="path")
+ bus.add_signal_receiver(self.transfer_error,
+ dbus_interface="org.bluez.obex.Transfer",
+ signal_name="Error",
+ path_keyword="path")
+
+ def register(self, reply, transfer):
+ (path, properties) = reply
+ transfer.path = path
+ transfer.filename = properties["Filename"]
+ self.props[path] = transfer
+ print "Transfer created: %s (file %s)" % (path,
+ transfer.filename)
+
+ def error(self, err):
+ print err
+ mainloop.quit()
+
+ def transfer_complete(self, path):
+ req = self.props.get(path)
+ if req == None:
+ return
+ self.transfers -= 1
+ print "Transfer %s finished" % path
+ f = open(req.filename, "r")
+ os.remove(req.filename)
+ lines = f.readlines()
+ del self.props[path]
+ req.callback_func(lines)
+
+ if (len(self.props) == 0) and (self.transfers == 0):
+ if self.flush_func != None:
+ f = self.flush_func
+ self.flush_func = None
+ f()
+
+ def transfer_error(self, code, message, path):
+ req = self.props.get(path)
+ if req == None:
+ return
+ print "Transfer finished with error %s: %s" % (code, message)
+ mainloop.quit()
+
+ def pull(self, vcard, params, func):
+ req = Transfer(func)
+ self.pbap.Pull(vcard, "", params,
+ reply_handler=lambda r: self.register(r, req),
+ error_handler=self.error)
+ self.transfers += 1
+
+ def pull_all(self, params, func):
+ req = Transfer(func)
+ self.pbap.PullAll("", params,
+ reply_handler=lambda r: self.register(r, req),
+ error_handler=self.error)
+ self.transfers += 1
+
+ def flush_transfers(self, func):
+ if (len(self.props) == 0) and (self.transfers == 0):
+ return
+ self.flush_func = func
+
+ def interface(self):
+ return self.pbap
+
+if __name__ == '__main__':
+
+ dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
+
+ bus = dbus.SessionBus()
+ mainloop = gobject.MainLoop()
+
+ client = dbus.Interface(bus.get_object("org.bluez.obex.client", "/"),
+ "org.bluez.obex.Client")
+
+ if (len(sys.argv) < 2):
+ print "Usage: %s <device>" % (sys.argv[0])
+ sys.exit(1)
+
+ print "Creating Session"
+ session_path = client.CreateSession(sys.argv[1], { "Target": "PBAP" })
+
+ pbap_client = PbapClient(session_path)
+
+ def process_result(lines, header):
+ if header != None:
+ print header
+ for line in lines:
+ print line,
+ print
+
+ def test_paths(paths):
+ if len(paths) == 0:
+ print
+ print "FINISHED"
+ mainloop.quit()
+ return
+
+ path = paths[0]
+
+ print "\n--- Select Phonebook %s ---\n" % (path)
+ pbap_client.interface().Select("int", path)
+
+ print "\n--- GetSize ---\n"
+ ret = pbap_client.interface().GetSize()
+ print "Size = %d\n" % (ret)
+
+ print "\n--- List vCard ---\n"
+ try:
+ ret = pbap_client.interface().List(dbus.Dictionary())
+ except:
+ ret = []
+
+ params = dbus.Dictionary({ "Format" : "vcard30",
+ "Fields" : [ "VERSION", "FN", "TEL"] })
+ for item in ret:
+ print "%s : %s" % (item[0], item[1])
+ pbap_client.pull(item[0], params,
+ lambda x: process_result(x, None))
+
+ pbap_client.pull_all(params, lambda x: process_result(x,
+ "\n--- PullAll ---\n"))
+
+ pbap_client.flush_transfers(lambda: test_paths(paths[1:]))
+
+ test_paths(["PB", "ICH", "OCH", "MCH", "CCH"])
+
+ mainloop.run()