diff options
author | Robert Newson <rnewson@apache.org> | 2021-02-09 15:42:50 +0000 |
---|---|---|
committer | Robert Newson <rnewson@apache.org> | 2021-02-12 13:12:53 +0000 |
commit | c4b254fcb9addf95c0fa42cdb810c285e4572210 (patch) | |
tree | 23c0159c3cfcc87a26ad5870eab64d2c54020265 | |
parent | 5819f71d23e46917f6ec8e41a2bf70d34b4cc757 (diff) | |
download | couchdb-couch_server_sharding.tar.gz |
Shard couch_server for performancecouch_server_sharding
-rw-r--r-- | src/couch/src/couch_lru.erl | 13 | ||||
-rw-r--r-- | src/couch/src/couch_primary_sup.erl | 24 | ||||
-rw-r--r-- | src/couch/src/couch_server.erl | 227 | ||||
-rw-r--r-- | src/couch/test/eunit/couch_bt_engine_compactor_ev_tests.erl | 5 | ||||
-rw-r--r-- | src/couch/test/eunit/couch_db_split_tests.erl | 2 | ||||
-rw-r--r-- | src/couch/test/eunit/couch_db_tests.erl | 5 | ||||
-rw-r--r-- | src/couch/test/eunit/couch_server_tests.erl | 6 | ||||
-rw-r--r-- | src/couch/test/eunit/couchdb_db_tests.erl | 4 | ||||
-rw-r--r-- | src/couch_pse_tests/src/cpse_util.erl | 3 |
9 files changed, 181 insertions, 108 deletions
diff --git a/src/couch/src/couch_lru.erl b/src/couch/src/couch_lru.erl index 6ad7c65cd..618a0144f 100644 --- a/src/couch/src/couch_lru.erl +++ b/src/couch/src/couch_lru.erl @@ -43,17 +43,20 @@ close({Tree, _} = Cache) -> close_int(none, _) -> false; close_int({Lru, DbName, Iter}, {Tree, Dict} = Cache) -> - case ets:update_element(couch_dbs, DbName, {#entry.lock, locked}) of + CouchDbs = couch_server:couch_dbs(DbName), + CouchDbsPidToName = couch_server:couch_dbs_pid_to_name(DbName), + + case ets:update_element(CouchDbs, DbName, {#entry.lock, locked}) of true -> - [#entry{db = Db, pid = Pid}] = ets:lookup(couch_dbs, DbName), + [#entry{db = Db, pid = Pid}] = ets:lookup(CouchDbs, DbName), case couch_db:is_idle(Db) of true -> - true = ets:delete(couch_dbs, DbName), - true = ets:delete(couch_dbs_pid_to_name, Pid), + true = ets:delete(CouchDbs, DbName), + true = ets:delete(CouchDbsPidToName, Pid), exit(Pid, kill), {true, {gb_trees:delete(Lru, Tree), dict:erase(DbName, Dict)}}; false -> ElemSpec = {#entry.lock, unlocked}, - true = ets:update_element(couch_dbs, DbName, ElemSpec), + true = ets:update_element(CouchDbs, DbName, ElemSpec), couch_stats:increment_counter([couchdb, couch_server, lru_skip]), close_int(gb_trees:next(Iter), update(DbName, Cache)) end; diff --git a/src/couch/src/couch_primary_sup.erl b/src/couch/src/couch_primary_sup.erl index dc2d9e51a..48c6d42ab 100644 --- a/src/couch/src/couch_primary_sup.erl +++ b/src/couch/src/couch_primary_sup.erl @@ -30,13 +30,21 @@ init([]) -> permanent, brutal_kill, worker, - [couch_task_status]}, - {couch_server, - {couch_server, sup_start_link, []}, - permanent, - brutal_kill, - worker, - [couch_server]} - ], + [couch_task_status]} + ] ++ couch_servers(), {ok, {{one_for_one, 10, 3600}, Children}}. + +couch_servers() -> + N = couch_server:num_servers(), + [couch_server(I) || I <- lists:seq(1, N)]. + +couch_server(N) -> + Name = couch_server:couch_server(N), + {Name, + {couch_server, sup_start_link, [N]}, + permanent, + brutal_kill, + worker, + [couch_server] + }. diff --git a/src/couch/src/couch_server.erl b/src/couch/src/couch_server.erl index f57bf3b81..22797f80b 100644 --- a/src/couch/src/couch_server.erl +++ b/src/couch/src/couch_server.erl @@ -17,10 +17,9 @@ -export([open/2,create/2,delete/2,get_version/0,get_version/1,get_git_sha/0,get_uuid/0]). -export([all_databases/0, all_databases/2]). --export([init/1, handle_call/3,sup_start_link/0]). +-export([init/1, handle_call/3,sup_start_link/1]). -export([handle_cast/2,code_change/3,handle_info/2,terminate/2]). -export([dev_start/0,is_admin/2,has_admins/0,get_stats/0]). --export([close_lru/0]). -export([close_db_if_idle/1]). -export([delete_compaction_files/1]). -export([exists/1]). @@ -28,6 +27,7 @@ -export([get_engine_path/2]). -export([lock/2, unlock/1]). -export([db_updated/1]). +-export([num_servers/0, couch_server/1, couch_dbs_pid_to_name/1, couch_dbs/1]). % config_listener api -export([handle_config_change/5, handle_config_terminate/3]). @@ -45,7 +45,10 @@ dbs_open=0, start_time="", update_lru_on_read=true, - lru = couch_lru:new() + lru = couch_lru:new(), + couch_dbs, + couch_dbs_pid_to_name, + couch_dbs_locks }). dev_start() -> @@ -72,12 +75,16 @@ get_uuid() -> end. get_stats() -> - {ok, #server{start_time=Time,dbs_open=Open}} = - gen_server:call(couch_server, get_server), + Fun = fun(N, {TimeAcc, OpenAcc}) -> + {ok, #server{start_time=Time,dbs_open=Open}} = + gen_server:call(couch_server(N), get_server), + {max(Time, TimeAcc), Open + OpenAcc} end, + {Time, Open} = + lists:foldl(Fun, {0, 0}, lists:seq(1, num_servers())), [{start_time, ?l2b(Time)}, {dbs_open, Open}]. -sup_start_link() -> - gen_server:start_link({local, couch_server}, couch_server, [], []). +sup_start_link(N) -> + gen_server:start_link({local, couch_server(N)}, couch_server, [N], []). open(DbName, Options) -> try @@ -89,7 +96,7 @@ open(DbName, Options) -> open_int(DbName, Options0) -> Ctx = couch_util:get_value(user_ctx, Options0, #user_ctx{}), - case ets:lookup(couch_dbs, DbName) of + case ets:lookup(couch_dbs(DbName), DbName) of [#entry{db = Db0, lock = Lock} = Entry] when Lock =/= locked -> update_lru(DbName, Entry#entry.db_options), {ok, Db1} = couch_db:incref(Db0), @@ -98,7 +105,7 @@ open_int(DbName, Options0) -> Options = maybe_add_sys_db_callbacks(DbName, Options0), Timeout = couch_util:get_value(timeout, Options, infinity), Create = couch_util:get_value(create_if_missing, Options, false), - case gen_server:call(couch_server, {open, DbName, Options}, Timeout) of + case gen_server:call(couch_server(DbName), {open, DbName, Options}, Timeout) of {ok, Db0} -> {ok, Db1} = couch_db:incref(Db0), couch_db:set_user_ctx(Db1, Ctx); @@ -114,15 +121,13 @@ update_lru(DbName, Options) -> case config:get_boolean("couchdb", "update_lru_on_read", false) of true -> case lists:member(sys_db, Options) of - false -> gen_server:cast(couch_server, {update_lru, DbName}); + false -> gen_server:cast(couch_server(DbName), {update_lru, DbName}); true -> ok end; false -> ok end. -close_lru() -> - gen_server:call(couch_server, close_lru). create(DbName, Options) -> try @@ -135,7 +140,7 @@ create(DbName, Options) -> create_int(DbName, Options0) -> Options = maybe_add_sys_db_callbacks(DbName, Options0), couch_partition:validate_dbname(DbName, Options), - case gen_server:call(couch_server, {create, DbName, Options}, infinity) of + case gen_server:call(couch_server(DbName), {create, DbName, Options}, infinity) of {ok, Db0} -> Ctx = couch_util:get_value(user_ctx, Options, #user_ctx{}), {ok, Db1} = couch_db:incref(Db0), @@ -145,7 +150,7 @@ create_int(DbName, Options0) -> end. delete(DbName, Options) -> - gen_server:call(couch_server, {delete, DbName, Options}, infinity). + gen_server:call(couch_server(DbName), {delete, DbName, Options}, infinity). exists(DbName) -> @@ -226,15 +231,15 @@ hash_admin_passwords(Persist) -> end, couch_passwords:get_unhashed_admins()). close_db_if_idle(DbName) -> - case ets:lookup(couch_dbs, DbName) of + case ets:lookup(couch_dbs(DbName), DbName) of [#entry{}] -> - gen_server:cast(couch_server, {close_db_if_idle, DbName}); + gen_server:cast(couch_server(DbName), {close_db_if_idle, DbName}); [] -> ok end. -init([]) -> +init([N]) -> couch_util:set_mqd_off_heap(?MODULE), couch_util:set_process_priority(?MODULE, high), @@ -268,18 +273,18 @@ init([]) -> config:get("couchdb", "max_dbs_open", integer_to_list(?MAX_DBS_OPEN))), UpdateLruOnRead = config:get("couchdb", "update_lru_on_read", "false") =:= "true", - ok = config:listen_for_changes(?MODULE, nil), + ok = config:listen_for_changes(?MODULE, N), ok = couch_file:init_delete_dir(RootDir), hash_admin_passwords(), - ets:new(couch_dbs, [ + ets:new(couch_dbs(N), [ set, protected, named_table, {keypos, #entry.name}, {read_concurrency, true} ]), - ets:new(couch_dbs_pid_to_name, [set, protected, named_table]), - ets:new(couch_dbs_locks, [ + ets:new(couch_dbs_pid_to_name(N), [set, protected, named_table]), + ets:new(couch_dbs_locks(N), [ set, public, named_table, @@ -290,7 +295,10 @@ init([]) -> engines = Engines, max_dbs_open=MaxDbsOpen, update_lru_on_read=UpdateLruOnRead, - start_time=couch_util:rfc1123_date()}}. + start_time=couch_util:rfc1123_date(), + couch_dbs=couch_dbs(N), + couch_dbs_pid_to_name=couch_dbs_pid_to_name(N), + couch_dbs_locks=couch_dbs_locks(N)}}. terminate(Reason, Srv) -> couch_log:error("couch_server terminating with ~p, state ~2048p", @@ -302,40 +310,50 @@ terminate(Reason, Srv) -> if Db == undefined -> ok; true -> couch_util:shutdown_sync(couch_db:get_pid(Db)) end - end, nil, couch_dbs), + end, nil, couch_dbs(Srv)), ok. handle_config_change("couchdb", "database_dir", _, _, _) -> exit(whereis(couch_server), config_change), remove_handler; -handle_config_change("couchdb", "update_lru_on_read", "true", _, _) -> - {ok, gen_server:call(couch_server,{set_update_lru_on_read,true})}; -handle_config_change("couchdb", "update_lru_on_read", _, _, _) -> - {ok, gen_server:call(couch_server,{set_update_lru_on_read,false})}; -handle_config_change("couchdb", "max_dbs_open", Max, _, _) when is_list(Max) -> - {ok, gen_server:call(couch_server,{set_max_dbs_open,list_to_integer(Max)})}; -handle_config_change("couchdb", "max_dbs_open", _, _, _) -> - {ok, gen_server:call(couch_server,{set_max_dbs_open,?MAX_DBS_OPEN})}; -handle_config_change("couchdb_engines", _, _, _, _) -> - {ok, gen_server:call(couch_server, reload_engines)}; -handle_config_change("admins", _, _, Persist, _) -> +handle_config_change("couchdb", "update_lru_on_read", "true", _, N) -> + gen_server:call(couch_server(N),{set_update_lru_on_read,true}), + {ok, N}; +handle_config_change("couchdb", "update_lru_on_read", _, _, N) -> + gen_server:call(couch_server(N),{set_update_lru_on_read,false}), + {ok, N}; +handle_config_change("couchdb", "max_dbs_open", Max, _, N) when is_list(Max) -> + gen_server:call(couch_server(N),{set_max_dbs_open,list_to_integer(Max)}), + {ok, N}; +handle_config_change("couchdb", "max_dbs_open", _, _, N) -> + gen_server:call(couch_server(N),{set_max_dbs_open,?MAX_DBS_OPEN}), + {ok, N}; +handle_config_change("couchdb_engines", _, _, _, N) -> + gen_server:call(couch_server(N), reload_engines), + {ok, N}; +handle_config_change("admins", _, _, Persist, N) -> % spawn here so couch event manager doesn't deadlock - {ok, spawn(fun() -> hash_admin_passwords(Persist) end)}; -handle_config_change("httpd", "authentication_handlers", _, _, _) -> - {ok, couch_httpd:stop()}; -handle_config_change("httpd", "bind_address", _, _, _) -> - {ok, couch_httpd:stop()}; -handle_config_change("httpd", "port", _, _, _) -> - {ok, couch_httpd:stop()}; -handle_config_change("httpd", "max_connections", _, _, _) -> - {ok, couch_httpd:stop()}; -handle_config_change(_, _, _, _, _) -> - {ok, nil}. + spawn(fun() -> hash_admin_passwords(Persist) end), + {ok, N}; +handle_config_change("httpd", "authentication_handlers", _, _, N) -> + couch_httpd:stop(), + {ok, N}; +handle_config_change("httpd", "bind_address", _, _, N) -> + couch_httpd:stop(), + {ok, N}; +handle_config_change("httpd", "port", _, _, N) -> + couch_httpd:stop(), + {ok, N}; +handle_config_change("httpd", "max_connections", _, _, N) -> + couch_httpd:stop(), + {ok, N}; +handle_config_change(_, _, _, _, N) -> + {ok, N}. handle_config_terminate(_, stop, _) -> ok; -handle_config_terminate(_Server, _Reason, _State) -> - erlang:send_after(?RELISTEN_DELAY, whereis(?MODULE), restart_config_listener). +handle_config_terminate(_Server, _Reason, N) -> + erlang:send_after(?RELISTEN_DELAY, whereis(?MODULE), {restart_config_listener, N}). all_databases() -> @@ -344,7 +362,7 @@ all_databases() -> {ok, lists:usort(DbList)}. all_databases(Fun, Acc0) -> - {ok, #server{root_dir=Root}} = gen_server:call(couch_server, get_server), + {ok, #server{root_dir=Root}} = gen_server:call(couch_server_1, get_server), NormRoot = couch_util:normpath(Root), Extensions = get_engine_extensions(), ExtRegExp = "(" ++ string:join(Extensions, "|") ++ ")", @@ -425,7 +443,7 @@ open_async(Server, From, DbName, Options) -> true -> create; false -> open end, - true = ets:insert(couch_dbs, #entry{ + true = ets:insert(couch_dbs(Server), #entry{ name = DbName, pid = Opener, lock = locked, @@ -433,7 +451,7 @@ open_async(Server, From, DbName, Options) -> req_type = ReqType, db_options = Options }), - true = ets:insert(couch_dbs_pid_to_name, {Opener, DbName}), + true = ets:insert(couch_dbs_pid_to_name(Server), {Opener, DbName}), db_opened(Server, Options). open_async_int(Server, DbName, Options) -> @@ -468,9 +486,9 @@ handle_call(reload_engines, _From, Server) -> handle_call(get_server, _From, Server) -> {reply, {ok, Server}, Server}; handle_call({open_result, DbName, {ok, Db}}, {Opener, _}, Server) -> - true = ets:delete(couch_dbs_pid_to_name, Opener), + true = ets:delete(couch_dbs_pid_to_name(Server), Opener), DbPid = couch_db:get_pid(Db), - case ets:lookup(couch_dbs, DbName) of + case ets:lookup(couch_dbs(Server), DbName) of [] -> % db was deleted during async open exit(DbPid, kill), @@ -485,7 +503,7 @@ handle_call({open_result, DbName, {ok, Db}}, {Opener, _}, Server) -> _ -> ok end, - true = ets:insert(couch_dbs, #entry{ + true = ets:insert(couch_dbs(Server), #entry{ name = DbName, db = Db, pid = DbPid, @@ -493,7 +511,7 @@ handle_call({open_result, DbName, {ok, Db}}, {Opener, _}, Server) -> db_options = Entry#entry.db_options, start_time = couch_db:get_instance_start_time(Db) }), - true = ets:insert(couch_dbs_pid_to_name, {DbPid, DbName}), + true = ets:insert(couch_dbs_pid_to_name(Server), {DbPid, DbName}), Lru = case couch_db:is_system_db(Db) of false -> couch_lru:insert(DbName, Server#server.lru); @@ -511,14 +529,14 @@ handle_call({open_result, DbName, {ok, Db}}, {Opener, _}, Server) -> handle_call({open_result, DbName, {error, eexist}}, From, Server) -> handle_call({open_result, DbName, file_exists}, From, Server); handle_call({open_result, DbName, Error}, {Opener, _}, Server) -> - case ets:lookup(couch_dbs, DbName) of + case ets:lookup(couch_dbs(Server), DbName) of [] -> % db was deleted during async open {reply, ok, Server}; [#entry{pid = Opener, req_type = ReqType, waiters = Waiters} = Entry] -> [gen_server:reply(Waiter, Error) || Waiter <- Waiters], - true = ets:delete(couch_dbs, DbName), - true = ets:delete(couch_dbs_pid_to_name, Opener), + true = ets:delete(couch_dbs(Server), DbName), + true = ets:delete(couch_dbs_pid_to_name(Server), Opener), NewServer = case ReqType of {create, DbName, Options, CrFrom} -> open_async(Server, CrFrom, DbName, Options); @@ -532,7 +550,7 @@ handle_call({open_result, DbName, Error}, {Opener, _}, Server) -> {reply, ok, Server} end; handle_call({open, DbName, Options}, From, Server) -> - case ets:lookup(couch_dbs, DbName) of + case ets:lookup(couch_dbs(Server), DbName) of [] -> case make_room(Server, Options) of {ok, Server2} -> @@ -541,7 +559,7 @@ handle_call({open, DbName, Options}, From, Server) -> {reply, CloseError, Server} end; [#entry{waiters = Waiters} = Entry] when is_list(Waiters) -> - true = ets:insert(couch_dbs, Entry#entry{waiters = [From | Waiters]}), + true = ets:insert(couch_dbs(Server), Entry#entry{waiters = [From | Waiters]}), NumWaiters = length(Waiters), if NumWaiters =< 10 orelse NumWaiters rem 10 /= 0 -> ok; true -> Fmt = "~b clients waiting to open db ~s", @@ -552,7 +570,7 @@ handle_call({open, DbName, Options}, From, Server) -> {reply, {ok, Db}, Server} end; handle_call({create, DbName, Options}, From, Server) -> - case ets:lookup(couch_dbs, DbName) of + case ets:lookup(couch_dbs(Server), DbName) of [] -> case make_room(Server, Options) of {ok, Server2} -> @@ -567,7 +585,7 @@ handle_call({create, DbName, Options}, From, Server) -> % to wait while we figure out if it'll succeed. CrOptions = [create | Options], Req = {create, DbName, CrOptions, From}, - true = ets:insert(couch_dbs, Entry#entry{req_type = Req}), + true = ets:insert(couch_dbs(Server), Entry#entry{req_type = Req}), {noreply, Server}; [_AlreadyRunningDb] -> {reply, file_exists, Server} @@ -577,17 +595,17 @@ handle_call({delete, DbName, Options}, _From, Server) -> case check_dbname(DbNameList) of ok -> Server2 = - case ets:lookup(couch_dbs, DbName) of + case ets:lookup(couch_dbs(Server), DbName) of [] -> Server; [#entry{pid = Pid, waiters = Waiters} = Entry] when is_list(Waiters) -> - true = ets:delete(couch_dbs, DbName), - true = ets:delete(couch_dbs_pid_to_name, Pid), + true = ets:delete(couch_dbs(Server), DbName), + true = ets:delete(couch_dbs_pid_to_name(Server), Pid), exit(Pid, kill), [gen_server:reply(Waiter, not_found) || Waiter <- Waiters], db_closed(Server, Entry#entry.db_options); [#entry{pid = Pid} = Entry] -> - true = ets:delete(couch_dbs, DbName), - true = ets:delete(couch_dbs_pid_to_name, Pid), + true = ets:delete(couch_dbs(Server), DbName), + true = ets:delete(couch_dbs_pid_to_name(Server), Pid), exit(Pid, kill), db_closed(Server, Entry#entry.db_options) end, @@ -616,9 +634,9 @@ handle_call({delete, DbName, Options}, _From, Server) -> handle_call({db_updated, Db}, _From, Server0) -> DbName = couch_db:name(Db), StartTime = couch_db:get_instance_start_time(Db), - Server = try ets:lookup_element(couch_dbs, DbName, #entry.start_time) of + Server = try ets:lookup_element(couch_dbs(Server0), DbName, #entry.start_time) of StartTime -> - true = ets:update_element(couch_dbs, DbName, {#entry.db, Db}), + true = ets:update_element(couch_dbs(Server0), DbName, {#entry.db, Db}), Lru = case couch_db:is_system_db(Db) of false -> couch_lru:update(DbName, Server0#server.lru); true -> Server0#server.lru @@ -636,19 +654,19 @@ handle_cast({update_lru, DbName}, #server{lru = Lru, update_lru_on_read=true} = handle_cast({update_lru, _DbName}, Server) -> {noreply, Server}; handle_cast({close_db_if_idle, DbName}, Server) -> - case ets:update_element(couch_dbs, DbName, {#entry.lock, locked}) of + case ets:update_element(couch_dbs(Server), DbName, {#entry.lock, locked}) of true -> - [#entry{db = Db, db_options = DbOpts}] = ets:lookup(couch_dbs, DbName), + [#entry{db = Db, db_options = DbOpts}] = ets:lookup(couch_dbs(Server), DbName), case couch_db:is_idle(Db) of true -> DbPid = couch_db:get_pid(Db), - true = ets:delete(couch_dbs, DbName), - true = ets:delete(couch_dbs_pid_to_name, DbPid), + true = ets:delete(couch_dbs(Server), DbName), + true = ets:delete(couch_dbs_pid_to_name(Server), DbPid), exit(DbPid, kill), {noreply, db_closed(Server, DbOpts)}; false -> true = ets:update_element( - couch_dbs, DbName, {#entry.lock, unlocked}), + couch_dbs(Server), DbName, {#entry.lock, unlocked}), {noreply, Server} end; false -> @@ -664,9 +682,9 @@ code_change(_OldVsn, #server{}=State, _Extra) -> handle_info({'EXIT', _Pid, config_change}, Server) -> {stop, config_change, Server}; handle_info({'EXIT', Pid, Reason}, Server) -> - case ets:lookup(couch_dbs_pid_to_name, Pid) of + case ets:lookup(couch_dbs_pid_to_name(Server), Pid) of [{Pid, DbName}] -> - [#entry{waiters = Waiters} = Entry] = ets:lookup(couch_dbs, DbName), + [#entry{waiters = Waiters} = Entry] = ets:lookup(couch_dbs(Server), DbName), if Reason /= snappy_nif_not_loaded -> ok; true -> Msg = io_lib:format("To open the database `~s`, Apache CouchDB " "must be built with Erlang OTP R13B04 or higher.", [DbName]), @@ -681,14 +699,14 @@ handle_info({'EXIT', Pid, Reason}, Server) -> if not is_list(Waiters) -> ok; true -> [gen_server:reply(Waiter, Reason) || Waiter <- Waiters] end, - true = ets:delete(couch_dbs, DbName), - true = ets:delete(couch_dbs_pid_to_name, Pid), + true = ets:delete(couch_dbs(Server), DbName), + true = ets:delete(couch_dbs_pid_to_name(Server), Pid), {noreply, db_closed(Server, Entry#entry.db_options)}; [] -> {noreply, Server} end; -handle_info(restart_config_listener, State) -> - ok = config:listen_for_changes(?MODULE, nil), +handle_info({restart_config_listener, N}, State) -> + ok = config:listen_for_changes(?MODULE, N), {noreply, State}; handle_info(Info, Server) -> {stop, {unknown_message, Info}, Server}. @@ -720,7 +738,7 @@ validate_open_or_create(DbName, Options) -> throw({?MODULE, EngineError}) end, - case ets:lookup(couch_dbs_locks, DbName) of + case ets:lookup(couch_dbs_locks(DbName), DbName) of [] -> ok; [{DbName, Reason}] -> @@ -861,21 +879,62 @@ get_engine_path(DbName, Engine) when is_binary(DbName), is_atom(Engine) -> end. lock(DbName, Reason) when is_binary(DbName), is_binary(Reason) -> - case ets:lookup(couch_dbs, DbName) of + case ets:lookup(couch_dbs(DbName), DbName) of [] -> - true = ets:insert(couch_dbs_locks, {DbName, Reason}), + true = ets:insert(couch_dbs_locks(DbName), {DbName, Reason}), ok; [#entry{}] -> {error, already_opened} end. unlock(DbName) when is_binary(DbName) -> - true = ets:delete(couch_dbs_locks, DbName), + true = ets:delete(couch_dbs_locks(DbName), DbName), ok. db_updated(Db) -> - gen_server:call(couch_server, {db_updated, Db}, infinity). + DbName = couch_db:name(Db), + gen_server:call(couch_server(DbName), {db_updated, Db}, infinity). + + +couch_server(Arg) -> + name("couch_server", Arg). + + +couch_dbs(Arg) -> + name("couch_dbs", Arg). + + +couch_dbs_pid_to_name(Arg) -> + name("couch_dbs_pid_to_name", Arg). + + +couch_dbs_locks(Arg) -> + name("couch_dbs_locks", Arg). + + +name("couch_dbs", #server{} = Server) -> + Server#server.couch_dbs; + +name("couch_dbs_pid_to_name", #server{} = Server) -> + Server#server.couch_dbs_pid_to_name; + +name("couch_dbs_locks", #server{} = Server) -> + Server#server.couch_dbs_locks; + +name(BaseName, DbName) when is_list(DbName) -> + name(BaseName, ?l2b(DbName)); + +name(BaseName, DbName) when is_binary(DbName) -> + N = 1 + erlang:phash2(DbName, num_servers()), + name(BaseName, N); + +name(BaseName, N) when is_integer(N), N > 0 -> + list_to_atom(BaseName ++ "_" ++ integer_to_list(N)). + + +num_servers() -> + erlang:system_info(schedulers). -ifdef(TEST). diff --git a/src/couch/test/eunit/couch_bt_engine_compactor_ev_tests.erl b/src/couch/test/eunit/couch_bt_engine_compactor_ev_tests.erl index 188078c2d..090217b4c 100644 --- a/src/couch/test/eunit/couch_bt_engine_compactor_ev_tests.erl +++ b/src/couch/test/eunit/couch_bt_engine_compactor_ev_tests.erl @@ -283,7 +283,8 @@ wait_db_cleared(Db, N) when N < 0 -> erlang:error({db_clear_timeout, couch_db:name(Db)}); wait_db_cleared(Db, N) -> - case ets:lookup(couch_dbs, couch_db:name(Db)) of + Tab = couch_server:couch_dbs(couch_db:name(Db)), + case ets:lookup(Tab, couch_db:name(Db)) of [] -> ok; [#entry{db = NewDb}] -> @@ -332,4 +333,4 @@ purge_module() -> _ -> code:delete(couch_db_updater), code:purge(couch_db_updater) - end.
\ No newline at end of file + end. diff --git a/src/couch/test/eunit/couch_db_split_tests.erl b/src/couch/test/eunit/couch_db_split_tests.erl index 6e24c36ee..b52184a8c 100644 --- a/src/couch/test/eunit/couch_db_split_tests.erl +++ b/src/couch/test/eunit/couch_db_split_tests.erl @@ -113,7 +113,7 @@ should_fail_on_existing_target(DbName) -> ok = couch_db:close(Db), exit(Pid, kill), test_util:wait(fun() -> - case ets:lookup(couch_dbs, TName) of + case ets:lookup(couch_server:couch_dbs(DbName), TName) of [] -> ok; [_ | _] -> wait end diff --git a/src/couch/test/eunit/couch_db_tests.erl b/src/couch/test/eunit/couch_db_tests.erl index dd2cb427d..d52a15597 100644 --- a/src/couch/test/eunit/couch_db_tests.erl +++ b/src/couch/test/eunit/couch_db_tests.erl @@ -109,7 +109,8 @@ should_delete_db(DbName) -> should_create_multiple_dbs(DbNames) -> ?_test(begin - gen_server:call(couch_server, {set_max_dbs_open, 3}), + [gen_server:call(couch_server:couch_server(N), {set_max_dbs_open, 3}) || + N <- lists:seq(1, couch_server:num_servers())], {ok, Before} = couch_server:all_databases(), [?assertNot(lists:member(DbName, Before)) || DbName <- DbNames], [?assert(create_db(DbName)) || DbName <- DbNames], @@ -170,7 +171,7 @@ locking_should_work(DbName) -> ok = couch_db:close(Db), catch exit(couch_db:get_pid(Db), kill), test_util:wait(fun() -> - case ets:lookup(couch_dbs, DbName) of + case ets:lookup(couch_server:couch_dbs(DbName), DbName) of [] -> ok; [_ | _] -> wait end diff --git a/src/couch/test/eunit/couch_server_tests.erl b/src/couch/test/eunit/couch_server_tests.erl index 7d50700d2..66533d48c 100644 --- a/src/couch/test/eunit/couch_server_tests.erl +++ b/src/couch/test/eunit/couch_server_tests.erl @@ -169,7 +169,7 @@ start_interleaved() -> DbPid = couch_db:get_pid(Db), unlink(DbPid), Msg = {'EXIT', DbPid, killed}, - erlang:send_after(2000, whereis(couch_server), Msg); + erlang:send_after(2000, whereis(couch_server:couch_server(DbName)), Msg); _ -> ok end, @@ -202,7 +202,7 @@ t_interleaved_create_delete_open(DbName) -> % Get the current couch_server pid so we're sure % to not end up messaging two different pids - CouchServer = whereis(couch_server), + CouchServer = whereis(couch_server:couch_server(DbName)), % Start our first instance that will succeed in % an invalid state. Notice that the opener pid @@ -250,7 +250,7 @@ t_interleaved_create_delete_open(DbName) -> get_opener_pid(DbName) -> WaitFun = fun() -> - case ets:lookup(couch_dbs, DbName) of + case ets:lookup(couch_server:couch_dbs(DbName), DbName) of [#entry{pid = Pid}] -> {ok, Pid}; [] -> diff --git a/src/couch/test/eunit/couchdb_db_tests.erl b/src/couch/test/eunit/couchdb_db_tests.erl index 734bafb9f..338f2cd3c 100644 --- a/src/couch/test/eunit/couchdb_db_tests.erl +++ b/src/couch/test/eunit/couchdb_db_tests.erl @@ -65,12 +65,12 @@ should_close_deleted_db(DbName) -> throw(timeout_error) end, test_util:wait(fun() -> - case ets:lookup(couch_dbs, DbName) of + case ets:lookup(couch_server:couch_dbs(DbName), DbName) of [] -> ok; _ -> wait end end), - ?assertEqual([], ets:lookup(couch_dbs, DbName)) + ?assertEqual([], ets:lookup(couch_server:couch_dbs(DbName), DbName)) end). diff --git a/src/couch_pse_tests/src/cpse_util.erl b/src/couch_pse_tests/src/cpse_util.erl index 24f49e88c..55622e925 100644 --- a/src/couch_pse_tests/src/cpse_util.erl +++ b/src/couch_pse_tests/src/cpse_util.erl @@ -116,7 +116,8 @@ shutdown_db(Db) -> erlang:error(database_shutdown_timeout) end, test_util:wait(fun() -> - case ets:member(couch_dbs, couch_db:name(Db)) of + case ets:member(couch_server:couch_dbs(couch_db:name(Db)), + couch_db:name(Db)) of true -> wait; false -> ok end |