diff options
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp | 180 |
1 files changed, 122 insertions, 58 deletions
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp index ba06ee82f2..ff9c050348 100644 --- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp @@ -21,34 +21,45 @@ */ #include "BrokerContext.h" +#include "EventHandler.h" #include "Group.h" #include "Multicaster.h" #include "QueueContext.h" +#include "QueueHandler.h" #include "hash.h" -#include "qpid/cluster/types.h" -#include "qpid/framing/ProtocolVersion.h" -#include "qpid/framing/ClusterQueueResubscribeBody.h" -#include "qpid/framing/ClusterQueueSubscribeBody.h" -#include "qpid/framing/ClusterQueueUnsubscribeBody.h" #include "qpid/broker/Broker.h" #include "qpid/broker/Queue.h" #include "qpid/broker/QueuedMessage.h" +#include "qpid/framing/ClusterMessageAcquireBody.h" +#include "qpid/framing/ClusterMessageDequeueBody.h" +#include "qpid/framing/ClusterQueueConsumedBody.h" +#include "qpid/framing/ClusterQueueSubscribeBody.h" +#include "qpid/framing/ClusterQueueUnsubscribeBody.h" +#include "qpid/framing/ProtocolVersion.h" #include "qpid/log/Statement.h" namespace qpid { namespace cluster { +using framing::SequenceSet; +const framing::ProtocolVersion pv; // shorthand + QueueContext::QueueContext(broker::Queue& q, Group& g, size_t maxTicks_) - : consumers(0), consuming(true), ticks(0), + : ownership(UNSUBSCRIBED), consumers(0), consuming(false), ticks(0), queue(q), mcast(g.getMulticaster()), hash(hashof(q.getName())), - maxTicks(maxTicks_) + maxTicks(maxTicks_), group(g) { - q.setClusterContext(boost::intrusive_ptr<QueueContext>(this)); + q.setClusterContext(std::auto_ptr<broker::Context>(this)); q.stopConsumers(); // Stop queue initially. - g.getTicker().add(this); + group.getTicker().add(this); } -QueueContext::~QueueContext() {} +QueueContext::~QueueContext() { + // Lifecycle: must remove all references to this context before it is deleted. + // Must be sure that there can be no use of this context later. + group.getTicker().remove(this); + group.getEventHandler().getHandler<QueueHandler>()->remove(queue); +} namespace { bool isOwner(QueueOwnership o) { return o == SOLE_OWNER || o == SHARED_OWNER; } @@ -57,72 +68,66 @@ bool isOwner(QueueOwnership o) { return o == SOLE_OWNER || o == SHARED_OWNER; } // Called by QueueReplica in CPG deliver thread when state changes. void QueueContext::replicaState(QueueOwnership before, QueueOwnership after) { + sys::Mutex::ScopedLock l(lock); // Interested in state changes which lead to ownership. // We voluntarily give up ownership before multicasting // the state change so we don't need to handle transitions // that lead to non-ownership. + if (before != after && isOwner(after)) { - bool start = false; - { - sys::Mutex::ScopedLock l(lock); - start = !consuming; - consuming = true; - ticks = 0; - } - if (start) queue.startConsumers(); + assert(before == ownership); + if (!consuming) queue.startConsumers(); + consuming = true; + ticks = 0; } + ownership = after; } // FIXME aconway 2011-07-27: Dont spin the token on an empty queue. // Called in broker threads when a consumer is added void QueueContext::consume(size_t n) { - { - sys::Mutex::ScopedLock l(lock); - consumers = n; - } - if (n == 1) mcast.mcast( - framing::ClusterQueueSubscribeBody(framing::ProtocolVersion(), queue.getName())); + sys::Mutex::ScopedLock l(lock); + if (consumers == 0 && n > 0 && ownership == UNSUBSCRIBED) + mcast.mcast( + framing::ClusterQueueSubscribeBody(pv, queue.getName())); + consumers = n; } // Called in broker threads when a consumer is cancelled void QueueContext::cancel(size_t n) { - bool stop = false; - { - sys::Mutex::ScopedLock l(lock); - consumers = n; - stop = (n == 0 && consuming); - } - if (stop) queue.stopConsumers(); + sys::Mutex::ScopedLock l(lock); + consumers = n; + if (n == 0 && consuming) queue.stopConsumers(); } +// FIXME aconway 2011-11-03: review scope of locking around sendConsumed + // Called in Ticker thread. void QueueContext::tick() { - bool stop = false; - { - sys::Mutex::ScopedLock l(lock); - stop = (consuming && ++ticks >= maxTicks); - } - // When all threads have stopped, queue will call stopped() - if (stop) queue.stopConsumers(); + sys::Mutex::ScopedLock l(lock); + if (!consuming) return; // Nothing to do if we don't have the lock. + if (ownership == SHARED_OWNER && ++ticks >= maxTicks) queue.stopConsumers(); + else if (ownership == SOLE_OWNER) sendConsumed(l); // Status report on consumption } // Callback set up by queue.stopConsumers() called in connection or timer thread. // Called when no threads are dispatching from the queue. void QueueContext::stopped() { - bool resubscribe = false; - { - sys::Mutex::ScopedLock l(lock); - assert(consuming); - consuming = false; - resubscribe = consumers; - } - if (resubscribe) - mcast.mcast(framing::ClusterQueueResubscribeBody( - framing::ProtocolVersion(), queue.getName())); - else - mcast.mcast(framing::ClusterQueueUnsubscribeBody( - framing::ProtocolVersion(), queue.getName())); + sys::Mutex::ScopedLock l(lock); + if (!consuming) return; // !consuming => initial stopConsumers in ctor. + sendConsumed(l); + mcast.mcast( + framing::ClusterQueueUnsubscribeBody(pv, queue.getName(), consumers)); + consuming = false; +} + +void QueueContext::sendConsumed(const sys::Mutex::ScopedLock&) { + if (acquired.empty() && dequeued.empty()) return; // Nothing to send + mcast.mcast( + framing::ClusterQueueConsumedBody(pv, queue.getName(), acquired,dequeued)); + acquired.clear(); + dequeued.clear(); } void QueueContext::requeue(uint32_t position, bool redelivered) { @@ -135,17 +140,76 @@ void QueueContext::requeue(uint32_t position, bool redelivered) { } } -void QueueContext::acquire(const broker::QueuedMessage& qm) { - unacked.put(qm.position, qm); +void QueueContext::localAcquire(uint32_t position) { + QPID_LOG(trace, "cluster queue " << queue.getName() << " acquired " << position); + sys::Mutex::ScopedLock l(lock); + assert(consuming); + acquired.add(position); +} + +void QueueContext::localDequeue(uint32_t position) { + QPID_LOG(trace, "cluster queue " << queue.getName() << " dequeued " << position); + // FIXME aconway 2010-10-28: for local dequeues, we should + // complete the ack that initiated the dequeue at this point. + sys::Mutex::ScopedLock l(lock); + + // FIXME aconway 2011-11-03: this assertion fails for explicit accept + // because it doesn't respect the consume lock. + // assert(consuming); + + dequeued.add(position); +} + +void QueueContext::consumed( + const MemberId& sender, + const SequenceSet& acquired, + const SequenceSet& dequeued) +{ + // No lock, doesn't touch any members. + + // FIXME aconway 2011-09-15: systematic logging across cluster module. + // FIXME aconway 2011-09-23: pretty printing for identifier. + QPID_LOG(trace, "cluster: " << sender << " acquired: " << acquired + << " dequeued: " << dequeued << " on queue: " << queue.getName()); + + // Note acquires from other members. My own acquires were executed in + // the connection thread + if (sender != group.getSelf()) { + // FIXME aconway 2011-09-23: avoid individual finds, scan queue once. + for (SequenceSet::iterator i = acquired.begin(); i != acquired.end(); ++i) + acquire(*i); + } + // Process deques from the queue owner. + // FIXME aconway 2011-09-23: avoid individual finds, scan queue once. + for (SequenceSet::iterator i = dequeued.begin(); i != dequeued.end(); ++i) + dequeue(*i); +} + +// Remote acquire +void QueueContext::acquire(uint32_t position) { + // No lock, doesn't touch any members. + broker::QueuedMessage qm; + BrokerContext::ScopedSuppressReplication ssr; + if (!queue.acquireMessageAt(position, qm)) + // FIXME aconway 2011-10-31: error handling + throw Exception(QPID_MSG("cluster: acquire: message not found: " + << queue.getName() << "[" << position << "]")); + assert(qm.position.getValue() == position); + assert(qm.payload); + unacked.put(qm.position, qm); // unacked has its own lock. } -broker::QueuedMessage QueueContext::dequeue(uint32_t position) { - return unacked.pop(position); +void QueueContext::dequeue(uint32_t position) { + // No lock, doesn't touch any members. unacked has its own lock. + broker::QueuedMessage qm = unacked.pop(position); + BrokerContext::ScopedSuppressReplication ssr; + if (qm.queue) queue.dequeue(0, qm); } -boost::intrusive_ptr<QueueContext> QueueContext::get(broker::Queue& q) { - return boost::intrusive_ptr<QueueContext>( - static_cast<QueueContext*>(q.getClusterContext().get())); +QueueContext* QueueContext::get(broker::Queue& q) { + return static_cast<QueueContext*>(q.getClusterContext()); } +// FIXME aconway 2011-09-23: make unacked a plain map, use lock. + }} // namespace qpid::cluster |