summaryrefslogtreecommitdiff
path: root/src/dreyfus/src/dreyfus_fabric_group1.erl
blob: bdae6f040374309c15df26ed5ec2c64b5b292982 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.


%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-

-module(dreyfus_fabric_group1).

-include("dreyfus.hrl").
-include_lib("mem3/include/mem3.hrl").
-include_lib("couch/include/couch_db.hrl").

-export([go/4]).

-record(state, {
    limit,
    sort,
    top_groups,
    counters,
    start_args,
    replacements,
    ring_opts
}).

go(DbName, GroupId, IndexName, QueryArgs) when is_binary(GroupId) ->
    {ok, DDoc} = fabric:open_doc(DbName, <<"_design/", GroupId/binary>>, []),
    dreyfus_util:maybe_deny_index(DbName, GroupId, IndexName),
    go(DbName, DDoc, IndexName, QueryArgs);

go(DbName, DDoc, IndexName, #index_query_args{}=QueryArgs) ->
    DesignName = dreyfus_util:get_design_docid(DDoc),
    dreyfus_util:maybe_deny_index(DbName, DesignName, IndexName),
    Shards = dreyfus_util:get_shards(DbName, QueryArgs),
    RingOpts = dreyfus_util:get_ring_opts(QueryArgs, Shards),
    Workers = fabric_util:submit_jobs(Shards, dreyfus_rpc, group1, [DDoc,
         IndexName, dreyfus_util:export(QueryArgs)]),
    Replacements = fabric_view:get_shard_replacements(DbName, Workers),
    Counters = fabric_dict:init(Workers, nil),
    RexiMon = fabric_util:create_monitors(Workers),
    State = #state{
        limit = QueryArgs#index_query_args.grouping#grouping.limit,
        sort = QueryArgs#index_query_args.grouping#grouping.sort,
        top_groups = [],
        counters = Counters,
        start_args = [DDoc, IndexName, QueryArgs],
        replacements = Replacements,
        ring_opts = RingOpts
    },
    try
        rexi_utils:recv(Workers, #shard.ref, fun handle_message/3,
                        State, infinity, 1000 * 60 * 60)
    after
        rexi_monitor:stop(RexiMon),
        fabric_util:cleanup(Workers)
    end;
go(DbName, DDoc, IndexName, OldArgs) ->
    go(DbName, DDoc, IndexName, dreyfus_util:upgrade(OldArgs)).

handle_message({ok, NewTopGroups}, Shard, State0) ->
    State = upgrade_state(State0),
    #state{top_groups=TopGroups, limit=Limit, sort=Sort} = State,
    case fabric_dict:lookup_element(Shard, State#state.counters) of
    undefined ->
        %% already heard from someone else in this range
        {ok, State};
    nil ->
        C1 = fabric_dict:store(Shard, ok, State#state.counters),
        C2 = fabric_view:remove_overlapping_shards(Shard, C1),
        MergedTopGroups = merge_top_groups(TopGroups, make_sortable(Shard, NewTopGroups), Limit, Sort),
        State1 = State#state{
            counters=C2,
            top_groups=MergedTopGroups
        },
        case fabric_dict:any(nil, C2) of
        true ->
            {ok, State1};
        false ->
            {stop, remove_sortable(MergedTopGroups)}
        end
    end;

handle_message(Error, Worker, State0) ->
    State = upgrade_state(State0),
    case dreyfus_fabric:handle_error_message(Error, Worker,
      State#state.counters, State#state.replacements,
      group1, State#state.start_args, State#state.ring_opts) of
        {ok, Counters} ->
            {ok, State#state{counters=Counters}};
        {new_refs, NewRefs, NewCounters, NewReplacements} ->
            NewState = State#state{
                counters = NewCounters,
                replacements = NewReplacements
            },
            {new_refs, NewRefs, NewState};
        Else ->
            Else
    end.

merge_top_groups(TopGroupsA, TopGroupsB, Limit, Sort) ->
    MergedGroups0 = TopGroupsA ++ TopGroupsB,
    GNs = lists:usort([N || #sortable{item={N,_}} <- MergedGroups0]),
    MergedGroups = [merge_top_group(Sort, [S || #sortable{item={N,_}}=S <- MergedGroups0, N =:= GN]) || GN <- GNs],
    lists:sublist(dreyfus_util:sort(Sort, MergedGroups), Limit).

merge_top_group(_Sort, [Group]) ->
    Group;
merge_top_group(Sort, [_, _] = Groups) ->
    hd(dreyfus_util:sort(Sort, Groups)).

make_sortable(Shard, TopGroups) ->
    [#sortable{item=G, order=Order, shard=Shard} || {_Name, Order}=G <- TopGroups].

remove_sortable(Sortables) ->
    [Item || #sortable{item=Item} <- Sortables].

upgrade_state({state, Limit, Sort, TopGroups, Counters}) ->
    #state{limit=Limit, sort=Sort, top_groups=TopGroups, counters=Counters,
           replacements=[]};
upgrade_state(#state{}=State) ->
    State.