summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp')
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp101
1 files changed, 52 insertions, 49 deletions
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
index 56800e6b95..ba06ee82f2 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
@@ -20,11 +20,12 @@
*
*/
-#include "QueueContext.h"
+#include "BrokerContext.h"
+#include "Group.h"
#include "Multicaster.h"
-#include "qpid/cluster/types.h"
-#include "BrokerContext.h" // for ScopedSuppressReplication
+#include "QueueContext.h"
#include "hash.h"
+#include "qpid/cluster/types.h"
#include "qpid/framing/ProtocolVersion.h"
#include "qpid/framing/ClusterQueueResubscribeBody.h"
#include "qpid/framing/ClusterQueueSubscribeBody.h"
@@ -37,14 +38,14 @@
namespace qpid {
namespace cluster {
-QueueContext::QueueContext(broker::Queue& q, sys::Duration consumeLock, Multicaster& m)
- : timer(boost::bind(&QueueContext::timeout, this),
- q.getBroker()->getTimer(),
- consumeLock),
- queue(q), mcast(m), consumers(0), hash(hashof(q.getName()))
+QueueContext::QueueContext(broker::Queue& q, Group& g, size_t maxTicks_)
+ : consumers(0), consuming(true), ticks(0),
+ queue(q), mcast(g.getMulticaster()), hash(hashof(q.getName())),
+ maxTicks(maxTicks_)
{
q.setClusterContext(boost::intrusive_ptr<QueueContext>(this));
q.stopConsumers(); // Stop queue initially.
+ g.getTicker().add(this);
}
QueueContext::~QueueContext() {}
@@ -54,72 +55,74 @@ bool isOwner(QueueOwnership o) { return o == SOLE_OWNER || o == SHARED_OWNER; }
}
// Called by QueueReplica in CPG deliver thread when state changes.
-void QueueContext::replicaState(
- QueueOwnership before, QueueOwnership after, bool selfDelivered)
+void QueueContext::replicaState(QueueOwnership before, QueueOwnership after)
{
- // No lock, this function does not touch any member variables.
-
- // Invariants for ownership:
- // UNSUBSCRIBED, SUBSCRIBED <=> timer stopped, queue stopped
- // SOLE_OWNER <=> timer stopped, queue started
- // SHARED_OWNER <=> timer started, queue started
-
- // Interested in state changes and my own events which lead to
- // ownership.
- if ((before != after || selfDelivered) && isOwner(after)) {
- QPID_LOG(trace, "cluster: start consumers on " << queue.getName() << ", timer "
- << (after==SHARED_OWNER? "start" : "stop"));
- queue.startConsumers();
- if (after == SHARED_OWNER) timer.start();
- else timer.stop();
+ // Interested in state changes which lead to ownership.
+ // We voluntarily give up ownership before multicasting
+ // the state change so we don't need to handle transitions
+ // that lead to non-ownership.
+ if (before != after && isOwner(after)) {
+ bool start = false;
+ {
+ sys::Mutex::ScopedLock l(lock);
+ start = !consuming;
+ consuming = true;
+ ticks = 0;
+ }
+ if (start) queue.startConsumers();
}
-
- // If we lost ownership then the queue and timer will already have
- // been stopped by timeout()
}
// FIXME aconway 2011-07-27: Dont spin the token on an empty queue.
// Called in broker threads when a consumer is added
void QueueContext::consume(size_t n) {
- sys::Mutex::ScopedLock l(lock);
- consumers = n;
+ {
+ sys::Mutex::ScopedLock l(lock);
+ consumers = n;
+ }
if (n == 1) mcast.mcast(
framing::ClusterQueueSubscribeBody(framing::ProtocolVersion(), queue.getName()));
}
// Called in broker threads when a consumer is cancelled
void QueueContext::cancel(size_t n) {
- sys::Mutex::ScopedLock l(lock);
- consumers = n;
- // When consuming threads are stopped, this->stopped will be called.
- if (n == 0) {
- QPID_LOG(trace, "cluster: all consumers canceled on " << queue.getName());
- timer.stop();
- queue.stopConsumers();
+ bool stop = false;
+ {
+ sys::Mutex::ScopedLock l(lock);
+ consumers = n;
+ stop = (n == 0 && consuming);
}
+ if (stop) queue.stopConsumers();
}
-// Called in timer thread.
-void QueueContext::timeout() {
+// Called in Ticker thread.
+void QueueContext::tick() {
+ bool stop = false;
+ {
+ sys::Mutex::ScopedLock l(lock);
+ stop = (consuming && ++ticks >= maxTicks);
+ }
// When all threads have stopped, queue will call stopped()
- QPID_LOG(trace, "cluster: lock timeout on " << queue.getName());
- queue.stopConsumers();
+ if (stop) queue.stopConsumers();
}
// Callback set up by queue.stopConsumers() called in connection or timer thread.
// Called when no threads are dispatching from the queue.
void QueueContext::stopped() {
- sys::Mutex::ScopedLock l(lock);
- QPID_LOG(trace, "cluster: stopped consumers, "
- << (consumers == 0 ? "unsubscribe" : "resubscribe")
- << " to " << queue.getName());
- if (consumers == 0)
- mcast.mcast(framing::ClusterQueueUnsubscribeBody(
- framing::ProtocolVersion(), queue.getName()));
- else // FIXME aconway 2011-09-13: check if we're owner?
+ bool resubscribe = false;
+ {
+ sys::Mutex::ScopedLock l(lock);
+ assert(consuming);
+ consuming = false;
+ resubscribe = consumers;
+ }
+ if (resubscribe)
mcast.mcast(framing::ClusterQueueResubscribeBody(
framing::ProtocolVersion(), queue.getName()));
+ else
+ mcast.mcast(framing::ClusterQueueUnsubscribeBody(
+ framing::ProtocolVersion(), queue.getName()));
}
void QueueContext::requeue(uint32_t position, bool redelivered) {