diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/MessageDeque.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/MessageDeque.cpp | 161 |
1 files changed, 98 insertions, 63 deletions
diff --git a/qpid/cpp/src/qpid/broker/MessageDeque.cpp b/qpid/cpp/src/qpid/broker/MessageDeque.cpp index 24b8f6f895..9f874e4c9a 100644 --- a/qpid/cpp/src/qpid/broker/MessageDeque.cpp +++ b/qpid/cpp/src/qpid/broker/MessageDeque.cpp @@ -20,121 +20,156 @@ */ #include "qpid/broker/MessageDeque.h" #include "qpid/broker/QueuedMessage.h" +#include "qpid/log/Statement.h" namespace qpid { namespace broker { -size_t MessageDeque::size() -{ - return messages.size(); -} - -bool MessageDeque::empty() -{ - return messages.empty(); -} +MessageDeque::MessageDeque() : available(0), head(0) {} -void MessageDeque::reinsert(const QueuedMessage& message) +size_t MessageDeque::index(const framing::SequenceNumber& position) { - messages.insert(lower_bound(messages.begin(), messages.end(), message), message); -} - -MessageDeque::Deque::iterator MessageDeque::seek(const framing::SequenceNumber& position) -{ - if (!messages.empty()) { - QueuedMessage comp; - comp.position = position; - unsigned long diff = position.getValue() - messages.front().position.getValue(); - long maxEnd = diff < messages.size()? diff : messages.size(); - return lower_bound(messages.begin(),messages.begin()+maxEnd,comp); - } else { - return messages.end(); - } + //assuming a monotonic sequence, with no messages removed except + //from the ends of the deque, we can use the position to determin + //an index into the deque + if (messages.empty() || position < messages.front().position) return 0; + return position - messages.front().position; } -bool MessageDeque::find(const framing::SequenceNumber& position, QueuedMessage& message, bool remove) +bool MessageDeque::deleted(const QueuedMessage& m) { - Deque::iterator i = seek(position); - if (i != messages.end() && i->position == position) { - message = *i; - if (remove) messages.erase(i); + size_t i = index(m.position); + if (i < messages.size()) { + messages[i].status = QueuedMessage::DELETED; + clean(); return true; } else { return false; } } -bool MessageDeque::remove(const framing::SequenceNumber& position, QueuedMessage& message) +size_t MessageDeque::size() { - return find(position, message, true); + return available; } -bool MessageDeque::find(const framing::SequenceNumber& position, QueuedMessage& message) +void MessageDeque::release(const QueuedMessage& message) { - return find(position, message, false); + size_t i = index(message.position); + if (i < messages.size()) { + QueuedMessage& m = messages[i]; + if (m.status == QueuedMessage::ACQUIRED) { + if (head > i) head = i; + m.status = QueuedMessage::AVAILABLE; + ++available; + } + } else { + QPID_LOG(error, "Failed to release message at " << message.position << " " << message.payload->getFrames().getContent() << "; no such message (index=" << i << ", size=" << messages.size() << ")"); + } } -bool MessageDeque::next(const framing::SequenceNumber& position, QueuedMessage& message) +bool MessageDeque::acquire(const framing::SequenceNumber& position, QueuedMessage& message) { - if (messages.empty()) { - return false; - } else if (position < front().position) { - message = front(); - return true; - } else { - Deque::iterator i = seek(position+1); - if (i != messages.end()) { - message = *i; + if (position < messages.front().position) return false; + size_t i = index(position); + if (i < messages.size()) { + QueuedMessage& temp = messages[i]; + if (temp.status == QueuedMessage::AVAILABLE) { + temp.status = QueuedMessage::ACQUIRED; + --available; + message = temp; return true; - } else { - return false; } } + return false; } -QueuedMessage& MessageDeque::front() +bool MessageDeque::find(const framing::SequenceNumber& position, QueuedMessage& message) { - return messages.front(); + size_t i = index(position); + if (i < messages.size()) { + message = messages[i]; + return true; + } else { + return false; + } } -void MessageDeque::pop() +bool MessageDeque::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool unacquired) { - if (!messages.empty()) { - messages.pop_front(); + //get first message that is greater than position + size_t i = index(position + 1); + while (i < messages.size()) { + QueuedMessage& m = messages[i++]; + if (m.status == QueuedMessage::AVAILABLE || (!unacquired && m.status == QueuedMessage::ACQUIRED)) { + message = m; + return true; + } } + return false; } -bool MessageDeque::pop(QueuedMessage& out) +bool MessageDeque::consume(QueuedMessage& message) { - if (messages.empty()) { - return false; - } else { - out = front(); - messages.pop_front(); - return true; + while (head < messages.size()) { + QueuedMessage& i = messages[head++]; + if (i.status == QueuedMessage::AVAILABLE) { + i.status = QueuedMessage::ACQUIRED; + --available; + message = i; + return true; + } } + return false; } bool MessageDeque::push(const QueuedMessage& added, QueuedMessage& /*not needed*/) { + //add padding to prevent gaps in sequence, which break the index + //calculation (needed for queue replication) + while (messages.size() && (added.position - messages.back().position) > 1) { + QueuedMessage dummy; + dummy.position = messages.back().position + 1; + dummy.status = QueuedMessage::DELETED; + messages.push_back(dummy); + QPID_LOG(debug, "Adding padding at " << dummy.position << ", between " << messages.back().position << " and " << added.position); + } messages.push_back(added); + messages.back().status = QueuedMessage::AVAILABLE; + if (head >= messages.size()) head = messages.size() - 1; + ++available; return false;//adding a message never causes one to be removed for deque } +void MessageDeque::clean() +{ + while (messages.size() && messages.front().status == QueuedMessage::DELETED) { + messages.pop_front(); + if (head) --head; + } +} + void MessageDeque::foreach(Functor f) { - std::for_each(messages.begin(), messages.end(), f); + for (Deque::iterator i = messages.begin(); i != messages.end(); ++i) { + if (i->status == QueuedMessage::AVAILABLE) { + f(*i); + } + } } void MessageDeque::removeIf(Predicate p) { - for (Deque::iterator i = messages.begin(); i != messages.end();) { - if (p(*i)) { - i = messages.erase(i); - } else { - ++i; + for (Deque::iterator i = messages.begin(); i != messages.end(); ++i) { + if (i->status == QueuedMessage::AVAILABLE && p(*i)) { + //Use special status for this as messages are not yet + //dequeued, but should not be considered on the queue + //either (used for purging and moving) + i->status = QueuedMessage::REMOVED; + --available; } } + clean(); } }} // namespace qpid::broker |