diff options
Diffstat (limited to 'src/rabbit_heartbeat.erl')
-rw-r--r-- | src/rabbit_heartbeat.erl | 59 |
1 files changed, 31 insertions, 28 deletions
diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl index ab50c28c..a9945af1 100644 --- a/src/rabbit_heartbeat.erl +++ b/src/rabbit_heartbeat.erl @@ -31,16 +31,26 @@ -module(rabbit_heartbeat). --export([start_heartbeat/2, pause_monitor/1, resume_monitor/1]). +-export([start_heartbeat_sender/3, start_heartbeat_receiver/3, + pause_monitor/1, resume_monitor/1]). + +-include("rabbit.hrl"). %%---------------------------------------------------------------------------- -ifdef(use_specs). +-export_type([heartbeaters/0]). + -type(heartbeaters() :: rabbit_types:maybe({pid(), pid()})). --spec(start_heartbeat/2 :: (rabbit_net:socket(), non_neg_integer()) -> - heartbeaters()). +-spec(start_heartbeat_sender/3 :: + (pid(), rabbit_net:socket(), non_neg_integer()) -> + rabbit_types:ok(pid())). +-spec(start_heartbeat_receiver/3 :: + (pid(), rabbit_net:socket(), non_neg_integer()) -> + rabbit_types:ok(pid())). + -spec(pause_monitor/1 :: (heartbeaters()) -> 'ok'). -spec(resume_monitor/1 :: (heartbeaters()) -> 'ok'). @@ -48,27 +58,26 @@ %%---------------------------------------------------------------------------- -start_heartbeat(_Sock, 0) -> - none; -start_heartbeat(Sock, TimeoutSec) -> - Parent = self(), +start_heartbeat_sender(_Parent, Sock, TimeoutSec) -> %% the 'div 2' is there so that we don't end up waiting for nearly %% 2 * TimeoutSec before sending a heartbeat in the boundary case %% where the last message was sent just after a heartbeat. - Sender = heartbeater({Sock, TimeoutSec * 1000 div 2, send_oct, 0, - fun () -> - catch rabbit_net:send(Sock, rabbit_binary_generator:build_heartbeat_frame()), - continue - end}, Parent), + heartbeater( + {Sock, TimeoutSec * 1000 div 2, send_oct, 0, + fun () -> + catch rabbit_net:send( + Sock, rabbit_binary_generator:build_heartbeat_frame()), + continue + end}). + +start_heartbeat_receiver(Parent, Sock, TimeoutSec) -> %% we check for incoming data every interval, and time out after %% two checks with no change. As a result we will time out between %% 2 and 3 intervals after the last data has been received. - Receiver = heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1, - fun () -> - Parent ! timeout, - stop - end}, Parent), - {Sender, Receiver}. + heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1, fun () -> + Parent ! timeout, + stop + end}). pause_monitor(none) -> ok; @@ -84,21 +93,15 @@ resume_monitor({_Sender, Receiver}) -> %%---------------------------------------------------------------------------- -heartbeater(Params, Parent) -> - spawn_link(fun () -> heartbeater(Params, erlang:monitor(process, Parent), - {0, 0}) - end). +heartbeater(Params) -> + {ok, proc_lib:spawn_link(fun () -> heartbeater(Params, {0, 0}) end)}. heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params, - MonitorRef, {StatVal, SameCount}) -> - Recurse = fun (V) -> heartbeater(Params, MonitorRef, V) end, + {StatVal, SameCount}) -> + Recurse = fun (V) -> heartbeater(Params, V) end, receive - {'DOWN', MonitorRef, process, _Object, _Info} -> - ok; pause -> receive - {'DOWN', MonitorRef, process, _Object, _Info} -> - ok; resume -> Recurse({0, 0}); Other -> |