summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/ha/Primary.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/ha/Primary.cpp')
-rw-r--r--cpp/src/qpid/ha/Primary.cpp138
1 files changed, 125 insertions, 13 deletions
diff --git a/cpp/src/qpid/ha/Primary.cpp b/cpp/src/qpid/ha/Primary.cpp
index 63cba14484..cd731fe732 100644
--- a/cpp/src/qpid/ha/Primary.cpp
+++ b/cpp/src/qpid/ha/Primary.cpp
@@ -19,11 +19,14 @@
*
*/
#include "Backup.h"
-#include "ConnectionExcluder.h"
#include "HaBroker.h"
#include "Primary.h"
#include "ReplicatingSubscription.h"
+#include "RemoteBackup.h"
+#include "ConnectionObserver.h"
+#include "qpid/assert.h"
#include "qpid/broker/Broker.h"
+#include "qpid/broker/ConfigurationObserver.h"
#include "qpid/broker/Queue.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/log/Statement.h"
@@ -32,31 +35,140 @@
namespace qpid {
namespace ha {
+using sys::Mutex;
+
+namespace {
+// No-op connection observer, allows all connections.
+class PrimaryConnectionObserver : public broker::ConnectionObserver
+{
+ public:
+ PrimaryConnectionObserver(Primary& p) : primary(p) {}
+ void opened(broker::Connection& c) { primary.opened(c); }
+ void closed(broker::Connection& c) { primary.closed(c); }
+ private:
+ Primary& primary;
+};
+
+class PrimaryConfigurationObserver : public broker::ConfigurationObserver
+{
+ public:
+ PrimaryConfigurationObserver(Primary& p) : primary(p) {}
+ void queueCreate(const Primary::QueuePtr& q) { primary.queueCreate(q); }
+ void queueDestroy(const Primary::QueuePtr& q) { primary.queueDestroy(q); }
+ private:
+ Primary& primary;
+};
+
+} // namespace
+
Primary* Primary::instance = 0;
-Primary::Primary(HaBroker& hb, const IdSet& backups) :
- haBroker(hb), logPrefix("HA primary: "),
- unready(0), activated(false),
- queues(hb.getBroker(), hb.getReplicationTest(), backups)
+Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
+ haBroker(hb), logPrefix("HA primary: "), active(false)
{
assert(instance == 0);
instance = this; // Let queue replicators find us.
- if (backups.empty()) {
- QPID_LOG(debug, logPrefix << "Not waiting for backups");
- activated = true;
+ if (expect.empty()) {
+ QPID_LOG(debug, logPrefix << "No initial backups");
}
else {
- QPID_LOG(debug, logPrefix << "Waiting for backups: " << backups);
+ QPID_LOG(debug, logPrefix << "Waiting for initial backups: " << expect);
+ for (BrokerInfo::Set::iterator i = expect.begin(); i != expect.end(); ++i) {
+ boost::shared_ptr<RemoteBackup> backup(
+ new RemoteBackup(*i, haBroker.getBroker(), haBroker.getReplicationTest()));
+ backups[i->getSystemId()] = backup;
+ if (!backup->isReady()) initialBackups.insert(backup);
+ }
+ }
+
+ configurationObserver.reset(new PrimaryConfigurationObserver(*this));
+ haBroker.getBroker().getConfigurationObservers().add(configurationObserver);
+
+ Mutex::ScopedLock l(lock); // We are now active as a configurationObserver
+ checkReady(l);
+ // Allow client connections
+ connectionObserver.reset(new PrimaryConnectionObserver(*this));
+ haBroker.getObserver()->setObserver(connectionObserver);
+}
+
+Primary::~Primary() {
+ haBroker.getObserver()->setObserver(boost::shared_ptr<broker::ConnectionObserver>());
+ haBroker.getBroker().getConfigurationObservers().remove(configurationObserver);
+}
+
+void Primary::checkReady(Mutex::ScopedLock&) {
+ if (!active && initialBackups.empty()) {
+ active = true;
+ QPID_LOG(notice, logPrefix << "Active, all initial queues are safe.");
+ Mutex::ScopedUnlock u(lock); // Don't hold lock across callback
+ haBroker.activate();
+ }
+}
+
+void Primary::checkReady(BackupMap::iterator i, Mutex::ScopedLock& l) {
+ if (i != backups.end() && i->second->isReady()) {
+ initialBackups.erase(i->second);
+ checkReady(l);
}
}
void Primary::readyReplica(const ReplicatingSubscription& rs) {
sys::Mutex::ScopedLock l(lock);
- if (queues.ready(rs.getQueue(), rs.getBrokerInfo().getSystemId()) && !activated) {
- activated = true;
- haBroker.activate();
- QPID_LOG(notice, logPrefix << "Activated, all initial queues are safe.");
+ BackupMap::iterator i = backups.find(rs.getBrokerInfo().getSystemId());
+ if (i != backups.end()) {
+ i->second->ready(rs.getQueue());
+ checkReady(i, l);
+ }
+}
+
+void Primary::queueCreate(const QueuePtr& q) {
+ Mutex::ScopedLock l(lock);
+ for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i) {
+ i->second->queueCreate(q);
+ checkReady(i, l);
+ }
+}
+
+void Primary::queueDestroy(const QueuePtr& q) {
+ Mutex::ScopedLock l(lock);
+ for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i)
+ i->second->queueDestroy(q);
+ checkReady(l);
+}
+
+void Primary::opened(broker::Connection& connection) {
+ Mutex::ScopedLock l(lock);
+ BrokerInfo info;
+ if (ha::ConnectionObserver::getBrokerInfo(connection, info)) {
+ haBroker.getMembership().add(info);
+ BackupMap::iterator i = backups.find(info.getSystemId());
+ if (i == backups.end()) {
+ QPID_LOG(debug, logPrefix << "New backup connected: " << info);
+ backups[info.getSystemId()].reset(
+ new RemoteBackup(info, haBroker.getBroker(), haBroker.getReplicationTest()));
+ }
+ else {
+ QPID_LOG(debug, logPrefix << "Known backup connected: " << info);
+ }
+ }
+}
+
+void Primary::closed(broker::Connection& connection) {
+ Mutex::ScopedLock l(lock);
+ BrokerInfo info;
+ if (ha::ConnectionObserver::getBrokerInfo(connection, info)) {
+ haBroker.getMembership().remove(info.getSystemId());
+ QPID_LOG(debug, "HA primary: Backup disconnected: " << info);
+ backups.erase(info.getSystemId());
+ // FIXME aconway 2012-06-01: changes to expected backup set for unready queues.
}
}
+
+boost::shared_ptr<QueueGuard> Primary::getGuard(const QueuePtr& q, const BrokerInfo& info)
+{
+ BackupMap::iterator i = backups.find(info.getSystemId());
+ return i == backups.end() ? boost::shared_ptr<QueueGuard>() : i->second->guard(q);
+}
+
}} // namespace qpid::ha