diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/MessageGroupManager.h')
-rw-r--r-- | qpid/cpp/src/qpid/broker/MessageGroupManager.h | 16 |
1 files changed, 13 insertions, 3 deletions
diff --git a/qpid/cpp/src/qpid/broker/MessageGroupManager.h b/qpid/cpp/src/qpid/broker/MessageGroupManager.h index f4bffc4760..340ebbc56a 100644 --- a/qpid/cpp/src/qpid/broker/MessageGroupManager.h +++ b/qpid/cpp/src/qpid/broker/MessageGroupManager.h @@ -45,19 +45,29 @@ class MessageGroupManager : public StatefulQueueObserver, public MessageDistribu struct GroupState { // note: update getState()/setState() when changing this object's state implementation - typedef std::deque<framing::SequenceNumber> PositionFifo; + + // track which messages are in this group, and if they have been acquired + struct MessageState { + qpid::framing::SequenceNumber position; + bool acquired; + MessageState() : acquired(false) {} + MessageState(const qpid::framing::SequenceNumber& p) : position(p), acquired(false) {} + bool operator<(const MessageState& b) const { return position < b.position; } + }; + typedef std::deque<MessageState> MessageFifo; std::string group; // group identifier std::string owner; // consumer with outstanding acquired messages uint32_t acquired; // count of outstanding acquired messages - PositionFifo members; // msgs belonging to this group + MessageFifo members; // msgs belonging to this group, in enqueue order GroupState() : acquired(0) {} bool owned() const {return !owner.empty();} + MessageFifo::iterator findMsg(const qpid::framing::SequenceNumber &); }; typedef sys::unordered_map<std::string, struct GroupState> GroupMap; - typedef std::map<framing::SequenceNumber, struct GroupState *> GroupFifo; + typedef std::map<qpid::framing::SequenceNumber, struct GroupState *> GroupFifo; GroupMap messageGroups; // index: group name GroupFifo freeGroups; // ordered by oldest free msg |