diff options
Diffstat (limited to 'cpp/src/qpid/ha/Primary.cpp')
| -rw-r--r-- | cpp/src/qpid/ha/Primary.cpp | 138 |
1 files changed, 125 insertions, 13 deletions
diff --git a/cpp/src/qpid/ha/Primary.cpp b/cpp/src/qpid/ha/Primary.cpp index 63cba14484..cd731fe732 100644 --- a/cpp/src/qpid/ha/Primary.cpp +++ b/cpp/src/qpid/ha/Primary.cpp @@ -19,11 +19,14 @@ * */ #include "Backup.h" -#include "ConnectionExcluder.h" #include "HaBroker.h" #include "Primary.h" #include "ReplicatingSubscription.h" +#include "RemoteBackup.h" +#include "ConnectionObserver.h" +#include "qpid/assert.h" #include "qpid/broker/Broker.h" +#include "qpid/broker/ConfigurationObserver.h" #include "qpid/broker/Queue.h" #include "qpid/framing/FieldTable.h" #include "qpid/log/Statement.h" @@ -32,31 +35,140 @@ namespace qpid { namespace ha { +using sys::Mutex; + +namespace { +// No-op connection observer, allows all connections. +class PrimaryConnectionObserver : public broker::ConnectionObserver +{ + public: + PrimaryConnectionObserver(Primary& p) : primary(p) {} + void opened(broker::Connection& c) { primary.opened(c); } + void closed(broker::Connection& c) { primary.closed(c); } + private: + Primary& primary; +}; + +class PrimaryConfigurationObserver : public broker::ConfigurationObserver +{ + public: + PrimaryConfigurationObserver(Primary& p) : primary(p) {} + void queueCreate(const Primary::QueuePtr& q) { primary.queueCreate(q); } + void queueDestroy(const Primary::QueuePtr& q) { primary.queueDestroy(q); } + private: + Primary& primary; +}; + +} // namespace + Primary* Primary::instance = 0; -Primary::Primary(HaBroker& hb, const IdSet& backups) : - haBroker(hb), logPrefix("HA primary: "), - unready(0), activated(false), - queues(hb.getBroker(), hb.getReplicationTest(), backups) +Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) : + haBroker(hb), logPrefix("HA primary: "), active(false) { assert(instance == 0); instance = this; // Let queue replicators find us. - if (backups.empty()) { - QPID_LOG(debug, logPrefix << "Not waiting for backups"); - activated = true; + if (expect.empty()) { + QPID_LOG(debug, logPrefix << "No initial backups"); } else { - QPID_LOG(debug, logPrefix << "Waiting for backups: " << backups); + QPID_LOG(debug, logPrefix << "Waiting for initial backups: " << expect); + for (BrokerInfo::Set::iterator i = expect.begin(); i != expect.end(); ++i) { + boost::shared_ptr<RemoteBackup> backup( + new RemoteBackup(*i, haBroker.getBroker(), haBroker.getReplicationTest())); + backups[i->getSystemId()] = backup; + if (!backup->isReady()) initialBackups.insert(backup); + } + } + + configurationObserver.reset(new PrimaryConfigurationObserver(*this)); + haBroker.getBroker().getConfigurationObservers().add(configurationObserver); + + Mutex::ScopedLock l(lock); // We are now active as a configurationObserver + checkReady(l); + // Allow client connections + connectionObserver.reset(new PrimaryConnectionObserver(*this)); + haBroker.getObserver()->setObserver(connectionObserver); +} + +Primary::~Primary() { + haBroker.getObserver()->setObserver(boost::shared_ptr<broker::ConnectionObserver>()); + haBroker.getBroker().getConfigurationObservers().remove(configurationObserver); +} + +void Primary::checkReady(Mutex::ScopedLock&) { + if (!active && initialBackups.empty()) { + active = true; + QPID_LOG(notice, logPrefix << "Active, all initial queues are safe."); + Mutex::ScopedUnlock u(lock); // Don't hold lock across callback + haBroker.activate(); + } +} + +void Primary::checkReady(BackupMap::iterator i, Mutex::ScopedLock& l) { + if (i != backups.end() && i->second->isReady()) { + initialBackups.erase(i->second); + checkReady(l); } } void Primary::readyReplica(const ReplicatingSubscription& rs) { sys::Mutex::ScopedLock l(lock); - if (queues.ready(rs.getQueue(), rs.getBrokerInfo().getSystemId()) && !activated) { - activated = true; - haBroker.activate(); - QPID_LOG(notice, logPrefix << "Activated, all initial queues are safe."); + BackupMap::iterator i = backups.find(rs.getBrokerInfo().getSystemId()); + if (i != backups.end()) { + i->second->ready(rs.getQueue()); + checkReady(i, l); + } +} + +void Primary::queueCreate(const QueuePtr& q) { + Mutex::ScopedLock l(lock); + for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i) { + i->second->queueCreate(q); + checkReady(i, l); + } +} + +void Primary::queueDestroy(const QueuePtr& q) { + Mutex::ScopedLock l(lock); + for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i) + i->second->queueDestroy(q); + checkReady(l); +} + +void Primary::opened(broker::Connection& connection) { + Mutex::ScopedLock l(lock); + BrokerInfo info; + if (ha::ConnectionObserver::getBrokerInfo(connection, info)) { + haBroker.getMembership().add(info); + BackupMap::iterator i = backups.find(info.getSystemId()); + if (i == backups.end()) { + QPID_LOG(debug, logPrefix << "New backup connected: " << info); + backups[info.getSystemId()].reset( + new RemoteBackup(info, haBroker.getBroker(), haBroker.getReplicationTest())); + } + else { + QPID_LOG(debug, logPrefix << "Known backup connected: " << info); + } + } +} + +void Primary::closed(broker::Connection& connection) { + Mutex::ScopedLock l(lock); + BrokerInfo info; + if (ha::ConnectionObserver::getBrokerInfo(connection, info)) { + haBroker.getMembership().remove(info.getSystemId()); + QPID_LOG(debug, "HA primary: Backup disconnected: " << info); + backups.erase(info.getSystemId()); + // FIXME aconway 2012-06-01: changes to expected backup set for unready queues. } } + +boost::shared_ptr<QueueGuard> Primary::getGuard(const QueuePtr& q, const BrokerInfo& info) +{ + BackupMap::iterator i = backups.find(info.getSystemId()); + return i == backups.end() ? boost::shared_ptr<QueueGuard>() : i->second->guard(q); +} + }} // namespace qpid::ha |
