diff options
Diffstat (limited to 'cpp/src/qpid/broker/MessageDeque.cpp')
-rw-r--r-- | cpp/src/qpid/broker/MessageDeque.cpp | 20 |
1 files changed, 15 insertions, 5 deletions
diff --git a/cpp/src/qpid/broker/MessageDeque.cpp b/cpp/src/qpid/broker/MessageDeque.cpp index 709d99876b..f70c996975 100644 --- a/cpp/src/qpid/broker/MessageDeque.cpp +++ b/cpp/src/qpid/broker/MessageDeque.cpp @@ -21,6 +21,7 @@ #include "qpid/broker/MessageDeque.h" #include "qpid/broker/QueuedMessage.h" #include "qpid/log/Statement.h" +#include "assert.h" namespace qpid { namespace broker { @@ -39,7 +40,7 @@ size_t MessageDeque::index(const framing::SequenceNumber& position) bool MessageDeque::deleted(const QueuedMessage& m) { size_t i = index(m.position); - if (i < messages.size()) { + if (i < messages.size() && messages[i].status != QueuedMessage::DELETED) { messages[i].status = QueuedMessage::DELETED; clean(); return true; @@ -53,7 +54,7 @@ size_t MessageDeque::size() return available; } -void MessageDeque::release(const QueuedMessage& message) +QueuedMessage* MessageDeque::releasePtr(const QueuedMessage& message) { size_t i = index(message.position); if (i < messages.size()) { @@ -62,12 +63,17 @@ void MessageDeque::release(const QueuedMessage& message) if (head > i) head = i; m.status = QueuedMessage::AVAILABLE; ++available; + return &messages[i]; } } else { + assert(0); QPID_LOG(error, "Failed to release message at " << message.position << " " << message.payload->getFrames().getContent() << "; no such message (index=" << i << ", size=" << messages.size() << ")"); } + return 0; } +void MessageDeque::release(const QueuedMessage& message) { releasePtr(message); } + bool MessageDeque::acquire(const framing::SequenceNumber& position, QueuedMessage& message) { if (position < messages.front().position) return false; @@ -129,8 +135,7 @@ QueuedMessage padding(uint32_t pos) { } } // namespace -bool MessageDeque::push(const QueuedMessage& added, QueuedMessage& /*not needed*/) -{ +QueuedMessage* MessageDeque::pushPtr(const QueuedMessage& added) { //add padding to prevent gaps in sequence, which break the index //calculation (needed for queue replication) while (messages.size() && (added.position - messages.back().position) > 1) @@ -139,7 +144,12 @@ bool MessageDeque::push(const QueuedMessage& added, QueuedMessage& /*not needed* messages.back().status = QueuedMessage::AVAILABLE; if (head >= messages.size()) head = messages.size() - 1; ++available; - return false;//adding a message never causes one to be removed for deque + return &messages.back(); +} + +bool MessageDeque::push(const QueuedMessage& added, QueuedMessage& /*not needed*/) { + pushPtr(added); + return false; // adding a message never causes one to be removed for deque } void MessageDeque::updateAcquired(const QueuedMessage& acquired) |