-*-org-*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
* A new design for Qpid clustering.
** Issues with old cluster design
See [[./old-cluster-issues.txt]]
** A new cluster design.
1. Clearly defined interface between broker code and cluster plug-in.
2. Replicate queue events rather than client data.
- Only requires consistent enqueue order.
- Events only need to be serialized per-queue, which allows concurrency between queues
- Allows for replicated and non-replicated queues.
3. Use a lock protocol to agree on the order of dequeues: only the broker
holding the lock can acquire & dequeue. This no longer relies on
identical state and lock-step behavior to cause identical dequeues
on each broker.
4. Use multiple CPG groups to process different queues in
parallel. Use a fixed set of groups and hash queue names to choose
the group for each queue.
*** Requirements
The cluster must provide these delivery guarantees:
- client sends transfer: message must be replicated and not lost even if the local broker crashes.
- client acquires a message: message must not be delivered on another broker while acquired.
- client accepts message: message is forgotten, will never be delivered or re-queued by any broker.
- client releases message: message must be re-queued on cluster and not lost.
- client rejects message: message must be dead-lettered or discarded and forgotten.
- client disconnects/broker crashes: acquired but not accepted messages must be re-queued on cluster.
Each guarantee takes effect when the client receives a *completion*
for the associated command (transfer, acquire, reject, accept).
*** Broker receiving messages
On receiving a message transfer, in the connection thread we:
- multicast a message-received event.
- enqueue and complete the transfer when it is self-delivered.
Other brokers enqueue the message when they receive the message-received event.
Enqueues are queued up with other queue operations to be executed in the
thread context associated with the queue.
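A minimal sketch of this path, assuming hypothetical names
(ClusterConnection, MessageReceivedEvent and the helper functions) rather
than the actual cluster classes:

#+begin_src C++
// Sketch only: these are illustrative names, not the real cluster code.
#include <string>

struct Message { std::string queue; std::string body; };
struct MessageReceivedEvent { Message msg; /* plus sender id, sequence, ... */ };

class ClusterConnection {
  public:
    // Connection (IO) thread: a client transfer arrives.
    void onTransfer(const Message& m) {
        mcast(MessageReceivedEvent{m}); // multicast a message-received event
        // Do not enqueue or complete yet; that happens on self-delivery,
        // so every broker enqueues in the same CPG order.
    }
    // CPG deliver thread: a message-received event is delivered.
    void onDeliver(const MessageReceivedEvent& e, bool selfDelivered) {
        enqueue(e.msg);                  // every broker enqueues here
        if (selfDelivered)
            completeTransfer(e.msg);     // only the receiving broker completes
    }
  private:
    void mcast(const MessageReceivedEvent&) {} // stub: wraps CPG multicast
    void enqueue(const Message&) {}            // stub: handed to the queue's thread context
    void completeTransfer(const Message&) {}   // stub: completion back to the client
};
#+end_src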
*** Broker sending messages: moving queue ownership
Each queue is *owned* by at most one cluster broker at a time. Only
that broker may acquire or dequeue messages. The owner multicasts
notification of messages it acquires/dequeues to the cluster.
Periodically the owner hands over ownership to another interested
broker, providing time-shared access to the queue among all interested
brokers.
We assume the same IO-driven dequeuing algorithm as the standalone
broker with one modification: queues can be "locked". A locked queue
is not available for dequeuing messages and will be skipped by the
output algorithm.
At any given time only those queues owned by the local broker will be
unlocked.
As messages are acquired/dequeued from unlocked queues by the IO threads,
the broker multicasts acquire/dequeue events to the cluster.
When an unlocked queue has no more consumers with credit, or when a
time limit expires, the broker relinquishes ownership by multicasting
a release-queue event, allowing another interested broker to take
ownership.
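A sketch of the ownership rule; ClusterQueue and the mcast helpers are
illustrative names, not the real broker classes:

#+begin_src C++
// Sketch only: locking/ownership as described above.
#include <cstdint>

class ClusterQueue {
    bool locked = true;   // locked unless this broker currently owns the queue
  public:
    // The IO-driven output algorithm skips locked queues.
    bool canDequeue() const { return !locked; }

    // Only the owner acquires/dequeues, and it multicasts the event.
    void acquire(uint64_t position) {
        if (!canDequeue()) return;   // not the owner: skip this queue
        mcastAcquire(position);      // other brokers mark the message acquired
        // ... hand the message to the local consumer ...
    }

    // Relinquish ownership when there are no more consumers with credit
    // or the time limit expires; another interested broker can take over.
    void releaseOwnership() {
        locked = true;
        mcastReleaseQueue();
    }

    // Called when this broker takes ownership after a release-queue event.
    void onOwnershipGranted() { locked = false; }

  private:
    void mcastAcquire(uint64_t) {}   // stub: multicast acquire event
    void mcastReleaseQueue() {}      // stub: multicast release-queue event
};
#+end_src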
*** Asynchronous completion of accept
In acknowledged mode a message is not forgotten until it is accepted,
to allow for requeue on rejection or crash. The accept should not be
completed till the message has been forgotten.
On receiving an accept the broker:
- dequeues the message from the local queue
- multicasts an "accept" event
- completes the accept asynchronously when the dequeue event is self delivered.
NOTE: The message store does not currently implement asynchronous
completion of accept; this is a bug.
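A sketch of the asynchronous accept path described above; AcceptEvent,
ClusterSession and the completion callback are hypothetical names:

#+begin_src C++
// Sketch only: asynchronous completion of accept, illustrative names.
#include <cstdint>
#include <functional>

struct AcceptEvent { uint64_t queuePosition; };

class ClusterSession {
  public:
    // Client sends accept: hold the command open until self-delivery.
    void onAccept(uint64_t position, std::function<void()> complete) {
        dequeueLocal(position);        // dequeue from the local queue
        pending = complete;            // remember how to complete the accept
        mcast(AcceptEvent{position});  // multicast an "accept" event
    }
    // CPG deliver thread: our own accept event came back, so the whole
    // cluster has seen the dequeue and the message can be forgotten.
    void onSelfDeliver(const AcceptEvent&) {
        if (pending) { pending(); pending = nullptr; }
    }
  private:
    void dequeueLocal(uint64_t) {}       // stub
    void mcast(const AcceptEvent&) {}    // stub
    std::function<void()> pending;
};
#+end_src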
*** Multiple CPG groups.
The old cluster was bottlenecked by processing everything in a single
CPG deliver thread.
The new cluster uses a set of CPG groups, one per core. Queue names
are hashed to give group indexes, so statistically queues are likely
to be spread over the set of groups.
Operations on a given queue always use the same group, so ordering is
preserved within each queue. Operations on different queues can use
different groups, giving greater throughput sending to CPG and multiple
handler threads to process CPG messages.
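A sketch of the queue-name hashing, with an illustrative group naming
scheme and std::hash standing in for whatever hash is actually used:

#+begin_src C++
// Sketch only: map queue names onto a fixed set of CPG groups.
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

class GroupMap {
    std::vector<std::string> groups;   // e.g. one group per core: "qpid.0".."qpid.N-1"
  public:
    explicit GroupMap(std::size_t nGroups) {
        for (std::size_t i = 0; i < nGroups; ++i)
            groups.push_back("qpid." + std::to_string(i));
    }
    // All events for a queue use the same group (per-queue order);
    // different queues are spread statistically over the groups.
    const std::string& groupFor(const std::string& queueName) const {
        return groups[std::hash<std::string>()(queueName) % groups.size()];
    }
};
#+end_src

Because the mapping is a pure function of the queue name, every broker
independently computes the same group for a given queue.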
** Inconsistent errors.
An inconsistent error means that after multicasting an enqueue, accept
or dequeue, some brokers succeed in processing it and others fail.
The new design eliminates most sources of inconsistent errors in the
old broker: connections, sessions, security, management etc. Only
store journal errors remain.
The new inconsistent error protocol is similar to the old one with one
major improvement: brokers do not have to stall processing while an
error is being resolved.
** Updating new members
When a new member (the updatee) joins a cluster it needs to be brought
up to date with the rest of the cluster. An existing member (the
updater) sends an "update".
In the old cluster design the update is a snapshot of the entire
broker state. To ensure consistency of the snapshot both the updatee
and the updater "stall" at the start of the update, i.e. they stop
processing multicast events and queue them up for processing when the
update is complete. This creates a back-log of work to get through,
which leaves them lagging behind the rest of the cluster till they
catch up (which is not guaranteed to happen in bounded time).
With the new cluster design only exchanges, queues, bindings and
messages need to be replicated.
We update individual objects (queues and exchanges) independently.
- create queues first, then update all queues and exchanges in parallel.
- multiple updater threads, per queue/exchange.
Queue updater:
- marks the queue position at the sync point
- sends messages starting from the sync point working towards the head of the queue.
- send "done" message.
Queue updatee (sketched in code at the end of this section):
- enqueues received from CPG: add to back of queue as normal.
- dequeues received from CPG: apply if found, else save to check at end of update.
- messages from updater: add to the *front* of the queue.
- update complete: apply any saved dequeues.
Exchange updater:
- updater: send snapshot of exchange as it was at the sync point.
Exchange updatee:
- queue exchange operations after the sync point.
- when snapshot is received: apply saved operations.
Note:
- Updater is active throughout, no stalling.
- Consuming clients actually reduce the size of the update.
- Updatee stalls clients until the update completes.
(Note: May be possible to avoid updatee stall as well, needs thought)
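A sketch of the queue updatee logic above; the names and the
position-based erase are illustrative:

#+begin_src C++
// Sketch only: how an updatee queue merges CPG traffic with the update.
#include <cstdint>
#include <deque>
#include <set>
#include <string>

class UpdateeQueue {
    std::deque<std::string> messages;  // front = head of queue
    std::set<uint64_t> savedDequeues;  // dequeues that found no message yet
  public:
    // Enqueue received from CPG: add to the back as normal.
    void onCpgEnqueue(const std::string& msg) { messages.push_back(msg); }

    // Dequeue received from CPG: apply if found, else save for later.
    void onCpgDequeue(uint64_t position) {
        if (!eraseAt(position)) savedDequeues.insert(position);
    }

    // Message sent by the updater (sync point towards the head): it is
    // older than anything received via CPG, so it goes to the front.
    void onUpdaterMessage(const std::string& msg) { messages.push_front(msg); }

    // Updater's "done" message: apply any saved dequeues.
    void onUpdateComplete() {
        for (uint64_t pos : savedDequeues) eraseAt(pos);
        savedDequeues.clear();
    }
  private:
    bool eraseAt(uint64_t /*position*/) { return false; }  // stub: find & erase by position
};
#+end_src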
** Internal cluster interface
The new cluster interface is similar to the MessageStore interface, but
provides more detail (message positions) and some additional call
points (e.g. acquire).
The cluster interface captures these events:
- wiring changes: queue/exchange declare/bind
- message enqueued/acquired/released/rejected/dequeued.
- transactional events.
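A sketch of the kind of interface this implies, as an abstract class the
broker calls at each event point; the method names are illustrative,
loosely modelled on the MessageStore interface:

#+begin_src C++
// Sketch only: an abstract cluster interface capturing the events above.
#include <cstdint>
#include <string>

class ClusterInterface {
  public:
    virtual ~ClusterInterface() {}
    // Wiring changes.
    virtual void queueDeclare(const std::string& queue) = 0;
    virtual void exchangeDeclare(const std::string& exchange) = 0;
    virtual void bind(const std::string& queue, const std::string& exchange,
                      const std::string& key) = 0;
    // Message life cycle, with explicit queue positions.
    virtual void enqueue(const std::string& queue, uint64_t position) = 0;
    virtual void acquire(const std::string& queue, uint64_t position) = 0;
    virtual void release(const std::string& queue, uint64_t position) = 0;
    virtual void reject(const std::string& queue, uint64_t position) = 0;
    virtual void dequeue(const std::string& queue, uint64_t position) = 0;
    // Transactional events.
    virtual void txCommit() = 0;
    virtual void txRollback() = 0;
};
#+end_src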
** Maintainability
This design gives us more robust code with a clear and explicit interface.
The cluster depends on specific events clearly defined by an explicit
interface. Provided the semantics of this interface are not violated,
the cluster will not be broken by changes to broker code.
The cluster no longer requires identical processing of the entire
broker stack on each broker. It is not affected by the details of how
the broker allocates messages. It is independent of the
protocol-specific state of connections and sessions and so is
protected from future protocol changes (e.g. AMQP 1.0).
A number of specific ways the code will be simplified:
- drop code to replicate management model.
- drop timer workarounds for TTL, management, heartbeats.
- drop "cluster-safe assertions" in broker code.
- drop connections, sessions, management from cluster update.
- drop security workarounds: cluster code now operates after message decoding.
- drop connection tracking in cluster code.
- simpler inconsistent-error handling code, no need to stall.
** Performance
The standalone broker processes _connections_ concurrently, so CPU
usage increases as you add more connections.
The new cluster processes _queues_ concurrently, so CPU usage increases as you
add more queues.
In both cases, CPU usage peaks when the number of "units of
concurrency" (connections or queues) goes above the number of cores.
When all consumers on a queue are connected to the same broker the new
cluster uses the same message allocation threading/logic as a
standalone broker, with a little extra asynchronous book-keeping.
If a queue has multiple consumers connected to multiple brokers, the
new cluster time-shares the queue which is less efficient than having
all consumers on a queue connected to the same broker.
** Flow control
The new design does not queue up CPG-delivered messages; they are
processed immediately in the CPG deliver thread. This means that CPG's
flow control is sufficient for qpid.
** Live upgrades
Live upgrade refers to the ability to upgrade a cluster while it is
running, with no downtime. Each broker in the cluster is shut down in
turn and then re-started with a new version of the broker code.
To achieve this:
- The cluster protocol XML file has a new element <version number=N> attached
to each method. This is the version at which the method was added.
- New versions can only add methods, existing methods cannot be changed.
- The cluster handshake for new members includes the protocol version
at each member.
- Each CPG message starts with a header: version, size. This allows new encodings later.
- Brokers ignore messages of higher version.
- The cluster's version is the lowest version among its members.
- A newer broker can join an older cluster. When it does, it must restrict
itself to speaking the older version of the protocol.
- When the cluster version increases (because the lowest version member has left)
the remaining members may move up to the new version.
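A sketch of the versioning rules above; field widths and names are
assumptions:

#+begin_src C++
// Sketch only: per-message version/size header and "speak the lowest
// version" policy. Illustrative names, not the actual cluster code.
#include <algorithm>
#include <cstdint>
#include <vector>

struct FrameHeader {
    uint32_t version;  // protocol version the message was encoded with
    uint32_t size;     // body size, so an undecodable message can be skipped
};

class VersionPolicy {
    uint32_t myVersion;       // highest version this broker speaks
    uint32_t clusterVersion;  // lowest version among current members
  public:
    explicit VersionPolicy(uint32_t v) : myVersion(v), clusterVersion(v) {}

    // Handshake: the cluster's version is the lowest member version.
    void memberJoined(uint32_t memberVersion) {
        clusterVersion = std::min(clusterVersion, memberVersion);
    }
    // When the lowest-version member leaves, recompute from the remaining
    // members (assumed non-empty) so the version can move up.
    void membershipChanged(const std::vector<uint32_t>& memberVersions) {
        clusterVersion = std::min(myVersion,
            *std::min_element(memberVersions.begin(), memberVersions.end()));
    }
    uint32_t encodeVersion() const { return clusterVersion; }   // encode at cluster version
    bool ignore(const FrameHeader& h) const { return h.version > myVersion; }
};
#+end_src

Presumably the size field is what lets a broker skip over a message of a
version it does not understand.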
* Design debates
** Active/active vs. active/passive
An active-active cluster can be used in an active-passive mode. In
this mode we would like the cluster to be as efficient as a strictly
active-passive implementation.
An active/passive implementation allows some simplifications over active/active:
- drop queue ownership and locking.
- don't need to replicate message acquisition.
- can do immediate local enqueue and still guarantee order.
Active/passive introduces a few extra requirements:
- Exactly one broker has to take over if primary fails.
- Passive members must refuse client connections.
- On failover, clients must re-try all known addresses till they find the active member.
Active/active benefits:
- A broker failure only affects the subset of clients connected to that broker.
- Clients can switch to any other broker on failover.
- Backup brokers are immediately available on failover.
- As long as a client can connect to any broker in the cluster, it can be served.
Active/passive benefits:
- Don't need to replicate message allocation, can feed consumers at top speed.
Active/passive drawbacks:
- All clients on one node so a failure affects every client in the
system.
- After a failure there is a "reconnect storm" as every client
reconnects to the new active node.
- After a failure there is a period where no broker is active, until
the other brokers realize the primary is gone and agree on the new
primary.
- Clients must find the single active node, which may involve multiple
connect attempts.
- No service if a partition separates a client from the active broker,
even if the client can see other brokers.