summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-02-13 16:18:02 +0000
committerAlan Conway <aconway@apache.org>2012-02-13 16:18:02 +0000
commit89e2163e63d4a8f444425a3f686792d73142dcee (patch)
treef1ab43410cab883a763bf303cf02a9e4c3fa7ca0
parentd1e2f3cdba108b57a5305607eb4336fbfd1f2a06 (diff)
downloadqpid-python-89e2163e63d4a8f444425a3f686792d73142dcee.tar.gz
QPID-3603: Speed up qpid-ha-tool with fast QMF2 method calls.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-2@1243578 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp2
-rw-r--r--qpid/cpp/src/qpid/ha/management-schema.xml1
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py5
-rwxr-xr-xqpid/tools/src/py/qpid-ha-tool151
4 files changed, 133 insertions, 26 deletions
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index c7692ac9a2..349b85431a 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -70,7 +70,7 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s)
throw Exception("Cannot start HA: management is disabled");
if (ma) {
_qmf::Package packageInit(ma);
- mgmtObject = new _qmf::HaBroker(ma, this);
+ mgmtObject = new _qmf::HaBroker(ma, this, "ha-broker");
mgmtObject->set_status(BACKUP);
ma->addObject(mgmtObject);
}
diff --git a/qpid/cpp/src/qpid/ha/management-schema.xml b/qpid/cpp/src/qpid/ha/management-schema.xml
index 47b8880b35..fe4a14d111 100644
--- a/qpid/cpp/src/qpid/ha/management-schema.xml
+++ b/qpid/cpp/src/qpid/ha/management-schema.xml
@@ -21,6 +21,7 @@
<!-- Monitor and control HA status of a broker. -->
<class name="HaBroker">
+ <property name="name" type="sstr" access="RC" index="y" desc="Primary Key"/>
<property name="status" type="sstr" desc="HA status: primary or backup"/>
<property name="clientAddresses" type="sstr" desc="List of addresses used by clients to connect to the HA cluster."/>
<property name="brokerAddresses" type="sstr" desc="List of addresses used by HA brokers to connect to each other."/>
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index ddcc19ee1e..f28b31dd03 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -281,12 +281,12 @@ class ShortTests(BrokerTest):
assert retry(lambda: not is_running(primary.pid)) # Wait for primary to die
backup.promote()
n = receiver.received # Make sure we are still running
- assert retry(lambda: receiver.received > n + 10)
+ # FIXME aconway 2012-02-01: c++ client has 1 sec min retry, hence long timeout
+ assert retry(lambda: receiver.received > n + 10, timeout=5)
sender.stop()
receiver.stop()
def test_backup_failover(self):
- # FIXME aconway 2012-01-30: UNFINISHED
brokers = [ HaBroker(self, name=name, expect=EXPECT_EXIT_FAIL)
for name in ["a","b","c"] ]
url = ",".join([b.host_port() for b in brokers])
@@ -300,7 +300,6 @@ class ShortTests(BrokerTest):
brokers[2].promote() # c must fail over to b.
brokers[2].connect().session().sender("q").send("b")
self.assert_browse_backup(brokers[1], "q", ["a","b"])
- # FIXME aconway 2012-01-30: finish
for b in brokers[1:]: b.kill()
if __name__ == "__main__":
diff --git a/qpid/tools/src/py/qpid-ha-tool b/qpid/tools/src/py/qpid-ha-tool
index 97cbd617d9..8e8107657c 100755
--- a/qpid/tools/src/py/qpid-ha-tool
+++ b/qpid/tools/src/py/qpid-ha-tool
@@ -21,6 +21,114 @@
import qmf.console, optparse, sys
from qpid.management import managementChannel, managementClient
+from qpid.messaging import Connection
+from qpid.messaging import Message as QpidMessage
+try:
+ from uuid import uuid4
+except ImportError:
+ from qpid.datatypes import uuid4
+
+# Utility for doing fast qmf2 operations on a broker.
+class QmfBroker(object):
+ def __init__(self, conn):
+ self.conn = conn
+ self.sess = self.conn.session()
+ self.reply_to = "qmf.default.topic/direct.%s;{node:{type:topic}, link:{x-declare:{auto-delete:True,exclusive:True}}}" % \
+ str(uuid4())
+ self.reply_rx = self.sess.receiver(self.reply_to)
+ self.reply_rx.capacity = 10
+ self.tx = self.sess.sender("qmf.default.direct/broker")
+ self.next_correlator = 1
+
+ def close(self):
+ self.conn.close()
+
+ def __repr__(self):
+ return "Qpid Broker: %s" % self.url
+
+ def _method(self, method, arguments, addr="org.apache.qpid.broker:broker:amqp-broker"):
+ props = {'method' : 'request',
+ 'qmf.opcode' : '_method_request',
+ 'x-amqp-0-10.app-id' : 'qmf2'}
+ correlator = str(self.next_correlator)
+ self.next_correlator += 1
+
+ content = {'_object_id' : {'_object_name' : addr},
+ '_method_name' : method,
+ '_arguments' : arguments}
+
+ message = QpidMessage(content, reply_to=self.reply_to, correlation_id=correlator,
+ properties=props, subject="broker")
+ self.tx.send(message)
+ response = self.reply_rx.fetch(10)
+ if response.properties['qmf.opcode'] == '_exception':
+ raise Exception("Exception from Agent: %r" % response.content['_values'])
+ if response.properties['qmf.opcode'] != '_method_response':
+ raise Exception("bad response: %r" % response.properties)
+ return response.content['_arguments']
+
+ def _sendRequest(self, opcode, content):
+ props = {'method' : 'request',
+ 'qmf.opcode' : opcode,
+ 'x-amqp-0-10.app-id' : 'qmf2'}
+ correlator = str(self.next_correlator)
+ self.next_correlator += 1
+ message = QpidMessage(content, reply_to=self.reply_to, correlation_id=correlator,
+ properties=props, subject="broker")
+ self.tx.send(message)
+ return correlator
+
+ def _doClassQuery(self, class_name):
+ query = {'_what' : 'OBJECT',
+ '_schema_id' : {'_class_name' : class_name}}
+ correlator = self._sendRequest('_query_request', query)
+ response = self.reply_rx.fetch(10)
+ if response.properties['qmf.opcode'] != '_query_response':
+ raise Exception("bad response")
+ items = []
+ done = False
+ while not done:
+ for item in response.content:
+ items.append(item['_values'])
+ if 'partial' in response.properties:
+ response = self.reply_rx.fetch(10)
+ else:
+ done = True
+ return items
+
+ def _doNameQuery(self, class_name, object_name, package_name='org.apache.qpid.broker'):
+ query = {'_what' : 'OBJECT',
+ '_object_id' : {'_object_name' : "%s:%s:%s" % (package_name, class_name, object_name)}}
+ correlator = self._sendRequest('_query_request', query)
+ response = self.reply_rx.fetch(10)
+ if response.properties['qmf.opcode'] != '_query_response':
+ raise Exception("bad response")
+ items = []
+ done = False
+ while not done:
+ for item in response.content:
+ items.append(item['_values'])
+ if 'partial' in response.properties:
+ response = self.reply_rx.fetch(10)
+ else:
+ done = True
+ if len(items) == 1:
+ return items[0]
+ return None
+
+ def _getAllBrokerObjects(self, cls):
+ items = self._doClassQuery(cls.__name__.lower())
+ objs = []
+ for item in items:
+ objs.append(cls(self, item))
+ return objs
+
+ def _getBrokerObject(self, cls, name):
+ obj = self._doNameQuery(cls.__name__.lower(), name)
+ if obj:
+ return cls(self, obj)
+ return None
+
op=optparse.OptionParser(usage="Usage: %prog [options] [broker-address]")
@@ -33,42 +141,41 @@ op.add_option("-b", "--broker-addresses", action="store", type="string",
op.add_option("-q", "--query", action="store_true",
help="Show the current HA settings on the broker.")
-class HaBroker:
- def __init__(self, session, broker):
- self.session = session
- self.qmf_broker = self.session.addBroker(
- broker, client_properties={"qpid.ha-admin":1})
- ha_brokers = self.session.getObjects(
- _class="habroker", _package="org.apache.qpid.ha")
- if (not ha_brokers): raise Exception("Broker does not have HA enabled.")
- self.ha_broker = ha_brokers[0]
-
- def query(self):
- self.ha_broker.update()
- print "status=", self.ha_broker.status
- print "broker-addresses=", self.ha_broker.brokerAddresses
- print "client-addresses=", self.ha_broker.clientAddresses
+def get_ha_broker(qmf_broker):
+ ha_brokers = qmf_broker._doClassQuery("habroker")
+ if (not ha_brokers): raise Exception("Broker does not have HA enabled.")
+ return ha_brokers[0]
def main(argv):
try:
opts, args = op.parse_args(argv)
if len(args) >1: broker = args[1]
else: broker = "localhost:5672"
- session = qmf.console.Session()
+ conn = Connection.establish(broker, client_properties={"qpid.ha-admin":1})
+ ha_broker = "org.apache.qpid.ha:habroker:ha-broker"
try:
- hb = HaBroker(session, broker)
+ qmf_broker = QmfBroker(conn)
+ get_ha_broker(qmf_broker) # Verify that HA is enabled
action=False
if opts.promote:
- hb.ha_broker.promote(); action=True
+ qmf_broker._method("promote", {}, ha_broker)
+ action=True
if opts.broker_addresses:
- hb.ha_broker.setBrokerAddresses(opts.broker_addresses); action=True
+ qmf_broker._method('setBrokerAddresses', {'brokerAddresses':opts.broker_addresses}, ha_broker)
+ action=True
if opts.client_addresses:
- hb.ha_broker.setClientAddresses(opts.client_addresses); action=True
- if opts.query or not action: hb.query()
+ qmf_broker._method('setClientAddresses', {'clientAddresses':opts.client_addresses}, ha_broker)
+ action=True
+ if opts.query or not action:
+ hb = get_ha_broker(qmf_broker)
+ print "status=%s"%hb["status"]
+ print "broker-addresses=%s"%hb["brokerAddresses"]
+ print "client-addresses=%s"%hb["clientAddresses"]
return 0
finally:
- session.close() # Avoid errors shutting down threads.
+ conn.close() # Avoid errors shutting down threads.
except Exception, e:
+ raise # FIXME aconway 2012-01-31:
print e
return 1