diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2013-04-05 17:17:06 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2013-04-05 17:17:06 +0000 |
commit | dea8a4b6628ab7ddce1b9cfc52316c80df7eff34 (patch) | |
tree | 23fbf78d24726071ebca897bda75380f8908c858 | |
parent | d0416615f7a54ae6888901a9b1453e005e9ef22e (diff) | |
download | qpid-python-dea8a4b6628ab7ddce1b9cfc52316c80df7eff34.tar.gz |
QPID-4689: fix dropped heartbeat indications and object update callbacks
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1465050 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/tests/CMakeLists.txt | 1 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/qpidd_qmfv2_tests.py | 203 | ||||
-rw-r--r-- | qpid/extras/qmf/src/py/qmf/console.py | 30 |
3 files changed, 220 insertions, 14 deletions
diff --git a/qpid/cpp/src/tests/CMakeLists.txt b/qpid/cpp/src/tests/CMakeLists.txt index 001b5d2d69..cb10414970 100644 --- a/qpid/cpp/src/tests/CMakeLists.txt +++ b/qpid/cpp/src/tests/CMakeLists.txt @@ -326,6 +326,7 @@ endif (PYTHON_EXECUTABLE) add_test (stop_broker ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/stop_broker${test_script_suffix}) if (PYTHON_EXECUTABLE) add_test (ha_tests ${test_wrap} ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/ha_tests.py) + add_test (qpidd_qmfv2_tests ${test_wrap} ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/qpidd_qmfv2_tests.py) if (BUILD_AMQP) add_test (interlink_tests ${test_wrap} ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/interlink_tests.py) endif (BUILD_AMQP) diff --git a/qpid/cpp/src/tests/qpidd_qmfv2_tests.py b/qpid/cpp/src/tests/qpidd_qmfv2_tests.py new file mode 100755 index 0000000000..d9a725fb23 --- /dev/null +++ b/qpid/cpp/src/tests/qpidd_qmfv2_tests.py @@ -0,0 +1,203 @@ +#!/usr/bin/env python + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Runs QMF tests against a broker running with QMFv1 disabled. This forces the +# broker to use QMFv2 only. This is necessary as if there is a bug in V2, some +# V1 operations may hide that (esp. asynchonous notifications) + + +import sys, shutil, os +from time import sleep +from brokertest import * +from qpid.messaging import Message +try: import qmf.console +except: print "Cannot import module qmf.console, skipping tests"; exit(0); + + +class ConsoleTest(BrokerTest): + """ + Test QMFv2 support using the qmf.console library. + """ + PUB_INTERVAL=1 + + def setUp(self): + BrokerTest.setUp(self) + args = ["--mgmt-qmf1=no", + "--mgmt-pub-interval=%d" % self.PUB_INTERVAL] + self.broker = BrokerTest.broker(self, args) + + def _startQmfV2(self, broker, console=None): + # I manually set up the QMF session here rather than call the startQmf + # method from BrokerTest as I can guarantee the console library is used + # (assuming BrokerTest's implementation of startQmf could change) + self.qmf_session = qmf.console.Session(console) + self.qmf_broker = self.qmf_session.addBroker("%s:%s" % (broker.host(), + broker.port())) + self.assertEqual(self.qmf_broker.getBrokerAgent().isV2, True, + "Expected broker agent to support QMF V2") + + + def _create_queue( self, q_name, args={} ): + broker = self.qmf_session.getObjects(_class="broker")[0] + result = broker.create("queue", q_name, args, False) + self.assertEqual(result.status, 0, result) + + + def test_method_call(self): + """ Verify method calls work, and check the behavior of getObjects() + call + """ + self._startQmfV2( self.broker ) + self._create_queue( "fleabag", {"auto-delete":True} ) + + qObj = None + queues = self.qmf_session.getObjects(_class="queue") + for q in queues: + if q.name == "fleabag": + qObj = q + break + self.assertNotEqual(qObj, None, "Failed to get queue object") + #print qObj + + def test_unsolicited_updates(self): + """ Verify that the Console callbacks work + """ + + class Handler(qmf.console.Console): + def __init__(self): + self.broker_info = [] + self.broker_conn = [] + self.newpackage = [] + self.newclass = [] + self.agents = [] + self.events = [] + self.props = [] + self.stats = [] + self.heartbeats = [] + + def brokerInfo(self, broker): + #print "brokerInfo:", broker + self.broker_info.append(broker) + def brokerConnected(self, broker): + #print "brokerConnected:", broker + self.broker_conn.append(broker) + def newPackage(self, name): + #print "newPackage:", name + self.newpackage.append(name) + def newClass(self, kind, classKey): + #print "newClass:", kind, classKey + self.newclass.append( (kind, classKey) ) + def newAgent(self, agent): + #print "newAgent:", agent + self.agents.append( agent ) + def event(self, broker, event): + #print "EVENT %s" % event + self.events.append(event) + def objectProps(self, broker, record): + #print "ObjProps PROPS=[%s]" % str(record.getProperties()) + #print "ObjProps STATS=[%s]" % str(record.getStatistics()) + # Both statistics and properties are available: + props = record.getProperties() + if props: + self.props.append(record) + stats = record.getStatistics() + if stats: + self.stats.append(record) + def objectStats(self, broker, record): + assert False, "objectStats() not called if QMFv2" + def heartbeat(self, agent, timestamp): + #print "Heartbeat %s" % agent + self.heartbeats.append( (agent, timestamp) ) + + handler = Handler() + self._startQmfV2( self.broker, handler ) + # this should force objectProps, queueDeclare Event callbacks + self._create_queue( "fleabag", {"auto-delete":True} ) + # this should force objectStats callback + self.broker.send_message( "fleabag", Message("Hi") ) + # and we should get a few heartbeats + sleep(self.PUB_INTERVAL * 3) + + assert handler.broker_info, "No BrokerInfo callbacks received" + assert handler.broker_conn, "No BrokerConnected callbacks received" + assert handler.newpackage, "No NewPackage callbacks received" + assert handler.newclass, "No NewClass callbacks received" + assert handler.agents, "No NewAgent callbacks received" + assert handler.events, "No event callbacks received" + assert handler.props, "No properties updates received" + assert handler.stats, "No statistics received" + assert handler.heartbeats, "No heartbeat callbacks received" + + def test_async_method(self): + class Handler (qmf.console.Console): + def __init__(self): + self.cv = Condition() + self.xmtList = {} + self.rcvList = {} + + def methodResponse(self, broker, seq, response): + self.cv.acquire() + try: + self.rcvList[seq] = response + finally: + self.cv.release() + + def request(self, broker, count): + self.count = count + for idx in range(count): + self.cv.acquire() + try: + seq = broker.echo(idx, "Echo Message", _async = True) + self.xmtList[seq] = idx + finally: + self.cv.release() + + def check(self): + if self.count != len(self.xmtList): + return "fail (attempted send=%d, actual sent=%d)" % (self.count, len(self.xmtList)) + lost = 0 + mismatched = 0 + for seq in self.xmtList: + value = self.xmtList[seq] + if seq in self.rcvList: + result = self.rcvList.pop(seq) + if result.sequence != value: + mismatched += 1 + else: + lost += 1 + spurious = len(self.rcvList) + if lost == 0 and mismatched == 0 and spurious == 0: + return "pass" + else: + return "fail (lost=%d, mismatch=%d, spurious=%d)" % (lost, mismatched, spurious) + + handler = Handler() + self._startQmfV2(self.broker, handler) + broker = self.qmf_session.getObjects(_class="broker")[0] + handler.request(broker, 20) + sleep(1) + self.assertEqual(handler.check(), "pass") + + +if __name__ == "__main__": + shutil.rmtree("brokertest.tmp", True) + os.execvp("qpid-python-test", + ["qpid-python-test", "-m", "qpidd_qmfv2_tests"] + sys.argv[1:]) + diff --git a/qpid/extras/qmf/src/py/qmf/console.py b/qpid/extras/qmf/src/py/qmf/console.py index 0a30176ed5..bd30730617 100644 --- a/qpid/extras/qmf/src/py/qmf/console.py +++ b/qpid/extras/qmf/src/py/qmf/console.py @@ -1241,14 +1241,6 @@ class Session: except Exception,e: return - ## - ## For now, ignore heartbeats from messaging brokers. We already have the "local-broker" - ## agent in our list. - ## - if '_vendor' in values and values['_vendor'] == 'apache.org' and \ - '_product' in values and values['_product'] == 'qpidd': - return - if self.agent_filter: # only allow V2 agents that satisfy the filter v = agentName.split(":", 2) @@ -1257,7 +1249,14 @@ class Session: and (v[0], v[1], v[2]) not in self.agent_filter): return - agent = broker.getAgent(1, agentName) + ## + ## We already have the "local-broker" agent in our list as ['0']. + ## + if '_vendor' in values and values['_vendor'] == 'apache.org' and \ + '_product' in values and values['_product'] == 'qpidd': + agent = broker.getBrokerAgent() + else: + agent = broker.getAgent(1, agentName) if agent == None: agent = Agent(broker, agentName, "QMFv2 Agent", True, interval) agent.setEpoch(epoch) @@ -2928,11 +2927,14 @@ class Broker(Thread): ## All other opcodes are agent-scope and are forwarded to the agent proxy representing the sender ## of the message. ## - agent_addr = ah['qmf.agent'] - if agent_addr == 'broker': - agent_addr = '0' - if agent_addr in self.agents: - agent = self.agents[agent_addr] + # the broker's agent is mapped to index ['0'] + agentName = ah['qmf.agent'] + v = agentName.split(":") + if agentName == 'broker' or (len(v) >= 2 and v[0] == 'apache.org' + and v[1] == 'qpidd'): + agentName = '0' + if agentName in self.agents: + agent = self.agents[agentName] agent._handleQmfV2Message(opcode, mp, ah, content) agent.touch() |