-*-org-*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

* A new design for Qpid clustering.

** Issues with current design.

The cluster is based on virtual synchrony: each broker multicasts
events and the events from all brokers are serialized and delivered in
the same order to each broker.

In the current design raw byte buffers from client connections are
multicast, serialized and delivered in the same order to each broker.

Each broker has a replica of all queues, exchanges, bindings and also
all connections & sessions from every broker. Cluster code treats the
broker as a "black box": it "plays" the client data into the
connection objects and assumes that by giving the same input, each
broker will reach the same state.

A new broker joining the cluster receives a snapshot of the current
cluster state, and then follows the multicast conversation.

*** Maintenance issues.

The entire state of each broker is replicated to every member:
connections, sessions, queues, messages, exchanges, management objects
etc. Any discrepancy in the state that affects how messages are
allocated to consumers can cause an inconsistency.

- Entire broker state must be faithfully updated to new members.
- Management model also has to be replicated.
- All queues are replicated, can't have unreplicated queues (e.g. for management)

Events that are not deterministically predictable from the client
input data stream can cause inconsistencies. In particular, use of
timers/timestamps requires cluster workarounds to synchronize.

A member that encounters an error which is not encountered by all other
members is considered inconsistent and will shut itself down. Such
errors can come from any area of the broker code, e.g. different
ACL files can cause inconsistent errors.

The following areas required workarounds to work in a cluster:

- Timers/timestamps in broker code: management, heartbeats, TTL
- Security: cluster must replicate *after* decryption by security layer.
- Management: not initially included in the replicated model, source of many inconsistencies.

It is very easy for someone adding a feature or fixing a bug in the
standalone broker to break the cluster by:
- adding new state that needs to be replicated in cluster updates.
- doing something in a timer or other non-connection thread.

It's very hard to test for such breaks. We need a looser coupling
and a more explicitly defined interface between cluster and standalone
broker code.

*** Performance issues.

Virtual synchrony delivers all data from all clients in a single
stream to each broker.  The cluster must play this data through the full
broker code stack: connections, sessions etc. in a single thread
context in order to get identical behavior on each broker. The cluster
has a pipelined design to get some concurrency but this is a severe
limitation on scalability in multi-core hosts compared to the
standalone broker which processes each connection in a separate thread
context.

** A new cluster design.

Clearly defined interface between broker code and cluster plug-in.

Replicate queue events rather than client data.
- Broker behavior only needs to match per-queue.
- Smaller amount of code (queue implementation) that must behave predictably.
- Events only need be serialized per-queue, allows concurrency between queues

Use a moving queue ownership protocol to agree on the order of dequeues.
No longer relies on identical state and lock-step behavior to cause
identical dequeues on each broker.

Each queue has an associated thread-context. Events for a queue are executed
in that queue's context, in parallel with events for other queues.

*** Requirements

The cluster must provide these delivery guarantees:

- client sends transfer: message must be replicated and not lost even if the local broker crashes.
- client acquires a message: message must not be delivered on another broker while acquired.
- client accepts message: message is forgotten, will never be delivered or re-queued by any broker.
- client releases message: message must be re-queued on cluster and not lost.
- client rejects message: message must be dead-lettered or discarded and forgotten.
- client disconnects/broker crashes: acquired but not accepted messages must be re-queued on cluster.

Each guarantee takes effect when the client receives a *completion*
for the associated command (transfer, acquire, reject, accept).

*** Broker receiving messages

On receiving a message transfer, in the connection thread we:
- multicast a message-received event.
- enqueue and complete the transfer when it is self-delivered.

Other brokers enqueue the message when they receive the message-received event.

Enqueues are queued up with other queue operations to be executed in the
thread context associated with the queue.
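
The receive path above could be sketched roughly as follows. This is a
single-process illustration, not Qpid code: Bus, Broker,
MessageReceived and receiveTransfer are hypothetical names, and Bus
only stands in for the totally ordered multicast by delivering every
event to every member, sender included, in the order it was multicast.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <map>
#include <string>
#include <vector>

// Hypothetical replication event: a message arrived for a queue.
struct MessageReceived {
    std::size_t sender;   // index of the broker that received the transfer
    std::string queue;
    std::string body;
};

class Broker;

// Stand-in for totally ordered multicast (an assumption of this sketch).
class Bus {
  public:
    std::size_t join(Broker* b) { members.push_back(b); return members.size() - 1; }
    void multicast(const MessageReceived& e) {
        assert(e.sender < members.size());   // sender must have joined
        pending.push_back(e);
    }
    void deliverAll();
  private:
    std::vector<Broker*> members;
    std::deque<MessageReceived> pending;
};

class Broker {
  public:
    explicit Broker(Bus& b) : bus(b), self(b.join(this)) {}

    // Connection thread: multicast only, do not enqueue or complete yet.
    void receiveTransfer(const std::string& queue, const std::string& body) {
        bus.multicast(MessageReceived{self, queue, body});
    }

    // Delivery from the bus: every broker enqueues; the sender also
    // completes the client's transfer when it sees its own event.
    void deliver(const MessageReceived& e) {
        queues[e.queue].push_back(e.body);
        if (e.sender == self) ++completedTransfers;
    }

    std::map<std::string, std::deque<std::string> > queues;
    int completedTransfers = 0;
  private:
    Bus& bus;
    std::size_t self;
};

void Bus::deliverAll() {
    while (!pending.empty()) {
        MessageReceived e = pending.front();
        pending.pop_front();
        for (Broker* b : members) b->deliver(e);
    }
}
```

Because the local enqueue waits for self-delivery, every broker sees
the same order for a queue regardless of which broker the client was
connected to.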

*** Broker sending messages: moving queue ownership

Each queue is *owned* by at most one cluster broker at a time. Only
that broker may acquire or dequeue messages. The owner multicasts
notification of messages it acquires/dequeues to the cluster.
Periodically the owner hands over ownership to another interested
broker, providing time-shared access to the queue among all interested
brokers.

We assume the same IO-driven dequeuing algorithm as the standalone
broker with one modification: queues can be "locked". A locked queue
is not available for dequeuing messages and will be skipped by the
output algorithm.

At any given time only those queues owned by the local broker will be
unlocked.

As messages are acquired/dequeued from unlocked queues by the IO threads
the broker multicasts acquire/dequeue events to the cluster.

When an unlocked queue has no more consumers with credit, or when a
time limit expires, the broker relinquishes ownership by multicasting
a release-queue event, allowing another interested broker to take
ownership.
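
A minimal sketch of the local side of this rule, assuming a
hypothetical OwnedQueue class (the names, the millisecond clock and
the choice to lock as soon as the release is multicast are all
assumptions, not part of the design above):

```cpp
#include <cassert>
#include <chrono>

// Hypothetical local view of the ownership state of one replicated queue.
class OwnedQueue {
  public:
    typedef std::chrono::milliseconds Millis;

    explicit OwnedQueue(Millis limit) : timeLimit(limit) {}

    // A locked queue is skipped by the IO-driven output algorithm.
    bool locked() const { return !owned; }

    // Called when this broker becomes the owner of the queue.
    void takeOwnership(Millis now) { owned = true; ownedSince = now; }

    // The owner gives the queue up when no consumer has credit or when
    // its time slice has expired.
    bool shouldRelease(int consumersWithCredit, Millis now) const {
        if (!owned) return false;
        return consumersWithCredit == 0 || now - ownedSince >= timeLimit;
    }

    // Called when the broker multicasts its release-queue event.
    void release() { assert(owned); owned = false; }

  private:
    Millis timeLimit;
    Millis ownedSince = Millis(0);
    bool owned = false;
};
```

How the next owner is chosen among interested brokers is left open
here, as it is in the text.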

*** Asynchronous completion of accept
In acknowledged mode a message is not forgotten until it is accepted,
to allow for requeue on rejection or crash. The accept should not be
completed till the message has been forgotten.

On receiving an accept the broker:
- dequeues the message from the local queue
- multicasts an "accept" event
- completes the accept asynchronously when that event is self-delivered.

NOTE: The message store does not currently implement asynchronous
completions of accept; this is a bug.
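
The deferred completion could look something like this. AcceptTracker
and its callbacks are hypothetical: the multicast callback stands in
for the cluster send and the completion callback for whatever
completes the client's accept command.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <functional>
#include <map>

typedef std::uint64_t MessageId;   // hypothetical message identifier

class AcceptTracker {
  public:
    typedef std::function<void()> Completion;
    typedef std::function<void(MessageId)> Multicast;

    // Connection thread: dequeue locally, multicast, defer the completion.
    void accept(std::deque<MessageId>& queue, MessageId id,
                const Multicast& multicast, const Completion& done) {
        assert(pending.find(id) == pending.end());   // not accepted twice
        std::deque<MessageId>::iterator i =
            std::find(queue.begin(), queue.end(), id);
        if (i != queue.end()) queue.erase(i);
        pending[id] = done;
        multicast(id);
    }

    // Called when the broker's own event comes back from the cluster.
    void selfDelivered(MessageId id) {
        std::map<MessageId, Completion>::iterator i = pending.find(id);
        if (i == pending.end()) return;   // event from another broker
        Completion done = i->second;
        pending.erase(i);
        done();                           // the accept is now complete
    }

    std::size_t outstanding() const { return pending.size(); }

  private:
    std::map<MessageId, Completion> pending;
};
```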

** Inconsistent errors.

The new design eliminates most sources of inconsistent errors
(connections, sessions, security, management etc.) The only points
where inconsistent errors can occur are at enqueue and dequeue (most
likely store-related errors.)

The new design can use the existing error-handling protocol with one
major improvement: since brokers are no longer required to maintain
identical state they do not have to stall processing while an error is
being resolved.

#TODO: The only source of dequeue errors is probably an unrecoverable journal failure.

** Updating new members

When a new member (the updatee) joins a cluster it needs to be brought
up to date with the rest of the cluster.  An existing member (the
updater) sends an "update".

In the old cluster design the update is a snapshot of the entire
broker state.  To ensure consistency of the snapshot both the updatee
and the updater "stall" at the start of the update, i.e. they stop
processing multicast events and queue them up for processing when the
update is complete. This creates a back-log of work to get through,
which leaves them lagging behind the rest of the cluster till they
catch up (which is not guaranteed to happen in a bounded time.)

With the new cluster design only exchanges, queues, bindings and
messages need to be replicated.

Update of wiring (exchanges, queues, bindings) is the same as current
design.

Update of messages is different:
- per-queue rather than per-broker, separate queues can be updated in parallel.
- updates queues in reverse order to eliminate unbounded catch-up
- does not require updater & updatee to stall during update.

Replication events, multicast to cluster:
- enqueue(q,m): message m pushed on back of queue q.
- acquire(q,m): mark m acquired
- dequeue(q,m): forget m.
Messages sent on update connection:
- update_front(q,m): during update, receiver pushes m to *front* of q
- update_done(q): during update, update of q is complete.

Updater:
- when updatee joins set iterator i = q.end()
- while i != q.begin(): --i; send update_front(q,*i) to updatee
- send update_done(q) to updatee

Updatee:
- q initially in locked state, can't dequeue locally.
- start processing replication events for q immediately (enqueue, dequeue, acquire etc.)
- receive update_front(q,m): q.push_front(m)
- receive update_done(q): q can be unlocked for local dequeuing.
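
The updater and updatee steps above can be sketched as below. This is
a single-threaded illustration under stated assumptions: messages are
plain strings, dequeues happen only at the front of the queue, and
Updater, UpdateeQueue and their methods are hypothetical names. A
count of unsent messages replaces the iterator i so that the cursor
survives enqueues and dequeues that arrive during the update.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>

typedef std::deque<std::string> Queue;

// Updatee's copy of one queue while it is being updated.
struct UpdateeQueue {
    Queue messages;
    bool locked = true;   // no local dequeue until update_done

    // Replication events, processed from the moment the updatee joins.
    void enqueue(const std::string& m) { messages.push_back(m); }
    void dequeue(const std::string& m) {
        Queue::iterator i = std::find(messages.begin(), messages.end(), m);
        if (i != messages.end()) messages.erase(i);   // unseen: ignored here
    }

    // Messages received on the update connection.
    void updateFront(const std::string& m) { messages.push_front(m); }
    void updateDone() { locked = false; }
};

// Updater's cursor over one queue. `remaining` counts the messages at
// the front of the queue that have not been sent to the updatee yet.
class Updater {
  public:
    explicit Updater(Queue& q) : queue(q), remaining(q.size()) {}

    // Send one update message. Returns false once update_done was sent.
    bool step(UpdateeQueue& updatee) {
        if (remaining == 0) { updatee.updateDone(); return false; }
        assert(remaining <= queue.size());
        updatee.updateFront(queue[--remaining]);   // walk back to front
        return true;
    }

    // Replication events applied to the updater's own queue.
    void enqueue(const std::string& m) { queue.push_back(m); }
    void dequeueFront() {
        assert(!queue.empty());
        queue.pop_front();
        if (remaining > 0) --remaining;   // no longer needs updating
    }

  private:
    Queue& queue;
    std::size_t remaining;
};
```

A message dequeued before the cursor reaches it is never sent, which
is why the number of update_front messages is bounded by the queue
size at the start of the update.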

Benefits:
- Stall only for wiring update: updater & updatee can process multicast messages while messages are updated.
- No unbounded catch-up: update consists of at most N update_front() messages where N=q.size() at start of update.
- During update consumers actually help by removing messages before they need to be updated.
- Needs no separate "work to do" queue, only the broker queues themselves.

# TODO how can we recover from updater crashing before update complete?
# Clear queues that are not updated & send request for updates on those queues?

# TODO updatee may receive a dequeue for a message it has not yet seen, needs
# to hold on to that so it can drop the message when it is seen.
# Similar problem exists for wiring?
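
One possible way to hold on to early dequeues, as a sketch only.
CatchUpQueue is a hypothetical name, messages are identified by a
numeric position, and discarding leftover dequeues at update_done
assumes that every message still outstanding was skipped by the
updater.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <set>

typedef std::uint64_t Position;   // hypothetical per-queue message id

class CatchUpQueue {
  public:
    std::deque<Position> messages;

    // Replication event: forget message p. If p has not arrived yet,
    // remember the dequeue so the message can be dropped on arrival.
    void dequeue(Position p) {
        std::deque<Position>::iterator i =
            std::find(messages.begin(), messages.end(), p);
        if (i != messages.end()) messages.erase(i);
        else early.insert(p);
    }

    // Update connection: push p on the front unless already dequeued.
    void updateFront(Position p) {
        if (early.erase(p) > 0) return;   // dequeued before it was seen
        messages.push_front(p);
    }

    // Update complete: remaining early dequeues were never sent.
    void updateDone() { early.clear(); }

    std::size_t heldDequeues() const { return early.size(); }

  private:
    std::set<Position> early;
};
```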

** Cluster API

The new cluster API is similar to the MessageStore interface.
(Initially I thought it would be an extension of the MessageStore interface,
but as the design develops it seems better to make it a separate interface.)

The cluster interface captures these events:
- wiring changes: queue/exchange declare/bind
- message enqueued/acquired/released/rejected/dequeued.

The cluster will require some extensions to the Queue:
- Queues can be "locked", locked queues are ignored by IO-driven output.
- Cluster must be able to apply queue events from the cluster to a queue.
  These appear to fit into existing queue operations.
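
To make the shape of such an interface concrete, here is a
hypothetical sketch. None of these names come from the Qpid code
base: ClusterEvents is an invented interface covering the events
listed above, and RecordingCluster is a trivial implementation that
only logs calls, the kind of stub that could be used to test the
broker side of the interface.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical event interface between broker and cluster plug-in.
class ClusterEvents {
  public:
    virtual ~ClusterEvents() {}

    // Wiring changes.
    virtual void queueDeclared(const std::string& queue) = 0;
    virtual void exchangeDeclared(const std::string& exchange) = 0;
    virtual void bound(const std::string& exchange, const std::string& queue,
                       const std::string& key) = 0;

    // Message events, identified here by queue name and position.
    virtual void enqueued(const std::string& queue, std::uint64_t pos) = 0;
    virtual void acquired(const std::string& queue, std::uint64_t pos) = 0;
    virtual void released(const std::string& queue, std::uint64_t pos) = 0;
    virtual void rejected(const std::string& queue, std::uint64_t pos) = 0;
    virtual void dequeued(const std::string& queue, std::uint64_t pos) = 0;
};

// Stub implementation that records each event as a line of text.
class RecordingCluster : public ClusterEvents {
  public:
    std::vector<std::string> log;

    void queueDeclared(const std::string& q) override { log.push_back("declare-queue " + q); }
    void exchangeDeclared(const std::string& e) override { log.push_back("declare-exchange " + e); }
    void bound(const std::string& e, const std::string& q,
               const std::string& k) override {
        log.push_back("bind " + e + " " + q + " " + k);
    }
    void enqueued(const std::string& q, std::uint64_t p) override { note("enqueue", q, p); }
    void acquired(const std::string& q, std::uint64_t p) override { note("acquire", q, p); }
    void released(const std::string& q, std::uint64_t p) override { note("release", q, p); }
    void rejected(const std::string& q, std::uint64_t p) override { note("reject", q, p); }
    void dequeued(const std::string& q, std::uint64_t p) override { note("dequeue", q, p); }

  private:
    void note(const std::string& what, const std::string& q, std::uint64_t p) {
        log.push_back(what + " " + q + ":" + std::to_string(p));
    }
};
```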

** Maintainability

This design gives us more robust code with clear and explicit interfaces.

The cluster depends on specific events clearly defined by an explicit
interface. Provided the semantics of this interface are not violated,
the cluster will not be broken by changes to broker code.

The cluster no longer requires identical processing of the entire
broker stack on each broker. It is not affected by the details of how
the broker allocates messages. It is independent of the
protocol-specific state of connections and sessions and so is
protected from future protocol changes (e.g. AMQP 1.0)

A number of specific ways the code will be simplified:
- drop code to replicate management model.
- drop timer workarounds for TTL, management, heartbeats.
- drop "cluster-safe assertions" in broker code.
- drop connections, sessions, management from cluster update.
- drop security workarounds: cluster code now operates after message decoding.
- drop connection tracking in cluster code.
- simpler inconsistent-error handling code, no need to stall.

** Performance

The only way to verify the relative performance of the new design is
to prototype & profile. The following points suggest the new design
may scale/perform better:

Some work moved from virtual synchrony thread to connection threads:
- All connection/session logic moves to connection thread.
- Exchange routing logic moves to connection thread.
- On local broker dequeuing is done in connection thread
- Local broker dequeue is IO driven as for a standalone broker.

For queues with all consumers on a single node dequeue is all
IO-driven in connection thread. Pay for time-sharing only if queue has
consumers on multiple brokers.

Doing work for different queues in parallel scales on multi-core boxes when
there are multiple queues.

One difference works against performance: there is an extra
encode/decode. The old design multicasts raw client data and decodes
it in the virtual synchrony thread. The new design would decode
messages in the connection thread, re-encode them for multicast, and
decode (on non-local brokers) in the virtual synchrony thread. There
is extra work here, but only in the *connection* thread: on a
multi-core machine this happens in parallel for every connection, so
it probably is not a bottleneck. There may be scope to optimize
decode/re-encode by re-using some of the original encoded data, this
could also benefit the stand-alone broker.

** Asynchronous queue replication

The existing "asynchronous queue replication" feature maintains a
passive backup of queues on a remote broker over a TCP
connection.

The new cluster replication protocol could be re-used to implement
asynchronous queue replication: it's just a special case where the
active broker is always the queue owner and the enqueue/dequeue
messages are sent over a TCP connection rather than multicast.

The new update mechanism could also work with 'asynchronous
queue replication', allowing such replication (over a TCP connection
on a WAN say) to be initiated after the queue had already been created
and been in use (one of the key missing features).

** Increasing Concurrency and load sharing

The current cluster is bottlenecked by processing everything in the
CPG deliver thread. By removing the need for identical operation on
each broker, we open up the possibility of greater concurrency.

Handling multicast enqueue, acquire, accept, release etc: concurrency
per queue.  Operations on different queues can be done in different
threads.

The new design does not force each broker to do all the work in the
CPG thread so spreading load across cluster members should give some
scale-up.

** Misc outstanding issues & notes

Replicating wiring
- Need async completion of wiring commands?
- qpid.sequence_counter: need extra work to support in new design, do we care?

Cluster+persistence:
- finish async completion: dequeue completion for store & cluster
- cluster restart from store: clean stores *not* identical, pick 1, all others update.
- need to generate cluster ids for messages recovered from store.

Live updates: we don't need to stall brokers during an update!
- update on queue-by-queue basis.
- updatee locks queues during update, no dequeue.
- update in reverse: don't update messages dequeued during update.
- updatee adds update messages at front (as normal), replicated messages at back.
- updater starts from back, sends "update done" when it hits front of queue.

Flow control: need to throttle multicasting
1. bound the number of outstanding multicasts.
2. ensure the entire cluster keeps up, no unbounded "lag"
The existing design uses read-credit to solve 1., and does not solve 2.
New design should stop reading on all connections while flow control
condition exists?
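
Point 1 could be handled with a simple window on outstanding
multicasts, sketched below. MulticastWindow is a hypothetical name and
the fixed limit is an assumption; the sketch says nothing about point
2, which needs cluster-wide information.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical bound on multicasts sent but not yet self-delivered.
class MulticastWindow {
  public:
    explicit MulticastWindow(std::size_t max) : limit(max) { assert(max > 0); }

    // True if another multicast may be sent now; counts it as outstanding.
    bool trySend() {
        if (outstanding >= limit) return false;
        ++outstanding;
        return true;
    }

    // One of our multicasts has been self-delivered.
    void selfDelivered() { assert(outstanding > 0); --outstanding; }

    // While true the broker would stop reading from client connections.
    bool throttled() const { return outstanding >= limit; }

  private:
    std::size_t limit;
    std::size_t outstanding = 0;
};
```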

Can federation also be unified, at least in configuration?

Consider queues (and exchanges?) as having "reliability" attributes:
- persistent: is the message stored on disk.
- backed-up (to another broker): active/passive async replication.
- replicated (to a cluster): active/active multicast replication to cluster.
- federated: federation link to a queue/exchange on another broker.

"Reliability" seems right for the first 3 but not for federation, is
there a better term?

Clustering and scalability: new design may give us the flexibility to
address scalability as part of cluster design. Think about
relationship to federation and "fragmented queues" idea.

* Design debates/decisions

** Active/active vs. active/passive

An active-active cluster can be used in an active-passive mode. In
this mode we would like the cluster to be as efficient as a strictly
active-passive implementation.

An active/passive implementation allows some simplifications over active/active:
- drop Queue ownership and locking
- don't need to replicate message acquisition.
- can do immediate local enqueue and still guarantee order.

Active/passive introduces a few extra requirements:
- Exactly one broker has to take over if primary fails.
- Passive members must refuse client connections.
- On failover, clients must re-try all known addresses till they find the active member.

Active/active benefits:
- A broker failure only affects the subset of clients connected to that broker.
- Clients can switch to any other broker on failover
- Backup brokers are immediately available on failover.
- Some load sharing: reading from client + multicast only done on direct node.

Active/active drawbacks:
- Co-ordinating message acquisition may impact performance (not tested)
- Code may be more complex than active/passive.

Active/passive benefits:
- Don't need message allocation strategy, can feed consumers at top speed.
- Code may be simpler than active/active.

Active/passive drawbacks:
- All clients on one node so a failure affects every client in the system.
- After a failure there is a "reconnect storm" as every client reconnects to the new active node.
- After a failure there is a period where no broker is active, until the other brokers realize the primary is gone and agree on the new primary.
- Clients must find the single active node, may involve multiple connect attempts.

** Total ordering.

Initial thinking: allow message ordering to differ between brokers.
New thinking: use CPG total ordering, get identical ordering on all brokers.
- Allowing variation in order introduces too much chance of unexpected behavior.
- Using total order allows other optimizations, see Message Identifiers below.

** Message identifiers.

Initial thinking: message ID = CPG node id + 64 bit sequence number.
This involves a lot of mapping between cluster IDs and broker messages.

New thinking: message ID = queue name + queue position.
- Removes most of the mapping and memory management for cluster code.
- Requires total ordering of messages (see above)
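
A small sketch of why total ordering makes this identifier work:
if every broker processes the same enqueue events in the same order,
each can assign positions locally and arrive at the same IDs without
any mapping table. MessageId and PositionCounter are hypothetical
names, and starting positions at 0 is an arbitrary choice.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>

// Hypothetical cluster-wide message identifier.
struct MessageId {
    std::string queue;
    std::uint64_t position;
};

inline bool operator==(const MessageId& a, const MessageId& b) {
    return a.queue == b.queue && a.position == b.position;
}

// Assigns the next position on a queue as each enqueue is delivered.
class PositionCounter {
  public:
    MessageId assign(const std::string& queue) {
        MessageId id = { queue, next[queue]++ };   // first position is 0
        return id;
    }
  private:
    std::map<std::string, std::uint64_t> next;
};
```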

** Message rejection

Initial thinking: add special reject/rejected points to cluster interface so
rejected messages could be re-queued without multicast.

New thinking: treat re-queueing after reject as entirely new message.
- Simplifies cluster interface & implementation
- Not on the critical path.