summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-10-28 14:47:50 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-10-28 14:47:50 +0000
commit25866cc22314bed8ba6d85841c74cda435509433 (patch)
tree63dc6d2b7627361c55c8543c80f24dd484ef9c20 /cpp
parent973fb96e7abf08f940cdc777646268b3f93e7708 (diff)
downloadqpid-python-25866cc22314bed8ba6d85841c74cda435509433.tar.gz
QPID-3346: prevent taking non-head of free group, additional performance tweaks
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1190374 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/Makefile.am1
-rw-r--r--cpp/src/qpid/broker/MessageGroupManager.cpp123
-rw-r--r--cpp/src/qpid/broker/MessageGroupManager.h17
-rw-r--r--cpp/src/qpid/sys/unordered_map.h35
4 files changed, 119 insertions, 57 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 448a7fa3ca..e87cd3b4e0 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -499,6 +499,7 @@ libqpidcommon_la_SOURCES += \
qpid/sys/Waitable.h \
qpid/sys/alloca.h \
qpid/sys/uuid.h \
+ qpid/sys/unordered_map.h \
qpid/amqp_0_10/Codecs.cpp
if HAVE_SASL
diff --git a/cpp/src/qpid/broker/MessageGroupManager.cpp b/cpp/src/qpid/broker/MessageGroupManager.cpp
index 07b05f3b92..77108c10d5 100644
--- a/cpp/src/qpid/broker/MessageGroupManager.cpp
+++ b/cpp/src/qpid/broker/MessageGroupManager.cpp
@@ -64,13 +64,37 @@ void MessageGroupManager::disown( GroupState& state )
freeGroups[state.members.front()] = &state;
}
-const std::string MessageGroupManager::getGroupId( const QueuedMessage& qm ) const
+MessageGroupManager::GroupState& MessageGroupManager::findGroup( const QueuedMessage& qm )
{
+ uint32_t thisMsg = qm.position.getValue();
+ if (cachedGroup && lastMsg == thisMsg) {
+ hits++;
+ return *cachedGroup;
+ }
+
+ std::string group = defaultGroupId;
const qpid::framing::FieldTable* headers = qm.payload->getApplicationHeaders();
- if (!headers) return defaultGroupId;
- qpid::framing::FieldTable::ValuePtr id = headers->get( groupIdHeader );
- if (!id || !id->convertsTo<std::string>()) return defaultGroupId;
- return id->get<std::string>();
+ if (headers) {
+ qpid::framing::FieldTable::ValuePtr id = headers->get( groupIdHeader );
+ if (id && id->convertsTo<std::string>()) {
+ group = id->get<std::string>();
+ }
+ }
+
+ if (cachedGroup && group == lastGroup) {
+ hits++;
+ return *cachedGroup;
+ }
+
+ misses++;
+
+ GroupState& found = messageGroups[group];
+ if (found.group.empty())
+ found.group = group; // new group, assign name
+ lastMsg = thisMsg;
+ lastGroup = group;
+ cachedGroup = &found;
+ return found;
}
@@ -78,15 +102,13 @@ void MessageGroupManager::enqueued( const QueuedMessage& qm )
{
// @todo KAG optimization - store reference to group state in QueuedMessage
// issue: const-ness??
- std::string group( getGroupId(qm) );
- GroupState &state(messageGroups[group]);
+ GroupState& state = findGroup(qm);
state.members.push_back(qm.position);
uint32_t total = state.members.size();
QPID_LOG( trace, "group queue " << qName <<
- ": added message to group id=" << group << " total=" << total );
+ ": added message to group id=" << state.group << " total=" << total );
if (total == 1) {
// newly created group, no owner
- state.group = group;
assert(freeGroups.find(qm.position) == freeGroups.end());
freeGroups[qm.position] = &state;
}
@@ -97,13 +119,11 @@ void MessageGroupManager::acquired( const QueuedMessage& qm )
{
// @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
// issue: const-ness??
- std::string group( getGroupId(qm) );
- GroupMap::iterator gs = messageGroups.find( group );
- assert( gs != messageGroups.end() );
- GroupState& state( gs->second );
+ GroupState& state = findGroup(qm);
+ assert(state.members.size()); // there are msgs present
state.acquired += 1;
QPID_LOG( trace, "group queue " << qName <<
- ": acquired message in group id=" << group << " acquired=" << state.acquired );
+ ": acquired message in group id=" << state.group << " acquired=" << state.acquired );
}
@@ -111,19 +131,16 @@ void MessageGroupManager::requeued( const QueuedMessage& qm )
{
// @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
// issue: const-ness??
- std::string group( getGroupId(qm) );
- GroupMap::iterator gs = messageGroups.find( group );
- assert( gs != messageGroups.end() );
- GroupState& state( gs->second );
+ GroupState& state = findGroup(qm);
assert( state.acquired != 0 );
state.acquired -= 1;
if (state.acquired == 0 && state.owned()) {
QPID_LOG( trace, "group queue " << qName <<
- ": consumer name=" << state.owner << " released group id=" << gs->first);
+ ": consumer name=" << state.owner << " released group id=" << state.group);
disown(state);
}
QPID_LOG( trace, "group queue " << qName <<
- ": requeued message to group id=" << group << " acquired=" << state.acquired );
+ ": requeued message to group id=" << state.group << " acquired=" << state.acquired );
}
@@ -131,10 +148,7 @@ void MessageGroupManager::dequeued( const QueuedMessage& qm )
{
// @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
// issue: const-ness??
- std::string group( getGroupId(qm) );
- GroupMap::iterator gs = messageGroups.find( group );
- assert( gs != messageGroups.end() );
- GroupState& state( gs->second );
+ GroupState& state = findGroup(qm);
assert( state.members.size() != 0 );
assert( state.acquired != 0 );
state.acquired -= 1;
@@ -162,49 +176,55 @@ void MessageGroupManager::dequeued( const QueuedMessage& qm )
}
uint32_t total = state.members.size();
+ QPID_LOG( trace, "group queue " << qName <<
+ ": dequeued message from group id=" << state.group << " total=" << total );
+
if (total == 0) {
- QPID_LOG( trace, "group queue " << qName << ": deleting group id=" << gs->first);
- messageGroups.erase( gs );
+ QPID_LOG( trace, "group queue " << qName << ": deleting group id=" << state.group);
+ if (cachedGroup == &state) {
+ cachedGroup = 0;
+ }
+ std::string key(state.group);
+ messageGroups.erase( key );
} else if (state.acquired == 0 && state.owned()) {
QPID_LOG( trace, "group queue " << qName <<
- ": consumer name=" << state.owner << " released group id=" << gs->first);
+ ": consumer name=" << state.owner << " released group id=" << state.group);
disown(state);
} else if (reFreeNeeded) {
disown(state);
}
- QPID_LOG( trace, "group queue " << qName <<
- ": dequeued message from group id=" << group << " total=" << total );
}
+MessageGroupManager::~MessageGroupManager()
+{
+ QPID_LOG( debug, "group queue " << qName << " cache results: hits=" << hits << " misses=" << misses );
+}
bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, QueuedMessage& next )
{
if (messages.empty())
return false;
+ next.position = c->position;
if (!freeGroups.empty()) {
- framing::SequenceNumber nextFree = freeGroups.begin()->first;
- if (nextFree < c->position) { // next free group's msg is older than current position
- bool ok = messages.find(nextFree, next);
- (void) ok; assert( ok );
- } else {
- if (!messages.next( c->position, next ))
- return false; // shouldn't happen - should find nextFree
+ const framing::SequenceNumber& nextFree = freeGroups.begin()->first;
+ if (nextFree < next.position) { // a free message is older than current
+ next.position = nextFree;
+ --next.position;
}
- } else { // no free groups available
- if (!messages.next( c->position, next ))
- return false;
}
- do {
- // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
- std::string group( getGroupId( next ) );
- GroupMap::iterator gs = messageGroups.find( group );
- assert( gs != messageGroups.end() );
- GroupState& state( gs->second );
- if (!state.owned() || state.owner == c->getName()) {
+ while (messages.next( next.position, next )) {
+ GroupState& group = findGroup(next);
+ if (!group.owned()) {
+ if (group.members.front() == next.position) { // only take from head!
+ return true;
+ }
+ QPID_LOG(debug, "Skipping " << next.position << " since group " << group.group
+ << "'s head message still pending. pos=" << group.members.front());
+ } else if (group.owner == c->getName()) {
return true;
}
- } while (messages.next( next.position, next ));
+ }
return false;
}
@@ -212,15 +232,12 @@ bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, Queued
bool MessageGroupManager::allocate(const std::string& consumer, const QueuedMessage& qm)
{
// @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
- std::string group( getGroupId(qm) );
- GroupMap::iterator gs = messageGroups.find( group );
- assert( gs != messageGroups.end() );
- GroupState& state( gs->second );
+ GroupState& state = findGroup(qm);
if (!state.owned()) {
own( state, consumer );
QPID_LOG( trace, "group queue " << qName <<
- ": consumer name=" << consumer << " has acquired group id=" << gs->first);
+ ": consumer name=" << consumer << " has acquired group id=" << state.group);
return true;
}
return state.owner == consumer;
@@ -360,8 +377,8 @@ void MessageGroupManager::setState(const qpid::framing::FieldTable& state)
{
using namespace qpid::framing;
messageGroups.clear();
- //consumers.clear();
freeGroups.clear();
+ cachedGroup = 0;
framing::Array groupState(TYPE_CODE_MAP);
diff --git a/cpp/src/qpid/broker/MessageGroupManager.h b/cpp/src/qpid/broker/MessageGroupManager.h
index 6c81ec14d1..f4bffc4760 100644
--- a/cpp/src/qpid/broker/MessageGroupManager.h
+++ b/cpp/src/qpid/broker/MessageGroupManager.h
@@ -26,7 +26,7 @@
#include "qpid/broker/StatefulQueueObserver.h"
#include "qpid/broker/MessageDistributor.h"
-
+#include "qpid/sys/unordered_map.h"
namespace qpid {
namespace broker {
@@ -55,7 +55,8 @@ class MessageGroupManager : public StatefulQueueObserver, public MessageDistribu
GroupState() : acquired(0) {}
bool owned() const {return !owner.empty();}
};
- typedef std::map<std::string, struct GroupState> GroupMap;
+
+ typedef sys::unordered_map<std::string, struct GroupState> GroupMap;
typedef std::map<framing::SequenceNumber, struct GroupState *> GroupFifo;
GroupMap messageGroups; // index: group name
@@ -66,7 +67,12 @@ class MessageGroupManager : public StatefulQueueObserver, public MessageDistribu
static const std::string qpidSharedGroup; // if specified, one group can be consumed by multiple receivers
static const std::string qpidMessageGroupTimestamp;
- const std::string getGroupId( const QueuedMessage& qm ) const;
+ GroupState& findGroup( const QueuedMessage& qm );
+ unsigned long hits, misses; // for debug
+ uint32_t lastMsg;
+ std::string lastGroup;
+ GroupState *cachedGroup;
+
void unFree( const GroupState& state );
void own( GroupState& state, const std::string& owner );
void disown( GroupState& state );
@@ -81,7 +87,10 @@ class MessageGroupManager : public StatefulQueueObserver, public MessageDistribu
MessageGroupManager(const std::string& header, const std::string& _qName,
Messages& container, unsigned int _timestamp=0 )
: StatefulQueueObserver(std::string("MessageGroupManager:") + header),
- groupIdHeader( header ), timestamp(_timestamp), messages(container), qName(_qName) {}
+ groupIdHeader( header ), timestamp(_timestamp), messages(container), qName(_qName),
+ hits(0), misses(0),
+ lastMsg(0), cachedGroup(0) {}
+ virtual ~MessageGroupManager();
// QueueObserver iface
void enqueued( const QueuedMessage& qm );
diff --git a/cpp/src/qpid/sys/unordered_map.h b/cpp/src/qpid/sys/unordered_map.h
new file mode 100644
index 0000000000..5f7f9567c5
--- /dev/null
+++ b/cpp/src/qpid/sys/unordered_map.h
@@ -0,0 +1,35 @@
+#ifndef _sys_unordered_map_h
+#define _sys_unordered_map_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// unordered_map include path is platform specific
+
+#ifdef _MSC_VER
+# include <unordered_map>
+#else
+# include <tr1/unordered_map>
+#endif /* _MSC_VER */
+namespace qpid {
+namespace sys {
+ using std::tr1::unordered_map;
+}}
+
+
+#endif /* _sys_unordered_map_h */