summaryrefslogtreecommitdiff
path: root/qpid/cpp/design_docs/new-cluster-plan.txt
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/design_docs/new-cluster-plan.txt')
-rw-r--r--qpid/cpp/design_docs/new-cluster-plan.txt563
1 files changed, 181 insertions, 382 deletions
diff --git a/qpid/cpp/design_docs/new-cluster-plan.txt b/qpid/cpp/design_docs/new-cluster-plan.txt
index 781876e55a..626e443be7 100644
--- a/qpid/cpp/design_docs/new-cluster-plan.txt
+++ b/qpid/cpp/design_docs/new-cluster-plan.txt
@@ -17,376 +17,156 @@
# specific language governing permissions and limitations
# under the License.
+* Status of impementation
-Notes on new cluster implementation. See also: new-cluster-design.txt
+Meaning of priorities:
+[#A] Essential for basic functioning.
+[#B] Required for first release.
+[#C] Can be addressed in a later release.
-* Implementation plan.
+The existig prototype is bare bones to do performance benchmarks:
+- Implements publish and consumer locking protocol.
+- Defered delivery and asynchronous completion of message.
+- Optimize the case all consumers are on the same node.
+- No new member updates, no failover updates, no transactions, no persistence etc.
-Co-existence with old cluster code and tests:
-- Separate plugin cluster2, options --cluster2-*. Eventually renamed to replace cluster.
-- Double up tests with old version/new version as the new code develops.
+Prototype code is on branch qpid-2920-active, in cpp/src/qpid/cluster/exp/
-Minimal POC for message delivery & perf test.
-- no wiring replication, no updates, no failover, no persistence, no async completion.
-- just implement publish and acquire/dequeue locking protocol.
-- optimize the special case where all consumers are on the same node.
-- measure performance: compare active-passive and active-active modes of use.
+** Similarities to existing cluster.
-Full implementation of transient cluster
-- Update (based on existing update), async completion etc.
-- Passing all existing transient cluster tests.
+/Active-active/: the new cluster can be a drop-in replacement for the
+old, existing tests & customer deployment configurations are still
+valid.
-Persistent cluster
-- Make sure async completion works correctly.
-- InitialStatus protoocl etc. to support persistent start-up (existing code)
-- cluster restart from store: stores not identical. Load one, update the rest.
- - assign cluster ID's to messages recovered from store, don't replicate.
+/Virtual synchrony/: Uses corosync to co-ordinate activity of members.
-Improved update protocol
-- per-queue, less stalling, bounded catch-up.
+/XML controls/: Uses XML to define the primitives multicast to the
+cluster.
-* Task list
+** Differences with existing cluster.
-** TODO [#A] Minimal POC: publish/acquire/dequeue protocol.
+/Report rather than predict consumption/: brokers explicitly tell each
+other which messages have been acquired or dequeued. This removes the
+major cause of bugs in the existing cluster.
-NOTE: as implementation questions arise, take the easiest option and make
-a note for later optimization/improvement.
+/Queue consumer locking/: to avoid duplicates only one broker can acquire or
+dequeue messages at a time - while has the consume-lock on the
+queue. If multiple brokers are consuming from the same queue the lock
+is passed around to time-share access to the queue.
-*** Tests
-- python test: 4 senders, numbered messages, 4 receivers, verify message set.
-- acquire then release messages: verify can be dequeued on any member
-- acquire then kill broker: verify can be dequeued other members.
-- acquire then reject: verify goes on alt-exchange once only.
+/Per-queue concurrency/: uses a fixed-size set of CPG groups (reflecting
+the concurrency of the host) to allow concurrent processing on
+different queues. Queues are hashed onto the groups.
-*** DONE broker::Cluster interface and call points.
+* Completed tasks
+** DONE [#A] Minimal POC: publish/acquire/dequeue protocol.
+ CLOSED: [2011-10-05 Wed 16:03]
-Initial interface commited.
+Defines broker::Cluster interface and call points.
+Initial interface commite
-*** Main classes
+Main classes
+Core: central object holding cluster classes together (replaces cluster::Cluster)
+BrokerContext: implements broker::Cluster interface.
+QueueContext: Attached to a broker::Queue, holds cluster status.
+MessageHolder:holds local messages while they are being enqueued.
-BrokerHandler:
-- implements broker::Cluster intercept points.
-- sends mcast events to inform cluster of local actions.
-- thread safe, called in connection threads.
+Implements multiple CPG groups for better concurrency.
-LocalMessageMap:
-- Holds local messages while they are being enqueued.
-- thread safe: called by both BrokerHandler and MessageHandler
-
-MessageHandler:
-- handles delivered mcast messages related to messages.
-- initiates local actions in response to mcast events.
-- thread unsafe, only called in deliver thread.
-- maintains view of cluster state regarding messages.
+** DONE [#A] Large message replication.
+ CLOSED: [2011-10-05 Wed 17:22]
+Multicast using fixed-size (64k) buffers, allow fragmetation of messages across buffers (frame by frame)
-QueueOwnerHandler:
-- handles delivered mcast messages related to queue consumer ownership.
-- thread safe, called in deliver, connection and timer threads.
-- maintains view of cluster state regarding queue ownership.
-
-cluster::Core: class to hold new cluster together (replaces cluster::Cluster)
-- thread safe: manage state used by both MessageHandler and BrokerHandler
-
-The following code sketch illustrates only the "happy path" error handling
-is omitted.
-
-*** BrokerHandler
-Types:
-- struct QueuedMessage { Message msg; QueueName q; SequenceNumber position; }
-- struct
-
-NOTE:
-- Messages on queues are identified by a queue name + a position.
-- Messages being routed are identified by a sequence number.
-
-Members:
-- thread_local bool noReplicate // suppress replication.
-- thread_local bool isRouting // suppress operations while routing
-- Message localMessage[SequenceNumber] // local messages being routed.
-- thread_local SequenceNumber routingSequence
-
-NOTE: localMessage is also modified by MessageHandler.
-
-broker::Cluster intercept functions:
-
-routing(msg)
- if noReplicate: return
- # Supress everything except enqueues while we are routing.
- # We don't want to replicate acquires & dequeues caused by an enqueu,
- # e.g. removal of messages from ring/LV queues.
- isRouting = true
-
-enqueue(qmsg):
- if noReplicate: return
- if routingSequence == 0 # thread local
- routingSequence = nextRoutingSequence()
- mcast create(encode(qmsg.msg),routingSeq)
- mcast enqueue(qmsg.q,routingSeq)
-
-routed(msg):
- if noReplicate: return
- isRouting = false
-
-acquire(qmsg):
- if noReplicate: return
- if isRouting: return # Ignore while we are routing a message.
- if msg.id: mcast acquire(qmsg)
-
-release(QueuedMessage)
- if noReplicate: return
- if isRouting: return # Ignore while we are routing a message.
- mcast release(qmsg)
-
-accept(QueuedMessage):
- if noReplicate: return
- if isRouting: return # Ignore while we are routing a message.
- mcast accept(qmsg)
-
-reject(QueuedMessage):
- isRejecting = true
- mcast reject(qmsg)
-
-# FIXME no longer needed?
-drop(QueuedMessage)
- cleanup(qmsg)
-
-*** MessageHandler and mcast messages
-Types:
-- struct QueueEntry { QueuedMessage qmsg; NodeId acquired; }
-- struct QueueKey { MessageId id; QueueName q; }
-- typedef map<QueueKey, QueueEntry> Queue
-- struct Node { Message routing[SequenceNumber]; list<QueueKey> acquired; }
-
-Members:
-- QueueEntry enqueued[QueueKey]
-- Node node[NodeId]
-
-Mcast messages in Message class:
-
-create(msg,seq)
- if sender != self: node[sender].routing[seq] = decode(msg)
-
-enqueue(q,seq):
- id = (sender,seq)
- if sender == self:
- enqueued[id,q] = (localMessage[seq], acquired=None)
- else:
- msg = sender.routing[seq]
- enqueued[id,q] = (qmsg, acquired=None)
- with noReplicate=true: qmsg = broker.getQueue(q).push(msg)
-
-routed(seq):
- if sender == self: localMessage.erase(msg.id.seq)
- else: sender.routing.erase(seq)
-
-acquire(id,q):
- enqueued[id,q].acquired = sender
- node[sender].acquired.push_back((id,q))
- if sender != self:
- with noReplicate=true: broker.getQueue(q).acquire(enqueued[id,q])
-
-release(id,q)
- enqueued[id,q].acquired = None
- node[sender].acquired.erase((id,q))
- if sender != self
- with noReplicate=true: broker.getQueue(q).requeue(enqueued[id,q])
-
-reject(id,q):
- sender.routing[id] = enqueued[id,q] # prepare for re-queueing
-
-rejected(id,q)
- sender.routing.erase[id]
-
-dequeue(id,q)
- entry = enqueued[id,q]
- enqueued.erase[id,q]
- node[entry.acquired].acquired.erase(id,q)
- if sender != self:
- with noReplicate=true: broker.getQueue(q).dequeue(entry.qmsg)
-
-member m leaves cluster:
- for key in node[m].acquired:
- release(key.id, key.q)
- node.erase(m)
-
-*** Queue consumer locking
-
-When a queue is locked it does not deliver messages to its consumers.
-
-New broker::Queue functions:
-- stopConsumers(): set consumersStopped flag, wait for currently busy consumers to exit.
-- startConsumers(): reset consumersStopped flag
-
-Implementation sketch, locking omitted:
-
-void Queue::stopConsumers() {
- consumersStopped = true;
- while (consumersBusy) consumersBusyMonitor.wait();
-}
-
-void Queue::startConsumers() {
- consumersStopped = false;
- listeners.notify();
-}
-
-bool Queue::dispatch(consumer) {
- if (consumersStopped) return false;
- ++consumersBusy;
- do_regular_dispatch_body()
- if (--consumersBusy == 0) consumersBusyMonitor.notify();
-}
-
-*** QueueOwnerHandler
-
-Invariants:
-- Each queue is owned by at most one node at any time.
-- Each node is interested in a set of queues at any given time.
-- A queue is un-owned if no node is interested.
-
-The queue owner releases the queue when
-- it loses interest i.e. queue has no consumers with credit.
-- a configured time delay expires and there are other interested nodes.
-
-The owner mcasts release(q). On delivery the new queue owner is the
-next node in node-id order (treating nodes as a circular list)
-starting from the old owner that is interested in the queue.
-
-Queue consumers initially are stopped, only started when we get
-ownership from the cluster.
-
-Thread safety: called by deliver, connection and timer threads, needs locking.
-
-Thread safe object per queue holding queue ownership status.
-Called by deliver, connection and timer threads.
-
-class QueueOwnership {
- bool owned;
- Timer timer;
- BrokerQueue q;
-
- drop(): # locked
- if owned:
- owned = false
- q.stopConsumers()
- mcast release(q.name, false)
- timer.stop()
-
- take(): # locked
- if not owned:
- owned = true
- q.startConsumers()
- timer.start(timeout)
-
- timer.fire(): drop()
-}
-
-Data Members, only modified/examined in deliver thread:
-- typedef set<NodeId> ConsumerSet
-- map<QueueName, ConsumerSet> consumers
-- map<QueueName, NodeId> owner
+* Open questions
-Thread safe data members, accessed in connection threads (via BrokerHandler):
-- map<QueueName, QueueOwnership> ownership
+** TODO [#A] Queue sequence numbers vs. independant message IDs.
+ SCHEDULED: <2011-10-07 Fri>
-Multicast messages in QueueOwner class:
+Current prototype uses queue sequence numbers to identify
+message. This is tricky for updating new members as the sequence
+numbers are only known on delivery.
-consume(q):
- if sender==self and consumers[q].empty(): ownership[q].take()
- consumers[q].insert(sender)
+Independent message IDs that can be generated and sent with the message simplify
+this and potentially allow performance benefits by relaxing total ordering.
+However they imply additional map lookups that might hurt performance.
-release(q):
- asssert(owner[q] == sender and owner[q] in consumers[q])
- owner[q] = circular search from sender in consumers[q]
- if owner==self: ownership[q].take()
+- [X] Prototype independent message IDs, check performance.
+Throughput worse by 30% in contented case, 10% in uncontended.
+Sticking with queue sequence numbers.
-cancel(q):
- assert(queue[q].owner != sender) # sender must release() before cancel()
- consumers[q].erase(sender)
+* Outstanding Tasks
+** TODO [#A] Defer and async completion of wiring commands.
-member-leaves:
- for q in queue: if owner[q] = left: left.release(q)
+Testing requirement: Many tests assume wiring changes are visible
+across the cluster once the commad completes.
-Need 2 more intercept points in broker::Cluster:
+Name clashes: need to avoid race if same name queue/exchange declared
+on 2 brokers simultaneously
-consume(q,consumer,consumerCount) - Queue::consume()
- if consumerCount == 1: mcast consume(q)
+** TODO [#A] Passing all existing cluster tests.
-cancel(q,consumer,consumerCount) - Queue::cancel()
- if consumerCount == 0:
- ownership[q].drop()
- mcast cancel(q)
+The new cluster should be a drop-in replacement for the old, so it
+should be able to pass all the existing tests.
-#TODO: lifecycle, updating cluster data structures when queues are destroyed
-
-*** Increasing concurrency
-The major performance limitation of the old cluster is that it does
-everything in the single CPG deliver thread context.
-
-We can get additional concurrency by creating a thread context _per queue_
-for queue operations: enqueue, acquire, accept etc.
-
-We associate a PollableQueue of queue operations with each AMQP queue.
-The CPG deliver thread would
-- build messages and associate with cluster IDs.
-- push queue ops to the appropriate PollableQueue to be dispatched the queues thread.
-
-Serializing operations on the same queue avoids contention, but takes advantage
-of the independence of operations on separate queues.
+** TODO [#A] Update to new members joining.
-*** Re-use of existing cluster code
-- re-use Event
-- re-use Multicaster
-- re-use same PollableQueueSetup (may experiment later)
-- new Core class to replace Cluster.
-- keep design modular, keep threading rules clear.
+Need to resolve [[Queue sequence numbers vs. independant message IDs]] first.
+- implicit sequence numbers are more tricky to replicate to new member.
-** TODO [#B] Large message replication.
-Multicast should encode messages in fixed size buffers (64k)?
-Can't assume we can send message in one chunk.
-For 0-10 can use channel numbers & send whole frames packed into larger buffer.
-** TODO [#B] Transaction support.
-Extend broker::Cluster interface to capture transaction context and completion.
-Sequence number to generate per-node tx IDs.
-Replicate transaction completion.
-** TODO [#B] Batch CPG multicast messages
-The new cluster design involves a lot of small multicast messages,
-they need to be batched into larger CPG messages for efficiency.
-** TODO [#B] Genuine async completion
-Replace current synchronous waiting implementation with genuine async completion.
+Update individual objects (queues and exchanges) independently.
+- create queues first, then update all queues and exchanges in parallel.
+- multiple updater threads, per queue/exchange.
+- updater sends messages to special exchange(s) (not using extended AMQP controls)
+
+Queue updater:
+- marks the queue position at the sync point
+- sends messages starting from the sync point working towards the head of the queue.
+- send "done" message.
+Note: updater remains active throughout, consuming clients actually reduce the
+size of the update.
-Test: enhance test_store.cpp to defer enqueueComplete till special message received.
+Queue updatee:
+- enqueues received from CPG: add to back of queue as normal.
+- dequeues received from CPG: apply if found, else save to check at end of update.
+- messages from updater: add to the *front* of the queue.
+- update complete: apply any saved dequeues.
-Async callback uses *requestIOProcessing* to queue action on IO thread.
+Exchange updater:
+- updater: send snapshot of exchange as it was at the sync point.
-** TODO [#B] Async completion of accept when dequeue completes.
-Interface is already there on broker::Message, just need to ensure
-that store and cluster implementations call it appropriately.
+Exchange updatee:
+- queue exchange operations after the sync point.
+- when snapshot is received: apply saved operations.
-** TODO [#B] Replicate wiring.
-From messageStore create/destroy/bind, replicate encoded declare/destroy/bind command.
+Updater remains active throughout.
+Updatee stalls clients until the update completes.
-** TODO [#B] New members joining - first pass
+Updating queue/exchange/binding objects is via the same encode/decode
+that is used by the store. Updatee to use recovery interfaces to
+recover?
-Re-use update code from old cluster but don't replicate sessions &
-connections.
+** TODO [#A] Failover updates to client.
+Implement the amq.failover exchange to notify clients of membership.
-Need to extend it to send cluster IDs with messages.
+** TODO [#B] Initial status protocol.
+Handshake to give status of each broker member to new members joining.
+Status includes
+- persistent store state (clean, dirty)
+- cluster protocol version.
-Need to replicate the queue ownership data as part of the update.
+** TODO [#B] Replace boost::hash with our own hash function.
+The hash function is effectively part of the interface so
+we need to be sure it doesn't change underneath us.
-** TODO [#B] Persistence support.
-InitialStatus protoocl etc. to support persistent start-up (existing code)
+** TODO [#B] Persistent cluster support.
+Initial status protoocl to support persistent start-up (see existing code)
Only one broker recovers from store, update to others.
Assign cluster IDs to messages recovered from store, don't replicate. See Queue::recover.
-** TODO [#B] Handle other ways that messages can leave a queue.
-
-Other ways (other than via a consumer) that messages are take off a queue.
-
-NOTE: Not controlled by queue lock, how to make them consistent?
-
+** TODO [#B] Management support
+Replicate management methods that modify queues - e.g. move, purge.
Target broker may not have all messages on other brokers for purge/destroy.
- Queue::move() - need to wait for lock? Replicate?
- Queue::get() - ???
@@ -395,66 +175,48 @@ Target broker may not have all messages on other brokers for purge/destroy.
Need to add callpoints & mcast messages to replicate these?
-** TODO [#B] Flow control for internal queues.
-
-Need to bound the size of internal queues: delivery and multicast.
-- stop polling for read on client connections when we reach a bound.
-- restart polling when we get back under it.
-
-That will stop local multicasting, we still have to deal with remote
-multicasting (note existing cluster does not do this.) Something like:
-- when over bounds multicast a flow-control event.
-- on delivery of flow-control all members stop polling to read client connections
-- when back under bounds send flow-control-end, all members resume
-- if flow-controling member dies others resume
-
-** TODO [#B] Integration with transactions.
-Do we want to replicate during transaction & replicate commit/rollback
-or replicate only on commit?
-No integration with DTX transactions.
-** TODO [#B] Make new cluster work with replication exchange.
-Possibly re-use some common logic. Replication exchange is like clustering
-except over TCP.
-** TODO [#B] Better concurrency, scalabiility on multi-cores.
-Introduce PollableQueue of operations per broker queue. Queue up mcast
-operations (enqueue, acquire, accept etc.) to be handled concurrently
-on different queue. Performance testing to verify improved scalability.
-** TODO [#C] Async completion for declare, bind, destroy queues and exchanges.
-Cluster needs to complete these asynchronously to guarantee resources
-exist across the cluster when the command completes.
+** TODO [#B] TX transaction support.
+Extend broker::Cluster interface to capture transaction context and completion.
+Running brokers exchange TX information.
+New broker update includes TX information.
-** TODO [#C] Allow non-replicated exchanges, queues.
+ // FIXME aconway 2010-10-18: As things stand the cluster is not
+ // compatible with transactions
+ // - enqueues occur after routing is complete
+ // - no call to Cluster::enqueue, should be in Queue::process?
+ // - no transaction context associated with messages in the Cluster interface.
+ // - no call to Cluster::accept in Queue::dequeueCommitted
-Set qpid.replicated=false in declare arguments, set flag on Exchange, Queue objects.
-- save replicated status to store.
-- support in management tools.
-Replicated exchange: replicate binds to replicated queues.
-Replicated queue: replicate all messages.
+** TODO [#B] DTX transaction support.
+Extend broker::Cluster interface to capture transaction context and completion.
+Running brokers exchange DTX information.
+New broker update includes DTX information.
-** TODO [#C] New members joining - improved.
+** TODO [#B] Async completion of accept.
+When this is fixed in the standalone broker, it should be fixed for cluster.
-Replicate wiring like old cluster, stall for wiring but not for
-messages. Update messages on a per-queue basis from back to front.
+** TODO [#B] Network partitions and quorum.
+Re-use existing implementation.
-Updater:
-- stall & push wiring: declare exchanges, queues, bindings.
-- start update iterator thread on each queue.
-- unstall and process normally while iterator threads run.
+** TODO [#B] Review error handling, put in a consitent model.
+- [ ] Review all asserts, for possible throw.
+- [ ] Decide on fatal vs. non-fatal errors.
-Update iterator thread:
-- starts at back of updater queue, message m.
-- send update_front(q,m) to updatee and advance towards front
-- at front: send update_done(q)
+** TODO [#B] Implement inconsistent error handling policy.
+What to do if a message is enqueued sucessfully on the local broker,
+but fails on one or more backups - e.g. due to store limits?
+- we have more flexibility, we don't *have* to crash
+- but we've loste some of our redundancy guarantee, how should we inform client?
-Updatee:
-- stall, receive wiring, lock all queues, mark queues "updating", unstall
-- update_front(q,m): push m to *front* of q
-- update_done(q): mark queue "ready"
+** TODO [#C] Allow non-replicated exchanges, queues.
-Updatee cannot take the queue consume lock for a queue that is updating.
-Updatee *can* push messages onto a queue that is updating.
+Set qpid.replicate=false in declare arguments, set flag on Exchange, Queue objects.
+- save replicated status to store.
+- support in management tools.
+Replicated queue: replicate all messages.
+Replicated exchange: replicate bindings to replicated queues only.
-TODO: Is there any way to eliminate the stall for wiring?
+Configurable default? Defaults to true.
** TODO [#C] Refactoring of common concerns.
@@ -469,9 +231,46 @@ Look for ways to capitalize on the similarity & simplify the code.
In particular QueuedEvents (async replication) strongly resembles
cluster replication, but over TCP rather than multicast.
-** TODO [#C] Concurrency for enqueue events.
-All enqueue events are being processed in the CPG deliver thread context which
-serializes all the work. We only need ordering on a per queue basis, can we
-enqueue in parallel on different queues and will that improve performance?
+
** TODO [#C] Handling immediate messages in a cluster
Include remote consumers in descision to deliver an immediate message?
+** TODO [#C] Remove old cluster hacks and workarounds
+The old cluster has workarounds in the broker code that can be removed.
+- [ ] drop code to replicate management model.
+- [ ] drop timer workarounds for TTL, management, heartbeats.
+- [ ] drop "cluster-safe assertions" in broker code.
+- [ ] drop connections, sessions, management from cluster update.
+- [ ] drop security workarounds: cluster code now operates after message decoding.
+- [ ] drop connection tracking in cluster code.
+- [ ] simpler inconsistent-error handling code, no need to stall.
+
+** TODO [#C] Support for live upgrades.
+
+Allow brokers in a running cluster to be replaced one-by-one with a new version.
+(see new-cluster-design for design notes.)
+
+The old cluster protocol was unstable because any changes in broker
+state caused changes to the cluster protocol.The new design should be
+much more stable.
+
+Points to implement in anticipation of live upgrade:
+- Prefix each CPG message with a version number and length.
+ Version number determines how to decode the message.
+- Brokers ignore messages that have a higher version number than they understand.
+- Protocol version XML element in cluster.xml, on each control.
+- Initial status protocol to include protocol version number.
+
+New member udpates: use the store encode/decode for updates, use the
+same backward compatibility strategy as the store. This allows for
+adding new elements to the end of structures but not changing or
+removing new elements.
+
+** TODO [#C] Support for AMQP 1.0.
+
+* Testing
+** TODO [#A] Pass all existing cluster tests.
+Requires [[Defer and async completion of wiring commands.]]
+** TODO [#A] New cluster tests.
+Stress tests & performance benchmarks focused on changes in new cluster:
+- concurrency by queues rather than connections.
+- different handling shared queues when consuemrs are on different brokers.