diff options
Diffstat (limited to 'cpp/src/qpid/broker/MessageDeque.cpp')
-rw-r--r-- | cpp/src/qpid/broker/MessageDeque.cpp | 207 |
1 files changed, 30 insertions, 177 deletions
diff --git a/cpp/src/qpid/broker/MessageDeque.cpp b/cpp/src/qpid/broker/MessageDeque.cpp index 83c8ca6868..1529d4ac94 100644 --- a/cpp/src/qpid/broker/MessageDeque.cpp +++ b/cpp/src/qpid/broker/MessageDeque.cpp @@ -19,218 +19,71 @@ * */ #include "qpid/broker/MessageDeque.h" -#include "qpid/broker/QueuedMessage.h" -#include "qpid/log/Statement.h" #include "assert.h" +#include "qpid/broker/Message.h" +#include "qpid/broker/QueueCursor.h" +#include "qpid/framing/SequenceNumber.h" +#include "qpid/log/Statement.h" namespace qpid { namespace broker { - -MessageDeque::MessageDeque() : available(0), head(0) {} - -size_t MessageDeque::index(const framing::SequenceNumber& position) -{ - //assuming a monotonic sequence, with no messages removed except - //from the ends of the deque, we can use the position to determin - //an index into the deque - if (messages.empty() || position < messages.front().position) return 0; - return position - messages.front().position; +namespace { +Message padding(qpid::framing::SequenceNumber id) { + Message m; + m.setState(DELETED); + m.setSequence(id); + return m; } - -bool MessageDeque::deleted(const QueuedMessage& m) -{ - size_t i = index(m.position); - if (i < messages.size()) { - QueuedMessage *qm = &messages[i]; - if (qm->status != QueuedMessage::DELETED) { - qm->status = QueuedMessage::DELETED; - qm->payload = 0; // message no longer needed - clean(); - return true; - } - } - return false; } -size_t MessageDeque::size() -{ - return available; -} +using qpid::framing::SequenceNumber; -QueuedMessage* MessageDeque::releasePtr(const QueuedMessage& message) -{ - size_t i = index(message.position); - if (i < messages.size()) { - QueuedMessage& m = messages[i]; - if (m.status == QueuedMessage::ACQUIRED) { - if (head > i) head = i; - m.status = QueuedMessage::AVAILABLE; - ++available; - return &messages[i]; - } - } else { - assert(0); - QPID_LOG(error, "Failed to release message at " << message.position << " " << message.payload->getFrames().getContent() << "; no such message (index=" << i << ", size=" << messages.size() << ")"); - } - return 0; -} +MessageDeque::MessageDeque() : messages(&padding) {} -void MessageDeque::release(const QueuedMessage& message) { releasePtr(message); } -bool MessageDeque::acquire(const framing::SequenceNumber& position, QueuedMessage& message) +bool MessageDeque::deleted(const QueueCursor& cursor) { - if (position < messages.front().position) return false; - size_t i = index(position); - if (i < messages.size()) { - QueuedMessage& temp = messages[i]; - if (temp.status == QueuedMessage::AVAILABLE) { - temp.status = QueuedMessage::ACQUIRED; - --available; - message = temp; - return true; - } - } - return false; + return messages.deleted(cursor); } -bool MessageDeque::find(const framing::SequenceNumber& position, QueuedMessage& message) +void MessageDeque::publish(const Message& added) { - size_t i = index(position); - if (i < messages.size()) { - message = messages[i]; - return true; - } else { - return false; - } + messages.publish(added); } -bool MessageDeque::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool unacquired) +Message* MessageDeque::release(const QueueCursor& cursor) { - //get first message that is greater than position - size_t i = index(position + 1); - while (i < messages.size()) { - QueuedMessage& m = messages[i++]; - if (m.status == QueuedMessage::AVAILABLE || (!unacquired && m.status == QueuedMessage::ACQUIRED)) { - message = m; - return true; - } - } - return false; + return messages.release(cursor); } -bool MessageDeque::consume(QueuedMessage& message) +Message* MessageDeque::next(QueueCursor& cursor) { - while (head < messages.size()) { - QueuedMessage& i = messages[head++]; - if (i.status == QueuedMessage::AVAILABLE) { - i.status = QueuedMessage::ACQUIRED; - --available; - message = i; - return true; - } - } - return false; + return messages.next(cursor); } -namespace { -QueuedMessage padding(uint32_t pos) { - return QueuedMessage(0, 0, pos, QueuedMessage::DELETED); -} -} // namespace - -QueuedMessage* MessageDeque::pushPtr(const QueuedMessage& added) { - //add padding to prevent gaps in sequence, which break the index - //calculation (needed for queue replication) - while (messages.size() && (added.position - messages.back().position) > 1) - messages.push_back(padding(messages.back().position + 1)); - messages.push_back(added); - messages.back().status = QueuedMessage::AVAILABLE; - if (head >= messages.size()) head = messages.size() - 1; - ++available; - clean(); // QPID-4046: let producer help clean the backlog of deleted messages - return &messages.back(); -} - -bool MessageDeque::push(const QueuedMessage& added, QueuedMessage& /*not needed*/) { - pushPtr(added); - return false; // adding a message never causes one to be removed for deque -} - -void MessageDeque::updateAcquired(const QueuedMessage& acquired) +size_t MessageDeque::size() { - // Pad the front of the queue if necessary - while (messages.size() && (acquired.position < messages.front().position)) - messages.push_front(padding(uint32_t(messages.front().position) - 1)); - size_t i = index(acquired.position); - if (i < messages.size()) { // Replace an existing padding message - assert(messages[i].status == QueuedMessage::DELETED); - messages[i] = acquired; - messages[i].status = QueuedMessage::ACQUIRED; - } - else { // Push to the back - // Pad the back of the queue if necessary - while (messages.size() && (acquired.position - messages.back().position) > 1) - messages.push_back(padding(messages.back().position + 1)); - assert(!messages.size() || (acquired.position - messages.back().position) == 1); - messages.push_back(acquired); - messages.back().status = QueuedMessage::ACQUIRED; - } + return messages.size(); } -namespace { -bool isNotDeleted(const QueuedMessage& qm) { return qm.status != QueuedMessage::DELETED; } -} // namespace - -void MessageDeque::setPosition(const framing::SequenceNumber& n) { - size_t i = index(n+1); - if (i >= messages.size()) return; // Nothing to do. - - // Assertion to verify the precondition: no messaages after n. - assert(std::find_if(messages.begin()+i, messages.end(), &isNotDeleted) == - messages.end()); - messages.erase(messages.begin()+i, messages.end()); - if (head >= messages.size()) head = messages.size() - 1; - // Re-count the available messages - available = 0; - for (Deque::iterator i = messages.begin(); i != messages.end(); ++i) { - if (i->status == QueuedMessage::AVAILABLE) ++available; - } +Message* MessageDeque::find(const framing::SequenceNumber& position, QueueCursor* cursor) +{ + return messages.find(position, cursor); } -void MessageDeque::clean() +Message* MessageDeque::find(const QueueCursor& cursor) { - // QPID-4046: If a queue has multiple consumers, then it is possible for a large - // collection of deleted messages to build up. Limit the number of messages cleaned - // up on each call to clean(). - size_t count = 0; - while (messages.size() && messages.front().status == QueuedMessage::DELETED && count < 10) { - messages.pop_front(); - count += 1; - } - head = (head > count) ? head - count : 0; + return messages.find(cursor); } void MessageDeque::foreach(Functor f) { - for (Deque::iterator i = messages.begin(); i != messages.end(); ++i) { - if (i->status == QueuedMessage::AVAILABLE) { - f(*i); - } - } + messages.foreach(f); } -void MessageDeque::removeIf(Predicate p) +void MessageDeque::resetCursors() { - for (Deque::iterator i = messages.begin(); i != messages.end(); ++i) { - if (i->status == QueuedMessage::AVAILABLE && p(*i)) { - //Use special status for this as messages are not yet - //dequeued, but should not be considered on the queue - //either (used for purging and moving) - i->status = QueuedMessage::REMOVED; - --available; - } - } - clean(); + messages.resetCursors(); } }} // namespace qpid::broker |