summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/MessageDeque.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/MessageDeque.cpp')
-rw-r--r--cpp/src/qpid/broker/MessageDeque.cpp207
1 files changed, 30 insertions, 177 deletions
diff --git a/cpp/src/qpid/broker/MessageDeque.cpp b/cpp/src/qpid/broker/MessageDeque.cpp
index 83c8ca6868..1529d4ac94 100644
--- a/cpp/src/qpid/broker/MessageDeque.cpp
+++ b/cpp/src/qpid/broker/MessageDeque.cpp
@@ -19,218 +19,71 @@
*
*/
#include "qpid/broker/MessageDeque.h"
-#include "qpid/broker/QueuedMessage.h"
-#include "qpid/log/Statement.h"
#include "assert.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/QueueCursor.h"
+#include "qpid/framing/SequenceNumber.h"
+#include "qpid/log/Statement.h"
namespace qpid {
namespace broker {
-
-MessageDeque::MessageDeque() : available(0), head(0) {}
-
-size_t MessageDeque::index(const framing::SequenceNumber& position)
-{
- //assuming a monotonic sequence, with no messages removed except
- //from the ends of the deque, we can use the position to determin
- //an index into the deque
- if (messages.empty() || position < messages.front().position) return 0;
- return position - messages.front().position;
+namespace {
+Message padding(qpid::framing::SequenceNumber id) {
+ Message m;
+ m.setState(DELETED);
+ m.setSequence(id);
+ return m;
}
-
-bool MessageDeque::deleted(const QueuedMessage& m)
-{
- size_t i = index(m.position);
- if (i < messages.size()) {
- QueuedMessage *qm = &messages[i];
- if (qm->status != QueuedMessage::DELETED) {
- qm->status = QueuedMessage::DELETED;
- qm->payload = 0; // message no longer needed
- clean();
- return true;
- }
- }
- return false;
}
-size_t MessageDeque::size()
-{
- return available;
-}
+using qpid::framing::SequenceNumber;
-QueuedMessage* MessageDeque::releasePtr(const QueuedMessage& message)
-{
- size_t i = index(message.position);
- if (i < messages.size()) {
- QueuedMessage& m = messages[i];
- if (m.status == QueuedMessage::ACQUIRED) {
- if (head > i) head = i;
- m.status = QueuedMessage::AVAILABLE;
- ++available;
- return &messages[i];
- }
- } else {
- assert(0);
- QPID_LOG(error, "Failed to release message at " << message.position << " " << message.payload->getFrames().getContent() << "; no such message (index=" << i << ", size=" << messages.size() << ")");
- }
- return 0;
-}
+MessageDeque::MessageDeque() : messages(&padding) {}
-void MessageDeque::release(const QueuedMessage& message) { releasePtr(message); }
-bool MessageDeque::acquire(const framing::SequenceNumber& position, QueuedMessage& message)
+bool MessageDeque::deleted(const QueueCursor& cursor)
{
- if (position < messages.front().position) return false;
- size_t i = index(position);
- if (i < messages.size()) {
- QueuedMessage& temp = messages[i];
- if (temp.status == QueuedMessage::AVAILABLE) {
- temp.status = QueuedMessage::ACQUIRED;
- --available;
- message = temp;
- return true;
- }
- }
- return false;
+ return messages.deleted(cursor);
}
-bool MessageDeque::find(const framing::SequenceNumber& position, QueuedMessage& message)
+void MessageDeque::publish(const Message& added)
{
- size_t i = index(position);
- if (i < messages.size()) {
- message = messages[i];
- return true;
- } else {
- return false;
- }
+ messages.publish(added);
}
-bool MessageDeque::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool unacquired)
+Message* MessageDeque::release(const QueueCursor& cursor)
{
- //get first message that is greater than position
- size_t i = index(position + 1);
- while (i < messages.size()) {
- QueuedMessage& m = messages[i++];
- if (m.status == QueuedMessage::AVAILABLE || (!unacquired && m.status == QueuedMessage::ACQUIRED)) {
- message = m;
- return true;
- }
- }
- return false;
+ return messages.release(cursor);
}
-bool MessageDeque::consume(QueuedMessage& message)
+Message* MessageDeque::next(QueueCursor& cursor)
{
- while (head < messages.size()) {
- QueuedMessage& i = messages[head++];
- if (i.status == QueuedMessage::AVAILABLE) {
- i.status = QueuedMessage::ACQUIRED;
- --available;
- message = i;
- return true;
- }
- }
- return false;
+ return messages.next(cursor);
}
-namespace {
-QueuedMessage padding(uint32_t pos) {
- return QueuedMessage(0, 0, pos, QueuedMessage::DELETED);
-}
-} // namespace
-
-QueuedMessage* MessageDeque::pushPtr(const QueuedMessage& added) {
- //add padding to prevent gaps in sequence, which break the index
- //calculation (needed for queue replication)
- while (messages.size() && (added.position - messages.back().position) > 1)
- messages.push_back(padding(messages.back().position + 1));
- messages.push_back(added);
- messages.back().status = QueuedMessage::AVAILABLE;
- if (head >= messages.size()) head = messages.size() - 1;
- ++available;
- clean(); // QPID-4046: let producer help clean the backlog of deleted messages
- return &messages.back();
-}
-
-bool MessageDeque::push(const QueuedMessage& added, QueuedMessage& /*not needed*/) {
- pushPtr(added);
- return false; // adding a message never causes one to be removed for deque
-}
-
-void MessageDeque::updateAcquired(const QueuedMessage& acquired)
+size_t MessageDeque::size()
{
- // Pad the front of the queue if necessary
- while (messages.size() && (acquired.position < messages.front().position))
- messages.push_front(padding(uint32_t(messages.front().position) - 1));
- size_t i = index(acquired.position);
- if (i < messages.size()) { // Replace an existing padding message
- assert(messages[i].status == QueuedMessage::DELETED);
- messages[i] = acquired;
- messages[i].status = QueuedMessage::ACQUIRED;
- }
- else { // Push to the back
- // Pad the back of the queue if necessary
- while (messages.size() && (acquired.position - messages.back().position) > 1)
- messages.push_back(padding(messages.back().position + 1));
- assert(!messages.size() || (acquired.position - messages.back().position) == 1);
- messages.push_back(acquired);
- messages.back().status = QueuedMessage::ACQUIRED;
- }
+ return messages.size();
}
-namespace {
-bool isNotDeleted(const QueuedMessage& qm) { return qm.status != QueuedMessage::DELETED; }
-} // namespace
-
-void MessageDeque::setPosition(const framing::SequenceNumber& n) {
- size_t i = index(n+1);
- if (i >= messages.size()) return; // Nothing to do.
-
- // Assertion to verify the precondition: no messaages after n.
- assert(std::find_if(messages.begin()+i, messages.end(), &isNotDeleted) ==
- messages.end());
- messages.erase(messages.begin()+i, messages.end());
- if (head >= messages.size()) head = messages.size() - 1;
- // Re-count the available messages
- available = 0;
- for (Deque::iterator i = messages.begin(); i != messages.end(); ++i) {
- if (i->status == QueuedMessage::AVAILABLE) ++available;
- }
+Message* MessageDeque::find(const framing::SequenceNumber& position, QueueCursor* cursor)
+{
+ return messages.find(position, cursor);
}
-void MessageDeque::clean()
+Message* MessageDeque::find(const QueueCursor& cursor)
{
- // QPID-4046: If a queue has multiple consumers, then it is possible for a large
- // collection of deleted messages to build up. Limit the number of messages cleaned
- // up on each call to clean().
- size_t count = 0;
- while (messages.size() && messages.front().status == QueuedMessage::DELETED && count < 10) {
- messages.pop_front();
- count += 1;
- }
- head = (head > count) ? head - count : 0;
+ return messages.find(cursor);
}
void MessageDeque::foreach(Functor f)
{
- for (Deque::iterator i = messages.begin(); i != messages.end(); ++i) {
- if (i->status == QueuedMessage::AVAILABLE) {
- f(*i);
- }
- }
+ messages.foreach(f);
}
-void MessageDeque::removeIf(Predicate p)
+void MessageDeque::resetCursors()
{
- for (Deque::iterator i = messages.begin(); i != messages.end(); ++i) {
- if (i->status == QueuedMessage::AVAILABLE && p(*i)) {
- //Use special status for this as messages are not yet
- //dequeued, but should not be considered on the queue
- //either (used for purging and moving)
- i->status = QueuedMessage::REMOVED;
- --available;
- }
- }
- clean();
+ messages.resetCursors();
}
}} // namespace qpid::broker