summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-08-10 15:05:21 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-08-10 15:05:21 +0100
commit6fe45dcc722c13b093c90c934a4743adbfa17d9e (patch)
tree08eb81340bcde07b7123fba946c804a78c112ee6
parent601f79c7e42ee9755999a4bc4f54b6ec59dc4598 (diff)
downloadrabbitmq-server-6fe45dcc722c13b093c90c934a4743adbfa17d9e.tar.gz
Abstract the heartbeaters in the same way - the reader now has no references to supervisor(2)?:.* nor does heartbeater.
-rw-r--r--src/rabbit_connection_sup.erl23
-rw-r--r--src/rabbit_heartbeat.erl31
-rw-r--r--src/rabbit_reader.erl46
3 files changed, 56 insertions, 44 deletions
diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl
index aee8d987..354540c1 100644
--- a/src/rabbit_connection_sup.erl
+++ b/src/rabbit_connection_sup.erl
@@ -54,7 +54,8 @@ start_link() ->
{ok, _ReaderPid} =
supervisor2:start_child(
SupPid,
- {reader, {rabbit_reader, start_link, [ChannelSupSupPid, Collector]},
+ {reader, {rabbit_reader, start_link,
+ [ChannelSupSupPid, Collector, start_heartbeat_fun(SupPid)]},
intrinsic, ?MAX_WAIT, worker, [rabbit_reader]}),
{ok, SupPid}.
@@ -63,3 +64,23 @@ init([]) ->
reader(Pid) ->
hd(supervisor2:find_child(Pid, reader)).
+
+start_heartbeat_fun(SupPid) ->
+ fun (_Sock, 0) ->
+ none;
+ (Sock, TimeoutSec) ->
+ Parent = self(),
+ {ok, Sender} =
+ supervisor2:start_child(
+ SupPid, {heartbeat_sender,
+ {rabbit_heartbeat, start_heartbeat_sender,
+ [Parent, Sock, TimeoutSec]},
+ intrinsic, ?MAX_WAIT, worker, [rabbit_heartbeat]}),
+ {ok, Receiver} =
+ supervisor2:start_child(
+ SupPid, {heartbeat_receiver,
+ {rabbit_heartbeat, start_heartbeat_receiver,
+ [Parent, Sock, TimeoutSec]},
+ intrinsic, ?MAX_WAIT, worker, [rabbit_heartbeat]}),
+ {Sender, Receiver}
+ end.
diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl
index 264dbb68..61ef5efb 100644
--- a/src/rabbit_heartbeat.erl
+++ b/src/rabbit_heartbeat.erl
@@ -31,9 +31,8 @@
-module(rabbit_heartbeat).
--export([start_heartbeat/3, pause_monitor/1, resume_monitor/1,
- start_heartbeat_sender/3,
- start_heartbeat_receiver/3]).
+-export([start_heartbeat_sender/3, start_heartbeat_receiver/3,
+ pause_monitor/1, resume_monitor/1]).
-include("rabbit.hrl").
@@ -41,10 +40,10 @@
-ifdef(use_specs).
--type(pids() :: rabbit_types:maybe({pid(), pid()})).
+-export_type([heartbeaters/0]).
+
+-type(heartbeaters() :: rabbit_types:maybe({pid(), pid()})).
--spec(start_heartbeat/3 :: (pid(), rabbit_net:socket(), non_neg_integer()) ->
- pids()).
-spec(start_heartbeat_sender/3 ::
(pid(), rabbit_net:socket(), non_neg_integer()) ->
rabbit_types:ok(pid())).
@@ -52,29 +51,13 @@
(pid(), rabbit_net:socket(), non_neg_integer()) ->
rabbit_types:ok(pid())).
--spec(pause_monitor/1 :: (pids()) -> 'ok').
--spec(resume_monitor/1 :: (pids()) -> 'ok').
+-spec(pause_monitor/1 :: (heartbeaters()) -> 'ok').
+-spec(resume_monitor/1 :: (heartbeaters()) -> 'ok').
-endif.
%%----------------------------------------------------------------------------
-start_heartbeat(_Sup, _Sock, 0) ->
- none;
-start_heartbeat(Sup, Sock, TimeoutSec) ->
- Parent = self(),
- {ok, Sender} =
- supervisor2:start_child(
- Sup, {heartbeat_sender,
- {?MODULE, start_heartbeat_sender, [Parent, Sock, TimeoutSec]},
- intrinsic, ?MAX_WAIT, worker, [rabbit_heartbeat]}),
- {ok, Receiver} =
- supervisor2:start_child(
- Sup, {heartbeat_receiver,
- {?MODULE, start_heartbeat_receiver, [Parent, Sock, TimeoutSec]},
- intrinsic, ?MAX_WAIT, worker, [rabbit_heartbeat]}),
- {Sender, Receiver}.
-
start_heartbeat_sender(_Parent, Sock, TimeoutSec) ->
%% the 'div 2' is there so that we don't end up waiting for nearly
%% 2 * TimeoutSec before sending a heartbeat in the boundary case
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index e0f4d6ec..69f6773f 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -33,11 +33,11 @@
-include("rabbit_framing.hrl").
-include("rabbit.hrl").
--export([start_link/2, info_keys/0, info/1, info/2, shutdown/2]).
+-export([start_link/3, info_keys/0, info/1, info/2, shutdown/2]).
-export([system_continue/3, system_terminate/4, system_code_change/4]).
--export([init/3, mainloop/2]).
+-export([init/4, mainloop/2]).
-export([conserve_memory/2, server_properties/0]).
@@ -61,7 +61,7 @@
-record(v1, {parent, sock, connection, callback, recv_length, recv_ref,
connection_state, queue_collector, heartbeater, stats_timer,
- channel_sup_sup_pid}).
+ channel_sup_sup_pid, start_heartbeat_fun}).
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
send_pend, state, channels]).
@@ -161,7 +161,12 @@
-ifdef(use_specs).
--spec(start_link/2 :: (pid(), pid()) -> rabbit_types:ok(pid())).
+-type(start_heartbeat_fun() ::
+ fun ((rabbit_networking:socket(), non_neg_integer()) ->
+ rabbit_heartbeat:heartbeaters())).
+
+-spec(start_link/3 :: (pid(), pid(), start_heartbeat_fun()) ->
+ rabbit_types:ok(pid())).
-spec(info_keys/0 :: () -> [rabbit_types:info_key()]).
-spec(info/1 :: (pid()) -> [rabbit_types:info()]).
-spec(info/2 :: (pid(), [rabbit_types:info_key()]) -> [rabbit_types:info()]).
@@ -171,9 +176,10 @@
-spec(server_properties/0 :: () -> rabbit_framing:amqp_table()).
%% These specs only exists to add no_return() to keep dialyzer happy
--spec(init/3 :: (pid(), pid(), pid()) -> no_return()).
--spec(start_connection/6 ::
- (pid(), pid(), pid(), any(), rabbit_networking:socket(),
+-spec(init/4 :: (pid(), pid(), pid(), start_heartbeat_fun()) -> no_return()).
+-spec(start_connection/7 ::
+ (pid(), pid(), pid(), start_heartbeat_fun(), any(),
+ rabbit_networking:socket(),
fun ((rabbit_networking:socket()) ->
rabbit_types:ok_or_error2(
rabbit_networking:socket(), any()))) -> no_return()).
@@ -182,18 +188,20 @@
%%--------------------------------------------------------------------------
-start_link(ChannelSupSupPid, Collector) ->
- {ok, proc_lib:spawn_link(?MODULE, init, [self(), ChannelSupSupPid, Collector])}.
+start_link(ChannelSupSupPid, Collector, StartHeartbeatFun) ->
+ {ok, proc_lib:spawn_link(?MODULE, init, [self(), ChannelSupSupPid,
+ Collector, StartHeartbeatFun])}.
shutdown(Pid, Explanation) ->
gen_server:call(Pid, {shutdown, Explanation}, infinity).
-init(Parent, ChannelSupSupPid, Collector) ->
+init(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun) ->
Deb = sys:debug_options([]),
receive
{go, Sock, SockTransform} ->
start_connection(
- Parent, ChannelSupSupPid, Collector, Deb, Sock, SockTransform)
+ Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, Sock,
+ SockTransform)
end.
system_continue(Parent, Deb, State) ->
@@ -272,8 +280,8 @@ socket_op(Sock, Fun) ->
exit(normal)
end.
-start_connection(Parent, ChannelSupSupPid, Collector, Deb, Sock,
- SockTransform) ->
+start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
+ Sock, SockTransform) ->
process_flag(trap_exit, true),
{PeerAddress, PeerPort} = socket_op(Sock, fun rabbit_net:peername/1),
PeerAddressS = inet_parse:ntoa(PeerAddress),
@@ -302,7 +310,8 @@ start_connection(Parent, ChannelSupSupPid, Collector, Deb, Sock,
heartbeater = none,
stats_timer =
rabbit_event:init_stats_timer(),
- channel_sup_sup_pid = ChannelSupSupPid
+ channel_sup_sup_pid = ChannelSupSupPid,
+ start_heartbeat_fun = StartHeartbeatFun
},
handshake, 8))
catch
@@ -754,10 +763,10 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism,
client_properties = ClientProperties}};
handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
heartbeat = ClientHeartbeat},
- State = #v1{parent = Parent,
- connection_state = tuning,
+ State = #v1{connection_state = tuning,
connection = Connection,
- sock = Sock}) ->
+ sock = Sock,
+ start_heartbeat_fun = SHF}) ->
if (FrameMax /= 0) and (FrameMax < ?FRAME_MIN_SIZE) ->
rabbit_misc:protocol_error(
not_allowed, "frame_max=~w < ~w min size",
@@ -767,8 +776,7 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
not_allowed, "frame_max=~w > ~w max size",
[FrameMax, ?FRAME_MAX]);
true ->
- Heartbeater =
- rabbit_heartbeat:start_heartbeat(Parent, Sock, ClientHeartbeat),
+ Heartbeater = SHF(Sock, ClientHeartbeat),
State#v1{connection_state = opening,
connection = Connection#connection{
timeout_sec = ClientHeartbeat,