summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/MessageGroupManager.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-02-17 14:54:46 +0000
committerAlan Conway <aconway@apache.org>2012-02-17 14:54:46 +0000
commit0a8773c335509c2b9e9b96df360de190a266dcad (patch)
tree288469c17dacc37199b5f77498965fee7e778d95 /cpp/src/qpid/broker/MessageGroupManager.cpp
parentd82ce6836f7f0e4f7d647b2dc603141f549869d3 (diff)
downloadqpid-python-0a8773c335509c2b9e9b96df360de190a266dcad.tar.gz
QPID-3603: Merge new HA foundations.
Merged from qpid-3603-7. This is basic support for the new HA approach. For information & limitations see qpid/cpp/design_docs/new-ha-design.txt. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1245587 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/MessageGroupManager.cpp')
-rw-r--r--cpp/src/qpid/broker/MessageGroupManager.cpp13
1 files changed, 6 insertions, 7 deletions
diff --git a/cpp/src/qpid/broker/MessageGroupManager.cpp b/cpp/src/qpid/broker/MessageGroupManager.cpp
index 0aef732e54..5f450cd556 100644
--- a/cpp/src/qpid/broker/MessageGroupManager.cpp
+++ b/cpp/src/qpid/broker/MessageGroupManager.cpp
@@ -204,7 +204,7 @@ MessageGroupManager::~MessageGroupManager()
}
bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, QueuedMessage& next )
{
- if (messages.empty())
+ if (!messages.size())
return false;
next.position = c->getPosition();
@@ -216,15 +216,16 @@ bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, Queued
}
}
- while (messages.next( next.position, next )) {
+ while (messages.browse( next.position, next, true )) {
GroupState& group = findGroup(next);
if (!group.owned()) {
- if (group.members.front() == next.position) { // only take from head!
+ //TODO: make acquire more efficient when we already have the message in question
+ if (group.members.front() == next.position && messages.acquire(next.position, next)) { // only take from head!
return true;
}
QPID_LOG(debug, "Skipping " << next.position << " since group " << group.group
<< "'s head message still pending. pos=" << group.members.front());
- } else if (group.owner == c->getName()) {
+ } else if (group.owner == c->getName() && messages.acquire(next.position, next)) {
return true;
}
}
@@ -249,9 +250,7 @@ bool MessageGroupManager::allocate(const std::string& consumer, const QueuedMess
bool MessageGroupManager::nextBrowsableMessage( Consumer::shared_ptr& c, QueuedMessage& next )
{
// browse: allow access to any available msg, regardless of group ownership (?ok?)
- if (!messages.empty() && messages.next(c->getPosition(), next))
- return true;
- return false;
+ return messages.browse(c->getPosition(), next, false);
}
void MessageGroupManager::query(qpid::types::Variant::Map& status) const