#!/usr/bin/python # SPDX-License-Identifier: LGPL-2.1-or-later from __future__ import absolute_import, print_function, unicode_literals # -*- coding: utf-8 -*- import sys import dbus import dbus.service from dbus.mainloop.glib import DBusGMainLoop try: from gi.repository import GObject except ImportError: import gobject as GObject BUS_NAME = 'org.bluez' PATH = '/org/bluez' ADAPTER_INTERFACE = 'org.bluez.Adapter1' HEALTH_MANAGER_INTERFACE = 'org.bluez.HealthManager1' HEALTH_DEVICE_INTERFACE = 'org.bluez.HealthDevice1' DBusGMainLoop(set_as_default=True) loop = GObject.MainLoop() bus = dbus.SystemBus() def sig_received(*args, **kwargs): if "member" not in kwargs: return if "path" not in kwargs: return; sig_name = kwargs["member"] path = kwargs["path"] print(sig_name) print(path) if sig_name == "PropertyChanged": k, v = args print(k) print(v) else: ob = args[0] print(ob) def enter_mainloop(): bus.add_signal_receiver(sig_received, bus_name=BUS_NAME, dbus_interface=HEALTH_DEVICE_INTERFACE, path_keyword="path", member_keyword="member", interface_keyword="interface") try: print("Entering main lopp, push Ctrl+C for finish") mainloop = GObject.MainLoop() mainloop.run() except KeyboardInterrupt: pass finally: print("Exiting, bye") hdp_manager = dbus.Interface(bus.get_object(BUS_NAME, PATH), HEALTH_MANAGER_INTERFACE) role = None while role == None: print("Select 1. source or 2. sink: ",) try: sel = int(sys.stdin.readline()) if sel == 1: role = "source" elif sel == 2: role = "sink" else: raise ValueError except (TypeError, ValueError): print("Wrong selection, try again: ",) except KeyboardInterrupt: sys.exit() dtype = None while dtype == None: print("Select a data type: ",) try: sel = int(sys.stdin.readline()) if (sel < 0) or (sel > 65535): raise ValueError dtype = sel; except (TypeError, ValueError): print("Wrong selection, try again: ",) except KeyboardInterrupt: sys.exit() pref = None if role == "source": while pref == None: try: print("Select a preferred data channel type 1.",) print("reliable 2. streaming: ",) sel = int(sys.stdin.readline()) if sel == 1: pref = "reliable" elif sel == 2: pref = "streaming" else: raise ValueError except (TypeError, ValueError): print("Wrong selection, try again") except KeyboardInterrupt: sys.exit() app_path = hdp_manager.CreateApplication({ "DataType": dbus.types.UInt16(dtype), "Role": role, "Description": "Test Source", "ChannelType": pref}) else: app_path = hdp_manager.CreateApplication({ "DataType": dbus.types.UInt16(dtype), "Description": "Test sink", "Role": role}) print("New application created:", app_path) con = None while con == None: try: print("Connect to a remote device (y/n)? ",) sel = sys.stdin.readline() if sel in ("y\n", "yes\n", "Y\n", "YES\n"): con = True elif sel in ("n\n", "no\n", "N\n", "NO\n"): con = False else: print("Wrong selection, try again.") except KeyboardInterrupt: sys.exit() if not con: enter_mainloop() sys.exit() manager = dbus.Interface(bus.get_object(BUS_NAME, "/"), "org.freedesktop.DBus.ObjectManager") objects = manager.GetManagedObjects() adapters = [] for path, ifaces in objects.items(): if ifaces.has_key(ADAPTER_INTERFACE): adapters.append(path) i = 1 for ad in adapters: print("%d. %s" % (i, ad)) i = i + 1 print("Select an adapter: ",) select = None while select == None: try: pos = int(sys.stdin.readline()) - 1 if pos < 0: raise TypeError select = adapters[pos] except (TypeError, IndexError, ValueError): print("Wrong selection, try again: ",) except KeyboardInterrupt: sys.exit() adapter = dbus.Interface(bus.get_object(BUS_NAME, select), ADAPTER_INTERFACE) devices = [] for path, interfaces in objects.items(): if "org.bluez.Device1" not in interfaces: continue properties = interfaces["org.bluez.Device1"] if properties["Adapter"] != select: continue; if HEALTH_DEVICE_INTERFACE not in interfaces: continue devices.append(path) if len(devices) == 0: print("No devices available") sys.exit() i = 1 for dev in devices: print("%d. %s" % (i, dev)) i = i + 1 print("Select a device: ",) select = None while select == None: try: pos = int(sys.stdin.readline()) - 1 if pos < 0: raise TypeError select = devices[pos] except (TypeError, IndexError, ValueError): print("Wrong selection, try again: ",) except KeyboardInterrupt: sys.exit() device = dbus.Interface(bus.get_object(BUS_NAME, select), HEALTH_DEVICE_INTERFACE) echo = None while echo == None: try: print("Perform an echo (y/n)? ",) sel = sys.stdin.readline() if sel in ("y\n", "yes\n", "Y\n", "YES\n"): echo = True elif sel in ("n\n", "no\n", "N\n", "NO\n"): echo = False else: print("Wrong selection, try again.") except KeyboardInterrupt: sys.exit() if echo: if device.Echo(): print("Echo was ok") else: print("Echo war wrong, exiting") sys.exit() print("Connecting to device %s" % (select)) if role == "source": chan = device.CreateChannel(app_path, "reliable") else: chan = device.CreateChannel(app_path, "any") print(chan) enter_mainloop() hdp_manager.DestroyApplication(app_path)