summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp')
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp180
1 files changed, 122 insertions, 58 deletions
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
index ba06ee82f2..ff9c050348 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
@@ -21,34 +21,45 @@
*/
#include "BrokerContext.h"
+#include "EventHandler.h"
#include "Group.h"
#include "Multicaster.h"
#include "QueueContext.h"
+#include "QueueHandler.h"
#include "hash.h"
-#include "qpid/cluster/types.h"
-#include "qpid/framing/ProtocolVersion.h"
-#include "qpid/framing/ClusterQueueResubscribeBody.h"
-#include "qpid/framing/ClusterQueueSubscribeBody.h"
-#include "qpid/framing/ClusterQueueUnsubscribeBody.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueuedMessage.h"
+#include "qpid/framing/ClusterMessageAcquireBody.h"
+#include "qpid/framing/ClusterMessageDequeueBody.h"
+#include "qpid/framing/ClusterQueueConsumedBody.h"
+#include "qpid/framing/ClusterQueueSubscribeBody.h"
+#include "qpid/framing/ClusterQueueUnsubscribeBody.h"
+#include "qpid/framing/ProtocolVersion.h"
#include "qpid/log/Statement.h"
namespace qpid {
namespace cluster {
+using framing::SequenceSet;
+const framing::ProtocolVersion pv; // shorthand
+
QueueContext::QueueContext(broker::Queue& q, Group& g, size_t maxTicks_)
- : consumers(0), consuming(true), ticks(0),
+ : ownership(UNSUBSCRIBED), consumers(0), consuming(false), ticks(0),
queue(q), mcast(g.getMulticaster()), hash(hashof(q.getName())),
- maxTicks(maxTicks_)
+ maxTicks(maxTicks_), group(g)
{
- q.setClusterContext(boost::intrusive_ptr<QueueContext>(this));
+ q.setClusterContext(std::auto_ptr<broker::Context>(this));
q.stopConsumers(); // Stop queue initially.
- g.getTicker().add(this);
+ group.getTicker().add(this);
}
-QueueContext::~QueueContext() {}
+QueueContext::~QueueContext() {
+ // Lifecycle: must remove all references to this context before it is deleted.
+ // Must be sure that there can be no use of this context later.
+ group.getTicker().remove(this);
+ group.getEventHandler().getHandler<QueueHandler>()->remove(queue);
+}
namespace {
bool isOwner(QueueOwnership o) { return o == SOLE_OWNER || o == SHARED_OWNER; }
@@ -57,72 +68,66 @@ bool isOwner(QueueOwnership o) { return o == SOLE_OWNER || o == SHARED_OWNER; }
// Called by QueueReplica in CPG deliver thread when state changes.
void QueueContext::replicaState(QueueOwnership before, QueueOwnership after)
{
+ sys::Mutex::ScopedLock l(lock);
// Interested in state changes which lead to ownership.
// We voluntarily give up ownership before multicasting
// the state change so we don't need to handle transitions
// that lead to non-ownership.
+
if (before != after && isOwner(after)) {
- bool start = false;
- {
- sys::Mutex::ScopedLock l(lock);
- start = !consuming;
- consuming = true;
- ticks = 0;
- }
- if (start) queue.startConsumers();
+ assert(before == ownership);
+ if (!consuming) queue.startConsumers();
+ consuming = true;
+ ticks = 0;
}
+ ownership = after;
}
// FIXME aconway 2011-07-27: Dont spin the token on an empty queue.
// Called in broker threads when a consumer is added
void QueueContext::consume(size_t n) {
- {
- sys::Mutex::ScopedLock l(lock);
- consumers = n;
- }
- if (n == 1) mcast.mcast(
- framing::ClusterQueueSubscribeBody(framing::ProtocolVersion(), queue.getName()));
+ sys::Mutex::ScopedLock l(lock);
+ if (consumers == 0 && n > 0 && ownership == UNSUBSCRIBED)
+ mcast.mcast(
+ framing::ClusterQueueSubscribeBody(pv, queue.getName()));
+ consumers = n;
}
// Called in broker threads when a consumer is cancelled
void QueueContext::cancel(size_t n) {
- bool stop = false;
- {
- sys::Mutex::ScopedLock l(lock);
- consumers = n;
- stop = (n == 0 && consuming);
- }
- if (stop) queue.stopConsumers();
+ sys::Mutex::ScopedLock l(lock);
+ consumers = n;
+ if (n == 0 && consuming) queue.stopConsumers();
}
+// FIXME aconway 2011-11-03: review scope of locking around sendConsumed
+
// Called in Ticker thread.
void QueueContext::tick() {
- bool stop = false;
- {
- sys::Mutex::ScopedLock l(lock);
- stop = (consuming && ++ticks >= maxTicks);
- }
- // When all threads have stopped, queue will call stopped()
- if (stop) queue.stopConsumers();
+ sys::Mutex::ScopedLock l(lock);
+ if (!consuming) return; // Nothing to do if we don't have the lock.
+ if (ownership == SHARED_OWNER && ++ticks >= maxTicks) queue.stopConsumers();
+ else if (ownership == SOLE_OWNER) sendConsumed(l); // Status report on consumption
}
// Callback set up by queue.stopConsumers() called in connection or timer thread.
// Called when no threads are dispatching from the queue.
void QueueContext::stopped() {
- bool resubscribe = false;
- {
- sys::Mutex::ScopedLock l(lock);
- assert(consuming);
- consuming = false;
- resubscribe = consumers;
- }
- if (resubscribe)
- mcast.mcast(framing::ClusterQueueResubscribeBody(
- framing::ProtocolVersion(), queue.getName()));
- else
- mcast.mcast(framing::ClusterQueueUnsubscribeBody(
- framing::ProtocolVersion(), queue.getName()));
+ sys::Mutex::ScopedLock l(lock);
+ if (!consuming) return; // !consuming => initial stopConsumers in ctor.
+ sendConsumed(l);
+ mcast.mcast(
+ framing::ClusterQueueUnsubscribeBody(pv, queue.getName(), consumers));
+ consuming = false;
+}
+
+void QueueContext::sendConsumed(const sys::Mutex::ScopedLock&) {
+ if (acquired.empty() && dequeued.empty()) return; // Nothing to send
+ mcast.mcast(
+ framing::ClusterQueueConsumedBody(pv, queue.getName(), acquired,dequeued));
+ acquired.clear();
+ dequeued.clear();
}
void QueueContext::requeue(uint32_t position, bool redelivered) {
@@ -135,17 +140,76 @@ void QueueContext::requeue(uint32_t position, bool redelivered) {
}
}
-void QueueContext::acquire(const broker::QueuedMessage& qm) {
- unacked.put(qm.position, qm);
+void QueueContext::localAcquire(uint32_t position) {
+ QPID_LOG(trace, "cluster queue " << queue.getName() << " acquired " << position);
+ sys::Mutex::ScopedLock l(lock);
+ assert(consuming);
+ acquired.add(position);
+}
+
+void QueueContext::localDequeue(uint32_t position) {
+ QPID_LOG(trace, "cluster queue " << queue.getName() << " dequeued " << position);
+ // FIXME aconway 2010-10-28: for local dequeues, we should
+ // complete the ack that initiated the dequeue at this point.
+ sys::Mutex::ScopedLock l(lock);
+
+ // FIXME aconway 2011-11-03: this assertion fails for explicit accept
+ // because it doesn't respect the consume lock.
+ // assert(consuming);
+
+ dequeued.add(position);
+}
+
+void QueueContext::consumed(
+ const MemberId& sender,
+ const SequenceSet& acquired,
+ const SequenceSet& dequeued)
+{
+ // No lock, doesn't touch any members.
+
+ // FIXME aconway 2011-09-15: systematic logging across cluster module.
+ // FIXME aconway 2011-09-23: pretty printing for identifier.
+ QPID_LOG(trace, "cluster: " << sender << " acquired: " << acquired
+ << " dequeued: " << dequeued << " on queue: " << queue.getName());
+
+ // Note acquires from other members. My own acquires were executed in
+ // the connection thread
+ if (sender != group.getSelf()) {
+ // FIXME aconway 2011-09-23: avoid individual finds, scan queue once.
+ for (SequenceSet::iterator i = acquired.begin(); i != acquired.end(); ++i)
+ acquire(*i);
+ }
+ // Process deques from the queue owner.
+ // FIXME aconway 2011-09-23: avoid individual finds, scan queue once.
+ for (SequenceSet::iterator i = dequeued.begin(); i != dequeued.end(); ++i)
+ dequeue(*i);
+}
+
+// Remote acquire
+void QueueContext::acquire(uint32_t position) {
+ // No lock, doesn't touch any members.
+ broker::QueuedMessage qm;
+ BrokerContext::ScopedSuppressReplication ssr;
+ if (!queue.acquireMessageAt(position, qm))
+ // FIXME aconway 2011-10-31: error handling
+ throw Exception(QPID_MSG("cluster: acquire: message not found: "
+ << queue.getName() << "[" << position << "]"));
+ assert(qm.position.getValue() == position);
+ assert(qm.payload);
+ unacked.put(qm.position, qm); // unacked has its own lock.
}
-broker::QueuedMessage QueueContext::dequeue(uint32_t position) {
- return unacked.pop(position);
+void QueueContext::dequeue(uint32_t position) {
+ // No lock, doesn't touch any members. unacked has its own lock.
+ broker::QueuedMessage qm = unacked.pop(position);
+ BrokerContext::ScopedSuppressReplication ssr;
+ if (qm.queue) queue.dequeue(0, qm);
}
-boost::intrusive_ptr<QueueContext> QueueContext::get(broker::Queue& q) {
- return boost::intrusive_ptr<QueueContext>(
- static_cast<QueueContext*>(q.getClusterContext().get()));
+QueueContext* QueueContext::get(broker::Queue& q) {
+ return static_cast<QueueContext*>(q.getClusterContext());
}
+// FIXME aconway 2011-09-23: make unacked a plain map, use lock.
+
}} // namespace qpid::cluster