summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/MessageDeque.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/MessageDeque.cpp')
-rw-r--r--cpp/src/qpid/broker/MessageDeque.cpp20
1 files changed, 15 insertions, 5 deletions
diff --git a/cpp/src/qpid/broker/MessageDeque.cpp b/cpp/src/qpid/broker/MessageDeque.cpp
index 709d99876b..f70c996975 100644
--- a/cpp/src/qpid/broker/MessageDeque.cpp
+++ b/cpp/src/qpid/broker/MessageDeque.cpp
@@ -21,6 +21,7 @@
#include "qpid/broker/MessageDeque.h"
#include "qpid/broker/QueuedMessage.h"
#include "qpid/log/Statement.h"
+#include "assert.h"
namespace qpid {
namespace broker {
@@ -39,7 +40,7 @@ size_t MessageDeque::index(const framing::SequenceNumber& position)
bool MessageDeque::deleted(const QueuedMessage& m)
{
size_t i = index(m.position);
- if (i < messages.size()) {
+ if (i < messages.size() && messages[i].status != QueuedMessage::DELETED) {
messages[i].status = QueuedMessage::DELETED;
clean();
return true;
@@ -53,7 +54,7 @@ size_t MessageDeque::size()
return available;
}
-void MessageDeque::release(const QueuedMessage& message)
+QueuedMessage* MessageDeque::releasePtr(const QueuedMessage& message)
{
size_t i = index(message.position);
if (i < messages.size()) {
@@ -62,12 +63,17 @@ void MessageDeque::release(const QueuedMessage& message)
if (head > i) head = i;
m.status = QueuedMessage::AVAILABLE;
++available;
+ return &messages[i];
}
} else {
+ assert(0);
QPID_LOG(error, "Failed to release message at " << message.position << " " << message.payload->getFrames().getContent() << "; no such message (index=" << i << ", size=" << messages.size() << ")");
}
+ return 0;
}
+void MessageDeque::release(const QueuedMessage& message) { releasePtr(message); }
+
bool MessageDeque::acquire(const framing::SequenceNumber& position, QueuedMessage& message)
{
if (position < messages.front().position) return false;
@@ -129,8 +135,7 @@ QueuedMessage padding(uint32_t pos) {
}
} // namespace
-bool MessageDeque::push(const QueuedMessage& added, QueuedMessage& /*not needed*/)
-{
+QueuedMessage* MessageDeque::pushPtr(const QueuedMessage& added) {
//add padding to prevent gaps in sequence, which break the index
//calculation (needed for queue replication)
while (messages.size() && (added.position - messages.back().position) > 1)
@@ -139,7 +144,12 @@ bool MessageDeque::push(const QueuedMessage& added, QueuedMessage& /*not needed*
messages.back().status = QueuedMessage::AVAILABLE;
if (head >= messages.size()) head = messages.size() - 1;
++available;
- return false;//adding a message never causes one to be removed for deque
+ return &messages.back();
+}
+
+bool MessageDeque::push(const QueuedMessage& added, QueuedMessage& /*not needed*/) {
+ pushPtr(added);
+ return false; // adding a message never causes one to be removed for deque
}
void MessageDeque::updateAcquired(const QueuedMessage& acquired)