summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2014-04-24 17:54:05 +0000
committerAlan Conway <aconway@apache.org>2014-04-24 17:54:05 +0000
commit1d3b4560f8a7f212976b536376a976b3b41f489b (patch)
tree82c4baadc8f4159bea4fa8ad872f9858061c727e
parent67f29e0685b4bfaa0721a25ae901c3b5e18c0db3 (diff)
downloadqpid-python-1d3b4560f8a7f212976b536376a976b3b41f489b.tar.gz
QPID-5719: HA becomes unresponsive once any of the brokers are SIGSTOPed
- Added timeout to qpid-ha. - qpidd init script pings broker to verify it is not hung. - updated documentation in qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml. The new results for the cases mentioned in the bug: a] stopped ALL brokers: rgmanager restarts the entire cluster but data is lost. Equivalent to killing all the brokers at once. This does not affect quorum because only qpidd services are affected, not other services managed by cman. b] stopped the primary: rgmanager restarts the primary after a timeout and promotes one of the backups. c] stopped a backup: rgmanager restarts the backups after a timeout. Clients that are actively sending messages may see a delay while backup is restarted. Note you need to set link-heartbeat-interval in qpidd.conf. The default is very high (120 seconds), it should be set lower to see recovery from sigstop in a reasonable time. See the updated documentation in qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1589807 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-xqpid/cpp/etc/qpidd-primary.in3
-rwxr-xr-xqpid/cpp/etc/qpidd.in34
-rwxr-xr-xqpid/cpp/src/tests/ha_test.py4
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py8
-rw-r--r--qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml29
-rwxr-xr-xqpid/tools/src/py/qpid-ha70
6 files changed, 100 insertions, 48 deletions
diff --git a/qpid/cpp/etc/qpidd-primary.in b/qpid/cpp/etc/qpidd-primary.in
index 377f2d623a..3119ebac6e 100755
--- a/qpid/cpp/etc/qpidd-primary.in
+++ b/qpid/cpp/etc/qpidd-primary.in
@@ -45,6 +45,7 @@ QPID_HA_OPTIONS="--config $QPID_CONFIG"
# Source configuration
test -f @sysconfdir@/sysconfig/$prog && source @sysconfdir@/sysconfig/$prog
+source /etc/rc.d/init.d/functions
# Check presence of executables/scripts
for f in $QPID_INIT $QPID_HA; do
@@ -53,8 +54,6 @@ done
QPID_HA="$QPID_HA $QPID_HA_OPTIONS"
-source /etc/rc.d/init.d/functions
-
RETVAL=0
status() {
diff --git a/qpid/cpp/etc/qpidd.in b/qpid/cpp/etc/qpidd.in
index 55697492e4..7db59e369f 100755
--- a/qpid/cpp/etc/qpidd.in
+++ b/qpid/cpp/etc/qpidd.in
@@ -41,32 +41,36 @@ pidfile=/var/run/qpidd.pid
# The following variables can be overridden in @sysconfdir@/sysconfig/$prog
QPID_BIN=@sbindir@/$prog
-QPID_CONFIG=@confdir@/qpidd.conf
QPID_DATA_DIR=/var/lib/qpidd
+QPID_CONFIG=@confdir@/qpidd.conf
+QPID_HA=@bindir@/qpid-ha
+QPID_HA_OPTIONS="--config $QPID_CONFIG"
# Source configuration
-if [ -f @sysconfdir@/sysconfig/$prog ] ; then
- . @sysconfdir@/sysconfig/$prog
-fi
+test -f @sysconfdir@/sysconfig/$prog && source @sysconfdir@/sysconfig/$prog
+source /etc/rc.d/init.d/functions
-# Source function library.
-. /etc/rc.d/init.d/functions
+# Check presence of executables/scripts
+for f in $QPID_BIN $QPID_HA; do
+ test -x $f || { echo "$f not found or not executable"; exit 5; }
+done
-RETVAL=0
+QPID_HA="$QPID_HA $QPID_HA_OPTIONS"
-#ensure binary is present and executable
-if [[ !(-x @sbindir@/$prog) ]] ; then
- echo "@sbindir@/$prog not found or not executable"
- exit 5
-fi
+RETVAL=0
-#ensure user has sufficient permissions
+# Ensure user has sufficient permissions
runuser -s /bin/sh qpidd -c "echo x > /dev/null" 2> /dev/null || RETVAL=4
if [ $RETVAL = 4 ]; then
echo "user had insufficient privilege";
exit $RETVAL
fi
+do_status() {
+ # Check PID file and ping for liveness
+ status $prog && $QPID_HA ping
+}
+
start() {
echo -n $"Starting Qpid AMQP daemon: "
daemon --pidfile $pidfile --check $prog --user qpidd $QPID_BIN --config $QPID_CONFIG --data-dir $QPID_DATA_DIR --daemon $QPIDD_OPTIONS
@@ -77,7 +81,7 @@ start() {
touch $pidfile
chown qpidd.qpidd $pidfile
[ -x /sbin/restorecon ] && /sbin/restorecon $pidfile
- runuser - -s /bin/sh qpidd -c "$QPID_BIN --check > $pidfile"
+ runuser - -s /bin/sh qpidd -c "$QPID_BIN --config $QPID_CONFIG --check > $pidfile"
fi
return $RETVAL
}
@@ -106,7 +110,7 @@ case "$1" in
$1
;;
status)
- status $prog
+ do_status
RETVAL=$?
;;
force-reload)
diff --git a/qpid/cpp/src/tests/ha_test.py b/qpid/cpp/src/tests/ha_test.py
index 2bf8677cd1..132892cb2f 100755
--- a/qpid/cpp/src/tests/ha_test.py
+++ b/qpid/cpp/src/tests/ha_test.py
@@ -131,12 +131,14 @@ class HaBroker(Broker):
"--link-maintenance-interval=0.1",
# Heartbeat and negotiate time are needed so that a broker wont
# stall on an address that doesn't currently have a broker running.
- "--link-heartbeat-interval=%s"%(HaBroker.heartbeat),
"--max-negotiate-time=1000",
"--ha-cluster=%s"%ha_cluster]
# Add default --log-enable arguments unless args already has --log arguments.
if not [l for l in args if l.startswith("--log")]:
args += ["--log-enable=info+", "--log-enable=debug+:ha::"]
+ if not [h for h in args if h.startswith("--link-heartbeat-interval")]:
+ args += ["--link-heartbeat-interval=%s"%(HaBroker.heartbeat)]
+
if ha_replicate is not None:
args += [ "--ha-replicate=%s"%ha_replicate ]
if brokers_url: args += [ "--ha-brokers-url", brokers_url ]
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index a40fd92922..cddbd90756 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -1213,6 +1213,14 @@ class RecoveryTests(HaBrokerTest):
cluster.bounce(0, promote_next=False)
cluster[0].promote()
+ def test_stalled_backup(self):
+ """Make sure that a stalled backup broker does not stall the primary"""
+ # FIXME aconway 2014-04-15: merge with test_join_ready_cluster?
+ cluster = HaCluster(self, 3, args=["--link-heartbeat-interval=1"])
+ os.kill(cluster[1].pid, signal.SIGSTOP)
+ s = cluster[0].connect().session()
+ s.sender("q;{create:always}").send("x")
+ self.assertEqual("x", s.receiver("q").fetch(0).content)
class ConfigurationTests(HaBrokerTest):
"""Tests for configuration settings."""
diff --git a/qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml b/qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml
index 4a4b8d9a5c..0a1cbc5e3d 100644
--- a/qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml
+++ b/qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml
@@ -335,9 +335,9 @@ ssl_addr = "ssl:" host [":" port]'
</entry>
<entry>
<para>
- Interval for the broker to check link health and re-connect links if need
- be. If you want brokers to fail over quickly you can set this to a
- fraction of a second, for example: 0.1.
+ Interval for backup brokers to check the link to the primary re-connect if need be.
+ Default 2 seconds. Can be set lower for faster failover, e.g. 0.1 seconds.
+ Setting it too low will result in excessive link-checking activity on the brokers.
</para>
</entry>
</row>
@@ -348,8 +348,12 @@ ssl_addr = "ssl:" host [":" port]'
</entry>
<entry>
<para>
- Heartbeat interval for replication links. The link will be assumed broken
- if there is no heartbeat for twice the interval.
+ Heartbeat interval for replication links and timeout for broker status checks.
+ It may take up to this interval for rgmanager to detect a hung or partitioned broker.
+ The primary may take up to twice this interval to detect a hung or partitioned backup.
+ Clients sending messages may be held up during this time.
+ Default 120 seconds: you will probably want to set this to a lower value e.g. 10.
+ If set too low, a slow broker may be considered as failed and killed.
</para>
</entry>
</row>
@@ -430,8 +434,13 @@ NOTE: fencing is not shown, you must configure fencing appropriately for your cl
<clusternode name="node2.example.com" nodeid="2"/>
<clusternode name="node3.example.com" nodeid="3"/>
</clusternodes>
+
<!-- Resouce Manager configuration. -->
- <rm>
+
+ status_poll_interval is the interval in seconds that the resource manager checks the status
+ of managed services. This affects how quickly the manager will detect failed services.
+ -->
+ <rm status_poll_interval="1">
<!--
There is a failoverdomain for each node containing just that node.
This lets us stipulate that the qpidd service should always run on each node.
@@ -455,8 +464,12 @@ NOTE: fencing is not shown, you must configure fencing appropriately for your cl
<!-- This script promotes the qpidd broker on this node to primary. -->
<script file="/etc/init.d/qpidd-primary" name="qpidd-primary"/>
- <!-- This is a virtual IP address for client traffic. -->
- <ip address="20.0.20.200" monitor_link="1"/>
+ <!--
+ This is a virtual IP address for client traffic.
+ monitor_link="yes" means monitor the health of the NIC used for the VIP.
+ sleeptime="0" means don't delay when failing over the VIP to a new address.
+ -->
+ <ip address="20.0.20.200" monitor_link="yes" sleeptime="0"/>
</resources>
<!-- There is a qpidd service on each node, it should be restarted if it fails. -->
diff --git a/qpid/tools/src/py/qpid-ha b/qpid/tools/src/py/qpid-ha
index 1b22213d0c..8d91ac829b 100755
--- a/qpid/tools/src/py/qpid-ha
+++ b/qpid/tools/src/py/qpid-ha
@@ -22,6 +22,7 @@
import optparse, sys, time, os, re
from qpid.messaging import Connection
from qpid.messaging import Message as QpidMessage
+from qpid.util import URL
from qpidtoollibs.broker import BrokerAgent
from qpidtoollibs.config import parse_qpidd_conf
try:
@@ -31,6 +32,10 @@ except ImportError:
# QMF address for the HA broker object.
HA_BROKER = "org.apache.qpid.ha:habroker:ha-broker"
+# Define these defaults here rather than in add_option because we want
+# to use qpidd.conf for defaults if --config is specified and
+# these defaults otherwise:
+DEFAULTS = { "broker":"localhost", "timeout":10.0}
class ExitStatus(Exception):
"""Raised if a command want's a non-0 exit status from the script"""
@@ -40,31 +45,41 @@ class Command:
commands = {}
def add(self, optname, metavar, type, help):
- self.op.add_option(optname, metavar=metavar, type=type, help=help, action="store")
+ self.op.add_option(optname, metavar=metavar, type=type, help=help)
- def __init__(self, name, help, arg_names=[]):
+ def __init__(self, name, help, arg_names=[], connect_agent=True):
+ """@param connect_agent true if we should establish a QMF agent connection"""
Command.commands[name] = self
self.name = name
+ self.connect_agent = connect_agent
self.arg_names = arg_names
usage="%s [options] %s\n\n%s"%(name, " ".join(arg_names), help)
self.help = help
self.op=optparse.OptionParser(usage)
- self.op.add_option("-b", "--broker", action="store", type="string", default="localhost:5672", metavar="<address>", help="Address of qpidd broker with syntax: [username/password@] hostname | ip-address [:<port>]")
- self.op.add_option("--sasl-mechanism", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.")
- self.op.add_option("--ssl-certificate", action="store", type="string", metavar="<cert>", help="Client SSL certificate (PEM Format)")
- self.op.add_option("--ssl-key", action="store", type="string", metavar="<key>", help="Client SSL private key (PEM Format)")
+ def help_default(what): return " (Default %s)"%DEFAULTS[what]
+ self.op.add_option("-b", "--broker", metavar="<address>", help="Address of qpidd broker with syntax: [username/password@] hostname | ip-address [:<port>]"+help_default("broker"))
+ self.op.add_option("--timeout", type="float", metavar="<seconds>", help="Give up if the broker does not respond within the timeout. 0 means wait forever"+help_default("timeout"))
+ self.op.add_option("--sasl-mechanism", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override")
+ self.op.add_option("--ssl-certificate", metavar="<cert>", help="Client SSL certificate (PEM Format)")
+ self.op.add_option("--ssl-key", metavar="<key>", help="Client SSL private key (PEM Format)")
self.op.add_option("--config", metavar="<path/to/qpidd.conf>", help="Connect to the local qpidd by reading its configuration file.")
def connect(self, opts):
conn_options = {}
- if opts.config: # Use broker config file.
+ if not opts.broker: opts.broker = DEFAULTS["broker"]
+ url = URL(opts.broker)
+ if opts.config: # Use broker config file for defaults
config = parse_qpidd_conf(opts.config)
- def joinif(separator, items): return separator.join(filter(None, items))
- userpass = joinif("/", [config.get("ha-username"), config.get("ha-password")])
- hostport = joinif(":", ["localhost", config.get("port")])
- opts.broker = joinif("@", [userpass, hostport])
- opts.sasl_mechanism = config.get("ha-mechanism")
-
+ if not url.user: url.user = config.get("ha-username")
+ if not url.password: url.password = config.get("ha-password")
+ if not url.port: url.port = config.get("port")
+ opts.broker = str(url)
+ if not opts.sasl_mechanism: opts.sasl_mechanism = config.get("ha-mechanism")
+ if not opts.timeout:
+ timeout = config.get("ha-heartbeat-interval") or config.get("link-heartbeat-interval")
+ if timeout: opts.timeout = float(timeout)
+ else: # Use DEFAULTS
+ if not opts.timeout: opts.timeout = DEFAULTS["timeout"]
if opts.sasl_mechanism: conn_options['sasl_mechanisms'] = opts.sasl_mechanism
if opts.ssl_certificate: conn_options['ssl_certfile'] = opts.ssl_certificate
if opts.ssl_key:
@@ -72,9 +87,12 @@ class Command:
self.op.error("missing '--ssl-certificate' (required by '--ssl-key')")
conn_options['ssl_keyfile'] = opts.ssl_key
conn_options['client_properties'] = {'qpid.ha-admin' : 1}
+ if opts.timeout:
+ conn_options['timeout'] = opts.timeout
+ conn_options['heartbeat'] = int(opts.timeout)
connection = Connection.establish(opts.broker, **conn_options)
- qmf_broker = BrokerAgent(connection)
- ha_broker = qmf_broker.getHaBroker()
+ qmf_broker = self.connect_agent and BrokerAgent(connection)
+ ha_broker = self.connect_agent and qmf_broker.getHaBroker()
return (connection, qmf_broker, ha_broker)
def execute(self, args):
@@ -82,14 +100,22 @@ class Command:
if len(args) != len(self.arg_names)+1:
self.op.print_help()
raise Exception("Wrong number of arguments")
- connection, qmf_broker, ha_broker = self.connect(opts)
- if not ha_broker: raise Exception("HA module is not loaded on broker at %s" % opts.broker)
+ self.connection, qmf_broker, ha_broker = self.connect(opts)
+ if self.connect_agent and not ha_broker:
+ raise Exception("HA module is not loaded on broker at %s" % opts.broker)
try: self.do_execute(qmf_broker, ha_broker, opts, args)
- finally: connection.close()
+ finally: self.connection.close()
def do_execute(self, qmf_broker, opts, args):
raise Exception("Command '%s' is not yet implemented"%self.name)
+class PingCmd(Command):
+ def __init__(self):
+ Command.__init__(self, "ping","Check if the broker is alive and responding", connect_agent=False)
+ def do_execute(self, qmf_broker, ha_broker, opts, args):
+ self.connection.session() # Make sure we can establish a session.
+PingCmd()
+
class PromoteCmd(Command):
def __init__(self):
Command.__init__(self, "promote","Promote broker from backup to primary")
@@ -101,19 +127,20 @@ class StatusCmd(Command):
def __init__(self):
Command.__init__(self, "status", "Print HA status")
self.op.add_option(
- "--expect", type="string", metavar="<status>",
+ "--expect", metavar="<status>",
help="Don't print status. Return 0 if it matches <status>, 1 otherwise")
self.op.add_option(
"--is-primary", action="store_true", default=False,
help="Don't print status. Return 0 if the broker is primary, 1 otherwise")
self.op.add_option(
"--all", action="store_true", default=False,
- help="Print status for all brokers in the cluster.")
+ help="Print status for all brokers in the cluster")
def do_execute(self, qmf_broker, ha_broker, opts, args):
if opts.is_primary:
if not ha_broker.status in ["active", "recovering"]: raise ExitStatus(1)
if opts.expect:
if opts.expect != ha_broker.status: raise ExitStatus(1)
+ # The brokersUrl setting is not in python UR format, simpler parsing here.
brokers = filter(None, re.sub(r'(^amqps?:)|(tcp:)', "", ha_broker.brokersUrl).split(","))
if opts.all and brokers:
opts.all=False
@@ -129,7 +156,6 @@ class StatusCmd(Command):
print b, e
else:
print ha_broker.status
-
StatusCmd()
class ReplicateCmd(Command):
@@ -200,7 +226,7 @@ def main(argv):
except ExitStatus, e:
return e.status
except Exception, e:
- print e
+ print "%s: %s"%(type(e).__name__, e)
return 1
if __name__ == "__main__":