author    Nick Vatamaniuc <vatamane@apache.org>  2019-02-25 12:28:18 -0500
committer Nick Vatamaniuc <vatamane@apache.org>  2019-02-28 11:49:01 -0500
commit    8e8980982f2c7d5a4f82bcd5817ff74f15e8abfa (patch)
tree      827481724212400f0889d58a65c29e3a236e449d
parent    71f18988fa6bcaadd230f75683c5dd577bd3ff79 (diff)
download  couchdb-8e8980982f2c7d5a4f82bcd5817ff74f15e8abfa.tar.gz
Implement initial shard splitting data copy
The first step when a new shard splitting job starts is to do a bulk copy of data from the source to the targets. Ideally this should happen as fast as possible, as it could potentially churn through billions of documents. This logic is implemented in the `couch_db_split` module in the main `couch` application. To understand what happens in `couch_db_split`, it helps to think of it as a version of `couch_bt_engine_compactor` that lives just above the couch_db_engine (PSE) interface instead of below it.

The first thing the initial data copy does is create the targets. Targets are created based on the source's parameters: if the source uses a specific PSE engine, the targets will use the same PSE engine, and if the source is partitioned, the targets will use the same partitioned hash function as well. An interesting bit with respect to target creation is that targets are not regular couch_db databases but are closer to a couch_file with a couch_db_updater process linked to it. They are linked directly, without going through couch_server. This is done to avoid the complexity of handling concurrent updates, VDUs, and interactive vs. non-interactive updates, and to make sure the targets don't compact while the copy happens, don't update any LRUs, and don't emit `db_updated` events. Those things are not needed, and handling them would make this more fragile. Another way to think of the targets during the initial bulk data copy is as "hidden" or "write-only" dbs.

Another notable thing is that `couch_db_split` doesn't know anything about shards; it only knows about databases. The input is a source, a map of targets, and a caller-provided "picker" function which, for each given document ID, knows how to pick one of the targets. This works for both regular dbs and partitioned ones; all of that logic lives inside the pick function rather than being embedded in `couch_db_split`.

One last point is about handling the internal replicator's _local checkpoint docs. Those documents are transformed when they are copied such that the old source UUID is replaced with the new target's UUID, since each target shard will have its own new UUID. That is done to avoid replications rewinding.

Besides those points, the rest is rather boring: it's just "open documents from the source, pick a target, copy the documents to that target, read more documents from the source, etc."

Issue #1920

Co-authored-by: Paul J. Davis <davisp@apache.org>
Co-authored-by: Eric Avdey <eiri@eiri.ca>
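For illustration, a range-based picker could look roughly like the sketch below. This is a hypothetical helper, not part of this commit; it assumes the target map is keyed by [Begin, End] ranges and that the {M, F, A} hash function returned by mem3_hash:get_hash_fun/1 maps a document ID to an integer on the ring when applied via erlang:apply/3.

    % Hypothetical picker sketch (not part of this commit). Assumes target keys
    % are [Begin, End] ranges and that the hash triple maps a DocId to an
    % integer on the ring via erlang:apply(M, F, [DocId | A]).
    range_pickfun(DocId, Ranges, {M, F, A}) ->
        Hash = erlang:apply(M, F, [DocId | A]),
        [Range | _] = [[B, E] || [B, E] <- Ranges, B =< Hash, Hash =< E],
        Range.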
-rw-r--r--src/couch/src/couch_bt_engine.erl18
-rw-r--r--src/couch/src/couch_db.erl4
-rw-r--r--src/couch/src/couch_db_engine.erl32
-rw-r--r--src/couch/src/couch_db_split.erl493
-rw-r--r--src/couch/src/couch_server.erl11
-rw-r--r--src/couch/test/couch_db_split_tests.erl298
-rw-r--r--src/couch/test/couch_server_tests.erl30
-rw-r--r--src/couch_pse_tests/src/cpse_test_copy_purge_infos.erl82
-rw-r--r--src/couch_pse_tests/src/cpse_util.erl1
9 files changed, 966 insertions, 3 deletions
diff --git a/src/couch/src/couch_bt_engine.erl b/src/couch/src/couch_bt_engine.erl
index 7b33c4203..f4ff9a8d3 100644
--- a/src/couch/src/couch_bt_engine.erl
+++ b/src/couch/src/couch_bt_engine.erl
@@ -51,6 +51,8 @@
set_security/2,
set_props/2,
+ set_update_seq/2,
+
open_docs/2,
open_local_docs/2,
read_doc_body/2,
@@ -60,6 +62,7 @@
write_doc_body/2,
write_doc_infos/3,
purge_docs/3,
+ copy_purge_infos/2,
commit_data/1,
@@ -105,7 +108,6 @@
% Used by the compactor
-export([
- set_update_seq/2,
update_header/2,
copy_security/2,
copy_props/2
@@ -548,6 +550,20 @@ purge_docs(#st{} = St, Pairs, PurgeInfos) ->
}}.
+copy_purge_infos(#st{} = St, PurgeInfos) ->
+ #st{
+ purge_tree = PurgeTree,
+ purge_seq_tree = PurgeSeqTree
+ } = St,
+ {ok, PurgeTree2} = couch_btree:add(PurgeTree, PurgeInfos),
+ {ok, PurgeSeqTree2} = couch_btree:add(PurgeSeqTree, PurgeInfos),
+ {ok, St#st{
+ purge_tree = PurgeTree2,
+ purge_seq_tree = PurgeSeqTree2,
+ needs_commit = true
+ }}.
+
+
commit_data(St) ->
#st{
fd = Fd,
diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl
index bdb9dfeca..285c252d3 100644
--- a/src/couch/src/couch_db.erl
+++ b/src/couch/src/couch_db.erl
@@ -1527,7 +1527,7 @@ calculate_start_seq(Db, Node, {Seq, Uuid}) ->
% Treat the current node as the epoch node
calculate_start_seq(Db, Node, {Seq, Uuid, Node});
calculate_start_seq(Db, _Node, {Seq, Uuid, EpochNode}) ->
- case is_prefix(Uuid, get_uuid(Db)) of
+ case Uuid =:= split orelse is_prefix(Uuid, get_uuid(Db)) of
true ->
case is_owner(EpochNode, Seq, get_epochs(Db)) of
true -> Seq;
@@ -1547,7 +1547,7 @@ calculate_start_seq(Db, _Node, {Seq, Uuid, EpochNode}) ->
0
end;
calculate_start_seq(Db, _Node, {replace, OriginalNode, Uuid, Seq}) ->
- case is_prefix(Uuid, couch_db:get_uuid(Db)) of
+ case Uuid =:= split orelse is_prefix(Uuid, couch_db:get_uuid(Db)) of
true ->
start_seq(get_epochs(Db), OriginalNode, Seq);
false ->
diff --git a/src/couch/src/couch_db_engine.erl b/src/couch/src/couch_db_engine.erl
index 91d35b0c7..9adc9929d 100644
--- a/src/couch/src/couch_db_engine.erl
+++ b/src/couch/src/couch_db_engine.erl
@@ -319,6 +319,15 @@
{ok, NewDbHandle::db_handle()}.
+% Set the current update sequence of the database. The intention is to use this
+% when copying a database so that the destination update sequence exactly
+% matches the source update sequence.
+-callback set_update_seq(
+ DbHandle::db_handle(),
+ UpdateSeq::non_neg_integer()) ->
+ {ok, NewDbHandle::db_handle()}.
+
+
% This function will be called by many processes concurrently.
% It should return a #full_doc_info{} record or not_found for
% every provided DocId in the order those DocId's appear in
@@ -465,6 +474,13 @@
{ok, NewDbHandle::db_handle()}.
+% This function should be called from a single-threaded context and
+% should be used to copy purge infos from one database to another
+% when copying a database.
+-callback copy_purge_infos(DbHandle::db_handle(), [purge_info()]) ->
+ {ok, NewDbHandle::db_handle()}.
+
+
% This function is called in the context of couch_db_updater and
% as such is single threaded for any given DbHandle.
%
@@ -712,6 +728,8 @@
set_purge_infos_limit/2,
set_props/2,
+ set_update_seq/2,
+
open_docs/2,
open_local_docs/2,
read_doc_body/2,
@@ -721,6 +739,7 @@
write_doc_body/2,
write_doc_infos/3,
purge_docs/3,
+ copy_purge_infos/2,
commit_data/1,
open_write_stream/2,
@@ -918,6 +937,12 @@ set_props(#db{} = Db, Props) ->
{ok, Db#db{engine = {Engine, NewSt}}}.
+set_update_seq(#db{} = Db, UpdateSeq) ->
+ #db{engine = {Engine, EngineState}} = Db,
+ {ok, NewSt} = Engine:set_update_seq(EngineState, UpdateSeq),
+ {ok, Db#db{engine = {Engine, NewSt}}}.
+
+
open_docs(#db{} = Db, DocIds) ->
#db{engine = {Engine, EngineState}} = Db,
Engine:open_docs(EngineState, DocIds).
@@ -961,6 +986,13 @@ purge_docs(#db{} = Db, DocUpdates, Purges) ->
{ok, Db#db{engine = {Engine, NewSt}}}.
+copy_purge_infos(#db{} = Db, Purges) ->
+ #db{engine = {Engine, EngineState}} = Db,
+ {ok, NewSt} = Engine:copy_purge_infos(
+ EngineState, Purges),
+ {ok, Db#db{engine = {Engine, NewSt}}}.
+
+
commit_data(#db{} = Db) ->
#db{engine = {Engine, EngineState}} = Db,
{ok, NewSt} = Engine:commit_data(EngineState),
diff --git a/src/couch/src/couch_db_split.erl b/src/couch/src/couch_db_split.erl
new file mode 100644
index 000000000..d5c5c9b8d
--- /dev/null
+++ b/src/couch/src/couch_db_split.erl
@@ -0,0 +1,493 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_db_split).
+
+
+-export([
+ split/3,
+ copy_local_docs/3,
+ cleanup_target/2
+]).
+
+
+-include_lib("couch/include/couch_db.hrl").
+
+
+-define(DEFAULT_BUFFER_SIZE, 16777216).
+
+
+-record(state, {
+ source_db,
+ source_uuid,
+ targets,
+ pickfun,
+ max_buffer_size = ?DEFAULT_BUFFER_SIZE,
+ hashfun
+}).
+
+-record(target, {
+ db,
+ uuid,
+ buffer = [],
+ buffer_size = 0
+}).
+
+-record(racc, {
+ id,
+ source_db,
+ target_db,
+ active = 0,
+ external = 0,
+ atts = []
+}).
+
+
+% Public API
+
+split(Source, #{} = Targets, PickFun) when
+ map_size(Targets) >= 2, is_function(PickFun, 3) ->
+ case couch_db:open_int(Source, [?ADMIN_CTX]) of
+ {ok, SourceDb} ->
+ Engine = get_engine(SourceDb),
+ Partitioned = couch_db:is_partitioned(SourceDb),
+ HashFun = mem3_hash:get_hash_fun(couch_db:name(SourceDb)),
+ try
+ split(SourceDb, Partitioned, Engine, Targets, PickFun, HashFun)
+ catch
+ throw:{target_create_error, DbName, Error, TargetDbs} ->
+ cleanup_targets(TargetDbs, Engine),
+ {error, {target_create_error, DbName, Error}}
+ after
+ couch_db:close(SourceDb)
+ end;
+ {not_found, _} ->
+ {error, missing_source}
+ end.
+
+
+copy_local_docs(Source, #{} = Targets0, PickFun) when
+ is_binary(Source), is_function(PickFun, 3) ->
+ case couch_db:open_int(Source, [?ADMIN_CTX]) of
+ {ok, SourceDb} ->
+ try
+ Targets = maps:map(fun(_, DbName) ->
+ {ok, Db} = couch_db:open_int(DbName, [?ADMIN_CTX]),
+ #target{db = Db, uuid = couch_db:get_uuid(Db)}
+ end, Targets0),
+ SourceName = couch_db:name(SourceDb),
+ try
+ State = #state{
+ source_db = SourceDb,
+ source_uuid = couch_db:get_uuid(SourceDb),
+ targets = Targets,
+ pickfun = PickFun,
+ hashfun = mem3_hash:get_hash_fun(SourceName)
+ },
+ copy_local_docs(State),
+ ok
+ after
+ maps:map(fun(_, #target{db = Db} = T) ->
+ couch_db:close(Db),
+ T#target{db = undefined}
+ end, Targets)
+ end
+ after
+ couch_db:close(SourceDb)
+ end;
+ {not_found, _} ->
+ {error, missing_source}
+ end.
+
+
+cleanup_target(Source, Target) when is_binary(Source), is_binary(Target) ->
+ case couch_db:open_int(Source, [?ADMIN_CTX]) of
+ {ok, SourceDb} ->
+ try
+ delete_target(Target, get_engine(SourceDb))
+ after
+ couch_db:close(SourceDb)
+ end;
+ {not_found, _} ->
+ {error, missing_source}
+ end.
+
+
+% Private Functions
+
+split(SourceDb, Partitioned, Engine, Targets0, PickFun, {M, F, A} = HashFun) ->
+ Targets = maps:fold(fun(Key, DbName, Map) ->
+ case couch_db:validate_dbname(DbName) of
+ ok ->
+ ok;
+ {error, E} ->
+ throw({target_create_error, DbName, E, Map})
+ end,
+ {ok, Filepath} = couch_server:get_engine_path(DbName, Engine),
+ Opts = [create, ?ADMIN_CTX] ++ case Partitioned of
+ true -> [{props, [{partitioned, true}, {hash, [M, F, A]}]}];
+ false -> []
+ end,
+ case couch_db:start_link(Engine, DbName, Filepath, Opts) of
+ {ok, Db} ->
+ Map#{Key => #target{db = Db}};
+ {error, Error} ->
+ throw({target_create_error, DbName, Error, Map})
+ end
+ end, #{}, Targets0),
+ Seq = couch_db:get_update_seq(SourceDb),
+ State1 = #state{
+ source_db = SourceDb,
+ targets = Targets,
+ pickfun = PickFun,
+ hashfun = HashFun,
+ max_buffer_size = get_max_buffer_size()
+ },
+ State2 = copy_docs(State1),
+ State3 = copy_checkpoints(State2),
+ State4 = copy_meta(State3),
+ State5 = copy_purge_info(State4),
+ State6 = set_targets_update_seq(State5),
+ stop_targets(State6#state.targets),
+ {ok, Seq}.
+
+
+cleanup_targets(#{} = Targets, Engine) ->
+ maps:map(fun(_, #target{db = Db} = T) ->
+ stop_target_db(Db),
+ delete_target(couch_db:name(Db), Engine),
+ T
+ end, Targets).
+
+
+stop_targets(#{} = Targets) ->
+ maps:map(fun(_, #target{db = Db} = T) ->
+ {ok, Db1} = couch_db_engine:commit_data(Db),
+ ok = stop_target_db(Db1),
+ T
+ end, Targets).
+
+
+stop_target_db(Db) ->
+ couch_db:close(Db),
+ Pid = couch_db:get_pid(Db),
+ catch unlink(Pid),
+ catch exit(Pid, kill),
+ ok.
+
+
+delete_target(DbName, Engine) ->
+ RootDir = config:get("couchdb", "database_dir", "."),
+ {ok, Filepath} = couch_server:get_engine_path(DbName, Engine),
+ DelOpt = [{context, compaction}, sync],
+ couch_db_engine:delete(Engine, RootDir, Filepath, DelOpt).
+
+
+pick_target(DocId, #state{} = State, #{} = Targets) ->
+ #state{pickfun = PickFun, hashfun = HashFun} = State,
+ Key = PickFun(DocId, maps:keys(Targets), HashFun),
+ {Key, maps:get(Key, Targets)}.
+
+
+set_targets_update_seq(#state{targets = Targets} = State) ->
+ Seq = couch_db:get_update_seq(State#state.source_db),
+ Targets1 = maps:map(fun(_, #target{db = Db} = Target) ->
+ {ok, Db1} = couch_db_engine:set_update_seq(Db, Seq),
+ Target#target{db = Db1}
+ end, Targets),
+ State#state{targets = Targets1}.
+
+
+copy_checkpoints(#state{} = State) ->
+ #state{source_db = Db, source_uuid = SrcUUID, targets = Targets} = State,
+ FoldFun = fun(#doc{id = Id} = Doc, Acc) ->
+ UpdatedAcc = case Id of
+ <<?LOCAL_DOC_PREFIX, "shard-sync-", _/binary>> ->
+ % Transform mem3 internal replicator checkpoints to avoid
+ % rewinding the changes feed when it sees the new shards
+ maps:map(fun(_, #target{uuid = TgtUUID, buffer = Docs} = T) ->
+ Doc1 = update_checkpoint_doc(SrcUUID, TgtUUID, Doc),
+ T#target{buffer = [Doc1 | Docs]}
+ end, Acc);
+ <<?LOCAL_DOC_PREFIX, "purge-", _/binary>> ->
+ % Copy purge checkpoints to all shards
+ maps:map(fun(_, #target{buffer = Docs} = T) ->
+ T#target{buffer = [Doc | Docs]}
+ end, Acc);
+ <<?LOCAL_DOC_PREFIX, _/binary>> ->
+ % Skip copying these; they will be copied during the local docs
+ % top-off right before the shards are switched
+ Acc
+ end,
+ {ok, UpdatedAcc}
+ end,
+ {ok, Targets1} = couch_db_engine:fold_local_docs(Db, FoldFun, Targets, []),
+ Targets2 = maps:map(fun(_, #target{db = TDb, buffer = Docs} = T) ->
+ case Docs of
+ [] ->
+ T;
+ [_ | _] ->
+ Docs1 = lists:reverse(Docs),
+ {ok, TDb1} = couch_db_engine:write_doc_infos(TDb, [], Docs1),
+ {ok, TDb2} = couch_db_engine:commit_data(TDb1),
+ T#target{db = TDb2, buffer = []}
+ end
+ end, Targets1),
+ State#state{targets = Targets2}.
+
+
+update_checkpoint_doc(Old, New, #doc{body = {Props}} = Doc) ->
+ NewProps = case couch_util:get_value(<<"target_uuid">>, Props) of
+ Old ->
+ replace_kv(Props, {<<"target_uuid">>, Old, New});
+ Other when is_binary(Other) ->
+ replace_kv(Props, {<<"source_uuid">>, Old, New})
+ end,
+ NewId = update_checkpoint_id(Doc#doc.id, Old, New),
+ Doc#doc{id = NewId, body = {NewProps}}.
+
+
+update_checkpoint_id(Id, Old, New) ->
+ OldHash = mem3_rep:local_id_hash(Old),
+ NewHash = mem3_rep:local_id_hash(New),
+ binary:replace(Id, OldHash, NewHash).
+
+
+replace_kv({[]}, _) ->
+ {[]};
+replace_kv({KVs}, Replacement) ->
+ {[replace_kv(KV, Replacement) || KV <- KVs]};
+replace_kv([], _) ->
+ [];
+replace_kv(List, Replacement) when is_list(List) ->
+ [replace_kv(V, Replacement) || V <- List];
+replace_kv({K, V}, {K, V, NewV}) ->
+ {K, NewV};
+replace_kv({K, V}, Replacement) ->
+ {K, replace_kv(V, Replacement)};
+replace_kv(V, _) ->
+ V.
+
+
+copy_meta(#state{source_db = SourceDb, targets = Targets} = State) ->
+ RevsLimit = couch_db:get_revs_limit(SourceDb),
+ {SecProps} = couch_db:get_security(SourceDb),
+ PurgeLimit = couch_db:get_purge_infos_limit(SourceDb),
+ Targets1 = maps:map(fun(_, #target{db = Db} = T) ->
+ {ok, Db1} = couch_db_engine:set_revs_limit(Db, RevsLimit),
+ {ok, Db2} = couch_db_engine:set_security(Db1, SecProps),
+ {ok, Db3} = couch_db_engine:set_purge_infos_limit(Db2, PurgeLimit),
+ T#target{db = Db3}
+ end, Targets),
+ State#state{targets = Targets1}.
+
+
+copy_purge_info(#state{source_db = Db} = State) ->
+ {ok, NewState} = couch_db:fold_purge_infos(Db, 0, fun purge_cb/2, State),
+ Targets = maps:map(fun(_, #target{} = T) ->
+ commit_purge_infos(T)
+ end, NewState#state.targets),
+ NewState#state{targets = Targets}.
+
+
+purge_cb({_PSeq, _UUID, Id, _Revs} = PI, #state{targets = Targets} = State) ->
+ {Key, Target} = pick_target(Id, State, Targets),
+ #target{buffer = Buffer, buffer_size = BufferSize} = Target,
+ Target1 = Target#target{
+ buffer = [PI | Buffer],
+ buffer_size = BufferSize + ?term_size(PI)
+ },
+ Target2 = case Target1#target.buffer_size > State#state.max_buffer_size of
+ true -> commit_purge_infos(Target1);
+ false -> Target1
+ end,
+ {ok, State#state{targets = Targets#{Key => Target2}}}.
+
+
+commit_purge_infos(#target{buffer = [], db = Db} = Target) ->
+ Target#target{db = Db};
+
+commit_purge_infos(#target{buffer = PIs0, db = Db} = Target) ->
+ PIs = lists:reverse(PIs0),
+ {ok, Db1} = couch_db_engine:copy_purge_infos(Db, PIs),
+ {ok, Db2} = couch_db_engine:commit_data(Db1),
+ Target#target{buffer = [], buffer_size = 0, db = Db2}.
+
+
+copy_docs(#state{source_db = Db} = State) ->
+ {ok, NewState} = couch_db:fold_changes(Db, 0, fun changes_cb/2, State),
+ CommitTargets = maps:map(fun(_, #target{} = T) ->
+ commit_docs(T)
+ end, NewState#state.targets),
+ NewState#state{targets = CommitTargets}.
+
+
+% Backwards compatibility clause. Seq trees used to hold #doc_infos at one time
+changes_cb(#doc_info{id = Id}, #state{source_db = Db} = State) ->
+ [FDI = #full_doc_info{}] = couch_db_engine:open_docs(Db, [Id]),
+ changes_cb(FDI, State);
+
+changes_cb(#full_doc_info{id = Id} = FDI, #state{} = State) ->
+ #state{source_db = SourceDb, targets = Targets} = State,
+ {Key, Target} = pick_target(Id, State, Targets),
+ #target{db = TargetDb, buffer = Buffer} = Target,
+ FDI1 = process_fdi(FDI, SourceDb, TargetDb),
+ Target1 = Target#target{
+ buffer = [FDI1 | Buffer],
+ buffer_size = Target#target.buffer_size + ?term_size(FDI1)
+ },
+ Target2 = case Target1#target.buffer_size > State#state.max_buffer_size of
+ true -> commit_docs(Target1);
+ false -> Target1
+ end,
+ {ok, State#state{targets = Targets#{Key => Target2}}}.
+
+
+commit_docs(#target{buffer = [], db = Db} = Target) ->
+ Target#target{db = Db};
+
+commit_docs(#target{buffer = FDIs, db = Db} = Target) ->
+ Pairs = [{not_found, FDI} || FDI <- lists:reverse(FDIs)],
+ {ok, Db1} = couch_db_engine:write_doc_infos(Db, Pairs, []),
+ {ok, Db2} = couch_db_engine:commit_data(Db1),
+ Target#target{buffer = [], buffer_size = 0, db = Db2}.
+
+
+process_fdi(FDI, SourceDb, TargetDb) ->
+ #full_doc_info{id = Id, rev_tree = RTree} = FDI,
+ Acc = #racc{id = Id, source_db = SourceDb, target_db = TargetDb},
+ {NewRTree, NewAcc} = couch_key_tree:mapfold(fun revtree_cb/4, Acc, RTree),
+ {Active, External} = total_sizes(NewAcc),
+ FDI#full_doc_info{
+ rev_tree = NewRTree,
+ sizes = #size_info{active = Active, external = External}
+ }.
+
+
+revtree_cb(_Rev, _Leaf, branch, Acc) ->
+ {[], Acc};
+
+revtree_cb({Pos, RevId}, Leaf, leaf, Acc) ->
+ #racc{id = Id, source_db = SourceDb, target_db = TargetDb} = Acc,
+ #leaf{deleted = Deleted, ptr = Ptr} = Leaf,
+ Doc0 = #doc{
+ id = Id,
+ revs = {Pos, [RevId]},
+ deleted = Deleted,
+ body = Ptr
+ },
+ Doc1 = couch_db_engine:read_doc_body(SourceDb, Doc0),
+ #doc{body = Body, atts = AttInfos0} = Doc1,
+ External = if not is_binary(Body) -> ?term_size(body); true ->
+ couch_compress:uncompressed_size(Body)
+ end,
+ AttInfos = if not is_binary(AttInfos0) -> AttInfos0; true ->
+ couch_compress:decompress(AttInfos0)
+ end,
+ Atts = [process_attachment(Att, SourceDb, TargetDb) || Att <- AttInfos],
+ Doc2 = Doc1#doc{atts = Atts},
+ Doc3 = couch_db_engine:serialize_doc(TargetDb, Doc2),
+ {ok, Doc4, Active} = couch_db_engine:write_doc_body(TargetDb, Doc3),
+ AttSizes = [{element(3, A), element(4, A)} || A <- Atts],
+ NewLeaf = Leaf#leaf{
+ ptr = Doc4#doc.body,
+ sizes = #size_info{active = Active, external = External},
+ atts = AttSizes
+ },
+ {NewLeaf, add_sizes(Active, External, AttSizes, Acc)}.
+
+
+% This is copied almost verbatim from the compactor
+process_attachment({Name, Type, BinSp, AttLen, RevPos, ExpectedMd5}, SourceDb,
+ TargetDb) ->
+ % 010 upgrade code
+ {ok, SrcStream} = couch_db_engine:open_read_stream(SourceDb, BinSp),
+ {ok, DstStream} = couch_db_engine:open_write_stream(TargetDb, []),
+ ok = couch_stream:copy(SrcStream, DstStream),
+ {NewStream, AttLen, AttLen, ActualMd5, _IdentityMd5} =
+ couch_stream:close(DstStream),
+ {ok, NewBinSp} = couch_stream:to_disk_term(NewStream),
+ couch_util:check_md5(ExpectedMd5, ActualMd5),
+ {Name, Type, NewBinSp, AttLen, AttLen, RevPos, ExpectedMd5, identity};
+
+process_attachment({Name, Type, BinSp, AttLen, DiskLen, RevPos, ExpectedMd5,
+ Enc1}, SourceDb, TargetDb) ->
+ {ok, SrcStream} = couch_db_engine:open_read_stream(SourceDb, BinSp),
+ {ok, DstStream} = couch_db_engine:open_write_stream(TargetDb, []),
+ ok = couch_stream:copy(SrcStream, DstStream),
+ {NewStream, AttLen, _, ActualMd5, _IdentityMd5} =
+ couch_stream:close(DstStream),
+ {ok, NewBinSp} = couch_stream:to_disk_term(NewStream),
+ couch_util:check_md5(ExpectedMd5, ActualMd5),
+ Enc = case Enc1 of
+ true -> gzip; % 0110 upgrade code
+ false -> identity; % 0110 upgrade code
+ _ -> Enc1
+ end,
+ {Name, Type, NewBinSp, AttLen, DiskLen, RevPos, ExpectedMd5, Enc}.
+
+
+get_engine(Db) ->
+ {ok, DbInfoProps} = couch_db:get_db_info(Db),
+ proplists:get_value(engine, DbInfoProps).
+
+
+add_sizes(Active, External, Atts, #racc{} = Acc) ->
+ #racc{active = ActiveAcc, external = ExternalAcc, atts = AttsAcc} = Acc,
+ NewActiveAcc = ActiveAcc + Active,
+ NewExternalAcc = ExternalAcc + External,
+ NewAttsAcc = lists:umerge(Atts, AttsAcc),
+ Acc#racc{
+ active = NewActiveAcc,
+ external = NewExternalAcc,
+ atts = NewAttsAcc
+ }.
+
+
+total_sizes(#racc{active = Active, external = External, atts = Atts}) ->
+ TotalAtts = lists:foldl(fun({_, S}, A) -> S + A end, 0, Atts),
+ {Active + TotalAtts, External + TotalAtts}.
+
+
+get_max_buffer_size() ->
+ config:get_integer("shard_splitting", "buffer_size", ?DEFAULT_BUFFER_SIZE).
+
+
+copy_local_docs(#state{source_db = Db, targets = Targets} = State) ->
+ FoldFun = fun(#doc{id = Id} = Doc, Acc) ->
+ UpdatedAcc = case Id of
+ <<?LOCAL_DOC_PREFIX, "shard-sync-", _/binary>> ->
+ Acc;
+ <<?LOCAL_DOC_PREFIX, "purge-", _/binary>> ->
+ Acc;
+ <<?LOCAL_DOC_PREFIX, _/binary>> ->
+ % Users' and replicator app's checkpoints go to their
+ % respective shards based on the general hashing algorithm
+ {Key, Target} = pick_target(Id, State, Acc),
+ #target{buffer = Docs} = Target,
+ Acc#{Key => Target#target{buffer = [Doc | Docs]}}
+ end,
+ {ok, UpdatedAcc}
+ end,
+ {ok, Targets1} = couch_db:fold_local_docs(Db, FoldFun, Targets, []),
+ Targets2 = maps:map(fun(_, #target{db = TDb, buffer = Docs} = T) ->
+ case Docs of
+ [] ->
+ T;
+ [_ | _] ->
+ Docs1 = lists:reverse(Docs),
+ {ok, _} = couch_db:update_docs(TDb, Docs1),
+ {ok, _} = couch_db:ensure_full_commit(TDb),
+ T#target{buffer = []}
+ end
+ end, Targets1),
+ State#state{targets = Targets2}.
diff --git a/src/couch/src/couch_server.erl b/src/couch/src/couch_server.erl
index 3ceab3abd..3bbd2eb34 100644
--- a/src/couch/src/couch_server.erl
+++ b/src/couch/src/couch_server.erl
@@ -25,6 +25,7 @@
-export([delete_compaction_files/1]).
-export([exists/1]).
-export([get_engine_extensions/0]).
+-export([get_engine_path/2]).
% config_listener api
-export([handle_config_change/5, handle_config_terminate/3]).
@@ -767,6 +768,16 @@ get_engine_extensions() ->
end.
+get_engine_path(DbName, Engine) when is_binary(DbName), is_atom(Engine) ->
+ RootDir = config:get("couchdb", "database_dir", "."),
+ case lists:keyfind(Engine, 2, get_configured_engines()) of
+ {Ext, Engine} ->
+ {ok, make_filepath(RootDir, DbName, Ext)};
+ false ->
+ {error, {invalid_engine, Engine}}
+ end.
+
+
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
diff --git a/src/couch/test/couch_db_split_tests.erl b/src/couch/test/couch_db_split_tests.erl
new file mode 100644
index 000000000..5361c3b8e
--- /dev/null
+++ b/src/couch/test/couch_db_split_tests.erl
@@ -0,0 +1,298 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_db_split_tests).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+-define(RINGTOP, 2 bsl 31).
+
+
+setup() ->
+ DbName = ?tempdb(),
+ {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
+ ok = couch_db:close(Db),
+ DbName.
+
+
+teardown(DbName) ->
+ {ok, Db} = couch_db:open_int(DbName, []),
+ FilePath = couch_db:get_filepath(Db),
+ ok = couch_db:close(Db),
+ ok = file:delete(FilePath).
+
+
+split_test_() ->
+ Cases = [
+ {"Should split an empty shard", 0, 2},
+ {"Should split shard in half", 100, 2},
+ {"Should split shard in three", 99, 3},
+ {"Should split shard in four", 100, 4}
+ ],
+ {
+ setup,
+ fun test_util:start_couch/0, fun test_util:stop/1,
+ [
+ {
+ foreachx,
+ fun(_) -> setup() end, fun(_, St) -> teardown(St) end,
+ [{Case, fun should_split_shard/2} || Case <- Cases]
+ },
+ {
+ foreach,
+ fun setup/0, fun teardown/1,
+ [
+ fun should_fail_on_missing_source/1,
+ fun should_fail_on_existing_target/1,
+ fun should_fail_on_invalid_target_name/1,
+ fun should_crash_on_invalid_tmap/1
+ ]
+ }
+ ]
+ }.
+
+
+should_split_shard({Desc, TotalDocs, Q}, DbName) ->
+ {ok, ExpectSeq} = create_docs(DbName, TotalDocs),
+ Ranges = make_ranges(Q),
+ TMap = make_targets(Ranges),
+ DocsPerRange = TotalDocs div Q,
+ PickFun = make_pickfun(DocsPerRange),
+ {Desc, ?_test(begin
+ {ok, UpdateSeq} = couch_db_split:split(DbName, TMap, PickFun),
+ ?assertEqual(ExpectSeq, UpdateSeq),
+ maps:map(fun(Range, Name) ->
+ {ok, Db} = couch_db:open_int(Name, []),
+ FilePath = couch_db:get_filepath(Db),
+ %% target file actually exists
+ ?assertMatch({ok, _}, file:read_file_info(FilePath)),
+ %% target's update seq is the same as source's update seq
+ USeq = couch_db:get_update_seq(Db),
+ ?assertEqual(ExpectSeq, USeq),
+ %% target shard has all the docs expected in its range
+ {ok, DocsInShard} = couch_db:fold_docs(Db, fun(FDI, Acc) ->
+ DocId = FDI#full_doc_info.id,
+ ExpectedRange = PickFun(DocId, Ranges, undefined),
+ ?assertEqual(ExpectedRange, Range),
+ {ok, Acc + 1}
+ end, 0),
+ ?assertEqual(DocsPerRange, DocsInShard),
+ ok = couch_db:close(Db),
+ ok = file:delete(FilePath)
+ end, TMap)
+ end)}.
+
+
+should_fail_on_missing_source(_DbName) ->
+ DbName = ?tempdb(),
+ Ranges = make_ranges(2),
+ TMap = make_targets(Ranges),
+ Response = couch_db_split:split(DbName, TMap, fun fake_pickfun/3),
+ ?_assertEqual({error, missing_source}, Response).
+
+
+should_fail_on_existing_target(DbName) ->
+ Ranges = make_ranges(2),
+ TMap = maps:map(fun(_, _) -> DbName end, make_targets(Ranges)),
+ Response = couch_db_split:split(DbName, TMap, fun fake_pickfun/3),
+ ?_assertMatch({error, {target_create_error, DbName, eexist}}, Response).
+
+
+should_fail_on_invalid_target_name(DbName) ->
+ Ranges = make_ranges(2),
+ TMap = maps:map(fun([B, _], _) ->
+ iolist_to_binary(["_$", couch_util:to_hex(<<B:32/integer>>)])
+ end, make_targets(Ranges)),
+ Expect = {error, {target_create_error, <<"_$00000000">>,
+ {illegal_database_name, <<"_$00000000">>}}},
+ Response = couch_db_split:split(DbName, TMap, fun fake_pickfun/3),
+ ?_assertMatch(Expect, Response).
+
+
+should_crash_on_invalid_tmap(DbName) ->
+ Ranges = make_ranges(1),
+ TMap = make_targets(Ranges),
+ ?_assertError(function_clause,
+ couch_db_split:split(DbName, TMap, fun fake_pickfun/3)).
+
+
+copy_local_docs_test_() ->
+ Cases = [
+ {"Should work with no docs", 0, 2},
+ {"Should copy local docs after split in two", 100, 2},
+ {"Should copy local docs after split in three", 99, 3},
+ {"Should copy local docs after split in four", 100, 4}
+ ],
+ {
+ setup,
+ fun test_util:start_couch/0, fun test_util:stop/1,
+ [
+ {
+ foreachx,
+ fun(_) -> setup() end, fun(_, St) -> teardown(St) end,
+ [{Case, fun should_copy_local_docs/2} || Case <- Cases]
+ },
+ {"Should return error on missing source",
+ fun should_fail_copy_local_on_missing_source/0}
+ ]
+ }.
+
+
+should_copy_local_docs({Desc, TotalDocs, Q}, DbName) ->
+ {ok, ExpectSeq} = create_docs(DbName, TotalDocs),
+ Ranges = make_ranges(Q),
+ TMap = make_targets(Ranges),
+ DocsPerRange = TotalDocs div Q,
+ PickFun = make_pickfun(DocsPerRange),
+ {Desc, ?_test(begin
+ {ok, UpdateSeq} = couch_db_split:split(DbName, TMap, PickFun),
+ ?assertEqual(ExpectSeq, UpdateSeq),
+ Response = couch_db_split:copy_local_docs(DbName, TMap, PickFun),
+ ?assertEqual(ok, Response),
+ maps:map(fun(Range, Name) ->
+ {ok, Db} = couch_db:open_int(Name, []),
+ FilePath = couch_db:get_filepath(Db),
+ %% target shard has all the docs expected in its range
+ {ok, DocsInShard} = couch_db:fold_local_docs(Db, fun(Doc, Acc) ->
+ DocId = Doc#doc.id,
+ ExpectedRange = PickFun(DocId, Ranges, undefined),
+ ?assertEqual(ExpectedRange, Range),
+ {ok, Acc + 1}
+ end, 0, []),
+ ?assertEqual(DocsPerRange, DocsInShard),
+ ok = couch_db:close(Db),
+ ok = file:delete(FilePath)
+ end, TMap)
+ end)}.
+
+
+should_fail_copy_local_on_missing_source() ->
+ DbName = ?tempdb(),
+ Ranges = make_ranges(2),
+ TMap = make_targets(Ranges),
+ PickFun = fun fake_pickfun/3,
+ Response = couch_db_split:copy_local_docs(DbName, TMap, PickFun),
+ ?assertEqual({error, missing_source}, Response).
+
+
+cleanup_target_test_() ->
+ {
+ setup,
+ fun test_util:start_couch/0, fun test_util:stop/1,
+ [
+ {
+ setup,
+ fun setup/0, fun teardown/1,
+ fun should_delete_existing_targets/1
+ },
+ {"Should return error on missing source",
+ fun should_fail_cleanup_target_on_missing_source/0}
+ ]
+ }.
+
+
+should_delete_existing_targets(SourceName) ->
+ {ok, ExpectSeq} = create_docs(SourceName, 100),
+ Ranges = make_ranges(2),
+ TMap = make_targets(Ranges),
+ PickFun = make_pickfun(50),
+ ?_test(begin
+ {ok, UpdateSeq} = couch_db_split:split(SourceName, TMap, PickFun),
+ ?assertEqual(ExpectSeq, UpdateSeq),
+ maps:map(fun(_Range, TargetName) ->
+ FilePath = couch_util:with_db(TargetName, fun(Db) ->
+ couch_db:get_filepath(Db)
+ end),
+ ?assertMatch({ok, _}, file:read_file_info(FilePath)),
+ Response = couch_db_split:cleanup_target(SourceName, TargetName),
+ ?assertEqual(ok, Response),
+ ?assertEqual({error, enoent}, file:read_file_info(FilePath))
+ end, TMap)
+ end).
+
+
+should_fail_cleanup_target_on_missing_source() ->
+ SourceName = ?tempdb(),
+ TargetName = ?tempdb(),
+ Response = couch_db_split:cleanup_target(SourceName, TargetName),
+ ?assertEqual({error, missing_source}, Response).
+
+
+make_pickfun(DocsPerRange) ->
+ fun(DocId, Ranges, _HashFun) ->
+ Id = docid_to_integer(DocId),
+ case {Id div DocsPerRange, Id rem DocsPerRange} of
+ {N, 0} ->
+ lists:nth(N, Ranges);
+ {N, _} ->
+ lists:nth(N + 1, Ranges)
+ end
+ end.
+
+
+fake_pickfun(_, Ranges, _) ->
+ hd(Ranges).
+
+
+make_targets([]) ->
+ maps:new();
+make_targets(Ranges) ->
+ Targets = lists:map(fun(Range) ->
+ {Range, ?tempdb()}
+ end, Ranges),
+ maps:from_list(Targets).
+
+
+make_ranges(Q) when Q > 0 ->
+ Incr = (2 bsl 31) div Q,
+ lists:map(fun
+ (End) when End >= ?RINGTOP - 1 ->
+ [End - Incr, ?RINGTOP - 1];
+ (End) ->
+ [End - Incr, End - 1]
+ end, lists:seq(Incr, ?RINGTOP, Incr));
+make_ranges(_) ->
+ [].
+
+
+create_docs(DbName, 0) ->
+ couch_util:with_db(DbName, fun(Db) ->
+ UpdateSeq = couch_db:get_update_seq(Db),
+ {ok, UpdateSeq}
+ end);
+create_docs(DbName, DocNum) ->
+ Docs = lists:foldl(fun(I, Acc) ->
+ [create_doc(I), create_local_doc(I) | Acc]
+ end, [], lists:seq(DocNum, 1, -1)),
+ couch_util:with_db(DbName, fun(Db) ->
+ {ok, _Result} = couch_db:update_docs(Db, Docs),
+ {ok, _StartTime} = couch_db:ensure_full_commit(Db),
+ {ok, Db1} = couch_db:reopen(Db),
+ UpdateSeq = couch_db:get_update_seq(Db1),
+ {ok, UpdateSeq}
+ end).
+
+
+create_doc(I) ->
+ Id = iolist_to_binary(io_lib:format("~3..0B", [I])),
+ couch_doc:from_json_obj({[{<<"_id">>, Id}, {<<"value">>, I}]}).
+
+
+create_local_doc(I) ->
+ Id = iolist_to_binary(io_lib:format("_local/~3..0B", [I])),
+ couch_doc:from_json_obj({[{<<"_id">>, Id}, {<<"value">>, I}]}).
+
+docid_to_integer(<<"_local/", DocId/binary>>) ->
+ docid_to_integer(DocId);
+docid_to_integer(DocId) ->
+ list_to_integer(binary_to_list(DocId)).
diff --git a/src/couch/test/couch_server_tests.erl b/src/couch/test/couch_server_tests.erl
index fdbc175ce..530b7efd0 100644
--- a/src/couch/test/couch_server_tests.erl
+++ b/src/couch/test/couch_server_tests.erl
@@ -108,6 +108,36 @@ t_bad_engine_option() ->
?assertEqual(Resp, {error, {invalid_engine_extension, <<"cowabunga!">>}}).
+get_engine_path_test_() ->
+ {
+ setup,
+ fun start/0, fun test_util:stop/1,
+ {
+ foreach,
+ fun setup/0, fun teardown/1,
+ [
+ fun should_return_engine_path/1,
+ fun should_return_invalid_engine_error/1
+ ]
+ }
+ }.
+
+
+should_return_engine_path(Db) ->
+ DbName = couch_db:name(Db),
+ Engine = couch_db_engine:get_engine(Db),
+ Resp = couch_server:get_engine_path(DbName, Engine),
+ FilePath = couch_db:get_filepath(Db),
+ ?_assertMatch({ok, FilePath}, Resp).
+
+
+should_return_invalid_engine_error(Db) ->
+ DbName = couch_db:name(Db),
+ Engine = fake_engine,
+ Resp = couch_server:get_engine_path(DbName, Engine),
+ ?_assertMatch({error, {invalid_engine, Engine}}, Resp).
+
+
interleaved_requests_test_() ->
{
setup,
diff --git a/src/couch_pse_tests/src/cpse_test_copy_purge_infos.erl b/src/couch_pse_tests/src/cpse_test_copy_purge_infos.erl
new file mode 100644
index 000000000..4e41430d3
--- /dev/null
+++ b/src/couch_pse_tests/src/cpse_test_copy_purge_infos.erl
@@ -0,0 +1,82 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(cpse_test_copy_purge_infos).
+-compile(export_all).
+-compile(nowarn_export_all).
+
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+
+-define(NUM_DOCS, 100).
+
+
+setup_each() ->
+ {ok, SrcDb} = cpse_util:create_db(),
+ {ok, SrcDb2} = create_and_purge(SrcDb),
+ {ok, TrgDb} = cpse_util:create_db(),
+ {SrcDb2, TrgDb}.
+
+
+teardown_each({SrcDb, TrgDb}) ->
+ ok = couch_server:delete(couch_db:name(SrcDb), []),
+ ok = couch_server:delete(couch_db:name(TrgDb), []).
+
+
+cpse_copy_empty_purged_info({_, Db}) ->
+ {ok, Db1} = couch_db_engine:copy_purge_infos(Db, []),
+ ?assertEqual(ok, cpse_util:assert_each_prop(Db1, [{purge_infos, []}])).
+
+
+cpse_copy_purged_info({SrcDb, TrgDb}) ->
+ {ok, RPIs} = couch_db_engine:fold_purge_infos(SrcDb, 0, fun(PI, Acc) ->
+ {ok, [PI | Acc]}
+ end, [], []),
+ PIs = lists:reverse(RPIs),
+ AEPFold = fun({PSeq, UUID, Id, Revs}, {CPSeq, CPurges}) ->
+ {max(PSeq, CPSeq), [{UUID, Id, Revs} | CPurges]}
+ end,
+ {PurgeSeq, RPurges} = lists:foldl(AEPFold, {0, []}, PIs),
+ Purges = lists:reverse(RPurges),
+ {ok, TrgDb2} = couch_db_engine:copy_purge_infos(TrgDb, PIs),
+ AssertProps = [{purge_seq, PurgeSeq}, {purge_infos, Purges}],
+ ?assertEqual(ok, cpse_util:assert_each_prop(TrgDb2, AssertProps)).
+
+
+create_and_purge(Db) ->
+ {RActions, RIds} = lists:foldl(fun(Id, {CActions, CIds}) ->
+ Id1 = docid(Id),
+ Action = {create, {Id1, {[{<<"int">>, Id}]}}},
+ {[Action| CActions], [Id1| CIds]}
+ end, {[], []}, lists:seq(1, ?NUM_DOCS)),
+ Actions = lists:reverse(RActions),
+ Ids = lists:reverse(RIds),
+ {ok, Db1} = cpse_util:apply_batch(Db, Actions),
+
+ FDIs = couch_db_engine:open_docs(Db1, Ids),
+ RActions2 = lists:foldl(fun(FDI, CActions) ->
+ Id = FDI#full_doc_info.id,
+ PrevRev = cpse_util:prev_rev(FDI),
+ Rev = PrevRev#rev_info.rev,
+ Action = {purge, {Id, Rev}},
+ [Action| CActions]
+ end, [], FDIs),
+ Actions2 = lists:reverse(RActions2),
+ {ok, Db2} = cpse_util:apply_batch(Db1, Actions2),
+ {ok, Db2}.
+
+
+docid(I) ->
+ Str = io_lib:format("~4..0b", [I]),
+ iolist_to_binary(Str).
diff --git a/src/couch_pse_tests/src/cpse_util.erl b/src/couch_pse_tests/src/cpse_util.erl
index 1bf24314e..f8f6a19ae 100644
--- a/src/couch_pse_tests/src/cpse_util.erl
+++ b/src/couch_pse_tests/src/cpse_util.erl
@@ -27,6 +27,7 @@
cpse_test_fold_docs,
cpse_test_fold_changes,
cpse_test_fold_purge_infos,
+ cpse_test_copy_purge_infos,
cpse_test_purge_docs,
cpse_test_purge_replication,
cpse_test_purge_bad_checkpoints,