diff options
Diffstat (limited to 'cpp/src/qpid/ha/ReplicatingSubscription.cpp')
| -rw-r--r-- | cpp/src/qpid/ha/ReplicatingSubscription.cpp | 109 |
1 files changed, 54 insertions, 55 deletions
diff --git a/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/cpp/src/qpid/ha/ReplicatingSubscription.cpp index f7bfe6fda0..7438c95bc2 100644 --- a/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -37,11 +37,12 @@ namespace ha { using namespace framing; using namespace broker; using namespace std; +using sys::Mutex; -const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.replicating-subscription"); -const string ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER("qpid.high-sequence-number"); -const string ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER("qpid.low-sequence-number"); -const string ReplicatingSubscription::QPID_BROKER_INFO("qpid.broker-info"); +const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.ha-replicating-subscription"); +const string ReplicatingSubscription::QPID_BACK("qpid.ha-back"); +const string ReplicatingSubscription::QPID_FRONT("qpid.ha-front"); +const string ReplicatingSubscription::QPID_BROKER_INFO("qpid.ha-broker-info"); namespace { const string DOLLAR("$"); @@ -61,7 +62,7 @@ class DequeueRemover } void operator()(const QueuedMessage& message) { - if (message.position >= start && message.position <= end) { + if (message.position >= start && message.position <= end) { //i.e. message is within the intial range and has not been dequeued, //so remove it from the dequeues dequeues.remove(message.position); @@ -112,6 +113,11 @@ bool ReplicatingSubscription::getFront(broker::Queue& q, SequenceNumber& front) return getNext(q, 0, front); } +bool ReplicatingSubscription::isEmpty(broker::Queue& q) { + SequenceNumber front; + return getFront(q, front); +} + /* Called by SemanticState::consume to create a consumer */ boost::shared_ptr<broker::SemanticState::ConsumerImpl> ReplicatingSubscription::Factory::create( @@ -131,9 +137,7 @@ ReplicatingSubscription::Factory::create( rs.reset(new ReplicatingSubscription( parent, name, queue, ack, acquire, exclusive, tag, resumeId, resumeTtl, arguments)); - boost::shared_ptr<QueueGuard> guard(new QueueGuard(*queue, rs->getBrokerInfo())); - guard->initialize(); // Must call before ReplicatingSubscription::initialize - rs->initialize(guard); + rs->initialize(); } return rs; } @@ -153,23 +157,19 @@ struct QueueRange { } QueueRange(const framing::FieldTable args) { - back = args.getAsInt(ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER); + back = args.getAsInt(ReplicatingSubscription::QPID_BACK); front = back+1; - empty = !args.isSet(ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER); + empty = !args.isSet(ReplicatingSubscription::QPID_FRONT); if (!empty) { - front = args.getAsInt(ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER); + front = args.getAsInt(ReplicatingSubscription::QPID_FRONT); if (back < front) throw InvalidArgumentException("Invalid bounds for backup queue"); } } - - /** Consumer position to start consuming from the front */ - SequenceNumber browserStart() { return front-1; } }; ostream& operator<<(ostream& o, const QueueRange& qr) { - - if (qr.front > qr.back) return o << "empty(" << qr.back << ")"; + if (qr.front > qr.back) return o << "[-" << qr.back << "]"; else return o << "[" << qr.front << "," << qr.back << "]"; } @@ -222,14 +222,31 @@ ReplicatingSubscription::ReplicatingSubscription( // Clear the backup queue and reset to start browsing at the // front of the primary queue. if (!backup.empty) dequeues.add(backup.front, backup.back); - position = primary.browserStart(); + position = primary.front - 1; // Start consuming from front. } - QPID_LOG(debug, logPrefix << "New backup subscription " << getName() - << " backup range " << backup - << " primary range " << primary - << " position " << position - << " dequeues " << dequeues); + QPID_LOG(debug, logPrefix << "Subscribed: " + << " backup" << backup + << " primary" << primary + << " position=" << position + << " dequeues=" << dequeues); + + // Set the guard + if (Primary::get()) guard = Primary::get()->getGuard(queue, getBrokerInfo()); + if (!guard) guard.reset(new QueueGuard(*queue, getBrokerInfo())); + guard->attach(*this); + + // Guard is active, dequeued can be called concurrently. + Mutex::ScopedLock l(lock); + + // Set the ready position. All messages after this position have + // been seen by the guard. + readyPosition = guard->getReadyPosition(); + if (position >= readyPosition || isEmpty(*getQueue())) + setReady(l); + else + QPID_LOG(debug, logPrefix << "Catching up from " + << position << " to " << readyPosition); } catch (const std::exception& e) { throw Exception(QPID_MSG(logPrefix << "Error setting up replication: " @@ -242,35 +259,17 @@ ReplicatingSubscription::~ReplicatingSubscription() { } // Called in subscription's connection thread when the subscription is created. -void ReplicatingSubscription::initialize(const boost::shared_ptr<QueueGuard>& g) { - sys::Mutex::ScopedLock l(lock); // Note dequeued() can be called concurrently. - // Attach to the guard. - guard = g; - guard->attach(*this); +// Called separate from ctor because sending events requires +// shared_from_this +// +void ReplicatingSubscription::initialize() { + Mutex::ScopedLock l(lock); // Note dequeued() can be called concurrently. // Send initial dequeues and position to the backup. // There must be a shared_ptr(this) when sending. sendDequeueEvent(l); sendPositionEvent(position, l); backupPosition = position; - - // Set the ready position. All messages after this position have - // been seen by the guard. - QueueRange range; - { - // Drop the lock, QueueRange will lock the queues message lock - // which is also locked around calls to enqueued() and dequeued() - sys::Mutex::ScopedUnlock u(lock); - range = QueueRange(*getQueue()); - } - readyPosition = range.back; - if (range.empty || position >= readyPosition) { - setReady(l); - } - else { - QPID_LOG(debug, logPrefix << "Backup subscription catching up from " - << position << " to " << readyPosition); - } } // Message is delivered in the subscription's connection thread. @@ -280,7 +279,7 @@ bool ReplicatingSubscription::deliver(QueuedMessage& qm) { if (qm.queue == getQueue().get()) { QPID_LOG(trace, logPrefix << "Replicating " << qm); { - sys::Mutex::ScopedLock l(lock); + Mutex::ScopedLock l(lock); assert(position == qm.position); // qm.position is the position of the newly enqueued qm on local queue. // backupPosition is latest position on backup queue before enqueueing @@ -299,7 +298,7 @@ bool ReplicatingSubscription::deliver(QueuedMessage& qm) { // Deliver the message bool delivered = ConsumerImpl::deliver(qm); { - sys::Mutex::ScopedLock l(lock); + Mutex::ScopedLock l(lock); // If we have advanced to the initial position, the backup is ready. if (qm.position >= readyPosition) setReady(l); } @@ -314,12 +313,12 @@ bool ReplicatingSubscription::deliver(QueuedMessage& qm) { } } -void ReplicatingSubscription::setReady(sys::Mutex::ScopedLock&) { +void ReplicatingSubscription::setReady(Mutex::ScopedLock&) { if (ready) return; ready = true; // Notify Primary that a subscription is ready. { - sys::Mutex::ScopedUnlock u(lock); + Mutex::ScopedUnlock u(lock); QPID_LOG(info, logPrefix << "Caught up at " << getPosition()); if (Primary::get()) Primary::get()->readyReplica(*this); } @@ -343,7 +342,7 @@ void ReplicatingSubscription::acknowledged(const QueuedMessage& msg) { bool ReplicatingSubscription::hideDeletedError() { return true; } // Called with lock held. Called in subscription's connection thread. -void ReplicatingSubscription::sendDequeueEvent(sys::Mutex::ScopedLock&) +void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock&) { if (dequeues.empty()) return; QPID_LOG(trace, logPrefix << "Sending dequeues " << dequeues); @@ -353,7 +352,7 @@ void ReplicatingSubscription::sendDequeueEvent(sys::Mutex::ScopedLock&) dequeues.clear(); buffer.reset(); { - sys::Mutex::ScopedUnlock u(lock); + Mutex::ScopedUnlock u(lock); sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer); } } @@ -367,7 +366,7 @@ void ReplicatingSubscription::dequeued(const QueuedMessage& qm) bool doComplete = false; QPID_LOG(trace, logPrefix << "Dequeued " << qm); { - sys::Mutex::ScopedLock l(lock); + Mutex::ScopedLock l(lock); dequeues.add(qm.position); if (qm.position > position) doComplete = true; } @@ -378,7 +377,7 @@ void ReplicatingSubscription::dequeued(const QueuedMessage& qm) } // Called with lock held. Called in subscription's connection thread. -void ReplicatingSubscription::sendPositionEvent(SequenceNumber pos, sys::Mutex::ScopedLock&) +void ReplicatingSubscription::sendPositionEvent(SequenceNumber pos, Mutex::ScopedLock&) { if (pos == backupPosition) return; // No need to send. QPID_LOG(trace, logPrefix << "Sending position " << pos @@ -388,7 +387,7 @@ void ReplicatingSubscription::sendPositionEvent(SequenceNumber pos, sys::Mutex:: pos.encode(buffer); buffer.reset(); { - sys::Mutex::ScopedUnlock u(lock); + Mutex::ScopedUnlock u(lock); sendEvent(QueueReplicator::POSITION_EVENT_KEY, buffer); } } @@ -428,7 +427,7 @@ void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer& bool ReplicatingSubscription::doDispatch() { { - sys::Mutex::ScopedLock l(lock); + Mutex::ScopedLock l(lock); if (!dequeues.empty()) sendDequeueEvent(l); } return ConsumerImpl::doDispatch(); |
