-*-org-*-

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

* Status of implementation

Meaning of priorities:
[#A] Essential for basic functioning.
[#B] Required for first release.
[#C] Can be addressed in a later release.

The existing prototype is bare bones, just enough to run performance benchmarks:
- Implements publish and consumer locking protocol.
- Deferred delivery and asynchronous completion of messages.
- Optimizes the case where all consumers are on the same node.
- No new member updates, no failover updates, no transactions, no persistence etc.

Prototype code is on branch qpid-2920-active, in cpp/src/qpid/cluster/exp/

** Similarities to existing cluster.

/Active-active/: the new cluster can be a drop-in replacement for the
old; existing tests & customer deployment configurations are still
valid.

/Virtual synchrony/: Uses corosync to co-ordinate activity of members.

/XML controls/: Uses XML to define the primitives multicast to the
cluster.

** Differences with existing cluster.

/Report rather than predict consumption/: brokers explicitly tell each
other which messages have been acquired or dequeued. This removes the
major cause of bugs in the existing cluster.

/Queue consumer locking/: to avoid duplicates only one broker can acquire or
dequeue messages at a time - while it holds the consume-lock on the
queue. If multiple brokers are consuming from the same queue the lock
is passed around to time-share access to the queue.

/Per-queue concurrency/: uses a fixed-size set of CPG groups (reflecting
the concurrency of the host) to allow concurrent processing on
different queues. Queues are hashed onto the groups.

* Completed tasks
** DONE [#A] Minimal POC: publish/acquire/dequeue protocol.
   CLOSED: [2011-10-05 Wed 16:03]

Defines broker::Cluster interface and call points.
Initial interface committed.

Main classes
Core: central object holding cluster classes together (replaces cluster::Cluster)
BrokerContext: implements broker::Cluster interface.
QueueContext: Attached to a broker::Queue, holds cluster status.
MessageHolder: holds local messages while they are being enqueued.

Implements multiple CPG groups for better concurrency.

** DONE [#A] Large message replication.
   CLOSED: [2011-10-05 Wed 17:22]
Multicast using fixed-size (64k) buffers, allowing fragmentation of messages across buffers (frame by frame).
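
The packing idea can be sketched as follows. This is only an illustration, assuming each message has already been split into frames that each fit in one buffer. The names =Frame=, =Buffer= and =packFrames= are hypothetical and are not taken from the prototype code.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical types for illustration; not the prototype's classes.
typedef std::string Frame;                 // One encoded frame.
typedef std::vector<Frame> Buffer;         // Frames sent in one multicast.

const std::size_t BUFFER_SIZE = 64 * 1024; // Fixed multicast buffer size.

// Pack frames, in order, into buffers of at most maxSize bytes.
// The frames of one message may spill into later buffers, but a single
// frame is never split, so every frame must fit in one buffer.
std::vector<Buffer> packFrames(const std::vector<Frame>& frames,
                               std::size_t maxSize = BUFFER_SIZE) {
    std::vector<Buffer> buffers;
    std::size_t used = 0;                  // Bytes used in the last buffer.
    for (std::size_t i = 0; i < frames.size(); ++i) {
        assert(frames[i].size() <= maxSize);
        if (buffers.empty() || used + frames[i].size() > maxSize) {
            buffers.push_back(Buffer());   // Start a new buffer.
            used = 0;
        }
        buffers.back().push_back(frames[i]);
        used += frames[i].size();
    }
    return buffers;
}
```

With two 40000-byte frames followed by a 1000-byte frame, the first frame fills one buffer on its own and the other two share a second buffer.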

* Design Questions
** [[Queue sequence numbers vs. independant message IDs]] 

Current prototype uses queue+sequence number to identify message. This
is tricky for updating new members as the sequence numbers are only
known on delivery.

Independent message IDs that can be generated and sent as part of the
message simplify this and potentially allow performance benefits by
relaxing total ordering.  However they require additional map lookups
that hurt performance.

- [X] Prototype independent message IDs, check performance.
Throughput worse by 30% in contended case, 10% in uncontended.

* Tasks to match existing cluster
** TODO [#A] Review old cluster code for more tasks. 1
** TODO [#A] Defer and async completion of wiring commands. 5
Testing requirement: Many tests assume wiring changes are visible
across the cluster once the command completes.

Name clashes: need to avoid a race if a queue/exchange with the same
name is declared on 2 brokers simultaneously.

** TODO [#A] Update to new members joining. 10.

Need to resolve [[Queue sequence numbers vs. independant message IDs]] first.
- implicit sequence numbers are more tricky to replicate to new member.

Update individual objects (queues and exchanges) independently.
- create queues first, then update all queues and exchanges in parallel.
- multiple updater threads, per queue/exchange.
- updater sends messages to special exchange(s) (not using extended AMQP controls)

Queue updater:
- marks the queue position at the sync point
- sends messages starting from the sync point working towards the head of the queue.
- sends a "done" message.
Note: updater remains active throughout, consuming clients actually reduce the
size of the update.

Queue updatee:
- enqueues received from CPG: add to back of queue as normal.
- dequeues received from CPG: apply if found, else save to check at end of update.
- messages from updater: add to the *front* of the queue.
- update complete: apply any saved dequeues.
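
The updatee rules above can be sketched as a small class. This is a simplified illustration under stated assumptions: messages are reduced to integer IDs, and =UpdateeQueue= and its member names are hypothetical, not part of the broker or the prototype.

```cpp
#include <algorithm>
#include <cassert>
#include <deque>
#include <set>

// Hypothetical sketch: a message is represented only by an integer ID.
typedef unsigned long MessageId;

class UpdateeQueue {
  public:
    UpdateeQueue() : updating(true) {}

    // Enqueue received from CPG: add to the back of the queue as normal.
    void cpgEnqueue(MessageId id) { messages.push_back(id); }

    // Dequeue received from CPG: apply if found, else save it to check
    // at the end of the update.
    void cpgDequeue(MessageId id) {
        if (!remove(id) && updating) savedDequeues.insert(id);
    }

    // Message from the updater: add to the *front* of the queue.
    void updateMessage(MessageId id) {
        assert(updating);
        messages.push_front(id);
    }

    // Update complete: apply any saved dequeues.
    void updateDone() {
        for (std::set<MessageId>::const_iterator i = savedDequeues.begin();
             i != savedDequeues.end(); ++i)
            remove(*i);
        savedDequeues.clear();
        updating = false;
    }

    const std::deque<MessageId>& contents() const { return messages; }

  private:
    // Remove a message by ID, return false if it is not on the queue.
    bool remove(MessageId id) {
        std::deque<MessageId>::iterator i =
            std::find(messages.begin(), messages.end(), id);
        if (i == messages.end()) return false;
        messages.erase(i);
        return true;
    }

    bool updating;
    std::deque<MessageId> messages;
    std::set<MessageId> savedDequeues;
};
```

Because the updater sends from the sync point towards the head, pushing each update message on the front rebuilds the original order, while messages enqueued via CPG during the update stay behind them at the back.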

Exchange updater:
- updater: send snapshot of exchange as it was at the sync point.

Exchange updatee:
- queue exchange operations after the sync point.
- when snapshot is received: apply saved operations.
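
A minimal sketch of the exchange updatee behaviour, assuming an exchange can be reduced to a set of binding keys and that the only operations are bind and unbind. =UpdateeExchange=, =Operation= and =Binding= are hypothetical names for illustration only.

```cpp
#include <cassert>
#include <cstddef>
#include <set>
#include <string>
#include <vector>

// Hypothetical simplification: an exchange is just a set of binding keys.
typedef std::string Binding;

struct Operation {
    bool bind;          // true = bind, false = unbind.
    Binding binding;
    Operation(bool b, const Binding& k) : bind(b), binding(k) {}
};

class UpdateeExchange {
  public:
    UpdateeExchange() : haveSnapshot(false) {}

    // Operation received after the sync point: save it until the
    // snapshot arrives, afterwards apply it directly.
    void operation(bool bind, const Binding& b) {
        Operation op(bind, b);
        if (haveSnapshot) apply(op);
        else saved.push_back(op);
    }

    // Snapshot received: install it, then apply saved operations in order.
    void snapshot(const std::set<Binding>& s) {
        assert(!haveSnapshot);   // Assumed: exactly one snapshot per update.
        bindings = s;
        haveSnapshot = true;
        for (std::size_t i = 0; i < saved.size(); ++i) apply(saved[i]);
        saved.clear();
    }

    const std::set<Binding>& getBindings() const { return bindings; }

  private:
    void apply(const Operation& op) {
        if (op.bind) bindings.insert(op.binding);
        else bindings.erase(op.binding);
    }

    bool haveSnapshot;
    std::set<Binding> bindings;
    std::vector<Operation> saved;
};
```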

Updater remains active throughout.
Updatee stalls clients until the update completes.

Updating queue/exchange/binding objects is via the same encode/decode
that is used by the store. Updatee to use recovery interfaces to
recover?

** TODO [#A] Failover updates to client. 2
Implement the amq.failover exchange to notify clients of membership.
** TODO [#A] Passing all existing cluster tests. 5

The new cluster should be a drop-in replacement for the old, so it
should be able to pass all the existing tests.

** TODO [#B] Initial status protocol. 3
Handshake to give status of each broker member to new members joining.
Status includes
- cluster protocol version.
- persistent store state (clean, dirty)
- make it extensible, so additional state can be added in new protocols

** TODO [#B] Persistent cluster startup. 4

Based on existing code:
- Dirty/clean store state is exchanged in initial status.
- Only one broker recovers from store, others update.
** TODO [#B] Replace boost::hash with our own hash function. 1  
The hash function is effectively part of the interface so
we need to be sure it doesn't change underneath us.
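
As an illustration of a hash whose output is fixed by its definition, here is a sketch using 32-bit FNV-1a to map a queue name onto one of a fixed number of CPG groups. FNV-1a is only one possible choice and is not stated in this plan as the selected function; =groupIndex= and its parameters are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <stdint.h>
#include <string>

// 32-bit FNV-1a. The constants are part of the algorithm's definition,
// so the result for a given string is the same on every broker.
uint32_t fnv1a(const std::string& s) {
    uint32_t h = 2166136261u;                        // FNV offset basis.
    for (std::size_t i = 0; i < s.size(); ++i) {
        h ^= static_cast<unsigned char>(s[i]);
        h *= 16777619u;                              // FNV prime.
    }
    return h;
}

// Hypothetical helper: choose the CPG group for a queue by name.
std::size_t groupIndex(const std::string& queueName, std::size_t groupCount) {
    assert(groupCount > 0);
    return fnv1a(queueName) % groupCount;
}
```

Note that the group count is also part of the mapping: brokers that disagree on it would hash the same queue to different groups.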

** TODO [#B] Management model. 3
Alerts for inconsistent message loss.

** TODO [#B] Management methods that modify queues. 5
Replicate management methods that modify queues - e.g. move, purge.
Target broker may not have all messages on other brokers for purge/destroy.
- Queue::move() - need to wait for lock? Replicate?
- Queue::get() - ???
- Queue::purge() - replicate purge? or just delete what's on broker ?
- Queue::destroy() - messages to alternate exchange on all brokers?

Need to add callpoints & mcast messages to replicate these?
** TODO [#B] TX transaction support. 5
Extend broker::Cluster interface to capture transaction context and completion.
Running brokers exchange TX information.
New broker update includes TX information.

    // FIXME aconway 2010-10-18: As things stand the cluster is not
    // compatible with transactions
    // - enqueues occur after routing is complete
    // - no call to Cluster::enqueue, should be in Queue::process?
    // - no transaction context associated with messages in the Cluster interface.
    // - no call to Cluster::accept in Queue::dequeueCommitted

Injecting holes into a queue:
- Multicast a 'non-message' that just consumes one queue position.
- Used to reserve a message ID (position) for an uncommitted message.
- Also could allow non-replicated messages on a replicated queue if required.

** TODO [#B] DTX transaction support. 5
Extend broker::Cluster interface to capture transaction context and completion.
Running brokers exchange DTX information.
New broker update includes DTX information.

** TODO [#B] Async completion of accept. 4
When this is fixed in the standalone broker, it should be fixed for cluster.

** TODO [#B] Network partitions and quorum. 2
Re-use existing implementation.

** TODO [#B] Review error handling, put in a consistent model. 4.
- [ ] Review all asserts, for possible throw.
- [ ] Decide on fatal vs. non-fatal errors.

** TODO [#B] Implement inconsistent error handling policy. 5
What to do if a message is enqueued successfully on some broker(s),
but fails on other(s) - e.g. due to store limits?
- fail on local broker = possible message loss.
- fail on non-local broker = possible duplication.

We have more flexibility now, we don't *have* to crash
- but we've lost some of our redundancy guarantee, how to inform user? 

Options to respond to inconsistent error:
- stop broker
- reset broker (exec a new qpidd)
- reset queue
- log critical
- send management event

Most important is to inform of the risk of message loss.
Current favourite: reset queue+log critical+ management event.
Configurable choices?

** TODO [#C] Allow non-replicated exchanges, queues. 5

3 levels set in declare arguments:
- qpid.replicate=no - nothing is replicated.
- qpid.replicate=wiring - queues/exchanges are replicated but not messages.
- qpid.replicate=yes - queues exchanges and messages are replicated.
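
The three levels could be read from the declare arguments roughly as below. This is a sketch only: =Arguments= is a plain string map standing in for the real argument table, and =ReplicateLevel= and =replicateLevel= are hypothetical names. The default is passed in because the plan leaves it open.

```cpp
#include <cassert>
#include <map>
#include <stdexcept>
#include <string>

enum ReplicateLevel {
    REPLICATE_NONE,    // qpid.replicate=no: nothing is replicated.
    REPLICATE_WIRING,  // qpid.replicate=wiring: queues/exchanges only.
    REPLICATE_ALL      // qpid.replicate=yes: wiring and messages.
};

// Hypothetical stand-in for the declare arguments.
typedef std::map<std::string, std::string> Arguments;

// Return the level requested in args, or dflt if none is given.
// Throws std::invalid_argument for an unrecognized value.
ReplicateLevel replicateLevel(const Arguments& args, ReplicateLevel dflt) {
    Arguments::const_iterator i = args.find("qpid.replicate");
    if (i == args.end()) return dflt;
    if (i->second == "no") return REPLICATE_NONE;
    if (i->second == "wiring") return REPLICATE_WIRING;
    if (i->second == "yes") return REPLICATE_ALL;
    throw std::invalid_argument("Invalid qpid.replicate value: " + i->second);
}
```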

Wiring use case: it's OK to lose some messages (up to the max depth of
the queue) but the queue/exchange structure must be highly available
so clients can resume communication after fail over.

Configurable default? Default same as old cluster?

Need to
- save replicated status to the store (in arguments).
- support in management tools.

** TODO [#C] Handling immediate messages in a cluster. 2
Include remote consumers in decision to deliver an immediate message.
* Improvements over existing cluster
** TODO [#C] Remove old cluster hacks and workarounds. 
The old cluster has workarounds in the broker code that can be removed.
- [ ] drop code to replicate management model.
- [ ] drop timer workarounds for TTL, management, heartbeats.
- [ ] drop "cluster-safe assertions" in broker code.
- [ ] drop connections, sessions, management from cluster update.
- [ ] drop security workarounds: cluster code now operates after message decoding.
- [ ] drop connection tracking in cluster code.
- [ ] simpler inconsistent-error handling code, no need to stall.

** TODO [#C] Support for live upgrades.

Allow brokers in a running cluster to be replaced one-by-one with a new version.
(see new-cluster-design for design notes.)

The old cluster protocol was unstable because any changes in broker
state caused changes to the cluster protocol. The new design should be
much more stable.

Points to implement in anticipation of live upgrade:
- Prefix each CPG message with a version number and length.
  Version number determines how to decode the message.
- Brokers ignore messages that have a higher version number than they understand.
- Protocol version XML element in cluster.xml, on each control.
- Initial status protocol to include protocol version number.
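
The version and length prefix might look like the sketch below. The header layout (2-byte version, 4-byte body length, both big-endian) and the names =encode= and =decode= are assumptions made for illustration; the plan does not specify field sizes or byte order.

```cpp
#include <cassert>
#include <cstddef>
#include <stdint.h>
#include <string>

// Assumed header layout: 2-byte version, 4-byte body length, big-endian.
const std::size_t HEADER_SIZE = 6;

// Prefix body with its version number and length.
std::string encode(uint16_t version, const std::string& body) {
    assert(body.size() <= 0xFFFFFFFFu);
    std::string out;
    out.push_back(static_cast<char>(version >> 8));
    out.push_back(static_cast<char>(version & 0xFF));
    uint32_t len = static_cast<uint32_t>(body.size());
    for (int shift = 24; shift >= 0; shift -= 8)
        out.push_back(static_cast<char>((len >> shift) & 0xFF));
    out += body;
    return out;
}

// Extract the body if the message is well formed and its version is one
// we understand. Returns false, leaving body untouched, for malformed
// messages and for messages with a higher version, which are ignored.
bool decode(const std::string& data, uint16_t maxVersion, std::string& body) {
    if (data.size() < HEADER_SIZE) return false;
    const unsigned char* p =
        reinterpret_cast<const unsigned char*>(data.data());
    uint16_t version = static_cast<uint16_t>((p[0] << 8) | p[1]);
    uint32_t len = (uint32_t(p[2]) << 24) | (uint32_t(p[3]) << 16) |
                   (uint32_t(p[4]) << 8) | uint32_t(p[5]);
    if (version > maxVersion) return false;          // Too new: ignore.
    if (data.size() - HEADER_SIZE < len) return false;
    body.assign(data, HEADER_SIZE, len);
    return true;
}
```

Carrying the length in the header means a broker can skip a message it cannot decode without understanding its contents.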

New member updates: use the store encode/decode for updates, use the
same backward compatibility strategy as the store. This allows for
adding new elements to the end of structures but not changing or
removing existing elements.

NOTE: Any change to the association of CPG group names and queues will
break compatibility. How to work around this?

** TODO [#C] Refactoring of common concerns.

There are a bunch of things that act as "Queue observers" with intercept
points in similar places.
- QueuePolicy
- QueuedEvents (async replication)
- MessageStore
- Cluster

Look for ways to capitalize on the similarity & simplify the code.

In particular QueuedEvents (async replication) strongly resembles
cluster replication, but over TCP rather than multicast.

** TODO [#C] Support for AMQP 1.0.

* Testing
** TODO [#A] Pass all existing cluster tests.
Requires [[Defer and async completion of wiring commands.]]
** TODO [#A] New cluster tests.
Stress tests & performance benchmarks focused on changes in new cluster:
- concurrency by queues rather than connections.
- different handling of shared queues when consumers are on different brokers.