summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/MessageGroupManager.h
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/MessageGroupManager.h')
-rw-r--r--qpid/cpp/src/qpid/broker/MessageGroupManager.h103
1 files changed, 81 insertions, 22 deletions
diff --git a/qpid/cpp/src/qpid/broker/MessageGroupManager.h b/qpid/cpp/src/qpid/broker/MessageGroupManager.h
index f4bffc4760..9f50c8bf93 100644
--- a/qpid/cpp/src/qpid/broker/MessageGroupManager.h
+++ b/qpid/cpp/src/qpid/broker/MessageGroupManager.h
@@ -43,40 +43,99 @@ class MessageGroupManager : public StatefulQueueObserver, public MessageDistribu
Messages& messages; // parent Queue's in memory message container
const std::string qName; // name of parent queue (for logs)
- struct GroupState {
+ class GroupState;
+
+ /** track consumers subscribed to this queue */
+ class ConsumerState {
+ // note: update getState()/setState() when changing this object's state implementation
+ bool zombie; // cancelled, but still holding messages.
+ std::string name;
+ uint32_t ownedGroups;
+ uint32_t pendingMsgs;
+
+ public:
+ ConsumerState() : zombie(false), ownedGroups(0), pendingMsgs(0) {}
+ const std::string& getName() const {return name;}
+ void setName(const std::string& n) {name = n;}
+ uint32_t groupCount() const {return ownedGroups;}
+ uint32_t remainingMsgs() const {return pendingMsgs;}
+
+ void addGroup( const GroupState& g);
+ void removeGroup( const GroupState& g );
+ bool cancelled() const {return zombie;}
+ void cancel() {zombie = true;}
+ void uncancel() {zombie = false;} // resubscribed on existing session
+ void msgAvailable(const GroupState& g, const QueuedMessage& qm);
+ void msgAcquired(const GroupState& g, const QueuedMessage& qm);
+ };
+ typedef sys::unordered_map<std::string, ConsumerState> ConsumerMap;
+ ConsumerMap consumers; // index: consumer name
+
+ /** track all known groups */
+ class GroupState {
// note: update getState()/setState() when changing this object's state implementation
typedef std::deque<framing::SequenceNumber> PositionFifo;
- std::string group; // group identifier
- std::string owner; // consumer with outstanding acquired messages
+ std::string name; // group identifier
uint32_t acquired; // count of outstanding acquired messages
+ ConsumerState *owner; // consumer with outstanding acquired messages
PositionFifo members; // msgs belonging to this group
- GroupState() : acquired(0) {}
- bool owned() const {return !owner.empty();}
+ public:
+ GroupState() : acquired(0), owner(0) {}
+ const std::string& getName() const {return name;}
+ void setName(const std::string& n) {name = n;}
+ uint32_t acquiredMsgs() const {return acquired;}
+ uint32_t totalMsgs() const {return members.size();}
+ bool isFree() const {return owner == 0;}
+
+ void setOwner( ConsumerState& consumer );
+ ConsumerState *getOwner() const { return owner; }
+ void resetOwner();
+ const framing::SequenceNumber& nextMsg() const;
+ void enqueueMsg(const QueuedMessage& msg);
+ void acquireMsg(const QueuedMessage& msg);
+ void requeueMsg(const QueuedMessage& msg);
+ void dequeueMsg(const QueuedMessage& msg);
+ // for clustering:
+ void getPositions(framing::Array& pos) const;
+ void setPositions(const framing::Array& pos);
+ void setAcquired(uint32_t c) {acquired = c;}
+
};
+ typedef sys::unordered_map<std::string, GroupState> GroupMap;
+ GroupMap messageGroups; // index: group name
+ // cache the last lookup
+ uint hits;
+ uint misses;
+ uint32_t lastMsg;
+ std::string lastGroup;
+ GroupState *cachedGroup;
- typedef sys::unordered_map<std::string, struct GroupState> GroupMap;
- typedef std::map<framing::SequenceNumber, struct GroupState *> GroupFifo;
+ /** store free (un-owned) groups by the position of the oldest index */
+ class GroupFifo {
+ // orders groups by their next available message (oldest first)
+ typedef std::map<framing::SequenceNumber, const GroupState *> Fifo;
+ Fifo fifo;
+
+ public:
+ GroupFifo() {}
+ void addGroup(const GroupState& group);
+ void removeGroup(const GroupState& group);
+ size_t groupCount() const {return fifo.size();}
+ const GroupState& nextGroup() const;
+ void clear() {fifo.clear();}
+ };
+ GroupFifo freeGroups;
- GroupMap messageGroups; // index: group name
- GroupFifo freeGroups; // ordered by oldest free msg
- //Consumers consumers; // index: consumer name
+ GroupState& findGroup( const QueuedMessage& qm );
+ void deleteGroup(GroupState& group);
+ void disownGroup(GroupState& group); /** release a group from a subscriber */
static const std::string qpidMessageGroupKey;
static const std::string qpidSharedGroup; // if specified, one group can be consumed by multiple receivers
static const std::string qpidMessageGroupTimestamp;
- GroupState& findGroup( const QueuedMessage& qm );
- unsigned long hits, misses; // for debug
- uint32_t lastMsg;
- std::string lastGroup;
- GroupState *cachedGroup;
-
- void unFree( const GroupState& state );
- void own( GroupState& state, const std::string& owner );
- void disown( GroupState& state );
-
public:
static QPID_BROKER_EXTERN void setDefaults(const std::string& groupId);
@@ -97,8 +156,8 @@ class MessageGroupManager : public StatefulQueueObserver, public MessageDistribu
void acquired( const QueuedMessage& qm );
void requeued( const QueuedMessage& qm );
void dequeued( const QueuedMessage& qm );
- void consumerAdded( const Consumer& ) {};
- void consumerRemoved( const Consumer& ) {};
+ void consumerAdded( const Consumer& );
+ void consumerRemoved( const Consumer& );
void getState(qpid::framing::FieldTable& state ) const;
void setState(const qpid::framing::FieldTable&);