diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-04-17 16:26:34 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-04-17 16:26:34 +0100 |
commit | c3291ec23ade84322093c7f67ca17f14353c9ca6 (patch) | |
tree | e0d3e7f9a4e85e6ab36439a9f1aef1d831cb0a5c | |
parent | d3f6a2dc0ecce0ff7ced9f4bbfea0276c8f53043 (diff) | |
download | rabbitmq-server-c3291ec23ade84322093c7f67ca17f14353c9ca6.tar.gz |
First pass at splitting all the autoheal stuff out into a separate module.
-rw-r--r-- | src/rabbit_autoheal.erl | 208 | ||||
-rw-r--r-- | src/rabbit_node_monitor.erl | 203 |
2 files changed, 223 insertions, 188 deletions
diff --git a/src/rabbit_autoheal.erl b/src/rabbit_autoheal.erl new file mode 100644 index 00000000..4a6307fc --- /dev/null +++ b/src/rabbit_autoheal.erl @@ -0,0 +1,208 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. +%% + +-module(rabbit_autoheal). + +-export([init/0, maybe_start/1, node_down/2, handle_msg/3]). + +%% The named process we are running in. +-define(SERVER, rabbit_node_monitor). + +%%---------------------------------------------------------------------------- + +%% In order to autoheal we want to: +%% +%% * Find the winning partition +%% * Stop all nodes in other partitions +%% * Wait for them all to be stopped +%% * Start them again +%% +%% To keep things simple, we assume all nodes are up. We don't start +%% unless all nodes are up, and if a node goes down we abandon the +%% whole process. To further keep things simple we also defer the +%% decision as to the winning node to the "leader" - arbitrarily +%% selected as the first node in the cluster. +%% +%% To coordinate the restarting nodes we pick a special node from the +%% winning partition - the "winner". Restarting nodes then stop, tell +%% the winner they have done so, and wait for it to tell them it is +%% safe to start again. +%% +%% The winner and the leader are not necessarily the same node! Since +%% the leader may end up restarting, we also make sure that it does +%% not announce its decision (and thus cue other nodes to restart) +%% until it has seen a request from every node that has experienced a +%% partition. + +%%---------------------------------------------------------------------------- + +init() -> not_healing. + +maybe_start(not_healing) -> + case enabled() andalso rabbit_node_monitor:all_nodes_up() of + true -> [Leader | _] = lists:usort(rabbit_mnesia:cluster_nodes(all)), + rabbit_log:info("Autoheal: leader is ~p~n", [Leader]), + send(Leader, {request_winner, node()}), + case node() of + Leader -> not_healing; + _ -> wait_for_winner + end; + false -> not_healing + end; +maybe_start(State) -> + State. + +enabled() -> + {ok, autoheal} =:= application:get_env(rabbit, cluster_partition_handling). + +node_down(_Node, {wait_for, _Nodes, _Notify} = Autoheal) -> + Autoheal; +node_down(_Node, not_healing) -> + not_healing; +node_down(Node, _State) -> + rabbit_log:info("Autoheal: aborting - ~p went down~n", [Node]), + not_healing. + +handle_msg({request_winner, Node}, + not_healing, Partitions) -> + case rabbit_node_monitor:all_nodes_up() of + false -> not_healing; + true -> Nodes = rabbit_mnesia:cluster_nodes(all), + Partitioned = + [N || N <- Nodes -- [node()], + P <- [begin + {_, R} = rpc:call(N, rabbit_node_monitor, + partitions, []), + R + end], + is_list(P) andalso length(P) > 0], + Partitioned1 = case Partitions of + [] -> Partitioned; + _ -> [node() | Partitioned] + end, + rabbit_log:info( + "Autoheal leader start; partitioned nodes are ~p~n", + [Partitioned1]), + handle_msg({request_winner, Node}, + {wait_for_winner_reqs, Partitioned1, Partitioned1}, + Partitions) + end; + +handle_msg({request_winner, Node}, + {wait_for_winner_reqs, [Node], Notify}, Partitions) -> + AllPartitions = all_partitions(Partitions), + Winner = select_winner(AllPartitions), + rabbit_log:info("Autoheal request winner from ~p~n" + " Partitions were determined to be ~p~n" + " Winner is ~p~n", [Node, AllPartitions, Winner]), + [send(N, {winner, Winner}) || N <- Notify], + wait_for_winner; + +handle_msg({request_winner, Node}, + {wait_for_winner_reqs, Nodes, Notify}, _Partitions) -> + rabbit_log:info("Autoheal request winner from ~p~n", [Node]), + {wait_for_winner_reqs, Nodes -- [Node], Notify}; + +handle_msg({winner, Winner}, + wait_for_winner, Partitions) -> + case lists:member(Winner, Partitions) of + false -> case node() of + Winner -> rabbit_log:info( + "Autoheal: waiting for nodes to stop: ~p~n", + [Partitions]), + {wait_for, Partitions, Partitions}; + _ -> rabbit_log:info( + "Autoheal: nothing to do~n", []), + not_healing + end; + true -> restart_me(Winner), + restarting + end; + +handle_msg({winner, _Winner}, State, _Partitions) -> + %% ignore, we already cancelled the autoheal process + State; + +handle_msg({node_stopped, Node}, + {wait_for, [Node], Notify}, _Partitions) -> + rabbit_log:info("Autoheal: final node has stopped, starting...~n",[]), + [{rabbit_outside_app_process, N} ! autoheal_safe_to_start || N <- Notify], + not_healing; + +handle_msg({node_stopped, Node}, + {wait_for, WaitFor, Notify}, _Partitions) -> + {wait_for, WaitFor -- [Node], Notify}; + +handle_msg({node_stopped, _Node}, State, _Partitions) -> + %% ignore, we already cancelled the autoheal process + State. + +%%---------------------------------------------------------------------------- + +send(Node, Msg) -> {?SERVER, Node} ! {autoheal_msg, Msg}. + +select_winner(AllPartitions) -> + {_, [Winner | _]} = + hd(lists:reverse( + lists:sort([{partition_value(P), P} || P <- AllPartitions]))), + Winner. + +partition_value(Partition) -> + Connections = [Res || Node <- Partition, + Res <- [rpc:call(Node, rabbit_networking, + connections_local, [])], + is_list(Res)], + {length(lists:append(Connections)), length(Partition)}. + +restart_me(Winner) -> + rabbit_log:warning( + "Autoheal: we were selected to restart; winner is ~p~n", [Winner]), + rabbit_node_monitor:run_outside_applications( + fun () -> + MRef = erlang:monitor(process, {?SERVER, Winner}), + rabbit:stop(), + send(Winner, {node_stopped, node()}), + receive + {'DOWN', MRef, process, {?SERVER, Winner}, _Reason} -> ok; + autoheal_safe_to_start -> ok + end, + erlang:demonitor(MRef, [flush]), + rabbit:start() + end). + +%% We have our local understanding of what partitions exist; but we +%% only know which nodes we have been partitioned from, not which +%% nodes are partitioned from each other. +all_partitions(PartitionedWith) -> + Nodes = rabbit_mnesia:cluster_nodes(all), + Partitions = [{node(), PartitionedWith} | + [rpc:call(Node, rabbit_node_monitor, partitions, []) + || Node <- Nodes -- [node()]]], + all_partitions(Partitions, [Nodes]). + +all_partitions([], Partitions) -> + Partitions; +all_partitions([{Node, CantSee} | Rest], Partitions) -> + {[Containing], Others} = + lists:partition(fun (Part) -> lists:member(Node, Part) end, Partitions), + A = Containing -- CantSee, + B = Containing -- A, + Partitions1 = case {A, B} of + {[], _} -> Partitions; + {_, []} -> Partitions; + _ -> [A, B | Others] + end, + all_partitions(Rest, Partitions1). diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 2cdde14e..61df9f66 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -30,6 +30,9 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + %% Utils +-export([all_nodes_up/0, run_outside_applications/1]). + -define(SERVER, ?MODULE). -define(RABBIT_UP_RPC_TIMEOUT, 2000). -define(RABBIT_DOWN_PING_INTERVAL, 1000). @@ -198,7 +201,7 @@ init([]) -> {ok, #state{monitors = pmon:new(), subscribers = pmon:new(), partitions = [], - autoheal = not_healing}}. + autoheal = rabbit_autoheal:init()}}. handle_call(partitions, _From, State = #state{partitions = Partitions}) -> {reply, {node(), Partitions}, State}; @@ -262,7 +265,8 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, handle_info({mnesia_system_event, {inconsistent_database, running_partitioned_network, Node}}, State = #state{partitions = Partitions, - monitors = Monitors}) -> + monitors = Monitors, + autoheal = AState}) -> %% We will not get a node_up from this node - yet we should treat it as %% up (mostly). State1 = case pmon:is_monitored({rabbit, Node}, Monitors) of @@ -271,96 +275,15 @@ handle_info({mnesia_system_event, monitors = pmon:monitor({rabbit, Node}, Monitors)} end, ok = handle_live_rabbit(Node), - State2 = case application:get_env(rabbit, cluster_partition_handling) of - {ok, autoheal} -> case all_nodes_up() of - true -> autoheal(State1); - false -> State1 - end; - _ -> State1 - end, Partitions1 = ordsets:to_list( ordsets:add_element(Node, ordsets:from_list(Partitions))), - {noreply, State2#state{partitions = Partitions1}}; - -handle_info({autoheal_request_winner, Node}, - State = #state{autoheal = not_healing, - partitions = Partitions}) -> - case all_nodes_up() of - false -> {noreply, State}; - true -> Nodes = rabbit_mnesia:cluster_nodes(all), - Partitioned = - [N || N <- Nodes -- [node()], - P <- [begin - {_, R} = rpc:call(N, rabbit_node_monitor, - partitions, []), - R - end], - is_list(P) andalso length(P) > 0], - Partitioned1 = case Partitions of - [] -> Partitioned; - _ -> [node() | Partitioned] - end, - rabbit_log:info( - "Autoheal leader start; partitioned nodes are ~p~n", - [Partitioned1]), - Autoheal = {wait_for_winner_reqs, Partitioned1, Partitioned1}, - handle_info({autoheal_request_winner, Node}, - State#state{autoheal = Autoheal}) - end; - -handle_info({autoheal_request_winner, Node}, - State = #state{autoheal = {wait_for_winner_reqs, [Node], Notify}, - partitions = Partitions}) -> - AllPartitions = all_partitions(Partitions), - Winner = autoheal_select_winner(AllPartitions), - rabbit_log:info("Autoheal request winner from ~p~n" - " Partitions were determined to be ~p~n" - " Winner is ~p~n", [Node, AllPartitions, Winner]), - [{?MODULE, N} ! {autoheal_winner, Winner} || N <- Notify], - {noreply, State#state{autoheal = wait_for_winner}}; - -handle_info({autoheal_request_winner, Node}, - State = #state{autoheal = {wait_for_winner_reqs, Nodes, Notify}}) -> - rabbit_log:info("Autoheal request winner from ~p~n", [Node]), - {noreply, State#state{autoheal = {wait_for_winner_reqs, - Nodes -- [Node], Notify}}}; - -handle_info({autoheal_winner, Winner}, - State = #state{autoheal = wait_for_winner, - partitions = Partitions}) -> - case lists:member(Winner, Partitions) of - false -> case node() of - Winner -> rabbit_log:info( - "Autoheal: waiting for nodes to stop: ~p~n", - [Partitions]), - {noreply, - State#state{autoheal = {wait_for, Partitions, - Partitions}}}; - _ -> rabbit_log:info( - "Autoheal: nothing to do~n", []), - {noreply, State#state{autoheal = not_healing}} - end; - true -> autoheal_restart(Winner), - {noreply, State} - end; - -handle_info({autoheal_winner, _Winner}, State) -> - %% ignore, we already cancelled the autoheal process - {noreply, State}; - -handle_info({autoheal_node_stopped, Node}, - State = #state{autoheal = {wait_for, [Node], Notify}}) -> - rabbit_log:info("Autoheal: final node has stopped, starting...~n",[]), - [{rabbit_outside_app_process, N} ! autoheal_safe_to_start || N <- Notify], - {noreply, State#state{autoheal = not_healing}}; - -handle_info({autoheal_node_stopped, Node}, - State = #state{autoheal = {wait_for, WaitFor, Notify}}) -> - {noreply, State#state{autoheal = {wait_for, WaitFor -- [Node], Notify}}}; + {noreply, State1#state{partitions = Partitions1, + autoheal = rabbit_autoheal:maybe_start(AState)}}; -handle_info({autoheal_node_stopped, _Node}, State) -> - %% ignore, we already cancelled the autoheal process - {noreply, State}; +handle_info({autoheal_msg, Msg}, State = #state{autoheal = AState, + partitions = Partitions}) -> + AState1 = rabbit_autoheal:handle_msg(Msg, AState, Partitions), + {noreply, State#state{autoheal = AState1}}; handle_info(ping_nodes, State) -> %% We ping nodes when some are down to ensure that we find out @@ -469,93 +392,6 @@ wait_for_cluster_recovery(Nodes) -> wait_for_cluster_recovery(Nodes) end. -%% In order to autoheal we want to: -%% -%% * Find the winning partition -%% * Stop all nodes in other partitions -%% * Wait for them all to be stopped -%% * Start them again -%% -%% To keep things simple, we assume all nodes are up. We don't start -%% unless all nodes are up, and if a node goes down we abandon the -%% whole process. To further keep things simple we also defer the -%% decision as to the winning node to the "leader" - arbitrarily -%% selected as the first node in the cluster. -%% -%% To coordinate the restarting nodes we pick a special node from the -%% winning partition - the "winner". Restarting nodes then stop, tell -%% the winner they have done so, and wait for it to tell them it is -%% safe to start again. -%% -%% The winner and the leader are not necessarily the same node! Since -%% the leader may end up restarting, we also make sure that it does -%% not announce its decision (and thus cue other nodes to restart) -%% until it has seen a request from every node that has experienced a -%% partition. -autoheal(State = #state{autoheal = not_healing}) -> - [Leader | _] = lists:usort(rabbit_mnesia:cluster_nodes(all)), - rabbit_log:info("Autoheal: leader is ~p~n", [Leader]), - {?MODULE, Leader} ! {autoheal_request_winner, node()}, - State#state{autoheal = case node() of - Leader -> not_healing; - _ -> wait_for_winner - end}; -autoheal(State) -> - State. - -autoheal_select_winner(AllPartitions) -> - {_, [Winner | _]} = - hd(lists:reverse( - lists:sort([{autoheal_value(P), P} || P <- AllPartitions]))), - Winner. - -autoheal_value(Partition) -> - Connections = [Res || Node <- Partition, - Res <- [rpc:call(Node, rabbit_networking, - connections_local, [])], - is_list(Res)], - {length(lists:append(Connections)), length(Partition)}. - -autoheal_restart(Winner) -> - rabbit_log:warning( - "Autoheal: we were selected to restart; winner is ~p~n", [Winner]), - run_outside_applications( - fun () -> - MRef = erlang:monitor(process, {?MODULE, Winner}), - rabbit:stop(), - {?MODULE, Winner} ! {autoheal_node_stopped, node()}, - receive - {'DOWN', MRef, process, {?MODULE, Winner}, _Reason} -> ok; - autoheal_safe_to_start -> ok - end, - erlang:demonitor(MRef, [flush]), - rabbit:start() - end). - -%% We have our local understanding of what partitions exist; but we -%% only know which nodes we have been partitioned from, not which -%% nodes are partitioned from each other. -all_partitions(PartitionedWith) -> - Nodes = rabbit_mnesia:cluster_nodes(all), - Partitions = [{node(), PartitionedWith} | - [rpc:call(Node, rabbit_node_monitor, partitions, []) - || Node <- Nodes -- [node()]]], - all_partitions(Partitions, [Nodes]). - -all_partitions([], Partitions) -> - Partitions; -all_partitions([{Node, CantSee} | Rest], Partitions) -> - {[Containing], Others} = - lists:partition(fun (Part) -> lists:member(Node, Part) end, Partitions), - A = Containing -- CantSee, - B = Containing -- A, - Partitions1 = case {A, B} of - {[], _} -> Partitions; - {_, []} -> Partitions; - _ -> [A, B | Others] - end, - all_partitions(Rest, Partitions1). - handle_dead_rabbit_state(Node, State = #state{partitions = Partitions, autoheal = Autoheal}) -> %% If we have been partitioned, and we are now in the only remaining @@ -567,18 +403,9 @@ handle_dead_rabbit_state(Node, State = #state{partitions = Partitions, [] -> []; _ -> Partitions end, - Autoheal1 = case Autoheal of - {wait_for, _Nodes, _Notify} -> - Autoheal; - not_healing -> - not_healing; - _ -> - rabbit_log:info( - "Autoheal: aborting - ~p went down~n", [Node]), - not_healing - end, - ensure_ping_timer(State#state{partitions = Partitions1, - autoheal = Autoheal1}). + ensure_ping_timer( + State#state{partitions = Partitions1, + autoheal = rabbit_autoheal:node_down(Node, Autoheal)}). ensure_ping_timer(State) -> rabbit_misc:ensure_timer( |