diff options
Diffstat (limited to 'cpp/src/qpid/broker/IncompleteMessageList.cpp')
-rw-r--r-- | cpp/src/qpid/broker/IncompleteMessageList.cpp | 31 |
1 files changed, 24 insertions, 7 deletions
diff --git a/cpp/src/qpid/broker/IncompleteMessageList.cpp b/cpp/src/qpid/broker/IncompleteMessageList.cpp index edb3721a40..64562dfb57 100644 --- a/cpp/src/qpid/broker/IncompleteMessageList.cpp +++ b/cpp/src/qpid/broker/IncompleteMessageList.cpp @@ -18,38 +18,55 @@ * under the License. * */ -#include "IncompleteMessageList.h" -#include "Message.h" +#include "IncompleteMessageList.h" namespace qpid { namespace broker { +IncompleteMessageList::IncompleteMessageList() : + callback(boost::bind(&IncompleteMessageList::enqueueComplete, this, _1)) +{} + void IncompleteMessageList::add(boost::intrusive_ptr<Message> msg) { + sys::Mutex::ScopedLock l(lock); + msg->setEnqueueCompleteCallback(callback); incomplete.push_back(msg); } -void IncompleteMessageList::process(const CompletionListener& l, bool sync) +void IncompleteMessageList::enqueueComplete(const boost::intrusive_ptr<Message>& ) { + sys::Mutex::ScopedLock l(lock); + lock.notify(); +} + +void IncompleteMessageList::process(const CompletionListener& listen, bool sync) { + sys::Mutex::ScopedLock l(lock); while (!incomplete.empty()) { boost::intrusive_ptr<Message>& msg = incomplete.front(); if (!msg->isEnqueueComplete()) { if (sync){ msg->flush(); - msg->waitForEnqueueComplete(); + while (!msg->isEnqueueComplete()) + lock.wait(); } else { //leave the message as incomplete for now return; } } - l(msg); + listen(msg); incomplete.pop_front(); } } -void IncompleteMessageList::each(const CompletionListener& l) { - std::for_each(incomplete.begin(), incomplete.end(), l); +void IncompleteMessageList::each(const CompletionListener& listen) { + Messages snapshot; + { + sys::Mutex::ScopedLock l(lock); + snapshot = incomplete; + } + std::for_each(incomplete.begin(), incomplete.end(), listen); // FIXME aconway 2008-11-07: passed by ref or value? } }} |