#!/usr/bin/env python3 # SPDX-License-Identifier: LGPL-2.1-or-later ################################################################### # # This is a unified test sample for BT Mesh # # To run the test: # test-mesh [token] # # 'token' is an optional argument. It must be a 16-digit # hexadecimal number. The token must be associated with # an existing node. The token is generated and assigned # to a node as a result of successful provisioning (see # explanation of "join" option). # When the token is set, the menu operations "attach" # and "remove" may be performed on a node specified # by this token. # # The test imitates a device with 2 elements: # element 0: OnOff Server model # Sample Vendor model # element 1: OnOff Client model # # The main menu: # token # join # attach # remove # dest # uuid # app-index # client-menu # exit # # The main menu options explained: # token # Set the unique node token. # The token can be set from command line arguments as # well. # # join # Request provisioning of a device to become a node # on a mesh network. The test generates device UUID # which is displayed and will need to be provided to # an outside entity that acts as a Provisioner. Also, # during the provisioning process, an agent that is # part of the test, will request (or will be requested) # to perform a specified operation, e.g., a number will # be displayed and this number will need to be entered # on the Provisioner's side. # In case of successful provisioning, the application # automatically attaches as a node to the daemon. A node # 'token' is returned to the application and is used # for the runtime of the test. # # attach # Attach the application to bluetoothd-daemon as a node. # For the call to be successful, the valid node token must # be already set, either from command arguments or by # executing "set token" operation or automatically after # successfully executing "join" operation in the same # test run. # # remove # Permanently removes any node configuration from daemon # and persistent storage. After this operation, the node # is permanently forgotten by the daemon and the associated # node token is no longer valid. # # dest # Set destination address to send messages: 4 hex digits # # app-index # Set AppKey index to indicate which application key to use # to encode outgoing messages: up to 3 hex digits # # vendor-send # Allows to send an arbitrary endor message. # The destination is set based on previously executed "dest" # command (if not set, the outbound message will fail). # User is prompted to enter hex bytearray payload. # The message is originated from the vendor model registered # on element 0. For the command to succeed, the AppKey index # that is set by executing "app-key" must correspond to the # application key to which the Sample Vendor model is bound. # # client-menu # Enter On/Off client submenu. # # quit # Exits the test. # ################################################################### import sys import struct import fcntl import os import numpy import random import dbus import dbus.service import dbus.exceptions from threading import Timer import time import uuid try: from gi.repository import GLib except ImportError: import glib as GLib from dbus.mainloop.glib import DBusGMainLoop try: from termcolor import colored, cprint set_error = lambda x: colored('!' + x, 'red', attrs=['bold']) set_cyan = lambda x: colored(x, 'cyan', attrs=['bold']) set_green = lambda x: colored(x, 'green', attrs=['bold']) set_yellow = lambda x: colored(x, 'yellow', attrs=['bold']) except ImportError: print('!!! Install termcolor module for better experience !!!') set_error = lambda x: x set_cyan = lambda x: x set_green = lambda x: x set_yellow = lambda x: x # Provisioning agent try: import agent except ImportError: agent = None MESH_SERVICE_NAME = 'org.bluez.mesh' DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties' DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager' MESH_MGR_IFACE = 'org.bluez.mesh.Management1' MESH_NETWORK_IFACE = 'org.bluez.mesh.Network1' MESH_NODE_IFACE = 'org.bluez.mesh.Node1' MESH_APPLICATION_IFACE = 'org.bluez.mesh.Application1' MESH_ELEMENT_IFACE = 'org.bluez.mesh.Element1' APP_COMPANY_ID = 0x05f1 APP_PRODUCT_ID = 0x0001 APP_VERSION_ID = 0x0001 VENDOR_ID_NONE = 0xffff TRANSACTION_TIMEOUT = 6 app = None bus = None mainloop = None node = None node_mgr = None mesh_net = None dst_addr = 0x0000 app_idx = 0 # Node token housekeeping token = None have_token = False attached = False # Remote device UUID have_uuid = False remote_uuid = None # Menu housekeeping MAIN_MENU = 0 ON_OFF_CLIENT_MENU = 1 INPUT_NONE = 0 INPUT_TOKEN = 1 INPUT_DEST_ADDRESS = 2 INPUT_APP_KEY_INDEX = 3 INPUT_MESSAGE_PAYLOAD = 4 INPUT_UUID = 5 menus = [] current_menu = None user_input = 0 input_error = False send_opts = dbus.Dictionary(signature='sv') send_opts = {'ForceSegmented' : dbus.Boolean(True)} def raise_error(str_value): global input_error input_error = True print(set_error(str_value)) def clear_error(): global input_error input_error = False def is_error(): return input_error def app_exit(): global mainloop global app for el in app.elements: for model in el.models: if model.timer != None: model.timer.cancel() mainloop.quit() def set_token(str_value): global token global have_token if len(str_value) != 16: raise_error('Expected 16 digits') return try: input_number = int(str_value, 16) except ValueError: raise_error('Not a valid hexadecimal number') return token = numpy.uint64(input_number) have_token = True def set_uuid(str_value): global remote_uuid global have_uuid if len(str_value) != 32: raise_error('Expected 32 digits') return remote_uuid = bytearray.fromhex(str_value) have_uuid = True def array_to_string(b_array): str_value = "" for b in b_array: str_value += "%02x" % b return str_value def generic_error_cb(error): print(set_error('D-Bus call failed: ') + str(error)) def generic_reply_cb(): return def attach_app_error_cb(error): print(set_error('Failed to register application: ') + str(error)) def attach(token): print('Attach mesh node to bluetooth-meshd daemon') mesh_net.Attach(app.get_path(), token, reply_handler=attach_app_cb, error_handler=attach_app_error_cb) def join_cb(): print('Join procedure started') def join_error_cb(reason): print('Join procedure failed: ', reason) def remove_node_cb(): global attached global have_token print(set_yellow('Node removed')) attached = False have_token = False def unwrap(item): if isinstance(item, dbus.Boolean): return bool(item) if isinstance(item, (dbus.UInt16, dbus.Int16, dbus.UInt32, dbus.Int32, dbus.UInt64, dbus.Int64)): return int(item) if isinstance(item, dbus.Byte): return bytes([int(item)]) if isinstance(item, dbus.String): return item if isinstance(item, (dbus.Array, list, tuple)): return [unwrap(x) for x in item] if isinstance(item, (dbus.Dictionary, dict)): return dict([(unwrap(x), unwrap(y)) for x, y in item.items()]) print(set_error('Dictionary item not handled: ') + type(item)) return item def attach_app_cb(node_path, dict_array): global attached attached = True print(set_yellow('Mesh app registered: ') + set_green(node_path)) obj = bus.get_object(MESH_SERVICE_NAME, node_path) global node_mgr node_mgr = dbus.Interface(obj, MESH_MGR_IFACE) global node node = dbus.Interface(obj, MESH_NODE_IFACE) els = unwrap(dict_array) for el in els: idx = struct.unpack('b', el[0])[0] models = el[1] element = app.get_element(idx) element.set_model_config(models) def interfaces_removed_cb(object_path, interfaces): print('Removed') if not mesh_net: return print(object_path) if object_path == mesh_net[2]: print('Service was removed') app_exit() def print_state(state): print('State is ', end='') if state == 0: print('OFF') elif state == 1: print('ON') else: print('UNKNOWN') class ModTimer(): def __init__(self): self.seconds = None self.func = None self.thread = None self.busy = False def _timeout_cb(self): self.func() self.busy = True self._schedule_timer() self.busy =False def _schedule_timer(self): self.thread = Timer(self.seconds, self._timeout_cb) self.thread.start() def start(self, seconds, func): self.func = func self.seconds = seconds if not self.busy: self._schedule_timer() def cancel(self): if self.thread is not None: self.thread.cancel() self.thread = None class Application(dbus.service.Object): def __init__(self, bus): self.path = '/example' self.agent = None self.elements = [] dbus.service.Object.__init__(self, bus, self.path) def set_agent(self, agent): self.agent = agent def get_path(self): return dbus.ObjectPath(self.path) def add_element(self, element): self.elements.append(element) def get_element(self, idx): for ele in self.elements: if ele.get_index() == idx: return ele def get_properties(self): return { MESH_APPLICATION_IFACE: { 'CompanyID': dbus.UInt16(APP_COMPANY_ID), 'ProductID': dbus.UInt16(APP_PRODUCT_ID), 'VersionID': dbus.UInt16(APP_VERSION_ID) } } @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}') def GetManagedObjects(self): response = {} response[self.path] = self.get_properties() response[self.agent.get_path()] = self.agent.get_properties() for element in self.elements: response[element.get_path()] = element.get_properties() return response @dbus.service.method(MESH_APPLICATION_IFACE, in_signature="t", out_signature="") def JoinComplete(self, value): global token global have_token global attach print(set_yellow('Joined mesh network with token ') + set_green(format(value, '016x'))) token = value have_token = True @dbus.service.method(MESH_APPLICATION_IFACE, in_signature="s", out_signature="") def JoinFailed(self, value): print(set_error('JoinFailed '), value) class Element(dbus.service.Object): PATH_BASE = '/example/ele' def __init__(self, bus, index): self.path = self.PATH_BASE + format(index, '02x') self.models = [] self.bus = bus self.index = index dbus.service.Object.__init__(self, bus, self.path) def _get_sig_models(self): mods = [] for model in self.models: opts = [] id = model.get_id() vendor = model.get_vendor() if vendor == VENDOR_ID_NONE: mod = (id, opts) mods.append(mod) return mods def _get_v_models(self): mods = [] for model in self.models: opts = [] id = model.get_id() v = model.get_vendor() if v != VENDOR_ID_NONE: mod = (v, id, opts) mods.append(mod) return mods def get_properties(self): vendor_models = self._get_v_models() sig_models = self._get_sig_models() props = {'Index' : dbus.Byte(self.index)} props['Models'] = dbus.Array(sig_models, signature='(qa{sv})') props['VendorModels'] = dbus.Array(vendor_models, signature='(qqa{sv})') #print(props) return { MESH_ELEMENT_IFACE: props } def add_model(self, model): model.set_path(self.path) self.models.append(model) def get_index(self): return self.index def set_model_config(self, configs): for config in configs: mod_id = config[0] self.update_model_config(mod_id, config[1]) @dbus.service.method(MESH_ELEMENT_IFACE, in_signature="qqvay", out_signature="") def MessageReceived(self, source, key, dest, data): print(('Message Received on Element %02x') % self.index, end='') print(', src=', format(source, '04x'), end='') if isinstance(dest, int): print(', dst=%04x' % dest) elif isinstance(dest, dbus.Array): dst_str = array_to_string(dest) print(', dst=' + dst_str) for model in self.models: model.process_message(source, dest, key, data) @dbus.service.method(MESH_ELEMENT_IFACE, in_signature="qa{sv}", out_signature="") def UpdateModelConfiguration(self, model_id, config): cfg = unwrap(config) print(cfg) self.update_model_config(model_id, cfg) def update_model_config(self, model_id, config): print(('Update Model Config '), end='') print(format(model_id, '04x')) for model in self.models: if model_id == model.get_id(): model.set_config(config) return @dbus.service.method(MESH_ELEMENT_IFACE, in_signature="", out_signature="") def get_path(self): return dbus.ObjectPath(self.path) class Model(): def __init__(self, model_id): self.cmd_ops = [] self.model_id = model_id self.vendor = VENDOR_ID_NONE self.bindings = [] self.pub_period = 0 self.pub_id = 0 self.path = None self.timer = None def set_path(self, path): self.path = path def get_id(self): return self.model_id def get_vendor(self): return self.vendor def process_message(self, source, dest, key, data): return def set_publication(self, period): self.pub_period = period def send_publication(self, data): pub_opts = dbus.Dictionary(signature='sv') print('Send publication ', end='') print(data) node.Publish(self.path, self.model_id, pub_opts, data, reply_handler=generic_reply_cb, error_handler=generic_error_cb) def send_message(self, dest, key, data): global send_opts node.Send(self.path, dest, key, send_opts, data, reply_handler=generic_reply_cb, error_handler=generic_error_cb) def set_config(self, config): if 'Bindings' in config: self.bindings = config.get('Bindings') print('Bindings: ', end='') print(self.bindings) if 'PublicationPeriod' in config: self.set_publication(config.get('PublicationPeriod')) print('Model publication period ', end='') print(self.pub_period, end='') print(' ms') if 'Subscriptions' in config: print('Model subscriptions ', end='') self.print_subscriptions(config.get('Subscriptions')) print() def print_subscriptions(self, subscriptions): for sub in subscriptions: if isinstance(sub, int): print('%04x,' % sub, end=' ') if isinstance(sub, list): label = uuid.UUID(bytes=b''.join(sub)) print(label, ',', end=' ') ######################## # On Off Server Model ######################## class OnOffServer(Model): def __init__(self, model_id): Model.__init__(self, model_id) self.tid = None self.last_src = 0x0000 self.last_dst = 0x0000 self.cmd_ops = { 0x8201, # get 0x8202, # set 0x8203, # set unacknowledged 0x8204 } # status print("OnOff Server ") self.state = 0 print_state(self.state) self.pub_timer = ModTimer() self.t_timer = ModTimer() def process_message(self, source, dest, key, data): datalen = len(data) if datalen != 2 and datalen != 4: # The opcode is not recognized by this model return if datalen == 2: op_tuple=struct.unpack('>H',bytes(data)) opcode = op_tuple[0] if opcode != 0x8201: # The opcode is not recognized by this model return print('Get state') elif datalen == 4: opcode,self.state, tid = struct.unpack('>HBB', bytes(data)) if opcode != 0x8202 and opcode != 0x8203: # The opcode is not recognized by this model return print_state(self.state) if (self.tid != None and self.tid == tid and self.last_src == source and self.last_dst == dest): # Ignore duplicate transaction return self.t_timer.cancel() self.tid = tid self.last_src = source self.last_dst = dest self.t_timer.start(TRANSACTION_TIMEOUT, self.t_track) # Unacknowledged "set" if opcode == 0x8203: return rsp_data = struct.pack('>HB', 0x8204, self.state) self.send_message(source, key, rsp_data) def t_track(self): self.t_timer.cancel() self.tid = None self.last_src = 0x0000 self.last_dst = 0x0000 def set_publication(self, period): self.pub_period = period if period == 0: self.pub_timer.cancel() return # We do not handle ms in this example if period < 1000: return self.pub_timer.start(period/1000, self.publish) def publish(self): print('Publish') data = struct.pack('>HB', 0x8204, self.state) self.send_publication(data) ######################## # On Off Client Model ######################## class OnOffClient(Model): def __init__(self, model_id): Model.__init__(self, model_id) self.tid = 0 self.data = None self.cmd_ops = { 0x8201, # get 0x8202, # set 0x8203, # set unacknowledged 0x8204 } # status print('OnOff Client') def _send_message(self, dest, key, data): print('OnOffClient send command') self.send_message(dest, key, data) def get_state(self, dest, key): opcode = 0x8201 self.data = struct.pack('>H', opcode) self._send_message(dest, key, self.data) def set_state(self, dest, key, state): opcode = 0x8202 print('Set state:', state) self.data = struct.pack('>HBB', opcode, state, self.tid) self.tid = (self.tid + 1) % 255 self._send_message(dest, key, self.data) def repeat(self, dest, key): if self.data != None: self._send_message(dest, key, self.data) else: print('No previous command stored') def process_message(self, source, dest, key, data): print('OnOffClient process message len = ', end = '') datalen = len(data) print(datalen) if datalen != 3: # The opcode is not recognized by this model return opcode, state = struct.unpack('>HB',bytes(data)) if opcode != 0x8204 : # The opcode is not recognized by this model return print(set_yellow('Got state '), end = '') state_str = "ON" if state == 0: state_str = "OFF" print(set_green(state_str), set_yellow('from'), set_green('%04x' % source)) ######################## # Sample Vendor Model ######################## class SampleVendor(Model): def __init__(self, model_id): Model.__init__(self, model_id) self.vendor = 0x05F1 # Linux Foundation Company ID ######################## # Menu functions ######################## class MenuItem(): def __init__(self, desc, func): self.desc = desc self.func = func class Menu(): def __init__(self, title, menu): self.title = title self.menu = menu def show(self): print(set_cyan('*** ' + self.title.upper() + ' ***')) for k, v in self.menu.items(): print(set_green(k), set_cyan(v.desc)) def process_cmd(self, str_value): if is_error(): self.show() clear_error() return cmds = [] for key in self.menu.keys(): if key.startswith(str_value): cmds.append(key) if len(cmds) == 0: print(set_error('Unknown menu option: '), str_value) self.show() return if len(cmds) > 1: for cmd in cmds: print(set_cyan(cmd + '?')) return self.menu.get(cmds[0]).func() class MenuHandler(object): def __init__(self, callback): self.cb = callback flags = fcntl.fcntl(sys.stdin.fileno(), fcntl.F_GETFL) flags |= os.O_NONBLOCK fcntl.fcntl(sys.stdin.fileno(), fcntl.F_SETFL, flags) sys.stdin.flush() GLib.io_add_watch(sys.stdin, GLib.IO_IN, self.input_callback) def input_callback(self, fd, condition): chunk = fd.read() buffer = '' for char in chunk: buffer += char if char == '\n': self.cb(buffer) return True def process_input(input_str): str_value = input_str.strip() # Allow entering empty lines for better output visibility if len(str_value) == 0: return current_menu.process_cmd(str_value) def switch_menu(level): global current_menu if level >= len(menus): return current_menu = menus[level] current_menu.show() ######################## # Main menu class ######################## class MainMenu(Menu): def __init__(self): menu_items = { 'token': MenuItem(' - set node ID (token)', self.__cmd_set_token), 'join': MenuItem(' - join mesh network', self.__cmd_join), 'attach': MenuItem(' - attach mesh node', self.__cmd_attach), 'remove': MenuItem(' - delete node', self.__cmd_remove), 'dest': MenuItem(' - set destination address', self.__cmd_set_dest), 'uuid': MenuItem(' - set remote uuid', self.__cmd_set_uuid), 'app-index': MenuItem(' - set AppKey index', self.__cmd_set_app_idx), 'vendor-send': MenuItem(' - send raw vendor message', self.__cmd_vendor_msg), 'client-menu': MenuItem(' - On/Off client menu', self.__cmd_client_menu), 'quit': MenuItem(' - exit the test', app_exit) } Menu.__init__(self, 'Main Menu', menu_items) def __cmd_client_menu(self): if attached != True: print(set_error('Disallowed: node is not attached')) return switch_menu(ON_OFF_CLIENT_MENU) def __cmd_set_token(self): global user_input if have_token == True: print('Token already set') return user_input = INPUT_TOKEN print(set_cyan('Enter 16-digit hex node ID:')) def __cmd_set_dest(self): global user_input user_input = INPUT_DEST_ADDRESS print(set_cyan('Enter 4-digit hex destination address:')) def __cmd_set_uuid(self): global user_input user_input = INPUT_UUID print(set_cyan('Enter 32-digit hex remote UUID:')) def __cmd_set_app_idx(self): global user_input user_input = INPUT_APP_KEY_INDEX; print(set_cyan('Enter app key index (up to 3 digit hex):')) def __cmd_vendor_msg(self): global user_input user_input = INPUT_MESSAGE_PAYLOAD; print(set_cyan('Enter message payload (hex):')) def __cmd_join(self): if agent == None: print(set_error('Provisioning agent not found')) return uuid_bytes = uuid.uuid4().bytes uuid_str = array_to_string(uuid_bytes) print(set_yellow('Joining with UUID ') + set_green(uuid_str)) mesh_net.Join(app.get_path(), uuid_bytes, reply_handler=join_cb, error_handler=join_error_cb) def __cmd_attach(self): if have_token == False: print(set_error('Token is not set')) self.show() return attach(token) def __cmd_remove(self): if have_token == False: print(set_error('Token is not set')) self.show() return print('Removing mesh node') mesh_net.Leave(token, reply_handler=remove_node_cb, error_handler=generic_error_cb) def __send_vendor_msg(self, str_value): try: msg_data = bytearray.fromhex(str_value) except ValueError: raise_error('Not a valid hexadecimal input') return print(set_yellow('Send data: ' + set_green(str_value))) app.elements[0].models[1].send_message(dst_addr, app_idx, msg_data) def process_cmd(self, str_value): global user_input global dst_addr global app_idx if user_input == INPUT_TOKEN: set_token(str_value) elif user_input == INPUT_UUID: set_uuid(str_value) elif user_input == INPUT_DEST_ADDRESS: res = set_value(str_value, 4, 4) if is_error() != True: dst_addr = res print(set_yellow("Destination address: ") + set_green(format(dst_addr, '04x'))) elif user_input == INPUT_APP_KEY_INDEX: res = set_value(str_value, 1, 3) if is_error() != True: app_idx = res print(set_yellow("Application index: ") + set_green(format(app_idx, '03x'))) elif user_input == INPUT_MESSAGE_PAYLOAD: self.__send_vendor_msg(str_value) if user_input != INPUT_NONE: user_input = INPUT_NONE if is_error() != True: return Menu.process_cmd(self, str_value) ############################## # On/Off Client menu class ############################## class ClientMenu(Menu): def __init__(self): menu_items = { 'get-state': MenuItem(' - get server state', self.__cmd_get_state), 'off': MenuItem(' - set state OFF', self.__cmd_set_state_off), 'on': MenuItem(' - set state ON', self.__cmd_set_state_on), 'repeat': MenuItem(' - repeat last command', self.__cmd_repeat_transaction), 'back': MenuItem(' - back to main menu', self.__cmd_main_menu), 'quit': MenuItem(' - exit the test', app_exit) } Menu.__init__(self, 'On/Off Client Menu', menu_items) def __cmd_main_menu(self): switch_menu(MAIN_MENU) def __cmd_get_state(self): app.elements[1].models[0].get_state(dst_addr, app_idx) def __cmd_set_state_off(self): app.elements[1].models[0].set_state(dst_addr, app_idx, 0) def __cmd_set_state_on(self): app.elements[1].models[0].set_state(dst_addr, app_idx, 1) def __cmd_repeat_transaction(self): app.elements[1].models[0].repeat(dst_addr, app_idx) def set_value(str_value, min, max): if len(str_value) > max or len(str_value) < min: raise_error('Bad input length %d' % len(str_value)) return -1 try: value = int(str_value, 16) except ValueError: raise_error('Not a valid hexadecimal number') return -1 return value ######################## # Main entry ######################## def main(): DBusGMainLoop(set_as_default=True) global bus bus = dbus.SystemBus() global mainloop global app global mesh_net global menu global current_menu if len(sys.argv) > 1 : set_token(sys.argv[1]) mesh_net = dbus.Interface(bus.get_object(MESH_SERVICE_NAME, "/org/bluez/mesh"), MESH_NETWORK_IFACE) mesh_net.connect_to_signal('InterfacesRemoved', interfaces_removed_cb) app = Application(bus) # Provisioning agent if agent != None: app.set_agent(agent.Agent(bus)) first_ele = Element(bus, 0x00) second_ele = Element(bus, 0x01) print(set_yellow('Register OnOff Server model on element 0')) first_ele.add_model(OnOffServer(0x1000)) print(set_yellow('Register Vendor model on element 0')) first_ele.add_model(SampleVendor(0x0001)) print(set_yellow('Register OnOff Client model on element 1')) second_ele.add_model(OnOffClient(0x1001)) app.add_element(first_ele) app.add_element(second_ele) mainloop = GLib.MainLoop() menus.append(MainMenu()) menus.append(ClientMenu()) switch_menu(MAIN_MENU) event_catcher = MenuHandler(process_input); mainloop.run() if __name__ == '__main__': main()