summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-08-09 17:05:04 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-08-09 17:05:04 +0100
commitae37555e5e0f1a5e6b64c2b9de5a81747d000ab6 (patch)
treead1a341d542aa7f1d3ff8f30f92a47ab3286a4c6
parent634565b83de7d527f85c18a94402c4c117c41e3e (diff)
parente67f6a1a24ed1e78ec501de8057f19664c18fb07 (diff)
downloadrabbitmq-server-ae37555e5e0f1a5e6b64c2b9de5a81747d000ab6.tar.gz
Merging default into bug 15930
-rw-r--r--src/rabbit_channel.erl28
-rw-r--r--src/rabbit_channel_sup.erl83
-rw-r--r--src/rabbit_channel_sup_sup.erl49
-rw-r--r--src/rabbit_connection_sup.erl65
-rw-r--r--src/rabbit_framing_channel.erl20
-rw-r--r--src/rabbit_heartbeat.erl64
-rw-r--r--src/rabbit_networking.erl17
-rw-r--r--src/rabbit_reader.erl137
-rw-r--r--src/rabbit_tests.erl21
-rw-r--r--src/rabbit_writer.erl32
-rw-r--r--src/supervisor2.erl21
-rw-r--r--src/tcp_client_sup.erl10
12 files changed, 408 insertions, 139 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 582960e7..f8d4f307 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -43,7 +43,7 @@
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1]).
--record(ch, {state, channel, reader_pid, writer_pid, limiter_pid,
+-record(ch, {state, channel, parent_pid, reader_pid, writer_pid, limiter_pid,
transaction_id, tx_participants, next_tag,
uncommitted_ack_q, unacked_message_q,
username, virtual_host, most_recently_declared_queue,
@@ -78,7 +78,8 @@
-spec(start_link/6 ::
(channel_number(), pid(), pid(), rabbit_access_control:username(),
- rabbit_types:vhost(), pid()) -> rabbit_types:ok(pid())).
+ rabbit_types:vhost(), pid()) ->
+ 'ignore' | rabbit_types:ok_or_error2(pid(), any())).
-spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
-spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(),
rabbit_types:maybe(rabbit_types:content())) -> 'ok').
@@ -101,7 +102,7 @@
%%----------------------------------------------------------------------------
start_link(Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid) ->
- gen_server2:start_link(?MODULE, [Channel, ReaderPid, WriterPid,
+ gen_server2:start_link(?MODULE, [Channel, self(), ReaderPid, WriterPid,
Username, VHost, CollectorPid], []).
do(Pid, Method) ->
@@ -150,12 +151,14 @@ flush(Pid) ->
%%---------------------------------------------------------------------------
-init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) ->
+init([Channel, ParentPid, ReaderPid, WriterPid, Username, VHost,
+ CollectorPid]) ->
process_flag(trap_exit, true),
link(WriterPid),
ok = pg_local:join(rabbit_channels, self()),
State = #ch{state = starting,
channel = Channel,
+ parent_pid = ParentPid,
reader_pid = ReaderPid,
writer_pid = WriterPid,
limiter_pid = undefined,
@@ -259,8 +262,10 @@ terminate(_Reason, State = #ch{state = terminating}) ->
terminate(Reason, State) ->
Res = rollback_and_notify(State),
case Reason of
- normal -> ok = Res;
- _ -> ok
+ normal -> ok = Res;
+ shutdown -> ok = Res;
+ {shutdown, _Term} -> ok = Res;
+ _ -> ok
end,
terminate(State).
@@ -1016,8 +1021,13 @@ fold_per_queue(F, Acc0, UAQ) ->
dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end,
Acc0, D).
-start_limiter(State = #ch{unacked_message_q = UAMQ}) ->
- {ok, LPid} = rabbit_limiter:start_link(self(), queue:len(UAMQ)),
+start_limiter(State = #ch{unacked_message_q = UAMQ, parent_pid = ParentPid}) ->
+ Me = self(),
+ {ok, LPid} =
+ supervisor:start_child(
+ ParentPid,
+ {limiter, {rabbit_limiter, start_link, [Me, queue:len(UAMQ)]},
+ transient, ?MAX_WAIT, worker, [rabbit_limiter]}),
ok = limit_queues(LPid, State),
LPid.
@@ -1092,7 +1102,7 @@ internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag,
terminate(#ch{writer_pid = WriterPid, limiter_pid = LimiterPid}) ->
pg_local:leave(rabbit_channels, self()),
rabbit_event:notify(channel_closed, [{pid, self()}]),
- rabbit_writer:shutdown(WriterPid),
+ rabbit_writer:flush(WriterPid),
rabbit_limiter:shutdown(LimiterPid).
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
new file mode 100644
index 00000000..e4dcbae1
--- /dev/null
+++ b/src/rabbit_channel_sup.erl
@@ -0,0 +1,83 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_channel_sup).
+
+-behaviour(supervisor2).
+
+-export([start_link/8]).
+
+-export([init/1]).
+
+-include("rabbit.hrl").
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/8 ::
+ (rabbit_types:protocol(), rabbit_net:socket(),
+ rabbit_channel:channel_number(), non_neg_integer(), pid(),
+ rabbit_access_control:username(), rabbit_types:vhost(), pid()) ->
+ {'ok', pid(), pid()}).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+start_link(Protocol, Sock, Channel, FrameMax, ReaderPid, Username, VHost,
+ Collector) ->
+ {ok, SupPid} = supervisor2:start_link(?MODULE, []),
+ {ok, WriterPid} =
+ supervisor2:start_child(
+ SupPid,
+ {writer, {rabbit_writer, start_link,
+ [Sock, Channel, FrameMax, Protocol]},
+ intrinsic, ?MAX_WAIT, worker, [rabbit_writer]}),
+ {ok, ChannelPid} =
+ supervisor2:start_child(
+ SupPid,
+ {channel, {rabbit_channel, start_link,
+ [Channel, ReaderPid, WriterPid, Username, VHost,
+ Collector]},
+ intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}),
+ {ok, FramingChannelPid} =
+ supervisor2:start_child(
+ SupPid,
+ {framing_channel, {rabbit_framing_channel, start_link,
+ [ChannelPid, Protocol]},
+ intrinsic, ?MAX_WAIT, worker, [rabbit_framing_channel]}),
+ {ok, SupPid, FramingChannelPid}.
+
+%%----------------------------------------------------------------------------
+
+init([]) ->
+ {ok, {{one_for_all, 10, 10}, []}}.
diff --git a/src/rabbit_channel_sup_sup.erl b/src/rabbit_channel_sup_sup.erl
new file mode 100644
index 00000000..2fab8678
--- /dev/null
+++ b/src/rabbit_channel_sup_sup.erl
@@ -0,0 +1,49 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_channel_sup_sup).
+
+-behaviour(supervisor2).
+
+-export([start_link/0, start_channel/2]).
+
+-export([init/1]).
+
+start_link() ->
+ supervisor2:start_link(?MODULE, []).
+
+init([]) ->
+ {ok, {{simple_one_for_one_terminate, 0, 1},
+ [{channel_sup, {rabbit_channel_sup, start_link, []},
+ temporary, infinity, supervisor, [rabbit_channel_sup]}]}}.
+
+start_channel(Pid, Args) ->
+ supervisor2:start_child(Pid, Args).
diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl
new file mode 100644
index 00000000..f097f80a
--- /dev/null
+++ b/src/rabbit_connection_sup.erl
@@ -0,0 +1,65 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_connection_sup).
+
+-behaviour(supervisor2).
+
+-export([start_link/0, reader/1]).
+
+-export([init/1]).
+
+-include("rabbit.hrl").
+
+start_link() ->
+ {ok, SupPid} = supervisor2:start_link(?MODULE, []),
+ {ok, Collector} =
+ supervisor2:start_child(
+ SupPid,
+ {collector, {rabbit_queue_collector, start_link, []},
+ intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}),
+ {ok, ChannelSupSupPid} =
+ supervisor2:start_child(
+ SupPid,
+ {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []},
+ intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}),
+ {ok, _ReaderPid} =
+ supervisor2:start_child(
+ SupPid,
+ {reader, {rabbit_reader, start_link, [ChannelSupSupPid, Collector]},
+ intrinsic, ?MAX_WAIT, worker, [rabbit_reader]}),
+ {ok, SupPid}.
+
+init([]) ->
+ {ok, {{one_for_all, 10, 10}, []}}.
+
+reader(Pid) ->
+ hd(supervisor2:find_child(Pid, reader)).
diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl
index 553faaa8..08aaafe1 100644
--- a/src/rabbit_framing_channel.erl
+++ b/src/rabbit_framing_channel.erl
@@ -32,23 +32,17 @@
-module(rabbit_framing_channel).
-include("rabbit.hrl").
--export([start_link/3, process/2, shutdown/1]).
+-export([start_link/2, process/2, shutdown/1]).
%% internal
-export([mainloop/3]).
%%--------------------------------------------------------------------
-start_link(StartFun, StartArgs, Protocol) ->
+start_link(ChannelPid, Protocol) ->
Parent = self(),
- {ok, spawn_link(
- fun () ->
- %% we trap exits so that a normal termination of
- %% the channel or reader process terminates us too.
- process_flag(trap_exit, true),
- {ok, ChannelPid} = apply(StartFun, StartArgs),
- mainloop(Parent, ChannelPid, Protocol)
- end)}.
+ {ok, proc_lib:spawn_link(
+ fun () -> mainloop(Parent, ChannelPid, Protocol) end)}.
process(Pid, Frame) ->
Pid ! {frame, Frame},
@@ -62,12 +56,6 @@ shutdown(Pid) ->
read_frame(ChannelPid) ->
receive
- %% converting the exit signal into one of our own ensures that
- %% the reader sees the right pid (i.e. ours) when a channel
- %% exits. Similarly in the other direction, though it is not
- %% really relevant there since the channel is not specifically
- %% watching out for reader exit signals.
- {'EXIT', _Pid, Reason} -> exit(Reason);
{frame, Frame} -> Frame;
terminate -> rabbit_channel:shutdown(ChannelPid),
read_frame(ChannelPid);
diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl
index faddffc1..b277de70 100644
--- a/src/rabbit_heartbeat.erl
+++ b/src/rabbit_heartbeat.erl
@@ -31,7 +31,11 @@
-module(rabbit_heartbeat).
--export([start_heartbeat/2, pause_monitor/1, resume_monitor/1]).
+-export([start_heartbeat/3, pause_monitor/1, resume_monitor/1,
+ start_heartbeat_sender/2,
+ start_heartbeat_receiver/2]).
+
+-include("rabbit.hrl").
%%----------------------------------------------------------------------------
@@ -39,7 +43,13 @@
-type(pids() :: rabbit_types:maybe({pid(), pid()})).
--spec(start_heartbeat/2 :: (rabbit_net:socket(), non_neg_integer()) -> pids()).
+-spec(start_heartbeat/3 :: (pid(), rabbit_net:socket(), non_neg_integer()) ->
+ pids()).
+-spec(start_heartbeat_sender/2 :: (rabbit_net:socket(), non_neg_integer()) ->
+ rabbit_types:ok(pid())).
+-spec(start_heartbeat_receiver/2 :: (rabbit_net:socket(), non_neg_integer()) ->
+ rabbit_types:ok(pid())).
+
-spec(pause_monitor/1 :: (pids()) -> 'ok').
-spec(resume_monitor/1 :: (pids()) -> 'ok').
@@ -47,31 +57,44 @@
%%----------------------------------------------------------------------------
-start_heartbeat(_Sock, 0) ->
+start_heartbeat(_Sup, _Sock, 0) ->
none;
-start_heartbeat(Sock, TimeoutSec) ->
- Parent = self(),
+start_heartbeat(Sup, Sock, TimeoutSec) ->
+ {ok, Sender} =
+ supervisor:start_child(
+ Sup, {heartbeat_sender,
+ {?MODULE, start_heartbeat_sender, [Sock, TimeoutSec]},
+ permanent, ?MAX_WAIT, worker, [rabbit_heartbeat]}),
+ {ok, Receiver} =
+ supervisor:start_child(
+ Sup, {heartbeat_receiver,
+ {?MODULE, start_heartbeat_receiver, [Sock, TimeoutSec]},
+ permanent, ?MAX_WAIT, worker, [rabbit_heartbeat]}),
+ {Sender, Receiver}.
+
+start_heartbeat_sender(Sock, TimeoutSec) ->
%% the 'div 2' is there so that we don't end up waiting for nearly
%% 2 * TimeoutSec before sending a heartbeat in the boundary case
%% where the last message was sent just after a heartbeat.
- Sender =
- spawn_link(fun () -> heartbeater({Sock, TimeoutSec * 1000 div 2,
- send_oct, 0,
- fun () ->
- catch rabbit_net:send(Sock, rabbit_binary_generator:build_heartbeat_frame()),
- continue
- end}, Parent) end),
+ Parent = self(),
+ {ok, proc_lib:spawn_link(
+ fun () -> heartbeater({Sock, TimeoutSec * 1000 div 2,
+ send_oct, 0,
+ fun () ->
+ catch rabbit_net:send(Sock, rabbit_binary_generator:build_heartbeat_frame()),
+ continue
+ end}, Parent)
+ end)}.
+
+start_heartbeat_receiver(Sock, TimeoutSec) ->
%% we check for incoming data every interval, and time out after
%% two checks with no change. As a result we will time out between
%% 2 and 3 intervals after the last data has been received.
- Receiver =
- spawn_link(fun () -> heartbeater({Sock, TimeoutSec * 1000,
- recv_oct, 1,
- fun () ->
- Parent ! timeout,
- stop
- end}, Parent) end),
- {Sender, Receiver}.
+ Parent = self(),
+ {ok, proc_lib:spawn_link(
+ fun () -> heartbeater({Sock, TimeoutSec * 1000,
+ recv_oct, 1,
+ fun () -> exit(timeout) end}, Parent) end)}.
pause_monitor(none) ->
ok;
@@ -116,7 +139,6 @@ heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params,
Recurse({NewStatVal, SameCount + 1});
true ->
case Handler() of
- stop -> ok;
continue -> Recurse({NewStatVal, 0})
end
end;
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 3a3357ba..492d7d01 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -118,7 +118,7 @@ start() ->
{rabbit_tcp_client_sup,
{tcp_client_sup, start_link,
[{local, rabbit_tcp_client_sup},
- {rabbit_reader,start_link,[]}]},
+ {rabbit_connection_sup,start_link,[]}]},
transient, infinity, supervisor, [tcp_client_sup]}),
ok.
@@ -205,9 +205,10 @@ on_node_down(Node) ->
start_client(Sock, SockTransform) ->
{ok, Child} = supervisor:start_child(rabbit_tcp_client_sup, []),
- ok = rabbit_net:controlling_process(Sock, Child),
- Child ! {go, Sock, SockTransform},
- Child.
+ Reader = rabbit_connection_sup:reader(Child),
+ ok = rabbit_net:controlling_process(Sock, Reader),
+ Reader ! {go, Sock, SockTransform},
+ Reader.
start_client(Sock) ->
start_client(Sock, fun (S) -> {ok, S} end).
@@ -230,8 +231,9 @@ start_ssl_client(SslOpts, Sock) ->
end).
connections() ->
- [Pid || {_, Pid, _, _} <- supervisor:which_children(
- rabbit_tcp_client_sup)].
+ [rabbit_connection_sup:reader(ConnSup) ||
+ {_, ConnSup, supervisor, _}
+ <- supervisor:which_children(rabbit_tcp_client_sup)].
connection_info_keys() -> rabbit_reader:info_keys().
@@ -242,8 +244,7 @@ connection_info_all() -> cmap(fun (Q) -> connection_info(Q) end).
connection_info_all(Items) -> cmap(fun (Q) -> connection_info(Q, Items) end).
close_connection(Pid, Explanation) ->
- case lists:any(fun ({_, ChildPid, _, _}) -> ChildPid =:= Pid end,
- supervisor:which_children(rabbit_tcp_client_sup)) of
+ case lists:member(Pid, connections()) of
true -> rabbit_reader:shutdown(Pid, Explanation);
false -> throw({error, {not_a_connection_pid, Pid}})
end.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index d5ade90f..6e336c86 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -33,11 +33,11 @@
-include("rabbit_framing.hrl").
-include("rabbit.hrl").
--export([start_link/0, info_keys/0, info/1, info/2, shutdown/2]).
+-export([start_link/2, info_keys/0, info/1, info/2, shutdown/2]).
-export([system_continue/3, system_terminate/4, system_code_change/4]).
--export([init/1, mainloop/2]).
+-export([init/3, mainloop/2]).
-export([conserve_memory/2, server_properties/0]).
@@ -60,7 +60,8 @@
%---------------------------------------------------------------------------
-record(v1, {parent, sock, connection, callback, recv_length, recv_ref,
- connection_state, queue_collector, heartbeater, stats_timer}).
+ connection_state, queue_collector, heartbeater, stats_timer,
+ channel_sup_sup_pid}).
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
send_pend, state, channels]).
@@ -160,6 +161,7 @@
-ifdef(use_specs).
+-spec(start_link/2 :: (pid(), pid()) -> rabbit_types:ok(pid())).
-spec(info_keys/0 :: () -> [rabbit_types:info_key()]).
-spec(info/1 :: (pid()) -> [rabbit_types:info()]).
-spec(info/2 :: (pid(), [rabbit_types:info_key()]) -> [rabbit_types:info()]).
@@ -168,21 +170,30 @@
-spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok').
-spec(server_properties/0 :: () -> rabbit_framing:amqp_table()).
+%% These specs only exists to add no_return() to keep dialyzer happy
+-spec(init/3 :: (pid(), pid(), pid()) -> no_return()).
+-spec(start_connection/6 ::
+ (pid(), pid(), pid(), any(), rabbit_networking:socket(),
+ fun ((rabbit_networking:socket()) ->
+ rabbit_types:ok_or_error2(
+ rabbit_networking:socket(), any()))) -> no_return()).
+
-endif.
%%--------------------------------------------------------------------------
-start_link() ->
- {ok, proc_lib:spawn_link(?MODULE, init, [self()])}.
+start_link(ChannelSupSupPid, Collector) ->
+ {ok, proc_lib:spawn_link(?MODULE, init, [self(), ChannelSupSupPid, Collector])}.
shutdown(Pid, Explanation) ->
gen_server:call(Pid, {shutdown, Explanation}, infinity).
-init(Parent) ->
+init(Parent, ChannelSupSupPid, Collector) ->
Deb = sys:debug_options([]),
receive
{go, Sock, SockTransform} ->
- start_connection(Parent, Deb, Sock, SockTransform)
+ start_connection(
+ Parent, ChannelSupSupPid, Collector, Deb, Sock, SockTransform)
end.
system_continue(Parent, Deb, State) ->
@@ -261,7 +272,8 @@ socket_op(Sock, Fun) ->
exit(normal)
end.
-start_connection(Parent, Deb, Sock, SockTransform) ->
+start_connection(Parent, ChannelSupSupPid, Collector, Deb, Sock,
+ SockTransform) ->
process_flag(trap_exit, true),
{PeerAddress, PeerPort} = socket_op(Sock, fun rabbit_net:peername/1),
PeerAddressS = inet_parse:ntoa(PeerAddress),
@@ -271,27 +283,28 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),
handshake_timeout),
ProfilingValue = setup_profiling(),
- {ok, Collector} = rabbit_queue_collector:start_link(),
try
mainloop(Deb, switch_callback(
- #v1{parent = Parent,
- sock = ClientSock,
- connection = #connection{
- user = none,
- timeout_sec = ?HANDSHAKE_TIMEOUT,
- frame_max = ?FRAME_MIN_SIZE,
- vhost = none,
- client_properties = none,
- protocol = none},
- callback = uninitialized_callback,
- recv_length = 0,
- recv_ref = none,
- connection_state = pre_init,
- queue_collector = Collector,
- heartbeater = none,
- stats_timer =
- rabbit_event:init_stats_timer()},
- handshake, 8))
+ #v1{parent = Parent,
+ sock = ClientSock,
+ connection = #connection{
+ protocol = none,
+ user = none,
+ timeout_sec = ?HANDSHAKE_TIMEOUT,
+ frame_max = ?FRAME_MIN_SIZE,
+ vhost = none,
+ client_properties = none},
+ callback = uninitialized_callback,
+ recv_length = 0,
+ recv_ref = none,
+ connection_state = pre_init,
+ queue_collector = Collector,
+ heartbeater = none,
+ stats_timer =
+ rabbit_event:init_stats_timer(),
+ channel_sup_sup_pid = ChannelSupSupPid
+ },
+ handshake, 8))
catch
Ex -> (if Ex == connection_closed_abruptly ->
fun rabbit_log:warning/2;
@@ -309,8 +322,6 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
%%
%% gen_tcp:close(ClientSock),
teardown_profiling(ProfilingValue),
- rabbit_misc:unlink_and_capture_exit(Collector),
- rabbit_queue_collector:shutdown(Collector),
rabbit_event:notify(connection_closed, [{pid, self()}])
end,
done.
@@ -361,8 +372,6 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) ->
true ->
throw({handshake_timeout, State#v1.callback})
end;
- timeout ->
- throw({timeout, State#v1.connection_state});
{'$gen_call', From, {shutdown, Explanation}} ->
{ForceTermination, NewState} = terminate(Explanation, State),
gen_server:reply(From, ok),
@@ -449,13 +458,23 @@ handle_channel_exit(ChPid, Reason, State) when is_pid(ChPid) ->
handle_channel_exit(Channel, Reason, State) ->
handle_exception(State, Channel, Reason).
-handle_dependent_exit(Pid, normal, State) ->
- erase({chpid, Pid}),
- maybe_close(State);
handle_dependent_exit(Pid, Reason, State) ->
- case channel_cleanup(Pid) of
- undefined -> exit({abnormal_dependent_exit, Pid, Reason});
- Channel -> maybe_close(handle_exception(State, Channel, Reason))
+ case (case Reason of
+ shutdown -> controlled;
+ {shutdown, _Term} -> controlled;
+ normal -> controlled;
+ _ -> uncontrolled
+ end) of
+ controlled ->
+ erase({chpid, Pid}),
+ maybe_close(State);
+ uncontrolled ->
+ case channel_cleanup(Pid) of
+ undefined ->
+ exit({abnormal_dependent_exit, Pid, Reason});
+ Channel ->
+ maybe_close(handle_exception(State, Channel, Reason))
+ end
end.
channel_cleanup(Pid) ->
@@ -469,7 +488,8 @@ channel_cleanup(Pid) ->
all_channels() -> [Pid || {{chpid, Pid},_} <- get()].
terminate_channels() ->
- NChannels = length([exit(Pid, normal) || Pid <- all_channels()]),
+ NChannels =
+ length([rabbit_framing_channel:shutdown(Pid) || Pid <- all_channels()]),
if NChannels > 0 ->
Timeout = 1000 * ?CHANNEL_TERMINATION_TIMEOUT * NChannels,
TimerRef = erlang:send_after(Timeout, self(), cancel_wait),
@@ -493,7 +513,9 @@ wait_for_channel_termination(N, TimerRef) ->
exit({abnormal_dependent_exit, Pid, Reason});
Channel ->
case Reason of
- normal -> ok;
+ normal -> ok;
+ shutdown -> ok;
+ {shutdown, _Term} -> ok;
_ ->
rabbit_log:error(
"connection ~p, channel ~p - "
@@ -730,7 +752,8 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism,
client_properties = ClientProperties}};
handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
heartbeat = ClientHeartbeat},
- State = #v1{connection_state = tuning,
+ State = #v1{parent = Parent,
+ connection_state = tuning,
connection = Connection,
sock = Sock}) ->
if (FrameMax /= 0) and (FrameMax < ?FRAME_MIN_SIZE) ->
@@ -742,8 +765,8 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
not_allowed, "frame_max=~w > ~w max size",
[FrameMax, ?FRAME_MAX]);
true ->
- Heartbeater = rabbit_heartbeat:start_heartbeat(
- Sock, ClientHeartbeat),
+ Heartbeater =
+ rabbit_heartbeat:start_heartbeat(Parent, Sock, ClientHeartbeat),
State#v1{connection_state = opening,
connection = Connection#connection{
timeout_sec = ClientHeartbeat,
@@ -848,21 +871,21 @@ i(Item, #v1{}) ->
%%--------------------------------------------------------------------------
-send_to_new_channel(Channel, AnalyzedFrame,
- State = #v1{queue_collector = Collector}) ->
- #v1{sock = Sock, connection = #connection{
- frame_max = FrameMax,
- user = #user{username = Username},
- vhost = VHost,
- protocol = Protocol}} = State,
- {ok, WriterPid} = rabbit_writer:start(Sock, Channel, FrameMax, Protocol),
- {ok, ChPid} = rabbit_framing_channel:start_link(
- fun rabbit_channel:start_link/6,
- [Channel, self(), WriterPid, Username, VHost, Collector],
- Protocol),
- put({channel, Channel}, {chpid, ChPid}),
- put({chpid, ChPid}, {channel, Channel}),
- ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame).
+send_to_new_channel(Channel, AnalyzedFrame, State) ->
+ #v1{sock = Sock, queue_collector = Collector,
+ channel_sup_sup_pid = ChanSupSup,
+ connection = #connection{protocol = Protocol,
+ frame_max = FrameMax,
+ user = #user{username = Username},
+ vhost = VHost}} = State,
+ {ok, _ChanSup, FrChPid} =
+ rabbit_channel_sup_sup:start_channel(
+ ChanSupSup, [Protocol, Sock, Channel, FrameMax,
+ self(), Username, VHost, Collector]),
+ link(FrChPid),
+ put({channel, Channel}, {chpid, FrChPid}),
+ put({chpid, FrChPid}, {channel, Channel}),
+ ok = rabbit_framing_channel:process(FrChPid, AnalyzedFrame).
log_channel_error(ConnectionState, Channel, Reason) ->
rabbit_log:error("connection ~p (~p), channel ~p - error:~n~p~n",
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index c07055af..55897679 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1061,6 +1061,8 @@ test_server_status() ->
%% cleanup
[{ok, _} = rabbit_amqqueue:delete(QR, false, false) || QR <- [Q, Q2]],
+
+ unlink(Ch),
ok = rabbit_channel:shutdown(Ch),
passed.
@@ -1114,14 +1116,23 @@ test_hooks() ->
test_spawn(Receiver) ->
Me = self(),
Writer = spawn(fun () -> Receiver(Me) end),
- {ok, Ch} = rabbit_channel:start_link(1, self(), Writer, <<"guest">>,
- <<"/">>, self()),
+ {ok, Ch} = rabbit_channel:start_link(1, Me, Writer,
+ <<"guest">>, <<"/">>, self()),
ok = rabbit_channel:do(Ch, #'channel.open'{}),
- MRef = erlang:monitor(process, Ch),
receive #'channel.open_ok'{} -> ok
after 1000 -> throw(failed_to_receive_channel_open_ok)
end,
- {Writer, Ch, MRef}.
+ {Writer, Ch}.
+
+expect_normal_channel_termination(Ch) ->
+ receive {'EXIT', Ch, normal} -> ok
+ after 1000 -> throw({channel_failed_to_shutdown, Ch})
+ end.
+
+gobble_channel_exit() ->
+ receive {channel_exit, _, _} -> ok
+ after 1000 -> throw(channel_exit_not_received)
+ end.
test_statistics_receiver(Pid) ->
receive
@@ -1160,7 +1171,7 @@ test_statistics() ->
%% by far the most complex code though.
%% Set up a channel and queue
- {_Writer, Ch, _MRef} = test_spawn(fun test_statistics_receiver/1),
+ {_Writer, Ch} = test_spawn(fun test_statistics_receiver/1),
rabbit_channel:do(Ch, #'queue.declare'{}),
QName = receive #'queue.declare_ok'{queue = Q0} ->
Q0
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index f90ee734..483b46f7 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -33,7 +33,7 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([start/4, start_link/4, shutdown/1, mainloop/1]).
+-export([start/4, start_link/4, flush/1, mainloop/1]).
-export([send_command/2, send_command/3, send_command_and_signal_back/3,
send_command_and_signal_back/4, send_command_and_notify/5]).
-export([internal_send_command/4, internal_send_command/6]).
@@ -79,23 +79,26 @@
rabbit_framing:amqp_method_record(), rabbit_types:content(),
non_neg_integer(), rabbit_types:protocol())
-> 'ok').
+-spec(flush/1 :: (pid()) -> 'ok').
-endif.
%%----------------------------------------------------------------------------
start(Sock, Channel, FrameMax, Protocol) ->
- {ok, spawn(?MODULE, mainloop, [#wstate{sock = Sock,
- channel = Channel,
- frame_max = FrameMax,
- protocol = Protocol}])}.
-
-start_link(Sock, Channel, FrameMax, Protocol) ->
- {ok, spawn_link(?MODULE, mainloop, [#wstate{sock = Sock,
+ {ok,
+ proc_lib:spawn(?MODULE, mainloop, [#wstate{sock = Sock,
channel = Channel,
frame_max = FrameMax,
protocol = Protocol}])}.
+start_link(Sock, Channel, FrameMax, Protocol) ->
+ {ok,
+ proc_lib:spawn_link(?MODULE, mainloop, [#wstate{sock = Sock,
+ channel = Channel,
+ frame_max = FrameMax,
+ protocol = Protocol}])}.
+
mainloop(State) ->
receive
Message -> ?MODULE:mainloop(handle_message(Message, State))
@@ -144,8 +147,9 @@ handle_message({inet_reply, _, ok}, State) ->
State;
handle_message({inet_reply, _, Status}, _State) ->
exit({writer, send_failed, Status});
-handle_message(shutdown, _State) ->
- exit(normal);
+handle_message({flush, Pid, Ref}, State) ->
+ Pid ! Ref,
+ State;
handle_message(Message, _State) ->
exit({writer, message_not_understood, Message}).
@@ -171,10 +175,10 @@ send_command_and_notify(W, Q, ChPid, MethodRecord, Content) ->
W ! {send_command_and_notify, Q, ChPid, MethodRecord, Content},
ok.
-shutdown(W) ->
- W ! shutdown,
- rabbit_misc:unlink_and_capture_exit(W),
- ok.
+flush(W) ->
+ Ref = make_ref(),
+ W ! {flush, self(), Ref},
+ receive Ref -> ok end.
%---------------------------------------------------------------------------
diff --git a/src/supervisor2.erl b/src/supervisor2.erl
index fb4c9b02..3ded5d7f 100644
--- a/src/supervisor2.erl
+++ b/src/supervisor2.erl
@@ -31,6 +31,11 @@
%% the MaxT and MaxR parameters to permit the child to be
%% restarted. This may require waiting for longer than Delay.
%%
+%% 4) Added an 'intrinsic' restart type. Like the transient type, this
+%% type means the child should only be restarted if the child exits
+%% abnormally. Unlike the transient type, if the child exits
+%% normally, the supervisor itself also exits normally.
+%%
%% All modifications are (C) 2010 Rabbit Technologies Ltd.
%%
%% %CopyrightBegin%
@@ -58,7 +63,7 @@
-export([start_link/2,start_link/3,
start_child/2, restart_child/2,
delete_child/2, terminate_child/2,
- which_children/1,
+ which_children/1, find_child/2,
check_childspecs/1]).
-export([behaviour_info/1]).
@@ -133,6 +138,10 @@ terminate_child(Supervisor, Name) ->
which_children(Supervisor) ->
call(Supervisor, which_children).
+find_child(Supervisor, Name) ->
+ [Pid || {Name1, Pid, _Type, _Modules} <- which_children(Supervisor),
+ Name1 =:= Name].
+
call(Supervisor, Req) ->
gen_server:call(Supervisor, Req, infinity).
@@ -531,6 +540,8 @@ do_restart({RestartType, Delay}, Reason, Child, State) ->
[self(), {{RestartType, Delay}, Reason, Child}]),
{ok, NState}
end;
+do_restart(intrinsic, normal, Child, State) ->
+ {shutdown, state_del_child(Child, State)};
do_restart(permanent, Reason, Child, State) ->
report_error(child_terminated, Reason, Child, State#state.name),
restart(Child, State);
@@ -540,7 +551,8 @@ do_restart(_, normal, Child, State) ->
do_restart(_, shutdown, Child, State) ->
NState = state_del_child(Child, State),
{ok, NState};
-do_restart(transient, Reason, Child, State) ->
+do_restart(Type, Reason, Child, State) when Type =:= transient orelse
+ Type =:= intrinsic ->
report_error(child_terminated, Reason, Child, State#state.name),
restart(Child, State);
do_restart(temporary, Reason, Child, State) ->
@@ -833,8 +845,8 @@ supname(N,_) -> N.
%%% {Name, Func, RestartType, Shutdown, ChildType, Modules}
%%% where Name is an atom
%%% Func is {Mod, Fun, Args} == {atom, atom, list}
-%%% RestartType is permanent | temporary | transient |
-%%% {permanent, Delay} |
+%%% RestartType is intrinsic | permanent | temporary |
+%%% transient | {permanent, Delay} |
%%% {transient, Delay} where Delay >= 0
%%% Shutdown = integer() | infinity | brutal_kill
%%% ChildType = supervisor | worker
@@ -881,6 +893,7 @@ validFunc({M, F, A}) when is_atom(M),
is_list(A) -> true;
validFunc(Func) -> throw({invalid_mfa, Func}).
+validRestartType(intrinsic) -> true;
validRestartType(permanent) -> true;
validRestartType(temporary) -> true;
validRestartType(transient) -> true;
diff --git a/src/tcp_client_sup.erl b/src/tcp_client_sup.erl
index 1b785843..02d7e0e4 100644
--- a/src/tcp_client_sup.erl
+++ b/src/tcp_client_sup.erl
@@ -31,19 +31,19 @@
-module(tcp_client_sup).
--behaviour(supervisor).
+-behaviour(supervisor2).
-export([start_link/1, start_link/2]).
-export([init/1]).
start_link(Callback) ->
- supervisor:start_link(?MODULE, Callback).
+ supervisor2:start_link(?MODULE, Callback).
start_link(SupName, Callback) ->
- supervisor:start_link(SupName, ?MODULE, Callback).
+ supervisor2:start_link(SupName, ?MODULE, Callback).
init({M,F,A}) ->
- {ok, {{simple_one_for_one, 10, 10},
+ {ok, {{simple_one_for_one_terminate, 10, 10},
[{tcp_client, {M,F,A},
- temporary, brutal_kill, worker, [M]}]}}.
+ temporary, infinity, supervisor, [M]}]}}.