diff options
Diffstat (limited to 'cpp/src/qpid/sys')
-rw-r--r-- | cpp/src/qpid/sys/AggregateOutput.cpp | 68 | ||||
-rw-r--r-- | cpp/src/qpid/sys/AggregateOutput.h | 65 |
2 files changed, 80 insertions, 53 deletions
diff --git a/cpp/src/qpid/sys/AggregateOutput.cpp b/cpp/src/qpid/sys/AggregateOutput.cpp index 74bf6d0f85..d46fccc208 100644 --- a/cpp/src/qpid/sys/AggregateOutput.cpp +++ b/cpp/src/qpid/sys/AggregateOutput.cpp @@ -26,50 +26,66 @@ namespace qpid { namespace sys { +AggregateOutput::AggregateOutput(OutputControl& c) : busy(false), control(c) {} + void AggregateOutput::abort() { control.abort(); } void AggregateOutput::activateOutput() { control.activateOutput(); } void AggregateOutput::giveReadCredit(int32_t credit) { control.giveReadCredit(credit); } -bool AggregateOutput::hasOutput() { - for (TaskList::const_iterator i = tasks.begin(); i != tasks.end(); ++i) - if ((*i)->hasOutput()) return true; - return false; +bool AggregateOutput::AggregateOutput::hasOutput() { + Mutex::ScopedLock l(lock); + return !tasks.empty(); } -bool AggregateOutput::doOutput() -{ - bool result = false; - if (!tasks.empty()) { - if (next >= tasks.size()) next = next % tasks.size(); +// Clear the busy flag and notify waiting threads in destructor. +struct ScopedBusy { + bool& flag; + Monitor& monitor; + ScopedBusy(bool& f, Monitor& m) : flag(f), monitor(m) { f = true; } + ~ScopedBusy() { flag = false; monitor.notifyAll(); } +}; + +bool AggregateOutput::doOutput() { + Mutex::ScopedLock l(lock); + ScopedBusy sb(busy, lock); - size_t start = next; - //loop until a task generated some output - while (!result) { - result = tasks[next++]->doOutput(); - if (tasks.empty()) break; - if (next >= tasks.size()) next = next % tasks.size(); - if (start == next) break; + while (!tasks.empty()) { + OutputTask* t=tasks.front(); + tasks.pop_front(); + bool didOutput; + { + // Allow concurrent call to addOutputTask. + // removeOutputTask will wait till !busy before removing a task. + Mutex::ScopedUnlock u(lock); + didOutput = t->doOutput(); + } + if (didOutput) { + tasks.push_back(t); + return true; } } - return result; + return false; } - -void AggregateOutput::addOutputTask(OutputTask* t) -{ - tasks.push_back(t); + +void AggregateOutput::addOutputTask(OutputTask* task) { + Mutex::ScopedLock l(lock); + tasks.push_back(task); } -void AggregateOutput::removeOutputTask(OutputTask* t) -{ - TaskList::iterator i = std::find(tasks.begin(), tasks.end(), t); - if (i != tasks.end()) tasks.erase(i); +void AggregateOutput::removeOutputTask(OutputTask* task) { + Mutex::ScopedLock l(lock); + while (busy) lock.wait(); + tasks.erase(std::remove(tasks.begin(), tasks.end(), task), tasks.end()); } - + void AggregateOutput::removeAll() { + Mutex::ScopedLock l(lock); + while (busy) lock.wait(); tasks.clear(); } + }} // namespace qpid::sys diff --git a/cpp/src/qpid/sys/AggregateOutput.h b/cpp/src/qpid/sys/AggregateOutput.h index b33113796c..4e3190a093 100644 --- a/cpp/src/qpid/sys/AggregateOutput.h +++ b/cpp/src/qpid/sys/AggregateOutput.h @@ -21,47 +21,58 @@ #ifndef _AggregateOutput_ #define _AggregateOutput_ -#include "Mutex.h" +#include "Monitor.h" #include "OutputControl.h" #include "OutputTask.h" #include "qpid/CommonImportExport.h" #include <algorithm> -#include <vector> +#include <deque> namespace qpid { namespace sys { - class AggregateOutput : public OutputTask, public OutputControl - { - typedef std::vector<OutputTask*> TaskList; +/** + * Holds a collection of output tasks, doOutput picks the next one to execute. + * + * Tasks are automatically removed if their doOutput() or hasOutput() returns false. + * + * Thread safe. addOutputTask may be called in one connection thread while + * doOutput is called in another. + */ + +class AggregateOutput : public OutputTask, public OutputControl +{ + typedef std::deque<OutputTask*> TaskList; + + Monitor lock; + TaskList tasks; + bool busy; + OutputControl& control; - TaskList tasks; - size_t next; - OutputControl& control; + public: + QPID_COMMON_EXTERN AggregateOutput(OutputControl& c); - public: - AggregateOutput(OutputControl& c) : next(0), control(c) {}; - //this may be called on any thread - QPID_COMMON_EXTERN void abort(); - QPID_COMMON_EXTERN void activateOutput(); - QPID_COMMON_EXTERN void giveReadCredit(int32_t); + // These may be called concurrently with any function. + QPID_COMMON_EXTERN void abort(); + QPID_COMMON_EXTERN void activateOutput(); + QPID_COMMON_EXTERN void giveReadCredit(int32_t); + QPID_COMMON_EXTERN void addOutputTask(OutputTask* t); - //all the following will be called on the same thread - QPID_COMMON_EXTERN bool doOutput(); - QPID_COMMON_EXTERN bool hasOutput(); - QPID_COMMON_EXTERN void addOutputTask(OutputTask* t); - QPID_COMMON_EXTERN void removeOutputTask(OutputTask* t); - QPID_COMMON_EXTERN void removeAll(); + // These functions must not be called concurrently with each other. + QPID_COMMON_EXTERN bool doOutput(); + QPID_COMMON_EXTERN bool hasOutput(); + QPID_COMMON_EXTERN void removeOutputTask(OutputTask* t); + QPID_COMMON_EXTERN void removeAll(); - /** Apply f to each OutputTask* in the tasks list */ - template <class F> void eachOutput(F f) { - std::for_each(tasks.begin(), tasks.end(), f); - } - }; + /** Apply f to each OutputTask* in the tasks list */ + template <class F> void eachOutput(F f) { + Mutex::ScopedLock l(lock); + std::for_each(tasks.begin(), tasks.end(), f); + } +}; -} -} +}} // namespace qpid::sys #endif |