diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-07-03 11:17:18 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-07-03 11:17:18 +0100 |
commit | ece61e122771dddcad1e0892c55e7d5c3239a1a1 (patch) | |
tree | 304b3a6de863246fd27ebc62c21a0af4e502a947 | |
parent | 7e0e36acb972d00cd4f0723de6097b4678f68c2d (diff) | |
download | rabbitmq-server-ece61e122771dddcad1e0892c55e7d5c3239a1a1.tar.gz |
Stick in yet another supervisor to get shutdown happening in the right order.bug25631
-rw-r--r-- | src/rabbit_connection_sup.erl | 11 | ||||
-rw-r--r-- | src/rabbit_intermediate_sup.erl | 39 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 22 |
3 files changed, 60 insertions, 12 deletions
diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl index 31bc51b8..fedfe97a 100644 --- a/src/rabbit_connection_sup.erl +++ b/src/rabbit_connection_sup.erl @@ -42,11 +42,20 @@ start_link() -> SupPid, {collector, {rabbit_queue_collector, start_link, []}, intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}), + %% We need to get channels in the hierarchy here so they close + %% before the reader. But for 1.0 readers we can't start the real + %% ch_sup_sup (because we don't know if we will be 0-9-1 or 1.0) - + %% so we add another supervisor into the hierarchy. + {ok, ChannelSup3Pid} = + supervisor2:start_child( + SupPid, + {channel_sup3, {rabbit_intermediate_sup, start_link, []}, + intrinsic, infinity, supervisor, [rabbit_intermediate_sup]}), {ok, ReaderPid} = supervisor2:start_child( SupPid, {reader, {rabbit_reader, start_link, - [SupPid, Collector, + [ChannelSup3Pid, Collector, rabbit_heartbeat:start_heartbeat_fun(SupPid)]}, intrinsic, ?MAX_WAIT, worker, [rabbit_reader]}), {ok, SupPid, ReaderPid}. diff --git a/src/rabbit_intermediate_sup.erl b/src/rabbit_intermediate_sup.erl new file mode 100644 index 00000000..1919d9d6 --- /dev/null +++ b/src/rabbit_intermediate_sup.erl @@ -0,0 +1,39 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. +%% + +-module(rabbit_intermediate_sup). + +-behaviour(supervisor2). + +-export([start_link/0]). + +-export([init/1]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). +-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). +-endif. + +%%---------------------------------------------------------------------------- + +start_link() -> + supervisor2:start_link(?MODULE, []). + +%%---------------------------------------------------------------------------- + +init([]) -> + {ok, {{one_for_one, 10, 10}, []}}. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 61fac0e2..3cf88d06 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -37,7 +37,7 @@ -record(v1, {parent, sock, connection, callback, recv_len, pending_recv, connection_state, queue_collector, heartbeater, stats_timer, - conn_sup_pid, channel_sup_sup_pid, start_heartbeat_fun, + ch_sup3_pid, channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len, throttle}). -record(connection, {name, host, peer_host, port, peer_port, @@ -103,19 +103,19 @@ %%-------------------------------------------------------------------------- -start_link(ChannelSupSupPid, Collector, StartHeartbeatFun) -> - {ok, proc_lib:spawn_link(?MODULE, init, [self(), ChannelSupSupPid, +start_link(ChannelSup3Pid, Collector, StartHeartbeatFun) -> + {ok, proc_lib:spawn_link(?MODULE, init, [self(), ChannelSup3Pid, Collector, StartHeartbeatFun])}. shutdown(Pid, Explanation) -> gen_server:call(Pid, {shutdown, Explanation}, infinity). -init(Parent, ConnSupPid, Collector, StartHeartbeatFun) -> +init(Parent, ChSup3Pid, Collector, StartHeartbeatFun) -> Deb = sys:debug_options([]), receive {go, Sock, SockTransform} -> start_connection( - Parent, ConnSupPid, Collector, StartHeartbeatFun, Deb, Sock, + Parent, ChSup3Pid, Collector, StartHeartbeatFun, Deb, Sock, SockTransform) end. @@ -201,7 +201,7 @@ socket_op(Sock, Fun) -> exit(normal) end. -start_connection(Parent, ConnSupPid, Collector, StartHeartbeatFun, Deb, +start_connection(Parent, ChSup3Pid, Collector, StartHeartbeatFun, Deb, Sock, SockTransform) -> process_flag(trap_exit, true), Name = case rabbit_net:connection_string(Sock, inbound) of @@ -240,7 +240,7 @@ start_connection(Parent, ConnSupPid, Collector, StartHeartbeatFun, Deb, connection_state = pre_init, queue_collector = Collector, heartbeater = none, - conn_sup_pid = ConnSupPid, + ch_sup3_pid = ChSup3Pid, channel_sup_sup_pid = none, start_heartbeat_fun = StartHeartbeatFun, buf = [], @@ -837,7 +837,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, connection = Connection = #connection{ user = User, protocol = Protocol}, - conn_sup_pid = ConnSupPid, + ch_sup3_pid = ChSup3Pid, sock = Sock, throttle = Throttle}) -> ok = rabbit_access_control:check_vhost_access(User, VHostPath), @@ -847,7 +847,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, Throttle1 = Throttle#throttle{conserve_resources = Conserve}, {ok, ChannelSupSupPid} = supervisor2:start_child( - ConnSupPid, + ChSup3Pid, {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []}, intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}), State1 = control_throttle( @@ -1048,9 +1048,9 @@ pack_for_1_0(#v1{parent = Parent, recv_len = RecvLen, pending_recv = PendingRecv, queue_collector = QueueCollector, - conn_sup_pid = ConnSupPid, + ch_sup3_pid = ChSup3Pid, start_heartbeat_fun = SHF, buf = Buf, buf_len = BufLen}) -> - {Parent, Sock, RecvLen, PendingRecv, QueueCollector, ConnSupPid, SHF, + {Parent, Sock, RecvLen, PendingRecv, QueueCollector, ChSup3Pid, SHF, Buf, BufLen}. |