summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick Vatamaniuc <vatamane@gmail.com>2022-11-15 21:28:53 -0500
committerNick Vatamaniuc <nickva@users.noreply.github.com>2022-11-18 17:04:30 -0500
commit111f2616e1ef48fadb43dcc358bb8dabb69ad839 (patch)
tree0462e5bb2a0043e8bb38341ce4334076f0f916bb
parentde05ea9f624d28ae99be9dba793f2e614c6f3658 (diff)
downloadcouchdb-111f2616e1ef48fadb43dcc358bb8dabb69ad839.tar.gz
Optimize smoosh
Clean up, optimize and increase test coverage for smoosh.
* Use the new, simpler persistence module. Remove the extra `activated vs non-activated` states. Use the `handle_continue(...)` gen_server callback to avoid blocking smoosh application initialization. Then, let channels unpersist in their init function, which happens outside general application initialization.
* Add an index cleanup channel and enqueue index cleanup jobs by default.
* Remove gen_server bottlenecks for status and last update calls. Instead rely on ets table lookups and gen_casts only.
* Add a few more `try ... catch`'s to avoid crashing the channels.
* Use maps to keep track of starting and active jobs in the channel.
* Re-check priority before starting jobs. This is needed when jobs are un-persisted after restart and the database may have been compacted or deleted already.
* Update periodic channel scheduled checks to have a common scheduling mechanism.
* Quantize ratio priorities to keep about a single decimal worth of precision. This should help with churn in the priority queue where the same db is constantly added and removed for small insignificant ratio changes like 2.0001 -> 2.0002. This works along with a recent optimization in the smoosh priority queue module: if the priority matches, the item is not removed and re-added, it just stays where it is, reducing CPU and heap memory churn.
* Remove testing-only API functions to avoid cluttering the API.
* Store messages off-heap for channels and smoosh_server.
* Instead of a per-channel `last_updated` gen_server:call(...), use a single access ets table which is periodically cleaned of stale entries. As an optimization, before enqueueing the shard, check last access first. This avoids sending an extra message to smoosh_server.
* As a protection mechanism against overload, cap the access table size at 250k entries. Don't allow enqueueing more than that many entries during the configured `[smoosh] staleness = Minutes` period.
* Increase test coverage from 60% to 90%
-rw-r--r--src/smoosh/src/smoosh.erl13
-rw-r--r--src/smoosh/src/smoosh_channel.erl815
-rw-r--r--src/smoosh/src/smoosh_server.erl389
-rw-r--r--src/smoosh/test/smoosh_tests.erl596
4 files changed, 1100 insertions, 713 deletions
diff --git a/src/smoosh/src/smoosh.erl b/src/smoosh/src/smoosh.erl
index 950500ffa..68e8d1828 100644
--- a/src/smoosh/src/smoosh.erl
+++ b/src/smoosh/src/smoosh.erl
@@ -16,7 +16,7 @@
-include_lib("mem3/include/mem3.hrl").
-export([suspend/0, resume/0, enqueue/1, status/0]).
--export([enqueue_all_dbs/0, enqueue_all_dbs/1, enqueue_all_views/0]).
+-export([enqueue_all_dbs/0, enqueue_all_views/0]).
suspend() ->
smoosh_server:suspend().
@@ -30,9 +30,6 @@ enqueue(Object) ->
sync_enqueue(Object) ->
smoosh_server:sync_enqueue(Object).
-sync_enqueue(Object, Timeout) ->
- smoosh_server:sync_enqueue(Object, Timeout).
-
status() ->
smoosh_server:status().
@@ -44,14 +41,6 @@ enqueue_all_dbs() ->
ok
).
-enqueue_all_dbs(Timeout) ->
- fold_local_shards(
- fun(#shard{name = Name}, _Acc) ->
- sync_enqueue(Name, Timeout)
- end,
- ok
- ).
-
enqueue_all_views() ->
fold_local_shards(
fun(#shard{name = Name}, _Acc) ->
diff --git a/src/smoosh/src/smoosh_channel.erl b/src/smoosh/src/smoosh_channel.erl
index 50274f704..92fd3413b 100644
--- a/src/smoosh/src/smoosh_channel.erl
+++ b/src/smoosh/src/smoosh_channel.erl
@@ -12,52 +12,52 @@
-module(smoosh_channel).
-behaviour(gen_server).
--vsn(1).
--include_lib("couch/include/couch_db.hrl").
% public api.
--export([start_link/1, close/1, suspend/1, resume/1, activate/1, get_status/1]).
--export([enqueue/3, last_updated/2, flush/1, is_key/2, is_activated/1]).
+-export([start_link/1, close/1, suspend/1, resume/1, get_status/1]).
+-export([enqueue/3, flush/1]).
+-export([get_status_table/1]).
% gen_server api.
-export([
init/1,
handle_call/3,
handle_cast/2,
- handle_info/2,
- terminate/2
+ handle_info/2
]).
--define(VSN, 1).
--define(CHECKPOINT_INTERVAL_IN_MSEC, 180000).
+-define(INDEX_CLEANUP, index_cleanup).
+-define(TIME_WINDOW_MSEC, 60 * 1000).
+-define(CHECKPOINT_INTERVAL_MSEC, 180000).
--ifndef(TEST).
--define(START_DELAY_IN_MSEC, 60000).
--define(ACTIVATE_DELAY_IN_MSEC, 30000).
+-ifdef(TEST).
+-define(RE_ENQUEUE_INTERVAL, 50).
+-define(STATUS_UPDATE_INTERVAL_MSEC, 490).
-else.
--define(START_DELAY_IN_MSEC, 0).
--define(ACTIVATE_DELAY_IN_MSEC, 0).
--export([persist/1]).
+-define(RE_ENQUEUE_INTERVAL, 5000).
+-define(STATUS_UPDATE_INTERVAL_MSEC, 4900).
-endif.
-% records.
-
-% When the state is set to activated = true, the channel has completed the state
-% recovery process that occurs on (re)start and is accepting new compaction jobs.
-% Note: if activated = false and a request for a new compaction job is received,
-% smoosh will enqueue this new job after the state recovery process has finished.
-% When the state is set to paused = false, the channel is actively compacting any
-% compaction jobs that are scheduled.
-% See operator_guide.md --> State diagram.
+% If persistence is configured, on startup the channel will try to load its
+% waiting queue from a persisted queue data file. Then, during every
+% CHECKPOINT_INTERVAL_MSEC, it will spawn a checkpointer process to write the
+% persisted queue state to disk.
-record(state, {
- active = [],
+ % Channel name
name,
+ % smoosh_persisted_queue:new() object
waiting,
+ % Paused flag. The channel starts in the paused state.
paused = true,
- starting = [],
- activated = false,
- requests = []
+ % #{Key => Pid}
+ active = #{},
+ % #{Ref => Key}
+ starting = #{},
+ % Monitor reference of the checkpointer process
+ cref,
+ % ETS status table handle. Used to publish channel status.
+ stab
}).
% public functions.
@@ -69,19 +69,19 @@ suspend(ServerRef) ->
gen_server:call(ServerRef, suspend).
resume(ServerRef) ->
- gen_server:call(ServerRef, resume_and_activate).
-
-activate(ServerRef) ->
- gen_server:call(ServerRef, activate).
+ gen_server:call(ServerRef, resume).
enqueue(ServerRef, Object, Priority) ->
gen_server:cast(ServerRef, {enqueue, Object, Priority}).
-last_updated(ServerRef, Object) ->
- gen_server:call(ServerRef, {last_updated, Object}).
-
-get_status(ServerRef) ->
- gen_server:call(ServerRef, status).
+get_status(StatusTab) when is_reference(StatusTab) ->
+ try ets:lookup(StatusTab, status) of
+ [{status, Status}] -> Status;
+ [] -> []
+ catch
+ error:badarg ->
+ []
+ end.
close(ServerRef) ->
gen_server:call(ServerRef, close).
@@ -89,137 +89,92 @@ close(ServerRef) ->
flush(ServerRef) ->
gen_server:call(ServerRef, flush).
-is_key(ServerRef, Key) ->
- gen_server:call(ServerRef, {is_key, Key}).
-
-is_activated(ServerRef) ->
- gen_server:call(ServerRef, is_activated).
-
-% Only exported to force persistence in tests
-persist(ServerRef) ->
- gen_server:call(ServerRef, persist).
+get_status_table(ServerRef) ->
+ gen_server:call(ServerRef, get_status_table).
% gen_server functions.
init(Name) ->
- erlang:send_after(60 * 1000, self(), check_window),
process_flag(trap_exit, true),
- Waiting = smoosh_priority_queue:new(Name),
- Persist = config:get_boolean("smoosh", "persist", false),
- State =
- case smoosh_utils:is_view_channel(Name) orelse Persist =:= false of
- true ->
- schedule_unpause(),
- #state{name = Name, waiting = Waiting, paused = true, activated = true};
- false ->
- erlang:send_after(?START_DELAY_IN_MSEC, self(), start_recovery),
- #state{name = Name, waiting = Waiting, paused = true, activated = false}
- end,
- {ok, State}.
-
-handle_call({last_updated, Object}, _From, State) ->
- LastUpdated = smoosh_priority_queue:last_updated(Object, State#state.waiting),
- {reply, LastUpdated, State};
-handle_call(suspend, _From, State) ->
- #state{active = Active} = State,
- [
- catch erlang:suspend_process(Pid, [unless_suspending])
- || {_, Pid} <- Active
- ],
- {reply, ok, State#state{paused = true}};
-handle_call(resume_and_activate, _From, State) ->
- #state{active = Active} = State,
- [catch erlang:resume_process(Pid) || {_, Pid} <- Active],
- {reply, ok, State#state{paused = false, activated = true}};
-handle_call(activate, _From, State) ->
- {reply, ok, State#state{activated = true}};
-handle_call(status, _From, State) ->
- {reply,
- {ok, [
- {active, length(State#state.active)},
- {starting, length(State#state.starting)},
- {waiting, smoosh_priority_queue:info(State#state.waiting)}
- ]},
- State};
+ process_flag(message_queue_data, off_heap),
+ schedule_check_window(),
+ schedule_update_status(),
+ schedule_checkpoint(),
+ schedule_unpause(),
+ STab = ets:new(smoosh_stats, [{read_concurrency, true}]),
+ Waiting = unpersist(Name),
+ State = #state{name = Name, waiting = Waiting, stab = STab},
+ {ok, set_status(State)}.
+
+handle_call(get_status_table, _From, #state{} = State) ->
+ State1 = set_status(State),
+ {reply, {ok, State#state.stab}, State1};
+handle_call(suspend, _From, #state{} = State) ->
+ {reply, ok, do_suspend(State)};
+handle_call(resume, _From, #state{} = State) ->
+ {reply, ok, do_resume(State)};
handle_call(close, _From, State) ->
{stop, normal, ok, State};
handle_call(flush, _From, #state{waiting = Q} = State) ->
- {reply, ok, State#state{waiting = smoosh_priority_queue:flush(Q)}};
-handle_call({is_key, Key}, _From, #state{waiting = Waiting} = State) ->
- {reply, smoosh_priority_queue:is_key(Key, Waiting), State};
-handle_call(is_activated, _From, #state{activated = Activated} = State0) ->
- {reply, Activated, State0};
-handle_call(persist, _From, State) ->
- persist_queue(State),
- {reply, ok, State}.
-
-handle_cast({enqueue, _Object, 0}, #state{} = State) ->
- {noreply, State};
-handle_cast({enqueue, Object, Priority}, #state{activated = true} = State) ->
- {noreply, maybe_start_compaction(add_to_queue(Object, Priority, State))};
-handle_cast({enqueue, Object, Priority}, #state{activated = false, requests = Requests} = State0) ->
- Level = smoosh_utils:log_level("compaction_log_level", "debug"),
- couch_log:Level(
- "~p Channel is not activated yet. Adding ~p to requests with priority ~p.", [
- ?MODULE,
- Object,
- Priority
- ]
- ),
- {noreply, State0#state{requests = [{Object, Priority} | Requests]}}.
+ State1 = State#state{waiting = smoosh_priority_queue:flush(Q)},
+ smoosh_persist:persist(State1#state.waiting, #{}, #{}),
+ State2 = set_status(State1),
+ {reply, ok, State2}.
+handle_cast({enqueue, Object, Priority}, #state{} = State) ->
+ State1 = add_to_queue(Object, Priority, State),
+ {noreply, maybe_start_compaction(State1)}.
+
+handle_info({'DOWN', Ref, _, _, _}, #state{cref = Ref} = State) ->
+ {noreply, State#state{cref = undefined}};
% We accept noproc here due to possibly having monitored a restarted compaction
% pid after it finished.
-handle_info({'DOWN', Ref, _, Job, Reason}, State) when
+handle_info({'DOWN', Ref, _, Job, Reason}, #state{} = State) when
Reason == normal;
Reason == noproc
->
#state{active = Active, starting = Starting} = State,
- {noreply,
- maybe_start_compaction(
- State#state{
- active = lists:keydelete(Job, 2, Active),
- starting = lists:keydelete(Ref, 1, Starting)
- }
- )};
-handle_info({'DOWN', Ref, _, Job, Reason}, State) ->
- #state{active = Active0, starting = Starting0} = State,
- case lists:keytake(Job, 2, Active0) of
- {value, {Key, _Pid}, Active1} ->
- State1 = maybe_remonitor_cpid(
- State#state{active = Active1},
- Key,
- Reason
- ),
- {noreply, maybe_start_compaction(State1)};
- false ->
- case lists:keytake(Ref, 1, Starting0) of
- {value, {_, Key}, Starting1} ->
- couch_log:warning("failed to start compaction of ~p: ~p", [
- smoosh_utils:stringify(Key),
- Reason
- ]),
- {ok, _} = timer:apply_after(5000, smoosh_server, enqueue, [Key]),
- {noreply, maybe_start_compaction(State#state{starting = Starting1})};
- false ->
+ Active1 = maps:filter(fun(_, Pid) -> Pid =/= Job end, Active),
+ Starting1 = maps:remove(Ref, Starting),
+ State1 = State#state{active = Active1, starting = Starting1},
+ {noreply, maybe_start_compaction(State1)};
+handle_info({'DOWN', Ref, _, Job, Reason}, #state{} = State) ->
+ #state{name = Name, active = Active, starting = Starting} = State,
+ FoundActive = maps:filter(fun(_, Pid) -> Pid =:= Job end, Active),
+ case maps:to_list(FoundActive) of
+ [{Key, _Pid}] ->
+ Active1 = maps:without([Key], Active),
+ State1 = State#state{active = maps:without([Key], Active1)},
+ State2 = maybe_remonitor_cpid(State1, Key, Reason),
+ {noreply, maybe_start_compaction(State2)};
+ [] ->
+ case maps:take(Ref, Starting) of
+ {Key, Starting1} ->
+ LogMsg = "~s : failed to start compaction of ~p: ~p",
+ LogArgs = [Name, smoosh_utils:stringify(Key), Reason],
+ couch_log:warning(LogMsg, LogArgs),
+ re_enqueue(Key),
+ State1 = State#state{starting = Starting1},
+ {noreply, maybe_start_compaction(State1)};
+ error ->
{noreply, State}
end
end;
-handle_info({Ref, {ok, Pid}}, State) when is_reference(Ref) ->
- case lists:keytake(Ref, 1, State#state.starting) of
- {value, {_, Key}, Starting1} ->
+% This is the '$gen_call' response handling
+handle_info({Ref, {ok, Pid}}, #state{} = State) when is_reference(Ref) ->
+ #state{name = Name, active = Active, starting = Starting} = State,
+ case maps:take(Ref, Starting) of
+ {Key, Starting1} ->
Level = smoosh_utils:log_level("compaction_log_level", "notice"),
- couch_log:Level(
- "~s: Started compaction for ~s",
- [State#state.name, smoosh_utils:stringify(Key)]
- ),
+ LogMsg = "~s: Started compaction for ~s",
+ LogArgs = [Name, smoosh_utils:stringify(Key)],
+ couch_log:Level(LogMsg, LogArgs),
erlang:monitor(process, Pid),
erlang:demonitor(Ref, [flush]),
- {noreply, State#state{
- active = [{Key, Pid} | State#state.active],
- starting = Starting1
- }};
- false ->
+ Active1 = Active#{Key => Pid},
+ State1 = State#state{active = Active1, starting = Starting1},
+ {noreply, set_status(State1)};
+ error ->
{noreply, State}
end;
handle_info(check_window, State) ->
@@ -235,245 +190,146 @@ handle_info(check_window, State) ->
State;
{false, true} ->
% resume is always safe even if we did not previously suspend
- {reply, ok, NewState} = handle_call(resume_and_activate, nil, State),
- NewState;
+ do_resume(State);
+ {true, false} when StrictWindow =:= "true" ->
+ % suspend immediately
+ do_suspend(State);
{true, false} ->
- if
- StrictWindow =:= "true" ->
- {reply, ok, NewState} = handle_call(suspend, nil, State),
- NewState;
- true ->
- State#state{paused = true}
- end
+ % prevent new jobs from starting, active ones keep running
+ State#state{paused = true}
end,
- erlang:send_after(60 * 1000, self(), check_window),
+ schedule_check_window(),
{noreply, FinalState};
-handle_info(start_recovery, #state{name = Name, waiting = Waiting0} = State0) ->
- RecActive = recover(active_file_name(Name)),
- Waiting1 = lists:foldl(
- fun(DbName, Acc) ->
- case couch_db:exists(DbName) andalso couch_db:is_compacting(DbName) of
- true ->
- Priority = smoosh_server:get_priority(Name, DbName),
- smoosh_priority_queue:in(DbName, Priority, Priority, Acc);
- false ->
- Acc
- end
- end,
- Waiting0,
- RecActive
- ),
- State1 = maybe_start_compaction(State0#state{paused = false, waiting = Waiting1}),
- Level = smoosh_utils:log_level("compaction_log_level", "debug"),
- couch_log:Level(
- "~p Previously active compaction jobs (if any) have been successfully recovered and restarted.",
- [?MODULE]
- ),
- erlang:send_after(?ACTIVATE_DELAY_IN_MSEC, self(), activate),
- {noreply, State1#state{paused = true}};
-handle_info(activate, State) ->
- {noreply, activate_channel(State)};
-handle_info(persist, State) ->
- ok = persist_and_reschedule(State),
+handle_info(update_status, #state{} = State) ->
+ schedule_update_status(),
+ {noreply, set_status(State)};
+handle_info(checkpoint, #state{cref = Ref} = State) when is_reference(Ref) ->
+ % If a checkpointer process is still running, don't start another one.
+ schedule_checkpoint(),
{noreply, State};
-handle_info(pause, State) ->
- {noreply, State#state{paused = true}};
+handle_info(checkpoint, #state{cref = undefined} = State) ->
+ % Start an asynchronous checkpoint process so we don't block the channel
+ #state{waiting = Waiting, active = Active, starting = Starting} = State,
+ Args = [Waiting, Active, Starting],
+ {_, Ref} = spawn_monitor(smoosh_persist, persist, Args),
+ schedule_checkpoint(),
+ {noreply, State#state{cref = Ref}};
handle_info(unpause, State) ->
{noreply, maybe_start_compaction(State#state{paused = false})}.
-terminate(_Reason, _State) ->
- ok.
-
% private functions.
-recover(FilePath) ->
- case do_recover(FilePath) of
- {ok, List} ->
- List;
- error ->
- []
- end.
-
-do_recover(FilePath) ->
- case file:read_file(FilePath) of
- {ok, Content} ->
- <<Vsn, Binary/binary>> = Content,
- try parse_state(Vsn, ?VSN, Binary) of
- Term ->
- Level = smoosh_utils:log_level("compaction_log_level", "debug"),
- couch_log:Level(
- "~p Successfully restored state file ~s", [?MODULE, FilePath]
- ),
- {ok, Term}
- catch
- error:Reason ->
- couch_log:error(
- "~p Invalid state file (~p). Deleting ~s", [?MODULE, Reason, FilePath]
- ),
- file:delete(FilePath),
- error
- end;
- {error, enoent} ->
- Level = smoosh_utils:log_level("compaction_log_level", "debug"),
- couch_log:Level(
- "~p (~p) State file ~s does not exist. Not restoring.", [?MODULE, enoent, FilePath]
- ),
- error;
- {error, Reason} ->
- couch_log:error(
- "~p Cannot read the state file (~p). Deleting ~s", [?MODULE, Reason, FilePath]
- ),
- file:delete(FilePath),
- error
- end.
-
-parse_state(1, ?VSN, Binary) ->
- erlang:binary_to_term(Binary, [safe]);
-parse_state(Vsn, ?VSN, _) ->
- error({unsupported_version, Vsn}).
-
-persist_and_reschedule(State) ->
- persist_queue(State),
- erlang:send_after(?CHECKPOINT_INTERVAL_IN_MSEC, self(), persist),
- ok.
-
-persist_queue(#state{name = Name, active = Active, starting = Starting, waiting = Waiting}) ->
- Active1 = lists:foldl(
- fun({DbName, _}, Acc) ->
- [DbName | Acc]
- end,
- [],
- Active
- ),
- Starting1 = lists:foldl(
- fun({_, DbName}, Acc) ->
- [DbName | Acc]
- end,
- [],
- Starting
- ),
- smoosh_utils:write_to_file(Active1, active_file_name(Name), ?VSN),
- smoosh_utils:write_to_file(Starting1, starting_file_name(Name), ?VSN),
- smoosh_priority_queue:write_to_file(Waiting).
-
-active_file_name(Name) ->
- filename:join(config:get("smoosh", "state_dir", "."), Name ++ ".active").
-
-starting_file_name(Name) ->
- filename:join(config:get("smoosh", "state_dir", "."), Name ++ ".starting").
+unpersist(Name) ->
+ % Insert into the access table with a current
+ % timestamp to prevent the same dbs from being re-enqueued
+ % again after startup
+ Waiting = smoosh_persist:unpersist(Name),
+ MapFun = fun(Object, _Priority) ->
+ smoosh_server:update_access(Object)
+ end,
+ maps:map(MapFun, smoosh_priority_queue:to_map(Waiting)),
+ Waiting.
+
+% Periodically cache the channel status in the status ets table to avoid
+% having to block on gen_server get_status() calls.
+%
+set_status(#state{} = State) ->
+ #state{active = Active, starting = Starting, waiting = Waiting} = State,
+ Status = [
+ {active, map_size(Active)},
+ {starting, map_size(Starting)},
+ {waiting, smoosh_priority_queue:info(Waiting)}
+ ],
+ true = ets:insert(State#state.stab, {status, Status}),
+ State.
add_to_queue(Key, Priority, State) ->
- #state{active = Active, waiting = Q} = State,
- case lists:keymember(Key, 1, Active) of
+ #state{name = Name, active = Active, waiting = Q} = State,
+ case is_map_key(Key, Active) of
true ->
State;
false ->
- Capacity = list_to_integer(smoosh_utils:get(State#state.name, "capacity", "9999")),
+ Capacity = smoosh_utils:capacity(State#state.name),
Level = smoosh_utils:log_level("compaction_log_level", "notice"),
- couch_log:Level(
- "~s: adding ~p to internal compactor queue with priority ~p",
- [State#state.name, Key, Priority]
- ),
- State#state{
- waiting = smoosh_priority_queue:in(Key, Priority, Priority, Capacity, Q)
- }
+ LogMsg = "~s: enqueueing ~p to compact with priority ~p",
+ LogArgs = [Name, Key, Priority],
+ couch_log:Level(LogMsg, LogArgs),
+ Q1 = smoosh_priority_queue:in(Key, Priority, Capacity, Q),
+ State#state{waiting = Q1}
end.
-maybe_activate(#state{activated = true} = State) ->
+maybe_start_compaction(#state{paused = true} = State) ->
State;
-maybe_activate(State) ->
- activate_channel(State).
-
-activate_channel(#state{name = Name, waiting = Waiting0, requests = Requests0} = State0) ->
- RecStarting = recover(starting_file_name(Name)),
- Starting = lists:foldl(
- fun(DbName, Acc) ->
- case couch_db:exists(DbName) of
- true ->
- Priority = smoosh_server:get_priority(Name, DbName),
- smoosh_priority_queue:in(DbName, Priority, Priority, Acc);
- false ->
- Acc
- end
- end,
- Waiting0,
- RecStarting
- ),
- Waiting1 = smoosh_priority_queue:recover(Starting),
- Requests1 = lists:reverse(Requests0),
- Waiting2 = lists:foldl(
- fun({DbName, Priority}, Acc) ->
- case couch_db:exists(DbName) of
- true ->
- smoosh_priority_queue:in(DbName, Priority, Priority, Acc);
- false ->
- Acc
- end
- end,
- Waiting1,
- Requests1
- ),
- State1 = maybe_start_compaction(State0#state{
- waiting = Waiting2, paused = false, activated = true, requests = []
- }),
- ok = persist_and_reschedule(State1),
- schedule_unpause(),
- State1#state{paused = true}.
+maybe_start_compaction(#state{paused = false, name = Name} = State) ->
+ Concurrency = smoosh_utils:concurrency(Name),
+ maybe_start_compaction(Concurrency, State).
-maybe_start_compaction(#state{paused = true} = State) ->
+maybe_start_compaction(Concurrency, #state{active = A, starting = S} = State) when
+ map_size(A) + map_size(S) >= Concurrency
+->
State;
-maybe_start_compaction(State) ->
- Concurrency = list_to_integer(
- smoosh_utils:get(
- State#state.name,
- "concurrency",
- "1"
- )
- ),
- if
- length(State#state.active) + length(State#state.starting) < Concurrency ->
- case smoosh_priority_queue:out(State#state.waiting) of
- false ->
- maybe_activate(State);
- {Key, Priority, Q} ->
- try
- State2 =
- case start_compact(State, Key) of
- false ->
- State;
- State1 ->
- Level = smoosh_utils:log_level(
- "compaction_log_level",
- "notice"
- ),
- couch_log:Level(
- "~s: Starting compaction for ~s (priority ~p)",
- [State#state.name, smoosh_utils:stringify(Key), Priority]
- ),
- State1
- end,
- maybe_start_compaction(State2#state{waiting = Q})
- catch
- Class:Exception ->
- couch_log:warning(
- "~s: ~p ~p for ~s",
- [
- State#state.name,
- Class,
- Exception,
- smoosh_utils:stringify(Key)
- ]
- ),
- maybe_start_compaction(State#state{waiting = Q})
- end
- end;
- true ->
+maybe_start_compaction(Concurrency, #state{} = State) ->
+ case smoosh_priority_queue:out(State#state.waiting) of
+ false ->
+ State;
+ {Key, Q} ->
+ State1 = State#state{waiting = Q},
+ % Re-check priority since by the time the object was in queue, or
+ % was un-persisted after a node was down, the db or ddoc might be
+ % long gone and we don't want to crash the channel attempting to
+ % compact it
+ State2 =
+ case priority(State1, Key) of
+ 0 -> State1;
+ _ -> try_compact(State1, Key)
+ end,
+ maybe_start_compaction(Concurrency, State2)
+ end.
+
+priority(#state{name = Name}, Key) ->
+ try
+ smoosh_server:get_priority(Name, Key)
+ catch
+ Tag:Error ->
+ % We are being defensive as we don't want to crash the channel
+ Level = smoosh_utils:log_level("compaction_log_level", "notice"),
+ LogMsg = "~s: Failed to get priority for ~s in queue ~p:~p",
+ LogArgs = [Name, smoosh_utils:stringify(Key), Tag, Error],
+ couch_log:Level(LogMsg, LogArgs),
+ 0
+ end.
+
+try_compact(#state{name = Name} = State, Key) ->
+ try start_compact(State, Key) of
+ false ->
+ State;
+ #state{} = State1 ->
+ Level = smoosh_utils:log_level("compaction_log_level", "notice"),
+ LogMsg = "~s: Starting compaction for ~s",
+ LogArgs = [Name, smoosh_utils:stringify(Key)],
+ couch_log:Level(LogMsg, LogArgs),
+ State1
+ catch
+ Class:Exception ->
+ LogArgs = [Name, Class, Exception, smoosh_utils:stringify(Key)],
+ couch_log:warning("~s: compaction error ~p:~p for ~s", LogArgs),
State
end.
-start_compact(State, DbName) when is_list(DbName) ->
- start_compact(State, ?l2b(DbName));
-start_compact(State, DbName) when is_binary(DbName) ->
+start_compact(#state{} = State, {?INDEX_CLEANUP, DbName} = Key) ->
+ #state{name = Name, active = Active} = State,
+ case smoosh_utils:ignore_db(DbName) of
+ false ->
+ {Pid, _Ref} = spawn_monitor(fun() -> cleanup_index_files(DbName) end),
+ Level = smoosh_utils:log_level("compaction_log_level", "notice"),
+ LogMsg = "~s: Starting index cleanup for ~s",
+ LogArgs = [Name, smoosh_utils:stringify(Key)],
+ couch_log:Level(LogMsg, LogArgs),
+ State#state{active = Active#{Key => Pid}};
+ _ ->
+ false
+ end;
+start_compact(#state{name = Name} = State, DbName) when is_binary(DbName) ->
case couch_db:open_int(DbName, []) of
{ok, Db} ->
try
@@ -482,94 +338,221 @@ start_compact(State, DbName) when is_binary(DbName) ->
couch_db:close(Db)
end;
Error = {not_found, no_db_file} ->
- couch_log:warning(
- "Error starting compaction for ~p: ~p",
- [smoosh_utils:stringify(DbName), Error]
- ),
+ LogMsg = "~s : Error starting compaction for ~p: ~p",
+ LogArgs = [Name, smoosh_utils:stringify(DbName), Error],
+ couch_log:warning(LogMsg, LogArgs),
false
end;
-start_compact(State, {Shard, GroupId}) ->
+start_compact(#state{} = State, {Shard, GroupId} = Key) ->
+ #state{name = Name, starting = Starting} = State,
case smoosh_utils:ignore_db({Shard, GroupId}) of
false ->
- DbName = mem3:dbname(Shard),
- {ok, Pid} = couch_index_server:get_index(
- couch_mrview_index, Shard, GroupId
- ),
- spawn(fun() -> cleanup_index_files(DbName, Shard) end),
- Ref = erlang:monitor(process, Pid),
- Pid ! {'$gen_call', {self(), Ref}, compact},
- State#state{starting = [{Ref, {Shard, GroupId}} | State#state.starting]};
+ case couch_index_server:get_index(couch_mrview_index, Shard, GroupId) of
+ {ok, Pid} ->
+ schedule_cleanup_index_files(Shard),
+ Ref = erlang:monitor(process, Pid),
+ Pid ! {'$gen_call', {self(), Ref}, compact},
+ State#state{starting = Starting#{Ref => Key}};
+ Error ->
+ LogMsg = "~s : Error starting view compaction for ~p: ~p",
+ LogArgs = [Name, smoosh_utils:stringify(Key), Error],
+ couch_log:warning(LogMsg, LogArgs),
+ false
+ end;
_ ->
false
end;
-start_compact(State, Db) ->
- case smoosh_utils:ignore_db(Db) of
+start_compact(#state{} = State, Db) ->
+ #state{name = Name, starting = Starting, active = Active} = State,
+ Key = couch_db:name(Db),
+ case smoosh_utils:ignore_db(Key) of
false ->
- DbPid = couch_db:get_pid(Db),
- Key = couch_db:name(Db),
case couch_db:get_compactor_pid(Db) of
nil ->
+ DbPid = couch_db:get_pid(Db),
Ref = erlang:monitor(process, DbPid),
DbPid ! {'$gen_call', {self(), Ref}, start_compact},
- State#state{starting = [{Ref, Key} | State#state.starting]};
+ State#state{starting = Starting#{Ref => Key}};
% Compaction is already running, so monitor existing compaction pid.
- CPid ->
- Level = smoosh_utils:log_level("compaction_log_level", "notice"),
- couch_log:Level(
- "Db ~s continuing compaction",
- [smoosh_utils:stringify(Key)]
- ),
+ CPid when is_pid(CPid) ->
erlang:monitor(process, CPid),
- State#state{active = [{Key, CPid} | State#state.active]}
+ Level = smoosh_utils:log_level("compaction_log_level", "notice"),
+ LogMsg = "~s : db ~s continuing compaction",
+ LogArgs = [Name, smoosh_utils:stringify(Key)],
+ couch_log:Level(LogMsg, LogArgs),
+ State#state{active = Active#{Key => CPid}}
end;
_ ->
false
end.
-maybe_remonitor_cpid(State, DbName, Reason) when is_binary(DbName) ->
+maybe_remonitor_cpid(#state{} = State, DbName, Reason) when is_binary(DbName) ->
+ #state{name = Name, active = Active} = State,
case couch_db:open_int(DbName, []) of
{ok, Db} ->
- case couch_db:get_compactor_pid_sync(Db) of
+ try couch_db:get_compactor_pid_sync(Db) of
nil ->
- couch_log:warning(
- "exit for compaction of ~p: ~p",
- [smoosh_utils:stringify(DbName), Reason]
- ),
- {ok, _} = timer:apply_after(5000, smoosh_server, enqueue, [DbName]),
+ LogMsg = "~s : exit for compaction of ~p: ~p",
+ LogArgs = [Name, smoosh_utils:stringify(DbName), Reason],
+ couch_log:warning(LogMsg, LogArgs),
+ re_enqueue(DbName),
State;
- CPid ->
- Level = smoosh_utils:log_level("compaction_log_level", "notice"),
- couch_log:Level(
- "~s compaction already running. Re-monitor Pid ~p",
- [smoosh_utils:stringify(DbName), CPid]
- ),
+ CPid when is_pid(CPid) ->
erlang:monitor(process, CPid),
- State#state{active = [{DbName, CPid} | State#state.active]}
+ Level = smoosh_utils:log_level("compaction_log_level", "notice"),
+ LogMsg = "~s: ~s compaction already running. Re-monitor Pid ~p",
+ LogArgs = [Name, smoosh_utils:stringify(DbName), CPid],
+ couch_log:Level(LogMsg, LogArgs),
+ State#state{active = Active#{DbName => CPid}}
+ catch
+ _:Error ->
+ LogMsg = "~s: error remonitoring db compaction ~p error:~p",
+ LogArgs = [Name, smoosh_utils:stringify(DbName), Error],
+ couch_log:warning(LogMsg, LogArgs),
+ re_enqueue(DbName),
+ State
end;
Error = {not_found, no_db_file} ->
- couch_log:warning(
- "exit for compaction of ~p: ~p",
- [smoosh_utils:stringify(DbName), Error]
- ),
+ LogMsg = "~s : exit for compaction of ~p: ~p",
+ LogArgs = [Name, smoosh_utils:stringify(DbName), Error],
+ couch_log:warning(LogMsg, LogArgs),
State
end;
% not a database compaction, so ignore the pid check
-maybe_remonitor_cpid(State, Key, Reason) ->
- couch_log:warning(
- "exit for compaction of ~p: ~p",
- [smoosh_utils:stringify(Key), Reason]
- ),
- {ok, _} = timer:apply_after(5000, smoosh_server, enqueue, [Key]),
+maybe_remonitor_cpid(#state{name = Name} = State, Key, Reason) ->
+ LogMsg = "~s: exit for compaction of ~p: ~p",
+ LogArgs = [Name, smoosh_utils:stringify(Key), Reason],
+ couch_log:warning(LogMsg, LogArgs),
+ re_enqueue(Key),
State.
+schedule_check_window() ->
+ erlang:send_after(?TIME_WINDOW_MSEC, self(), check_window).
+
+schedule_update_status() ->
+ erlang:send_after(?STATUS_UPDATE_INTERVAL_MSEC, self(), update_status).
+
schedule_unpause() ->
- WaitSecs = list_to_integer(config:get("smoosh", "wait_secs", "30")),
+ WaitSecs = config:get_integer("smoosh", "wait_secs", 30),
erlang:send_after(WaitSecs * 1000, self(), unpause).
-cleanup_index_files(DbName, _Shard) ->
- case config:get("smoosh", "cleanup_index_files", "false") of
- "true" ->
- fabric:cleanup_index_files(DbName);
+schedule_checkpoint() ->
+ erlang:send_after(?CHECKPOINT_INTERVAL_MSEC, self(), checkpoint).
+
+re_enqueue(Obj) ->
+ case whereis(smoosh_server) of
+ Pid when is_pid(Pid) ->
+ Cast = {'$gen_cast', {enqueue, Obj}},
+ erlang:send_after(?RE_ENQUEUE_INTERVAL, Pid, Cast),
+ ok;
_ ->
ok
end.
+
+cleanup_index_files(DbName) ->
+ case should_clean_up_indices() of
+ true -> fabric:cleanup_index_files(DbName);
+ false -> ok
+ end.
+
+schedule_cleanup_index_files(Shard) ->
+ case should_clean_up_indices() of
+ true ->
+ % Since cleanup is at the cluster level, schedule it with a chance
+ % inversely proportional to the number of local shards
+ DbName = mem3:dbname(Shard),
+ try length(mem3:local_shards(DbName)) of
+ ShardCount when ShardCount >= 1 ->
+ case rand:uniform() < (1 / ShardCount) of
+ true ->
+ Arg = {?INDEX_CLEANUP, DbName},
+ smoosh_server:enqueue(Arg);
+ false ->
+ ok
+ end;
+ _ ->
+ ok
+ catch
+ error:database_does_not_exist ->
+ ok
+ end;
+ false ->
+ ok
+ end.
+
+should_clean_up_indices() ->
+ config:get_boolean("smoosh", "cleanup_index_files", true).
+
+do_suspend(#state{active = Active} = State) ->
+ [suspend_pid(Pid) || Pid <- maps:values(Active)],
+ State#state{paused = true}.
+
+do_resume(#state{active = Active} = State) ->
+ [resume_pid(Pid) || Pid <- maps:values(Active)],
+ State#state{paused = false}.
+
+suspend_pid(Pid) when is_pid(Pid) ->
+ catch erlang:suspend_process(Pid, [unless_suspending]).
+
+resume_pid(Pid) when is_pid(Pid) ->
+ catch erlang:resume_process(Pid).
+
+-ifdef(TEST).
+
+-include_lib("couch/include/couch_eunit.hrl").
+
+start_compact_errors_test_() ->
+ {
+ foreach,
+ fun setup_purge_seq/0,
+ fun teardown_purge_seq/1,
+ [
+ ?TDEF_FE(t_start_db_with_missing_db),
+ ?TDEF_FE(t_start_view_with_missing_db),
+ ?TDEF_FE(t_start_view_with_missing_index),
+ ?TDEF_FE(t_start_compact_throws)
+ ]
+ }.
+
+setup_purge_seq() ->
+ meck:new(couch_log, [passthrough]),
+ meck:new(couch_db, [passthrough]),
+ meck:new(smoosh_utils, [passthrough]),
+ Ctx = test_util:start_couch(),
+ DbName = ?tempdb(),
+ {ok, Db} = couch_server:create(DbName, []),
+ couch_db:close(Db),
+ {Ctx, DbName}.
+
+teardown_purge_seq({Ctx, DbName}) ->
+ couch_server:delete(DbName, []),
+ test_util:stop_couch(Ctx),
+ meck:unload().
+
+t_start_db_with_missing_db({_, _}) ->
+ State = #state{name = "ratio_dbs"},
+ meck:reset(couch_log),
+ try_compact(State, <<"missing_db">>),
+ ?assertEqual(1, meck:num_calls(couch_log, warning, 2)).
+
+t_start_view_with_missing_db({_, _}) ->
+ State = #state{name = "ratio_views"},
+ meck:reset(couch_log),
+ try_compact(State, {<<"missing_db">>, <<"_design/nope">>}),
+ ?assertEqual(1, meck:num_calls(couch_log, warning, 2)).
+
+t_start_view_with_missing_index({_, DbName}) ->
+ State = #state{name = "ratio_views"},
+ meck:reset(couch_log),
+ try_compact(State, {DbName, <<"_design/nope">>}),
+ ?assertEqual(1, meck:num_calls(couch_log, warning, 2)).
+
+t_start_compact_throws({_, _}) ->
+ State = #state{name = "ratio_dbs"},
+ % Make something explode inside start_compact, so pick smoosh_utils:ignore_db/1
+ meck:expect(smoosh_utils, ignore_db, 1, meck:raise(error, foo)),
+ meck:reset(couch_log),
+ try_compact(State, {<<"some_db">>, <<"_design/some_view">>}),
+ ?assertEqual(1, meck:num_calls(couch_log, warning, 2)).
+
+-endif.
diff --git a/src/smoosh/src/smoosh_server.erl b/src/smoosh/src/smoosh_server.erl
index 27277be04..10368a549 100644
--- a/src/smoosh/src/smoosh_server.erl
+++ b/src/smoosh/src/smoosh_server.erl
@@ -12,27 +12,24 @@
-module(smoosh_server).
-behaviour(gen_server).
--vsn(4).
-behaviour(config_listener).
--include_lib("couch/include/couch_db.hrl").
% public api.
-export([
start_link/0,
suspend/0,
resume/0,
+ flush/0,
enqueue/1,
sync_enqueue/1,
- sync_enqueue/2,
handle_db_event/3,
status/0
]).
--define(SECONDS_PER_MINUTE, 60).
-
% gen_server api.
-export([
init/1,
+ handle_continue/2,
handle_call/3,
handle_cast/2,
handle_info/2,
@@ -44,32 +41,44 @@
-export([handle_config_change/5, handle_config_terminate/3]).
% exported but for internal use.
--export([enqueue_request/2]).
--export([get_priority/2]).
-
-% exported for testing and debugging
--export([get_channel/1]).
+-export([
+ enqueue_request/2,
+ get_priority/2,
+ update_access/1,
+ access_cleaner/0
+]).
-ifdef(TEST).
+-define(STALENESS_MIN, 0).
+-define(ACCEESS_CLEAN_INTERVAL_MSEC, 300).
-define(RELISTEN_DELAY, 50).
-else.
+-define(STALENESS_MIN, 5).
+-define(ACCEESS_CLEAN_INTERVAL_MSEC, 3000).
-define(RELISTEN_DELAY, 5000).
-endif.
+-define(INDEX_CLEANUP, index_cleanup).
+-define(ACCESS, smoosh_access).
+-define(ACCESS_MAX_SIZE, 250000).
+-define(ACCESS_NEVER, -1 bsl 58).
+
% private records.
-record(state, {
db_channels = [],
view_channels = [],
- tab,
+ cleanup_channels = [],
event_listener,
waiting = #{},
- waiting_by_ref = #{}
+ waiting_by_ref = #{},
+ access_cleaner
}).
-record(channel, {
name,
- pid
+ pid,
+ stab
}).
% public functions.
@@ -78,22 +87,35 @@ start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
suspend() ->
- gen_server:call(?MODULE, suspend).
+ gen_server:call(?MODULE, suspend, infinity).
resume() ->
- gen_server:call(?MODULE, resume).
+ gen_server:call(?MODULE, resume, infinity).
-status() ->
- gen_server:call(?MODULE, status).
+flush() ->
+ gen_server:call(?MODULE, flush, infinity).
-enqueue(Object) ->
- gen_server:cast(?MODULE, {enqueue, Object}).
+% Collect the status of every registered channel by folding over the channel
+% ets table directly, without calling into the smoosh_server process. When the
+% fold raises badarg (e.g. the table does not exist because the server is not
+% running) an empty status list is returned instead of crashing the caller.
+status() ->
+    try
+        {ok, ets:foldl(fun get_channel_status/2, [], ?MODULE)}
+    catch
+        error:badarg ->
+            {ok, []}
+    end.
-sync_enqueue(Object) ->
- gen_server:call(?MODULE, {enqueue, Object}).
+% Asynchronously ask smoosh_server to enqueue Object0. The argument is first
+% passed through smoosh_utils:validate_arg/1; the cast is skipped entirely when
+% stale_enough/1 reports that the object is not stale yet.
+enqueue(Object0) ->
+    Validated = smoosh_utils:validate_arg(Object0),
+    case stale_enough(Validated) of
+        false -> ok;
+        true -> gen_server:cast(?MODULE, {enqueue, Validated})
+    end.
-sync_enqueue(Object, Timeout) ->
- gen_server:call(?MODULE, {enqueue, Object}, Timeout).
+% Synchronous variant of enqueue/1: waits (without a timeout) for smoosh_server
+% to process the enqueue request. The call is skipped and ok is returned when
+% stale_enough/1 reports that the object is not stale yet.
+sync_enqueue(Object0) ->
+    Validated = smoosh_utils:validate_arg(Object0),
+    case stale_enough(Validated) of
+        false -> ok;
+        true -> gen_server:call(?MODULE, {enqueue, Validated}, infinity)
+    end.
handle_db_event(DbName, local_updated, St) ->
enqueue(DbName),
@@ -110,35 +132,29 @@ handle_db_event(DbName, {index_collator_upgrade, IdxName}, St) ->
handle_db_event(_DbName, _Event, St) ->
{ok, St}.
-% for testing and debugging only
-get_channel(ChannelName) ->
- gen_server:call(?MODULE, {get_channel, ChannelName}).
-
% gen_server functions.
init([]) ->
process_flag(trap_exit, true),
+ process_flag(message_queue_data, off_heap),
ok = config:listen_for_changes(?MODULE, nil),
- {ok, Pid} = start_event_listener(),
- DbChannels = smoosh_utils:split(
- config:get("smoosh", "db_channels", "upgrade_dbs,ratio_dbs,slack_dbs")
- ),
- ViewChannels = smoosh_utils:split(
- config:get("smoosh", "view_channels", "upgrade_views,ratio_views,slack_views")
- ),
- Tab = ets:new(channels, [{keypos, #channel.name}]),
- {ok,
- create_missing_channels(#state{
- db_channels = DbChannels,
- view_channels = ViewChannels,
- event_listener = Pid,
- tab = Tab
- })}.
-
-handle_config_change("smoosh", "db_channels", L, _, _) ->
- {ok, gen_server:cast(?MODULE, {new_db_channels, smoosh_utils:split(L)})};
-handle_config_change("smoosh", "view_channels", L, _, _) ->
- {ok, gen_server:cast(?MODULE, {new_view_channels, smoosh_utils:split(L)})};
+ Opts = [named_table, {read_concurrency, true}],
+ ets:new(?MODULE, Opts ++ [{keypos, #channel.name}]),
+ ets:new(?ACCESS, Opts ++ [{write_concurrency, true}, public]),
+ State = #state{
+ access_cleaner = spawn_link(?MODULE, access_cleaner, []),
+ db_channels = smoosh_utils:db_channels(),
+ view_channels = smoosh_utils:view_channels(),
+ cleanup_channels = smoosh_utils:cleanup_channels()
+ },
+ {ok, State, {continue, create_channels}}.
+
+handle_config_change("smoosh", "db_channels", _, _, _) ->
+ {ok, gen_server:cast(?MODULE, new_db_channels)};
+handle_config_change("smoosh", "view_channels", _, _, _) ->
+ {ok, gen_server:cast(?MODULE, new_view_channels)};
+handle_config_change("smoosh", "cleanup_channels", _, _, _) ->
+ {ok, gen_server:cast(?MODULE, new_cleanup_channels)};
handle_config_change(_, _, _, _, _) ->
{ok, nil}.
@@ -151,51 +167,58 @@ handle_config_terminate(_Server, _Reason, _State) ->
restart_config_listener
).
-handle_call(status, _From, State) ->
- Acc = ets:foldl(fun get_channel_status/2, [], State#state.tab),
- {reply, {ok, Acc}, State};
+% Deferred part of start-up, requested by init/1 through
+% {continue, create_channels}: check the persistence setup, start every
+% configured channel that is not running yet, then start the db event listener
+% and remember its pid in the state.
+handle_continue(create_channels, #state{} = State0) ->
+    % Warn users about smoosh persistence misconfiguration issues. Do it once
+    % on startup to avoid continuously spamming logs with errors.
+    smoosh_persist:check_setup(),
+    WithChannels = create_missing_channels(State0),
+    {ok, ListenerPid} = start_event_listener(),
+    {noreply, WithChannels#state{event_listener = ListenerPid}}.
+
handle_call({enqueue, Object}, _From, State) ->
{noreply, NewState} = handle_cast({enqueue, Object}, State),
{reply, ok, NewState};
handle_call(suspend, _From, State) ->
- ets:foldl(
- fun(#channel{name = Name, pid = P}, _) ->
- Level = smoosh_utils:log_level("compaction_log_level", "debug"),
- couch_log:Level("Suspending ~p", [Name]),
- smoosh_channel:suspend(P)
- end,
- 0,
- State#state.tab
- ),
+ Fun = fun(#channel{name = Name, pid = P}, _) ->
+ Level = smoosh_utils:log_level("compaction_log_level", "debug"),
+ couch_log:Level("Suspending ~p", [Name]),
+ smoosh_channel:suspend(P)
+ end,
+ ets:foldl(Fun, ok, ?MODULE),
{reply, ok, State};
handle_call(resume, _From, State) ->
- ets:foldl(
- fun(#channel{name = Name, pid = P}, _) ->
- Level = smoosh_utils:log_level("compaction_log_level", "debug"),
- couch_log:Level("Resuming ~p", [Name]),
- smoosh_channel:resume(P)
- end,
- 0,
- State#state.tab
- ),
+ Fun = fun(#channel{name = Name, pid = P}, _) ->
+ Level = smoosh_utils:log_level("compaction_log_level", "debug"),
+ couch_log:Level("Resuming ~p", [Name]),
+ smoosh_channel:resume(P)
+ end,
+ ets:foldl(Fun, ok, ?MODULE),
{reply, ok, State};
-handle_call({get_channel, ChannelName}, _From, #state{tab = Tab} = State) ->
- {reply, {ok, channel_pid(Tab, ChannelName)}, State}.
-
-handle_cast({new_db_channels, Channels}, State) ->
- [
- smoosh_channel:close(channel_pid(State#state.tab, C))
- || C <- State#state.db_channels -- Channels
- ],
- {noreply, create_missing_channels(State#state{db_channels = Channels})};
-handle_cast({new_view_channels, Channels}, State) ->
- [
- smoosh_channel:close(channel_pid(State#state.tab, C))
- || C <- State#state.view_channels -- Channels
- ],
- {noreply, create_missing_channels(State#state{view_channels = Channels})};
+handle_call(flush, _From, State) ->
+ Fun = fun(#channel{pid = P}, _) -> smoosh_channel:flush(P) end,
+ ets:foldl(Fun, ok, ?MODULE),
+ {reply, ok, State}.
+
+handle_cast(new_db_channels, #state{} = State) ->
+ Channels = smoosh_utils:db_channels(),
+ Closed = State#state.db_channels -- Channels,
+ [smoosh_channel:close(channel_pid(C)) || C <- Closed],
+ State1 = State#state{db_channels = Channels},
+ {noreply, create_missing_channels(State1)};
+handle_cast(new_view_channels, #state{} = State) ->
+ Channels = smoosh_utils:view_channels(),
+ Closed = State#state.view_channels -- Channels,
+ [smoosh_channel:close(channel_pid(C)) || C <- Closed],
+ State1 = State#state{view_channels = Channels},
+ {noreply, create_missing_channels(State1)};
+handle_cast(new_cleanup_channels, #state{} = State) ->
+ Channels = smoosh_utils:cleanup_channels(),
+ Closed = State#state.cleanup_channels -- Channels,
+ [smoosh_channel:close(channel_pid(C)) || C <- Closed],
+ State1 = State#state{cleanup_channels = Channels},
+ {noreply, create_missing_channels(State1)};
handle_cast({enqueue, Object}, #state{waiting = Waiting} = State) ->
- case maps:is_key(Object, Waiting) of
+ case is_map_key(Object, Waiting) of
true ->
{noreply, State};
false ->
@@ -208,12 +231,17 @@ handle_info({'EXIT', Pid, Reason}, #state{event_listener = Pid} = State) ->
couch_log:Level("update notifier died ~p", [Reason]),
{ok, Pid1} = start_event_listener(),
{noreply, State#state{event_listener = Pid1}};
+handle_info({'EXIT', Pid, Reason}, #state{access_cleaner = Pid} = State) ->
+ Level = smoosh_utils:log_level("compaction_log_level", "notice"),
+ couch_log:Level("access cleaner died ~p", [Reason]),
+ Pid1 = spawn_link(?MODULE, access_cleaner, []),
+ {noreply, State#state{access_cleaner = Pid1}};
handle_info({'EXIT', Pid, Reason}, State) ->
Level = smoosh_utils:log_level("compaction_log_level", "notice"),
couch_log:Level("~p ~p died ~p", [?MODULE, Pid, Reason]),
- case ets:match_object(State#state.tab, #channel{pid = Pid, _ = '_'}) of
+ case ets:match_object(?MODULE, #channel{pid = Pid, _ = '_'}) of
[#channel{name = Name}] ->
- ets:delete(State#state.tab, Name);
+ ets:delete(?MODULE, Name);
_ ->
ok
end,
@@ -226,17 +254,22 @@ handle_info(restart_config_listener, State) ->
handle_info(_Msg, State) ->
{noreply, State}.
-terminate(_Reason, State) ->
- ets:foldl(
- fun(#channel{pid = P}, _) -> smoosh_channel:close(P) end,
- 0,
- State#state.tab
- ),
- ok.
+% Shut down: unlink from and kill the access cleaner process, then close every
+% channel registered in the channel ets table.
+terminate(_Reason, #state{access_cleaner = CleanerPid}) ->
+    catch unlink(CleanerPid),
+    exit(CleanerPid, kill),
+    ets:foldl(
+        fun(#channel{pid = ChannelPid}, _Acc) ->
+            smoosh_channel:close(ChannelPid)
+        end,
+        ok,
+        ?MODULE
+    ).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
+% Record the current monotonic time (in seconds) as the access time of Object
+% in the access table, overwriting any previous entry.
+update_access(Object) ->
+    true = ets:insert(?ACCESS, {Object, erlang:monotonic_time(second)}),
+    ok.
+
% private functions.
add_enqueue_ref(Object, Ref, #state{} = State) when is_reference(Ref) ->
@@ -251,18 +284,11 @@ remove_enqueue_ref(Ref, #state{} = State) when is_reference(Ref) ->
{Ref, Waiting1} = maps:take(Object, Waiting),
State#state{waiting = Waiting1, waiting_by_ref = WaitingByRef1}.
-get_channel_status(#channel{name = Name, pid = P}, Acc0) when is_pid(P) ->
- try gen_server:call(P, status) of
- {ok, Status} ->
- [{Name, Status} | Acc0];
- _ ->
- Acc0
- catch
- _:_ ->
- Acc0
- end;
-get_channel_status(_, Acc0) ->
- Acc0.
+% Fold function for status/0: prepend {Name, Status} for a channel record,
+% where Status is obtained from the channel's status table. Any other table
+% entry leaves the accumulator untouched.
+get_channel_status(#channel{name = Name, stab = Tab}, Acc) ->
+    [{Name, smoosh_channel:get_status(Tab)} | Acc];
+get_channel_status(_, Acc) ->
+    Acc.
start_event_listener() ->
couch_event:link_listener(?MODULE, handle_db_event, nil, [all_dbs]).
@@ -273,79 +299,92 @@ enqueue_request(State, Object) ->
false ->
ok;
{ok, Pid, Priority} ->
- smoosh_channel:enqueue(Pid, Object, Priority)
+ case ets:info(?ACCESS, size) of
+ Size when Size =< ?ACCESS_MAX_SIZE ->
+ ok = update_access(Object);
+ _ ->
+ ok
+ end,
+ QuantizedPriority = quantize(Priority),
+ smoosh_channel:enqueue(Pid, Object, QuantizedPriority)
end
catch
- Class:Exception:Stack ->
- couch_log:warning(
- "~s: ~p ~p for ~s : ~p",
- [
- ?MODULE,
- Class,
- Exception,
- smoosh_utils:stringify(Object),
- Stack
- ]
- )
+ Tag:Exception:Stack ->
+ Args = [?MODULE, Tag, Exception, smoosh_utils:stringify(Object), Stack],
+ couch_log:warning("~s: ~p ~p for ~s : ~p", Args),
+ ok
end.
-find_channel(#state{} = State, {Shard, GroupId}) ->
- find_channel(State#state.tab, State#state.view_channels, {Shard, GroupId});
+find_channel(#state{} = State, {?INDEX_CLEANUP, DbName}) ->
+ find_channel(State, State#state.cleanup_channels, {?INDEX_CLEANUP, DbName});
+find_channel(#state{} = State, {Shard, GroupId}) when is_binary(Shard) ->
+ find_channel(State, State#state.view_channels, {Shard, GroupId});
find_channel(#state{} = State, DbName) ->
- find_channel(State#state.tab, State#state.db_channels, DbName).
+ find_channel(State, State#state.db_channels, DbName).
-find_channel(_Tab, [], _Object) ->
+find_channel(#state{} = _State, [], _Object) ->
false;
-find_channel(Tab, [Channel | Rest], Object) ->
- Pid = channel_pid(Tab, Channel),
- LastUpdated = smoosh_channel:last_updated(Pid, Object),
- StalenessInSec =
- config:get_integer("smoosh", "staleness", 5) *
- ?SECONDS_PER_MINUTE,
- Staleness = erlang:convert_time_unit(StalenessInSec, seconds, native),
- Now = erlang:monotonic_time(),
- Activated = smoosh_channel:is_activated(Pid),
- StaleEnough = LastUpdated =:= false orelse Now - LastUpdated > Staleness,
- case Activated andalso StaleEnough of
+find_channel(#state{} = State, [Channel | Rest], Object) ->
+ case stale_enough(Object) of
true ->
case smoosh_utils:ignore_db(Object) of
true ->
- find_channel(Tab, Rest, Object);
+ find_channel(State, Rest, Object);
_ ->
case get_priority(Channel, Object) of
0 ->
- find_channel(Tab, Rest, Object);
+ find_channel(State, Rest, Object);
Priority ->
- {ok, Pid, Priority}
+ {ok, channel_pid(Channel), Priority}
end
end;
false ->
- find_channel(Tab, Rest, Object)
+ find_channel(State, Rest, Object)
end.
-channel_pid(Tab, Channel) ->
- [#channel{pid = Pid}] = ets:lookup(Tab, Channel),
+% True when the time elapsed since the last recorded access of Object is at
+% least the configured minimum staleness (both measured in seconds).
+stale_enough(Object) ->
+    LastSec = last_updated(Object),
+    erlang:monotonic_time(second) - LastSec >= min_staleness_sec().
+
+channel_pid(Channel) ->
+ [#channel{pid = Pid}] = ets:lookup(?MODULE, Channel),
Pid.
-create_missing_channels(State) ->
- create_missing_channels(State#state.tab, State#state.db_channels),
- create_missing_channels(State#state.tab, State#state.view_channels),
+create_missing_channels(#state{} = State) ->
+ create_missing_channels_type(State#state.db_channels),
+ create_missing_channels_type(State#state.view_channels),
+ create_missing_channels_type(State#state.cleanup_channels),
State.
-create_missing_channels(_Tab, []) ->
+create_missing_channels_type([]) ->
ok;
-create_missing_channels(Tab, [Channel | Rest]) ->
- case ets:lookup(Tab, Channel) of
+create_missing_channels_type([Channel | Rest]) ->
+ case ets:lookup(?MODULE, Channel) of
[] ->
{ok, Pid} = smoosh_channel:start_link(Channel),
- true = ets:insert(Tab, [#channel{name = Channel, pid = Pid}]);
+ {ok, STab} = smoosh_channel:get_status_table(Pid),
+ Chan = #channel{
+ name = Channel,
+ pid = Pid,
+ stab = STab
+ },
+ true = ets:insert(?MODULE, Chan);
_ ->
ok
end,
- create_missing_channels(Tab, Rest).
+ create_missing_channels_type(Rest).
+get_priority(_Channel, {?INDEX_CLEANUP, DbName}) ->
+ try mem3:local_shards(mem3:dbname(DbName)) of
+ [_ | _] -> 1;
+ [] -> 0
+ catch
+ error:database_does_not_exist ->
+ 0
+ end;
get_priority(Channel, {Shard, GroupId}) ->
- case couch_index_server:get_index(couch_mrview_index, Shard, GroupId) of
+ try couch_index_server:get_index(couch_mrview_index, Shard, GroupId) of
{ok, Pid} ->
try
{ok, ViewInfo} = couch_index:get_info(Pid),
@@ -368,9 +407,10 @@ get_priority(Channel, {Shard, GroupId}) ->
[Channel, Shard, GroupId, Reason]
),
0
+ catch
+ throw:{not_found, _} ->
+ 0
end;
-get_priority(Channel, DbName) when is_list(DbName) ->
- get_priority(Channel, ?l2b(DbName));
get_priority(Channel, DbName) when is_binary(DbName) ->
case couch_db:open_int(DbName, []) of
{ok, Db} ->
@@ -379,11 +419,8 @@ get_priority(Channel, DbName) when is_binary(DbName) ->
after
couch_db:close(Db)
end;
- Error = {not_found, no_db_file} ->
- couch_log:warning(
- "~p: Error getting priority for ~p: ~p",
- [Channel, DbName, Error]
- ),
+ {not_found, no_db_file} ->
+ % It's expected that a db might be deleted while waiting in queue
0
end;
get_priority(Channel, Db) ->
@@ -494,6 +531,38 @@ view_needs_upgrade(Props) ->
Enabled andalso length(Versions) >= 2
end.
+% Endless loop run by the access cleaner process. Each round sleeps for the
+% clean interval plus a random jitter of up to one more interval, then deletes
+% all access table entries whose timestamp is older than the minimum staleness
+% period.
+access_cleaner() ->
+    SleepMSec = ?ACCEESS_CLEAN_INTERVAL_MSEC + rand:uniform(?ACCEESS_CLEAN_INTERVAL_MSEC),
+    timer:sleep(SleepMSec),
+    Cutoff = erlang:monotonic_time(second) - min_staleness_sec(),
+    % Match {_, AccessSec} entries with AccessSec < Cutoff.
+    MatchSpec = [{{'_', '$1'}, [{'<', '$1', Cutoff}], [true]}],
+    ets:select_delete(?ACCESS, MatchSpec),
+    access_cleaner().
+
+% The `[smoosh] staleness` setting is expressed in minutes; return it in
+% seconds, falling back to ?STALENESS_MIN when it is not configured.
+min_staleness_sec() ->
+    60 * config:get_integer("smoosh", "staleness", ?STALENESS_MIN).
+
+% Return the timestamp (monotonic seconds) stored for Object in the access
+% table, or ?ACCESS_NEVER when the table has no entry for it.
+last_updated(Object) ->
+ try ets:lookup(?ACCESS, Object) of
+ [{_, AccessSec}] ->
+ AccessSec;
+ [] ->
+ ?ACCESS_NEVER
+ catch
+ error:badarg ->
+ % ets:lookup/2 raises badarg e.g. when the table does not exist.
+ % NOTE(review): 0 presumably makes the object look "not stale" while
+ % the server is down, but that only holds when monotonic time is
+ % negative, which OTP does not guarantee - confirm the intent.
+ 0
+ end.
+
+% Reduce the precision of a priority so that insignificant ratio changes map
+% to the same value. Integers are returned unchanged, floats >= 16 are rounded
+% to the nearest integer and smaller floats to the nearest multiple of 1/16.
+quantize(Ratio) when is_integer(Ratio) ->
+    Ratio;
+quantize(Ratio) when is_float(Ratio) ->
+    case Ratio >= 16 of
+        true -> round(Ratio);
+        false -> round(Ratio * 16) / 16
+    end.
+
-ifdef(TEST).
-include_lib("couch/include/couch_eunit.hrl").
@@ -690,4 +759,16 @@ add_remove_enqueue_ref_test() ->
% It's basically back to an initial (empty) state
?assertEqual(St1, #state{}).
+% Table-driven check of quantize/1; each entry is {Expected, Input}.
+quantize_test() ->
+    Cases = [
+        {0, 0},
+        {1, 1},
+        {0.0, 0.0},
+        {16, 16.0},
+        {15.0, 15.0},
+        {0.0, 0.01},
+        {0.125, 0.1},
+        {0.125, 0.1042},
+        {0.125, 0.125111111111},
+        {10.0, 10.0002}
+    ],
+    lists:foreach(
+        fun({Expected, Input}) ->
+            ?assertEqual(Expected, quantize(Input))
+        end,
+        Cases
+    ).
+
-endif.
diff --git a/src/smoosh/test/smoosh_tests.erl b/src/smoosh/test/smoosh_tests.erl
index 73876888b..0ec07025a 100644
--- a/src/smoosh/test/smoosh_tests.erl
+++ b/src/smoosh/test/smoosh_tests.erl
@@ -3,152 +3,486 @@
-include_lib("couch/include/couch_eunit.hrl").
-include_lib("couch/include/couch_db.hrl").
-%% ==========
-%% Setup
-%% ----------
-
-setup(ChannelType) ->
- DbName = ?tempdb(),
- {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
- couch_db:close(Db),
- {ok, ChannelPid} = smoosh_server:get_channel(ChannelType),
- smoosh_channel:flush(ChannelPid),
- ok = config:set("smoosh", "persist", "true", false),
- ok = config:set(config_section(ChannelType), "min_size", "1", false),
- ok = config:set(config_section(ChannelType), "min_priority", "1", false),
- DbName.
-
-teardown(ChannelType, DbName) ->
- ok = couch_server:delete(DbName, [?ADMIN_CTX]),
- ok = config:delete("smoosh", "persist", false),
- ok = config:delete(config_section(DbName), "min_size", false),
- ok = config:delete(config_section(DbName), "min_priority", false),
- meck:unload(),
- {ok, ChannelPid} = smoosh_server:get_channel(ChannelType),
- smoosh_channel:flush(ChannelPid),
- ok.
-
-config_section(ChannelType) ->
- "smoosh." ++ ChannelType.
-
-%% ==========
-%% Tests
-%% ----------
-
smoosh_test_() ->
{
- "Testing smoosh",
+ setup,
+ fun setup_all/0,
+ fun teardown_all/1,
{
- setup,
- fun() -> test_util:start_couch([smoosh]) end,
- fun test_util:stop/1,
+ foreach,
+ fun setup/0,
+ fun teardown/1,
[
- channels_tests(),
- persistence_tests()
+ ?TDEF_FE(t_default_channels),
+ ?TDEF_FE(t_channels_recreated_on_crash),
+ ?TDEF_FE(t_can_create_and_delete_channels),
+ ?TDEF_FE(t_db_is_enqueued_and_compacted),
+ ?TDEF_FE(t_view_is_enqueued_and_compacted),
+ ?TDEF_FE(t_index_cleanup_happens_by_default),
+ ?TDEF_FE(t_index_cleanup_can_be_disabled),
+ ?TDEF_FE(t_suspend_resume),
+ ?TDEF_FE(t_check_window_can_resume),
+ ?TDEF_FE(t_renqueue_on_crashes),
+ ?TDEF_FE(t_update_status_works),
+ ?TDEF_FE(t_checkpointing_works, 15),
+ ?TDEF_FE(t_ignore_checkpoint_resume_if_compacted_already),
+ ?TDEF_FE(t_access_cleaner_restarts),
+ ?TDEF_FE(t_event_handler_restarts),
+ ?TDEF_FE(t_manual_enqueue_api_works),
+ ?TDEF_FE(t_access_cleaner_works)
]
}
}.
-persistence_tests() ->
- Tests = [
- fun should_persist_queue/2,
- fun should_call_recover/2,
- fun should_not_call_recover/2
- ],
- {
- "Various persistence tests",
- [
- make_test_case("ratio_dbs", Tests)
- ]
- }.
+% Suite-level setup: install passthrough mocks for the modules the tests
+% observe, start couch with fabric and set the query server commit_freq to 0.
+% Returns the couch context for teardown_all/1.
+setup_all() ->
+ meck:new(smoosh_server, [passthrough]),
+ meck:new(smoosh_channel, [passthrough]),
+ meck:new(fabric, [passthrough]),
+ meck:new(couch_emsort, [passthrough]),
+ Ctx = test_util:start_couch([fabric]),
+ config:set("query_server_config", "commit_freq", "0", false),
+ Ctx.
-channels_tests() ->
- Tests = [
- fun should_enqueue/2
- ],
- {
- "Various channels tests",
+% Suite-level teardown: stop smoosh (ignoring failures), restore the
+% commit_freq setting, stop couch and unload all mocks.
+teardown_all(Ctx) ->
+ catch application:stop(smoosh),
+ config:delete("query_server_config", "commit_freq", false),
+ test_util:stop(Ctx),
+ meck:unload().
+
+% Per-test setup: disable persistence and the start-up wait, create a q=1
+% database holding a design document and one large document, query the view
+% once, then start smoosh, wait for its channels and flush them. Returns the
+% database name.
+setup() ->
+ config:set("smoosh", "persist", "false", false),
+ config:set("smoosh", "wait_secs", "0", false),
+ DbName = ?tempdb(),
+ fabric:create_db(DbName, [{q, 1}]),
+ {ok, _} = create_ddoc(DbName, <<"_design/foo">>, <<"bar">>),
+ {ok, _} = create_doc(DbName, <<"doc1">>, 1500000),
+ {ok, _} = fabric:query_view(DbName, <<"foo">>, <<"bar">>),
+ application:start(smoosh),
+ wait_for_channels(),
+ flush(),
+ DbName.
+
+% Per-test teardown: flush and stop smoosh (ignoring failures), delete the
+% test database, reset the mock call histories and remove every smoosh config
+% setting a test may have changed.
+teardown(DbName) ->
+ catch flush(),
+ catch application:stop(smoosh),
+ fabric:delete_db(DbName),
+ meck:reset(smoosh_server),
+ meck:reset(smoosh_channel),
+ meck:reset(couch_emsort),
+ meck:reset(fabric),
+ config:delete("smoosh", "db_channels", false),
+ config:delete("smoosh.ratio_dbs", "min_priority", false),
+ config:delete("smoosh.ratio_views", "min_priority", false),
+ config:delete("smoosh", "view_channels", false),
+ config:delete("smoosh", "cleanup_channels", false),
+ config:delete("smoosh", "wait_secs", false),
+ config:delete("smoosh", "persist", false),
+ config:delete("smoosh", "cleanup_index_files", false).
+
+t_default_channels(_) ->
+ ?assertMatch(
[
- make_test_case("ratio_dbs", Tests)
- ]
- }.
+ {"index_cleanup", _},
+ {"ratio_dbs", _},
+ {"ratio_views", _},
+ {"slack_dbs", _},
+ {"slack_views", _},
+ {"upgrade_dbs", _},
+ {"upgrade_views", _}
+ ],
+ status()
+ ),
+ % If app hasn't started status won't crash
+ application:stop(smoosh),
+ ?assertEqual([], status()).
-make_test_case(Type, Funs) ->
- {foreachx, fun setup/1, fun teardown/2, [{Type, Fun} || Fun <- Funs]}.
+% Kill the ratio_dbs channel and check that a channel with the same name is
+% started again under a different pid, with all 7 channels present.
+t_channels_recreated_on_crash(_) ->
+ RatioDbsPid = get_channel_pid("ratio_dbs"),
+ meck:reset(smoosh_channel),
+ exit(RatioDbsPid, kill),
+ meck:wait(1, smoosh_channel, start_link, 1, 3000),
+ wait_for_channels(7),
+ ?assertMatch([_, {"ratio_dbs", _} | _], status()),
+ ?assertNotEqual(RatioDbsPid, get_channel_pid("ratio_dbs")).
-should_enqueue(ChannelType, DbName) ->
- ?_test(begin
- ok = grow_db_file(DbName, 300),
- ok = wait_enqueue(ChannelType, DbName),
- ?assert(is_enqueued(ChannelType, DbName)),
- ok
- end).
+% Setting the db/view/cleanup channel lists in the config must create the new
+% channels, and deleting those settings must bring the channel count back to
+% the 7 defaults.
+t_can_create_and_delete_channels(_) ->
+ config:set("smoosh", "db_channels", "mychan1", false),
+ config:set("smoosh", "view_channels", "mychan2", false),
+ config:set("smoosh", "cleanup_channels", "mychan3", false),
+ % 7 default ones + 3 new ones
+ wait_for_channels(10),
+ meck:reset(smoosh_channel),
+ config:delete("smoosh", "db_channels", false),
+ config:delete("smoosh", "view_channels", false),
+ config:delete("smoosh", "cleanup_channels", false),
+ wait_for_channels(7).
-should_persist_queue(ChannelType, DbName) ->
- ?_test(begin
- {ok, ChannelPid} = smoosh_server:get_channel(ChannelType),
- ok = grow_db_file(DbName, 300),
- ok = wait_enqueue(ChannelType, DbName),
- ok = smoosh_channel:persist(ChannelPid),
- Q0 = channel_queue(ChannelType),
- ok = application:stop(smoosh),
- ok = application:start(smoosh),
- Q1 = channel_queue(ChannelType),
- % Assert that queues are not empty
- ?assertNotEqual(Q0, smoosh_priority_queue:new(ChannelType)),
- ?assertNotEqual(Q1, smoosh_priority_queue:new(ChannelType)),
- ?assertEqual(Q0, Q1),
- ok
- end).
+% Starting from an idle ratio_dbs channel, deleting the large document must
+% get the database enqueued, a compaction started and the job finished
+% normally (as observed by the wait_* helpers).
+t_db_is_enqueued_and_compacted(DbName) ->
+ ?assertEqual({0, 0, 0}, sync_status("ratio_dbs")),
+ meck:reset(smoosh_channel),
+ {ok, _} = delete_doc(DbName, <<"doc1">>),
+ ok = wait_to_enqueue(DbName),
+ ok = wait_compact_start(),
+ ok = wait_normal_down().
-should_call_recover(_ChannelType, _DbName) ->
- ?_test(begin
- ok = application:stop(smoosh),
- ok = config:set("smoosh", "persist", "true", false),
- meck:new(smoosh_priority_queue, [passthrough]),
- ok = application:start(smoosh),
- timer:sleep(1000),
- ?assertNotEqual(0, meck:num_calls(smoosh_priority_queue, recover, '_')),
- ok
- end).
+% With index cleanup disabled, first let the database compact after deleting
+% the large document, then query the view and expect the view itself to be
+% enqueued and compacted.
+t_view_is_enqueued_and_compacted(DbName) ->
+ % We don't want index cleanup to interfere for now
+ config:set("smoosh", "cleanup_index_files", "false", false),
+ % Ensure db is compacted
+ meck:reset(smoosh_channel),
+ {ok, _} = delete_doc(DbName, <<"doc1">>),
+ ok = wait_normal_down(),
+ % Check view
+ meck:reset(smoosh_channel),
+ {ok, _} = fabric:query_view(DbName, <<"foo">>, <<"bar">>),
+ ok = wait_to_enqueue({DbName, <<"_design/foo">>}),
+ ok = wait_compact_start(),
+ ok = wait_normal_down().
-should_not_call_recover(_ChannelType, _DbName) ->
- ?_test(begin
- ok = application:stop(smoosh),
- ok = config:set("smoosh", "persist", "false", false),
- meck:new(smoosh_priority_queue, [passthrough]),
- ok = application:start(smoosh),
- timer:sleep(1000),
- ?assertEqual(0, meck:num_calls(smoosh_priority_queue, recover, '_')),
- ok
- end).
+% With the default configuration (cleanup_index_files reads as true), querying
+% the view after a db compaction must lead to fabric:cleanup_index_files/1
+% being called for the database within 4 seconds.
+t_index_cleanup_happens_by_default(DbName) ->
+ ?assert(config:get_boolean("smoosh", "cleanup_index_files", true)),
+ % Db compacts
+ meck:reset(smoosh_channel),
+ {ok, _} = delete_doc(DbName, <<"doc1">>),
+ ok = wait_normal_down(),
+ % View should compact as well
+ meck:reset(fabric),
+ meck:reset(smoosh_channel),
+ {ok, _} = fabric:query_view(DbName, <<"foo">>, <<"bar">>),
+ % View cleanup should have been invoked
+ meck:wait(fabric, cleanup_index_files, [DbName], 4000).
+
+% With cleanup_index_files set to "false", the db and view still compact but
+% fabric:cleanup_index_files/1 must not be called (checked after a 1 second
+% grace period).
+t_index_cleanup_can_be_disabled(DbName) ->
+ config:set("smoosh", "cleanup_index_files", "false", false),
+ % Db compacts
+ meck:reset(smoosh_channel),
+ {ok, _} = delete_doc(DbName, <<"doc1">>),
+ ok = wait_normal_down(),
+ % View should compact as well
+ meck:reset(fabric),
+ meck:reset(smoosh_channel),
+ {ok, _} = fabric:query_view(DbName, <<"foo">>, <<"bar">>),
+ ok = wait_compact_start(),
+ ok = wait_normal_down(),
+ % View cleanup was not called
+ timer:sleep(1000),
+ ?assertEqual(0, meck:num_calls(fabric, cleanup_index_files, 1)).
+
+% smoosh:suspend/0 must put the intercepted compactor process into the
+% suspended state while ratio_dbs keeps reporting {1, 0, 0}, and
+% smoosh:resume/0 must take it out of that state again. Both calls are issued
+% twice to show that repeating them is harmless.
+t_suspend_resume(DbName) ->
+ ?assertEqual({0, 0, 0}, sync_status("ratio_dbs")),
+ meck:reset(smoosh_channel),
+ setup_db_compactor_intercept(),
+ {ok, _} = delete_doc(DbName, <<"doc1">>),
+ ok = wait_to_enqueue(DbName),
+ CompPid = wait_db_compactor_pid(),
+ ok = smoosh:suspend(),
+ ?assertEqual({status, suspended}, erlang:process_info(CompPid, status)),
+ ?assertEqual({1, 0, 0}, sync_status("ratio_dbs")),
+ % Suspending twice should work too
+ ok = smoosh:suspend(),
+ ?assertEqual({status, suspended}, erlang:process_info(CompPid, status)),
+ ?assertEqual({1, 0, 0}, sync_status("ratio_dbs")),
+ ok = smoosh:resume(),
+ ?assertNotEqual({status, suspended}, erlang:process_info(CompPid, status)),
+ % Resuming twice should work too
+ ok = smoosh:resume(),
+ ?assertNotEqual({status, suspended}, erlang:process_info(CompPid, status)),
+ CompPid ! continue,
+ ok = wait_normal_down().
+
+% After suspending a running compaction, sending check_window to the
+% ratio_dbs channel must let the job run to a normal exit once the intercepted
+% compactor is told to continue.
+% NOTE(review): presumably check_window resumes the job because no compaction
+% window is configured - confirm against smoosh_channel.
+t_check_window_can_resume(DbName) ->
+ ?assertEqual({0, 0, 0}, sync_status("ratio_dbs")),
+ meck:reset(smoosh_channel),
+ setup_db_compactor_intercept(),
+ {ok, _} = delete_doc(DbName, <<"doc1">>),
+ ok = wait_to_enqueue(DbName),
+ CompPid = wait_db_compactor_pid(),
+ ok = smoosh:suspend(),
+ ?assertEqual({status, suspended}, erlang:process_info(CompPid, status)),
+ get_channel_pid("ratio_dbs") ! check_window,
+ CompPid ! continue,
+ ok = wait_normal_down().
+
+% When the intercepted compactor is made to raise error:boom, the database
+% must be enqueued again and a second compactor must start, which is then
+% allowed to finish normally.
+t_renqueue_on_crashes(DbName) ->
+ ?assertEqual({0, 0, 0}, sync_status("ratio_dbs")),
+ meck:reset(smoosh_channel),
+ setup_db_compactor_intercept(),
+ {ok, _} = delete_doc(DbName, <<"doc1">>),
+ ok = wait_to_enqueue(DbName),
+ CompPid = wait_db_compactor_pid(),
+ meck:reset(smoosh_channel),
+ CompPid ! {raise, error, boom},
+ ok = wait_to_enqueue(DbName),
+ CompPid2 = wait_db_compactor_pid(),
+ CompPid2 ! continue,
+ ok = wait_normal_down().
+
+% While a compaction is held by the intercept, the status read through
+% status/1 for ratio_dbs must eventually become {1, 0, 0}.
+t_update_status_works(DbName) ->
+ setup_db_compactor_intercept(),
+ {ok, _} = delete_doc(DbName, <<"doc1">>),
+ ok = wait_to_enqueue(DbName),
+ CompPid = wait_db_compactor_pid(),
+ % status should have 1 starting job, but it may have not been updated yet so
+ % we wait until update_status is called
+ wait_update_status(),
+ WaitFun = fun() ->
+ case {1, 0, 0} =:= status("ratio_dbs") of
+ true -> ok;
+ _ -> wait
+ end
+ end,
+ test_util:wait(WaitFun),
+ CompPid ! continue,
+ ok = wait_normal_down().
+
+% With persistence enabled, checkpoint the ratio_dbs channel while a
+% compaction is in flight, stop smoosh and crash the compactor. After smoosh
+% is started again a new compactor must appear and ratio_dbs must report
+% {1, 0, 0}.
+t_checkpointing_works(DbName) ->
+ setup_db_compactor_intercept(),
+ {ok, _} = delete_doc(DbName, <<"doc1">>),
+ ok = wait_to_enqueue(DbName),
+ CompPid = wait_db_compactor_pid(),
+ ChanPid = get_channel_pid("ratio_dbs"),
+ config:set("smoosh", "persist", "true", false),
+ meck:reset(smoosh_channel),
+ ChanPid ! checkpoint,
+ % Wait for checkpoint process to exit
+ ok = wait_normal_down(),
+ % Stop smoosh and then crash the compaction
+ ok = application:stop(smoosh),
+ CompPid ! {raise, error, kapow},
+ % Smoosh should resume job and continue compacting
+ setup_db_compactor_intercept(),
+ meck:reset(smoosh_channel),
+ ok = application:start(smoosh),
+ CompPid2 = wait_db_compactor_pid(),
+ ?assertEqual({1, 0, 0}, sync_status("ratio_dbs")),
+ CompPid2 ! continue,
+ ok = wait_normal_down().
+
+% With persistence enabled, checkpoint the ratio_dbs channel, stop smoosh and
+% let the in-flight compaction finish on its own. After smoosh is started
+% again, no smoosh_channel:handle_info/2 call matching {_, {ok, _}} may be
+% seen within 500 ms, i.e. the checkpointed job is not started a second time.
+t_ignore_checkpoint_resume_if_compacted_already(DbName) ->
+ setup_db_compactor_intercept(),
+ {ok, _} = delete_doc(DbName, <<"doc1">>),
+ ok = wait_to_enqueue(DbName),
+ CompPid = wait_db_compactor_pid(),
+ ChanPid = get_channel_pid("ratio_dbs"),
+ config:set("smoosh", "persist", "true", false),
+ meck:reset(smoosh_channel),
+ ChanPid ! checkpoint,
+ % Wait for checkpoint process to exit
+ ok = wait_normal_down(),
+ % Stop smoosh and then let the compaction finish
+ ok = application:stop(smoosh),
+ Ref = erlang:monitor(process, CompPid),
+ CompPid ! continue,
+ receive
+ {'DOWN', Ref, _, _, normal} -> ok
+ end,
+ % Smoosh should resume job and *not* compact
+ setup_db_compactor_intercept(),
+ meck:reset(smoosh_channel),
+ ok = application:start(smoosh),
+ timer:sleep(500),
+ StartPat = {'_', {ok, '_'}},
+ ?assertEqual(0, meck:num_calls(smoosh_channel, handle_info, [StartPat, '_'])).
+
+% Kill the access cleaner process and wait until smoosh_server reports a
+% different access cleaner pid.
+t_access_cleaner_restarts(_) ->
+ ACPid = get_access_cleaner_pid(),
+ exit(ACPid, kill),
+ WaitFun = fun() ->
+ case get_access_cleaner_pid() of
+ Pid when Pid =/= ACPid -> ok;
+ _ -> wait
+ end
+ end,
+ test_util:wait(WaitFun),
+ ?assertNotEqual(ACPid, get_access_cleaner_pid()).
+
+% Kill the db event handler process and wait until smoosh_server reports a
+% different event handler pid.
+t_event_handler_restarts(_) ->
+    EHPid = get_event_handler_pid(),
+    exit(EHPid, kill),
+    WaitFun = fun() ->
+        case get_event_handler_pid() of
+            Pid when Pid =/= EHPid -> ok;
+            _ -> wait
+        end
+    end,
+    test_util:wait(WaitFun),
+    % Compare against the event handler pid. The previous version compared
+    % against the access cleaner pid, which can never equal EHPid, so the
+    % assertion could not fail.
+    ?assertNotEqual(EHPid, get_event_handler_pid()).
-grow_db_file(DbName, SizeInKb) ->
- {ok, Db} = couch_db:open_int(DbName, [?ADMIN_CTX]),
- Data = b64url:encode(crypto:strong_rand_bytes(SizeInKb * 1024)),
- Body = {[{<<"value">>, Data}]},
- Doc = #doc{id = <<"doc1">>, body = Body},
- {ok, _} = couch_db:update_doc(Db, Doc, []),
- couch_db:close(Db),
- ok.
-
-is_enqueued(ChannelType, DbName) ->
- {ok, ChannelPid} = smoosh_server:get_channel(ChannelType),
- smoosh_channel:is_key(ChannelPid, DbName).
-
-wait_enqueue(ChannelType, DbName) ->
- test_util:wait(
- fun() ->
- case is_enqueued(ChannelType, DbName) of
- false ->
- wait;
- true ->
- ok
- end
+% Smoke test for the manual enqueue API. Enqueues invalid and valid objects
+% (a db shard, an index cleanup job and a view) through both
+% smoosh_server:sync_enqueue/1 and smoosh:enqueue/1, repeats the enqueues
+% 1000 times, and finally asserts that smoosh_server and the three channel
+% processes still have the pids they had at the start.
+t_manual_enqueue_api_works(DbName) ->
+ Shard = shard_name(DbName),
+
+ % Capture the pids up front; they are compared again at the end
+ SmooshPid = whereis(smoosh_server),
+ RatioDbsPid = get_channel_pid("ratio_dbs"),
+ RatioViewsPid = get_channel_pid("ratio_views"),
+ CleanupPid = get_channel_pid("index_cleanup"),
+
+ % Lower min priority so that enqueued shards would try to compact
+ config:set("smoosh.ratio_dbs", "min_priority", "1", false),
+ config:set("smoosh.ratio_views", "min_priority", "1", false),
+
+ % Objects named "invalid" are still expected to return ok
+ ?assertEqual(ok, smoosh_server:sync_enqueue(<<"invalid">>)),
+ ?assertEqual(ok, smoosh_server:sync_enqueue({index_cleanup, <<"invalid">>})),
+ ?assertEqual(ok, smoosh_server:sync_enqueue({Shard, <<"_design/invalid">>})),
+
+ ?assertEqual(ok, smoosh_server:sync_enqueue(Shard)),
+ ?assertEqual(ok, smoosh_server:sync_enqueue({index_cleanup, Shard})),
+ ?assertEqual(ok, smoosh_server:sync_enqueue({Shard, <<"_design/foo">>})),
+
+ ?assertEqual(ok, smoosh:enqueue(Shard)),
+ ?assertEqual(ok, smoosh:enqueue({index_cleanup, Shard})),
+ ?assertEqual(ok, smoosh:enqueue({Shard, <<"_design/foo">>})),
+
+ smoosh:enqueue_all_dbs(),
+ smoosh:enqueue_all_views(),
+
+ % Enqueuing the same items in a loop should work
+ lists:foreach(
+ fun(_) ->
+ ?assertEqual(ok, smoosh:enqueue(Shard)),
+ ?assertEqual(ok, smoosh:enqueue({index_cleanup, Shard})),
+ ?assertEqual(ok, smoosh:enqueue({Shard, <<"_design/foo">>}))
end,
- 15000
- ).
+ lists:seq(1, 1000)
+ ),
+
+ ?assertEqual(ok, smoosh_server:sync_enqueue(Shard)),
+ ?assertEqual(ok, smoosh_server:sync_enqueue({index_cleanup, Shard})),
+ ?assertEqual(ok, smoosh_server:sync_enqueue({Shard, <<"_design/foo">>})),
+
+ % Assert that channels and smoosh server didn't crash
+ ?assertEqual(SmooshPid, whereis(smoosh_server)),
+ ?assertEqual(RatioDbsPid, get_channel_pid("ratio_dbs")),
+ ?assertEqual(RatioViewsPid, get_channel_pid("ratio_views")),
+ ?assertEqual(CleanupPid, get_channel_pid("index_cleanup")).
+
+% Inserts a smoosh_access entry stamped one hour (3600 monotonic seconds) in
+% the past, waits until the table is empty and asserts that it is.
+% NOTE(review): the entry is presumably removed by the access cleaner
+% process; nothing in this function deletes it.
+t_access_cleaner_works(_) ->
+    StaleTs = erlang:monotonic_time(second) - 3600,
+    ets:insert(smoosh_access, {<<"db1">>, StaleTs}),
+    test_util:wait(fun() ->
+        case ets:tab2list(smoosh_access) of
+            [] -> ok;
+            _ -> wait
+        end
+    end),
+    ?assertEqual([], ets:tab2list(smoosh_access)).
+
+% Writes a document DocId to DbName whose <<"value">> field holds Size
+% random bytes encoded with b64url. Returns the fabric:update_doc/3 result.
+create_doc(DbName, DocId, Size) ->
+    RandBytes = crypto:strong_rand_bytes(Size),
+    Body = {[{<<"value">>, b64url:encode(RandBytes)}]},
+    fabric:update_doc(DbName, #doc{id = DocId, body = Body}, [?ADMIN_CTX]).
+
+% Writes a javascript design document DocId to DbName containing a single
+% view ViewName whose map function emits (doc._id, doc.value). The ddoc is
+% created with <<"autoupdate">> set to false. Returns the
+% fabric:update_doc/3 result.
+create_ddoc(DbName, DocId, ViewName) ->
+    View = {[{<<"map">>, <<"function(doc) {emit(doc._id, doc.value);}">>}]},
+    Props = [
+        {<<"_id">>, DocId},
+        {<<"language">>, <<"javascript">>},
+        {<<"autoupdate">>, false},
+        {<<"views">>, {[{ViewName, View}]}}
+    ],
+    DDoc = couch_doc:from_json_obj({Props}),
+    fabric:update_doc(DbName, DDoc, [?ADMIN_CTX]).
+
+% Opens document DDocId in DbName, marks it deleted with an empty body and
+% writes it back. Returns the fabric:update_doc/3 result; crashes with
+% badmatch if the document cannot be opened.
+delete_doc(DbName, DDocId) ->
+    {ok, Current} = fabric:open_doc(DbName, DDocId, [?ADMIN_CTX]),
+    Tombstone = Current#doc{body = {[]}, deleted = true},
+    fabric:update_doc(DbName, Tombstone, [?ADMIN_CTX]).
+
+% Returns the property list from smoosh:status/0 sorted by the first tuple
+% element, so results are in a stable order.
+status() ->
+    {ok, ChannelProps} = smoosh:status(),
+    lists:keysort(1, ChannelProps).
+
+% Looks up Channel in the sorted status list and returns
+% {Active, Starting, Waiting}, where Active and Starting are the `active`
+% and `starting` properties and Waiting is the `size` property nested under
+% `waiting`. Returns false when the channel has no status entry.
+status(Channel) ->
+    case lists:keyfind(Channel, 1, status()) of
+        {_, Val} ->
+            Active = proplists:get_value(active, Val),
+            Starting = proplists:get_value(starting, Val),
+            WaitingInfo = proplists:get_value(waiting, Val),
+            Waiting = proplists:get_value(size, WaitingInfo),
+            {Active, Starting, Waiting};
+        false ->
+            false
+    end.
+
+% Makes a synchronous get_status_table call to the channel process (reply
+% discarded, no timeout) and then returns status(Channel).
+% NOTE(review): the call presumably serves as a barrier so the channel has
+% handled earlier messages before status is read - confirm.
+sync_status(Channel) ->
+    gen_server:call(get_channel_pid(Channel), get_status_table, infinity),
+    status(Channel).
+
+% Calls smoosh_server:flush/0 and asserts that it returns ok.
+flush() ->
+ ok = smoosh_server:flush().
+
+% Returns the pid held in the third element of the `channel` tuple stored
+% under key Chan in the smoosh_server ets table. Crashes with badmatch
+% unless the lookup returns exactly one such tuple.
+get_channel_pid(Chan) ->
+ [{channel, _, Pid, _}] = ets:lookup(smoosh_server, Chan),
+ Pid.
+
+% Reads the smoosh_server process state with sys:get_state/1 and returns the
+% last element of the 8-element `state` tuple.
+% NOTE(review): positional match; presumably this is the access cleaner pid
+% field of the server's state record - keep in sync if that record changes.
+get_access_cleaner_pid() ->
+ {state, _, _, _, _, _, _, ACPid} = sys:get_state(smoosh_server),
+ ACPid.
+
+% Reads the smoosh_server process state with sys:get_state/1 and returns the
+% fifth element of the 8-element `state` tuple.
+% NOTE(review): positional match; presumably this is the event handler pid
+% field of the server's state record - keep in sync if that record changes.
+get_event_handler_pid() ->
+ {state, _, _, _, EHPid, _, _, _} = sys:get_state(smoosh_server),
+ EHPid.
+
+% Waits until status() reports the default number of channels.
+wait_for_channels() ->
+ % 3 default ratios + 3 default views + 1 index cleanup = 7
+ wait_for_channels(7).
+
+% Polls via test_util:wait/1 until status() contains exactly N entries.
+% N must be a non-negative integer.
+wait_for_channels(N) when is_integer(N), N >= 0 ->
+    test_util:wait(fun() ->
+        case length(status()) =:= N of
+            true -> ok;
+            false -> wait
+        end
+    end).
+
+% Waits for an enqueue cast of the given object to reach a smoosh channel.
+% A database name, or the database part of a {DbName, View} pair, is first
+% translated to its shard name; an {index_cleanup, DbName} object is passed
+% through unchanged.
+wait_to_enqueue({index_cleanup, Db}) when is_binary(Db) ->
+    wait_enqueue({index_cleanup, Db});
+wait_to_enqueue({Db, View}) when is_binary(Db) ->
+    wait_enqueue({shard_name(Db), View});
+wait_to_enqueue(Db) when is_binary(Db) ->
+    wait_enqueue(shard_name(Db)).
+
+% Blocks (up to 4000 ms) until meck has seen smoosh_channel:handle_cast/2
+% called with an {enqueue, Obj, _} message.
+wait_enqueue(Obj) ->
+    meck:wait(smoosh_channel, handle_cast, [{enqueue, Obj, '_'}, '_'], 4000).
+
+% Returns the name of the only shard of DbName. Crashes with badmatch if
+% mem3:shards/1 does not return exactly one shard.
+shard_name(DbName) ->
+    [OnlyShard] = mem3:shards(DbName),
+    mem3:name(OnlyShard).
+
+% Blocks (up to 4000 ms) until meck has seen smoosh_channel:handle_info/2
+% called with a {_, {ok, _}} message.
+wait_compact_start() ->
+    meck:wait(smoosh_channel, handle_info, [{'_', {ok, '_'}}, '_'], 4000).
+
+% Blocks (up to 4000 ms) until meck has seen smoosh_channel:handle_info/2
+% called with a 'DOWN' message whose reason is normal.
+wait_normal_down() ->
+    DownMsg = {'DOWN', '_', '_', '_', normal},
+    ArgsSpec = [DownMsg, '_'],
+    meck:wait(smoosh_channel, handle_info, ArgsSpec, 4000).
+
+% Blocks (up to 4000 ms) until meck has seen smoosh_channel:handle_info/2
+% called with the update_status message.
+wait_update_status() ->
+ meck:wait(smoosh_channel, handle_info, [update_status, '_'], 4000).
+
+% Replaces couch_emsort:open/1 with a mock that pauses the calling process.
+% The mock sends {compactor_paused, CallerPid} to the process that installed
+% it, then waits for either `continue` (passes through to the real open/1)
+% or {raise, Tag, Reason} (raises that exception in the caller).
+setup_db_compactor_intercept() ->
+    Parent = self(),
+    PausedOpen = fun(Fd) ->
+        Parent ! {compactor_paused, self()},
+        receive
+            continue -> meck:passthrough([Fd]);
+            {raise, Tag, Reason} -> meck:exception(Tag, Reason)
+        end
+    end,
+    meck:expect(couch_emsort, open, PausedOpen).
-channel_queue(ChannelType) ->
- Q0 = smoosh_priority_queue:new(ChannelType),
- smoosh_priority_queue:recover(Q0).
+% Receives the next {compactor_paused, Pid} message and returns Pid.
+% NOTE(review): there is no `after` clause, so this blocks indefinitely if
+% the message never arrives.
+wait_db_compactor_pid() ->
+    receive
+        {compactor_paused, CompactorPid} -> CompactorPid
+    end.