summaryrefslogtreecommitdiff
path: root/test/example-onoff-client
diff options
context:
space:
mode:
Diffstat (limited to 'test/example-onoff-client')
-rw-r--r--test/example-onoff-client288
1 files changed, 288 insertions, 0 deletions
diff --git a/test/example-onoff-client b/test/example-onoff-client
new file mode 100644
index 000000000..e4a87eb12
--- /dev/null
+++ b/test/example-onoff-client
@@ -0,0 +1,288 @@
+#!/usr/bin/env python3
+
+import sys
+import struct
+import numpy
+import dbus
+import dbus.service
+import dbus.exceptions
+
+try:
+ from gi.repository import GObject
+except ImportError:
+ import gobject as GObject
+from dbus.mainloop.glib import DBusGMainLoop
+
+MESH_SERVICE_NAME = 'org.bluez.mesh'
+DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
+DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
+
+MESH_NETWORK_IFACE = 'org.bluez.mesh.Network1'
+MESH_NODE_IFACE = 'org.bluez.mesh.Node1'
+MESH_ELEMENT_IFACE = 'org.bluez.mesh.Element1'
+
+VENDOR_ID_NONE = 0xffff
+
+app = None
+bus = None
+mainloop = None
+node = None
+token = numpy.uint64(0x76bd4f2372477600)
+
+def unwrap(item):
+ if isinstance(item, dbus.Boolean):
+ return bool(item)
+ if isinstance(item, (dbus.UInt16, dbus.Int16, dbus.UInt32, dbus.Int32,
+ dbus.UInt64, dbus.Int64)):
+ return int(item)
+ if isinstance(item, dbus.Byte):
+ return bytes([int(item)])
+ if isinstance(item, dbus.String):
+ return item
+ if isinstance(item, (dbus.Array, list, tuple)):
+ return [unwrap(x) for x in item]
+ if isinstance(item, (dbus.Dictionary, dict)):
+ return dict([(unwrap(x), unwrap(y)) for x, y in item.items()])
+
+ print('Dictionary item not handled')
+ print(type(item))
+ return item
+
+def attach_app_cb(node_path, dict_array):
+ print('Mesh application registered ', node_path)
+ print(type(node_path))
+ print(type(dict_array))
+ print(dict_array)
+
+ els = unwrap(dict_array)
+ print("Get Elements")
+ for el in els:
+ print(el)
+ idx = struct.unpack('b', el[0])[0]
+ print('Configuration for Element ', end='')
+ print(idx)
+ models = el[1]
+
+ element = app.get_element(idx)
+ element.set_model_config(models)
+
+ obj = bus.get_object(MESH_SERVICE_NAME, node_path)
+ global node
+ node = dbus.Interface(obj, MESH_NODE_IFACE)
+
+def error_cb(error):
+ print('D-Bus call failed: ' + str(error))
+
+def generic_reply_cb():
+ print('D-Bus call done')
+
+def interfaces_removed_cb(object_path, interfaces):
+ if not mesh_net:
+ return
+
+ if object_path == mesh_net[2]:
+ print('Service was removed')
+ mainloop.quit()
+
+class Application(dbus.service.Object):
+
+ def __init__(self, bus):
+ self.path = '/example'
+ self.elements = []
+ dbus.service.Object.__init__(self, bus, self.path)
+
+ def get_path(self):
+ return dbus.ObjectPath(self.path)
+
+ def add_element(self, element):
+ self.elements.append(element)
+
+ def get_element(self, idx):
+ for ele in self.elements:
+ if ele.get_index() == idx:
+ return ele
+
+ @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}')
+ def GetManagedObjects(self):
+ response = {}
+ print('GetManagedObjects')
+ for element in self.elements:
+ response[element.get_path()] = element.get_properties()
+ return response
+
+class Element(dbus.service.Object):
+ PATH_BASE = '/example/ele'
+
+ def __init__(self, bus, index):
+ self.path = self.PATH_BASE + format(index, '02x')
+ print(self.path)
+ self.models = []
+ self.bus = bus
+ self.index = index
+ dbus.service.Object.__init__(self, bus, self.path)
+
+ def _get_sig_models(self):
+ ids = []
+ for model in self.models:
+ id = model.get_id()
+ vendor = model.get_vendor()
+ if vendor == VENDOR_ID_NONE:
+ ids.append(id)
+ return ids
+
+ def get_properties(self):
+ return {
+ MESH_ELEMENT_IFACE: {
+ 'Index': dbus.Byte(self.index),
+ 'Models': dbus.Array(
+ self._get_sig_models(), signature='q')
+ }
+ }
+
+ def add_model(self, model):
+ model.set_path(self.path)
+ self.models.append(model)
+
+ def get_index(self):
+ return self.index
+
+ def set_model_config(self, config):
+ print('Set element models config')
+
+ @dbus.service.method(MESH_ELEMENT_IFACE,
+ in_signature="qqbay", out_signature="")
+ def MessageReceived(self, source, key, is_sub, data):
+ print('Message Received on Element ', end='')
+ print(self.index)
+ for model in self.models:
+ model.process_message(source, key, data)
+
+ @dbus.service.method(MESH_ELEMENT_IFACE,
+ in_signature="qa{sv}", out_signature="")
+
+ def UpdateModelConfiguration(self, model_id, config):
+ print('UpdateModelConfig ', end='')
+ print(hex(model_id))
+ for model in self.models:
+ if model_id == model.get_id():
+ model.set_config(config)
+ return
+
+ @dbus.service.method(MESH_ELEMENT_IFACE,
+ in_signature="", out_signature="")
+ def get_path(self):
+ return dbus.ObjectPath(self.path)
+
+class Model():
+ def __init__(self, model_id):
+ self.cmd_ops = []
+ self.model_id = model_id
+ self.vendor = VENDOR_ID_NONE
+ self.path = None
+
+ def set_path(self, path):
+ self.path = path
+
+ def get_id(self):
+ return self.model_id
+
+ def get_vendor(self):
+ return self.vendor
+
+ def process_message(self, source, key, data):
+ print('Model process message')
+
+ def set_publication(self, period):
+ self.period = period
+
+ def set_bindings(self, bindings):
+ self.bindings = bindings
+
+ def set_config(self, config):
+ if 'Bindings' in config:
+ self.bindings = config.get('Bindings')
+ print('Bindings: ', end='')
+ print(self.bindings)
+ if 'PublicationPeriod' in config:
+ self.set_publication(config.get('PublicationPeriod'))
+ print('Model publication period ', end='')
+ print(self.pub_period, end='')
+ print(' ms')
+
+class OnOffClient(Model):
+ def __init__(self, model_id):
+ Model.__init__(self, model_id)
+ self.cmd_ops = { 0x8201, # get
+ 0x8202, # set
+ 0x8203 } # set unacknowledged
+ print('OnOff Client')
+
+ def _reply_cb(state):
+ print('State ', end='');
+ print(state)
+
+ def _send_message(self, dest, key, data, reply_cb):
+ print('OnOffClient send data')
+ node.Send(self.path, dest, key, data, reply_handler=reply_cb,
+ error_handler=error_cb)
+
+ def get_state(self, dest, key):
+ opcode = 0x8201
+ data = struct.pack('<H', opcode)
+ self._send_message(dest, key, data, self._reply_cb)
+
+ def set_state(self, dest, key, state):
+ opcode = 0x8202
+ data = struct.pack('<HB', opcode, state)
+ self._send_message(dest, key, data, self._reply_cb)
+
+ def process_message(self, source, key, data):
+ print('OnOffClient process message len ', end = '')
+ datalen = len(data)
+ print(datalen)
+
+ if datalen!=3:
+ return
+
+ opcode, state=struct.unpack('<HB',bytes(data))
+ if opcode != 0x8202 :
+ print('Bad opcode ', end='')
+ print(hex(opcode))
+ return
+
+ print('Got state ', end = '')
+ print(hex(state))
+
+def attach_app_error_cb(error):
+ print('Failed to register application: ' + str(error))
+ mainloop.quit()
+
+def main():
+
+ DBusGMainLoop(set_as_default=True)
+
+ global bus
+ bus = dbus.SystemBus()
+ global mainloop
+ global app
+
+ mesh_net = dbus.Interface(bus.get_object(MESH_SERVICE_NAME,
+ "/org/bluez/mesh"),
+ MESH_NETWORK_IFACE)
+ mesh_net.connect_to_signal('InterfacesRemoved', interfaces_removed_cb)
+
+ app = Application(bus)
+ first_ele = Element(bus, 0x00)
+ first_ele.add_model(OnOffClient(0x1001))
+ app.add_element(first_ele)
+
+ mainloop = GObject.MainLoop()
+
+ print('Attach')
+ mesh_net.Attach(app.get_path(), token,
+ reply_handler=attach_app_cb,
+ error_handler=attach_app_error_cb)
+ mainloop.run()
+
+if __name__ == '__main__':
+ main()