summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorncshaw <ncshaw@ibm.com>2021-11-29 18:04:59 -0500
committerncshaw <ncshaw@ibm.com>2022-03-14 16:17:27 -0500
commit5ed8d54bd834a43a41eba11524171217f141e0cb (patch)
tree59afc77d2ab8d11095c18875c1b13d8bf655f742
parent79577d8adeca8577a4709ea51cc9bbd3e7bb331e (diff)
downloadcouchdb-5ed8d54bd834a43a41eba11524171217f141e0cb.tar.gz
Add smoosh queue persistence
-rwxr-xr-xconfigure1
-rwxr-xr-xdev/run3
-rw-r--r--rel/overlay/etc/default.ini3
-rw-r--r--rel/plugins/eunit_plugin.erl4
-rw-r--r--setup_eunit.template3
-rw-r--r--src/couch/src/couch_bt_engine.erl10
-rw-r--r--src/couch/src/couch_db.erl4
-rw-r--r--src/couch/src/couch_db_engine.erl4
-rw-r--r--src/couch/src/couch_server.erl9
-rw-r--r--src/smoosh/operator_guide.md20
-rw-r--r--src/smoosh/src/smoosh.app.src33
-rw-r--r--src/smoosh/src/smoosh_channel.erl254
-rw-r--r--src/smoosh/src/smoosh_priority_queue.erl122
-rw-r--r--src/smoosh/src/smoosh_server.erl24
-rw-r--r--src/smoosh/src/smoosh_utils.erl31
-rw-r--r--src/smoosh/test/smoosh_priority_queue_tests.erl167
-rw-r--r--src/smoosh/test/smoosh_tests.erl130
17 files changed, 733 insertions, 89 deletions
diff --git a/configure b/configure
index 0bcbfaef3..d8e592b9e 100755
--- a/configure
+++ b/configure
@@ -250,6 +250,7 @@ cat > rel/couchdb.config << EOF
{prefix, "."}.
{data_dir, "./data"}.
{view_index_dir, "./data"}.
+{state_dir, "./data"}.
{log_file, "$LOG_FILE"}.
{fauxton_root, "./share/www"}.
{user, "$COUCHDB_USER"}.
diff --git a/dev/run b/dev/run
index 3ca67a186..05ed16abb 100755
--- a/dev/run
+++ b/dev/run
@@ -299,6 +299,9 @@ def setup_configs(ctx):
"view_index_dir": toposixpath(
ensure_dir_exists(ctx["devdir"], "lib", node, "data")
),
+ "state_dir": toposixpath(
+ ensure_dir_exists(ctx["devdir"], "lib", node, "data")
+ ),
"node_name": "-name %s@127.0.0.1" % node,
"cluster_port": cluster_port,
"backend_port": backend_port,
diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini
index 6b64c6d74..5def66caa 100644
--- a/rel/overlay/etc/default.ini
+++ b/rel/overlay/etc/default.ini
@@ -643,6 +643,9 @@ partitioned||* = true
;[smoosh.slack_views]
;priority = slack
;min_priority = 536870912
+;
+; Directory to store the state of smoosh
+state_dir = {{state_dir}}
[ioq]
; The maximum number of concurrent in-flight IO requests that
diff --git a/rel/plugins/eunit_plugin.erl b/rel/plugins/eunit_plugin.erl
index 69003aba6..8f298db5f 100644
--- a/rel/plugins/eunit_plugin.erl
+++ b/rel/plugins/eunit_plugin.erl
@@ -29,12 +29,14 @@ build_eunit_config(Config0, AppFile) ->
Cwd = filename:absname(rebar_utils:get_cwd()),
DataDir = Cwd ++ "/tmp/data",
ViewIndexDir = Cwd ++ "/tmp/data",
+ StateDir = Cwd ++ "/tmp/data",
TmpDataDir = Cwd ++ "/tmp/tmp_data",
cleanup_dirs([DataDir, TmpDataDir]),
Config1 = rebar_config:set_global(Config0, template, "setup_eunit"),
Config2 = rebar_config:set_global(Config1, prefix, Cwd),
Config3 = rebar_config:set_global(Config2, data_dir, DataDir),
- Config = rebar_config:set_global(Config3, view_index_dir, ViewIndexDir),
+ Config4 = rebar_config:set_global(Config3, view_index_dir, ViewIndexDir),
+ Config = rebar_config:set_global(Config4, state_dir, StateDir),
rebar_templater:create(Config, AppFile).
cleanup_dirs(Dirs) ->
diff --git a/setup_eunit.template b/setup_eunit.template
index 3625441bd..ceef60d12 100644
--- a/setup_eunit.template
+++ b/setup_eunit.template
@@ -7,7 +7,8 @@
{data_dir, "/tmp"},
{prefix, "/tmp"},
- {view_index_dir, "/tmp"}
+ {view_index_dir, "/tmp"},
+ {state_dir, "/tmp"}
]}.
{dir, "tmp"}.
{dir, "tmp/etc"}.
diff --git a/src/couch/src/couch_bt_engine.erl b/src/couch/src/couch_bt_engine.erl
index 7d2390556..42dbed23e 100644
--- a/src/couch/src/couch_bt_engine.erl
+++ b/src/couch/src/couch_bt_engine.erl
@@ -19,6 +19,8 @@
delete/3,
delete_compaction_files/3,
+ is_compacting/1,
+
init/2,
terminate/2,
handle_db_updater_call/2,
@@ -139,6 +141,14 @@ delete_compaction_files(RootDir, FilePath, DelOpts) ->
[".compact", ".compact.data", ".compact.meta"]
).
+is_compacting(DbName) ->
+ lists:any(
+ fun(Ext) ->
+ filelib:is_regular(DbName ++ Ext)
+ end,
+ [".compact", ".compact.data", ".compact.meta"]
+ ).
+
init(FilePath, Options) ->
{ok, Fd} = open_db_file(FilePath, Options),
Header =
diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl
index fa003e16d..cafbb0dc9 100644
--- a/src/couch/src/couch_db.erl
+++ b/src/couch/src/couch_db.erl
@@ -120,6 +120,7 @@
cancel_compact/1,
wait_for_compaction/1,
wait_for_compaction/2,
+ is_compacting/1,
dbname_suffix/1,
normalize_dbname/1,
@@ -277,6 +278,9 @@ wait_for_compaction(#db{main_pid = Pid} = Db, Timeout) ->
ok
end.
+is_compacting(DbName) ->
+ couch_server:is_compacting(DbName).
+
delete_doc(Db, Id, Revisions) ->
DeletedDocs = [#doc{id = Id, revs = [Rev], deleted = true} || Rev <- Revisions],
{ok, [Result]} = update_docs(Db, DeletedDocs, []),
diff --git a/src/couch/src/couch_db_engine.erl b/src/couch/src/couch_db_engine.erl
index de4a42495..9e46b816b 100644
--- a/src/couch/src/couch_db_engine.erl
+++ b/src/couch/src/couch_db_engine.erl
@@ -662,6 +662,7 @@
exists/2,
delete/4,
delete_compaction_files/4,
+ is_compacting/2,
init/3,
terminate/2,
@@ -736,6 +737,9 @@ delete_compaction_files(Engine, RootDir, DbPath, DelOpts) when
->
Engine:delete_compaction_files(RootDir, DbPath, DelOpts).
+is_compacting(Engine, DbName) ->
+ Engine:is_compacting(DbName).
+
init(Engine, DbPath, Options) ->
case Engine:init(DbPath, Options) of
{ok, EngineState} ->
diff --git a/src/couch/src/couch_server.erl b/src/couch/src/couch_server.erl
index 5e295381d..74217894c 100644
--- a/src/couch/src/couch_server.erl
+++ b/src/couch/src/couch_server.erl
@@ -22,6 +22,7 @@
-export([dev_start/0, is_admin/2, has_admins/0, get_stats/0]).
-export([close_db_if_idle/1]).
-export([delete_compaction_files/1]).
+-export([is_compacting/1]).
-export([exists/1]).
-export([get_engine_extensions/0]).
-export([get_engine_path/2]).
@@ -183,6 +184,14 @@ delete_compaction_files(DbName, DelOpts) when is_list(DbName) ->
delete_compaction_files(DbName, DelOpts) when is_binary(DbName) ->
delete_compaction_files(?b2l(DbName), DelOpts).
+is_compacting(DbName) ->
+ lists:any(
+ fun({_, Engine}) ->
+ couch_db_engine:is_compacting(Engine, DbName)
+ end,
+ get_configured_engines()
+ ).
+
maybe_add_sys_db_callbacks(DbName, Options) when is_binary(DbName) ->
maybe_add_sys_db_callbacks(?b2l(DbName), Options);
maybe_add_sys_db_callbacks(DbName, Options) ->
diff --git a/src/smoosh/operator_guide.md b/src/smoosh/operator_guide.md
index afe08f8e7..3dd8cdce5 100644
--- a/src/smoosh/operator_guide.md
+++ b/src/smoosh/operator_guide.md
@@ -107,7 +107,27 @@ However, it's the best measure we currently have.
[Even more info](https://github.com/apache/couchdb-smoosh#notes-on-the-data_size-value).
+#### State diagram
+```
+stateDiagram
+ [*] --> init
+ init --> start_recovery: send_after(?START_DELAY_IN_MSEC, self(), start_recovery)
+ note right of start_recovery
+ activated = false
+ paused = true
+ end note
+ start_recovery --> activate: send_after(?ACTIVATE_DELAY_IN_MSEC, self(), activate)
+ note right of activate
+ state has been recovered
+ activated = true
+ paused = true
+ end note
+ activate --> schedule_unpause
+ schedule_unpause --> [*]: after 30 sec, paused = false and compaction of new jobs begin
+```
+
+See [here](https://mermaid.ink/img/pako:eNqNUtFKwzAU_ZVLnjbpQPCtoFK2PgzUB1sEsVKy5LaNtklJ08kY-3eTtKVsTDBPybnnnntOkiNhiiMJSWeowY2gpaZNJsGuj5tPWK0eQEhhBsTtPGTJ2uQamdqjPoTQoeQ5LQzqxWOSRq9pvomfovd8-5I_J_E6sIS6WCyDi8blICuVQdCirAyo4oIyMNyizIi99cjhHgpadziXWtp3Hje6H2FryOsOp3NNH2GSOzcfrdPtW5TGf_mfuq46n4qzMX-pUNEOdogSRgPIr6ea3f8r1NQ6vAirkPc15r30jWPuC9RT7buG4PPC3a1NxoJ5lr9YoHYOU03rJijpokn8gS-1czlKIUlAGtQNFdx-nKMblRFTYYMZCe2WU_2dkUyeLK9vubUYc2GUJqGXDwjtjUoOkpHQZZtI4-cbWadfC0TavA) for a diagram of smoosh's initial state during the recovery process.
### Defining a channel
Defining a channel is done via normal dbcore configuration, with some
diff --git a/src/smoosh/src/smoosh.app.src b/src/smoosh/src/smoosh.app.src
index a6cdb7f5e..4549c6610 100644
--- a/src/smoosh/src/smoosh.app.src
+++ b/src/smoosh/src/smoosh.app.src
@@ -10,20 +10,19 @@
% License for the specific language governing permissions and limitations under
% the License.
-{application, smoosh,
- [
- {description, "Auto-compaction daemon"},
- {vsn, git},
- {registered, [smoosh_server]},
- {applications, [
- kernel,
- stdlib,
- couch_log,
- config,
- couch_event,
- couch,
- mem3
- ]},
- {mod, { smoosh_app, []}},
- {env, []}
- ]}.
+{application, smoosh, [
+ {description, "Auto-compaction daemon"},
+ {vsn, git},
+ {registered, [smoosh_server]},
+ {applications, [
+ kernel,
+ stdlib,
+ couch_log,
+ config,
+ couch_event,
+ couch,
+ mem3
+ ]},
+ {mod, {smoosh_app, []}},
+ {env, []}
+]}.
diff --git a/src/smoosh/src/smoosh_channel.erl b/src/smoosh/src/smoosh_channel.erl
index 06849ac3a..064f18afc 100644
--- a/src/smoosh/src/smoosh_channel.erl
+++ b/src/smoosh/src/smoosh_channel.erl
@@ -16,8 +16,8 @@
-include_lib("couch/include/couch_db.hrl").
% public api.
--export([start_link/1, close/1, suspend/1, resume/1, get_status/1]).
--export([enqueue/3, last_updated/2, flush/1]).
+-export([start_link/1, close/1, suspend/1, resume/1, activate/1, get_status/1]).
+-export([enqueue/3, last_updated/2, flush/1, is_key/2, is_activated/1, persist/1]).
% gen_server api.
-export([
@@ -25,18 +25,38 @@
handle_call/3,
handle_cast/2,
handle_info/2,
- code_change/3,
terminate/2
]).
+-define(VSN, 1).
+-define(CHECKPOINT_INTERVAL_IN_MSEC, 180000).
+
+-ifndef(TEST).
+-define(START_DELAY_IN_MSEC, 60000).
+-define(ACTIVATE_DELAY_IN_MSEC, 30000).
+-else.
+-define(START_DELAY_IN_MSEC, 0).
+-define(ACTIVATE_DELAY_IN_MSEC, 0).
+-endif.
+
% records.
+% When the state is set to activated = true, the channel has completed the state
+% recovery process that occurs on (re)start and is accepting new compaction jobs.
+% Note: if activated = false and a request for a new compaction job is received,
+% smoosh will enqueue this new job after the state recovery process has finished.
+% When the state is set to paused = false, the channel is actively compacting any
+% compaction jobs that are scheduled.
+% See operator_guide.md --> State diagram.
+
-record(state, {
active = [],
name,
- waiting = smoosh_priority_queue:new(),
+ waiting,
paused = true,
- starting = []
+ starting = [],
+ activated = false,
+ requests = []
}).
% public functions.
@@ -48,7 +68,10 @@ suspend(ServerRef) ->
gen_server:call(ServerRef, suspend).
resume(ServerRef) ->
- gen_server:call(ServerRef, resume).
+ gen_server:call(ServerRef, resume_and_activate).
+
+activate(ServerRef) ->
+ gen_server:call(ServerRef, activate).
enqueue(ServerRef, Object, Priority) ->
gen_server:cast(ServerRef, {enqueue, Object, Priority}).
@@ -65,32 +88,42 @@ close(ServerRef) ->
flush(ServerRef) ->
gen_server:call(ServerRef, flush).
+is_key(ServerRef, Key) ->
+ gen_server:call(ServerRef, {is_key, Key}).
+
+is_activated(ServerRef) ->
+ gen_server:call(ServerRef, is_activated).
+
+persist(ServerRef) ->
+ gen_server:call(ServerRef, persist).
+
% gen_server functions.
init(Name) ->
- schedule_unpause(),
erlang:send_after(60 * 1000, self(), check_window),
- {ok, #state{name = Name}}.
+ process_flag(trap_exit, true),
+ Waiting = smoosh_priority_queue:new(Name),
+ State = #state{name = Name, waiting = Waiting, paused = true, activated = false},
+ erlang:send_after(?START_DELAY_IN_MSEC, self(), start_recovery),
+ {ok, State}.
-handle_call({last_updated, Object}, _From, State0) ->
- {ok, State} = code_change(nil, State0, nil),
+handle_call({last_updated, Object}, _From, State) ->
LastUpdated = smoosh_priority_queue:last_updated(Object, State#state.waiting),
{reply, LastUpdated, State};
-handle_call(suspend, _From, State0) ->
- {ok, State} = code_change(nil, State0, nil),
+handle_call(suspend, _From, State) ->
#state{active = Active} = State,
[
catch erlang:suspend_process(Pid, [unless_suspending])
|| {_, Pid} <- Active
],
{reply, ok, State#state{paused = true}};
-handle_call(resume, _From, State0) ->
- {ok, State} = code_change(nil, State0, nil),
+handle_call(resume_and_activate, _From, State) ->
#state{active = Active} = State,
[catch erlang:resume_process(Pid) || {_, Pid} <- Active],
- {reply, ok, State#state{paused = false}};
-handle_call(status, _From, State0) ->
- {ok, State} = code_change(nil, State0, nil),
+ {reply, ok, State#state{paused = false, activated = true}};
+handle_call(activate, _From, State) ->
+ {reply, ok, State#state{activated = true}};
+handle_call(status, _From, State) ->
{reply,
{ok, [
{active, length(State#state.active)},
@@ -98,27 +131,38 @@ handle_call(status, _From, State0) ->
{waiting, smoosh_priority_queue:info(State#state.waiting)}
]},
State};
-handle_call(close, _From, State0) ->
- {ok, State} = code_change(nil, State0, nil),
+handle_call(close, _From, State) ->
{stop, normal, ok, State};
-handle_call(flush, _From, State0) ->
- {ok, State} = code_change(nil, State0, nil),
- {reply, ok, State#state{waiting = smoosh_priority_queue:new()}}.
+handle_call(flush, _From, #state{waiting = Q} = State) ->
+ {reply, ok, State#state{waiting = smoosh_priority_queue:flush(Q)}};
+handle_call({is_key, Key}, _From, #state{waiting = Waiting} = State) ->
+ {reply, smoosh_priority_queue:is_key(Key, Waiting), State};
+handle_call(is_activated, _From, #state{activated = Activated} = State0) ->
+ {reply, Activated, State0};
+handle_call(persist, _From, State) ->
+ persist_queue(State),
+ {reply, ok, State}.
-handle_cast({enqueue, _Object, 0}, State0) ->
- {ok, State} = code_change(nil, State0, nil),
+handle_cast({enqueue, _Object, 0}, #state{} = State) ->
{noreply, State};
-handle_cast({enqueue, Object, Priority}, State0) ->
- {ok, State} = code_change(nil, State0, nil),
- {noreply, maybe_start_compaction(add_to_queue(Object, Priority, State))}.
+handle_cast({enqueue, Object, Priority}, #state{activated = true} = State) ->
+ {noreply, maybe_start_compaction(add_to_queue(Object, Priority, State))};
+handle_cast({enqueue, Object, Priority}, #state{activated = false, requests = Requests} = State0) ->
+ couch_log:notice(
+ "~p Channel is not activated yet. Adding ~p to requests with priority ~p.", [
+ ?MODULE,
+ Object,
+ Priority
+ ]
+ ),
+ {noreply, State0#state{requests = [{Object, Priority} | Requests]}}.
% We accept noproc here due to possibly having monitored a restarted compaction
% pid after it finished.
-handle_info({'DOWN', Ref, _, Job, Reason}, State0) when
+handle_info({'DOWN', Ref, _, Job, Reason}, State) when
Reason == normal;
Reason == noproc
->
- {ok, State} = code_change(nil, State0, nil),
#state{active = Active, starting = Starting} = State,
{noreply,
maybe_start_compaction(
@@ -127,8 +171,7 @@ handle_info({'DOWN', Ref, _, Job, Reason}, State0) when
starting = lists:keydelete(Ref, 1, Starting)
}
)};
-handle_info({'DOWN', Ref, _, Job, Reason}, State0) ->
- {ok, State} = code_change(nil, State0, nil),
+handle_info({'DOWN', Ref, _, Job, Reason}, State) ->
#state{active = Active0, starting = Starting0} = State,
case lists:keytake(Job, 2, Active0) of
{value, {Key, _Pid}, Active1} ->
@@ -142,7 +185,8 @@ handle_info({'DOWN', Ref, _, Job, Reason}, State0) ->
case lists:keytake(Ref, 1, Starting0) of
{value, {_, Key}, Starting1} ->
couch_log:warning("failed to start compaction of ~p: ~p", [
- smoosh_utils:stringify(Key), Reason
+ smoosh_utils:stringify(Key),
+ Reason
]),
{ok, _} = timer:apply_after(5000, smoosh_server, enqueue, [Key]),
{noreply, maybe_start_compaction(State#state{starting = Starting1})};
@@ -150,8 +194,7 @@ handle_info({'DOWN', Ref, _, Job, Reason}, State0) ->
{noreply, State}
end
end;
-handle_info({Ref, {ok, Pid}}, State0) when is_reference(Ref) ->
- {ok, State} = code_change(nil, State0, nil),
+handle_info({Ref, {ok, Pid}}, State) when is_reference(Ref) ->
case lists:keytake(Ref, 1, State#state.starting) of
{value, {_, Key}, Starting1} ->
couch_log:notice(
@@ -167,8 +210,7 @@ handle_info({Ref, {ok, Pid}}, State0) when is_reference(Ref) ->
false ->
{noreply, State}
end;
-handle_info(check_window, State0) ->
- {ok, State} = code_change(nil, State0, nil),
+handle_info(check_window, State) ->
#state{paused = Paused, name = Name} = State,
StrictWindow = smoosh_utils:get(Name, "strict_window", "false"),
FinalState =
@@ -194,18 +236,113 @@ handle_info(check_window, State0) ->
end,
erlang:send_after(60 * 1000, self(), check_window),
{noreply, FinalState};
-handle_info(pause, State0) ->
- {ok, State} = code_change(nil, State0, nil),
+handle_info(start_recovery, #state{name = Name, waiting = Waiting0} = State0) ->
+ RecActive = recover(active_file_name(Name)),
+ Waiting1 = lists:foldl(
+ fun(DbName, Acc) ->
+ case couch_db:is_compacting(DbName) of
+ true ->
+ Priority = smoosh_server:get_priority(Name, DbName),
+ smoosh_priority_queue:in(DbName, Priority, Priority, Acc);
+ false ->
+ Acc
+ end
+ end,
+ Waiting0,
+ RecActive
+ ),
+ State1 = maybe_start_compaction(State0#state{paused = false, waiting = Waiting1}),
+ couch_log:notice(
+ "~p Previously active compaction jobs (if any) have been successfully recovered and restarted.",
+ [?MODULE]
+ ),
+ erlang:send_after(?ACTIVATE_DELAY_IN_MSEC, self(), activate),
+ {noreply, State1#state{paused = true}};
+handle_info(activate, State) ->
+ {noreply, activate_channel(State)};
+handle_info(persist, State) ->
+ persist_queue(State),
+ erlang:send_after(?CHECKPOINT_INTERVAL_IN_MSEC, self(), persist),
+ {noreply, State};
+handle_info(pause, State) ->
{noreply, State#state{paused = true}};
-handle_info(unpause, State0) ->
- {ok, State} = code_change(nil, State0, nil),
+handle_info(unpause, State) ->
{noreply, maybe_start_compaction(State#state{paused = false})}.
terminate(_Reason, _State) ->
ok.
-code_change(_OldVsn, #state{} = State, _Extra) ->
- {ok, State}.
+persist_queue(State) ->
+ write_state_to_file(State).
+
+recover(FilePath) ->
+ case do_recover(FilePath) of
+ {ok, List} ->
+ List;
+ error ->
+ []
+ end.
+
+do_recover(FilePath) ->
+ case file:read_file(FilePath) of
+ {ok, Content} ->
+ <<Vsn, Binary/binary>> = Content,
+ try parse_state(Vsn, ?VSN, Binary) of
+ Term ->
+ couch_log:notice(
+ "~p Successfully restored state file ~s", [?MODULE, FilePath]
+ ),
+ {ok, Term}
+ catch
+ error:Reason ->
+ couch_log:error(
+ "~p Invalid state file (~p). Deleting ~s", [?MODULE, Reason, FilePath]
+ ),
+ file:delete(FilePath),
+ error
+ end;
+ {error, enoent} ->
+ couch_log:notice(
+ "~p (~p) State file ~s does not exist. Not restoring.", [?MODULE, enoent, FilePath]
+ ),
+ error;
+ {error, Reason} ->
+ couch_log:error(
+ "~p Cannot read the state file (~p). Deleting ~s", [?MODULE, Reason, FilePath]
+ ),
+ file:delete(FilePath),
+ error
+ end.
+
+parse_state(1, ?VSN, Binary) ->
+ erlang:binary_to_term(Binary, [safe]);
+parse_state(Vsn, ?VSN, _) ->
+ error({unsupported_version, Vsn}).
+
+write_state_to_file(#state{name = Name, active = Active, starting = Starting, waiting = Waiting}) ->
+ Active1 = lists:foldl(
+ fun({DbName, _}, Acc) ->
+ [DbName | Acc]
+ end,
+ [],
+ Active
+ ),
+ Starting1 = lists:foldl(
+ fun({_, DbName}, Acc) ->
+ [DbName | Acc]
+ end,
+ [],
+ Starting
+ ),
+ smoosh_utils:write_to_file(Active1, active_file_name(Name), ?VSN),
+ smoosh_utils:write_to_file(Starting1, starting_file_name(Name), ?VSN),
+ smoosh_priority_queue:write_to_file(Waiting).
+
+active_file_name(Name) ->
+ filename:join(config:get("smoosh", "state_dir", "."), Name ++ ".active").
+
+starting_file_name(Name) ->
+ filename:join(config:get("smoosh", "state_dir", "."), Name ++ ".starting").
% private functions.
@@ -225,6 +362,37 @@ add_to_queue(Key, Priority, State) ->
}
end.
+maybe_activate(#state{activated = true} = State) ->
+ State;
+maybe_activate(State) ->
+ activate_channel(State).
+
+activate_channel(#state{name = Name, waiting = Waiting0, requests = Requests0} = State0) ->
+ RecStarting = recover(starting_file_name(Name)),
+ Starting = lists:foldl(
+ fun(DbName, Acc) ->
+ Priority = smoosh_server:get_priority(Name, DbName),
+ smoosh_priority_queue:in(DbName, Priority, Priority, Acc)
+ end,
+ Waiting0,
+ RecStarting
+ ),
+ Waiting1 = smoosh_priority_queue:recover(Starting),
+ Requests1 = lists:reverse(Requests0),
+ Waiting2 = lists:foldl(
+ fun({DbName, Priority}, Acc) ->
+ smoosh_priority_queue:in(DbName, Priority, Priority, Acc)
+ end,
+ Waiting1,
+ Requests1
+ ),
+ State1 = maybe_start_compaction(State0#state{
+ waiting = Waiting2, paused = false, activated = true, requests = []
+ }),
+ handle_info(persist, State1),
+ schedule_unpause(),
+ State1#state{paused = true}.
+
maybe_start_compaction(#state{paused = true} = State) ->
State;
maybe_start_compaction(State) ->
@@ -239,7 +407,7 @@ maybe_start_compaction(State) ->
length(State#state.active) + length(State#state.starting) < Concurrency ->
case smoosh_priority_queue:out(State#state.waiting) of
false ->
- State;
+ maybe_activate(State);
{Key, Priority, Q} ->
try
State2 =
diff --git a/src/smoosh/src/smoosh_priority_queue.erl b/src/smoosh/src/smoosh_priority_queue.erl
index b6f4b6dd8..83c93663c 100644
--- a/src/smoosh/src/smoosh_priority_queue.erl
+++ b/src/smoosh/src/smoosh_priority_queue.erl
@@ -12,33 +12,68 @@
-module(smoosh_priority_queue).
--export([new/0, last_updated/2, is_key/2, in/4, in/5, out/1, size/1, info/1]).
+-export([new/1, recover/1]).
+
+-export([last_updated/2, is_key/2, in/4, in/5, out/1, size/1, info/1]).
+
+-export([flush/1]).
+
+-export([from_list/2, to_list/1]).
+
+-export([is_empty/1]).
+
+-export([file_name/1, write_to_file/1]).
+
+-define(VSN, 1).
-record(priority_queue, {
- dict = dict:new(),
- tree = gb_trees:empty()
+ name,
+ map,
+ tree
}).
-new() ->
- #priority_queue{}.
+new(Name) ->
+ #priority_queue{name = Name, map = maps:new(), tree = gb_trees:empty()}.
+
+recover(#priority_queue{name = Name, map = Map0} = Q) ->
+ case do_recover(file_name(Q)) of
+ {ok, Terms} ->
+ Map = maps:merge(Map0, Terms),
+ Tree = maps:fold(
+ fun(Key, {TreeKey, Value}, TreeAcc) ->
+ gb_trees:enter(TreeKey, {Key, Value}, TreeAcc)
+ end,
+ gb_trees:empty(),
+ Map
+ ),
+ #priority_queue{name = Name, map = Map, tree = Tree};
+ error ->
+ Q
+ end.
+
+write_to_file(#priority_queue{map = Map} = Q) ->
+ smoosh_utils:write_to_file(Map, file_name(Q), ?VSN).
+
+flush(#priority_queue{name = Name} = Q) ->
+ Q#priority_queue{name = Name, map = maps:new(), tree = gb_trees:empty()}.
-last_updated(Key, #priority_queue{dict = Dict}) ->
- case dict:find(Key, Dict) of
+last_updated(Key, #priority_queue{map = Map}) ->
+ case maps:find(Key, Map) of
{ok, {_Priority, {LastUpdatedMTime, _MInt}}} ->
LastUpdatedMTime;
error ->
false
end.
-is_key(Key, #priority_queue{dict = Dict}) ->
- dict:is_key(Key, Dict).
+is_key(Key, #priority_queue{map = Map}) ->
+ maps:is_key(Key, Map).
in(Key, Value, Priority, Q) ->
in(Key, Value, Priority, infinity, Q).
-in(Key, Value, Priority, Capacity, #priority_queue{dict = Dict, tree = Tree}) ->
+in(Key, Value, Priority, Capacity, #priority_queue{name = Name, map = Map, tree = Tree}) ->
Tree1 =
- case dict:find(Key, Dict) of
+ case maps:find(Key, Map) of
{ok, TreeKey} ->
gb_trees:delete_any(TreeKey, Tree);
error ->
@@ -47,17 +82,18 @@ in(Key, Value, Priority, Capacity, #priority_queue{dict = Dict, tree = Tree}) ->
Now = {erlang:monotonic_time(), erlang:unique_integer([monotonic])},
TreeKey1 = {Priority, Now},
Tree2 = gb_trees:enter(TreeKey1, {Key, Value}, Tree1),
- Dict1 = dict:store(Key, TreeKey1, Dict),
- truncate(Capacity, #priority_queue{dict = Dict1, tree = Tree2}).
+ Map1 = maps:put(Key, TreeKey1, Map),
+ truncate(Capacity, #priority_queue{name = Name, map = Map1, tree = Tree2}).
-out(#priority_queue{dict = Dict, tree = Tree}) ->
+out(#priority_queue{name = Name, map = Map, tree = Tree}) ->
case gb_trees:is_empty(Tree) of
true ->
false;
false ->
{_, {Key, Value}, Tree1} = gb_trees:take_largest(Tree),
- Dict1 = dict:erase(Key, Dict),
- {Key, Value, #priority_queue{dict = Dict1, tree = Tree1}}
+ Map1 = maps:remove(Key, Map),
+ Q = #priority_queue{name = Name, map = Map1, tree = Tree1},
+ {Key, Value, Q}
end.
size(#priority_queue{tree = Tree}) ->
@@ -76,6 +112,20 @@ info(#priority_queue{tree = Tree} = Q) ->
end
].
+from_list(Orddict, #priority_queue{name = Name}) ->
+ Map = maps:from_list(Orddict),
+ Tree = gb_trees:from_orddict(Orddict),
+ #priority_queue{name = Name, map = Map, tree = Tree}.
+
+to_list(#priority_queue{tree = Tree}) ->
+ gb_trees:to_list(Tree).
+
+is_empty(#priority_queue{tree = Tree}) ->
+ gb_trees:is_empty(Tree).
+
+file_name(#priority_queue{name = Name}) ->
+ filename:join(config:get("smoosh", "state_dir", "."), Name ++ ".waiting").
+
truncate(infinity, Q) ->
Q;
truncate(Capacity, Q) when Capacity > 0 ->
@@ -83,7 +133,43 @@ truncate(Capacity, Q) when Capacity > 0 ->
truncate(Capacity, Size, Q) when Size =< Capacity ->
Q;
-truncate(Capacity, Size, #priority_queue{dict = Dict, tree = Tree}) when Size > 0 ->
+truncate(Capacity, Size, #priority_queue{name = Name, map = Map, tree = Tree}) when Size > 0 ->
{_, {Key, _}, Tree1} = gb_trees:take_smallest(Tree),
- Q1 = #priority_queue{dict = dict:erase(Key, Dict), tree = Tree1},
+ Q1 = #priority_queue{name = Name, map = maps:remove(Key, Map), tree = Tree1},
truncate(Capacity, ?MODULE:size(Q1), Q1).
+
+do_recover(FilePath) ->
+ case file:read_file(FilePath) of
+ {ok, Content} ->
+ <<Vsn, Binary/binary>> = Content,
+ try parse_queue(Vsn, ?VSN, Binary) of
+ Bin ->
+ couch_log:notice(
+ "~p Successfully restored state file ~s", [?MODULE, FilePath]
+ ),
+ {ok, Bin}
+ catch
+ error:Reason ->
+ couch_log:error(
+ "~p Invalid queue file (~p). Deleting ~s", [?MODULE, Reason, FilePath]
+ ),
+ file:delete(FilePath),
+ error
+ end;
+ {error, enoent} ->
+ couch_log:notice(
+ "~p (~p) Queue file ~s does not exist. Not restoring.", [?MODULE, enoent, FilePath]
+ ),
+ error;
+ {error, Reason} ->
+ couch_log:error(
+ "~p Cannot read the queue file (~p). Deleting ~s", [?MODULE, Reason, FilePath]
+ ),
+ file:delete(FilePath),
+ error
+ end.
+
+parse_queue(1, ?VSN, Binary) ->
+ erlang:binary_to_term(Binary, [safe]);
+parse_queue(Vsn, ?VSN, _) ->
+ error({unsupported_version, Vsn}).
diff --git a/src/smoosh/src/smoosh_server.erl b/src/smoosh/src/smoosh_server.erl
index 5529e93de..e9823c36f 100644
--- a/src/smoosh/src/smoosh_server.erl
+++ b/src/smoosh/src/smoosh_server.erl
@@ -45,6 +45,10 @@
% exported but for internal use.
-export([enqueue_request/2]).
+-export([get_priority/2]).
+
+% exported for testing and debugging
+-export([get_channel/1]).
-ifdef(TEST).
-define(RELISTEN_DELAY, 50).
@@ -60,7 +64,7 @@
schema_channels = [],
tab,
event_listener,
- waiting = dict:new()
+ waiting = maps:new()
}).
-record(channel, {
@@ -109,6 +113,10 @@ handle_db_event(DbName, {schema_updated, DDocId}, St) ->
handle_db_event(_DbName, _Event, St) ->
{ok, St}.
+% for testing and debugging only
+get_channel(ChannelName) ->
+ gen_server:call(?MODULE, {get_channel, ChannelName}).
+
% gen_server functions.
init([]) ->
@@ -181,7 +189,9 @@ handle_call(resume, _From, State) ->
0,
State#state.tab
),
- {reply, ok, State}.
+ {reply, ok, State};
+handle_call({get_channel, ChannelName}, _From, #state{tab = Tab} = State) ->
+ {reply, {ok, channel_pid(Tab, ChannelName)}, State}.
handle_cast({new_db_channels, Channels}, State) ->
[
@@ -203,12 +213,12 @@ handle_cast({new_schema_channels, Channels}, State) ->
{noreply, create_missing_channels(State#state{view_channels = Channels})};
handle_cast({enqueue, Object}, State) ->
#state{waiting = Waiting} = State,
- case dict:is_key(Object, Waiting) of
+ case maps:is_key(Object, Waiting) of
true ->
{noreply, State};
false ->
{_Pid, Ref} = spawn_monitor(?MODULE, enqueue_request, [State, Object]),
- {noreply, State#state{waiting = dict:store(Object, Ref, Waiting)}}
+ {noreply, State#state{waiting = maps:put(Object, Ref, Waiting)}}
end.
handle_info({'EXIT', Pid, Reason}, #state{event_listener = Pid} = State) ->
@@ -225,7 +235,7 @@ handle_info({'EXIT', Pid, Reason}, State) ->
end,
{noreply, create_missing_channels(State)};
handle_info({'DOWN', Ref, _, _, _}, State) ->
- Waiting = dict:filter(
+ Waiting = maps:filter(
fun(_Key, Value) -> Value =/= Ref end,
State#state.waiting
),
@@ -306,7 +316,9 @@ find_channel(Tab, [Channel | Rest], Object) ->
?SECONDS_PER_MINUTE,
Staleness = erlang:convert_time_unit(StalenessInSec, seconds, native),
Now = erlang:monotonic_time(),
- case LastUpdated =:= false orelse Now - LastUpdated > Staleness of
+ Activated = smoosh_channel:is_activated(Pid),
+ StaleEnough = LastUpdated =:= false orelse Now - LastUpdated > Staleness,
+ case Activated andalso StaleEnough of
true ->
case smoosh_utils:ignore_db(Object) of
true ->
diff --git a/src/smoosh/src/smoosh_utils.erl b/src/smoosh/src/smoosh_utils.erl
index 882d3ec84..625a9e4e0 100644
--- a/src/smoosh/src/smoosh_utils.erl
+++ b/src/smoosh/src/smoosh_utils.erl
@@ -14,9 +14,7 @@
-include_lib("couch/include/couch_db.hrl").
-export([get/2, get/3, group_pid/1, split/1, stringify/1, ignore_db/1]).
--export([
- in_allowed_window/1
-]).
+-export([in_allowed_window/1, write_to_file/3]).
group_pid({Shard, GroupId}) ->
case couch_view_group:open_db_group(Shard, GroupId) of
@@ -75,6 +73,33 @@ in_allowed_window(From, To) ->
({HH, MM} >= From) orelse ({HH, MM} < To)
end.
+file_delete(Path) ->
+ case file:delete(Path) of
+ Ret when Ret =:= ok; Ret =:= {error, enoent} ->
+ ok;
+ Error ->
+ Error
+ end.
+
+throw_on_error(_Args, ok) ->
+ ok;
+throw_on_error(Args, {error, Reason}) ->
+ throw({error, {Reason, Args}}).
+
+write_to_file(Content, FileName, VSN) ->
+ couch_log:notice("~p Writing state to state file ~s", [?MODULE, FileName]),
+ OnDisk = <<VSN, (erlang:term_to_binary(Content, [compressed, {minor_version, 1}]))/binary>>,
+ TmpFileName = FileName ++ ".tmp",
+ try
+ throw_on_error(TmpFileName, file_delete(TmpFileName)),
+ throw_on_error(TmpFileName, file:write_file(TmpFileName, OnDisk, [sync])),
+ throw_on_error(FileName, file_delete(FileName)),
+ throw_on_error([TmpFileName, FileName], file:rename(TmpFileName, FileName))
+ catch
+ throw:Error ->
+ Error
+ end.
+
parse_time(undefined, Default) ->
Default;
parse_time(String, Default) ->
diff --git a/src/smoosh/test/smoosh_priority_queue_tests.erl b/src/smoosh/test/smoosh_priority_queue_tests.erl
new file mode 100644
index 000000000..289804ca5
--- /dev/null
+++ b/src/smoosh/test/smoosh_priority_queue_tests.erl
@@ -0,0 +1,167 @@
+-module(smoosh_priority_queue_tests).
+
+-include_lib("proper/include/proper.hrl").
+-include_lib("couch/include/couch_eunit.hrl").
+
+-define(PROP_PREFIX, "prop_").
+
+-define(CAPACITY, 3).
+
+-define(RANDOM_CHANNEL, lists:flatten(io_lib:format("~p", [erlang:timestamp()]))).
+
+%% EUnit fixture setup: boot a test CouchDB instance and return its
+%% context for teardown/1.
+setup() ->
+ Ctx = test_util:start_couch(),
+ Ctx.
+
+%% EUnit fixture cleanup: stop the CouchDB instance started by setup/0.
+teardown(Ctx) ->
+ test_util:stop_couch(Ctx).
+
+%% Top-level EUnit fixture for the priority-queue tests.
+%%
+%% The helpers below return `?_test(...)' test objects (and
+%% prop_inverse_test_/0 returns a test representation).  Referencing
+%% them as `fun name/0' makes EUnit treat each fun as a *simple* test:
+%% the fun is called, its returned test object is discarded, and the
+%% wrapped assertions never execute.  Calling the helpers here instead
+%% places the returned test objects into the test set so they run.
+smoosh_priority_queue_test_() ->
+    {
+        "smoosh priority queue test",
+        {
+            setup,
+            fun setup/0,
+            fun teardown/1,
+            [
+                prop_inverse_test_(),
+                no_halt_on_corrupted_file_test(),
+                no_halt_on_missing_file_test()
+            ]
+        }
+    }.
+
+%% ==========
+%% Tests
+%% ----------
+
+%% define all tests to be able to run them individually
+prop_inverse_test_() ->
+ ?_test(begin
+ test_property(prop_inverse)
+ end).
+
+%% Overwrite the queue's backing file with bytes that cannot be decoded
+%% and check recover/1 falls back to the fresh queue instead of crashing.
+%% Uses a timestamp-derived channel name so runs don't share files.
+no_halt_on_corrupted_file_test() ->
+ ?_test(begin
+ Name = ?RANDOM_CHANNEL,
+ Q = smoosh_priority_queue:new(Name),
+ FilePath = smoosh_priority_queue:file_name(Q),
+ ok = file:write_file(FilePath, <<"garbage">>),
+ ?assertEqual(Q, smoosh_priority_queue:recover(Q)),
+ ok
+ end).
+
+%% Recovering a queue whose backing file is absent must return the
+%% fresh queue rather than crash.
+no_halt_on_missing_file_test() ->
+    ?_test(begin
+        Name = ?RANDOM_CHANNEL,
+        Q = smoosh_priority_queue:new(Name),
+        FilePath = smoosh_priority_queue:file_name(Q),
+        %% Ensure the file is absent.  The original matched
+        %% `ok = file:delete(FilePath)', which crashes with
+        %% {error, enoent} when the queue never wrote its file —
+        %% yet "missing" is exactly the state this test wants.
+        case file:delete(FilePath) of
+            ok -> ok;
+            {error, enoent} -> ok
+        end,
+        ?assertEqual(Q, smoosh_priority_queue:recover(Q)),
+        ok
+    end).
+
+%% ==========
+%% Properties
+%% ----------
+
+%% Property: serialising a queue with to_list/1 and rebuilding it with
+%% from_list/2 yields a queue that dequeues the same elements in the
+%% same order (compared via the local equal/2 helper).
+prop_inverse() ->
+ ?FORALL(
+ Q,
+ queue(),
+ begin
+ List = smoosh_priority_queue:to_list(Q),
+ equal(Q, smoosh_priority_queue:from_list(List, Q))
+ end
+ ).
+
+%% ==========
+%% Generators
+%% ----------
+
+%% PropEr generators for queue entries: keys and values are either a
+%% binary or a {binary, binary} pair (mirroring db-name vs
+%% {shard, group} keys); priorities are arbitrary integers.
+key() ->
+ proper_types:oneof([proper_types:binary(), {proper_types:binary(), proper_types:binary()}]).
+value() ->
+ proper_types:oneof([proper_types:binary(), {proper_types:binary(), proper_types:binary()}]).
+priority() -> integer().
+item() -> {key(), value(), priority()}.
+
+%% Generator for a list of queue items.  (The original wrapped this in
+%% an identity ?LET — `?LET(L, list(item()), L)' — which is equivalent
+%% to the bare generator.)
+items_list() ->
+    list(item()).
+
+%% Generator: fold a random items_list/0 into a freshly recovered queue
+%% via the local from_list/1 helper (capped at ?CAPACITY entries).
+simple_queue() ->
+ ?LET(
+ L,
+ items_list(),
+ from_list(L)
+ ).
+
+%% Generator: take a simple_queue/0 with one extra item pushed in, then
+%% with 2:1 frequency either keep it as-is or pop one element
+%% (element(3, out(Q)) is the remaining queue), so properties also see
+%% queues that have had removals.
+with_deleted() ->
+ ?LET(
+ Q,
+ ?LET(
+ {{K0, V0, P0}, Q0},
+ {item(), simple_queue()},
+ smoosh_priority_queue:in(K0, V0, P0, ?CAPACITY, Q0)
+ ),
+ frequency([
+ {1, Q},
+ {2, element(3, smoosh_priority_queue:out(Q))}
+ ])
+ ).
+
+%% Top-level queue generator used by the properties.
+queue() ->
+ with_deleted().
+
+%% ==========================
+%% Proper related boilerplate
+%% --------------------------
+
+%% Wrap the property named by the atom Property (a local prop_* fun)
+%% into an EUnit test representation {Name, {timeout, 60, Fun}},
+%% stripping the "prop_" prefix for the display name.
+%% NOTE(review): string:sub_string/2 is deprecated in modern OTP;
+%% string:slice/2 is the current equivalent.
+test_property(Property) when is_atom(Property) ->
+ test_property({atom_to_list(Property), Property});
+test_property({Id, Property}) ->
+ Name = string:sub_string(Id, length(?PROP_PREFIX) + 1),
+ Opts = [long_result, {numtests, 1000}, {to_file, user}],
+ {Name, {timeout, 60, fun() -> test_it(Property, Opts) end}}.
+
+%% Run the property through proper:quickcheck/2; return true on success
+%% or raise an error tuple describing the failure (counterexample in
+%% {result, Else}) so EUnit reports the test as failed.
+test_it(Property, Opts) ->
+ case proper:quickcheck(?MODULE:Property(), Opts) of
+ true ->
+ true;
+ Else ->
+ erlang:error(
+ {propertyFailed, [
+ {module, ?MODULE},
+ {property, Property},
+ {result, Else}
+ ]}
+ )
+ end.
+
+%% ================
+%% Helper functions
+%% ----------------
+
+%% Fresh empty queue for the fixed channel name "foo", run through
+%% recover/1 so it is in the same post-recovery state the properties
+%% compare against.
+new() ->
+ Q = smoosh_priority_queue:new("foo"),
+ smoosh_priority_queue:recover(Q).
+
+%% Build a queue by inserting each {Key, Value, Priority} item in order,
+%% bounded by ?CAPACITY (lower-priority entries may be evicted).
+from_list(List) ->
+ lists:foldl(
+ fun({Key, Value, Priority}, Queue) ->
+ smoosh_priority_queue:in(Key, Value, Priority, ?CAPACITY, Queue)
+ end,
+ new(),
+ List
+ ).
+
+%% Two queues are considered equal when draining them yields the same
+%% {Key, Value} sequence (priorities are reflected in the order).
+equal(Q1, Q2) ->
+ out_all(Q1) =:= out_all(Q2).
+
+%% Drain the queue, returning its elements in dequeue order.
+out_all(Q) ->
+ out_all(Q, []).
+out_all(Q0, Acc) ->
+ case smoosh_priority_queue:out(Q0) of
+ {K, V, Q1} -> out_all(Q1, [{K, V} | Acc]);
+ false -> lists:reverse(Acc)
+ end.
diff --git a/src/smoosh/test/smoosh_tests.erl b/src/smoosh/test/smoosh_tests.erl
new file mode 100644
index 000000000..13855f616
--- /dev/null
+++ b/src/smoosh/test/smoosh_tests.erl
@@ -0,0 +1,130 @@
+-module(smoosh_tests).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+-include("couch/src/couch_db_int.hrl").
+
+-define(KILOBYTE, binary:copy(<<"x">>, 1024)).
+
+%% ==========
+%% Setup
+%% ----------
+
+%% Per-test setup: create a fresh temp db, flush the channel so earlier
+%% tests' entries don't leak in, and lower the channel's "min_size"
+%% threshold (non-persistent config) so a ~300 KB file qualifies for
+%% compaction.  Returns the db name handed to the test and teardown/2.
+setup(ChannelType) ->
+ DbName = ?tempdb(),
+ {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
+ couch_db:close(Db),
+ {ok, ChannelPid} = smoosh_server:get_channel(ChannelType),
+ smoosh_channel:flush(ChannelPid),
+ ok = config:set(config_section(ChannelType), "min_size", "200000", false),
+ DbName.
+
+%% Per-test cleanup: delete the temp db, remove the "min_size" override
+%% installed by setup/1, and flush the channel queue.
+teardown(ChannelType, DbName) ->
+    ok = couch_server:delete(DbName, [?ADMIN_CTX]),
+    %% Must mirror setup/1, which set the key under
+    %% config_section(ChannelType).  The original passed DbName (a
+    %% binary), so "smoosh." ++ DbName built an improper list naming a
+    %% nonexistent section and the real override was never removed,
+    %% leaking config into subsequent tests.
+    ok = config:delete(config_section(ChannelType), "min_size", false),
+    {ok, ChannelPid} = smoosh_server:get_channel(ChannelType),
+    smoosh_channel:flush(ChannelPid),
+    ok.
+
+%% Config section for a channel, e.g. "smoosh.ratio_dbs".
+%% ChannelType must be a string (list); a binary here would build an
+%% improper list.
+config_section(ChannelType) ->
+ "smoosh." ++ ChannelType.
+
+%% ==========
+%% Tests
+%% ----------
+
+%% Top-level EUnit fixture: start couch with the smoosh application and
+%% run the channel and persistence test groups.
+%% NOTE(review): teardown references test_util:stop/1 while setup uses
+%% test_util:start_couch/1 — confirm stop/1 exists and tears down what
+%% start_couch/1 created (stop_couch/1 is the usual counterpart).
+smoosh_test_() ->
+ {
+ "Testing smoosh",
+ {
+ setup,
+ fun() -> test_util:start_couch([smoosh]) end,
+ fun test_util:stop/1,
+ [
+ channels_tests(),
+ persistence_tests()
+ ]
+ }
+ }.
+
+%% Test group covering queue persistence across an application restart,
+%% run against the "ratio_dbs" channel.
+persistence_tests() ->
+ Tests = [
+ fun should_persist_queue/2
+ ],
+ {
+ "Should persist queue state",
+ [
+ make_test_case("ratio_dbs", Tests)
+ ]
+ }.
+
+%% Test group covering basic channel behaviour (enqueueing), run
+%% against the "ratio_dbs" channel.
+channels_tests() ->
+ Tests = [
+ fun should_enqueue/2
+ ],
+ {
+ "Various channels tests",
+ [
+ make_test_case("ratio_dbs", Tests)
+ ]
+ }.
+
+%% Build a foreachx fixture: setup/1 is called with Type, each arity-2
+%% instantiator gets (Type, DbName) and returns its ?_test object,
+%% teardown/2 runs after each.
+make_test_case(Type, Funs) ->
+ {foreachx, fun setup/1, fun teardown/2, [{Type, Fun} || Fun <- Funs]}.
+
+%% Growing the db past the lowered min_size threshold should get it
+%% enqueued in the channel (polled via wait_enqueue/2).
+should_enqueue(ChannelType, DbName) ->
+ ?_test(begin
+ ok = grow_db_file(DbName, 300),
+ ok = wait_enqueue(ChannelType, DbName),
+ ?assert(is_enqueued(ChannelType, DbName)),
+ ok
+ end).
+
+%% Enqueue the db, persist the channel's queue to disk, restart the
+%% smoosh application, and check the queue recovered from disk matches
+%% the snapshot taken before the restart.
+should_persist_queue(ChannelType, DbName) ->
+ ?_test(begin
+ {ok, ChannelPid} = smoosh_server:get_channel(ChannelType),
+ ok = grow_db_file(DbName, 300),
+ ok = wait_enqueue(ChannelType, DbName),
+ ok = smoosh_channel:persist(ChannelPid),
+ Q0 = channel_queue(ChannelType),
+ ok = application:stop(smoosh),
+ ok = application:start(smoosh),
+ Q1 = channel_queue(ChannelType),
+ ?assertEqual(Q0, Q1),
+ ok
+ end).
+
+%% Append SizeInKb kilobytes of padding directly to the db's backing
+%% file, then write one doc so couchdb notices the db changed and
+%% smoosh re-evaluates it.  Returns ok.
+grow_db_file(DbName, SizeInKb) ->
+    {ok, #db{filepath = FilePath} = Db} = couch_db:open_int(DbName, [?ADMIN_CTX]),
+    {ok, Fd} = file:open(FilePath, [append]),
+    Bytes = binary:copy(?KILOBYTE, SizeInKb),
+    %% Assert the write succeeded; the original discarded this return
+    %% value, so a failed append would leave the file small and the
+    %% test would only fail later with an opaque wait_enqueue timeout.
+    ok = file:write(Fd, Bytes),
+    ok = file:close(Fd),
+    Doc = couch_doc:from_json_obj(
+        {[
+            {<<"_id">>, ?l2b(?docid())},
+            {<<"value">>, ?l2b(?docid())}
+        ]}
+    ),
+    {ok, _} = couch_db:update_docs(Db, [Doc], []),
+    couch_db:close(Db),
+    ok.
+
+%% True when DbName is currently a key in the channel's queue.
+is_enqueued(ChannelType, DbName) ->
+ {ok, ChannelPid} = smoosh_server:get_channel(ChannelType),
+ smoosh_channel:is_key(ChannelPid, DbName).
+
+%% Poll until DbName appears in the channel queue.  Returns ok on
+%% success; on test_util:wait/1 timeout the caller's `ok =` match
+%% crashes, failing the test.
+wait_enqueue(ChannelType, DbName) ->
+ test_util:wait(fun() ->
+ case is_enqueued(ChannelType, DbName) of
+ false ->
+ wait;
+ true ->
+ ok
+ end
+ end).
+
+%% Load the channel's persisted queue state from disk by recovering a
+%% fresh queue for ChannelType; used to snapshot/compare across restart.
+channel_queue(ChannelType) ->
+ Q0 = smoosh_priority_queue:new(ChannelType),
+ smoosh_priority_queue:recover(Q0).