summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/AggregateOutput.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys/AggregateOutput.cpp')
-rw-r--r--cpp/src/qpid/sys/AggregateOutput.cpp13
1 files changed, 9 insertions, 4 deletions
diff --git a/cpp/src/qpid/sys/AggregateOutput.cpp b/cpp/src/qpid/sys/AggregateOutput.cpp
index fc95f46fb9..ebc5689ce5 100644
--- a/cpp/src/qpid/sys/AggregateOutput.cpp
+++ b/cpp/src/qpid/sys/AggregateOutput.cpp
@@ -32,8 +32,6 @@ void AggregateOutput::abort() { control.abort(); }
void AggregateOutput::activateOutput() { control.activateOutput(); }
-void AggregateOutput::giveReadCredit(int32_t credit) { control.giveReadCredit(credit); }
-
namespace {
// Clear the busy flag and notify waiting threads in destructor.
struct ScopedBusy {
@@ -51,6 +49,7 @@ bool AggregateOutput::doOutput() {
while (!tasks.empty()) {
OutputTask* t=tasks.front();
tasks.pop_front();
+ taskSet.erase(t);
bool didOutput;
{
// Allow concurrent call to addOutputTask.
@@ -59,7 +58,9 @@ bool AggregateOutput::doOutput() {
didOutput = t->doOutput();
}
if (didOutput) {
- tasks.push_back(t);
+ if (taskSet.insert(t).second) {
+ tasks.push_back(t);
+ }
return true;
}
}
@@ -68,12 +69,15 @@ bool AggregateOutput::doOutput() {
void AggregateOutput::addOutputTask(OutputTask* task) {
Mutex::ScopedLock l(lock);
- tasks.push_back(task);
+ if (taskSet.insert(task).second) {
+ tasks.push_back(task);
+ }
}
void AggregateOutput::removeOutputTask(OutputTask* task) {
Mutex::ScopedLock l(lock);
while (busy) lock.wait();
+ taskSet.erase(task);
tasks.erase(std::remove(tasks.begin(), tasks.end(), task), tasks.end());
}
@@ -81,6 +85,7 @@ void AggregateOutput::removeAll()
{
Mutex::ScopedLock l(lock);
while (busy) lock.wait();
+ taskSet.clear();
tasks.clear();
}