summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-10-14 14:13:03 +0100
committerSimon MacMullen <simon@rabbitmq.com>2013-10-14 14:13:03 +0100
commitb0300ecc3e0f75edabb201840084f7b9dbfc43ed (patch)
treeff2ff6c1d61f8d39e0185551de3853600dfed833
parentb725d82cd5be0dbb7f8fe8c7f516d3853ecb79ff (diff)
parent8a167cc42cf7ad9ba8cac09668ccb79eed484227 (diff)
downloadrabbitmq-server-b0300ecc3e0f75edabb201840084f7b9dbfc43ed.tar.gz
merge bug25404
-rw-r--r--src/rabbit_connection_helper_sup.erl (renamed from src/rabbit_intermediate_sup.erl)23
-rw-r--r--src/rabbit_connection_sup.erl20
-rw-r--r--src/rabbit_heartbeat.erl40
-rw-r--r--src/rabbit_reader.erl65
4 files changed, 74 insertions, 74 deletions
diff --git a/src/rabbit_intermediate_sup.erl b/src/rabbit_connection_helper_sup.erl
index a9381f20..e51615e8 100644
--- a/src/rabbit_intermediate_sup.erl
+++ b/src/rabbit_connection_helper_sup.erl
@@ -11,21 +11,27 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
-%% Copyright (c) 2013-2013 GoPivotal, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved.
%%
--module(rabbit_intermediate_sup).
+-module(rabbit_connection_helper_sup).
-behaviour(supervisor2).
-export([start_link/0]).
+-export([start_channel_sup_sup/1,
+ start_queue_collector/1]).
-export([init/1]).
+-include("rabbit.hrl").
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
+-spec(start_channel_sup_sup/1 :: (pid()) -> rabbit_types:ok_pid_or_error()).
+-spec(start_queue_collector/1 :: (pid()) -> rabbit_types:ok_pid_or_error()).
-endif.
%%----------------------------------------------------------------------------
@@ -33,7 +39,20 @@
start_link() ->
supervisor2:start_link(?MODULE, []).
+start_channel_sup_sup(SupPid) ->
+ supervisor2:start_child(
+ SupPid,
+ {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []},
+ intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}).
+
+start_queue_collector(SupPid) ->
+ supervisor2:start_child(
+ SupPid,
+ {collector, {rabbit_queue_collector, start_link, []},
+ intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}).
+
%%----------------------------------------------------------------------------
init([]) ->
{ok, {{one_for_one, 10, 10}, []}}.
+
diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl
index c1fa17aa..9ed5dc77 100644
--- a/src/rabbit_connection_sup.erl
+++ b/src/rabbit_connection_sup.erl
@@ -37,27 +37,25 @@
start_link() ->
{ok, SupPid} = supervisor2:start_link(?MODULE, []),
- {ok, Collector} =
- supervisor2:start_child(
- SupPid,
- {collector, {rabbit_queue_collector, start_link, []},
- intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}),
%% We need to get channels in the hierarchy here so they get shut
%% down after the reader, so the reader gets a chance to terminate
%% them cleanly. But for 1.0 readers we can't start the real
%% ch_sup_sup (because we don't know if we will be 0-9-1 or 1.0) -
%% so we add another supervisor into the hierarchy.
- {ok, ChannelSup3Pid} =
+ %%
+ %% This supervisor also acts as an intermediary for heartbeaters and
+ %% the queue collector process, since these must not be siblings of the
+ %% reader due to the potential for deadlock if they are added/restarted
+ %% whilst the supervision tree is shutting down.
+ {ok, HelperSup} =
supervisor2:start_child(
SupPid,
- {channel_sup3, {rabbit_intermediate_sup, start_link, []},
- intrinsic, infinity, supervisor, [rabbit_intermediate_sup]}),
+ {helper_sup, {rabbit_connection_helper_sup, start_link, []},
+ intrinsic, infinity, supervisor, [rabbit_connection_helper_sup]}),
{ok, ReaderPid} =
supervisor2:start_child(
SupPid,
- {reader, {rabbit_reader, start_link,
- [ChannelSup3Pid, Collector,
- rabbit_heartbeat:start_heartbeat_fun(SupPid)]},
+ {reader, {rabbit_reader, start_link, [HelperSup]},
intrinsic, ?MAX_WAIT, worker, [rabbit_reader]}),
{ok, SupPid, ReaderPid}.
diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl
index fac74edb..ca67254b 100644
--- a/src/rabbit_heartbeat.erl
+++ b/src/rabbit_heartbeat.erl
@@ -16,8 +16,9 @@
-module(rabbit_heartbeat).
+-export([start/6]).
-export([start_heartbeat_sender/3, start_heartbeat_receiver/3,
- start_heartbeat_fun/1, pause_monitor/1, resume_monitor/1]).
+ pause_monitor/1, resume_monitor/1]).
-export([system_continue/3, system_terminate/4, system_code_change/4]).
@@ -28,16 +29,15 @@
-ifdef(use_specs).
-export_type([heartbeaters/0]).
--export_type([start_heartbeat_fun/0]).
-type(heartbeaters() :: {rabbit_types:maybe(pid()), rabbit_types:maybe(pid())}).
-type(heartbeat_callback() :: fun (() -> any())).
--type(start_heartbeat_fun() ::
- fun((rabbit_net:socket(), non_neg_integer(), heartbeat_callback(),
- non_neg_integer(), heartbeat_callback()) ->
- no_return())).
+-spec(start/6 ::
+ (pid(), rabbit_net:socket(),
+ non_neg_integer(), heartbeat_callback(),
+ non_neg_integer(), heartbeat_callback()) -> heartbeaters()).
-spec(start_heartbeat_sender/3 ::
(rabbit_net:socket(), non_neg_integer(), heartbeat_callback()) ->
@@ -46,10 +46,6 @@
(rabbit_net:socket(), non_neg_integer(), heartbeat_callback()) ->
rabbit_types:ok(pid())).
--spec(start_heartbeat_fun/1 ::
- (pid()) -> start_heartbeat_fun()).
-
-
-spec(pause_monitor/1 :: (heartbeaters()) -> 'ok').
-spec(resume_monitor/1 :: (heartbeaters()) -> 'ok').
@@ -61,6 +57,17 @@
%%----------------------------------------------------------------------------
+start(SupPid, Sock, SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) ->
+ {ok, Sender} =
+ start_heartbeater(SendTimeoutSec, SupPid, Sock,
+ SendFun, heartbeat_sender,
+ start_heartbeat_sender),
+ {ok, Receiver} =
+ start_heartbeater(ReceiveTimeoutSec, SupPid, Sock,
+ ReceiveFun, heartbeat_receiver,
+ start_heartbeat_receiver),
+ {Sender, Receiver}.
+
start_heartbeat_sender(Sock, TimeoutSec, SendFun) ->
%% the 'div 2' is there so that we don't end up waiting for nearly
%% 2 * TimeoutSec before sending a heartbeat in the boundary case
@@ -75,19 +82,6 @@ start_heartbeat_receiver(Sock, TimeoutSec, ReceiveFun) ->
heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1,
fun () -> ReceiveFun(), stop end}).
-start_heartbeat_fun(SupPid) ->
- fun (Sock, SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) ->
- {ok, Sender} =
- start_heartbeater(SendTimeoutSec, SupPid, Sock,
- SendFun, heartbeat_sender,
- start_heartbeat_sender),
- {ok, Receiver} =
- start_heartbeater(ReceiveTimeoutSec, SupPid, Sock,
- ReceiveFun, heartbeat_receiver,
- start_heartbeat_receiver),
- {Sender, Receiver}
- end.
-
pause_monitor({_Sender, none}) -> ok;
pause_monitor({_Sender, Receiver}) -> Receiver ! pause, ok.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 157b8270..e00732fd 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -18,12 +18,12 @@
-include("rabbit_framing.hrl").
-include("rabbit.hrl").
--export([start_link/3, info_keys/0, info/1, info/2, force_event_refresh/1,
+-export([start_link/1, info_keys/0, info/1, info/2, force_event_refresh/1,
shutdown/2]).
-export([system_continue/3, system_terminate/4, system_code_change/4]).
--export([init/4, mainloop/2, recvloop/2]).
+-export([init/2, mainloop/2, recvloop/2]).
-export([conserve_resources/3, server_properties/1]).
@@ -36,9 +36,8 @@
%%--------------------------------------------------------------------------
-record(v1, {parent, sock, connection, callback, recv_len, pending_recv,
- connection_state, queue_collector, heartbeater, stats_timer,
- ch_sup3_pid, channel_sup_sup_pid, start_heartbeat_fun,
- buf, buf_len, throttle}).
+ connection_state, helper_sup, queue_collector, heartbeater,
+ stats_timer, channel_sup_sup_pid, buf, buf_len, throttle}).
-record(connection, {name, host, peer_host, port, peer_port,
protocol, user, timeout_sec, frame_max, vhost,
@@ -74,8 +73,7 @@
-ifdef(use_specs).
--spec(start_link/3 :: (pid(), pid(), rabbit_heartbeat:start_heartbeat_fun()) ->
- rabbit_types:ok(pid())).
+-spec(start_link/1 :: (pid()) -> rabbit_types:ok(pid())).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(info/1 :: (pid()) -> rabbit_types:infos()).
-spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()).
@@ -86,11 +84,9 @@
rabbit_framing:amqp_table()).
%% These specs only exists to add no_return() to keep dialyzer happy
--spec(init/4 :: (pid(), pid(), pid(), rabbit_heartbeat:start_heartbeat_fun())
- -> no_return()).
--spec(start_connection/7 ::
- (pid(), pid(), pid(), rabbit_heartbeat:start_heartbeat_fun(), any(),
- rabbit_net:socket(),
+-spec(init/2 :: (pid(), pid()) -> no_return()).
+-spec(start_connection/5 ::
+ (pid(), pid(), any(), rabbit_net:socket(),
fun ((rabbit_net:socket()) ->
rabbit_types:ok_or_error2(
rabbit_net:socket(), any()))) -> no_return()).
@@ -104,20 +100,17 @@
%%--------------------------------------------------------------------------
-start_link(ChannelSup3Pid, Collector, StartHeartbeatFun) ->
- {ok, proc_lib:spawn_link(?MODULE, init, [self(), ChannelSup3Pid,
- Collector, StartHeartbeatFun])}.
+start_link(HelperSup) ->
+ {ok, proc_lib:spawn_link(?MODULE, init, [self(), HelperSup])}.
shutdown(Pid, Explanation) ->
gen_server:call(Pid, {shutdown, Explanation}, infinity).
-init(Parent, ChSup3Pid, Collector, StartHeartbeatFun) ->
+init(Parent, HelperSup) ->
Deb = sys:debug_options([]),
receive
{go, Sock, SockTransform} ->
- start_connection(
- Parent, ChSup3Pid, Collector, StartHeartbeatFun, Deb, Sock,
- SockTransform)
+ start_connection(Parent, HelperSup, Deb, Sock, SockTransform)
end.
system_continue(Parent, Deb, State) ->
@@ -205,8 +198,7 @@ socket_op(Sock, Fun) ->
exit(normal)
end.
-start_connection(Parent, ChSup3Pid, Collector, StartHeartbeatFun, Deb,
- Sock, SockTransform) ->
+start_connection(Parent, HelperSup, Deb, Sock, SockTransform) ->
process_flag(trap_exit, true),
Name = case rabbit_net:connection_string(Sock, inbound) of
{ok, Str} -> Str;
@@ -242,11 +234,10 @@ start_connection(Parent, ChSup3Pid, Collector, StartHeartbeatFun, Deb,
recv_len = 0,
pending_recv = false,
connection_state = pre_init,
- queue_collector = Collector,
+ queue_collector = undefined, %% started on tune-ok
+ helper_sup = HelperSup,
heartbeater = none,
- ch_sup3_pid = ChSup3Pid,
channel_sup_sup_pid = none,
- start_heartbeat_fun = StartHeartbeatFun,
buf = [],
buf_len = 0,
throttle = #throttle{
@@ -851,8 +842,8 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
heartbeat = ClientHeartbeat},
State = #v1{connection_state = tuning,
connection = Connection,
- sock = Sock,
- start_heartbeat_fun = SHF}) ->
+ helper_sup = SupPid,
+ sock = Sock}) ->
ServerFrameMax = server_frame_max(),
if FrameMax /= 0 andalso FrameMax < ?FRAME_MIN_SIZE ->
rabbit_misc:protocol_error(
@@ -863,16 +854,20 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
not_allowed, "frame_max=~w > ~w max size",
[FrameMax, ServerFrameMax]);
true ->
+ {ok, Collector} =
+ rabbit_connection_helper_sup:start_queue_collector(SupPid),
Frame = rabbit_binary_generator:build_heartbeat_frame(),
SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end,
Parent = self(),
ReceiveFun = fun() -> Parent ! heartbeat_timeout end,
- Heartbeater = SHF(Sock, ClientHeartbeat, SendFun,
- ClientHeartbeat, ReceiveFun),
+ Heartbeater =
+ rabbit_heartbeat:start(SupPid, Sock, ClientHeartbeat,
+ SendFun, ClientHeartbeat, ReceiveFun),
State#v1{connection_state = opening,
connection = Connection#connection{
timeout_sec = ClientHeartbeat,
frame_max = FrameMax},
+ queue_collector = Collector,
heartbeater = Heartbeater}
end;
@@ -881,7 +876,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
connection = Connection = #connection{
user = User,
protocol = Protocol},
- ch_sup3_pid = ChSup3Pid,
+ helper_sup = SupPid,
sock = Sock,
throttle = Throttle}) ->
ok = rabbit_access_control:check_vhost_access(User, VHostPath),
@@ -890,10 +885,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
Throttle1 = Throttle#throttle{alarmed_by = Conserve},
{ok, ChannelSupSupPid} =
- supervisor2:start_child(
- ChSup3Pid,
- {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []},
- intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}),
+ rabbit_connection_helper_sup:start_channel_sup_sup(SupPid),
State1 = control_throttle(
State#v1{connection_state = running,
connection = NewConnection,
@@ -1106,10 +1098,7 @@ pack_for_1_0(#v1{parent = Parent,
sock = Sock,
recv_len = RecvLen,
pending_recv = PendingRecv,
- queue_collector = QueueCollector,
- ch_sup3_pid = ChSup3Pid,
- start_heartbeat_fun = SHF,
+ helper_sup = SupPid,
buf = Buf,
buf_len = BufLen}) ->
- {Parent, Sock, RecvLen, PendingRecv, QueueCollector, ChSup3Pid, SHF,
- Buf, BufLen}.
+ {Parent, Sock, RecvLen, PendingRecv, SupPid, Buf, BufLen}.