summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-07-19 19:16:21 +0000
committerAlan Conway <aconway@apache.org>2012-07-19 19:16:21 +0000
commite20bed55215d15ed453e7a19de73fb28bf1aca84 (patch)
tree8717bc48b53062f5acb544037f782725727d1f09
parent6af5d7550a52f3bce03f9517bfbdfefa1a90cbe8 (diff)
downloadqpid-python-e20bed55215d15ed453e7a19de73fb28bf1aca84.tar.gz
QPID-4145: HA Minor fixes to recovery
- Demote timed-out backups from ready to catch-up. - Don't cancel connected backups on timeout, only disconnected ones. - Don't allow promotion of a catch-up broker. - Minor logging improvement. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.18@1363487 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp5
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp23
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.cpp21
-rw-r--r--qpid/cpp/src/qpid/ha/QueueGuard.cpp2
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp2
-rw-r--r--qpid/cpp/src/qpid/ha/RemoteBackup.cpp13
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py5
7 files changed, 44 insertions, 27 deletions
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index 232ed26f00..cf2b5d9bea 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -540,7 +540,7 @@ const string REPLICATE_DEFAULT="replicateDefault";
// Received the ha-broker configuration object for the primary broker.
void BrokerReplicator::doResponseHaBroker(Variant::Map& values) {
try {
- QPID_LOG(debug, logPrefix << "HA Broker response: " << values);
+ QPID_LOG(trace, logPrefix << "HA Broker response: " << values);
ReplicateLevel mine = haBroker.getSettings().replicateDefault.get();
ReplicateLevel primary = replicationTest.replicateLevel(
values[REPLICATE_DEFAULT].asString());
@@ -549,7 +549,8 @@ void BrokerReplicator::doResponseHaBroker(Variant::Map& values) {
<< ") does not match primary (" << primary << ")"));
haBroker.setMembership(values[MEMBERS].asList());
} catch (const std::exception& e) {
- QPID_LOG(critical, logPrefix << "Invalid HA Broker response: " << e.what());
+ QPID_LOG(critical, logPrefix << "Invalid HA Broker response: " << e.what()
+ << ": " << values);
haBroker.shutdown();
}
}
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index 4fc0317fe5..a0502b94ae 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -141,10 +141,8 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args,
switch (getStatus()) {
case JOINING: recover(); break;
case CATCHUP:
- // FIXME aconway 2012-04-27: don't allow promotion in catch-up
- // QPID_LOG(error, logPrefix << "Still catching up, cannot be promoted.");
- // throw Exception("Still catching up, cannot be promoted.");
- recover();
+ QPID_LOG(error, logPrefix << "Still catching up, cannot be promoted.");
+ throw Exception("Still catching up, cannot be promoted.");
break;
case READY: recover(); break;
case RECOVERING: break;
@@ -243,12 +241,12 @@ namespace {
bool checkTransition(BrokerStatus from, BrokerStatus to) {
// Legal state transitions. Initial state is JOINING, ACTIVE is terminal.
static const BrokerStatus TRANSITIONS[][2] = {
- { CATCHUP, RECOVERING }, // FIXME aconway 2012-04-27: illegal transition, allow while fixing behavior
- { JOINING, CATCHUP }, // Connected to primary
- { JOINING, RECOVERING }, // Chosen as initial primary.
- { CATCHUP, READY }, // Caught up all queues, ready to take over.
+ { JOINING, CATCHUP }, // Connected to primary
+ { JOINING, RECOVERING }, // Chosen as initial primary.
+ { CATCHUP, READY }, // Caught up all queues, ready to take over.
{ READY, RECOVERING }, // Chosen as new primary
- { RECOVERING, ACTIVE }
+ { READY, CATCHUP }, // Timed out failing over, demoted to catch-up.
+ { RECOVERING, ACTIVE } // All expected backups are ready
};
static const size_t N = sizeof(TRANSITIONS)/sizeof(TRANSITIONS[0]);
for (size_t i = 0; i < N; ++i) {
@@ -290,10 +288,9 @@ void HaBroker::setMembership(const Variant::List& brokers) {
membership.assign(brokers);
QPID_LOG(debug, logPrefix << "Membership update: " << membership);
BrokerInfo info;
- // Check if my own status has been updated to READY
- if (getStatus() == CATCHUP &&
- membership.get(systemId, info) && info.getStatus() == READY)
- setStatus(READY, l);
+ // Update my status to what the primary thinks.
+ if (membership.get(systemId, info) && status != info.getStatus())
+ setStatus(info.getStatus(), l);
membershipUpdated(brokers);
}
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp
index af6066fc5b..f3df5b2263 100644
--- a/qpid/cpp/src/qpid/ha/Primary.cpp
+++ b/qpid/cpp/src/qpid/ha/Primary.cpp
@@ -140,12 +140,23 @@ void Primary::checkReady(BackupMap::iterator i, Mutex::ScopedLock& l) {
void Primary::timeoutExpectedBackups() {
sys::Mutex::ScopedLock l(lock);
if (active) return; // Already activated
- for (BackupSet::iterator i = expectedBackups.begin(); i != expectedBackups.end(); ++i)
+ // Remove records for any expectedBackups that are not yet connected
+ // Allow backups that are connected to continue becoming ready.
+ for (BackupSet::iterator i = expectedBackups.begin(); i != expectedBackups.end();)
{
- QPID_LOG(error, "Expected backup timed out: " << (*i)->getBrokerInfo());
- (*i)->cancel();
+ boost::shared_ptr<RemoteBackup> rb = *i;
+ if (!rb->isConnected()) {
+ BrokerInfo info = rb->getBrokerInfo();
+ QPID_LOG(error, "Expected backup timed out: " << info);
+ expectedBackups.erase(i++);
+ backups.erase(info.getSystemId());
+ rb->cancel();
+ // Downgrade the broker to CATCHUP
+ info.setStatus(CATCHUP);
+ haBroker.addBroker(info);
+ }
+ else ++i;
}
- expectedBackups.clear();
checkReady(l);
}
@@ -191,6 +202,7 @@ void Primary::opened(broker::Connection& connection) {
i->second->setConnected(true);
checkReady(i, l);
}
+ if (info.getStatus() == JOINING) info.setStatus(CATCHUP);
haBroker.addBroker(info);
}
else
@@ -218,6 +230,7 @@ void Primary::closed(broker::Connection& connection) {
boost::shared_ptr<QueueGuard> Primary::getGuard(const QueuePtr& q, const BrokerInfo& info)
{
+ Mutex::ScopedLock l(lock);
BackupMap::iterator i = backups.find(info.getSystemId());
return i == backups.end() ? boost::shared_ptr<QueueGuard>() : i->second->guard(q);
}
diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.cpp b/qpid/cpp/src/qpid/ha/QueueGuard.cpp
index 1f4ff4e48b..28ec855c2d 100644
--- a/qpid/cpp/src/qpid/ha/QueueGuard.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueGuard.cpp
@@ -135,6 +135,4 @@ void QueueGuard::complete(const QueuedMessage& qm) {
qm.payload->getIngressCompletion().finishCompleter();
}
-// FIXME aconway 2012-06-04: TODO support for timeout.
-
}} // namespaces qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index acac39fcb9..a55fa00562 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -123,7 +123,7 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa
peer.getMessage().subscribe(
args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/,
false/*exclusive*/, "", 0, settings);
- // FIXME aconway 2012-05-22: use a finite credit window
+ // FIXME aconway 2012-05-22: use a finite credit window?
peer.getMessage().flow(getName(), 0, 0xFFFFFFFF);
peer.getMessage().flow(getName(), 1, 0xFFFFFFFF);
diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
index 810913ba8f..54383750fd 100644
--- a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
+++ b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
@@ -29,6 +29,7 @@ namespace qpid {
namespace ha {
using sys::Mutex;
+using boost::bind;
RemoteBackup::RemoteBackup(const BrokerInfo& info, ReplicationTest rt, bool con) :
logPrefix("Primary remote backup "+info.getLogId()+": "),
@@ -43,7 +44,11 @@ void RemoteBackup::createGuards(broker::QueueRegistry& queues)
RemoteBackup::~RemoteBackup() { cancel(); }
-void RemoteBackup::cancel() { guards.clear(); }
+void RemoteBackup::cancel() {
+ for_each(guards.begin(), guards.end(),
+ bind(&QueueGuard::cancel, bind(&GuardMap::value_type::second, _1)));
+ guards.clear();
+}
bool RemoteBackup::isReady() {
return connected && initialQueues.empty();
@@ -70,9 +75,11 @@ namespace {
typedef std::set<boost::shared_ptr<broker::Queue> > QS;
struct QueueSetPrinter {
const QS& qs;
- QueueSetPrinter(const QS& q) : qs(q) {}
+ std::string prefix;
+ QueueSetPrinter(const std::string& p, const QS& q) : qs(q), prefix(p) {}
};
std::ostream& operator<<(std::ostream& o, const QueueSetPrinter& qp) {
+ if (!qp.qs.empty()) o << qp.prefix;
for (QS::const_iterator i = qp.qs.begin(); i != qp.qs.end(); ++i)
o << (*i)->getName() << " ";
return o;
@@ -82,7 +89,7 @@ std::ostream& operator<<(std::ostream& o, const QueueSetPrinter& qp) {
void RemoteBackup::ready(const QueuePtr& q) {
initialQueues.erase(q);
QPID_LOG(debug, logPrefix << "Queue ready: " << q->getName()
- << " remaining unready: " << QueueSetPrinter(initialQueues));
+ << QueueSetPrinter(", waiting for: ", initialQueues));
if (isReady()) QPID_LOG(debug, logPrefix << "All queues ready");
}
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index f900a841d5..a841423121 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -211,7 +211,7 @@ class HaCluster(object):
if promote_next: self[(i+1) % len(self)].promote()
def restart(self, i):
- """Start a broker with the same name and data directory. It will get
+ """Start a broker with the same port, name and data directory. It will get
a separate log file: foo.n.log"""
b = self._brokers[i]
self._brokers[i] = HaBroker(
@@ -910,7 +910,8 @@ class RecoveryTests(BrokerTest):
def test_expected_backup_timeout(self):
"""Verify that we time-out expected backups and release held queues
- after a configured interval
+ after a configured interval. Verify backup is demoted to catch-up,
+ but can still rejoin.
"""
cluster = HaCluster(self, 3, args=["--ha-backup-timeout=0.5"]);
cluster[0].wait_status("active") # Primary ready