summaryrefslogtreecommitdiff
path: root/qpid/python/qmf2/tests/obj_gets.py
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2010-01-25 16:25:05 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2010-01-25 16:25:05 +0000
commitfadd169527df40c715e549f538d256fc23bab3da (patch)
tree4b95900eb03adeb45984c5451698eb499c6482d1 /qpid/python/qmf2/tests/obj_gets.py
parentf690f1d9aecfd8021dda72714923b55cfe8d607a (diff)
downloadqpid-python-fadd169527df40c715e549f538d256fc23bab3da.tar.gz
Move the QMFv2 implementation to its own directory.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmfv2@902858 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/python/qmf2/tests/obj_gets.py')
-rw-r--r--qpid/python/qmf2/tests/obj_gets.py399
1 files changed, 399 insertions, 0 deletions
diff --git a/qpid/python/qmf2/tests/obj_gets.py b/qpid/python/qmf2/tests/obj_gets.py
new file mode 100644
index 0000000000..e58575440d
--- /dev/null
+++ b/qpid/python/qmf2/tests/obj_gets.py
@@ -0,0 +1,399 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import unittest
+import logging
+from threading import Thread, Event
+
+import qpid.messaging
+from qmf2.common import (Notifier, SchemaObjectClass, SchemaClassId,
+ SchemaProperty, qmfTypes, SchemaMethod, QmfQuery,
+ QmfData, QmfQueryPredicate)
+import qmf2.console
+from qmf2.agent import(QmfAgentData, Agent)
+
+
+class _testNotifier(Notifier):
+ def __init__(self):
+ self._event = Event()
+
+ def indication(self):
+ # note: called by qmf daemon thread
+ self._event.set()
+
+ def wait_for_work(self, timeout):
+ # note: called by application thread to wait
+ # for qmf to generate work
+ self._event.wait(timeout)
+ timed_out = self._event.isSet() == False
+ if not timed_out:
+ self._event.clear()
+ return True
+ return False
+
+
+class _agentApp(Thread):
+ def __init__(self, name, heartbeat):
+ Thread.__init__(self)
+ self.notifier = _testNotifier()
+ self.agent = Agent(name,
+ _notifier=self.notifier,
+ _heartbeat_interval=heartbeat)
+
+ # Management Database
+ # - two different schema packages,
+ # - two classes within one schema package
+ # - multiple objects per schema package+class
+ # - two "undescribed" objects
+
+ # "package1/class1"
+
+ _schema = SchemaObjectClass( _classId=SchemaClassId("package1", "class1"),
+ _desc="A test data schema - one",
+ _object_id_names=["key"] )
+
+ _schema.add_property( "key", SchemaProperty(qmfTypes.TYPE_LSTR))
+ _schema.add_property( "count1", SchemaProperty(qmfTypes.TYPE_UINT32))
+ _schema.add_property( "count2", SchemaProperty(qmfTypes.TYPE_UINT32))
+
+ self.agent.register_object_class(_schema)
+
+ _obj = QmfAgentData( self.agent, _schema=_schema )
+ _obj.set_value("key", "p1c1_key1")
+ _obj.set_value("count1", 0)
+ _obj.set_value("count2", 0)
+ self.agent.add_object( _obj )
+
+ _obj = QmfAgentData( self.agent, _schema=_schema )
+ _obj.set_value("key", "p1c1_key2")
+ _obj.set_value("count1", 9)
+ _obj.set_value("count2", 10)
+ self.agent.add_object( _obj )
+
+ # "package1/class2"
+
+ _schema = SchemaObjectClass( _classId=SchemaClassId("package1", "class2"),
+ _desc="A test data schema - two",
+ _object_id_names=["name"] )
+ # add properties
+ _schema.add_property( "name", SchemaProperty(qmfTypes.TYPE_LSTR))
+ _schema.add_property( "string1", SchemaProperty(qmfTypes.TYPE_LSTR))
+
+ self.agent.register_object_class(_schema)
+
+ _obj = QmfAgentData( self.agent, _schema=_schema )
+ _obj.set_value("name", "p1c2_name1")
+ _obj.set_value("string1", "a data string")
+ self.agent.add_object( _obj )
+
+
+ # "package2/class1"
+
+ _schema = SchemaObjectClass( _classId=SchemaClassId("package2", "class1"),
+ _desc="A test data schema - second package",
+ _object_id_names=["key"] )
+
+ _schema.add_property( "key", SchemaProperty(qmfTypes.TYPE_LSTR))
+ _schema.add_property( "counter", SchemaProperty(qmfTypes.TYPE_UINT32))
+
+ self.agent.register_object_class(_schema)
+
+ _obj = QmfAgentData( self.agent, _schema=_schema )
+ _obj.set_value("key", "p2c1_key1")
+ _obj.set_value("counter", 0)
+ self.agent.add_object( _obj )
+
+ _obj = QmfAgentData( self.agent, _schema=_schema )
+ _obj.set_value("key", "p2c1_key2")
+ _obj.set_value("counter", 2112)
+ self.agent.add_object( _obj )
+
+
+ # add two "unstructured" objects to the Agent
+
+ _obj = QmfAgentData(self.agent, _object_id="undesc-1")
+ _obj.set_value("field1", "a value")
+ _obj.set_value("field2", 2)
+ _obj.set_value("field3", {"a":1, "map":2, "value":3})
+ _obj.set_value("field4", ["a", "list", "value"])
+ self.agent.add_object(_obj)
+
+
+ _obj = QmfAgentData(self.agent, _object_id="undesc-2")
+ _obj.set_value("key-1", "a value")
+ _obj.set_value("key-2", 2)
+ self.agent.add_object(_obj)
+
+ self.running = True
+ self.start()
+
+ def connect_agent(self, broker_url):
+ # broker_url = "user/passwd@hostname:port"
+ self.conn = qpid.messaging.Connection(broker_url.host,
+ broker_url.port,
+ broker_url.user,
+ broker_url.password)
+ self.conn.connect()
+ self.agent.set_connection(self.conn)
+
+ def disconnect_agent(self, timeout):
+ if self.conn:
+ self.agent.remove_connection(timeout)
+
+ def shutdown_agent(self, timeout):
+ self.agent.destroy(timeout)
+
+ def stop(self):
+ self.running = False
+ self.notifier.indication() # hmmm... collide with daemon???
+ self.join(10)
+ if self.isAlive():
+ raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!")
+
+ def run(self):
+ while self.running:
+ self.notifier.wait_for_work(None)
+ wi = self.agent.get_next_workitem(timeout=0)
+ while wi is not None:
+ logging.error("UNEXPECTED AGENT WORKITEM RECEIVED=%s" % wi.get_type())
+ self.agent.release_workitem(wi)
+ wi = self.agent.get_next_workitem(timeout=0)
+
+
+
+class BaseTest(unittest.TestCase):
+ agent_count = 5
+
+ def configure(self, config):
+ self.config = config
+ self.broker = config.broker
+ self.defines = self.config.defines
+
+ def setUp(self):
+ self.agents = []
+ for i in range(self.agent_count):
+ agent = _agentApp("agent-" + str(i), 1)
+ agent.connect_agent(self.broker)
+ self.agents.append(agent)
+
+ def tearDown(self):
+ for agent in self.agents:
+ if agent is not None:
+ agent.shutdown_agent(10)
+ agent.stop()
+
+
+ def test_all_agents(self):
+ # create console
+ # find all agents
+ # synchronous query for all objects by id
+ # verify known object ids are returned
+ self.notifier = _testNotifier()
+ self.console = qmf.qmfConsole.Console(notifier=self.notifier,
+ agent_timeout=3)
+ self.conn = qpid.messaging.Connection(self.broker.host,
+ self.broker.port,
+ self.broker.user,
+ self.broker.password)
+ self.conn.connect()
+ self.console.addConnection(self.conn)
+
+ for agent_app in self.agents:
+ aname = agent_app.agent.get_name()
+ agent = self.console.find_agent(aname, timeout=3)
+ self.assertTrue(agent and agent.get_name() == aname)
+
+ # console has discovered all agents, now query all undesc-2 objects
+ objs = self.console.get_objects(_object_id="undesc-2", _timeout=5)
+ self.assertTrue(len(objs) == self.agent_count)
+ for obj in objs:
+ self.assertTrue(obj.get_object_id() == "undesc-2")
+
+ # now query all objects from schema "package1"
+ objs = self.console.get_objects(_pname="package1", _timeout=5)
+ self.assertTrue(len(objs) == (self.agent_count * 3))
+ for obj in objs:
+ self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1")
+
+ # now query all objects from schema "package2"
+ objs = self.console.get_objects(_pname="package2", _timeout=5)
+ self.assertTrue(len(objs) == (self.agent_count * 2))
+ for obj in objs:
+ self.assertTrue(obj.get_schema_class_id().get_package_name() == "package2")
+
+ # now query all objects from schema "package1/class2"
+ objs = self.console.get_objects(_pname="package1", _cname="class2", _timeout=5)
+ self.assertTrue(len(objs) == self.agent_count)
+ for obj in objs:
+ self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1")
+ self.assertTrue(obj.get_schema_class_id().get_class_name() == "class2")
+
+ # given the schema identifier from the last query, repeat using the
+ # specific schema id
+ schema_id = objs[0].get_schema_class_id()
+ objs = self.console.get_objects(_schema_id=schema_id, _timeout=5)
+ self.assertTrue(len(objs) == self.agent_count)
+ for obj in objs:
+ self.assertTrue(obj.get_schema_class_id() == schema_id)
+
+
+ self.console.destroy(10)
+
+
+
+ def test_agent_subset(self):
+ # create console
+ # find all agents
+ # synchronous query for all objects by id
+ # verify known object ids are returned
+ self.notifier = _testNotifier()
+ self.console = qmf.qmfConsole.Console(notifier=self.notifier,
+ agent_timeout=3)
+ self.conn = qpid.messaging.Connection(self.broker.host,
+ self.broker.port,
+ self.broker.user,
+ self.broker.password)
+ self.conn.connect()
+ self.console.addConnection(self.conn)
+
+ agent_list = []
+ for agent_app in self.agents:
+ aname = agent_app.agent.get_name()
+ agent = self.console.find_agent(aname, timeout=3)
+ self.assertTrue(agent and agent.get_name() == aname)
+ agent_list.append(agent)
+
+ # Only use a subset of the agents:
+ agent_list = agent_list[:len(agent_list)/2]
+
+ # console has discovered all agents, now query all undesc-2 objects
+ objs = self.console.get_objects(_object_id="undesc-2",
+ _agents=agent_list, _timeout=5)
+ self.assertTrue(len(objs) == len(agent_list))
+ for obj in objs:
+ self.assertTrue(obj.get_object_id() == "undesc-2")
+
+ # now query all objects from schema "package1"
+ objs = self.console.get_objects(_pname="package1",
+ _agents=agent_list,
+ _timeout=5)
+ self.assertTrue(len(objs) == (len(agent_list) * 3))
+ for obj in objs:
+ self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1")
+
+ # now query all objects from schema "package2"
+ objs = self.console.get_objects(_pname="package2",
+ _agents=agent_list,
+ _timeout=5)
+ self.assertTrue(len(objs) == (len(agent_list) * 2))
+ for obj in objs:
+ self.assertTrue(obj.get_schema_class_id().get_package_name() == "package2")
+
+ # now query all objects from schema "package1/class2"
+ objs = self.console.get_objects(_pname="package1", _cname="class2",
+ _agents=agent_list,
+ _timeout=5)
+ self.assertTrue(len(objs) == len(agent_list))
+ for obj in objs:
+ self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1")
+ self.assertTrue(obj.get_schema_class_id().get_class_name() == "class2")
+
+ # given the schema identifier from the last query, repeat using the
+ # specific schema id
+ schema_id = objs[0].get_schema_class_id()
+ objs = self.console.get_objects(_schema_id=schema_id,
+ _agents=agent_list,
+ _timeout=5)
+ self.assertTrue(len(objs) == len(agent_list))
+ for obj in objs:
+ self.assertTrue(obj.get_schema_class_id() == schema_id)
+
+
+ self.console.destroy(10)
+
+
+
+ def test_single_agent(self):
+ # create console
+ # find all agents
+ # synchronous query for all objects by id
+ # verify known object ids are returned
+ self.notifier = _testNotifier()
+ self.console = qmf.qmfConsole.Console(notifier=self.notifier,
+ agent_timeout=3)
+ self.conn = qpid.messaging.Connection(self.broker.host,
+ self.broker.port,
+ self.broker.user,
+ self.broker.password)
+ self.conn.connect()
+ self.console.addConnection(self.conn)
+
+ agent_list = []
+ for agent_app in self.agents:
+ aname = agent_app.agent.get_name()
+ agent = self.console.find_agent(aname, timeout=3)
+ self.assertTrue(agent and agent.get_name() == aname)
+ agent_list.append(agent)
+
+ # Only use one agetn
+ agent = agent_list[0]
+
+ # console has discovered all agents, now query all undesc-2 objects
+ objs = self.console.get_objects(_object_id="undesc-2",
+ _agents=agent, _timeout=5)
+ self.assertTrue(len(objs) == 1)
+ for obj in objs:
+ self.assertTrue(obj.get_object_id() == "undesc-2")
+
+ # now query all objects from schema "package1"
+ objs = self.console.get_objects(_pname="package1",
+ _agents=agent,
+ _timeout=5)
+ self.assertTrue(len(objs) == 3)
+ for obj in objs:
+ self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1")
+
+ # now query all objects from schema "package2"
+ objs = self.console.get_objects(_pname="package2",
+ _agents=agent,
+ _timeout=5)
+ self.assertTrue(len(objs) == 2)
+ for obj in objs:
+ self.assertTrue(obj.get_schema_class_id().get_package_name() == "package2")
+
+ # now query all objects from schema "package1/class2"
+ objs = self.console.get_objects(_pname="package1", _cname="class2",
+ _agents=agent,
+ _timeout=5)
+ self.assertTrue(len(objs) == 1)
+ for obj in objs:
+ self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1")
+ self.assertTrue(obj.get_schema_class_id().get_class_name() == "class2")
+
+ # given the schema identifier from the last query, repeat using the
+ # specific schema id
+ schema_id = objs[0].get_schema_class_id()
+ objs = self.console.get_objects(_schema_id=schema_id,
+ _agents=agent,
+ _timeout=5)
+ self.assertTrue(len(objs) == 1)
+ for obj in objs:
+ self.assertTrue(obj.get_schema_class_id() == schema_id)
+
+
+ self.console.destroy(10)
+