summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/IncompleteMessageList.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/IncompleteMessageList.cpp')
-rw-r--r--cpp/src/qpid/broker/IncompleteMessageList.cpp31
1 files changed, 24 insertions, 7 deletions
diff --git a/cpp/src/qpid/broker/IncompleteMessageList.cpp b/cpp/src/qpid/broker/IncompleteMessageList.cpp
index edb3721a40..64562dfb57 100644
--- a/cpp/src/qpid/broker/IncompleteMessageList.cpp
+++ b/cpp/src/qpid/broker/IncompleteMessageList.cpp
@@ -18,38 +18,55 @@
* under the License.
*
*/
-#include "IncompleteMessageList.h"
-#include "Message.h"
+#include "IncompleteMessageList.h"
namespace qpid {
namespace broker {
+IncompleteMessageList::IncompleteMessageList() :
+ callback(boost::bind(&IncompleteMessageList::enqueueComplete, this, _1))
+{}
+
void IncompleteMessageList::add(boost::intrusive_ptr<Message> msg)
{
+ sys::Mutex::ScopedLock l(lock);
+ msg->setEnqueueCompleteCallback(callback);
incomplete.push_back(msg);
}
-void IncompleteMessageList::process(const CompletionListener& l, bool sync)
+void IncompleteMessageList::enqueueComplete(const boost::intrusive_ptr<Message>& ) {
+ sys::Mutex::ScopedLock l(lock);
+ lock.notify();
+}
+
+void IncompleteMessageList::process(const CompletionListener& listen, bool sync)
{
+ sys::Mutex::ScopedLock l(lock);
while (!incomplete.empty()) {
boost::intrusive_ptr<Message>& msg = incomplete.front();
if (!msg->isEnqueueComplete()) {
if (sync){
msg->flush();
- msg->waitForEnqueueComplete();
+ while (!msg->isEnqueueComplete())
+ lock.wait();
} else {
//leave the message as incomplete for now
return;
}
}
- l(msg);
+ listen(msg);
incomplete.pop_front();
}
}
-void IncompleteMessageList::each(const CompletionListener& l) {
- std::for_each(incomplete.begin(), incomplete.end(), l);
+void IncompleteMessageList::each(const CompletionListener& listen) {
+ Messages snapshot;
+ {
+ sys::Mutex::ScopedLock l(lock);
+ snapshot = incomplete;
+ }
+ std::for_each(incomplete.begin(), incomplete.end(), listen); // FIXME aconway 2008-11-07: passed by ref or value?
}
}}