summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/MessageGroupManager.h
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/MessageGroupManager.h')
-rw-r--r--cpp/src/qpid/broker/MessageGroupManager.h34
1 files changed, 8 insertions, 26 deletions
diff --git a/cpp/src/qpid/broker/MessageGroupManager.h b/cpp/src/qpid/broker/MessageGroupManager.h
index 35bdda94d5..6c81ec14d1 100644
--- a/cpp/src/qpid/broker/MessageGroupManager.h
+++ b/cpp/src/qpid/broker/MessageGroupManager.h
@@ -44,22 +44,20 @@ class MessageGroupManager : public StatefulQueueObserver, public MessageDistribu
const std::string qName; // name of parent queue (for logs)
struct GroupState {
+ // note: update getState()/setState() when changing this object's state implementation
typedef std::deque<framing::SequenceNumber> PositionFifo;
std::string group; // group identifier
std::string owner; // consumer with outstanding acquired messages
uint32_t acquired; // count of outstanding acquired messages
- //uint32_t total; // count of enqueued messages in this group
PositionFifo members; // msgs belonging to this group
GroupState() : acquired(0) {}
bool owned() const {return !owner.empty();}
};
typedef std::map<std::string, struct GroupState> GroupMap;
- //typedef std::map<std::string, uint32_t> Consumers; // count of owned groups
typedef std::map<framing::SequenceNumber, struct GroupState *> GroupFifo;
- // note: update getState()/setState() when changing this object's state implementation
GroupMap messageGroups; // index: group name
GroupFifo freeGroups; // ordered by oldest free msg
//Consumers consumers; // index: consumer name
@@ -69,27 +67,9 @@ class MessageGroupManager : public StatefulQueueObserver, public MessageDistribu
static const std::string qpidMessageGroupTimestamp;
const std::string getGroupId( const QueuedMessage& qm ) const;
- void unFree( const GroupState& state )
- {
- GroupFifo::iterator pos = freeGroups.find( state.members.front() );
- assert( pos != freeGroups.end() && pos->second == &state );
- freeGroups.erase( pos );
- }
- void own( GroupState& state, const std::string& owner )
- {
- state.owner = owner;
- //consumers[state.owner]++;
- unFree( state );
- }
- void disown( GroupState& state )
- {
- //assert(consumers[state.owner]);
- //consumers[state.owner]--;
- state.owner.clear();
- assert(state.members.size());
- assert(freeGroups.find(state.members.front()) == freeGroups.end());
- freeGroups[state.members.front()] = &state;
- }
+ void unFree( const GroupState& state );
+ void own( GroupState& state, const std::string& owner );
+ void disown( GroupState& state );
public:
@@ -102,12 +82,14 @@ class MessageGroupManager : public StatefulQueueObserver, public MessageDistribu
Messages& container, unsigned int _timestamp=0 )
: StatefulQueueObserver(std::string("MessageGroupManager:") + header),
groupIdHeader( header ), timestamp(_timestamp), messages(container), qName(_qName) {}
+
+ // QueueObserver iface
void enqueued( const QueuedMessage& qm );
void acquired( const QueuedMessage& qm );
void requeued( const QueuedMessage& qm );
void dequeued( const QueuedMessage& qm );
- void consumerAdded( const Consumer& );
- void consumerRemoved( const Consumer& );
+ void consumerAdded( const Consumer& ) {};
+ void consumerRemoved( const Consumer& ) {};
void getState(qpid::framing::FieldTable& state ) const;
void setState(const qpid::framing::FieldTable&);