diff options
Diffstat (limited to 'qpid/extras/qmf/src/py/qmf2-prototype/tests/events.py')
-rw-r--r-- | qpid/extras/qmf/src/py/qmf2-prototype/tests/events.py | 202 |
1 files changed, 202 insertions, 0 deletions
diff --git a/qpid/extras/qmf/src/py/qmf2-prototype/tests/events.py b/qpid/extras/qmf/src/py/qmf2-prototype/tests/events.py new file mode 100644 index 0000000000..624c9b3823 --- /dev/null +++ b/qpid/extras/qmf/src/py/qmf2-prototype/tests/events.py @@ -0,0 +1,202 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import unittest +import time +import datetime +import logging +from threading import Thread, Event + +import qpid.messaging +from qpid.harness import Skipped +from qmf2.common import (Notifier, SchemaObjectClass, SchemaClassId, + SchemaProperty, qmfTypes, SchemaMethod, QmfQuery, + QmfData, SchemaEventClass, + QmfEvent) +import qmf2.console +from qmf2.agent import(QmfAgentData, Agent) + + +class _testNotifier(Notifier): + def __init__(self): + self._event = Event() + + def indication(self): + # note: called by qmf daemon thread + self._event.set() + + def wait_for_work(self, timeout): + # note: called by application thread to wait + # for qmf to generate work + self._event.wait(timeout) + timed_out = self._event.isSet() == False + if not timed_out: + self._event.clear() + return True + return False + + +class _agentApp(Thread): + def __init__(self, name, broker_url, heartbeat): + Thread.__init__(self) + self.timeout = 3 + self.broker_url = broker_url + self.notifier = _testNotifier() + self.agent = Agent(name, + _notifier=self.notifier, + heartbeat_interval=heartbeat) + + # Dynamically construct a management database + + _schema = SchemaEventClass(_classId=SchemaClassId("MyPackage", + "MyClass", + stype=SchemaClassId.TYPE_EVENT), + _desc="A test event schema") + # add properties + _schema.add_property( "prop-1", SchemaProperty(qmfTypes.TYPE_UINT8)) + _schema.add_property( "prop-2", SchemaProperty(qmfTypes.TYPE_LSTR)) + + # Add schema to Agent + self.schema = _schema + self.agent.register_object_class(_schema) + + self.running = False + self.ready = Event() + + def start_app(self): + self.running = True + self.start() + self.ready.wait(10) + if not self.ready.is_set(): + raise Exception("Agent failed to connect to broker.") + # time.sleep(1) + + def stop_app(self): + self.running = False + # wake main thread + self.notifier.indication() # hmmm... collide with daemon??? + self.join(self.timeout) + if self.isAlive(): + raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!") + + def run(self): + # broker_url = "user/passwd@hostname:port" + conn = qpid.messaging.Connection(self.broker_url) + try: + conn.open() + except qpid.messaging.ConnectError, e: + raise Skipped(e) + + self.agent.set_connection(conn) + self.ready.set() + + counter = 1 + while self.running: + # post an event every second + event = QmfEvent.create(long(time.time() * 1000), + QmfEvent.SEV_WARNING, + {"prop-1": counter, + "prop-2": str(datetime.datetime.utcnow())}, + _schema_id=self.schema.get_class_id()) + counter += 1 + self.agent.raise_event(event) + wi = self.agent.get_next_workitem(timeout=0) + while wi is not None: + logging.error("UNEXPECTED AGENT WORKITEM RECEIVED=%s" % wi.get_type()) + self.agent.release_workitem(wi) + wi = self.agent.get_next_workitem(timeout=0) + self.notifier.wait_for_work(1) + + self.agent.remove_connection(self.timeout) + self.agent.destroy(self.timeout) + + + +class BaseTest(unittest.TestCase): + def configure(self, config): + self.config = config + self.broker = config.broker + self.defines = self.config.defines + + def setUp(self): + # one second agent indication interval + self.agent1 = _agentApp("agent1", self.broker, 1) + self.agent1.start_app() + self.agent2 = _agentApp("agent2", self.broker, 1) + self.agent2.start_app() + + def tearDown(self): + if self.agent1: + self.agent1.stop_app() + self.agent1 = None + if self.agent2: + self.agent2.stop_app() + self.agent2 = None + + def test_get_events(self): + # create console + # find agents + + self.notifier = _testNotifier() + self.console = qmf2.console.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker) + try: + self.conn.open() + except qpid.messaging.ConnectError, e: + raise Skipped(e) + + self.console.add_connection(self.conn) + + # find the agents + for aname in ["agent1", "agent2"]: + agent = self.console.find_agent(aname, timeout=3) + self.assertTrue(agent and agent.get_name() == aname) + + # now wait for events + agent1_events = agent2_events = 0 + wi = self.console.get_next_workitem(timeout=4) + while wi: + if wi.get_type() == wi.EVENT_RECEIVED: + event = wi.get_params().get("event") + self.assertTrue(isinstance(event, QmfEvent)) + self.assertTrue(event.get_severity() == QmfEvent.SEV_WARNING) + self.assertTrue(event.get_value("prop-1") > 0) + + agent = wi.get_params().get("agent") + if not agent or not isinstance(agent, qmf2.console.Agent): + self.fail("Unexpected workitem from agent") + else: + if agent.get_name() == "agent1": + agent1_events += 1 + elif agent.get_name() == "agent2": + agent2_events += 1 + else: + self.fail("Unexpected agent name received: %s" % + agent.get_name()) + if agent1_events and agent2_events: + break; + + wi = self.console.get_next_workitem(timeout=4) + + self.assertTrue(agent1_events > 0 and agent2_events > 0) + + self.console.destroy(10) + + + + |