summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-11-09 16:56:44 +0000
committerAlan Conway <aconway@apache.org>2012-11-09 16:56:44 +0000
commit464ae39a9a52becfb9b22fa2d5efc8624617bffa (patch)
treead18fd5296e39e0d602985f3d631587216c57b53
parent7ec68252849613f79171cf2e42eaf3e308f01d36 (diff)
downloadqpid-python-464ae39a9a52becfb9b22fa2d5efc8624617bffa.tar.gz
QPID-4430: HA QMF queue events do not propagate to backups under load (Jason Dillaman)
In a stress tests QMF events were not being propagated to backups. Discovered that the inter-broker link had hundreds of thousands of enqueued OutputTasks -- representing only a few unique consumers. There should only be only a single output task for a given consumer. This appears to have stalled the delivery of QMF messages to the backup broker git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1407543 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/sys/AggregateOutput.cpp11
-rw-r--r--qpid/cpp/src/qpid/sys/AggregateOutput.h3
2 files changed, 12 insertions, 2 deletions
diff --git a/qpid/cpp/src/qpid/sys/AggregateOutput.cpp b/qpid/cpp/src/qpid/sys/AggregateOutput.cpp
index fc95f46fb9..ff9c740926 100644
--- a/qpid/cpp/src/qpid/sys/AggregateOutput.cpp
+++ b/qpid/cpp/src/qpid/sys/AggregateOutput.cpp
@@ -51,6 +51,7 @@ bool AggregateOutput::doOutput() {
while (!tasks.empty()) {
OutputTask* t=tasks.front();
tasks.pop_front();
+ taskSet.erase(t);
bool didOutput;
{
// Allow concurrent call to addOutputTask.
@@ -59,7 +60,9 @@ bool AggregateOutput::doOutput() {
didOutput = t->doOutput();
}
if (didOutput) {
- tasks.push_back(t);
+ if (taskSet.insert(t).second) {
+ tasks.push_back(t);
+ }
return true;
}
}
@@ -68,12 +71,15 @@ bool AggregateOutput::doOutput() {
void AggregateOutput::addOutputTask(OutputTask* task) {
Mutex::ScopedLock l(lock);
- tasks.push_back(task);
+ if (taskSet.insert(task).second) {
+ tasks.push_back(task);
+ }
}
void AggregateOutput::removeOutputTask(OutputTask* task) {
Mutex::ScopedLock l(lock);
while (busy) lock.wait();
+ taskSet.erase(task);
tasks.erase(std::remove(tasks.begin(), tasks.end(), task), tasks.end());
}
@@ -81,6 +87,7 @@ void AggregateOutput::removeAll()
{
Mutex::ScopedLock l(lock);
while (busy) lock.wait();
+ taskSet.clear();
tasks.clear();
}
diff --git a/qpid/cpp/src/qpid/sys/AggregateOutput.h b/qpid/cpp/src/qpid/sys/AggregateOutput.h
index d7c0ff29e3..802722ad26 100644
--- a/qpid/cpp/src/qpid/sys/AggregateOutput.h
+++ b/qpid/cpp/src/qpid/sys/AggregateOutput.h
@@ -28,6 +28,7 @@
#include <algorithm>
#include <deque>
+#include <set>
namespace qpid {
namespace sys {
@@ -44,9 +45,11 @@ namespace sys {
class QPID_COMMON_CLASS_EXTERN AggregateOutput : public OutputTask, public OutputControl
{
typedef std::deque<OutputTask*> TaskList;
+ typedef std::set<OutputTask*> TaskSet;
Monitor lock;
TaskList tasks;
+ TaskSet taskSet;
bool busy;
OutputControl& control;