| author | Tim Watson <tim@rabbitmq.com> | 2013-04-23 13:12:03 +0100 |
|---|---|---|
| committer | Tim Watson <tim@rabbitmq.com> | 2013-04-23 13:12:03 +0100 |
| commit | 8fff2197fc071404825845135cc48a83ec239914 (patch) | |
| tree | b29cbc474627bf1323db2ba7fddb85b4680163c2 | |
| parent | b1dfca1296df55c729ca2a24eabd532ca5d17f08 (diff) | |
| parent | d35534e6a7a0a04fc2ed68061d9c2508864a0865 (diff) | |
| download | rabbitmq-server-8fff2197fc071404825845135cc48a83ec239914.tar.gz | |
merge bug25471 into default
| -rw-r--r-- | src/rabbit_autoheal.erl | 199 |
| -rw-r--r-- | src/rabbit_node_monitor.erl | 110 |

2 files changed, 279 insertions, 30 deletions
diff --git a/src/rabbit_autoheal.erl b/src/rabbit_autoheal.erl
new file mode 100644
index 00000000..c00c2dd6
--- /dev/null
+++ b/src/rabbit_autoheal.erl
@@ -0,0 +1,199 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_autoheal).
+
+-export([init/0, maybe_start/1, node_down/2, handle_msg/3]).
+
+%% The named process we are running in.
+-define(SERVER, rabbit_node_monitor).
+
+%%----------------------------------------------------------------------------
+
+%% In order to autoheal we want to:
+%%
+%% * Find the winning partition
+%% * Stop all nodes in other partitions
+%% * Wait for them all to be stopped
+%% * Start them again
+%%
+%% To keep things simple, we assume all nodes are up. We don't start
+%% unless all nodes are up, and if a node goes down we abandon the
+%% whole process. To further keep things simple we also defer the
+%% decision as to the winning node to the "leader" - arbitrarily
+%% selected as the first node in the cluster.
+%%
+%% To coordinate the restarting nodes we pick a special node from the
+%% winning partition - the "winner". Restarting nodes then stop, tell
+%% the winner they have done so, and wait for it to tell them it is
+%% safe to start again.
+%%
+%% The winner and the leader are not necessarily the same node! Since
+%% the leader may end up restarting, we also make sure that it does
+%% not announce its decision (and thus cue other nodes to restart)
+%% until it has seen a request from every node that has experienced a
+%% partition.
+%%
+%% Possible states:
+%%
+%% not_healing
+%%   - the default
+%%
+%% {winner_waiting, OutstandingStops, Notify}
+%%   - we are the winner and are waiting for all losing nodes to stop
+%%   before telling them they can restart
+%%
+%% restarting
+%%   - we are restarting. Of course the node monitor immediately dies
+%%   then so this state does not last long. We therefore send the
+%%   autoheal_safe_to_start message to the rabbit_outside_app_process
+%%   instead.
+
+%%----------------------------------------------------------------------------
+
+init() -> not_healing.
+
+maybe_start(not_healing) ->
+    case enabled() of
+        true  -> [Leader | _] = lists:usort(rabbit_mnesia:cluster_nodes(all)),
+                 send(Leader, {request_start, node()}),
+                 rabbit_log:info("Autoheal request sent to ~p~n", [Leader]),
+                 not_healing;
+        false -> not_healing
+    end;
+maybe_start(State) ->
+    State.
+
+enabled() ->
+    {ok, autoheal} =:= application:get_env(rabbit, cluster_partition_handling).
+
+node_down(_Node, {winner_waiting, _Nodes, _Notify} = Autoheal) ->
+    Autoheal;
+node_down(_Node, not_healing) ->
+    not_healing;
+node_down(Node, _State) ->
+    rabbit_log:info("Autoheal: aborting - ~p went down~n", [Node]),
+    not_healing.
+
+%% By receiving this message we become the leader
+%% TODO should we try to debounce this?
+handle_msg({request_start, Node},
+           not_healing, Partitions) ->
+    rabbit_log:info("Autoheal request received from ~p~n", [Node]),
+    case rabbit_node_monitor:all_rabbit_nodes_up() of
+        false -> not_healing;
+        true  -> AllPartitions = all_partitions(Partitions),
+                 {Winner, Losers} = make_decision(AllPartitions),
+                 rabbit_log:info("Autoheal decision~n"
+                                 "  * Partitions: ~p~n"
+                                 "  * Winner:     ~p~n"
+                                 "  * Losers:     ~p~n",
+                                 [AllPartitions, Winner, Losers]),
+                 send(Winner, {become_winner, Losers}),
+                 [send(L, {winner_is, Winner}) || L <- Losers],
+                 not_healing
+    end;
+
+handle_msg({become_winner, Losers},
+           not_healing, _Partitions) ->
+    rabbit_log:info("Autoheal: I am the winner, waiting for ~p to stop~n",
+                    [Losers]),
+    {winner_waiting, Losers, Losers};
+
+handle_msg({become_winner, Losers},
+           {winner_waiting, WaitFor, Notify}, _Partitions) ->
+    rabbit_log:info("Autoheal: I am the winner, waiting additionally for "
+                    "~p to stop~n", [Losers]),
+    {winner_waiting, lists:usort(Losers ++ WaitFor),
+     lists:usort(Losers ++ Notify)};
+
+handle_msg({winner_is, Winner},
+           not_healing, _Partitions) ->
+    rabbit_log:warning(
+      "Autoheal: we were selected to restart; winner is ~p~n", [Winner]),
+    rabbit_node_monitor:run_outside_applications(
+      fun () ->
+              MRef = erlang:monitor(process, {?SERVER, Winner}),
+              rabbit:stop(),
+              send(Winner, {node_stopped, node()}),
+              receive
+                  {'DOWN', MRef, process, {?SERVER, Winner}, _Reason} -> ok;
+                  autoheal_safe_to_start                              -> ok
+              end,
+              erlang:demonitor(MRef, [flush]),
+              rabbit:start()
+      end),
+    restarting;
+
+%% This is the winner receiving its last notification that a node has
+%% stopped - all nodes can now start again
+handle_msg({node_stopped, Node},
+           {winner_waiting, [Node], Notify}, _Partitions) ->
+    rabbit_log:info("Autoheal: final node has stopped, starting...~n",[]),
+    [{rabbit_outside_app_process, N} ! autoheal_safe_to_start || N <- Notify],
+    not_healing;
+
+handle_msg({node_stopped, Node},
+           {winner_waiting, WaitFor, Notify}, _Partitions) ->
+    {winner_waiting, WaitFor -- [Node], Notify};
+
+handle_msg(_, restarting, _Partitions) ->
+    %% ignore, we can contribute no further
+    restarting;
+
+handle_msg({node_stopped, _Node}, State, _Partitions) ->
+    %% ignore, we already cancelled the autoheal process
+    State.
+
+%%----------------------------------------------------------------------------
+
+send(Node, Msg) -> {?SERVER, Node} ! {autoheal_msg, Msg}.
+
+make_decision(AllPartitions) ->
+    Sorted = lists:sort([{partition_value(P), P} || P <- AllPartitions]),
+    [[Winner | _] | Rest] = lists:reverse([P || {_, P} <- Sorted]),
+    {Winner, lists:append(Rest)}.
+
+partition_value(Partition) ->
+    Connections = [Res || Node <- Partition,
+                          Res <- [rpc:call(Node, rabbit_networking,
+                                           connections_local, [])],
+                          is_list(Res)],
+    {length(lists:append(Connections)), length(Partition)}.
+
+%% We have our local understanding of what partitions exist; but we
+%% only know which nodes we have been partitioned from, not which
+%% nodes are partitioned from each other.
+all_partitions(PartitionedWith) ->
+    Nodes = rabbit_mnesia:cluster_nodes(all),
+    Partitions = [{node(), PartitionedWith} |
+                  [rpc:call(Node, rabbit_node_monitor, partitions, [])
+                   || Node <- Nodes -- [node()]]],
+    all_partitions(Partitions, [Nodes]).
+
+all_partitions([], Partitions) ->
+    Partitions;
+all_partitions([{Node, CantSee} | Rest], Partitions) ->
+    {[Containing], Others} =
+        lists:partition(fun (Part) -> lists:member(Node, Part) end, Partitions),
+    A = Containing -- CantSee,
+    B = Containing -- A,
+    Partitions1 = case {A, B} of
+                      {[], _} -> Partitions;
+                      {_, []} -> Partitions;
+                      _       -> [A, B | Others]
+                  end,
+    all_partitions(Rest, Partitions1).
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index fb74d4a3..7d844c72 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -30,11 +30,14 @@
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
          code_change/3]).
 
+%% Utils
+-export([all_rabbit_nodes_up/0, run_outside_applications/1]).
+
 -define(SERVER, ?MODULE).
 -define(RABBIT_UP_RPC_TIMEOUT, 2000).
 -define(RABBIT_DOWN_PING_INTERVAL, 1000).
 
--record(state, {monitors, partitions, subscribers, down_ping_timer}).
+-record(state, {monitors, partitions, subscribers, down_ping_timer, autoheal}).
 
 %%----------------------------------------------------------------------------
 
@@ -57,6 +60,9 @@
 -spec(partitions/0 :: () -> {node(), [node()]}).
 -spec(subscribe/1 :: (pid()) -> 'ok').
 
+-spec(all_rabbit_nodes_up/0 :: () -> boolean()).
+-spec(run_outside_applications/1 :: (fun (() -> any())) -> pid()).
+
 -endif.
 
 %%----------------------------------------------------------------------------
 
@@ -194,10 +200,12 @@ init([]) ->
     %% writing out the cluster status files - bad things can then
     %% happen.
     process_flag(trap_exit, true),
+    net_kernel:monitor_nodes(true),
     {ok, _} = mnesia:subscribe(system),
     {ok, #state{monitors    = pmon:new(),
                 subscribers = pmon:new(),
-                partitions  = []}}.
+                partitions  = [],
+                autoheal    = rabbit_autoheal:init()}}.
 
 handle_call(partitions, _From, State = #state{partitions = Partitions}) ->
     {reply, {node(), Partitions}, State};
@@ -251,16 +259,22 @@ handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason},
     ok = handle_dead_rabbit(Node),
     [P ! {node_down, Node} || P <- pmon:monitored(Subscribers)],
     {noreply, handle_dead_rabbit_state(
+                Node,
                 State#state{monitors = pmon:erase({rabbit, Node}, Monitors)})};
 
 handle_info({'DOWN', _MRef, process, Pid, _Reason},
             State = #state{subscribers = Subscribers}) ->
     {noreply, State#state{subscribers = pmon:erase(Pid, Subscribers)}};
 
+handle_info({nodedown, Node}, State) ->
+    ok = handle_dead_node(Node),
+    {noreply, State};
+
 handle_info({mnesia_system_event,
              {inconsistent_database, running_partitioned_network, Node}},
             State = #state{partitions = Partitions,
-                           monitors   = Monitors}) ->
+                           monitors   = Monitors,
+                           autoheal   = AState}) ->
     %% We will not get a node_up from this node - yet we should treat it as
     %% up (mostly).
     State1 = case pmon:is_monitored({rabbit, Node}, Monitors) of
@@ -271,7 +285,13 @@ handle_info({mnesia_system_event,
     ok = handle_live_rabbit(Node),
     Partitions1 = ordsets:to_list(
                     ordsets:add_element(Node, ordsets:from_list(Partitions))),
-    {noreply, State1#state{partitions = Partitions1}};
+    {noreply, State1#state{partitions = Partitions1,
+                           autoheal   = rabbit_autoheal:maybe_start(AState)}};
+
+handle_info({autoheal_msg, Msg}, State = #state{autoheal   = AState,
+                                                partitions = Partitions}) ->
+    AState1 = rabbit_autoheal:handle_msg(Msg, AState, Partitions),
+    {noreply, State#state{autoheal = AState1}};
 
 handle_info(ping_nodes, State) ->
     %% We ping nodes when some are down to ensure that we find out
@@ -318,6 +338,18 @@ handle_dead_rabbit(Node) ->
     ok = rabbit_amqqueue:on_node_down(Node),
     ok = rabbit_alarm:on_node_down(Node),
     ok = rabbit_mnesia:on_node_down(Node),
+    ok.
+
+handle_dead_node(_Node) ->
+    %% In general in rabbit_node_monitor we care about whether the
+    %% rabbit application is up rather than the node; we do this so
+    %% that we can respond in the same way to "rabbitmqctl stop_app"
+    %% and "rabbitmqctl stop" as much as possible.
+    %%
+    %% However, for pause_minority mode we can't do this, since we
+    %% depend on looking at whether other nodes are up to decide
+    %% whether to come back up ourselves - if we decide that based on
+    %% the rabbit application we would go down and never come back.
     case application:get_env(rabbit, cluster_partition_handling) of
         {ok, pause_minority} ->
             case majority() of
@@ -326,44 +358,32 @@
             end;
         {ok, ignore} ->
             ok;
+        {ok, autoheal} ->
+            ok;
         {ok, Term} ->
             rabbit_log:warning("cluster_partition_handling ~p unrecognised, "
                                "assuming 'ignore'~n", [Term]),
             ok
-    end,
-    ok.
-
-majority() ->
-    Nodes = rabbit_mnesia:cluster_nodes(all),
-    length(alive_nodes(Nodes)) / length(Nodes) > 0.5.
-
-all_nodes_up() ->
-    Nodes = rabbit_mnesia:cluster_nodes(all),
-    length(alive_nodes(Nodes)) =:= length(Nodes).
-
-%% mnesia:system_info(db_nodes) (and hence
-%% rabbit_mnesia:cluster_nodes(running)) does not give reliable results
-%% when partitioned.
-alive_nodes() -> alive_nodes(rabbit_mnesia:cluster_nodes(all)).
-
-alive_nodes(Nodes) -> [N || N <- Nodes, pong =:= net_adm:ping(N)].
-
-alive_rabbit_nodes() ->
-    [N || N <- alive_nodes(), rabbit_nodes:is_process_running(N, rabbit)].
+    end.
 
 await_cluster_recovery() ->
     rabbit_log:warning("Cluster minority status detected - awaiting recovery~n",
                        []),
     Nodes = rabbit_mnesia:cluster_nodes(all),
+    run_outside_applications(fun () ->
+                                     rabbit:stop(),
+                                     wait_for_cluster_recovery(Nodes)
+                             end).
+
+run_outside_applications(Fun) ->
     spawn(fun () ->
                   %% If our group leader is inside an application we are about
                   %% to stop, application:stop/1 does not return.
                   group_leader(whereis(init), self()),
-                  %% Ensure only one restarting process at a time, will
+                  %% Ensure only one such process at a time, will
                   %% exit(badarg) (harmlessly) if one is already running
-                  register(rabbit_restarting_process, self()),
-                  rabbit:stop(),
-                  wait_for_cluster_recovery(Nodes)
+                  register(rabbit_outside_app_process, self()),
+                  Fun()
           end).
 
 wait_for_cluster_recovery(Nodes) ->
@@ -373,7 +393,8 @@
                  wait_for_cluster_recovery(Nodes)
     end.
 
-handle_dead_rabbit_state(State = #state{partitions = Partitions}) ->
+handle_dead_rabbit_state(Node, State = #state{partitions = Partitions,
+                                              autoheal   = Autoheal}) ->
     %% If we have been partitioned, and we are now in the only remaining
     %% partition, we no longer care about partitions - forget them. Note
     %% that we do not attempt to deal with individual (other) partitions
@@ -383,7 +404,9 @@
                       [] -> [];
                       _  -> Partitions
                   end,
-    ensure_ping_timer(State#state{partitions = Partitions1}).
+    ensure_ping_timer(
+      State#state{partitions = Partitions1,
+                  autoheal   = rabbit_autoheal:node_down(Node, Autoheal)}).
 
 ensure_ping_timer(State) ->
     rabbit_misc:ensure_timer(
@@ -416,3 +439,30 @@ legacy_should_be_disc_node(DiscNodes) ->
 
 add_node(Node, Nodes) -> lists:usort([Node | Nodes]).
 
 del_node(Node, Nodes) -> Nodes -- [Node].
+
+%%--------------------------------------------------------------------
+
+%% mnesia:system_info(db_nodes) (and hence
+%% rabbit_mnesia:cluster_nodes(running)) does not give reliable
+%% results when partitioned. So we have a small set of replacement
+%% functions here. "rabbit" in a function's name implies we test if
+%% the rabbit application is up, not just the node.
+
+majority() ->
+    Nodes = rabbit_mnesia:cluster_nodes(all),
+    length(alive_nodes(Nodes)) / length(Nodes) > 0.5.
+
+all_nodes_up() ->
+    Nodes = rabbit_mnesia:cluster_nodes(all),
+    length(alive_nodes(Nodes)) =:= length(Nodes).
+
+all_rabbit_nodes_up() ->
+    Nodes = rabbit_mnesia:cluster_nodes(all),
+    length(alive_rabbit_nodes(Nodes)) =:= length(Nodes).
+
+alive_nodes(Nodes) -> [N || N <- Nodes, pong =:= net_adm:ping(N)].
+
+alive_rabbit_nodes() -> alive_rabbit_nodes(rabbit_mnesia:cluster_nodes(all)).
+
+alive_rabbit_nodes(Nodes) ->
+    [N || N <- alive_nodes(Nodes), rabbit_nodes:is_process_running(N, rabbit)].
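
The `make_decision/1` / `partition_value/1` pair in rabbit_autoheal.erl encodes the healing policy: each partition is ranked by the tuple `{client connection count, partition size}`, the best-ranked partition wins, and its first node becomes the winner while every node outside it is a loser. Below is a minimal standalone sketch of that rule — the module name `autoheal_example` is hypothetical, and connection counts are passed in as a proplist rather than gathered with `rpc:call(Node, rabbit_networking, connections_local, [])`, so it can be tried in a plain Erlang shell:

```erlang
-module(autoheal_example).

-export([make_decision/2]).

%% Rank partitions by {connections, size}; the greatest tuple wins, and
%% the first node of the winning partition becomes the winner.
make_decision(AllPartitions, Conns) ->
    Sorted = lists:sort([{partition_value(P, Conns), P} || P <- AllPartitions]),
    [[Winner | _] | Rest] = lists:reverse([P || {_, P} <- Sorted]),
    {Winner, lists:append(Rest)}.

%% Sum the (stubbed) per-node connection counts for one partition.
partition_value(Partition, Conns) ->
    Total = lists:sum([proplists:get_value(N, Conns, 0) || N <- Partition]),
    {Total, length(Partition)}.
```

For example, `autoheal_example:make_decision([[a, b], [c]], [{a, 1}, {b, 2}, {c, 10}])` returns `{c,[a,b]}`: the single-node partition wins on connection count (10 vs. 3), so `a` and `b` become the losers and are told to restart.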
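The `all_partitions/2` refinement deserves a worked example. It starts from a single set holding every cluster node; each `{Node, CantSee}` report splits the set containing `Node` into the nodes it can see and the nodes it cannot, and degenerate splits (one side empty) are discarded. With hypothetical nodes `a`, `b` and `c`, where `c` is cut off from the other two, calling the function exactly as defined in the diff (as if it were exported) behaves like this:

```erlang
%% Reports: a cannot see c, b cannot see c, c cannot see a or b.
%% Starting from the single partition [a, b, c]:
1> all_partitions([{a, [c]}, {b, [c]}, {c, [a, b]}], [[a, b, c]]).
[[a,b],[c]]
%% The first report splits [a, b, c] into [a, b] and [c]; the remaining
%% reports agree with that split, so the partition list is unchanged.
```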
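Finally, `rabbit_autoheal:enabled/0` only reports true when the `rabbit` application's `cluster_partition_handling` key is set to `autoheal`, and `handle_dead_node/1` above recognises `ignore` (the default), `pause_minority` and the new `autoheal`. A minimal sketch of a `rabbitmq.config` opting in to the new mode — all other settings omitted:

```erlang
%% rabbitmq.config - minimal sketch; only the key read by this patch is set.
[
 {rabbit, [{cluster_partition_handling, autoheal}]}
].
```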