summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/cluster/new-cluster-design.txt
blob: 1a0010ee525bae806ba423c39e2b27719a18cbbf (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
-*-org-*-

* A new design for Qpid clustering.

** Issues with current design.

The cluster is based on virtual synchrony: each broker multicasts
events and the events from all brokers are serialized and delivered in
the same order to each broker.

In the current design raw byte buffers from client connections are
multicast, serialized and delivered in the same order to each broker.

Each broker has a replica of all queues, exchanges, bindings and also
all connections & sessions from every broker. Cluster code treats the
broker as a "black box", it "plays" the client data into the
connection objects and assumes that by giving the same input, each
broker will reach the same state.

A new broker joining the cluster receives a snapshot of the current
cluster state, and then follows the multicast conversation.

*** Maintenance issues.

The entire state of each broker is replicated to every member:
connections, sessions, queues, messages, exchanges, management objects
etc. Any discrepancy in the state that affects how messages are
allocated to consumers can cause an inconsistency.

- Entire broker state must be faithfully updated to new members.
- Management model also has to be replicated.
- All queues are replicated, can't have unreplicated queues (e.g. for management)
  
Events that are not deterministically predictable from the client
input data stream can cause inconsistencies. In particular use of
timers/timestamps require cluster workarounds to synchronize.

A member that encounters an error which is not encounted by all other
members is considered inconsistent and will shut itself down. Such
errors can come from any area of the broker code, e.g. different
ACL files can cause inconsistent errors.

The following areas required workarounds to work in a cluster:

- Timers/timestamps in broker code: management, heartbeats, TTL
- Security: cluster must replicate *after* decryption by security layer.
- Management: not initially included in the replicated model, source of many inconsistencies.

It is very easy for someone adding a feature or fixing a bug in the
standalone broker to break the cluster by:
- adding new state that needs to be replicated in cluster updates.
- doing something in a timer or other non-connection thread.

It's very hard to test for such breaks. We need a looser coupling
and a more explicitly defined interface between cluster and standalone
broker code.

*** Performance issues.

Virtual synchrony delivers all data from all clients in a single
stream to each broker.  The cluster must play this data thru the full
broker code stack: connections, sessions etc. in a single thread
context. The cluster has a pipelined design to get some concurrency
but this is a severe limitation on scalability in multi-core hosts
compared to the standalone broker which processes each connection
in a separate thread context.

** A new cluster design.
   
Maintain /equivalent/ state not /identical/ state on each member. 

Messages from different sources need not be ordered identically on all members. 

Use a moving queue ownership protocol to agree order of dequeues, rather
than relying on identical state and lock-step behavior to cause
identical dequeues on each broker.

*** Requirements

The cluster must provide these delivery guarantees: 

- client sends transfer: message must be replicated and not lost even if the local broker crashes.
- client acquires a message: message must not be delivered on another broker while acquired.
- client accepts message: message is forgotten, will never be delivered or re-queued by any broker.
- client releases message: message must be re-queued on cluster and not lost.
- client rejects message: message must be dead-lettered or discarded and forgotten.
- client disconnects/broker crashes: acquired but not accepted messages must be re-queued on cluster.


Each guarantee takes effect when the client receives a *completion*
for the associated command (transfer, acquire, reject, accept)

The cluster must provide this ordering guarantee:

- messages from the same publisher received by the same subscriber
  must be received in the order they were sent (except in the case of
  re-queued messages.)

*** Broker receiving messages
     
On recieving a message transfer, in the connection thread we:
- multicast a message-received event.
- enqueue the message on the local queue.
- asynchronously complete the transfer when the message-received is self-delivered.

This is exactly like the asynchronous completion in the MessageStore:
the cluster "stores" a message by multicast. We send a completion to
the client asynchronously when the multicast "completes" by
self-delivery. This satisfies the "client sends transfer" guarantee,
but makes the message available on the queue immediately, avoiding the
multicast latency.

It also moves most of the work to the client connection thread. The
only work in the virtual synchrony deliver thread is sending the client
completion.

Other brokers enqueue the message when they recieve the
message-received event, in the virtual synchrony deliver thread.

*** Broker sending messages: moving queue ownership

Each queue is *owned* by at most one cluster broker at a time. Only
that broker may acquire or dequeue messages. The owner multicasts
notification of messages it acquires/dequeues to the cluster.
Periodically the owner hands over ownership to another interested
broker, providing time-shared access to the queue among all interested
brokers.

This means we no longer need identical message ordering on all brokers
to get consistent dequeuing.  Messages from different sources can be
ordered differently on different brokers. 

We assume the same IO-driven dequeuing algorithm as the standalone
broker with one modification: queues can be "locked". A locked queue
is not available for dequeuing messages and will be skipped by the
output algorithm.

At any given time only those queues owned by the local broker will be
unlocked.

As messages are acquired/dequeued from unlocked queues by the IO threads
the broker multicasts acquire/dequeue events to the cluster.

When an unlocked queue has no more consumers with credit, or when a
time limit expires, the broker relinquishes ownership by multicasting
a release-queue event, allowing another interested broker to take
ownership.

*** Asynchronous completion of accept

In acknowledged mode a message is not forgotten until it is accepted,
to allow for requeue on rejection or crash. The accept should not be
completed till the message has been forgotten.

On receiving an accept the broker:
- dequeues the message from the local queue
- multicasts a "dequeue" event
- completes the accept asynchronously when the dequeue event is self delivered. 

NOTE: The message store does not currently implement asynchronous
completions of accept, this is a bug.

** Inconsistent errors.

The new design eliminates most sources of inconsistent errors
(connections, sessions, security, management etc.) The only points
where inconsistent errors can occur are at enqueue and dequeue (most
likely store-related errors.)

The new design can use the exisiting error-handling protocol with one
major improvement: since brokers are no longer required to maintain
identical state they do not have to stall processing while an error is
being resolved.

#TODO: The only source of dequeue errors is probably an unrecoverable journal failure.

When a new member (the updatee) joins a cluster it needs to be brought up to date.
The old cluster design an existing member (the updater) sends a state snapshot.

To ensure consistency of the snapshot both the updatee and the updater
"stall" at the point of the update. They stop processing multicast
events and queue them up for processing when the update is
complete. This creates a back-log of work for each to get through,
which leaves them lagging behind the rest of the cluster till they
catch up (which is not guaranteed to happen in a bounded time.)

** Better handling of new brokers joining

When a new member (the updatee) joins a cluster it needs to be brought
up to date with the rest of the cluster.  An existing member (the
updater) sends an "update".

In the old cluster design the update is a snapshot of the entire
broker state.  To ensure consistency of the snapshot both the updatee
and the updater "stall" at the start of the update, i.e. they stop
processing multicast events and queue them up for processing when the
update is complete. This creates a back-log of work to get through,
which leaves them lagging behind the rest of the cluster till they
catch up (which is not guaranteed to happen in a bounded time.)

With the new cluster design only queues need to be replicated
(actually wiring needs replication also, see below.)

The new update is:
- per-queue rather than per-broker, separate queues can be updated in parallel.
- updates queues in reverse order to eliminate potentially unbounded catch-up

Replication events, multicast to cluster:
- enqueue(q,m): message m pushed on back of queue q .
- acquire(q,m): mark m acquired
- dequeue(q,m): forget m.
Messages sent on update connection:
- update_front(q,m): during update, receiver pushes m to *front* of q
- update_done(q): during update, update of q is complete.
  
Updater: 
- when updatee joins set iterator i = q.end()
- while i != q.begin(): --i; send update_front(q,*i) to updatee
- send update_done(q) to updatee

Updatee:
- q initially in locked state, can't dequeue locally.
- start processing replication events for q immediately (enqueue, dequeue, acquire etc.)
- receive update_front(q,m): q.push_front(m)
- receive update_done(q): q can be unlocked for local dequeing.

Benefits:
- No stall: updarer & updatee process multicast messages throughout the update.
- No unbounded catch-up: update consists of at most N update_front() messages where N=q.size() at start of update.
- During update consumers actually help by removing messages before they need to be updated.
- Needs no separate "work to do" queue, only the brokers queues themselves.

# TODO how can we recover from updater crashing before update complete?
# Clear queues that are not updated & send request for udpates on those queues?
  
# TODO above is incomplete, we also need to replicate exchanges & bindings.
# Think about where this fits into the update process above and when
# local clients of the updatee can start to send messages.

# TODO updatee may receive a dequeue for a message it has not yet seen, needs
# to hold on to that so it can drop the message when it is seen.
# Similar problem exists for wiring?
  
** Cluster API

The new cluster API is an extension of the existing MessageStore API.

The MessageStore API already covers these events:
- wiring changes: queue/exchange declare/bind
- message enqueued/dequeued.

The cluster needs to add a "message acquired" call, which store
implementations can ignore.

The cluster will require some extensions to the Queue:
- Queues can be "locked", locked queues are skipped by IO-driven output.
- Messages carry a cluster-message-id.
- messages can be dequeued by cluster-message-id

When cluster+store are in use the cluster implementation of MessageStore
will delegate to the store implementation.

** Maintainability

This design gives us more robust code with a clear and explicit interfaces.

The cluster depends on specific, well-defined events - defined by the
extended MessageStore API. Provided the semantics of this API are not
violated, the cluster will not be broken by changes to broker code.

Re-using the established MessageStore API provides assurance that the
API is sound and gives economy of design.

The cluster no longer requires identical processing of the entire
broker stack on each broker. It is not affected by the details of how
the broker allocates messages. It is independent of the
protocol-specific state of connections and sessions and so is
protected from future protocol changes (e.g. AMQP 1.0)

A number of specific ways the code will be simplified:
- drop code to replicate management model. 
- drop timer workarounds for TTL, management, heartbeats.
- drop "cluster-safe assertions" in broker code.
- drop connections, sessions, management from cluster update.
- drop security workarounds: cluster code now operates after message decoding.
- drop connection tracking in cluster code.
- simper inconsistent-error handling code, no need to stall.

** Performance

The only way to verify the relative performance of the new design is
to prototype & profile. The following points suggest the new design
may scale/perform better:

Moving work from virtual synchrony thread to connection threads where
it can be done in parallel:
- All connection/session logic moves to connection thread.
- Exchange routing logic moves to connection thread.
- Local broker does all enqueue/dequeue work in connection thread
- Enqueue/dequeue are IO driven as for a standalone broker.

Optimizes common cases (pay for what you use):
- Publisher/subscriber on same broker: replication is fully asynchronous, no extra latency.
- Unshared queue: dequeue is all IO-driven in connection thread.
- Time sharing: pay for time-sharing only if queue has consumers on multiple brokers. 

#TODO Not clear how the time sharing algorithm would compare with the existing cluster delivery algorithm.

Extra decode/encode: The old design multicasts raw client data and
decodes it in the virtual synchrony thread. The new design would
decode messages in the connection thread, re-encode them for
multicast, and decode (on non-local brokers) in the virtual synchrony
thread. There is extra work here, but only in the *connection* thread:
on a multi-core machine this happens in parallel for every connection,
so it probably is not a bottleneck. There may be scope to optimize
decode/re-encode by re-using some of the original encoded data, this
could also benefit the stand-alone broker.

** Misc outstanding issues & notes

Message IDs: need an efficient cluster-wide message ID.

Replicating wiring
- Need async completion of wiring commands? 
- qpid.sequence_counter: need extra work to support in new design, do we care?

Cluster+persistence:
- cluster implements MessageStore+ & delegates to store?
- finish async completion: dequeue completion for store & cluster
- need to support multiple async completions
- cluster restart from store: clean stores *not* identical, pick 1, all others update.
- async completion of wiring changes?

Live updates: we don't need to stall brokers during an update!
- update on queue-by-queue basis.
- updatee locks queues during update, no dequeue.
- update in reverse: don't update messages dequeued during update.
- updatee adds update messages at front (as normal), replicated messages at back.
- updater starts from back, sends "update done" when it hits front of queue.

Flow control: need to throttle multicasting
1. bound the number of outstanding multicasts.
2. ensure the entire cluster keeps up, no unbounded "lag"
The existing design uses read-credit to solve 1., and does not solve 2.
New design should stop reading on all connections while flow control
condition exists? 

Asynchronous queue replication could be refactored to work the same
way: under a MessageStore interface using the same enqueue/dequeue
protocol but over a TCP connection. Separate the "async queue
replication" code for reuse.

Can federation also be unified, at least in configuration?

Consider queues (and exchanges?) as having "reliability" attributes:
- persistent: is the message stored on disk.
- backed-up (to another broker): active/passive async replication.
- replicated (to a cluster): active/active multicast replication to cluster.
- federated: federation link to a queue/exchange on another broker.

"Reliability" seems right for the first 3 but not for federation, is
there a better term?