summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/MessageGroupManager.h
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/MessageGroupManager.h')
-rw-r--r--qpid/cpp/src/qpid/broker/MessageGroupManager.h16
1 files changed, 13 insertions, 3 deletions
diff --git a/qpid/cpp/src/qpid/broker/MessageGroupManager.h b/qpid/cpp/src/qpid/broker/MessageGroupManager.h
index f4bffc4760..340ebbc56a 100644
--- a/qpid/cpp/src/qpid/broker/MessageGroupManager.h
+++ b/qpid/cpp/src/qpid/broker/MessageGroupManager.h
@@ -45,19 +45,29 @@ class MessageGroupManager : public StatefulQueueObserver, public MessageDistribu
struct GroupState {
// note: update getState()/setState() when changing this object's state implementation
- typedef std::deque<framing::SequenceNumber> PositionFifo;
+
+ // track which messages are in this group, and if they have been acquired
+ struct MessageState {
+ qpid::framing::SequenceNumber position;
+ bool acquired;
+ MessageState() : acquired(false) {}
+ MessageState(const qpid::framing::SequenceNumber& p) : position(p), acquired(false) {}
+ bool operator<(const MessageState& b) const { return position < b.position; }
+ };
+ typedef std::deque<MessageState> MessageFifo;
std::string group; // group identifier
std::string owner; // consumer with outstanding acquired messages
uint32_t acquired; // count of outstanding acquired messages
- PositionFifo members; // msgs belonging to this group
+ MessageFifo members; // msgs belonging to this group, in enqueue order
GroupState() : acquired(0) {}
bool owned() const {return !owner.empty();}
+ MessageFifo::iterator findMsg(const qpid::framing::SequenceNumber &);
};
typedef sys::unordered_map<std::string, struct GroupState> GroupMap;
- typedef std::map<framing::SequenceNumber, struct GroupState *> GroupFifo;
+ typedef std::map<qpid::framing::SequenceNumber, struct GroupState *> GroupFifo;
GroupMap messageGroups; // index: group name
GroupFifo freeGroups; // ordered by oldest free msg