diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-09 17:05:04 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-09 17:05:04 +0100 |
commit | ae37555e5e0f1a5e6b64c2b9de5a81747d000ab6 (patch) | |
tree | ad1a341d542aa7f1d3ff8f30f92a47ab3286a4c6 | |
parent | 634565b83de7d527f85c18a94402c4c117c41e3e (diff) | |
parent | e67f6a1a24ed1e78ec501de8057f19664c18fb07 (diff) | |
download | rabbitmq-server-ae37555e5e0f1a5e6b64c2b9de5a81747d000ab6.tar.gz |
Merging default into bug 15930
-rw-r--r-- | src/rabbit_channel.erl | 28 | ||||
-rw-r--r-- | src/rabbit_channel_sup.erl | 83 | ||||
-rw-r--r-- | src/rabbit_channel_sup_sup.erl | 49 | ||||
-rw-r--r-- | src/rabbit_connection_sup.erl | 65 | ||||
-rw-r--r-- | src/rabbit_framing_channel.erl | 20 | ||||
-rw-r--r-- | src/rabbit_heartbeat.erl | 64 | ||||
-rw-r--r-- | src/rabbit_networking.erl | 17 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 137 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 21 | ||||
-rw-r--r-- | src/rabbit_writer.erl | 32 | ||||
-rw-r--r-- | src/supervisor2.erl | 21 | ||||
-rw-r--r-- | src/tcp_client_sup.erl | 10 |
12 files changed, 408 insertions, 139 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 582960e7..f8d4f307 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -43,7 +43,7 @@ -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1]). --record(ch, {state, channel, reader_pid, writer_pid, limiter_pid, +-record(ch, {state, channel, parent_pid, reader_pid, writer_pid, limiter_pid, transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, username, virtual_host, most_recently_declared_queue, @@ -78,7 +78,8 @@ -spec(start_link/6 :: (channel_number(), pid(), pid(), rabbit_access_control:username(), - rabbit_types:vhost(), pid()) -> rabbit_types:ok(pid())). + rabbit_types:vhost(), pid()) -> + 'ignore' | rabbit_types:ok_or_error2(pid(), any())). -spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(), rabbit_types:maybe(rabbit_types:content())) -> 'ok'). @@ -101,7 +102,7 @@ %%---------------------------------------------------------------------------- start_link(Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid) -> - gen_server2:start_link(?MODULE, [Channel, ReaderPid, WriterPid, + gen_server2:start_link(?MODULE, [Channel, self(), ReaderPid, WriterPid, Username, VHost, CollectorPid], []). do(Pid, Method) -> @@ -150,12 +151,14 @@ flush(Pid) -> %%--------------------------------------------------------------------------- -init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) -> +init([Channel, ParentPid, ReaderPid, WriterPid, Username, VHost, + CollectorPid]) -> process_flag(trap_exit, true), link(WriterPid), ok = pg_local:join(rabbit_channels, self()), State = #ch{state = starting, channel = Channel, + parent_pid = ParentPid, reader_pid = ReaderPid, writer_pid = WriterPid, limiter_pid = undefined, @@ -259,8 +262,10 @@ terminate(_Reason, State = #ch{state = terminating}) -> terminate(Reason, State) -> Res = rollback_and_notify(State), case Reason of - normal -> ok = Res; - _ -> ok + normal -> ok = Res; + shutdown -> ok = Res; + {shutdown, _Term} -> ok = Res; + _ -> ok end, terminate(State). @@ -1016,8 +1021,13 @@ fold_per_queue(F, Acc0, UAQ) -> dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end, Acc0, D). -start_limiter(State = #ch{unacked_message_q = UAMQ}) -> - {ok, LPid} = rabbit_limiter:start_link(self(), queue:len(UAMQ)), +start_limiter(State = #ch{unacked_message_q = UAMQ, parent_pid = ParentPid}) -> + Me = self(), + {ok, LPid} = + supervisor:start_child( + ParentPid, + {limiter, {rabbit_limiter, start_link, [Me, queue:len(UAMQ)]}, + transient, ?MAX_WAIT, worker, [rabbit_limiter]}), ok = limit_queues(LPid, State), LPid. @@ -1092,7 +1102,7 @@ internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag, terminate(#ch{writer_pid = WriterPid, limiter_pid = LimiterPid}) -> pg_local:leave(rabbit_channels, self()), rabbit_event:notify(channel_closed, [{pid, self()}]), - rabbit_writer:shutdown(WriterPid), + rabbit_writer:flush(WriterPid), rabbit_limiter:shutdown(LimiterPid). infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl new file mode 100644 index 00000000..e4dcbae1 --- /dev/null +++ b/src/rabbit_channel_sup.erl @@ -0,0 +1,83 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_channel_sup). + +-behaviour(supervisor2). + +-export([start_link/8]). + +-export([init/1]). + +-include("rabbit.hrl"). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/8 :: + (rabbit_types:protocol(), rabbit_net:socket(), + rabbit_channel:channel_number(), non_neg_integer(), pid(), + rabbit_access_control:username(), rabbit_types:vhost(), pid()) -> + {'ok', pid(), pid()}). + +-endif. + +%%---------------------------------------------------------------------------- + +start_link(Protocol, Sock, Channel, FrameMax, ReaderPid, Username, VHost, + Collector) -> + {ok, SupPid} = supervisor2:start_link(?MODULE, []), + {ok, WriterPid} = + supervisor2:start_child( + SupPid, + {writer, {rabbit_writer, start_link, + [Sock, Channel, FrameMax, Protocol]}, + intrinsic, ?MAX_WAIT, worker, [rabbit_writer]}), + {ok, ChannelPid} = + supervisor2:start_child( + SupPid, + {channel, {rabbit_channel, start_link, + [Channel, ReaderPid, WriterPid, Username, VHost, + Collector]}, + intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), + {ok, FramingChannelPid} = + supervisor2:start_child( + SupPid, + {framing_channel, {rabbit_framing_channel, start_link, + [ChannelPid, Protocol]}, + intrinsic, ?MAX_WAIT, worker, [rabbit_framing_channel]}), + {ok, SupPid, FramingChannelPid}. + +%%---------------------------------------------------------------------------- + +init([]) -> + {ok, {{one_for_all, 10, 10}, []}}. diff --git a/src/rabbit_channel_sup_sup.erl b/src/rabbit_channel_sup_sup.erl new file mode 100644 index 00000000..2fab8678 --- /dev/null +++ b/src/rabbit_channel_sup_sup.erl @@ -0,0 +1,49 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_channel_sup_sup). + +-behaviour(supervisor2). + +-export([start_link/0, start_channel/2]). + +-export([init/1]). + +start_link() -> + supervisor2:start_link(?MODULE, []). + +init([]) -> + {ok, {{simple_one_for_one_terminate, 0, 1}, + [{channel_sup, {rabbit_channel_sup, start_link, []}, + temporary, infinity, supervisor, [rabbit_channel_sup]}]}}. + +start_channel(Pid, Args) -> + supervisor2:start_child(Pid, Args). diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl new file mode 100644 index 00000000..f097f80a --- /dev/null +++ b/src/rabbit_connection_sup.erl @@ -0,0 +1,65 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_connection_sup). + +-behaviour(supervisor2). + +-export([start_link/0, reader/1]). + +-export([init/1]). + +-include("rabbit.hrl"). + +start_link() -> + {ok, SupPid} = supervisor2:start_link(?MODULE, []), + {ok, Collector} = + supervisor2:start_child( + SupPid, + {collector, {rabbit_queue_collector, start_link, []}, + intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}), + {ok, ChannelSupSupPid} = + supervisor2:start_child( + SupPid, + {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []}, + intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}), + {ok, _ReaderPid} = + supervisor2:start_child( + SupPid, + {reader, {rabbit_reader, start_link, [ChannelSupSupPid, Collector]}, + intrinsic, ?MAX_WAIT, worker, [rabbit_reader]}), + {ok, SupPid}. + +init([]) -> + {ok, {{one_for_all, 10, 10}, []}}. + +reader(Pid) -> + hd(supervisor2:find_child(Pid, reader)). diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl index 553faaa8..08aaafe1 100644 --- a/src/rabbit_framing_channel.erl +++ b/src/rabbit_framing_channel.erl @@ -32,23 +32,17 @@ -module(rabbit_framing_channel). -include("rabbit.hrl"). --export([start_link/3, process/2, shutdown/1]). +-export([start_link/2, process/2, shutdown/1]). %% internal -export([mainloop/3]). %%-------------------------------------------------------------------- -start_link(StartFun, StartArgs, Protocol) -> +start_link(ChannelPid, Protocol) -> Parent = self(), - {ok, spawn_link( - fun () -> - %% we trap exits so that a normal termination of - %% the channel or reader process terminates us too. - process_flag(trap_exit, true), - {ok, ChannelPid} = apply(StartFun, StartArgs), - mainloop(Parent, ChannelPid, Protocol) - end)}. + {ok, proc_lib:spawn_link( + fun () -> mainloop(Parent, ChannelPid, Protocol) end)}. process(Pid, Frame) -> Pid ! {frame, Frame}, @@ -62,12 +56,6 @@ shutdown(Pid) -> read_frame(ChannelPid) -> receive - %% converting the exit signal into one of our own ensures that - %% the reader sees the right pid (i.e. ours) when a channel - %% exits. Similarly in the other direction, though it is not - %% really relevant there since the channel is not specifically - %% watching out for reader exit signals. - {'EXIT', _Pid, Reason} -> exit(Reason); {frame, Frame} -> Frame; terminate -> rabbit_channel:shutdown(ChannelPid), read_frame(ChannelPid); diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl index faddffc1..b277de70 100644 --- a/src/rabbit_heartbeat.erl +++ b/src/rabbit_heartbeat.erl @@ -31,7 +31,11 @@ -module(rabbit_heartbeat). --export([start_heartbeat/2, pause_monitor/1, resume_monitor/1]). +-export([start_heartbeat/3, pause_monitor/1, resume_monitor/1, + start_heartbeat_sender/2, + start_heartbeat_receiver/2]). + +-include("rabbit.hrl"). %%---------------------------------------------------------------------------- @@ -39,7 +43,13 @@ -type(pids() :: rabbit_types:maybe({pid(), pid()})). --spec(start_heartbeat/2 :: (rabbit_net:socket(), non_neg_integer()) -> pids()). +-spec(start_heartbeat/3 :: (pid(), rabbit_net:socket(), non_neg_integer()) -> + pids()). +-spec(start_heartbeat_sender/2 :: (rabbit_net:socket(), non_neg_integer()) -> + rabbit_types:ok(pid())). +-spec(start_heartbeat_receiver/2 :: (rabbit_net:socket(), non_neg_integer()) -> + rabbit_types:ok(pid())). + -spec(pause_monitor/1 :: (pids()) -> 'ok'). -spec(resume_monitor/1 :: (pids()) -> 'ok'). @@ -47,31 +57,44 @@ %%---------------------------------------------------------------------------- -start_heartbeat(_Sock, 0) -> +start_heartbeat(_Sup, _Sock, 0) -> none; -start_heartbeat(Sock, TimeoutSec) -> - Parent = self(), +start_heartbeat(Sup, Sock, TimeoutSec) -> + {ok, Sender} = + supervisor:start_child( + Sup, {heartbeat_sender, + {?MODULE, start_heartbeat_sender, [Sock, TimeoutSec]}, + permanent, ?MAX_WAIT, worker, [rabbit_heartbeat]}), + {ok, Receiver} = + supervisor:start_child( + Sup, {heartbeat_receiver, + {?MODULE, start_heartbeat_receiver, [Sock, TimeoutSec]}, + permanent, ?MAX_WAIT, worker, [rabbit_heartbeat]}), + {Sender, Receiver}. + +start_heartbeat_sender(Sock, TimeoutSec) -> %% the 'div 2' is there so that we don't end up waiting for nearly %% 2 * TimeoutSec before sending a heartbeat in the boundary case %% where the last message was sent just after a heartbeat. - Sender = - spawn_link(fun () -> heartbeater({Sock, TimeoutSec * 1000 div 2, - send_oct, 0, - fun () -> - catch rabbit_net:send(Sock, rabbit_binary_generator:build_heartbeat_frame()), - continue - end}, Parent) end), + Parent = self(), + {ok, proc_lib:spawn_link( + fun () -> heartbeater({Sock, TimeoutSec * 1000 div 2, + send_oct, 0, + fun () -> + catch rabbit_net:send(Sock, rabbit_binary_generator:build_heartbeat_frame()), + continue + end}, Parent) + end)}. + +start_heartbeat_receiver(Sock, TimeoutSec) -> %% we check for incoming data every interval, and time out after %% two checks with no change. As a result we will time out between %% 2 and 3 intervals after the last data has been received. - Receiver = - spawn_link(fun () -> heartbeater({Sock, TimeoutSec * 1000, - recv_oct, 1, - fun () -> - Parent ! timeout, - stop - end}, Parent) end), - {Sender, Receiver}. + Parent = self(), + {ok, proc_lib:spawn_link( + fun () -> heartbeater({Sock, TimeoutSec * 1000, + recv_oct, 1, + fun () -> exit(timeout) end}, Parent) end)}. pause_monitor(none) -> ok; @@ -116,7 +139,6 @@ heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params, Recurse({NewStatVal, SameCount + 1}); true -> case Handler() of - stop -> ok; continue -> Recurse({NewStatVal, 0}) end end; diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 3a3357ba..492d7d01 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -118,7 +118,7 @@ start() -> {rabbit_tcp_client_sup, {tcp_client_sup, start_link, [{local, rabbit_tcp_client_sup}, - {rabbit_reader,start_link,[]}]}, + {rabbit_connection_sup,start_link,[]}]}, transient, infinity, supervisor, [tcp_client_sup]}), ok. @@ -205,9 +205,10 @@ on_node_down(Node) -> start_client(Sock, SockTransform) -> {ok, Child} = supervisor:start_child(rabbit_tcp_client_sup, []), - ok = rabbit_net:controlling_process(Sock, Child), - Child ! {go, Sock, SockTransform}, - Child. + Reader = rabbit_connection_sup:reader(Child), + ok = rabbit_net:controlling_process(Sock, Reader), + Reader ! {go, Sock, SockTransform}, + Reader. start_client(Sock) -> start_client(Sock, fun (S) -> {ok, S} end). @@ -230,8 +231,9 @@ start_ssl_client(SslOpts, Sock) -> end). connections() -> - [Pid || {_, Pid, _, _} <- supervisor:which_children( - rabbit_tcp_client_sup)]. + [rabbit_connection_sup:reader(ConnSup) || + {_, ConnSup, supervisor, _} + <- supervisor:which_children(rabbit_tcp_client_sup)]. connection_info_keys() -> rabbit_reader:info_keys(). @@ -242,8 +244,7 @@ connection_info_all() -> cmap(fun (Q) -> connection_info(Q) end). connection_info_all(Items) -> cmap(fun (Q) -> connection_info(Q, Items) end). close_connection(Pid, Explanation) -> - case lists:any(fun ({_, ChildPid, _, _}) -> ChildPid =:= Pid end, - supervisor:which_children(rabbit_tcp_client_sup)) of + case lists:member(Pid, connections()) of true -> rabbit_reader:shutdown(Pid, Explanation); false -> throw({error, {not_a_connection_pid, Pid}}) end. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index d5ade90f..6e336c86 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -33,11 +33,11 @@ -include("rabbit_framing.hrl"). -include("rabbit.hrl"). --export([start_link/0, info_keys/0, info/1, info/2, shutdown/2]). +-export([start_link/2, info_keys/0, info/1, info/2, shutdown/2]). -export([system_continue/3, system_terminate/4, system_code_change/4]). --export([init/1, mainloop/2]). +-export([init/3, mainloop/2]). -export([conserve_memory/2, server_properties/0]). @@ -60,7 +60,8 @@ %--------------------------------------------------------------------------- -record(v1, {parent, sock, connection, callback, recv_length, recv_ref, - connection_state, queue_collector, heartbeater, stats_timer}). + connection_state, queue_collector, heartbeater, stats_timer, + channel_sup_sup_pid}). -define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt, send_pend, state, channels]). @@ -160,6 +161,7 @@ -ifdef(use_specs). +-spec(start_link/2 :: (pid(), pid()) -> rabbit_types:ok(pid())). -spec(info_keys/0 :: () -> [rabbit_types:info_key()]). -spec(info/1 :: (pid()) -> [rabbit_types:info()]). -spec(info/2 :: (pid(), [rabbit_types:info_key()]) -> [rabbit_types:info()]). @@ -168,21 +170,30 @@ -spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok'). -spec(server_properties/0 :: () -> rabbit_framing:amqp_table()). +%% These specs only exists to add no_return() to keep dialyzer happy +-spec(init/3 :: (pid(), pid(), pid()) -> no_return()). +-spec(start_connection/6 :: + (pid(), pid(), pid(), any(), rabbit_networking:socket(), + fun ((rabbit_networking:socket()) -> + rabbit_types:ok_or_error2( + rabbit_networking:socket(), any()))) -> no_return()). + -endif. %%-------------------------------------------------------------------------- -start_link() -> - {ok, proc_lib:spawn_link(?MODULE, init, [self()])}. +start_link(ChannelSupSupPid, Collector) -> + {ok, proc_lib:spawn_link(?MODULE, init, [self(), ChannelSupSupPid, Collector])}. shutdown(Pid, Explanation) -> gen_server:call(Pid, {shutdown, Explanation}, infinity). -init(Parent) -> +init(Parent, ChannelSupSupPid, Collector) -> Deb = sys:debug_options([]), receive {go, Sock, SockTransform} -> - start_connection(Parent, Deb, Sock, SockTransform) + start_connection( + Parent, ChannelSupSupPid, Collector, Deb, Sock, SockTransform) end. system_continue(Parent, Deb, State) -> @@ -261,7 +272,8 @@ socket_op(Sock, Fun) -> exit(normal) end. -start_connection(Parent, Deb, Sock, SockTransform) -> +start_connection(Parent, ChannelSupSupPid, Collector, Deb, Sock, + SockTransform) -> process_flag(trap_exit, true), {PeerAddress, PeerPort} = socket_op(Sock, fun rabbit_net:peername/1), PeerAddressS = inet_parse:ntoa(PeerAddress), @@ -271,27 +283,28 @@ start_connection(Parent, Deb, Sock, SockTransform) -> erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout), ProfilingValue = setup_profiling(), - {ok, Collector} = rabbit_queue_collector:start_link(), try mainloop(Deb, switch_callback( - #v1{parent = Parent, - sock = ClientSock, - connection = #connection{ - user = none, - timeout_sec = ?HANDSHAKE_TIMEOUT, - frame_max = ?FRAME_MIN_SIZE, - vhost = none, - client_properties = none, - protocol = none}, - callback = uninitialized_callback, - recv_length = 0, - recv_ref = none, - connection_state = pre_init, - queue_collector = Collector, - heartbeater = none, - stats_timer = - rabbit_event:init_stats_timer()}, - handshake, 8)) + #v1{parent = Parent, + sock = ClientSock, + connection = #connection{ + protocol = none, + user = none, + timeout_sec = ?HANDSHAKE_TIMEOUT, + frame_max = ?FRAME_MIN_SIZE, + vhost = none, + client_properties = none}, + callback = uninitialized_callback, + recv_length = 0, + recv_ref = none, + connection_state = pre_init, + queue_collector = Collector, + heartbeater = none, + stats_timer = + rabbit_event:init_stats_timer(), + channel_sup_sup_pid = ChannelSupSupPid + }, + handshake, 8)) catch Ex -> (if Ex == connection_closed_abruptly -> fun rabbit_log:warning/2; @@ -309,8 +322,6 @@ start_connection(Parent, Deb, Sock, SockTransform) -> %% %% gen_tcp:close(ClientSock), teardown_profiling(ProfilingValue), - rabbit_misc:unlink_and_capture_exit(Collector), - rabbit_queue_collector:shutdown(Collector), rabbit_event:notify(connection_closed, [{pid, self()}]) end, done. @@ -361,8 +372,6 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) -> true -> throw({handshake_timeout, State#v1.callback}) end; - timeout -> - throw({timeout, State#v1.connection_state}); {'$gen_call', From, {shutdown, Explanation}} -> {ForceTermination, NewState} = terminate(Explanation, State), gen_server:reply(From, ok), @@ -449,13 +458,23 @@ handle_channel_exit(ChPid, Reason, State) when is_pid(ChPid) -> handle_channel_exit(Channel, Reason, State) -> handle_exception(State, Channel, Reason). -handle_dependent_exit(Pid, normal, State) -> - erase({chpid, Pid}), - maybe_close(State); handle_dependent_exit(Pid, Reason, State) -> - case channel_cleanup(Pid) of - undefined -> exit({abnormal_dependent_exit, Pid, Reason}); - Channel -> maybe_close(handle_exception(State, Channel, Reason)) + case (case Reason of + shutdown -> controlled; + {shutdown, _Term} -> controlled; + normal -> controlled; + _ -> uncontrolled + end) of + controlled -> + erase({chpid, Pid}), + maybe_close(State); + uncontrolled -> + case channel_cleanup(Pid) of + undefined -> + exit({abnormal_dependent_exit, Pid, Reason}); + Channel -> + maybe_close(handle_exception(State, Channel, Reason)) + end end. channel_cleanup(Pid) -> @@ -469,7 +488,8 @@ channel_cleanup(Pid) -> all_channels() -> [Pid || {{chpid, Pid},_} <- get()]. terminate_channels() -> - NChannels = length([exit(Pid, normal) || Pid <- all_channels()]), + NChannels = + length([rabbit_framing_channel:shutdown(Pid) || Pid <- all_channels()]), if NChannels > 0 -> Timeout = 1000 * ?CHANNEL_TERMINATION_TIMEOUT * NChannels, TimerRef = erlang:send_after(Timeout, self(), cancel_wait), @@ -493,7 +513,9 @@ wait_for_channel_termination(N, TimerRef) -> exit({abnormal_dependent_exit, Pid, Reason}); Channel -> case Reason of - normal -> ok; + normal -> ok; + shutdown -> ok; + {shutdown, _Term} -> ok; _ -> rabbit_log:error( "connection ~p, channel ~p - " @@ -730,7 +752,8 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism, client_properties = ClientProperties}}; handle_method0(#'connection.tune_ok'{frame_max = FrameMax, heartbeat = ClientHeartbeat}, - State = #v1{connection_state = tuning, + State = #v1{parent = Parent, + connection_state = tuning, connection = Connection, sock = Sock}) -> if (FrameMax /= 0) and (FrameMax < ?FRAME_MIN_SIZE) -> @@ -742,8 +765,8 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, not_allowed, "frame_max=~w > ~w max size", [FrameMax, ?FRAME_MAX]); true -> - Heartbeater = rabbit_heartbeat:start_heartbeat( - Sock, ClientHeartbeat), + Heartbeater = + rabbit_heartbeat:start_heartbeat(Parent, Sock, ClientHeartbeat), State#v1{connection_state = opening, connection = Connection#connection{ timeout_sec = ClientHeartbeat, @@ -848,21 +871,21 @@ i(Item, #v1{}) -> %%-------------------------------------------------------------------------- -send_to_new_channel(Channel, AnalyzedFrame, - State = #v1{queue_collector = Collector}) -> - #v1{sock = Sock, connection = #connection{ - frame_max = FrameMax, - user = #user{username = Username}, - vhost = VHost, - protocol = Protocol}} = State, - {ok, WriterPid} = rabbit_writer:start(Sock, Channel, FrameMax, Protocol), - {ok, ChPid} = rabbit_framing_channel:start_link( - fun rabbit_channel:start_link/6, - [Channel, self(), WriterPid, Username, VHost, Collector], - Protocol), - put({channel, Channel}, {chpid, ChPid}), - put({chpid, ChPid}, {channel, Channel}), - ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame). +send_to_new_channel(Channel, AnalyzedFrame, State) -> + #v1{sock = Sock, queue_collector = Collector, + channel_sup_sup_pid = ChanSupSup, + connection = #connection{protocol = Protocol, + frame_max = FrameMax, + user = #user{username = Username}, + vhost = VHost}} = State, + {ok, _ChanSup, FrChPid} = + rabbit_channel_sup_sup:start_channel( + ChanSupSup, [Protocol, Sock, Channel, FrameMax, + self(), Username, VHost, Collector]), + link(FrChPid), + put({channel, Channel}, {chpid, FrChPid}), + put({chpid, FrChPid}, {channel, Channel}), + ok = rabbit_framing_channel:process(FrChPid, AnalyzedFrame). log_channel_error(ConnectionState, Channel, Reason) -> rabbit_log:error("connection ~p (~p), channel ~p - error:~n~p~n", diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index c07055af..55897679 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1061,6 +1061,8 @@ test_server_status() -> %% cleanup [{ok, _} = rabbit_amqqueue:delete(QR, false, false) || QR <- [Q, Q2]], + + unlink(Ch), ok = rabbit_channel:shutdown(Ch), passed. @@ -1114,14 +1116,23 @@ test_hooks() -> test_spawn(Receiver) -> Me = self(), Writer = spawn(fun () -> Receiver(Me) end), - {ok, Ch} = rabbit_channel:start_link(1, self(), Writer, <<"guest">>, - <<"/">>, self()), + {ok, Ch} = rabbit_channel:start_link(1, Me, Writer, + <<"guest">>, <<"/">>, self()), ok = rabbit_channel:do(Ch, #'channel.open'{}), - MRef = erlang:monitor(process, Ch), receive #'channel.open_ok'{} -> ok after 1000 -> throw(failed_to_receive_channel_open_ok) end, - {Writer, Ch, MRef}. + {Writer, Ch}. + +expect_normal_channel_termination(Ch) -> + receive {'EXIT', Ch, normal} -> ok + after 1000 -> throw({channel_failed_to_shutdown, Ch}) + end. + +gobble_channel_exit() -> + receive {channel_exit, _, _} -> ok + after 1000 -> throw(channel_exit_not_received) + end. test_statistics_receiver(Pid) -> receive @@ -1160,7 +1171,7 @@ test_statistics() -> %% by far the most complex code though. %% Set up a channel and queue - {_Writer, Ch, _MRef} = test_spawn(fun test_statistics_receiver/1), + {_Writer, Ch} = test_spawn(fun test_statistics_receiver/1), rabbit_channel:do(Ch, #'queue.declare'{}), QName = receive #'queue.declare_ok'{queue = Q0} -> Q0 diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index f90ee734..483b46f7 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -33,7 +33,7 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([start/4, start_link/4, shutdown/1, mainloop/1]). +-export([start/4, start_link/4, flush/1, mainloop/1]). -export([send_command/2, send_command/3, send_command_and_signal_back/3, send_command_and_signal_back/4, send_command_and_notify/5]). -export([internal_send_command/4, internal_send_command/6]). @@ -79,23 +79,26 @@ rabbit_framing:amqp_method_record(), rabbit_types:content(), non_neg_integer(), rabbit_types:protocol()) -> 'ok'). +-spec(flush/1 :: (pid()) -> 'ok'). -endif. %%---------------------------------------------------------------------------- start(Sock, Channel, FrameMax, Protocol) -> - {ok, spawn(?MODULE, mainloop, [#wstate{sock = Sock, - channel = Channel, - frame_max = FrameMax, - protocol = Protocol}])}. - -start_link(Sock, Channel, FrameMax, Protocol) -> - {ok, spawn_link(?MODULE, mainloop, [#wstate{sock = Sock, + {ok, + proc_lib:spawn(?MODULE, mainloop, [#wstate{sock = Sock, channel = Channel, frame_max = FrameMax, protocol = Protocol}])}. +start_link(Sock, Channel, FrameMax, Protocol) -> + {ok, + proc_lib:spawn_link(?MODULE, mainloop, [#wstate{sock = Sock, + channel = Channel, + frame_max = FrameMax, + protocol = Protocol}])}. + mainloop(State) -> receive Message -> ?MODULE:mainloop(handle_message(Message, State)) @@ -144,8 +147,9 @@ handle_message({inet_reply, _, ok}, State) -> State; handle_message({inet_reply, _, Status}, _State) -> exit({writer, send_failed, Status}); -handle_message(shutdown, _State) -> - exit(normal); +handle_message({flush, Pid, Ref}, State) -> + Pid ! Ref, + State; handle_message(Message, _State) -> exit({writer, message_not_understood, Message}). @@ -171,10 +175,10 @@ send_command_and_notify(W, Q, ChPid, MethodRecord, Content) -> W ! {send_command_and_notify, Q, ChPid, MethodRecord, Content}, ok. -shutdown(W) -> - W ! shutdown, - rabbit_misc:unlink_and_capture_exit(W), - ok. +flush(W) -> + Ref = make_ref(), + W ! {flush, self(), Ref}, + receive Ref -> ok end. %--------------------------------------------------------------------------- diff --git a/src/supervisor2.erl b/src/supervisor2.erl index fb4c9b02..3ded5d7f 100644 --- a/src/supervisor2.erl +++ b/src/supervisor2.erl @@ -31,6 +31,11 @@ %% the MaxT and MaxR parameters to permit the child to be %% restarted. This may require waiting for longer than Delay. %% +%% 4) Added an 'intrinsic' restart type. Like the transient type, this +%% type means the child should only be restarted if the child exits +%% abnormally. Unlike the transient type, if the child exits +%% normally, the supervisor itself also exits normally. +%% %% All modifications are (C) 2010 Rabbit Technologies Ltd. %% %% %CopyrightBegin% @@ -58,7 +63,7 @@ -export([start_link/2,start_link/3, start_child/2, restart_child/2, delete_child/2, terminate_child/2, - which_children/1, + which_children/1, find_child/2, check_childspecs/1]). -export([behaviour_info/1]). @@ -133,6 +138,10 @@ terminate_child(Supervisor, Name) -> which_children(Supervisor) -> call(Supervisor, which_children). +find_child(Supervisor, Name) -> + [Pid || {Name1, Pid, _Type, _Modules} <- which_children(Supervisor), + Name1 =:= Name]. + call(Supervisor, Req) -> gen_server:call(Supervisor, Req, infinity). @@ -531,6 +540,8 @@ do_restart({RestartType, Delay}, Reason, Child, State) -> [self(), {{RestartType, Delay}, Reason, Child}]), {ok, NState} end; +do_restart(intrinsic, normal, Child, State) -> + {shutdown, state_del_child(Child, State)}; do_restart(permanent, Reason, Child, State) -> report_error(child_terminated, Reason, Child, State#state.name), restart(Child, State); @@ -540,7 +551,8 @@ do_restart(_, normal, Child, State) -> do_restart(_, shutdown, Child, State) -> NState = state_del_child(Child, State), {ok, NState}; -do_restart(transient, Reason, Child, State) -> +do_restart(Type, Reason, Child, State) when Type =:= transient orelse + Type =:= intrinsic -> report_error(child_terminated, Reason, Child, State#state.name), restart(Child, State); do_restart(temporary, Reason, Child, State) -> @@ -833,8 +845,8 @@ supname(N,_) -> N. %%% {Name, Func, RestartType, Shutdown, ChildType, Modules} %%% where Name is an atom %%% Func is {Mod, Fun, Args} == {atom, atom, list} -%%% RestartType is permanent | temporary | transient | -%%% {permanent, Delay} | +%%% RestartType is intrinsic | permanent | temporary | +%%% transient | {permanent, Delay} | %%% {transient, Delay} where Delay >= 0 %%% Shutdown = integer() | infinity | brutal_kill %%% ChildType = supervisor | worker @@ -881,6 +893,7 @@ validFunc({M, F, A}) when is_atom(M), is_list(A) -> true; validFunc(Func) -> throw({invalid_mfa, Func}). +validRestartType(intrinsic) -> true; validRestartType(permanent) -> true; validRestartType(temporary) -> true; validRestartType(transient) -> true; diff --git a/src/tcp_client_sup.erl b/src/tcp_client_sup.erl index 1b785843..02d7e0e4 100644 --- a/src/tcp_client_sup.erl +++ b/src/tcp_client_sup.erl @@ -31,19 +31,19 @@ -module(tcp_client_sup). --behaviour(supervisor). +-behaviour(supervisor2). -export([start_link/1, start_link/2]). -export([init/1]). start_link(Callback) -> - supervisor:start_link(?MODULE, Callback). + supervisor2:start_link(?MODULE, Callback). start_link(SupName, Callback) -> - supervisor:start_link(SupName, ?MODULE, Callback). + supervisor2:start_link(SupName, ?MODULE, Callback). init({M,F,A}) -> - {ok, {{simple_one_for_one, 10, 10}, + {ok, {{simple_one_for_one_terminate, 10, 10}, [{tcp_client, {M,F,A}, - temporary, brutal_kill, worker, [M]}]}}. + temporary, infinity, supervisor, [M]}]}}. |