summaryrefslogtreecommitdiff
path: root/src/couch_replicator/README.md
blob: 9822f27bdaede7617d02a1e125b1139a10a7c400 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
Developer Oriented Replicator Description
=========================================

This description of scheduling replicator's functionality is mainly geared to
CouchDB developers. It dives a bit into the internal and explains how
everything is connected together.

A natural place to start is the top application supervisor:
`couch_replicator_sup`. It's a `rest_for_one` restart strategy supervisor,
so if a child process terminates, the rest of the children in the hierarchy
following it are also terminated. This structure implies a useful constraint --
children lower in the list can safely call their siblings which are higher in
the list.

A description of each child:

 * `couch_replication_event`: Starts a gen_event publication bus to handle some
    replication related events. This used for example, to publish cluster
    membership changes by the `couch_replicator_clustering` process. But is
    also used in replication tests to monitor for replication events.
    Notification is performed via the `couch_replicator_notifier:notify/1`
    function. It's the first (left-most) child because
    `couch_replicator_clustering` uses it.

 * `couch_replicator_clustering`: This module maintains cluster membership
    information for the replication application and provides functions to check
    ownership of replication jobs. A cluster membership change is published via
    the `gen_event` event server named `couch_replication_event` as previously
    covered. Published events are `{cluster, stable}` when cluster membership
    has stabilized, that it, no node membership changes in a given period, and
    `{cluster, unstable}` which indicates there was a recent change to the
    cluster membership and now it's considered unstable. Listeners for cluster
    membership change include `couch_replicator_doc_processor` and
    `couch_replicator_db_changes`. When doc processor gets an `{cluster,
    stable}` event it will remove all the replication jobs not belonging to the
    current node. When `couch_replicator_db_changes` gets a `{cluster,
    stable}` event, it will restart the `couch_multidb_changes` process it
    controls, which will launch an new scan of all the replicator databases.

  * `couch_replicator_connection`: Maintains a global replication connection
    pool. It allows reusing connections across replication tasks. The main
    interface is `acquire/1` and `release/1`. The general idea is once a
    connection is established, it is kept around for
    `replicator.connection_close_interval` milliseconds in case another
    replication task wants to re-use it. It is worth pointing out how linking
    and monitoring is handled: workers are linked to the connection pool when
    they are created. If they crash, the connection pool will receive an 'EXIT'
    event and clean up after the worker. The connection pool also monitors
    owners (by monitoring the `Pid` from the `From` argument in the call to
    `acquire/1`) and cleans up if owner dies, and the pool receives a 'DOWN'
    message. Another interesting thing is that connection establishment
    (creation) happens in the owner process so the pool is not blocked on it.

 * `couch_replicator_rate_limiter`: Implements a rate limiter to handle
    connection throttling from sources or targets where requests return 429
    error codes. Uses the Additive Increase / Multiplicative Decrease feedback
    control algorithm to converge on the channel capacity. Implemented using a
    16-way sharded ETS table to maintain connection state. The table sharding
    code is split out to `couch_replicator_rate_limiter_tables` module. The
    purpose of the module it to maintain and continually estimate sleep
    intervals for each connection represented as a `{Method, Url}` pair. The
    interval is updated accordingly on each call to `failure/1` or `success/1`
    calls. For a successful request, a client should call `success/1`. Whenever
    a 429 response is received the client should call `failure/1`. When no
    failures are happening the code ensures the ETS tables are empty in
    order to have a lower impact on a running system.

 * `couch_replicator_scheduler`: This is the core component of the scheduling
    replicator. It's main task is to switch between replication jobs, by
    stopping some and starting others to ensure all of them make progress.
    Replication jobs which fail are penalized using an exponential backoff.
    That is, each consecutive failure will double the time penalty. This frees
    up system resources for more useful work than just continuously trying to
    run the same subset of failing jobs.

    The main API function is `add_job/1`. Its argument is an instance of the
    `#rep{}` record, which could be the result of a document update from a
    `_replicator` db or the result of a POST to `_replicate` endpoint.

    Each job internally is represented by the `#job{}` record. It contains the
    original `#rep{}` but also, maintains an event history. The history is a
    sequence of past events for each job. These are timestamped and ordered
    such that the most recent event is at the head. History length is limited
    based on the `replicator.max_history` configuration value. The default is
    20 entries. History events types are:

    * `added` : job was just added to the scheduler. This is the first event.
    * `started` : job was started. This was an attempt to run the job.
    * `stopped` : job was stopped by the scheduler.
    * `crashed` : job has crashed (instead of stopping cleanly).

    The core of the scheduling algorithm is the `reschedule/1` function. This
    function is called every `replicator.interval` milliseconds (default is
    60000 i.e. a minute). During each call the scheduler will try to stop some
    jobs, start some new ones and will also try to keep the maximum number of
    jobs running less than `replicator.max_jobs` (default 500). So the
    functions does these operations (actual code paste):

    ```
    Running = running_job_count(),
    Pending = pending_job_count(),
    stop_excess_jobs(State, Running),
    start_pending_jobs(State, Running, Pending),
    rotate_jobs(State, Running, Pending),
    update_running_jobs_stats(State#state.stats_pid)
    ```

    `Running` is the total number of currently running jobs. `Pending` is the
    total number of jobs waiting to be run. `stop_excess_jobs` will stop any
    exceeding the `replicator.max_jobs` configured limit. This code takes
    effect if user reduces the `max_jobs` configuration value.
    `start_pending_jobs` will start any jobs if there is more room available.
    This will take effect on startup or when user increases the `max_jobs`
    configuration value. `rotate_jobs` is where all the action happens. The
    scheduler picks `replicator.max_churn` running jobs to stop and then picks
    the same number of pending jobs to start. The default value of `max_churn`
    is 20. So by default every minute, 20 running jobs are stopped, and 20 new
    pending jobs are started.

    Before moving on it is worth pointing out that scheduler treats continuous
    and non-continuous replications differently. Normal (non-continuous)
    replications once started will be allowed to run to completion. That
    behavior is to preserve their semantics of replicating a snapshot of the
    source database to the target. For example if new documents are added to
    the source after the replication are started, those updates should not show
    up on the target database. Stopping and restarting a normal replication
    would violate that constraint. The only exception to the rule is the user
    explicitly reduces `replicator.max_jobs` configuration value. Even then
    scheduler will first attempt to stop as many continuous jobs as possible
    and only if it has no choice left will it stop normal jobs.

    Keeping that in mind and going back to the scheduling algorithm, the next
    interesting part is how the scheduler picks which jobs to stop and which
    ones to start:

    * Stopping: When picking jobs to stop the scheduler will pick longest
      running continuous jobs first. The sorting callback function to get the
      longest running jobs is unsurprisingly called `longest_running/2`. To
      pick the longest running jobs it looks at the most recent `started`
      event. After it gets a sorted list by longest running, it simply picks
      first few depending on the value of `max_churn` using `lists:sublist/2`.
      Then those jobs are stopped.

    * Starting: When starting the scheduler will pick the jobs which have been
      waiting the longest. Surprisingly, in this case it also looks at the
      `started` timestamp and picks the jobs which have the oldest `started`
      timestamp. If there are 3 jobs, A[started=10], B[started=7],
      C[started=9], then B will be picked first, then C then A. This ensures
      that jobs are not starved, which is a classic scheduling pitfall.

    In the code, the list of pending jobs is picked slightly differently than
    how the list of running jobs is picked. `pending_jobs/1` uses `ets:foldl`
    to iterate over all the pending jobs. As it iterates it tries to keep only
    up to `max_churn` oldest items in the accumulator. The reason this is done
    is that there could be a very large number of pending jobs and loading them
    all in a list (making a copy from ETS) and then sorting it can be quite
    expensive performance-wise. The tricky part of the iteration is happening
    in `pending_maybe_replace/2`. A `gb_sets` ordered set is used to keep top-N
    longest waiting jobs so far. The code has a comment with a helpful example
    on how this algorithm works.

    The last part is how the scheduler treats jobs which keep crashing. If a
    job is started but then crashes then that job is considered unhealthy. The
    main idea is to penalize such jobs such that they are forced to wait an
    exponentially larger amount of time with each consecutive crash. A central
    part to this algorithm is determining what forms a sequence of consecutive
    crashes. If a job starts then quickly crashes, and after its next start it
    crashes again, then that would become a sequence of 2 consecutive crashes.
    The penalty then would be calculated by `backoff_micros/1` function where
    the consecutive crash count would end up as the exponent. However for
    practical concerns there is also maximum penalty specified and that's the
    equivalent of 10 consecutive crashes. Timewise it ends up being about 8
    hours. That means even a job which keep crashing will still get a chance to
    retry once in 8 hours.

    There is subtlety when calculating consecutive crashes and that is deciding
    when the sequence stops. That is, figuring out when a job becomes healthy
    again. The scheduler considers a job healthy again if it started and hasn't
    crashed in a while. The "in a while" part is a configuration parameter
    `replicator.health_threshold` defaulting to 2 minutes. This means if job
    has been crashing, for example 5 times in a row, but then on the 6th
    attempt it started and ran for more than 2 minutes then it is considered
    healthy again. The next time it crashes its sequence of consecutive crashes
    will restart at 1.

 * `couch_replicator_scheduler_sup`: This module is a supervisor for running
   replication tasks. The most interesting thing about it is perhaps that it is
   not used to restart children. The scheduler handles restarts and error
   handling backoffs.

 * `couch_replicator_doc_processor`: The doc processor component is in charge
   of processing replication document updates, turning them into replication
   jobs and adding those jobs to the scheduler. Unfortunately the only reason
   there is even a `couch_replicator_doc_processor` gen_server, instead of
   replication documents being turned to jobs and inserted into the scheduler
   directly, is because of one corner case -- filtered replications using
   custom (JavaScript mostly) filters. More about this later. It is better to
   start with how updates flow through the doc processor:

   Document updates come via the `db_change/3` callback from
   `couch_multidb_changes`, then go to the `process_change/2` function.

   In `process_change/2` a few decisions are made regarding how to proceed. The
   first is "ownership" check. That is a check if the replication document
   belongs on the current node. If not, then it is ignored. In a cluster, in
   general there would be N copies of a document change and we only want to run
   the replication once. Another check is to see if the update has arrived
   during a time when the cluster is considered "unstable". If so, it is
   ignored, because soon enough a rescan will be launched and all the documents
   will be reprocessed anyway. Another noteworthy thing in `process_change/2`
   is handling of upgrades from the previous version of the replicator when
   transient states were written to the documents. Two such states were
   `triggered` and `error`. Both of those states are removed from the document
   then then update proceeds in the regular fashion. `failed` documents are
   also ignored here. `failed` is a terminal state which indicates the document
   was somehow unsuitable to become a replication job (it was malformed or a
   duplicate). Otherwise the state update proceeds to `process_updated/2`.

   `process_updated/2` is where replication document updates are parsed and
   translated to `#rep{}` records. The interesting part here is that the
   replication ID isn't calculated yet. Unsurprisingly the parsing function
   used is called `parse_rep_doc_without_id/1`. Also note that up until now
   everything is still running in the context of the `db_change/3` callback.
   After replication filter type is determined the update gets passed to the
   `couch_replicator_doc_processor` gen_server.

   The `couch_replicator_doc_processor` gen_server's main role is to try to
   calculate replication IDs for each `#rep{}` record passed to it, then add
   that as a scheduler job. As noted before, `#rep{}` records parsed up until
   this point lack a replication ID. The reason is replication ID calculation
   includes a hash of the filter code. And because user defined replication
   filters live in the source DB, which most likely involves a remote network
   fetch there is a possibility of blocking and a need to handle various
   network failures and retries. Because of that `replication_doc_processor`
   dispatches all of that blocking and retrying to a separate `worker` process
   (`couch_replicator_doc_processor_worker` module).

   `couch_replicator_doc_processor_worker` is where replication IDs are
   calculated for each individual doc update. There are two separate modules
   which contain utilities related to replication ID calculation:
   `couch_replicator_ids` and `couch_replicator_filters`. The first one
   contains ID calculation algorithms and the second one knows how to parse and
   fetch user filters from a remote source DB. One interesting thing about the
   worker is that it is time-bounded and is guaranteed to not be stuck forever.
   That's why it spawns an extra process with `spawn_monitor`, just so it can
   do an `after` clause in receive and bound the maximum time this worker will
   take.

   A doc processor worker will either succeed or fail but never block for too
   long. Success and failure are returned as exit values. Those are handled in
   the `worker_returned/3` doc processor clauses. The most common pattern is
   that a worker is spawned to add a replication job, it does so and returns a
   `{ok, ReplicationID}` value in `worker_returned`.

   In case of a filtered replication with custom user code there are two case to
   consider:

     1. Filter fetching code has failed. In that case worker returns an error.
        But because the error could be a transient network error, another
        worker is started to try again. It could fail and return an error
        again, then another one is started and so on. However each consecutive
        worker will do an exponential backoff, not unlike the scheduler code.
        `error_backoff/1` is where the backoff period is calculated.
        Consecutive errors are held in the `errcnt` field in the ETS table.

     2. Fetching filter code succeeds, replication ID is calculated and job is
        added to the scheduler. However, because this is a filtered replication
        the source database could get an updated filter. Which means
        replication ID could change again. So the worker is spawned to
        periodically check the filter and see if it changed. In other words doc
        processor will do the work of checking for filtered replications, get
        an updated filter and will then refresh the replication job (remove the
        old one and add a new one with a different ID). The filter checking
        interval is determined by the `filter_backoff` function. An unusual
        thing about that function is it calculates the period based on the size
        of the ETS table. The idea there is for a few replications in a
        cluster, it's ok to check filter changes often. But when there are lots
        of replications running, having each one checking their filter often is
        not a good idea.

 * `couch_replicator`: This is an unusual but useful pattern. This child is not
   an actual process but a one-time call to the
   `couch_replicator:ensure_rep_db_exists/0` function, executed by the
   supervisor in the correct order (and monitored for crashes). This ensures
   the local replicator db exists, then returns `ignore`. This pattern is
   useful for doing setup-like things at the top level and in the correct order
   regarding the rest of the children in the supervisor.

 * `couch_replicator_db_changes`: This process specializes and configures
   `couch_multidb_changes` so that it looks for `_replicator` suffixed shards
   and makes sure to restart it when node membership changes.