diff options
author | Rob Harrop <rob@rabbitmq.com> | 2010-11-11 11:53:14 +0000 |
---|---|---|
committer | Rob Harrop <rob@rabbitmq.com> | 2010-11-11 11:53:14 +0000 |
commit | 999490abb3b826db109f4ee78780dc86eb9e4e0d (patch) | |
tree | 374eefc1dbb073e49b08be4d2f0b36775398461f | |
parent | 1ebacc3dcf63320c4ebfc55d956134b6ab67b9aa (diff) | |
parent | 63fab0f6cb17797c5effe58f5abcca2f87ec4a6c (diff) | |
download | rabbitmq-server-bug23484.tar.gz |
Merge with defaultbug23484
-rw-r--r-- | src/rabbit_connection_sup.erl | 22 | ||||
-rw-r--r-- | src/rabbit_heartbeat.erl | 54 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 25 |
3 files changed, 62 insertions, 39 deletions
diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl index b3821d3b..22742fa9 100644 --- a/src/rabbit_connection_sup.erl +++ b/src/rabbit_connection_sup.erl @@ -66,7 +66,8 @@ start_link() -> supervisor2:start_child( SupPid, {reader, {rabbit_reader, start_link, - [ChannelSupSupPid, Collector, start_heartbeat_fun(SupPid)]}, + [ChannelSupSupPid, Collector, + rabbit_heartbeat:start_heartbeat_fun(SupPid)]}, intrinsic, ?MAX_WAIT, worker, [rabbit_reader]}), {ok, SupPid, ReaderPid}. @@ -78,22 +79,3 @@ reader(Pid) -> init([]) -> {ok, {{one_for_all, 0, 1}, []}}. -start_heartbeat_fun(SupPid) -> - fun (_Sock, 0) -> - none; - (Sock, TimeoutSec) -> - Parent = self(), - {ok, Sender} = - supervisor2:start_child( - SupPid, {heartbeat_sender, - {rabbit_heartbeat, start_heartbeat_sender, - [Parent, Sock, TimeoutSec]}, - transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}), - {ok, Receiver} = - supervisor2:start_child( - SupPid, {heartbeat_receiver, - {rabbit_heartbeat, start_heartbeat_receiver, - [Parent, Sock, TimeoutSec]}, - transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}), - {Sender, Receiver} - end. diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl index a9945af1..589bf7cc 100644 --- a/src/rabbit_heartbeat.erl +++ b/src/rabbit_heartbeat.erl @@ -32,7 +32,7 @@ -module(rabbit_heartbeat). -export([start_heartbeat_sender/3, start_heartbeat_receiver/3, - pause_monitor/1, resume_monitor/1]). + start_heartbeat_fun/1, pause_monitor/1, resume_monitor/1]). -include("rabbit.hrl"). @@ -41,16 +41,28 @@ -ifdef(use_specs). -export_type([heartbeaters/0]). +-export_type([start_heartbeat_fun/0]). --type(heartbeaters() :: rabbit_types:maybe({pid(), pid()})). +-type(heartbeaters() :: {rabbit_types:maybe(pid()), rabbit_types:maybe(pid())}). + +-type(heartbeat_callback() :: fun (() -> any())). + +-type(start_heartbeat_fun() :: + fun((rabbit_net:socket(), non_neg_integer(), heartbeat_callback(), + non_neg_integer(), heartbeat_callback()) -> + no_return())). -spec(start_heartbeat_sender/3 :: - (pid(), rabbit_net:socket(), non_neg_integer()) -> + (rabbit_net:socket(), non_neg_integer(), heartbeat_callback()) -> rabbit_types:ok(pid())). -spec(start_heartbeat_receiver/3 :: - (pid(), rabbit_net:socket(), non_neg_integer()) -> + (rabbit_net:socket(), non_neg_integer(), heartbeat_callback()) -> rabbit_types:ok(pid())). +-spec(start_heartbeat_fun/1 :: + (pid()) -> start_heartbeat_fun()). + + -spec(pause_monitor/1 :: (heartbeaters()) -> 'ok'). -spec(resume_monitor/1 :: (heartbeaters()) -> 'ok'). @@ -58,40 +70,60 @@ %%---------------------------------------------------------------------------- -start_heartbeat_sender(_Parent, Sock, TimeoutSec) -> +start_heartbeat_sender(Sock, TimeoutSec, SendFun) -> %% the 'div 2' is there so that we don't end up waiting for nearly %% 2 * TimeoutSec before sending a heartbeat in the boundary case %% where the last message was sent just after a heartbeat. heartbeater( {Sock, TimeoutSec * 1000 div 2, send_oct, 0, fun () -> - catch rabbit_net:send( - Sock, rabbit_binary_generator:build_heartbeat_frame()), + SendFun(), continue end}). -start_heartbeat_receiver(Parent, Sock, TimeoutSec) -> +start_heartbeat_receiver(Sock, TimeoutSec, ReceiveFun) -> %% we check for incoming data every interval, and time out after %% two checks with no change. As a result we will time out between %% 2 and 3 intervals after the last data has been received. heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1, fun () -> - Parent ! timeout, + ReceiveFun(), stop end}). -pause_monitor(none) -> +start_heartbeat_fun(SupPid) -> + fun (Sock, SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) -> + {ok, Sender} = + start_heartbeater(SendTimeoutSec, SupPid, Sock, + SendFun, heartbeat_sender, + start_heartbeat_sender), + {ok, Receiver} = + start_heartbeater(ReceiveTimeoutSec, SupPid, Sock, + ReceiveFun, heartbeat_receiver, + start_heartbeat_receiver), + {Sender, Receiver} + end. + +pause_monitor({_Sender, none}) -> ok; pause_monitor({_Sender, Receiver}) -> Receiver ! pause, ok. -resume_monitor(none) -> +resume_monitor({_Sender, none}) -> ok; resume_monitor({_Sender, Receiver}) -> Receiver ! resume, ok. %%---------------------------------------------------------------------------- +start_heartbeater(0, _SupPid, _Sock, _TimeoutFun, _Name, _Callback) -> + {ok, none}; +start_heartbeater(TimeoutSec, SupPid, Sock, TimeoutFun, Name, Callback) -> + supervisor2:start_child( + SupPid, {Name, + {rabbit_heartbeat, Callback, + [Sock, TimeoutSec, TimeoutFun]}, + transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}). heartbeater(Params) -> {ok, proc_lib:spawn_link(fun () -> heartbeater(Params, {0, 0}) end)}. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 7f7bd9d8..23eb3058 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -162,11 +162,7 @@ -ifdef(use_specs). --type(start_heartbeat_fun() :: - fun ((rabbit_net:socket(), non_neg_integer()) -> - rabbit_heartbeat:heartbeaters())). - --spec(start_link/3 :: (pid(), pid(), start_heartbeat_fun()) -> +-spec(start_link/3 :: (pid(), pid(), rabbit_heartbeat:start_heartbeat_fun()) -> rabbit_types:ok(pid())). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). -spec(info/1 :: (pid()) -> rabbit_types:infos()). @@ -177,9 +173,10 @@ -spec(server_properties/0 :: () -> rabbit_framing:amqp_table()). %% These specs only exists to add no_return() to keep dialyzer happy --spec(init/4 :: (pid(), pid(), pid(), start_heartbeat_fun()) -> no_return()). +-spec(init/4 :: (pid(), pid(), pid(), rabbit_heartbeat:start_heartbeat_fun()) + -> no_return()). -spec(start_connection/7 :: - (pid(), pid(), pid(), start_heartbeat_fun(), any(), + (pid(), pid(), pid(), rabbit_heartbeat:start_heartbeat_fun(), any(), rabbit_net:socket(), fun ((rabbit_net:socket()) -> rabbit_types:ok_or_error2( @@ -771,7 +768,19 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, not_allowed, "frame_max=~w > ~w max size", [FrameMax, ?FRAME_MAX]); true -> - Heartbeater = SHF(Sock, ClientHeartbeat), + SendFun = + fun() -> + Frame = rabbit_binary_generator:build_heartbeat_frame(), + catch rabbit_net:send(Sock, Frame) + end, + + Parent = self(), + ReceiveFun = + fun() -> + Parent ! timeout + end, + Heartbeater = SHF(Sock, ClientHeartbeat, SendFun, + ClientHeartbeat, ReceiveFun), State#v1{connection_state = opening, connection = Connection#connection{ timeout_sec = ClientHeartbeat, |