summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/MessageGroupManager.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/MessageGroupManager.cpp84
1 files changed, 56 insertions, 28 deletions
diff --git a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
index 5f450cd556..22253532cb 100644
--- a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
+++ b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
@@ -43,9 +43,18 @@ const std::string MessageGroupManager::qpidSharedGroup("qpid.shared_msg_group");
const std::string MessageGroupManager::qpidMessageGroupTimestamp("qpid.group_timestamp");
+/** return an iterator to the message at position, or members.end() if not found */
+MessageGroupManager::GroupState::MessageFifo::iterator
+MessageGroupManager::GroupState::findMsg(const qpid::framing::SequenceNumber &position)
+{
+ MessageState mState(position);
+ MessageFifo::iterator found = std::lower_bound(members.begin(), members.end(), mState);
+ return (found->position == position) ? found : members.end();
+}
+
void MessageGroupManager::unFree( const GroupState& state )
{
- GroupFifo::iterator pos = freeGroups.find( state.members.front() );
+ GroupFifo::iterator pos = freeGroups.find( state.members.front().position );
assert( pos != freeGroups.end() && pos->second == &state );
freeGroups.erase( pos );
}
@@ -60,8 +69,8 @@ void MessageGroupManager::disown( GroupState& state )
{
state.owner.clear();
assert(state.members.size());
- assert(freeGroups.find(state.members.front()) == freeGroups.end());
- freeGroups[state.members.front()] = &state;
+ assert(freeGroups.find(state.members.front().position) == freeGroups.end());
+ freeGroups[state.members.front().position] = &state;
}
MessageGroupManager::GroupState& MessageGroupManager::findGroup( const QueuedMessage& qm )
@@ -106,7 +115,8 @@ void MessageGroupManager::enqueued( const QueuedMessage& qm )
// @todo KAG optimization - store reference to group state in QueuedMessage
// issue: const-ness??
GroupState& state = findGroup(qm);
- state.members.push_back(qm.position);
+ GroupState::MessageState mState(qm.position);
+ state.members.push_back(mState);
uint32_t total = state.members.size();
QPID_LOG( trace, "group queue " << qName <<
": added message to group id=" << state.group << " total=" << total );
@@ -123,7 +133,9 @@ void MessageGroupManager::acquired( const QueuedMessage& qm )
// @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
// issue: const-ness??
GroupState& state = findGroup(qm);
- assert(state.members.size()); // there are msgs present
+ GroupState::MessageFifo::iterator m = state.findMsg(qm.position);
+ assert(m != state.members.end());
+ m->acquired = true;
state.acquired += 1;
QPID_LOG( trace, "group queue " << qName <<
": acquired message in group id=" << state.group << " acquired=" << state.acquired );
@@ -137,6 +149,9 @@ void MessageGroupManager::requeued( const QueuedMessage& qm )
GroupState& state = findGroup(qm);
assert( state.acquired != 0 );
state.acquired -= 1;
+ GroupState::MessageFifo::iterator m = state.findMsg(qm.position);
+ assert(m != state.members.end());
+ m->acquired = false;
if (state.acquired == 0 && state.owned()) {
QPID_LOG( trace, "group queue " << qName <<
": consumer name=" << state.owner << " released group id=" << state.group);
@@ -152,13 +167,17 @@ void MessageGroupManager::dequeued( const QueuedMessage& qm )
// @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
// issue: const-ness??
GroupState& state = findGroup(qm);
- assert( state.members.size() != 0 );
- assert( state.acquired != 0 );
- state.acquired -= 1;
+ GroupState::MessageFifo::iterator m = state.findMsg(qm.position);
+ assert(m != state.members.end());
+ if (m->acquired) {
+ assert( state.acquired != 0 );
+ state.acquired -= 1;
+ }
- // likely to be at or near begin() if dequeued in order
+ // special case if qm is first (oldest) message in the group:
+ // may need to re-insert it back on the freeGroups list, as the index will change
bool reFreeNeeded = false;
- if (state.members.front() == qm.position) {
+ if (m == state.members.begin()) {
if (!state.owned()) {
// will be on the freeGroups list if mgmt is dequeueing rather than a consumer!
// if on freelist, it is indexed by first member, which is about to be removed!
@@ -167,15 +186,7 @@ void MessageGroupManager::dequeued( const QueuedMessage& qm )
}
state.members.pop_front();
} else {
- GroupState::PositionFifo::iterator pos = state.members.begin() + 1;
- GroupState::PositionFifo::iterator end = state.members.end();
- while (pos != end) {
- if (*pos == qm.position) {
- state.members.erase(pos);
- break;
- }
- ++pos;
- }
+ state.members.erase(m);
}
uint32_t total = state.members.size();
@@ -220,11 +231,11 @@ bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, Queued
GroupState& group = findGroup(next);
if (!group.owned()) {
//TODO: make acquire more efficient when we already have the message in question
- if (group.members.front() == next.position && messages.acquire(next.position, next)) { // only take from head!
+ if (group.members.front().position == next.position && messages.acquire(next.position, next)) { // only take from head!
return true;
}
QPID_LOG(debug, "Skipping " << next.position << " since group " << group.group
- << "'s head message still pending. pos=" << group.members.front());
+ << "'s head message still pending. pos=" << group.members.front().position);
} else if (group.owner == c->getName() && messages.acquire(next.position, next)) {
return true;
}
@@ -284,7 +295,7 @@ void MessageGroupManager::query(qpid::types::Variant::Map& status) const
info[GROUP_TIMESTAMP] = 0;
if (g->second.members.size() != 0) {
QueuedMessage qm;
- if (messages.find(g->second.members.front(), qm) &&
+ if (messages.find(g->second.members.front().position, qm) &&
qm.payload &&
qm.payload->hasProperties<framing::DeliveryProperties>()) {
info[GROUP_TIMESTAMP] = qm.payload->getProperties<framing::DeliveryProperties>()->getTimestamp();
@@ -353,6 +364,7 @@ namespace {
const std::string GROUP_OWNER("owner");
const std::string GROUP_ACQUIRED_CT("acquired-ct");
const std::string GROUP_POSITIONS("positions");
+ const std::string GROUP_ACQUIRED_MSGS("acquired-msgs");
const std::string GROUP_STATE("group-state");
}
@@ -371,10 +383,14 @@ void MessageGroupManager::getState(qpid::framing::FieldTable& state ) const
group.setString(GROUP_OWNER, g->second.owner);
group.setInt(GROUP_ACQUIRED_CT, g->second.acquired);
framing::Array positions(TYPE_CODE_UINT32);
- for (GroupState::PositionFifo::const_iterator p = g->second.members.begin();
- p != g->second.members.end(); ++p)
- positions.push_back(framing::Array::ValuePtr(new IntegerValue( *p )));
+ framing::Array acquiredMsgs(TYPE_CODE_BOOLEAN);
+ for (GroupState::MessageFifo::const_iterator p = g->second.members.begin();
+ p != g->second.members.end(); ++p) {
+ positions.push_back(framing::Array::ValuePtr(new IntegerValue( p->position )));
+ acquiredMsgs.push_back(framing::Array::ValuePtr(new BoolValue( p->acquired )));
+ }
group.setArray(GROUP_POSITIONS, positions);
+ group.setArray(GROUP_ACQUIRED_MSGS, acquiredMsgs);
groupState.push_back(framing::Array::ValuePtr(new FieldTableValue(group)));
}
state.setArray(GROUP_STATE, groupState);
@@ -425,13 +441,25 @@ void MessageGroupManager::setState(const qpid::framing::FieldTable& state)
qName << "\": position encoding error!");
return;
}
+ framing::Array acquiredMsgs(TYPE_CODE_BOOLEAN);
+ ok = group.getArray(GROUP_ACQUIRED_MSGS, acquiredMsgs);
+ if (!ok || positions.count() != acquiredMsgs.count()) {
+ QPID_LOG(error, "Invalid message group state information for queue \"" <<
+ qName << "\": acquired flag encoding error!");
+ return;
+ }
+
+ Array::const_iterator a = acquiredMsgs.begin();
+ for (Array::const_iterator p = positions.begin(); p != positions.end(); ++p) {
+ GroupState::MessageState mState((*p)->getIntegerValue<uint32_t, 4>());
+ mState.acquired = (*a++)->getIntegerValue<bool>();
+ state.members.push_back(mState);
+ }
- for (Array::const_iterator p = positions.begin(); p != positions.end(); ++p)
- state.members.push_back((*p)->getIntegerValue<uint32_t, 4>());
messageGroups[state.group] = state;
if (!state.owned()) {
assert(state.members.size());
- freeGroups[state.members.front()] = &messageGroups[state.group];
+ freeGroups[state.members.front().position] = &messageGroups[state.group];
}
}