summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/MessageMap.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/MessageMap.cpp')
-rw-r--r--cpp/src/qpid/broker/MessageMap.cpp146
1 files changed, 61 insertions, 85 deletions
diff --git a/cpp/src/qpid/broker/MessageMap.cpp b/cpp/src/qpid/broker/MessageMap.cpp
index 592f3fefde..4cdd83c9aa 100644
--- a/cpp/src/qpid/broker/MessageMap.cpp
+++ b/cpp/src/qpid/broker/MessageMap.cpp
@@ -19,7 +19,8 @@
*
*/
#include "qpid/broker/MessageMap.h"
-#include "qpid/broker/QueuedMessage.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/QueueCursor.h"
#include "qpid/log/Statement.h"
#include <algorithm>
@@ -29,29 +30,17 @@ namespace {
const std::string EMPTY;
}
-bool MessageMap::deleted(const QueuedMessage& message)
-{
- Ordering::iterator i = messages.find(message.position);
- if (i != messages.end()) {
- erase(i);
- return true;
- } else {
- return false;
- }
-}
-std::string MessageMap::getKey(const QueuedMessage& message)
+std::string MessageMap::getKey(const Message& message)
{
- const framing::FieldTable* ft = message.payload->getApplicationHeaders();
- if (ft) return ft->getAsString(key);
- else return EMPTY;
+ return message.getPropertyAsString(key);
}
size_t MessageMap::size()
{
size_t count(0);
for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) {
- if (i->second.status == QueuedMessage::AVAILABLE) ++count;
+ if (i->second.getState() == AVAILABLE) ++count;
}
return count;
}
@@ -61,116 +50,103 @@ bool MessageMap::empty()
return size() == 0;//TODO: more efficient implementation
}
-void MessageMap::release(const QueuedMessage& message)
+bool MessageMap::deleted(const QueueCursor& cursor)
{
- Ordering::iterator i = messages.find(message.position);
- if (i != messages.end() && i->second.status == QueuedMessage::ACQUIRED) {
- i->second.status = QueuedMessage::AVAILABLE;
- }
-}
-
-bool MessageMap::acquire(const framing::SequenceNumber& position, QueuedMessage& message)
-{
- Ordering::iterator i = messages.find(position);
- if (i != messages.end() && i->second.status == QueuedMessage::AVAILABLE) {
- i->second.status = QueuedMessage::ACQUIRED;
- message = i->second;
+ Ordering::iterator i = messages.find(cursor.position);
+ if (i != messages.end()) {
+ erase(i);
return true;
} else {
return false;
}
}
-bool MessageMap::find(const framing::SequenceNumber& position, QueuedMessage& message)
+Message* MessageMap::find(const QueueCursor& cursor)
{
- Ordering::iterator i = messages.find(position);
- if (i != messages.end() && i->second.status == QueuedMessage::AVAILABLE) {
- message = i->second;
- return true;
- } else {
- return false;
- }
+ if (cursor.valid) return find(cursor.position, 0);
+ else return 0;
}
-bool MessageMap::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool unacquired)
+Message* MessageMap::find(const framing::SequenceNumber& position, QueueCursor* cursor)
{
- Ordering::iterator i = messages.lower_bound(position+1);
- if (i != messages.end() && (i->second.status == QueuedMessage::AVAILABLE || (!unacquired && i->second.status == QueuedMessage::ACQUIRED))) {
- message = i->second;
- return true;
+ Ordering::iterator i = messages.lower_bound(position);
+ if (i != messages.end()) {
+ if (cursor) cursor->setPosition(i->first, version);
+ if (i->first == position) return &(i->second);
+ else return 0;
} else {
- return false;
+ //there is no message whose sequence is greater than position,
+ //i.e. haven't got there yet
+ if (cursor) cursor->setPosition(position, version);
+ return 0;
}
}
-bool MessageMap::consume(QueuedMessage& message)
+Message* MessageMap::next(QueueCursor& cursor)
{
- for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) {
- if (i->second.status == QueuedMessage::AVAILABLE) {
- i->second.status = QueuedMessage::ACQUIRED;
- message = i->second;
- return true;
+ Ordering::iterator i;
+ if (!cursor.valid) i = messages.begin(); //start with oldest message
+ else i = messages.upper_bound(cursor.position); //get first message that is greater than position
+
+ while (i != messages.end()) {
+ Message& m = i->second;
+ cursor.setPosition(m.getSequence(), version);
+ if (cursor.check(m)) {
+ return &m;
+ } else {
+ ++i;
}
}
- return false;
+ return 0;
}
-const QueuedMessage& MessageMap::replace(const QueuedMessage& original, const QueuedMessage& update)
+const Message& MessageMap::replace(const Message& original, const Message& update)
{
- messages.erase(original.position);
- messages[update.position] = update;
- return update;
+ messages.erase(original.getSequence());
+ std::pair<Ordering::iterator, bool> i = messages.insert(Ordering::value_type(update.getSequence(), update));
+ i.first->second.setState(AVAILABLE);
+ return i.first->second;
}
-bool MessageMap::push(const QueuedMessage& added, QueuedMessage& removed)
+void MessageMap::publish(const Message& added)
+{
+ Message dummy;
+ update(added, dummy);
+}
+
+bool MessageMap::update(const Message& added, Message& removed)
{
std::pair<Index::iterator, bool> result = index.insert(Index::value_type(getKey(added), added));
if (result.second) {
//there was no previous message for this key; nothing needs to
//be removed, just add the message into its correct position
- QueuedMessage& a = messages[added.position];
- a = added;
- a.status = QueuedMessage::AVAILABLE;
- QPID_LOG(debug, "Added message " << a);
+ messages.insert(Ordering::value_type(added.getSequence(), added)).first->second.setState(AVAILABLE);
return false;
} else {
//there is already a message with that key which needs to be replaced
removed = result.first->second;
result.first->second = replace(result.first->second, added);
- result.first->second.status = QueuedMessage::AVAILABLE;
- QPID_LOG(debug, "Displaced message " << removed << " with " << result.first->second << ": " << result.first->first);
+ result.first->second.setState(AVAILABLE);
+ QPID_LOG(debug, "Displaced message at " << removed.getSequence() << " with " << result.first->second.getSequence() << ": " << result.first->first);
return true;
}
}
-void MessageMap::setPosition(const framing::SequenceNumber& seq) {
- // Nothing to do, just assert that the precondition is respected and there
- // are no undeleted messages after seq.
- (void) seq; assert(messages.empty() || (--messages.end())->first <= seq);
-}
-
-void MessageMap::foreach(Functor f)
+Message* MessageMap::release(const QueueCursor& cursor)
{
- for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) {
- if (i->second.status == QueuedMessage::AVAILABLE) f(i->second);
+ Ordering::iterator i = messages.find(cursor.position);
+ if (i != messages.end()) {
+ i->second.setState(AVAILABLE);
+ return &i->second;
+ } else {
+ return 0;
}
}
-void MessageMap::removeIf(Predicate p)
+void MessageMap::foreach(Functor f)
{
- for (Ordering::iterator i = messages.begin(); i != messages.end();) {
- if (i->second.status == QueuedMessage::AVAILABLE && p(i->second)) {
- index.erase(getKey(i->second));
- //Note: Removing from messages means that the subsequent
- //call to deleted() for the same message will return
- //false. At present that is not a problem. If this were
- //changed to hold onto the message until dequeued
- //(e.g. with REMOVED state), then the erase() below would
- //need to take that into account.
- messages.erase(i++);
- } else {
- ++i;
- }
+ for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) {
+ if (i->second.getState() == AVAILABLE) f(i->second);
}
}
@@ -180,6 +156,6 @@ void MessageMap::erase(Ordering::iterator i)
messages.erase(i);
}
-MessageMap::MessageMap(const std::string& k) : key(k) {}
+MessageMap::MessageMap(const std::string& k) : key(k), version(0) {}
}} // namespace qpid::broker