summary refs log tree commit diff
diff options
context:
space:
mode:
author Robert Newson <rnewson@apache.org> 2018-09-05 12:30:31 +0100
committer Robert Newson <rnewson@apache.org> 2018-09-17 21:21:16 +0100
commit f5687960e2967f5c981717d3a5f2efbe7b360a91 (patch)
tree 63b34ba2f542bd288d8f4ee09f15a22f94003e05
parent 38adc669e4348c8560307881099c999613f081c6 (diff)
download couchdb-f5687960e2967f5c981717d3a5f2efbe7b360a91.tar.gz
Optimize skip for partitioned queries
'skip' is implemented efficiently at the worker level, but we've disabled it for clustered views because a query spans multiple shards and we cannot calculate the right skip value to pass to each worker. With a partitioned query this problem goes away: the skip value the query specifies is the right value for all workers, as they all hit the same shard range. This commit removes the old fix_skip_and_limit function from fabric_rpc and moves the logic up to the coordinators.
-rw-r--r-- src/fabric/src/fabric_rpc.erl 12
-rw-r--r-- src/fabric/src/fabric_view.erl 16
-rw-r--r-- src/fabric/src/fabric_view_all_docs.erl 5
-rw-r--r-- src/fabric/src/fabric_view_map.erl 5
-rw-r--r-- src/fabric/src/fabric_view_reduce.erl 5
5 files changed, 28 insertions, 15 deletions
diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl
index 60526f495..e538a9dbe 100644
--- a/src/fabric/src/fabric_rpc.erl
+++ b/src/fabric/src/fabric_rpc.erl
@@ -96,9 +96,8 @@ changes(DbName, Options, StartVector, DbOptions) ->
all_docs(DbName, Options, Args0) ->
case fabric_util:upgrade_mrargs(Args0) of
- #mrargs{keys=undefined} = Args1 ->
+ #mrargs{keys=undefined} = Args ->
set_io_priority(DbName, Options),
- Args = fix_skip_and_limit(Args1),
{ok, Db} = get_or_create_db(DbName, Options),
VAcc0 = #vacc{db=Db},
couch_mrview:query_all_docs(Db, Args, fun view_cb/2, VAcc0)
@@ -122,7 +121,7 @@ map_view(DbName, {DDocId, Rev}, ViewName, Args0, DbOptions) ->
map_view(DbName, DDoc, ViewName, Args0, DbOptions);
map_view(DbName, DDoc, ViewName, Args0, DbOptions) ->
set_io_priority(DbName, DbOptions),
- Args = fix_skip_and_limit(fabric_util:upgrade_mrargs(Args0)),
+ Args = fabric_util:upgrade_mrargs(Args0),
{ok, Db} = get_or_create_db(DbName, DbOptions),
VAcc0 = #vacc{db=Db},
couch_mrview:query_view(Db, DDoc, ViewName, Args, fun view_cb/2, VAcc0).
@@ -136,16 +135,11 @@ reduce_view(DbName, {DDocId, Rev}, ViewName, Args0, DbOptions) ->
reduce_view(DbName, DDoc, ViewName, Args0, DbOptions);
reduce_view(DbName, DDoc, ViewName, Args0, DbOptions) ->
set_io_priority(DbName, DbOptions),
- Args = fix_skip_and_limit(fabric_util:upgrade_mrargs(Args0)),
+ Args = fabric_util:upgrade_mrargs(Args0),
{ok, Db} = get_or_create_db(DbName, DbOptions),
VAcc0 = #vacc{db=Db},
couch_mrview:query_view(Db, DDoc, ViewName, Args, fun reduce_cb/2, VAcc0).
-fix_skip_and_limit(Args) ->
- #mrargs{skip=Skip, limit=Limit, extra=Extra}=Args,
- % the coordinator needs to finalize each row, so make sure the shards don't
- Args#mrargs{skip=0, limit=Skip+Limit, extra=[{finalizer,null} | Extra]}.
-
create_db(DbName) ->
create_db(DbName, []).
diff --git a/src/fabric/src/fabric_view.erl b/src/fabric/src/fabric_view.erl
index c0e29740d..de374cdec 100644
--- a/src/fabric/src/fabric_view.erl
+++ b/src/fabric/src/fabric_view.erl
@@ -16,6 +16,7 @@
transform_row/1, keydict/1, extract_view/4, get_shards/2,
check_down_shards/2, handle_worker_exit/3,
get_shard_replacements/2, maybe_update_others/5]).
+-export([fix_skip_and_limit/1]).
-include_lib("fabric/include/fabric.hrl").
-include_lib("mem3/include/mem3.hrl").
@@ -388,6 +389,21 @@ get_shard_replacements(DbName, UsedShards0) ->
end
end, [], UsedShards).
+-spec fix_skip_and_limit(#mrargs{}) -> {CoordArgs::#mrargs{}, WorkerArgs::#mrargs{}}.
+fix_skip_and_limit(#mrargs{} = Args) ->
+ {CoordArgs, WorkerArgs} = case couch_mrview_util:get_extra(Args, partition) of
+ undefined ->
+ #mrargs{skip=Skip, limit=Limit}=Args,
+ {Args, Args#mrargs{skip=0, limit=Skip+Limit}};
+ _Partition ->
+ {Args#mrargs{skip=0}, Args}
+ end,
+ %% the coordinator needs to finalize each row, so make sure the shards don't finalize them first
+ {CoordArgs, remove_finalizer(WorkerArgs)}.
+
+remove_finalizer(Args) ->
+ couch_mrview_util:set_extra(Args, finalizer, null).
+
% unit test
is_progress_possible_test() ->
EndPoint = 2 bsl 31,
diff --git a/src/fabric/src/fabric_view_all_docs.erl b/src/fabric/src/fabric_view_all_docs.erl
index b12bcded3..e8868c505 100644
--- a/src/fabric/src/fabric_view_all_docs.erl
+++ b/src/fabric/src/fabric_view_all_docs.erl
@@ -21,15 +21,16 @@
-include_lib("couch_mrview/include/couch_mrview.hrl").
go(DbName, Options, #mrargs{keys=undefined} = QueryArgs, Callback, Acc) ->
+ {CoordArgs, WorkerArgs} = fabric_view:fix_skip_and_limit(QueryArgs),
Shards = shards(DbName, QueryArgs),
Workers0 = fabric_util:submit_jobs(
- Shards, fabric_rpc, all_docs, [Options, QueryArgs]),
+ Shards, fabric_rpc, all_docs, [Options, WorkerArgs]),
RexiMon = fabric_util:create_monitors(Workers0),
try
case fabric_util:stream_start(Workers0, #shard.ref) of
{ok, Workers} ->
try
- go(DbName, Options, Workers, QueryArgs, Callback, Acc)
+ go(DbName, Options, Workers, CoordArgs, Callback, Acc)
after
fabric_util:cleanup(Workers)
end;
diff --git a/src/fabric/src/fabric_view_map.erl b/src/fabric/src/fabric_view_map.erl
index b6a3d6f83..85e9bec62 100644
--- a/src/fabric/src/fabric_view_map.erl
+++ b/src/fabric/src/fabric_view_map.erl
@@ -25,11 +25,12 @@ go(DbName, Options, GroupId, View, Args, Callback, Acc, VInfo)
go(DbName, Options, DDoc, View, Args, Callback, Acc, VInfo);
go(DbName, Options, DDoc, View, Args, Callback, Acc, VInfo) ->
+ {CoordArgs, WorkerArgs} = fabric_view:fix_skip_and_limit(Args),
Shards = fabric_view:get_shards(DbName, Args),
DocIdAndRev = fabric_util:doc_id_and_rev(DDoc),
fabric_view:maybe_update_others(DbName, DocIdAndRev, Shards, View, Args),
Repls = fabric_view:get_shard_replacements(DbName, Shards),
- RPCArgs = [DocIdAndRev, View, Args, Options],
+ RPCArgs = [DocIdAndRev, View, WorkerArgs, Options],
StartFun = fun(Shard) ->
hd(fabric_util:submit_jobs([Shard], fabric_rpc, map_view, RPCArgs))
end,
@@ -41,7 +42,7 @@ go(DbName, Options, DDoc, View, Args, Callback, Acc, VInfo) ->
Callback({error, ddoc_updated}, Acc);
{ok, Workers} ->
try
- go(DbName, Workers, VInfo, Args, Callback, Acc)
+ go(DbName, Workers, VInfo, CoordArgs, Callback, Acc)
after
fabric_util:cleanup(Workers)
end;
diff --git a/src/fabric/src/fabric_view_reduce.erl b/src/fabric/src/fabric_view_reduce.erl
index a74be1073..1ea4d1b95 100644
--- a/src/fabric/src/fabric_view_reduce.erl
+++ b/src/fabric/src/fabric_view_reduce.erl
@@ -24,8 +24,9 @@ go(DbName, GroupId, View, Args, Callback, Acc0, VInfo) when is_binary(GroupId) -
go(DbName, DDoc, View, Args, Callback, Acc0, VInfo);
go(DbName, DDoc, VName, Args, Callback, Acc, VInfo) ->
+ {CoordArgs, WorkerArgs} = fabric_view:fix_skip_and_limit(Args),
DocIdAndRev = fabric_util:doc_id_and_rev(DDoc),
- RPCArgs = [DocIdAndRev, VName, Args],
+ RPCArgs = [DocIdAndRev, VName, WorkerArgs],
Shards = fabric_view:get_shards(DbName, Args),
fabric_view:maybe_update_others(DbName, DocIdAndRev, Shards, VName, Args),
Repls = fabric_view:get_shard_replacements(DbName, Shards),
@@ -40,7 +41,7 @@ go(DbName, DDoc, VName, Args, Callback, Acc, VInfo) ->
Callback({error, ddoc_updated}, Acc);
{ok, Workers} ->
try
- go2(DbName, Workers, VInfo, Args, Callback, Acc)
+ go2(DbName, Workers, VInfo, CoordArgs, Callback, Acc)
after
fabric_util:cleanup(Workers)
end;