summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2013-04-05 17:17:06 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2013-04-05 17:17:06 +0000
commitdea8a4b6628ab7ddce1b9cfc52316c80df7eff34 (patch)
tree23fbf78d24726071ebca897bda75380f8908c858
parentd0416615f7a54ae6888901a9b1453e005e9ef22e (diff)
downloadqpid-python-dea8a4b6628ab7ddce1b9cfc52316c80df7eff34.tar.gz
QPID-4689: fix dropped heartbeat indications and object update callbacks
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1465050 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/tests/CMakeLists.txt1
-rwxr-xr-xqpid/cpp/src/tests/qpidd_qmfv2_tests.py203
-rw-r--r--qpid/extras/qmf/src/py/qmf/console.py30
3 files changed, 220 insertions, 14 deletions
diff --git a/qpid/cpp/src/tests/CMakeLists.txt b/qpid/cpp/src/tests/CMakeLists.txt
index 001b5d2d69..cb10414970 100644
--- a/qpid/cpp/src/tests/CMakeLists.txt
+++ b/qpid/cpp/src/tests/CMakeLists.txt
@@ -326,6 +326,7 @@ endif (PYTHON_EXECUTABLE)
add_test (stop_broker ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/stop_broker${test_script_suffix})
if (PYTHON_EXECUTABLE)
add_test (ha_tests ${test_wrap} ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/ha_tests.py)
+ add_test (qpidd_qmfv2_tests ${test_wrap} ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/qpidd_qmfv2_tests.py)
if (BUILD_AMQP)
add_test (interlink_tests ${test_wrap} ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/interlink_tests.py)
endif (BUILD_AMQP)
diff --git a/qpid/cpp/src/tests/qpidd_qmfv2_tests.py b/qpid/cpp/src/tests/qpidd_qmfv2_tests.py
new file mode 100755
index 0000000000..d9a725fb23
--- /dev/null
+++ b/qpid/cpp/src/tests/qpidd_qmfv2_tests.py
@@ -0,0 +1,203 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Runs QMF tests against a broker running with QMFv1 disabled. This forces the
+# broker to use QMFv2 only. This is necessary as if there is a bug in V2, some
+# V1 operations may hide that (esp. asynchonous notifications)
+
+
+import sys, shutil, os
+from time import sleep
+from brokertest import *
+from qpid.messaging import Message
+try: import qmf.console
+except: print "Cannot import module qmf.console, skipping tests"; exit(0);
+
+
+class ConsoleTest(BrokerTest):
+ """
+ Test QMFv2 support using the qmf.console library.
+ """
+ PUB_INTERVAL=1
+
+ def setUp(self):
+ BrokerTest.setUp(self)
+ args = ["--mgmt-qmf1=no",
+ "--mgmt-pub-interval=%d" % self.PUB_INTERVAL]
+ self.broker = BrokerTest.broker(self, args)
+
+ def _startQmfV2(self, broker, console=None):
+ # I manually set up the QMF session here rather than call the startQmf
+ # method from BrokerTest as I can guarantee the console library is used
+ # (assuming BrokerTest's implementation of startQmf could change)
+ self.qmf_session = qmf.console.Session(console)
+ self.qmf_broker = self.qmf_session.addBroker("%s:%s" % (broker.host(),
+ broker.port()))
+ self.assertEqual(self.qmf_broker.getBrokerAgent().isV2, True,
+ "Expected broker agent to support QMF V2")
+
+
+ def _create_queue( self, q_name, args={} ):
+ broker = self.qmf_session.getObjects(_class="broker")[0]
+ result = broker.create("queue", q_name, args, False)
+ self.assertEqual(result.status, 0, result)
+
+
+ def test_method_call(self):
+ """ Verify method calls work, and check the behavior of getObjects()
+ call
+ """
+ self._startQmfV2( self.broker )
+ self._create_queue( "fleabag", {"auto-delete":True} )
+
+ qObj = None
+ queues = self.qmf_session.getObjects(_class="queue")
+ for q in queues:
+ if q.name == "fleabag":
+ qObj = q
+ break
+ self.assertNotEqual(qObj, None, "Failed to get queue object")
+ #print qObj
+
+ def test_unsolicited_updates(self):
+ """ Verify that the Console callbacks work
+ """
+
+ class Handler(qmf.console.Console):
+ def __init__(self):
+ self.broker_info = []
+ self.broker_conn = []
+ self.newpackage = []
+ self.newclass = []
+ self.agents = []
+ self.events = []
+ self.props = []
+ self.stats = []
+ self.heartbeats = []
+
+ def brokerInfo(self, broker):
+ #print "brokerInfo:", broker
+ self.broker_info.append(broker)
+ def brokerConnected(self, broker):
+ #print "brokerConnected:", broker
+ self.broker_conn.append(broker)
+ def newPackage(self, name):
+ #print "newPackage:", name
+ self.newpackage.append(name)
+ def newClass(self, kind, classKey):
+ #print "newClass:", kind, classKey
+ self.newclass.append( (kind, classKey) )
+ def newAgent(self, agent):
+ #print "newAgent:", agent
+ self.agents.append( agent )
+ def event(self, broker, event):
+ #print "EVENT %s" % event
+ self.events.append(event)
+ def objectProps(self, broker, record):
+ #print "ObjProps PROPS=[%s]" % str(record.getProperties())
+ #print "ObjProps STATS=[%s]" % str(record.getStatistics())
+ # Both statistics and properties are available:
+ props = record.getProperties()
+ if props:
+ self.props.append(record)
+ stats = record.getStatistics()
+ if stats:
+ self.stats.append(record)
+ def objectStats(self, broker, record):
+ assert False, "objectStats() not called if QMFv2"
+ def heartbeat(self, agent, timestamp):
+ #print "Heartbeat %s" % agent
+ self.heartbeats.append( (agent, timestamp) )
+
+ handler = Handler()
+ self._startQmfV2( self.broker, handler )
+ # this should force objectProps, queueDeclare Event callbacks
+ self._create_queue( "fleabag", {"auto-delete":True} )
+ # this should force objectStats callback
+ self.broker.send_message( "fleabag", Message("Hi") )
+ # and we should get a few heartbeats
+ sleep(self.PUB_INTERVAL * 3)
+
+ assert handler.broker_info, "No BrokerInfo callbacks received"
+ assert handler.broker_conn, "No BrokerConnected callbacks received"
+ assert handler.newpackage, "No NewPackage callbacks received"
+ assert handler.newclass, "No NewClass callbacks received"
+ assert handler.agents, "No NewAgent callbacks received"
+ assert handler.events, "No event callbacks received"
+ assert handler.props, "No properties updates received"
+ assert handler.stats, "No statistics received"
+ assert handler.heartbeats, "No heartbeat callbacks received"
+
+ def test_async_method(self):
+ class Handler (qmf.console.Console):
+ def __init__(self):
+ self.cv = Condition()
+ self.xmtList = {}
+ self.rcvList = {}
+
+ def methodResponse(self, broker, seq, response):
+ self.cv.acquire()
+ try:
+ self.rcvList[seq] = response
+ finally:
+ self.cv.release()
+
+ def request(self, broker, count):
+ self.count = count
+ for idx in range(count):
+ self.cv.acquire()
+ try:
+ seq = broker.echo(idx, "Echo Message", _async = True)
+ self.xmtList[seq] = idx
+ finally:
+ self.cv.release()
+
+ def check(self):
+ if self.count != len(self.xmtList):
+ return "fail (attempted send=%d, actual sent=%d)" % (self.count, len(self.xmtList))
+ lost = 0
+ mismatched = 0
+ for seq in self.xmtList:
+ value = self.xmtList[seq]
+ if seq in self.rcvList:
+ result = self.rcvList.pop(seq)
+ if result.sequence != value:
+ mismatched += 1
+ else:
+ lost += 1
+ spurious = len(self.rcvList)
+ if lost == 0 and mismatched == 0 and spurious == 0:
+ return "pass"
+ else:
+ return "fail (lost=%d, mismatch=%d, spurious=%d)" % (lost, mismatched, spurious)
+
+ handler = Handler()
+ self._startQmfV2(self.broker, handler)
+ broker = self.qmf_session.getObjects(_class="broker")[0]
+ handler.request(broker, 20)
+ sleep(1)
+ self.assertEqual(handler.check(), "pass")
+
+
+if __name__ == "__main__":
+ shutil.rmtree("brokertest.tmp", True)
+ os.execvp("qpid-python-test",
+ ["qpid-python-test", "-m", "qpidd_qmfv2_tests"] + sys.argv[1:])
+
diff --git a/qpid/extras/qmf/src/py/qmf/console.py b/qpid/extras/qmf/src/py/qmf/console.py
index 0a30176ed5..bd30730617 100644
--- a/qpid/extras/qmf/src/py/qmf/console.py
+++ b/qpid/extras/qmf/src/py/qmf/console.py
@@ -1241,14 +1241,6 @@ class Session:
except Exception,e:
return
- ##
- ## For now, ignore heartbeats from messaging brokers. We already have the "local-broker"
- ## agent in our list.
- ##
- if '_vendor' in values and values['_vendor'] == 'apache.org' and \
- '_product' in values and values['_product'] == 'qpidd':
- return
-
if self.agent_filter:
# only allow V2 agents that satisfy the filter
v = agentName.split(":", 2)
@@ -1257,7 +1249,14 @@ class Session:
and (v[0], v[1], v[2]) not in self.agent_filter):
return
- agent = broker.getAgent(1, agentName)
+ ##
+ ## We already have the "local-broker" agent in our list as ['0'].
+ ##
+ if '_vendor' in values and values['_vendor'] == 'apache.org' and \
+ '_product' in values and values['_product'] == 'qpidd':
+ agent = broker.getBrokerAgent()
+ else:
+ agent = broker.getAgent(1, agentName)
if agent == None:
agent = Agent(broker, agentName, "QMFv2 Agent", True, interval)
agent.setEpoch(epoch)
@@ -2928,11 +2927,14 @@ class Broker(Thread):
## All other opcodes are agent-scope and are forwarded to the agent proxy representing the sender
## of the message.
##
- agent_addr = ah['qmf.agent']
- if agent_addr == 'broker':
- agent_addr = '0'
- if agent_addr in self.agents:
- agent = self.agents[agent_addr]
+ # the broker's agent is mapped to index ['0']
+ agentName = ah['qmf.agent']
+ v = agentName.split(":")
+ if agentName == 'broker' or (len(v) >= 2 and v[0] == 'apache.org'
+ and v[1] == 'qpidd'):
+ agentName = '0'
+ if agentName in self.agents:
+ agent = self.agents[agentName]
agent._handleQmfV2Message(opcode, mp, ah, content)
agent.touch()