diff options
| author | Alan Conway <aconway@apache.org> | 2012-05-28 18:24:06 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2012-05-28 18:24:06 +0000 |
| commit | a78cef0941374e4aa27c9025fbb3b5a43686b8fd (patch) | |
| tree | f82688402704057000bfa13ce775fb8ad2f036ff /cpp/src/qpid/broker/Queue.cpp | |
| parent | 2fcdb5bc17a6ae502a3af7df4ba66dd7adb79dfa (diff) | |
| download | qpid-python-a78cef0941374e4aa27c9025fbb3b5a43686b8fd.tar.gz | |
QPID-3603: Allow Queue::setPosition() to truncate the queue.
In the new HA code a backup may sometimes be ahead of the new primary after a
fail-over. In that case the backup truncates it's queues to the same position
as the primary so it can continue replicating.
(Note the assertions added to verify setPosition showed up a minor bug in the
old cluster code, which was leaving messages on the cluster update queue after
an update. This patch fixes the issue.)
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1343347 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 85 |
1 files changed, 55 insertions, 30 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 3d90490186..9df0ebf313 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -588,21 +588,51 @@ QueuedMessage Queue::get(){ return msg; } -bool collect_if_expired(std::deque<QueuedMessage>& expired, QueuedMessage& message) +namespace { +bool collectIf(QueuedMessage& qm, Messages::Predicate predicate, + std::deque<QueuedMessage>& collection) { - if (message.payload->hasExpired()) { - expired.push_back(message); + if (predicate(qm)) { + collection.push_back(qm); return true; } else { return false; } } +bool isExpired(const QueuedMessage& qm) { return qm.payload->hasExpired(); } +} // namespace + +void Queue::dequeueIf(Messages::Predicate predicate, + std::deque<QueuedMessage>& dequeued) +{ + { + Mutex::ScopedLock locker(messageLock); + messages->removeIf(boost::bind(&collectIf, _1, predicate, boost::ref(dequeued))); + } + if (!dequeued.empty()) { + if (mgmtObject) { + mgmtObject->inc_acquires(dequeued.size()); + if (brokerMgmtObject) + brokerMgmtObject->inc_acquires(dequeued.size()); + } + for (std::deque<QueuedMessage>::const_iterator i = dequeued.begin(); + i != dequeued.end(); ++i) { + { + // KAG: should be safe to retake lock after the removeIf, since + // no other thread can touch these messages after the removeIf() call + Mutex::ScopedLock locker(messageLock); + observeAcquire(*i, locker); + } + dequeue( 0, *i ); + } + } +} + /** *@param lapse: time since the last purgeExpired */ -void Queue::purgeExpired(qpid::sys::Duration lapse) -{ +void Queue::purgeExpired(sys::Duration lapse) { //As expired messages are discarded during dequeue also, only //bother explicitly expiring if the rate of dequeues since last //attempt is less than one per second. @@ -610,37 +640,18 @@ void Queue::purgeExpired(qpid::sys::Duration lapse) dequeueSincePurge -= count; int seconds = int64_t(lapse)/qpid::sys::TIME_SEC; if (seconds == 0 || count / seconds < 1) { - std::deque<QueuedMessage> expired; - { - Mutex::ScopedLock locker(messageLock); - messages->removeIf(boost::bind(&collect_if_expired, boost::ref(expired), _1)); - } - - if (!expired.empty()) { + std::deque<QueuedMessage> dequeued; + dequeueIf(boost::bind(&isExpired, _1), dequeued); + if (dequeued.size()) { if (mgmtObject) { - mgmtObject->inc_acquires(expired.size()); - mgmtObject->inc_discardsTtl(expired.size()); - if (brokerMgmtObject) { - brokerMgmtObject->inc_acquires(expired.size()); - brokerMgmtObject->inc_discardsTtl(expired.size()); - } - } - - for (std::deque<QueuedMessage>::const_iterator i = expired.begin(); - i != expired.end(); ++i) { - { - // KAG: should be safe to retake lock after the removeIf, since - // no other thread can touch these messages after the removeIf() call - Mutex::ScopedLock locker(messageLock); - observeAcquire(*i, locker); - } - dequeue( 0, *i ); + mgmtObject->inc_discardsTtl(dequeued.size()); + if (brokerMgmtObject) + brokerMgmtObject->inc_discardsTtl(dequeued.size()); } } } } - namespace { // for use with purge/move below - collect messages that match a given filter // @@ -1661,8 +1672,22 @@ void Queue::query(qpid::types::Variant::Map& results) const if (allocator) allocator->query(results); } +namespace { +struct After { + framing::SequenceNumber seq; + After(framing::SequenceNumber s) : seq(s) {} + bool operator()(const QueuedMessage& qm) { return qm.position > seq; } +}; +} // namespace + + void Queue::setPosition(SequenceNumber n) { Mutex::ScopedLock locker(messageLock); + if (n < sequence) { + std::deque<QueuedMessage> dequeued; + dequeueIf(After(n), dequeued); + messages->setPosition(n); + } sequence = n; QPID_LOG(trace, "Set position to " << sequence << " on " << getName()); } |
