diff options
Diffstat (limited to 'cpp/src/qpid/ha/ReplicatingSubscription.cpp')
| -rw-r--r-- | cpp/src/qpid/ha/ReplicatingSubscription.cpp | 177 |
1 files changed, 91 insertions, 86 deletions
diff --git a/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/cpp/src/qpid/ha/ReplicatingSubscription.cpp index c74abb6cdd..08f6fb7dcc 100644 --- a/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -49,30 +49,38 @@ const string DOLLAR("$"); const string INTERNAL("-internal"); } // namespace -class DequeueRemover +// Scan the queue for gaps and add them to the subscriptions dequed set. +class DequeueScanner { public: - DequeueRemover( - SequenceSet& r, - const SequenceNumber& s, - const SequenceNumber& e - ) : dequeues(r), start(s), end(e) + DequeueScanner( + ReplicatingSubscription& rs, + const SequenceNumber& first_, + const SequenceNumber& last_ // Inclusive + ) : subscription(rs), first(first_), last(last_) { - dequeues.add(start, end); + assert(first <= last); + // INVARIANT no deques are needed for positions <= at. + at = first - 1; } - void operator()(const QueuedMessage& message) { - if (message.position >= start && message.position <= end) { - //i.e. message is within the intial range and has not been dequeued, - //so remove it from the dequeues - dequeues.remove(message.position); + void operator()(const QueuedMessage& qm) { + if (qm.position >= first && qm.position <= last) { + if (qm.position > at+1) + subscription.dequeued(at+1, qm.position-1); + at = qm.position; } } + // Must call after scanning the queue. + void finish() { + if (at < last) subscription.dequeued(at+1, last); + } private: - SequenceSet& dequeues; - const SequenceNumber start; - const SequenceNumber end; + ReplicatingSubscription& subscription; + SequenceNumber first; + SequenceNumber last; + SequenceNumber at; }; string mask(const string& in) @@ -113,11 +121,6 @@ bool ReplicatingSubscription::getFront(broker::Queue& q, SequenceNumber& front) return getNext(q, 0, front); } -bool ReplicatingSubscription::isEmpty(broker::Queue& q) { - SequenceNumber front; - return getFront(q, front); -} - /* Called by SemanticState::consume to create a consumer */ boost::shared_ptr<broker::SemanticState::ConsumerImpl> ReplicatingSubscription::Factory::create( @@ -149,11 +152,11 @@ struct QueueRange { QueueRange() { } - // FIXME aconway 2012-05-26: fix front calculation QueueRange(broker::Queue& q) { back = q.getPosition(); - front = back+1; + front = back+1; // Assume empty empty = !ReplicatingSubscription::getFront(q, front); + assert(empty || front <= back); } QueueRange(const framing::FieldTable args) { @@ -163,7 +166,8 @@ struct QueueRange { if (!empty) { front = args.getAsInt(ReplicatingSubscription::QPID_FRONT); if (back < front) - throw InvalidArgumentException("Invalid bounds for backup queue"); + throw InvalidArgumentException( + QPID_MSG("Invalid range [" << front << "," << back <<"]")); } } }; @@ -192,68 +196,72 @@ ReplicatingSubscription::ReplicatingSubscription( try { FieldTable ft; if (!arguments.getTable(ReplicatingSubscription::QPID_BROKER_INFO, ft)) - throw Exception("Replicating subscription does not have broker info"); + throw Exception("Replicating subscription does not have broker info: " + tag); info.assign(ft); // Set a log prefix message that identifies the remote broker. ostringstream os; - os << "HA primary replicate " << queue->getName() << "@" << info.getLogId() << ": "; + os << "HA primary replica " << queue->getName() << "@" << info.getLogId() << ": "; logPrefix = os.str(); - QueueRange primary(*queue); - QueueRange backup(arguments); + // FIXME aconway 2012-06-10: unsafe to rely in queue front or position they are changing? + + QueueRange primary(*queue); // The local primary queue. + QueueRange backup(arguments); // The remote backup state. backupPosition = backup.back; + + // NOTE: Once the guard is attached we can have concurrent + // calles to dequeued so we need to lock use of this->deques. + // + + // However we must attach the guard _before_ we scan for + // initial dequeues to be sure we don't miss any dequeues + // between the scan and attaching the guard. + if (Primary::get()) guard = Primary::get()->getGuard(queue, getBrokerInfo()); + if (!guard) guard.reset(new QueueGuard(*queue, getBrokerInfo())); + guard->attach(*this); + // We can re-use some backup messages if backup and primary queues // overlap and the backup is not missing messages at the front of the queue. - if (!primary.empty && // Primary not empty + // FIXME aconway 2012-06-10: disable re-use of backup queue till stall problem is solved. + /* if (!primary.empty && // Primary not empty !backup.empty && // Backup not empty primary.front >= backup.front && // Not missing messages at the front primary.front <= backup.back // Overlap ) { - // Remove messages that are still on the primary queue from dequeues - // FIXME aconway 2012-05-22: optimize to iterate only the relevant - // section of the queue - DequeueRemover remover(dequeues, backup.front, backup.back); - queue->eachMessage(remover); - position = std::min(primary.back, backup.back); + // Scan primary queue for gaps that should be dequeued on the backup. + // NOTE: this runs concurrently with the guard calling dequeued() + // FIXME aconway 2012-05-22: optimize queue iteration + DequeueScanner scan(*this, backup.front, backup.back); + queue->eachMessage(scan); + scan.finish(); + // If the backup was ahead it has been pruned back to the primary. + position = std::min(guard->getFirstSafe(), backup.back); } - else { + else */ { // Clear the backup queue and reset to start browsing at the // front of the primary queue. - if (!backup.empty) dequeues.add(backup.front, backup.back); + if (!backup.empty) dequeued(backup.front, backup.back); position = primary.front - 1; // Start consuming from front. - } QPID_LOG(debug, logPrefix << "Subscribed: " - << " backup" << backup - << " primary" << primary - << " position=" << position - << " dequeues=" << dequeues); - - // Set the guard - if (Primary::get()) guard = Primary::get()->getGuard(queue, getBrokerInfo()); - if (!guard) { - QPID_LOG(debug, logPrefix << "No pre-set guard found, creating one."); - guard.reset(new QueueGuard(*queue, getBrokerInfo())); - } - guard->attach(*this); - - // Guard is active, dequeued can be called concurrently. - Mutex::ScopedLock l(lock); - - // Set the ready position. All messages after this position have - // been seen by the guard. - readyPosition = guard->getReadyPosition(); - if (position >= readyPosition || isEmpty(*getQueue())) - setReady(l); + << " backup:" << backup + << " backup position:" << backupPosition + << " primary:" << primary + << " position:" << position + ); + + // Are we ready yet? + if (position+1 >= guard->getFirstSafe()) // Next message will be safe. + setReady(); else QPID_LOG(debug, logPrefix << "Catching up from " - << position << " to " << readyPosition); + << position << " to " << guard->getFirstSafe()); } catch (const std::exception& e) { - throw Exception(QPID_MSG(logPrefix << "Error setting up replication: " - << e.what())); + throw InvalidArgumentException(QPID_MSG(logPrefix << e.what() + << ": arguments=" << arguments)); } } @@ -295,16 +303,13 @@ bool ReplicatingSubscription::deliver(QueuedMessage& qm) { // Send the position before qm was enqueued. sendPositionEvent(qm.position-1, l); } - // Backup will automaticall advance by 1 on delivery of message. + // Backup will automatically advance by 1 on delivery of message. backupPosition = qm.position; } // Deliver the message bool delivered = ConsumerImpl::deliver(qm); - { - Mutex::ScopedLock l(lock); - // If we have advanced to the initial position, the backup is ready. - if (qm.position >= readyPosition) setReady(l); - } + // If we have advanced past the initial position, the backup is ready. + if (qm.position >= guard->getFirstSafe()) setReady(); return delivered; } else @@ -316,15 +321,15 @@ bool ReplicatingSubscription::deliver(QueuedMessage& qm) { } } -void ReplicatingSubscription::setReady(Mutex::ScopedLock&) { - if (ready) return; - ready = true; - // Notify Primary that a subscription is ready. +void ReplicatingSubscription::setReady() { { - Mutex::ScopedUnlock u(lock); - QPID_LOG(info, logPrefix << "Caught up at " << getPosition()); - if (Primary::get()) Primary::get()->readyReplica(*this); + Mutex::ScopedLock l(lock); + if (ready) return; + ready = true; } + // Notify Primary that a subscription is ready. + QPID_LOG(info, logPrefix << "Caught up at " << getPosition()); + if (Primary::get()) Primary::get()->readyReplica(*this); } // Called in the subscription's connection thread. @@ -341,12 +346,9 @@ void ReplicatingSubscription::acknowledged(const QueuedMessage& qm) { QPID_LOG(trace, logPrefix << "Acknowledged " << qm); guard->complete(qm); } + ConsumerImpl::acknowledged(qm); } -// Hide the "queue deleted" error for a ReplicatingSubscription when a -// queue is deleted, this is normal and not an error. -bool ReplicatingSubscription::hideDeletedError() { return true; } - // Called with lock held. Called in subscription's connection thread. void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock&) { @@ -370,25 +372,28 @@ void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock&) void ReplicatingSubscription::dequeued(const QueuedMessage& qm) { assert (qm.queue == getQueue().get()); - bool doComplete = false; + QPID_LOG(trace, logPrefix << "Dequeued " << qm); { Mutex::ScopedLock l(lock); - assert(!dequeues.contains(qm.position)); dequeues.add(qm.position); - // If we have not yet sent this message to the backup, then - // complete it now as it will never be accepted. - if (qm.position > position) doComplete = true; } - if (doComplete) guard->complete(qm); notify(); // Ensure a call to doDispatch } +// Called during construction while scanning for initial dequeues. +void ReplicatingSubscription::dequeued(SequenceNumber first, SequenceNumber last) { + QPID_LOG(trace, logPrefix << "Initial dequeue [" << first << ", " << last << "]"); + { + Mutex::ScopedLock l(lock); + dequeues.add(first,last); + } +} + // Called with lock held. Called in subscription's connection thread. void ReplicatingSubscription::sendPositionEvent(SequenceNumber pos, Mutex::ScopedLock&) { if (pos == backupPosition) return; // No need to send. - QPID_LOG(trace, logPrefix << "Sending position " << pos - << ", was " << backupPosition); + QPID_LOG(trace, logPrefix << "Sending position " << pos << ", was " << backupPosition); string buf(pos.encodedSize(),'\0'); framing::Buffer buffer(&buf[0], buf.size()); pos.encode(buffer); |
