diff options
author | Marek Majkowski <marek@rabbitmq.com> | 2012-02-17 14:09:24 +0000 |
---|---|---|
committer | Marek Majkowski <marek@rabbitmq.com> | 2012-02-17 14:09:24 +0000 |
commit | d22384f137a5a43dfe0838528c9041632de0da73 (patch) | |
tree | 620863bf557501a3a4df6e6f8a25e3c884609b84 | |
parent | ee9f6d3f80103ba9d6a9692aebfe3202ce845407 (diff) | |
download | rabbitmq-server-bug24749.tar.gz |
Let's try to abstract rabbit_net call, the minimalistic way.bug24749
-rw-r--r-- | src/rabbit_heartbeat.erl | 32 |
1 files changed, 20 insertions, 12 deletions
diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl index 80b4e768..97c45008 100644 --- a/src/rabbit_heartbeat.erl +++ b/src/rabbit_heartbeat.erl @@ -55,34 +55,42 @@ %%---------------------------------------------------------------------------- -start_heartbeat_sender(Sock, TimeoutSec, SendFun) -> +start_heartbeat_sender(GetStat, TimeoutSec, SendFun) -> %% the 'div 2' is there so that we don't end up waiting for nearly %% 2 * TimeoutSec before sending a heartbeat in the boundary case %% where the last message was sent just after a heartbeat. heartbeater( - {Sock, TimeoutSec * 1000 div 2, send_oct, 0, + {GetStat, TimeoutSec * 1000 div 2, send_oct, 0, fun () -> SendFun(), continue end}). -start_heartbeat_receiver(Sock, TimeoutSec, ReceiveFun) -> +start_heartbeat_receiver(GetStat, TimeoutSec, ReceiveFun) -> %% we check for incoming data every interval, and time out after %% two checks with no change. As a result we will time out between %% 2 and 3 intervals after the last data has been received. - heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1, fun () -> + heartbeater({GetStat, TimeoutSec * 1000, recv_oct, 1, fun () -> ReceiveFun(), stop end}). start_heartbeat_fun(SupPid) -> - fun (Sock, SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) -> + fun (SockOrGetStat, SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) -> + GetStat = case SockOrGetStat of + GetStatFun when is_function(GetStatFun) -> + GetStatFun; + Sock -> + fun (Stat) -> + rabbit_net:getstat(Sock, [Stat]) + end + end, {ok, Sender} = - start_heartbeater(SendTimeoutSec, SupPid, Sock, + start_heartbeater(SendTimeoutSec, SupPid, GetStat, SendFun, heartbeat_sender, start_heartbeat_sender), {ok, Receiver} = - start_heartbeater(ReceiveTimeoutSec, SupPid, Sock, + start_heartbeater(ReceiveTimeoutSec, SupPid, GetStat, ReceiveFun, heartbeat_receiver, start_heartbeat_receiver), {Sender, Receiver} @@ -101,19 +109,19 @@ resume_monitor({_Sender, Receiver}) -> ok. %%---------------------------------------------------------------------------- -start_heartbeater(0, _SupPid, _Sock, _TimeoutFun, _Name, _Callback) -> +start_heartbeater(0, _SupPid, _GetStat, _TimeoutFun, _Name, _Callback) -> {ok, none}; -start_heartbeater(TimeoutSec, SupPid, Sock, TimeoutFun, Name, Callback) -> +start_heartbeater(TimeoutSec, SupPid, GetStat, TimeoutFun, Name, Callback) -> supervisor2:start_child( SupPid, {Name, {rabbit_heartbeat, Callback, - [Sock, TimeoutSec, TimeoutFun]}, + [GetStat, TimeoutSec, TimeoutFun]}, transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}). heartbeater(Params) -> {ok, proc_lib:spawn_link(fun () -> heartbeater(Params, {0, 0}) end)}. -heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params, +heartbeater({GetStat, TimeoutMillisec, StatName, Threshold, Handler} = Params, {StatVal, SameCount}) -> Recurse = fun (V) -> heartbeater(Params, V) end, receive @@ -127,7 +135,7 @@ heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params, Other -> exit({unexpected_message, Other}) after TimeoutMillisec -> - case rabbit_net:getstat(Sock, [StatName]) of + case GetStat(StatName) of {ok, [{StatName, NewStatVal}]} -> if NewStatVal =/= StatVal -> Recurse({NewStatVal, 0}); |