diff options
author | Robert Newson <rnewson@apache.org> | 2023-03-22 22:00:48 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-03-22 22:00:48 +0000 |
commit | 0073e764bc7096dbf867a79ac88312acf4c40d73 (patch) | |
tree | 93f1a1de176b11fa12b9bfb1254cbe35401f21e5 | |
parent | 189db657f54f5c1d3ed54fb4a11ac73718fb4691 (diff) | |
parent | 224d6764e8d52f451d30ef15d5be5e508c82ca17 (diff) | |
download | couchdb-0073e764bc7096dbf867a79ac88312acf4c40d73.tar.gz |
Merge pull request #4491 from apache/couch_index_fixes
Couch index fixes
-rw-r--r-- | src/couch_event/src/couch_event_listener_mfa.erl | 25 | ||||
-rw-r--r-- | src/couch_index/src/couch_index_server.erl | 154 | ||||
-rw-r--r-- | src/couch_index/test/eunit/couch_index_ddoc_updated_tests.erl | 3 |
3 files changed, 119 insertions, 63 deletions
diff --git a/src/couch_event/src/couch_event_listener_mfa.erl b/src/couch_event/src/couch_event_listener_mfa.erl index b4cd9148a..5ec465cf7 100644 --- a/src/couch_event/src/couch_event_listener_mfa.erl +++ b/src/couch_event/src/couch_event_listener_mfa.erl @@ -76,13 +76,24 @@ terminate(_Reason, _MFA) -> ok. handle_event(DbName, Event, #st{mod = Mod, func = Func, state = State} = St) -> - case (catch Mod:Func(DbName, Event, State)) of - {ok, NewState} -> - {ok, St#st{state = NewState}}; - stop -> - {stop, normal, St}; - Else -> - erlang:error(Else) + try + case Mod:Func(DbName, Event, State) of + {ok, NewState} -> + {ok, St#st{state = NewState}}; + stop -> + {stop, normal, St}; + Else -> + couch_log:error("~p: else in handle_event for db ~p, event ~p, else ~p", [ + ?MODULE, DbName, Event, Else + ]), + erlang:error(Else) + end + catch + Class:Reason:Stack -> + couch_log:error("~p: ~p in handle_event for db ~p, event ~p, reason ~p, stack ~p", [ + ?MODULE, Class, DbName, Event, Reason, Stack + ]), + erlang:raise(Class, Reason, Stack) end. handle_cast(shutdown, St) -> diff --git a/src/couch_index/src/couch_index_server.erl b/src/couch_index/src/couch_index_server.erl index c3440024e..35df43d2a 100644 --- a/src/couch_index/src/couch_index_server.erl +++ b/src/couch_index/src/couch_index_server.erl @@ -22,7 +22,7 @@ -export([handle_call/3, handle_cast/2, handle_info/2]). % Sharding functions --export([num_servers/0, server_name/1, by_sig/1, by_pid/1, by_db/1]). +-export([num_servers/0, server_name/1, by_sig/1, by_pid/1, by_db/1, openers/1]). -export([aggregate_queue_len/0, names/0]). % Exported for callbacks @@ -41,7 +41,8 @@ server_name, by_sig, by_pid, - by_db + by_db, + openers }). start_link(N) -> @@ -129,6 +130,7 @@ init([N]) -> ets:new(by_sig(N), [protected, set, named_table]), ets:new(by_pid(N), [private, set, named_table]), ets:new(by_db(N), [protected, bag, named_table]), + ets:new(openers(N), [protected, set, named_table]), RootDir = couch_index_util:root_dir(), % We only need one of the index servers to nuke this on startup. case N of @@ -140,7 +142,8 @@ init([N]) -> server_name = server_name(N), by_sig = by_sig(N), by_pid = by_pid(N), - by_db = by_db(N) + by_db = by_db(N), + openers = openers(N) }, ok = config:listen_for_changes(?MODULE, St), couch_event:link_listener(?MODULE, handle_db_event, St, [all_dbs]), @@ -154,7 +157,8 @@ terminate(_Reason, State) -> handle_call({get_index, {_Mod, _IdxState, DbName, Sig} = Args}, From, State) -> case ets:lookup(State#st.by_sig, {DbName, Sig}) of [] -> - spawn_link(fun() -> new_index(Args) end), + Pid = spawn_link(fun() -> new_index(Args) end), + ets:insert(State#st.openers, {Pid, {DbName, Sig}}), ets:insert(State#st.by_sig, {{DbName, Sig}, [From]}), {noreply, State}; [{_, Waiters}] when is_list(Waiters) -> @@ -163,15 +167,17 @@ handle_call({get_index, {_Mod, _IdxState, DbName, Sig} = Args}, From, State) -> [{_, Pid}] when is_pid(Pid) -> {reply, {ok, Pid}, State} end; -handle_call({async_open, {DbName, DDocId, Sig}, {ok, Pid}}, _From, State) -> +handle_call({async_open, {DbName, DDocId, Sig}, {ok, Pid}}, {OpenerPid, _}, State) -> [{_, Waiters}] = ets:lookup(State#st.by_sig, {DbName, Sig}), [gen_server:reply(From, {ok, Pid}) || From <- Waiters], link(Pid), + ets:delete(State#st.openers, OpenerPid), add_to_ets(DbName, Sig, DDocId, Pid, State), {reply, ok, State}; -handle_call({async_error, {DbName, _DDocId, Sig}, Error}, _From, State) -> +handle_call({async_error, {DbName, _DDocId, Sig}, Error}, {OpenerPid, _}, State) -> [{_, Waiters}] = ets:lookup(State#st.by_sig, {DbName, Sig}), [gen_server:reply(From, Error) || From <- Waiters], + ets:delete(State#st.openers, OpenerPid), ets:delete(State#st.by_sig, {DbName, Sig}), {reply, ok, State}; handle_call({reset_indexes, DbName}, _From, State) -> @@ -192,19 +198,31 @@ handle_cast({add_to_ets, [Pid, DbName, DDocId, Sig]}, State) -> {noreply, State}; handle_cast({rem_from_ets, [DbName, DDocId, Sig]}, State) -> ets:delete_object(State#st.by_db, {DbName, {DDocId, Sig}}), + {noreply, State}; +handle_cast({rem_from_ets, [DbName]}, State) -> + rem_from_ets(DbName, State), {noreply, State}. handle_info({'EXIT', Pid, Reason}, Server) -> + Cleanup = fun(DbName, Sig) -> + DDocIds = [ + DDocId + || {_, {DDocId, _}} <- + ets:match_object(Server#st.by_db, {DbName, {'$1', Sig}}) + ], + rem_from_ets(DbName, Sig, DDocIds, Pid, Server) + end, case ets:lookup(Server#st.by_pid, Pid) of [{Pid, {DbName, Sig}}] -> - DDocIds = [ - DDocId - || {_, {DDocId, _}} <- - ets:match_object(Server#st.by_db, {DbName, {'$1', Sig}}) - ], - rem_from_ets(DbName, Sig, DDocIds, Pid, Server); + Cleanup(DbName, Sig); [] when Reason /= normal -> - exit(Reason); + case ets:lookup(Server#st.openers, Pid) of + [{Pid, {DbName, Sig}}] -> + ets:delete(Server#st.openers, Pid), + Cleanup(DbName, Sig); + [] -> + exit(Reason) + end; _Else -> ok end, @@ -298,6 +316,27 @@ rem_from_ets(DbName, Sig, DDocIds, Pid, #st{} = St) -> DDocIds ). +rem_from_ets(DbName, #st{} = State) -> + SigDDocIds = lists:foldl( + fun({_, {DDocId, Sig}}, DDict) -> + dict:append(Sig, DDocId, DDict) + end, + dict:new(), + ets:lookup(State#st.by_db, DbName) + ), + Fun = fun({Sig, DDocIds}) -> + [{_, Pid}] = ets:lookup(State#st.by_sig, {DbName, Sig}), + unlink(Pid), + receive + {'EXIT', Pid, _} -> + ok + after 0 -> + ok + end, + rem_from_ets(DbName, Sig, DDocIds, Pid, State) + end, + lists:foreach(Fun, dict:to_list(SigDDocIds)). + handle_db_event(DbName, created, St) -> gen_server:cast(St#st.server_name, {reset_indexes, DbName}), {ok, St}; @@ -305,48 +344,50 @@ handle_db_event(DbName, deleted, St) -> gen_server:cast(St#st.server_name, {reset_indexes, DbName}), {ok, St}; handle_db_event(<<"shards/", _/binary>> = DbName, {ddoc_updated, DDocId}, St) -> - DDocResult = couch_util:with_db(DbName, fun(Db) -> - couch_db:open_doc(Db, DDocId, [ejson_body, ?ADMIN_CTX]) - end), - LocalShards = - try - mem3:local_shards(mem3:dbname(DbName)) - catch - Class:Msg -> - couch_log:warning( - "~p got ~p:~p when fetching local shards for ~p", - [?MODULE, Class, Msg, DbName] - ), - [] - end, - DbShards = [mem3:name(Sh) || Sh <- LocalShards], - lists:foreach( - fun(DbShard) -> - lists:foreach( - fun({_DbShard, {_DDocId, Sig}}) -> - % check if there are other ddocs with the same Sig for the same db - SigDDocs = ets:match_object(St#st.by_db, {DbShard, {'$1', Sig}}), - if - length(SigDDocs) > 1 -> - % remove records from by_db for this DDoc - Args = [DbShard, DDocId, Sig], - gen_server:cast(St#st.server_name, {rem_from_ets, Args}); - true -> - % single DDoc with this Sig - close couch_index processes - case ets:lookup(St#st.by_sig, {DbShard, Sig}) of - [{_, IndexPid}] -> - (catch gen_server:cast(IndexPid, {ddoc_updated, DDocResult})); - [] -> - [] - end - end - end, - ets:match_object(St#st.by_db, {DbShard, {DDocId, '$1'}}) - ) - end, - DbShards - ), - {ok, St}; + %% this handle_db_event function must not crash (or it takes down the couch_index_server) + try + DDocResult = couch_util:with_db(DbName, fun(Db) -> + couch_db:open_doc(Db, DDocId, [ejson_body, ?ADMIN_CTX]) + end), + LocalShards = mem3:local_shards(mem3:dbname(DbName)), + DbShards = [mem3:name(Sh) || Sh <- LocalShards], + lists:foreach( + fun(DbShard) -> + lists:foreach( + fun({_DbShard, {_DDocId, Sig}}) -> + % check if there are other ddocs with the same Sig for the same db + SigDDocs = ets:match_object(St#st.by_db, {DbShard, {'$1', Sig}}), + if + length(SigDDocs) > 1 -> + % remove records from by_db for this DDoc + Args = [DbShard, DDocId, Sig], + gen_server:cast(St#st.server_name, {rem_from_ets, Args}); + true -> + % single DDoc with this Sig - close couch_index processes + case ets:lookup(St#st.by_sig, {DbShard, Sig}) of + [{_, IndexPid}] -> + (catch gen_server:cast( + IndexPid, {ddoc_updated, DDocResult} + )); + [] -> + [] + end + end + end, + ets:match_object(St#st.by_db, {DbShard, {DDocId, '$1'}}) + ) + end, + DbShards + ), + {ok, St} + catch + Class:Reason:Stack -> + couch_log:warning("~p: handle_db_event ~p for db ~p, reason ~p, stack ~p", [ + ?MODULE, Class, DbName, Reason, Stack + ]), + gen_server:cast(St#st.server_name, {rem_from_ets, [DbName]}), + {ok, St} + end; handle_db_event(DbName, {ddoc_updated, DDocId}, St) -> lists:foreach( fun({_DbName, {_DDocId, Sig}}) -> @@ -378,6 +419,9 @@ by_pid(Arg) -> by_db(Arg) -> name("couchdb_indexes_by_db", Arg). +openers(Arg) -> + name("couchdb_indexes_openers", Arg). + name(BaseName, Arg) when is_list(Arg) -> name(BaseName, ?l2b(Arg)); name(BaseName, Arg) when is_binary(Arg) -> diff --git a/src/couch_index/test/eunit/couch_index_ddoc_updated_tests.erl b/src/couch_index/test/eunit/couch_index_ddoc_updated_tests.erl index 3af58d2fc..cbdb71954 100644 --- a/src/couch_index/test/eunit/couch_index_ddoc_updated_tests.erl +++ b/src/couch_index/test/eunit/couch_index_ddoc_updated_tests.erl @@ -109,7 +109,8 @@ check_all_indexers_exit_on_ddoc_change({_Ctx, DbName}) -> couch_db:name(DbShard), {ddoc_updated, DDocID}, {st, "", couch_index_server:server_name(I), couch_index_server:by_sig(I), - couch_index_server:by_pid(I), couch_index_server:by_db(I)} + couch_index_server:by_pid(I), couch_index_server:by_db(I), + couch_index_server:openers(I)} ) end, seq() |