summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-11-07 03:29:04 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-11-07 03:29:04 +0000
commit8499ba4b12b41f9375442f770e44eb58337530fb (patch)
treed86d3598c4c584f76d8eee1ed3e6632625bd4491
parent193c30f81aa5ef21798522744a5ecb6911ef6e5b (diff)
downloadqpid-python-qpid-3346.tar.gz
QPID-3346: checkpoint client tracking codeqpid-3346
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3346@1198612 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/MessageGroupManager.cpp456
-rw-r--r--qpid/cpp/src/qpid/broker/MessageGroupManager.h103
-rw-r--r--qpid/cpp/src/tests/QueueTest.cpp1
3 files changed, 424 insertions, 136 deletions
diff --git a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
index 00d1a58bef..34b11b816e 100644
--- a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
+++ b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
@@ -43,28 +43,174 @@ const std::string MessageGroupManager::qpidSharedGroup("qpid.shared_msg_group");
const std::string MessageGroupManager::qpidMessageGroupTimestamp("qpid.group_timestamp");
-void MessageGroupManager::unFree( const GroupState& state )
+/** class ConsumerState **/
+
+/** the consumer owns the given group */
+void MessageGroupManager::ConsumerState::addGroup(const GroupState& group)
+{
+ ownedGroups += 1;
+ pendingMsgs += (group.totalMsgs() - group.acquiredMsgs());
+}
+
+/** the consumer releases the given group */
+void MessageGroupManager::ConsumerState::removeGroup(const GroupState& group)
+{
+ assert(ownedGroups != 0);
+ ownedGroups -= 1;
+ uint32_t del = (group.totalMsgs() - group.acquiredMsgs());
+ assert(del <= pendingMsgs);
+ pendingMsgs -= del;
+}
+
+/** notify the consumer that a new message has arrived at one if its owned groups */
+void MessageGroupManager::ConsumerState::msgAvailable(const GroupState&,
+ const QueuedMessage& )
+{
+ assert(ownedGroups != 0);
+ pendingMsgs += 1;
+}
+
+/** notify the consumer that an available message has been acquired */
+void MessageGroupManager::ConsumerState::msgAcquired(const GroupState&,
+ const QueuedMessage& )
+{
+ assert(pendingMsgs != 0);
+ pendingMsgs -= 1;
+}
+
+
+void MessageGroupManager::consumerAdded( const Consumer& c)
+{
+ const std::string& name = c.getName();
+ ConsumerState& state = consumers[name];
+ state.setName(name);
+ state.uncancel(); // just in case old consumer resubcribed
+ QPID_LOG( trace, "group queue " << qName << ": consumer " << name << " added.");
+}
+
+void MessageGroupManager::consumerRemoved( const Consumer& c)
+{
+ const std::string& name = c.getName();
+ ConsumerMap::iterator cs = consumers.find( name );
+ assert(cs != consumers.end());
+ ConsumerState& state = cs->second;
+ state.cancel();
+ if (state.groupCount() == 0) {
+ assert(state.remainingMsgs() == 0);
+ consumers.erase(cs);
+ QPID_LOG( trace, "group queue " << qName << ": consumer " << name << " removed.");
+ }
+}
+
+
+/** GroupFifo */
+void MessageGroupManager::GroupFifo::addGroup(const GroupState& group)
{
- GroupFifo::iterator pos = freeGroups.find( state.members.front() );
- assert( pos != freeGroups.end() && pos->second == &state );
- freeGroups.erase( pos );
+ assert(group.totalMsgs() != 0);
+ const framing::SequenceNumber& next = group.nextMsg();
+ assert(fifo.find(next) == fifo.end());
+ fifo[next] = &group;
+}
+
+void MessageGroupManager::GroupFifo::removeGroup(const GroupState& group)
+{
+ Fifo::iterator pos = fifo.find( group.nextMsg() );
+ assert( pos != fifo.end() && pos->second == &group );
+ fifo.erase( pos );
+}
+
+const MessageGroupManager::GroupState& MessageGroupManager::GroupFifo::nextGroup() const
+{
+ return *(fifo.begin()->second);
+}
+
+
+/** GroupState */
+void MessageGroupManager::GroupState::setOwner( ConsumerState& consumer )
+{
+ assert(owner == 0);
+ owner = &consumer;
+ owner->addGroup( *this );
+}
+
+void MessageGroupManager::GroupState::resetOwner()
+{
+ assert(owner);
+ owner->removeGroup( *this );
+ owner = 0;
+}
+
+const qpid::framing::SequenceNumber& MessageGroupManager::GroupState::nextMsg() const
+{
+ assert(members.size() != 0);
+ return members.front();
+}
+
+
+void MessageGroupManager::GroupState::enqueueMsg(const QueuedMessage& msg)
+{
+ members.push_back(msg.position);
+ if (owner) {
+ owner->msgAvailable(*this, msg);
+ }
+}
+
+
+void MessageGroupManager::GroupState::acquireMsg(const QueuedMessage& msg)
+{
+ assert(members.size()); // there are msgs present
+ acquired += 1;
+ if (owner) {
+ owner->msgAcquired(*this, msg);
+ }
+}
+
+void MessageGroupManager::GroupState::requeueMsg(const QueuedMessage& msg)
+{
+ assert(acquired != 0);
+ acquired -= 1;
+ if (owner) {
+ owner->msgAvailable(*this, msg);
+ }
+}
+
+
+void MessageGroupManager::GroupState::dequeueMsg(const QueuedMessage& msg)
+{
+ assert( members.size() != 0 );
+ assert( acquired != 0 );
+ acquired -= 1;
+
+ // likely to be at or near begin() if dequeued in order
+ if (members.front() == msg.position) {
+ members.pop_front();
+ } else {
+ unsigned long diff = msg.position.getValue() - members.front().getValue();
+ long maxEnd = diff < members.size() ? (diff + 1) : members.size();
+ GroupState::PositionFifo::iterator i =
+ std::lower_bound(members.begin(), members.begin()+maxEnd, msg.position);
+ assert(i != members.end() && *i == msg.position);
+ members.erase(i);
+ }
}
-void MessageGroupManager::own( GroupState& state, const std::string& owner )
+void MessageGroupManager::GroupState::getPositions(framing::Array& positions) const
{
- state.owner = owner;
- unFree( state );
+ for (PositionFifo::const_iterator p = members.begin();
+ p != members.end(); ++p)
+ positions.push_back(framing::Array::ValuePtr(new framing::IntegerValue( *p )));
}
-void MessageGroupManager::disown( GroupState& state )
+void MessageGroupManager::GroupState::setPositions(const framing::Array& positions)
{
- state.owner.clear();
- assert(state.members.size());
- assert(freeGroups.find(state.members.front()) == freeGroups.end());
- freeGroups[state.members.front()] = &state;
+ members.clear();
+ for (framing::Array::const_iterator p = positions.begin(); p != positions.end(); ++p)
+ members.push_back((*p)->getIntegerValue<uint32_t, 4>());
}
-MessageGroupManager::GroupState& MessageGroupManager::findGroup( const QueuedMessage& qm )
+
+
+MessageGroupManager::GroupState& MessageGroupManager::findGroup(const QueuedMessage& qm)
{
uint32_t thisMsg = qm.position.getValue();
if (cachedGroup && lastMsg == thisMsg) {
@@ -91,42 +237,45 @@ MessageGroupManager::GroupState& MessageGroupManager::findGroup( const QueuedMes
misses++;
- GroupState& found = messageGroups[group];
- if (found.group.empty())
- found.group = group; // new group, assign name
+ cachedGroup = &messageGroups[group];
+ if (cachedGroup->getName().empty())
+ cachedGroup->setName(group); // new group, assign name
lastMsg = thisMsg;
lastGroup = group;
- cachedGroup = &found;
- return found;
+ return *cachedGroup;
}
+void MessageGroupManager::deleteGroup(GroupState& group)
+{
+ if (cachedGroup == &group)
+ cachedGroup = 0;
+ std::string name = group.getName();
+ messageGroups.erase(name);
+}
+
void MessageGroupManager::enqueued( const QueuedMessage& qm )
{
// @todo KAG optimization - store reference to group state in QueuedMessage
// issue: const-ness??
GroupState& state = findGroup(qm);
- state.members.push_back(qm.position);
- uint32_t total = state.members.size();
+ state.enqueueMsg(qm);
+ uint32_t total = state.totalMsgs();
QPID_LOG( trace, "group queue " << qName <<
- ": added message to group id=" << state.group << " total=" << total );
+ ": added message to group id=" << state.getName() << " total=" << total );
if (total == 1) {
// newly created group, no owner
- assert(freeGroups.find(qm.position) == freeGroups.end());
- freeGroups[qm.position] = &state;
+ freeGroups.addGroup(state);
}
}
void MessageGroupManager::acquired( const QueuedMessage& qm )
{
- // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
- // issue: const-ness??
GroupState& state = findGroup(qm);
- assert(state.members.size()); // there are msgs present
- state.acquired += 1;
+ state.acquireMsg(qm);
QPID_LOG( trace, "group queue " << qName <<
- ": acquired message in group id=" << state.group << " acquired=" << state.acquired );
+ ": acquired message in group id=" << state.getName() << " acquired=" << state.acquiredMsgs());
}
@@ -135,69 +284,77 @@ void MessageGroupManager::requeued( const QueuedMessage& qm )
// @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
// issue: const-ness??
GroupState& state = findGroup(qm);
- assert( state.acquired != 0 );
- state.acquired -= 1;
- if (state.acquired == 0 && state.owned()) {
- QPID_LOG( trace, "group queue " << qName <<
- ": consumer name=" << state.owner << " released group id=" << state.group);
- disown(state);
+ state.requeueMsg(qm);
+ if (state.acquiredMsgs() == 0 && state.getOwner()) {
+ disownGroup(state);
+ freeGroups.addGroup(state);
}
QPID_LOG( trace, "group queue " << qName <<
- ": requeued message to group id=" << state.group << " acquired=" << state.acquired );
+ ": requeued message to group id=" << state.getName() << " acquired=" << state.acquiredMsgs());
}
void MessageGroupManager::dequeued( const QueuedMessage& qm )
{
- // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
- // issue: const-ness??
- GroupState& state = findGroup(qm);
- assert( state.members.size() != 0 );
- assert( state.acquired != 0 );
- state.acquired -= 1;
-
- // likely to be at or near begin() if dequeued in order
- bool reFreeNeeded = false;
- if (state.members.front() == qm.position) {
- if (!state.owned()) {
- // will be on the freeGroups list if mgmt is dequeueing rather than a consumer!
- // if on freelist, it is indexed by first member, which is about to be removed!
- unFree(state);
- reFreeNeeded = true;
- }
- state.members.pop_front();
- } else {
- GroupState::PositionFifo::iterator pos = state.members.begin() + 1;
- GroupState::PositionFifo::iterator end = state.members.end();
- while (pos != end) {
- if (*pos == qm.position) {
- state.members.erase(pos);
- break;
- }
- ++pos;
+ GroupState& group = findGroup(qm);
+ bool freeNeeded = false;
+ if (group.isFree()) { // dequeue is occuring via mgmt, not subscriber!
+ const framing::SequenceNumber next = group.nextMsg();
+ if (next == qm.position) {
+ /* we are about to remove the head message of this group. This message is
+ * used to index the freeGroups fifo, so we must temporarily remove it from
+ * the fifo until we are done updating the head message.
+ */
+ freeGroups.removeGroup(group);
+ freeNeeded = true;
}
}
+ group.dequeueMsg(qm);
+
+ uint32_t total = group.totalMsgs();
+ QPID_LOG( trace, "group queue " << qName <<
+ ": dequeued message from group id=" << group.getName() << " total=" << total );
+
+ // if no more outstanding acquired messages, free the group from the consumer
+ if (group.acquiredMsgs() == 0 && group.getOwner()) {
+ // group is now available again
+ disownGroup(group);
+ freeNeeded = true;
+ }
- uint32_t total = state.members.size();
QPID_LOG( trace, "group queue " << qName <<
- ": dequeued message from group id=" << state.group << " total=" << total );
+ ": dequeued message from group id=" << group.getName() << " total=" << total );
if (total == 0) {
- QPID_LOG( trace, "group queue " << qName << ": deleting group id=" << state.group);
- if (cachedGroup == &state) {
- cachedGroup = 0;
- }
- std::string key(state.group);
- messageGroups.erase( key );
- } else if (state.acquired == 0 && state.owned()) {
- QPID_LOG( trace, "group queue " << qName <<
- ": consumer name=" << state.owner << " released group id=" << state.group);
- disown(state);
- } else if (reFreeNeeded) {
- disown(state);
+ QPID_LOG( trace, "group queue " << qName << ": deleting group id=" << group.getName());
+ deleteGroup(group);
+ } else if (freeNeeded) {
+ freeGroups.addGroup(group);
}
}
+/** remove the owner of the group */
+void MessageGroupManager::disownGroup(GroupState& group)
+{
+ ConsumerState& owner = *group.getOwner();
+ QPID_LOG( trace, "group queue " << qName <<
+ ": consumer name=" << owner.getName() << " released group id=" << group.getName());
+ group.resetOwner();
+ if (owner.cancelled() && owner.groupCount() == 0) {
+ // this owner has unsubscribed, we can release it now.
+ std::string name = owner.getName();
+ consumers.erase(name);
+ QPID_LOG( error, "group queue " << qName << ": consumer " << name << " removed.");
+ }
+}
+
+namespace {
+ unsigned long found = 0;
+ unsigned long failed = 0;
+ unsigned long missCount = 0;
+ unsigned long earlyRet = 0;
+}
+
MessageGroupManager::~MessageGroupManager()
{
QPID_LOG( debug, "group queue " << qName << " cache results: hits=" << hits << " misses=" << misses );
@@ -207,27 +364,38 @@ bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, Queued
if (messages.empty())
return false;
+ ConsumerState& cState = consumers.find(c->getName())->second;
+
next.position = c->position;
- if (!freeGroups.empty()) {
- const framing::SequenceNumber& nextFree = freeGroups.begin()->first;
- if (nextFree < next.position) { // a free message is older than current
+ if (freeGroups.groupCount() != 0) {
+ const framing::SequenceNumber& nextFree = freeGroups.nextGroup().nextMsg();
+ if (nextFree <= next.position) { // a free message is older than current
next.position = nextFree;
--next.position;
}
+ } else if (cState.remainingMsgs() == 0) { // no more msgs from owned groups
+ earlyRet += 1;
+ return false;
}
+ int count = 1;
while (messages.next( next.position, next )) {
GroupState& group = findGroup(next);
- if (!group.owned()) {
- if (group.members.front() == next.position) { // only take from head!
+ if (group.getOwner() == &cState) {
+ found += 1;
+ return true;
+ } else if (group.isFree()) {
+ if (group.nextMsg() == next.position) { // only take from head!
+ found += 1;
return true;
}
- QPID_LOG(debug, "Skipping " << next.position << " since group " << group.group
- << "'s head message still pending. pos=" << group.members.front());
- } else if (group.owner == c->getName()) {
- return true;
+ QPID_LOG(debug, "Skipping " << next.position << " since group " << group.getName()
+ << "'s head message still pending. pos=" << group.nextMsg());
}
+ count += 1;
}
+ failed += 1;
+ missCount += 1;
return false;
}
@@ -237,13 +405,17 @@ bool MessageGroupManager::allocate(const std::string& consumer, const QueuedMess
// @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
GroupState& state = findGroup(qm);
- if (!state.owned()) {
- own( state, consumer );
+ if (state.isFree()) {
+ freeGroups.removeGroup(state);
+ ConsumerMap::iterator cs = consumers.find( consumer );
+ assert(cs != consumers.end());
+ ConsumerState& owner = cs->second;
+ state.setOwner( owner );
QPID_LOG( trace, "group queue " << qName <<
- ": consumer name=" << consumer << " has acquired group id=" << state.group);
+ ": consumer name=" << consumer << " has acquired group id=" << state.getName());
return true;
}
- return state.owner == consumer;
+ return state.getOwner()->getName() == consumer;
}
bool MessageGroupManager::nextBrowsableMessage( Consumer::shared_ptr& c, QueuedMessage& next )
@@ -280,9 +452,13 @@ void MessageGroupManager::query(qpid::types::Variant::Map& status) const
g != messageGroups.end(); ++g) {
qpid::types::Variant::Map info;
info[GROUP_ID_KEY] = g->first;
- info[GROUP_MSG_COUNT] = g->second.members.size();
+ info[GROUP_MSG_COUNT] = g->second.totalMsgs();
info[GROUP_TIMESTAMP] = 0; /** @todo KAG - NEED HEAD MSG TIMESTAMP */
- info[GROUP_CONSUMER] = g->second.owner;
+ if (g->second.getOwner()) {
+ info[GROUP_CONSUMER] = g->second.getOwner()->getName();
+ } else {
+ info[GROUP_CONSUMER] = std::string("");
+ }
groups.push_back(info);
}
state[GROUP_STATE_KEY] = groups;
@@ -346,6 +522,11 @@ namespace {
const std::string GROUP_ACQUIRED_CT("acquired-ct");
const std::string GROUP_POSITIONS("positions");
const std::string GROUP_STATE("group-state");
+ const std::string OWNER_STATE("owner-state");
+ const std::string CANCELLED("cancelled");
+ const std::string YES("yes");
+ const std::string NO("no");
+ const std::string OWNER_NAME("name");
}
@@ -360,17 +541,29 @@ void MessageGroupManager::getState(qpid::framing::FieldTable& state ) const
framing::FieldTable group;
group.setString(GROUP_NAME, g->first);
- group.setString(GROUP_OWNER, g->second.owner);
- group.setInt(GROUP_ACQUIRED_CT, g->second.acquired);
+ if (g->second.getOwner()) {
+ group.setString(GROUP_OWNER, g->second.getOwner()->getName());
+ } else {
+ group.setString(GROUP_OWNER, std::string(""));
+ }
+ group.setInt(GROUP_ACQUIRED_CT, g->second.acquiredMsgs());
framing::Array positions(TYPE_CODE_UINT32);
- for (GroupState::PositionFifo::const_iterator p = g->second.members.begin();
- p != g->second.members.end(); ++p)
- positions.push_back(framing::Array::ValuePtr(new IntegerValue( *p )));
+ g->second.getPositions(positions);
group.setArray(GROUP_POSITIONS, positions);
groupState.push_back(framing::Array::ValuePtr(new FieldTableValue(group)));
}
state.setArray(GROUP_STATE, groupState);
+ framing::Array ownerState(TYPE_CODE_MAP);
+ for (ConsumerMap::const_iterator c = consumers.begin();
+ c != consumers.end(); ++c) {
+ framing::FieldTable owner;
+ owner.setString(OWNER_NAME, c->first);
+ owner.setString(CANCELLED, c->second.cancelled() ? YES : NO);
+ ownerState.push_back(framing::Array::ValuePtr(new FieldTableValue(owner)));
+ }
+ state.setArray(OWNER_STATE, ownerState);
+
QPID_LOG(debug, "Queue \"" << qName << "\": replicating message group state, key=" << groupIdHeader);
}
@@ -379,51 +572,86 @@ void MessageGroupManager::getState(qpid::framing::FieldTable& state ) const
void MessageGroupManager::setState(const qpid::framing::FieldTable& state)
{
using namespace qpid::framing;
+ consumers.clear();
messageGroups.clear();
freeGroups.clear();
cachedGroup = 0;
- framing::Array groupState(TYPE_CODE_MAP);
+ // set up the known owners
+ framing::Array ownerState(TYPE_CODE_MAP);
+ bool ok = state.getArray(OWNER_STATE, ownerState);
+ if (!ok) {
+ QPID_LOG(error, "Unable to find message group owner state information for queue \"" <<
+ qName << "\": cluster inconsistency error!");
+ return;
+ }
- bool ok = state.getArray(GROUP_STATE, groupState);
+ for (framing::Array::const_iterator c = ownerState.begin(); c != ownerState.end(); ++c) {
+ framing::FieldTable ownerMap;
+ ok = framing::getEncodedValue<FieldTable>(*c, ownerMap);
+ if (!ok) {
+ QPID_LOG(error, "Invalid message group owner information for queue \"" <<
+ qName << "\": table encoding error!");
+ return;
+ }
+ if (!ownerMap.isSet(OWNER_NAME) || !ownerMap.isSet(CANCELLED)) {
+ QPID_LOG(error, "Invalid message group owner information for queue \"" <<
+ qName << "\": fields missing error!");
+ return;
+ }
+
+ const std::string name = ownerMap.getAsString(OWNER_NAME);
+ ConsumerState& owner = consumers[name];
+ owner.setName(name);
+ if (ownerMap.getAsString(CANCELLED) == YES) {
+ owner.cancel();
+ }
+ }
+
+ // set up the known groups
+ framing::Array groupState(TYPE_CODE_MAP);
+ ok = state.getArray(GROUP_STATE, groupState);
if (!ok) {
QPID_LOG(error, "Unable to find message group state information for queue \"" <<
qName << "\": cluster inconsistency error!");
return;
}
- for (framing::Array::const_iterator g = groupState.begin();
- g != groupState.end(); ++g) {
- framing::FieldTable group;
- ok = framing::getEncodedValue<FieldTable>(*g, group);
+ for (framing::Array::const_iterator g = groupState.begin(); g != groupState.end(); ++g) {
+ framing::FieldTable groupMap;
+ ok = framing::getEncodedValue<FieldTable>(*g, groupMap);
if (!ok) {
QPID_LOG(error, "Invalid message group state information for queue \"" <<
qName << "\": table encoding error!");
return;
}
- MessageGroupManager::GroupState state;
- if (!group.isSet(GROUP_NAME) || !group.isSet(GROUP_OWNER) || !group.isSet(GROUP_ACQUIRED_CT)) {
+ if (!groupMap.isSet(GROUP_NAME) || !groupMap.isSet(GROUP_OWNER) || !groupMap.isSet(GROUP_ACQUIRED_CT)) {
QPID_LOG(error, "Invalid message group state information for queue \"" <<
qName << "\": fields missing error!");
return;
}
- state.group = group.getAsString(GROUP_NAME);
- state.owner = group.getAsString(GROUP_OWNER);
- state.acquired = group.getAsInt(GROUP_ACQUIRED_CT);
+
+ // replicate the group state
+ std::string name = groupMap.getAsString(GROUP_NAME);
+ MessageGroupManager::GroupState& group = messageGroups[name];
+ assert(group.getName().empty());
+ group.setName(name);
+ group.setAcquired(groupMap.getAsInt(GROUP_ACQUIRED_CT));
framing::Array positions(TYPE_CODE_UINT32);
- ok = group.getArray(GROUP_POSITIONS, positions);
+ ok = groupMap.getArray(GROUP_POSITIONS, positions);
if (!ok) {
QPID_LOG(error, "Invalid message group state information for queue \"" <<
qName << "\": position encoding error!");
return;
}
-
- for (Array::const_iterator p = positions.begin(); p != positions.end(); ++p)
- state.members.push_back((*p)->getIntegerValue<uint32_t, 4>());
- messageGroups[state.group] = state;
- if (!state.owned()) {
- assert(state.members.size());
- freeGroups[state.members.front()] = &messageGroups[state.group];
+ group.setPositions(positions);
+
+ const std::string ownerName = groupMap.getAsString(GROUP_OWNER);
+ if (!ownerName.empty()) {
+ ConsumerState& owner = consumers[ownerName];
+ group.setOwner(owner);
+ } else {
+ freeGroups.addGroup(group);
}
}
diff --git a/qpid/cpp/src/qpid/broker/MessageGroupManager.h b/qpid/cpp/src/qpid/broker/MessageGroupManager.h
index f4bffc4760..9f50c8bf93 100644
--- a/qpid/cpp/src/qpid/broker/MessageGroupManager.h
+++ b/qpid/cpp/src/qpid/broker/MessageGroupManager.h
@@ -43,40 +43,99 @@ class MessageGroupManager : public StatefulQueueObserver, public MessageDistribu
Messages& messages; // parent Queue's in memory message container
const std::string qName; // name of parent queue (for logs)
- struct GroupState {
+ class GroupState;
+
+ /** track consumers subscribed to this queue */
+ class ConsumerState {
+ // note: update getState()/setState() when changing this object's state implementation
+ bool zombie; // cancelled, but still holding messages.
+ std::string name;
+ uint32_t ownedGroups;
+ uint32_t pendingMsgs;
+
+ public:
+ ConsumerState() : zombie(false), ownedGroups(0), pendingMsgs(0) {}
+ const std::string& getName() const {return name;}
+ void setName(const std::string& n) {name = n;}
+ uint32_t groupCount() const {return ownedGroups;}
+ uint32_t remainingMsgs() const {return pendingMsgs;}
+
+ void addGroup( const GroupState& g);
+ void removeGroup( const GroupState& g );
+ bool cancelled() const {return zombie;}
+ void cancel() {zombie = true;}
+ void uncancel() {zombie = false;} // resubscribed on existing session
+ void msgAvailable(const GroupState& g, const QueuedMessage& qm);
+ void msgAcquired(const GroupState& g, const QueuedMessage& qm);
+ };
+ typedef sys::unordered_map<std::string, ConsumerState> ConsumerMap;
+ ConsumerMap consumers; // index: consumer name
+
+ /** track all known groups */
+ class GroupState {
// note: update getState()/setState() when changing this object's state implementation
typedef std::deque<framing::SequenceNumber> PositionFifo;
- std::string group; // group identifier
- std::string owner; // consumer with outstanding acquired messages
+ std::string name; // group identifier
uint32_t acquired; // count of outstanding acquired messages
+ ConsumerState *owner; // consumer with outstanding acquired messages
PositionFifo members; // msgs belonging to this group
- GroupState() : acquired(0) {}
- bool owned() const {return !owner.empty();}
+ public:
+ GroupState() : acquired(0), owner(0) {}
+ const std::string& getName() const {return name;}
+ void setName(const std::string& n) {name = n;}
+ uint32_t acquiredMsgs() const {return acquired;}
+ uint32_t totalMsgs() const {return members.size();}
+ bool isFree() const {return owner == 0;}
+
+ void setOwner( ConsumerState& consumer );
+ ConsumerState *getOwner() const { return owner; }
+ void resetOwner();
+ const framing::SequenceNumber& nextMsg() const;
+ void enqueueMsg(const QueuedMessage& msg);
+ void acquireMsg(const QueuedMessage& msg);
+ void requeueMsg(const QueuedMessage& msg);
+ void dequeueMsg(const QueuedMessage& msg);
+ // for clustering:
+ void getPositions(framing::Array& pos) const;
+ void setPositions(const framing::Array& pos);
+ void setAcquired(uint32_t c) {acquired = c;}
+
};
+ typedef sys::unordered_map<std::string, GroupState> GroupMap;
+ GroupMap messageGroups; // index: group name
+ // cache the last lookup
+ uint hits;
+ uint misses;
+ uint32_t lastMsg;
+ std::string lastGroup;
+ GroupState *cachedGroup;
- typedef sys::unordered_map<std::string, struct GroupState> GroupMap;
- typedef std::map<framing::SequenceNumber, struct GroupState *> GroupFifo;
+ /** store free (un-owned) groups by the position of the oldest index */
+ class GroupFifo {
+ // orders groups by their next available message (oldest first)
+ typedef std::map<framing::SequenceNumber, const GroupState *> Fifo;
+ Fifo fifo;
+
+ public:
+ GroupFifo() {}
+ void addGroup(const GroupState& group);
+ void removeGroup(const GroupState& group);
+ size_t groupCount() const {return fifo.size();}
+ const GroupState& nextGroup() const;
+ void clear() {fifo.clear();}
+ };
+ GroupFifo freeGroups;
- GroupMap messageGroups; // index: group name
- GroupFifo freeGroups; // ordered by oldest free msg
- //Consumers consumers; // index: consumer name
+ GroupState& findGroup( const QueuedMessage& qm );
+ void deleteGroup(GroupState& group);
+ void disownGroup(GroupState& group); /** release a group from a subscriber */
static const std::string qpidMessageGroupKey;
static const std::string qpidSharedGroup; // if specified, one group can be consumed by multiple receivers
static const std::string qpidMessageGroupTimestamp;
- GroupState& findGroup( const QueuedMessage& qm );
- unsigned long hits, misses; // for debug
- uint32_t lastMsg;
- std::string lastGroup;
- GroupState *cachedGroup;
-
- void unFree( const GroupState& state );
- void own( GroupState& state, const std::string& owner );
- void disown( GroupState& state );
-
public:
static QPID_BROKER_EXTERN void setDefaults(const std::string& groupId);
@@ -97,8 +156,8 @@ class MessageGroupManager : public StatefulQueueObserver, public MessageDistribu
void acquired( const QueuedMessage& qm );
void requeued( const QueuedMessage& qm );
void dequeued( const QueuedMessage& qm );
- void consumerAdded( const Consumer& ) {};
- void consumerRemoved( const Consumer& ) {};
+ void consumerAdded( const Consumer& );
+ void consumerRemoved( const Consumer& );
void getState(qpid::framing::FieldTable& state ) const;
void setState(const qpid::framing::FieldTable&);
diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp
index aaa2721021..392ae26b7c 100644
--- a/qpid/cpp/src/tests/QueueTest.cpp
+++ b/qpid/cpp/src/tests/QueueTest.cpp
@@ -860,6 +860,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) {
// Owners= ---, ---, ---
TestConsumer::shared_ptr c3(new TestConsumer("C3"));
+ queue->consume(c3);
std::deque<QueuedMessage> dequeMeC3;
verifyAcquire(queue, c3, dequeMeC3, "a", 2 );