summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-07-22 13:34:31 +0100
committerSimon MacMullen <simon@rabbitmq.com>2014-07-22 13:34:31 +0100
commit0d53507cdf5dedcad354e82355618052613588b0 (patch)
tree0788baffc117f4f8c765b4f71e32076886efc834
parent01e7b5198c0ea4ca167f15af0dfb9fd884f7b285 (diff)
downloadrabbitmq-server-0d53507cdf5dedcad354e82355618052613588b0.tar.gz
Check for pause minority mode being about to cause a shutdown before sending confirms. In this way we can get rid of the effort to kill all network connections ASAP.
-rw-r--r--src/rabbit_channel.erl23
-rw-r--r--src/rabbit_networking.erl31
-rw-r--r--src/rabbit_node_monitor.erl44
3 files changed, 64 insertions, 34 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 74f9cacf..27a7d208 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -1565,15 +1565,22 @@ send_nacks(_, State) ->
send_confirms(State = #ch{tx = none, confirmed = []}) ->
State;
send_confirms(State = #ch{tx = none, confirmed = C}) ->
- MsgSeqNos =
- lists:foldl(
- fun ({MsgSeqNo, XName}, MSNs) ->
- ?INCR_STATS([{exchange_stats, XName, 1}], confirm, State),
- [MsgSeqNo | MSNs]
- end, [], lists:append(C)),
- send_confirms(MsgSeqNos, State#ch{confirmed = []});
+ case rabbit_node_monitor:pause_minority_guard() of
+ ok -> MsgSeqNos =
+ lists:foldl(
+ fun ({MsgSeqNo, XName}, MSNs) ->
+ ?INCR_STATS([{exchange_stats, XName, 1}],
+ confirm, State),
+ [MsgSeqNo | MSNs]
+ end, [], lists:append(C)),
+ send_confirms(MsgSeqNos, State#ch{confirmed = []});
+ pausing -> State#ch{confirmed = []}
+ end;
send_confirms(State) ->
- maybe_complete_tx(State).
+ case rabbit_node_monitor:pause_minority_guard() of
+ ok -> maybe_complete_tx(State);
+ pausing -> State
+ end.
send_confirms([], State) ->
State;
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 0791bbe2..9082dbd3 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -16,8 +16,8 @@
-module(rabbit_networking).
--export([boot/0, start/0, killall/0, start_tcp_listener/1, start_ssl_listener/2,
- on_node_down/1, active_listeners/0,
+-export([boot/0, start/0, start_tcp_listener/1, start_ssl_listener/2,
+ stop_tcp_listener/1, on_node_down/1, active_listeners/0,
node_listeners/1, register_connection/1, unregister_connection/1,
connections/0, connection_info_keys/0,
connection_info/1, connection_info/2,
@@ -60,10 +60,10 @@
-type(label() :: string()).
-spec(start/0 :: () -> 'ok').
--spec(killall/0 :: () -> 'ok').
-spec(start_tcp_listener/1 :: (listener_config()) -> 'ok').
-spec(start_ssl_listener/2 ::
(listener_config(), rabbit_types:infos()) -> 'ok').
+-spec(stop_tcp_listener/1 :: (listener_config()) -> 'ok').
-spec(active_listeners/0 :: () -> [rabbit_types:listener()]).
-spec(node_listeners/1 :: (node()) -> [rabbit_types:listener()]).
-spec(register_connection/1 :: (pid()) -> ok).
@@ -145,25 +145,6 @@ start() -> rabbit_sup:start_supervisor_child(
[{local, rabbit_tcp_client_sup},
{rabbit_connection_sup,start_link,[]}]).
-%% We are going to stop for pause-minority, so we are already
-%% compromised; anything we confirm from now on is not going to be
-%% remembered after we come back. Since rabbit:stop/0 may take a while
-%% to gracefully shut down, we should stop talking to the outside
-%% world *immediately*.
-killall() ->
- %% Stop ASAP
- kill_connections(),
- {ok, TCPListeners} = application:get_env(rabbit, tcp_listeners),
- {ok, SSLListeners} = application:get_env(rabbit, ssl_listeners),
- [stop_listener(L) || L <- TCPListeners ++ SSLListeners],
- %% In case anything reconnected while we were stopping listeners
- kill_connections(),
- ok.
-
-kill_connections() ->
- Conns = connections_local() ++ rabbit_direct:list_local(),
- [exit(P, kill) || P <- Conns].
-
ensure_ssl() ->
{ok, SslAppsConfig} = application:get_env(rabbit, ssl_apps),
ok = app_utils:start_applications(SslAppsConfig),
@@ -264,12 +245,12 @@ start_listener0(Address, Protocol, Label, OnConnect) ->
{rabbit_misc:ntoa(IPAddress), Port}})
end.
-stop_listener(Listener) ->
- [stop_listener0(Address) ||
+stop_tcp_listener(Listener) ->
+ [stop_tcp_listener0(Address) ||
Address <- tcp_listener_addresses(Listener)],
ok.
-stop_listener0({IPAddress, Port, _Family}) ->
+stop_tcp_listener0({IPAddress, Port, _Family}) ->
Name = rabbit_misc:tcp_name(rabbit_tcp_listener_sup, IPAddress, Port),
ok = supervisor:terminate_child(rabbit_sup, Name),
ok = supervisor:delete_child(rabbit_sup, Name).
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 22b0c280..ca843e14 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -25,6 +25,7 @@
update_cluster_status/0, reset_cluster_status/0]).
-export([notify_node_up/0, notify_joined_cluster/0, notify_left_cluster/1]).
-export([partitions/0, partitions/1, subscribe/1]).
+-export([pause_minority_guard/0]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
@@ -60,6 +61,7 @@
-spec(partitions/0 :: () -> [node()]).
-spec(partitions/1 :: ([node()]) -> [{node(), [node()]}]).
-spec(subscribe/1 :: (pid()) -> 'ok').
+-spec(pause_minority_guard/0 :: () -> 'ok' | 'pausing').
-spec(all_rabbit_nodes_up/0 :: () -> boolean()).
-spec(run_outside_applications/1 :: (fun (() -> any())) -> pid()).
@@ -193,6 +195,47 @@ subscribe(Pid) ->
gen_server:cast(?SERVER, {subscribe, Pid}).
%%----------------------------------------------------------------------------
+%% pause_minority safety
+%%----------------------------------------------------------------------------
+
+%% If we are in a minority and pause_minority mode then a) we are
+%% going to shut down imminently and b) we should not confirm anything
+%% until then, since anything we confirm is likely to be lost.
+%%
+%% We could confirm something by having an HA queue see the minority
+%% state (and fail over into it) before the node monitor stops us, or
+%% by using unmirrored queues and just having them vanish (and
+%% confiming messages as thrown away).
+%%
+%% So we have channels call in here before issuing confirms, to do a
+%% lightweight check that we have not entered a minority state.
+
+pause_minority_guard() ->
+ case get(pause_minority_guard) of
+ not_minority_mode ->
+ ok;
+ undefined ->
+ {ok, M} = application:get_env(rabbit, cluster_partition_handling),
+ case M of
+ pause_minority -> pause_minority_guard([]);
+ _ -> put(pause_minority_guard, not_minority_mode),
+ ok
+ end;
+ {minority_mode, Nodes} ->
+ pause_minority_guard(Nodes)
+ end.
+
+pause_minority_guard(LastNodes) ->
+ case nodes() of
+ LastNodes -> ok;
+ _ -> put(pause_minority_guard, {minority_mode, nodes()}),
+ case majority() of
+ false -> pausing;
+ true -> ok
+ end
+ end.
+
+%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------
@@ -362,7 +405,6 @@ await_cluster_recovery() ->
rabbit_log:warning("Cluster minority status detected - awaiting recovery~n",
[]),
run_outside_applications(fun () ->
- rabbit_networking:killall(),
rabbit:stop(),
wait_for_cluster_recovery()
end),