summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/PersistableMessage.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/PersistableMessage.cpp')
-rw-r--r--cpp/src/qpid/broker/PersistableMessage.cpp53
1 files changed, 31 insertions, 22 deletions
diff --git a/cpp/src/qpid/broker/PersistableMessage.cpp b/cpp/src/qpid/broker/PersistableMessage.cpp
index c1f86d4ca4..dc855315db 100644
--- a/cpp/src/qpid/broker/PersistableMessage.cpp
+++ b/cpp/src/qpid/broker/PersistableMessage.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,6 +22,7 @@
#include "qpid/broker/PersistableMessage.h"
#include "qpid/broker/MessageStore.h"
+#include "qpid/broker/NullMessageStore.h"
#include <iostream>
using namespace qpid::broker;
@@ -34,9 +35,8 @@ class MessageStore;
PersistableMessage::~PersistableMessage() {}
PersistableMessage::PersistableMessage() :
- asyncEnqueueCounter(0),
+ asyncEnqueueCounter(0),
asyncDequeueCounter(0),
- contentReleased(false),
store(0)
{}
@@ -56,13 +56,22 @@ void PersistableMessage::flush()
if (q) {
store->flush(*q);
}
- }
+ }
}
-void PersistableMessage::setContentReleased() {contentReleased = true; }
+void PersistableMessage::setContentReleased() { releaseMgr.setReleased(); }
+
+void PersistableMessage::blockRelease() { releaseMgr.blockRelease(); }
+
+bool PersistableMessage::requestContentRelease()
+{
+ if (!store || NullMessageStore::isNullStore(store) || releaseMgr.isReleaseBlocked()) return false;
+ releaseMgr.setReleaseRequested();
+ return true;
+}
+
+bool PersistableMessage::isContentReleased()const { return releaseMgr.isReleased(); }
-bool PersistableMessage::isContentReleased()const { return contentReleased; }
-
bool PersistableMessage::isEnqueueComplete() {
sys::ScopedLock<sys::Mutex> l(asyncEnqueueLock);
return asyncEnqueueCounter == 0;
@@ -85,8 +94,8 @@ void PersistableMessage::enqueueComplete() {
for (syncList::iterator i = synclist.begin(); i != synclist.end(); ++i) {
PersistableQueue::shared_ptr q(i->lock());
if (q) q->notifyDurableIOComplete();
- }
- }
+ }
+ }
}
}
@@ -95,13 +104,13 @@ bool PersistableMessage::isStoredOnQueue(PersistableQueue::shared_ptr queue){
for (syncList::iterator i = synclist.begin(); i != synclist.end(); ++i) {
PersistableQueue::shared_ptr q(i->lock());
if (q && q->getPersistenceId() == queue->getPersistenceId()) return true;
- }
- }
+ }
+ }
return false;
}
-void PersistableMessage::addToSyncList(PersistableQueue::shared_ptr queue, MessageStore* _store) {
+void PersistableMessage::addToSyncList(PersistableQueue::shared_ptr queue, MessageStore* _store) {
if (_store){
sys::ScopedLock<sys::Mutex> l(storeLock);
store = _store;
@@ -110,22 +119,22 @@ void PersistableMessage::addToSyncList(PersistableQueue::shared_ptr queue, Messa
}
}
-void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) {
+void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) {
addToSyncList(queue, _store);
enqueueAsync();
}
-void PersistableMessage::enqueueAsync() {
+void PersistableMessage::enqueueAsync() {
sys::ScopedLock<sys::Mutex> l(asyncEnqueueLock);
- asyncEnqueueCounter++;
+ asyncEnqueueCounter++;
}
-bool PersistableMessage::isDequeueComplete() {
+bool PersistableMessage::isDequeueComplete() {
sys::ScopedLock<sys::Mutex> l(asyncDequeueLock);
return asyncDequeueCounter == 0;
}
-
-void PersistableMessage::dequeueComplete() {
+
+void PersistableMessage::dequeueComplete() {
bool notify = false;
{
sys::ScopedLock<sys::Mutex> l(asyncDequeueLock);
@@ -138,7 +147,7 @@ void PersistableMessage::dequeueComplete() {
if (notify) allDequeuesComplete();
}
-void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) {
+void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) {
if (_store){
sys::ScopedLock<sys::Mutex> l(storeLock);
store = _store;
@@ -148,9 +157,9 @@ void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr queue, Messag
dequeueAsync();
}
-void PersistableMessage::dequeueAsync() {
+void PersistableMessage::dequeueAsync() {
sys::ScopedLock<sys::Mutex> l(asyncDequeueLock);
- asyncDequeueCounter++;
+ asyncDequeueCounter++;
}
}}