diff options
Diffstat (limited to 'cpp/src/qpid/broker/MessageGroupManager.h')
-rw-r--r-- | cpp/src/qpid/broker/MessageGroupManager.h | 34 |
1 files changed, 8 insertions, 26 deletions
diff --git a/cpp/src/qpid/broker/MessageGroupManager.h b/cpp/src/qpid/broker/MessageGroupManager.h index 35bdda94d5..6c81ec14d1 100644 --- a/cpp/src/qpid/broker/MessageGroupManager.h +++ b/cpp/src/qpid/broker/MessageGroupManager.h @@ -44,22 +44,20 @@ class MessageGroupManager : public StatefulQueueObserver, public MessageDistribu const std::string qName; // name of parent queue (for logs) struct GroupState { + // note: update getState()/setState() when changing this object's state implementation typedef std::deque<framing::SequenceNumber> PositionFifo; std::string group; // group identifier std::string owner; // consumer with outstanding acquired messages uint32_t acquired; // count of outstanding acquired messages - //uint32_t total; // count of enqueued messages in this group PositionFifo members; // msgs belonging to this group GroupState() : acquired(0) {} bool owned() const {return !owner.empty();} }; typedef std::map<std::string, struct GroupState> GroupMap; - //typedef std::map<std::string, uint32_t> Consumers; // count of owned groups typedef std::map<framing::SequenceNumber, struct GroupState *> GroupFifo; - // note: update getState()/setState() when changing this object's state implementation GroupMap messageGroups; // index: group name GroupFifo freeGroups; // ordered by oldest free msg //Consumers consumers; // index: consumer name @@ -69,27 +67,9 @@ class MessageGroupManager : public StatefulQueueObserver, public MessageDistribu static const std::string qpidMessageGroupTimestamp; const std::string getGroupId( const QueuedMessage& qm ) const; - void unFree( const GroupState& state ) - { - GroupFifo::iterator pos = freeGroups.find( state.members.front() ); - assert( pos != freeGroups.end() && pos->second == &state ); - freeGroups.erase( pos ); - } - void own( GroupState& state, const std::string& owner ) - { - state.owner = owner; - //consumers[state.owner]++; - unFree( state ); - } - void disown( GroupState& state ) - { - //assert(consumers[state.owner]); - //consumers[state.owner]--; - state.owner.clear(); - assert(state.members.size()); - assert(freeGroups.find(state.members.front()) == freeGroups.end()); - freeGroups[state.members.front()] = &state; - } + void unFree( const GroupState& state ); + void own( GroupState& state, const std::string& owner ); + void disown( GroupState& state ); public: @@ -102,12 +82,14 @@ class MessageGroupManager : public StatefulQueueObserver, public MessageDistribu Messages& container, unsigned int _timestamp=0 ) : StatefulQueueObserver(std::string("MessageGroupManager:") + header), groupIdHeader( header ), timestamp(_timestamp), messages(container), qName(_qName) {} + + // QueueObserver iface void enqueued( const QueuedMessage& qm ); void acquired( const QueuedMessage& qm ); void requeued( const QueuedMessage& qm ); void dequeued( const QueuedMessage& qm ); - void consumerAdded( const Consumer& ); - void consumerRemoved( const Consumer& ); + void consumerAdded( const Consumer& ) {}; + void consumerRemoved( const Consumer& ) {}; void getState(qpid::framing::FieldTable& state ) const; void setState(const qpid::framing::FieldTable&); |