diff options
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp | 101 |
1 files changed, 52 insertions, 49 deletions
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp index 56800e6b95..ba06ee82f2 100644 --- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp @@ -20,11 +20,12 @@ * */ -#include "QueueContext.h" +#include "BrokerContext.h" +#include "Group.h" #include "Multicaster.h" -#include "qpid/cluster/types.h" -#include "BrokerContext.h" // for ScopedSuppressReplication +#include "QueueContext.h" #include "hash.h" +#include "qpid/cluster/types.h" #include "qpid/framing/ProtocolVersion.h" #include "qpid/framing/ClusterQueueResubscribeBody.h" #include "qpid/framing/ClusterQueueSubscribeBody.h" @@ -37,14 +38,14 @@ namespace qpid { namespace cluster { -QueueContext::QueueContext(broker::Queue& q, sys::Duration consumeLock, Multicaster& m) - : timer(boost::bind(&QueueContext::timeout, this), - q.getBroker()->getTimer(), - consumeLock), - queue(q), mcast(m), consumers(0), hash(hashof(q.getName())) +QueueContext::QueueContext(broker::Queue& q, Group& g, size_t maxTicks_) + : consumers(0), consuming(true), ticks(0), + queue(q), mcast(g.getMulticaster()), hash(hashof(q.getName())), + maxTicks(maxTicks_) { q.setClusterContext(boost::intrusive_ptr<QueueContext>(this)); q.stopConsumers(); // Stop queue initially. + g.getTicker().add(this); } QueueContext::~QueueContext() {} @@ -54,72 +55,74 @@ bool isOwner(QueueOwnership o) { return o == SOLE_OWNER || o == SHARED_OWNER; } } // Called by QueueReplica in CPG deliver thread when state changes. -void QueueContext::replicaState( - QueueOwnership before, QueueOwnership after, bool selfDelivered) +void QueueContext::replicaState(QueueOwnership before, QueueOwnership after) { - // No lock, this function does not touch any member variables. - - // Invariants for ownership: - // UNSUBSCRIBED, SUBSCRIBED <=> timer stopped, queue stopped - // SOLE_OWNER <=> timer stopped, queue started - // SHARED_OWNER <=> timer started, queue started - - // Interested in state changes and my own events which lead to - // ownership. - if ((before != after || selfDelivered) && isOwner(after)) { - QPID_LOG(trace, "cluster: start consumers on " << queue.getName() << ", timer " - << (after==SHARED_OWNER? "start" : "stop")); - queue.startConsumers(); - if (after == SHARED_OWNER) timer.start(); - else timer.stop(); + // Interested in state changes which lead to ownership. + // We voluntarily give up ownership before multicasting + // the state change so we don't need to handle transitions + // that lead to non-ownership. + if (before != after && isOwner(after)) { + bool start = false; + { + sys::Mutex::ScopedLock l(lock); + start = !consuming; + consuming = true; + ticks = 0; + } + if (start) queue.startConsumers(); } - - // If we lost ownership then the queue and timer will already have - // been stopped by timeout() } // FIXME aconway 2011-07-27: Dont spin the token on an empty queue. // Called in broker threads when a consumer is added void QueueContext::consume(size_t n) { - sys::Mutex::ScopedLock l(lock); - consumers = n; + { + sys::Mutex::ScopedLock l(lock); + consumers = n; + } if (n == 1) mcast.mcast( framing::ClusterQueueSubscribeBody(framing::ProtocolVersion(), queue.getName())); } // Called in broker threads when a consumer is cancelled void QueueContext::cancel(size_t n) { - sys::Mutex::ScopedLock l(lock); - consumers = n; - // When consuming threads are stopped, this->stopped will be called. - if (n == 0) { - QPID_LOG(trace, "cluster: all consumers canceled on " << queue.getName()); - timer.stop(); - queue.stopConsumers(); + bool stop = false; + { + sys::Mutex::ScopedLock l(lock); + consumers = n; + stop = (n == 0 && consuming); } + if (stop) queue.stopConsumers(); } -// Called in timer thread. -void QueueContext::timeout() { +// Called in Ticker thread. +void QueueContext::tick() { + bool stop = false; + { + sys::Mutex::ScopedLock l(lock); + stop = (consuming && ++ticks >= maxTicks); + } // When all threads have stopped, queue will call stopped() - QPID_LOG(trace, "cluster: lock timeout on " << queue.getName()); - queue.stopConsumers(); + if (stop) queue.stopConsumers(); } // Callback set up by queue.stopConsumers() called in connection or timer thread. // Called when no threads are dispatching from the queue. void QueueContext::stopped() { - sys::Mutex::ScopedLock l(lock); - QPID_LOG(trace, "cluster: stopped consumers, " - << (consumers == 0 ? "unsubscribe" : "resubscribe") - << " to " << queue.getName()); - if (consumers == 0) - mcast.mcast(framing::ClusterQueueUnsubscribeBody( - framing::ProtocolVersion(), queue.getName())); - else // FIXME aconway 2011-09-13: check if we're owner? + bool resubscribe = false; + { + sys::Mutex::ScopedLock l(lock); + assert(consuming); + consuming = false; + resubscribe = consumers; + } + if (resubscribe) mcast.mcast(framing::ClusterQueueResubscribeBody( framing::ProtocolVersion(), queue.getName())); + else + mcast.mcast(framing::ClusterQueueUnsubscribeBody( + framing::ProtocolVersion(), queue.getName())); } void QueueContext::requeue(uint32_t position, bool redelivered) { |