diff options
Diffstat (limited to 'extras/qmf/src/py/qmf2-prototype/tests/agent_discovery.py')
-rw-r--r-- | extras/qmf/src/py/qmf2-prototype/tests/agent_discovery.py | 464 |
1 files changed, 0 insertions, 464 deletions
diff --git a/extras/qmf/src/py/qmf2-prototype/tests/agent_discovery.py b/extras/qmf/src/py/qmf2-prototype/tests/agent_discovery.py deleted file mode 100644 index 2c20794aaa..0000000000 --- a/extras/qmf/src/py/qmf2-prototype/tests/agent_discovery.py +++ /dev/null @@ -1,464 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -import unittest -import logging -import time -from threading import Thread, Event - -import qpid.messaging -import qmf2.common -import qmf2.console -import qmf2.agent - - -class _testNotifier(qmf2.common.Notifier): - def __init__(self): - self._event = Event() - - def indication(self): - # note: called by qmf daemon thread - self._event.set() - - def wait_for_work(self, timeout): - # note: called by application thread to wait - # for qmf to generate work - self._event.wait(timeout) - timed_out = self._event.isSet() == False - if not timed_out: - self._event.clear() - return True - return False - - -class _agentApp(Thread): - def __init__(self, name, broker_url, heartbeat): - Thread.__init__(self) - self.timeout = 3 - self.broker_url = broker_url - self.notifier = _testNotifier() - self.agent = qmf2.agent.Agent(name, - _notifier=self.notifier, - heartbeat_interval=heartbeat) - # No database needed for this test - self.running = False - self.ready = Event() - - def start_app(self): - self.running = True - self.start() - self.ready.wait(10) - if not self.ready.is_set(): - raise Exception("Agent failed to connect to broker.") - - def stop_app(self): - self.running = False - # wake main thread - self.notifier.indication() # hmmm... collide with daemon??? - self.join(10) - if self.isAlive(): - raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!") - - def run(self): - # Connect the agent to the broker, - # broker_url = "user/passwd@hostname:port" - - conn = qpid.messaging.Connection(self.broker_url) - conn.open() - self.agent.set_connection(conn) - self.ready.set() - - while self.running: - self.notifier.wait_for_work(None) - wi = self.agent.get_next_workitem(timeout=0) - while wi is not None: - logging.error("UNEXPECTED AGENT WORKITEM RECEIVED=%s" % wi.get_type()) - self.agent.release_workitem(wi) - wi = self.agent.get_next_workitem(timeout=0) - - # done, cleanup agent - self.agent.remove_connection(self.timeout) - self.agent.destroy(self.timeout) - - -class BaseTest(unittest.TestCase): - def configure(self, config): - self.config = config - self.broker = config.broker - self.defines = self.config.defines - - def setUp(self): - # one second agent indication interval - self.agent_heartbeat = 1 - self.agent1 = _agentApp("agent1", self.broker, self.agent_heartbeat) - self.agent1.start_app() - self.agent2 = _agentApp("agent2", self.broker, self.agent_heartbeat) - self.agent2.start_app() - - def tearDown(self): - if self.agent1: - self.agent1.stop_app() - self.agent1 = None - if self.agent2: - self.agent2.stop_app() - self.agent2 = None - - def test_discover_all(self): - """ - create console - enable agent discovery - wait - expect agent add for agent1 and agent2 - """ - self.notifier = _testNotifier() - self.console = qmf2.console.Console(notifier=self.notifier, - agent_timeout=3) - self.conn = qpid.messaging.Connection(self.broker) - self.conn.open() - self.console.add_connection(self.conn) - self.console.enable_agent_discovery() - - agent1_found = agent2_found = False - wi = self.console.get_next_workitem(timeout=3) - while wi and not (agent1_found and agent2_found): - if wi.get_type() == wi.AGENT_ADDED: - agent = wi.get_params().get("agent") - if not agent or not isinstance(agent, qmf2.console.Agent): - self.fail("Unexpected workitem from agent") - else: - if agent.get_name() == "agent1": - agent1_found = True - elif agent.get_name() == "agent2": - agent2_found = True - else: - self.fail("Unexpected agent name received: %s" % - agent.get_name()) - if agent1_found and agent2_found: - break; - - wi = self.console.get_next_workitem(timeout=3) - - self.assertTrue(agent1_found and agent2_found, "All agents not discovered") - - self.console.destroy(10) - - - def test_discover_one(self): - """ - create console - enable agent discovery, filter for agent1 only - wait until timeout - expect agent add for agent1 only - """ - self.notifier = _testNotifier() - self.console = qmf2.console.Console(notifier=self.notifier, - agent_timeout=3) - self.conn = qpid.messaging.Connection(self.broker) - self.conn.open() - self.console.add_connection(self.conn) - - query = qmf2.common.QmfQuery.create_predicate( - qmf2.common.QmfQuery.TARGET_AGENT, - [qmf2.common.QmfQuery.EQ, qmf2.common.QmfQuery.KEY_AGENT_NAME, - [qmf2.common.QmfQuery.QUOTE, "agent1"]]) - self.console.enable_agent_discovery(query) - - agent1_found = agent2_found = False - wi = self.console.get_next_workitem(timeout=3) - while wi: - if wi.get_type() == wi.AGENT_ADDED: - agent = wi.get_params().get("agent") - if not agent or not isinstance(agent, qmf2.console.Agent): - self.fail("Unexpected workitem from agent") - else: - if agent.get_name() == "agent1": - agent1_found = True - elif agent.get_name() == "agent2": - agent2_found = True - else: - self.fail("Unexpected agent name received: %s" % - agent.get_name()) - - wi = self.console.get_next_workitem(timeout=2) - - self.assertTrue(agent1_found and not agent2_found, "Unexpected agent discovered") - - self.console.destroy(10) - - - def test_heartbeat(self): - """ - create console with 2 sec agent timeout - enable agent discovery, find all agents - stop agent1, expect timeout notification - stop agent2, expect timeout notification - """ - self.notifier = _testNotifier() - self.console = qmf2.console.Console(notifier=self.notifier, - agent_timeout=2) - self.conn = qpid.messaging.Connection(self.broker) - self.conn.open() - self.console.add_connection(self.conn) - self.console.enable_agent_discovery() - - agent1_found = agent2_found = False - wi = self.console.get_next_workitem(timeout=4) - while wi and not (agent1_found and agent2_found): - if wi.get_type() == wi.AGENT_ADDED: - agent = wi.get_params().get("agent") - if not agent or not isinstance(agent, qmf2.console.Agent): - self.fail("Unexpected workitem from agent") - else: - if agent.get_name() == "agent1": - agent1_found = True - elif agent.get_name() == "agent2": - agent2_found = True - else: - self.fail("Unexpected agent name received: %s" % - agent.get_name()) - if agent1_found and agent2_found: - break; - - wi = self.console.get_next_workitem(timeout=4) - - self.assertTrue(agent1_found and agent2_found, "All agents not discovered") - - # now kill agent1 and wait for expiration - - agent1 = self.agent1 - self.agent1 = None - agent1.stop_app() - - wi = self.console.get_next_workitem(timeout=4) - while wi is not None: - if wi.get_type() == wi.AGENT_DELETED: - agent = wi.get_params().get("agent") - if not agent or not isinstance(agent, qmf2.console.Agent): - self.fail("Unexpected workitem from agent") - else: - if agent.get_name() == "agent1": - agent1_found = False - else: - self.fail("Unexpected agent_deleted received: %s" % - agent.get_name()) - if not agent1_found: - break; - - wi = self.console.get_next_workitem(timeout=4) - - self.assertFalse(agent1_found, "agent1 did not delete!") - - # now kill agent2 and wait for expiration - - agent2 = self.agent2 - self.agent2 = None - agent2.stop_app() - - wi = self.console.get_next_workitem(timeout=4) - while wi is not None: - if wi.get_type() == wi.AGENT_DELETED: - agent = wi.get_params().get("agent") - if not agent or not isinstance(agent, qmf2.console.Agent): - self.fail("Unexpected workitem from agent") - else: - if agent.get_name() == "agent2": - agent2_found = False - else: - self.fail("Unexpected agent_deleted received: %s" % - agent.get_name()) - if not agent2_found: - break; - - wi = self.console.get_next_workitem(timeout=4) - - self.assertFalse(agent2_found, "agent2 did not delete!") - - self.console.destroy(10) - - - def test_find_agent(self): - """ - create console - do not enable agent discovery - find agent1, expect success - find agent-none, expect failure - find agent2, expect success - """ - self.notifier = _testNotifier() - self.console = qmf2.console.Console(notifier=self.notifier) - self.conn = qpid.messaging.Connection(self.broker) - self.conn.open() - self.console.add_connection(self.conn) - - agent1 = self.console.find_agent("agent1", timeout=3) - self.assertTrue(agent1 and agent1.get_name() == "agent1") - - no_agent = self.console.find_agent("agent-none", timeout=3) - self.assertTrue(no_agent == None) - - agent2 = self.console.find_agent("agent2", timeout=3) - self.assertTrue(agent2 and agent2.get_name() == "agent2") - - self.console.remove_connection(self.conn, 10) - self.console.destroy(10) - - - def test_heartbeat_x2(self): - """ - create 2 consoles with 2 sec agent timeout - enable agent discovery, find all agents - stop agent1, expect timeout notification on both consoles - stop agent2, expect timeout notification on both consoles - """ - console_count = 2 - self.consoles = [] - for i in range(console_count): - console = qmf2.console.Console("test-console-" + str(i), - notifier=_testNotifier(), - agent_timeout=2) - conn = qpid.messaging.Connection(self.broker) - conn.open() - console.add_connection(conn) - console.enable_agent_discovery() - self.consoles.append(console) - - # now wait for all consoles to discover all agents, - # agents send a heartbeat once a second - for console in self.consoles: - agent1_found = agent2_found = False - wi = console.get_next_workitem(timeout=2) - while wi and not (agent1_found and agent2_found): - if wi.get_type() == wi.AGENT_ADDED: - agent = wi.get_params().get("agent") - if not agent or not isinstance(agent, qmf2.console.Agent): - self.fail("Unexpected workitem from agent") - else: - if agent.get_name() == "agent1": - agent1_found = True - elif agent.get_name() == "agent2": - agent2_found = True - else: - self.fail("Unexpected agent name received: %s" % - agent.get_name()) - if agent1_found and agent2_found: - break; - wi = console.get_next_workitem(timeout=2) - - self.assertTrue(agent1_found and agent2_found, "All agents not discovered") - - # now kill agent1 and wait for expiration - - agent1 = self.agent1 - self.agent1 = None - agent1.stop_app() - - for console in self.consoles: - agent1_found = True - wi = console.get_next_workitem(timeout=4) - while wi is not None: - if wi.get_type() == wi.AGENT_DELETED: - agent = wi.get_params().get("agent") - if not agent or not isinstance(agent, qmf2.console.Agent): - self.fail("Unexpected workitem from agent") - else: - if agent.get_name() == "agent1": - agent1_found = False - break - else: - self.fail("Unexpected agent_deleted received: %s" % - agent.get_name()) - - wi = console.get_next_workitem(timeout=4) - - self.assertFalse(agent1_found, "agent1 did not delete!") - - # now kill agent2 and wait for expiration - - agent2 = self.agent2 - self.agent2 = None - agent2.stop_app() - - for console in self.consoles: - agent2_found = True - wi = console.get_next_workitem(timeout=4) - while wi is not None: - if wi.get_type() == wi.AGENT_DELETED: - agent = wi.get_params().get("agent") - if not agent or not isinstance(agent, qmf2.console.Agent): - self.fail("Unexpected workitem from agent") - else: - if agent.get_name() == "agent2": - agent2_found = False - break - else: - self.fail("Unexpected agent_deleted received: %s" % - agent.get_name()) - - wi = console.get_next_workitem(timeout=4) - - self.assertFalse(agent2_found, "agent2 did not delete!") - - - for console in self.consoles: - console.destroy(10) - - - def test_find_agent_x2(self): - """ - create 2 consoles, do not enable agent discovery - console-1: find agent1, expect success - console-2: find agent2, expect success - Verify console-1 does -not- know agent2 - Verify console-2 does -not- know agent1 - """ - console_count = 2 - self.consoles = [] - for i in range(console_count): - console = qmf2.console.Console("test-console-" + str(i), - notifier=_testNotifier(), - agent_timeout=2) - conn = qpid.messaging.Connection(self.broker) - conn.open() - console.add_connection(conn) - self.consoles.append(console) - - agent1 = self.consoles[0].find_agent("agent1", timeout=3) - self.assertTrue(agent1 and agent1.get_name() == "agent1") - - agent2 = self.consoles[1].find_agent("agent2", timeout=3) - self.assertTrue(agent2 and agent2.get_name() == "agent2") - - # wait long enough for agent heartbeats to be sent... - - time.sleep(self.agent_heartbeat * 2) - - agents = self.consoles[0].get_agents() - self.assertTrue(len(agents) == 1 and agents[0].get_name() == "agent1") - agent1 = self.consoles[0].get_agent("agent1") - self.assertTrue(agent1 and agent1.get_name() == "agent1") - - - agents = self.consoles[1].get_agents() - self.assertTrue(len(agents) == 1 and agents[0].get_name() == "agent2") - agent2 = self.consoles[1].get_agent("agent2") - self.assertTrue(agent2 and agent2.get_name() == "agent2") - - # verify no new agents were learned - - for console in self.consoles: - console.destroy(10) - |