summaryrefslogtreecommitdiff
path: root/qpid/extras/qmf/src/py/qmf2-prototype/tests/events.py
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/extras/qmf/src/py/qmf2-prototype/tests/events.py')
-rw-r--r--qpid/extras/qmf/src/py/qmf2-prototype/tests/events.py202
1 files changed, 202 insertions, 0 deletions
diff --git a/qpid/extras/qmf/src/py/qmf2-prototype/tests/events.py b/qpid/extras/qmf/src/py/qmf2-prototype/tests/events.py
new file mode 100644
index 0000000000..624c9b3823
--- /dev/null
+++ b/qpid/extras/qmf/src/py/qmf2-prototype/tests/events.py
@@ -0,0 +1,202 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import unittest
+import time
+import datetime
+import logging
+from threading import Thread, Event
+
+import qpid.messaging
+from qpid.harness import Skipped
+from qmf2.common import (Notifier, SchemaObjectClass, SchemaClassId,
+ SchemaProperty, qmfTypes, SchemaMethod, QmfQuery,
+ QmfData, SchemaEventClass,
+ QmfEvent)
+import qmf2.console
+from qmf2.agent import(QmfAgentData, Agent)
+
+
+class _testNotifier(Notifier):
+ def __init__(self):
+ self._event = Event()
+
+ def indication(self):
+ # note: called by qmf daemon thread
+ self._event.set()
+
+ def wait_for_work(self, timeout):
+ # note: called by application thread to wait
+ # for qmf to generate work
+ self._event.wait(timeout)
+ timed_out = self._event.isSet() == False
+ if not timed_out:
+ self._event.clear()
+ return True
+ return False
+
+
+class _agentApp(Thread):
+ def __init__(self, name, broker_url, heartbeat):
+ Thread.__init__(self)
+ self.timeout = 3
+ self.broker_url = broker_url
+ self.notifier = _testNotifier()
+ self.agent = Agent(name,
+ _notifier=self.notifier,
+ heartbeat_interval=heartbeat)
+
+ # Dynamically construct a management database
+
+ _schema = SchemaEventClass(_classId=SchemaClassId("MyPackage",
+ "MyClass",
+ stype=SchemaClassId.TYPE_EVENT),
+ _desc="A test event schema")
+ # add properties
+ _schema.add_property( "prop-1", SchemaProperty(qmfTypes.TYPE_UINT8))
+ _schema.add_property( "prop-2", SchemaProperty(qmfTypes.TYPE_LSTR))
+
+ # Add schema to Agent
+ self.schema = _schema
+ self.agent.register_object_class(_schema)
+
+ self.running = False
+ self.ready = Event()
+
+ def start_app(self):
+ self.running = True
+ self.start()
+ self.ready.wait(10)
+ if not self.ready.is_set():
+ raise Exception("Agent failed to connect to broker.")
+ # time.sleep(1)
+
+ def stop_app(self):
+ self.running = False
+ # wake main thread
+ self.notifier.indication() # hmmm... collide with daemon???
+ self.join(self.timeout)
+ if self.isAlive():
+ raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!")
+
+ def run(self):
+ # broker_url = "user/passwd@hostname:port"
+ conn = qpid.messaging.Connection(self.broker_url)
+ try:
+ conn.open()
+ except qpid.messaging.ConnectError, e:
+ raise Skipped(e)
+
+ self.agent.set_connection(conn)
+ self.ready.set()
+
+ counter = 1
+ while self.running:
+ # post an event every second
+ event = QmfEvent.create(long(time.time() * 1000),
+ QmfEvent.SEV_WARNING,
+ {"prop-1": counter,
+ "prop-2": str(datetime.datetime.utcnow())},
+ _schema_id=self.schema.get_class_id())
+ counter += 1
+ self.agent.raise_event(event)
+ wi = self.agent.get_next_workitem(timeout=0)
+ while wi is not None:
+ logging.error("UNEXPECTED AGENT WORKITEM RECEIVED=%s" % wi.get_type())
+ self.agent.release_workitem(wi)
+ wi = self.agent.get_next_workitem(timeout=0)
+ self.notifier.wait_for_work(1)
+
+ self.agent.remove_connection(self.timeout)
+ self.agent.destroy(self.timeout)
+
+
+
+class BaseTest(unittest.TestCase):
+ def configure(self, config):
+ self.config = config
+ self.broker = config.broker
+ self.defines = self.config.defines
+
+ def setUp(self):
+ # one second agent indication interval
+ self.agent1 = _agentApp("agent1", self.broker, 1)
+ self.agent1.start_app()
+ self.agent2 = _agentApp("agent2", self.broker, 1)
+ self.agent2.start_app()
+
+ def tearDown(self):
+ if self.agent1:
+ self.agent1.stop_app()
+ self.agent1 = None
+ if self.agent2:
+ self.agent2.stop_app()
+ self.agent2 = None
+
+ def test_get_events(self):
+ # create console
+ # find agents
+
+ self.notifier = _testNotifier()
+ self.console = qmf2.console.Console(notifier=self.notifier,
+ agent_timeout=3)
+ self.conn = qpid.messaging.Connection(self.broker)
+ try:
+ self.conn.open()
+ except qpid.messaging.ConnectError, e:
+ raise Skipped(e)
+
+ self.console.add_connection(self.conn)
+
+ # find the agents
+ for aname in ["agent1", "agent2"]:
+ agent = self.console.find_agent(aname, timeout=3)
+ self.assertTrue(agent and agent.get_name() == aname)
+
+ # now wait for events
+ agent1_events = agent2_events = 0
+ wi = self.console.get_next_workitem(timeout=4)
+ while wi:
+ if wi.get_type() == wi.EVENT_RECEIVED:
+ event = wi.get_params().get("event")
+ self.assertTrue(isinstance(event, QmfEvent))
+ self.assertTrue(event.get_severity() == QmfEvent.SEV_WARNING)
+ self.assertTrue(event.get_value("prop-1") > 0)
+
+ agent = wi.get_params().get("agent")
+ if not agent or not isinstance(agent, qmf2.console.Agent):
+ self.fail("Unexpected workitem from agent")
+ else:
+ if agent.get_name() == "agent1":
+ agent1_events += 1
+ elif agent.get_name() == "agent2":
+ agent2_events += 1
+ else:
+ self.fail("Unexpected agent name received: %s" %
+ agent.get_name())
+ if agent1_events and agent2_events:
+ break;
+
+ wi = self.console.get_next_workitem(timeout=4)
+
+ self.assertTrue(agent1_events > 0 and agent2_events > 0)
+
+ self.console.destroy(10)
+
+
+
+