summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRob Harrop <rob@rabbitmq.com>2010-11-10 19:00:27 +0000
committerRob Harrop <rob@rabbitmq.com>2010-11-10 19:00:27 +0000
commit63fab0f6cb17797c5effe58f5abcca2f87ec4a6c (patch)
tree78f17331238fce1879cb892c974d7cd3472b212e
parent1298a4035b2d1a4791530ade8a586e22c4f48b1e (diff)
downloadrabbitmq-server-63fab0f6cb17797c5effe58f5abcca2f87ec4a6c.tar.gz
Reworked heartbeating so that it really works again and so we can specify different timeouts for send/receive
-rw-r--r--src/rabbit_connection_sup.erl14
-rw-r--r--src/rabbit_heartbeat.erl27
-rw-r--r--src/rabbit_reader.erl14
3 files changed, 29 insertions, 26 deletions
diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl
index bb5ed916..22742fa9 100644
--- a/src/rabbit_connection_sup.erl
+++ b/src/rabbit_connection_sup.erl
@@ -66,7 +66,8 @@ start_link() ->
supervisor2:start_child(
SupPid,
{reader, {rabbit_reader, start_link,
- [ChannelSupSupPid, Collector, start_heartbeat_fun(SupPid)]},
+ [ChannelSupSupPid, Collector,
+ rabbit_heartbeat:start_heartbeat_fun(SupPid)]},
intrinsic, ?MAX_WAIT, worker, [rabbit_reader]}),
{ok, SupPid, ReaderPid}.
@@ -78,14 +79,3 @@ reader(Pid) ->
init([]) ->
{ok, {{one_for_all, 0, 1}, []}}.
-start_heartbeat_fun(SupPid) ->
- SendFun = fun(Sock) ->
- Frame = rabbit_binary_generator:build_heartbeat_frame(),
- catch rabbit_net:send(Sock, Frame)
- end,
-
- Parent = self(),
- TimeoutFun = fun() ->
- Parent ! timeout
- end,
- rabbit_heartbeat:start_heartbeat_fun(SupPid, SendFun, TimeoutFun).
diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl
index 5f1e211e..589bf7cc 100644
--- a/src/rabbit_heartbeat.erl
+++ b/src/rabbit_heartbeat.erl
@@ -32,7 +32,7 @@
-module(rabbit_heartbeat).
-export([start_heartbeat_sender/3, start_heartbeat_receiver/3,
- start_heartbeat_fun/3, pause_monitor/1, resume_monitor/1]).
+ start_heartbeat_fun/1, pause_monitor/1, resume_monitor/1]).
-include("rabbit.hrl").
@@ -45,21 +45,22 @@
-type(heartbeaters() :: {rabbit_types:maybe(pid()), rabbit_types:maybe(pid())}).
--type(send_fun() :: fun ((rabbit_net:socket()) -> any())).
--type(timeout_fun() :: fun (() -> any())).
+-type(heartbeat_callback() :: fun (() -> any())).
+
-type(start_heartbeat_fun() ::
- fun((rabbit_net:socket(), non_neg_integer(), non_neg_integer()) ->
+ fun((rabbit_net:socket(), non_neg_integer(), heartbeat_callback(),
+ non_neg_integer(), heartbeat_callback()) ->
no_return())).
-spec(start_heartbeat_sender/3 ::
- (rabbit_net:socket(), non_neg_integer(), send_fun()) ->
+ (rabbit_net:socket(), non_neg_integer(), heartbeat_callback()) ->
rabbit_types:ok(pid())).
-spec(start_heartbeat_receiver/3 ::
- (rabbit_net:socket(), non_neg_integer(), timeout_fun()) ->
+ (rabbit_net:socket(), non_neg_integer(), heartbeat_callback()) ->
rabbit_types:ok(pid())).
--spec(start_heartbeat_fun/3 ::
- (pid(), send_fun(), timeout_fun()) -> start_heartbeat_fun()).
+-spec(start_heartbeat_fun/1 ::
+ (pid()) -> start_heartbeat_fun()).
-spec(pause_monitor/1 :: (heartbeaters()) -> 'ok').
@@ -76,21 +77,21 @@ start_heartbeat_sender(Sock, TimeoutSec, SendFun) ->
heartbeater(
{Sock, TimeoutSec * 1000 div 2, send_oct, 0,
fun () ->
- SendFun(Sock),
+ SendFun(),
continue
end}).
-start_heartbeat_receiver(Sock, TimeoutSec, TimeoutFun) ->
+start_heartbeat_receiver(Sock, TimeoutSec, ReceiveFun) ->
%% we check for incoming data every interval, and time out after
%% two checks with no change. As a result we will time out between
%% 2 and 3 intervals after the last data has been received.
heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1, fun () ->
- TimeoutFun(),
+ ReceiveFun(),
stop
end}).
-start_heartbeat_fun(SupPid, SendFun, ReceiveFun) ->
- fun (Sock, SendTimeoutSec, ReceiveTimeoutSec) ->
+start_heartbeat_fun(SupPid) ->
+ fun (Sock, SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) ->
{ok, Sender} =
start_heartbeater(SendTimeoutSec, SupPid, Sock,
SendFun, heartbeat_sender,
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 54e51600..c40e02b8 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -768,7 +768,19 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
not_allowed, "frame_max=~w > ~w max size",
[FrameMax, ?FRAME_MAX]);
true ->
- Heartbeater = SHF(Sock, ClientHeartbeat, ClientHeartbeat),
+ SendFun =
+ fun() ->
+ Frame = rabbit_binary_generator:build_heartbeat_frame(),
+ catch rabbit_net:send(Sock, Frame)
+ end,
+
+ Parent = self(),
+ ReceiveFun =
+ fun() ->
+ Parent ! timeout
+ end,
+ Heartbeater = SHF(Sock, ClientHeartbeat, SendFun,
+ ClientHeartbeat, ReceiveFun),
State#v1{connection_state = opening,
connection = Connection#connection{
timeout_sec = ClientHeartbeat,