summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-05-28 18:24:06 +0000
committerAlan Conway <aconway@apache.org>2012-05-28 18:24:06 +0000
commita78cef0941374e4aa27c9025fbb3b5a43686b8fd (patch)
treef82688402704057000bfa13ce775fb8ad2f036ff /cpp/src/qpid/broker/Queue.cpp
parent2fcdb5bc17a6ae502a3af7df4ba66dd7adb79dfa (diff)
downloadqpid-python-a78cef0941374e4aa27c9025fbb3b5a43686b8fd.tar.gz
QPID-3603: Allow Queue::setPosition() to truncate the queue.
In the new HA code a backup may sometimes be ahead of the new primary after a fail-over. In that case the backup truncates it's queues to the same position as the primary so it can continue replicating. (Note the assertions added to verify setPosition showed up a minor bug in the old cluster code, which was leaving messages on the cluster update queue after an update. This patch fixes the issue.) git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1343347 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp85
1 files changed, 55 insertions, 30 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 3d90490186..9df0ebf313 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -588,21 +588,51 @@ QueuedMessage Queue::get(){
return msg;
}
-bool collect_if_expired(std::deque<QueuedMessage>& expired, QueuedMessage& message)
+namespace {
+bool collectIf(QueuedMessage& qm, Messages::Predicate predicate,
+ std::deque<QueuedMessage>& collection)
{
- if (message.payload->hasExpired()) {
- expired.push_back(message);
+ if (predicate(qm)) {
+ collection.push_back(qm);
return true;
} else {
return false;
}
}
+bool isExpired(const QueuedMessage& qm) { return qm.payload->hasExpired(); }
+} // namespace
+
+void Queue::dequeueIf(Messages::Predicate predicate,
+ std::deque<QueuedMessage>& dequeued)
+{
+ {
+ Mutex::ScopedLock locker(messageLock);
+ messages->removeIf(boost::bind(&collectIf, _1, predicate, boost::ref(dequeued)));
+ }
+ if (!dequeued.empty()) {
+ if (mgmtObject) {
+ mgmtObject->inc_acquires(dequeued.size());
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_acquires(dequeued.size());
+ }
+ for (std::deque<QueuedMessage>::const_iterator i = dequeued.begin();
+ i != dequeued.end(); ++i) {
+ {
+ // KAG: should be safe to retake lock after the removeIf, since
+ // no other thread can touch these messages after the removeIf() call
+ Mutex::ScopedLock locker(messageLock);
+ observeAcquire(*i, locker);
+ }
+ dequeue( 0, *i );
+ }
+ }
+}
+
/**
*@param lapse: time since the last purgeExpired
*/
-void Queue::purgeExpired(qpid::sys::Duration lapse)
-{
+void Queue::purgeExpired(sys::Duration lapse) {
//As expired messages are discarded during dequeue also, only
//bother explicitly expiring if the rate of dequeues since last
//attempt is less than one per second.
@@ -610,37 +640,18 @@ void Queue::purgeExpired(qpid::sys::Duration lapse)
dequeueSincePurge -= count;
int seconds = int64_t(lapse)/qpid::sys::TIME_SEC;
if (seconds == 0 || count / seconds < 1) {
- std::deque<QueuedMessage> expired;
- {
- Mutex::ScopedLock locker(messageLock);
- messages->removeIf(boost::bind(&collect_if_expired, boost::ref(expired), _1));
- }
-
- if (!expired.empty()) {
+ std::deque<QueuedMessage> dequeued;
+ dequeueIf(boost::bind(&isExpired, _1), dequeued);
+ if (dequeued.size()) {
if (mgmtObject) {
- mgmtObject->inc_acquires(expired.size());
- mgmtObject->inc_discardsTtl(expired.size());
- if (brokerMgmtObject) {
- brokerMgmtObject->inc_acquires(expired.size());
- brokerMgmtObject->inc_discardsTtl(expired.size());
- }
- }
-
- for (std::deque<QueuedMessage>::const_iterator i = expired.begin();
- i != expired.end(); ++i) {
- {
- // KAG: should be safe to retake lock after the removeIf, since
- // no other thread can touch these messages after the removeIf() call
- Mutex::ScopedLock locker(messageLock);
- observeAcquire(*i, locker);
- }
- dequeue( 0, *i );
+ mgmtObject->inc_discardsTtl(dequeued.size());
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_discardsTtl(dequeued.size());
}
}
}
}
-
namespace {
// for use with purge/move below - collect messages that match a given filter
//
@@ -1661,8 +1672,22 @@ void Queue::query(qpid::types::Variant::Map& results) const
if (allocator) allocator->query(results);
}
+namespace {
+struct After {
+ framing::SequenceNumber seq;
+ After(framing::SequenceNumber s) : seq(s) {}
+ bool operator()(const QueuedMessage& qm) { return qm.position > seq; }
+};
+} // namespace
+
+
void Queue::setPosition(SequenceNumber n) {
Mutex::ScopedLock locker(messageLock);
+ if (n < sequence) {
+ std::deque<QueuedMessage> dequeued;
+ dequeueIf(After(n), dequeued);
+ messages->setPosition(n);
+ }
sequence = n;
QPID_LOG(trace, "Set position to " << sequence << " on " << getName());
}