diff options
Diffstat (limited to 'src/couch_jobs/src/couch_jobs_server.erl')
-rw-r--r-- | src/couch_jobs/src/couch_jobs_server.erl | 193 |
1 files changed, 193 insertions, 0 deletions
diff --git a/src/couch_jobs/src/couch_jobs_server.erl b/src/couch_jobs/src/couch_jobs_server.erl new file mode 100644 index 000000000..2e03c7dcf --- /dev/null +++ b/src/couch_jobs/src/couch_jobs_server.erl @@ -0,0 +1,193 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_jobs_server). + +-behaviour(gen_server). + + +-export([ + start_link/0, + get_notifier_server/1, + force_check_types/0 +]). + +-export([ + init/1, + terminate/2, + handle_call/3, + handle_cast/2, + handle_info/2, + code_change/3 +]). + + +-define(TYPE_CHECK_PERIOD_DEFAULT, 15000). +-define(MAX_JITTER_DEFAULT, 5000). + + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, nil, []). + + +get_notifier_server(Type) -> + case get_type_pid_refs(Type) of + {{_, _}, {NotifierPid, _}} -> + {ok, NotifierPid}; + not_found -> + force_check_types(), + case get_type_pid_refs(Type) of + {{_, _}, {NotifierPid, _}} -> + {ok, NotifierPid}; + not_found -> + {error, not_found} + end + end. + + +force_check_types() -> + gen_server:call(?MODULE, check_types, infinity). + + +init(_) -> + % If couch_jobs_server is after the notifiers and activity supervisor. If + % it restart, there could be some stale notifier or activity monitors. Kill + % those as later on we'd start new ones anyway. + reset_monitors(), + reset_notifiers(), + ets:new(?MODULE, [protected, named_table]), + check_types(), + schedule_check(), + {ok, nil}. + + +terminate(_, _St) -> + ok. + + +handle_call(check_types, _From, St) -> + check_types(), + {reply, ok, St}; + +handle_call(Msg, _From, St) -> + {stop, {bad_call, Msg}, {bad_call, Msg}, St}. + + +handle_cast(Msg, St) -> + {stop, {bad_cast, Msg}, St}. + + +handle_info(check_types, St) -> + check_types(), + schedule_check(), + {noreply, St}; + +handle_info({'DOWN', _Ref, process, Pid, Reason}, St) -> + LogMsg = "~p : process ~p exited with ~p", + couch_log:error(LogMsg, [?MODULE, Pid, Reason]), + {stop, {unexpected_process_exit, Pid, Reason}, St}; + +handle_info({Ref, ready}, St) when is_reference(Ref) -> + % Don't crash out couch_jobs_server and the whole application would need to + % eventually do proper cleanup in erlfdb:wait timeout code. + LogMsg = "~p : spurious erlfdb future ready message ~p", + couch_log:error(LogMsg, [?MODULE, Ref]), + {noreply, St}; + +handle_info(Msg, St) -> + {stop, {bad_info, Msg}, St}. + + +code_change(_OldVsn, St, _Extra) -> + {ok, St}. + + +check_types() -> + FdbTypes = fdb_types(), + EtsTypes = ets_types(), + ToStart = FdbTypes -- EtsTypes, + ToStop = EtsTypes -- FdbTypes, + lists:foreach(fun(Type) -> start_monitors(Type) end, ToStart), + lists:foreach(fun(Type) -> stop_monitors(Type) end, ToStop). + + +start_monitors(Type) -> + MonPidRef = case couch_jobs_activity_monitor_sup:start_monitor(Type) of + {ok, Pid1} -> {Pid1, monitor(process, Pid1)}; + {error, Error1} -> error({failed_to_start_monitor, Type, Error1}) + end, + NotifierPidRef = case couch_jobs_notifier_sup:start_notifier(Type) of + {ok, Pid2} -> {Pid2, monitor(process, Pid2)}; + {error, Error2} -> error({failed_to_start_notifier, Type, Error2}) + end, + ets:insert_new(?MODULE, {Type, MonPidRef, NotifierPidRef}). + + +stop_monitors(Type) -> + {{MonPid, MonRef}, {NotifierPid, NotifierRef}} = get_type_pid_refs(Type), + ok = couch_jobs_activity_monitor_sup:stop_monitor(MonPid), + demonitor(MonRef, [flush]), + ok = couch_jobs_notifier_sup:stop_notifier(NotifierPid), + demonitor(NotifierRef, [flush]), + ets:delete(?MODULE, Type). + + +reset_monitors() -> + lists:foreach(fun(Pid) -> + couch_jobs_activity_monitor_sup:stop_monitor(Pid) + end, couch_jobs_activity_monitor_sup:get_child_pids()). + + +reset_notifiers() -> + lists:foreach(fun(Pid) -> + couch_jobs_notifier_sup:stop_notifier(Pid) + end, couch_jobs_notifier_sup:get_child_pids()). + + +get_type_pid_refs(Type) -> + case ets:lookup(?MODULE, Type) of + [{_, MonPidRef, NotifierPidRef}] -> {MonPidRef, NotifierPidRef}; + [] -> not_found + end. + + +ets_types() -> + lists:flatten(ets:match(?MODULE, {'$1', '_', '_'})). + + +fdb_types() -> + try + couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) -> + couch_jobs_fdb:get_types(JTx) + end) + catch + error:{timeout, _} -> + couch_log:warning("~p : Timed out connecting to FDB", [?MODULE]), + [] + end. + + +schedule_check() -> + Timeout = get_period_msec(), + MaxJitter = max(Timeout div 2, get_max_jitter_msec()), + Wait = Timeout + rand:uniform(max(1, MaxJitter)), + erlang:send_after(Wait, self(), check_types). + + +get_period_msec() -> + config:get_integer("couch_jobs", "type_check_period_msec", + ?TYPE_CHECK_PERIOD_DEFAULT). + + +get_max_jitter_msec() -> + config:get_integer("couch_jobs", "type_check_max_jitter_msec", + ?MAX_JITTER_DEFAULT). |