summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick Vatamaniuc <vatamane@apache.org>2019-05-06 12:49:22 -0400
committerNick Vatamaniuc <vatamane@apache.org>2019-05-07 01:42:45 -0400
commit28cc7ca93c9ed259bf78ca4b2eb67d4734bac274 (patch)
tree470706e7a903e76d5f010727043fcc2476d892e6
parent44bc5533ae1928c5e174065c0cf9b14113a5fd63 (diff)
downloadcouchdb-prototype/rfc-couch-workers.tar.gz
Couch workers prototypeprototype/rfc-couch-workers
-rw-r--r--rebar.config.script1
-rw-r--r--rel/reltool.config2
-rw-r--r--src/couch_workers/.gitignore4
-rw-r--r--src/couch_workers/README.md4
-rw-r--r--src/couch_workers/src/couch_workers.app.src30
-rw-r--r--src/couch_workers/src/couch_workers.erl54
-rw-r--r--src/couch_workers/src/couch_workers_app.erl26
-rw-r--r--src/couch_workers/src/couch_workers_fdb.erl155
-rw-r--r--src/couch_workers/src/couch_workers_global.erl228
-rw-r--r--src/couch_workers/src/couch_workers_local.erl155
-rw-r--r--src/couch_workers/src/couch_workers_sup.erl49
11 files changed, 708 insertions, 0 deletions
diff --git a/rebar.config.script b/rebar.config.script
index 3b58bcb1e..0069597c2 100644
--- a/rebar.config.script
+++ b/rebar.config.script
@@ -76,6 +76,7 @@ SubDirs = [
"src/couch_tests",
"src/ddoc_cache",
"src/fabric",
+ "src/couch_workers",
"src/global_changes",
"src/mango",
"src/rexi",
diff --git a/rel/reltool.config b/rel/reltool.config
index 1051d2e77..ffb795a00 100644
--- a/rel/reltool.config
+++ b/rel/reltool.config
@@ -34,6 +34,7 @@
couch,
couch_epi,
couch_index,
+ couch_workers,
couch_log,
couch_mrview,
couch_plugins,
@@ -90,6 +91,7 @@
{app, config, [{incl_cond, include}]},
{app, couch, [{incl_cond, include}]},
{app, couch_epi, [{incl_cond, include}]},
+ {app, couch_workers, [{incl_cond, include}]},
{app, couch_index, [{incl_cond, include}]},
{app, couch_log, [{incl_cond, include}]},
{app, couch_mrview, [{incl_cond, include}]},
diff --git a/src/couch_workers/.gitignore b/src/couch_workers/.gitignore
new file mode 100644
index 000000000..62c8b0712
--- /dev/null
+++ b/src/couch_workers/.gitignore
@@ -0,0 +1,4 @@
+*.beam
+.eunit
+ebin/couch_workers.app
+.DS_Store \ No newline at end of file
diff --git a/src/couch_workers/README.md b/src/couch_workers/README.md
new file mode 100644
index 000000000..8cfcd3446
--- /dev/null
+++ b/src/couch_workers/README.md
@@ -0,0 +1,4 @@
+Couch Workers Application
+=========================
+
+Couch workers perform background jobs for CouchDB running on top of FDB
diff --git a/src/couch_workers/src/couch_workers.app.src b/src/couch_workers/src/couch_workers.app.src
new file mode 100644
index 000000000..7db57d64a
--- /dev/null
+++ b/src/couch_workers/src/couch_workers.app.src
@@ -0,0 +1,30 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+{application, couch_workers, [
+ {description, "CouchDB Workers"},
+ {vsn, git},
+ {mod, {couch_workers_app, []}},
+ {registered, [
+ couch_workers_sup,
+ couch_workers_global,
+ couch_workers_local
+ ]},
+ {applications, [
+ kernel,
+ stdlib,
+ erlfdb,
+ couch_log,
+ config,
+ fabric
+ ]}
+]}.
diff --git a/src/couch_workers/src/couch_workers.erl b/src/couch_workers/src/couch_workers.erl
new file mode 100644
index 000000000..36080a095
--- /dev/null
+++ b/src/couch_workers/src/couch_workers.erl
@@ -0,0 +1,54 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_workers).
+
+-export([
+ worker_register/4,
+ worker_unregister/1,
+
+ membership_subscribe/3,
+ membership_unsubscribe/1,
+
+ get_workers/1
+]).
+
+
+-callback couch_workers_membership_update(
+ WorkerType :: term(),
+ Workers :: #{},
+ VStamp :: binary(),
+ SubscriberRef :: reference()
+) -> ok.
+
+
+% External API
+
+worker_register(WorkerType, Id, Opts, Pid) when is_binary(Id), is_map(Opts),
+ is_pid(Pid) ->
+ couch_workers_local:worker_register(WorkerType, Id, Pid).
+
+
+worker_unregister(Ref) when is_reference(Ref) ->
+ couch_workers_local:worker_unregister(Ref).
+
+
+membership_subscribe(WorkerType, Module, Pid) ->
+ couch_workers_global:subscribe(WorkerType, Module, Pid).
+
+
+membership_unsubscribe(Ref) when is_reference(Ref) ->
+ couch_workers_global:unsubscribe(Ref).
+
+
+get_workers(WorkerType) ->
+ couch_workers_global:get_workers(WorkerType).
diff --git a/src/couch_workers/src/couch_workers_app.erl b/src/couch_workers/src/couch_workers_app.erl
new file mode 100644
index 000000000..734dc1bfa
--- /dev/null
+++ b/src/couch_workers/src/couch_workers_app.erl
@@ -0,0 +1,26 @@
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_workers_app).
+
+
+-behaviour(application).
+
+
+-export([
+ start/2,
+ stop/1
+]).
+
+
+start(_Type, []) ->
+ couch_workers_sup:start_link().
+
+
+stop([]) ->
+ ok.
diff --git a/src/couch_workers/src/couch_workers_fdb.erl b/src/couch_workers/src/couch_workers_fdb.erl
new file mode 100644
index 000000000..8ebe55ef8
--- /dev/null
+++ b/src/couch_workers/src/couch_workers_fdb.erl
@@ -0,0 +1,155 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_workers_fdb).
+
+-export([
+ set_worker/4,
+ clear_worker/3,
+ get_worker/3,
+
+ set_worker_health/5,
+ get_worker_health/3,
+ get_workers_health/2,
+
+ set_workers_vs/2,
+ get_workers_vs/2,
+
+ get_workers/2
+]).
+
+
+%% Switch these to numbers eventually
+-define(COUCH_WORKERS, <<"couch_workers">>).
+-define(JOBS, <<"jobs">>).
+-define(PENDING, <<"pending">>).
+-define(ACTIVE, <<"active">>).
+-define(WORKERS, <<"workers">>).
+-define(WORKERS_VS, <<"workers_vs">>).
+-define(HEALTH, <<"health">>).
+
+-define(uint2bin(I), binary:encode_unsigned(I, little)).
+-define(bin2uint(I), binary:decode_unsigned(I, little)).
+-define(UNSET_VS, {versionstamp, 16#FFFFFFFFFFFFFFFF, 16#FFFF}).
+
+-define(PREFIX_CACHE, '$couch_worker_prefix').
+
+%% (?COUCH_WORKERS, ?JOBS, JobType, JobId) = (JobState, WorkerId, Priority, CancelReq, JobInfo, JobOps)
+%% (?COUCH_WORKERS, ?PENDING, JobType, Priority, JobId) = ""
+
+%% (?COUCH_WORKERS, ?ACTIVE, WorkerType, Worker, JobId) = JobState
+
+%% (?COUCH_WORKERS, WorkerType, ?WORKERS_VS) = VS
+%% (?COUCH_WORKERS, WorkerType, ?WORKERS, Worker) = WOpts
+%% (?COUCH_WORKERS, WorkerType, ?HEALTH, Worker) = (VS, TStamp, WorkerTimeout)
+
+
+get_worker(Tx, WorkerType, Worker) ->
+ Key = erlfdb_tuple:pack({?WORKERS, Worker}, workers_prefix(Tx, WorkerType)),
+ case erlfdb:wait(erlfdb:get(Tx, Key)) of
+ <<_/binary>> = Val ->
+ % Use json here
+ binary_to_term(Val, [safe]);
+ not_found ->
+ not_found
+ end.
+
+
+set_worker(Tx, WorkerType, Worker, WOpts) ->
+ Key = erlfdb_tuple:pack({?WORKERS, Worker}, workers_prefix(Tx, WorkerType)),
+ case get_worker(Tx, WorkerType, Worker) of
+ not_found ->
+ set_workers_vs(Tx, WorkerType);
+ #{} ->
+ ok
+ end,
+ erlfdb:wait(erlfdb:set(Tx, Key, jiffy:encode(WOpts))).
+
+
+clear_worker(Tx, WorkerType, Worker) ->
+ Prefix = workers_prefix(Tx, WorkerType),
+ case get_worker(Tx, WorkerType, Worker) of
+ not_found ->
+ ok;
+ #{} ->
+ WPrefix = erlfdb_tuple:pack({?WORKERS, Worker}, Prefix),
+ erlfdb:clear_range_startswith(Tx, WPrefix),
+ HPrefix = erlfdb_tuple:pack({?HEALTH, Worker}, Prefix),
+ erlfdb:clear_range_startswith(Tx, HPrefix),
+ set_workers_vs(Tx, WorkerType),
+ ok
+ end.
+
+
+get_worker_health(Tx, WorkerType, Worker) ->
+ Key = erlfdb_tuple:pack({?HEALTH, Worker}, workers_prefix(Tx, WorkerType)),
+ Val = erlfdb:wait(erlfdb:get(Tx, Key)),
+ {VS, TStamp, WorkerTimeout} = erlfdb_tuple:unpack(Val),
+ {VS, TStamp, WorkerTimeout}.
+
+
+get_workers_health(Tx, WorkerType) ->
+ Prefix = workers_prefix(Tx, WorkerType),
+ {Start, End} = erlfdb_tuple:range({?HEALTH}, Prefix),
+ RawKVs = erlfdb:wait(erlfdb:get_range(Tx, Start, End)),
+ KVs = lists:map(fun({K, V}) ->
+ {?HEALTH, Worker} = erlfdb_tuple:unpack(K, Prefix),
+ WOpts = jiffy:decode(V, [return_maps]),
+ {Worker, WOpts}
+ end, RawKVs),
+ maps:from_list(KVs).
+
+
+set_worker_health(Tx, WorkerType, Worker, TStamp, WorkerTimeout) when
+ is_integer(TStamp), is_integer(WorkerTimeout) ->
+ Key = erlfdb_tuple:pack({?HEALTH, Worker}, workers_prefix(Tx, WorkerType)),
+ Val = erlfdb_tuple:pack({?UNSET_VS, TStamp, WorkerTimeout}),
+ erlfdb:wait(erlfdb:set(Tx, Key, Val)).
+
+
+get_workers_vs(Tx, WorkerType) ->
+ % return a watch here eventually
+ Key = erlfdb_tuple:pack({?WORKERS_VS}, workers_prefix(Tx, WorkerType)),
+ erlfdb:wait(erlfdb:get(Tx, Key)).
+
+
+set_workers_vs(Tx, WorkerType) ->
+ % return a watch here eventually
+ Key = erlfdb_tuple:pack({?WORKERS_VS}, workers_prefix(Tx, WorkerType)),
+ Val = erlfdb_tuple:pack({?UNSET_VS}),
+ erlfdb:wait(erlfdb:set(Tx, Key, Val)).
+
+
+get_workers(Tx, WorkerType) ->
+ Prefix = workers_prefix(Tx, WorkerType),
+ {Start, End} = erlfdb_tuple:range({?WORKERS}, Prefix),
+ RawKVs = erlfdb:wait(erlfdb:get_range(Tx, Start, End)),
+ KVs = lists:map(fun({K, V}) ->
+ {?WORKERS, Worker} = erlfdb_tuple:unpack(K, Prefix),
+ WOpts = jiffy:decode(V, [return_maps]),
+ {Worker, WOpts}
+ end, RawKVs),
+ maps:from_list(KVs).
+
+
+workers_prefix(Tx, WorkerType) ->
+ case get({?PREFIX_CACHE, WorkerType}) of
+ undefined ->
+ Root = erlfdb_directory:root(),
+ CouchDB = erlfdb_directory:create_or_open(Tx, Root, [<<"couchdb">>]),
+ Prefix = erlfdb_directory:get_name(CouchDB),
+ Res = erlfdb_tuple:pack({?COUCH_WORKERS, WorkerType}, Prefix),
+ put({?PREFIX_CACHE, WorkerType}, Res),
+ Res;
+ Res ->
+ Res
+ end.
diff --git a/src/couch_workers/src/couch_workers_global.erl b/src/couch_workers/src/couch_workers_global.erl
new file mode 100644
index 000000000..eba1241f1
--- /dev/null
+++ b/src/couch_workers/src/couch_workers_global.erl
@@ -0,0 +1,228 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_workers_global).
+
+-behaviour(gen_server).
+
+
+-export([
+ start_link/0,
+ subscribe/3,
+ unsubscribe/1
+]).
+
+-export([
+ init/1,
+ terminate/2,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ code_change/3
+]).
+
+
+-define(ROLE_MONITOR_POLL_INTERVAL_MSEC, 5000).
+
+-define(CACHE, couch_workers_global_cache).
+-define(SUBSCRIBERS, couch_workers_global_subscribers).
+-define(MONITOR_PIDS, couch_workers_global_monitor_pids).
+
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, nil, []).
+
+
+subscribe(WorkerType, Module, Pid) ->
+ gen_server:call(?MODULE, {subscribe, WorkerType, Module, Pid}, infinity).
+
+
+unsubscribe(Ref) ->
+ gen_server:call(?MODULE, {unsubscribe, Ref}, infinity).
+
+
+get_workers(WorkerType) ->
+ case ets:lookup(?CACHE, WorkerType) of
+ [{WorkerType, VS, Workers}] ->
+ {ok, VS, Workers};
+ [] ->
+ {error, not_found}
+ end.
+
+
+%% gen_server callbacks
+
+init(_) ->
+ EtsOpts = [protected, named_table],
+ % {WorkerType, VS, Workers}
+ ets:new(?CACHE, EtsOpts ++ [{read_concurrency, true}]),
+ % {{WorkerType, Ref}, Module, VS}
+ ets:new(?SUBSCRIBERS, EtsOpts ++ [ordered_set]),
+ % {WorkerType, Pid}
+ ets:new(?MONITOR_PIDS, EtsOpts),
+ {ok, nil}.
+
+
+terminate(_, _St) ->
+ ok.
+
+
+handle_call({subscribe, WorkerType, Mod, Pid}, From, St) ->
+ Ref = erlang:monitor(process, Pid),
+ subscribe_int(WorkerType, Ref, Mod),
+ gen_server:reply(From, Ref),
+ case get_workers(WorkerType) of
+ {ok, VS, Workers} ->
+ do_callback(Mod, WorkerType, Workers, VS, Ref);
+ {error, not_found} ->
+ ok
+ end,
+ {noreply, St};
+
+handle_call({unsubscribe, Ref}, _From, St) ->
+ unsubscribe_int(Ref),
+ {reply, ok, St};
+
+handle_call({membership_update, WorkerType, VS, Workers}, _From, St) ->
+ membership_update(WorkerType, VS, Workers),
+ {reply, ok, St};
+
+handle_call(Msg, _From, St) ->
+ {stop, {bad_call, Msg}, {bad_call, Msg}, St}.
+
+
+handle_cast(Msg, St) ->
+ {stop, {bad_cast, Msg}, St}.
+
+
+handle_info({'DOWN', Ref, process, _Pid, _Reason}, St) ->
+ unsubscribe_int(Ref),
+ {reply, ok, St};
+
+handle_info(Msg, St) ->
+ {stop, {bad_info, Msg}, St}.
+
+
+code_change(_OldVsn, St, _Extra) ->
+ {ok, St}.
+
+
+%% Utility functions
+
+membership_update(WorkerType, VS, Workers) ->
+ true = ets:insert(?CACHE, {WorkerType, VS, Workers}),
+ lists:foreach(fun
+ ({_Ref, _Mod, VS}) -> ok;
+ ({Ref, Mod, _OldVs}) -> do_callback(Mod, WorkerType, Workers, VS, Ref)
+ end, find_subscribers(WorkerType)).
+
+
+do_callback(nil, _, _, _, _) ->
+ % User didn't want a callback, they'll be polling the cache
+ ok;
+
+do_callback(Mod, WorkerType, Workers, VS, Ref) ->
+ try
+ Mod:couch_workers_membership_update(WorkerType, Workers, VS, Ref)
+ catch
+ Tag:Err ->
+ ErrMsg = "~p : failed when calling callback Mod:~p WorkerType:~p ~p:~p",
+ couch_log:error(ErrMsg, [?MODULE, Mod, WorkerType, Tag, Err])
+ end.
+
+
+subscribe_int(WorkerType, Ref, Module) ->
+ true = ets:insert(?SUBSCRIBERS, {{WorkerType, Ref}, Module, nil}),
+ maybe_start_role_monitor(WorkerType).
+
+
+unsubscribe_int(Ref) ->
+ case find_subscriber_role(Ref) of
+ {ok, WorkerType} ->
+ true = ets:delete(?SUBSCRIBERS, {WorkerType, Ref}),
+ case find_subscribers(WorkerType) of
+ [] ->
+ true = ets:delete(?CACHE, WorkerType),
+ stop_role_monitor(WorkerType);
+ [_] ->
+ ok
+ end;
+ {error, not_found} ->
+ ok
+ end.
+
+
+find_subscriber_role(Ref) ->
+ case ets:match(?SUBSCRIBERS, {{'$1', Ref}, '_'}) of
+ [] -> {error, not_found};
+ [[WorkerType]] -> {ok, WorkerType}
+ end.
+
+
+find_subscribers(WorkerType) ->
+ % Using ETS partial key match in ordered set
+ Subscribers = ets:match(?SUBSCRIBERS, {{WorkerType, '$1'}, '$2', '$3'}),
+ lists:map(fun([Ref, Mod, VS]) -> {Ref, Mod, VS} end, Subscribers).
+
+
+maybe_start_role_monitor(WorkerType) ->
+ case ets:lookup(?MONITOR_PIDS, WorkerType) of
+ [{_WorkerType, _Pid}] ->
+ false;
+ [] ->
+ PollMSec = ?ROLE_MONITOR_POLL_INTERVAL_MSEC,
+ Self = self(),
+ Pid = spawn_link(fun() ->
+ role_monitor_loop(WorkerType, nil, Self, PollMSec)
+ end),
+ true = ets:insert(?MONITOR_PIDS, {WorkerType, Pid})
+ end.
+
+
+stop_role_monitor(WorkerType) ->
+ [{_, Pid}] = ets:lookup(?MONITOR_PIDS, WorkerType),
+ true = ets:delete(?MONITOR_PIDS, WorkerType),
+ Ref = monitor(process, Pid),
+ unlink(Pid),
+ exit(Pid, kill),
+ receive {'DOWN', Ref, _, _, _} -> ok end.
+
+
+
+% Replace with a watch eventually but with a polling fallback
+% in case we make too many watches
+role_monitor_loop(WorkerType, VS, ReportPid, PollMSec) ->
+ NewVS = case get_workers_vs(WorkerType) of
+ VS ->
+ VS;
+ OtherVS when is_binary(OtherVS) ->
+ {VS1, Workers} = get_workers_and_vs(WorkerType),
+ CallMsg = {membership_updated, WorkerType, VS1, Workers},
+ ok = gen_server:call(ReportPid, CallMsg, infinity),
+ VS1
+ end,
+ timer:sleep(PollMSec),
+ role_monitor_loop(WorkerType, NewVS, ReportPid, PollMSec).
+
+
+get_workers_vs(WorkerType) ->
+ fabric2_fdb:transactional(fun(Tx) ->
+ couch_workers_fdb:get_workers_vs(Tx, WorkerType)
+ end).
+
+
+get_workers_and_vs(WorkerType) ->
+ fabric2_fdb:transactional(fun(Tx) ->
+ VS = couch_workers_fdb:get_workers(Tx, WorkerType),
+ Workers = couch_workers_fdb:get_workers(Tx, WorkerType),
+ {VS, Workers}
+ end).
diff --git a/src/couch_workers/src/couch_workers_local.erl b/src/couch_workers/src/couch_workers_local.erl
new file mode 100644
index 000000000..117357a1d
--- /dev/null
+++ b/src/couch_workers/src/couch_workers_local.erl
@@ -0,0 +1,155 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_workers_local).
+
+-behaviour(gen_server).
+
+
+-export([
+ start_link/0,
+ worker_register/4,
+ worker_unregister/1
+]).
+
+-export([
+ init/1,
+ terminate/2,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ code_change/3
+]).
+
+
+-define(DEFAULT_HEALTH_TIMEOUT_SEC, 15).
+
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, nil, []).
+
+
+worker_register(WorkerType, Id, Opts, Pid) ->
+ gen_server:call(?MODULE, {worker_register, WorkerType, Id, Opts, Pid}, infinity).
+
+
+worker_unregister(Ref) ->
+ get_server:call(?MODULE, {worker_unregsiter, Ref}, infinity).
+
+
+init(_) ->
+ % {Ref, WorkerType, Id, HealthPid}
+ ets:new(?MODULE, [protected, named_table]),
+ {ok, nil}.
+
+
+terminate(_, _St) ->
+ ok.
+
+
+handle_call({worker_register, WorkerType, Id, Opts, Pid}, _From, St) ->
+ worker_register_int(WorkerType, Id, Opts, Pid),
+ {noreply, St};
+
+handle_call({worker_unregister, Ref}, _From, St) ->
+ worker_unregister_int(Ref),
+ {reply, ok, St};
+
+
+handle_call(Msg, _From, St) ->
+ {stop, {bad_call, Msg}, {bad_call, Msg}, St}.
+
+
+handle_cast(Msg, St) ->
+ {stop, {bad_cast, Msg}, St}.
+
+
+handle_info({'DOWN', Ref, process, _Pid, _Reason}, St) ->
+ worker_unregister(Ref),
+ {reply, ok, St};
+
+handle_info(Msg, St) ->
+ {stop, {bad_info, Msg}, St}.
+
+
+code_change(_OldVsn, St, _Extra) ->
+ {ok, St}.
+
+
+%% Utility functions
+
+worker_register_int(WorkerType, Id, Pid, Opts) ->
+ case ets:match(?MODULE, {'$1', WorkerType, Id, '_'}) of
+ [] ->
+ Ref = erlang:monitor(process, Pid),
+ ok = set_worker(WorkerType, Id, Opts),
+ Timeout = maps:get(timeout, Opts, ?DEFAULT_HEALTH_TIMEOUT_SEC),
+ HPid = spawn_link(fun() ->
+ health_pinger_loop(WorkerType, Id, Timeout)
+ end),
+ true = ets:insert(?MODULE, {Ref, WorkerType, Id, HPid}),
+ Ref;
+ [[Ref]] ->
+ Ref
+ end.
+
+
+worker_unregister_int(Ref) ->
+ case ets:lookup(?MODULE, Ref) of
+ [{_, WorkerType, Id, HealthPid}] ->
+ ok = clear_worker(WorkerType, Id),
+ kill_health_pinger(HealthPid),
+ true = ets:delete(?MODULE, Ref),
+ ok;
+ [] ->
+ couch_log:error("~p : unknown worker reference ~p", [?MODULE, Ref]),
+ ok
+ end.
+
+
+now_sec() ->
+ {Mega, Sec, _Micro} = os:timestamp(),
+ Mega * 1000000 + Sec.
+
+
+kill_health_pinger(Pid) when is_pid(Pid) ->
+ Ref = monitor(process, Pid),
+ unlink(Pid),
+ exit(Pid, kill),
+ receive {'DOWN', Ref, _, _, _} -> ok end.
+
+
+health_pinger_loop(WorkerType, Id, Timeout) ->
+ set_worker_health(WorkerType, Id, now_sec(), Timeout),
+ % todo: dd jitter here
+ timer:sleep(max(10, Timeout * 1000 / 3)),
+ health_pinger_loop(WorkerType, Id, Timeout).
+
+
+set_worker_health(WorkerType, Worker, TStamp, Timeout) ->
+ fabric2_fdb:transactional(fun(Tx) ->
+ couch_workers_fdb:set_worker_health(Tx, WorkerType, Worker, TStamp, Timeout)
+ end).
+
+
+set_worker(WorkerType, Worker, Opts) ->
+ fabric2_fdb:transactional(fun(Tx) ->
+ couch_workers_fdb:set_worker(Tx, WorkerType, Worker, Opts)
+ end),
+ ok.
+
+
+clear_worker(WorkerType, Worker) ->
+ fabric2_fdb:transactional(fun(Tx) ->
+ couch_workers_fdb:clear_worker(Tx, WorkerType, Worker)
+ end),
+ ok.
diff --git a/src/couch_workers/src/couch_workers_sup.erl b/src/couch_workers/src/couch_workers_sup.erl
new file mode 100644
index 000000000..15a0a3b91
--- /dev/null
+++ b/src/couch_workers/src/couch_workers_sup.erl
@@ -0,0 +1,49 @@
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_workers_sup).
+
+
+-behaviour(supervisor).
+
+
+-export([
+ start_link/0
+]).
+
+-export([
+ init/1
+]).
+
+
+start_link() ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+
+init([]) ->
+ Flags = #{
+ strategy => one_for_one,
+ intensity => 1,
+ period => 5
+ },
+ Children = [
+ #{
+ id => couch_workers_server,
+ start => {couch_workers_global, start_link, []}
+ },
+ #{
+ id => couch_workers_server,
+ start => {couch_workers_local, start_link, []}
+ }
+ ],
+ {ok, {Flags, Children}}.