summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTim Watson <tim@rabbitmq.com>2013-10-10 10:40:54 +0100
committerTim Watson <tim@rabbitmq.com>2013-10-10 10:40:54 +0100
commitc48fa989303bd235f16c60168988818426a7d7c0 (patch)
treebc8ff91995092587bb734cf4d4681f1a257b6eb3
parent43eaf1fd45f55a7cfaf9618bc3622982d03109d5 (diff)
downloadrabbitmq-server-c48fa989303bd235f16c60168988818426a7d7c0.tar.gz
Unify the intermediate supervision tree under r_connection_helper_sup
This deprecates r_intermediate_sup and transitions the chan_sup_sup and queue collector processes into the helper sup, as well as providing an API for starting both.
-rw-r--r--src/rabbit_connection_helper_sup.erl16
-rw-r--r--src/rabbit_connection_sup.erl18
-rw-r--r--src/rabbit_intermediate_sup.erl39
-rw-r--r--src/rabbit_reader.erl48
4 files changed, 43 insertions, 78 deletions
diff --git a/src/rabbit_connection_helper_sup.erl b/src/rabbit_connection_helper_sup.erl
index 8f6c7698..e51615e8 100644
--- a/src/rabbit_connection_helper_sup.erl
+++ b/src/rabbit_connection_helper_sup.erl
@@ -19,7 +19,9 @@
-behaviour(supervisor2).
-export([start_link/0]).
--export([start_queue_collector/1]).
+-export([start_channel_sup_sup/1,
+ start_queue_collector/1]).
+
-export([init/1]).
-include("rabbit.hrl").
@@ -28,6 +30,7 @@
-ifdef(use_specs).
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
+-spec(start_channel_sup_sup/1 :: (pid()) -> rabbit_types:ok_pid_or_error()).
-spec(start_queue_collector/1 :: (pid()) -> rabbit_types:ok_pid_or_error()).
-endif.
@@ -36,15 +39,20 @@
start_link() ->
supervisor2:start_link(?MODULE, []).
+start_channel_sup_sup(SupPid) ->
+ supervisor2:start_child(
+ SupPid,
+ {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []},
+ intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}).
+
start_queue_collector(SupPid) ->
supervisor2:start_child(
SupPid,
{collector, {rabbit_queue_collector, start_link, []},
intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}).
-%%--------------------------------------------------------------------------
+%%----------------------------------------------------------------------------
init([]) ->
- {ok, {{one_for_all, 0, 1}, []}}.
-
+ {ok, {{one_for_one, 10, 10}, []}}.
diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl
index 1f4ab19c..0c6dce76 100644
--- a/src/rabbit_connection_sup.erl
+++ b/src/rabbit_connection_sup.erl
@@ -42,22 +42,22 @@ start_link() ->
%% them cleanly. But for 1.0 readers we can't start the real
%% ch_sup_sup (because we don't know if we will be 0-9-1 or 1.0) -
%% so we add another supervisor into the hierarchy.
- {ok, ChannelSup3Pid} =
+ %%
+ %% This supervisor also acts as an intermediary for heartbeaters and
+ %% the queue collector process, since these must not be siblings of the
+ %% reader due to the potential for deadlock if they are added/restarted
+ %% whilst the supervision tree is shutting down.
+ {ok, IntermediateSup} =
supervisor2:start_child(
SupPid,
- {channel_sup3, {rabbit_intermediate_sup, start_link, []},
- intrinsic, infinity, supervisor, [rabbit_intermediate_sup]}),
- {ok, ConHelperSupPid} =
- supervisor2:start_child(
- SupPid,
- {helper_sup, {rabbit_connection_helper_sup, start_link, []},
+ {channel_sup3, {rabbit_connection_helper_sup, start_link, []},
intrinsic, infinity, supervisor, [rabbit_connection_helper_sup]}),
{ok, ReaderPid} =
supervisor2:start_child(
SupPid,
{reader, {rabbit_reader, start_link,
- [ChannelSup3Pid, ConHelperSupPid,
- rabbit_heartbeat:start_heartbeat_fun(ConHelperSupPid)]},
+ [IntermediateSup,
+ rabbit_heartbeat:start_heartbeat_fun(IntermediateSup)]},
intrinsic, ?MAX_WAIT, worker, [rabbit_reader]}),
{ok, SupPid, ReaderPid}.
diff --git a/src/rabbit_intermediate_sup.erl b/src/rabbit_intermediate_sup.erl
deleted file mode 100644
index a9381f20..00000000
--- a/src/rabbit_intermediate_sup.erl
+++ /dev/null
@@ -1,39 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
-%% at http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and
-%% limitations under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is GoPivotal, Inc.
-%% Copyright (c) 2013-2013 GoPivotal, Inc. All rights reserved.
-%%
-
--module(rabbit_intermediate_sup).
-
--behaviour(supervisor2).
-
--export([start_link/0]).
-
--export([init/1]).
-
-%%----------------------------------------------------------------------------
-
--ifdef(use_specs).
--spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
--endif.
-
-%%----------------------------------------------------------------------------
-
-start_link() ->
- supervisor2:start_link(?MODULE, []).
-
-%%----------------------------------------------------------------------------
-
-init([]) ->
- {ok, {{one_for_one, 10, 10}, []}}.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 9e81511a..574bf882 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -18,12 +18,12 @@
-include("rabbit_framing.hrl").
-include("rabbit.hrl").
--export([start_link/3, info_keys/0, info/1, info/2, force_event_refresh/1,
+-export([start_link/2, info_keys/0, info/1, info/2, force_event_refresh/1,
shutdown/2]).
-export([system_continue/3, system_terminate/4, system_code_change/4]).
--export([init/4, mainloop/2, recvloop/2]).
+-export([init/3, mainloop/2, recvloop/2]).
-export([conserve_resources/3, server_properties/1]).
@@ -36,8 +36,8 @@
%%--------------------------------------------------------------------------
-record(v1, {parent, sock, connection, callback, recv_len, pending_recv,
- connection_state, helper_sup_pid, queue_collector, heartbeater,
- stats_timer, ch_sup3_pid, channel_sup_sup_pid, start_heartbeat_fun,
+ connection_state, helper_sup, queue_collector, heartbeater,
+ stats_timer, channel_sup_sup_pid, start_heartbeat_fun,
buf, buf_len, throttle}).
-record(connection, {name, host, peer_host, port, peer_port,
@@ -74,7 +74,7 @@
-ifdef(use_specs).
--spec(start_link/3 :: (pid(), pid(), rabbit_heartbeat:start_heartbeat_fun()) ->
+-spec(start_link/2 :: (pid(), rabbit_heartbeat:start_heartbeat_fun()) ->
rabbit_types:ok(pid())).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(info/1 :: (pid()) -> rabbit_types:infos()).
@@ -86,10 +86,10 @@
rabbit_framing:amqp_table()).
%% These specs only exists to add no_return() to keep dialyzer happy
--spec(init/4 :: (pid(), pid(), pid(), rabbit_heartbeat:start_heartbeat_fun())
+-spec(init/3 :: (pid(), pid(), rabbit_heartbeat:start_heartbeat_fun())
-> no_return()).
--spec(start_connection/7 ::
- (pid(), pid(), pid(), rabbit_heartbeat:start_heartbeat_fun(), any(),
+-spec(start_connection/6 ::
+ (pid(), pid(), rabbit_heartbeat:start_heartbeat_fun(), any(),
rabbit_net:socket(),
fun ((rabbit_net:socket()) ->
rabbit_types:ok_or_error2(
@@ -104,19 +104,19 @@
%%--------------------------------------------------------------------------
-start_link(ChannelSup3Pid, HelperPid, StartHeartbeatFun) ->
- {ok, proc_lib:spawn_link(?MODULE, init, [self(), ChannelSup3Pid,
- HelperPid, StartHeartbeatFun])}.
+start_link(HelperSup, StartHeartbeatFun) ->
+ {ok, proc_lib:spawn_link(?MODULE, init, [self(), HelperSup,
+ StartHeartbeatFun])}.
shutdown(Pid, Explanation) ->
gen_server:call(Pid, {shutdown, Explanation}, infinity).
-init(Parent, ChSup3Pid, HelperPid, StartHeartbeatFun) ->
+init(Parent, HelperSup, StartHeartbeatFun) ->
Deb = sys:debug_options([]),
receive
{go, Sock, SockTransform} ->
start_connection(
- Parent, ChSup3Pid, HelperPid, StartHeartbeatFun, Deb, Sock,
+ Parent, HelperSup, StartHeartbeatFun, Deb, Sock,
SockTransform)
end.
@@ -205,8 +205,8 @@ socket_op(Sock, Fun) ->
exit(normal)
end.
-start_connection(Parent, ChSup3Pid, HelperPid, StartHeartbeatFun, Deb,
- Sock, SockTransform) ->
+start_connection(Parent, HelperSup, StartHeartbeatFun,
+ Deb, Sock, SockTransform) ->
process_flag(trap_exit, true),
Name = case rabbit_net:connection_string(Sock, inbound) of
{ok, Str} -> Str;
@@ -243,9 +243,8 @@ start_connection(Parent, ChSup3Pid, HelperPid, StartHeartbeatFun, Deb,
pending_recv = false,
connection_state = pre_init,
queue_collector = undefined, %% started on tune-ok
- helper_sup_pid = HelperPid,
+ helper_sup = HelperSup,
heartbeater = none,
- ch_sup3_pid = ChSup3Pid,
channel_sup_sup_pid = none,
start_heartbeat_fun = StartHeartbeatFun,
buf = [],
@@ -852,7 +851,7 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
heartbeat = ClientHeartbeat},
State = #v1{connection_state = tuning,
connection = Connection,
- helper_sup_pid = HelperPid,
+ helper_sup = SupPid,
sock = Sock,
start_heartbeat_fun = SHF}) ->
ServerFrameMax = server_frame_max(),
@@ -866,7 +865,7 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
[FrameMax, ServerFrameMax]);
true ->
{ok, Collector} =
- rabbit_connection_helper_sup:start_queue_collector(HelperPid),
+ rabbit_connection_helper_sup:start_queue_collector(SupPid),
Frame = rabbit_binary_generator:build_heartbeat_frame(),
SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end,
Parent = self(),
@@ -886,7 +885,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
connection = Connection = #connection{
user = User,
protocol = Protocol},
- ch_sup3_pid = ChSup3Pid,
+ helper_sup = SupPid,
sock = Sock,
throttle = Throttle}) ->
ok = rabbit_access_control:check_vhost_access(User, VHostPath),
@@ -895,10 +894,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
Throttle1 = Throttle#throttle{alarmed_by = Conserve},
{ok, ChannelSupSupPid} =
- supervisor2:start_child(
- ChSup3Pid,
- {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []},
- intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}),
+ rabbit_connection_helper_sup:start_channel_sup_sup(SupPid),
State1 = control_throttle(
State#v1{connection_state = running,
connection = NewConnection,
@@ -1112,9 +1108,9 @@ pack_for_1_0(#v1{parent = Parent,
recv_len = RecvLen,
pending_recv = PendingRecv,
queue_collector = QueueCollector,
- ch_sup3_pid = ChSup3Pid,
+ helper_sup = SupPid,
start_heartbeat_fun = SHF,
buf = Buf,
buf_len = BufLen}) ->
- {Parent, Sock, RecvLen, PendingRecv, QueueCollector, ChSup3Pid, SHF,
+ {Parent, Sock, RecvLen, PendingRecv, QueueCollector, SupPid, SHF,
Buf, BufLen}.