From 161691ae36196604195e4615c387ebfbf7235da3 Mon Sep 17 00:00:00 2001 From: Luiz Augusto von Dentz Date: Thu, 31 Aug 2017 15:35:03 +0300 Subject: build: Cleanup GATT profiles Alert, cyclingspeed, heartrate and proximity can all be implemented using the GATT D-Bus API so they no longer need dedicated APIs. --- test/test-alert | 185 ---------------------------------------------- test/test-cyclingspeed | 197 ------------------------------------------------- test/test-heartrate | 108 --------------------------- test/test-proximity | 70 ------------------ 4 files changed, 560 deletions(-) delete mode 100755 test/test-alert delete mode 100755 test/test-cyclingspeed delete mode 100755 test/test-heartrate delete mode 100755 test/test-proximity (limited to 'test') diff --git a/test/test-alert b/test/test-alert deleted file mode 100755 index 43b3cf362..000000000 --- a/test/test-alert +++ /dev/null @@ -1,185 +0,0 @@ -#!/usr/bin/python - -from __future__ import absolute_import, print_function, unicode_literals - -import optparse -import os -import sys -import dbus -import dbus.service -import dbus.mainloop.glib -try: - from gi.repository import GObject -except ImportError: - import gobject as GObject - -BUS_NAME = 'org.bluez' -ALERT_INTERFACE = 'org.bluez.Alert1' -ALERT_AGENT_INTERFACE = 'org.bluez.AlertAgent1' -BLUEZ_OBJECT_PATH = '/org/bluez' -TEST_OBJECT_PATH = '/org/bluez/test' - -class AlertAgent(dbus.service.Object): - def __init__(self, bus, object_path, alert, mainloop): - dbus.service.Object.__init__(self, bus, object_path) - self.alert = alert - self.mainloop = mainloop - - @dbus.service.method(ALERT_AGENT_INTERFACE, in_signature='', - out_signature='') - def MuteOnce(self): - print('method MuteOnce() was called') - self.alert.NewAlert('ringer', 1, 'not active') - - @dbus.service.method(ALERT_AGENT_INTERFACE, in_signature='s', - out_signature='') - def SetRinger(self, mode): - print('method SetRinger(%s) was called' % mode) - self.alert.NewAlert('ringer', 1, mode) - - @dbus.service.method(ALERT_AGENT_INTERFACE, in_signature='', - out_signature='') - def Release(self): - print('method Release() was called') - self.mainloop.quit() - -def print_command_line(options): - if not options.verbose: - return False - - print('-w: ' + str(options.wait)) - - if options.times: - print('-t: ' + str(options.times)) - - if options.register: - print('-r: ' + options.register) - else: - print('-r: ' + str(None)) - - if options.new_alert: - print('-n:') - for i in options.new_alert: - print(' ' + i[0] + ', ' + i[1] + ', ' + i[2]) - else: - print('-n: ' + str(None)) - - if options.unread_alert: - print('-u:') - for i in options.unread_alert: - print(' ' + i[0] + ', ' + i[1]) - else: - print('-u: ' + str(None)) - - print() - - return True - -def read_count(param): - try: - return int(param) - except ValueError: - print(' must be integer, not \"%s\"' % param) - sys.exit(1) - -def new_alert(alert, params): - if not params: - return False - - for param in params: - category = param[0] - count = read_count(param[1]) - description = param[2] - - alert.NewAlert(category, count, description) - -def unread_alert(alert, params): - if not params: - return False - - for param in params: - category = param[0] - count = read_count(param[1]) - - alert.UnreadAlert(category, count) - -option_list = [ - optparse.make_option('-v', None, - action = 'store_true', - default = False, - dest = 'verbose', - help = 'verbose'), - - optparse.make_option('-w', None, - action = 'store_true', - default = False, - dest = 'wait', - help = 'wait for dbus events'), - - optparse.make_option('-t', None, - action = 'store', - default = 1, - type = "int", - dest = 'times', - help = 'repeat UnreadAlert/NewAlert times', - metavar = ''), - - optparse.make_option('-r', None, - action = 'store', - dest = 'register', - type = 'string', - metavar = '', - help = 'register alert'), - - optparse.make_option('-n', None, - action = 'append', - dest = 'new_alert', - type = 'string', - nargs = 3, - metavar = ' ', - help = 'send new alert'), - - optparse.make_option('-u', None, - action = 'append', - dest = 'unread_alert', - type = 'string', - nargs = 2, - metavar = ' ', - help = 'send unread alert'), -] - -parser = optparse.OptionParser(option_list=option_list) -parser.disable_interspersed_args() -(options, args) = parser.parse_args() - -dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) -bus = dbus.SystemBus() -mainloop = GObject.MainLoop() -alert = dbus.Interface(bus.get_object(BUS_NAME, BLUEZ_OBJECT_PATH), - ALERT_INTERFACE) -alert_agent = AlertAgent(bus, TEST_OBJECT_PATH, alert, mainloop) - -print_command_line(options) - -if not (options.register or options.new_alert or options.unread_alert or - options.wait): - parser.print_usage() - sys.exit(1) - -if options.register: - alert.RegisterAlert(options.register, TEST_OBJECT_PATH) - -times = 0 -while times < options.times: - times += 1 - - new_alert(alert, options.new_alert) - unread_alert(alert, options.unread_alert) - -if not options.wait: - sys.exit(0) - -try: - mainloop.run() -except: - pass diff --git a/test/test-cyclingspeed b/test/test-cyclingspeed deleted file mode 100755 index 393f79c72..000000000 --- a/test/test-cyclingspeed +++ /dev/null @@ -1,197 +0,0 @@ -#!/usr/bin/python - -from __future__ import absolute_import, print_function, unicode_literals - -''' -Cycling Speed and Cadence test script -''' - -from optparse import OptionParser, make_option -import sys -import dbus -import dbus.service -import dbus.mainloop.glib -try: - from gi.repository import GObject -except ImportError: - import gobject as GObject -import bluezutils - -BUS_NAME = 'org.bluez' -CYCLINGSPEED_MANAGER_INTERFACE = 'org.bluez.CyclingSpeedManager1' -CYCLINGSPEED_WATCHER_INTERFACE = 'org.bluez.CyclingSpeedWatcher1' -CYCLINGSPEED_INTERFACE = 'org.bluez.CyclingSpeed1' - -class MeasurementQ: - def __init__(self, wrap_v): - self._now = [None, None] - self._prev = [None, None] - self._wrap_v = wrap_v - - def can_calc(self): - return ((self._now[0] is not None) - and (self._now[1] is not None) - and (self._prev[0] is not None) - and (self._prev[1] is not None)) - - def delta_v(self): - delta = self._now[0] - self._prev[0] - if (delta < 0) and (self._wrap_v): - delta = delta + 65536 - return delta - - def delta_t(self): - delta = self._now[1] - self._prev[1] - if delta < 0: - delta = delta + 65536 - return delta - - def put(self, data): - self._prev = self._now - self._now = data - -class Watcher(dbus.service.Object): - _wheel = MeasurementQ(False) - _crank = MeasurementQ(True) - _circumference = None - - def enable_calc(self, v): - self._circumference = v - - @dbus.service.method(CYCLINGSPEED_WATCHER_INTERFACE, - in_signature="oa{sv}", out_signature="") - def MeasurementReceived(self, device, measure): - print("Measurement received from %s" % device) - - rev = None - evt = None - if "WheelRevolutions" in measure: - rev = measure["WheelRevolutions"] - print("WheelRevolutions: ", measure["WheelRevolutions"]) - if "LastWheelEventTime" in measure: - evt = measure["LastWheelEventTime"] - print("LastWheelEventTime: ", measure["LastWheelEventTime"]) - self._wheel.put( [rev, evt] ) - - rev = None - evt = None - if "CrankRevolutions" in measure: - rev = measure["CrankRevolutions"] - print("CrankRevolutions: ", measure["CrankRevolutions"]) - if "LastCrankEventTime" in measure: - evt = measure["LastCrankEventTime"] - print("LastCrankEventTime: ", measure["LastCrankEventTime"]) - self._crank.put( [rev, evt] ) - - if self._circumference is None: - return - - if self._wheel.can_calc(): - delta_v = self._wheel.delta_v() - delta_t = self._wheel.delta_t() - - if (delta_v >= 0) and (delta_t > 0): - speed = delta_v * self._circumference * 1024 / delta_t # mm/s - speed = speed * 0.0036 # mm/s -> km/h - print("(calculated) Speed: %.2f km/h" % speed) - - if self._crank.can_calc(): - delta_v = self._crank.delta_v() - delta_t = self._crank.delta_t() - - if delta_t > 0: - cadence = delta_v * 1024 / delta_t - print("(calculated) Cadence: %d rpm" % cadence) - -def properties_changed(interface, changed, invalidated): - if "Location" in changed: - print("Sensor location: %s" % changed["Location"]) - -if __name__ == "__main__": - dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) - - bus = dbus.SystemBus() - - option_list = [ - make_option("-i", "--adapter", action="store", - type="string", dest="adapter"), - make_option("-b", "--device", action="store", - type="string", dest="address"), - make_option("-c", "--circumference", action="store", - type="int", dest="circumference"), - ] - - parser = OptionParser(option_list=option_list) - - (options, args) = parser.parse_args() - - if not options.address: - print("Usage: %s [-i ] -b [-c ] [cmd]" % (sys.argv[0])) - print("Possible commands:") - print("\tShowSupportedLocations") - print("\tSetLocation ") - print("\tSetCumulativeWheelRevolutions ") - sys.exit(1) - - managed_objects = bluezutils.get_managed_objects() - adapter = bluezutils.find_adapter_in_objects(managed_objects, - options.adapter) - adapter_path = adapter.object_path - - device = bluezutils.find_device_in_objects(managed_objects, - options.address, - options.adapter) - device_path = device.object_path - - cscmanager = dbus.Interface(bus.get_object(BUS_NAME, adapter_path), - CYCLINGSPEED_MANAGER_INTERFACE) - - watcher_path = "/test/watcher" - watcher = Watcher(bus, watcher_path) - if options.circumference: - watcher.enable_calc(options.circumference) - cscmanager.RegisterWatcher(watcher_path) - - csc = dbus.Interface(bus.get_object(BUS_NAME, device_path), - CYCLINGSPEED_INTERFACE) - - bus.add_signal_receiver(properties_changed, bus_name=BUS_NAME, - path=device_path, - dbus_interface="org.freedesktop.DBus.Properties", - signal_name="PropertiesChanged") - - device_prop = dbus.Interface(bus.get_object(BUS_NAME, device_path), - "org.freedesktop.DBus.Properties") - - properties = device_prop.GetAll(CYCLINGSPEED_INTERFACE) - - if "Location" in properties: - print("Sensor location: %s" % properties["Location"]) - else: - print("Sensor location is not supported") - - if len(args) > 0: - if args[0] == "ShowSupportedLocations": - if properties["MultipleLocationsSupported"]: - print("Supported locations: ", properties["SupportedLocations"]) - else: - print("Multiple sensor locations not supported") - - elif args[0] == "SetLocation": - if properties["MultipleLocationsSupported"]: - device_prop.Set(CYCLINGSPEED_INTERFACE, "Location", args[1]) - else: - print("Multiple sensor locations not supported") - - elif args[0] == "SetCumulativeWheelRevolutions": - if properties["WheelRevolutionDataSupported"]: - csc.SetCumulativeWheelRevolutions(dbus.UInt32(args[1])) - else: - print("Wheel revolution data not supported") - - else: - print("Unknown command") - sys.exit(1) - - mainloop = GObject.MainLoop() - mainloop.run() diff --git a/test/test-heartrate b/test/test-heartrate deleted file mode 100755 index 5e4e7e5c5..000000000 --- a/test/test-heartrate +++ /dev/null @@ -1,108 +0,0 @@ -#!/usr/bin/python - -from __future__ import absolute_import, print_function, unicode_literals - -''' -Heart Rate Monitor test script -''' - -from optparse import OptionParser, make_option -import sys -import dbus -import dbus.service -import dbus.mainloop.glib -try: - from gi.repository import GObject -except ImportError: - import gobject as GObject -import bluezutils - -BUS_NAME = 'org.bluez' -HEARTRATE_MANAGER_INTERFACE = 'org.bluez.HeartRateManager1' -HEARTRATE_WATCHER_INTERFACE = 'org.bluez.HeartRateWatcher1' -HEARTRATE_INTERFACE = 'org.bluez.HeartRate1' - -class Watcher(dbus.service.Object): - @dbus.service.method(HEARTRATE_WATCHER_INTERFACE, - in_signature="oa{sv}", out_signature="") - def MeasurementReceived(self, device, measure): - print("Measurement received from %s" % device) - print("Value: ", measure["Value"]) - - if "Energy" in measure: - print("Energy: ", measure["Energy"]) - - if "Contact" in measure: - print("Contact: ", measure["Contact"]) - - if "Interval" in measure: - for i in measure["Interval"]: - print("Interval: ", i) - -if __name__ == "__main__": - dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) - - bus = dbus.SystemBus() - - option_list = [ - make_option("-i", "--adapter", action="store", - type="string", dest="adapter"), - make_option("-b", "--device", action="store", - type="string", dest="address"), - ] - - parser = OptionParser(option_list=option_list) - - (options, args) = parser.parse_args() - - if not options.address: - print("Usage: %s [-i ] -b [cmd]" % (sys.argv[0])) - print("Possible commands:") - print("\tReset") - sys.exit(1) - - managed_objects = bluezutils.get_managed_objects() - adapter = bluezutils.find_adapter_in_objects(managed_objects, - options.adapter) - adapter_path = adapter.object_path - - heartrateManager = dbus.Interface(bus.get_object(BUS_NAME, - adapter_path), HEARTRATE_MANAGER_INTERFACE) - - path = "/test/watcher" - heartrateManager.RegisterWatcher(path) - - device = bluezutils.find_device_in_objects(managed_objects, - options.address, - options.adapter) - device_path = device.object_path - - heartrate = dbus.Interface(bus.get_object(BUS_NAME, device_path), - HEARTRATE_INTERFACE) - - watcher = Watcher(bus, path) - - dev_prop = dbus.Interface(bus.get_object(BUS_NAME, device_path), - "org.freedesktop.DBus.Properties") - - properties = dev_prop.GetAll(HEARTRATE_INTERFACE) - - if "Location" in properties: - print("Sensor location: %s" % properties["Location"]) - else: - print("Sensor location is not supported") - - if len(args) > 0: - if args[0] == "Reset": - reset_sup = properties["ResetSupported"] - if reset_sup: - heartrate.Reset() - else: - print("Reset not supported") - sys.exit(1) - else: - print("unknown command") - sys.exit(1) - - mainloop = GObject.MainLoop() - mainloop.run() diff --git a/test/test-proximity b/test/test-proximity deleted file mode 100755 index 66b7bc24c..000000000 --- a/test/test-proximity +++ /dev/null @@ -1,70 +0,0 @@ -#!/usr/bin/python - -from __future__ import absolute_import, print_function, unicode_literals - -''' -Proximity Monitor test script -''' - -from optparse import OptionParser, make_option -import sys -import dbus -import dbus.mainloop.glib -try: - from gi.repository import GObject -except ImportError: - import gobject as GObject -import bluezutils - -BUS_NAME = 'org.bluez' -PROXIMITY_MONITOR_INTERFACE = 'org.bluez.ProximityMonitor1' - -def properties_changed(interface, changed, invalidated): - if interface != PROXIMITY_MONITOR_INTERFACE: - return - - for name, value in changed.iteritems(): - print("Property %s changed: %s" % (name, str(value))) - -if __name__ == "__main__": - dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) - - bus = dbus.SystemBus() - - option_list = [ - make_option("-i", "--adapter", action="store", - type="string", dest="dev_id"), - make_option("-b", "--device", action="store", - type="string", dest="address"), - - ] - parser = OptionParser(option_list=option_list) - - (options, args) = parser.parse_args() - - if (len(args) < 1): - print("Usage: %s " % (sys.argv[0])) - print("") - print(" -b MAC LinkLossAlertLevel ") - print(" -b MAC ImmediateAlertLevel ") - sys.exit(1) - - device = bluezutils.find_device(options.address, options.dev_id) - device_path = device.object_path - - bus.add_signal_receiver(properties_changed, bus_name=BUS_NAME, - path=device_path, - dbus_interface="org.freedesktop.DBus.Properties", - signal_name="PropertiesChanged") - - proximity = dbus.Interface(bus.get_object(BUS_NAME, device_path), - PROXIMITY_MONITOR_INTERFACE) - - device_prop = dbus.Interface(bus.get_object(BUS_NAME, device_path), - "org.freedesktop.DBus.Properties") - - print("Proximity SetProperty('%s', '%s')" % (args[0], args[1])) - device_prop.Set(PROXIMITY_MONITOR_INTERFACE, args[0], args[1]) - - mainloop = GObject.MainLoop() - mainloop.run() -- cgit v1.2.1