summaryrefslogtreecommitdiff
path: root/src/dreyfus/src/dreyfus_fabric.erl
blob: a953b6a38fcc4939c92a6ae2fe32b0c62e54c053 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.


%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-

-module(dreyfus_fabric).
-export([get_json_docs/2, handle_error_message/6]).

-include_lib("couch/include/couch_db.hrl").
-include_lib("mem3/include/mem3.hrl").
-include("dreyfus.hrl").

%% Fetches the documents named by DocIds from DbName through
%% fabric:all_docs/4, requesting that document bodies be included.
%% Rows are gathered by callback/2 starting from an empty accumulator;
%% whatever fabric:all_docs/4 returns is handed back unchanged.
get_json_docs(DbName, DocIds) ->
    QueryArgs = [{keys, DocIds}, {include_docs, true}],
    RowCollector = fun callback/2,
    fabric:all_docs(DbName, RowCollector, [], QueryArgs).

%% Accumulator callback handed to fabric:all_docs/4 by get_json_docs/2.
%%
%%   timeout         -> {error, timeout}
%%   complete        -> {ok, Acc} with the accumulator reversed, so the
%%                      entries come out in the order they were received
%%   {error, Reason} -> {error, Reason}
%%   {meta, _}       -> {ok, Acc}, accumulator untouched
%%   {row, Row}      -> {ok, [{Id, DocEntry} | Acc]} where Id is the value
%%                      stored under `id` in Row and DocEntry is the result
%%                      of lists:keyfind(doc, 1, Row), i.e. the whole
%%                      {doc, _} tuple, or `false` when Row has no such
%%                      entry
callback(timeout, _Rows) ->
    {error, timeout};
callback(complete, Rows) ->
    {ok, lists:reverse(Rows)};
callback({error, Why}, _Rows) ->
    {error, Why};
callback({meta, _}, Rows) ->
    {ok, Rows};
callback({row, Props}, Rows) ->
    %% Assertive match: a row without an `id` entry fails here (badmatch).
    {id, DocId} = lists:keyfind(id, 1, Props),
    DocEntry = lists:keyfind(doc, 1, Props),
    {ok, [{DocId, DocEntry} | Rows]}.

%% Decides how a request proceeds after a worker reported a failure.
%%
%% Msg is the failure message, Worker the worker it belongs to and Counters
%% the current worker counters. Replacements, StartFun and StartArgs are
%% only used for the maintenance-mode case, where they are forwarded to
%% handle_replacement/5.
%%
%%   {'EXIT', Reason}                   -> handle_error/3 with {exit, Reason}
%%   {error, Reason}                    -> handle_error/3 with Reason
%%   {rexi_EXIT, {maintenance_mode, _}} -> handle_replacement/5
%%   {rexi_EXIT, Reason}                -> handle_error/3 with Reason
%%   {rexi_DOWN, _, {_, Node}, _}       -> fabric_util:remove_down_workers/2;
%%                                         its {ok, Counters} result is
%%                                         returned as is, `error` becomes
%%                                         a nodedown error
%%   anything else                      -> logged, then handle_error/3
handle_error_message({'EXIT', ExitReason}, Worker, Counters,
                     _Replacements, _StartFun, _StartArgs) ->
    handle_error({exit, ExitReason}, Worker, Counters);
handle_error_message({error, Why}, Worker, Counters,
                     _Replacements, _StartFun, _StartArgs) ->
    handle_error(Why, Worker, Counters);
%% The maintenance-mode clause must stay ahead of the generic rexi_EXIT one.
handle_error_message({rexi_EXIT, {maintenance_mode, _}}, Worker, Counters,
                     Replacements, StartFun, StartArgs) ->
    handle_replacement(Worker, Counters, Replacements, StartFun, StartArgs);
handle_error_message({rexi_EXIT, Why}, Worker, Counters,
                     _Replacements, _StartFun, _StartArgs) ->
    handle_error(Why, Worker, Counters);
handle_error_message({rexi_DOWN, _, {_, DownNode}, _}, _Worker, Counters,
                     _Replacements, _StartFun, _StartArgs) ->
    case fabric_util:remove_down_workers(Counters, DownNode) of
        {ok, _Remaining} = Progress ->
            Progress;
        error ->
            {error, {nodedown, <<"progress not possible">>}}
    end;
handle_error_message(Unexpected, Worker, Counters,
                     _Replacements, _StartFun, _StartArgs) ->
    couch_log:error("Unexpected error during request: ~p", [Unexpected]),
    handle_error(Unexpected, Worker, Counters).

%% Erases Worker from Counters0 and checks whether the request can still
%% make progress with what is left.
%%
%% Returns {ok, Counters} (with Worker erased) when
%% fabric_view:is_progress_possible/1 answers true, and {error, Reason}
%% when it answers false.
handle_error(Reason, Worker, Counters0) ->
    Remaining = fabric_dict:erase(Worker, Counters0),
    case fabric_view:is_progress_possible(Remaining) of
        false ->
            {error, Reason};
        true ->
            {ok, Remaining}
    end.

%% Swaps a failed worker for the replacement shards registered for its range.
%%
%% Every counter entry whose shard ref equals Worker's ref is dropped first.
%% OldReplacements is then searched for a tuple whose first element is
%% Worker's range:
%%
%%   * found   - each listed replacement is started via start_replacement/3
%%               and stored in the counters with the value nil. The result
%%               is {new_refs, Keys, Counters, Replacements}, where Keys are
%%               all keys of the updated counters and Replacements is
%%               OldReplacements without the consumed entry.
%%   * missing - falls back to handle_error/3 with a nodedown reason.
handle_replacement(Worker, OldCntrs0, OldReplacements, StartFun, StartArgs) ->
    IsOtherWorker = fun({#shard{ref = Ref}, _}) ->
        Ref /= Worker#shard.ref
    end,
    Remaining = lists:filter(IsOtherWorker, OldCntrs0),
    case lists:keytake(Worker#shard.range, 1, OldReplacements) of
        false ->
            Reason = {nodedown, <<"progress not possible">>},
            handle_error(Reason, Worker, Remaining);
        {value, {_Range, Candidates}, ReplacementsLeft} ->
            Launch = fun(Candidate, CountersAcc) ->
                Started = start_replacement(StartFun, StartArgs, Candidate),
                fabric_dict:store(Started, nil, CountersAcc)
            end,
            Counters = lists:foldl(Launch, Remaining, Candidates),
            %% Assertive match: crashes with badmatch if the replacements
            %% still do not make progress possible.
            true = fabric_view:is_progress_possible(Counters),
            {new_refs, fabric_dict:fetch_keys(Counters), Counters,
             ReplacementsLeft}
    end.

%% Starts StartFun in dreyfus_rpc on Shard's node through rexi:cast/2 and
%% returns Shard with its ref field set to the reference rexi:cast/2 gave
%% back.
%%
%% StartArgs must be [DDoc, IndexName, QueryArgs]. The bookmark in QueryArgs
%% is rewritten before the call: when it is a list of {Shard, After} pairs,
%% it is replaced by the After value of the last pair whose shard range
%% equals Shard's range, or nil when no pair matches; any non-list bookmark
%% becomes nil. The remote call receives the shard name followed by DDoc,
%% IndexName and the rewritten query args.
start_replacement(StartFun, StartArgs, Shard) ->
    [DDoc, IndexName, QueryArgs] = StartArgs,
    Bookmark = QueryArgs#index_query_args.bookmark,
    After = case is_list(Bookmark) of
        true ->
            lists:foldl(fun({#shard{range = Range}, ShardAfter}, Current) ->
                case Range /= Shard#shard.range of
                    true -> Current;
                    false -> ShardAfter
                end
            end, nil, Bookmark);
        false ->
            nil
    end,
    RpcArgs = [Shard#shard.name, DDoc, IndexName,
               QueryArgs#index_query_args{bookmark = After}],
    Ref = rexi:cast(Shard#shard.node, {dreyfus_rpc, StartFun, RpcArgs}),
    Shard#shard{ref = Ref}.