#!/usr/bin/env python3 # SPDX-License-Identifier: LGPL-2.1-or-later import sys import struct import numpy import dbus import dbus.service import dbus.exceptions from threading import Timer import time try: from gi.repository import GObject except ImportError: import gobject as GObject from dbus.mainloop.glib import DBusGMainLoop import agent MESH_SERVICE_NAME = 'org.bluez.mesh' DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties' DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager' MESH_NETWORK_IFACE = 'org.bluez.mesh.Network1' MESH_NODE_IFACE = 'org.bluez.mesh.Node1' MESH_APPLICATION_IFACE = 'org.bluez.mesh.Application1' MESH_ELEMENT_IFACE = 'org.bluez.mesh.Element1' APP_COMPANY_ID = 0x05f1 APP_PRODUCT_ID = 0x0001 APP_VERSION_ID = 0x0001 VENDOR_ID_NONE = 0xffff mesh_net = None app = None bus = None mainloop = None node = None token = None def generic_error_cb(error): print('D-Bus call failed: ' + str(error)) def generic_reply_cb(): print('D-Bus call done') def unwrap(item): if isinstance(item, dbus.Boolean): return bool(item) if isinstance(item, (dbus.UInt16, dbus.Int16, dbus.UInt32, dbus.Int32, dbus.UInt64, dbus.Int64)): return int(item) if isinstance(item, dbus.Byte): return bytes([int(item)]) if isinstance(item, dbus.String): return item if isinstance(item, (dbus.Array, list, tuple)): return [unwrap(x) for x in item] if isinstance(item, (dbus.Dictionary, dict)): return dict([(unwrap(x), unwrap(y)) for x, y in item.items()]) print('Dictionary item not handled') print(type(item)) return item def join_cb(): print('Join procedure started') def join_error_cb(reason): print('Join procedure failed: ', reason) def attach_app_cb(node_path, dict_array): print('Mesh application registered ', node_path) obj = bus.get_object(MESH_SERVICE_NAME, node_path) global node node = dbus.Interface(obj, MESH_NODE_IFACE) els = unwrap(dict_array) print("Get Elements") for el in els: idx = struct.unpack('b', el[0])[0] print('Configuration for Element ', end='') print(idx) models = el[1] element = app.get_element(idx) element.set_model_config(models) def attach_app_error_cb(error): print('Failed to register application: ' + str(error)) mainloop.quit() def attach(token): print('Attach') mesh_net.Attach(app.get_path(), token, reply_handler=attach_app_cb, error_handler=attach_app_error_cb) def interfaces_removed_cb(object_path, interfaces): if not mesh_net: return if object_path == mesh_net[2]: print('Service was removed') mainloop.quit() def send_response(path, dest, key, data): print('send response ', end='') print(data) node.Send(path, dest, key, data, reply_handler=generic_reply_cb, error_handler=generic_error_cb) def send_publication(path, model_id, data): print('send publication ', end='') print(data) node.Publish(path, model_id, data, reply_handler=generic_reply_cb, error_handler=generic_error_cb) class PubTimer(): def __init__(self): self.seconds = None self.func = None self.thread = None self.busy = False def _timeout_cb(self): self.func() self.busy = True self._schedule_timer() self.busy =False def _schedule_timer(self): self.thread = Timer(self.seconds, self._timeout_cb) self.thread.start() def start(self, seconds, func): self.func = func self.seconds = seconds if not self.busy: self._schedule_timer() def cancel(self): print('Cancel timer') if self.thread is not None: print('Cancel thread') self.thread.cancel() self.thread = None class Application(dbus.service.Object): def __init__(self, bus): self.path = '/example' self.agent = None self.elements = [] dbus.service.Object.__init__(self, bus, self.path) def set_agent(self, agent): self.agent = agent def get_path(self): return dbus.ObjectPath(self.path) def add_element(self, element): self.elements.append(element) def get_element(self, idx): for ele in self.elements: if ele.get_index() == idx: return ele def get_properties(self): return { MESH_APPLICATION_IFACE: { 'CompanyID': dbus.UInt16(APP_COMPANY_ID), 'ProductID': dbus.UInt16(APP_PRODUCT_ID), 'VersionID': dbus.UInt16(APP_VERSION_ID) } } @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}') def GetManagedObjects(self): response = {} print('GetManagedObjects') response[self.path] = self.get_properties() response[self.agent.get_path()] = self.agent.get_properties() for element in self.elements: response[element.get_path()] = element.get_properties() return response @dbus.service.method(MESH_APPLICATION_IFACE, in_signature="t", out_signature="") def JoinComplete(self, value): global token print('JoinComplete ', value) token = value attach(token) @dbus.service.method(MESH_APPLICATION_IFACE, in_signature="s", out_signature="") def JoinFailed(self, value): print('JoinFailed ', value) token = value class Element(dbus.service.Object): PATH_BASE = '/example/ele' def __init__(self, bus, index): self.path = self.PATH_BASE + format(index, '02x') print(self.path) self.models = [] self.bus = bus self.index = index dbus.service.Object.__init__(self, bus, self.path) def _get_sig_models(self): ids = [] for model in self.models: id = model.get_id() vendor = model.get_vendor() if vendor == VENDOR_ID_NONE: ids.append(id) return ids def _get_v_models(self): ids = [] for model in self.models: id = model.get_id() v = model.get_vendor() if v != VENDOR_ID_NONE: vendor_id = (v, id) ids.append(vendor_id) return ids def get_properties(self): vendor_models = self._get_v_models() sig_models = self._get_sig_models() return { MESH_ELEMENT_IFACE: { 'Index': dbus.Byte(self.index), 'Models': dbus.Array(sig_models, 'q'), 'VendorModels': dbus.Array(vendor_models, '(qq)'), } } def add_model(self, model): model.set_path(self.path) self.models.append(model) def get_index(self): return self.index def set_model_config(self, configs): print('Set element models config') for config in configs: mod_id = config[0] self.UpdateModelConfiguration(mod_id, config[1]) @dbus.service.method(MESH_ELEMENT_IFACE, in_signature="qqvay", out_signature="") def MessageReceived(self, source, key, destination, data): print('Message Received on Element %d, src=%04x, dst=%s' % self.index, source, destination) for model in self.models: model.process_message(source, key, data) @dbus.service.method(MESH_ELEMENT_IFACE, in_signature="qa{sv}", out_signature="") def UpdateModelConfiguration(self, model_id, config): print('UpdateModelConfig ', end='') print(hex(model_id)) for model in self.models: if model_id == model.get_id(): model.set_config(config) return @dbus.service.method(MESH_ELEMENT_IFACE, in_signature="", out_signature="") def get_path(self): return dbus.ObjectPath(self.path) class Model(): def __init__(self, model_id): self.cmd_ops = [] self.model_id = model_id self.vendor = VENDOR_ID_NONE self.bindings = [] self.pub_period = 0 self.pub_id = 0 self.path = None def set_path(self, path): self.path = path def get_id(self): return self.model_id def get_vendor(self): return self.vendor def process_message(self, source, key, data): print('Model process message') def set_publication(self, period): self.pub_period = period def set_config(self, config): if 'Bindings' in config: self.bindings = config.get('Bindings') print('Bindings: ', end='') print(self.bindings) if 'PublicationPeriod' in config: self.set_publication(config.get('PublicationPeriod')) print('Model publication period ', end='') print(self.pub_period, end='') print(' ms') if 'Subscriptions' in config: self.print_subscriptions(config.get('Subscriptions')) def print_subscriptions(self, subscriptions): print('Model subscriptions ', end='') for sub in subscriptions: if isinstance(sub, int): print('%04x' % sub, end=' ') if isinstance(sub, list): label = uuid.UUID(bytes=b''.join(sub)) print(label, end=' ') print() class OnOffServer(Model): def __init__(self, model_id): Model.__init__(self, model_id) self.cmd_ops = { 0x8201, # get 0x8202, # set 0x8203 } # set unacknowledged print("OnOff Server ", end="") self.state = 0 print('State ', end='') self.timer = PubTimer() def process_message(self, source, key, data): datalen = len(data) print('OnOff Server process message len ', datalen) if datalen!=2 and datalen!=3: return if datalen==2: op_tuple=struct.unpack('