From f05942c42c0601653be6678145e9df83d5fe2ede Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Tue, 14 Feb 2012 16:11:04 +0000 Subject: QPID-3603: Speed up qpid-ha-tool with fast QMF2 method calls. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-6@1244098 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/ha/HaBroker.cpp | 2 +- qpid/cpp/src/qpid/ha/management-schema.xml | 1 + qpid/cpp/src/tests/ha_tests.py | 5 +- qpid/tools/src/py/qpid-ha-tool | 151 ++++++++++++++++++++++++----- 4 files changed, 133 insertions(+), 26 deletions(-) diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp index c7692ac9a2..349b85431a 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.cpp +++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp @@ -70,7 +70,7 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s) throw Exception("Cannot start HA: management is disabled"); if (ma) { _qmf::Package packageInit(ma); - mgmtObject = new _qmf::HaBroker(ma, this); + mgmtObject = new _qmf::HaBroker(ma, this, "ha-broker"); mgmtObject->set_status(BACKUP); ma->addObject(mgmtObject); } diff --git a/qpid/cpp/src/qpid/ha/management-schema.xml b/qpid/cpp/src/qpid/ha/management-schema.xml index 47b8880b35..fe4a14d111 100644 --- a/qpid/cpp/src/qpid/ha/management-schema.xml +++ b/qpid/cpp/src/qpid/ha/management-schema.xml @@ -21,6 +21,7 @@ + diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index ddcc19ee1e..f28b31dd03 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -281,12 +281,12 @@ class ShortTests(BrokerTest): assert retry(lambda: not is_running(primary.pid)) # Wait for primary to die backup.promote() n = receiver.received # Make sure we are still running - assert retry(lambda: receiver.received > n + 10) + # FIXME aconway 2012-02-01: c++ client has 1 sec min retry, hence long timeout + assert retry(lambda: receiver.received > n + 10, timeout=5) sender.stop() receiver.stop() def test_backup_failover(self): - # FIXME aconway 2012-01-30: UNFINISHED brokers = [ HaBroker(self, name=name, expect=EXPECT_EXIT_FAIL) for name in ["a","b","c"] ] url = ",".join([b.host_port() for b in brokers]) @@ -300,7 +300,6 @@ class ShortTests(BrokerTest): brokers[2].promote() # c must fail over to b. brokers[2].connect().session().sender("q").send("b") self.assert_browse_backup(brokers[1], "q", ["a","b"]) - # FIXME aconway 2012-01-30: finish for b in brokers[1:]: b.kill() if __name__ == "__main__": diff --git a/qpid/tools/src/py/qpid-ha-tool b/qpid/tools/src/py/qpid-ha-tool index 97cbd617d9..8e8107657c 100755 --- a/qpid/tools/src/py/qpid-ha-tool +++ b/qpid/tools/src/py/qpid-ha-tool @@ -21,6 +21,114 @@ import qmf.console, optparse, sys from qpid.management import managementChannel, managementClient +from qpid.messaging import Connection +from qpid.messaging import Message as QpidMessage +try: + from uuid import uuid4 +except ImportError: + from qpid.datatypes import uuid4 + +# Utility for doing fast qmf2 operations on a broker. +class QmfBroker(object): + def __init__(self, conn): + self.conn = conn + self.sess = self.conn.session() + self.reply_to = "qmf.default.topic/direct.%s;{node:{type:topic}, link:{x-declare:{auto-delete:True,exclusive:True}}}" % \ + str(uuid4()) + self.reply_rx = self.sess.receiver(self.reply_to) + self.reply_rx.capacity = 10 + self.tx = self.sess.sender("qmf.default.direct/broker") + self.next_correlator = 1 + + def close(self): + self.conn.close() + + def __repr__(self): + return "Qpid Broker: %s" % self.url + + def _method(self, method, arguments, addr="org.apache.qpid.broker:broker:amqp-broker"): + props = {'method' : 'request', + 'qmf.opcode' : '_method_request', + 'x-amqp-0-10.app-id' : 'qmf2'} + correlator = str(self.next_correlator) + self.next_correlator += 1 + + content = {'_object_id' : {'_object_name' : addr}, + '_method_name' : method, + '_arguments' : arguments} + + message = QpidMessage(content, reply_to=self.reply_to, correlation_id=correlator, + properties=props, subject="broker") + self.tx.send(message) + response = self.reply_rx.fetch(10) + if response.properties['qmf.opcode'] == '_exception': + raise Exception("Exception from Agent: %r" % response.content['_values']) + if response.properties['qmf.opcode'] != '_method_response': + raise Exception("bad response: %r" % response.properties) + return response.content['_arguments'] + + def _sendRequest(self, opcode, content): + props = {'method' : 'request', + 'qmf.opcode' : opcode, + 'x-amqp-0-10.app-id' : 'qmf2'} + correlator = str(self.next_correlator) + self.next_correlator += 1 + message = QpidMessage(content, reply_to=self.reply_to, correlation_id=correlator, + properties=props, subject="broker") + self.tx.send(message) + return correlator + + def _doClassQuery(self, class_name): + query = {'_what' : 'OBJECT', + '_schema_id' : {'_class_name' : class_name}} + correlator = self._sendRequest('_query_request', query) + response = self.reply_rx.fetch(10) + if response.properties['qmf.opcode'] != '_query_response': + raise Exception("bad response") + items = [] + done = False + while not done: + for item in response.content: + items.append(item['_values']) + if 'partial' in response.properties: + response = self.reply_rx.fetch(10) + else: + done = True + return items + + def _doNameQuery(self, class_name, object_name, package_name='org.apache.qpid.broker'): + query = {'_what' : 'OBJECT', + '_object_id' : {'_object_name' : "%s:%s:%s" % (package_name, class_name, object_name)}} + correlator = self._sendRequest('_query_request', query) + response = self.reply_rx.fetch(10) + if response.properties['qmf.opcode'] != '_query_response': + raise Exception("bad response") + items = [] + done = False + while not done: + for item in response.content: + items.append(item['_values']) + if 'partial' in response.properties: + response = self.reply_rx.fetch(10) + else: + done = True + if len(items) == 1: + return items[0] + return None + + def _getAllBrokerObjects(self, cls): + items = self._doClassQuery(cls.__name__.lower()) + objs = [] + for item in items: + objs.append(cls(self, item)) + return objs + + def _getBrokerObject(self, cls, name): + obj = self._doNameQuery(cls.__name__.lower(), name) + if obj: + return cls(self, obj) + return None + op=optparse.OptionParser(usage="Usage: %prog [options] [broker-address]") @@ -33,42 +141,41 @@ op.add_option("-b", "--broker-addresses", action="store", type="string", op.add_option("-q", "--query", action="store_true", help="Show the current HA settings on the broker.") -class HaBroker: - def __init__(self, session, broker): - self.session = session - self.qmf_broker = self.session.addBroker( - broker, client_properties={"qpid.ha-admin":1}) - ha_brokers = self.session.getObjects( - _class="habroker", _package="org.apache.qpid.ha") - if (not ha_brokers): raise Exception("Broker does not have HA enabled.") - self.ha_broker = ha_brokers[0] - - def query(self): - self.ha_broker.update() - print "status=", self.ha_broker.status - print "broker-addresses=", self.ha_broker.brokerAddresses - print "client-addresses=", self.ha_broker.clientAddresses +def get_ha_broker(qmf_broker): + ha_brokers = qmf_broker._doClassQuery("habroker") + if (not ha_brokers): raise Exception("Broker does not have HA enabled.") + return ha_brokers[0] def main(argv): try: opts, args = op.parse_args(argv) if len(args) >1: broker = args[1] else: broker = "localhost:5672" - session = qmf.console.Session() + conn = Connection.establish(broker, client_properties={"qpid.ha-admin":1}) + ha_broker = "org.apache.qpid.ha:habroker:ha-broker" try: - hb = HaBroker(session, broker) + qmf_broker = QmfBroker(conn) + get_ha_broker(qmf_broker) # Verify that HA is enabled action=False if opts.promote: - hb.ha_broker.promote(); action=True + qmf_broker._method("promote", {}, ha_broker) + action=True if opts.broker_addresses: - hb.ha_broker.setBrokerAddresses(opts.broker_addresses); action=True + qmf_broker._method('setBrokerAddresses', {'brokerAddresses':opts.broker_addresses}, ha_broker) + action=True if opts.client_addresses: - hb.ha_broker.setClientAddresses(opts.client_addresses); action=True - if opts.query or not action: hb.query() + qmf_broker._method('setClientAddresses', {'clientAddresses':opts.client_addresses}, ha_broker) + action=True + if opts.query or not action: + hb = get_ha_broker(qmf_broker) + print "status=%s"%hb["status"] + print "broker-addresses=%s"%hb["brokerAddresses"] + print "client-addresses=%s"%hb["clientAddresses"] return 0 finally: - session.close() # Avoid errors shutting down threads. + conn.close() # Avoid errors shutting down threads. except Exception, e: + raise # FIXME aconway 2012-01-31: print e return 1 -- cgit v1.2.1