diff options
Diffstat (limited to 'cpp/src/qpid/broker/MessageDeque.cpp')
-rw-r--r-- | cpp/src/qpid/broker/MessageDeque.cpp | 179 |
1 files changed, 118 insertions, 61 deletions
diff --git a/cpp/src/qpid/broker/MessageDeque.cpp b/cpp/src/qpid/broker/MessageDeque.cpp index 24b8f6f895..709d99876b 100644 --- a/cpp/src/qpid/broker/MessageDeque.cpp +++ b/cpp/src/qpid/broker/MessageDeque.cpp @@ -20,121 +20,178 @@ */ #include "qpid/broker/MessageDeque.h" #include "qpid/broker/QueuedMessage.h" +#include "qpid/log/Statement.h" namespace qpid { namespace broker { -size_t MessageDeque::size() +MessageDeque::MessageDeque() : available(0), head(0) {} + +size_t MessageDeque::index(const framing::SequenceNumber& position) { - return messages.size(); + //assuming a monotonic sequence, with no messages removed except + //from the ends of the deque, we can use the position to determin + //an index into the deque + if (messages.empty() || position < messages.front().position) return 0; + return position - messages.front().position; } -bool MessageDeque::empty() +bool MessageDeque::deleted(const QueuedMessage& m) { - return messages.empty(); + size_t i = index(m.position); + if (i < messages.size()) { + messages[i].status = QueuedMessage::DELETED; + clean(); + return true; + } else { + return false; + } } -void MessageDeque::reinsert(const QueuedMessage& message) +size_t MessageDeque::size() { - messages.insert(lower_bound(messages.begin(), messages.end(), message), message); + return available; } -MessageDeque::Deque::iterator MessageDeque::seek(const framing::SequenceNumber& position) +void MessageDeque::release(const QueuedMessage& message) { - if (!messages.empty()) { - QueuedMessage comp; - comp.position = position; - unsigned long diff = position.getValue() - messages.front().position.getValue(); - long maxEnd = diff < messages.size()? diff : messages.size(); - return lower_bound(messages.begin(),messages.begin()+maxEnd,comp); + size_t i = index(message.position); + if (i < messages.size()) { + QueuedMessage& m = messages[i]; + if (m.status == QueuedMessage::ACQUIRED) { + if (head > i) head = i; + m.status = QueuedMessage::AVAILABLE; + ++available; + } } else { - return messages.end(); + QPID_LOG(error, "Failed to release message at " << message.position << " " << message.payload->getFrames().getContent() << "; no such message (index=" << i << ", size=" << messages.size() << ")"); } } -bool MessageDeque::find(const framing::SequenceNumber& position, QueuedMessage& message, bool remove) +bool MessageDeque::acquire(const framing::SequenceNumber& position, QueuedMessage& message) { - Deque::iterator i = seek(position); - if (i != messages.end() && i->position == position) { - message = *i; - if (remove) messages.erase(i); - return true; - } else { - return false; + if (position < messages.front().position) return false; + size_t i = index(position); + if (i < messages.size()) { + QueuedMessage& temp = messages[i]; + if (temp.status == QueuedMessage::AVAILABLE) { + temp.status = QueuedMessage::ACQUIRED; + --available; + message = temp; + return true; + } } + return false; } -bool MessageDeque::remove(const framing::SequenceNumber& position, QueuedMessage& message) +bool MessageDeque::find(const framing::SequenceNumber& position, QueuedMessage& message) { - return find(position, message, true); + size_t i = index(position); + if (i < messages.size()) { + message = messages[i]; + return true; + } else { + return false; + } } -bool MessageDeque::find(const framing::SequenceNumber& position, QueuedMessage& message) +bool MessageDeque::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool unacquired) { - return find(position, message, false); + //get first message that is greater than position + size_t i = index(position + 1); + while (i < messages.size()) { + QueuedMessage& m = messages[i++]; + if (m.status == QueuedMessage::AVAILABLE || (!unacquired && m.status == QueuedMessage::ACQUIRED)) { + message = m; + return true; + } + } + return false; } -bool MessageDeque::next(const framing::SequenceNumber& position, QueuedMessage& message) +bool MessageDeque::consume(QueuedMessage& message) { - if (messages.empty()) { - return false; - } else if (position < front().position) { - message = front(); - return true; - } else { - Deque::iterator i = seek(position+1); - if (i != messages.end()) { - message = *i; + while (head < messages.size()) { + QueuedMessage& i = messages[head++]; + if (i.status == QueuedMessage::AVAILABLE) { + i.status = QueuedMessage::ACQUIRED; + --available; + message = i; return true; - } else { - return false; } } + return false; } -QueuedMessage& MessageDeque::front() -{ - return messages.front(); +namespace { +QueuedMessage padding(uint32_t pos) { + return QueuedMessage(0, 0, pos, QueuedMessage::DELETED); } +} // namespace -void MessageDeque::pop() +bool MessageDeque::push(const QueuedMessage& added, QueuedMessage& /*not needed*/) { - if (!messages.empty()) { - messages.pop_front(); - } + //add padding to prevent gaps in sequence, which break the index + //calculation (needed for queue replication) + while (messages.size() && (added.position - messages.back().position) > 1) + messages.push_back(padding(messages.back().position + 1)); + messages.push_back(added); + messages.back().status = QueuedMessage::AVAILABLE; + if (head >= messages.size()) head = messages.size() - 1; + ++available; + return false;//adding a message never causes one to be removed for deque } -bool MessageDeque::pop(QueuedMessage& out) +void MessageDeque::updateAcquired(const QueuedMessage& acquired) { - if (messages.empty()) { - return false; - } else { - out = front(); - messages.pop_front(); - return true; + // Pad the front of the queue if necessary + while (messages.size() && (acquired.position < messages.front().position)) + messages.push_front(padding(uint32_t(messages.front().position) - 1)); + size_t i = index(acquired.position); + if (i < messages.size()) { // Replace an existing padding message + assert(messages[i].status == QueuedMessage::DELETED); + messages[i] = acquired; + messages[i].status = QueuedMessage::ACQUIRED; + } + else { // Push to the back + // Pad the back of the queue if necessary + while (messages.size() && (acquired.position - messages.back().position) > 1) + messages.push_back(padding(messages.back().position + 1)); + assert(!messages.size() || (acquired.position - messages.back().position) == 1); + messages.push_back(acquired); + messages.back().status = QueuedMessage::ACQUIRED; } } -bool MessageDeque::push(const QueuedMessage& added, QueuedMessage& /*not needed*/) +void MessageDeque::clean() { - messages.push_back(added); - return false;//adding a message never causes one to be removed for deque + while (messages.size() && messages.front().status == QueuedMessage::DELETED) { + messages.pop_front(); + if (head) --head; + } } void MessageDeque::foreach(Functor f) { - std::for_each(messages.begin(), messages.end(), f); + for (Deque::iterator i = messages.begin(); i != messages.end(); ++i) { + if (i->status == QueuedMessage::AVAILABLE) { + f(*i); + } + } } void MessageDeque::removeIf(Predicate p) { - for (Deque::iterator i = messages.begin(); i != messages.end();) { - if (p(*i)) { - i = messages.erase(i); - } else { - ++i; + for (Deque::iterator i = messages.begin(); i != messages.end(); ++i) { + if (i->status == QueuedMessage::AVAILABLE && p(*i)) { + //Use special status for this as messages are not yet + //dequeued, but should not be considered on the queue + //either (used for purging and moving) + i->status = QueuedMessage::REMOVED; + --available; } } + clean(); } }} // namespace qpid::broker |