author     Nick Vatamaniuc <vatamane@gmail.com>               2022-04-22 19:53:13 -0400
committer  Nick Vatamaniuc <nickva@users.noreply.github.com>  2022-04-28 17:19:44 -0400
commit     0e38f390796d6e34f698853f5886a36607e2aeec
tree       c910ca79a6e1ab1d8e8c08ac8b2b02320d2aafb8
parent     0bc311b7d7ec9836679b929ee5c2a630ad2ff01f
download   couchdb-0e38f390796d6e34f698853f5886a36607e2aeec.tar.gz
Improve index building during shard splitting
Previously we didn't check the responses from the get_state/2 or await/2 calls when
building indices. If an index updater crashed and the index never finished
building, get_state/2 would simply return an error and the spawned builder
process would still exit normally. The shard splitting job would then count that
as a success and continue to make progress.
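
Roughly, the old behavior looked like this (a hypothetical sketch, not the actual
mem3 code; the real job monitors its workers, but the effect was the same):

```erlang
%% Hypothetical sketch of the old failure mode; names are illustrative.
old_style_build_index(IndexPid, UpdateSeq) ->
    process_flag(trap_exit, true),
    %% The worker runs couch_index:get_state/2, but nobody checks what it returns
    WPid = spawn_link(couch_index, get_state, [IndexPid, UpdateSeq]),
    receive
        {'EXIT', WPid, normal} ->
            %% Reached even when get_state/2 returned {error, Reason},
            %% so the split job treats the index as built
            ok;
        {'EXIT', WPid, Reason} ->
            {error, Reason}
    end.
```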
To fix that, make sure to check the responses for all the supported index types
and wait until each returns an `ok` result.

Additionally, make index building more resilient by allowing more retries on
failure, and by making the retry count and retry interval configurable for
individual index builders.
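
A condensed, hypothetical sketch of that approach (simplified from the
mem3_reshard_index changes in the diff below; `Open`/`Await` are stand-in funs
for pairs such as couch_index_server:get_index/2 plus couch_index:get_state/2,
and the special handling of killed/shutdown errors is omitted):

```erlang
%% Sketch only: check the await result and retry a bounded number of times.
build_and_retry(Open, Await, UpdateSeq, Retries) ->
    case Open() of
        {ok, Pid} ->
            try Await(Pid, UpdateSeq) of
                {ok, _} -> ok;
                {ok, _, _} -> ok;
                Error -> retry_or_fail(Open, Await, UpdateSeq, Error, Retries)
            catch
                _:Error -> retry_or_fail(Open, Await, UpdateSeq, Error, Retries)
            end;
        OpenError ->
            retry_or_fail(Open, Await, UpdateSeq, OpenError, Retries)
    end.

retry_or_fail(_Open, _Await, _UpdateSeq, Error, 0) ->
    %% Out of retries: crash the builder so the job sees a real failure
    exit({error_building_index, Error});
retry_or_fail(Open, Await, UpdateSeq, _Error, Retries) when Retries > 0 ->
    RetrySec = config:get_integer("reshard", "index_retry_interval_sec", 10),
    timer:sleep(RetrySec * 1000),
    build_and_retry(Open, Await, UpdateSeq, Retries - 1).
```

The retry budget comes from the new `reshard.index_max_retries` setting and the
pause between attempts from `reshard.index_retry_interval_sec`, both added to
default.ini below.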
 rel/overlay/etc/default.ini               |   8
 src/mem3/src/mem3_reshard_index.erl       | 131
 src/mem3/src/mem3_reshard_job.erl         |  22
 src/mem3/test/eunit/mem3_reshard_test.erl |  54
 4 files changed, 142 insertions(+), 73 deletions(-)
diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini
index 7535501c0..5fb45b5b5 100644
--- a/rel/overlay/etc/default.ini
+++ b/rel/overlay/etc/default.ini
@@ -719,7 +719,7 @@ state_dir = {{state_dir}}
 [reshard]
 ;max_jobs = 48
 ;max_history = 20
-;max_retries = 1
+;max_retries = 5
 ;retry_interval_sec = 10
 ;delete_source = true
 ;update_shard_map_timeout_sec = 60
@@ -727,6 +727,12 @@ state_dir = {{state_dir}}
 ;require_node_param = false
 ;require_range_param = false
 
+; How many times to retry building an individual index
+;index_max_retries = 5
+
+; How many seconds to wait between retries for an individual index
+;index_retry_interval_sec = 10
+
 [prometheus]
 additional_port = false
 bind_address = 127.0.0.1
diff --git a/src/mem3/src/mem3_reshard_index.erl b/src/mem3/src/mem3_reshard_index.erl
index fef25d52c..fa0a101b5 100644
--- a/src/mem3/src/mem3_reshard_index.erl
+++ b/src/mem3/src/mem3_reshard_index.erl
@@ -15,11 +15,15 @@
 -export([
     design_docs/1,
     target_indices/2,
-    spawn_builders/1
+    spawn_builders/1,
+    build_index/2
 ]).
 
+-define(MRVIEW, mrview).
+-define(DREYFUS, dreyfus).
+-define(HASTINGS, hastings).
+
 -include_lib("mem3/include/mem3.hrl").
--include_lib("couch/include/couch_db.hrl").
 
 %% Public API
 
@@ -44,25 +48,8 @@ target_indices(Docs, Targets) ->
     lists:flatten(Indices).
 
 spawn_builders(Indices) ->
-    Results = [build_index(Index) || Index <- Indices],
-    Oks = [{ok, Pid} || {ok, Pid} <- Results, is_pid(Pid)],
-    case Results -- Oks of
-        [] ->
-            {ok, [Pid || {ok, Pid} <- Results]};
-        Error ->
-            % Do a all or nothing pattern, if some indices could not be
-            % spawned, kill the spawned ones and and return the error.
-            ErrMsg = "~p failed to spawn index builders: ~p ~p",
-            couch_log:error(ErrMsg, [?MODULE, Error, Indices]),
-            lists:foreach(
-                fun({ok, Pid}) ->
-                    catch unlink(Pid),
-                    catch exit(Pid, kill)
-                end,
-                Oks
-            ),
-            {error, Error}
-    end.
+    Retries = max_retries(),
+    [spawn_link(?MODULE, build_index, [Idx, Retries]) || Idx <- Indices].
 
 %% Private API
 
@@ -83,7 +70,7 @@ mrview_indices(DbName, Doc) ->
         Views = couch_mrview_index:get(views, MRSt),
         case Views =/= [] of
             true ->
-                [{mrview, DbName, MRSt}];
+                [{?MRVIEW, DbName, MRSt}];
             false ->
                 []
         end
@@ -97,7 +84,7 @@ mrview_indices(DbName, Doc) ->
 dreyfus_indices(DbName, Doc) ->
     try
         Indices = dreyfus_index:design_doc_to_indexes(Doc),
-        [{dreyfus, DbName, Index} || Index <- Indices]
+        [{?DREYFUS, DbName, Index} || Index <- Indices]
     catch
         Tag:Err ->
             Msg = "~p couldn't get dreyfus indices ~p ~p ~p:~p",
@@ -108,7 +95,7 @@ dreyfus_indices(DbName, Doc) ->
 hastings_indices(DbName, Doc) ->
     try
         Indices = hastings_index:design_doc_to_indexes(Doc),
-        [{hastings, DbName, Index} || Index <- Indices]
+        [{?HASTINGS, DbName, Index} || Index <- Indices]
     catch
         Tag:Err ->
             Msg = "~p couldn't get hasting indices ~p ~p ~p:~p",
@@ -116,33 +103,71 @@ hastings_indices(DbName, Doc) ->
             []
     end.
 
-build_index({mrview, DbName, MRSt}) ->
-    case couch_index_server:get_index(couch_mrview_index, MRSt) of
-        {ok, Pid} ->
-            Args = [Pid, get_update_seq(DbName)],
-            WPid = spawn_link(couch_index, get_state, Args),
-            {ok, WPid};
-        Error ->
-            Error
-    end;
-build_index({dreyfus, DbName, Index}) ->
-    case dreyfus_index_manager:get_index(DbName, Index) of
-        {ok, Pid} ->
-            Args = [Pid, get_update_seq(DbName)],
-            WPid = spawn_link(dreyfus_index, await, Args),
-            {ok, WPid};
-        Error ->
-            Error
+build_index({?MRVIEW, _DbName, MRSt} = Ctx, Try) ->
+    await_retry(
+        couch_index_server:get_index(couch_mrview_index, MRSt),
+        fun couch_index:get_state/2,
+        Ctx,
+        Try
+    );
+build_index({?DREYFUS, DbName, DIndex} = Ctx, Try) ->
+    await_retry(
+        dreyfus_index_manager:get_index(DbName, DIndex),
+        fun dreyfus_index:await/2,
+        Ctx,
+        Try
+    );
+build_index({?HASTINGS, DbName, HIndex} = Ctx, Try) ->
+    await_retry(
+        hastings_index_manager:get_index(DbName, HIndex),
+        fun hastings_index:await/2,
+        Ctx,
+        Try
+    ).
+
+await_retry({ok, Pid}, AwaitIndex, {_, DbName, _} = Ctx, Try) ->
+    try AwaitIndex(Pid, get_update_seq(DbName)) of
+        {ok, _} -> ok;
+        {ok, _, _} -> ok;
+        AwaitError -> maybe_retry(Ctx, AwaitError, Try)
+    catch
+        _:CatchError ->
+            maybe_retry(Ctx, CatchError, Try)
     end;
-build_index({hastings, DbName, Index}) ->
-    case hastings_index_manager:get_index(DbName, Index) of
-        {ok, Pid} ->
-            Args = [Pid, get_update_seq(DbName)],
-            WPid = spawn_link(hastings_index, await, Args),
-            {ok, WPid};
-        Error ->
-            Error
-    end.
+await_retry(OpenError, _AwaitIndex, Ctx, Try) ->
+    maybe_retry(Ctx, OpenError, Try).
+
+maybe_retry(Ctx, killed = Error, Try) ->
+    retry(Ctx, Error, Try);
+maybe_retry(Ctx, {killed, _} = Error, Try) ->
+    retry(Ctx, Error, Try);
+maybe_retry(Ctx, shutdown = Error, Try) ->
+    retry(Ctx, Error, Try);
+maybe_retry(Ctx, Error, 0) ->
+    fail(Ctx, Error);
+maybe_retry(Ctx, Error, Try) when is_integer(Try), Try > 0 ->
+    retry(Ctx, Error, Try - 1).
+
+retry(Ctx, Error, Try) ->
+    IndexInfo = index_info(Ctx),
+    LogMsg = "~p : error ~p when building ~p, retrying (~p)",
+    couch_log:warning(LogMsg, [?MODULE, Error, IndexInfo, Try]),
+    timer:sleep(retry_interval_sec() * 1000),
+    build_index(Ctx, Try).
+
+fail(Ctx, Error) ->
+    IndexInfo = index_info(Ctx),
+    LogMsg = "~p : error ~p when building ~p, max tries exceeded, failing",
+    couch_log:error(LogMsg, [?MODULE, Error, IndexInfo]),
+    exit({error_building_index, IndexInfo}).
+
+index_info({?MRVIEW, DbName, MRSt}) ->
+    GroupName = couch_mrview_index:get(idx_name, MRSt),
+    {DbName, GroupName};
+index_info({?DREYFUS, DbName, Index}) ->
+    {DbName, Index};
+index_info({?HASTINGS, DbName, Index}) ->
+    {DbName, Index}.
 
 has_app(App) ->
     code:lib_dir(App) /= {error, bad_name}.
@@ -151,3 +176,9 @@ get_update_seq(DbName) ->
     couch_util:with_db(DbName, fun(Db) ->
         couch_db:get_update_seq(Db)
     end).
+
+max_retries() ->
+    config:get_integer("reshard", "index_max_retries", 5).
+
+retry_interval_sec() ->
+    config:get_integer("reshard", "index_retry_interval_sec", 10).
diff --git a/src/mem3/src/mem3_reshard_job.erl b/src/mem3/src/mem3_reshard_job.erl
index aff5c2648..a9fb48134 100644
--- a/src/mem3/src/mem3_reshard_job.erl
+++ b/src/mem3/src/mem3_reshard_job.erl
@@ -408,29 +408,17 @@ topoff_impl(#job{source = #shard{} = Source, target = Targets}) ->
 build_indices(#job{} = Job) ->
     #job{
         source = #shard{name = SourceName} = Source,
-        target = Targets,
-        retries = Retries,
-        state_info = Info
+        target = Targets
     } = Job,
     check_source_exists(Source, build_indices),
     {ok, DDocs} = mem3_reshard_index:design_docs(SourceName),
     Indices = mem3_reshard_index:target_indices(DDocs, Targets),
     case mem3_reshard_index:spawn_builders(Indices) of
-        {ok, []} ->
+        [] ->
             % Skip the log spam if this is a no-op
             Job#job{workers = []};
-        {ok, Pids} ->
-            report(Job#job{workers = Pids});
-        {error, Error} ->
-            case Job#job.retries =< max_retries() of
-                true ->
-                    build_indices(Job#job{
-                        retries = Retries + 1,
-                        state_info = info_update(error, Error, Info)
-                    });
-                false ->
-                    exit(Error)
-            end
+        [_ | _] = Pids ->
+            report(Job#job{workers = Pids})
     end.
 
 copy_local_docs(#job{split_state = copy_local_docs} = Job) ->
@@ -612,7 +600,7 @@ check_targets_exist(Targets, StateName) ->
 
 -spec max_retries() -> integer().
 max_retries() ->
-    config:get_integer("reshard", "max_retries", 1).
+    config:get_integer("reshard", "max_retries", 5).
 
 -spec retry_interval_sec() -> integer().
 retry_interval_sec() ->
diff --git a/src/mem3/test/eunit/mem3_reshard_test.erl b/src/mem3/test/eunit/mem3_reshard_test.erl
index 1929242bb..be539b47a 100644
--- a/src/mem3/test/eunit/mem3_reshard_test.erl
+++ b/src/mem3/test/eunit/mem3_reshard_test.erl
@@ -37,13 +37,15 @@ setup() ->
     create_db(Db1, [{q, 1}, {n, 1}]),
     PartProps = [{partitioned, true}, {hash, [couch_partition, hash, []]}],
     create_db(Db2, [{q, 1}, {n, 1}, {props, PartProps}]),
-    config:set("reshard", "retry_interval_sec", "0", _Persist = false),
+    config:set("reshard", "retry_interval_sec", "0", _Persist1 = false),
+    config:set("reshard", "index_retry_interval_sec", "0", _Persist2 = false),
     #{db1 => Db1, db2 => Db2}.
 
 teardown(#{} = Dbs) ->
     mem3_reshard:reset_state(),
     maps:map(fun(_, Db) -> delete_db(Db) end, Dbs),
-    config:delete("reshard", "retry_interval_sec", _Persist = false),
+    config:delete("reshard", "index_retry_interval_sec", _Persist1 = false),
+    config:delete("reshard", "retry_interval_sec", _Persist2 = false),
     meck:unload().
 
 start_couch() ->
@@ -68,6 +70,7 @@ mem3_reshard_db_test_() ->
                    fun split_shard_with_lots_of_purges/1,
                    fun update_docs_before_topoff1/1,
                    fun indices_are_built/1,
+                   fun indices_can_be_built_with_errors/1,
                    fun split_partitioned_db/1,
                    fun split_twice/1,
                    fun couch_events_are_emitted/1,
@@ -273,6 +276,47 @@ indices_are_built(#{db1 := Db}) ->
             end
         end)}.
 
+% This test that indices are built despite intermittent errors.
+indices_can_be_built_with_errors(#{db1 := Db}) ->
+    {timeout, ?TIMEOUT,
+        ?_test(begin
+            add_test_docs(Db, #{docs => 10, mrview => 2, search => 2, geo => 2}),
+            [#shard{name = Shard}] = lists:sort(mem3:local_shards(Db)),
+            meck:expect(
+                couch_index_server,
+                get_index,
+                2,
+                meck:seq([
+                    meck:raise(error, foo_reason),
+                    meck:raise(exit, killed),
+                    meck:passthrough()
+                ])
+            ),
+            meck:expect(
+                couch_index,
+                get_state,
+                2,
+                meck:seq([
+                    meck:raise(error, bar_reason),
+                    meck:raise(exit, killed),
+                    meck:val({not_ok, other}),
+                    meck:passthrough()
+                ])
+            ),
+            {ok, JobId} = mem3_reshard:start_split_job(Shard),
+            wait_state(JobId, completed),
+            % Normally would expect 4 (2 shards x 2 mrviews), but there were 2
+            % failures in get_index/2 and 3 in get_state/3 for a total of 4 + 5 = 9
+            ?assertEqual(9, meck:num_calls(couch_index_server, get_index, 2)),
+            % Normally would be 4 calls (2 shards x 2 mrviews), but there were
+            % 3 extra failures in get_state/2 for a total of 4 + 3 = 7
+            ?assertEqual(7, meck:num_calls(couch_index, get_state, 2)),
+            Shards1 = lists:sort(mem3:local_shards(Db)),
+            ?assertEqual(2, length(Shards1)),
+            MRViewGroupInfo = get_group_info(Db, <<"_design/mrview00000">>),
+            ?assertMatch(#{<<"update_seq">> := 32}, MRViewGroupInfo)
+        end)}.
+
 mock_dreyfus_indices() ->
     meck:expect(dreyfus_index, design_doc_to_indexes, fun(Doc) ->
         #doc{body = {BodyProps}} = Doc,
@@ -284,7 +328,7 @@ mock_dreyfus_indices() ->
         end
     end),
     meck:expect(dreyfus_index_manager, get_index, fun(_, _) -> {ok, pid} end),
-    meck:expect(dreyfus_index, await, fun(_, _) -> ok end).
+    meck:expect(dreyfus_index, await, fun(_, _) -> {ok, indexpid, someseq} end).
 
 mock_hastings_indices() ->
     meck:expect(hastings_index, design_doc_to_indexes, fun(Doc) ->
@@ -297,7 +341,7 @@ mock_hastings_indices() ->
         end
     end),
     meck:expect(hastings_index_manager, get_index, fun(_, _) -> {ok, pid} end),
-    meck:expect(hastings_index, await, fun(_, _) -> ok end).
+    meck:expect(hastings_index, await, fun(_, _) -> {ok, someseq} end).
 
 % Split partitioned database
 split_partitioned_db(#{db2 := Db}) ->
@@ -504,7 +548,7 @@ retries_work(#{db1 := Db}) ->
             {ok, JobId} = mem3_reshard:start_split_job(Shard),
             wait_state(JobId, failed),
 
-            ?assertEqual(3, meck:num_calls(couch_db_split, split, 3))
+            ?assertEqual(7, meck:num_calls(couch_db_split, split, 3))
         end)}.
 
 target_reset_in_initial_copy(#{db1 := Db}) ->