diff options
author | Mayya Sharipova <mayyas@ca.ibm.com> | 2017-06-12 09:23:23 -0400 |
---|---|---|
committer | Paul J. Davis <paul.joseph.davis@gmail.com> | 2017-07-31 12:30:48 -0500 |
commit | 2e7ca45b364467f164d2ecbe8846878234d36a47 (patch) | |
tree | 6d295a6b60ca6a65c800f58f07361869fdc942dc | |
parent | f527f2a5e6caa131e0ee00672aac20010729464e (diff) | |
download | couchdb-2e7ca45b364467f164d2ecbe8846878234d36a47.tar.gz |
Stop couch_index processes on ddoc update
Currently when ddoc is updated, couch_index and couch_index_updater processes
corresponding to the previous version of ddoc will still exist until
all indexing processing initiated by them is done.
When ddoc of a big database is rapidly modified, this puts a lot
of unnecessary strain on database resources.
With this change, when ddoc is updated:
* all couch_index processes for the previous version of ddoc will be shutdown
* all linked to them couch_index_updater processes will die as well
* all processes waiting for indexing activity to be finished (waiters
for couch_index:get_status) will receive an immediate reply:
ddoc_updated. Interactive user requests (view queries) will get response:
{404, <<"not_found">>, <<"Design document was updated or deleted.">>}
Check if there are ddocs that use the same couch_index process
before closing it on ddoc_updated
1. When opening an index, always add a record {DbName, {DDocId, Sig}} to ?BY_DB.
2. When ddoc_updated, check if there other ddocs in ?BY_DB with the same Sig.
If there are no, stop couch_index processes.
If there are other, only remove {DbName, {DDocId, Sig}}
record from ?BY_DB for this ddoc.
-rw-r--r-- | src/chttpd/src/chttpd.erl | 2 | ||||
-rw-r--r-- | src/couch_index/src/couch_index.erl | 29 | ||||
-rw-r--r-- | src/couch_index/src/couch_index_server.erl | 56 | ||||
-rw-r--r-- | src/couch_mrview/src/couch_mrview.erl | 18 | ||||
-rw-r--r-- | src/couch_mrview/src/couch_mrview_index.erl | 14 | ||||
-rw-r--r-- | src/couch_mrview/src/couch_mrview_util.erl | 26 | ||||
-rw-r--r-- | src/couch_mrview/test/couch_mrview_ddoc_updated_tests.erl | 139 | ||||
-rw-r--r-- | src/fabric/src/fabric_rpc.erl | 8 | ||||
-rw-r--r-- | src/fabric/src/fabric_util.erl | 3 | ||||
-rw-r--r-- | src/fabric/src/fabric_view_map.erl | 7 | ||||
-rw-r--r-- | src/fabric/src/fabric_view_reduce.erl | 7 |
11 files changed, 261 insertions, 48 deletions
diff --git a/src/chttpd/src/chttpd.erl b/src/chttpd/src/chttpd.erl index 9423fa9f4..cfefb7800 100644 --- a/src/chttpd/src/chttpd.erl +++ b/src/chttpd/src/chttpd.erl @@ -846,6 +846,8 @@ error_info({not_found, Reason}) -> {404, <<"not_found">>, Reason}; error_info({filter_fetch_error, Reason}) -> {404, <<"not_found">>, Reason}; +error_info(ddoc_updated) -> + {404, <<"not_found">>, <<"Design document was updated or deleted.">>}; error_info({not_acceptable, Reason}) -> {406, <<"not_acceptable">>, Reason}; error_info(conflict) -> diff --git a/src/couch_index/src/couch_index.erl b/src/couch_index/src/couch_index.erl index 604d5038c..b3a800fc1 100644 --- a/src/couch_index/src/couch_index.erl +++ b/src/couch_index/src/couch_index.erl @@ -112,9 +112,16 @@ init({Mod, IdxState}) -> end. -terminate(Reason, State) -> +terminate(Reason0, State) -> #st{mod=Mod, idx_state=IdxState}=State, - Mod:close(IdxState), + case Reason0 of + {shutdown, ddoc_updated} -> + Mod:shutdown(IdxState), + Reason = ddoc_updated; + _ -> + Mod:close(IdxState), + Reason = Reason0 + end, send_all(State#st.waiters, Reason), couch_util:shutdown_sync(State#st.updater), couch_util:shutdown_sync(State#st.compactor), @@ -271,7 +278,7 @@ handle_cast(delete, State) -> ok = Mod:delete(IdxState), {stop, normal, State}; handle_cast({ddoc_updated, DDocResult}, State) -> - #st{mod = Mod, idx_state = IdxState, waiters = Waiters} = State, + #st{mod = Mod, idx_state = IdxState} = State, Shutdown = case DDocResult of {not_found, deleted} -> true; @@ -284,17 +291,12 @@ handle_cast({ddoc_updated, DDocResult}, State) -> end, case Shutdown of true -> - case Waiters of - [] -> - {stop, normal, State}; - _ -> - {noreply, State#st{shutdown = true}} - end; + {stop, {shutdown, ddoc_updated}, State#st{shutdown = true}}; false -> {noreply, State#st{shutdown = false}} end; handle_cast(ddoc_updated, State) -> - #st{mod = Mod, idx_state = IdxState, waiters = Waiters} = State, + #st{mod = Mod, idx_state = IdxState} = State, DbName = Mod:get(db_name, IdxState), DDocId = Mod:get(idx_name, IdxState), Shutdown = couch_util:with_db(DbName, fun(Db) -> @@ -308,12 +310,7 @@ handle_cast(ddoc_updated, State) -> end), case Shutdown of true -> - case Waiters of - [] -> - {stop, normal, State}; - _ -> - {noreply, State#st{shutdown = true}} - end; + {stop, {shutdown, ddoc_updated}, State#st{shutdown = true}}; false -> {noreply, State#st{shutdown = false}} end; diff --git a/src/couch_index/src/couch_index_server.erl b/src/couch_index/src/couch_index_server.erl index 92b8c8eff..8225a90a3 100644 --- a/src/couch_index/src/couch_index_server.erl +++ b/src/couch_index/src/couch_index_server.erl @@ -106,6 +106,13 @@ get_index(Module, IdxState) -> Sig = Module:get(signature, IdxState), case ets:lookup(?BY_SIG, {DbName, Sig}) of [{_, Pid}] when is_pid(Pid) -> + DDocId = Module:get(idx_name, IdxState), + case ets:match_object(?BY_DB, {DbName, {DDocId, Sig}}) of + [] -> + Args = [Pid, DbName, DDocId, Sig], + gen_server:cast(?MODULE, {add_to_ets, Args}); + _ -> ok + end, {ok, Pid}; _ -> Args = {Module, IdxState, DbName, Sig}, @@ -161,14 +168,25 @@ handle_call({reset_indexes, DbName}, _From, State) -> handle_cast({reset_indexes, DbName}, State) -> reset_indexes(DbName, State#st.root_dir), + {noreply, State}; +handle_cast({add_to_ets, [Pid, DbName, DDocId, Sig]}, State) -> + % check if Pid still exists + case ets:lookup(?BY_PID, Pid) of + [{Pid, {DbName, Sig}}] when is_pid(Pid) -> + ets:insert(?BY_DB, {DbName, {DDocId, Sig}}); + _ -> ok + end, + {noreply, State}; +handle_cast({rem_from_ets, [DbName, DDocId, Sig]}, State) -> + ets:delete_object(?BY_DB, {DbName, {DDocId, Sig}}), {noreply, State}. handle_info({'EXIT', Pid, Reason}, Server) -> case ets:lookup(?BY_PID, Pid) of [{Pid, {DbName, Sig}}] -> - [{DbName, {DDocId, Sig}}] = - ets:match_object(?BY_DB, {DbName, {'$1', Sig}}), - rem_from_ets(DbName, Sig, DDocId, Pid); + DDocIds = [DDocId || {_, {DDocId, _}} + <- ets:match_object(?BY_DB, {DbName, {'$1', Sig}})], + rem_from_ets(DbName, Sig, DDocIds, Pid); [] when Reason /= normal -> exit(Reason); _Else -> @@ -221,14 +239,17 @@ new_index({Mod, IdxState, DbName, Sig}) -> reset_indexes(DbName, Root) -> % shutdown all the updaters and clear the files, the db got changed - Fun = fun({_, {DDocId, Sig}}) -> + SigDDocIds = lists:foldl(fun({_, {DDocId, Sig}}, DDict) -> + dict:append(Sig, DDocId, DDict) + end, dict:new(), ets:lookup(?BY_DB, DbName)), + Fun = fun({Sig, DDocIds}) -> [{_, Pid}] = ets:lookup(?BY_SIG, {DbName, Sig}), MRef = erlang:monitor(process, Pid), gen_server:cast(Pid, delete), receive {'DOWN', MRef, _, _, _} -> ok end, - rem_from_ets(DbName, Sig, DDocId, Pid) + rem_from_ets(DbName, Sig, DDocIds, Pid) end, - lists:foreach(Fun, ets:lookup(?BY_DB, DbName)), + lists:foreach(Fun, dict:to_list(SigDDocIds)), Path = couch_index_util:index_dir("", DbName), couch_file:nuke_dir(Root, Path). @@ -239,10 +260,12 @@ add_to_ets(DbName, Sig, DDocId, Pid) -> ets:insert(?BY_DB, {DbName, {DDocId, Sig}}). -rem_from_ets(DbName, Sig, DDocId, Pid) -> +rem_from_ets(DbName, Sig, DDocIds, Pid) -> ets:delete(?BY_SIG, {DbName, Sig}), ets:delete(?BY_PID, Pid), - ets:delete_object(?BY_DB, {DbName, {DDocId, Sig}}). + lists:foreach(fun(DDocId) -> + ets:delete_object(?BY_DB, {DbName, {DDocId, Sig}}) + end, DDocIds). handle_db_event(DbName, created, St) -> @@ -259,10 +282,19 @@ handle_db_event(<<"shards/", _/binary>> = DbName, {ddoc_updated, DbShards = [mem3:name(Sh) || Sh <- mem3:local_shards(mem3:dbname(DbName))], lists:foreach(fun(DbShard) -> lists:foreach(fun({_DbShard, {_DDocId, Sig}}) -> - case ets:lookup(?BY_SIG, {DbShard, Sig}) of - [{_, IndexPid}] -> (catch - gen_server:cast(IndexPid, {ddoc_updated, DDocResult})); - [] -> [] + % check if there are other ddocs with the same Sig for the same db + SigDDocs = ets:match_object(?BY_DB, {DbShard, {'$1', Sig}}), + if length(SigDDocs) > 1 -> + % remove records from ?BY_DB for this DDoc + Args = [DbShard, DDocId, Sig], + gen_server:cast(?MODULE, {rem_from_ets, Args}); + true -> + % single DDoc with this Sig - close couch_index processes + case ets:lookup(?BY_SIG, {DbShard, Sig}) of + [{_, IndexPid}] -> (catch + gen_server:cast(IndexPid, {ddoc_updated, DDocResult})); + [] -> [] + end end end, ets:match_object(?BY_DB, {DbShard, {DDocId, '$1'}})) end, DbShards), diff --git a/src/couch_mrview/src/couch_mrview.erl b/src/couch_mrview/src/couch_mrview.erl index 45dd83d66..c44dd91f3 100644 --- a/src/couch_mrview/src/couch_mrview.erl +++ b/src/couch_mrview/src/couch_mrview.erl @@ -241,12 +241,16 @@ query_view(Db, DDoc, VName, Args) -> query_view(Db, DDoc, VName, Args, Callback, Acc) when is_list(Args) -> query_view(Db, DDoc, VName, to_mrargs(Args), Callback, Acc); query_view(Db, DDoc, VName, Args0, Callback, Acc0) -> - {ok, VInfo, Sig, Args} = couch_mrview_util:get_view(Db, DDoc, VName, Args0), - {ok, Acc1} = case Args#mrargs.preflight_fun of - PFFun when is_function(PFFun, 2) -> PFFun(Sig, Acc0); - _ -> {ok, Acc0} - end, - query_view(Db, VInfo, Args, Callback, Acc1). + case couch_mrview_util:get_view(Db, DDoc, VName, Args0) of + {ok, VInfo, Sig, Args} -> + {ok, Acc1} = case Args#mrargs.preflight_fun of + PFFun when is_function(PFFun, 2) -> PFFun(Sig, Acc0); + _ -> {ok, Acc0} + end, + query_view(Db, VInfo, Args, Callback, Acc1); + ddoc_updated -> + Callback(ok, ddoc_updated) + end. get_view_index_pid(Db, DDoc, ViewName, Args0) -> @@ -689,6 +693,8 @@ default_cb({final, Info}, []) -> {ok, [Info]}; default_cb({final, _}, Acc) -> {ok, Acc}; +default_cb(ok, ddoc_updated) -> + {ok, ddoc_updated}; default_cb(Row, Acc) -> {ok, [Row | Acc]}. diff --git a/src/couch_mrview/src/couch_mrview_index.erl b/src/couch_mrview/src/couch_mrview_index.erl index eaec5cc52..aa1ee2741 100644 --- a/src/couch_mrview/src/couch_mrview_index.erl +++ b/src/couch_mrview/src/couch_mrview_index.erl @@ -14,7 +14,7 @@ -export([get/2]). --export([init/2, open/2, close/1, reset/1, delete/1]). +-export([init/2, open/2, close/1, reset/1, delete/1, shutdown/1]). -export([start_update/3, purge/4, process_doc/3, finish_update/1, commit/1]). -export([compact/3, swap_compacted/2, remove_compacted/1]). -export([index_file_exists/1]). @@ -143,6 +143,18 @@ close(State) -> couch_file:close(State#mrst.fd). +% This called after ddoc_updated event occurrs, and +% before we shutdown couch_index process. +% We unlink couch_index from corresponding couch_file and demonitor it. +% This allows all outstanding queries that are currently streaming +% data from couch_file finish successfully. +% couch_file will be closed automatically after all +% outstanding queries are done. +shutdown(State) -> + erlang:demonitor(State#mrst.fd_monitor, [flush]), + unlink(State#mrst.fd). + + delete(#mrst{db_name=DbName, sig=Sig}=State) -> couch_file:close(State#mrst.fd), catch couch_mrview_util:delete_files(DbName, Sig). diff --git a/src/couch_mrview/src/couch_mrview_util.erl b/src/couch_mrview/src/couch_mrview_util.erl index 5c95f2e46..0d58e4f35 100644 --- a/src/couch_mrview/src/couch_mrview_util.erl +++ b/src/couch_mrview/src/couch_mrview_util.erl @@ -42,13 +42,17 @@ get_view(Db, DDoc, ViewName, Args0) -> - {ok, State, Args2} = get_view_index_state(Db, DDoc, ViewName, Args0), - Ref = erlang:monitor(process, State#mrst.fd), - #mrst{language=Lang, views=Views} = State, - {Type, View, Args3} = extract_view(Lang, Args2, ViewName, Views), - check_range(Args3, view_cmp(View)), - Sig = view_sig(Db, State, View, Args3), - {ok, {Type, View, Ref}, Sig, Args3}. + case get_view_index_state(Db, DDoc, ViewName, Args0) of + {ok, State, Args2} -> + Ref = erlang:monitor(process, State#mrst.fd), + #mrst{language=Lang, views=Views} = State, + {Type, View, Args3} = extract_view(Lang, Args2, ViewName, Views), + check_range(Args3, view_cmp(View)), + Sig = view_sig(Db, State, View, Args3), + {ok, {Type, View, Ref}, Sig, Args3}; + ddoc_updated -> + ddoc_updated + end. get_view_index_pid(Db, DDoc, ViewName, Args0) -> @@ -71,7 +75,7 @@ get_view_index_state(Db, DDoc, ViewName, Args0, RetryCount) -> UpdateSeq = couch_util:with_db(Db, fun(WDb) -> couch_db:get_update_seq(WDb) end), - {ok, State} = case Args#mrargs.update of + State = case Args#mrargs.update of lazy -> spawn(fun() -> catch couch_index:get_state(Pid, UpdateSeq) @@ -82,7 +86,11 @@ get_view_index_state(Db, DDoc, ViewName, Args0, RetryCount) -> _ -> couch_index:get_state(Pid, UpdateSeq) end, - {ok, State, Args} + case State of + {ok, State0} -> {ok, State0, Args}; + ddoc_updated -> ddoc_updated; + Else -> throw(Else) + end catch exit:{Reason, _} when Reason == noproc; Reason == normal -> timer:sleep(?GET_VIEW_RETRY_DELAY), diff --git a/src/couch_mrview/test/couch_mrview_ddoc_updated_tests.erl b/src/couch_mrview/test/couch_mrview_ddoc_updated_tests.erl new file mode 100644 index 000000000..d0ba6b4d2 --- /dev/null +++ b/src/couch_mrview/test/couch_mrview_ddoc_updated_tests.erl @@ -0,0 +1,139 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_mrview_ddoc_updated_tests). + +-include_lib("couch/include/couch_eunit.hrl"). +-include_lib("couch/include/couch_db.hrl"). + +-define(TIMEOUT, 1000). + + +setup() -> + Name = ?tempdb(), + couch_server:delete(Name, [?ADMIN_CTX]), + {ok, Db} = couch_db:create(Name, [?ADMIN_CTX]), + DDoc = couch_doc:from_json_obj({[ + {<<"_id">>, <<"_design/bar">>}, + {<<"views">>, {[ + {<<"baz">>, {[ + {<<"map">>, << + "function(doc) {\n" + " var i = 0; while(i<1000){ i++ };\n" + " emit(doc.val, doc.val);\n" + "}" + >>} + ]}} + ]}} + ]}), + [Doc1 | Docs999] = couch_mrview_test_util:make_docs(map, 100), + {ok, _} = couch_db:update_docs(Db, [DDoc, Doc1], []), + {ok, Db2} = couch_db:reopen(Db), + + % run a query with 1 doc to initialize couch_index process + CB = fun + ({row, _}, Count) -> {ok, Count+1}; + (_, Count) -> {ok, Count} + end, + {ok, _} = + couch_mrview:query_view(Db2, <<"_design/bar">>, <<"baz">>, [], CB, 0), + + % add more docs + {ok, _} = couch_db:update_docs(Db2, Docs999, []), + {ok, Db3} = couch_db:reopen(Db2), + Db3. + +teardown(Db) -> + couch_db:close(Db), + couch_server:delete(Db#db.name, [?ADMIN_CTX]), + ok. + + +ddoc_update_test_() -> + { + "Check ddoc update actions", + { + setup, + fun test_util:start_couch/0, fun test_util:stop_couch/1, + { + foreach, + fun setup/0, fun teardown/1, + [ + fun check_indexing_stops_on_ddoc_change/1 + ] + } + } + }. + + +check_indexing_stops_on_ddoc_change(Db) -> + ?_test(begin + DDocID = <<"_design/bar">>, + + IndexesBefore = get_indexes_by_ddoc(DDocID, 1), + ?assertEqual(1, length(IndexesBefore)), + AliveBefore = lists:filter(fun erlang:is_process_alive/1, IndexesBefore), + ?assertEqual(1, length(AliveBefore)), + + {ok, DDoc} = couch_db:open_doc(Db, DDocID, [ejson_body, ?ADMIN_CTX]), + DDocJson2 = couch_doc:from_json_obj({[ + {<<"_id">>, DDocID}, + {<<"_deleted">>, true}, + {<<"_rev">>, couch_doc:rev_to_str(DDoc#doc.revs)} + ]}), + + % spawn a process for query + Self = self(), + QPid = spawn(fun() -> + {ok, Result} = couch_mrview:query_view( + Db, <<"_design/bar">>, <<"baz">>, []), + Self ! {self(), Result} + end), + + % while indexing for the query is in progress, delete DDoc + {ok, _} = couch_db:update_doc(Db, DDocJson2, []), + receive + {QPid, Msg} -> + ?assertEqual(Msg, ddoc_updated) + after ?TIMEOUT -> + erlang:error( + {assertion_failed, [{module, ?MODULE}, {line, ?LINE}, + {reason, "test failed"}]}) + end, + + %% assert that previously running indexes are gone + IndexesAfter = get_indexes_by_ddoc(DDocID, 0), + ?assertEqual(0, length(IndexesAfter)), + AliveAfter = lists:filter(fun erlang:is_process_alive/1, IndexesBefore), + ?assertEqual(0, length(AliveAfter)) + end). + + +get_indexes_by_ddoc(DDocID, N) -> + Indexes = test_util:wait(fun() -> + Indxs = ets:match_object( + couchdb_indexes_by_db, {'$1', {DDocID, '$2'}}), + case length(Indxs) == N of + true -> + Indxs; + false -> + wait + end + end), + lists:foldl(fun({DbName, {_DDocID, Sig}}, Acc) -> + case ets:lookup(couchdb_indexes_by_sig, {DbName, Sig}) of + [{_, Pid}] -> [Pid|Acc]; + _ -> Acc + end + end, [], Indexes). + + diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl index 80b110a24..93d7d1536 100644 --- a/src/fabric/src/fabric_rpc.erl +++ b/src/fabric/src/fabric_rpc.erl @@ -314,7 +314,9 @@ view_cb({row, Row}, Acc) -> view_cb(complete, Acc) -> % Finish view output ok = rexi:stream_last(complete), - {ok, Acc}. + {ok, Acc}; +view_cb(ok, ddoc_updated) -> + rexi:reply({ok, ddoc_updated}). reduce_cb({meta, Meta}, Acc) -> @@ -331,7 +333,9 @@ reduce_cb({row, Row}, Acc) -> reduce_cb(complete, Acc) -> % Finish view output ok = rexi:stream_last(complete), - {ok, Acc}. + {ok, Acc}; +reduce_cb(ok, ddoc_updated) -> + rexi:reply({ok, ddoc_updated}). changes_enumerator(#doc_info{id= <<"_local/", _/binary>>, high_seq=Seq}, Acc) -> diff --git a/src/fabric/src/fabric_util.erl b/src/fabric/src/fabric_util.erl index 7e3f23e68..765561381 100644 --- a/src/fabric/src/fabric_util.erl +++ b/src/fabric/src/fabric_util.erl @@ -132,6 +132,9 @@ handle_stream_start(rexi_STREAM_INIT, {Worker, From}, St) -> {stop, St#stream_acc{workers=Workers1}} end end; +handle_stream_start({ok, ddoc_updated}, _, St) -> + cleanup(St#stream_acc.workers), + {stop, ddoc_updated}; handle_stream_start(Else, _, _) -> exit({invalid_stream_start, Else}). diff --git a/src/fabric/src/fabric_view_map.erl b/src/fabric/src/fabric_view_map.erl index b6cedb750..b6a3d6f83 100644 --- a/src/fabric/src/fabric_view_map.erl +++ b/src/fabric/src/fabric_view_map.erl @@ -37,6 +37,8 @@ go(DbName, Options, DDoc, View, Args, Callback, Acc, VInfo) -> RexiMon = fabric_util:create_monitors(Workers0), try case fabric_util:stream_start(Workers0, #shard.ref, StartFun, Repls) of + {ok, ddoc_updated} -> + Callback({error, ddoc_updated}, Acc); {ok, Workers} -> try go(DbName, Workers, VInfo, Args, Callback, Acc) @@ -173,7 +175,10 @@ handle_message(#view_row{} = Row, {Worker, From}, State) -> handle_message(complete, Worker, State) -> Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters), - fabric_view:maybe_send_row(State#collector{counters = Counters}). + fabric_view:maybe_send_row(State#collector{counters = Counters}); + +handle_message(ddoc_updated, _Worker, State) -> + {stop, State}. merge_row(Dir, Collation, undefined, Row, Rows0) -> Rows1 = lists:merge( diff --git a/src/fabric/src/fabric_view_reduce.erl b/src/fabric/src/fabric_view_reduce.erl index e6146b045..a74be1073 100644 --- a/src/fabric/src/fabric_view_reduce.erl +++ b/src/fabric/src/fabric_view_reduce.erl @@ -36,6 +36,8 @@ go(DbName, DDoc, VName, Args, Callback, Acc, VInfo) -> RexiMon = fabric_util:create_monitors(Workers0), try case fabric_util:stream_start(Workers0, #shard.ref, StartFun, Repls) of + {ok, ddoc_updated} -> + Callback({error, ddoc_updated}, Acc); {ok, Workers} -> try go2(DbName, Workers, VInfo, Args, Callback, Acc) @@ -150,7 +152,10 @@ handle_message(#view_row{key=Key} = Row, {Worker, From}, State) -> handle_message(complete, Worker, #collector{counters = Counters0} = State) -> true = fabric_dict:is_key(Worker, Counters0), C1 = fabric_dict:update_counter(Worker, 1, Counters0), - fabric_view:maybe_send_row(State#collector{counters = C1}). + fabric_view:maybe_send_row(State#collector{counters = C1}); + +handle_message(ddoc_updated, _Worker, State) -> + {stop, State}. os_proc_needed(<<"_", _/binary>>) -> false; os_proc_needed(_) -> true. |