#!/usr/bin/python import sys import dbus import gobject import dbus.mainloop.glib import os.path from optparse import OptionParser def parse_options(): parser.add_option("-d", "--device", dest="device", help="Device to connect", metavar="DEVICE") parser.add_option("-p", "--pull", dest="pull_to_file", help="Pull vcard and store in FILE", metavar="FILE") parser.add_option("-s", "--send", dest="send_file", help="Send FILE", metavar="FILE") parser.add_option("-v", "--verbose", action="store_true", dest="verbose") return parser.parse_args() class OppClient: def __init__(self, session_path, verbose=False): self.progress = 0 self.transfer_path = None self.verbose = verbose bus = dbus.SessionBus() obj = bus.get_object("org.bluez.obex.client", session_path) self.session = dbus.Interface(obj, "org.bluez.obex.Session") self.opp = dbus.Interface(obj, "org.bluez.obex.ObjectPush") bus.add_signal_receiver(self.transfer_complete, dbus_interface="org.bluez.obex.Transfer", signal_name="Complete", path_keyword="path") bus.add_signal_receiver(self.transfer_error, dbus_interface="org.bluez.obex.Transfer", signal_name="Error", path_keyword="path") if self.verbose: bus.add_signal_receiver(self.transfer_progress, dbus_interface="org.bluez.obex.Transfer", signal_name="PropertyChanged", path_keyword="path") def create_transfer_reply(self, reply): (path, properties) = reply self.transfer_path = path self.transfer_size = properties["Size"] if self.verbose: print "Transfer created: %s" % path def error(self, err): print err mainloop.quit() def transfer_complete(self, path): if path != self.transfer_path: return if self.verbose: print "Transfer finished" mainloop.quit() def transfer_error(self, code, message, path): if path != self.transfer_path: return print "Transfer finished with error %s: %s" % (code, message) mainloop.quit() def transfer_progress(self, prop, value, path): if path != self.transfer_path: return if prop != "Progress": return speed = (value - self.progress) / 1000 print "Transfer progress %d/%d at %d kBps" % (value, self.transfer_size, speed) self.progress = value def pull_business_card(self, filename): self.opp.PullBusinessCard(os.path.abspath(filename), reply_handler=self.create_transfer_reply, error_handler=self.error) def send_file(self, filename): self.opp.SendFile(os.path.abspath(filename), reply_handler=self.create_transfer_reply, error_handler=self.error) if __name__ == '__main__': dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) parser = OptionParser() (options, args) = parse_options() if not options.device: parser.print_help() sys.exit(0) bus = dbus.SessionBus() mainloop = gobject.MainLoop() client = dbus.Interface(bus.get_object("org.bluez.obex.client", "/"), "org.bluez.obex.Client") print "Creating Session" path = client.CreateSession(options.device, { "Target": "OPP" }) opp_client = OppClient(path, options.verbose) if options.pull_to_file: opp_client.pull_business_card(options.pull_to_file) if options.send_file: opp_client.send_file(options.send_file) mainloop.run()