diff options
Diffstat (limited to 'src/fabric/src/fabric_view_all_docs.erl')
-rw-r--r-- | src/fabric/src/fabric_view_all_docs.erl | 332 |
1 files changed, 0 insertions, 332 deletions
diff --git a/src/fabric/src/fabric_view_all_docs.erl b/src/fabric/src/fabric_view_all_docs.erl deleted file mode 100644 index e4d3d4a40..000000000 --- a/src/fabric/src/fabric_view_all_docs.erl +++ /dev/null @@ -1,332 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(fabric_view_all_docs). - --export([go/5]). --export([open_doc/4]). % exported for spawn - --include_lib("fabric/include/fabric.hrl"). --include_lib("mem3/include/mem3.hrl"). --include_lib("couch/include/couch_db.hrl"). --include_lib("couch_mrview/include/couch_mrview.hrl"). 

%% Entry point for _all_docs.
%%
%% First clause: selected when `#mrargs.keys` is `undefined`. Submits
%% `fabric_rpc:all_docs` jobs for the shards chosen by shards/2, starts the
%% streams and hands the started workers to go/6. The rexi monitors are
%% stopped, and started streams cleaned up, regardless of how the body exits.
%%
%% Second clause: any other value of `#mrargs.keys`. Each key is opened
%% individually in a monitored process running open_doc/4, with the number
%% of in-flight processes bounded by all_docs_concurrency/0.
%%
%% In both cases the result is whatever `Callback` returns for the final
%% event (`complete` or `{error, Reason}`), or the value produced by go/6.
go(Db, Options, #mrargs{keys = undefined} = QueryArgs, Callback, Acc) ->
    {CoordArgs, WorkerArgs} = fabric_view:fix_skip_and_limit(QueryArgs),
    DbName = fabric:dbname(Db),
    {Shards, RingOpts} = shards(Db, QueryArgs),
    Workers0 = fabric_util:submit_jobs(
        Shards, fabric_rpc, all_docs, [Options, WorkerArgs]
    ),
    RexiMon = fabric_util:create_monitors(Workers0),
    try
        case fabric_streams:start(Workers0, #shard.ref, RingOpts) of
            {ok, Workers} ->
                try
                    go(DbName, Options, Workers, CoordArgs, Callback, Acc)
                after
                    fabric_streams:cleanup(Workers)
                end;
            {timeout, NewState} ->
                DefunctWorkers = fabric_util:remove_done_workers(
                    NewState#stream_acc.workers, waiting
                ),
                fabric_util:log_timeout(
                    DefunctWorkers,
                    "all_docs"
                ),
                Callback({error, timeout}, Acc);
            {error, Error} ->
                Callback({error, Error}, Acc)
        end
    after
        rexi_monitor:stop(RexiMon)
    end;
go(DbName, Options, QueryArgs, Callback, Acc0) ->
    #mrargs{
        direction = Dir,
        include_docs = IncludeDocs,
        doc_options = DocOptions0,
        limit = Limit,
        conflicts = Conflicts,
        skip = Skip,
        keys = Keys0,
        extra = Extra,
        update_seq = UpdateSeq
    } = QueryArgs,
    DocOptions1 =
        case Conflicts of
            true -> [conflicts | DocOptions0];
            _ -> DocOptions0
        end,
    % Each key is read in its own monitored process; the row (or the error)
    % comes back as that process's exit reason.
    SpawnFun = fun(Key) ->
        spawn_monitor(?MODULE, open_doc, [DbName, Options ++ DocOptions1, Key, IncludeDocs])
    end,
    MaxJobs = all_docs_concurrency(),
    % Apply direction, then skip, then limit to the requested key list.
    Keys1 =
        case Dir of
            fwd -> Keys0;
            _ -> lists:reverse(Keys0)
        end,
    Keys2 =
        case Skip < length(Keys1) of
            true -> lists:nthtail(Skip, Keys1);
            false -> []
        end,
    Keys3 =
        case Limit < length(Keys2) of
            true -> lists:sublist(Keys2, Limit);
            false -> Keys2
        end,
    %% namespace can be _set_ to `undefined`, so we want to simulate an enum here
    Namespace =
        case couch_util:get_value(namespace, Extra) of
            <<"_all_docs">> -> <<"_all_docs">>;
            <<"_design">> -> <<"_design">>;
            <<"_local">> -> <<"_local">>;
            _ -> <<"_all_docs">>
        end,
    Timeout = fabric_util:all_docs_timeout(),
    % The doc count is fetched in a helper process that reports its result
    % through its exit reason, so the wait can be bounded by Timeout.
    {_, Ref} = spawn_monitor(fun() ->
        exit(fabric:get_doc_count(DbName, Namespace))
    end),
    receive
        {'DOWN', Ref, _, _, {ok, TotalRows}} ->
            Meta =
                case UpdateSeq of
                    false ->
                        [{total, TotalRows}, {offset, null}];
                    true ->
                        [{total, TotalRows}, {offset, null}, {update_seq, null}]
                end,
            {ok, Acc1} = Callback({meta, Meta}, Acc0),
            Resp = doc_receive_loop(
                Keys3, queue:new(), SpawnFun, MaxJobs, Callback, Acc1
            ),
            case Resp of
                {ok, Acc2} ->
                    Callback(complete, Acc2);
                timeout ->
                    Callback({error, timeout}, Acc0)
            end;
        {'DOWN', Ref, _, _, Error} ->
            Callback({error, Error}, Acc0)
    after Timeout ->
        % Drop the monitor (and any 'DOWN' already queued) so that a late
        % exit of the helper does not leave a stray message in the caller's
        % mailbox.
        erlang:demonitor(Ref, [flush]),
        Callback({error, timeout}, Acc0)
    end.

%% Collects streamed worker messages for the keys-less query.
%%
%% Builds the initial #collector{} and runs rexi_utils:recv/6 with
%% handle_message/3 as the message handler. Returns `{ok, UserAcc}` on
%% success, the callback's result for `{error, timeout}` on timeout, and
%% `{ok, Resp}` when recv reports `{error, Resp}`.
go(DbName, _Options, Workers, QueryArgs, Callback, Acc0) ->
    #mrargs{limit = Limit, skip = Skip, update_seq = UpdateSeq} = QueryArgs,
    State = #collector{
        db_name = DbName,
        query_args = QueryArgs,
        callback = Callback,
        counters = fabric_dict:init(Workers, 0),
        skip = Skip,
        limit = Limit,
        user_acc = Acc0,
        % `nil` means update_seq was not requested; a list accumulates
        % per-worker sequences when it was.
        update_seq =
            case UpdateSeq of
                true -> [];
                false -> nil
            end
    },
    case
        rexi_utils:recv(
            Workers,
            #shard.ref,
            fun handle_message/3,
            State,
            fabric_util:view_timeout(QueryArgs),
            5000
        )
    of
        {ok, NewState} ->
            {ok, NewState#collector.user_acc};
        {timeout, NewState} ->
            Callback({error, timeout}, NewState#collector.user_acc);
        {error, Resp} ->
            {ok, Resp}
    end.

%% Picks the shards to query via fabric_view:get_shards/2, possibly after
%% rewriting the query args for a partitioned database.
shards(Db, Args) ->
    DbPartitioned = fabric_util:is_partitioned(Db),
    Partition = couch_mrview_util:get_extra(Args, partition),
    NewArgs =
        case {DbPartitioned, Partition} of
            {true, undefined} ->
                % If a user specifies the same partition on both
                % the start and end keys we can optimize the
                % query by limiting to the partition shard.
                Start = couch_partition:extract(Args#mrargs.start_key),
                End = couch_partition:extract(Args#mrargs.end_key),
                % NOTE(review): `Partition` is already bound (to `undefined`)
                % when this branch is taken, so the pattern below only
                % matches when both extracted tuples carry `undefined` as
                % their first element. Confirm whether a fresh variable was
                % intended here.
                case {Start, End} of
                    {{Partition, SK}, {Partition, EK}} ->
                        A1 = Args#mrargs{
                            start_key = SK,
                            end_key = EK
                        },
                        couch_mrview_util:set_extra(A1, partition, Partition);
                    _ ->
                        Args
                end;
            _ ->
                Args
        end,
    fabric_view:get_shards(Db, NewArgs).

%% rexi_utils:recv/6 handler used by go/6.
%%
%% Handles node-down and worker-exit notifications, the per-worker `meta`
%% message, view rows, `complete`, and `execution_stats`.
handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _, State) ->
    fabric_view:check_down_shards(State, NodeRef);
handle_message({rexi_EXIT, Reason}, Worker, State) ->
    fabric_view:handle_worker_exit(State, Worker, Reason);
handle_message({meta, Meta0}, {Worker, From}, State) ->
    Tot = couch_util:get_value(total, Meta0, 0),
    Off = couch_util:get_value(offset, Meta0, 0),
    Seq = couch_util:get_value(update_seq, Meta0, 0),
    #collector{
        callback = Callback,
        counters = Counters0,
        total_rows = Total0,
        offset = Offset0,
        user_acc = AccIn,
        update_seq = UpdateSeq0
    } = State,
    % Assert that we don't have other messages from this
    % worker when the total_and_offset message arrives.
    0 = fabric_dict:lookup_element(Worker, Counters0),
    rexi:stream_ack(From),
    Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
    % A `null` total/offset from a worker makes the aggregate `null`.
    Total =
        if
            Tot == null -> null;
            true -> Total0 + Tot
        end,
    Offset =
        if
            Off == null -> null;
            true -> Offset0 + Off
        end,
    UpdateSeq =
        case {UpdateSeq0, Seq} of
            {nil, _} -> nil;
            {_, null} -> null;
            _ -> [{Worker, Seq} | UpdateSeq0]
        end,
    case fabric_dict:any(0, Counters1) of
        true ->
            % Some worker still has a zero counter, i.e. has not sent its
            % meta message yet: keep accumulating.
            {ok, State#collector{
                counters = Counters1,
                total_rows = Total,
                update_seq = UpdateSeq,
                offset = Offset
            }};
        false ->
            FinalOffset =
                case Offset of
                    null -> null;
                    _ -> erlang:min(Total, Offset + State#collector.skip)
                end,
            Meta =
                [{total, Total}, {offset, FinalOffset}] ++
                    case UpdateSeq of
                        nil ->
                            [];
                        null ->
                            [{update_seq, null}];
                        _ ->
                            [{update_seq, fabric_view_changes:pack_seqs(UpdateSeq)}]
                    end,
            {Go, Acc} = Callback({meta, Meta}, AccIn),
            {Go, State#collector{
                counters = fabric_dict:decrement_all(Counters1),
                total_rows = Total,
                offset = FinalOffset,
                user_acc = Acc,
                update_seq = UpdateSeq0
            }}
    end;
handle_message(#view_row{} = Row, {Worker, From}, State) ->
    #collector{query_args = Args, counters = Counters0, rows = Rows0} = State,
    Dir = Args#mrargs.direction,
    % Tag the row with its origin and merge it into the buffered rows.
    Rows = merge_row(Dir, Row#view_row{worker = {Worker, From}}, Rows0),
    Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
    State1 = State#collector{rows = Rows, counters = Counters1},
    fabric_view:maybe_send_row(State1);
handle_message(complete, Worker, State) ->
    Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters),
    fabric_view:maybe_send_row(State#collector{counters = Counters});
handle_message({execution_stats, _} = Msg, {_, From}, St) ->
    #collector{callback = Callback, user_acc = AccIn} = St,
    {Go, Acc} = Callback(Msg, AccIn),
    rexi:stream_ack(From),
    {Go, St#collector{user_acc = Acc}}.

%% Merges a single row into the buffered rows, keyed on `#view_row.id`,
%% using lists:keymerge/3 for `fwd` and lists:rkeymerge/3 for `rev`.
merge_row(fwd, Row, Rows) ->
    lists:keymerge(#view_row.id, [Row], Rows);
merge_row(rev, Row, Rows) ->
    lists:rkeymerge(#view_row.id, [Row], Rows).

%% Reads the "all_docs_concurrency" setting from the "fabric" config
%% section. Falls back to 10 when the stored value is not an integer string.
all_docs_concurrency() ->
    Value = config:get("fabric", "all_docs_concurrency", "10"),
    try
        list_to_integer(Value)
    catch
        % Deliberate best-effort: a malformed setting yields the default.
        _:_ ->
            10
    end.

%% Drives the per-key document reads for the keyed variant of go/5.
%%
%% `Pids` is a queue of `{Pid, MonitorRef}` pairs in key order. New readers
%% are spawned with `SpawnFun` while fewer than `MaxJobs` are in flight;
%% otherwise the loop waits for the oldest reader to exit and feeds its row
%% to `Callback`.
%%
%% Returns `{ok, Acc}` when all keys are processed or the callback answers
%% `{stop, Acc}`, the callback's result for `{error, Reason}` when a reader
%% exits with something other than a #view_row{}, or `timeout` when the
%% oldest reader does not finish within fabric_util:all_docs_timeout/0.
doc_receive_loop(Keys, Pids, SpawnFun, MaxJobs, Callback, AccIn) ->
    case {Keys, queue:len(Pids)} of
        {[], 0} ->
            {ok, AccIn};
        {[K | RKeys], Len} when Len < MaxJobs ->
            Pids1 = queue:in(SpawnFun(K), Pids),
            doc_receive_loop(RKeys, Pids1, SpawnFun, MaxJobs, Callback, AccIn);
        _ ->
            % Always wait on the head of the queue so rows are delivered
            % in the order the keys were submitted.
            {{value, {Pid, Ref}}, RestPids} = queue:out(Pids),
            Timeout = fabric_util:all_docs_timeout(),
            receive
                {'DOWN', Ref, process, Pid, Row} ->
                    case Row of
                        #view_row{} ->
                            case Callback(fabric_view:transform_row(Row), AccIn) of
                                {ok, Acc} ->
                                    doc_receive_loop(
                                        Keys, RestPids, SpawnFun, MaxJobs, Callback, Acc
                                    );
                                {stop, Acc} ->
                                    cancel_read_pids(RestPids),
                                    {ok, Acc}
                            end;
                        Error ->
                            cancel_read_pids(RestPids),
                            Callback({error, Error}, AccIn)
                    end
            after Timeout ->
                % Stop every reader still in flight, including the one we
                % were waiting on, so no processes or armed monitors are
                % left behind once we give up.
                cancel_read_pids(Pids),
                timeout
            end
    end.

%% Body of the reader processes spawned by go/5 (exported for spawn).
%%
%% Never returns normally: exits with the #view_row{} on success, or with
%% `{Id, Reason}` after logging when open_doc_int/4 raises.
open_doc(DbName, Options, Id, IncludeDocs) ->
    try open_doc_int(DbName, Options, Id, IncludeDocs) of
        #view_row{} = Row ->
            exit(Row)
    catch
        % The stacktrace is bound in the catch pattern;
        % erlang:get_stacktrace/0 is no longer available in current OTP
        % releases.
        Type:Reason:Stack ->
            couch_log:error("_all_docs open error: ~s ~s :: ~w ~w", [
                DbName, Id, {Type, Reason}, Stack
            ]),
            exit({Id, Reason})
    end.

%% Opens a single document by id and converts the outcome into a #view_row{}.
%%
%% A missing document yields a row with only `key` set. A deleted document
%% yields a row whose value carries `rev` and `{deleted, true}`. Any other
%% document yields a row whose value carries `rev`. When `IncludeDocs` is
%% `true`, the row's `doc` field is set to `undefined`, `null`, or the JSON
%% object of the document, respectively.
open_doc_int(DbName, Options, Id, IncludeDocs) ->
    {BaseRow, DocField} =
        case fabric:open_doc(DbName, Id, [deleted | Options]) of
            {not_found, missing} ->
                {#view_row{key = Id}, undefined};
            {ok, #doc{deleted = true, revs = DeletedRevs}} ->
                {DelPos, [DelRev | _]} = DeletedRevs,
                DelValue =
                    {[
                        {rev, couch_doc:rev_to_str({DelPos, DelRev})},
                        {deleted, true}
                    ]},
                {#view_row{key = Id, id = Id, value = DelValue}, null};
            {ok, #doc{revs = LiveRevs} = FullDoc} ->
                % Convert to JSON before inspecting the revs, as the
                % original statement order does.
                Json = couch_doc:to_json_obj(FullDoc, Options),
                {Pos, [Rev | _]} = LiveRevs,
                LiveValue = {[{rev, couch_doc:rev_to_str({Pos, Rev})}]},
                {#view_row{key = Id, id = Id, value = LiveValue}, Json}
        end,
    case IncludeDocs of
        true -> BaseRow#view_row{doc = DocField};
        _ -> BaseRow
    end.

%% Kills every reader process in the queue of `{Pid, MonitorRef}` pairs and
%% removes its monitor, flushing any 'DOWN' message already delivered.
%% Returns `ok`.
cancel_read_pids(Pids) ->
    lists:foreach(
        fun({Pid, Ref}) ->
            exit(Pid, kill),
            erlang:demonitor(Ref, [flush])
        end,
        queue:to_list(Pids)
    ).