Diffstat (limited to 'qpid/cpp/design_docs/new-cluster-plan.txt')
-rw-r--r--  qpid/cpp/design_docs/new-cluster-plan.txt  |  477
1 file changed, 477 insertions, 0 deletions
diff --git a/qpid/cpp/design_docs/new-cluster-plan.txt b/qpid/cpp/design_docs/new-cluster-plan.txt
new file mode 100644
index 0000000000..781876e55a
--- /dev/null
+++ b/qpid/cpp/design_docs/new-cluster-plan.txt
@@ -0,0 +1,477 @@
+-*-org-*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+Notes on new cluster implementation. See also: new-cluster-design.txt
+
+* Implementation plan.
+
+Co-existence with old cluster code and tests:
+- Separate plugin cluster2, options --cluster2-*. Eventually renamed to replace cluster.
+- Double up tests with old version/new version as the new code develops.
+
+Minimal POC for message delivery & perf test.
+- no wiring replication, no updates, no failover, no persistence, no async completion.
+- just implement publish and acquire/dequeue locking protocol.
+- optimize the special case where all consumers are on the same node.
+- measure performance: compare active-passive and active-active modes of use.
+
+Full implementation of transient cluster
+- Update (based on existing update), async completion etc.
+- Passing all existing transient cluster tests.
+
+Persistent cluster
+- Make sure async completion works correctly.
+- InitialStatus protocol etc. to support persistent start-up (existing code)
+- cluster restart from store: stores not identical. Load one, update the rest.
+ - assign cluster IDs to messages recovered from store, don't replicate.
+
+Improved update protocol
+- per-queue, less stalling, bounded catch-up.
+
+* Task list
+
+** TODO [#A] Minimal POC: publish/acquire/dequeue protocol.
+
+NOTE: as implementation questions arise, take the easiest option and make
+a note for later optimization/improvement.
+
+*** Tests
+- python test: 4 senders, numbered messages, 4 receivers, verify message set.
+- acquire then release messages: verify they can be dequeued on any member.
+- acquire then kill broker: verify messages can be dequeued on other members.
+- acquire then reject: verify the message goes to the alternate exchange once only.
+
+*** DONE broker::Cluster interface and call points.
+
+Initial interface committed.
+
+*** Main classes
+
+BrokerHandler:
+- implements broker::Cluster intercept points.
+- sends mcast events to inform cluster of local actions.
+- thread safe, called in connection threads.
+
+LocalMessageMap:
+- Holds local messages while they are being enqueued.
+- thread safe: called by both BrokerHandler and MessageHandler
+
+MessageHandler:
+- handles delivered mcast messages related to messages.
+- initiates local actions in response to mcast events.
+- thread unsafe, only called in deliver thread.
+- maintains view of cluster state regarding messages.
+
+QueueOwnerHandler:
+- handles delivered mcast messages related to queue consumer ownership.
+- thread safe, called in deliver, connection and timer threads.
+- maintains view of cluster state regarding queue ownership.
+
+cluster::Core: class to hold new cluster together (replaces cluster::Cluster)
+- thread safe: manage state used by both MessageHandler and BrokerHandler
+
+The following code sketch illustrates only the "happy path"; error handling
+is omitted.
+
+*** BrokerHandler
+Types:
+- struct QueuedMessage { Message msg; QueueName q; SequenceNumber position; }
+- struct
+
+NOTE:
+- Messages on queues are identified by a queue name + a position.
+- Messages being routed are identified by a sequence number.
+
+Members:
+- thread_local bool noReplicate // suppress replication.
+- thread_local bool isRouting // suppress operations while routing
+- Message localMessage[SequenceNumber] // local messages being routed.
+- thread_local SequenceNumber routingSequence
+
+NOTE: localMessage is also modified by MessageHandler.
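+
+A rough C++ equivalent of the members above (a sketch; "thread_local" is
+whatever TSS mechanism the broker already uses, e.g. __thread or
+boost::thread_specific_ptr):
+
+class BrokerHandler : public broker::Cluster {
+    // Per-thread flags, set in connection threads.
+    static /*thread_local*/ bool noReplicate;      // suppress replication
+    static /*thread_local*/ bool isRouting;        // suppress ops while routing
+    static /*thread_local*/ SequenceNumber routingSequence;
+
+    // Local messages being routed, keyed by routing sequence.
+    // Also modified by MessageHandler, so guarded by Core's lock.
+    std::map<SequenceNumber, Message> localMessage;
+};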
+
+broker::Cluster intercept functions:
+
+routing(msg)
+ if noReplicate: return
+ # Suppress everything except enqueues while we are routing.
+ # We don't want to replicate acquires & dequeues caused by an enqueue,
+ # e.g. removal of messages from ring/LV queues.
+ isRouting = true
+
+enqueue(qmsg):
+ if noReplicate: return
+ if routingSequence == 0 # thread local
+ routingSequence = nextRoutingSequence()
+ mcast create(encode(qmsg.msg), routingSequence)
+ mcast enqueue(qmsg.q, routingSequence)
+
+routed(msg):
+ if noReplicate: return
+ isRouting = false
+
+acquire(qmsg):
+ if noReplicate: return
+ if isRouting: return # Ignore while we are routing a message.
+ if msg.id: mcast acquire(qmsg)
+
+release(QueuedMessage)
+ if noReplicate: return
+ if isRouting: return # Ignore while we are routing a message.
+ mcast release(qmsg)
+
+accept(QueuedMessage):
+ if noReplicate: return
+ if isRouting: return # Ignore while we are routing a message.
+ mcast accept(qmsg)
+
+reject(QueuedMessage):
+ isRejecting = true
+ mcast reject(qmsg)
+
+# FIXME no longer needed?
+drop(QueuedMessage)
+ cleanup(qmsg)
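+
+For example, the enqueue() intercept in C++ (a sketch; mcastCreate,
+mcastEnqueue and core.nextRoutingSequence() are hypothetical helpers
+wrapping the existing Multicaster):
+
+void BrokerHandler::enqueue(QueuedMessage& qmsg) {
+    if (noReplicate) return;
+    if (routingSequence == 0)                 // first enqueue of this routing
+        routingSequence = core.nextRoutingSequence();
+    // Replicate the message body once, then the enqueue on this queue.
+    mcastCreate(encode(qmsg.msg), routingSequence);
+    mcastEnqueue(qmsg.q, routingSequence);
+}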
+
+*** MessageHandler and mcast messages
+Types:
+- struct QueueEntry { QueuedMessage qmsg; NodeId acquired; }
+- struct QueueKey { MessageId id; QueueName q; }
+- typedef map<QueueKey, QueueEntry> Queue
+- struct Node { Message routing[SequenceNumber]; list<QueueKey> acquired; }
+
+Members:
+- QueueEntry enqueued[QueueKey]
+- Node node[NodeId]
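+
+The same structures in C++ (plain std containers; MessageHandler is only
+called in the deliver thread, so no locking is needed here):
+
+typedef std::pair<NodeId, SequenceNumber> MessageId;
+struct QueueKey { MessageId id; QueueName q; };      // needs operator< for map
+struct QueueEntry { QueuedMessage qmsg; NodeId acquiredBy; };
+struct Node {
+    std::map<SequenceNumber, Message> routing;       // messages being routed
+    std::list<QueueKey> acquired;                    // entries this node holds
+};
+std::map<QueueKey, QueueEntry> enqueued;
+std::map<NodeId, Node> node;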
+
+Mcast messages in Message class:
+
+create(msg,seq)
+ if sender != self: node[sender].routing[seq] = decode(msg)
+
+enqueue(q,seq):
+ id = (sender,seq)
+ if sender == self:
+   enqueued[id,q] = (localMessage[seq], acquired=None)
+ else:
+   msg = sender.routing[seq]
+   with noReplicate=true: qmsg = broker.getQueue(q).push(msg)
+   enqueued[id,q] = (qmsg, acquired=None)
+
+routed(seq):
+ if sender == self: localMessage.erase(seq)
+ else: sender.routing.erase(seq)
+
+acquire(id,q):
+ enqueued[id,q].acquired = sender
+ node[sender].acquired.push_back((id,q))
+ if sender != self:
+ with noReplicate=true: broker.getQueue(q).acquire(enqueued[id,q])
+
+release(id,q):
+ enqueued[id,q].acquired = None
+ node[sender].acquired.erase((id,q))
+ if sender != self:
+   with noReplicate=true: broker.getQueue(q).requeue(enqueued[id,q])
+
+reject(id,q):
+ sender.routing[id] = enqueued[id,q] # prepare for re-queueing
+
+rejected(id,q):
+ sender.routing.erase(id)
+
+dequeue(id,q):
+ entry = enqueued[id,q]
+ enqueued.erase(id,q)
+ node[entry.acquired].acquired.erase(id,q)
+ if sender != self:
+ with noReplicate=true: broker.getQueue(q).dequeue(entry.qmsg)
+
+member m leaves cluster:
+ for key in node[m].acquired:
+ release(key.id, key.q)
+ node.erase(m)
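+
+The "with noReplicate=true:" steps above could be a small RAII guard around
+the calls back into the broker (a sketch; the flag is BrokerHandler's
+thread-local noReplicate):
+
+struct NoReplicateScope {
+    bool saved;
+    NoReplicateScope() : saved(BrokerHandler::noReplicate) {
+        BrokerHandler::noReplicate = true;     // suppress the intercepts
+    }
+    ~NoReplicateScope() { BrokerHandler::noReplicate = saved; }
+};
+
+// e.g. in the acquire handler when sender != self:
+//   NoReplicateScope guard;
+//   broker.getQueue(q).acquire(enqueued[id,q]);   // does not mcast again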
+
+*** Queue consumer locking
+
+When a queue is locked it does not deliver messages to its consumers.
+
+New broker::Queue functions:
+- stopConsumers(): set consumersStopped flag, wait for currently busy consumers to exit.
+- startConsumers(): reset consumersStopped flag
+
+Implementation sketch, locking omitted:
+
+void Queue::stopConsumers() {
+ consumersStopped = true;
+ while (consumersBusy) consumersBusyMonitor.wait();
+}
+
+void Queue::startConsumers() {
+ consumersStopped = false;
+ listeners.notify();
+}
+
+bool Queue::dispatch(consumer) {
+    if (consumersStopped) return false;
+    ++consumersBusy;
+    bool dispatched = do_regular_dispatch_body();
+    if (--consumersBusy == 0) consumersBusyMonitor.notify();
+    return dispatched;
+}
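+
+With the locking filled in (a sketch, assuming consumersBusyMonitor is a
+sys::Monitor):
+
+void Queue::stopConsumers() {
+    sys::Monitor::ScopedLock l(consumersBusyMonitor);
+    consumersStopped = true;
+    while (consumersBusy) consumersBusyMonitor.wait();
+}
+
+void Queue::startConsumers() {
+    {
+        sys::Monitor::ScopedLock l(consumersBusyMonitor);
+        consumersStopped = false;
+    }
+    listeners.notify();    // wake consumers waiting for messages
+}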
+
+*** QueueOwnerHandler
+
+Invariants:
+- Each queue is owned by at most one node at any time.
+- Each node is interested in a set of queues at any given time.
+- A queue is un-owned if no node is interested.
+
+The queue owner releases the queue when
+- it loses interest i.e. queue has no consumers with credit.
+- a configured time delay expires and there are other interested nodes.
+
+The owner mcasts release(q). On delivery, the new queue owner is the next
+interested node after the old owner in node-id order (treating the nodes as
+a circular list).
+
+Queue consumers are initially stopped, and only started when we get
+ownership from the cluster.
+
+Thread safety: called by deliver, connection and timer threads, needs locking.
+
+Thread safe object per queue holding queue ownership status.
+Called by deliver, connection and timer threads.
+
+class QueueOwnership {
+ bool owned;
+ Timer timer;
+ BrokerQueue q;
+
+ drop(): # locked
+ if owned:
+ owned = false
+ q.stopConsumers()
+ mcast release(q.name, false)
+ timer.stop()
+
+ take(): # locked
+ if not owned:
+ owned = true
+ q.startConsumers()
+ timer.start(timeout)
+
+ timer.fire(): drop()
+}
+
+Data Members, only modified/examined in deliver thread:
+- typedef set<NodeId> ConsumerSet
+- map<QueueName, ConsumerSet> consumers
+- map<QueueName, NodeId> owner
+
+Thread safe data members, accessed in connection threads (via BrokerHandler):
+- map<QueueName, QueueOwnership> ownership
+
+Multicast messages in QueueOwner class:
+
+consume(q):
+ if sender==self and consumers[q].empty(): ownership[q].take()
+ consumers[q].insert(sender)
+
+release(q):
+ assert(owner[q] == sender and owner[q] in consumers[q])
+ owner[q] = circular search from sender in consumers[q] (see sketch below)
+ if owner[q] == self: ownership[q].take()
+
+cancel(q):
+ assert(owner[q] != sender) # sender must release() before cancel()
+ consumers[q].erase(sender)
+
+member-leaves:
+ for q in owner: if owner[q] == left: release(q) as if sent by the leaving member
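+
+A sketch of the "circular search" used by release() above: take the next
+interested node after the old owner in NodeId order, wrapping around.
+
+NodeId nextOwner(const std::set<NodeId>& interested, const NodeId& oldOwner) {
+    assert(!interested.empty());
+    // First interested node with id > oldOwner; wrap to the smallest id.
+    std::set<NodeId>::const_iterator i = interested.upper_bound(oldOwner);
+    return (i == interested.end()) ? *interested.begin() : *i;
+}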
+
+Need 2 more intercept points in broker::Cluster:
+
+consume(q,consumer,consumerCount) - Queue::consume()
+ if consumerCount == 1: mcast consume(q)
+
+cancel(q,consumer,consumerCount) - Queue::cancel()
+ if consumerCount == 0:
+ ownership[q].drop()
+ mcast cancel(q)
+
+#TODO: lifecycle, updating cluster data structures when queues are destroyed
+
+*** Increasing concurrency
+The major performance limitation of the old cluster is that it does
+everything in the single CPG deliver thread context.
+
+We can get additional concurrency by creating a thread context _per queue_
+for queue operations: enqueue, acquire, accept etc.
+
+We associate a PollableQueue of queue operations with each AMQP queue.
+The CPG deliver thread would
+- build messages and associate with cluster IDs.
+- push queue ops onto the appropriate PollableQueue to be dispatched in that queue's thread.
+
+Serializing operations on the same queue avoids contention on that queue,
+while taking advantage of the independence of operations on separate queues.
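+
+A sketch of the hand-off (QueueContext, context() and doEnqueue() are
+illustrative names; the exact callback signature is whatever the existing
+sys::PollableQueue provides):
+
+typedef boost::function<void()> QueueOp;
+
+struct QueueContext {
+    // Pending cluster operations for one AMQP queue, drained in a Poller
+    // thread: per-queue order is kept, different queues run in parallel.
+    // Constructed with its callback and the broker's Poller.
+    sys::PollableQueue<QueueOp> ops;
+};
+
+// In the CPG deliver thread: decode the event, then hand the work to the
+// queue's own context instead of executing it inline.
+void MessageHandler::enqueue(const QueueName& q, SequenceNumber seq) {
+    Message msg = node[sender].routing[seq];
+    context(q).ops.push(boost::bind(&MessageHandler::doEnqueue, this, q, msg));
+}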
+
+*** Re-use of existing cluster code
+- re-use Event
+- re-use Multicaster
+- re-use same PollableQueueSetup (may experiment later)
+- new Core class to replace Cluster.
+- keep design modular, keep threading rules clear.
+
+** TODO [#B] Large message replication.
+Multicast should encode messages in fixed-size buffers (64k)?
+Can't assume we can send a message in one chunk.
+For 0-10 we can use channel numbers & send whole frames packed into a larger buffer.
+** TODO [#B] Transaction support.
+Extend broker::Cluster interface to capture transaction context and completion.
+Sequence number to generate per-node tx IDs.
+Replicate transaction completion.
+** TODO [#B] Batch CPG multicast messages
+The new cluster design involves a lot of small multicast messages;
+they need to be batched into larger CPG messages for efficiency.
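+
+A sketch of the coalescing in the Multicaster (encodedSize()/encodeTo() and
+the 64k limit are illustrative):
+
+void Multicaster::mcast(const Event& e) {
+    sys::Mutex::ScopedLock l(lock);
+    if (batch.size() + e.encodedSize() > 64*1024) flushLocked();
+    e.encodeTo(batch);                 // append the encoded event to the batch
+}
+
+void Multicaster::flushLocked() {      // also called when the poller goes idle
+    if (batch.empty()) return;
+    iovec iov = { &batch[0], batch.size() };
+    cpg.mcast(&iov, 1);                // one CPG message carries many events
+    batch.clear();
+}
+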
+** TODO [#B] Genuine async completion
+Replace current synchronous waiting implementation with genuine async completion.
+
+Test: enhance test_store.cpp to defer enqueueComplete till special message received.
+
+Async callback uses *requestIOProcessing* to queue action on IO thread.
+
+** TODO [#B] Async completion of accept when dequeue completes.
+Interface is already there on broker::Message, just need to ensure
+that store and cluster implementations call it appropriately.
+
+** TODO [#B] Replicate wiring.
+At the MessageStore create/destroy/bind call points, replicate the encoded declare/destroy/bind command.
+
+** TODO [#B] New members joining - first pass
+
+Re-use update code from old cluster but don't replicate sessions &
+connections.
+
+Need to extend it to send cluster IDs with messages.
+
+Need to replicate the queue ownership data as part of the update.
+
+** TODO [#B] Persistence support.
+InitialStatus protocol etc. to support persistent start-up (existing code)
+
+Only one broker recovers from store, update to others.
+
+Assign cluster IDs to messages recovered from store, don't replicate. See Queue::recover.
+
+** TODO [#B] Handle other ways that messages can leave a queue.
+
+Other ways (other than via a consumer) that messages can be taken off a queue.
+
+NOTE: Not controlled by the queue lock; how do we make them consistent?
+
+The target broker may not have all the messages that are on other brokers,
+so purge/destroy cannot be done purely locally.
+- Queue::move() - need to wait for lock? Replicate?
+- Queue::get() - ???
+- Queue::purge() - replicate purge? or just delete what's on this broker?
+- Queue::destroy() - send messages to the alternate exchange on all brokers?
+
+Need to add callpoints & mcast messages to replicate these?
+
+** TODO [#B] Flow control for internal queues.
+
+Need to bound the size of internal queues: delivery and multicast.
+- stop polling for read on client connections when we reach a bound.
+- restart polling when we get back under it.
+
+That will stop local multicasting, but we still have to deal with remote
+multicasting (note the existing cluster does not do this). Something like
+the following (sketched below):
+- when over the bound, multicast a flow-control event.
+- on delivery of flow-control, all members stop polling to read client connections.
+- when back under the bound, send flow-control-end; all members resume.
+- if the flow-controlling member dies, others resume.
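+
+A minimal sketch of the bound check (names and thresholds are illustrative):
+
+void Core::internalQueueDepthChanged(size_t depth) {
+    if (!flowStopped && depth > HIGH_WATERMARK) {
+        mcastFlowControl(true);     // all members stop reading client connections
+        flowStopped = true;
+    } else if (flowStopped && depth < LOW_WATERMARK) {
+        mcastFlowControl(false);    // flow-control-end: all members resume
+        flowStopped = false;
+    }
+}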
+
+** TODO [#B] Integration with transactions.
+Do we want to replicate during transaction & replicate commit/rollback
+or replicate only on commit?
+No integration with DTX transactions.
+** TODO [#B] Make new cluster work with replication exchange.
+Possibly re-use some common logic. Replication exchange is like clustering
+except over TCP.
+** TODO [#B] Better concurrency, scalability on multi-cores.
+Introduce a PollableQueue of operations per broker queue. Queue up mcast
+operations (enqueue, acquire, accept etc.) to be handled concurrently
+on different queues. Performance testing to verify improved scalability.
+** TODO [#C] Async completion for declare, bind, destroy queues and exchanges.
+Cluster needs to complete these asynchronously to guarantee resources
+exist across the cluster when the command completes.
+
+** TODO [#C] Allow non-replicated exchanges, queues.
+
+Set qpid.replicated=false in declare arguments, set flag on Exchange, Queue objects.
+- save replicated status to store.
+- support in management tools.
+Replicated exchange: replicate binds to replicated queues.
+Replicated queue: replicate all messages.
+
+** TODO [#C] New members joining - improved.
+
+Replicate wiring like the old cluster: stall for wiring but not for
+messages. Update messages on a per-queue basis from back to front.
+
+Updater:
+- stall & push wiring: declare exchanges, queues, bindings.
+- start update iterator thread on each queue.
+- unstall and process normally while iterator threads run.
+
+Update iterator thread:
+- starts at back of updater queue, message m.
+- send update_front(q,m) to updatee and advance towards front
+- at front: send update_done(q)
+
+Updatee:
+- stall, receive wiring, lock all queues, mark queues "updating", unstall
+- update_front(q,m): push m to *front* of q
+- update_done(q): mark queue "ready"
+
+Updatee cannot take the queue consume lock for a queue that is updating.
+Updatee *can* push messages onto a queue that is updating.
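+
+A sketch of the updatee side (pushFront() is a hypothetical new Queue
+operation; decode() and the state map are illustrative):
+
+void UpdateHandler::updateFront(const QueueName& q, const std::string& encoded) {
+    // Messages arrive back-to-front from the updater, so pushing each one
+    // onto the *front* of the local queue reproduces the updater's order
+    // even while new messages are being pushed onto the back.
+    broker.getQueues().find(q)->pushFront(decode(encoded));
+}
+
+void UpdateHandler::updateDone(const QueueName& q) {
+    state[q] = READY;      // consumers may now take the consume lock
+}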
+
+TODO: Is there any way to eliminate the stall for wiring?
+
+** TODO [#C] Refactoring of common concerns.
+
+There are a bunch of things that act as "Queue observers" with intercept
+points in similar places.
+- QueuePolicy
+- QueuedEvents (async replication)
+- MessageStore
+- Cluster
+
+Look for ways to capitalize on the similarity & simplify the code.
+
+In particular QueuedEvents (async replication) strongly resembles
+cluster replication, but over TCP rather than multicast.
+** TODO [#C] Concurrency for enqueue events.
+All enqueue events are being processed in the CPG deliver thread context which
+serializes all the work. We only need ordering on a per-queue basis; can we
+enqueue in parallel on different queues, and will that improve performance?
+** TODO [#C] Handling immediate messages in a cluster
+Include remote consumers in the decision to deliver an immediate message?