diff options
author | Mayya Sharipova <mayyas@ca.ibm.com> | 2017-04-24 17:13:18 -0400 |
---|---|---|
committer | Nick Vatamaniuc <nickva@users.noreply.github.com> | 2017-06-02 11:34:35 -0400 |
commit | 994fe33bb02ca6a1d1b2d807a486c3b386426999 (patch) | |
tree | c1a5ce86d461be758591190b9b84bd0644b74e49 | |
parent | e5e708a018c6f44408fc055df1aacb65572b8637 (diff) | |
download | couchdb-994fe33bb02ca6a1d1b2d807a486c3b386426999.tar.gz |
Notify couch_index_processes on all shards when ddoc updated
Before when a design doc is updated/deleted, only one couch_index
process was notified - the one which shard contained a design doc.
couch_index processes from other shards still continued to exist,
and indexing activities for these processes were still be going on.
The patch notifies couch_index_processes on all shards
COUCHDB-3400
-rw-r--r-- | src/couch_index/src/couch_index.erl | 23 | ||||
-rw-r--r-- | src/couch_index/src/couch_index_server.erl | 16 | ||||
-rw-r--r-- | src/couch_index/test/couch_index_ddoc_updated_tests.erl | 144 |
3 files changed, 183 insertions, 0 deletions
diff --git a/src/couch_index/src/couch_index.erl b/src/couch_index/src/couch_index.erl index c86f5e122..9da928dac 100644 --- a/src/couch_index/src/couch_index.erl +++ b/src/couch_index/src/couch_index.erl @@ -257,6 +257,29 @@ handle_cast(delete, State) -> #st{mod=Mod, idx_state=IdxState} = State, ok = Mod:delete(IdxState), {stop, normal, State}; +handle_cast({ddoc_updated, DDocResult}, State) -> + #st{mod = Mod, idx_state = IdxState, waiters = Waiters} = State, + Shutdown = case DDocResult of + {not_found, deleted} -> + true; + {ok, DDoc} -> + DbName = Mod:get(db_name, IdxState), + couch_util:with_db(DbName, fun(Db) -> + {ok, NewIdxState} = Mod:init(Db, DDoc), + Mod:get(signature, NewIdxState) =/= Mod:get(signature, IdxState) + end) + end, + case Shutdown of + true -> + case Waiters of + [] -> + {stop, normal, State}; + _ -> + {noreply, State#st{shutdown = true}} + end; + false -> + {noreply, State#st{shutdown = false}} + end; handle_cast(ddoc_updated, State) -> #st{mod = Mod, idx_state = IdxState, waiters = Waiters} = State, DbName = Mod:get(db_name, IdxState), diff --git a/src/couch_index/src/couch_index_server.erl b/src/couch_index/src/couch_index_server.erl index 4e86f5e80..92b8c8eff 100644 --- a/src/couch_index/src/couch_index_server.erl +++ b/src/couch_index/src/couch_index_server.erl @@ -251,6 +251,22 @@ handle_db_event(DbName, created, St) -> handle_db_event(DbName, deleted, St) -> gen_server:cast(?MODULE, {reset_indexes, DbName}), {ok, St}; +handle_db_event(<<"shards/", _/binary>> = DbName, {ddoc_updated, + DDocId}, St) -> + DDocResult = couch_util:with_db(DbName, fun(Db) -> + couch_db:open_doc(Db, DDocId, [ejson_body, ?ADMIN_CTX]) + end), + DbShards = [mem3:name(Sh) || Sh <- mem3:local_shards(mem3:dbname(DbName))], + lists:foreach(fun(DbShard) -> + lists:foreach(fun({_DbShard, {_DDocId, Sig}}) -> + case ets:lookup(?BY_SIG, {DbShard, Sig}) of + [{_, IndexPid}] -> (catch + gen_server:cast(IndexPid, {ddoc_updated, DDocResult})); + [] -> [] + end + end, ets:match_object(?BY_DB, {DbShard, {DDocId, '$1'}})) + end, DbShards), + {ok, St}; handle_db_event(DbName, {ddoc_updated, DDocId}, St) -> lists:foreach(fun({_DbName, {_DDocId, Sig}}) -> case ets:lookup(?BY_SIG, {DbName, Sig}) of diff --git a/src/couch_index/test/couch_index_ddoc_updated_tests.erl b/src/couch_index/test/couch_index_ddoc_updated_tests.erl new file mode 100644 index 000000000..007f5692b --- /dev/null +++ b/src/couch_index/test/couch_index_ddoc_updated_tests.erl @@ -0,0 +1,144 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_index_ddoc_updated_tests). + +-include_lib("couch/include/couch_eunit.hrl"). +-include_lib("couch/include/couch_db.hrl"). + + +start() -> + fake_index(), + Ctx = test_util:start_couch([mem3, fabric]), + DbName = ?tempdb(), + ok = fabric:create_db(DbName, [?ADMIN_CTX]), + {Ctx, DbName}. + + +stop({Ctx, DbName}) -> + (catch meck:unload(test_index)), + ok = fabric:delete_db(DbName, [?ADMIN_CTX]), + DbDir = config:get("couchdb", "database_dir", "."), + WaitFun = fun() -> + filelib:fold_files(DbDir, <<".*", DbName/binary, "\.[0-9]+.*">>, + true, fun(_F, _A) -> wait end, ok) + end, + ok = test_util:wait(WaitFun), + test_util:stop_couch(Ctx), + ok. + + +ddoc_update_test_() -> + { + "Check ddoc update actions", + { + setup, + fun start/0, fun stop/1, + fun check_all_indexers_exit_on_ddoc_change/1 + } + }. + + +check_all_indexers_exit_on_ddoc_change({_Ctx, DbName}) -> + ?_test(begin + [DbShard1 | RestDbShards] = lists:map(fun(Sh) -> + {ok, ShardDb} = couch_db:open(mem3:name(Sh), []), + ShardDb + end, mem3:local_shards(mem3:dbname(DbName))), + + % create a DDoc on Db1 + DDocID = <<"idx_name">>, + DDocJson = couch_doc:from_json_obj({[ + {<<"_id">>, DDocID}, + {<<"value">>, 1} + ]}), + {ok, _Rev} = couch_db:update_doc(DbShard1, DDocJson, []), + {ok, DbShard} = couch_db:reopen(DbShard1), + {ok, DDoc} = couch_db:open_doc( + DbShard, DDocID, [ejson_body, ?ADMIN_CTX]), + DbShards = [DbShard | RestDbShards], + N = length(DbShards), + + % run couch_index process for each shard database + ok = meck:reset(test_index), + lists:foreach(fun(ShardDb) -> + couch_index_server:get_index(test_index, ShardDb, DDoc) + end, DbShards), + + IndexesBefore = get_indexes_by_ddoc(DDocID, N), + ?assertEqual(N, length(IndexesBefore)), + + AliveBefore = lists:filter(fun erlang:is_process_alive/1, IndexesBefore), + ?assertEqual(N, length(AliveBefore)), + + % update ddoc + DDocJson2 = couch_doc:from_json_obj({[ + {<<"_id">>, DDocID}, + {<<"value">>, 2}, + {<<"_rev">>, couch_doc:rev_to_str(DDoc#doc.revs)} + ]}), + {ok, _} = couch_db:update_doc(DbShard, DDocJson2, []), + + % assert that all index processes exit after ddoc updated + ok = meck:reset(test_index), + couch_index_server:handle_db_event( + DbShard#db.name, {ddoc_updated, DDocID}, {st, ""}), + + ok = meck:wait(N, test_index, init, ['_', '_'], 5000), + IndexesAfter = get_indexes_by_ddoc(DDocID, 0), + ?assertEqual(0, length(IndexesAfter)), + + %% assert that previously running indexes are gone + AliveAfter = lists:filter(fun erlang:is_process_alive/1, IndexesBefore), + ?assertEqual(0, length(AliveAfter)), + ok + end). + + +fake_index() -> + ok = meck:new([test_index], [non_strict]), + ok = meck:expect(test_index, init, fun(Db, DDoc) -> + {ok, {couch_db:name(Db), DDoc}} + end), + ok = meck:expect(test_index, open, fun(_Db, State) -> + {ok, State} + end), + ok = meck:expect(test_index, get, fun + (db_name, {DbName, _DDoc}) -> + DbName; + (idx_name, {_DbName, DDoc}) -> + DDoc#doc.id; + (signature, {_DbName, DDoc}) -> + couch_crypto:hash(md5, term_to_binary(DDoc)); + (update_seq, Seq) -> + Seq + end). + + +get_indexes_by_ddoc(DDocID, N) -> + Indexes = test_util:wait(fun() -> + Indxs = ets:match_object( + couchdb_indexes_by_db, {'$1', {DDocID, '$2'}}), + case length(Indxs) == N of + true -> + Indxs; + false -> + wait + end + end), + lists:foldl(fun({DbName, {_DDocID, Sig}}, Acc) -> + case ets:lookup(couchdb_indexes_by_sig, {DbName, Sig}) of + [{_, Pid}] -> [Pid|Acc]; + _ -> Acc + end + end, [], Indexes). + |