diff options
Diffstat (limited to 'cpp/src/qpid/sys/AggregateOutput.cpp')
| -rw-r--r-- | cpp/src/qpid/sys/AggregateOutput.cpp | 11 |
1 files changed, 9 insertions, 2 deletions
diff --git a/cpp/src/qpid/sys/AggregateOutput.cpp b/cpp/src/qpid/sys/AggregateOutput.cpp index fc95f46fb9..ff9c740926 100644 --- a/cpp/src/qpid/sys/AggregateOutput.cpp +++ b/cpp/src/qpid/sys/AggregateOutput.cpp @@ -51,6 +51,7 @@ bool AggregateOutput::doOutput() { while (!tasks.empty()) { OutputTask* t=tasks.front(); tasks.pop_front(); + taskSet.erase(t); bool didOutput; { // Allow concurrent call to addOutputTask. @@ -59,7 +60,9 @@ bool AggregateOutput::doOutput() { didOutput = t->doOutput(); } if (didOutput) { - tasks.push_back(t); + if (taskSet.insert(t).second) { + tasks.push_back(t); + } return true; } } @@ -68,12 +71,15 @@ bool AggregateOutput::doOutput() { void AggregateOutput::addOutputTask(OutputTask* task) { Mutex::ScopedLock l(lock); - tasks.push_back(task); + if (taskSet.insert(task).second) { + tasks.push_back(task); + } } void AggregateOutput::removeOutputTask(OutputTask* task) { Mutex::ScopedLock l(lock); while (busy) lock.wait(); + taskSet.erase(task); tasks.erase(std::remove(tasks.begin(), tasks.end(), task), tasks.end()); } @@ -81,6 +87,7 @@ void AggregateOutput::removeAll() { Mutex::ScopedLock l(lock); while (busy) lock.wait(); + taskSet.clear(); tasks.clear(); } |
