diff options
Diffstat (limited to 'src/fabric/src/fabric_view_reduce.erl')
-rw-r--r-- | src/fabric/src/fabric_view_reduce.erl | 116 |
1 files changed, 66 insertions, 50 deletions
diff --git a/src/fabric/src/fabric_view_reduce.erl b/src/fabric/src/fabric_view_reduce.erl index a432b2cd5..600c8d01a 100644 --- a/src/fabric/src/fabric_view_reduce.erl +++ b/src/fabric/src/fabric_view_reduce.erl @@ -22,7 +22,6 @@ go(DbName, GroupId, View, Args, Callback, Acc0, VInfo) when is_binary(GroupId) -> {ok, DDoc} = fabric:open_doc(DbName, <<"_design/", GroupId/binary>>, []), go(DbName, DDoc, View, Args, Callback, Acc0, VInfo); - go(Db, DDoc, VName, Args, Callback, Acc, VInfo) -> DbName = fabric:dbname(Db), {Shards, RingOpts} = fabric_view:get_shards(Db, Args), @@ -34,11 +33,18 @@ go(Db, DDoc, VName, Args, Callback, Acc, VInfo) -> StartFun = fun(Shard) -> hd(fabric_util:submit_jobs([Shard], fabric_rpc, reduce_view, RPCArgs)) end, - Workers0 = fabric_util:submit_jobs(Shards,fabric_rpc,reduce_view,RPCArgs), + Workers0 = fabric_util:submit_jobs(Shards, fabric_rpc, reduce_view, RPCArgs), RexiMon = fabric_util:create_monitors(Workers0), try - case fabric_streams:start(Workers0, #shard.ref, StartFun, Repls, - RingOpts) of + case + fabric_streams:start( + Workers0, + #shard.ref, + StartFun, + Repls, + RingOpts + ) + of {ok, ddoc_updated} -> Callback({error, ddoc_updated}, Acc); {ok, Workers} -> @@ -64,13 +70,14 @@ go(Db, DDoc, VName, Args, Callback, Acc, VInfo) -> rexi_monitor:stop(RexiMon) end. -go2(DbName, Workers, {red, {_, Lang, View}, _}=VInfo, Args, Callback, Acc0) -> +go2(DbName, Workers, {red, {_, Lang, View}, _} = VInfo, Args, Callback, Acc0) -> #mrargs{limit = Limit, skip = Skip, keys = Keys, update_seq = UpdateSeq} = Args, RedSrc = couch_mrview_util:extract_view_reduce(VInfo), - OsProc = case os_proc_needed(RedSrc) of - true -> couch_query_servers:get_os_process(Lang); - _ -> nil - end, + OsProc = + case os_proc_needed(RedSrc) of + true -> couch_query_servers:get_os_process(Lang); + _ -> nil + end, State = #collector{ db_name = DbName, query_args = Args, @@ -85,28 +92,39 @@ go2(DbName, Workers, {red, {_, Lang, View}, _}=VInfo, Args, Callback, Acc0) -> collation = couch_util:get_value(<<"collation">>, View#mrview.options), rows = dict:new(), user_acc = Acc0, - update_seq = case UpdateSeq of true -> []; false -> nil end + update_seq = + case UpdateSeq of + true -> []; + false -> nil + end }, - try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, - State, fabric_util:view_timeout(Args), 1000 * 60 * 60) of - {ok, NewState} -> - {ok, NewState#collector.user_acc}; - {timeout, NewState} -> - Callback({error, timeout}, NewState#collector.user_acc); - {error, Resp} -> - {ok, Resp} + try + rexi_utils:recv( + Workers, + #shard.ref, + fun handle_message/3, + State, + fabric_util:view_timeout(Args), + 1000 * 60 * 60 + ) + of + {ok, NewState} -> + {ok, NewState#collector.user_acc}; + {timeout, NewState} -> + Callback({error, timeout}, NewState#collector.user_acc); + {error, Resp} -> + {ok, Resp} after - if OsProc == nil -> ok; true -> - catch couch_query_servers:ret_os_process(OsProc) + if + OsProc == nil -> ok; + true -> catch couch_query_servers:ret_os_process(OsProc) end end. handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _, State) -> fabric_view:check_down_shards(State, NodeRef); - handle_message({rexi_EXIT, Reason}, Worker, State) -> fabric_view:handle_worker_exit(State, Worker, Reason); - handle_message({meta, Meta0}, {Worker, From}, State) -> Seq = couch_util:get_value(update_seq, Meta0, 0), #collector{ @@ -120,46 +138,44 @@ handle_message({meta, Meta0}, {Worker, From}, State) -> 0 = fabric_dict:lookup_element(Worker, Counters0), rexi:stream_ack(From), Counters1 = fabric_dict:update_counter(Worker, 1, Counters0), - UpdateSeq = case UpdateSeq0 of - nil -> nil; - _ -> [{Worker, Seq} | UpdateSeq0] - end, - case fabric_dict:any(0, Counters1) of - true -> - {ok, State#collector{ - counters = Counters1, - update_seq = UpdateSeq - }}; - false -> - Meta = case UpdateSeq of - nil -> - []; - _ -> - [{update_seq, fabric_view_changes:pack_seqs(UpdateSeq)}] + UpdateSeq = + case UpdateSeq0 of + nil -> nil; + _ -> [{Worker, Seq} | UpdateSeq0] end, - {Go, Acc} = Callback({meta, Meta}, AccIn), - {Go, State#collector{ - counters = fabric_dict:decrement_all(Counters1), - user_acc = Acc - }} + case fabric_dict:any(0, Counters1) of + true -> + {ok, State#collector{ + counters = Counters1, + update_seq = UpdateSeq + }}; + false -> + Meta = + case UpdateSeq of + nil -> + []; + _ -> + [{update_seq, fabric_view_changes:pack_seqs(UpdateSeq)}] + end, + {Go, Acc} = Callback({meta, Meta}, AccIn), + {Go, State#collector{ + counters = fabric_dict:decrement_all(Counters1), + user_acc = Acc + }} end; - -handle_message(#view_row{key=Key} = Row, {Worker, From}, State) -> +handle_message(#view_row{key = Key} = Row, {Worker, From}, State) -> #collector{counters = Counters0, rows = Rows0} = State, true = fabric_dict:is_key(Worker, Counters0), - Rows = dict:append(Key, Row#view_row{worker={Worker, From}}, Rows0), + Rows = dict:append(Key, Row#view_row{worker = {Worker, From}}, Rows0), C1 = fabric_dict:update_counter(Worker, 1, Counters0), - State1 = State#collector{rows=Rows, counters=C1}, + State1 = State#collector{rows = Rows, counters = C1}, fabric_view:maybe_send_row(State1); - handle_message(complete, Worker, #collector{counters = Counters0} = State) -> true = fabric_dict:is_key(Worker, Counters0), C1 = fabric_dict:update_counter(Worker, 1, Counters0), fabric_view:maybe_send_row(State#collector{counters = C1}); - handle_message(ddoc_updated, _Worker, State) -> {stop, State}. os_proc_needed(<<"_", _/binary>>) -> false; os_proc_needed(_) -> true. - |