summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/IncompleteMessageList.cpp3
-rw-r--r--cpp/src/qpid/broker/Message.cpp27
-rw-r--r--cpp/src/qpid/broker/Message.h5
3 files changed, 29 insertions, 6 deletions
diff --git a/cpp/src/qpid/broker/IncompleteMessageList.cpp b/cpp/src/qpid/broker/IncompleteMessageList.cpp
index 02265ab85c..a061e872d0 100644
--- a/cpp/src/qpid/broker/IncompleteMessageList.cpp
+++ b/cpp/src/qpid/broker/IncompleteMessageList.cpp
@@ -30,7 +30,8 @@ IncompleteMessageList::IncompleteMessageList() :
IncompleteMessageList::~IncompleteMessageList()
{
- sys::Mutex::ScopedLock l(lock);
+ // No lock here. We are relying on Messsag::reset*CompleteCallback
+ // to ensure no callbacks are in progress before they return.
for (Messages::iterator i = incomplete.begin(); i != incomplete.end(); ++i) {
(*i)->resetEnqueueCompleteCallback();
(*i)->resetDequeueCompleteCallback();
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index ff4a37c88f..329451d64e 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -49,7 +49,8 @@ TransferAdapter Message::TRANSFER;
Message::Message(const framing::SequenceNumber& id) :
frames(id), persistenceId(0), redelivered(false), loaded(false),
staged(false), forcePersistentPolicy(false), publisher(0), adapter(0),
- expiration(FAR_FUTURE), enqueueCallback(0), dequeueCallback(0), requiredCredit(0) {}
+ expiration(FAR_FUTURE), enqueueCallback(0), dequeueCallback(0),
+ inCallback(false), requiredCredit(0) {}
Message::~Message()
{
@@ -398,35 +399,55 @@ void Message::setReplacementMessage(boost::intrusive_ptr<Message> msg, const Que
replacement[qfor] = msg;
}
+namespace {
+struct ScopedSet {
+ sys::Monitor& lock;
+ bool& flag;
+ ScopedSet(sys::Monitor& l, bool& f) : lock(l), flag(f) {
+ sys::Monitor::ScopedLock sl(lock);
+ flag = true;
+ }
+ ~ScopedSet(){
+ sys::Monitor::ScopedLock sl(lock);
+ flag = false;
+ lock.notifyAll();
+ }
+};
+}
+
void Message::allEnqueuesComplete() {
- sys::Mutex::ScopedLock l(callbackLock);
+ ScopedSet ss(callbackLock, inCallback);
MessageCallback* cb = enqueueCallback;
if (cb && *cb) (*cb)(intrusive_ptr<Message>(this));
}
void Message::allDequeuesComplete() {
- sys::Mutex::ScopedLock l(callbackLock);
+ ScopedSet ss(callbackLock, inCallback);
MessageCallback* cb = dequeueCallback;
if (cb && *cb) (*cb)(intrusive_ptr<Message>(this));
}
void Message::setEnqueueCompleteCallback(MessageCallback& cb) {
sys::Mutex::ScopedLock l(callbackLock);
+ while (inCallback) callbackLock.wait();
enqueueCallback = &cb;
}
void Message::resetEnqueueCompleteCallback() {
sys::Mutex::ScopedLock l(callbackLock);
+ while (inCallback) callbackLock.wait();
enqueueCallback = 0;
}
void Message::setDequeueCompleteCallback(MessageCallback& cb) {
sys::Mutex::ScopedLock l(callbackLock);
+ while (inCallback) callbackLock.wait();
dequeueCallback = &cb;
}
void Message::resetDequeueCompleteCallback() {
sys::Mutex::ScopedLock l(callbackLock);
+ while (inCallback) callbackLock.wait();
dequeueCallback = 0;
}
diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h
index 0a7772040b..353044c577 100644
--- a/cpp/src/qpid/broker/Message.h
+++ b/cpp/src/qpid/broker/Message.h
@@ -26,7 +26,7 @@
#include "qpid/broker/PersistableMessage.h"
#include "qpid/broker/MessageAdapter.h"
#include "qpid/framing/amqp_types.h"
-#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Monitor.h"
#include "qpid/sys/Time.h"
#include <boost/function.hpp>
#include <boost/shared_ptr.hpp>
@@ -189,9 +189,10 @@ public:
mutable Replacement replacement;
mutable boost::intrusive_ptr<Message> empty;
- sys::Mutex callbackLock;
+ sys::Monitor callbackLock;
MessageCallback* enqueueCallback;
MessageCallback* dequeueCallback;
+ bool inCallback;
uint32_t requiredCredit;
};