diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/MessageGroupManager.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/MessageGroupManager.cpp | 13 |
1 files changed, 6 insertions, 7 deletions
diff --git a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp index 0aef732e54..5f450cd556 100644 --- a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp +++ b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp @@ -204,7 +204,7 @@ MessageGroupManager::~MessageGroupManager() } bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, QueuedMessage& next ) { - if (messages.empty()) + if (!messages.size()) return false; next.position = c->getPosition(); @@ -216,15 +216,16 @@ bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, Queued } } - while (messages.next( next.position, next )) { + while (messages.browse( next.position, next, true )) { GroupState& group = findGroup(next); if (!group.owned()) { - if (group.members.front() == next.position) { // only take from head! + //TODO: make acquire more efficient when we already have the message in question + if (group.members.front() == next.position && messages.acquire(next.position, next)) { // only take from head! return true; } QPID_LOG(debug, "Skipping " << next.position << " since group " << group.group << "'s head message still pending. pos=" << group.members.front()); - } else if (group.owner == c->getName()) { + } else if (group.owner == c->getName() && messages.acquire(next.position, next)) { return true; } } @@ -249,9 +250,7 @@ bool MessageGroupManager::allocate(const std::string& consumer, const QueuedMess bool MessageGroupManager::nextBrowsableMessage( Consumer::shared_ptr& c, QueuedMessage& next ) { // browse: allow access to any available msg, regardless of group ownership (?ok?) - if (!messages.empty() && messages.next(c->getPosition(), next)) - return true; - return false; + return messages.browse(c->getPosition(), next, false); } void MessageGroupManager::query(qpid::types::Variant::Map& status) const |