summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-04-17 16:26:34 +0100
committerSimon MacMullen <simon@rabbitmq.com>2013-04-17 16:26:34 +0100
commitc3291ec23ade84322093c7f67ca17f14353c9ca6 (patch)
treee0d3e7f9a4e85e6ab36439a9f1aef1d831cb0a5c
parentd3f6a2dc0ecce0ff7ced9f4bbfea0276c8f53043 (diff)
downloadrabbitmq-server-c3291ec23ade84322093c7f67ca17f14353c9ca6.tar.gz
First pass at splitting all the autoheal stuff out into a separate module.
-rw-r--r--src/rabbit_autoheal.erl208
-rw-r--r--src/rabbit_node_monitor.erl203
2 files changed, 223 insertions, 188 deletions
diff --git a/src/rabbit_autoheal.erl b/src/rabbit_autoheal.erl
new file mode 100644
index 00000000..4a6307fc
--- /dev/null
+++ b/src/rabbit_autoheal.erl
@@ -0,0 +1,208 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_autoheal).
+
+-export([init/0, maybe_start/1, node_down/2, handle_msg/3]).
+
+%% The named process we are running in.
+-define(SERVER, rabbit_node_monitor).
+
+%%----------------------------------------------------------------------------
+
+%% In order to autoheal we want to:
+%%
+%% * Find the winning partition
+%% * Stop all nodes in other partitions
+%% * Wait for them all to be stopped
+%% * Start them again
+%%
+%% To keep things simple, we assume all nodes are up. We don't start
+%% unless all nodes are up, and if a node goes down we abandon the
+%% whole process. To further keep things simple we also defer the
+%% decision as to the winning node to the "leader" - arbitrarily
+%% selected as the first node in the cluster.
+%%
+%% To coordinate the restarting nodes we pick a special node from the
+%% winning partition - the "winner". Restarting nodes then stop, tell
+%% the winner they have done so, and wait for it to tell them it is
+%% safe to start again.
+%%
+%% The winner and the leader are not necessarily the same node! Since
+%% the leader may end up restarting, we also make sure that it does
+%% not announce its decision (and thus cue other nodes to restart)
+%% until it has seen a request from every node that has experienced a
+%% partition.
+
+%%----------------------------------------------------------------------------
+
+init() -> not_healing.
+
+maybe_start(not_healing) ->
+ case enabled() andalso rabbit_node_monitor:all_nodes_up() of
+ true -> [Leader | _] = lists:usort(rabbit_mnesia:cluster_nodes(all)),
+ rabbit_log:info("Autoheal: leader is ~p~n", [Leader]),
+ send(Leader, {request_winner, node()}),
+ case node() of
+ Leader -> not_healing;
+ _ -> wait_for_winner
+ end;
+ false -> not_healing
+ end;
+maybe_start(State) ->
+ State.
+
+enabled() ->
+ {ok, autoheal} =:= application:get_env(rabbit, cluster_partition_handling).
+
+node_down(_Node, {wait_for, _Nodes, _Notify} = Autoheal) ->
+ Autoheal;
+node_down(_Node, not_healing) ->
+ not_healing;
+node_down(Node, _State) ->
+ rabbit_log:info("Autoheal: aborting - ~p went down~n", [Node]),
+ not_healing.
+
+handle_msg({request_winner, Node},
+ not_healing, Partitions) ->
+ case rabbit_node_monitor:all_nodes_up() of
+ false -> not_healing;
+ true -> Nodes = rabbit_mnesia:cluster_nodes(all),
+ Partitioned =
+ [N || N <- Nodes -- [node()],
+ P <- [begin
+ {_, R} = rpc:call(N, rabbit_node_monitor,
+ partitions, []),
+ R
+ end],
+ is_list(P) andalso length(P) > 0],
+ Partitioned1 = case Partitions of
+ [] -> Partitioned;
+ _ -> [node() | Partitioned]
+ end,
+ rabbit_log:info(
+ "Autoheal leader start; partitioned nodes are ~p~n",
+ [Partitioned1]),
+ handle_msg({request_winner, Node},
+ {wait_for_winner_reqs, Partitioned1, Partitioned1},
+ Partitions)
+ end;
+
+handle_msg({request_winner, Node},
+ {wait_for_winner_reqs, [Node], Notify}, Partitions) ->
+ AllPartitions = all_partitions(Partitions),
+ Winner = select_winner(AllPartitions),
+ rabbit_log:info("Autoheal request winner from ~p~n"
+ " Partitions were determined to be ~p~n"
+ " Winner is ~p~n", [Node, AllPartitions, Winner]),
+ [send(N, {winner, Winner}) || N <- Notify],
+ wait_for_winner;
+
+handle_msg({request_winner, Node},
+ {wait_for_winner_reqs, Nodes, Notify}, _Partitions) ->
+ rabbit_log:info("Autoheal request winner from ~p~n", [Node]),
+ {wait_for_winner_reqs, Nodes -- [Node], Notify};
+
+handle_msg({winner, Winner},
+ wait_for_winner, Partitions) ->
+ case lists:member(Winner, Partitions) of
+ false -> case node() of
+ Winner -> rabbit_log:info(
+ "Autoheal: waiting for nodes to stop: ~p~n",
+ [Partitions]),
+ {wait_for, Partitions, Partitions};
+ _ -> rabbit_log:info(
+ "Autoheal: nothing to do~n", []),
+ not_healing
+ end;
+ true -> restart_me(Winner),
+ restarting
+ end;
+
+handle_msg({winner, _Winner}, State, _Partitions) ->
+ %% ignore, we already cancelled the autoheal process
+ State;
+
+handle_msg({node_stopped, Node},
+ {wait_for, [Node], Notify}, _Partitions) ->
+ rabbit_log:info("Autoheal: final node has stopped, starting...~n",[]),
+ [{rabbit_outside_app_process, N} ! autoheal_safe_to_start || N <- Notify],
+ not_healing;
+
+handle_msg({node_stopped, Node},
+ {wait_for, WaitFor, Notify}, _Partitions) ->
+ {wait_for, WaitFor -- [Node], Notify};
+
+handle_msg({node_stopped, _Node}, State, _Partitions) ->
+ %% ignore, we already cancelled the autoheal process
+ State.
+
+%%----------------------------------------------------------------------------
+
+send(Node, Msg) -> {?SERVER, Node} ! {autoheal_msg, Msg}.
+
+select_winner(AllPartitions) ->
+ {_, [Winner | _]} =
+ hd(lists:reverse(
+ lists:sort([{partition_value(P), P} || P <- AllPartitions]))),
+ Winner.
+
+partition_value(Partition) ->
+ Connections = [Res || Node <- Partition,
+ Res <- [rpc:call(Node, rabbit_networking,
+ connections_local, [])],
+ is_list(Res)],
+ {length(lists:append(Connections)), length(Partition)}.
+
+restart_me(Winner) ->
+ rabbit_log:warning(
+ "Autoheal: we were selected to restart; winner is ~p~n", [Winner]),
+ rabbit_node_monitor:run_outside_applications(
+ fun () ->
+ MRef = erlang:monitor(process, {?SERVER, Winner}),
+ rabbit:stop(),
+ send(Winner, {node_stopped, node()}),
+ receive
+ {'DOWN', MRef, process, {?SERVER, Winner}, _Reason} -> ok;
+ autoheal_safe_to_start -> ok
+ end,
+ erlang:demonitor(MRef, [flush]),
+ rabbit:start()
+ end).
+
+%% We have our local understanding of what partitions exist; but we
+%% only know which nodes we have been partitioned from, not which
+%% nodes are partitioned from each other.
+all_partitions(PartitionedWith) ->
+ Nodes = rabbit_mnesia:cluster_nodes(all),
+ Partitions = [{node(), PartitionedWith} |
+ [rpc:call(Node, rabbit_node_monitor, partitions, [])
+ || Node <- Nodes -- [node()]]],
+ all_partitions(Partitions, [Nodes]).
+
+all_partitions([], Partitions) ->
+ Partitions;
+all_partitions([{Node, CantSee} | Rest], Partitions) ->
+ {[Containing], Others} =
+ lists:partition(fun (Part) -> lists:member(Node, Part) end, Partitions),
+ A = Containing -- CantSee,
+ B = Containing -- A,
+ Partitions1 = case {A, B} of
+ {[], _} -> Partitions;
+ {_, []} -> Partitions;
+ _ -> [A, B | Others]
+ end,
+ all_partitions(Rest, Partitions1).
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 2cdde14e..61df9f66 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -30,6 +30,9 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
+ %% Utils
+-export([all_nodes_up/0, run_outside_applications/1]).
+
-define(SERVER, ?MODULE).
-define(RABBIT_UP_RPC_TIMEOUT, 2000).
-define(RABBIT_DOWN_PING_INTERVAL, 1000).
@@ -198,7 +201,7 @@ init([]) ->
{ok, #state{monitors = pmon:new(),
subscribers = pmon:new(),
partitions = [],
- autoheal = not_healing}}.
+ autoheal = rabbit_autoheal:init()}}.
handle_call(partitions, _From, State = #state{partitions = Partitions}) ->
{reply, {node(), Partitions}, State};
@@ -262,7 +265,8 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason},
handle_info({mnesia_system_event,
{inconsistent_database, running_partitioned_network, Node}},
State = #state{partitions = Partitions,
- monitors = Monitors}) ->
+ monitors = Monitors,
+ autoheal = AState}) ->
%% We will not get a node_up from this node - yet we should treat it as
%% up (mostly).
State1 = case pmon:is_monitored({rabbit, Node}, Monitors) of
@@ -271,96 +275,15 @@ handle_info({mnesia_system_event,
monitors = pmon:monitor({rabbit, Node}, Monitors)}
end,
ok = handle_live_rabbit(Node),
- State2 = case application:get_env(rabbit, cluster_partition_handling) of
- {ok, autoheal} -> case all_nodes_up() of
- true -> autoheal(State1);
- false -> State1
- end;
- _ -> State1
- end,
Partitions1 = ordsets:to_list(
ordsets:add_element(Node, ordsets:from_list(Partitions))),
- {noreply, State2#state{partitions = Partitions1}};
-
-handle_info({autoheal_request_winner, Node},
- State = #state{autoheal = not_healing,
- partitions = Partitions}) ->
- case all_nodes_up() of
- false -> {noreply, State};
- true -> Nodes = rabbit_mnesia:cluster_nodes(all),
- Partitioned =
- [N || N <- Nodes -- [node()],
- P <- [begin
- {_, R} = rpc:call(N, rabbit_node_monitor,
- partitions, []),
- R
- end],
- is_list(P) andalso length(P) > 0],
- Partitioned1 = case Partitions of
- [] -> Partitioned;
- _ -> [node() | Partitioned]
- end,
- rabbit_log:info(
- "Autoheal leader start; partitioned nodes are ~p~n",
- [Partitioned1]),
- Autoheal = {wait_for_winner_reqs, Partitioned1, Partitioned1},
- handle_info({autoheal_request_winner, Node},
- State#state{autoheal = Autoheal})
- end;
-
-handle_info({autoheal_request_winner, Node},
- State = #state{autoheal = {wait_for_winner_reqs, [Node], Notify},
- partitions = Partitions}) ->
- AllPartitions = all_partitions(Partitions),
- Winner = autoheal_select_winner(AllPartitions),
- rabbit_log:info("Autoheal request winner from ~p~n"
- " Partitions were determined to be ~p~n"
- " Winner is ~p~n", [Node, AllPartitions, Winner]),
- [{?MODULE, N} ! {autoheal_winner, Winner} || N <- Notify],
- {noreply, State#state{autoheal = wait_for_winner}};
-
-handle_info({autoheal_request_winner, Node},
- State = #state{autoheal = {wait_for_winner_reqs, Nodes, Notify}}) ->
- rabbit_log:info("Autoheal request winner from ~p~n", [Node]),
- {noreply, State#state{autoheal = {wait_for_winner_reqs,
- Nodes -- [Node], Notify}}};
-
-handle_info({autoheal_winner, Winner},
- State = #state{autoheal = wait_for_winner,
- partitions = Partitions}) ->
- case lists:member(Winner, Partitions) of
- false -> case node() of
- Winner -> rabbit_log:info(
- "Autoheal: waiting for nodes to stop: ~p~n",
- [Partitions]),
- {noreply,
- State#state{autoheal = {wait_for, Partitions,
- Partitions}}};
- _ -> rabbit_log:info(
- "Autoheal: nothing to do~n", []),
- {noreply, State#state{autoheal = not_healing}}
- end;
- true -> autoheal_restart(Winner),
- {noreply, State}
- end;
-
-handle_info({autoheal_winner, _Winner}, State) ->
- %% ignore, we already cancelled the autoheal process
- {noreply, State};
-
-handle_info({autoheal_node_stopped, Node},
- State = #state{autoheal = {wait_for, [Node], Notify}}) ->
- rabbit_log:info("Autoheal: final node has stopped, starting...~n",[]),
- [{rabbit_outside_app_process, N} ! autoheal_safe_to_start || N <- Notify],
- {noreply, State#state{autoheal = not_healing}};
-
-handle_info({autoheal_node_stopped, Node},
- State = #state{autoheal = {wait_for, WaitFor, Notify}}) ->
- {noreply, State#state{autoheal = {wait_for, WaitFor -- [Node], Notify}}};
+ {noreply, State1#state{partitions = Partitions1,
+ autoheal = rabbit_autoheal:maybe_start(AState)}};
-handle_info({autoheal_node_stopped, _Node}, State) ->
- %% ignore, we already cancelled the autoheal process
- {noreply, State};
+handle_info({autoheal_msg, Msg}, State = #state{autoheal = AState,
+ partitions = Partitions}) ->
+ AState1 = rabbit_autoheal:handle_msg(Msg, AState, Partitions),
+ {noreply, State#state{autoheal = AState1}};
handle_info(ping_nodes, State) ->
%% We ping nodes when some are down to ensure that we find out
@@ -469,93 +392,6 @@ wait_for_cluster_recovery(Nodes) ->
wait_for_cluster_recovery(Nodes)
end.
-%% In order to autoheal we want to:
-%%
-%% * Find the winning partition
-%% * Stop all nodes in other partitions
-%% * Wait for them all to be stopped
-%% * Start them again
-%%
-%% To keep things simple, we assume all nodes are up. We don't start
-%% unless all nodes are up, and if a node goes down we abandon the
-%% whole process. To further keep things simple we also defer the
-%% decision as to the winning node to the "leader" - arbitrarily
-%% selected as the first node in the cluster.
-%%
-%% To coordinate the restarting nodes we pick a special node from the
-%% winning partition - the "winner". Restarting nodes then stop, tell
-%% the winner they have done so, and wait for it to tell them it is
-%% safe to start again.
-%%
-%% The winner and the leader are not necessarily the same node! Since
-%% the leader may end up restarting, we also make sure that it does
-%% not announce its decision (and thus cue other nodes to restart)
-%% until it has seen a request from every node that has experienced a
-%% partition.
-autoheal(State = #state{autoheal = not_healing}) ->
- [Leader | _] = lists:usort(rabbit_mnesia:cluster_nodes(all)),
- rabbit_log:info("Autoheal: leader is ~p~n", [Leader]),
- {?MODULE, Leader} ! {autoheal_request_winner, node()},
- State#state{autoheal = case node() of
- Leader -> not_healing;
- _ -> wait_for_winner
- end};
-autoheal(State) ->
- State.
-
-autoheal_select_winner(AllPartitions) ->
- {_, [Winner | _]} =
- hd(lists:reverse(
- lists:sort([{autoheal_value(P), P} || P <- AllPartitions]))),
- Winner.
-
-autoheal_value(Partition) ->
- Connections = [Res || Node <- Partition,
- Res <- [rpc:call(Node, rabbit_networking,
- connections_local, [])],
- is_list(Res)],
- {length(lists:append(Connections)), length(Partition)}.
-
-autoheal_restart(Winner) ->
- rabbit_log:warning(
- "Autoheal: we were selected to restart; winner is ~p~n", [Winner]),
- run_outside_applications(
- fun () ->
- MRef = erlang:monitor(process, {?MODULE, Winner}),
- rabbit:stop(),
- {?MODULE, Winner} ! {autoheal_node_stopped, node()},
- receive
- {'DOWN', MRef, process, {?MODULE, Winner}, _Reason} -> ok;
- autoheal_safe_to_start -> ok
- end,
- erlang:demonitor(MRef, [flush]),
- rabbit:start()
- end).
-
-%% We have our local understanding of what partitions exist; but we
-%% only know which nodes we have been partitioned from, not which
-%% nodes are partitioned from each other.
-all_partitions(PartitionedWith) ->
- Nodes = rabbit_mnesia:cluster_nodes(all),
- Partitions = [{node(), PartitionedWith} |
- [rpc:call(Node, rabbit_node_monitor, partitions, [])
- || Node <- Nodes -- [node()]]],
- all_partitions(Partitions, [Nodes]).
-
-all_partitions([], Partitions) ->
- Partitions;
-all_partitions([{Node, CantSee} | Rest], Partitions) ->
- {[Containing], Others} =
- lists:partition(fun (Part) -> lists:member(Node, Part) end, Partitions),
- A = Containing -- CantSee,
- B = Containing -- A,
- Partitions1 = case {A, B} of
- {[], _} -> Partitions;
- {_, []} -> Partitions;
- _ -> [A, B | Others]
- end,
- all_partitions(Rest, Partitions1).
-
handle_dead_rabbit_state(Node, State = #state{partitions = Partitions,
autoheal = Autoheal}) ->
%% If we have been partitioned, and we are now in the only remaining
@@ -567,18 +403,9 @@ handle_dead_rabbit_state(Node, State = #state{partitions = Partitions,
[] -> [];
_ -> Partitions
end,
- Autoheal1 = case Autoheal of
- {wait_for, _Nodes, _Notify} ->
- Autoheal;
- not_healing ->
- not_healing;
- _ ->
- rabbit_log:info(
- "Autoheal: aborting - ~p went down~n", [Node]),
- not_healing
- end,
- ensure_ping_timer(State#state{partitions = Partitions1,
- autoheal = Autoheal1}).
+ ensure_ping_timer(
+ State#state{partitions = Partitions1,
+ autoheal = rabbit_autoheal:node_down(Node, Autoheal)}).
ensure_ping_timer(State) ->
rabbit_misc:ensure_timer(