summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Message.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Message.cpp')
-rw-r--r--cpp/src/qpid/broker/Message.cpp27
1 files changed, 24 insertions, 3 deletions
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index ff4a37c88f..329451d64e 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -49,7 +49,8 @@ TransferAdapter Message::TRANSFER;
Message::Message(const framing::SequenceNumber& id) :
frames(id), persistenceId(0), redelivered(false), loaded(false),
staged(false), forcePersistentPolicy(false), publisher(0), adapter(0),
- expiration(FAR_FUTURE), enqueueCallback(0), dequeueCallback(0), requiredCredit(0) {}
+ expiration(FAR_FUTURE), enqueueCallback(0), dequeueCallback(0),
+ inCallback(false), requiredCredit(0) {}
Message::~Message()
{
@@ -398,35 +399,55 @@ void Message::setReplacementMessage(boost::intrusive_ptr<Message> msg, const Que
replacement[qfor] = msg;
}
+namespace {
+struct ScopedSet {
+ sys::Monitor& lock;
+ bool& flag;
+ ScopedSet(sys::Monitor& l, bool& f) : lock(l), flag(f) {
+ sys::Monitor::ScopedLock sl(lock);
+ flag = true;
+ }
+ ~ScopedSet(){
+ sys::Monitor::ScopedLock sl(lock);
+ flag = false;
+ lock.notifyAll();
+ }
+};
+}
+
void Message::allEnqueuesComplete() {
- sys::Mutex::ScopedLock l(callbackLock);
+ ScopedSet ss(callbackLock, inCallback);
MessageCallback* cb = enqueueCallback;
if (cb && *cb) (*cb)(intrusive_ptr<Message>(this));
}
void Message::allDequeuesComplete() {
- sys::Mutex::ScopedLock l(callbackLock);
+ ScopedSet ss(callbackLock, inCallback);
MessageCallback* cb = dequeueCallback;
if (cb && *cb) (*cb)(intrusive_ptr<Message>(this));
}
void Message::setEnqueueCompleteCallback(MessageCallback& cb) {
sys::Mutex::ScopedLock l(callbackLock);
+ while (inCallback) callbackLock.wait();
enqueueCallback = &cb;
}
void Message::resetEnqueueCompleteCallback() {
sys::Mutex::ScopedLock l(callbackLock);
+ while (inCallback) callbackLock.wait();
enqueueCallback = 0;
}
void Message::setDequeueCompleteCallback(MessageCallback& cb) {
sys::Mutex::ScopedLock l(callbackLock);
+ while (inCallback) callbackLock.wait();
dequeueCallback = &cb;
}
void Message::resetDequeueCompleteCallback() {
sys::Mutex::ScopedLock l(callbackLock);
+ while (inCallback) callbackLock.wait();
dequeueCallback = 0;
}