diff options
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 126 |
1 files changed, 99 insertions, 27 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index e59857462c..b05172f984 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -20,6 +20,7 @@ */ #include "qpid/broker/Broker.h" +#include "qpid/broker/Cluster.h" #include "qpid/broker/Queue.h" #include "qpid/broker/QueueEvents.h" #include "qpid/broker/Exchange.h" @@ -224,6 +225,7 @@ void Queue::requeue(const QueuedMessage& msg){ } } } + if (broker) broker->getCluster().release(msg); copy.notify(); } @@ -236,8 +238,22 @@ void Queue::clearLVQIndex(const QueuedMessage& msg){ } } +// Inform the cluster of an acquired message on exit from a function +// that does the acquiring. The calling function should set qmsg +// to the acquired message. +struct ClusterAcquireOnExit { + Broker* broker; + QueuedMessage qmsg; + ClusterAcquireOnExit(Broker* b) : broker(b) {} + ~ClusterAcquireOnExit() { + if (broker && qmsg.queue) broker->getCluster().acquire(qmsg); + } +}; + bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message) { + ClusterAcquireOnExit willAcquire(broker); + Mutex::ScopedLock locker(messageLock); assertClusterSafe(); QPID_LOG(debug, "Attempting to acquire message at " << position); @@ -248,16 +264,18 @@ bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& mess if (lastValueQueue) { clearLVQIndex(*i); } - QPID_LOG(debug, - "Acquired message at " << i->position << " from " << name); + QPID_LOG(debug, "Acquired message at " << i->position << " from " << name); + willAcquire.qmsg = *i; messages.erase(i); return true; - } + } QPID_LOG(debug, "Could not acquire message at " << position << " from " << name << "; no message at that position"); return false; } bool Queue::acquire(const QueuedMessage& msg) { + ClusterAcquireOnExit acquire(broker); + Mutex::ScopedLock locker(messageLock); assertClusterSafe(); @@ -265,16 +283,17 @@ bool Queue::acquire(const QueuedMessage& msg) { Messages::iterator i = findAt(msg.position); if ((i != messages.end() && i->position == msg.position) && // note that in some cases payload not be set (!lastValueQueue || - (lastValueQueue && msg.payload.get() == checkLvqReplace(*i).payload.get()) ) // note this is safe for no payload set 0==0 - ) { + (lastValueQueue && msg.payload.get() == checkLvqReplace(*i).payload.get()) ) // note this is safe for no payload set 0==0 + ) { clearLVQIndex(msg); QPID_LOG(debug, "Match found, acquire succeeded: " << i->position << " == " << msg.position); + acquire.qmsg = *i; messages.erase(i); return true; - } + } QPID_LOG(debug, "Acquire failed for " << msg.position); return false; @@ -314,6 +333,8 @@ bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c) Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c) { while (true) { + ClusterAcquireOnExit willAcquire(broker); // Outside the lock + Mutex::ScopedLock locker(messageLock); if (messages.empty()) { QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'"); @@ -330,6 +351,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ if (c->filter(msg.payload)) { if (c->accept(msg.payload)) { m = msg; + willAcquire.qmsg = msg; popMsg(msg); return CONSUMED; } else { @@ -451,40 +473,51 @@ QueuedMessage Queue::find(SequenceNumber pos) const { return QueuedMessage(); } -void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){ +void Queue::consume(Consumer::shared_ptr c, bool requestExclusive) { assertClusterSafe(); - Mutex::ScopedLock locker(consumerLock); - if(exclusive) { - throw ResourceLockedException( - QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed.")); - } else if(requestExclusive) { - if(consumerCount) { + size_t consumers; + { + Mutex::ScopedLock locker(consumerLock); + if(exclusive) { throw ResourceLockedException( - QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied.")); - } else { - exclusive = c->getSession(); + QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed.")); + } else if(requestExclusive) { + if(consumerCount) { + throw ResourceLockedException( + QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied.")); + } else { + exclusive = c->getSession(); + } } + consumers = ++consumerCount; + if (mgmtObject != 0) + mgmtObject->inc_consumerCount (); } - consumerCount++; - if (mgmtObject != 0) - mgmtObject->inc_consumerCount (); + if (broker) broker->getCluster().consume(*this, consumers); } void Queue::cancel(Consumer::shared_ptr c){ removeListener(c); - Mutex::ScopedLock locker(consumerLock); - consumerCount--; - if(exclusive) exclusive = 0; - if (mgmtObject != 0) - mgmtObject->dec_consumerCount (); + size_t consumers; + { + Mutex::ScopedLock locker(consumerLock); + consumers = --consumerCount; + if(exclusive) exclusive = 0; + if (mgmtObject != 0) + mgmtObject->dec_consumerCount (); + } + if (broker) broker->getCluster().cancel(*this, consumers); } QueuedMessage Queue::get(){ + ClusterAcquireOnExit acquire(broker); // Outside lock + Mutex::ScopedLock locker(messageLock); QueuedMessage msg(this); if(!messages.empty()){ msg = getFront(); + acquire.qmsg = msg; popMsg(msg); } return msg; @@ -609,10 +642,11 @@ void Queue::popMsg(QueuedMessage& qmsg) void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ assertClusterSafe(); + QueuedMessage qm; QueueListeners::NotificationSet copy; { Mutex::ScopedLock locker(messageLock); - QueuedMessage qm(this, msg, ++sequence); + qm = QueuedMessage(this, msg, ++sequence); if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence); LVQ::iterator i; @@ -629,12 +663,14 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ boost::intrusive_ptr<Message> old = i->second->getReplacementMessage(this); if (!old) old = i->second; i->second->setReplacementMessage(msg,this); + // FIXME aconway 2010-10-15: it is incorrect to use qm.position below + // should be using the position of the message being replaced. if (isRecovery) { //can't issue new requests for the store until //recovery is complete pendingDequeues.push_back(QueuedMessage(qm.queue, old, qm.position)); } else { - Mutex::ScopedUnlock u(messageLock); + Mutex::ScopedUnlock u(messageLock); dequeue(0, QueuedMessage(qm.queue, old, qm.position)); } } @@ -651,6 +687,7 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ } } copy.notify(); + if (broker) broker->getCluster().enqueue(qm); } QueuedMessage Queue::getFront() @@ -792,12 +829,42 @@ void Queue::enqueueAborted(boost::intrusive_ptr<Message> msg) if (policy.get()) policy->enqueueAborted(msg); } +void Queue::accept(TransactionContext* ctxt, const QueuedMessage& msg) { + if (broker) broker->getCluster().accept(msg); + dequeue(ctxt, msg); +} + +struct ScopedClusterReject { + Broker* broker; + const QueuedMessage& qmsg; + ScopedClusterReject(Broker* b, const QueuedMessage& m) : broker(b), qmsg(m) { + if (broker) broker->getCluster().reject(qmsg); + } + ~ScopedClusterReject() { + if (broker) broker->getCluster().rejected(qmsg); + } +}; + +void Queue::reject(const QueuedMessage &msg) { + ScopedClusterReject scr(broker, msg); + Exchange::shared_ptr alternate = getAlternateExchange(); + if (alternate) { + DeliverableMessage delivery(msg.payload); + alternate->route(delivery, msg.payload->getRoutingKey(), msg.payload->getApplicationHeaders()); + QPID_LOG(info, "Routed rejected message from " << getName() << " to " + << alternate->getName()); + } else { + //just drop it + QPID_LOG(info, "Dropping rejected message from " << getName()); + } + dequeue(0, msg); +} + // return true if store exists, bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) { ScopedUse u(barrier); if (!u.acquired) return false; - { Mutex::ScopedLock locker(messageLock); if (!isEnqueued(msg)) return false; @@ -846,6 +913,9 @@ void Queue::popAndDequeue() */ void Queue::dequeued(const QueuedMessage& msg) { + // Note: Cluster::dequeued does only local book-keeping, no multicast + // So OK to call here with lock held. + if (broker) broker->getCluster().dequeue(msg); if (policy.get()) policy->dequeued(msg); mgntDeqStats(msg.payload); if (eventMode == ENQUEUE_AND_DEQUEUE && eventMgr) { @@ -861,6 +931,7 @@ void Queue::create(const FieldTable& _settings) store->create(*this, _settings); } configure(_settings); + if (broker) broker->getCluster().create(*this); } void Queue::configure(const FieldTable& _settings, bool recovering) @@ -934,6 +1005,7 @@ void Queue::destroy() store->destroy(*this); store = 0;//ensure we make no more calls to the store for this queue } + if (broker) broker->getCluster().destroy(*this); } void Queue::notifyDeleted() |