-% public api.
--export([start_link/1, close/1, suspend/1, resume/1, get_status/1]).
--export([enqueue/3, last_updated/2, flush/1]).
-% gen_server api.
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- code_change/3, terminate/2]).
-% records.
--record(state, {
- active=[],
- name,
- waiting=smoosh_priority_queue:new(),
- paused=true,
- starting=[]
-% public functions.
-start_link(Name) ->
- gen_server:start_link(?MODULE, Name, []).
-suspend(ServerRef) ->
- gen_server:call(ServerRef, suspend).
-resume(ServerRef) ->
- gen_server:call(ServerRef, resume).
-enqueue(ServerRef, Object, Priority) ->
- gen_server:cast(ServerRef, {enqueue, Object, Priority}).
-last_updated(ServerRef, Object) ->
- gen_server:call(ServerRef, {last_updated, Object}).
-get_status(ServerRef) ->
- gen_server:call(ServerRef, status).
-close(ServerRef) ->
- gen_server:call(ServerRef, close).
-flush(ServerRef) ->
- gen_server:call(ServerRef, flush).
-% gen_server functions.
-init(Name) ->
- schedule_unpause(),
- erlang:send_after(60 * 1000, self(), check_window),
- {ok, #state{name=Name}}.
-handle_call({last_updated, Object}, _From, State0) ->
- {ok, State} = code_change(nil, State0, nil),
- LastUpdated = smoosh_priority_queue:last_updated(Object, State#state.waiting),
- {reply, LastUpdated, State};
-handle_call(suspend, _From, State0) ->
- {ok, State} = code_change(nil, State0, nil),
- #state{active = Active} = State,
- [catch erlang:suspend_process(Pid, [unless_suspending])
- || {_,Pid} <- Active],
- {reply, ok, State#state{paused=true}};
-handle_call(resume, _From, State0) ->
- {ok, State} = code_change(nil, State0, nil),
- #state{active = Active} = State,
- [catch erlang:resume_process(Pid) || {_,Pid} <- Active],
- {reply, ok, State#state{paused=false}};
-handle_call(status, _From, State0) ->
- {ok, State} = code_change(nil, State0, nil),
- {reply, {ok, [
- {active, length(},
- {starting, length(State#state.starting)},
- {waiting, smoosh_priority_queue:info(State#state.waiting)}
- ]}, State};
-handle_call(close, _From, State0) ->
- {ok, State} = code_change(nil, State0, nil),
- {stop, normal, ok, State};
-handle_call(flush, _From, State0) ->
- {ok, State} = code_change(nil, State0, nil),
- {reply, ok, State#state{waiting=smoosh_priority_queue:new()}}.
-handle_cast({enqueue, _Object, 0}, State0) ->
- {ok, State} = code_change(nil, State0, nil),
- {noreply, State};
-handle_cast({enqueue, Object, Priority}, State0) ->
- {ok, State} = code_change(nil, State0, nil),
- {noreply, maybe_start_compaction(add_to_queue(Object, Priority, State))}.
-% We accept noproc here due to possibly having monitored a restarted compaction
-% pid after it finished.
-handle_info({'DOWN', Ref, _, Job, Reason}, State0) when Reason == normal;
- Reason == noproc ->
- {ok, State} = code_change(nil, State0, nil),
- #state{active=Active, starting=Starting} = State,
- {noreply, maybe_start_compaction(
- State#state{active=lists:keydelete(Job, 2, Active),
- starting=lists:keydelete(Ref, 1, Starting)})};
-handle_info({'DOWN', Ref, _, Job, Reason}, State0) ->
- {ok, State} = code_change(nil, State0, nil),
- #state{active=Active0, starting=Starting0} = State,
- case lists:keytake(Job, 2, Active0) of
- {value, {Key, _Pid}, Active1} ->
- State1 = maybe_remonitor_cpid(State#state{active=Active1}, Key,
- Reason),
- {noreply, maybe_start_compaction(State1)};
- false ->
- case lists:keytake(Ref, 1, Starting0) of
- {value, {_, Key}, Starting1} ->
- couch_log:warning("failed to start compaction of ~p: ~p", [
- smoosh_utils:stringify(Key), Reason]),
- {ok, _} = timer:apply_after(5000, smoosh_server, enqueue, [Key]),
- {noreply, maybe_start_compaction(State#state{starting=Starting1})};
- false ->
- {noreply, State}
- end
- end;
-handle_info({Ref, {ok, Pid}}, State0) when is_reference(Ref) ->
- {ok, State} = code_change(nil, State0, nil),
- case lists:keytake(Ref, 1, State#state.starting) of
- {value, {_, Key}, Starting1} ->
- couch_log:notice("~s: Started compaction for ~s",
- [, smoosh_utils:stringify(Key)]),
- erlang:monitor(process, Pid),
- erlang:demonitor(Ref, [flush]),
- {noreply, State#state{active=[{Key, Pid}|],
- starting=Starting1}};
- false ->
- {noreply, State}
- end;
-handle_info(check_window, State0) ->
- {ok, State} = code_change(nil, State0, nil),
- #state{paused = Paused, name = Name} = State,
- StrictWindow = smoosh_utils:get(Name, "strict_window", "false"),
- FinalState = case {not Paused, smoosh_utils:in_allowed_window(Name)} of
- {false, false} ->
- % already in desired state
- State;
- {true, true} ->
- % already in desired state
- State;
- {false, true} ->
- % resume is always safe even if we did not previously suspend
- {reply, ok, NewState} = handle_call(resume, nil, State),
- NewState;
- {true, false} ->
- if StrictWindow =:= "true" ->
- {reply, ok, NewState} = handle_call(suspend, nil, State),
- NewState;
- true ->
- State#state{paused=true}
- end
- end,
- erlang:send_after(60 * 1000, self(), check_window),
- {noreply, FinalState};
-handle_info(pause, State0) ->
- {ok, State} = code_change(nil, State0, nil),
- {noreply, State#state{paused=true}};
-handle_info(unpause, State0) ->
- {ok, State} = code_change(nil, State0, nil),
- {noreply, maybe_start_compaction(State#state{paused=false})}.
-terminate(_Reason, _State) ->
- ok.
-code_change(_OldVsn, #state{}=State, _Extra) ->
- {ok, State}.
-% private functions.
-add_to_queue(Key, Priority, State) ->
- #state{active=Active,waiting=Q} = State,
- case lists:keymember(Key, 1, Active) of
- true ->
- State;
- false ->
- Capacity = list_to_integer(smoosh_utils:get(, "capacity", "9999")),
- couch_log:notice(
- "~s: adding ~p to internal compactor queue with priority ~p",
- [, Key, Priority]),
- State#state{
- waiting=smoosh_priority_queue:in(Key, Priority, Priority, Capacity, Q)
- }
- end.
-maybe_start_compaction(#state{paused=true}=State) ->
- State;
-maybe_start_compaction(State) ->
- Concurrency = list_to_integer(smoosh_utils:get(,
- "concurrency", "1")),
- if length( + length(State#state.starting) < Concurrency ->
- case smoosh_priority_queue:out(State#state.waiting) of
- false ->
- State;
- {Key, Priority, Q} ->
- try
- State2 = case start_compact(State, Key) of
- false ->
- State;
- State1 ->
- couch_log:notice(
- "~s: Starting compaction for ~s (priority ~p)",
- [, smoosh_utils:stringify(Key), Priority]),
- State1
- end,
- maybe_start_compaction(State2#state{waiting=Q})
- catch Class:Exception ->
- couch_log:notice("~s: ~p ~p for ~s",
- [, Class, Exception,
- smoosh_utils:stringify(Key)]),
- maybe_start_compaction(State#state{waiting=Q})
- end
- end;
- true ->
- State
- end.
-start_compact(State, {schema, DbName, GroupId}) ->
- case smoosh_utils:ignore_db({DbName, GroupId}) of
- false ->
- {ok, Pid} = couch_md_index_manager:get_group_pid(DbName,
- GroupId),
- Ref = erlang:monitor(process, Pid),
- Pid ! {'$gen_call', {self(), Ref}, compact},
- State#state{starting=[{Ref, {schema, DbName,
- GroupId}} | State#state.starting]};
- _ ->
- false
- end;
-start_compact(State, DbName) when is_list(DbName) ->
- start_compact(State, ?l2b(DbName));
-start_compact(State, DbName) when is_binary(DbName) ->
- {ok, Db} = couch_db:open_int(DbName, []),
- try start_compact(State, Db) after couch_db:close(Db) end;
-start_compact(State, {Shard,GroupId}) ->
- case smoosh_utils:ignore_db({Shard, GroupId}) of
- false ->
- DbName = mem3:dbname(Shard),
- {ok, Pid} = couch_index_server:get_index(
- couch_mrview_index, Shard, GroupId),
- spawn(fun() -> cleanup_index_files(DbName, Shard) end),
- Ref = erlang:monitor(process, Pid),
- Pid ! {'$gen_call', {self(), Ref}, compact},
- State#state{starting=[{Ref, {Shard, GroupId}}|State#state.starting]};
- _ ->
- false
- end;
-start_compact(State, Db) ->
- case smoosh_utils:ignore_db(Db) of
- false ->
- DbPid = couch_db:get_pid(Db),
- Key = couch_db:name(Db),
- case couch_db:get_compactor_pid(Db) of
- nil ->
- Ref = erlang:monitor(process, DbPid),
- DbPid ! {'$gen_call', {self(), Ref}, start_compact},
- State#state{starting=[{Ref, Key}|State#state.starting]};
- % Compaction is already running, so monitor existing compaction pid.
- CPid ->
- couch_log:notice("Db ~s continuing compaction",
- [smoosh_utils:stringify(Key)]),
- erlang:monitor(process, CPid),
- State#state{active=[{Key, CPid}|]}
- end;
- _ ->
- false
- end.
-maybe_remonitor_cpid(State, DbName, Reason) when is_binary(DbName) ->
- {ok, Db} = couch_db:open_int(DbName, []),
- case couch_db:get_compactor_pid_sync(Db) of
- nil ->
- couch_log:warning("exit for compaction of ~p: ~p",
- [smoosh_utils:stringify(DbName), Reason]),
- {ok, _} = timer:apply_after(5000, smoosh_server, enqueue, [DbName]),
- State;
- CPid ->
- couch_log:notice("~s compaction already running. Re-monitor Pid ~p",
- [smoosh_utils:stringify(DbName), CPid]),
- erlang:monitor(process, CPid),
- State#state{active=[{DbName, CPid}|]}
- end;
-% not a database compaction, so ignore the pid check
-maybe_remonitor_cpid(State, Key, Reason) ->
- couch_log:warning("exit for compaction of ~p: ~p",
- [smoosh_utils:stringify(Key), Reason]),
- {ok, _} = timer:apply_after(5000, smoosh_server, enqueue, [Key]),
- State.
-schedule_unpause() ->
- WaitSecs = list_to_integer(config:get("smoosh", "wait_secs", "30")),
- erlang:send_after(WaitSecs * 1000, self(), unpause).
-cleanup_index_files(DbName, _Shard) ->
- case config:get("smoosh", "cleanup_index_files", "false") of
- "true" ->
- fabric:cleanup_index_files(DbName);
- _ ->
- ok
- end.