diff options
author | Gordon Sim <gsim@apache.org> | 2009-02-26 17:18:46 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2009-02-26 17:18:46 +0000 |
commit | ca7460747ce41c91ef1d485b514e9dfe2879cb1c (patch) | |
tree | 7238c0f779cc1967b33fe1efaade6e35629fc197 /cpp/src | |
parent | b12874a1e35a05a5489c95f8099733ff788225e5 (diff) | |
download | qpid-python-ca7460747ce41c91ef1d485b514e9dfe2879cb1c.tar.gz |
QPID-1695: Make LVQ persist durable messages
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@748214 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 23 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.h | 8 | ||||
-rw-r--r-- | cpp/src/qpid/broker/RecoveryManagerImpl.cpp | 3 |
3 files changed, 27 insertions, 7 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 8c50f26abd..bc29815e84 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -176,7 +176,7 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){ void Queue::recover(boost::intrusive_ptr<Message>& msg){ - push(msg); + push(msg, true); msg->enqueueComplete(); // mark the message as enqueued mgntEnqStats(msg); @@ -545,7 +545,7 @@ void Queue::popMsg(QueuedMessage& qmsg) ++dequeueTracker; } -void Queue::push(boost::intrusive_ptr<Message>& msg){ +void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ QueueListeners::NotificationSet copy; { Mutex::ScopedLock locker(messageLock); @@ -566,7 +566,13 @@ void Queue::push(boost::intrusive_ptr<Message>& msg){ boost::intrusive_ptr<Message> old = i->second->getReplacementMessage(this); if (!old) old = i->second; i->second->setReplacementMessage(msg,this); - dequeued(QueuedMessage(qm.queue, old, qm.position)); + if (isRecovery) { + //can't issue new requests for the store until + //recovery is complete + pendingDequeues.push_back(QueuedMessage(qm.queue, old, qm.position)); + } else { + dequeue(0, QueuedMessage(qm.queue, old, qm.position)); + } } }else { messages.push_back(qm); @@ -664,7 +670,7 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg) msg->addTraceId(traceId); } - if (msg->isPersistent() && store && !lastValueQueue) { + if (msg->isPersistent() && store) { msg->enqueueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg); store->enqueue(ctxt, pmsg, *this); @@ -683,7 +689,7 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) dequeued(msg); } } - if (msg.payload->isPersistent() && store && !lastValueQueue) { + if (msg.payload->isPersistent() && store) { msg.payload->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg.payload); store->dequeue(ctxt, pmsg, *this); @@ -976,3 +982,10 @@ void Queue::setQueueEventManager(QueueEvents& mgr) { eventMgr = &mgr; } + +void Queue::recoveryComplete() +{ + //process any pending dequeues + for_each(pendingDequeues.begin(), pendingDequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); + pendingDequeues.clear(); +} diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index 14849b3c8e..dfba0533e6 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -85,6 +85,7 @@ namespace qpid { std::vector<std::string> traceExclude; QueueListeners listeners; Messages messages; + Messages pendingDequeues;//used to avoid dequeuing during recovery LVQ lvq; mutable qpid::sys::Mutex consumerLock; mutable qpid::sys::Mutex messageLock; @@ -101,7 +102,7 @@ namespace qpid { int eventMode; QueueEvents* eventMgr; - void push(boost::intrusive_ptr<Message>& msg); + void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false); void setPolicy(std::auto_ptr<QueuePolicy> policy); bool seek(QueuedMessage& msg, Consumer::shared_ptr position); bool getNextMessage(QueuedMessage& msg, Consumer::shared_ptr c); @@ -290,6 +291,11 @@ namespace qpid { void setPosition(framing::SequenceNumber pos); int getEventMode(); void setQueueEventManager(QueueEvents&); + + /** + * Notify queue that recovery has completed. + */ + void recoveryComplete(); }; } } diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp index 8030cf7d0e..5f8b57fa0b 100644 --- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp +++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp @@ -149,7 +149,8 @@ RecoverableConfig::shared_ptr RecoveryManagerImpl::recoverConfig(framing::Buffer void RecoveryManagerImpl::recoveryComplete() { - //TODO (finalise binding setup etc) + //notify all queues + queues.eachQueue(boost::bind(&Queue::recoveryComplete, _1)); } bool RecoverableMessageImpl::loadContent(uint64_t available) |