diff options
Diffstat (limited to 'qpid/cpp/src/tests/qpidd_qmfv2_tests.py')
-rwxr-xr-x | qpid/cpp/src/tests/qpidd_qmfv2_tests.py | 278 |
1 files changed, 278 insertions, 0 deletions
diff --git a/qpid/cpp/src/tests/qpidd_qmfv2_tests.py b/qpid/cpp/src/tests/qpidd_qmfv2_tests.py new file mode 100755 index 0000000000..2b45cb6eea --- /dev/null +++ b/qpid/cpp/src/tests/qpidd_qmfv2_tests.py @@ -0,0 +1,278 @@ +#!/usr/bin/env python + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Runs QMF tests against a broker running with QMFv1 disabled. This forces the +# broker to use QMFv2 only. This is necessary as if there is a bug in V2, some +# V1 operations may hide that (esp. asynchonous notifications) + + +import sys, shutil, os +from time import sleep +from brokertest import * +from qpid.messaging import Message +try: import qmf.console +except: print "Cannot import module qmf.console, skipping tests"; exit(0); + +import qpid.messaging, brokertest +brokertest.qm = qpid.messaging # TODO aconway 2014-04-04: Tests fail with SWIG client. + +class ConsoleTest(BrokerTest): + """ + Test QMFv2 support using the qmf.console library. + """ + PUB_INTERVAL=1 + + def setUp(self): + BrokerTest.setUp(self) + + def _startBroker(self, QMFv1=False ): + self._broker_is_v1 = QMFv1 + if self._broker_is_v1: + args = ["--mgmt-qmf1=yes", "--mgmt-qmf2=no"] + else: + args = ["--mgmt-qmf1=no", "--mgmt-qmf2=yes"] + + args.append("--mgmt-pub-interval=%d" % self.PUB_INTERVAL) + self.broker = BrokerTest.broker(self, args) + + + def _myStartQmf(self, broker, console=None): + # I manually set up the QMF session here rather than call the startQmf + # method from BrokerTest as I can guarantee the console library is used + # (assuming BrokerTest's implementation of startQmf could change) + self.qmf_session = qmf.console.Session(console) + self.qmf_broker = self.qmf_session.addBroker("%s:%s" % (broker.host(), + broker.port())) + + def _create_queue( self, q_name, args={} ): + broker = self.qmf_session.getObjects(_class="broker")[0] + result = broker.create("queue", q_name, args, False) + self.assertEqual(result.status, 0, result) + + + def _test_method_call(self): + """ Verify method calls work, and check the behavior of getObjects() + call + """ + self._myStartQmf( self.broker ) + self._create_queue( "fleabag", {"auto-delete":True} ) + + qObj = None + queues = self.qmf_session.getObjects(_class="queue") + for q in queues: + if q.name == "fleabag": + qObj = q + break + self.assertNotEqual(qObj, None, "Failed to get queue object") + #print qObj + + def _test_unsolicited_updates(self): + """ Verify that the Console callbacks work + """ + + class Handler(qmf.console.Console): + def __init__(self): + self.v1_oids = 0 + self.v1_events = 0 + self.v2_oids = 0 + self.v2_events = 0 + self.broker_info = [] + self.broker_conn = [] + self.newpackage = [] + self.newclass = [] + self.agents = [] + self.events = [] + self.updates = {} # holds the objects by OID + self.heartbeats = [] + + def brokerInfo(self, broker): + #print "brokerInfo:", broker + self.broker_info.append(broker) + def brokerConnected(self, broker): + #print "brokerConnected:", broker + self.broker_conn.append(broker) + def newPackage(self, name): + #print "newPackage:", name + self.newpackage.append(name) + def newClass(self, kind, classKey): + #print "newClass:", kind, classKey + self.newclass.append( (kind, classKey) ) + def newAgent(self, agent): + #print "newAgent:", agent + self.agents.append( agent ) + def event(self, broker, event): + #print "EVENT %s" % event + self.events.append(event) + if event.isV2: + self.v2_events += 1 + else: + self.v1_events += 1 + + def heartbeat(self, agent, timestamp): + #print "Heartbeat %s" % agent + self.heartbeats.append( (agent, timestamp) ) + + # generic handler for objectProps and objectStats + def _handle_obj_update(self, record): + oid = record.getObjectId() + if oid.isV2: + self.v2_oids += 1 + else: + self.v1_oids += 1 + + if oid not in self.updates: + self.updates[oid] = record + else: + self.updates[oid].mergeUpdate( record ) + + def objectProps(self, broker, record): + assert len(record.getProperties()), "objectProps() invoked with no properties?" + self._handle_obj_update(record) + + def objectStats(self, broker, record): + assert len(record.getStatistics()), "objectStats() invoked with no properties?" + self._handle_obj_update(record) + + handler = Handler() + self._myStartQmf( self.broker, handler ) + # this should force objectProps, queueDeclare Event callbacks + self._create_queue( "fleabag", {"auto-delete":True} ) + # this should force objectStats callback + self.broker.send_message( "fleabag", Message("Hi") ) + # and we should get a few heartbeats + sleep(self.PUB_INTERVAL) + self.broker.send_message( "fleabag", Message("Hi") ) + sleep(self.PUB_INTERVAL) + self.broker.send_message( "fleabag", Message("Hi") ) + sleep(self.PUB_INTERVAL * 2) + + assert handler.broker_info, "No BrokerInfo callbacks received" + assert handler.broker_conn, "No BrokerConnected callbacks received" + assert handler.newpackage, "No NewPackage callbacks received" + assert handler.newclass, "No NewClass callbacks received" + assert handler.agents, "No NewAgent callbacks received" + assert handler.events, "No event callbacks received" + assert handler.updates, "No updates received" + assert handler.heartbeats, "No heartbeat callbacks received" + + # now verify updates for queue "fleabag" were received, and the + # msgDepth statistic is correct + + msgs = 0 + for o in handler.updates.itervalues(): + key = o.getClassKey() + if key and key.getClassName() == "queue" and o.name == "fleabag": + assert o.msgDepth, "No update to msgDepth statistic!" + msgs = o.msgDepth + break + assert msgs == 3, "msgDepth statistics not accurate!" + + # verify that the published objects were of the correct QMF version + if self._broker_is_v1: + assert handler.v1_oids and handler.v2_oids == 0, "QMFv2 updates received while in V1-only mode!" + assert handler.v1_events and handler.v2_events == 0, "QMFv2 events received while in V1-only mode!" + else: + assert handler.v2_oids and handler.v1_oids == 0, "QMFv1 updates received while in V2-only mode!" + assert handler.v2_events and handler.v1_events == 0, "QMFv1 events received while in V2-only mode!" + + def _test_async_method(self): + class Handler (qmf.console.Console): + def __init__(self): + self.cv = Condition() + self.xmtList = {} + self.rcvList = {} + + def methodResponse(self, broker, seq, response): + self.cv.acquire() + try: + self.rcvList[seq] = response + finally: + self.cv.release() + + def request(self, broker, count): + self.count = count + for idx in range(count): + self.cv.acquire() + try: + seq = broker.echo(idx, "Echo Message", _async = True) + self.xmtList[seq] = idx + finally: + self.cv.release() + + def check(self): + if self.count != len(self.xmtList): + return "fail (attempted send=%d, actual sent=%d)" % (self.count, len(self.xmtList)) + lost = 0 + mismatched = 0 + for seq in self.xmtList: + value = self.xmtList[seq] + if seq in self.rcvList: + result = self.rcvList.pop(seq) + if result.sequence != value: + mismatched += 1 + else: + lost += 1 + spurious = len(self.rcvList) + if lost == 0 and mismatched == 0 and spurious == 0: + return "pass" + else: + return "fail (lost=%d, mismatch=%d, spurious=%d)" % (lost, mismatched, spurious) + + handler = Handler() + self._myStartQmf(self.broker, handler) + broker = self.qmf_session.getObjects(_class="broker")[0] + handler.request(broker, 20) + sleep(1) + self.assertEqual(handler.check(), "pass") + + def test_method_call(self): + self._startBroker() + self._test_method_call() + + def test_unsolicited_updates(self): + self._startBroker() + self._test_unsolicited_updates() + + def test_async_method(self): + self._startBroker() + self._test_async_method() + + # For now, include "QMFv1 only" tests. Once QMFv1 is deprecated, these can + # be removed + + def test_method_call_v1(self): + self._startBroker(QMFv1=True) + self._test_method_call() + + def test_unsolicited_updates_v1(self): + self._startBroker(QMFv1=True) + self._test_unsolicited_updates() + + def test_async_method_v1(self): + self._startBroker(QMFv1=True) + self._test_async_method() + + + +if __name__ == "__main__": + shutil.rmtree("brokertest.tmp", True) + os.execvp("qpid-python-test", + ["qpid-python-test", "-m", "qpidd_qmfv2_tests"] + sys.argv[1:]) + |