diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-10-14 14:13:03 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-10-14 14:13:03 +0100 |
commit | b0300ecc3e0f75edabb201840084f7b9dbfc43ed (patch) | |
tree | ff2ff6c1d61f8d39e0185551de3853600dfed833 | |
parent | b725d82cd5be0dbb7f8fe8c7f516d3853ecb79ff (diff) | |
parent | 8a167cc42cf7ad9ba8cac09668ccb79eed484227 (diff) | |
download | rabbitmq-server-b0300ecc3e0f75edabb201840084f7b9dbfc43ed.tar.gz |
merge bug25404
-rw-r--r-- | src/rabbit_connection_helper_sup.erl (renamed from src/rabbit_intermediate_sup.erl) | 23 | ||||
-rw-r--r-- | src/rabbit_connection_sup.erl | 20 | ||||
-rw-r--r-- | src/rabbit_heartbeat.erl | 40 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 65 |
4 files changed, 74 insertions, 74 deletions
diff --git a/src/rabbit_intermediate_sup.erl b/src/rabbit_connection_helper_sup.erl index a9381f20..e51615e8 100644 --- a/src/rabbit_intermediate_sup.erl +++ b/src/rabbit_connection_helper_sup.erl @@ -11,21 +11,27 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2013-2013 GoPivotal, Inc. All rights reserved. +%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved. %% --module(rabbit_intermediate_sup). +-module(rabbit_connection_helper_sup). -behaviour(supervisor2). -export([start_link/0]). +-export([start_channel_sup_sup/1, + start_queue_collector/1]). -export([init/1]). +-include("rabbit.hrl"). + %%---------------------------------------------------------------------------- -ifdef(use_specs). -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). +-spec(start_channel_sup_sup/1 :: (pid()) -> rabbit_types:ok_pid_or_error()). +-spec(start_queue_collector/1 :: (pid()) -> rabbit_types:ok_pid_or_error()). -endif. %%---------------------------------------------------------------------------- @@ -33,7 +39,20 @@ start_link() -> supervisor2:start_link(?MODULE, []). +start_channel_sup_sup(SupPid) -> + supervisor2:start_child( + SupPid, + {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []}, + intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}). + +start_queue_collector(SupPid) -> + supervisor2:start_child( + SupPid, + {collector, {rabbit_queue_collector, start_link, []}, + intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}). + %%---------------------------------------------------------------------------- init([]) -> {ok, {{one_for_one, 10, 10}, []}}. + diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl index c1fa17aa..9ed5dc77 100644 --- a/src/rabbit_connection_sup.erl +++ b/src/rabbit_connection_sup.erl @@ -37,27 +37,25 @@ start_link() -> {ok, SupPid} = supervisor2:start_link(?MODULE, []), - {ok, Collector} = - supervisor2:start_child( - SupPid, - {collector, {rabbit_queue_collector, start_link, []}, - intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}), %% We need to get channels in the hierarchy here so they get shut %% down after the reader, so the reader gets a chance to terminate %% them cleanly. But for 1.0 readers we can't start the real %% ch_sup_sup (because we don't know if we will be 0-9-1 or 1.0) - %% so we add another supervisor into the hierarchy. - {ok, ChannelSup3Pid} = + %% + %% This supervisor also acts as an intermediary for heartbeaters and + %% the queue collector process, since these must not be siblings of the + %% reader due to the potential for deadlock if they are added/restarted + %% whilst the supervision tree is shutting down. + {ok, HelperSup} = supervisor2:start_child( SupPid, - {channel_sup3, {rabbit_intermediate_sup, start_link, []}, - intrinsic, infinity, supervisor, [rabbit_intermediate_sup]}), + {helper_sup, {rabbit_connection_helper_sup, start_link, []}, + intrinsic, infinity, supervisor, [rabbit_connection_helper_sup]}), {ok, ReaderPid} = supervisor2:start_child( SupPid, - {reader, {rabbit_reader, start_link, - [ChannelSup3Pid, Collector, - rabbit_heartbeat:start_heartbeat_fun(SupPid)]}, + {reader, {rabbit_reader, start_link, [HelperSup]}, intrinsic, ?MAX_WAIT, worker, [rabbit_reader]}), {ok, SupPid, ReaderPid}. diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl index fac74edb..ca67254b 100644 --- a/src/rabbit_heartbeat.erl +++ b/src/rabbit_heartbeat.erl @@ -16,8 +16,9 @@ -module(rabbit_heartbeat). +-export([start/6]). -export([start_heartbeat_sender/3, start_heartbeat_receiver/3, - start_heartbeat_fun/1, pause_monitor/1, resume_monitor/1]). + pause_monitor/1, resume_monitor/1]). -export([system_continue/3, system_terminate/4, system_code_change/4]). @@ -28,16 +29,15 @@ -ifdef(use_specs). -export_type([heartbeaters/0]). --export_type([start_heartbeat_fun/0]). -type(heartbeaters() :: {rabbit_types:maybe(pid()), rabbit_types:maybe(pid())}). -type(heartbeat_callback() :: fun (() -> any())). --type(start_heartbeat_fun() :: - fun((rabbit_net:socket(), non_neg_integer(), heartbeat_callback(), - non_neg_integer(), heartbeat_callback()) -> - no_return())). +-spec(start/6 :: + (pid(), rabbit_net:socket(), + non_neg_integer(), heartbeat_callback(), + non_neg_integer(), heartbeat_callback()) -> heartbeaters()). -spec(start_heartbeat_sender/3 :: (rabbit_net:socket(), non_neg_integer(), heartbeat_callback()) -> @@ -46,10 +46,6 @@ (rabbit_net:socket(), non_neg_integer(), heartbeat_callback()) -> rabbit_types:ok(pid())). --spec(start_heartbeat_fun/1 :: - (pid()) -> start_heartbeat_fun()). - - -spec(pause_monitor/1 :: (heartbeaters()) -> 'ok'). -spec(resume_monitor/1 :: (heartbeaters()) -> 'ok'). @@ -61,6 +57,17 @@ %%---------------------------------------------------------------------------- +start(SupPid, Sock, SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) -> + {ok, Sender} = + start_heartbeater(SendTimeoutSec, SupPid, Sock, + SendFun, heartbeat_sender, + start_heartbeat_sender), + {ok, Receiver} = + start_heartbeater(ReceiveTimeoutSec, SupPid, Sock, + ReceiveFun, heartbeat_receiver, + start_heartbeat_receiver), + {Sender, Receiver}. + start_heartbeat_sender(Sock, TimeoutSec, SendFun) -> %% the 'div 2' is there so that we don't end up waiting for nearly %% 2 * TimeoutSec before sending a heartbeat in the boundary case @@ -75,19 +82,6 @@ start_heartbeat_receiver(Sock, TimeoutSec, ReceiveFun) -> heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1, fun () -> ReceiveFun(), stop end}). -start_heartbeat_fun(SupPid) -> - fun (Sock, SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) -> - {ok, Sender} = - start_heartbeater(SendTimeoutSec, SupPid, Sock, - SendFun, heartbeat_sender, - start_heartbeat_sender), - {ok, Receiver} = - start_heartbeater(ReceiveTimeoutSec, SupPid, Sock, - ReceiveFun, heartbeat_receiver, - start_heartbeat_receiver), - {Sender, Receiver} - end. - pause_monitor({_Sender, none}) -> ok; pause_monitor({_Sender, Receiver}) -> Receiver ! pause, ok. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 157b8270..e00732fd 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -18,12 +18,12 @@ -include("rabbit_framing.hrl"). -include("rabbit.hrl"). --export([start_link/3, info_keys/0, info/1, info/2, force_event_refresh/1, +-export([start_link/1, info_keys/0, info/1, info/2, force_event_refresh/1, shutdown/2]). -export([system_continue/3, system_terminate/4, system_code_change/4]). --export([init/4, mainloop/2, recvloop/2]). +-export([init/2, mainloop/2, recvloop/2]). -export([conserve_resources/3, server_properties/1]). @@ -36,9 +36,8 @@ %%-------------------------------------------------------------------------- -record(v1, {parent, sock, connection, callback, recv_len, pending_recv, - connection_state, queue_collector, heartbeater, stats_timer, - ch_sup3_pid, channel_sup_sup_pid, start_heartbeat_fun, - buf, buf_len, throttle}). + connection_state, helper_sup, queue_collector, heartbeater, + stats_timer, channel_sup_sup_pid, buf, buf_len, throttle}). -record(connection, {name, host, peer_host, port, peer_port, protocol, user, timeout_sec, frame_max, vhost, @@ -74,8 +73,7 @@ -ifdef(use_specs). --spec(start_link/3 :: (pid(), pid(), rabbit_heartbeat:start_heartbeat_fun()) -> - rabbit_types:ok(pid())). +-spec(start_link/1 :: (pid()) -> rabbit_types:ok(pid())). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). -spec(info/1 :: (pid()) -> rabbit_types:infos()). -spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()). @@ -86,11 +84,9 @@ rabbit_framing:amqp_table()). %% These specs only exists to add no_return() to keep dialyzer happy --spec(init/4 :: (pid(), pid(), pid(), rabbit_heartbeat:start_heartbeat_fun()) - -> no_return()). --spec(start_connection/7 :: - (pid(), pid(), pid(), rabbit_heartbeat:start_heartbeat_fun(), any(), - rabbit_net:socket(), +-spec(init/2 :: (pid(), pid()) -> no_return()). +-spec(start_connection/5 :: + (pid(), pid(), any(), rabbit_net:socket(), fun ((rabbit_net:socket()) -> rabbit_types:ok_or_error2( rabbit_net:socket(), any()))) -> no_return()). @@ -104,20 +100,17 @@ %%-------------------------------------------------------------------------- -start_link(ChannelSup3Pid, Collector, StartHeartbeatFun) -> - {ok, proc_lib:spawn_link(?MODULE, init, [self(), ChannelSup3Pid, - Collector, StartHeartbeatFun])}. +start_link(HelperSup) -> + {ok, proc_lib:spawn_link(?MODULE, init, [self(), HelperSup])}. shutdown(Pid, Explanation) -> gen_server:call(Pid, {shutdown, Explanation}, infinity). -init(Parent, ChSup3Pid, Collector, StartHeartbeatFun) -> +init(Parent, HelperSup) -> Deb = sys:debug_options([]), receive {go, Sock, SockTransform} -> - start_connection( - Parent, ChSup3Pid, Collector, StartHeartbeatFun, Deb, Sock, - SockTransform) + start_connection(Parent, HelperSup, Deb, Sock, SockTransform) end. system_continue(Parent, Deb, State) -> @@ -205,8 +198,7 @@ socket_op(Sock, Fun) -> exit(normal) end. -start_connection(Parent, ChSup3Pid, Collector, StartHeartbeatFun, Deb, - Sock, SockTransform) -> +start_connection(Parent, HelperSup, Deb, Sock, SockTransform) -> process_flag(trap_exit, true), Name = case rabbit_net:connection_string(Sock, inbound) of {ok, Str} -> Str; @@ -242,11 +234,10 @@ start_connection(Parent, ChSup3Pid, Collector, StartHeartbeatFun, Deb, recv_len = 0, pending_recv = false, connection_state = pre_init, - queue_collector = Collector, + queue_collector = undefined, %% started on tune-ok + helper_sup = HelperSup, heartbeater = none, - ch_sup3_pid = ChSup3Pid, channel_sup_sup_pid = none, - start_heartbeat_fun = StartHeartbeatFun, buf = [], buf_len = 0, throttle = #throttle{ @@ -851,8 +842,8 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, heartbeat = ClientHeartbeat}, State = #v1{connection_state = tuning, connection = Connection, - sock = Sock, - start_heartbeat_fun = SHF}) -> + helper_sup = SupPid, + sock = Sock}) -> ServerFrameMax = server_frame_max(), if FrameMax /= 0 andalso FrameMax < ?FRAME_MIN_SIZE -> rabbit_misc:protocol_error( @@ -863,16 +854,20 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, not_allowed, "frame_max=~w > ~w max size", [FrameMax, ServerFrameMax]); true -> + {ok, Collector} = + rabbit_connection_helper_sup:start_queue_collector(SupPid), Frame = rabbit_binary_generator:build_heartbeat_frame(), SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end, Parent = self(), ReceiveFun = fun() -> Parent ! heartbeat_timeout end, - Heartbeater = SHF(Sock, ClientHeartbeat, SendFun, - ClientHeartbeat, ReceiveFun), + Heartbeater = + rabbit_heartbeat:start(SupPid, Sock, ClientHeartbeat, + SendFun, ClientHeartbeat, ReceiveFun), State#v1{connection_state = opening, connection = Connection#connection{ timeout_sec = ClientHeartbeat, frame_max = FrameMax}, + queue_collector = Collector, heartbeater = Heartbeater} end; @@ -881,7 +876,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, connection = Connection = #connection{ user = User, protocol = Protocol}, - ch_sup3_pid = ChSup3Pid, + helper_sup = SupPid, sock = Sock, throttle = Throttle}) -> ok = rabbit_access_control:check_vhost_access(User, VHostPath), @@ -890,10 +885,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}), Throttle1 = Throttle#throttle{alarmed_by = Conserve}, {ok, ChannelSupSupPid} = - supervisor2:start_child( - ChSup3Pid, - {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []}, - intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}), + rabbit_connection_helper_sup:start_channel_sup_sup(SupPid), State1 = control_throttle( State#v1{connection_state = running, connection = NewConnection, @@ -1106,10 +1098,7 @@ pack_for_1_0(#v1{parent = Parent, sock = Sock, recv_len = RecvLen, pending_recv = PendingRecv, - queue_collector = QueueCollector, - ch_sup3_pid = ChSup3Pid, - start_heartbeat_fun = SHF, + helper_sup = SupPid, buf = Buf, buf_len = BufLen}) -> - {Parent, Sock, RecvLen, PendingRecv, QueueCollector, ChSup3Pid, SHF, - Buf, BufLen}. + {Parent, Sock, RecvLen, PendingRecv, SupPid, Buf, BufLen}. |