summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/ha/Primary.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/ha/Primary.cpp')
-rw-r--r--cpp/src/qpid/ha/Primary.cpp122
1 files changed, 88 insertions, 34 deletions
diff --git a/cpp/src/qpid/ha/Primary.cpp b/cpp/src/qpid/ha/Primary.cpp
index e4bf9671b8..93dbbbea85 100644
--- a/cpp/src/qpid/ha/Primary.cpp
+++ b/cpp/src/qpid/ha/Primary.cpp
@@ -31,6 +31,8 @@
#include "qpid/broker/Connection.h"
#include "qpid/broker/Queue.h"
#include "qpid/framing/FieldTable.h"
+#include "qpid/framing/FieldValue.h"
+#include "qpid/framing/Uuid.h"
#include "qpid/log/Statement.h"
#include "qpid/sys/Timer.h"
#include <boost/bind.hpp>
@@ -39,6 +41,8 @@ namespace qpid {
namespace ha {
using sys::Mutex;
+using namespace std;
+using namespace framing;
namespace {
@@ -58,6 +62,8 @@ class PrimaryConfigurationObserver : public broker::ConfigurationObserver
PrimaryConfigurationObserver(Primary& p) : primary(p) {}
void queueCreate(const Primary::QueuePtr& q) { primary.queueCreate(q); }
void queueDestroy(const Primary::QueuePtr& q) { primary.queueDestroy(q); }
+ void exchangeCreate(const Primary::ExchangePtr& q) { primary.exchangeCreate(q); }
+ void exchangeDestroy(const Primary::ExchangePtr& q) { primary.exchangeDestroy(q); }
private:
Primary& primary;
};
@@ -76,8 +82,11 @@ class ExpectedBackupTimerTask : public sys::TimerTask {
Primary* Primary::instance = 0;
Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
- haBroker(hb), logPrefix("Primary: "), active(false)
+ haBroker(hb), membership(hb.getMembership()),
+ logPrefix("Primary: "), active(false),
+ replicationTest(hb.getSettings().replicateDefault.get())
{
+ hb.getMembership().setStatus(RECOVERING);
assert(instance == 0);
instance = this; // Let queue replicators find us.
if (expect.empty()) {
@@ -89,11 +98,10 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
// the QueueGuards are created.
QPID_LOG(notice, logPrefix << "Promoted to primary. Expected backups: " << expect);
for (BrokerInfo::Set::const_iterator i = expect.begin(); i != expect.end(); ++i) {
- boost::shared_ptr<RemoteBackup> backup(
- new RemoteBackup(*i, haBroker.getReplicationTest(), false));
+ boost::shared_ptr<RemoteBackup> backup(new RemoteBackup(*i, 0));
backups[i->getSystemId()] = backup;
if (!backup->isReady()) expectedBackups.insert(backup);
- backup->setInitialQueues(hb.getBroker().getQueues(), true); // Create guards
+ backup->setCatchupQueues(hb.getBroker().getQueues(), true); // Create guards
}
// Set timeout for expected brokers to connect and become ready.
sys::Duration timeout(int64_t(hb.getSettings().backupTimeout*sys::TIME_SEC));
@@ -102,14 +110,21 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
hb.getBroker().getTimer().add(timerTask);
}
+
+ // Remove backup tag property from outgoing link properties.
+ framing::FieldTable linkProperties = hb.getBroker().getLinkClientProperties();
+ linkProperties.erase(ConnectionObserver::BACKUP_TAG);
+ hb.getBroker().setLinkClientProperties(linkProperties);
+
configurationObserver.reset(new PrimaryConfigurationObserver(*this));
haBroker.getBroker().getConfigurationObservers().add(configurationObserver);
Mutex::ScopedLock l(lock); // We are now active as a configurationObserver
checkReady(l);
+
// Allow client connections
connectionObserver.reset(new PrimaryConnectionObserver(*this));
- haBroker.getObserver()->setObserver(connectionObserver);
+ haBroker.getObserver()->setObserver(connectionObserver, logPrefix);
}
Primary::~Primary() {
@@ -122,7 +137,7 @@ void Primary::checkReady(Mutex::ScopedLock&) {
active = true;
Mutex::ScopedUnlock u(lock); // Don't hold lock across callback
QPID_LOG(notice, logPrefix << "Finished waiting for backups, primary is active.");
- haBroker.activate();
+ membership.setStatus(ACTIVE);
}
}
@@ -130,7 +145,7 @@ void Primary::checkReady(BackupMap::iterator i, Mutex::ScopedLock& l) {
if (i != backups.end() && i->second->reportReady()) {
BrokerInfo info = i->second->getBrokerInfo();
info.setStatus(READY);
- haBroker.addBroker(info);
+ membership.add(info);
if (expectedBackups.erase(i->second)) {
QPID_LOG(info, logPrefix << "Expected backup is ready: " << info);
checkReady(l);
@@ -155,9 +170,10 @@ void Primary::timeoutExpectedBackups() {
expectedBackups.erase(i++);
backups.erase(info.getSystemId());
rb->cancel();
- // Downgrade the broker to CATCHUP
+ // Downgrade the broker's status to CATCHUP
+ // The broker will get this status change when it eventually connects.
info.setStatus(CATCHUP);
- haBroker.addBroker(info);
+ membership.add(info);
}
else ++i;
}
@@ -178,46 +194,78 @@ void Primary::readyReplica(const ReplicatingSubscription& rs) {
}
}
+// NOTE: Called with queue registry lock held.
void Primary::queueCreate(const QueuePtr& q) {
- // Throw if there is an invalid replication level in the queue settings.
- haBroker.getReplicationTest().replicateLevel(q->getSettings().storeSettings);
- Mutex::ScopedLock l(lock);
- for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i) {
- i->second->queueCreate(q);
- checkReady(i, l);
+ // Set replication argument.
+ ReplicateLevel level = replicationTest.useLevel(*q);
+ QPID_LOG(debug, logPrefix << "Created queue " << q->getName()
+ << " replication: " << printable(level));
+ q->addArgument(QPID_REPLICATE, printable(level).str());
+ if (level) {
+ // Give each queue a unique id to avoid confusion of same-named queues.
+ q->addArgument(QPID_HA_UUID, types::Variant(Uuid(true)));
+ Mutex::ScopedLock l(lock);
+ for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i) {
+ i->second->queueCreate(q);
+ checkReady(i, l);
+ }
}
}
+// NOTE: Called with queue registry lock held.
void Primary::queueDestroy(const QueuePtr& q) {
+ QPID_LOG(debug, logPrefix << "Destroyed queue " << q->getName());
Mutex::ScopedLock l(lock);
for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i)
i->second->queueDestroy(q);
checkReady(l);
}
+// NOTE: Called with exchange registry lock held.
+void Primary::exchangeCreate(const ExchangePtr& ex) {
+ ReplicateLevel level = replicationTest.useLevel(*ex);
+ QPID_LOG(debug, logPrefix << "Created exchange " << ex->getName()
+ << " replication: " << printable(level));
+ FieldTable args = ex->getArgs();
+ args.setString(QPID_REPLICATE, printable(level).str()); // Set replication arg.
+ if (level) {
+ // Give each exchange a unique id to avoid confusion of same-named exchanges.
+ args.set(QPID_HA_UUID, FieldTable::ValuePtr(new UuidValue(&Uuid(true)[0])));
+ }
+ ex->setArgs(args);
+}
+
+// NOTE: Called with exchange registry lock held.
+void Primary::exchangeDestroy(const ExchangePtr& ex) {
+ QPID_LOG(debug, logPrefix << "Destroyed exchange " << ex->getName());
+ // Do nothing
+ }
+
void Primary::opened(broker::Connection& connection) {
BrokerInfo info;
if (ha::ConnectionObserver::getBrokerInfo(connection, info)) {
Mutex::ScopedLock l(lock);
BackupMap::iterator i = backups.find(info.getSystemId());
if (i == backups.end()) {
- QPID_LOG(debug, logPrefix << "New backup connected: " << info);
- boost::shared_ptr<RemoteBackup> backup(
- new RemoteBackup(info, haBroker.getReplicationTest(), true));
+ QPID_LOG(info, logPrefix << "New backup connected: " << info);
+ boost::shared_ptr<RemoteBackup> backup(new RemoteBackup(info, &connection));
{
// Avoid deadlock with queue registry lock.
Mutex::ScopedUnlock u(lock);
- backup->setInitialQueues(haBroker.getBroker().getQueues(), false);
+ backup->setCatchupQueues(haBroker.getBroker().getQueues(), false);
}
backups[info.getSystemId()] = backup;
+ i = backups.find(info.getSystemId());
}
else {
- QPID_LOG(debug, logPrefix << "Known backup connected: " << info);
- i->second->setConnected(true);
- checkReady(i, l);
+ QPID_LOG(info, logPrefix << "Known backup connected: " << info);
+ i->second->setConnection(&connection);
}
- if (info.getStatus() == JOINING) info.setStatus(CATCHUP);
- haBroker.addBroker(info);
+ if (info.getStatus() == JOINING) {
+ info.setStatus(CATCHUP);
+ membership.add(info);
+ }
+ if (i != backups.end()) checkReady(i, l);
}
else
QPID_LOG(debug, logPrefix << "Accepted client connection "
@@ -225,19 +273,20 @@ void Primary::opened(broker::Connection& connection) {
}
void Primary::closed(broker::Connection& connection) {
- // NOTE: It is possible for a backup connection to be rejected while we are
- // a backup, but closed() is called after we have become primary.
- //
- // For this reason we do not remove from the backups map here, the backups
- // map holds all the backups we know about whether connected or not.
- //
- Mutex::ScopedLock l(lock);
BrokerInfo info;
if (ha::ConnectionObserver::getBrokerInfo(connection, info)) {
- QPID_LOG(debug, logPrefix << "Backup disconnected: " << info);
- haBroker.removeBroker(info.getSystemId());
+ Mutex::ScopedLock l(lock);
BackupMap::iterator i = backups.find(info.getSystemId());
- if (i != backups.end()) i->second->setConnected(false);
+ // NOTE: It is possible for a backup connection to be rejected while we
+ // are a backup, but closed() is called after we have become primary.
+ // Checking isConnected() lets us ignore such spurious closes.
+ if (i != backups.end() && i->second->isConnected()) {
+ QPID_LOG(info, logPrefix << "Backup disconnected: " << info);
+ membership.remove(info.getSystemId());
+ expectedBackups.erase(i->second);
+ backups.erase(i);
+ checkReady(l);
+ }
}
}
@@ -249,4 +298,9 @@ boost::shared_ptr<QueueGuard> Primary::getGuard(const QueuePtr& q, const BrokerI
return i == backups.end() ? boost::shared_ptr<QueueGuard>() : i->second->guard(q);
}
+Role* Primary::promote() {
+ QPID_LOG(info, "Ignoring promotion, already primary: " << haBroker.getBrokerInfo());
+ return 0;
+}
+
}} // namespace qpid::ha