summaryrefslogtreecommitdiff
path: root/src/dreyfus/src/dreyfus_fabric.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/dreyfus/src/dreyfus_fabric.erl')
-rw-r--r--src/dreyfus/src/dreyfus_fabric.erl133
1 files changed, 115 insertions, 18 deletions
diff --git a/src/dreyfus/src/dreyfus_fabric.erl b/src/dreyfus/src/dreyfus_fabric.erl
index a953b6a38..0b25a6cc6 100644
--- a/src/dreyfus/src/dreyfus_fabric.erl
+++ b/src/dreyfus/src/dreyfus_fabric.erl
@@ -14,7 +14,7 @@
%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
-module(dreyfus_fabric).
--export([get_json_docs/2, handle_error_message/6]).
+-export([get_json_docs/2, handle_error_message/7]).
-include_lib("couch/include/couch_db.hrl").
-include_lib("mem3/include/mem3.hrl").
@@ -36,40 +36,42 @@ callback(timeout, _Acc) ->
{error, timeout}.
handle_error_message({rexi_DOWN, _, {_, NodeRef}, _}, _Worker,
- Counters, _Replacements, _StartFun, _StartArgs) ->
- case fabric_util:remove_down_workers(Counters, NodeRef) of
+ Counters, _Replacements, _StartFun, _StartArgs, RingOpts) ->
+ case fabric_util:remove_down_workers(Counters, NodeRef, RingOpts) of
{ok, NewCounters} ->
{ok, NewCounters};
error ->
{error, {nodedown, <<"progress not possible">>}}
end;
handle_error_message({rexi_EXIT, {maintenance_mode, _}}, Worker,
- Counters, Replacements, StartFun, StartArgs) ->
- handle_replacement(Worker, Counters, Replacements, StartFun, StartArgs);
+ Counters, Replacements, StartFun, StartArgs, RingOpts) ->
+ handle_replacement(Worker, Counters, Replacements, StartFun, StartArgs,
+ RingOpts);
handle_error_message({rexi_EXIT, Reason}, Worker,
- Counters, _Replacements, _StartFun, _StartArgs) ->
- handle_error(Reason, Worker, Counters);
+ Counters, _Replacements, _StartFun, _StartArgs, RingOpts) ->
+ handle_error(Reason, Worker, Counters, RingOpts);
handle_error_message({error, Reason}, Worker,
- Counters, _Replacements, _StartFun, _StartArgs) ->
- handle_error(Reason, Worker, Counters);
+ Counters, _Replacements, _StartFun, _StartArgs, RingOpts) ->
+ handle_error(Reason, Worker, Counters, RingOpts);
handle_error_message({'EXIT', Reason}, Worker,
- Counters, _Replacements, _StartFun, _StartArgs) ->
- handle_error({exit, Reason}, Worker, Counters);
+ Counters, _Replacements, _StartFun, _StartArgs, RingOpts) ->
+ handle_error({exit, Reason}, Worker, Counters, RingOpts);
handle_error_message(Reason, Worker, Counters,
- _Replacements, _StartFun, _StartArgs) ->
+ _Replacements, _StartFun, _StartArgs, RingOpts) ->
couch_log:error("Unexpected error during request: ~p", [Reason]),
- handle_error(Reason, Worker, Counters).
+ handle_error(Reason, Worker, Counters, RingOpts).
-handle_error(Reason, Worker, Counters0) ->
+handle_error(Reason, Worker, Counters0, RingOpts) ->
Counters = fabric_dict:erase(Worker, Counters0),
- case fabric_view:is_progress_possible(Counters) of
+ case fabric_ring:is_progress_possible(Counters, RingOpts) of
true ->
{ok, Counters};
false ->
{error, Reason}
end.
-handle_replacement(Worker, OldCntrs0, OldReplacements, StartFun, StartArgs) ->
+handle_replacement(Worker, OldCntrs0, OldReplacements, StartFun, StartArgs,
+ RingOpts) ->
OldCounters = lists:filter(fun({#shard{ref=R}, _}) ->
R /= Worker#shard.ref
end, OldCntrs0),
@@ -79,12 +81,12 @@ handle_replacement(Worker, OldCntrs0, OldReplacements, StartFun, StartArgs) ->
NewCounter = start_replacement(StartFun, StartArgs, Repl),
fabric_dict:store(NewCounter, nil, CounterAcc)
end, OldCounters, Replacements),
- true = fabric_view:is_progress_possible(NewCounters),
+ true = fabric_ring:is_progress_possible(NewCounters, RingOpts),
NewRefs = fabric_dict:fetch_keys(NewCounters),
{new_refs, NewRefs, NewCounters, NewReplacements};
false ->
handle_error({nodedown, <<"progress not possible">>},
- Worker, OldCounters)
+ Worker, OldCounters, RingOpts)
end.
start_replacement(StartFun, StartArgs, Shard) ->
@@ -106,3 +108,98 @@ start_replacement(StartFun, StartArgs, Shard) ->
{dreyfus_rpc, StartFun,
[Shard#shard.name|StartArgs1]}),
Shard#shard{ref = Ref}.
+
+
+-ifdef(TEST).
+
+-include_lib("eunit/include/eunit.hrl").
+
+
+node_down_test() ->
+ [S1, S2, S3] = [
+ mk_shard("n1", [0, 4]),
+ mk_shard("n1", [5, ?RING_END]),
+ mk_shard("n2", [0, ?RING_END])
+ ],
+ [W1, W2, W3] = [
+ S1#shard{ref = make_ref()},
+ S2#shard{ref = make_ref()},
+ S3#shard{ref = make_ref()}
+ ],
+ Counters1 = fabric_dict:init([W1, W2, W3], nil),
+
+ N1 = S1#shard.node,
+ Msg1 = {rexi_DOWN, nil, {nil, N1}, nil},
+ Res1 = handle_error_message(Msg1, nil, Counters1, nil, nil, nil, []),
+ ?assertEqual({ok, [{W3, nil}]}, Res1),
+
+ {ok, Counters2} = Res1,
+ N2 = S3#shard.node,
+ Msg2 = {rexi_DOWN, nil, {nil, N2}, nil},
+ Res2 = handle_error_message(Msg2, nil, Counters2, nil, nil, nil, []),
+ ?assertEqual({error, {nodedown, <<"progress not possible">>}}, Res2).
+
+
+worker_error_test() ->
+ [S1, S2] = [
+ mk_shard("n1", [0, ?RING_END]),
+ mk_shard("n2", [0, ?RING_END])
+ ],
+ [W1, W2] = [S1#shard{ref = make_ref()}, S2#shard{ref = make_ref()}],
+ Counters1 = fabric_dict:init([W1, W2], nil),
+
+ Res1 = handle_error(bam, W1, Counters1, []),
+ ?assertEqual({ok, [{W2, nil}]}, Res1),
+
+ {ok, Counters2} = Res1,
+ ?assertEqual({error, boom}, handle_error(boom, W2, Counters2, [])).
+
+
+node_down_with_partitions_test() ->
+ [S1, S2] = [
+ mk_shard("n1", [0, 4]),
+ mk_shard("n2", [0, 8])
+ ],
+ [W1, W2] = [
+ S1#shard{ref = make_ref()},
+ S2#shard{ref = make_ref()}
+ ],
+ Counters1 = fabric_dict:init([W1, W2], nil),
+ RingOpts = [{any, [S1, S2]}],
+
+ N1 = S1#shard.node,
+ Msg1 = {rexi_DOWN, nil, {nil, N1}, nil},
+ Res1 = handle_error_message(Msg1, nil, Counters1, nil, nil, nil, RingOpts),
+ ?assertEqual({ok, [{W2, nil}]}, Res1),
+
+ {ok, Counters2} = Res1,
+ N2 = S2#shard.node,
+ Msg2 = {rexi_DOWN, nil, {nil, N2}, nil},
+ Res2 = handle_error_message(Msg2, nil, Counters2, nil, nil, nil, RingOpts),
+ ?assertEqual({error, {nodedown, <<"progress not possible">>}}, Res2).
+
+
+worker_error_with_partitions_test() ->
+ [S1, S2] = [
+ mk_shard("n1", [0, 4]),
+ mk_shard("n2", [0, 8])],
+ [W1, W2] = [
+ S1#shard{ref = make_ref()},
+ S2#shard{ref = make_ref()}
+ ],
+ Counters1 = fabric_dict:init([W1, W2], nil),
+ RingOpts = [{any, [S1, S2]}],
+
+ Res1 = handle_error(bam, W1, Counters1, RingOpts),
+ ?assertEqual({ok, [{W2, nil}]}, Res1),
+
+ {ok, Counters2} = Res1,
+ ?assertEqual({error, boom}, handle_error(boom, W2, Counters2, RingOpts)).
+
+
+mk_shard(Name, Range) ->
+ Node = list_to_atom(Name),
+ BName = list_to_binary(Name),
+ #shard{name = BName, node = Node, range = Range}.
+
+-endif.