diff options
author | Robert Newson <rnewson@apache.org> | 2021-12-13 16:23:38 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-12-13 16:23:38 +0000 |
commit | 7487606e1f0eb6a12649479cfb2ccb7bcdbcc6fe (patch) | |
tree | 169665c91d0aa5fdc159c1a8fddfe15e933e4081 | |
parent | 8d329e587dacc0b94cdbc2a562270042214dcca2 (diff) | |
parent | 380d8ccb2623673ffc783b6dc0dcf01e24ef9eae (diff) | |
download | couchdb-7487606e1f0eb6a12649479cfb2ccb7bcdbcc6fe.tar.gz |
Merge pull request #3860 from apache/sharded_couch_index_server
Sharded couch index server
-rw-r--r-- | src/chttpd/src/chttpd_node.erl | 3 | ||||
-rw-r--r-- | src/couch/src/couch_secondary_sup.erl | 20 | ||||
-rw-r--r-- | src/couch_index/src/couch_index.app.src | 2 | ||||
-rw-r--r-- | src/couch_index/src/couch_index_server.erl | 204 | ||||
-rw-r--r-- | src/couch_index/test/eunit/couch_index_ddoc_updated_tests.erl | 26 | ||||
-rw-r--r-- | src/couch_mrview/test/eunit/couch_mrview_ddoc_updated_tests.erl | 10 |
6 files changed, 173 insertions, 92 deletions
diff --git a/src/chttpd/src/chttpd_node.erl b/src/chttpd/src/chttpd_node.erl index 7379dba02..63a7fb1fc 100644 --- a/src/chttpd/src/chttpd_node.erl +++ b/src/chttpd/src/chttpd_node.erl @@ -282,7 +282,8 @@ get_stats() -> MessageQueues0 = [ {couch_file, {CF}}, {couch_db_updater, {CDU}}, - {couch_server, couch_server:aggregate_queue_len()} + {couch_server, couch_server:aggregate_queue_len()}, + {index_server, couch_index_server:aggregate_queue_len()} ], MessageQueues = MessageQueues0 ++ message_queues(registered()), {SQ, DCQ} = run_queues(), diff --git a/src/couch/src/couch_secondary_sup.erl b/src/couch/src/couch_secondary_sup.erl index a328c170e..cfe38bbd4 100644 --- a/src/couch/src/couch_secondary_sup.erl +++ b/src/couch/src/couch_secondary_sup.erl @@ -22,12 +22,12 @@ init([]) -> {couch_plugin_event, {gen_event, start_link, [{local, couch_plugin}]}, permanent, brutal_kill, worker, dynamic} ], - Daemons = [ - {index_server, {couch_index_server, start_link, []}}, - {query_servers, {couch_proc_manager, start_link, []}}, - {vhosts, {couch_httpd_vhost, start_link, []}}, - {uuids, {couch_uuids, start, []}} - ], + Daemons = + [ + {query_servers, {couch_proc_manager, start_link, []}}, + {vhosts, {couch_httpd_vhost, start_link, []}}, + {uuids, {couch_uuids, start, []}} + ] ++ couch_index_servers(), MaybeHttp = case http_enabled() of @@ -69,3 +69,11 @@ https_enabled() -> LegacySSLEnabled = LegacySSL =:= "{chttpd, start_link, [https]}", SSLEnabled orelse LegacySSLEnabled. + +couch_index_servers() -> + N = couch_index_server:num_servers(), + [couch_index_server(I) || I <- lists:seq(1, N)]. + +couch_index_server(N) -> + Name = couch_index_server:server_name(N), + {Name, {couch_index_server, start_link, [N]}}. diff --git a/src/couch_index/src/couch_index.app.src b/src/couch_index/src/couch_index.app.src index 3aa92ba5d..834be3f3c 100644 --- a/src/couch_index/src/couch_index.app.src +++ b/src/couch_index/src/couch_index.app.src @@ -13,7 +13,7 @@ {application, couch_index, [ {description, "CouchDB Secondary Index Manager"}, {vsn, git}, - {registered, [couch_index_server]}, + {registered, []}, {applications, [kernel, stdlib, couch_epi]}, {mod, {couch_index_app, []}} ]}. diff --git a/src/couch_index/src/couch_index_server.erl b/src/couch_index/src/couch_index_server.erl index 77f91cc5b..a72ec3b88 100644 --- a/src/couch_index/src/couch_index_server.erl +++ b/src/couch_index/src/couch_index_server.erl @@ -16,11 +16,15 @@ -vsn(2). --export([start_link/0, validate/2, get_index/4, get_index/3, get_index/2]). +-export([start_link/1, validate/2, get_index/4, get_index/3, get_index/2]). -export([init/1, terminate/2, code_change/3]). -export([handle_call/3, handle_cast/2, handle_info/2]). +% Sharding functions +-export([num_servers/0, server_name/1, by_sig/1, by_pid/1, by_db/1]). +-export([aggregate_queue_len/0]). + % Exported for callbacks -export([ handle_config_change/5, @@ -30,15 +34,18 @@ -include_lib("couch/include/couch_db.hrl"). --define(BY_SIG, couchdb_indexes_by_sig). --define(BY_PID, couchdb_indexes_by_pid). --define(BY_DB, couchdb_indexes_by_db). -define(RELISTEN_DELAY, 5000). --record(st, {root_dir}). +-record(st, { + root_dir, + server_name, + by_sig, + by_pid, + by_db +}). -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). +start_link(N) -> + gen_server:start_link({local, server_name(N)}, ?MODULE, [N], []). validate(Db, DDoc) -> LoadModFun = fun @@ -101,90 +108,101 @@ get_index(Module, Db, DDoc, _Fun) -> get_index(Module, IdxState) -> DbName = Module:get(db_name, IdxState), Sig = Module:get(signature, IdxState), - case ets:lookup(?BY_SIG, {DbName, Sig}) of + case ets:lookup(by_sig(DbName), {DbName, Sig}) of [{_, Pid}] when is_pid(Pid) -> DDocId = Module:get(idx_name, IdxState), - case ets:match_object(?BY_DB, {DbName, {DDocId, Sig}}) of + case ets:match_object(by_db(DbName), {DbName, {DDocId, Sig}}) of [] -> Args = [Pid, DbName, DDocId, Sig], - gen_server:cast(?MODULE, {add_to_ets, Args}); + gen_server:cast(server_name(DbName), {add_to_ets, Args}); _ -> ok end, {ok, Pid}; _ -> Args = {Module, IdxState, DbName, Sig}, - gen_server:call(?MODULE, {get_index, Args}, infinity) + gen_server:call(server_name(DbName), {get_index, Args}, infinity) end. -init([]) -> +init([N]) -> process_flag(trap_exit, true), - ok = config:listen_for_changes(?MODULE, couch_index_util:root_dir()), - ets:new(?BY_SIG, [protected, set, named_table]), - ets:new(?BY_PID, [private, set, named_table]), - ets:new(?BY_DB, [protected, bag, named_table]), - couch_event:link_listener(?MODULE, handle_db_event, nil, [all_dbs]), + ets:new(by_sig(N), [protected, set, named_table]), + ets:new(by_pid(N), [private, set, named_table]), + ets:new(by_db(N), [protected, bag, named_table]), RootDir = couch_index_util:root_dir(), - couch_file:init_delete_dir(RootDir), - {ok, #st{root_dir = RootDir}}. + % We only need one of the index servers to nuke this on startup. + case N of + 1 -> couch_file:init_delete_dir(RootDir); + _ -> ok + end, + St = #st{ + root_dir = RootDir, + server_name = server_name(N), + by_sig = by_sig(N), + by_pid = by_pid(N), + by_db = by_db(N) + }, + ok = config:listen_for_changes(?MODULE, St), + couch_event:link_listener(?MODULE, handle_db_event, St, [all_dbs]), + {ok, St}. -terminate(_Reason, _State) -> - Pids = [Pid || {Pid, _} <- ets:tab2list(?BY_PID)], +terminate(_Reason, State) -> + Pids = [Pid || {Pid, _} <- ets:tab2list(State#st.by_pid)], lists:map(fun couch_util:shutdown_sync/1, Pids), ok. handle_call({get_index, {_Mod, _IdxState, DbName, Sig} = Args}, From, State) -> - case ets:lookup(?BY_SIG, {DbName, Sig}) of + case ets:lookup(State#st.by_sig, {DbName, Sig}) of [] -> spawn_link(fun() -> new_index(Args) end), - ets:insert(?BY_SIG, {{DbName, Sig}, [From]}), + ets:insert(State#st.by_sig, {{DbName, Sig}, [From]}), {noreply, State}; [{_, Waiters}] when is_list(Waiters) -> - ets:insert(?BY_SIG, {{DbName, Sig}, [From | Waiters]}), + ets:insert(State#st.by_sig, {{DbName, Sig}, [From | Waiters]}), {noreply, State}; [{_, Pid}] when is_pid(Pid) -> {reply, {ok, Pid}, State} end; handle_call({async_open, {DbName, DDocId, Sig}, {ok, Pid}}, _From, State) -> - [{_, Waiters}] = ets:lookup(?BY_SIG, {DbName, Sig}), + [{_, Waiters}] = ets:lookup(State#st.by_sig, {DbName, Sig}), [gen_server:reply(From, {ok, Pid}) || From <- Waiters], link(Pid), - add_to_ets(DbName, Sig, DDocId, Pid), + add_to_ets(DbName, Sig, DDocId, Pid, State), {reply, ok, State}; handle_call({async_error, {DbName, _DDocId, Sig}, Error}, _From, State) -> - [{_, Waiters}] = ets:lookup(?BY_SIG, {DbName, Sig}), + [{_, Waiters}] = ets:lookup(State#st.by_sig, {DbName, Sig}), [gen_server:reply(From, Error) || From <- Waiters], - ets:delete(?BY_SIG, {DbName, Sig}), + ets:delete(State#st.by_sig, {DbName, Sig}), {reply, ok, State}; handle_call({reset_indexes, DbName}, _From, State) -> - reset_indexes(DbName, State#st.root_dir), + reset_indexes(DbName, State), {reply, ok, State}. handle_cast({reset_indexes, DbName}, State) -> - reset_indexes(DbName, State#st.root_dir), + reset_indexes(DbName, State), {noreply, State}; handle_cast({add_to_ets, [Pid, DbName, DDocId, Sig]}, State) -> % check if Pid still exists - case ets:lookup(?BY_PID, Pid) of + case ets:lookup(State#st.by_pid, Pid) of [{Pid, {DbName, Sig}}] when is_pid(Pid) -> - ets:insert(?BY_DB, {DbName, {DDocId, Sig}}); + ets:insert(State#st.by_db, {DbName, {DDocId, Sig}}); _ -> ok end, {noreply, State}; handle_cast({rem_from_ets, [DbName, DDocId, Sig]}, State) -> - ets:delete_object(?BY_DB, {DbName, {DDocId, Sig}}), + ets:delete_object(State#st.by_db, {DbName, {DDocId, Sig}}), {noreply, State}. handle_info({'EXIT', Pid, Reason}, Server) -> - case ets:lookup(?BY_PID, Pid) of + case ets:lookup(Server#st.by_pid, Pid) of [{Pid, {DbName, Sig}}] -> DDocIds = [ DDocId || {_, {DDocId, _}} <- - ets:match_object(?BY_DB, {DbName, {'$1', Sig}}) + ets:match_object(Server#st.by_db, {DbName, {'$1', Sig}}) ], - rem_from_ets(DbName, Sig, DDocIds, Pid); + rem_from_ets(DbName, Sig, DDocIds, Pid, Server); [] when Reason /= normal -> exit(Reason); _Else -> @@ -201,50 +219,50 @@ handle_info(Msg, State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -handle_config_change("couchdb", "index_dir", RootDir, _, RootDir) -> - {ok, RootDir}; -handle_config_change("couchdb", "view_index_dir", RootDir, _, RootDir) -> - {ok, RootDir}; -handle_config_change("couchdb", "index_dir", _, _, _) -> - exit(whereis(couch_index_server), config_change), +handle_config_change("couchdb", "index_dir", RootDir, _, #st{root_dir = RootDir} = St) -> + {ok, St}; +handle_config_change("couchdb", "view_index_dir", RootDir, _, #st{root_dir = RootDir} = St) -> + {ok, St}; +handle_config_change("couchdb", "index_dir", _, _, St) -> + exit(whereis(St#st.server_name), config_change), remove_handler; -handle_config_change("couchdb", "view_index_dir", _, _, _) -> - exit(whereis(couch_index_server), config_change), +handle_config_change("couchdb", "view_index_dir", _, _, St) -> + exit(whereis(St#st.server_name), config_change), remove_handler; -handle_config_change(_, _, _, _, RootDir) -> - {ok, RootDir}. +handle_config_change(_, _, _, _, St) -> + {ok, St}. handle_config_terminate(_, stop, _) -> ok; -handle_config_terminate(_Server, _Reason, _State) -> - erlang:send_after(?RELISTEN_DELAY, whereis(?MODULE), restart_config_listener), - {ok, couch_index_util:root_dir()}. +handle_config_terminate(_Server, _Reason, State) -> + erlang:send_after(?RELISTEN_DELAY, whereis(State#st.server_name), restart_config_listener), + {ok, State}. new_index({Mod, IdxState, DbName, Sig}) -> DDocId = Mod:get(idx_name, IdxState), case couch_index:start_link({Mod, IdxState}) of {ok, Pid} -> ok = gen_server:call( - ?MODULE, {async_open, {DbName, DDocId, Sig}, {ok, Pid}} + server_name(DbName), {async_open, {DbName, DDocId, Sig}, {ok, Pid}} ), unlink(Pid); Error -> ok = gen_server:call( - ?MODULE, {async_error, {DbName, DDocId, Sig}, Error} + server_name(DbName), {async_error, {DbName, DDocId, Sig}, Error} ) end. -reset_indexes(DbName, Root) -> +reset_indexes(DbName, #st{} = State) -> % shutdown all the updaters and clear the files, the db got changed SigDDocIds = lists:foldl( fun({_, {DDocId, Sig}}, DDict) -> dict:append(Sig, DDocId, DDict) end, dict:new(), - ets:lookup(?BY_DB, DbName) + ets:lookup(State#st.by_db, DbName) ), Fun = fun({Sig, DDocIds}) -> - [{_, Pid}] = ets:lookup(?BY_SIG, {DbName, Sig}), + [{_, Pid}] = ets:lookup(State#st.by_sig, {DbName, Sig}), unlink(Pid), gen_server:cast(Pid, delete), receive @@ -253,32 +271,38 @@ reset_indexes(DbName, Root) -> after 0 -> ok end, - rem_from_ets(DbName, Sig, DDocIds, Pid) + rem_from_ets(DbName, Sig, DDocIds, Pid, State) end, lists:foreach(Fun, dict:to_list(SigDDocIds)), - Path = couch_index_util:index_dir("", DbName), - couch_file:nuke_dir(Root, Path). + % We only need one of the index servers to do this. + case State#st.server_name == server_name(1) of + true -> + Path = couch_index_util:index_dir("", DbName), + couch_file:nuke_dir(State#st.root_dir, Path); + false -> + ok + end. -add_to_ets(DbName, Sig, DDocId, Pid) -> - ets:insert(?BY_SIG, {{DbName, Sig}, Pid}), - ets:insert(?BY_PID, {Pid, {DbName, Sig}}), - ets:insert(?BY_DB, {DbName, {DDocId, Sig}}). +add_to_ets(DbName, Sig, DDocId, Pid, #st{} = St) -> + ets:insert(St#st.by_sig, {{DbName, Sig}, Pid}), + ets:insert(St#st.by_pid, {Pid, {DbName, Sig}}), + ets:insert(St#st.by_db, {DbName, {DDocId, Sig}}). -rem_from_ets(DbName, Sig, DDocIds, Pid) -> - ets:delete(?BY_SIG, {DbName, Sig}), - ets:delete(?BY_PID, Pid), +rem_from_ets(DbName, Sig, DDocIds, Pid, #st{} = St) -> + ets:delete(St#st.by_sig, {DbName, Sig}), + ets:delete(St#st.by_pid, Pid), lists:foreach( fun(DDocId) -> - ets:delete_object(?BY_DB, {DbName, {DDocId, Sig}}) + ets:delete_object(St#st.by_db, {DbName, {DDocId, Sig}}) end, DDocIds ). handle_db_event(DbName, created, St) -> - gen_server:cast(?MODULE, {reset_indexes, DbName}), + gen_server:cast(St#st.server_name, {reset_indexes, DbName}), {ok, St}; handle_db_event(DbName, deleted, St) -> - gen_server:cast(?MODULE, {reset_indexes, DbName}), + gen_server:cast(St#st.server_name, {reset_indexes, DbName}), {ok, St}; handle_db_event(<<"shards/", _/binary>> = DbName, {ddoc_updated, DDocId}, St) -> DDocResult = couch_util:with_db(DbName, fun(Db) -> @@ -297,15 +321,15 @@ handle_db_event(<<"shards/", _/binary>> = DbName, {ddoc_updated, DDocId}, St) -> lists:foreach( fun({_DbShard, {_DDocId, Sig}}) -> % check if there are other ddocs with the same Sig for the same db - SigDDocs = ets:match_object(?BY_DB, {DbShard, {'$1', Sig}}), + SigDDocs = ets:match_object(St#st.by_db, {DbShard, {'$1', Sig}}), if length(SigDDocs) > 1 -> - % remove records from ?BY_DB for this DDoc + % remove records from by_db for this DDoc Args = [DbShard, DDocId, Sig], - gen_server:cast(?MODULE, {rem_from_ets, Args}); + gen_server:cast(St#st.server_name, {rem_from_ets, Args}); true -> % single DDoc with this Sig - close couch_index processes - case ets:lookup(?BY_SIG, {DbShard, Sig}) of + case ets:lookup(St#st.by_sig, {DbShard, Sig}) of [{_, IndexPid}] -> (catch gen_server:cast(IndexPid, {ddoc_updated, DDocResult})); [] -> @@ -313,7 +337,7 @@ handle_db_event(<<"shards/", _/binary>> = DbName, {ddoc_updated, DDocId}, St) -> end end end, - ets:match_object(?BY_DB, {DbShard, {DDocId, '$1'}}) + ets:match_object(St#st.by_db, {DbShard, {DDocId, '$1'}}) ) end, DbShards @@ -322,15 +346,47 @@ handle_db_event(<<"shards/", _/binary>> = DbName, {ddoc_updated, DDocId}, St) -> handle_db_event(DbName, {ddoc_updated, DDocId}, St) -> lists:foreach( fun({_DbName, {_DDocId, Sig}}) -> - case ets:lookup(?BY_SIG, {DbName, Sig}) of + case ets:lookup(St#st.by_sig, {DbName, Sig}) of [{_, IndexPid}] -> (catch gen_server:cast(IndexPid, ddoc_updated)); [] -> ok end end, - ets:match_object(?BY_DB, {DbName, {DDocId, '$1'}}) + ets:match_object(St#st.by_db, {DbName, {DDocId, '$1'}}) ), {ok, St}; handle_db_event(_DbName, _Event, St) -> {ok, St}. + +num_servers() -> + erlang:system_info(schedulers). + +server_name(Arg) -> + name("index_server", Arg). + +by_sig(Arg) -> + name("couchdb_indexes_by_sig", Arg). + +by_pid(Arg) -> + name("couchdb_indexes_by_pid", Arg). + +by_db(Arg) -> + name("couchdb_indexes_by_db", Arg). + +name(BaseName, Arg) when is_list(Arg) -> + name(BaseName, ?l2b(Arg)); +name(BaseName, Arg) when is_binary(Arg) -> + N = 1 + erlang:phash2(Arg, num_servers()), + name(BaseName, N); +name(BaseName, N) when is_integer(N), N > 0 -> + list_to_atom(BaseName ++ "_" ++ integer_to_list(N)). + +aggregate_queue_len() -> + N = num_servers(), + Names = [server_name(I) || I <- lists:seq(1, N)], + MQs = [ + process_info(whereis(Name), message_queue_len) + || Name <- Names + ], + lists:sum([X || {_, X} <- MQs]). diff --git a/src/couch_index/test/eunit/couch_index_ddoc_updated_tests.erl b/src/couch_index/test/eunit/couch_index_ddoc_updated_tests.erl index 7bee8baae..3af58d2fc 100644 --- a/src/couch_index/test/eunit/couch_index_ddoc_updated_tests.erl +++ b/src/couch_index/test/eunit/couch_index_ddoc_updated_tests.erl @@ -103,8 +103,16 @@ check_all_indexers_exit_on_ddoc_change({_Ctx, DbName}) -> % assert that all index processes exit after ddoc updated ok = meck:reset(test_index), - couch_index_server:handle_db_event( - couch_db:name(DbShard), {ddoc_updated, DDocID}, {st, ""} + lists:foreach( + fun(I) -> + couch_index_server:handle_db_event( + couch_db:name(DbShard), + {ddoc_updated, DDocID}, + {st, "", couch_index_server:server_name(I), couch_index_server:by_sig(I), + couch_index_server:by_pid(I), couch_index_server:by_db(I)} + ) + end, + seq() ), ok = meck:wait(N, test_index, init, ['_', '_'], 5000), @@ -139,8 +147,13 @@ fake_index() -> get_indexes_by_ddoc(DDocID, N) -> Indexes = test_util:wait(fun() -> - Indxs = ets:match_object( - couchdb_indexes_by_db, {'$1', {DDocID, '$2'}} + Indxs = lists:flatmap( + fun(I) -> + ets:match_object( + couch_index_server:by_db(I), {'$1', {DDocID, '$2'}} + ) + end, + seq() ), case length(Indxs) == N of true -> @@ -151,7 +164,7 @@ get_indexes_by_ddoc(DDocID, N) -> end), lists:foldl( fun({DbName, {_DDocID, Sig}}, Acc) -> - case ets:lookup(couchdb_indexes_by_sig, {DbName, Sig}) of + case ets:lookup(couch_index_server:by_sig(DbName), {DbName, Sig}) of [{_, Pid}] -> [Pid | Acc]; _ -> Acc end @@ -159,3 +172,6 @@ get_indexes_by_ddoc(DDocID, N) -> [], Indexes ). + +seq() -> + lists:seq(1, couch_index_server:num_servers()). diff --git a/src/couch_mrview/test/eunit/couch_mrview_ddoc_updated_tests.erl b/src/couch_mrview/test/eunit/couch_mrview_ddoc_updated_tests.erl index 2a6299448..91b24e336 100644 --- a/src/couch_mrview/test/eunit/couch_mrview_ddoc_updated_tests.erl +++ b/src/couch_mrview/test/eunit/couch_mrview_ddoc_updated_tests.erl @@ -88,7 +88,7 @@ check_indexing_stops_on_ddoc_change(Db) -> ?_test(begin DDocID = <<"_design/bar">>, - IndexesBefore = get_indexes_by_ddoc(DDocID, 1), + IndexesBefore = get_indexes_by_ddoc(couch_db:name(Db), DDocID, 1), ?assertEqual(1, length(IndexesBefore)), AliveBefore = lists:filter(fun erlang:is_process_alive/1, IndexesBefore), ?assertEqual(1, length(AliveBefore)), @@ -127,16 +127,16 @@ check_indexing_stops_on_ddoc_change(Db) -> end, %% assert that previously running indexes are gone - IndexesAfter = get_indexes_by_ddoc(DDocID, 0), + IndexesAfter = get_indexes_by_ddoc(couch_db:name(Db), DDocID, 0), ?assertEqual(0, length(IndexesAfter)), AliveAfter = lists:filter(fun erlang:is_process_alive/1, IndexesBefore), ?assertEqual(0, length(AliveAfter)) end). -get_indexes_by_ddoc(DDocID, N) -> +get_indexes_by_ddoc(DbName0, DDocID, N) -> Indexes = test_util:wait(fun() -> Indxs = ets:match_object( - couchdb_indexes_by_db, {'$1', {DDocID, '$2'}} + couch_index_server:by_db(DbName0), {'$1', {DDocID, '$2'}} ), case length(Indxs) == N of true -> @@ -147,7 +147,7 @@ get_indexes_by_ddoc(DDocID, N) -> end), lists:foldl( fun({DbName, {_DDocID, Sig}}, Acc) -> - case ets:lookup(couchdb_indexes_by_sig, {DbName, Sig}) of + case ets:lookup(couch_index_server:by_sig(DbName), {DbName, Sig}) of [{_, Pid}] -> [Pid | Acc]; _ -> Acc end |