diff options
Diffstat (limited to 'src/dreyfus/src/dreyfus_fabric.erl')
-rw-r--r-- | src/dreyfus/src/dreyfus_fabric.erl | 133 |
1 files changed, 115 insertions, 18 deletions
diff --git a/src/dreyfus/src/dreyfus_fabric.erl b/src/dreyfus/src/dreyfus_fabric.erl index a953b6a38..0b25a6cc6 100644 --- a/src/dreyfus/src/dreyfus_fabric.erl +++ b/src/dreyfus/src/dreyfus_fabric.erl @@ -14,7 +14,7 @@ %% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*- -module(dreyfus_fabric). --export([get_json_docs/2, handle_error_message/6]). +-export([get_json_docs/2, handle_error_message/7]). -include_lib("couch/include/couch_db.hrl"). -include_lib("mem3/include/mem3.hrl"). @@ -36,40 +36,42 @@ callback(timeout, _Acc) -> {error, timeout}. handle_error_message({rexi_DOWN, _, {_, NodeRef}, _}, _Worker, - Counters, _Replacements, _StartFun, _StartArgs) -> - case fabric_util:remove_down_workers(Counters, NodeRef) of + Counters, _Replacements, _StartFun, _StartArgs, RingOpts) -> + case fabric_util:remove_down_workers(Counters, NodeRef, RingOpts) of {ok, NewCounters} -> {ok, NewCounters}; error -> {error, {nodedown, <<"progress not possible">>}} end; handle_error_message({rexi_EXIT, {maintenance_mode, _}}, Worker, - Counters, Replacements, StartFun, StartArgs) -> - handle_replacement(Worker, Counters, Replacements, StartFun, StartArgs); + Counters, Replacements, StartFun, StartArgs, RingOpts) -> + handle_replacement(Worker, Counters, Replacements, StartFun, StartArgs, + RingOpts); handle_error_message({rexi_EXIT, Reason}, Worker, - Counters, _Replacements, _StartFun, _StartArgs) -> - handle_error(Reason, Worker, Counters); + Counters, _Replacements, _StartFun, _StartArgs, RingOpts) -> + handle_error(Reason, Worker, Counters, RingOpts); handle_error_message({error, Reason}, Worker, - Counters, _Replacements, _StartFun, _StartArgs) -> - handle_error(Reason, Worker, Counters); + Counters, _Replacements, _StartFun, _StartArgs, RingOpts) -> + handle_error(Reason, Worker, Counters, RingOpts); handle_error_message({'EXIT', Reason}, Worker, - Counters, _Replacements, _StartFun, _StartArgs) -> - handle_error({exit, Reason}, Worker, Counters); + Counters, _Replacements, _StartFun, _StartArgs, RingOpts) -> + handle_error({exit, Reason}, Worker, Counters, RingOpts); handle_error_message(Reason, Worker, Counters, - _Replacements, _StartFun, _StartArgs) -> + _Replacements, _StartFun, _StartArgs, RingOpts) -> couch_log:error("Unexpected error during request: ~p", [Reason]), - handle_error(Reason, Worker, Counters). + handle_error(Reason, Worker, Counters, RingOpts). -handle_error(Reason, Worker, Counters0) -> +handle_error(Reason, Worker, Counters0, RingOpts) -> Counters = fabric_dict:erase(Worker, Counters0), - case fabric_view:is_progress_possible(Counters) of + case fabric_ring:is_progress_possible(Counters, RingOpts) of true -> {ok, Counters}; false -> {error, Reason} end. -handle_replacement(Worker, OldCntrs0, OldReplacements, StartFun, StartArgs) -> +handle_replacement(Worker, OldCntrs0, OldReplacements, StartFun, StartArgs, + RingOpts) -> OldCounters = lists:filter(fun({#shard{ref=R}, _}) -> R /= Worker#shard.ref end, OldCntrs0), @@ -79,12 +81,12 @@ handle_replacement(Worker, OldCntrs0, OldReplacements, StartFun, StartArgs) -> NewCounter = start_replacement(StartFun, StartArgs, Repl), fabric_dict:store(NewCounter, nil, CounterAcc) end, OldCounters, Replacements), - true = fabric_view:is_progress_possible(NewCounters), + true = fabric_ring:is_progress_possible(NewCounters, RingOpts), NewRefs = fabric_dict:fetch_keys(NewCounters), {new_refs, NewRefs, NewCounters, NewReplacements}; false -> handle_error({nodedown, <<"progress not possible">>}, - Worker, OldCounters) + Worker, OldCounters, RingOpts) end. start_replacement(StartFun, StartArgs, Shard) -> @@ -106,3 +108,98 @@ start_replacement(StartFun, StartArgs, Shard) -> {dreyfus_rpc, StartFun, [Shard#shard.name|StartArgs1]}), Shard#shard{ref = Ref}. + + +-ifdef(TEST). + +-include_lib("eunit/include/eunit.hrl"). + + +node_down_test() -> + [S1, S2, S3] = [ + mk_shard("n1", [0, 4]), + mk_shard("n1", [5, ?RING_END]), + mk_shard("n2", [0, ?RING_END]) + ], + [W1, W2, W3] = [ + S1#shard{ref = make_ref()}, + S2#shard{ref = make_ref()}, + S3#shard{ref = make_ref()} + ], + Counters1 = fabric_dict:init([W1, W2, W3], nil), + + N1 = S1#shard.node, + Msg1 = {rexi_DOWN, nil, {nil, N1}, nil}, + Res1 = handle_error_message(Msg1, nil, Counters1, nil, nil, nil, []), + ?assertEqual({ok, [{W3, nil}]}, Res1), + + {ok, Counters2} = Res1, + N2 = S3#shard.node, + Msg2 = {rexi_DOWN, nil, {nil, N2}, nil}, + Res2 = handle_error_message(Msg2, nil, Counters2, nil, nil, nil, []), + ?assertEqual({error, {nodedown, <<"progress not possible">>}}, Res2). + + +worker_error_test() -> + [S1, S2] = [ + mk_shard("n1", [0, ?RING_END]), + mk_shard("n2", [0, ?RING_END]) + ], + [W1, W2] = [S1#shard{ref = make_ref()}, S2#shard{ref = make_ref()}], + Counters1 = fabric_dict:init([W1, W2], nil), + + Res1 = handle_error(bam, W1, Counters1, []), + ?assertEqual({ok, [{W2, nil}]}, Res1), + + {ok, Counters2} = Res1, + ?assertEqual({error, boom}, handle_error(boom, W2, Counters2, [])). + + +node_down_with_partitions_test() -> + [S1, S2] = [ + mk_shard("n1", [0, 4]), + mk_shard("n2", [0, 8]) + ], + [W1, W2] = [ + S1#shard{ref = make_ref()}, + S2#shard{ref = make_ref()} + ], + Counters1 = fabric_dict:init([W1, W2], nil), + RingOpts = [{any, [S1, S2]}], + + N1 = S1#shard.node, + Msg1 = {rexi_DOWN, nil, {nil, N1}, nil}, + Res1 = handle_error_message(Msg1, nil, Counters1, nil, nil, nil, RingOpts), + ?assertEqual({ok, [{W2, nil}]}, Res1), + + {ok, Counters2} = Res1, + N2 = S2#shard.node, + Msg2 = {rexi_DOWN, nil, {nil, N2}, nil}, + Res2 = handle_error_message(Msg2, nil, Counters2, nil, nil, nil, RingOpts), + ?assertEqual({error, {nodedown, <<"progress not possible">>}}, Res2). + + +worker_error_with_partitions_test() -> + [S1, S2] = [ + mk_shard("n1", [0, 4]), + mk_shard("n2", [0, 8])], + [W1, W2] = [ + S1#shard{ref = make_ref()}, + S2#shard{ref = make_ref()} + ], + Counters1 = fabric_dict:init([W1, W2], nil), + RingOpts = [{any, [S1, S2]}], + + Res1 = handle_error(bam, W1, Counters1, RingOpts), + ?assertEqual({ok, [{W2, nil}]}, Res1), + + {ok, Counters2} = Res1, + ?assertEqual({error, boom}, handle_error(boom, W2, Counters2, RingOpts)). + + +mk_shard(Name, Range) -> + Node = list_to_atom(Name), + BName = list_to_binary(Name), + #shard{name = BName, node = Node, range = Range}. + +-endif. |