From ca7460747ce41c91ef1d485b514e9dfe2879cb1c Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Thu, 26 Feb 2009 17:18:46 +0000 Subject: QPID-1695: Make LVQ persist durable messages git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@748214 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/broker/Queue.cpp | 23 ++++++++++++++++++----- cpp/src/qpid/broker/Queue.h | 8 +++++++- cpp/src/qpid/broker/RecoveryManagerImpl.cpp | 3 ++- 3 files changed, 27 insertions(+), 7 deletions(-) (limited to 'cpp/src') diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 8c50f26abd..bc29815e84 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -176,7 +176,7 @@ void Queue::deliver(boost::intrusive_ptr& msg){ void Queue::recover(boost::intrusive_ptr& msg){ - push(msg); + push(msg, true); msg->enqueueComplete(); // mark the message as enqueued mgntEnqStats(msg); @@ -545,7 +545,7 @@ void Queue::popMsg(QueuedMessage& qmsg) ++dequeueTracker; } -void Queue::push(boost::intrusive_ptr& msg){ +void Queue::push(boost::intrusive_ptr& msg, bool isRecovery){ QueueListeners::NotificationSet copy; { Mutex::ScopedLock locker(messageLock); @@ -566,7 +566,13 @@ void Queue::push(boost::intrusive_ptr& msg){ boost::intrusive_ptr old = i->second->getReplacementMessage(this); if (!old) old = i->second; i->second->setReplacementMessage(msg,this); - dequeued(QueuedMessage(qm.queue, old, qm.position)); + if (isRecovery) { + //can't issue new requests for the store until + //recovery is complete + pendingDequeues.push_back(QueuedMessage(qm.queue, old, qm.position)); + } else { + dequeue(0, QueuedMessage(qm.queue, old, qm.position)); + } } }else { messages.push_back(qm); @@ -664,7 +670,7 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr msg) msg->addTraceId(traceId); } - if (msg->isPersistent() && store && !lastValueQueue) { + if (msg->isPersistent() && store) { msg->enqueueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue boost::intrusive_ptr pmsg = boost::static_pointer_cast(msg); store->enqueue(ctxt, pmsg, *this); @@ -683,7 +689,7 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) dequeued(msg); } } - if (msg.payload->isPersistent() && store && !lastValueQueue) { + if (msg.payload->isPersistent() && store) { msg.payload->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue boost::intrusive_ptr pmsg = boost::static_pointer_cast(msg.payload); store->dequeue(ctxt, pmsg, *this); @@ -976,3 +982,10 @@ void Queue::setQueueEventManager(QueueEvents& mgr) { eventMgr = &mgr; } + +void Queue::recoveryComplete() +{ + //process any pending dequeues + for_each(pendingDequeues.begin(), pendingDequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); + pendingDequeues.clear(); +} diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index 14849b3c8e..dfba0533e6 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -85,6 +85,7 @@ namespace qpid { std::vector traceExclude; QueueListeners listeners; Messages messages; + Messages pendingDequeues;//used to avoid dequeuing during recovery LVQ lvq; mutable qpid::sys::Mutex consumerLock; mutable qpid::sys::Mutex messageLock; @@ -101,7 +102,7 @@ namespace qpid { int eventMode; QueueEvents* eventMgr; - void push(boost::intrusive_ptr& msg); + void push(boost::intrusive_ptr& msg, bool isRecovery=false); void setPolicy(std::auto_ptr policy); bool seek(QueuedMessage& msg, Consumer::shared_ptr position); bool getNextMessage(QueuedMessage& msg, Consumer::shared_ptr c); @@ -290,6 +291,11 @@ namespace qpid { void setPosition(framing::SequenceNumber pos); int getEventMode(); void setQueueEventManager(QueueEvents&); + + /** + * Notify queue that recovery has completed. + */ + void recoveryComplete(); }; } } diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp index 8030cf7d0e..5f8b57fa0b 100644 --- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp +++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp @@ -149,7 +149,8 @@ RecoverableConfig::shared_ptr RecoveryManagerImpl::recoverConfig(framing::Buffer void RecoveryManagerImpl::recoveryComplete() { - //TODO (finalise binding setup etc) + //notify all queues + queues.eachQueue(boost::bind(&Queue::recoveryComplete, _1)); } bool RecoverableMessageImpl::loadContent(uint64_t available) -- cgit v1.2.1