summaryrefslogtreecommitdiff
path: root/test/opp-client
blob: 4f00a41c0129ea3f1448440ef1b1bf0df6be2cc6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
#!/usr/bin/python
# SPDX-License-Identifier: LGPL-2.1-or-later

from __future__ import absolute_import, print_function, unicode_literals

from optparse import OptionParser
import os.path
import sys
import dbus
import dbus.mainloop.glib
try:
  from gi.repository import GObject
except ImportError:
  import gobject as GObject

BUS_NAME='org.bluez.obex'
PATH = '/org/bluez/obex'
CLIENT_INTERFACE='org.bluez.obex.Client1'
SESSION_INTERFACE='org.bluez.obex.Session1'
OBJECT_PUSH_INTERFACE='org.bluez.obex.ObjectPush1'
TRANSFER_INTERFACE='org.bluez.obex.Transfer1'

def parse_options():
	parser.add_option("-d", "--device", dest="device",
			help="Device to connect", metavar="DEVICE")
	parser.add_option("-p", "--pull", dest="pull_to_file",
			help="Pull vcard and store in FILE", metavar="FILE")
	parser.add_option("-s", "--send", dest="send_file",
			help="Send FILE", metavar="FILE")
	parser.add_option("-v", "--verbose", action="store_true",
			dest="verbose")

	return parser.parse_args()

class OppClient:
	def __init__(self, session_path, verbose=False):
		self.transferred = 0
		self.transfer_path = None
		self.verbose = verbose
		bus = dbus.SessionBus()
		obj = bus.get_object(BUS_NAME, session_path)
		self.session = dbus.Interface(obj, SESSION_INTERFACE)
		self.opp = dbus.Interface(obj, OBJECT_PUSH_INTERFACE)
		bus.add_signal_receiver(self.properties_changed,
			dbus_interface="org.freedesktop.DBus.Properties",
			signal_name="PropertiesChanged",
			path_keyword="path")

	def create_transfer_reply(self, path, properties):
		self.transfer_path = path
		self.transfer_size = properties["Size"]
		if self.verbose:
			print("Transfer created: %s" % path)

	def error(self, err):
		print(err)
		mainloop.quit()

	def properties_changed(self, interface, properties, invalidated, path):
		if path != self.transfer_path:
			return

		if "Status" in properties and \
				(properties["Status"] == "complete" or \
				properties["Status"] == "error"):
			if self.verbose:
				print("Transfer %s" % properties["Status"])
			mainloop.quit()
			return

		if "Transferred" not in properties:
			return

		value = properties["Transferred"]
		speed = (value - self.transferred) / 1000
		print("Transfer progress %d/%d at %d kBps" % (value,
							self.transfer_size,
							speed))
		self.transferred = value

	def pull_business_card(self, filename):
		self.opp.PullBusinessCard(os.path.abspath(filename),
				reply_handler=self.create_transfer_reply,
				error_handler=self.error)

	def send_file(self, filename):
		self.opp.SendFile(os.path.abspath(filename),
				reply_handler=self.create_transfer_reply,
				error_handler=self.error)

if  __name__ == '__main__':

	dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

	parser = OptionParser()

	(options, args) = parse_options()

	if not options.device:
		parser.print_help()
		sys.exit(0)

	bus = dbus.SessionBus()
	mainloop = GObject.MainLoop()

	client = dbus.Interface(bus.get_object(BUS_NAME, PATH),
							CLIENT_INTERFACE)

	print("Creating Session")
	path = client.CreateSession(options.device, { "Target": "OPP" })

	opp_client = OppClient(path, options.verbose)

	if options.pull_to_file:
		opp_client.pull_business_card(options.pull_to_file)

	if options.send_file:
		opp_client.send_file(options.send_file)

	mainloop.run()