diff options
| author | Alan Conway <aconway@apache.org> | 2012-05-28 18:24:06 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2012-05-28 18:24:06 +0000 |
| commit | a78cef0941374e4aa27c9025fbb3b5a43686b8fd (patch) | |
| tree | f82688402704057000bfa13ce775fb8ad2f036ff /cpp/src/qpid | |
| parent | 2fcdb5bc17a6ae502a3af7df4ba66dd7adb79dfa (diff) | |
| download | qpid-python-a78cef0941374e4aa27c9025fbb3b5a43686b8fd.tar.gz | |
QPID-3603: Allow Queue::setPosition() to truncate the queue.
In the new HA code a backup may sometimes be ahead of the new primary after a
fail-over. In that case the backup truncates it's queues to the same position
as the primary so it can continue replicating.
(Note the assertions added to verify setPosition showed up a minor bug in the
old cluster code, which was leaving messages on the cluster update queue after
an update. This patch fixes the issue.)
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1343347 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
| -rw-r--r-- | cpp/src/qpid/broker/MessageDeque.cpp | 20 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/MessageDeque.h | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/MessageMap.cpp | 11 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/MessageMap.h | 3 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Messages.h | 9 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/PriorityQueue.cpp | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/PriorityQueue.h | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 85 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Queue.h | 18 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 1 |
10 files changed, 116 insertions, 38 deletions
diff --git a/cpp/src/qpid/broker/MessageDeque.cpp b/cpp/src/qpid/broker/MessageDeque.cpp index f70c996975..474e4139bd 100644 --- a/cpp/src/qpid/broker/MessageDeque.cpp +++ b/cpp/src/qpid/broker/MessageDeque.cpp @@ -173,6 +173,26 @@ void MessageDeque::updateAcquired(const QueuedMessage& acquired) } } +namespace { +bool isNotDeleted(const QueuedMessage& qm) { return qm.status != QueuedMessage::DELETED; } +} // namespace + +void MessageDeque::setPosition(const framing::SequenceNumber& n) { + size_t i = index(n+1); + if (i >= messages.size()) return; // Nothing to do. + + // Assertion to verify the precondition: no messaages after n. + assert(std::find_if(messages.begin()+i, messages.end(), &isNotDeleted) == + messages.end()); + messages.erase(messages.begin()+i, messages.end()); + if (head >= messages.size()) head = messages.size() - 1; + // Re-count the available messages + available = 0; + for (Deque::iterator i = messages.begin(); i != messages.end(); ++i) { + if (i->status == QueuedMessage::AVAILABLE) ++available; + } +} + void MessageDeque::clean() { while (messages.size() && messages.front().status == QueuedMessage::DELETED) { diff --git a/cpp/src/qpid/broker/MessageDeque.h b/cpp/src/qpid/broker/MessageDeque.h index 9b53716d4e..c5670b2a72 100644 --- a/cpp/src/qpid/broker/MessageDeque.h +++ b/cpp/src/qpid/broker/MessageDeque.h @@ -44,7 +44,7 @@ class MessageDeque : public Messages bool consume(QueuedMessage&); bool push(const QueuedMessage& added, QueuedMessage& removed); void updateAcquired(const QueuedMessage& acquired); - + void setPosition(const framing::SequenceNumber&); void foreach(Functor); void removeIf(Predicate); diff --git a/cpp/src/qpid/broker/MessageMap.cpp b/cpp/src/qpid/broker/MessageMap.cpp index 9b164d4e5c..d6702a9336 100644 --- a/cpp/src/qpid/broker/MessageMap.cpp +++ b/cpp/src/qpid/broker/MessageMap.cpp @@ -21,6 +21,7 @@ #include "qpid/broker/MessageMap.h" #include "qpid/broker/QueuedMessage.h" #include "qpid/log/Statement.h" +#include <algorithm> namespace qpid { namespace broker { @@ -130,18 +131,24 @@ bool MessageMap::push(const QueuedMessage& added, QueuedMessage& removed) QueuedMessage& a = messages[added.position]; a = added; a.status = QueuedMessage::AVAILABLE; - QPID_LOG(debug, "Added message at " << a.position); + QPID_LOG(debug, "Added message " << a); return false; } else { //there is already a message with that key which needs to be replaced removed = result.first->second; result.first->second = replace(result.first->second, added); result.first->second.status = QueuedMessage::AVAILABLE; - QPID_LOG(debug, "Displaced message at " << removed.position << " with " << result.first->second.position << ": " << result.first->first); + QPID_LOG(debug, "Displaced message " << removed << " with " << result.first->second << ": " << result.first->first); return true; } } +void MessageMap::setPosition(const framing::SequenceNumber& seq) { + // Nothing to do, just assert that the precondition is respected and there + // are no undeleted messages after seq. + assert(messages.empty() || (--messages.end())->first <= seq); +} + void MessageMap::foreach(Functor f) { for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) { diff --git a/cpp/src/qpid/broker/MessageMap.h b/cpp/src/qpid/broker/MessageMap.h index a668450250..1f0481cb6b 100644 --- a/cpp/src/qpid/broker/MessageMap.h +++ b/cpp/src/qpid/broker/MessageMap.h @@ -6,7 +6,7 @@ * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file +o * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at @@ -50,6 +50,7 @@ class MessageMap : public Messages virtual bool browse(const framing::SequenceNumber&, QueuedMessage&, bool); bool consume(QueuedMessage&); virtual bool push(const QueuedMessage& added, QueuedMessage& removed); + void setPosition(const framing::SequenceNumber&); void foreach(Functor); virtual void removeIf(Predicate); diff --git a/cpp/src/qpid/broker/Messages.h b/cpp/src/qpid/broker/Messages.h index 61e9fa110a..45f5e6cd81 100644 --- a/cpp/src/qpid/broker/Messages.h +++ b/cpp/src/qpid/broker/Messages.h @@ -21,6 +21,7 @@ * under the License. * */ +#include "qpid/framing/SequenceNumber.h" #include <boost/function.hpp> namespace qpid { @@ -101,14 +102,22 @@ class Messages virtual void updateAcquired(const QueuedMessage&) { } /** + * Set the position of the back of the queue. Next message enqueued will be n+1. + *@pre Any messages with seq > n must already be dequeued. + */ + virtual void setPosition(const framing::SequenceNumber& /*n*/) = 0; + + /** * Apply, the functor to each message held */ + virtual void foreach(Functor) = 0; /** * Remove every message held that for which the specified * predicate returns true */ virtual void removeIf(Predicate) = 0; + private: }; }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/PriorityQueue.cpp b/cpp/src/qpid/broker/PriorityQueue.cpp index ab5ec7235a..9a0fead744 100644 --- a/cpp/src/qpid/broker/PriorityQueue.cpp +++ b/cpp/src/qpid/broker/PriorityQueue.cpp @@ -121,6 +121,10 @@ void PriorityQueue::updateAcquired(const QueuedMessage& acquired) { fifo.updateAcquired(acquired); } +void PriorityQueue::setPosition(const framing::SequenceNumber& n) { + fifo.setPosition(n); +} + void PriorityQueue::foreach(Functor f) { fifo.foreach(f); diff --git a/cpp/src/qpid/broker/PriorityQueue.h b/cpp/src/qpid/broker/PriorityQueue.h index 8628745db1..301367358b 100644 --- a/cpp/src/qpid/broker/PriorityQueue.h +++ b/cpp/src/qpid/broker/PriorityQueue.h @@ -52,6 +52,7 @@ class PriorityQueue : public Messages bool consume(QueuedMessage&); bool push(const QueuedMessage& added, QueuedMessage& removed); void updateAcquired(const QueuedMessage& acquired); + void setPosition(const framing::SequenceNumber&); void foreach(Functor); void removeIf(Predicate); diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 3d90490186..9df0ebf313 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -588,21 +588,51 @@ QueuedMessage Queue::get(){ return msg; } -bool collect_if_expired(std::deque<QueuedMessage>& expired, QueuedMessage& message) +namespace { +bool collectIf(QueuedMessage& qm, Messages::Predicate predicate, + std::deque<QueuedMessage>& collection) { - if (message.payload->hasExpired()) { - expired.push_back(message); + if (predicate(qm)) { + collection.push_back(qm); return true; } else { return false; } } +bool isExpired(const QueuedMessage& qm) { return qm.payload->hasExpired(); } +} // namespace + +void Queue::dequeueIf(Messages::Predicate predicate, + std::deque<QueuedMessage>& dequeued) +{ + { + Mutex::ScopedLock locker(messageLock); + messages->removeIf(boost::bind(&collectIf, _1, predicate, boost::ref(dequeued))); + } + if (!dequeued.empty()) { + if (mgmtObject) { + mgmtObject->inc_acquires(dequeued.size()); + if (brokerMgmtObject) + brokerMgmtObject->inc_acquires(dequeued.size()); + } + for (std::deque<QueuedMessage>::const_iterator i = dequeued.begin(); + i != dequeued.end(); ++i) { + { + // KAG: should be safe to retake lock after the removeIf, since + // no other thread can touch these messages after the removeIf() call + Mutex::ScopedLock locker(messageLock); + observeAcquire(*i, locker); + } + dequeue( 0, *i ); + } + } +} + /** *@param lapse: time since the last purgeExpired */ -void Queue::purgeExpired(qpid::sys::Duration lapse) -{ +void Queue::purgeExpired(sys::Duration lapse) { //As expired messages are discarded during dequeue also, only //bother explicitly expiring if the rate of dequeues since last //attempt is less than one per second. @@ -610,37 +640,18 @@ void Queue::purgeExpired(qpid::sys::Duration lapse) dequeueSincePurge -= count; int seconds = int64_t(lapse)/qpid::sys::TIME_SEC; if (seconds == 0 || count / seconds < 1) { - std::deque<QueuedMessage> expired; - { - Mutex::ScopedLock locker(messageLock); - messages->removeIf(boost::bind(&collect_if_expired, boost::ref(expired), _1)); - } - - if (!expired.empty()) { + std::deque<QueuedMessage> dequeued; + dequeueIf(boost::bind(&isExpired, _1), dequeued); + if (dequeued.size()) { if (mgmtObject) { - mgmtObject->inc_acquires(expired.size()); - mgmtObject->inc_discardsTtl(expired.size()); - if (brokerMgmtObject) { - brokerMgmtObject->inc_acquires(expired.size()); - brokerMgmtObject->inc_discardsTtl(expired.size()); - } - } - - for (std::deque<QueuedMessage>::const_iterator i = expired.begin(); - i != expired.end(); ++i) { - { - // KAG: should be safe to retake lock after the removeIf, since - // no other thread can touch these messages after the removeIf() call - Mutex::ScopedLock locker(messageLock); - observeAcquire(*i, locker); - } - dequeue( 0, *i ); + mgmtObject->inc_discardsTtl(dequeued.size()); + if (brokerMgmtObject) + brokerMgmtObject->inc_discardsTtl(dequeued.size()); } } } } - namespace { // for use with purge/move below - collect messages that match a given filter // @@ -1661,8 +1672,22 @@ void Queue::query(qpid::types::Variant::Map& results) const if (allocator) allocator->query(results); } +namespace { +struct After { + framing::SequenceNumber seq; + After(framing::SequenceNumber s) : seq(s) {} + bool operator()(const QueuedMessage& qm) { return qm.position > seq; } +}; +} // namespace + + void Queue::setPosition(SequenceNumber n) { Mutex::ScopedLock locker(messageLock); + if (n < sequence) { + std::deque<QueuedMessage> dequeued; + dequeueIf(After(n), dequeued); + messages->setPosition(n); + } sequence = n; QPID_LOG(trace, "Set position to " << sequence << " on " << getName()); } diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index 9869a698c1..ed1f63504b 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -175,6 +175,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, void configureImpl(const qpid::framing::FieldTable& settings); void checkNotDeleted(const Consumer::shared_ptr& c); void notifyDeleted(); + void dequeueIf(Messages::Predicate predicate, std::deque<QueuedMessage>& dequeued); public: @@ -375,12 +376,21 @@ class Queue : public boost::enable_shared_from_this<Queue>, std::for_each<Observers::iterator, F>(observers.begin(), observers.end(), f); } - /** Set the position sequence number for the next message on the queue. - * Must be >= the current sequence number. - * Used by cluster to replicate queues. + /** + * Set the sequence number for the back of the queue, the + * next message enqueued will be pos+1. + * If pos > getPosition() this creates a gap in the sequence numbers. + * if pos < getPosition() the back of the queue is reset to pos, + * + * The _caller_ must ensure that any messages after pos have been dequeued. + * + * Used by HA/cluster code for queue replication. */ QPID_BROKER_EXTERN void setPosition(framing::SequenceNumber pos); - /** return current position sequence number for the next message on the queue. + + /** + *@return sequence number for the back of the queue. The next message pushed + * will be at getPosition+1 */ QPID_BROKER_EXTERN framing::SequenceNumber getPosition(); QPID_BROKER_EXTERN void addObserver(boost::shared_ptr<QueueObserver>); diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 3754e791f9..081d54ab49 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -524,6 +524,7 @@ broker::QueuedMessage Connection::getUpdateMessage() { boost::shared_ptr<broker::Queue> updateq = findQueue(UpdateClient::UPDATE); assert(!updateq->isDurable()); broker::QueuedMessage m = updateq->get(); + updateq->dequeue(0, m); if (!m.payload) throw Exception(QPID_MSG(cluster << " empty update queue")); return m; } |
