diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-07-22 13:34:31 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-07-22 13:34:31 +0100 |
commit | 0d53507cdf5dedcad354e82355618052613588b0 (patch) | |
tree | 0788baffc117f4f8c765b4f71e32076886efc834 | |
parent | 01e7b5198c0ea4ca167f15af0dfb9fd884f7b285 (diff) | |
download | rabbitmq-server-0d53507cdf5dedcad354e82355618052613588b0.tar.gz |
Before sending confirms, check whether pause_minority mode is about to cause a shutdown. This way we can get rid of the effort to kill all network connections ASAP.
-rw-r--r-- | src/rabbit_channel.erl | 23 | ||||
-rw-r--r-- | src/rabbit_networking.erl | 31 | ||||
-rw-r--r-- | src/rabbit_node_monitor.erl | 44 |
3 files changed, 64 insertions, 34 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 74f9cacf..27a7d208 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1565,15 +1565,22 @@ send_nacks(_, State) -> send_confirms(State = #ch{tx = none, confirmed = []}) -> State; send_confirms(State = #ch{tx = none, confirmed = C}) -> - MsgSeqNos = - lists:foldl( - fun ({MsgSeqNo, XName}, MSNs) -> - ?INCR_STATS([{exchange_stats, XName, 1}], confirm, State), - [MsgSeqNo | MSNs] - end, [], lists:append(C)), - send_confirms(MsgSeqNos, State#ch{confirmed = []}); + case rabbit_node_monitor:pause_minority_guard() of + ok -> MsgSeqNos = + lists:foldl( + fun ({MsgSeqNo, XName}, MSNs) -> + ?INCR_STATS([{exchange_stats, XName, 1}], + confirm, State), + [MsgSeqNo | MSNs] + end, [], lists:append(C)), + send_confirms(MsgSeqNos, State#ch{confirmed = []}); + pausing -> State#ch{confirmed = []} + end; send_confirms(State) -> - maybe_complete_tx(State). + case rabbit_node_monitor:pause_minority_guard() of + ok -> maybe_complete_tx(State); + pausing -> State + end. send_confirms([], State) -> State; diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 0791bbe2..9082dbd3 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -16,8 +16,8 @@ -module(rabbit_networking). --export([boot/0, start/0, killall/0, start_tcp_listener/1, start_ssl_listener/2, - on_node_down/1, active_listeners/0, +-export([boot/0, start/0, start_tcp_listener/1, start_ssl_listener/2, + stop_tcp_listener/1, on_node_down/1, active_listeners/0, node_listeners/1, register_connection/1, unregister_connection/1, connections/0, connection_info_keys/0, connection_info/1, connection_info/2, @@ -60,10 +60,10 @@ -type(label() :: string()). -spec(start/0 :: () -> 'ok'). --spec(killall/0 :: () -> 'ok'). -spec(start_tcp_listener/1 :: (listener_config()) -> 'ok'). -spec(start_ssl_listener/2 :: (listener_config(), rabbit_types:infos()) -> 'ok'). +-spec(stop_tcp_listener/1 :: (listener_config()) -> 'ok'). 
-spec(active_listeners/0 :: () -> [rabbit_types:listener()]). -spec(node_listeners/1 :: (node()) -> [rabbit_types:listener()]). -spec(register_connection/1 :: (pid()) -> ok). @@ -145,25 +145,6 @@ start() -> rabbit_sup:start_supervisor_child( [{local, rabbit_tcp_client_sup}, {rabbit_connection_sup,start_link,[]}]). -%% We are going to stop for pause-minority, so we are already -%% compromised; anything we confirm from now on is not going to be -%% remembered after we come back. Since rabbit:stop/0 may take a while -%% to gracefully shut down, we should stop talking to the outside -%% world *immediately*. -killall() -> - %% Stop ASAP - kill_connections(), - {ok, TCPListeners} = application:get_env(rabbit, tcp_listeners), - {ok, SSLListeners} = application:get_env(rabbit, ssl_listeners), - [stop_listener(L) || L <- TCPListeners ++ SSLListeners], - %% In case anything reconnected while we were stopping listeners - kill_connections(), - ok. - -kill_connections() -> - Conns = connections_local() ++ rabbit_direct:list_local(), - [exit(P, kill) || P <- Conns]. - ensure_ssl() -> {ok, SslAppsConfig} = application:get_env(rabbit, ssl_apps), ok = app_utils:start_applications(SslAppsConfig), @@ -264,12 +245,12 @@ start_listener0(Address, Protocol, Label, OnConnect) -> {rabbit_misc:ntoa(IPAddress), Port}}) end. -stop_listener(Listener) -> - [stop_listener0(Address) || +stop_tcp_listener(Listener) -> + [stop_tcp_listener0(Address) || Address <- tcp_listener_addresses(Listener)], ok. -stop_listener0({IPAddress, Port, _Family}) -> +stop_tcp_listener0({IPAddress, Port, _Family}) -> Name = rabbit_misc:tcp_name(rabbit_tcp_listener_sup, IPAddress, Port), ok = supervisor:terminate_child(rabbit_sup, Name), ok = supervisor:delete_child(rabbit_sup, Name). 
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 22b0c280..ca843e14 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -25,6 +25,7 @@ update_cluster_status/0, reset_cluster_status/0]). -export([notify_node_up/0, notify_joined_cluster/0, notify_left_cluster/1]). -export([partitions/0, partitions/1, subscribe/1]). +-export([pause_minority_guard/0]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, @@ -60,6 +61,7 @@ -spec(partitions/0 :: () -> [node()]). -spec(partitions/1 :: ([node()]) -> [{node(), [node()]}]). -spec(subscribe/1 :: (pid()) -> 'ok'). +-spec(pause_minority_guard/0 :: () -> 'ok' | 'pausing'). -spec(all_rabbit_nodes_up/0 :: () -> boolean()). -spec(run_outside_applications/1 :: (fun (() -> any())) -> pid()). @@ -193,6 +195,47 @@ subscribe(Pid) -> gen_server:cast(?SERVER, {subscribe, Pid}). %%---------------------------------------------------------------------------- +%% pause_minority safety +%%---------------------------------------------------------------------------- + +%% If we are in a minority and pause_minority mode then a) we are +%% going to shut down imminently and b) we should not confirm anything +%% until then, since anything we confirm is likely to be lost. +%% +%% We could confirm something by having an HA queue see the minority +%% state (and fail over into it) before the node monitor stops us, or +%% by using unmirrored queues and just having them vanish (and +%% confirming messages as thrown away). +%% +%% So we have channels call in here before issuing confirms, to do a +%% lightweight check that we have not entered a minority state. 
+ +pause_minority_guard() -> + case get(pause_minority_guard) of + not_minority_mode -> + ok; + undefined -> + {ok, M} = application:get_env(rabbit, cluster_partition_handling), + case M of + pause_minority -> pause_minority_guard([]); + _ -> put(pause_minority_guard, not_minority_mode), + ok + end; + {minority_mode, Nodes} -> + pause_minority_guard(Nodes) + end. + +pause_minority_guard(LastNodes) -> + case nodes() of + LastNodes -> ok; + _ -> put(pause_minority_guard, {minority_mode, nodes()}), + case majority() of + false -> pausing; + true -> ok + end + end. + +%%---------------------------------------------------------------------------- %% gen_server callbacks %%---------------------------------------------------------------------------- @@ -362,7 +405,6 @@ await_cluster_recovery() -> rabbit_log:warning("Cluster minority status detected - awaiting recovery~n", []), run_outside_applications(fun () -> - rabbit_networking:killall(), rabbit:stop(), wait_for_cluster_recovery() end), |