summaryrefslogtreecommitdiff
path: root/src/mem3/README_reshard.md
blob: 2373714851fa469165dba6a192772b2902f78a90 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
Developer Oriented Resharding Description
=========================================

This is a technical description of the resharding logic. The discussion will focus on: job creation and life-cycle, data definitions, and on the state transition mechanisms.


Job Life-Cycle
--------------

Job creation happens in the `mem3_reshard_httpd` API handler module. That module uses `mem3_reshard_http_util` to do some validation, and eventually calls `mem3_reshard:start_split_job/2` on one or more nodes in the cluster depending where the new jobs should run.

`mem3_reshard:start_split_job/2` is the main Erlang API entry point.  After some more validation it creates a `#job{}` record and calls the `mem3_reshard` job manager. The manager then will add the job to its jobs ets table, save it to a `_local` document in the shards db, and most importantly, start a new resharding process. That process will be supervised by the `mem3_reshard_job_sup` supervisor.

Each job will be running in a gen_server implemented in `mem3_reshard_job` module. When splitting a shard, a job will go through a series of steps such as `initial_copy`, `build_indices`, `update_shard_map`, etc. Between each step it will report progress and checkpoint with `mem3_reshard` manager. A checkpoint involved the `mem3_reshard` manager persisting that job's state to disk in `_local` document in `_dbs` db. Then job continues until `completed` state or until it failed in the `failed` state.

If a user stops shard splitting on the whole cluster, then all running jobs will stop. When shard splitting is resumed, they will try to recover from their last checkpoint.

A job can also be individually stopped or resumed. If a job is individually stopped it will stay so even if the global shard splitting state is `running`. A user has to explicitly set that job to a `running` state for it to resume. If a node with running jobs is turned off, when it is rebooted running jobs will resume from their last checkpoint.


Data Definitions
----------------

This section focuses on record definition and how data is transformed to and from various formats.

Right below the `mem3_reshard:start_split_job/1` API level a job is converted to a `#job{}` record defined in the `mem3_reshard.hrl` header file. That record is then used throughout most of the resharding code. The job manager `mem3_reshard` stores it in its jobs ets table, then when a job process is spawn it single argument also just a `#job{}` record. As a job process is executing it will periodically report state back to the `mem3_reshard` manager as an updated `#job{}` record.

Some interesting fields from the `#job{}` record:

 - `id` Uniquely identifies a job in a cluster. It is derived from the source shard name, node and a version (currently = 1).
 - `type` Currently the only type supported is `split` but `merge` or `rebalance` might be added in the future.
 - `job_state` The running state of the job. Indicates if the job is `running`, `stopped`, `completed` or `failed`.
 - `split_state` Once the job is running this indicates how far along it got in the splitting process.
 - `source` Source shard file. If/when merge is implemented this will be a list.
 - `target` List of target shard files. This is expected to be a list of 2 items currently.
 - `history` A time-line of state transitions represented as a list of tuples.
 - `pid` When job is running this will be set to the pid of the process.


In the `mem3_reshard_job_store` module the `#job{}` record is translated to an json document so it can be persisted to disk. Translation functions to and from a json in that module are also used by the HTTP API layer to return a job's state and other information to the user.

Another important piece of data is the global resharding state. When a user disables resharding on a cluster, a call is made to `mem3_reshard` manager on each node and they store that in a `#state{}` record. This record is defined in the `mem3_reshard.hrl` module, and just like the `#job{}` record can be translated to/from ejson in the `mem3_reshard_store` and stored and loaded from disk.


State Transitions
-----------------

Resharding logic has 3 separate states to keep track of:

1. Per-node resharding state. This state can be toggled between `running` and `stopped`. That toggle happens via the `mem3_reshard:start/0` and `mem3_reshard:stop/1` function.  This state is kept in the `#state{}` record of the `mem3_reshard` manager gen_server. This state is also persisted to the local shard map database as a `_local` document so that it is maintained through a node restart. When the state is `running` then all jobs that are not individually `stopped`, and have not failed or completed, will be `running`. When the state is `stopped` all the running jobs will be `stopped`.

2. Job's running state held in the `#job{}` `job_state` field. This is the general running state of a resharding job. It can be `new`, `running`, `stopped`, `completed` or `failed`. This state is most relevant for the `mem3_reshard` manager. In other words, it is the `mem3_reshard` gen_server that starts the job, monitors it to see if it exits successfully on completion or with an error.

3. Job's internal splitting state. This state tracks the steps taken during shard splitting by each job. This state is mostly relevant for the `mem3_reshard_job` module. This state is kept in `#job{}`'s `split_state` field. The progression of these states is linear going from one state to the next. That's reflected in the code as well, when one state is done, `mem3_reshard_job:get_next_state/1` is called which returns the next state in the list. The list is defined in the `SPLIT_STATES` macro. This simplistic transition is also one of the reasons why a gen_fsm wasn't considered for `mem3_reshard_job` logic.

Another interesting aspect is how the `split_state` transitions happen in the `mem3_reshard_job` module. What follows is an examination of that.

A job starts running in the `new` state or from a previously checkpointed state. In the later case, the job goes through some recovery logic (see `?STATE_RESTART` macro in `mem3_reshard_job`) where it tries to resume its work from where it left of. It means, for example, if it was in the `initial_copy` state and was interrupted it might have to reset the target files and copy everything again. After recovery, the state execution logic is driven by `run(#job{})` which ends up calling `?MODULE:State(#job{})` state specific functions for each state.

In `mem3_reshard_job:switch_to_next_state/2` job's history is updated, any current `state_info` is cleared, job state is switched in the `#job{}` record. Then, the new state is checkpointed in the `checkpoint/1` function. Checkpoint will cast a message to the `mem3_reshard` manager. After that message is sent the job process sits and waits.

In the meantime `mem3_reshard` manager checkpoints the state, which means it updates both its ETS table with the new `#job{}` record, persists the state with the `mem3_reshard_store` module, then, finally, it notifies the job process that checkpointing is done by calling `mem3_reshard_job:checkpoint_done/1`.

`mem3_reshard_job:checkpoint_done/1` function call sends a `checkpoint_done` message to the job's process, at which point it starts executing that state.

Most states in `mem3_reshard_job` try not to block the main job process and instead launch worker processes to perform long running operations. It is usually just one worker process but it could be multiple as well. After that it waits for the workers to finish and inspects their exit signal (see `wait_for_workers/1` function). When all the workers exit for a particular `split_state`, the job is switched to the next state with `switch_to_next_state/1` and the whole thing repeats until the `completed` state is reached when the whole job exits normally.

If the source is updated at high rate and the cluster is under load, there is a possibility for the resharding jobs to take longer to finish. The cluster would have to be running at the limit where both compaction and internal replication will have difficulty catching up as fundamentally the logic used for the initial bulk copy is similar the compaction code, and topoff states are just reusing the internal replicator code. Eventually when the load subsides the jobs should catch up and finish.

Individual Modules Description
------------------------------

These are mostly random notes about various modules involved in resharding. Most, but not all, are in the `mem3` application.

* `mem3_reshard`: Main API entry point and the job manager.

* `mem3_reshard_job` : Individual job logic.

* `mem3_reshard_dbdoc` : Responsible for updating shard doc in the `_db`'s database. Besides just having a bunch of utility function there is a gen_server spawned which is used to update shard documents in a cluster in such a way as to minimize the risk of conflicts. That is accomplished by having each shard updater calling only one such updater for the whole cluster. This coordinator is picked by sorting the list of all the live mem3 nodes and picking the first one in the list.

* `mem3_reshard_httpd` : API endpoint definitions.

* `mem3_reshard_api` : Cluster API endpoint. This module is responsible for sending requests to all the nodes in a cluster and gathering results.

* `mem3_reshard_index` : This is a helper module used by workers in the `build_indices` state.

* `mem3_reshard_job_sup` : Simple one for one supervisor which keeps track of running jobs.

* `mem3_reshard_store` : State persistence module. It knows how to save/restore `#job{}` and `#state{}` records to/from `_local` docs. It is also re-used for serializing `#job{}` into ejson by the HTTP API module.

* `mem3_reshard_validate` : Validate that source exists, target ranges don't have gaps in them, etc.

* `couch_db_split` : This module is not in `mem3` app but it does all the heavy lifting during the initial data copy. Given a source db and some targets, and a function to decide which doc go to which target, it will copy all data from the source to the targets. It's best to think of this module as a form of compactor. Unlike `couch_bt_engine_compactor` this one lives above the `couch_db_engine` API, and instead of copying data to one new file it copies it to 2 or more. Unsurprisingly because of that it uses some lower level `couch_db_engine` API directly, including linking to a couch_db_updater, force setting db update sequences and others.