diff options
Diffstat (limited to 'qpid')
30 files changed, 1004 insertions, 147 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index 8ede09fa79..8d22850360 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -452,6 +452,7 @@ libqpidcommon_la_SOURCES += \ qpid/sys/AtomicValue_gcc.h \ qpid/sys/AtomicValue_mutex.h \ qpid/sys/BlockingQueue.h \ + qpid/sys/BusyThreads.h \ qpid/sys/ClusterSafe.h \ qpid/sys/ClusterSafe.cpp \ qpid/sys/Codec.h \ diff --git a/qpid/cpp/src/cluster.mk b/qpid/cpp/src/cluster.mk index 69b0228126..1809c87ca8 100644 --- a/qpid/cpp/src/cluster.mk +++ b/qpid/cpp/src/cluster.mk @@ -110,8 +110,8 @@ cluster2_la_SOURCES = \ qpid/cluster/Cpg.h \ qpid/cluster/PollerDispatch.cpp \ qpid/cluster/PollerDispatch.h \ - qpid/cluster/exp/BrokerHandler.cpp \ - qpid/cluster/exp/BrokerHandler.h \ + qpid/cluster/exp/BrokerContext.cpp \ + qpid/cluster/exp/BrokerContext.h \ qpid/cluster/exp/BufferFactory.h \ qpid/cluster/exp/Cluster2Plugin.cpp \ qpid/cluster/exp/Core.cpp \ @@ -124,6 +124,12 @@ cluster2_la_SOURCES = \ qpid/cluster/exp/MessageHandler.h \ qpid/cluster/exp/Multicaster.cpp \ qpid/cluster/exp/Multicaster.h \ + qpid/cluster/exp/QueueContext.cpp \ + qpid/cluster/exp/QueueContext.h \ + qpid/cluster/exp/QueueHandler.cpp \ + qpid/cluster/exp/QueueHandler.h \ + qpid/cluster/exp/QueueReplica.cpp \ + qpid/cluster/exp/QueueReplica.h \ qpid/cluster/exp/WiringHandler.cpp \ qpid/cluster/exp/WiringHandler.h diff --git a/qpid/cpp/src/qpid/broker/Cluster.h b/qpid/cpp/src/qpid/broker/Cluster.h index c927d35ba3..193332692b 100644 --- a/qpid/cpp/src/qpid/broker/Cluster.h +++ b/qpid/cpp/src/qpid/broker/Cluster.h @@ -80,6 +80,10 @@ class Cluster virtual void consume(Queue&, size_t consumerCount) = 0; /** A consumer cancels its subscription to a queue */ virtual void cancel(Queue&, size_t consumerCount) = 0; + /** A queue becomes empty */ + virtual void empty(Queue&) = 0; + /** A queue has been stopped */ + virtual void stopped(Queue&) = 0; // Wiring diff --git a/qpid/cpp/src/qpid/broker/NullCluster.h b/qpid/cpp/src/qpid/broker/NullCluster.h index efda8bb1ab..399e2a3ca6 100644 --- a/qpid/cpp/src/qpid/broker/NullCluster.h +++ b/qpid/cpp/src/qpid/broker/NullCluster.h @@ -49,6 +49,11 @@ class NullCluster : public Cluster virtual void consume(Queue&, size_t) {} virtual void cancel(Queue&, size_t) {} + // Queues + + virtual void stopped(Queue&) {} + virtual void empty(Queue&) {} + // Wiring virtual void create(Queue&) {} @@ -59,6 +64,7 @@ class NullCluster : public Cluster const std::string&, const framing::FieldTable&) {} virtual void unbind(Queue&, Exchange&, const std::string&, const framing::FieldTable&) {} + }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index f593d7e443..84f025824c 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -112,7 +112,8 @@ Queue::Queue(const string& _name, bool _autodelete, broker(b), deleted(false), barrier(*this), - autoDeleteTimeout(0) + autoDeleteTimeout(0), + dispatching(boost::bind(&Queue::acquireStopped,this)) { if (parent != 0 && broker != 0) { ManagementAgent* agent = broker->getManagementAgent(); @@ -231,29 +232,40 @@ void Queue::requeue(const QueuedMessage& msg){ copy.notify(); } -// Inform the cluster of an acquired message on exit from a function -// that does the acquiring. ClusterAcquireOnExit is declared *before* -// any locks are taken. The calling function sets qmsg to the acquired -// message with a lock held, but the call to Cluster::acquire() will -// be outside the lock. -struct ClusterAcquireOnExit { +/** Mark a scope that acquires a message. + * + * ClusterAcquireScope is declared before are taken. The calling + * function sets qmsg with the lock held, but the call to + * Cluster::acquire() will happen after the lock is released. + * + * Also marks a Stoppable as busy for the duration of the scope. + **/ +struct ClusterAcquireScope { Broker* broker; + Queue& queue; QueuedMessage qmsg; - ClusterAcquireOnExit(Broker* b) : broker(b) {} - ~ClusterAcquireOnExit() { - if (broker && qmsg.queue) broker->getCluster().acquire(qmsg); + + ClusterAcquireScope(Queue& q) : broker(q.getBroker()), queue(q) {} + + ~ClusterAcquireScope() { + if (broker) { + // FIXME aconway 2011-06-27: Move to QueueContext. + // Avoid the indirection via queuename. + if (qmsg.queue) broker->getCluster().acquire(qmsg); + else broker->getCluster().empty(queue); + } } }; bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message) { - ClusterAcquireOnExit willAcquire(broker); // Outside lock + ClusterAcquireScope acquireScope(*this); // Outside lock Mutex::ScopedLock locker(messageLock); assertClusterSafe(); QPID_LOG(debug, "Attempting to acquire message at " << position); if (messages->remove(position, message)) { QPID_LOG(debug, "Acquired message at " << position << " from " << name); - willAcquire.qmsg = message; + acquireScope.qmsg = message; return true; } else { QPID_LOG(debug, "Could not acquire message at " << position << " from " << name << "; no message at that position"); @@ -300,9 +312,15 @@ bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c) Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c) { while (true) { - ClusterAcquireOnExit willAcquire(broker); // Outside the lock + Stoppable::Scope stopper(dispatching); // FIXME aconway 2011-06-28: rename consuming + if (!stopper) { + QPID_LOG(trace, "Queue is stopped: " << name); + listeners.addListener(c); + return NO_MESSAGES; + } + ClusterAcquireScope acquireScope(*this); // Outside the lock Mutex::ScopedLock locker(messageLock); - if (messages->empty()) { + if (messages->empty()) { // FIXME aconway 2011-06-07: ugly QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'"); listeners.addListener(c); return NO_MESSAGES; @@ -317,7 +335,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ if (c->filter(msg.payload)) { if (c->accept(msg.payload)) { m = msg; - willAcquire.qmsg = msg; + acquireScope.qmsg = msg; pop(); return CONSUMED; } else { @@ -374,18 +392,11 @@ void Queue::removeListener(Consumer::shared_ptr c) bool Queue::dispatch(Consumer::shared_ptr c) { - Stoppable::Scope doDispatch(dispatching); - if (doDispatch) { - QueuedMessage msg(this); - if (getNextMessage(msg, c)) { - c->deliver(msg); - return true; - } else { - return false; - } - } else { // Dispatching is stopped - Mutex::ScopedLock locker(messageLock); - listeners.addListener(c); // FIXME aconway 2011-05-05: + QueuedMessage msg(this); + if (getNextMessage(msg, c)) { + c->deliver(msg); + return true; + } else { return false; } } @@ -450,10 +461,10 @@ void Queue::cancel(Consumer::shared_ptr c){ } QueuedMessage Queue::get(){ - ClusterAcquireOnExit willAcquire(broker); // Outside lock + ClusterAcquireScope acquireScope(*this); // Outside lock Mutex::ScopedLock locker(messageLock); QueuedMessage msg(this); - if (messages->pop(msg)) willAcquire.qmsg = msg; + if (messages->pop(msg)) acquireScope.qmsg = msg; return msg; } @@ -704,7 +715,9 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) if (!isEnqueued(msg)) return false; if (!ctxt) dequeued(msg); } + if (!ctxt && broker) broker->getCluster().dequeue(msg); // Outside lock + // This check prevents messages which have been forced persistent on one queue from dequeuing // from another on which no forcing has taken place and thus causing a store error. bool fp = msg.payload->isForcedPersistent(); @@ -902,6 +915,10 @@ void Queue::notifyDeleted() set.notifyAll(); } +void Queue::acquireStopped() { + if (broker) broker->getCluster().stopped(*this); +} + void Queue::bound(const string& exchange, const string& key, const FieldTable& args) { @@ -1234,7 +1251,7 @@ bool Queue::bind(boost::shared_ptr<Exchange> exchange, const std::string& key, } -const Broker* Queue::getBroker() +Broker* Queue::getBroker() { return broker; } @@ -1268,10 +1285,13 @@ void Queue::UsageBarrier::destroy() // FIXME aconway 2011-05-06: naming - only affects consumers. stopDispatch()? void Queue::stop() { + // FIXME aconway 2011-05-25: rename dispatching - acquiring? dispatching.stop(); } void Queue::start() { + QPID_LOG(critical, "FIXME start context=" << clusterContext); + assert(clusterContext); // FIXME aconway 2011-06-08: XXX dispatching.start(); notifyListener(); } diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 1588ae1171..0ba7b362e9 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -21,6 +21,7 @@ * under the License. * */ +#include "qpid/log/Statement.h" // FIXME XXX aconway 2011-06-08: remove #include "qpid/broker/BrokerImportExport.h" #include "qpid/broker/OwnershipToken.h" @@ -130,8 +131,9 @@ class Queue : public boost::enable_shared_from_this<Queue>, UsageBarrier barrier; int autoDeleteTimeout; boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask; - // Allow dispatching consumer threads to be stopped. - sys::Stoppable dispatching; + // Allow dispatching consumer threads to be stopped. Used by cluster + sys::Stoppable dispatching; // FIXME aconway 2011-06-07: name: acquiring? + boost::intrusive_ptr<RefCounted> clusterContext; void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false); void setPolicy(std::auto_ptr<QueuePolicy> policy); @@ -179,6 +181,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, void checkNotDeleted(); void notifyDeleted(); + void acquireStopped(); public: @@ -379,20 +382,25 @@ class Queue : public boost::enable_shared_from_this<Queue>, void flush(); - const Broker* getBroker(); + Broker* getBroker(); - /** Stop consumers. Return when all consumer threads are stopped. - *@pre Queue is active and not already stopping. - */ + /** Stop consumers. Return when all consumer threads are stopped. */ void stop(); - /** Start consumers. - *@pre Queue is stopped and idle: no thread in dispatch. - */ + /** Start consumers. */ void start(); - /** Context data attached and used by cluster code. */ - boost::intrusive_ptr<qpid::RefCounted> clusterContext; + /** Context information used in a cluster. */ + boost::intrusive_ptr<RefCounted> getClusterContext() { + // FIXME aconway 2011-06-08: XXX + QPID_LOG(critical, "FIXME q get context " << name << clusterContext); + return clusterContext; + } + void setClusterContext(boost::intrusive_ptr<RefCounted> context) { + // FIXME aconway 2011-06-08: XXX + clusterContext = context; + QPID_LOG(critical, "FIXME q set context " << name << clusterContext); + } }; }} // qpid::broker diff --git a/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp index 269e0b2ba3..465a5de021 100644 --- a/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp @@ -20,17 +20,22 @@ */ #include "Core.h" -#include "BrokerHandler.h" +#include "BrokerContext.h" +#include "QueueContext.h" +#include "QueueHandler.h" #include "qpid/framing/ClusterMessageRoutingBody.h" #include "qpid/framing/ClusterMessageRoutedBody.h" #include "qpid/framing/ClusterMessageEnqueueBody.h" +#include "qpid/framing/ClusterMessageAcquireBody.h" #include "qpid/framing/ClusterMessageDequeueBody.h" +#include "qpid/framing/ClusterMessageReleaseBody.h" #include "qpid/framing/ClusterWiringCreateQueueBody.h" #include "qpid/framing/ClusterWiringCreateExchangeBody.h" #include "qpid/framing/ClusterWiringDestroyQueueBody.h" #include "qpid/framing/ClusterWiringDestroyExchangeBody.h" #include "qpid/framing/ClusterWiringBindBody.h" #include "qpid/framing/ClusterWiringUnbindBody.h" +#include "qpid/framing/ClusterQueueSubscribeBody.h" #include "qpid/sys/Thread.h" #include "qpid/broker/QueuedMessage.h" #include "qpid/broker/Queue.h" @@ -54,27 +59,28 @@ QPID_TSS bool tssNoReplicate = false; QPID_TSS RoutingId tssRoutingId = 0; } -BrokerHandler::ScopedSuppressReplication::ScopedSuppressReplication() { +BrokerContext::ScopedSuppressReplication::ScopedSuppressReplication() { assert(!tssNoReplicate); tssNoReplicate = true; } -BrokerHandler::ScopedSuppressReplication::~ScopedSuppressReplication() { +BrokerContext::ScopedSuppressReplication::~ScopedSuppressReplication() { assert(tssNoReplicate); tssNoReplicate = false; } -BrokerHandler::BrokerHandler(Core& c) : core(c) {} +BrokerContext::BrokerContext(Core& c, boost::intrusive_ptr<QueueHandler> q) + : core(c), queueHandler(q) {} -RoutingId BrokerHandler::nextRoutingId() { +RoutingId BrokerContext::nextRoutingId() { RoutingId id = ++routingId; if (id == 0) id = ++routingId; // Avoid 0 on wrap-around. return id; } -void BrokerHandler::routing(const boost::intrusive_ptr<Message>&) { } +void BrokerContext::routing(const boost::intrusive_ptr<Message>&) { } -bool BrokerHandler::enqueue(Queue& queue, const boost::intrusive_ptr<Message>& msg) +bool BrokerContext::enqueue(Queue& queue, const boost::intrusive_ptr<Message>& msg) { if (tssNoReplicate) return true; if (!tssRoutingId) { // This is the first enqueue, so send the message @@ -93,7 +99,7 @@ bool BrokerHandler::enqueue(Queue& queue, const boost::intrusive_ptr<Message>& m return false; } -void BrokerHandler::routed(const boost::intrusive_ptr<Message>&) { +void BrokerContext::routed(const boost::intrusive_ptr<Message>&) { if (tssRoutingId) { // we enqueued at least one message. core.mcast(ClusterMessageRoutedBody(ProtocolVersion(), tssRoutingId)); // Note: routingMap is cleaned up on CPG delivery in MessageHandler. @@ -101,28 +107,45 @@ void BrokerHandler::routed(const boost::intrusive_ptr<Message>&) { } } -void BrokerHandler::dequeue(const broker::QueuedMessage& qm) { +void BrokerContext::acquire(const broker::QueuedMessage& qm) { if (tssNoReplicate) return; - // FIXME aconway 2010-10-28: we also need to delay completion of the - // ack that caused this dequeue until self-delivery of the mcast below. - core.mcast(ClusterMessageDequeueBody( + QueueContext::get(*qm.queue)->acquire(); + core.mcast(ClusterMessageAcquireBody( ProtocolVersion(), qm.queue->getName(), qm.position)); } -void BrokerHandler::create(broker::Queue& q) { +// FIXME aconway 2011-05-24: need to handle acquire and release. +// Dequeue in the wrong place? +void BrokerContext::dequeue(const broker::QueuedMessage& qm) { if (tssNoReplicate) return; + core.mcast(ClusterMessageDequeueBody( + ProtocolVersion(), qm.queue->getName(), qm.position)); +} + +void BrokerContext::release(const broker::QueuedMessage& ) { + // FIXME aconway 2011-05-24: TODO +} + +// FIXME aconway 2011-06-08: should be be using shared_ptr to q here? +void BrokerContext::create(broker::Queue& q) { + if (tssNoReplicate) return; // FIXME aconway 2011-06-08: revisit + // FIXME aconway 2011-06-08: error handling- if already set... + // Create local context immediately, queue will be stopped until replicated. + boost::intrusive_ptr<QueueContext> context( + new QueueContext(q,core.getMulticaster())); std::string data(q.encodedSize(), '\0'); framing::Buffer buf(&data[0], data.size()); q.encode(buf); core.mcast(ClusterWiringCreateQueueBody(ProtocolVersion(), data)); + QPID_LOG(critical, "FIXME BrokerContext create " << q.getName() << q.getClusterContext().get()); } -void BrokerHandler::destroy(broker::Queue& q) { +void BrokerContext::destroy(broker::Queue& q) { if (tssNoReplicate) return; core.mcast(ClusterWiringDestroyQueueBody(ProtocolVersion(), q.getName())); } -void BrokerHandler::create(broker::Exchange& ex) { +void BrokerContext::create(broker::Exchange& ex) { if (tssNoReplicate) return; std::string data(ex.encodedSize(), '\0'); framing::Buffer buf(&data[0], data.size()); @@ -130,12 +153,12 @@ void BrokerHandler::create(broker::Exchange& ex) { core.mcast(ClusterWiringCreateExchangeBody(ProtocolVersion(), data)); } -void BrokerHandler::destroy(broker::Exchange& ex) { +void BrokerContext::destroy(broker::Exchange& ex) { if (tssNoReplicate) return; core.mcast(ClusterWiringDestroyExchangeBody(ProtocolVersion(), ex.getName())); } -void BrokerHandler::bind(broker::Queue& q, broker::Exchange& ex, +void BrokerContext::bind(broker::Queue& q, broker::Exchange& ex, const std::string& key, const framing::FieldTable& args) { if (tssNoReplicate) return; @@ -143,7 +166,7 @@ void BrokerHandler::bind(broker::Queue& q, broker::Exchange& ex, ProtocolVersion(), q.getName(), ex.getName(), key, args)); } -void BrokerHandler::unbind(broker::Queue& q, broker::Exchange& ex, +void BrokerContext::unbind(broker::Queue& q, broker::Exchange& ex, const std::string& key, const framing::FieldTable& args) { if (tssNoReplicate) return; @@ -151,4 +174,32 @@ void BrokerHandler::unbind(broker::Queue& q, broker::Exchange& ex, ProtocolVersion(), q.getName(), ex.getName(), key, args)); } +// n is the number of consumers including the one just added. +// FIXME aconway 2011-06-27: rename, conflicting terms. +void BrokerContext::consume(broker::Queue& q, size_t n) { + if (n == 1) { + // FIXME aconway 2011-06-27: should be on QueueContext for symmetry? + core.mcast(ClusterQueueSubscribeBody(ProtocolVersion(), q.getName())); + } +} + +// n is the number of consumers after the cancel. +void BrokerContext::cancel(broker::Queue& q, size_t n) { + if (n == 0) QueueContext::get(q)->unsubscribed(); +} + +void BrokerContext::empty(broker::Queue& ) { + // FIXME aconway 2011-06-28: is this needed? +} + +void BrokerContext::stopped(broker::Queue& q) { + boost::intrusive_ptr<QueueContext> qc = QueueContext::get(q); + // Don't forward the stopped call if the queue does not yet have a cluster context + // this when the queue is first created locally. + if (qc){ + QPID_LOG(critical, "FIXME BrokerContext::stopped " << q.getName()); + qc->stopped(); + } +} + }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.h b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h index 1cfcc75863..fc19d6487b 100644 --- a/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.h +++ b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h @@ -1,5 +1,5 @@ -#ifndef QPID_CLUSTER_BROKERHANDLER_H -#define QPID_CLUSTER_BROKERHANDLER_H +#ifndef QPID_CLUSTER_BROKERCONTEXT_H +#define QPID_CLUSTER_BROKERCONTEXT_H /* * @@ -28,13 +28,15 @@ namespace qpid { namespace cluster { class Core; +class QueueHandler; +class QueueContext; // TODO aconway 2010-10-19: experimental cluster code. /** * Implements broker::Cluster interface, handles events in broker code. */ -class BrokerHandler : public broker::Cluster +class BrokerContext : public broker::Cluster { public: /** Suppress replication while in scope. @@ -45,7 +47,7 @@ class BrokerHandler : public broker::Cluster ~ScopedSuppressReplication(); }; - BrokerHandler(Core&); + BrokerContext(Core&, boost::intrusive_ptr<QueueHandler>); // FIXME aconway 2010-10-20: implement all points. @@ -54,14 +56,18 @@ class BrokerHandler : public broker::Cluster void routing(const boost::intrusive_ptr<broker::Message>&); bool enqueue(broker::Queue&, const boost::intrusive_ptr<broker::Message>&); void routed(const boost::intrusive_ptr<broker::Message>&); - void acquire(const broker::QueuedMessage&) {} - void release(const broker::QueuedMessage&) {} + void acquire(const broker::QueuedMessage&); void dequeue(const broker::QueuedMessage&); + void release(const broker::QueuedMessage&); // Consumers - void consume(broker::Queue&, size_t) {} - void cancel(broker::Queue&, size_t) {} + void consume(broker::Queue&, size_t); + void cancel(broker::Queue&, size_t); + + // Queues + void empty(broker::Queue&); + void stopped(broker::Queue&); // Wiring @@ -79,8 +85,9 @@ class BrokerHandler : public broker::Cluster uint32_t nextRoutingId(); Core& core; + boost::intrusive_ptr<QueueHandler> queueHandler; sys::AtomicValue<uint32_t> routingId; }; }} // namespace qpid::cluster -#endif /*!QPID_CLUSTER_BROKERHANDLER_H*/ +#endif /*!QPID_CLUSTER_BROKERCONTEXT_H*/ diff --git a/qpid/cpp/src/qpid/cluster/exp/Core.cpp b/qpid/cpp/src/qpid/cluster/exp/Core.cpp index e1dba349a1..7bcc068120 100644 --- a/qpid/cpp/src/qpid/cluster/exp/Core.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/Core.cpp @@ -21,9 +21,11 @@ #include "Core.h" #include "EventHandler.h" -#include "BrokerHandler.h" +#include "BrokerContext.h" #include "WiringHandler.h" #include "MessageHandler.h" +#include "QueueContext.h" +#include "QueueHandler.h" #include "qpid/broker/Broker.h" #include "qpid/broker/SignalHandler.h" #include "qpid/framing/AMQFrame.h" @@ -39,12 +41,17 @@ Core::Core(const Settings& s, broker::Broker& b) : eventHandler(new EventHandler(*this)), multicaster(eventHandler->getCpg(), b.getPoller(), boost::bind(&Core::fatal, this)) { - eventHandler->add(boost::shared_ptr<HandlerBase>(new WiringHandler(*eventHandler))); - eventHandler->add(boost::shared_ptr<HandlerBase>(new MessageHandler(*eventHandler))); + boost::intrusive_ptr<QueueHandler> queueHandler( + new QueueHandler(*eventHandler, multicaster)); + eventHandler->add(queueHandler); + eventHandler->add(boost::intrusive_ptr<HandlerBase>( + new WiringHandler(*eventHandler, queueHandler))); + eventHandler->add(boost::intrusive_ptr<HandlerBase>( + new MessageHandler(*eventHandler))); - std::auto_ptr<BrokerHandler> bh(new BrokerHandler(*this)); + std::auto_ptr<BrokerContext> bh(new BrokerContext(*this, queueHandler)); brokerHandler = bh.get(); - // BrokerHandler belongs to Broker + // BrokerContext belongs to Broker broker.setCluster(std::auto_ptr<broker::Cluster>(bh)); eventHandler->start(); eventHandler->getCpg().join(s.name); @@ -62,8 +69,7 @@ void Core::fatal() { void Core::mcast(const framing::AMQBody& body) { QPID_LOG(trace, "cluster multicast: " << body); - framing::AMQFrame f(body); - multicaster.mcast(f); + multicaster.mcast(body); } }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/exp/Core.h b/qpid/cpp/src/qpid/cluster/exp/Core.h index 8b83a0004d..d0dc8e57a8 100644 --- a/qpid/cpp/src/qpid/cluster/exp/Core.h +++ b/qpid/cpp/src/qpid/cluster/exp/Core.h @@ -44,7 +44,7 @@ class Broker; namespace cluster { class EventHandler; -class BrokerHandler; +class BrokerContext; /** * Cluster core state machine. @@ -77,16 +77,17 @@ class Core broker::Broker& getBroker() { return broker; } EventHandler& getEventHandler() { return *eventHandler; } - BrokerHandler& getBrokerHandler() { return *brokerHandler; } + BrokerContext& getBrokerContext() { return *brokerHandler; } + Multicaster& getMulticaster() { return multicaster; } /** Map of messages that are currently being routed. - * Used to pass messages being routed from BrokerHandler to MessageHandler + * Used to pass messages being routed from BrokerContext to MessageHandler */ RoutingMap& getRoutingMap() { return routingMap; } private: broker::Broker& broker; std::auto_ptr<EventHandler> eventHandler; // Handles CPG events. - BrokerHandler* brokerHandler; // Handles broker events. + BrokerContext* brokerHandler; // Handles broker events. RoutingMap routingMap; Multicaster multicaster; }; diff --git a/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp index 2138004380..beebe9fc16 100644 --- a/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp @@ -41,7 +41,7 @@ EventHandler::EventHandler(Core& c) : EventHandler::~EventHandler() {} -void EventHandler::add(const boost::shared_ptr<HandlerBase>& handler) { +void EventHandler::add(const boost::intrusive_ptr<HandlerBase>& handler) { handlers.push_back(handler); } diff --git a/qpid/cpp/src/qpid/cluster/exp/EventHandler.h b/qpid/cpp/src/qpid/cluster/exp/EventHandler.h index b946c27084..93423778f1 100644 --- a/qpid/cpp/src/qpid/cluster/exp/EventHandler.h +++ b/qpid/cpp/src/qpid/cluster/exp/EventHandler.h @@ -27,7 +27,7 @@ #include "qpid/cluster/Cpg.h" #include "qpid/cluster/PollerDispatch.h" #include "qpid/cluster/types.h" -#include <boost/shared_ptr.hpp> +#include <boost/intrusive_ptr.hpp> #include <vector> namespace qpid { @@ -52,7 +52,7 @@ class EventHandler : public Cpg::Handler ~EventHandler(); /** Add a handler */ - void add(const boost::shared_ptr<HandlerBase>&); + void add(const boost::intrusive_ptr<HandlerBase>&); /** Start polling */ void start(); @@ -87,7 +87,7 @@ class EventHandler : public Cpg::Handler MemberId sender; // sender of current event. MemberId self; - typedef std::vector<boost::shared_ptr<HandlerBase> > Handlers; + typedef std::vector<boost::intrusive_ptr<HandlerBase> > Handlers; Handlers handlers; }; }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/exp/HandlerBase.h b/qpid/cpp/src/qpid/cluster/exp/HandlerBase.h index b153f56a01..f0c6650994 100644 --- a/qpid/cpp/src/qpid/cluster/exp/HandlerBase.h +++ b/qpid/cpp/src/qpid/cluster/exp/HandlerBase.h @@ -21,6 +21,7 @@ * under the License. * */ +#include "qpid/RefCounted.h" #include "qpid/cluster/types.h" namespace qpid { @@ -35,7 +36,7 @@ class EventHandler; /** * Base class for handlers of events, children of the EventHandler. */ -class HandlerBase +class HandlerBase : public RefCounted { public: HandlerBase(EventHandler&); diff --git a/qpid/cpp/src/qpid/cluster/exp/LockedMap.h b/qpid/cpp/src/qpid/cluster/exp/LockedMap.h index 0736e7ac35..c0afe740f8 100644 --- a/qpid/cpp/src/qpid/cluster/exp/LockedMap.h +++ b/qpid/cpp/src/qpid/cluster/exp/LockedMap.h @@ -54,7 +54,7 @@ class LockedMap */ bool add(const Key& key, const Value& value) { sys::RWlock::ScopedWlock w(lock); - return map.insert(key, value).second; + return map.insert(std::make_pair(key, value)).second; } /** Erase the value associated with key if any. Return true if a value was erased. */ diff --git a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp index d4095e5bc1..86894b9dd9 100644 --- a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp @@ -21,7 +21,7 @@ #include "Core.h" #include "MessageHandler.h" -#include "BrokerHandler.h" +#include "BrokerContext.h" #include "EventHandler.h" #include "qpid/broker/Message.h" #include "qpid/broker/Broker.h" @@ -73,7 +73,7 @@ void MessageHandler::enqueue(RoutingId routingId, const std::string& q) { msg = memberMap[sender()].routingMap[routingId]; if (!msg) throw Exception(QPID_MSG("Cluster enqueue on " << q << " failed: unknown message")); - BrokerHandler::ScopedSuppressReplication ssr; + BrokerContext::ScopedSuppressReplication ssr; queue->deliver(msg); } @@ -84,22 +84,40 @@ void MessageHandler::routed(RoutingId routingId) { memberMap[sender()].routingMap.erase(routingId); } -void MessageHandler::dequeue(const std::string& q, uint32_t position) { +void MessageHandler::acquire(const std::string& q, uint32_t position) { + // Note acquires from other members. My own acquires were exeuted in + // the connection thread + if (sender() != self()) { + // FIXME aconway 2010-10-28: need to store acquired messages on QueueContext + // by broker for possible re-queuing if a broker leaves. + boost::shared_ptr<Queue> queue = findQueue(q, "Cluster dequeue failed"); + QueuedMessage qm; + BrokerContext::ScopedSuppressReplication ssr; + bool ok = queue->acquireMessageAt(position, qm); + (void)ok; // Avoid unused variable warnings. + assert(ok); + assert(qm.position.getValue() == position); + assert(qm.payload); + } +} + +void MessageHandler::dequeue(const std::string& q, uint32_t /*position*/) { if (sender() == self()) { // FIXME aconway 2010-10-28: we should complete the ack that initiated - // the dequeue at this point, see BrokerHandler::dequeue + // the dequeue at this point, see BrokerContext::dequeue return; } boost::shared_ptr<Queue> queue = findQueue(q, "Cluster dequeue failed"); - BrokerHandler::ScopedSuppressReplication ssr; - QueuedMessage qm; - // FIXME aconway 2010-10-28: when we replicate acquires, the acquired - // messages will be stored by MessageHandler::acquire. - if (queue->acquireMessageAt(position, qm)) { - assert(qm.position.getValue() == position); - assert(qm.payload); - queue->dequeue(0, qm); - } + BrokerContext::ScopedSuppressReplication ssr; + // FIXME aconway 2011-05-12: Remove the acquired message from QueueContext. + // Do we need to call this? Review with gsim. + // QueuedMessage qm; + // Get qm from QueueContext? + // queue->dequeue(0, qm); +} + +void MessageHandler::release(const std::string& /*queue*/ , uint32_t /*position*/) { + // FIXME aconway 2011-05-24: } }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h index f87f22a1ec..0a010a8ecf 100644 --- a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h +++ b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h @@ -39,7 +39,10 @@ class Queue; namespace cluster { class EventHandler; -class BrokerHandler; +class BrokerContext; + +// FIXME aconway 2011-06-28: doesn't follow the same Handler/Replica/Context pattern as for queue. +// Make this consistent. /** * Handler for message disposition events. @@ -55,7 +58,10 @@ class MessageHandler : public framing::AMQP_AllOperations::ClusterMessageHandler void routing(uint32_t routingId, const std::string& message); void enqueue(uint32_t routingId, const std::string& queue); void routed(uint32_t routingId); + void acquire(const std::string& queue, uint32_t position); void dequeue(const std::string& queue, uint32_t position); + void release(const std::string& queue, uint32_t position); + private: struct Member { typedef std::map<uint32_t, boost::intrusive_ptr<broker::Message> > RoutingMap; diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp new file mode 100644 index 0000000000..6c97c906e8 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp @@ -0,0 +1,105 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "QueueContext.h" +#include "Multicaster.h" +#include "qpid/framing/ClusterQueueResubscribeBody.h" +#include "qpid/framing/ClusterQueueUnsubscribeBody.h" +#include "qpid/broker/Queue.h" +#include "qpid/log/Statement.h" + + +namespace qpid { +namespace cluster { + +QueueContext::QueueContext(broker::Queue& q, Multicaster& m) + : owner(NOT_OWNER), count(0), queue(q), mcast(m) +{ + QPID_LOG(debug, "Assign cluster context to queue " << q.getName()); + q.stop(); // Initially stopped. Must all before setClusterContext + q.setClusterContext(boost::intrusive_ptr<QueueContext>(this)); + +} + +// Called by QueueReplica in deliver thread. +void QueueContext::sharedOwner(size_t limit) { + QPID_LOG(critical, "FIXME QueueContext::sharedOwner " << queue.getName() << queue.getClusterContext().get()); + sys::Mutex::ScopedLock l(lock); + count = limit; + if (owner == NOT_OWNER) queue.start(); // FIXME aconway 2011-06-09: ok inside mutex? + owner = SHARED_OWNER; +} + +// Called by QueueReplica in deliver thread. +void QueueContext::soleOwner() { + QPID_LOG(critical, "FIXME QueueContext::soleOwner " << queue.getName() << queue.getClusterContext().get()); + sys::Mutex::ScopedLock l(lock); + count = 0; + if (owner == NOT_OWNER) queue.start(); // FIXME aconway 2011-06-09: ok inside mutex? + owner = SOLE_OWNER; +} + +// Called by BrokerContext in connection thread(s) on acquiring a message +void QueueContext::acquire() { + bool stop = false; + { + sys::Mutex::ScopedLock l(lock); + assert(owner != NOT_OWNER); // Can't acquire on a queue we don't own. + QPID_LOG(critical, "FIXME QueueContext::acquire " << queue.getName() + << " owner=" << owner << " count=" << count); + if (owner == SHARED_OWNER) { + // Note count could be 0 if there are concurrent calls to acquire. + if (count && --count == 0) { + stop = true; + } + } + } + // FIXME aconway 2011-06-28: could have multiple stop() threads... + if (stop) queue.stop(); +} + +// Callback set up by queue.stop() +void QueueContext::stopped() { + sys::Mutex::ScopedLock l(lock); + if (owner == NOT_OWNER) { + mcast.mcast(framing::ClusterQueueUnsubscribeBody( + framing::ProtocolVersion(), queue.getName())); + } else { + owner = NOT_OWNER; + mcast.mcast(framing::ClusterQueueResubscribeBody( + framing::ProtocolVersion(), queue.getName())); + } +} + +void QueueContext::unsubscribed() { + QPID_LOG(critical, "FIXME QueueContext unsubscribed, stopping " << queue.getName()); + queue.stop(); + sys::Mutex::ScopedLock l(lock); + owner = NOT_OWNER; +} + +boost::intrusive_ptr<QueueContext> QueueContext::get(broker::Queue& q) { + return boost::intrusive_ptr<QueueContext>( + static_cast<QueueContext*>(q.getClusterContext().get())); +} + + +}} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.h b/qpid/cpp/src/qpid/cluster/exp/QueueContext.h new file mode 100644 index 0000000000..5bafb5eb0f --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.h @@ -0,0 +1,93 @@ +#ifndef QPID_CLUSTER_EXP_QUEUESTATE_H +#define QPID_CLUSTER_EXP_QUEUESTATE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +#include <qpid/RefCounted.h> +#include <qpid/sys/Mutex.h> +#include <boost/intrusive_ptr.hpp> + + +// FIXME aconway 2011-06-08: refactor broker::Cluster to put queue ups on +// class broker::Cluster::Queue. This becomes the cluster context. + +namespace qpid { +namespace broker { +class Queue; +} +namespace cluster { + +class Multicaster; + + /** + * Queue state that is not replicated to the cluster. + * Manages the local queue start/stop status + * + * Thread safe: Called by connection and dispatch threads. + */ +class QueueContext : public RefCounted { + // FIXME aconway 2011-06-07: consistent use of shared vs. intrusive ptr? + public: + QueueContext(broker::Queue& q, Multicaster& m); + + /** Sharing ownership of queue, can acquire up to limit before releasing. + * Called in deliver thread. + */ + void sharedOwner(size_t limit); + + /** Sole owner of queue, no limits to acquiring */ + void soleOwner(); + + /** + * Count an acquired message against the limit. + * Called from connection threads while consuming messages + */ + void acquire(); + + /** Called if the queue becomes empty, from connection thread. */ + void empty(); + + /** Called when queue is stopped, connection or deliver thread. */ + void stopped(); + + /** Called when the last subscription to a queue is cancelled */ + void unsubscribed(); + + /** Get the context for a broker queue. */ + static boost::intrusive_ptr<QueueContext> get(broker::Queue&); + + private: + void release(); + + sys::Mutex lock; + enum { NOT_OWNER, SOLE_OWNER, SHARED_OWNER } owner; + size_t count; // Count of dequeues remaining, 0 means no limit. + broker::Queue& queue; // FIXME aconway 2011-06-08: should be shared/weak ptr? + Multicaster& mcast; + + // FIXME aconway 2011-06-28: need to store acquired messages for possible re-queueing. +}; + +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_EXP_QUEUESTATE_H*/ diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp new file mode 100644 index 0000000000..7d56025fb8 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp @@ -0,0 +1,79 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "QueueHandler.h" +#include "EventHandler.h" +#include "QueueReplica.h" +#include "QueueContext.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/QueuedMessage.h" +#include "qpid/framing/AllInvoker.h" +#include "qpid/Exception.h" +#include "qpid/log/Statement.h" + +namespace qpid { +namespace cluster { + +// FIXME aconway 2011-05-11: make Multicaster+EventHandler available as Group, clean this up? +QueueHandler::QueueHandler(EventHandler& eh, Multicaster& m) + : HandlerBase(eh), multicaster(m) {} + +bool QueueHandler::invoke(const framing::AMQBody& body) { + return framing::invoke(*this, body).wasHandled(); +} + +void QueueHandler::subscribe(const std::string& queue) { + find(queue)->subscribe(sender()); +} +void QueueHandler::unsubscribe(const std::string& queue) { + find(queue)->unsubscribe(sender()); +} +void QueueHandler::resubscribe(const std::string& queue) { + find(queue)->resubscribe(sender()); +} + +void QueueHandler::left(const MemberId& member) { + // Unsubscribe for members that leave. + // FIXME aconway 2011-06-28: also need to re-queue acquired messages. + for (QueueMap::iterator i = queues.begin(); i != queues.end(); ++i) + i->second->unsubscribe(member); +} + +// FIXME aconway 2011-06-08: do we need to hold on to the shared pointer for lifecycle? +void QueueHandler::add(boost::shared_ptr<broker::Queue> q) { + // FIXME aconway 2011-06-08: move create operation from Wiring to Queue handler. + // FIXME aconway 2011-05-10: assert not already in map. + + // Local queues already have a context, remote queues need one. + if (!QueueContext::get(*q)) + new QueueContext(*q, multicaster); // Context attaches itself to the Queue + queues[q->getName()] = boost::intrusive_ptr<QueueReplica>( + new QueueReplica(q, self())); +} + +boost::intrusive_ptr<QueueReplica> QueueHandler::find(const std::string& queue) { + QueueMap::iterator i = queues.find(queue); + if (i == queues.end()) + throw Exception(QPID_MSG("Unknown queue " << queue << " in cluster queue handler")); + return i->second; +} + +}} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h new file mode 100644 index 0000000000..6494efb1b3 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h @@ -0,0 +1,82 @@ +#ifndef QPID_CLUSTER_QUEUEHANDLER_H +#define QPID_CLUSTER_QUEUEHANDLER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "HandlerBase.h" +#include "LockedMap.h" +#include "qpid/framing/AMQP_AllOperations.h" +#include "boost/shared_ptr.hpp" +#include "boost/intrusive_ptr.hpp" +#include <map> + +namespace qpid { + +namespace broker { +class Queue; +class QueuedMessage; +} + +namespace cluster { + +class EventHandler; +class QueueReplica; +class Multicaster; + +/** + * Handler for queue subscription events. + * + * THREAD UNSAFE: only accessed in cluster deliver thread, on delivery + * of queue controls and also from WiringHandler on delivery of queue + * create. + */ +class QueueHandler : public framing::AMQP_AllOperations::ClusterQueueHandler, + public HandlerBase +{ + public: + QueueHandler(EventHandler&, Multicaster&); + + bool invoke(const framing::AMQBody& body); + + // Events + void subscribe(const std::string& queue); + void unsubscribe(const std::string& queue); + void resubscribe(const std::string& queue); + void left(const MemberId&); + + void add(boost::shared_ptr<broker::Queue>); + + // NB: These functions ar called in connection threads, not deliver threads. + void acquired(const broker::QueuedMessage& qm); + void empty(const broker::Queue& q); + + private: + typedef std::map<std::string, boost::intrusive_ptr<QueueReplica> > QueueMap; + + boost::intrusive_ptr<QueueReplica> find(const std::string& queue); + + QueueMap queues; + Multicaster& multicaster; +}; +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_QUEUEHANDLER_H*/ diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp new file mode 100644 index 0000000000..551477a920 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp @@ -0,0 +1,115 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "QueueReplica.h" +#include "QueueContext.h" +#include "qpid/broker/Queue.h" +#include "qpid/log/Statement.h" +#include <algorithm> + +namespace qpid { +namespace cluster { + +QueueReplica::QueueReplica(boost::shared_ptr<broker::Queue> q, + const MemberId& self_) + : queue(q), self(self_), context(QueueContext::get(*q)) +{ + // q is initially stopped. +} + +struct PrintSubscribers { + const QueueReplica::MemberQueue& mq; + PrintSubscribers(const QueueReplica::MemberQueue& m) : mq(m) {} +}; + +std::ostream& operator<<(std::ostream& o, const PrintSubscribers& ps) { + copy(ps.mq.begin(), ps.mq.end(), std::ostream_iterator<MemberId>(o, " ")); + return o; +} + +std::ostream& operator<<(std::ostream& o, QueueReplica::State s) { + static char* tags[] = { "UNSUBSCRIBED", "SUBSCRIBED", "SOLE_OWNER", "SHARED_OWNER" }; + return o << tags[s]; +} + +std::ostream& operator<<(std::ostream& o, const QueueReplica& qr) { + o << qr.queue->getName() << "(" << qr.getState() << "): " + << PrintSubscribers(qr.subscribers); + return o; +} + +// FIXME aconway 2011-05-17: error handling for asserts. + +void QueueReplica::subscribe(const MemberId& member) { + State before = getState(); + subscribers.push_back(member); + update(before); +} + +void QueueReplica::unsubscribe(const MemberId& member) { + State before = getState(); + MemberQueue::iterator i = std::remove(subscribers.begin(), subscribers.end(), member); + if (i != subscribers.end()) { + subscribers.erase(i, subscribers.end()); + update(before); + } +} + +void QueueReplica::resubscribe(const MemberId& member) { + assert (member == subscribers.front()); // FIXME aconway 2011-06-27: error handling + State before = getState(); + subscribers.pop_front(); + subscribers.push_back(member); + update(before); +} + +void QueueReplica::update(State before) { + const int acquireLimit = 10; // FIXME aconway 2011-06-23: configurable + State after = getState(); + if (before == after) return; + QPID_LOG(trace, "QueueReplica " << *this << " (was " << before << ")"); + switch (after) { + case UNSUBSCRIBED: break; + case SUBSCRIBED: break; + case SOLE_OWNER: + context->soleOwner(); + break; + case SHARED_OWNER: + context->sharedOwner(acquireLimit); + break; + } +} + +QueueReplica::State QueueReplica::getState() const { + if (isOwner()) + return (subscribers.size() > 1) ? SHARED_OWNER : SOLE_OWNER; + return (isSubscriber(self)) ? SUBSCRIBED : UNSUBSCRIBED; +} + +bool QueueReplica::isOwner() const { + return !subscribers.empty() && subscribers.front() == self; +} + +bool QueueReplica::isSubscriber(const MemberId& member) const { + // FIXME aconway 2011-06-27: linear search here, is it a performance issue? + return std::find(subscribers.begin(), subscribers.end(), member) != subscribers.end(); +} + +}} // namespace qpid::cluster::exp diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h new file mode 100644 index 0000000000..a322a8b9c0 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h @@ -0,0 +1,85 @@ +#ifndef QPID_CLUSTER_QUEUEMODEL_H +#define QPID_CLUSTER_QUEUEMODEL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/RefCounted.h" +#include "qpid/cluster/types.h" +#include <boost/intrusive_ptr.hpp> +#include <boost/shared_ptr.hpp> +#include <deque> + +namespace qpid { + +namespace broker { +class Queue; +} + +namespace cluster { +class QueueHandler; +class QueueContext; + +/** + * Queue state that is replicated among all cluster members. + * + * Handles queue subscription controls by starting/stopping the queue. + * + * THREAD UNSAFE: only used in cluster deliver thread, on delivery + * of queue controls and also from WiringHandler on delivery of queue + * create. + */ +class QueueReplica : public RefCounted +{ + public: + QueueReplica(boost::shared_ptr<broker::Queue> , const MemberId& ); + void subscribe(const MemberId&); + void unsubscribe(const MemberId&); + void resubscribe(const MemberId&); + + private: + enum State { + UNSUBSCRIBED, + SUBSCRIBED, + SOLE_OWNER, + SHARED_OWNER + }; + + friend class PrintSubscribers; + friend std::ostream& operator<<(std::ostream&, State); + friend std::ostream& operator<<(std::ostream&, const QueueReplica&); + + typedef std::deque<MemberId> MemberQueue; + + boost::shared_ptr<broker::Queue> queue; + MemberQueue subscribers; + MemberId self; + boost::intrusive_ptr<QueueContext> context; + + State getState() const; + bool isOwner() const; + bool isSubscriber(const MemberId&) const; + void update(State before); +}; + +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_QUEUEMODEL_H*/ diff --git a/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp index 04a76b9758..1b3286792f 100644 --- a/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp @@ -22,7 +22,8 @@ #include "Core.h" #include "WiringHandler.h" #include "EventHandler.h" -#include "BrokerHandler.h" +#include "QueueHandler.h" +#include "BrokerContext.h" #include "qpid/broker/Broker.h" #include "qpid/broker/Exchange.h" #include "qpid/broker/Queue.h" @@ -32,18 +33,20 @@ #include "qpid/framing/Buffer.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/log/Statement.h" -#include <boost/shared_ptr.hpp> +#include <boost/intrusive_ptr.hpp> namespace qpid { namespace cluster { using namespace broker; using framing::FieldTable; -WiringHandler::WiringHandler(EventHandler& e) : +WiringHandler::WiringHandler(EventHandler& e, + const boost::intrusive_ptr<QueueHandler>& qh) : HandlerBase(e), broker(e.getCore().getBroker()), recovery(broker.getQueues(), broker.getExchanges(), - broker.getLinks(), broker.getDtxManager()) + broker.getLinks(), broker.getDtxManager()), + queueHandler(qh) {} bool WiringHandler::invoke(const framing::AMQBody& body) { @@ -51,24 +54,39 @@ bool WiringHandler::invoke(const framing::AMQBody& body) { } void WiringHandler::createQueue(const std::string& data) { - if (sender() == self()) return; - BrokerHandler::ScopedSuppressReplication ssr; - framing::Buffer buf(const_cast<char*>(&data[0]), data.size()); - // TODO aconway 2011-02-21: asymetric - RecoveryManager vs Broker::create*() - RecoverableQueue::shared_ptr queue = recovery.recoverQueue(buf); - QPID_LOG(debug, "cluster: create queue " << queue->getName()); + // FIXME aconway 2011-05-25: Needs async completion. + std::string name; + if (sender() != self()) { // Created by another member, need to create locally. + BrokerContext::ScopedSuppressReplication ssr; + framing::Buffer buf(const_cast<char*>(&data[0]), data.size()); + // TODO aconway 2011-02-21: asymetric - RecoveryManager vs Broker::create*() + RecoverableQueue::shared_ptr rq = recovery.recoverQueue(buf); + name = rq->getName(); + } + else { // Created locally, Queue and QueueContext already exist. + framing::Buffer buffer(const_cast<char*>(&data[0]), data.size()); + // FIXME aconway 2011-05-10: implicit knowledge of queue encoding. + buffer.getShortString(name); + } + boost::shared_ptr<broker::Queue> q = broker.getQueues().find(name); + assert(q); // FIXME aconway 2011-05-10: error handling. + // TODO aconway 2011-05-10: if we implement multi-group for queues then + // this call is a problem: comes from wiring delivery thread, not queues. + // FIXME aconway 2011-06-08: move wiring ops to Queue and Exchange handlers.. + queueHandler->add(q); + QPID_LOG(debug, "cluster: create queue " << q->getName()); } void WiringHandler::destroyQueue(const std::string& name) { if (sender() == self()) return; QPID_LOG(debug, "cluster: destroy queue " << name); - BrokerHandler::ScopedSuppressReplication ssr; + BrokerContext::ScopedSuppressReplication ssr; broker.deleteQueue(name, std::string(), std::string()); } void WiringHandler::createExchange(const std::string& data) { if (sender() == self()) return; - BrokerHandler::ScopedSuppressReplication ssr; + BrokerContext::ScopedSuppressReplication ssr; framing::Buffer buf(const_cast<char*>(&data[0]), data.size()); // TODO aconway 2011-02-21: asymetric - RecoveryManager vs Broker::create*() RecoverableExchange::shared_ptr exchange = recovery.recoverExchange(buf); @@ -78,7 +96,7 @@ void WiringHandler::createExchange(const std::string& data) { void WiringHandler::destroyExchange(const std::string& name) { if (sender() == self()) return; QPID_LOG(debug, "cluster: destroy exchange " << name); - BrokerHandler::ScopedSuppressReplication ssr; + BrokerContext::ScopedSuppressReplication ssr; broker.getExchanges().destroy(name); } @@ -91,7 +109,7 @@ void WiringHandler::bind( << " exchange=" << exchangeName << " key=" << routingKey << " arguments=" << arguments); - BrokerHandler::ScopedSuppressReplication ssr; + BrokerContext::ScopedSuppressReplication ssr; broker.bind(queueName, exchangeName, routingKey, arguments, std::string(), std::string()); } @@ -104,7 +122,7 @@ void WiringHandler::unbind( << " exchange=" << exchangeName << " key=" << routingKey << " arguments=" << arguments); - BrokerHandler::ScopedSuppressReplication ssr; + BrokerContext::ScopedSuppressReplication ssr; broker.unbind(queueName, exchangeName, routingKey, std::string(), std::string()); } diff --git a/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h index e375cf6a95..71aa6e52e9 100644 --- a/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h +++ b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h @@ -42,7 +42,7 @@ class Broker; namespace cluster { class EventHandler; - +class QueueHandler; /** * Handler for wiring disposition events. @@ -51,7 +51,7 @@ class WiringHandler : public framing::AMQP_AllOperations::ClusterWiringHandler, public HandlerBase { public: - WiringHandler(EventHandler&); + WiringHandler(EventHandler&, const boost::intrusive_ptr<QueueHandler>& qh); bool invoke(const framing::AMQBody& body); @@ -66,8 +66,10 @@ class WiringHandler : public framing::AMQP_AllOperations::ClusterWiringHandler, private: + broker::Broker& broker; broker::RecoveryManagerImpl recovery; + boost::intrusive_ptr<QueueHandler> queueHandler; }; }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/exp/overview.h b/qpid/cpp/src/qpid/cluster/exp/overview.h new file mode 100644 index 0000000000..3a0189d750 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/exp/overview.h @@ -0,0 +1,13 @@ +// This file is documentation in doxygen format. +/** + +<h1>New cluster implementation overview</h> + +There are 3 areas indicated by a suffix on class names: + +- Replica: State that is replicated to the entire cluster. Only called by Handlers in the deliver thread. +- Context: State that is private to this member. Called by both Replia and broker objects in deliver and connection threads. +- Handler: Dispatch CPG messages by calling Replica objects in the deliver thread. + + +**/ diff --git a/qpid/cpp/src/qpid/sys/Stoppable.h b/qpid/cpp/src/qpid/sys/Stoppable.h index af21af46ba..6bb02bc6af 100644 --- a/qpid/cpp/src/qpid/sys/Stoppable.h +++ b/qpid/cpp/src/qpid/sys/Stoppable.h @@ -21,17 +21,27 @@ * under the License. * */ + +#include <boost/function.hpp> + namespace qpid { namespace sys { +// FIXME aconway 2011-05-25: needs better name + /** * An activity that may be executed by multiple threads, and can be stopped. - * Stopping prevents new threads from entering and waits till exiting busy threads leave. + * + * Stopping prevents new threads from entering and calls a callback + * when all busy threads leave. */ class Stoppable { public: - Stoppable() : busy(0), stopped(false) {} - ~Stoppable() { stop(); } + /** + *@param stoppedCallback: called when all threads have stopped. + */ + Stoppable(boost::function<void()> stoppedCallback) + : busy(0), stopped(false), notify(stoppedCallback) {} /** Mark the scope of a busy thread like this: * <pre> @@ -52,38 +62,49 @@ class Stoppable { friend class Scope; - /** Mark stopped, wait for all threads to leave their busy scope. */ + /** + * Set state to "stopped", so no new threads can enter. + * Call notify function when all busy threads have left. + */ + // FIXME aconway 2011-06-27: not guaranteed that stopped will be called, + // deadlock? void stop() { sys::Monitor::ScopedLock l(lock); stopped = true; - while (busy > 0) lock.wait(); + check(); } - /** Set the state to started. - *@pre state is stopped and no theads are busy. + /** Set the state to "started", allow threads to enter. */ void start() { sys::Monitor::ScopedLock l(lock); - assert(stopped && busy == 0); // FIXME aconway 2011-05-06: error handling. stopped = false; } - private: - uint busy; - bool stopped; - sys::Monitor lock; - + // Busy thread enters scope bool enter() { sys::Monitor::ScopedLock l(lock); if (!stopped) ++busy; return !stopped; } + // Busy thread exits scope void exit() { sys::Monitor::ScopedLock l(lock); assert(busy > 0); - if (--busy == 0) lock.notifyAll(); + --busy; + check(); + } + + private: + void check() { + if (stopped && busy == 0 && notify) notify(); } + + uint busy; + bool stopped; + sys::Monitor lock; + boost::function< void() > notify; }; }} // namespace qpid::sys diff --git a/qpid/cpp/src/tests/BrokerClusterCalls.cpp b/qpid/cpp/src/tests/BrokerClusterCalls.cpp index 53d0f2102a..4311cf51cf 100644 --- a/qpid/cpp/src/tests/BrokerClusterCalls.cpp +++ b/qpid/cpp/src/tests/BrokerClusterCalls.cpp @@ -56,13 +56,18 @@ class DummyCluster : public broker::Cluster */ bool isRouting; + // Record a QueuedMessage void recordQm(const string& op, const broker::QueuedMessage& qm) { history += (format("%s(%s, %d, %s)") % op % qm.queue->getName() % qm.position % qm.payload->getFrames().getContent()).str(); } + + // Record a message void recordMsg(const string& op, broker::Queue& q, intrusive_ptr<broker::Message> msg) { history += (format("%s(%s, %s)") % op % q.getName() % msg->getFrames().getContent()).str(); } + + // Record a string void recordStr(const string& op, const string& name) { history += (format("%s(%s)") % op % name).str(); } @@ -102,6 +107,11 @@ class DummyCluster : public broker::Cluster history += (format("cancel(%s, %d)") % q.getName() % n).str(); } + // Queues + // FIXME aconway 2011-05-18: update test to exercise empty() + virtual void empty(broker::Queue& q) { recordStr("empty", q.getName()); } + virtual void stopped(broker::Queue& q) { recordStr("stopped", q.getName()); } + // Wiring virtual void create(broker::Queue& q) { recordStr("createq", q.getName()); } @@ -230,7 +240,7 @@ QPID_AUTO_TEST_CASE(testReleaseReject) { h.clear(); i = 0; m = Message("t"); - m.setTtl(Duration(1)); // Timeout 1ms + m.setTtl(Duration(1)); // Timeout 1ms sender.send(m); usleep(2000); // Sleep 2ms bool received = receiver.fetch(m, Duration::IMMEDIATE); @@ -239,6 +249,10 @@ QPID_AUTO_TEST_CASE(testReleaseReject) { BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, t)"); BOOST_CHECK_EQUAL(h.at(i++), "routed(t)"); BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 2, t)"); + // Note: empty is called once for each receiver. + BOOST_CHECK_EQUAL(h.at(i++), "empty(q)"); + BOOST_CHECK_EQUAL(h.at(i++), "empty(q)"); + BOOST_CHECK_EQUAL(h.at(i++), "empty(q)"); BOOST_CHECK_EQUAL(h.size(), i); // Message replaced on LVQ diff --git a/qpid/cpp/src/tests/cluster2_tests.py b/qpid/cpp/src/tests/cluster2_tests.py index f17dfe2961..1cf749cdb4 100755 --- a/qpid/cpp/src/tests/cluster2_tests.py +++ b/qpid/cpp/src/tests/cluster2_tests.py @@ -33,8 +33,27 @@ log = getLogger("qpid.cluster_tests") class Cluster2Tests(BrokerTest): """Tests for new cluster code.""" - def verify_content(self, content, receiver): - for c in content: self.assertEqual(c, receiver.fetch(1).content) + def queue_exists(self, queue, connection): + s = connection.session() + try: + s.sender(queue) + return True + except qpid.messaging.exceptions.NotFound: + return False + + # FIXME aconway 2011-06-22: needed to compensate for + # async wiring in early cluster2 prototype + def wait_for_queue(self, queue, connections, timeout=10): + deadline = time.time() + timeout + for c in connections: + while not self.queue_exists(queue,c): + if time.time() > timeout: fail("Time out in wait_for_queue(%s))"%queue) + time.sleep(0.01) + + # FIXME aconway 2011-05-17: remove, use assert_browse. + def verify_content(self, expect, receiver): + actual = [receiver.fetch(1).content for x in expect] + self.assertEqual(expect, actual) self.assertRaises(Empty, receiver.fetch, 0) def test_message_enqueue(self): @@ -74,12 +93,15 @@ class Cluster2Tests(BrokerTest): s0 = sn0.sender("q;{create:always,delete:always}") r0 = sn0.receiver("q") sn1 = cluster[1].connect().session() - r1 = sn1.receiver("q;{create:always}") # Not yet replicating wiring. + r1 = sn1.receiver("q;{create:always}") content = ["a","b","c"] for m in content: s0.send(Message(m)) - # Verify enqueued on cluster[1] + # Verify enqueued on members 0 and 1 + # FIXME aconway 2011-05-13: + self.verify_content(content, sn0.receiver("q;{mode:browse}")) self.verify_content(content, sn1.receiver("q;{mode:browse}")) + # Dequeue on cluster[0] self.assertEqual(r0.fetch(1).content, "a") sn0.acknowledge(sync=True) @@ -114,3 +136,40 @@ class Cluster2Tests(BrokerTest): self.assertRaises(NotFound, cluster[1].connect().session().receiver, "ex") # FIXME aconway 2010-10-29: test unbind, may need to use old API. + + def test_dequeue_mutex(self): + """Ensure that one and only one consumer receives each dequeued message.""" + class Receiver(Thread): + def __init__(self, session): + self.session = session + self.receiver = session.receiver("q") + self.messages = [] + Thread.__init__(self) + + def run(self): + try: + while True: + self.messages.append(self.receiver.fetch(1)) + self.session.acknowledge() + except Empty: pass + + cluster = self.cluster(3, cluster2=True, args=["-t"]) # FIXME aconway 2011-05-13: -t + connections = [ b.connect() for b in cluster] + sessions = [ c.session() for c in connections ] + sender = sessions[0].sender("q;{create:always}") + self.wait_for_queue("q", connections) + + receivers = [ Receiver(s) for s in sessions ] + for r in receivers: r.start() + + n = 0 + t = time.time() + 1 # Send for 1 second. + while time.time() < t: + sender.send(str(n)) + n += 1 + for r in receivers: r.join(); + print "FIXME", [len(r.messages) for r in receivers] # FIXME aconway 2011-05-17: + for r in receivers: assert len(r.messages) # At least one message to each + messages = [int(m.content) for r in receivers for m in r.messages ] + messages.sort() + self.assertEqual(range(n), messages) diff --git a/qpid/cpp/src/tests/qpid-test-cluster b/qpid/cpp/src/tests/qpid-test-cluster index 9887406ef9..7522a7fdfd 100755 --- a/qpid/cpp/src/tests/qpid-test-cluster +++ b/qpid/cpp/src/tests/qpid-test-cluster @@ -28,7 +28,7 @@ Options: Default is $DEFAULT_ENV. -c CONFIG Use CONFIG as qpidd config file. Copies CONFIG to each host. Default is $DEFAULT_CONF - -d Delete data-dir and log file before starting broker. + -d Delete data-dir and log file before starting broker. " exit 1 } @@ -82,6 +82,7 @@ do_start() { } do_stop() { + for h in $HOSTS; do ssh $h "$SOURCE_ENV qpidd -q --no-module-dir --no-data-dir $QPIDD_ARGS" done diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml index e0cd647894..aac764ee62 100644 --- a/qpid/cpp/xml/cluster.xml +++ b/qpid/cpp/xml/cluster.xml @@ -8,9 +8,9 @@ - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at -- +- - http://www.apache.org/licenses/LICENSE-2.0 -- +- - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -81,7 +81,7 @@ <control name="message-expired" code="0x12"> <field name="id" type="uint64"/> </control> - + <domain name="error-type" type="uint8" label="Types of error"> <enum> <choice name="none" value="0"/> @@ -89,7 +89,7 @@ <choice name="connection" value="2"/> </enum> </domain> - + <!-- Check for error consistency across the cluster --> <control name="error-check" code="0x14"> <field name="type" type="error-type"/> @@ -149,7 +149,7 @@ <!-- Abort a connection that is sending invalid data. --> <control name="abort" code="0x4"/> - + <!-- Update controls. Sent to a new broker in joining mode. A connection is updated as followed: - send the shadow's management ID in shadow-perpare on the update connection @@ -192,9 +192,9 @@ <field name="enqueued" type="bit"/> <field name="credit" type="uint32"/> </control> - + <!-- Tx transaction state. --> - <control name="tx-start" code="0x12"/> + <control name="tx-start" code="0x12"/> <control name="tx-accept" code="0x13"> <field name="commands" type="sequence-set"/> </control> <control name="tx-dequeue" code="0x14"> <field name="queue" type="str8"/> </control> <control name="tx-enqueue" code="0x15"> <field name="queue" type="str8"/> </control> @@ -204,7 +204,7 @@ </control> <control name="tx-end" code="0x17"/> <control name="accumulated-ack" code="0x18"> <field name="commands" type="sequence-set"/> </control> - + <!-- Consumers in the connection's output task --> <control name="output-task" code="0x19"> <field name="channel" type="uint16"/> @@ -294,6 +294,7 @@ <field name="message" type="str32"/> </control> + <!-- FIXME aconway 2011-04-27: reference queues by index, not name --> <control name="enqueue" code="0x2"> <field name="routing-id" type="uint32"/> <field name="queue" type="queue.name"/> @@ -303,10 +304,22 @@ <field name="routing-id" type="uint32"/> </control> - <control name="dequeue" code="0x4"> + <!-- FIXME aconway 2011-04-27: review queue positions vs. global message IDs --> + <control name="acquire" code="0x4"> <field name="queue" type="queue.name"/> <field name="position" type="uint32"/> </control> + + <control name="dequeue" code="0x5"> + <field name="queue" type="queue.name"/> + <field name="position" type="uint32"/> + </control> + + <control name="release" code="0x6"> + <field name="queue" type="queue.name"/> + <field name="position" type="uint32"/> + </control> + </class> <class name="cluster-wiring" code="0x83"> @@ -341,4 +354,26 @@ </control> </class> + + <!-- Manage subscriptions to a queue. + + Each queue has a "subscriber queue" of members waiting take + messages from the queue. The member at the front of the queue + is the only one allowed to take messages. --> + + <class name="cluster-queue" code="0x84"> + <!-- Join at the back of the subscriber queue --> + <control name="subscribe" code="0x1"> + <field name="queue" type="queue.name"/> + </control> + <!-- Leave the subscriber queue --> + <control name="unsubscribe" code="0x2"> + <field name="queue" type="queue.name"/> + </control> + <!-- Move the member at the front to the back. --> + <control name="resubscribe" code="0x3"> + <field name="queue" type="queue.name"/> + </control> + </class> + </amqp> |