summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-07-28 21:38:06 +0000
committerAlan Conway <aconway@apache.org>2011-07-28 21:38:06 +0000
commitc42d9df9b8af5dc7d5decdcb5818a100ee8df0a3 (patch)
treef8c01686d457d1eef2b3fd0d6a5bc67887728ec5
parent05220525f1591fe8052f17ec67f810751bd5fc71 (diff)
downloadqpid-python-c42d9df9b8af5dc7d5decdcb5818a100ee8df0a3.tar.gz
QPID-2920: Broken checkpoint: passing dequeue mutex test with issues
- handler/context/replica convention (see overview.h doc notes) - rename BrokerHandler to BrokerContext - notify Cluster (BrokerContext) on queue stopped or empty (need empty?) - Implementing Stoppable & stoppable scopes in Queue.cpp - Move queue ownership logic from dequeue to acquire Releasing on message count will not work, switch to timer based release. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920@1152008 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/Makefile.am1
-rw-r--r--qpid/cpp/src/cluster.mk10
-rw-r--r--qpid/cpp/src/qpid/broker/Cluster.h4
-rw-r--r--qpid/cpp/src/qpid/broker/NullCluster.h6
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp80
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h30
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp (renamed from qpid/cpp/src/qpid/cluster/exp/BrokerHandler.cpp)87
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/BrokerContext.h (renamed from qpid/cpp/src/qpid/cluster/exp/BrokerHandler.h)25
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/Core.cpp20
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/Core.h9
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp2
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/EventHandler.h6
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/HandlerBase.h3
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/LockedMap.h2
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp44
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/MessageHandler.h8
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp105
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueContext.h93
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp79
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueHandler.h82
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp115
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueReplica.h85
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp48
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/WiringHandler.h6
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/overview.h13
-rw-r--r--qpid/cpp/src/qpid/sys/Stoppable.h49
-rw-r--r--qpid/cpp/src/tests/BrokerClusterCalls.cpp16
-rwxr-xr-xqpid/cpp/src/tests/cluster2_tests.py67
-rwxr-xr-xqpid/cpp/src/tests/qpid-test-cluster3
-rw-r--r--qpid/cpp/xml/cluster.xml53
30 files changed, 1004 insertions, 147 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index 8ede09fa79..8d22850360 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -452,6 +452,7 @@ libqpidcommon_la_SOURCES += \
qpid/sys/AtomicValue_gcc.h \
qpid/sys/AtomicValue_mutex.h \
qpid/sys/BlockingQueue.h \
+ qpid/sys/BusyThreads.h \
qpid/sys/ClusterSafe.h \
qpid/sys/ClusterSafe.cpp \
qpid/sys/Codec.h \
diff --git a/qpid/cpp/src/cluster.mk b/qpid/cpp/src/cluster.mk
index 69b0228126..1809c87ca8 100644
--- a/qpid/cpp/src/cluster.mk
+++ b/qpid/cpp/src/cluster.mk
@@ -110,8 +110,8 @@ cluster2_la_SOURCES = \
qpid/cluster/Cpg.h \
qpid/cluster/PollerDispatch.cpp \
qpid/cluster/PollerDispatch.h \
- qpid/cluster/exp/BrokerHandler.cpp \
- qpid/cluster/exp/BrokerHandler.h \
+ qpid/cluster/exp/BrokerContext.cpp \
+ qpid/cluster/exp/BrokerContext.h \
qpid/cluster/exp/BufferFactory.h \
qpid/cluster/exp/Cluster2Plugin.cpp \
qpid/cluster/exp/Core.cpp \
@@ -124,6 +124,12 @@ cluster2_la_SOURCES = \
qpid/cluster/exp/MessageHandler.h \
qpid/cluster/exp/Multicaster.cpp \
qpid/cluster/exp/Multicaster.h \
+ qpid/cluster/exp/QueueContext.cpp \
+ qpid/cluster/exp/QueueContext.h \
+ qpid/cluster/exp/QueueHandler.cpp \
+ qpid/cluster/exp/QueueHandler.h \
+ qpid/cluster/exp/QueueReplica.cpp \
+ qpid/cluster/exp/QueueReplica.h \
qpid/cluster/exp/WiringHandler.cpp \
qpid/cluster/exp/WiringHandler.h
diff --git a/qpid/cpp/src/qpid/broker/Cluster.h b/qpid/cpp/src/qpid/broker/Cluster.h
index c927d35ba3..193332692b 100644
--- a/qpid/cpp/src/qpid/broker/Cluster.h
+++ b/qpid/cpp/src/qpid/broker/Cluster.h
@@ -80,6 +80,10 @@ class Cluster
virtual void consume(Queue&, size_t consumerCount) = 0;
/** A consumer cancels its subscription to a queue */
virtual void cancel(Queue&, size_t consumerCount) = 0;
+ /** A queue becomes empty */
+ virtual void empty(Queue&) = 0;
+ /** A queue has been stopped */
+ virtual void stopped(Queue&) = 0;
// Wiring
diff --git a/qpid/cpp/src/qpid/broker/NullCluster.h b/qpid/cpp/src/qpid/broker/NullCluster.h
index efda8bb1ab..399e2a3ca6 100644
--- a/qpid/cpp/src/qpid/broker/NullCluster.h
+++ b/qpid/cpp/src/qpid/broker/NullCluster.h
@@ -49,6 +49,11 @@ class NullCluster : public Cluster
virtual void consume(Queue&, size_t) {}
virtual void cancel(Queue&, size_t) {}
+ // Queues
+
+ virtual void stopped(Queue&) {}
+ virtual void empty(Queue&) {}
+
// Wiring
virtual void create(Queue&) {}
@@ -59,6 +64,7 @@ class NullCluster : public Cluster
const std::string&, const framing::FieldTable&) {}
virtual void unbind(Queue&, Exchange&,
const std::string&, const framing::FieldTable&) {}
+
};
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index f593d7e443..84f025824c 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -112,7 +112,8 @@ Queue::Queue(const string& _name, bool _autodelete,
broker(b),
deleted(false),
barrier(*this),
- autoDeleteTimeout(0)
+ autoDeleteTimeout(0),
+ dispatching(boost::bind(&Queue::acquireStopped,this))
{
if (parent != 0 && broker != 0) {
ManagementAgent* agent = broker->getManagementAgent();
@@ -231,29 +232,40 @@ void Queue::requeue(const QueuedMessage& msg){
copy.notify();
}
-// Inform the cluster of an acquired message on exit from a function
-// that does the acquiring. ClusterAcquireOnExit is declared *before*
-// any locks are taken. The calling function sets qmsg to the acquired
-// message with a lock held, but the call to Cluster::acquire() will
-// be outside the lock.
-struct ClusterAcquireOnExit {
+/** Mark a scope that acquires a message.
+ *
+ * ClusterAcquireScope is declared before are taken. The calling
+ * function sets qmsg with the lock held, but the call to
+ * Cluster::acquire() will happen after the lock is released.
+ *
+ * Also marks a Stoppable as busy for the duration of the scope.
+ **/
+struct ClusterAcquireScope {
Broker* broker;
+ Queue& queue;
QueuedMessage qmsg;
- ClusterAcquireOnExit(Broker* b) : broker(b) {}
- ~ClusterAcquireOnExit() {
- if (broker && qmsg.queue) broker->getCluster().acquire(qmsg);
+
+ ClusterAcquireScope(Queue& q) : broker(q.getBroker()), queue(q) {}
+
+ ~ClusterAcquireScope() {
+ if (broker) {
+ // FIXME aconway 2011-06-27: Move to QueueContext.
+ // Avoid the indirection via queuename.
+ if (qmsg.queue) broker->getCluster().acquire(qmsg);
+ else broker->getCluster().empty(queue);
+ }
}
};
bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message)
{
- ClusterAcquireOnExit willAcquire(broker); // Outside lock
+ ClusterAcquireScope acquireScope(*this); // Outside lock
Mutex::ScopedLock locker(messageLock);
assertClusterSafe();
QPID_LOG(debug, "Attempting to acquire message at " << position);
if (messages->remove(position, message)) {
QPID_LOG(debug, "Acquired message at " << position << " from " << name);
- willAcquire.qmsg = message;
+ acquireScope.qmsg = message;
return true;
} else {
QPID_LOG(debug, "Could not acquire message at " << position << " from " << name << "; no message at that position");
@@ -300,9 +312,15 @@ bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
{
while (true) {
- ClusterAcquireOnExit willAcquire(broker); // Outside the lock
+ Stoppable::Scope stopper(dispatching); // FIXME aconway 2011-06-28: rename consuming
+ if (!stopper) {
+ QPID_LOG(trace, "Queue is stopped: " << name);
+ listeners.addListener(c);
+ return NO_MESSAGES;
+ }
+ ClusterAcquireScope acquireScope(*this); // Outside the lock
Mutex::ScopedLock locker(messageLock);
- if (messages->empty()) {
+ if (messages->empty()) { // FIXME aconway 2011-06-07: ugly
QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
listeners.addListener(c);
return NO_MESSAGES;
@@ -317,7 +335,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_
if (c->filter(msg.payload)) {
if (c->accept(msg.payload)) {
m = msg;
- willAcquire.qmsg = msg;
+ acquireScope.qmsg = msg;
pop();
return CONSUMED;
} else {
@@ -374,18 +392,11 @@ void Queue::removeListener(Consumer::shared_ptr c)
bool Queue::dispatch(Consumer::shared_ptr c)
{
- Stoppable::Scope doDispatch(dispatching);
- if (doDispatch) {
- QueuedMessage msg(this);
- if (getNextMessage(msg, c)) {
- c->deliver(msg);
- return true;
- } else {
- return false;
- }
- } else { // Dispatching is stopped
- Mutex::ScopedLock locker(messageLock);
- listeners.addListener(c); // FIXME aconway 2011-05-05:
+ QueuedMessage msg(this);
+ if (getNextMessage(msg, c)) {
+ c->deliver(msg);
+ return true;
+ } else {
return false;
}
}
@@ -450,10 +461,10 @@ void Queue::cancel(Consumer::shared_ptr c){
}
QueuedMessage Queue::get(){
- ClusterAcquireOnExit willAcquire(broker); // Outside lock
+ ClusterAcquireScope acquireScope(*this); // Outside lock
Mutex::ScopedLock locker(messageLock);
QueuedMessage msg(this);
- if (messages->pop(msg)) willAcquire.qmsg = msg;
+ if (messages->pop(msg)) acquireScope.qmsg = msg;
return msg;
}
@@ -704,7 +715,9 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
if (!isEnqueued(msg)) return false;
if (!ctxt) dequeued(msg);
}
+
if (!ctxt && broker) broker->getCluster().dequeue(msg); // Outside lock
+
// This check prevents messages which have been forced persistent on one queue from dequeuing
// from another on which no forcing has taken place and thus causing a store error.
bool fp = msg.payload->isForcedPersistent();
@@ -902,6 +915,10 @@ void Queue::notifyDeleted()
set.notifyAll();
}
+void Queue::acquireStopped() {
+ if (broker) broker->getCluster().stopped(*this);
+}
+
void Queue::bound(const string& exchange, const string& key,
const FieldTable& args)
{
@@ -1234,7 +1251,7 @@ bool Queue::bind(boost::shared_ptr<Exchange> exchange, const std::string& key,
}
-const Broker* Queue::getBroker()
+Broker* Queue::getBroker()
{
return broker;
}
@@ -1268,10 +1285,13 @@ void Queue::UsageBarrier::destroy()
// FIXME aconway 2011-05-06: naming - only affects consumers. stopDispatch()?
void Queue::stop() {
+ // FIXME aconway 2011-05-25: rename dispatching - acquiring?
dispatching.stop();
}
void Queue::start() {
+ QPID_LOG(critical, "FIXME start context=" << clusterContext);
+ assert(clusterContext); // FIXME aconway 2011-06-08: XXX
dispatching.start();
notifyListener();
}
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index 1588ae1171..0ba7b362e9 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -21,6 +21,7 @@
* under the License.
*
*/
+#include "qpid/log/Statement.h" // FIXME XXX aconway 2011-06-08: remove
#include "qpid/broker/BrokerImportExport.h"
#include "qpid/broker/OwnershipToken.h"
@@ -130,8 +131,9 @@ class Queue : public boost::enable_shared_from_this<Queue>,
UsageBarrier barrier;
int autoDeleteTimeout;
boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask;
- // Allow dispatching consumer threads to be stopped.
- sys::Stoppable dispatching;
+ // Allow dispatching consumer threads to be stopped. Used by cluster
+ sys::Stoppable dispatching; // FIXME aconway 2011-06-07: name: acquiring?
+ boost::intrusive_ptr<RefCounted> clusterContext;
void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false);
void setPolicy(std::auto_ptr<QueuePolicy> policy);
@@ -179,6 +181,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
void checkNotDeleted();
void notifyDeleted();
+ void acquireStopped();
public:
@@ -379,20 +382,25 @@ class Queue : public boost::enable_shared_from_this<Queue>,
void flush();
- const Broker* getBroker();
+ Broker* getBroker();
- /** Stop consumers. Return when all consumer threads are stopped.
- *@pre Queue is active and not already stopping.
- */
+ /** Stop consumers. Return when all consumer threads are stopped. */
void stop();
- /** Start consumers.
- *@pre Queue is stopped and idle: no thread in dispatch.
- */
+ /** Start consumers. */
void start();
- /** Context data attached and used by cluster code. */
- boost::intrusive_ptr<qpid::RefCounted> clusterContext;
+ /** Context information used in a cluster. */
+ boost::intrusive_ptr<RefCounted> getClusterContext() {
+ // FIXME aconway 2011-06-08: XXX
+ QPID_LOG(critical, "FIXME q get context " << name << clusterContext);
+ return clusterContext;
+ }
+ void setClusterContext(boost::intrusive_ptr<RefCounted> context) {
+ // FIXME aconway 2011-06-08: XXX
+ clusterContext = context;
+ QPID_LOG(critical, "FIXME q set context " << name << clusterContext);
+ }
};
}} // qpid::broker
diff --git a/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
index 269e0b2ba3..465a5de021 100644
--- a/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
@@ -20,17 +20,22 @@
*/
#include "Core.h"
-#include "BrokerHandler.h"
+#include "BrokerContext.h"
+#include "QueueContext.h"
+#include "QueueHandler.h"
#include "qpid/framing/ClusterMessageRoutingBody.h"
#include "qpid/framing/ClusterMessageRoutedBody.h"
#include "qpid/framing/ClusterMessageEnqueueBody.h"
+#include "qpid/framing/ClusterMessageAcquireBody.h"
#include "qpid/framing/ClusterMessageDequeueBody.h"
+#include "qpid/framing/ClusterMessageReleaseBody.h"
#include "qpid/framing/ClusterWiringCreateQueueBody.h"
#include "qpid/framing/ClusterWiringCreateExchangeBody.h"
#include "qpid/framing/ClusterWiringDestroyQueueBody.h"
#include "qpid/framing/ClusterWiringDestroyExchangeBody.h"
#include "qpid/framing/ClusterWiringBindBody.h"
#include "qpid/framing/ClusterWiringUnbindBody.h"
+#include "qpid/framing/ClusterQueueSubscribeBody.h"
#include "qpid/sys/Thread.h"
#include "qpid/broker/QueuedMessage.h"
#include "qpid/broker/Queue.h"
@@ -54,27 +59,28 @@ QPID_TSS bool tssNoReplicate = false;
QPID_TSS RoutingId tssRoutingId = 0;
}
-BrokerHandler::ScopedSuppressReplication::ScopedSuppressReplication() {
+BrokerContext::ScopedSuppressReplication::ScopedSuppressReplication() {
assert(!tssNoReplicate);
tssNoReplicate = true;
}
-BrokerHandler::ScopedSuppressReplication::~ScopedSuppressReplication() {
+BrokerContext::ScopedSuppressReplication::~ScopedSuppressReplication() {
assert(tssNoReplicate);
tssNoReplicate = false;
}
-BrokerHandler::BrokerHandler(Core& c) : core(c) {}
+BrokerContext::BrokerContext(Core& c, boost::intrusive_ptr<QueueHandler> q)
+ : core(c), queueHandler(q) {}
-RoutingId BrokerHandler::nextRoutingId() {
+RoutingId BrokerContext::nextRoutingId() {
RoutingId id = ++routingId;
if (id == 0) id = ++routingId; // Avoid 0 on wrap-around.
return id;
}
-void BrokerHandler::routing(const boost::intrusive_ptr<Message>&) { }
+void BrokerContext::routing(const boost::intrusive_ptr<Message>&) { }
-bool BrokerHandler::enqueue(Queue& queue, const boost::intrusive_ptr<Message>& msg)
+bool BrokerContext::enqueue(Queue& queue, const boost::intrusive_ptr<Message>& msg)
{
if (tssNoReplicate) return true;
if (!tssRoutingId) { // This is the first enqueue, so send the message
@@ -93,7 +99,7 @@ bool BrokerHandler::enqueue(Queue& queue, const boost::intrusive_ptr<Message>& m
return false;
}
-void BrokerHandler::routed(const boost::intrusive_ptr<Message>&) {
+void BrokerContext::routed(const boost::intrusive_ptr<Message>&) {
if (tssRoutingId) { // we enqueued at least one message.
core.mcast(ClusterMessageRoutedBody(ProtocolVersion(), tssRoutingId));
// Note: routingMap is cleaned up on CPG delivery in MessageHandler.
@@ -101,28 +107,45 @@ void BrokerHandler::routed(const boost::intrusive_ptr<Message>&) {
}
}
-void BrokerHandler::dequeue(const broker::QueuedMessage& qm) {
+void BrokerContext::acquire(const broker::QueuedMessage& qm) {
if (tssNoReplicate) return;
- // FIXME aconway 2010-10-28: we also need to delay completion of the
- // ack that caused this dequeue until self-delivery of the mcast below.
- core.mcast(ClusterMessageDequeueBody(
+ QueueContext::get(*qm.queue)->acquire();
+ core.mcast(ClusterMessageAcquireBody(
ProtocolVersion(), qm.queue->getName(), qm.position));
}
-void BrokerHandler::create(broker::Queue& q) {
+// FIXME aconway 2011-05-24: need to handle acquire and release.
+// Dequeue in the wrong place?
+void BrokerContext::dequeue(const broker::QueuedMessage& qm) {
if (tssNoReplicate) return;
+ core.mcast(ClusterMessageDequeueBody(
+ ProtocolVersion(), qm.queue->getName(), qm.position));
+}
+
+void BrokerContext::release(const broker::QueuedMessage& ) {
+ // FIXME aconway 2011-05-24: TODO
+}
+
+// FIXME aconway 2011-06-08: should be be using shared_ptr to q here?
+void BrokerContext::create(broker::Queue& q) {
+ if (tssNoReplicate) return; // FIXME aconway 2011-06-08: revisit
+ // FIXME aconway 2011-06-08: error handling- if already set...
+ // Create local context immediately, queue will be stopped until replicated.
+ boost::intrusive_ptr<QueueContext> context(
+ new QueueContext(q,core.getMulticaster()));
std::string data(q.encodedSize(), '\0');
framing::Buffer buf(&data[0], data.size());
q.encode(buf);
core.mcast(ClusterWiringCreateQueueBody(ProtocolVersion(), data));
+ QPID_LOG(critical, "FIXME BrokerContext create " << q.getName() << q.getClusterContext().get());
}
-void BrokerHandler::destroy(broker::Queue& q) {
+void BrokerContext::destroy(broker::Queue& q) {
if (tssNoReplicate) return;
core.mcast(ClusterWiringDestroyQueueBody(ProtocolVersion(), q.getName()));
}
-void BrokerHandler::create(broker::Exchange& ex) {
+void BrokerContext::create(broker::Exchange& ex) {
if (tssNoReplicate) return;
std::string data(ex.encodedSize(), '\0');
framing::Buffer buf(&data[0], data.size());
@@ -130,12 +153,12 @@ void BrokerHandler::create(broker::Exchange& ex) {
core.mcast(ClusterWiringCreateExchangeBody(ProtocolVersion(), data));
}
-void BrokerHandler::destroy(broker::Exchange& ex) {
+void BrokerContext::destroy(broker::Exchange& ex) {
if (tssNoReplicate) return;
core.mcast(ClusterWiringDestroyExchangeBody(ProtocolVersion(), ex.getName()));
}
-void BrokerHandler::bind(broker::Queue& q, broker::Exchange& ex,
+void BrokerContext::bind(broker::Queue& q, broker::Exchange& ex,
const std::string& key, const framing::FieldTable& args)
{
if (tssNoReplicate) return;
@@ -143,7 +166,7 @@ void BrokerHandler::bind(broker::Queue& q, broker::Exchange& ex,
ProtocolVersion(), q.getName(), ex.getName(), key, args));
}
-void BrokerHandler::unbind(broker::Queue& q, broker::Exchange& ex,
+void BrokerContext::unbind(broker::Queue& q, broker::Exchange& ex,
const std::string& key, const framing::FieldTable& args)
{
if (tssNoReplicate) return;
@@ -151,4 +174,32 @@ void BrokerHandler::unbind(broker::Queue& q, broker::Exchange& ex,
ProtocolVersion(), q.getName(), ex.getName(), key, args));
}
+// n is the number of consumers including the one just added.
+// FIXME aconway 2011-06-27: rename, conflicting terms.
+void BrokerContext::consume(broker::Queue& q, size_t n) {
+ if (n == 1) {
+ // FIXME aconway 2011-06-27: should be on QueueContext for symmetry?
+ core.mcast(ClusterQueueSubscribeBody(ProtocolVersion(), q.getName()));
+ }
+}
+
+// n is the number of consumers after the cancel.
+void BrokerContext::cancel(broker::Queue& q, size_t n) {
+ if (n == 0) QueueContext::get(q)->unsubscribed();
+}
+
+void BrokerContext::empty(broker::Queue& ) {
+ // FIXME aconway 2011-06-28: is this needed?
+}
+
+void BrokerContext::stopped(broker::Queue& q) {
+ boost::intrusive_ptr<QueueContext> qc = QueueContext::get(q);
+ // Don't forward the stopped call if the queue does not yet have a cluster context
+ // this when the queue is first created locally.
+ if (qc){
+ QPID_LOG(critical, "FIXME BrokerContext::stopped " << q.getName());
+ qc->stopped();
+ }
+}
+
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.h b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h
index 1cfcc75863..fc19d6487b 100644
--- a/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.h
+++ b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h
@@ -1,5 +1,5 @@
-#ifndef QPID_CLUSTER_BROKERHANDLER_H
-#define QPID_CLUSTER_BROKERHANDLER_H
+#ifndef QPID_CLUSTER_BROKERCONTEXT_H
+#define QPID_CLUSTER_BROKERCONTEXT_H
/*
*
@@ -28,13 +28,15 @@
namespace qpid {
namespace cluster {
class Core;
+class QueueHandler;
+class QueueContext;
// TODO aconway 2010-10-19: experimental cluster code.
/**
* Implements broker::Cluster interface, handles events in broker code.
*/
-class BrokerHandler : public broker::Cluster
+class BrokerContext : public broker::Cluster
{
public:
/** Suppress replication while in scope.
@@ -45,7 +47,7 @@ class BrokerHandler : public broker::Cluster
~ScopedSuppressReplication();
};
- BrokerHandler(Core&);
+ BrokerContext(Core&, boost::intrusive_ptr<QueueHandler>);
// FIXME aconway 2010-10-20: implement all points.
@@ -54,14 +56,18 @@ class BrokerHandler : public broker::Cluster
void routing(const boost::intrusive_ptr<broker::Message>&);
bool enqueue(broker::Queue&, const boost::intrusive_ptr<broker::Message>&);
void routed(const boost::intrusive_ptr<broker::Message>&);
- void acquire(const broker::QueuedMessage&) {}
- void release(const broker::QueuedMessage&) {}
+ void acquire(const broker::QueuedMessage&);
void dequeue(const broker::QueuedMessage&);
+ void release(const broker::QueuedMessage&);
// Consumers
- void consume(broker::Queue&, size_t) {}
- void cancel(broker::Queue&, size_t) {}
+ void consume(broker::Queue&, size_t);
+ void cancel(broker::Queue&, size_t);
+
+ // Queues
+ void empty(broker::Queue&);
+ void stopped(broker::Queue&);
// Wiring
@@ -79,8 +85,9 @@ class BrokerHandler : public broker::Cluster
uint32_t nextRoutingId();
Core& core;
+ boost::intrusive_ptr<QueueHandler> queueHandler;
sys::AtomicValue<uint32_t> routingId;
};
}} // namespace qpid::cluster
-#endif /*!QPID_CLUSTER_BROKERHANDLER_H*/
+#endif /*!QPID_CLUSTER_BROKERCONTEXT_H*/
diff --git a/qpid/cpp/src/qpid/cluster/exp/Core.cpp b/qpid/cpp/src/qpid/cluster/exp/Core.cpp
index e1dba349a1..7bcc068120 100644
--- a/qpid/cpp/src/qpid/cluster/exp/Core.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/Core.cpp
@@ -21,9 +21,11 @@
#include "Core.h"
#include "EventHandler.h"
-#include "BrokerHandler.h"
+#include "BrokerContext.h"
#include "WiringHandler.h"
#include "MessageHandler.h"
+#include "QueueContext.h"
+#include "QueueHandler.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/SignalHandler.h"
#include "qpid/framing/AMQFrame.h"
@@ -39,12 +41,17 @@ Core::Core(const Settings& s, broker::Broker& b) :
eventHandler(new EventHandler(*this)),
multicaster(eventHandler->getCpg(), b.getPoller(), boost::bind(&Core::fatal, this))
{
- eventHandler->add(boost::shared_ptr<HandlerBase>(new WiringHandler(*eventHandler)));
- eventHandler->add(boost::shared_ptr<HandlerBase>(new MessageHandler(*eventHandler)));
+ boost::intrusive_ptr<QueueHandler> queueHandler(
+ new QueueHandler(*eventHandler, multicaster));
+ eventHandler->add(queueHandler);
+ eventHandler->add(boost::intrusive_ptr<HandlerBase>(
+ new WiringHandler(*eventHandler, queueHandler)));
+ eventHandler->add(boost::intrusive_ptr<HandlerBase>(
+ new MessageHandler(*eventHandler)));
- std::auto_ptr<BrokerHandler> bh(new BrokerHandler(*this));
+ std::auto_ptr<BrokerContext> bh(new BrokerContext(*this, queueHandler));
brokerHandler = bh.get();
- // BrokerHandler belongs to Broker
+ // BrokerContext belongs to Broker
broker.setCluster(std::auto_ptr<broker::Cluster>(bh));
eventHandler->start();
eventHandler->getCpg().join(s.name);
@@ -62,8 +69,7 @@ void Core::fatal() {
void Core::mcast(const framing::AMQBody& body) {
QPID_LOG(trace, "cluster multicast: " << body);
- framing::AMQFrame f(body);
- multicaster.mcast(f);
+ multicaster.mcast(body);
}
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/exp/Core.h b/qpid/cpp/src/qpid/cluster/exp/Core.h
index 8b83a0004d..d0dc8e57a8 100644
--- a/qpid/cpp/src/qpid/cluster/exp/Core.h
+++ b/qpid/cpp/src/qpid/cluster/exp/Core.h
@@ -44,7 +44,7 @@ class Broker;
namespace cluster {
class EventHandler;
-class BrokerHandler;
+class BrokerContext;
/**
* Cluster core state machine.
@@ -77,16 +77,17 @@ class Core
broker::Broker& getBroker() { return broker; }
EventHandler& getEventHandler() { return *eventHandler; }
- BrokerHandler& getBrokerHandler() { return *brokerHandler; }
+ BrokerContext& getBrokerContext() { return *brokerHandler; }
+ Multicaster& getMulticaster() { return multicaster; }
/** Map of messages that are currently being routed.
- * Used to pass messages being routed from BrokerHandler to MessageHandler
+ * Used to pass messages being routed from BrokerContext to MessageHandler
*/
RoutingMap& getRoutingMap() { return routingMap; }
private:
broker::Broker& broker;
std::auto_ptr<EventHandler> eventHandler; // Handles CPG events.
- BrokerHandler* brokerHandler; // Handles broker events.
+ BrokerContext* brokerHandler; // Handles broker events.
RoutingMap routingMap;
Multicaster multicaster;
};
diff --git a/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp
index 2138004380..beebe9fc16 100644
--- a/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp
@@ -41,7 +41,7 @@ EventHandler::EventHandler(Core& c) :
EventHandler::~EventHandler() {}
-void EventHandler::add(const boost::shared_ptr<HandlerBase>& handler) {
+void EventHandler::add(const boost::intrusive_ptr<HandlerBase>& handler) {
handlers.push_back(handler);
}
diff --git a/qpid/cpp/src/qpid/cluster/exp/EventHandler.h b/qpid/cpp/src/qpid/cluster/exp/EventHandler.h
index b946c27084..93423778f1 100644
--- a/qpid/cpp/src/qpid/cluster/exp/EventHandler.h
+++ b/qpid/cpp/src/qpid/cluster/exp/EventHandler.h
@@ -27,7 +27,7 @@
#include "qpid/cluster/Cpg.h"
#include "qpid/cluster/PollerDispatch.h"
#include "qpid/cluster/types.h"
-#include <boost/shared_ptr.hpp>
+#include <boost/intrusive_ptr.hpp>
#include <vector>
namespace qpid {
@@ -52,7 +52,7 @@ class EventHandler : public Cpg::Handler
~EventHandler();
/** Add a handler */
- void add(const boost::shared_ptr<HandlerBase>&);
+ void add(const boost::intrusive_ptr<HandlerBase>&);
/** Start polling */
void start();
@@ -87,7 +87,7 @@ class EventHandler : public Cpg::Handler
MemberId sender; // sender of current event.
MemberId self;
- typedef std::vector<boost::shared_ptr<HandlerBase> > Handlers;
+ typedef std::vector<boost::intrusive_ptr<HandlerBase> > Handlers;
Handlers handlers;
};
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/exp/HandlerBase.h b/qpid/cpp/src/qpid/cluster/exp/HandlerBase.h
index b153f56a01..f0c6650994 100644
--- a/qpid/cpp/src/qpid/cluster/exp/HandlerBase.h
+++ b/qpid/cpp/src/qpid/cluster/exp/HandlerBase.h
@@ -21,6 +21,7 @@
* under the License.
*
*/
+#include "qpid/RefCounted.h"
#include "qpid/cluster/types.h"
namespace qpid {
@@ -35,7 +36,7 @@ class EventHandler;
/**
* Base class for handlers of events, children of the EventHandler.
*/
-class HandlerBase
+class HandlerBase : public RefCounted
{
public:
HandlerBase(EventHandler&);
diff --git a/qpid/cpp/src/qpid/cluster/exp/LockedMap.h b/qpid/cpp/src/qpid/cluster/exp/LockedMap.h
index 0736e7ac35..c0afe740f8 100644
--- a/qpid/cpp/src/qpid/cluster/exp/LockedMap.h
+++ b/qpid/cpp/src/qpid/cluster/exp/LockedMap.h
@@ -54,7 +54,7 @@ class LockedMap
*/
bool add(const Key& key, const Value& value) {
sys::RWlock::ScopedWlock w(lock);
- return map.insert(key, value).second;
+ return map.insert(std::make_pair(key, value)).second;
}
/** Erase the value associated with key if any. Return true if a value was erased. */
diff --git a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
index d4095e5bc1..86894b9dd9 100644
--- a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
@@ -21,7 +21,7 @@
#include "Core.h"
#include "MessageHandler.h"
-#include "BrokerHandler.h"
+#include "BrokerContext.h"
#include "EventHandler.h"
#include "qpid/broker/Message.h"
#include "qpid/broker/Broker.h"
@@ -73,7 +73,7 @@ void MessageHandler::enqueue(RoutingId routingId, const std::string& q) {
msg = memberMap[sender()].routingMap[routingId];
if (!msg) throw Exception(QPID_MSG("Cluster enqueue on " << q
<< " failed: unknown message"));
- BrokerHandler::ScopedSuppressReplication ssr;
+ BrokerContext::ScopedSuppressReplication ssr;
queue->deliver(msg);
}
@@ -84,22 +84,40 @@ void MessageHandler::routed(RoutingId routingId) {
memberMap[sender()].routingMap.erase(routingId);
}
-void MessageHandler::dequeue(const std::string& q, uint32_t position) {
+void MessageHandler::acquire(const std::string& q, uint32_t position) {
+ // Note acquires from other members. My own acquires were exeuted in
+ // the connection thread
+ if (sender() != self()) {
+ // FIXME aconway 2010-10-28: need to store acquired messages on QueueContext
+ // by broker for possible re-queuing if a broker leaves.
+ boost::shared_ptr<Queue> queue = findQueue(q, "Cluster dequeue failed");
+ QueuedMessage qm;
+ BrokerContext::ScopedSuppressReplication ssr;
+ bool ok = queue->acquireMessageAt(position, qm);
+ (void)ok; // Avoid unused variable warnings.
+ assert(ok);
+ assert(qm.position.getValue() == position);
+ assert(qm.payload);
+ }
+}
+
+void MessageHandler::dequeue(const std::string& q, uint32_t /*position*/) {
if (sender() == self()) {
// FIXME aconway 2010-10-28: we should complete the ack that initiated
- // the dequeue at this point, see BrokerHandler::dequeue
+ // the dequeue at this point, see BrokerContext::dequeue
return;
}
boost::shared_ptr<Queue> queue = findQueue(q, "Cluster dequeue failed");
- BrokerHandler::ScopedSuppressReplication ssr;
- QueuedMessage qm;
- // FIXME aconway 2010-10-28: when we replicate acquires, the acquired
- // messages will be stored by MessageHandler::acquire.
- if (queue->acquireMessageAt(position, qm)) {
- assert(qm.position.getValue() == position);
- assert(qm.payload);
- queue->dequeue(0, qm);
- }
+ BrokerContext::ScopedSuppressReplication ssr;
+ // FIXME aconway 2011-05-12: Remove the acquired message from QueueContext.
+ // Do we need to call this? Review with gsim.
+ // QueuedMessage qm;
+ // Get qm from QueueContext?
+ // queue->dequeue(0, qm);
+}
+
+void MessageHandler::release(const std::string& /*queue*/ , uint32_t /*position*/) {
+ // FIXME aconway 2011-05-24:
}
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h
index f87f22a1ec..0a010a8ecf 100644
--- a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h
+++ b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h
@@ -39,7 +39,10 @@ class Queue;
namespace cluster {
class EventHandler;
-class BrokerHandler;
+class BrokerContext;
+
+// FIXME aconway 2011-06-28: doesn't follow the same Handler/Replica/Context pattern as for queue.
+// Make this consistent.
/**
* Handler for message disposition events.
@@ -55,7 +58,10 @@ class MessageHandler : public framing::AMQP_AllOperations::ClusterMessageHandler
void routing(uint32_t routingId, const std::string& message);
void enqueue(uint32_t routingId, const std::string& queue);
void routed(uint32_t routingId);
+ void acquire(const std::string& queue, uint32_t position);
void dequeue(const std::string& queue, uint32_t position);
+ void release(const std::string& queue, uint32_t position);
+
private:
struct Member {
typedef std::map<uint32_t, boost::intrusive_ptr<broker::Message> > RoutingMap;
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
new file mode 100644
index 0000000000..6c97c906e8
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
@@ -0,0 +1,105 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "QueueContext.h"
+#include "Multicaster.h"
+#include "qpid/framing/ClusterQueueResubscribeBody.h"
+#include "qpid/framing/ClusterQueueUnsubscribeBody.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/log/Statement.h"
+
+
+namespace qpid {
+namespace cluster {
+
+QueueContext::QueueContext(broker::Queue& q, Multicaster& m)
+ : owner(NOT_OWNER), count(0), queue(q), mcast(m)
+{
+ QPID_LOG(debug, "Assign cluster context to queue " << q.getName());
+ q.stop(); // Initially stopped. Must all before setClusterContext
+ q.setClusterContext(boost::intrusive_ptr<QueueContext>(this));
+
+}
+
+// Called by QueueReplica in deliver thread.
+void QueueContext::sharedOwner(size_t limit) {
+ QPID_LOG(critical, "FIXME QueueContext::sharedOwner " << queue.getName() << queue.getClusterContext().get());
+ sys::Mutex::ScopedLock l(lock);
+ count = limit;
+ if (owner == NOT_OWNER) queue.start(); // FIXME aconway 2011-06-09: ok inside mutex?
+ owner = SHARED_OWNER;
+}
+
+// Called by QueueReplica in deliver thread.
+void QueueContext::soleOwner() {
+ QPID_LOG(critical, "FIXME QueueContext::soleOwner " << queue.getName() << queue.getClusterContext().get());
+ sys::Mutex::ScopedLock l(lock);
+ count = 0;
+ if (owner == NOT_OWNER) queue.start(); // FIXME aconway 2011-06-09: ok inside mutex?
+ owner = SOLE_OWNER;
+}
+
+// Called by BrokerContext in connection thread(s) on acquiring a message
+void QueueContext::acquire() {
+ bool stop = false;
+ {
+ sys::Mutex::ScopedLock l(lock);
+ assert(owner != NOT_OWNER); // Can't acquire on a queue we don't own.
+ QPID_LOG(critical, "FIXME QueueContext::acquire " << queue.getName()
+ << " owner=" << owner << " count=" << count);
+ if (owner == SHARED_OWNER) {
+ // Note count could be 0 if there are concurrent calls to acquire.
+ if (count && --count == 0) {
+ stop = true;
+ }
+ }
+ }
+ // FIXME aconway 2011-06-28: could have multiple stop() threads...
+ if (stop) queue.stop();
+}
+
+// Callback set up by queue.stop()
+void QueueContext::stopped() {
+ sys::Mutex::ScopedLock l(lock);
+ if (owner == NOT_OWNER) {
+ mcast.mcast(framing::ClusterQueueUnsubscribeBody(
+ framing::ProtocolVersion(), queue.getName()));
+ } else {
+ owner = NOT_OWNER;
+ mcast.mcast(framing::ClusterQueueResubscribeBody(
+ framing::ProtocolVersion(), queue.getName()));
+ }
+}
+
+void QueueContext::unsubscribed() {
+ QPID_LOG(critical, "FIXME QueueContext unsubscribed, stopping " << queue.getName());
+ queue.stop();
+ sys::Mutex::ScopedLock l(lock);
+ owner = NOT_OWNER;
+}
+
+boost::intrusive_ptr<QueueContext> QueueContext::get(broker::Queue& q) {
+ return boost::intrusive_ptr<QueueContext>(
+ static_cast<QueueContext*>(q.getClusterContext().get()));
+}
+
+
+}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.h b/qpid/cpp/src/qpid/cluster/exp/QueueContext.h
new file mode 100644
index 0000000000..5bafb5eb0f
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.h
@@ -0,0 +1,93 @@
+#ifndef QPID_CLUSTER_EXP_QUEUESTATE_H
+#define QPID_CLUSTER_EXP_QUEUESTATE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+#include <qpid/RefCounted.h>
+#include <qpid/sys/Mutex.h>
+#include <boost/intrusive_ptr.hpp>
+
+
+// FIXME aconway 2011-06-08: refactor broker::Cluster to put queue ups on
+// class broker::Cluster::Queue. This becomes the cluster context.
+
+namespace qpid {
+namespace broker {
+class Queue;
+}
+namespace cluster {
+
+class Multicaster;
+
+ /**
+ * Queue state that is not replicated to the cluster.
+ * Manages the local queue start/stop status
+ *
+ * Thread safe: Called by connection and dispatch threads.
+ */
+class QueueContext : public RefCounted {
+ // FIXME aconway 2011-06-07: consistent use of shared vs. intrusive ptr?
+ public:
+ QueueContext(broker::Queue& q, Multicaster& m);
+
+ /** Sharing ownership of queue, can acquire up to limit before releasing.
+ * Called in deliver thread.
+ */
+ void sharedOwner(size_t limit);
+
+ /** Sole owner of queue, no limits to acquiring */
+ void soleOwner();
+
+ /**
+ * Count an acquired message against the limit.
+ * Called from connection threads while consuming messages
+ */
+ void acquire();
+
+ /** Called if the queue becomes empty, from connection thread. */
+ void empty();
+
+ /** Called when queue is stopped, connection or deliver thread. */
+ void stopped();
+
+ /** Called when the last subscription to a queue is cancelled */
+ void unsubscribed();
+
+ /** Get the context for a broker queue. */
+ static boost::intrusive_ptr<QueueContext> get(broker::Queue&);
+
+ private:
+ void release();
+
+ sys::Mutex lock;
+ enum { NOT_OWNER, SOLE_OWNER, SHARED_OWNER } owner;
+ size_t count; // Count of dequeues remaining, 0 means no limit.
+ broker::Queue& queue; // FIXME aconway 2011-06-08: should be shared/weak ptr?
+ Multicaster& mcast;
+
+ // FIXME aconway 2011-06-28: need to store acquired messages for possible re-queueing.
+};
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_EXP_QUEUESTATE_H*/
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp
new file mode 100644
index 0000000000..7d56025fb8
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp
@@ -0,0 +1,79 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "QueueHandler.h"
+#include "EventHandler.h"
+#include "QueueReplica.h"
+#include "QueueContext.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/QueuedMessage.h"
+#include "qpid/framing/AllInvoker.h"
+#include "qpid/Exception.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace cluster {
+
+// FIXME aconway 2011-05-11: make Multicaster+EventHandler available as Group, clean this up?
+QueueHandler::QueueHandler(EventHandler& eh, Multicaster& m)
+ : HandlerBase(eh), multicaster(m) {}
+
+bool QueueHandler::invoke(const framing::AMQBody& body) {
+ return framing::invoke(*this, body).wasHandled();
+}
+
+void QueueHandler::subscribe(const std::string& queue) {
+ find(queue)->subscribe(sender());
+}
+void QueueHandler::unsubscribe(const std::string& queue) {
+ find(queue)->unsubscribe(sender());
+}
+void QueueHandler::resubscribe(const std::string& queue) {
+ find(queue)->resubscribe(sender());
+}
+
+void QueueHandler::left(const MemberId& member) {
+ // Unsubscribe for members that leave.
+ // FIXME aconway 2011-06-28: also need to re-queue acquired messages.
+ for (QueueMap::iterator i = queues.begin(); i != queues.end(); ++i)
+ i->second->unsubscribe(member);
+}
+
+// FIXME aconway 2011-06-08: do we need to hold on to the shared pointer for lifecycle?
+void QueueHandler::add(boost::shared_ptr<broker::Queue> q) {
+ // FIXME aconway 2011-06-08: move create operation from Wiring to Queue handler.
+ // FIXME aconway 2011-05-10: assert not already in map.
+
+ // Local queues already have a context, remote queues need one.
+ if (!QueueContext::get(*q))
+ new QueueContext(*q, multicaster); // Context attaches itself to the Queue
+ queues[q->getName()] = boost::intrusive_ptr<QueueReplica>(
+ new QueueReplica(q, self()));
+}
+
+boost::intrusive_ptr<QueueReplica> QueueHandler::find(const std::string& queue) {
+ QueueMap::iterator i = queues.find(queue);
+ if (i == queues.end())
+ throw Exception(QPID_MSG("Unknown queue " << queue << " in cluster queue handler"));
+ return i->second;
+}
+
+}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h
new file mode 100644
index 0000000000..6494efb1b3
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h
@@ -0,0 +1,82 @@
+#ifndef QPID_CLUSTER_QUEUEHANDLER_H
+#define QPID_CLUSTER_QUEUEHANDLER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "HandlerBase.h"
+#include "LockedMap.h"
+#include "qpid/framing/AMQP_AllOperations.h"
+#include "boost/shared_ptr.hpp"
+#include "boost/intrusive_ptr.hpp"
+#include <map>
+
+namespace qpid {
+
+namespace broker {
+class Queue;
+class QueuedMessage;
+}
+
+namespace cluster {
+
+class EventHandler;
+class QueueReplica;
+class Multicaster;
+
+/**
+ * Handler for queue subscription events.
+ *
+ * THREAD UNSAFE: only accessed in cluster deliver thread, on delivery
+ * of queue controls and also from WiringHandler on delivery of queue
+ * create.
+ */
+class QueueHandler : public framing::AMQP_AllOperations::ClusterQueueHandler,
+ public HandlerBase
+{
+ public:
+ QueueHandler(EventHandler&, Multicaster&);
+
+ bool invoke(const framing::AMQBody& body);
+
+ // Events
+ void subscribe(const std::string& queue);
+ void unsubscribe(const std::string& queue);
+ void resubscribe(const std::string& queue);
+ void left(const MemberId&);
+
+ void add(boost::shared_ptr<broker::Queue>);
+
+ // NB: These functions ar called in connection threads, not deliver threads.
+ void acquired(const broker::QueuedMessage& qm);
+ void empty(const broker::Queue& q);
+
+ private:
+ typedef std::map<std::string, boost::intrusive_ptr<QueueReplica> > QueueMap;
+
+ boost::intrusive_ptr<QueueReplica> find(const std::string& queue);
+
+ QueueMap queues;
+ Multicaster& multicaster;
+};
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_QUEUEHANDLER_H*/
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp
new file mode 100644
index 0000000000..551477a920
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp
@@ -0,0 +1,115 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "QueueReplica.h"
+#include "QueueContext.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/log/Statement.h"
+#include <algorithm>
+
+namespace qpid {
+namespace cluster {
+
+QueueReplica::QueueReplica(boost::shared_ptr<broker::Queue> q,
+ const MemberId& self_)
+ : queue(q), self(self_), context(QueueContext::get(*q))
+{
+ // q is initially stopped.
+}
+
+struct PrintSubscribers {
+ const QueueReplica::MemberQueue& mq;
+ PrintSubscribers(const QueueReplica::MemberQueue& m) : mq(m) {}
+};
+
+std::ostream& operator<<(std::ostream& o, const PrintSubscribers& ps) {
+ copy(ps.mq.begin(), ps.mq.end(), std::ostream_iterator<MemberId>(o, " "));
+ return o;
+}
+
+std::ostream& operator<<(std::ostream& o, QueueReplica::State s) {
+ static char* tags[] = { "UNSUBSCRIBED", "SUBSCRIBED", "SOLE_OWNER", "SHARED_OWNER" };
+ return o << tags[s];
+}
+
+std::ostream& operator<<(std::ostream& o, const QueueReplica& qr) {
+ o << qr.queue->getName() << "(" << qr.getState() << "): "
+ << PrintSubscribers(qr.subscribers);
+ return o;
+}
+
+// FIXME aconway 2011-05-17: error handling for asserts.
+
+void QueueReplica::subscribe(const MemberId& member) {
+ State before = getState();
+ subscribers.push_back(member);
+ update(before);
+}
+
+void QueueReplica::unsubscribe(const MemberId& member) {
+ State before = getState();
+ MemberQueue::iterator i = std::remove(subscribers.begin(), subscribers.end(), member);
+ if (i != subscribers.end()) {
+ subscribers.erase(i, subscribers.end());
+ update(before);
+ }
+}
+
+void QueueReplica::resubscribe(const MemberId& member) {
+ assert (member == subscribers.front()); // FIXME aconway 2011-06-27: error handling
+ State before = getState();
+ subscribers.pop_front();
+ subscribers.push_back(member);
+ update(before);
+}
+
+void QueueReplica::update(State before) {
+ const int acquireLimit = 10; // FIXME aconway 2011-06-23: configurable
+ State after = getState();
+ if (before == after) return;
+ QPID_LOG(trace, "QueueReplica " << *this << " (was " << before << ")");
+ switch (after) {
+ case UNSUBSCRIBED: break;
+ case SUBSCRIBED: break;
+ case SOLE_OWNER:
+ context->soleOwner();
+ break;
+ case SHARED_OWNER:
+ context->sharedOwner(acquireLimit);
+ break;
+ }
+}
+
+QueueReplica::State QueueReplica::getState() const {
+ if (isOwner())
+ return (subscribers.size() > 1) ? SHARED_OWNER : SOLE_OWNER;
+ return (isSubscriber(self)) ? SUBSCRIBED : UNSUBSCRIBED;
+}
+
+bool QueueReplica::isOwner() const {
+ return !subscribers.empty() && subscribers.front() == self;
+}
+
+bool QueueReplica::isSubscriber(const MemberId& member) const {
+ // FIXME aconway 2011-06-27: linear search here, is it a performance issue?
+ return std::find(subscribers.begin(), subscribers.end(), member) != subscribers.end();
+}
+
+}} // namespace qpid::cluster::exp
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h
new file mode 100644
index 0000000000..a322a8b9c0
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h
@@ -0,0 +1,85 @@
+#ifndef QPID_CLUSTER_QUEUEMODEL_H
+#define QPID_CLUSTER_QUEUEMODEL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/RefCounted.h"
+#include "qpid/cluster/types.h"
+#include <boost/intrusive_ptr.hpp>
+#include <boost/shared_ptr.hpp>
+#include <deque>
+
+namespace qpid {
+
+namespace broker {
+class Queue;
+}
+
+namespace cluster {
+class QueueHandler;
+class QueueContext;
+
+/**
+ * Queue state that is replicated among all cluster members.
+ *
+ * Handles queue subscription controls by starting/stopping the queue.
+ *
+ * THREAD UNSAFE: only used in cluster deliver thread, on delivery
+ * of queue controls and also from WiringHandler on delivery of queue
+ * create.
+ */
+class QueueReplica : public RefCounted
+{
+ public:
+ QueueReplica(boost::shared_ptr<broker::Queue> , const MemberId& );
+ void subscribe(const MemberId&);
+ void unsubscribe(const MemberId&);
+ void resubscribe(const MemberId&);
+
+ private:
+ enum State {
+ UNSUBSCRIBED,
+ SUBSCRIBED,
+ SOLE_OWNER,
+ SHARED_OWNER
+ };
+
+ friend class PrintSubscribers;
+ friend std::ostream& operator<<(std::ostream&, State);
+ friend std::ostream& operator<<(std::ostream&, const QueueReplica&);
+
+ typedef std::deque<MemberId> MemberQueue;
+
+ boost::shared_ptr<broker::Queue> queue;
+ MemberQueue subscribers;
+ MemberId self;
+ boost::intrusive_ptr<QueueContext> context;
+
+ State getState() const;
+ bool isOwner() const;
+ bool isSubscriber(const MemberId&) const;
+ void update(State before);
+};
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_QUEUEMODEL_H*/
diff --git a/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp
index 04a76b9758..1b3286792f 100644
--- a/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp
@@ -22,7 +22,8 @@
#include "Core.h"
#include "WiringHandler.h"
#include "EventHandler.h"
-#include "BrokerHandler.h"
+#include "QueueHandler.h"
+#include "BrokerContext.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Exchange.h"
#include "qpid/broker/Queue.h"
@@ -32,18 +33,20 @@
#include "qpid/framing/Buffer.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/log/Statement.h"
-#include <boost/shared_ptr.hpp>
+#include <boost/intrusive_ptr.hpp>
namespace qpid {
namespace cluster {
using namespace broker;
using framing::FieldTable;
-WiringHandler::WiringHandler(EventHandler& e) :
+WiringHandler::WiringHandler(EventHandler& e,
+ const boost::intrusive_ptr<QueueHandler>& qh) :
HandlerBase(e),
broker(e.getCore().getBroker()),
recovery(broker.getQueues(), broker.getExchanges(),
- broker.getLinks(), broker.getDtxManager())
+ broker.getLinks(), broker.getDtxManager()),
+ queueHandler(qh)
{}
bool WiringHandler::invoke(const framing::AMQBody& body) {
@@ -51,24 +54,39 @@ bool WiringHandler::invoke(const framing::AMQBody& body) {
}
void WiringHandler::createQueue(const std::string& data) {
- if (sender() == self()) return;
- BrokerHandler::ScopedSuppressReplication ssr;
- framing::Buffer buf(const_cast<char*>(&data[0]), data.size());
- // TODO aconway 2011-02-21: asymetric - RecoveryManager vs Broker::create*()
- RecoverableQueue::shared_ptr queue = recovery.recoverQueue(buf);
- QPID_LOG(debug, "cluster: create queue " << queue->getName());
+ // FIXME aconway 2011-05-25: Needs async completion.
+ std::string name;
+ if (sender() != self()) { // Created by another member, need to create locally.
+ BrokerContext::ScopedSuppressReplication ssr;
+ framing::Buffer buf(const_cast<char*>(&data[0]), data.size());
+ // TODO aconway 2011-02-21: asymetric - RecoveryManager vs Broker::create*()
+ RecoverableQueue::shared_ptr rq = recovery.recoverQueue(buf);
+ name = rq->getName();
+ }
+ else { // Created locally, Queue and QueueContext already exist.
+ framing::Buffer buffer(const_cast<char*>(&data[0]), data.size());
+ // FIXME aconway 2011-05-10: implicit knowledge of queue encoding.
+ buffer.getShortString(name);
+ }
+ boost::shared_ptr<broker::Queue> q = broker.getQueues().find(name);
+ assert(q); // FIXME aconway 2011-05-10: error handling.
+ // TODO aconway 2011-05-10: if we implement multi-group for queues then
+ // this call is a problem: comes from wiring delivery thread, not queues.
+ // FIXME aconway 2011-06-08: move wiring ops to Queue and Exchange handlers..
+ queueHandler->add(q);
+ QPID_LOG(debug, "cluster: create queue " << q->getName());
}
void WiringHandler::destroyQueue(const std::string& name) {
if (sender() == self()) return;
QPID_LOG(debug, "cluster: destroy queue " << name);
- BrokerHandler::ScopedSuppressReplication ssr;
+ BrokerContext::ScopedSuppressReplication ssr;
broker.deleteQueue(name, std::string(), std::string());
}
void WiringHandler::createExchange(const std::string& data) {
if (sender() == self()) return;
- BrokerHandler::ScopedSuppressReplication ssr;
+ BrokerContext::ScopedSuppressReplication ssr;
framing::Buffer buf(const_cast<char*>(&data[0]), data.size());
// TODO aconway 2011-02-21: asymetric - RecoveryManager vs Broker::create*()
RecoverableExchange::shared_ptr exchange = recovery.recoverExchange(buf);
@@ -78,7 +96,7 @@ void WiringHandler::createExchange(const std::string& data) {
void WiringHandler::destroyExchange(const std::string& name) {
if (sender() == self()) return;
QPID_LOG(debug, "cluster: destroy exchange " << name);
- BrokerHandler::ScopedSuppressReplication ssr;
+ BrokerContext::ScopedSuppressReplication ssr;
broker.getExchanges().destroy(name);
}
@@ -91,7 +109,7 @@ void WiringHandler::bind(
<< " exchange=" << exchangeName
<< " key=" << routingKey
<< " arguments=" << arguments);
- BrokerHandler::ScopedSuppressReplication ssr;
+ BrokerContext::ScopedSuppressReplication ssr;
broker.bind(queueName, exchangeName, routingKey, arguments, std::string(), std::string());
}
@@ -104,7 +122,7 @@ void WiringHandler::unbind(
<< " exchange=" << exchangeName
<< " key=" << routingKey
<< " arguments=" << arguments);
- BrokerHandler::ScopedSuppressReplication ssr;
+ BrokerContext::ScopedSuppressReplication ssr;
broker.unbind(queueName, exchangeName, routingKey, std::string(), std::string());
}
diff --git a/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h
index e375cf6a95..71aa6e52e9 100644
--- a/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h
+++ b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h
@@ -42,7 +42,7 @@ class Broker;
namespace cluster {
class EventHandler;
-
+class QueueHandler;
/**
* Handler for wiring disposition events.
@@ -51,7 +51,7 @@ class WiringHandler : public framing::AMQP_AllOperations::ClusterWiringHandler,
public HandlerBase
{
public:
- WiringHandler(EventHandler&);
+ WiringHandler(EventHandler&, const boost::intrusive_ptr<QueueHandler>& qh);
bool invoke(const framing::AMQBody& body);
@@ -66,8 +66,10 @@ class WiringHandler : public framing::AMQP_AllOperations::ClusterWiringHandler,
private:
+
broker::Broker& broker;
broker::RecoveryManagerImpl recovery;
+ boost::intrusive_ptr<QueueHandler> queueHandler;
};
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/exp/overview.h b/qpid/cpp/src/qpid/cluster/exp/overview.h
new file mode 100644
index 0000000000..3a0189d750
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/exp/overview.h
@@ -0,0 +1,13 @@
+// This file is documentation in doxygen format.
+/**
+
+<h1>New cluster implementation overview</h>
+
+There are 3 areas indicated by a suffix on class names:
+
+- Replica: State that is replicated to the entire cluster. Only called by Handlers in the deliver thread.
+- Context: State that is private to this member. Called by both Replia and broker objects in deliver and connection threads.
+- Handler: Dispatch CPG messages by calling Replica objects in the deliver thread.
+
+
+**/
diff --git a/qpid/cpp/src/qpid/sys/Stoppable.h b/qpid/cpp/src/qpid/sys/Stoppable.h
index af21af46ba..6bb02bc6af 100644
--- a/qpid/cpp/src/qpid/sys/Stoppable.h
+++ b/qpid/cpp/src/qpid/sys/Stoppable.h
@@ -21,17 +21,27 @@
* under the License.
*
*/
+
+#include <boost/function.hpp>
+
namespace qpid {
namespace sys {
+// FIXME aconway 2011-05-25: needs better name
+
/**
* An activity that may be executed by multiple threads, and can be stopped.
- * Stopping prevents new threads from entering and waits till exiting busy threads leave.
+ *
+ * Stopping prevents new threads from entering and calls a callback
+ * when all busy threads leave.
*/
class Stoppable {
public:
- Stoppable() : busy(0), stopped(false) {}
- ~Stoppable() { stop(); }
+ /**
+ *@param stoppedCallback: called when all threads have stopped.
+ */
+ Stoppable(boost::function<void()> stoppedCallback)
+ : busy(0), stopped(false), notify(stoppedCallback) {}
/** Mark the scope of a busy thread like this:
* <pre>
@@ -52,38 +62,49 @@ class Stoppable {
friend class Scope;
- /** Mark stopped, wait for all threads to leave their busy scope. */
+ /**
+ * Set state to "stopped", so no new threads can enter.
+ * Call notify function when all busy threads have left.
+ */
+ // FIXME aconway 2011-06-27: not guaranteed that stopped will be called,
+ // deadlock?
void stop() {
sys::Monitor::ScopedLock l(lock);
stopped = true;
- while (busy > 0) lock.wait();
+ check();
}
- /** Set the state to started.
- *@pre state is stopped and no theads are busy.
+ /** Set the state to "started", allow threads to enter.
*/
void start() {
sys::Monitor::ScopedLock l(lock);
- assert(stopped && busy == 0); // FIXME aconway 2011-05-06: error handling.
stopped = false;
}
- private:
- uint busy;
- bool stopped;
- sys::Monitor lock;
-
+ // Busy thread enters scope
bool enter() {
sys::Monitor::ScopedLock l(lock);
if (!stopped) ++busy;
return !stopped;
}
+ // Busy thread exits scope
void exit() {
sys::Monitor::ScopedLock l(lock);
assert(busy > 0);
- if (--busy == 0) lock.notifyAll();
+ --busy;
+ check();
+ }
+
+ private:
+ void check() {
+ if (stopped && busy == 0 && notify) notify();
}
+
+ uint busy;
+ bool stopped;
+ sys::Monitor lock;
+ boost::function< void() > notify;
};
}} // namespace qpid::sys
diff --git a/qpid/cpp/src/tests/BrokerClusterCalls.cpp b/qpid/cpp/src/tests/BrokerClusterCalls.cpp
index 53d0f2102a..4311cf51cf 100644
--- a/qpid/cpp/src/tests/BrokerClusterCalls.cpp
+++ b/qpid/cpp/src/tests/BrokerClusterCalls.cpp
@@ -56,13 +56,18 @@ class DummyCluster : public broker::Cluster
*/
bool isRouting;
+ // Record a QueuedMessage
void recordQm(const string& op, const broker::QueuedMessage& qm) {
history += (format("%s(%s, %d, %s)") % op % qm.queue->getName()
% qm.position % qm.payload->getFrames().getContent()).str();
}
+
+ // Record a message
void recordMsg(const string& op, broker::Queue& q, intrusive_ptr<broker::Message> msg) {
history += (format("%s(%s, %s)") % op % q.getName() % msg->getFrames().getContent()).str();
}
+
+ // Record a string
void recordStr(const string& op, const string& name) {
history += (format("%s(%s)") % op % name).str();
}
@@ -102,6 +107,11 @@ class DummyCluster : public broker::Cluster
history += (format("cancel(%s, %d)") % q.getName() % n).str();
}
+ // Queues
+ // FIXME aconway 2011-05-18: update test to exercise empty()
+ virtual void empty(broker::Queue& q) { recordStr("empty", q.getName()); }
+ virtual void stopped(broker::Queue& q) { recordStr("stopped", q.getName()); }
+
// Wiring
virtual void create(broker::Queue& q) { recordStr("createq", q.getName()); }
@@ -230,7 +240,7 @@ QPID_AUTO_TEST_CASE(testReleaseReject) {
h.clear();
i = 0;
m = Message("t");
- m.setTtl(Duration(1)); // Timeout 1ms
+ m.setTtl(Duration(1)); // Timeout 1ms
sender.send(m);
usleep(2000); // Sleep 2ms
bool received = receiver.fetch(m, Duration::IMMEDIATE);
@@ -239,6 +249,10 @@ QPID_AUTO_TEST_CASE(testReleaseReject) {
BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, t)");
BOOST_CHECK_EQUAL(h.at(i++), "routed(t)");
BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 2, t)");
+ // Note: empty is called once for each receiver.
+ BOOST_CHECK_EQUAL(h.at(i++), "empty(q)");
+ BOOST_CHECK_EQUAL(h.at(i++), "empty(q)");
+ BOOST_CHECK_EQUAL(h.at(i++), "empty(q)");
BOOST_CHECK_EQUAL(h.size(), i);
// Message replaced on LVQ
diff --git a/qpid/cpp/src/tests/cluster2_tests.py b/qpid/cpp/src/tests/cluster2_tests.py
index f17dfe2961..1cf749cdb4 100755
--- a/qpid/cpp/src/tests/cluster2_tests.py
+++ b/qpid/cpp/src/tests/cluster2_tests.py
@@ -33,8 +33,27 @@ log = getLogger("qpid.cluster_tests")
class Cluster2Tests(BrokerTest):
"""Tests for new cluster code."""
- def verify_content(self, content, receiver):
- for c in content: self.assertEqual(c, receiver.fetch(1).content)
+ def queue_exists(self, queue, connection):
+ s = connection.session()
+ try:
+ s.sender(queue)
+ return True
+ except qpid.messaging.exceptions.NotFound:
+ return False
+
+ # FIXME aconway 2011-06-22: needed to compensate for
+ # async wiring in early cluster2 prototype
+ def wait_for_queue(self, queue, connections, timeout=10):
+ deadline = time.time() + timeout
+ for c in connections:
+ while not self.queue_exists(queue,c):
+ if time.time() > timeout: fail("Time out in wait_for_queue(%s))"%queue)
+ time.sleep(0.01)
+
+ # FIXME aconway 2011-05-17: remove, use assert_browse.
+ def verify_content(self, expect, receiver):
+ actual = [receiver.fetch(1).content for x in expect]
+ self.assertEqual(expect, actual)
self.assertRaises(Empty, receiver.fetch, 0)
def test_message_enqueue(self):
@@ -74,12 +93,15 @@ class Cluster2Tests(BrokerTest):
s0 = sn0.sender("q;{create:always,delete:always}")
r0 = sn0.receiver("q")
sn1 = cluster[1].connect().session()
- r1 = sn1.receiver("q;{create:always}") # Not yet replicating wiring.
+ r1 = sn1.receiver("q;{create:always}")
content = ["a","b","c"]
for m in content: s0.send(Message(m))
- # Verify enqueued on cluster[1]
+ # Verify enqueued on members 0 and 1
+ # FIXME aconway 2011-05-13:
+ self.verify_content(content, sn0.receiver("q;{mode:browse}"))
self.verify_content(content, sn1.receiver("q;{mode:browse}"))
+
# Dequeue on cluster[0]
self.assertEqual(r0.fetch(1).content, "a")
sn0.acknowledge(sync=True)
@@ -114,3 +136,40 @@ class Cluster2Tests(BrokerTest):
self.assertRaises(NotFound, cluster[1].connect().session().receiver, "ex")
# FIXME aconway 2010-10-29: test unbind, may need to use old API.
+
+ def test_dequeue_mutex(self):
+ """Ensure that one and only one consumer receives each dequeued message."""
+ class Receiver(Thread):
+ def __init__(self, session):
+ self.session = session
+ self.receiver = session.receiver("q")
+ self.messages = []
+ Thread.__init__(self)
+
+ def run(self):
+ try:
+ while True:
+ self.messages.append(self.receiver.fetch(1))
+ self.session.acknowledge()
+ except Empty: pass
+
+ cluster = self.cluster(3, cluster2=True, args=["-t"]) # FIXME aconway 2011-05-13: -t
+ connections = [ b.connect() for b in cluster]
+ sessions = [ c.session() for c in connections ]
+ sender = sessions[0].sender("q;{create:always}")
+ self.wait_for_queue("q", connections)
+
+ receivers = [ Receiver(s) for s in sessions ]
+ for r in receivers: r.start()
+
+ n = 0
+ t = time.time() + 1 # Send for 1 second.
+ while time.time() < t:
+ sender.send(str(n))
+ n += 1
+ for r in receivers: r.join();
+ print "FIXME", [len(r.messages) for r in receivers] # FIXME aconway 2011-05-17:
+ for r in receivers: assert len(r.messages) # At least one message to each
+ messages = [int(m.content) for r in receivers for m in r.messages ]
+ messages.sort()
+ self.assertEqual(range(n), messages)
diff --git a/qpid/cpp/src/tests/qpid-test-cluster b/qpid/cpp/src/tests/qpid-test-cluster
index 9887406ef9..7522a7fdfd 100755
--- a/qpid/cpp/src/tests/qpid-test-cluster
+++ b/qpid/cpp/src/tests/qpid-test-cluster
@@ -28,7 +28,7 @@ Options:
Default is $DEFAULT_ENV.
-c CONFIG Use CONFIG as qpidd config file. Copies CONFIG to each host.
Default is $DEFAULT_CONF
- -d Delete data-dir and log file before starting broker.
+ -d Delete data-dir and log file before starting broker.
"
exit 1
}
@@ -82,6 +82,7 @@ do_start() {
}
do_stop() {
+
for h in $HOSTS; do
ssh $h "$SOURCE_ENV qpidd -q --no-module-dir --no-data-dir $QPIDD_ARGS"
done
diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml
index e0cd647894..aac764ee62 100644
--- a/qpid/cpp/xml/cluster.xml
+++ b/qpid/cpp/xml/cluster.xml
@@ -8,9 +8,9 @@
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
--
+-
- http://www.apache.org/licenses/LICENSE-2.0
--
+-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -81,7 +81,7 @@
<control name="message-expired" code="0x12">
<field name="id" type="uint64"/>
</control>
-
+
<domain name="error-type" type="uint8" label="Types of error">
<enum>
<choice name="none" value="0"/>
@@ -89,7 +89,7 @@
<choice name="connection" value="2"/>
</enum>
</domain>
-
+
<!-- Check for error consistency across the cluster -->
<control name="error-check" code="0x14">
<field name="type" type="error-type"/>
@@ -149,7 +149,7 @@
<!-- Abort a connection that is sending invalid data. -->
<control name="abort" code="0x4"/>
-
+
<!-- Update controls. Sent to a new broker in joining mode.
A connection is updated as followed:
- send the shadow's management ID in shadow-perpare on the update connection
@@ -192,9 +192,9 @@
<field name="enqueued" type="bit"/>
<field name="credit" type="uint32"/>
</control>
-
+
<!-- Tx transaction state. -->
- <control name="tx-start" code="0x12"/>
+ <control name="tx-start" code="0x12"/>
<control name="tx-accept" code="0x13"> <field name="commands" type="sequence-set"/> </control>
<control name="tx-dequeue" code="0x14"> <field name="queue" type="str8"/> </control>
<control name="tx-enqueue" code="0x15"> <field name="queue" type="str8"/> </control>
@@ -204,7 +204,7 @@
</control>
<control name="tx-end" code="0x17"/>
<control name="accumulated-ack" code="0x18"> <field name="commands" type="sequence-set"/> </control>
-
+
<!-- Consumers in the connection's output task -->
<control name="output-task" code="0x19">
<field name="channel" type="uint16"/>
@@ -294,6 +294,7 @@
<field name="message" type="str32"/>
</control>
+ <!-- FIXME aconway 2011-04-27: reference queues by index, not name -->
<control name="enqueue" code="0x2">
<field name="routing-id" type="uint32"/>
<field name="queue" type="queue.name"/>
@@ -303,10 +304,22 @@
<field name="routing-id" type="uint32"/>
</control>
- <control name="dequeue" code="0x4">
+ <!-- FIXME aconway 2011-04-27: review queue positions vs. global message IDs -->
+ <control name="acquire" code="0x4">
<field name="queue" type="queue.name"/>
<field name="position" type="uint32"/>
</control>
+
+ <control name="dequeue" code="0x5">
+ <field name="queue" type="queue.name"/>
+ <field name="position" type="uint32"/>
+ </control>
+
+ <control name="release" code="0x6">
+ <field name="queue" type="queue.name"/>
+ <field name="position" type="uint32"/>
+ </control>
+
</class>
<class name="cluster-wiring" code="0x83">
@@ -341,4 +354,26 @@
</control>
</class>
+
+ <!-- Manage subscriptions to a queue.
+
+ Each queue has a "subscriber queue" of members waiting take
+ messages from the queue. The member at the front of the queue
+ is the only one allowed to take messages. -->
+
+ <class name="cluster-queue" code="0x84">
+ <!-- Join at the back of the subscriber queue -->
+ <control name="subscribe" code="0x1">
+ <field name="queue" type="queue.name"/>
+ </control>
+ <!-- Leave the subscriber queue -->
+ <control name="unsubscribe" code="0x2">
+ <field name="queue" type="queue.name"/>
+ </control>
+ <!-- Move the member at the front to the back. -->
+ <control name="resubscribe" code="0x3">
+ <field name="queue" type="queue.name"/>
+ </control>
+ </class>
+
</amqp>