diff options
Diffstat (limited to 'qpid/tools/src/py/qpid-ha')
-rwxr-xr-x | qpid/tools/src/py/qpid-ha | 296 |
1 files changed, 296 insertions, 0 deletions
diff --git a/qpid/tools/src/py/qpid-ha b/qpid/tools/src/py/qpid-ha new file mode 100755 index 0000000000..76d9abde79 --- /dev/null +++ b/qpid/tools/src/py/qpid-ha @@ -0,0 +1,296 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import optparse, sys, time, os, re, math +from qpid.messaging import Connection +from qpid.messaging import Message as QpidMessage +from qpid.util import URL +from qpidtoollibs.broker import BrokerAgent +from qpidtoollibs.config import parse_qpidd_conf +try: + from uuid import uuid4 +except ImportError: + from qpid.datatypes import uuid4 + +# QMF address for the HA broker object. +HA_BROKER = "org.apache.qpid.ha:habroker:ha-broker" +# Define these defaults here rather than in add_option because we want +# to use qpidd.conf for defaults if --config is specified and +# these defaults otherwise: +DEFAULTS = { "broker":"0.0.0.0", "timeout":10.0} + +class ExitStatus(Exception): + """Raised if a command want's a non-0 exit status from the script""" + def __init__(self, status): self.status = status + +def find_qpidd_conf(): + """Return the path to the local qpid.conf file or None if it is not found""" + p = os.path + prefix, bin = p.split(p.dirname(__file__)) + if bin == "bin": # Installed in a standard place. + conf = p.join(prefix, "etc", "qpid", "qpidd.conf") + if p.isfile(conf): return conf + return None + +class Command(object): + """ + Common options and logic for all commands. Subclasses provide additional + options and execution logic. + """ + + commands = [] + + def __init__(self, name, help, arg_names=[], connect_agent=True): + """@param connect_agent true if we should establish a QMF agent connection""" + Command.commands.append(self) + self.name = name + self.connect_agent = connect_agent + self.arg_names = arg_names + usage="%s [options] %s\n\n%s"%(name, " ".join(arg_names), help) + self.help = help + self.op=optparse.OptionParser(usage) + common = optparse.OptionGroup(self.op, "Broker connection options") + def help_default(what): return " (Default %s)"%DEFAULTS[what] + common.add_option("-b", "--broker", metavar="<address>", help="Address of qpidd broker with syntax: [username/password@] hostname | ip-address [:<port>]"+help_default("broker")) + common.add_option("--timeout", type="float", metavar="<seconds>", help="Give up if the broker does not respond within the timeout. 0 means wait forever"+help_default("timeout")) + common.add_option("--sasl-mechanism", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override") + common.add_option("--ssl-certificate", metavar="<cert>", help="Client SSL certificate (PEM Format)") + common.add_option("--ssl-key", metavar="<key>", help="Client SSL private key (PEM Format)") + common.add_option("--config", metavar="<path/to/qpidd.conf>", help="Read default connection configuration from the qpidd.conf broker configuration file. Defaults are overridden by command-line options.)") + self.op.add_option_group(common) + + def connect(self, opts): + conn_options = {} + if not opts.broker: + opts.broker = DEFAULTS["broker"] + # If we are connecting locally, use local qpidd.conf by default + if not opts.config: opts.config = find_qpidd_conf() + url = URL(opts.broker) + if opts.config: # Use broker config file for defaults + config = parse_qpidd_conf(opts.config) + if not url.user: url.user = config.get("ha-username") + if not url.password: url.password = config.get("ha-password") + if not url.port: url.port = config.get("port") + opts.broker = str(url) + if not opts.sasl_mechanism: opts.sasl_mechanism = config.get("ha-mechanism") + if not opts.timeout: + timeout = config.get("ha-heartbeat-interval") or config.get("link-heartbeat-interval") + if timeout: opts.timeout = float(timeout) + else: # Use DEFAULTS + if not opts.timeout: opts.timeout = DEFAULTS["timeout"] + if opts.sasl_mechanism: conn_options['sasl_mechanisms'] = opts.sasl_mechanism + if opts.ssl_certificate: conn_options['ssl_certfile'] = opts.ssl_certificate + if opts.ssl_key: + if not opts.ssl_certificate: + self.op.error("missing '--ssl-certificate' (required by '--ssl-key')") + conn_options['ssl_keyfile'] = opts.ssl_key + conn_options['client_properties'] = {'qpid.ha-admin' : 1} + if opts.timeout: + conn_options['timeout'] = opts.timeout + conn_options['heartbeat'] = int(math.ceil(opts.timeout/2)) + connection = Connection.establish(opts.broker, **conn_options) + qmf_broker = self.connect_agent and BrokerAgent(connection) + ha_broker = self.connect_agent and qmf_broker.getHaBroker() + return (connection, qmf_broker, ha_broker) + + def all_brokers(self, ha_broker, opts, func): + """@return: List of (broker_addr, ha_broker) for all brokers in the cluster. + Returns (broker_addr, Exception) if an exception is raised accessing a broker. + """ + # The brokersUrl setting is not in python URL format, simpler parsing here. + result = [] + brokers = filter(None, re.sub(r'(^amqps?:)|(tcp:)', "", ha_broker.brokersUrl).split(",")) + if brokers and opts.all: + if "@" in opts.broker: userpass = opts.broker.split("@")[0] + else: userpass = None + for b in brokers: + if userpass and not "@" in b: opts.broker = userpass+"@"+b + else: opts.broker = b + try: + connection, qmf_broker, ha_broker = self.connect(opts) + func(ha_broker, b) + except Exception,e: + func(ha_broker, b, e) + else: + func(ha_broker) + + def execute(self, args): + opts, args = self.op.parse_args(args) + if len(args) != len(self.arg_names)+1: + self.op.print_help() + raise Exception("Wrong number of arguments") + self.connection, qmf_broker, ha_broker = self.connect(opts) + if self.connect_agent and not ha_broker: + raise Exception("HA module is not loaded on broker at %s" % opts.broker) + try: self.do_execute(qmf_broker, ha_broker, opts, args) + finally: self.connection.close() + + def do_execute(self, qmf_broker, opts, args): + raise Exception("Command '%s' is not yet implemented"%self.name) + +class ManagerCommand(Command): + """ + Base for commands that should only be used by a cluster manager tool that ensures + cluster consistency. + """ + + manager_commands = [] # Cluster manager commands + + def __init__(self, name, help, arg_names=[], connect_agent=True): + """@param connect_agent true if we should establish a QMF agent connection""" + super(ManagerCommand, self).__init__(name, "[Cluster manager only] "+help, arg_names, connect_agent) + self.commands.remove(self) # Not a user command + self.manager_commands.append(self) + + +class PingCmd(Command): + def __init__(self): + Command.__init__(self, "ping","Check if the broker is alive and responding", connect_agent=False) + def do_execute(self, qmf_broker, ha_broker, opts, args): + self.connection.session() # Make sure we can establish a session. +PingCmd() + +class PromoteCmd(ManagerCommand): + def __init__(self): + super(PromoteCmd, self).__init__("promote", "Promote a backup broker to primary. This command should *only* be used by a cluster manager (such as rgmanager) that ensures only one broker is primary at a time. Promoting more than one broker to primary at the same time will make the cluster inconsistent and will cause data loss and unexpected behavior.") + + def do_execute(self, qmf_broker, ha_broker, opts, args): + qmf_broker._method("promote", {}, HA_BROKER, timeout=opts.timeout) + +PromoteCmd() + + +class StatusCmd(Command): + def __init__(self): + Command.__init__(self, "status", "Print HA status") + self.op.add_option( + "--expect", metavar="<status>", + help="Don't print status. Return 0 if it matches <status>, 1 otherwise") + self.op.add_option( + "--is-primary", action="store_true", default=False, + help="Don't print status. Return 0 if the broker is primary, 1 otherwise") + self.op.add_option( + "--all", action="store_true", default=False, + help="Print status for all brokers in the cluster") + + def do_execute(self, qmf_broker, ha_broker, opts, args): + if opts.is_primary: + if not ha_broker.status in ["active", "recovering"]: raise ExitStatus(1) + return + if opts.expect: + if opts.expect != ha_broker.status: raise ExitStatus(1) + return + + def status(hb, b=None, ex=None): + if ex: print b, ex + elif b: print b, hb.status + else: print hb.status + self.all_brokers(ha_broker, opts, status) + +StatusCmd() + +class ReplicateCmd(Command): + def __init__(self): + Command.__init__(self, "replicate", "Set up replication from <queue> on <remote-broker> to <queue> on the current broker.", ["<queue>", "<remote-broker>"]) + def do_execute(self, qmf_broker, ha_broker, opts, args): + qmf_broker._method("replicate", {"broker":args[1], "queue":args[2]}, HA_BROKER, timeout=opts.timeout) +ReplicateCmd() + +class QueryCmd(Command): + def __init__(self): + Command.__init__(self, "query", "Print HA configuration and status") + self.op.add_option( + "--all", action="store_true", default=False, + help="Print configuration and status for all brokers in the cluster") + + def do_execute(self, qmf_broker, ha_broker, opts, args): + def query(hb, b=None, ex=None): + if ex: + print "%s %s\n" % (b, ex) + else: + if b: + print "%-20s %s"%("Address:", b) + for x in [("Status:", hb.status), + ("Broker ID:", hb.systemId), + ("Brokers URL:", hb.brokersUrl), + ("Public URL:", hb.publicUrl), + ("Replicate: ", hb.replicateDefault) + ]: + print "%-20s %s"%x + if b: print + self.all_brokers(ha_broker, opts, query) + + +QueryCmd() + +def print_usage(prog): + print "usage: %s <command> [<arguments>]\n\nCommands are:\n"%prog + for cmd in Command.commands: + print " %-12s %s."%(cmd.name, cmd.help.split(".")[0]) + print "\nFor help with a command type: %s <command> --help\n"%prog + +def find_command(args, commands): + """Find a command among the arguments and options""" + for arg in args: + cmds = [cmd for cmd in commands if cmd.name == arg] + if cmds: return cmds[0] + return None + +def main_except(argv): + """This version of main raises exceptions""" + args = argv[1:] + commands = Command.commands + if "--cluster-manager" in args: + commands += ManagerCommand.manager_commands + args.remove("--cluster-manager") + if len(args) and args[0] in ['help', '--help', '-help', '-h', 'help-all', '--help-all']: + if 'help-all' in args[0]: + for c in commands: c.op.print_help(); print + else: + print_usage(os.path.basename(argv[0])); + else: + command = find_command(args, commands) + if command: + command.execute(args) + else: + # Check for attempt to use a manager command without --cluster-manager + command = find_command(args, ManagerCommand.manager_commands) + if command: + message="""'%s' should only be called by the cluster manager. +Incorrect use of '%s' will cause cluster malfunction. +To call from a cluster manager use '%s --cluster-manager'. """ + raise Exception(message%((command.name,)*3)) + else: + print_usage(os.path.basename(argv[0])); + raise Exception("No valid command") + +def main(argv): + try: + main_except(argv) + return 0 + except ExitStatus, e: + return e.status + except Exception, e: + print "%s: %s"%(type(e).__name__, e) + return 1 + +if __name__ == "__main__": + sys.exit(main(sys.argv)) |