diff options
author | Nick Vatamaniuc <vatamane@gmail.com> | 2022-11-15 21:28:53 -0500 |
---|---|---|
committer | Nick Vatamaniuc <nickva@users.noreply.github.com> | 2022-11-18 17:04:30 -0500 |
commit | 111f2616e1ef48fadb43dcc358bb8dabb69ad839 (patch) | |
tree | 0462e5bb2a0043e8bb38341ce4334076f0f916bb | |
parent | de05ea9f624d28ae99be9dba793f2e614c6f3658 (diff) | |
download | couchdb-111f2616e1ef48fadb43dcc358bb8dabb69ad839.tar.gz |
Optimize smoosh
Clean up, optimize and increase test coverage for smoosh.
* Use the new, simpler persistence module. Remove the extra `activated vs
non-activated` states. Use the `handle_continue(...)` gen_server callback to
avoid blocking smoosh application initialization. Then, let channels
unpersist in their init function, which happens outside general application
initialization.
* Add an index cleanup channel and enqueue index cleanup jobs by default.
* Remove gen_server bottlenecks for status and last update calls. Instead
rely on ets table lookups and gen_casts only.
* Add a few more `try ... catch`'s to avoid crashing the channels.
* Use maps to keep track of starting and active jobs in the channel.
* Re-check priority before starting jobs. This is needed when jobs are
un-persisted after a restart and the database may have been compacted or
deleted already.
* Update periodic channel scheduled checks to have a common scheduling
mechanism.
* Quantize ratio priorities to keep about a single decimal worth of
precision. This should help with churn in the priority queue where the same
db is constantly added and removed for small insignificant ratio changes
like 2.0001 -> 2.0002. This works along with a recent optimization in the
smoosh priority queue module, where if the priority matches, the item is not
removed and re-added; it just stays where it is, reducing CPU and heap memory
churn.
* Remove testing-only API functions to avoid cluttering the API.
* Store messages off-heap for channels and smoosh_server.
* Instead of a per-channel `last_updated` gen_server:call(...), use a single
access ets table which is periodically cleaned of stale entries. As an
optimization, before enqueueing the shard, check last access first. This
avoids sending an extra message to smoosh_server.
* As a protection mechanism against overload, cap the access table size at
250k entries. Don't allow enqueueing more than that many entries during the
configured `[smoosh] staleness = Minutes` period.
* Increase test coverage from 60% to 90%
-rw-r--r-- | src/smoosh/src/smoosh.erl | 13 | ||||
-rw-r--r-- | src/smoosh/src/smoosh_channel.erl | 815 | ||||
-rw-r--r-- | src/smoosh/src/smoosh_server.erl | 389 | ||||
-rw-r--r-- | src/smoosh/test/smoosh_tests.erl | 596 |
4 files changed, 1100 insertions, 713 deletions
diff --git a/src/smoosh/src/smoosh.erl b/src/smoosh/src/smoosh.erl index 950500ffa..68e8d1828 100644 --- a/src/smoosh/src/smoosh.erl +++ b/src/smoosh/src/smoosh.erl @@ -16,7 +16,7 @@ -include_lib("mem3/include/mem3.hrl"). -export([suspend/0, resume/0, enqueue/1, status/0]). --export([enqueue_all_dbs/0, enqueue_all_dbs/1, enqueue_all_views/0]). +-export([enqueue_all_dbs/0, enqueue_all_views/0]). suspend() -> smoosh_server:suspend(). @@ -30,9 +30,6 @@ enqueue(Object) -> sync_enqueue(Object) -> smoosh_server:sync_enqueue(Object). -sync_enqueue(Object, Timeout) -> - smoosh_server:sync_enqueue(Object, Timeout). - status() -> smoosh_server:status(). @@ -44,14 +41,6 @@ enqueue_all_dbs() -> ok ). -enqueue_all_dbs(Timeout) -> - fold_local_shards( - fun(#shard{name = Name}, _Acc) -> - sync_enqueue(Name, Timeout) - end, - ok - ). - enqueue_all_views() -> fold_local_shards( fun(#shard{name = Name}, _Acc) -> diff --git a/src/smoosh/src/smoosh_channel.erl b/src/smoosh/src/smoosh_channel.erl index 50274f704..92fd3413b 100644 --- a/src/smoosh/src/smoosh_channel.erl +++ b/src/smoosh/src/smoosh_channel.erl @@ -12,52 +12,52 @@ -module(smoosh_channel). -behaviour(gen_server). --vsn(1). --include_lib("couch/include/couch_db.hrl"). % public api. --export([start_link/1, close/1, suspend/1, resume/1, activate/1, get_status/1]). --export([enqueue/3, last_updated/2, flush/1, is_key/2, is_activated/1]). +-export([start_link/1, close/1, suspend/1, resume/1, get_status/1]). +-export([enqueue/3, flush/1]). +-export([get_status_table/1]). % gen_server api. -export([ init/1, handle_call/3, handle_cast/2, - handle_info/2, - terminate/2 + handle_info/2 ]). --define(VSN, 1). --define(CHECKPOINT_INTERVAL_IN_MSEC, 180000). +-define(INDEX_CLEANUP, index_cleanup). +-define(TIME_WINDOW_MSEC, 60 * 1000). +-define(CHECKPOINT_INTERVAL_MSEC, 180000). --ifndef(TEST). --define(START_DELAY_IN_MSEC, 60000). --define(ACTIVATE_DELAY_IN_MSEC, 30000). +-ifdef(TEST). +-define(RE_ENQUEUE_INTERVAL, 50). 
+-define(STATUS_UPDATE_INTERVAL_MSEC, 490). -else. --define(START_DELAY_IN_MSEC, 0). --define(ACTIVATE_DELAY_IN_MSEC, 0). --export([persist/1]). +-define(RE_ENQUEUE_INTERVAL, 5000). +-define(STATUS_UPDATE_INTERVAL_MSEC, 4900). -endif. -% records. - -% When the state is set to activated = true, the channel has completed the state -% recovery process that occurs on (re)start and is accepting new compaction jobs. -% Note: if activated = false and a request for a new compaction job is received, -% smoosh will enqueue this new job after the state recovery process has finished. -% When the state is set to paused = false, the channel is actively compacting any -% compaction jobs that are scheduled. -% See operator_guide.md --> State diagram. +% If persistence is configured, on startup the channel will try to load its +% waiting queue from a persisted queue data file. Then, during every +% CHECKPOINT_INTERVAL_MSEC, it will spawn an checkpointer process to write the +% persisted queue state to disk. -record(state, { - active = [], + % Channel name name, + % smoosh_persisted_queue:new() object waiting, + % Paused flag. The channel starts in a the paused state. paused = true, - starting = [], - activated = false, - requests = [] + % #{Key => Pid} + active = #{}, + % #{Ref => Key} + starting = #{}, + % Monitor reference of the checkpointer process + cref, + % ETS status table handle. Used to publish channel status. + stab }). % public functions. @@ -69,19 +69,19 @@ suspend(ServerRef) -> gen_server:call(ServerRef, suspend). resume(ServerRef) -> - gen_server:call(ServerRef, resume_and_activate). - -activate(ServerRef) -> - gen_server:call(ServerRef, activate). + gen_server:call(ServerRef, resume). enqueue(ServerRef, Object, Priority) -> gen_server:cast(ServerRef, {enqueue, Object, Priority}). -last_updated(ServerRef, Object) -> - gen_server:call(ServerRef, {last_updated, Object}). - -get_status(ServerRef) -> - gen_server:call(ServerRef, status). 
+get_status(StatusTab) when is_reference(StatusTab) -> + try ets:lookup(StatusTab, status) of + [{status, Status}] -> Status; + [] -> [] + catch + error:badarg -> + [] + end. close(ServerRef) -> gen_server:call(ServerRef, close). @@ -89,137 +89,92 @@ close(ServerRef) -> flush(ServerRef) -> gen_server:call(ServerRef, flush). -is_key(ServerRef, Key) -> - gen_server:call(ServerRef, {is_key, Key}). - -is_activated(ServerRef) -> - gen_server:call(ServerRef, is_activated). - -% Only exported to force persistence in tests -persist(ServerRef) -> - gen_server:call(ServerRef, persist). +get_status_table(ServerRef) -> + gen_server:call(ServerRef, get_status_table). % gen_server functions. init(Name) -> - erlang:send_after(60 * 1000, self(), check_window), process_flag(trap_exit, true), - Waiting = smoosh_priority_queue:new(Name), - Persist = config:get_boolean("smoosh", "persist", false), - State = - case smoosh_utils:is_view_channel(Name) orelse Persist =:= false of - true -> - schedule_unpause(), - #state{name = Name, waiting = Waiting, paused = true, activated = true}; - false -> - erlang:send_after(?START_DELAY_IN_MSEC, self(), start_recovery), - #state{name = Name, waiting = Waiting, paused = true, activated = false} - end, - {ok, State}. 
- -handle_call({last_updated, Object}, _From, State) -> - LastUpdated = smoosh_priority_queue:last_updated(Object, State#state.waiting), - {reply, LastUpdated, State}; -handle_call(suspend, _From, State) -> - #state{active = Active} = State, - [ - catch erlang:suspend_process(Pid, [unless_suspending]) - || {_, Pid} <- Active - ], - {reply, ok, State#state{paused = true}}; -handle_call(resume_and_activate, _From, State) -> - #state{active = Active} = State, - [catch erlang:resume_process(Pid) || {_, Pid} <- Active], - {reply, ok, State#state{paused = false, activated = true}}; -handle_call(activate, _From, State) -> - {reply, ok, State#state{activated = true}}; -handle_call(status, _From, State) -> - {reply, - {ok, [ - {active, length(State#state.active)}, - {starting, length(State#state.starting)}, - {waiting, smoosh_priority_queue:info(State#state.waiting)} - ]}, - State}; + process_flag(message_queue_data, off_heap), + schedule_check_window(), + schedule_update_status(), + schedule_checkpoint(), + schedule_unpause(), + STab = ets:new(smoosh_stats, [{read_concurrency, true}]), + Waiting = unpersist(Name), + State = #state{name = Name, waiting = Waiting, stab = STab}, + {ok, set_status(State)}. 
+ +handle_call(get_status_table, _From, #state{} = State) -> + State1 = set_status(State), + {reply, {ok, State#state.stab}, State1}; +handle_call(suspend, _From, #state{} = State) -> + {reply, ok, do_suspend(State)}; +handle_call(resume, _From, #state{} = State) -> + {reply, ok, do_resume(State)}; handle_call(close, _From, State) -> {stop, normal, ok, State}; handle_call(flush, _From, #state{waiting = Q} = State) -> - {reply, ok, State#state{waiting = smoosh_priority_queue:flush(Q)}}; -handle_call({is_key, Key}, _From, #state{waiting = Waiting} = State) -> - {reply, smoosh_priority_queue:is_key(Key, Waiting), State}; -handle_call(is_activated, _From, #state{activated = Activated} = State0) -> - {reply, Activated, State0}; -handle_call(persist, _From, State) -> - persist_queue(State), - {reply, ok, State}. - -handle_cast({enqueue, _Object, 0}, #state{} = State) -> - {noreply, State}; -handle_cast({enqueue, Object, Priority}, #state{activated = true} = State) -> - {noreply, maybe_start_compaction(add_to_queue(Object, Priority, State))}; -handle_cast({enqueue, Object, Priority}, #state{activated = false, requests = Requests} = State0) -> - Level = smoosh_utils:log_level("compaction_log_level", "debug"), - couch_log:Level( - "~p Channel is not activated yet. Adding ~p to requests with priority ~p.", [ - ?MODULE, - Object, - Priority - ] - ), - {noreply, State0#state{requests = [{Object, Priority} | Requests]}}. + State1 = State#state{waiting = smoosh_priority_queue:flush(Q)}, + smoosh_persist:persist(State1#state.waiting, #{}, #{}), + State2 = set_status(State1), + {reply, ok, State2}. +handle_cast({enqueue, Object, Priority}, #state{} = State) -> + State1 = add_to_queue(Object, Priority, State), + {noreply, maybe_start_compaction(State1)}. + +handle_info({'DOWN', Ref, _, _, _}, #state{cref = Ref} = State) -> + {noreply, State#state{cref = undefined}}; % We accept noproc here due to possibly having monitored a restarted compaction % pid after it finished. 
-handle_info({'DOWN', Ref, _, Job, Reason}, State) when +handle_info({'DOWN', Ref, _, Job, Reason}, #state{} = State) when Reason == normal; Reason == noproc -> #state{active = Active, starting = Starting} = State, - {noreply, - maybe_start_compaction( - State#state{ - active = lists:keydelete(Job, 2, Active), - starting = lists:keydelete(Ref, 1, Starting) - } - )}; -handle_info({'DOWN', Ref, _, Job, Reason}, State) -> - #state{active = Active0, starting = Starting0} = State, - case lists:keytake(Job, 2, Active0) of - {value, {Key, _Pid}, Active1} -> - State1 = maybe_remonitor_cpid( - State#state{active = Active1}, - Key, - Reason - ), - {noreply, maybe_start_compaction(State1)}; - false -> - case lists:keytake(Ref, 1, Starting0) of - {value, {_, Key}, Starting1} -> - couch_log:warning("failed to start compaction of ~p: ~p", [ - smoosh_utils:stringify(Key), - Reason - ]), - {ok, _} = timer:apply_after(5000, smoosh_server, enqueue, [Key]), - {noreply, maybe_start_compaction(State#state{starting = Starting1})}; - false -> + Active1 = maps:filter(fun(_, Pid) -> Pid =/= Job end, Active), + Starting1 = maps:remove(Ref, Starting), + State1 = State#state{active = Active1, starting = Starting1}, + {noreply, maybe_start_compaction(State1)}; +handle_info({'DOWN', Ref, _, Job, Reason}, #state{} = State) -> + #state{name = Name, active = Active, starting = Starting} = State, + FoundActive = maps:filter(fun(_, Pid) -> Pid =:= Job end, Active), + case maps:to_list(FoundActive) of + [{Key, _Pid}] -> + Active1 = maps:without([Key], Active), + State1 = State#state{active = maps:without([Key], Active1)}, + State2 = maybe_remonitor_cpid(State1, Key, Reason), + {noreply, maybe_start_compaction(State2)}; + [] -> + case maps:take(Ref, Starting) of + {Key, Starting1} -> + LogMsg = "~s : failed to start compaction of ~p: ~p", + LogArgs = [Name, smoosh_utils:stringify(Key), Reason], + couch_log:warning(LogMsg, LogArgs), + re_enqueue(Key), + State1 = State#state{starting = Starting1}, + 
{noreply, maybe_start_compaction(State1)}; + error -> {noreply, State} end end; -handle_info({Ref, {ok, Pid}}, State) when is_reference(Ref) -> - case lists:keytake(Ref, 1, State#state.starting) of - {value, {_, Key}, Starting1} -> +% This is the '$gen_call' response handling +handle_info({Ref, {ok, Pid}}, #state{} = State) when is_reference(Ref) -> + #state{name = Name, active = Active, starting = Starting} = State, + case maps:take(Ref, Starting) of + {Key, Starting1} -> Level = smoosh_utils:log_level("compaction_log_level", "notice"), - couch_log:Level( - "~s: Started compaction for ~s", - [State#state.name, smoosh_utils:stringify(Key)] - ), + LogMsg = "~s: Started compaction for ~s", + LogArgs = [Name, smoosh_utils:stringify(Key)], + couch_log:Level(LogMsg, LogArgs), erlang:monitor(process, Pid), erlang:demonitor(Ref, [flush]), - {noreply, State#state{ - active = [{Key, Pid} | State#state.active], - starting = Starting1 - }}; - false -> + Active1 = Active#{Key => Pid}, + State1 = State#state{active = Active1, starting = Starting1}, + {noreply, set_status(State1)}; + error -> {noreply, State} end; handle_info(check_window, State) -> @@ -235,245 +190,146 @@ handle_info(check_window, State) -> State; {false, true} -> % resume is always safe even if we did not previously suspend - {reply, ok, NewState} = handle_call(resume_and_activate, nil, State), - NewState; + do_resume(State); + {true, false} when StrictWindow =:= "true" -> + % suspend immediately + do_suspend(State); {true, false} -> - if - StrictWindow =:= "true" -> - {reply, ok, NewState} = handle_call(suspend, nil, State), - NewState; - true -> - State#state{paused = true} - end + % prevent new jobs from starting, active ones keep running + State#state{paused = true} end, - erlang:send_after(60 * 1000, self(), check_window), + schedule_check_window(), {noreply, FinalState}; -handle_info(start_recovery, #state{name = Name, waiting = Waiting0} = State0) -> - RecActive = recover(active_file_name(Name)), - 
Waiting1 = lists:foldl( - fun(DbName, Acc) -> - case couch_db:exists(DbName) andalso couch_db:is_compacting(DbName) of - true -> - Priority = smoosh_server:get_priority(Name, DbName), - smoosh_priority_queue:in(DbName, Priority, Priority, Acc); - false -> - Acc - end - end, - Waiting0, - RecActive - ), - State1 = maybe_start_compaction(State0#state{paused = false, waiting = Waiting1}), - Level = smoosh_utils:log_level("compaction_log_level", "debug"), - couch_log:Level( - "~p Previously active compaction jobs (if any) have been successfully recovered and restarted.", - [?MODULE] - ), - erlang:send_after(?ACTIVATE_DELAY_IN_MSEC, self(), activate), - {noreply, State1#state{paused = true}}; -handle_info(activate, State) -> - {noreply, activate_channel(State)}; -handle_info(persist, State) -> - ok = persist_and_reschedule(State), +handle_info(update_status, #state{} = State) -> + schedule_update_status(), + {noreply, set_status(State)}; +handle_info(checkpoint, #state{cref = Ref} = State) when is_reference(Ref) -> + % If a checkpointer process is still running, don't start another one. + schedule_checkpoint(), {noreply, State}; -handle_info(pause, State) -> - {noreply, State#state{paused = true}}; +handle_info(checkpoint, #state{cref = undefined} = State) -> + % Start an asyncronous checkpoint process so we don't block the channel + #state{waiting = Waiting, active = Active, starting = Starting} = State, + Args = [Waiting, Active, Starting], + {_, Ref} = spawn_monitor(smoosh_persist, persist, Args), + schedule_checkpoint(), + {noreply, State#state{cref = Ref}}; handle_info(unpause, State) -> {noreply, maybe_start_compaction(State#state{paused = false})}. -terminate(_Reason, _State) -> - ok. - % private functions. -recover(FilePath) -> - case do_recover(FilePath) of - {ok, List} -> - List; - error -> - [] - end. 
- -do_recover(FilePath) -> - case file:read_file(FilePath) of - {ok, Content} -> - <<Vsn, Binary/binary>> = Content, - try parse_state(Vsn, ?VSN, Binary) of - Term -> - Level = smoosh_utils:log_level("compaction_log_level", "debug"), - couch_log:Level( - "~p Successfully restored state file ~s", [?MODULE, FilePath] - ), - {ok, Term} - catch - error:Reason -> - couch_log:error( - "~p Invalid state file (~p). Deleting ~s", [?MODULE, Reason, FilePath] - ), - file:delete(FilePath), - error - end; - {error, enoent} -> - Level = smoosh_utils:log_level("compaction_log_level", "debug"), - couch_log:Level( - "~p (~p) State file ~s does not exist. Not restoring.", [?MODULE, enoent, FilePath] - ), - error; - {error, Reason} -> - couch_log:error( - "~p Cannot read the state file (~p). Deleting ~s", [?MODULE, Reason, FilePath] - ), - file:delete(FilePath), - error - end. - -parse_state(1, ?VSN, Binary) -> - erlang:binary_to_term(Binary, [safe]); -parse_state(Vsn, ?VSN, _) -> - error({unsupported_version, Vsn}). - -persist_and_reschedule(State) -> - persist_queue(State), - erlang:send_after(?CHECKPOINT_INTERVAL_IN_MSEC, self(), persist), - ok. - -persist_queue(#state{name = Name, active = Active, starting = Starting, waiting = Waiting}) -> - Active1 = lists:foldl( - fun({DbName, _}, Acc) -> - [DbName | Acc] - end, - [], - Active - ), - Starting1 = lists:foldl( - fun({_, DbName}, Acc) -> - [DbName | Acc] - end, - [], - Starting - ), - smoosh_utils:write_to_file(Active1, active_file_name(Name), ?VSN), - smoosh_utils:write_to_file(Starting1, starting_file_name(Name), ?VSN), - smoosh_priority_queue:write_to_file(Waiting). - -active_file_name(Name) -> - filename:join(config:get("smoosh", "state_dir", "."), Name ++ ".active"). - -starting_file_name(Name) -> - filename:join(config:get("smoosh", "state_dir", "."), Name ++ ".starting"). 
+unpersist(Name) -> + % Insert into the access table with a current + % timestamp to prevent the same dbs from being re-enqueued + % again after startup + Waiting = smoosh_persist:unpersist(Name), + MapFun = fun(Object, _Priority) -> + smoosh_server:update_access(Object) + end, + maps:map(MapFun, smoosh_priority_queue:to_map(Waiting)), + Waiting. + +% Periodically cache in status ets table to avoid having to block on gen_server +% get_status() calls. +% +set_status(#state{} = State) -> + #state{active = Active, starting = Starting, waiting = Waiting} = State, + Status = [ + {active, map_size(Active)}, + {starting, map_size(Starting)}, + {waiting, smoosh_priority_queue:info(Waiting)} + ], + true = ets:insert(State#state.stab, {status, Status}), + State. add_to_queue(Key, Priority, State) -> - #state{active = Active, waiting = Q} = State, - case lists:keymember(Key, 1, Active) of + #state{name = Name, active = Active, waiting = Q} = State, + case is_map_key(Key, Active) of true -> State; false -> - Capacity = list_to_integer(smoosh_utils:get(State#state.name, "capacity", "9999")), + Capacity = smoosh_utils:capacity(State#state.name), Level = smoosh_utils:log_level("compaction_log_level", "notice"), - couch_log:Level( - "~s: adding ~p to internal compactor queue with priority ~p", - [State#state.name, Key, Priority] - ), - State#state{ - waiting = smoosh_priority_queue:in(Key, Priority, Priority, Capacity, Q) - } + LogMsg = "~s: enqueueing ~p to compact with priority ~p", + LogArgs = [Name, Key, Priority], + couch_log:Level(LogMsg, LogArgs), + Q1 = smoosh_priority_queue:in(Key, Priority, Capacity, Q), + State#state{waiting = Q1} end. -maybe_activate(#state{activated = true} = State) -> +maybe_start_compaction(#state{paused = true} = State) -> State; -maybe_activate(State) -> - activate_channel(State). 
- -activate_channel(#state{name = Name, waiting = Waiting0, requests = Requests0} = State0) -> - RecStarting = recover(starting_file_name(Name)), - Starting = lists:foldl( - fun(DbName, Acc) -> - case couch_db:exists(DbName) of - true -> - Priority = smoosh_server:get_priority(Name, DbName), - smoosh_priority_queue:in(DbName, Priority, Priority, Acc); - false -> - Acc - end - end, - Waiting0, - RecStarting - ), - Waiting1 = smoosh_priority_queue:recover(Starting), - Requests1 = lists:reverse(Requests0), - Waiting2 = lists:foldl( - fun({DbName, Priority}, Acc) -> - case couch_db:exists(DbName) of - true -> - smoosh_priority_queue:in(DbName, Priority, Priority, Acc); - false -> - Acc - end - end, - Waiting1, - Requests1 - ), - State1 = maybe_start_compaction(State0#state{ - waiting = Waiting2, paused = false, activated = true, requests = [] - }), - ok = persist_and_reschedule(State1), - schedule_unpause(), - State1#state{paused = true}. +maybe_start_compaction(#state{paused = false, name = Name} = State) -> + Concurrency = smoosh_utils:concurrency(Name), + maybe_start_compaction(Concurrency, State). 
-maybe_start_compaction(#state{paused = true} = State) -> +maybe_start_compaction(Concurrency, #state{active = A, starting = S} = State) when + map_size(A) + map_size(S) >= Concurrency +-> State; -maybe_start_compaction(State) -> - Concurrency = list_to_integer( - smoosh_utils:get( - State#state.name, - "concurrency", - "1" - ) - ), - if - length(State#state.active) + length(State#state.starting) < Concurrency -> - case smoosh_priority_queue:out(State#state.waiting) of - false -> - maybe_activate(State); - {Key, Priority, Q} -> - try - State2 = - case start_compact(State, Key) of - false -> - State; - State1 -> - Level = smoosh_utils:log_level( - "compaction_log_level", - "notice" - ), - couch_log:Level( - "~s: Starting compaction for ~s (priority ~p)", - [State#state.name, smoosh_utils:stringify(Key), Priority] - ), - State1 - end, - maybe_start_compaction(State2#state{waiting = Q}) - catch - Class:Exception -> - couch_log:warning( - "~s: ~p ~p for ~s", - [ - State#state.name, - Class, - Exception, - smoosh_utils:stringify(Key) - ] - ), - maybe_start_compaction(State#state{waiting = Q}) - end - end; - true -> +maybe_start_compaction(Concurrency, #state{} = State) -> + case smoosh_priority_queue:out(State#state.waiting) of + false -> + State; + {Key, Q} -> + State1 = State#state{waiting = Q}, + % Re-check priority since by the time the object was in queue, or + % was un-persisted after a node was down, the db or ddoc might be + % long gone and we don't want to crash the channel attemping to + % compact it + State2 = + case priority(State1, Key) of + 0 -> State1; + _ -> try_compact(State1, Key) + end, + maybe_start_compaction(Concurrency, State2) + end. 
+ +priority(#state{name = Name}, Key) -> + try + smoosh_server:get_priority(Name, Key) + catch + Tag:Error -> + % We are being defensive as we don't want to crash the channel + Level = smoosh_utils:log_level("compaction_log_level", "notice"), + LogMsg = "~s: Failed to get priority for ~s in queue ~p:~p", + LogArgs = [Name, smoosh_utils:stringify(Key), Tag, Error], + couch_log:Level(LogMsg, LogArgs), + 0 + end. + +try_compact(#state{name = Name} = State, Key) -> + try start_compact(State, Key) of + false -> + State; + #state{} = State1 -> + Level = smoosh_utils:log_level("compaction_log_level", "notice"), + LogMsg = "~s: Starting compaction for ~s", + LogArgs = [Name, smoosh_utils:stringify(Key)], + couch_log:Level(LogMsg, LogArgs), + State1 + catch + Class:Exception -> + LogArgs = [Name, Class, Exception, smoosh_utils:stringify(Key)], + couch_log:warning("~s: compaction error ~p:~p for ~s", LogArgs), State end. -start_compact(State, DbName) when is_list(DbName) -> - start_compact(State, ?l2b(DbName)); -start_compact(State, DbName) when is_binary(DbName) -> +start_compact(#state{} = State, {?INDEX_CLEANUP, DbName} = Key) -> + #state{name = Name, active = Active} = State, + case smoosh_utils:ignore_db(DbName) of + false -> + {Pid, _Ref} = spawn_monitor(fun() -> cleanup_index_files(DbName) end), + Level = smoosh_utils:log_level("compaction_log_level", "notice"), + LogMsg = "~s: Starting index cleanup for ~s", + LogArgs = [Name, smoosh_utils:stringify(Key)], + couch_log:Level(LogMsg, LogArgs), + State#state{active = Active#{Key => Pid}}; + _ -> + false + end; +start_compact(#state{name = Name} = State, DbName) when is_binary(DbName) -> case couch_db:open_int(DbName, []) of {ok, Db} -> try @@ -482,94 +338,221 @@ start_compact(State, DbName) when is_binary(DbName) -> couch_db:close(Db) end; Error = {not_found, no_db_file} -> - couch_log:warning( - "Error starting compaction for ~p: ~p", - [smoosh_utils:stringify(DbName), Error] - ), + LogMsg = "~s : Error starting 
compaction for ~p: ~p", + LogArgs = [Name, smoosh_utils:stringify(DbName), Error], + couch_log:warning(LogMsg, LogArgs), false end; -start_compact(State, {Shard, GroupId}) -> +start_compact(#state{} = State, {Shard, GroupId} = Key) -> + #state{name = Name, starting = Starting} = State, case smoosh_utils:ignore_db({Shard, GroupId}) of false -> - DbName = mem3:dbname(Shard), - {ok, Pid} = couch_index_server:get_index( - couch_mrview_index, Shard, GroupId - ), - spawn(fun() -> cleanup_index_files(DbName, Shard) end), - Ref = erlang:monitor(process, Pid), - Pid ! {'$gen_call', {self(), Ref}, compact}, - State#state{starting = [{Ref, {Shard, GroupId}} | State#state.starting]}; + case couch_index_server:get_index(couch_mrview_index, Shard, GroupId) of + {ok, Pid} -> + schedule_cleanup_index_files(Shard), + Ref = erlang:monitor(process, Pid), + Pid ! {'$gen_call', {self(), Ref}, compact}, + State#state{starting = Starting#{Ref => Key}}; + Error -> + LogMsg = "~s : Error starting view compaction for ~p: ~p", + LogArgs = [Name, smoosh_utils:stringify(Key), Error], + couch_log:warning(LogMsg, LogArgs), + false + end; _ -> false end; -start_compact(State, Db) -> - case smoosh_utils:ignore_db(Db) of +start_compact(#state{} = State, Db) -> + #state{name = Name, starting = Starting, active = Active} = State, + Key = couch_db:name(Db), + case smoosh_utils:ignore_db(Key) of false -> - DbPid = couch_db:get_pid(Db), - Key = couch_db:name(Db), case couch_db:get_compactor_pid(Db) of nil -> + DbPid = couch_db:get_pid(Db), Ref = erlang:monitor(process, DbPid), DbPid ! {'$gen_call', {self(), Ref}, start_compact}, - State#state{starting = [{Ref, Key} | State#state.starting]}; + State#state{starting = Starting#{Ref => Key}}; % Compaction is already running, so monitor existing compaction pid. 
- CPid -> - Level = smoosh_utils:log_level("compaction_log_level", "notice"), - couch_log:Level( - "Db ~s continuing compaction", - [smoosh_utils:stringify(Key)] - ), + CPid when is_pid(CPid) -> erlang:monitor(process, CPid), - State#state{active = [{Key, CPid} | State#state.active]} + Level = smoosh_utils:log_level("compaction_log_level", "notice"), + LogMsg = "~s : db ~s continuing compaction", + LogArgs = [Name, smoosh_utils:stringify(Key)], + couch_log:Level(LogMsg, LogArgs), + State#state{active = Active#{Key => CPid}} end; _ -> false end. -maybe_remonitor_cpid(State, DbName, Reason) when is_binary(DbName) -> +maybe_remonitor_cpid(#state{} = State, DbName, Reason) when is_binary(DbName) -> + #state{name = Name, active = Active} = State, case couch_db:open_int(DbName, []) of {ok, Db} -> - case couch_db:get_compactor_pid_sync(Db) of + try couch_db:get_compactor_pid_sync(Db) of nil -> - couch_log:warning( - "exit for compaction of ~p: ~p", - [smoosh_utils:stringify(DbName), Reason] - ), - {ok, _} = timer:apply_after(5000, smoosh_server, enqueue, [DbName]), + LogMsg = "~s : exit for compaction of ~p: ~p", + LogArgs = [Name, smoosh_utils:stringify(DbName), Reason], + couch_log:warning(LogMsg, LogArgs), + re_enqueue(DbName), State; - CPid -> - Level = smoosh_utils:log_level("compaction_log_level", "notice"), - couch_log:Level( - "~s compaction already running. Re-monitor Pid ~p", - [smoosh_utils:stringify(DbName), CPid] - ), + CPid when is_pid(CPid) -> erlang:monitor(process, CPid), - State#state{active = [{DbName, CPid} | State#state.active]} + Level = smoosh_utils:log_level("compaction_log_level", "notice"), + LogMsg = "~s: ~s compaction already running. 
Re-monitor Pid ~p", + LogArgs = [Name, smoosh_utils:stringify(DbName), CPid], + couch_log:Level(LogMsg, LogArgs), + State#state{active = Active#{DbName => CPid}} + catch + _:Error -> + LogMsg = "~s: error remonitoring db compaction ~p error:~p", + LogArgs = [Name, smoosh_utils:stringify(DbName), Error], + couch_log:warning(LogMsg, LogArgs), + re_enqueue(DbName), + State end; Error = {not_found, no_db_file} -> - couch_log:warning( - "exit for compaction of ~p: ~p", - [smoosh_utils:stringify(DbName), Error] - ), + LogMsg = "~s : exit for compaction of ~p: ~p", + LogArgs = [Name, smoosh_utils:stringify(DbName), Error], + couch_log:warning(LogMsg, LogArgs), State end; % not a database compaction, so ignore the pid check -maybe_remonitor_cpid(State, Key, Reason) -> - couch_log:warning( - "exit for compaction of ~p: ~p", - [smoosh_utils:stringify(Key), Reason] - ), - {ok, _} = timer:apply_after(5000, smoosh_server, enqueue, [Key]), +maybe_remonitor_cpid(#state{name = Name} = State, Key, Reason) -> + LogMsg = "~s: exit for compaction of ~p: ~p", + LogArgs = [Name, smoosh_utils:stringify(Key), Reason], + couch_log:warning(LogMsg, LogArgs), + re_enqueue(Key), State. +schedule_check_window() -> + erlang:send_after(?TIME_WINDOW_MSEC, self(), check_window). + +schedule_update_status() -> + erlang:send_after(?STATUS_UPDATE_INTERVAL_MSEC, self(), update_status). + schedule_unpause() -> - WaitSecs = list_to_integer(config:get("smoosh", "wait_secs", "30")), + WaitSecs = config:get_integer("smoosh", "wait_secs", 30), erlang:send_after(WaitSecs * 1000, self(), unpause). -cleanup_index_files(DbName, _Shard) -> - case config:get("smoosh", "cleanup_index_files", "false") of - "true" -> - fabric:cleanup_index_files(DbName); +schedule_checkpoint() -> + erlang:send_after(?CHECKPOINT_INTERVAL_MSEC, self(), checkpoint). 
+ +re_enqueue(Obj) -> + case whereis(smoosh_server) of + Pid when is_pid(Pid) -> + Cast = {'$gen_cast', {enqueue, Obj}}, + erlang:send_after(?RE_ENQUEUE_INTERVAL, Pid, Cast), + ok; _ -> ok end. + +cleanup_index_files(DbName) -> + case should_clean_up_indices() of + true -> fabric:cleanup_index_files(DbName); + false -> ok + end. + +schedule_cleanup_index_files(Shard) -> + case should_clean_up_indices() of + true -> + % Since cleanup is at the cluster level, schedule it with a chance + % inversely proportional to the number of local shards + DbName = mem3:dbname(Shard), + try length(mem3:local_shards(DbName)) of + ShardCount when ShardCount >= 1 -> + case rand:uniform() < (1 / ShardCount) of + true -> + Arg = {?INDEX_CLEANUP, DbName}, + smoosh_server:enqueue(Arg); + false -> + ok + end; + _ -> + ok + catch + error:database_does_not_exist -> + ok + end; + false -> + ok + end. + +should_clean_up_indices() -> + config:get_boolean("smoosh", "cleanup_index_files", true). + +do_suspend(#state{active = Active} = State) -> + [suspend_pid(Pid) || Pid <- maps:values(Active)], + State#state{paused = true}. + +do_resume(#state{active = Active} = State) -> + [resume_pid(Pid) || Pid <- maps:values(Active)], + State#state{paused = false}. + +suspend_pid(Pid) when is_pid(Pid) -> + catch erlang:suspend_process(Pid, [unless_suspending]). + +resume_pid(Pid) when is_pid(Pid) -> + catch erlang:resume_process(Pid). + +-ifdef(TEST). + +-include_lib("couch/include/couch_eunit.hrl"). + +start_compact_errors_test_() -> + { + foreach, + fun setup_purge_seq/0, + fun teardown_purge_seq/1, + [ + ?TDEF_FE(t_start_db_with_missing_db), + ?TDEF_FE(t_start_view_with_missing_db), + ?TDEF_FE(t_start_view_with_missing_index), + ?TDEF_FE(t_start_compact_throws) + ] + }. 
+ +setup_purge_seq() -> + meck:new(couch_log, [passthrough]), + meck:new(couch_db, [passthrough]), + meck:new(smoosh_utils, [passthrough]), + Ctx = test_util:start_couch(), + DbName = ?tempdb(), + {ok, Db} = couch_server:create(DbName, []), + couch_db:close(Db), + {Ctx, DbName}. + +teardown_purge_seq({Ctx, DbName}) -> + couch_server:delete(DbName, []), + test_util:stop_couch(Ctx), + meck:unload(). + +t_start_db_with_missing_db({_, _}) -> + State = #state{name = "ratio_dbs"}, + meck:reset(couch_log), + try_compact(State, <<"missing_db">>), + ?assertEqual(1, meck:num_calls(couch_log, warning, 2)). + +t_start_view_with_missing_db({_, _}) -> + State = #state{name = "ratio_views"}, + meck:reset(couch_log), + try_compact(State, {<<"missing_db">>, <<"_design/nope">>}), + ?assertEqual(1, meck:num_calls(couch_log, warning, 2)). + +t_start_view_with_missing_index({_, DbName}) -> + State = #state{name = "ratio_views"}, + meck:reset(couch_log), + try_compact(State, {DbName, <<"_design/nope">>}), + ?assertEqual(1, meck:num_calls(couch_log, warning, 2)). + +t_start_compact_throws({_, _}) -> + State = #state{name = "ratio_dbs"}, + % Make something explode inside start_compact, so pick smoosh_util:ignore/1 + meck:expect(smoosh_utils, ignore_db, 1, meck:raise(error, foo)), + meck:reset(couch_log), + try_compact(State, {<<"some_db">>, <<"_design/some_view">>}), + ?assertEqual(1, meck:num_calls(couch_log, warning, 2)). + +-endif. diff --git a/src/smoosh/src/smoosh_server.erl b/src/smoosh/src/smoosh_server.erl index 27277be04..10368a549 100644 --- a/src/smoosh/src/smoosh_server.erl +++ b/src/smoosh/src/smoosh_server.erl @@ -12,27 +12,24 @@ -module(smoosh_server). -behaviour(gen_server). --vsn(4). -behaviour(config_listener). --include_lib("couch/include/couch_db.hrl"). % public api. -export([ start_link/0, suspend/0, resume/0, + flush/0, enqueue/1, sync_enqueue/1, - sync_enqueue/2, handle_db_event/3, status/0 ]). --define(SECONDS_PER_MINUTE, 60). - % gen_server api. 
-export([ init/1, + handle_continue/2, handle_call/3, handle_cast/2, handle_info/2, @@ -44,32 +41,44 @@ -export([handle_config_change/5, handle_config_terminate/3]). % exported but for internal use. --export([enqueue_request/2]). --export([get_priority/2]). - -% exported for testing and debugging --export([get_channel/1]). +-export([ + enqueue_request/2, + get_priority/2, + update_access/1, + access_cleaner/0 +]). -ifdef(TEST). +-define(STALENESS_MIN, 0). +-define(ACCEESS_CLEAN_INTERVAL_MSEC, 300). -define(RELISTEN_DELAY, 50). -else. +-define(STALENESS_MIN, 5). +-define(ACCEESS_CLEAN_INTERVAL_MSEC, 3000). -define(RELISTEN_DELAY, 5000). -endif. +-define(INDEX_CLEANUP, index_cleanup). +-define(ACCESS, smoosh_access). +-define(ACCESS_MAX_SIZE, 250000). +-define(ACCESS_NEVER, -1 bsl 58). + % private records. -record(state, { db_channels = [], view_channels = [], - tab, + cleanup_channels = [], event_listener, waiting = #{}, - waiting_by_ref = #{} + waiting_by_ref = #{}, + access_cleaner }). -record(channel, { name, - pid + pid, + stab }). % public functions. @@ -78,22 +87,35 @@ start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). suspend() -> - gen_server:call(?MODULE, suspend). + gen_server:call(?MODULE, suspend, infinity). resume() -> - gen_server:call(?MODULE, resume). + gen_server:call(?MODULE, resume, infinity). -status() -> - gen_server:call(?MODULE, status). +flush() -> + gen_server:call(?MODULE, flush, infinity). -enqueue(Object) -> - gen_server:cast(?MODULE, {enqueue, Object}). +status() -> + try ets:foldl(fun get_channel_status/2, [], ?MODULE) of + Res -> {ok, Res} + catch + error:badarg -> + {ok, []} + end. -sync_enqueue(Object) -> - gen_server:call(?MODULE, {enqueue, Object}). +enqueue(Object0) -> + Object = smoosh_utils:validate_arg(Object0), + case stale_enough(Object) of + true -> gen_server:cast(?MODULE, {enqueue, Object}); + false -> ok + end. -sync_enqueue(Object, Timeout) -> - gen_server:call(?MODULE, {enqueue, Object}, Timeout). 
+sync_enqueue(Object0) -> + Object = smoosh_utils:validate_arg(Object0), + case stale_enough(Object) of + true -> gen_server:call(?MODULE, {enqueue, Object}, infinity); + false -> ok + end. handle_db_event(DbName, local_updated, St) -> enqueue(DbName), @@ -110,35 +132,29 @@ handle_db_event(DbName, {index_collator_upgrade, IdxName}, St) -> handle_db_event(_DbName, _Event, St) -> {ok, St}. -% for testing and debugging only -get_channel(ChannelName) -> - gen_server:call(?MODULE, {get_channel, ChannelName}). - % gen_server functions. init([]) -> process_flag(trap_exit, true), + process_flag(message_queue_data, off_heap), ok = config:listen_for_changes(?MODULE, nil), - {ok, Pid} = start_event_listener(), - DbChannels = smoosh_utils:split( - config:get("smoosh", "db_channels", "upgrade_dbs,ratio_dbs,slack_dbs") - ), - ViewChannels = smoosh_utils:split( - config:get("smoosh", "view_channels", "upgrade_views,ratio_views,slack_views") - ), - Tab = ets:new(channels, [{keypos, #channel.name}]), - {ok, - create_missing_channels(#state{ - db_channels = DbChannels, - view_channels = ViewChannels, - event_listener = Pid, - tab = Tab - })}. - -handle_config_change("smoosh", "db_channels", L, _, _) -> - {ok, gen_server:cast(?MODULE, {new_db_channels, smoosh_utils:split(L)})}; -handle_config_change("smoosh", "view_channels", L, _, _) -> - {ok, gen_server:cast(?MODULE, {new_view_channels, smoosh_utils:split(L)})}; + Opts = [named_table, {read_concurrency, true}], + ets:new(?MODULE, Opts ++ [{keypos, #channel.name}]), + ets:new(?ACCESS, Opts ++ [{write_concurrency, true}, public]), + State = #state{ + access_cleaner = spawn_link(?MODULE, access_cleaner, []), + db_channels = smoosh_utils:db_channels(), + view_channels = smoosh_utils:view_channels(), + cleanup_channels = smoosh_utils:cleanup_channels() + }, + {ok, State, {continue, create_channels}}. 
+ +handle_config_change("smoosh", "db_channels", _, _, _) -> + {ok, gen_server:cast(?MODULE, new_db_channels)}; +handle_config_change("smoosh", "view_channels", _, _, _) -> + {ok, gen_server:cast(?MODULE, new_view_channels)}; +handle_config_change("smoosh", "cleanup_channels", _, _, _) -> + {ok, gen_server:cast(?MODULE, new_cleanup_channels)}; handle_config_change(_, _, _, _, _) -> {ok, nil}. @@ -151,51 +167,58 @@ handle_config_terminate(_Server, _Reason, _State) -> restart_config_listener ). -handle_call(status, _From, State) -> - Acc = ets:foldl(fun get_channel_status/2, [], State#state.tab), - {reply, {ok, Acc}, State}; +handle_continue(create_channels, #state{} = State) -> + % Warn users about smoosh persistence misconfiguration issues. Do it once + % on startup to avoid continuously spamming logs with errors. + smoosh_persist:check_setup(), + State1 = create_missing_channels(State), + {ok, Pid} = start_event_listener(), + {noreply, State1#state{event_listener = Pid}}. + handle_call({enqueue, Object}, _From, State) -> {noreply, NewState} = handle_cast({enqueue, Object}, State), {reply, ok, NewState}; handle_call(suspend, _From, State) -> - ets:foldl( - fun(#channel{name = Name, pid = P}, _) -> - Level = smoosh_utils:log_level("compaction_log_level", "debug"), - couch_log:Level("Suspending ~p", [Name]), - smoosh_channel:suspend(P) - end, - 0, - State#state.tab - ), + Fun = fun(#channel{name = Name, pid = P}, _) -> + Level = smoosh_utils:log_level("compaction_log_level", "debug"), + couch_log:Level("Suspending ~p", [Name]), + smoosh_channel:suspend(P) + end, + ets:foldl(Fun, ok, ?MODULE), {reply, ok, State}; handle_call(resume, _From, State) -> - ets:foldl( - fun(#channel{name = Name, pid = P}, _) -> - Level = smoosh_utils:log_level("compaction_log_level", "debug"), - couch_log:Level("Resuming ~p", [Name]), - smoosh_channel:resume(P) - end, - 0, - State#state.tab - ), + Fun = fun(#channel{name = Name, pid = P}, _) -> + Level = 
smoosh_utils:log_level("compaction_log_level", "debug"), + couch_log:Level("Resuming ~p", [Name]), + smoosh_channel:resume(P) + end, + ets:foldl(Fun, ok, ?MODULE), {reply, ok, State}; -handle_call({get_channel, ChannelName}, _From, #state{tab = Tab} = State) -> - {reply, {ok, channel_pid(Tab, ChannelName)}, State}. - -handle_cast({new_db_channels, Channels}, State) -> - [ - smoosh_channel:close(channel_pid(State#state.tab, C)) - || C <- State#state.db_channels -- Channels - ], - {noreply, create_missing_channels(State#state{db_channels = Channels})}; -handle_cast({new_view_channels, Channels}, State) -> - [ - smoosh_channel:close(channel_pid(State#state.tab, C)) - || C <- State#state.view_channels -- Channels - ], - {noreply, create_missing_channels(State#state{view_channels = Channels})}; +handle_call(flush, _From, State) -> + Fun = fun(#channel{pid = P}, _) -> smoosh_channel:flush(P) end, + ets:foldl(Fun, ok, ?MODULE), + {reply, ok, State}. + +handle_cast(new_db_channels, #state{} = State) -> + Channels = smoosh_utils:db_channels(), + Closed = State#state.db_channels -- Channels, + [smoosh_channel:close(channel_pid(C)) || C <- Closed], + State1 = State#state{db_channels = Channels}, + {noreply, create_missing_channels(State1)}; +handle_cast(new_view_channels, #state{} = State) -> + Channels = smoosh_utils:view_channels(), + Closed = State#state.view_channels -- Channels, + [smoosh_channel:close(channel_pid(C)) || C <- Closed], + State1 = State#state{view_channels = Channels}, + {noreply, create_missing_channels(State1)}; +handle_cast(new_cleanup_channels, #state{} = State) -> + Channels = smoosh_utils:cleanup_channels(), + Closed = State#state.cleanup_channels -- Channels, + [smoosh_channel:close(channel_pid(C)) || C <- Closed], + State1 = State#state{cleanup_channels = Channels}, + {noreply, create_missing_channels(State1)}; handle_cast({enqueue, Object}, #state{waiting = Waiting} = State) -> - case maps:is_key(Object, Waiting) of + case is_map_key(Object, 
Waiting) of true -> {noreply, State}; false -> @@ -208,12 +231,17 @@ handle_info({'EXIT', Pid, Reason}, #state{event_listener = Pid} = State) -> couch_log:Level("update notifier died ~p", [Reason]), {ok, Pid1} = start_event_listener(), {noreply, State#state{event_listener = Pid1}}; +handle_info({'EXIT', Pid, Reason}, #state{access_cleaner = Pid} = State) -> + Level = smoosh_utils:log_level("compaction_log_level", "notice"), + couch_log:Level("access cleaner died ~p", [Reason]), + Pid1 = spawn_link(?MODULE, access_cleaner, []), + {noreply, State#state{access_cleaner = Pid1}}; handle_info({'EXIT', Pid, Reason}, State) -> Level = smoosh_utils:log_level("compaction_log_level", "notice"), couch_log:Level("~p ~p died ~p", [?MODULE, Pid, Reason]), - case ets:match_object(State#state.tab, #channel{pid = Pid, _ = '_'}) of + case ets:match_object(?MODULE, #channel{pid = Pid, _ = '_'}) of [#channel{name = Name}] -> - ets:delete(State#state.tab, Name); + ets:delete(?MODULE, Name); _ -> ok end, @@ -226,17 +254,22 @@ handle_info(restart_config_listener, State) -> handle_info(_Msg, State) -> {noreply, State}. -terminate(_Reason, State) -> - ets:foldl( - fun(#channel{pid = P}, _) -> smoosh_channel:close(P) end, - 0, - State#state.tab - ), - ok. +terminate(_Reason, #state{access_cleaner = CPid}) -> + catch unlink(CPid), + exit(CPid, kill), + Fun = fun(#channel{pid = P}, _) -> + smoosh_channel:close(P) + end, + ets:foldl(Fun, ok, ?MODULE). code_change(_OldVsn, State, _Extra) -> {ok, State}. +update_access(Object) -> + Now = erlang:monotonic_time(second), + true = ets:insert(?ACCESS, {Object, Now}), + ok. + % private functions. add_enqueue_ref(Object, Ref, #state{} = State) when is_reference(Ref) -> @@ -251,18 +284,11 @@ remove_enqueue_ref(Ref, #state{} = State) when is_reference(Ref) -> {Ref, Waiting1} = maps:take(Object, Waiting), State#state{waiting = Waiting1, waiting_by_ref = WaitingByRef1}. 
-get_channel_status(#channel{name = Name, pid = P}, Acc0) when is_pid(P) -> - try gen_server:call(P, status) of - {ok, Status} -> - [{Name, Status} | Acc0]; - _ -> - Acc0 - catch - _:_ -> - Acc0 - end; -get_channel_status(_, Acc0) -> - Acc0. +get_channel_status(#channel{name = Name, stab = Tab}, Acc) -> + Status = smoosh_channel:get_status(Tab), + [{Name, Status} | Acc]; +get_channel_status(_, Acc) -> + Acc. start_event_listener() -> couch_event:link_listener(?MODULE, handle_db_event, nil, [all_dbs]). @@ -273,79 +299,92 @@ enqueue_request(State, Object) -> false -> ok; {ok, Pid, Priority} -> - smoosh_channel:enqueue(Pid, Object, Priority) + case ets:info(?ACCESS, size) of + Size when Size =< ?ACCESS_MAX_SIZE -> + ok = update_access(Object); + _ -> + ok + end, + QuantizedPriority = quantize(Priority), + smoosh_channel:enqueue(Pid, Object, QuantizedPriority) end catch - Class:Exception:Stack -> - couch_log:warning( - "~s: ~p ~p for ~s : ~p", - [ - ?MODULE, - Class, - Exception, - smoosh_utils:stringify(Object), - Stack - ] - ) + Tag:Exception:Stack -> + Args = [?MODULE, Tag, Exception, smoosh_utils:stringify(Object), Stack], + couch_log:warning("~s: ~p ~p for ~s : ~p", Args), + ok end. -find_channel(#state{} = State, {Shard, GroupId}) -> - find_channel(State#state.tab, State#state.view_channels, {Shard, GroupId}); +find_channel(#state{} = State, {?INDEX_CLEANUP, DbName}) -> + find_channel(State, State#state.cleanup_channels, {?INDEX_CLEANUP, DbName}); +find_channel(#state{} = State, {Shard, GroupId}) when is_binary(Shard) -> + find_channel(State, State#state.view_channels, {Shard, GroupId}); find_channel(#state{} = State, DbName) -> - find_channel(State#state.tab, State#state.db_channels, DbName). + find_channel(State, State#state.db_channels, DbName). 
-find_channel(_Tab, [], _Object) -> +find_channel(#state{} = _State, [], _Object) -> false; -find_channel(Tab, [Channel | Rest], Object) -> - Pid = channel_pid(Tab, Channel), - LastUpdated = smoosh_channel:last_updated(Pid, Object), - StalenessInSec = - config:get_integer("smoosh", "staleness", 5) * - ?SECONDS_PER_MINUTE, - Staleness = erlang:convert_time_unit(StalenessInSec, seconds, native), - Now = erlang:monotonic_time(), - Activated = smoosh_channel:is_activated(Pid), - StaleEnough = LastUpdated =:= false orelse Now - LastUpdated > Staleness, - case Activated andalso StaleEnough of +find_channel(#state{} = State, [Channel | Rest], Object) -> + case stale_enough(Object) of true -> case smoosh_utils:ignore_db(Object) of true -> - find_channel(Tab, Rest, Object); + find_channel(State, Rest, Object); _ -> case get_priority(Channel, Object) of 0 -> - find_channel(Tab, Rest, Object); + find_channel(State, Rest, Object); Priority -> - {ok, Pid, Priority} + {ok, channel_pid(Channel), Priority} end end; false -> - find_channel(Tab, Rest, Object) + find_channel(State, Rest, Object) end. -channel_pid(Tab, Channel) -> - [#channel{pid = Pid}] = ets:lookup(Tab, Channel), +stale_enough(Object) -> + LastUpdatedSec = last_updated(Object), + Staleness = erlang:monotonic_time(second) - LastUpdatedSec, + Staleness >= min_staleness_sec(). + +channel_pid(Channel) -> + [#channel{pid = Pid}] = ets:lookup(?MODULE, Channel), Pid. -create_missing_channels(State) -> - create_missing_channels(State#state.tab, State#state.db_channels), - create_missing_channels(State#state.tab, State#state.view_channels), +create_missing_channels(#state{} = State) -> + create_missing_channels_type(State#state.db_channels), + create_missing_channels_type(State#state.view_channels), + create_missing_channels_type(State#state.cleanup_channels), State. 
-create_missing_channels(_Tab, []) -> +create_missing_channels_type([]) -> ok; -create_missing_channels(Tab, [Channel | Rest]) -> - case ets:lookup(Tab, Channel) of +create_missing_channels_type([Channel | Rest]) -> + case ets:lookup(?MODULE, Channel) of [] -> {ok, Pid} = smoosh_channel:start_link(Channel), - true = ets:insert(Tab, [#channel{name = Channel, pid = Pid}]); + {ok, STab} = smoosh_channel:get_status_table(Pid), + Chan = #channel{ + name = Channel, + pid = Pid, + stab = STab + }, + true = ets:insert(?MODULE, Chan); _ -> ok end, - create_missing_channels(Tab, Rest). + create_missing_channels_type(Rest). +get_priority(_Channel, {?INDEX_CLEANUP, DbName}) -> + try mem3:local_shards(mem3:dbname(DbName)) of + [_ | _] -> 1; + [] -> 0 + catch + error:database_does_not_exist -> + 0 + end; get_priority(Channel, {Shard, GroupId}) -> - case couch_index_server:get_index(couch_mrview_index, Shard, GroupId) of + try couch_index_server:get_index(couch_mrview_index, Shard, GroupId) of {ok, Pid} -> try {ok, ViewInfo} = couch_index:get_info(Pid), @@ -368,9 +407,10 @@ get_priority(Channel, {Shard, GroupId}) -> [Channel, Shard, GroupId, Reason] ), 0 + catch + throw:{not_found, _} -> + 0 end; -get_priority(Channel, DbName) when is_list(DbName) -> - get_priority(Channel, ?l2b(DbName)); get_priority(Channel, DbName) when is_binary(DbName) -> case couch_db:open_int(DbName, []) of {ok, Db} -> @@ -379,11 +419,8 @@ get_priority(Channel, DbName) when is_binary(DbName) -> after couch_db:close(Db) end; - Error = {not_found, no_db_file} -> - couch_log:warning( - "~p: Error getting priority for ~p: ~p", - [Channel, DbName, Error] - ), + {not_found, no_db_file} -> + % It's expected that a db might be deleted while waiting in queue 0 end; get_priority(Channel, Db) -> @@ -494,6 +531,38 @@ view_needs_upgrade(Props) -> Enabled andalso length(Versions) >= 2 end. 
+access_cleaner() -> + JitterMSec = rand:uniform(?ACCEESS_CLEAN_INTERVAL_MSEC), + timer:sleep(?ACCEESS_CLEAN_INTERVAL_MSEC + JitterMSec), + NowSec = erlang:monotonic_time(second), + Limit = NowSec - min_staleness_sec(), + Head = {'_', '$1'}, + Guard = {'<', '$1', Limit}, + ets:select_delete(?ACCESS, [{Head, [Guard], [true]}]), + access_cleaner(). + +min_staleness_sec() -> + Min = config:get_integer("smoosh", "staleness", ?STALENESS_MIN), + Min * 60. + +last_updated(Object) -> + try ets:lookup(?ACCESS, Object) of + [{_, AccessSec}] -> + AccessSec; + [] -> + ?ACCESS_NEVER + catch + error:badarg -> + 0 + end. + +quantize(Ratio) when is_integer(Ratio) -> + Ratio; +quantize(Ratio) when is_float(Ratio), Ratio >= 16 -> + round(Ratio); +quantize(Ratio) when is_float(Ratio) -> + round(Ratio * 16) / 16. + -ifdef(TEST). -include_lib("couch/include/couch_eunit.hrl"). @@ -690,4 +759,16 @@ add_remove_enqueue_ref_test() -> % It's basically back to an initial (empty) state ?assertEqual(St1, #state{}). +quantize_test() -> + ?assertEqual(0, quantize(0)), + ?assertEqual(1, quantize(1)), + ?assertEqual(0.0, quantize(0.0)), + ?assertEqual(16, quantize(16.0)), + ?assertEqual(15.0, quantize(15.0)), + ?assertEqual(0.0, quantize(0.01)), + ?assertEqual(0.125, quantize(0.1)), + ?assertEqual(0.125, quantize(0.1042)), + ?assertEqual(0.125, quantize(0.125111111111)), + ?assertEqual(10.0, quantize(10.0002)). + -endif. diff --git a/src/smoosh/test/smoosh_tests.erl b/src/smoosh/test/smoosh_tests.erl index 73876888b..0ec07025a 100644 --- a/src/smoosh/test/smoosh_tests.erl +++ b/src/smoosh/test/smoosh_tests.erl @@ -3,152 +3,486 @@ -include_lib("couch/include/couch_eunit.hrl"). -include_lib("couch/include/couch_db.hrl"). 
-%% ========== -%% Setup -%% ---------- - -setup(ChannelType) -> - DbName = ?tempdb(), - {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]), - couch_db:close(Db), - {ok, ChannelPid} = smoosh_server:get_channel(ChannelType), - smoosh_channel:flush(ChannelPid), - ok = config:set("smoosh", "persist", "true", false), - ok = config:set(config_section(ChannelType), "min_size", "1", false), - ok = config:set(config_section(ChannelType), "min_priority", "1", false), - DbName. - -teardown(ChannelType, DbName) -> - ok = couch_server:delete(DbName, [?ADMIN_CTX]), - ok = config:delete("smoosh", "persist", false), - ok = config:delete(config_section(DbName), "min_size", false), - ok = config:delete(config_section(DbName), "min_priority", false), - meck:unload(), - {ok, ChannelPid} = smoosh_server:get_channel(ChannelType), - smoosh_channel:flush(ChannelPid), - ok. - -config_section(ChannelType) -> - "smoosh." ++ ChannelType. - -%% ========== -%% Tests -%% ---------- - smoosh_test_() -> { - "Testing smoosh", + setup, + fun setup_all/0, + fun teardown_all/1, { - setup, - fun() -> test_util:start_couch([smoosh]) end, - fun test_util:stop/1, + foreach, + fun setup/0, + fun teardown/1, [ - channels_tests(), - persistence_tests() + ?TDEF_FE(t_default_channels), + ?TDEF_FE(t_channels_recreated_on_crash), + ?TDEF_FE(t_can_create_and_delete_channels), + ?TDEF_FE(t_db_is_enqueued_and_compacted), + ?TDEF_FE(t_view_is_enqueued_and_compacted), + ?TDEF_FE(t_index_cleanup_happens_by_default), + ?TDEF_FE(t_index_cleanup_can_be_disabled), + ?TDEF_FE(t_suspend_resume), + ?TDEF_FE(t_check_window_can_resume), + ?TDEF_FE(t_renqueue_on_crashes), + ?TDEF_FE(t_update_status_works), + ?TDEF_FE(t_checkpointing_works, 15), + ?TDEF_FE(t_ignore_checkpoint_resume_if_compacted_already), + ?TDEF_FE(t_access_cleaner_restarts), + ?TDEF_FE(t_event_handler_restarts), + ?TDEF_FE(t_manual_enqueue_api_works), + ?TDEF_FE(t_access_cleaner_works) ] } }. 
-persistence_tests() -> - Tests = [ - fun should_persist_queue/2, - fun should_call_recover/2, - fun should_not_call_recover/2 - ], - { - "Various persistence tests", - [ - make_test_case("ratio_dbs", Tests) - ] - }. +setup_all() -> + meck:new(smoosh_server, [passthrough]), + meck:new(smoosh_channel, [passthrough]), + meck:new(fabric, [passthrough]), + meck:new(couch_emsort, [passthrough]), + Ctx = test_util:start_couch([fabric]), + config:set("query_server_config", "commit_freq", "0", false), + Ctx. -channels_tests() -> - Tests = [ - fun should_enqueue/2 - ], - { - "Various channels tests", +teardown_all(Ctx) -> + catch application:stop(smoosh), + config:delete("query_server_config", "commit_freq", false), + test_util:stop(Ctx), + meck:unload(). + +setup() -> + config:set("smoosh", "persist", "false", false), + config:set("smoosh", "wait_secs", "0", false), + DbName = ?tempdb(), + fabric:create_db(DbName, [{q, 1}]), + {ok, _} = create_ddoc(DbName, <<"_design/foo">>, <<"bar">>), + {ok, _} = create_doc(DbName, <<"doc1">>, 1500000), + {ok, _} = fabric:query_view(DbName, <<"foo">>, <<"bar">>), + application:start(smoosh), + wait_for_channels(), + flush(), + DbName. + +teardown(DbName) -> + catch flush(), + catch application:stop(smoosh), + fabric:delete_db(DbName), + meck:reset(smoosh_server), + meck:reset(smoosh_channel), + meck:reset(couch_emsort), + meck:reset(fabric), + config:delete("smoosh", "db_channels", false), + config:delete("smoosh.ratio_dbs", "min_priority", false), + config:delete("smoosh.ratio_views", "min_priority", false), + config:delete("smoosh", "view_channels", false), + config:delete("smoosh", "cleanup_channels", false), + config:delete("smoosh", "wait_secs", false), + config:delete("smoosh", "persist", false), + config:delete("smoosh", "cleanup_index_files", false). + +t_default_channels(_) -> + ?assertMatch( [ - make_test_case("ratio_dbs", Tests) - ] - }. 
+ {"index_cleanup", _}, + {"ratio_dbs", _}, + {"ratio_views", _}, + {"slack_dbs", _}, + {"slack_views", _}, + {"upgrade_dbs", _}, + {"upgrade_views", _} + ], + status() + ), + % If app hasn't started status won't crash + application:stop(smoosh), + ?assertEqual([], status()). -make_test_case(Type, Funs) -> - {foreachx, fun setup/1, fun teardown/2, [{Type, Fun} || Fun <- Funs]}. +t_channels_recreated_on_crash(_) -> + RatioDbsPid = get_channel_pid("ratio_dbs"), + meck:reset(smoosh_channel), + exit(RatioDbsPid, kill), + meck:wait(1, smoosh_channel, start_link, 1, 3000), + wait_for_channels(7), + ?assertMatch([_, {"ratio_dbs", _} | _], status()), + ?assertNotEqual(RatioDbsPid, get_channel_pid("ratio_dbs")). -should_enqueue(ChannelType, DbName) -> - ?_test(begin - ok = grow_db_file(DbName, 300), - ok = wait_enqueue(ChannelType, DbName), - ?assert(is_enqueued(ChannelType, DbName)), - ok - end). +t_can_create_and_delete_channels(_) -> + config:set("smoosh", "db_channels", "mychan1", false), + config:set("smoosh", "view_channels", "mychan2", false), + config:set("smoosh", "cleanup_channels", "mychan3", false), + % 7 default ones + 3 new ones + wait_for_channels(10), + meck:reset(smoosh_channel), + config:delete("smoosh", "db_channels", false), + config:delete("smoosh", "view_channels", false), + config:delete("smoosh", "cleanup_channels", false), + wait_for_channels(7). -should_persist_queue(ChannelType, DbName) -> - ?_test(begin - {ok, ChannelPid} = smoosh_server:get_channel(ChannelType), - ok = grow_db_file(DbName, 300), - ok = wait_enqueue(ChannelType, DbName), - ok = smoosh_channel:persist(ChannelPid), - Q0 = channel_queue(ChannelType), - ok = application:stop(smoosh), - ok = application:start(smoosh), - Q1 = channel_queue(ChannelType), - % Assert that queues are not empty - ?assertNotEqual(Q0, smoosh_priority_queue:new(ChannelType)), - ?assertNotEqual(Q1, smoosh_priority_queue:new(ChannelType)), - ?assertEqual(Q0, Q1), - ok - end). 
+t_db_is_enqueued_and_compacted(DbName) -> + ?assertEqual({0, 0, 0}, sync_status("ratio_dbs")), + meck:reset(smoosh_channel), + {ok, _} = delete_doc(DbName, <<"doc1">>), + ok = wait_to_enqueue(DbName), + ok = wait_compact_start(), + ok = wait_normal_down(). -should_call_recover(_ChannelType, _DbName) -> - ?_test(begin - ok = application:stop(smoosh), - ok = config:set("smoosh", "persist", "true", false), - meck:new(smoosh_priority_queue, [passthrough]), - ok = application:start(smoosh), - timer:sleep(1000), - ?assertNotEqual(0, meck:num_calls(smoosh_priority_queue, recover, '_')), - ok - end). +t_view_is_enqueued_and_compacted(DbName) -> + % We don't want index cleanup to interfere for now + config:set("smoosh", "cleanup_index_files", "false", false), + % Ensure db is compacted + meck:reset(smoosh_channel), + {ok, _} = delete_doc(DbName, <<"doc1">>), + ok = wait_normal_down(), + % Check view + meck:reset(smoosh_channel), + {ok, _} = fabric:query_view(DbName, <<"foo">>, <<"bar">>), + ok = wait_to_enqueue({DbName, <<"_design/foo">>}), + ok = wait_compact_start(), + ok = wait_normal_down(). -should_not_call_recover(_ChannelType, _DbName) -> - ?_test(begin - ok = application:stop(smoosh), - ok = config:set("smoosh", "persist", "false", false), - meck:new(smoosh_priority_queue, [passthrough]), - ok = application:start(smoosh), - timer:sleep(1000), - ?assertEqual(0, meck:num_calls(smoosh_priority_queue, recover, '_')), - ok - end). +t_index_cleanup_happens_by_default(DbName) -> + ?assert(config:get_boolean("smoosh", "cleanup_index_files", true)), + % Db compacts + meck:reset(smoosh_channel), + {ok, _} = delete_doc(DbName, <<"doc1">>), + ok = wait_normal_down(), + % View should compact as well + meck:reset(fabric), + meck:reset(smoosh_channel), + {ok, _} = fabric:query_view(DbName, <<"foo">>, <<"bar">>), + % View cleanup should have been invoked + meck:wait(fabric, cleanup_index_files, [DbName], 4000). 
+ +t_index_cleanup_can_be_disabled(DbName) -> + config:set("smoosh", "cleanup_index_files", "false", false), + % Db compacts + meck:reset(smoosh_channel), + {ok, _} = delete_doc(DbName, <<"doc1">>), + ok = wait_normal_down(), + % View should compact as well + meck:reset(fabric), + meck:reset(smoosh_channel), + {ok, _} = fabric:query_view(DbName, <<"foo">>, <<"bar">>), + ok = wait_compact_start(), + ok = wait_normal_down(), + % View cleanup was not called + timer:sleep(1000), + ?assertEqual(0, meck:num_calls(fabric, cleanup_index_files, 1)). + +t_suspend_resume(DbName) -> + ?assertEqual({0, 0, 0}, sync_status("ratio_dbs")), + meck:reset(smoosh_channel), + setup_db_compactor_intercept(), + {ok, _} = delete_doc(DbName, <<"doc1">>), + ok = wait_to_enqueue(DbName), + CompPid = wait_db_compactor_pid(), + ok = smoosh:suspend(), + ?assertEqual({status, suspended}, erlang:process_info(CompPid, status)), + ?assertEqual({1, 0, 0}, sync_status("ratio_dbs")), + % Suspending twice should work too + ok = smoosh:suspend(), + ?assertEqual({status, suspended}, erlang:process_info(CompPid, status)), + ?assertEqual({1, 0, 0}, sync_status("ratio_dbs")), + ok = smoosh:resume(), + ?assertNotEqual({status, suspended}, erlang:process_info(CompPid, status)), + % Resuming twice should work too + ok = smoosh:resume(), + ?assertNotEqual({status, suspended}, erlang:process_info(CompPid, status)), + CompPid ! continue, + ok = wait_normal_down(). + +t_check_window_can_resume(DbName) -> + ?assertEqual({0, 0, 0}, sync_status("ratio_dbs")), + meck:reset(smoosh_channel), + setup_db_compactor_intercept(), + {ok, _} = delete_doc(DbName, <<"doc1">>), + ok = wait_to_enqueue(DbName), + CompPid = wait_db_compactor_pid(), + ok = smoosh:suspend(), + ?assertEqual({status, suspended}, erlang:process_info(CompPid, status)), + get_channel_pid("ratio_dbs") ! check_window, + CompPid ! continue, + ok = wait_normal_down(). 
+ +t_renqueue_on_crashes(DbName) -> + ?assertEqual({0, 0, 0}, sync_status("ratio_dbs")), + meck:reset(smoosh_channel), + setup_db_compactor_intercept(), + {ok, _} = delete_doc(DbName, <<"doc1">>), + ok = wait_to_enqueue(DbName), + CompPid = wait_db_compactor_pid(), + meck:reset(smoosh_channel), + CompPid ! {raise, error, boom}, + ok = wait_to_enqueue(DbName), + CompPid2 = wait_db_compactor_pid(), + CompPid2 ! continue, + ok = wait_normal_down(). + +t_update_status_works(DbName) -> + setup_db_compactor_intercept(), + {ok, _} = delete_doc(DbName, <<"doc1">>), + ok = wait_to_enqueue(DbName), + CompPid = wait_db_compactor_pid(), + % status should have 1 starting job, but it may have not been updated yet so + % we wait until update_status is called + wait_update_status(), + WaitFun = fun() -> + case {1, 0, 0} =:= status("ratio_dbs") of + true -> ok; + _ -> wait + end + end, + test_util:wait(WaitFun), + CompPid ! continue, + ok = wait_normal_down(). + +t_checkpointing_works(DbName) -> + setup_db_compactor_intercept(), + {ok, _} = delete_doc(DbName, <<"doc1">>), + ok = wait_to_enqueue(DbName), + CompPid = wait_db_compactor_pid(), + ChanPid = get_channel_pid("ratio_dbs"), + config:set("smoosh", "persist", "true", false), + meck:reset(smoosh_channel), + ChanPid ! checkpoint, + % Wait for checkpoint process to exit + ok = wait_normal_down(), + % Stop smoosh and then crash the compaction + ok = application:stop(smoosh), + CompPid ! {raise, error, kapow}, + % Smoosh should resume job and continue compacting + setup_db_compactor_intercept(), + meck:reset(smoosh_channel), + ok = application:start(smoosh), + CompPid2 = wait_db_compactor_pid(), + ?assertEqual({1, 0, 0}, sync_status("ratio_dbs")), + CompPid2 ! continue, + ok = wait_normal_down(). 
+ +t_ignore_checkpoint_resume_if_compacted_already(DbName) -> + setup_db_compactor_intercept(), + {ok, _} = delete_doc(DbName, <<"doc1">>), + ok = wait_to_enqueue(DbName), + CompPid = wait_db_compactor_pid(), + ChanPid = get_channel_pid("ratio_dbs"), + config:set("smoosh", "persist", "true", false), + meck:reset(smoosh_channel), + ChanPid ! checkpoint, + % Wait for checkpoint process to exit + ok = wait_normal_down(), + % Stop smoosh and then let the compaction finish + ok = application:stop(smoosh), + Ref = erlang:monitor(process, CompPid), + CompPid ! continue, + receive + {'DOWN', Ref, _, _, normal} -> ok + end, + % Smoosh should resume job and *not* compact + setup_db_compactor_intercept(), + meck:reset(smoosh_channel), + ok = application:start(smoosh), + timer:sleep(500), + StartPat = {'_', {ok, '_'}}, + ?assertEqual(0, meck:num_calls(smoosh_channel, handle_info, [StartPat, '_'])). + +t_access_cleaner_restarts(_) -> + ACPid = get_access_cleaner_pid(), + exit(ACPid, kill), + WaitFun = fun() -> + case get_access_cleaner_pid() of + Pid when Pid =/= ACPid -> ok; + _ -> wait + end + end, + test_util:wait(WaitFun), + ?assertNotEqual(ACPid, get_access_cleaner_pid()). + +t_event_handler_restarts(_) -> + EHPid = get_event_handler_pid(), + exit(EHPid, kill), + WaitFun = fun() -> + case get_event_handler_pid() of + Pid when Pid =/= EHPid -> ok; + _ -> wait + end + end, + test_util:wait(WaitFun), + ?assertNotEqual(EHPid, get_access_cleaner_pid()). -grow_db_file(DbName, SizeInKb) -> - {ok, Db} = couch_db:open_int(DbName, [?ADMIN_CTX]), - Data = b64url:encode(crypto:strong_rand_bytes(SizeInKb * 1024)), - Body = {[{<<"value">>, Data}]}, - Doc = #doc{id = <<"doc1">>, body = Body}, - {ok, _} = couch_db:update_doc(Db, Doc, []), - couch_db:close(Db), - ok. - -is_enqueued(ChannelType, DbName) -> - {ok, ChannelPid} = smoosh_server:get_channel(ChannelType), - smoosh_channel:is_key(ChannelPid, DbName). 
-
-wait_enqueue(ChannelType, DbName) ->
-    test_util:wait(
-        fun() ->
-            case is_enqueued(ChannelType, DbName) of
-                false ->
-                    wait;
-                true ->
-                    ok
-            end
+% Exercise the manual enqueue API with invalid and valid objects, repeatedly,
+% and check that neither smoosh_server nor the channels changed pids.
+t_manual_enqueue_api_works(DbName) ->
+    Shard = shard_name(DbName),
+    CleanupKey = {index_cleanup, Shard},
+    ViewKey = {Shard, <<"_design/foo">>},
+
+    ServerPid = whereis(smoosh_server),
+    DbsChanPid = get_channel_pid("ratio_dbs"),
+    ViewsChanPid = get_channel_pid("ratio_views"),
+    CleanupChanPid = get_channel_pid("index_cleanup"),
+
+    % Lower min priority so that enqueued shards would try to compact
+    config:set("smoosh.ratio_dbs", "min_priority", "1", false),
+    config:set("smoosh.ratio_views", "min_priority", "1", false),
+
+    % Objects which do not exist are still accepted
+    ?assertEqual(ok, smoosh_server:sync_enqueue(<<"invalid">>)),
+    ?assertEqual(ok, smoosh_server:sync_enqueue({index_cleanup, <<"invalid">>})),
+    ?assertEqual(ok, smoosh_server:sync_enqueue({Shard, <<"_design/invalid">>})),
+
+    ?assertEqual(ok, smoosh_server:sync_enqueue(Shard)),
+    ?assertEqual(ok, smoosh_server:sync_enqueue(CleanupKey)),
+    ?assertEqual(ok, smoosh_server:sync_enqueue(ViewKey)),
+
+    ?assertEqual(ok, smoosh:enqueue(Shard)),
+    ?assertEqual(ok, smoosh:enqueue(CleanupKey)),
+    ?assertEqual(ok, smoosh:enqueue(ViewKey)),
+
+    smoosh:enqueue_all_dbs(),
+    smoosh:enqueue_all_views(),
+
+    % Enqueuing the same items in a loop should work
+    lists:foreach(
+        fun(_) ->
+            ?assertEqual(ok, smoosh:enqueue(Shard)),
+            ?assertEqual(ok, smoosh:enqueue(CleanupKey)),
+            ?assertEqual(ok, smoosh:enqueue(ViewKey))
         end,
-        15000
-    ).
+        lists:seq(1, 1000)
+    ),
+
+    ?assertEqual(ok, smoosh_server:sync_enqueue(Shard)),
+    ?assertEqual(ok, smoosh_server:sync_enqueue(CleanupKey)),
+    ?assertEqual(ok, smoosh_server:sync_enqueue(ViewKey)),
+
+    % Assert that channels and smoosh server didn't crash
+    ?assertEqual(ServerPid, whereis(smoosh_server)),
+    ?assertEqual(DbsChanPid, get_channel_pid("ratio_dbs")),
+    ?assertEqual(ViewsChanPid, get_channel_pid("ratio_views")),
+    ?assertEqual(CleanupChanPid, get_channel_pid("index_cleanup")).
+
+% Insert an access entry stamped one hour in the past and wait until the
+% smoosh_access table is empty again.
+t_access_cleaner_works(_) ->
+    HourAgo = erlang:monotonic_time(second) - 3600,
+    ets:insert(smoosh_access, {<<"db1">>, HourAgo}),
+    test_util:wait(fun() ->
+        case ets:tab2list(smoosh_access) of
+            [] -> ok;
+            _ -> wait
+        end
+    end),
+    ?assertEqual([], ets:tab2list(smoosh_access)).
+
+% Write a document whose <<"value">> field holds Size random bytes, b64url
+% encoded. Returns the result of fabric:update_doc/3.
+create_doc(DbName, DocId, Size) ->
+    RandomBytes = crypto:strong_rand_bytes(Size),
+    Body = {[{<<"value">>, b64url:encode(RandomBytes)}]},
+    fabric:update_doc(DbName, #doc{id = DocId, body = Body}, [?ADMIN_CTX]).
+
+% Write a JavaScript design document with a single map view named ViewName
+% and autoupdate set to false. Returns the result of fabric:update_doc/3.
+create_ddoc(DbName, DocId, ViewName) ->
+    MapFun = <<"function(doc) {emit(doc._id, doc.value);}">>,
+    Views = {[{ViewName, {[{<<"map">>, MapFun}]}}]},
+    Props = [
+        {<<"_id">>, DocId},
+        {<<"language">>, <<"javascript">>},
+        {<<"autoupdate">>, false},
+        {<<"views">>, Views}
+    ],
+    DDoc = couch_doc:from_json_obj({Props}),
+    fabric:update_doc(DbName, DDoc, [?ADMIN_CTX]).
+
+% Open the document and write it back as deleted with an empty body.
+delete_doc(DbName, DDocId) ->
+    {ok, Doc} = fabric:open_doc(DbName, DDocId, [?ADMIN_CTX]),
+    Tombstone = Doc#doc{deleted = true, body = {[]}},
+    fabric:update_doc(DbName, Tombstone, [?ADMIN_CTX]).
+
+% smoosh:status/0 result sorted by its first tuple element.
+status() ->
+    {ok, ChannelStatuses} = smoosh:status(),
+    lists:keysort(1, ChannelStatuses).
+
+% Return {Active, Starting, Waiting} for the named channel, or false when the
+% channel is not in the status list.
+status(Channel) ->
+    case lists:keyfind(Channel, 1, status()) of
+        {_, Val} ->
+            Active = proplists:get_value(active, Val),
+            Starting = proplists:get_value(starting, Val),
+            WaitingInfo = proplists:get_value(waiting, Val),
+            Waiting = proplists:get_value(size, WaitingInfo),
+            {Active, Starting, Waiting};
+        false ->
+            false
+    end.
+
+% Make a synchronous get_status_table call to the channel (its reply is
+% discarded) and only then read the channel's status.
+sync_status(Channel) ->
+    Pid = get_channel_pid(Channel),
+    gen_server:call(Pid, get_status_table, infinity),
+    status(Channel).
+
+flush() ->
+    ok = smoosh_server:flush().
+
+% Look up the channel's pid in the smoosh_server ets table. Crashes with a
+% badmatch unless exactly one entry exists for Chan.
+get_channel_pid(Chan) ->
+    [{channel, _, Pid, _}] = ets:lookup(smoosh_server, Chan),
+    Pid.
+
+% Extract the access cleaner pid from the smoosh_server state tuple.
+% NOTE(review): this matches on the tuple size and field position, so it
+% presumably has to be kept in sync with the state record of smoosh_server.
+get_access_cleaner_pid() ->
+    {state, _, _, _, _, _, _, ACPid} = sys:get_state(smoosh_server),
+    ACPid.
+
+% Extract the event handler pid from the smoosh_server state tuple.
+% NOTE(review): same positional coupling as get_access_cleaner_pid/0.
+get_event_handler_pid() ->
+    {state, _, _, _, EHPid, _, _, _} = sys:get_state(smoosh_server),
+    EHPid.
+
+wait_for_channels() ->
+    % 3 default ratios + 3 default views + 1 index cleanup = 7
+    wait_for_channels(7).
+
+% Poll until the status list has exactly N entries.
+wait_for_channels(N) when is_integer(N), N >= 0 ->
+    WaitFun = fun() ->
+        case length(status()) of
+            N -> ok;
+            _ -> wait
+        end
+    end,
+    test_util:wait(WaitFun).
+
+% Wait until an enqueue cast for the given object reaches a channel. Database
+% names are translated to their shard name first.
+% NOTE(review): the index_cleanup clause passes DbName through unchanged,
+% unlike the other two clauses - confirm this is intended.
+wait_to_enqueue(DbName) when is_binary(DbName) ->
+    wait_enqueue(shard_name(DbName));
+wait_to_enqueue({DbName, View}) when is_binary(DbName) ->
+    wait_enqueue({shard_name(DbName), View});
+wait_to_enqueue({index_cleanup, DbName}) when is_binary(DbName) ->
+    wait_enqueue({index_cleanup, DbName}).
+
+% Wait up to 4 seconds for smoosh_channel:handle_cast/2 to be called with an
+% {enqueue, Obj, _} message.
+wait_enqueue(Obj) ->
+    Enqueue = {enqueue, Obj, '_'},
+    meck:wait(smoosh_channel, handle_cast, [Enqueue, '_'], 4000).
+
+% Name of the database's shard. Crashes with a badmatch unless mem3 reports
+% exactly one shard for DbName.
+shard_name(DbName) ->
+    [Shard] = mem3:shards(DbName),
+    mem3:name(Shard).
+
+% Wait up to 4 seconds for the channel to handle a {_, {ok, _}} message.
+wait_compact_start() ->
+    StartOk = {'_', {ok, '_'}},
+    meck:wait(smoosh_channel, handle_info, [StartOk, '_'], 4000).
+
+% Wait up to 4 seconds for the channel to handle a 'DOWN' message with reason
+% normal.
+wait_normal_down() ->
+    NormalDown = {'DOWN', '_', '_', '_', normal},
+    meck:wait(smoosh_channel, handle_info, [NormalDown, '_'], 4000).
+
+% Wait up to 4 seconds for the channel to handle an update_status message.
+wait_update_status() ->
+    meck:wait(smoosh_channel, handle_info, [update_status, '_'], 4000).
+
+% Mock couch_emsort:open/1 so that the calling process reports its pid to the
+% test process and then blocks until it is told to continue (pass through to
+% the real function) or to raise an exception.
+setup_db_compactor_intercept() ->
+    Tester = self(),
+    Intercept = fun(Fd) ->
+        Tester ! {compactor_paused, self()},
+        receive
+            {raise, Class, Reason} -> meck:exception(Class, Reason);
+            continue -> meck:passthrough([Fd])
+        end
+    end,
+    meck:expect(couch_emsort, open, Intercept).
 
-channel_queue(ChannelType) ->
-    Q0 = smoosh_priority_queue:new(ChannelType),
-    smoosh_priority_queue:recover(Q0).
+% Block until a process paused in the couch_emsort:open/1 intercept reports
+% itself, and return its pid.
+wait_db_compactor_pid() ->
+    receive
+        {compactor_paused, CompactorPid} -> CompactorPid
+    end.