summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/ha/ReplicatingSubscription.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/ha/ReplicatingSubscription.cpp')
-rw-r--r--cpp/src/qpid/ha/ReplicatingSubscription.cpp109
1 files changed, 54 insertions, 55 deletions
diff --git a/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index f7bfe6fda0..7438c95bc2 100644
--- a/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -37,11 +37,12 @@ namespace ha {
using namespace framing;
using namespace broker;
using namespace std;
+using sys::Mutex;
-const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.replicating-subscription");
-const string ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER("qpid.high-sequence-number");
-const string ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER("qpid.low-sequence-number");
-const string ReplicatingSubscription::QPID_BROKER_INFO("qpid.broker-info");
+const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.ha-replicating-subscription");
+const string ReplicatingSubscription::QPID_BACK("qpid.ha-back");
+const string ReplicatingSubscription::QPID_FRONT("qpid.ha-front");
+const string ReplicatingSubscription::QPID_BROKER_INFO("qpid.ha-broker-info");
namespace {
const string DOLLAR("$");
@@ -61,7 +62,7 @@ class DequeueRemover
}
void operator()(const QueuedMessage& message) {
- if (message.position >= start && message.position <= end) {
+ if (message.position >= start && message.position <= end) {
//i.e. message is within the intial range and has not been dequeued,
//so remove it from the dequeues
dequeues.remove(message.position);
@@ -112,6 +113,11 @@ bool ReplicatingSubscription::getFront(broker::Queue& q, SequenceNumber& front)
return getNext(q, 0, front);
}
+bool ReplicatingSubscription::isEmpty(broker::Queue& q) {
+ SequenceNumber front;
+ return getFront(q, front);
+}
+
/* Called by SemanticState::consume to create a consumer */
boost::shared_ptr<broker::SemanticState::ConsumerImpl>
ReplicatingSubscription::Factory::create(
@@ -131,9 +137,7 @@ ReplicatingSubscription::Factory::create(
rs.reset(new ReplicatingSubscription(
parent, name, queue, ack, acquire, exclusive, tag,
resumeId, resumeTtl, arguments));
- boost::shared_ptr<QueueGuard> guard(new QueueGuard(*queue, rs->getBrokerInfo()));
- guard->initialize(); // Must call before ReplicatingSubscription::initialize
- rs->initialize(guard);
+ rs->initialize();
}
return rs;
}
@@ -153,23 +157,19 @@ struct QueueRange {
}
QueueRange(const framing::FieldTable args) {
- back = args.getAsInt(ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER);
+ back = args.getAsInt(ReplicatingSubscription::QPID_BACK);
front = back+1;
- empty = !args.isSet(ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER);
+ empty = !args.isSet(ReplicatingSubscription::QPID_FRONT);
if (!empty) {
- front = args.getAsInt(ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER);
+ front = args.getAsInt(ReplicatingSubscription::QPID_FRONT);
if (back < front)
throw InvalidArgumentException("Invalid bounds for backup queue");
}
}
-
- /** Consumer position to start consuming from the front */
- SequenceNumber browserStart() { return front-1; }
};
ostream& operator<<(ostream& o, const QueueRange& qr) {
-
- if (qr.front > qr.back) return o << "empty(" << qr.back << ")";
+ if (qr.front > qr.back) return o << "[-" << qr.back << "]";
else return o << "[" << qr.front << "," << qr.back << "]";
}
@@ -222,14 +222,31 @@ ReplicatingSubscription::ReplicatingSubscription(
// Clear the backup queue and reset to start browsing at the
// front of the primary queue.
if (!backup.empty) dequeues.add(backup.front, backup.back);
- position = primary.browserStart();
+ position = primary.front - 1; // Start consuming from front.
}
- QPID_LOG(debug, logPrefix << "New backup subscription " << getName()
- << " backup range " << backup
- << " primary range " << primary
- << " position " << position
- << " dequeues " << dequeues);
+ QPID_LOG(debug, logPrefix << "Subscribed: "
+ << " backup" << backup
+ << " primary" << primary
+ << " position=" << position
+ << " dequeues=" << dequeues);
+
+ // Set the guard
+ if (Primary::get()) guard = Primary::get()->getGuard(queue, getBrokerInfo());
+ if (!guard) guard.reset(new QueueGuard(*queue, getBrokerInfo()));
+ guard->attach(*this);
+
+ // Guard is active, dequeued can be called concurrently.
+ Mutex::ScopedLock l(lock);
+
+ // Set the ready position. All messages after this position have
+ // been seen by the guard.
+ readyPosition = guard->getReadyPosition();
+ if (position >= readyPosition || isEmpty(*getQueue()))
+ setReady(l);
+ else
+ QPID_LOG(debug, logPrefix << "Catching up from "
+ << position << " to " << readyPosition);
}
catch (const std::exception& e) {
throw Exception(QPID_MSG(logPrefix << "Error setting up replication: "
@@ -242,35 +259,17 @@ ReplicatingSubscription::~ReplicatingSubscription() {
}
// Called in subscription's connection thread when the subscription is created.
-void ReplicatingSubscription::initialize(const boost::shared_ptr<QueueGuard>& g) {
- sys::Mutex::ScopedLock l(lock); // Note dequeued() can be called concurrently.
- // Attach to the guard.
- guard = g;
- guard->attach(*this);
+// Called separate from ctor because sending events requires
+// shared_from_this
+//
+void ReplicatingSubscription::initialize() {
+ Mutex::ScopedLock l(lock); // Note dequeued() can be called concurrently.
// Send initial dequeues and position to the backup.
// There must be a shared_ptr(this) when sending.
sendDequeueEvent(l);
sendPositionEvent(position, l);
backupPosition = position;
-
- // Set the ready position. All messages after this position have
- // been seen by the guard.
- QueueRange range;
- {
- // Drop the lock, QueueRange will lock the queues message lock
- // which is also locked around calls to enqueued() and dequeued()
- sys::Mutex::ScopedUnlock u(lock);
- range = QueueRange(*getQueue());
- }
- readyPosition = range.back;
- if (range.empty || position >= readyPosition) {
- setReady(l);
- }
- else {
- QPID_LOG(debug, logPrefix << "Backup subscription catching up from "
- << position << " to " << readyPosition);
- }
}
// Message is delivered in the subscription's connection thread.
@@ -280,7 +279,7 @@ bool ReplicatingSubscription::deliver(QueuedMessage& qm) {
if (qm.queue == getQueue().get()) {
QPID_LOG(trace, logPrefix << "Replicating " << qm);
{
- sys::Mutex::ScopedLock l(lock);
+ Mutex::ScopedLock l(lock);
assert(position == qm.position);
// qm.position is the position of the newly enqueued qm on local queue.
// backupPosition is latest position on backup queue before enqueueing
@@ -299,7 +298,7 @@ bool ReplicatingSubscription::deliver(QueuedMessage& qm) {
// Deliver the message
bool delivered = ConsumerImpl::deliver(qm);
{
- sys::Mutex::ScopedLock l(lock);
+ Mutex::ScopedLock l(lock);
// If we have advanced to the initial position, the backup is ready.
if (qm.position >= readyPosition) setReady(l);
}
@@ -314,12 +313,12 @@ bool ReplicatingSubscription::deliver(QueuedMessage& qm) {
}
}
-void ReplicatingSubscription::setReady(sys::Mutex::ScopedLock&) {
+void ReplicatingSubscription::setReady(Mutex::ScopedLock&) {
if (ready) return;
ready = true;
// Notify Primary that a subscription is ready.
{
- sys::Mutex::ScopedUnlock u(lock);
+ Mutex::ScopedUnlock u(lock);
QPID_LOG(info, logPrefix << "Caught up at " << getPosition());
if (Primary::get()) Primary::get()->readyReplica(*this);
}
@@ -343,7 +342,7 @@ void ReplicatingSubscription::acknowledged(const QueuedMessage& msg) {
bool ReplicatingSubscription::hideDeletedError() { return true; }
// Called with lock held. Called in subscription's connection thread.
-void ReplicatingSubscription::sendDequeueEvent(sys::Mutex::ScopedLock&)
+void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock&)
{
if (dequeues.empty()) return;
QPID_LOG(trace, logPrefix << "Sending dequeues " << dequeues);
@@ -353,7 +352,7 @@ void ReplicatingSubscription::sendDequeueEvent(sys::Mutex::ScopedLock&)
dequeues.clear();
buffer.reset();
{
- sys::Mutex::ScopedUnlock u(lock);
+ Mutex::ScopedUnlock u(lock);
sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer);
}
}
@@ -367,7 +366,7 @@ void ReplicatingSubscription::dequeued(const QueuedMessage& qm)
bool doComplete = false;
QPID_LOG(trace, logPrefix << "Dequeued " << qm);
{
- sys::Mutex::ScopedLock l(lock);
+ Mutex::ScopedLock l(lock);
dequeues.add(qm.position);
if (qm.position > position) doComplete = true;
}
@@ -378,7 +377,7 @@ void ReplicatingSubscription::dequeued(const QueuedMessage& qm)
}
// Called with lock held. Called in subscription's connection thread.
-void ReplicatingSubscription::sendPositionEvent(SequenceNumber pos, sys::Mutex::ScopedLock&)
+void ReplicatingSubscription::sendPositionEvent(SequenceNumber pos, Mutex::ScopedLock&)
{
if (pos == backupPosition) return; // No need to send.
QPID_LOG(trace, logPrefix << "Sending position " << pos
@@ -388,7 +387,7 @@ void ReplicatingSubscription::sendPositionEvent(SequenceNumber pos, sys::Mutex::
pos.encode(buffer);
buffer.reset();
{
- sys::Mutex::ScopedUnlock u(lock);
+ Mutex::ScopedUnlock u(lock);
sendEvent(QueueReplicator::POSITION_EVENT_KEY, buffer);
}
}
@@ -428,7 +427,7 @@ void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer&
bool ReplicatingSubscription::doDispatch()
{
{
- sys::Mutex::ScopedLock l(lock);
+ Mutex::ScopedLock l(lock);
if (!dequeues.empty()) sendDequeueEvent(l);
}
return ConsumerImpl::doDispatch();