summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/PersistableMessage.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/PersistableMessage.cpp')
-rw-r--r--cpp/src/qpid/broker/PersistableMessage.cpp48
1 files changed, 40 insertions, 8 deletions
diff --git a/cpp/src/qpid/broker/PersistableMessage.cpp b/cpp/src/qpid/broker/PersistableMessage.cpp
index 7ba28eb293..957248b522 100644
--- a/cpp/src/qpid/broker/PersistableMessage.cpp
+++ b/cpp/src/qpid/broker/PersistableMessage.cpp
@@ -21,7 +21,8 @@
#include "qpid/broker/PersistableMessage.h"
-#include "qpid/broker/MessageStore.h"
+//#include "qpid/broker/MessageStore.h"
+//#include "qpid/broker/AsyncStore.h"
#include <iostream>
using namespace qpid::broker;
@@ -29,13 +30,12 @@ using namespace qpid::broker;
namespace qpid {
namespace broker {
-class MessageStore;
-
PersistableMessage::~PersistableMessage() {}
PersistableMessage::PersistableMessage() :
asyncDequeueCounter(0),
- store(0)
+ store(0),
+ asyncStore(0)
{}
void PersistableMessage::flush()
@@ -78,8 +78,8 @@ bool PersistableMessage::isStoredOnQueue(PersistableQueue::shared_ptr queue){
return false;
}
-
-void PersistableMessage::addToSyncList(PersistableQueue::shared_ptr queue, MessageStore* _store) {
+// deprecated
+void PersistableMessage::addToSyncList(PersistableQueue::shared_ptr queue, MessageStore* _store) {
if (_store){
sys::ScopedLock<sys::Mutex> l(storeLock);
store = _store;
@@ -88,7 +88,22 @@ void PersistableMessage::addToSyncList(PersistableQueue::shared_ptr queue, Messa
}
}
-void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) {
+void PersistableMessage::addToSyncList(PersistableQueue::shared_ptr queue, AsyncStore* _store) {
+ if (_store){
+ sys::ScopedLock<sys::Mutex> l(storeLock);
+ asyncStore = _store;
+ boost::weak_ptr<PersistableQueue> q(queue);
+ synclist.push_back(q);
+ }
+}
+
+// deprecated
+void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) {
+ addToSyncList(queue, _store);
+ enqueueStart();
+}
+
+void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr queue, AsyncStore* _store) {
addToSyncList(queue, _store);
enqueueStart();
}
@@ -111,7 +126,8 @@ void PersistableMessage::dequeueComplete() {
if (notify) allDequeuesComplete();
}
-void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) {
+// deprecated
+void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) {
if (_store){
sys::ScopedLock<sys::Mutex> l(storeLock);
store = _store;
@@ -121,6 +137,16 @@ void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr queue, Messag
dequeueAsync();
}
+void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr queue, AsyncStore* _store) {
+ if (_store){
+ sys::ScopedLock<sys::Mutex> l(storeLock);
+ asyncStore = _store;
+ boost::weak_ptr<PersistableQueue> q(queue);
+ synclist.push_back(q);
+ }
+ dequeueAsync();
+}
+
void PersistableMessage::dequeueAsync() {
sys::ScopedLock<sys::Mutex> l(asyncDequeueLock);
asyncDequeueCounter++;
@@ -128,11 +154,17 @@ void PersistableMessage::dequeueAsync() {
PersistableMessage::ContentReleaseState::ContentReleaseState() : blocked(false), requested(false), released(false) {}
+// deprecated
void PersistableMessage::setStore(MessageStore* s)
{
store = s;
}
+void PersistableMessage::setStore(AsyncStore* s)
+{
+ asyncStore = s;
+}
+
void PersistableMessage::requestContentRelease()
{
contentReleaseState.requested = true;