summaryrefslogtreecommitdiff
path: root/src/dreyfus/src/dreyfus_fabric_search.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/dreyfus/src/dreyfus_fabric_search.erl')
-rw-r--r--src/dreyfus/src/dreyfus_fabric_search.erl18
1 files changed, 12 insertions, 6 deletions
diff --git a/src/dreyfus/src/dreyfus_fabric_search.erl b/src/dreyfus/src/dreyfus_fabric_search.erl
index acf7a83ec..c0ebde1d6 100644
--- a/src/dreyfus/src/dreyfus_fabric_search.erl
+++ b/src/dreyfus/src/dreyfus_fabric_search.erl
@@ -27,7 +27,8 @@
top_docs,
counters,
start_args,
- replacements
+ replacements,
+ ring_opts
}).
go(DbName, GroupId, IndexName, QueryArgs) when is_binary(GroupId) ->
@@ -40,10 +41,11 @@ go(DbName, DDoc, IndexName, #index_query_args{bookmark=nil}=QueryArgs) ->
DesignName = dreyfus_util:get_design_docid(DDoc),
dreyfus_util:maybe_deny_index(DbName, DesignName, IndexName),
Shards = dreyfus_util:get_shards(DbName, QueryArgs),
+ RingOpts = dreyfus_util:get_ring_opts(QueryArgs, Shards),
Workers = fabric_util:submit_jobs(Shards, dreyfus_rpc, search,
[DDoc, IndexName, dreyfus_util:export(QueryArgs)]),
Counters = fabric_dict:init(Workers, nil),
- go(DbName, DDoc, IndexName, QueryArgs, Counters, Counters);
+ go(DbName, DDoc, IndexName, QueryArgs, Counters, Counters, RingOpts);
go(DbName, DDoc, IndexName, #index_query_args{}=QueryArgs) ->
Bookmark0 = try dreyfus_bookmark:unpack(DbName, QueryArgs)
@@ -54,6 +56,7 @@ go(DbName, DDoc, IndexName, #index_query_args{}=QueryArgs) ->
Shards = dreyfus_util:get_shards(DbName, QueryArgs),
LiveNodes = [node() | nodes()],
LiveShards = [S || #shard{node=Node} = S <- Shards, lists:member(Node, LiveNodes)],
+ RingOpts = dreyful_util:get_ring_opts(QueryArgs, LiveShards),
Bookmark1 = dreyfus_bookmark:add_missing_shards(Bookmark0, LiveShards),
Counters0 = lists:flatmap(fun({#shard{name=Name, node=N} = Shard, After}) ->
QueryArgs1 = dreyfus_util:export(QueryArgs#index_query_args{
@@ -73,14 +76,16 @@ go(DbName, DDoc, IndexName, #index_query_args{}=QueryArgs) ->
end
end, Bookmark1),
Counters = fabric_dict:init(Counters0, nil),
+ WorkerShards = fabric_dict:fetch_keys(Counters),
+ RingOpts = dreyfus_util:get_ring_opts(QueryArgs, WorkerShards),
QueryArgs2 = QueryArgs#index_query_args{
bookmark = Bookmark1
},
- go(DbName, DDoc, IndexName, QueryArgs2, Counters, Bookmark1);
+ go(DbName, DDoc, IndexName, QueryArgs2, Counters, Bookmark1, RingOpts);
go(DbName, DDoc, IndexName, OldArgs) ->
go(DbName, DDoc, IndexName, dreyfus_util:upgrade(OldArgs)).
-go(DbName, DDoc, IndexName, QueryArgs, Counters, Bookmark) ->
+go(DbName, DDoc, IndexName, QueryArgs, Counters, Bookmark, RingOpts) ->
{Workers, _} = lists:unzip(Counters),
#index_query_args{
limit = Limit,
@@ -94,7 +99,8 @@ go(DbName, DDoc, IndexName, QueryArgs, Counters, Bookmark) ->
top_docs = #top_docs{total_hits=0,hits=[]},
counters = Counters,
start_args = [DDoc, IndexName, QueryArgs],
- replacements = Replacements
+ replacements = Replacements,
+ ring_opts = RingOpts
},
RexiMon = fabric_util:create_monitors(Workers),
try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3,
@@ -154,7 +160,7 @@ handle_message(Error, Worker, State0) ->
State = upgrade_state(State0),
case dreyfus_fabric:handle_error_message(Error, Worker,
State#state.counters, State#state.replacements,
- search, State#state.start_args) of
+ search, State#state.start_args, State#state.ring_opts) of
{ok, Counters} ->
{ok, State#state{counters=Counters}};
{new_refs, NewRefs, NewCounters, NewReplacements} ->