diff options
author | Paul J. Davis <paul.joseph.davis@gmail.com> | 2018-10-25 14:27:32 -0500 |
---|---|---|
committer | Paul J. Davis <paul.joseph.davis@gmail.com> | 2019-01-18 13:03:28 -0600 |
commit | dc369e93f8b9e104fc2ab70da5e4a804a272eb49 (patch) | |
tree | 80ea5dc3ffb169485aefc5903571392907988d6d | |
parent | eb1ffcf652611069600c92fecb3a7ac4f4f98ba8 (diff) | |
download | couchdb-dc369e93f8b9e104fc2ab70da5e4a804a272eb49.tar.gz |
Optimize offset/limit for partition queries
Now that a single shard handles the entire response for a partitioned
query, we can optimize work normally done in the coordinator by moving it
to the RPC worker. This removes the need to send an extra `skip` number
of rows to the coordinator.
Co-authored-by: Robert Newson <rnewson@apache.org>
-rw-r--r-- | src/fabric/src/fabric_rpc.erl | 12 | ||||
-rw-r--r-- | src/fabric/src/fabric_view.erl | 16 | ||||
-rw-r--r-- | src/fabric/src/fabric_view_all_docs.erl | 5 | ||||
-rw-r--r-- | src/fabric/src/fabric_view_map.erl | 5 | ||||
-rw-r--r-- | src/fabric/src/fabric_view_reduce.erl | 7 |
5 files changed, 29 insertions, 16 deletions
diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl index b80cc792e..97374be1f 100644 --- a/src/fabric/src/fabric_rpc.erl +++ b/src/fabric/src/fabric_rpc.erl @@ -118,9 +118,8 @@ do_changes(Db, StartSeq, Enum, Acc0, Opts) -> all_docs(DbName, Options, Args0) -> case fabric_util:upgrade_mrargs(Args0) of - #mrargs{keys=undefined} = Args1 -> + #mrargs{keys=undefined} = Args -> set_io_priority(DbName, Options), - Args = fix_skip_and_limit(Args1), {ok, Db} = get_or_create_db(DbName, Options), CB = get_view_cb(Args), couch_mrview:query_all_docs(Db, Args, CB, Args) @@ -144,7 +143,7 @@ map_view(DbName, {DDocId, Rev}, ViewName, Args0, DbOptions) -> map_view(DbName, DDoc, ViewName, Args0, DbOptions); map_view(DbName, DDoc, ViewName, Args0, DbOptions) -> set_io_priority(DbName, DbOptions), - Args = fix_skip_and_limit(fabric_util:upgrade_mrargs(Args0)), + Args = fabric_util:upgrade_mrargs(Args0), {ok, Db} = get_or_create_db(DbName, DbOptions), CB = get_view_cb(Args), couch_mrview:query_view(Db, DDoc, ViewName, Args, CB, Args). @@ -158,16 +157,11 @@ reduce_view(DbName, {DDocId, Rev}, ViewName, Args0, DbOptions) -> reduce_view(DbName, DDoc, ViewName, Args0, DbOptions); reduce_view(DbName, DDoc, ViewName, Args0, DbOptions) -> set_io_priority(DbName, DbOptions), - Args = fix_skip_and_limit(fabric_util:upgrade_mrargs(Args0)), + Args = fabric_util:upgrade_mrargs(Args0), {ok, Db} = get_or_create_db(DbName, DbOptions), VAcc0 = #vacc{db=Db}, couch_mrview:query_view(Db, DDoc, ViewName, Args, fun reduce_cb/2, VAcc0). -fix_skip_and_limit(Args) -> - #mrargs{skip=Skip, limit=Limit, extra=Extra}=Args, - % the coordinator needs to finalize each row, so make sure the shards don't - Args#mrargs{skip=0, limit=Skip+Limit, extra=[{finalizer,null} | Extra]}. - create_db(DbName) -> create_db(DbName, []). 
diff --git a/src/fabric/src/fabric_view.erl b/src/fabric/src/fabric_view.erl index 0ba980a65..27b0c275f 100644 --- a/src/fabric/src/fabric_view.erl +++ b/src/fabric/src/fabric_view.erl @@ -16,6 +16,7 @@ transform_row/1, keydict/1, extract_view/4, get_shards/2, check_down_shards/2, handle_worker_exit/3, get_shard_replacements/2, maybe_update_others/5]). +-export([fix_skip_and_limit/1]). -include_lib("fabric/include/fabric.hrl"). -include_lib("mem3/include/mem3.hrl"). @@ -375,6 +376,21 @@ get_shard_replacements(DbName, UsedShards0) -> end end, [], UsedShards). +-spec fix_skip_and_limit(#mrargs{}) -> {CoordArgs::#mrargs{}, WorkerArgs::#mrargs{}}. +fix_skip_and_limit(#mrargs{} = Args) -> + {CoordArgs, WorkerArgs} = case couch_mrview_util:get_extra(Args, partition) of + undefined -> + #mrargs{skip=Skip, limit=Limit}=Args, + {Args, Args#mrargs{skip=0, limit=Skip+Limit}}; + _Partition -> + {Args#mrargs{skip=0}, Args} + end, + %% the coordinator needs to finalize each row, so make sure the shards don't + {CoordArgs, remove_finalizer(WorkerArgs)}. + +remove_finalizer(Args) -> + couch_mrview_util:set_extra(Args, finalizer, null). + % unit test is_progress_possible_test() -> EndPoint = 2 bsl 31, diff --git a/src/fabric/src/fabric_view_all_docs.erl b/src/fabric/src/fabric_view_all_docs.erl index fdc3bd988..4b412a683 100644 --- a/src/fabric/src/fabric_view_all_docs.erl +++ b/src/fabric/src/fabric_view_all_docs.erl @@ -21,16 +21,17 @@ -include_lib("couch_mrview/include/couch_mrview.hrl"). 
go(Db, Options, #mrargs{keys=undefined} = QueryArgs, Callback, Acc) -> + {CoordArgs, WorkerArgs} = fabric_view:fix_skip_and_limit(QueryArgs), DbName = fabric:dbname(Db), Shards = shards(Db, QueryArgs), Workers0 = fabric_util:submit_jobs( - Shards, fabric_rpc, all_docs, [Options, QueryArgs]), + Shards, fabric_rpc, all_docs, [Options, WorkerArgs]), RexiMon = fabric_util:create_monitors(Workers0), try case fabric_streams:start(Workers0, #shard.ref) of {ok, Workers} -> try - go(DbName, Options, Workers, QueryArgs, Callback, Acc) + go(DbName, Options, Workers, CoordArgs, Callback, Acc) after fabric_streams:cleanup(Workers) end; diff --git a/src/fabric/src/fabric_view_map.erl b/src/fabric/src/fabric_view_map.erl index 0f5e8bb23..b3d768a51 100644 --- a/src/fabric/src/fabric_view_map.erl +++ b/src/fabric/src/fabric_view_map.erl @@ -27,10 +27,11 @@ go(DbName, Options, GroupId, View, Args, Callback, Acc, VInfo) go(Db, Options, DDoc, View, Args, Callback, Acc, VInfo) -> DbName = fabric:dbname(Db), Shards = fabric_view:get_shards(Db, Args), + {CoordArgs, WorkerArgs} = fabric_view:fix_skip_and_limit(Args), DocIdAndRev = fabric_util:doc_id_and_rev(DDoc), fabric_view:maybe_update_others(DbName, DocIdAndRev, Shards, View, Args), Repls = fabric_view:get_shard_replacements(DbName, Shards), - RPCArgs = [DocIdAndRev, View, Args, Options], + RPCArgs = [DocIdAndRev, View, WorkerArgs, Options], StartFun = fun(Shard) -> hd(fabric_util:submit_jobs([Shard], fabric_rpc, map_view, RPCArgs)) end, @@ -42,7 +43,7 @@ go(Db, Options, DDoc, View, Args, Callback, Acc, VInfo) -> Callback({error, ddoc_updated}, Acc); {ok, Workers} -> try - go(DbName, Workers, VInfo, Args, Callback, Acc) + go(DbName, Workers, VInfo, CoordArgs, Callback, Acc) after fabric_streams:cleanup(Workers) end; diff --git a/src/fabric/src/fabric_view_reduce.erl b/src/fabric/src/fabric_view_reduce.erl index 84b9bba64..f52061a4c 100644 --- a/src/fabric/src/fabric_view_reduce.erl +++ b/src/fabric/src/fabric_view_reduce.erl @@ -25,9 
+25,10 @@ go(DbName, GroupId, View, Args, Callback, Acc0, VInfo) when is_binary(GroupId) - go(Db, DDoc, VName, Args, Callback, Acc, VInfo) -> DbName = fabric:dbname(Db), - DocIdAndRev = fabric_util:doc_id_and_rev(DDoc), - RPCArgs = [DocIdAndRev, VName, Args], Shards = fabric_view:get_shards(Db, Args), + {CoordArgs, WorkerArgs} = fabric_view:fix_skip_and_limit(Args), + DocIdAndRev = fabric_util:doc_id_and_rev(DDoc), + RPCArgs = [DocIdAndRev, VName, WorkerArgs], fabric_view:maybe_update_others(DbName, DocIdAndRev, Shards, VName, Args), Repls = fabric_view:get_shard_replacements(DbName, Shards), StartFun = fun(Shard) -> @@ -41,7 +42,7 @@ go(Db, DDoc, VName, Args, Callback, Acc, VInfo) -> Callback({error, ddoc_updated}, Acc); {ok, Workers} -> try - go2(DbName, Workers, VInfo, Args, Callback, Acc) + go2(DbName, Workers, VInfo, CoordArgs, Callback, Acc) after fabric_streams:cleanup(Workers) end; |