diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Queue.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 80 |
1 files changed, 50 insertions, 30 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index f593d7e443..84f025824c 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -112,7 +112,8 @@ Queue::Queue(const string& _name, bool _autodelete, broker(b), deleted(false), barrier(*this), - autoDeleteTimeout(0) + autoDeleteTimeout(0), + dispatching(boost::bind(&Queue::acquireStopped,this)) { if (parent != 0 && broker != 0) { ManagementAgent* agent = broker->getManagementAgent(); @@ -231,29 +232,40 @@ void Queue::requeue(const QueuedMessage& msg){ copy.notify(); } -// Inform the cluster of an acquired message on exit from a function -// that does the acquiring. ClusterAcquireOnExit is declared *before* -// any locks are taken. The calling function sets qmsg to the acquired -// message with a lock held, but the call to Cluster::acquire() will -// be outside the lock. -struct ClusterAcquireOnExit { +/** Mark a scope that acquires a message. + * + * ClusterAcquireScope is declared before are taken. The calling + * function sets qmsg with the lock held, but the call to + * Cluster::acquire() will happen after the lock is released. + * + * Also marks a Stoppable as busy for the duration of the scope. + **/ +struct ClusterAcquireScope { Broker* broker; + Queue& queue; QueuedMessage qmsg; - ClusterAcquireOnExit(Broker* b) : broker(b) {} - ~ClusterAcquireOnExit() { - if (broker && qmsg.queue) broker->getCluster().acquire(qmsg); + + ClusterAcquireScope(Queue& q) : broker(q.getBroker()), queue(q) {} + + ~ClusterAcquireScope() { + if (broker) { + // FIXME aconway 2011-06-27: Move to QueueContext. + // Avoid the indirection via queuename. + if (qmsg.queue) broker->getCluster().acquire(qmsg); + else broker->getCluster().empty(queue); + } } }; bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message) { - ClusterAcquireOnExit willAcquire(broker); // Outside lock + ClusterAcquireScope acquireScope(*this); // Outside lock Mutex::ScopedLock locker(messageLock); assertClusterSafe(); QPID_LOG(debug, "Attempting to acquire message at " << position); if (messages->remove(position, message)) { QPID_LOG(debug, "Acquired message at " << position << " from " << name); - willAcquire.qmsg = message; + acquireScope.qmsg = message; return true; } else { QPID_LOG(debug, "Could not acquire message at " << position << " from " << name << "; no message at that position"); @@ -300,9 +312,15 @@ bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c) Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c) { while (true) { - ClusterAcquireOnExit willAcquire(broker); // Outside the lock + Stoppable::Scope stopper(dispatching); // FIXME aconway 2011-06-28: rename consuming + if (!stopper) { + QPID_LOG(trace, "Queue is stopped: " << name); + listeners.addListener(c); + return NO_MESSAGES; + } + ClusterAcquireScope acquireScope(*this); // Outside the lock Mutex::ScopedLock locker(messageLock); - if (messages->empty()) { + if (messages->empty()) { // FIXME aconway 2011-06-07: ugly QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'"); listeners.addListener(c); return NO_MESSAGES; @@ -317,7 +335,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ if (c->filter(msg.payload)) { if (c->accept(msg.payload)) { m = msg; - willAcquire.qmsg = msg; + acquireScope.qmsg = msg; pop(); return CONSUMED; } else { @@ -374,18 +392,11 @@ void Queue::removeListener(Consumer::shared_ptr c) bool Queue::dispatch(Consumer::shared_ptr c) { - Stoppable::Scope doDispatch(dispatching); - if (doDispatch) { - QueuedMessage msg(this); - if (getNextMessage(msg, c)) { - c->deliver(msg); - return true; - } else { - return false; - } - } else { // Dispatching is stopped - Mutex::ScopedLock locker(messageLock); - listeners.addListener(c); // FIXME aconway 2011-05-05: + QueuedMessage msg(this); + if (getNextMessage(msg, c)) { + c->deliver(msg); + return true; + } else { return false; } } @@ -450,10 +461,10 @@ void Queue::cancel(Consumer::shared_ptr c){ } QueuedMessage Queue::get(){ - ClusterAcquireOnExit willAcquire(broker); // Outside lock + ClusterAcquireScope acquireScope(*this); // Outside lock Mutex::ScopedLock locker(messageLock); QueuedMessage msg(this); - if (messages->pop(msg)) willAcquire.qmsg = msg; + if (messages->pop(msg)) acquireScope.qmsg = msg; return msg; } @@ -704,7 +715,9 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) if (!isEnqueued(msg)) return false; if (!ctxt) dequeued(msg); } + if (!ctxt && broker) broker->getCluster().dequeue(msg); // Outside lock + // This check prevents messages which have been forced persistent on one queue from dequeuing // from another on which no forcing has taken place and thus causing a store error. bool fp = msg.payload->isForcedPersistent(); @@ -902,6 +915,10 @@ void Queue::notifyDeleted() set.notifyAll(); } +void Queue::acquireStopped() { + if (broker) broker->getCluster().stopped(*this); +} + void Queue::bound(const string& exchange, const string& key, const FieldTable& args) { @@ -1234,7 +1251,7 @@ bool Queue::bind(boost::shared_ptr<Exchange> exchange, const std::string& key, } -const Broker* Queue::getBroker() +Broker* Queue::getBroker() { return broker; } @@ -1268,10 +1285,13 @@ void Queue::UsageBarrier::destroy() // FIXME aconway 2011-05-06: naming - only affects consumers. stopDispatch()? void Queue::stop() { + // FIXME aconway 2011-05-25: rename dispatching - acquiring? dispatching.stop(); } void Queue::start() { + QPID_LOG(critical, "FIXME start context=" << clusterContext); + assert(clusterContext); // FIXME aconway 2011-06-08: XXX dispatching.start(); notifyListener(); } |