summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Newson <rnewson@apache.org>2023-03-22 18:41:17 +0000
committerRobert Newson <rnewson@apache.org>2023-03-22 21:12:44 +0000
commit224d6764e8d52f451d30ef15d5be5e508c82ca17 (patch)
tree93f1a1de176b11fa12b9bfb1254cbe35401f21e5
parentf82d9617324d9098d143cdd22d47872bfbb6ae32 (diff)
downloadcouchdb-224d6764e8d52f451d30ef15d5be5e508c82ca17.tar.gz
track index pids during open and don't crash if they do
-rw-r--r--src/couch_index/src/couch_index_server.erl44
-rw-r--r--src/couch_index/test/eunit/couch_index_ddoc_updated_tests.erl3
2 files changed, 33 insertions, 14 deletions
diff --git a/src/couch_index/src/couch_index_server.erl b/src/couch_index/src/couch_index_server.erl
index 36273ad68..35df43d2a 100644
--- a/src/couch_index/src/couch_index_server.erl
+++ b/src/couch_index/src/couch_index_server.erl
@@ -22,7 +22,7 @@
-export([handle_call/3, handle_cast/2, handle_info/2]).
% Sharding functions
--export([num_servers/0, server_name/1, by_sig/1, by_pid/1, by_db/1]).
+-export([num_servers/0, server_name/1, by_sig/1, by_pid/1, by_db/1, openers/1]).
-export([aggregate_queue_len/0, names/0]).
% Exported for callbacks
@@ -41,7 +41,8 @@
server_name,
by_sig,
by_pid,
- by_db
+ by_db,
+ openers
}).
start_link(N) ->
@@ -129,6 +130,7 @@ init([N]) ->
ets:new(by_sig(N), [protected, set, named_table]),
ets:new(by_pid(N), [private, set, named_table]),
ets:new(by_db(N), [protected, bag, named_table]),
+ ets:new(openers(N), [protected, set, named_table]),
RootDir = couch_index_util:root_dir(),
% We only need one of the index servers to nuke this on startup.
case N of
@@ -140,7 +142,8 @@ init([N]) ->
server_name = server_name(N),
by_sig = by_sig(N),
by_pid = by_pid(N),
- by_db = by_db(N)
+ by_db = by_db(N),
+ openers = openers(N)
},
ok = config:listen_for_changes(?MODULE, St),
couch_event:link_listener(?MODULE, handle_db_event, St, [all_dbs]),
@@ -154,7 +157,8 @@ terminate(_Reason, State) ->
handle_call({get_index, {_Mod, _IdxState, DbName, Sig} = Args}, From, State) ->
case ets:lookup(State#st.by_sig, {DbName, Sig}) of
[] ->
- spawn_link(fun() -> new_index(Args) end),
+ Pid = spawn_link(fun() -> new_index(Args) end),
+ ets:insert(State#st.openers, {Pid, {DbName, Sig}}),
ets:insert(State#st.by_sig, {{DbName, Sig}, [From]}),
{noreply, State};
[{_, Waiters}] when is_list(Waiters) ->
@@ -163,15 +167,17 @@ handle_call({get_index, {_Mod, _IdxState, DbName, Sig} = Args}, From, State) ->
[{_, Pid}] when is_pid(Pid) ->
{reply, {ok, Pid}, State}
end;
-handle_call({async_open, {DbName, DDocId, Sig}, {ok, Pid}}, _From, State) ->
+handle_call({async_open, {DbName, DDocId, Sig}, {ok, Pid}}, {OpenerPid, _}, State) ->
[{_, Waiters}] = ets:lookup(State#st.by_sig, {DbName, Sig}),
[gen_server:reply(From, {ok, Pid}) || From <- Waiters],
link(Pid),
+ ets:delete(State#st.openers, OpenerPid),
add_to_ets(DbName, Sig, DDocId, Pid, State),
{reply, ok, State};
-handle_call({async_error, {DbName, _DDocId, Sig}, Error}, _From, State) ->
+handle_call({async_error, {DbName, _DDocId, Sig}, Error}, {OpenerPid, _}, State) ->
[{_, Waiters}] = ets:lookup(State#st.by_sig, {DbName, Sig}),
[gen_server:reply(From, Error) || From <- Waiters],
+ ets:delete(State#st.openers, OpenerPid),
ets:delete(State#st.by_sig, {DbName, Sig}),
{reply, ok, State};
handle_call({reset_indexes, DbName}, _From, State) ->
@@ -198,16 +204,25 @@ handle_cast({rem_from_ets, [DbName]}, State) ->
{noreply, State}.
handle_info({'EXIT', Pid, Reason}, Server) ->
+ Cleanup = fun(DbName, Sig) ->
+ DDocIds = [
+ DDocId
+ || {_, {DDocId, _}} <-
+ ets:match_object(Server#st.by_db, {DbName, {'$1', Sig}})
+ ],
+ rem_from_ets(DbName, Sig, DDocIds, Pid, Server)
+ end,
case ets:lookup(Server#st.by_pid, Pid) of
[{Pid, {DbName, Sig}}] ->
- DDocIds = [
- DDocId
- || {_, {DDocId, _}} <-
- ets:match_object(Server#st.by_db, {DbName, {'$1', Sig}})
- ],
- rem_from_ets(DbName, Sig, DDocIds, Pid, Server);
+ Cleanup(DbName, Sig);
[] when Reason /= normal ->
- exit(Reason);
+ case ets:lookup(Server#st.openers, Pid) of
+ [{Pid, {DbName, Sig}}] ->
+ ets:delete(Server#st.openers, Pid),
+ Cleanup(DbName, Sig);
+ [] ->
+ exit(Reason)
+ end;
_Else ->
ok
end,
@@ -404,6 +419,9 @@ by_pid(Arg) ->
by_db(Arg) ->
name("couchdb_indexes_by_db", Arg).
+openers(Arg) ->
+ name("couchdb_indexes_openers", Arg).
+
name(BaseName, Arg) when is_list(Arg) ->
name(BaseName, ?l2b(Arg));
name(BaseName, Arg) when is_binary(Arg) ->
diff --git a/src/couch_index/test/eunit/couch_index_ddoc_updated_tests.erl b/src/couch_index/test/eunit/couch_index_ddoc_updated_tests.erl
index 3af58d2fc..cbdb71954 100644
--- a/src/couch_index/test/eunit/couch_index_ddoc_updated_tests.erl
+++ b/src/couch_index/test/eunit/couch_index_ddoc_updated_tests.erl
@@ -109,7 +109,8 @@ check_all_indexers_exit_on_ddoc_change({_Ctx, DbName}) ->
couch_db:name(DbShard),
{ddoc_updated, DDocID},
{st, "", couch_index_server:server_name(I), couch_index_server:by_sig(I),
- couch_index_server:by_pid(I), couch_index_server:by_db(I)}
+ couch_index_server:by_pid(I), couch_index_server:by_db(I),
+ couch_index_server:openers(I)}
)
end,
seq()