diff --git a/src/rabbit_autoheal.erl b/src/rabbit_autoheal.erl
new file mode 100644
index 00000000..c00c2dd6
--- /dev/null
+++ b/src/rabbit_autoheal.erl
@@ -0,0 +1,199 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%% The Original Code is RabbitMQ.
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
+-export([init/0, maybe_start/1, node_down/2, handle_msg/3]).
+%% The named process we are running in.
+-define(SERVER, rabbit_node_monitor).
+%% In order to autoheal we want to:
+%% * Find the winning partition
+%% * Stop all nodes in other partitions
+%% * Wait for them all to be stopped
+%% * Start them again
+%% To keep things simple, we assume all nodes are up. We don't start
+%% unless all nodes are up, and if a node goes down we abandon the
+%% whole process. To further keep things simple we also defer the
+%% decision as to the winning node to the "leader" - arbitrarily
+%% selected as the first node in the cluster.
+%% To coordinate the restarting nodes we pick a special node from the
+%% winning partition - the "winner". Restarting nodes then stop, tell
+%% the winner they have done so, and wait for it to tell them it is
+%% safe to start again.
+%% The winner and the leader are not necessarily the same node! Since
+%% the leader may end up restarting, we also make sure that it does
+%% not announce its decision (and thus cue other nodes to restart)
+%% until it has seen a request from every node that has experienced a
+%% partition.
+%% Possible states:
+%% not_healing
+%% - the default
+%% {winner_waiting, OutstandingStops, Notify}
+%% - we are the winner and are waiting for all losing nodes to stop
+%% before telling them they can restart
+%% restarting
+%% - we are restarting. Of course the node monitor immediately dies
+%% then so this state does not last long. We therefore send the
+%% autoheal_safe_to_start message to the rabbit_outside_app_process
+%% instead.
+init() -> not_healing.
+maybe_start(not_healing) ->
+ case enabled() of
+ true -> [Leader | _] = lists:usort(rabbit_mnesia:cluster_nodes(all)),
+ send(Leader, {request_start, node()}),
+ rabbit_log:info("Autoheal request sent to ~p~n", [Leader]),
+ not_healing;
+ false -> not_healing
+ end;
+maybe_start(State) ->
+ State.
+enabled() ->
+ {ok, autoheal} =:= application:get_env(rabbit, cluster_partition_handling).
+node_down(_Node, {winner_waiting, _Nodes, _Notify} = Autoheal) ->
+ Autoheal;
+node_down(_Node, not_healing) ->
+ not_healing;
+node_down(Node, _State) ->
+ rabbit_log:info("Autoheal: aborting - ~p went down~n", [Node]),
+ not_healing.
+%% By receiving this message we become the leader
+%% TODO should we try to debounce this?
+handle_msg({request_start, Node},
+ not_healing, Partitions) ->
+ rabbit_log:info("Autoheal request received from ~p~n", [Node]),
+ case rabbit_node_monitor:all_rabbit_nodes_up() of
+ false -> not_healing;
+ true -> AllPartitions = all_partitions(Partitions),
+ {Winner, Losers} = make_decision(AllPartitions),
+ rabbit_log:info("Autoheal decision~n"
+ " * Partitions: ~p~n"
+ " * Winner: ~p~n"
+ " * Losers: ~p~n",
+ [AllPartitions, Winner, Losers]),
+ send(Winner, {become_winner, Losers}),
+ [send(L, {winner_is, Winner}) || L <- Losers],
+ not_healing
+ end;
+handle_msg({become_winner, Losers},
+ not_healing, _Partitions) ->
+ rabbit_log:info("Autoheal: I am the winner, waiting for ~p to stop~n",
+ [Losers]),
+ {winner_waiting, Losers, Losers};
+handle_msg({become_winner, Losers},
+ {winner_waiting, WaitFor, Notify}, _Partitions) ->
+ rabbit_log:info("Autoheal: I am the winner, waiting additionally for "
+ "~p to stop~n", [Losers]),
+ {winner_waiting, lists:usort(Losers ++ WaitFor),
+ lists:usort(Losers ++ Notify)};
+handle_msg({winner_is, Winner},
+ not_healing, _Partitions) ->
+ rabbit_log:warning(
+ "Autoheal: we were selected to restart; winner is ~p~n", [Winner]),
+ rabbit_node_monitor:run_outside_applications(
+ fun () ->
+ MRef = erlang:monitor(process, {?SERVER, Winner}),
+ rabbit:stop(),
+ send(Winner, {node_stopped, node()}),
+ receive
+ {'DOWN', MRef, process, {?SERVER, Winner}, _Reason} -> ok;
+ autoheal_safe_to_start -> ok
+ end,
+ erlang:demonitor(MRef, [flush]),
+ rabbit:start()
+ end),
+ restarting;
+%% This is the winner receiving its last notification that a node has
+%% stopped - all nodes can now start again
+handle_msg({node_stopped, Node},
+ {winner_waiting, [Node], Notify}, _Partitions) ->
+ rabbit_log:info("Autoheal: final node has stopped, starting...~n",[]),
+ [{rabbit_outside_app_process, N} ! autoheal_safe_to_start || N <- Notify],
+ not_healing;
+handle_msg({node_stopped, Node},
+ {winner_waiting, WaitFor, Notify}, _Partitions) ->
+ {winner_waiting, WaitFor -- [Node], Notify};
+handle_msg(_, restarting, _Partitions) ->
+ %% ignore, we can contribute no further
+ restarting;
+handle_msg({node_stopped, _Node}, State, _Partitions) ->
+ %% ignore, we already cancelled the autoheal process
+ State.
+send(Node, Msg) -> {?SERVER, Node} ! {autoheal_msg, Msg}.
+make_decision(AllPartitions) ->
+ Sorted = lists:sort([{partition_value(P), P} || P <- AllPartitions]),
+ [[Winner | _] | Rest] = lists:reverse([P || {_, P} <- Sorted]),
+ {Winner, lists:append(Rest)}.
+partition_value(Partition) ->
+ Connections = [Res || Node <- Partition,
+ Res <- [rpc:call(Node, rabbit_networking,
+ connections_local, [])],
+ is_list(Res)],
+ {length(lists:append(Connections)), length(Partition)}.
+%% We have our local understanding of what partitions exist; but we
+%% only know which nodes we have been partitioned from, not which
+%% nodes are partitioned from each other.
+all_partitions(PartitionedWith) ->
+ Nodes = rabbit_mnesia:cluster_nodes(all),
+ Partitions = [{node(), PartitionedWith} |
+ [rpc:call(Node, rabbit_node_monitor, partitions, [])
+ || Node <- Nodes -- [node()]]],
+ all_partitions(Partitions, [Nodes]).
+all_partitions([], Partitions) ->
+ Partitions;
+all_partitions([{Node, CantSee} | Rest], Partitions) ->
+ {[Containing], Others} =
+ lists:partition(fun (Part) -> lists:member(Node, Part) end, Partitions),
+ A = Containing -- CantSee,
+ B = Containing -- A,
+ Partitions1 = case {A, B} of
+ {[], _} -> Partitions;
+ {_, []} -> Partitions;
+ _ -> [A, B | Others]
+ end,
+ all_partitions(Rest, Partitions1).
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index fb74d4a3..7d844c72 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -30,11 +30,14 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ %% Utils
+-export([all_rabbit_nodes_up/0, run_outside_applications/1]).
-define(SERVER, ?MODULE).
-define(RABBIT_UP_RPC_TIMEOUT, 2000).
--record(state, {monitors, partitions, subscribers, down_ping_timer}).
+-record(state, {monitors, partitions, subscribers, down_ping_timer, autoheal}).
@@ -57,6 +60,9 @@
-spec(partitions/0 :: () -> {node(), [node()]}).
-spec(subscribe/1 :: (pid()) -> 'ok').
+-spec(all_rabbit_nodes_up/0 :: () -> boolean()).
+-spec(run_outside_applications/1 :: (fun (() -> any())) -> pid()).
@@ -194,10 +200,12 @@ init([]) ->
%% writing out the cluster status files - bad things can then
%% happen.
process_flag(trap_exit, true),
+ net_kernel:monitor_nodes(true),
{ok, _} = mnesia:subscribe(system),
{ok, #state{monitors = pmon:new(),
subscribers = pmon:new(),
- partitions = []}}.
+ partitions = [],
+ autoheal = rabbit_autoheal:init()}}.
handle_call(partitions, _From, State = #state{partitions = Partitions}) ->
{reply, {node(), Partitions}, State};
@@ -251,16 +259,22 @@ handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason},
ok = handle_dead_rabbit(Node),
[P ! {node_down, Node} || P <- pmon:monitored(Subscribers)],
{noreply, handle_dead_rabbit_state(
+ Node,
State#state{monitors = pmon:erase({rabbit, Node}, Monitors)})};
handle_info({'DOWN', _MRef, process, Pid, _Reason},
State = #state{subscribers = Subscribers}) ->
{noreply, State#state{subscribers = pmon:erase(Pid, Subscribers)}};
+handle_info({nodedown, Node}, State) ->
+ ok = handle_dead_node(Node),
+ {noreply, State};
{inconsistent_database, running_partitioned_network, Node}},
State = #state{partitions = Partitions,
- monitors = Monitors}) ->
+ monitors = Monitors,
+ autoheal = AState}) ->
%% We will not get a node_up from this node - yet we should treat it as
%% up (mostly).
State1 = case pmon:is_monitored({rabbit, Node}, Monitors) of
@@ -271,7 +285,13 @@ handle_info({mnesia_system_event,
ok = handle_live_rabbit(Node),
Partitions1 = ordsets:to_list(
ordsets:add_element(Node, ordsets:from_list(Partitions))),
- {noreply, State1#state{partitions = Partitions1}};
+ {noreply, State1#state{partitions = Partitions1,
+ autoheal = rabbit_autoheal:maybe_start(AState)}};
+handle_info({autoheal_msg, Msg}, State = #state{autoheal = AState,
+ partitions = Partitions}) ->
+ AState1 = rabbit_autoheal:handle_msg(Msg, AState, Partitions),
+ {noreply, State#state{autoheal = AState1}};
handle_info(ping_nodes, State) ->
%% We ping nodes when some are down to ensure that we find out
@@ -318,6 +338,18 @@ handle_dead_rabbit(Node) ->
ok = rabbit_amqqueue:on_node_down(Node),
ok = rabbit_alarm:on_node_down(Node),
ok = rabbit_mnesia:on_node_down(Node),
+ ok.
+handle_dead_node(_Node) ->
+ %% In general in rabbit_node_monitor we care about whether the
+ %% rabbit application is up rather than the node; we do this so
+ %% that we can respond in the same way to "rabbitmqctl stop_app"
+ %% and "rabbitmqctl stop" as much as possible.
+ %%
+ %% However, for pause_minority mode we can't do this, since we
+ %% depend on looking at whether other nodes are up to decide
+ %% whether to come back up ourselves - if we decide that based on
+ %% the rabbit application we would go down and never come back.
case application:get_env(rabbit, cluster_partition_handling) of
{ok, pause_minority} ->
case majority() of
@@ -326,44 +358,32 @@ handle_dead_rabbit(Node) ->
{ok, ignore} ->
+ {ok, autoheal} ->
+ ok;
{ok, Term} ->
rabbit_log:warning("cluster_partition_handling ~p unrecognised, "
"assuming 'ignore'~n", [Term]),
- end,
- ok.
-majority() ->
- Nodes = rabbit_mnesia:cluster_nodes(all),
- length(alive_nodes(Nodes)) / length(Nodes) > 0.5.
-all_nodes_up() ->
- Nodes = rabbit_mnesia:cluster_nodes(all),
- length(alive_nodes(Nodes)) =:= length(Nodes).
-%% mnesia:system_info(db_nodes) (and hence
-%% rabbit_mnesia:cluster_nodes(running)) does not give reliable results
-%% when partitioned.
-alive_nodes() -> alive_nodes(rabbit_mnesia:cluster_nodes(all)).
-alive_nodes(Nodes) -> [N || N <- Nodes, pong =:= net_adm:ping(N)].
-alive_rabbit_nodes() ->
- [N || N <- alive_nodes(), rabbit_nodes:is_process_running(N, rabbit)].
+ end.
await_cluster_recovery() ->
rabbit_log:warning("Cluster minority status detected - awaiting recovery~n",
Nodes = rabbit_mnesia:cluster_nodes(all),
+ run_outside_applications(fun () ->
+ rabbit:stop(),
+ wait_for_cluster_recovery(Nodes)
+ end).
+run_outside_applications(Fun) ->
spawn(fun () ->
%% If our group leader is inside an application we are about
%% to stop, application:stop/1 does not return.
group_leader(whereis(init), self()),
- %% Ensure only one restarting process at a time, will
+ %% Ensure only one such process at a time, will
%% exit(badarg) (harmlessly) if one is already running
- register(rabbit_restarting_process, self()),
- rabbit:stop(),
- wait_for_cluster_recovery(Nodes)
+ register(rabbit_outside_app_process, self()),
+ Fun()
wait_for_cluster_recovery(Nodes) ->
@@ -373,7 +393,8 @@ wait_for_cluster_recovery(Nodes) ->
-handle_dead_rabbit_state(State = #state{partitions = Partitions}) ->
+handle_dead_rabbit_state(Node, State = #state{partitions = Partitions,
+ autoheal = Autoheal}) ->
%% If we have been partitioned, and we are now in the only remaining
%% partition, we no longer care about partitions - forget them. Note
%% that we do not attempt to deal with individual (other) partitions
@@ -383,7 +404,9 @@ handle_dead_rabbit_state(State = #state{partitions = Partitions}) ->
[] -> [];
_ -> Partitions
- ensure_ping_timer(State#state{partitions = Partitions1}).
+ ensure_ping_timer(
+ State#state{partitions = Partitions1,
+ autoheal = rabbit_autoheal:node_down(Node, Autoheal)}).
ensure_ping_timer(State) ->
@@ -416,3 +439,30 @@ legacy_should_be_disc_node(DiscNodes) ->
add_node(Node, Nodes) -> lists:usort([Node | Nodes]).
del_node(Node, Nodes) -> Nodes -- [Node].
+%% mnesia:system_info(db_nodes) (and hence
+%% rabbit_mnesia:cluster_nodes(running)) does not give reliable
+%% results when partitioned. So we have a small set of replacement
+%% functions here. "rabbit" in a function's name implies we test if
+%% the rabbit application is up, not just the node.
+majority() ->
+ Nodes = rabbit_mnesia:cluster_nodes(all),
+ length(alive_nodes(Nodes)) / length(Nodes) > 0.5.
+all_nodes_up() ->
+ Nodes = rabbit_mnesia:cluster_nodes(all),
+ length(alive_nodes(Nodes)) =:= length(Nodes).
+all_rabbit_nodes_up() ->
+ Nodes = rabbit_mnesia:cluster_nodes(all),
+ length(alive_rabbit_nodes(Nodes)) =:= length(Nodes).
+alive_nodes(Nodes) -> [N || N <- Nodes, pong =:= net_adm:ping(N)].
+alive_rabbit_nodes() -> alive_rabbit_nodes(rabbit_mnesia:cluster_nodes(all)).
+alive_rabbit_nodes(Nodes) ->
+ [N || N <- alive_nodes(Nodes), rabbit_nodes:is_process_running(N, rabbit)].