diff options
author | Alan Conway <aconway@apache.org> | 2013-08-01 20:27:05 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2013-08-01 20:27:05 +0000 |
commit | 9bf9a96c336635b415235dcaaf9524d64f1504f0 (patch) | |
tree | 819256b98024e2f024db67a1d19aa4cb6c250050 | |
parent | 72cd95abd9ab2f59c164d60e8f5b0c43cb0b2c0c (diff) | |
download | qpid-python-9bf9a96c336635b415235dcaaf9524d64f1504f0.tar.gz |
QPID-4327: HA get rid of Primary::get() singleton.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1509422 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/src/qpid/ha/Primary.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/ha/Primary.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/ha/ReplicatingSubscription.cpp | 18 |
3 files changed, 11 insertions, 14 deletions
diff --git a/cpp/src/qpid/ha/Primary.cpp b/cpp/src/qpid/ha/Primary.cpp index 38e75e7818..bae651a3fc 100644 --- a/cpp/src/qpid/ha/Primary.cpp +++ b/cpp/src/qpid/ha/Primary.cpp @@ -82,8 +82,6 @@ class ExpectedBackupTimerTask : public sys::TimerTask { } // namespace -Primary* Primary::instance = 0; - Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) : haBroker(hb), membership(hb.getMembership()), logPrefix("Primary: "), active(false), @@ -92,8 +90,6 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) : hb.getMembership().setStatus(RECOVERING); broker::QueueRegistry& queues = hb.getBroker().getQueues(); queues.eachQueue(boost::bind(&Primary::initializeQueue, this, _1)); - assert(instance == 0); - instance = this; // Let queue replicators find us. if (expect.empty()) { QPID_LOG(notice, logPrefix << "Promoted to primary. No expected backups."); } diff --git a/cpp/src/qpid/ha/Primary.h b/cpp/src/qpid/ha/Primary.h index 7bacfbc1fe..031ad3aab9 100644 --- a/cpp/src/qpid/ha/Primary.h +++ b/cpp/src/qpid/ha/Primary.h @@ -66,8 +66,6 @@ class Primary : public Role typedef boost::shared_ptr<broker::Exchange> ExchangePtr; typedef boost::shared_ptr<RemoteBackup> RemoteBackupPtr; - static Primary* get() { return instance; } - Primary(HaBroker& hb, const BrokerInfo::Set& expectedBackups); ~Primary(); @@ -128,7 +126,6 @@ class Primary : public Role boost::shared_ptr<broker::ConnectionObserver> connectionObserver; boost::shared_ptr<broker::BrokerObserver> brokerObserver; boost::intrusive_ptr<sys::TimerTask> timerTask; - static Primary* instance; }; }} // namespace qpid::ha diff --git a/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/cpp/src/qpid/ha/ReplicatingSubscription.cpp index 1ede47ed60..2001ec5332 100644 --- a/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -137,7 +137,9 @@ ReplicatingSubscription::ReplicatingSubscription( } // If there's already a guard (we are in failover) use it, else create one. - if (Primary::get()) guard = Primary::get()->getGuard(queue, info); + boost::shared_ptr<Primary> primary = + boost::dynamic_pointer_cast<Primary>(haBroker.getRole()); + if (primary) guard = primary->getGuard(queue, info); if (!guard) guard.reset(new QueueGuard(*queue, info)); // NOTE: Once the observer is attached we can have concurrent @@ -148,19 +150,19 @@ ReplicatingSubscription::ReplicatingSubscription( // between the snapshot and attaching the observer. observer.reset(new QueueObserver(*this)); queue->addObserver(observer); - ReplicationIdSet primary = haBroker.getQueueSnapshots()->get(queue)->snapshot(); + ReplicationIdSet primaryIds = haBroker.getQueueSnapshots()->get(queue)->snapshot(); std::string backupStr = arguments.getAsString(ReplicatingSubscription::QPID_ID_SET); - ReplicationIdSet backup; - if (!backupStr.empty()) backup = decodeStr<ReplicationIdSet>(backupStr); + ReplicationIdSet backupIds; + if (!backupStr.empty()) backupIds = decodeStr<ReplicationIdSet>(backupStr); // Initial dequeues are messages on backup but not on primary. - ReplicationIdSet initDequeues = backup - primary; + ReplicationIdSet initDequeues = backupIds - primaryIds; QueuePosition front,back; queue->getRange(front, back, broker::REPLICATOR); // Outside lock, getRange locks queue { sys::Mutex::ScopedLock l(lock); // Concurrent calls to dequeued() dequeues += initDequeues; // Messages on backup that are not on primary. - skip = backup - initDequeues; // Messages already on the backup. + skip = backupIds - initDequeues; // Messages already on the backup. // Queue front is moving but we know this subscriptions will start at a // position >= front so if front is safe then position must be. @@ -247,7 +249,9 @@ void ReplicatingSubscription::checkReady(sys::Mutex::ScopedLock& l) { sys::Mutex::ScopedUnlock u(lock); // Notify Primary that a subscription is ready. QPID_LOG(debug, logPrefix << "Caught up"); - if (Primary::get()) Primary::get()->readyReplica(*this); + boost::shared_ptr<Primary> primary = + boost::dynamic_pointer_cast<Primary>(haBroker.getRole()); + if (primary) primary->readyReplica(*this); } } |