diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/IncompleteMessageList.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Message.cpp | 27 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Message.h | 5 |
3 files changed, 29 insertions, 6 deletions
diff --git a/cpp/src/qpid/broker/IncompleteMessageList.cpp b/cpp/src/qpid/broker/IncompleteMessageList.cpp index 02265ab85c..a061e872d0 100644 --- a/cpp/src/qpid/broker/IncompleteMessageList.cpp +++ b/cpp/src/qpid/broker/IncompleteMessageList.cpp @@ -30,7 +30,8 @@ IncompleteMessageList::IncompleteMessageList() : IncompleteMessageList::~IncompleteMessageList() { - sys::Mutex::ScopedLock l(lock); + // No lock here. We are relying on Messsag::reset*CompleteCallback + // to ensure no callbacks are in progress before they return. for (Messages::iterator i = incomplete.begin(); i != incomplete.end(); ++i) { (*i)->resetEnqueueCompleteCallback(); (*i)->resetDequeueCompleteCallback(); diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index ff4a37c88f..329451d64e 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -49,7 +49,8 @@ TransferAdapter Message::TRANSFER; Message::Message(const framing::SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), loaded(false), staged(false), forcePersistentPolicy(false), publisher(0), adapter(0), - expiration(FAR_FUTURE), enqueueCallback(0), dequeueCallback(0), requiredCredit(0) {} + expiration(FAR_FUTURE), enqueueCallback(0), dequeueCallback(0), + inCallback(false), requiredCredit(0) {} Message::~Message() { @@ -398,35 +399,55 @@ void Message::setReplacementMessage(boost::intrusive_ptr<Message> msg, const Que replacement[qfor] = msg; } +namespace { +struct ScopedSet { + sys::Monitor& lock; + bool& flag; + ScopedSet(sys::Monitor& l, bool& f) : lock(l), flag(f) { + sys::Monitor::ScopedLock sl(lock); + flag = true; + } + ~ScopedSet(){ + sys::Monitor::ScopedLock sl(lock); + flag = false; + lock.notifyAll(); + } +}; +} + void Message::allEnqueuesComplete() { - sys::Mutex::ScopedLock l(callbackLock); + ScopedSet ss(callbackLock, inCallback); MessageCallback* cb = enqueueCallback; if (cb && *cb) (*cb)(intrusive_ptr<Message>(this)); } void Message::allDequeuesComplete() { - sys::Mutex::ScopedLock l(callbackLock); + ScopedSet ss(callbackLock, inCallback); MessageCallback* cb = dequeueCallback; if (cb && *cb) (*cb)(intrusive_ptr<Message>(this)); } void Message::setEnqueueCompleteCallback(MessageCallback& cb) { sys::Mutex::ScopedLock l(callbackLock); + while (inCallback) callbackLock.wait(); enqueueCallback = &cb; } void Message::resetEnqueueCompleteCallback() { sys::Mutex::ScopedLock l(callbackLock); + while (inCallback) callbackLock.wait(); enqueueCallback = 0; } void Message::setDequeueCompleteCallback(MessageCallback& cb) { sys::Mutex::ScopedLock l(callbackLock); + while (inCallback) callbackLock.wait(); dequeueCallback = &cb; } void Message::resetDequeueCompleteCallback() { sys::Mutex::ScopedLock l(callbackLock); + while (inCallback) callbackLock.wait(); dequeueCallback = 0; } diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h index 0a7772040b..353044c577 100644 --- a/cpp/src/qpid/broker/Message.h +++ b/cpp/src/qpid/broker/Message.h @@ -26,7 +26,7 @@ #include "qpid/broker/PersistableMessage.h" #include "qpid/broker/MessageAdapter.h" #include "qpid/framing/amqp_types.h" -#include "qpid/sys/Mutex.h" +#include "qpid/sys/Monitor.h" #include "qpid/sys/Time.h" #include <boost/function.hpp> #include <boost/shared_ptr.hpp> @@ -189,9 +189,10 @@ public: mutable Replacement replacement; mutable boost::intrusive_ptr<Message> empty; - sys::Mutex callbackLock; + sys::Monitor callbackLock; MessageCallback* enqueueCallback; MessageCallback* dequeueCallback; + bool inCallback; uint32_t requiredCredit; }; |