summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/MessageDeque.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/MessageDeque.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/MessageDeque.cpp161
1 files changed, 98 insertions, 63 deletions
diff --git a/qpid/cpp/src/qpid/broker/MessageDeque.cpp b/qpid/cpp/src/qpid/broker/MessageDeque.cpp
index 24b8f6f895..9f874e4c9a 100644
--- a/qpid/cpp/src/qpid/broker/MessageDeque.cpp
+++ b/qpid/cpp/src/qpid/broker/MessageDeque.cpp
@@ -20,121 +20,156 @@
*/
#include "qpid/broker/MessageDeque.h"
#include "qpid/broker/QueuedMessage.h"
+#include "qpid/log/Statement.h"
namespace qpid {
namespace broker {
-size_t MessageDeque::size()
-{
- return messages.size();
-}
-
-bool MessageDeque::empty()
-{
- return messages.empty();
-}
+MessageDeque::MessageDeque() : available(0), head(0) {}
-void MessageDeque::reinsert(const QueuedMessage& message)
+size_t MessageDeque::index(const framing::SequenceNumber& position)
{
- messages.insert(lower_bound(messages.begin(), messages.end(), message), message);
-}
-
-MessageDeque::Deque::iterator MessageDeque::seek(const framing::SequenceNumber& position)
-{
- if (!messages.empty()) {
- QueuedMessage comp;
- comp.position = position;
- unsigned long diff = position.getValue() - messages.front().position.getValue();
- long maxEnd = diff < messages.size()? diff : messages.size();
- return lower_bound(messages.begin(),messages.begin()+maxEnd,comp);
- } else {
- return messages.end();
- }
+ //assuming a monotonic sequence, with no messages removed except
+ //from the ends of the deque, we can use the position to determin
+ //an index into the deque
+ if (messages.empty() || position < messages.front().position) return 0;
+ return position - messages.front().position;
}
-bool MessageDeque::find(const framing::SequenceNumber& position, QueuedMessage& message, bool remove)
+bool MessageDeque::deleted(const QueuedMessage& m)
{
- Deque::iterator i = seek(position);
- if (i != messages.end() && i->position == position) {
- message = *i;
- if (remove) messages.erase(i);
+ size_t i = index(m.position);
+ if (i < messages.size()) {
+ messages[i].status = QueuedMessage::DELETED;
+ clean();
return true;
} else {
return false;
}
}
-bool MessageDeque::remove(const framing::SequenceNumber& position, QueuedMessage& message)
+size_t MessageDeque::size()
{
- return find(position, message, true);
+ return available;
}
-bool MessageDeque::find(const framing::SequenceNumber& position, QueuedMessage& message)
+void MessageDeque::release(const QueuedMessage& message)
{
- return find(position, message, false);
+ size_t i = index(message.position);
+ if (i < messages.size()) {
+ QueuedMessage& m = messages[i];
+ if (m.status == QueuedMessage::ACQUIRED) {
+ if (head > i) head = i;
+ m.status = QueuedMessage::AVAILABLE;
+ ++available;
+ }
+ } else {
+ QPID_LOG(error, "Failed to release message at " << message.position << " " << message.payload->getFrames().getContent() << "; no such message (index=" << i << ", size=" << messages.size() << ")");
+ }
}
-bool MessageDeque::next(const framing::SequenceNumber& position, QueuedMessage& message)
+bool MessageDeque::acquire(const framing::SequenceNumber& position, QueuedMessage& message)
{
- if (messages.empty()) {
- return false;
- } else if (position < front().position) {
- message = front();
- return true;
- } else {
- Deque::iterator i = seek(position+1);
- if (i != messages.end()) {
- message = *i;
+ if (position < messages.front().position) return false;
+ size_t i = index(position);
+ if (i < messages.size()) {
+ QueuedMessage& temp = messages[i];
+ if (temp.status == QueuedMessage::AVAILABLE) {
+ temp.status = QueuedMessage::ACQUIRED;
+ --available;
+ message = temp;
return true;
- } else {
- return false;
}
}
+ return false;
}
-QueuedMessage& MessageDeque::front()
+bool MessageDeque::find(const framing::SequenceNumber& position, QueuedMessage& message)
{
- return messages.front();
+ size_t i = index(position);
+ if (i < messages.size()) {
+ message = messages[i];
+ return true;
+ } else {
+ return false;
+ }
}
-void MessageDeque::pop()
+bool MessageDeque::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool unacquired)
{
- if (!messages.empty()) {
- messages.pop_front();
+ //get first message that is greater than position
+ size_t i = index(position + 1);
+ while (i < messages.size()) {
+ QueuedMessage& m = messages[i++];
+ if (m.status == QueuedMessage::AVAILABLE || (!unacquired && m.status == QueuedMessage::ACQUIRED)) {
+ message = m;
+ return true;
+ }
}
+ return false;
}
-bool MessageDeque::pop(QueuedMessage& out)
+bool MessageDeque::consume(QueuedMessage& message)
{
- if (messages.empty()) {
- return false;
- } else {
- out = front();
- messages.pop_front();
- return true;
+ while (head < messages.size()) {
+ QueuedMessage& i = messages[head++];
+ if (i.status == QueuedMessage::AVAILABLE) {
+ i.status = QueuedMessage::ACQUIRED;
+ --available;
+ message = i;
+ return true;
+ }
}
+ return false;
}
bool MessageDeque::push(const QueuedMessage& added, QueuedMessage& /*not needed*/)
{
+ //add padding to prevent gaps in sequence, which break the index
+ //calculation (needed for queue replication)
+ while (messages.size() && (added.position - messages.back().position) > 1) {
+ QueuedMessage dummy;
+ dummy.position = messages.back().position + 1;
+ dummy.status = QueuedMessage::DELETED;
+ messages.push_back(dummy);
+ QPID_LOG(debug, "Adding padding at " << dummy.position << ", between " << messages.back().position << " and " << added.position);
+ }
messages.push_back(added);
+ messages.back().status = QueuedMessage::AVAILABLE;
+ if (head >= messages.size()) head = messages.size() - 1;
+ ++available;
return false;//adding a message never causes one to be removed for deque
}
+void MessageDeque::clean()
+{
+ while (messages.size() && messages.front().status == QueuedMessage::DELETED) {
+ messages.pop_front();
+ if (head) --head;
+ }
+}
+
void MessageDeque::foreach(Functor f)
{
- std::for_each(messages.begin(), messages.end(), f);
+ for (Deque::iterator i = messages.begin(); i != messages.end(); ++i) {
+ if (i->status == QueuedMessage::AVAILABLE) {
+ f(*i);
+ }
+ }
}
void MessageDeque::removeIf(Predicate p)
{
- for (Deque::iterator i = messages.begin(); i != messages.end();) {
- if (p(*i)) {
- i = messages.erase(i);
- } else {
- ++i;
+ for (Deque::iterator i = messages.begin(); i != messages.end(); ++i) {
+ if (i->status == QueuedMessage::AVAILABLE && p(*i)) {
+ //Use special status for this as messages are not yet
+ //dequeued, but should not be considered on the queue
+ //either (used for purging and moving)
+ i->status = QueuedMessage::REMOVED;
+ --available;
}
}
+ clean();
}
}} // namespace qpid::broker