summaryrefslogtreecommitdiff
path: root/qpid/cpp/design_docs/new-cluster-design.txt
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/design_docs/new-cluster-design.txt')
-rw-r--r--qpid/cpp/design_docs/new-cluster-design.txt435
1 files changed, 435 insertions, 0 deletions
diff --git a/qpid/cpp/design_docs/new-cluster-design.txt b/qpid/cpp/design_docs/new-cluster-design.txt
new file mode 100644
index 0000000000..7adb46fee3
--- /dev/null
+++ b/qpid/cpp/design_docs/new-cluster-design.txt
@@ -0,0 +1,435 @@
+-*-org-*-
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+* A new design for Qpid clustering.
+
+** Issues with current design.
+
+The cluster is based on virtual synchrony: each broker multicasts
+events and the events from all brokers are serialized and delivered in
+the same order to each broker.
+
+In the current design raw byte buffers from client connections are
+multicast, serialized and delivered in the same order to each broker.
+
+Each broker has a replica of all queues, exchanges, bindings and also
+all connections & sessions from every broker. Cluster code treats the
+broker as a "black box", it "plays" the client data into the
+connection objects and assumes that by giving the same input, each
+broker will reach the same state.
+
+A new broker joining the cluster receives a snapshot of the current
+cluster state, and then follows the multicast conversation.
+
+*** Maintenance issues.
+
+The entire state of each broker is replicated to every member:
+connections, sessions, queues, messages, exchanges, management objects
+etc. Any discrepancy in the state that affects how messages are
+allocated to consumers can cause an inconsistency.
+
+- Entire broker state must be faithfully updated to new members.
+- Management model also has to be replicated.
+- All queues are replicated, can't have unreplicated queues (e.g. for management)
+
+Events that are not deterministically predictable from the client
+input data stream can cause inconsistencies. In particular use of
+timers/timestamps require cluster workarounds to synchronize.
+
+A member that encounters an error which is not encounted by all other
+members is considered inconsistent and will shut itself down. Such
+errors can come from any area of the broker code, e.g. different
+ACL files can cause inconsistent errors.
+
+The following areas required workarounds to work in a cluster:
+
+- Timers/timestamps in broker code: management, heartbeats, TTL
+- Security: cluster must replicate *after* decryption by security layer.
+- Management: not initially included in the replicated model, source of many inconsistencies.
+
+It is very easy for someone adding a feature or fixing a bug in the
+standalone broker to break the cluster by:
+- adding new state that needs to be replicated in cluster updates.
+- doing something in a timer or other non-connection thread.
+
+It's very hard to test for such breaks. We need a looser coupling
+and a more explicitly defined interface between cluster and standalone
+broker code.
+
+*** Performance issues.
+
+Virtual synchrony delivers all data from all clients in a single
+stream to each broker. The cluster must play this data thru the full
+broker code stack: connections, sessions etc. in a single thread
+context in order to get identical behavior on each broker. The cluster
+has a pipelined design to get some concurrency but this is a severe
+limitation on scalability in multi-core hosts compared to the
+standalone broker which processes each connection in a separate thread
+context.
+
+** A new cluster design.
+
+Clearly defined interface between broker code and cluster plug-in.
+
+Replicate queue events rather than client data.
+- Broker behavior only needs to match per-queue.
+- Smaller amount of code (queue implementation) that must behave predictably.
+- Events only need be serialized per-queue, allows concurrency between queues
+
+Use a moving queue ownership protocol to agree order of dequeues.
+No longer relies on identical state and lock-step behavior to cause
+identical dequeues on each broker.
+
+Each queue has an associated thread-context. Events for a queue are executed
+in that queues context, in parallel with events for other queues.
+
+*** Requirements
+
+The cluster must provide these delivery guarantees:
+
+- client sends transfer: message must be replicated and not lost even if the local broker crashes.
+- client acquires a message: message must not be delivered on another broker while acquired.
+- client accepts message: message is forgotten, will never be delivered or re-queued by any broker.
+- client releases message: message must be re-queued on cluster and not lost.
+- client rejects message: message must be dead-lettered or discarded and forgotten.
+- client disconnects/broker crashes: acquired but not accepted messages must be re-queued on cluster.
+
+Each guarantee takes effect when the client receives a *completion*
+for the associated command (transfer, acquire, reject, accept)
+
+*** Broker receiving messages
+
+On recieving a message transfer, in the connection thread we:
+- multicast a message-received event.
+- enqueue and complete the transfer when it is self-delivered.
+
+Other brokers enqueue the message when they recieve the message-received event.
+
+Enqueues are queued up with other queue operations to be executed in the
+thread context associated with the queue.
+
+*** Broker sending messages: moving queue ownership
+
+Each queue is *owned* by at most one cluster broker at a time. Only
+that broker may acquire or dequeue messages. The owner multicasts
+notification of messages it acquires/dequeues to the cluster.
+Periodically the owner hands over ownership to another interested
+broker, providing time-shared access to the queue among all interested
+brokers.
+
+We assume the same IO-driven dequeuing algorithm as the standalone
+broker with one modification: queues can be "locked". A locked queue
+is not available for dequeuing messages and will be skipped by the
+output algorithm.
+
+At any given time only those queues owned by the local broker will be
+unlocked.
+
+As messages are acquired/dequeued from unlocked queues by the IO threads
+the broker multicasts acquire/dequeue events to the cluster.
+
+When an unlocked queue has no more consumers with credit, or when a
+time limit expires, the broker relinquishes ownership by multicasting
+a release-queue event, allowing another interested broker to take
+ownership.
+
+*** Asynchronous completion of accept
+### HERE
+In acknowledged mode a message is not forgotten until it is accepted,
+to allow for requeue on rejection or crash. The accept should not be
+completed till the message has been forgotten.
+
+On receiving an accept the broker:
+- dequeues the message from the local queue
+- multicasts an "accept" event
+- completes the accept asynchronously when the dequeue event is self delivered.
+
+NOTE: The message store does not currently implement asynchronous
+completions of accept, this is a bug.
+
+** Inconsistent errors.
+
+The new design eliminates most sources of inconsistent errors
+(connections, sessions, security, management etc.) The only points
+where inconsistent errors can occur are at enqueue and dequeue (most
+likely store-related errors.)
+
+The new design can use the exisiting error-handling protocol with one
+major improvement: since brokers are no longer required to maintain
+identical state they do not have to stall processing while an error is
+being resolved.
+
+#TODO: The only source of dequeue errors is probably an unrecoverable journal failure.
+
+** Updating new members
+
+When a new member (the updatee) joins a cluster it needs to be brought
+up to date with the rest of the cluster. An existing member (the
+updater) sends an "update".
+
+In the old cluster design the update is a snapshot of the entire
+broker state. To ensure consistency of the snapshot both the updatee
+and the updater "stall" at the start of the update, i.e. they stop
+processing multicast events and queue them up for processing when the
+update is complete. This creates a back-log of work to get through,
+which leaves them lagging behind the rest of the cluster till they
+catch up (which is not guaranteed to happen in a bounded time.)
+
+With the new cluster design only exchanges, queues, bindings and
+messages need to be replicated.
+
+Update of wiring (exchanges, queues, bindings) is the same as current
+design.
+
+Update of messages is different:
+- per-queue rather than per-broker, separate queues can be updated in parallel.
+- updates queues in reverse order to eliminate unbounded catch-up
+- does not require updater & updatee to stall during update.
+
+Replication events, multicast to cluster:
+- enqueue(q,m): message m pushed on back of queue q .
+- acquire(q,m): mark m acquired
+- dequeue(q,m): forget m.
+Messages sent on update connection:
+- update_front(q,m): during update, receiver pushes m to *front* of q
+- update_done(q): during update, update of q is complete.
+
+Updater:
+- when updatee joins set iterator i = q.end()
+- while i != q.begin(): --i; send update_front(q,*i) to updatee
+- send update_done(q) to updatee
+
+Updatee:
+- q initially in locked state, can't dequeue locally.
+- start processing replication events for q immediately (enqueue, dequeue, acquire etc.)
+- receive update_front(q,m): q.push_front(m)
+- receive update_done(q): q can be unlocked for local dequeing.
+
+Benefits:
+- Stall only for wiring update: updater & updatee can process multicast messages while messages are updated.
+- No unbounded catch-up: update consists of at most N update_front() messages where N=q.size() at start of update.
+- During update consumers actually help by removing messages before they need to be updated.
+- Needs no separate "work to do" queue, only the broker queues themselves.
+
+# TODO how can we recover from updater crashing before update complete?
+# Clear queues that are not updated & send request for udpates on those queues?
+
+# TODO updatee may receive a dequeue for a message it has not yet seen, needs
+# to hold on to that so it can drop the message when it is seen.
+# Similar problem exists for wiring?
+
+** Cluster API
+
+The new cluster API is similar to the MessageStore interface.
+(Initially I thought it would be an extension of the MessageStore interface,
+but as the design develops it seems better to make it a separate interface.)
+
+The cluster interface captures these events:
+- wiring changes: queue/exchange declare/bind
+- message enqueued/acquired/released/rejected/dequeued.
+
+The cluster will require some extensions to the Queue:
+- Queues can be "locked", locked queues are ignored by IO-driven output.
+- Cluster must be able to apply queue events from the cluster to a queue.
+ These appear to fit into existing queue operations.
+
+** Maintainability
+
+This design gives us more robust code with a clear and explicit interfaces.
+
+The cluster depends on specific events clearly defined by an explicit
+interface. Provided the semantics of this interface are not violated,
+the cluster will not be broken by changes to broker code.
+
+The cluster no longer requires identical processing of the entire
+broker stack on each broker. It is not affected by the details of how
+the broker allocates messages. It is independent of the
+protocol-specific state of connections and sessions and so is
+protected from future protocol changes (e.g. AMQP 1.0)
+
+A number of specific ways the code will be simplified:
+- drop code to replicate management model.
+- drop timer workarounds for TTL, management, heartbeats.
+- drop "cluster-safe assertions" in broker code.
+- drop connections, sessions, management from cluster update.
+- drop security workarounds: cluster code now operates after message decoding.
+- drop connection tracking in cluster code.
+- simper inconsistent-error handling code, no need to stall.
+
+** Performance
+
+The only way to verify the relative performance of the new design is
+to prototype & profile. The following points suggest the new design
+may scale/perform better:
+
+Some work moved from virtual synchrony thread to connection threads:
+- All connection/session logic moves to connection thread.
+- Exchange routing logic moves to connection thread.
+- On local broker dequeueing is done in connection thread
+- Local broker dequeue is IO driven as for a standalone broker.
+
+For queues with all consumers on a single node dequeue is all
+IO-driven in connection thread. Pay for time-sharing only if queue has
+consumers on multiple brokers.
+
+Doing work for different queues in parallel scales on multi-core boxes when
+there are multiple queues.
+
+One difference works against performance, thre is an extra
+encode/decode. The old design multicasts raw client data and decodes
+it in the virtual synchrony thread. The new design would decode
+messages in the connection thread, re-encode them for multicast, and
+decode (on non-local brokers) in the virtual synchrony thread. There
+is extra work here, but only in the *connection* thread: on a
+multi-core machine this happens in parallel for every connection, so
+it probably is not a bottleneck. There may be scope to optimize
+decode/re-encode by re-using some of the original encoded data, this
+could also benefit the stand-alone broker.
+
+** Asynchronous queue replication
+
+The existing "asynchronous queue replication" feature maintains a
+passive backup passive backup of queues on a remote broker over a TCP
+connection.
+
+The new cluster replication protocol could be re-used to implement
+asynchronous queue replication: its just a special case where the
+active broker is always the queue owner and the enqueue/dequeue
+messages are sent over a TCP connection rather than multicast.
+
+The new update update mechanism could also work with 'asynchronous
+queue replication', allowing such replication (over a TCP connection
+on a WAN say) to be initiated after the queue had already been created
+and been in use (one of the key missing features).
+
+** Increasing Concurrency and load sharing
+
+The current cluster is bottlenecked by processing everything in the
+CPG deliver thread. By removing the need for identical operation on
+each broker, we open up the possiblility of greater concurrency.
+
+Handling multicast enqueue, acquire, accpet, release etc: concurrency
+per queue. Operatons on different queues can be done in different
+threads.
+
+The new design does not force each broker to do all the work in the
+CPG thread so spreading load across cluster members should give some
+scale-up.
+
+** Misc outstanding issues & notes
+
+Replicating wiring
+- Need async completion of wiring commands?
+- qpid.sequence_counter: need extra work to support in new design, do we care?
+
+Cluster+persistence:
+- finish async completion: dequeue completion for store & cluster
+- cluster restart from store: clean stores *not* identical, pick 1, all others update.
+- need to generate cluster ids for messages recovered from store.
+
+Live updates: we don't need to stall brokers during an update!
+- update on queue-by-queue basis.
+- updatee locks queues during update, no dequeue.
+- update in reverse: don't update messages dequeued during update.
+- updatee adds update messages at front (as normal), replicated messages at back.
+- updater starts from back, sends "update done" when it hits front of queue.
+
+Flow control: need to throttle multicasting
+1. bound the number of outstanding multicasts.
+2. ensure the entire cluster keeps up, no unbounded "lag"
+The existing design uses read-credit to solve 1., and does not solve 2.
+New design should stop reading on all connections while flow control
+condition exists?
+
+Can federation also be unified, at least in configuration?
+
+Consider queues (and exchanges?) as having "reliability" attributes:
+- persistent: is the message stored on disk.
+- backed-up (to another broker): active/passive async replication.
+- replicated (to a cluster): active/active multicast replication to cluster.
+- federated: federation link to a queue/exchange on another broker.
+
+"Reliability" seems right for the first 3 but not for federation, is
+there a better term?
+
+Clustering and scalability: new design may give us the flexibility to
+address scalability as part of cluster design. Think about
+relationship to federation and "fragmented queues" idea.
+
+* Design debates/descisions
+
+** Active/active vs. active passive
+
+An active-active cluster can be used in an active-passive mode. In
+this mode we would like the cluster to be as efficient as a strictly
+active-passive implementation.
+
+An active/passive implementation allows some simplifications over active/active:
+- drop Queue ownership and locking
+- don't need to replicate message acquisition.
+- can do immediate local enqueue and still guarantee order.
+
+Active/passive introduces a few extra requirements:
+- Exactly one broker hast to take over if primary fails.
+- Passive members must refuse client connections.
+- On failover, clients must re-try all known addresses till they find the active member.
+
+Active/active benefits:
+- A broker failure only affects the subset of clients connected to that broker.
+- Clients can switch to any other broker on failover
+- Backup brokers are immediately available on failover.
+- Some load sharing: reading from client + multicast only done on direct node.
+
+Active/active drawbacks:
+- Co-ordinating message acquisition may impact performance (not tested)
+- Code may be more complex that active/passive.
+
+Active/passive benefits:
+- Don't need message allocation strategy, can feed consumers at top speed.
+- Code may be simpler than active/active.
+
+Active/passive drawbacks:
+- All clients on one node so a failure affects every client in the system.
+- After a failure there is a "reconnect storm" as every client reconnects to the new active node.
+- After a failure there is a period where no broker is active, until the other brokers realize the primary is gone and agree on the new primary.
+- Clients must find the single active node, may involve multiple connect attempts.
+
+** Total ordering.
+
+Initial thinking: allow message ordering to differ between brokers.
+New thinking: use CPG total ordering, get identical ordering on all brokers.
+- Allowing variation in order introduces too much chance of unexpected behavior.
+- Usign total order allows other optimizations, see Message Identifiers below.
+
+** Message identifiers.
+
+Initial thinking: message ID = CPG node id + 64 bit sequence number.
+This involves a lot of mapping between cluster IDs and broker messsages.
+
+New thinking: message ID = queue name + queue position.
+- Removes most of the mapping and memory management for cluster code.
+- Requires total ordering of messages (see above)
+
+** Message rejection
+
+Initial thinking: add special reject/rejected points to cluster interface so
+rejected messages could be re-queued without multicast.
+
+New thinking: treat re-queueing after reject as entirely new message.
+- Simplifies cluster interface & implementation
+- Not on the critical path.