From 1d3b4560f8a7f212976b536376a976b3b41f489b Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Thu, 24 Apr 2014 17:54:05 +0000 Subject: QPID-5719: HA becomes unresponsive once any of the brokers are SIGSTOPed - Added timeout to qpid-ha. - qpidd init script pings broker to verify it is not hung. - updated documentation in qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml. The new results for the cases mentioned in the bug: a] stopped ALL brokers: rgmanager restarts the entire cluster but data is lost. Equivalent to killing all the brokers at once. This does not affect quorum because only qpidd services are affected, not other services managed by cman. b] stopped the primary: rgmanager restarts the primary after a timeout and promotes one of the backups. c] stopped a backup: rgmanager restarts the backup after a timeout. Clients that are actively sending messages may see a delay while the backup is restarted. Note that you need to set link-heartbeat-interval in qpidd.conf. The default is very high (120 seconds); it should be set lower to see recovery from SIGSTOP in a reasonable time. See the updated documentation in qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1589807 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/tools/src/py/qpid-ha | 70 ++++++++++++++++++++++++++++++++--------------- 1 file changed, 48 insertions(+), 22 deletions(-) (limited to 'qpid/tools') diff --git a/qpid/tools/src/py/qpid-ha b/qpid/tools/src/py/qpid-ha index 1b22213d0c..8d91ac829b 100755 --- a/qpid/tools/src/py/qpid-ha +++ b/qpid/tools/src/py/qpid-ha @@ -22,6 +22,7 @@ import optparse, sys, time, os, re from qpid.messaging import Connection from qpid.messaging import Message as QpidMessage +from qpid.util import URL from qpidtoollibs.broker import BrokerAgent from qpidtoollibs.config import parse_qpidd_conf try: @@ -31,6 +32,10 @@ except ImportError: # QMF address for the HA broker object. 
HA_BROKER = "org.apache.qpid.ha:habroker:ha-broker" +# Define these defaults here rather than in add_option because we want +# to use qpidd.conf for defaults if --config is specified and +# these defaults otherwise: +DEFAULTS = { "broker":"localhost", "timeout":10.0} class ExitStatus(Exception): """Raised if a command want's a non-0 exit status from the script""" @@ -40,31 +45,41 @@ class Command: commands = {} def add(self, optname, metavar, type, help): - self.op.add_option(optname, metavar=metavar, type=type, help=help, action="store") + self.op.add_option(optname, metavar=metavar, type=type, help=help) - def __init__(self, name, help, arg_names=[]): + def __init__(self, name, help, arg_names=[], connect_agent=True): + """@param connect_agent true if we should establish a QMF agent connection""" Command.commands[name] = self self.name = name + self.connect_agent = connect_agent self.arg_names = arg_names usage="%s [options] %s\n\n%s"%(name, " ".join(arg_names), help) self.help = help self.op=optparse.OptionParser(usage) - self.op.add_option("-b", "--broker", action="store", type="string", default="localhost:5672", metavar="
", help="Address of qpidd broker with syntax: [username/password@] hostname | ip-address [:]") - self.op.add_option("--sasl-mechanism", metavar="", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.") - self.op.add_option("--ssl-certificate", action="store", type="string", metavar="", help="Client SSL certificate (PEM Format)") - self.op.add_option("--ssl-key", action="store", type="string", metavar="", help="Client SSL private key (PEM Format)") + def help_default(what): return " (Default %s)"%DEFAULTS[what] + self.op.add_option("-b", "--broker", metavar="
", help="Address of qpidd broker with syntax: [username/password@] hostname | ip-address [:]"+help_default("broker")) + self.op.add_option("--timeout", type="float", metavar="", help="Give up if the broker does not respond within the timeout. 0 means wait forever"+help_default("timeout")) + self.op.add_option("--sasl-mechanism", metavar="", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override") + self.op.add_option("--ssl-certificate", metavar="", help="Client SSL certificate (PEM Format)") + self.op.add_option("--ssl-key", metavar="", help="Client SSL private key (PEM Format)") self.op.add_option("--config", metavar="", help="Connect to the local qpidd by reading its configuration file.") def connect(self, opts): conn_options = {} - if opts.config: # Use broker config file. + if not opts.broker: opts.broker = DEFAULTS["broker"] + url = URL(opts.broker) + if opts.config: # Use broker config file for defaults config = parse_qpidd_conf(opts.config) - def joinif(separator, items): return separator.join(filter(None, items)) - userpass = joinif("/", [config.get("ha-username"), config.get("ha-password")]) - hostport = joinif(":", ["localhost", config.get("port")]) - opts.broker = joinif("@", [userpass, hostport]) - opts.sasl_mechanism = config.get("ha-mechanism") - + if not url.user: url.user = config.get("ha-username") + if not url.password: url.password = config.get("ha-password") + if not url.port: url.port = config.get("port") + opts.broker = str(url) + if not opts.sasl_mechanism: opts.sasl_mechanism = config.get("ha-mechanism") + if not opts.timeout: + timeout = config.get("ha-heartbeat-interval") or config.get("link-heartbeat-interval") + if timeout: opts.timeout = float(timeout) + else: # Use DEFAULTS + if not opts.timeout: opts.timeout = DEFAULTS["timeout"] if opts.sasl_mechanism: conn_options['sasl_mechanisms'] = 
opts.sasl_mechanism if opts.ssl_certificate: conn_options['ssl_certfile'] = opts.ssl_certificate if opts.ssl_key: @@ -72,9 +87,12 @@ class Command: self.op.error("missing '--ssl-certificate' (required by '--ssl-key')") conn_options['ssl_keyfile'] = opts.ssl_key conn_options['client_properties'] = {'qpid.ha-admin' : 1} + if opts.timeout: + conn_options['timeout'] = opts.timeout + conn_options['heartbeat'] = int(opts.timeout) connection = Connection.establish(opts.broker, **conn_options) - qmf_broker = BrokerAgent(connection) - ha_broker = qmf_broker.getHaBroker() + qmf_broker = self.connect_agent and BrokerAgent(connection) + ha_broker = self.connect_agent and qmf_broker.getHaBroker() return (connection, qmf_broker, ha_broker) def execute(self, args): @@ -82,14 +100,22 @@ class Command: if len(args) != len(self.arg_names)+1: self.op.print_help() raise Exception("Wrong number of arguments") - connection, qmf_broker, ha_broker = self.connect(opts) - if not ha_broker: raise Exception("HA module is not loaded on broker at %s" % opts.broker) + self.connection, qmf_broker, ha_broker = self.connect(opts) + if self.connect_agent and not ha_broker: + raise Exception("HA module is not loaded on broker at %s" % opts.broker) try: self.do_execute(qmf_broker, ha_broker, opts, args) - finally: connection.close() + finally: self.connection.close() def do_execute(self, qmf_broker, opts, args): raise Exception("Command '%s' is not yet implemented"%self.name) +class PingCmd(Command): + def __init__(self): + Command.__init__(self, "ping","Check if the broker is alive and responding", connect_agent=False) + def do_execute(self, qmf_broker, ha_broker, opts, args): + self.connection.session() # Make sure we can establish a session. 
+PingCmd() + class PromoteCmd(Command): def __init__(self): Command.__init__(self, "promote","Promote broker from backup to primary") @@ -101,19 +127,20 @@ class StatusCmd(Command): def __init__(self): Command.__init__(self, "status", "Print HA status") self.op.add_option( - "--expect", type="string", metavar="", + "--expect", metavar="", help="Don't print status. Return 0 if it matches , 1 otherwise") self.op.add_option( "--is-primary", action="store_true", default=False, help="Don't print status. Return 0 if the broker is primary, 1 otherwise") self.op.add_option( "--all", action="store_true", default=False, - help="Print status for all brokers in the cluster.") + help="Print status for all brokers in the cluster") def do_execute(self, qmf_broker, ha_broker, opts, args): if opts.is_primary: if not ha_broker.status in ["active", "recovering"]: raise ExitStatus(1) if opts.expect: if opts.expect != ha_broker.status: raise ExitStatus(1) + # The brokersUrl setting is not in Python URL format; simpler parsing here. brokers = filter(None, re.sub(r'(^amqps?:)|(tcp:)', "", ha_broker.brokersUrl).split(",")) if opts.all and brokers: opts.all=False @@ -129,7 +156,6 @@ class StatusCmd(Command): print b, e else: print ha_broker.status - StatusCmd() class ReplicateCmd(Command): @@ -200,7 +226,7 @@ def main(argv): except ExitStatus, e: return e.status except Exception, e: - print e + print "%s: %s"%(type(e).__name__, e) return 1 if __name__ == "__main__": -- cgit v1.2.1