From e20bed55215d15ed453e7a19de73fb28bf1aca84 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Thu, 19 Jul 2012 19:16:21 +0000 Subject: QPID-4145: HA Minor fixes to recovery - Demote timed-out backups from ready to catch-up. - Don't cancel connected backups on timeout, only disconnected ones. - Don't allow promotion of a catch-up broker. - Minor logging improvement. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.18@1363487 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 5 +++-- qpid/cpp/src/qpid/ha/HaBroker.cpp | 23 ++++++++++------------- qpid/cpp/src/qpid/ha/Primary.cpp | 21 +++++++++++++++++---- qpid/cpp/src/qpid/ha/QueueGuard.cpp | 2 -- qpid/cpp/src/qpid/ha/QueueReplicator.cpp | 2 +- qpid/cpp/src/qpid/ha/RemoteBackup.cpp | 13 ++++++++++--- qpid/cpp/src/tests/ha_tests.py | 5 +++-- 7 files changed, 44 insertions(+), 27 deletions(-) diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index 232ed26f00..cf2b5d9bea 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -540,7 +540,7 @@ const string REPLICATE_DEFAULT="replicateDefault"; // Received the ha-broker configuration object for the primary broker. void BrokerReplicator::doResponseHaBroker(Variant::Map& values) { try { - QPID_LOG(debug, logPrefix << "HA Broker response: " << values); + QPID_LOG(trace, logPrefix << "HA Broker response: " << values); ReplicateLevel mine = haBroker.getSettings().replicateDefault.get(); ReplicateLevel primary = replicationTest.replicateLevel( values[REPLICATE_DEFAULT].asString()); @@ -549,7 +549,8 @@ void BrokerReplicator::doResponseHaBroker(Variant::Map& values) { << ") does not match primary (" << primary << ")")); haBroker.setMembership(values[MEMBERS].asList()); } catch (const std::exception& e) { - QPID_LOG(critical, logPrefix << "Invalid HA Broker response: " << e.what()); + QPID_LOG(critical, logPrefix << "Invalid HA Broker response: " << e.what() + << ": " << values); haBroker.shutdown(); } } diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp index 4fc0317fe5..a0502b94ae 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.cpp +++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp @@ -141,10 +141,8 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, switch (getStatus()) { case JOINING: recover(); break; case CATCHUP: - // FIXME aconway 2012-04-27: don't allow promotion in catch-up - // QPID_LOG(error, logPrefix << "Still catching up, cannot be promoted."); - // throw Exception("Still catching up, cannot be promoted."); - recover(); + QPID_LOG(error, logPrefix << "Still catching up, cannot be promoted."); + throw Exception("Still catching up, cannot be promoted."); break; case READY: recover(); break; case RECOVERING: break; @@ -243,12 +241,12 @@ namespace { bool checkTransition(BrokerStatus from, BrokerStatus to) { // Legal state transitions. Initial state is JOINING, ACTIVE is terminal. static const BrokerStatus TRANSITIONS[][2] = { - { CATCHUP, RECOVERING }, // FIXME aconway 2012-04-27: illegal transition, allow while fixing behavior - { JOINING, CATCHUP }, // Connected to primary - { JOINING, RECOVERING }, // Chosen as initial primary. - { CATCHUP, READY }, // Caught up all queues, ready to take over. + { JOINING, CATCHUP }, // Connected to primary + { JOINING, RECOVERING }, // Chosen as initial primary. + { CATCHUP, READY }, // Caught up all queues, ready to take over. { READY, RECOVERING }, // Chosen as new primary - { RECOVERING, ACTIVE } + { READY, CATCHUP }, // Timed out failing over, demoted to catch-up. + { RECOVERING, ACTIVE } // All expected backups are ready }; static const size_t N = sizeof(TRANSITIONS)/sizeof(TRANSITIONS[0]); for (size_t i = 0; i < N; ++i) { @@ -290,10 +288,9 @@ void HaBroker::setMembership(const Variant::List& brokers) { membership.assign(brokers); QPID_LOG(debug, logPrefix << "Membership update: " << membership); BrokerInfo info; - // Check if my own status has been updated to READY - if (getStatus() == CATCHUP && - membership.get(systemId, info) && info.getStatus() == READY) - setStatus(READY, l); + // Update my status to what the primary thinks. + if (membership.get(systemId, info) && status != info.getStatus()) + setStatus(info.getStatus(), l); membershipUpdated(brokers); } diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp index af6066fc5b..f3df5b2263 100644 --- a/qpid/cpp/src/qpid/ha/Primary.cpp +++ b/qpid/cpp/src/qpid/ha/Primary.cpp @@ -140,12 +140,23 @@ void Primary::checkReady(BackupMap::iterator i, Mutex::ScopedLock& l) { void Primary::timeoutExpectedBackups() { sys::Mutex::ScopedLock l(lock); if (active) return; // Already activated - for (BackupSet::iterator i = expectedBackups.begin(); i != expectedBackups.end(); ++i) + // Remove records for any expectedBackups that are not yet connected + // Allow backups that are connected to continue becoming ready. + for (BackupSet::iterator i = expectedBackups.begin(); i != expectedBackups.end();) { - QPID_LOG(error, "Expected backup timed out: " << (*i)->getBrokerInfo()); - (*i)->cancel(); + boost::shared_ptr rb = *i; + if (!rb->isConnected()) { + BrokerInfo info = rb->getBrokerInfo(); + QPID_LOG(error, "Expected backup timed out: " << info); + expectedBackups.erase(i++); + backups.erase(info.getSystemId()); + rb->cancel(); + // Downgrade the broker to CATCHUP + info.setStatus(CATCHUP); + haBroker.addBroker(info); + } + else ++i; } - expectedBackups.clear(); checkReady(l); } @@ -191,6 +202,7 @@ void Primary::opened(broker::Connection& connection) { i->second->setConnected(true); checkReady(i, l); } + if (info.getStatus() == JOINING) info.setStatus(CATCHUP); haBroker.addBroker(info); } else @@ -218,6 +230,7 @@ void Primary::closed(broker::Connection& connection) { boost::shared_ptr Primary::getGuard(const QueuePtr& q, const BrokerInfo& info) { + Mutex::ScopedLock l(lock); BackupMap::iterator i = backups.find(info.getSystemId()); return i == backups.end() ? boost::shared_ptr() : i->second->guard(q); } diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.cpp b/qpid/cpp/src/qpid/ha/QueueGuard.cpp index 1f4ff4e48b..28ec855c2d 100644 --- a/qpid/cpp/src/qpid/ha/QueueGuard.cpp +++ b/qpid/cpp/src/qpid/ha/QueueGuard.cpp @@ -135,6 +135,4 @@ void QueueGuard::complete(const QueuedMessage& qm) { qm.payload->getIngressCompletion().finishCompleter(); } -// FIXME aconway 2012-06-04: TODO support for timeout. - }} // namespaces qpid::ha diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index acac39fcb9..a55fa00562 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -123,7 +123,7 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa peer.getMessage().subscribe( args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/, false/*exclusive*/, "", 0, settings); - // FIXME aconway 2012-05-22: use a finite credit window + // FIXME aconway 2012-05-22: use a finite credit window? peer.getMessage().flow(getName(), 0, 0xFFFFFFFF); peer.getMessage().flow(getName(), 1, 0xFFFFFFFF); diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp index 810913ba8f..54383750fd 100644 --- a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp +++ b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp @@ -29,6 +29,7 @@ namespace qpid { namespace ha { using sys::Mutex; +using boost::bind; RemoteBackup::RemoteBackup(const BrokerInfo& info, ReplicationTest rt, bool con) : logPrefix("Primary remote backup "+info.getLogId()+": "), @@ -43,7 +44,11 @@ void RemoteBackup::createGuards(broker::QueueRegistry& queues) RemoteBackup::~RemoteBackup() { cancel(); } -void RemoteBackup::cancel() { guards.clear(); } +void RemoteBackup::cancel() { + for_each(guards.begin(), guards.end(), + bind(&QueueGuard::cancel, bind(&GuardMap::value_type::second, _1))); + guards.clear(); +} bool RemoteBackup::isReady() { return connected && initialQueues.empty(); @@ -70,9 +75,11 @@ namespace { typedef std::set > QS; struct QueueSetPrinter { const QS& qs; - QueueSetPrinter(const QS& q) : qs(q) {} + std::string prefix; + QueueSetPrinter(const std::string& p, const QS& q) : qs(q), prefix(p) {} }; std::ostream& operator<<(std::ostream& o, const QueueSetPrinter& qp) { + if (!qp.qs.empty()) o << qp.prefix; for (QS::const_iterator i = qp.qs.begin(); i != qp.qs.end(); ++i) o << (*i)->getName() << " "; return o; @@ -82,7 +89,7 @@ std::ostream& operator<<(std::ostream& o, const QueueSetPrinter& qp) { void RemoteBackup::ready(const QueuePtr& q) { initialQueues.erase(q); QPID_LOG(debug, logPrefix << "Queue ready: " << q->getName() - << " remaining unready: " << QueueSetPrinter(initialQueues)); + << QueueSetPrinter(", waiting for: ", initialQueues)); if (isReady()) QPID_LOG(debug, logPrefix << "All queues ready"); } diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index f900a841d5..a841423121 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -211,7 +211,7 @@ class HaCluster(object): if promote_next: self[(i+1) % len(self)].promote() def restart(self, i): - """Start a broker with the same name and data directory. It will get + """Start a broker with the same port, name and data directory. It will get a separate log file: foo.n.log""" b = self._brokers[i] self._brokers[i] = HaBroker( @@ -910,7 +910,8 @@ class RecoveryTests(BrokerTest): def test_expected_backup_timeout(self): """Verify that we time-out expected backups and release held queues - after a configured interval + after a configured interval. Verify backup is demoted to catch-up, + but can still rejoin. """ cluster = HaCluster(self, 3, args=["--ha-backup-timeout=0.5"]); cluster[0].wait_status("active") # Primary ready -- cgit v1.2.1