diff options
Diffstat (limited to 'cpp/src/qpid/broker/Message.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Message.cpp | 27 |
1 files changed, 24 insertions, 3 deletions
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index ff4a37c88f..329451d64e 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -49,7 +49,8 @@ TransferAdapter Message::TRANSFER; Message::Message(const framing::SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), loaded(false), staged(false), forcePersistentPolicy(false), publisher(0), adapter(0), - expiration(FAR_FUTURE), enqueueCallback(0), dequeueCallback(0), requiredCredit(0) {} + expiration(FAR_FUTURE), enqueueCallback(0), dequeueCallback(0), + inCallback(false), requiredCredit(0) {} Message::~Message() { @@ -398,35 +399,55 @@ void Message::setReplacementMessage(boost::intrusive_ptr<Message> msg, const Que replacement[qfor] = msg; } +namespace { +struct ScopedSet { + sys::Monitor& lock; + bool& flag; + ScopedSet(sys::Monitor& l, bool& f) : lock(l), flag(f) { + sys::Monitor::ScopedLock sl(lock); + flag = true; + } + ~ScopedSet(){ + sys::Monitor::ScopedLock sl(lock); + flag = false; + lock.notifyAll(); + } +}; +} + void Message::allEnqueuesComplete() { - sys::Mutex::ScopedLock l(callbackLock); + ScopedSet ss(callbackLock, inCallback); MessageCallback* cb = enqueueCallback; if (cb && *cb) (*cb)(intrusive_ptr<Message>(this)); } void Message::allDequeuesComplete() { - sys::Mutex::ScopedLock l(callbackLock); + ScopedSet ss(callbackLock, inCallback); MessageCallback* cb = dequeueCallback; if (cb && *cb) (*cb)(intrusive_ptr<Message>(this)); } void Message::setEnqueueCompleteCallback(MessageCallback& cb) { sys::Mutex::ScopedLock l(callbackLock); + while (inCallback) callbackLock.wait(); enqueueCallback = &cb; } void Message::resetEnqueueCompleteCallback() { sys::Mutex::ScopedLock l(callbackLock); + while (inCallback) callbackLock.wait(); enqueueCallback = 0; } void Message::setDequeueCompleteCallback(MessageCallback& cb) { sys::Mutex::ScopedLock l(callbackLock); + while (inCallback) callbackLock.wait(); dequeueCallback = &cb; } void Message::resetDequeueCompleteCallback() { sys::Mutex::ScopedLock l(callbackLock); + while (inCallback) callbackLock.wait(); dequeueCallback = 0; } |