From 72c32850933d01b9ad7adf788af21affc40ac27a Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Tue, 6 Jul 2010 12:20:56 +0100 Subject: As a first step, get the channel, framing channel, and writer to exit with shutdown, not normal, as shutdown is the correct exit reason for supervised processes. Tests adjusted, and still pass --- src/rabbit_channel.erl | 26 ++++++++++++++------------ src/rabbit_reader.erl | 10 +++++----- src/rabbit_tests.erl | 32 +++++++++++++++++--------------- src/rabbit_writer.erl | 2 +- 4 files changed, 37 insertions(+), 33 deletions(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 94a20fbd..692d7bac 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -191,14 +191,16 @@ handle_cast({method, Method, Content}, State) -> {noreply, NewState} -> noreply(NewState); stop -> - {stop, normal, State#ch{state = terminating}} + {stop, shutdown, State#ch{state = terminating}} catch exit:Reason = #amqp_error{} -> MethodName = rabbit_misc:method_record_type(Method), - {stop, normal, terminating(Reason#amqp_error{method = MethodName}, + {stop, shutdown, terminating(Reason#amqp_error{method = MethodName}, State)}; exit:normal -> - {stop, normal, State}; + {stop, shutdown, State}; + exit:shutdown -> + {stop, shutdown, State}; _:Reason -> {stop, {Reason, erlang:get_stacktrace()}, State} end; @@ -207,7 +209,7 @@ handle_cast({flushed, QPid}, State) -> {noreply, queue_blocked(QPid, State)}; handle_cast(terminate, State) -> - {stop, normal, State}; + {stop, shutdown, State}; handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) -> ok = rabbit_writer:send_command(WriterPid, Msg), @@ -232,18 +234,18 @@ handle_cast({conserve_memory, _Conserve}, State) -> handle_cast({flow_timeout, Ref}, State = #ch{flow = #flow{client = Flow, pending = {Ref, _TRef}}}) -> - {stop, normal, terminating( - rabbit_misc:amqp_error( - precondition_failed, - "timeout waiting for channel.flow_ok{active=~w}", - [not Flow], none), State)}; + {stop, shutdown, terminating( + rabbit_misc:amqp_error( + precondition_failed, + "timeout waiting for channel.flow_ok{active=~w}", + [not Flow], none), State)}; handle_cast({flow_timeout, _Ref}, State) -> {noreply, State}. handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}}, State = #ch{writer_pid = WriterPid}) -> State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason}, - {stop, normal, State}; + {stop, shutdown, State}; handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) -> @@ -259,8 +261,8 @@ terminate(_Reason, State = #ch{state = terminating}) -> terminate(Reason, State) -> Res = rollback_and_notify(State), case Reason of - normal -> ok = Res; - _ -> ok + shutdown -> ok = Res; + _ -> ok end, terminate(State). diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index a54e0de9..0a48e61a 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -227,7 +227,7 @@ socket_op(Sock, Fun) -> [self(), Reason]), rabbit_log:info("closing TCP connection ~p~n", [self()]), - exit(normal) + exit(shutdown) end. start_connection(Parent, Deb, Sock, SockTransform) -> @@ -275,7 +275,7 @@ start_connection(Parent, Deb, Sock, SockTransform) -> rabbit_reader_queue_collector:shutdown(Collector), rabbit_misc:unlink_and_capture_exit(Collector) end, - done. + exit(shutdown). mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> %%?LOGDEBUG("Reader mainloop: ~p bytes available, need ~p~n", [HaveBytes, WaitUntilNBytes]), @@ -379,7 +379,7 @@ close_channel(Channel, State) -> handle_channel_exit(Channel, Reason, State) -> handle_exception(State, Channel, Reason). -handle_dependent_exit(Pid, normal, State) -> +handle_dependent_exit(Pid, shutdown, State) -> erase({chpid, Pid}), maybe_close(State); handle_dependent_exit(Pid, Reason, State) -> @@ -399,7 +399,7 @@ channel_cleanup(Pid) -> all_channels() -> [Pid || {{chpid, Pid},_} <- get()]. terminate_channels() -> - NChannels = length([exit(Pid, normal) || Pid <- all_channels()]), + NChannels = length([exit(Pid, shutdown) || Pid <- all_channels()]), if NChannels > 0 -> Timeout = 1000 * ?CHANNEL_TERMINATION_TIMEOUT * NChannels, TimerRef = erlang:send_after(Timeout, self(), cancel_wait), @@ -423,7 +423,7 @@ wait_for_channel_termination(N, TimerRef) -> exit({abnormal_dependent_exit, Pid, Reason}); Channel -> case Reason of - normal -> ok; + shutdown -> ok; _ -> rabbit_log:error( "connection ~p, channel ~p - " diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 7fbbc1ea..a8996e43 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -828,6 +828,8 @@ test_server_status() -> %% cleanup [{ok, _} = rabbit_amqqueue:delete(QR, false, false) || QR <- [Q, Q2]], + + unlink(Ch), ok = rabbit_channel:shutdown(Ch), passed. @@ -919,23 +921,23 @@ test_memory_pressure_spawn() -> Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>, self()), ok = rabbit_channel:do(Ch, #'channel.open'{}), - MRef = erlang:monitor(process, Ch), receive #'channel.open_ok'{} -> ok after 1000 -> throw(failed_to_receive_channel_open_ok) end, - {Writer, Ch, MRef}. + {Writer, Ch}. -expect_normal_channel_termination(MRef, Ch) -> - receive {'DOWN', MRef, process, Ch, normal} -> ok +expect_normal_channel_termination(Ch) -> + receive {'EXIT', Ch, shutdown} -> ok after 1000 -> throw(channel_failed_to_exit) end. test_memory_pressure() -> - {Writer0, Ch0, MRef0} = test_memory_pressure_spawn(), + OldTrap = process_flag(trap_exit, true), + {Writer0, Ch0} = test_memory_pressure_spawn(), [ok = rabbit_channel:conserve_memory(Ch0, Conserve) || Conserve <- [false, false, true, false, true, true, false]], ok = test_memory_pressure_sync(Ch0, Writer0), - receive {'DOWN', MRef0, process, Ch0, Info0} -> + receive {'EXIT', Ch0, Info0} -> throw({channel_died_early, Info0}) after 0 -> ok end, @@ -951,9 +953,9 @@ test_memory_pressure() -> %% if we publish at this point, the channel should die Content = rabbit_basic:build_content(#'P_basic'{}, <<>>), ok = rabbit_channel:do(Ch0, #'basic.publish'{}, Content), - expect_normal_channel_termination(MRef0, Ch0), + expect_normal_channel_termination(Ch0), - {Writer1, Ch1, MRef1} = test_memory_pressure_spawn(), + {Writer1, Ch1} = test_memory_pressure_spawn(), ok = rabbit_channel:conserve_memory(Ch1, true), ok = test_memory_pressure_receive_flow(false), ok = rabbit_channel:do(Ch1, #'channel.flow_ok'{active = false}), @@ -962,16 +964,16 @@ test_memory_pressure() -> ok = test_memory_pressure_receive_flow(true), %% send back the wrong flow_ok. Channel should die. ok = rabbit_channel:do(Ch1, #'channel.flow_ok'{active = false}), - expect_normal_channel_termination(MRef1, Ch1), + expect_normal_channel_termination(Ch1), - {_Writer2, Ch2, MRef2} = test_memory_pressure_spawn(), + {_Writer2, Ch2} = test_memory_pressure_spawn(), %% just out of the blue, send a flow_ok. Life should end. ok = rabbit_channel:do(Ch2, #'channel.flow_ok'{active = true}), - expect_normal_channel_termination(MRef2, Ch2), + expect_normal_channel_termination(Ch2), - {_Writer3, Ch3, MRef3} = test_memory_pressure_spawn(), + {_Writer3, Ch3} = test_memory_pressure_spawn(), ok = rabbit_channel:conserve_memory(Ch3, true), - receive {'DOWN', MRef3, process, Ch3, _} -> + receive {'EXIT', Ch3, shutdown} -> ok after 12000 -> throw(channel_failed_to_exit) @@ -983,7 +985,6 @@ test_memory_pressure() -> Ch4 = rabbit_channel:start_link(1, self(), Writer4, <<"user">>, <<"/">>, self()), ok = rabbit_channel:do(Ch4, #'channel.open'{}), - MRef4 = erlang:monitor(process, Ch4), Writer4 ! sync, receive sync -> ok after 1000 -> throw(failed_to_receive_writer_sync) end, receive #'channel.open_ok'{} -> throw(unexpected_channel_open_ok) @@ -996,8 +997,9 @@ test_memory_pressure() -> after 1000 -> throw(failed_to_receive_channel_open_ok) end, rabbit_channel:shutdown(Ch4), - expect_normal_channel_termination(MRef4, Ch4), + expect_normal_channel_termination(Ch4), + true = process_flag(trap_exit, OldTrap), passed. test_delegates_async(SecondaryNode) -> diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index 233d7291..bf6e9bdf 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -121,7 +121,7 @@ handle_message({inet_reply, _, ok}, State) -> handle_message({inet_reply, _, Status}, _State) -> exit({writer, send_failed, Status}); handle_message(shutdown, _State) -> - exit(normal); + exit(shutdown); handle_message(Message, _State) -> exit({writer, message_not_understood, Message}). -- cgit v1.2.1 -- cgit v1.2.1 From 602dd321af9badc83d930785d6dc5d6e9bbbe518 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Tue, 6 Jul 2010 14:43:51 +0100 Subject: Now we have the reader, heartbeaters and queue_collector all under the same connection_supervisor, with one connection_supervisor per connection. Tests pass. The framing_channel, channel, writer and limiter are all still just linked as before, without supervision --- src/rabbit_connection_sup.erl | 52 ++++++++++++++++++++++++++++ src/rabbit_heartbeat.erl | 65 ++++++++++++++++++++++------------- src/rabbit_networking.erl | 20 ++++++----- src/rabbit_reader.erl | 64 +++++++++++++++++----------------- src/rabbit_reader_queue_collector.erl | 2 +- src/tcp_client_sup.erl | 10 +++--- 6 files changed, 145 insertions(+), 68 deletions(-) create mode 100644 src/rabbit_connection_sup.erl diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl new file mode 100644 index 00000000..db9dc3c9 --- /dev/null +++ b/src/rabbit_connection_sup.erl @@ -0,0 +1,52 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_connection_sup). + +-behaviour(supervisor). + +-export([start_link/0]). + +-export([init/1]). + +-include("rabbit.hrl"). + +start_link() -> + supervisor:start_link(?MODULE, []). + +init([]) -> + {ok, {{one_for_all, 0, 1}, + [{reader, {rabbit_reader, start_link, []}, + permanent, ?MAX_WAIT, worker, [rabbit_reader]}, + {collector, {rabbit_reader_queue_collector, start_link, []}, + permanent, ?MAX_WAIT, worker, [rabbit_reader_queue_collector]} + ]}}. + diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl index 45565705..f4df128d 100644 --- a/src/rabbit_heartbeat.erl +++ b/src/rabbit_heartbeat.erl @@ -31,33 +31,53 @@ -module(rabbit_heartbeat). --export([start_heartbeat/2]). +-include("rabbit.hrl"). -start_heartbeat(_Sock, 0) -> +-export([start_heartbeat/3, + start_heartbeat_sender/2, + start_heartbeat_receiver/3]). + +start_heartbeat(_Sup, _Sock, 0) -> none; -start_heartbeat(Sock, TimeoutSec) -> +start_heartbeat(Sup, Sock, TimeoutSec) -> Parent = self(), - %% we check for incoming data every interval, and time out after - %% two checks with no change. As a result we will time out between - %% 2 and 3 intervals after the last data has been received. - spawn_link(fun () -> heartbeater(Sock, TimeoutSec * 1000, - recv_oct, 1, - fun () -> - Parent ! timeout, - stop - end, - erlang:monitor(process, Parent)) end), + {ok, _Sender} = + supervisor:start_child( + Sup, {heartbeat_sender, + {?MODULE, start_heartbeat_sender, [Sock, TimeoutSec]}, + permanent, ?MAX_WAIT, worker, [rabbit_heartbeat]}), + {ok, _Receiver} = + supervisor:start_child( + Sup, {heartbeat_receiver, + {?MODULE, start_heartbeat_receiver, [Parent, Sock, TimeoutSec]}, + permanent, ?MAX_WAIT, worker, [rabbit_heartbeat]}), + ok. + +start_heartbeat_sender(Sock, TimeoutSec) -> %% the 'div 2' is there so that we don't end up waiting for nearly %% 2 * TimeoutSec before sending a heartbeat in the boundary case %% where the last message was sent just after a heartbeat. - spawn_link(fun () -> heartbeater(Sock, TimeoutSec * 1000 div 2, - send_oct, 0, - fun () -> - catch rabbit_net:send(Sock, rabbit_binary_generator:build_heartbeat_frame()), - continue - end, - erlang:monitor(process, Parent)) end), - ok. + {ok, proc_lib:spawn_link( + fun () -> heartbeater(Sock, TimeoutSec * 1000 div 2, + send_oct, 0, + fun () -> + catch rabbit_net:send(Sock, rabbit_binary_generator:build_heartbeat_frame()), + continue + end) + end)}. + +start_heartbeat_receiver(Parent, Sock, TimeoutSec) -> + %% we check for incoming data every interval, and time out after + %% two checks with no change. As a result we will time out between + %% 2 and 3 intervals after the last data has been received. + {ok, proc_lib:spawn_link( + fun () -> heartbeater(Sock, TimeoutSec * 1000, + recv_oct, 1, + fun () -> + Parent ! timeout, + stop + end) + end)}. %% Y-combinator, posted by Vladimir Sekissov to the Erlang mailing list %% http://www.erlang.org/ml-archive/erlang-questions/200301/msg00053.html @@ -65,12 +85,11 @@ y(X) -> F = fun (P) -> X(fun (A) -> (P(P))(A) end) end, F(F). -heartbeater(Sock, TimeoutMillisec, StatName, Threshold, Handler, MonitorRef) -> +heartbeater(Sock, TimeoutMillisec, StatName, Threshold, Handler) -> Heartbeat = fun (F) -> fun ({StatVal, SameCount}) -> receive - {'DOWN', MonitorRef, process, _Object, _Info} -> ok; Other -> exit({unexpected_message, Other}) after TimeoutMillisec -> case rabbit_net:getstat(Sock, [StatName]) of diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 68ffc98a..6502c6d1 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -114,7 +114,7 @@ start() -> {rabbit_tcp_client_sup, {tcp_client_sup, start_link, [{local, rabbit_tcp_client_sup}, - {rabbit_reader,start_link,[]}]}, + {rabbit_connection_sup,start_link,[]}]}, transient, infinity, supervisor, [tcp_client_sup]}), ok. @@ -201,9 +201,12 @@ on_node_down(Node) -> start_client(Sock, SockTransform) -> {ok, Child} = supervisor:start_child(rabbit_tcp_client_sup, []), - ok = rabbit_net:controlling_process(Sock, Child), - Child ! {go, Sock, SockTransform}, - Child. + hd([begin + ok = rabbit_net:controlling_process(Sock, Reader), + Reader ! {go, Sock, SockTransform}, + Reader + end || {reader, Reader, worker, [rabbit_reader]} + <- supervisor:which_children(Child)]). start_client(Sock) -> start_client(Sock, fun (S) -> {ok, S} end). @@ -226,8 +229,10 @@ start_ssl_client(SslOpts, Sock) -> end). connections() -> - [Pid || {_, Pid, _, _} <- supervisor:which_children( - rabbit_tcp_client_sup)]. + [Pid || {_, ConnSup, supervisor, _} + <- supervisor:which_children(rabbit_tcp_client_sup), + {reader, Pid, worker, [rabbit_reader]} + <- supervisor:which_children(ConnSup)]. connection_info_keys() -> rabbit_reader:info_keys(). @@ -238,8 +243,7 @@ connection_info_all() -> cmap(fun (Q) -> connection_info(Q) end). connection_info_all(Items) -> cmap(fun (Q) -> connection_info(Q, Items) end). close_connection(Pid, Explanation) -> - case lists:any(fun ({_, ChildPid, _, _}) -> ChildPid =:= Pid end, - supervisor:which_children(rabbit_tcp_client_sup)) of + case lists:member(Pid, connections()) of true -> rabbit_reader:shutdown(Pid, Explanation); false -> throw({error, {not_a_connection_pid, Pid}}) end. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 0a48e61a..16ec4263 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -37,7 +37,7 @@ -export([system_continue/3, system_terminate/4, system_code_change/4]). --export([init/1, mainloop/3]). +-export([init/1, mainloop/2]). -export([server_properties/0]). @@ -57,7 +57,7 @@ %--------------------------------------------------------------------------- --record(v1, {sock, connection, callback, recv_ref, connection_state, +-record(v1, {parent, sock, connection, callback, recv_ref, connection_state, queue_collector}). -define(INFO_KEYS, @@ -161,8 +161,8 @@ init(Parent) -> start_connection(Parent, Deb, Sock, SockTransform) end. -system_continue(Parent, Deb, State) -> - ?MODULE:mainloop(Parent, Deb, State). +system_continue(_Parent, Deb, State) -> + ?MODULE:mainloop(Deb, State). system_terminate(Reason, _Parent, _Deb, _State) -> exit(Reason). @@ -240,21 +240,24 @@ start_connection(Parent, Deb, Sock, SockTransform) -> erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout), ProfilingValue = setup_profiling(), - {ok, Collector} = rabbit_reader_queue_collector:start_link(), + [Collector] = + [Pid || {collector, Pid, worker, [rabbit_reader_queue_collector]} + <- supervisor:which_children(Parent)], try - mainloop(Parent, Deb, switch_callback( - #v1{sock = ClientSock, - connection = #connection{ - user = none, - timeout_sec = ?HANDSHAKE_TIMEOUT, - frame_max = ?FRAME_MIN_SIZE, - vhost = none, - client_properties = none}, - callback = uninitialized_callback, - recv_ref = none, - connection_state = pre_init, - queue_collector = Collector}, - handshake, 8)) + mainloop(Deb, switch_callback( + #v1{parent = Parent, + sock = ClientSock, + connection = #connection{ + user = none, + timeout_sec = ?HANDSHAKE_TIMEOUT, + frame_max = ?FRAME_MIN_SIZE, + vhost = none, + client_properties = none}, + callback = uninitialized_callback, + recv_ref = none, + connection_state = pre_init, + queue_collector = Collector}, + handshake, 8)) catch Ex -> (if Ex == connection_closed_abruptly -> fun rabbit_log:warning/2; @@ -271,20 +274,18 @@ start_connection(Parent, Deb, Sock, SockTransform) -> %% output to be sent, which results in unnecessary delays. %% %% gen_tcp:close(ClientSock), - teardown_profiling(ProfilingValue), - rabbit_reader_queue_collector:shutdown(Collector), - rabbit_misc:unlink_and_capture_exit(Collector) + teardown_profiling(ProfilingValue) end, exit(shutdown). -mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> +mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) -> %%?LOGDEBUG("Reader mainloop: ~p bytes available, need ~p~n", [HaveBytes, WaitUntilNBytes]), receive {inet_async, Sock, Ref, {ok, Data}} -> {State1, Callback1, Length1} = handle_input(State#v1.callback, Data, State#v1{recv_ref = none}), - mainloop(Parent, Deb, + mainloop(Deb, switch_callback(State1, Callback1, Length1)); {inet_async, Sock, Ref, {error, closed}} -> if State#v1.connection_state =:= closed -> @@ -309,16 +310,16 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> {channel_exit, _Chan, E = {writer, send_failed, _Error}} -> throw(E); {channel_exit, Channel, Reason} -> - mainloop(Parent, Deb, handle_channel_exit(Channel, Reason, State)); + mainloop(Deb, handle_channel_exit(Channel, Reason, State)); {'EXIT', Pid, Reason} -> - mainloop(Parent, Deb, handle_dependent_exit(Pid, Reason, State)); + mainloop(Deb, handle_dependent_exit(Pid, Reason, State)); terminate_connection -> State; handshake_timeout -> if State#v1.connection_state =:= running orelse State#v1.connection_state =:= closing orelse State#v1.connection_state =:= closed -> - mainloop(Parent, Deb, State); + mainloop(Deb, State); true -> throw({handshake_timeout, State#v1.callback}) end; @@ -329,16 +330,16 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> gen_server:reply(From, ok), case ForceTermination of force -> ok; - normal -> mainloop(Parent, Deb, NewState) + normal -> mainloop(Deb, NewState) end; {'$gen_call', From, info} -> gen_server:reply(From, infos(?INFO_KEYS, State)), - mainloop(Parent, Deb, State); + mainloop(Deb, State); {'$gen_call', From, {info, Items}} -> gen_server:reply(From, try {ok, infos(Items, State)} catch Error -> {error, Error} end), - mainloop(Parent, Deb, State); + mainloop(Deb, State); {system, From, Request} -> sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State); @@ -626,7 +627,8 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism, client_properties = ClientProperties}}; handle_method0(#'connection.tune_ok'{frame_max = FrameMax, heartbeat = ClientHeartbeat}, - State = #v1{connection_state = tuning, + State = #v1{parent = Parent, + connection_state = tuning, connection = Connection, sock = Sock}) -> if (FrameMax /= 0) and (FrameMax < ?FRAME_MIN_SIZE) -> @@ -638,7 +640,7 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, not_allowed, "frame_max=~w > ~w max size", [FrameMax, ?FRAME_MAX]); true -> - rabbit_heartbeat:start_heartbeat(Sock, ClientHeartbeat), + rabbit_heartbeat:start_heartbeat(Parent, Sock, ClientHeartbeat), State#v1{connection_state = opening, connection = Connection#connection{ timeout_sec = ClientHeartbeat, diff --git a/src/rabbit_reader_queue_collector.erl b/src/rabbit_reader_queue_collector.erl index 8d4e8fdb..cb7a510e 100644 --- a/src/rabbit_reader_queue_collector.erl +++ b/src/rabbit_reader_queue_collector.erl @@ -91,7 +91,7 @@ handle_call(delete_all, _From, {reply, ok, State}; handle_call(shutdown, _From, State) -> - {stop, normal, ok, State}. + {stop, shutdown, ok, State}. handle_cast(_Msg, State) -> {noreply, State}. diff --git a/src/tcp_client_sup.erl b/src/tcp_client_sup.erl index 1b785843..02d7e0e4 100644 --- a/src/tcp_client_sup.erl +++ b/src/tcp_client_sup.erl @@ -31,19 +31,19 @@ -module(tcp_client_sup). --behaviour(supervisor). +-behaviour(supervisor2). -export([start_link/1, start_link/2]). -export([init/1]). start_link(Callback) -> - supervisor:start_link(?MODULE, Callback). + supervisor2:start_link(?MODULE, Callback). start_link(SupName, Callback) -> - supervisor:start_link(SupName, ?MODULE, Callback). + supervisor2:start_link(SupName, ?MODULE, Callback). init({M,F,A}) -> - {ok, {{simple_one_for_one, 10, 10}, + {ok, {{simple_one_for_one_terminate, 10, 10}, [{tcp_client, {M,F,A}, - temporary, brutal_kill, worker, [M]}]}}. + temporary, infinity, supervisor, [M]}]}}. -- cgit v1.2.1 From 4c5794188c5e046ad6141573b52a323ee3aca989 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Tue, 6 Jul 2010 15:13:28 +0100 Subject: The heartbeater doesn't need to signal the reader in the event of a timeout - it should just exit abnormally and rely on the supervisor to tear everything down, esp seeing as the reader previously was just throwing the timeout exception. --- src/rabbit_heartbeat.erl | 12 ++++-------- src/rabbit_reader.erl | 2 -- 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl index f4df128d..bca27dd7 100644 --- a/src/rabbit_heartbeat.erl +++ b/src/rabbit_heartbeat.erl @@ -35,12 +35,11 @@ -export([start_heartbeat/3, start_heartbeat_sender/2, - start_heartbeat_receiver/3]). + start_heartbeat_receiver/2]). start_heartbeat(_Sup, _Sock, 0) -> none; start_heartbeat(Sup, Sock, TimeoutSec) -> - Parent = self(), {ok, _Sender} = supervisor:start_child( Sup, {heartbeat_sender, @@ -49,7 +48,7 @@ start_heartbeat(Sup, Sock, TimeoutSec) -> {ok, _Receiver} = supervisor:start_child( Sup, {heartbeat_receiver, - {?MODULE, start_heartbeat_receiver, [Parent, Sock, TimeoutSec]}, + {?MODULE, start_heartbeat_receiver, [Sock, TimeoutSec]}, permanent, ?MAX_WAIT, worker, [rabbit_heartbeat]}), ok. @@ -66,17 +65,14 @@ start_heartbeat_sender(Sock, TimeoutSec) -> end) end)}. -start_heartbeat_receiver(Parent, Sock, TimeoutSec) -> +start_heartbeat_receiver(Sock, TimeoutSec) -> %% we check for incoming data every interval, and time out after %% two checks with no change. As a result we will time out between %% 2 and 3 intervals after the last data has been received. {ok, proc_lib:spawn_link( fun () -> heartbeater(Sock, TimeoutSec * 1000, recv_oct, 1, - fun () -> - Parent ! timeout, - stop - end) + fun () -> exit(timeout) end) end)}. %% Y-combinator, posted by Vladimir Sekissov to the Erlang mailing list diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 16ec4263..761c97b6 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -323,8 +323,6 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) -> true -> throw({handshake_timeout, State#v1.callback}) end; - timeout -> - throw({timeout, State#v1.connection_state}); {'$gen_call', From, {shutdown, Explanation}} -> {ForceTermination, NewState} = terminate(Explanation, State), gen_server:reply(From, ok), -- cgit v1.2.1 From 39720759ee8778ceeb4fdc85f5d0bbc56ee7244c Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Tue, 6 Jul 2010 16:09:23 +0100 Subject: Added ability to stop supervisor2, which means children only need to be transient, not permanent, as we're no longer relying on reached_max_restart_intensity to nuke out the supervisor. However, we still don't quite have the error-free shutdown because supervisor still complains if it tries to stop a child and finds the child already dead. --- src/rabbit_connection_sup.erl | 14 ++++++++------ src/rabbit_heartbeat.erl | 4 ++-- src/rabbit_reader.erl | 7 ++++--- src/supervisor2.erl | 7 ++++++- 4 files changed, 20 insertions(+), 12 deletions(-) diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl index db9dc3c9..f6c4de2a 100644 --- a/src/rabbit_connection_sup.erl +++ b/src/rabbit_connection_sup.erl @@ -31,22 +31,24 @@ -module(rabbit_connection_sup). --behaviour(supervisor). +-behaviour(supervisor2). --export([start_link/0]). +-export([start_link/0, stop/1]). -export([init/1]). -include("rabbit.hrl"). start_link() -> - supervisor:start_link(?MODULE, []). + supervisor2:start_link(?MODULE, []). + +stop(Pid) -> + supervisor2:stop(Pid). init([]) -> {ok, {{one_for_all, 0, 1}, [{reader, {rabbit_reader, start_link, []}, - permanent, ?MAX_WAIT, worker, [rabbit_reader]}, + transient, ?MAX_WAIT, worker, [rabbit_reader]}, {collector, {rabbit_reader_queue_collector, start_link, []}, - permanent, ?MAX_WAIT, worker, [rabbit_reader_queue_collector]} + transient, ?MAX_WAIT, worker, [rabbit_reader_queue_collector]} ]}}. - diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl index bca27dd7..7da17071 100644 --- a/src/rabbit_heartbeat.erl +++ b/src/rabbit_heartbeat.erl @@ -44,12 +44,12 @@ start_heartbeat(Sup, Sock, TimeoutSec) -> supervisor:start_child( Sup, {heartbeat_sender, {?MODULE, start_heartbeat_sender, [Sock, TimeoutSec]}, - permanent, ?MAX_WAIT, worker, [rabbit_heartbeat]}), + transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}), {ok, _Receiver} = supervisor:start_child( Sup, {heartbeat_receiver, {?MODULE, start_heartbeat_receiver, [Sock, TimeoutSec]}, - permanent, ?MAX_WAIT, worker, [rabbit_heartbeat]}), + transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}), ok. start_heartbeat_sender(Sock, TimeoutSec) -> diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 761c97b6..d789c15e 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -274,9 +274,10 @@ start_connection(Parent, Deb, Sock, SockTransform) -> %% output to be sent, which results in unnecessary delays. %% %% gen_tcp:close(ClientSock), - teardown_profiling(ProfilingValue) - end, - exit(shutdown). + teardown_profiling(ProfilingValue), + rabbit_connection_sup:stop(Parent), + exit(shutdown) + end. mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) -> %%?LOGDEBUG("Reader mainloop: ~p bytes available, need ~p~n", [HaveBytes, WaitUntilNBytes]), diff --git a/src/supervisor2.erl b/src/supervisor2.erl index 0b1d7265..32cd9370 100644 --- a/src/supervisor2.erl +++ b/src/supervisor2.erl @@ -36,7 +36,7 @@ start_child/2, restart_child/2, delete_child/2, terminate_child/2, which_children/1, - check_childspecs/1]). + check_childspecs/1, stop/1]). -export([behaviour_info/1]). @@ -119,6 +119,9 @@ check_childspecs(ChildSpecs) when is_list(ChildSpecs) -> end; check_childspecs(X) -> {error, {badarg, X}}. +stop(Supervisor) -> + gen_server:cast(Supervisor, stop). + %%% --------------------------------------------------- %%% %%% Initialize the supervisor. @@ -314,6 +317,8 @@ handle_call(which_children, _From, State) -> State#state.children), {reply, Resp, State}. +handle_cast(stop, State) -> + {stop, shutdown, State}; %%% Hopefully cause a function-clause as there is no API function %%% that utilizes cast. -- cgit v1.2.1 From fe2b74cdfce74e7ccc33e287080a7cb500978e1f Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Tue, 6 Jul 2010 16:12:46 +0100 Subject: Ensure that a supervisor, when attempting to stop a process that turns out to be already stopped with either normal or shutdown, does not winge --- src/supervisor2.erl | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/supervisor2.erl b/src/supervisor2.erl index 32cd9370..5f1ec54c 100644 --- a/src/supervisor2.erl +++ b/src/supervisor2.erl @@ -614,6 +614,8 @@ shutdown(Pid, brutal_kill) -> {'DOWN', _MRef, process, Pid, OtherReason} -> {error, OtherReason} end; + normal_shutdown -> + ok; {error, Reason} -> {error, Reason} end; @@ -635,6 +637,8 @@ shutdown(Pid, Time) -> {error, OtherReason} end end; + normal_shutdown -> + ok; {error, Reason} -> {error, Reason} end. @@ -655,7 +659,12 @@ monitor_child(Pid) -> {'EXIT', Pid, Reason} -> receive {'DOWN', _, process, Pid, _} -> - {error, Reason} + case Reason of + normal -> normal_shutdown; + shutdown -> normal_shutdown; + {shutdown, _Terms} -> normal_shutdown; + _ -> {error, Reason} + end end after 0 -> %% If a naughty child did unlink and the child dies before -- cgit v1.2.1 From 753bdcd0ce88868fb723e792d91c9fdd59ea3c9b Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Tue, 6 Jul 2010 18:31:56 +0100 Subject: And now the channel, writer, limiter and framing_channel are also all suitably supervisored --- src/gen_server2.erl | 2 +- src/rabbit_channel.erl | 79 ++++++++++++++++++++++++------------------ src/rabbit_channel_sup.erl | 66 +++++++++++++++++++++++++++++++++++ src/rabbit_channel_sup_sup.erl | 49 ++++++++++++++++++++++++++ src/rabbit_connection_sup.erl | 14 ++++++-- src/rabbit_framing_channel.erl | 20 +++-------- src/rabbit_limiter.erl | 5 ++- src/rabbit_networking.erl | 17 ++++----- src/rabbit_reader.erl | 13 ++++--- src/rabbit_writer.erl | 20 ++++++----- src/supervisor2.erl | 6 +++- 11 files changed, 213 insertions(+), 78 deletions(-) create mode 100644 src/rabbit_channel_sup.erl create mode 100644 src/rabbit_channel_sup_sup.erl diff --git a/src/gen_server2.erl b/src/gen_server2.erl index 547f0a42..32a8d0a1 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -164,7 +164,7 @@ cast/2, pcast/3, reply/2, abcast/2, abcast/3, multi_call/2, multi_call/3, multi_call/4, - enter_loop/3, enter_loop/4, enter_loop/5, wake_hib/7]). + enter_loop/3, enter_loop/4, enter_loop/5, enter_loop/6, wake_hib/7]). -export([behaviour_info/1]). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 692d7bac..b91391eb 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -35,7 +35,7 @@ -behaviour(gen_server2). --export([start_link/6, do/2, do/3, shutdown/1]). +-export([start_link/5, do/2, do/3, shutdown/1]). -export([send_command/2, deliver/4, conserve_memory/2, flushed/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). @@ -44,7 +44,7 @@ -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1]). --record(ch, {state, channel, reader_pid, writer_pid, limiter_pid, +-record(ch, {state, channel, parent_pid, reader_pid, writer_pid, limiter_pid, transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, username, virtual_host, most_recently_declared_queue, @@ -73,8 +73,8 @@ -type(ref() :: any()). --spec(start_link/6 :: - (channel_number(), pid(), pid(), username(), vhost(), pid()) -> pid()). +-spec(start_link/5 :: + (channel_number(), pid(), username(), vhost(), pid()) -> pid()). -spec(do/2 :: (pid(), amqp_method_record()) -> 'ok'). -spec(do/3 :: (pid(), amqp_method_record(), maybe(content())) -> 'ok'). -spec(shutdown/1 :: (pid()) -> 'ok'). @@ -94,11 +94,18 @@ %%---------------------------------------------------------------------------- -start_link(Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid) -> - {ok, Pid} = gen_server2:start_link( - ?MODULE, [Channel, ReaderPid, WriterPid, - Username, VHost, CollectorPid], []), - Pid. +start_link(Channel, ReaderPid, Username, VHost, CollectorPid) -> + Parent = self(), + {ok, proc_lib:spawn_link( + fun () -> + WriterPid = rabbit_channel_sup:writer(Parent), + State = init([Channel, Parent, ReaderPid, WriterPid, + Username, VHost, CollectorPid]), + gen_server2:enter_loop( + ?MODULE, [], State, self(), hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, + ?DESIRED_HIBERNATE}) + end)}. do(Pid, Method) -> do(Pid, Method, none). @@ -146,30 +153,30 @@ info_all(Items) -> %%--------------------------------------------------------------------------- -init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) -> +init([Channel, ParentPid, ReaderPid, WriterPid, Username, VHost, + CollectorPid]) -> process_flag(trap_exit, true), link(WriterPid), ok = pg_local:join(rabbit_channels, self()), - {ok, #ch{state = starting, - channel = Channel, - reader_pid = ReaderPid, - writer_pid = WriterPid, - limiter_pid = undefined, - transaction_id = none, - tx_participants = sets:new(), - next_tag = 1, - uncommitted_ack_q = queue:new(), - unacked_message_q = queue:new(), - username = Username, - virtual_host = VHost, - most_recently_declared_queue = <<>>, - consumer_mapping = dict:new(), - blocking = dict:new(), - queue_collector_pid = CollectorPid, - flow = #flow{server = true, client = true, - pending = none}}, - hibernate, - {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + #ch{state = starting, + channel = Channel, + parent_pid = ParentPid, + reader_pid = ReaderPid, + writer_pid = WriterPid, + limiter_pid = undefined, + transaction_id = none, + tx_participants = sets:new(), + next_tag = 1, + uncommitted_ack_q = queue:new(), + unacked_message_q = queue:new(), + username = Username, + virtual_host = VHost, + most_recently_declared_queue = <<>>, + consumer_mapping = dict:new(), + blocking = dict:new(), + queue_collector_pid = CollectorPid, + flow = #flow{server = true, client = true, + pending = none}}. handle_call(info, _From, State) -> reply(infos(?INFO_KEYS, State), State); @@ -208,7 +215,8 @@ handle_cast({method, Method, Content}, State) -> handle_cast({flushed, QPid}, State) -> {noreply, queue_blocked(QPid, State)}; -handle_cast(terminate, State) -> +handle_cast(terminate, State = #ch{parent_pid = ParentPid}) -> + supervisor2:stop(ParentPid), {stop, shutdown, State}; handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) -> @@ -1021,8 +1029,13 @@ fold_per_queue(F, Acc0, UAQ) -> dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end, Acc0, D). -start_limiter(State = #ch{unacked_message_q = UAMQ}) -> - LPid = rabbit_limiter:start_link(self(), queue:len(UAMQ)), +start_limiter(State = #ch{unacked_message_q = UAMQ, parent_pid = ParentPid}) -> + Me = self(), + {ok, LPid} = + supervisor2:start_child( + ParentPid, + {limiter, {rabbit_limiter, start_link, [Me, queue:len(UAMQ)]}, + transient, ?MAX_WAIT, worker, [rabbit_limiter]}), ok = limit_queues(LPid, State), LPid. diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl new file mode 100644 index 00000000..f8a7a7c6 --- /dev/null +++ b/src/rabbit_channel_sup.erl @@ -0,0 +1,66 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_channel_sup). + +-behaviour(supervisor2). + +-export([start_link/7, writer/1, framing_channel/1, channel/1]). + +-export([init/1]). + +-include("rabbit.hrl"). + +start_link(Sock, Channel, FrameMax, ReaderPid, Username, VHost, Collector) -> + supervisor2:start_link(?MODULE, [Sock, Channel, FrameMax, ReaderPid, + Username, VHost, Collector]). + +init([Sock, Channel, FrameMax, ReaderPid, Username, VHost, Collector]) -> + {ok, {{one_for_all, 0, 1}, + [{channel, {rabbit_channel, start_link, + [Channel, ReaderPid, Username, VHost, Collector]}, + permanent, ?MAX_WAIT, worker, [rabbit_channel]}, + {writer, {rabbit_writer, start_link, [Sock, Channel, FrameMax]}, + permanent, ?MAX_WAIT, worker, [rabbit_writer]}, + {framing_channel, {rabbit_framing_channel, start_link, []}, + permanent, ?MAX_WAIT, worker, [rabbit_framing_channel]} + ]}}. + +writer(Pid) -> + hd(supervisor2:find_child(Pid, writer, worker, [rabbit_writer])). + +channel(Pid) -> + hd(supervisor2:find_child(Pid, channel, worker, [rabbit_channel])). + +framing_channel(Pid) -> + hd(supervisor2:find_child(Pid, framing_channel, worker, + [rabbit_framing_channel])). + diff --git a/src/rabbit_channel_sup_sup.erl b/src/rabbit_channel_sup_sup.erl new file mode 100644 index 00000000..42064709 --- /dev/null +++ b/src/rabbit_channel_sup_sup.erl @@ -0,0 +1,49 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_channel_sup_sup). + +-behaviour(supervisor2). + +-export([start_link/0, start_channel/2]). + +-export([init/1]). + +start_link() -> + supervisor2:start_link(?MODULE, []). + +init([]) -> + {ok, {{simple_one_for_one_terminate, 0, 1}, + [{channel_sup, {rabbit_channel_sup, start_link, []}, + transient, infinity, supervisor, [rabbit_channel_sup]}]}}. + +start_channel(Pid, Args) -> + supervisor2:start_child(Pid, Args). diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl index f6c4de2a..cafe9612 100644 --- a/src/rabbit_connection_sup.erl +++ b/src/rabbit_connection_sup.erl @@ -33,7 +33,7 @@ -behaviour(supervisor2). --export([start_link/0, stop/1]). +-export([start_link/0, stop/1, reader/1, channel_sup_sup/1]). -export([init/1]). @@ -50,5 +50,15 @@ init([]) -> [{reader, {rabbit_reader, start_link, []}, transient, ?MAX_WAIT, worker, [rabbit_reader]}, {collector, {rabbit_reader_queue_collector, start_link, []}, - transient, ?MAX_WAIT, worker, [rabbit_reader_queue_collector]} + transient, ?MAX_WAIT, worker, [rabbit_reader_queue_collector]}, + {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []}, + transient, infinity, supervisor, [rabbit_channel_sup_sup]} ]}}. + +reader(Pid) -> + hd(supervisor2:find_child(Pid, reader, worker, [rabbit_reader])). + +channel_sup_sup(Pid) -> + hd(supervisor2:find_child(Pid, channel_sup_sup, supervisor, + [rabbit_channel_sup_sup])). + diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl index bc1a2a08..d07d871b 100644 --- a/src/rabbit_framing_channel.erl +++ b/src/rabbit_framing_channel.erl @@ -32,21 +32,17 @@ -module(rabbit_framing_channel). -include("rabbit.hrl"). --export([start_link/2, process/2, shutdown/1]). +-export([start_link/0, process/2, shutdown/1]). %% internal -export([mainloop/1]). %%-------------------------------------------------------------------- -start_link(StartFun, StartArgs) -> - spawn_link( - fun () -> - %% we trap exits so that a normal termination of the - %% channel or reader process terminates us too. - process_flag(trap_exit, true), - mainloop(apply(StartFun, StartArgs)) - end). +start_link() -> + Parent = self(), + {ok, proc_lib:spawn_link( + fun () -> mainloop(rabbit_channel_sup:channel(Parent)) end)}. process(Pid, Frame) -> Pid ! {frame, Frame}, @@ -60,12 +56,6 @@ shutdown(Pid) -> read_frame(ChannelPid) -> receive - %% converting the exit signal into one of our own ensures that - %% the reader sees the right pid (i.e. ours) when a channel - %% exits. Similarly in the other direction, though it is not - %% really relevant there since the channel is not specifically - %% watching out for reader exit signals. - {'EXIT', _Pid, Reason} -> exit(Reason); {frame, Frame} -> Frame; terminate -> rabbit_channel:shutdown(ChannelPid), read_frame(ChannelPid); diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 878af029..491ae7d6 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -45,7 +45,7 @@ -type(maybe_pid() :: pid() | 'undefined'). --spec(start_link/2 :: (pid(), non_neg_integer()) -> pid()). +-spec(start_link/2 :: (pid(), non_neg_integer()) -> {'ok', pid()}). -spec(shutdown/1 :: (maybe_pid()) -> 'ok'). -spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok' | 'stopped'). -spec(can_send/3 :: (maybe_pid(), pid(), boolean()) -> boolean()). @@ -74,8 +74,7 @@ %%---------------------------------------------------------------------------- start_link(ChPid, UnackedMsgCount) -> - {ok, Pid} = gen_server2:start_link(?MODULE, [ChPid, UnackedMsgCount], []), - Pid. + gen_server2:start_link(?MODULE, [ChPid, UnackedMsgCount], []). shutdown(undefined) -> ok; diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 6502c6d1..e7094640 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -201,12 +201,10 @@ on_node_down(Node) -> start_client(Sock, SockTransform) -> {ok, Child} = supervisor:start_child(rabbit_tcp_client_sup, []), - hd([begin - ok = rabbit_net:controlling_process(Sock, Reader), - Reader ! {go, Sock, SockTransform}, - Reader - end || {reader, Reader, worker, [rabbit_reader]} - <- supervisor:which_children(Child)]). + Reader = rabbit_connection_sup:reader(Child), + ok = rabbit_net:controlling_process(Sock, Reader), + Reader ! {go, Sock, SockTransform}, + Reader. start_client(Sock) -> start_client(Sock, fun (S) -> {ok, S} end). @@ -229,10 +227,9 @@ start_ssl_client(SslOpts, Sock) -> end). connections() -> - [Pid || {_, ConnSup, supervisor, _} - <- supervisor:which_children(rabbit_tcp_client_sup), - {reader, Pid, worker, [rabbit_reader]} - <- supervisor:which_children(ConnSup)]. + [rabbit_connection_sup:reader(ConnSup) || + {_, ConnSup, supervisor, _} + <- supervisor:which_children(rabbit_tcp_client_sup)]. connection_info_keys() -> rabbit_reader:info_keys(). diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index d789c15e..16cf40ee 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -767,15 +767,18 @@ i(Item, #v1{}) -> %%-------------------------------------------------------------------------- send_to_new_channel(Channel, AnalyzedFrame, - State = #v1{queue_collector = Collector}) -> + State = #v1{queue_collector = Collector, parent = Parent}) -> #v1{sock = Sock, connection = #connection{ frame_max = FrameMax, user = #user{username = Username}, vhost = VHost}} = State, - WriterPid = rabbit_writer:start(Sock, Channel, FrameMax), - ChPid = rabbit_framing_channel:start_link( - fun rabbit_channel:start_link/6, - [Channel, self(), WriterPid, Username, VHost, Collector]), + ChanSupSup = rabbit_connection_sup:channel_sup_sup(Parent), + {ok, ChanSup} = rabbit_channel_sup_sup:start_channel( + ChanSupSup, + [Sock, Channel, FrameMax, self(), + Username, VHost, Collector]), + ChPid = rabbit_channel_sup:framing_channel(ChanSup), + link(ChPid), put({channel, Channel}, {chpid, ChPid}), put({chpid, ChPid}, {channel, Channel}), ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame). diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index bf6e9bdf..ff5ef9d1 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -48,8 +48,10 @@ -ifdef(use_specs). --spec(start/3 :: (socket(), channel_number(), non_neg_integer()) -> pid()). --spec(start_link/3 :: (socket(), channel_number(), non_neg_integer()) -> pid()). +-spec(start/3 :: (socket(), channel_number(), non_neg_integer()) -> + {'ok', pid()}). +-spec(start_link/3 :: (socket(), channel_number(), non_neg_integer()) -> + {'ok', pid()}). -spec(send_command/2 :: (pid(), amqp_method_record()) -> 'ok'). -spec(send_command/3 :: (pid(), amqp_method_record(), content()) -> 'ok'). -spec(send_command_and_signal_back/3 :: (pid(), amqp_method(), pid()) -> 'ok'). @@ -68,14 +70,16 @@ %%---------------------------------------------------------------------------- start(Sock, Channel, FrameMax) -> - spawn(?MODULE, mainloop, [#wstate{sock = Sock, - channel = Channel, - frame_max = FrameMax}]). + {ok, + proc_lib:spawn(?MODULE, mainloop, [#wstate{sock = Sock, + channel = Channel, + frame_max = FrameMax}])}. start_link(Sock, Channel, FrameMax) -> - spawn_link(?MODULE, mainloop, [#wstate{sock = Sock, - channel = Channel, - frame_max = FrameMax}]). + {ok, + proc_lib:spawn_link(?MODULE, mainloop, [#wstate{sock = Sock, + channel = Channel, + frame_max = FrameMax}])}. mainloop(State) -> receive diff --git a/src/supervisor2.erl b/src/supervisor2.erl index 5f1ec54c..136a6533 100644 --- a/src/supervisor2.erl +++ b/src/supervisor2.erl @@ -35,7 +35,7 @@ -export([start_link/2,start_link/3, start_child/2, restart_child/2, delete_child/2, terminate_child/2, - which_children/1, + which_children/1, find_child/4, check_childspecs/1, stop/1]). -export([behaviour_info/1]). @@ -109,6 +109,10 @@ terminate_child(Supervisor, Name) -> which_children(Supervisor) -> call(Supervisor, which_children). +find_child(Supervisor, Name, Type, Modules) -> + [Pid || {Name1, Pid, Type1, Modules1} <- which_children(Supervisor), + Name1 =:= Name, Type1 =:= Type, Modules1 =:= Modules]. + call(Supervisor, Req) -> gen_server:call(Supervisor, Req, infinity). -- cgit v1.2.1 From 9bf9b7687f8fa34b1d0a1c8ff22c479772c88d86 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Wed, 21 Jul 2010 13:59:59 +0100 Subject: Add framing_channel:start_link/1 for use by Erlang client --- src/rabbit_framing_channel.erl | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl index d07d871b..b1e42e59 100644 --- a/src/rabbit_framing_channel.erl +++ b/src/rabbit_framing_channel.erl @@ -32,7 +32,7 @@ -module(rabbit_framing_channel). -include("rabbit.hrl"). --export([start_link/0, process/2, shutdown/1]). +-export([start_link/0, start_link/1, process/2, shutdown/1]). %% internal -export([mainloop/1]). @@ -40,9 +40,10 @@ %%-------------------------------------------------------------------- start_link() -> - Parent = self(), - {ok, proc_lib:spawn_link( - fun () -> mainloop(rabbit_channel_sup:channel(Parent)) end)}. + start_link(rabbit_channel_sup:channel(self())). + +start_link(ChannelPid) -> + {ok, proc_lib:spawn_link(fun() -> mainloop(ChannelPid) end)}. process(Pid, Frame) -> Pid ! {frame, Frame}, -- cgit v1.2.1 From 1b06255f672c238471a23d5b7bfa02d96e098189 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Wed, 21 Jul 2010 14:38:45 +0100 Subject: Whoops - revert to avoid deadlock with supervisor trying to call itself during child creation --- src/rabbit_framing_channel.erl | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl index b1e42e59..2e9f02a3 100644 --- a/src/rabbit_framing_channel.erl +++ b/src/rabbit_framing_channel.erl @@ -40,7 +40,9 @@ %%-------------------------------------------------------------------- start_link() -> - start_link(rabbit_channel_sup:channel(self())). + Parent = self(), + {ok, proc_lib:spawn_link( + fun () -> mainloop(rabbit_channel_sup:channel(Parent)) end)}. start_link(ChannelPid) -> {ok, proc_lib:spawn_link(fun() -> mainloop(ChannelPid) end)}. -- cgit v1.2.1 From a33e2ee12debebf5a512e9c931ef80411e5c5c3a Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Wed, 21 Jul 2010 15:10:12 +0100 Subject: Well the rabbit_tests now pass owing to being able to support a very similar API to previously wrt channel, but shutdown seems to be sporadically successful at best. --- src/rabbit_channel.erl | 26 ++++++++++++++++++-------- src/rabbit_channel_sup.erl | 41 +++++++++++++++++++++++++++++------------ src/rabbit_heartbeat.erl | 6 ++---- src/rabbit_reader.erl | 8 ++++++++ src/rabbit_tests.erl | 12 ++++++------ src/rabbit_writer.erl | 6 ++---- 6 files changed, 65 insertions(+), 34 deletions(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 600a2672..e043492a 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -35,7 +35,7 @@ -behaviour(gen_server2). --export([start_link/5, do/2, do/3, shutdown/1]). +-export([start_link/5, start_link/6, do/2, do/3, shutdown/1]). -export([send_command/2, deliver/4, conserve_memory/2, flushed/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). @@ -78,7 +78,10 @@ -spec(start_link/5 :: (channel_number(), pid(), rabbit_access_control:username(), - rabbit_types:vhost(), pid()) -> {'ok', pid()}). + rabbit_types:vhost(), pid()) -> rabbit_types:ok(pid())). +-spec(start_link/6 :: + (channel_number(), pid(), pid(), rabbit_access_control:username(), + rabbit_types:vhost(), pid()) -> rabbit_types:ok(pid())). -spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(), rabbit_types:maybe(rabbit_types:content())) -> 'ok'). @@ -106,14 +109,16 @@ start_link(Channel, ReaderPid, Username, VHost, CollectorPid) -> {ok, proc_lib:spawn_link( fun () -> WriterPid = rabbit_channel_sup:writer(Parent), - State = init([Channel, Parent, ReaderPid, WriterPid, - Username, VHost, CollectorPid]), - gen_server2:enter_loop( - ?MODULE, [], State, self(), hibernate, - {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, - ?DESIRED_HIBERNATE}) + init_and_go([Channel, Parent, ReaderPid, WriterPid, + Username, VHost, CollectorPid]) end)}. +start_link(Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid) -> + Parent = self(), + {ok, proc_lib:spawn_link( + fun () -> init_and_go([Channel, Parent, ReaderPid, WriterPid, + Username, VHost, CollectorPid]) end)}. + do(Pid, Method) -> do(Pid, Method, none). @@ -185,6 +190,11 @@ init([Channel, ParentPid, ReaderPid, WriterPid, Username, VHost, flow = #flow{server = true, client = true, pending = none}}. +init_and_go(InitArgs) -> + gen_server2:enter_loop(?MODULE, [], init(InitArgs), self(), hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, + ?DESIRED_HIBERNATE}). + handle_call(info, _From, State) -> reply(infos(?INFO_KEYS, State), State); diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index f8a7a7c6..0e716b48 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -33,26 +33,31 @@ -behaviour(supervisor2). --export([start_link/7, writer/1, framing_channel/1, channel/1]). +-export([start_link/7, stop/1, writer/1, framing_channel/1, channel/1]). -export([init/1]). -include("rabbit.hrl"). +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/7 :: + (rabbit_net:socket(), rabbit_channel:channel_number(), + non_neg_integer(), pid(), rabbit_access_control:username(), + rabbit_types:vhost(), pid()) -> + ignore | rabbit_types:ok_or_error2(pid(), any())). + +-endif. + +%%---------------------------------------------------------------------------- + start_link(Sock, Channel, FrameMax, ReaderPid, Username, VHost, Collector) -> supervisor2:start_link(?MODULE, [Sock, Channel, FrameMax, ReaderPid, Username, VHost, Collector]). - -init([Sock, Channel, FrameMax, ReaderPid, Username, VHost, Collector]) -> - {ok, {{one_for_all, 0, 1}, - [{channel, {rabbit_channel, start_link, - [Channel, ReaderPid, Username, VHost, Collector]}, - permanent, ?MAX_WAIT, worker, [rabbit_channel]}, - {writer, {rabbit_writer, start_link, [Sock, Channel, FrameMax]}, - permanent, ?MAX_WAIT, worker, [rabbit_writer]}, - {framing_channel, {rabbit_framing_channel, start_link, []}, - permanent, ?MAX_WAIT, worker, [rabbit_framing_channel]} - ]}}. +stop(Pid) -> + supervisor2:stop(Pid). writer(Pid) -> hd(supervisor2:find_child(Pid, writer, worker, [rabbit_writer])). @@ -64,3 +69,15 @@ framing_channel(Pid) -> hd(supervisor2:find_child(Pid, framing_channel, worker, [rabbit_framing_channel])). +%%---------------------------------------------------------------------------- + +init([Sock, Channel, FrameMax, ReaderPid, Username, VHost, Collector]) -> + {ok, {{one_for_all, 0, 1}, + [{channel, {rabbit_channel, start_link, + [Channel, ReaderPid, Username, VHost, Collector]}, + permanent, ?MAX_WAIT, worker, [rabbit_channel]}, + {writer, {rabbit_writer, start_link, [Sock, Channel, FrameMax]}, + permanent, ?MAX_WAIT, worker, [rabbit_writer]}, + {framing_channel, {rabbit_framing_channel, start_link, []}, + permanent, ?MAX_WAIT, worker, [rabbit_framing_channel]} + ]}}. diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl index 7da17071..b7c73ae7 100644 --- a/src/rabbit_heartbeat.erl +++ b/src/rabbit_heartbeat.erl @@ -95,10 +95,8 @@ heartbeater(Sock, TimeoutMillisec, StatName, Threshold, Handler) -> SameCount < Threshold -> F({NewStatVal, SameCount + 1}); true -> - case Handler() of - stop -> ok; - continue -> F({NewStatVal, 0}) - end + continue = Handler(), + F({NewStatVal, 0}) end; {error, einval} -> %% the socket is dead, most diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index ca38b6ab..542ef32c 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -144,6 +144,14 @@ -spec(shutdown/2 :: (pid(), string()) -> 'ok'). -spec(server_properties/0 :: () -> rabbit_framing:amqp_table()). +%% These specs only exists to add no_return() to keep dialyzer happy +-spec(init/1 :: (pid()) -> no_return()). +-spec(start_connection/4 :: + (pid(), any(), rabbit_networking:socket(), + fun ((rabbit_networking:socket()) -> + rabbit_types:ok_or_error2( + rabbit_networking:socket(), any()))) -> no_return()). + -endif. %%-------------------------------------------------------------------------- diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 969f33b8..46ba0b53 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -794,8 +794,8 @@ test_user_management() -> test_server_status() -> %% create a few things so there is some useful information to list Writer = spawn(fun () -> receive shutdown -> ok end end), - Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>, - self()), + {ok, Ch} = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>, + self()), [Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>], {new, Queue = #amqqueue{}} <- [rabbit_amqqueue:declare( @@ -926,8 +926,8 @@ test_memory_pressure_sync(Ch, Writer) -> test_memory_pressure_spawn() -> Me = self(), Writer = spawn(fun () -> test_memory_pressure_receiver(Me) end), - Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>, - self()), + {ok, Ch} = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>, + self()), ok = rabbit_channel:do(Ch, #'channel.open'{}), receive #'channel.open_ok'{} -> ok after 1000 -> throw(failed_to_receive_channel_open_ok) @@ -990,8 +990,8 @@ test_memory_pressure() -> alarm_handler:set_alarm({vm_memory_high_watermark, []}), Me = self(), Writer4 = spawn(fun () -> test_memory_pressure_receiver(Me) end), - Ch4 = rabbit_channel:start_link(1, self(), Writer4, <<"user">>, <<"/">>, - self()), + {ok, Ch4} = rabbit_channel:start_link(1, self(), Writer4, <<"user">>, + <<"/">>, self()), ok = rabbit_channel:do(Ch4, #'channel.open'{}), Writer4 ! sync, receive sync -> ok after 1000 -> throw(failed_to_receive_writer_sync) end, diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index a5355c58..581ea428 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -50,12 +50,10 @@ -spec(start/3 :: (rabbit_net:socket(), rabbit_channel:channel_number(), - non_neg_integer()) - -> {'ok', pid()}). + non_neg_integer()) -> rabbit_types:ok(pid())). -spec(start_link/3 :: (rabbit_net:socket(), rabbit_channel:channel_number(), - non_neg_integer()) - -> {'ok', pid()}). + non_neg_integer()) -> rabbit_types:ok(pid())). -spec(send_command/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(send_command/3 :: -- cgit v1.2.1 From 33bae0576d6783067286c0363faa9e0f3d227193 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Wed, 21 Jul 2010 15:29:42 +0100 Subject: Correct shutdown to use API --- src/rabbit_channel.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index e043492a..9cfc9e27 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -233,7 +233,7 @@ handle_cast({flushed, QPid}, State) -> {noreply, queue_blocked(QPid, State)}; handle_cast(terminate, State = #ch{parent_pid = ParentPid}) -> - supervisor2:stop(ParentPid), + rabbit_channel_sup:stop(ParentPid), {stop, shutdown, State}; handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) -> -- cgit v1.2.1 From ec5bd9bf2b447d322a7b37d354de8895c9202e0d Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Wed, 21 Jul 2010 16:46:41 +0100 Subject: Fixed channel shutdown mechanisms --- src/rabbit_channel.erl | 7 +++---- src/rabbit_channel_sup.erl | 10 +++++----- src/rabbit_channel_sup_sup.erl | 2 +- src/rabbit_connection_sup.erl | 6 +++--- src/rabbit_reader.erl | 1 - src/rabbit_writer.erl | 16 +++++++++------- 6 files changed, 21 insertions(+), 21 deletions(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 9cfc9e27..5757d9f3 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -232,8 +232,7 @@ handle_cast({method, Method, Content}, State) -> handle_cast({flushed, QPid}, State) -> {noreply, queue_blocked(QPid, State)}; -handle_cast(terminate, State = #ch{parent_pid = ParentPid}) -> - rabbit_channel_sup:stop(ParentPid), +handle_cast(terminate, State) -> {stop, shutdown, State}; handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) -> @@ -1130,8 +1129,8 @@ internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag, terminate(#ch{writer_pid = WriterPid, limiter_pid = LimiterPid}) -> pg_local:leave(rabbit_channels, self()), - rabbit_writer:shutdown(WriterPid), - rabbit_limiter:shutdown(LimiterPid). + rabbit_limiter:shutdown(LimiterPid), + rabbit_writer:flush(WriterPid). infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index 0e716b48..7a7c7b79 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -73,11 +73,11 @@ framing_channel(Pid) -> init([Sock, Channel, FrameMax, ReaderPid, Username, VHost, Collector]) -> {ok, {{one_for_all, 0, 1}, - [{channel, {rabbit_channel, start_link, - [Channel, ReaderPid, Username, VHost, Collector]}, - permanent, ?MAX_WAIT, worker, [rabbit_channel]}, + [{framing_channel, {rabbit_framing_channel, start_link, []}, + permanent, ?MAX_WAIT, worker, [rabbit_framing_channel]}, {writer, {rabbit_writer, start_link, [Sock, Channel, FrameMax]}, permanent, ?MAX_WAIT, worker, [rabbit_writer]}, - {framing_channel, {rabbit_framing_channel, start_link, []}, - permanent, ?MAX_WAIT, worker, [rabbit_framing_channel]} + {channel, {rabbit_channel, start_link, + [Channel, ReaderPid, Username, VHost, Collector]}, + permanent, ?MAX_WAIT, worker, [rabbit_channel]} ]}}. diff --git a/src/rabbit_channel_sup_sup.erl b/src/rabbit_channel_sup_sup.erl index 42064709..2fab8678 100644 --- a/src/rabbit_channel_sup_sup.erl +++ b/src/rabbit_channel_sup_sup.erl @@ -43,7 +43,7 @@ start_link() -> init([]) -> {ok, {{simple_one_for_one_terminate, 0, 1}, [{channel_sup, {rabbit_channel_sup, start_link, []}, - transient, infinity, supervisor, [rabbit_channel_sup]}]}}. + temporary, infinity, supervisor, [rabbit_channel_sup]}]}}. start_channel(Pid, Args) -> supervisor2:start_child(Pid, Args). diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl index f6bd9826..53d086f0 100644 --- a/src/rabbit_connection_sup.erl +++ b/src/rabbit_connection_sup.erl @@ -48,11 +48,11 @@ stop(Pid) -> init([]) -> {ok, {{one_for_all, 0, 1}, [{reader, {rabbit_reader, start_link, []}, - transient, ?MAX_WAIT, worker, [rabbit_reader]}, + permanent, ?MAX_WAIT, worker, [rabbit_reader]}, {collector, {rabbit_queue_collector, start_link, []}, - transient, ?MAX_WAIT, worker, [rabbit_queue_collector]}, + permanent, ?MAX_WAIT, worker, [rabbit_queue_collector]}, {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []}, - transient, infinity, supervisor, [rabbit_channel_sup_sup]} + permanent, infinity, supervisor, [rabbit_channel_sup_sup]} ]}}. reader(Pid) -> diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 542ef32c..99d76b8a 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -283,7 +283,6 @@ start_connection(Parent, Deb, Sock, SockTransform) -> %% %% gen_tcp:close(ClientSock), teardown_profiling(ProfilingValue), - rabbit_connection_sup:stop(Parent), exit(shutdown) end. diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index 581ea428..16436bc0 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -33,7 +33,7 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([start/3, start_link/3, shutdown/1, mainloop/1]). +-export([start/3, start_link/3, flush/1, mainloop/1]). -export([send_command/2, send_command/3, send_command_and_signal_back/3, send_command_and_signal_back/4, send_command_and_notify/5]). -export([internal_send_command/3, internal_send_command/5]). @@ -77,6 +77,7 @@ rabbit_framing:amqp_method_record(), rabbit_types:content(), non_neg_integer()) -> 'ok'). +-spec(flush/1 :: (pid()) -> 'ok'). -endif. @@ -137,8 +138,9 @@ handle_message({inet_reply, _, ok}, State) -> State; handle_message({inet_reply, _, Status}, _State) -> exit({writer, send_failed, Status}); -handle_message(shutdown, _State) -> - exit(shutdown); +handle_message({flush, Pid, Ref}, State) -> + Pid ! Ref, + State; handle_message(Message, _State) -> exit({writer, message_not_understood, Message}). @@ -164,10 +166,10 @@ send_command_and_notify(W, Q, ChPid, MethodRecord, Content) -> W ! {send_command_and_notify, Q, ChPid, MethodRecord, Content}, ok. -shutdown(W) -> - W ! shutdown, - rabbit_misc:unlink_and_capture_exit(W), - ok. +flush(W) -> + Ref = make_ref(), + W ! {flush, self(), Ref}, + receive Ref -> ok end. %--------------------------------------------------------------------------- -- cgit v1.2.1 From 1f163036f49001bb05eee3825d7a59b7d30b48a8 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Tue, 27 Jul 2010 15:56:49 +0100 Subject: Correct tests by making the fake writer obey the same flush protocol as the genuine writer --- src/rabbit_tests.erl | 30 ++++++++++++++++-------------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 63ae4fe8..9acc58c9 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1043,8 +1043,8 @@ test_memory_pressure_receiver(Pid) -> end, Pid ! Method, test_memory_pressure_receiver(Pid); - sync -> - Pid ! sync, + {flush, Pid1, Ref} -> + Pid1 ! Ref, test_memory_pressure_receiver(Pid) end. @@ -1060,12 +1060,16 @@ test_memory_pressure_receive_flow(Active) -> test_memory_pressure_sync(Ch, Writer) -> ok = rabbit_channel:do(Ch, #'basic.qos'{}), - Writer ! sync, - receive sync -> ok after 1000 -> throw(failed_to_receive_writer_sync) end, + ok = test_memory_pressure_flush(Writer), receive #'basic.qos_ok'{} -> ok after 1000 -> throw(failed_to_receive_basic_qos_ok) end. +test_memory_pressure_flush(Writer) -> + Ref = make_ref(), + Writer ! {flush, self(), Ref}, + receive Ref -> ok after 1000 -> throw(failed_to_receive_writer_sync) end. + test_memory_pressure_spawn() -> Me = self(), Writer = spawn(fun () -> test_memory_pressure_receiver(Me) end), @@ -1077,9 +1081,9 @@ test_memory_pressure_spawn() -> end, {Writer, Ch}. -expect_normal_channel_termination(Ch) -> +expect_shutdown_channel_termination(Ch) -> receive {'EXIT', Ch, shutdown} -> ok - after 1000 -> throw(channel_failed_to_exit) + after 1000 -> throw({channel_failed_to_shutdown, Ch}) end. gobble_channel_exit() -> @@ -1109,7 +1113,7 @@ test_memory_pressure() -> %% if we publish at this point, the channel should die Content = rabbit_basic:build_content(#'P_basic'{}, <<>>), ok = rabbit_channel:do(Ch0, #'basic.publish'{}, Content), - expect_normal_channel_termination(Ch0), + expect_shutdown_channel_termination(Ch0), gobble_channel_exit(), {Writer1, Ch1} = test_memory_pressure_spawn(), @@ -1121,13 +1125,13 @@ test_memory_pressure() -> ok = test_memory_pressure_receive_flow(true), %% send back the wrong flow_ok. Channel should die. ok = rabbit_channel:do(Ch1, #'channel.flow_ok'{active = false}), - expect_normal_channel_termination(Ch1), + expect_shutdown_channel_termination(Ch1), gobble_channel_exit(), {_Writer2, Ch2} = test_memory_pressure_spawn(), %% just out of the blue, send a flow_ok. Life should end. ok = rabbit_channel:do(Ch2, #'channel.flow_ok'{active = true}), - expect_normal_channel_termination(Ch2), + expect_shutdown_channel_termination(Ch2), gobble_channel_exit(), {_Writer3, Ch3} = test_memory_pressure_spawn(), @@ -1146,19 +1150,17 @@ test_memory_pressure() -> {ok, Ch4} = rabbit_channel:start_link(1, self(), Writer4, <<"user">>, <<"/">>, self()), ok = rabbit_channel:do(Ch4, #'channel.open'{}), - Writer4 ! sync, - receive sync -> ok after 1000 -> throw(failed_to_receive_writer_sync) end, + ok = test_memory_pressure_flush(Writer4), receive #'channel.open_ok'{} -> throw(unexpected_channel_open_ok) after 0 -> ok end, alarm_handler:clear_alarm(vm_memory_high_watermark), - Writer4 ! sync, - receive sync -> ok after 1000 -> throw(failed_to_receive_writer_sync) end, + ok = test_memory_pressure_flush(Writer4), receive #'channel.open_ok'{} -> ok after 1000 -> throw(failed_to_receive_channel_open_ok) end, rabbit_channel:shutdown(Ch4), - expect_normal_channel_termination(Ch4), + expect_shutdown_channel_termination(Ch4), true = process_flag(trap_exit, OldTrap), passed. -- cgit v1.2.1 From 3537756d0397431e4ffa043a48ba68db8d5f4f12 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Tue, 27 Jul 2010 17:01:03 +0100 Subject: Remove dead code and get supervisor2 to hide some error messages when MaxR is 0 --- src/rabbit_connection_sup.erl | 5 +---- src/supervisor2.erl | 18 +++++++++++++----- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl index 53d086f0..3dd81ef6 100644 --- a/src/rabbit_connection_sup.erl +++ b/src/rabbit_connection_sup.erl @@ -33,7 +33,7 @@ -behaviour(supervisor2). --export([start_link/0, stop/1, reader/1, channel_sup_sup/1]). +-export([start_link/0, reader/1, channel_sup_sup/1]). -export([init/1]). @@ -42,9 +42,6 @@ start_link() -> supervisor2:start_link(?MODULE, []). -stop(Pid) -> - supervisor2:stop(Pid). - init([]) -> {ok, {{one_for_all, 0, 1}, [{reader, {rabbit_reader, start_link, []}, diff --git a/src/supervisor2.erl b/src/supervisor2.erl index 046d1f02..be24e09b 100644 --- a/src/supervisor2.erl +++ b/src/supervisor2.erl @@ -8,6 +8,10 @@ %% simple_one_for_one, except that children *are* explicitly %% terminated as per the shutdown component of the child_spec. %% +%% 3) When the MaxR (intensity) == 0, errors that would otherwise be +%% reported concerning child death, or the reaching of max restart +%% intensity are elided. +%% %% All modifications are (C) 2010 LShift Ltd. %% %% %CopyrightBegin% @@ -490,7 +494,7 @@ restart_child(Pid, Reason, State) -> end. do_restart(permanent, Reason, Child, State) -> - report_error(child_terminated, Reason, Child, State#state.name), + report_error(child_terminated, Reason, Child, State), restart(Child, State); do_restart(_, normal, Child, State) -> NState = state_del_child(Child, State), @@ -499,10 +503,10 @@ do_restart(_, shutdown, Child, State) -> NState = state_del_child(Child, State), {ok, NState}; do_restart(transient, Reason, Child, State) -> - report_error(child_terminated, Reason, Child, State#state.name), + report_error(child_terminated, Reason, Child, State), restart(Child, State); do_restart(temporary, Reason, Child, State) -> - report_error(child_terminated, Reason, Child, State#state.name), + report_error(child_terminated, Reason, Child, State), NState = state_del_child(Child, State), {ok, NState}. @@ -511,8 +515,8 @@ restart(Child, State) -> {ok, NState} -> restart(NState#state.strategy, Child, NState); {terminate, NState} -> - report_error(shutdown, reached_max_restart_intensity, - Child, State#state.name), + report_error(shutdown, reached_max_restart_intensity, + Child, State), {shutdown, remove_child(Child, NState)} end. @@ -913,6 +917,10 @@ difference({_, TimeS, _}, {_, CurS, _}) -> %%% Error and progress reporting. %%% ------------------------------------------------------ +report_error(_Error, _Reason, _Child, #state{intensity = 0}) -> + ok; +report_error(Error, Reason, Child, #state{name=Name}) -> + report_error(Error, Reason, Child, Name); report_error(Error, Reason, Child, SupName) -> ErrorMsg = [{supervisor, SupName}, {errorContext, Error}, -- cgit v1.2.1 From e3ebf90ce247f9addc61042faf9f4d3b9f2b2235 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Thu, 29 Jul 2010 13:50:46 +0100 Subject: Remove unnecessary stop code which was never being called --- src/rabbit_channel_sup.erl | 4 +--- src/supervisor2.erl | 8 +------- 2 files changed, 2 insertions(+), 10 deletions(-) diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index 7a7c7b79..0be7b566 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -33,7 +33,7 @@ -behaviour(supervisor2). --export([start_link/7, stop/1, writer/1, framing_channel/1, channel/1]). +-export([start_link/7, writer/1, framing_channel/1, channel/1]). -export([init/1]). @@ -56,8 +56,6 @@ start_link(Sock, Channel, FrameMax, ReaderPid, Username, VHost, Collector) -> supervisor2:start_link(?MODULE, [Sock, Channel, FrameMax, ReaderPid, Username, VHost, Collector]). -stop(Pid) -> - supervisor2:stop(Pid). writer(Pid) -> hd(supervisor2:find_child(Pid, writer, worker, [rabbit_writer])). diff --git a/src/supervisor2.erl b/src/supervisor2.erl index be24e09b..55f16fee 100644 --- a/src/supervisor2.erl +++ b/src/supervisor2.erl @@ -40,7 +40,7 @@ start_child/2, restart_child/2, delete_child/2, terminate_child/2, which_children/1, find_child/4, - check_childspecs/1, stop/1]). + check_childspecs/1]). -export([behaviour_info/1]). @@ -127,9 +127,6 @@ check_childspecs(ChildSpecs) when is_list(ChildSpecs) -> end; check_childspecs(X) -> {error, {badarg, X}}. -stop(Supervisor) -> - gen_server:cast(Supervisor, stop). - %%% --------------------------------------------------- %%% %%% Initialize the supervisor. @@ -325,9 +322,6 @@ handle_call(which_children, _From, State) -> State#state.children), {reply, Resp, State}. -handle_cast(stop, State) -> - {stop, shutdown, State}; - %%% Hopefully cause a function-clause as there is no API function %%% that utilizes cast. handle_cast(null, State) -> -- cgit v1.2.1 From 3cc4df9f81d8ce35ef337ba8f4f8bc0c1fdaefb3 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Mon, 2 Aug 2010 11:57:38 +0100 Subject: Make the channel and framing_channel more flexible in terms of being created. This improves the API to reduce special casing needed by the erlang client --- src/rabbit_channel.erl | 19 +++++-------------- src/rabbit_channel_sup.erl | 7 +++++-- src/rabbit_framing_channel.erl | 11 +++-------- 3 files changed, 13 insertions(+), 24 deletions(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 373cb690..58cb82f8 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -35,7 +35,7 @@ -behaviour(gen_server2). --export([start_link/5, start_link/6, do/2, do/3, shutdown/1]). +-export([start_link/6, do/2, do/3, shutdown/1]). -export([send_command/2, deliver/4, conserve_memory/2, flushed/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). @@ -75,12 +75,10 @@ -type(ref() :: any()). -type(channel_number() :: non_neg_integer()). +-type(pid_fun() :: fun (() -> pid())). --spec(start_link/5 :: - (channel_number(), pid(), rabbit_access_control:username(), - rabbit_types:vhost(), pid()) -> rabbit_types:ok(pid())). -spec(start_link/6 :: - (channel_number(), pid(), pid(), rabbit_access_control:username(), + (channel_number(), pid_fun(), pid_fun(), rabbit_access_control:username(), rabbit_types:vhost(), pid()) -> rabbit_types:ok(pid())). -spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(), @@ -104,21 +102,14 @@ %%---------------------------------------------------------------------------- -start_link(Channel, ReaderPid, Username, VHost, CollectorPid) -> +start_link(Channel, GetReader, GetWriter, Username, VHost, CollectorPid) -> Parent = self(), {ok, proc_lib:spawn_link( fun () -> - WriterPid = rabbit_channel_sup:writer(Parent), - init_and_go([Channel, Parent, ReaderPid, WriterPid, + init_and_go([Channel, Parent, GetReader(), GetWriter(), Username, VHost, CollectorPid]) end)}. -start_link(Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid) -> - Parent = self(), - {ok, proc_lib:spawn_link( - fun () -> init_and_go([Channel, Parent, ReaderPid, WriterPid, - Username, VHost, CollectorPid]) end)}. - do(Pid, Method) -> do(Pid, Method, none). diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index 0be7b566..eb7fab0d 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -70,12 +70,15 @@ framing_channel(Pid) -> %%---------------------------------------------------------------------------- init([Sock, Channel, FrameMax, ReaderPid, Username, VHost, Collector]) -> + Me = self(), {ok, {{one_for_all, 0, 1}, - [{framing_channel, {rabbit_framing_channel, start_link, []}, + [{framing_channel, {rabbit_framing_channel, start_link, + [fun () -> channel(Me) end]}, permanent, ?MAX_WAIT, worker, [rabbit_framing_channel]}, {writer, {rabbit_writer, start_link, [Sock, Channel, FrameMax]}, permanent, ?MAX_WAIT, worker, [rabbit_writer]}, {channel, {rabbit_channel, start_link, - [Channel, ReaderPid, Username, VHost, Collector]}, + [Channel, fun () -> ReaderPid end, + fun () -> writer(Me) end, Username, VHost, Collector]}, permanent, ?MAX_WAIT, worker, [rabbit_channel]} ]}}. diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl index 2e9f02a3..3a0d71f5 100644 --- a/src/rabbit_framing_channel.erl +++ b/src/rabbit_framing_channel.erl @@ -32,20 +32,15 @@ -module(rabbit_framing_channel). -include("rabbit.hrl"). --export([start_link/0, start_link/1, process/2, shutdown/1]). +-export([start_link/1, process/2, shutdown/1]). %% internal -export([mainloop/1]). %%-------------------------------------------------------------------- -start_link() -> - Parent = self(), - {ok, proc_lib:spawn_link( - fun () -> mainloop(rabbit_channel_sup:channel(Parent)) end)}. - -start_link(ChannelPid) -> - {ok, proc_lib:spawn_link(fun() -> mainloop(ChannelPid) end)}. +start_link(GetChannelPid) -> + {ok, proc_lib:spawn_link(fun () -> mainloop(GetChannelPid()) end)}. process(Pid, Frame) -> Pid ! {frame, Frame}, -- cgit v1.2.1 From 39d6550069cb94ec25c6c180c6a7ddd4a5b20445 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Mon, 2 Aug 2010 15:11:11 +0100 Subject: Forgot to _ prefix --- src/supervisor2.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/supervisor2.erl b/src/supervisor2.erl index d9eb4d5b..7616e44d 100644 --- a/src/supervisor2.erl +++ b/src/supervisor2.erl @@ -138,7 +138,7 @@ which_children(Supervisor) -> call(Supervisor, which_children). find_child(Supervisor, Name) -> - [Pid || {Name1, Pid, Type1, Modules1} <- which_children(Supervisor), + [Pid || {Name1, Pid, _Type, _Modules} <- which_children(Supervisor), Name1 =:= Name]. call(Supervisor, Req) -> -- cgit v1.2.1 From bb9b11a2eb4c76877b43296cd602cec43ca3ad99 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Tue, 3 Aug 2010 11:41:35 +0100 Subject: Don't dynamically start up the channel_sup_sup - it doesn't gain us much to predeclare it with the Protocol and it makes the children of the channel_sup_sup less obvious --- src/rabbit_channel_sup_sup.erl | 10 +++++----- src/rabbit_connection_sup.erl | 4 +++- src/rabbit_reader.erl | 17 ++++++----------- 3 files changed, 14 insertions(+), 17 deletions(-) diff --git a/src/rabbit_channel_sup_sup.erl b/src/rabbit_channel_sup_sup.erl index f608b724..2fab8678 100644 --- a/src/rabbit_channel_sup_sup.erl +++ b/src/rabbit_channel_sup_sup.erl @@ -33,16 +33,16 @@ -behaviour(supervisor2). --export([start_link/1, start_channel/2]). +-export([start_link/0, start_channel/2]). -export([init/1]). -start_link(Protocol) -> - supervisor2:start_link(?MODULE, [Protocol]). +start_link() -> + supervisor2:start_link(?MODULE, []). -init([Protocol]) -> +init([]) -> {ok, {{simple_one_for_one_terminate, 0, 1}, - [{channel_sup, {rabbit_channel_sup, start_link, [Protocol]}, + [{channel_sup, {rabbit_channel_sup, start_link, []}, temporary, infinity, supervisor, [rabbit_channel_sup]}]}}. start_channel(Pid, Args) -> diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl index 5d05ca28..4ad9d3f0 100644 --- a/src/rabbit_connection_sup.erl +++ b/src/rabbit_connection_sup.erl @@ -44,7 +44,9 @@ start_link() -> init([]) -> {ok, {{one_for_all, 0, 1}, - [{reader, {rabbit_reader, start_link, []}, + [{channel_sup_sup, {rabbit_channel_sup_sup, start_link, []}, + permanent, infinity, supervisor, [rabbit_channel_sup_sup]}, + {reader, {rabbit_reader, start_link, []}, permanent, ?MAX_WAIT, worker, [rabbit_reader]}, {collector, {rabbit_queue_collector, start_link, []}, permanent, ?MAX_WAIT, worker, [rabbit_queue_collector]} diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 9c984c7a..24bde74d 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -615,13 +615,8 @@ handle_input(Callback, Data, _State) -> %% includes a major and minor version number, Luckily 0-9 and 0-9-1 %% are similar enough that clients will be happy with either. start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision}, - Protocol, State = #v1{parent = Parent, sock = Sock, + Protocol, State = #v1{sock = Sock, connection = Connection}) -> - {ok, _Pid} = - supervisor:start_child( - Parent, {channel_sup_sup, - {rabbit_channel_sup_sup, start_link, [Protocol]}, - permanent, infinity, supervisor, [rabbit_channel_sup_sup]}), Start = #'connection.start'{ version_major = ProtocolMajor, version_minor = ProtocolMinor, server_properties = server_properties(), @@ -802,17 +797,17 @@ i(Item, #v1{}) -> %%-------------------------------------------------------------------------- -send_to_new_channel(Channel, AnalyzedFrame, - State = #v1{queue_collector = Collector, parent = Parent}) -> +send_to_new_channel(Channel, AnalyzedFrame, State = + #v1{queue_collector = Collector, parent = Parent}) -> #v1{sock = Sock, connection = #connection{ + protocol = Protocol, frame_max = FrameMax, user = #user{username = Username}, vhost = VHost}} = State, ChanSupSup = rabbit_connection_sup:channel_sup_sup(Parent), {ok, ChanSup} = rabbit_channel_sup_sup:start_channel( - ChanSupSup, - [Sock, Channel, FrameMax, self(), - Username, VHost, Collector]), + ChanSupSup, [Protocol, Sock, Channel, FrameMax, self(), + Username, VHost, Collector]), ChPid = rabbit_channel_sup:framing_channel(ChanSup), link(ChPid), put({channel, Channel}, {chpid, ChPid}), -- cgit v1.2.1 From 2c470c48f6d20929d29a543a407e06da63a9028e Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Tue, 3 Aug 2010 13:11:27 +0100 Subject: Avoid the unnecessary callbacks in the various new _sups as much as possible by breaking the declarative child specs but trying hard to keep them declarative as much as possible. --- src/rabbit_channel.erl | 66 ++++++++++++++------------------ src/rabbit_channel_sup.erl | 53 ++++++++++++-------------- src/rabbit_connection_sup.erl | 25 ++++++------ src/rabbit_framing_channel.erl | 5 +-- src/rabbit_reader.erl | 86 +++++++++++++++++++++--------------------- src/rabbit_tests.erl | 9 ++--- 6 files changed, 117 insertions(+), 127 deletions(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index ee771986..3478908f 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -82,11 +82,11 @@ -type(ref() :: any()). -type(channel_number() :: non_neg_integer()). --type(pid_fun() :: fun (() -> pid())). -spec(start_link/6 :: - (channel_number(), pid_fun(), pid_fun(), rabbit_access_control:username(), - rabbit_types:vhost(), pid()) -> rabbit_types:ok(pid())). + (channel_number(), pid(), pid(), rabbit_access_control:username(), + rabbit_types:vhost(), pid()) -> + 'ignore' | rabbit_types:ok_or_error2(pid(), any())). -spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(), rabbit_types:maybe(rabbit_types:content())) -> 'ok'). @@ -110,13 +110,9 @@ %%---------------------------------------------------------------------------- -start_link(Channel, GetReader, GetWriter, Username, VHost, CollectorPid) -> - Parent = self(), - {ok, proc_lib:spawn_link( - fun () -> - init_and_go([Channel, Parent, GetReader(), GetWriter(), - Username, VHost, CollectorPid]) - end)}. +start_link(Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid) -> + gen_server2:start_link(?MODULE, [Channel, self(), ReaderPid, WriterPid, + Username, VHost, CollectorPid], []). do(Pid, Method) -> do(Pid, Method, none). @@ -175,35 +171,31 @@ init([Channel, ParentPid, ReaderPid, WriterPid, Username, VHost, process_flag(trap_exit, true), link(WriterPid), ok = pg_local:join(rabbit_channels, self()), - #ch{state = starting, - channel = Channel, - parent_pid = ParentPid, - reader_pid = ReaderPid, - writer_pid = WriterPid, - limiter_pid = undefined, - transaction_id = none, - tx_participants = sets:new(), - next_tag = 1, - uncommitted_ack_q = queue:new(), - unacked_message_q = queue:new(), - username = Username, - virtual_host = VHost, - most_recently_declared_queue = <<>>, - consumer_mapping = dict:new(), - blocking = dict:new(), - queue_collector_pid = CollectorPid, - flow = #flow{server = true, client = true, - pending = none}, - stats_timer = rabbit_event:init_stats_timer()}. - -init_and_go(InitArgs) -> - State = init(InitArgs), + State = #ch{state = starting, + channel = Channel, + parent_pid = ParentPid, + reader_pid = ReaderPid, + writer_pid = WriterPid, + limiter_pid = undefined, + transaction_id = none, + tx_participants = sets:new(), + next_tag = 1, + uncommitted_ack_q = queue:new(), + unacked_message_q = queue:new(), + username = Username, + virtual_host = VHost, + most_recently_declared_queue = <<>>, + consumer_mapping = dict:new(), + blocking = dict:new(), + queue_collector_pid = CollectorPid, + flow = #flow{server = true, client = true, + pending = none}, + stats_timer = rabbit_event:init_stats_timer()}, rabbit_event:notify( channel_created, [{Item, i(Item, State)} || Item <- ?CREATION_EVENT_KEYS]), - gen_server2:enter_loop(?MODULE, [], State, self(), hibernate, - {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, - ?DESIRED_HIBERNATE}). + {ok, State, hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. handle_call(info, _From, State) -> reply(infos(?INFO_KEYS, State), State); @@ -1112,7 +1104,7 @@ fold_per_queue(F, Acc0, UAQ) -> start_limiter(State = #ch{unacked_message_q = UAMQ, parent_pid = ParentPid}) -> Me = self(), {ok, LPid} = - supervisor2:start_child( + supervisor:start_child( ParentPid, {limiter, {rabbit_limiter, start_link, [Me, queue:len(UAMQ)]}, transient, ?MAX_WAIT, worker, [rabbit_limiter]}), diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index 1d02d992..b565f236 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -33,7 +33,7 @@ -behaviour(supervisor2). --export([start_link/8, writer/1, framing_channel/1, channel/1]). +-export([start_link/8]). -export([init/1]). @@ -47,7 +47,7 @@ (rabbit_types:protocol(), rabbit_net:socket(), rabbit_channel:channel_number(), non_neg_integer(), pid(), rabbit_access_control:username(), rabbit_types:vhost(), pid()) -> - ignore | rabbit_types:ok_or_error2(pid(), any())). + rabbit_types:ok({pid(), pid()})). -endif. @@ -55,32 +55,29 @@ start_link(Protocol, Sock, Channel, FrameMax, ReaderPid, Username, VHost, Collector) -> - supervisor2:start_link(?MODULE, [Protocol, Sock, Channel, FrameMax, - ReaderPid, Username, VHost, Collector]). - -writer(Pid) -> - hd(supervisor2:find_child(Pid, writer)). - -channel(Pid) -> - hd(supervisor2:find_child(Pid, channel)). - -framing_channel(Pid) -> - hd(supervisor2:find_child(Pid, framing_channel)). + {ok, SupPid} = supervisor2:start_link(?MODULE, []), + {ok, WriterPid} = + supervisor2:start_child( + SupPid, + {writer, {rabbit_writer, start_link, + [Sock, Channel, FrameMax, Protocol]}, + permanent, ?MAX_WAIT, worker, [rabbit_writer]}), + {ok, ChannelPid} = + supervisor2:start_child( + SupPid, + {channel, {rabbit_channel, start_link, + [Channel, ReaderPid, WriterPid, Username, VHost, + Collector]}, + permanent, ?MAX_WAIT, worker, [rabbit_channel]}), + {ok, FramingChannelPid} = + supervisor2:start_child( + SupPid, + {framing_channel, {rabbit_framing_channel, start_link, + [ChannelPid, Protocol]}, + permanent, ?MAX_WAIT, worker, [rabbit_framing_channel]}), + {ok, {SupPid, FramingChannelPid}}. %%---------------------------------------------------------------------------- -init([Protocol, Sock, Channel, FrameMax, ReaderPid, Username, VHost, - Collector]) -> - Me = self(), - {ok, {{one_for_all, 0, 1}, - [{framing_channel, {rabbit_framing_channel, start_link, - [fun () -> channel(Me) end, Protocol]}, - permanent, ?MAX_WAIT, worker, [rabbit_framing_channel]}, - {writer, {rabbit_writer, start_link, - [Sock, Channel, FrameMax, Protocol]}, - permanent, ?MAX_WAIT, worker, [rabbit_writer]}, - {channel, {rabbit_channel, start_link, - [Channel, fun () -> ReaderPid end, - fun () -> writer(Me) end, Username, VHost, Collector]}, - permanent, ?MAX_WAIT, worker, [rabbit_channel]} - ]}}. +init([]) -> + {ok, {{one_for_all, 0, 1}, []}}. diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl index 4ad9d3f0..8d09961f 100644 --- a/src/rabbit_connection_sup.erl +++ b/src/rabbit_connection_sup.erl @@ -33,28 +33,31 @@ -behaviour(supervisor2). --export([start_link/0, reader/1, channel_sup_sup/1]). +-export([start_link/0, reader/1]). -export([init/1]). -include("rabbit.hrl"). start_link() -> - supervisor2:start_link(?MODULE, []). + {ok, SupPid} = supervisor2:start_link(?MODULE, []), + {ok, ChannelSupSupPid} = + supervisor2:start_child( + SupPid, + {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []}, + permanent, infinity, supervisor, [rabbit_channel_sup_sup]}), + {ok, _ReaderPid} = + supervisor2:start_child( + SupPid, + {reader, {rabbit_reader, start_link, [ChannelSupSupPid]}, + permanent, ?MAX_WAIT, worker, [rabbit_reader]}), + {ok, SupPid}. init([]) -> {ok, {{one_for_all, 0, 1}, - [{channel_sup_sup, {rabbit_channel_sup_sup, start_link, []}, - permanent, infinity, supervisor, [rabbit_channel_sup_sup]}, - {reader, {rabbit_reader, start_link, []}, - permanent, ?MAX_WAIT, worker, [rabbit_reader]}, - {collector, {rabbit_queue_collector, start_link, []}, + [{collector, {rabbit_queue_collector, start_link, []}, permanent, ?MAX_WAIT, worker, [rabbit_queue_collector]} ]}}. reader(Pid) -> hd(supervisor2:find_child(Pid, reader)). - -channel_sup_sup(Pid) -> - hd(supervisor2:find_child(Pid, channel_sup_sup)). - diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl index 6ece3436..6ee5a555 100644 --- a/src/rabbit_framing_channel.erl +++ b/src/rabbit_framing_channel.erl @@ -39,9 +39,8 @@ %%-------------------------------------------------------------------- -start_link(GetChannelPid, Protocol) -> - {ok, proc_lib:spawn_link( - fun () -> mainloop(GetChannelPid(), Protocol) end)}. +start_link(ChannelPid, Protocol) -> + {ok, proc_lib:spawn_link(fun () -> mainloop(ChannelPid, Protocol) end)}. process(Pid, Frame) -> Pid ! {frame, Frame}, diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 24bde74d..4b89db47 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -33,11 +33,11 @@ -include("rabbit_framing.hrl"). -include("rabbit.hrl"). --export([start_link/0, info_keys/0, info/1, info/2, shutdown/2]). +-export([start_link/1, info_keys/0, info/1, info/2, shutdown/2]). -export([system_continue/3, system_terminate/4, system_code_change/4]). --export([init/1, mainloop/2]). +-export([init/2, mainloop/2]). -export([server_properties/0]). @@ -60,7 +60,7 @@ %--------------------------------------------------------------------------- -record(v1, {parent, sock, connection, callback, recv_ref, connection_state, - queue_collector, stats_timer}). + queue_collector, stats_timer, channel_sup_sup_pid}). -define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt, send_pend, state, channels]). @@ -144,6 +144,7 @@ -ifdef(use_specs). +-spec(start_link/1 :: (pid()) -> rabbit_types:ok(pid())). -spec(info_keys/0 :: () -> [rabbit_types:info_key()]). -spec(info/1 :: (pid()) -> [rabbit_types:info()]). -spec(info/2 :: (pid(), [rabbit_types:info_key()]) -> [rabbit_types:info()]). @@ -152,9 +153,9 @@ -spec(server_properties/0 :: () -> rabbit_framing:amqp_table()). %% These specs only exists to add no_return() to keep dialyzer happy --spec(init/1 :: (pid()) -> no_return()). --spec(start_connection/4 :: - (pid(), any(), rabbit_networking:socket(), +-spec(init/2 :: (pid(), pid()) -> no_return()). +-spec(start_connection/5 :: + (pid(), pid(), any(), rabbit_networking:socket(), fun ((rabbit_networking:socket()) -> rabbit_types:ok_or_error2( rabbit_networking:socket(), any()))) -> no_return()). @@ -163,17 +164,17 @@ %%-------------------------------------------------------------------------- -start_link() -> - {ok, proc_lib:spawn_link(?MODULE, init, [self()])}. +start_link(ChannelSupSupPid) -> + {ok, proc_lib:spawn_link(?MODULE, init, [self(), ChannelSupSupPid])}. shutdown(Pid, Explanation) -> gen_server:call(Pid, {shutdown, Explanation}, infinity). -init(Parent) -> +init(Parent, ChannelSupSupPid) -> Deb = sys:debug_options([]), receive {go, Sock, SockTransform} -> - start_connection(Parent, Deb, Sock, SockTransform) + start_connection(Parent, ChannelSupSupPid, Deb, Sock, SockTransform) end. system_continue(_Parent, Deb, State) -> @@ -248,7 +249,7 @@ socket_op(Sock, Fun) -> exit(shutdown) end. -start_connection(Parent, Deb, Sock, SockTransform) -> +start_connection(Parent, ChannelSupSupPid, Deb, Sock, SockTransform) -> process_flag(trap_exit, true), {PeerAddress, PeerPort} = socket_op(Sock, fun rabbit_net:peername/1), PeerAddressS = inet_parse:ntoa(PeerAddress), @@ -261,21 +262,23 @@ start_connection(Parent, Deb, Sock, SockTransform) -> [Collector] = supervisor2:find_child(Parent, collector), try mainloop(Deb, switch_callback( - #v1{parent = Parent, - sock = ClientSock, - connection = #connection{ - protocol = none, - user = none, - timeout_sec = ?HANDSHAKE_TIMEOUT, - frame_max = ?FRAME_MIN_SIZE, - vhost = none, - client_properties = none}, - callback = uninitialized_callback, - recv_ref = none, - connection_state = pre_init, - queue_collector = Collector, - stats_timer = - rabbit_event:init_stats_timer()}, + #v1{parent = Parent, + sock = ClientSock, + connection = #connection{ + protocol = none, + user = none, + timeout_sec = ?HANDSHAKE_TIMEOUT, + frame_max = ?FRAME_MIN_SIZE, + vhost = none, + client_properties = none}, + callback = uninitialized_callback, + recv_ref = none, + connection_state = pre_init, + queue_collector = Collector, + stats_timer = + rabbit_event:init_stats_timer(), + channel_sup_sup_pid = ChannelSupSupPid + }, handshake, 8)) catch Ex -> (if Ex == connection_closed_abruptly -> @@ -797,22 +800,21 @@ i(Item, #v1{}) -> %%-------------------------------------------------------------------------- -send_to_new_channel(Channel, AnalyzedFrame, State = - #v1{queue_collector = Collector, parent = Parent}) -> - #v1{sock = Sock, connection = #connection{ - protocol = Protocol, - frame_max = FrameMax, - user = #user{username = Username}, - vhost = VHost}} = State, - ChanSupSup = rabbit_connection_sup:channel_sup_sup(Parent), - {ok, ChanSup} = rabbit_channel_sup_sup:start_channel( - ChanSupSup, [Protocol, Sock, Channel, FrameMax, self(), - Username, VHost, Collector]), - ChPid = rabbit_channel_sup:framing_channel(ChanSup), - link(ChPid), - put({channel, Channel}, {chpid, ChPid}), - put({chpid, ChPid}, {channel, Channel}), - ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame). +send_to_new_channel(Channel, AnalyzedFrame, State) -> + #v1{sock = Sock, queue_collector = Collector, + channel_sup_sup_pid = ChanSupSup, + connection = #connection{protocol = Protocol, + frame_max = FrameMax, + user = #user{username = Username}, + vhost = VHost}} = State, + {ok, {_ChanSup, FrChPid}} = + rabbit_channel_sup_sup:start_channel( + ChanSupSup, [Protocol, Sock, Channel, FrameMax, + self(), Username, VHost, Collector]), + link(FrChPid), + put({channel, Channel}, {chpid, FrChPid}), + put({chpid, FrChPid}, {channel, Channel}), + ok = rabbit_framing_channel:process(FrChPid, AnalyzedFrame). log_channel_error(ConnectionState, Channel, Reason) -> rabbit_log:error("connection ~p (~p), channel ~p - error:~n~p~n", diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 79c8a31c..59cfc064 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -938,13 +938,10 @@ test_user_management() -> passed. -make_fun(Result) -> - fun () -> Result end. - test_server_status() -> %% create a few things so there is some useful information to list Writer = spawn(fun () -> receive shutdown -> ok end end), - {ok, Ch} = rabbit_channel:start_link(1, make_fun(self()), make_fun(Writer), + {ok, Ch} = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>, self()), [Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>], {new, Queue = #amqqueue{}} <- @@ -1083,7 +1080,7 @@ test_memory_pressure_spawn() -> test_spawn(Receiver) -> Me = self(), Writer = spawn(fun () -> Receiver(Me) end), - {ok, Ch} = rabbit_channel:start_link(1, make_fun(Me), make_fun(Writer), + {ok, Ch} = rabbit_channel:start_link(1, Me, Writer, <<"guest">>, <<"/">>, self()), ok = rabbit_channel:do(Ch, #'channel.open'{}), receive #'channel.open_ok'{} -> ok @@ -1157,7 +1154,7 @@ test_memory_pressure() -> alarm_handler:set_alarm({vm_memory_high_watermark, []}), Me = self(), Writer4 = spawn(fun () -> test_memory_pressure_receiver(Me) end), - {ok, Ch4} = rabbit_channel:start_link(1, make_fun(Me), make_fun(Writer4), + {ok, Ch4} = rabbit_channel:start_link(1, Me, Writer4, <<"user">>, <<"/">>, self()), ok = rabbit_channel:do(Ch4, #'channel.open'{}), ok = test_memory_pressure_flush(Writer4), -- cgit v1.2.1 From 5247d20bd3c9d1c6949c7a2952b6cea17b3293c4 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Tue, 3 Aug 2010 14:26:05 +0100 Subject: Reverted 'shutdown' to 'normal' for our own programmatic exits. This then leaves us with a problem with the reader shutting down channels as previously the framing_ch was trapping exits (so Reason of 'normal' gets converted to EXIT msg), but now it's not (Reason of 'normal' does *not* cause process to exit). Thus use framing_channel:shutdown instead . Also, a child may not return {ok, {pid, pid}} from its startup. But it may return {ok, pid, any} where any is passed back to the caller. --- src/rabbit_channel.erl | 28 ++++++++++++------------- src/rabbit_channel_sup.erl | 4 ++-- src/rabbit_queue_collector.erl | 2 +- src/rabbit_reader.erl | 47 +++++++++++++++++++++++++++--------------- src/rabbit_tests.erl | 14 ++++++------- 5 files changed, 54 insertions(+), 41 deletions(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 3478908f..ca296b60 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -220,16 +220,14 @@ handle_cast({method, Method, Content}, State) -> {noreply, NewState} -> noreply(NewState); stop -> - {stop, shutdown, State#ch{state = terminating}} + {stop, normal, State#ch{state = terminating}} catch exit:Reason = #amqp_error{} -> MethodName = rabbit_misc:method_record_type(Method), - {stop, shutdown, terminating(Reason#amqp_error{method = MethodName}, + {stop, normal, terminating(Reason#amqp_error{method = MethodName}, State)}; exit:normal -> - {stop, shutdown, State}; - exit:shutdown -> - {stop, shutdown, State}; + {stop, normal, State}; _:Reason -> {stop, {Reason, erlang:get_stacktrace()}, State} end; @@ -238,7 +236,7 @@ handle_cast({flushed, QPid}, State) -> {noreply, queue_blocked(QPid, State)}; handle_cast(terminate, State) -> - {stop, shutdown, State}; + {stop, normal, State}; handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) -> ok = rabbit_writer:send_command(WriterPid, Msg), @@ -269,11 +267,11 @@ handle_cast({conserve_memory, _Conserve}, State) -> handle_cast({flow_timeout, Ref}, State = #ch{flow = #flow{client = Flow, pending = {Ref, _TRef}}}) -> - {stop, shutdown, terminating( - rabbit_misc:amqp_error( - precondition_failed, - "timeout waiting for channel.flow_ok{active=~w}", - [not Flow], none), State)}; + {stop, normal, terminating( + rabbit_misc:amqp_error( + precondition_failed, + "timeout waiting for channel.flow_ok{active=~w}", + [not Flow], none), State)}; handle_cast({flow_timeout, _Ref}, State) -> {noreply, State}; @@ -284,7 +282,7 @@ handle_cast(emit_stats, State) -> handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}}, State = #ch{writer_pid = WriterPid}) -> State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason}, - {stop, shutdown, State}; + {stop, normal, State}; handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) -> @@ -301,8 +299,10 @@ terminate(_Reason, State = #ch{state = terminating}) -> terminate(Reason, State) -> Res = rollback_and_notify(State), case Reason of - shutdown -> ok = Res; - _ -> ok + normal -> ok = Res; + shutdown -> ok = Res; + {shutdown, _Term} -> ok = Res; + _ -> ok end, terminate(State). diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index b565f236..17e1446d 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -47,7 +47,7 @@ (rabbit_types:protocol(), rabbit_net:socket(), rabbit_channel:channel_number(), non_neg_integer(), pid(), rabbit_access_control:username(), rabbit_types:vhost(), pid()) -> - rabbit_types:ok({pid(), pid()})). + {'ok', pid(), pid()}). -endif. @@ -75,7 +75,7 @@ start_link(Protocol, Sock, Channel, FrameMax, ReaderPid, Username, VHost, {framing_channel, {rabbit_framing_channel, start_link, [ChannelPid, Protocol]}, permanent, ?MAX_WAIT, worker, [rabbit_framing_channel]}), - {ok, {SupPid, FramingChannelPid}}. + {ok, SupPid, FramingChannelPid}. %%---------------------------------------------------------------------------- diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl index 2da6c182..ea3768d4 100644 --- a/src/rabbit_queue_collector.erl +++ b/src/rabbit_queue_collector.erl @@ -90,7 +90,7 @@ handle_call(delete_all, _From, State = #state{queues = Queues}) -> {reply, ok, State}; handle_call(shutdown, _From, State) -> - {stop, shutdown, ok, State}. + {stop, normal, ok, State}. handle_cast(_Msg, State) -> {noreply, State}. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 4b89db47..d1d21e16 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -177,8 +177,8 @@ init(Parent, ChannelSupSupPid) -> start_connection(Parent, ChannelSupSupPid, Deb, Sock, SockTransform) end. -system_continue(_Parent, Deb, State) -> - ?MODULE:mainloop(Deb, State). +system_continue(Parent, Deb, State) -> + ?MODULE:mainloop(Deb, State = #v1{parent = Parent}). system_terminate(Reason, _Parent, _Deb, _State) -> exit(Reason). @@ -246,7 +246,7 @@ socket_op(Sock, Fun) -> [self(), Reason]), rabbit_log:info("closing TCP connection ~p~n", [self()]), - exit(shutdown) + exit(normal) end. start_connection(Parent, ChannelSupSupPid, Deb, Sock, SockTransform) -> @@ -297,9 +297,9 @@ start_connection(Parent, ChannelSupSupPid, Deb, Sock, SockTransform) -> %% %% gen_tcp:close(ClientSock), teardown_profiling(ProfilingValue), - rabbit_event:notify(connection_closed, [{pid, self()}]), - exit(shutdown) - end. + rabbit_event:notify(connection_closed, [{pid, self()}]) + end, + done. mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) -> %%?LOGDEBUG("Reader mainloop: ~p bytes available, need ~p~n", [HaveBytes, WaitUntilNBytes]), @@ -406,13 +406,23 @@ close_channel(Channel, State) -> handle_channel_exit(Channel, Reason, State) -> handle_exception(State, Channel, Reason). -handle_dependent_exit(Pid, shutdown, State) -> - erase({chpid, Pid}), - maybe_close(State); handle_dependent_exit(Pid, Reason, State) -> - case channel_cleanup(Pid) of - undefined -> exit({abnormal_dependent_exit, Pid, Reason}); - Channel -> maybe_close(handle_exception(State, Channel, Reason)) + case (case Reason of + shutdown -> controlled; + {shutdown, _Term} -> controlled; + normal -> controlled; + _ -> uncontrolled + end) of + controlled -> + erase({chpid, Pid}), + maybe_close(State); + uncontrolled -> + case channel_cleanup(Pid) of + undefined -> + exit({abnormal_dependent_exit, Pid, Reason}); + Channel -> + maybe_close(handle_exception(State, Channel, Reason)) + end end. channel_cleanup(Pid) -> @@ -426,7 +436,8 @@ channel_cleanup(Pid) -> all_channels() -> [Pid || {{chpid, Pid},_} <- get()]. terminate_channels() -> - NChannels = length([exit(Pid, shutdown) || Pid <- all_channels()]), + NChannels = + length([rabbit_framing_channel:shutdown(Pid) || Pid <- all_channels()]), if NChannels > 0 -> Timeout = 1000 * ?CHANNEL_TERMINATION_TIMEOUT * NChannels, TimerRef = erlang:send_after(Timeout, self(), cancel_wait), @@ -450,7 +461,9 @@ wait_for_channel_termination(N, TimerRef) -> exit({abnormal_dependent_exit, Pid, Reason}); Channel -> case Reason of - shutdown -> ok; + normal -> ok; + shutdown -> ok; + {shutdown, _Term} -> ok; _ -> rabbit_log:error( "connection ~p, channel ~p - " @@ -618,8 +631,8 @@ handle_input(Callback, Data, _State) -> %% includes a major and minor version number, Luckily 0-9 and 0-9-1 %% are similar enough that clients will be happy with either. start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision}, - Protocol, State = #v1{sock = Sock, - connection = Connection}) -> + Protocol, + State = #v1{sock = Sock, connection = Connection}) -> Start = #'connection.start'{ version_major = ProtocolMajor, version_minor = ProtocolMinor, server_properties = server_properties(), @@ -807,7 +820,7 @@ send_to_new_channel(Channel, AnalyzedFrame, State) -> frame_max = FrameMax, user = #user{username = Username}, vhost = VHost}} = State, - {ok, {_ChanSup, FrChPid}} = + {ok, _ChanSup, FrChPid} = rabbit_channel_sup_sup:start_channel( ChanSupSup, [Protocol, Sock, Channel, FrameMax, self(), Username, VHost, Collector]), diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 59cfc064..23f30d13 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1088,8 +1088,8 @@ test_spawn(Receiver) -> end, {Writer, Ch}. -expect_shutdown_channel_termination(Ch) -> - receive {'EXIT', Ch, shutdown} -> ok +expect_normal_channel_termination(Ch) -> + receive {'EXIT', Ch, normal} -> ok after 1000 -> throw({channel_failed_to_shutdown, Ch}) end. @@ -1120,7 +1120,7 @@ test_memory_pressure() -> %% if we publish at this point, the channel should die Content = rabbit_basic:build_content(#'P_basic'{}, <<>>), ok = rabbit_channel:do(Ch0, #'basic.publish'{}, Content), - expect_shutdown_channel_termination(Ch0), + expect_normal_channel_termination(Ch0), gobble_channel_exit(), {Writer1, Ch1} = test_memory_pressure_spawn(), @@ -1132,19 +1132,19 @@ test_memory_pressure() -> ok = test_memory_pressure_receive_flow(true), %% send back the wrong flow_ok. Channel should die. ok = rabbit_channel:do(Ch1, #'channel.flow_ok'{active = false}), - expect_shutdown_channel_termination(Ch1), + expect_normal_channel_termination(Ch1), gobble_channel_exit(), {_Writer2, Ch2} = test_memory_pressure_spawn(), %% just out of the blue, send a flow_ok. Life should end. ok = rabbit_channel:do(Ch2, #'channel.flow_ok'{active = true}), - expect_shutdown_channel_termination(Ch2), + expect_normal_channel_termination(Ch2), gobble_channel_exit(), {_Writer3, Ch3} = test_memory_pressure_spawn(), ok = rabbit_channel:conserve_memory(Ch3, true), ok = test_memory_pressure_receive_flow(false), - receive {'EXIT', Ch3, shutdown} -> + receive {'EXIT', Ch3, normal} -> ok after 12000 -> throw(channel_failed_to_exit) @@ -1167,7 +1167,7 @@ test_memory_pressure() -> after 1000 -> throw(failed_to_receive_channel_open_ok) end, rabbit_channel:shutdown(Ch4), - expect_shutdown_channel_termination(Ch4), + expect_normal_channel_termination(Ch4), true = process_flag(trap_exit, OldTrap), passed. -- cgit v1.2.1 From b6bddb90d008d4c9ff53a51da96d09bab1cde924 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Fri, 6 Aug 2010 17:27:06 +0100 Subject: Removing the wrong changes to supervisor2 to suppress error reporting. Shall fix this one differently. --- src/supervisor2.erl | 30 +++++++----------------------- 1 file changed, 7 insertions(+), 23 deletions(-) diff --git a/src/supervisor2.erl b/src/supervisor2.erl index 7616e44d..6bc5b1e7 100644 --- a/src/supervisor2.erl +++ b/src/supervisor2.erl @@ -31,10 +31,6 @@ %% the MaxT and MaxR parameters to permit the child to be %% restarted. This may require waiting for longer than Delay. %% -%% 4) When the MaxR (intensity) == 0, errors that would otherwise be -%% reported concerning child death, or the reaching of max restart -%% intensity are elided. -%% %% All modifications are (C) 2010 Rabbit Technologies Ltd. %% %% %CopyrightBegin% @@ -349,6 +345,7 @@ handle_call(which_children, _From, State) -> State#state.children), {reply, Resp, State}. + handle_cast({delayed_restart, {RestartType, Reason, Child}}, State) when ?is_simple(State) -> {ok, NState} = do_restart(RestartType, Reason, Child, State), @@ -539,7 +536,7 @@ do_restart({RestartType, Delay}, Reason, Child, State) -> {ok, NState} end; do_restart(permanent, Reason, Child, State) -> - report_error(child_terminated, Reason, Child, State), + report_error(child_terminated, Reason, Child, State#state.name), restart(Child, State); do_restart(_, normal, Child, State) -> NState = state_del_child(Child, State), @@ -548,10 +545,10 @@ do_restart(_, shutdown, Child, State) -> NState = state_del_child(Child, State), {ok, NState}; do_restart(transient, Reason, Child, State) -> - report_error(child_terminated, Reason, Child, State), + report_error(child_terminated, Reason, Child, State#state.name), restart(Child, State); do_restart(temporary, Reason, Child, State) -> - report_error(child_terminated, Reason, Child, State), + report_error(child_terminated, Reason, Child, State#state.name), NState = state_del_child(Child, State), {ok, NState}. @@ -560,8 +557,8 @@ restart(Child, State) -> {ok, NState} -> restart(NState#state.strategy, Child, NState, fun restart/2); {terminate, NState} -> - report_error(shutdown, reached_max_restart_intensity, - Child, State), + report_error(shutdown, reached_max_restart_intensity, + Child, State#state.name), {shutdown, remove_child(Child, NState)} end. @@ -680,8 +677,6 @@ shutdown(Pid, brutal_kill) -> {'DOWN', _MRef, process, Pid, OtherReason} -> {error, OtherReason} end; - normal_shutdown -> - ok; {error, Reason} -> {error, Reason} end; @@ -703,8 +698,6 @@ shutdown(Pid, Time) -> {error, OtherReason} end end; - normal_shutdown -> - ok; {error, Reason} -> {error, Reason} end. @@ -725,12 +718,7 @@ monitor_child(Pid) -> {'EXIT', Pid, Reason} -> receive {'DOWN', _, process, Pid, _} -> - case Reason of - normal -> normal_shutdown; - shutdown -> normal_shutdown; - {shutdown, _Terms} -> normal_shutdown; - _ -> {error, Reason} - end + {error, Reason} end after 0 -> %% If a naughty child did unlink and the child dies before @@ -984,10 +972,6 @@ difference({_, TimeS, _}, {_, CurS, _}) -> %%% Error and progress reporting. %%% ------------------------------------------------------ -report_error(_Error, _Reason, _Child, #state{intensity = 0}) -> - ok; -report_error(Error, Reason, Child, #state{name=Name}) -> - report_error(Error, Reason, Child, Name); report_error(Error, Reason, Child, SupName) -> ErrorMsg = [{supervisor, SupName}, {errorContext, Error}, -- cgit v1.2.1 From 98f9798d30f6e00c52e56916e9c33551cd69924c Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Fri, 6 Aug 2010 17:55:16 +0100 Subject: Added intrinsic restart type and used it --- src/rabbit_channel_sup.erl | 8 ++++---- src/rabbit_connection_sup.erl | 16 +++++++++------- src/rabbit_heartbeat.erl | 4 ++-- src/rabbit_reader.erl | 25 +++++++++++++------------ src/supervisor2.erl | 19 ++++++++++++++++++- 5 files changed, 46 insertions(+), 26 deletions(-) diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index 17e1446d..e4dcbae1 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -61,23 +61,23 @@ start_link(Protocol, Sock, Channel, FrameMax, ReaderPid, Username, VHost, SupPid, {writer, {rabbit_writer, start_link, [Sock, Channel, FrameMax, Protocol]}, - permanent, ?MAX_WAIT, worker, [rabbit_writer]}), + intrinsic, ?MAX_WAIT, worker, [rabbit_writer]}), {ok, ChannelPid} = supervisor2:start_child( SupPid, {channel, {rabbit_channel, start_link, [Channel, ReaderPid, WriterPid, Username, VHost, Collector]}, - permanent, ?MAX_WAIT, worker, [rabbit_channel]}), + intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), {ok, FramingChannelPid} = supervisor2:start_child( SupPid, {framing_channel, {rabbit_framing_channel, start_link, [ChannelPid, Protocol]}, - permanent, ?MAX_WAIT, worker, [rabbit_framing_channel]}), + intrinsic, ?MAX_WAIT, worker, [rabbit_framing_channel]}), {ok, SupPid, FramingChannelPid}. %%---------------------------------------------------------------------------- init([]) -> - {ok, {{one_for_all, 0, 1}, []}}. + {ok, {{one_for_all, 10, 10}, []}}. diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl index 8d09961f..f097f80a 100644 --- a/src/rabbit_connection_sup.erl +++ b/src/rabbit_connection_sup.erl @@ -41,23 +41,25 @@ start_link() -> {ok, SupPid} = supervisor2:start_link(?MODULE, []), + {ok, Collector} = + supervisor2:start_child( + SupPid, + {collector, {rabbit_queue_collector, start_link, []}, + intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}), {ok, ChannelSupSupPid} = supervisor2:start_child( SupPid, {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []}, - permanent, infinity, supervisor, [rabbit_channel_sup_sup]}), + intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}), {ok, _ReaderPid} = supervisor2:start_child( SupPid, - {reader, {rabbit_reader, start_link, [ChannelSupSupPid]}, - permanent, ?MAX_WAIT, worker, [rabbit_reader]}), + {reader, {rabbit_reader, start_link, [ChannelSupSupPid, Collector]}, + intrinsic, ?MAX_WAIT, worker, [rabbit_reader]}), {ok, SupPid}. init([]) -> - {ok, {{one_for_all, 0, 1}, - [{collector, {rabbit_queue_collector, start_link, []}, - permanent, ?MAX_WAIT, worker, [rabbit_queue_collector]} - ]}}. + {ok, {{one_for_all, 10, 10}, []}}. reader(Pid) -> hd(supervisor2:find_child(Pid, reader)). diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl index d694011a..b277de70 100644 --- a/src/rabbit_heartbeat.erl +++ b/src/rabbit_heartbeat.erl @@ -64,12 +64,12 @@ start_heartbeat(Sup, Sock, TimeoutSec) -> supervisor:start_child( Sup, {heartbeat_sender, {?MODULE, start_heartbeat_sender, [Sock, TimeoutSec]}, - transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}), + permanent, ?MAX_WAIT, worker, [rabbit_heartbeat]}), {ok, Receiver} = supervisor:start_child( Sup, {heartbeat_receiver, {?MODULE, start_heartbeat_receiver, [Sock, TimeoutSec]}, - transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}), + permanent, ?MAX_WAIT, worker, [rabbit_heartbeat]}), {Sender, Receiver}. start_heartbeat_sender(Sock, TimeoutSec) -> diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index eaf56886..5c0dee73 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -33,11 +33,11 @@ -include("rabbit_framing.hrl"). -include("rabbit.hrl"). --export([start_link/1, info_keys/0, info/1, info/2, shutdown/2]). +-export([start_link/2, info_keys/0, info/1, info/2, shutdown/2]). -export([system_continue/3, system_terminate/4, system_code_change/4]). --export([init/2, mainloop/2]). +-export([init/3, mainloop/2]). -export([conserve_memory/2, server_properties/0]). @@ -161,7 +161,7 @@ -ifdef(use_specs). --spec(start_link/1 :: (pid()) -> rabbit_types:ok(pid())). +-spec(start_link/2 :: (pid(), pid()) -> rabbit_types:ok(pid())). -spec(info_keys/0 :: () -> [rabbit_types:info_key()]). -spec(info/1 :: (pid()) -> [rabbit_types:info()]). -spec(info/2 :: (pid(), [rabbit_types:info_key()]) -> [rabbit_types:info()]). @@ -171,9 +171,9 @@ -spec(server_properties/0 :: () -> rabbit_framing:amqp_table()). %% These specs only exists to add no_return() to keep dialyzer happy --spec(init/2 :: (pid(), pid()) -> no_return()). --spec(start_connection/5 :: - (pid(), pid(), any(), rabbit_networking:socket(), +-spec(init/3 :: (pid(), pid(), pid()) -> no_return()). +-spec(start_connection/6 :: + (pid(), pid(), pid(), any(), rabbit_networking:socket(), fun ((rabbit_networking:socket()) -> rabbit_types:ok_or_error2( rabbit_networking:socket(), any()))) -> no_return()). @@ -182,17 +182,18 @@ %%-------------------------------------------------------------------------- -start_link(ChannelSupSupPid) -> - {ok, proc_lib:spawn_link(?MODULE, init, [self(), ChannelSupSupPid])}. +start_link(ChannelSupSupPid, Collector) -> + {ok, proc_lib:spawn_link(?MODULE, init, [self(), ChannelSupSupPid, Collector])}. shutdown(Pid, Explanation) -> gen_server:call(Pid, {shutdown, Explanation}, infinity). -init(Parent, ChannelSupSupPid) -> +init(Parent, ChannelSupSupPid, Collector) -> Deb = sys:debug_options([]), receive {go, Sock, SockTransform} -> - start_connection(Parent, ChannelSupSupPid, Deb, Sock, SockTransform) + start_connection( + Parent, ChannelSupSupPid, Collector, Deb, Sock, SockTransform) end. system_continue(Parent, Deb, State) -> @@ -271,7 +272,8 @@ socket_op(Sock, Fun) -> exit(normal) end. -start_connection(Parent, ChannelSupSupPid, Deb, Sock, SockTransform) -> +start_connection(Parent, ChannelSupSupPid, Collector, Deb, Sock, + SockTransform) -> process_flag(trap_exit, true), {PeerAddress, PeerPort} = socket_op(Sock, fun rabbit_net:peername/1), PeerAddressS = inet_parse:ntoa(PeerAddress), @@ -281,7 +283,6 @@ start_connection(Parent, ChannelSupSupPid, Deb, Sock, SockTransform) -> erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout), ProfilingValue = setup_profiling(), - [Collector] = supervisor2:find_child(Parent, collector), try mainloop(Deb, switch_callback( #v1{parent = Parent, diff --git a/src/supervisor2.erl b/src/supervisor2.erl index 6bc5b1e7..682faba1 100644 --- a/src/supervisor2.erl +++ b/src/supervisor2.erl @@ -31,6 +31,14 @@ %% the MaxT and MaxR parameters to permit the child to be %% restarted. This may require waiting for longer than Delay. %% +%% 4) Added an 'intrinsic' restart type. This type means that the +%% child should never be restarted (same as temporary) but whenever +%% such a child exits, it will cause the entire supervisor to exit +%% (i.e. the child's existence is intrinsic to the supervisor's +%% existence). Because such children are never restarted, the +%% supervisor's restart strategy, MaxT and MaxR have no bearing on +%% such children. +%% %% All modifications are (C) 2010 Rabbit Technologies Ltd. %% %% %CopyrightBegin% @@ -525,6 +533,14 @@ restart_child(Pid, Reason, State) -> {ok, State} end. +do_restart(intrinsic, Reason, Child, State) -> + case Reason of + normal -> ok; + shutdown -> ok; + {shutdown, _Term} -> ok; + _ -> report_error(child_terminated, Reason, Child, State#state.name) + end, + {shutdown, remove_child(Child, State)}; do_restart({RestartType, Delay}, Reason, Child, State) -> case restart1(Child, State) of {ok, NState} -> @@ -838,7 +854,7 @@ supname(N,_) -> N. %%% where Name is an atom %%% Func is {Mod, Fun, Args} == {atom, atom, list} %%% RestartType is permanent | temporary | transient | -%%% {permanent, Delay} | +%%% intrinsic | {permanent, Delay} | %%% {transient, Delay} where Delay >= 0 %%% Shutdown = integer() | infinity | brutal_kill %%% ChildType = supervisor | worker @@ -885,6 +901,7 @@ validFunc({M, F, A}) when is_atom(M), is_list(A) -> true; validFunc(Func) -> throw({invalid_mfa, Func}). +validRestartType(intrinsic) -> true; validRestartType(permanent) -> true; validRestartType(temporary) -> true; validRestartType(transient) -> true; -- cgit v1.2.1 From 2222b5fd4e118ec335d29d87d634bf37f3f22610 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Mon, 9 Aug 2010 17:09:17 +0100 Subject: Flush the writer much earlier on, allowing the actual termination to be less risky --- src/rabbit_channel.erl | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index f8d4f307..e93b6665 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -202,7 +202,8 @@ handle_cast({method, Method, Content}, State) -> noreply(NewState); {noreply, NewState} -> noreply(NewState); - stop -> + flush_and_stop -> + rabbit_writer:flush(State#ch.writer_pid), {stop, normal, State#ch{state = terminating}} catch exit:Reason = #amqp_error{} -> @@ -409,7 +410,7 @@ handle_method(_Method, _, #ch{state = starting}) -> handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) -> ok = rollback_and_notify(State), ok = rabbit_writer:send_command(WriterPid, #'channel.close_ok'{}), - stop; + flush_and_stop; handle_method(#'access.request'{},_, State) -> {reply, #'access.request_ok'{ticket = 1}, State}; @@ -1099,10 +1100,9 @@ internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag, false -> rabbit_writer:send_command(WriterPid, M, Content) end. -terminate(#ch{writer_pid = WriterPid, limiter_pid = LimiterPid}) -> +terminate(#ch{limiter_pid = LimiterPid}) -> pg_local:leave(rabbit_channels, self()), rabbit_event:notify(channel_closed, [{pid, self()}]), - rabbit_writer:flush(WriterPid), rabbit_limiter:shutdown(LimiterPid). infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. -- cgit v1.2.1 From 1bd3c7adc0b94c5d9a8bceb5166aceba1fa52d70 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Mon, 9 Aug 2010 17:10:19 +0100 Subject: Arbitrarily decide to make the channel_sup_sup the very first thing that gets started and thus the last that gets stopped --- src/rabbit_connection_sup.erl | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl index f097f80a..5993044c 100644 --- a/src/rabbit_connection_sup.erl +++ b/src/rabbit_connection_sup.erl @@ -41,16 +41,16 @@ start_link() -> {ok, SupPid} = supervisor2:start_link(?MODULE, []), - {ok, Collector} = - supervisor2:start_child( - SupPid, - {collector, {rabbit_queue_collector, start_link, []}, - intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}), {ok, ChannelSupSupPid} = supervisor2:start_child( SupPid, {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []}, intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}), + {ok, Collector} = + supervisor2:start_child( + SupPid, + {collector, {rabbit_queue_collector, start_link, []}, + intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}), {ok, _ReaderPid} = supervisor2:start_child( SupPid, -- cgit v1.2.1 From bb0832b6e8fb747b56c7e4fad91b3e14909dab38 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Mon, 9 Aug 2010 17:14:10 +0100 Subject: Given changes to intrinsic, if we do ever try to restart (abnormal exit), we should hit the max restart intensity immediately --- src/rabbit_channel_sup.erl | 2 +- src/rabbit_connection_sup.erl | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index e4dcbae1..9e68b497 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -80,4 +80,4 @@ start_link(Protocol, Sock, Channel, FrameMax, ReaderPid, Username, VHost, %%---------------------------------------------------------------------------- init([]) -> - {ok, {{one_for_all, 10, 10}, []}}. + {ok, {{one_for_all, 0, 1}, []}}. diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl index 5993044c..aee8d987 100644 --- a/src/rabbit_connection_sup.erl +++ b/src/rabbit_connection_sup.erl @@ -59,7 +59,7 @@ start_link() -> {ok, SupPid}. init([]) -> - {ok, {{one_for_all, 10, 10}, []}}. + {ok, {{one_for_all, 0, 1}, []}}. reader(Pid) -> hd(supervisor2:find_child(Pid, reader)). -- cgit v1.2.1 From d08175070c9e38e9d9993ea8e1abf046c4b95732 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Mon, 9 Aug 2010 18:45:37 +0100 Subject: Fixes all over. Still more to do, though mainly API fixes. Functionality seems to be correct. --- src/rabbit_channel.erl | 15 +++--------- src/rabbit_channel_sup.erl | 4 ++-- src/rabbit_framing_channel.erl | 5 ++-- src/rabbit_heartbeat.erl | 54 +++++++++++++++++++----------------------- src/rabbit_limiter.erl | 9 +------ src/rabbit_reader.erl | 14 ++++++----- src/rabbit_writer.erl | 34 ++++++++++++++++---------- 7 files changed, 63 insertions(+), 72 deletions(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index e93b6665..c45b2cc7 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -153,8 +153,6 @@ flush(Pid) -> init([Channel, ParentPid, ReaderPid, WriterPid, Username, VHost, CollectorPid]) -> - process_flag(trap_exit, true), - link(WriterPid), ok = pg_local:join(rabbit_channels, self()), State = #ch{state = starting, channel = Channel, @@ -243,12 +241,6 @@ handle_cast(emit_stats, State) -> internal_emit_stats(State), {noreply, State}. -handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}}, - State = #ch{writer_pid = WriterPid}) -> - State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason}, - {stop, normal, State}; -handle_info({'EXIT', _Pid, Reason}, State) -> - {stop, Reason, State}; handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) -> erase_queue_stats(QPid), {noreply, queue_blocked(QPid, State)}. @@ -1025,7 +1017,7 @@ fold_per_queue(F, Acc0, UAQ) -> start_limiter(State = #ch{unacked_message_q = UAMQ, parent_pid = ParentPid}) -> Me = self(), {ok, LPid} = - supervisor:start_child( + supervisor2:start_child( ParentPid, {limiter, {rabbit_limiter, start_link, [Me, queue:len(UAMQ)]}, transient, ?MAX_WAIT, worker, [rabbit_limiter]}), @@ -1100,10 +1092,9 @@ internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag, false -> rabbit_writer:send_command(WriterPid, M, Content) end. -terminate(#ch{limiter_pid = LimiterPid}) -> +terminate(_State) -> pg_local:leave(rabbit_channels, self()), - rabbit_event:notify(channel_closed, [{pid, self()}]), - rabbit_limiter:shutdown(LimiterPid). + rabbit_event:notify(channel_closed, [{pid, self()}]). infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index 9e68b497..9011db73 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -60,7 +60,7 @@ start_link(Protocol, Sock, Channel, FrameMax, ReaderPid, Username, VHost, supervisor2:start_child( SupPid, {writer, {rabbit_writer, start_link, - [Sock, Channel, FrameMax, Protocol]}, + [Sock, Channel, FrameMax, Protocol, ReaderPid]}, intrinsic, ?MAX_WAIT, worker, [rabbit_writer]}), {ok, ChannelPid} = supervisor2:start_child( @@ -73,7 +73,7 @@ start_link(Protocol, Sock, Channel, FrameMax, ReaderPid, Username, VHost, supervisor2:start_child( SupPid, {framing_channel, {rabbit_framing_channel, start_link, - [ChannelPid, Protocol]}, + [ReaderPid, ChannelPid, Protocol]}, intrinsic, ?MAX_WAIT, worker, [rabbit_framing_channel]}), {ok, SupPid, FramingChannelPid}. diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl index 08aaafe1..cb53185f 100644 --- a/src/rabbit_framing_channel.erl +++ b/src/rabbit_framing_channel.erl @@ -32,15 +32,14 @@ -module(rabbit_framing_channel). -include("rabbit.hrl"). --export([start_link/2, process/2, shutdown/1]). +-export([start_link/3, process/2, shutdown/1]). %% internal -export([mainloop/3]). %%-------------------------------------------------------------------- -start_link(ChannelPid, Protocol) -> - Parent = self(), +start_link(Parent, ChannelPid, Protocol) -> {ok, proc_lib:spawn_link( fun () -> mainloop(Parent, ChannelPid, Protocol) end)}. diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl index b277de70..264dbb68 100644 --- a/src/rabbit_heartbeat.erl +++ b/src/rabbit_heartbeat.erl @@ -32,8 +32,8 @@ -module(rabbit_heartbeat). -export([start_heartbeat/3, pause_monitor/1, resume_monitor/1, - start_heartbeat_sender/2, - start_heartbeat_receiver/2]). + start_heartbeat_sender/3, + start_heartbeat_receiver/3]). -include("rabbit.hrl"). @@ -45,9 +45,11 @@ -spec(start_heartbeat/3 :: (pid(), rabbit_net:socket(), non_neg_integer()) -> pids()). --spec(start_heartbeat_sender/2 :: (rabbit_net:socket(), non_neg_integer()) -> +-spec(start_heartbeat_sender/3 :: + (pid(), rabbit_net:socket(), non_neg_integer()) -> rabbit_types:ok(pid())). --spec(start_heartbeat_receiver/2 :: (rabbit_net:socket(), non_neg_integer()) -> +-spec(start_heartbeat_receiver/3 :: + (pid(), rabbit_net:socket(), non_neg_integer()) -> rabbit_types:ok(pid())). -spec(pause_monitor/1 :: (pids()) -> 'ok'). @@ -60,41 +62,41 @@ start_heartbeat(_Sup, _Sock, 0) -> none; start_heartbeat(Sup, Sock, TimeoutSec) -> + Parent = self(), {ok, Sender} = - supervisor:start_child( + supervisor2:start_child( Sup, {heartbeat_sender, - {?MODULE, start_heartbeat_sender, [Sock, TimeoutSec]}, - permanent, ?MAX_WAIT, worker, [rabbit_heartbeat]}), + {?MODULE, start_heartbeat_sender, [Parent, Sock, TimeoutSec]}, + intrinsic, ?MAX_WAIT, worker, [rabbit_heartbeat]}), {ok, Receiver} = - supervisor:start_child( + supervisor2:start_child( Sup, {heartbeat_receiver, - {?MODULE, start_heartbeat_receiver, [Sock, TimeoutSec]}, - permanent, ?MAX_WAIT, worker, [rabbit_heartbeat]}), + {?MODULE, start_heartbeat_receiver, [Parent, Sock, TimeoutSec]}, + intrinsic, ?MAX_WAIT, worker, [rabbit_heartbeat]}), {Sender, Receiver}. -start_heartbeat_sender(Sock, TimeoutSec) -> +start_heartbeat_sender(_Parent, Sock, TimeoutSec) -> %% the 'div 2' is there so that we don't end up waiting for nearly %% 2 * TimeoutSec before sending a heartbeat in the boundary case %% where the last message was sent just after a heartbeat. - Parent = self(), {ok, proc_lib:spawn_link( - fun () -> heartbeater({Sock, TimeoutSec * 1000 div 2, - send_oct, 0, + fun () -> heartbeater({Sock, TimeoutSec * 1000 div 2, send_oct, 0, fun () -> catch rabbit_net:send(Sock, rabbit_binary_generator:build_heartbeat_frame()), continue - end}, Parent) + end}, {0, 0}) end)}. -start_heartbeat_receiver(Sock, TimeoutSec) -> +start_heartbeat_receiver(Parent, Sock, TimeoutSec) -> %% we check for incoming data every interval, and time out after %% two checks with no change. As a result we will time out between %% 2 and 3 intervals after the last data has been received. - Parent = self(), {ok, proc_lib:spawn_link( - fun () -> heartbeater({Sock, TimeoutSec * 1000, - recv_oct, 1, - fun () -> exit(timeout) end}, Parent) end)}. + fun () -> heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1, + fun () -> + Parent ! timeout, + stop + end}, {0, 0}) end)}. pause_monitor(none) -> ok; @@ -110,19 +112,12 @@ resume_monitor({_Sender, Receiver}) -> %%---------------------------------------------------------------------------- -heartbeater(Params, Parent) -> - heartbeater(Params, erlang:monitor(process, Parent), {0, 0}). - heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params, - MonitorRef, {StatVal, SameCount}) -> - Recurse = fun (V) -> heartbeater(Params, MonitorRef, V) end, + {StatVal, SameCount}) -> + Recurse = fun (V) -> heartbeater(Params, V) end, receive - {'DOWN', MonitorRef, process, _Object, _Info} -> - ok; pause -> receive - {'DOWN', MonitorRef, process, _Object, _Info} -> - ok; resume -> Recurse({0, 0}); Other -> @@ -139,6 +134,7 @@ heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params, Recurse({NewStatVal, SameCount + 1}); true -> case Handler() of + stop -> ok; continue -> Recurse({NewStatVal, 0}) end end; diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 813ccc75..fccf5f34 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -35,7 +35,7 @@ -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). --export([start_link/2, shutdown/1]). +-export([start_link/2]). -export([limit/2, can_send/3, ack/2, register/2, unregister/2]). -export([get_limit/1, block/1, unblock/1]). @@ -46,7 +46,6 @@ -type(maybe_pid() :: pid() | 'undefined'). -spec(start_link/2 :: (pid(), non_neg_integer()) -> rabbit_types:ok(pid())). --spec(shutdown/1 :: (maybe_pid()) -> 'ok'). -spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok' | 'stopped'). -spec(can_send/3 :: (maybe_pid(), pid(), boolean()) -> boolean()). -spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). @@ -76,12 +75,6 @@ start_link(ChPid, UnackedMsgCount) -> gen_server2:start_link(?MODULE, [ChPid, UnackedMsgCount], []). -shutdown(undefined) -> - ok; -shutdown(LimiterPid) -> - true = unlink(LimiterPid), - gen_server2:cast(LimiterPid, shutdown). - limit(undefined, 0) -> ok; limit(LimiterPid, PrefetchCount) -> diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 6e336c86..e0f4d6ec 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -372,6 +372,8 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) -> true -> throw({handshake_timeout, State#v1.callback}) end; + timeout -> + throw({timeout, State#v1.connection_state}); {'$gen_call', From, {shutdown, Explanation}} -> {ForceTermination, NewState} = terminate(Explanation, State), gen_server:reply(From, ok), @@ -460,9 +462,9 @@ handle_channel_exit(Channel, Reason, State) -> handle_dependent_exit(Pid, Reason, State) -> case (case Reason of + normal -> controlled; shutdown -> controlled; {shutdown, _Term} -> controlled; - normal -> controlled; _ -> uncontrolled end) of controlled -> @@ -878,14 +880,14 @@ send_to_new_channel(Channel, AnalyzedFrame, State) -> frame_max = FrameMax, user = #user{username = Username}, vhost = VHost}} = State, - {ok, _ChanSup, FrChPid} = + {ok, _ChanSup, ChPid} = rabbit_channel_sup_sup:start_channel( ChanSupSup, [Protocol, Sock, Channel, FrameMax, self(), Username, VHost, Collector]), - link(FrChPid), - put({channel, Channel}, {chpid, FrChPid}), - put({chpid, FrChPid}, {channel, Channel}), - ok = rabbit_framing_channel:process(FrChPid, AnalyzedFrame). + link(ChPid), + put({channel, Channel}, {chpid, ChPid}), + put({chpid, ChPid}, {channel, Channel}), + ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame). log_channel_error(ConnectionState, Channel, Reason) -> rabbit_log:error("connection ~p (~p), channel ~p - error:~n~p~n", diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index 483b46f7..faeec406 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -33,7 +33,7 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([start/4, start_link/4, flush/1, mainloop/1]). +-export([start/5, start_link/5, flush/1, mainloop/2, mainloop1/2]). -export([send_command/2, send_command/3, send_command_and_signal_back/3, send_command_and_signal_back/4, send_command_and_notify/5]). -export([internal_send_command/4, internal_send_command/6]). @@ -48,13 +48,13 @@ -ifdef(use_specs). --spec(start/4 :: +-spec(start/5 :: (rabbit_net:socket(), rabbit_channel:channel_number(), - non_neg_integer(), rabbit_types:protocol()) + non_neg_integer(), rabbit_types:protocol(), pid()) -> rabbit_types:ok(pid())). --spec(start_link/4 :: +-spec(start_link/5 :: (rabbit_net:socket(), rabbit_channel:channel_number(), - non_neg_integer(), rabbit_types:protocol()) + non_neg_integer(), rabbit_types:protocol(), pid()) -> rabbit_types:ok(pid())). -spec(send_command/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). @@ -85,25 +85,35 @@ %%---------------------------------------------------------------------------- -start(Sock, Channel, FrameMax, Protocol) -> +start(Sock, Channel, FrameMax, Protocol, ReaderPid) -> {ok, - proc_lib:spawn(?MODULE, mainloop, [#wstate{sock = Sock, + proc_lib:spawn(?MODULE, mainloop, [ReaderPid, + #wstate{sock = Sock, channel = Channel, frame_max = FrameMax, protocol = Protocol}])}. -start_link(Sock, Channel, FrameMax, Protocol) -> +start_link(Sock, Channel, FrameMax, Protocol, ReaderPid) -> {ok, - proc_lib:spawn_link(?MODULE, mainloop, [#wstate{sock = Sock, + proc_lib:spawn_link(?MODULE, mainloop, [ReaderPid, + #wstate{sock = Sock, channel = Channel, frame_max = FrameMax, protocol = Protocol}])}. -mainloop(State) -> +mainloop(ReaderPid, State) -> + try + mainloop1(ReaderPid, State) + catch + exit:Error -> ReaderPid ! {channel_exit, #wstate.channel, Error} + end, + done. + +mainloop1(ReaderPid, State) -> receive - Message -> ?MODULE:mainloop(handle_message(Message, State)) + Message -> ?MODULE:mainloop1(ReaderPid, handle_message(Message, State)) after ?HIBERNATE_AFTER -> - erlang:hibernate(?MODULE, mainloop, [State]) + erlang:hibernate(?MODULE, mainloop, [ReaderPid, State]) end. handle_message({send_command, MethodRecord}, -- cgit v1.2.1 From 638000f10c8eae60982d8f4184308884a5ed9216 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Mon, 9 Aug 2010 18:59:24 +0100 Subject: When a supervisor is killing off its children, if its children have already exited normally, don't class it as an error - after all, the supervisor traps_exits, so the relevant exit signals will likely already be in the supervisor's mailbox --- src/supervisor2.erl | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/src/supervisor2.erl b/src/supervisor2.erl index 773d67d3..5cb2c301 100644 --- a/src/supervisor2.erl +++ b/src/supervisor2.erl @@ -695,10 +695,13 @@ shutdown(Pid, Time) -> ok -> exit(Pid, shutdown), %% Try to shutdown gracefully receive - {'DOWN', _MRef, process, Pid, shutdown} -> - ok; - {'DOWN', _MRef, process, Pid, OtherReason} -> - {error, OtherReason} + {'DOWN', _MRef, process, Pid, Reason} -> + case Reason of + normal -> ok; + shutdown -> ok; + noproc -> ok; + _ -> {error, Reason} + end after Time -> exit(Pid, kill), %% Force termination. receive @@ -726,7 +729,12 @@ monitor_child(Pid) -> {'EXIT', Pid, Reason} -> receive {'DOWN', _, process, Pid, _} -> - {error, Reason} + case Reason of + normal -> ok; + shutdown -> ok; + noproc -> ok; + _ -> {error, Reason} + end end after 0 -> %% If a naughty child did unlink and the child dies before -- cgit v1.2.1 From 601f79c7e42ee9755999a4bc4f54b6ec59dc4598 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Tue, 10 Aug 2010 14:02:41 +0100 Subject: Abstract out the limiter creation. The abstraction made ensures the channel never directly calls supervisor(2)?:.*, nor does it have any knowledge of the channel_sup. --- src/rabbit_channel.erl | 33 +++++++++++++++------------------ src/rabbit_channel_sup.erl | 12 +++++++++++- src/rabbit_tests.erl | 6 ++++-- 3 files changed, 30 insertions(+), 21 deletions(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index c45b2cc7..050a5425 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -35,7 +35,7 @@ -behaviour(gen_server2). --export([start_link/6, do/2, do/3, shutdown/1]). +-export([start_link/7, do/2, do/3, shutdown/1]). -export([send_command/2, deliver/4, flushed/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). -export([emit_stats/1, flush/1]). @@ -43,8 +43,8 @@ -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1]). --record(ch, {state, channel, parent_pid, reader_pid, writer_pid, limiter_pid, - transaction_id, tx_participants, next_tag, +-record(ch, {state, channel, reader_pid, writer_pid, limiter_pid, + start_limiter_fun, transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, username, virtual_host, most_recently_declared_queue, consumer_mapping, blocking, queue_collector_pid, stats_timer}). @@ -76,9 +76,10 @@ -type(channel_number() :: non_neg_integer()). --spec(start_link/6 :: +-spec(start_link/7 :: (channel_number(), pid(), pid(), rabbit_access_control:username(), - rabbit_types:vhost(), pid()) -> + rabbit_types:vhost(), pid(), + fun ((non_neg_integer()) -> rabbit_types:ok(pid()))) -> 'ignore' | rabbit_types:ok_or_error2(pid(), any())). -spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(), @@ -101,9 +102,10 @@ %%---------------------------------------------------------------------------- -start_link(Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid) -> - gen_server2:start_link(?MODULE, [Channel, self(), ReaderPid, WriterPid, - Username, VHost, CollectorPid], []). +start_link(Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid, + StartLimiterFun) -> + gen_server2:start_link(?MODULE, [Channel, ReaderPid, WriterPid, Username, + VHost, CollectorPid, StartLimiterFun], []). do(Pid, Method) -> do(Pid, Method, none). @@ -151,15 +153,15 @@ flush(Pid) -> %%--------------------------------------------------------------------------- -init([Channel, ParentPid, ReaderPid, WriterPid, Username, VHost, - CollectorPid]) -> +init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid, + StartLimiterFun]) -> ok = pg_local:join(rabbit_channels, self()), State = #ch{state = starting, channel = Channel, - parent_pid = ParentPid, reader_pid = ReaderPid, writer_pid = WriterPid, limiter_pid = undefined, + start_limiter_fun = StartLimiterFun, transaction_id = none, tx_participants = sets:new(), next_tag = 1, @@ -1014,13 +1016,8 @@ fold_per_queue(F, Acc0, UAQ) -> dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end, Acc0, D). -start_limiter(State = #ch{unacked_message_q = UAMQ, parent_pid = ParentPid}) -> - Me = self(), - {ok, LPid} = - supervisor2:start_child( - ParentPid, - {limiter, {rabbit_limiter, start_link, [Me, queue:len(UAMQ)]}, - transient, ?MAX_WAIT, worker, [rabbit_limiter]}), +start_limiter(State = #ch{unacked_message_q = UAMQ, start_limiter_fun = SLF}) -> + {ok, LPid} = SLF(queue:len(UAMQ)), ok = limit_queues(LPid, State), LPid. diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index 9011db73..23058bfe 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -67,7 +67,7 @@ start_link(Protocol, Sock, Channel, FrameMax, ReaderPid, Username, VHost, SupPid, {channel, {rabbit_channel, start_link, [Channel, ReaderPid, WriterPid, Username, VHost, - Collector]}, + Collector, start_limiter_fun(SupPid)]}, intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), {ok, FramingChannelPid} = supervisor2:start_child( @@ -81,3 +81,13 @@ start_link(Protocol, Sock, Channel, FrameMax, ReaderPid, Username, VHost, init([]) -> {ok, {{one_for_all, 0, 1}, []}}. + +start_limiter_fun(SupPid) -> + fun (UnackedCount) -> + Me = self(), + {ok, _Pid} = + supervisor2:start_child( + SupPid, + {limiter, {rabbit_limiter, start_link, [Me, UnackedCount]}, + transient, ?MAX_WAIT, worker, [rabbit_limiter]}) + end. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 55897679..f861cedd 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1020,7 +1020,8 @@ test_server_status() -> %% create a few things so there is some useful information to list Writer = spawn(fun () -> receive shutdown -> ok end end), {ok, Ch} = rabbit_channel:start_link(1, self(), Writer, - <<"user">>, <<"/">>, self()), + <<"user">>, <<"/">>, self(), + fun (_) -> {ok, self()} end), [Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>], {new, Queue = #amqqueue{}} <- [rabbit_amqqueue:declare( @@ -1117,7 +1118,8 @@ test_spawn(Receiver) -> Me = self(), Writer = spawn(fun () -> Receiver(Me) end), {ok, Ch} = rabbit_channel:start_link(1, Me, Writer, - <<"guest">>, <<"/">>, self()), + <<"guest">>, <<"/">>, self(), + fun (_) -> {ok, self()} end), ok = rabbit_channel:do(Ch, #'channel.open'{}), receive #'channel.open_ok'{} -> ok after 1000 -> throw(failed_to_receive_channel_open_ok) -- cgit v1.2.1 From 6fe45dcc722c13b093c90c934a4743adbfa17d9e Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Tue, 10 Aug 2010 15:05:21 +0100 Subject: Abstract the heartbeaters in the same way - the reader now has no references to supervisor(2)?:.* nor does heartbeater. --- src/rabbit_connection_sup.erl | 23 +++++++++++++++++++++- src/rabbit_heartbeat.erl | 31 +++++++---------------------- src/rabbit_reader.erl | 46 +++++++++++++++++++++++++------------------ 3 files changed, 56 insertions(+), 44 deletions(-) diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl index aee8d987..354540c1 100644 --- a/src/rabbit_connection_sup.erl +++ b/src/rabbit_connection_sup.erl @@ -54,7 +54,8 @@ start_link() -> {ok, _ReaderPid} = supervisor2:start_child( SupPid, - {reader, {rabbit_reader, start_link, [ChannelSupSupPid, Collector]}, + {reader, {rabbit_reader, start_link, + [ChannelSupSupPid, Collector, start_heartbeat_fun(SupPid)]}, intrinsic, ?MAX_WAIT, worker, [rabbit_reader]}), {ok, SupPid}. @@ -63,3 +64,23 @@ init([]) -> reader(Pid) -> hd(supervisor2:find_child(Pid, reader)). + +start_heartbeat_fun(SupPid) -> + fun (_Sock, 0) -> + none; + (Sock, TimeoutSec) -> + Parent = self(), + {ok, Sender} = + supervisor2:start_child( + SupPid, {heartbeat_sender, + {rabbit_heartbeat, start_heartbeat_sender, + [Parent, Sock, TimeoutSec]}, + intrinsic, ?MAX_WAIT, worker, [rabbit_heartbeat]}), + {ok, Receiver} = + supervisor2:start_child( + SupPid, {heartbeat_receiver, + {rabbit_heartbeat, start_heartbeat_receiver, + [Parent, Sock, TimeoutSec]}, + intrinsic, ?MAX_WAIT, worker, [rabbit_heartbeat]}), + {Sender, Receiver} + end. diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl index 264dbb68..61ef5efb 100644 --- a/src/rabbit_heartbeat.erl +++ b/src/rabbit_heartbeat.erl @@ -31,9 +31,8 @@ -module(rabbit_heartbeat). --export([start_heartbeat/3, pause_monitor/1, resume_monitor/1, - start_heartbeat_sender/3, - start_heartbeat_receiver/3]). +-export([start_heartbeat_sender/3, start_heartbeat_receiver/3, + pause_monitor/1, resume_monitor/1]). -include("rabbit.hrl"). @@ -41,10 +40,10 @@ -ifdef(use_specs). --type(pids() :: rabbit_types:maybe({pid(), pid()})). +-export_type([heartbeaters/0]). + +-type(heartbeaters() :: rabbit_types:maybe({pid(), pid()})). --spec(start_heartbeat/3 :: (pid(), rabbit_net:socket(), non_neg_integer()) -> - pids()). -spec(start_heartbeat_sender/3 :: (pid(), rabbit_net:socket(), non_neg_integer()) -> rabbit_types:ok(pid())). @@ -52,29 +51,13 @@ (pid(), rabbit_net:socket(), non_neg_integer()) -> rabbit_types:ok(pid())). --spec(pause_monitor/1 :: (pids()) -> 'ok'). --spec(resume_monitor/1 :: (pids()) -> 'ok'). +-spec(pause_monitor/1 :: (heartbeaters()) -> 'ok'). +-spec(resume_monitor/1 :: (heartbeaters()) -> 'ok'). -endif. %%---------------------------------------------------------------------------- -start_heartbeat(_Sup, _Sock, 0) -> - none; -start_heartbeat(Sup, Sock, TimeoutSec) -> - Parent = self(), - {ok, Sender} = - supervisor2:start_child( - Sup, {heartbeat_sender, - {?MODULE, start_heartbeat_sender, [Parent, Sock, TimeoutSec]}, - intrinsic, ?MAX_WAIT, worker, [rabbit_heartbeat]}), - {ok, Receiver} = - supervisor2:start_child( - Sup, {heartbeat_receiver, - {?MODULE, start_heartbeat_receiver, [Parent, Sock, TimeoutSec]}, - intrinsic, ?MAX_WAIT, worker, [rabbit_heartbeat]}), - {Sender, Receiver}. - start_heartbeat_sender(_Parent, Sock, TimeoutSec) -> %% the 'div 2' is there so that we don't end up waiting for nearly %% 2 * TimeoutSec before sending a heartbeat in the boundary case diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index e0f4d6ec..69f6773f 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -33,11 +33,11 @@ -include("rabbit_framing.hrl"). -include("rabbit.hrl"). --export([start_link/2, info_keys/0, info/1, info/2, shutdown/2]). +-export([start_link/3, info_keys/0, info/1, info/2, shutdown/2]). -export([system_continue/3, system_terminate/4, system_code_change/4]). --export([init/3, mainloop/2]). +-export([init/4, mainloop/2]). -export([conserve_memory/2, server_properties/0]). @@ -61,7 +61,7 @@ -record(v1, {parent, sock, connection, callback, recv_length, recv_ref, connection_state, queue_collector, heartbeater, stats_timer, - channel_sup_sup_pid}). + channel_sup_sup_pid, start_heartbeat_fun}). -define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt, send_pend, state, channels]). @@ -161,7 +161,12 @@ -ifdef(use_specs). --spec(start_link/2 :: (pid(), pid()) -> rabbit_types:ok(pid())). +-type(start_heartbeat_fun() :: + fun ((rabbit_networking:socket(), non_neg_integer()) -> + rabbit_heartbeat:heartbeaters())). + +-spec(start_link/3 :: (pid(), pid(), start_heartbeat_fun()) -> + rabbit_types:ok(pid())). -spec(info_keys/0 :: () -> [rabbit_types:info_key()]). -spec(info/1 :: (pid()) -> [rabbit_types:info()]). -spec(info/2 :: (pid(), [rabbit_types:info_key()]) -> [rabbit_types:info()]). @@ -171,9 +176,10 @@ -spec(server_properties/0 :: () -> rabbit_framing:amqp_table()). %% These specs only exists to add no_return() to keep dialyzer happy --spec(init/3 :: (pid(), pid(), pid()) -> no_return()). --spec(start_connection/6 :: - (pid(), pid(), pid(), any(), rabbit_networking:socket(), +-spec(init/4 :: (pid(), pid(), pid(), start_heartbeat_fun()) -> no_return()). +-spec(start_connection/7 :: + (pid(), pid(), pid(), start_heartbeat_fun(), any(), + rabbit_networking:socket(), fun ((rabbit_networking:socket()) -> rabbit_types:ok_or_error2( rabbit_networking:socket(), any()))) -> no_return()). @@ -182,18 +188,20 @@ %%-------------------------------------------------------------------------- -start_link(ChannelSupSupPid, Collector) -> - {ok, proc_lib:spawn_link(?MODULE, init, [self(), ChannelSupSupPid, Collector])}. +start_link(ChannelSupSupPid, Collector, StartHeartbeatFun) -> + {ok, proc_lib:spawn_link(?MODULE, init, [self(), ChannelSupSupPid, + Collector, StartHeartbeatFun])}. shutdown(Pid, Explanation) -> gen_server:call(Pid, {shutdown, Explanation}, infinity). -init(Parent, ChannelSupSupPid, Collector) -> +init(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun) -> Deb = sys:debug_options([]), receive {go, Sock, SockTransform} -> start_connection( - Parent, ChannelSupSupPid, Collector, Deb, Sock, SockTransform) + Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, Sock, + SockTransform) end. system_continue(Parent, Deb, State) -> @@ -272,8 +280,8 @@ socket_op(Sock, Fun) -> exit(normal) end. -start_connection(Parent, ChannelSupSupPid, Collector, Deb, Sock, - SockTransform) -> +start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, + Sock, SockTransform) -> process_flag(trap_exit, true), {PeerAddress, PeerPort} = socket_op(Sock, fun rabbit_net:peername/1), PeerAddressS = inet_parse:ntoa(PeerAddress), @@ -302,7 +310,8 @@ start_connection(Parent, ChannelSupSupPid, Collector, Deb, Sock, heartbeater = none, stats_timer = rabbit_event:init_stats_timer(), - channel_sup_sup_pid = ChannelSupSupPid + channel_sup_sup_pid = ChannelSupSupPid, + start_heartbeat_fun = StartHeartbeatFun }, handshake, 8)) catch @@ -754,10 +763,10 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism, client_properties = ClientProperties}}; handle_method0(#'connection.tune_ok'{frame_max = FrameMax, heartbeat = ClientHeartbeat}, - State = #v1{parent = Parent, - connection_state = tuning, + State = #v1{connection_state = tuning, connection = Connection, - sock = Sock}) -> + sock = Sock, + start_heartbeat_fun = SHF}) -> if (FrameMax /= 0) and (FrameMax < ?FRAME_MIN_SIZE) -> rabbit_misc:protocol_error( not_allowed, "frame_max=~w < ~w min size", @@ -767,8 +776,7 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, not_allowed, "frame_max=~w > ~w max size", [FrameMax, ?FRAME_MAX]); true -> - Heartbeater = - rabbit_heartbeat:start_heartbeat(Parent, Sock, ClientHeartbeat), + Heartbeater = SHF(Sock, ClientHeartbeat), State#v1{connection_state = opening, connection = Connection#connection{ timeout_sec = ClientHeartbeat, -- cgit v1.2.1 From ca84b554b7a4ec4dadd951c0374829fa13fed5cb Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Tue, 10 Aug 2010 15:11:31 +0100 Subject: I erroneously removed trap_exit on channel yesterday. It needs to be on because otherwise the gen_server won't call terminate when it's sent a shutdown signal --- src/rabbit_channel.erl | 1 + 1 file changed, 1 insertion(+) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 050a5425..c68bb77d 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -155,6 +155,7 @@ flush(Pid) -> init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid, StartLimiterFun]) -> + process_flag(trap_exit, true), ok = pg_local:join(rabbit_channels, self()), State = #ch{state = starting, channel = Channel, -- cgit v1.2.1 From 5301419aade68db6925f44d10b50bb770fcbc207 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Tue, 10 Aug 2010 15:27:58 +0100 Subject: Remove dead code - collector is never explicitly shutdown: the supervisor will kill it off appropriately --- src/rabbit_queue_collector.erl | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl index 9257ec82..cb8b7b21 100644 --- a/src/rabbit_queue_collector.erl +++ b/src/rabbit_queue_collector.erl @@ -33,7 +33,7 @@ -behaviour(gen_server). --export([start_link/0, register/2, delete_all/1, shutdown/1]). +-export([start_link/0, register/2, delete_all/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -49,7 +49,6 @@ -spec(start_link/0 :: () -> rabbit_types:ok(pid())). -spec(register/2 :: (pid(), rabbit_types:amqqueue()) -> 'ok'). -spec(delete_all/1 :: (pid()) -> 'ok'). --spec(shutdown/1 :: (pid()) -> 'ok'). -endif. @@ -64,9 +63,6 @@ register(CollectorPid, Q) -> delete_all(CollectorPid) -> gen_server:call(CollectorPid, delete_all, infinity). -shutdown(CollectorPid) -> - gen_server:cast(CollectorPid, shutdown). - %%---------------------------------------------------------------------------- init([]) -> @@ -90,8 +86,8 @@ handle_call(delete_all, _From, State = #state{queues = Queues}) -> || {MonitorRef, Q} <- dict:to_list(Queues)], {reply, ok, State}. -handle_cast(shutdown, State) -> - {stop, normal, State}. +handle_cast(Msg, State) -> + {stop, {unhandled_cast, Msg}, State}. handle_info({'DOWN', MonitorRef, process, _DownPid, _Reason}, State = #state{queues = Queues}) -> -- cgit v1.2.1 From a6a531f2147831521f5a1e2f4811de263a6a78fe Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Wed, 11 Aug 2010 16:16:15 +0100 Subject: Get the reader to link to the chan_sup instead of the framing_chan. It used to link to the framing chan, which meant that when it got the exit signal from the framing_chan, it could be sure that the channel and writer had already died. However, this is no longer the case - now the framing_chan is actually the last to start and first to exit in the chan_sup and so the reader needs to link to the chan_sup instead. This means the reader needs to track both the framing_chan and the chan_sup --- src/rabbit_reader.erl | 63 ++++++++++++++++++++++++++++----------------------- 1 file changed, 35 insertions(+), 28 deletions(-) diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 69f6773f..313b7aaf 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -367,10 +367,10 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) -> exit(Reason); {channel_exit, _Chan, E = {writer, send_failed, _Error}} -> throw(E); - {channel_exit, Channel, Reason} -> - mainloop(Deb, handle_channel_exit(Channel, Reason, State)); - {'EXIT', Pid, Reason} -> - mainloop(Deb, handle_dependent_exit(Pid, Reason, State)); + {channel_exit, ChannelOrFrPid, Reason} -> + mainloop(Deb, handle_channel_exit(ChannelOrFrPid, Reason, State)); + {'EXIT', ChSupPid, Reason} -> + mainloop(Deb, handle_dependent_exit(ChSupPid, Reason, State)); terminate_connection -> State; handshake_timeout -> @@ -463,13 +463,13 @@ close_channel(Channel, State) -> put({channel, Channel}, closing), State. -handle_channel_exit(ChPid, Reason, State) when is_pid(ChPid) -> - {channel, Channel} = get({chpid, ChPid}), +handle_channel_exit(ChFrPid, Reason, State) when is_pid(ChFrPid) -> + {channel, Channel} = get({ch_fr_pid, ChFrPid}), handle_exception(State, Channel, Reason); handle_channel_exit(Channel, Reason, State) -> handle_exception(State, Channel, Reason). -handle_dependent_exit(Pid, Reason, State) -> +handle_dependent_exit(ChSupPid, Reason, State) -> case (case Reason of normal -> controlled; shutdown -> controlled; @@ -477,30 +477,36 @@ handle_dependent_exit(Pid, Reason, State) -> _ -> uncontrolled end) of controlled -> - erase({chpid, Pid}), + case erase({ch_sup_pid, ChSupPid}) of + undefined -> ok; + {_Channel, {ch_fr_pid, _ChFrPid} = ChFr} -> erase(ChFr) + end, maybe_close(State); uncontrolled -> - case channel_cleanup(Pid) of + case channel_cleanup(ChSupPid) of undefined -> - exit({abnormal_dependent_exit, Pid, Reason}); + exit({abnormal_dependent_exit, ChSupPid, Reason}); Channel -> maybe_close(handle_exception(State, Channel, Reason)) end end. -channel_cleanup(Pid) -> - case get({chpid, Pid}) of - undefined -> undefined; - {channel, Channel} -> erase({channel, Channel}), - erase({chpid, Pid}), - Channel +channel_cleanup(ChSupPid) -> + case get({ch_sup_pid, ChSupPid}) of + undefined -> undefined; + {{channel, Channel}, ChFr} -> erase({channel, Channel}), + erase(ChFr), + erase({ch_sup_pid, ChSupPid}), + Channel end. -all_channels() -> [Pid || {{chpid, Pid},_} <- get()]. +all_channels() -> [ChFrPid || {{ch_sup_pid, _ChSupPid}, + {_Channel, {ch_fr_pid, ChFrPid}}} <- get()]. terminate_channels() -> NChannels = - length([rabbit_framing_channel:shutdown(Pid) || Pid <- all_channels()]), + length([rabbit_framing_channel:shutdown(ChFrPid) + || ChFrPid <- all_channels()]), if NChannels > 0 -> Timeout = 1000 * ?CHANNEL_TERMINATION_TIMEOUT * NChannels, TimerRef = erlang:send_after(Timeout, self(), cancel_wait), @@ -518,10 +524,10 @@ wait_for_channel_termination(0, TimerRef) -> wait_for_channel_termination(N, TimerRef) -> receive - {'EXIT', Pid, Reason} -> - case channel_cleanup(Pid) of + {'EXIT', ChSupPid, Reason} -> + case channel_cleanup(ChSupPid) of undefined -> - exit({abnormal_dependent_exit, Pid, Reason}); + exit({abnormal_dependent_exit, ChSupPid, Reason}); Channel -> case Reason of normal -> ok; @@ -581,8 +587,8 @@ handle_frame(Type, Channel, Payload, AnalyzedFrame -> %%?LOGDEBUG("Ch ~p Frame ~p~n", [Channel, AnalyzedFrame]), case get({channel, Channel}) of - {chpid, ChPid} -> - ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame), + {ch_fr_pid, ChFrPid} -> + ok = rabbit_framing_channel:process(ChFrPid, AnalyzedFrame), case AnalyzedFrame of {method, 'channel.close', _} -> erase({channel, Channel}), @@ -888,14 +894,15 @@ send_to_new_channel(Channel, AnalyzedFrame, State) -> frame_max = FrameMax, user = #user{username = Username}, vhost = VHost}} = State, - {ok, _ChanSup, ChPid} = + {ok, ChSupPid, ChFrPid} = rabbit_channel_sup_sup:start_channel( ChanSupSup, [Protocol, Sock, Channel, FrameMax, self(), Username, VHost, Collector]), - link(ChPid), - put({channel, Channel}, {chpid, ChPid}), - put({chpid, ChPid}, {channel, Channel}), - ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame). + link(ChSupPid), + put({channel, Channel}, {ch_fr_pid, ChFrPid}), + put({ch_sup_pid, ChSupPid}, {{channel, Channel}, {ch_fr_pid, ChFrPid}}), + put({ch_fr_pid, ChFrPid}, {channel, Channel}), + ok = rabbit_framing_channel:process(ChFrPid, AnalyzedFrame). log_channel_error(ConnectionState, Channel, Reason) -> rabbit_log:error("connection ~p (~p), channel ~p - error:~n~p~n", -- cgit v1.2.1 From ac13eb7afdd0743f28fd30583061daec99855498 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Fri, 13 Aug 2010 05:58:05 +0100 Subject: cosmetic --- src/rabbit_limiter.erl | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index fccf5f34..b9338a6e 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -157,9 +157,6 @@ handle_call(unblock, _From, State) -> {stop, State1} -> {stop, normal, stopped, State1} end. -handle_cast(shutdown, State) -> - {stop, normal, State}; - handle_cast({ack, Count}, State = #lim{volume = Volume}) -> NewVolume = if Volume == 0 -> 0; true -> Volume - Count -- cgit v1.2.1 From 6312508ec3594bb3eb74f8b1a008f15835ab072a Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Fri, 13 Aug 2010 05:58:16 +0100 Subject: remove dead code --- src/rabbit_writer.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index faeec406..7c2da24f 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -56,6 +56,7 @@ (rabbit_net:socket(), rabbit_channel:channel_number(), non_neg_integer(), rabbit_types:protocol(), pid()) -> rabbit_types:ok(pid())). +-spec(flush/1 :: (pid()) -> 'ok'). -spec(send_command/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(send_command/3 :: @@ -79,7 +80,6 @@ rabbit_framing:amqp_method_record(), rabbit_types:content(), non_neg_integer(), rabbit_types:protocol()) -> 'ok'). --spec(flush/1 :: (pid()) -> 'ok'). -endif. -- cgit v1.2.1 From ec46c466d0f19bd3e8c7476a311d49b9fab8829d Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Fri, 13 Aug 2010 06:01:45 +0100 Subject: revert the change from 'stop' to 'flush_stop' and move the writer:flush call to a place where the need for it is more obvious. --- src/rabbit_channel.erl | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index c68bb77d..01d0c38e 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -203,8 +203,7 @@ handle_cast({method, Method, Content}, State) -> noreply(NewState); {noreply, NewState} -> noreply(NewState); - flush_and_stop -> - rabbit_writer:flush(State#ch.writer_pid), + stop -> {stop, normal, State#ch{state = terminating}} catch exit:Reason = #amqp_error{} -> @@ -405,7 +404,8 @@ handle_method(_Method, _, #ch{state = starting}) -> handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) -> ok = rollback_and_notify(State), ok = rabbit_writer:send_command(WriterPid, #'channel.close_ok'{}), - flush_and_stop; + ok = rabbit_writer:flush(WriterPid), + stop; handle_method(#'access.request'{},_, State) -> {reply, #'access.request_ok'{ticket = 1}, State}; -- cgit v1.2.1 From 0217c09bdd9e1365135f8f477192571d8605b3bd Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Fri, 13 Aug 2010 06:08:40 +0100 Subject: refactor: extract termination Reason analysis from handle_dependent_exit and wait_for_channel_termination --- src/rabbit_reader.erl | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 313b7aaf..1d2dd166 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -470,12 +470,7 @@ handle_channel_exit(Channel, Reason, State) -> handle_exception(State, Channel, Reason). handle_dependent_exit(ChSupPid, Reason, State) -> - case (case Reason of - normal -> controlled; - shutdown -> controlled; - {shutdown, _Term} -> controlled; - _ -> uncontrolled - end) of + case termination_kind(Reason) of controlled -> case erase({ch_sup_pid, ChSupPid}) of undefined -> ok; @@ -529,11 +524,10 @@ wait_for_channel_termination(N, TimerRef) -> undefined -> exit({abnormal_dependent_exit, ChSupPid, Reason}); Channel -> - case Reason of - normal -> ok; - shutdown -> ok; - {shutdown, _Term} -> ok; - _ -> + case termination_kind(Reason) of + controlled -> + ok; + uncontrolled -> rabbit_log:error( "connection ~p, channel ~p - " "error while terminating:~n~p~n", @@ -558,6 +552,11 @@ maybe_close(State = #v1{connection_state = closing, maybe_close(State) -> State. +termination_kind(normal) -> controlled; +termination_kind(shutdown) -> controlled; +termination_kind({shutdown, _Term}) -> controlled; +termination_kind(_) -> uncontrolled. + handle_frame(Type, 0, Payload, State = #v1{connection_state = CS, connection = #connection{protocol = Protocol}}) -- cgit v1.2.1 From 171f2949e080cb910a5681f5578ea562882a3ae1 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Fri, 13 Aug 2010 14:54:20 +0100 Subject: Remove writer flush and improve sync methods --- src/rabbit_channel.erl | 3 +-- src/rabbit_writer.erl | 46 ++++++++++++++++++++-------------------------- 2 files changed, 21 insertions(+), 28 deletions(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 493b1542..835d3f0d 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -403,8 +403,7 @@ handle_method(_Method, _, #ch{state = starting}) -> handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) -> ok = rollback_and_notify(State), - ok = rabbit_writer:send_command(WriterPid, #'channel.close_ok'{}), - ok = rabbit_writer:flush(WriterPid), + ok = rabbit_writer:send_command_sync(WriterPid, #'channel.close_ok'{}), stop; handle_method(#'access.request'{},_, State) -> diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index 7c2da24f..9242593f 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -33,9 +33,9 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([start/5, start_link/5, flush/1, mainloop/2, mainloop1/2]). --export([send_command/2, send_command/3, send_command_and_signal_back/3, - send_command_and_signal_back/4, send_command_and_notify/5]). +-export([start/5, start_link/5, mainloop/2, mainloop1/2]). +-export([send_command/2, send_command/3, send_command_sync/2, + send_command_sync/3, send_command_and_notify/5]). -export([internal_send_command/4, internal_send_command/6]). -import(gen_tcp). @@ -56,17 +56,15 @@ (rabbit_net:socket(), rabbit_channel:channel_number(), non_neg_integer(), rabbit_types:protocol(), pid()) -> rabbit_types:ok(pid())). --spec(flush/1 :: (pid()) -> 'ok'). -spec(send_command/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(send_command/3 :: (pid(), rabbit_framing:amqp_method_record(), rabbit_types:content()) -> 'ok'). --spec(send_command_and_signal_back/3 :: - (pid(), rabbit_framing:amqp_method(), pid()) -> 'ok'). --spec(send_command_and_signal_back/4 :: - (pid(), rabbit_framing:amqp_method(), rabbit_types:content(), pid()) - -> 'ok'). +-spec(send_command_sync/2 :: + (pid(), rabbit_framing:amqp_method()) -> 'ok'). +-spec(send_command_sync/3 :: + (pid(), rabbit_framing:amqp_method(), rabbit_types:content()) -> 'ok'). -spec(send_command_and_notify/5 :: (pid(), pid(), pid(), rabbit_framing:amqp_method_record(), rabbit_types:content()) @@ -129,20 +127,20 @@ handle_message({send_command, MethodRecord, Content}, ok = internal_send_command_async(Sock, Channel, MethodRecord, Content, FrameMax, Protocol), State; -handle_message({send_command_and_signal_back, MethodRecord, Parent}, +handle_message({'$writer_call', From, MethodRecord}, State = #wstate{sock = Sock, channel = Channel, protocol = Protocol}) -> ok = internal_send_command_async(Sock, Channel, MethodRecord, Protocol), - Parent ! rabbit_writer_send_command_signal, + gen_server:reply(From, ok), State; -handle_message({send_command_and_signal_back, MethodRecord, Content, Parent}, +handle_message({'$writer_call', From, {MethodRecord, Content}}, State = #wstate{sock = Sock, channel = Channel, frame_max = FrameMax, protocol = Protocol}) -> ok = internal_send_command_async(Sock, Channel, MethodRecord, Content, FrameMax, Protocol), - Parent ! rabbit_writer_send_command_signal, + gen_server:reply(From, ok), State; handle_message({send_command_and_notify, QPid, ChPid, MethodRecord, Content}, State = #wstate{sock = Sock, @@ -157,9 +155,6 @@ handle_message({inet_reply, _, ok}, State) -> State; handle_message({inet_reply, _, Status}, _State) -> exit({writer, send_failed, Status}); -handle_message({flush, Pid, Ref}, State) -> - Pid ! Ref, - State; handle_message(Message, _State) -> exit({writer, message_not_understood, Message}). @@ -173,22 +168,21 @@ send_command(W, MethodRecord, Content) -> W ! {send_command, MethodRecord, Content}, ok. -send_command_and_signal_back(W, MethodRecord, Parent) -> - W ! {send_command_and_signal_back, MethodRecord, Parent}, - ok. +send_command_sync(W, MethodRecord) -> + call(W, MethodRecord). -send_command_and_signal_back(W, MethodRecord, Content, Parent) -> - W ! {send_command_and_signal_back, MethodRecord, Content, Parent}, - ok. +send_command_sync(W, MethodRecord, Content) -> + call(W, {MethodRecord, Content}). send_command_and_notify(W, Q, ChPid, MethodRecord, Content) -> W ! {send_command_and_notify, Q, ChPid, MethodRecord, Content}, ok. -flush(W) -> - Ref = make_ref(), - W ! {flush, self(), Ref}, - receive Ref -> ok end. +%--------------------------------------------------------------------------- + +call(Pid, Msg) -> + {ok, Res} = gen:call(Pid, '$writer_call', Msg, infinity), + Res. %--------------------------------------------------------------------------- -- cgit v1.2.1 From 0cba9d88b0054c2dfd503c626d9f64efe297b975 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Fri, 13 Aug 2010 15:02:48 +0100 Subject: Cosmetics --- src/rabbit_writer.erl | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index 9242593f..fd5b5ba5 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -127,13 +127,13 @@ handle_message({send_command, MethodRecord, Content}, ok = internal_send_command_async(Sock, Channel, MethodRecord, Content, FrameMax, Protocol), State; -handle_message({'$writer_call', From, MethodRecord}, +handle_message({send_command_sync, From, MethodRecord}, State = #wstate{sock = Sock, channel = Channel, protocol = Protocol}) -> ok = internal_send_command_async(Sock, Channel, MethodRecord, Protocol), gen_server:reply(From, ok), State; -handle_message({'$writer_call', From, {MethodRecord, Content}}, +handle_message({send_command_sync, From, {MethodRecord, Content}}, State = #wstate{sock = Sock, channel = Channel, frame_max = FrameMax, @@ -169,10 +169,10 @@ send_command(W, MethodRecord, Content) -> ok. send_command_sync(W, MethodRecord) -> - call(W, MethodRecord). + call(W, send_command_sync, MethodRecord). send_command_sync(W, MethodRecord, Content) -> - call(W, {MethodRecord, Content}). + call(W, send_command_sync, {MethodRecord, Content}). send_command_and_notify(W, Q, ChPid, MethodRecord, Content) -> W ! {send_command_and_notify, Q, ChPid, MethodRecord, Content}, @@ -180,8 +180,8 @@ send_command_and_notify(W, Q, ChPid, MethodRecord, Content) -> %--------------------------------------------------------------------------- -call(Pid, Msg) -> - {ok, Res} = gen:call(Pid, '$writer_call', Msg, infinity), +call(Pid, Label, Msg) -> + {ok, Res} = gen:call(Pid, Label, Msg, infinity), Res. %--------------------------------------------------------------------------- -- cgit v1.2.1 From 45ab05142b64d1f047c3a855565765c63a3d2f4f Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Fri, 13 Aug 2010 15:49:56 +0100 Subject: Specs, moving things around, and removing dead code --- src/rabbit_channel_sup.erl | 17 ++++++++++------- src/rabbit_channel_sup_sup.erl | 20 +++++++++++++++++--- src/rabbit_connection_sup.erl | 19 ++++++++++++++++--- src/rabbit_limiter.erl | 12 ++---------- src/rabbit_reader.erl | 4 ++-- 5 files changed, 47 insertions(+), 25 deletions(-) diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index 23058bfe..02199a65 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -33,7 +33,7 @@ -behaviour(supervisor2). --export([start_link/8]). +-export([start_link/1]). -export([init/1]). @@ -43,18 +43,21 @@ -ifdef(use_specs). --spec(start_link/8 :: - (rabbit_types:protocol(), rabbit_net:socket(), +-export_type([start_link_args/0]). + +-type(start_link_args() :: + {rabbit_types:protocol(), rabbit_net:socket(), rabbit_channel:channel_number(), non_neg_integer(), pid(), - rabbit_access_control:username(), rabbit_types:vhost(), pid()) -> - {'ok', pid(), pid()}). + rabbit_access_control:username(), rabbit_types:vhost(), pid()}). + +-spec(start_link/1 :: (start_link_args()) -> {'ok', pid(), pid()}). -endif. %%---------------------------------------------------------------------------- -start_link(Protocol, Sock, Channel, FrameMax, ReaderPid, Username, VHost, - Collector) -> +start_link({Protocol, Sock, Channel, FrameMax, ReaderPid, Username, VHost, + Collector}) -> {ok, SupPid} = supervisor2:start_link(?MODULE, []), {ok, WriterPid} = supervisor2:start_child( diff --git a/src/rabbit_channel_sup_sup.erl b/src/rabbit_channel_sup_sup.erl index 2fab8678..98ad5fc0 100644 --- a/src/rabbit_channel_sup_sup.erl +++ b/src/rabbit_channel_sup_sup.erl @@ -37,13 +37,27 @@ -export([init/1]). +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). +-spec(start_channel/2 :: (pid(), rabbit_channel_sup:start_link_args()) -> + {'ok', pid(), pid()} | {'error', any()}). + +-endif. + +%%---------------------------------------------------------------------------- + start_link() -> supervisor2:start_link(?MODULE, []). +start_channel(Pid, Args) -> + supervisor2:start_child(Pid, [Args]). + +%%---------------------------------------------------------------------------- + init([]) -> {ok, {{simple_one_for_one_terminate, 0, 1}, [{channel_sup, {rabbit_channel_sup, start_link, []}, temporary, infinity, supervisor, [rabbit_channel_sup]}]}}. - -start_channel(Pid, Args) -> - supervisor2:start_child(Pid, Args). diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl index 354540c1..2606210b 100644 --- a/src/rabbit_connection_sup.erl +++ b/src/rabbit_connection_sup.erl @@ -39,6 +39,17 @@ -include("rabbit.hrl"). +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/0 :: () -> rabbit_types:ok(pid())). +-spec(reader/1 :: (pid()) -> pid()). + +-endif. + +%%-------------------------------------------------------------------------- + start_link() -> {ok, SupPid} = supervisor2:start_link(?MODULE, []), {ok, ChannelSupSupPid} = @@ -59,12 +70,14 @@ start_link() -> intrinsic, ?MAX_WAIT, worker, [rabbit_reader]}), {ok, SupPid}. -init([]) -> - {ok, {{one_for_all, 0, 1}, []}}. - reader(Pid) -> hd(supervisor2:find_child(Pid, reader)). +%%-------------------------------------------------------------------------- + +init([]) -> + {ok, {{one_for_all, 0, 1}, []}}. + start_heartbeat_fun(SupPid) -> fun (_Sock, 0) -> none; diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 5b7dd707..da7078f1 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -79,8 +79,7 @@ start_link(ChPid, UnackedMsgCount) -> limit(undefined, 0) -> ok; limit(LimiterPid, PrefetchCount) -> - unlink_on_stopped(LimiterPid, - gen_server2:call(LimiterPid, {limit, PrefetchCount})). + gen_server2:call(LimiterPid, {limit, PrefetchCount}). %% Ask the limiter whether the queue can deliver a message without %% breaching a limit @@ -118,8 +117,7 @@ block(LimiterPid) -> unblock(undefined) -> ok; unblock(LimiterPid) -> - unlink_on_stopped(LimiterPid, - gen_server2:call(LimiterPid, unblock, infinity)). + gen_server2:call(LimiterPid, unblock, infinity). %%---------------------------------------------------------------------------- %% gen_server callbacks @@ -237,9 +235,3 @@ notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) -> ok end, State#lim{queues = NewQueues}. - -unlink_on_stopped(LimiterPid, stopped) -> - ok = rabbit_misc:unlink_and_capture_exit(LimiterPid), - stopped; -unlink_on_stopped(_LimiterPid, Result) -> - Result. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 1d2dd166..685dd83e 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -895,8 +895,8 @@ send_to_new_channel(Channel, AnalyzedFrame, State) -> vhost = VHost}} = State, {ok, ChSupPid, ChFrPid} = rabbit_channel_sup_sup:start_channel( - ChanSupSup, [Protocol, Sock, Channel, FrameMax, - self(), Username, VHost, Collector]), + ChanSupSup, {Protocol, Sock, Channel, FrameMax, + self(), Username, VHost, Collector}), link(ChSupPid), put({channel, Channel}, {ch_fr_pid, ChFrPid}), put({ch_sup_pid, ChSupPid}, {{channel, Channel}, {ch_fr_pid, ChFrPid}}), -- cgit v1.2.1 From 20d06d5ce38c259261bcaf0a35acf1b68300b0a7 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Fri, 13 Aug 2010 16:04:11 +0100 Subject: Reduce calls to connection_sup:reader --- src/rabbit_connection_sup.erl | 6 +++--- src/rabbit_networking.erl | 3 +-- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl index 2606210b..69e21d73 100644 --- a/src/rabbit_connection_sup.erl +++ b/src/rabbit_connection_sup.erl @@ -43,7 +43,7 @@ -ifdef(use_specs). --spec(start_link/0 :: () -> rabbit_types:ok(pid())). +-spec(start_link/0 :: () -> {'ok', pid(), pid()}). -spec(reader/1 :: (pid()) -> pid()). -endif. @@ -62,13 +62,13 @@ start_link() -> SupPid, {collector, {rabbit_queue_collector, start_link, []}, intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}), - {ok, _ReaderPid} = + {ok, ReaderPid} = supervisor2:start_child( SupPid, {reader, {rabbit_reader, start_link, [ChannelSupSupPid, Collector, start_heartbeat_fun(SupPid)]}, intrinsic, ?MAX_WAIT, worker, [rabbit_reader]}), - {ok, SupPid}. + {ok, SupPid, ReaderPid}. reader(Pid) -> hd(supervisor2:find_child(Pid, reader)). diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 492d7d01..f656e04c 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -204,8 +204,7 @@ on_node_down(Node) -> ok = mnesia:dirty_delete(rabbit_listener, Node). start_client(Sock, SockTransform) -> - {ok, Child} = supervisor:start_child(rabbit_tcp_client_sup, []), - Reader = rabbit_connection_sup:reader(Child), + {ok, _Child, Reader} = supervisor:start_child(rabbit_tcp_client_sup, []), ok = rabbit_net:controlling_process(Sock, Reader), Reader ! {go, Sock, SockTransform}, Reader. -- cgit v1.2.1 From 87ab27b0db7f2250b4d99b48c48aa0bd5cadf34d Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Fri, 13 Aug 2010 16:16:49 +0100 Subject: Move the linking into the start_channel function --- src/rabbit_channel_sup_sup.erl | 6 ++++-- src/rabbit_reader.erl | 1 - 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/rabbit_channel_sup_sup.erl b/src/rabbit_channel_sup_sup.erl index 98ad5fc0..d1938805 100644 --- a/src/rabbit_channel_sup_sup.erl +++ b/src/rabbit_channel_sup_sup.erl @@ -43,7 +43,7 @@ -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). -spec(start_channel/2 :: (pid(), rabbit_channel_sup:start_link_args()) -> - {'ok', pid(), pid()} | {'error', any()}). + {'ok', pid(), pid()}). -endif. @@ -53,7 +53,9 @@ start_link() -> supervisor2:start_link(?MODULE, []). start_channel(Pid, Args) -> - supervisor2:start_child(Pid, [Args]). + {ok, ChSupPid, _} = Result = supervisor2:start_child(Pid, [Args]), + link(ChSupPid), + Result. %%---------------------------------------------------------------------------- diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 685dd83e..6efbeab6 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -897,7 +897,6 @@ send_to_new_channel(Channel, AnalyzedFrame, State) -> rabbit_channel_sup_sup:start_channel( ChanSupSup, {Protocol, Sock, Channel, FrameMax, self(), Username, VHost, Collector}), - link(ChSupPid), put({channel, Channel}, {ch_fr_pid, ChFrPid}), put({ch_sup_pid, ChSupPid}, {{channel, Channel}, {ch_fr_pid, ChFrPid}}), put({ch_fr_pid, ChFrPid}, {channel, Channel}), -- cgit v1.2.1 From ac55ed28110a5c51327bc62eae155071bb2fb466 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Fri, 13 Aug 2010 16:32:47 +0100 Subject: Remove dead code in tests --- src/rabbit_tests.erl | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index f861cedd..a71e49f8 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1126,16 +1126,6 @@ test_spawn(Receiver) -> end, {Writer, Ch}. -expect_normal_channel_termination(Ch) -> - receive {'EXIT', Ch, normal} -> ok - after 1000 -> throw({channel_failed_to_shutdown, Ch}) - end. - -gobble_channel_exit() -> - receive {channel_exit, _, _} -> ok - after 1000 -> throw(channel_exit_not_received) - end. - test_statistics_receiver(Pid) -> receive shutdown -> -- cgit v1.2.1