summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/cluster
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/cluster')
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp12
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp2
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/CountdownTimer.h2
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp10
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/EventHandler.h8
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/Group.cpp3
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/Group.h2
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp30
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp180
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueContext.h47
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp31
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueHandler.h16
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp28
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueReplica.h12
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/Ticker.cpp8
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/Ticker.h8
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp2
17 files changed, 248 insertions, 153 deletions
diff --git a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
index 9943b3d2b5..5871810a78 100644
--- a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
@@ -26,8 +26,6 @@
#include "QueueContext.h"
#include "hash.h"
#include "qpid/framing/ClusterMessageEnqueueBody.h"
-#include "qpid/framing/ClusterMessageAcquireBody.h"
-#include "qpid/framing/ClusterMessageDequeueBody.h"
#include "qpid/framing/ClusterMessageRequeueBody.h"
#include "qpid/framing/ClusterWiringCreateQueueBody.h"
#include "qpid/framing/ClusterWiringCreateExchangeBody.h"
@@ -113,14 +111,13 @@ void BrokerContext::routed(const boost::intrusive_ptr<Message>&) {}
void BrokerContext::acquire(const broker::QueuedMessage& qm) {
if (tssReplicate) {
assert(!qm.queue->isConsumingStopped());
- mcaster(qm).mcast(ClusterMessageAcquireBody(pv, qm.queue->getName(), qm.position));
+ QueueContext::get(*qm.queue)->localAcquire(qm.position);
}
}
void BrokerContext::dequeue(const broker::QueuedMessage& qm) {
if (tssReplicate)
- mcaster(qm).mcast(
- ClusterMessageDequeueBody(pv, qm.queue->getName(), qm.position));
+ QueueContext::get(*qm.queue)->localDequeue(qm.position);
}
void BrokerContext::requeue(const broker::QueuedMessage& qm) {
@@ -135,8 +132,7 @@ void BrokerContext::requeue(const broker::QueuedMessage& qm) {
void BrokerContext::create(broker::Queue& q) {
if (!tssReplicate) return;
assert(!QueueContext::get(q));
- boost::intrusive_ptr<QueueContext> context(
- new QueueContext(q, core.getGroup(q.getName()), core.getSettings().consumeTicks));
+ new QueueContext(q, core.getGroup(q.getName()), core.getSettings().consumeTicks);
std::string data(q.encodedSize(), '\0');
framing::Buffer buf(&data[0], data.size());
q.encode(buf);
@@ -188,7 +184,7 @@ void BrokerContext::cancel(broker::Queue& q, size_t n) {
}
void BrokerContext::stopped(broker::Queue& q) {
- boost::intrusive_ptr<QueueContext> qc = QueueContext::get(q);
+ QueueContext* qc = QueueContext::get(q);
// Don't forward the stopped call if the queue does not yet have a
// cluster context - this when the queue is first created locally.
if (qc) qc->stopped();
diff --git a/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp b/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp
index 68b9d5075b..52ae3eeea3 100644
--- a/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp
@@ -36,7 +36,7 @@ struct Cluster2Plugin : public Plugin {
addOptions()
("cluster2-name", optValue(settings.name, "NAME"), "Name of cluster to join")
("cluster2-concurrency", optValue(settings.concurrency, "N"), "Number concurrent streams of processing for multicast/deliver.")
- ("cluster2-tick", optValue(settings.tick, "uS"), "Length of 'tick' used for timing events in the cluster.")
+ ("cluster2-tick", optValue(settings.tick, "uS"), "Length of 'tick' used for timing events in microseconds.")
("cluster2-consume-ticks", optValue(settings.consumeTicks, "N"), "Maximum number of ticks a broker can hold the consume lock on a shared queue.");
// FIXME aconway 2011-10-05: add all relevant options from ClusterPlugin.h.
// FIXME aconway 2011-10-05: rename to final option names.
diff --git a/qpid/cpp/src/qpid/cluster/exp/CountdownTimer.h b/qpid/cpp/src/qpid/cluster/exp/CountdownTimer.h
index 2e86949dfd..144831dde0 100644
--- a/qpid/cpp/src/qpid/cluster/exp/CountdownTimer.h
+++ b/qpid/cpp/src/qpid/cluster/exp/CountdownTimer.h
@@ -61,6 +61,8 @@ class CountdownTimer {
}
}
+ bool isRunning() const { return timerRunning; }
+
private:
class Task : public sys::TimerTask {
diff --git a/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp
index dc10548f80..3c19967e06 100644
--- a/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp
@@ -71,11 +71,11 @@ void EventHandler::deliver(
try {
handle(frame);
} catch (const std::exception& e) {
- // FIXME aconway 2011-10-19: error handling.
- QPID_LOG(error, "cluster: error in deliver on " << cpg.getName()
- << " from " << PrettyId(sender, self)
- << ": " << frame
- << ": " << e.what());
+ // FIXME aconway 2011-10-19: error handling.
+ QPID_LOG(error, "cluster event: " << e.what()
+ << " (sender=" << PrettyId(sender, self) << " group=" << cpg.getName()
+ << " " << frame << ")");
+
}
}
}
diff --git a/qpid/cpp/src/qpid/cluster/exp/EventHandler.h b/qpid/cpp/src/qpid/cluster/exp/EventHandler.h
index 9f9ae1a856..f3a678f026 100644
--- a/qpid/cpp/src/qpid/cluster/exp/EventHandler.h
+++ b/qpid/cpp/src/qpid/cluster/exp/EventHandler.h
@@ -78,6 +78,14 @@ class EventHandler : public Cpg::Handler
MemberId getSelf() { return self; }
Cpg& getCpg() { return cpg; }
+ template <class HandlerT> boost::intrusive_ptr<HandlerT> getHandler() {
+ for (size_t i = 0; i < handlers.size(); ++i) {
+ boost::intrusive_ptr<HandlerT> p(dynamic_cast<HandlerT*>(handlers[i].get()));
+ if (p) return p;
+ }
+ return 0;
+ }
+
private:
void handle(const framing::AMQFrame&);
diff --git a/qpid/cpp/src/qpid/cluster/exp/Group.cpp b/qpid/cpp/src/qpid/cluster/exp/Group.cpp
index c6d98856a1..0bb805da5b 100644
--- a/qpid/cpp/src/qpid/cluster/exp/Group.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/Group.cpp
@@ -54,4 +54,7 @@ Group::~Group() {}
void Group::mcast(const framing::AMQBody& b) { multicaster->mcast(b); }
void Group::mcast(const framing::AMQFrame& f) { multicaster->mcast(f); }
+
+MemberId Group::getSelf() const { return eventHandler->getSelf(); }
+
}} // namespace qpid::cluster::exp
diff --git a/qpid/cpp/src/qpid/cluster/exp/Group.h b/qpid/cpp/src/qpid/cluster/exp/Group.h
index 49b33c6a70..cb216faf8a 100644
--- a/qpid/cpp/src/qpid/cluster/exp/Group.h
+++ b/qpid/cpp/src/qpid/cluster/exp/Group.h
@@ -22,6 +22,7 @@
*
*/
+#include "qpid/cluster/types.h"
#include "qpid/RefCounted.h"
#include <memory>
@@ -57,6 +58,7 @@ class Group : public RefCounted
MessageHolder& getMessageHolder() { return *messageHolder; }
MessageBuilders& getMessageBuilders() { return *messageBuilders; }
Ticker& getTicker() { return *ticker; }
+ MemberId getSelf() const;
void mcast(const framing::AMQBody&);
void mcast(const framing::AMQFrame&);
diff --git a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
index 21129b0fae..c8997877c5 100644
--- a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
@@ -88,38 +88,26 @@ void MessageHandler::enqueue(const std::string& q, uint16_t channel) {
// We only need to build message from other brokers, our own messages
// are held by the MessageHolder.
if (sender() != self()) {
- boost::shared_ptr<Queue> queue = findQueue(q, "cluster: enqueue");
+ boost::shared_ptr<Queue> queue = findQueue(q, "cluster enqueue");
messageBuilders.announce(sender(), channel, queue);
}
}
-// FIXME aconway 2011-09-14: performance: pack acquires into a SequenceSet
-// and scan queue once.
void MessageHandler::acquire(const std::string& q, uint32_t position) {
// FIXME aconway 2011-09-15: systematic logging across cluster module.
- QPID_LOG(trace, "cluster: message " << q << "[" << position
+ QPID_LOG(trace, "cluster message " << q << "[" << position
<< "] acquired by " << PrettyId(sender(), self()));
// Note acquires from other members. My own acquires were executed in
// the broker thread
if (sender() != self()) {
- boost::shared_ptr<Queue> queue = findQueue(q, "cluster: acquire");
- QueuedMessage qm;
- BrokerContext::ScopedSuppressReplication ssr;
- if (!queue->acquireMessageAt(position, qm))
- throw Exception(QPID_MSG("cluster: acquire: message not found: "
- << q << "[" << position << "]"));
- assert(qm.position.getValue() == position);
- assert(qm.payload);
- // Save on context for possible requeue if released/rejected.
- QueueContext::get(*queue)->acquire(qm);
- // FIXME aconway 2011-09-19: need to record by member-ID to
- // requeue if member leaves.
+ boost::shared_ptr<Queue> queue = findQueue(q, "cluster acquire");
+ QueueContext::get(*queue)->acquire(position);
}
}
void MessageHandler::dequeue(const std::string& q, uint32_t position) {
// FIXME aconway 2011-09-15: systematic logging across cluster module.
- QPID_LOG(trace, "cluster: message " << q << "[" << position
+ QPID_LOG(trace, "cluster message " << q << "[" << position
<< "] dequeued by " << PrettyId(sender(), self()));
// FIXME aconway 2010-10-28: for local dequeues, we should
@@ -128,16 +116,14 @@ void MessageHandler::dequeue(const std::string& q, uint32_t position) {
// My own dequeues were processed in the broker thread before multicasting.
if (sender() != self()) {
- boost::shared_ptr<Queue> queue = findQueue(q, "cluster: dequeue");
- QueuedMessage qm = QueueContext::get(*queue)->dequeue(position);
- BrokerContext::ScopedSuppressReplication ssr;
- if (qm.queue) queue->dequeue(0, qm);
+ boost::shared_ptr<Queue> queue = findQueue(q, "cluster dequeue");
+ QueueContext::get(*queue)->dequeue(position);
}
}
void MessageHandler::requeue(const std::string& q, uint32_t position, bool redelivered) {
if (sender() != self()) {
- boost::shared_ptr<Queue> queue = findQueue(q, "cluster: requeue");
+ boost::shared_ptr<Queue> queue = findQueue(q, "cluster requeue");
QueueContext::get(*queue)->requeue(position, redelivered);
}
}
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
index ba06ee82f2..ff9c050348 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
@@ -21,34 +21,45 @@
*/
#include "BrokerContext.h"
+#include "EventHandler.h"
#include "Group.h"
#include "Multicaster.h"
#include "QueueContext.h"
+#include "QueueHandler.h"
#include "hash.h"
-#include "qpid/cluster/types.h"
-#include "qpid/framing/ProtocolVersion.h"
-#include "qpid/framing/ClusterQueueResubscribeBody.h"
-#include "qpid/framing/ClusterQueueSubscribeBody.h"
-#include "qpid/framing/ClusterQueueUnsubscribeBody.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueuedMessage.h"
+#include "qpid/framing/ClusterMessageAcquireBody.h"
+#include "qpid/framing/ClusterMessageDequeueBody.h"
+#include "qpid/framing/ClusterQueueConsumedBody.h"
+#include "qpid/framing/ClusterQueueSubscribeBody.h"
+#include "qpid/framing/ClusterQueueUnsubscribeBody.h"
+#include "qpid/framing/ProtocolVersion.h"
#include "qpid/log/Statement.h"
namespace qpid {
namespace cluster {
+using framing::SequenceSet;
+const framing::ProtocolVersion pv; // shorthand
+
QueueContext::QueueContext(broker::Queue& q, Group& g, size_t maxTicks_)
- : consumers(0), consuming(true), ticks(0),
+ : ownership(UNSUBSCRIBED), consumers(0), consuming(false), ticks(0),
queue(q), mcast(g.getMulticaster()), hash(hashof(q.getName())),
- maxTicks(maxTicks_)
+ maxTicks(maxTicks_), group(g)
{
- q.setClusterContext(boost::intrusive_ptr<QueueContext>(this));
+ q.setClusterContext(std::auto_ptr<broker::Context>(this));
q.stopConsumers(); // Stop queue initially.
- g.getTicker().add(this);
+ group.getTicker().add(this);
}
-QueueContext::~QueueContext() {}
+QueueContext::~QueueContext() {
+ // Lifecycle: must remove all references to this context before it is deleted.
+ // Must be sure that there can be no use of this context later.
+ group.getTicker().remove(this);
+ group.getEventHandler().getHandler<QueueHandler>()->remove(queue);
+}
namespace {
bool isOwner(QueueOwnership o) { return o == SOLE_OWNER || o == SHARED_OWNER; }
@@ -57,72 +68,66 @@ bool isOwner(QueueOwnership o) { return o == SOLE_OWNER || o == SHARED_OWNER; }
// Called by QueueReplica in CPG deliver thread when state changes.
void QueueContext::replicaState(QueueOwnership before, QueueOwnership after)
{
+ sys::Mutex::ScopedLock l(lock);
// Interested in state changes which lead to ownership.
// We voluntarily give up ownership before multicasting
// the state change so we don't need to handle transitions
// that lead to non-ownership.
+
if (before != after && isOwner(after)) {
- bool start = false;
- {
- sys::Mutex::ScopedLock l(lock);
- start = !consuming;
- consuming = true;
- ticks = 0;
- }
- if (start) queue.startConsumers();
+ assert(before == ownership);
+ if (!consuming) queue.startConsumers();
+ consuming = true;
+ ticks = 0;
}
+ ownership = after;
}
// FIXME aconway 2011-07-27: Dont spin the token on an empty queue.
// Called in broker threads when a consumer is added
void QueueContext::consume(size_t n) {
- {
- sys::Mutex::ScopedLock l(lock);
- consumers = n;
- }
- if (n == 1) mcast.mcast(
- framing::ClusterQueueSubscribeBody(framing::ProtocolVersion(), queue.getName()));
+ sys::Mutex::ScopedLock l(lock);
+ if (consumers == 0 && n > 0 && ownership == UNSUBSCRIBED)
+ mcast.mcast(
+ framing::ClusterQueueSubscribeBody(pv, queue.getName()));
+ consumers = n;
}
// Called in broker threads when a consumer is cancelled
void QueueContext::cancel(size_t n) {
- bool stop = false;
- {
- sys::Mutex::ScopedLock l(lock);
- consumers = n;
- stop = (n == 0 && consuming);
- }
- if (stop) queue.stopConsumers();
+ sys::Mutex::ScopedLock l(lock);
+ consumers = n;
+ if (n == 0 && consuming) queue.stopConsumers();
}
+// FIXME aconway 2011-11-03: review scope of locking around sendConsumed
+
// Called in Ticker thread.
void QueueContext::tick() {
- bool stop = false;
- {
- sys::Mutex::ScopedLock l(lock);
- stop = (consuming && ++ticks >= maxTicks);
- }
- // When all threads have stopped, queue will call stopped()
- if (stop) queue.stopConsumers();
+ sys::Mutex::ScopedLock l(lock);
+ if (!consuming) return; // Nothing to do if we don't have the lock.
+ if (ownership == SHARED_OWNER && ++ticks >= maxTicks) queue.stopConsumers();
+ else if (ownership == SOLE_OWNER) sendConsumed(l); // Status report on consumption
}
// Callback set up by queue.stopConsumers() called in connection or timer thread.
// Called when no threads are dispatching from the queue.
void QueueContext::stopped() {
- bool resubscribe = false;
- {
- sys::Mutex::ScopedLock l(lock);
- assert(consuming);
- consuming = false;
- resubscribe = consumers;
- }
- if (resubscribe)
- mcast.mcast(framing::ClusterQueueResubscribeBody(
- framing::ProtocolVersion(), queue.getName()));
- else
- mcast.mcast(framing::ClusterQueueUnsubscribeBody(
- framing::ProtocolVersion(), queue.getName()));
+ sys::Mutex::ScopedLock l(lock);
+ if (!consuming) return; // !consuming => initial stopConsumers in ctor.
+ sendConsumed(l);
+ mcast.mcast(
+ framing::ClusterQueueUnsubscribeBody(pv, queue.getName(), consumers));
+ consuming = false;
+}
+
+void QueueContext::sendConsumed(const sys::Mutex::ScopedLock&) {
+ if (acquired.empty() && dequeued.empty()) return; // Nothing to send
+ mcast.mcast(
+ framing::ClusterQueueConsumedBody(pv, queue.getName(), acquired,dequeued));
+ acquired.clear();
+ dequeued.clear();
}
void QueueContext::requeue(uint32_t position, bool redelivered) {
@@ -135,17 +140,76 @@ void QueueContext::requeue(uint32_t position, bool redelivered) {
}
}
-void QueueContext::acquire(const broker::QueuedMessage& qm) {
- unacked.put(qm.position, qm);
+void QueueContext::localAcquire(uint32_t position) {
+ QPID_LOG(trace, "cluster queue " << queue.getName() << " acquired " << position);
+ sys::Mutex::ScopedLock l(lock);
+ assert(consuming);
+ acquired.add(position);
+}
+
+void QueueContext::localDequeue(uint32_t position) {
+ QPID_LOG(trace, "cluster queue " << queue.getName() << " dequeued " << position);
+ // FIXME aconway 2010-10-28: for local dequeues, we should
+ // complete the ack that initiated the dequeue at this point.
+ sys::Mutex::ScopedLock l(lock);
+
+ // FIXME aconway 2011-11-03: this assertion fails for explicit accept
+ // because it doesn't respect the consume lock.
+ // assert(consuming);
+
+ dequeued.add(position);
+}
+
+void QueueContext::consumed(
+ const MemberId& sender,
+ const SequenceSet& acquired,
+ const SequenceSet& dequeued)
+{
+ // No lock, doesn't touch any members.
+
+ // FIXME aconway 2011-09-15: systematic logging across cluster module.
+ // FIXME aconway 2011-09-23: pretty printing for identifier.
+ QPID_LOG(trace, "cluster: " << sender << " acquired: " << acquired
+ << " dequeued: " << dequeued << " on queue: " << queue.getName());
+
+ // Note acquires from other members. My own acquires were executed in
+ // the connection thread
+ if (sender != group.getSelf()) {
+ // FIXME aconway 2011-09-23: avoid individual finds, scan queue once.
+ for (SequenceSet::iterator i = acquired.begin(); i != acquired.end(); ++i)
+ acquire(*i);
+ }
+ // Process deques from the queue owner.
+ // FIXME aconway 2011-09-23: avoid individual finds, scan queue once.
+ for (SequenceSet::iterator i = dequeued.begin(); i != dequeued.end(); ++i)
+ dequeue(*i);
+}
+
+// Remote acquire
+void QueueContext::acquire(uint32_t position) {
+ // No lock, doesn't touch any members.
+ broker::QueuedMessage qm;
+ BrokerContext::ScopedSuppressReplication ssr;
+ if (!queue.acquireMessageAt(position, qm))
+ // FIXME aconway 2011-10-31: error handling
+ throw Exception(QPID_MSG("cluster: acquire: message not found: "
+ << queue.getName() << "[" << position << "]"));
+ assert(qm.position.getValue() == position);
+ assert(qm.payload);
+ unacked.put(qm.position, qm); // unacked has its own lock.
}
-broker::QueuedMessage QueueContext::dequeue(uint32_t position) {
- return unacked.pop(position);
+void QueueContext::dequeue(uint32_t position) {
+ // No lock, doesn't touch any members. unacked has its own lock.
+ broker::QueuedMessage qm = unacked.pop(position);
+ BrokerContext::ScopedSuppressReplication ssr;
+ if (qm.queue) queue.dequeue(0, qm);
}
-boost::intrusive_ptr<QueueContext> QueueContext::get(broker::Queue& q) {
- return boost::intrusive_ptr<QueueContext>(
- static_cast<QueueContext*>(q.getClusterContext().get()));
+QueueContext* QueueContext::get(broker::Queue& q) {
+ return static_cast<QueueContext*>(q.getClusterContext());
}
+// FIXME aconway 2011-09-23: make unacked a plain map, use lock.
+
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.h b/qpid/cpp/src/qpid/cluster/exp/QueueContext.h
index 20c2aabc1d..d7079ab8a5 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.h
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.h
@@ -25,11 +25,12 @@
#include "LockedMap.h"
#include "Ticker.h"
#include "qpid/RefCounted.h"
+#include "qpid/broker/Context.h"
+#include "qpid/cluster/types.h"
+#include "qpid/framing/SequenceSet.h"
#include "qpid/sys/AtomicValue.h"
-#include "qpid/sys/Time.h"
#include "qpid/sys/Mutex.h"
-#include "qpid/cluster/types.h"
-#include <boost/intrusive_ptr.hpp>
+#include "qpid/sys/Time.h"
namespace qpid {
namespace broker {
@@ -43,12 +44,13 @@ class Multicaster;
class Group;
/**
- * Queue state that is not replicated to the cluster.
- * Manages the local queue start/stop status.
+ * Local Queue state, manage start/stop consuming on the queue.
+ * Destroyed when the queue is destroyed, it must erase itself
+ * from any cluster data structures in its destructor.
*
-* THREAD SAFE: Called by connection threads and Ticker dispatch threads.
+ * THREAD SAFE: Called by connection threads and Ticker dispatch threads.
*/
-class QueueContext : public Ticker::Tickable {
+class QueueContext : public broker::Context, Ticker::Tickable {
public:
QueueContext(broker::Queue&, Group&, size_t consumeTicks);
~QueueContext();
@@ -80,33 +82,50 @@ class QueueContext : public Ticker::Tickable {
/** Called by MessageHandler to requeue a message. */
void requeue(uint32_t position, bool redelivered);
+ /** Called by BrokerContext when a mesages is acquired locally. */
+ void localAcquire(uint32_t position);
+
/** Called by MessageHandler when a mesages is acquired. */
- void acquire(const broker::QueuedMessage& qm);
+ void acquire(uint32_t position);
/** Called by MesageHandler when a message is dequeued. */
- broker::QueuedMessage dequeue(uint32_t position);
+ void dequeue(uint32_t position);
- size_t getHash() const { return hash; }
+ /** Called by BrokerContext when a message is dequeued locally. */
+ void localDequeue(uint32_t position);
+
+ /** Called in deliver thread, take note of another brokers acquires/dequeues. */
+ void consumed(const MemberId&,
+ const framing::SequenceSet& acquired,
+ const framing::SequenceSet& dequeued);
+ size_t getHash() const { return hash; }
+ broker::Queue& getQueue() { return queue; }
+
/** Get the cluster context for a broker queue. */
- static boost::intrusive_ptr<QueueContext> get(broker::Queue&) ;
+ static QueueContext* get(broker::Queue&);
private:
+ void sendConsumed(const sys::Mutex::ScopedLock&);
+
sys::Mutex lock;
- size_t consumers; // Number of local consumers
- bool consuming; // True if we have the lock & local consumers are active
+ QueueOwnership ownership; // Ownership status.
+ size_t consumers; // Number of local consumers.
+ bool consuming; // True if we have the lock.
size_t ticks; // Ticks since we got the lock.
// Following members are immutable
- broker::Queue& queue; // FIXME aconway 2011-06-08: should be shared/weak ptr?
+ broker::Queue& queue;
Multicaster& mcast;
size_t hash;
size_t maxTicks; // Max ticks we are allowed.
+ framing::SequenceSet acquired, dequeued; // Track local acquires/dequeues.
// Following members are safe to use without holding a lock
typedef LockedMap<uint32_t, broker::QueuedMessage> UnackedMap;
UnackedMap unacked;
+ Group& group;
};
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp
index 0c96e9326d..f763841c20 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp
@@ -45,25 +45,36 @@ bool QueueHandler::handle(const framing::AMQFrame& frame) {
void QueueHandler::subscribe(const std::string& queue) {
find(queue)->subscribe(sender());
}
-void QueueHandler::unsubscribe(const std::string& queue) {
- find(queue)->unsubscribe(sender());
+
+void QueueHandler::unsubscribe(const std::string& queue,
+ bool resubscribe) {
+ find(queue)->unsubscribe(sender(), resubscribe);
}
-void QueueHandler::resubscribe(const std::string& queue) {
- find(queue)->resubscribe(sender());
+
+void QueueHandler::consumed(const std::string& queue,
+ const framing::SequenceSet& acquired,
+ const framing::SequenceSet& dequeued)
+{
+ find(queue)->consumed(sender(), acquired, dequeued);
}
void QueueHandler::left(const MemberId& member) {
// Unsubscribe for members that leave.
for (QueueMap::iterator i = queues.begin(); i != queues.end(); ++i)
- i->second->unsubscribe(member);
+ i->second->unsubscribe(member, false);
}
-void QueueHandler::add(boost::shared_ptr<broker::Queue> q) {
+void QueueHandler::add(broker::Queue& q) {
// Local queues already have a context, remote queues need one.
- if (!QueueContext::get(*q))
- new QueueContext(*q, group, consumeTicks); // Context attaches to the Queue
- queues[q->getName()] = boost::intrusive_ptr<QueueReplica>(
- new QueueReplica(q, self()));
+ if (!QueueContext::get(q))
+ new QueueContext(q, group, consumeTicks); // Context attaches to the Queue
+ assert(QueueContext::get(q));
+ queues[q.getName()] = boost::intrusive_ptr<QueueReplica>(
+ new QueueReplica(*QueueContext::get(q), self()));
+}
+
+void QueueHandler::remove(broker::Queue& q) {
+ queues.erase(q.getName());
}
boost::intrusive_ptr<QueueReplica> QueueHandler::find(const std::string& queue) {
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h
index 84e8b75cfb..0f9937641b 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h
@@ -60,15 +60,17 @@ class QueueHandler : public framing::AMQP_AllOperations::ClusterQueueHandler,
// Events
void subscribe(const std::string& queue);
- void unsubscribe(const std::string& queue);
- void resubscribe(const std::string& queue);
- void left(const MemberId&);
- void add(boost::shared_ptr<broker::Queue>);
+ void unsubscribe(const std::string& queue, bool resubscribe);
+
+ void consumed(const std::string& queue,
+ const framing::SequenceSet& acquired,
+ const framing::SequenceSet& dequeued);
+
+ void left(const MemberId&);
- // NB: These functions ar called in broker threads, not deliver threads.
- void acquired(const broker::QueuedMessage& qm);
- void empty(const broker::Queue& q);
+ void add(broker::Queue&);
+ void remove(broker::Queue&);
private:
typedef std::map<std::string, boost::intrusive_ptr<QueueReplica> > QueueMap;
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp
index 11a7496582..66a7a81f33 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp
@@ -28,9 +28,8 @@
namespace qpid {
namespace cluster {
-QueueReplica::QueueReplica(boost::shared_ptr<broker::Queue> q,
- const MemberId& self_)
- : queue(q), self(self_), context(QueueContext::get(*q))
+QueueReplica::QueueReplica(QueueContext& qc, const MemberId& self_)
+ : self(self_), context(qc)
{}
struct PrintSubscribers {
@@ -56,27 +55,28 @@ void QueueReplica::subscribe(const MemberId& member) {
update(before);
}
-// FIXME aconway 2011-09-20: need to requeue.
-void QueueReplica::unsubscribe(const MemberId& member) {
+void QueueReplica::unsubscribe(const MemberId& member, bool resubscribe)
+{
+ assert(!resubscribe || member == subscribers.front());
QueueOwnership before = getState();
MemberQueue::iterator i = std::remove(subscribers.begin(), subscribers.end(), member);
- if (i != subscribers.end()) subscribers.erase(i, subscribers.end());
+ subscribers.erase(i, subscribers.end());
+ if (resubscribe) subscribers.push_back(member);
update(before);
}
-void QueueReplica::resubscribe(const MemberId& member) {
- assert (member == subscribers.front());
- QueueOwnership before = getState();
- subscribers.pop_front();
- subscribers.push_back(member);
- update(before);
+void QueueReplica::consumed(const MemberId& member,
+ const framing::SequenceSet& acquired,
+ const framing::SequenceSet& dequeued)
+{
+ context.consumed(member, acquired, dequeued);
}
void QueueReplica::update(QueueOwnership before) {
QueueOwnership after = getState();
- QPID_LOG(trace, "cluster: queue replica: " << queue->getName() << ": "
+ QPID_LOG(trace, "cluster: queue replica: " << context.getQueue().getName() << ": "
<< before << "->" << after << " [" << PrintSubscribers(subscribers, self) << "]");
- context->replicaState(before, after);
+ context.replicaState(before, after);
}
QueueOwnership QueueReplica::getState() const {
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h
index 31faf4853a..ca92de1e30 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h
@@ -50,20 +50,22 @@ struct PrintSubscribers;
class QueueReplica : public RefCounted
{
public:
- QueueReplica(boost::shared_ptr<broker::Queue> , const MemberId& );
+ QueueReplica(QueueContext&, const MemberId& );
void subscribe(const MemberId&);
- void unsubscribe(const MemberId&);
- void resubscribe(const MemberId&);
+ void unsubscribe(const MemberId&, bool resubscribe);
+ void consumed(const MemberId&,
+ const framing::SequenceSet& acquired,
+ const framing::SequenceSet& dequeued);
MemberId getSelf() const { return self; }
private:
typedef std::deque<MemberId> MemberQueue;
- boost::shared_ptr<broker::Queue> queue;
+ std::string name;
MemberQueue subscribers;
MemberId self;
- boost::intrusive_ptr<QueueContext> context;
+ QueueContext& context;
QueueOwnership getState() const;
bool isOwner() const;
diff --git a/qpid/cpp/src/qpid/cluster/exp/Ticker.cpp b/qpid/cpp/src/qpid/cluster/exp/Ticker.cpp
index 9ff04f2f54..1210eb7055 100644
--- a/qpid/cpp/src/qpid/cluster/exp/Ticker.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/Ticker.cpp
@@ -34,21 +34,21 @@ Ticker::Ticker(sys::Duration tick, sys::Timer& timer_,
timer.add(this);
}
-void Ticker::add(boost::intrusive_ptr<Tickable> t) {
+void Ticker::add(Tickable* t) {
sys::Mutex::ScopedLock l(lock);
tickables.push_back(t);
}
-void Ticker::remove(boost::intrusive_ptr<Tickable> t) {
+void Ticker::remove(Tickable* t) {
sys::Mutex::ScopedLock l(lock);
Tickables::iterator i = std::find(tickables.begin(), tickables.end(), t);
if (i != tickables.end()) tickables.erase(i);
}
-// Called by timer thread, sets condition
+// Called by timer thread
void Ticker::fire() {
condition.set();
- setupNextFire();
+ setupNextFire(); // FIXME aconway 2011-11-03: restart()?
timer.add(this);
}
diff --git a/qpid/cpp/src/qpid/cluster/exp/Ticker.h b/qpid/cpp/src/qpid/cluster/exp/Ticker.h
index 0a8d508a70..6910b7a0be 100644
--- a/qpid/cpp/src/qpid/cluster/exp/Ticker.h
+++ b/qpid/cpp/src/qpid/cluster/exp/Ticker.h
@@ -53,18 +53,18 @@ namespace cluster {
class Ticker : public sys::TimerTask
{
public:
- struct Tickable : public RefCounted {
+ struct Tickable {
virtual ~Tickable();
virtual void tick() = 0;
};
Ticker(sys::Duration tick, sys::Timer&, boost::shared_ptr<sys::Poller>);
- void add(boost::intrusive_ptr<Tickable>);
- void remove(boost::intrusive_ptr<Tickable>);
+ void add(Tickable*);
+ void remove(Tickable*);
private:
- typedef std::vector<boost::intrusive_ptr<Tickable> > Tickables;
+ typedef std::vector<Tickable*> Tickables;
void fire(); // Called in timer thread.
void dispatch(sys::PollableCondition&); // Called in IO thread
diff --git a/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp
index da110fed8f..c2f52f2a27 100644
--- a/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp
@@ -72,7 +72,7 @@ void WiringHandler::createQueue(const std::string& data) {
}
boost::shared_ptr<broker::Queue> q = broker.getQueues().find(name);
assert(q); // FIXME aconway 2011-05-10: error handling.
- queueHandler->add(q);
+ queueHandler->add(*q);
QPID_LOG(debug, "cluster: create queue " << q->getName());
}