diff options
Diffstat (limited to 'src/couch/src/couch_server.erl')
-rw-r--r-- | src/couch/src/couch_server.erl | 213 |
1 files changed, 149 insertions, 64 deletions
diff --git a/src/couch/src/couch_server.erl b/src/couch/src/couch_server.erl index ab0122eec..909e23898 100644 --- a/src/couch/src/couch_server.erl +++ b/src/couch/src/couch_server.erl @@ -26,6 +26,7 @@ -export([exists/1]). -export([get_engine_extensions/0]). -export([get_engine_path/2]). +-export([lock/2, unlock/1]). % config_listener api -export([handle_config_change/5, handle_config_terminate/3]). @@ -77,8 +78,15 @@ get_stats() -> sup_start_link() -> gen_server:start_link({local, couch_server}, couch_server, [], []). +open(DbName, Options) -> + try + validate_open_or_create(DbName, Options), + open_int(DbName, Options) + catch throw:{?MODULE, Error} -> + Error + end. -open(DbName, Options0) -> +open_int(DbName, Options0) -> Ctx = couch_util:get_value(user_ctx, Options0, #user_ctx{}), case ets:lookup(couch_dbs, DbName) of [#entry{db = Db0, lock = Lock} = Entry] when Lock =/= locked -> @@ -115,7 +123,15 @@ update_lru(DbName, Options) -> close_lru() -> gen_server:call(couch_server, close_lru). -create(DbName, Options0) -> +create(DbName, Options) -> + try + validate_open_or_create(DbName, Options), + create_int(DbName, Options) + catch throw:{?MODULE, Error} -> + Error + end. + +create_int(DbName, Options0) -> Options = maybe_add_sys_db_callbacks(DbName, Options0), couch_partition:validate_dbname(DbName, Options), case gen_server:call(couch_server, {create, DbName, Options}, infinity) of @@ -183,7 +199,7 @@ path_ends_with(Path, Suffix) when is_binary(Suffix) -> path_ends_with(Path, Suffix) when is_list(Suffix) -> path_ends_with(Path, ?l2b(Suffix)). -check_dbname(#server{}, DbName) -> +check_dbname(DbName) -> couch_db:validate_dbname(DbName). is_admin(User, ClearPwd) -> @@ -219,6 +235,7 @@ close_db_if_idle(DbName) -> init([]) -> couch_util:set_mqd_off_heap(?MODULE), + couch_util:set_process_priority(?MODULE, high), % Mark pluggable storage engines as a supported feature config:enable_feature('pluggable-storage-engines'), @@ -251,6 +268,12 @@ init([]) -> {read_concurrency, true} ]), ets:new(couch_dbs_pid_to_name, [set, protected, named_table]), + ets:new(couch_dbs_locks, [ + set, + public, + named_table, + {read_concurrency, true} + ]), process_flag(trap_exit, true), {ok, #server{root_dir=RootDir, engines = Engines, @@ -357,19 +380,32 @@ maybe_close_lru_db(#server{lru=Lru}=Server) -> {error, all_dbs_active} end. -open_async(Server, From, DbName, {Module, Filepath}, Options) -> +open_async(Server, From, DbName, Options) -> Parent = self(), T0 = os:timestamp(), Opener = spawn_link(fun() -> - Res = couch_db:start_link(Module, DbName, Filepath, Options), - case {Res, lists:member(create, Options)} of - {{ok, _Db}, true} -> + Res = open_async_int(Server, DbName, Options), + IsSuccess = case Res of + {ok, _} -> true; + _ -> false + end, + case IsSuccess andalso lists:member(create, Options) of + true -> couch_event:notify(DbName, created); - _ -> + false -> ok end, - gen_server:call(Parent, {open_result, T0, DbName, Res}, infinity), - unlink(Parent) + gen_server:call(Parent, {open_result, DbName, Res}, infinity), + unlink(Parent), + case IsSuccess of + true -> + % Track latency times for successful opens + Diff = timer:now_diff(os:timestamp(), T0) / 1000, + couch_stats:update_histogram([couchdb, db_open_time], Diff); + false -> + % Log unsuccessful open results + couch_log:info("open_result error ~p for ~s", [Res, DbName]) + end end), ReqType = case lists:member(create, Options) of true -> create; @@ -386,6 +422,20 @@ open_async(Server, From, DbName, {Module, Filepath}, Options) -> true = ets:insert(couch_dbs_pid_to_name, {Opener, DbName}), db_opened(Server, Options). +open_async_int(Server, DbName, Options) -> + DbNameList = binary_to_list(DbName), + case check_dbname(DbNameList) of + ok -> + case get_engine(Server, DbNameList, Options) of + {ok, {Module, FilePath}} -> + couch_db:start_link(Module, DbName, FilePath, Options); + Error2 -> + Error2 + end; + Error1 -> + Error1 + end. + handle_call(close_lru, _From, #server{lru=Lru} = Server) -> case couch_lru:close(Lru) of {true, NewLru} -> @@ -403,10 +453,8 @@ handle_call(reload_engines, _From, Server) -> {reply, ok, Server#server{engines = get_configured_engines()}}; handle_call(get_server, _From, Server) -> {reply, {ok, Server}, Server}; -handle_call({open_result, T0, DbName, {ok, Db}}, {Opener, _}, Server) -> +handle_call({open_result, DbName, {ok, Db}}, {Opener, _}, Server) -> true = ets:delete(couch_dbs_pid_to_name, Opener), - OpenTime = timer:now_diff(os:timestamp(), T0) / 1000, - couch_stats:update_histogram([couchdb, db_open_time], OpenTime), DbPid = couch_db:get_pid(Db), case ets:lookup(couch_dbs, DbName) of [] -> @@ -418,7 +466,7 @@ handle_call({open_result, T0, DbName, {ok, Db}}, {Opener, _}, Server) -> [gen_server:reply(Waiter, {ok, Db}) || Waiter <- Waiters], % Cancel the creation request if it exists. case ReqType of - {create, DbName, _Engine, _Options, CrFrom} -> + {create, DbName, _Options, CrFrom} -> gen_server:reply(CrFrom, file_exists); _ -> ok @@ -446,21 +494,20 @@ handle_call({open_result, T0, DbName, {ok, Db}}, {Opener, _}, Server) -> exit(couch_db:get_pid(Db), kill), {reply, ok, Server} end; -handle_call({open_result, T0, DbName, {error, eexist}}, From, Server) -> - handle_call({open_result, T0, DbName, file_exists}, From, Server); -handle_call({open_result, _T0, DbName, Error}, {Opener, _}, Server) -> +handle_call({open_result, DbName, {error, eexist}}, From, Server) -> + handle_call({open_result, DbName, file_exists}, From, Server); +handle_call({open_result, DbName, Error}, {Opener, _}, Server) -> case ets:lookup(couch_dbs, DbName) of [] -> % db was deleted during async open {reply, ok, Server}; [#entry{pid = Opener, req_type = ReqType, waiters = Waiters} = Entry] -> [gen_server:reply(Waiter, Error) || Waiter <- Waiters], - couch_log:info("open_result error ~p for ~s", [Error, DbName]), true = ets:delete(couch_dbs, DbName), true = ets:delete(couch_dbs_pid_to_name, Opener), NewServer = case ReqType of - {create, DbName, Engine, Options, CrFrom} -> - open_async(Server, CrFrom, DbName, Engine, Options); + {create, DbName, Options, CrFrom} -> + open_async(Server, CrFrom, DbName, Options); _ -> Server end, @@ -473,22 +520,16 @@ handle_call({open_result, _T0, DbName, Error}, {Opener, _}, Server) -> handle_call({open, DbName, Options}, From, Server) -> case ets:lookup(couch_dbs, DbName) of [] -> - DbNameList = binary_to_list(DbName), - case check_dbname(Server, DbNameList) of - ok -> - case make_room(Server, Options) of - {ok, Server2} -> - {ok, Engine} = get_engine(Server2, DbNameList), - {noreply, open_async(Server2, From, DbName, Engine, Options)}; - CloseError -> - {reply, CloseError, Server} - end; - Error -> - {reply, Error, Server} + case make_room(Server, Options) of + {ok, Server2} -> + {noreply, open_async(Server2, From, DbName, Options)}; + CloseError -> + {reply, CloseError, Server} end; [#entry{waiters = Waiters} = Entry] when is_list(Waiters) -> true = ets:insert(couch_dbs, Entry#entry{waiters = [From | Waiters]}), - if length(Waiters) =< 10 -> ok; true -> + NumWaiters = length(Waiters), + if NumWaiters =< 10 orelse NumWaiters rem 10 /= 0 -> ok; true -> Fmt = "~b clients waiting to open db ~s", couch_log:info(Fmt, [length(Waiters), DbName]) end, @@ -497,40 +538,29 @@ handle_call({open, DbName, Options}, From, Server) -> {reply, {ok, Db}, Server} end; handle_call({create, DbName, Options}, From, Server) -> - DbNameList = binary_to_list(DbName), - case get_engine(Server, DbNameList, Options) of - {ok, Engine} -> - case check_dbname(Server, DbNameList) of - ok -> - case ets:lookup(couch_dbs, DbName) of - [] -> - case make_room(Server, Options) of - {ok, Server2} -> - {noreply, open_async(Server2, From, DbName, Engine, - [create | Options])}; - CloseError -> - {reply, CloseError, Server} - end; - [#entry{req_type = open} = Entry] -> - % We're trying to create a database while someone is in - % the middle of trying to open it. We allow one creator - % to wait while we figure out if it'll succeed. - CrOptions = [create | Options], - Req = {create, DbName, Engine, CrOptions, From}, - true = ets:insert(couch_dbs, Entry#entry{req_type = Req}), - {noreply, Server}; - [_AlreadyRunningDb] -> - {reply, file_exists, Server} - end; - Error -> - {reply, Error, Server} + case ets:lookup(couch_dbs, DbName) of + [] -> + case make_room(Server, Options) of + {ok, Server2} -> + CrOptions = [create | Options], + {noreply, open_async(Server2, From, DbName, CrOptions)}; + CloseError -> + {reply, CloseError, Server} end; - Error -> - {reply, Error, Server} + [#entry{req_type = open} = Entry] -> + % We're trying to create a database while someone is in + % the middle of trying to open it. We allow one creator + % to wait while we figure out if it'll succeed. + CrOptions = [create | Options], + Req = {create, DbName, CrOptions, From}, + true = ets:insert(couch_dbs, Entry#entry{req_type = Req}), + {noreply, Server}; + [_AlreadyRunningDb] -> + {reply, file_exists, Server} end; handle_call({delete, DbName, Options}, _From, Server) -> DbNameList = binary_to_list(DbName), - case check_dbname(Server, DbNameList) of + case check_dbname(DbNameList) of ok -> Server2 = case ets:lookup(couch_dbs, DbName) of @@ -628,7 +658,12 @@ handle_info({'EXIT', Pid, Reason}, Server) -> "must be built with Erlang OTP R13B04 or higher.", [DbName]), couch_log:error(Msg, []) end, - couch_log:info("db ~s died with reason ~p", [DbName, Reason]), + % We kill databases on purpose so there's no reason + % to log that fact. So we restrict logging to "interesting" + % reasons. + if Reason == normal orelse Reason == killed -> ok; true -> + couch_log:info("db ~s died with reason ~p", [DbName, Reason]) + end, if not is_list(Waiters) -> ok; true -> [gen_server:reply(Waiter, Reason) || Waiter <- Waiters] end, @@ -656,6 +691,27 @@ db_closed(Server, Options) -> true -> Server end. +validate_open_or_create(DbName, Options) -> + case check_dbname(DbName) of + ok -> + ok; + DbNameError -> + throw({?MODULE, DbNameError}) + end, + + case check_engine(Options) of + ok -> + ok; + EngineError -> + throw({?MODULE, EngineError}) + end, + + case ets:lookup(couch_dbs_locks, DbName) of + [] -> + ok; + [{DbName, Reason}] -> + throw({?MODULE, {error, {locked, Reason}}}) + end. get_configured_engines() -> ConfigEntries = config:get("couchdb_engines"), @@ -765,6 +821,22 @@ get_engine_extensions() -> end. +check_engine(Options) -> + case couch_util:get_value(engine, Options) of + Ext when is_binary(Ext) -> + ExtStr = binary_to_list(Ext), + Extensions = get_engine_extensions(), + case lists:member(ExtStr, Extensions) of + true -> + ok; + false -> + {error, {invalid_engine_extension, Ext}} + end; + _ -> + ok + end. + + get_engine_path(DbName, Engine) when is_binary(DbName), is_atom(Engine) -> RootDir = config:get("couchdb", "database_dir", "."), case lists:keyfind(Engine, 2, get_configured_engines()) of @@ -774,6 +846,19 @@ get_engine_path(DbName, Engine) when is_binary(DbName), is_atom(Engine) -> {error, {invalid_engine, Engine}} end. +lock(DbName, Reason) when is_binary(DbName), is_binary(Reason) -> + case ets:lookup(couch_dbs, DbName) of + [] -> + true = ets:insert(couch_dbs_locks, {DbName, Reason}), + ok; + [#entry{}] -> + {error, already_opened} + end. + +unlock(DbName) when is_binary(DbName) -> + true = ets:delete(couch_dbs_locks, DbName), + ok. + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). |