diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-03-27 15:56:58 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-03-27 15:56:58 +0000 |
commit | 8a610a9f1fde47a273438ec58ffbc2b242a1f49f (patch) | |
tree | f76ac50cd20204a6f7a3bdedc247c8e0af32c48e | |
parent | 85c13eb713440de1d8b24ef1c5be9cceb0e1d8f5 (diff) | |
parent | c1039e1e1d64228b8ae93acc3acb3d919997bd88 (diff) | |
download | rabbitmq-server-8a610a9f1fde47a273438ec58ffbc2b242a1f49f.tar.gz |
Merge default
-rw-r--r-- | src/rabbit_node_monitor.erl | 170 |
1 file changed, 161 insertions, 9 deletions
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index fb74d4a3..ffd02fc4 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -34,7 +34,7 @@ -define(RABBIT_UP_RPC_TIMEOUT, 2000). -define(RABBIT_DOWN_PING_INTERVAL, 1000). --record(state, {monitors, partitions, subscribers, down_ping_timer}). +-record(state, {monitors, partitions, subscribers, down_ping_timer, autoheal}). %%---------------------------------------------------------------------------- @@ -197,7 +197,8 @@ init([]) -> {ok, _} = mnesia:subscribe(system), {ok, #state{monitors = pmon:new(), subscribers = pmon:new(), - partitions = []}}. + partitions = [], + autoheal = not_healing}}. handle_call(partitions, _From, State = #state{partitions = Partitions}) -> {reply, {node(), Partitions}, State}; @@ -269,9 +270,68 @@ handle_info({mnesia_system_event, monitors = pmon:monitor({rabbit, Node}, Monitors)} end, ok = handle_live_rabbit(Node), + State2 = case application:get_env(rabbit, cluster_partition_handling) of + {ok, autoheal} -> case all_nodes_up() of + true -> autoheal(State1); + false -> State1 + end; + _ -> State1 + end, Partitions1 = ordsets:to_list( ordsets:add_element(Node, ordsets:from_list(Partitions))), - {noreply, State1#state{partitions = Partitions1}}; + {noreply, State2#state{partitions = Partitions1}}; + +handle_info({autoheal_request_winner, Node}, + State = #state{autoheal = {wait_for_winner_reqs,[Node], Notify}, + partitions = Partitions}) -> + Winner = autoheal_select_winner(all_partitions(Partitions)), + rabbit_log:info("Autoheal request winner from ~p: winner is ~p~n", + [Node, Winner]), + [{?MODULE, N} ! 
{autoheal_winner, Winner} || N <- Notify], + {noreply, State#state{autoheal = wait_for_winner}}; + +handle_info({autoheal_request_winner, Node}, + State = #state{autoheal = {wait_for_winner_reqs, Nodes, Notify}}) -> + rabbit_log:info("Autoheal request winner from ~p~n", [Node]), + {noreply, State#state{autoheal = {wait_for_winner_reqs, + Nodes -- [Node], Notify}}}; + +handle_info({autoheal_winner, Winner}, + State = #state{autoheal = wait_for_winner, + partitions = Partitions}) -> + case lists:member(Winner, Partitions) of + false -> case node() of + Winner -> rabbit_log:info( + "Autoheal: waiting for nodes to stop: ~p~n", + [Partitions]), + {noreply, + State#state{autoheal = {wait_for, Partitions, + Partitions}}}; + _ -> rabbit_log:info( + "Autoheal: nothing to do~n", []), + {noreply, State#state{autoheal = not_healing}} + end; + true -> autoheal_restart(Winner), + {noreply, State} + end; + +handle_info({autoheal_winner, _Winner}, State) -> + %% ignore, we already cancelled the autoheal process + {noreply, State}; + +handle_info({autoheal_node_stopped, Node}, + State = #state{autoheal = {wait_for, [Node], Notify}}) -> + rabbit_log:info("Autoheal: final node has stopped, starting...~n",[]), + [{rabbit_outside_app_process, N} ! 
autoheal_safe_to_start || N <- Notify], + {noreply, State#state{autoheal = not_healing}}; + +handle_info({autoheal_node_stopped, Node}, + State = #state{autoheal = {wait_for, WaitFor, Notify}}) -> + {noreply, State#state{autoheal = {wait_for, WaitFor -- [Node], Notify}}}; + +handle_info({autoheal_node_stopped, _Node}, State) -> + %% ignore, we already cancelled the autoheal process + {noreply, State}; handle_info(ping_nodes, State) -> %% We ping nodes when some are down to ensure that we find out @@ -326,6 +386,8 @@ handle_dead_rabbit(Node) -> end; {ok, ignore} -> ok; + {ok, autoheal} -> + ok; {ok, Term} -> rabbit_log:warning("cluster_partition_handling ~p unrecognised, " "assuming 'ignore'~n", [Term]), @@ -355,15 +417,20 @@ await_cluster_recovery() -> rabbit_log:warning("Cluster minority status detected - awaiting recovery~n", []), Nodes = rabbit_mnesia:cluster_nodes(all), + run_outside_applications(fun () -> + rabbit:stop(), + wait_for_cluster_recovery(Nodes) + end). + +run_outside_applications(Fun) -> spawn(fun () -> %% If our group leader is inside an application we are about %% to stop, application:stop/1 does not return. group_leader(whereis(init), self()), - %% Ensure only one restarting process at a time, will + %% Ensure only one such process at a time, will %% exit(badarg) (harmlessly) if one is already running - register(rabbit_restarting_process, self()), - rabbit:stop(), - wait_for_cluster_recovery(Nodes) + register(rabbit_outside_app_process, self()), + Fun() end). wait_for_cluster_recovery(Nodes) -> @@ -373,7 +440,87 @@ wait_for_cluster_recovery(Nodes) -> wait_for_cluster_recovery(Nodes) end. -handle_dead_rabbit_state(State = #state{partitions = Partitions}) -> +%% In order to autoheal we want to: +%% +%% * Find the winning partition +%% * Stop all nodes in other partitions +%% * Wait for them all to be stopped +%% * Start them again +%% +%% To keep things simple, we assume all nodes are up. 
We don't start +%% unless all nodes are up, and if a node goes down we abandon the +%% whole process. To further keep things simple we also defer the +%% decision as to the winning node to the "leader" - arbitrarily +%% selected as the first node in the cluster. +%% +%% To coordinate the restarting nodes we pick a special node from the +%% winning partition - the "winner". Restarting nodes then stop, tell +%% the winner they have done so, and wait for it to tell them it is +%% safe to start again. +%% +%% The winner and the leader are not necessarily the same node! Since +%% the leader may end up restarting, we also make sure that it does +%% not announce its decision (and thus cue other nodes to restart) +%% until it has seen a request from every node. +autoheal(State = #state{autoheal = not_healing}) -> + [Leader | _] = All = lists:usort(rabbit_mnesia:cluster_nodes(all)), + rabbit_log:info("Autoheal: leader is ~p~n", [Leader]), + {?MODULE, Leader} ! {autoheal_request_winner, node()}, + State#state{autoheal = case node() of + Leader -> {wait_for_winner_reqs, All, All}; + _ -> wait_for_winner + end}; +autoheal(State) -> + State. + +autoheal_select_winner(AllPartitions) -> + {_, [Winner | _]} = hd(lists:sort( + [{autoheal_value(P), P} || P <- AllPartitions])), + Winner. + +autoheal_value(Partition) -> + Connections = [Res || Node <- Partition, + Res <- [rpc:call(Node, rabbit_networking, + connections_local, [])], + is_list(Res)], + {length(lists:append(Connections)), length(Partition)}. + +autoheal_restart(Winner) -> + rabbit_log:warning( + "Autoheal: we were selected to restart; winner is ~p~n", [Winner]), + run_outside_applications( + fun () -> + MRef = erlang:monitor(process, {?MODULE, Winner}), + rabbit:stop(), + {?MODULE, Winner} ! {autoheal_node_stopped, node()}, + receive + {'DOWN', MRef, process, {?MODULE, Winner}, _Reason} -> ok; + autoheal_safe_to_start -> ok + end, + erlang:demonitor(MRef, [flush]), + rabbit:start() + end). 
+ +%% We have our local understanding of what partitions exist; but we +%% only know which nodes we have been partitioned from, not which +%% nodes are partitioned from each other. +%% +%% Note that here we assume that partition information is +%% consistent. If it isn't, what can we do? +all_partitions(PartitionedWith) -> + All = rabbit_mnesia:cluster_nodes(all), + OurPartition = All -- PartitionedWith, + all_partitions([OurPartition], PartitionedWith, All). + +all_partitions(AllPartitions, [], _) -> + AllPartitions; +all_partitions(AllPartitions, [One | _] = ToDo, All) -> + {One, PartitionedFrom} = rpc:call(One, rabbit_node_monitor, partitions, []), + Partition = All -- PartitionedFrom, + all_partitions([Partition | AllPartitions], ToDo -- Partition, All). + +handle_dead_rabbit_state(State = #state{partitions = Partitions, + autoheal = Autoheal}) -> %% If we have been partitioned, and we are now in the only remaining %% partition, we no longer care about partitions - forget them. Note %% that we do not attempt to deal with individual (other) partitions @@ -383,7 +530,12 @@ handle_dead_rabbit_state(State = #state{partitions = Partitions}) -> [] -> []; _ -> Partitions end, - ensure_ping_timer(State#state{partitions = Partitions1}). + ensure_ping_timer( + State#state{partitions = Partitions1, + autoheal = case Autoheal of + {wait_for, _Nodes, _Notify} -> Autoheal; + _ -> not_healing + end}). ensure_ping_timer(State) -> rabbit_misc:ensure_timer( |