diff options
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp | 113 |
1 files changed, 68 insertions, 45 deletions
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp index 6c97c906e8..122163ee7e 100644 --- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp @@ -21,79 +21,101 @@ #include "QueueContext.h" #include "Multicaster.h" +#include "qpid/framing/ProtocolVersion.h" #include "qpid/framing/ClusterQueueResubscribeBody.h" +#include "qpid/framing/ClusterQueueSubscribeBody.h" #include "qpid/framing/ClusterQueueUnsubscribeBody.h" +#include "qpid/broker/Broker.h" #include "qpid/broker/Queue.h" #include "qpid/log/Statement.h" - +#include "qpid/sys/Timer.h" namespace qpid { namespace cluster { + +class OwnershipTimeout : public sys::TimerTask { + QueueContext& queueContext; + + public: + OwnershipTimeout(QueueContext& qc, const sys::Duration& interval) : + TimerTask(interval, "QueueContext::OwnershipTimeout"), queueContext(qc) {} + + // FIXME aconway 2011-07-27: thread safety on deletion? + void fire() { queueContext.timeout(); } +}; + QueueContext::QueueContext(broker::Queue& q, Multicaster& m) - : owner(NOT_OWNER), count(0), queue(q), mcast(m) + : timer(q.getBroker()->getTimer()), queue(q), mcast(m), consumers(0) { - QPID_LOG(debug, "Assign cluster context to queue " << q.getName()); - q.stop(); // Initially stopped. Must all before setClusterContext q.setClusterContext(boost::intrusive_ptr<QueueContext>(this)); + q.stop(); // Initially stopped. +} +QueueContext::~QueueContext() { + // FIXME aconway 2011-07-27: revisit shutdown logic. + // timeout() could be called concurrently with destructor. + sys::Mutex::ScopedLock l(lock); + timerTask->cancel(); } -// Called by QueueReplica in deliver thread. -void QueueContext::sharedOwner(size_t limit) { - QPID_LOG(critical, "FIXME QueueContext::sharedOwner " << queue.getName() << queue.getClusterContext().get()); +void QueueContext::replicaState(QueueOwnership state) { sys::Mutex::ScopedLock l(lock); - count = limit; - if (owner == NOT_OWNER) queue.start(); // FIXME aconway 2011-06-09: ok inside mutex? - owner = SHARED_OWNER; + switch (state) { + case UNSUBSCRIBED: + case SUBSCRIBED: + break; + case SOLE_OWNER: + queue.start(); + if (timerTask) { // no need for timeout. + timerTask->cancel(); + timerTask = 0; + } + break; + case SHARED_OWNER: + queue.start(); + if (timerTask) timerTask->cancel(); + // FIXME aconway 2011-07-28: configurable interval. + timerTask = new OwnershipTimeout(*this, 100*sys::TIME_MSEC); + timer.add(timerTask); + break; + } } -// Called by QueueReplica in deliver thread. -void QueueContext::soleOwner() { - QPID_LOG(critical, "FIXME QueueContext::soleOwner " << queue.getName() << queue.getClusterContext().get()); +// FIXME aconway 2011-07-27: Dont spin token on an empty queue. Cancel timer. + +void QueueContext::consume(size_t n) { sys::Mutex::ScopedLock l(lock); - count = 0; - if (owner == NOT_OWNER) queue.start(); // FIXME aconway 2011-06-09: ok inside mutex? - owner = SOLE_OWNER; + consumers = n; + if (n == 1) mcast.mcast( + framing::ClusterQueueSubscribeBody(framing::ProtocolVersion(), queue.getName())); } -// Called by BrokerContext in connection thread(s) on acquiring a message -void QueueContext::acquire() { - bool stop = false; - { - sys::Mutex::ScopedLock l(lock); - assert(owner != NOT_OWNER); // Can't acquire on a queue we don't own. - QPID_LOG(critical, "FIXME QueueContext::acquire " << queue.getName() - << " owner=" << owner << " count=" << count); - if (owner == SHARED_OWNER) { - // Note count could be 0 if there are concurrent calls to acquire. - if (count && --count == 0) { - stop = true; - } - } - } - // FIXME aconway 2011-06-28: could have multiple stop() threads... - if (stop) queue.stop(); +void QueueContext::cancel(size_t n) { + sys::Mutex::ScopedLock l(lock); + consumers = n; + if (n == 0) queue.stop(); // FIXME aconway 2011-07-28: Ok inside lock? +} + +void QueueContext::timeout() { + QPID_LOG(critical, "FIXME Ownership timeout on queue " << queue.getName()); + queue.stop(); } -// Callback set up by queue.stop() + +// Callback set up by queue.stop(), called when no threads are dispatching from the queue. +// Release the queue. void QueueContext::stopped() { sys::Mutex::ScopedLock l(lock); - if (owner == NOT_OWNER) { + // FIXME aconway 2011-07-28: review thread safety of state. + // Deffered call to stopped doesn't sit well. + // queueActive is invaled while stop is in progress? + if (consumers == 0) mcast.mcast(framing::ClusterQueueUnsubscribeBody( framing::ProtocolVersion(), queue.getName())); - } else { - owner = NOT_OWNER; + else mcast.mcast(framing::ClusterQueueResubscribeBody( framing::ProtocolVersion(), queue.getName())); - } -} - -void QueueContext::unsubscribed() { - QPID_LOG(critical, "FIXME QueueContext unsubscribed, stopping " << queue.getName()); - queue.stop(); - sys::Mutex::ScopedLock l(lock); - owner = NOT_OWNER; } boost::intrusive_ptr<QueueContext> QueueContext::get(broker::Queue& q) { @@ -102,4 +124,5 @@ boost::intrusive_ptr<QueueContext> QueueContext::get(broker::Queue& q) { } + }} // namespace qpid::cluster |