author     Robert Newson <rnewson@apache.org>    2023-03-22 22:00:48 +0000
committer  GitHub <noreply@github.com>           2023-03-22 22:00:48 +0000
commit     0073e764bc7096dbf867a79ac88312acf4c40d73 (patch)
tree       93f1a1de176b11fa12b9bfb1254cbe35401f21e5
parent     189db657f54f5c1d3ed54fb4a11ac73718fb4691 (diff)
parent     224d6764e8d52f451d30ef15d5be5e508c82ca17 (diff)
Merge pull request #4491 from apache/couch_index_fixes
Couch index fixes
-rw-r--r--  src/couch_event/src/couch_event_listener_mfa.erl                 25
-rw-r--r--  src/couch_index/src/couch_index_server.erl                      154
-rw-r--r--  src/couch_index/test/eunit/couch_index_ddoc_updated_tests.erl      3
3 files changed, 119 insertions(+), 63 deletions(-)
diff --git a/src/couch_event/src/couch_event_listener_mfa.erl b/src/couch_event/src/couch_event_listener_mfa.erl
index b4cd9148a..5ec465cf7 100644
--- a/src/couch_event/src/couch_event_listener_mfa.erl
+++ b/src/couch_event/src/couch_event_listener_mfa.erl
@@ -76,13 +76,24 @@ terminate(_Reason, _MFA) ->
ok.
handle_event(DbName, Event, #st{mod = Mod, func = Func, state = State} = St) ->
- case (catch Mod:Func(DbName, Event, State)) of
- {ok, NewState} ->
- {ok, St#st{state = NewState}};
- stop ->
- {stop, normal, St};
- Else ->
- erlang:error(Else)
+ try
+ case Mod:Func(DbName, Event, State) of
+ {ok, NewState} ->
+ {ok, St#st{state = NewState}};
+ stop ->
+ {stop, normal, St};
+ Else ->
+ couch_log:error("~p: else in handle_event for db ~p, event ~p, else ~p", [
+ ?MODULE, DbName, Event, Else
+ ]),
+ erlang:error(Else)
+ end
+ catch
+ Class:Reason:Stack ->
+ couch_log:error("~p: ~p in handle_event for db ~p, event ~p, reason ~p, stack ~p", [
+ ?MODULE, Class, DbName, Event, Reason, Stack
+ ]),
+ erlang:raise(Class, Reason, Stack)
end.
handle_cast(shutdown, St) ->
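The listener change above wraps the callback in try ... catch and, after logging, re-raises with the original class and stacktrace instead of converting every failure into erlang:error/1. A minimal sketch of that log-and-reraise pattern in isolation (module, function, and logging call are illustrative, not part of the patch):

-module(log_and_reraise).
-export([call/3]).

%% Apply Mod:Fun(Arg); on any exception, log it and re-raise with the
%% original class and stacktrace so callers still see the real failure.
call(Mod, Fun, Arg) ->
    try
        Mod:Fun(Arg)
    catch
        Class:Reason:Stack ->
            io:format("~p: ~p calling ~p:~p/1, reason ~p~n", [
                ?MODULE, Class, Mod, Fun, Reason
            ]),
            erlang:raise(Class, Reason, Stack)
    end.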
diff --git a/src/couch_index/src/couch_index_server.erl b/src/couch_index/src/couch_index_server.erl
index c3440024e..35df43d2a 100644
--- a/src/couch_index/src/couch_index_server.erl
+++ b/src/couch_index/src/couch_index_server.erl
@@ -22,7 +22,7 @@
-export([handle_call/3, handle_cast/2, handle_info/2]).
% Sharding functions
--export([num_servers/0, server_name/1, by_sig/1, by_pid/1, by_db/1]).
+-export([num_servers/0, server_name/1, by_sig/1, by_pid/1, by_db/1, openers/1]).
-export([aggregate_queue_len/0, names/0]).
% Exported for callbacks
@@ -41,7 +41,8 @@
server_name,
by_sig,
by_pid,
- by_db
+ by_db,
+ openers
}).
start_link(N) ->
@@ -129,6 +130,7 @@ init([N]) ->
ets:new(by_sig(N), [protected, set, named_table]),
ets:new(by_pid(N), [private, set, named_table]),
ets:new(by_db(N), [protected, bag, named_table]),
+ ets:new(openers(N), [protected, set, named_table]),
RootDir = couch_index_util:root_dir(),
% We only need one of the index servers to nuke this on startup.
case N of
@@ -140,7 +142,8 @@ init([N]) ->
server_name = server_name(N),
by_sig = by_sig(N),
by_pid = by_pid(N),
- by_db = by_db(N)
+ by_db = by_db(N),
+ openers = openers(N)
},
ok = config:listen_for_changes(?MODULE, St),
couch_event:link_listener(?MODULE, handle_db_event, St, [all_dbs]),
@@ -154,7 +157,8 @@ terminate(_Reason, State) ->
handle_call({get_index, {_Mod, _IdxState, DbName, Sig} = Args}, From, State) ->
case ets:lookup(State#st.by_sig, {DbName, Sig}) of
[] ->
- spawn_link(fun() -> new_index(Args) end),
+ Pid = spawn_link(fun() -> new_index(Args) end),
+ ets:insert(State#st.openers, {Pid, {DbName, Sig}}),
ets:insert(State#st.by_sig, {{DbName, Sig}, [From]}),
{noreply, State};
[{_, Waiters}] when is_list(Waiters) ->
@@ -163,15 +167,17 @@ handle_call({get_index, {_Mod, _IdxState, DbName, Sig} = Args}, From, State) ->
[{_, Pid}] when is_pid(Pid) ->
{reply, {ok, Pid}, State}
end;
-handle_call({async_open, {DbName, DDocId, Sig}, {ok, Pid}}, _From, State) ->
+handle_call({async_open, {DbName, DDocId, Sig}, {ok, Pid}}, {OpenerPid, _}, State) ->
[{_, Waiters}] = ets:lookup(State#st.by_sig, {DbName, Sig}),
[gen_server:reply(From, {ok, Pid}) || From <- Waiters],
link(Pid),
+ ets:delete(State#st.openers, OpenerPid),
add_to_ets(DbName, Sig, DDocId, Pid, State),
{reply, ok, State};
-handle_call({async_error, {DbName, _DDocId, Sig}, Error}, _From, State) ->
+handle_call({async_error, {DbName, _DDocId, Sig}, Error}, {OpenerPid, _}, State) ->
[{_, Waiters}] = ets:lookup(State#st.by_sig, {DbName, Sig}),
[gen_server:reply(From, Error) || From <- Waiters],
+ ets:delete(State#st.openers, OpenerPid),
ets:delete(State#st.by_sig, {DbName, Sig}),
{reply, ok, State};
handle_call({reset_indexes, DbName}, _From, State) ->
@@ -192,19 +198,31 @@ handle_cast({add_to_ets, [Pid, DbName, DDocId, Sig]}, State) ->
{noreply, State};
handle_cast({rem_from_ets, [DbName, DDocId, Sig]}, State) ->
ets:delete_object(State#st.by_db, {DbName, {DDocId, Sig}}),
+ {noreply, State};
+handle_cast({rem_from_ets, [DbName]}, State) ->
+ rem_from_ets(DbName, State),
{noreply, State}.
handle_info({'EXIT', Pid, Reason}, Server) ->
+ Cleanup = fun(DbName, Sig) ->
+ DDocIds = [
+ DDocId
+ || {_, {DDocId, _}} <-
+ ets:match_object(Server#st.by_db, {DbName, {'$1', Sig}})
+ ],
+ rem_from_ets(DbName, Sig, DDocIds, Pid, Server)
+ end,
case ets:lookup(Server#st.by_pid, Pid) of
[{Pid, {DbName, Sig}}] ->
- DDocIds = [
- DDocId
- || {_, {DDocId, _}} <-
- ets:match_object(Server#st.by_db, {DbName, {'$1', Sig}})
- ],
- rem_from_ets(DbName, Sig, DDocIds, Pid, Server);
+ Cleanup(DbName, Sig);
[] when Reason /= normal ->
- exit(Reason);
+ case ets:lookup(Server#st.openers, Pid) of
+ [{Pid, {DbName, Sig}}] ->
+ ets:delete(Server#st.openers, Pid),
+ Cleanup(DbName, Sig);
+ [] ->
+ exit(Reason)
+ end;
_Else ->
ok
end,
@@ -298,6 +316,27 @@ rem_from_ets(DbName, Sig, DDocIds, Pid, #st{} = St) ->
DDocIds
).
+rem_from_ets(DbName, #st{} = State) ->
+ SigDDocIds = lists:foldl(
+ fun({_, {DDocId, Sig}}, DDict) ->
+ dict:append(Sig, DDocId, DDict)
+ end,
+ dict:new(),
+ ets:lookup(State#st.by_db, DbName)
+ ),
+ Fun = fun({Sig, DDocIds}) ->
+ [{_, Pid}] = ets:lookup(State#st.by_sig, {DbName, Sig}),
+ unlink(Pid),
+ receive
+ {'EXIT', Pid, _} ->
+ ok
+ after 0 ->
+ ok
+ end,
+ rem_from_ets(DbName, Sig, DDocIds, Pid, State)
+ end,
+ lists:foreach(Fun, dict:to_list(SigDDocIds)).
+
handle_db_event(DbName, created, St) ->
gen_server:cast(St#st.server_name, {reset_indexes, DbName}),
{ok, St};
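The new rem_from_ets(DbName, State) clause above unlinks each index pid and then drains any 'EXIT' message that may already be in the mailbox, so the server is not woken later by a stale exit signal. A minimal sketch of that unlink-and-flush pattern on its own (assumes the calling process traps exits; module and function names are illustrative):

-module(unlink_and_flush).
-export([demo/1]).

%% Unlink from Pid, then drop an 'EXIT' message for it that may already
%% be queued; the message form is only seen when the caller traps exits.
demo(Pid) ->
    true = unlink(Pid),
    receive
        {'EXIT', Pid, _Reason} ->
            ok
    after 0 ->
        ok
    end.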
@@ -305,48 +344,50 @@ handle_db_event(DbName, deleted, St) ->
gen_server:cast(St#st.server_name, {reset_indexes, DbName}),
{ok, St};
handle_db_event(<<"shards/", _/binary>> = DbName, {ddoc_updated, DDocId}, St) ->
- DDocResult = couch_util:with_db(DbName, fun(Db) ->
- couch_db:open_doc(Db, DDocId, [ejson_body, ?ADMIN_CTX])
- end),
- LocalShards =
- try
- mem3:local_shards(mem3:dbname(DbName))
- catch
- Class:Msg ->
- couch_log:warning(
- "~p got ~p:~p when fetching local shards for ~p",
- [?MODULE, Class, Msg, DbName]
- ),
- []
- end,
- DbShards = [mem3:name(Sh) || Sh <- LocalShards],
- lists:foreach(
- fun(DbShard) ->
- lists:foreach(
- fun({_DbShard, {_DDocId, Sig}}) ->
- % check if there are other ddocs with the same Sig for the same db
- SigDDocs = ets:match_object(St#st.by_db, {DbShard, {'$1', Sig}}),
- if
- length(SigDDocs) > 1 ->
- % remove records from by_db for this DDoc
- Args = [DbShard, DDocId, Sig],
- gen_server:cast(St#st.server_name, {rem_from_ets, Args});
- true ->
- % single DDoc with this Sig - close couch_index processes
- case ets:lookup(St#st.by_sig, {DbShard, Sig}) of
- [{_, IndexPid}] ->
- (catch gen_server:cast(IndexPid, {ddoc_updated, DDocResult}));
- [] ->
- []
- end
- end
- end,
- ets:match_object(St#st.by_db, {DbShard, {DDocId, '$1'}})
- )
- end,
- DbShards
- ),
- {ok, St};
+ %% this handle_db_event function must not crash (or it takes down the couch_index_server)
+ try
+ DDocResult = couch_util:with_db(DbName, fun(Db) ->
+ couch_db:open_doc(Db, DDocId, [ejson_body, ?ADMIN_CTX])
+ end),
+ LocalShards = mem3:local_shards(mem3:dbname(DbName)),
+ DbShards = [mem3:name(Sh) || Sh <- LocalShards],
+ lists:foreach(
+ fun(DbShard) ->
+ lists:foreach(
+ fun({_DbShard, {_DDocId, Sig}}) ->
+ % check if there are other ddocs with the same Sig for the same db
+ SigDDocs = ets:match_object(St#st.by_db, {DbShard, {'$1', Sig}}),
+ if
+ length(SigDDocs) > 1 ->
+ % remove records from by_db for this DDoc
+ Args = [DbShard, DDocId, Sig],
+ gen_server:cast(St#st.server_name, {rem_from_ets, Args});
+ true ->
+ % single DDoc with this Sig - close couch_index processes
+ case ets:lookup(St#st.by_sig, {DbShard, Sig}) of
+ [{_, IndexPid}] ->
+ (catch gen_server:cast(
+ IndexPid, {ddoc_updated, DDocResult}
+ ));
+ [] ->
+ []
+ end
+ end
+ end,
+ ets:match_object(St#st.by_db, {DbShard, {DDocId, '$1'}})
+ )
+ end,
+ DbShards
+ ),
+ {ok, St}
+ catch
+ Class:Reason:Stack ->
+ couch_log:warning("~p: handle_db_event ~p for db ~p, reason ~p, stack ~p", [
+ ?MODULE, Class, DbName, Reason, Stack
+ ]),
+ gen_server:cast(St#st.server_name, {rem_from_ets, [DbName]}),
+ {ok, St}
+ end;
handle_db_event(DbName, {ddoc_updated, DDocId}, St) ->
lists:foreach(
fun({_DbName, {_DDocId, Sig}}) ->
@@ -378,6 +419,9 @@ by_pid(Arg) ->
by_db(Arg) ->
name("couchdb_indexes_by_db", Arg).
+openers(Arg) ->
+ name("couchdb_indexes_openers", Arg).
+
name(BaseName, Arg) when is_list(Arg) ->
name(BaseName, ?l2b(Arg));
name(BaseName, Arg) when is_binary(Arg) ->
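The couch_index_server changes above add an openers ETS table so that a crash in a spawned opener process can be traced back to the {DbName, Sig} it was opening and its waiters cleaned up, rather than crashing the server. A minimal, self-contained sketch of that pid-tracking idea (module and function names are illustrative, not the CouchDB API):

-module(opener_tracking).
-export([new/0, track/3, on_exit/2]).

%% Create the tracking table: one row per live opener pid.
new() ->
    ets:new(openers, [set, public]).

%% Record that Pid is opening the index identified by {DbName, Sig}.
track(Openers, Pid, {_DbName, _Sig} = Key) ->
    true = ets:insert(Openers, {Pid, Key}).

%% Map an 'EXIT' from Pid back to what it was opening, if anything,
%% and drop the row so the table only holds live openers.
on_exit(Openers, Pid) ->
    case ets:lookup(Openers, Pid) of
        [{Pid, Key}] ->
            true = ets:delete(Openers, Pid),
            {opener_died, Key};
        [] ->
            not_an_opener
    end.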
diff --git a/src/couch_index/test/eunit/couch_index_ddoc_updated_tests.erl b/src/couch_index/test/eunit/couch_index_ddoc_updated_tests.erl
index 3af58d2fc..cbdb71954 100644
--- a/src/couch_index/test/eunit/couch_index_ddoc_updated_tests.erl
+++ b/src/couch_index/test/eunit/couch_index_ddoc_updated_tests.erl
@@ -109,7 +109,8 @@ check_all_indexers_exit_on_ddoc_change({_Ctx, DbName}) ->
couch_db:name(DbShard),
{ddoc_updated, DDocID},
{st, "", couch_index_server:server_name(I), couch_index_server:by_sig(I),
- couch_index_server:by_pid(I), couch_index_server:by_db(I)}
+ couch_index_server:by_pid(I), couch_index_server:by_db(I),
+ couch_index_server:openers(I)}
)
end,
seq()