summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-05-28 18:24:06 +0000
committerAlan Conway <aconway@apache.org>2012-05-28 18:24:06 +0000
commita78cef0941374e4aa27c9025fbb3b5a43686b8fd (patch)
treef82688402704057000bfa13ce775fb8ad2f036ff /cpp/src/qpid
parent2fcdb5bc17a6ae502a3af7df4ba66dd7adb79dfa (diff)
downloadqpid-python-a78cef0941374e4aa27c9025fbb3b5a43686b8fd.tar.gz
QPID-3603: Allow Queue::setPosition() to truncate the queue.
In the new HA code a backup may sometimes be ahead of the new primary after a fail-over. In that case the backup truncates it's queues to the same position as the primary so it can continue replicating. (Note the assertions added to verify setPosition showed up a minor bug in the old cluster code, which was leaving messages on the cluster update queue after an update. This patch fixes the issue.) git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1343347 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/broker/MessageDeque.cpp20
-rw-r--r--cpp/src/qpid/broker/MessageDeque.h2
-rw-r--r--cpp/src/qpid/broker/MessageMap.cpp11
-rw-r--r--cpp/src/qpid/broker/MessageMap.h3
-rw-r--r--cpp/src/qpid/broker/Messages.h9
-rw-r--r--cpp/src/qpid/broker/PriorityQueue.cpp4
-rw-r--r--cpp/src/qpid/broker/PriorityQueue.h1
-rw-r--r--cpp/src/qpid/broker/Queue.cpp85
-rw-r--r--cpp/src/qpid/broker/Queue.h18
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp1
10 files changed, 116 insertions, 38 deletions
diff --git a/cpp/src/qpid/broker/MessageDeque.cpp b/cpp/src/qpid/broker/MessageDeque.cpp
index f70c996975..474e4139bd 100644
--- a/cpp/src/qpid/broker/MessageDeque.cpp
+++ b/cpp/src/qpid/broker/MessageDeque.cpp
@@ -173,6 +173,26 @@ void MessageDeque::updateAcquired(const QueuedMessage& acquired)
}
}
+namespace {
+bool isNotDeleted(const QueuedMessage& qm) { return qm.status != QueuedMessage::DELETED; }
+} // namespace
+
+void MessageDeque::setPosition(const framing::SequenceNumber& n) {
+ size_t i = index(n+1);
+ if (i >= messages.size()) return; // Nothing to do.
+
+ // Assertion to verify the precondition: no messaages after n.
+ assert(std::find_if(messages.begin()+i, messages.end(), &isNotDeleted) ==
+ messages.end());
+ messages.erase(messages.begin()+i, messages.end());
+ if (head >= messages.size()) head = messages.size() - 1;
+ // Re-count the available messages
+ available = 0;
+ for (Deque::iterator i = messages.begin(); i != messages.end(); ++i) {
+ if (i->status == QueuedMessage::AVAILABLE) ++available;
+ }
+}
+
void MessageDeque::clean()
{
while (messages.size() && messages.front().status == QueuedMessage::DELETED) {
diff --git a/cpp/src/qpid/broker/MessageDeque.h b/cpp/src/qpid/broker/MessageDeque.h
index 9b53716d4e..c5670b2a72 100644
--- a/cpp/src/qpid/broker/MessageDeque.h
+++ b/cpp/src/qpid/broker/MessageDeque.h
@@ -44,7 +44,7 @@ class MessageDeque : public Messages
bool consume(QueuedMessage&);
bool push(const QueuedMessage& added, QueuedMessage& removed);
void updateAcquired(const QueuedMessage& acquired);
-
+ void setPosition(const framing::SequenceNumber&);
void foreach(Functor);
void removeIf(Predicate);
diff --git a/cpp/src/qpid/broker/MessageMap.cpp b/cpp/src/qpid/broker/MessageMap.cpp
index 9b164d4e5c..d6702a9336 100644
--- a/cpp/src/qpid/broker/MessageMap.cpp
+++ b/cpp/src/qpid/broker/MessageMap.cpp
@@ -21,6 +21,7 @@
#include "qpid/broker/MessageMap.h"
#include "qpid/broker/QueuedMessage.h"
#include "qpid/log/Statement.h"
+#include <algorithm>
namespace qpid {
namespace broker {
@@ -130,18 +131,24 @@ bool MessageMap::push(const QueuedMessage& added, QueuedMessage& removed)
QueuedMessage& a = messages[added.position];
a = added;
a.status = QueuedMessage::AVAILABLE;
- QPID_LOG(debug, "Added message at " << a.position);
+ QPID_LOG(debug, "Added message " << a);
return false;
} else {
//there is already a message with that key which needs to be replaced
removed = result.first->second;
result.first->second = replace(result.first->second, added);
result.first->second.status = QueuedMessage::AVAILABLE;
- QPID_LOG(debug, "Displaced message at " << removed.position << " with " << result.first->second.position << ": " << result.first->first);
+ QPID_LOG(debug, "Displaced message " << removed << " with " << result.first->second << ": " << result.first->first);
return true;
}
}
+void MessageMap::setPosition(const framing::SequenceNumber& seq) {
+ // Nothing to do, just assert that the precondition is respected and there
+ // are no undeleted messages after seq.
+ assert(messages.empty() || (--messages.end())->first <= seq);
+}
+
void MessageMap::foreach(Functor f)
{
for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) {
diff --git a/cpp/src/qpid/broker/MessageMap.h b/cpp/src/qpid/broker/MessageMap.h
index a668450250..1f0481cb6b 100644
--- a/cpp/src/qpid/broker/MessageMap.h
+++ b/cpp/src/qpid/broker/MessageMap.h
@@ -6,7 +6,7 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
+o * regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
@@ -50,6 +50,7 @@ class MessageMap : public Messages
virtual bool browse(const framing::SequenceNumber&, QueuedMessage&, bool);
bool consume(QueuedMessage&);
virtual bool push(const QueuedMessage& added, QueuedMessage& removed);
+ void setPosition(const framing::SequenceNumber&);
void foreach(Functor);
virtual void removeIf(Predicate);
diff --git a/cpp/src/qpid/broker/Messages.h b/cpp/src/qpid/broker/Messages.h
index 61e9fa110a..45f5e6cd81 100644
--- a/cpp/src/qpid/broker/Messages.h
+++ b/cpp/src/qpid/broker/Messages.h
@@ -21,6 +21,7 @@
* under the License.
*
*/
+#include "qpid/framing/SequenceNumber.h"
#include <boost/function.hpp>
namespace qpid {
@@ -101,14 +102,22 @@ class Messages
virtual void updateAcquired(const QueuedMessage&) { }
/**
+ * Set the position of the back of the queue. Next message enqueued will be n+1.
+ *@pre Any messages with seq > n must already be dequeued.
+ */
+ virtual void setPosition(const framing::SequenceNumber& /*n*/) = 0;
+
+ /**
* Apply, the functor to each message held
*/
+
virtual void foreach(Functor) = 0;
/**
* Remove every message held that for which the specified
* predicate returns true
*/
virtual void removeIf(Predicate) = 0;
+
private:
};
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/PriorityQueue.cpp b/cpp/src/qpid/broker/PriorityQueue.cpp
index ab5ec7235a..9a0fead744 100644
--- a/cpp/src/qpid/broker/PriorityQueue.cpp
+++ b/cpp/src/qpid/broker/PriorityQueue.cpp
@@ -121,6 +121,10 @@ void PriorityQueue::updateAcquired(const QueuedMessage& acquired) {
fifo.updateAcquired(acquired);
}
+void PriorityQueue::setPosition(const framing::SequenceNumber& n) {
+ fifo.setPosition(n);
+}
+
void PriorityQueue::foreach(Functor f)
{
fifo.foreach(f);
diff --git a/cpp/src/qpid/broker/PriorityQueue.h b/cpp/src/qpid/broker/PriorityQueue.h
index 8628745db1..301367358b 100644
--- a/cpp/src/qpid/broker/PriorityQueue.h
+++ b/cpp/src/qpid/broker/PriorityQueue.h
@@ -52,6 +52,7 @@ class PriorityQueue : public Messages
bool consume(QueuedMessage&);
bool push(const QueuedMessage& added, QueuedMessage& removed);
void updateAcquired(const QueuedMessage& acquired);
+ void setPosition(const framing::SequenceNumber&);
void foreach(Functor);
void removeIf(Predicate);
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 3d90490186..9df0ebf313 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -588,21 +588,51 @@ QueuedMessage Queue::get(){
return msg;
}
-bool collect_if_expired(std::deque<QueuedMessage>& expired, QueuedMessage& message)
+namespace {
+bool collectIf(QueuedMessage& qm, Messages::Predicate predicate,
+ std::deque<QueuedMessage>& collection)
{
- if (message.payload->hasExpired()) {
- expired.push_back(message);
+ if (predicate(qm)) {
+ collection.push_back(qm);
return true;
} else {
return false;
}
}
+bool isExpired(const QueuedMessage& qm) { return qm.payload->hasExpired(); }
+} // namespace
+
+void Queue::dequeueIf(Messages::Predicate predicate,
+ std::deque<QueuedMessage>& dequeued)
+{
+ {
+ Mutex::ScopedLock locker(messageLock);
+ messages->removeIf(boost::bind(&collectIf, _1, predicate, boost::ref(dequeued)));
+ }
+ if (!dequeued.empty()) {
+ if (mgmtObject) {
+ mgmtObject->inc_acquires(dequeued.size());
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_acquires(dequeued.size());
+ }
+ for (std::deque<QueuedMessage>::const_iterator i = dequeued.begin();
+ i != dequeued.end(); ++i) {
+ {
+ // KAG: should be safe to retake lock after the removeIf, since
+ // no other thread can touch these messages after the removeIf() call
+ Mutex::ScopedLock locker(messageLock);
+ observeAcquire(*i, locker);
+ }
+ dequeue( 0, *i );
+ }
+ }
+}
+
/**
*@param lapse: time since the last purgeExpired
*/
-void Queue::purgeExpired(qpid::sys::Duration lapse)
-{
+void Queue::purgeExpired(sys::Duration lapse) {
//As expired messages are discarded during dequeue also, only
//bother explicitly expiring if the rate of dequeues since last
//attempt is less than one per second.
@@ -610,37 +640,18 @@ void Queue::purgeExpired(qpid::sys::Duration lapse)
dequeueSincePurge -= count;
int seconds = int64_t(lapse)/qpid::sys::TIME_SEC;
if (seconds == 0 || count / seconds < 1) {
- std::deque<QueuedMessage> expired;
- {
- Mutex::ScopedLock locker(messageLock);
- messages->removeIf(boost::bind(&collect_if_expired, boost::ref(expired), _1));
- }
-
- if (!expired.empty()) {
+ std::deque<QueuedMessage> dequeued;
+ dequeueIf(boost::bind(&isExpired, _1), dequeued);
+ if (dequeued.size()) {
if (mgmtObject) {
- mgmtObject->inc_acquires(expired.size());
- mgmtObject->inc_discardsTtl(expired.size());
- if (brokerMgmtObject) {
- brokerMgmtObject->inc_acquires(expired.size());
- brokerMgmtObject->inc_discardsTtl(expired.size());
- }
- }
-
- for (std::deque<QueuedMessage>::const_iterator i = expired.begin();
- i != expired.end(); ++i) {
- {
- // KAG: should be safe to retake lock after the removeIf, since
- // no other thread can touch these messages after the removeIf() call
- Mutex::ScopedLock locker(messageLock);
- observeAcquire(*i, locker);
- }
- dequeue( 0, *i );
+ mgmtObject->inc_discardsTtl(dequeued.size());
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_discardsTtl(dequeued.size());
}
}
}
}
-
namespace {
// for use with purge/move below - collect messages that match a given filter
//
@@ -1661,8 +1672,22 @@ void Queue::query(qpid::types::Variant::Map& results) const
if (allocator) allocator->query(results);
}
+namespace {
+struct After {
+ framing::SequenceNumber seq;
+ After(framing::SequenceNumber s) : seq(s) {}
+ bool operator()(const QueuedMessage& qm) { return qm.position > seq; }
+};
+} // namespace
+
+
void Queue::setPosition(SequenceNumber n) {
Mutex::ScopedLock locker(messageLock);
+ if (n < sequence) {
+ std::deque<QueuedMessage> dequeued;
+ dequeueIf(After(n), dequeued);
+ messages->setPosition(n);
+ }
sequence = n;
QPID_LOG(trace, "Set position to " << sequence << " on " << getName());
}
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index 9869a698c1..ed1f63504b 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -175,6 +175,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
void configureImpl(const qpid::framing::FieldTable& settings);
void checkNotDeleted(const Consumer::shared_ptr& c);
void notifyDeleted();
+ void dequeueIf(Messages::Predicate predicate, std::deque<QueuedMessage>& dequeued);
public:
@@ -375,12 +376,21 @@ class Queue : public boost::enable_shared_from_this<Queue>,
std::for_each<Observers::iterator, F>(observers.begin(), observers.end(), f);
}
- /** Set the position sequence number for the next message on the queue.
- * Must be >= the current sequence number.
- * Used by cluster to replicate queues.
+ /**
+ * Set the sequence number for the back of the queue, the
+ * next message enqueued will be pos+1.
+ * If pos > getPosition() this creates a gap in the sequence numbers.
+ * if pos < getPosition() the back of the queue is reset to pos,
+ *
+ * The _caller_ must ensure that any messages after pos have been dequeued.
+ *
+ * Used by HA/cluster code for queue replication.
*/
QPID_BROKER_EXTERN void setPosition(framing::SequenceNumber pos);
- /** return current position sequence number for the next message on the queue.
+
+ /**
+ *@return sequence number for the back of the queue. The next message pushed
+ * will be at getPosition+1
*/
QPID_BROKER_EXTERN framing::SequenceNumber getPosition();
QPID_BROKER_EXTERN void addObserver(boost::shared_ptr<QueueObserver>);
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 3754e791f9..081d54ab49 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -524,6 +524,7 @@ broker::QueuedMessage Connection::getUpdateMessage() {
boost::shared_ptr<broker::Queue> updateq = findQueue(UpdateClient::UPDATE);
assert(!updateq->isDurable());
broker::QueuedMessage m = updateq->get();
+ updateq->dequeue(0, m);
if (!m.payload) throw Exception(QPID_MSG(cluster << " empty update queue"));
return m;
}