summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Newson <rnewson@apache.org>2021-11-30 15:39:58 +0000
committerRobert Newson <rnewson@apache.org>2021-12-08 12:19:04 +0000
commitbb041c3a948d9ffca50bd68d2f32f1ae17c23f3c (patch)
treebffb1283b8cfa1b3204ee74a18d1ca314bb70695
parentaa04ed6520c84ed5f6dcd6a73e9a1585df18d91e (diff)
downloadcouchdb-sharded_couch_index_server.tar.gz
Add sharding to couch_index_serversharded_couch_index_server
-rw-r--r--src/chttpd/src/chttpd_node.erl3
-rw-r--r--src/couch/src/couch_secondary_sup.erl20
-rw-r--r--src/couch_index/src/couch_index.app.src2
-rw-r--r--src/couch_index/src/couch_index_server.erl204
-rw-r--r--src/couch_index/test/eunit/couch_index_ddoc_updated_tests.erl26
-rw-r--r--src/couch_mrview/test/eunit/couch_mrview_ddoc_updated_tests.erl10
6 files changed, 173 insertions, 92 deletions
diff --git a/src/chttpd/src/chttpd_node.erl b/src/chttpd/src/chttpd_node.erl
index 7379dba02..63a7fb1fc 100644
--- a/src/chttpd/src/chttpd_node.erl
+++ b/src/chttpd/src/chttpd_node.erl
@@ -282,7 +282,8 @@ get_stats() ->
MessageQueues0 = [
{couch_file, {CF}},
{couch_db_updater, {CDU}},
- {couch_server, couch_server:aggregate_queue_len()}
+ {couch_server, couch_server:aggregate_queue_len()},
+ {index_server, couch_index_server:aggregate_queue_len()}
],
MessageQueues = MessageQueues0 ++ message_queues(registered()),
{SQ, DCQ} = run_queues(),
diff --git a/src/couch/src/couch_secondary_sup.erl b/src/couch/src/couch_secondary_sup.erl
index a328c170e..cfe38bbd4 100644
--- a/src/couch/src/couch_secondary_sup.erl
+++ b/src/couch/src/couch_secondary_sup.erl
@@ -22,12 +22,12 @@ init([]) ->
{couch_plugin_event, {gen_event, start_link, [{local, couch_plugin}]}, permanent,
brutal_kill, worker, dynamic}
],
- Daemons = [
- {index_server, {couch_index_server, start_link, []}},
- {query_servers, {couch_proc_manager, start_link, []}},
- {vhosts, {couch_httpd_vhost, start_link, []}},
- {uuids, {couch_uuids, start, []}}
- ],
+ Daemons =
+ [
+ {query_servers, {couch_proc_manager, start_link, []}},
+ {vhosts, {couch_httpd_vhost, start_link, []}},
+ {uuids, {couch_uuids, start, []}}
+ ] ++ couch_index_servers(),
MaybeHttp =
case http_enabled() of
@@ -69,3 +69,11 @@ https_enabled() ->
LegacySSLEnabled = LegacySSL =:= "{chttpd, start_link, [https]}",
SSLEnabled orelse LegacySSLEnabled.
+
+couch_index_servers() ->
+ N = couch_index_server:num_servers(),
+ [couch_index_server(I) || I <- lists:seq(1, N)].
+
+couch_index_server(N) ->
+ Name = couch_index_server:server_name(N),
+ {Name, {couch_index_server, start_link, [N]}}.
diff --git a/src/couch_index/src/couch_index.app.src b/src/couch_index/src/couch_index.app.src
index 3aa92ba5d..834be3f3c 100644
--- a/src/couch_index/src/couch_index.app.src
+++ b/src/couch_index/src/couch_index.app.src
@@ -13,7 +13,7 @@
{application, couch_index, [
{description, "CouchDB Secondary Index Manager"},
{vsn, git},
- {registered, [couch_index_server]},
+ {registered, []},
{applications, [kernel, stdlib, couch_epi]},
{mod, {couch_index_app, []}}
]}.
diff --git a/src/couch_index/src/couch_index_server.erl b/src/couch_index/src/couch_index_server.erl
index 77f91cc5b..a72ec3b88 100644
--- a/src/couch_index/src/couch_index_server.erl
+++ b/src/couch_index/src/couch_index_server.erl
@@ -16,11 +16,15 @@
-vsn(2).
--export([start_link/0, validate/2, get_index/4, get_index/3, get_index/2]).
+-export([start_link/1, validate/2, get_index/4, get_index/3, get_index/2]).
-export([init/1, terminate/2, code_change/3]).
-export([handle_call/3, handle_cast/2, handle_info/2]).
+% Sharding functions
+-export([num_servers/0, server_name/1, by_sig/1, by_pid/1, by_db/1]).
+-export([aggregate_queue_len/0]).
+
% Exported for callbacks
-export([
handle_config_change/5,
@@ -30,15 +34,18 @@
-include_lib("couch/include/couch_db.hrl").
--define(BY_SIG, couchdb_indexes_by_sig).
--define(BY_PID, couchdb_indexes_by_pid).
--define(BY_DB, couchdb_indexes_by_db).
-define(RELISTEN_DELAY, 5000).
--record(st, {root_dir}).
+-record(st, {
+ root_dir,
+ server_name,
+ by_sig,
+ by_pid,
+ by_db
+}).
-start_link() ->
- gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+start_link(N) ->
+ gen_server:start_link({local, server_name(N)}, ?MODULE, [N], []).
validate(Db, DDoc) ->
LoadModFun = fun
@@ -101,90 +108,101 @@ get_index(Module, Db, DDoc, _Fun) ->
get_index(Module, IdxState) ->
DbName = Module:get(db_name, IdxState),
Sig = Module:get(signature, IdxState),
- case ets:lookup(?BY_SIG, {DbName, Sig}) of
+ case ets:lookup(by_sig(DbName), {DbName, Sig}) of
[{_, Pid}] when is_pid(Pid) ->
DDocId = Module:get(idx_name, IdxState),
- case ets:match_object(?BY_DB, {DbName, {DDocId, Sig}}) of
+ case ets:match_object(by_db(DbName), {DbName, {DDocId, Sig}}) of
[] ->
Args = [Pid, DbName, DDocId, Sig],
- gen_server:cast(?MODULE, {add_to_ets, Args});
+ gen_server:cast(server_name(DbName), {add_to_ets, Args});
_ ->
ok
end,
{ok, Pid};
_ ->
Args = {Module, IdxState, DbName, Sig},
- gen_server:call(?MODULE, {get_index, Args}, infinity)
+ gen_server:call(server_name(DbName), {get_index, Args}, infinity)
end.
-init([]) ->
+init([N]) ->
process_flag(trap_exit, true),
- ok = config:listen_for_changes(?MODULE, couch_index_util:root_dir()),
- ets:new(?BY_SIG, [protected, set, named_table]),
- ets:new(?BY_PID, [private, set, named_table]),
- ets:new(?BY_DB, [protected, bag, named_table]),
- couch_event:link_listener(?MODULE, handle_db_event, nil, [all_dbs]),
+ ets:new(by_sig(N), [protected, set, named_table]),
+ ets:new(by_pid(N), [private, set, named_table]),
+ ets:new(by_db(N), [protected, bag, named_table]),
RootDir = couch_index_util:root_dir(),
- couch_file:init_delete_dir(RootDir),
- {ok, #st{root_dir = RootDir}}.
+ % We only need one of the index servers to nuke this on startup.
+ case N of
+ 1 -> couch_file:init_delete_dir(RootDir);
+ _ -> ok
+ end,
+ St = #st{
+ root_dir = RootDir,
+ server_name = server_name(N),
+ by_sig = by_sig(N),
+ by_pid = by_pid(N),
+ by_db = by_db(N)
+ },
+ ok = config:listen_for_changes(?MODULE, St),
+ couch_event:link_listener(?MODULE, handle_db_event, St, [all_dbs]),
+ {ok, St}.
-terminate(_Reason, _State) ->
- Pids = [Pid || {Pid, _} <- ets:tab2list(?BY_PID)],
+terminate(_Reason, State) ->
+ Pids = [Pid || {Pid, _} <- ets:tab2list(State#st.by_pid)],
lists:map(fun couch_util:shutdown_sync/1, Pids),
ok.
handle_call({get_index, {_Mod, _IdxState, DbName, Sig} = Args}, From, State) ->
- case ets:lookup(?BY_SIG, {DbName, Sig}) of
+ case ets:lookup(State#st.by_sig, {DbName, Sig}) of
[] ->
spawn_link(fun() -> new_index(Args) end),
- ets:insert(?BY_SIG, {{DbName, Sig}, [From]}),
+ ets:insert(State#st.by_sig, {{DbName, Sig}, [From]}),
{noreply, State};
[{_, Waiters}] when is_list(Waiters) ->
- ets:insert(?BY_SIG, {{DbName, Sig}, [From | Waiters]}),
+ ets:insert(State#st.by_sig, {{DbName, Sig}, [From | Waiters]}),
{noreply, State};
[{_, Pid}] when is_pid(Pid) ->
{reply, {ok, Pid}, State}
end;
handle_call({async_open, {DbName, DDocId, Sig}, {ok, Pid}}, _From, State) ->
- [{_, Waiters}] = ets:lookup(?BY_SIG, {DbName, Sig}),
+ [{_, Waiters}] = ets:lookup(State#st.by_sig, {DbName, Sig}),
[gen_server:reply(From, {ok, Pid}) || From <- Waiters],
link(Pid),
- add_to_ets(DbName, Sig, DDocId, Pid),
+ add_to_ets(DbName, Sig, DDocId, Pid, State),
{reply, ok, State};
handle_call({async_error, {DbName, _DDocId, Sig}, Error}, _From, State) ->
- [{_, Waiters}] = ets:lookup(?BY_SIG, {DbName, Sig}),
+ [{_, Waiters}] = ets:lookup(State#st.by_sig, {DbName, Sig}),
[gen_server:reply(From, Error) || From <- Waiters],
- ets:delete(?BY_SIG, {DbName, Sig}),
+ ets:delete(State#st.by_sig, {DbName, Sig}),
{reply, ok, State};
handle_call({reset_indexes, DbName}, _From, State) ->
- reset_indexes(DbName, State#st.root_dir),
+ reset_indexes(DbName, State),
{reply, ok, State}.
handle_cast({reset_indexes, DbName}, State) ->
- reset_indexes(DbName, State#st.root_dir),
+ reset_indexes(DbName, State),
{noreply, State};
handle_cast({add_to_ets, [Pid, DbName, DDocId, Sig]}, State) ->
% check if Pid still exists
- case ets:lookup(?BY_PID, Pid) of
+ case ets:lookup(State#st.by_pid, Pid) of
[{Pid, {DbName, Sig}}] when is_pid(Pid) ->
- ets:insert(?BY_DB, {DbName, {DDocId, Sig}});
+ ets:insert(State#st.by_db, {DbName, {DDocId, Sig}});
_ ->
ok
end,
{noreply, State};
handle_cast({rem_from_ets, [DbName, DDocId, Sig]}, State) ->
- ets:delete_object(?BY_DB, {DbName, {DDocId, Sig}}),
+ ets:delete_object(State#st.by_db, {DbName, {DDocId, Sig}}),
{noreply, State}.
handle_info({'EXIT', Pid, Reason}, Server) ->
- case ets:lookup(?BY_PID, Pid) of
+ case ets:lookup(Server#st.by_pid, Pid) of
[{Pid, {DbName, Sig}}] ->
DDocIds = [
DDocId
|| {_, {DDocId, _}} <-
- ets:match_object(?BY_DB, {DbName, {'$1', Sig}})
+ ets:match_object(Server#st.by_db, {DbName, {'$1', Sig}})
],
- rem_from_ets(DbName, Sig, DDocIds, Pid);
+ rem_from_ets(DbName, Sig, DDocIds, Pid, Server);
[] when Reason /= normal ->
exit(Reason);
_Else ->
@@ -201,50 +219,50 @@ handle_info(Msg, State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-handle_config_change("couchdb", "index_dir", RootDir, _, RootDir) ->
- {ok, RootDir};
-handle_config_change("couchdb", "view_index_dir", RootDir, _, RootDir) ->
- {ok, RootDir};
-handle_config_change("couchdb", "index_dir", _, _, _) ->
- exit(whereis(couch_index_server), config_change),
+handle_config_change("couchdb", "index_dir", RootDir, _, #st{root_dir = RootDir} = St) ->
+ {ok, St};
+handle_config_change("couchdb", "view_index_dir", RootDir, _, #st{root_dir = RootDir} = St) ->
+ {ok, St};
+handle_config_change("couchdb", "index_dir", _, _, St) ->
+ exit(whereis(St#st.server_name), config_change),
remove_handler;
-handle_config_change("couchdb", "view_index_dir", _, _, _) ->
- exit(whereis(couch_index_server), config_change),
+handle_config_change("couchdb", "view_index_dir", _, _, St) ->
+ exit(whereis(St#st.server_name), config_change),
remove_handler;
-handle_config_change(_, _, _, _, RootDir) ->
- {ok, RootDir}.
+handle_config_change(_, _, _, _, St) ->
+ {ok, St}.
handle_config_terminate(_, stop, _) ->
ok;
-handle_config_terminate(_Server, _Reason, _State) ->
- erlang:send_after(?RELISTEN_DELAY, whereis(?MODULE), restart_config_listener),
- {ok, couch_index_util:root_dir()}.
+handle_config_terminate(_Server, _Reason, State) ->
+ erlang:send_after(?RELISTEN_DELAY, whereis(State#st.server_name), restart_config_listener),
+ {ok, State}.
new_index({Mod, IdxState, DbName, Sig}) ->
DDocId = Mod:get(idx_name, IdxState),
case couch_index:start_link({Mod, IdxState}) of
{ok, Pid} ->
ok = gen_server:call(
- ?MODULE, {async_open, {DbName, DDocId, Sig}, {ok, Pid}}
+ server_name(DbName), {async_open, {DbName, DDocId, Sig}, {ok, Pid}}
),
unlink(Pid);
Error ->
ok = gen_server:call(
- ?MODULE, {async_error, {DbName, DDocId, Sig}, Error}
+ server_name(DbName), {async_error, {DbName, DDocId, Sig}, Error}
)
end.
-reset_indexes(DbName, Root) ->
+reset_indexes(DbName, #st{} = State) ->
% shutdown all the updaters and clear the files, the db got changed
SigDDocIds = lists:foldl(
fun({_, {DDocId, Sig}}, DDict) ->
dict:append(Sig, DDocId, DDict)
end,
dict:new(),
- ets:lookup(?BY_DB, DbName)
+ ets:lookup(State#st.by_db, DbName)
),
Fun = fun({Sig, DDocIds}) ->
- [{_, Pid}] = ets:lookup(?BY_SIG, {DbName, Sig}),
+ [{_, Pid}] = ets:lookup(State#st.by_sig, {DbName, Sig}),
unlink(Pid),
gen_server:cast(Pid, delete),
receive
@@ -253,32 +271,38 @@ reset_indexes(DbName, Root) ->
after 0 ->
ok
end,
- rem_from_ets(DbName, Sig, DDocIds, Pid)
+ rem_from_ets(DbName, Sig, DDocIds, Pid, State)
end,
lists:foreach(Fun, dict:to_list(SigDDocIds)),
- Path = couch_index_util:index_dir("", DbName),
- couch_file:nuke_dir(Root, Path).
+ % We only need one of the index servers to do this.
+ case State#st.server_name == server_name(1) of
+ true ->
+ Path = couch_index_util:index_dir("", DbName),
+ couch_file:nuke_dir(State#st.root_dir, Path);
+ false ->
+ ok
+ end.
-add_to_ets(DbName, Sig, DDocId, Pid) ->
- ets:insert(?BY_SIG, {{DbName, Sig}, Pid}),
- ets:insert(?BY_PID, {Pid, {DbName, Sig}}),
- ets:insert(?BY_DB, {DbName, {DDocId, Sig}}).
+add_to_ets(DbName, Sig, DDocId, Pid, #st{} = St) ->
+ ets:insert(St#st.by_sig, {{DbName, Sig}, Pid}),
+ ets:insert(St#st.by_pid, {Pid, {DbName, Sig}}),
+ ets:insert(St#st.by_db, {DbName, {DDocId, Sig}}).
-rem_from_ets(DbName, Sig, DDocIds, Pid) ->
- ets:delete(?BY_SIG, {DbName, Sig}),
- ets:delete(?BY_PID, Pid),
+rem_from_ets(DbName, Sig, DDocIds, Pid, #st{} = St) ->
+ ets:delete(St#st.by_sig, {DbName, Sig}),
+ ets:delete(St#st.by_pid, Pid),
lists:foreach(
fun(DDocId) ->
- ets:delete_object(?BY_DB, {DbName, {DDocId, Sig}})
+ ets:delete_object(St#st.by_db, {DbName, {DDocId, Sig}})
end,
DDocIds
).
handle_db_event(DbName, created, St) ->
- gen_server:cast(?MODULE, {reset_indexes, DbName}),
+ gen_server:cast(St#st.server_name, {reset_indexes, DbName}),
{ok, St};
handle_db_event(DbName, deleted, St) ->
- gen_server:cast(?MODULE, {reset_indexes, DbName}),
+ gen_server:cast(St#st.server_name, {reset_indexes, DbName}),
{ok, St};
handle_db_event(<<"shards/", _/binary>> = DbName, {ddoc_updated, DDocId}, St) ->
DDocResult = couch_util:with_db(DbName, fun(Db) ->
@@ -297,15 +321,15 @@ handle_db_event(<<"shards/", _/binary>> = DbName, {ddoc_updated, DDocId}, St) ->
lists:foreach(
fun({_DbShard, {_DDocId, Sig}}) ->
% check if there are other ddocs with the same Sig for the same db
- SigDDocs = ets:match_object(?BY_DB, {DbShard, {'$1', Sig}}),
+ SigDDocs = ets:match_object(St#st.by_db, {DbShard, {'$1', Sig}}),
if
length(SigDDocs) > 1 ->
- % remove records from ?BY_DB for this DDoc
+ % remove records from by_db for this DDoc
Args = [DbShard, DDocId, Sig],
- gen_server:cast(?MODULE, {rem_from_ets, Args});
+ gen_server:cast(St#st.server_name, {rem_from_ets, Args});
true ->
% single DDoc with this Sig - close couch_index processes
- case ets:lookup(?BY_SIG, {DbShard, Sig}) of
+ case ets:lookup(St#st.by_sig, {DbShard, Sig}) of
[{_, IndexPid}] ->
(catch gen_server:cast(IndexPid, {ddoc_updated, DDocResult}));
[] ->
@@ -313,7 +337,7 @@ handle_db_event(<<"shards/", _/binary>> = DbName, {ddoc_updated, DDocId}, St) ->
end
end
end,
- ets:match_object(?BY_DB, {DbShard, {DDocId, '$1'}})
+ ets:match_object(St#st.by_db, {DbShard, {DDocId, '$1'}})
)
end,
DbShards
@@ -322,15 +346,47 @@ handle_db_event(<<"shards/", _/binary>> = DbName, {ddoc_updated, DDocId}, St) ->
handle_db_event(DbName, {ddoc_updated, DDocId}, St) ->
lists:foreach(
fun({_DbName, {_DDocId, Sig}}) ->
- case ets:lookup(?BY_SIG, {DbName, Sig}) of
+ case ets:lookup(St#st.by_sig, {DbName, Sig}) of
[{_, IndexPid}] ->
(catch gen_server:cast(IndexPid, ddoc_updated));
[] ->
ok
end
end,
- ets:match_object(?BY_DB, {DbName, {DDocId, '$1'}})
+ ets:match_object(St#st.by_db, {DbName, {DDocId, '$1'}})
),
{ok, St};
handle_db_event(_DbName, _Event, St) ->
{ok, St}.
+
+num_servers() ->
+ erlang:system_info(schedulers).
+
+server_name(Arg) ->
+ name("index_server", Arg).
+
+by_sig(Arg) ->
+ name("couchdb_indexes_by_sig", Arg).
+
+by_pid(Arg) ->
+ name("couchdb_indexes_by_pid", Arg).
+
+by_db(Arg) ->
+ name("couchdb_indexes_by_db", Arg).
+
+name(BaseName, Arg) when is_list(Arg) ->
+ name(BaseName, ?l2b(Arg));
+name(BaseName, Arg) when is_binary(Arg) ->
+ N = 1 + erlang:phash2(Arg, num_servers()),
+ name(BaseName, N);
+name(BaseName, N) when is_integer(N), N > 0 ->
+ list_to_atom(BaseName ++ "_" ++ integer_to_list(N)).
+
+aggregate_queue_len() ->
+ N = num_servers(),
+ Names = [server_name(I) || I <- lists:seq(1, N)],
+ MQs = [
+ process_info(whereis(Name), message_queue_len)
+ || Name <- Names
+ ],
+ lists:sum([X || {_, X} <- MQs]).
diff --git a/src/couch_index/test/eunit/couch_index_ddoc_updated_tests.erl b/src/couch_index/test/eunit/couch_index_ddoc_updated_tests.erl
index 7bee8baae..3af58d2fc 100644
--- a/src/couch_index/test/eunit/couch_index_ddoc_updated_tests.erl
+++ b/src/couch_index/test/eunit/couch_index_ddoc_updated_tests.erl
@@ -103,8 +103,16 @@ check_all_indexers_exit_on_ddoc_change({_Ctx, DbName}) ->
% assert that all index processes exit after ddoc updated
ok = meck:reset(test_index),
- couch_index_server:handle_db_event(
- couch_db:name(DbShard), {ddoc_updated, DDocID}, {st, ""}
+ lists:foreach(
+ fun(I) ->
+ couch_index_server:handle_db_event(
+ couch_db:name(DbShard),
+ {ddoc_updated, DDocID},
+ {st, "", couch_index_server:server_name(I), couch_index_server:by_sig(I),
+ couch_index_server:by_pid(I), couch_index_server:by_db(I)}
+ )
+ end,
+ seq()
),
ok = meck:wait(N, test_index, init, ['_', '_'], 5000),
@@ -139,8 +147,13 @@ fake_index() ->
get_indexes_by_ddoc(DDocID, N) ->
Indexes = test_util:wait(fun() ->
- Indxs = ets:match_object(
- couchdb_indexes_by_db, {'$1', {DDocID, '$2'}}
+ Indxs = lists:flatmap(
+ fun(I) ->
+ ets:match_object(
+ couch_index_server:by_db(I), {'$1', {DDocID, '$2'}}
+ )
+ end,
+ seq()
),
case length(Indxs) == N of
true ->
@@ -151,7 +164,7 @@ get_indexes_by_ddoc(DDocID, N) ->
end),
lists:foldl(
fun({DbName, {_DDocID, Sig}}, Acc) ->
- case ets:lookup(couchdb_indexes_by_sig, {DbName, Sig}) of
+ case ets:lookup(couch_index_server:by_sig(DbName), {DbName, Sig}) of
[{_, Pid}] -> [Pid | Acc];
_ -> Acc
end
@@ -159,3 +172,6 @@ get_indexes_by_ddoc(DDocID, N) ->
[],
Indexes
).
+
+seq() ->
+ lists:seq(1, couch_index_server:num_servers()).
diff --git a/src/couch_mrview/test/eunit/couch_mrview_ddoc_updated_tests.erl b/src/couch_mrview/test/eunit/couch_mrview_ddoc_updated_tests.erl
index 2a6299448..91b24e336 100644
--- a/src/couch_mrview/test/eunit/couch_mrview_ddoc_updated_tests.erl
+++ b/src/couch_mrview/test/eunit/couch_mrview_ddoc_updated_tests.erl
@@ -88,7 +88,7 @@ check_indexing_stops_on_ddoc_change(Db) ->
?_test(begin
DDocID = <<"_design/bar">>,
- IndexesBefore = get_indexes_by_ddoc(DDocID, 1),
+ IndexesBefore = get_indexes_by_ddoc(couch_db:name(Db), DDocID, 1),
?assertEqual(1, length(IndexesBefore)),
AliveBefore = lists:filter(fun erlang:is_process_alive/1, IndexesBefore),
?assertEqual(1, length(AliveBefore)),
@@ -127,16 +127,16 @@ check_indexing_stops_on_ddoc_change(Db) ->
end,
%% assert that previously running indexes are gone
- IndexesAfter = get_indexes_by_ddoc(DDocID, 0),
+ IndexesAfter = get_indexes_by_ddoc(couch_db:name(Db), DDocID, 0),
?assertEqual(0, length(IndexesAfter)),
AliveAfter = lists:filter(fun erlang:is_process_alive/1, IndexesBefore),
?assertEqual(0, length(AliveAfter))
end).
-get_indexes_by_ddoc(DDocID, N) ->
+get_indexes_by_ddoc(DbName0, DDocID, N) ->
Indexes = test_util:wait(fun() ->
Indxs = ets:match_object(
- couchdb_indexes_by_db, {'$1', {DDocID, '$2'}}
+ couch_index_server:by_db(DbName0), {'$1', {DDocID, '$2'}}
),
case length(Indxs) == N of
true ->
@@ -147,7 +147,7 @@ get_indexes_by_ddoc(DDocID, N) ->
end),
lists:foldl(
fun({DbName, {_DDocID, Sig}}, Acc) ->
- case ets:lookup(couchdb_indexes_by_sig, {DbName, Sig}) of
+ case ets:lookup(couch_index_server:by_sig(DbName), {DbName, Sig}) of
[{_, Pid}] -> [Pid | Acc];
_ -> Acc
end