summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-06-27 16:12:43 +0100
committerSimon MacMullen <simon@rabbitmq.com>2014-06-27 16:12:43 +0100
commit6b99f2f174c4c980540275beecf62c053a38f7df (patch)
tree86c6802f385f0ae88c4ef794f282ee0b60205a17
parentcabc2b8d44787f5a567d0b0c2b8027cbaafd27c0 (diff)
parenteff0e28206975dc2a5088e008563ffabdcb2357c (diff)
downloadrabbitmq-server-6b99f2f174c4c980540275beecf62c053a38f7df.tar.gz
Merge bug26225
-rw-r--r--src/rabbit_autoheal.erl1
-rw-r--r--src/rabbit_networking.erl31
-rw-r--r--src/rabbit_node_monitor.erl27
3 files changed, 45 insertions, 14 deletions
diff --git a/src/rabbit_autoheal.erl b/src/rabbit_autoheal.erl
index 826bfc45..c5237d34 100644
--- a/src/rabbit_autoheal.erl
+++ b/src/rabbit_autoheal.erl
@@ -118,6 +118,7 @@ node_down(Node, _State) ->
handle_msg({request_start, Node},
not_healing, Partitions) ->
rabbit_log:info("Autoheal request received from ~p~n", [Node]),
+ rabbit_node_monitor:ping_all(),
case rabbit_node_monitor:all_rabbit_nodes_up() of
false -> not_healing;
true -> AllPartitions = all_partitions(Partitions),
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 9082dbd3..0791bbe2 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -16,8 +16,8 @@
-module(rabbit_networking).
--export([boot/0, start/0, start_tcp_listener/1, start_ssl_listener/2,
- stop_tcp_listener/1, on_node_down/1, active_listeners/0,
+-export([boot/0, start/0, killall/0, start_tcp_listener/1, start_ssl_listener/2,
+ on_node_down/1, active_listeners/0,
node_listeners/1, register_connection/1, unregister_connection/1,
connections/0, connection_info_keys/0,
connection_info/1, connection_info/2,
@@ -60,10 +60,10 @@
-type(label() :: string()).
-spec(start/0 :: () -> 'ok').
+-spec(killall/0 :: () -> 'ok').
-spec(start_tcp_listener/1 :: (listener_config()) -> 'ok').
-spec(start_ssl_listener/2 ::
(listener_config(), rabbit_types:infos()) -> 'ok').
--spec(stop_tcp_listener/1 :: (listener_config()) -> 'ok').
-spec(active_listeners/0 :: () -> [rabbit_types:listener()]).
-spec(node_listeners/1 :: (node()) -> [rabbit_types:listener()]).
-spec(register_connection/1 :: (pid()) -> ok).
@@ -145,6 +145,25 @@ start() -> rabbit_sup:start_supervisor_child(
[{local, rabbit_tcp_client_sup},
{rabbit_connection_sup,start_link,[]}]).
+%% We are going to stop for pause-minority, so we are already
+%% compromised; anything we confirm from now on is not going to be
+%% remembered after we come back. Since rabbit:stop/0 may take a while
+%% to gracefully shut down, we should stop talking to the outside
+%% world *immediately*.
+killall() ->
+ %% Stop ASAP
+ kill_connections(),
+ {ok, TCPListeners} = application:get_env(rabbit, tcp_listeners),
+ {ok, SSLListeners} = application:get_env(rabbit, ssl_listeners),
+ [stop_listener(L) || L <- TCPListeners ++ SSLListeners],
+ %% In case anything reconnected while we were stopping listeners
+ kill_connections(),
+ ok.
+
+kill_connections() ->
+ Conns = connections_local() ++ rabbit_direct:list_local(),
+ [exit(P, kill) || P <- Conns].
+
ensure_ssl() ->
{ok, SslAppsConfig} = application:get_env(rabbit, ssl_apps),
ok = app_utils:start_applications(SslAppsConfig),
@@ -245,12 +264,12 @@ start_listener0(Address, Protocol, Label, OnConnect) ->
{rabbit_misc:ntoa(IPAddress), Port}})
end.
-stop_tcp_listener(Listener) ->
- [stop_tcp_listener0(Address) ||
+stop_listener(Listener) ->
+ [stop_listener0(Address) ||
Address <- tcp_listener_addresses(Listener)],
ok.
-stop_tcp_listener0({IPAddress, Port, _Family}) ->
+stop_listener0({IPAddress, Port, _Family}) ->
Name = rabbit_misc:tcp_name(rabbit_tcp_listener_sup, IPAddress, Port),
ok = supervisor:terminate_child(rabbit_sup, Name),
ok = supervisor:delete_child(rabbit_sup, Name).
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 14961478..22b0c280 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -31,7 +31,7 @@
code_change/3]).
%% Utils
--export([all_rabbit_nodes_up/0, run_outside_applications/1]).
+-export([all_rabbit_nodes_up/0, run_outside_applications/1, ping_all/0]).
-define(SERVER, ?MODULE).
-define(RABBIT_UP_RPC_TIMEOUT, 2000).
@@ -63,6 +63,7 @@
-spec(all_rabbit_nodes_up/0 :: () -> boolean()).
-spec(run_outside_applications/1 :: (fun (() -> any())) -> pid()).
+-spec(ping_all/0 :: () -> 'ok').
-endif.
@@ -301,12 +302,11 @@ handle_info(ping_nodes, State) ->
%% to ping the nodes that are up, after all.
State1 = State#state{down_ping_timer = undefined},
Self = self(),
- %% all_nodes_up() both pings all the nodes and tells us if we need to again.
- %%
%% We ping in a separate process since in a partition it might
%% take some noticeable length of time and we don't want to block
%% the node monitor for that long.
spawn_link(fun () ->
+ ping_all(),
case all_nodes_up() of
true -> ok;
false -> Self ! ping_again
@@ -361,10 +361,10 @@ handle_dead_node(Node, State = #state{autoheal = Autoheal}) ->
await_cluster_recovery() ->
rabbit_log:warning("Cluster minority status detected - awaiting recovery~n",
[]),
- Nodes = rabbit_mnesia:cluster_nodes(all),
run_outside_applications(fun () ->
+ rabbit_networking:killall(),
rabbit:stop(),
- wait_for_cluster_recovery(Nodes)
+ wait_for_cluster_recovery()
end),
ok.
@@ -381,11 +381,12 @@ run_outside_applications(Fun) ->
end
end).
-wait_for_cluster_recovery(Nodes) ->
+wait_for_cluster_recovery() ->
+ ping_all(),
case majority() of
true -> rabbit:start();
false -> timer:sleep(?RABBIT_DOWN_PING_INTERVAL),
- wait_for_cluster_recovery(Nodes)
+ wait_for_cluster_recovery()
end.
handle_dead_rabbit(Node, State = #state{partitions = Partitions,
@@ -453,6 +454,11 @@ del_node(Node, Nodes) -> Nodes -- [Node].
%% functions here. "rabbit" in a function's name implies we test if
%% the rabbit application is up, not just the node.
+%% As we use these functions to decide what to do in pause_minority
+%% state, they *must* be fast, even in the case where TCP connections
+%% are timing out. So that means we should be careful about whether we
+%% connect to nodes which are currently disconnected.
+
majority() ->
Nodes = rabbit_mnesia:cluster_nodes(all),
length(alive_nodes(Nodes)) / length(Nodes) > 0.5.
@@ -465,9 +471,14 @@ all_rabbit_nodes_up() ->
Nodes = rabbit_mnesia:cluster_nodes(all),
length(alive_rabbit_nodes(Nodes)) =:= length(Nodes).
-alive_nodes(Nodes) -> [N || N <- Nodes, pong =:= net_adm:ping(N)].
+alive_nodes(Nodes) -> [N || N <- Nodes, lists:member(N, [node()|nodes()])].
alive_rabbit_nodes() -> alive_rabbit_nodes(rabbit_mnesia:cluster_nodes(all)).
alive_rabbit_nodes(Nodes) ->
[N || N <- alive_nodes(Nodes), rabbit:is_running(N)].
+
+%% This one is allowed to connect!
+ping_all() ->
+ [net_adm:ping(N) || N <- rabbit_mnesia:cluster_nodes(all)],
+ ok.