diff options
authorSimon MacMullen <>2013-03-19 17:15:07 +0000
committerSimon MacMullen <>2013-03-19 17:15:07 +0000
commit7045b90b2e9a2be678b6781db6815f20af120e87 (patch)
parentd8baacd25557f3bee697dd867bd3c32e1d1ba874 (diff)
First pass at autohealing.
1 files changed, 121 insertions, 11 deletions
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index de53b7f0..aac2bf84 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -33,7 +33,7 @@
-define(SERVER, ?MODULE).
-define(RABBIT_UP_RPC_TIMEOUT, 2000).
--record(state, {monitors, partitions, subscribers}).
+-record(state, {monitors, partitions, subscribers, autoheal}).
@@ -196,7 +196,8 @@ init([]) ->
{ok, _} = mnesia:subscribe(system),
{ok, #state{monitors = pmon:new(),
subscribers = pmon:new(),
- partitions = []}}.
+ partitions = [],
+ autoheal = not_healing}}.
handle_call(partitions, _From, State = #state{partitions = Partitions}) ->
{reply, {node(), Partitions}, State};
@@ -268,9 +269,60 @@ handle_info({mnesia_system_event,
monitors = pmon:monitor({rabbit, Node}, Monitors)}
ok = handle_live_rabbit(Node),
+ State2 = case application:get_env(rabbit, cluster_partition_handling) of
+ {ok, autoheal} -> case ratio() of
+ 1.0 -> autoheal(State1);
+ _ -> State1
+ end;
+ _ -> State1
+ end,
Partitions1 = ordsets:to_list(
ordsets:add_element(Node, ordsets:from_list(Partitions))),
- {noreply, State1#state{partitions = Partitions1}};
+ {noreply, State2#state{partitions = Partitions1}};
+%% Leader side of the winner election. While the autoheal state is
+%% {wait_for_winner_reqs, Pending, Notify}, each autoheal_request_winner
+%% message strikes the node it names off Pending. This clause matches
+%% when that node is the only one still pending: the winner is then
+%% announced to the ?MODULE process on every node in Notify and this
+%% node moves on to wait_for_winner.
+handle_info({autoheal_request_winner, Node},
+ State = #state{autoheal = {wait_for_winner_reqs,[Node], Notify}}) ->
+ %% TODO actually do something sensible to figure out who the winner is
+ %% For now the winner is this process; note it is a pid, not a node name.
+ Winner = self(),
+ [{?MODULE, N} ! {autoheal_winner, Winner} || N <- Notify],
+ {noreply, State#state{autoheal = wait_for_winner}};
+%% Any other pending list: just remove Node from it (a no-op if Node is
+%% not a member) and keep waiting for further requests.
+handle_info({autoheal_request_winner, Node},
+ State = #state{autoheal = {wait_for_winner_reqs, Nodes, Notify}}) ->
+ {noreply, State#state{autoheal = {wait_for_winner_reqs,
+ Nodes -- [Node], Notify}}};
+%% A node in wait_for_winner learns who the winner is. Winner is the
+%% pid of the winner's process; its node is compared against our
+%% partitions list:
+%%  * winner's node not in Partitions and it is this node: become
+%%    {wait_for, Partitions, Partitions}, i.e. wait for every node in
+%%    Partitions to report it has stopped, then notify those same nodes;
+%%  * winner's node not in Partitions and it is another node: nothing
+%%    to do here, return to not_healing;
+%%  * winner's node is in Partitions: this node must restart, see
+%%    autoheal_restart/1. The autoheal state is left untouched.
+handle_info({autoheal_winner, Winner},
+ State = #state{autoheal = wait_for_winner,
+ partitions = Partitions}) ->
+ Node = node(Winner),
+ case lists:member(Node, Partitions) of
+ false -> case node() of
+ Node -> {noreply,
+ State#state{autoheal = {wait_for, Partitions,
+ Partitions}}};
+ _ -> {noreply, State#state{autoheal = not_healing}}
+ end;
+ true -> autoheal_restart(Winner),
+ {noreply, State}
+ end;
+%% Any autoheal_winner received while not in wait_for_winner is dropped.
+handle_info({autoheal_winner, _Winner}, State) ->
+ %% ignore, we already cancelled the autoheal process
+ {noreply, State};
+%% Winner side: a node reports that it has stopped. This clause matches
+%% when Node is the only node still being waited for: every node in
+%% Notify is told autoheal_safe_to_start (sent to the registered name
+%% rabbit_outside_app_process on that node) and the autoheal state
+%% returns to not_healing.
+handle_info({autoheal_node_stopped, Node},
+ State = #state{autoheal = {wait_for, [Node], Notify}}) ->
+ [{rabbit_outside_app_process, N} ! autoheal_safe_to_start || N <- Notify],
+ {noreply, State#state{autoheal = not_healing}};
+%% Other nodes are still outstanding: remove Node from the wait list
+%% (a no-op if it is not a member) and keep waiting.
+handle_info({autoheal_node_stopped, Node},
+ State = #state{autoheal = {wait_for, WaitFor, Notify}}) ->
+ {noreply, State#state{autoheal = {wait_for, WaitFor -- [Node], Notify}}};
+%% Any autoheal_node_stopped received outside a wait_for state is dropped.
+handle_info({autoheal_node_stopped, _Node}, State) ->
+ %% ignore, we already cancelled the autoheal process
+ {noreply, State};
handle_info(_Info, State) ->
{noreply, State}.
@@ -301,6 +353,8 @@ handle_dead_rabbit(Node) ->
{ok, ignore} ->
+ {ok, autoheal} ->
+ ok;
{ok, Term} ->
rabbit_log:warning("cluster_partition_handling ~p unrecognised, "
"assuming 'ignore'~n", [Term]),
@@ -308,8 +362,8 @@ handle_dead_rabbit(Node) ->
-majority() ->
- length(alive_nodes()) / length(rabbit_mnesia:cluster_nodes(all)) > 0.5.
+%% True when ratio() is strictly greater than one half.
+majority() -> ratio() > 0.5.
+%% Number of nodes returned by alive_nodes() divided by the number of
+%% nodes returned by rabbit_mnesia:cluster_nodes(all). The `/` operator
+%% always yields a float, so equal counts give exactly 1.0.
+ratio() -> length(alive_nodes()) / length(rabbit_mnesia:cluster_nodes(all)).
%% mnesia:system_info(db_nodes) (and hence
%% rabbit_mnesia:cluster_nodes(running)) does not give reliable results
@@ -322,15 +376,20 @@ await_cluster_recovery() ->
rabbit_log:warning("Cluster minority status detected - awaiting recovery~n",
Nodes = rabbit_mnesia:cluster_nodes(all),
+ run_outside_applications(fun () ->
+ rabbit:stop(),
+ wait_for_cluster_recovery(Nodes)
+ end).
+run_outside_applications(Fun) ->
spawn(fun () ->
%% If our group leader is inside an application we are about
%% to stop, application:stop/1 does not return.
group_leader(whereis(init), self()),
- %% Ensure only one restarting process at a time, will
+ %% Ensure only one such process at a time, will
%% exit(badarg) (harmlessly) if one is already running
- register(rabbit_restarting_process, self()),
- rabbit:stop(),
- wait_for_cluster_recovery(Nodes)
+ register(rabbit_outside_app_process, self()),
+ Fun()
wait_for_cluster_recovery(Nodes) ->
@@ -340,7 +399,54 @@ wait_for_cluster_recovery(Nodes) ->
-handle_dead_rabbit_state(State = #state{partitions = Partitions}) ->
+%% In order to autoheal we want to:
+%% * Find the winning partition
+%% * Stop all nodes in other partitions
+%% * Wait for them all to be stopped
+%% * Start them again
+%% To keep things simple, we assume all nodes are up. We don't start
+%% unless all nodes are up, and if a node goes down we abandon the
+%% whole process. To further keep things simple we also defer the
+%% decision as to the winning node to the "leader" - arbitrarily
+%% selected as the first node in the sorted list of all cluster nodes.
+%% To coordinate the restarting nodes we pick a special node from the
+%% winning partition - the "winner". Restarting nodes then stop, tell
+%% the winner they have done so, and wait for it to tell them it is
+%% safe to start again.
+%% The winner and the leader are not necessarily the same node! Since
+%% the leader may end up restarting, we also make sure that it does
+%% not announce its decision (and thus cue other nodes to restart)
+%% until it has seen a request from every node.
+%% Start an autoheal round on this node and return the updated State.
+%% The leader is the head of the sorted, de-duplicated (lists:usort/1)
+%% list of all cluster nodes. Our autoheal_request_winner is sent to
+%% the leader's ?MODULE process, including when the leader is this
+%% node. The autoheal field becomes {wait_for_winner_reqs, All, All} on
+%% the leader (All is both the list of nodes a request is expected
+%% from and the list of nodes to notify) and wait_for_winner elsewhere.
+autoheal(State) ->
+ [Leader | _] = All = lists:usort(rabbit_mnesia:cluster_nodes(all)),
+ {?MODULE, Leader} ! {autoheal_request_winner, node()},
+ State#state{autoheal = case node() of
+ Leader -> {wait_for_winner_reqs, All, All};
+ _ -> wait_for_winner
+ end}.
+%% Restart this node as part of an autoheal. Winner is the pid of the
+%% winner's process. The work is handed to run_outside_applications/1:
+%% stop rabbit, tell Winner that this node has stopped, wait until
+%% Winner either sends autoheal_safe_to_start or goes down, then start
+%% rabbit again.
+autoheal_restart(Winner) ->
+ rabbit_log:warning(
+ "Cluster partition healed; we were selected to restart~n", []),
+ run_outside_applications(
+ fun () ->
+ %% Monitor is set up before stopping, so a 'DOWN' for Winner
+ %% also ends the wait below.
+ MRef = erlang:monitor(process, Winner),
+ rabbit:stop(),
+ Winner ! {autoheal_node_stopped, node()},
+ %% NOTE(review): no 'after' clause - this blocks until one of
+ %% the two messages arrives.
+ receive
+ {'DOWN', MRef, process, Winner, _Reason} -> ok;
+ autoheal_safe_to_start -> ok
+ end,
+ %% Drop the monitor and any 'DOWN' already queued for it.
+ erlang:demonitor(MRef, [flush]),
+ rabbit:start()
+ end).
+handle_dead_rabbit_state(State = #state{partitions = Partitions,
+ autoheal = Autoheal}) ->
%% If we have been partitioned, and we are now in the only remaining
%% partition, we no longer care about partitions - forget them. Note
%% that we do not attempt to deal with individual (other) partitions
@@ -350,7 +456,11 @@ handle_dead_rabbit_state(State = #state{partitions = Partitions}) ->
[] -> [];
_ -> Partitions
- State#state{partitions = Partitions1}.
+ State#state{partitions = Partitions1,
+ autoheal = case Autoheal of
+ {wait_for, _Nodes, _Notify} -> Autoheal;
+ _ -> not_healing
+ end}.
handle_live_rabbit(Node) ->
ok = rabbit_alarm:on_node_up(Node),