diff options
author | Rob Harrop <rob@rabbitmq.com> | 2010-11-10 19:00:27 +0000 |
---|---|---|
committer | Rob Harrop <rob@rabbitmq.com> | 2010-11-10 19:00:27 +0000 |
commit | 63fab0f6cb17797c5effe58f5abcca2f87ec4a6c (patch) | |
tree | 78f17331238fce1879cb892c974d7cd3472b212e | |
parent | 1298a4035b2d1a4791530ade8a586e22c4f48b1e (diff) | |
download | rabbitmq-server-63fab0f6cb17797c5effe58f5abcca2f87ec4a6c.tar.gz |
Reworked heartbeating so that it really works again and so we can specify different timeouts for send/receive
-rw-r--r-- | src/rabbit_connection_sup.erl | 14 | ||||
-rw-r--r-- | src/rabbit_heartbeat.erl | 27 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 14 |
3 files changed, 29 insertions, 26 deletions
diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl index bb5ed916..22742fa9 100644 --- a/src/rabbit_connection_sup.erl +++ b/src/rabbit_connection_sup.erl @@ -66,7 +66,8 @@ start_link() -> supervisor2:start_child( SupPid, {reader, {rabbit_reader, start_link, - [ChannelSupSupPid, Collector, start_heartbeat_fun(SupPid)]}, + [ChannelSupSupPid, Collector, + rabbit_heartbeat:start_heartbeat_fun(SupPid)]}, intrinsic, ?MAX_WAIT, worker, [rabbit_reader]}), {ok, SupPid, ReaderPid}. @@ -78,14 +79,3 @@ reader(Pid) -> init([]) -> {ok, {{one_for_all, 0, 1}, []}}. -start_heartbeat_fun(SupPid) -> - SendFun = fun(Sock) -> - Frame = rabbit_binary_generator:build_heartbeat_frame(), - catch rabbit_net:send(Sock, Frame) - end, - - Parent = self(), - TimeoutFun = fun() -> - Parent ! timeout - end, - rabbit_heartbeat:start_heartbeat_fun(SupPid, SendFun, TimeoutFun). diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl index 5f1e211e..589bf7cc 100644 --- a/src/rabbit_heartbeat.erl +++ b/src/rabbit_heartbeat.erl @@ -32,7 +32,7 @@ -module(rabbit_heartbeat). -export([start_heartbeat_sender/3, start_heartbeat_receiver/3, - start_heartbeat_fun/3, pause_monitor/1, resume_monitor/1]). + start_heartbeat_fun/1, pause_monitor/1, resume_monitor/1]). -include("rabbit.hrl"). @@ -45,21 +45,22 @@ -type(heartbeaters() :: {rabbit_types:maybe(pid()), rabbit_types:maybe(pid())}). --type(send_fun() :: fun ((rabbit_net:socket()) -> any())). --type(timeout_fun() :: fun (() -> any())). +-type(heartbeat_callback() :: fun (() -> any())). + -type(start_heartbeat_fun() :: - fun((rabbit_net:socket(), non_neg_integer(), non_neg_integer()) -> + fun((rabbit_net:socket(), non_neg_integer(), heartbeat_callback(), + non_neg_integer(), heartbeat_callback()) -> no_return())). -spec(start_heartbeat_sender/3 :: - (rabbit_net:socket(), non_neg_integer(), send_fun()) -> + (rabbit_net:socket(), non_neg_integer(), heartbeat_callback()) -> rabbit_types:ok(pid())). -spec(start_heartbeat_receiver/3 :: - (rabbit_net:socket(), non_neg_integer(), timeout_fun()) -> + (rabbit_net:socket(), non_neg_integer(), heartbeat_callback()) -> rabbit_types:ok(pid())). --spec(start_heartbeat_fun/3 :: - (pid(), send_fun(), timeout_fun()) -> start_heartbeat_fun()). +-spec(start_heartbeat_fun/1 :: + (pid()) -> start_heartbeat_fun()). -spec(pause_monitor/1 :: (heartbeaters()) -> 'ok'). @@ -76,21 +77,21 @@ start_heartbeat_sender(Sock, TimeoutSec, SendFun) -> heartbeater( {Sock, TimeoutSec * 1000 div 2, send_oct, 0, fun () -> - SendFun(Sock), + SendFun(), continue end}). -start_heartbeat_receiver(Sock, TimeoutSec, TimeoutFun) -> +start_heartbeat_receiver(Sock, TimeoutSec, ReceiveFun) -> %% we check for incoming data every interval, and time out after %% two checks with no change. As a result we will time out between %% 2 and 3 intervals after the last data has been received. heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1, fun () -> - TimeoutFun(), + ReceiveFun(), stop end}). -start_heartbeat_fun(SupPid, SendFun, ReceiveFun) -> - fun (Sock, SendTimeoutSec, ReceiveTimeoutSec) -> +start_heartbeat_fun(SupPid) -> + fun (Sock, SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) -> {ok, Sender} = start_heartbeater(SendTimeoutSec, SupPid, Sock, SendFun, heartbeat_sender, diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 54e51600..c40e02b8 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -768,7 +768,19 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, not_allowed, "frame_max=~w > ~w max size", [FrameMax, ?FRAME_MAX]); true -> - Heartbeater = SHF(Sock, ClientHeartbeat, ClientHeartbeat), + SendFun = + fun() -> + Frame = rabbit_binary_generator:build_heartbeat_frame(), + catch rabbit_net:send(Sock, Frame) + end, + + Parent = self(), + ReceiveFun = + fun() -> + Parent ! timeout + end, + Heartbeater = SHF(Sock, ClientHeartbeat, SendFun, + ClientHeartbeat, ReceiveFun), State#v1{connection_state = opening, connection = Connection#connection{ timeout_sec = ClientHeartbeat, |