summaryrefslogtreecommitdiff
path: root/test/opp-client
blob: 3d23dfbc448af89b5de31c0fca50db5005097fc9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
#!/usr/bin/python

import sys
import dbus
import gobject
import dbus.mainloop.glib
import os.path
from optparse import OptionParser

def parse_options():
	"""Register the command-line options and parse the arguments.

	The options are added to the module-level ``parser`` (the
	OptionParser created in the ``__main__`` section), so that object
	must exist before this function is called.

	Returns the (options, args) pair produced by parser.parse_args().
	"""
	# One (flags, keyword settings) entry per option, in registration order.
	option_table = (
		(("-d", "--device"),
			dict(dest="device", help="Device to connect",
				metavar="DEVICE")),
		(("-p", "--pull"),
			dict(dest="pull_to_file",
				help="Pull vcard and store in FILE",
				metavar="FILE")),
		(("-s", "--send"),
			dict(dest="send_file", help="Send FILE",
				metavar="FILE")),
		(("-v", "--verbose"),
			dict(action="store_true", dest="verbose")),
	)

	for flags, settings in option_table:
		parser.add_option(*flags, **settings)

	return parser.parse_args()

class OppClient:
	"""Start Object Push transfers on one obex-client session over D-Bus.

	The instance wraps the object found at ``session_path`` on the
	"org.bluez.obex.client" service and subscribes to
	"org.bluez.obex.Transfer" signals so it can tell when the transfer
	it started has ended.  The handlers that end the run call quit() on
	the module-level ``mainloop`` created by the script section.
	"""

	def __init__(self, session_path, verbose=False):
		"""Bind to the session object and register the signal handlers.

		session_path -- object path of the session to operate on.
		verbose      -- when true, print transfer creation, progress
		                and completion messages.
		"""
		# Last "Progress" value seen; used to compute the per-update delta.
		self.progress = 0
		# Stays None until create_transfer_reply() runs, so the signal
		# handlers ignore every transfer until ours is known.
		self.transfer_path = None
		# Filled in from the transfer properties in create_transfer_reply().
		self.transfer_size = 0
		self.verbose = verbose
		bus = dbus.SessionBus()
		obj = bus.get_object("org.bluez.obex.client", session_path)
		self.session = dbus.Interface(obj, "org.bluez.obex.Session")
		self.opp = dbus.Interface(obj, "org.bluez.obex.ObjectPush")
		# path_keyword makes each handler receive the emitting object's
		# path as ``path``, which is compared against transfer_path.
		bus.add_signal_receiver(self.transfer_complete,
				dbus_interface="org.bluez.obex.Transfer",
				signal_name="Complete",
				path_keyword="path")
		bus.add_signal_receiver(self.transfer_error,
				dbus_interface="org.bluez.obex.Transfer",
				signal_name="Error",
				path_keyword="path")
		# Progress reporting is only wired up in verbose mode.
		if self.verbose:
			bus.add_signal_receiver(self.transfer_progress,
				dbus_interface="org.bluez.obex.Transfer",
				signal_name="PropertyChanged",
				path_keyword="path")

	def create_transfer_reply(self, reply):
		"""Record the transfer described by a successful method reply.

		reply -- a (path, properties) pair; properties must contain a
		         "Size" entry.
		"""
		(path, properties) = reply
		self.transfer_path = path
		self.transfer_size = properties["Size"]
		if self.verbose:
			print("Transfer created: %s" % path)

	def error(self, err):
		"""Print the error from a failed method call and stop the main loop."""
		print(err)
		mainloop.quit()

	def transfer_complete(self, path):
		"""Stop the main loop once the tracked transfer signals "Complete"."""
		if path != self.transfer_path:
			return
		if self.verbose:
			print("Transfer finished")
		mainloop.quit()

	def transfer_error(self, code, message, path):
		"""Report an "Error" signal for the tracked transfer and stop the main loop."""
		if path != self.transfer_path:
			return
		print("Transfer finished with error %s: %s" % (code, message))
		mainloop.quit()

	def transfer_progress(self, prop, value, path):
		"""Print a progress line for "Progress" changes of the tracked transfer.

		Changes of other properties, or of other transfers, are ignored.
		"""
		if path != self.transfer_path:
			return

		if prop != "Progress":
			return

		# Difference from the previous "Progress" value, divided by 1000.
		# It is printed as kBps, which presumes about one update per
		# second -- TODO confirm the signal interval.  Floor division
		# keeps the integer result on both Python 2 and Python 3.
		speed = (value - self.progress) // 1000
		print("Transfer progress %d/%d at %d kBps" % (value,
							self.transfer_size,
							speed))
		self.progress = value

	def pull_business_card(self, filename):
		"""Asynchronously call PullBusinessCard with the absolute form of filename."""
		self.opp.PullBusinessCard(os.path.abspath(filename),
				reply_handler=self.create_transfer_reply,
				error_handler=self.error)

	def send_file(self, filename):
		"""Asynchronously call SendFile with the absolute form of filename."""
		self.opp.SendFile(os.path.abspath(filename),
				reply_handler=self.create_transfer_reply,
				error_handler=self.error)

if __name__ == '__main__':

	# Make the GLib main loop the default for the D-Bus connections
	# opened below, so signals and asynchronous replies are dispatched
	# by mainloop.run().
	dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

	# Module-level parser: parse_options() registers its options on it.
	parser = OptionParser()

	(options, args) = parse_options()

	# A device and at least one action are required.  Without an action
	# no transfer is ever created, so no handler would quit the main
	# loop and the script would block forever.
	if not options.device or not (options.pull_to_file or
						options.send_file):
		parser.print_help()
		sys.exit(0)

	bus = dbus.SessionBus()
	mainloop = gobject.MainLoop()

	client = dbus.Interface(bus.get_object("org.bluez.obex.client", "/"),
				"org.bluez.obex.Client")

	print("Creating Session")
	path = client.CreateSession(options.device, { "Target": "OPP" })

	opp_client = OppClient(path, options.verbose)

	if options.pull_to_file:
		opp_client.pull_business_card(options.pull_to_file)

	if options.send_file:
		opp_client.send_file(options.send_file)

	# Runs until one of the OppClient handlers calls mainloop.quit().
	mainloop.run()