# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # import unittest import logging from threading import Thread, Event import qpid.messaging from qmf2.common import (Notifier, SchemaObjectClass, SchemaClassId, SchemaProperty, qmfTypes, SchemaMethod, QmfQuery, QmfData) import qmf2.console from qmf2.agent import(QmfAgentData, Agent) class _testNotifier(Notifier): def __init__(self): self._event = Event() def indication(self): # note: called by qmf daemon thread self._event.set() def wait_for_work(self, timeout): # note: called by application thread to wait # for qmf to generate work self._event.wait(timeout) timed_out = self._event.isSet() == False if not timed_out: self._event.clear() return True return False class _agentApp(Thread): def __init__(self, name, broker_url, heartbeat): Thread.__init__(self) self.notifier = _testNotifier() self.broker_url = broker_url self.agent = Agent(name, _notifier=self.notifier, _heartbeat_interval=heartbeat) # Dynamically construct a management database _schema = SchemaObjectClass( _classId=SchemaClassId("MyPackage", "MyClass"), _desc="A test data schema", _object_id_names=["index1", "index2"] ) # add properties _schema.add_property( "index1", SchemaProperty(qmfTypes.TYPE_UINT8)) _schema.add_property( "index2", SchemaProperty(qmfTypes.TYPE_LSTR)) # these two properties are statistics _schema.add_property( "query_count", SchemaProperty(qmfTypes.TYPE_UINT32)) _schema.add_property( "method_call_count", SchemaProperty(qmfTypes.TYPE_UINT32)) # These two properties can be set via the method call _schema.add_property( "set_string", SchemaProperty(qmfTypes.TYPE_LSTR)) _schema.add_property( "set_int", SchemaProperty(qmfTypes.TYPE_UINT32)) # add method _meth = SchemaMethod( _desc="Method to set string and int in object." ) _meth.add_argument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) ) _meth.add_argument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) ) _schema.add_method( "set_meth", _meth ) # Add schema to Agent self.agent.register_object_class(_schema) # instantiate managed data objects matching the schema _obj1 = QmfAgentData( self.agent, _schema=_schema, _values={"index1":100, "index2":"a name"}) _obj1.set_value("set_string", "UNSET") _obj1.set_value("set_int", 0) _obj1.set_value("query_count", 0) _obj1.set_value("method_call_count", 0) self.agent.add_object( _obj1 ) self.agent.add_object( QmfAgentData( self.agent, _schema=_schema, _values={"index1":99, "index2": "another name", "set_string": "UNSET", "set_int": 0, "query_count": 0, "method_call_count": 0} )) self.agent.add_object( QmfAgentData( self.agent, _schema=_schema, _values={"index1":50, "index2": "my name", "set_string": "SET", "set_int": 0, "query_count": 0, "method_call_count": 0} )) # add an "unstructured" object to the Agent _obj2 = QmfAgentData(self.agent, _object_id="01545") _obj2.set_value("field1", "a value") _obj2.set_value("field2", 2) _obj2.set_value("field3", {"a":1, "map":2, "value":3}) _obj2.set_value("field4", ["a", "list", "value"]) _obj2.set_value("index1", 50) self.agent.add_object(_obj2) _obj2 = QmfAgentData(self.agent, _object_id="01546") _obj2.set_value("field1", "a value") _obj2.set_value("field2", 3) _obj2.set_value("field3", {"a":1, "map":2, "value":3}) _obj2.set_value("field4", ["a", "list", "value"]) _obj2.set_value("index1", 51) self.agent.add_object(_obj2) _obj2 = QmfAgentData(self.agent, _object_id="01544") _obj2.set_value("field1", "a value") _obj2.set_value("field2", 4) _obj2.set_value("field3", {"a":1, "map":2, "value":3}) _obj2.set_value("field4", ["a", "list", "value"]) _obj2.set_value("index1", 49) self.agent.add_object(_obj2) _obj2 = QmfAgentData(self.agent, _object_id="01543") _obj2.set_value("field1", "a value") _obj2.set_value("field2", 4) _obj2.set_value("field3", {"a":1, "map":2, "value":3}) _obj2.set_value("field4", ["a", "list", "value"]) _obj2.set_value("index1", 48) self.agent.add_object(_obj2) self.running = False self.ready = Event() def start_app(self): self.running = True self.start() self.ready.wait(10) if not self.ready.is_set(): raise Exception("Agent failed to connect to broker.") def stop_app(self): self.running = False # wake main thread self.notifier.indication() # hmmm... collide with daemon??? self.join(10) if self.isAlive(): raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!") def run(self): # broker_url = "user/passwd@hostname:port" self.conn = qpid.messaging.Connection(self.broker_url.host, self.broker_url.port, self.broker_url.user, self.broker_url.password) self.conn.connect() self.agent.set_connection(self.conn) self.ready.set() while self.running: self.notifier.wait_for_work(None) wi = self.agent.get_next_workitem(timeout=0) while wi is not None: logging.error("UNEXPECTED AGENT WORKITEM RECEIVED=%s" % wi.get_type()) self.agent.release_workitem(wi) wi = self.agent.get_next_workitem(timeout=0) if self.conn: self.agent.remove_connection(10) self.agent.destroy(10) class BaseTest(unittest.TestCase): def configure(self, config): self.config = config self.broker = config.broker self.defines = self.config.defines def setUp(self): # one second agent indication interval self.agent_heartbeat = 1 self.agent1 = _agentApp("agent1", self.broker, self.agent_heartbeat) self.agent1.start_app() self.agent2 = _agentApp("agent2", self.broker, self.agent_heartbeat) self.agent2.start_app() def tearDown(self): if self.agent1: self.agent1.stop_app() self.agent1 = None if self.agent2: self.agent2.stop_app() self.agent2 = None def test_all_oids(self): # create console # find agents # synchronous query for all schemas # synchronous query for all objects per schema # verify known object ids are returned self.notifier = _testNotifier() self.console = qmf2.console.Console(notifier=self.notifier, agent_timeout=3) self.conn = qpid.messaging.Connection(self.broker.host, self.broker.port, self.broker.user, self.broker.password) self.conn.connect() self.console.add_connection(self.conn) for aname in ["agent1", "agent2"]: agent = self.console.find_agent(aname, timeout=3) self.assertTrue(agent and agent.get_name() == aname) # first, find objects per schema query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA_ID) sid_list = self.console.do_query(agent, query) self.assertTrue(sid_list and len(sid_list) == 1) for sid in sid_list: t_params = {QmfData.KEY_SCHEMA_ID: sid} query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT_ID, _target_params=t_params) oid_list = self.console.do_query(agent, query) self.assertTrue(isinstance(oid_list, type([])), "Unexpected return type") self.assertTrue(len(oid_list) == 3, "Wrong count") self.assertTrue('100a name' in oid_list) self.assertTrue('99another name' in oid_list) self.assertTrue('50my name' in oid_list) self.assertTrue('01545' not in oid_list) # now, find all unmanaged objects (no schema) query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT_ID) oid_list = self.console.do_query(agent, query) self.assertTrue(isinstance(oid_list, type([])), "Unexpected return type") self.assertTrue(len(oid_list) == 4, "Wrong count") self.assertTrue('100a name' not in oid_list) self.assertTrue('99another name' not in oid_list) self.assertTrue('01545' in oid_list) self.assertTrue('01544' in oid_list) self.assertTrue('01543' in oid_list) self.assertTrue('01546' in oid_list) self.console.destroy(10) def test_direct_oids(self): # create console # find agents # synchronous query for each objects # verify objects and schemas are correct self.notifier = _testNotifier() self.console = qmf2.console.Console(notifier=self.notifier, agent_timeout=3) self.conn = qpid.messaging.Connection(self.broker.host, self.broker.port, self.broker.user, self.broker.password) self.conn.connect() self.console.add_connection(self.conn) for aname in ["agent1", "agent2"]: agent = self.console.find_agent(aname, timeout=3) self.assertTrue(agent and agent.get_name() == aname) # first, find objects per schema query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA_ID) sid_list = self.console.do_query(agent, query) self.assertTrue(sid_list and len(sid_list) == 1) for oid in ['100a name', '99another name']: query = QmfQuery.create_id_object(oid, sid_list[0]) obj_list = self.console.do_query(agent, query) self.assertTrue(isinstance(obj_list, type([])), "Unexpected return type") self.assertTrue(len(obj_list) == 1) obj = obj_list[0] self.assertTrue(isinstance(obj, QmfData)) self.assertTrue(obj.get_object_id() == oid) self.assertTrue(obj.get_schema_class_id() == sid_list[0]) schema_id = obj.get_schema_class_id() self.assertTrue(isinstance(schema_id, SchemaClassId)) self.assertTrue(obj.is_described()) # now find schema-less objects for oid in ['01545']: query = QmfQuery.create_id_object(oid) obj_list = self.console.do_query(agent, query) self.assertTrue(isinstance(obj_list, type([])), "Unexpected return type") self.assertTrue(len(obj_list) == 1) obj = obj_list[0] self.assertTrue(isinstance(obj, QmfData)) self.assertTrue(obj.get_object_id() == oid) self.assertFalse(obj.is_described()) self.console.destroy(10) def test_packages(self): # create console # find agents # synchronous query all package names self.notifier = _testNotifier() self.console = qmf2.console.Console(notifier=self.notifier, agent_timeout=3) self.conn = qpid.messaging.Connection(self.broker.host, self.broker.port, self.broker.user, self.broker.password) self.conn.connect() self.console.add_connection(self.conn) for aname in ["agent1", "agent2"]: agent = self.console.find_agent(aname, timeout=3) self.assertTrue(agent and agent.get_name() == aname) query = QmfQuery.create_wildcard(QmfQuery.TARGET_PACKAGES) package_list = self.console.do_query(agent, query) self.assertTrue(len(package_list) == 1) self.assertTrue('MyPackage' in package_list) self.console.destroy(10) def test_predicate_schema_id(self): # create console # find agents # synchronous query for all schema by package name self.notifier = _testNotifier() self.console = qmf2.console.Console(notifier=self.notifier, agent_timeout=3) self.conn = qpid.messaging.Connection(self.broker.host, self.broker.port, self.broker.user, self.broker.password) self.conn.connect() self.console.add_connection(self.conn) for aname in ["agent1", "agent2"]: agent = self.console.find_agent(aname, timeout=3) self.assertTrue(agent and agent.get_name() == aname) query = QmfQuery.create_predicate(QmfQuery.TARGET_SCHEMA, [QmfQuery.EQ, SchemaClassId.KEY_PACKAGE, [QmfQuery.QUOTE, "MyPackage"]]) schema_list = self.console.do_query(agent, query) self.assertTrue(len(schema_list)) for schema in schema_list: self.assertTrue(schema.get_class_id().get_package_name() == "MyPackage") self.console.destroy(10) def test_predicate_no_match(self): # create console # find agents # synchronous query for all schema by package name self.notifier = _testNotifier() self.console = qmf2.console.Console(notifier=self.notifier, agent_timeout=3) self.conn = qpid.messaging.Connection(self.broker.host, self.broker.port, self.broker.user, self.broker.password) self.conn.connect() self.console.add_connection(self.conn) for aname in ["agent1", "agent2"]: agent = self.console.find_agent(aname, timeout=3) self.assertTrue(agent and agent.get_name() == aname) query = QmfQuery.create_predicate(QmfQuery.TARGET_SCHEMA, [QmfQuery.EQ, [QmfQuery.UNQUOTE, SchemaClassId.KEY_PACKAGE], [QmfQuery.QUOTE, "No-Such-Package"]]) schema_list = self.console.do_query(agent, query) self.assertTrue(len(schema_list) == 0) self.console.destroy(10) def test_predicate_match_string(self): # create console # find agents # synchronous query for all objects with a value named # set_string which is < or equal to "UNSET" self.notifier = _testNotifier() self.console = qmf2.console.Console(notifier=self.notifier, agent_timeout=3) self.conn = qpid.messaging.Connection(self.broker.host, self.broker.port, self.broker.user, self.broker.password) self.conn.connect() self.console.add_connection(self.conn) for aname in ["agent1", "agent2"]: agent = self.console.find_agent(aname, timeout=3) self.assertTrue(agent and agent.get_name() == aname) # get the schema id for MyPackage:MyClass schema query = QmfQuery.create_predicate(QmfQuery.TARGET_SCHEMA_ID, [QmfQuery.AND, [QmfQuery.EQ, SchemaClassId.KEY_PACKAGE, [QmfQuery.QUOTE, "MyPackage"]], [QmfQuery.EQ, SchemaClassId.KEY_CLASS, [QmfQuery.QUOTE, "MyClass"]]]) sid_list = self.console.do_query(agent, query) self.assertTrue(len(sid_list) == 1) query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT, [QmfQuery.AND, [QmfQuery.EXISTS, [QmfQuery.QUOTE, "set_string"]], [QmfQuery.EQ, "set_string", [QmfQuery.QUOTE, "UNSET"]]], _target_params={QmfData.KEY_SCHEMA_ID: sid_list[0]}) obj_list = self.console.do_query(agent, query) self.assertTrue(len(obj_list) == 2) for obj in obj_list: self.assertTrue(obj.has_value("set_string")) self.assertTrue(obj.get_value("set_string") == "UNSET") self.console.destroy(10) def test_predicate_match_integer(self): # create console # find agents # synchronous query for all objects with a value named # "index1" which is < or equal to various values self.notifier = _testNotifier() self.console = qmf2.console.Console(notifier=self.notifier, agent_timeout=3) self.conn = qpid.messaging.Connection(self.broker.host, self.broker.port, self.broker.user, self.broker.password) self.conn.connect() self.console.add_connection(self.conn) for aname in ["agent1", "agent2"]: agent = self.console.find_agent(aname, timeout=3) self.assertTrue(agent and agent.get_name() == aname) # Query the unmanaged (no schema) objects # == 50 query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT, [QmfQuery.AND, [QmfQuery.EXISTS, [QmfQuery.QUOTE, "index1"]], [QmfQuery.EQ, "index1", 50]]) obj_list = self.console.do_query(agent, query) self.assertTrue(len(obj_list) == 1) self.assertTrue(obj_list[0].has_value("index1")) self.assertTrue(obj_list[0].get_value("index1") == 50) # <= 50 query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT, [QmfQuery.AND, [QmfQuery.EXISTS, [QmfQuery.QUOTE, "index1"]], [QmfQuery.LE, "index1", 50]]) obj_list = self.console.do_query(agent, query) self.assertTrue(len(obj_list) == 3) for obj in obj_list: self.assertTrue(obj.has_value("index1")) self.assertTrue(obj.get_value("index1") <= 50) # > 50 query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT, [QmfQuery.AND, [QmfQuery.EXISTS, [QmfQuery.QUOTE, "index1"]], [QmfQuery.GT, "index1", 50]]) obj_list = self.console.do_query(agent, query) self.assertTrue(len(obj_list) == 1) for obj in obj_list: self.assertTrue(obj.has_value("index1")) self.assertTrue(obj.get_value("index1") > 50) self.console.destroy(10)