diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-03-19 17:15:07 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-03-19 17:15:07 +0000 |
commit | 7045b90b2e9a2be678b6781db6815f20af120e87 (patch) | |
tree | f37170960239a003ecfeeb0804ec1f0d3e90f91e | |
parent | d8baacd25557f3bee697dd867bd3c32e1d1ba874 (diff) | |
download | rabbitmq-server-7045b90b2e9a2be678b6781db6815f20af120e87.tar.gz |
First pass at autohealing.
-rw-r--r-- | src/rabbit_node_monitor.erl | 132 |
1 file changed, 121 insertions, 11 deletions
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index de53b7f0..aac2bf84 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -33,7 +33,7 @@ -define(SERVER, ?MODULE). -define(RABBIT_UP_RPC_TIMEOUT, 2000). --record(state, {monitors, partitions, subscribers}). +-record(state, {monitors, partitions, subscribers, autoheal}). %%---------------------------------------------------------------------------- @@ -196,7 +196,8 @@ init([]) -> {ok, _} = mnesia:subscribe(system), {ok, #state{monitors = pmon:new(), subscribers = pmon:new(), - partitions = []}}. + partitions = [], + autoheal = not_healing}}. handle_call(partitions, _From, State = #state{partitions = Partitions}) -> {reply, {node(), Partitions}, State}; @@ -268,9 +269,60 @@ handle_info({mnesia_system_event, monitors = pmon:monitor({rabbit, Node}, Monitors)} end, ok = handle_live_rabbit(Node), + State2 = case application:get_env(rabbit, cluster_partition_handling) of + {ok, autoheal} -> case ratio() of + 1.0 -> autoheal(State1); + _ -> State1 + end; + _ -> State1 + end, Partitions1 = ordsets:to_list( ordsets:add_element(Node, ordsets:from_list(Partitions))), - {noreply, State1#state{partitions = Partitions1}}; + {noreply, State2#state{partitions = Partitions1}}; + +handle_info({autoheal_request_winner, Node}, + State = #state{autoheal = {wait_for_winner_reqs,[Node], Notify}}) -> + %% TODO actually do something sensible to figure out who the winner is + Winner = self(), + [{?MODULE, N} ! 
{autoheal_winner, Winner} || N <- Notify], + {noreply, State#state{autoheal = wait_for_winner}}; + +handle_info({autoheal_request_winner, Node}, + State = #state{autoheal = {wait_for_winner_reqs, Nodes, Notify}}) -> + {noreply, State#state{autoheal = {wait_for_winner_reqs, + Nodes -- [Node], Notify}}}; + +handle_info({autoheal_winner, Winner}, + State = #state{autoheal = wait_for_winner, + partitions = Partitions}) -> + Node = node(Winner), + case lists:member(Node, Partitions) of + false -> case node() of + Node -> {noreply, + State#state{autoheal = {wait_for, Partitions, + Partitions}}}; + _ -> {noreply, State#state{autoheal = not_healing}} + end; + true -> autoheal_restart(Winner), + {noreply, State} + end; + +handle_info({autoheal_winner, _Winner}, State) -> + %% ignore, we already cancelled the autoheal process + {noreply, State}; + +handle_info({autoheal_node_stopped, Node}, + State = #state{autoheal = {wait_for, [Node], Notify}}) -> + [{rabbit_outside_app_process, N} ! autoheal_safe_to_start || N <- Notify], + {noreply, State#state{autoheal = not_healing}}; + +handle_info({autoheal_node_stopped, Node}, + State = #state{autoheal = {wait_for, WaitFor, Notify}}) -> + {noreply, State#state{autoheal = {wait_for, WaitFor -- [Node], Notify}}}; + +handle_info({autoheal_node_stopped, _Node}, State) -> + %% ignore, we already cancelled the autoheal process + {noreply, State}; handle_info(_Info, State) -> {noreply, State}. @@ -301,6 +353,8 @@ handle_dead_rabbit(Node) -> end; {ok, ignore} -> ok; + {ok, autoheal} -> + ok; {ok, Term} -> rabbit_log:warning("cluster_partition_handling ~p unrecognised, " "assuming 'ignore'~n", [Term]), @@ -308,8 +362,8 @@ handle_dead_rabbit(Node) -> end, ok. -majority() -> - length(alive_nodes()) / length(rabbit_mnesia:cluster_nodes(all)) > 0.5. +majority() -> ratio() > 0.5. +ratio() -> length(alive_nodes()) / length(rabbit_mnesia:cluster_nodes(all)). 
%% mnesia:system_info(db_nodes) (and hence %% rabbit_mnesia:cluster_nodes(running)) does not give reliable results @@ -322,15 +376,20 @@ await_cluster_recovery() -> rabbit_log:warning("Cluster minority status detected - awaiting recovery~n", []), Nodes = rabbit_mnesia:cluster_nodes(all), + run_outside_applications(fun () -> + rabbit:stop(), + wait_for_cluster_recovery(Nodes) + end). + +run_outside_applications(Fun) -> spawn(fun () -> %% If our group leader is inside an application we are about %% to stop, application:stop/1 does not return. group_leader(whereis(init), self()), - %% Ensure only one restarting process at a time, will + %% Ensure only one such process at a time, will %% exit(badarg) (harmlessly) if one is already running - register(rabbit_restarting_process, self()), - rabbit:stop(), - wait_for_cluster_recovery(Nodes) + register(rabbit_outside_app_process, self()), + Fun() end). wait_for_cluster_recovery(Nodes) -> @@ -340,7 +399,54 @@ wait_for_cluster_recovery(Nodes) -> wait_for_cluster_recovery(Nodes) end. -handle_dead_rabbit_state(State = #state{partitions = Partitions}) -> +%% In order to autoheal we want to: +%% +%% * Find the winning partition +%% * Stop all nodes in other partitions +%% * Wait for them all to be stopped +%% * Start them again +%% +%% To keep things simple, we assume all nodes are up. We don't start +%% unless all nodes are up, and if a node goes down we abandon the +%% whole process. To further keep things simple we also defer the +%% decision as to the winning node to the "leader" - arbitrarily +%% selected as the first node in the cluster. +%% +%% To coordinate the restarting nodes we pick a special node from the +%% winning partition - the "winner". Restarting nodes then stop, tell +%% the winner they have done so, and wait for it to tell them it is +%% safe to start again. +%% +%% The winner and the leader are not necessarily the same node! 
Since +%% the leader may end up restarting, we also make sure that it does +%% not announce its decision (and thus cue other nodes to restart) +%% until it has seen a request from every node. +autoheal(State) -> + [Leader | _] = All = lists:usort(rabbit_mnesia:cluster_nodes(all)), + {?MODULE, Leader} ! {autoheal_request_winner, node()}, + State#state{autoheal = case node() of + Leader -> {wait_for_winner_reqs, All, All}; + _ -> wait_for_winner + end}. + +autoheal_restart(Winner) -> + rabbit_log:warning( + "Cluster partition healed; we were selected to restart~n", []), + run_outside_applications( + fun () -> + MRef = erlang:monitor(process, Winner), + rabbit:stop(), + Winner ! {autoheal_node_stopped, node()}, + receive + {'DOWN', MRef, process, Winner, _Reason} -> ok; + autoheal_safe_to_start -> ok + end, + erlang:demonitor(MRef, [flush]), + rabbit:start() + end). + +handle_dead_rabbit_state(State = #state{partitions = Partitions, + autoheal = Autoheal}) -> %% If we have been partitioned, and we are now in the only remaining %% partition, we no longer care about partitions - forget them. Note %% that we do not attempt to deal with individual (other) partitions @@ -350,7 +456,11 @@ handle_dead_rabbit_state(State = #state{partitions = Partitions}) -> [] -> []; _ -> Partitions end, - State#state{partitions = Partitions1}. + State#state{partitions = Partitions1, + autoheal = case Autoheal of + {wait_for, _Nodes, _Notify} -> Autoheal; + _ -> not_healing + end}. handle_live_rabbit(Node) -> ok = rabbit_alarm:on_node_up(Node), |