summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-08-06 17:24:16 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-08-06 17:24:16 +0100
commitd53bc353ac5e87bceb99de0d3953074ae27ddacc (patch)
tree5363fb2aeb0c311a48cdabde20d1c94d35718eb0
parent1389a3dfdeeee3edae605dc22315fdd0dafcff73 (diff)
parent5247d20bd3c9d1c6949c7a2952b6cea17b3293c4 (diff)
downloadrabbitmq-server-d53bc353ac5e87bceb99de0d3953074ae27ddacc.tar.gz
Merging default into bug 15930
-rw-r--r--src/gen_server2.erl2
-rw-r--r--src/rabbit_channel.erl28
-rw-r--r--src/rabbit_channel_sup.erl83
-rw-r--r--src/rabbit_channel_sup_sup.erl49
-rw-r--r--src/rabbit_connection_sup.erl63
-rw-r--r--src/rabbit_framing_channel.erl19
-rw-r--r--src/rabbit_heartbeat.erl64
-rw-r--r--src/rabbit_networking.erl17
-rw-r--r--src/rabbit_reader.erl166
-rw-r--r--src/rabbit_tests.erl21
-rw-r--r--src/rabbit_writer.erl32
-rw-r--r--src/supervisor2.erl36
-rw-r--r--src/tcp_client_sup.erl10
13 files changed, 431 insertions, 159 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index 49ae63c1..f1c8eb4d 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -164,7 +164,7 @@
cast/2, pcast/3, reply/2,
abcast/2, abcast/3,
multi_call/2, multi_call/3, multi_call/4,
- enter_loop/3, enter_loop/4, enter_loop/5, wake_hib/7]).
+ enter_loop/3, enter_loop/4, enter_loop/5, enter_loop/6, wake_hib/7]).
-export([behaviour_info/1]).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 6f244166..8eb148c1 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -43,7 +43,7 @@
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1]).
--record(ch, {state, channel, reader_pid, writer_pid, limiter_pid,
+-record(ch, {state, channel, parent_pid, reader_pid, writer_pid, limiter_pid,
transaction_id, tx_participants, next_tag,
uncommitted_ack_q, unacked_message_q,
username, virtual_host, most_recently_declared_queue,
@@ -78,7 +78,8 @@
-spec(start_link/6 ::
(channel_number(), pid(), pid(), rabbit_access_control:username(),
- rabbit_types:vhost(), pid()) -> rabbit_types:ok(pid())).
+ rabbit_types:vhost(), pid()) ->
+ 'ignore' | rabbit_types:ok_or_error2(pid(), any())).
-spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
-spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(),
rabbit_types:maybe(rabbit_types:content())) -> 'ok').
@@ -101,7 +102,7 @@
%%----------------------------------------------------------------------------
start_link(Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid) ->
- gen_server2:start_link(?MODULE, [Channel, ReaderPid, WriterPid,
+ gen_server2:start_link(?MODULE, [Channel, self(), ReaderPid, WriterPid,
Username, VHost, CollectorPid], []).
do(Pid, Method) ->
@@ -150,12 +151,14 @@ flush(Pid) ->
%%---------------------------------------------------------------------------
-init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) ->
+init([Channel, ParentPid, ReaderPid, WriterPid, Username, VHost,
+ CollectorPid]) ->
process_flag(trap_exit, true),
link(WriterPid),
ok = pg_local:join(rabbit_channels, self()),
State = #ch{state = starting,
channel = Channel,
+ parent_pid = ParentPid,
reader_pid = ReaderPid,
writer_pid = WriterPid,
limiter_pid = undefined,
@@ -259,8 +262,10 @@ terminate(_Reason, State = #ch{state = terminating}) ->
terminate(Reason, State) ->
Res = rollback_and_notify(State),
case Reason of
- normal -> ok = Res;
- _ -> ok
+ normal -> ok = Res;
+ shutdown -> ok = Res;
+ {shutdown, _Term} -> ok = Res;
+ _ -> ok
end,
terminate(State).
@@ -1017,8 +1022,13 @@ fold_per_queue(F, Acc0, UAQ) ->
dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end,
Acc0, D).
-start_limiter(State = #ch{unacked_message_q = UAMQ}) ->
- {ok, LPid} = rabbit_limiter:start_link(self(), queue:len(UAMQ)),
+start_limiter(State = #ch{unacked_message_q = UAMQ, parent_pid = ParentPid}) ->
+ Me = self(),
+ {ok, LPid} =
+ supervisor:start_child(
+ ParentPid,
+ {limiter, {rabbit_limiter, start_link, [Me, queue:len(UAMQ)]},
+ transient, ?MAX_WAIT, worker, [rabbit_limiter]}),
ok = limit_queues(LPid, State),
LPid.
@@ -1093,7 +1103,7 @@ internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag,
terminate(#ch{writer_pid = WriterPid, limiter_pid = LimiterPid}) ->
pg_local:leave(rabbit_channels, self()),
rabbit_event:notify(channel_closed, [{pid, self()}]),
- rabbit_writer:shutdown(WriterPid),
+ rabbit_writer:flush(WriterPid),
rabbit_limiter:shutdown(LimiterPid).
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
new file mode 100644
index 00000000..17e1446d
--- /dev/null
+++ b/src/rabbit_channel_sup.erl
@@ -0,0 +1,83 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_channel_sup).
+
+-behaviour(supervisor2).
+
+-export([start_link/8]).
+
+-export([init/1]).
+
+-include("rabbit.hrl").
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/8 ::
+ (rabbit_types:protocol(), rabbit_net:socket(),
+ rabbit_channel:channel_number(), non_neg_integer(), pid(),
+ rabbit_access_control:username(), rabbit_types:vhost(), pid()) ->
+ {'ok', pid(), pid()}).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+start_link(Protocol, Sock, Channel, FrameMax, ReaderPid, Username, VHost,
+ Collector) ->
+ {ok, SupPid} = supervisor2:start_link(?MODULE, []),
+ {ok, WriterPid} =
+ supervisor2:start_child(
+ SupPid,
+ {writer, {rabbit_writer, start_link,
+ [Sock, Channel, FrameMax, Protocol]},
+ permanent, ?MAX_WAIT, worker, [rabbit_writer]}),
+ {ok, ChannelPid} =
+ supervisor2:start_child(
+ SupPid,
+ {channel, {rabbit_channel, start_link,
+ [Channel, ReaderPid, WriterPid, Username, VHost,
+ Collector]},
+ permanent, ?MAX_WAIT, worker, [rabbit_channel]}),
+ {ok, FramingChannelPid} =
+ supervisor2:start_child(
+ SupPid,
+ {framing_channel, {rabbit_framing_channel, start_link,
+ [ChannelPid, Protocol]},
+ permanent, ?MAX_WAIT, worker, [rabbit_framing_channel]}),
+ {ok, SupPid, FramingChannelPid}.
+
+%%----------------------------------------------------------------------------
+
+init([]) ->
+ {ok, {{one_for_all, 0, 1}, []}}.
diff --git a/src/rabbit_channel_sup_sup.erl b/src/rabbit_channel_sup_sup.erl
new file mode 100644
index 00000000..2fab8678
--- /dev/null
+++ b/src/rabbit_channel_sup_sup.erl
@@ -0,0 +1,49 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_channel_sup_sup).
+
+-behaviour(supervisor2).
+
+-export([start_link/0, start_channel/2]).
+
+-export([init/1]).
+
+start_link() ->
+ supervisor2:start_link(?MODULE, []).
+
+init([]) ->
+ {ok, {{simple_one_for_one_terminate, 0, 1},
+ [{channel_sup, {rabbit_channel_sup, start_link, []},
+ temporary, infinity, supervisor, [rabbit_channel_sup]}]}}.
+
+start_channel(Pid, Args) ->
+ supervisor2:start_child(Pid, Args).
diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl
new file mode 100644
index 00000000..8d09961f
--- /dev/null
+++ b/src/rabbit_connection_sup.erl
@@ -0,0 +1,63 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_connection_sup).
+
+-behaviour(supervisor2).
+
+-export([start_link/0, reader/1]).
+
+-export([init/1]).
+
+-include("rabbit.hrl").
+
+start_link() ->
+ {ok, SupPid} = supervisor2:start_link(?MODULE, []),
+ {ok, ChannelSupSupPid} =
+ supervisor2:start_child(
+ SupPid,
+ {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []},
+ permanent, infinity, supervisor, [rabbit_channel_sup_sup]}),
+ {ok, _ReaderPid} =
+ supervisor2:start_child(
+ SupPid,
+ {reader, {rabbit_reader, start_link, [ChannelSupSupPid]},
+ permanent, ?MAX_WAIT, worker, [rabbit_reader]}),
+ {ok, SupPid}.
+
+init([]) ->
+ {ok, {{one_for_all, 0, 1},
+ [{collector, {rabbit_queue_collector, start_link, []},
+ permanent, ?MAX_WAIT, worker, [rabbit_queue_collector]}
+ ]}}.
+
+reader(Pid) ->
+ hd(supervisor2:find_child(Pid, reader)).
diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl
index 00b74ad0..6ee5a555 100644
--- a/src/rabbit_framing_channel.erl
+++ b/src/rabbit_framing_channel.erl
@@ -32,22 +32,15 @@
-module(rabbit_framing_channel).
-include("rabbit.hrl").
--export([start_link/3, process/2, shutdown/1]).
+-export([start_link/2, process/2, shutdown/1]).
%% internal
-export([mainloop/2]).
%%--------------------------------------------------------------------
-start_link(StartFun, StartArgs, Protocol) ->
- {ok, spawn_link(
- fun () ->
- %% we trap exits so that a normal termination of
- %% the channel or reader process terminates us too.
- process_flag(trap_exit, true),
- {ok, ChannelPid} = apply(StartFun, StartArgs),
- mainloop(ChannelPid, Protocol)
- end)}.
+start_link(ChannelPid, Protocol) ->
+ {ok, proc_lib:spawn_link(fun () -> mainloop(ChannelPid, Protocol) end)}.
process(Pid, Frame) ->
Pid ! {frame, Frame},
@@ -61,12 +54,6 @@ shutdown(Pid) ->
read_frame(ChannelPid) ->
receive
- %% converting the exit signal into one of our own ensures that
- %% the reader sees the right pid (i.e. ours) when a channel
- %% exits. Similarly in the other direction, though it is not
- %% really relevant there since the channel is not specifically
- %% watching out for reader exit signals.
- {'EXIT', _Pid, Reason} -> exit(Reason);
{frame, Frame} -> Frame;
terminate -> rabbit_channel:shutdown(ChannelPid),
read_frame(ChannelPid);
diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl
index faddffc1..d694011a 100644
--- a/src/rabbit_heartbeat.erl
+++ b/src/rabbit_heartbeat.erl
@@ -31,7 +31,11 @@
-module(rabbit_heartbeat).
--export([start_heartbeat/2, pause_monitor/1, resume_monitor/1]).
+-export([start_heartbeat/3, pause_monitor/1, resume_monitor/1,
+ start_heartbeat_sender/2,
+ start_heartbeat_receiver/2]).
+
+-include("rabbit.hrl").
%%----------------------------------------------------------------------------
@@ -39,7 +43,13 @@
-type(pids() :: rabbit_types:maybe({pid(), pid()})).
--spec(start_heartbeat/2 :: (rabbit_net:socket(), non_neg_integer()) -> pids()).
+-spec(start_heartbeat/3 :: (pid(), rabbit_net:socket(), non_neg_integer()) ->
+ pids()).
+-spec(start_heartbeat_sender/2 :: (rabbit_net:socket(), non_neg_integer()) ->
+ rabbit_types:ok(pid())).
+-spec(start_heartbeat_receiver/2 :: (rabbit_net:socket(), non_neg_integer()) ->
+ rabbit_types:ok(pid())).
+
-spec(pause_monitor/1 :: (pids()) -> 'ok').
-spec(resume_monitor/1 :: (pids()) -> 'ok').
@@ -47,31 +57,44 @@
%%----------------------------------------------------------------------------
-start_heartbeat(_Sock, 0) ->
+start_heartbeat(_Sup, _Sock, 0) ->
none;
-start_heartbeat(Sock, TimeoutSec) ->
- Parent = self(),
+start_heartbeat(Sup, Sock, TimeoutSec) ->
+ {ok, Sender} =
+ supervisor:start_child(
+ Sup, {heartbeat_sender,
+ {?MODULE, start_heartbeat_sender, [Sock, TimeoutSec]},
+ transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}),
+ {ok, Receiver} =
+ supervisor:start_child(
+ Sup, {heartbeat_receiver,
+ {?MODULE, start_heartbeat_receiver, [Sock, TimeoutSec]},
+ transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}),
+ {Sender, Receiver}.
+
+start_heartbeat_sender(Sock, TimeoutSec) ->
%% the 'div 2' is there so that we don't end up waiting for nearly
%% 2 * TimeoutSec before sending a heartbeat in the boundary case
%% where the last message was sent just after a heartbeat.
- Sender =
- spawn_link(fun () -> heartbeater({Sock, TimeoutSec * 1000 div 2,
- send_oct, 0,
- fun () ->
- catch rabbit_net:send(Sock, rabbit_binary_generator:build_heartbeat_frame()),
- continue
- end}, Parent) end),
+ Parent = self(),
+ {ok, proc_lib:spawn_link(
+ fun () -> heartbeater({Sock, TimeoutSec * 1000 div 2,
+ send_oct, 0,
+ fun () ->
+ catch rabbit_net:send(Sock, rabbit_binary_generator:build_heartbeat_frame()),
+ continue
+ end}, Parent)
+ end)}.
+
+start_heartbeat_receiver(Sock, TimeoutSec) ->
%% we check for incoming data every interval, and time out after
%% two checks with no change. As a result we will time out between
%% 2 and 3 intervals after the last data has been received.
- Receiver =
- spawn_link(fun () -> heartbeater({Sock, TimeoutSec * 1000,
- recv_oct, 1,
- fun () ->
- Parent ! timeout,
- stop
- end}, Parent) end),
- {Sender, Receiver}.
+ Parent = self(),
+ {ok, proc_lib:spawn_link(
+ fun () -> heartbeater({Sock, TimeoutSec * 1000,
+ recv_oct, 1,
+ fun () -> exit(timeout) end}, Parent) end)}.
pause_monitor(none) ->
ok;
@@ -116,7 +139,6 @@ heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params,
Recurse({NewStatVal, SameCount + 1});
true ->
case Handler() of
- stop -> ok;
continue -> Recurse({NewStatVal, 0})
end
end;
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 3a3357ba..492d7d01 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -118,7 +118,7 @@ start() ->
{rabbit_tcp_client_sup,
{tcp_client_sup, start_link,
[{local, rabbit_tcp_client_sup},
- {rabbit_reader,start_link,[]}]},
+ {rabbit_connection_sup,start_link,[]}]},
transient, infinity, supervisor, [tcp_client_sup]}),
ok.
@@ -205,9 +205,10 @@ on_node_down(Node) ->
start_client(Sock, SockTransform) ->
{ok, Child} = supervisor:start_child(rabbit_tcp_client_sup, []),
- ok = rabbit_net:controlling_process(Sock, Child),
- Child ! {go, Sock, SockTransform},
- Child.
+ Reader = rabbit_connection_sup:reader(Child),
+ ok = rabbit_net:controlling_process(Sock, Reader),
+ Reader ! {go, Sock, SockTransform},
+ Reader.
start_client(Sock) ->
start_client(Sock, fun (S) -> {ok, S} end).
@@ -230,8 +231,9 @@ start_ssl_client(SslOpts, Sock) ->
end).
connections() ->
- [Pid || {_, Pid, _, _} <- supervisor:which_children(
- rabbit_tcp_client_sup)].
+ [rabbit_connection_sup:reader(ConnSup) ||
+ {_, ConnSup, supervisor, _}
+ <- supervisor:which_children(rabbit_tcp_client_sup)].
connection_info_keys() -> rabbit_reader:info_keys().
@@ -242,8 +244,7 @@ connection_info_all() -> cmap(fun (Q) -> connection_info(Q) end).
connection_info_all(Items) -> cmap(fun (Q) -> connection_info(Q, Items) end).
close_connection(Pid, Explanation) ->
- case lists:any(fun ({_, ChildPid, _, _}) -> ChildPid =:= Pid end,
- supervisor:which_children(rabbit_tcp_client_sup)) of
+ case lists:member(Pid, connections()) of
true -> rabbit_reader:shutdown(Pid, Explanation);
false -> throw({error, {not_a_connection_pid, Pid}})
end.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index f947cd90..eaf56886 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -33,11 +33,11 @@
-include("rabbit_framing.hrl").
-include("rabbit.hrl").
--export([start_link/0, info_keys/0, info/1, info/2, shutdown/2]).
+-export([start_link/1, info_keys/0, info/1, info/2, shutdown/2]).
-export([system_continue/3, system_terminate/4, system_code_change/4]).
--export([init/1, mainloop/3]).
+-export([init/2, mainloop/2]).
-export([conserve_memory/2, server_properties/0]).
@@ -59,8 +59,9 @@
%---------------------------------------------------------------------------
--record(v1, {sock, connection, callback, recv_length, recv_ref,
- connection_state, queue_collector, heartbeater, stats_timer}).
+-record(v1, {parent, sock, connection, callback, recv_length, recv_ref,
+ connection_state, queue_collector, heartbeater, stats_timer,
+ channel_sup_sup_pid}).
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
send_pend, state, channels]).
@@ -160,6 +161,7 @@
-ifdef(use_specs).
+-spec(start_link/1 :: (pid()) -> rabbit_types:ok(pid())).
-spec(info_keys/0 :: () -> [rabbit_types:info_key()]).
-spec(info/1 :: (pid()) -> [rabbit_types:info()]).
-spec(info/2 :: (pid(), [rabbit_types:info_key()]) -> [rabbit_types:info()]).
@@ -168,25 +170,33 @@
-spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok').
-spec(server_properties/0 :: () -> rabbit_framing:amqp_table()).
+%% These specs only exists to add no_return() to keep dialyzer happy
+-spec(init/2 :: (pid(), pid()) -> no_return()).
+-spec(start_connection/5 ::
+ (pid(), pid(), any(), rabbit_networking:socket(),
+ fun ((rabbit_networking:socket()) ->
+ rabbit_types:ok_or_error2(
+ rabbit_networking:socket(), any()))) -> no_return()).
+
-endif.
%%--------------------------------------------------------------------------
-start_link() ->
- {ok, proc_lib:spawn_link(?MODULE, init, [self()])}.
+start_link(ChannelSupSupPid) ->
+ {ok, proc_lib:spawn_link(?MODULE, init, [self(), ChannelSupSupPid])}.
shutdown(Pid, Explanation) ->
gen_server:call(Pid, {shutdown, Explanation}, infinity).
-init(Parent) ->
+init(Parent, ChannelSupSupPid) ->
Deb = sys:debug_options([]),
receive
{go, Sock, SockTransform} ->
- start_connection(Parent, Deb, Sock, SockTransform)
+ start_connection(Parent, ChannelSupSupPid, Deb, Sock, SockTransform)
end.
system_continue(Parent, Deb, State) ->
- ?MODULE:mainloop(Parent, Deb, State).
+ ?MODULE:mainloop(Deb, State = #v1{parent = Parent}).
system_terminate(Reason, _Parent, _Deb, _State) ->
exit(Reason).
@@ -261,7 +271,7 @@ socket_op(Sock, Fun) ->
exit(normal)
end.
-start_connection(Parent, Deb, Sock, SockTransform) ->
+start_connection(Parent, ChannelSupSupPid, Deb, Sock, SockTransform) ->
process_flag(trap_exit, true),
{PeerAddress, PeerPort} = socket_op(Sock, fun rabbit_net:peername/1),
PeerAddressS = inet_parse:ntoa(PeerAddress),
@@ -271,26 +281,29 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),
handshake_timeout),
ProfilingValue = setup_profiling(),
- {ok, Collector} = rabbit_queue_collector:start_link(),
+ [Collector] = supervisor2:find_child(Parent, collector),
try
- mainloop(Parent, Deb, switch_callback(
- #v1{sock = ClientSock,
- connection = #connection{
- user = none,
- timeout_sec = ?HANDSHAKE_TIMEOUT,
- frame_max = ?FRAME_MIN_SIZE,
- vhost = none,
- client_properties = none,
- protocol = none},
- callback = uninitialized_callback,
- recv_length = 0,
- recv_ref = none,
- connection_state = pre_init,
- queue_collector = Collector,
- heartbeater = none,
- stats_timer =
- rabbit_event:init_stats_timer()},
- handshake, 8))
+ mainloop(Deb, switch_callback(
+ #v1{parent = Parent,
+ sock = ClientSock,
+ connection = #connection{
+ protocol = none,
+ user = none,
+ timeout_sec = ?HANDSHAKE_TIMEOUT,
+ frame_max = ?FRAME_MIN_SIZE,
+ vhost = none,
+ client_properties = none},
+ callback = uninitialized_callback,
+ recv_length = 0,
+ recv_ref = none,
+ connection_state = pre_init,
+ queue_collector = Collector,
+ heartbeater = none,
+ stats_timer =
+ rabbit_event:init_stats_timer(),
+ channel_sup_sup_pid = ChannelSupSupPid
+ },
+ handshake, 8))
catch
Ex -> (if Ex == connection_closed_abruptly ->
fun rabbit_log:warning/2;
@@ -308,20 +321,18 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
%%
%% gen_tcp:close(ClientSock),
teardown_profiling(ProfilingValue),
- rabbit_misc:unlink_and_capture_exit(Collector),
- rabbit_queue_collector:shutdown(Collector),
rabbit_event:notify(connection_closed, [{pid, self()}])
end,
done.
-mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
+mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) ->
%%?LOGDEBUG("Reader mainloop: ~p bytes available, need ~p~n", [HaveBytes, WaitUntilNBytes]),
receive
{inet_async, Sock, Ref, {ok, Data}} ->
{State1, Callback1, Length1} =
handle_input(State#v1.callback, Data,
State#v1{recv_ref = none}),
- mainloop(Parent, Deb,
+ mainloop(Deb,
switch_callback(State1, Callback1, Length1));
{inet_async, Sock, Ref, {error, closed}} ->
if State#v1.connection_state =:= closed ->
@@ -332,7 +343,7 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
{inet_async, Sock, Ref, {error, Reason}} ->
throw({inet_error, Reason});
{conserve_memory, Conserve} ->
- mainloop(Parent, Deb, internal_conserve_memory(Conserve, State));
+ mainloop(Deb, internal_conserve_memory(Conserve, State));
{'EXIT', Parent, Reason} ->
terminate(io_lib:format("broker forced connection closure "
"with reason '~w'", [Reason]), State),
@@ -348,42 +359,39 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
{channel_exit, _Chan, E = {writer, send_failed, _Error}} ->
throw(E);
{channel_exit, Channel, Reason} ->
- mainloop(Parent, Deb, handle_channel_exit(Channel, Reason, State));
+ mainloop(Deb, handle_channel_exit(Channel, Reason, State));
{'EXIT', Pid, Reason} ->
- mainloop(Parent, Deb, handle_dependent_exit(Pid, Reason, State));
+ mainloop(Deb, handle_dependent_exit(Pid, Reason, State));
terminate_connection ->
State;
handshake_timeout ->
if ?IS_RUNNING(State) orelse
State#v1.connection_state =:= closing orelse
State#v1.connection_state =:= closed ->
- mainloop(Parent, Deb, State);
+ mainloop(Deb, State);
true ->
throw({handshake_timeout, State#v1.callback})
end;
- timeout ->
- throw({timeout, State#v1.connection_state});
{'$gen_call', From, {shutdown, Explanation}} ->
{ForceTermination, NewState} = terminate(Explanation, State),
gen_server:reply(From, ok),
case ForceTermination of
force -> ok;
- normal -> mainloop(Parent, Deb, NewState)
+ normal -> mainloop(Deb, NewState)
end;
{'$gen_call', From, info} ->
gen_server:reply(From, infos(?INFO_KEYS, State)),
- mainloop(Parent, Deb, State);
+ mainloop(Deb, State);
{'$gen_call', From, {info, Items}} ->
gen_server:reply(From, try {ok, infos(Items, State)}
catch Error -> {error, Error}
end),
- mainloop(Parent, Deb, State);
+ mainloop(Deb, State);
{'$gen_cast', emit_stats} ->
internal_emit_stats(State),
- mainloop(Parent, Deb,
- State#v1{stats_timer =
- rabbit_event:reset_stats_timer_after(
- State#v1.stats_timer)});
+ mainloop(Deb, State#v1{stats_timer =
+ rabbit_event:reset_stats_timer_after(
+ State#v1.stats_timer)});
{system, From, Request} ->
sys:handle_system_msg(Request, From,
Parent, ?MODULE, Deb, State);
@@ -447,13 +455,23 @@ close_channel(Channel, State) ->
handle_channel_exit(Channel, Reason, State) ->
handle_exception(State, Channel, Reason).
-handle_dependent_exit(Pid, normal, State) ->
- erase({chpid, Pid}),
- maybe_close(State);
handle_dependent_exit(Pid, Reason, State) ->
- case channel_cleanup(Pid) of
- undefined -> exit({abnormal_dependent_exit, Pid, Reason});
- Channel -> maybe_close(handle_exception(State, Channel, Reason))
+ case (case Reason of
+ shutdown -> controlled;
+ {shutdown, _Term} -> controlled;
+ normal -> controlled;
+ _ -> uncontrolled
+ end) of
+ controlled ->
+ erase({chpid, Pid}),
+ maybe_close(State);
+ uncontrolled ->
+ case channel_cleanup(Pid) of
+ undefined ->
+ exit({abnormal_dependent_exit, Pid, Reason});
+ Channel ->
+ maybe_close(handle_exception(State, Channel, Reason))
+ end
end.
channel_cleanup(Pid) ->
@@ -467,7 +485,8 @@ channel_cleanup(Pid) ->
all_channels() -> [Pid || {{chpid, Pid},_} <- get()].
terminate_channels() ->
- NChannels = length([exit(Pid, normal) || Pid <- all_channels()]),
+ NChannels =
+ length([rabbit_framing_channel:shutdown(Pid) || Pid <- all_channels()]),
if NChannels > 0 ->
Timeout = 1000 * ?CHANNEL_TERMINATION_TIMEOUT * NChannels,
TimerRef = erlang:send_after(Timeout, self(), cancel_wait),
@@ -491,7 +510,9 @@ wait_for_channel_termination(N, TimerRef) ->
exit({abnormal_dependent_exit, Pid, Reason});
Channel ->
case Reason of
- normal -> ok;
+ normal -> ok;
+ shutdown -> ok;
+ {shutdown, _Term} -> ok;
_ ->
rabbit_log:error(
"connection ~p, channel ~p - "
@@ -728,7 +749,8 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism,
client_properties = ClientProperties}};
handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
heartbeat = ClientHeartbeat},
- State = #v1{connection_state = tuning,
+ State = #v1{parent = Parent,
+ connection_state = tuning,
connection = Connection,
sock = Sock}) ->
if (FrameMax /= 0) and (FrameMax < ?FRAME_MIN_SIZE) ->
@@ -740,8 +762,8 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
not_allowed, "frame_max=~w > ~w max size",
[FrameMax, ?FRAME_MAX]);
true ->
- Heartbeater = rabbit_heartbeat:start_heartbeat(
- Sock, ClientHeartbeat),
+ Heartbeater =
+ rabbit_heartbeat:start_heartbeat(Parent, Sock, ClientHeartbeat),
State#v1{connection_state = opening,
connection = Connection#connection{
timeout_sec = ClientHeartbeat,
@@ -846,21 +868,21 @@ i(Item, #v1{}) ->
%%--------------------------------------------------------------------------
-send_to_new_channel(Channel, AnalyzedFrame,
- State = #v1{queue_collector = Collector}) ->
- #v1{sock = Sock, connection = #connection{
- frame_max = FrameMax,
- user = #user{username = Username},
- vhost = VHost,
- protocol = Protocol}} = State,
- {ok, WriterPid} = rabbit_writer:start(Sock, Channel, FrameMax, Protocol),
- {ok, ChPid} = rabbit_framing_channel:start_link(
- fun rabbit_channel:start_link/6,
- [Channel, self(), WriterPid, Username, VHost, Collector],
- Protocol),
- put({channel, Channel}, {chpid, ChPid}),
- put({chpid, ChPid}, {channel, Channel}),
- ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame).
+send_to_new_channel(Channel, AnalyzedFrame, State) ->
+ #v1{sock = Sock, queue_collector = Collector,
+ channel_sup_sup_pid = ChanSupSup,
+ connection = #connection{protocol = Protocol,
+ frame_max = FrameMax,
+ user = #user{username = Username},
+ vhost = VHost}} = State,
+ {ok, _ChanSup, FrChPid} =
+ rabbit_channel_sup_sup:start_channel(
+ ChanSupSup, [Protocol, Sock, Channel, FrameMax,
+ self(), Username, VHost, Collector]),
+ link(FrChPid),
+ put({channel, Channel}, {chpid, FrChPid}),
+ put({chpid, FrChPid}, {channel, Channel}),
+ ok = rabbit_framing_channel:process(FrChPid, AnalyzedFrame).
log_channel_error(ConnectionState, Channel, Reason) ->
rabbit_log:error("connection ~p (~p), channel ~p - error:~n~p~n",
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 6812b8d4..4b59844f 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1015,6 +1015,8 @@ test_server_status() ->
%% cleanup
[{ok, _} = rabbit_amqqueue:delete(QR, false, false) || QR <- [Q, Q2]],
+
+ unlink(Ch),
ok = rabbit_channel:shutdown(Ch),
passed.
@@ -1068,14 +1070,23 @@ test_hooks() ->
test_spawn(Receiver) ->
Me = self(),
Writer = spawn(fun () -> Receiver(Me) end),
- {ok, Ch} = rabbit_channel:start_link(1, self(), Writer, <<"guest">>,
- <<"/">>, self()),
+ {ok, Ch} = rabbit_channel:start_link(1, Me, Writer,
+ <<"guest">>, <<"/">>, self()),
ok = rabbit_channel:do(Ch, #'channel.open'{}),
- MRef = erlang:monitor(process, Ch),
receive #'channel.open_ok'{} -> ok
after 1000 -> throw(failed_to_receive_channel_open_ok)
end,
- {Writer, Ch, MRef}.
+ {Writer, Ch}.
+
+expect_normal_channel_termination(Ch) ->
+ receive {'EXIT', Ch, normal} -> ok
+ after 1000 -> throw({channel_failed_to_shutdown, Ch})
+ end.
+
+gobble_channel_exit() ->
+ receive {channel_exit, _, _} -> ok
+ after 1000 -> throw(channel_exit_not_received)
+ end.
test_statistics_receiver(Pid) ->
receive
@@ -1114,7 +1125,7 @@ test_statistics() ->
%% by far the most complex code though.
%% Set up a channel and queue
- {_Writer, Ch, _MRef} = test_spawn(fun test_statistics_receiver/1),
+ {_Writer, Ch} = test_spawn(fun test_statistics_receiver/1),
rabbit_channel:do(Ch, #'queue.declare'{}),
QName = receive #'queue.declare_ok'{queue = Q0} ->
Q0
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index f90ee734..483b46f7 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -33,7 +33,7 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([start/4, start_link/4, shutdown/1, mainloop/1]).
+-export([start/4, start_link/4, flush/1, mainloop/1]).
-export([send_command/2, send_command/3, send_command_and_signal_back/3,
send_command_and_signal_back/4, send_command_and_notify/5]).
-export([internal_send_command/4, internal_send_command/6]).
@@ -79,23 +79,26 @@
rabbit_framing:amqp_method_record(), rabbit_types:content(),
non_neg_integer(), rabbit_types:protocol())
-> 'ok').
+-spec(flush/1 :: (pid()) -> 'ok').
-endif.
%%----------------------------------------------------------------------------
start(Sock, Channel, FrameMax, Protocol) ->
- {ok, spawn(?MODULE, mainloop, [#wstate{sock = Sock,
- channel = Channel,
- frame_max = FrameMax,
- protocol = Protocol}])}.
-
-start_link(Sock, Channel, FrameMax, Protocol) ->
- {ok, spawn_link(?MODULE, mainloop, [#wstate{sock = Sock,
+ {ok,
+ proc_lib:spawn(?MODULE, mainloop, [#wstate{sock = Sock,
channel = Channel,
frame_max = FrameMax,
protocol = Protocol}])}.
+start_link(Sock, Channel, FrameMax, Protocol) ->
+ {ok,
+ proc_lib:spawn_link(?MODULE, mainloop, [#wstate{sock = Sock,
+ channel = Channel,
+ frame_max = FrameMax,
+ protocol = Protocol}])}.
+
mainloop(State) ->
receive
Message -> ?MODULE:mainloop(handle_message(Message, State))
@@ -144,8 +147,9 @@ handle_message({inet_reply, _, ok}, State) ->
State;
handle_message({inet_reply, _, Status}, _State) ->
exit({writer, send_failed, Status});
-handle_message(shutdown, _State) ->
- exit(normal);
+handle_message({flush, Pid, Ref}, State) ->
+ Pid ! Ref,
+ State;
handle_message(Message, _State) ->
exit({writer, message_not_understood, Message}).
@@ -171,10 +175,10 @@ send_command_and_notify(W, Q, ChPid, MethodRecord, Content) ->
W ! {send_command_and_notify, Q, ChPid, MethodRecord, Content},
ok.
-shutdown(W) ->
- W ! shutdown,
- rabbit_misc:unlink_and_capture_exit(W),
- ok.
+flush(W) ->
+ Ref = make_ref(),
+ W ! {flush, self(), Ref},
+ receive Ref -> ok end.
%---------------------------------------------------------------------------
diff --git a/src/supervisor2.erl b/src/supervisor2.erl
index fb4c9b02..7616e44d 100644
--- a/src/supervisor2.erl
+++ b/src/supervisor2.erl
@@ -31,6 +31,10 @@
%% the MaxT and MaxR parameters to permit the child to be
%% restarted. This may require waiting for longer than Delay.
%%
+%% 4) When the MaxR (intensity) == 0, errors that would otherwise be
+%% reported concerning child death, or the reaching of max restart
+%% intensity are elided.
+%%
%% All modifications are (C) 2010 Rabbit Technologies Ltd.
%%
%% %CopyrightBegin%
@@ -58,7 +62,7 @@
-export([start_link/2,start_link/3,
start_child/2, restart_child/2,
delete_child/2, terminate_child/2,
- which_children/1,
+ which_children/1, find_child/2,
check_childspecs/1]).
-export([behaviour_info/1]).
@@ -133,6 +137,10 @@ terminate_child(Supervisor, Name) ->
which_children(Supervisor) ->
call(Supervisor, which_children).
+find_child(Supervisor, Name) ->
+ [Pid || {Name1, Pid, _Type, _Modules} <- which_children(Supervisor),
+ Name1 =:= Name].
+
call(Supervisor, Req) ->
gen_server:call(Supervisor, Req, infinity).
@@ -341,7 +349,6 @@ handle_call(which_children, _From, State) ->
State#state.children),
{reply, Resp, State}.
-
handle_cast({delayed_restart, {RestartType, Reason, Child}}, State)
when ?is_simple(State) ->
{ok, NState} = do_restart(RestartType, Reason, Child, State),
@@ -532,7 +539,7 @@ do_restart({RestartType, Delay}, Reason, Child, State) ->
{ok, NState}
end;
do_restart(permanent, Reason, Child, State) ->
- report_error(child_terminated, Reason, Child, State#state.name),
+ report_error(child_terminated, Reason, Child, State),
restart(Child, State);
do_restart(_, normal, Child, State) ->
NState = state_del_child(Child, State),
@@ -541,10 +548,10 @@ do_restart(_, shutdown, Child, State) ->
NState = state_del_child(Child, State),
{ok, NState};
do_restart(transient, Reason, Child, State) ->
- report_error(child_terminated, Reason, Child, State#state.name),
+ report_error(child_terminated, Reason, Child, State),
restart(Child, State);
do_restart(temporary, Reason, Child, State) ->
- report_error(child_terminated, Reason, Child, State#state.name),
+ report_error(child_terminated, Reason, Child, State),
NState = state_del_child(Child, State),
{ok, NState}.
@@ -553,8 +560,8 @@ restart(Child, State) ->
{ok, NState} ->
restart(NState#state.strategy, Child, NState, fun restart/2);
{terminate, NState} ->
- report_error(shutdown, reached_max_restart_intensity,
- Child, State#state.name),
+ report_error(shutdown, reached_max_restart_intensity,
+ Child, State),
{shutdown, remove_child(Child, NState)}
end.
@@ -673,6 +680,8 @@ shutdown(Pid, brutal_kill) ->
{'DOWN', _MRef, process, Pid, OtherReason} ->
{error, OtherReason}
end;
+ normal_shutdown ->
+ ok;
{error, Reason} ->
{error, Reason}
end;
@@ -694,6 +703,8 @@ shutdown(Pid, Time) ->
{error, OtherReason}
end
end;
+ normal_shutdown ->
+ ok;
{error, Reason} ->
{error, Reason}
end.
@@ -714,7 +725,12 @@ monitor_child(Pid) ->
{'EXIT', Pid, Reason} ->
receive
{'DOWN', _, process, Pid, _} ->
- {error, Reason}
+ case Reason of
+ normal -> normal_shutdown;
+ shutdown -> normal_shutdown;
+ {shutdown, _Terms} -> normal_shutdown;
+ _ -> {error, Reason}
+ end
end
after 0 ->
%% If a naughty child did unlink and the child dies before
@@ -968,6 +984,10 @@ difference({_, TimeS, _}, {_, CurS, _}) ->
%%% Error and progress reporting.
%%% ------------------------------------------------------
+report_error(_Error, _Reason, _Child, #state{intensity = 0}) ->
+ ok;
+report_error(Error, Reason, Child, #state{name=Name}) ->
+ report_error(Error, Reason, Child, Name);
report_error(Error, Reason, Child, SupName) ->
ErrorMsg = [{supervisor, SupName},
{errorContext, Error},
diff --git a/src/tcp_client_sup.erl b/src/tcp_client_sup.erl
index 1b785843..02d7e0e4 100644
--- a/src/tcp_client_sup.erl
+++ b/src/tcp_client_sup.erl
@@ -31,19 +31,19 @@
-module(tcp_client_sup).
--behaviour(supervisor).
+-behaviour(supervisor2).
-export([start_link/1, start_link/2]).
-export([init/1]).
start_link(Callback) ->
- supervisor:start_link(?MODULE, Callback).
+ supervisor2:start_link(?MODULE, Callback).
start_link(SupName, Callback) ->
- supervisor:start_link(SupName, ?MODULE, Callback).
+ supervisor2:start_link(SupName, ?MODULE, Callback).
init({M,F,A}) ->
- {ok, {{simple_one_for_one, 10, 10},
+ {ok, {{simple_one_for_one_terminate, 10, 10},
[{tcp_client, {M,F,A},
- temporary, brutal_kill, worker, [M]}]}}.
+ temporary, infinity, supervisor, [M]}]}}.