summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul J. Davis <paul.joseph.davis@gmail.com>2018-10-25 14:27:32 -0500
committerPaul J. Davis <paul.joseph.davis@gmail.com>2019-01-18 13:03:28 -0600
commitdc369e93f8b9e104fc2ab70da5e4a804a272eb49 (patch)
tree80ea5dc3ffb169485aefc5903571392907988d6d
parenteb1ffcf652611069600c92fecb3a7ac4f4f98ba8 (diff)
downloadcouchdb-dc369e93f8b9e104fc2ab70da5e4a804a272eb49.tar.gz
Optimize offset/limit for partition queries
Now that a single shard handles the entire response we can optimize work normally done in the coordinator by moving it to the RPC worker which then removes the need to send an extra `skip` number of rows to the coordinator. Co-authored-by: Robert Newson <rnewson@apache.org>
-rw-r--r--src/fabric/src/fabric_rpc.erl12
-rw-r--r--src/fabric/src/fabric_view.erl16
-rw-r--r--src/fabric/src/fabric_view_all_docs.erl5
-rw-r--r--src/fabric/src/fabric_view_map.erl5
-rw-r--r--src/fabric/src/fabric_view_reduce.erl7
5 files changed, 29 insertions, 16 deletions
diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl
index b80cc792e..97374be1f 100644
--- a/src/fabric/src/fabric_rpc.erl
+++ b/src/fabric/src/fabric_rpc.erl
@@ -118,9 +118,8 @@ do_changes(Db, StartSeq, Enum, Acc0, Opts) ->
all_docs(DbName, Options, Args0) ->
case fabric_util:upgrade_mrargs(Args0) of
- #mrargs{keys=undefined} = Args1 ->
+ #mrargs{keys=undefined} = Args ->
set_io_priority(DbName, Options),
- Args = fix_skip_and_limit(Args1),
{ok, Db} = get_or_create_db(DbName, Options),
CB = get_view_cb(Args),
couch_mrview:query_all_docs(Db, Args, CB, Args)
@@ -144,7 +143,7 @@ map_view(DbName, {DDocId, Rev}, ViewName, Args0, DbOptions) ->
map_view(DbName, DDoc, ViewName, Args0, DbOptions);
map_view(DbName, DDoc, ViewName, Args0, DbOptions) ->
set_io_priority(DbName, DbOptions),
- Args = fix_skip_and_limit(fabric_util:upgrade_mrargs(Args0)),
+ Args = fabric_util:upgrade_mrargs(Args0),
{ok, Db} = get_or_create_db(DbName, DbOptions),
CB = get_view_cb(Args),
couch_mrview:query_view(Db, DDoc, ViewName, Args, CB, Args).
@@ -158,16 +157,11 @@ reduce_view(DbName, {DDocId, Rev}, ViewName, Args0, DbOptions) ->
reduce_view(DbName, DDoc, ViewName, Args0, DbOptions);
reduce_view(DbName, DDoc, ViewName, Args0, DbOptions) ->
set_io_priority(DbName, DbOptions),
- Args = fix_skip_and_limit(fabric_util:upgrade_mrargs(Args0)),
+ Args = fabric_util:upgrade_mrargs(Args0),
{ok, Db} = get_or_create_db(DbName, DbOptions),
VAcc0 = #vacc{db=Db},
couch_mrview:query_view(Db, DDoc, ViewName, Args, fun reduce_cb/2, VAcc0).
-fix_skip_and_limit(Args) ->
- #mrargs{skip=Skip, limit=Limit, extra=Extra}=Args,
- % the coordinator needs to finalize each row, so make sure the shards don't
- Args#mrargs{skip=0, limit=Skip+Limit, extra=[{finalizer,null} | Extra]}.
-
create_db(DbName) ->
create_db(DbName, []).
diff --git a/src/fabric/src/fabric_view.erl b/src/fabric/src/fabric_view.erl
index 0ba980a65..27b0c275f 100644
--- a/src/fabric/src/fabric_view.erl
+++ b/src/fabric/src/fabric_view.erl
@@ -16,6 +16,7 @@
transform_row/1, keydict/1, extract_view/4, get_shards/2,
check_down_shards/2, handle_worker_exit/3,
get_shard_replacements/2, maybe_update_others/5]).
+-export([fix_skip_and_limit/1]).
-include_lib("fabric/include/fabric.hrl").
-include_lib("mem3/include/mem3.hrl").
@@ -375,6 +376,21 @@ get_shard_replacements(DbName, UsedShards0) ->
end
end, [], UsedShards).
+-spec fix_skip_and_limit(#mrargs{}) -> {CoordArgs::#mrargs{}, WorkerArgs::#mrargs{}}.
+fix_skip_and_limit(#mrargs{} = Args) ->
+ {CoordArgs, WorkerArgs} = case couch_mrview_util:get_extra(Args, partition) of
+ undefined ->
+ #mrargs{skip=Skip, limit=Limit}=Args,
+ {Args, Args#mrargs{skip=0, limit=Skip+Limit}};
+ _Partition ->
+ {Args#mrargs{skip=0}, Args}
+ end,
+ %% the coordinator needs to finalize each row, so make sure the shards don't
+ {CoordArgs, remove_finalizer(WorkerArgs)}.
+
+remove_finalizer(Args) ->
+ couch_mrview_util:set_extra(Args, finalizer, null).
+
% unit test
is_progress_possible_test() ->
EndPoint = 2 bsl 31,
diff --git a/src/fabric/src/fabric_view_all_docs.erl b/src/fabric/src/fabric_view_all_docs.erl
index fdc3bd988..4b412a683 100644
--- a/src/fabric/src/fabric_view_all_docs.erl
+++ b/src/fabric/src/fabric_view_all_docs.erl
@@ -21,16 +21,17 @@
-include_lib("couch_mrview/include/couch_mrview.hrl").
go(Db, Options, #mrargs{keys=undefined} = QueryArgs, Callback, Acc) ->
+ {CoordArgs, WorkerArgs} = fabric_view:fix_skip_and_limit(QueryArgs),
DbName = fabric:dbname(Db),
Shards = shards(Db, QueryArgs),
Workers0 = fabric_util:submit_jobs(
- Shards, fabric_rpc, all_docs, [Options, QueryArgs]),
+ Shards, fabric_rpc, all_docs, [Options, WorkerArgs]),
RexiMon = fabric_util:create_monitors(Workers0),
try
case fabric_streams:start(Workers0, #shard.ref) of
{ok, Workers} ->
try
- go(DbName, Options, Workers, QueryArgs, Callback, Acc)
+ go(DbName, Options, Workers, CoordArgs, Callback, Acc)
after
fabric_streams:cleanup(Workers)
end;
diff --git a/src/fabric/src/fabric_view_map.erl b/src/fabric/src/fabric_view_map.erl
index 0f5e8bb23..b3d768a51 100644
--- a/src/fabric/src/fabric_view_map.erl
+++ b/src/fabric/src/fabric_view_map.erl
@@ -27,10 +27,11 @@ go(DbName, Options, GroupId, View, Args, Callback, Acc, VInfo)
go(Db, Options, DDoc, View, Args, Callback, Acc, VInfo) ->
DbName = fabric:dbname(Db),
Shards = fabric_view:get_shards(Db, Args),
+ {CoordArgs, WorkerArgs} = fabric_view:fix_skip_and_limit(Args),
DocIdAndRev = fabric_util:doc_id_and_rev(DDoc),
fabric_view:maybe_update_others(DbName, DocIdAndRev, Shards, View, Args),
Repls = fabric_view:get_shard_replacements(DbName, Shards),
- RPCArgs = [DocIdAndRev, View, Args, Options],
+ RPCArgs = [DocIdAndRev, View, WorkerArgs, Options],
StartFun = fun(Shard) ->
hd(fabric_util:submit_jobs([Shard], fabric_rpc, map_view, RPCArgs))
end,
@@ -42,7 +43,7 @@ go(Db, Options, DDoc, View, Args, Callback, Acc, VInfo) ->
Callback({error, ddoc_updated}, Acc);
{ok, Workers} ->
try
- go(DbName, Workers, VInfo, Args, Callback, Acc)
+ go(DbName, Workers, VInfo, CoordArgs, Callback, Acc)
after
fabric_streams:cleanup(Workers)
end;
diff --git a/src/fabric/src/fabric_view_reduce.erl b/src/fabric/src/fabric_view_reduce.erl
index 84b9bba64..f52061a4c 100644
--- a/src/fabric/src/fabric_view_reduce.erl
+++ b/src/fabric/src/fabric_view_reduce.erl
@@ -25,9 +25,10 @@ go(DbName, GroupId, View, Args, Callback, Acc0, VInfo) when is_binary(GroupId) -
go(Db, DDoc, VName, Args, Callback, Acc, VInfo) ->
DbName = fabric:dbname(Db),
- DocIdAndRev = fabric_util:doc_id_and_rev(DDoc),
- RPCArgs = [DocIdAndRev, VName, Args],
Shards = fabric_view:get_shards(Db, Args),
+ {CoordArgs, WorkerArgs} = fabric_view:fix_skip_and_limit(Args),
+ DocIdAndRev = fabric_util:doc_id_and_rev(DDoc),
+ RPCArgs = [DocIdAndRev, VName, WorkerArgs],
fabric_view:maybe_update_others(DbName, DocIdAndRev, Shards, VName, Args),
Repls = fabric_view:get_shard_replacements(DbName, Shards),
StartFun = fun(Shard) ->
@@ -41,7 +42,7 @@ go(Db, DDoc, VName, Args, Callback, Acc, VInfo) ->
Callback({error, ddoc_updated}, Acc);
{ok, Workers} ->
try
- go2(DbName, Workers, VInfo, Args, Callback, Acc)
+ go2(DbName, Workers, VInfo, CoordArgs, Callback, Acc)
after
fabric_streams:cleanup(Workers)
end;