summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cpp/src/qpid/broker/IncompleteMessageList.cpp31
-rw-r--r--cpp/src/qpid/broker/IncompleteMessageList.h15
-rw-r--r--cpp/src/qpid/broker/Message.cpp8
-rw-r--r--cpp/src/qpid/broker/Message.h6
-rw-r--r--cpp/src/qpid/broker/PersistableMessage.cpp28
-rw-r--r--cpp/src/qpid/broker/PersistableMessage.h10
6 files changed, 53 insertions, 45 deletions
diff --git a/cpp/src/qpid/broker/IncompleteMessageList.cpp b/cpp/src/qpid/broker/IncompleteMessageList.cpp
index edb3721a40..64562dfb57 100644
--- a/cpp/src/qpid/broker/IncompleteMessageList.cpp
+++ b/cpp/src/qpid/broker/IncompleteMessageList.cpp
@@ -18,38 +18,55 @@
* under the License.
*
*/
-#include "IncompleteMessageList.h"
-#include "Message.h"
+#include "IncompleteMessageList.h"
namespace qpid {
namespace broker {
+IncompleteMessageList::IncompleteMessageList() :
+ callback(boost::bind(&IncompleteMessageList::enqueueComplete, this, _1))
+{}
+
void IncompleteMessageList::add(boost::intrusive_ptr<Message> msg)
{
+ sys::Mutex::ScopedLock l(lock);
+ msg->setEnqueueCompleteCallback(callback);
incomplete.push_back(msg);
}
-void IncompleteMessageList::process(const CompletionListener& l, bool sync)
+void IncompleteMessageList::enqueueComplete(const boost::intrusive_ptr<Message>& ) {
+ sys::Mutex::ScopedLock l(lock);
+ lock.notify();
+}
+
+void IncompleteMessageList::process(const CompletionListener& listen, bool sync)
{
+ sys::Mutex::ScopedLock l(lock);
while (!incomplete.empty()) {
boost::intrusive_ptr<Message>& msg = incomplete.front();
if (!msg->isEnqueueComplete()) {
if (sync){
msg->flush();
- msg->waitForEnqueueComplete();
+ while (!msg->isEnqueueComplete())
+ lock.wait();
} else {
//leave the message as incomplete for now
return;
}
}
- l(msg);
+ listen(msg);
incomplete.pop_front();
}
}
-void IncompleteMessageList::each(const CompletionListener& l) {
- std::for_each(incomplete.begin(), incomplete.end(), l);
+void IncompleteMessageList::each(const CompletionListener& listen) {
+ Messages snapshot;
+ {
+ sys::Mutex::ScopedLock l(lock);
+ snapshot = incomplete;
+ }
+ std::for_each(incomplete.begin(), incomplete.end(), listen); // FIXME aconway 2008-11-07: passed by ref or value?
}
}}
diff --git a/cpp/src/qpid/broker/IncompleteMessageList.h b/cpp/src/qpid/broker/IncompleteMessageList.h
index 36cc1b4bf5..40c47cfaa6 100644
--- a/cpp/src/qpid/broker/IncompleteMessageList.h
+++ b/cpp/src/qpid/broker/IncompleteMessageList.h
@@ -21,23 +21,30 @@
#ifndef _IncompleteMessageList_
#define _IncompleteMessageList_
-#include <list>
+#include "qpid/sys/Monitor.h"
+#include "qpid/broker/Message.h"
#include <boost/intrusive_ptr.hpp>
#include <boost/function.hpp>
+#include <list>
namespace qpid {
namespace broker {
-class Message;
-
class IncompleteMessageList
{
typedef std::list< boost::intrusive_ptr<Message> > Messages;
+
+ void enqueueComplete(const boost::intrusive_ptr<Message>&);
+
+ sys::Monitor lock;
Messages incomplete;
+ Message::MessageCallback callback;
public:
- typedef boost::function<void(boost::intrusive_ptr<Message>)> CompletionListener;
+ typedef Message::MessageCallback CompletionListener;
+ IncompleteMessageList();
+
void add(boost::intrusive_ptr<Message> msg);
void process(const CompletionListener& l, bool sync);
void each(const CompletionListener& l);
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index 89c647358a..a99a10180e 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -348,7 +348,7 @@ void Message::allEnqueuesComplete() {
sys::Mutex::ScopedLock l(lock);
swap(cb, enqueueCallback);
}
- if (cb && *cb) (*cb)(*this);
+ if (cb && *cb) (*cb)(intrusive_ptr<Message>(this));
}
void Message::allDequeuesComplete() {
@@ -357,7 +357,11 @@ void Message::allDequeuesComplete() {
sys::Mutex::ScopedLock l(lock);
swap(cb, dequeueCallback);
}
- if (cb && *cb) (*cb)(*this);
+ if (cb && *cb) (*cb)(intrusive_ptr<Message>(this));
}
+void Message::setEnqueueCompleteCallback(MessageCallback& cb) { enqueueCallback = &cb; }
+
+void Message::setDequeueCompleteCallback(MessageCallback& cb) { dequeueCallback = &cb; }
+
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h
index 762ec68fe8..8510ef78e9 100644
--- a/cpp/src/qpid/broker/Message.h
+++ b/cpp/src/qpid/broker/Message.h
@@ -48,7 +48,7 @@ class Queue;
class Message : public PersistableMessage {
public:
- typedef boost::function<void (Message&)> MessageCallback;
+ typedef boost::function<void (const boost::intrusive_ptr<Message>&)> MessageCallback;
Message(const framing::SequenceNumber& id = framing::SequenceNumber());
~Message();
@@ -145,10 +145,10 @@ public:
void setReplacementMessage(boost::intrusive_ptr<Message> msg, const Queue* qfor);
/** Call cb when enqueue is complete, may call immediately. Holds cb by reference. */
- void setEnqueueCompleteCallback(const MessageCallback* cb);
+ void setEnqueueCompleteCallback(MessageCallback& cb);
/** Call cb when dequeue is complete, may call immediately. Holds cb by reference. */
- void setDequeueCompleteCallback(const MessageCallback& cb);
+ void setDequeueCompleteCallback(MessageCallback& cb);
private:
typedef std::map<const Queue*,boost::intrusive_ptr<Message> > Replacement;
diff --git a/cpp/src/qpid/broker/PersistableMessage.cpp b/cpp/src/qpid/broker/PersistableMessage.cpp
index 920dfd6386..4d272c3780 100644
--- a/cpp/src/qpid/broker/PersistableMessage.cpp
+++ b/cpp/src/qpid/broker/PersistableMessage.cpp
@@ -63,25 +63,17 @@ void PersistableMessage::setContentReleased() {contentReleased = true; }
bool PersistableMessage::isContentReleased()const { return contentReleased; }
-void PersistableMessage::waitForEnqueueComplete() {
- sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock);
- while (asyncEnqueueCounter > 0) {
- asyncEnqueueLock.wait();
- }
-}
-
bool PersistableMessage::isEnqueueComplete() {
- sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock);
+ sys::ScopedLock<sys::Mutex> l(asyncEnqueueLock);
return asyncEnqueueCounter == 0;
}
void PersistableMessage::enqueueComplete() {
bool notify = false;
{
- sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock);
+ sys::ScopedLock<sys::Mutex> l(asyncEnqueueLock);
if (asyncEnqueueCounter > 0) {
if (--asyncEnqueueCounter == 0) {
- asyncEnqueueLock.notify();
notify = true;
}
}
@@ -109,36 +101,28 @@ void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr queue, Messag
}
void PersistableMessage::enqueueAsync() {
- sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock);
+ sys::ScopedLock<sys::Mutex> l(asyncEnqueueLock);
asyncEnqueueCounter++;
}
bool PersistableMessage::isDequeueComplete() {
- sys::ScopedLock<sys::Monitor> l(asyncDequeueLock);
+ sys::ScopedLock<sys::Mutex> l(asyncDequeueLock);
return asyncDequeueCounter == 0;
}
void PersistableMessage::dequeueComplete() {
bool notify = false;
{
- sys::ScopedLock<sys::Monitor> l(asyncDequeueLock);
+ sys::ScopedLock<sys::Mutex> l(asyncDequeueLock);
if (asyncDequeueCounter > 0) {
if (--asyncDequeueCounter == 0) {
notify = true;
- asyncDequeueLock.notify();
}
}
}
if (notify) allDequeuesComplete();
}
-void PersistableMessage::waitForDequeueComplete() {
- sys::ScopedLock<sys::Monitor> l(asyncDequeueLock);
- while (asyncDequeueCounter > 0) {
- asyncDequeueLock.wait();
- }
-}
-
void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) {
if (_store){
sys::ScopedLock<sys::Mutex> l(storeLock);
@@ -150,7 +134,7 @@ void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr queue, Messag
}
void PersistableMessage::dequeueAsync() {
- sys::ScopedLock<sys::Monitor> l(asyncDequeueLock);
+ sys::ScopedLock<sys::Mutex> l(asyncDequeueLock);
asyncDequeueCounter++;
}
diff --git a/cpp/src/qpid/broker/PersistableMessage.h b/cpp/src/qpid/broker/PersistableMessage.h
index 59fa2e3d95..4f2e3abafa 100644
--- a/cpp/src/qpid/broker/PersistableMessage.h
+++ b/cpp/src/qpid/broker/PersistableMessage.h
@@ -28,7 +28,7 @@
#include <boost/weak_ptr.hpp>
#include "Persistable.h"
#include "qpid/framing/amqp_types.h"
-#include "qpid/sys/Monitor.h"
+#include "qpid/sys/Mutex.h"
#include "PersistableQueue.h"
namespace qpid {
@@ -42,8 +42,8 @@ class MessageStore;
class PersistableMessage : public Persistable
{
typedef std::list< boost::weak_ptr<PersistableQueue> > syncList;
- sys::Monitor asyncEnqueueLock;
- sys::Monitor asyncDequeueLock;
+ sys::Mutex asyncEnqueueLock;
+ sys::Mutex asyncDequeueLock;
sys::Mutex storeLock;
/**
@@ -93,8 +93,6 @@ class PersistableMessage : public Persistable
bool isContentReleased() const;
- void waitForEnqueueComplete();
-
bool isEnqueueComplete();
void enqueueComplete();
@@ -107,8 +105,6 @@ class PersistableMessage : public Persistable
void dequeueComplete();
- void waitForDequeueComplete();
-
void dequeueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store);
void dequeueAsync();