From 8499ba4b12b41f9375442f770e44eb58337530fb Mon Sep 17 00:00:00 2001 From: Kenneth Anthony Giusti Date: Mon, 7 Nov 2011 03:29:04 +0000 Subject: QPID-3346: checkpoint client tracking code git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3346@1198612 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/broker/MessageGroupManager.cpp | 456 +++++++++++++++++------ qpid/cpp/src/qpid/broker/MessageGroupManager.h | 103 +++-- qpid/cpp/src/tests/QueueTest.cpp | 1 + 3 files changed, 424 insertions(+), 136 deletions(-) diff --git a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp index 00d1a58bef..34b11b816e 100644 --- a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp +++ b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp @@ -43,28 +43,174 @@ const std::string MessageGroupManager::qpidSharedGroup("qpid.shared_msg_group"); const std::string MessageGroupManager::qpidMessageGroupTimestamp("qpid.group_timestamp"); -void MessageGroupManager::unFree( const GroupState& state ) +/** class ConsumerState **/ + +/** the consumer owns the given group */ +void MessageGroupManager::ConsumerState::addGroup(const GroupState& group) +{ + ownedGroups += 1; + pendingMsgs += (group.totalMsgs() - group.acquiredMsgs()); +} + +/** the consumer releases the given group */ +void MessageGroupManager::ConsumerState::removeGroup(const GroupState& group) +{ + assert(ownedGroups != 0); + ownedGroups -= 1; + uint32_t del = (group.totalMsgs() - group.acquiredMsgs()); + assert(del <= pendingMsgs); + pendingMsgs -= del; +} + +/** notify the consumer that a new message has arrived at one if its owned groups */ +void MessageGroupManager::ConsumerState::msgAvailable(const GroupState&, + const QueuedMessage& ) +{ + assert(ownedGroups != 0); + pendingMsgs += 1; +} + +/** notify the consumer that an available message has been acquired */ +void MessageGroupManager::ConsumerState::msgAcquired(const GroupState&, + const QueuedMessage& ) +{ + assert(pendingMsgs != 0); + pendingMsgs -= 1; +} + + +void MessageGroupManager::consumerAdded( const Consumer& c) +{ + const std::string& name = c.getName(); + ConsumerState& state = consumers[name]; + state.setName(name); + state.uncancel(); // just in case old consumer resubcribed + QPID_LOG( trace, "group queue " << qName << ": consumer " << name << " added."); +} + +void MessageGroupManager::consumerRemoved( const Consumer& c) +{ + const std::string& name = c.getName(); + ConsumerMap::iterator cs = consumers.find( name ); + assert(cs != consumers.end()); + ConsumerState& state = cs->second; + state.cancel(); + if (state.groupCount() == 0) { + assert(state.remainingMsgs() == 0); + consumers.erase(cs); + QPID_LOG( trace, "group queue " << qName << ": consumer " << name << " removed."); + } +} + + +/** GroupFifo */ +void MessageGroupManager::GroupFifo::addGroup(const GroupState& group) { - GroupFifo::iterator pos = freeGroups.find( state.members.front() ); - assert( pos != freeGroups.end() && pos->second == &state ); - freeGroups.erase( pos ); + assert(group.totalMsgs() != 0); + const framing::SequenceNumber& next = group.nextMsg(); + assert(fifo.find(next) == fifo.end()); + fifo[next] = &group; +} + +void MessageGroupManager::GroupFifo::removeGroup(const GroupState& group) +{ + Fifo::iterator pos = fifo.find( group.nextMsg() ); + assert( pos != fifo.end() && pos->second == &group ); + fifo.erase( pos ); +} + +const MessageGroupManager::GroupState& MessageGroupManager::GroupFifo::nextGroup() const +{ + return *(fifo.begin()->second); +} + + +/** GroupState */ +void MessageGroupManager::GroupState::setOwner( ConsumerState& consumer ) +{ + assert(owner == 0); + owner = &consumer; + owner->addGroup( *this ); +} + +void MessageGroupManager::GroupState::resetOwner() +{ + assert(owner); + owner->removeGroup( *this ); + owner = 0; +} + +const qpid::framing::SequenceNumber& MessageGroupManager::GroupState::nextMsg() const +{ + assert(members.size() != 0); + return members.front(); +} + + +void MessageGroupManager::GroupState::enqueueMsg(const QueuedMessage& msg) +{ + members.push_back(msg.position); + if (owner) { + owner->msgAvailable(*this, msg); + } +} + + +void MessageGroupManager::GroupState::acquireMsg(const QueuedMessage& msg) +{ + assert(members.size()); // there are msgs present + acquired += 1; + if (owner) { + owner->msgAcquired(*this, msg); + } +} + +void MessageGroupManager::GroupState::requeueMsg(const QueuedMessage& msg) +{ + assert(acquired != 0); + acquired -= 1; + if (owner) { + owner->msgAvailable(*this, msg); + } +} + + +void MessageGroupManager::GroupState::dequeueMsg(const QueuedMessage& msg) +{ + assert( members.size() != 0 ); + assert( acquired != 0 ); + acquired -= 1; + + // likely to be at or near begin() if dequeued in order + if (members.front() == msg.position) { + members.pop_front(); + } else { + unsigned long diff = msg.position.getValue() - members.front().getValue(); + long maxEnd = diff < members.size() ? (diff + 1) : members.size(); + GroupState::PositionFifo::iterator i = + std::lower_bound(members.begin(), members.begin()+maxEnd, msg.position); + assert(i != members.end() && *i == msg.position); + members.erase(i); + } } -void MessageGroupManager::own( GroupState& state, const std::string& owner ) +void MessageGroupManager::GroupState::getPositions(framing::Array& positions) const { - state.owner = owner; - unFree( state ); + for (PositionFifo::const_iterator p = members.begin(); + p != members.end(); ++p) + positions.push_back(framing::Array::ValuePtr(new framing::IntegerValue( *p ))); } -void MessageGroupManager::disown( GroupState& state ) +void MessageGroupManager::GroupState::setPositions(const framing::Array& positions) { - state.owner.clear(); - assert(state.members.size()); - assert(freeGroups.find(state.members.front()) == freeGroups.end()); - freeGroups[state.members.front()] = &state; + members.clear(); + for (framing::Array::const_iterator p = positions.begin(); p != positions.end(); ++p) + members.push_back((*p)->getIntegerValue()); } -MessageGroupManager::GroupState& MessageGroupManager::findGroup( const QueuedMessage& qm ) + + +MessageGroupManager::GroupState& MessageGroupManager::findGroup(const QueuedMessage& qm) { uint32_t thisMsg = qm.position.getValue(); if (cachedGroup && lastMsg == thisMsg) { @@ -91,42 +237,45 @@ MessageGroupManager::GroupState& MessageGroupManager::findGroup( const QueuedMes misses++; - GroupState& found = messageGroups[group]; - if (found.group.empty()) - found.group = group; // new group, assign name + cachedGroup = &messageGroups[group]; + if (cachedGroup->getName().empty()) + cachedGroup->setName(group); // new group, assign name lastMsg = thisMsg; lastGroup = group; - cachedGroup = &found; - return found; + return *cachedGroup; } +void MessageGroupManager::deleteGroup(GroupState& group) +{ + if (cachedGroup == &group) + cachedGroup = 0; + std::string name = group.getName(); + messageGroups.erase(name); +} + void MessageGroupManager::enqueued( const QueuedMessage& qm ) { // @todo KAG optimization - store reference to group state in QueuedMessage // issue: const-ness?? GroupState& state = findGroup(qm); - state.members.push_back(qm.position); - uint32_t total = state.members.size(); + state.enqueueMsg(qm); + uint32_t total = state.totalMsgs(); QPID_LOG( trace, "group queue " << qName << - ": added message to group id=" << state.group << " total=" << total ); + ": added message to group id=" << state.getName() << " total=" << total ); if (total == 1) { // newly created group, no owner - assert(freeGroups.find(qm.position) == freeGroups.end()); - freeGroups[qm.position] = &state; + freeGroups.addGroup(state); } } void MessageGroupManager::acquired( const QueuedMessage& qm ) { - // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage - // issue: const-ness?? GroupState& state = findGroup(qm); - assert(state.members.size()); // there are msgs present - state.acquired += 1; + state.acquireMsg(qm); QPID_LOG( trace, "group queue " << qName << - ": acquired message in group id=" << state.group << " acquired=" << state.acquired ); + ": acquired message in group id=" << state.getName() << " acquired=" << state.acquiredMsgs()); } @@ -135,69 +284,77 @@ void MessageGroupManager::requeued( const QueuedMessage& qm ) // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage // issue: const-ness?? GroupState& state = findGroup(qm); - assert( state.acquired != 0 ); - state.acquired -= 1; - if (state.acquired == 0 && state.owned()) { - QPID_LOG( trace, "group queue " << qName << - ": consumer name=" << state.owner << " released group id=" << state.group); - disown(state); + state.requeueMsg(qm); + if (state.acquiredMsgs() == 0 && state.getOwner()) { + disownGroup(state); + freeGroups.addGroup(state); } QPID_LOG( trace, "group queue " << qName << - ": requeued message to group id=" << state.group << " acquired=" << state.acquired ); + ": requeued message to group id=" << state.getName() << " acquired=" << state.acquiredMsgs()); } void MessageGroupManager::dequeued( const QueuedMessage& qm ) { - // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage - // issue: const-ness?? - GroupState& state = findGroup(qm); - assert( state.members.size() != 0 ); - assert( state.acquired != 0 ); - state.acquired -= 1; - - // likely to be at or near begin() if dequeued in order - bool reFreeNeeded = false; - if (state.members.front() == qm.position) { - if (!state.owned()) { - // will be on the freeGroups list if mgmt is dequeueing rather than a consumer! - // if on freelist, it is indexed by first member, which is about to be removed! - unFree(state); - reFreeNeeded = true; - } - state.members.pop_front(); - } else { - GroupState::PositionFifo::iterator pos = state.members.begin() + 1; - GroupState::PositionFifo::iterator end = state.members.end(); - while (pos != end) { - if (*pos == qm.position) { - state.members.erase(pos); - break; - } - ++pos; + GroupState& group = findGroup(qm); + bool freeNeeded = false; + if (group.isFree()) { // dequeue is occuring via mgmt, not subscriber! + const framing::SequenceNumber next = group.nextMsg(); + if (next == qm.position) { + /* we are about to remove the head message of this group. This message is + * used to index the freeGroups fifo, so we must temporarily remove it from + * the fifo until we are done updating the head message. + */ + freeGroups.removeGroup(group); + freeNeeded = true; } } + group.dequeueMsg(qm); + + uint32_t total = group.totalMsgs(); + QPID_LOG( trace, "group queue " << qName << + ": dequeued message from group id=" << group.getName() << " total=" << total ); + + // if no more outstanding acquired messages, free the group from the consumer + if (group.acquiredMsgs() == 0 && group.getOwner()) { + // group is now available again + disownGroup(group); + freeNeeded = true; + } - uint32_t total = state.members.size(); QPID_LOG( trace, "group queue " << qName << - ": dequeued message from group id=" << state.group << " total=" << total ); + ": dequeued message from group id=" << group.getName() << " total=" << total ); if (total == 0) { - QPID_LOG( trace, "group queue " << qName << ": deleting group id=" << state.group); - if (cachedGroup == &state) { - cachedGroup = 0; - } - std::string key(state.group); - messageGroups.erase( key ); - } else if (state.acquired == 0 && state.owned()) { - QPID_LOG( trace, "group queue " << qName << - ": consumer name=" << state.owner << " released group id=" << state.group); - disown(state); - } else if (reFreeNeeded) { - disown(state); + QPID_LOG( trace, "group queue " << qName << ": deleting group id=" << group.getName()); + deleteGroup(group); + } else if (freeNeeded) { + freeGroups.addGroup(group); } } +/** remove the owner of the group */ +void MessageGroupManager::disownGroup(GroupState& group) +{ + ConsumerState& owner = *group.getOwner(); + QPID_LOG( trace, "group queue " << qName << + ": consumer name=" << owner.getName() << " released group id=" << group.getName()); + group.resetOwner(); + if (owner.cancelled() && owner.groupCount() == 0) { + // this owner has unsubscribed, we can release it now. + std::string name = owner.getName(); + consumers.erase(name); + QPID_LOG( error, "group queue " << qName << ": consumer " << name << " removed."); + } +} + +namespace { + unsigned long found = 0; + unsigned long failed = 0; + unsigned long missCount = 0; + unsigned long earlyRet = 0; +} + MessageGroupManager::~MessageGroupManager() { QPID_LOG( debug, "group queue " << qName << " cache results: hits=" << hits << " misses=" << misses ); @@ -207,27 +364,38 @@ bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, Queued if (messages.empty()) return false; + ConsumerState& cState = consumers.find(c->getName())->second; + next.position = c->position; - if (!freeGroups.empty()) { - const framing::SequenceNumber& nextFree = freeGroups.begin()->first; - if (nextFree < next.position) { // a free message is older than current + if (freeGroups.groupCount() != 0) { + const framing::SequenceNumber& nextFree = freeGroups.nextGroup().nextMsg(); + if (nextFree <= next.position) { // a free message is older than current next.position = nextFree; --next.position; } + } else if (cState.remainingMsgs() == 0) { // no more msgs from owned groups + earlyRet += 1; + return false; } + int count = 1; while (messages.next( next.position, next )) { GroupState& group = findGroup(next); - if (!group.owned()) { - if (group.members.front() == next.position) { // only take from head! + if (group.getOwner() == &cState) { + found += 1; + return true; + } else if (group.isFree()) { + if (group.nextMsg() == next.position) { // only take from head! + found += 1; return true; } - QPID_LOG(debug, "Skipping " << next.position << " since group " << group.group - << "'s head message still pending. pos=" << group.members.front()); - } else if (group.owner == c->getName()) { - return true; + QPID_LOG(debug, "Skipping " << next.position << " since group " << group.getName() + << "'s head message still pending. pos=" << group.nextMsg()); } + count += 1; } + failed += 1; + missCount += 1; return false; } @@ -237,13 +405,17 @@ bool MessageGroupManager::allocate(const std::string& consumer, const QueuedMess // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage GroupState& state = findGroup(qm); - if (!state.owned()) { - own( state, consumer ); + if (state.isFree()) { + freeGroups.removeGroup(state); + ConsumerMap::iterator cs = consumers.find( consumer ); + assert(cs != consumers.end()); + ConsumerState& owner = cs->second; + state.setOwner( owner ); QPID_LOG( trace, "group queue " << qName << - ": consumer name=" << consumer << " has acquired group id=" << state.group); + ": consumer name=" << consumer << " has acquired group id=" << state.getName()); return true; } - return state.owner == consumer; + return state.getOwner()->getName() == consumer; } bool MessageGroupManager::nextBrowsableMessage( Consumer::shared_ptr& c, QueuedMessage& next ) @@ -280,9 +452,13 @@ void MessageGroupManager::query(qpid::types::Variant::Map& status) const g != messageGroups.end(); ++g) { qpid::types::Variant::Map info; info[GROUP_ID_KEY] = g->first; - info[GROUP_MSG_COUNT] = g->second.members.size(); + info[GROUP_MSG_COUNT] = g->second.totalMsgs(); info[GROUP_TIMESTAMP] = 0; /** @todo KAG - NEED HEAD MSG TIMESTAMP */ - info[GROUP_CONSUMER] = g->second.owner; + if (g->second.getOwner()) { + info[GROUP_CONSUMER] = g->second.getOwner()->getName(); + } else { + info[GROUP_CONSUMER] = std::string(""); + } groups.push_back(info); } state[GROUP_STATE_KEY] = groups; @@ -346,6 +522,11 @@ namespace { const std::string GROUP_ACQUIRED_CT("acquired-ct"); const std::string GROUP_POSITIONS("positions"); const std::string GROUP_STATE("group-state"); + const std::string OWNER_STATE("owner-state"); + const std::string CANCELLED("cancelled"); + const std::string YES("yes"); + const std::string NO("no"); + const std::string OWNER_NAME("name"); } @@ -360,17 +541,29 @@ void MessageGroupManager::getState(qpid::framing::FieldTable& state ) const framing::FieldTable group; group.setString(GROUP_NAME, g->first); - group.setString(GROUP_OWNER, g->second.owner); - group.setInt(GROUP_ACQUIRED_CT, g->second.acquired); + if (g->second.getOwner()) { + group.setString(GROUP_OWNER, g->second.getOwner()->getName()); + } else { + group.setString(GROUP_OWNER, std::string("")); + } + group.setInt(GROUP_ACQUIRED_CT, g->second.acquiredMsgs()); framing::Array positions(TYPE_CODE_UINT32); - for (GroupState::PositionFifo::const_iterator p = g->second.members.begin(); - p != g->second.members.end(); ++p) - positions.push_back(framing::Array::ValuePtr(new IntegerValue( *p ))); + g->second.getPositions(positions); group.setArray(GROUP_POSITIONS, positions); groupState.push_back(framing::Array::ValuePtr(new FieldTableValue(group))); } state.setArray(GROUP_STATE, groupState); + framing::Array ownerState(TYPE_CODE_MAP); + for (ConsumerMap::const_iterator c = consumers.begin(); + c != consumers.end(); ++c) { + framing::FieldTable owner; + owner.setString(OWNER_NAME, c->first); + owner.setString(CANCELLED, c->second.cancelled() ? YES : NO); + ownerState.push_back(framing::Array::ValuePtr(new FieldTableValue(owner))); + } + state.setArray(OWNER_STATE, ownerState); + QPID_LOG(debug, "Queue \"" << qName << "\": replicating message group state, key=" << groupIdHeader); } @@ -379,51 +572,86 @@ void MessageGroupManager::getState(qpid::framing::FieldTable& state ) const void MessageGroupManager::setState(const qpid::framing::FieldTable& state) { using namespace qpid::framing; + consumers.clear(); messageGroups.clear(); freeGroups.clear(); cachedGroup = 0; - framing::Array groupState(TYPE_CODE_MAP); + // set up the known owners + framing::Array ownerState(TYPE_CODE_MAP); + bool ok = state.getArray(OWNER_STATE, ownerState); + if (!ok) { + QPID_LOG(error, "Unable to find message group owner state information for queue \"" << + qName << "\": cluster inconsistency error!"); + return; + } - bool ok = state.getArray(GROUP_STATE, groupState); + for (framing::Array::const_iterator c = ownerState.begin(); c != ownerState.end(); ++c) { + framing::FieldTable ownerMap; + ok = framing::getEncodedValue(*c, ownerMap); + if (!ok) { + QPID_LOG(error, "Invalid message group owner information for queue \"" << + qName << "\": table encoding error!"); + return; + } + if (!ownerMap.isSet(OWNER_NAME) || !ownerMap.isSet(CANCELLED)) { + QPID_LOG(error, "Invalid message group owner information for queue \"" << + qName << "\": fields missing error!"); + return; + } + + const std::string name = ownerMap.getAsString(OWNER_NAME); + ConsumerState& owner = consumers[name]; + owner.setName(name); + if (ownerMap.getAsString(CANCELLED) == YES) { + owner.cancel(); + } + } + + // set up the known groups + framing::Array groupState(TYPE_CODE_MAP); + ok = state.getArray(GROUP_STATE, groupState); if (!ok) { QPID_LOG(error, "Unable to find message group state information for queue \"" << qName << "\": cluster inconsistency error!"); return; } - for (framing::Array::const_iterator g = groupState.begin(); - g != groupState.end(); ++g) { - framing::FieldTable group; - ok = framing::getEncodedValue(*g, group); + for (framing::Array::const_iterator g = groupState.begin(); g != groupState.end(); ++g) { + framing::FieldTable groupMap; + ok = framing::getEncodedValue(*g, groupMap); if (!ok) { QPID_LOG(error, "Invalid message group state information for queue \"" << qName << "\": table encoding error!"); return; } - MessageGroupManager::GroupState state; - if (!group.isSet(GROUP_NAME) || !group.isSet(GROUP_OWNER) || !group.isSet(GROUP_ACQUIRED_CT)) { + if (!groupMap.isSet(GROUP_NAME) || !groupMap.isSet(GROUP_OWNER) || !groupMap.isSet(GROUP_ACQUIRED_CT)) { QPID_LOG(error, "Invalid message group state information for queue \"" << qName << "\": fields missing error!"); return; } - state.group = group.getAsString(GROUP_NAME); - state.owner = group.getAsString(GROUP_OWNER); - state.acquired = group.getAsInt(GROUP_ACQUIRED_CT); + + // replicate the group state + std::string name = groupMap.getAsString(GROUP_NAME); + MessageGroupManager::GroupState& group = messageGroups[name]; + assert(group.getName().empty()); + group.setName(name); + group.setAcquired(groupMap.getAsInt(GROUP_ACQUIRED_CT)); framing::Array positions(TYPE_CODE_UINT32); - ok = group.getArray(GROUP_POSITIONS, positions); + ok = groupMap.getArray(GROUP_POSITIONS, positions); if (!ok) { QPID_LOG(error, "Invalid message group state information for queue \"" << qName << "\": position encoding error!"); return; } - - for (Array::const_iterator p = positions.begin(); p != positions.end(); ++p) - state.members.push_back((*p)->getIntegerValue()); - messageGroups[state.group] = state; - if (!state.owned()) { - assert(state.members.size()); - freeGroups[state.members.front()] = &messageGroups[state.group]; + group.setPositions(positions); + + const std::string ownerName = groupMap.getAsString(GROUP_OWNER); + if (!ownerName.empty()) { + ConsumerState& owner = consumers[ownerName]; + group.setOwner(owner); + } else { + freeGroups.addGroup(group); } } diff --git a/qpid/cpp/src/qpid/broker/MessageGroupManager.h b/qpid/cpp/src/qpid/broker/MessageGroupManager.h index f4bffc4760..9f50c8bf93 100644 --- a/qpid/cpp/src/qpid/broker/MessageGroupManager.h +++ b/qpid/cpp/src/qpid/broker/MessageGroupManager.h @@ -43,40 +43,99 @@ class MessageGroupManager : public StatefulQueueObserver, public MessageDistribu Messages& messages; // parent Queue's in memory message container const std::string qName; // name of parent queue (for logs) - struct GroupState { + class GroupState; + + /** track consumers subscribed to this queue */ + class ConsumerState { + // note: update getState()/setState() when changing this object's state implementation + bool zombie; // cancelled, but still holding messages. + std::string name; + uint32_t ownedGroups; + uint32_t pendingMsgs; + + public: + ConsumerState() : zombie(false), ownedGroups(0), pendingMsgs(0) {} + const std::string& getName() const {return name;} + void setName(const std::string& n) {name = n;} + uint32_t groupCount() const {return ownedGroups;} + uint32_t remainingMsgs() const {return pendingMsgs;} + + void addGroup( const GroupState& g); + void removeGroup( const GroupState& g ); + bool cancelled() const {return zombie;} + void cancel() {zombie = true;} + void uncancel() {zombie = false;} // resubscribed on existing session + void msgAvailable(const GroupState& g, const QueuedMessage& qm); + void msgAcquired(const GroupState& g, const QueuedMessage& qm); + }; + typedef sys::unordered_map ConsumerMap; + ConsumerMap consumers; // index: consumer name + + /** track all known groups */ + class GroupState { // note: update getState()/setState() when changing this object's state implementation typedef std::deque PositionFifo; - std::string group; // group identifier - std::string owner; // consumer with outstanding acquired messages + std::string name; // group identifier uint32_t acquired; // count of outstanding acquired messages + ConsumerState *owner; // consumer with outstanding acquired messages PositionFifo members; // msgs belonging to this group - GroupState() : acquired(0) {} - bool owned() const {return !owner.empty();} + public: + GroupState() : acquired(0), owner(0) {} + const std::string& getName() const {return name;} + void setName(const std::string& n) {name = n;} + uint32_t acquiredMsgs() const {return acquired;} + uint32_t totalMsgs() const {return members.size();} + bool isFree() const {return owner == 0;} + + void setOwner( ConsumerState& consumer ); + ConsumerState *getOwner() const { return owner; } + void resetOwner(); + const framing::SequenceNumber& nextMsg() const; + void enqueueMsg(const QueuedMessage& msg); + void acquireMsg(const QueuedMessage& msg); + void requeueMsg(const QueuedMessage& msg); + void dequeueMsg(const QueuedMessage& msg); + // for clustering: + void getPositions(framing::Array& pos) const; + void setPositions(const framing::Array& pos); + void setAcquired(uint32_t c) {acquired = c;} + }; + typedef sys::unordered_map GroupMap; + GroupMap messageGroups; // index: group name + // cache the last lookup + uint hits; + uint misses; + uint32_t lastMsg; + std::string lastGroup; + GroupState *cachedGroup; - typedef sys::unordered_map GroupMap; - typedef std::map GroupFifo; + /** store free (un-owned) groups by the position of the oldest index */ + class GroupFifo { + // orders groups by their next available message (oldest first) + typedef std::map Fifo; + Fifo fifo; + + public: + GroupFifo() {} + void addGroup(const GroupState& group); + void removeGroup(const GroupState& group); + size_t groupCount() const {return fifo.size();} + const GroupState& nextGroup() const; + void clear() {fifo.clear();} + }; + GroupFifo freeGroups; - GroupMap messageGroups; // index: group name - GroupFifo freeGroups; // ordered by oldest free msg - //Consumers consumers; // index: consumer name + GroupState& findGroup( const QueuedMessage& qm ); + void deleteGroup(GroupState& group); + void disownGroup(GroupState& group); /** release a group from a subscriber */ static const std::string qpidMessageGroupKey; static const std::string qpidSharedGroup; // if specified, one group can be consumed by multiple receivers static const std::string qpidMessageGroupTimestamp; - GroupState& findGroup( const QueuedMessage& qm ); - unsigned long hits, misses; // for debug - uint32_t lastMsg; - std::string lastGroup; - GroupState *cachedGroup; - - void unFree( const GroupState& state ); - void own( GroupState& state, const std::string& owner ); - void disown( GroupState& state ); - public: static QPID_BROKER_EXTERN void setDefaults(const std::string& groupId); @@ -97,8 +156,8 @@ class MessageGroupManager : public StatefulQueueObserver, public MessageDistribu void acquired( const QueuedMessage& qm ); void requeued( const QueuedMessage& qm ); void dequeued( const QueuedMessage& qm ); - void consumerAdded( const Consumer& ) {}; - void consumerRemoved( const Consumer& ) {}; + void consumerAdded( const Consumer& ); + void consumerRemoved( const Consumer& ); void getState(qpid::framing::FieldTable& state ) const; void setState(const qpid::framing::FieldTable&); diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp index aaa2721021..392ae26b7c 100644 --- a/qpid/cpp/src/tests/QueueTest.cpp +++ b/qpid/cpp/src/tests/QueueTest.cpp @@ -860,6 +860,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) { // Owners= ---, ---, --- TestConsumer::shared_ptr c3(new TestConsumer("C3")); + queue->consume(c3); std::deque dequeMeC3; verifyAcquire(queue, c3, dequeMeC3, "a", 2 ); -- cgit v1.2.1