diff options
author | ncshaw <ncshaw@ibm.com> | 2021-11-29 18:04:59 -0500 |
---|---|---|
committer | ncshaw <ncshaw@ibm.com> | 2022-03-14 16:17:27 -0500 |
commit | 5ed8d54bd834a43a41eba11524171217f141e0cb (patch) | |
tree | 59afc77d2ab8d11095c18875c1b13d8bf655f742 | |
parent | 79577d8adeca8577a4709ea51cc9bbd3e7bb331e (diff) | |
download | couchdb-5ed8d54bd834a43a41eba11524171217f141e0cb.tar.gz |
Add smoosh queue persistence
-rwxr-xr-x | configure | 1 | ||||
-rwxr-xr-x | dev/run | 3 | ||||
-rw-r--r-- | rel/overlay/etc/default.ini | 3 | ||||
-rw-r--r-- | rel/plugins/eunit_plugin.erl | 4 | ||||
-rw-r--r-- | setup_eunit.template | 3 | ||||
-rw-r--r-- | src/couch/src/couch_bt_engine.erl | 10 | ||||
-rw-r--r-- | src/couch/src/couch_db.erl | 4 | ||||
-rw-r--r-- | src/couch/src/couch_db_engine.erl | 4 | ||||
-rw-r--r-- | src/couch/src/couch_server.erl | 9 | ||||
-rw-r--r-- | src/smoosh/operator_guide.md | 20 | ||||
-rw-r--r-- | src/smoosh/src/smoosh.app.src | 33 | ||||
-rw-r--r-- | src/smoosh/src/smoosh_channel.erl | 254 | ||||
-rw-r--r-- | src/smoosh/src/smoosh_priority_queue.erl | 122 | ||||
-rw-r--r-- | src/smoosh/src/smoosh_server.erl | 24 | ||||
-rw-r--r-- | src/smoosh/src/smoosh_utils.erl | 31 | ||||
-rw-r--r-- | src/smoosh/test/smoosh_priority_queue_tests.erl | 167 | ||||
-rw-r--r-- | src/smoosh/test/smoosh_tests.erl | 130 |
17 files changed, 733 insertions, 89 deletions
@@ -250,6 +250,7 @@ cat > rel/couchdb.config << EOF {prefix, "."}. {data_dir, "./data"}. {view_index_dir, "./data"}. +{state_dir, "./data"}. {log_file, "$LOG_FILE"}. {fauxton_root, "./share/www"}. {user, "$COUCHDB_USER"}. @@ -299,6 +299,9 @@ def setup_configs(ctx): "view_index_dir": toposixpath( ensure_dir_exists(ctx["devdir"], "lib", node, "data") ), + "state_dir": toposixpath( + ensure_dir_exists(ctx["devdir"], "lib", node, "data") + ), "node_name": "-name %s@127.0.0.1" % node, "cluster_port": cluster_port, "backend_port": backend_port, diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini index 6b64c6d74..5def66caa 100644 --- a/rel/overlay/etc/default.ini +++ b/rel/overlay/etc/default.ini @@ -643,6 +643,9 @@ partitioned||* = true ;[smoosh.slack_views] ;priority = slack ;min_priority = 536870912 +; +; Directory to store the state of smoosh +state_dir = {{state_dir}} [ioq] ; The maximum number of concurrent in-flight IO requests that diff --git a/rel/plugins/eunit_plugin.erl b/rel/plugins/eunit_plugin.erl index 69003aba6..8f298db5f 100644 --- a/rel/plugins/eunit_plugin.erl +++ b/rel/plugins/eunit_plugin.erl @@ -29,12 +29,14 @@ build_eunit_config(Config0, AppFile) -> Cwd = filename:absname(rebar_utils:get_cwd()), DataDir = Cwd ++ "/tmp/data", ViewIndexDir = Cwd ++ "/tmp/data", + StateDir = Cwd ++ "/tmp/data", TmpDataDir = Cwd ++ "/tmp/tmp_data", cleanup_dirs([DataDir, TmpDataDir]), Config1 = rebar_config:set_global(Config0, template, "setup_eunit"), Config2 = rebar_config:set_global(Config1, prefix, Cwd), Config3 = rebar_config:set_global(Config2, data_dir, DataDir), - Config = rebar_config:set_global(Config3, view_index_dir, ViewIndexDir), + Config4 = rebar_config:set_global(Config3, view_index_dir, ViewIndexDir), + Config = rebar_config:set_global(Config4, state_dir, StateDir), rebar_templater:create(Config, AppFile). 
cleanup_dirs(Dirs) -> diff --git a/setup_eunit.template b/setup_eunit.template index 3625441bd..ceef60d12 100644 --- a/setup_eunit.template +++ b/setup_eunit.template @@ -7,7 +7,8 @@ {data_dir, "/tmp"}, {prefix, "/tmp"}, - {view_index_dir, "/tmp"} + {view_index_dir, "/tmp"}, + {state_dir, "/tmp"} ]}. {dir, "tmp"}. {dir, "tmp/etc"}. diff --git a/src/couch/src/couch_bt_engine.erl b/src/couch/src/couch_bt_engine.erl index 7d2390556..42dbed23e 100644 --- a/src/couch/src/couch_bt_engine.erl +++ b/src/couch/src/couch_bt_engine.erl @@ -19,6 +19,8 @@ delete/3, delete_compaction_files/3, + is_compacting/1, + init/2, terminate/2, handle_db_updater_call/2, @@ -139,6 +141,14 @@ delete_compaction_files(RootDir, FilePath, DelOpts) -> [".compact", ".compact.data", ".compact.meta"] ). +is_compacting(DbName) -> + lists:any( + fun(Ext) -> + filelib:is_regular(DbName ++ Ext) + end, + [".compact", ".compact.data", ".compact.meta"] + ). + init(FilePath, Options) -> {ok, Fd} = open_db_file(FilePath, Options), Header = diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl index fa003e16d..cafbb0dc9 100644 --- a/src/couch/src/couch_db.erl +++ b/src/couch/src/couch_db.erl @@ -120,6 +120,7 @@ cancel_compact/1, wait_for_compaction/1, wait_for_compaction/2, + is_compacting/1, dbname_suffix/1, normalize_dbname/1, @@ -277,6 +278,9 @@ wait_for_compaction(#db{main_pid = Pid} = Db, Timeout) -> ok end. +is_compacting(DbName) -> + couch_server:is_compacting(DbName). 
+ delete_doc(Db, Id, Revisions) -> DeletedDocs = [#doc{id = Id, revs = [Rev], deleted = true} || Rev <- Revisions], {ok, [Result]} = update_docs(Db, DeletedDocs, []), diff --git a/src/couch/src/couch_db_engine.erl b/src/couch/src/couch_db_engine.erl index de4a42495..9e46b816b 100644 --- a/src/couch/src/couch_db_engine.erl +++ b/src/couch/src/couch_db_engine.erl @@ -662,6 +662,7 @@ exists/2, delete/4, delete_compaction_files/4, + is_compacting/2, init/3, terminate/2, @@ -736,6 +737,9 @@ delete_compaction_files(Engine, RootDir, DbPath, DelOpts) when -> Engine:delete_compaction_files(RootDir, DbPath, DelOpts). +is_compacting(Engine, DbName) -> + Engine:is_compacting(DbName). + init(Engine, DbPath, Options) -> case Engine:init(DbPath, Options) of {ok, EngineState} -> diff --git a/src/couch/src/couch_server.erl b/src/couch/src/couch_server.erl index 5e295381d..74217894c 100644 --- a/src/couch/src/couch_server.erl +++ b/src/couch/src/couch_server.erl @@ -22,6 +22,7 @@ -export([dev_start/0, is_admin/2, has_admins/0, get_stats/0]). -export([close_db_if_idle/1]). -export([delete_compaction_files/1]). +-export([is_compacting/1]). -export([exists/1]). -export([get_engine_extensions/0]). -export([get_engine_path/2]). @@ -183,6 +184,14 @@ delete_compaction_files(DbName, DelOpts) when is_list(DbName) -> delete_compaction_files(DbName, DelOpts) when is_binary(DbName) -> delete_compaction_files(?b2l(DbName), DelOpts). +is_compacting(DbName) -> + lists:any( + fun({_, Engine}) -> + couch_db_engine:is_compacting(Engine, DbName) + end, + get_configured_engines() + ). + maybe_add_sys_db_callbacks(DbName, Options) when is_binary(DbName) -> maybe_add_sys_db_callbacks(?b2l(DbName), Options); maybe_add_sys_db_callbacks(DbName, Options) -> diff --git a/src/smoosh/operator_guide.md b/src/smoosh/operator_guide.md index afe08f8e7..3dd8cdce5 100644 --- a/src/smoosh/operator_guide.md +++ b/src/smoosh/operator_guide.md @@ -107,7 +107,27 @@ However, it's the best measure we currently have. 
[Even more info](https://github.com/apache/couchdb-smoosh#notes-on-the-data_size-value). +#### State diagram +``` +stateDiagram + [*] --> init + init --> start_recovery: send_after(?START_DELAY_IN_MSEC, self(), start_recovery) + note right of start_recovery + activated = false + paused = true + end note + start_recovery --> activate: send_after(?ACTIVATE_DELAY_IN_MSEC, self(), activate) + note right of activate + state has been recovered + activated = true + paused = true + end note + activate --> schedule_unpause + schedule_unpause --> [*]: after 30 sec, paused = false and compaction of new jobs begin +``` + +See [here](https://mermaid.ink/img/pako:eNqNUtFKwzAU_ZVLnjbpQPCtoFK2PgzUB1sEsVKy5LaNtklJ08kY-3eTtKVsTDBPybnnnntOkiNhiiMJSWeowY2gpaZNJsGuj5tPWK0eQEhhBsTtPGTJ2uQamdqjPoTQoeQ5LQzqxWOSRq9pvomfovd8-5I_J_E6sIS6WCyDi8blICuVQdCirAyo4oIyMNyizIi99cjhHgpadziXWtp3Hje6H2FryOsOp3NNH2GSOzcfrdPtW5TGf_mfuq46n4qzMX-pUNEOdogSRgPIr6ea3f8r1NQ6vAirkPc15r30jWPuC9RT7buG4PPC3a1NxoJ5lr9YoHYOU03rJijpokn8gS-1czlKIUlAGtQNFdx-nKMblRFTYYMZCe2WU_2dkUyeLK9vubUYc2GUJqGXDwjtjUoOkpHQZZtI4-cbWadfC0TavA) for a diagram of smoosh's initial state during the recovery process. ### Defining a channel Defining a channel is done via normal dbcore configuration, with some diff --git a/src/smoosh/src/smoosh.app.src b/src/smoosh/src/smoosh.app.src index a6cdb7f5e..4549c6610 100644 --- a/src/smoosh/src/smoosh.app.src +++ b/src/smoosh/src/smoosh.app.src @@ -10,20 +10,19 @@ % License for the specific language governing permissions and limitations under % the License. -{application, smoosh, - [ - {description, "Auto-compaction daemon"}, - {vsn, git}, - {registered, [smoosh_server]}, - {applications, [ - kernel, - stdlib, - couch_log, - config, - couch_event, - couch, - mem3 - ]}, - {mod, { smoosh_app, []}}, - {env, []} - ]}. 
+{application, smoosh, [ + {description, "Auto-compaction daemon"}, + {vsn, git}, + {registered, [smoosh_server]}, + {applications, [ + kernel, + stdlib, + couch_log, + config, + couch_event, + couch, + mem3 + ]}, + {mod, {smoosh_app, []}}, + {env, []} +]}. diff --git a/src/smoosh/src/smoosh_channel.erl b/src/smoosh/src/smoosh_channel.erl index 06849ac3a..064f18afc 100644 --- a/src/smoosh/src/smoosh_channel.erl +++ b/src/smoosh/src/smoosh_channel.erl @@ -16,8 +16,8 @@ -include_lib("couch/include/couch_db.hrl"). % public api. --export([start_link/1, close/1, suspend/1, resume/1, get_status/1]). --export([enqueue/3, last_updated/2, flush/1]). +-export([start_link/1, close/1, suspend/1, resume/1, activate/1, get_status/1]). +-export([enqueue/3, last_updated/2, flush/1, is_key/2, is_activated/1, persist/1]). % gen_server api. -export([ @@ -25,18 +25,38 @@ handle_call/3, handle_cast/2, handle_info/2, - code_change/3, terminate/2 ]). +-define(VSN, 1). +-define(CHECKPOINT_INTERVAL_IN_MSEC, 180000). + +-ifndef(TEST). +-define(START_DELAY_IN_MSEC, 60000). +-define(ACTIVATE_DELAY_IN_MSEC, 30000). +-else. +-define(START_DELAY_IN_MSEC, 0). +-define(ACTIVATE_DELAY_IN_MSEC, 0). +-endif. + % records. +% When the state is set to activated = true, the channel has completed the state +% recovery process that occurs on (re)start and is accepting new compaction jobs. +% Note: if activated = false and a request for a new compaction job is received, +% smoosh will enqueue this new job after the state recovery process has finished. +% When the state is set to paused = false, the channel is actively compacting any +% compaction jobs that are scheduled. +% See operator_guide.md --> State diagram. + -record(state, { active = [], name, - waiting = smoosh_priority_queue:new(), + waiting, paused = true, - starting = [] + starting = [], + activated = false, + requests = [] }). % public functions. @@ -48,7 +68,10 @@ suspend(ServerRef) -> gen_server:call(ServerRef, suspend). 
resume(ServerRef) -> - gen_server:call(ServerRef, resume). + gen_server:call(ServerRef, resume_and_activate). + +activate(ServerRef) -> + gen_server:call(ServerRef, activate). enqueue(ServerRef, Object, Priority) -> gen_server:cast(ServerRef, {enqueue, Object, Priority}). @@ -65,32 +88,42 @@ close(ServerRef) -> flush(ServerRef) -> gen_server:call(ServerRef, flush). +is_key(ServerRef, Key) -> + gen_server:call(ServerRef, {is_key, Key}). + +is_activated(ServerRef) -> + gen_server:call(ServerRef, is_activated). + +persist(ServerRef) -> + gen_server:call(ServerRef, persist). + % gen_server functions. init(Name) -> - schedule_unpause(), erlang:send_after(60 * 1000, self(), check_window), - {ok, #state{name = Name}}. + process_flag(trap_exit, true), + Waiting = smoosh_priority_queue:new(Name), + State = #state{name = Name, waiting = Waiting, paused = true, activated = false}, + erlang:send_after(?START_DELAY_IN_MSEC, self(), start_recovery), + {ok, State}. -handle_call({last_updated, Object}, _From, State0) -> - {ok, State} = code_change(nil, State0, nil), +handle_call({last_updated, Object}, _From, State) -> LastUpdated = smoosh_priority_queue:last_updated(Object, State#state.waiting), {reply, LastUpdated, State}; -handle_call(suspend, _From, State0) -> - {ok, State} = code_change(nil, State0, nil), +handle_call(suspend, _From, State) -> #state{active = Active} = State, [ catch erlang:suspend_process(Pid, [unless_suspending]) || {_, Pid} <- Active ], {reply, ok, State#state{paused = true}}; -handle_call(resume, _From, State0) -> - {ok, State} = code_change(nil, State0, nil), +handle_call(resume_and_activate, _From, State) -> #state{active = Active} = State, [catch erlang:resume_process(Pid) || {_, Pid} <- Active], - {reply, ok, State#state{paused = false}}; -handle_call(status, _From, State0) -> - {ok, State} = code_change(nil, State0, nil), + {reply, ok, State#state{paused = false, activated = true}}; +handle_call(activate, _From, State) -> + {reply, ok, 
State#state{activated = true}}; +handle_call(status, _From, State) -> {reply, {ok, [ {active, length(State#state.active)}, @@ -98,27 +131,38 @@ handle_call(status, _From, State0) -> {waiting, smoosh_priority_queue:info(State#state.waiting)} ]}, State}; -handle_call(close, _From, State0) -> - {ok, State} = code_change(nil, State0, nil), +handle_call(close, _From, State) -> {stop, normal, ok, State}; -handle_call(flush, _From, State0) -> - {ok, State} = code_change(nil, State0, nil), - {reply, ok, State#state{waiting = smoosh_priority_queue:new()}}. +handle_call(flush, _From, #state{waiting = Q} = State) -> + {reply, ok, State#state{waiting = smoosh_priority_queue:flush(Q)}}; +handle_call({is_key, Key}, _From, #state{waiting = Waiting} = State) -> + {reply, smoosh_priority_queue:is_key(Key, Waiting), State}; +handle_call(is_activated, _From, #state{activated = Activated} = State0) -> + {reply, Activated, State0}; +handle_call(persist, _From, State) -> + persist_queue(State), + {reply, ok, State}. -handle_cast({enqueue, _Object, 0}, State0) -> - {ok, State} = code_change(nil, State0, nil), +handle_cast({enqueue, _Object, 0}, #state{} = State) -> {noreply, State}; -handle_cast({enqueue, Object, Priority}, State0) -> - {ok, State} = code_change(nil, State0, nil), - {noreply, maybe_start_compaction(add_to_queue(Object, Priority, State))}. +handle_cast({enqueue, Object, Priority}, #state{activated = true} = State) -> + {noreply, maybe_start_compaction(add_to_queue(Object, Priority, State))}; +handle_cast({enqueue, Object, Priority}, #state{activated = false, requests = Requests} = State0) -> + couch_log:notice( + "~p Channel is not activated yet. Adding ~p to requests with priority ~p.", [ + ?MODULE, + Object, + Priority + ] + ), + {noreply, State0#state{requests = [{Object, Priority} | Requests]}}. % We accept noproc here due to possibly having monitored a restarted compaction % pid after it finished. 
-handle_info({'DOWN', Ref, _, Job, Reason}, State0) when +handle_info({'DOWN', Ref, _, Job, Reason}, State) when Reason == normal; Reason == noproc -> - {ok, State} = code_change(nil, State0, nil), #state{active = Active, starting = Starting} = State, {noreply, maybe_start_compaction( @@ -127,8 +171,7 @@ handle_info({'DOWN', Ref, _, Job, Reason}, State0) when starting = lists:keydelete(Ref, 1, Starting) } )}; -handle_info({'DOWN', Ref, _, Job, Reason}, State0) -> - {ok, State} = code_change(nil, State0, nil), +handle_info({'DOWN', Ref, _, Job, Reason}, State) -> #state{active = Active0, starting = Starting0} = State, case lists:keytake(Job, 2, Active0) of {value, {Key, _Pid}, Active1} -> @@ -142,7 +185,8 @@ handle_info({'DOWN', Ref, _, Job, Reason}, State0) -> case lists:keytake(Ref, 1, Starting0) of {value, {_, Key}, Starting1} -> couch_log:warning("failed to start compaction of ~p: ~p", [ - smoosh_utils:stringify(Key), Reason + smoosh_utils:stringify(Key), + Reason ]), {ok, _} = timer:apply_after(5000, smoosh_server, enqueue, [Key]), {noreply, maybe_start_compaction(State#state{starting = Starting1})}; @@ -150,8 +194,7 @@ handle_info({'DOWN', Ref, _, Job, Reason}, State0) -> {noreply, State} end end; -handle_info({Ref, {ok, Pid}}, State0) when is_reference(Ref) -> - {ok, State} = code_change(nil, State0, nil), +handle_info({Ref, {ok, Pid}}, State) when is_reference(Ref) -> case lists:keytake(Ref, 1, State#state.starting) of {value, {_, Key}, Starting1} -> couch_log:notice( @@ -167,8 +210,7 @@ handle_info({Ref, {ok, Pid}}, State0) when is_reference(Ref) -> false -> {noreply, State} end; -handle_info(check_window, State0) -> - {ok, State} = code_change(nil, State0, nil), +handle_info(check_window, State) -> #state{paused = Paused, name = Name} = State, StrictWindow = smoosh_utils:get(Name, "strict_window", "false"), FinalState = @@ -194,18 +236,113 @@ handle_info(check_window, State0) -> end, erlang:send_after(60 * 1000, self(), check_window), {noreply, 
FinalState}; -handle_info(pause, State0) -> - {ok, State} = code_change(nil, State0, nil), +handle_info(start_recovery, #state{name = Name, waiting = Waiting0} = State0) -> + RecActive = recover(active_file_name(Name)), + Waiting1 = lists:foldl( + fun(DbName, Acc) -> + case couch_db:is_compacting(DbName) of + true -> + Priority = smoosh_server:get_priority(Name, DbName), + smoosh_priority_queue:in(DbName, Priority, Priority, Acc); + false -> + Acc + end + end, + Waiting0, + RecActive + ), + State1 = maybe_start_compaction(State0#state{paused = false, waiting = Waiting1}), + couch_log:notice( + "~p Previously active compaction jobs (if any) have been successfully recovered and restarted.", + [?MODULE] + ), + erlang:send_after(?ACTIVATE_DELAY_IN_MSEC, self(), activate), + {noreply, State1#state{paused = true}}; +handle_info(activate, State) -> + {noreply, activate_channel(State)}; +handle_info(persist, State) -> + persist_queue(State), + erlang:send_after(?CHECKPOINT_INTERVAL_IN_MSEC, self(), persist), + {noreply, State}; +handle_info(pause, State) -> {noreply, State#state{paused = true}}; -handle_info(unpause, State0) -> - {ok, State} = code_change(nil, State0, nil), +handle_info(unpause, State) -> {noreply, maybe_start_compaction(State#state{paused = false})}. terminate(_Reason, _State) -> ok. -code_change(_OldVsn, #state{} = State, _Extra) -> - {ok, State}. +persist_queue(State) -> + write_state_to_file(State). + +recover(FilePath) -> + case do_recover(FilePath) of + {ok, List} -> + List; + error -> + [] + end. + +do_recover(FilePath) -> + case file:read_file(FilePath) of + {ok, Content} -> + <<Vsn, Binary/binary>> = Content, + try parse_state(Vsn, ?VSN, Binary) of + Term -> + couch_log:notice( + "~p Successfully restored state file ~s", [?MODULE, FilePath] + ), + {ok, Term} + catch + error:Reason -> + couch_log:error( + "~p Invalid state file (~p). 
Deleting ~s", [?MODULE, Reason, FilePath] + ), + file:delete(FilePath), + error + end; + {error, enoent} -> + couch_log:notice( + "~p (~p) State file ~s does not exist. Not restoring.", [?MODULE, enoent, FilePath] + ), + error; + {error, Reason} -> + couch_log:error( + "~p Cannot read the state file (~p). Deleting ~s", [?MODULE, Reason, FilePath] + ), + file:delete(FilePath), + error + end. + +parse_state(1, ?VSN, Binary) -> + erlang:binary_to_term(Binary, [safe]); +parse_state(Vsn, ?VSN, _) -> + error({unsupported_version, Vsn}). + +write_state_to_file(#state{name = Name, active = Active, starting = Starting, waiting = Waiting}) -> + Active1 = lists:foldl( + fun({DbName, _}, Acc) -> + [DbName | Acc] + end, + [], + Active + ), + Starting1 = lists:foldl( + fun({_, DbName}, Acc) -> + [DbName | Acc] + end, + [], + Starting + ), + smoosh_utils:write_to_file(Active1, active_file_name(Name), ?VSN), + smoosh_utils:write_to_file(Starting1, starting_file_name(Name), ?VSN), + smoosh_priority_queue:write_to_file(Waiting). + +active_file_name(Name) -> + filename:join(config:get("smoosh", "state_dir", "."), Name ++ ".active"). + +starting_file_name(Name) -> + filename:join(config:get("smoosh", "state_dir", "."), Name ++ ".starting"). % private functions. @@ -225,6 +362,37 @@ add_to_queue(Key, Priority, State) -> } end. +maybe_activate(#state{activated = true} = State) -> + State; +maybe_activate(State) -> + activate_channel(State). 
+ +activate_channel(#state{name = Name, waiting = Waiting0, requests = Requests0} = State0) -> + RecStarting = recover(starting_file_name(Name)), + Starting = lists:foldl( + fun(DbName, Acc) -> + Priority = smoosh_server:get_priority(Name, DbName), + smoosh_priority_queue:in(DbName, Priority, Priority, Acc) + end, + Waiting0, + RecStarting + ), + Waiting1 = smoosh_priority_queue:recover(Starting), + Requests1 = lists:reverse(Requests0), + Waiting2 = lists:foldl( + fun({DbName, Priority}, Acc) -> + smoosh_priority_queue:in(DbName, Priority, Priority, Acc) + end, + Waiting1, + Requests1 + ), + State1 = maybe_start_compaction(State0#state{ + waiting = Waiting2, paused = false, activated = true, requests = [] + }), + handle_info(persist, State1), + schedule_unpause(), + State1#state{paused = true}. + maybe_start_compaction(#state{paused = true} = State) -> State; maybe_start_compaction(State) -> @@ -239,7 +407,7 @@ maybe_start_compaction(State) -> length(State#state.active) + length(State#state.starting) < Concurrency -> case smoosh_priority_queue:out(State#state.waiting) of false -> - State; + maybe_activate(State); {Key, Priority, Q} -> try State2 = diff --git a/src/smoosh/src/smoosh_priority_queue.erl b/src/smoosh/src/smoosh_priority_queue.erl index b6f4b6dd8..83c93663c 100644 --- a/src/smoosh/src/smoosh_priority_queue.erl +++ b/src/smoosh/src/smoosh_priority_queue.erl @@ -12,33 +12,68 @@ -module(smoosh_priority_queue). --export([new/0, last_updated/2, is_key/2, in/4, in/5, out/1, size/1, info/1]). +-export([new/1, recover/1]). + +-export([last_updated/2, is_key/2, in/4, in/5, out/1, size/1, info/1]). + +-export([flush/1]). + +-export([from_list/2, to_list/1]). + +-export([is_empty/1]). + +-export([file_name/1, write_to_file/1]). + +-define(VSN, 1). -record(priority_queue, { - dict = dict:new(), - tree = gb_trees:empty() + name, + map, + tree }). -new() -> - #priority_queue{}. +new(Name) -> + #priority_queue{name = Name, map = maps:new(), tree = gb_trees:empty()}. 
+ +recover(#priority_queue{name = Name, map = Map0} = Q) -> + case do_recover(file_name(Q)) of + {ok, Terms} -> + Map = maps:merge(Map0, Terms), + Tree = maps:fold( + fun(Key, {TreeKey, Value}, TreeAcc) -> + gb_trees:enter(TreeKey, {Key, Value}, TreeAcc) + end, + gb_trees:empty(), + Map + ), + #priority_queue{name = Name, map = Map, tree = Tree}; + error -> + Q + end. + +write_to_file(#priority_queue{map = Map} = Q) -> + smoosh_utils:write_to_file(Map, file_name(Q), ?VSN). + +flush(#priority_queue{name = Name} = Q) -> + Q#priority_queue{name = Name, map = maps:new(), tree = gb_trees:empty()}. -last_updated(Key, #priority_queue{dict = Dict}) -> - case dict:find(Key, Dict) of +last_updated(Key, #priority_queue{map = Map}) -> + case maps:find(Key, Map) of {ok, {_Priority, {LastUpdatedMTime, _MInt}}} -> LastUpdatedMTime; error -> false end. -is_key(Key, #priority_queue{dict = Dict}) -> - dict:is_key(Key, Dict). +is_key(Key, #priority_queue{map = Map}) -> + maps:is_key(Key, Map). in(Key, Value, Priority, Q) -> in(Key, Value, Priority, infinity, Q). -in(Key, Value, Priority, Capacity, #priority_queue{dict = Dict, tree = Tree}) -> +in(Key, Value, Priority, Capacity, #priority_queue{name = Name, map = Map, tree = Tree}) -> Tree1 = - case dict:find(Key, Dict) of + case maps:find(Key, Map) of {ok, TreeKey} -> gb_trees:delete_any(TreeKey, Tree); error -> @@ -47,17 +82,18 @@ in(Key, Value, Priority, Capacity, #priority_queue{dict = Dict, tree = Tree}) -> Now = {erlang:monotonic_time(), erlang:unique_integer([monotonic])}, TreeKey1 = {Priority, Now}, Tree2 = gb_trees:enter(TreeKey1, {Key, Value}, Tree1), - Dict1 = dict:store(Key, TreeKey1, Dict), - truncate(Capacity, #priority_queue{dict = Dict1, tree = Tree2}). + Map1 = maps:put(Key, TreeKey1, Map), + truncate(Capacity, #priority_queue{name = Name, map = Map1, tree = Tree2}). 
-out(#priority_queue{dict = Dict, tree = Tree}) -> +out(#priority_queue{name = Name, map = Map, tree = Tree}) -> case gb_trees:is_empty(Tree) of true -> false; false -> {_, {Key, Value}, Tree1} = gb_trees:take_largest(Tree), - Dict1 = dict:erase(Key, Dict), - {Key, Value, #priority_queue{dict = Dict1, tree = Tree1}} + Map1 = maps:remove(Key, Map), + Q = #priority_queue{name = Name, map = Map1, tree = Tree1}, + {Key, Value, Q} end. size(#priority_queue{tree = Tree}) -> @@ -76,6 +112,20 @@ info(#priority_queue{tree = Tree} = Q) -> end ]. +from_list(Orddict, #priority_queue{name = Name}) -> + Map = maps:from_list(Orddict), + Tree = gb_trees:from_orddict(Orddict), + #priority_queue{name = Name, map = Map, tree = Tree}. + +to_list(#priority_queue{tree = Tree}) -> + gb_trees:to_list(Tree). + +is_empty(#priority_queue{tree = Tree}) -> + gb_trees:is_empty(Tree). + +file_name(#priority_queue{name = Name}) -> + filename:join(config:get("smoosh", "state_dir", "."), Name ++ ".waiting"). + truncate(infinity, Q) -> Q; truncate(Capacity, Q) when Capacity > 0 -> @@ -83,7 +133,43 @@ truncate(Capacity, Q) when Capacity > 0 -> truncate(Capacity, Size, Q) when Size =< Capacity -> Q; -truncate(Capacity, Size, #priority_queue{dict = Dict, tree = Tree}) when Size > 0 -> +truncate(Capacity, Size, #priority_queue{name = Name, map = Map, tree = Tree}) when Size > 0 -> {_, {Key, _}, Tree1} = gb_trees:take_smallest(Tree), - Q1 = #priority_queue{dict = dict:erase(Key, Dict), tree = Tree1}, + Q1 = #priority_queue{name = Name, map = maps:remove(Key, Map), tree = Tree1}, truncate(Capacity, ?MODULE:size(Q1), Q1). + +do_recover(FilePath) -> + case file:read_file(FilePath) of + {ok, Content} -> + <<Vsn, Binary/binary>> = Content, + try parse_queue(Vsn, ?VSN, Binary) of + Bin -> + couch_log:notice( + "~p Successfully restored state file ~s", [?MODULE, FilePath] + ), + {ok, Bin} + catch + error:Reason -> + couch_log:error( + "~p Invalid queue file (~p). 
Deleting ~s", [?MODULE, Reason, FilePath] + ), + file:delete(FilePath), + error + end; + {error, enoent} -> + couch_log:notice( + "~p (~p) Queue file ~s does not exist. Not restoring.", [?MODULE, enoent, FilePath] + ), + error; + {error, Reason} -> + couch_log:error( + "~p Cannot read the queue file (~p). Deleting ~s", [?MODULE, Reason, FilePath] + ), + file:delete(FilePath), + error + end. + +parse_queue(1, ?VSN, Binary) -> + erlang:binary_to_term(Binary, [safe]); +parse_queue(Vsn, ?VSN, _) -> + error({unsupported_version, Vsn}). diff --git a/src/smoosh/src/smoosh_server.erl b/src/smoosh/src/smoosh_server.erl index 5529e93de..e9823c36f 100644 --- a/src/smoosh/src/smoosh_server.erl +++ b/src/smoosh/src/smoosh_server.erl @@ -45,6 +45,10 @@ % exported but for internal use. -export([enqueue_request/2]). +-export([get_priority/2]). + +% exported for testing and debugging +-export([get_channel/1]). -ifdef(TEST). -define(RELISTEN_DELAY, 50). @@ -60,7 +64,7 @@ schema_channels = [], tab, event_listener, - waiting = dict:new() + waiting = maps:new() }). -record(channel, { @@ -109,6 +113,10 @@ handle_db_event(DbName, {schema_updated, DDocId}, St) -> handle_db_event(_DbName, _Event, St) -> {ok, St}. +% for testing and debugging only +get_channel(ChannelName) -> + gen_server:call(?MODULE, {get_channel, ChannelName}). + % gen_server functions. init([]) -> @@ -181,7 +189,9 @@ handle_call(resume, _From, State) -> 0, State#state.tab ), - {reply, ok, State}. + {reply, ok, State}; +handle_call({get_channel, ChannelName}, _From, #state{tab = Tab} = State) -> + {reply, {ok, channel_pid(Tab, ChannelName)}, State}. 
handle_cast({new_db_channels, Channels}, State) -> [ @@ -203,12 +213,12 @@ handle_cast({new_schema_channels, Channels}, State) -> {noreply, create_missing_channels(State#state{view_channels = Channels})}; handle_cast({enqueue, Object}, State) -> #state{waiting = Waiting} = State, - case dict:is_key(Object, Waiting) of + case maps:is_key(Object, Waiting) of true -> {noreply, State}; false -> {_Pid, Ref} = spawn_monitor(?MODULE, enqueue_request, [State, Object]), - {noreply, State#state{waiting = dict:store(Object, Ref, Waiting)}} + {noreply, State#state{waiting = maps:put(Object, Ref, Waiting)}} end. handle_info({'EXIT', Pid, Reason}, #state{event_listener = Pid} = State) -> @@ -225,7 +235,7 @@ handle_info({'EXIT', Pid, Reason}, State) -> end, {noreply, create_missing_channels(State)}; handle_info({'DOWN', Ref, _, _, _}, State) -> - Waiting = dict:filter( + Waiting = maps:filter( fun(_Key, Value) -> Value =/= Ref end, State#state.waiting ), @@ -306,7 +316,9 @@ find_channel(Tab, [Channel | Rest], Object) -> ?SECONDS_PER_MINUTE, Staleness = erlang:convert_time_unit(StalenessInSec, seconds, native), Now = erlang:monotonic_time(), - case LastUpdated =:= false orelse Now - LastUpdated > Staleness of + Activated = smoosh_channel:is_activated(Pid), + StaleEnough = LastUpdated =:= false orelse Now - LastUpdated > Staleness, + case Activated andalso StaleEnough of true -> case smoosh_utils:ignore_db(Object) of true -> diff --git a/src/smoosh/src/smoosh_utils.erl b/src/smoosh/src/smoosh_utils.erl index 882d3ec84..625a9e4e0 100644 --- a/src/smoosh/src/smoosh_utils.erl +++ b/src/smoosh/src/smoosh_utils.erl @@ -14,9 +14,7 @@ -include_lib("couch/include/couch_db.hrl"). -export([get/2, get/3, group_pid/1, split/1, stringify/1, ignore_db/1]). --export([ - in_allowed_window/1 -]). +-export([in_allowed_window/1, write_to_file/3]). 
group_pid({Shard, GroupId}) -> case couch_view_group:open_db_group(Shard, GroupId) of @@ -75,6 +73,33 @@ in_allowed_window(From, To) -> ({HH, MM} >= From) orelse ({HH, MM} < To) end. +file_delete(Path) -> + case file:delete(Path) of + Ret when Ret =:= ok; Ret =:= {error, enoent} -> + ok; + Error -> + Error + end. + +throw_on_error(_Args, ok) -> + ok; +throw_on_error(Args, {error, Reason}) -> + throw({error, {Reason, Args}}). + +write_to_file(Content, FileName, VSN) -> + couch_log:notice("~p Writing state to state file ~s", [?MODULE, FileName]), + OnDisk = <<VSN, (erlang:term_to_binary(Content, [compressed, {minor_version, 1}]))/binary>>, + TmpFileName = FileName ++ ".tmp", + try + throw_on_error(TmpFileName, file_delete(TmpFileName)), + throw_on_error(TmpFileName, file:write_file(TmpFileName, OnDisk, [sync])), + throw_on_error(FileName, file_delete(FileName)), + throw_on_error([TmpFileName, FileName], file:rename(TmpFileName, FileName)) + catch + throw:Error -> + Error + end. + parse_time(undefined, Default) -> Default; parse_time(String, Default) -> diff --git a/src/smoosh/test/smoosh_priority_queue_tests.erl b/src/smoosh/test/smoosh_priority_queue_tests.erl new file mode 100644 index 000000000..289804ca5 --- /dev/null +++ b/src/smoosh/test/smoosh_priority_queue_tests.erl @@ -0,0 +1,167 @@ +-module(smoosh_priority_queue_tests). + +-include_lib("proper/include/proper.hrl"). +-include_lib("couch/include/couch_eunit.hrl"). + +-define(PROP_PREFIX, "prop_"). + +-define(CAPACITY, 3). + +-define(RANDOM_CHANNEL, lists:flatten(io_lib:format("~p", [erlang:timestamp()]))). + +setup() -> + Ctx = test_util:start_couch(), + Ctx. + +teardown(Ctx) -> + test_util:stop_couch(Ctx). + +smoosh_priority_queue_test_() -> + { + "smoosh priority queue test", + { + setup, + fun setup/0, + fun teardown/1, + [ + fun prop_inverse_test_/0, + fun no_halt_on_corrupted_file_test/0, + fun no_halt_on_missing_file_test/0 + ] + } + }. 
+ +%% ========== +%% Tests +%% ---------- + +%% define all tests to be able to run them individually +prop_inverse_test_() -> + ?_test(begin + test_property(prop_inverse) + end). + +no_halt_on_corrupted_file_test() -> + ?_test(begin + Name = ?RANDOM_CHANNEL, + Q = smoosh_priority_queue:new(Name), + FilePath = smoosh_priority_queue:file_name(Q), + ok = file:write_file(FilePath, <<"garbage">>), + ?assertEqual(Q, smoosh_priority_queue:recover(Q)), + ok + end). + +no_halt_on_missing_file_test() -> + ?_test(begin + Name = ?RANDOM_CHANNEL, + Q = smoosh_priority_queue:new(Name), + FilePath = smoosh_priority_queue:file_name(Q), + ok = file:delete(FilePath), + ?assertEqual(Q, smoosh_priority_queue:recover(Q)), + ok + end). + +%% ========== +%% Properties +%% ---------- + +prop_inverse() -> + ?FORALL( + Q, + queue(), + begin + List = smoosh_priority_queue:to_list(Q), + equal(Q, smoosh_priority_queue:from_list(List, Q)) + end + ). + +%% ========== +%% Generators +%% ---------- + +key() -> + proper_types:oneof([proper_types:binary(), {proper_types:binary(), proper_types:binary()}]). +value() -> + proper_types:oneof([proper_types:binary(), {proper_types:binary(), proper_types:binary()}]). +priority() -> integer(). +item() -> {key(), value(), priority()}. + +items_list() -> + ?LET(L, list(item()), L). + +simple_queue() -> + ?LET( + L, + items_list(), + from_list(L) + ). + +with_deleted() -> + ?LET( + Q, + ?LET( + {{K0, V0, P0}, Q0}, + {item(), simple_queue()}, + smoosh_priority_queue:in(K0, V0, P0, ?CAPACITY, Q0) + ), + frequency([ + {1, Q}, + {2, element(3, smoosh_priority_queue:out(Q))} + ]) + ). + +queue() -> + with_deleted(). 
+ +%% ========================== +%% Proper related boilerplate +%% -------------------------- + +test_property(Property) when is_atom(Property) -> + test_property({atom_to_list(Property), Property}); +test_property({Id, Property}) -> + Name = string:sub_string(Id, length(?PROP_PREFIX) + 1), + Opts = [long_result, {numtests, 1000}, {to_file, user}], + {Name, {timeout, 60, fun() -> test_it(Property, Opts) end}}. + +test_it(Property, Opts) -> + case proper:quickcheck(?MODULE:Property(), Opts) of + true -> + true; + Else -> + erlang:error( + {propertyFailed, [ + {module, ?MODULE}, + {property, Property}, + {result, Else} + ]} + ) + end. + +%% ================ +%% Helper functions +%% ---------------- + +new() -> + Q = smoosh_priority_queue:new("foo"), + smoosh_priority_queue:recover(Q). + +from_list(List) -> + lists:foldl( + fun({Key, Value, Priority}, Queue) -> + smoosh_priority_queue:in(Key, Value, Priority, ?CAPACITY, Queue) + end, + new(), + List + ). + +equal(Q1, Q2) -> + out_all(Q1) =:= out_all(Q2). + +out_all(Q) -> + out_all(Q, []). +out_all(Q0, Acc) -> + case smoosh_priority_queue:out(Q0) of + {K, V, Q1} -> out_all(Q1, [{K, V} | Acc]); + false -> lists:reverse(Acc) + end. diff --git a/src/smoosh/test/smoosh_tests.erl b/src/smoosh/test/smoosh_tests.erl new file mode 100644 index 000000000..13855f616 --- /dev/null +++ b/src/smoosh/test/smoosh_tests.erl @@ -0,0 +1,130 @@ +-module(smoosh_tests). + +-include_lib("couch/include/couch_eunit.hrl"). +-include_lib("couch/include/couch_db.hrl"). + +-include("couch/src/couch_db_int.hrl"). + +-define(KILOBYTE, binary:copy(<<"x">>, 1024)). + +%% ========== +%% Setup +%% ---------- + +setup(ChannelType) -> + DbName = ?tempdb(), + {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]), + couch_db:close(Db), + {ok, ChannelPid} = smoosh_server:get_channel(ChannelType), + smoosh_channel:flush(ChannelPid), + ok = config:set(config_section(ChannelType), "min_size", "200000", false), + DbName. 
+teardown(ChannelType, DbName) ->
+    ok = couch_server:delete(DbName, [?ADMIN_CTX]),
+    ok = config:delete(config_section(ChannelType), "min_size", false),
+    {ok, ChannelPid} = smoosh_server:get_channel(ChannelType),
+    smoosh_channel:flush(ChannelPid),
+    ok.
+ +grow_db_file(DbName, SizeInKb) -> + {ok, #db{filepath = FilePath} = Db} = couch_db:open_int(DbName, [?ADMIN_CTX]), + {ok, Fd} = file:open(FilePath, [append]), + Bytes = binary:copy(?KILOBYTE, SizeInKb), + file:write(Fd, Bytes), + ok = file:close(Fd), + Doc = couch_doc:from_json_obj( + {[ + {<<"_id">>, ?l2b(?docid())}, + {<<"value">>, ?l2b(?docid())} + ]} + ), + {ok, _} = couch_db:update_docs(Db, [Doc], []), + couch_db:close(Db), + ok. + +is_enqueued(ChannelType, DbName) -> + {ok, ChannelPid} = smoosh_server:get_channel(ChannelType), + smoosh_channel:is_key(ChannelPid, DbName). + +wait_enqueue(ChannelType, DbName) -> + test_util:wait(fun() -> + case is_enqueued(ChannelType, DbName) of + false -> + wait; + true -> + ok + end + end). + +channel_queue(ChannelType) -> + Q0 = smoosh_priority_queue:new(ChannelType), + smoosh_priority_queue:recover(Q0). |