summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/MessageGroupManager.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/MessageGroupManager.cpp456
1 files changed, 342 insertions, 114 deletions
diff --git a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
index 00d1a58bef..34b11b816e 100644
--- a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
+++ b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
@@ -43,28 +43,174 @@ const std::string MessageGroupManager::qpidSharedGroup("qpid.shared_msg_group");
const std::string MessageGroupManager::qpidMessageGroupTimestamp("qpid.group_timestamp");
-void MessageGroupManager::unFree( const GroupState& state )
+/** class ConsumerState **/
+
+/** the consumer owns the given group */
+void MessageGroupManager::ConsumerState::addGroup(const GroupState& group)
+{
+ ownedGroups += 1;
+ pendingMsgs += (group.totalMsgs() - group.acquiredMsgs());
+}
+
+/** the consumer releases the given group */
+void MessageGroupManager::ConsumerState::removeGroup(const GroupState& group)
+{
+ assert(ownedGroups != 0);
+ ownedGroups -= 1;
+ uint32_t del = (group.totalMsgs() - group.acquiredMsgs());
+ assert(del <= pendingMsgs);
+ pendingMsgs -= del;
+}
+
+/** notify the consumer that a new message has arrived at one if its owned groups */
+void MessageGroupManager::ConsumerState::msgAvailable(const GroupState&,
+ const QueuedMessage& )
+{
+ assert(ownedGroups != 0);
+ pendingMsgs += 1;
+}
+
+/** notify the consumer that an available message has been acquired */
+void MessageGroupManager::ConsumerState::msgAcquired(const GroupState&,
+ const QueuedMessage& )
+{
+ assert(pendingMsgs != 0);
+ pendingMsgs -= 1;
+}
+
+
+void MessageGroupManager::consumerAdded( const Consumer& c)
+{
+ const std::string& name = c.getName();
+ ConsumerState& state = consumers[name];
+ state.setName(name);
+ state.uncancel(); // just in case old consumer resubcribed
+ QPID_LOG( trace, "group queue " << qName << ": consumer " << name << " added.");
+}
+
+void MessageGroupManager::consumerRemoved( const Consumer& c)
+{
+ const std::string& name = c.getName();
+ ConsumerMap::iterator cs = consumers.find( name );
+ assert(cs != consumers.end());
+ ConsumerState& state = cs->second;
+ state.cancel();
+ if (state.groupCount() == 0) {
+ assert(state.remainingMsgs() == 0);
+ consumers.erase(cs);
+ QPID_LOG( trace, "group queue " << qName << ": consumer " << name << " removed.");
+ }
+}
+
+
+/** GroupFifo */
+void MessageGroupManager::GroupFifo::addGroup(const GroupState& group)
{
- GroupFifo::iterator pos = freeGroups.find( state.members.front() );
- assert( pos != freeGroups.end() && pos->second == &state );
- freeGroups.erase( pos );
+ assert(group.totalMsgs() != 0);
+ const framing::SequenceNumber& next = group.nextMsg();
+ assert(fifo.find(next) == fifo.end());
+ fifo[next] = &group;
+}
+
+void MessageGroupManager::GroupFifo::removeGroup(const GroupState& group)
+{
+ Fifo::iterator pos = fifo.find( group.nextMsg() );
+ assert( pos != fifo.end() && pos->second == &group );
+ fifo.erase( pos );
+}
+
+const MessageGroupManager::GroupState& MessageGroupManager::GroupFifo::nextGroup() const
+{
+ return *(fifo.begin()->second);
+}
+
+
+/** GroupState */
+void MessageGroupManager::GroupState::setOwner( ConsumerState& consumer )
+{
+ assert(owner == 0);
+ owner = &consumer;
+ owner->addGroup( *this );
+}
+
+void MessageGroupManager::GroupState::resetOwner()
+{
+ assert(owner);
+ owner->removeGroup( *this );
+ owner = 0;
+}
+
+const qpid::framing::SequenceNumber& MessageGroupManager::GroupState::nextMsg() const
+{
+ assert(members.size() != 0);
+ return members.front();
+}
+
+
+void MessageGroupManager::GroupState::enqueueMsg(const QueuedMessage& msg)
+{
+ members.push_back(msg.position);
+ if (owner) {
+ owner->msgAvailable(*this, msg);
+ }
+}
+
+
+void MessageGroupManager::GroupState::acquireMsg(const QueuedMessage& msg)
+{
+ assert(members.size()); // there are msgs present
+ acquired += 1;
+ if (owner) {
+ owner->msgAcquired(*this, msg);
+ }
+}
+
+void MessageGroupManager::GroupState::requeueMsg(const QueuedMessage& msg)
+{
+ assert(acquired != 0);
+ acquired -= 1;
+ if (owner) {
+ owner->msgAvailable(*this, msg);
+ }
+}
+
+
+void MessageGroupManager::GroupState::dequeueMsg(const QueuedMessage& msg)
+{
+ assert( members.size() != 0 );
+ assert( acquired != 0 );
+ acquired -= 1;
+
+ // likely to be at or near begin() if dequeued in order
+ if (members.front() == msg.position) {
+ members.pop_front();
+ } else {
+ unsigned long diff = msg.position.getValue() - members.front().getValue();
+ long maxEnd = diff < members.size() ? (diff + 1) : members.size();
+ GroupState::PositionFifo::iterator i =
+ std::lower_bound(members.begin(), members.begin()+maxEnd, msg.position);
+ assert(i != members.end() && *i == msg.position);
+ members.erase(i);
+ }
}
-void MessageGroupManager::own( GroupState& state, const std::string& owner )
+void MessageGroupManager::GroupState::getPositions(framing::Array& positions) const
{
- state.owner = owner;
- unFree( state );
+ for (PositionFifo::const_iterator p = members.begin();
+ p != members.end(); ++p)
+ positions.push_back(framing::Array::ValuePtr(new framing::IntegerValue( *p )));
}
-void MessageGroupManager::disown( GroupState& state )
+void MessageGroupManager::GroupState::setPositions(const framing::Array& positions)
{
- state.owner.clear();
- assert(state.members.size());
- assert(freeGroups.find(state.members.front()) == freeGroups.end());
- freeGroups[state.members.front()] = &state;
+ members.clear();
+ for (framing::Array::const_iterator p = positions.begin(); p != positions.end(); ++p)
+ members.push_back((*p)->getIntegerValue<uint32_t, 4>());
}
-MessageGroupManager::GroupState& MessageGroupManager::findGroup( const QueuedMessage& qm )
+
+
+MessageGroupManager::GroupState& MessageGroupManager::findGroup(const QueuedMessage& qm)
{
uint32_t thisMsg = qm.position.getValue();
if (cachedGroup && lastMsg == thisMsg) {
@@ -91,42 +237,45 @@ MessageGroupManager::GroupState& MessageGroupManager::findGroup( const QueuedMes
misses++;
- GroupState& found = messageGroups[group];
- if (found.group.empty())
- found.group = group; // new group, assign name
+ cachedGroup = &messageGroups[group];
+ if (cachedGroup->getName().empty())
+ cachedGroup->setName(group); // new group, assign name
lastMsg = thisMsg;
lastGroup = group;
- cachedGroup = &found;
- return found;
+ return *cachedGroup;
}
+void MessageGroupManager::deleteGroup(GroupState& group)
+{
+ if (cachedGroup == &group)
+ cachedGroup = 0;
+ std::string name = group.getName();
+ messageGroups.erase(name);
+}
+
void MessageGroupManager::enqueued( const QueuedMessage& qm )
{
// @todo KAG optimization - store reference to group state in QueuedMessage
// issue: const-ness??
GroupState& state = findGroup(qm);
- state.members.push_back(qm.position);
- uint32_t total = state.members.size();
+ state.enqueueMsg(qm);
+ uint32_t total = state.totalMsgs();
QPID_LOG( trace, "group queue " << qName <<
- ": added message to group id=" << state.group << " total=" << total );
+ ": added message to group id=" << state.getName() << " total=" << total );
if (total == 1) {
// newly created group, no owner
- assert(freeGroups.find(qm.position) == freeGroups.end());
- freeGroups[qm.position] = &state;
+ freeGroups.addGroup(state);
}
}
void MessageGroupManager::acquired( const QueuedMessage& qm )
{
- // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
- // issue: const-ness??
GroupState& state = findGroup(qm);
- assert(state.members.size()); // there are msgs present
- state.acquired += 1;
+ state.acquireMsg(qm);
QPID_LOG( trace, "group queue " << qName <<
- ": acquired message in group id=" << state.group << " acquired=" << state.acquired );
+ ": acquired message in group id=" << state.getName() << " acquired=" << state.acquiredMsgs());
}
@@ -135,69 +284,77 @@ void MessageGroupManager::requeued( const QueuedMessage& qm )
// @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
// issue: const-ness??
GroupState& state = findGroup(qm);
- assert( state.acquired != 0 );
- state.acquired -= 1;
- if (state.acquired == 0 && state.owned()) {
- QPID_LOG( trace, "group queue " << qName <<
- ": consumer name=" << state.owner << " released group id=" << state.group);
- disown(state);
+ state.requeueMsg(qm);
+ if (state.acquiredMsgs() == 0 && state.getOwner()) {
+ disownGroup(state);
+ freeGroups.addGroup(state);
}
QPID_LOG( trace, "group queue " << qName <<
- ": requeued message to group id=" << state.group << " acquired=" << state.acquired );
+ ": requeued message to group id=" << state.getName() << " acquired=" << state.acquiredMsgs());
}
void MessageGroupManager::dequeued( const QueuedMessage& qm )
{
- // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
- // issue: const-ness??
- GroupState& state = findGroup(qm);
- assert( state.members.size() != 0 );
- assert( state.acquired != 0 );
- state.acquired -= 1;
-
- // likely to be at or near begin() if dequeued in order
- bool reFreeNeeded = false;
- if (state.members.front() == qm.position) {
- if (!state.owned()) {
- // will be on the freeGroups list if mgmt is dequeueing rather than a consumer!
- // if on freelist, it is indexed by first member, which is about to be removed!
- unFree(state);
- reFreeNeeded = true;
- }
- state.members.pop_front();
- } else {
- GroupState::PositionFifo::iterator pos = state.members.begin() + 1;
- GroupState::PositionFifo::iterator end = state.members.end();
- while (pos != end) {
- if (*pos == qm.position) {
- state.members.erase(pos);
- break;
- }
- ++pos;
+ GroupState& group = findGroup(qm);
+ bool freeNeeded = false;
+ if (group.isFree()) { // dequeue is occuring via mgmt, not subscriber!
+ const framing::SequenceNumber next = group.nextMsg();
+ if (next == qm.position) {
+ /* we are about to remove the head message of this group. This message is
+ * used to index the freeGroups fifo, so we must temporarily remove it from
+ * the fifo until we are done updating the head message.
+ */
+ freeGroups.removeGroup(group);
+ freeNeeded = true;
}
}
+ group.dequeueMsg(qm);
+
+ uint32_t total = group.totalMsgs();
+ QPID_LOG( trace, "group queue " << qName <<
+ ": dequeued message from group id=" << group.getName() << " total=" << total );
+
+ // if no more outstanding acquired messages, free the group from the consumer
+ if (group.acquiredMsgs() == 0 && group.getOwner()) {
+ // group is now available again
+ disownGroup(group);
+ freeNeeded = true;
+ }
- uint32_t total = state.members.size();
QPID_LOG( trace, "group queue " << qName <<
- ": dequeued message from group id=" << state.group << " total=" << total );
+ ": dequeued message from group id=" << group.getName() << " total=" << total );
if (total == 0) {
- QPID_LOG( trace, "group queue " << qName << ": deleting group id=" << state.group);
- if (cachedGroup == &state) {
- cachedGroup = 0;
- }
- std::string key(state.group);
- messageGroups.erase( key );
- } else if (state.acquired == 0 && state.owned()) {
- QPID_LOG( trace, "group queue " << qName <<
- ": consumer name=" << state.owner << " released group id=" << state.group);
- disown(state);
- } else if (reFreeNeeded) {
- disown(state);
+ QPID_LOG( trace, "group queue " << qName << ": deleting group id=" << group.getName());
+ deleteGroup(group);
+ } else if (freeNeeded) {
+ freeGroups.addGroup(group);
}
}
+/** remove the owner of the group */
+void MessageGroupManager::disownGroup(GroupState& group)
+{
+ ConsumerState& owner = *group.getOwner();
+ QPID_LOG( trace, "group queue " << qName <<
+ ": consumer name=" << owner.getName() << " released group id=" << group.getName());
+ group.resetOwner();
+ if (owner.cancelled() && owner.groupCount() == 0) {
+ // this owner has unsubscribed, we can release it now.
+ std::string name = owner.getName();
+ consumers.erase(name);
+ QPID_LOG( error, "group queue " << qName << ": consumer " << name << " removed.");
+ }
+}
+
+namespace {
+ unsigned long found = 0;
+ unsigned long failed = 0;
+ unsigned long missCount = 0;
+ unsigned long earlyRet = 0;
+}
+
MessageGroupManager::~MessageGroupManager()
{
QPID_LOG( debug, "group queue " << qName << " cache results: hits=" << hits << " misses=" << misses );
@@ -207,27 +364,38 @@ bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, Queued
if (messages.empty())
return false;
+ ConsumerState& cState = consumers.find(c->getName())->second;
+
next.position = c->position;
- if (!freeGroups.empty()) {
- const framing::SequenceNumber& nextFree = freeGroups.begin()->first;
- if (nextFree < next.position) { // a free message is older than current
+ if (freeGroups.groupCount() != 0) {
+ const framing::SequenceNumber& nextFree = freeGroups.nextGroup().nextMsg();
+ if (nextFree <= next.position) { // a free message is older than current
next.position = nextFree;
--next.position;
}
+ } else if (cState.remainingMsgs() == 0) { // no more msgs from owned groups
+ earlyRet += 1;
+ return false;
}
+ int count = 1;
while (messages.next( next.position, next )) {
GroupState& group = findGroup(next);
- if (!group.owned()) {
- if (group.members.front() == next.position) { // only take from head!
+ if (group.getOwner() == &cState) {
+ found += 1;
+ return true;
+ } else if (group.isFree()) {
+ if (group.nextMsg() == next.position) { // only take from head!
+ found += 1;
return true;
}
- QPID_LOG(debug, "Skipping " << next.position << " since group " << group.group
- << "'s head message still pending. pos=" << group.members.front());
- } else if (group.owner == c->getName()) {
- return true;
+ QPID_LOG(debug, "Skipping " << next.position << " since group " << group.getName()
+ << "'s head message still pending. pos=" << group.nextMsg());
}
+ count += 1;
}
+ failed += 1;
+ missCount += 1;
return false;
}
@@ -237,13 +405,17 @@ bool MessageGroupManager::allocate(const std::string& consumer, const QueuedMess
// @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
GroupState& state = findGroup(qm);
- if (!state.owned()) {
- own( state, consumer );
+ if (state.isFree()) {
+ freeGroups.removeGroup(state);
+ ConsumerMap::iterator cs = consumers.find( consumer );
+ assert(cs != consumers.end());
+ ConsumerState& owner = cs->second;
+ state.setOwner( owner );
QPID_LOG( trace, "group queue " << qName <<
- ": consumer name=" << consumer << " has acquired group id=" << state.group);
+ ": consumer name=" << consumer << " has acquired group id=" << state.getName());
return true;
}
- return state.owner == consumer;
+ return state.getOwner()->getName() == consumer;
}
bool MessageGroupManager::nextBrowsableMessage( Consumer::shared_ptr& c, QueuedMessage& next )
@@ -280,9 +452,13 @@ void MessageGroupManager::query(qpid::types::Variant::Map& status) const
g != messageGroups.end(); ++g) {
qpid::types::Variant::Map info;
info[GROUP_ID_KEY] = g->first;
- info[GROUP_MSG_COUNT] = g->second.members.size();
+ info[GROUP_MSG_COUNT] = g->second.totalMsgs();
info[GROUP_TIMESTAMP] = 0; /** @todo KAG - NEED HEAD MSG TIMESTAMP */
- info[GROUP_CONSUMER] = g->second.owner;
+ if (g->second.getOwner()) {
+ info[GROUP_CONSUMER] = g->second.getOwner()->getName();
+ } else {
+ info[GROUP_CONSUMER] = std::string("");
+ }
groups.push_back(info);
}
state[GROUP_STATE_KEY] = groups;
@@ -346,6 +522,11 @@ namespace {
const std::string GROUP_ACQUIRED_CT("acquired-ct");
const std::string GROUP_POSITIONS("positions");
const std::string GROUP_STATE("group-state");
+ const std::string OWNER_STATE("owner-state");
+ const std::string CANCELLED("cancelled");
+ const std::string YES("yes");
+ const std::string NO("no");
+ const std::string OWNER_NAME("name");
}
@@ -360,17 +541,29 @@ void MessageGroupManager::getState(qpid::framing::FieldTable& state ) const
framing::FieldTable group;
group.setString(GROUP_NAME, g->first);
- group.setString(GROUP_OWNER, g->second.owner);
- group.setInt(GROUP_ACQUIRED_CT, g->second.acquired);
+ if (g->second.getOwner()) {
+ group.setString(GROUP_OWNER, g->second.getOwner()->getName());
+ } else {
+ group.setString(GROUP_OWNER, std::string(""));
+ }
+ group.setInt(GROUP_ACQUIRED_CT, g->second.acquiredMsgs());
framing::Array positions(TYPE_CODE_UINT32);
- for (GroupState::PositionFifo::const_iterator p = g->second.members.begin();
- p != g->second.members.end(); ++p)
- positions.push_back(framing::Array::ValuePtr(new IntegerValue( *p )));
+ g->second.getPositions(positions);
group.setArray(GROUP_POSITIONS, positions);
groupState.push_back(framing::Array::ValuePtr(new FieldTableValue(group)));
}
state.setArray(GROUP_STATE, groupState);
+ framing::Array ownerState(TYPE_CODE_MAP);
+ for (ConsumerMap::const_iterator c = consumers.begin();
+ c != consumers.end(); ++c) {
+ framing::FieldTable owner;
+ owner.setString(OWNER_NAME, c->first);
+ owner.setString(CANCELLED, c->second.cancelled() ? YES : NO);
+ ownerState.push_back(framing::Array::ValuePtr(new FieldTableValue(owner)));
+ }
+ state.setArray(OWNER_STATE, ownerState);
+
QPID_LOG(debug, "Queue \"" << qName << "\": replicating message group state, key=" << groupIdHeader);
}
@@ -379,51 +572,86 @@ void MessageGroupManager::getState(qpid::framing::FieldTable& state ) const
void MessageGroupManager::setState(const qpid::framing::FieldTable& state)
{
using namespace qpid::framing;
+ consumers.clear();
messageGroups.clear();
freeGroups.clear();
cachedGroup = 0;
- framing::Array groupState(TYPE_CODE_MAP);
+ // set up the known owners
+ framing::Array ownerState(TYPE_CODE_MAP);
+ bool ok = state.getArray(OWNER_STATE, ownerState);
+ if (!ok) {
+ QPID_LOG(error, "Unable to find message group owner state information for queue \"" <<
+ qName << "\": cluster inconsistency error!");
+ return;
+ }
- bool ok = state.getArray(GROUP_STATE, groupState);
+ for (framing::Array::const_iterator c = ownerState.begin(); c != ownerState.end(); ++c) {
+ framing::FieldTable ownerMap;
+ ok = framing::getEncodedValue<FieldTable>(*c, ownerMap);
+ if (!ok) {
+ QPID_LOG(error, "Invalid message group owner information for queue \"" <<
+ qName << "\": table encoding error!");
+ return;
+ }
+ if (!ownerMap.isSet(OWNER_NAME) || !ownerMap.isSet(CANCELLED)) {
+ QPID_LOG(error, "Invalid message group owner information for queue \"" <<
+ qName << "\": fields missing error!");
+ return;
+ }
+
+ const std::string name = ownerMap.getAsString(OWNER_NAME);
+ ConsumerState& owner = consumers[name];
+ owner.setName(name);
+ if (ownerMap.getAsString(CANCELLED) == YES) {
+ owner.cancel();
+ }
+ }
+
+ // set up the known groups
+ framing::Array groupState(TYPE_CODE_MAP);
+ ok = state.getArray(GROUP_STATE, groupState);
if (!ok) {
QPID_LOG(error, "Unable to find message group state information for queue \"" <<
qName << "\": cluster inconsistency error!");
return;
}
- for (framing::Array::const_iterator g = groupState.begin();
- g != groupState.end(); ++g) {
- framing::FieldTable group;
- ok = framing::getEncodedValue<FieldTable>(*g, group);
+ for (framing::Array::const_iterator g = groupState.begin(); g != groupState.end(); ++g) {
+ framing::FieldTable groupMap;
+ ok = framing::getEncodedValue<FieldTable>(*g, groupMap);
if (!ok) {
QPID_LOG(error, "Invalid message group state information for queue \"" <<
qName << "\": table encoding error!");
return;
}
- MessageGroupManager::GroupState state;
- if (!group.isSet(GROUP_NAME) || !group.isSet(GROUP_OWNER) || !group.isSet(GROUP_ACQUIRED_CT)) {
+ if (!groupMap.isSet(GROUP_NAME) || !groupMap.isSet(GROUP_OWNER) || !groupMap.isSet(GROUP_ACQUIRED_CT)) {
QPID_LOG(error, "Invalid message group state information for queue \"" <<
qName << "\": fields missing error!");
return;
}
- state.group = group.getAsString(GROUP_NAME);
- state.owner = group.getAsString(GROUP_OWNER);
- state.acquired = group.getAsInt(GROUP_ACQUIRED_CT);
+
+ // replicate the group state
+ std::string name = groupMap.getAsString(GROUP_NAME);
+ MessageGroupManager::GroupState& group = messageGroups[name];
+ assert(group.getName().empty());
+ group.setName(name);
+ group.setAcquired(groupMap.getAsInt(GROUP_ACQUIRED_CT));
framing::Array positions(TYPE_CODE_UINT32);
- ok = group.getArray(GROUP_POSITIONS, positions);
+ ok = groupMap.getArray(GROUP_POSITIONS, positions);
if (!ok) {
QPID_LOG(error, "Invalid message group state information for queue \"" <<
qName << "\": position encoding error!");
return;
}
-
- for (Array::const_iterator p = positions.begin(); p != positions.end(); ++p)
- state.members.push_back((*p)->getIntegerValue<uint32_t, 4>());
- messageGroups[state.group] = state;
- if (!state.owned()) {
- assert(state.members.size());
- freeGroups[state.members.front()] = &messageGroups[state.group];
+ group.setPositions(positions);
+
+ const std::string ownerName = groupMap.getAsString(GROUP_OWNER);
+ if (!ownerName.empty()) {
+ ConsumerState& owner = consumers[ownerName];
+ group.setOwner(owner);
+ } else {
+ freeGroups.addGroup(group);
}
}