diff options
Diffstat (limited to 'qpid/python/qmf/test/basic_method.py')
-rw-r--r-- | qpid/python/qmf/test/basic_method.py | 348 |
1 files changed, 348 insertions, 0 deletions
diff --git a/qpid/python/qmf/test/basic_method.py b/qpid/python/qmf/test/basic_method.py new file mode 100644 index 0000000000..dfd2a65bff --- /dev/null +++ b/qpid/python/qmf/test/basic_method.py @@ -0,0 +1,348 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import unittest +import logging +from threading import Thread, Event + +import qpid.messaging +from qmf.qmfCommon import (Notifier, SchemaObjectClass, SchemaClassId, + SchemaProperty, qmfTypes, SchemaMethod, QmfQuery, + QmfData, QmfQueryPredicate, WorkItem) +import qmf.qmfConsole +from qmf.qmfAgent import(QmfAgentData, Agent, MethodCallParams) + + +class _testNotifier(Notifier): + def __init__(self): + self._event = Event() + + def indication(self): + # note: called by qmf daemon thread + self._event.set() + + def wait_for_work(self, timeout): + # note: called by application thread to wait + # for qmf to generate work + self._event.wait(timeout) + timed_out = self._event.isSet() == False + if not timed_out: + self._event.clear() + return True + return False + + +class _agentApp(Thread): + def __init__(self, name, heartbeat): + Thread.__init__(self) + self.notifier = _testNotifier() + self.agent = Agent(name, + _notifier=self.notifier, + _heartbeat_interval=heartbeat) + + # Dynamically construct a management database + + _schema = SchemaObjectClass( _classId=SchemaClassId("MyPackage", "MyClass"), + _desc="A test data schema", + _object_id_names=["index1", "index2"] ) + # add properties + _schema.add_property( "index1", SchemaProperty(qmfTypes.TYPE_UINT8)) + _schema.add_property( "index2", SchemaProperty(qmfTypes.TYPE_LSTR)) + + # these two properties are statistics + _schema.add_property( "query_count", SchemaProperty(qmfTypes.TYPE_UINT32)) + _schema.add_property( "method_call_count", SchemaProperty(qmfTypes.TYPE_UINT32)) + + # These two properties can be set via the method call + _schema.add_property( "set_string", SchemaProperty(qmfTypes.TYPE_LSTR)) + _schema.add_property( "set_int", SchemaProperty(qmfTypes.TYPE_UINT32)) + + # add method + _meth = SchemaMethod( _desc="Method to set string and int in object." ) + _meth.add_argument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) ) + _meth.add_argument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) ) + _schema.add_method( "set_meth", _meth ) + + # Add schema to Agent + + self.agent.register_object_class(_schema) + + # instantiate managed data objects matching the schema + + _obj1 = QmfAgentData( self.agent, _schema=_schema ) + _obj1.set_value("index1", 100) + _obj1.set_value("index2", "a name" ) + _obj1.set_value("set_string", "UNSET") + _obj1.set_value("set_int", 0) + _obj1.set_value("query_count", 0) + _obj1.set_value("method_call_count", 0) + self.agent.add_object( _obj1 ) + + self.agent.add_object( QmfAgentData( self.agent, _schema=_schema, + _values={"index1":99, + "index2": "another name", + "set_string": "UNSET", + "set_int": 0, + "query_count": 0, + "method_call_count": 0} )) + + # add an "unstructured" object to the Agent + _obj2 = QmfAgentData(self.agent, _object_id="01545") + _obj2.set_value("field1", "a value") + _obj2.set_value("field2", 2) + _obj2.set_value("field3", {"a":1, "map":2, "value":3}) + _obj2.set_value("field4", ["a", "list", "value"]) + self.agent.add_object(_obj2) + + self.running = True + self.start() + + def connect_agent(self, broker_url): + # broker_url = "user/passwd@hostname:port" + self.conn = qpid.messaging.Connection(broker_url.host, + broker_url.port, + broker_url.user, + broker_url.password) + self.conn.connect() + self.agent.set_connection(self.conn) + + def disconnect_agent(self, timeout): + if self.conn: + self.agent.remove_connection(timeout) + + def shutdown_agent(self, timeout): + self.agent.destroy(timeout) + + def stop(self): + self.running = False + self.notifier.indication() # hmmm... collide with daemon??? + self.join(10) + if self.isAlive(): + raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!") + + def run(self): + # Agent application main processing loop + while self.running: + self.notifier.wait_for_work(None) + wi = self.agent.get_next_workitem(timeout=0) + while wi is not None: + if wi.get_type() == WorkItem.METHOD_CALL: + mc = wi.get_params() + if not isinstance(mc, MethodCallParams): + raise Exception("Unexpected method call parameters") + + if mc.get_name() == "set_meth": + obj = self.agent.get_object(mc.get_object_id()) + if obj is None: + error_info = QmfData.create({"code": -2, + "description": + "Bad Object Id."}) + self.agent.method_response(wi.get_handle(), + _error=error_info) + else: + obj.inc_value("method_call_count") + if "arg_int" in mc.get_args(): + obj.set_value("set_int", mc.get_args()["arg_int"]) + if "arg_str" in mc.get_args(): + obj.set_value("set_string", mc.get_args()["arg_str"]) + self.agent.method_response(wi.get_handle(), + {"code" : 0}) + elif mc.get_name() == "a_method": + obj = self.agent.get_object(mc.get_object_id()) + if obj is None: + error_info = QmfData.create({"code": -3, + "description": + "Unknown object id."}) + self.agent.method_response(wi.get_handle(), + _error=error_info) + elif obj.get_object_id() != "01545": + error_info = QmfData.create({"code": -4, + "description": + "Unexpected id."}) + self.agent.method_response(wi.get_handle(), + _error=error_info) + else: + args = mc.get_args() + if ("arg1" in args and args["arg1"] == 1 and + "arg2" in args and args["arg2"] == "Now set!" + and "arg3" in args and args["arg3"] == 1966): + self.agent.method_response(wi.get_handle(), + {"code" : 0}) + else: + error_info = QmfData.create({"code": -5, + "description": + "Bad Args."}) + self.agent.method_response(wi.get_handle(), + _error=error_info) + else: + error_info = QmfData.create({"code": -1, + "description": + "Unknown method call."}) + self.agent.method_response(wi.get_handle(), _error=error_info) + + self.agent.release_workitem(wi) + wi = self.agent.get_next_workitem(timeout=0) + + + +class BaseTest(unittest.TestCase): + def configure(self, config): + self.config = config + self.broker = config.broker + self.defines = self.config.defines + + def setUp(self): + # one second agent indication interval + self.agent1 = _agentApp("agent1", 1) + self.agent1.connect_agent(self.broker) + self.agent2 = _agentApp("agent2", 1) + self.agent2.connect_agent(self.broker) + + def tearDown(self): + if self.agent1: + self.agent1.shutdown_agent(10) + self.agent1.stop() + self.agent1 = None + if self.agent2: + self.agent2.shutdown_agent(10) + self.agent2.stop() + self.agent2 = None + + def test_described_obj(self): + # create console + # find agents + # synchronous query for all objects in schema + # method call on each object + self.notifier = _testNotifier() + self.console = qmf.qmfConsole.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.addConnection(self.conn) + + for aname in ["agent1", "agent2"]: + agent = self.console.find_agent(aname, timeout=3) + self.assertTrue(agent and agent.get_name() == aname) + + query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT, + QmfQueryPredicate( + {QmfQuery.LOGIC_AND: + [{QmfQuery.CMP_EXISTS: [SchemaClassId.KEY_PACKAGE]}, + {QmfQuery.CMP_EQ: [SchemaClassId.KEY_PACKAGE, + "MyPackage"]}]})) + + obj_list = self.console.doQuery(agent, query) + self.assertTrue(len(obj_list) == 2) + for obj in obj_list: + mr = obj.invoke_method( "set_meth", {"arg_int": -99, + "arg_str": "Now set!"}, + _timeout=3) + self.assertTrue(isinstance(mr, qmf.qmfConsole.MethodResult)) + self.assertTrue(mr.succeeded()) + self.assertTrue(mr.get_argument("code") == 0) + + self.assertTrue(obj.get_value("method_call_count") == 0) + self.assertTrue(obj.get_value("set_string") == "UNSET") + self.assertTrue(obj.get_value("set_int") == 0) + + obj.refresh() + + self.assertTrue(obj.get_value("method_call_count") == 1) + self.assertTrue(obj.get_value("set_string") == "Now set!") + self.assertTrue(obj.get_value("set_int") == -99) + + self.console.destroy(10) + + + def test_bad_method(self): + # create console + # find agents + # synchronous query for all objects in schema + # invalid method call on each object + # - should throw a ValueError + self.notifier = _testNotifier() + self.console = qmf.qmfConsole.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.addConnection(self.conn) + + for aname in ["agent1", "agent2"]: + agent = self.console.find_agent(aname, timeout=3) + self.assertTrue(agent and agent.get_name() == aname) + + query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT, + QmfQueryPredicate( + {QmfQuery.LOGIC_AND: + [{QmfQuery.CMP_EXISTS: [SchemaClassId.KEY_PACKAGE]}, + {QmfQuery.CMP_EQ: [SchemaClassId.KEY_PACKAGE, + "MyPackage"]}]})) + + obj_list = self.console.doQuery(agent, query) + self.assertTrue(len(obj_list) == 2) + for obj in obj_list: + self.failUnlessRaises(ValueError, + obj.invoke_method, + "unknown_meth", + {"arg1": -99, "arg2": "Now set!"}, + _timeout=3) + self.console.destroy(10) + + + def test_managed_obj(self): + # create console + # find agents + # synchronous query for a managed object + # method call on each object + self.notifier = _testNotifier() + self.console = qmf.qmfConsole.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.addConnection(self.conn) + + for aname in ["agent1", "agent2"]: + agent = self.console.find_agent(aname, timeout=3) + self.assertTrue(agent and agent.get_name() == aname) + + query = QmfQuery.create_id(QmfQuery.TARGET_OBJECT, "01545") + obj_list = self.console.doQuery(agent, query) + + self.assertTrue(isinstance(obj_list, type([]))) + self.assertTrue(len(obj_list) == 1) + obj = obj_list[0] + + mr = obj.invoke_method("a_method", + {"arg1": 1, + "arg2": "Now set!", + "arg3": 1966}, + _timeout=3) + self.assertTrue(isinstance(mr, qmf.qmfConsole.MethodResult)) + self.assertTrue(mr.succeeded()) + self.assertTrue(mr.get_argument("code") == 0) + # @todo refresh and verify changes + + self.console.destroy(10) |