summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-07-10 20:03:38 +0000
committerAlan Conway <aconway@apache.org>2012-07-10 20:03:38 +0000
commit2bce9c26170416e8a9d30db2f98da0bfa91c09e0 (patch)
tree81439d5524dcce42ca83fd91325885adff253b00 /cpp
parent91d37992d85ae721e1ab2e0549d43bf464553a4b (diff)
downloadqpid-python-2bce9c26170416e8a9d30db2f98da0bfa91c09e0.tar.gz
QPID-4126: HA primary times out expected backups.
After a failure, the newly-promoted primary broker guards messages for each of the backups that were connected at the time of the failure. It waits for them to reconnect to the new primary before becoming active. This patch introduces a time-out so that if an expected backup fails to fail-over within the time limit, the primary will cancel the guards for that backup and carry on. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1359872 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/ha/HaPlugin.cpp2
-rw-r--r--cpp/src/qpid/ha/Primary.cpp69
-rw-r--r--cpp/src/qpid/ha/Primary.h11
-rw-r--r--cpp/src/qpid/ha/RemoteBackup.cpp16
-rw-r--r--cpp/src/qpid/ha/RemoteBackup.h11
-rw-r--r--cpp/src/qpid/ha/Settings.h3
-rwxr-xr-xcpp/src/tests/ha_tests.py33
7 files changed, 112 insertions, 33 deletions
diff --git a/cpp/src/qpid/ha/HaPlugin.cpp b/cpp/src/qpid/ha/HaPlugin.cpp
index 42758c4689..360b6892ab 100644
--- a/cpp/src/qpid/ha/HaPlugin.cpp
+++ b/cpp/src/qpid/ha/HaPlugin.cpp
@@ -46,6 +46,8 @@ struct Options : public qpid::Options {
"Password for connections between HA brokers")
("ha-mechanism", optValue(settings.mechanism, "MECH"),
"Authentication mechanism for connections between HA brokers")
+ ("ha-backup-timeout", optValue(settings.backupTimeout, "SECONDS"),
+ "Maximum time to wait for an expected backup to connect and become ready.")
;
}
};
diff --git a/cpp/src/qpid/ha/Primary.cpp b/cpp/src/qpid/ha/Primary.cpp
index 56598c2b5a..a1ce81c3a5 100644
--- a/cpp/src/qpid/ha/Primary.cpp
+++ b/cpp/src/qpid/ha/Primary.cpp
@@ -32,6 +32,7 @@
#include "qpid/broker/Queue.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/log/Statement.h"
+#include "qpid/sys/Timer.h"
#include <boost/bind.hpp>
namespace qpid {
@@ -40,7 +41,7 @@ namespace ha {
using sys::Mutex;
namespace {
-// No-op connection observer, allows all connections.
+
class PrimaryConnectionObserver : public broker::ConnectionObserver
{
public:
@@ -61,6 +62,15 @@ class PrimaryConfigurationObserver : public broker::ConfigurationObserver
Primary& primary;
};
+class ExpectedBackupTimerTask : public sys::TimerTask {
+ public:
+ ExpectedBackupTimerTask(Primary& p, sys::AbsTime deadline)
+ : TimerTask(deadline, "ExpectedBackupTimerTask"), primary(p) {}
+ void fire() { primary.timeoutExpectedBackups(); }
+ private:
+ Primary& primary;
+};
+
} // namespace
Primary* Primary::instance = 0;
@@ -75,16 +85,24 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
}
else {
// NOTE: RemoteBackups must be created before we set the ConfigurationObserver
- // orr ConnectionObserver so that there is no client activity while
+ // or ConnectionObserver so that there is no client activity while
// the QueueGuards are created.
QPID_LOG(debug, logPrefix << "Promoted, expected backups: " << expect);
for (BrokerInfo::Set::const_iterator i = expect.begin(); i != expect.end(); ++i) {
- bool guard = true; // Create queue guards immediately for expected backups.
boost::shared_ptr<RemoteBackup> backup(
- new RemoteBackup(*i, haBroker.getBroker(), haBroker.getReplicationTest(), guard));
+ new RemoteBackup(
+ *i, haBroker.getBroker(), haBroker.getReplicationTest(),
+ true, // Create queue guards immediately for expected backups.
+ false // Not yet connected.
+ ));
backups[i->getSystemId()] = backup;
- if (!backup->isReady()) initialBackups.insert(backup);
+ if (!backup->isReady()) expectedBackups.insert(backup);
}
+ // Set timeout for expected brokers to connect and become ready.
+ sys::Duration timeout(hb.getSettings().backupTimeout*sys::TIME_SEC);
+ sys::AbsTime deadline(sys::now(), timeout);
+ timerTask.reset(new ExpectedBackupTimerTask(*this, deadline));
+ hb.getBroker().getTimer().add(timerTask);
}
configurationObserver.reset(new PrimaryConfigurationObserver(*this));
@@ -98,14 +116,15 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
}
Primary::~Primary() {
+ if (timerTask) timerTask->cancel();
haBroker.getBroker().getConfigurationObservers().remove(configurationObserver);
}
void Primary::checkReady(Mutex::ScopedLock&) {
- if (!active && initialBackups.empty()) {
+ if (!active && expectedBackups.empty()) {
active = true;
- QPID_LOG(notice, logPrefix << "All initial backups are ready.");
Mutex::ScopedUnlock u(lock); // Don't hold lock across callback
+ QPID_LOG(notice, logPrefix << "Finished waiting for backups, primary is active.");
haBroker.activate();
}
}
@@ -113,13 +132,26 @@ void Primary::checkReady(Mutex::ScopedLock&) {
void Primary::checkReady(BackupMap::iterator i, Mutex::ScopedLock& l) {
if (i != backups.end() && i->second->isReady()) {
BrokerInfo info = i->second->getBrokerInfo();
+ QPID_LOG(info, "Expected backup is ready: " << info);
info.setStatus(READY);
haBroker.addBroker(info);
- initialBackups.erase(i->second);
+ expectedBackups.erase(i->second);
checkReady(l);
}
}
+void Primary::timeoutExpectedBackups() {
+ sys::Mutex::ScopedLock l(lock);
+ if (active) return; // Already activated
+ for (BackupSet::iterator i = expectedBackups.begin(); i != expectedBackups.end(); ++i)
+ {
+ QPID_LOG(error, "Expected backup timed out: " << (*i)->getBrokerInfo());
+ (*i)->cancel();
+ }
+ expectedBackups.clear();
+ checkReady(l);
+}
+
void Primary::readyReplica(const ReplicatingSubscription& rs) {
sys::Mutex::ScopedLock l(lock);
BackupMap::iterator i = backups.find(rs.getBrokerInfo().getSystemId());
@@ -153,12 +185,17 @@ void Primary::opened(broker::Connection& connection) {
BackupMap::iterator i = backups.find(info.getSystemId());
if (i == backups.end()) {
QPID_LOG(debug, logPrefix << "New backup connected: " << info);
- bool guard = false; // Lazy-create guards for new backups. Creating them here could deadlock.
backups[info.getSystemId()].reset(
- new RemoteBackup(info, haBroker.getBroker(), haBroker.getReplicationTest(), guard));
+ new RemoteBackup(
+ info, haBroker.getBroker(), haBroker.getReplicationTest(),
+ false, // Lazy-create guards for new backups, creating now deadlocks
+ true // Backup is connected
+ ));
}
else {
QPID_LOG(debug, logPrefix << "Known backup connected: " << info);
+ i->second->setConnected(true);
+ checkReady(i, l);
}
haBroker.addBroker(info);
}
@@ -171,14 +208,16 @@ void Primary::closed(broker::Connection& connection) {
Mutex::ScopedLock l(lock);
BrokerInfo info;
if (ha::ConnectionObserver::getBrokerInfo(connection, info)) {
- haBroker.removeBroker(info.getSystemId());
QPID_LOG(debug, logPrefix << "Backup disconnected: " << info);
+ haBroker.removeBroker(info.getSystemId());
+ BackupMap::iterator i = backups.find(info.getSystemId());
+ if (i != backups.end()) i->second->setConnected(false);
}
- // NOTE: we do not modify backups here, we only add to the known backups set
- // we never remove from it.
-
+ // NOTE: we do not remove from the backups map here, the backups map holds
+ // all the backups we know about whether connected or not.
+ //
// It is possible for a backup connection to be rejected while we are a backup,
- // but the closed is seen when we have become primary. Removing the entry
+ // but the closed is seen after we have become primary. Removing the entry
// from backups in this case would be incorrect.
}
diff --git a/cpp/src/qpid/ha/Primary.h b/cpp/src/qpid/ha/Primary.h
index 3de579a88d..26883f4416 100644
--- a/cpp/src/qpid/ha/Primary.h
+++ b/cpp/src/qpid/ha/Primary.h
@@ -26,6 +26,7 @@
#include "BrokerInfo.h"
#include "qpid/sys/Mutex.h"
#include <boost/shared_ptr.hpp>
+#include <boost/intrusive_ptr.hpp>
#include <map>
#include <string>
@@ -38,6 +39,10 @@ class ConnectionObserver;
class ConfigurationObserver;
}
+namespace sys {
+class TimerTask;
+}
+
namespace ha {
class HaBroker;
class ReplicatingSubscription;
@@ -74,6 +79,9 @@ class Primary
boost::shared_ptr<QueueGuard> getGuard(const QueuePtr& q, const BrokerInfo&);
+ // Called in timer thread when the deadline for expected backups expires.
+ void timeoutExpectedBackups();
+
private:
typedef std::map<types::Uuid, boost::shared_ptr<RemoteBackup> > BackupMap;
typedef std::set<boost::shared_ptr<RemoteBackup> > BackupSet;
@@ -89,7 +97,7 @@ class Primary
* Set of expected backups that must be ready before we declare ourselves
* active
*/
- BackupSet initialBackups;
+ BackupSet expectedBackups;
/**
* Map of all the remote backups we know about: any expected backups plus
* all actual backups that have connected. We do not remove entries when a
@@ -98,6 +106,7 @@ class Primary
BackupMap backups;
boost::shared_ptr<broker::ConnectionObserver> connectionObserver;
boost::shared_ptr<broker::ConfigurationObserver> configurationObserver;
+ boost::intrusive_ptr<sys::TimerTask> timerTask;
static Primary* instance;
};
diff --git a/cpp/src/qpid/ha/RemoteBackup.cpp b/cpp/src/qpid/ha/RemoteBackup.cpp
index 7e65b287b0..2b8a0077f5 100644
--- a/cpp/src/qpid/ha/RemoteBackup.cpp
+++ b/cpp/src/qpid/ha/RemoteBackup.cpp
@@ -31,21 +31,21 @@ namespace ha {
using sys::Mutex;
RemoteBackup::RemoteBackup(
- const BrokerInfo& info, broker::Broker& broker, ReplicationTest rt, bool cg) :
- logPrefix("Primary remote backup "+info.getLogId()+": "), brokerInfo(info), replicationTest(rt),
- createGuards(cg)
+ const BrokerInfo& info, broker::Broker& broker, ReplicationTest rt, bool cg, bool con) :
+ logPrefix("Primary remote backup "+info.getLogId()+": "),
+ brokerInfo(info), replicationTest(rt),
+ createGuards(cg), connected(con)
{
QPID_LOG(debug, logPrefix << "Guarding queues for backup broker.");
broker.getQueues().eachQueue(boost::bind(&RemoteBackup::initialQueue, this, _1));
}
-RemoteBackup::~RemoteBackup() {
- for (GuardMap::iterator i = guards.begin(); i != guards.end(); ++i)
- i->second->cancel();
-}
+RemoteBackup::~RemoteBackup() { cancel(); }
+
+void RemoteBackup::cancel() { guards.clear(); }
bool RemoteBackup::isReady() {
- return initialQueues.empty();
+ return connected && initialQueues.empty();
}
void RemoteBackup::initialQueue(const QueuePtr& q) {
diff --git a/cpp/src/qpid/ha/RemoteBackup.h b/cpp/src/qpid/ha/RemoteBackup.h
index f2e46c8042..8ea539b167 100644
--- a/cpp/src/qpid/ha/RemoteBackup.h
+++ b/cpp/src/qpid/ha/RemoteBackup.h
@@ -52,12 +52,17 @@ class RemoteBackup
typedef boost::shared_ptr<broker::Queue> QueuePtr;
/** Note: isReady() can be true after construction */
- RemoteBackup(const BrokerInfo& info, broker::Broker&, ReplicationTest rt, bool createGuards);
+ RemoteBackup(const BrokerInfo& info, broker::Broker&, ReplicationTest rt,
+ bool createGuards, bool connected);
~RemoteBackup();
/** Return guard associated with a queue. Used to create ReplicatingSubscription. */
GuardPtr guard(const QueuePtr&);
+ /** Is the remote backup connected? */
+ void setConnected(bool b) { connected=b; }
+ bool isConnected() const { return connected; }
+
/** ReplicatingSubscription associated with queue is ready.
* Note: may set isReady()
*/
@@ -72,6 +77,9 @@ class RemoteBackup
/**@return true when all initial queues for this backup are ready. */
bool isReady();
+ /**Cancel all queue guards, called if we are timed out. */
+ void cancel();
+
BrokerInfo getBrokerInfo() const { return brokerInfo; }
private:
typedef std::map<QueuePtr, GuardPtr> GuardMap;
@@ -83,6 +91,7 @@ class RemoteBackup
GuardMap guards;
QueueSet initialQueues;
bool createGuards;
+ bool connected;
void initialQueue(const QueuePtr&);
};
diff --git a/cpp/src/qpid/ha/Settings.h b/cpp/src/qpid/ha/Settings.h
index 213a5f64d5..1a612aee66 100644
--- a/cpp/src/qpid/ha/Settings.h
+++ b/cpp/src/qpid/ha/Settings.h
@@ -34,7 +34,7 @@ namespace ha {
class Settings
{
public:
- Settings() : cluster(false), replicateDefault(NONE)
+ Settings() : cluster(false), replicateDefault(NONE), backupTimeout(2)
{}
bool cluster; // True if we are a cluster member.
@@ -42,6 +42,7 @@ class Settings
std::string brokerUrl;
Enum<ReplicateLevel> replicateDefault;
std::string username, password, mechanism;
+ double backupTimeout;
private:
};
}} // namespace qpid::ha
diff --git a/cpp/src/tests/ha_tests.py b/cpp/src/tests/ha_tests.py
index 3612837214..01dac2664d 100755
--- a/cpp/src/tests/ha_tests.py
+++ b/cpp/src/tests/ha_tests.py
@@ -869,9 +869,9 @@ class RecoveryTests(BrokerTest):
cluster[3].promote() # New primary, backups will be 1 and 2
cluster[3].wait_status("recovering")
- def trySync(s):
+ def assertSyncTimeout(s):
try:
- s.sync(timeout=.1)
+ s.sync(timeout=.01)
self.fail("Expected Timeout exception")
except Timeout: pass
@@ -879,18 +879,18 @@ class RecoveryTests(BrokerTest):
s2 = cluster.connect(3).session().sender("q2;{create:always}")
# Verify that messages sent are not completed
for i in xrange(100,200): s1.send(str(i), sync=False); s2.send(str(i), sync=False)
- trySync(s1)
+ assertSyncTimeout(s1)
self.assertEqual(s1.unsettled(), 100)
- trySync(s2)
+ assertSyncTimeout(s2)
self.assertEqual(s2.unsettled(), 100)
# Verify we can receive even if sending is on hold:
cluster[3].assert_browse("q1", [str(i) for i in range(100)+range(100,200)])
# Restart backups, verify queues are released only when both backups are up
cluster.restart(1)
- trySync(s1)
+ assertSyncTimeout(s1)
self.assertEqual(s1.unsettled(), 100)
- trySync(s2)
+ assertSyncTimeout(s2)
self.assertEqual(s2.unsettled(), 100)
self.assertEqual(cluster[3].ha_status(), "recovering")
cluster.restart(2)
@@ -904,7 +904,26 @@ class RecoveryTests(BrokerTest):
s1.session.connection.close()
s2.session.connection.close()
-
+ def test_expected_backup_timeout(self):
+ """Verify that we time-out expected backups and release held queues
+ after a configured interval
+ """
+ cluster = HaCluster(self, 3, args=["--ha-backup-timeout=0.5"]);
+ cluster[0].wait_status("active") # Primary ready
+ for b in cluster[1:4]: b.wait_status("ready") # Backups ready
+ for i in [0,1]: cluster.kill(i, False)
+ cluster[2].promote() # New primary, backups will be 1 and 2
+ cluster[2].wait_status("recovering")
+ # Should not go active till the expected backup connects or times out.
+ self.assertEqual(cluster[2].ha_status(), "recovering")
+ # Messages should be held expected backup times out
+ s = cluster[2].connect().session().sender("q;{create:always}")
+ for i in xrange(100): s.send(str(i), sync=False)
+ # Verify message held initially.
+ try: s.sync(timeout=.01); self.fail("Expected Timeout exception")
+ except Timeout: pass
+ s.sync(timeout=1) # And released after the timeout.
+ self.assertEqual(cluster[2].ha_status(), "active")
if __name__ == "__main__":
shutil.rmtree("brokertest.tmp", True)