From 52c57eee791b36cc82b965af0f834ae91a5bf196 Mon Sep 17 00:00:00 2001 From: Nick Vatamaniuc Date: Wed, 15 Jan 2020 12:55:19 -0500 Subject: Fix fabric worker failures for partition requests Previously any failed node or rexi worker error resulted in requests failing immediately even though there were available workers to keep handling the request. This was because the progress check function didn't account for the fact that partition requests only use a handful of shards which, by design, do not complete the full ring. Here we fix both partition info queries and dreyfus search functionality. We follow the pattern from fabric and pass through a set of "ring options" that let the progress function know it is dealing with partitions instead of a full ring. --- src/dreyfus/src/dreyfus_fabric.erl | 133 ++++++++++++++++++++++++---- src/dreyfus/src/dreyfus_fabric_group1.erl | 9 +- src/dreyfus/src/dreyfus_fabric_group2.erl | 9 +- src/dreyfus/src/dreyfus_fabric_info.erl | 6 +- src/dreyfus/src/dreyfus_fabric_search.erl | 18 ++-- src/dreyfus/src/dreyfus_util.erl | 22 ++++- src/fabric/src/fabric_db_partition_info.erl | 84 +++++++++++++++--- src/fabric/src/fabric_util.erl | 7 +- src/fabric/src/fabric_view.erl | 36 +------- 9 files changed, 239 insertions(+), 85 deletions(-) diff --git a/src/dreyfus/src/dreyfus_fabric.erl b/src/dreyfus/src/dreyfus_fabric.erl index a953b6a38..0b25a6cc6 100644 --- a/src/dreyfus/src/dreyfus_fabric.erl +++ b/src/dreyfus/src/dreyfus_fabric.erl @@ -14,7 +14,7 @@ %% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*- -module(dreyfus_fabric). --export([get_json_docs/2, handle_error_message/6]). +-export([get_json_docs/2, handle_error_message/7]). -include_lib("couch/include/couch_db.hrl"). -include_lib("mem3/include/mem3.hrl"). @@ -36,40 +36,42 @@ callback(timeout, _Acc) -> {error, timeout}. handle_error_message({rexi_DOWN, _, {_, NodeRef}, _}, _Worker, - Counters, _Replacements, _StartFun, _StartArgs) -> - case fabric_util:remove_down_workers(Counters, NodeRef) of + Counters, _Replacements, _StartFun, _StartArgs, RingOpts) -> + case fabric_util:remove_down_workers(Counters, NodeRef, RingOpts) of {ok, NewCounters} -> {ok, NewCounters}; error -> {error, {nodedown, <<"progress not possible">>}} end; handle_error_message({rexi_EXIT, {maintenance_mode, _}}, Worker, - Counters, Replacements, StartFun, StartArgs) -> - handle_replacement(Worker, Counters, Replacements, StartFun, StartArgs); + Counters, Replacements, StartFun, StartArgs, RingOpts) -> + handle_replacement(Worker, Counters, Replacements, StartFun, StartArgs, + RingOpts); handle_error_message({rexi_EXIT, Reason}, Worker, - Counters, _Replacements, _StartFun, _StartArgs) -> - handle_error(Reason, Worker, Counters); + Counters, _Replacements, _StartFun, _StartArgs, RingOpts) -> + handle_error(Reason, Worker, Counters, RingOpts); handle_error_message({error, Reason}, Worker, - Counters, _Replacements, _StartFun, _StartArgs) -> - handle_error(Reason, Worker, Counters); + Counters, _Replacements, _StartFun, _StartArgs, RingOpts) -> + handle_error(Reason, Worker, Counters, RingOpts); handle_error_message({'EXIT', Reason}, Worker, - Counters, _Replacements, _StartFun, _StartArgs) -> - handle_error({exit, Reason}, Worker, Counters); + Counters, _Replacements, _StartFun, _StartArgs, RingOpts) -> + handle_error({exit, Reason}, Worker, Counters, RingOpts); handle_error_message(Reason, Worker, Counters, - _Replacements, _StartFun, _StartArgs) -> + _Replacements, _StartFun, _StartArgs, RingOpts) -> couch_log:error("Unexpected error during request: ~p", [Reason]), - handle_error(Reason, Worker, Counters). + handle_error(Reason, Worker, Counters, RingOpts). -handle_error(Reason, Worker, Counters0) -> +handle_error(Reason, Worker, Counters0, RingOpts) -> Counters = fabric_dict:erase(Worker, Counters0), - case fabric_view:is_progress_possible(Counters) of + case fabric_ring:is_progress_possible(Counters, RingOpts) of true -> {ok, Counters}; false -> {error, Reason} end. -handle_replacement(Worker, OldCntrs0, OldReplacements, StartFun, StartArgs) -> +handle_replacement(Worker, OldCntrs0, OldReplacements, StartFun, StartArgs, + RingOpts) -> OldCounters = lists:filter(fun({#shard{ref=R}, _}) -> R /= Worker#shard.ref end, OldCntrs0), @@ -79,12 +81,12 @@ handle_replacement(Worker, OldCntrs0, OldReplacements, StartFun, StartArgs) -> NewCounter = start_replacement(StartFun, StartArgs, Repl), fabric_dict:store(NewCounter, nil, CounterAcc) end, OldCounters, Replacements), - true = fabric_view:is_progress_possible(NewCounters), + true = fabric_ring:is_progress_possible(NewCounters, RingOpts), NewRefs = fabric_dict:fetch_keys(NewCounters), {new_refs, NewRefs, NewCounters, NewReplacements}; false -> handle_error({nodedown, <<"progress not possible">>}, - Worker, OldCounters) + Worker, OldCounters, RingOpts) end. start_replacement(StartFun, StartArgs, Shard) -> @@ -106,3 +108,98 @@ start_replacement(StartFun, StartArgs, Shard) -> {dreyfus_rpc, StartFun, [Shard#shard.name|StartArgs1]}), Shard#shard{ref = Ref}. + + +-ifdef(TEST). + +-include_lib("eunit/include/eunit.hrl"). + + +node_down_test() -> + [S1, S2, S3] = [ + mk_shard("n1", [0, 4]), + mk_shard("n1", [5, ?RING_END]), + mk_shard("n2", [0, ?RING_END]) + ], + [W1, W2, W3] = [ + S1#shard{ref = make_ref()}, + S2#shard{ref = make_ref()}, + S3#shard{ref = make_ref()} + ], + Counters1 = fabric_dict:init([W1, W2, W3], nil), + + N1 = S1#shard.node, + Msg1 = {rexi_DOWN, nil, {nil, N1}, nil}, + Res1 = handle_error_message(Msg1, nil, Counters1, nil, nil, nil, []), + ?assertEqual({ok, [{W3, nil}]}, Res1), + + {ok, Counters2} = Res1, + N2 = S3#shard.node, + Msg2 = {rexi_DOWN, nil, {nil, N2}, nil}, + Res2 = handle_error_message(Msg2, nil, Counters2, nil, nil, nil, []), + ?assertEqual({error, {nodedown, <<"progress not possible">>}}, Res2). + + +worker_error_test() -> + [S1, S2] = [ + mk_shard("n1", [0, ?RING_END]), + mk_shard("n2", [0, ?RING_END]) + ], + [W1, W2] = [S1#shard{ref = make_ref()}, S2#shard{ref = make_ref()}], + Counters1 = fabric_dict:init([W1, W2], nil), + + Res1 = handle_error(bam, W1, Counters1, []), + ?assertEqual({ok, [{W2, nil}]}, Res1), + + {ok, Counters2} = Res1, + ?assertEqual({error, boom}, handle_error(boom, W2, Counters2, [])). + + +node_down_with_partitions_test() -> + [S1, S2] = [ + mk_shard("n1", [0, 4]), + mk_shard("n2", [0, 8]) + ], + [W1, W2] = [ + S1#shard{ref = make_ref()}, + S2#shard{ref = make_ref()} + ], + Counters1 = fabric_dict:init([W1, W2], nil), + RingOpts = [{any, [S1, S2]}], + + N1 = S1#shard.node, + Msg1 = {rexi_DOWN, nil, {nil, N1}, nil}, + Res1 = handle_error_message(Msg1, nil, Counters1, nil, nil, nil, RingOpts), + ?assertEqual({ok, [{W2, nil}]}, Res1), + + {ok, Counters2} = Res1, + N2 = S2#shard.node, + Msg2 = {rexi_DOWN, nil, {nil, N2}, nil}, + Res2 = handle_error_message(Msg2, nil, Counters2, nil, nil, nil, RingOpts), + ?assertEqual({error, {nodedown, <<"progress not possible">>}}, Res2). + + +worker_error_with_partitions_test() -> + [S1, S2] = [ + mk_shard("n1", [0, 4]), + mk_shard("n2", [0, 8])], + [W1, W2] = [ + S1#shard{ref = make_ref()}, + S2#shard{ref = make_ref()} + ], + Counters1 = fabric_dict:init([W1, W2], nil), + RingOpts = [{any, [S1, S2]}], + + Res1 = handle_error(bam, W1, Counters1, RingOpts), + ?assertEqual({ok, [{W2, nil}]}, Res1), + + {ok, Counters2} = Res1, + ?assertEqual({error, boom}, handle_error(boom, W2, Counters2, RingOpts)). + + +mk_shard(Name, Range) -> + Node = list_to_atom(Name), + BName = list_to_binary(Name), + #shard{name = BName, node = Node, range = Range}. + +-endif. diff --git a/src/dreyfus/src/dreyfus_fabric_group1.erl b/src/dreyfus/src/dreyfus_fabric_group1.erl index 2d530ca7e..bdae6f040 100644 --- a/src/dreyfus/src/dreyfus_fabric_group1.erl +++ b/src/dreyfus/src/dreyfus_fabric_group1.erl @@ -27,7 +27,8 @@ top_groups, counters, start_args, - replacements + replacements, + ring_opts }). go(DbName, GroupId, IndexName, QueryArgs) when is_binary(GroupId) -> @@ -39,6 +40,7 @@ go(DbName, DDoc, IndexName, #index_query_args{}=QueryArgs) -> DesignName = dreyfus_util:get_design_docid(DDoc), dreyfus_util:maybe_deny_index(DbName, DesignName, IndexName), Shards = dreyfus_util:get_shards(DbName, QueryArgs), + RingOpts = dreyfus_util:get_ring_opts(QueryArgs, Shards), Workers = fabric_util:submit_jobs(Shards, dreyfus_rpc, group1, [DDoc, IndexName, dreyfus_util:export(QueryArgs)]), Replacements = fabric_view:get_shard_replacements(DbName, Workers), @@ -50,7 +52,8 @@ go(DbName, DDoc, IndexName, #index_query_args{}=QueryArgs) -> top_groups = [], counters = Counters, start_args = [DDoc, IndexName, QueryArgs], - replacements = Replacements + replacements = Replacements, + ring_opts = RingOpts }, try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, @@ -89,7 +92,7 @@ handle_message(Error, Worker, State0) -> State = upgrade_state(State0), case dreyfus_fabric:handle_error_message(Error, Worker, State#state.counters, State#state.replacements, - group1, State#state.start_args) of + group1, State#state.start_args, State#state.ring_opts) of {ok, Counters} -> {ok, State#state{counters=Counters}}; {new_refs, NewRefs, NewCounters, NewReplacements} -> diff --git a/src/dreyfus/src/dreyfus_fabric_group2.erl b/src/dreyfus/src/dreyfus_fabric_group2.erl index 1239f8b74..8d864dd0c 100644 --- a/src/dreyfus/src/dreyfus_fabric_group2.erl +++ b/src/dreyfus/src/dreyfus_fabric_group2.erl @@ -29,7 +29,8 @@ top_groups, counters, start_args, - replacements + replacements, + ring_opts }). go(DbName, GroupId, IndexName, QueryArgs) when is_binary(GroupId) -> @@ -41,6 +42,7 @@ go(DbName, DDoc, IndexName, #index_query_args{}=QueryArgs) -> DesignName = dreyfus_util:get_design_docid(DDoc), dreyfus_util:maybe_deny_index(DbName, DesignName, IndexName), Shards = dreyfus_util:get_shards(DbName, QueryArgs), + RingOpts = dreyfus_util:get_ring_opts(QueryArgs, Shards), Workers = fabric_util:submit_jobs(Shards, dreyfus_rpc, group2, [DDoc, IndexName, dreyfus_util:export(QueryArgs)]), Replacements = fabric_view:get_shard_replacements(DbName, Workers), @@ -54,7 +56,8 @@ go(DbName, DDoc, IndexName, #index_query_args{}=QueryArgs) -> top_groups = [], counters = Counters, start_args = [DDoc, IndexName, QueryArgs], - replacements = Replacements + replacements = Replacements, + ring_opts = RingOpts }, try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, @@ -102,7 +105,7 @@ handle_message(Error, Worker, State0) -> State = upgrade_state(State0), case dreyfus_fabric:handle_error_message(Error, Worker, State#state.counters, State#state.replacements, - group2, State#state.start_args) of + group2, State#state.start_args, State#state.ring_opts) of {ok, Counters} -> {ok, State#state{counters=Counters}}; {new_refs, NewRefs, NewCounters, NewReplacements} -> diff --git a/src/dreyfus/src/dreyfus_fabric_info.erl b/src/dreyfus/src/dreyfus_fabric_info.erl index 27eec8065..e217bc0ef 100644 --- a/src/dreyfus/src/dreyfus_fabric_info.erl +++ b/src/dreyfus/src/dreyfus_fabric_info.erl @@ -49,7 +49,7 @@ handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Worker, {Counters, Acc}) -> handle_message({rexi_EXIT, Reason}, Worker, {Counters, Acc}) -> NewCounters = fabric_dict:erase(Worker, Counters), - case fabric_view:is_progress_possible(NewCounters) of + case fabric_ring:is_progress_possible(NewCounters) of true -> {ok, {NewCounters, Acc}}; false -> @@ -74,7 +74,7 @@ handle_message({ok, Info}, Worker, {Counters, Acc}) -> handle_message({error, Reason}, Worker, {Counters, Acc}) -> NewCounters = fabric_dict:erase(Worker, Counters), - case fabric_view:is_progress_possible(NewCounters) of + case fabric_ring:is_progress_possible(NewCounters) of true -> {ok, {NewCounters, Acc}}; false -> @@ -82,7 +82,7 @@ handle_message({error, Reason}, Worker, {Counters, Acc}) -> end; handle_message({'EXIT', _}, Worker, {Counters, Acc}) -> NewCounters = fabric_dict:erase(Worker, Counters), - case fabric_view:is_progress_possible(NewCounters) of + case fabric_ring:is_progress_possible(NewCounters) of true -> {ok, {NewCounters, Acc}}; false -> diff --git a/src/dreyfus/src/dreyfus_fabric_search.erl b/src/dreyfus/src/dreyfus_fabric_search.erl index acf7a83ec..c0ebde1d6 100644 --- a/src/dreyfus/src/dreyfus_fabric_search.erl +++ b/src/dreyfus/src/dreyfus_fabric_search.erl @@ -27,7 +27,8 @@ top_docs, counters, start_args, - replacements + replacements, + ring_opts }). go(DbName, GroupId, IndexName, QueryArgs) when is_binary(GroupId) -> @@ -40,10 +41,11 @@ go(DbName, DDoc, IndexName, #index_query_args{bookmark=nil}=QueryArgs) -> DesignName = dreyfus_util:get_design_docid(DDoc), dreyfus_util:maybe_deny_index(DbName, DesignName, IndexName), Shards = dreyfus_util:get_shards(DbName, QueryArgs), + RingOpts = dreyfus_util:get_ring_opts(QueryArgs, Shards), Workers = fabric_util:submit_jobs(Shards, dreyfus_rpc, search, [DDoc, IndexName, dreyfus_util:export(QueryArgs)]), Counters = fabric_dict:init(Workers, nil), - go(DbName, DDoc, IndexName, QueryArgs, Counters, Counters); + go(DbName, DDoc, IndexName, QueryArgs, Counters, Counters, RingOpts); go(DbName, DDoc, IndexName, #index_query_args{}=QueryArgs) -> Bookmark0 = try dreyfus_bookmark:unpack(DbName, QueryArgs) @@ -54,6 +56,7 @@ go(DbName, DDoc, IndexName, #index_query_args{}=QueryArgs) -> Shards = dreyfus_util:get_shards(DbName, QueryArgs), LiveNodes = [node() | nodes()], LiveShards = [S || #shard{node=Node} = S <- Shards, lists:member(Node, LiveNodes)], + RingOpts = dreyful_util:get_ring_opts(QueryArgs, LiveShards), Bookmark1 = dreyfus_bookmark:add_missing_shards(Bookmark0, LiveShards), Counters0 = lists:flatmap(fun({#shard{name=Name, node=N} = Shard, After}) -> QueryArgs1 = dreyfus_util:export(QueryArgs#index_query_args{ @@ -73,14 +76,16 @@ go(DbName, DDoc, IndexName, #index_query_args{}=QueryArgs) -> end end, Bookmark1), Counters = fabric_dict:init(Counters0, nil), + WorkerShards = fabric_dict:fetch_keys(Counters), + RingOpts = dreyfus_util:get_ring_opts(QueryArgs, WorkerShards), QueryArgs2 = QueryArgs#index_query_args{ bookmark = Bookmark1 }, - go(DbName, DDoc, IndexName, QueryArgs2, Counters, Bookmark1); + go(DbName, DDoc, IndexName, QueryArgs2, Counters, Bookmark1, RingOpts); go(DbName, DDoc, IndexName, OldArgs) -> go(DbName, DDoc, IndexName, dreyfus_util:upgrade(OldArgs)). -go(DbName, DDoc, IndexName, QueryArgs, Counters, Bookmark) -> +go(DbName, DDoc, IndexName, QueryArgs, Counters, Bookmark, RingOpts) -> {Workers, _} = lists:unzip(Counters), #index_query_args{ limit = Limit, @@ -94,7 +99,8 @@ go(DbName, DDoc, IndexName, QueryArgs, Counters, Bookmark) -> top_docs = #top_docs{total_hits=0,hits=[]}, counters = Counters, start_args = [DDoc, IndexName, QueryArgs], - replacements = Replacements + replacements = Replacements, + ring_opts = RingOpts }, RexiMon = fabric_util:create_monitors(Workers), try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, @@ -154,7 +160,7 @@ handle_message(Error, Worker, State0) -> State = upgrade_state(State0), case dreyfus_fabric:handle_error_message(Error, Worker, State#state.counters, State#state.replacements, - search, State#state.start_args) of + search, State#state.start_args, State#state.ring_opts) of {ok, Counters} -> {ok, State#state{counters=Counters}}; {new_refs, NewRefs, NewCounters, NewReplacements} -> diff --git a/src/dreyfus/src/dreyfus_util.erl b/src/dreyfus/src/dreyfus_util.erl index 0a83e87bd..05ecdb621 100644 --- a/src/dreyfus/src/dreyfus_util.erl +++ b/src/dreyfus/src/dreyfus_util.erl @@ -19,7 +19,7 @@ -include_lib("mem3/include/mem3.hrl"). -include_lib("couch/include/couch_db.hrl"). --export([get_shards/2, sort/2, upgrade/1, export/1, time/2]). +-export([get_shards/2, get_ring_opts/2, sort/2, upgrade/1, export/1, time/2]). -export([in_black_list/1, in_black_list/3, maybe_deny_index/3]). -export([get_design_docid/1]). -export([ @@ -59,6 +59,15 @@ use_ushards(#index_query_args{stable=true}) -> use_ushards(#index_query_args{}) -> false. + +get_ring_opts(#index_query_args{partition = nil}, _Shards) -> + []; +get_ring_opts(#index_query_args{}, Shards) -> + Shards1 = lists:map(fun(#shard{} = S) -> + S#shard{ref = undefined} + end, Shards), + [{any, Shards1}]. + -spec sort(Order :: relevance | [any()], [#sortable{}]) -> [#sortable{}]. sort(Sort, List0) -> {List1, Stash} = stash_items(List0), @@ -418,4 +427,15 @@ stash_test() -> Unstashed = hd(unstash_items(Stashed, Stash)), ?assertEqual(Unstashed#sortable.item, bar). + +ring_opts_test() -> + Shards = [#shard{name = foo, ref = make_ref()}], + + QArgs1 = #index_query_args{partition = nil}, + ?assertEqual([], get_ring_opts(QArgs1, Shards)), + + QArgs2 = #index_query_args{partition = <<"x">>}, + ?assertMatch([{any, [#shard{name = foo, ref = undefined}]}], + get_ring_opts(QArgs2, Shards)). + -endif. diff --git a/src/fabric/src/fabric_db_partition_info.erl b/src/fabric/src/fabric_db_partition_info.erl index 2978832f0..954c52db2 100644 --- a/src/fabric/src/fabric_db_partition_info.erl +++ b/src/fabric/src/fabric_db_partition_info.erl @@ -17,15 +17,27 @@ -include_lib("fabric/include/fabric.hrl"). -include_lib("mem3/include/mem3.hrl"). + +-record(acc, { + counters, + replies, + ring_opts +}). + + go(DbName, Partition) -> - Shards = mem3:shards(DbName, <>), + Shards = mem3:shards(DbName, couch_partition:shard_key(Partition)), Workers = fabric_util:submit_jobs(Shards, get_partition_info, [Partition]), RexiMon = fabric_util:create_monitors(Shards), Fun = fun handle_message/3, - Acc0 = {fabric_dict:init(Workers, nil), []}, + Acc0 = #acc{ + counters = fabric_dict:init(Workers, nil), + replies = [], + ring_opts = [{any, Shards}] + }, try case fabric_util:recv(Workers, #shard.ref, Fun, Acc0) of - {ok, Acc} -> {ok, Acc}; + {ok, Res} -> {ok, Res}; {timeout, {WorkersDict, _}} -> DefunctWorkers = fabric_util:remove_done_workers( WorkersDict, @@ -42,36 +54,39 @@ go(DbName, Partition) -> rexi_monitor:stop(RexiMon) end. -handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Shard, {Counters, Acc}) -> - case fabric_util:remove_down_workers(Counters, NodeRef) of +handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Shard, #acc{} = Acc) -> + #acc{counters = Counters, ring_opts = RingOpts} = Acc, + case fabric_util:remove_down_workers(Counters, NodeRef, RingOpts) of {ok, NewCounters} -> - {ok, {NewCounters, Acc}}; + {ok, Acc#acc{counters = NewCounters}}; error -> {error, {nodedown, <<"progress not possible">>}} end; -handle_message({rexi_EXIT, Reason}, Shard, {Counters, Acc}) -> +handle_message({rexi_EXIT, Reason}, Shard, #acc{} = Acc) -> + #acc{counters = Counters, ring_opts = RingOpts} = Acc, NewCounters = fabric_dict:erase(Shard, Counters), - case fabric_ring:is_progress_possible(NewCounters) of + case fabric_ring:is_progress_possible(NewCounters, RingOpts) of true -> - {ok, {NewCounters, Acc}}; + {ok, Acc#acc{counters = NewCounters}}; false -> {error, Reason} end; -handle_message({ok, Info}, #shard{dbname=Name} = Shard, {Counters, Acc}) -> - Acc2 = [Info | Acc], +handle_message({ok, Info}, #shard{dbname=Name} = Shard, #acc{} = Acc) -> + #acc{counters = Counters, replies = Replies} = Acc, + Replies1 = [Info | Replies], Counters1 = fabric_dict:erase(Shard, Counters), case fabric_dict:size(Counters1) =:= 0 of true -> - [FirstInfo | RestInfos] = Acc2, + [FirstInfo | RestInfos] = Replies1, PartitionInfo = get_max_partition_size(FirstInfo, RestInfos), {stop, [{db_name, Name} | format_partition(PartitionInfo)]}; false -> - {ok, {Counters1, Acc2}} + {ok, Acc#acc{counters = Counters1, replies = Replies1}} end; -handle_message(_, _, Acc) -> +handle_message(_, _, #acc{} = Acc) -> {ok, Acc}. @@ -97,3 +112,44 @@ format_partition(PartitionInfo) -> {value, {sizes, Size}, PartitionInfo1} = lists:keytake(sizes, 1, PartitionInfo), [{sizes, {Size}} | PartitionInfo1]. + +-ifdef(TEST). + +-include_lib("eunit/include/eunit.hrl"). + + +node_down_test() -> + [S1, S2] = [mk_shard("n1", [0, 4]), mk_shard("n2", [0, 8])], + Acc1 = #acc{ + counters = fabric_dict:init([S1, S2], nil), + ring_opts = [{any, [S1, S2]}] + }, + + N1 = S1#shard.node, + {ok, Acc2} = handle_message({rexi_DOWN, nil, {nil, N1}, nil}, nil, Acc1), + ?assertEqual([{S2, nil}], Acc2#acc.counters), + + N2 = S2#shard.node, + ?assertEqual({error, {nodedown, <<"progress not possible">>}}, + handle_message({rexi_DOWN, nil, {nil, N2}, nil}, nil, Acc2)). + + +worker_exit_test() -> + [S1, S2] = [mk_shard("n1", [0, 4]), mk_shard("n2", [0, 8])], + Acc1 = #acc{ + counters = fabric_dict:init([S1, S2], nil), + ring_opts = [{any, [S1, S2]}] + }, + + {ok, Acc2} = handle_message({rexi_EXIT, boom}, S1, Acc1), + ?assertEqual([{S2, nil}], Acc2#acc.counters), + + ?assertEqual({error, bam}, handle_message({rexi_EXIT, bam}, S2, Acc2)). + + +mk_shard(Name, Range) -> + Node = list_to_atom(Name), + BName = list_to_binary(Name), + #shard{name = BName, node = Node, range = Range}. + +-endif. diff --git a/src/fabric/src/fabric_util.erl b/src/fabric/src/fabric_util.erl index aaf0623f0..8aa14e73a 100644 --- a/src/fabric/src/fabric_util.erl +++ b/src/fabric/src/fabric_util.erl @@ -14,7 +14,7 @@ -export([submit_jobs/3, submit_jobs/4, cleanup/1, recv/4, get_db/1, get_db/2, error_info/1, update_counter/3, remove_ancestors/2, create_monitors/1, kv/2, - remove_down_workers/2, doc_id_and_rev/1]). + remove_down_workers/2, remove_down_workers/3, doc_id_and_rev/1]). -export([request_timeout/0, attachments_timeout/0, all_docs_timeout/0, view_timeout/1]). -export([log_timeout/2, remove_done_workers/2]). -export([is_users_db/1, is_replicator_db/1]). @@ -33,9 +33,12 @@ -include_lib("eunit/include/eunit.hrl"). remove_down_workers(Workers, BadNode) -> + remove_down_workers(Workers, BadNode, []). + +remove_down_workers(Workers, BadNode, RingOpts) -> Filter = fun(#shard{node = Node}, _) -> Node =/= BadNode end, NewWorkers = fabric_dict:filter(Filter, Workers), - case fabric_ring:is_progress_possible(NewWorkers) of + case fabric_ring:is_progress_possible(NewWorkers, RingOpts) of true -> {ok, NewWorkers}; false -> diff --git a/src/fabric/src/fabric_view.erl b/src/fabric/src/fabric_view.erl index 55b44e6f7..425f864c4 100644 --- a/src/fabric/src/fabric_view.erl +++ b/src/fabric/src/fabric_view.erl @@ -12,7 +12,7 @@ -module(fabric_view). --export([is_progress_possible/1, remove_overlapping_shards/2, maybe_send_row/1, +-export([remove_overlapping_shards/2, maybe_send_row/1, transform_row/1, keydict/1, extract_view/4, get_shards/2, check_down_shards/2, handle_worker_exit/3, get_shard_replacements/2, maybe_update_others/5]). @@ -46,10 +46,6 @@ handle_worker_exit(Collector, _Worker, Reason) -> {ok, Resp} = Callback({error, fabric_util:error_info(Reason)}, Acc), {error, Resp}. -%% @doc looks for a fully covered keyrange in the list of counters --spec is_progress_possible([{#shard{}, term()}]) -> boolean(). -is_progress_possible(Counters) -> - fabric_ring:is_progress_possible(Counters). -spec remove_overlapping_shards(#shard{}, [{#shard{}, any()}]) -> [{#shard{}, any()}]. @@ -416,28 +412,6 @@ fix_skip_and_limit(#mrargs{} = Args) -> remove_finalizer(Args) -> couch_mrview_util:set_extra(Args, finalizer, null). -% unit test -is_progress_possible_test() -> - EndPoint = 2 bsl 31, - T1 = [[0, EndPoint-1]], - ?assertEqual(is_progress_possible(mk_cnts(T1)),true), - T2 = [[0,10],[11,20],[21,EndPoint-1]], - ?assertEqual(is_progress_possible(mk_cnts(T2)),true), - % gap - T3 = [[0,10],[12,EndPoint-1]], - ?assertEqual(is_progress_possible(mk_cnts(T3)),false), - % outside range - T4 = [[1,10],[11,20],[21,EndPoint-1]], - ?assertEqual(is_progress_possible(mk_cnts(T4)),false), - % outside range - T5 = [[0,10],[11,20],[21,EndPoint]], - ?assertEqual(is_progress_possible(mk_cnts(T5)),false), - T6 = [[0, 10], [11, 20], [0, 5], [6, 21], [21, EndPoint - 1]], - ?assertEqual(is_progress_possible(mk_cnts(T6)), true), - % not possible, overlap is not exact - T7 = [[0, 10], [13, 20], [21, EndPoint - 1], [9, 12]], - ?assertEqual(is_progress_possible(mk_cnts(T7)), false). - remove_overlapping_shards_test() -> Cb = undefined, @@ -482,10 +456,6 @@ get_shard_replacements_test() -> ?assertEqual(Expect, Res). -mk_cnts(Ranges) -> - Shards = lists:map(fun mk_shard/1, Ranges), - orddict:from_list([{Shard,nil} || Shard <- Shards]). - mk_cnts(Ranges, NoNodes) -> orddict:from_list([{Shard,nil} || Shard <- @@ -502,10 +472,6 @@ mk_shards(NoNodes,Range,Shards) -> mk_shards(NoNodes-1,Range, [mk_shard(Name, Range) | Shards]). -mk_shard([B, E]) when is_integer(B), is_integer(E) -> - #shard{range = [B, E]}. - - mk_shard(Name, Range) -> Node = list_to_atom(Name), BName = list_to_binary(Name), -- cgit v1.2.1