diff options
Diffstat (limited to 'cpp/src/qpid/broker/PersistableMessage.cpp')
-rw-r--r-- | cpp/src/qpid/broker/PersistableMessage.cpp | 48 |
1 files changed, 40 insertions, 8 deletions
diff --git a/cpp/src/qpid/broker/PersistableMessage.cpp b/cpp/src/qpid/broker/PersistableMessage.cpp index 7ba28eb293..957248b522 100644 --- a/cpp/src/qpid/broker/PersistableMessage.cpp +++ b/cpp/src/qpid/broker/PersistableMessage.cpp @@ -21,7 +21,8 @@ #include "qpid/broker/PersistableMessage.h" -#include "qpid/broker/MessageStore.h" +//#include "qpid/broker/MessageStore.h" +//#include "qpid/broker/AsyncStore.h" #include <iostream> using namespace qpid::broker; @@ -29,13 +30,12 @@ using namespace qpid::broker; namespace qpid { namespace broker { -class MessageStore; - PersistableMessage::~PersistableMessage() {} PersistableMessage::PersistableMessage() : asyncDequeueCounter(0), - store(0) + store(0), + asyncStore(0) {} void PersistableMessage::flush() @@ -78,8 +78,8 @@ bool PersistableMessage::isStoredOnQueue(PersistableQueue::shared_ptr queue){ return false; } - -void PersistableMessage::addToSyncList(PersistableQueue::shared_ptr queue, MessageStore* _store) { +// deprecated +void PersistableMessage::addToSyncList(PersistableQueue::shared_ptr queue, MessageStore* _store) { if (_store){ sys::ScopedLock<sys::Mutex> l(storeLock); store = _store; @@ -88,7 +88,22 @@ void PersistableMessage::addToSyncList(PersistableQueue::shared_ptr queue, Messa } } -void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) { +void PersistableMessage::addToSyncList(PersistableQueue::shared_ptr queue, AsyncStore* _store) { + if (_store){ + sys::ScopedLock<sys::Mutex> l(storeLock); + asyncStore = _store; + boost::weak_ptr<PersistableQueue> q(queue); + synclist.push_back(q); + } +} + +// deprecated +void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) { + addToSyncList(queue, _store); + enqueueStart(); +} + +void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr queue, AsyncStore* _store) { addToSyncList(queue, _store); enqueueStart(); } @@ -111,7 +126,8 @@ void PersistableMessage::dequeueComplete() { if (notify) allDequeuesComplete(); } -void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) { +// deprecated +void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) { if (_store){ sys::ScopedLock<sys::Mutex> l(storeLock); store = _store; @@ -121,6 +137,16 @@ void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr queue, Messag dequeueAsync(); } +void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr queue, AsyncStore* _store) { + if (_store){ + sys::ScopedLock<sys::Mutex> l(storeLock); + asyncStore = _store; + boost::weak_ptr<PersistableQueue> q(queue); + synclist.push_back(q); + } + dequeueAsync(); +} + void PersistableMessage::dequeueAsync() { sys::ScopedLock<sys::Mutex> l(asyncDequeueLock); asyncDequeueCounter++; @@ -128,11 +154,17 @@ void PersistableMessage::dequeueAsync() { PersistableMessage::ContentReleaseState::ContentReleaseState() : blocked(false), requested(false), released(false) {} +// deprecated void PersistableMessage::setStore(MessageStore* s) { store = s; } +void PersistableMessage::setStore(AsyncStore* s) +{ + asyncStore = s; +} + void PersistableMessage::requestContentRelease() { contentReleaseState.requested = true; |