diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-06-27 16:12:43 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-06-27 16:12:43 +0100 |
commit | 6b99f2f174c4c980540275beecf62c053a38f7df (patch) | |
tree | 86c6802f385f0ae88c4ef794f282ee0b60205a17 | |
parent | cabc2b8d44787f5a567d0b0c2b8027cbaafd27c0 (diff) | |
parent | eff0e28206975dc2a5088e008563ffabdcb2357c (diff) | |
download | rabbitmq-server-6b99f2f174c4c980540275beecf62c053a38f7df.tar.gz |
Merge bug26225
-rw-r--r-- | src/rabbit_autoheal.erl | 1 | ||||
-rw-r--r-- | src/rabbit_networking.erl | 31 | ||||
-rw-r--r-- | src/rabbit_node_monitor.erl | 27 |
3 files changed, 45 insertions, 14 deletions
diff --git a/src/rabbit_autoheal.erl b/src/rabbit_autoheal.erl index 826bfc45..c5237d34 100644 --- a/src/rabbit_autoheal.erl +++ b/src/rabbit_autoheal.erl @@ -118,6 +118,7 @@ node_down(Node, _State) -> handle_msg({request_start, Node}, not_healing, Partitions) -> rabbit_log:info("Autoheal request received from ~p~n", [Node]), + rabbit_node_monitor:ping_all(), case rabbit_node_monitor:all_rabbit_nodes_up() of false -> not_healing; true -> AllPartitions = all_partitions(Partitions), diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 9082dbd3..0791bbe2 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -16,8 +16,8 @@ -module(rabbit_networking). --export([boot/0, start/0, start_tcp_listener/1, start_ssl_listener/2, - stop_tcp_listener/1, on_node_down/1, active_listeners/0, +-export([boot/0, start/0, killall/0, start_tcp_listener/1, start_ssl_listener/2, + on_node_down/1, active_listeners/0, node_listeners/1, register_connection/1, unregister_connection/1, connections/0, connection_info_keys/0, connection_info/1, connection_info/2, @@ -60,10 +60,10 @@ -type(label() :: string()). -spec(start/0 :: () -> 'ok'). +-spec(killall/0 :: () -> 'ok'). -spec(start_tcp_listener/1 :: (listener_config()) -> 'ok'). -spec(start_ssl_listener/2 :: (listener_config(), rabbit_types:infos()) -> 'ok'). --spec(stop_tcp_listener/1 :: (listener_config()) -> 'ok'). -spec(active_listeners/0 :: () -> [rabbit_types:listener()]). -spec(node_listeners/1 :: (node()) -> [rabbit_types:listener()]). -spec(register_connection/1 :: (pid()) -> ok). @@ -145,6 +145,25 @@ start() -> rabbit_sup:start_supervisor_child( [{local, rabbit_tcp_client_sup}, {rabbit_connection_sup,start_link,[]}]). +%% We are going to stop for pause-minority, so we are already +%% compromised; anything we confirm from now on is not going to be +%% remembered after we come back. Since rabbit:stop/0 may take a while +%% to gracefully shut down, we should stop talking to the outside +%% world *immediately*. +killall() -> + %% Stop ASAP + kill_connections(), + {ok, TCPListeners} = application:get_env(rabbit, tcp_listeners), + {ok, SSLListeners} = application:get_env(rabbit, ssl_listeners), + [stop_listener(L) || L <- TCPListeners ++ SSLListeners], + %% In case anything reconnected while we were stopping listeners + kill_connections(), + ok. + +kill_connections() -> + Conns = connections_local() ++ rabbit_direct:list_local(), + [exit(P, kill) || P <- Conns]. + ensure_ssl() -> {ok, SslAppsConfig} = application:get_env(rabbit, ssl_apps), ok = app_utils:start_applications(SslAppsConfig), @@ -245,12 +264,12 @@ start_listener0(Address, Protocol, Label, OnConnect) -> {rabbit_misc:ntoa(IPAddress), Port}}) end. -stop_tcp_listener(Listener) -> - [stop_tcp_listener0(Address) || +stop_listener(Listener) -> + [stop_listener0(Address) || Address <- tcp_listener_addresses(Listener)], ok. -stop_tcp_listener0({IPAddress, Port, _Family}) -> +stop_listener0({IPAddress, Port, _Family}) -> Name = rabbit_misc:tcp_name(rabbit_tcp_listener_sup, IPAddress, Port), ok = supervisor:terminate_child(rabbit_sup, Name), ok = supervisor:delete_child(rabbit_sup, Name). diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 14961478..22b0c280 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -31,7 +31,7 @@ code_change/3]). %% Utils --export([all_rabbit_nodes_up/0, run_outside_applications/1]). +-export([all_rabbit_nodes_up/0, run_outside_applications/1, ping_all/0]). -define(SERVER, ?MODULE). -define(RABBIT_UP_RPC_TIMEOUT, 2000). @@ -63,6 +63,7 @@ -spec(all_rabbit_nodes_up/0 :: () -> boolean()). -spec(run_outside_applications/1 :: (fun (() -> any())) -> pid()). +-spec(ping_all/0 :: () -> 'ok'). -endif. @@ -301,12 +302,11 @@ handle_info(ping_nodes, State) -> %% to ping the nodes that are up, after all. State1 = State#state{down_ping_timer = undefined}, Self = self(), - %% all_nodes_up() both pings all the nodes and tells us if we need to again. - %% %% We ping in a separate process since in a partition it might %% take some noticeable length of time and we don't want to block %% the node monitor for that long. spawn_link(fun () -> + ping_all(), case all_nodes_up() of true -> ok; false -> Self ! ping_again @@ -361,10 +361,10 @@ handle_dead_node(Node, State = #state{autoheal = Autoheal}) -> await_cluster_recovery() -> rabbit_log:warning("Cluster minority status detected - awaiting recovery~n", []), - Nodes = rabbit_mnesia:cluster_nodes(all), run_outside_applications(fun () -> + rabbit_networking:killall(), rabbit:stop(), - wait_for_cluster_recovery(Nodes) + wait_for_cluster_recovery() end), ok. @@ -381,11 +381,12 @@ run_outside_applications(Fun) -> end end). -wait_for_cluster_recovery(Nodes) -> +wait_for_cluster_recovery() -> + ping_all(), case majority() of true -> rabbit:start(); false -> timer:sleep(?RABBIT_DOWN_PING_INTERVAL), - wait_for_cluster_recovery(Nodes) + wait_for_cluster_recovery() end. handle_dead_rabbit(Node, State = #state{partitions = Partitions, @@ -453,6 +454,11 @@ del_node(Node, Nodes) -> Nodes -- [Node]. %% functions here. "rabbit" in a function's name implies we test if %% the rabbit application is up, not just the node. +%% As we use these functions to decide what to do in pause_minority +%% state, they *must* be fast, even in the case where TCP connections +%% are timing out. So that means we should be careful about whether we +%% connect to nodes which are currently disconnected. + majority() -> Nodes = rabbit_mnesia:cluster_nodes(all), length(alive_nodes(Nodes)) / length(Nodes) > 0.5. @@ -465,9 +471,14 @@ all_rabbit_nodes_up() -> Nodes = rabbit_mnesia:cluster_nodes(all), length(alive_rabbit_nodes(Nodes)) =:= length(Nodes). -alive_nodes(Nodes) -> [N || N <- Nodes, pong =:= net_adm:ping(N)]. +alive_nodes(Nodes) -> [N || N <- Nodes, lists:member(N, [node()|nodes()])]. alive_rabbit_nodes() -> alive_rabbit_nodes(rabbit_mnesia:cluster_nodes(all)). alive_rabbit_nodes(Nodes) -> [N || N <- alive_nodes(Nodes), rabbit:is_running(N)]. + +%% This one is allowed to connect! +ping_all() -> + [net_adm:ping(N) || N <- rabbit_mnesia:cluster_nodes(all)], + ok. |