diff options
author | Alan Conway <aconway@apache.org> | 2014-04-24 17:54:05 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2014-04-24 17:54:05 +0000 |
commit | 1d3b4560f8a7f212976b536376a976b3b41f489b (patch) | |
tree | 82c4baadc8f4159bea4fa8ad872f9858061c727e | |
parent | 67f29e0685b4bfaa0721a25ae901c3b5e18c0db3 (diff) | |
download | qpid-python-1d3b4560f8a7f212976b536376a976b3b41f489b.tar.gz |
QPID-5719: HA becomes unresponsive once any of the brokers are SIGSTOPed
- Added timeout to qpid-ha.
- qpidd init script pings broker to verify it is not hung.
- updated documentation in qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml.
The new results for the cases mentioned in the bug:
a] stopped ALL brokers: rgmanager restarts the entire cluster but data is lost.
Equivalent to killing all the brokers at once. This does not affect quorum because
only qpidd services are affected, not other services managed by cman.
b] stopped the primary: rgmanager restarts the primary after a timeout and promotes one of the backups.
c] stopped a backup: rgmanager restarts the backups after a timeout.
Clients that are actively sending messages may see a delay while the backup is restarted.
Note that you need to set link-heartbeat-interval in qpidd.conf. The default is very
high (120 seconds); it should be set lower to see recovery from SIGSTOP in a
reasonable time.
See the updated documentation in qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1589807 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-x | qpid/cpp/etc/qpidd-primary.in | 3 | ||||
-rwxr-xr-x | qpid/cpp/etc/qpidd.in | 34 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/ha_test.py | 4 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 8 | ||||
-rw-r--r-- | qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml | 29 | ||||
-rwxr-xr-x | qpid/tools/src/py/qpid-ha | 70 |
6 files changed, 100 insertions, 48 deletions
diff --git a/qpid/cpp/etc/qpidd-primary.in b/qpid/cpp/etc/qpidd-primary.in index 377f2d623a..3119ebac6e 100755 --- a/qpid/cpp/etc/qpidd-primary.in +++ b/qpid/cpp/etc/qpidd-primary.in @@ -45,6 +45,7 @@ QPID_HA_OPTIONS="--config $QPID_CONFIG" # Source configuration test -f @sysconfdir@/sysconfig/$prog && source @sysconfdir@/sysconfig/$prog +source /etc/rc.d/init.d/functions # Check presence of executables/scripts for f in $QPID_INIT $QPID_HA; do @@ -53,8 +54,6 @@ done QPID_HA="$QPID_HA $QPID_HA_OPTIONS" -source /etc/rc.d/init.d/functions - RETVAL=0 status() { diff --git a/qpid/cpp/etc/qpidd.in b/qpid/cpp/etc/qpidd.in index 55697492e4..7db59e369f 100755 --- a/qpid/cpp/etc/qpidd.in +++ b/qpid/cpp/etc/qpidd.in @@ -41,32 +41,36 @@ pidfile=/var/run/qpidd.pid # The following variables can be overridden in @sysconfdir@/sysconfig/$prog QPID_BIN=@sbindir@/$prog -QPID_CONFIG=@confdir@/qpidd.conf QPID_DATA_DIR=/var/lib/qpidd +QPID_CONFIG=@confdir@/qpidd.conf +QPID_HA=@bindir@/qpid-ha +QPID_HA_OPTIONS="--config $QPID_CONFIG" # Source configuration -if [ -f @sysconfdir@/sysconfig/$prog ] ; then - . @sysconfdir@/sysconfig/$prog -fi +test -f @sysconfdir@/sysconfig/$prog && source @sysconfdir@/sysconfig/$prog +source /etc/rc.d/init.d/functions -# Source function library. -. 
/etc/rc.d/init.d/functions +# Check presence of executables/scripts +for f in $QPID_BIN $QPID_HA; do + test -x $f || { echo "$f not found or not executable"; exit 5; } +done -RETVAL=0 +QPID_HA="$QPID_HA $QPID_HA_OPTIONS" -#ensure binary is present and executable -if [[ !(-x @sbindir@/$prog) ]] ; then - echo "@sbindir@/$prog not found or not executable" - exit 5 -fi +RETVAL=0 -#ensure user has sufficient permissions +# Ensure user has sufficient permissions runuser -s /bin/sh qpidd -c "echo x > /dev/null" 2> /dev/null || RETVAL=4 if [ $RETVAL = 4 ]; then echo "user had insufficient privilege"; exit $RETVAL fi +do_status() { + # Check PID file and ping for liveness + status $prog && $QPID_HA ping +} + start() { echo -n $"Starting Qpid AMQP daemon: " daemon --pidfile $pidfile --check $prog --user qpidd $QPID_BIN --config $QPID_CONFIG --data-dir $QPID_DATA_DIR --daemon $QPIDD_OPTIONS @@ -77,7 +81,7 @@ start() { touch $pidfile chown qpidd.qpidd $pidfile [ -x /sbin/restorecon ] && /sbin/restorecon $pidfile - runuser - -s /bin/sh qpidd -c "$QPID_BIN --check > $pidfile" + runuser - -s /bin/sh qpidd -c "$QPID_BIN --config $QPID_CONFIG --check > $pidfile" fi return $RETVAL } @@ -106,7 +110,7 @@ case "$1" in $1 ;; status) - status $prog + do_status RETVAL=$? ;; force-reload) diff --git a/qpid/cpp/src/tests/ha_test.py b/qpid/cpp/src/tests/ha_test.py index 2bf8677cd1..132892cb2f 100755 --- a/qpid/cpp/src/tests/ha_test.py +++ b/qpid/cpp/src/tests/ha_test.py @@ -131,12 +131,14 @@ class HaBroker(Broker): "--link-maintenance-interval=0.1", # Heartbeat and negotiate time are needed so that a broker wont # stall on an address that doesn't currently have a broker running. - "--link-heartbeat-interval=%s"%(HaBroker.heartbeat), "--max-negotiate-time=1000", "--ha-cluster=%s"%ha_cluster] # Add default --log-enable arguments unless args already has --log arguments. 
if not [l for l in args if l.startswith("--log")]: args += ["--log-enable=info+", "--log-enable=debug+:ha::"] + if not [h for h in args if h.startswith("--link-heartbeat-interval")]: + args += ["--link-heartbeat-interval=%s"%(HaBroker.heartbeat)] + if ha_replicate is not None: args += [ "--ha-replicate=%s"%ha_replicate ] if brokers_url: args += [ "--ha-brokers-url", brokers_url ] diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index a40fd92922..cddbd90756 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -1213,6 +1213,14 @@ class RecoveryTests(HaBrokerTest): cluster.bounce(0, promote_next=False) cluster[0].promote() + def test_stalled_backup(self): + """Make sure that a stalled backup broker does not stall the primary""" + # FIXME aconway 2014-04-15: merge with test_join_ready_cluster? + cluster = HaCluster(self, 3, args=["--link-heartbeat-interval=1"]) + os.kill(cluster[1].pid, signal.SIGSTOP) + s = cluster[0].connect().session() + s.sender("q;{create:always}").send("x") + self.assertEqual("x", s.receiver("q").fetch(0).content) class ConfigurationTests(HaBrokerTest): """Tests for configuration settings.""" diff --git a/qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml b/qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml index 4a4b8d9a5c..0a1cbc5e3d 100644 --- a/qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml +++ b/qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml @@ -335,9 +335,9 @@ ssl_addr = "ssl:" host [":" port]' </entry> <entry> <para> - Interval for the broker to check link health and re-connect links if need - be. If you want brokers to fail over quickly you can set this to a - fraction of a second, for example: 0.1. + Interval for backup brokers to check the link to the primary re-connect if need be. + Default 2 seconds. Can be set lower for faster failover, e.g. 0.1 seconds. + Setting it too low will result in excessive link-checking activity on the brokers. 
</para> </entry> </row> @@ -348,8 +348,12 @@ ssl_addr = "ssl:" host [":" port]' </entry> <entry> <para> - Heartbeat interval for replication links. The link will be assumed broken - if there is no heartbeat for twice the interval. + Heartbeat interval for replication links and timeout for broker status checks. + It may take up to this interval for rgmanager to detect a hung or partitioned broker. + The primary may take up to twice this interval to detect a hung or partitioned backup. + Clients sending messages may be held up during this time. + Default 120 seconds: you will probably want to set this to a lower value e.g. 10. + If set too low, a slow broker may be considered as failed and killed. </para> </entry> </row> @@ -430,8 +434,13 @@ NOTE: fencing is not shown, you must configure fencing appropriately for your cl <clusternode name="node2.example.com" nodeid="2"/> <clusternode name="node3.example.com" nodeid="3"/> </clusternodes> + <!-- Resouce Manager configuration. --> - <rm> + + status_poll_interval is the interval in seconds that the resource manager checks the status + of managed services. This affects how quickly the manager will detect failed services. + --> + <rm status_poll_interval="1"> <!-- There is a failoverdomain for each node containing just that node. This lets us stipulate that the qpidd service should always run on each node. @@ -455,8 +464,12 @@ NOTE: fencing is not shown, you must configure fencing appropriately for your cl <!-- This script promotes the qpidd broker on this node to primary. --> <script file="/etc/init.d/qpidd-primary" name="qpidd-primary"/> - <!-- This is a virtual IP address for client traffic. --> - <ip address="20.0.20.200" monitor_link="1"/> + <!-- + This is a virtual IP address for client traffic. + monitor_link="yes" means monitor the health of the NIC used for the VIP. + sleeptime="0" means don't delay when failing over the VIP to a new address. 
+ --> + <ip address="20.0.20.200" monitor_link="yes" sleeptime="0"/> </resources> <!-- There is a qpidd service on each node, it should be restarted if it fails. --> diff --git a/qpid/tools/src/py/qpid-ha b/qpid/tools/src/py/qpid-ha index 1b22213d0c..8d91ac829b 100755 --- a/qpid/tools/src/py/qpid-ha +++ b/qpid/tools/src/py/qpid-ha @@ -22,6 +22,7 @@ import optparse, sys, time, os, re from qpid.messaging import Connection from qpid.messaging import Message as QpidMessage +from qpid.util import URL from qpidtoollibs.broker import BrokerAgent from qpidtoollibs.config import parse_qpidd_conf try: @@ -31,6 +32,10 @@ except ImportError: # QMF address for the HA broker object. HA_BROKER = "org.apache.qpid.ha:habroker:ha-broker" +# Define these defaults here rather than in add_option because we want +# to use qpidd.conf for defaults if --config is specified and +# these defaults otherwise: +DEFAULTS = { "broker":"localhost", "timeout":10.0} class ExitStatus(Exception): """Raised if a command want's a non-0 exit status from the script""" @@ -40,31 +45,41 @@ class Command: commands = {} def add(self, optname, metavar, type, help): - self.op.add_option(optname, metavar=metavar, type=type, help=help, action="store") + self.op.add_option(optname, metavar=metavar, type=type, help=help) - def __init__(self, name, help, arg_names=[]): + def __init__(self, name, help, arg_names=[], connect_agent=True): + """@param connect_agent true if we should establish a QMF agent connection""" Command.commands[name] = self self.name = name + self.connect_agent = connect_agent self.arg_names = arg_names usage="%s [options] %s\n\n%s"%(name, " ".join(arg_names), help) self.help = help self.op=optparse.OptionParser(usage) - self.op.add_option("-b", "--broker", action="store", type="string", default="localhost:5672", metavar="<address>", help="Address of qpidd broker with syntax: [username/password@] hostname | ip-address [:<port>]") - self.op.add_option("--sasl-mechanism", metavar="<mech>", 
help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.") - self.op.add_option("--ssl-certificate", action="store", type="string", metavar="<cert>", help="Client SSL certificate (PEM Format)") - self.op.add_option("--ssl-key", action="store", type="string", metavar="<key>", help="Client SSL private key (PEM Format)") + def help_default(what): return " (Default %s)"%DEFAULTS[what] + self.op.add_option("-b", "--broker", metavar="<address>", help="Address of qpidd broker with syntax: [username/password@] hostname | ip-address [:<port>]"+help_default("broker")) + self.op.add_option("--timeout", type="float", metavar="<seconds>", help="Give up if the broker does not respond within the timeout. 0 means wait forever"+help_default("timeout")) + self.op.add_option("--sasl-mechanism", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override") + self.op.add_option("--ssl-certificate", metavar="<cert>", help="Client SSL certificate (PEM Format)") + self.op.add_option("--ssl-key", metavar="<key>", help="Client SSL private key (PEM Format)") self.op.add_option("--config", metavar="<path/to/qpidd.conf>", help="Connect to the local qpidd by reading its configuration file.") def connect(self, opts): conn_options = {} - if opts.config: # Use broker config file. 
+ if not opts.broker: opts.broker = DEFAULTS["broker"] + url = URL(opts.broker) + if opts.config: # Use broker config file for defaults config = parse_qpidd_conf(opts.config) - def joinif(separator, items): return separator.join(filter(None, items)) - userpass = joinif("/", [config.get("ha-username"), config.get("ha-password")]) - hostport = joinif(":", ["localhost", config.get("port")]) - opts.broker = joinif("@", [userpass, hostport]) - opts.sasl_mechanism = config.get("ha-mechanism") - + if not url.user: url.user = config.get("ha-username") + if not url.password: url.password = config.get("ha-password") + if not url.port: url.port = config.get("port") + opts.broker = str(url) + if not opts.sasl_mechanism: opts.sasl_mechanism = config.get("ha-mechanism") + if not opts.timeout: + timeout = config.get("ha-heartbeat-interval") or config.get("link-heartbeat-interval") + if timeout: opts.timeout = float(timeout) + else: # Use DEFAULTS + if not opts.timeout: opts.timeout = DEFAULTS["timeout"] if opts.sasl_mechanism: conn_options['sasl_mechanisms'] = opts.sasl_mechanism if opts.ssl_certificate: conn_options['ssl_certfile'] = opts.ssl_certificate if opts.ssl_key: @@ -72,9 +87,12 @@ class Command: self.op.error("missing '--ssl-certificate' (required by '--ssl-key')") conn_options['ssl_keyfile'] = opts.ssl_key conn_options['client_properties'] = {'qpid.ha-admin' : 1} + if opts.timeout: + conn_options['timeout'] = opts.timeout + conn_options['heartbeat'] = int(opts.timeout) connection = Connection.establish(opts.broker, **conn_options) - qmf_broker = BrokerAgent(connection) - ha_broker = qmf_broker.getHaBroker() + qmf_broker = self.connect_agent and BrokerAgent(connection) + ha_broker = self.connect_agent and qmf_broker.getHaBroker() return (connection, qmf_broker, ha_broker) def execute(self, args): @@ -82,14 +100,22 @@ class Command: if len(args) != len(self.arg_names)+1: self.op.print_help() raise Exception("Wrong number of arguments") - connection, qmf_broker, ha_broker 
= self.connect(opts) - if not ha_broker: raise Exception("HA module is not loaded on broker at %s" % opts.broker) + self.connection, qmf_broker, ha_broker = self.connect(opts) + if self.connect_agent and not ha_broker: + raise Exception("HA module is not loaded on broker at %s" % opts.broker) try: self.do_execute(qmf_broker, ha_broker, opts, args) - finally: connection.close() + finally: self.connection.close() def do_execute(self, qmf_broker, opts, args): raise Exception("Command '%s' is not yet implemented"%self.name) +class PingCmd(Command): + def __init__(self): + Command.__init__(self, "ping","Check if the broker is alive and responding", connect_agent=False) + def do_execute(self, qmf_broker, ha_broker, opts, args): + self.connection.session() # Make sure we can establish a session. +PingCmd() + class PromoteCmd(Command): def __init__(self): Command.__init__(self, "promote","Promote broker from backup to primary") @@ -101,19 +127,20 @@ class StatusCmd(Command): def __init__(self): Command.__init__(self, "status", "Print HA status") self.op.add_option( - "--expect", type="string", metavar="<status>", + "--expect", metavar="<status>", help="Don't print status. Return 0 if it matches <status>, 1 otherwise") self.op.add_option( "--is-primary", action="store_true", default=False, help="Don't print status. Return 0 if the broker is primary, 1 otherwise") self.op.add_option( "--all", action="store_true", default=False, - help="Print status for all brokers in the cluster.") + help="Print status for all brokers in the cluster") def do_execute(self, qmf_broker, ha_broker, opts, args): if opts.is_primary: if not ha_broker.status in ["active", "recovering"]: raise ExitStatus(1) if opts.expect: if opts.expect != ha_broker.status: raise ExitStatus(1) + # The brokersUrl setting is not in python UR format, simpler parsing here. 
brokers = filter(None, re.sub(r'(^amqps?:)|(tcp:)', "", ha_broker.brokersUrl).split(",")) if opts.all and brokers: opts.all=False @@ -129,7 +156,6 @@ class StatusCmd(Command): print b, e else: print ha_broker.status - StatusCmd() class ReplicateCmd(Command): @@ -200,7 +226,7 @@ def main(argv): except ExitStatus, e: return e.status except Exception, e: - print e + print "%s: %s"%(type(e).__name__, e) return 1 if __name__ == "__main__": |