diff options
Diffstat (limited to 'src/dreyfus/src/dreyfus_fabric_search.erl')
-rw-r--r-- | src/dreyfus/src/dreyfus_fabric_search.erl | 18 |
1 files changed, 12 insertions, 6 deletions
diff --git a/src/dreyfus/src/dreyfus_fabric_search.erl b/src/dreyfus/src/dreyfus_fabric_search.erl index acf7a83ec..c0ebde1d6 100644 --- a/src/dreyfus/src/dreyfus_fabric_search.erl +++ b/src/dreyfus/src/dreyfus_fabric_search.erl @@ -27,7 +27,8 @@ top_docs, counters, start_args, - replacements + replacements, + ring_opts }). go(DbName, GroupId, IndexName, QueryArgs) when is_binary(GroupId) -> @@ -40,10 +41,11 @@ go(DbName, DDoc, IndexName, #index_query_args{bookmark=nil}=QueryArgs) -> DesignName = dreyfus_util:get_design_docid(DDoc), dreyfus_util:maybe_deny_index(DbName, DesignName, IndexName), Shards = dreyfus_util:get_shards(DbName, QueryArgs), + RingOpts = dreyfus_util:get_ring_opts(QueryArgs, Shards), Workers = fabric_util:submit_jobs(Shards, dreyfus_rpc, search, [DDoc, IndexName, dreyfus_util:export(QueryArgs)]), Counters = fabric_dict:init(Workers, nil), - go(DbName, DDoc, IndexName, QueryArgs, Counters, Counters); + go(DbName, DDoc, IndexName, QueryArgs, Counters, Counters, RingOpts); go(DbName, DDoc, IndexName, #index_query_args{}=QueryArgs) -> Bookmark0 = try dreyfus_bookmark:unpack(DbName, QueryArgs) @@ -54,6 +56,7 @@ go(DbName, DDoc, IndexName, #index_query_args{}=QueryArgs) -> Shards = dreyfus_util:get_shards(DbName, QueryArgs), LiveNodes = [node() | nodes()], LiveShards = [S || #shard{node=Node} = S <- Shards, lists:member(Node, LiveNodes)], + RingOpts = dreyful_util:get_ring_opts(QueryArgs, LiveShards), Bookmark1 = dreyfus_bookmark:add_missing_shards(Bookmark0, LiveShards), Counters0 = lists:flatmap(fun({#shard{name=Name, node=N} = Shard, After}) -> QueryArgs1 = dreyfus_util:export(QueryArgs#index_query_args{ @@ -73,14 +76,16 @@ go(DbName, DDoc, IndexName, #index_query_args{}=QueryArgs) -> end end, Bookmark1), Counters = fabric_dict:init(Counters0, nil), + WorkerShards = fabric_dict:fetch_keys(Counters), + RingOpts = dreyfus_util:get_ring_opts(QueryArgs, WorkerShards), QueryArgs2 = QueryArgs#index_query_args{ bookmark = Bookmark1 }, - go(DbName, DDoc, IndexName, QueryArgs2, Counters, Bookmark1); + go(DbName, DDoc, IndexName, QueryArgs2, Counters, Bookmark1, RingOpts); go(DbName, DDoc, IndexName, OldArgs) -> go(DbName, DDoc, IndexName, dreyfus_util:upgrade(OldArgs)). -go(DbName, DDoc, IndexName, QueryArgs, Counters, Bookmark) -> +go(DbName, DDoc, IndexName, QueryArgs, Counters, Bookmark, RingOpts) -> {Workers, _} = lists:unzip(Counters), #index_query_args{ limit = Limit, @@ -94,7 +99,8 @@ go(DbName, DDoc, IndexName, QueryArgs, Counters, Bookmark) -> top_docs = #top_docs{total_hits=0,hits=[]}, counters = Counters, start_args = [DDoc, IndexName, QueryArgs], - replacements = Replacements + replacements = Replacements, + ring_opts = RingOpts }, RexiMon = fabric_util:create_monitors(Workers), try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, @@ -154,7 +160,7 @@ handle_message(Error, Worker, State0) -> State = upgrade_state(State0), case dreyfus_fabric:handle_error_message(Error, Worker, State#state.counters, State#state.replacements, - search, State#state.start_args) of + search, State#state.start_args, State#state.ring_opts) of {ok, Counters} -> {ok, State#state{counters=Counters}}; {new_refs, NewRefs, NewCounters, NewReplacements} -> |