diff options
author | Nick Vatamaniuc <vatamane@apache.org> | 2019-05-06 12:49:22 -0400 |
---|---|---|
committer | Nick Vatamaniuc <vatamane@apache.org> | 2019-05-07 01:42:45 -0400 |
commit | 28cc7ca93c9ed259bf78ca4b2eb67d4734bac274 (patch) | |
tree | 470706e7a903e76d5f010727043fcc2476d892e6 | |
parent | 44bc5533ae1928c5e174065c0cf9b14113a5fd63 (diff) | |
download | couchdb-prototype/rfc-couch-workers.tar.gz |
Couch workers prototypeprototype/rfc-couch-workers
-rw-r--r-- | rebar.config.script | 1 | ||||
-rw-r--r-- | rel/reltool.config | 2 | ||||
-rw-r--r-- | src/couch_workers/.gitignore | 4 | ||||
-rw-r--r-- | src/couch_workers/README.md | 4 | ||||
-rw-r--r-- | src/couch_workers/src/couch_workers.app.src | 30 | ||||
-rw-r--r-- | src/couch_workers/src/couch_workers.erl | 54 | ||||
-rw-r--r-- | src/couch_workers/src/couch_workers_app.erl | 26 | ||||
-rw-r--r-- | src/couch_workers/src/couch_workers_fdb.erl | 155 | ||||
-rw-r--r-- | src/couch_workers/src/couch_workers_global.erl | 228 | ||||
-rw-r--r-- | src/couch_workers/src/couch_workers_local.erl | 155 | ||||
-rw-r--r-- | src/couch_workers/src/couch_workers_sup.erl | 49 |
11 files changed, 708 insertions, 0 deletions
diff --git a/rebar.config.script b/rebar.config.script index 3b58bcb1e..0069597c2 100644 --- a/rebar.config.script +++ b/rebar.config.script @@ -76,6 +76,7 @@ SubDirs = [ "src/couch_tests", "src/ddoc_cache", "src/fabric", + "src/couch_workers", "src/global_changes", "src/mango", "src/rexi", diff --git a/rel/reltool.config b/rel/reltool.config index 1051d2e77..ffb795a00 100644 --- a/rel/reltool.config +++ b/rel/reltool.config @@ -34,6 +34,7 @@ couch, couch_epi, couch_index, + couch_workers, couch_log, couch_mrview, couch_plugins, @@ -90,6 +91,7 @@ {app, config, [{incl_cond, include}]}, {app, couch, [{incl_cond, include}]}, {app, couch_epi, [{incl_cond, include}]}, + {app, couch_workers, [{incl_cond, include}]}, {app, couch_index, [{incl_cond, include}]}, {app, couch_log, [{incl_cond, include}]}, {app, couch_mrview, [{incl_cond, include}]}, diff --git a/src/couch_workers/.gitignore b/src/couch_workers/.gitignore new file mode 100644 index 000000000..62c8b0712 --- /dev/null +++ b/src/couch_workers/.gitignore @@ -0,0 +1,4 @@ +*.beam +.eunit +ebin/couch_workers.app +.DS_Store
\ No newline at end of file diff --git a/src/couch_workers/README.md b/src/couch_workers/README.md new file mode 100644 index 000000000..8cfcd3446 --- /dev/null +++ b/src/couch_workers/README.md @@ -0,0 +1,4 @@ +Couch Workers Application +========================= + +Couch workers perform background jobs for CouchDB running on top of FDB diff --git a/src/couch_workers/src/couch_workers.app.src b/src/couch_workers/src/couch_workers.app.src new file mode 100644 index 000000000..7db57d64a --- /dev/null +++ b/src/couch_workers/src/couch_workers.app.src @@ -0,0 +1,30 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +{application, couch_workers, [ + {description, "CouchDB Workers"}, + {vsn, git}, + {mod, {couch_workers_app, []}}, + {registered, [ + couch_workers_sup, + couch_workers_global, + couch_workers_local + ]}, + {applications, [ + kernel, + stdlib, + erlfdb, + couch_log, + config, + fabric + ]} +]}. diff --git a/src/couch_workers/src/couch_workers.erl b/src/couch_workers/src/couch_workers.erl new file mode 100644 index 000000000..36080a095 --- /dev/null +++ b/src/couch_workers/src/couch_workers.erl @@ -0,0 +1,54 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_workers). + +-export([ + worker_register/4, + worker_unregister/1, + + membership_subscribe/3, + membership_unsubscribe/1, + + get_workers/1 +]). + + +-callback couch_workers_membership_update( + WorkerType :: term(), + Workers :: #{}, + VStamp :: binary(), + SubscriberRef :: reference() +) -> ok. + + +% External API + +worker_register(WorkerType, Id, Opts, Pid) when is_binary(Id), is_map(Opts), + is_pid(Pid) -> + couch_workers_local:worker_register(WorkerType, Id, Pid). + + +worker_unregister(Ref) when is_reference(Ref) -> + couch_workers_local:worker_unregister(Ref). + + +membership_subscribe(WorkerType, Module, Pid) -> + couch_workers_global:subscribe(WorkerType, Module, Pid). + + +membership_unsubscribe(Ref) when is_reference(Ref) -> + couch_workers_global:unsubscribe(Ref). + + +get_workers(WorkerType) -> + couch_workers_global:get_workers(WorkerType). diff --git a/src/couch_workers/src/couch_workers_app.erl b/src/couch_workers/src/couch_workers_app.erl new file mode 100644 index 000000000..734dc1bfa --- /dev/null +++ b/src/couch_workers/src/couch_workers_app.erl @@ -0,0 +1,26 @@ +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_workers_app). + + +-behaviour(application). + + +-export([ + start/2, + stop/1 +]). + + +start(_Type, []) -> + couch_workers_sup:start_link(). + + +stop([]) -> + ok. diff --git a/src/couch_workers/src/couch_workers_fdb.erl b/src/couch_workers/src/couch_workers_fdb.erl new file mode 100644 index 000000000..8ebe55ef8 --- /dev/null +++ b/src/couch_workers/src/couch_workers_fdb.erl @@ -0,0 +1,155 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_workers_fdb). + +-export([ + set_worker/4, + clear_worker/3, + get_worker/3, + + set_worker_health/5, + get_worker_health/3, + get_workers_health/2, + + set_workers_vs/2, + get_workers_vs/2, + + get_workers/2 +]). + + +%% Switch these to numbers eventually +-define(COUCH_WORKERS, <<"couch_workers">>). +-define(JOBS, <<"jobs">>). +-define(PENDING, <<"pending">>). +-define(ACTIVE, <<"active">>). +-define(WORKERS, <<"workers">>). +-define(WORKERS_VS, <<"workers_vs">>). +-define(HEALTH, <<"health">>). + +-define(uint2bin(I), binary:encode_unsigned(I, little)). +-define(bin2uint(I), binary:decode_unsigned(I, little)). +-define(UNSET_VS, {versionstamp, 16#FFFFFFFFFFFFFFFF, 16#FFFF}). + +-define(PREFIX_CACHE, '$couch_worker_prefix'). + +%% (?COUCH_WORKERS, ?JOBS, JobType, JobId) = (JobState, WorkerId, Priority, CancelReq, JobInfo, JobOps) +%% (?COUCH_WORKERS, ?PENDING, JobType, Priority, JobId) = "" + +%% (?COUCH_WORKERS, ?ACTIVE, WorkerType, Worker, JobId) = JobState + +%% (?COUCH_WORKERS, WorkerType, ?WORKERS_VS) = VS +%% (?COUCH_WORKERS, WorkerType, ?WORKERS, Worker) = WOpts +%% (?COUCH_WORKERS, WorkerType, ?HEALTH, Worker) = (VS, TStamp, WorkerTimeout) + + +get_worker(Tx, WorkerType, Worker) -> + Key = erlfdb_tuple:pack({?WORKERS, Worker}, workers_prefix(Tx, WorkerType)), + case erlfdb:wait(erlfdb:get(Tx, Key)) of + <<_/binary>> = Val -> + % Use json here + binary_to_term(Val, [safe]); + not_found -> + not_found + end. + + +set_worker(Tx, WorkerType, Worker, WOpts) -> + Key = erlfdb_tuple:pack({?WORKERS, Worker}, workers_prefix(Tx, WorkerType)), + case get_worker(Tx, WorkerType, Worker) of + not_found -> + set_workers_vs(Tx, WorkerType); + #{} -> + ok + end, + erlfdb:wait(erlfdb:set(Tx, Key, jiffy:encode(WOpts))). + + +clear_worker(Tx, WorkerType, Worker) -> + Prefix = workers_prefix(Tx, WorkerType), + case get_worker(Tx, WorkerType, Worker) of + not_found -> + ok; + #{} -> + WPrefix = erlfdb_tuple:pack({?WORKERS, Worker}, Prefix), + erlfdb:clear_range_startswith(Tx, WPrefix), + HPrefix = erlfdb_tuple:pack({?HEALTH, Worker}, Prefix), + erlfdb:clear_range_startswith(Tx, HPrefix), + set_workers_vs(Tx, WorkerType), + ok + end. + + +get_worker_health(Tx, WorkerType, Worker) -> + Key = erlfdb_tuple:pack({?HEALTH, Worker}, workers_prefix(Tx, WorkerType)), + Val = erlfdb:wait(erlfdb:get(Tx, Key)), + {VS, TStamp, WorkerTimeout} = erlfdb_tuple:unpack(Val), + {VS, TStamp, WorkerTimeout}. + + +get_workers_health(Tx, WorkerType) -> + Prefix = workers_prefix(Tx, WorkerType), + {Start, End} = erlfdb_tuple:range({?HEALTH}, Prefix), + RawKVs = erlfdb:wait(erlfdb:get_range(Tx, Start, End)), + KVs = lists:map(fun({K, V}) -> + {?HEALTH, Worker} = erlfdb_tuple:unpack(K, Prefix), + WOpts = jiffy:decode(V, [return_maps]), + {Worker, WOpts} + end, RawKVs), + maps:from_list(KVs). + + +set_worker_health(Tx, WorkerType, Worker, TStamp, WorkerTimeout) when + is_integer(TStamp), is_integer(WorkerTimeout) -> + Key = erlfdb_tuple:pack({?HEALTH, Worker}, workers_prefix(Tx, WorkerType)), + Val = erlfdb_tuple:pack({?UNSET_VS, TStamp, WorkerTimeout}), + erlfdb:wait(erlfdb:set(Tx, Key, Val)). + + +get_workers_vs(Tx, WorkerType) -> + % return a watch here eventually + Key = erlfdb_tuple:pack({?WORKERS_VS}, workers_prefix(Tx, WorkerType)), + erlfdb:wait(erlfdb:get(Tx, Key)). + + +set_workers_vs(Tx, WorkerType) -> + % return a watch here eventually + Key = erlfdb_tuple:pack({?WORKERS_VS}, workers_prefix(Tx, WorkerType)), + Val = erlfdb_tuple:pack({?UNSET_VS}), + erlfdb:wait(erlfdb:set(Tx, Key, Val)). + + +get_workers(Tx, WorkerType) -> + Prefix = workers_prefix(Tx, WorkerType), + {Start, End} = erlfdb_tuple:range({?WORKERS}, Prefix), + RawKVs = erlfdb:wait(erlfdb:get_range(Tx, Start, End)), + KVs = lists:map(fun({K, V}) -> + {?WORKERS, Worker} = erlfdb_tuple:unpack(K, Prefix), + WOpts = jiffy:decode(V, [return_maps]), + {Worker, WOpts} + end, RawKVs), + maps:from_list(KVs). + + +workers_prefix(Tx, WorkerType) -> + case get({?PREFIX_CACHE, WorkerType}) of + undefined -> + Root = erlfdb_directory:root(), + CouchDB = erlfdb_directory:create_or_open(Tx, Root, [<<"couchdb">>]), + Prefix = erlfdb_directory:get_name(CouchDB), + Res = erlfdb_tuple:pack({?COUCH_WORKERS, WorkerType}, Prefix), + put({?PREFIX_CACHE, WorkerType}, Res), + Res; + Res -> + Res + end. diff --git a/src/couch_workers/src/couch_workers_global.erl b/src/couch_workers/src/couch_workers_global.erl new file mode 100644 index 000000000..eba1241f1 --- /dev/null +++ b/src/couch_workers/src/couch_workers_global.erl @@ -0,0 +1,228 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_workers_global). + +-behaviour(gen_server). + + +-export([ + start_link/0, + subscribe/3, + unsubscribe/1 +]). + +-export([ + init/1, + terminate/2, + handle_call/3, + handle_cast/2, + handle_info/2, + code_change/3 +]). + + +-define(ROLE_MONITOR_POLL_INTERVAL_MSEC, 5000). + +-define(CACHE, couch_workers_global_cache). +-define(SUBSCRIBERS, couch_workers_global_subscribers). +-define(MONITOR_PIDS, couch_workers_global_monitor_pids). + + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, nil, []). + + +subscribe(WorkerType, Module, Pid) -> + gen_server:call(?MODULE, {subscribe, WorkerType, Module, Pid}, infinity). + + +unsubscribe(Ref) -> + gen_server:call(?MODULE, {unsubscribe, Ref}, infinity). + + +get_workers(WorkerType) -> + case ets:lookup(?CACHE, WorkerType) of + [{WorkerType, VS, Workers}] -> + {ok, VS, Workers}; + [] -> + {error, not_found} + end. + + +%% gen_server callbacks + +init(_) -> + EtsOpts = [protected, named_table], + % {WorkerType, VS, Workers} + ets:new(?CACHE, EtsOpts ++ [{read_concurrency, true}]), + % {{WorkerType, Ref}, Module, VS} + ets:new(?SUBSCRIBERS, EtsOpts ++ [ordered_set]), + % {WorkerType, Pid} + ets:new(?MONITOR_PIDS, EtsOpts), + {ok, nil}. + + +terminate(_, _St) -> + ok. + + +handle_call({subscribe, WorkerType, Mod, Pid}, From, St) -> + Ref = erlang:monitor(process, Pid), + subscribe_int(WorkerType, Ref, Mod), + gen_server:reply(From, Ref), + case get_workers(WorkerType) of + {ok, VS, Workers} -> + do_callback(Mod, WorkerType, Workers, VS, Ref); + {error, not_found} -> + ok + end, + {noreply, St}; + +handle_call({unsubscribe, Ref}, _From, St) -> + unsubscribe_int(Ref), + {reply, ok, St}; + +handle_call({membership_update, WorkerType, VS, Workers}, _From, St) -> + membership_update(WorkerType, VS, Workers), + {reply, ok, St}; + +handle_call(Msg, _From, St) -> + {stop, {bad_call, Msg}, {bad_call, Msg}, St}. + + +handle_cast(Msg, St) -> + {stop, {bad_cast, Msg}, St}. + + +handle_info({'DOWN', Ref, process, _Pid, _Reason}, St) -> + unsubscribe_int(Ref), + {reply, ok, St}; + +handle_info(Msg, St) -> + {stop, {bad_info, Msg}, St}. + + +code_change(_OldVsn, St, _Extra) -> + {ok, St}. + + +%% Utility functions + +membership_update(WorkerType, VS, Workers) -> + true = ets:insert(?CACHE, {WorkerType, VS, Workers}), + lists:foreach(fun + ({_Ref, _Mod, VS}) -> ok; + ({Ref, Mod, _OldVs}) -> do_callback(Mod, WorkerType, Workers, VS, Ref) + end, find_subscribers(WorkerType)). + + +do_callback(nil, _, _, _, _) -> + % User didn't want a callback, they'll be polling the cache + ok; + +do_callback(Mod, WorkerType, Workers, VS, Ref) -> + try + Mod:couch_workers_membership_update(WorkerType, Workers, VS, Ref) + catch + Tag:Err -> + ErrMsg = "~p : failed when calling callback Mod:~p WorkerType:~p ~p:~p", + couch_log:error(ErrMsg, [?MODULE, Mod, WorkerType, Tag, Err]) + end. + + +subscribe_int(WorkerType, Ref, Module) -> + true = ets:insert(?SUBSCRIBERS, {{WorkerType, Ref}, Module, nil}), + maybe_start_role_monitor(WorkerType). + + +unsubscribe_int(Ref) -> + case find_subscriber_role(Ref) of + {ok, WorkerType} -> + true = ets:delete(?SUBSCRIBERS, {WorkerType, Ref}), + case find_subscribers(WorkerType) of + [] -> + true = ets:delete(?CACHE, WorkerType), + stop_role_monitor(WorkerType); + [_] -> + ok + end; + {error, not_found} -> + ok + end. + + +find_subscriber_role(Ref) -> + case ets:match(?SUBSCRIBERS, {{'$1', Ref}, '_'}) of + [] -> {error, not_found}; + [[WorkerType]] -> {ok, WorkerType} + end. + + +find_subscribers(WorkerType) -> + % Using ETS partial key match in ordered set + Subscribers = ets:match(?SUBSCRIBERS, {{WorkerType, '$1'}, '$2', '$3'}), + lists:map(fun([Ref, Mod, VS]) -> {Ref, Mod, VS} end, Subscribers). + + +maybe_start_role_monitor(WorkerType) -> + case ets:lookup(?MONITOR_PIDS, WorkerType) of + [{_WorkerType, _Pid}] -> + false; + [] -> + PollMSec = ?ROLE_MONITOR_POLL_INTERVAL_MSEC, + Self = self(), + Pid = spawn_link(fun() -> + role_monitor_loop(WorkerType, nil, Self, PollMSec) + end), + true = ets:insert(?MONITOR_PIDS, {WorkerType, Pid}) + end. + + +stop_role_monitor(WorkerType) -> + [{_, Pid}] = ets:lookup(?MONITOR_PIDS, WorkerType), + true = ets:delete(?MONITOR_PIDS, WorkerType), + Ref = monitor(process, Pid), + unlink(Pid), + exit(Pid, kill), + receive {'DOWN', Ref, _, _, _} -> ok end. + + + +% Replace with a watch eventually but with a polling fallback +% in case we make too many watches +role_monitor_loop(WorkerType, VS, ReportPid, PollMSec) -> + NewVS = case get_workers_vs(WorkerType) of + VS -> + VS; + OtherVS when is_binary(OtherVS) -> + {VS1, Workers} = get_workers_and_vs(WorkerType), + CallMsg = {membership_updated, WorkerType, VS1, Workers}, + ok = gen_server:call(ReportPid, CallMsg, infinity), + VS1 + end, + timer:sleep(PollMSec), + role_monitor_loop(WorkerType, NewVS, ReportPid, PollMSec). + + +get_workers_vs(WorkerType) -> + fabric2_fdb:transactional(fun(Tx) -> + couch_workers_fdb:get_workers_vs(Tx, WorkerType) + end). + + +get_workers_and_vs(WorkerType) -> + fabric2_fdb:transactional(fun(Tx) -> + VS = couch_workers_fdb:get_workers(Tx, WorkerType), + Workers = couch_workers_fdb:get_workers(Tx, WorkerType), + {VS, Workers} + end). diff --git a/src/couch_workers/src/couch_workers_local.erl b/src/couch_workers/src/couch_workers_local.erl new file mode 100644 index 000000000..117357a1d --- /dev/null +++ b/src/couch_workers/src/couch_workers_local.erl @@ -0,0 +1,155 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_workers_local). + +-behaviour(gen_server). + + +-export([ + start_link/0, + worker_register/4, + worker_unregister/1 +]). + +-export([ + init/1, + terminate/2, + handle_call/3, + handle_cast/2, + handle_info/2, + code_change/3 +]). + + +-define(DEFAULT_HEALTH_TIMEOUT_SEC, 15). + + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, nil, []). + + +worker_register(WorkerType, Id, Opts, Pid) -> + gen_server:call(?MODULE, {worker_register, WorkerType, Id, Opts, Pid}, infinity). + + +worker_unregister(Ref) -> + get_server:call(?MODULE, {worker_unregsiter, Ref}, infinity). + + +init(_) -> + % {Ref, WorkerType, Id, HealthPid} + ets:new(?MODULE, [protected, named_table]), + {ok, nil}. + + +terminate(_, _St) -> + ok. + + +handle_call({worker_register, WorkerType, Id, Opts, Pid}, _From, St) -> + worker_register_int(WorkerType, Id, Opts, Pid), + {noreply, St}; + +handle_call({worker_unregister, Ref}, _From, St) -> + worker_unregister_int(Ref), + {reply, ok, St}; + + +handle_call(Msg, _From, St) -> + {stop, {bad_call, Msg}, {bad_call, Msg}, St}. + + +handle_cast(Msg, St) -> + {stop, {bad_cast, Msg}, St}. + + +handle_info({'DOWN', Ref, process, _Pid, _Reason}, St) -> + worker_unregister(Ref), + {reply, ok, St}; + +handle_info(Msg, St) -> + {stop, {bad_info, Msg}, St}. + + +code_change(_OldVsn, St, _Extra) -> + {ok, St}. + + +%% Utility functions + +worker_register_int(WorkerType, Id, Pid, Opts) -> + case ets:match(?MODULE, {'$1', WorkerType, Id, '_'}) of + [] -> + Ref = erlang:monitor(process, Pid), + ok = set_worker(WorkerType, Id, Opts), + Timeout = maps:get(timeout, Opts, ?DEFAULT_HEALTH_TIMEOUT_SEC), + HPid = spawn_link(fun() -> + health_pinger_loop(WorkerType, Id, Timeout) + end), + true = ets:insert(?MODULE, {Ref, WorkerType, Id, HPid}), + Ref; + [[Ref]] -> + Ref + end. + + +worker_unregister_int(Ref) -> + case ets:lookup(?MODULE, Ref) of + [{_, WorkerType, Id, HealthPid}] -> + ok = clear_worker(WorkerType, Id), + kill_health_pinger(HealthPid), + true = ets:delete(?MODULE, Ref), + ok; + [] -> + couch_log:error("~p : unknown worker reference ~p", [?MODULE, Ref]), + ok + end. + + +now_sec() -> + {Mega, Sec, _Micro} = os:timestamp(), + Mega * 1000000 + Sec. + + +kill_health_pinger(Pid) when is_pid(Pid) -> + Ref = monitor(process, Pid), + unlink(Pid), + exit(Pid, kill), + receive {'DOWN', Ref, _, _, _} -> ok end. + + +health_pinger_loop(WorkerType, Id, Timeout) -> + set_worker_health(WorkerType, Id, now_sec(), Timeout), + % todo: dd jitter here + timer:sleep(max(10, Timeout * 1000 / 3)), + health_pinger_loop(WorkerType, Id, Timeout). + + +set_worker_health(WorkerType, Worker, TStamp, Timeout) -> + fabric2_fdb:transactional(fun(Tx) -> + couch_workers_fdb:set_worker_health(Tx, WorkerType, Worker, TStamp, Timeout) + end). + + +set_worker(WorkerType, Worker, Opts) -> + fabric2_fdb:transactional(fun(Tx) -> + couch_workers_fdb:set_worker(Tx, WorkerType, Worker, Opts) + end), + ok. + + +clear_worker(WorkerType, Worker) -> + fabric2_fdb:transactional(fun(Tx) -> + couch_workers_fdb:clear_worker(Tx, WorkerType, Worker) + end), + ok. diff --git a/src/couch_workers/src/couch_workers_sup.erl b/src/couch_workers/src/couch_workers_sup.erl new file mode 100644 index 000000000..15a0a3b91 --- /dev/null +++ b/src/couch_workers/src/couch_workers_sup.erl @@ -0,0 +1,49 @@ +% +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_workers_sup). + + +-behaviour(supervisor). + + +-export([ + start_link/0 +]). + +-export([ + init/1 +]). + + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + + +init([]) -> + Flags = #{ + strategy => one_for_one, + intensity => 1, + period => 5 + }, + Children = [ + #{ + id => couch_workers_server, + start => {couch_workers_global, start_link, []} + }, + #{ + id => couch_workers_server, + start => {couch_workers_local, start_link, []} + } + ], + {ok, {Flags, Children}}. |