diff options
Diffstat (limited to 'cpp/src/qpid/broker/MessageGroupManager.cpp')
-rw-r--r-- | cpp/src/qpid/broker/MessageGroupManager.cpp | 176 |
1 files changed, 65 insertions, 111 deletions
diff --git a/cpp/src/qpid/broker/MessageGroupManager.cpp b/cpp/src/qpid/broker/MessageGroupManager.cpp index 15cd56a676..47e40a4794 100644 --- a/cpp/src/qpid/broker/MessageGroupManager.cpp +++ b/cpp/src/qpid/broker/MessageGroupManager.cpp @@ -1,4 +1,4 @@ -/* + /* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -20,10 +20,16 @@ */ #include "qpid/broker/MessageGroupManager.h" - -#include "qpid/broker/Queue.h" +#include "qpid/broker/Message.h" +#include "qpid/broker/Messages.h" +#include "qpid/broker/MessageDeque.h" +#include "qpid/broker/QueueSettings.h" +#include "qpid/framing/Array.h" +#include "qpid/framing/DeliveryProperties.h" #include "qpid/framing/FieldTable.h" #include "qpid/framing/FieldValue.h" +#include "qpid/framing/TypeCode.h" +#include "qpid/types/Variant.h" #include "qpid/log/Statement.h" #include "qpid/types/Variant.h" @@ -75,24 +81,16 @@ void MessageGroupManager::disown( GroupState& state ) freeGroups[state.members.front().position] = &state; } -MessageGroupManager::GroupState& MessageGroupManager::findGroup( const QueuedMessage& qm ) +MessageGroupManager::GroupState& MessageGroupManager::findGroup( const Message& m ) { - uint32_t thisMsg = qm.position.getValue(); + uint32_t thisMsg = m.getSequence().getValue(); if (cachedGroup && lastMsg == thisMsg) { hits++; return *cachedGroup; } - std::string group = defaultGroupId; - const qpid::framing::FieldTable* headers = qm.payload->getApplicationHeaders(); - if (headers) { - qpid::framing::FieldTable::ValuePtr id = headers->get( groupIdHeader ); - if (id && id->convertsTo<std::string>()) { - std::string tmp = id->get<std::string>(); - if (!tmp.empty()) // empty group is reserved - group = tmp; - } - } + std::string group = m.getPropertyAsString(groupIdHeader); + if (group.empty()) group = defaultGroupId; //empty group is reserved if (cachedGroup && group == lastGroup) { hits++; @@ -112,48 +110,48 @@ MessageGroupManager::GroupState& MessageGroupManager::findGroup( const QueuedMes } -void MessageGroupManager::enqueued( const QueuedMessage& qm ) +void MessageGroupManager::enqueued( const Message& m ) { // @todo KAG optimization - store reference to group state in QueuedMessage // issue: const-ness?? - GroupState& state = findGroup(qm); - GroupState::MessageState mState(qm.position); + GroupState& state = findGroup(m); + GroupState::MessageState mState(m.getSequence()); state.members.push_back(mState); uint32_t total = state.members.size(); QPID_LOG( trace, "group queue " << qName << ": added message to group id=" << state.group << " total=" << total ); if (total == 1) { // newly created group, no owner - assert(freeGroups.find(qm.position) == freeGroups.end()); - freeGroups[qm.position] = &state; + assert(freeGroups.find(m.getSequence()) == freeGroups.end()); + freeGroups[m.getSequence()] = &state; } } -void MessageGroupManager::acquired( const QueuedMessage& qm ) +void MessageGroupManager::acquired( const Message& m ) { // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage // issue: const-ness?? - GroupState& state = findGroup(qm); - GroupState::MessageFifo::iterator m = state.findMsg(qm.position); - assert(m != state.members.end()); - m->acquired = true; + GroupState& state = findGroup(m); + GroupState::MessageFifo::iterator gm = state.findMsg(m.getSequence()); + assert(gm != state.members.end()); + gm->acquired = true; state.acquired += 1; QPID_LOG( trace, "group queue " << qName << ": acquired message in group id=" << state.group << " acquired=" << state.acquired ); } -void MessageGroupManager::requeued( const QueuedMessage& qm ) +void MessageGroupManager::requeued( const Message& m ) { // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage // issue: const-ness?? - GroupState& state = findGroup(qm); + GroupState& state = findGroup(m); assert( state.acquired != 0 ); state.acquired -= 1; - GroupState::MessageFifo::iterator m = state.findMsg(qm.position); - assert(m != state.members.end()); - m->acquired = false; + GroupState::MessageFifo::iterator i = state.findMsg(m.getSequence()); + assert(i != state.members.end()); + i->acquired = false; if (state.acquired == 0 && state.owned()) { QPID_LOG( trace, "group queue " << qName << ": consumer name=" << state.owner << " released group id=" << state.group); @@ -164,14 +162,14 @@ void MessageGroupManager::requeued( const QueuedMessage& qm ) } -void MessageGroupManager::dequeued( const QueuedMessage& qm ) +void MessageGroupManager::dequeued( const Message& m ) { // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage // issue: const-ness?? - GroupState& state = findGroup(qm); - GroupState::MessageFifo::iterator m = state.findMsg(qm.position); - assert(m != state.members.end()); - if (m->acquired) { + GroupState& state = findGroup(m); + GroupState::MessageFifo::iterator i = state.findMsg(m.getSequence()); + assert(i != state.members.end()); + if (i->acquired) { assert( state.acquired != 0 ); state.acquired -= 1; } @@ -179,7 +177,7 @@ void MessageGroupManager::dequeued( const QueuedMessage& qm ) // special case if qm is first (oldest) message in the group: // may need to re-insert it back on the freeGroups list, as the index will change bool reFreeNeeded = false; - if (m == state.members.begin()) { + if (i == state.members.begin()) { if (!state.owned()) { // will be on the freeGroups list if mgmt is dequeueing rather than a consumer! // if on freelist, it is indexed by first member, which is about to be removed! @@ -188,7 +186,7 @@ void MessageGroupManager::dequeued( const QueuedMessage& qm ) } state.members.pop_front(); } else { - state.members.erase(m); + state.members.erase(i); } uint32_t total = state.members.size(); @@ -206,6 +204,12 @@ void MessageGroupManager::dequeued( const QueuedMessage& qm ) QPID_LOG( trace, "group queue " << qName << ": consumer name=" << state.owner << " released group id=" << state.group); disown(state); + MessageDeque* md = dynamic_cast<MessageDeque*>(&messages); + if (md) { + md->resetCursors(); + } else { + QPID_LOG(warning, "Could not reset cursors for message group, unexpected container type"); + } } else if (reFreeNeeded) { disown(state); } @@ -215,55 +219,27 @@ MessageGroupManager::~MessageGroupManager() { QPID_LOG( debug, "group queue " << qName << " cache results: hits=" << hits << " misses=" << misses ); } -bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, QueuedMessage& next ) + +bool MessageGroupManager::acquire(const std::string& consumer, Message& m) { - if (!messages.size()) - return false; + if (m.getState() == AVAILABLE) { + // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage + GroupState& state = findGroup(m); - next.position = c->getPosition(); - if (!freeGroups.empty()) { - const framing::SequenceNumber& nextFree = freeGroups.begin()->first; - if (nextFree <= next.position) { // take oldest free - next.position = nextFree; - --next.position; + if (!state.owned()) { + own( state, consumer ); + QPID_LOG( trace, "group queue " << qName << + ": consumer name=" << consumer << " has acquired group id=" << state.group); } - } - - while (messages.browse( next.position, next, true )) { - GroupState& group = findGroup(next); - if (!group.owned()) { - //TODO: make acquire more efficient when we already have the message in question - if (group.members.front().position == next.position && messages.acquire(next.position, next)) { // only take from head! - return true; - } - QPID_LOG(debug, "Skipping " << next.position << " since group " << group.group - << "'s head message still pending. pos=" << group.members.front().position); - } else if (group.owner == c->getName() && messages.acquire(next.position, next)) { + if (state.owner == consumer) { + m.setState(ACQUIRED); return true; + } else { + return false; } + } else { + return false; } - return false; -} - - -bool MessageGroupManager::allocate(const std::string& consumer, const QueuedMessage& qm) -{ - // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage - GroupState& state = findGroup(qm); - - if (!state.owned()) { - own( state, consumer ); - QPID_LOG( trace, "group queue " << qName << - ": consumer name=" << consumer << " has acquired group id=" << state.group); - return true; - } - return state.owner == consumer; -} - -bool MessageGroupManager::nextBrowsableMessage( Consumer::shared_ptr& c, QueuedMessage& next ) -{ - // browse: allow access to any available msg, regardless of group ownership (?ok?) - return messages.browse(c->getPosition(), next, false); } void MessageGroupManager::query(qpid::types::Variant::Map& status) const @@ -296,11 +272,9 @@ void MessageGroupManager::query(qpid::types::Variant::Map& status) const // set the timestamp to the arrival timestamp of the oldest (HEAD) message, if present info[GROUP_TIMESTAMP] = 0; if (g->second.members.size() != 0) { - QueuedMessage qm; - if (messages.find(g->second.members.front().position, qm) && - qm.payload && - qm.payload->hasProperties<framing::DeliveryProperties>()) { - info[GROUP_TIMESTAMP] = qm.payload->getProperties<framing::DeliveryProperties>()->getTimestamp(); + Message* m = messages.find(g->second.members.front().position, 0); + if (m && m->getTimestamp()) { + info[GROUP_TIMESTAMP] = m->getTimestamp(); } } info[GROUP_CONSUMER] = g->second.owner; @@ -313,33 +287,13 @@ void MessageGroupManager::query(qpid::types::Variant::Map& status) const boost::shared_ptr<MessageGroupManager> MessageGroupManager::create( const std::string& qName, Messages& messages, - const qpid::framing::FieldTable& settings ) + const QueueSettings& settings ) { - boost::shared_ptr<MessageGroupManager> empty; - - if (settings.isSet(qpidMessageGroupKey)) { - - // @todo: remove once "sticky" consumers are supported - see QPID-3347 - if (!settings.isSet(qpidSharedGroup)) { - QPID_LOG( error, "Only shared groups are supported in this version of the broker. Use '--shared-groups' in qpid-config." ); - return empty; - } - - std::string headerKey = settings.getAsString(qpidMessageGroupKey); - if (headerKey.empty()) { - QPID_LOG( error, "A Message Group header key must be configured, queue=" << qName); - return empty; - } - unsigned int timestamp = settings.getAsInt(qpidMessageGroupTimestamp); - - boost::shared_ptr<MessageGroupManager> manager( new MessageGroupManager( headerKey, qName, messages, timestamp ) ); - - QPID_LOG( debug, "Configured Queue '" << qName << - "' for message grouping using header key '" << headerKey << "'" << - " (timestamp=" << timestamp << ")"); - return manager; - } - return empty; + boost::shared_ptr<MessageGroupManager> manager( new MessageGroupManager( settings.groupKey, qName, messages, settings.addTimestamp ) ); + QPID_LOG( debug, "Configured Queue '" << qName << + "' for message grouping using header key '" << settings.groupKey << "'" << + " (timestamp=" << settings.addTimestamp << ")"); + return manager; } std::string MessageGroupManager::defaultGroupId; |