diff options
Diffstat (limited to 'src/dreyfus/src/dreyfus_fabric_search.erl')
-rw-r--r-- | src/dreyfus/src/dreyfus_fabric_search.erl | 270 |
1 files changed, 0 insertions, 270 deletions
diff --git a/src/dreyfus/src/dreyfus_fabric_search.erl b/src/dreyfus/src/dreyfus_fabric_search.erl deleted file mode 100644 index 8edaa385a..000000000 --- a/src/dreyfus/src/dreyfus_fabric_search.erl +++ /dev/null @@ -1,270 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - - -%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*- - --module(dreyfus_fabric_search). - --include("dreyfus.hrl"). --include_lib("mem3/include/mem3.hrl"). --include_lib("couch/include/couch_db.hrl"). - --export([go/4]). - --record(state, { - limit, - sort, - top_docs, - counters, - start_args, - replacements, - ring_opts -}). - -go(DbName, GroupId, IndexName, QueryArgs) when is_binary(GroupId) -> - {ok, DDoc} = fabric:open_doc(DbName, <<"_design/", GroupId/binary>>, - [ejson_body]), - dreyfus_util:maybe_deny_index(DbName, GroupId, IndexName), - go(DbName, DDoc, IndexName, QueryArgs); - -go(DbName, DDoc, IndexName, #index_query_args{bookmark=nil}=QueryArgs) -> - DesignName = dreyfus_util:get_design_docid(DDoc), - dreyfus_util:maybe_deny_index(DbName, DesignName, IndexName), - Shards = dreyfus_util:get_shards(DbName, QueryArgs), - RingOpts = dreyfus_util:get_ring_opts(QueryArgs, Shards), - Workers = fabric_util:submit_jobs(Shards, dreyfus_rpc, search, - [DDoc, IndexName, dreyfus_util:export(QueryArgs)]), - Counters = fabric_dict:init(Workers, nil), - go(DbName, DDoc, IndexName, QueryArgs, Counters, Counters, RingOpts); - -go(DbName, DDoc, IndexName, #index_query_args{}=QueryArgs) -> - Bookmark0 = try dreyfus_bookmark:unpack(DbName, QueryArgs) - catch - _:_ -> - throw({bad_request, "Invalid bookmark parameter supplied"}) - end, - Shards = dreyfus_util:get_shards(DbName, QueryArgs), - LiveNodes = [node() | nodes()], - LiveShards = [S || #shard{node=Node} = S <- Shards, lists:member(Node, LiveNodes)], - Bookmark1 = dreyfus_bookmark:add_missing_shards(Bookmark0, LiveShards), - Counters0 = lists:flatmap(fun({#shard{name=Name, node=N} = Shard, After}) -> - QueryArgs1 = dreyfus_util:export(QueryArgs#index_query_args{ - bookmark = After - }), - case lists:member(Shard, LiveShards) of - true -> - Ref = rexi:cast(N, {dreyfus_rpc, search, - [Name, DDoc, IndexName, QueryArgs1]}), - [Shard#shard{ref = Ref}]; - false -> - lists:map(fun(#shard{name=Name2, node=N2} = NewShard) -> - Ref = rexi:cast(N2, {dreyfus_rpc, search, - [Name2, DDoc, IndexName, QueryArgs1]}), - NewShard#shard{ref = Ref} - end, find_replacement_shards(Shard, LiveShards)) - end - end, Bookmark1), - Counters = fabric_dict:init(Counters0, nil), - WorkerShards = fabric_dict:fetch_keys(Counters), - RingOpts = dreyfus_util:get_ring_opts(QueryArgs, WorkerShards), - QueryArgs2 = QueryArgs#index_query_args{ - bookmark = Bookmark1 - }, - go(DbName, DDoc, IndexName, QueryArgs2, Counters, Bookmark1, RingOpts); -go(DbName, DDoc, IndexName, OldArgs) -> - go(DbName, DDoc, IndexName, dreyfus_util:upgrade(OldArgs)). - -go(DbName, DDoc, IndexName, QueryArgs, Counters, Bookmark, RingOpts) -> - {Workers, _} = lists:unzip(Counters), - #index_query_args{ - limit = Limit, - sort = Sort, - raw_bookmark = RawBookmark - } = QueryArgs, - Replacements = fabric_view:get_shard_replacements(DbName, Workers), - State = #state{ - limit = Limit, - sort = Sort, - top_docs = #top_docs{total_hits=0,hits=[]}, - counters = Counters, - start_args = [DDoc, IndexName, QueryArgs], - replacements = Replacements, - ring_opts = RingOpts - }, - RexiMon = fabric_util:create_monitors(Workers), - try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, - State, infinity, 1000 * 60 * 60) of - {ok, Result} -> - #state{top_docs=TopDocs} = Result, - #top_docs{total_hits=TotalHits, hits=Hits, - counts=Counts, ranges=Ranges} = TopDocs, - case RawBookmark of - true -> - {ok, Bookmark, TotalHits, Hits, Counts, Ranges}; - false -> - Bookmark1 = dreyfus_bookmark:update(Sort, Bookmark, Hits), - Hits1 = remove_sortable(Hits), - {ok, Bookmark1, TotalHits, Hits1, Counts, Ranges} - end; - {error, Reason} -> - {error, Reason} - after - rexi_monitor:stop(RexiMon), - fabric_util:cleanup(Workers) - end. - -handle_message({ok, #top_docs{}=NewTopDocs}, Shard, State0) -> - State = upgrade_state(State0), - #state{top_docs=TopDocs, limit=Limit, sort=Sort} = State, - case fabric_dict:lookup_element(Shard, State#state.counters) of - undefined -> - %% already heard from someone else in this range - {ok, State}; - nil -> - C1 = fabric_dict:store(Shard, ok, State#state.counters), - C2 = fabric_view:remove_overlapping_shards(Shard, C1), - Sortable = make_sortable(Shard, NewTopDocs), - MergedTopDocs = merge_top_docs(TopDocs, Sortable, Limit, Sort), - State1 = State#state{ - counters=C2, - top_docs=MergedTopDocs - }, - case fabric_dict:any(nil, C2) of - true -> - {ok, State1}; - false -> - {stop, State1} - end - end; - -% upgrade clause -handle_message({ok, {top_docs, UpdateSeq, TotalHits, Hits}}, Shard, State) -> - TopDocs = #top_docs{ - update_seq = UpdateSeq, - total_hits = TotalHits, - hits = Hits}, - handle_message({ok, TopDocs}, Shard, State); - -handle_message(Error, Worker, State0) -> - State = upgrade_state(State0), - case dreyfus_fabric:handle_error_message(Error, Worker, - State#state.counters, State#state.replacements, - search, State#state.start_args, State#state.ring_opts) of - {ok, Counters} -> - {ok, State#state{counters=Counters}}; - {new_refs, NewRefs, NewCounters, NewReplacements} -> - NewState = State#state{ - counters = NewCounters, - replacements = NewReplacements - }, - {new_refs, NewRefs, NewState}; - Else -> - Else - end. - -find_replacement_shards(#shard{range=Range}, AllShards) -> - [Shard || Shard <- AllShards, Shard#shard.range =:= Range]. - -make_sortable(Shard, #top_docs{}=TopDocs) -> - Hits = make_sortable(Shard, TopDocs#top_docs.hits), - TopDocs#top_docs{hits=Hits}; -make_sortable(Shard, List) when is_list(List) -> - make_sortable(Shard, List, []). - -make_sortable(_, [], Acc) -> - lists:reverse(Acc); -make_sortable(Shard, [#hit{}=Hit|Rest], Acc) -> - make_sortable(Shard, Rest, [#sortable{item=Hit, order=Hit#hit.order, shard=Shard} | Acc]). - -remove_sortable(List) -> - remove_sortable(List, []). - -remove_sortable([], Acc) -> - lists:reverse(Acc); -remove_sortable([#sortable{item=Item} | Rest], Acc) -> - remove_sortable(Rest, [Item | Acc]). - -merge_top_docs(#top_docs{}=TopDocsA, #top_docs{}=TopDocsB, Limit, Sort) -> - MergedTotal = sum_element(#top_docs.total_hits, TopDocsA, TopDocsB), - MergedHits = lists:sublist(dreyfus_util:sort(Sort, - TopDocsA#top_docs.hits ++ TopDocsB#top_docs.hits), Limit), - MergedCounts = merge_facets(TopDocsA#top_docs.counts, TopDocsB#top_docs.counts), - MergedRanges = merge_facets(TopDocsA#top_docs.ranges, TopDocsB#top_docs.ranges), - #top_docs{total_hits=MergedTotal, hits=MergedHits, - counts=MergedCounts, ranges=MergedRanges}. - -merge_facets(undefined, undefined) -> - undefined; -merge_facets(undefined, Facets) -> - sort_facets(Facets); -merge_facets(Facets, undefined) -> - sort_facets(Facets); -merge_facets(FacetsA, FacetsB) -> - merge_facets_int(sort_facets(FacetsA), sort_facets(FacetsB)). - -merge_facets_int([], []) -> - []; -merge_facets_int(FacetsA, []) -> - FacetsA; -merge_facets_int([], FacetsB) -> - FacetsB; -merge_facets_int([{KA, _, _}=A | RA], [{KB, _, _} | _]=FB) when KA < KB -> - [A | merge_facets_int(RA, FB)]; -merge_facets_int([{KA, VA, CA} | RA], [{KB, VB, CB} | RB]) when KA =:= KB -> - [{KA, VA+VB, merge_facets_int(CA, CB)} | merge_facets_int(RA, RB)]; -merge_facets_int([{KA, _, _} | _]=FA, [{KB, _, _}=B | RB]) when KA > KB -> - [B | merge_facets_int(FA, RB)]. - -sort_facets([]) -> - []; -sort_facets(Facets) -> - lists:sort(lists:map(fun({K, V, C}) -> {K, V, sort_facets(C)} end, - Facets)). - -sum_element(N, T1, T2) -> - element(N, T1) + element(N, T2). - -upgrade_state({state, Limit, Sort, TopDocs, Counters}) -> - #state{limit=Limit, sort=Sort, top_docs=TopDocs, counters=Counters, - replacements=[]}; -upgrade_state(#state{}=State) -> - State. - --ifdef(TEST). --include_lib("eunit/include/eunit.hrl"). - -merge_facets_test() -> - % empty list is a no-op - ?assertEqual([{foo, 1.0, []}], merge_facets([{foo, 1.0, []}], [])), - - % one level, one key - ?assertEqual([{foo, 3.0, []}], - merge_facets([{foo, 1.0, []}], - [{foo, 2.0, []}])), - - % one level, two keys - ?assertEqual([{bar, 6.0, []}, {foo, 9.0, []}], - merge_facets([{foo, 1.0, []}, {bar, 2.0, []}], - [{bar, 4.0, []}, {foo, 8.0, []}])), - - % multi level, multi keys - ?assertEqual([{foo, 2.0, [{bar, 2.0, []}]}], - merge_facets([{foo, 1.0, [{bar, 1.0, []}]}], - [{foo, 1.0, [{bar, 1.0, []}]}])), - - ?assertEqual([{foo, 5.0, [{bar, 7.0, [{bar, 1.0, []}, {baz, 3.0, []}, {foo, 6.5, []}]}]}], - merge_facets([{foo, 1.0, [{bar, 2.0, [{baz, 3.0, []}, {foo, 0.5, []}]}]}], - [{foo, 4.0, [{bar, 5.0, [{foo, 6.0, []}, {bar, 1.0, []}]}]}])). - - --endif. |