diff options
Diffstat (limited to 'src/dreyfus/src/dreyfus_fabric.erl')
-rw-r--r-- | src/dreyfus/src/dreyfus_fabric.erl | 195 |
1 files changed, 130 insertions, 65 deletions
diff --git a/src/dreyfus/src/dreyfus_fabric.erl b/src/dreyfus/src/dreyfus_fabric.erl index 0b25a6cc6..5689c1d4e 100644 --- a/src/dreyfus/src/dreyfus_fabric.erl +++ b/src/dreyfus/src/dreyfus_fabric.erl @@ -10,7 +10,6 @@ % License for the specific language governing permissions and limitations under % the License. - %% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*- -module(dreyfus_fabric). @@ -23,98 +22,167 @@ get_json_docs(DbName, DocIds) -> fabric:all_docs(DbName, fun callback/2, [], [{keys, DocIds}, {include_docs, true}]). -callback({meta,_}, Acc) -> +callback({meta, _}, Acc) -> {ok, Acc}; callback({error, Reason}, _Acc) -> {error, Reason}; callback({row, Row}, Acc) -> {id, Id} = lists:keyfind(id, 1, Row), - {ok, [{Id, lists:keyfind(doc, 1, Row)}|Acc]}; + {ok, [{Id, lists:keyfind(doc, 1, Row)} | Acc]}; callback(complete, Acc) -> {ok, lists:reverse(Acc)}; callback(timeout, _Acc) -> {error, timeout}. -handle_error_message({rexi_DOWN, _, {_, NodeRef}, _}, _Worker, - Counters, _Replacements, _StartFun, _StartArgs, RingOpts) -> +handle_error_message( + {rexi_DOWN, _, {_, NodeRef}, _}, + _Worker, + Counters, + _Replacements, + _StartFun, + _StartArgs, + RingOpts +) -> case fabric_util:remove_down_workers(Counters, NodeRef, RingOpts) of - {ok, NewCounters} -> - {ok, NewCounters}; - error -> - {error, {nodedown, <<"progress not possible">>}} + {ok, NewCounters} -> + {ok, NewCounters}; + error -> + {error, {nodedown, <<"progress not possible">>}} end; -handle_error_message({rexi_EXIT, {maintenance_mode, _}}, Worker, - Counters, Replacements, StartFun, StartArgs, RingOpts) -> - handle_replacement(Worker, Counters, Replacements, StartFun, StartArgs, - RingOpts); -handle_error_message({rexi_EXIT, Reason}, Worker, - Counters, _Replacements, _StartFun, _StartArgs, RingOpts) -> +handle_error_message( + {rexi_EXIT, {maintenance_mode, _}}, + Worker, + Counters, + Replacements, + StartFun, + StartArgs, + RingOpts +) -> + handle_replacement( + Worker, + Counters, + Replacements, + StartFun, + StartArgs, + RingOpts + ); +handle_error_message( + {rexi_EXIT, Reason}, + Worker, + Counters, + _Replacements, + _StartFun, + _StartArgs, + RingOpts +) -> handle_error(Reason, Worker, Counters, RingOpts); -handle_error_message({error, Reason}, Worker, - Counters, _Replacements, _StartFun, _StartArgs, RingOpts) -> +handle_error_message( + {error, Reason}, + Worker, + Counters, + _Replacements, + _StartFun, + _StartArgs, + RingOpts +) -> handle_error(Reason, Worker, Counters, RingOpts); -handle_error_message({'EXIT', Reason}, Worker, - Counters, _Replacements, _StartFun, _StartArgs, RingOpts) -> +handle_error_message( + {'EXIT', Reason}, + Worker, + Counters, + _Replacements, + _StartFun, + _StartArgs, + RingOpts +) -> handle_error({exit, Reason}, Worker, Counters, RingOpts); -handle_error_message(Reason, Worker, Counters, - _Replacements, _StartFun, _StartArgs, RingOpts) -> +handle_error_message( + Reason, + Worker, + Counters, + _Replacements, + _StartFun, + _StartArgs, + RingOpts +) -> couch_log:error("Unexpected error during request: ~p", [Reason]), handle_error(Reason, Worker, Counters, RingOpts). handle_error(Reason, Worker, Counters0, RingOpts) -> Counters = fabric_dict:erase(Worker, Counters0), case fabric_ring:is_progress_possible(Counters, RingOpts) of - true -> - {ok, Counters}; - false -> - {error, Reason} + true -> + {ok, Counters}; + false -> + {error, Reason} end. -handle_replacement(Worker, OldCntrs0, OldReplacements, StartFun, StartArgs, - RingOpts) -> - OldCounters = lists:filter(fun({#shard{ref=R}, _}) -> - R /= Worker#shard.ref - end, OldCntrs0), +handle_replacement( + Worker, + OldCntrs0, + OldReplacements, + StartFun, + StartArgs, + RingOpts +) -> + OldCounters = lists:filter( + fun({#shard{ref = R}, _}) -> + R /= Worker#shard.ref + end, + OldCntrs0 + ), case lists:keytake(Worker#shard.range, 1, OldReplacements) of {value, {_Range, Replacements}, NewReplacements} -> - NewCounters = lists:foldl(fun(Repl, CounterAcc) -> - NewCounter = start_replacement(StartFun, StartArgs, Repl), - fabric_dict:store(NewCounter, nil, CounterAcc) - end, OldCounters, Replacements), + NewCounters = lists:foldl( + fun(Repl, CounterAcc) -> + NewCounter = start_replacement(StartFun, StartArgs, Repl), + fabric_dict:store(NewCounter, nil, CounterAcc) + end, + OldCounters, + Replacements + ), true = fabric_ring:is_progress_possible(NewCounters, RingOpts), NewRefs = fabric_dict:fetch_keys(NewCounters), {new_refs, NewRefs, NewCounters, NewReplacements}; false -> - handle_error({nodedown, <<"progress not possible">>}, - Worker, OldCounters, RingOpts) + handle_error( + {nodedown, <<"progress not possible">>}, + Worker, + OldCounters, + RingOpts + ) end. start_replacement(StartFun, StartArgs, Shard) -> [DDoc, IndexName, QueryArgs] = StartArgs, - After = case QueryArgs#index_query_args.bookmark of - Bookmark when is_list(Bookmark) -> - lists:foldl(fun({#shard{range=R0}, After0}, Acc) -> - case R0 == Shard#shard.range of - true -> After0; - false -> Acc - end - end, nil, Bookmark); - _ -> - nil - end, - QueryArgs1 = QueryArgs#index_query_args{bookmark=After}, + After = + case QueryArgs#index_query_args.bookmark of + Bookmark when is_list(Bookmark) -> + lists:foldl( + fun({#shard{range = R0}, After0}, Acc) -> + case R0 == Shard#shard.range of + true -> After0; + false -> Acc + end + end, + nil, + Bookmark + ); + _ -> + nil + end, + QueryArgs1 = QueryArgs#index_query_args{bookmark = After}, StartArgs1 = [DDoc, IndexName, QueryArgs1], - Ref = rexi:cast(Shard#shard.node, - {dreyfus_rpc, StartFun, - [Shard#shard.name|StartArgs1]}), + Ref = rexi:cast( + Shard#shard.node, + {dreyfus_rpc, StartFun, [Shard#shard.name | StartArgs1]} + ), Shard#shard{ref = Ref}. - -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). - node_down_test() -> [S1, S2, S3] = [ mk_shard("n1", [0, 4]), @@ -122,24 +190,23 @@ node_down_test() -> mk_shard("n2", [0, ?RING_END]) ], [W1, W2, W3] = [ - S1#shard{ref = make_ref()}, - S2#shard{ref = make_ref()}, - S3#shard{ref = make_ref()} + S1#shard{ref = make_ref()}, + S2#shard{ref = make_ref()}, + S3#shard{ref = make_ref()} ], Counters1 = fabric_dict:init([W1, W2, W3], nil), N1 = S1#shard.node, Msg1 = {rexi_DOWN, nil, {nil, N1}, nil}, - Res1 = handle_error_message(Msg1, nil, Counters1, nil, nil, nil, []), + Res1 = handle_error_message(Msg1, nil, Counters1, nil, nil, nil, []), ?assertEqual({ok, [{W3, nil}]}, Res1), {ok, Counters2} = Res1, N2 = S3#shard.node, Msg2 = {rexi_DOWN, nil, {nil, N2}, nil}, - Res2 = handle_error_message(Msg2, nil, Counters2, nil, nil, nil, []), + Res2 = handle_error_message(Msg2, nil, Counters2, nil, nil, nil, []), ?assertEqual({error, {nodedown, <<"progress not possible">>}}, Res2). - worker_error_test() -> [S1, S2] = [ mk_shard("n1", [0, ?RING_END]), @@ -154,7 +221,6 @@ worker_error_test() -> {ok, Counters2} = Res1, ?assertEqual({error, boom}, handle_error(boom, W2, Counters2, [])). - node_down_with_partitions_test() -> [S1, S2] = [ mk_shard("n1", [0, 4]), @@ -169,20 +235,20 @@ node_down_with_partitions_test() -> N1 = S1#shard.node, Msg1 = {rexi_DOWN, nil, {nil, N1}, nil}, - Res1 = handle_error_message(Msg1, nil, Counters1, nil, nil, nil, RingOpts), + Res1 = handle_error_message(Msg1, nil, Counters1, nil, nil, nil, RingOpts), ?assertEqual({ok, [{W2, nil}]}, Res1), {ok, Counters2} = Res1, N2 = S2#shard.node, Msg2 = {rexi_DOWN, nil, {nil, N2}, nil}, - Res2 = handle_error_message(Msg2, nil, Counters2, nil, nil, nil, RingOpts), + Res2 = handle_error_message(Msg2, nil, Counters2, nil, nil, nil, RingOpts), ?assertEqual({error, {nodedown, <<"progress not possible">>}}, Res2). - worker_error_with_partitions_test() -> [S1, S2] = [ mk_shard("n1", [0, 4]), - mk_shard("n2", [0, 8])], + mk_shard("n2", [0, 8]) + ], [W1, W2] = [ S1#shard{ref = make_ref()}, S2#shard{ref = make_ref()} @@ -196,7 +262,6 @@ worker_error_with_partitions_test() -> {ok, Counters2} = Res1, ?assertEqual({error, boom}, handle_error(boom, W2, Counters2, RingOpts)). - mk_shard(Name, Range) -> Node = list_to_atom(Name), BName = list_to_binary(Name), |