diff options
Diffstat (limited to 'cpp/src/qpid/sys/AggregateOutput.cpp')
-rw-r--r-- | cpp/src/qpid/sys/AggregateOutput.cpp | 83 |
1 files changed, 53 insertions, 30 deletions
diff --git a/cpp/src/qpid/sys/AggregateOutput.cpp b/cpp/src/qpid/sys/AggregateOutput.cpp index 2fad28c381..709d3bc640 100644 --- a/cpp/src/qpid/sys/AggregateOutput.cpp +++ b/cpp/src/qpid/sys/AggregateOutput.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -25,44 +25,67 @@ namespace qpid { namespace sys { - -void AggregateOutput::activateOutput() -{ - control.activateOutput(); -} + +AggregateOutput::AggregateOutput(OutputControl& c) : busy(false), control(c) {} + +void AggregateOutput::abort() { control.abort(); } + +void AggregateOutput::activateOutput() { control.activateOutput(); } + +void AggregateOutput::giveReadCredit(int32_t credit) { control.giveReadCredit(credit); } bool AggregateOutput::hasOutput() { - for (TaskList::const_iterator i = tasks.begin(); i != tasks.end(); ++i) - if ((*i)->hasOutput()) return true; - return false; + Mutex::ScopedLock l(lock); + return !tasks.empty(); } -bool AggregateOutput::doOutput() -{ - bool result = false; - if (!tasks.empty()) { - if (next >= tasks.size()) next = next % tasks.size(); - - size_t start = next; - //loop until a task generated some output - while (!result) { - result = tasks[next++]->doOutput(); - if (next >= tasks.size()) next = next % tasks.size(); - if (start == next) break; +// Clear the busy flag and notify waiting threads in destructor. +struct ScopedBusy { + bool& flag; + Monitor& monitor; + ScopedBusy(bool& f, Monitor& m) : flag(f), monitor(m) { f = true; } + ~ScopedBusy() { flag = false; monitor.notifyAll(); } +}; + +bool AggregateOutput::doOutput() { + Mutex::ScopedLock l(lock); + ScopedBusy sb(busy, lock); + + while (!tasks.empty()) { + OutputTask* t=tasks.front(); + tasks.pop_front(); + bool didOutput; + { + // Allow concurrent call to addOutputTask. + // removeOutputTask will wait till !busy before removing a task. + Mutex::ScopedUnlock u(lock); + didOutput = t->doOutput(); + } + if (didOutput) { + tasks.push_back(t); + return true; } } - return result; + return false; +} + +void AggregateOutput::addOutputTask(OutputTask* task) { + Mutex::ScopedLock l(lock); + tasks.push_back(task); } -void AggregateOutput::addOutputTask(OutputTask* t) -{ - tasks.push_back(t); +void AggregateOutput::removeOutputTask(OutputTask* task) { + Mutex::ScopedLock l(lock); + while (busy) lock.wait(); + tasks.erase(std::remove(tasks.begin(), tasks.end(), task), tasks.end()); } - -void AggregateOutput::removeOutputTask(OutputTask* t) + +void AggregateOutput::removeAll() { - TaskList::iterator i = std::find(tasks.begin(), tasks.end(), t); - if (i != tasks.end()) tasks.erase(i); + Mutex::ScopedLock l(lock); + while (busy) lock.wait(); + tasks.clear(); } + }} // namespace qpid::sys |