diff options
author | Robert Newson <rnewson@apache.org> | 2021-02-12 15:40:18 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-02-12 15:40:18 +0000 |
commit | be2898dd5338e2e83c6058f6974cc6177c0bccae (patch) | |
tree | 102aaa9dba8e03c4195017c340ec965e1127be45 | |
parent | e089b029a754763ccec1ac889ce912627cec480a (diff) | |
parent | ef1c9029ba6699774e0ea292adc5f1778a2c02a1 (diff) | |
download | couchdb-be2898dd5338e2e83c6058f6974cc6177c0bccae.tar.gz |
Merge pull request #3366 from apache/couch_server_sharding
Couch server sharding
-rw-r--r-- | rel/overlay/etc/default.ini | 6 | ||||
-rw-r--r-- | src/couch/src/couch_db_engine.erl | 2 | ||||
-rw-r--r-- | src/couch/src/couch_db_updater.erl | 16 | ||||
-rw-r--r-- | src/couch/src/couch_lru.erl | 13 | ||||
-rw-r--r-- | src/couch/src/couch_primary_sup.erl | 24 | ||||
-rw-r--r-- | src/couch/src/couch_server.erl | 232 | ||||
-rw-r--r-- | src/couch/test/eunit/couch_bt_engine_compactor_ev_tests.erl | 5 | ||||
-rw-r--r-- | src/couch/test/eunit/couch_db_split_tests.erl | 2 | ||||
-rw-r--r-- | src/couch/test/eunit/couch_db_tests.erl | 5 | ||||
-rw-r--r-- | src/couch/test/eunit/couch_server_tests.erl | 6 | ||||
-rw-r--r-- | src/couch/test/eunit/couchdb_db_tests.erl | 4 | ||||
-rw-r--r-- | src/couch_pse_tests/src/cpse_util.erl | 3 |
12 files changed, 201 insertions, 117 deletions
diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini index 2cacf7775..973b1a1fe 100644 --- a/rel/overlay/etc/default.ini +++ b/rel/overlay/etc/default.ini @@ -9,7 +9,13 @@ view_index_dir = {{view_index_dir}} ; util_driver_dir = ; plugin_dir = os_process_timeout = 5000 ; 5 seconds. for view servers. + +; Maximum number of .couch files to open at once. +; The actual limit may be slightly lower depending on how +; many schedulers you have as the allowance is divided evenly +; among them. max_dbs_open = 500 + ; Method used to compress everything that is appended to database and view index files, except ; for attachments (see the attachments section). Available methods are: ; diff --git a/src/couch/src/couch_db_engine.erl b/src/couch/src/couch_db_engine.erl index 9adc9929d..918dabcca 100644 --- a/src/couch/src/couch_db_engine.erl +++ b/src/couch/src/couch_db_engine.erl @@ -1073,7 +1073,7 @@ finish_compaction(Db, CompactInfo) -> compactor_pid = CompactorPid } end, - ok = gen_server:call(couch_server, {db_updated, NewDb}, infinity), + ok = couch_server:db_updated(NewDb), {ok, NewDb}. diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl index 1ca804c05..535acfad6 100644 --- a/src/couch/src/couch_db_updater.erl +++ b/src/couch/src/couch_db_updater.erl @@ -79,7 +79,7 @@ handle_call(cancel_compact, _From, #db{compactor_pid = Pid} = Db) -> exit(Pid, kill), couch_server:delete_compaction_files(Db#db.name), Db2 = Db#db{compactor_pid = nil}, - ok = gen_server:call(couch_server, {db_updated, Db2}, infinity), + ok = couch_server:db_updated(Db2), {reply, ok, Db2, idle_limit()}; handle_call({set_security, NewSec}, _From, #db{} = Db) -> @@ -87,18 +87,18 @@ handle_call({set_security, NewSec}, _From, #db{} = Db) -> NewSecDb = commit_data(NewDb#db{ security = NewSec }), - ok = gen_server:call(couch_server, {db_updated, NewSecDb}, infinity), + ok = couch_server:db_updated(NewSecDb), {reply, ok, NewSecDb, idle_limit()}; handle_call({set_revs_limit, Limit}, _From, Db) -> {ok, Db2} = couch_db_engine:set_revs_limit(Db, Limit), Db3 = commit_data(Db2), - ok = gen_server:call(couch_server, {db_updated, Db3}, infinity), + ok = couch_server:db_updated(Db3), {reply, ok, Db3, idle_limit()}; handle_call({set_purge_infos_limit, Limit}, _From, Db) -> {ok, Db2} = couch_db_engine:set_purge_infos_limit(Db, Limit), - ok = gen_server:call(couch_server, {db_updated, Db2}, infinity), + ok = couch_server:db_updated(Db2), {reply, ok, Db2, idle_limit()}; handle_call({purge_docs, [], _}, _From, Db) -> @@ -130,7 +130,7 @@ handle_call(Msg, From, Db) -> handle_cast({load_validation_funs, ValidationFuns}, Db) -> Db2 = Db#db{validate_doc_funs = ValidationFuns}, - ok = gen_server:call(couch_server, {db_updated, Db2}, infinity), + ok = couch_server:db_updated(Db2), {noreply, Db2, idle_limit()}; handle_cast(start_compact, Db) -> case Db#db.compactor_pid of @@ -143,7 +143,7 @@ handle_cast(start_compact, Db) -> Args = [Db#db.name, UpdateSeq], couch_log:info("Starting compaction for db \"~s\" at ~p", Args), {ok, Db2} = couch_db_engine:start_compaction(Db), - ok = gen_server:call(couch_server, {db_updated, Db2}, infinity), + ok = couch_server:db_updated(Db2), {noreply, Db2, idle_limit()}; _ -> % compact currently running, this is a no-op @@ -175,7 +175,7 @@ handle_info({update_docs, Client, GroupedDocs, NonRepDocs, MergeConflicts}, NonRepDocs2 = [{Client, NRDoc} || NRDoc <- NonRepDocs], try update_docs_int(Db, GroupedDocs3, NonRepDocs2, MergeConflicts) of {ok, Db2, UpdatedDDocIds} -> - ok = gen_server:call(couch_server, {db_updated, Db2}, infinity), + ok = couch_server:db_updated(Db2), case {couch_db:get_update_seq(Db), couch_db:get_update_seq(Db2)} of {Seq, Seq} -> ok; _ -> couch_event:notify(Db2#db.name, updated) @@ -780,7 +780,7 @@ purge_docs(Db, PurgeReqs) -> {ok, Db1} = couch_db_engine:purge_docs(Db, Pairs, PInfos), Db2 = commit_data(Db1), - ok = gen_server:call(couch_server, {db_updated, Db2}, infinity), + ok = couch_server:db_updated(Db2), couch_event:notify(Db2#db.name, updated), {ok, Db2, Replies}. diff --git a/src/couch/src/couch_lru.erl b/src/couch/src/couch_lru.erl index 6ad7c65cd..618a0144f 100644 --- a/src/couch/src/couch_lru.erl +++ b/src/couch/src/couch_lru.erl @@ -43,17 +43,20 @@ close({Tree, _} = Cache) -> close_int(none, _) -> false; close_int({Lru, DbName, Iter}, {Tree, Dict} = Cache) -> - case ets:update_element(couch_dbs, DbName, {#entry.lock, locked}) of + CouchDbs = couch_server:couch_dbs(DbName), + CouchDbsPidToName = couch_server:couch_dbs_pid_to_name(DbName), + + case ets:update_element(CouchDbs, DbName, {#entry.lock, locked}) of true -> - [#entry{db = Db, pid = Pid}] = ets:lookup(couch_dbs, DbName), + [#entry{db = Db, pid = Pid}] = ets:lookup(CouchDbs, DbName), case couch_db:is_idle(Db) of true -> - true = ets:delete(couch_dbs, DbName), - true = ets:delete(couch_dbs_pid_to_name, Pid), + true = ets:delete(CouchDbs, DbName), + true = ets:delete(CouchDbsPidToName, Pid), exit(Pid, kill), {true, {gb_trees:delete(Lru, Tree), dict:erase(DbName, Dict)}}; false -> ElemSpec = {#entry.lock, unlocked}, - true = ets:update_element(couch_dbs, DbName, ElemSpec), + true = ets:update_element(CouchDbs, DbName, ElemSpec), couch_stats:increment_counter([couchdb, couch_server, lru_skip]), close_int(gb_trees:next(Iter), update(DbName, Cache)) end; diff --git a/src/couch/src/couch_primary_sup.erl b/src/couch/src/couch_primary_sup.erl index dc2d9e51a..48c6d42ab 100644 --- a/src/couch/src/couch_primary_sup.erl +++ b/src/couch/src/couch_primary_sup.erl @@ -30,13 +30,21 @@ init([]) -> permanent, brutal_kill, worker, - [couch_task_status]}, - {couch_server, - {couch_server, sup_start_link, []}, - permanent, - brutal_kill, - worker, - [couch_server]} - ], + [couch_task_status]} + ] ++ couch_servers(), {ok, {{one_for_one, 10, 3600}, Children}}. + +couch_servers() -> + N = couch_server:num_servers(), + [couch_server(I) || I <- lists:seq(1, N)]. + +couch_server(N) -> + Name = couch_server:couch_server(N), + {Name, + {couch_server, sup_start_link, [N]}, + permanent, + brutal_kill, + worker, + [couch_server] + }. diff --git a/src/couch/src/couch_server.erl b/src/couch/src/couch_server.erl index 6db3f7448..480ed58ed 100644 --- a/src/couch/src/couch_server.erl +++ b/src/couch/src/couch_server.erl @@ -17,16 +17,17 @@ -export([open/2,create/2,delete/2,get_version/0,get_version/1,get_git_sha/0,get_uuid/0]). -export([all_databases/0, all_databases/2]). --export([init/1, handle_call/3,sup_start_link/0]). +-export([init/1, handle_call/3,sup_start_link/1]). -export([handle_cast/2,code_change/3,handle_info/2,terminate/2]). -export([dev_start/0,is_admin/2,has_admins/0,get_stats/0]). --export([close_lru/0]). -export([close_db_if_idle/1]). -export([delete_compaction_files/1]). -export([exists/1]). -export([get_engine_extensions/0]). -export([get_engine_path/2]). -export([lock/2, unlock/1]). +-export([db_updated/1]). +-export([num_servers/0, couch_server/1, couch_dbs_pid_to_name/1, couch_dbs/1]). % config_listener api -export([handle_config_change/5, handle_config_terminate/3]). @@ -44,7 +45,10 @@ dbs_open=0, start_time="", update_lru_on_read=true, - lru = couch_lru:new() + lru = couch_lru:new(), + couch_dbs, + couch_dbs_pid_to_name, + couch_dbs_locks }). dev_start() -> @@ -71,12 +75,16 @@ get_uuid() -> end. get_stats() -> - {ok, #server{start_time=Time,dbs_open=Open}} = - gen_server:call(couch_server, get_server), + Fun = fun(N, {TimeAcc, OpenAcc}) -> + {ok, #server{start_time=Time,dbs_open=Open}} = + gen_server:call(couch_server(N), get_server), + {max(Time, TimeAcc), Open + OpenAcc} end, + {Time, Open} = + lists:foldl(Fun, {0, 0}, lists:seq(1, num_servers())), [{start_time, ?l2b(Time)}, {dbs_open, Open}]. -sup_start_link() -> - gen_server:start_link({local, couch_server}, couch_server, [], []). +sup_start_link(N) -> + gen_server:start_link({local, couch_server(N)}, couch_server, [N], []). open(DbName, Options) -> try @@ -88,7 +96,7 @@ open(DbName, Options) -> open_int(DbName, Options0) -> Ctx = couch_util:get_value(user_ctx, Options0, #user_ctx{}), - case ets:lookup(couch_dbs, DbName) of + case ets:lookup(couch_dbs(DbName), DbName) of [#entry{db = Db0, lock = Lock} = Entry] when Lock =/= locked -> update_lru(DbName, Entry#entry.db_options), {ok, Db1} = couch_db:incref(Db0), @@ -97,7 +105,7 @@ open_int(DbName, Options0) -> Options = maybe_add_sys_db_callbacks(DbName, Options0), Timeout = couch_util:get_value(timeout, Options, infinity), Create = couch_util:get_value(create_if_missing, Options, false), - case gen_server:call(couch_server, {open, DbName, Options}, Timeout) of + case gen_server:call(couch_server(DbName), {open, DbName, Options}, Timeout) of {ok, Db0} -> {ok, Db1} = couch_db:incref(Db0), couch_db:set_user_ctx(Db1, Ctx); @@ -113,15 +121,13 @@ update_lru(DbName, Options) -> case config:get_boolean("couchdb", "update_lru_on_read", false) of true -> case lists:member(sys_db, Options) of - false -> gen_server:cast(couch_server, {update_lru, DbName}); + false -> gen_server:cast(couch_server(DbName), {update_lru, DbName}); true -> ok end; false -> ok end. -close_lru() -> - gen_server:call(couch_server, close_lru). create(DbName, Options) -> try @@ -134,7 +140,7 @@ create(DbName, Options) -> create_int(DbName, Options0) -> Options = maybe_add_sys_db_callbacks(DbName, Options0), couch_partition:validate_dbname(DbName, Options), - case gen_server:call(couch_server, {create, DbName, Options}, infinity) of + case gen_server:call(couch_server(DbName), {create, DbName, Options}, infinity) of {ok, Db0} -> Ctx = couch_util:get_value(user_ctx, Options, #user_ctx{}), {ok, Db1} = couch_db:incref(Db0), @@ -144,7 +150,7 @@ create_int(DbName, Options0) -> end. delete(DbName, Options) -> - gen_server:call(couch_server, {delete, DbName, Options}, infinity). + gen_server:call(couch_server(DbName), {delete, DbName, Options}, infinity). exists(DbName) -> @@ -225,15 +231,15 @@ hash_admin_passwords(Persist) -> end, couch_passwords:get_unhashed_admins()). close_db_if_idle(DbName) -> - case ets:lookup(couch_dbs, DbName) of + case ets:lookup(couch_dbs(DbName), DbName) of [#entry{}] -> - gen_server:cast(couch_server, {close_db_if_idle, DbName}); + gen_server:cast(couch_server(DbName), {close_db_if_idle, DbName}); [] -> ok end. -init([]) -> +init([N]) -> couch_util:set_mqd_off_heap(?MODULE), couch_util:set_process_priority(?MODULE, high), @@ -267,18 +273,18 @@ init([]) -> config:get("couchdb", "max_dbs_open", integer_to_list(?MAX_DBS_OPEN))), UpdateLruOnRead = config:get("couchdb", "update_lru_on_read", "false") =:= "true", - ok = config:listen_for_changes(?MODULE, nil), + ok = config:listen_for_changes(?MODULE, N), ok = couch_file:init_delete_dir(RootDir), hash_admin_passwords(), - ets:new(couch_dbs, [ + ets:new(couch_dbs(N), [ set, protected, named_table, {keypos, #entry.name}, {read_concurrency, true} ]), - ets:new(couch_dbs_pid_to_name, [set, protected, named_table]), - ets:new(couch_dbs_locks, [ + ets:new(couch_dbs_pid_to_name(N), [set, protected, named_table]), + ets:new(couch_dbs_locks(N), [ set, public, named_table, @@ -287,9 +293,12 @@ init([]) -> process_flag(trap_exit, true), {ok, #server{root_dir=RootDir, engines = Engines, - max_dbs_open=MaxDbsOpen, + max_dbs_open=MaxDbsOpen div couch_server:num_servers(), update_lru_on_read=UpdateLruOnRead, - start_time=couch_util:rfc1123_date()}}. + start_time=couch_util:rfc1123_date(), + couch_dbs=couch_dbs(N), + couch_dbs_pid_to_name=couch_dbs_pid_to_name(N), + couch_dbs_locks=couch_dbs_locks(N)}}. terminate(Reason, Srv) -> couch_log:error("couch_server terminating with ~p, state ~2048p", @@ -301,40 +310,50 @@ terminate(Reason, Srv) -> if Db == undefined -> ok; true -> couch_util:shutdown_sync(couch_db:get_pid(Db)) end - end, nil, couch_dbs), + end, nil, couch_dbs(Srv)), ok. handle_config_change("couchdb", "database_dir", _, _, _) -> exit(whereis(couch_server), config_change), remove_handler; -handle_config_change("couchdb", "update_lru_on_read", "true", _, _) -> - {ok, gen_server:call(couch_server,{set_update_lru_on_read,true})}; -handle_config_change("couchdb", "update_lru_on_read", _, _, _) -> - {ok, gen_server:call(couch_server,{set_update_lru_on_read,false})}; -handle_config_change("couchdb", "max_dbs_open", Max, _, _) when is_list(Max) -> - {ok, gen_server:call(couch_server,{set_max_dbs_open,list_to_integer(Max)})}; -handle_config_change("couchdb", "max_dbs_open", _, _, _) -> - {ok, gen_server:call(couch_server,{set_max_dbs_open,?MAX_DBS_OPEN})}; -handle_config_change("couchdb_engines", _, _, _, _) -> - {ok, gen_server:call(couch_server, reload_engines)}; -handle_config_change("admins", _, _, Persist, _) -> +handle_config_change("couchdb", "update_lru_on_read", "true", _, N) -> + gen_server:call(couch_server(N),{set_update_lru_on_read,true}), + {ok, N}; +handle_config_change("couchdb", "update_lru_on_read", _, _, N) -> + gen_server:call(couch_server(N),{set_update_lru_on_read,false}), + {ok, N}; +handle_config_change("couchdb", "max_dbs_open", Max, _, N) when is_list(Max) -> + gen_server:call(couch_server(N),{set_max_dbs_open,list_to_integer(Max)}), + {ok, N}; +handle_config_change("couchdb", "max_dbs_open", _, _, N) -> + gen_server:call(couch_server(N),{set_max_dbs_open,?MAX_DBS_OPEN}), + {ok, N}; +handle_config_change("couchdb_engines", _, _, _, N) -> + gen_server:call(couch_server(N), reload_engines), + {ok, N}; +handle_config_change("admins", _, _, Persist, N) -> % spawn here so couch event manager doesn't deadlock - {ok, spawn(fun() -> hash_admin_passwords(Persist) end)}; -handle_config_change("httpd", "authentication_handlers", _, _, _) -> - {ok, couch_httpd:stop()}; -handle_config_change("httpd", "bind_address", _, _, _) -> - {ok, couch_httpd:stop()}; -handle_config_change("httpd", "port", _, _, _) -> - {ok, couch_httpd:stop()}; -handle_config_change("httpd", "max_connections", _, _, _) -> - {ok, couch_httpd:stop()}; -handle_config_change(_, _, _, _, _) -> - {ok, nil}. + spawn(fun() -> hash_admin_passwords(Persist) end), + {ok, N}; +handle_config_change("httpd", "authentication_handlers", _, _, N) -> + couch_httpd:stop(), + {ok, N}; +handle_config_change("httpd", "bind_address", _, _, N) -> + couch_httpd:stop(), + {ok, N}; +handle_config_change("httpd", "port", _, _, N) -> + couch_httpd:stop(), + {ok, N}; +handle_config_change("httpd", "max_connections", _, _, N) -> + couch_httpd:stop(), + {ok, N}; +handle_config_change(_, _, _, _, N) -> + {ok, N}. handle_config_terminate(_, stop, _) -> ok; -handle_config_terminate(_Server, _Reason, _State) -> - erlang:send_after(?RELISTEN_DELAY, whereis(?MODULE), restart_config_listener). +handle_config_terminate(_Server, _Reason, N) -> + erlang:send_after(?RELISTEN_DELAY, whereis(?MODULE), {restart_config_listener, N}). all_databases() -> @@ -343,7 +362,7 @@ all_databases() -> {ok, lists:usort(DbList)}. all_databases(Fun, Acc0) -> - {ok, #server{root_dir=Root}} = gen_server:call(couch_server, get_server), + {ok, #server{root_dir=Root}} = gen_server:call(couch_server_1, get_server), NormRoot = couch_util:normpath(Root), Extensions = get_engine_extensions(), ExtRegExp = "(" ++ string:join(Extensions, "|") ++ ")", @@ -424,7 +443,7 @@ open_async(Server, From, DbName, Options) -> true -> create; false -> open end, - true = ets:insert(couch_dbs, #entry{ + true = ets:insert(couch_dbs(Server), #entry{ name = DbName, pid = Opener, lock = locked, @@ -432,7 +451,7 @@ open_async(Server, From, DbName, Options) -> req_type = ReqType, db_options = Options }), - true = ets:insert(couch_dbs_pid_to_name, {Opener, DbName}), + true = ets:insert(couch_dbs_pid_to_name(Server), {Opener, DbName}), db_opened(Server, Options). open_async_int(Server, DbName, Options) -> @@ -467,9 +486,9 @@ handle_call(reload_engines, _From, Server) -> handle_call(get_server, _From, Server) -> {reply, {ok, Server}, Server}; handle_call({open_result, DbName, {ok, Db}}, {Opener, _}, Server) -> - true = ets:delete(couch_dbs_pid_to_name, Opener), + true = ets:delete(couch_dbs_pid_to_name(Server), Opener), DbPid = couch_db:get_pid(Db), - case ets:lookup(couch_dbs, DbName) of + case ets:lookup(couch_dbs(Server), DbName) of [] -> % db was deleted during async open exit(DbPid, kill), @@ -484,7 +503,7 @@ handle_call({open_result, DbName, {ok, Db}}, {Opener, _}, Server) -> _ -> ok end, - true = ets:insert(couch_dbs, #entry{ + true = ets:insert(couch_dbs(Server), #entry{ name = DbName, db = Db, pid = DbPid, @@ -492,7 +511,7 @@ handle_call({open_result, DbName, {ok, Db}}, {Opener, _}, Server) -> db_options = Entry#entry.db_options, start_time = couch_db:get_instance_start_time(Db) }), - true = ets:insert(couch_dbs_pid_to_name, {DbPid, DbName}), + true = ets:insert(couch_dbs_pid_to_name(Server), {DbPid, DbName}), Lru = case couch_db:is_system_db(Db) of false -> couch_lru:insert(DbName, Server#server.lru); @@ -510,14 +529,14 @@ handle_call({open_result, DbName, {ok, Db}}, {Opener, _}, Server) -> handle_call({open_result, DbName, {error, eexist}}, From, Server) -> handle_call({open_result, DbName, file_exists}, From, Server); handle_call({open_result, DbName, Error}, {Opener, _}, Server) -> - case ets:lookup(couch_dbs, DbName) of + case ets:lookup(couch_dbs(Server), DbName) of [] -> % db was deleted during async open {reply, ok, Server}; [#entry{pid = Opener, req_type = ReqType, waiters = Waiters} = Entry] -> [gen_server:reply(Waiter, Error) || Waiter <- Waiters], - true = ets:delete(couch_dbs, DbName), - true = ets:delete(couch_dbs_pid_to_name, Opener), + true = ets:delete(couch_dbs(Server), DbName), + true = ets:delete(couch_dbs_pid_to_name(Server), Opener), NewServer = case ReqType of {create, DbName, Options, CrFrom} -> open_async(Server, CrFrom, DbName, Options); @@ -531,7 +550,7 @@ handle_call({open_result, DbName, Error}, {Opener, _}, Server) -> {reply, ok, Server} end; handle_call({open, DbName, Options}, From, Server) -> - case ets:lookup(couch_dbs, DbName) of + case ets:lookup(couch_dbs(Server), DbName) of [] -> case make_room(Server, Options) of {ok, Server2} -> @@ -540,7 +559,7 @@ handle_call({open, DbName, Options}, From, Server) -> {reply, CloseError, Server} end; [#entry{waiters = Waiters} = Entry] when is_list(Waiters) -> - true = ets:insert(couch_dbs, Entry#entry{waiters = [From | Waiters]}), + true = ets:insert(couch_dbs(Server), Entry#entry{waiters = [From | Waiters]}), NumWaiters = length(Waiters), if NumWaiters =< 10 orelse NumWaiters rem 10 /= 0 -> ok; true -> Fmt = "~b clients waiting to open db ~s", @@ -551,7 +570,7 @@ handle_call({open, DbName, Options}, From, Server) -> {reply, {ok, Db}, Server} end; handle_call({create, DbName, Options}, From, Server) -> - case ets:lookup(couch_dbs, DbName) of + case ets:lookup(couch_dbs(Server), DbName) of [] -> case make_room(Server, Options) of {ok, Server2} -> @@ -566,7 +585,7 @@ handle_call({create, DbName, Options}, From, Server) -> % to wait while we figure out if it'll succeed. CrOptions = [create | Options], Req = {create, DbName, CrOptions, From}, - true = ets:insert(couch_dbs, Entry#entry{req_type = Req}), + true = ets:insert(couch_dbs(Server), Entry#entry{req_type = Req}), {noreply, Server}; [_AlreadyRunningDb] -> {reply, file_exists, Server} @@ -576,17 +595,17 @@ handle_call({delete, DbName, Options}, _From, Server) -> case check_dbname(DbNameList) of ok -> Server2 = - case ets:lookup(couch_dbs, DbName) of + case ets:lookup(couch_dbs(Server), DbName) of [] -> Server; [#entry{pid = Pid, waiters = Waiters} = Entry] when is_list(Waiters) -> - true = ets:delete(couch_dbs, DbName), - true = ets:delete(couch_dbs_pid_to_name, Pid), + true = ets:delete(couch_dbs(Server), DbName), + true = ets:delete(couch_dbs_pid_to_name(Server), Pid), exit(Pid, kill), [gen_server:reply(Waiter, not_found) || Waiter <- Waiters], db_closed(Server, Entry#entry.db_options); [#entry{pid = Pid} = Entry] -> - true = ets:delete(couch_dbs, DbName), - true = ets:delete(couch_dbs_pid_to_name, Pid), + true = ets:delete(couch_dbs(Server), DbName), + true = ets:delete(couch_dbs_pid_to_name(Server), Pid), exit(Pid, kill), db_closed(Server, Entry#entry.db_options) end, @@ -615,9 +634,9 @@ handle_call({delete, DbName, Options}, _From, Server) -> handle_call({db_updated, Db}, _From, Server0) -> DbName = couch_db:name(Db), StartTime = couch_db:get_instance_start_time(Db), - Server = try ets:lookup_element(couch_dbs, DbName, #entry.start_time) of + Server = try ets:lookup_element(couch_dbs(Server0), DbName, #entry.start_time) of StartTime -> - true = ets:update_element(couch_dbs, DbName, {#entry.db, Db}), + true = ets:update_element(couch_dbs(Server0), DbName, {#entry.db, Db}), Lru = case couch_db:is_system_db(Db) of false -> couch_lru:update(DbName, Server0#server.lru); true -> Server0#server.lru @@ -635,19 +654,19 @@ handle_cast({update_lru, DbName}, #server{lru = Lru, update_lru_on_read=true} = handle_cast({update_lru, _DbName}, Server) -> {noreply, Server}; handle_cast({close_db_if_idle, DbName}, Server) -> - case ets:update_element(couch_dbs, DbName, {#entry.lock, locked}) of + case ets:update_element(couch_dbs(Server), DbName, {#entry.lock, locked}) of true -> - [#entry{db = Db, db_options = DbOpts}] = ets:lookup(couch_dbs, DbName), + [#entry{db = Db, db_options = DbOpts}] = ets:lookup(couch_dbs(Server), DbName), case couch_db:is_idle(Db) of true -> DbPid = couch_db:get_pid(Db), - true = ets:delete(couch_dbs, DbName), - true = ets:delete(couch_dbs_pid_to_name, DbPid), + true = ets:delete(couch_dbs(Server), DbName), + true = ets:delete(couch_dbs_pid_to_name(Server), DbPid), exit(DbPid, kill), {noreply, db_closed(Server, DbOpts)}; false -> true = ets:update_element( - couch_dbs, DbName, {#entry.lock, unlocked}), + couch_dbs(Server), DbName, {#entry.lock, unlocked}), {noreply, Server} end; false -> @@ -663,9 +682,9 @@ code_change(_OldVsn, #server{}=State, _Extra) -> handle_info({'EXIT', _Pid, config_change}, Server) -> {stop, config_change, Server}; handle_info({'EXIT', Pid, Reason}, Server) -> - case ets:lookup(couch_dbs_pid_to_name, Pid) of + case ets:lookup(couch_dbs_pid_to_name(Server), Pid) of [{Pid, DbName}] -> - [#entry{waiters = Waiters} = Entry] = ets:lookup(couch_dbs, DbName), + [#entry{waiters = Waiters} = Entry] = ets:lookup(couch_dbs(Server), DbName), if Reason /= snappy_nif_not_loaded -> ok; true -> Msg = io_lib:format("To open the database `~s`, Apache CouchDB " "must be built with Erlang OTP R13B04 or higher.", [DbName]), @@ -680,14 +699,14 @@ handle_info({'EXIT', Pid, Reason}, Server) -> if not is_list(Waiters) -> ok; true -> [gen_server:reply(Waiter, Reason) || Waiter <- Waiters] end, - true = ets:delete(couch_dbs, DbName), - true = ets:delete(couch_dbs_pid_to_name, Pid), + true = ets:delete(couch_dbs(Server), DbName), + true = ets:delete(couch_dbs_pid_to_name(Server), Pid), {noreply, db_closed(Server, Entry#entry.db_options)}; [] -> {noreply, Server} end; -handle_info(restart_config_listener, State) -> - ok = config:listen_for_changes(?MODULE, nil), +handle_info({restart_config_listener, N}, State) -> + ok = config:listen_for_changes(?MODULE, N), {noreply, State}; handle_info(Info, Server) -> {stop, {unknown_message, Info}, Server}. @@ -719,7 +738,7 @@ validate_open_or_create(DbName, Options) -> throw({?MODULE, EngineError}) end, - case ets:lookup(couch_dbs_locks, DbName) of + case ets:lookup(couch_dbs_locks(DbName), DbName) of [] -> ok; [{DbName, Reason}] -> @@ -860,19 +879,64 @@ get_engine_path(DbName, Engine) when is_binary(DbName), is_atom(Engine) -> end. lock(DbName, Reason) when is_binary(DbName), is_binary(Reason) -> - case ets:lookup(couch_dbs, DbName) of + case ets:lookup(couch_dbs(DbName), DbName) of [] -> - true = ets:insert(couch_dbs_locks, {DbName, Reason}), + true = ets:insert(couch_dbs_locks(DbName), {DbName, Reason}), ok; [#entry{}] -> {error, already_opened} end. unlock(DbName) when is_binary(DbName) -> - true = ets:delete(couch_dbs_locks, DbName), + true = ets:delete(couch_dbs_locks(DbName), DbName), ok. +db_updated(Db) -> + DbName = couch_db:name(Db), + gen_server:call(couch_server(DbName), {db_updated, Db}, infinity). + + +couch_server(Arg) -> + name("couch_server", Arg). + + +couch_dbs(Arg) -> + name("couch_dbs", Arg). + + +couch_dbs_pid_to_name(Arg) -> + name("couch_dbs_pid_to_name", Arg). + + +couch_dbs_locks(Arg) -> + name("couch_dbs_locks", Arg). + + +name("couch_dbs", #server{} = Server) -> + Server#server.couch_dbs; + +name("couch_dbs_pid_to_name", #server{} = Server) -> + Server#server.couch_dbs_pid_to_name; + +name("couch_dbs_locks", #server{} = Server) -> + Server#server.couch_dbs_locks; + +name(BaseName, DbName) when is_list(DbName) -> + name(BaseName, ?l2b(DbName)); + +name(BaseName, DbName) when is_binary(DbName) -> + N = 1 + erlang:phash2(DbName, num_servers()), + name(BaseName, N); + +name(BaseName, N) when is_integer(N), N > 0 -> + list_to_atom(BaseName ++ "_" ++ integer_to_list(N)). + + +num_servers() -> + erlang:system_info(schedulers). + + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). diff --git a/src/couch/test/eunit/couch_bt_engine_compactor_ev_tests.erl b/src/couch/test/eunit/couch_bt_engine_compactor_ev_tests.erl index 188078c2d..090217b4c 100644 --- a/src/couch/test/eunit/couch_bt_engine_compactor_ev_tests.erl +++ b/src/couch/test/eunit/couch_bt_engine_compactor_ev_tests.erl @@ -283,7 +283,8 @@ wait_db_cleared(Db, N) when N < 0 -> erlang:error({db_clear_timeout, couch_db:name(Db)}); wait_db_cleared(Db, N) -> - case ets:lookup(couch_dbs, couch_db:name(Db)) of + Tab = couch_server:couch_dbs(couch_db:name(Db)), + case ets:lookup(Tab, couch_db:name(Db)) of [] -> ok; [#entry{db = NewDb}] -> @@ -332,4 +333,4 @@ purge_module() -> _ -> code:delete(couch_db_updater), code:purge(couch_db_updater) - end.
\ No newline at end of file + end. diff --git a/src/couch/test/eunit/couch_db_split_tests.erl b/src/couch/test/eunit/couch_db_split_tests.erl index 6e24c36ee..b52184a8c 100644 --- a/src/couch/test/eunit/couch_db_split_tests.erl +++ b/src/couch/test/eunit/couch_db_split_tests.erl @@ -113,7 +113,7 @@ should_fail_on_existing_target(DbName) -> ok = couch_db:close(Db), exit(Pid, kill), test_util:wait(fun() -> - case ets:lookup(couch_dbs, TName) of + case ets:lookup(couch_server:couch_dbs(DbName), TName) of [] -> ok; [_ | _] -> wait end diff --git a/src/couch/test/eunit/couch_db_tests.erl b/src/couch/test/eunit/couch_db_tests.erl index dd2cb427d..d52a15597 100644 --- a/src/couch/test/eunit/couch_db_tests.erl +++ b/src/couch/test/eunit/couch_db_tests.erl @@ -109,7 +109,8 @@ should_delete_db(DbName) -> should_create_multiple_dbs(DbNames) -> ?_test(begin - gen_server:call(couch_server, {set_max_dbs_open, 3}), + [gen_server:call(couch_server:couch_server(N), {set_max_dbs_open, 3}) || + N <- lists:seq(1, couch_server:num_servers())], {ok, Before} = couch_server:all_databases(), [?assertNot(lists:member(DbName, Before)) || DbName <- DbNames], [?assert(create_db(DbName)) || DbName <- DbNames], @@ -170,7 +171,7 @@ locking_should_work(DbName) -> ok = couch_db:close(Db), catch exit(couch_db:get_pid(Db), kill), test_util:wait(fun() -> - case ets:lookup(couch_dbs, DbName) of + case ets:lookup(couch_server:couch_dbs(DbName), DbName) of [] -> ok; [_ | _] -> wait end diff --git a/src/couch/test/eunit/couch_server_tests.erl b/src/couch/test/eunit/couch_server_tests.erl index 7d50700d2..66533d48c 100644 --- a/src/couch/test/eunit/couch_server_tests.erl +++ b/src/couch/test/eunit/couch_server_tests.erl @@ -169,7 +169,7 @@ start_interleaved() -> DbPid = couch_db:get_pid(Db), unlink(DbPid), Msg = {'EXIT', DbPid, killed}, - erlang:send_after(2000, whereis(couch_server), Msg); + erlang:send_after(2000, whereis(couch_server:couch_server(DbName)), Msg); _ -> ok end, @@ -202,7 +202,7 @@ t_interleaved_create_delete_open(DbName) -> % Get the current couch_server pid so we're sure % to not end up messaging two different pids - CouchServer = whereis(couch_server), + CouchServer = whereis(couch_server:couch_server(DbName)), % Start our first instance that will succeed in % an invalid state. Notice that the opener pid @@ -250,7 +250,7 @@ t_interleaved_create_delete_open(DbName) -> get_opener_pid(DbName) -> WaitFun = fun() -> - case ets:lookup(couch_dbs, DbName) of + case ets:lookup(couch_server:couch_dbs(DbName), DbName) of [#entry{pid = Pid}] -> {ok, Pid}; [] -> diff --git a/src/couch/test/eunit/couchdb_db_tests.erl b/src/couch/test/eunit/couchdb_db_tests.erl index 734bafb9f..338f2cd3c 100644 --- a/src/couch/test/eunit/couchdb_db_tests.erl +++ b/src/couch/test/eunit/couchdb_db_tests.erl @@ -65,12 +65,12 @@ should_close_deleted_db(DbName) -> throw(timeout_error) end, test_util:wait(fun() -> - case ets:lookup(couch_dbs, DbName) of + case ets:lookup(couch_server:couch_dbs(DbName), DbName) of [] -> ok; _ -> wait end end), - ?assertEqual([], ets:lookup(couch_dbs, DbName)) + ?assertEqual([], ets:lookup(couch_server:couch_dbs(DbName), DbName)) end). diff --git a/src/couch_pse_tests/src/cpse_util.erl b/src/couch_pse_tests/src/cpse_util.erl index 24f49e88c..55622e925 100644 --- a/src/couch_pse_tests/src/cpse_util.erl +++ b/src/couch_pse_tests/src/cpse_util.erl @@ -116,7 +116,8 @@ shutdown_db(Db) -> erlang:error(database_shutdown_timeout) end, test_util:wait(fun() -> - case ets:member(couch_dbs, couch_db:name(Db)) of + case ets:member(couch_server:couch_dbs(couch_db:name(Db)), + couch_db:name(Db)) of true -> wait; false -> ok end |