diff options
author | Robert Newson <rnewson@apache.org> | 2018-09-05 12:30:31 +0100 |
---|---|---|
committer | Robert Newson <rnewson@apache.org> | 2018-09-17 21:21:16 +0100 |
commit | f5687960e2967f5c981717d3a5f2efbe7b360a91 (patch) | |
tree | 63b34ba2f542bd288d8f4ee09f15a22f94003e05 | |
parent | 38adc669e4348c8560307881099c999613f081c6 (diff) | |
download | couchdb-f5687960e2967f5c981717d3a5f2efbe7b360a91.tar.gz |
Optimize skip for partitioned queries
'skip' is implemented efficiently at the worker level but we've
disabled it for clustered views because of the multiple shards (and
not being able to calculate the right skip value to pass to each
worker). With a partitioned query, this problem is gone, as the value
the query specifies will be the right value for all workers (as they
hit the same shard range).
This commit removes the old fix_skip_and_limit function from
fabric_rpc and moves the logic up to the coordinators.
-rw-r--r-- | src/fabric/src/fabric_rpc.erl | 12 | ||||
-rw-r--r-- | src/fabric/src/fabric_view.erl | 16 | ||||
-rw-r--r-- | src/fabric/src/fabric_view_all_docs.erl | 5 | ||||
-rw-r--r-- | src/fabric/src/fabric_view_map.erl | 5 | ||||
-rw-r--r-- | src/fabric/src/fabric_view_reduce.erl | 5 |
5 files changed, 28 insertions, 15 deletions
diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl index 60526f495..e538a9dbe 100644 --- a/src/fabric/src/fabric_rpc.erl +++ b/src/fabric/src/fabric_rpc.erl @@ -96,9 +96,8 @@ changes(DbName, Options, StartVector, DbOptions) -> all_docs(DbName, Options, Args0) -> case fabric_util:upgrade_mrargs(Args0) of - #mrargs{keys=undefined} = Args1 -> + #mrargs{keys=undefined} = Args -> set_io_priority(DbName, Options), - Args = fix_skip_and_limit(Args1), {ok, Db} = get_or_create_db(DbName, Options), VAcc0 = #vacc{db=Db}, couch_mrview:query_all_docs(Db, Args, fun view_cb/2, VAcc0) @@ -122,7 +121,7 @@ map_view(DbName, {DDocId, Rev}, ViewName, Args0, DbOptions) -> map_view(DbName, DDoc, ViewName, Args0, DbOptions); map_view(DbName, DDoc, ViewName, Args0, DbOptions) -> set_io_priority(DbName, DbOptions), - Args = fix_skip_and_limit(fabric_util:upgrade_mrargs(Args0)), + Args = fabric_util:upgrade_mrargs(Args0), {ok, Db} = get_or_create_db(DbName, DbOptions), VAcc0 = #vacc{db=Db}, couch_mrview:query_view(Db, DDoc, ViewName, Args, fun view_cb/2, VAcc0). @@ -136,16 +135,11 @@ reduce_view(DbName, {DDocId, Rev}, ViewName, Args0, DbOptions) -> reduce_view(DbName, DDoc, ViewName, Args0, DbOptions); reduce_view(DbName, DDoc, ViewName, Args0, DbOptions) -> set_io_priority(DbName, DbOptions), - Args = fix_skip_and_limit(fabric_util:upgrade_mrargs(Args0)), + Args = fabric_util:upgrade_mrargs(Args0), {ok, Db} = get_or_create_db(DbName, DbOptions), VAcc0 = #vacc{db=Db}, couch_mrview:query_view(Db, DDoc, ViewName, Args, fun reduce_cb/2, VAcc0). -fix_skip_and_limit(Args) -> - #mrargs{skip=Skip, limit=Limit, extra=Extra}=Args, - % the coordinator needs to finalize each row, so make sure the shards don't - Args#mrargs{skip=0, limit=Skip+Limit, extra=[{finalizer,null} | Extra]}. - create_db(DbName) -> create_db(DbName, []). diff --git a/src/fabric/src/fabric_view.erl b/src/fabric/src/fabric_view.erl index c0e29740d..de374cdec 100644 --- a/src/fabric/src/fabric_view.erl +++ b/src/fabric/src/fabric_view.erl @@ -16,6 +16,7 @@ transform_row/1, keydict/1, extract_view/4, get_shards/2, check_down_shards/2, handle_worker_exit/3, get_shard_replacements/2, maybe_update_others/5]). +-export([fix_skip_and_limit/1]). -include_lib("fabric/include/fabric.hrl"). -include_lib("mem3/include/mem3.hrl"). @@ -388,6 +389,21 @@ get_shard_replacements(DbName, UsedShards0) -> end end, [], UsedShards). +-spec fix_skip_and_limit(#mrargs{}) -> {CoordArgs::#mrargs{}, WorkerArgs::#mrargs{}}. +fix_skip_and_limit(#mrargs{} = Args) -> + {CoordArgs, WorkerArgs} = case couch_mrview_util:get_extra(Args, partition) of + undefined -> + #mrargs{skip=Skip, limit=Limit}=Args, + {Args, Args#mrargs{skip=0, limit=Skip+Limit}}; + _Partition -> + {Args#mrargs{skip=0}, Args} + end, + %% the coordinator needs to finalize each row, so make sure the shards don't + {CoordArgs, remove_finalizer(WorkerArgs)}. + +remove_finalizer(Args) -> + couch_mrview_util:set_extra(Args, finalizer, null). + % unit test is_progress_possible_test() -> EndPoint = 2 bsl 31, diff --git a/src/fabric/src/fabric_view_all_docs.erl b/src/fabric/src/fabric_view_all_docs.erl index b12bcded3..e8868c505 100644 --- a/src/fabric/src/fabric_view_all_docs.erl +++ b/src/fabric/src/fabric_view_all_docs.erl @@ -21,15 +21,16 @@ -include_lib("couch_mrview/include/couch_mrview.hrl"). go(DbName, Options, #mrargs{keys=undefined} = QueryArgs, Callback, Acc) -> + {CoordArgs, WorkerArgs} = fabric_view:fix_skip_and_limit(QueryArgs), Shards = shards(DbName, QueryArgs), Workers0 = fabric_util:submit_jobs( - Shards, fabric_rpc, all_docs, [Options, QueryArgs]), + Shards, fabric_rpc, all_docs, [Options, WorkerArgs]), RexiMon = fabric_util:create_monitors(Workers0), try case fabric_util:stream_start(Workers0, #shard.ref) of {ok, Workers} -> try - go(DbName, Options, Workers, QueryArgs, Callback, Acc) + go(DbName, Options, Workers, CoordArgs, Callback, Acc) after fabric_util:cleanup(Workers) end; diff --git a/src/fabric/src/fabric_view_map.erl b/src/fabric/src/fabric_view_map.erl index b6a3d6f83..85e9bec62 100644 --- a/src/fabric/src/fabric_view_map.erl +++ b/src/fabric/src/fabric_view_map.erl @@ -25,11 +25,12 @@ go(DbName, Options, GroupId, View, Args, Callback, Acc, VInfo) go(DbName, Options, DDoc, View, Args, Callback, Acc, VInfo); go(DbName, Options, DDoc, View, Args, Callback, Acc, VInfo) -> + {CoordArgs, WorkerArgs} = fabric_view:fix_skip_and_limit(Args), Shards = fabric_view:get_shards(DbName, Args), DocIdAndRev = fabric_util:doc_id_and_rev(DDoc), fabric_view:maybe_update_others(DbName, DocIdAndRev, Shards, View, Args), Repls = fabric_view:get_shard_replacements(DbName, Shards), - RPCArgs = [DocIdAndRev, View, Args, Options], + RPCArgs = [DocIdAndRev, View, WorkerArgs, Options], StartFun = fun(Shard) -> hd(fabric_util:submit_jobs([Shard], fabric_rpc, map_view, RPCArgs)) end, @@ -41,7 +42,7 @@ go(DbName, Options, DDoc, View, Args, Callback, Acc, VInfo) -> Callback({error, ddoc_updated}, Acc); {ok, Workers} -> try - go(DbName, Workers, VInfo, Args, Callback, Acc) + go(DbName, Workers, VInfo, CoordArgs, Callback, Acc) after fabric_util:cleanup(Workers) end; diff --git a/src/fabric/src/fabric_view_reduce.erl b/src/fabric/src/fabric_view_reduce.erl index a74be1073..1ea4d1b95 100644 --- a/src/fabric/src/fabric_view_reduce.erl +++ b/src/fabric/src/fabric_view_reduce.erl @@ -24,8 +24,9 @@ go(DbName, GroupId, View, Args, Callback, Acc0, VInfo) when is_binary(GroupId) - go(DbName, DDoc, View, Args, Callback, Acc0, VInfo); go(DbName, DDoc, VName, Args, Callback, Acc, VInfo) -> + {CoordArgs, WorkerArgs} = fabric_view:fix_skip_and_limit(Args), DocIdAndRev = fabric_util:doc_id_and_rev(DDoc), - RPCArgs = [DocIdAndRev, VName, Args], + RPCArgs = [DocIdAndRev, VName, WorkerArgs], Shards = fabric_view:get_shards(DbName, Args), fabric_view:maybe_update_others(DbName, DocIdAndRev, Shards, VName, Args), Repls = fabric_view:get_shard_replacements(DbName, Shards), @@ -40,7 +41,7 @@ go(DbName, DDoc, VName, Args, Callback, Acc, VInfo) -> Callback({error, ddoc_updated}, Acc); {ok, Workers} -> try - go2(DbName, Workers, VInfo, Args, Callback, Acc) + go2(DbName, Workers, VInfo, CoordArgs, Callback, Acc) after fabric_util:cleanup(Workers) end; |