diff options
Diffstat (limited to 'cpp/src/qpid/broker/MessageGroupManager.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/MessageGroupManager.cpp | 76 |
1 files changed, 22 insertions, 54 deletions
diff --git a/cpp/src/qpid/broker/MessageGroupManager.cpp b/cpp/src/qpid/broker/MessageGroupManager.cpp index d4ca6af1d5..07b05f3b92 100644 --- a/cpp/src/qpid/broker/MessageGroupManager.cpp +++ b/cpp/src/qpid/broker/MessageGroupManager.cpp @@ -43,6 +43,27 @@ const std::string MessageGroupManager::qpidSharedGroup("qpid.shared_msg_group"); const std::string MessageGroupManager::qpidMessageGroupTimestamp("qpid.group_timestamp"); +void MessageGroupManager::unFree( const GroupState& state ) +{ + GroupFifo::iterator pos = freeGroups.find( state.members.front() ); + assert( pos != freeGroups.end() && pos->second == &state ); + freeGroups.erase( pos ); +} + +void MessageGroupManager::own( GroupState& state, const std::string& owner ) +{ + state.owner = owner; + unFree( state ); +} + +void MessageGroupManager::disown( GroupState& state ) +{ + state.owner.clear(); + assert(state.members.size()); + assert(freeGroups.find(state.members.front()) == freeGroups.end()); + freeGroups[state.members.front()] = &state; +} + const std::string MessageGroupManager::getGroupId( const QueuedMessage& qm ) const { const qpid::framing::FieldTable* headers = qm.payload->getApplicationHeaders(); @@ -155,51 +176,6 @@ void MessageGroupManager::dequeued( const QueuedMessage& qm ) ": dequeued message from group id=" << group << " total=" << total ); } -void MessageGroupManager::consumerAdded( const Consumer& /*c*/ ) -{ -#if 0 - // allow a re-subscribing consumer - if (consumers.find(c.getName()) == consumers.end()) { - consumers[c.getName()] = 0; // no groups owned yet - QPID_LOG( trace, "group queue " << qName << ": added consumer, name=" << c.getName() ); - } else { - QPID_LOG( trace, "group queue " << qName << ": consumer re-subscribed, name=" << c.getName() ); - } -#endif -} - -void MessageGroupManager::consumerRemoved( const Consumer& /*c*/ ) -{ -#if 0 - const std::string& name(c.getName()); - Consumers::iterator consumer = consumers.find(name); - assert(consumer != consumers.end()); - size_t count = consumer->second; - - for (GroupMap::iterator gs = messageGroups.begin(); - count && gs != messageGroups.end(); ++gs) { - - GroupState& state( gs->second ); - if (state.owner == name) { - if (state.acquired == 0) { - --count; - disown(state); - QPID_LOG( trace, "group queue " << qName << - ": consumer name=" << name << " released group id=" << gs->first); - } - } - } - if (count == 0) { - consumers.erase( consumer ); - QPID_LOG( trace, "group queue " << qName << ": removed consumer name=" << name ); - } else { - // don't release groups with outstanding acquired msgs - consumer may re-subscribe! - QPID_LOG( trace, "group queue " << qName << ": consumer name=" << name << " unsubscribed with outstanding messages."); - } -#endif -} - - bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, QueuedMessage& next ) { if (messages.empty()) @@ -215,11 +191,6 @@ bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, Queued return false; // shouldn't happen - should find nextFree } } else { // no free groups available -#if 0 - if (consumers[c->getName()] == 0) { // and none currently owned - return false; // so nothing available to consume - } -#endif if (!messages.next( c->position, next )) return false; } @@ -430,10 +401,7 @@ void MessageGroupManager::setState(const qpid::framing::FieldTable& state) for (Array::const_iterator p = positions.begin(); p != positions.end(); ++p) state.members.push_back((*p)->getIntegerValue<uint32_t, 4>()); messageGroups[state.group] = state; - if (state.owned()) - //consumers[state.owner]++; - ; - else { + if (!state.owned()) { assert(state.members.size()); freeGroups[state.members.front()] = &messageGroups[state.group]; } |
