diff options
Diffstat (limited to 'src/dreyfus/src/dreyfus_fabric_search.erl')
-rw-r--r-- | src/dreyfus/src/dreyfus_fabric_search.erl | 288 |
1 files changed, 176 insertions, 112 deletions
diff --git a/src/dreyfus/src/dreyfus_fabric_search.erl b/src/dreyfus/src/dreyfus_fabric_search.erl index 8edaa385a..7e78e5fc3 100644 --- a/src/dreyfus/src/dreyfus_fabric_search.erl +++ b/src/dreyfus/src/dreyfus_fabric_search.erl @@ -10,7 +10,6 @@ % License for the specific language governing permissions and limitations under % the License. - %% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*- -module(dreyfus_fabric_search). @@ -32,48 +31,61 @@ }). go(DbName, GroupId, IndexName, QueryArgs) when is_binary(GroupId) -> - {ok, DDoc} = fabric:open_doc(DbName, <<"_design/", GroupId/binary>>, - [ejson_body]), + {ok, DDoc} = fabric:open_doc( + DbName, + <<"_design/", GroupId/binary>>, + [ejson_body] + ), dreyfus_util:maybe_deny_index(DbName, GroupId, IndexName), go(DbName, DDoc, IndexName, QueryArgs); - -go(DbName, DDoc, IndexName, #index_query_args{bookmark=nil}=QueryArgs) -> +go(DbName, DDoc, IndexName, #index_query_args{bookmark = nil} = QueryArgs) -> DesignName = dreyfus_util:get_design_docid(DDoc), dreyfus_util:maybe_deny_index(DbName, DesignName, IndexName), Shards = dreyfus_util:get_shards(DbName, QueryArgs), RingOpts = dreyfus_util:get_ring_opts(QueryArgs, Shards), - Workers = fabric_util:submit_jobs(Shards, dreyfus_rpc, search, - [DDoc, IndexName, dreyfus_util:export(QueryArgs)]), + Workers = fabric_util:submit_jobs( + Shards, + dreyfus_rpc, + search, + [DDoc, IndexName, dreyfus_util:export(QueryArgs)] + ), Counters = fabric_dict:init(Workers, nil), go(DbName, DDoc, IndexName, QueryArgs, Counters, Counters, RingOpts); - -go(DbName, DDoc, IndexName, #index_query_args{}=QueryArgs) -> - Bookmark0 = try dreyfus_bookmark:unpack(DbName, QueryArgs) - catch - _:_ -> - throw({bad_request, "Invalid bookmark parameter supplied"}) - end, +go(DbName, DDoc, IndexName, #index_query_args{} = QueryArgs) -> + Bookmark0 = + try + dreyfus_bookmark:unpack(DbName, QueryArgs) + catch + _:_ -> + throw({bad_request, "Invalid bookmark parameter supplied"}) + end, Shards = dreyfus_util:get_shards(DbName, QueryArgs), LiveNodes = [node() | nodes()], - LiveShards = [S || #shard{node=Node} = S <- Shards, lists:member(Node, LiveNodes)], + LiveShards = [S || #shard{node = Node} = S <- Shards, lists:member(Node, LiveNodes)], Bookmark1 = dreyfus_bookmark:add_missing_shards(Bookmark0, LiveShards), - Counters0 = lists:flatmap(fun({#shard{name=Name, node=N} = Shard, After}) -> - QueryArgs1 = dreyfus_util:export(QueryArgs#index_query_args{ - bookmark = After - }), - case lists:member(Shard, LiveShards) of - true -> - Ref = rexi:cast(N, {dreyfus_rpc, search, - [Name, DDoc, IndexName, QueryArgs1]}), - [Shard#shard{ref = Ref}]; - false -> - lists:map(fun(#shard{name=Name2, node=N2} = NewShard) -> - Ref = rexi:cast(N2, {dreyfus_rpc, search, - [Name2, DDoc, IndexName, QueryArgs1]}), - NewShard#shard{ref = Ref} - end, find_replacement_shards(Shard, LiveShards)) - end - end, Bookmark1), + Counters0 = lists:flatmap( + fun({#shard{name = Name, node = N} = Shard, After}) -> + QueryArgs1 = dreyfus_util:export(QueryArgs#index_query_args{ + bookmark = After + }), + case lists:member(Shard, LiveShards) of + true -> + Ref = rexi:cast(N, {dreyfus_rpc, search, [Name, DDoc, IndexName, QueryArgs1]}), + [Shard#shard{ref = Ref}]; + false -> + lists:map( + fun(#shard{name = Name2, node = N2} = NewShard) -> + Ref = rexi:cast( + N2, {dreyfus_rpc, search, [Name2, DDoc, IndexName, QueryArgs1]} + ), + NewShard#shard{ref = Ref} + end, + find_replacement_shards(Shard, LiveShards) + ) + end + end, + Bookmark1 + ), Counters = fabric_dict:init(Counters0, nil), WorkerShards = fabric_dict:fetch_keys(Counters), RingOpts = dreyfus_util:get_ring_opts(QueryArgs, WorkerShards), @@ -95,73 +107,92 @@ go(DbName, DDoc, IndexName, QueryArgs, Counters, Bookmark, RingOpts) -> State = #state{ limit = Limit, sort = Sort, - top_docs = #top_docs{total_hits=0,hits=[]}, + top_docs = #top_docs{total_hits = 0, hits = []}, counters = Counters, start_args = [DDoc, IndexName, QueryArgs], replacements = Replacements, ring_opts = RingOpts - }, + }, RexiMon = fabric_util:create_monitors(Workers), - try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, - State, infinity, 1000 * 60 * 60) of - {ok, Result} -> - #state{top_docs=TopDocs} = Result, - #top_docs{total_hits=TotalHits, hits=Hits, - counts=Counts, ranges=Ranges} = TopDocs, - case RawBookmark of - true -> - {ok, Bookmark, TotalHits, Hits, Counts, Ranges}; - false -> - Bookmark1 = dreyfus_bookmark:update(Sort, Bookmark, Hits), - Hits1 = remove_sortable(Hits), - {ok, Bookmark1, TotalHits, Hits1, Counts, Ranges} - end; - {error, Reason} -> - {error, Reason} + try + rexi_utils:recv( + Workers, + #shard.ref, + fun handle_message/3, + State, + infinity, + 1000 * 60 * 60 + ) + of + {ok, Result} -> + #state{top_docs = TopDocs} = Result, + #top_docs{ + total_hits = TotalHits, + hits = Hits, + counts = Counts, + ranges = Ranges + } = TopDocs, + case RawBookmark of + true -> + {ok, Bookmark, TotalHits, Hits, Counts, Ranges}; + false -> + Bookmark1 = dreyfus_bookmark:update(Sort, Bookmark, Hits), + Hits1 = remove_sortable(Hits), + {ok, Bookmark1, TotalHits, Hits1, Counts, Ranges} + end; + {error, Reason} -> + {error, Reason} after rexi_monitor:stop(RexiMon), fabric_util:cleanup(Workers) end. -handle_message({ok, #top_docs{}=NewTopDocs}, Shard, State0) -> +handle_message({ok, #top_docs{} = NewTopDocs}, Shard, State0) -> State = upgrade_state(State0), - #state{top_docs=TopDocs, limit=Limit, sort=Sort} = State, + #state{top_docs = TopDocs, limit = Limit, sort = Sort} = State, case fabric_dict:lookup_element(Shard, State#state.counters) of - undefined -> - %% already heard from someone else in this range - {ok, State}; - nil -> - C1 = fabric_dict:store(Shard, ok, State#state.counters), - C2 = fabric_view:remove_overlapping_shards(Shard, C1), - Sortable = make_sortable(Shard, NewTopDocs), - MergedTopDocs = merge_top_docs(TopDocs, Sortable, Limit, Sort), - State1 = State#state{ - counters=C2, - top_docs=MergedTopDocs - }, - case fabric_dict:any(nil, C2) of - true -> - {ok, State1}; - false -> - {stop, State1} - end + undefined -> + %% already heard from someone else in this range + {ok, State}; + nil -> + C1 = fabric_dict:store(Shard, ok, State#state.counters), + C2 = fabric_view:remove_overlapping_shards(Shard, C1), + Sortable = make_sortable(Shard, NewTopDocs), + MergedTopDocs = merge_top_docs(TopDocs, Sortable, Limit, Sort), + State1 = State#state{ + counters = C2, + top_docs = MergedTopDocs + }, + case fabric_dict:any(nil, C2) of + true -> + {ok, State1}; + false -> + {stop, State1} + end end; - % upgrade clause handle_message({ok, {top_docs, UpdateSeq, TotalHits, Hits}}, Shard, State) -> TopDocs = #top_docs{ - update_seq = UpdateSeq, - total_hits = TotalHits, - hits = Hits}, + update_seq = UpdateSeq, + total_hits = TotalHits, + hits = Hits + }, handle_message({ok, TopDocs}, Shard, State); - handle_message(Error, Worker, State0) -> State = upgrade_state(State0), - case dreyfus_fabric:handle_error_message(Error, Worker, - State#state.counters, State#state.replacements, - search, State#state.start_args, State#state.ring_opts) of + case + dreyfus_fabric:handle_error_message( + Error, + Worker, + State#state.counters, + State#state.replacements, + search, + State#state.start_args, + State#state.ring_opts + ) + of {ok, Counters} -> - {ok, State#state{counters=Counters}}; + {ok, State#state{counters = Counters}}; {new_refs, NewRefs, NewCounters, NewReplacements} -> NewState = State#state{ counters = NewCounters, @@ -172,36 +203,45 @@ handle_message(Error, Worker, State0) -> Else end. -find_replacement_shards(#shard{range=Range}, AllShards) -> +find_replacement_shards(#shard{range = Range}, AllShards) -> [Shard || Shard <- AllShards, Shard#shard.range =:= Range]. -make_sortable(Shard, #top_docs{}=TopDocs) -> +make_sortable(Shard, #top_docs{} = TopDocs) -> Hits = make_sortable(Shard, TopDocs#top_docs.hits), - TopDocs#top_docs{hits=Hits}; + TopDocs#top_docs{hits = Hits}; make_sortable(Shard, List) when is_list(List) -> make_sortable(Shard, List, []). make_sortable(_, [], Acc) -> lists:reverse(Acc); -make_sortable(Shard, [#hit{}=Hit|Rest], Acc) -> - make_sortable(Shard, Rest, [#sortable{item=Hit, order=Hit#hit.order, shard=Shard} | Acc]). +make_sortable(Shard, [#hit{} = Hit | Rest], Acc) -> + make_sortable(Shard, Rest, [#sortable{item = Hit, order = Hit#hit.order, shard = Shard} | Acc]). remove_sortable(List) -> remove_sortable(List, []). remove_sortable([], Acc) -> lists:reverse(Acc); -remove_sortable([#sortable{item=Item} | Rest], Acc) -> +remove_sortable([#sortable{item = Item} | Rest], Acc) -> remove_sortable(Rest, [Item | Acc]). -merge_top_docs(#top_docs{}=TopDocsA, #top_docs{}=TopDocsB, Limit, Sort) -> +merge_top_docs(#top_docs{} = TopDocsA, #top_docs{} = TopDocsB, Limit, Sort) -> MergedTotal = sum_element(#top_docs.total_hits, TopDocsA, TopDocsB), - MergedHits = lists:sublist(dreyfus_util:sort(Sort, - TopDocsA#top_docs.hits ++ TopDocsB#top_docs.hits), Limit), + MergedHits = lists:sublist( + dreyfus_util:sort( + Sort, + TopDocsA#top_docs.hits ++ TopDocsB#top_docs.hits + ), + Limit + ), MergedCounts = merge_facets(TopDocsA#top_docs.counts, TopDocsB#top_docs.counts), MergedRanges = merge_facets(TopDocsA#top_docs.ranges, TopDocsB#top_docs.ranges), - #top_docs{total_hits=MergedTotal, hits=MergedHits, - counts=MergedCounts, ranges=MergedRanges}. + #top_docs{ + total_hits = MergedTotal, + hits = MergedHits, + counts = MergedCounts, + ranges = MergedRanges + }. merge_facets(undefined, undefined) -> undefined; @@ -218,26 +258,35 @@ merge_facets_int(FacetsA, []) -> FacetsA; merge_facets_int([], FacetsB) -> FacetsB; -merge_facets_int([{KA, _, _}=A | RA], [{KB, _, _} | _]=FB) when KA < KB -> +merge_facets_int([{KA, _, _} = A | RA], [{KB, _, _} | _] = FB) when KA < KB -> [A | merge_facets_int(RA, FB)]; merge_facets_int([{KA, VA, CA} | RA], [{KB, VB, CB} | RB]) when KA =:= KB -> - [{KA, VA+VB, merge_facets_int(CA, CB)} | merge_facets_int(RA, RB)]; -merge_facets_int([{KA, _, _} | _]=FA, [{KB, _, _}=B | RB]) when KA > KB -> + [{KA, VA + VB, merge_facets_int(CA, CB)} | merge_facets_int(RA, RB)]; +merge_facets_int([{KA, _, _} | _] = FA, [{KB, _, _} = B | RB]) when KA > KB -> [B | merge_facets_int(FA, RB)]. sort_facets([]) -> []; sort_facets(Facets) -> - lists:sort(lists:map(fun({K, V, C}) -> {K, V, sort_facets(C)} end, - Facets)). + lists:sort( + lists:map( + fun({K, V, C}) -> {K, V, sort_facets(C)} end, + Facets + ) + ). sum_element(N, T1, T2) -> element(N, T1) + element(N, T2). upgrade_state({state, Limit, Sort, TopDocs, Counters}) -> - #state{limit=Limit, sort=Sort, top_docs=TopDocs, counters=Counters, - replacements=[]}; -upgrade_state(#state{}=State) -> + #state{ + limit = Limit, + sort = Sort, + top_docs = TopDocs, + counters = Counters, + replacements = [] + }; +upgrade_state(#state{} = State) -> State. -ifdef(TEST). @@ -248,23 +297,38 @@ merge_facets_test() -> ?assertEqual([{foo, 1.0, []}], merge_facets([{foo, 1.0, []}], [])), % one level, one key - ?assertEqual([{foo, 3.0, []}], - merge_facets([{foo, 1.0, []}], - [{foo, 2.0, []}])), + ?assertEqual( + [{foo, 3.0, []}], + merge_facets( + [{foo, 1.0, []}], + [{foo, 2.0, []}] + ) + ), % one level, two keys - ?assertEqual([{bar, 6.0, []}, {foo, 9.0, []}], - merge_facets([{foo, 1.0, []}, {bar, 2.0, []}], - [{bar, 4.0, []}, {foo, 8.0, []}])), + ?assertEqual( + [{bar, 6.0, []}, {foo, 9.0, []}], + merge_facets( + [{foo, 1.0, []}, {bar, 2.0, []}], + [{bar, 4.0, []}, {foo, 8.0, []}] + ) + ), % multi level, multi keys - ?assertEqual([{foo, 2.0, [{bar, 2.0, []}]}], - merge_facets([{foo, 1.0, [{bar, 1.0, []}]}], - [{foo, 1.0, [{bar, 1.0, []}]}])), - - ?assertEqual([{foo, 5.0, [{bar, 7.0, [{bar, 1.0, []}, {baz, 3.0, []}, {foo, 6.5, []}]}]}], - merge_facets([{foo, 1.0, [{bar, 2.0, [{baz, 3.0, []}, {foo, 0.5, []}]}]}], - [{foo, 4.0, [{bar, 5.0, [{foo, 6.0, []}, {bar, 1.0, []}]}]}])). - + ?assertEqual( + [{foo, 2.0, [{bar, 2.0, []}]}], + merge_facets( + [{foo, 1.0, [{bar, 1.0, []}]}], + [{foo, 1.0, [{bar, 1.0, []}]}] + ) + ), + + ?assertEqual( + [{foo, 5.0, [{bar, 7.0, [{bar, 1.0, []}, {baz, 3.0, []}, {foo, 6.5, []}]}]}], + merge_facets( + [{foo, 1.0, [{bar, 2.0, [{baz, 3.0, []}, {foo, 0.5, []}]}]}], + [{foo, 4.0, [{bar, 5.0, [{foo, 6.0, []}, {bar, 1.0, []}]}]}] + ) + ). -endif. |