summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/MessageGroupManager.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/MessageGroupManager.cpp')
-rw-r--r--cpp/src/qpid/broker/MessageGroupManager.cpp76
1 files changed, 22 insertions, 54 deletions
diff --git a/cpp/src/qpid/broker/MessageGroupManager.cpp b/cpp/src/qpid/broker/MessageGroupManager.cpp
index d4ca6af1d5..07b05f3b92 100644
--- a/cpp/src/qpid/broker/MessageGroupManager.cpp
+++ b/cpp/src/qpid/broker/MessageGroupManager.cpp
@@ -43,6 +43,27 @@ const std::string MessageGroupManager::qpidSharedGroup("qpid.shared_msg_group");
const std::string MessageGroupManager::qpidMessageGroupTimestamp("qpid.group_timestamp");
+void MessageGroupManager::unFree( const GroupState& state )
+{
+ GroupFifo::iterator pos = freeGroups.find( state.members.front() );
+ assert( pos != freeGroups.end() && pos->second == &state );
+ freeGroups.erase( pos );
+}
+
+void MessageGroupManager::own( GroupState& state, const std::string& owner )
+{
+ state.owner = owner;
+ unFree( state );
+}
+
+void MessageGroupManager::disown( GroupState& state )
+{
+ state.owner.clear();
+ assert(state.members.size());
+ assert(freeGroups.find(state.members.front()) == freeGroups.end());
+ freeGroups[state.members.front()] = &state;
+}
+
const std::string MessageGroupManager::getGroupId( const QueuedMessage& qm ) const
{
const qpid::framing::FieldTable* headers = qm.payload->getApplicationHeaders();
@@ -155,51 +176,6 @@ void MessageGroupManager::dequeued( const QueuedMessage& qm )
": dequeued message from group id=" << group << " total=" << total );
}
-void MessageGroupManager::consumerAdded( const Consumer& /*c*/ )
-{
-#if 0
- // allow a re-subscribing consumer
- if (consumers.find(c.getName()) == consumers.end()) {
- consumers[c.getName()] = 0; // no groups owned yet
- QPID_LOG( trace, "group queue " << qName << ": added consumer, name=" << c.getName() );
- } else {
- QPID_LOG( trace, "group queue " << qName << ": consumer re-subscribed, name=" << c.getName() );
- }
-#endif
-}
-
-void MessageGroupManager::consumerRemoved( const Consumer& /*c*/ )
-{
-#if 0
- const std::string& name(c.getName());
- Consumers::iterator consumer = consumers.find(name);
- assert(consumer != consumers.end());
- size_t count = consumer->second;
-
- for (GroupMap::iterator gs = messageGroups.begin();
- count && gs != messageGroups.end(); ++gs) {
-
- GroupState& state( gs->second );
- if (state.owner == name) {
- if (state.acquired == 0) {
- --count;
- disown(state);
- QPID_LOG( trace, "group queue " << qName <<
- ": consumer name=" << name << " released group id=" << gs->first);
- }
- }
- }
- if (count == 0) {
- consumers.erase( consumer );
- QPID_LOG( trace, "group queue " << qName << ": removed consumer name=" << name );
- } else {
- // don't release groups with outstanding acquired msgs - consumer may re-subscribe!
- QPID_LOG( trace, "group queue " << qName << ": consumer name=" << name << " unsubscribed with outstanding messages.");
- }
-#endif
-}
-
-
bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, QueuedMessage& next )
{
if (messages.empty())
@@ -215,11 +191,6 @@ bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, Queued
return false; // shouldn't happen - should find nextFree
}
} else { // no free groups available
-#if 0
- if (consumers[c->getName()] == 0) { // and none currently owned
- return false; // so nothing available to consume
- }
-#endif
if (!messages.next( c->position, next ))
return false;
}
@@ -430,10 +401,7 @@ void MessageGroupManager::setState(const qpid::framing::FieldTable& state)
for (Array::const_iterator p = positions.begin(); p != positions.end(); ++p)
state.members.push_back((*p)->getIntegerValue<uint32_t, 4>());
messageGroups[state.group] = state;
- if (state.owned())
- //consumers[state.owner]++;
- ;
- else {
+ if (!state.owned()) {
assert(state.members.size());
freeGroups[state.members.front()] = &messageGroups[state.group];
}