diff options
Diffstat (limited to 'cpp/src/qpid/broker/MessageMap.cpp')
-rw-r--r-- | cpp/src/qpid/broker/MessageMap.cpp | 146 |
1 files changed, 61 insertions, 85 deletions
diff --git a/cpp/src/qpid/broker/MessageMap.cpp b/cpp/src/qpid/broker/MessageMap.cpp index 592f3fefde..4cdd83c9aa 100644 --- a/cpp/src/qpid/broker/MessageMap.cpp +++ b/cpp/src/qpid/broker/MessageMap.cpp @@ -19,7 +19,8 @@ * */ #include "qpid/broker/MessageMap.h" -#include "qpid/broker/QueuedMessage.h" +#include "qpid/broker/Message.h" +#include "qpid/broker/QueueCursor.h" #include "qpid/log/Statement.h" #include <algorithm> @@ -29,29 +30,17 @@ namespace { const std::string EMPTY; } -bool MessageMap::deleted(const QueuedMessage& message) -{ - Ordering::iterator i = messages.find(message.position); - if (i != messages.end()) { - erase(i); - return true; - } else { - return false; - } -} -std::string MessageMap::getKey(const QueuedMessage& message) +std::string MessageMap::getKey(const Message& message) { - const framing::FieldTable* ft = message.payload->getApplicationHeaders(); - if (ft) return ft->getAsString(key); - else return EMPTY; + return message.getPropertyAsString(key); } size_t MessageMap::size() { size_t count(0); for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) { - if (i->second.status == QueuedMessage::AVAILABLE) ++count; + if (i->second.getState() == AVAILABLE) ++count; } return count; } @@ -61,116 +50,103 @@ bool MessageMap::empty() return size() == 0;//TODO: more efficient implementation } -void MessageMap::release(const QueuedMessage& message) +bool MessageMap::deleted(const QueueCursor& cursor) { - Ordering::iterator i = messages.find(message.position); - if (i != messages.end() && i->second.status == QueuedMessage::ACQUIRED) { - i->second.status = QueuedMessage::AVAILABLE; - } -} - -bool MessageMap::acquire(const framing::SequenceNumber& position, QueuedMessage& message) -{ - Ordering::iterator i = messages.find(position); - if (i != messages.end() && i->second.status == QueuedMessage::AVAILABLE) { - i->second.status = QueuedMessage::ACQUIRED; - message = i->second; + Ordering::iterator i = messages.find(cursor.position); + if (i != messages.end()) { + erase(i); return true; } else { return false; } } -bool MessageMap::find(const framing::SequenceNumber& position, QueuedMessage& message) +Message* MessageMap::find(const QueueCursor& cursor) { - Ordering::iterator i = messages.find(position); - if (i != messages.end() && i->second.status == QueuedMessage::AVAILABLE) { - message = i->second; - return true; - } else { - return false; - } + if (cursor.valid) return find(cursor.position, 0); + else return 0; } -bool MessageMap::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool unacquired) +Message* MessageMap::find(const framing::SequenceNumber& position, QueueCursor* cursor) { - Ordering::iterator i = messages.lower_bound(position+1); - if (i != messages.end() && (i->second.status == QueuedMessage::AVAILABLE || (!unacquired && i->second.status == QueuedMessage::ACQUIRED))) { - message = i->second; - return true; + Ordering::iterator i = messages.lower_bound(position); + if (i != messages.end()) { + if (cursor) cursor->setPosition(i->first, version); + if (i->first == position) return &(i->second); + else return 0; } else { - return false; + //there is no message whose sequence is greater than position, + //i.e. haven't got there yet + if (cursor) cursor->setPosition(position, version); + return 0; } } -bool MessageMap::consume(QueuedMessage& message) +Message* MessageMap::next(QueueCursor& cursor) { - for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) { - if (i->second.status == QueuedMessage::AVAILABLE) { - i->second.status = QueuedMessage::ACQUIRED; - message = i->second; - return true; + Ordering::iterator i; + if (!cursor.valid) i = messages.begin(); //start with oldest message + else i = messages.upper_bound(cursor.position); //get first message that is greater than position + + while (i != messages.end()) { + Message& m = i->second; + cursor.setPosition(m.getSequence(), version); + if (cursor.check(m)) { + return &m; + } else { + ++i; } } - return false; + return 0; } -const QueuedMessage& MessageMap::replace(const QueuedMessage& original, const QueuedMessage& update) +const Message& MessageMap::replace(const Message& original, const Message& update) { - messages.erase(original.position); - messages[update.position] = update; - return update; + messages.erase(original.getSequence()); + std::pair<Ordering::iterator, bool> i = messages.insert(Ordering::value_type(update.getSequence(), update)); + i.first->second.setState(AVAILABLE); + return i.first->second; } -bool MessageMap::push(const QueuedMessage& added, QueuedMessage& removed) +void MessageMap::publish(const Message& added) +{ + Message dummy; + update(added, dummy); +} + +bool MessageMap::update(const Message& added, Message& removed) { std::pair<Index::iterator, bool> result = index.insert(Index::value_type(getKey(added), added)); if (result.second) { //there was no previous message for this key; nothing needs to //be removed, just add the message into its correct position - QueuedMessage& a = messages[added.position]; - a = added; - a.status = QueuedMessage::AVAILABLE; - QPID_LOG(debug, "Added message " << a); + messages.insert(Ordering::value_type(added.getSequence(), added)).first->second.setState(AVAILABLE); return false; } else { //there is already a message with that key which needs to be replaced removed = result.first->second; result.first->second = replace(result.first->second, added); - result.first->second.status = QueuedMessage::AVAILABLE; - QPID_LOG(debug, "Displaced message " << removed << " with " << result.first->second << ": " << result.first->first); + result.first->second.setState(AVAILABLE); + QPID_LOG(debug, "Displaced message at " << removed.getSequence() << " with " << result.first->second.getSequence() << ": " << result.first->first); return true; } } -void MessageMap::setPosition(const framing::SequenceNumber& seq) { - // Nothing to do, just assert that the precondition is respected and there - // are no undeleted messages after seq. - (void) seq; assert(messages.empty() || (--messages.end())->first <= seq); -} - -void MessageMap::foreach(Functor f) +Message* MessageMap::release(const QueueCursor& cursor) { - for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) { - if (i->second.status == QueuedMessage::AVAILABLE) f(i->second); + Ordering::iterator i = messages.find(cursor.position); + if (i != messages.end()) { + i->second.setState(AVAILABLE); + return &i->second; + } else { + return 0; } } -void MessageMap::removeIf(Predicate p) +void MessageMap::foreach(Functor f) { - for (Ordering::iterator i = messages.begin(); i != messages.end();) { - if (i->second.status == QueuedMessage::AVAILABLE && p(i->second)) { - index.erase(getKey(i->second)); - //Note: Removing from messages means that the subsequent - //call to deleted() for the same message will return - //false. At present that is not a problem. If this were - //changed to hold onto the message until dequeued - //(e.g. with REMOVED state), then the erase() below would - //need to take that into account. - messages.erase(i++); - } else { - ++i; - } + for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) { + if (i->second.getState() == AVAILABLE) f(i->second); } } @@ -180,6 +156,6 @@ void MessageMap::erase(Ordering::iterator i) messages.erase(i); } -MessageMap::MessageMap(const std::string& k) : key(k) {} +MessageMap::MessageMap(const std::string& k) : key(k), version(0) {} }} // namespace qpid::broker |