summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAdam Kocoloski <kocolosk@apache.org>2018-06-06 22:08:40 -0400
committerAdam Kocoloski <kocolosk@apache.org>2018-06-08 16:36:30 -0400
commit0392c51dd67210bc70fb9f3ce8b2f191d3ad63ca (patch)
tree278fe94a628ab78f734a111b06abffe3dc21bcc4
parent8a46473d4b6f2a38c97906a22005af00ee41a19a (diff)
downloadcouchdb-0392c51dd67210bc70fb9f3ce8b2f191d3ad63ca.tar.gz
Finalize in couch_mrview, but make it optional
If we're in a cluster, finalization runs at the coordinator. Otherwise, couch_mrview can run it directly to simplify things for consumers.
-rw-r--r--src/couch_mrview/src/couch_mrview.erl28
-rw-r--r--src/fabric/src/fabric_rpc.erl5
2 files changed, 26 insertions, 7 deletions
diff --git a/src/couch_mrview/src/couch_mrview.erl b/src/couch_mrview/src/couch_mrview.erl
index b417aac52..82bbd7928 100644
--- a/src/couch_mrview/src/couch_mrview.erl
+++ b/src/couch_mrview/src/couch_mrview.erl
@@ -41,6 +41,7 @@
user_acc,
last_go=ok,
reduce_fun,
+ finalizer,
update_seq,
args
}).
@@ -579,7 +580,14 @@ map_fold(#doc{id = <<"_local/", _/binary>>} = Doc, _Offset, #mracc{} = Acc) ->
last_go=Go
}}.
-red_fold(Db, {_Nth, _Lang, View}=RedView, Args, Callback, UAcc) ->
+red_fold(Db, {NthRed, _Lang, View}=RedView, Args, Callback, UAcc) ->
+ Finalizer = case couch_util:get_value(finalizer, Args#mrargs.extra) of
+ undefined ->
+ {_, FunSrc} = lists:nth(NthRed, View#mrview.reduce_funs),
+ FunSrc;
+ CustomFun->
+ CustomFun
+ end,
Acc = #mracc{
db=Db,
total_rows=null,
@@ -589,6 +597,7 @@ red_fold(Db, {_Nth, _Lang, View}=RedView, Args, Callback, UAcc) ->
callback=Callback,
user_acc=UAcc,
update_seq=View#mrview.update_seq,
+ finalizer=Finalizer,
args=Args
},
Grouping = {key_group_level, Args#mrargs.group_level},
@@ -620,41 +629,50 @@ red_fold(_Key, _Red, #mracc{limit=0} = Acc) ->
{stop, Acc};
red_fold(_Key, Red, #mracc{group_level=0} = Acc) ->
#mracc{
+ finalizer=Finalizer,
limit=Limit,
callback=Callback,
user_acc=UAcc0
} = Acc,
- Row = [{key, null}, {value, Red}],
+ Row = [{key, null}, {value, maybe_finalize(Red, Finalizer)}],
{Go, UAcc1} = Callback({row, Row}, UAcc0),
{Go, Acc#mracc{user_acc=UAcc1, limit=Limit-1, last_go=Go}};
red_fold(Key, Red, #mracc{group_level=exact} = Acc) ->
#mracc{
+ finalizer=Finalizer,
limit=Limit,
callback=Callback,
user_acc=UAcc0
} = Acc,
- Row = [{key, Key}, {value, Red}],
+ Row = [{key, Key}, {value, maybe_finalize(Red, Finalizer)}],
{Go, UAcc1} = Callback({row, Row}, UAcc0),
{Go, Acc#mracc{user_acc=UAcc1, limit=Limit-1, last_go=Go}};
red_fold(K, Red, #mracc{group_level=I} = Acc) when I > 0, is_list(K) ->
#mracc{
+ finalizer=Finalizer,
limit=Limit,
callback=Callback,
user_acc=UAcc0
} = Acc,
- Row = [{key, lists:sublist(K, I)}, {value, Red}],
+ Row = [{key, lists:sublist(K, I)}, {value, maybe_finalize(Red, Finalizer)}],
{Go, UAcc1} = Callback({row, Row}, UAcc0),
{Go, Acc#mracc{user_acc=UAcc1, limit=Limit-1, last_go=Go}};
red_fold(K, Red, #mracc{group_level=I} = Acc) when I > 0 ->
#mracc{
+ finalizer=Finalizer,
limit=Limit,
callback=Callback,
user_acc=UAcc0
} = Acc,
- Row = [{key, K}, {value, Red}],
+ Row = [{key, K}, {value, maybe_finalize(Red, Finalizer)}],
{Go, UAcc1} = Callback({row, Row}, UAcc0),
{Go, Acc#mracc{user_acc=UAcc1, limit=Limit-1, last_go=Go}}.
+maybe_finalize(Red, null) ->
+ Red;
+maybe_finalize(Red, RedSrc) ->
+ {ok, Finalized} = couch_query_servers:finalize(RedSrc, Red),
+ Finalized.
finish_fold(#mracc{last_go=ok, update_seq=UpdateSeq}=Acc, ExtraMeta) ->
#mracc{callback=Callback, user_acc=UAcc, args=Args}=Acc,
diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl
index 4a69e7ea1..913aafe0e 100644
--- a/src/fabric/src/fabric_rpc.erl
+++ b/src/fabric/src/fabric_rpc.erl
@@ -142,8 +142,9 @@ reduce_view(DbName, DDoc, ViewName, Args0, DbOptions) ->
couch_mrview:query_view(Db, DDoc, ViewName, Args, fun reduce_cb/2, VAcc0).
fix_skip_and_limit(Args) ->
- #mrargs{skip=Skip, limit=Limit}=Args,
- Args#mrargs{skip=0, limit=Skip+Limit}.
+ #mrargs{skip=Skip, limit=Limit, extra=Extra}=Args,
+ % the coordinator needs to finalize each row, so make sure the shards don't
+ Args#mrargs{skip=0, limit=Skip+Limit, extra=[{finalizer,null} | Extra]}.
create_db(DbName) ->
create_db(DbName, []).