diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-10-28 14:47:50 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-10-28 14:47:50 +0000 |
commit | 25866cc22314bed8ba6d85841c74cda435509433 (patch) | |
tree | 63dc6d2b7627361c55c8543c80f24dd484ef9c20 /cpp | |
parent | 973fb96e7abf08f940cdc777646268b3f93e7708 (diff) | |
download | qpid-python-25866cc22314bed8ba6d85841c74cda435509433.tar.gz |
QPID-3346: prevent taking non-head of free group, additional performance tweaks
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1190374 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/Makefile.am | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageGroupManager.cpp | 123 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageGroupManager.h | 17 | ||||
-rw-r--r-- | cpp/src/qpid/sys/unordered_map.h | 35 |
4 files changed, 119 insertions, 57 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 448a7fa3ca..e87cd3b4e0 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -499,6 +499,7 @@ libqpidcommon_la_SOURCES += \ qpid/sys/Waitable.h \ qpid/sys/alloca.h \ qpid/sys/uuid.h \ + qpid/sys/unordered_map.h \ qpid/amqp_0_10/Codecs.cpp if HAVE_SASL diff --git a/cpp/src/qpid/broker/MessageGroupManager.cpp b/cpp/src/qpid/broker/MessageGroupManager.cpp index 07b05f3b92..77108c10d5 100644 --- a/cpp/src/qpid/broker/MessageGroupManager.cpp +++ b/cpp/src/qpid/broker/MessageGroupManager.cpp @@ -64,13 +64,37 @@ void MessageGroupManager::disown( GroupState& state ) freeGroups[state.members.front()] = &state; } -const std::string MessageGroupManager::getGroupId( const QueuedMessage& qm ) const +MessageGroupManager::GroupState& MessageGroupManager::findGroup( const QueuedMessage& qm ) { + uint32_t thisMsg = qm.position.getValue(); + if (cachedGroup && lastMsg == thisMsg) { + hits++; + return *cachedGroup; + } + + std::string group = defaultGroupId; const qpid::framing::FieldTable* headers = qm.payload->getApplicationHeaders(); - if (!headers) return defaultGroupId; - qpid::framing::FieldTable::ValuePtr id = headers->get( groupIdHeader ); - if (!id || !id->convertsTo<std::string>()) return defaultGroupId; - return id->get<std::string>(); + if (headers) { + qpid::framing::FieldTable::ValuePtr id = headers->get( groupIdHeader ); + if (id && id->convertsTo<std::string>()) { + group = id->get<std::string>(); + } + } + + if (cachedGroup && group == lastGroup) { + hits++; + return *cachedGroup; + } + + misses++; + + GroupState& found = messageGroups[group]; + if (found.group.empty()) + found.group = group; // new group, assign name + lastMsg = thisMsg; + lastGroup = group; + cachedGroup = &found; + return found; } @@ -78,15 +102,13 @@ void MessageGroupManager::enqueued( const QueuedMessage& qm ) { // @todo KAG optimization - store reference to group state in QueuedMessage // issue: const-ness?? - std::string group( getGroupId(qm) ); - GroupState &state(messageGroups[group]); + GroupState& state = findGroup(qm); state.members.push_back(qm.position); uint32_t total = state.members.size(); QPID_LOG( trace, "group queue " << qName << - ": added message to group id=" << group << " total=" << total ); + ": added message to group id=" << state.group << " total=" << total ); if (total == 1) { // newly created group, no owner - state.group = group; assert(freeGroups.find(qm.position) == freeGroups.end()); freeGroups[qm.position] = &state; } @@ -97,13 +119,11 @@ void MessageGroupManager::acquired( const QueuedMessage& qm ) { // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage // issue: const-ness?? - std::string group( getGroupId(qm) ); - GroupMap::iterator gs = messageGroups.find( group ); - assert( gs != messageGroups.end() ); - GroupState& state( gs->second ); + GroupState& state = findGroup(qm); + assert(state.members.size()); // there are msgs present state.acquired += 1; QPID_LOG( trace, "group queue " << qName << - ": acquired message in group id=" << group << " acquired=" << state.acquired ); + ": acquired message in group id=" << state.group << " acquired=" << state.acquired ); } @@ -111,19 +131,16 @@ void MessageGroupManager::requeued( const QueuedMessage& qm ) { // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage // issue: const-ness?? - std::string group( getGroupId(qm) ); - GroupMap::iterator gs = messageGroups.find( group ); - assert( gs != messageGroups.end() ); - GroupState& state( gs->second ); + GroupState& state = findGroup(qm); assert( state.acquired != 0 ); state.acquired -= 1; if (state.acquired == 0 && state.owned()) { QPID_LOG( trace, "group queue " << qName << - ": consumer name=" << state.owner << " released group id=" << gs->first); + ": consumer name=" << state.owner << " released group id=" << state.group); disown(state); } QPID_LOG( trace, "group queue " << qName << - ": requeued message to group id=" << group << " acquired=" << state.acquired ); + ": requeued message to group id=" << state.group << " acquired=" << state.acquired ); } @@ -131,10 +148,7 @@ void MessageGroupManager::dequeued( const QueuedMessage& qm ) { // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage // issue: const-ness?? - std::string group( getGroupId(qm) ); - GroupMap::iterator gs = messageGroups.find( group ); - assert( gs != messageGroups.end() ); - GroupState& state( gs->second ); + GroupState& state = findGroup(qm); assert( state.members.size() != 0 ); assert( state.acquired != 0 ); state.acquired -= 1; @@ -162,49 +176,55 @@ void MessageGroupManager::dequeued( const QueuedMessage& qm ) } uint32_t total = state.members.size(); + QPID_LOG( trace, "group queue " << qName << + ": dequeued message from group id=" << state.group << " total=" << total ); + if (total == 0) { - QPID_LOG( trace, "group queue " << qName << ": deleting group id=" << gs->first); - messageGroups.erase( gs ); + QPID_LOG( trace, "group queue " << qName << ": deleting group id=" << state.group); + if (cachedGroup == &state) { + cachedGroup = 0; + } + std::string key(state.group); + messageGroups.erase( key ); } else if (state.acquired == 0 && state.owned()) { QPID_LOG( trace, "group queue " << qName << - ": consumer name=" << state.owner << " released group id=" << gs->first); + ": consumer name=" << state.owner << " released group id=" << state.group); disown(state); } else if (reFreeNeeded) { disown(state); } - QPID_LOG( trace, "group queue " << qName << - ": dequeued message from group id=" << group << " total=" << total ); } +MessageGroupManager::~MessageGroupManager() +{ + QPID_LOG( debug, "group queue " << qName << " cache results: hits=" << hits << " misses=" << misses ); +} bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, QueuedMessage& next ) { if (messages.empty()) return false; + next.position = c->position; if (!freeGroups.empty()) { - framing::SequenceNumber nextFree = freeGroups.begin()->first; - if (nextFree < c->position) { // next free group's msg is older than current position - bool ok = messages.find(nextFree, next); - (void) ok; assert( ok ); - } else { - if (!messages.next( c->position, next )) - return false; // shouldn't happen - should find nextFree + const framing::SequenceNumber& nextFree = freeGroups.begin()->first; + if (nextFree < next.position) { // a free message is older than current + next.position = nextFree; + --next.position; } - } else { // no free groups available - if (!messages.next( c->position, next )) - return false; } - do { - // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage - std::string group( getGroupId( next ) ); - GroupMap::iterator gs = messageGroups.find( group ); - assert( gs != messageGroups.end() ); - GroupState& state( gs->second ); - if (!state.owned() || state.owner == c->getName()) { + while (messages.next( next.position, next )) { + GroupState& group = findGroup(next); + if (!group.owned()) { + if (group.members.front() == next.position) { // only take from head! + return true; + } + QPID_LOG(debug, "Skipping " << next.position << " since group " << group.group + << "'s head message still pending. pos=" << group.members.front()); + } else if (group.owner == c->getName()) { return true; } - } while (messages.next( next.position, next )); + } return false; } @@ -212,15 +232,12 @@ bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, Queued bool MessageGroupManager::allocate(const std::string& consumer, const QueuedMessage& qm) { // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage - std::string group( getGroupId(qm) ); - GroupMap::iterator gs = messageGroups.find( group ); - assert( gs != messageGroups.end() ); - GroupState& state( gs->second ); + GroupState& state = findGroup(qm); if (!state.owned()) { own( state, consumer ); QPID_LOG( trace, "group queue " << qName << - ": consumer name=" << consumer << " has acquired group id=" << gs->first); + ": consumer name=" << consumer << " has acquired group id=" << state.group); return true; } return state.owner == consumer; @@ -360,8 +377,8 @@ void MessageGroupManager::setState(const qpid::framing::FieldTable& state) { using namespace qpid::framing; messageGroups.clear(); - //consumers.clear(); freeGroups.clear(); + cachedGroup = 0; framing::Array groupState(TYPE_CODE_MAP); diff --git a/cpp/src/qpid/broker/MessageGroupManager.h b/cpp/src/qpid/broker/MessageGroupManager.h index 6c81ec14d1..f4bffc4760 100644 --- a/cpp/src/qpid/broker/MessageGroupManager.h +++ b/cpp/src/qpid/broker/MessageGroupManager.h @@ -26,7 +26,7 @@ #include "qpid/broker/StatefulQueueObserver.h" #include "qpid/broker/MessageDistributor.h" - +#include "qpid/sys/unordered_map.h" namespace qpid { namespace broker { @@ -55,7 +55,8 @@ class MessageGroupManager : public StatefulQueueObserver, public MessageDistribu GroupState() : acquired(0) {} bool owned() const {return !owner.empty();} }; - typedef std::map<std::string, struct GroupState> GroupMap; + + typedef sys::unordered_map<std::string, struct GroupState> GroupMap; typedef std::map<framing::SequenceNumber, struct GroupState *> GroupFifo; GroupMap messageGroups; // index: group name @@ -66,7 +67,12 @@ class MessageGroupManager : public StatefulQueueObserver, public MessageDistribu static const std::string qpidSharedGroup; // if specified, one group can be consumed by multiple receivers static const std::string qpidMessageGroupTimestamp; - const std::string getGroupId( const QueuedMessage& qm ) const; + GroupState& findGroup( const QueuedMessage& qm ); + unsigned long hits, misses; // for debug + uint32_t lastMsg; + std::string lastGroup; + GroupState *cachedGroup; + void unFree( const GroupState& state ); void own( GroupState& state, const std::string& owner ); void disown( GroupState& state ); @@ -81,7 +87,10 @@ class MessageGroupManager : public StatefulQueueObserver, public MessageDistribu MessageGroupManager(const std::string& header, const std::string& _qName, Messages& container, unsigned int _timestamp=0 ) : StatefulQueueObserver(std::string("MessageGroupManager:") + header), - groupIdHeader( header ), timestamp(_timestamp), messages(container), qName(_qName) {} + groupIdHeader( header ), timestamp(_timestamp), messages(container), qName(_qName), + hits(0), misses(0), + lastMsg(0), cachedGroup(0) {} + virtual ~MessageGroupManager(); // QueueObserver iface void enqueued( const QueuedMessage& qm ); diff --git a/cpp/src/qpid/sys/unordered_map.h b/cpp/src/qpid/sys/unordered_map.h new file mode 100644 index 0000000000..5f7f9567c5 --- /dev/null +++ b/cpp/src/qpid/sys/unordered_map.h @@ -0,0 +1,35 @@ +#ifndef _sys_unordered_map_h +#define _sys_unordered_map_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// unordered_map include path is platform specific + +#ifdef _MSC_VER +# include <unordered_map> +#else +# include <tr1/unordered_map> +#endif /* _MSC_VER */ +namespace qpid { +namespace sys { + using std::tr1::unordered_map; +}} + + +#endif /* _sys_unordered_map_h */ |