diff options
Diffstat (limited to 'qpid/cpp/src/qpid/sys/AggregateOutput.h')
-rw-r--r-- | qpid/cpp/src/qpid/sys/AggregateOutput.h | 65 |
1 files changed, 38 insertions, 27 deletions
diff --git a/qpid/cpp/src/qpid/sys/AggregateOutput.h b/qpid/cpp/src/qpid/sys/AggregateOutput.h index b33113796c..4e3190a093 100644 --- a/qpid/cpp/src/qpid/sys/AggregateOutput.h +++ b/qpid/cpp/src/qpid/sys/AggregateOutput.h @@ -21,47 +21,58 @@ #ifndef _AggregateOutput_ #define _AggregateOutput_ -#include "Mutex.h" +#include "Monitor.h" #include "OutputControl.h" #include "OutputTask.h" #include "qpid/CommonImportExport.h" #include <algorithm> -#include <vector> +#include <deque> namespace qpid { namespace sys { - class AggregateOutput : public OutputTask, public OutputControl - { - typedef std::vector<OutputTask*> TaskList; +/** + * Holds a collection of output tasks, doOutput picks the next one to execute. + * + * Tasks are automatically removed if their doOutput() or hasOutput() returns false. + * + * Thread safe. addOutputTask may be called in one connection thread while + * doOutput is called in another. + */ + +class AggregateOutput : public OutputTask, public OutputControl +{ + typedef std::deque<OutputTask*> TaskList; + + Monitor lock; + TaskList tasks; + bool busy; + OutputControl& control; - TaskList tasks; - size_t next; - OutputControl& control; + public: + QPID_COMMON_EXTERN AggregateOutput(OutputControl& c); - public: - AggregateOutput(OutputControl& c) : next(0), control(c) {}; - //this may be called on any thread - QPID_COMMON_EXTERN void abort(); - QPID_COMMON_EXTERN void activateOutput(); - QPID_COMMON_EXTERN void giveReadCredit(int32_t); + // These may be called concurrently with any function. + QPID_COMMON_EXTERN void abort(); + QPID_COMMON_EXTERN void activateOutput(); + QPID_COMMON_EXTERN void giveReadCredit(int32_t); + QPID_COMMON_EXTERN void addOutputTask(OutputTask* t); - //all the following will be called on the same thread - QPID_COMMON_EXTERN bool doOutput(); - QPID_COMMON_EXTERN bool hasOutput(); - QPID_COMMON_EXTERN void addOutputTask(OutputTask* t); - QPID_COMMON_EXTERN void removeOutputTask(OutputTask* t); - QPID_COMMON_EXTERN void removeAll(); + // These functions must not be called concurrently with each other. + QPID_COMMON_EXTERN bool doOutput(); + QPID_COMMON_EXTERN bool hasOutput(); + QPID_COMMON_EXTERN void removeOutputTask(OutputTask* t); + QPID_COMMON_EXTERN void removeAll(); - /** Apply f to each OutputTask* in the tasks list */ - template <class F> void eachOutput(F f) { - std::for_each(tasks.begin(), tasks.end(), f); - } - }; + /** Apply f to each OutputTask* in the tasks list */ + template <class F> void eachOutput(F f) { + Mutex::ScopedLock l(lock); + std::for_each(tasks.begin(), tasks.end(), f); + } +}; -} -} +}} // namespace qpid::sys #endif |