summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cpp/src/qpid/broker/PersistableMessage.cpp10
-rw-r--r--cpp/src/qpid/broker/PersistableMessage.h2
-rw-r--r--cpp/src/qpid/broker/Queue.cpp16
-rw-r--r--cpp/src/qpid/broker/Queue.h1
4 files changed, 23 insertions, 6 deletions
diff --git a/cpp/src/qpid/broker/PersistableMessage.cpp b/cpp/src/qpid/broker/PersistableMessage.cpp
index 4d272c3780..2275009015 100644
--- a/cpp/src/qpid/broker/PersistableMessage.cpp
+++ b/cpp/src/qpid/broker/PersistableMessage.cpp
@@ -90,6 +90,16 @@ void PersistableMessage::enqueueComplete() {
}
}
+bool PersistableMessage::isStoredOnQueue(PersistableQueue::shared_ptr queue){
+ if (store && (queue->getPersistenceId()!=0)) {
+ for (syncList::iterator i = synclist.begin(); i != synclist.end(); ++i) {
+ PersistableQueue::shared_ptr q(i->lock());
+ if (q && q->getPersistenceId() == queue->getPersistenceId()) return true;
+ }
+ }
+ return false;
+}
+
void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) {
if (_store){
sys::ScopedLock<sys::Mutex> l(storeLock);
diff --git a/cpp/src/qpid/broker/PersistableMessage.h b/cpp/src/qpid/broker/PersistableMessage.h
index 92f89ba578..98d9655862 100644
--- a/cpp/src/qpid/broker/PersistableMessage.h
+++ b/cpp/src/qpid/broker/PersistableMessage.h
@@ -111,6 +111,8 @@ class PersistableMessage : public Persistable
MessageStore* _store);
QPID_BROKER_EXTERN void dequeueAsync();
+
+ bool isStoredOnQueue(PersistableQueue::shared_ptr queue);
};
}}
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 30be733f89..19589e1d84 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -99,8 +99,7 @@ Queue::Queue(const string& _name, bool _autodelete,
eventMode(0),
eventMgr(0),
insertSeqNo(0),
- broker(b),
- lastForcedPosition(0)
+ broker(b)
{
if (parent != 0 && broker != 0)
{
@@ -211,6 +210,14 @@ void Queue::requeue(const QueuedMessage& msg){
msg.payload->enqueueComplete(); // mark the message as enqueued
messages.push_front(msg);
listeners.populate(copy);
+
+ // for persistLastNode - don't force a message twice to disk, but force it if no force before
+ if(inLastNodeFailure && persistLastNode && !msg.payload->isStoredOnQueue(shared_from_this())) {
+ msg.payload->forcePersistent();
+ if (msg.payload->isForcedPersistent() ){
+ enqueue(0, msg.payload);
+ }
+ }
}
copy.notify();
}
@@ -660,7 +667,6 @@ bool Queue::canAutoDelete() const
void Queue::clearLastNodeFailure()
{
inLastNodeFailure = false;
- lastForcedPosition = sequence;
}
void Queue::setLastNodeFailure()
@@ -669,19 +675,19 @@ void Queue::setLastNodeFailure()
Mutex::ScopedLock locker(messageLock);
for ( Messages::iterator i = messages.begin(); i != messages.end(); ++i ) {
// don't force a message twice to disk.
- if(i->position > lastForcedPosition) {
+ if(!i->payload->isStoredOnQueue(shared_from_this())) {
if (lastValueQueue) checkLvqReplace(*i);
i->payload->forcePersistent();
if (i->payload->isForcedPersistent() ){
enqueue(0, i->payload);
}
- lastForcedPosition = i->position;
}
}
inLastNodeFailure = true;
}
}
+
// return true if store exists,
bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg)
{
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index dbad5e12ed..f1a2afe3a4 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -106,7 +106,6 @@ namespace qpid {
bool insertSeqNo;
std::string seqNoKey;
Broker* broker;
- framing::SequenceNumber lastForcedPosition;
void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false);
void setPolicy(std::auto_ptr<QueuePolicy> policy);