diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/MessageGroupManager.h')
-rw-r--r-- | qpid/cpp/src/qpid/broker/MessageGroupManager.h | 103 |
1 files changed, 81 insertions, 22 deletions
diff --git a/qpid/cpp/src/qpid/broker/MessageGroupManager.h b/qpid/cpp/src/qpid/broker/MessageGroupManager.h index f4bffc4760..9f50c8bf93 100644 --- a/qpid/cpp/src/qpid/broker/MessageGroupManager.h +++ b/qpid/cpp/src/qpid/broker/MessageGroupManager.h @@ -43,40 +43,99 @@ class MessageGroupManager : public StatefulQueueObserver, public MessageDistribu Messages& messages; // parent Queue's in memory message container const std::string qName; // name of parent queue (for logs) - struct GroupState { + class GroupState; + + /** track consumers subscribed to this queue */ + class ConsumerState { + // note: update getState()/setState() when changing this object's state implementation + bool zombie; // cancelled, but still holding messages. + std::string name; + uint32_t ownedGroups; + uint32_t pendingMsgs; + + public: + ConsumerState() : zombie(false), ownedGroups(0), pendingMsgs(0) {} + const std::string& getName() const {return name;} + void setName(const std::string& n) {name = n;} + uint32_t groupCount() const {return ownedGroups;} + uint32_t remainingMsgs() const {return pendingMsgs;} + + void addGroup( const GroupState& g); + void removeGroup( const GroupState& g ); + bool cancelled() const {return zombie;} + void cancel() {zombie = true;} + void uncancel() {zombie = false;} // resubscribed on existing session + void msgAvailable(const GroupState& g, const QueuedMessage& qm); + void msgAcquired(const GroupState& g, const QueuedMessage& qm); + }; + typedef sys::unordered_map<std::string, ConsumerState> ConsumerMap; + ConsumerMap consumers; // index: consumer name + + /** track all known groups */ + class GroupState { // note: update getState()/setState() when changing this object's state implementation typedef std::deque<framing::SequenceNumber> PositionFifo; - std::string group; // group identifier - std::string owner; // consumer with outstanding acquired messages + std::string name; // group identifier uint32_t acquired; // count of outstanding acquired messages + ConsumerState *owner; // consumer with outstanding acquired messages PositionFifo members; // msgs belonging to this group - GroupState() : acquired(0) {} - bool owned() const {return !owner.empty();} + public: + GroupState() : acquired(0), owner(0) {} + const std::string& getName() const {return name;} + void setName(const std::string& n) {name = n;} + uint32_t acquiredMsgs() const {return acquired;} + uint32_t totalMsgs() const {return members.size();} + bool isFree() const {return owner == 0;} + + void setOwner( ConsumerState& consumer ); + ConsumerState *getOwner() const { return owner; } + void resetOwner(); + const framing::SequenceNumber& nextMsg() const; + void enqueueMsg(const QueuedMessage& msg); + void acquireMsg(const QueuedMessage& msg); + void requeueMsg(const QueuedMessage& msg); + void dequeueMsg(const QueuedMessage& msg); + // for clustering: + void getPositions(framing::Array& pos) const; + void setPositions(const framing::Array& pos); + void setAcquired(uint32_t c) {acquired = c;} + }; + typedef sys::unordered_map<std::string, GroupState> GroupMap; + GroupMap messageGroups; // index: group name + // cache the last lookup + uint hits; + uint misses; + uint32_t lastMsg; + std::string lastGroup; + GroupState *cachedGroup; - typedef sys::unordered_map<std::string, struct GroupState> GroupMap; - typedef std::map<framing::SequenceNumber, struct GroupState *> GroupFifo; + /** store free (un-owned) groups by the position of the oldest index */ + class GroupFifo { + // orders groups by their next available message (oldest first) + typedef std::map<framing::SequenceNumber, const GroupState *> Fifo; + Fifo fifo; + + public: + GroupFifo() {} + void addGroup(const GroupState& group); + void removeGroup(const GroupState& group); + size_t groupCount() const {return fifo.size();} + const GroupState& nextGroup() const; + void clear() {fifo.clear();} + }; + GroupFifo freeGroups; - GroupMap messageGroups; // index: group name - GroupFifo freeGroups; // ordered by oldest free msg - //Consumers consumers; // index: consumer name + GroupState& findGroup( const QueuedMessage& qm ); + void deleteGroup(GroupState& group); + void disownGroup(GroupState& group); /** release a group from a subscriber */ static const std::string qpidMessageGroupKey; static const std::string qpidSharedGroup; // if specified, one group can be consumed by multiple receivers static const std::string qpidMessageGroupTimestamp; - GroupState& findGroup( const QueuedMessage& qm ); - unsigned long hits, misses; // for debug - uint32_t lastMsg; - std::string lastGroup; - GroupState *cachedGroup; - - void unFree( const GroupState& state ); - void own( GroupState& state, const std::string& owner ); - void disown( GroupState& state ); - public: static QPID_BROKER_EXTERN void setDefaults(const std::string& groupId); @@ -97,8 +156,8 @@ class MessageGroupManager : public StatefulQueueObserver, public MessageDistribu void acquired( const QueuedMessage& qm ); void requeued( const QueuedMessage& qm ); void dequeued( const QueuedMessage& qm ); - void consumerAdded( const Consumer& ) {}; - void consumerRemoved( const Consumer& ) {}; + void consumerAdded( const Consumer& ); + void consumerRemoved( const Consumer& ); void getState(qpid::framing::FieldTable& state ) const; void setState(const qpid::framing::FieldTable&); |