summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp')
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp113
1 files changed, 68 insertions, 45 deletions
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
index 6c97c906e8..122163ee7e 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
@@ -21,79 +21,101 @@
#include "QueueContext.h"
#include "Multicaster.h"
+#include "qpid/framing/ProtocolVersion.h"
#include "qpid/framing/ClusterQueueResubscribeBody.h"
+#include "qpid/framing/ClusterQueueSubscribeBody.h"
#include "qpid/framing/ClusterQueueUnsubscribeBody.h"
+#include "qpid/broker/Broker.h"
#include "qpid/broker/Queue.h"
#include "qpid/log/Statement.h"
-
+#include "qpid/sys/Timer.h"
namespace qpid {
namespace cluster {
+
+class OwnershipTimeout : public sys::TimerTask {
+ QueueContext& queueContext;
+
+ public:
+ OwnershipTimeout(QueueContext& qc, const sys::Duration& interval) :
+ TimerTask(interval, "QueueContext::OwnershipTimeout"), queueContext(qc) {}
+
+ // FIXME aconway 2011-07-27: thread safety on deletion?
+ void fire() { queueContext.timeout(); }
+};
+
QueueContext::QueueContext(broker::Queue& q, Multicaster& m)
- : owner(NOT_OWNER), count(0), queue(q), mcast(m)
+ : timer(q.getBroker()->getTimer()), queue(q), mcast(m), consumers(0)
{
- QPID_LOG(debug, "Assign cluster context to queue " << q.getName());
- q.stop(); // Initially stopped. Must all before setClusterContext
q.setClusterContext(boost::intrusive_ptr<QueueContext>(this));
+ q.stop(); // Initially stopped.
+}
+QueueContext::~QueueContext() {
+ // FIXME aconway 2011-07-27: revisit shutdown logic.
+ // timeout() could be called concurrently with destructor.
+ sys::Mutex::ScopedLock l(lock);
+ timerTask->cancel();
}
-// Called by QueueReplica in deliver thread.
-void QueueContext::sharedOwner(size_t limit) {
- QPID_LOG(critical, "FIXME QueueContext::sharedOwner " << queue.getName() << queue.getClusterContext().get());
+void QueueContext::replicaState(QueueOwnership state) {
sys::Mutex::ScopedLock l(lock);
- count = limit;
- if (owner == NOT_OWNER) queue.start(); // FIXME aconway 2011-06-09: ok inside mutex?
- owner = SHARED_OWNER;
+ switch (state) {
+ case UNSUBSCRIBED:
+ case SUBSCRIBED:
+ break;
+ case SOLE_OWNER:
+ queue.start();
+ if (timerTask) { // no need for timeout.
+ timerTask->cancel();
+ timerTask = 0;
+ }
+ break;
+ case SHARED_OWNER:
+ queue.start();
+ if (timerTask) timerTask->cancel();
+ // FIXME aconway 2011-07-28: configurable interval.
+ timerTask = new OwnershipTimeout(*this, 100*sys::TIME_MSEC);
+ timer.add(timerTask);
+ break;
+ }
}
-// Called by QueueReplica in deliver thread.
-void QueueContext::soleOwner() {
- QPID_LOG(critical, "FIXME QueueContext::soleOwner " << queue.getName() << queue.getClusterContext().get());
+// FIXME aconway 2011-07-27: Dont spin token on an empty queue. Cancel timer.
+
+void QueueContext::consume(size_t n) {
sys::Mutex::ScopedLock l(lock);
- count = 0;
- if (owner == NOT_OWNER) queue.start(); // FIXME aconway 2011-06-09: ok inside mutex?
- owner = SOLE_OWNER;
+ consumers = n;
+ if (n == 1) mcast.mcast(
+ framing::ClusterQueueSubscribeBody(framing::ProtocolVersion(), queue.getName()));
}
-// Called by BrokerContext in connection thread(s) on acquiring a message
-void QueueContext::acquire() {
- bool stop = false;
- {
- sys::Mutex::ScopedLock l(lock);
- assert(owner != NOT_OWNER); // Can't acquire on a queue we don't own.
- QPID_LOG(critical, "FIXME QueueContext::acquire " << queue.getName()
- << " owner=" << owner << " count=" << count);
- if (owner == SHARED_OWNER) {
- // Note count could be 0 if there are concurrent calls to acquire.
- if (count && --count == 0) {
- stop = true;
- }
- }
- }
- // FIXME aconway 2011-06-28: could have multiple stop() threads...
- if (stop) queue.stop();
+void QueueContext::cancel(size_t n) {
+ sys::Mutex::ScopedLock l(lock);
+ consumers = n;
+ if (n == 0) queue.stop(); // FIXME aconway 2011-07-28: Ok inside lock?
+}
+
+void QueueContext::timeout() {
+ QPID_LOG(critical, "FIXME Ownership timeout on queue " << queue.getName());
+ queue.stop();
}
-// Callback set up by queue.stop()
+
+// Callback set up by queue.stop(), called when no threads are dispatching from the queue.
+// Release the queue.
void QueueContext::stopped() {
sys::Mutex::ScopedLock l(lock);
- if (owner == NOT_OWNER) {
+ // FIXME aconway 2011-07-28: review thread safety of state.
+ // Deffered call to stopped doesn't sit well.
+ // queueActive is invaled while stop is in progress?
+ if (consumers == 0)
mcast.mcast(framing::ClusterQueueUnsubscribeBody(
framing::ProtocolVersion(), queue.getName()));
- } else {
- owner = NOT_OWNER;
+ else
mcast.mcast(framing::ClusterQueueResubscribeBody(
framing::ProtocolVersion(), queue.getName()));
- }
-}
-
-void QueueContext::unsubscribed() {
- QPID_LOG(critical, "FIXME QueueContext unsubscribed, stopping " << queue.getName());
- queue.stop();
- sys::Mutex::ScopedLock l(lock);
- owner = NOT_OWNER;
}
boost::intrusive_ptr<QueueContext> QueueContext::get(broker::Queue& q) {
@@ -102,4 +124,5 @@ boost::intrusive_ptr<QueueContext> QueueContext::get(broker::Queue& q) {
}
+
}} // namespace qpid::cluster