summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/IncompleteMessageList.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-11-07 20:48:38 +0000
committerAlan Conway <aconway@apache.org>2008-11-07 20:48:38 +0000
commit0e107c3844c7078cf57212f16b1335dd50d4364c (patch)
treea62880c89a7b0e3a8cace6b96729fab2de34743e /cpp/src/qpid/broker/IncompleteMessageList.cpp
parent15ea1b2572d040cbf62154b075b1e851cc15a22e (diff)
downloadqpid-python-0e107c3844c7078cf57212f16b1335dd50d4364c.tar.gz
broker/Message, IncompleteMessageList: drop waitFor(De|En)Complete, replace with callbacks.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@712258 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/IncompleteMessageList.cpp')
-rw-r--r--cpp/src/qpid/broker/IncompleteMessageList.cpp31
1 files changed, 24 insertions, 7 deletions
diff --git a/cpp/src/qpid/broker/IncompleteMessageList.cpp b/cpp/src/qpid/broker/IncompleteMessageList.cpp
index edb3721a40..64562dfb57 100644
--- a/cpp/src/qpid/broker/IncompleteMessageList.cpp
+++ b/cpp/src/qpid/broker/IncompleteMessageList.cpp
@@ -18,38 +18,55 @@
* under the License.
*
*/
-#include "IncompleteMessageList.h"
-#include "Message.h"
+#include "IncompleteMessageList.h"
namespace qpid {
namespace broker {
+IncompleteMessageList::IncompleteMessageList() :
+ callback(boost::bind(&IncompleteMessageList::enqueueComplete, this, _1))
+{}
+
void IncompleteMessageList::add(boost::intrusive_ptr<Message> msg)
{
+ sys::Mutex::ScopedLock l(lock);
+ msg->setEnqueueCompleteCallback(callback);
incomplete.push_back(msg);
}
-void IncompleteMessageList::process(const CompletionListener& l, bool sync)
+void IncompleteMessageList::enqueueComplete(const boost::intrusive_ptr<Message>& ) {
+ sys::Mutex::ScopedLock l(lock);
+ lock.notify();
+}
+
+void IncompleteMessageList::process(const CompletionListener& listen, bool sync)
{
+ sys::Mutex::ScopedLock l(lock);
while (!incomplete.empty()) {
boost::intrusive_ptr<Message>& msg = incomplete.front();
if (!msg->isEnqueueComplete()) {
if (sync){
msg->flush();
- msg->waitForEnqueueComplete();
+ while (!msg->isEnqueueComplete())
+ lock.wait();
} else {
//leave the message as incomplete for now
return;
}
}
- l(msg);
+ listen(msg);
incomplete.pop_front();
}
}
-void IncompleteMessageList::each(const CompletionListener& l) {
- std::for_each(incomplete.begin(), incomplete.end(), l);
+void IncompleteMessageList::each(const CompletionListener& listen) {
+ Messages snapshot;
+ {
+ sys::Mutex::ScopedLock l(lock);
+ snapshot = incomplete;
+ }
+ std::for_each(incomplete.begin(), incomplete.end(), listen); // FIXME aconway 2008-11-07: passed by ref or value?
}
}}