diff options
author | Rob Harrop <rob@rabbitmq.com> | 2010-11-10 15:39:12 +0000 |
---|---|---|
committer | Rob Harrop <rob@rabbitmq.com> | 2010-11-10 15:39:12 +0000 |
commit | 1298a4035b2d1a4791530ade8a586e22c4f48b1e (patch) | |
tree | 344176129361cae482c659c61001cc7ea19e5a92 | |
parent | fcf0ec3a214a97da1d4fee3f02bd730ed73c6840 (diff) | |
download | rabbitmq-server-1298a4035b2d1a4791530ade8a586e22c4f48b1e.tar.gz |
Reworked start_heartbeat_fun to allow a different timeout for send/receive heartbeats
-rw-r--r-- | src/rabbit_heartbeat.erl | 44 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 13 |
2 files changed, 29 insertions, 28 deletions
diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl index ebdfbdc6..5f1e211e 100644 --- a/src/rabbit_heartbeat.erl +++ b/src/rabbit_heartbeat.erl @@ -41,11 +41,15 @@ -ifdef(use_specs). -export_type([heartbeaters/0]). +-export_type([start_heartbeat_fun/0]). --type(heartbeaters() :: rabbit_types:maybe({pid(), pid()})). +-type(heartbeaters() :: {rabbit_types:maybe(pid()), rabbit_types:maybe(pid())}). -type(send_fun() :: fun ((rabbit_net:socket()) -> any())). -type(timeout_fun() :: fun (() -> any())). +-type(start_heartbeat_fun() :: + fun((rabbit_net:socket(), non_neg_integer(), non_neg_integer()) -> + no_return())). -spec(start_heartbeat_sender/3 :: (rabbit_net:socket(), non_neg_integer(), send_fun()) -> @@ -55,9 +59,7 @@ rabbit_types:ok(pid())). -spec(start_heartbeat_fun/3 :: - (pid(), send_fun(), timeout_fun()) -> - fun((rabbit_net:socket(), non_neg_integer()) - -> heartbeaters())). + (pid(), send_fun(), timeout_fun()) -> start_heartbeat_fun()). -spec(pause_monitor/1 :: (heartbeaters()) -> 'ok'). @@ -87,38 +89,40 @@ start_heartbeat_receiver(Sock, TimeoutSec, TimeoutFun) -> stop end}). -start_heartbeat_fun(SupPid, SendFun, TimeoutFun) -> - fun (_Sock, 0) -> - none; - (Sock, TimeoutSec) -> +start_heartbeat_fun(SupPid, SendFun, ReceiveFun) -> + fun (Sock, SendTimeoutSec, ReceiveTimeoutSec) -> {ok, Sender} = - supervisor2:start_child( - SupPid, {heartbeat_sender, - {rabbit_heartbeat, start_heartbeat_sender, - [Sock, TimeoutSec, SendFun]}, - transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}), + start_heartbeater(SendTimeoutSec, SupPid, Sock, + SendFun, heartbeat_sender, + start_heartbeat_sender), {ok, Receiver} = - supervisor2:start_child( - SupPid, {heartbeat_receiver, - {rabbit_heartbeat, start_heartbeat_receiver, - [Sock, TimeoutSec, TimeoutFun]}, - transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}), + start_heartbeater(ReceiveTimeoutSec, SupPid, Sock, + ReceiveFun, heartbeat_receiver, + start_heartbeat_receiver), {Sender, Receiver} end. -pause_monitor(none) -> +pause_monitor({_Sender, none}) -> ok; pause_monitor({_Sender, Receiver}) -> Receiver ! pause, ok. -resume_monitor(none) -> +resume_monitor({_Sender, none}) -> ok; resume_monitor({_Sender, Receiver}) -> Receiver ! resume, ok. %%---------------------------------------------------------------------------- +start_heartbeater(0, _SupPid, _Sock, _TimeoutFun, _Name, _Callback) -> + {ok, none}; +start_heartbeater(TimeoutSec, SupPid, Sock, TimeoutFun, Name, Callback) -> + supervisor2:start_child( + SupPid, {Name, + {rabbit_heartbeat, Callback, + [Sock, TimeoutSec, TimeoutFun]}, + transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}). heartbeater(Params) -> {ok, proc_lib:spawn_link(fun () -> heartbeater(Params, {0, 0}) end)}. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 127467bb..54e51600 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -162,11 +162,7 @@ -ifdef(use_specs). --type(start_heartbeat_fun() :: - fun ((rabbit_networking:socket(), non_neg_integer()) -> - rabbit_heartbeat:heartbeaters())). - --spec(start_link/3 :: (pid(), pid(), start_heartbeat_fun()) -> +-spec(start_link/3 :: (pid(), pid(), rabbit_heartbeat:start_heartbeat_fun()) -> rabbit_types:ok(pid())). -spec(info_keys/0 :: () -> [rabbit_types:info_key()]). -spec(info/1 :: (pid()) -> [rabbit_types:info()]). @@ -177,9 +173,10 @@ -spec(server_properties/0 :: () -> rabbit_framing:amqp_table()). %% These specs only exists to add no_return() to keep dialyzer happy --spec(init/4 :: (pid(), pid(), pid(), start_heartbeat_fun()) -> no_return()). +-spec(init/4 :: (pid(), pid(), pid(), rabbit_heartbeat:start_heartbeat_fun()) + -> no_return()). -spec(start_connection/7 :: - (pid(), pid(), pid(), start_heartbeat_fun(), any(), + (pid(), pid(), pid(), rabbit_heartbeat:start_heartbeat_fun(), any(), rabbit_networking:socket(), fun ((rabbit_networking:socket()) -> rabbit_types:ok_or_error2( @@ -771,7 +768,7 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, not_allowed, "frame_max=~w > ~w max size", [FrameMax, ?FRAME_MAX]); true -> - Heartbeater = SHF(Sock, ClientHeartbeat), + Heartbeater = SHF(Sock, ClientHeartbeat, ClientHeartbeat), State#v1{connection_state = opening, connection = Connection#connection{ timeout_sec = ClientHeartbeat, |