author    Nick Vatamaniuc <vatamane@apache.org>  2017-05-24 03:37:04 -0400
committer Nick Vatamaniuc <nickva@users.noreply.github.com>  2017-06-07 16:13:34 -0400
commit    ba0c10bcf49be66b042d8f0deaf14bcfda1a49bf (patch)
tree      8620f61cb8804206006b6c0b365f868befa811aa
parent    2062aff2c78f93ce935658cfded852bca7b6b300 (diff)
Close idle dbs
Previously, idle dbs, especially sys dbs like _replicator, would stay open forever once they had been opened for a scan. In a large cluster with many _replicator shards that can add up to significant overhead, mostly in terms of the number of active processes.

Add a mechanism to close dbs whose db updater has gone idle.

Before this change, hibernation was used to limit memory pressure, but that is often not enough. Some databases are only read periodically, so their updater would time out. To prevent that, keep the last read timestamp in the couch_file process dictionary; the idle check then avoids closing dbs which have been read from recently.

(Original idea for using timeouts in gen_server replies belongs to Paul Davis)

COUCHDB-3323
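The limit itself is read by update_idle_limit_from_config/0 (added below) from the couchdb/idle_check_timeout setting: a value in milliseconds, or "infinity" to disable closing and fall back to hibernation only. A sketch of what this might look like in an operator's local.ini; the 300000 ms value is only an illustration, the compiled-in default is 61000 ms:

[couchdb]
; close a db whose updater has been idle (no writes and no recent reads)
; for longer than this many milliseconds; "infinity" disables closing
idle_check_timeout = 300000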
-rw-r--r--  src/couch/src/couch_db_updater.erl | 108
-rw-r--r--  src/couch/src/couch_file.erl       |  26
-rw-r--r--  src/couch/src/couch_server.erl     |  28
-rw-r--r--  src/couch/src/couch_util.erl       |  19
4 files changed, 146 insertions(+), 35 deletions(-)
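The patch leans on a standard OTP idiom: a gen_server return tuple may carry a trailing timeout, and if no message arrives within that window the process receives a plain timeout info message. A minimal, self-contained sketch of that idiom follows; the module idle_sketch and its ping/1 API are hypothetical and not part of this commit.

%% Sketch only: shows the reply-with-timeout pattern, not CouchDB code.
-module(idle_sketch).
-behaviour(gen_server).

-export([start_link/0, ping/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

%% Mirrors ?IDLE_LIMIT_DEFAULT in the patch: 61000 ms of silence counts as idle.
-define(IDLE_LIMIT, 61000).

start_link() ->
    gen_server:start_link(?MODULE, [], []).

ping(Pid) ->
    gen_server:call(Pid, ping).

init([]) ->
    %% The third element is a timeout: if no call, cast or info message
    %% arrives within ?IDLE_LIMIT ms, handle_info(timeout, State) fires.
    {ok, #{}, ?IDLE_LIMIT}.

handle_call(ping, _From, State) ->
    %% Every reply re-arms the idle timer, just as idle_limit() does in
    %% couch_db_updater below.
    {reply, pong, State, ?IDLE_LIMIT}.

handle_cast(_Msg, State) ->
    {noreply, State, ?IDLE_LIMIT}.

handle_info(timeout, State) ->
    %% Nothing happened for ?IDLE_LIMIT ms; a real server would now check
    %% whether it is safe to shut down (couch_db_updater asks couch_server).
    {stop, normal, State};
handle_info(_Other, State) ->
    {noreply, State, ?IDLE_LIMIT}.

terminate(_Reason, _State) ->
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.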
diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl
index bb8e9dafb..49061b2f6 100644
--- a/src/couch/src/couch_db_updater.erl
+++ b/src/couch/src/couch_db_updater.erl
@@ -21,6 +21,8 @@
-include_lib("couch/include/couch_db.hrl").
+-define(IDLE_LIMIT_DEFAULT, 61000).
+
-record(comp_header, {
db_header,
meta_state
@@ -36,6 +38,7 @@
init({DbName, Filepath, Fd, Options}) ->
erlang:put(io_priority, {db_update, DbName}),
+ update_idle_limit_from_config(),
case lists:member(create, Options) of
true ->
% create a new header and writes it to the file
@@ -70,7 +73,7 @@ init({DbName, Filepath, Fd, Options}) ->
% we don't load validation funs here because the fabric query is liable to
% race conditions. Instead see couch_db:validate_doc_update, which loads
% them lazily
- {ok, Db#db{main_pid = self()}}.
+ {ok, Db#db{main_pid = self()}, idle_limit()}.
terminate(_Reason, Db) ->
@@ -84,23 +87,23 @@ terminate(_Reason, Db) ->
ok.
handle_call(get_db, _From, Db) ->
- {reply, {ok, Db}, Db};
+ {reply, {ok, Db}, Db, idle_limit()};
handle_call(full_commit, _From, #db{waiting_delayed_commit=nil}=Db) ->
- {reply, ok, Db}; % no data waiting, return ok immediately
+ {reply, ok, Db, idle_limit()}; % no data waiting, return ok immediately
handle_call(full_commit, _From, Db) ->
- {reply, ok, commit_data(Db)};
+ {reply, ok, commit_data(Db), idle_limit()};
handle_call({full_commit, RequiredSeq}, _From, Db)
when RequiredSeq =< Db#db.committed_update_seq ->
- {reply, ok, Db};
+ {reply, ok, Db, idle_limit()};
handle_call({full_commit, _}, _, Db) ->
- {reply, ok, commit_data(Db)}; % commit the data and return ok
+ {reply, ok, commit_data(Db), idle_limit()}; % commit the data and return ok
handle_call(start_compact, _From, Db) ->
- {noreply, NewDb} = handle_cast(start_compact, Db),
- {reply, {ok, NewDb#db.compactor_pid}, NewDb};
+ {noreply, NewDb, _Timeout} = handle_cast(start_compact, Db),
+ {reply, {ok, NewDb#db.compactor_pid}, NewDb, idle_limit()};
handle_call(compactor_pid, _From, #db{compactor_pid = Pid} = Db) ->
- {reply, Pid, Db};
+ {reply, Pid, Db, idle_limit()};
handle_call(cancel_compact, _From, #db{compactor_pid = nil} = Db) ->
- {reply, ok, Db};
+ {reply, ok, Db, idle_limit()};
handle_call(cancel_compact, _From, #db{compactor_pid = Pid} = Db) ->
unlink(Pid),
exit(Pid, kill),
@@ -108,12 +111,12 @@ handle_call(cancel_compact, _From, #db{compactor_pid = Pid} = Db) ->
ok = couch_file:delete(RootDir, Db#db.filepath ++ ".compact"),
Db2 = Db#db{compactor_pid = nil},
ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
- {reply, ok, Db2};
+ {reply, ok, Db2, idle_limit()};
handle_call(increment_update_seq, _From, Db) ->
Db2 = commit_data(Db#db{update_seq=Db#db.update_seq+1}),
ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
couch_event:notify(Db#db.name, updated),
- {reply, {ok, Db2#db.update_seq}, Db2};
+ {reply, {ok, Db2#db.update_seq}, Db2, idle_limit()};
handle_call({set_security, NewSec}, _From, #db{compression = Comp} = Db) ->
{ok, Ptr, _} = couch_file:append_term(
@@ -121,17 +124,17 @@ handle_call({set_security, NewSec}, _From, #db{compression = Comp} = Db) ->
Db2 = commit_data(Db#db{security=NewSec, security_ptr=Ptr,
update_seq=Db#db.update_seq+1}),
ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
- {reply, ok, Db2};
+ {reply, ok, Db2, idle_limit()};
handle_call({set_revs_limit, Limit}, _From, Db) ->
Db2 = commit_data(Db#db{revs_limit=Limit,
update_seq=Db#db.update_seq+1}),
ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
- {reply, ok, Db2};
+ {reply, ok, Db2, idle_limit()};
handle_call({purge_docs, _IdRevs}, _From,
#db{compactor_pid=Pid}=Db) when Pid /= nil ->
- {reply, {error, purge_during_compaction}, Db};
+ {reply, {error, purge_during_compaction}, Db, idle_limit()};
handle_call({purge_docs, IdRevs}, _From, Db) ->
#db{
fd = Fd,
@@ -199,13 +202,14 @@ handle_call({purge_docs, IdRevs}, _From, Db) ->
ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
couch_event:notify(Db#db.name, updated),
- {reply, {ok, couch_db_header:purge_seq(NewHeader), IdRevsPurged}, Db2}.
+ {reply, {ok, couch_db_header:purge_seq(NewHeader), IdRevsPurged}, Db2,
+ idle_limit()}.
handle_cast({load_validation_funs, ValidationFuns}, Db) ->
Db2 = Db#db{validate_doc_funs = ValidationFuns},
ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
- {noreply, Db2};
+ {noreply, Db2, idle_limit()};
handle_cast(start_compact, Db) ->
case Db#db.compactor_pid of
nil ->
@@ -213,10 +217,10 @@ handle_cast(start_compact, Db) ->
Pid = spawn_link(fun() -> start_copy_compact(Db) end),
Db2 = Db#db{compactor_pid=Pid},
ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
- {noreply, Db2};
+ {noreply, Db2, idle_limit()};
_ ->
% compact currently running, this is a no-op
- {noreply, Db}
+ {noreply, Db, idle_limit()}
end;
handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath,fd=Fd}=Db) ->
{ok, NewFd} = couch_file:open(CompactFilepath),
@@ -259,7 +263,7 @@ handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath,fd=Fd}=Db) ->
ok = gen_server:call(couch_server, {db_updated, NewDb3}, infinity),
couch_event:notify(NewDb3#db.name, compacted),
couch_log:info("Compaction for db \"~s\" completed.", [Db#db.name]),
- {noreply, NewDb3#db{compactor_pid=nil}};
+ {noreply, NewDb3#db{compactor_pid=nil}, idle_limit()};
false ->
couch_log:info("Compaction file still behind main file "
"(update seq=~p. compact update seq=~p). Retrying.",
@@ -268,9 +272,12 @@ handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath,fd=Fd}=Db) ->
Pid = spawn_link(fun() -> start_copy_compact(Db) end),
Db2 = Db#db{compactor_pid=Pid},
ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
- {noreply, Db2}
+ {noreply, Db2, idle_limit()}
end;
+handle_cast(wakeup, Db) ->
+ {noreply, Db, idle_limit()};
+
handle_cast(Msg, #db{name = Name} = Db) ->
couch_log:error("Database `~s` updater received unexpected cast: ~p",
[Name, Msg]),
@@ -317,30 +324,49 @@ handle_info({update_docs, Client, GroupedDocs, NonRepDocs, MergeConflicts,
false ->
Db2
end,
- {noreply, Db3, hibernate}
+ {noreply, Db3, hibernate_if_no_idle_limit()}
catch
throw: retry ->
[catch(ClientPid ! {retry, self()}) || ClientPid <- Clients],
- {noreply, Db, hibernate}
+ {noreply, Db, hibernate_if_no_idle_limit()}
end;
handle_info(delayed_commit, #db{waiting_delayed_commit=nil}=Db) ->
%no outstanding delayed commits, ignore
- {noreply, Db};
+ {noreply, Db, idle_limit()};
handle_info(delayed_commit, Db) ->
case commit_data(Db) of
Db ->
- {noreply, Db};
+ {noreply, Db, idle_limit()};
Db2 ->
ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
- {noreply, Db2}
+ {noreply, Db2, idle_limit()}
end;
handle_info({'EXIT', _Pid, normal}, Db) ->
- {noreply, Db};
+ {noreply, Db, idle_limit()};
handle_info({'EXIT', _Pid, Reason}, Db) ->
{stop, Reason, Db};
handle_info({'DOWN', Ref, _, _, Reason}, #db{fd_monitor=Ref, name=Name} = Db) ->
couch_log:error("DB ~s shutting down - Fd ~p", [Name, Reason]),
- {stop, normal, Db#db{fd=undefined, fd_monitor=closed}}.
+ {stop, normal, Db#db{fd=undefined, fd_monitor=closed}};
+handle_info(timeout, #db{fd=Fd, name=DbName} = Db) ->
+ IdleLimitMSec = update_idle_limit_from_config(),
+ case couch_db:is_idle(Db) of
+ true ->
+ MSecSinceLastRead = couch_file:msec_since_last_read(Fd),
+ case MSecSinceLastRead > IdleLimitMSec of
+ true ->
+ ok = couch_server:close_db_if_idle(DbName);
+ false ->
+ ok
+ end;
+ false ->
+ ok
+ end,
+ % Send a message to wake up and then hibernate. Hibernation here is done to
+ % force a thorough garbage collection.
+ gen_server:cast(self(), wakeup),
+ {noreply, Db, hibernate}.
+
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@@ -1457,3 +1483,29 @@ default_security_object(_DbName) ->
"everyone" ->
[]
end.
+
+% These functions rely on using the process dictionary. This is
+% usually frowned upon however in this case it is done to avoid
+% changing to a different server state record. Once PSE (Pluggable
+% Storage Engine) code lands this should be moved to the #db{} record.
+update_idle_limit_from_config() ->
+ Default = integer_to_list(?IDLE_LIMIT_DEFAULT),
+ IdleLimit = case config:get("couchdb", "idle_check_timeout", Default) of
+ "infinity" ->
+ infinity;
+ Milliseconds ->
+ list_to_integer(Milliseconds)
+ end,
+ put(idle_limit, IdleLimit),
+ IdleLimit.
+
+idle_limit() ->
+ get(idle_limit).
+
+hibernate_if_no_idle_limit() ->
+ case idle_limit() of
+ infinity ->
+ hibernate;
+ Timeout when is_integer(Timeout) ->
+ Timeout
+ end.
diff --git a/src/couch/src/couch_file.erl b/src/couch/src/couch_file.erl
index d40c525f2..8df462b05 100644
--- a/src/couch/src/couch_file.erl
+++ b/src/couch/src/couch_file.erl
@@ -44,6 +44,7 @@
-export([append_term/2, append_term/3, append_term_md5/2, append_term_md5/3]).
-export([write_header/2, read_header/1]).
-export([delete/2, delete/3, nuke_dir/2, init_delete_dir/1]).
+-export([msec_since_last_read/1]).
% gen_server callbacks
-export([init/1, terminate/2, code_change/3]).
@@ -338,12 +339,25 @@ init_status_error(ReturnPid, Ref, Error) ->
ReturnPid ! {Ref, self(), Error},
ignore.
+
+% Return time since last read. The return value is conservative in the
+% sense that if no read timestamp has been found, it would return 0. This
+% result is used to decide if reader is idle so returning 0 will avoid marking
+% it idle by accident when process is starting up.
+msec_since_last_read(Fd) when is_pid(Fd) ->
+ Now = os:timestamp(),
+ LastRead = couch_util:process_dict_get(Fd, read_timestamp, Now),
+ DtMSec = timer:now_diff(Now, LastRead) div 1000,
+ max(0, DtMSec).
+
+
% server functions
init({Filepath, Options, ReturnPid, Ref}) ->
OpenOptions = file_open_options(Options),
Limit = get_pread_limit(),
IsSys = lists:member(sys_db, Options),
+ update_read_timestamp(),
case lists:member(create, Options) of
true ->
filelib:ensure_dir(Filepath),
@@ -422,6 +436,7 @@ handle_call(close, _From, #file{fd=Fd}=File) ->
{stop, normal, file:close(Fd), File#file{fd = nil}};
handle_call({pread_iolist, Pos}, _From, File) ->
+ update_read_timestamp(),
{LenIolist, NextPos} = read_raw_iolist_int(File, Pos, 4),
case iolist_to_binary(LenIolist) of
<<1:1/integer,Len:31/integer>> -> % an MD5-prefixed term
@@ -695,13 +710,10 @@ is_idle(#file{is_sys=false}) ->
{Fd :: pid() | tuple(), FilePath :: string()} | undefined.
process_info(Pid) ->
- {dictionary, Dict} = erlang:process_info(Pid, dictionary),
- case lists:keyfind(couch_file_fd, 1, Dict) of
- false ->
- undefined;
- {couch_file_fd, {Fd, InitialName}} ->
- {Fd, InitialName}
- end.
+ couch_util:process_dict_get(Pid, couch_file_fd).
+
+update_read_timestamp() ->
+ put(read_timestamp, os:timestamp()).
upgrade_state(#file{db_monitor=DbPid}=File) when is_pid(DbPid) ->
unlink(DbPid),
diff --git a/src/couch/src/couch_server.erl b/src/couch/src/couch_server.erl
index ad2a5f0ec..26c6c77a2 100644
--- a/src/couch/src/couch_server.erl
+++ b/src/couch/src/couch_server.erl
@@ -21,6 +21,7 @@
-export([handle_cast/2,code_change/3,handle_info/2,terminate/2]).
-export([dev_start/0,is_admin/2,has_admins/0,get_stats/0]).
-export([close_lru/0]).
+-export([close_db_if_idle/1]).
% config_listener api
-export([handle_config_change/5, handle_config_terminate/3]).
@@ -173,6 +174,15 @@ hash_admin_passwords(Persist) ->
config:set("admins", User, ?b2l(HashedPassword), Persist)
end, couch_passwords:get_unhashed_admins()).
+close_db_if_idle(DbName) ->
+ case ets:lookup(couch_dbs, DbName) of
+ [#db{}] ->
+ gen_server:cast(couch_server, {close_db_if_idle, DbName});
+ _ ->
+ ok
+ end.
+
+
init([]) ->
% read config and register for configuration changes
@@ -508,6 +518,24 @@ handle_cast({update_lru, DbName}, #server{lru = Lru, update_lru_on_read=true} =
{noreply, Server#server{lru = couch_lru:update(DbName, Lru)}};
handle_cast({update_lru, _DbName}, Server) ->
{noreply, Server};
+handle_cast({close_db_if_idle, DbName}, Server) ->
+ case ets:update_element(couch_dbs, DbName, {#db.fd_monitor, locked}) of
+ true ->
+ [#db{main_pid = Pid} = Db] = ets:lookup(couch_dbs, DbName),
+ case couch_db:is_idle(Db) of
+ true ->
+ true = ets:delete(couch_dbs, DbName),
+ true = ets:delete(couch_dbs_pid_to_name, Pid),
+ exit(Pid, kill),
+ {noreply, db_closed(Server, Db#db.options)};
+ false ->
+ true = ets:update_element(couch_dbs, DbName, {#db.fd_monitor, nil}),
+ {noreply, Server}
+ end;
+ false ->
+ {noreply, Server}
+ end;
+
handle_cast(Msg, Server) ->
{stop, {unknown_cast_message, Msg}, Server}.
diff --git a/src/couch/src/couch_util.erl b/src/couch/src/couch_util.erl
index 6001ae2e4..4b848616d 100644
--- a/src/couch/src/couch_util.erl
+++ b/src/couch/src/couch_util.erl
@@ -33,6 +33,7 @@
-export([find_in_binary/2]).
-export([callback_exists/3, validate_callback_exists/3]).
-export([with_proc/4]).
+-export([process_dict_get/2, process_dict_get/3]).
-include_lib("couch/include/couch_db.hrl").
@@ -598,3 +599,21 @@ with_proc(M, F, A, Timeout) ->
erlang:demonitor(Ref, [flush]),
{error, timeout}
end.
+
+
+process_dict_get(Pid, Key) ->
+ process_dict_get(Pid, Key, undefined).
+
+
+process_dict_get(Pid, Key, DefaultValue) ->
+ case process_info(Pid, dictionary) of
+ {dictionary, Dict} ->
+ case lists:keyfind(Key, 1, Dict) of
+ false ->
+ DefaultValue;
+ {Key, Value} ->
+ Value
+ end;
+ undefined ->
+ DefaultValue
+ end.
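
The helper above only wraps erlang:process_info(Pid, dictionary) plus a keyfind, which is what lets couch_file:msec_since_last_read/1 peek at another process's read_timestamp. A small usage sketch, assuming an Erlang shell; the spawned process and example_key are made up for illustration, only couch_util:process_dict_get/2,3 comes from the patch.

Pid = spawn(fun() ->
          put(example_key, os:timestamp()),  % stash a value in the child's process dict
          receive stop -> ok end             % keep the process alive so we can inspect it
      end),
timer:sleep(10),                             % give the child a moment to run put/2
Ts        = couch_util:process_dict_get(Pid, example_key),      % the stored timestamp
undefined = couch_util:process_dict_get(Pid, missing_key),      % /2 defaults to undefined
0         = couch_util:process_dict_get(Pid, missing_key, 0),   % /3 takes an explicit default
Pid ! stop.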