summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMayya Sharipova <mayyas@ca.ibm.com>2017-06-12 09:23:23 -0400
committerPaul J. Davis <paul.joseph.davis@gmail.com>2017-07-31 12:30:48 -0500
commit2e7ca45b364467f164d2ecbe8846878234d36a47 (patch)
tree6d295a6b60ca6a65c800f58f07361869fdc942dc
parentf527f2a5e6caa131e0ee00672aac20010729464e (diff)
downloadcouchdb-2e7ca45b364467f164d2ecbe8846878234d36a47.tar.gz
Stop couch_index processes on ddoc update
Currently when a ddoc is updated, the couch_index and couch_index_updater processes corresponding to the previous version of the ddoc will still exist until all indexing processing initiated by them is done. When the ddoc of a big database is rapidly modified, this puts a lot of unnecessary strain on database resources. With this change, when a ddoc is updated: * all couch_index processes for the previous version of the ddoc will be shut down * all couch_index_updater processes linked to them will die as well * all processes waiting for indexing activity to be finished (waiters for couch_index:get_status) will receive an immediate reply: ddoc_updated. Interactive user requests (view queries) will get the response: {404, <<"not_found">>, <<"Design document was updated or deleted.">>} Check if there are ddocs that use the same couch_index process before closing it on ddoc_updated 1. When opening an index, always add a record {DbName, {DDocId, Sig}} to ?BY_DB. 2. When ddoc_updated, check if there are other ddocs in ?BY_DB with the same Sig. If there are none, stop the couch_index processes. If there are others, only remove the {DbName, {DDocId, Sig}} record from ?BY_DB for this ddoc.
-rw-r--r--src/chttpd/src/chttpd.erl2
-rw-r--r--src/couch_index/src/couch_index.erl29
-rw-r--r--src/couch_index/src/couch_index_server.erl56
-rw-r--r--src/couch_mrview/src/couch_mrview.erl18
-rw-r--r--src/couch_mrview/src/couch_mrview_index.erl14
-rw-r--r--src/couch_mrview/src/couch_mrview_util.erl26
-rw-r--r--src/couch_mrview/test/couch_mrview_ddoc_updated_tests.erl139
-rw-r--r--src/fabric/src/fabric_rpc.erl8
-rw-r--r--src/fabric/src/fabric_util.erl3
-rw-r--r--src/fabric/src/fabric_view_map.erl7
-rw-r--r--src/fabric/src/fabric_view_reduce.erl7
11 files changed, 261 insertions, 48 deletions
diff --git a/src/chttpd/src/chttpd.erl b/src/chttpd/src/chttpd.erl
index 9423fa9f4..cfefb7800 100644
--- a/src/chttpd/src/chttpd.erl
+++ b/src/chttpd/src/chttpd.erl
@@ -846,6 +846,8 @@ error_info({not_found, Reason}) ->
{404, <<"not_found">>, Reason};
error_info({filter_fetch_error, Reason}) ->
{404, <<"not_found">>, Reason};
+error_info(ddoc_updated) ->
+ {404, <<"not_found">>, <<"Design document was updated or deleted.">>};
error_info({not_acceptable, Reason}) ->
{406, <<"not_acceptable">>, Reason};
error_info(conflict) ->
diff --git a/src/couch_index/src/couch_index.erl b/src/couch_index/src/couch_index.erl
index 604d5038c..b3a800fc1 100644
--- a/src/couch_index/src/couch_index.erl
+++ b/src/couch_index/src/couch_index.erl
@@ -112,9 +112,16 @@ init({Mod, IdxState}) ->
end.
-terminate(Reason, State) ->
+terminate(Reason0, State) ->
#st{mod=Mod, idx_state=IdxState}=State,
- Mod:close(IdxState),
+ case Reason0 of
+ {shutdown, ddoc_updated} ->
+ Mod:shutdown(IdxState),
+ Reason = ddoc_updated;
+ _ ->
+ Mod:close(IdxState),
+ Reason = Reason0
+ end,
send_all(State#st.waiters, Reason),
couch_util:shutdown_sync(State#st.updater),
couch_util:shutdown_sync(State#st.compactor),
@@ -271,7 +278,7 @@ handle_cast(delete, State) ->
ok = Mod:delete(IdxState),
{stop, normal, State};
handle_cast({ddoc_updated, DDocResult}, State) ->
- #st{mod = Mod, idx_state = IdxState, waiters = Waiters} = State,
+ #st{mod = Mod, idx_state = IdxState} = State,
Shutdown = case DDocResult of
{not_found, deleted} ->
true;
@@ -284,17 +291,12 @@ handle_cast({ddoc_updated, DDocResult}, State) ->
end,
case Shutdown of
true ->
- case Waiters of
- [] ->
- {stop, normal, State};
- _ ->
- {noreply, State#st{shutdown = true}}
- end;
+ {stop, {shutdown, ddoc_updated}, State#st{shutdown = true}};
false ->
{noreply, State#st{shutdown = false}}
end;
handle_cast(ddoc_updated, State) ->
- #st{mod = Mod, idx_state = IdxState, waiters = Waiters} = State,
+ #st{mod = Mod, idx_state = IdxState} = State,
DbName = Mod:get(db_name, IdxState),
DDocId = Mod:get(idx_name, IdxState),
Shutdown = couch_util:with_db(DbName, fun(Db) ->
@@ -308,12 +310,7 @@ handle_cast(ddoc_updated, State) ->
end),
case Shutdown of
true ->
- case Waiters of
- [] ->
- {stop, normal, State};
- _ ->
- {noreply, State#st{shutdown = true}}
- end;
+ {stop, {shutdown, ddoc_updated}, State#st{shutdown = true}};
false ->
{noreply, State#st{shutdown = false}}
end;
diff --git a/src/couch_index/src/couch_index_server.erl b/src/couch_index/src/couch_index_server.erl
index 92b8c8eff..8225a90a3 100644
--- a/src/couch_index/src/couch_index_server.erl
+++ b/src/couch_index/src/couch_index_server.erl
@@ -106,6 +106,13 @@ get_index(Module, IdxState) ->
Sig = Module:get(signature, IdxState),
case ets:lookup(?BY_SIG, {DbName, Sig}) of
[{_, Pid}] when is_pid(Pid) ->
+ DDocId = Module:get(idx_name, IdxState),
+ case ets:match_object(?BY_DB, {DbName, {DDocId, Sig}}) of
+ [] ->
+ Args = [Pid, DbName, DDocId, Sig],
+ gen_server:cast(?MODULE, {add_to_ets, Args});
+ _ -> ok
+ end,
{ok, Pid};
_ ->
Args = {Module, IdxState, DbName, Sig},
@@ -161,14 +168,25 @@ handle_call({reset_indexes, DbName}, _From, State) ->
handle_cast({reset_indexes, DbName}, State) ->
reset_indexes(DbName, State#st.root_dir),
+ {noreply, State};
+handle_cast({add_to_ets, [Pid, DbName, DDocId, Sig]}, State) ->
+ % check if Pid still exists
+ case ets:lookup(?BY_PID, Pid) of
+ [{Pid, {DbName, Sig}}] when is_pid(Pid) ->
+ ets:insert(?BY_DB, {DbName, {DDocId, Sig}});
+ _ -> ok
+ end,
+ {noreply, State};
+handle_cast({rem_from_ets, [DbName, DDocId, Sig]}, State) ->
+ ets:delete_object(?BY_DB, {DbName, {DDocId, Sig}}),
{noreply, State}.
handle_info({'EXIT', Pid, Reason}, Server) ->
case ets:lookup(?BY_PID, Pid) of
[{Pid, {DbName, Sig}}] ->
- [{DbName, {DDocId, Sig}}] =
- ets:match_object(?BY_DB, {DbName, {'$1', Sig}}),
- rem_from_ets(DbName, Sig, DDocId, Pid);
+ DDocIds = [DDocId || {_, {DDocId, _}}
+ <- ets:match_object(?BY_DB, {DbName, {'$1', Sig}})],
+ rem_from_ets(DbName, Sig, DDocIds, Pid);
[] when Reason /= normal ->
exit(Reason);
_Else ->
@@ -221,14 +239,17 @@ new_index({Mod, IdxState, DbName, Sig}) ->
reset_indexes(DbName, Root) ->
% shutdown all the updaters and clear the files, the db got changed
- Fun = fun({_, {DDocId, Sig}}) ->
+ SigDDocIds = lists:foldl(fun({_, {DDocId, Sig}}, DDict) ->
+ dict:append(Sig, DDocId, DDict)
+ end, dict:new(), ets:lookup(?BY_DB, DbName)),
+ Fun = fun({Sig, DDocIds}) ->
[{_, Pid}] = ets:lookup(?BY_SIG, {DbName, Sig}),
MRef = erlang:monitor(process, Pid),
gen_server:cast(Pid, delete),
receive {'DOWN', MRef, _, _, _} -> ok end,
- rem_from_ets(DbName, Sig, DDocId, Pid)
+ rem_from_ets(DbName, Sig, DDocIds, Pid)
end,
- lists:foreach(Fun, ets:lookup(?BY_DB, DbName)),
+ lists:foreach(Fun, dict:to_list(SigDDocIds)),
Path = couch_index_util:index_dir("", DbName),
couch_file:nuke_dir(Root, Path).
@@ -239,10 +260,12 @@ add_to_ets(DbName, Sig, DDocId, Pid) ->
ets:insert(?BY_DB, {DbName, {DDocId, Sig}}).
-rem_from_ets(DbName, Sig, DDocId, Pid) ->
+rem_from_ets(DbName, Sig, DDocIds, Pid) ->
ets:delete(?BY_SIG, {DbName, Sig}),
ets:delete(?BY_PID, Pid),
- ets:delete_object(?BY_DB, {DbName, {DDocId, Sig}}).
+ lists:foreach(fun(DDocId) ->
+ ets:delete_object(?BY_DB, {DbName, {DDocId, Sig}})
+ end, DDocIds).
handle_db_event(DbName, created, St) ->
@@ -259,10 +282,19 @@ handle_db_event(<<"shards/", _/binary>> = DbName, {ddoc_updated,
DbShards = [mem3:name(Sh) || Sh <- mem3:local_shards(mem3:dbname(DbName))],
lists:foreach(fun(DbShard) ->
lists:foreach(fun({_DbShard, {_DDocId, Sig}}) ->
- case ets:lookup(?BY_SIG, {DbShard, Sig}) of
- [{_, IndexPid}] -> (catch
- gen_server:cast(IndexPid, {ddoc_updated, DDocResult}));
- [] -> []
+ % check if there are other ddocs with the same Sig for the same db
+ SigDDocs = ets:match_object(?BY_DB, {DbShard, {'$1', Sig}}),
+ if length(SigDDocs) > 1 ->
+ % remove records from ?BY_DB for this DDoc
+ Args = [DbShard, DDocId, Sig],
+ gen_server:cast(?MODULE, {rem_from_ets, Args});
+ true ->
+ % single DDoc with this Sig - close couch_index processes
+ case ets:lookup(?BY_SIG, {DbShard, Sig}) of
+ [{_, IndexPid}] -> (catch
+ gen_server:cast(IndexPid, {ddoc_updated, DDocResult}));
+ [] -> []
+ end
end
end, ets:match_object(?BY_DB, {DbShard, {DDocId, '$1'}}))
end, DbShards),
diff --git a/src/couch_mrview/src/couch_mrview.erl b/src/couch_mrview/src/couch_mrview.erl
index 45dd83d66..c44dd91f3 100644
--- a/src/couch_mrview/src/couch_mrview.erl
+++ b/src/couch_mrview/src/couch_mrview.erl
@@ -241,12 +241,16 @@ query_view(Db, DDoc, VName, Args) ->
query_view(Db, DDoc, VName, Args, Callback, Acc) when is_list(Args) ->
query_view(Db, DDoc, VName, to_mrargs(Args), Callback, Acc);
query_view(Db, DDoc, VName, Args0, Callback, Acc0) ->
- {ok, VInfo, Sig, Args} = couch_mrview_util:get_view(Db, DDoc, VName, Args0),
- {ok, Acc1} = case Args#mrargs.preflight_fun of
- PFFun when is_function(PFFun, 2) -> PFFun(Sig, Acc0);
- _ -> {ok, Acc0}
- end,
- query_view(Db, VInfo, Args, Callback, Acc1).
+ case couch_mrview_util:get_view(Db, DDoc, VName, Args0) of
+ {ok, VInfo, Sig, Args} ->
+ {ok, Acc1} = case Args#mrargs.preflight_fun of
+ PFFun when is_function(PFFun, 2) -> PFFun(Sig, Acc0);
+ _ -> {ok, Acc0}
+ end,
+ query_view(Db, VInfo, Args, Callback, Acc1);
+ ddoc_updated ->
+ Callback(ok, ddoc_updated)
+ end.
get_view_index_pid(Db, DDoc, ViewName, Args0) ->
@@ -689,6 +693,8 @@ default_cb({final, Info}, []) ->
{ok, [Info]};
default_cb({final, _}, Acc) ->
{ok, Acc};
+default_cb(ok, ddoc_updated) ->
+ {ok, ddoc_updated};
default_cb(Row, Acc) ->
{ok, [Row | Acc]}.
diff --git a/src/couch_mrview/src/couch_mrview_index.erl b/src/couch_mrview/src/couch_mrview_index.erl
index eaec5cc52..aa1ee2741 100644
--- a/src/couch_mrview/src/couch_mrview_index.erl
+++ b/src/couch_mrview/src/couch_mrview_index.erl
@@ -14,7 +14,7 @@
-export([get/2]).
--export([init/2, open/2, close/1, reset/1, delete/1]).
+-export([init/2, open/2, close/1, reset/1, delete/1, shutdown/1]).
-export([start_update/3, purge/4, process_doc/3, finish_update/1, commit/1]).
-export([compact/3, swap_compacted/2, remove_compacted/1]).
-export([index_file_exists/1]).
@@ -143,6 +143,18 @@ close(State) ->
couch_file:close(State#mrst.fd).
+% This is called after a ddoc_updated event occurs, and
+% before we shut down the couch_index process.
+% We unlink couch_index from the corresponding couch_file and demonitor it.
+% This allows all outstanding queries that are currently streaming
+% data from couch_file to finish successfully.
+% couch_file will be closed automatically after all
+% outstanding queries are done.
+shutdown(State) ->
+ erlang:demonitor(State#mrst.fd_monitor, [flush]),
+ unlink(State#mrst.fd).
+
+
delete(#mrst{db_name=DbName, sig=Sig}=State) ->
couch_file:close(State#mrst.fd),
catch couch_mrview_util:delete_files(DbName, Sig).
diff --git a/src/couch_mrview/src/couch_mrview_util.erl b/src/couch_mrview/src/couch_mrview_util.erl
index 5c95f2e46..0d58e4f35 100644
--- a/src/couch_mrview/src/couch_mrview_util.erl
+++ b/src/couch_mrview/src/couch_mrview_util.erl
@@ -42,13 +42,17 @@
get_view(Db, DDoc, ViewName, Args0) ->
- {ok, State, Args2} = get_view_index_state(Db, DDoc, ViewName, Args0),
- Ref = erlang:monitor(process, State#mrst.fd),
- #mrst{language=Lang, views=Views} = State,
- {Type, View, Args3} = extract_view(Lang, Args2, ViewName, Views),
- check_range(Args3, view_cmp(View)),
- Sig = view_sig(Db, State, View, Args3),
- {ok, {Type, View, Ref}, Sig, Args3}.
+ case get_view_index_state(Db, DDoc, ViewName, Args0) of
+ {ok, State, Args2} ->
+ Ref = erlang:monitor(process, State#mrst.fd),
+ #mrst{language=Lang, views=Views} = State,
+ {Type, View, Args3} = extract_view(Lang, Args2, ViewName, Views),
+ check_range(Args3, view_cmp(View)),
+ Sig = view_sig(Db, State, View, Args3),
+ {ok, {Type, View, Ref}, Sig, Args3};
+ ddoc_updated ->
+ ddoc_updated
+ end.
get_view_index_pid(Db, DDoc, ViewName, Args0) ->
@@ -71,7 +75,7 @@ get_view_index_state(Db, DDoc, ViewName, Args0, RetryCount) ->
UpdateSeq = couch_util:with_db(Db, fun(WDb) ->
couch_db:get_update_seq(WDb)
end),
- {ok, State} = case Args#mrargs.update of
+ State = case Args#mrargs.update of
lazy ->
spawn(fun() ->
catch couch_index:get_state(Pid, UpdateSeq)
@@ -82,7 +86,11 @@ get_view_index_state(Db, DDoc, ViewName, Args0, RetryCount) ->
_ ->
couch_index:get_state(Pid, UpdateSeq)
end,
- {ok, State, Args}
+ case State of
+ {ok, State0} -> {ok, State0, Args};
+ ddoc_updated -> ddoc_updated;
+ Else -> throw(Else)
+ end
catch
exit:{Reason, _} when Reason == noproc; Reason == normal ->
timer:sleep(?GET_VIEW_RETRY_DELAY),
diff --git a/src/couch_mrview/test/couch_mrview_ddoc_updated_tests.erl b/src/couch_mrview/test/couch_mrview_ddoc_updated_tests.erl
new file mode 100644
index 000000000..d0ba6b4d2
--- /dev/null
+++ b/src/couch_mrview/test/couch_mrview_ddoc_updated_tests.erl
@@ -0,0 +1,139 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_mrview_ddoc_updated_tests).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+-define(TIMEOUT, 1000).
+
+
+setup() ->
+ Name = ?tempdb(),
+ couch_server:delete(Name, [?ADMIN_CTX]),
+ {ok, Db} = couch_db:create(Name, [?ADMIN_CTX]),
+ DDoc = couch_doc:from_json_obj({[
+ {<<"_id">>, <<"_design/bar">>},
+ {<<"views">>, {[
+ {<<"baz">>, {[
+ {<<"map">>, <<
+ "function(doc) {\n"
+ " var i = 0; while(i<1000){ i++ };\n"
+ " emit(doc.val, doc.val);\n"
+ "}"
+ >>}
+ ]}}
+ ]}}
+ ]}),
+ [Doc1 | Docs999] = couch_mrview_test_util:make_docs(map, 100),
+ {ok, _} = couch_db:update_docs(Db, [DDoc, Doc1], []),
+ {ok, Db2} = couch_db:reopen(Db),
+
+ % run a query with 1 doc to initialize couch_index process
+ CB = fun
+ ({row, _}, Count) -> {ok, Count+1};
+ (_, Count) -> {ok, Count}
+ end,
+ {ok, _} =
+ couch_mrview:query_view(Db2, <<"_design/bar">>, <<"baz">>, [], CB, 0),
+
+ % add more docs
+ {ok, _} = couch_db:update_docs(Db2, Docs999, []),
+ {ok, Db3} = couch_db:reopen(Db2),
+ Db3.
+
+teardown(Db) ->
+ couch_db:close(Db),
+ couch_server:delete(Db#db.name, [?ADMIN_CTX]),
+ ok.
+
+
+ddoc_update_test_() ->
+ {
+ "Check ddoc update actions",
+ {
+ setup,
+ fun test_util:start_couch/0, fun test_util:stop_couch/1,
+ {
+ foreach,
+ fun setup/0, fun teardown/1,
+ [
+ fun check_indexing_stops_on_ddoc_change/1
+ ]
+ }
+ }
+ }.
+
+
+check_indexing_stops_on_ddoc_change(Db) ->
+ ?_test(begin
+ DDocID = <<"_design/bar">>,
+
+ IndexesBefore = get_indexes_by_ddoc(DDocID, 1),
+ ?assertEqual(1, length(IndexesBefore)),
+ AliveBefore = lists:filter(fun erlang:is_process_alive/1, IndexesBefore),
+ ?assertEqual(1, length(AliveBefore)),
+
+ {ok, DDoc} = couch_db:open_doc(Db, DDocID, [ejson_body, ?ADMIN_CTX]),
+ DDocJson2 = couch_doc:from_json_obj({[
+ {<<"_id">>, DDocID},
+ {<<"_deleted">>, true},
+ {<<"_rev">>, couch_doc:rev_to_str(DDoc#doc.revs)}
+ ]}),
+
+ % spawn a process for query
+ Self = self(),
+ QPid = spawn(fun() ->
+ {ok, Result} = couch_mrview:query_view(
+ Db, <<"_design/bar">>, <<"baz">>, []),
+ Self ! {self(), Result}
+ end),
+
+ % while indexing for the query is in progress, delete DDoc
+ {ok, _} = couch_db:update_doc(Db, DDocJson2, []),
+ receive
+ {QPid, Msg} ->
+ ?assertEqual(Msg, ddoc_updated)
+ after ?TIMEOUT ->
+ erlang:error(
+ {assertion_failed, [{module, ?MODULE}, {line, ?LINE},
+ {reason, "test failed"}]})
+ end,
+
+ %% assert that previously running indexes are gone
+ IndexesAfter = get_indexes_by_ddoc(DDocID, 0),
+ ?assertEqual(0, length(IndexesAfter)),
+ AliveAfter = lists:filter(fun erlang:is_process_alive/1, IndexesBefore),
+ ?assertEqual(0, length(AliveAfter))
+ end).
+
+
+get_indexes_by_ddoc(DDocID, N) ->
+ Indexes = test_util:wait(fun() ->
+ Indxs = ets:match_object(
+ couchdb_indexes_by_db, {'$1', {DDocID, '$2'}}),
+ case length(Indxs) == N of
+ true ->
+ Indxs;
+ false ->
+ wait
+ end
+ end),
+ lists:foldl(fun({DbName, {_DDocID, Sig}}, Acc) ->
+ case ets:lookup(couchdb_indexes_by_sig, {DbName, Sig}) of
+ [{_, Pid}] -> [Pid|Acc];
+ _ -> Acc
+ end
+ end, [], Indexes).
+
+
diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl
index 80b110a24..93d7d1536 100644
--- a/src/fabric/src/fabric_rpc.erl
+++ b/src/fabric/src/fabric_rpc.erl
@@ -314,7 +314,9 @@ view_cb({row, Row}, Acc) ->
view_cb(complete, Acc) ->
% Finish view output
ok = rexi:stream_last(complete),
- {ok, Acc}.
+ {ok, Acc};
+view_cb(ok, ddoc_updated) ->
+ rexi:reply({ok, ddoc_updated}).
reduce_cb({meta, Meta}, Acc) ->
@@ -331,7 +333,9 @@ reduce_cb({row, Row}, Acc) ->
reduce_cb(complete, Acc) ->
% Finish view output
ok = rexi:stream_last(complete),
- {ok, Acc}.
+ {ok, Acc};
+reduce_cb(ok, ddoc_updated) ->
+ rexi:reply({ok, ddoc_updated}).
changes_enumerator(#doc_info{id= <<"_local/", _/binary>>, high_seq=Seq}, Acc) ->
diff --git a/src/fabric/src/fabric_util.erl b/src/fabric/src/fabric_util.erl
index 7e3f23e68..765561381 100644
--- a/src/fabric/src/fabric_util.erl
+++ b/src/fabric/src/fabric_util.erl
@@ -132,6 +132,9 @@ handle_stream_start(rexi_STREAM_INIT, {Worker, From}, St) ->
{stop, St#stream_acc{workers=Workers1}}
end
end;
+handle_stream_start({ok, ddoc_updated}, _, St) ->
+ cleanup(St#stream_acc.workers),
+ {stop, ddoc_updated};
handle_stream_start(Else, _, _) ->
exit({invalid_stream_start, Else}).
diff --git a/src/fabric/src/fabric_view_map.erl b/src/fabric/src/fabric_view_map.erl
index b6cedb750..b6a3d6f83 100644
--- a/src/fabric/src/fabric_view_map.erl
+++ b/src/fabric/src/fabric_view_map.erl
@@ -37,6 +37,8 @@ go(DbName, Options, DDoc, View, Args, Callback, Acc, VInfo) ->
RexiMon = fabric_util:create_monitors(Workers0),
try
case fabric_util:stream_start(Workers0, #shard.ref, StartFun, Repls) of
+ {ok, ddoc_updated} ->
+ Callback({error, ddoc_updated}, Acc);
{ok, Workers} ->
try
go(DbName, Workers, VInfo, Args, Callback, Acc)
@@ -173,7 +175,10 @@ handle_message(#view_row{} = Row, {Worker, From}, State) ->
handle_message(complete, Worker, State) ->
Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters),
- fabric_view:maybe_send_row(State#collector{counters = Counters}).
+ fabric_view:maybe_send_row(State#collector{counters = Counters});
+
+handle_message(ddoc_updated, _Worker, State) ->
+ {stop, State}.
merge_row(Dir, Collation, undefined, Row, Rows0) ->
Rows1 = lists:merge(
diff --git a/src/fabric/src/fabric_view_reduce.erl b/src/fabric/src/fabric_view_reduce.erl
index e6146b045..a74be1073 100644
--- a/src/fabric/src/fabric_view_reduce.erl
+++ b/src/fabric/src/fabric_view_reduce.erl
@@ -36,6 +36,8 @@ go(DbName, DDoc, VName, Args, Callback, Acc, VInfo) ->
RexiMon = fabric_util:create_monitors(Workers0),
try
case fabric_util:stream_start(Workers0, #shard.ref, StartFun, Repls) of
+ {ok, ddoc_updated} ->
+ Callback({error, ddoc_updated}, Acc);
{ok, Workers} ->
try
go2(DbName, Workers, VInfo, Args, Callback, Acc)
@@ -150,7 +152,10 @@ handle_message(#view_row{key=Key} = Row, {Worker, From}, State) ->
handle_message(complete, Worker, #collector{counters = Counters0} = State) ->
true = fabric_dict:is_key(Worker, Counters0),
C1 = fabric_dict:update_counter(Worker, 1, Counters0),
- fabric_view:maybe_send_row(State#collector{counters = C1}).
+ fabric_view:maybe_send_row(State#collector{counters = C1});
+
+handle_message(ddoc_updated, _Worker, State) ->
+ {stop, State}.
os_proc_needed(<<"_", _/binary>>) -> false;
os_proc_needed(_) -> true.