# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#
import unittest
import logging
from threading import Thread, Event

import qpid.messaging
import qmf2.common
import qmf2.console
import qmf2.agent


class _testNotifier(qmf2.common.Notifier):
    """Notifier that signals pending work through a threading.Event."""

    def __init__(self):
        self._event = Event()

    def indication(self):
        """Flag that work is available."""
        # note: called by qmf daemon thread
        self._event.set()

    def wait_for_work(self, timeout):
        """Block until indication() is called or `timeout` seconds elapse.

        Called by the application thread to wait for qmf to generate work.
        Returns True (and resets the flag) if work was signalled, False if
        the wait timed out.  A timeout of None waits indefinitely.
        """
        self._event.wait(timeout)
        if self._event.is_set():
            self._event.clear()
            return True
        return False


class _agentApp(Thread):
    """Thread that owns a QMF agent and drains its (unexpected) work items.

    The thread is started by the constructor; call stop() to end it.
    """

    def __init__(self, name, heartbeat):
        """Create the agent `name` with the given heartbeat interval and
        start the work-item thread."""
        Thread.__init__(self)
        self.notifier = _testNotifier()
        self.agent = qmf2.agent.Agent(name,
                                      _notifier=self.notifier,
                                      _heartbeat_interval=heartbeat)
        # No database needed for this test
        # Defined up front so disconnect_agent() is safe to call even when
        # connect_agent() was never invoked.
        self.conn = None
        self.running = True
        self.start()

    def connect_agent(self, broker_url):
        """Open a connection to the broker and hand it to the agent.

        `broker_url` must provide host, port, user and password attributes.
        """
        self.conn = qpid.messaging.Connection(broker_url.host,
                                              broker_url.port,
                                              broker_url.user,
                                              broker_url.password)
        self.conn.connect()
        self.agent.set_connection(self.conn)

    def disconnect_agent(self, timeout):
        """Detach the agent from its connection, if one was set up."""
        if self.conn:
            self.agent.remove_connection(timeout)

    def shutdown_agent(self, timeout):
        """Destroy the agent, passing `timeout` through to Agent.destroy()."""
        self.agent.destroy(timeout)

    def stop(self):
        """Ask the thread to exit and wait up to 10 seconds for it."""
        self.running = False
        # Wake run() so it can observe running == False.
        self.notifier.indication()  # hmmm... collide with daemon???
        self.join(10)
        if self.is_alive():
            logging.error("AGENT DID NOT TERMINATE AS EXPECTED!!!")

    def run(self):
        """Wait for notifications and release any work items received.

        No work items are expected in these tests, so each one is logged
        as an error before being released.
        """
        while self.running:
            self.notifier.wait_for_work(None)
            wi = self.agent.get_next_workitem(timeout=0)
            while wi is not None:
                logging.error("UNEXPECTED AGENT WORKITEM RECEIVED=%s",
                              wi.get_type())
                self.agent.release_workitem(wi)
                wi = self.agent.get_next_workitem(timeout=0)


class BaseTest(unittest.TestCase):
    """Agent discovery tests run against two agents, agent1 and agent2."""

    def configure(self, config):
        """Store the test configuration, its broker and its defines."""
        self.config = config
        self.broker = config.broker
        self.defines = self.config.defines

    def setUp(self):
        # No console yet; tearDown() destroys whatever a test leaves behind.
        self.console = None
        # one second agent indication interval
        self.agent1 = _agentApp("agent1", 1)
        self.agent1.connect_agent(self.broker)
        self.agent2 = _agentApp("agent2", 1)
        self.agent2.connect_agent(self.broker)

    def tearDown(self):
        try:
            # A failed assertion skips the test's own console cleanup, so
            # make sure the console is always destroyed here.
            self._destroy_console()
        finally:
            if self.agent1:
                self.agent1.shutdown_agent(10)
                self.agent1.stop()
                self.agent1 = None
            if self.agent2:
                self.agent2.shutdown_agent(10)
                self.agent2.stop()
                self.agent2 = None

    def _start_console(self, **console_args):
        """Create self.console (with a fresh notifier) and connect it to
        the broker.  Extra keyword arguments go to the Console constructor.
        """
        self.notifier = _testNotifier()
        self.console = qmf2.console.Console(notifier=self.notifier,
                                            **console_args)
        self.conn = qpid.messaging.Connection(self.broker.host,
                                              self.broker.port,
                                              self.broker.user,
                                              self.broker.password)
        self.conn.connect()
        self.console.addConnection(self.conn)

    def _destroy_console(self):
        """Destroy self.console if one exists; safe to call repeatedly."""
        console = getattr(self, "console", None)
        if console is not None:
            # Clear the reference first so a failing destroy() is not retried.
            self.console = None
            console.destroy(10)

    def _record_agent_added(self, wi, found):
        """Add the agent named by an AGENT_ADDED work item to the set
        `found`; other work item types are ignored.  Fails the test on a
        malformed work item or an agent other than agent1/agent2.
        """
        if wi.get_type() != wi.AGENT_ADDED:
            return
        agent = wi.get_params().get("agent")
        if not agent or not isinstance(agent, qmf2.console.Agent):
            self.fail("Unexpected workitem from agent")
        name = agent.get_name()
        if name not in ("agent1", "agent2"):
            self.fail("Unexpected agent name received: %s" % name)
        found.add(name)

    def _discover_both_agents(self, timeout):
        """Read console work items until both agents have been added or a
        read returns nothing within `timeout`.  Returns the set of agent
        names seen.
        """
        found = set()
        wi = self.console.get_next_workitem(timeout=timeout)
        while wi:
            self._record_agent_added(wi, found)
            if len(found) == 2:
                break
            wi = self.console.get_next_workitem(timeout=timeout)
        return found

    def _wait_for_agent_deleted(self, name, timeout):
        """Read console work items until an AGENT_DELETED arrives.

        Returns True if it names `name`, False if a read returned nothing
        within `timeout` first.  Fails the test if a different agent is
        reported deleted.
        """
        wi = self.console.get_next_workitem(timeout=timeout)
        while wi is not None:
            if wi.get_type() == wi.AGENT_DELETED:
                agent = wi.get_params().get("agent")
                if not agent or not isinstance(agent, qmf2.console.Agent):
                    self.fail("Unexpected workitem from agent")
                if agent.get_name() != name:
                    self.fail("Unexpected agent_deleted received: %s" %
                              agent.get_name())
                return True
            wi = self.console.get_next_workitem(timeout=timeout)
        return False

    def test_discover_all(self):
        # create console
        # enable agent discovery
        # wait
        # expect agent add for agent1 and agent2
        self._start_console(agent_timeout=3)
        self.console.enable_agent_discovery()

        found = self._discover_both_agents(3)
        self.assertTrue("agent1" in found and "agent2" in found,
                        "All agents not discovered")

        self._destroy_console()

    def test_discover_one(self):
        # create console
        # enable agent discovery, filter for agent1 only
        # wait until timeout
        # expect agent add for agent1 only
        self._start_console(agent_timeout=3)

        query = qmf2.common.QmfQuery.create_predicate(
            qmf2.common.QmfQuery.TARGET_AGENT,
            qmf2.common.QmfQueryPredicate(
                {qmf2.common.QmfQuery.CMP_EQ:
                 [qmf2.common.QmfQuery.KEY_AGENT_NAME, "agent1"]}))
        self.console.enable_agent_discovery(query)

        # Drain every work item until a read times out, so that a late
        # agent2 announcement would still be caught.
        found = set()
        wi = self.console.get_next_workitem(timeout=3)
        while wi:
            self._record_agent_added(wi, found)
            wi = self.console.get_next_workitem(timeout=2)

        self.assertTrue("agent1" in found and "agent2" not in found,
                        "Unexpected agent discovered")

        self._destroy_console()

    def test_heartbeat(self):
        # create console with 2 sec agent timeout
        # enable agent discovery, find all agents
        # stop agent1, expect timeout notification
        # stop agent2, expect timeout notification
        self._start_console(agent_timeout=2)
        self.console.enable_agent_discovery()

        found = self._discover_both_agents(4)
        self.assertTrue("agent1" in found and "agent2" in found,
                        "All agents not discovered")

        # now kill agent1 and wait for expiration
        # (cleared first so tearDown() does not shut it down a second time)
        agent1 = self.agent1
        self.agent1 = None
        agent1.shutdown_agent(10)
        agent1.stop()
        self.assertTrue(self._wait_for_agent_deleted("agent1", 4),
                        "agent1 did not delete!")

        # now kill agent2 and wait for expiration
        agent2 = self.agent2
        self.agent2 = None
        agent2.shutdown_agent(10)
        agent2.stop()
        self.assertTrue(self._wait_for_agent_deleted("agent2", 4),
                        "agent2 did not delete!")

        self._destroy_console()

    def test_find_agent(self):
        # create console
        # do not enable agent discovery
        # find agent1, expect success
        # find agent-none, expect failure
        # find agent2, expect success
        self._start_console()

        agent1 = self.console.find_agent("agent1", timeout=3)
        self.assertTrue(agent1 and agent1.get_name() == "agent1")

        no_agent = self.console.find_agent("agent-none", timeout=3)
        self.assertTrue(no_agent is None)

        agent2 = self.console.find_agent("agent2", timeout=3)
        self.assertTrue(agent2 and agent2.get_name() == "agent2")

        self.console.removeConnection(self.conn, 10)
        self._destroy_console()