summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick Vatamaniuc <vatamane@apache.org>2018-12-20 12:19:01 -0500
committerNick Vatamaniuc <nickva@users.noreply.github.com>2018-12-20 15:41:55 -0500
commit632f303a47bd89a97c831fd0532cb7541b80355d (patch)
treed343cd9e49f02656a37729ae540ee4edb0ba5194
parent88dd1255595b513ff778e5efa4b2399aa3ccb570 (diff)
downloadcouchdb-632f303a47bd89a97c831fd0532cb7541b80355d.tar.gz
Clean rexi stream workers when coordinator process is killed
Sometimes fabric coordinators end up getting brutally terminated [1], and in that case they might never process their `after` clause where their remote rexi workers are killed. Those workers are left lingering around keeping databases active for up to 5 minutes at a time. To prevent that from happening, let coordinators which use streams spawn an auxiliary cleaner process. This process will monitor the main coordinator and if it dies will ensure remote workers are killed, freeing resources immediately. In order not to send 2x the number of kill messages during the normal exit, fabric_util:cleanup() will stop the auxiliary process before continuing. [1] One instance is when the ddoc cache is refreshed: https://github.com/apache/couchdb/blob/master/src/ddoc_cache/src/ddoc_cache_entry.erl#L236
-rw-r--r--src/fabric/src/fabric_streams.erl132
1 files changed, 132 insertions, 0 deletions
diff --git a/src/fabric/src/fabric_streams.erl b/src/fabric/src/fabric_streams.erl
index 32217c3cf..ae0c2be55 100644
--- a/src/fabric/src/fabric_streams.erl
+++ b/src/fabric/src/fabric_streams.erl
@@ -22,6 +22,9 @@
-include_lib("mem3/include/mem3.hrl").
+-define(WORKER_CLEANER, fabric_worker_cleaner).
+
+
start(Workers, Keypos) ->
start(Workers, Keypos, undefined, undefined).
@@ -32,6 +35,7 @@ start(Workers0, Keypos, StartFun, Replacements) ->
start_fun = StartFun,
replacements = Replacements
},
+ spawn_worker_cleaner(self(), Workers0),
Timeout = fabric_util:request_timeout(),
case rexi_utils:recv(Workers0, Keypos, Fun, Acc, Timeout, infinity) of
{ok, #stream_acc{workers=Workers}} ->
@@ -47,6 +51,16 @@ start(Workers0, Keypos, StartFun, Replacements) ->
cleanup(Workers) ->
+ % Stop the auxiliary cleaner process as we got to the point where cleanup
+ % happesn in the regular fashion so we don't want to send 2x the number kill
+ % messages
+ case get(?WORKER_CLEANER) of
+ CleanerPid when is_pid(CleanerPid) ->
+ erase(?WORKER_CLEANER),
+ exit(CleanerPid, kill);
+ _ ->
+ ok
+ end,
fabric_util:cleanup(Workers).
@@ -72,6 +86,7 @@ handle_stream_start({rexi_EXIT, Reason}, Worker, St) ->
{value, {_Range, WorkerReplacements}, NewReplacements} ->
FinalWorkers = lists:foldl(fun(Repl, NewWorkers) ->
NewWorker = (St#stream_acc.start_fun)(Repl),
+ add_worker_to_cleaner(self(), NewWorker),
fabric_dict:store(NewWorker, waiting, NewWorkers)
end, Workers, WorkerReplacements),
% Assert that our replaced worker provides us
@@ -117,3 +132,120 @@ handle_stream_start({ok, ddoc_updated}, _, St) ->
handle_stream_start(Else, _, _) ->
exit({invalid_stream_start, Else}).
+
+
+% Spawn an auxiliary rexi worker cleaner. This will be used in cases
+% when the coordinator (request) process is forceably killed and doesn't
+% get a chance to process its `after` fabric:clean/1 clause.
+spawn_worker_cleaner(Coordinator, Workers) ->
+ case get(?WORKER_CLEANER) of
+ undefined ->
+ Pid = spawn(fun() ->
+ erlang:monitor(process, Coordinator),
+ cleaner_loop(Coordinator, Workers)
+ end),
+ put(?WORKER_CLEANER, Pid),
+ Pid;
+ ExistingCleaner ->
+ ExistingCleaner
+ end.
+
+
+cleaner_loop(Pid, Workers) ->
+ receive
+ {add_worker, Pid, Worker} ->
+ cleaner_loop(Pid, [Worker | Workers]);
+ {'DOWN', _, _, Pid, _} ->
+ fabric_util:cleanup(Workers)
+ end.
+
+
+add_worker_to_cleaner(CoordinatorPid, Worker) ->
+ case get(?WORKER_CLEANER) of
+ CleanerPid when is_pid(CleanerPid) ->
+ CleanerPid ! {add_worker, CoordinatorPid, Worker};
+ _ ->
+ ok
+ end.
+
+
+
+-ifdef(TEST).
+
+-include_lib("eunit/include/eunit.hrl").
+
+worker_cleaner_test_() ->
+ {
+ "Fabric spawn_worker_cleaner test", {
+ setup, fun setup/0, fun teardown/1,
+ fun(_) -> [
+ should_clean_workers(),
+ does_not_fire_if_cleanup_called(),
+ should_clean_additional_worker_too()
+ ] end
+ }
+ }.
+
+
+should_clean_workers() ->
+ ?_test(begin
+ meck:reset(rexi),
+ erase(?WORKER_CLEANER),
+ Workers = [
+ #shard{node = 'n1', ref = make_ref()},
+ #shard{node = 'n2', ref = make_ref()}
+ ],
+ {Coord, _} = spawn_monitor(fun() -> receive die -> ok end end),
+ Cleaner = spawn_worker_cleaner(Coord, Workers),
+ Ref = erlang:monitor(process, Cleaner),
+ Coord ! die,
+ receive {'DOWN', Ref, _, Cleaner, _} -> ok end,
+ ?assertEqual(2, meck:num_calls(rexi, kill, 2))
+ end).
+
+
+does_not_fire_if_cleanup_called() ->
+ ?_test(begin
+ meck:reset(rexi),
+ erase(?WORKER_CLEANER),
+ Workers = [
+ #shard{node = 'n1', ref = make_ref()},
+ #shard{node = 'n2', ref = make_ref()}
+ ],
+ {Coord, _} = spawn_monitor(fun() -> receive die -> ok end end),
+ Cleaner = spawn_worker_cleaner(Coord, Workers),
+ Ref = erlang:monitor(process, Cleaner),
+ cleanup(Workers),
+ Coord ! die,
+ receive {'DOWN', Ref, _, _, _} -> ok end,
+ % 2 calls would be from cleanup/1 function. If cleanup process fired
+ % too it would have been 4 calls total.
+ ?assertEqual(2, meck:num_calls(rexi, kill, 2))
+ end).
+
+
+should_clean_additional_worker_too() ->
+ ?_test(begin
+ meck:reset(rexi),
+ erase(?WORKER_CLEANER),
+ Workers = [
+ #shard{node = 'n1', ref = make_ref()}
+ ],
+ {Coord, _} = spawn_monitor(fun() -> receive die -> ok end end),
+ Cleaner = spawn_worker_cleaner(Coord, Workers),
+ add_worker_to_cleaner(Coord, #shard{node = 'n2', ref = make_ref()}),
+ Ref = erlang:monitor(process, Cleaner),
+ Coord ! die,
+ receive {'DOWN', Ref, _, Cleaner, _} -> ok end,
+ ?assertEqual(2, meck:num_calls(rexi, kill, 2))
+ end).
+
+
+setup() ->
+ ok = meck:expect(rexi, kill, fun(_, _) -> ok end).
+
+
+teardown(_) ->
+ meck:unload().
+
+-endif.