diff options
author | Alan Conway <aconway@apache.org> | 2008-11-07 20:48:38 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-11-07 20:48:38 +0000 |
commit | 0e107c3844c7078cf57212f16b1335dd50d4364c (patch) | |
tree | a62880c89a7b0e3a8cace6b96729fab2de34743e /cpp/src | |
parent | 15ea1b2572d040cbf62154b075b1e851cc15a22e (diff) | |
download | qpid-python-0e107c3844c7078cf57212f16b1335dd50d4364c.tar.gz |
broker/Message, IncompleteMessageList: drop waitFor(De|En)Complete, replace with callbacks.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@712258 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/IncompleteMessageList.cpp | 31 | ||||
-rw-r--r-- | cpp/src/qpid/broker/IncompleteMessageList.h | 15 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Message.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Message.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/broker/PersistableMessage.cpp | 28 | ||||
-rw-r--r-- | cpp/src/qpid/broker/PersistableMessage.h | 10 |
6 files changed, 53 insertions, 45 deletions
diff --git a/cpp/src/qpid/broker/IncompleteMessageList.cpp b/cpp/src/qpid/broker/IncompleteMessageList.cpp index edb3721a40..64562dfb57 100644 --- a/cpp/src/qpid/broker/IncompleteMessageList.cpp +++ b/cpp/src/qpid/broker/IncompleteMessageList.cpp @@ -18,38 +18,55 @@ * under the License. * */ -#include "IncompleteMessageList.h" -#include "Message.h" +#include "IncompleteMessageList.h" namespace qpid { namespace broker { +IncompleteMessageList::IncompleteMessageList() : + callback(boost::bind(&IncompleteMessageList::enqueueComplete, this, _1)) +{} + void IncompleteMessageList::add(boost::intrusive_ptr<Message> msg) { + sys::Mutex::ScopedLock l(lock); + msg->setEnqueueCompleteCallback(callback); incomplete.push_back(msg); } -void IncompleteMessageList::process(const CompletionListener& l, bool sync) +void IncompleteMessageList::enqueueComplete(const boost::intrusive_ptr<Message>& ) { + sys::Mutex::ScopedLock l(lock); + lock.notify(); +} + +void IncompleteMessageList::process(const CompletionListener& listen, bool sync) { + sys::Mutex::ScopedLock l(lock); while (!incomplete.empty()) { boost::intrusive_ptr<Message>& msg = incomplete.front(); if (!msg->isEnqueueComplete()) { if (sync){ msg->flush(); - msg->waitForEnqueueComplete(); + while (!msg->isEnqueueComplete()) + lock.wait(); } else { //leave the message as incomplete for now return; } } - l(msg); + listen(msg); incomplete.pop_front(); } } -void IncompleteMessageList::each(const CompletionListener& l) { - std::for_each(incomplete.begin(), incomplete.end(), l); +void IncompleteMessageList::each(const CompletionListener& listen) { + Messages snapshot; + { + sys::Mutex::ScopedLock l(lock); + snapshot = incomplete; + } + std::for_each(incomplete.begin(), incomplete.end(), listen); // FIXME aconway 2008-11-07: passed by ref or value? } }} diff --git a/cpp/src/qpid/broker/IncompleteMessageList.h b/cpp/src/qpid/broker/IncompleteMessageList.h index 36cc1b4bf5..40c47cfaa6 100644 --- a/cpp/src/qpid/broker/IncompleteMessageList.h +++ b/cpp/src/qpid/broker/IncompleteMessageList.h @@ -21,23 +21,30 @@ #ifndef _IncompleteMessageList_ #define _IncompleteMessageList_ -#include <list> +#include "qpid/sys/Monitor.h" +#include "qpid/broker/Message.h" #include <boost/intrusive_ptr.hpp> #include <boost/function.hpp> +#include <list> namespace qpid { namespace broker { -class Message; - class IncompleteMessageList { typedef std::list< boost::intrusive_ptr<Message> > Messages; + + void enqueueComplete(const boost::intrusive_ptr<Message>&); + + sys::Monitor lock; Messages incomplete; + Message::MessageCallback callback; public: - typedef boost::function<void(boost::intrusive_ptr<Message>)> CompletionListener; + typedef Message::MessageCallback CompletionListener; + IncompleteMessageList(); + void add(boost::intrusive_ptr<Message> msg); void process(const CompletionListener& l, bool sync); void each(const CompletionListener& l); diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index 89c647358a..a99a10180e 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -348,7 +348,7 @@ void Message::allEnqueuesComplete() { sys::Mutex::ScopedLock l(lock); swap(cb, enqueueCallback); } - if (cb && *cb) (*cb)(*this); + if (cb && *cb) (*cb)(intrusive_ptr<Message>(this)); } void Message::allDequeuesComplete() { @@ -357,7 +357,11 @@ void Message::allDequeuesComplete() { sys::Mutex::ScopedLock l(lock); swap(cb, dequeueCallback); } - if (cb && *cb) (*cb)(*this); + if (cb && *cb) (*cb)(intrusive_ptr<Message>(this)); } +void Message::setEnqueueCompleteCallback(MessageCallback& cb) { enqueueCallback = &cb; } + +void Message::setDequeueCompleteCallback(MessageCallback& cb) { dequeueCallback = &cb; } + }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h index 762ec68fe8..8510ef78e9 100644 --- a/cpp/src/qpid/broker/Message.h +++ b/cpp/src/qpid/broker/Message.h @@ -48,7 +48,7 @@ class Queue; class Message : public PersistableMessage { public: - typedef boost::function<void (Message&)> MessageCallback; + typedef boost::function<void (const boost::intrusive_ptr<Message>&)> MessageCallback; Message(const framing::SequenceNumber& id = framing::SequenceNumber()); ~Message(); @@ -145,10 +145,10 @@ public: void setReplacementMessage(boost::intrusive_ptr<Message> msg, const Queue* qfor); /** Call cb when enqueue is complete, may call immediately. Holds cb by reference. */ - void setEnqueueCompleteCallback(const MessageCallback* cb); + void setEnqueueCompleteCallback(MessageCallback& cb); /** Call cb when dequeue is complete, may call immediately. Holds cb by reference. */ - void setDequeueCompleteCallback(const MessageCallback& cb); + void setDequeueCompleteCallback(MessageCallback& cb); private: typedef std::map<const Queue*,boost::intrusive_ptr<Message> > Replacement; diff --git a/cpp/src/qpid/broker/PersistableMessage.cpp b/cpp/src/qpid/broker/PersistableMessage.cpp index 920dfd6386..4d272c3780 100644 --- a/cpp/src/qpid/broker/PersistableMessage.cpp +++ b/cpp/src/qpid/broker/PersistableMessage.cpp @@ -63,25 +63,17 @@ void PersistableMessage::setContentReleased() {contentReleased = true; } bool PersistableMessage::isContentReleased()const { return contentReleased; } -void PersistableMessage::waitForEnqueueComplete() { - sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock); - while (asyncEnqueueCounter > 0) { - asyncEnqueueLock.wait(); - } -} - bool PersistableMessage::isEnqueueComplete() { - sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock); + sys::ScopedLock<sys::Mutex> l(asyncEnqueueLock); return asyncEnqueueCounter == 0; } void PersistableMessage::enqueueComplete() { bool notify = false; { - sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock); + sys::ScopedLock<sys::Mutex> l(asyncEnqueueLock); if (asyncEnqueueCounter > 0) { if (--asyncEnqueueCounter == 0) { - asyncEnqueueLock.notify(); notify = true; } } @@ -109,36 +101,28 @@ void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr queue, Messag } void PersistableMessage::enqueueAsync() { - sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock); + sys::ScopedLock<sys::Mutex> l(asyncEnqueueLock); asyncEnqueueCounter++; } bool PersistableMessage::isDequeueComplete() { - sys::ScopedLock<sys::Monitor> l(asyncDequeueLock); + sys::ScopedLock<sys::Mutex> l(asyncDequeueLock); return asyncDequeueCounter == 0; } void PersistableMessage::dequeueComplete() { bool notify = false; { - sys::ScopedLock<sys::Monitor> l(asyncDequeueLock); + sys::ScopedLock<sys::Mutex> l(asyncDequeueLock); if (asyncDequeueCounter > 0) { if (--asyncDequeueCounter == 0) { notify = true; - asyncDequeueLock.notify(); } } } if (notify) allDequeuesComplete(); } -void PersistableMessage::waitForDequeueComplete() { - sys::ScopedLock<sys::Monitor> l(asyncDequeueLock); - while (asyncDequeueCounter > 0) { - asyncDequeueLock.wait(); - } -} - void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) { if (_store){ sys::ScopedLock<sys::Mutex> l(storeLock); @@ -150,7 +134,7 @@ void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr queue, Messag } void PersistableMessage::dequeueAsync() { - sys::ScopedLock<sys::Monitor> l(asyncDequeueLock); + sys::ScopedLock<sys::Mutex> l(asyncDequeueLock); asyncDequeueCounter++; } diff --git a/cpp/src/qpid/broker/PersistableMessage.h b/cpp/src/qpid/broker/PersistableMessage.h index 59fa2e3d95..4f2e3abafa 100644 --- a/cpp/src/qpid/broker/PersistableMessage.h +++ b/cpp/src/qpid/broker/PersistableMessage.h @@ -28,7 +28,7 @@ #include <boost/weak_ptr.hpp> #include "Persistable.h" #include "qpid/framing/amqp_types.h" -#include "qpid/sys/Monitor.h" +#include "qpid/sys/Mutex.h" #include "PersistableQueue.h" namespace qpid { @@ -42,8 +42,8 @@ class MessageStore; class PersistableMessage : public Persistable { typedef std::list< boost::weak_ptr<PersistableQueue> > syncList; - sys::Monitor asyncEnqueueLock; - sys::Monitor asyncDequeueLock; + sys::Mutex asyncEnqueueLock; + sys::Mutex asyncDequeueLock; sys::Mutex storeLock; /** @@ -93,8 +93,6 @@ class PersistableMessage : public Persistable bool isContentReleased() const; - void waitForEnqueueComplete(); - bool isEnqueueComplete(); void enqueueComplete(); @@ -107,8 +105,6 @@ class PersistableMessage : public Persistable void dequeueComplete(); - void waitForDequeueComplete(); - void dequeueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store); void dequeueAsync(); |