summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2013-08-01 20:27:05 +0000
committerAlan Conway <aconway@apache.org>2013-08-01 20:27:05 +0000
commit9bf9a96c336635b415235dcaaf9524d64f1504f0 (patch)
tree819256b98024e2f024db67a1d19aa4cb6c250050
parent72cd95abd9ab2f59c164d60e8f5b0c43cb0b2c0c (diff)
downloadqpid-python-9bf9a96c336635b415235dcaaf9524d64f1504f0.tar.gz
QPID-4327: HA get rid of Primary::get() singleton.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1509422 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/qpid/ha/Primary.cpp4
-rw-r--r--cpp/src/qpid/ha/Primary.h3
-rw-r--r--cpp/src/qpid/ha/ReplicatingSubscription.cpp18
3 files changed, 11 insertions, 14 deletions
diff --git a/cpp/src/qpid/ha/Primary.cpp b/cpp/src/qpid/ha/Primary.cpp
index 38e75e7818..bae651a3fc 100644
--- a/cpp/src/qpid/ha/Primary.cpp
+++ b/cpp/src/qpid/ha/Primary.cpp
@@ -82,8 +82,6 @@ class ExpectedBackupTimerTask : public sys::TimerTask {
} // namespace
-Primary* Primary::instance = 0;
-
Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
haBroker(hb), membership(hb.getMembership()),
logPrefix("Primary: "), active(false),
@@ -92,8 +90,6 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
hb.getMembership().setStatus(RECOVERING);
broker::QueueRegistry& queues = hb.getBroker().getQueues();
queues.eachQueue(boost::bind(&Primary::initializeQueue, this, _1));
- assert(instance == 0);
- instance = this; // Let queue replicators find us.
if (expect.empty()) {
QPID_LOG(notice, logPrefix << "Promoted to primary. No expected backups.");
}
diff --git a/cpp/src/qpid/ha/Primary.h b/cpp/src/qpid/ha/Primary.h
index 7bacfbc1fe..031ad3aab9 100644
--- a/cpp/src/qpid/ha/Primary.h
+++ b/cpp/src/qpid/ha/Primary.h
@@ -66,8 +66,6 @@ class Primary : public Role
typedef boost::shared_ptr<broker::Exchange> ExchangePtr;
typedef boost::shared_ptr<RemoteBackup> RemoteBackupPtr;
- static Primary* get() { return instance; }
-
Primary(HaBroker& hb, const BrokerInfo::Set& expectedBackups);
~Primary();
@@ -128,7 +126,6 @@ class Primary : public Role
boost::shared_ptr<broker::ConnectionObserver> connectionObserver;
boost::shared_ptr<broker::BrokerObserver> brokerObserver;
boost::intrusive_ptr<sys::TimerTask> timerTask;
- static Primary* instance;
};
}} // namespace qpid::ha
diff --git a/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index 1ede47ed60..2001ec5332 100644
--- a/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -137,7 +137,9 @@ ReplicatingSubscription::ReplicatingSubscription(
}
// If there's already a guard (we are in failover) use it, else create one.
- if (Primary::get()) guard = Primary::get()->getGuard(queue, info);
+ boost::shared_ptr<Primary> primary =
+ boost::dynamic_pointer_cast<Primary>(haBroker.getRole());
+ if (primary) guard = primary->getGuard(queue, info);
if (!guard) guard.reset(new QueueGuard(*queue, info));
// NOTE: Once the observer is attached we can have concurrent
@@ -148,19 +150,19 @@ ReplicatingSubscription::ReplicatingSubscription(
// between the snapshot and attaching the observer.
observer.reset(new QueueObserver(*this));
queue->addObserver(observer);
- ReplicationIdSet primary = haBroker.getQueueSnapshots()->get(queue)->snapshot();
+ ReplicationIdSet primaryIds = haBroker.getQueueSnapshots()->get(queue)->snapshot();
std::string backupStr = arguments.getAsString(ReplicatingSubscription::QPID_ID_SET);
- ReplicationIdSet backup;
- if (!backupStr.empty()) backup = decodeStr<ReplicationIdSet>(backupStr);
+ ReplicationIdSet backupIds;
+ if (!backupStr.empty()) backupIds = decodeStr<ReplicationIdSet>(backupStr);
// Initial dequeues are messages on backup but not on primary.
- ReplicationIdSet initDequeues = backup - primary;
+ ReplicationIdSet initDequeues = backupIds - primaryIds;
QueuePosition front,back;
queue->getRange(front, back, broker::REPLICATOR); // Outside lock, getRange locks queue
{
sys::Mutex::ScopedLock l(lock); // Concurrent calls to dequeued()
dequeues += initDequeues; // Messages on backup that are not on primary.
- skip = backup - initDequeues; // Messages already on the backup.
+ skip = backupIds - initDequeues; // Messages already on the backup.
// Queue front is moving but we know this subscriptions will start at a
// position >= front so if front is safe then position must be.
@@ -247,7 +249,9 @@ void ReplicatingSubscription::checkReady(sys::Mutex::ScopedLock& l) {
sys::Mutex::ScopedUnlock u(lock);
// Notify Primary that a subscription is ready.
QPID_LOG(debug, logPrefix << "Caught up");
- if (Primary::get()) Primary::get()->readyReplica(*this);
+ boost::shared_ptr<Primary> primary =
+ boost::dynamic_pointer_cast<Primary>(haBroker.getRole());
+ if (primary) primary->readyReplica(*this);
}
}