summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Newson <rnewson@apache.org>2021-02-09 15:42:50 +0000
committerRobert Newson <rnewson@apache.org>2021-02-12 13:12:53 +0000
commitc4b254fcb9addf95c0fa42cdb810c285e4572210 (patch)
tree23c0159c3cfcc87a26ad5870eab64d2c54020265
parent5819f71d23e46917f6ec8e41a2bf70d34b4cc757 (diff)
downloadcouchdb-couch_server_sharding.tar.gz
Shard couch_server for performancecouch_server_sharding
-rw-r--r--src/couch/src/couch_lru.erl13
-rw-r--r--src/couch/src/couch_primary_sup.erl24
-rw-r--r--src/couch/src/couch_server.erl227
-rw-r--r--src/couch/test/eunit/couch_bt_engine_compactor_ev_tests.erl5
-rw-r--r--src/couch/test/eunit/couch_db_split_tests.erl2
-rw-r--r--src/couch/test/eunit/couch_db_tests.erl5
-rw-r--r--src/couch/test/eunit/couch_server_tests.erl6
-rw-r--r--src/couch/test/eunit/couchdb_db_tests.erl4
-rw-r--r--src/couch_pse_tests/src/cpse_util.erl3
9 files changed, 181 insertions, 108 deletions
diff --git a/src/couch/src/couch_lru.erl b/src/couch/src/couch_lru.erl
index 6ad7c65cd..618a0144f 100644
--- a/src/couch/src/couch_lru.erl
+++ b/src/couch/src/couch_lru.erl
@@ -43,17 +43,20 @@ close({Tree, _} = Cache) ->
close_int(none, _) ->
false;
close_int({Lru, DbName, Iter}, {Tree, Dict} = Cache) ->
- case ets:update_element(couch_dbs, DbName, {#entry.lock, locked}) of
+ CouchDbs = couch_server:couch_dbs(DbName),
+ CouchDbsPidToName = couch_server:couch_dbs_pid_to_name(DbName),
+
+ case ets:update_element(CouchDbs, DbName, {#entry.lock, locked}) of
true ->
- [#entry{db = Db, pid = Pid}] = ets:lookup(couch_dbs, DbName),
+ [#entry{db = Db, pid = Pid}] = ets:lookup(CouchDbs, DbName),
case couch_db:is_idle(Db) of true ->
- true = ets:delete(couch_dbs, DbName),
- true = ets:delete(couch_dbs_pid_to_name, Pid),
+ true = ets:delete(CouchDbs, DbName),
+ true = ets:delete(CouchDbsPidToName, Pid),
exit(Pid, kill),
{true, {gb_trees:delete(Lru, Tree), dict:erase(DbName, Dict)}};
false ->
ElemSpec = {#entry.lock, unlocked},
- true = ets:update_element(couch_dbs, DbName, ElemSpec),
+ true = ets:update_element(CouchDbs, DbName, ElemSpec),
couch_stats:increment_counter([couchdb, couch_server, lru_skip]),
close_int(gb_trees:next(Iter), update(DbName, Cache))
end;
diff --git a/src/couch/src/couch_primary_sup.erl b/src/couch/src/couch_primary_sup.erl
index dc2d9e51a..48c6d42ab 100644
--- a/src/couch/src/couch_primary_sup.erl
+++ b/src/couch/src/couch_primary_sup.erl
@@ -30,13 +30,21 @@ init([]) ->
permanent,
brutal_kill,
worker,
- [couch_task_status]},
- {couch_server,
- {couch_server, sup_start_link, []},
- permanent,
- brutal_kill,
- worker,
- [couch_server]}
- ],
+ [couch_task_status]}
+ ] ++ couch_servers(),
{ok, {{one_for_one, 10, 3600}, Children}}.
+
+couch_servers() ->
+ N = couch_server:num_servers(),
+ [couch_server(I) || I <- lists:seq(1, N)].
+
+couch_server(N) ->
+ Name = couch_server:couch_server(N),
+ {Name,
+ {couch_server, sup_start_link, [N]},
+ permanent,
+ brutal_kill,
+ worker,
+ [couch_server]
+ }.
diff --git a/src/couch/src/couch_server.erl b/src/couch/src/couch_server.erl
index f57bf3b81..22797f80b 100644
--- a/src/couch/src/couch_server.erl
+++ b/src/couch/src/couch_server.erl
@@ -17,10 +17,9 @@
-export([open/2,create/2,delete/2,get_version/0,get_version/1,get_git_sha/0,get_uuid/0]).
-export([all_databases/0, all_databases/2]).
--export([init/1, handle_call/3,sup_start_link/0]).
+-export([init/1, handle_call/3,sup_start_link/1]).
-export([handle_cast/2,code_change/3,handle_info/2,terminate/2]).
-export([dev_start/0,is_admin/2,has_admins/0,get_stats/0]).
--export([close_lru/0]).
-export([close_db_if_idle/1]).
-export([delete_compaction_files/1]).
-export([exists/1]).
@@ -28,6 +27,7 @@
-export([get_engine_path/2]).
-export([lock/2, unlock/1]).
-export([db_updated/1]).
+-export([num_servers/0, couch_server/1, couch_dbs_pid_to_name/1, couch_dbs/1]).
% config_listener api
-export([handle_config_change/5, handle_config_terminate/3]).
@@ -45,7 +45,10 @@
dbs_open=0,
start_time="",
update_lru_on_read=true,
- lru = couch_lru:new()
+ lru = couch_lru:new(),
+ couch_dbs,
+ couch_dbs_pid_to_name,
+ couch_dbs_locks
}).
dev_start() ->
@@ -72,12 +75,16 @@ get_uuid() ->
end.
get_stats() ->
- {ok, #server{start_time=Time,dbs_open=Open}} =
- gen_server:call(couch_server, get_server),
+ Fun = fun(N, {TimeAcc, OpenAcc}) ->
+ {ok, #server{start_time=Time,dbs_open=Open}} =
+ gen_server:call(couch_server(N), get_server),
+ {max(Time, TimeAcc), Open + OpenAcc} end,
+ {Time, Open} =
+ lists:foldl(Fun, {0, 0}, lists:seq(1, num_servers())),
[{start_time, ?l2b(Time)}, {dbs_open, Open}].
-sup_start_link() ->
- gen_server:start_link({local, couch_server}, couch_server, [], []).
+sup_start_link(N) ->
+ gen_server:start_link({local, couch_server(N)}, couch_server, [N], []).
open(DbName, Options) ->
try
@@ -89,7 +96,7 @@ open(DbName, Options) ->
open_int(DbName, Options0) ->
Ctx = couch_util:get_value(user_ctx, Options0, #user_ctx{}),
- case ets:lookup(couch_dbs, DbName) of
+ case ets:lookup(couch_dbs(DbName), DbName) of
[#entry{db = Db0, lock = Lock} = Entry] when Lock =/= locked ->
update_lru(DbName, Entry#entry.db_options),
{ok, Db1} = couch_db:incref(Db0),
@@ -98,7 +105,7 @@ open_int(DbName, Options0) ->
Options = maybe_add_sys_db_callbacks(DbName, Options0),
Timeout = couch_util:get_value(timeout, Options, infinity),
Create = couch_util:get_value(create_if_missing, Options, false),
- case gen_server:call(couch_server, {open, DbName, Options}, Timeout) of
+ case gen_server:call(couch_server(DbName), {open, DbName, Options}, Timeout) of
{ok, Db0} ->
{ok, Db1} = couch_db:incref(Db0),
couch_db:set_user_ctx(Db1, Ctx);
@@ -114,15 +121,13 @@ update_lru(DbName, Options) ->
case config:get_boolean("couchdb", "update_lru_on_read", false) of
true ->
case lists:member(sys_db, Options) of
- false -> gen_server:cast(couch_server, {update_lru, DbName});
+ false -> gen_server:cast(couch_server(DbName), {update_lru, DbName});
true -> ok
end;
false ->
ok
end.
-close_lru() ->
- gen_server:call(couch_server, close_lru).
create(DbName, Options) ->
try
@@ -135,7 +140,7 @@ create(DbName, Options) ->
create_int(DbName, Options0) ->
Options = maybe_add_sys_db_callbacks(DbName, Options0),
couch_partition:validate_dbname(DbName, Options),
- case gen_server:call(couch_server, {create, DbName, Options}, infinity) of
+ case gen_server:call(couch_server(DbName), {create, DbName, Options}, infinity) of
{ok, Db0} ->
Ctx = couch_util:get_value(user_ctx, Options, #user_ctx{}),
{ok, Db1} = couch_db:incref(Db0),
@@ -145,7 +150,7 @@ create_int(DbName, Options0) ->
end.
delete(DbName, Options) ->
- gen_server:call(couch_server, {delete, DbName, Options}, infinity).
+ gen_server:call(couch_server(DbName), {delete, DbName, Options}, infinity).
exists(DbName) ->
@@ -226,15 +231,15 @@ hash_admin_passwords(Persist) ->
end, couch_passwords:get_unhashed_admins()).
close_db_if_idle(DbName) ->
- case ets:lookup(couch_dbs, DbName) of
+ case ets:lookup(couch_dbs(DbName), DbName) of
[#entry{}] ->
- gen_server:cast(couch_server, {close_db_if_idle, DbName});
+ gen_server:cast(couch_server(DbName), {close_db_if_idle, DbName});
[] ->
ok
end.
-init([]) ->
+init([N]) ->
couch_util:set_mqd_off_heap(?MODULE),
couch_util:set_process_priority(?MODULE, high),
@@ -268,18 +273,18 @@ init([]) ->
config:get("couchdb", "max_dbs_open", integer_to_list(?MAX_DBS_OPEN))),
UpdateLruOnRead =
config:get("couchdb", "update_lru_on_read", "false") =:= "true",
- ok = config:listen_for_changes(?MODULE, nil),
+ ok = config:listen_for_changes(?MODULE, N),
ok = couch_file:init_delete_dir(RootDir),
hash_admin_passwords(),
- ets:new(couch_dbs, [
+ ets:new(couch_dbs(N), [
set,
protected,
named_table,
{keypos, #entry.name},
{read_concurrency, true}
]),
- ets:new(couch_dbs_pid_to_name, [set, protected, named_table]),
- ets:new(couch_dbs_locks, [
+ ets:new(couch_dbs_pid_to_name(N), [set, protected, named_table]),
+ ets:new(couch_dbs_locks(N), [
set,
public,
named_table,
@@ -290,7 +295,10 @@ init([]) ->
engines = Engines,
max_dbs_open=MaxDbsOpen,
update_lru_on_read=UpdateLruOnRead,
- start_time=couch_util:rfc1123_date()}}.
+ start_time=couch_util:rfc1123_date(),
+ couch_dbs=couch_dbs(N),
+ couch_dbs_pid_to_name=couch_dbs_pid_to_name(N),
+ couch_dbs_locks=couch_dbs_locks(N)}}.
terminate(Reason, Srv) ->
couch_log:error("couch_server terminating with ~p, state ~2048p",
@@ -302,40 +310,50 @@ terminate(Reason, Srv) ->
if Db == undefined -> ok; true ->
couch_util:shutdown_sync(couch_db:get_pid(Db))
end
- end, nil, couch_dbs),
+ end, nil, couch_dbs(Srv)),
ok.
handle_config_change("couchdb", "database_dir", _, _, _) ->
exit(whereis(couch_server), config_change),
remove_handler;
-handle_config_change("couchdb", "update_lru_on_read", "true", _, _) ->
- {ok, gen_server:call(couch_server,{set_update_lru_on_read,true})};
-handle_config_change("couchdb", "update_lru_on_read", _, _, _) ->
- {ok, gen_server:call(couch_server,{set_update_lru_on_read,false})};
-handle_config_change("couchdb", "max_dbs_open", Max, _, _) when is_list(Max) ->
- {ok, gen_server:call(couch_server,{set_max_dbs_open,list_to_integer(Max)})};
-handle_config_change("couchdb", "max_dbs_open", _, _, _) ->
- {ok, gen_server:call(couch_server,{set_max_dbs_open,?MAX_DBS_OPEN})};
-handle_config_change("couchdb_engines", _, _, _, _) ->
- {ok, gen_server:call(couch_server, reload_engines)};
-handle_config_change("admins", _, _, Persist, _) ->
+handle_config_change("couchdb", "update_lru_on_read", "true", _, N) ->
+ gen_server:call(couch_server(N),{set_update_lru_on_read,true}),
+ {ok, N};
+handle_config_change("couchdb", "update_lru_on_read", _, _, N) ->
+ gen_server:call(couch_server(N),{set_update_lru_on_read,false}),
+ {ok, N};
+handle_config_change("couchdb", "max_dbs_open", Max, _, N) when is_list(Max) ->
+ gen_server:call(couch_server(N),{set_max_dbs_open,list_to_integer(Max)}),
+ {ok, N};
+handle_config_change("couchdb", "max_dbs_open", _, _, N) ->
+ gen_server:call(couch_server(N),{set_max_dbs_open,?MAX_DBS_OPEN}),
+ {ok, N};
+handle_config_change("couchdb_engines", _, _, _, N) ->
+ gen_server:call(couch_server(N), reload_engines),
+ {ok, N};
+handle_config_change("admins", _, _, Persist, N) ->
% spawn here so couch event manager doesn't deadlock
- {ok, spawn(fun() -> hash_admin_passwords(Persist) end)};
-handle_config_change("httpd", "authentication_handlers", _, _, _) ->
- {ok, couch_httpd:stop()};
-handle_config_change("httpd", "bind_address", _, _, _) ->
- {ok, couch_httpd:stop()};
-handle_config_change("httpd", "port", _, _, _) ->
- {ok, couch_httpd:stop()};
-handle_config_change("httpd", "max_connections", _, _, _) ->
- {ok, couch_httpd:stop()};
-handle_config_change(_, _, _, _, _) ->
- {ok, nil}.
+ spawn(fun() -> hash_admin_passwords(Persist) end),
+ {ok, N};
+handle_config_change("httpd", "authentication_handlers", _, _, N) ->
+ couch_httpd:stop(),
+ {ok, N};
+handle_config_change("httpd", "bind_address", _, _, N) ->
+ couch_httpd:stop(),
+ {ok, N};
+handle_config_change("httpd", "port", _, _, N) ->
+ couch_httpd:stop(),
+ {ok, N};
+handle_config_change("httpd", "max_connections", _, _, N) ->
+ couch_httpd:stop(),
+ {ok, N};
+handle_config_change(_, _, _, _, N) ->
+ {ok, N}.
handle_config_terminate(_, stop, _) ->
ok;
-handle_config_terminate(_Server, _Reason, _State) ->
- erlang:send_after(?RELISTEN_DELAY, whereis(?MODULE), restart_config_listener).
+handle_config_terminate(_Server, _Reason, N) ->
+ erlang:send_after(?RELISTEN_DELAY, whereis(?MODULE), {restart_config_listener, N}).
all_databases() ->
@@ -344,7 +362,7 @@ all_databases() ->
{ok, lists:usort(DbList)}.
all_databases(Fun, Acc0) ->
- {ok, #server{root_dir=Root}} = gen_server:call(couch_server, get_server),
+ {ok, #server{root_dir=Root}} = gen_server:call(couch_server_1, get_server),
NormRoot = couch_util:normpath(Root),
Extensions = get_engine_extensions(),
ExtRegExp = "(" ++ string:join(Extensions, "|") ++ ")",
@@ -425,7 +443,7 @@ open_async(Server, From, DbName, Options) ->
true -> create;
false -> open
end,
- true = ets:insert(couch_dbs, #entry{
+ true = ets:insert(couch_dbs(Server), #entry{
name = DbName,
pid = Opener,
lock = locked,
@@ -433,7 +451,7 @@ open_async(Server, From, DbName, Options) ->
req_type = ReqType,
db_options = Options
}),
- true = ets:insert(couch_dbs_pid_to_name, {Opener, DbName}),
+ true = ets:insert(couch_dbs_pid_to_name(Server), {Opener, DbName}),
db_opened(Server, Options).
open_async_int(Server, DbName, Options) ->
@@ -468,9 +486,9 @@ handle_call(reload_engines, _From, Server) ->
handle_call(get_server, _From, Server) ->
{reply, {ok, Server}, Server};
handle_call({open_result, DbName, {ok, Db}}, {Opener, _}, Server) ->
- true = ets:delete(couch_dbs_pid_to_name, Opener),
+ true = ets:delete(couch_dbs_pid_to_name(Server), Opener),
DbPid = couch_db:get_pid(Db),
- case ets:lookup(couch_dbs, DbName) of
+ case ets:lookup(couch_dbs(Server), DbName) of
[] ->
% db was deleted during async open
exit(DbPid, kill),
@@ -485,7 +503,7 @@ handle_call({open_result, DbName, {ok, Db}}, {Opener, _}, Server) ->
_ ->
ok
end,
- true = ets:insert(couch_dbs, #entry{
+ true = ets:insert(couch_dbs(Server), #entry{
name = DbName,
db = Db,
pid = DbPid,
@@ -493,7 +511,7 @@ handle_call({open_result, DbName, {ok, Db}}, {Opener, _}, Server) ->
db_options = Entry#entry.db_options,
start_time = couch_db:get_instance_start_time(Db)
}),
- true = ets:insert(couch_dbs_pid_to_name, {DbPid, DbName}),
+ true = ets:insert(couch_dbs_pid_to_name(Server), {DbPid, DbName}),
Lru = case couch_db:is_system_db(Db) of
false ->
couch_lru:insert(DbName, Server#server.lru);
@@ -511,14 +529,14 @@ handle_call({open_result, DbName, {ok, Db}}, {Opener, _}, Server) ->
handle_call({open_result, DbName, {error, eexist}}, From, Server) ->
handle_call({open_result, DbName, file_exists}, From, Server);
handle_call({open_result, DbName, Error}, {Opener, _}, Server) ->
- case ets:lookup(couch_dbs, DbName) of
+ case ets:lookup(couch_dbs(Server), DbName) of
[] ->
% db was deleted during async open
{reply, ok, Server};
[#entry{pid = Opener, req_type = ReqType, waiters = Waiters} = Entry] ->
[gen_server:reply(Waiter, Error) || Waiter <- Waiters],
- true = ets:delete(couch_dbs, DbName),
- true = ets:delete(couch_dbs_pid_to_name, Opener),
+ true = ets:delete(couch_dbs(Server), DbName),
+ true = ets:delete(couch_dbs_pid_to_name(Server), Opener),
NewServer = case ReqType of
{create, DbName, Options, CrFrom} ->
open_async(Server, CrFrom, DbName, Options);
@@ -532,7 +550,7 @@ handle_call({open_result, DbName, Error}, {Opener, _}, Server) ->
{reply, ok, Server}
end;
handle_call({open, DbName, Options}, From, Server) ->
- case ets:lookup(couch_dbs, DbName) of
+ case ets:lookup(couch_dbs(Server), DbName) of
[] ->
case make_room(Server, Options) of
{ok, Server2} ->
@@ -541,7 +559,7 @@ handle_call({open, DbName, Options}, From, Server) ->
{reply, CloseError, Server}
end;
[#entry{waiters = Waiters} = Entry] when is_list(Waiters) ->
- true = ets:insert(couch_dbs, Entry#entry{waiters = [From | Waiters]}),
+ true = ets:insert(couch_dbs(Server), Entry#entry{waiters = [From | Waiters]}),
NumWaiters = length(Waiters),
if NumWaiters =< 10 orelse NumWaiters rem 10 /= 0 -> ok; true ->
Fmt = "~b clients waiting to open db ~s",
@@ -552,7 +570,7 @@ handle_call({open, DbName, Options}, From, Server) ->
{reply, {ok, Db}, Server}
end;
handle_call({create, DbName, Options}, From, Server) ->
- case ets:lookup(couch_dbs, DbName) of
+ case ets:lookup(couch_dbs(Server), DbName) of
[] ->
case make_room(Server, Options) of
{ok, Server2} ->
@@ -567,7 +585,7 @@ handle_call({create, DbName, Options}, From, Server) ->
% to wait while we figure out if it'll succeed.
CrOptions = [create | Options],
Req = {create, DbName, CrOptions, From},
- true = ets:insert(couch_dbs, Entry#entry{req_type = Req}),
+ true = ets:insert(couch_dbs(Server), Entry#entry{req_type = Req}),
{noreply, Server};
[_AlreadyRunningDb] ->
{reply, file_exists, Server}
@@ -577,17 +595,17 @@ handle_call({delete, DbName, Options}, _From, Server) ->
case check_dbname(DbNameList) of
ok ->
Server2 =
- case ets:lookup(couch_dbs, DbName) of
+ case ets:lookup(couch_dbs(Server), DbName) of
[] -> Server;
[#entry{pid = Pid, waiters = Waiters} = Entry] when is_list(Waiters) ->
- true = ets:delete(couch_dbs, DbName),
- true = ets:delete(couch_dbs_pid_to_name, Pid),
+ true = ets:delete(couch_dbs(Server), DbName),
+ true = ets:delete(couch_dbs_pid_to_name(Server), Pid),
exit(Pid, kill),
[gen_server:reply(Waiter, not_found) || Waiter <- Waiters],
db_closed(Server, Entry#entry.db_options);
[#entry{pid = Pid} = Entry] ->
- true = ets:delete(couch_dbs, DbName),
- true = ets:delete(couch_dbs_pid_to_name, Pid),
+ true = ets:delete(couch_dbs(Server), DbName),
+ true = ets:delete(couch_dbs_pid_to_name(Server), Pid),
exit(Pid, kill),
db_closed(Server, Entry#entry.db_options)
end,
@@ -616,9 +634,9 @@ handle_call({delete, DbName, Options}, _From, Server) ->
handle_call({db_updated, Db}, _From, Server0) ->
DbName = couch_db:name(Db),
StartTime = couch_db:get_instance_start_time(Db),
- Server = try ets:lookup_element(couch_dbs, DbName, #entry.start_time) of
+ Server = try ets:lookup_element(couch_dbs(Server0), DbName, #entry.start_time) of
StartTime ->
- true = ets:update_element(couch_dbs, DbName, {#entry.db, Db}),
+ true = ets:update_element(couch_dbs(Server0), DbName, {#entry.db, Db}),
Lru = case couch_db:is_system_db(Db) of
false -> couch_lru:update(DbName, Server0#server.lru);
true -> Server0#server.lru
@@ -636,19 +654,19 @@ handle_cast({update_lru, DbName}, #server{lru = Lru, update_lru_on_read=true} =
handle_cast({update_lru, _DbName}, Server) ->
{noreply, Server};
handle_cast({close_db_if_idle, DbName}, Server) ->
- case ets:update_element(couch_dbs, DbName, {#entry.lock, locked}) of
+ case ets:update_element(couch_dbs(Server), DbName, {#entry.lock, locked}) of
true ->
- [#entry{db = Db, db_options = DbOpts}] = ets:lookup(couch_dbs, DbName),
+ [#entry{db = Db, db_options = DbOpts}] = ets:lookup(couch_dbs(Server), DbName),
case couch_db:is_idle(Db) of
true ->
DbPid = couch_db:get_pid(Db),
- true = ets:delete(couch_dbs, DbName),
- true = ets:delete(couch_dbs_pid_to_name, DbPid),
+ true = ets:delete(couch_dbs(Server), DbName),
+ true = ets:delete(couch_dbs_pid_to_name(Server), DbPid),
exit(DbPid, kill),
{noreply, db_closed(Server, DbOpts)};
false ->
true = ets:update_element(
- couch_dbs, DbName, {#entry.lock, unlocked}),
+ couch_dbs(Server), DbName, {#entry.lock, unlocked}),
{noreply, Server}
end;
false ->
@@ -664,9 +682,9 @@ code_change(_OldVsn, #server{}=State, _Extra) ->
handle_info({'EXIT', _Pid, config_change}, Server) ->
{stop, config_change, Server};
handle_info({'EXIT', Pid, Reason}, Server) ->
- case ets:lookup(couch_dbs_pid_to_name, Pid) of
+ case ets:lookup(couch_dbs_pid_to_name(Server), Pid) of
[{Pid, DbName}] ->
- [#entry{waiters = Waiters} = Entry] = ets:lookup(couch_dbs, DbName),
+ [#entry{waiters = Waiters} = Entry] = ets:lookup(couch_dbs(Server), DbName),
if Reason /= snappy_nif_not_loaded -> ok; true ->
Msg = io_lib:format("To open the database `~s`, Apache CouchDB "
"must be built with Erlang OTP R13B04 or higher.", [DbName]),
@@ -681,14 +699,14 @@ handle_info({'EXIT', Pid, Reason}, Server) ->
if not is_list(Waiters) -> ok; true ->
[gen_server:reply(Waiter, Reason) || Waiter <- Waiters]
end,
- true = ets:delete(couch_dbs, DbName),
- true = ets:delete(couch_dbs_pid_to_name, Pid),
+ true = ets:delete(couch_dbs(Server), DbName),
+ true = ets:delete(couch_dbs_pid_to_name(Server), Pid),
{noreply, db_closed(Server, Entry#entry.db_options)};
[] ->
{noreply, Server}
end;
-handle_info(restart_config_listener, State) ->
- ok = config:listen_for_changes(?MODULE, nil),
+handle_info({restart_config_listener, N}, State) ->
+ ok = config:listen_for_changes(?MODULE, N),
{noreply, State};
handle_info(Info, Server) ->
{stop, {unknown_message, Info}, Server}.
@@ -720,7 +738,7 @@ validate_open_or_create(DbName, Options) ->
throw({?MODULE, EngineError})
end,
- case ets:lookup(couch_dbs_locks, DbName) of
+ case ets:lookup(couch_dbs_locks(DbName), DbName) of
[] ->
ok;
[{DbName, Reason}] ->
@@ -861,21 +879,62 @@ get_engine_path(DbName, Engine) when is_binary(DbName), is_atom(Engine) ->
end.
lock(DbName, Reason) when is_binary(DbName), is_binary(Reason) ->
- case ets:lookup(couch_dbs, DbName) of
+ case ets:lookup(couch_dbs(DbName), DbName) of
[] ->
- true = ets:insert(couch_dbs_locks, {DbName, Reason}),
+ true = ets:insert(couch_dbs_locks(DbName), {DbName, Reason}),
ok;
[#entry{}] ->
{error, already_opened}
end.
unlock(DbName) when is_binary(DbName) ->
- true = ets:delete(couch_dbs_locks, DbName),
+ true = ets:delete(couch_dbs_locks(DbName), DbName),
ok.
db_updated(Db) ->
- gen_server:call(couch_server, {db_updated, Db}, infinity).
+ DbName = couch_db:name(Db),
+ gen_server:call(couch_server(DbName), {db_updated, Db}, infinity).
+
+
+couch_server(Arg) ->
+ name("couch_server", Arg).
+
+
+couch_dbs(Arg) ->
+ name("couch_dbs", Arg).
+
+
+couch_dbs_pid_to_name(Arg) ->
+ name("couch_dbs_pid_to_name", Arg).
+
+
+couch_dbs_locks(Arg) ->
+ name("couch_dbs_locks", Arg).
+
+
+name("couch_dbs", #server{} = Server) ->
+ Server#server.couch_dbs;
+
+name("couch_dbs_pid_to_name", #server{} = Server) ->
+ Server#server.couch_dbs_pid_to_name;
+
+name("couch_dbs_locks", #server{} = Server) ->
+ Server#server.couch_dbs_locks;
+
+name(BaseName, DbName) when is_list(DbName) ->
+ name(BaseName, ?l2b(DbName));
+
+name(BaseName, DbName) when is_binary(DbName) ->
+ N = 1 + erlang:phash2(DbName, num_servers()),
+ name(BaseName, N);
+
+name(BaseName, N) when is_integer(N), N > 0 ->
+ list_to_atom(BaseName ++ "_" ++ integer_to_list(N)).
+
+
+num_servers() ->
+ erlang:system_info(schedulers).
-ifdef(TEST).
diff --git a/src/couch/test/eunit/couch_bt_engine_compactor_ev_tests.erl b/src/couch/test/eunit/couch_bt_engine_compactor_ev_tests.erl
index 188078c2d..090217b4c 100644
--- a/src/couch/test/eunit/couch_bt_engine_compactor_ev_tests.erl
+++ b/src/couch/test/eunit/couch_bt_engine_compactor_ev_tests.erl
@@ -283,7 +283,8 @@ wait_db_cleared(Db, N) when N < 0 ->
erlang:error({db_clear_timeout, couch_db:name(Db)});
wait_db_cleared(Db, N) ->
- case ets:lookup(couch_dbs, couch_db:name(Db)) of
+ Tab = couch_server:couch_dbs(couch_db:name(Db)),
+ case ets:lookup(Tab, couch_db:name(Db)) of
[] ->
ok;
[#entry{db = NewDb}] ->
@@ -332,4 +333,4 @@ purge_module() ->
_ ->
code:delete(couch_db_updater),
code:purge(couch_db_updater)
- end. \ No newline at end of file
+ end.
diff --git a/src/couch/test/eunit/couch_db_split_tests.erl b/src/couch/test/eunit/couch_db_split_tests.erl
index 6e24c36ee..b52184a8c 100644
--- a/src/couch/test/eunit/couch_db_split_tests.erl
+++ b/src/couch/test/eunit/couch_db_split_tests.erl
@@ -113,7 +113,7 @@ should_fail_on_existing_target(DbName) ->
ok = couch_db:close(Db),
exit(Pid, kill),
test_util:wait(fun() ->
- case ets:lookup(couch_dbs, TName) of
+ case ets:lookup(couch_server:couch_dbs(DbName), TName) of
[] -> ok;
[_ | _] -> wait
end
diff --git a/src/couch/test/eunit/couch_db_tests.erl b/src/couch/test/eunit/couch_db_tests.erl
index dd2cb427d..d52a15597 100644
--- a/src/couch/test/eunit/couch_db_tests.erl
+++ b/src/couch/test/eunit/couch_db_tests.erl
@@ -109,7 +109,8 @@ should_delete_db(DbName) ->
should_create_multiple_dbs(DbNames) ->
?_test(begin
- gen_server:call(couch_server, {set_max_dbs_open, 3}),
+ [gen_server:call(couch_server:couch_server(N), {set_max_dbs_open, 3}) ||
+ N <- lists:seq(1, couch_server:num_servers())],
{ok, Before} = couch_server:all_databases(),
[?assertNot(lists:member(DbName, Before)) || DbName <- DbNames],
[?assert(create_db(DbName)) || DbName <- DbNames],
@@ -170,7 +171,7 @@ locking_should_work(DbName) ->
ok = couch_db:close(Db),
catch exit(couch_db:get_pid(Db), kill),
test_util:wait(fun() ->
- case ets:lookup(couch_dbs, DbName) of
+ case ets:lookup(couch_server:couch_dbs(DbName), DbName) of
[] -> ok;
[_ | _] -> wait
end
diff --git a/src/couch/test/eunit/couch_server_tests.erl b/src/couch/test/eunit/couch_server_tests.erl
index 7d50700d2..66533d48c 100644
--- a/src/couch/test/eunit/couch_server_tests.erl
+++ b/src/couch/test/eunit/couch_server_tests.erl
@@ -169,7 +169,7 @@ start_interleaved() ->
DbPid = couch_db:get_pid(Db),
unlink(DbPid),
Msg = {'EXIT', DbPid, killed},
- erlang:send_after(2000, whereis(couch_server), Msg);
+ erlang:send_after(2000, whereis(couch_server:couch_server(DbName)), Msg);
_ ->
ok
end,
@@ -202,7 +202,7 @@ t_interleaved_create_delete_open(DbName) ->
% Get the current couch_server pid so we're sure
% to not end up messaging two different pids
- CouchServer = whereis(couch_server),
+ CouchServer = whereis(couch_server:couch_server(DbName)),
% Start our first instance that will succeed in
% an invalid state. Notice that the opener pid
@@ -250,7 +250,7 @@ t_interleaved_create_delete_open(DbName) ->
get_opener_pid(DbName) ->
WaitFun = fun() ->
- case ets:lookup(couch_dbs, DbName) of
+ case ets:lookup(couch_server:couch_dbs(DbName), DbName) of
[#entry{pid = Pid}] ->
{ok, Pid};
[] ->
diff --git a/src/couch/test/eunit/couchdb_db_tests.erl b/src/couch/test/eunit/couchdb_db_tests.erl
index 734bafb9f..338f2cd3c 100644
--- a/src/couch/test/eunit/couchdb_db_tests.erl
+++ b/src/couch/test/eunit/couchdb_db_tests.erl
@@ -65,12 +65,12 @@ should_close_deleted_db(DbName) ->
throw(timeout_error)
end,
test_util:wait(fun() ->
- case ets:lookup(couch_dbs, DbName) of
+ case ets:lookup(couch_server:couch_dbs(DbName), DbName) of
[] -> ok;
_ -> wait
end
end),
- ?assertEqual([], ets:lookup(couch_dbs, DbName))
+ ?assertEqual([], ets:lookup(couch_server:couch_dbs(DbName), DbName))
end).
diff --git a/src/couch_pse_tests/src/cpse_util.erl b/src/couch_pse_tests/src/cpse_util.erl
index 24f49e88c..55622e925 100644
--- a/src/couch_pse_tests/src/cpse_util.erl
+++ b/src/couch_pse_tests/src/cpse_util.erl
@@ -116,7 +116,8 @@ shutdown_db(Db) ->
erlang:error(database_shutdown_timeout)
end,
test_util:wait(fun() ->
- case ets:member(couch_dbs, couch_db:name(Db)) of
+ case ets:member(couch_server:couch_dbs(couch_db:name(Db)),
+ couch_db:name(Db)) of
true -> wait;
false -> ok
end