diff options
Diffstat (limited to 'src/nouveau/src/nouveau_fabric_search.erl')
-rw-r--r-- | src/nouveau/src/nouveau_fabric_search.erl | 221 |
1 files changed, 221 insertions, 0 deletions
diff --git a/src/nouveau/src/nouveau_fabric_search.erl b/src/nouveau/src/nouveau_fabric_search.erl new file mode 100644 index 000000000..4e528cc93 --- /dev/null +++ b/src/nouveau/src/nouveau_fabric_search.erl @@ -0,0 +1,221 @@ +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*- + +-module(nouveau_fabric_search). + +-export([go/4]). + +-include_lib("mem3/include/mem3.hrl"). +-include_lib("couch/include/couch_db.hrl"). + +-record(state, { + limit, + sort, + counters, + search_results +}). + +go(DbName, GroupId, IndexName, QueryArgs0) when is_binary(GroupId) -> + {ok, DDoc} = fabric:open_doc( + DbName, + <<"_design/", GroupId/binary>>, + [ejson_body] + ), + go(DbName, DDoc, IndexName, QueryArgs0); +go(DbName, #doc{} = DDoc, IndexName, QueryArgs0) -> + {ok, Index} = nouveau_util:design_doc_to_index(DbName, DDoc, IndexName), + Shards = mem3:shards(DbName), + {PackedBookmark, #{limit := Limit, sort := Sort} = QueryArgs1} = + maps:take(bookmark, QueryArgs0), + Bookmark = nouveau_bookmark:unpack(DbName, PackedBookmark), + Counters0 = lists:map( + fun(#shard{} = Shard) -> + After = maps:get(Shard#shard.range, Bookmark, null), + Ref = rexi:cast( + Shard#shard.node, + {nouveau_rpc, search, [Shard#shard.name, Index, QueryArgs1#{'after' => After}]} + ), + Shard#shard{ref = Ref} + end, + Shards + ), + Counters = fabric_dict:init(Counters0, nil), + Workers = fabric_dict:fetch_keys(Counters), + RexiMon = fabric_util:create_monitors(Workers), + State = #state{ + limit = Limit, + sort = Sort, + counters = Counters, + search_results = #{} + }, + try + rexi_utils:recv( + Workers, + #shard.ref, + fun handle_message/3, + State, + fabric_util:timeout("nouveau", "infinity"), + fabric_util:timeout("nouveau_permsg", "3600000") + ) + of + {ok, SearchResults} -> + NewBookmark = nouveau_bookmark:update(DbName, Bookmark, SearchResults), + {ok, simplify_hits(SearchResults#{bookmark => NewBookmark})}; + {error, Reason} -> + {error, Reason} + after + rexi_monitor:stop(RexiMon), + fabric_util:cleanup(Workers) + end. + +handle_message({ok, Response}, Shard, State) -> + case fabric_dict:lookup_element(Shard, State#state.counters) of + undefined -> + %% already heard from someone else in this range + {ok, State}; + nil -> + SearchResults = merge_search_results(State#state.search_results, Response, State), + Counters1 = fabric_dict:store(Shard, ok, State#state.counters), + Counters2 = fabric_view:remove_overlapping_shards(Shard, Counters1), + State1 = State#state{counters = Counters2, search_results = SearchResults}, + case fabric_dict:any(nil, Counters2) of + true -> + {ok, State1}; + false -> + {stop, SearchResults} + end + end; +handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _Shard, State) -> + #state{counters = Counters0} = State, + case fabric_util:remove_down_workers(Counters0, NodeRef, []) of + {ok, Counters1} -> + {ok, Counters1}; + error -> + {error, {nodedown, <<"progress not possible">>}} + end; +handle_message({error, Reason}, _Shard, _State) -> + {error, Reason}; +handle_message(Else, _Shard, _State) -> + {error, Else}. + +merge_search_results(A, B, #state{} = State) -> + #{ + <<"total_hits">> => merge_total_hits( + maps:get(<<"total_hits">>, A, 0), maps:get(<<"total_hits">>, B, 0) + ), + <<"total_hits_relation">> => merge_total_hits_relation( + maps:get(<<"total_hits_relation">>, A, null), + maps:get(<<"total_hits_relation">>, B, null) + ), + <<"hits">> => merge_hits( + maps:get(<<"hits">>, A, []), + maps:get(<<"hits">>, B, []), + State#state.sort, + State#state.limit + ), + <<"counts">> => merge_facets( + maps:get(<<"counts">>, A, null), maps:get(<<"counts">>, B, null), State#state.limit + ), + <<"ranges">> => merge_facets( + maps:get(<<"ranges">>, A, null), maps:get(<<"ranges">>, B, null), State#state.limit + ) + }. + +merge_total_hits(TotalHitsA, TotalHitsB) -> + TotalHitsA + TotalHitsB. + +merge_total_hits_relation(A, B) when + A == <<"GREATER_THAN_OR_EQUAL_TO">>; B == <<"GREATER_THAN_OR_EQUAL_TO">> +-> + <<"GREATER_THAN_OR_EQUAL_TO">>; +merge_total_hits_relation(A, B) when A == <<"EQUAL_TO">>; B == <<"EQUAL_TO">> -> + <<"EQUAL_TO">>; +merge_total_hits_relation(null, null) -> + %% not supported in selected Lucene version. + null. + +merge_hits(HitsA, HitsB, Sort, Limit) -> + MergedHits = lists:merge(merge_fun(Sort), HitsA, HitsB), + lists:sublist(MergedHits, Limit). + +simplify_hits(SearchResults) -> + #{<<"hits">> := Hits} = SearchResults, + SearchResults#{<<"hits">> => lists:map(fun simplify_hit/1, Hits)}. + +simplify_hit(#{} = Hit) -> + #{<<"fields">> := Fields} = Hit, + Hit#{<<"fields">> => simplify_fields(Fields)}. + +simplify_fields(Fields) when is_list(Fields) -> + Fun = fun(Field, Acc) -> + {Key, Value} = simplify_field(Field), + Acc#{Key => Value} + end, + lists:foldl(Fun, #{}, Fields). + +simplify_field(#{<<"@type">> := <<"stored">>} = Field) -> + #{<<"name">> := Key, <<"value">> := Value} = Field, + {Key, Value}. + +merge_fun(Sort) -> + fun(HitA, HitB) -> + OrderA = maps:get(<<"order">>, HitA), + OrderB = maps:get(<<"order">>, HitB), + compare_order(Sort, OrderA, OrderB) + end. + +%% no sort order specified +compare_order(null, [A | ARest], [B | BRest]) -> + case couch_ejson_compare:less(convert_item(A), convert_item(B)) of + 0 -> + compare_order(null, ARest, BRest); + Less -> + Less < 1 + end; +%% server-side adds _id on the end of sort order if not present +compare_order([], [A], [B]) -> + couch_ejson_compare:less(convert_item(A), convert_item(B)) < 1; +%% reverse order specified +compare_order([<<"-", _/binary>> | SortRest], [A | ARest], [B | BRest]) -> + case couch_ejson_compare:less(convert_item(B), convert_item(A)) of + 0 -> + compare_order(SortRest, ARest, BRest); + Less -> + Less < 1 + end; +%% forward order specified +compare_order([_ | SortRest], [A | ARest], [B | BRest]) -> + case couch_ejson_compare:less(convert_item(A), convert_item(B)) of + 0 -> + compare_order(SortRest, ARest, BRest); + Less -> + Less < 1 + end. + +convert_item(Item) -> + case maps:get(<<"@type">>, Item) of + <<"bytes">> -> + base64:decode(maps:get(<<"value">>, Item)); + _ -> + maps:get(<<"value">>, Item) + end. + +merge_facets(FacetsA, null, _Limit) -> + FacetsA; +merge_facets(null, FacetsB, _Limit) -> + FacetsB; +merge_facets(FacetsA, FacetsB, _Limit) -> + Combiner = fun(_, V1, V2) -> maps:merge_with(fun(_, V3, V4) -> V3 + V4 end, V1, V2) end, + maps:merge_with(Combiner, FacetsA, FacetsB). |