diff options
Diffstat (limited to 'src/fabric/src/fabric_view_all_docs.erl')
-rw-r--r-- | src/fabric/src/fabric_view_all_docs.erl | 337 |
1 files changed, 183 insertions, 154 deletions
diff --git a/src/fabric/src/fabric_view_all_docs.erl b/src/fabric/src/fabric_view_all_docs.erl index 1b1aa8a9c..4bd8e9522 100644 --- a/src/fabric/src/fabric_view_all_docs.erl +++ b/src/fabric/src/fabric_view_all_docs.erl @@ -13,19 +13,21 @@ -module(fabric_view_all_docs). -export([go/5]). --export([open_doc/4]). % exported for spawn +% exported for spawn +-export([open_doc/4]). -include_lib("fabric/include/fabric.hrl"). -include_lib("mem3/include/mem3.hrl"). -include_lib("couch/include/couch_db.hrl"). -include_lib("couch_mrview/include/couch_mrview.hrl"). -go(Db, Options, #mrargs{keys=undefined} = QueryArgs, Callback, Acc) -> +go(Db, Options, #mrargs{keys = undefined} = QueryArgs, Callback, Acc) -> {CoordArgs, WorkerArgs} = fabric_view:fix_skip_and_limit(QueryArgs), DbName = fabric:dbname(Db), {Shards, RingOpts} = shards(Db, QueryArgs), Workers0 = fabric_util:submit_jobs( - Shards, fabric_rpc, all_docs, [Options, WorkerArgs]), + Shards, fabric_rpc, all_docs, [Options, WorkerArgs] + ), RexiMon = fabric_util:create_monitors(Workers0), try case fabric_streams:start(Workers0, #shard.ref, RingOpts) of @@ -50,8 +52,6 @@ go(Db, Options, #mrargs{keys=undefined} = QueryArgs, Callback, Acc) -> after rexi_monitor:stop(RexiMon) end; - - go(DbName, Options, QueryArgs, Callback, Acc0) -> #mrargs{ direction = Dir, @@ -64,45 +64,51 @@ go(DbName, Options, QueryArgs, Callback, Acc0) -> extra = Extra, update_seq = UpdateSeq } = QueryArgs, - DocOptions1 = case Conflicts of - true -> [conflicts|DocOptions0]; - _ -> DocOptions0 - end, + DocOptions1 = + case Conflicts of + true -> [conflicts | DocOptions0]; + _ -> DocOptions0 + end, SpawnFun = fun(Key) -> spawn_monitor(?MODULE, open_doc, [DbName, Options ++ DocOptions1, Key, IncludeDocs]) end, MaxJobs = all_docs_concurrency(), - Keys1 = case Dir of - fwd -> Keys0; - _ -> lists:reverse(Keys0) - end, - Keys2 = case Skip < length(Keys1) of - true -> lists:nthtail(Skip, Keys1); - false -> [] - end, - Keys3 = case Limit < length(Keys2) of - true -> lists:sublist(Keys2, Limit); - false -> Keys2 - end, + Keys1 = + case Dir of + fwd -> Keys0; + _ -> lists:reverse(Keys0) + end, + Keys2 = + case Skip < length(Keys1) of + true -> lists:nthtail(Skip, Keys1); + false -> [] + end, + Keys3 = + case Limit < length(Keys2) of + true -> lists:sublist(Keys2, Limit); + false -> Keys2 + end, %% namespace can be _set_ to `undefined`, so we want simulate enum here - Namespace = case couch_util:get_value(namespace, Extra) of - <<"_all_docs">> -> <<"_all_docs">>; - <<"_design">> -> <<"_design">>; - <<"_local">> -> <<"_local">>; - _ -> <<"_all_docs">> - end, + Namespace = + case couch_util:get_value(namespace, Extra) of + <<"_all_docs">> -> <<"_all_docs">>; + <<"_design">> -> <<"_design">>; + <<"_local">> -> <<"_local">>; + _ -> <<"_all_docs">> + end, Timeout = fabric_util:all_docs_timeout(), {_, Ref} = spawn_monitor(fun() -> exit(fabric:get_doc_count(DbName, Namespace)) end), receive {'DOWN', Ref, _, _, {ok, TotalRows}} -> - Meta = case UpdateSeq of - false -> - [{total, TotalRows}, {offset, null}]; - true -> - [{total, TotalRows}, {offset, null}, {update_seq, null}] - end, + Meta = + case UpdateSeq of + false -> + [{total, TotalRows}, {offset, null}]; + true -> + [{total, TotalRows}, {offset, null}, {update_seq, null}] + end, {ok, Acc1} = Callback({meta, Meta}, Acc0), Resp = doc_receive_loop( Keys3, queue:new(), SpawnFun, MaxJobs, Callback, Acc1 @@ -129,50 +135,60 @@ go(DbName, _Options, Workers, QueryArgs, Callback, Acc0) -> skip = Skip, limit = Limit, user_acc = Acc0, - update_seq = case UpdateSeq of true -> []; false -> nil end + update_seq = + case UpdateSeq of + true -> []; + false -> nil + end }, - case rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, - State, fabric_util:view_timeout(QueryArgs), 5000) of - {ok, NewState} -> - {ok, NewState#collector.user_acc}; - {timeout, NewState} -> - Callback({error, timeout}, NewState#collector.user_acc); - {error, Resp} -> - {ok, Resp} + case + rexi_utils:recv( + Workers, + #shard.ref, + fun handle_message/3, + State, + fabric_util:view_timeout(QueryArgs), + 5000 + ) + of + {ok, NewState} -> + {ok, NewState#collector.user_acc}; + {timeout, NewState} -> + Callback({error, timeout}, NewState#collector.user_acc); + {error, Resp} -> + {ok, Resp} end. shards(Db, Args) -> DbPartitioned = fabric_util:is_partitioned(Db), Partition = couch_mrview_util:get_extra(Args, partition), - NewArgs = case {DbPartitioned, Partition} of - {true, undefined} -> - % If a user specifies the same partition on both - % the start and end keys we can optimize the - % query by limiting to the partition shard. - Start = couch_partition:extract(Args#mrargs.start_key), - End = couch_partition:extract(Args#mrargs.end_key), - case {Start, End} of - {{Partition, SK}, {Partition, EK}} -> - A1 = Args#mrargs{ - start_key = SK, - end_key = EK - }, - couch_mrview_util:set_extra(A1, partition, Partition); - _ -> - Args - end; - _ -> - Args - end, + NewArgs = + case {DbPartitioned, Partition} of + {true, undefined} -> + % If a user specifies the same partition on both + % the start and end keys we can optimize the + % query by limiting to the partition shard. + Start = couch_partition:extract(Args#mrargs.start_key), + End = couch_partition:extract(Args#mrargs.end_key), + case {Start, End} of + {{Partition, SK}, {Partition, EK}} -> + A1 = Args#mrargs{ + start_key = SK, + end_key = EK + }, + couch_mrview_util:set_extra(A1, partition, Partition); + _ -> + Args + end; + _ -> + Args + end, fabric_view:get_shards(Db, NewArgs). - handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _, State) -> fabric_view:check_down_shards(State, NodeRef); - handle_message({rexi_EXIT, Reason}, Worker, State) -> fabric_view:handle_worker_exit(State, Worker, Reason); - handle_message({meta, Meta0}, {Worker, From}, State) -> Tot = couch_util:get_value(total, Meta0, 0), Off = couch_util:get_value(offset, Meta0, 0), @@ -190,62 +206,70 @@ handle_message({meta, Meta0}, {Worker, From}, State) -> 0 = fabric_dict:lookup_element(Worker, Counters0), rexi:stream_ack(From), Counters1 = fabric_dict:update_counter(Worker, 1, Counters0), - Total = if Tot == null -> null; true -> Total0 + Tot end, - Offset = if Off == null -> null; true -> Offset0 + Off end, - UpdateSeq = case {UpdateSeq0, Seq} of - {nil, _} -> nil; - {_, null} -> null; - _ -> [{Worker, Seq} | UpdateSeq0] - end, - case fabric_dict:any(0, Counters1) of - true -> - {ok, State#collector{ - counters = Counters1, - total_rows = Total, - update_seq = UpdateSeq, - offset = Offset - }}; - false -> - FinalOffset = case Offset of - null -> null; - _ -> erlang:min(Total, Offset+State#collector.skip) + Total = + if + Tot == null -> null; + true -> Total0 + Tot end, - Meta = [{total, Total}, {offset, FinalOffset}] ++ - case UpdateSeq of - nil -> - []; - null -> - [{update_seq, null}]; - _ -> - [{update_seq, fabric_view_changes:pack_seqs(UpdateSeq)}] - end, - {Go, Acc} = Callback({meta, Meta}, AccIn), - {Go, State#collector{ - counters = fabric_dict:decrement_all(Counters1), - total_rows = Total, - offset = FinalOffset, - user_acc = Acc, - update_seq = UpdateSeq0 - }} + Offset = + if + Off == null -> null; + true -> Offset0 + Off + end, + UpdateSeq = + case {UpdateSeq0, Seq} of + {nil, _} -> nil; + {_, null} -> null; + _ -> [{Worker, Seq} | UpdateSeq0] + end, + case fabric_dict:any(0, Counters1) of + true -> + {ok, State#collector{ + counters = Counters1, + total_rows = Total, + update_seq = UpdateSeq, + offset = Offset + }}; + false -> + FinalOffset = + case Offset of + null -> null; + _ -> erlang:min(Total, Offset + State#collector.skip) + end, + Meta = + [{total, Total}, {offset, FinalOffset}] ++ + case UpdateSeq of + nil -> + []; + null -> + [{update_seq, null}]; + _ -> + [{update_seq, fabric_view_changes:pack_seqs(UpdateSeq)}] + end, + {Go, Acc} = Callback({meta, Meta}, AccIn), + {Go, State#collector{ + counters = fabric_dict:decrement_all(Counters1), + total_rows = Total, + offset = FinalOffset, + user_acc = Acc, + update_seq = UpdateSeq0 + }} end; - handle_message(#view_row{} = Row, {Worker, From}, State) -> #collector{query_args = Args, counters = Counters0, rows = Rows0} = State, Dir = Args#mrargs.direction, - Rows = merge_row(Dir, Row#view_row{worker={Worker, From}}, Rows0), + Rows = merge_row(Dir, Row#view_row{worker = {Worker, From}}, Rows0), Counters1 = fabric_dict:update_counter(Worker, 1, Counters0), - State1 = State#collector{rows=Rows, counters=Counters1}, + State1 = State#collector{rows = Rows, counters = Counters1}, fabric_view:maybe_send_row(State1); - handle_message(complete, Worker, State) -> Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters), fabric_view:maybe_send_row(State#collector{counters = Counters}); - -handle_message({execution_stats, _} = Msg, {_,From}, St) -> - #collector{callback=Callback, user_acc=AccIn} = St, +handle_message({execution_stats, _} = Msg, {_, From}, St) -> + #collector{callback = Callback, user_acc = AccIn} = St, {Go, Acc} = Callback(Msg, AccIn), rexi:stream_ack(From), - {Go, St#collector{user_acc=Acc}}. + {Go, St#collector{user_acc = Acc}}. merge_row(fwd, Row, Rows) -> lists:keymerge(#view_row.id, [Row], Rows); @@ -256,42 +280,43 @@ all_docs_concurrency() -> Value = config:get("fabric", "all_docs_concurrency", "10"), try list_to_integer(Value) - catch _:_ -> - 10 + catch + _:_ -> + 10 end. doc_receive_loop(Keys, Pids, SpawnFun, MaxJobs, Callback, AccIn) -> case {Keys, queue:len(Pids)} of - {[], 0} -> - {ok, AccIn}; - {[K | RKeys], Len} when Len < MaxJobs -> - Pids1 = queue:in(SpawnFun(K), Pids), - doc_receive_loop(RKeys, Pids1, SpawnFun, MaxJobs, Callback, AccIn); - _ -> - {{value, {Pid, Ref}}, RestPids} = queue:out(Pids), - Timeout = fabric_util:all_docs_timeout(), - receive {'DOWN', Ref, process, Pid, Row} -> - case Row of - #view_row{} -> - case Callback(fabric_view:transform_row(Row), AccIn) of - {ok, Acc} -> - doc_receive_loop( - Keys, RestPids, SpawnFun, MaxJobs, Callback, Acc - ); - {stop, Acc} -> - cancel_read_pids(RestPids), - {ok, Acc} - end; - Error -> - cancel_read_pids(RestPids), - Callback({error, Error}, AccIn) + {[], 0} -> + {ok, AccIn}; + {[K | RKeys], Len} when Len < MaxJobs -> + Pids1 = queue:in(SpawnFun(K), Pids), + doc_receive_loop(RKeys, Pids1, SpawnFun, MaxJobs, Callback, AccIn); + _ -> + {{value, {Pid, Ref}}, RestPids} = queue:out(Pids), + Timeout = fabric_util:all_docs_timeout(), + receive + {'DOWN', Ref, process, Pid, Row} -> + case Row of + #view_row{} -> + case Callback(fabric_view:transform_row(Row), AccIn) of + {ok, Acc} -> + doc_receive_loop( + Keys, RestPids, SpawnFun, MaxJobs, Callback, Acc + ); + {stop, Acc} -> + cancel_read_pids(RestPids), + {ok, Acc} + end; + Error -> + cancel_read_pids(RestPids), + Callback({error, Error}, AccIn) + end + after Timeout -> + timeout end - after Timeout -> - timeout - end end. - open_doc(DbName, Options, Id, IncludeDocs) -> try open_doc_int(DbName, Options, Id, IncludeDocs) of #view_row{} = Row -> @@ -303,22 +328,26 @@ open_doc(DbName, Options, Id, IncludeDocs) -> end. open_doc_int(DbName, Options, Id, IncludeDocs) -> - Row = case fabric:open_doc(DbName, Id, [deleted | Options]) of - {not_found, missing} -> - Doc = undefined, - #view_row{key=Id}; - {ok, #doc{deleted=true, revs=Revs}} -> - Doc = null, - {RevPos, [RevId|_]} = Revs, - Value = {[{rev,couch_doc:rev_to_str({RevPos, RevId})}, {deleted,true}]}, - #view_row{key=Id, id=Id, value=Value}; - {ok, #doc{revs=Revs} = Doc0} -> - Doc = couch_doc:to_json_obj(Doc0, Options), - {RevPos, [RevId|_]} = Revs, - Value = {[{rev,couch_doc:rev_to_str({RevPos, RevId})}]}, - #view_row{key=Id, id=Id, value=Value} - end, - if IncludeDocs -> Row#view_row{doc=Doc}; true -> Row end. + Row = + case fabric:open_doc(DbName, Id, [deleted | Options]) of + {not_found, missing} -> + Doc = undefined, + #view_row{key = Id}; + {ok, #doc{deleted = true, revs = Revs}} -> + Doc = null, + {RevPos, [RevId | _]} = Revs, + Value = {[{rev, couch_doc:rev_to_str({RevPos, RevId})}, {deleted, true}]}, + #view_row{key = Id, id = Id, value = Value}; + {ok, #doc{revs = Revs} = Doc0} -> + Doc = couch_doc:to_json_obj(Doc0, Options), + {RevPos, [RevId | _]} = Revs, + Value = {[{rev, couch_doc:rev_to_str({RevPos, RevId})}]}, + #view_row{key = Id, id = Id, value = Value} + end, + if + IncludeDocs -> Row#view_row{doc = Doc}; + true -> Row + end. cancel_read_pids(Pids) -> case queue:out(Pids) of |