diff options
Diffstat (limited to 'cpp/src/qpid/broker/MessageGroupManager.cpp')
-rw-r--r-- | cpp/src/qpid/broker/MessageGroupManager.cpp | 411 |
1 files changed, 0 insertions, 411 deletions
diff --git a/cpp/src/qpid/broker/MessageGroupManager.cpp b/cpp/src/qpid/broker/MessageGroupManager.cpp deleted file mode 100644 index 07b05f3b92..0000000000 --- a/cpp/src/qpid/broker/MessageGroupManager.cpp +++ /dev/null @@ -1,411 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/framing/FieldTable.h" -#include "qpid/types/Variant.h" -#include "qpid/log/Statement.h" -#include "qpid/broker/Queue.h" -#include "qpid/broker/MessageGroupManager.h" - -using namespace qpid::broker; - -namespace { - const std::string GROUP_QUERY_KEY("qpid.message_group_queue"); - const std::string GROUP_HEADER_KEY("group_header_key"); - const std::string GROUP_STATE_KEY("group_state"); - const std::string GROUP_ID_KEY("group_id"); - const std::string GROUP_MSG_COUNT("msg_count"); - const std::string GROUP_TIMESTAMP("timestamp"); - const std::string GROUP_CONSUMER("consumer"); -} - - -const std::string MessageGroupManager::qpidMessageGroupKey("qpid.group_header_key"); -const std::string MessageGroupManager::qpidSharedGroup("qpid.shared_msg_group"); -const std::string MessageGroupManager::qpidMessageGroupTimestamp("qpid.group_timestamp"); - - -void MessageGroupManager::unFree( const GroupState& state ) -{ - GroupFifo::iterator pos = freeGroups.find( state.members.front() ); - assert( pos != freeGroups.end() && pos->second == &state ); - freeGroups.erase( pos ); -} - -void MessageGroupManager::own( GroupState& state, const std::string& owner ) -{ - state.owner = owner; - unFree( state ); -} - -void MessageGroupManager::disown( GroupState& state ) -{ - state.owner.clear(); - assert(state.members.size()); - assert(freeGroups.find(state.members.front()) == freeGroups.end()); - freeGroups[state.members.front()] = &state; -} - -const std::string MessageGroupManager::getGroupId( const QueuedMessage& qm ) const -{ - const qpid::framing::FieldTable* headers = qm.payload->getApplicationHeaders(); - if (!headers) return defaultGroupId; - qpid::framing::FieldTable::ValuePtr id = headers->get( groupIdHeader ); - if (!id || !id->convertsTo<std::string>()) return defaultGroupId; - return id->get<std::string>(); -} - - -void MessageGroupManager::enqueued( const QueuedMessage& qm ) -{ - // @todo KAG optimization - store reference to group state in QueuedMessage - // issue: const-ness?? - std::string group( getGroupId(qm) ); - GroupState &state(messageGroups[group]); - state.members.push_back(qm.position); - uint32_t total = state.members.size(); - QPID_LOG( trace, "group queue " << qName << - ": added message to group id=" << group << " total=" << total ); - if (total == 1) { - // newly created group, no owner - state.group = group; - assert(freeGroups.find(qm.position) == freeGroups.end()); - freeGroups[qm.position] = &state; - } -} - - -void MessageGroupManager::acquired( const QueuedMessage& qm ) -{ - // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage - // issue: const-ness?? - std::string group( getGroupId(qm) ); - GroupMap::iterator gs = messageGroups.find( group ); - assert( gs != messageGroups.end() ); - GroupState& state( gs->second ); - state.acquired += 1; - QPID_LOG( trace, "group queue " << qName << - ": acquired message in group id=" << group << " acquired=" << state.acquired ); -} - - -void MessageGroupManager::requeued( const QueuedMessage& qm ) -{ - // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage - // issue: const-ness?? - std::string group( getGroupId(qm) ); - GroupMap::iterator gs = messageGroups.find( group ); - assert( gs != messageGroups.end() ); - GroupState& state( gs->second ); - assert( state.acquired != 0 ); - state.acquired -= 1; - if (state.acquired == 0 && state.owned()) { - QPID_LOG( trace, "group queue " << qName << - ": consumer name=" << state.owner << " released group id=" << gs->first); - disown(state); - } - QPID_LOG( trace, "group queue " << qName << - ": requeued message to group id=" << group << " acquired=" << state.acquired ); -} - - -void MessageGroupManager::dequeued( const QueuedMessage& qm ) -{ - // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage - // issue: const-ness?? - std::string group( getGroupId(qm) ); - GroupMap::iterator gs = messageGroups.find( group ); - assert( gs != messageGroups.end() ); - GroupState& state( gs->second ); - assert( state.members.size() != 0 ); - assert( state.acquired != 0 ); - state.acquired -= 1; - - // likely to be at or near begin() if dequeued in order - bool reFreeNeeded = false; - if (state.members.front() == qm.position) { - if (!state.owned()) { - // will be on the freeGroups list if mgmt is dequeueing rather than a consumer! - // if on freelist, it is indexed by first member, which is about to be removed! - unFree(state); - reFreeNeeded = true; - } - state.members.pop_front(); - } else { - GroupState::PositionFifo::iterator pos = state.members.begin() + 1; - GroupState::PositionFifo::iterator end = state.members.end(); - while (pos != end) { - if (*pos == qm.position) { - state.members.erase(pos); - break; - } - ++pos; - } - } - - uint32_t total = state.members.size(); - if (total == 0) { - QPID_LOG( trace, "group queue " << qName << ": deleting group id=" << gs->first); - messageGroups.erase( gs ); - } else if (state.acquired == 0 && state.owned()) { - QPID_LOG( trace, "group queue " << qName << - ": consumer name=" << state.owner << " released group id=" << gs->first); - disown(state); - } else if (reFreeNeeded) { - disown(state); - } - QPID_LOG( trace, "group queue " << qName << - ": dequeued message from group id=" << group << " total=" << total ); -} - -bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, QueuedMessage& next ) -{ - if (messages.empty()) - return false; - - if (!freeGroups.empty()) { - framing::SequenceNumber nextFree = freeGroups.begin()->first; - if (nextFree < c->position) { // next free group's msg is older than current position - bool ok = messages.find(nextFree, next); - (void) ok; assert( ok ); - } else { - if (!messages.next( c->position, next )) - return false; // shouldn't happen - should find nextFree - } - } else { // no free groups available - if (!messages.next( c->position, next )) - return false; - } - - do { - // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage - std::string group( getGroupId( next ) ); - GroupMap::iterator gs = messageGroups.find( group ); - assert( gs != messageGroups.end() ); - GroupState& state( gs->second ); - if (!state.owned() || state.owner == c->getName()) { - return true; - } - } while (messages.next( next.position, next )); - return false; -} - - -bool MessageGroupManager::allocate(const std::string& consumer, const QueuedMessage& qm) -{ - // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage - std::string group( getGroupId(qm) ); - GroupMap::iterator gs = messageGroups.find( group ); - assert( gs != messageGroups.end() ); - GroupState& state( gs->second ); - - if (!state.owned()) { - own( state, consumer ); - QPID_LOG( trace, "group queue " << qName << - ": consumer name=" << consumer << " has acquired group id=" << gs->first); - return true; - } - return state.owner == consumer; -} - -bool MessageGroupManager::nextBrowsableMessage( Consumer::shared_ptr& c, QueuedMessage& next ) -{ - // browse: allow access to any available msg, regardless of group ownership (?ok?) - if (!messages.empty() && messages.next(c->position, next)) - return true; - return false; -} - -void MessageGroupManager::query(qpid::types::Variant::Map& status) const -{ - /** Add a description of the current state of the message groups for this queue. - FORMAT: - { "qpid.message_group_queue": - { "group_header_key" : "<KEY>", - "group_state" : - [ { "group_id" : "<name>", - "msg_count" : <int>, - "timestamp" : <absTime>, - "consumer" : <consumer name> }, - {...} // one for each known group - ] - } - } - **/ - - assert(status.find(GROUP_QUERY_KEY) == status.end()); - qpid::types::Variant::Map state; - qpid::types::Variant::List groups; - - state[GROUP_HEADER_KEY] = groupIdHeader; - for (GroupMap::const_iterator g = messageGroups.begin(); - g != messageGroups.end(); ++g) { - qpid::types::Variant::Map info; - info[GROUP_ID_KEY] = g->first; - info[GROUP_MSG_COUNT] = g->second.members.size(); - info[GROUP_TIMESTAMP] = 0; /** @todo KAG - NEED HEAD MSG TIMESTAMP */ - info[GROUP_CONSUMER] = g->second.owner; - groups.push_back(info); - } - state[GROUP_STATE_KEY] = groups; - status[GROUP_QUERY_KEY] = state; -} - - -boost::shared_ptr<MessageGroupManager> MessageGroupManager::create( const std::string& qName, - Messages& messages, - const qpid::framing::FieldTable& settings ) -{ - boost::shared_ptr<MessageGroupManager> empty; - - if (settings.isSet(qpidMessageGroupKey)) { - - // @todo: remove once "sticky" consumers are supported - see QPID-3347 - if (!settings.isSet(qpidSharedGroup)) { - QPID_LOG( error, "Only shared groups are supported in this version of the broker. Use '--shared-groups' in qpid-config." ); - return empty; - } - - std::string headerKey = settings.getAsString(qpidMessageGroupKey); - if (headerKey.empty()) { - QPID_LOG( error, "A Message Group header key must be configured, queue=" << qName); - return empty; - } - unsigned int timestamp = settings.getAsInt(qpidMessageGroupTimestamp); - - boost::shared_ptr<MessageGroupManager> manager( new MessageGroupManager( headerKey, qName, messages, timestamp ) ); - - QPID_LOG( debug, "Configured Queue '" << qName << - "' for message grouping using header key '" << headerKey << "'" << - " (timestamp=" << timestamp << ")"); - return manager; - } - return empty; -} - -std::string MessageGroupManager::defaultGroupId; -void MessageGroupManager::setDefaults(const std::string& groupId) // static -{ - defaultGroupId = groupId; -} - -/** Cluster replication: - - state map format: - - { "group-state": [ {"name": <group-name>, - "owner": <consumer-name>-or-empty, - "acquired-ct": <acquired count>, - "positions": [Seqnumbers, ... ]}, - {...} - ] - } -*/ - -namespace { - const std::string GROUP_NAME("name"); - const std::string GROUP_OWNER("owner"); - const std::string GROUP_ACQUIRED_CT("acquired-ct"); - const std::string GROUP_POSITIONS("positions"); - const std::string GROUP_STATE("group-state"); -} - - -/** Runs on UPDATER to snapshot current state */ -void MessageGroupManager::getState(qpid::framing::FieldTable& state ) const -{ - using namespace qpid::framing; - state.clear(); - framing::Array groupState(TYPE_CODE_MAP); - for (GroupMap::const_iterator g = messageGroups.begin(); - g != messageGroups.end(); ++g) { - - framing::FieldTable group; - group.setString(GROUP_NAME, g->first); - group.setString(GROUP_OWNER, g->second.owner); - group.setInt(GROUP_ACQUIRED_CT, g->second.acquired); - framing::Array positions(TYPE_CODE_UINT32); - for (GroupState::PositionFifo::const_iterator p = g->second.members.begin(); - p != g->second.members.end(); ++p) - positions.push_back(framing::Array::ValuePtr(new IntegerValue( *p ))); - group.setArray(GROUP_POSITIONS, positions); - groupState.push_back(framing::Array::ValuePtr(new FieldTableValue(group))); - } - state.setArray(GROUP_STATE, groupState); - - QPID_LOG(debug, "Queue \"" << qName << "\": replicating message group state, key=" << groupIdHeader); -} - - -/** called on UPDATEE to set state from snapshot */ -void MessageGroupManager::setState(const qpid::framing::FieldTable& state) -{ - using namespace qpid::framing; - messageGroups.clear(); - //consumers.clear(); - freeGroups.clear(); - - framing::Array groupState(TYPE_CODE_MAP); - - bool ok = state.getArray(GROUP_STATE, groupState); - if (!ok) { - QPID_LOG(error, "Unable to find message group state information for queue \"" << - qName << "\": cluster inconsistency error!"); - return; - } - - for (framing::Array::const_iterator g = groupState.begin(); - g != groupState.end(); ++g) { - framing::FieldTable group; - ok = framing::getEncodedValue<FieldTable>(*g, group); - if (!ok) { - QPID_LOG(error, "Invalid message group state information for queue \"" << - qName << "\": table encoding error!"); - return; - } - MessageGroupManager::GroupState state; - if (!group.isSet(GROUP_NAME) || !group.isSet(GROUP_OWNER) || !group.isSet(GROUP_ACQUIRED_CT)) { - QPID_LOG(error, "Invalid message group state information for queue \"" << - qName << "\": fields missing error!"); - return; - } - state.group = group.getAsString(GROUP_NAME); - state.owner = group.getAsString(GROUP_OWNER); - state.acquired = group.getAsInt(GROUP_ACQUIRED_CT); - framing::Array positions(TYPE_CODE_UINT32); - ok = group.getArray(GROUP_POSITIONS, positions); - if (!ok) { - QPID_LOG(error, "Invalid message group state information for queue \"" << - qName << "\": position encoding error!"); - return; - } - - for (Array::const_iterator p = positions.begin(); p != positions.end(); ++p) - state.members.push_back((*p)->getIntegerValue<uint32_t, 4>()); - messageGroups[state.group] = state; - if (!state.owned()) { - assert(state.members.size()); - freeGroups[state.members.front()] = &messageGroups[state.group]; - } - } - - QPID_LOG(debug, "Queue \"" << qName << "\": message group state replicated, key =" << groupIdHeader) -} |