From adcaecbdb26674008dab4df11b15db5032115ce1 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Mon, 31 Oct 2011 12:00:15 +0000 Subject: QPID-2920: Updated cluster/exp/overview.h & other comments. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920-active@1195424 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/cluster/exp/Core.h | 2 +- qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp | 4 +- qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp | 4 +- qpid/cpp/src/qpid/cluster/exp/QueueContext.h | 4 +- qpid/cpp/src/qpid/cluster/exp/QueueHandler.h | 2 +- qpid/cpp/src/qpid/cluster/exp/overview.h | 65 +++++++++++++++++++++--- 6 files changed, 66 insertions(+), 15 deletions(-) diff --git a/qpid/cpp/src/qpid/cluster/exp/Core.h b/qpid/cpp/src/qpid/cluster/exp/Core.h index d23ed2e4e8..c630b4b3f5 100644 --- a/qpid/cpp/src/qpid/cluster/exp/Core.h +++ b/qpid/cpp/src/qpid/cluster/exp/Core.h @@ -55,7 +55,7 @@ class BrokerContext; * Holds together the various objects that implement cluster behavior, * and holds state that is shared by multiple components. * - * Thread safe: called from broker connection threads and CPG dispatch threads. + * Thread safe: called from broker broker threads and CPG dispatch threads. */ class Core { diff --git a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp index bef8cb74ed..21129b0fae 100644 --- a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp @@ -100,7 +100,7 @@ void MessageHandler::acquire(const std::string& q, uint32_t position) { QPID_LOG(trace, "cluster: message " << q << "[" << position << "] acquired by " << PrettyId(sender(), self())); // Note acquires from other members. My own acquires were executed in - // the connection thread + // the broker thread if (sender() != self()) { boost::shared_ptr queue = findQueue(q, "cluster: acquire"); QueuedMessage qm; @@ -126,7 +126,7 @@ void MessageHandler::dequeue(const std::string& q, uint32_t position) { // complete the ack that initiated the dequeue at this point, see // BrokerContext::dequeue - // My own dequeues were processed in the connection thread before multicasting. + // My own dequeues were processed in the broker thread before multicasting. if (sender() != self()) { boost::shared_ptr queue = findQueue(q, "cluster: dequeue"); QueuedMessage qm = QueueContext::get(*queue)->dequeue(position); diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp index b8bdb2aa07..56800e6b95 100644 --- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp @@ -80,7 +80,7 @@ void QueueContext::replicaState( // FIXME aconway 2011-07-27: Dont spin the token on an empty queue. -// Called in connection threads when a consumer is added +// Called in broker threads when a consumer is added void QueueContext::consume(size_t n) { sys::Mutex::ScopedLock l(lock); consumers = n; @@ -88,7 +88,7 @@ void QueueContext::consume(size_t n) { framing::ClusterQueueSubscribeBody(framing::ProtocolVersion(), queue.getName())); } -// Called in connection threads when a consumer is cancelled +// Called in broker threads when a consumer is cancelled void QueueContext::cancel(size_t n) { sys::Mutex::ScopedLock l(lock); consumers = n; diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.h b/qpid/cpp/src/qpid/cluster/exp/QueueContext.h index a37225e64e..5f2adeae74 100644 --- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.h +++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.h @@ -63,12 +63,12 @@ class QueueContext : public RefCounted { */ void stopped(); - /** Called in connection thread when a consumer is added. + /** Called in broker thread when a consumer is added. *@param n: nubmer of consumers after new one is added. */ void consume(size_t n); - /** Called in connection thread when a consumer is cancelled on the queue. + /** Called in broker thread when a consumer is cancelled on the queue. *@param n: nubmer of consumers after the cancel. */ void cancel(size_t n); diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h index ecb9ba7821..053127e428 100644 --- a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h +++ b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h @@ -66,7 +66,7 @@ class QueueHandler : public framing::AMQP_AllOperations::ClusterQueueHandler, void add(boost::shared_ptr); - // NB: These functions ar called in connection threads, not deliver threads. + // NB: These functions ar called in broker threads, not deliver threads. void acquired(const broker::QueuedMessage& qm); void empty(const broker::Queue& q); diff --git a/qpid/cpp/src/qpid/cluster/exp/overview.h b/qpid/cpp/src/qpid/cluster/exp/overview.h index 5db2a35184..7172d602fd 100644 --- a/qpid/cpp/src/qpid/cluster/exp/overview.h +++ b/qpid/cpp/src/qpid/cluster/exp/overview.h @@ -1,15 +1,66 @@ // This file is documentation in doxygen format. /** -

New cluster implementation overview +

New Cluster Implementation Overview -Naming conventions: There are 3 types of classes indicated by a suffix on class names: +

Terms and Naming Conventions

-- *Handler: Dispatch CPG messages by calling Replica objects in the deliver thread. +Deliver thread: Thread that is dispatching delivered CPG events. -- *Replica: State that is replicated to the entire cluster. - Only called by Handlers in the deliver thread. May call on Contexts. +Broker thread: Thread that is serving a Broker connection. + +Local/remote: For a given action the "local" broker is the one +directly connected to the client in question. The others are "remote". + +Contended: a queue is contended if it has subscribes on two different +brokers. It is uncontended if all subscribers are on the same broker. + +There are 3 types of classes indicated by a suffix on class names: + +- *Handler: Implements a class from cluster.xml. Called only in the + CPG deliver thread to dispatch an XML controls. Calls on *Replica + or *Context objects. + +- *Replica: Holds state that is replicated to the entire cluster. + Called only *Handler in the CPG deliver thread. May call on *Context + objects. + +- *Context: Holds state private to this member and associated with a + local object such as the Broker or a Queue. Can be called in CPG + deliver and broker threads. + +f

Overview of Message Lifecycle

+ +BrokerContext implements the broker::Cluster interface, it is the +point of contact with the Broker. When a message is delivered locally +the broker calls BrokerContext::enqueue. This multicasts the message +and delays delivery on the local broker by returning false. + +On self-delivery the local broker does the enqueue. This is +synchronized with delivery on the other brokers so that all message +have the same sequence numbers on their queues. We use queue+sequence +no. to identify messages in the cluster. + +QueueReplica and QueueContext track the subscriptions for a queue. An +uncontended queue can serve local consumers using the standard broker +scheme. A queue can be "stopped" meaning no consumers can consume from +it, or "active" meaning consumers can consume as on a stand-alone +broker. + +An uncontended queue is active. A contended queue is active on only +one broker which is said to hold the "lock" for that queue. It is +stopped on all others. The lock is passed on a timeout. + +A broker with an active queue multicasts acquire and dequeue events +for that queue so other brokers can stay in sync. In the case of a +contended queue the broker sends acquire/dequeue events for its +activity before passing the lock. + +We use multipe CPG groups to make better use of CPUs. Events about a +given queue always uses the same CPG group, different queues may use +different groups. There is a fixed set of groups, queue names are +hashed to pick the gruop for each queue. + +// FIXME aconway 2011-10-31: other schemes than hashing? -- *Context: State private to this member and associated with a local entity - such as the Broker or a Queue. Called in deliver and connection threads. **/ -- cgit v1.2.1