summaryrefslogtreecommitdiff
path: root/src/dreyfus/src/dreyfus_fabric_search.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/dreyfus/src/dreyfus_fabric_search.erl')
-rw-r--r--src/dreyfus/src/dreyfus_fabric_search.erl288
1 files changed, 176 insertions, 112 deletions
diff --git a/src/dreyfus/src/dreyfus_fabric_search.erl b/src/dreyfus/src/dreyfus_fabric_search.erl
index 8edaa385a..7e78e5fc3 100644
--- a/src/dreyfus/src/dreyfus_fabric_search.erl
+++ b/src/dreyfus/src/dreyfus_fabric_search.erl
@@ -10,7 +10,6 @@
% License for the specific language governing permissions and limitations under
% the License.
-
%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
-module(dreyfus_fabric_search).
@@ -32,48 +31,61 @@
}).
go(DbName, GroupId, IndexName, QueryArgs) when is_binary(GroupId) ->
- {ok, DDoc} = fabric:open_doc(DbName, <<"_design/", GroupId/binary>>,
- [ejson_body]),
+ {ok, DDoc} = fabric:open_doc(
+ DbName,
+ <<"_design/", GroupId/binary>>,
+ [ejson_body]
+ ),
dreyfus_util:maybe_deny_index(DbName, GroupId, IndexName),
go(DbName, DDoc, IndexName, QueryArgs);
-
-go(DbName, DDoc, IndexName, #index_query_args{bookmark=nil}=QueryArgs) ->
+go(DbName, DDoc, IndexName, #index_query_args{bookmark = nil} = QueryArgs) ->
DesignName = dreyfus_util:get_design_docid(DDoc),
dreyfus_util:maybe_deny_index(DbName, DesignName, IndexName),
Shards = dreyfus_util:get_shards(DbName, QueryArgs),
RingOpts = dreyfus_util:get_ring_opts(QueryArgs, Shards),
- Workers = fabric_util:submit_jobs(Shards, dreyfus_rpc, search,
- [DDoc, IndexName, dreyfus_util:export(QueryArgs)]),
+ Workers = fabric_util:submit_jobs(
+ Shards,
+ dreyfus_rpc,
+ search,
+ [DDoc, IndexName, dreyfus_util:export(QueryArgs)]
+ ),
Counters = fabric_dict:init(Workers, nil),
go(DbName, DDoc, IndexName, QueryArgs, Counters, Counters, RingOpts);
-
-go(DbName, DDoc, IndexName, #index_query_args{}=QueryArgs) ->
- Bookmark0 = try dreyfus_bookmark:unpack(DbName, QueryArgs)
- catch
- _:_ ->
- throw({bad_request, "Invalid bookmark parameter supplied"})
- end,
+go(DbName, DDoc, IndexName, #index_query_args{} = QueryArgs) ->
+ Bookmark0 =
+ try
+ dreyfus_bookmark:unpack(DbName, QueryArgs)
+ catch
+ _:_ ->
+ throw({bad_request, "Invalid bookmark parameter supplied"})
+ end,
Shards = dreyfus_util:get_shards(DbName, QueryArgs),
LiveNodes = [node() | nodes()],
- LiveShards = [S || #shard{node=Node} = S <- Shards, lists:member(Node, LiveNodes)],
+ LiveShards = [S || #shard{node = Node} = S <- Shards, lists:member(Node, LiveNodes)],
Bookmark1 = dreyfus_bookmark:add_missing_shards(Bookmark0, LiveShards),
- Counters0 = lists:flatmap(fun({#shard{name=Name, node=N} = Shard, After}) ->
- QueryArgs1 = dreyfus_util:export(QueryArgs#index_query_args{
- bookmark = After
- }),
- case lists:member(Shard, LiveShards) of
- true ->
- Ref = rexi:cast(N, {dreyfus_rpc, search,
- [Name, DDoc, IndexName, QueryArgs1]}),
- [Shard#shard{ref = Ref}];
- false ->
- lists:map(fun(#shard{name=Name2, node=N2} = NewShard) ->
- Ref = rexi:cast(N2, {dreyfus_rpc, search,
- [Name2, DDoc, IndexName, QueryArgs1]}),
- NewShard#shard{ref = Ref}
- end, find_replacement_shards(Shard, LiveShards))
- end
- end, Bookmark1),
+ Counters0 = lists:flatmap(
+ fun({#shard{name = Name, node = N} = Shard, After}) ->
+ QueryArgs1 = dreyfus_util:export(QueryArgs#index_query_args{
+ bookmark = After
+ }),
+ case lists:member(Shard, LiveShards) of
+ true ->
+ Ref = rexi:cast(N, {dreyfus_rpc, search, [Name, DDoc, IndexName, QueryArgs1]}),
+ [Shard#shard{ref = Ref}];
+ false ->
+ lists:map(
+ fun(#shard{name = Name2, node = N2} = NewShard) ->
+ Ref = rexi:cast(
+ N2, {dreyfus_rpc, search, [Name2, DDoc, IndexName, QueryArgs1]}
+ ),
+ NewShard#shard{ref = Ref}
+ end,
+ find_replacement_shards(Shard, LiveShards)
+ )
+ end
+ end,
+ Bookmark1
+ ),
Counters = fabric_dict:init(Counters0, nil),
WorkerShards = fabric_dict:fetch_keys(Counters),
RingOpts = dreyfus_util:get_ring_opts(QueryArgs, WorkerShards),
@@ -95,73 +107,92 @@ go(DbName, DDoc, IndexName, QueryArgs, Counters, Bookmark, RingOpts) ->
State = #state{
limit = Limit,
sort = Sort,
- top_docs = #top_docs{total_hits=0,hits=[]},
+ top_docs = #top_docs{total_hits = 0, hits = []},
counters = Counters,
start_args = [DDoc, IndexName, QueryArgs],
replacements = Replacements,
ring_opts = RingOpts
- },
+ },
RexiMon = fabric_util:create_monitors(Workers),
- try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3,
- State, infinity, 1000 * 60 * 60) of
- {ok, Result} ->
- #state{top_docs=TopDocs} = Result,
- #top_docs{total_hits=TotalHits, hits=Hits,
- counts=Counts, ranges=Ranges} = TopDocs,
- case RawBookmark of
- true ->
- {ok, Bookmark, TotalHits, Hits, Counts, Ranges};
- false ->
- Bookmark1 = dreyfus_bookmark:update(Sort, Bookmark, Hits),
- Hits1 = remove_sortable(Hits),
- {ok, Bookmark1, TotalHits, Hits1, Counts, Ranges}
- end;
- {error, Reason} ->
- {error, Reason}
+ try
+ rexi_utils:recv(
+ Workers,
+ #shard.ref,
+ fun handle_message/3,
+ State,
+ infinity,
+ 1000 * 60 * 60
+ )
+ of
+ {ok, Result} ->
+ #state{top_docs = TopDocs} = Result,
+ #top_docs{
+ total_hits = TotalHits,
+ hits = Hits,
+ counts = Counts,
+ ranges = Ranges
+ } = TopDocs,
+ case RawBookmark of
+ true ->
+ {ok, Bookmark, TotalHits, Hits, Counts, Ranges};
+ false ->
+ Bookmark1 = dreyfus_bookmark:update(Sort, Bookmark, Hits),
+ Hits1 = remove_sortable(Hits),
+ {ok, Bookmark1, TotalHits, Hits1, Counts, Ranges}
+ end;
+ {error, Reason} ->
+ {error, Reason}
after
rexi_monitor:stop(RexiMon),
fabric_util:cleanup(Workers)
end.
-handle_message({ok, #top_docs{}=NewTopDocs}, Shard, State0) ->
+handle_message({ok, #top_docs{} = NewTopDocs}, Shard, State0) ->
State = upgrade_state(State0),
- #state{top_docs=TopDocs, limit=Limit, sort=Sort} = State,
+ #state{top_docs = TopDocs, limit = Limit, sort = Sort} = State,
case fabric_dict:lookup_element(Shard, State#state.counters) of
- undefined ->
- %% already heard from someone else in this range
- {ok, State};
- nil ->
- C1 = fabric_dict:store(Shard, ok, State#state.counters),
- C2 = fabric_view:remove_overlapping_shards(Shard, C1),
- Sortable = make_sortable(Shard, NewTopDocs),
- MergedTopDocs = merge_top_docs(TopDocs, Sortable, Limit, Sort),
- State1 = State#state{
- counters=C2,
- top_docs=MergedTopDocs
- },
- case fabric_dict:any(nil, C2) of
- true ->
- {ok, State1};
- false ->
- {stop, State1}
- end
+ undefined ->
+ %% already heard from someone else in this range
+ {ok, State};
+ nil ->
+ C1 = fabric_dict:store(Shard, ok, State#state.counters),
+ C2 = fabric_view:remove_overlapping_shards(Shard, C1),
+ Sortable = make_sortable(Shard, NewTopDocs),
+ MergedTopDocs = merge_top_docs(TopDocs, Sortable, Limit, Sort),
+ State1 = State#state{
+ counters = C2,
+ top_docs = MergedTopDocs
+ },
+ case fabric_dict:any(nil, C2) of
+ true ->
+ {ok, State1};
+ false ->
+ {stop, State1}
+ end
end;
-
% upgrade clause
handle_message({ok, {top_docs, UpdateSeq, TotalHits, Hits}}, Shard, State) ->
TopDocs = #top_docs{
- update_seq = UpdateSeq,
- total_hits = TotalHits,
- hits = Hits},
+ update_seq = UpdateSeq,
+ total_hits = TotalHits,
+ hits = Hits
+ },
handle_message({ok, TopDocs}, Shard, State);
-
handle_message(Error, Worker, State0) ->
State = upgrade_state(State0),
- case dreyfus_fabric:handle_error_message(Error, Worker,
- State#state.counters, State#state.replacements,
- search, State#state.start_args, State#state.ring_opts) of
+ case
+ dreyfus_fabric:handle_error_message(
+ Error,
+ Worker,
+ State#state.counters,
+ State#state.replacements,
+ search,
+ State#state.start_args,
+ State#state.ring_opts
+ )
+ of
{ok, Counters} ->
- {ok, State#state{counters=Counters}};
+ {ok, State#state{counters = Counters}};
{new_refs, NewRefs, NewCounters, NewReplacements} ->
NewState = State#state{
counters = NewCounters,
@@ -172,36 +203,45 @@ handle_message(Error, Worker, State0) ->
Else
end.
-find_replacement_shards(#shard{range=Range}, AllShards) ->
+find_replacement_shards(#shard{range = Range}, AllShards) ->
[Shard || Shard <- AllShards, Shard#shard.range =:= Range].
-make_sortable(Shard, #top_docs{}=TopDocs) ->
+make_sortable(Shard, #top_docs{} = TopDocs) ->
Hits = make_sortable(Shard, TopDocs#top_docs.hits),
- TopDocs#top_docs{hits=Hits};
+ TopDocs#top_docs{hits = Hits};
make_sortable(Shard, List) when is_list(List) ->
make_sortable(Shard, List, []).
make_sortable(_, [], Acc) ->
lists:reverse(Acc);
-make_sortable(Shard, [#hit{}=Hit|Rest], Acc) ->
- make_sortable(Shard, Rest, [#sortable{item=Hit, order=Hit#hit.order, shard=Shard} | Acc]).
+make_sortable(Shard, [#hit{} = Hit | Rest], Acc) ->
+ make_sortable(Shard, Rest, [#sortable{item = Hit, order = Hit#hit.order, shard = Shard} | Acc]).
remove_sortable(List) ->
remove_sortable(List, []).
remove_sortable([], Acc) ->
lists:reverse(Acc);
-remove_sortable([#sortable{item=Item} | Rest], Acc) ->
+remove_sortable([#sortable{item = Item} | Rest], Acc) ->
remove_sortable(Rest, [Item | Acc]).
-merge_top_docs(#top_docs{}=TopDocsA, #top_docs{}=TopDocsB, Limit, Sort) ->
+merge_top_docs(#top_docs{} = TopDocsA, #top_docs{} = TopDocsB, Limit, Sort) ->
MergedTotal = sum_element(#top_docs.total_hits, TopDocsA, TopDocsB),
- MergedHits = lists:sublist(dreyfus_util:sort(Sort,
- TopDocsA#top_docs.hits ++ TopDocsB#top_docs.hits), Limit),
+ MergedHits = lists:sublist(
+ dreyfus_util:sort(
+ Sort,
+ TopDocsA#top_docs.hits ++ TopDocsB#top_docs.hits
+ ),
+ Limit
+ ),
MergedCounts = merge_facets(TopDocsA#top_docs.counts, TopDocsB#top_docs.counts),
MergedRanges = merge_facets(TopDocsA#top_docs.ranges, TopDocsB#top_docs.ranges),
- #top_docs{total_hits=MergedTotal, hits=MergedHits,
- counts=MergedCounts, ranges=MergedRanges}.
+ #top_docs{
+ total_hits = MergedTotal,
+ hits = MergedHits,
+ counts = MergedCounts,
+ ranges = MergedRanges
+ }.
merge_facets(undefined, undefined) ->
undefined;
@@ -218,26 +258,35 @@ merge_facets_int(FacetsA, []) ->
FacetsA;
merge_facets_int([], FacetsB) ->
FacetsB;
-merge_facets_int([{KA, _, _}=A | RA], [{KB, _, _} | _]=FB) when KA < KB ->
+merge_facets_int([{KA, _, _} = A | RA], [{KB, _, _} | _] = FB) when KA < KB ->
[A | merge_facets_int(RA, FB)];
merge_facets_int([{KA, VA, CA} | RA], [{KB, VB, CB} | RB]) when KA =:= KB ->
- [{KA, VA+VB, merge_facets_int(CA, CB)} | merge_facets_int(RA, RB)];
-merge_facets_int([{KA, _, _} | _]=FA, [{KB, _, _}=B | RB]) when KA > KB ->
+ [{KA, VA + VB, merge_facets_int(CA, CB)} | merge_facets_int(RA, RB)];
+merge_facets_int([{KA, _, _} | _] = FA, [{KB, _, _} = B | RB]) when KA > KB ->
[B | merge_facets_int(FA, RB)].
sort_facets([]) ->
[];
sort_facets(Facets) ->
- lists:sort(lists:map(fun({K, V, C}) -> {K, V, sort_facets(C)} end,
- Facets)).
+ lists:sort(
+ lists:map(
+ fun({K, V, C}) -> {K, V, sort_facets(C)} end,
+ Facets
+ )
+ ).
sum_element(N, T1, T2) ->
element(N, T1) + element(N, T2).
upgrade_state({state, Limit, Sort, TopDocs, Counters}) ->
- #state{limit=Limit, sort=Sort, top_docs=TopDocs, counters=Counters,
- replacements=[]};
-upgrade_state(#state{}=State) ->
+ #state{
+ limit = Limit,
+ sort = Sort,
+ top_docs = TopDocs,
+ counters = Counters,
+ replacements = []
+ };
+upgrade_state(#state{} = State) ->
State.
-ifdef(TEST).
@@ -248,23 +297,38 @@ merge_facets_test() ->
?assertEqual([{foo, 1.0, []}], merge_facets([{foo, 1.0, []}], [])),
% one level, one key
- ?assertEqual([{foo, 3.0, []}],
- merge_facets([{foo, 1.0, []}],
- [{foo, 2.0, []}])),
+ ?assertEqual(
+ [{foo, 3.0, []}],
+ merge_facets(
+ [{foo, 1.0, []}],
+ [{foo, 2.0, []}]
+ )
+ ),
% one level, two keys
- ?assertEqual([{bar, 6.0, []}, {foo, 9.0, []}],
- merge_facets([{foo, 1.0, []}, {bar, 2.0, []}],
- [{bar, 4.0, []}, {foo, 8.0, []}])),
+ ?assertEqual(
+ [{bar, 6.0, []}, {foo, 9.0, []}],
+ merge_facets(
+ [{foo, 1.0, []}, {bar, 2.0, []}],
+ [{bar, 4.0, []}, {foo, 8.0, []}]
+ )
+ ),
% multi level, multi keys
- ?assertEqual([{foo, 2.0, [{bar, 2.0, []}]}],
- merge_facets([{foo, 1.0, [{bar, 1.0, []}]}],
- [{foo, 1.0, [{bar, 1.0, []}]}])),
-
- ?assertEqual([{foo, 5.0, [{bar, 7.0, [{bar, 1.0, []}, {baz, 3.0, []}, {foo, 6.5, []}]}]}],
- merge_facets([{foo, 1.0, [{bar, 2.0, [{baz, 3.0, []}, {foo, 0.5, []}]}]}],
- [{foo, 4.0, [{bar, 5.0, [{foo, 6.0, []}, {bar, 1.0, []}]}]}])).
-
+ ?assertEqual(
+ [{foo, 2.0, [{bar, 2.0, []}]}],
+ merge_facets(
+ [{foo, 1.0, [{bar, 1.0, []}]}],
+ [{foo, 1.0, [{bar, 1.0, []}]}]
+ )
+ ),
+
+ ?assertEqual(
+ [{foo, 5.0, [{bar, 7.0, [{bar, 1.0, []}, {baz, 3.0, []}, {foo, 6.5, []}]}]}],
+ merge_facets(
+ [{foo, 1.0, [{bar, 2.0, [{baz, 3.0, []}, {foo, 0.5, []}]}]}],
+ [{foo, 4.0, [{bar, 5.0, [{foo, 6.0, []}, {bar, 1.0, []}]}]}]
+ )
+ ).
-endif.