diff options
Diffstat (limited to 'qpid/tools/src/py/qpid-ha-tool')
-rwxr-xr-x | qpid/tools/src/py/qpid-ha-tool | 183 |
1 files changed, 183 insertions, 0 deletions
diff --git a/qpid/tools/src/py/qpid-ha-tool b/qpid/tools/src/py/qpid-ha-tool new file mode 100755 index 0000000000..8e8107657c --- /dev/null +++ b/qpid/tools/src/py/qpid-ha-tool @@ -0,0 +1,183 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import qmf.console, optparse, sys +from qpid.management import managementChannel, managementClient +from qpid.messaging import Connection +from qpid.messaging import Message as QpidMessage +try: + from uuid import uuid4 +except ImportError: + from qpid.datatypes import uuid4 + +# Utility for doing fast qmf2 operations on a broker. +class QmfBroker(object): + def __init__(self, conn): + self.conn = conn + self.sess = self.conn.session() + self.reply_to = "qmf.default.topic/direct.%s;{node:{type:topic}, link:{x-declare:{auto-delete:True,exclusive:True}}}" % \ + str(uuid4()) + self.reply_rx = self.sess.receiver(self.reply_to) + self.reply_rx.capacity = 10 + self.tx = self.sess.sender("qmf.default.direct/broker") + self.next_correlator = 1 + + def close(self): + self.conn.close() + + def __repr__(self): + return "Qpid Broker: %s" % self.url + + def _method(self, method, arguments, addr="org.apache.qpid.broker:broker:amqp-broker"): + props = {'method' : 'request', + 'qmf.opcode' : '_method_request', + 'x-amqp-0-10.app-id' : 'qmf2'} + correlator = str(self.next_correlator) + self.next_correlator += 1 + + content = {'_object_id' : {'_object_name' : addr}, + '_method_name' : method, + '_arguments' : arguments} + + message = QpidMessage(content, reply_to=self.reply_to, correlation_id=correlator, + properties=props, subject="broker") + self.tx.send(message) + response = self.reply_rx.fetch(10) + if response.properties['qmf.opcode'] == '_exception': + raise Exception("Exception from Agent: %r" % response.content['_values']) + if response.properties['qmf.opcode'] != '_method_response': + raise Exception("bad response: %r" % response.properties) + return response.content['_arguments'] + + def _sendRequest(self, opcode, content): + props = {'method' : 'request', + 'qmf.opcode' : opcode, + 'x-amqp-0-10.app-id' : 'qmf2'} + correlator = str(self.next_correlator) + self.next_correlator += 1 + message = QpidMessage(content, reply_to=self.reply_to, correlation_id=correlator, + properties=props, subject="broker") + self.tx.send(message) + return correlator + + def _doClassQuery(self, class_name): + query = {'_what' : 'OBJECT', + '_schema_id' : {'_class_name' : class_name}} + correlator = self._sendRequest('_query_request', query) + response = self.reply_rx.fetch(10) + if response.properties['qmf.opcode'] != '_query_response': + raise Exception("bad response") + items = [] + done = False + while not done: + for item in response.content: + items.append(item['_values']) + if 'partial' in response.properties: + response = self.reply_rx.fetch(10) + else: + done = True + return items + + def _doNameQuery(self, class_name, object_name, package_name='org.apache.qpid.broker'): + query = {'_what' : 'OBJECT', + '_object_id' : {'_object_name' : "%s:%s:%s" % (package_name, class_name, object_name)}} + correlator = self._sendRequest('_query_request', query) + response = self.reply_rx.fetch(10) + if response.properties['qmf.opcode'] != '_query_response': + raise Exception("bad response") + items = [] + done = False + while not done: + for item in response.content: + items.append(item['_values']) + if 'partial' in response.properties: + response = self.reply_rx.fetch(10) + else: + done = True + if len(items) == 1: + return items[0] + return None + + def _getAllBrokerObjects(self, cls): + items = self._doClassQuery(cls.__name__.lower()) + objs = [] + for item in items: + objs.append(cls(self, item)) + return objs + + def _getBrokerObject(self, cls, name): + obj = self._doNameQuery(cls.__name__.lower(), name) + if obj: + return cls(self, obj) + return None + + +op=optparse.OptionParser(usage="Usage: %prog [options] [broker-address]") + +op.add_option("-p", "--promote", action="store_true", + help="Promote a backup broker to become the primary.") +op.add_option("-c", "--client-addresses", action="store", type="string", + help="Set list of addresses used by clients to connect to the HA cluster.") +op.add_option("-b", "--broker-addresses", action="store", type="string", + help="Set list of addresses used by HA brokers to connect to each other.") +op.add_option("-q", "--query", action="store_true", + help="Show the current HA settings on the broker.") + +def get_ha_broker(qmf_broker): + ha_brokers = qmf_broker._doClassQuery("habroker") + if (not ha_brokers): raise Exception("Broker does not have HA enabled.") + return ha_brokers[0] + +def main(argv): + try: + opts, args = op.parse_args(argv) + if len(args) >1: broker = args[1] + else: broker = "localhost:5672" + conn = Connection.establish(broker, client_properties={"qpid.ha-admin":1}) + ha_broker = "org.apache.qpid.ha:habroker:ha-broker" + try: + qmf_broker = QmfBroker(conn) + get_ha_broker(qmf_broker) # Verify that HA is enabled + action=False + if opts.promote: + qmf_broker._method("promote", {}, ha_broker) + action=True + if opts.broker_addresses: + qmf_broker._method('setBrokerAddresses', {'brokerAddresses':opts.broker_addresses}, ha_broker) + action=True + if opts.client_addresses: + qmf_broker._method('setClientAddresses', {'clientAddresses':opts.client_addresses}, ha_broker) + action=True + if opts.query or not action: + hb = get_ha_broker(qmf_broker) + print "status=%s"%hb["status"] + print "broker-addresses=%s"%hb["brokerAddresses"] + print "client-addresses=%s"%hb["clientAddresses"] + return 0 + finally: + conn.close() # Avoid errors shutting down threads. + except Exception, e: + raise # FIXME aconway 2012-01-31: + print e + return 1 + +if __name__ == "__main__": + sys.exit(main(sys.argv)) |