diff options
author | Tim Watson <tim@rabbitmq.com> | 2013-10-10 10:40:54 +0100 |
---|---|---|
committer | Tim Watson <tim@rabbitmq.com> | 2013-10-10 10:40:54 +0100 |
commit | c48fa989303bd235f16c60168988818426a7d7c0 (patch) | |
tree | bc8ff91995092587bb734cf4d4681f1a257b6eb3 | |
parent | 43eaf1fd45f55a7cfaf9618bc3622982d03109d5 (diff) | |
download | rabbitmq-server-c48fa989303bd235f16c60168988818426a7d7c0.tar.gz |
Unify the intermediate supervision tree under r_connection_helper_sup
This deprecates r_intermediate_sup and transitions the chan_sup_sup and
queue collector processes into the helper sup, as well as providing an
API for starting both.
-rw-r--r-- | src/rabbit_connection_helper_sup.erl | 16 | ||||
-rw-r--r-- | src/rabbit_connection_sup.erl | 18 | ||||
-rw-r--r-- | src/rabbit_intermediate_sup.erl | 39 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 48 |
4 files changed, 43 insertions, 78 deletions
diff --git a/src/rabbit_connection_helper_sup.erl b/src/rabbit_connection_helper_sup.erl index 8f6c7698..e51615e8 100644 --- a/src/rabbit_connection_helper_sup.erl +++ b/src/rabbit_connection_helper_sup.erl @@ -19,7 +19,9 @@ -behaviour(supervisor2). -export([start_link/0]). --export([start_queue_collector/1]). +-export([start_channel_sup_sup/1, + start_queue_collector/1]). + -export([init/1]). -include("rabbit.hrl"). @@ -28,6 +30,7 @@ -ifdef(use_specs). -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). +-spec(start_channel_sup_sup/1 :: (pid()) -> rabbit_types:ok_pid_or_error()). -spec(start_queue_collector/1 :: (pid()) -> rabbit_types:ok_pid_or_error()). -endif. @@ -36,15 +39,20 @@ start_link() -> supervisor2:start_link(?MODULE, []). +start_channel_sup_sup(SupPid) -> + supervisor2:start_child( + SupPid, + {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []}, + intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}). + start_queue_collector(SupPid) -> supervisor2:start_child( SupPid, {collector, {rabbit_queue_collector, start_link, []}, intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}). -%%-------------------------------------------------------------------------- +%%---------------------------------------------------------------------------- init([]) -> - {ok, {{one_for_all, 0, 1}, []}}. - + {ok, {{one_for_one, 10, 10}, []}}. diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl index 1f4ab19c..0c6dce76 100644 --- a/src/rabbit_connection_sup.erl +++ b/src/rabbit_connection_sup.erl @@ -42,22 +42,22 @@ start_link() -> %% them cleanly. But for 1.0 readers we can't start the real %% ch_sup_sup (because we don't know if we will be 0-9-1 or 1.0) - %% so we add another supervisor into the hierarchy. - {ok, ChannelSup3Pid} = + %% + %% This supervisor also acts as an intermediary for heartbeaters and + %% the queue collector process, since these must not be siblings of the + %% reader due to the potential for deadlock if they are added/restarted + %% whilst the supervision tree is shutting down. + {ok, IntermediateSup} = supervisor2:start_child( SupPid, - {channel_sup3, {rabbit_intermediate_sup, start_link, []}, - intrinsic, infinity, supervisor, [rabbit_intermediate_sup]}), - {ok, ConHelperSupPid} = - supervisor2:start_child( - SupPid, - {helper_sup, {rabbit_connection_helper_sup, start_link, []}, + {channel_sup3, {rabbit_connection_helper_sup, start_link, []}, intrinsic, infinity, supervisor, [rabbit_connection_helper_sup]}), {ok, ReaderPid} = supervisor2:start_child( SupPid, {reader, {rabbit_reader, start_link, - [ChannelSup3Pid, ConHelperSupPid, - rabbit_heartbeat:start_heartbeat_fun(ConHelperSupPid)]}, + [IntermediateSup, + rabbit_heartbeat:start_heartbeat_fun(IntermediateSup)]}, intrinsic, ?MAX_WAIT, worker, [rabbit_reader]}), {ok, SupPid, ReaderPid}. diff --git a/src/rabbit_intermediate_sup.erl b/src/rabbit_intermediate_sup.erl deleted file mode 100644 index a9381f20..00000000 --- a/src/rabbit_intermediate_sup.erl +++ /dev/null @@ -1,39 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License -%% at http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and -%% limitations under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2013-2013 GoPivotal, Inc. All rights reserved. -%% - --module(rabbit_intermediate_sup). - --behaviour(supervisor2). - --export([start_link/0]). - --export([init/1]). - -%%---------------------------------------------------------------------------- - --ifdef(use_specs). --spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). --endif. - -%%---------------------------------------------------------------------------- - -start_link() -> - supervisor2:start_link(?MODULE, []). - -%%---------------------------------------------------------------------------- - -init([]) -> - {ok, {{one_for_one, 10, 10}, []}}. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 9e81511a..574bf882 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -18,12 +18,12 @@ -include("rabbit_framing.hrl"). -include("rabbit.hrl"). --export([start_link/3, info_keys/0, info/1, info/2, force_event_refresh/1, +-export([start_link/2, info_keys/0, info/1, info/2, force_event_refresh/1, shutdown/2]). -export([system_continue/3, system_terminate/4, system_code_change/4]). --export([init/4, mainloop/2, recvloop/2]). +-export([init/3, mainloop/2, recvloop/2]). -export([conserve_resources/3, server_properties/1]). @@ -36,8 +36,8 @@ %%-------------------------------------------------------------------------- -record(v1, {parent, sock, connection, callback, recv_len, pending_recv, - connection_state, helper_sup_pid, queue_collector, heartbeater, - stats_timer, ch_sup3_pid, channel_sup_sup_pid, start_heartbeat_fun, + connection_state, helper_sup, queue_collector, heartbeater, + stats_timer, channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len, throttle}). -record(connection, {name, host, peer_host, port, peer_port, @@ -74,7 +74,7 @@ -ifdef(use_specs). --spec(start_link/3 :: (pid(), pid(), rabbit_heartbeat:start_heartbeat_fun()) -> +-spec(start_link/2 :: (pid(), rabbit_heartbeat:start_heartbeat_fun()) -> rabbit_types:ok(pid())). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). -spec(info/1 :: (pid()) -> rabbit_types:infos()). @@ -86,10 +86,10 @@ rabbit_framing:amqp_table()). %% These specs only exists to add no_return() to keep dialyzer happy --spec(init/4 :: (pid(), pid(), pid(), rabbit_heartbeat:start_heartbeat_fun()) +-spec(init/3 :: (pid(), pid(), rabbit_heartbeat:start_heartbeat_fun()) -> no_return()). --spec(start_connection/7 :: - (pid(), pid(), pid(), rabbit_heartbeat:start_heartbeat_fun(), any(), +-spec(start_connection/6 :: + (pid(), pid(), rabbit_heartbeat:start_heartbeat_fun(), any(), rabbit_net:socket(), fun ((rabbit_net:socket()) -> rabbit_types:ok_or_error2( @@ -104,19 +104,19 @@ %%-------------------------------------------------------------------------- -start_link(ChannelSup3Pid, HelperPid, StartHeartbeatFun) -> - {ok, proc_lib:spawn_link(?MODULE, init, [self(), ChannelSup3Pid, - HelperPid, StartHeartbeatFun])}. +start_link(HelperSup, StartHeartbeatFun) -> + {ok, proc_lib:spawn_link(?MODULE, init, [self(), HelperSup, + StartHeartbeatFun])}. shutdown(Pid, Explanation) -> gen_server:call(Pid, {shutdown, Explanation}, infinity). -init(Parent, ChSup3Pid, HelperPid, StartHeartbeatFun) -> +init(Parent, HelperSup, StartHeartbeatFun) -> Deb = sys:debug_options([]), receive {go, Sock, SockTransform} -> start_connection( - Parent, ChSup3Pid, HelperPid, StartHeartbeatFun, Deb, Sock, + Parent, HelperSup, StartHeartbeatFun, Deb, Sock, SockTransform) end. @@ -205,8 +205,8 @@ socket_op(Sock, Fun) -> exit(normal) end. -start_connection(Parent, ChSup3Pid, HelperPid, StartHeartbeatFun, Deb, - Sock, SockTransform) -> +start_connection(Parent, HelperSup, StartHeartbeatFun, + Deb, Sock, SockTransform) -> process_flag(trap_exit, true), Name = case rabbit_net:connection_string(Sock, inbound) of {ok, Str} -> Str; @@ -243,9 +243,8 @@ start_connection(Parent, ChSup3Pid, HelperPid, StartHeartbeatFun, Deb, pending_recv = false, connection_state = pre_init, queue_collector = undefined, %% started on tune-ok - helper_sup_pid = HelperPid, + helper_sup = HelperSup, heartbeater = none, - ch_sup3_pid = ChSup3Pid, channel_sup_sup_pid = none, start_heartbeat_fun = StartHeartbeatFun, buf = [], @@ -852,7 +851,7 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, heartbeat = ClientHeartbeat}, State = #v1{connection_state = tuning, connection = Connection, - helper_sup_pid = HelperPid, + helper_sup = SupPid, sock = Sock, start_heartbeat_fun = SHF}) -> ServerFrameMax = server_frame_max(), @@ -866,7 +865,7 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, [FrameMax, ServerFrameMax]); true -> {ok, Collector} = - rabbit_connection_helper_sup:start_queue_collector(HelperPid), + rabbit_connection_helper_sup:start_queue_collector(SupPid), Frame = rabbit_binary_generator:build_heartbeat_frame(), SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end, Parent = self(), @@ -886,7 +885,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, connection = Connection = #connection{ user = User, protocol = Protocol}, - ch_sup3_pid = ChSup3Pid, + helper_sup = SupPid, sock = Sock, throttle = Throttle}) -> ok = rabbit_access_control:check_vhost_access(User, VHostPath), @@ -895,10 +894,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}), Throttle1 = Throttle#throttle{alarmed_by = Conserve}, {ok, ChannelSupSupPid} = - supervisor2:start_child( - ChSup3Pid, - {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []}, - intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}), + rabbit_connection_helper_sup:start_channel_sup_sup(SupPid), State1 = control_throttle( State#v1{connection_state = running, connection = NewConnection, @@ -1112,9 +1108,9 @@ pack_for_1_0(#v1{parent = Parent, recv_len = RecvLen, pending_recv = PendingRecv, queue_collector = QueueCollector, - ch_sup3_pid = ChSup3Pid, + helper_sup = SupPid, start_heartbeat_fun = SHF, buf = Buf, buf_len = BufLen}) -> - {Parent, Sock, RecvLen, PendingRecv, QueueCollector, ChSup3Pid, SHF, + {Parent, Sock, RecvLen, PendingRecv, QueueCollector, SupPid, SHF, Buf, BufLen}. |