summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-11-04 20:27:13 +0000
committerAlan Conway <aconway@apache.org>2011-11-04 20:27:13 +0000
commit29e3b04915ef30f7e0f769cc1ee3994d99711fef (patch)
tree02e49caec0e4e7699413d36eab177a3d5bbb732d
parent561fe4dd6234c085dc55bbd430dcab7427d2db29 (diff)
downloadqpid-python-qpid-2920-active.tar.gz
QPID-2920: Batch acquire/dequeue messages in cluster.qpid-2920-active
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920-active@1197749 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/include/qpid/RangeSet.h2
-rw-r--r--qpid/cpp/src/Makefile.am1
-rw-r--r--qpid/cpp/src/qpid/broker/Context.h38
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h7
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp12
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp2
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/CountdownTimer.h2
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp10
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/EventHandler.h8
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/Group.cpp3
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/Group.h2
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp30
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp180
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueContext.h47
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp31
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueHandler.h16
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp28
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueReplica.h12
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/Ticker.cpp8
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/Ticker.h8
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp2
-rwxr-xr-xqpid/cpp/src/tests/cluster2_tests.py31
-rwxr-xr-xqpid/cpp/src/tests/qpid-cpp-benchmark3
-rw-r--r--qpid/cpp/xml/cluster.xml17
24 files changed, 318 insertions, 182 deletions
diff --git a/qpid/cpp/include/qpid/RangeSet.h b/qpid/cpp/include/qpid/RangeSet.h
index 36991fd784..b36030cbcd 100644
--- a/qpid/cpp/include/qpid/RangeSet.h
+++ b/qpid/cpp/include/qpid/RangeSet.h
@@ -133,6 +133,8 @@ class RangeSet
explicit RangeSet(const Range<T>& r) { *this += r; }
RangeSet(const T& a, const T& b) { *this += Range<T>(a,b); }
+ void swap(RangeSet& x) { ranges.swap(x.ranges); }
+
bool contiguous() const { return ranges.size() <= 1; }
bool contains(const T& t) const;
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index 250491c394..5e59331b5f 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -530,6 +530,7 @@ libqpidbroker_la_SOURCES = \
qpid/broker/ConnectionHandler.h \
qpid/broker/ConnectionState.h \
qpid/broker/ConnectionToken.h \
+ qpid/broker/Context .h \
qpid/broker/Consumer.h \
qpid/broker/Daemon.cpp \
qpid/broker/Daemon.h \
diff --git a/qpid/cpp/src/qpid/broker/Context.h b/qpid/cpp/src/qpid/broker/Context.h
new file mode 100644
index 0000000000..5770dd78ed
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/Context.h
@@ -0,0 +1,38 @@
+#ifndef QPID_BROKER_CONTEXT_H
+#define QPID_BROKER_CONTEXT_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+namespace qpid {
+namespace broker {
+
+/**
+ * Base class for context objects.
+ */
+class Context
+{
+ public:
+ virtual ~Context() {}
+};
+
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_CONTEXT_H*/
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index aae858f804..b57ec46adc 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -24,6 +24,7 @@
#include "qpid/broker/BrokerImportExport.h"
#include "qpid/broker/OwnershipToken.h"
#include "qpid/broker/Consumer.h"
+#include "qpid/broker/Context.h"
#include "qpid/broker/Message.h"
#include "qpid/broker/Messages.h"
#include "qpid/broker/PersistableQueue.h"
@@ -130,7 +131,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
int autoDeleteTimeout;
boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask;
sys::Activity consuming; // Allow consumer threads to be stopped, used by cluster
- boost::intrusive_ptr<RefCounted> clusterContext; // Used by cluster
+ std::auto_ptr<Context> clusterContext; // Clustering state.
void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false);
void setPolicy(std::auto_ptr<QueuePolicy> policy);
@@ -401,8 +402,8 @@ class Queue : public boost::enable_shared_from_this<Queue>,
bool isConsumingStopped();
/** Context information used in a cluster. */
- boost::intrusive_ptr<RefCounted> getClusterContext() { return clusterContext; }
- void setClusterContext(boost::intrusive_ptr<RefCounted> context) { clusterContext = context; }
+ Context* getClusterContext() { return clusterContext.get(); }
+ void setClusterContext(std::auto_ptr<Context> context) { clusterContext = context; }
};
}} // qpid::broker
diff --git a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
index 9943b3d2b5..5871810a78 100644
--- a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
@@ -26,8 +26,6 @@
#include "QueueContext.h"
#include "hash.h"
#include "qpid/framing/ClusterMessageEnqueueBody.h"
-#include "qpid/framing/ClusterMessageAcquireBody.h"
-#include "qpid/framing/ClusterMessageDequeueBody.h"
#include "qpid/framing/ClusterMessageRequeueBody.h"
#include "qpid/framing/ClusterWiringCreateQueueBody.h"
#include "qpid/framing/ClusterWiringCreateExchangeBody.h"
@@ -113,14 +111,13 @@ void BrokerContext::routed(const boost::intrusive_ptr<Message>&) {}
void BrokerContext::acquire(const broker::QueuedMessage& qm) {
if (tssReplicate) {
assert(!qm.queue->isConsumingStopped());
- mcaster(qm).mcast(ClusterMessageAcquireBody(pv, qm.queue->getName(), qm.position));
+ QueueContext::get(*qm.queue)->localAcquire(qm.position);
}
}
void BrokerContext::dequeue(const broker::QueuedMessage& qm) {
if (tssReplicate)
- mcaster(qm).mcast(
- ClusterMessageDequeueBody(pv, qm.queue->getName(), qm.position));
+ QueueContext::get(*qm.queue)->localDequeue(qm.position);
}
void BrokerContext::requeue(const broker::QueuedMessage& qm) {
@@ -135,8 +132,7 @@ void BrokerContext::requeue(const broker::QueuedMessage& qm) {
void BrokerContext::create(broker::Queue& q) {
if (!tssReplicate) return;
assert(!QueueContext::get(q));
- boost::intrusive_ptr<QueueContext> context(
- new QueueContext(q, core.getGroup(q.getName()), core.getSettings().consumeTicks));
+ new QueueContext(q, core.getGroup(q.getName()), core.getSettings().consumeTicks);
std::string data(q.encodedSize(), '\0');
framing::Buffer buf(&data[0], data.size());
q.encode(buf);
@@ -188,7 +184,7 @@ void BrokerContext::cancel(broker::Queue& q, size_t n) {
}
void BrokerContext::stopped(broker::Queue& q) {
- boost::intrusive_ptr<QueueContext> qc = QueueContext::get(q);
+ QueueContext* qc = QueueContext::get(q);
// Don't forward the stopped call if the queue does not yet have a
// cluster context - this when the queue is first created locally.
if (qc) qc->stopped();
diff --git a/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp b/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp
index 68b9d5075b..52ae3eeea3 100644
--- a/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp
@@ -36,7 +36,7 @@ struct Cluster2Plugin : public Plugin {
addOptions()
("cluster2-name", optValue(settings.name, "NAME"), "Name of cluster to join")
("cluster2-concurrency", optValue(settings.concurrency, "N"), "Number concurrent streams of processing for multicast/deliver.")
- ("cluster2-tick", optValue(settings.tick, "uS"), "Length of 'tick' used for timing events in the cluster.")
+ ("cluster2-tick", optValue(settings.tick, "uS"), "Length of 'tick' used for timing events in microseconds.")
("cluster2-consume-ticks", optValue(settings.consumeTicks, "N"), "Maximum number of ticks a broker can hold the consume lock on a shared queue.");
// FIXME aconway 2011-10-05: add all relevant options from ClusterPlugin.h.
// FIXME aconway 2011-10-05: rename to final option names.
diff --git a/qpid/cpp/src/qpid/cluster/exp/CountdownTimer.h b/qpid/cpp/src/qpid/cluster/exp/CountdownTimer.h
index 2e86949dfd..144831dde0 100644
--- a/qpid/cpp/src/qpid/cluster/exp/CountdownTimer.h
+++ b/qpid/cpp/src/qpid/cluster/exp/CountdownTimer.h
@@ -61,6 +61,8 @@ class CountdownTimer {
}
}
+ bool isRunning() const { return timerRunning; }
+
private:
class Task : public sys::TimerTask {
diff --git a/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp
index dc10548f80..3c19967e06 100644
--- a/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp
@@ -71,11 +71,11 @@ void EventHandler::deliver(
try {
handle(frame);
} catch (const std::exception& e) {
- // FIXME aconway 2011-10-19: error handling.
- QPID_LOG(error, "cluster: error in deliver on " << cpg.getName()
- << " from " << PrettyId(sender, self)
- << ": " << frame
- << ": " << e.what());
+ // FIXME aconway 2011-10-19: error handling.
+ QPID_LOG(error, "cluster event: " << e.what()
+ << " (sender=" << PrettyId(sender, self) << " group=" << cpg.getName()
+ << " " << frame << ")");
+
}
}
}
diff --git a/qpid/cpp/src/qpid/cluster/exp/EventHandler.h b/qpid/cpp/src/qpid/cluster/exp/EventHandler.h
index 9f9ae1a856..f3a678f026 100644
--- a/qpid/cpp/src/qpid/cluster/exp/EventHandler.h
+++ b/qpid/cpp/src/qpid/cluster/exp/EventHandler.h
@@ -78,6 +78,14 @@ class EventHandler : public Cpg::Handler
MemberId getSelf() { return self; }
Cpg& getCpg() { return cpg; }
+ template <class HandlerT> boost::intrusive_ptr<HandlerT> getHandler() {
+ for (size_t i = 0; i < handlers.size(); ++i) {
+ boost::intrusive_ptr<HandlerT> p(dynamic_cast<HandlerT*>(handlers[i].get()));
+ if (p) return p;
+ }
+ return 0;
+ }
+
private:
void handle(const framing::AMQFrame&);
diff --git a/qpid/cpp/src/qpid/cluster/exp/Group.cpp b/qpid/cpp/src/qpid/cluster/exp/Group.cpp
index c6d98856a1..0bb805da5b 100644
--- a/qpid/cpp/src/qpid/cluster/exp/Group.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/Group.cpp
@@ -54,4 +54,7 @@ Group::~Group() {}
void Group::mcast(const framing::AMQBody& b) { multicaster->mcast(b); }
void Group::mcast(const framing::AMQFrame& f) { multicaster->mcast(f); }
+
+MemberId Group::getSelf() const { return eventHandler->getSelf(); }
+
}} // namespace qpid::cluster::exp
diff --git a/qpid/cpp/src/qpid/cluster/exp/Group.h b/qpid/cpp/src/qpid/cluster/exp/Group.h
index 49b33c6a70..cb216faf8a 100644
--- a/qpid/cpp/src/qpid/cluster/exp/Group.h
+++ b/qpid/cpp/src/qpid/cluster/exp/Group.h
@@ -22,6 +22,7 @@
*
*/
+#include "qpid/cluster/types.h"
#include "qpid/RefCounted.h"
#include <memory>
@@ -57,6 +58,7 @@ class Group : public RefCounted
MessageHolder& getMessageHolder() { return *messageHolder; }
MessageBuilders& getMessageBuilders() { return *messageBuilders; }
Ticker& getTicker() { return *ticker; }
+ MemberId getSelf() const;
void mcast(const framing::AMQBody&);
void mcast(const framing::AMQFrame&);
diff --git a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
index 21129b0fae..c8997877c5 100644
--- a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
@@ -88,38 +88,26 @@ void MessageHandler::enqueue(const std::string& q, uint16_t channel) {
// We only need to build message from other brokers, our own messages
// are held by the MessageHolder.
if (sender() != self()) {
- boost::shared_ptr<Queue> queue = findQueue(q, "cluster: enqueue");
+ boost::shared_ptr<Queue> queue = findQueue(q, "cluster enqueue");
messageBuilders.announce(sender(), channel, queue);
}
}
-// FIXME aconway 2011-09-14: performance: pack acquires into a SequenceSet
-// and scan queue once.
void MessageHandler::acquire(const std::string& q, uint32_t position) {
// FIXME aconway 2011-09-15: systematic logging across cluster module.
- QPID_LOG(trace, "cluster: message " << q << "[" << position
+ QPID_LOG(trace, "cluster message " << q << "[" << position
<< "] acquired by " << PrettyId(sender(), self()));
// Note acquires from other members. My own acquires were executed in
// the broker thread
if (sender() != self()) {
- boost::shared_ptr<Queue> queue = findQueue(q, "cluster: acquire");
- QueuedMessage qm;
- BrokerContext::ScopedSuppressReplication ssr;
- if (!queue->acquireMessageAt(position, qm))
- throw Exception(QPID_MSG("cluster: acquire: message not found: "
- << q << "[" << position << "]"));
- assert(qm.position.getValue() == position);
- assert(qm.payload);
- // Save on context for possible requeue if released/rejected.
- QueueContext::get(*queue)->acquire(qm);
- // FIXME aconway 2011-09-19: need to record by member-ID to
- // requeue if member leaves.
+ boost::shared_ptr<Queue> queue = findQueue(q, "cluster acquire");
+ QueueContext::get(*queue)->acquire(position);
}
}
void MessageHandler::dequeue(const std::string& q, uint32_t position) {
// FIXME aconway 2011-09-15: systematic logging across cluster module.
- QPID_LOG(trace, "cluster: message " << q << "[" << position
+ QPID_LOG(trace, "cluster message " << q << "[" << position
<< "] dequeued by " << PrettyId(sender(), self()));
// FIXME aconway 2010-10-28: for local dequeues, we should
@@ -128,16 +116,14 @@ void MessageHandler::dequeue(const std::string& q, uint32_t position) {
// My own dequeues were processed in the broker thread before multicasting.
if (sender() != self()) {
- boost::shared_ptr<Queue> queue = findQueue(q, "cluster: dequeue");
- QueuedMessage qm = QueueContext::get(*queue)->dequeue(position);
- BrokerContext::ScopedSuppressReplication ssr;
- if (qm.queue) queue->dequeue(0, qm);
+ boost::shared_ptr<Queue> queue = findQueue(q, "cluster dequeue");
+ QueueContext::get(*queue)->dequeue(position);
}
}
void MessageHandler::requeue(const std::string& q, uint32_t position, bool redelivered) {
if (sender() != self()) {
- boost::shared_ptr<Queue> queue = findQueue(q, "cluster: requeue");
+ boost::shared_ptr<Queue> queue = findQueue(q, "cluster requeue");
QueueContext::get(*queue)->requeue(position, redelivered);
}
}
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
index ba06ee82f2..ff9c050348 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
@@ -21,34 +21,45 @@
*/
#include "BrokerContext.h"
+#include "EventHandler.h"
#include "Group.h"
#include "Multicaster.h"
#include "QueueContext.h"
+#include "QueueHandler.h"
#include "hash.h"
-#include "qpid/cluster/types.h"
-#include "qpid/framing/ProtocolVersion.h"
-#include "qpid/framing/ClusterQueueResubscribeBody.h"
-#include "qpid/framing/ClusterQueueSubscribeBody.h"
-#include "qpid/framing/ClusterQueueUnsubscribeBody.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueuedMessage.h"
+#include "qpid/framing/ClusterMessageAcquireBody.h"
+#include "qpid/framing/ClusterMessageDequeueBody.h"
+#include "qpid/framing/ClusterQueueConsumedBody.h"
+#include "qpid/framing/ClusterQueueSubscribeBody.h"
+#include "qpid/framing/ClusterQueueUnsubscribeBody.h"
+#include "qpid/framing/ProtocolVersion.h"
#include "qpid/log/Statement.h"
namespace qpid {
namespace cluster {
+using framing::SequenceSet;
+const framing::ProtocolVersion pv; // shorthand
+
QueueContext::QueueContext(broker::Queue& q, Group& g, size_t maxTicks_)
- : consumers(0), consuming(true), ticks(0),
+ : ownership(UNSUBSCRIBED), consumers(0), consuming(false), ticks(0),
queue(q), mcast(g.getMulticaster()), hash(hashof(q.getName())),
- maxTicks(maxTicks_)
+ maxTicks(maxTicks_), group(g)
{
- q.setClusterContext(boost::intrusive_ptr<QueueContext>(this));
+ q.setClusterContext(std::auto_ptr<broker::Context>(this));
q.stopConsumers(); // Stop queue initially.
- g.getTicker().add(this);
+ group.getTicker().add(this);
}
-QueueContext::~QueueContext() {}
+QueueContext::~QueueContext() {
+ // Lifecycle: must remove all references to this context before it is deleted.
+ // Must be sure that there can be no use of this context later.
+ group.getTicker().remove(this);
+ group.getEventHandler().getHandler<QueueHandler>()->remove(queue);
+}
namespace {
bool isOwner(QueueOwnership o) { return o == SOLE_OWNER || o == SHARED_OWNER; }
@@ -57,72 +68,66 @@ bool isOwner(QueueOwnership o) { return o == SOLE_OWNER || o == SHARED_OWNER; }
// Called by QueueReplica in CPG deliver thread when state changes.
void QueueContext::replicaState(QueueOwnership before, QueueOwnership after)
{
+ sys::Mutex::ScopedLock l(lock);
// Interested in state changes which lead to ownership.
// We voluntarily give up ownership before multicasting
// the state change so we don't need to handle transitions
// that lead to non-ownership.
+
if (before != after && isOwner(after)) {
- bool start = false;
- {
- sys::Mutex::ScopedLock l(lock);
- start = !consuming;
- consuming = true;
- ticks = 0;
- }
- if (start) queue.startConsumers();
+ assert(before == ownership);
+ if (!consuming) queue.startConsumers();
+ consuming = true;
+ ticks = 0;
}
+ ownership = after;
}
// FIXME aconway 2011-07-27: Dont spin the token on an empty queue.
// Called in broker threads when a consumer is added
void QueueContext::consume(size_t n) {
- {
- sys::Mutex::ScopedLock l(lock);
- consumers = n;
- }
- if (n == 1) mcast.mcast(
- framing::ClusterQueueSubscribeBody(framing::ProtocolVersion(), queue.getName()));
+ sys::Mutex::ScopedLock l(lock);
+ if (consumers == 0 && n > 0 && ownership == UNSUBSCRIBED)
+ mcast.mcast(
+ framing::ClusterQueueSubscribeBody(pv, queue.getName()));
+ consumers = n;
}
// Called in broker threads when a consumer is cancelled
void QueueContext::cancel(size_t n) {
- bool stop = false;
- {
- sys::Mutex::ScopedLock l(lock);
- consumers = n;
- stop = (n == 0 && consuming);
- }
- if (stop) queue.stopConsumers();
+ sys::Mutex::ScopedLock l(lock);
+ consumers = n;
+ if (n == 0 && consuming) queue.stopConsumers();
}
+// FIXME aconway 2011-11-03: review scope of locking around sendConsumed
+
// Called in Ticker thread.
void QueueContext::tick() {
- bool stop = false;
- {
- sys::Mutex::ScopedLock l(lock);
- stop = (consuming && ++ticks >= maxTicks);
- }
- // When all threads have stopped, queue will call stopped()
- if (stop) queue.stopConsumers();
+ sys::Mutex::ScopedLock l(lock);
+ if (!consuming) return; // Nothing to do if we don't have the lock.
+ if (ownership == SHARED_OWNER && ++ticks >= maxTicks) queue.stopConsumers();
+ else if (ownership == SOLE_OWNER) sendConsumed(l); // Status report on consumption
}
// Callback set up by queue.stopConsumers() called in connection or timer thread.
// Called when no threads are dispatching from the queue.
void QueueContext::stopped() {
- bool resubscribe = false;
- {
- sys::Mutex::ScopedLock l(lock);
- assert(consuming);
- consuming = false;
- resubscribe = consumers;
- }
- if (resubscribe)
- mcast.mcast(framing::ClusterQueueResubscribeBody(
- framing::ProtocolVersion(), queue.getName()));
- else
- mcast.mcast(framing::ClusterQueueUnsubscribeBody(
- framing::ProtocolVersion(), queue.getName()));
+ sys::Mutex::ScopedLock l(lock);
+ if (!consuming) return; // !consuming => initial stopConsumers in ctor.
+ sendConsumed(l);
+ mcast.mcast(
+ framing::ClusterQueueUnsubscribeBody(pv, queue.getName(), consumers));
+ consuming = false;
+}
+
+void QueueContext::sendConsumed(const sys::Mutex::ScopedLock&) {
+ if (acquired.empty() && dequeued.empty()) return; // Nothing to send
+ mcast.mcast(
+ framing::ClusterQueueConsumedBody(pv, queue.getName(), acquired,dequeued));
+ acquired.clear();
+ dequeued.clear();
}
void QueueContext::requeue(uint32_t position, bool redelivered) {
@@ -135,17 +140,76 @@ void QueueContext::requeue(uint32_t position, bool redelivered) {
}
}
-void QueueContext::acquire(const broker::QueuedMessage& qm) {
- unacked.put(qm.position, qm);
+void QueueContext::localAcquire(uint32_t position) {
+ QPID_LOG(trace, "cluster queue " << queue.getName() << " acquired " << position);
+ sys::Mutex::ScopedLock l(lock);
+ assert(consuming);
+ acquired.add(position);
+}
+
+void QueueContext::localDequeue(uint32_t position) {
+ QPID_LOG(trace, "cluster queue " << queue.getName() << " dequeued " << position);
+ // FIXME aconway 2010-10-28: for local dequeues, we should
+ // complete the ack that initiated the dequeue at this point.
+ sys::Mutex::ScopedLock l(lock);
+
+ // FIXME aconway 2011-11-03: this assertion fails for explicit accept
+ // because it doesn't respect the consume lock.
+ // assert(consuming);
+
+ dequeued.add(position);
+}
+
+void QueueContext::consumed(
+ const MemberId& sender,
+ const SequenceSet& acquired,
+ const SequenceSet& dequeued)
+{
+ // No lock, doesn't touch any members.
+
+ // FIXME aconway 2011-09-15: systematic logging across cluster module.
+ // FIXME aconway 2011-09-23: pretty printing for identifier.
+ QPID_LOG(trace, "cluster: " << sender << " acquired: " << acquired
+ << " dequeued: " << dequeued << " on queue: " << queue.getName());
+
+ // Note acquires from other members. My own acquires were executed in
+ // the connection thread
+ if (sender != group.getSelf()) {
+ // FIXME aconway 2011-09-23: avoid individual finds, scan queue once.
+ for (SequenceSet::iterator i = acquired.begin(); i != acquired.end(); ++i)
+ acquire(*i);
+ }
+ // Process deques from the queue owner.
+ // FIXME aconway 2011-09-23: avoid individual finds, scan queue once.
+ for (SequenceSet::iterator i = dequeued.begin(); i != dequeued.end(); ++i)
+ dequeue(*i);
+}
+
+// Remote acquire
+void QueueContext::acquire(uint32_t position) {
+ // No lock, doesn't touch any members.
+ broker::QueuedMessage qm;
+ BrokerContext::ScopedSuppressReplication ssr;
+ if (!queue.acquireMessageAt(position, qm))
+ // FIXME aconway 2011-10-31: error handling
+ throw Exception(QPID_MSG("cluster: acquire: message not found: "
+ << queue.getName() << "[" << position << "]"));
+ assert(qm.position.getValue() == position);
+ assert(qm.payload);
+ unacked.put(qm.position, qm); // unacked has its own lock.
}
-broker::QueuedMessage QueueContext::dequeue(uint32_t position) {
- return unacked.pop(position);
+void QueueContext::dequeue(uint32_t position) {
+ // No lock, doesn't touch any members. unacked has its own lock.
+ broker::QueuedMessage qm = unacked.pop(position);
+ BrokerContext::ScopedSuppressReplication ssr;
+ if (qm.queue) queue.dequeue(0, qm);
}
-boost::intrusive_ptr<QueueContext> QueueContext::get(broker::Queue& q) {
- return boost::intrusive_ptr<QueueContext>(
- static_cast<QueueContext*>(q.getClusterContext().get()));
+QueueContext* QueueContext::get(broker::Queue& q) {
+ return static_cast<QueueContext*>(q.getClusterContext());
}
+// FIXME aconway 2011-09-23: make unacked a plain map, use lock.
+
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.h b/qpid/cpp/src/qpid/cluster/exp/QueueContext.h
index 20c2aabc1d..d7079ab8a5 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.h
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.h
@@ -25,11 +25,12 @@
#include "LockedMap.h"
#include "Ticker.h"
#include "qpid/RefCounted.h"
+#include "qpid/broker/Context.h"
+#include "qpid/cluster/types.h"
+#include "qpid/framing/SequenceSet.h"
#include "qpid/sys/AtomicValue.h"
-#include "qpid/sys/Time.h"
#include "qpid/sys/Mutex.h"
-#include "qpid/cluster/types.h"
-#include <boost/intrusive_ptr.hpp>
+#include "qpid/sys/Time.h"
namespace qpid {
namespace broker {
@@ -43,12 +44,13 @@ class Multicaster;
class Group;
/**
- * Queue state that is not replicated to the cluster.
- * Manages the local queue start/stop status.
+ * Local Queue state, manage start/stop consuming on the queue.
+ * Destroyed when the queue is destroyed, it must erase itself
+ * from any cluster data structures in its destructor.
*
-* THREAD SAFE: Called by connection threads and Ticker dispatch threads.
+ * THREAD SAFE: Called by connection threads and Ticker dispatch threads.
*/
-class QueueContext : public Ticker::Tickable {
+class QueueContext : public broker::Context, Ticker::Tickable {
public:
QueueContext(broker::Queue&, Group&, size_t consumeTicks);
~QueueContext();
@@ -80,33 +82,50 @@ class QueueContext : public Ticker::Tickable {
/** Called by MessageHandler to requeue a message. */
void requeue(uint32_t position, bool redelivered);
+ /** Called by BrokerContext when a mesages is acquired locally. */
+ void localAcquire(uint32_t position);
+
/** Called by MessageHandler when a mesages is acquired. */
- void acquire(const broker::QueuedMessage& qm);
+ void acquire(uint32_t position);
/** Called by MesageHandler when a message is dequeued. */
- broker::QueuedMessage dequeue(uint32_t position);
+ void dequeue(uint32_t position);
- size_t getHash() const { return hash; }
+ /** Called by BrokerContext when a message is dequeued locally. */
+ void localDequeue(uint32_t position);
+
+ /** Called in deliver thread, take note of another brokers acquires/dequeues. */
+ void consumed(const MemberId&,
+ const framing::SequenceSet& acquired,
+ const framing::SequenceSet& dequeued);
+ size_t getHash() const { return hash; }
+ broker::Queue& getQueue() { return queue; }
+
/** Get the cluster context for a broker queue. */
- static boost::intrusive_ptr<QueueContext> get(broker::Queue&) ;
+ static QueueContext* get(broker::Queue&);
private:
+ void sendConsumed(const sys::Mutex::ScopedLock&);
+
sys::Mutex lock;
- size_t consumers; // Number of local consumers
- bool consuming; // True if we have the lock & local consumers are active
+ QueueOwnership ownership; // Ownership status.
+ size_t consumers; // Number of local consumers.
+ bool consuming; // True if we have the lock.
size_t ticks; // Ticks since we got the lock.
// Following members are immutable
- broker::Queue& queue; // FIXME aconway 2011-06-08: should be shared/weak ptr?
+ broker::Queue& queue;
Multicaster& mcast;
size_t hash;
size_t maxTicks; // Max ticks we are allowed.
+ framing::SequenceSet acquired, dequeued; // Track local acquires/dequeues.
// Following members are safe to use without holding a lock
typedef LockedMap<uint32_t, broker::QueuedMessage> UnackedMap;
UnackedMap unacked;
+ Group& group;
};
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp
index 0c96e9326d..f763841c20 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp
@@ -45,25 +45,36 @@ bool QueueHandler::handle(const framing::AMQFrame& frame) {
void QueueHandler::subscribe(const std::string& queue) {
find(queue)->subscribe(sender());
}
-void QueueHandler::unsubscribe(const std::string& queue) {
- find(queue)->unsubscribe(sender());
+
+void QueueHandler::unsubscribe(const std::string& queue,
+ bool resubscribe) {
+ find(queue)->unsubscribe(sender(), resubscribe);
}
-void QueueHandler::resubscribe(const std::string& queue) {
- find(queue)->resubscribe(sender());
+
+void QueueHandler::consumed(const std::string& queue,
+ const framing::SequenceSet& acquired,
+ const framing::SequenceSet& dequeued)
+{
+ find(queue)->consumed(sender(), acquired, dequeued);
}
void QueueHandler::left(const MemberId& member) {
// Unsubscribe for members that leave.
for (QueueMap::iterator i = queues.begin(); i != queues.end(); ++i)
- i->second->unsubscribe(member);
+ i->second->unsubscribe(member, false);
}
-void QueueHandler::add(boost::shared_ptr<broker::Queue> q) {
+void QueueHandler::add(broker::Queue& q) {
// Local queues already have a context, remote queues need one.
- if (!QueueContext::get(*q))
- new QueueContext(*q, group, consumeTicks); // Context attaches to the Queue
- queues[q->getName()] = boost::intrusive_ptr<QueueReplica>(
- new QueueReplica(q, self()));
+ if (!QueueContext::get(q))
+ new QueueContext(q, group, consumeTicks); // Context attaches to the Queue
+ assert(QueueContext::get(q));
+ queues[q.getName()] = boost::intrusive_ptr<QueueReplica>(
+ new QueueReplica(*QueueContext::get(q), self()));
+}
+
+void QueueHandler::remove(broker::Queue& q) {
+ queues.erase(q.getName());
}
boost::intrusive_ptr<QueueReplica> QueueHandler::find(const std::string& queue) {
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h
index 84e8b75cfb..0f9937641b 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h
@@ -60,15 +60,17 @@ class QueueHandler : public framing::AMQP_AllOperations::ClusterQueueHandler,
// Events
void subscribe(const std::string& queue);
- void unsubscribe(const std::string& queue);
- void resubscribe(const std::string& queue);
- void left(const MemberId&);
- void add(boost::shared_ptr<broker::Queue>);
+ void unsubscribe(const std::string& queue, bool resubscribe);
+
+ void consumed(const std::string& queue,
+ const framing::SequenceSet& acquired,
+ const framing::SequenceSet& dequeued);
+
+ void left(const MemberId&);
- // NB: These functions ar called in broker threads, not deliver threads.
- void acquired(const broker::QueuedMessage& qm);
- void empty(const broker::Queue& q);
+ void add(broker::Queue&);
+ void remove(broker::Queue&);
private:
typedef std::map<std::string, boost::intrusive_ptr<QueueReplica> > QueueMap;
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp
index 11a7496582..66a7a81f33 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp
@@ -28,9 +28,8 @@
namespace qpid {
namespace cluster {
-QueueReplica::QueueReplica(boost::shared_ptr<broker::Queue> q,
- const MemberId& self_)
- : queue(q), self(self_), context(QueueContext::get(*q))
+QueueReplica::QueueReplica(QueueContext& qc, const MemberId& self_)
+ : self(self_), context(qc)
{}
struct PrintSubscribers {
@@ -56,27 +55,28 @@ void QueueReplica::subscribe(const MemberId& member) {
update(before);
}
-// FIXME aconway 2011-09-20: need to requeue.
-void QueueReplica::unsubscribe(const MemberId& member) {
+void QueueReplica::unsubscribe(const MemberId& member, bool resubscribe)
+{
+ assert(!resubscribe || member == subscribers.front());
QueueOwnership before = getState();
MemberQueue::iterator i = std::remove(subscribers.begin(), subscribers.end(), member);
- if (i != subscribers.end()) subscribers.erase(i, subscribers.end());
+ subscribers.erase(i, subscribers.end());
+ if (resubscribe) subscribers.push_back(member);
update(before);
}
-void QueueReplica::resubscribe(const MemberId& member) {
- assert (member == subscribers.front());
- QueueOwnership before = getState();
- subscribers.pop_front();
- subscribers.push_back(member);
- update(before);
+void QueueReplica::consumed(const MemberId& member,
+ const framing::SequenceSet& acquired,
+ const framing::SequenceSet& dequeued)
+{
+ context.consumed(member, acquired, dequeued);
}
void QueueReplica::update(QueueOwnership before) {
QueueOwnership after = getState();
- QPID_LOG(trace, "cluster: queue replica: " << queue->getName() << ": "
+ QPID_LOG(trace, "cluster: queue replica: " << context.getQueue().getName() << ": "
<< before << "->" << after << " [" << PrintSubscribers(subscribers, self) << "]");
- context->replicaState(before, after);
+ context.replicaState(before, after);
}
QueueOwnership QueueReplica::getState() const {
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h
index 31faf4853a..ca92de1e30 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h
@@ -50,20 +50,22 @@ struct PrintSubscribers;
class QueueReplica : public RefCounted
{
public:
- QueueReplica(boost::shared_ptr<broker::Queue> , const MemberId& );
+ QueueReplica(QueueContext&, const MemberId& );
void subscribe(const MemberId&);
- void unsubscribe(const MemberId&);
- void resubscribe(const MemberId&);
+ void unsubscribe(const MemberId&, bool resubscribe);
+ void consumed(const MemberId&,
+ const framing::SequenceSet& acquired,
+ const framing::SequenceSet& dequeued);
MemberId getSelf() const { return self; }
private:
typedef std::deque<MemberId> MemberQueue;
- boost::shared_ptr<broker::Queue> queue;
+ std::string name;
MemberQueue subscribers;
MemberId self;
- boost::intrusive_ptr<QueueContext> context;
+ QueueContext& context;
QueueOwnership getState() const;
bool isOwner() const;
diff --git a/qpid/cpp/src/qpid/cluster/exp/Ticker.cpp b/qpid/cpp/src/qpid/cluster/exp/Ticker.cpp
index 9ff04f2f54..1210eb7055 100644
--- a/qpid/cpp/src/qpid/cluster/exp/Ticker.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/Ticker.cpp
@@ -34,21 +34,21 @@ Ticker::Ticker(sys::Duration tick, sys::Timer& timer_,
timer.add(this);
}
-void Ticker::add(boost::intrusive_ptr<Tickable> t) {
+void Ticker::add(Tickable* t) {
sys::Mutex::ScopedLock l(lock);
tickables.push_back(t);
}
-void Ticker::remove(boost::intrusive_ptr<Tickable> t) {
+void Ticker::remove(Tickable* t) {
sys::Mutex::ScopedLock l(lock);
Tickables::iterator i = std::find(tickables.begin(), tickables.end(), t);
if (i != tickables.end()) tickables.erase(i);
}
-// Called by timer thread, sets condition
+// Called by timer thread
void Ticker::fire() {
condition.set();
- setupNextFire();
+ setupNextFire(); // FIXME aconway 2011-11-03: restart()?
timer.add(this);
}
diff --git a/qpid/cpp/src/qpid/cluster/exp/Ticker.h b/qpid/cpp/src/qpid/cluster/exp/Ticker.h
index 0a8d508a70..6910b7a0be 100644
--- a/qpid/cpp/src/qpid/cluster/exp/Ticker.h
+++ b/qpid/cpp/src/qpid/cluster/exp/Ticker.h
@@ -53,18 +53,18 @@ namespace cluster {
class Ticker : public sys::TimerTask
{
public:
- struct Tickable : public RefCounted {
+ struct Tickable {
virtual ~Tickable();
virtual void tick() = 0;
};
Ticker(sys::Duration tick, sys::Timer&, boost::shared_ptr<sys::Poller>);
- void add(boost::intrusive_ptr<Tickable>);
- void remove(boost::intrusive_ptr<Tickable>);
+ void add(Tickable*);
+ void remove(Tickable*);
private:
- typedef std::vector<boost::intrusive_ptr<Tickable> > Tickables;
+ typedef std::vector<Tickable*> Tickables;
void fire(); // Called in timer thread.
void dispatch(sys::PollableCondition&); // Called in IO thread
diff --git a/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp
index da110fed8f..c2f52f2a27 100644
--- a/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp
@@ -72,7 +72,7 @@ void WiringHandler::createQueue(const std::string& data) {
}
boost::shared_ptr<broker::Queue> q = broker.getQueues().find(name);
assert(q); // FIXME aconway 2011-05-10: error handling.
- queueHandler->add(q);
+ queueHandler->add(*q);
QPID_LOG(debug, "cluster: create queue " << q->getName());
}
diff --git a/qpid/cpp/src/tests/cluster2_tests.py b/qpid/cpp/src/tests/cluster2_tests.py
index d5c9ffb61d..c5f6157f34 100755
--- a/qpid/cpp/src/tests/cluster2_tests.py
+++ b/qpid/cpp/src/tests/cluster2_tests.py
@@ -50,12 +50,6 @@ class Cluster2Tests(BrokerTest):
if time.time() > timeout: fail("Time out in wait_for_queue(%s))"%queue)
time.sleep(0.01)
- # FIXME aconway 2011-05-17: remove, use assert_browse.
- def verify_content(self, expect, receiver):
- actual = [receiver.fetch(1).content for x in expect]
- self.assertEqual(expect, actual)
- self.assertRaises(Empty, receiver.fetch, 0)
-
def test_message_enqueue(self):
"""Test basic replication of enqueued messages.
Verify that fanout messages are replicated correctly.
@@ -64,13 +58,12 @@ class Cluster2Tests(BrokerTest):
cluster = self.cluster(2, cluster2=True)
sn0 = cluster[0].connect().session()
- r0p = sn0.receiver("p; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:p}]}}");
- r0q = sn0.receiver("q; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:q}]}}");
s0 = sn0.sender("amq.fanout");
-
sn1 = cluster[1].connect().session()
- r1p = sn1.receiver("p; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:p}]}}");
- r1q = sn1.receiver("q; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:q}]}}");
+
+ # Bind queues to amq.fanout
+ sn0.receiver("p; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:p}]}}");
+ sn0.receiver("q; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:q}]}}");
# Send messages on member 0
@@ -78,10 +71,10 @@ class Cluster2Tests(BrokerTest):
for m in content: s0.send(Message(m))
# Browse on both members.
- self.verify_content(content, r0p)
- self.verify_content(content, r0q)
- self.verify_content(content, r1p)
- self.verify_content(content, r1q)
+ self.assert_browse(sn0, "p", content)
+ self.assert_browse(sn0, "q", content)
+ self.assert_browse(sn1, "p", content)
+ self.assert_browse(sn1, "q", content)
sn1.connection.close()
sn0.connection.close()
@@ -98,16 +91,16 @@ class Cluster2Tests(BrokerTest):
content = ["a","b","c"]
for m in content: s0.send(Message(m))
# Verify enqueued on members 0 and 1
- self.verify_content(content, sn0.receiver("q;{mode:browse}"))
- self.verify_content(content, sn1.receiver("q;{mode:browse}"))
+ self.assert_browse(sn0, "q", content)
+ self.assert_browse(sn1, "q", content)
# Dequeue on cluster[0]
self.assertEqual(r0.fetch(1).content, "a")
sn0.acknowledge(sync=True)
# Verify dequeued on cluster[0] and cluster[1]
- self.verify_content(["b", "c"], sn0.receiver("q;{mode:browse}"))
- self.verify_content(["b", "c"], sn1.receiver("q;{mode:browse}"))
+ self.assert_browse(sn0, "q", ["b", "c"])
+ self.assert_browse(sn1, "q", ["b", "c"])
def test_wiring(self):
"""Test replication of wiring"""
diff --git a/qpid/cpp/src/tests/qpid-cpp-benchmark b/qpid/cpp/src/tests/qpid-cpp-benchmark
index 26283adb6c..9a0ee6b384 100755
--- a/qpid/cpp/src/tests/qpid-cpp-benchmark
+++ b/qpid/cpp/src/tests/qpid-cpp-benchmark
@@ -184,8 +184,7 @@ def recreate_queues(queues, brokers):
# FIXME aconway 2011-05-04: new cluster async wiring, wait for changes to propagate
for b in brokers:
while queue_exists(q,b): time.sleep(0.1);
- for q in queues:
- s.sender("%s;{create:always}"%q)
+ s.sender("%s;{create:always}"%q)
# FIXME aconway 2011-05-04: new cluster async wiring, wait for changes to propagate
for b in brokers:
while not queue_exists(q,b): time.sleep(0.1);
diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml
index 0d325c4d12..18d4f9bacd 100644
--- a/qpid/cpp/xml/cluster.xml
+++ b/qpid/cpp/xml/cluster.xml
@@ -351,7 +351,6 @@
<field name="position" type="uint32"/>
<field name="redelivered" type="bit"/>
</control>
-
</class>
<class name="cluster-wiring" code="0x83">
@@ -387,6 +386,9 @@
</class>
+ <!-- FIXME aconway 2011-10-31: terminology. Use lock/acquire/release terminology
+ rather than subscription-->
+
<!-- Manage subscriptions to a queue.
Each queue has a "subscriber queue" of members waiting take
@@ -394,18 +396,23 @@
is the only one allowed to take messages. -->
<class name="cluster-queue" code="0x84">
+
<!-- Join at the back of the subscriber queue -->
<control name="subscribe" code="0x1">
<field name="queue" type="queue.name"/>
</control>
- <!-- Leave the subscriber queue -->
+
+ <!-- Unsubscribe from queue to release the lock. -->
<control name="unsubscribe" code="0x2">
<field name="queue" type="queue.name"/>
+ <!-- Set this bit to automatically re-subscribe -->
+ <field name="resubscribe" type="bit"/>
</control>
- <!-- Move the member at the front to the back. -->
- <control name="resubscribe" code="0x3">
+
+ <control name="consumed" code="0x3">
<field name="queue" type="queue.name"/>
+ <field name="acquired" type="sequence-set"/>
+ <field name="dequeued" type="sequence-set"/>
</control>
</class>
-
</amqp>