From d22384f137a5a43dfe0838528c9041632de0da73 Mon Sep 17 00:00:00 2001 From: Marek Majkowski Date: Fri, 17 Feb 2012 14:09:24 +0000 Subject: Let's try to abstract rabbit_net call, the minimalistic way. --- src/rabbit_heartbeat.erl | 32 ++++++++++++++++++++------------ 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl index 80b4e768..97c45008 100644 --- a/src/rabbit_heartbeat.erl +++ b/src/rabbit_heartbeat.erl @@ -55,34 +55,42 @@ %%---------------------------------------------------------------------------- -start_heartbeat_sender(Sock, TimeoutSec, SendFun) -> +start_heartbeat_sender(GetStat, TimeoutSec, SendFun) -> %% the 'div 2' is there so that we don't end up waiting for nearly %% 2 * TimeoutSec before sending a heartbeat in the boundary case %% where the last message was sent just after a heartbeat. heartbeater( - {Sock, TimeoutSec * 1000 div 2, send_oct, 0, + {GetStat, TimeoutSec * 1000 div 2, send_oct, 0, fun () -> SendFun(), continue end}). -start_heartbeat_receiver(Sock, TimeoutSec, ReceiveFun) -> +start_heartbeat_receiver(GetStat, TimeoutSec, ReceiveFun) -> %% we check for incoming data every interval, and time out after %% two checks with no change. As a result we will time out between %% 2 and 3 intervals after the last data has been received. - heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1, fun () -> + heartbeater({GetStat, TimeoutSec * 1000, recv_oct, 1, fun () -> ReceiveFun(), stop end}). start_heartbeat_fun(SupPid) -> - fun (Sock, SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) -> + fun (SockOrGetStat, SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) -> + GetStat = case SockOrGetStat of + GetStatFun when is_function(GetStatFun) -> + GetStatFun; + Sock -> + fun (Stat) -> + rabbit_net:getstat(Sock, [Stat]) + end + end, {ok, Sender} = - start_heartbeater(SendTimeoutSec, SupPid, Sock, + start_heartbeater(SendTimeoutSec, SupPid, GetStat, SendFun, heartbeat_sender, start_heartbeat_sender), {ok, Receiver} = - start_heartbeater(ReceiveTimeoutSec, SupPid, Sock, + start_heartbeater(ReceiveTimeoutSec, SupPid, GetStat, ReceiveFun, heartbeat_receiver, start_heartbeat_receiver), {Sender, Receiver} @@ -101,19 +109,19 @@ resume_monitor({_Sender, Receiver}) -> ok. %%---------------------------------------------------------------------------- -start_heartbeater(0, _SupPid, _Sock, _TimeoutFun, _Name, _Callback) -> +start_heartbeater(0, _SupPid, _GetStat, _TimeoutFun, _Name, _Callback) -> {ok, none}; -start_heartbeater(TimeoutSec, SupPid, Sock, TimeoutFun, Name, Callback) -> +start_heartbeater(TimeoutSec, SupPid, GetStat, TimeoutFun, Name, Callback) -> supervisor2:start_child( SupPid, {Name, {rabbit_heartbeat, Callback, - [Sock, TimeoutSec, TimeoutFun]}, + [GetStat, TimeoutSec, TimeoutFun]}, transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}). heartbeater(Params) -> {ok, proc_lib:spawn_link(fun () -> heartbeater(Params, {0, 0}) end)}. -heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params, +heartbeater({GetStat, TimeoutMillisec, StatName, Threshold, Handler} = Params, {StatVal, SameCount}) -> Recurse = fun (V) -> heartbeater(Params, V) end, receive @@ -127,7 +135,7 @@ heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params, Other -> exit({unexpected_message, Other}) after TimeoutMillisec -> - case rabbit_net:getstat(Sock, [StatName]) of + case GetStat(StatName) of {ok, [{StatName, NewStatVal}]} -> if NewStatVal =/= StatVal -> Recurse({NewStatVal, 0}); -- cgit v1.2.1