diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-10-13 17:33:08 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-10-13 17:33:08 +0000 |
commit | d9ef71a5813ea9f54928392997e48e5205b4c4f6 (patch) | |
tree | c79149cc8c6f6608638ec353448efe0c953d7e3c | |
parent | 927b8dfc86563f97e2ea7498f4ff8439d1c3387a (diff) | |
download | qpid-python-d9ef71a5813ea9f54928392997e48e5205b4c4f6.tar.gz |
QPID-3346: code cleanup - move private code out of headers, delete dead code.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1183001 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/MessageGroupManager.cpp | 76 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/MessageGroupManager.h | 34 |
2 files changed, 30 insertions, 80 deletions
diff --git a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp index d4ca6af1d5..07b05f3b92 100644 --- a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp +++ b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp @@ -43,6 +43,27 @@ const std::string MessageGroupManager::qpidSharedGroup("qpid.shared_msg_group"); const std::string MessageGroupManager::qpidMessageGroupTimestamp("qpid.group_timestamp"); +void MessageGroupManager::unFree( const GroupState& state ) +{ + GroupFifo::iterator pos = freeGroups.find( state.members.front() ); + assert( pos != freeGroups.end() && pos->second == &state ); + freeGroups.erase( pos ); +} + +void MessageGroupManager::own( GroupState& state, const std::string& owner ) +{ + state.owner = owner; + unFree( state ); +} + +void MessageGroupManager::disown( GroupState& state ) +{ + state.owner.clear(); + assert(state.members.size()); + assert(freeGroups.find(state.members.front()) == freeGroups.end()); + freeGroups[state.members.front()] = &state; +} + const std::string MessageGroupManager::getGroupId( const QueuedMessage& qm ) const { const qpid::framing::FieldTable* headers = qm.payload->getApplicationHeaders(); @@ -155,51 +176,6 @@ void MessageGroupManager::dequeued( const QueuedMessage& qm ) ": dequeued message from group id=" << group << " total=" << total ); } -void MessageGroupManager::consumerAdded( const Consumer& /*c*/ ) -{ -#if 0 - // allow a re-subscribing consumer - if (consumers.find(c.getName()) == consumers.end()) { - consumers[c.getName()] = 0; // no groups owned yet - QPID_LOG( trace, "group queue " << qName << ": added consumer, name=" << c.getName() ); - } else { - QPID_LOG( trace, "group queue " << qName << ": consumer re-subscribed, name=" << c.getName() ); - } -#endif -} - -void MessageGroupManager::consumerRemoved( const Consumer& /*c*/ ) -{ -#if 0 - const std::string& name(c.getName()); - Consumers::iterator consumer = consumers.find(name); - assert(consumer != consumers.end()); - size_t count = consumer->second; - - for (GroupMap::iterator gs = messageGroups.begin(); - count && gs != messageGroups.end(); ++gs) { - - GroupState& state( gs->second ); - if (state.owner == name) { - if (state.acquired == 0) { - --count; - disown(state); - QPID_LOG( trace, "group queue " << qName << - ": consumer name=" << name << " released group id=" << gs->first); - } - } - } - if (count == 0) { - consumers.erase( consumer ); - QPID_LOG( trace, "group queue " << qName << ": removed consumer name=" << name ); - } else { - // don't release groups with outstanding acquired msgs - consumer may re-subscribe! - QPID_LOG( trace, "group queue " << qName << ": consumer name=" << name << " unsubscribed with outstanding messages."); - } -#endif -} - - bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, QueuedMessage& next ) { if (messages.empty()) @@ -215,11 +191,6 @@ bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, Queued return false; // shouldn't happen - should find nextFree } } else { // no free groups available -#if 0 - if (consumers[c->getName()] == 0) { // and none currently owned - return false; // so nothing available to consume - } -#endif if (!messages.next( c->position, next )) return false; } @@ -430,10 +401,7 @@ void MessageGroupManager::setState(const qpid::framing::FieldTable& state) for (Array::const_iterator p = positions.begin(); p != positions.end(); ++p) state.members.push_back((*p)->getIntegerValue<uint32_t, 4>()); messageGroups[state.group] = state; - if (state.owned()) - //consumers[state.owner]++; - ; - else { + if (!state.owned()) { assert(state.members.size()); freeGroups[state.members.front()] = &messageGroups[state.group]; } diff --git a/qpid/cpp/src/qpid/broker/MessageGroupManager.h b/qpid/cpp/src/qpid/broker/MessageGroupManager.h index 35bdda94d5..6c81ec14d1 100644 --- a/qpid/cpp/src/qpid/broker/MessageGroupManager.h +++ b/qpid/cpp/src/qpid/broker/MessageGroupManager.h @@ -44,22 +44,20 @@ class MessageGroupManager : public StatefulQueueObserver, public MessageDistribu const std::string qName; // name of parent queue (for logs) struct GroupState { + // note: update getState()/setState() when changing this object's state implementation typedef std::deque<framing::SequenceNumber> PositionFifo; std::string group; // group identifier std::string owner; // consumer with outstanding acquired messages uint32_t acquired; // count of outstanding acquired messages - //uint32_t total; // count of enqueued messages in this group PositionFifo members; // msgs belonging to this group GroupState() : acquired(0) {} bool owned() const {return !owner.empty();} }; typedef std::map<std::string, struct GroupState> GroupMap; - //typedef std::map<std::string, uint32_t> Consumers; // count of owned groups typedef std::map<framing::SequenceNumber, struct GroupState *> GroupFifo; - // note: update getState()/setState() when changing this object's state implementation GroupMap messageGroups; // index: group name GroupFifo freeGroups; // ordered by oldest free msg //Consumers consumers; // index: consumer name @@ -69,27 +67,9 @@ class MessageGroupManager : public StatefulQueueObserver, public MessageDistribu static const std::string qpidMessageGroupTimestamp; const std::string getGroupId( const QueuedMessage& qm ) const; - void unFree( const GroupState& state ) - { - GroupFifo::iterator pos = freeGroups.find( state.members.front() ); - assert( pos != freeGroups.end() && pos->second == &state ); - freeGroups.erase( pos ); - } - void own( GroupState& state, const std::string& owner ) - { - state.owner = owner; - //consumers[state.owner]++; - unFree( state ); - } - void disown( GroupState& state ) - { - //assert(consumers[state.owner]); - //consumers[state.owner]--; - state.owner.clear(); - assert(state.members.size()); - assert(freeGroups.find(state.members.front()) == freeGroups.end()); - freeGroups[state.members.front()] = &state; - } + void unFree( const GroupState& state ); + void own( GroupState& state, const std::string& owner ); + void disown( GroupState& state ); public: @@ -102,12 +82,14 @@ class MessageGroupManager : public StatefulQueueObserver, public MessageDistribu Messages& container, unsigned int _timestamp=0 ) : StatefulQueueObserver(std::string("MessageGroupManager:") + header), groupIdHeader( header ), timestamp(_timestamp), messages(container), qName(_qName) {} + + // QueueObserver iface void enqueued( const QueuedMessage& qm ); void acquired( const QueuedMessage& qm ); void requeued( const QueuedMessage& qm ); void dequeued( const QueuedMessage& qm ); - void consumerAdded( const Consumer& ); - void consumerRemoved( const Consumer& ); + void consumerAdded( const Consumer& ) {}; + void consumerRemoved( const Consumer& ) {}; void getState(qpid::framing::FieldTable& state ) const; void setState(const qpid::framing::FieldTable&); |