diff options
Diffstat (limited to 'cpp/src/qpid/broker/PersistableMessage.cpp')
-rw-r--r-- | cpp/src/qpid/broker/PersistableMessage.cpp | 53 |
1 files changed, 31 insertions, 22 deletions
diff --git a/cpp/src/qpid/broker/PersistableMessage.cpp b/cpp/src/qpid/broker/PersistableMessage.cpp index c1f86d4ca4..dc855315db 100644 --- a/cpp/src/qpid/broker/PersistableMessage.cpp +++ b/cpp/src/qpid/broker/PersistableMessage.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -22,6 +22,7 @@ #include "qpid/broker/PersistableMessage.h" #include "qpid/broker/MessageStore.h" +#include "qpid/broker/NullMessageStore.h" #include <iostream> using namespace qpid::broker; @@ -34,9 +35,8 @@ class MessageStore; PersistableMessage::~PersistableMessage() {} PersistableMessage::PersistableMessage() : - asyncEnqueueCounter(0), + asyncEnqueueCounter(0), asyncDequeueCounter(0), - contentReleased(false), store(0) {} @@ -56,13 +56,22 @@ void PersistableMessage::flush() if (q) { store->flush(*q); } - } + } } -void PersistableMessage::setContentReleased() {contentReleased = true; } +void PersistableMessage::setContentReleased() { releaseMgr.setReleased(); } + +void PersistableMessage::blockRelease() { releaseMgr.blockRelease(); } + +bool PersistableMessage::requestContentRelease() +{ + if (!store || NullMessageStore::isNullStore(store) || releaseMgr.isReleaseBlocked()) return false; + releaseMgr.setReleaseRequested(); + return true; +} + +bool PersistableMessage::isContentReleased()const { return releaseMgr.isReleased(); } -bool PersistableMessage::isContentReleased()const { return contentReleased; } - bool PersistableMessage::isEnqueueComplete() { sys::ScopedLock<sys::Mutex> l(asyncEnqueueLock); return asyncEnqueueCounter == 0; @@ -85,8 +94,8 @@ void PersistableMessage::enqueueComplete() { for (syncList::iterator i = synclist.begin(); i != synclist.end(); ++i) { PersistableQueue::shared_ptr q(i->lock()); if (q) q->notifyDurableIOComplete(); - } - } + } + } } } @@ -95,13 +104,13 @@ bool PersistableMessage::isStoredOnQueue(PersistableQueue::shared_ptr queue){ for (syncList::iterator i = synclist.begin(); i != synclist.end(); ++i) { PersistableQueue::shared_ptr q(i->lock()); if (q && q->getPersistenceId() == queue->getPersistenceId()) return true; - } - } + } + } return false; } -void PersistableMessage::addToSyncList(PersistableQueue::shared_ptr queue, MessageStore* _store) { +void PersistableMessage::addToSyncList(PersistableQueue::shared_ptr queue, MessageStore* _store) { if (_store){ sys::ScopedLock<sys::Mutex> l(storeLock); store = _store; @@ -110,22 +119,22 @@ void PersistableMessage::addToSyncList(PersistableQueue::shared_ptr queue, Messa } } -void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) { +void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) { addToSyncList(queue, _store); enqueueAsync(); } -void PersistableMessage::enqueueAsync() { +void PersistableMessage::enqueueAsync() { sys::ScopedLock<sys::Mutex> l(asyncEnqueueLock); - asyncEnqueueCounter++; + asyncEnqueueCounter++; } -bool PersistableMessage::isDequeueComplete() { +bool PersistableMessage::isDequeueComplete() { sys::ScopedLock<sys::Mutex> l(asyncDequeueLock); return asyncDequeueCounter == 0; } - -void PersistableMessage::dequeueComplete() { + +void PersistableMessage::dequeueComplete() { bool notify = false; { sys::ScopedLock<sys::Mutex> l(asyncDequeueLock); @@ -138,7 +147,7 @@ void PersistableMessage::dequeueComplete() { if (notify) allDequeuesComplete(); } -void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) { +void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) { if (_store){ sys::ScopedLock<sys::Mutex> l(storeLock); store = _store; @@ -148,9 +157,9 @@ void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr queue, Messag dequeueAsync(); } -void PersistableMessage::dequeueAsync() { +void PersistableMessage::dequeueAsync() { sys::ScopedLock<sys::Mutex> l(asyncDequeueLock); - asyncDequeueCounter++; + asyncDequeueCounter++; } }} |