diff options
author | Alan Conway <aconway@apache.org> | 2012-02-17 14:54:46 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-02-17 14:54:46 +0000 |
commit | 0a8773c335509c2b9e9b96df360de190a266dcad (patch) | |
tree | 288469c17dacc37199b5f77498965fee7e778d95 /cpp/src/qpid/broker/MessageGroupManager.cpp | |
parent | d82ce6836f7f0e4f7d647b2dc603141f549869d3 (diff) | |
download | qpid-python-0a8773c335509c2b9e9b96df360de190a266dcad.tar.gz |
QPID-3603: Merge new HA foundations.
Merged from qpid-3603-7. This is basic support for the new HA approach.
For information & limitations see qpid/cpp/design_docs/new-ha-design.txt.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1245587 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/MessageGroupManager.cpp')
-rw-r--r-- | cpp/src/qpid/broker/MessageGroupManager.cpp | 13 |
1 files changed, 6 insertions, 7 deletions
diff --git a/cpp/src/qpid/broker/MessageGroupManager.cpp b/cpp/src/qpid/broker/MessageGroupManager.cpp index 0aef732e54..5f450cd556 100644 --- a/cpp/src/qpid/broker/MessageGroupManager.cpp +++ b/cpp/src/qpid/broker/MessageGroupManager.cpp @@ -204,7 +204,7 @@ MessageGroupManager::~MessageGroupManager() } bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, QueuedMessage& next ) { - if (messages.empty()) + if (!messages.size()) return false; next.position = c->getPosition(); @@ -216,15 +216,16 @@ bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, Queued } } - while (messages.next( next.position, next )) { + while (messages.browse( next.position, next, true )) { GroupState& group = findGroup(next); if (!group.owned()) { - if (group.members.front() == next.position) { // only take from head! + //TODO: make acquire more efficient when we already have the message in question + if (group.members.front() == next.position && messages.acquire(next.position, next)) { // only take from head! return true; } QPID_LOG(debug, "Skipping " << next.position << " since group " << group.group << "'s head message still pending. pos=" << group.members.front()); - } else if (group.owner == c->getName()) { + } else if (group.owner == c->getName() && messages.acquire(next.position, next)) { return true; } } @@ -249,9 +250,7 @@ bool MessageGroupManager::allocate(const std::string& consumer, const QueuedMess bool MessageGroupManager::nextBrowsableMessage( Consumer::shared_ptr& c, QueuedMessage& next ) { // browse: allow access to any available msg, regardless of group ownership (?ok?) - if (!messages.empty() && messages.next(c->getPosition(), next)) - return true; - return false; + return messages.browse(c->getPosition(), next, false); } void MessageGroupManager::query(qpid::types::Variant::Map& status) const |