summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/MessageGroupManager.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/MessageGroupManager.cpp13
1 files changed, 6 insertions, 7 deletions
diff --git a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
index 0aef732e54..5f450cd556 100644
--- a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
+++ b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
@@ -204,7 +204,7 @@ MessageGroupManager::~MessageGroupManager()
}
bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, QueuedMessage& next )
{
- if (messages.empty())
+ if (!messages.size())
return false;
next.position = c->getPosition();
@@ -216,15 +216,16 @@ bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, Queued
}
}
- while (messages.next( next.position, next )) {
+ while (messages.browse( next.position, next, true )) {
GroupState& group = findGroup(next);
if (!group.owned()) {
- if (group.members.front() == next.position) { // only take from head!
+ //TODO: make acquire more efficient when we already have the message in question
+ if (group.members.front() == next.position && messages.acquire(next.position, next)) { // only take from head!
return true;
}
QPID_LOG(debug, "Skipping " << next.position << " since group " << group.group
<< "'s head message still pending. pos=" << group.members.front());
- } else if (group.owner == c->getName()) {
+ } else if (group.owner == c->getName() && messages.acquire(next.position, next)) {
return true;
}
}
@@ -249,9 +250,7 @@ bool MessageGroupManager::allocate(const std::string& consumer, const QueuedMess
bool MessageGroupManager::nextBrowsableMessage( Consumer::shared_ptr& c, QueuedMessage& next )
{
// browse: allow access to any available msg, regardless of group ownership (?ok?)
- if (!messages.empty() && messages.next(c->getPosition(), next))
- return true;
- return false;
+ return messages.browse(c->getPosition(), next, false);
}
void MessageGroupManager::query(qpid::types::Variant::Map& status) const