summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMayya Sharipova <mayyas@ca.ibm.com>2017-04-24 17:13:18 -0400
committerNick Vatamaniuc <nickva@users.noreply.github.com>2017-06-02 11:34:35 -0400
commit994fe33bb02ca6a1d1b2d807a486c3b386426999 (patch)
treec1a5ce86d461be758591190b9b84bd0644b74e49
parente5e708a018c6f44408fc055df1aacb65572b8637 (diff)
downloadcouchdb-994fe33bb02ca6a1d1b2d807a486c3b386426999.tar.gz
Notify couch_index_processes on all shards when ddoc updated
Before when a design doc is updated/deleted, only one couch_index process was notified - the one which shard contained a design doc. couch_index processes from other shards still continued to exist, and indexing activities for these processes were still be going on. The patch notifies couch_index_processes on all shards COUCHDB-3400
-rw-r--r--src/couch_index/src/couch_index.erl23
-rw-r--r--src/couch_index/src/couch_index_server.erl16
-rw-r--r--src/couch_index/test/couch_index_ddoc_updated_tests.erl144
3 files changed, 183 insertions, 0 deletions
diff --git a/src/couch_index/src/couch_index.erl b/src/couch_index/src/couch_index.erl
index c86f5e122..9da928dac 100644
--- a/src/couch_index/src/couch_index.erl
+++ b/src/couch_index/src/couch_index.erl
@@ -257,6 +257,29 @@ handle_cast(delete, State) ->
#st{mod=Mod, idx_state=IdxState} = State,
ok = Mod:delete(IdxState),
{stop, normal, State};
+handle_cast({ddoc_updated, DDocResult}, State) ->
+ #st{mod = Mod, idx_state = IdxState, waiters = Waiters} = State,
+ Shutdown = case DDocResult of
+ {not_found, deleted} ->
+ true;
+ {ok, DDoc} ->
+ DbName = Mod:get(db_name, IdxState),
+ couch_util:with_db(DbName, fun(Db) ->
+ {ok, NewIdxState} = Mod:init(Db, DDoc),
+ Mod:get(signature, NewIdxState) =/= Mod:get(signature, IdxState)
+ end)
+ end,
+ case Shutdown of
+ true ->
+ case Waiters of
+ [] ->
+ {stop, normal, State};
+ _ ->
+ {noreply, State#st{shutdown = true}}
+ end;
+ false ->
+ {noreply, State#st{shutdown = false}}
+ end;
handle_cast(ddoc_updated, State) ->
#st{mod = Mod, idx_state = IdxState, waiters = Waiters} = State,
DbName = Mod:get(db_name, IdxState),
diff --git a/src/couch_index/src/couch_index_server.erl b/src/couch_index/src/couch_index_server.erl
index 4e86f5e80..92b8c8eff 100644
--- a/src/couch_index/src/couch_index_server.erl
+++ b/src/couch_index/src/couch_index_server.erl
@@ -251,6 +251,22 @@ handle_db_event(DbName, created, St) ->
handle_db_event(DbName, deleted, St) ->
gen_server:cast(?MODULE, {reset_indexes, DbName}),
{ok, St};
+handle_db_event(<<"shards/", _/binary>> = DbName, {ddoc_updated,
+ DDocId}, St) ->
+ DDocResult = couch_util:with_db(DbName, fun(Db) ->
+ couch_db:open_doc(Db, DDocId, [ejson_body, ?ADMIN_CTX])
+ end),
+ DbShards = [mem3:name(Sh) || Sh <- mem3:local_shards(mem3:dbname(DbName))],
+ lists:foreach(fun(DbShard) ->
+ lists:foreach(fun({_DbShard, {_DDocId, Sig}}) ->
+ case ets:lookup(?BY_SIG, {DbShard, Sig}) of
+ [{_, IndexPid}] -> (catch
+ gen_server:cast(IndexPid, {ddoc_updated, DDocResult}));
+ [] -> []
+ end
+ end, ets:match_object(?BY_DB, {DbShard, {DDocId, '$1'}}))
+ end, DbShards),
+ {ok, St};
handle_db_event(DbName, {ddoc_updated, DDocId}, St) ->
lists:foreach(fun({_DbName, {_DDocId, Sig}}) ->
case ets:lookup(?BY_SIG, {DbName, Sig}) of
diff --git a/src/couch_index/test/couch_index_ddoc_updated_tests.erl b/src/couch_index/test/couch_index_ddoc_updated_tests.erl
new file mode 100644
index 000000000..007f5692b
--- /dev/null
+++ b/src/couch_index/test/couch_index_ddoc_updated_tests.erl
@@ -0,0 +1,144 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_index_ddoc_updated_tests).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+
+start() ->
+ fake_index(),
+ Ctx = test_util:start_couch([mem3, fabric]),
+ DbName = ?tempdb(),
+ ok = fabric:create_db(DbName, [?ADMIN_CTX]),
+ {Ctx, DbName}.
+
+
+stop({Ctx, DbName}) ->
+ (catch meck:unload(test_index)),
+ ok = fabric:delete_db(DbName, [?ADMIN_CTX]),
+ DbDir = config:get("couchdb", "database_dir", "."),
+ WaitFun = fun() ->
+ filelib:fold_files(DbDir, <<".*", DbName/binary, "\.[0-9]+.*">>,
+ true, fun(_F, _A) -> wait end, ok)
+ end,
+ ok = test_util:wait(WaitFun),
+ test_util:stop_couch(Ctx),
+ ok.
+
+
+ddoc_update_test_() ->
+ {
+ "Check ddoc update actions",
+ {
+ setup,
+ fun start/0, fun stop/1,
+ fun check_all_indexers_exit_on_ddoc_change/1
+ }
+ }.
+
+
+check_all_indexers_exit_on_ddoc_change({_Ctx, DbName}) ->
+ ?_test(begin
+ [DbShard1 | RestDbShards] = lists:map(fun(Sh) ->
+ {ok, ShardDb} = couch_db:open(mem3:name(Sh), []),
+ ShardDb
+ end, mem3:local_shards(mem3:dbname(DbName))),
+
+ % create a DDoc on Db1
+ DDocID = <<"idx_name">>,
+ DDocJson = couch_doc:from_json_obj({[
+ {<<"_id">>, DDocID},
+ {<<"value">>, 1}
+ ]}),
+ {ok, _Rev} = couch_db:update_doc(DbShard1, DDocJson, []),
+ {ok, DbShard} = couch_db:reopen(DbShard1),
+ {ok, DDoc} = couch_db:open_doc(
+ DbShard, DDocID, [ejson_body, ?ADMIN_CTX]),
+ DbShards = [DbShard | RestDbShards],
+ N = length(DbShards),
+
+ % run couch_index process for each shard database
+ ok = meck:reset(test_index),
+ lists:foreach(fun(ShardDb) ->
+ couch_index_server:get_index(test_index, ShardDb, DDoc)
+ end, DbShards),
+
+ IndexesBefore = get_indexes_by_ddoc(DDocID, N),
+ ?assertEqual(N, length(IndexesBefore)),
+
+ AliveBefore = lists:filter(fun erlang:is_process_alive/1, IndexesBefore),
+ ?assertEqual(N, length(AliveBefore)),
+
+ % update ddoc
+ DDocJson2 = couch_doc:from_json_obj({[
+ {<<"_id">>, DDocID},
+ {<<"value">>, 2},
+ {<<"_rev">>, couch_doc:rev_to_str(DDoc#doc.revs)}
+ ]}),
+ {ok, _} = couch_db:update_doc(DbShard, DDocJson2, []),
+
+ % assert that all index processes exit after ddoc updated
+ ok = meck:reset(test_index),
+ couch_index_server:handle_db_event(
+ DbShard#db.name, {ddoc_updated, DDocID}, {st, ""}),
+
+ ok = meck:wait(N, test_index, init, ['_', '_'], 5000),
+ IndexesAfter = get_indexes_by_ddoc(DDocID, 0),
+ ?assertEqual(0, length(IndexesAfter)),
+
+ %% assert that previously running indexes are gone
+ AliveAfter = lists:filter(fun erlang:is_process_alive/1, IndexesBefore),
+ ?assertEqual(0, length(AliveAfter)),
+ ok
+ end).
+
+
+fake_index() ->
+ ok = meck:new([test_index], [non_strict]),
+ ok = meck:expect(test_index, init, fun(Db, DDoc) ->
+ {ok, {couch_db:name(Db), DDoc}}
+ end),
+ ok = meck:expect(test_index, open, fun(_Db, State) ->
+ {ok, State}
+ end),
+ ok = meck:expect(test_index, get, fun
+ (db_name, {DbName, _DDoc}) ->
+ DbName;
+ (idx_name, {_DbName, DDoc}) ->
+ DDoc#doc.id;
+ (signature, {_DbName, DDoc}) ->
+ couch_crypto:hash(md5, term_to_binary(DDoc));
+ (update_seq, Seq) ->
+ Seq
+ end).
+
+
+get_indexes_by_ddoc(DDocID, N) ->
+ Indexes = test_util:wait(fun() ->
+ Indxs = ets:match_object(
+ couchdb_indexes_by_db, {'$1', {DDocID, '$2'}}),
+ case length(Indxs) == N of
+ true ->
+ Indxs;
+ false ->
+ wait
+ end
+ end),
+ lists:foldl(fun({DbName, {_DDocID, Sig}}, Acc) ->
+ case ets:lookup(couchdb_indexes_by_sig, {DbName, Sig}) of
+ [{_, Pid}] -> [Pid|Acc];
+ _ -> Acc
+ end
+ end, [], Indexes).
+