summaryrefslogtreecommitdiff
path: root/qpid/tools/src/py/qpid-ha
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/tools/src/py/qpid-ha')
-rwxr-xr-xqpid/tools/src/py/qpid-ha296
1 files changed, 296 insertions, 0 deletions
diff --git a/qpid/tools/src/py/qpid-ha b/qpid/tools/src/py/qpid-ha
new file mode 100755
index 0000000000..76d9abde79
--- /dev/null
+++ b/qpid/tools/src/py/qpid-ha
@@ -0,0 +1,296 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import optparse, sys, time, os, re, math
+from qpid.messaging import Connection
+from qpid.messaging import Message as QpidMessage
+from qpid.util import URL
+from qpidtoollibs.broker import BrokerAgent
+from qpidtoollibs.config import parse_qpidd_conf
+try:
+ from uuid import uuid4
+except ImportError:
+ from qpid.datatypes import uuid4
+
+# QMF address for the HA broker object.
+HA_BROKER = "org.apache.qpid.ha:habroker:ha-broker"
+# Define these defaults here rather than in add_option because we want
+# to use qpidd.conf for defaults if --config is specified and
+# these defaults otherwise:
+DEFAULTS = { "broker":"0.0.0.0", "timeout":10.0}
+
+class ExitStatus(Exception):
+ """Raised if a command want's a non-0 exit status from the script"""
+ def __init__(self, status): self.status = status
+
+def find_qpidd_conf():
+ """Return the path to the local qpid.conf file or None if it is not found"""
+ p = os.path
+ prefix, bin = p.split(p.dirname(__file__))
+ if bin == "bin": # Installed in a standard place.
+ conf = p.join(prefix, "etc", "qpid", "qpidd.conf")
+ if p.isfile(conf): return conf
+ return None
+
+class Command(object):
+ """
+ Common options and logic for all commands. Subclasses provide additional
+ options and execution logic.
+ """
+
+ commands = []
+
+ def __init__(self, name, help, arg_names=[], connect_agent=True):
+ """@param connect_agent true if we should establish a QMF agent connection"""
+ Command.commands.append(self)
+ self.name = name
+ self.connect_agent = connect_agent
+ self.arg_names = arg_names
+ usage="%s [options] %s\n\n%s"%(name, " ".join(arg_names), help)
+ self.help = help
+ self.op=optparse.OptionParser(usage)
+ common = optparse.OptionGroup(self.op, "Broker connection options")
+ def help_default(what): return " (Default %s)"%DEFAULTS[what]
+ common.add_option("-b", "--broker", metavar="<address>", help="Address of qpidd broker with syntax: [username/password@] hostname | ip-address [:<port>]"+help_default("broker"))
+ common.add_option("--timeout", type="float", metavar="<seconds>", help="Give up if the broker does not respond within the timeout. 0 means wait forever"+help_default("timeout"))
+ common.add_option("--sasl-mechanism", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override")
+ common.add_option("--ssl-certificate", metavar="<cert>", help="Client SSL certificate (PEM Format)")
+ common.add_option("--ssl-key", metavar="<key>", help="Client SSL private key (PEM Format)")
+ common.add_option("--config", metavar="<path/to/qpidd.conf>", help="Read default connection configuration from the qpidd.conf broker configuration file. Defaults are overridden by command-line options.)")
+ self.op.add_option_group(common)
+
+ def connect(self, opts):
+ conn_options = {}
+ if not opts.broker:
+ opts.broker = DEFAULTS["broker"]
+ # If we are connecting locally, use local qpidd.conf by default
+ if not opts.config: opts.config = find_qpidd_conf()
+ url = URL(opts.broker)
+ if opts.config: # Use broker config file for defaults
+ config = parse_qpidd_conf(opts.config)
+ if not url.user: url.user = config.get("ha-username")
+ if not url.password: url.password = config.get("ha-password")
+ if not url.port: url.port = config.get("port")
+ opts.broker = str(url)
+ if not opts.sasl_mechanism: opts.sasl_mechanism = config.get("ha-mechanism")
+ if not opts.timeout:
+ timeout = config.get("ha-heartbeat-interval") or config.get("link-heartbeat-interval")
+ if timeout: opts.timeout = float(timeout)
+ else: # Use DEFAULTS
+ if not opts.timeout: opts.timeout = DEFAULTS["timeout"]
+ if opts.sasl_mechanism: conn_options['sasl_mechanisms'] = opts.sasl_mechanism
+ if opts.ssl_certificate: conn_options['ssl_certfile'] = opts.ssl_certificate
+ if opts.ssl_key:
+ if not opts.ssl_certificate:
+ self.op.error("missing '--ssl-certificate' (required by '--ssl-key')")
+ conn_options['ssl_keyfile'] = opts.ssl_key
+ conn_options['client_properties'] = {'qpid.ha-admin' : 1}
+ if opts.timeout:
+ conn_options['timeout'] = opts.timeout
+ conn_options['heartbeat'] = int(math.ceil(opts.timeout/2))
+ connection = Connection.establish(opts.broker, **conn_options)
+ qmf_broker = self.connect_agent and BrokerAgent(connection)
+ ha_broker = self.connect_agent and qmf_broker.getHaBroker()
+ return (connection, qmf_broker, ha_broker)
+
+ def all_brokers(self, ha_broker, opts, func):
+ """@return: List of (broker_addr, ha_broker) for all brokers in the cluster.
+ Returns (broker_addr, Exception) if an exception is raised accessing a broker.
+ """
+ # The brokersUrl setting is not in python URL format, simpler parsing here.
+ result = []
+ brokers = filter(None, re.sub(r'(^amqps?:)|(tcp:)', "", ha_broker.brokersUrl).split(","))
+ if brokers and opts.all:
+ if "@" in opts.broker: userpass = opts.broker.split("@")[0]
+ else: userpass = None
+ for b in brokers:
+ if userpass and not "@" in b: opts.broker = userpass+"@"+b
+ else: opts.broker = b
+ try:
+ connection, qmf_broker, ha_broker = self.connect(opts)
+ func(ha_broker, b)
+ except Exception,e:
+ func(ha_broker, b, e)
+ else:
+ func(ha_broker)
+
+ def execute(self, args):
+ opts, args = self.op.parse_args(args)
+ if len(args) != len(self.arg_names)+1:
+ self.op.print_help()
+ raise Exception("Wrong number of arguments")
+ self.connection, qmf_broker, ha_broker = self.connect(opts)
+ if self.connect_agent and not ha_broker:
+ raise Exception("HA module is not loaded on broker at %s" % opts.broker)
+ try: self.do_execute(qmf_broker, ha_broker, opts, args)
+ finally: self.connection.close()
+
+ def do_execute(self, qmf_broker, opts, args):
+ raise Exception("Command '%s' is not yet implemented"%self.name)
+
+class ManagerCommand(Command):
+ """
+ Base for commands that should only be used by a cluster manager tool that ensures
+ cluster consistency.
+ """
+
+ manager_commands = [] # Cluster manager commands
+
+ def __init__(self, name, help, arg_names=[], connect_agent=True):
+ """@param connect_agent true if we should establish a QMF agent connection"""
+ super(ManagerCommand, self).__init__(name, "[Cluster manager only] "+help, arg_names, connect_agent)
+ self.commands.remove(self) # Not a user command
+ self.manager_commands.append(self)
+
+
+class PingCmd(Command):
+ def __init__(self):
+ Command.__init__(self, "ping","Check if the broker is alive and responding", connect_agent=False)
+ def do_execute(self, qmf_broker, ha_broker, opts, args):
+ self.connection.session() # Make sure we can establish a session.
+PingCmd()
+
+class PromoteCmd(ManagerCommand):
+ def __init__(self):
+ super(PromoteCmd, self).__init__("promote", "Promote a backup broker to primary. This command should *only* be used by a cluster manager (such as rgmanager) that ensures only one broker is primary at a time. Promoting more than one broker to primary at the same time will make the cluster inconsistent and will cause data loss and unexpected behavior.")
+
+ def do_execute(self, qmf_broker, ha_broker, opts, args):
+ qmf_broker._method("promote", {}, HA_BROKER, timeout=opts.timeout)
+
+PromoteCmd()
+
+
+class StatusCmd(Command):
+ def __init__(self):
+ Command.__init__(self, "status", "Print HA status")
+ self.op.add_option(
+ "--expect", metavar="<status>",
+ help="Don't print status. Return 0 if it matches <status>, 1 otherwise")
+ self.op.add_option(
+ "--is-primary", action="store_true", default=False,
+ help="Don't print status. Return 0 if the broker is primary, 1 otherwise")
+ self.op.add_option(
+ "--all", action="store_true", default=False,
+ help="Print status for all brokers in the cluster")
+
+ def do_execute(self, qmf_broker, ha_broker, opts, args):
+ if opts.is_primary:
+ if not ha_broker.status in ["active", "recovering"]: raise ExitStatus(1)
+ return
+ if opts.expect:
+ if opts.expect != ha_broker.status: raise ExitStatus(1)
+ return
+
+ def status(hb, b=None, ex=None):
+ if ex: print b, ex
+ elif b: print b, hb.status
+ else: print hb.status
+ self.all_brokers(ha_broker, opts, status)
+
+StatusCmd()
+
+class ReplicateCmd(Command):
+ def __init__(self):
+ Command.__init__(self, "replicate", "Set up replication from <queue> on <remote-broker> to <queue> on the current broker.", ["<queue>", "<remote-broker>"])
+ def do_execute(self, qmf_broker, ha_broker, opts, args):
+ qmf_broker._method("replicate", {"broker":args[1], "queue":args[2]}, HA_BROKER, timeout=opts.timeout)
+ReplicateCmd()
+
+class QueryCmd(Command):
+ def __init__(self):
+ Command.__init__(self, "query", "Print HA configuration and status")
+ self.op.add_option(
+ "--all", action="store_true", default=False,
+ help="Print configuration and status for all brokers in the cluster")
+
+ def do_execute(self, qmf_broker, ha_broker, opts, args):
+ def query(hb, b=None, ex=None):
+ if ex:
+ print "%s %s\n" % (b, ex)
+ else:
+ if b:
+ print "%-20s %s"%("Address:", b)
+ for x in [("Status:", hb.status),
+ ("Broker ID:", hb.systemId),
+ ("Brokers URL:", hb.brokersUrl),
+ ("Public URL:", hb.publicUrl),
+ ("Replicate: ", hb.replicateDefault)
+ ]:
+ print "%-20s %s"%x
+ if b: print
+ self.all_brokers(ha_broker, opts, query)
+
+
+QueryCmd()
+
+def print_usage(prog):
+ print "usage: %s <command> [<arguments>]\n\nCommands are:\n"%prog
+ for cmd in Command.commands:
+ print " %-12s %s."%(cmd.name, cmd.help.split(".")[0])
+ print "\nFor help with a command type: %s <command> --help\n"%prog
+
+def find_command(args, commands):
+ """Find a command among the arguments and options"""
+ for arg in args:
+ cmds = [cmd for cmd in commands if cmd.name == arg]
+ if cmds: return cmds[0]
+ return None
+
+def main_except(argv):
+ """This version of main raises exceptions"""
+ args = argv[1:]
+ commands = Command.commands
+ if "--cluster-manager" in args:
+ commands += ManagerCommand.manager_commands
+ args.remove("--cluster-manager")
+ if len(args) and args[0] in ['help', '--help', '-help', '-h', 'help-all', '--help-all']:
+ if 'help-all' in args[0]:
+ for c in commands: c.op.print_help(); print
+ else:
+ print_usage(os.path.basename(argv[0]));
+ else:
+ command = find_command(args, commands)
+ if command:
+ command.execute(args)
+ else:
+ # Check for attempt to use a manager command without --cluster-manager
+ command = find_command(args, ManagerCommand.manager_commands)
+ if command:
+ message="""'%s' should only be called by the cluster manager.
+Incorrect use of '%s' will cause cluster malfunction.
+To call from a cluster manager use '%s --cluster-manager'. """
+ raise Exception(message%((command.name,)*3))
+ else:
+ print_usage(os.path.basename(argv[0]));
+ raise Exception("No valid command")
+
+def main(argv):
+ try:
+ main_except(argv)
+ return 0
+ except ExitStatus, e:
+ return e.status
+ except Exception, e:
+ print "%s: %s"%(type(e).__name__, e)
+ return 1
+
+if __name__ == "__main__":
+ sys.exit(main(sys.argv))