diff options
author | Alan Conway <aconway@apache.org> | 2009-06-16 21:21:09 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2009-06-16 21:21:09 +0000 |
commit | 80d65b38008d9b7f31c825508819f9600d63b63c (patch) | |
tree | 316862bff35f1cae6f0d1152dcf4a6e3b0f967ed /cpp/src/qpid/sys/AggregateOutput.cpp | |
parent | f5e98a6dfb8c4defe22755340f440e6f16c2559a (diff) | |
download | qpid-python-80d65b38008d9b7f31c825508819f9600d63b63c.tar.gz |
Performance improvements in AggregateOutput and SemanticState.
Replaced AggregateOutput hierarchy with a flat list per connection
holding only the OutputTasks that are potentially active. Tasks are
droped from the list as soon as they return false, and added back when
they may have output.
Inlined frequently-used SequenceNumber functions.
Replace std::list in QueueListeners with std::vector.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@785408 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/AggregateOutput.cpp')
-rw-r--r-- | cpp/src/qpid/sys/AggregateOutput.cpp | 68 |
1 files changed, 42 insertions, 26 deletions
diff --git a/cpp/src/qpid/sys/AggregateOutput.cpp b/cpp/src/qpid/sys/AggregateOutput.cpp index 74bf6d0f85..d46fccc208 100644 --- a/cpp/src/qpid/sys/AggregateOutput.cpp +++ b/cpp/src/qpid/sys/AggregateOutput.cpp @@ -26,50 +26,66 @@ namespace qpid { namespace sys { +AggregateOutput::AggregateOutput(OutputControl& c) : busy(false), control(c) {} + void AggregateOutput::abort() { control.abort(); } void AggregateOutput::activateOutput() { control.activateOutput(); } void AggregateOutput::giveReadCredit(int32_t credit) { control.giveReadCredit(credit); } -bool AggregateOutput::hasOutput() { - for (TaskList::const_iterator i = tasks.begin(); i != tasks.end(); ++i) - if ((*i)->hasOutput()) return true; - return false; +bool AggregateOutput::AggregateOutput::hasOutput() { + Mutex::ScopedLock l(lock); + return !tasks.empty(); } -bool AggregateOutput::doOutput() -{ - bool result = false; - if (!tasks.empty()) { - if (next >= tasks.size()) next = next % tasks.size(); +// Clear the busy flag and notify waiting threads in destructor. +struct ScopedBusy { + bool& flag; + Monitor& monitor; + ScopedBusy(bool& f, Monitor& m) : flag(f), monitor(m) { f = true; } + ~ScopedBusy() { flag = false; monitor.notifyAll(); } +}; + +bool AggregateOutput::doOutput() { + Mutex::ScopedLock l(lock); + ScopedBusy sb(busy, lock); - size_t start = next; - //loop until a task generated some output - while (!result) { - result = tasks[next++]->doOutput(); - if (tasks.empty()) break; - if (next >= tasks.size()) next = next % tasks.size(); - if (start == next) break; + while (!tasks.empty()) { + OutputTask* t=tasks.front(); + tasks.pop_front(); + bool didOutput; + { + // Allow concurrent call to addOutputTask. + // removeOutputTask will wait till !busy before removing a task. + Mutex::ScopedUnlock u(lock); + didOutput = t->doOutput(); + } + if (didOutput) { + tasks.push_back(t); + return true; } } - return result; + return false; } - -void AggregateOutput::addOutputTask(OutputTask* t) -{ - tasks.push_back(t); + +void AggregateOutput::addOutputTask(OutputTask* task) { + Mutex::ScopedLock l(lock); + tasks.push_back(task); } -void AggregateOutput::removeOutputTask(OutputTask* t) -{ - TaskList::iterator i = std::find(tasks.begin(), tasks.end(), t); - if (i != tasks.end()) tasks.erase(i); +void AggregateOutput::removeOutputTask(OutputTask* task) { + Mutex::ScopedLock l(lock); + while (busy) lock.wait(); + tasks.erase(std::remove(tasks.begin(), tasks.end(), task), tasks.end()); } - + void AggregateOutput::removeAll() { + Mutex::ScopedLock l(lock); + while (busy) lock.wait(); tasks.clear(); } + }} // namespace qpid::sys |