diff options
author | Nick Vatamaniuc <vatamane@apache.org> | 2018-12-20 12:19:01 -0500 |
---|---|---|
committer | Nick Vatamaniuc <nickva@users.noreply.github.com> | 2018-12-20 15:41:55 -0500 |
commit | 632f303a47bd89a97c831fd0532cb7541b80355d (patch) | |
tree | d343cd9e49f02656a37729ae540ee4edb0ba5194 | |
parent | 88dd1255595b513ff778e5efa4b2399aa3ccb570 (diff) | |
download | couchdb-632f303a47bd89a97c831fd0532cb7541b80355d.tar.gz |
Clean rexi stream workers when coordinator process is killed
Sometimes fabric coordinators end up getting brutally terminated [1], and in that
case they might never process their `after` clause where their remote rexi
workers are killed. Those workers are left lingering around keeping databases
active for up to 5 minutes at a time.
To prevent that from happening, let coordinators which use streams spawn an
auxiliary cleaner process. This process will monitor the main coordinator and
if it dies will ensure remote workers are killed, freeing resources
immediately. In order not to send 2x the number of kill messages during the
normal exit, fabric_streams:cleanup() will stop the auxiliary process before
continuing.
[1] One instance is when the ddoc cache is refreshed:
https://github.com/apache/couchdb/blob/master/src/ddoc_cache/src/ddoc_cache_entry.erl#L236
-rw-r--r-- | src/fabric/src/fabric_streams.erl | 132 |
1 files changed, 132 insertions, 0 deletions
diff --git a/src/fabric/src/fabric_streams.erl b/src/fabric/src/fabric_streams.erl index 32217c3cf..ae0c2be55 100644 --- a/src/fabric/src/fabric_streams.erl +++ b/src/fabric/src/fabric_streams.erl @@ -22,6 +22,9 @@ -include_lib("mem3/include/mem3.hrl"). +-define(WORKER_CLEANER, fabric_worker_cleaner). + + start(Workers, Keypos) -> start(Workers, Keypos, undefined, undefined). @@ -32,6 +35,7 @@ start(Workers0, Keypos, StartFun, Replacements) -> start_fun = StartFun, replacements = Replacements }, + spawn_worker_cleaner(self(), Workers0), Timeout = fabric_util:request_timeout(), case rexi_utils:recv(Workers0, Keypos, Fun, Acc, Timeout, infinity) of {ok, #stream_acc{workers=Workers}} -> @@ -47,6 +51,16 @@ start(Workers0, Keypos, StartFun, Replacements) -> cleanup(Workers) -> + % Stop the auxiliary cleaner process as we got to the point where cleanup + % happens in the regular fashion so we don't want to send 2x the number of kill + % messages + case get(?WORKER_CLEANER) of + CleanerPid when is_pid(CleanerPid) -> + erase(?WORKER_CLEANER), + exit(CleanerPid, kill); + _ -> + ok + end, fabric_util:cleanup(Workers). @@ -72,6 +86,7 @@ handle_stream_start({rexi_EXIT, Reason}, Worker, St) -> {value, {_Range, WorkerReplacements}, NewReplacements} -> FinalWorkers = lists:foldl(fun(Repl, NewWorkers) -> NewWorker = (St#stream_acc.start_fun)(Repl), + add_worker_to_cleaner(self(), NewWorker), fabric_dict:store(NewWorker, waiting, NewWorkers) end, Workers, WorkerReplacements), % Assert that our replaced worker provides us @@ -117,3 +132,120 @@ handle_stream_start({ok, ddoc_updated}, _, St) -> handle_stream_start(Else, _, _) -> exit({invalid_stream_start, Else}). + + +% Spawn an auxiliary rexi worker cleaner. This will be used in cases +% when the coordinator (request) process is forcibly killed and doesn't +% get a chance to process its `after` fabric:clean/1 clause. 
+spawn_worker_cleaner(Coordinator, Workers) -> + case get(?WORKER_CLEANER) of + undefined -> + Pid = spawn(fun() -> + erlang:monitor(process, Coordinator), + cleaner_loop(Coordinator, Workers) + end), + put(?WORKER_CLEANER, Pid), + Pid; + ExistingCleaner -> + ExistingCleaner + end. + + +cleaner_loop(Pid, Workers) -> + receive + {add_worker, Pid, Worker} -> + cleaner_loop(Pid, [Worker | Workers]); + {'DOWN', _, _, Pid, _} -> + fabric_util:cleanup(Workers) + end. + + +add_worker_to_cleaner(CoordinatorPid, Worker) -> + case get(?WORKER_CLEANER) of + CleanerPid when is_pid(CleanerPid) -> + CleanerPid ! {add_worker, CoordinatorPid, Worker}; + _ -> + ok + end. + + + +-ifdef(TEST). + +-include_lib("eunit/include/eunit.hrl"). + +worker_cleaner_test_() -> + { + "Fabric spawn_worker_cleaner test", { + setup, fun setup/0, fun teardown/1, + fun(_) -> [ + should_clean_workers(), + does_not_fire_if_cleanup_called(), + should_clean_additional_worker_too() + ] end + } + }. + + +should_clean_workers() -> + ?_test(begin + meck:reset(rexi), + erase(?WORKER_CLEANER), + Workers = [ + #shard{node = 'n1', ref = make_ref()}, + #shard{node = 'n2', ref = make_ref()} + ], + {Coord, _} = spawn_monitor(fun() -> receive die -> ok end end), + Cleaner = spawn_worker_cleaner(Coord, Workers), + Ref = erlang:monitor(process, Cleaner), + Coord ! die, + receive {'DOWN', Ref, _, Cleaner, _} -> ok end, + ?assertEqual(2, meck:num_calls(rexi, kill, 2)) + end). + + +does_not_fire_if_cleanup_called() -> + ?_test(begin + meck:reset(rexi), + erase(?WORKER_CLEANER), + Workers = [ + #shard{node = 'n1', ref = make_ref()}, + #shard{node = 'n2', ref = make_ref()} + ], + {Coord, _} = spawn_monitor(fun() -> receive die -> ok end end), + Cleaner = spawn_worker_cleaner(Coord, Workers), + Ref = erlang:monitor(process, Cleaner), + cleanup(Workers), + Coord ! die, + receive {'DOWN', Ref, _, _, _} -> ok end, + % 2 calls would be from cleanup/1 function. If cleanup process fired + % too it would have been 4 calls total. 
+ ?assertEqual(2, meck:num_calls(rexi, kill, 2)) + end). + + +should_clean_additional_worker_too() -> + ?_test(begin + meck:reset(rexi), + erase(?WORKER_CLEANER), + Workers = [ + #shard{node = 'n1', ref = make_ref()} + ], + {Coord, _} = spawn_monitor(fun() -> receive die -> ok end end), + Cleaner = spawn_worker_cleaner(Coord, Workers), + add_worker_to_cleaner(Coord, #shard{node = 'n2', ref = make_ref()}), + Ref = erlang:monitor(process, Cleaner), + Coord ! die, + receive {'DOWN', Ref, _, Cleaner, _} -> ok end, + ?assertEqual(2, meck:num_calls(rexi, kill, 2)) + end). + + +setup() -> + ok = meck:expect(rexi, kill, fun(_, _) -> ok end). + + +teardown(_) -> + meck:unload(). + +-endif. |