summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-03-27 15:56:58 +0000
committerSimon MacMullen <simon@rabbitmq.com>2013-03-27 15:56:58 +0000
commit8a610a9f1fde47a273438ec58ffbc2b242a1f49f (patch)
treef76ac50cd20204a6f7a3bdedc247c8e0af32c48e
parent85c13eb713440de1d8b24ef1c5be9cceb0e1d8f5 (diff)
parentc1039e1e1d64228b8ae93acc3acb3d919997bd88 (diff)
downloadrabbitmq-server-8a610a9f1fde47a273438ec58ffbc2b242a1f49f.tar.gz
Merge default
-rw-r--r-- src/rabbit_node_monitor.erl | 170
1 file changed, 161 insertions, 9 deletions
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index fb74d4a3..ffd02fc4 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -34,7 +34,7 @@
-define(RABBIT_UP_RPC_TIMEOUT, 2000).
-define(RABBIT_DOWN_PING_INTERVAL, 1000).
--record(state, {monitors, partitions, subscribers, down_ping_timer}).
+-record(state, {monitors, partitions, subscribers, down_ping_timer, autoheal}).
%%----------------------------------------------------------------------------
@@ -197,7 +197,8 @@ init([]) ->
{ok, _} = mnesia:subscribe(system),
{ok, #state{monitors = pmon:new(),
subscribers = pmon:new(),
- partitions = []}}.
+ partitions = [],
+ autoheal = not_healing}}.
handle_call(partitions, _From, State = #state{partitions = Partitions}) ->
{reply, {node(), Partitions}, State};
@@ -269,9 +270,68 @@ handle_info({mnesia_system_event,
monitors = pmon:monitor({rabbit, Node}, Monitors)}
end,
ok = handle_live_rabbit(Node),
+ State2 = case application:get_env(rabbit, cluster_partition_handling) of
+ {ok, autoheal} -> case all_nodes_up() of
+ true -> autoheal(State1);
+ false -> State1
+ end;
+ _ -> State1
+ end,
Partitions1 = ordsets:to_list(
ordsets:add_element(Node, ordsets:from_list(Partitions))),
- {noreply, State1#state{partitions = Partitions1}};
+ {noreply, State2#state{partitions = Partitions1}};
+
+handle_info({autoheal_request_winner, Node},
+ State = #state{autoheal = {wait_for_winner_reqs,[Node], Notify},
+ partitions = Partitions}) ->
+ Winner = autoheal_select_winner(all_partitions(Partitions)),
+ rabbit_log:info("Autoheal request winner from ~p: winner is ~p~n",
+ [Node, Winner]),
+ [{?MODULE, N} ! {autoheal_winner, Winner} || N <- Notify],
+ {noreply, State#state{autoheal = wait_for_winner}};
+
+handle_info({autoheal_request_winner, Node},
+ State = #state{autoheal = {wait_for_winner_reqs, Nodes, Notify}}) ->
+ rabbit_log:info("Autoheal request winner from ~p~n", [Node]),
+ {noreply, State#state{autoheal = {wait_for_winner_reqs,
+ Nodes -- [Node], Notify}}};
+
+handle_info({autoheal_winner, Winner},
+ State = #state{autoheal = wait_for_winner,
+ partitions = Partitions}) ->
+ case lists:member(Winner, Partitions) of
+ false -> case node() of
+ Winner -> rabbit_log:info(
+ "Autoheal: waiting for nodes to stop: ~p~n",
+ [Partitions]),
+ {noreply,
+ State#state{autoheal = {wait_for, Partitions,
+ Partitions}}};
+ _ -> rabbit_log:info(
+ "Autoheal: nothing to do~n", []),
+ {noreply, State#state{autoheal = not_healing}}
+ end;
+ true -> autoheal_restart(Winner),
+ {noreply, State}
+ end;
+
+handle_info({autoheal_winner, _Winner}, State) ->
+ %% ignore, we already cancelled the autoheal process
+ {noreply, State};
+
+handle_info({autoheal_node_stopped, Node},
+ State = #state{autoheal = {wait_for, [Node], Notify}}) ->
+ rabbit_log:info("Autoheal: final node has stopped, starting...~n",[]),
+ [{rabbit_outside_app_process, N} ! autoheal_safe_to_start || N <- Notify],
+ {noreply, State#state{autoheal = not_healing}};
+
+handle_info({autoheal_node_stopped, Node},
+ State = #state{autoheal = {wait_for, WaitFor, Notify}}) ->
+ {noreply, State#state{autoheal = {wait_for, WaitFor -- [Node], Notify}}};
+
+handle_info({autoheal_node_stopped, _Node}, State) ->
+ %% ignore, we already cancelled the autoheal process
+ {noreply, State};
handle_info(ping_nodes, State) ->
%% We ping nodes when some are down to ensure that we find out
@@ -326,6 +386,8 @@ handle_dead_rabbit(Node) ->
end;
{ok, ignore} ->
ok;
+ {ok, autoheal} ->
+ ok;
{ok, Term} ->
rabbit_log:warning("cluster_partition_handling ~p unrecognised, "
"assuming 'ignore'~n", [Term]),
@@ -355,15 +417,20 @@ await_cluster_recovery() ->
rabbit_log:warning("Cluster minority status detected - awaiting recovery~n",
[]),
Nodes = rabbit_mnesia:cluster_nodes(all),
+ run_outside_applications(fun () ->
+ rabbit:stop(),
+ wait_for_cluster_recovery(Nodes)
+ end).
+
+run_outside_applications(Fun) ->
spawn(fun () ->
%% If our group leader is inside an application we are about
%% to stop, application:stop/1 does not return.
group_leader(whereis(init), self()),
- %% Ensure only one restarting process at a time, will
+ %% Ensure only one such process at a time, will
%% exit(badarg) (harmlessly) if one is already running
- register(rabbit_restarting_process, self()),
- rabbit:stop(),
- wait_for_cluster_recovery(Nodes)
+ register(rabbit_outside_app_process, self()),
+ Fun()
end).
wait_for_cluster_recovery(Nodes) ->
@@ -373,7 +440,87 @@ wait_for_cluster_recovery(Nodes) ->
wait_for_cluster_recovery(Nodes)
end.
-handle_dead_rabbit_state(State = #state{partitions = Partitions}) ->
+%% In order to autoheal we want to:
+%%
+%% * Find the winning partition
+%% * Stop all nodes in other partitions
+%% * Wait for them all to be stopped
+%% * Start them again
+%%
+%% To keep things simple, we assume all nodes are up. We don't start
+%% unless all nodes are up, and if a node goes down we abandon the
+%% whole process. To further keep things simple we also defer the
+%% decision as to the winning node to the "leader" - arbitrarily
+%% selected as the first node in the cluster.
+%%
+%% To coordinate the restarting nodes we pick a special node from the
+%% winning partition - the "winner". Restarting nodes then stop, tell
+%% the winner they have done so, and wait for it to tell them it is
+%% safe to start again.
+%%
+%% The winner and the leader are not necessarily the same node! Since
+%% the leader may end up restarting, we also make sure that it does
+%% not announce its decision (and thus cue other nodes to restart)
+%% until it has seen a request from every node.
+autoheal(State = #state{autoheal = not_healing}) ->
+ [Leader | _] = All = lists:usort(rabbit_mnesia:cluster_nodes(all)),
+ rabbit_log:info("Autoheal: leader is ~p~n", [Leader]),
+ {?MODULE, Leader} ! {autoheal_request_winner, node()},
+ State#state{autoheal = case node() of
+ Leader -> {wait_for_winner_reqs, All, All};
+ _ -> wait_for_winner
+ end};
+autoheal(State) ->
+ State.
+
+autoheal_select_winner(AllPartitions) ->
+ {_, [Winner | _]} = hd(lists:sort(
+ [{autoheal_value(P), P} || P <- AllPartitions])),
+ Winner.
+
+autoheal_value(Partition) ->
+ Connections = [Res || Node <- Partition,
+ Res <- [rpc:call(Node, rabbit_networking,
+ connections_local, [])],
+ is_list(Res)],
+ {length(lists:append(Connections)), length(Partition)}.
+
+autoheal_restart(Winner) ->
+ rabbit_log:warning(
+ "Autoheal: we were selected to restart; winner is ~p~n", [Winner]),
+ run_outside_applications(
+ fun () ->
+ MRef = erlang:monitor(process, {?MODULE, Winner}),
+ rabbit:stop(),
+ {?MODULE, Winner} ! {autoheal_node_stopped, node()},
+ receive
+ {'DOWN', MRef, process, {?MODULE, Winner}, _Reason} -> ok;
+ autoheal_safe_to_start -> ok
+ end,
+ erlang:demonitor(MRef, [flush]),
+ rabbit:start()
+ end).
+
+%% We have our local understanding of what partitions exist; but we
+%% only know which nodes we have been partitioned from, not which
+%% nodes are partitioned from each other.
+%%
+%% Note that here we assume that partition information is
+%% consistent. If it isn't, what can we do?
+all_partitions(PartitionedWith) ->
+ All = rabbit_mnesia:cluster_nodes(all),
+ OurPartition = All -- PartitionedWith,
+ all_partitions([OurPartition], PartitionedWith, All).
+
+all_partitions(AllPartitions, [], _) ->
+ AllPartitions;
+all_partitions(AllPartitions, [One | _] = ToDo, All) ->
+ {One, PartitionedFrom} = rpc:call(One, rabbit_node_monitor, partitions, []),
+ Partition = All -- PartitionedFrom,
+ all_partitions([Partition | AllPartitions], ToDo -- Partition, All).
+
+handle_dead_rabbit_state(State = #state{partitions = Partitions,
+ autoheal = Autoheal}) ->
%% If we have been partitioned, and we are now in the only remaining
%% partition, we no longer care about partitions - forget them. Note
%% that we do not attempt to deal with individual (other) partitions
@@ -383,7 +530,12 @@ handle_dead_rabbit_state(State = #state{partitions = Partitions}) ->
[] -> [];
_ -> Partitions
end,
- ensure_ping_timer(State#state{partitions = Partitions1}).
+ ensure_ping_timer(
+ State#state{partitions = Partitions1,
+ autoheal = case Autoheal of
+ {wait_for, _Nodes, _Notify} -> Autoheal;
+ _ -> not_healing
+ end}).
ensure_ping_timer(State) ->
rabbit_misc:ensure_timer(