diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-10 15:05:21 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-10 15:05:21 +0100 |
commit | 6fe45dcc722c13b093c90c934a4743adbfa17d9e (patch) | |
tree | 08eb81340bcde07b7123fba946c804a78c112ee6 | |
parent | 601f79c7e42ee9755999a4bc4f54b6ec59dc4598 (diff) | |
download | rabbitmq-server-6fe45dcc722c13b093c90c934a4743adbfa17d9e.tar.gz |
Abstract the heartbeaters in the same way - the reader now has no references to supervisor(2)?:.* nor does heartbeater.
-rw-r--r-- | src/rabbit_connection_sup.erl | 23 | ||||
-rw-r--r-- | src/rabbit_heartbeat.erl | 31 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 46 |
3 files changed, 56 insertions, 44 deletions
diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl index aee8d987..354540c1 100644 --- a/src/rabbit_connection_sup.erl +++ b/src/rabbit_connection_sup.erl @@ -54,7 +54,8 @@ start_link() -> {ok, _ReaderPid} = supervisor2:start_child( SupPid, - {reader, {rabbit_reader, start_link, [ChannelSupSupPid, Collector]}, + {reader, {rabbit_reader, start_link, + [ChannelSupSupPid, Collector, start_heartbeat_fun(SupPid)]}, intrinsic, ?MAX_WAIT, worker, [rabbit_reader]}), {ok, SupPid}. @@ -63,3 +64,23 @@ init([]) -> reader(Pid) -> hd(supervisor2:find_child(Pid, reader)). + +start_heartbeat_fun(SupPid) -> + fun (_Sock, 0) -> + none; + (Sock, TimeoutSec) -> + Parent = self(), + {ok, Sender} = + supervisor2:start_child( + SupPid, {heartbeat_sender, + {rabbit_heartbeat, start_heartbeat_sender, + [Parent, Sock, TimeoutSec]}, + intrinsic, ?MAX_WAIT, worker, [rabbit_heartbeat]}), + {ok, Receiver} = + supervisor2:start_child( + SupPid, {heartbeat_receiver, + {rabbit_heartbeat, start_heartbeat_receiver, + [Parent, Sock, TimeoutSec]}, + intrinsic, ?MAX_WAIT, worker, [rabbit_heartbeat]}), + {Sender, Receiver} + end. diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl index 264dbb68..61ef5efb 100644 --- a/src/rabbit_heartbeat.erl +++ b/src/rabbit_heartbeat.erl @@ -31,9 +31,8 @@ -module(rabbit_heartbeat). --export([start_heartbeat/3, pause_monitor/1, resume_monitor/1, - start_heartbeat_sender/3, - start_heartbeat_receiver/3]). +-export([start_heartbeat_sender/3, start_heartbeat_receiver/3, + pause_monitor/1, resume_monitor/1]). -include("rabbit.hrl"). @@ -41,10 +40,10 @@ -ifdef(use_specs). --type(pids() :: rabbit_types:maybe({pid(), pid()})). +-export_type([heartbeaters/0]). + +-type(heartbeaters() :: rabbit_types:maybe({pid(), pid()})). --spec(start_heartbeat/3 :: (pid(), rabbit_net:socket(), non_neg_integer()) -> - pids()). -spec(start_heartbeat_sender/3 :: (pid(), rabbit_net:socket(), non_neg_integer()) -> rabbit_types:ok(pid())). @@ -52,29 +51,13 @@ (pid(), rabbit_net:socket(), non_neg_integer()) -> rabbit_types:ok(pid())). --spec(pause_monitor/1 :: (pids()) -> 'ok'). --spec(resume_monitor/1 :: (pids()) -> 'ok'). +-spec(pause_monitor/1 :: (heartbeaters()) -> 'ok'). +-spec(resume_monitor/1 :: (heartbeaters()) -> 'ok'). -endif. %%---------------------------------------------------------------------------- -start_heartbeat(_Sup, _Sock, 0) -> - none; -start_heartbeat(Sup, Sock, TimeoutSec) -> - Parent = self(), - {ok, Sender} = - supervisor2:start_child( - Sup, {heartbeat_sender, - {?MODULE, start_heartbeat_sender, [Parent, Sock, TimeoutSec]}, - intrinsic, ?MAX_WAIT, worker, [rabbit_heartbeat]}), - {ok, Receiver} = - supervisor2:start_child( - Sup, {heartbeat_receiver, - {?MODULE, start_heartbeat_receiver, [Parent, Sock, TimeoutSec]}, - intrinsic, ?MAX_WAIT, worker, [rabbit_heartbeat]}), - {Sender, Receiver}. - start_heartbeat_sender(_Parent, Sock, TimeoutSec) -> %% the 'div 2' is there so that we don't end up waiting for nearly %% 2 * TimeoutSec before sending a heartbeat in the boundary case diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index e0f4d6ec..69f6773f 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -33,11 +33,11 @@ -include("rabbit_framing.hrl"). -include("rabbit.hrl"). --export([start_link/2, info_keys/0, info/1, info/2, shutdown/2]). +-export([start_link/3, info_keys/0, info/1, info/2, shutdown/2]). -export([system_continue/3, system_terminate/4, system_code_change/4]). --export([init/3, mainloop/2]). +-export([init/4, mainloop/2]). -export([conserve_memory/2, server_properties/0]). @@ -61,7 +61,7 @@ -record(v1, {parent, sock, connection, callback, recv_length, recv_ref, connection_state, queue_collector, heartbeater, stats_timer, - channel_sup_sup_pid}). + channel_sup_sup_pid, start_heartbeat_fun}). -define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt, send_pend, state, channels]). @@ -161,7 +161,12 @@ -ifdef(use_specs). --spec(start_link/2 :: (pid(), pid()) -> rabbit_types:ok(pid())). +-type(start_heartbeat_fun() :: + fun ((rabbit_networking:socket(), non_neg_integer()) -> + rabbit_heartbeat:heartbeaters())). + +-spec(start_link/3 :: (pid(), pid(), start_heartbeat_fun()) -> + rabbit_types:ok(pid())). -spec(info_keys/0 :: () -> [rabbit_types:info_key()]). -spec(info/1 :: (pid()) -> [rabbit_types:info()]). -spec(info/2 :: (pid(), [rabbit_types:info_key()]) -> [rabbit_types:info()]). @@ -171,9 +176,10 @@ -spec(server_properties/0 :: () -> rabbit_framing:amqp_table()). %% These specs only exists to add no_return() to keep dialyzer happy --spec(init/3 :: (pid(), pid(), pid()) -> no_return()). --spec(start_connection/6 :: - (pid(), pid(), pid(), any(), rabbit_networking:socket(), +-spec(init/4 :: (pid(), pid(), pid(), start_heartbeat_fun()) -> no_return()). +-spec(start_connection/7 :: + (pid(), pid(), pid(), start_heartbeat_fun(), any(), + rabbit_networking:socket(), fun ((rabbit_networking:socket()) -> rabbit_types:ok_or_error2( rabbit_networking:socket(), any()))) -> no_return()). @@ -182,18 +188,20 @@ %%-------------------------------------------------------------------------- -start_link(ChannelSupSupPid, Collector) -> - {ok, proc_lib:spawn_link(?MODULE, init, [self(), ChannelSupSupPid, Collector])}. +start_link(ChannelSupSupPid, Collector, StartHeartbeatFun) -> + {ok, proc_lib:spawn_link(?MODULE, init, [self(), ChannelSupSupPid, + Collector, StartHeartbeatFun])}. shutdown(Pid, Explanation) -> gen_server:call(Pid, {shutdown, Explanation}, infinity). -init(Parent, ChannelSupSupPid, Collector) -> +init(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun) -> Deb = sys:debug_options([]), receive {go, Sock, SockTransform} -> start_connection( - Parent, ChannelSupSupPid, Collector, Deb, Sock, SockTransform) + Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, Sock, + SockTransform) end. system_continue(Parent, Deb, State) -> @@ -272,8 +280,8 @@ socket_op(Sock, Fun) -> exit(normal) end. -start_connection(Parent, ChannelSupSupPid, Collector, Deb, Sock, - SockTransform) -> +start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, + Sock, SockTransform) -> process_flag(trap_exit, true), {PeerAddress, PeerPort} = socket_op(Sock, fun rabbit_net:peername/1), PeerAddressS = inet_parse:ntoa(PeerAddress), @@ -302,7 +310,8 @@ start_connection(Parent, ChannelSupSupPid, Collector, Deb, Sock, heartbeater = none, stats_timer = rabbit_event:init_stats_timer(), - channel_sup_sup_pid = ChannelSupSupPid + channel_sup_sup_pid = ChannelSupSupPid, + start_heartbeat_fun = StartHeartbeatFun }, handshake, 8)) catch @@ -754,10 +763,10 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism, client_properties = ClientProperties}}; handle_method0(#'connection.tune_ok'{frame_max = FrameMax, heartbeat = ClientHeartbeat}, - State = #v1{parent = Parent, - connection_state = tuning, + State = #v1{connection_state = tuning, connection = Connection, - sock = Sock}) -> + sock = Sock, + start_heartbeat_fun = SHF}) -> if (FrameMax /= 0) and (FrameMax < ?FRAME_MIN_SIZE) -> rabbit_misc:protocol_error( not_allowed, "frame_max=~w < ~w min size", @@ -767,8 +776,7 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, not_allowed, "frame_max=~w > ~w max size", [FrameMax, ?FRAME_MAX]); true -> - Heartbeater = - rabbit_heartbeat:start_heartbeat(Parent, Sock, ClientHeartbeat), + Heartbeater = SHF(Sock, ClientHeartbeat), State#v1{connection_state = opening, connection = Connection#connection{ timeout_sec = ClientHeartbeat, |