summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/MessageMap.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/MessageMap.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/MessageMap.cpp85
1 files changed, 59 insertions, 26 deletions
diff --git a/qpid/cpp/src/qpid/broker/MessageMap.cpp b/qpid/cpp/src/qpid/broker/MessageMap.cpp
index 048df45434..d6702a9336 100644
--- a/qpid/cpp/src/qpid/broker/MessageMap.cpp
+++ b/qpid/cpp/src/qpid/broker/MessageMap.cpp
@@ -20,6 +20,8 @@
*/
#include "qpid/broker/MessageMap.h"
#include "qpid/broker/QueuedMessage.h"
+#include "qpid/log/Statement.h"
+#include <algorithm>
namespace qpid {
namespace broker {
@@ -27,7 +29,16 @@ namespace {
const std::string EMPTY;
}
-bool MessageMap::deleted(const QueuedMessage&) { return true; }
+bool MessageMap::deleted(const QueuedMessage& message)
+{
+ Ordering::iterator i = messages.find(message.position);
+ if (i != messages.end()) {
+ erase(i);
+ return true;
+ } else {
+ return false;
+ }
+}
std::string MessageMap::getKey(const QueuedMessage& message)
{
@@ -38,30 +49,32 @@ std::string MessageMap::getKey(const QueuedMessage& message)
size_t MessageMap::size()
{
- return messages.size();
+ size_t count(0);
+ for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) {
+ if (i->second.status == QueuedMessage::AVAILABLE) ++count;
+ }
+ return count;
}
bool MessageMap::empty()
{
- return messages.empty();
+ return size() == 0;//TODO: more efficient implementation
}
void MessageMap::release(const QueuedMessage& message)
{
- std::string key = getKey(message);
- Index::iterator i = index.find(key);
- if (i == index.end()) {
- index[key] = message;
- messages[message.position] = message;
- } //else message has already been replaced
+ Ordering::iterator i = messages.find(message.position);
+ if (i != messages.end() && i->second.status == QueuedMessage::ACQUIRED) {
+ i->second.status = QueuedMessage::AVAILABLE;
+ }
}
bool MessageMap::acquire(const framing::SequenceNumber& position, QueuedMessage& message)
{
Ordering::iterator i = messages.find(position);
- if (i != messages.end()) {
+ if (i != messages.end() && i->second.status == QueuedMessage::AVAILABLE) {
+ i->second.status = QueuedMessage::ACQUIRED;
message = i->second;
- erase(i);
return true;
} else {
return false;
@@ -71,7 +84,7 @@ bool MessageMap::acquire(const framing::SequenceNumber& position, QueuedMessage&
bool MessageMap::find(const framing::SequenceNumber& position, QueuedMessage& message)
{
Ordering::iterator i = messages.find(position);
- if (i != messages.end()) {
+ if (i != messages.end() && i->second.status == QueuedMessage::AVAILABLE) {
message = i->second;
return true;
} else {
@@ -79,10 +92,10 @@ bool MessageMap::find(const framing::SequenceNumber& position, QueuedMessage& me
}
}
-bool MessageMap::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool)
+bool MessageMap::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool unacquired)
{
Ordering::iterator i = messages.lower_bound(position+1);
- if (i != messages.end()) {
+ if (i != messages.end() && (i->second.status == QueuedMessage::AVAILABLE || (!unacquired && i->second.status == QueuedMessage::ACQUIRED))) {
message = i->second;
return true;
} else {
@@ -92,14 +105,14 @@ bool MessageMap::browse(const framing::SequenceNumber& position, QueuedMessage&
bool MessageMap::consume(QueuedMessage& message)
{
- Ordering::iterator i = messages.begin();
- if (i != messages.end()) {
- message = i->second;
- erase(i);
- return true;
- } else {
- return false;
+ for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) {
+ if (i->second.status == QueuedMessage::AVAILABLE) {
+ i->second.status = QueuedMessage::ACQUIRED;
+ message = i->second;
+ return true;
+ }
}
+ return false;
}
const QueuedMessage& MessageMap::replace(const QueuedMessage& original, const QueuedMessage& update)
@@ -115,28 +128,48 @@ bool MessageMap::push(const QueuedMessage& added, QueuedMessage& removed)
if (result.second) {
//there was no previous message for this key; nothing needs to
//be removed, just add the message into its correct position
- messages[added.position] = added;
+ QueuedMessage& a = messages[added.position];
+ a = added;
+ a.status = QueuedMessage::AVAILABLE;
+ QPID_LOG(debug, "Added message " << a);
return false;
} else {
//there is already a message with that key which needs to be replaced
removed = result.first->second;
result.first->second = replace(result.first->second, added);
+ result.first->second.status = QueuedMessage::AVAILABLE;
+ QPID_LOG(debug, "Displaced message " << removed << " with " << result.first->second << ": " << result.first->first);
return true;
}
}
+void MessageMap::setPosition(const framing::SequenceNumber& seq) {
+ // Nothing to do, just assert that the precondition is respected and there
+ // are no undeleted messages after seq.
+ assert(messages.empty() || (--messages.end())->first <= seq);
+}
+
void MessageMap::foreach(Functor f)
{
for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) {
- f(i->second);
+ if (i->second.status == QueuedMessage::AVAILABLE) f(i->second);
}
}
void MessageMap::removeIf(Predicate p)
{
- for (Ordering::iterator i = messages.begin(); i != messages.end(); i++) {
- if (p(i->second)) {
- erase(i);
+ for (Ordering::iterator i = messages.begin(); i != messages.end();) {
+ if (i->second.status == QueuedMessage::AVAILABLE && p(i->second)) {
+ index.erase(getKey(i->second));
+ //Note: Removing from messages means that the subsequent
+ //call to deleted() for the same message will return
+ //false. At present that is not a problem. If this were
+ //changed to hold onto the message until dequeued
+ //(e.g. with REMOVED state), then the erase() below would
+ //need to take that into account.
+ messages.erase(i++);
+ } else {
+ ++i;
}
}
}