diff options
Diffstat (limited to 'qpid/extras/qmf/src/py/qmf2-prototype/tests/agent_discovery.py')
-rw-r--r-- | qpid/extras/qmf/src/py/qmf2-prototype/tests/agent_discovery.py | 464 |
1 files changed, 464 insertions, 0 deletions
diff --git a/qpid/extras/qmf/src/py/qmf2-prototype/tests/agent_discovery.py b/qpid/extras/qmf/src/py/qmf2-prototype/tests/agent_discovery.py new file mode 100644 index 0000000000..2c20794aaa --- /dev/null +++ b/qpid/extras/qmf/src/py/qmf2-prototype/tests/agent_discovery.py @@ -0,0 +1,464 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import unittest +import logging +import time +from threading import Thread, Event + +import qpid.messaging +import qmf2.common +import qmf2.console +import qmf2.agent + + +class _testNotifier(qmf2.common.Notifier): + def __init__(self): + self._event = Event() + + def indication(self): + # note: called by qmf daemon thread + self._event.set() + + def wait_for_work(self, timeout): + # note: called by application thread to wait + # for qmf to generate work + self._event.wait(timeout) + timed_out = self._event.isSet() == False + if not timed_out: + self._event.clear() + return True + return False + + +class _agentApp(Thread): + def __init__(self, name, broker_url, heartbeat): + Thread.__init__(self) + self.timeout = 3 + self.broker_url = broker_url + self.notifier = _testNotifier() + self.agent = qmf2.agent.Agent(name, + _notifier=self.notifier, + heartbeat_interval=heartbeat) + # No database needed for this test + self.running = False + self.ready = Event() + + def start_app(self): + self.running = True + self.start() + self.ready.wait(10) + if not self.ready.is_set(): + raise Exception("Agent failed to connect to broker.") + + def stop_app(self): + self.running = False + # wake main thread + self.notifier.indication() # hmmm... collide with daemon??? + self.join(10) + if self.isAlive(): + raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!") + + def run(self): + # Connect the agent to the broker, + # broker_url = "user/passwd@hostname:port" + + conn = qpid.messaging.Connection(self.broker_url) + conn.open() + self.agent.set_connection(conn) + self.ready.set() + + while self.running: + self.notifier.wait_for_work(None) + wi = self.agent.get_next_workitem(timeout=0) + while wi is not None: + logging.error("UNEXPECTED AGENT WORKITEM RECEIVED=%s" % wi.get_type()) + self.agent.release_workitem(wi) + wi = self.agent.get_next_workitem(timeout=0) + + # done, cleanup agent + self.agent.remove_connection(self.timeout) + self.agent.destroy(self.timeout) + + +class BaseTest(unittest.TestCase): + def configure(self, config): + self.config = config + self.broker = config.broker + self.defines = self.config.defines + + def setUp(self): + # one second agent indication interval + self.agent_heartbeat = 1 + self.agent1 = _agentApp("agent1", self.broker, self.agent_heartbeat) + self.agent1.start_app() + self.agent2 = _agentApp("agent2", self.broker, self.agent_heartbeat) + self.agent2.start_app() + + def tearDown(self): + if self.agent1: + self.agent1.stop_app() + self.agent1 = None + if self.agent2: + self.agent2.stop_app() + self.agent2 = None + + def test_discover_all(self): + """ + create console + enable agent discovery + wait + expect agent add for agent1 and agent2 + """ + self.notifier = _testNotifier() + self.console = qmf2.console.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker) + self.conn.open() + self.console.add_connection(self.conn) + self.console.enable_agent_discovery() + + agent1_found = agent2_found = False + wi = self.console.get_next_workitem(timeout=3) + while wi and not (agent1_found and agent2_found): + if wi.get_type() == wi.AGENT_ADDED: + agent = wi.get_params().get("agent") + if not agent or not isinstance(agent, qmf2.console.Agent): + self.fail("Unexpected workitem from agent") + else: + if agent.get_name() == "agent1": + agent1_found = True + elif agent.get_name() == "agent2": + agent2_found = True + else: + self.fail("Unexpected agent name received: %s" % + agent.get_name()) + if agent1_found and agent2_found: + break; + + wi = self.console.get_next_workitem(timeout=3) + + self.assertTrue(agent1_found and agent2_found, "All agents not discovered") + + self.console.destroy(10) + + + def test_discover_one(self): + """ + create console + enable agent discovery, filter for agent1 only + wait until timeout + expect agent add for agent1 only + """ + self.notifier = _testNotifier() + self.console = qmf2.console.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker) + self.conn.open() + self.console.add_connection(self.conn) + + query = qmf2.common.QmfQuery.create_predicate( + qmf2.common.QmfQuery.TARGET_AGENT, + [qmf2.common.QmfQuery.EQ, qmf2.common.QmfQuery.KEY_AGENT_NAME, + [qmf2.common.QmfQuery.QUOTE, "agent1"]]) + self.console.enable_agent_discovery(query) + + agent1_found = agent2_found = False + wi = self.console.get_next_workitem(timeout=3) + while wi: + if wi.get_type() == wi.AGENT_ADDED: + agent = wi.get_params().get("agent") + if not agent or not isinstance(agent, qmf2.console.Agent): + self.fail("Unexpected workitem from agent") + else: + if agent.get_name() == "agent1": + agent1_found = True + elif agent.get_name() == "agent2": + agent2_found = True + else: + self.fail("Unexpected agent name received: %s" % + agent.get_name()) + + wi = self.console.get_next_workitem(timeout=2) + + self.assertTrue(agent1_found and not agent2_found, "Unexpected agent discovered") + + self.console.destroy(10) + + + def test_heartbeat(self): + """ + create console with 2 sec agent timeout + enable agent discovery, find all agents + stop agent1, expect timeout notification + stop agent2, expect timeout notification + """ + self.notifier = _testNotifier() + self.console = qmf2.console.Console(notifier=self.notifier, + agent_timeout=2) + self.conn = qpid.messaging.Connection(self.broker) + self.conn.open() + self.console.add_connection(self.conn) + self.console.enable_agent_discovery() + + agent1_found = agent2_found = False + wi = self.console.get_next_workitem(timeout=4) + while wi and not (agent1_found and agent2_found): + if wi.get_type() == wi.AGENT_ADDED: + agent = wi.get_params().get("agent") + if not agent or not isinstance(agent, qmf2.console.Agent): + self.fail("Unexpected workitem from agent") + else: + if agent.get_name() == "agent1": + agent1_found = True + elif agent.get_name() == "agent2": + agent2_found = True + else: + self.fail("Unexpected agent name received: %s" % + agent.get_name()) + if agent1_found and agent2_found: + break; + + wi = self.console.get_next_workitem(timeout=4) + + self.assertTrue(agent1_found and agent2_found, "All agents not discovered") + + # now kill agent1 and wait for expiration + + agent1 = self.agent1 + self.agent1 = None + agent1.stop_app() + + wi = self.console.get_next_workitem(timeout=4) + while wi is not None: + if wi.get_type() == wi.AGENT_DELETED: + agent = wi.get_params().get("agent") + if not agent or not isinstance(agent, qmf2.console.Agent): + self.fail("Unexpected workitem from agent") + else: + if agent.get_name() == "agent1": + agent1_found = False + else: + self.fail("Unexpected agent_deleted received: %s" % + agent.get_name()) + if not agent1_found: + break; + + wi = self.console.get_next_workitem(timeout=4) + + self.assertFalse(agent1_found, "agent1 did not delete!") + + # now kill agent2 and wait for expiration + + agent2 = self.agent2 + self.agent2 = None + agent2.stop_app() + + wi = self.console.get_next_workitem(timeout=4) + while wi is not None: + if wi.get_type() == wi.AGENT_DELETED: + agent = wi.get_params().get("agent") + if not agent or not isinstance(agent, qmf2.console.Agent): + self.fail("Unexpected workitem from agent") + else: + if agent.get_name() == "agent2": + agent2_found = False + else: + self.fail("Unexpected agent_deleted received: %s" % + agent.get_name()) + if not agent2_found: + break; + + wi = self.console.get_next_workitem(timeout=4) + + self.assertFalse(agent2_found, "agent2 did not delete!") + + self.console.destroy(10) + + + def test_find_agent(self): + """ + create console + do not enable agent discovery + find agent1, expect success + find agent-none, expect failure + find agent2, expect success + """ + self.notifier = _testNotifier() + self.console = qmf2.console.Console(notifier=self.notifier) + self.conn = qpid.messaging.Connection(self.broker) + self.conn.open() + self.console.add_connection(self.conn) + + agent1 = self.console.find_agent("agent1", timeout=3) + self.assertTrue(agent1 and agent1.get_name() == "agent1") + + no_agent = self.console.find_agent("agent-none", timeout=3) + self.assertTrue(no_agent == None) + + agent2 = self.console.find_agent("agent2", timeout=3) + self.assertTrue(agent2 and agent2.get_name() == "agent2") + + self.console.remove_connection(self.conn, 10) + self.console.destroy(10) + + + def test_heartbeat_x2(self): + """ + create 2 consoles with 2 sec agent timeout + enable agent discovery, find all agents + stop agent1, expect timeout notification on both consoles + stop agent2, expect timeout notification on both consoles + """ + console_count = 2 + self.consoles = [] + for i in range(console_count): + console = qmf2.console.Console("test-console-" + str(i), + notifier=_testNotifier(), + agent_timeout=2) + conn = qpid.messaging.Connection(self.broker) + conn.open() + console.add_connection(conn) + console.enable_agent_discovery() + self.consoles.append(console) + + # now wait for all consoles to discover all agents, + # agents send a heartbeat once a second + for console in self.consoles: + agent1_found = agent2_found = False + wi = console.get_next_workitem(timeout=2) + while wi and not (agent1_found and agent2_found): + if wi.get_type() == wi.AGENT_ADDED: + agent = wi.get_params().get("agent") + if not agent or not isinstance(agent, qmf2.console.Agent): + self.fail("Unexpected workitem from agent") + else: + if agent.get_name() == "agent1": + agent1_found = True + elif agent.get_name() == "agent2": + agent2_found = True + else: + self.fail("Unexpected agent name received: %s" % + agent.get_name()) + if agent1_found and agent2_found: + break; + wi = console.get_next_workitem(timeout=2) + + self.assertTrue(agent1_found and agent2_found, "All agents not discovered") + + # now kill agent1 and wait for expiration + + agent1 = self.agent1 + self.agent1 = None + agent1.stop_app() + + for console in self.consoles: + agent1_found = True + wi = console.get_next_workitem(timeout=4) + while wi is not None: + if wi.get_type() == wi.AGENT_DELETED: + agent = wi.get_params().get("agent") + if not agent or not isinstance(agent, qmf2.console.Agent): + self.fail("Unexpected workitem from agent") + else: + if agent.get_name() == "agent1": + agent1_found = False + break + else: + self.fail("Unexpected agent_deleted received: %s" % + agent.get_name()) + + wi = console.get_next_workitem(timeout=4) + + self.assertFalse(agent1_found, "agent1 did not delete!") + + # now kill agent2 and wait for expiration + + agent2 = self.agent2 + self.agent2 = None + agent2.stop_app() + + for console in self.consoles: + agent2_found = True + wi = console.get_next_workitem(timeout=4) + while wi is not None: + if wi.get_type() == wi.AGENT_DELETED: + agent = wi.get_params().get("agent") + if not agent or not isinstance(agent, qmf2.console.Agent): + self.fail("Unexpected workitem from agent") + else: + if agent.get_name() == "agent2": + agent2_found = False + break + else: + self.fail("Unexpected agent_deleted received: %s" % + agent.get_name()) + + wi = console.get_next_workitem(timeout=4) + + self.assertFalse(agent2_found, "agent2 did not delete!") + + + for console in self.consoles: + console.destroy(10) + + + def test_find_agent_x2(self): + """ + create 2 consoles, do not enable agent discovery + console-1: find agent1, expect success + console-2: find agent2, expect success + Verify console-1 does -not- know agent2 + Verify console-2 does -not- know agent1 + """ + console_count = 2 + self.consoles = [] + for i in range(console_count): + console = qmf2.console.Console("test-console-" + str(i), + notifier=_testNotifier(), + agent_timeout=2) + conn = qpid.messaging.Connection(self.broker) + conn.open() + console.add_connection(conn) + self.consoles.append(console) + + agent1 = self.consoles[0].find_agent("agent1", timeout=3) + self.assertTrue(agent1 and agent1.get_name() == "agent1") + + agent2 = self.consoles[1].find_agent("agent2", timeout=3) + self.assertTrue(agent2 and agent2.get_name() == "agent2") + + # wait long enough for agent heartbeats to be sent... + + time.sleep(self.agent_heartbeat * 2) + + agents = self.consoles[0].get_agents() + self.assertTrue(len(agents) == 1 and agents[0].get_name() == "agent1") + agent1 = self.consoles[0].get_agent("agent1") + self.assertTrue(agent1 and agent1.get_name() == "agent1") + + + agents = self.consoles[1].get_agents() + self.assertTrue(len(agents) == 1 and agents[0].get_name() == "agent2") + agent2 = self.consoles[1].get_agent("agent2") + self.assertTrue(agent2 and agent2.get_name() == "agent2") + + # verify no new agents were learned + + for console in self.consoles: + console.destroy(10) + |