Developer Oriented Replicator Description
=========================================

This description of the scheduling replicator's functionality is mainly geared
to CouchDB developers. It dives a bit into the internals and explains how
everything is connected together.

A natural place to start is the top application supervisor:
`couch_replicator_sup`. It uses a `rest_for_one` restart strategy, so if a
child process terminates, the rest of the children started after it are also
terminated. This structure implies a useful constraint -- children lower in
the list can safely call their siblings higher in the list.
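
For orientation, here is a minimal sketch of what a `rest_for_one` supervisor
of this shape looks like in plain OTP terms. The child specs are simplified
stand-ins rather than the real ones from `couch_replicator_sup`, so treat this
as an illustration of the restart strategy, not the actual code.

```
-module(replicator_sup_sketch).
-behaviour(supervisor).
-export([start_link/0, init/1]).

start_link() ->
    supervisor:start_link({local, ?MODULE}, ?MODULE, []).

init([]) ->
    %% rest_for_one: when a child dies, all children started after it are
    %% restarted too, so later children can safely call earlier ones.
    SupFlags = #{strategy => rest_for_one, intensity => 10, period => 3600},
    Children = [
        #{id => couch_replication_event,
          start => {gen_event, start_link, [{local, couch_replication_event}]}},
        #{id => couch_replicator_clustering,
          start => {couch_replicator_clustering, start_link, []}}
        %% ... the remaining children, in the order described below ...
    ],
    {ok, {SupFlags, Children}}.
```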

A description of each child:

 * `couch_replication_event`: Starts a `gen_event` publication bus to handle
   some replication related events. It is used, for example, to publish
   cluster membership changes by the `couch_replicator_clustering` process,
   and also in replication tests to monitor for replication events.
   Notification is performed via the `couch_replicator_notifier:notify/1`
   function. It's the first (left-most) child because
   `couch_replicator_clustering` uses it.

 * `couch_replicator_clustering`: This module maintains cluster membership
   information for the replication application and provides functions to check
   ownership of replication jobs. A cluster membership change is published via
   the `gen_event` event server named `couch_replication_event`, as previously
   covered. The published events are `{cluster, stable}` when cluster
   membership has stabilized, that is, there have been no node membership
   changes in a given period, and `{cluster, unstable}`, which indicates there
   was a recent change to the cluster membership and it is now considered
   unstable. Listeners for cluster membership changes include
   `couch_replicator_doc_processor` and `couch_replicator_db_changes`. When
   the doc processor gets a `{cluster, stable}` event it will remove all the
   replication jobs not belonging to the current node. When
   `couch_replicator_db_changes` gets a `{cluster, stable}` event, it will
   restart the `couch_multidb_changes` process it controls, which will launch
   a new scan of all the replicator databases.
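
   As a rough illustration of how a listener consumes these events, the
   hypothetical handler below attaches to the `couch_replication_event` bus
   with the standard `gen_event` API and pattern matches on the two cluster
   events. The module name and its (empty) reactions are made up; the real
   listeners are the processes named above.

   ```
   -module(cluster_event_listener_sketch).
   -behaviour(gen_event).
   -export([add/0, init/1, handle_event/2, handle_call/2]).

   %% Attach this handler to the replicator's event bus.
   add() ->
       gen_event:add_handler(couch_replication_event, ?MODULE, []).

   init([]) ->
       {ok, no_state}.

   handle_event({cluster, stable}, State) ->
       %% Membership settled: e.g. drop jobs not owned by this node.
       {ok, State};
   handle_event({cluster, unstable}, State) ->
       %% Membership changed recently: hold off until it stabilizes.
       {ok, State};
   handle_event(_Other, State) ->
       {ok, State}.

   handle_call(_Request, State) ->
       {ok, ignored, State}.
   ```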

 * `couch_replicator_connection`: Maintains a global replication connection
   pool. It allows reusing connections across replication tasks. The main
   interface is `acquire/1` and `release/1`. The general idea is that once a
   connection is established, it is kept around for
   `replicator.connection_close_interval` milliseconds in case another
   replication task wants to re-use it. It is worth pointing out how linking
   and monitoring are handled: workers are linked to the connection pool when
   they are created. If they crash, the connection pool will receive an 'EXIT'
   message and clean up after the worker. The connection pool also monitors
   owners (by monitoring the `Pid` from the `From` argument in the call to
   `acquire/1`) and cleans up if the owner dies and the pool receives a 'DOWN'
   message. Another interesting detail is that connection establishment
   (creation) happens in the owner process, so the pool is not blocked on it.
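
   A hedged sketch of how a worker might use the pool is shown below. The
   `acquire/1` and `release/1` names come from the module itself, but the
   exact argument and return shapes used here (a URL in, `{ok, Worker}` out)
   are assumptions made for illustration.

   ```
   -module(pooled_fetch_sketch).
   -export([fetch_with_pooled_connection/1]).

   %% Illustrative only: assumes acquire/1 takes the endpoint URL and returns
   %% {ok, Worker}, and that release/1 hands the worker back to the pool.
   fetch_with_pooled_connection(Url) ->
       {ok, Worker} = couch_replicator_connection:acquire(Url),
       try
           %% ... issue the HTTP request through Worker here ...
           ok
       after
           %% Return the connection so another replication task can reuse it
           %% within replicator.connection_close_interval milliseconds.
           couch_replicator_connection:release(Worker)
       end.
   ```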

 * `couch_replicator_rate_limiter`: Implements a rate limiter to handle
   connection throttling from sources or targets where requests return 429
   error codes. It uses the Additive Increase / Multiplicative Decrease (AIMD)
   feedback control algorithm to converge on the channel capacity, and is
   implemented using a 16-way sharded ETS table to maintain connection state.
   The table sharding code is split out to the
   `couch_replicator_rate_limiter_tables` module. The purpose of the module is
   to maintain and continually estimate sleep intervals for each connection,
   represented as a `{Method, Url}` pair. The interval is updated on each call
   to `failure/1` or `success/1`. For a successful request, a client should
   call `success/1`; whenever a 429 response is received, the client should
   call `failure/1`. When no failures are happening the code ensures the ETS
   tables are empty in order to have a lower impact on a running system.
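
   The additive-increase / multiplicative-decrease rule itself fits in a few
   lines. The sketch below is not the module's code; it only shows the
   feedback rule applied to a sleep interval, with all constants picked
   arbitrarily for illustration.

   ```
   -module(aimd_sketch).
   -export([update/2]).

   %% AIMD over a *sleep interval* (in ms): because the state is a wait time
   %% rather than a rate, a success shrinks it additively while a 429 failure
   %% grows it multiplicatively. Constants are illustrative.
   -define(ADDITIVE_STEP, 25).
   -define(MULTIPLICATIVE_FACTOR, 2).
   -define(MIN_INTERVAL, 0).
   -define(MAX_INTERVAL, 25000).

   update(success, Interval) ->
       max(?MIN_INTERVAL, Interval - ?ADDITIVE_STEP);
   update(failure, Interval) ->
       min(?MAX_INTERVAL, max(1, Interval) * ?MULTIPLICATIVE_FACTOR).
   ```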

 * `couch_replicator_scheduler`: This is the core component of the scheduling
   replicator. Its main task is to switch between replication jobs, by
   stopping some and starting others to ensure all of them make progress.
   Replication jobs which fail are penalized using an exponential backoff.
   That is, each consecutive failure doubles the time penalty. This frees
   up system resources for more useful work than just continuously trying to
   run the same subset of failing jobs.

   The main API function is `add_job/1`. Its argument is an instance of the
   `#rep{}` record, which could be the result of a document update from a
   `_replicator` db or the result of a POST to the `_replicate` endpoint.

   Each job is internally represented by the `#job{}` record. It contains the
   original `#rep{}` but also maintains an event history. The history is a
   sequence of past events for each job. These are timestamped and ordered
   such that the most recent event is at the head. History length is limited
   based on the `replicator.max_history` configuration value. The default is
   20 entries. History event types are:

    * `added` : job was just added to the scheduler. This is the first event.
    * `started` : job was started. This was an attempt to run the job.
    * `stopped` : job was stopped by the scheduler.
    * `crashed` : job has crashed (instead of stopping cleanly).
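
   As a concrete, made-up example, a job's history might look like the list
   below, newest event first. The `{EventType, Timestamp}` tuple shape is a
   simplification for illustration, not the literal record layout.

   ```
   %% Newest first, capped at replicator.max_history entries (default 20).
   History = [
       {crashed, 1500},   % most recent event
       {started, 1490},
       {stopped, 1400},
       {started, 1300},
       {added,   1200}    % first event, recorded when the job was added
   ].
   ```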

   The core of the scheduling algorithm is the `reschedule/1` function. This
   function is called every `replicator.interval` milliseconds (default is
   60000, i.e. a minute). During each call the scheduler will try to stop some
   jobs, start some new ones, and will also try to keep the number of running
   jobs below `replicator.max_jobs` (default 500). So the function does these
   operations (actual code paste):

   ```
   Running = running_job_count(),
   Pending = pending_job_count(),
   stop_excess_jobs(State, Running),
   start_pending_jobs(State, Running, Pending),
   rotate_jobs(State, Running, Pending),
   update_running_jobs_stats(State#state.stats_pid)
   ```

   `Running` is the total number of currently running jobs. `Pending` is the
   total number of jobs waiting to be run. `stop_excess_jobs` will stop any
   jobs exceeding the `replicator.max_jobs` configured limit. This code takes
   effect if the user reduces the `max_jobs` configuration value.
   `start_pending_jobs` will start jobs if there is more room available. This
   takes effect on startup or when the user increases the `max_jobs`
   configuration value. `rotate_jobs` is where all the action happens. The
   scheduler picks `replicator.max_churn` running jobs to stop and then picks
   the same number of pending jobs to start. The default value of `max_churn`
   is 20. So by default, every minute, 20 running jobs are stopped and 20 new
   pending jobs are started.
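
   Expressed as a small, self-contained sketch, one churn cycle looks roughly
   like the function below. It works on plain `{JobId, LastStarted}` lists
   instead of the scheduler's ETS table, and it ignores `max_jobs`, so it is
   an illustration of the rotation idea rather than the real `rotate_jobs/3`.

   ```
   -module(rotate_sketch).
   -export([rotate/3]).

   %% Pick which running jobs to stop (longest running first) and which
   %% pending jobs to start (longest waiting first), up to MaxChurn of each.
   rotate(Running, Pending, MaxChurn) ->
       Churn = lists:min([MaxChurn, length(Running), length(Pending)]),
       %% Longest running = smallest LastStarted timestamp.
       ToStop = lists:sublist(lists:keysort(2, Running), Churn),
       %% Longest waiting = also the smallest LastStarted timestamp.
       ToStart = lists:sublist(lists:keysort(2, Pending), Churn),
       {ToStop, ToStart}.
   ```

   For example, `rotate_sketch:rotate([{a, 10}, {b, 3}], [{c, 1}, {d, 7}], 1)`
   returns `{[{b, 3}], [{c, 1}]}`: job `b` has been running the longest and
   job `c` has been waiting the longest.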

   Before moving on it is worth pointing out that the scheduler treats
   continuous and non-continuous replications differently. Normal
   (non-continuous) replications, once started, are allowed to run to
   completion. That behavior preserves their semantics of replicating a
   snapshot of the source database to the target. For example, if new
   documents are added to the source after the replication is started, those
   updates should not show up on the target database. Stopping and restarting
   a normal replication would violate that constraint. The only exception to
   the rule is when the user explicitly reduces the `replicator.max_jobs`
   configuration value. Even then, the scheduler will first attempt to stop as
   many continuous jobs as possible and only if it has no choice left will it
   stop normal jobs.

   Keeping that in mind and going back to the scheduling algorithm, the next
   interesting part is how the scheduler picks which jobs to stop and which
   ones to start:

    * Stopping: When picking jobs to stop, the scheduler will pick the longest
      running continuous jobs first. The sorting callback function to get the
      longest running jobs is unsurprisingly called `longest_running/2`. To
      pick the longest running jobs it looks at the most recent `started`
      event. After it gets a list sorted by longest running, it simply picks
      the first few depending on the value of `max_churn` using
      `lists:sublist/2`. Then those jobs are stopped.

    * Starting: When starting, the scheduler will pick the jobs which have
      been waiting the longest. Perhaps surprisingly, in this case it also
      looks at the `started` timestamp and picks the jobs which have the
      oldest `started` timestamp. If there are 3 jobs, A[started=10],
      B[started=7], C[started=9], then B will be picked first, then C, then A.
      This ensures that jobs are not starved, which is a classic scheduling
      pitfall.

   In the code, the list of pending jobs is picked slightly differently than
   the list of running jobs. `pending_jobs/1` uses `ets:foldl` to iterate over
   all the pending jobs. As it iterates, it tries to keep only up to
   `max_churn` of the oldest items in the accumulator. The reason is that
   there could be a very large number of pending jobs, and loading them all
   into a list (making a copy from ETS) and then sorting it can be quite
   expensive performance-wise. The tricky part of the iteration happens in
   `pending_maybe_replace/2`. A `gb_sets` ordered set is used to keep the
   top-N longest waiting jobs seen so far. The code has a comment with a
   helpful example of how this algorithm works.
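
   A standalone version of that bounded selection might look like the sketch
   below: fold over the pending jobs, keep at most `MaxChurn` of them in a
   `gb_sets` set ordered by the `started` timestamp, and evict the newest kept
   job whenever a longer-waiting one shows up. It mirrors the idea in
   `pending_maybe_replace/2` without being a copy of it.

   ```
   -module(pending_topn_sketch).
   -export([oldest_pending/2]).

   %% Jobs are {Started, JobId} pairs so gb_sets orders them by timestamp.
   %% Returns up to MaxChurn jobs with the smallest (oldest) Started values.
   oldest_pending(Jobs, MaxChurn) when MaxChurn > 0 ->
       Selected = lists:foldl(fun(Job, Set) ->
           case gb_sets:size(Set) < MaxChurn of
               true ->
                   gb_sets:add(Job, Set);
               false ->
                   %% Set is full: keep this job only if it has been waiting
                   %% longer than the newest job currently kept.
                   {Newest, Rest} = gb_sets:take_largest(Set),
                   case Job < Newest of
                       true  -> gb_sets:add(Job, Rest);
                       false -> gb_sets:add(Newest, Rest)
                   end
           end
       end, gb_sets:new(), Jobs),
       gb_sets:to_list(Selected).
   ```

   For example, `pending_topn_sketch:oldest_pending([{9, a}, {1, b}, {5, c}], 2)`
   returns `[{1, b}, {5, c}]` without ever holding more than two jobs in the
   accumulator.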

   The last part is how the scheduler treats jobs which keep crashing. If a
   job is started but then crashes, that job is considered unhealthy. The
   main idea is to penalize such jobs so that they are forced to wait an
   exponentially larger amount of time with each consecutive crash. A central
   part of this algorithm is determining what forms a sequence of consecutive
   crashes. If a job starts then quickly crashes, and after its next start it
   crashes again, that becomes a sequence of 2 consecutive crashes. The
   penalty is then calculated by the `backoff_micros/1` function, where the
   consecutive crash count ends up as the exponent. For practical reasons
   there is also a maximum penalty, equivalent to 10 consecutive crashes.
   Time-wise it ends up being about 8 hours. That means even a job which keeps
   crashing will still get a chance to retry once in about 8 hours.

   There is a subtlety when calculating consecutive crashes, and that is
   deciding when the sequence stops, that is, figuring out when a job becomes
   healthy again. The scheduler considers a job healthy again if it started
   and hasn't crashed in a while. The "in a while" part is a configuration
   parameter, `replicator.health_threshold`, defaulting to 2 minutes. This
   means that if a job has been crashing, for example 5 times in a row, but
   then on the 6th attempt it started and ran for more than 2 minutes, it is
   considered healthy again. The next time it crashes, its sequence of
   consecutive crashes will restart at 1.
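
   To make the penalty concrete, the sketch below computes a backoff from the
   consecutive-crash count in the way just described: double per crash, capped
   at an exponent of 10. The 30-second base interval is an assumption chosen
   so that the cap works out to roughly 8 hours; the real constants live in
   `couch_replicator_scheduler`.

   ```
   -module(backoff_sketch).
   -export([backoff_seconds/1]).

   -define(BASE_INTERVAL_SEC, 30).   % assumed base penalty
   -define(MAX_EXPONENT, 10).        % 2^10 * 30 s is roughly 8.5 hours

   %% A job that later runs longer than replicator.health_threshold is
   %% considered healthy again and its crash count restarts from scratch.
   backoff_seconds(ConsecutiveCrashes) when ConsecutiveCrashes >= 0 ->
       Exp = min(ConsecutiveCrashes, ?MAX_EXPONENT),
       ?BASE_INTERVAL_SEC * (1 bsl Exp).
   ```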

 * `couch_replicator_scheduler_sup`: This module is a supervisor for running
   replication tasks. The most interesting thing about it is perhaps that it
   is not used to restart children; the scheduler handles restarts and
   error-handling backoffs itself.

 * `couch_replicator_doc_processor`: The doc processor component is in charge
   of processing replication document updates, turning them into replication
   jobs and adding those jobs to the scheduler. Unfortunately, the only reason
   there is even a `couch_replicator_doc_processor` gen_server, instead of
   replication documents being turned into jobs and inserted into the
   scheduler directly, is one corner case -- filtered replications using
   custom (mostly JavaScript) filters. More about this later. It is better to
   start with how updates flow through the doc processor:

   Document updates come via the `db_change/3` callback from
   `couch_multidb_changes`, then go to the `process_change/2` function.

   In `process_change/2` a few decisions are made regarding how to proceed.
   The first is the "ownership" check, that is, a check whether the
   replication document belongs on the current node. If not, it is ignored. In
   a cluster there would in general be N copies of a document change and we
   only want to run the replication once. Another check is whether the update
   has arrived during a time when the cluster is considered "unstable". If so,
   it is ignored, because soon enough a rescan will be launched and all the
   documents will be reprocessed anyway. Another noteworthy thing in
   `process_change/2` is the handling of upgrades from the previous version of
   the replicator, where transient states were written to the documents. Two
   such states were `triggered` and `error`. Both of those states are removed
   from the document, then the update proceeds in the regular fashion.
   `failed` documents are also ignored here. `failed` is a terminal state
   which indicates the document was somehow unsuitable to become a replication
   job (it was malformed or a duplicate). Otherwise the state update proceeds
   to `process_updated/2`.
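
   The decision ladder described above can be summarized with a sketch like
   the one below. The argument names, return atoms and overall shape are
   hypothetical; only the order of the checks follows the description.

   ```
   -module(process_change_sketch).
   -export([classify/3]).

   %% Purely illustrative: decide what to do with a document update given
   %% whether this node owns it, whether the cluster is stable, and the
   %% state previously written into the document.
   classify(Owner, ClusterStable, DocState) ->
       case {Owner, ClusterStable, DocState} of
           {false, _, _} ->
               ignore;        % another node owns this document
           {_, false, _} ->
               ignore;        % cluster unstable: the rescan will pick it up
           {_, _, failed} ->
               ignore;        % terminal state, never becomes a job again
           {_, _, State} when State =:= triggered; State =:= error ->
               clear_state_then_process;  % leftovers from the old replicator
           _ ->
               process        % hand off to process_updated/2
       end.
   ```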

   `process_updated/2` is where replication document updates are parsed and
   translated to `#rep{}` records. The interesting part here is that the
   replication ID isn't calculated yet. Unsurprisingly, the parsing function
   used is called `parse_rep_doc_without_id/1`. Also note that up until now
   everything is still running in the context of the `db_change/3` callback.
   After the replication filter type is determined, the update gets passed to
   the `couch_replicator_doc_processor` gen_server.

   The `couch_replicator_doc_processor` gen_server's main role is to try to
   calculate the replication ID for each `#rep{}` record passed to it, then
   add it as a scheduler job. As noted before, `#rep{}` records parsed up
   until this point lack a replication ID. The reason is that replication ID
   calculation includes a hash of the filter code, and because user-defined
   replication filters live in the source DB, fetching them most likely
   involves a remote network request. That means there is a possibility of
   blocking and a need to handle various network failures and retries. Because
   of that, the doc processor dispatches all of that blocking and retrying to
   a separate `worker` process (the `couch_replicator_doc_processor_worker`
   module).

   `couch_replicator_doc_processor_worker` is where replication IDs are
   calculated for each individual doc update. There are two separate modules
   which contain utilities related to replication ID calculation:
   `couch_replicator_ids` and `couch_replicator_filters`. The first one
   contains the ID calculation algorithms and the second one knows how to
   parse and fetch user filters from a remote source DB. One interesting thing
   about the worker is that it is time-bounded and is guaranteed not to get
   stuck forever. That's why it spawns an extra process with `spawn_monitor`,
   just so it can use an `after` clause in a `receive` and bound the maximum
   time the worker will take.
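
   The time-bounding trick is plain Erlang and worth spelling out. The generic
   sketch below spawns a monitored helper for the slow work and gives up after
   a timeout; the real worker wraps the remote filter fetch and replication ID
   calculation in the same pattern, with its own timeout and return values.

   ```
   -module(bounded_work_sketch).
   -export([run/2]).

   %% Run Fun in a monitored helper process; return {ok, Result} if it
   %% finishes within TimeoutMs, otherwise kill it and report a timeout.
   run(Fun, TimeoutMs) ->
       {Pid, Ref} = spawn_monitor(fun() -> exit({shutdown, Fun()}) end),
       receive
           {'DOWN', Ref, process, Pid, {shutdown, Result}} ->
               {ok, Result};
           {'DOWN', Ref, process, Pid, Reason} ->
               {error, Reason}
       after TimeoutMs ->
           exit(Pid, kill),
           %% Flush the late 'DOWN' so it does not linger in the mailbox.
           receive {'DOWN', Ref, process, Pid, _} -> ok end,
           {error, timeout}
       end.
   ```

   For example, `bounded_work_sketch:run(fun() -> done end, 1000)` returns
   `{ok, done}`, while a fun that sleeps past the timeout yields
   `{error, timeout}`.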

   A doc processor worker will either succeed or fail, but never block for too
   long. Success and failure are returned as exit values. Those are handled in
   the `worker_returned/3` doc processor clauses. The most common pattern is
   that a worker is spawned to add a replication job; it does so and returns
   an `{ok, ReplicationID}` value in `worker_returned`.

   In the case of a filtered replication with custom user code there are two
   cases to consider:

   1. Filter fetching code has failed. In that case the worker returns an
      error. But because the error could be a transient network error, another
      worker is started to try again. It could fail and return an error again,
      and then another one is started, and so on. However, each consecutive
      worker will do an exponential backoff, not unlike the scheduler code.
      `error_backoff/1` is where the backoff period is calculated. Consecutive
      errors are held in the `errcnt` field in the ETS table.

   2. Fetching filter code succeeds, the replication ID is calculated and the
      job is added to the scheduler. However, because this is a filtered
      replication, the source database could get an updated filter, which
      means the replication ID could change. So a worker is spawned to
      periodically check the filter and see if it changed. In other words, the
      doc processor will do the work of checking filtered replications,
      getting an updated filter and then refreshing the replication job
      (removing the old one and adding a new one with a different ID). The
      filter checking interval is determined by the `filter_backoff` function.
      An unusual thing about that function is that it calculates the period
      based on the size of the ETS table. The idea is that with only a few
      replications in a cluster, it's fine to check for filter changes often;
      but when there are lots of replications running, having each one check
      its filter often is not a good idea (a sketch of this scaling idea
      follows after this list).
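
   A hedged sketch of that scaling idea: grow the recheck interval with the
   number of tracked replications, clamp it, and add a little jitter so the
   checks do not all line up. The constants and the exact formula are made up;
   only the "bigger table, longer interval" behaviour matches the description.

   ```
   -module(filter_interval_sketch).
   -export([recheck_after_ms/1]).

   -define(MIN_MS, 30000).      % floor for a nearly idle system
   -define(MAX_MS, 3600000).    % never wait longer than an hour
   -define(PER_JOB_MS, 1000).   % extra delay per tracked replication

   %% TableSize would come from ets:info(Tab, size) in the real module.
   recheck_after_ms(TableSize) when TableSize >= 0 ->
       Base = min(?MAX_MS, ?MIN_MS + TableSize * ?PER_JOB_MS),
       Jitter = rand:uniform(Base div 10 + 1) - 1,
       Base + Jitter.
   ```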

 * `couch_replicator_db_changes`: This process specializes and configures
   `couch_multidb_changes` so that it looks for `_replicator` suffixed shards
   and makes sure to restart it when node membership changes.