diff options
Diffstat (limited to 'cpp/src/qpid/broker/MessageDeque.cpp')
-rw-r--r-- | cpp/src/qpid/broker/MessageDeque.cpp | 45 |
1 files changed, 37 insertions, 8 deletions
diff --git a/cpp/src/qpid/broker/MessageDeque.cpp b/cpp/src/qpid/broker/MessageDeque.cpp index f70c996975..83c8ca6868 100644 --- a/cpp/src/qpid/broker/MessageDeque.cpp +++ b/cpp/src/qpid/broker/MessageDeque.cpp @@ -40,13 +40,16 @@ size_t MessageDeque::index(const framing::SequenceNumber& position) bool MessageDeque::deleted(const QueuedMessage& m) { size_t i = index(m.position); - if (i < messages.size() && messages[i].status != QueuedMessage::DELETED) { - messages[i].status = QueuedMessage::DELETED; - clean(); - return true; - } else { - return false; + if (i < messages.size()) { + QueuedMessage *qm = &messages[i]; + if (qm->status != QueuedMessage::DELETED) { + qm->status = QueuedMessage::DELETED; + qm->payload = 0; // message no longer needed + clean(); + return true; + } } + return false; } size_t MessageDeque::size() @@ -144,6 +147,7 @@ QueuedMessage* MessageDeque::pushPtr(const QueuedMessage& added) { messages.back().status = QueuedMessage::AVAILABLE; if (head >= messages.size()) head = messages.size() - 1; ++available; + clean(); // QPID-4046: let producer help clean the backlog of deleted messages return &messages.back(); } @@ -173,12 +177,37 @@ void MessageDeque::updateAcquired(const QueuedMessage& acquired) } } +namespace { +bool isNotDeleted(const QueuedMessage& qm) { return qm.status != QueuedMessage::DELETED; } +} // namespace + +void MessageDeque::setPosition(const framing::SequenceNumber& n) { + size_t i = index(n+1); + if (i >= messages.size()) return; // Nothing to do. + + // Assertion to verify the precondition: no messaages after n. + assert(std::find_if(messages.begin()+i, messages.end(), &isNotDeleted) == + messages.end()); + messages.erase(messages.begin()+i, messages.end()); + if (head >= messages.size()) head = messages.size() - 1; + // Re-count the available messages + available = 0; + for (Deque::iterator i = messages.begin(); i != messages.end(); ++i) { + if (i->status == QueuedMessage::AVAILABLE) ++available; + } +} + void MessageDeque::clean() { - while (messages.size() && messages.front().status == QueuedMessage::DELETED) { + // QPID-4046: If a queue has multiple consumers, then it is possible for a large + // collection of deleted messages to build up. Limit the number of messages cleaned + // up on each call to clean(). + size_t count = 0; + while (messages.size() && messages.front().status == QueuedMessage::DELETED && count < 10) { messages.pop_front(); - if (head) --head; + count += 1; } + head = (head > count) ? head - count : 0; } void MessageDeque::foreach(Functor f) |