author    Nick Vatamaniuc <vatamane@gmail.com>            2022-04-22 19:53:13 -0400
committer Nick Vatamaniuc <nickva@users.noreply.github.com>  2022-04-28 17:19:44 -0400
commit    0e38f390796d6e34f698853f5886a36607e2aeec (patch)
tree      c910ca79a6e1ab1d8e8c08ac8b2b02320d2aafb8
parent    0bc311b7d7ec9836679b929ee5c2a630ad2ff01f (diff)
download  couchdb-0e38f390796d6e34f698853f5886a36607e2aeec.tar.gz
Improve index building during shard splitting
Previously, when building indices we did not check the responses from the get_state/2 or await/2 calls. If an index updater crashed and the index never finished building, the get_state/2 call would simply return an error, the waiting process would exit normally, and the shard splitting job would count that as a success and continue to make progress.

To fix that, check the responses for all supported indexing types and wait until each returns an `ok` result. Additionally, make index building more resilient by allowing more retries on failure and making the retry count and interval configurable for individual index builders.
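A minimal sketch of the open/await/retry pattern described above (illustrative only; the function names, fun arguments, and fixed sleep interval are simplified stand-ins, not the actual mem3_reshard_index code):

    %% Open the index, wait for it to reach Seq, and retry on anything
    %% other than an ok result, up to a bounded number of attempts.
    build_with_retries(_Open, _Await, _Seq, 0) ->
        exit(error_building_index);
    build_with_retries(Open, Await, Seq, TriesLeft) when TriesLeft > 0 ->
        case Open() of
            {ok, Pid} ->
                try Await(Pid, Seq) of
                    {ok, _} -> ok;
                    _Other -> sleep_and_retry(Open, Await, Seq, TriesLeft)
                catch
                    _:_ -> sleep_and_retry(Open, Await, Seq, TriesLeft)
                end;
            _OpenError ->
                sleep_and_retry(Open, Await, Seq, TriesLeft)
        end.

    sleep_and_retry(Open, Await, Seq, TriesLeft) ->
        timer:sleep(1000),
        build_with_retries(Open, Await, Seq, TriesLeft - 1).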
-rw-r--r--  rel/overlay/etc/default.ini                  8
-rw-r--r--  src/mem3/src/mem3_reshard_index.erl        131
-rw-r--r--  src/mem3/src/mem3_reshard_job.erl           22
-rw-r--r--  src/mem3/test/eunit/mem3_reshard_test.erl   54
4 files changed, 142 insertions, 73 deletions
diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini
index 7535501c0..5fb45b5b5 100644
--- a/rel/overlay/etc/default.ini
+++ b/rel/overlay/etc/default.ini
@@ -719,7 +719,7 @@ state_dir = {{state_dir}}
[reshard]
;max_jobs = 48
;max_history = 20
-;max_retries = 1
+;max_retries = 5
;retry_interval_sec = 10
;delete_source = true
;update_shard_map_timeout_sec = 60
@@ -727,6 +727,12 @@ state_dir = {{state_dir}}
;require_node_param = false
;require_range_param = false
+; How many times to retry building an individual index
+;index_max_retries = 5
+
+; How many seconds to wait between retries for an individual index
+;index_retry_interval_sec = 10
+
[prometheus]
additional_port = false
bind_address = 127.0.0.1
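For operators, the two new settings would normally be overridden in local.ini rather than by editing default.ini; a hypothetical override (values chosen purely for illustration) could look like:

    [reshard]
    ; retry each individual index build up to 10 times, 5 seconds apart
    index_max_retries = 10
    index_retry_interval_sec = 5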
diff --git a/src/mem3/src/mem3_reshard_index.erl b/src/mem3/src/mem3_reshard_index.erl
index fef25d52c..fa0a101b5 100644
--- a/src/mem3/src/mem3_reshard_index.erl
+++ b/src/mem3/src/mem3_reshard_index.erl
@@ -15,11 +15,15 @@
-export([
design_docs/1,
target_indices/2,
- spawn_builders/1
+ spawn_builders/1,
+ build_index/2
]).
+-define(MRVIEW, mrview).
+-define(DREYFUS, dreyfus).
+-define(HASTINGS, hastings).
+
-include_lib("mem3/include/mem3.hrl").
--include_lib("couch/include/couch_db.hrl").
%% Public API
@@ -44,25 +48,8 @@ target_indices(Docs, Targets) ->
lists:flatten(Indices).
spawn_builders(Indices) ->
- Results = [build_index(Index) || Index <- Indices],
- Oks = [{ok, Pid} || {ok, Pid} <- Results, is_pid(Pid)],
- case Results -- Oks of
- [] ->
- {ok, [Pid || {ok, Pid} <- Results]};
- Error ->
- % Do a all or nothing pattern, if some indices could not be
- % spawned, kill the spawned ones and and return the error.
- ErrMsg = "~p failed to spawn index builders: ~p ~p",
- couch_log:error(ErrMsg, [?MODULE, Error, Indices]),
- lists:foreach(
- fun({ok, Pid}) ->
- catch unlink(Pid),
- catch exit(Pid, kill)
- end,
- Oks
- ),
- {error, Error}
- end.
+ Retries = max_retries(),
+ [spawn_link(?MODULE, build_index, [Idx, Retries]) || Idx <- Indices].
%% Private API
@@ -83,7 +70,7 @@ mrview_indices(DbName, Doc) ->
Views = couch_mrview_index:get(views, MRSt),
case Views =/= [] of
true ->
- [{mrview, DbName, MRSt}];
+ [{?MRVIEW, DbName, MRSt}];
false ->
[]
end
@@ -97,7 +84,7 @@ mrview_indices(DbName, Doc) ->
dreyfus_indices(DbName, Doc) ->
try
Indices = dreyfus_index:design_doc_to_indexes(Doc),
- [{dreyfus, DbName, Index} || Index <- Indices]
+ [{?DREYFUS, DbName, Index} || Index <- Indices]
catch
Tag:Err ->
Msg = "~p couldn't get dreyfus indices ~p ~p ~p:~p",
@@ -108,7 +95,7 @@ dreyfus_indices(DbName, Doc) ->
hastings_indices(DbName, Doc) ->
try
Indices = hastings_index:design_doc_to_indexes(Doc),
- [{hastings, DbName, Index} || Index <- Indices]
+ [{?HASTINGS, DbName, Index} || Index <- Indices]
catch
Tag:Err ->
Msg = "~p couldn't get hasting indices ~p ~p ~p:~p",
@@ -116,33 +103,71 @@ hastings_indices(DbName, Doc) ->
[]
end.
-build_index({mrview, DbName, MRSt}) ->
- case couch_index_server:get_index(couch_mrview_index, MRSt) of
- {ok, Pid} ->
- Args = [Pid, get_update_seq(DbName)],
- WPid = spawn_link(couch_index, get_state, Args),
- {ok, WPid};
- Error ->
- Error
- end;
-build_index({dreyfus, DbName, Index}) ->
- case dreyfus_index_manager:get_index(DbName, Index) of
- {ok, Pid} ->
- Args = [Pid, get_update_seq(DbName)],
- WPid = spawn_link(dreyfus_index, await, Args),
- {ok, WPid};
- Error ->
- Error
+build_index({?MRVIEW, _DbName, MRSt} = Ctx, Try) ->
+ await_retry(
+ couch_index_server:get_index(couch_mrview_index, MRSt),
+ fun couch_index:get_state/2,
+ Ctx,
+ Try
+ );
+build_index({?DREYFUS, DbName, DIndex} = Ctx, Try) ->
+ await_retry(
+ dreyfus_index_manager:get_index(DbName, DIndex),
+ fun dreyfus_index:await/2,
+ Ctx,
+ Try
+ );
+build_index({?HASTINGS, DbName, HIndex} = Ctx, Try) ->
+ await_retry(
+ hastings_index_manager:get_index(DbName, HIndex),
+ fun hastings_index:await/2,
+ Ctx,
+ Try
+ ).
+
+await_retry({ok, Pid}, AwaitIndex, {_, DbName, _} = Ctx, Try) ->
+ try AwaitIndex(Pid, get_update_seq(DbName)) of
+ {ok, _} -> ok;
+ {ok, _, _} -> ok;
+ AwaitError -> maybe_retry(Ctx, AwaitError, Try)
+ catch
+ _:CatchError ->
+ maybe_retry(Ctx, CatchError, Try)
end;
-build_index({hastings, DbName, Index}) ->
- case hastings_index_manager:get_index(DbName, Index) of
- {ok, Pid} ->
- Args = [Pid, get_update_seq(DbName)],
- WPid = spawn_link(hastings_index, await, Args),
- {ok, WPid};
- Error ->
- Error
- end.
+await_retry(OpenError, _AwaitIndex, Ctx, Try) ->
+ maybe_retry(Ctx, OpenError, Try).
+
+maybe_retry(Ctx, killed = Error, Try) ->
+ retry(Ctx, Error, Try);
+maybe_retry(Ctx, {killed, _} = Error, Try) ->
+ retry(Ctx, Error, Try);
+maybe_retry(Ctx, shutdown = Error, Try) ->
+ retry(Ctx, Error, Try);
+maybe_retry(Ctx, Error, 0) ->
+ fail(Ctx, Error);
+maybe_retry(Ctx, Error, Try) when is_integer(Try), Try > 0 ->
+ retry(Ctx, Error, Try - 1).
+
+retry(Ctx, Error, Try) ->
+ IndexInfo = index_info(Ctx),
+ LogMsg = "~p : error ~p when building ~p, retrying (~p)",
+ couch_log:warning(LogMsg, [?MODULE, Error, IndexInfo, Try]),
+ timer:sleep(retry_interval_sec() * 1000),
+ build_index(Ctx, Try).
+
+fail(Ctx, Error) ->
+ IndexInfo = index_info(Ctx),
+ LogMsg = "~p : error ~p when building ~p, max tries exceeded, failing",
+ couch_log:error(LogMsg, [?MODULE, Error, IndexInfo]),
+ exit({error_building_index, IndexInfo}).
+
+index_info({?MRVIEW, DbName, MRSt}) ->
+ GroupName = couch_mrview_index:get(idx_name, MRSt),
+ {DbName, GroupName};
+index_info({?DREYFUS, DbName, Index}) ->
+ {DbName, Index};
+index_info({?HASTINGS, DbName, Index}) ->
+ {DbName, Index}.
has_app(App) ->
code:lib_dir(App) /= {error, bad_name}.
@@ -151,3 +176,9 @@ get_update_seq(DbName) ->
couch_util:with_db(DbName, fun(Db) ->
couch_db:get_update_seq(Db)
end).
+
+max_retries() ->
+ config:get_integer("reshard", "index_max_retries", 5).
+
+retry_interval_sec() ->
+ config:get_integer("reshard", "index_retry_interval_sec", 10).
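As the new maybe_retry/3 clauses read, killed and shutdown errors are retried without consuming the counter, while any other error decrements it, so with the default index_max_retries = 5 a persistently failing index is attempted six times in total before the builder calls exit({error_building_index, IndexInfo}); because the builders are spawn_link'ed, that exit propagates to the shard splitting job, which can then apply its own job-level retries. A small illustrative helper (not part of the patch) that counts those attempts:

    %% Illustrative only: total attempts made when every attempt fails with a
    %% counted (non-killed, non-shutdown) error, starting from TriesLeft.
    total_attempts(TriesLeft) -> total_attempts(TriesLeft, 1).
    total_attempts(0, N) -> N;
    total_attempts(T, N) when T > 0 -> total_attempts(T - 1, N + 1).
    %% total_attempts(5) =:= 6.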
diff --git a/src/mem3/src/mem3_reshard_job.erl b/src/mem3/src/mem3_reshard_job.erl
index aff5c2648..a9fb48134 100644
--- a/src/mem3/src/mem3_reshard_job.erl
+++ b/src/mem3/src/mem3_reshard_job.erl
@@ -408,29 +408,17 @@ topoff_impl(#job{source = #shard{} = Source, target = Targets}) ->
build_indices(#job{} = Job) ->
#job{
source = #shard{name = SourceName} = Source,
- target = Targets,
- retries = Retries,
- state_info = Info
+ target = Targets
} = Job,
check_source_exists(Source, build_indices),
{ok, DDocs} = mem3_reshard_index:design_docs(SourceName),
Indices = mem3_reshard_index:target_indices(DDocs, Targets),
case mem3_reshard_index:spawn_builders(Indices) of
- {ok, []} ->
+ [] ->
% Skip the log spam if this is a no-op
Job#job{workers = []};
- {ok, Pids} ->
- report(Job#job{workers = Pids});
- {error, Error} ->
- case Job#job.retries =< max_retries() of
- true ->
- build_indices(Job#job{
- retries = Retries + 1,
- state_info = info_update(error, Error, Info)
- });
- false ->
- exit(Error)
- end
+ [_ | _] = Pids ->
+ report(Job#job{workers = Pids})
end.
copy_local_docs(#job{split_state = copy_local_docs} = Job) ->
@@ -612,7 +600,7 @@ check_targets_exist(Targets, StateName) ->
-spec max_retries() -> integer().
max_retries() ->
- config:get_integer("reshard", "max_retries", 1).
+ config:get_integer("reshard", "max_retries", 5).
-spec retry_interval_sec() -> integer().
retry_interval_sec() ->
diff --git a/src/mem3/test/eunit/mem3_reshard_test.erl b/src/mem3/test/eunit/mem3_reshard_test.erl
index 1929242bb..be539b47a 100644
--- a/src/mem3/test/eunit/mem3_reshard_test.erl
+++ b/src/mem3/test/eunit/mem3_reshard_test.erl
@@ -37,13 +37,15 @@ setup() ->
create_db(Db1, [{q, 1}, {n, 1}]),
PartProps = [{partitioned, true}, {hash, [couch_partition, hash, []]}],
create_db(Db2, [{q, 1}, {n, 1}, {props, PartProps}]),
- config:set("reshard", "retry_interval_sec", "0", _Persist = false),
+ config:set("reshard", "retry_interval_sec", "0", _Persist1 = false),
+ config:set("reshard", "index_retry_interval_sec", "0", _Persist2 = false),
#{db1 => Db1, db2 => Db2}.
teardown(#{} = Dbs) ->
mem3_reshard:reset_state(),
maps:map(fun(_, Db) -> delete_db(Db) end, Dbs),
- config:delete("reshard", "retry_interval_sec", _Persist = false),
+ config:delete("reshard", "index_retry_interval_sec", _Persist1 = false),
+ config:delete("reshard", "retry_interval_sec", _Persist2 = false),
meck:unload().
start_couch() ->
@@ -68,6 +70,7 @@ mem3_reshard_db_test_() ->
fun split_shard_with_lots_of_purges/1,
fun update_docs_before_topoff1/1,
fun indices_are_built/1,
+ fun indices_can_be_built_with_errors/1,
fun split_partitioned_db/1,
fun split_twice/1,
fun couch_events_are_emitted/1,
@@ -273,6 +276,47 @@ indices_are_built(#{db1 := Db}) ->
end
end)}.
+% This tests that indices are built despite intermittent errors.
+indices_can_be_built_with_errors(#{db1 := Db}) ->
+ {timeout, ?TIMEOUT,
+ ?_test(begin
+ add_test_docs(Db, #{docs => 10, mrview => 2, search => 2, geo => 2}),
+ [#shard{name = Shard}] = lists:sort(mem3:local_shards(Db)),
+ meck:expect(
+ couch_index_server,
+ get_index,
+ 2,
+ meck:seq([
+ meck:raise(error, foo_reason),
+ meck:raise(exit, killed),
+ meck:passthrough()
+ ])
+ ),
+ meck:expect(
+ couch_index,
+ get_state,
+ 2,
+ meck:seq([
+ meck:raise(error, bar_reason),
+ meck:raise(exit, killed),
+ meck:val({not_ok, other}),
+ meck:passthrough()
+ ])
+ ),
+ {ok, JobId} = mem3_reshard:start_split_job(Shard),
+ wait_state(JobId, completed),
+ % Normally would expect 4 (2 shards x 2 mrviews), but there were 2
+ % failures in get_index/2 and 3 in get_state/2 for a total of 4 + 5 = 9
+ ?assertEqual(9, meck:num_calls(couch_index_server, get_index, 2)),
+ % Normally would be 4 calls (2 shards x 2 mrviews), but there were
+ % 3 extra failures in get_state/2 for a total of 4 + 3 = 7
+ ?assertEqual(7, meck:num_calls(couch_index, get_state, 2)),
+ Shards1 = lists:sort(mem3:local_shards(Db)),
+ ?assertEqual(2, length(Shards1)),
+ MRViewGroupInfo = get_group_info(Db, <<"_design/mrview00000">>),
+ ?assertMatch(#{<<"update_seq">> := 32}, MRViewGroupInfo)
+ end)}.
+
mock_dreyfus_indices() ->
meck:expect(dreyfus_index, design_doc_to_indexes, fun(Doc) ->
#doc{body = {BodyProps}} = Doc,
@@ -284,7 +328,7 @@ mock_dreyfus_indices() ->
end
end),
meck:expect(dreyfus_index_manager, get_index, fun(_, _) -> {ok, pid} end),
- meck:expect(dreyfus_index, await, fun(_, _) -> ok end).
+ meck:expect(dreyfus_index, await, fun(_, _) -> {ok, indexpid, someseq} end).
mock_hastings_indices() ->
meck:expect(hastings_index, design_doc_to_indexes, fun(Doc) ->
@@ -297,7 +341,7 @@ mock_hastings_indices() ->
end
end),
meck:expect(hastings_index_manager, get_index, fun(_, _) -> {ok, pid} end),
- meck:expect(hastings_index, await, fun(_, _) -> ok end).
+ meck:expect(hastings_index, await, fun(_, _) -> {ok, someseq} end).
% Split partitioned database
split_partitioned_db(#{db2 := Db}) ->
@@ -504,7 +548,7 @@ retries_work(#{db1 := Db}) ->
{ok, JobId} = mem3_reshard:start_split_job(Shard),
wait_state(JobId, failed),
- ?assertEqual(3, meck:num_calls(couch_db_split, split, 3))
+ ?assertEqual(7, meck:num_calls(couch_db_split, split, 3))
end)}.
target_reset_in_initial_copy(#{db1 := Db}) ->