diff options
author | Alan Conway <aconway@apache.org> | 2012-11-09 16:56:44 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-11-09 16:56:44 +0000 |
commit | 464ae39a9a52becfb9b22fa2d5efc8624617bffa (patch) | |
tree | ad18fd5296e39e0d602985f3d631587216c57b53 | |
parent | 7ec68252849613f79171cf2e42eaf3e308f01d36 (diff) | |
download | qpid-python-464ae39a9a52becfb9b22fa2d5efc8624617bffa.tar.gz |
QPID-4430: HA QMF queue events do not propagate to backups under load (Jason Dillaman)
In a stress tests QMF events were not being propagated to backups. Discovered
that the inter-broker link had hundreds of thousands of enqueued OutputTasks --
representing only a few unique consumers. There should only be only a single
output task for a given consumer. This appears to have stalled the delivery of
QMF messages to the backup broker
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1407543 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/sys/AggregateOutput.cpp | 11 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/AggregateOutput.h | 3 |
2 files changed, 12 insertions, 2 deletions
diff --git a/qpid/cpp/src/qpid/sys/AggregateOutput.cpp b/qpid/cpp/src/qpid/sys/AggregateOutput.cpp index fc95f46fb9..ff9c740926 100644 --- a/qpid/cpp/src/qpid/sys/AggregateOutput.cpp +++ b/qpid/cpp/src/qpid/sys/AggregateOutput.cpp @@ -51,6 +51,7 @@ bool AggregateOutput::doOutput() { while (!tasks.empty()) { OutputTask* t=tasks.front(); tasks.pop_front(); + taskSet.erase(t); bool didOutput; { // Allow concurrent call to addOutputTask. @@ -59,7 +60,9 @@ bool AggregateOutput::doOutput() { didOutput = t->doOutput(); } if (didOutput) { - tasks.push_back(t); + if (taskSet.insert(t).second) { + tasks.push_back(t); + } return true; } } @@ -68,12 +71,15 @@ bool AggregateOutput::doOutput() { void AggregateOutput::addOutputTask(OutputTask* task) { Mutex::ScopedLock l(lock); - tasks.push_back(task); + if (taskSet.insert(task).second) { + tasks.push_back(task); + } } void AggregateOutput::removeOutputTask(OutputTask* task) { Mutex::ScopedLock l(lock); while (busy) lock.wait(); + taskSet.erase(task); tasks.erase(std::remove(tasks.begin(), tasks.end(), task), tasks.end()); } @@ -81,6 +87,7 @@ void AggregateOutput::removeAll() { Mutex::ScopedLock l(lock); while (busy) lock.wait(); + taskSet.clear(); tasks.clear(); } diff --git a/qpid/cpp/src/qpid/sys/AggregateOutput.h b/qpid/cpp/src/qpid/sys/AggregateOutput.h index d7c0ff29e3..802722ad26 100644 --- a/qpid/cpp/src/qpid/sys/AggregateOutput.h +++ b/qpid/cpp/src/qpid/sys/AggregateOutput.h @@ -28,6 +28,7 @@ #include <algorithm> #include <deque> +#include <set> namespace qpid { namespace sys { @@ -44,9 +45,11 @@ namespace sys { class QPID_COMMON_CLASS_EXTERN AggregateOutput : public OutputTask, public OutputControl { typedef std::deque<OutputTask*> TaskList; + typedef std::set<OutputTask*> TaskSet; Monitor lock; TaskList tasks; + TaskSet taskSet; bool busy; OutputControl& control; |