diff options
Diffstat (limited to 'qpid/python/qmf2/tests/agent_discovery.py')
-rw-r--r-- | qpid/python/qmf2/tests/agent_discovery.py | 320 |
1 files changed, 320 insertions, 0 deletions
diff --git a/qpid/python/qmf2/tests/agent_discovery.py b/qpid/python/qmf2/tests/agent_discovery.py new file mode 100644 index 0000000000..19ed79cbc2 --- /dev/null +++ b/qpid/python/qmf2/tests/agent_discovery.py @@ -0,0 +1,320 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import unittest +import logging +from threading import Thread, Event + +import qpid.messaging +import qmf2.common +import qmf2.console +import qmf2.agent + + +class _testNotifier(qmf.qmfCommon.Notifier): + def __init__(self): + self._event = Event() + + def indication(self): + # note: called by qmf daemon thread + self._event.set() + + def wait_for_work(self, timeout): + # note: called by application thread to wait + # for qmf to generate work + self._event.wait(timeout) + timed_out = self._event.isSet() == False + if not timed_out: + self._event.clear() + return True + return False + + +class _agentApp(Thread): + def __init__(self, name, heartbeat): + Thread.__init__(self) + self.notifier = _testNotifier() + self.agent = qmf.qmfAgent.Agent(name, + _notifier=self.notifier, + _heartbeat_interval=heartbeat) + # No database needed for this test + self.running = True + self.start() + + def connect_agent(self, broker_url): + # broker_url = "user/passwd@hostname:port" + self.conn = qpid.messaging.Connection(broker_url.host, + broker_url.port, + broker_url.user, + broker_url.password) + self.conn.connect() + self.agent.set_connection(self.conn) + + def disconnect_agent(self, timeout): + if self.conn: + self.agent.remove_connection(timeout) + + def shutdown_agent(self, timeout): + self.agent.destroy(timeout) + + def stop(self): + self.running = False + self.notifier.indication() # hmmm... collide with daemon??? + self.join(10) + if self.isAlive(): + logging.error("AGENT DID NOT TERMINATE AS EXPECTED!!!") + + def run(self): + while self.running: + self.notifier.wait_for_work(None) + wi = self.agent.get_next_workitem(timeout=0) + while wi is not None: + logging.error("UNEXPECTED AGENT WORKITEM RECEIVED=%s" % wi.get_type()) + self.agent.release_workitem(wi) + wi = self.agent.get_next_workitem(timeout=0) + + + +class BaseTest(unittest.TestCase): + def configure(self, config): + self.config = config + self.broker = config.broker + self.defines = self.config.defines + + def setUp(self): + # one second agent indication interval + self.agent1 = _agentApp("agent1", 1) + self.agent1.connect_agent(self.broker) + self.agent2 = _agentApp("agent2", 1) + self.agent2.connect_agent(self.broker) + + def tearDown(self): + if self.agent1: + self.agent1.shutdown_agent(10) + self.agent1.stop() + self.agent1 = None + if self.agent2: + self.agent2.shutdown_agent(10) + self.agent2.stop() + self.agent2 = None + + def test_discover_all(self): + # create console + # enable agent discovery + # wait + # expect agent add for agent1 and agent2 + self.notifier = _testNotifier() + self.console = qmf.qmfConsole.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.addConnection(self.conn) + self.console.enable_agent_discovery() + + agent1_found = agent2_found = False + wi = self.console.get_next_workitem(timeout=3) + while wi and not (agent1_found and agent2_found): + if wi.get_type() == wi.AGENT_ADDED: + agent = wi.get_params().get("agent") + if not agent or not isinstance(agent, qmf.qmfConsole.Agent): + self.fail("Unexpected workitem from agent") + else: + if agent.get_name() == "agent1": + agent1_found = True + elif agent.get_name() == "agent2": + agent2_found = True + else: + self.fail("Unexpected agent name received: %s" % + agent.get_name()) + if agent1_found and agent2_found: + break; + + wi = self.console.get_next_workitem(timeout=3) + + self.assertTrue(agent1_found and agent2_found, "All agents not discovered") + + self.console.destroy(10) + + + def test_discover_one(self): + # create console + # enable agent discovery, filter for agent1 only + # wait until timeout + # expect agent add for agent1 only + self.notifier = _testNotifier() + self.console = qmf.qmfConsole.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.addConnection(self.conn) + + query = qmf.qmfCommon.QmfQuery.create_predicate( + qmf.qmfCommon.QmfQuery.TARGET_AGENT, + qmf.qmfCommon.QmfQueryPredicate({qmf.qmfCommon.QmfQuery.CMP_EQ: + [qmf.qmfCommon.QmfQuery.KEY_AGENT_NAME, "agent1"]})) + self.console.enable_agent_discovery(query) + + agent1_found = agent2_found = False + wi = self.console.get_next_workitem(timeout=3) + while wi: + if wi.get_type() == wi.AGENT_ADDED: + agent = wi.get_params().get("agent") + if not agent or not isinstance(agent, qmf.qmfConsole.Agent): + self.fail("Unexpected workitem from agent") + else: + if agent.get_name() == "agent1": + agent1_found = True + elif agent.get_name() == "agent2": + agent2_found = True + else: + self.fail("Unexpected agent name received: %s" % + agent.get_name()) + + wi = self.console.get_next_workitem(timeout=2) + + self.assertTrue(agent1_found and not agent2_found, "Unexpected agent discovered") + + self.console.destroy(10) + + + def test_heartbeat(self): + # create console with 2 sec agent timeout + # enable agent discovery, find all agents + # stop agent1, expect timeout notification + # stop agent2, expect timeout notification + self.notifier = _testNotifier() + self.console = qmf.qmfConsole.Console(notifier=self.notifier, + agent_timeout=2) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.addConnection(self.conn) + self.console.enable_agent_discovery() + + agent1_found = agent2_found = False + wi = self.console.get_next_workitem(timeout=4) + while wi and not (agent1_found and agent2_found): + if wi.get_type() == wi.AGENT_ADDED: + agent = wi.get_params().get("agent") + if not agent or not isinstance(agent, qmf.qmfConsole.Agent): + self.fail("Unexpected workitem from agent") + else: + if agent.get_name() == "agent1": + agent1_found = True + elif agent.get_name() == "agent2": + agent2_found = True + else: + self.fail("Unexpected agent name received: %s" % + agent.get_name()) + if agent1_found and agent2_found: + break; + + wi = self.console.get_next_workitem(timeout=4) + + self.assertTrue(agent1_found and agent2_found, "All agents not discovered") + + # now kill agent1 and wait for expiration + + agent1 = self.agent1 + self.agent1 = None + agent1.shutdown_agent(10) + agent1.stop() + + wi = self.console.get_next_workitem(timeout=4) + while wi is not None: + if wi.get_type() == wi.AGENT_DELETED: + agent = wi.get_params().get("agent") + if not agent or not isinstance(agent, qmf.qmfConsole.Agent): + self.fail("Unexpected workitem from agent") + else: + if agent.get_name() == "agent1": + agent1_found = False + else: + self.fail("Unexpected agent_deleted received: %s" % + agent.get_name()) + if not agent1_found: + break; + + wi = self.console.get_next_workitem(timeout=4) + + self.assertFalse(agent1_found, "agent1 did not delete!") + + # now kill agent2 and wait for expiration + + agent2 = self.agent2 + self.agent2 = None + agent2.shutdown_agent(10) + agent2.stop() + + wi = self.console.get_next_workitem(timeout=4) + while wi is not None: + if wi.get_type() == wi.AGENT_DELETED: + agent = wi.get_params().get("agent") + if not agent or not isinstance(agent, qmf.qmfConsole.Agent): + self.fail("Unexpected workitem from agent") + else: + if agent.get_name() == "agent2": + agent2_found = False + else: + self.fail("Unexpected agent_deleted received: %s" % + agent.get_name()) + if not agent2_found: + break; + + wi = self.console.get_next_workitem(timeout=4) + + self.assertFalse(agent2_found, "agent2 did not delete!") + + self.console.destroy(10) + + + def test_find_agent(self): + # create console + # do not enable agent discovery + # find agent1, expect success + # find agent-none, expect failure + # find agent2, expect success + self.notifier = _testNotifier() + self.console = qmf.qmfConsole.Console(notifier=self.notifier) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.addConnection(self.conn) + + agent1 = self.console.find_agent("agent1", timeout=3) + self.assertTrue(agent1 and agent1.get_name() == "agent1") + + no_agent = self.console.find_agent("agent-none", timeout=3) + self.assertTrue(no_agent == None) + + agent2 = self.console.find_agent("agent2", timeout=3) + self.assertTrue(agent2 and agent2.get_name() == "agent2") + + self.console.removeConnection(self.conn, 10) + self.console.destroy(10) + + |