summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMarek Majkowski <marek@rabbitmq.com>2012-02-17 14:09:24 +0000
committerMarek Majkowski <marek@rabbitmq.com>2012-02-17 14:09:24 +0000
commitd22384f137a5a43dfe0838528c9041632de0da73 (patch)
tree620863bf557501a3a4df6e6f8a25e3c884609b84
parentee9f6d3f80103ba9d6a9692aebfe3202ce845407 (diff)
downloadrabbitmq-server-bug24749.tar.gz
Let's try to abstract rabbit_net call, the minimalistic way.bug24749
-rw-r--r--src/rabbit_heartbeat.erl32
1 files changed, 20 insertions, 12 deletions
diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl
index 80b4e768..97c45008 100644
--- a/src/rabbit_heartbeat.erl
+++ b/src/rabbit_heartbeat.erl
@@ -55,34 +55,42 @@
%%----------------------------------------------------------------------------
-start_heartbeat_sender(Sock, TimeoutSec, SendFun) ->
+start_heartbeat_sender(GetStat, TimeoutSec, SendFun) ->
%% the 'div 2' is there so that we don't end up waiting for nearly
%% 2 * TimeoutSec before sending a heartbeat in the boundary case
%% where the last message was sent just after a heartbeat.
heartbeater(
- {Sock, TimeoutSec * 1000 div 2, send_oct, 0,
+ {GetStat, TimeoutSec * 1000 div 2, send_oct, 0,
fun () ->
SendFun(),
continue
end}).
-start_heartbeat_receiver(Sock, TimeoutSec, ReceiveFun) ->
+start_heartbeat_receiver(GetStat, TimeoutSec, ReceiveFun) ->
%% we check for incoming data every interval, and time out after
%% two checks with no change. As a result we will time out between
%% 2 and 3 intervals after the last data has been received.
- heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1, fun () ->
+ heartbeater({GetStat, TimeoutSec * 1000, recv_oct, 1, fun () ->
ReceiveFun(),
stop
end}).
start_heartbeat_fun(SupPid) ->
- fun (Sock, SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) ->
+ fun (SockOrGetStat, SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) ->
+ GetStat = case SockOrGetStat of
+ GetStatFun when is_function(GetStatFun) ->
+ GetStatFun;
+ Sock ->
+ fun (Stat) ->
+ rabbit_net:getstat(Sock, [Stat])
+ end
+ end,
{ok, Sender} =
- start_heartbeater(SendTimeoutSec, SupPid, Sock,
+ start_heartbeater(SendTimeoutSec, SupPid, GetStat,
SendFun, heartbeat_sender,
start_heartbeat_sender),
{ok, Receiver} =
- start_heartbeater(ReceiveTimeoutSec, SupPid, Sock,
+ start_heartbeater(ReceiveTimeoutSec, SupPid, GetStat,
ReceiveFun, heartbeat_receiver,
start_heartbeat_receiver),
{Sender, Receiver}
@@ -101,19 +109,19 @@ resume_monitor({_Sender, Receiver}) ->
ok.
%%----------------------------------------------------------------------------
-start_heartbeater(0, _SupPid, _Sock, _TimeoutFun, _Name, _Callback) ->
+start_heartbeater(0, _SupPid, _GetStat, _TimeoutFun, _Name, _Callback) ->
{ok, none};
-start_heartbeater(TimeoutSec, SupPid, Sock, TimeoutFun, Name, Callback) ->
+start_heartbeater(TimeoutSec, SupPid, GetStat, TimeoutFun, Name, Callback) ->
supervisor2:start_child(
SupPid, {Name,
{rabbit_heartbeat, Callback,
- [Sock, TimeoutSec, TimeoutFun]},
+ [GetStat, TimeoutSec, TimeoutFun]},
transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}).
heartbeater(Params) ->
{ok, proc_lib:spawn_link(fun () -> heartbeater(Params, {0, 0}) end)}.
-heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params,
+heartbeater({GetStat, TimeoutMillisec, StatName, Threshold, Handler} = Params,
{StatVal, SameCount}) ->
Recurse = fun (V) -> heartbeater(Params, V) end,
receive
@@ -127,7 +135,7 @@ heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params,
Other ->
exit({unexpected_message, Other})
after TimeoutMillisec ->
- case rabbit_net:getstat(Sock, [StatName]) of
+ case GetStat(StatName) of
{ok, [{StatName, NewStatVal}]} ->
if NewStatVal =/= StatVal ->
Recurse({NewStatVal, 0});