/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #include "qpid/broker/MessageGroupManager.h" #include "qpid/broker/Message.h" #include "qpid/broker/Messages.h" #include "qpid/broker/MessageDeque.h" #include "qpid/broker/QueueSettings.h" #include "qpid/framing/Array.h" #include "qpid/framing/DeliveryProperties.h" #include "qpid/framing/FieldTable.h" #include "qpid/framing/FieldValue.h" #include "qpid/framing/TypeCode.h" #include "qpid/types/Variant.h" #include "qpid/log/Statement.h" #include "qpid/types/Variant.h" using namespace qpid::broker; namespace { const std::string GROUP_QUERY_KEY("qpid.message_group_queue"); const std::string GROUP_HEADER_KEY("group_header_key"); const std::string GROUP_STATE_KEY("group_state"); const std::string GROUP_ID_KEY("group_id"); const std::string GROUP_MSG_COUNT("msg_count"); const std::string GROUP_TIMESTAMP("timestamp"); const std::string GROUP_CONSUMER("consumer"); } const std::string MessageGroupManager::qpidMessageGroupKey("qpid.group_header_key"); const std::string MessageGroupManager::qpidSharedGroup("qpid.shared_msg_group"); const std::string MessageGroupManager::qpidMessageGroupTimestamp("qpid.group_timestamp"); /** return an iterator to the message at position, or members.end() if not found */ MessageGroupManager::GroupState::MessageFifo::iterator MessageGroupManager::GroupState::findMsg(const qpid::framing::SequenceNumber &position) { MessageState mState(position); MessageFifo::iterator found = std::lower_bound(members.begin(), members.end(), mState); return (found->position == position) ? found : members.end(); } void MessageGroupManager::unFree( const GroupState& state ) { GroupFifo::iterator pos = freeGroups.find( state.members.front().position ); assert( pos != freeGroups.end() && pos->second == &state ); freeGroups.erase( pos ); } void MessageGroupManager::own( GroupState& state, const std::string& owner ) { state.owner = owner; unFree( state ); } void MessageGroupManager::disown( GroupState& state ) { state.owner.clear(); assert(state.members.size()); assert(freeGroups.find(state.members.front().position) == freeGroups.end()); freeGroups[state.members.front().position] = &state; } MessageGroupManager::GroupState& MessageGroupManager::findGroup( const Message& m ) { uint32_t thisMsg = m.getSequence().getValue(); if (cachedGroup && lastMsg == thisMsg) { hits++; return *cachedGroup; } std::string group = m.getPropertyAsString(groupIdHeader); if (group.empty()) group = defaultGroupId; //empty group is reserved if (cachedGroup && group == lastGroup) { hits++; lastMsg = thisMsg; return *cachedGroup; } misses++; GroupState& found = messageGroups[group]; if (found.group.empty()) found.group = group; // new group, assign name lastMsg = thisMsg; lastGroup = group; cachedGroup = &found; return found; } void MessageGroupManager::enqueued( const Message& m ) { // @todo KAG optimization - store reference to group state in QueuedMessage // issue: const-ness?? GroupState& state = findGroup(m); GroupState::MessageState mState(m.getSequence()); state.members.push_back(mState); uint32_t total = state.members.size(); QPID_LOG( trace, "group queue " << qName << ": added message to group id=" << state.group << " total=" << total ); if (total == 1) { // newly created group, no owner assert(freeGroups.find(m.getSequence()) == freeGroups.end()); freeGroups[m.getSequence()] = &state; } } void MessageGroupManager::acquired( const Message& m ) { // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage // issue: const-ness?? GroupState& state = findGroup(m); GroupState::MessageFifo::iterator gm = state.findMsg(m.getSequence()); assert(gm != state.members.end()); gm->acquired = true; state.acquired += 1; QPID_LOG( trace, "group queue " << qName << ": acquired message in group id=" << state.group << " acquired=" << state.acquired ); } void MessageGroupManager::requeued( const Message& m ) { // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage // issue: const-ness?? GroupState& state = findGroup(m); assert( state.acquired != 0 ); state.acquired -= 1; GroupState::MessageFifo::iterator i = state.findMsg(m.getSequence()); assert(i != state.members.end()); i->acquired = false; if (state.acquired == 0 && state.owned()) { QPID_LOG( trace, "group queue " << qName << ": consumer name=" << state.owner << " released group id=" << state.group); disown(state); } QPID_LOG( trace, "group queue " << qName << ": requeued message to group id=" << state.group << " acquired=" << state.acquired ); } void MessageGroupManager::dequeued( const Message& m ) { // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage // issue: const-ness?? GroupState& state = findGroup(m); GroupState::MessageFifo::iterator i = state.findMsg(m.getSequence()); assert(i != state.members.end()); if (i->acquired) { assert( state.acquired != 0 ); state.acquired -= 1; } // special case if qm is first (oldest) message in the group: // may need to re-insert it back on the freeGroups list, as the index will change bool reFreeNeeded = false; if (i == state.members.begin()) { if (!state.owned()) { // will be on the freeGroups list if mgmt is dequeueing rather than a consumer! // if on freelist, it is indexed by first member, which is about to be removed! unFree(state); reFreeNeeded = true; } state.members.pop_front(); } else { state.members.erase(i); } uint32_t total = state.members.size(); QPID_LOG( trace, "group queue " << qName << ": dequeued message from group id=" << state.group << " total=" << total ); if (total == 0) { QPID_LOG( trace, "group queue " << qName << ": deleting group id=" << state.group); if (cachedGroup == &state) { cachedGroup = 0; } std::string key(state.group); messageGroups.erase( key ); } else if (state.acquired == 0 && state.owned()) { QPID_LOG( trace, "group queue " << qName << ": consumer name=" << state.owner << " released group id=" << state.group); disown(state); MessageDeque* md = dynamic_cast(&messages); if (md) { md->resetCursors(); } else { QPID_LOG(warning, "Could not reset cursors for message group, unexpected container type"); } } else if (reFreeNeeded) { disown(state); } } MessageGroupManager::~MessageGroupManager() { QPID_LOG( debug, "group queue " << qName << " cache results: hits=" << hits << " misses=" << misses ); } bool MessageGroupManager::acquire(const std::string& consumer, Message& m) { if (m.getState() == AVAILABLE) { // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage GroupState& state = findGroup(m); if (!state.owned()) { own( state, consumer ); QPID_LOG( trace, "group queue " << qName << ": consumer name=" << consumer << " has acquired group id=" << state.group); } if (state.owner == consumer) { m.setState(ACQUIRED); return true; } else { return false; } } else { return false; } } void MessageGroupManager::query(qpid::types::Variant::Map& status) const { /** Add a description of the current state of the message groups for this queue. FORMAT: { "qpid.message_group_queue": { "group_header_key" : "", "group_state" : [ { "group_id" : "", "msg_count" : , "timestamp" : , "consumer" : }, {...} // one for each known group ] } } **/ assert(status.find(GROUP_QUERY_KEY) == status.end()); qpid::types::Variant::Map state; qpid::types::Variant::List groups; state[GROUP_HEADER_KEY] = groupIdHeader; for (GroupMap::const_iterator g = messageGroups.begin(); g != messageGroups.end(); ++g) { qpid::types::Variant::Map info; info[GROUP_ID_KEY] = g->first; info[GROUP_MSG_COUNT] = (uint64_t) g->second.members.size(); // set the timestamp to the arrival timestamp of the oldest (HEAD) message, if present info[GROUP_TIMESTAMP] = 0; if (g->second.members.size() != 0) { Message* m = messages.find(g->second.members.front().position, 0); if (m && m->getTimestamp()) { info[GROUP_TIMESTAMP] = m->getTimestamp(); } } info[GROUP_CONSUMER] = g->second.owner; groups.push_back(info); } state[GROUP_STATE_KEY] = groups; status[GROUP_QUERY_KEY] = state; } boost::shared_ptr MessageGroupManager::create( const std::string& qName, Messages& messages, const QueueSettings& settings ) { boost::shared_ptr manager( new MessageGroupManager( settings.groupKey, qName, messages, settings.addTimestamp ) ); QPID_LOG( debug, "Configured Queue '" << qName << "' for message grouping using header key '" << settings.groupKey << "'" << " (timestamp=" << settings.addTimestamp << ")"); return manager; } std::string MessageGroupManager::defaultGroupId; void MessageGroupManager::setDefaults(const std::string& groupId) // static { defaultGroupId = groupId; } namespace { const std::string GROUP_NAME("name"); const std::string GROUP_OWNER("owner"); const std::string GROUP_ACQUIRED_CT("acquired-ct"); const std::string GROUP_POSITIONS("positions"); const std::string GROUP_ACQUIRED_MSGS("acquired-msgs"); const std::string GROUP_STATE("group-state"); }