diff options
-rw-r--r-- | cpp/src/qpid/broker/PersistableMessage.cpp | 10 | ||||
-rw-r--r-- | cpp/src/qpid/broker/PersistableMessage.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 16 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.h | 1 |
4 files changed, 23 insertions, 6 deletions
diff --git a/cpp/src/qpid/broker/PersistableMessage.cpp b/cpp/src/qpid/broker/PersistableMessage.cpp index 4d272c3780..2275009015 100644 --- a/cpp/src/qpid/broker/PersistableMessage.cpp +++ b/cpp/src/qpid/broker/PersistableMessage.cpp @@ -90,6 +90,16 @@ void PersistableMessage::enqueueComplete() { } } +bool PersistableMessage::isStoredOnQueue(PersistableQueue::shared_ptr queue){ + if (store && (queue->getPersistenceId()!=0)) { + for (syncList::iterator i = synclist.begin(); i != synclist.end(); ++i) { + PersistableQueue::shared_ptr q(i->lock()); + if (q && q->getPersistenceId() == queue->getPersistenceId()) return true; + } + } + return false; +} + void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) { if (_store){ sys::ScopedLock<sys::Mutex> l(storeLock); diff --git a/cpp/src/qpid/broker/PersistableMessage.h b/cpp/src/qpid/broker/PersistableMessage.h index 92f89ba578..98d9655862 100644 --- a/cpp/src/qpid/broker/PersistableMessage.h +++ b/cpp/src/qpid/broker/PersistableMessage.h @@ -111,6 +111,8 @@ class PersistableMessage : public Persistable MessageStore* _store); QPID_BROKER_EXTERN void dequeueAsync(); + + bool isStoredOnQueue(PersistableQueue::shared_ptr queue); }; }} diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 30be733f89..19589e1d84 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -99,8 +99,7 @@ Queue::Queue(const string& _name, bool _autodelete, eventMode(0), eventMgr(0), insertSeqNo(0), - broker(b), - lastForcedPosition(0) + broker(b) { if (parent != 0 && broker != 0) { @@ -211,6 +210,14 @@ void Queue::requeue(const QueuedMessage& msg){ msg.payload->enqueueComplete(); // mark the message as enqueued messages.push_front(msg); listeners.populate(copy); + + // for persistLastNode - don't force a message twice to disk, but force it if no force before + if(inLastNodeFailure && persistLastNode && !msg.payload->isStoredOnQueue(shared_from_this())) { + msg.payload->forcePersistent(); + if (msg.payload->isForcedPersistent() ){ + enqueue(0, msg.payload); + } + } } copy.notify(); } @@ -660,7 +667,6 @@ bool Queue::canAutoDelete() const void Queue::clearLastNodeFailure() { inLastNodeFailure = false; - lastForcedPosition = sequence; } void Queue::setLastNodeFailure() @@ -669,19 +675,19 @@ void Queue::setLastNodeFailure() Mutex::ScopedLock locker(messageLock); for ( Messages::iterator i = messages.begin(); i != messages.end(); ++i ) { // don't force a message twice to disk. - if(i->position > lastForcedPosition) { + if(!i->payload->isStoredOnQueue(shared_from_this())) { if (lastValueQueue) checkLvqReplace(*i); i->payload->forcePersistent(); if (i->payload->isForcedPersistent() ){ enqueue(0, i->payload); } - lastForcedPosition = i->position; } } inLastNodeFailure = true; } } + // return true if store exists, bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg) { diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index dbad5e12ed..f1a2afe3a4 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -106,7 +106,6 @@ namespace qpid { bool insertSeqNo; std::string seqNoKey; Broker* broker; - framing::SequenceNumber lastForcedPosition; void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false); void setPolicy(std::auto_ptr<QueuePolicy> policy); |