Diffstat (limited to 'qpid/cpp/design_docs/new-cluster-design.txt')
-rw-r--r-- | qpid/cpp/design_docs/new-cluster-design.txt | 435 |
1 files changed, 435 insertions, 0 deletions
diff --git a/qpid/cpp/design_docs/new-cluster-design.txt b/qpid/cpp/design_docs/new-cluster-design.txt
new file mode 100644
index 0000000000..7adb46fee3
--- /dev/null
+++ b/qpid/cpp/design_docs/new-cluster-design.txt
@@ -0,0 +1,435 @@
-*-org-*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

* A new design for Qpid clustering.

** Issues with current design.

The cluster is based on virtual synchrony: each broker multicasts events and the events from all brokers are serialized and delivered in the same order to each broker.

In the current design raw byte buffers from client connections are multicast, serialized and delivered in the same order to each broker.

Each broker has a replica of all queues, exchanges, bindings and also all connections & sessions from every broker. Cluster code treats the broker as a "black box": it "plays" the client data into the connection objects and assumes that, given the same input, each broker will reach the same state.

A new broker joining the cluster receives a snapshot of the current cluster state, and then follows the multicast conversation.

*** Maintenance issues.

The entire state of each broker is replicated to every member: connections, sessions, queues, messages, exchanges, management objects etc. Any discrepancy in the state that affects how messages are allocated to consumers can cause an inconsistency.

- Entire broker state must be faithfully updated to new members.
- Management model also has to be replicated.
- All queues are replicated, can't have unreplicated queues (e.g. for management).

Events that are not deterministically predictable from the client input data stream can cause inconsistencies. In particular, the use of timers/timestamps requires cluster workarounds to stay synchronized.

A member that encounters an error which is not encountered by all other members is considered inconsistent and will shut itself down. Such errors can come from any area of the broker code, e.g. different ACL files can cause inconsistent errors.

The following areas required workarounds to work in a cluster:

- Timers/timestamps in broker code: management, heartbeats, TTL.
- Security: cluster must replicate *after* decryption by the security layer.
- Management: not initially included in the replicated model, source of many inconsistencies.

It is very easy for someone adding a feature or fixing a bug in the standalone broker to break the cluster by:
- adding new state that needs to be replicated in cluster updates.
- doing something in a timer or other non-connection thread.

It's very hard to test for such breaks. We need a looser coupling and a more explicitly defined interface between cluster and standalone broker code.

*** Performance issues.
Virtual synchrony delivers all data from all clients in a single stream to each broker. The cluster must play this data through the full broker code stack (connections, sessions etc.) in a single thread context in order to get identical behavior on each broker. The cluster has a pipelined design to get some concurrency, but this is a severe limitation on scalability on multi-core hosts compared to the standalone broker, which processes each connection in a separate thread context.

** A new cluster design.

Clearly defined interface between broker code and cluster plug-in.

Replicate queue events rather than client data.
- Broker behavior only needs to match per-queue.
- A smaller amount of code (the queue implementation) that must behave predictably.
- Events only need to be serialized per-queue, which allows concurrency between queues.

Use a moving queue ownership protocol to agree on the order of dequeues. This no longer relies on identical state and lock-step behavior to cause identical dequeues on each broker.

Each queue has an associated thread context. Events for a queue are executed in that queue's context, in parallel with events for other queues.

*** Requirements

The cluster must provide these delivery guarantees:

- client sends transfer: message must be replicated and not lost even if the local broker crashes.
- client acquires a message: message must not be delivered on another broker while acquired.
- client accepts message: message is forgotten, will never be delivered or re-queued by any broker.
- client releases message: message must be re-queued on cluster and not lost.
- client rejects message: message must be dead-lettered or discarded and forgotten.
- client disconnects/broker crashes: acquired but not accepted messages must be re-queued on cluster.

Each guarantee takes effect when the client receives a *completion* for the associated command (transfer, acquire, reject, accept).

*** Broker receiving messages

On receiving a message transfer, in the connection thread we:
- multicast a message-received event.
- enqueue and complete the transfer when it is self-delivered.

Other brokers enqueue the message when they receive the message-received event.

Enqueues are queued up with other queue operations to be executed in the thread context associated with the queue.

*** Broker sending messages: moving queue ownership

Each queue is *owned* by at most one cluster broker at a time. Only that broker may acquire or dequeue messages. The owner multicasts notification of messages it acquires/dequeues to the cluster. Periodically the owner hands over ownership to another interested broker, providing time-shared access to the queue among all interested brokers.

We assume the same IO-driven dequeuing algorithm as the standalone broker, with one modification: queues can be "locked". A locked queue is not available for dequeuing messages and will be skipped by the output algorithm.

At any given time only those queues owned by the local broker will be unlocked.

As messages are acquired/dequeued from unlocked queues by the IO threads, the broker multicasts acquire/dequeue events to the cluster.

When an unlocked queue has no more consumers with credit, or when a time limit expires, the broker relinquishes ownership by multicasting a release-queue event, allowing another interested broker to take ownership.
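To make the ownership mechanism above concrete, here is a minimal C++ sketch of per-queue lock/owner state. It is illustrative only: all names (ClusterQueue, multicast, etc.) are hypothetical, not existing broker code, and threading and real message types are ignored.

#+BEGIN_SRC C++
// Illustrative only: per-queue ownership/lock state (all names hypothetical).
// A real implementation would be driven by IO threads and self-delivered
// CPG events; threading and the actual message types are ignored here.
#include <cstdint>
#include <iostream>
#include <string>
#include <utility>

enum class Event { Acquire, Dequeue, ReleaseQueue };

// Stand-in for multicasting a replication event to the cluster.
void multicast(const std::string& queue, Event e, uint64_t position) {
    std::cout << "multicast queue=" << queue << " event=" << static_cast<int>(e)
              << " pos=" << position << "\n";
}

class ClusterQueue {
  public:
    explicit ClusterQueue(std::string name) : name_(std::move(name)) {}

    // Locked queues are skipped by the IO-driven output algorithm.
    bool isLocked() const { return locked_; }

    // Called when a release-queue event names this broker as the next
    // owner: the queue becomes available for local acquire/dequeue.
    void takeOwnership() { locked_ = false; }

    // Called from an IO thread when a consumer acquires the message at
    // 'position'; only the owner (unlocked) may do this, and it must
    // multicast the event so other brokers mark the message acquired.
    void acquire(uint64_t position) {
        if (locked_) return;                 // not the owner: skip this queue
        multicast(name_, Event::Acquire, position);
    }

    // Relinquish ownership when consumers run out of credit or the time
    // slice expires, letting another interested broker take over.
    void releaseOwnership() {
        locked_ = true;
        multicast(name_, Event::ReleaseQueue, 0);
    }

  private:
    std::string name_;
    bool locked_ = true;                     // queues start locked everywhere
};

int main() {
    ClusterQueue q("my-queue");
    q.takeOwnership();      // a release-queue event named us as the new owner
    q.acquire(1);           // IO thread acquires the message at position 1
    q.releaseOwnership();   // time slice over: hand the queue back
}
#+END_SRC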
*** Asynchronous completion of accept

In acknowledged mode a message is not forgotten until it is accepted, to allow for requeue on rejection or crash. The accept should not be completed until the message has been forgotten.

On receiving an accept the broker:
- dequeues the message from the local queue.
- multicasts an "accept" event.
- completes the accept asynchronously when the dequeue event is self-delivered.

NOTE: The message store does not currently implement asynchronous completion of accept; this is a bug.

** Inconsistent errors.

The new design eliminates most sources of inconsistent errors (connections, sessions, security, management etc.) The only points where inconsistent errors can occur are at enqueue and dequeue (most likely store-related errors.)

The new design can use the existing error-handling protocol with one major improvement: since brokers are no longer required to maintain identical state they do not have to stall processing while an error is being resolved.

# TODO: The only source of dequeue errors is probably an unrecoverable journal failure.

** Updating new members

When a new member (the updatee) joins a cluster it needs to be brought up to date with the rest of the cluster. An existing member (the updater) sends an "update".

In the old cluster design the update is a snapshot of the entire broker state. To ensure consistency of the snapshot both the updatee and the updater "stall" at the start of the update, i.e. they stop processing multicast events and queue them up for processing when the update is complete. This creates a back-log of work to get through, which leaves them lagging behind the rest of the cluster till they catch up (which is not guaranteed to happen in a bounded time.)

With the new cluster design only exchanges, queues, bindings and messages need to be replicated.

Update of wiring (exchanges, queues, bindings) is the same as in the current design.

Update of messages is different:
- per-queue rather than per-broker, separate queues can be updated in parallel.
- updates queues in reverse order to eliminate unbounded catch-up.
- does not require updater & updatee to stall during update.

Replication events, multicast to cluster:
- enqueue(q,m): message m pushed on back of queue q.
- acquire(q,m): mark m acquired.
- dequeue(q,m): forget m.
Messages sent on update connection:
- update_front(q,m): during update, receiver pushes m to *front* of q.
- update_done(q): during update, update of q is complete.

Updater:
- when updatee joins set iterator i = q.end()
- while i != q.begin(): --i; send update_front(q,*i) to updatee
- send update_done(q) to updatee

Updatee:
- q initially in locked state, can't dequeue locally.
- start processing replication events for q immediately (enqueue, dequeue, acquire etc.)
- receive update_front(q,m): q.push_front(m)
- receive update_done(q): q can be unlocked for local dequeuing.
(A code sketch of this updater/updatee procedure appears below, after the TODO notes.)

Benefits:
- Stall only for wiring update: updater & updatee can process multicast messages while messages are updated.
- No unbounded catch-up: update consists of at most N update_front() messages where N=q.size() at start of update.
- During update consumers actually help by removing messages before they need to be updated.
- Needs no separate "work to do" queue, only the broker queues themselves.

# TODO how can we recover from the updater crashing before the update is complete?
# Clear queues that are not updated & send request for updates on those queues?
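The following is a small illustrative sketch of the reverse-order update described above, with std::deque standing in for the broker queue and callbacks standing in for the update connection; the names are hypothetical, not the actual broker classes.

#+BEGIN_SRC C++
// Illustrative only: the reverse-order queue update (hypothetical names,
// std::deque standing in for the broker queue, callbacks standing in for
// the update connection).
#include <cstdint>
#include <deque>
#include <functional>
#include <iostream>

using Message = uint64_t;            // stand-in for a broker message
using Queue   = std::deque<Message>;

// Updater: walk the queue from back to front, then signal completion.
void updateQueue(const Queue& q,
                 const std::function<void(Message)>& sendUpdateFront,
                 const std::function<void()>& sendUpdateDone) {
    for (auto i = q.rbegin(); i != q.rend(); ++i)   // i = q.end(); --i; ...
        sendUpdateFront(*i);
    sendUpdateDone();                // updatee may now unlock the queue
}

// Updatee: update messages go to the *front*; replication enqueues that
// arrive concurrently go to the back as normal.
struct Updatee {
    Queue q;
    bool locked = true;              // no local dequeue until update_done
    void onUpdateFront(Message m) { q.push_front(m); }
    void onReplicatedEnqueue(Message m) { q.push_back(m); }
    void onUpdateDone() { locked = false; }
};

int main() {
    Queue existing = {1, 2, 3};      // updater's queue at the start of the update
    Updatee u;
    u.onReplicatedEnqueue(4);        // multicast enqueue arriving during the update
    updateQueue(existing,
                [&](Message m) { u.onUpdateFront(m); },
                [&] { u.onUpdateDone(); });
    for (Message m : u.q) std::cout << m << ' ';    // prints: 1 2 3 4
    std::cout << '\n';
}
#+END_SRC

At the end of the update the updatee's queue holds the updater's messages in order, followed by anything enqueued while the update was in progress, which is why no catch-up backlog builds up.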
# TODO updatee may receive a dequeue for a message it has not yet seen, needs
# to hold on to that so it can drop the message when it is seen.
# Similar problem exists for wiring?

** Cluster API

The new cluster API is similar to the MessageStore interface. (Initially I thought it would be an extension of the MessageStore interface, but as the design develops it seems better to make it a separate interface.)

The cluster interface captures these events:
- wiring changes: queue/exchange declare/bind.
- message enqueued/acquired/released/rejected/dequeued.

The cluster will require some extensions to the Queue:
- Queues can be "locked"; locked queues are ignored by IO-driven output.
- Cluster must be able to apply queue events from the cluster to a queue. These appear to fit into existing queue operations.

** Maintainability

This design gives us more robust code with a clear and explicit interface.

The cluster depends on specific events clearly defined by an explicit interface. Provided the semantics of this interface are not violated, the cluster will not be broken by changes to broker code.

The cluster no longer requires identical processing of the entire broker stack on each broker. It is not affected by the details of how the broker allocates messages. It is independent of the protocol-specific state of connections and sessions and so is protected from future protocol changes (e.g. AMQP 1.0).

A number of specific ways the code will be simplified:
- drop code to replicate the management model.
- drop timer workarounds for TTL, management, heartbeats.
- drop "cluster-safe assertions" in broker code.
- drop connections, sessions, management from cluster update.
- drop security workarounds: cluster code now operates after message decoding.
- drop connection tracking in cluster code.
- simpler inconsistent-error handling code, no need to stall.

** Performance

The only way to verify the relative performance of the new design is to prototype & profile. The following points suggest the new design may scale/perform better:

Some work moves from the virtual synchrony thread to connection threads:
- All connection/session logic moves to the connection thread.
- Exchange routing logic moves to the connection thread.
- On the local broker dequeuing is done in the connection thread.
- Local broker dequeue is IO-driven as for a standalone broker.

For queues with all consumers on a single node, dequeue is all IO-driven in the connection thread. We pay for time-sharing only if a queue has consumers on multiple brokers.

Doing work for different queues in parallel scales on multi-core boxes when there are multiple queues.

One difference works against performance: there is an extra encode/decode. The old design multicasts raw client data and decodes it in the virtual synchrony thread. The new design would decode messages in the connection thread, re-encode them for multicast, and decode (on non-local brokers) in the virtual synchrony thread. There is extra work here, but only in the *connection* thread: on a multi-core machine this happens in parallel for every connection, so it probably is not a bottleneck. There may be scope to optimize decode/re-encode by re-using some of the original encoded data; this could also benefit the stand-alone broker.

** Asynchronous queue replication

The existing "asynchronous queue replication" feature maintains a passive backup of queues on a remote broker over a TCP connection.
The new cluster replication protocol could be re-used to implement asynchronous queue replication: it's just a special case where the active broker is always the queue owner and the enqueue/dequeue messages are sent over a TCP connection rather than multicast.

The new update mechanism could also work with "asynchronous queue replication", allowing such replication (over a TCP connection on a WAN, say) to be initiated after the queue has already been created and been in use (one of the key missing features).

** Increasing concurrency and load sharing

The current cluster is bottlenecked by processing everything in the CPG deliver thread. By removing the need for identical operation on each broker, we open up the possibility of greater concurrency.

Handling multicast enqueue, acquire, accept, release etc. gives concurrency per queue: operations on different queues can be done in different threads.

The new design does not force each broker to do all the work in the CPG thread, so spreading load across cluster members should give some scale-up.

** Misc outstanding issues & notes

Replicating wiring:
- Need async completion of wiring commands?
- qpid.sequence_counter: needs extra work to support in the new design, do we care?

Cluster+persistence:
- finish async completion: dequeue completion for store & cluster.
- cluster restart from store: clean stores are *not* identical, pick 1, all others update.
- need to generate cluster ids for messages recovered from store.

Live updates: we don't need to stall brokers during an update!
- update on a queue-by-queue basis.
- updatee locks queues during update, no dequeue.
- update in reverse: don't update messages dequeued during update.
- updatee adds update messages at the front, replicated messages at the back (as normal).
- updater starts from the back, sends "update done" when it hits the front of the queue.

Flow control: need to throttle multicasting.
1. bound the number of outstanding multicasts.
2. ensure the entire cluster keeps up, no unbounded "lag".
The existing design uses read-credit to solve 1, and does not solve 2. The new design should stop reading on all connections while a flow control condition exists?

Can federation also be unified, at least in configuration?

Consider queues (and exchanges?) as having "reliability" attributes:
- persistent: is the message stored on disk.
- backed-up (to another broker): active/passive async replication.
- replicated (to a cluster): active/active multicast replication to cluster.
- federated: federation link to a queue/exchange on another broker.

"Reliability" seems right for the first 3 but not for federation; is there a better term?

Clustering and scalability: the new design may give us the flexibility to address scalability as part of the cluster design. Think about the relationship to federation and the "fragmented queues" idea.

* Design debates/decisions

** Active/active vs. active/passive

An active-active cluster can be used in an active-passive mode. In this mode we would like the cluster to be as efficient as a strictly active-passive implementation.

An active/passive implementation allows some simplifications over active/active:
- drop queue ownership and locking.
- don't need to replicate message acquisition.
- can do immediate local enqueue and still guarantee order.

Active/passive introduces a few extra requirements:
- Exactly one broker has to take over if the primary fails.
- Passive members must refuse client connections.
- On failover, clients must re-try all known addresses till they find the active member.

Active/active benefits:
- A broker failure only affects the subset of clients connected to that broker.
- Clients can switch to any other broker on failover.
- Backup brokers are immediately available on failover.
- Some load sharing: reading from client + multicast is only done on the direct node.

Active/active drawbacks:
- Co-ordinating message acquisition may impact performance (not tested).
- Code may be more complex than active/passive.

Active/passive benefits:
- Don't need a message allocation strategy, can feed consumers at top speed.
- Code may be simpler than active/active.

Active/passive drawbacks:
- All clients are on one node, so a failure affects every client in the system.
- After a failure there is a "reconnect storm" as every client reconnects to the new active node.
- After a failure there is a period where no broker is active, until the other brokers realize the primary is gone and agree on the new primary.
- Clients must find the single active node, which may involve multiple connect attempts.

** Total ordering.

Initial thinking: allow message ordering to differ between brokers.
New thinking: use CPG total ordering, get identical ordering on all brokers.
- Allowing variation in order introduces too much chance of unexpected behavior.
- Using total order allows other optimizations, see Message identifiers below.

** Message identifiers.

Initial thinking: message ID = CPG node id + 64-bit sequence number. This involves a lot of mapping between cluster IDs and broker messages.

New thinking: message ID = queue name + queue position (see the sketch at the end of this document).
- Removes most of the mapping and memory management for cluster code.
- Requires total ordering of messages (see above).

** Message rejection

Initial thinking: add special reject/rejected points to the cluster interface so rejected messages could be re-queued without multicast.

New thinking: treat re-queueing after reject as an entirely new message.
- Simplifies cluster interface & implementation.
- Not on the critical path.
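As an illustration of the "queue name + queue position" decision above, here is a trivial sketch of such an identifier (hypothetical type, not existing cluster code). With CPG total ordering every broker assigns the same position to a given enqueue, so the pair identifies the message cluster-wide without per-broker ID maps.

#+BEGIN_SRC C++
// Illustrative only: a cluster-wide message identifier built from the queue
// name and the queue position (hypothetical type, not existing cluster code).
#include <cstdint>
#include <iostream>
#include <string>
#include <tuple>

struct MessageId {
    std::string queue;      // queue the message was enqueued on
    uint64_t    position;   // position assigned in total (CPG) order

    bool operator<(const MessageId& other) const {
        return std::tie(queue, position) < std::tie(other.queue, other.position);
    }
    bool operator==(const MessageId& other) const {
        return queue == other.queue && position == other.position;
    }
};

int main() {
    // Because enqueues are totally ordered, every broker computes the same
    // id for the same message: no cluster-id/broker-message mapping needed.
    MessageId id{"my-queue", 42};
    std::cout << id.queue << "[" << id.position << "]\n";
}
#+END_SRC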