summaryrefslogtreecommitdiff
path: root/src/rabbit_heartbeat.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_heartbeat.erl')
-rw-r--r--src/rabbit_heartbeat.erl59
1 files changed, 31 insertions, 28 deletions
diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl
index ab50c28c..a9945af1 100644
--- a/src/rabbit_heartbeat.erl
+++ b/src/rabbit_heartbeat.erl
@@ -31,16 +31,26 @@
-module(rabbit_heartbeat).
--export([start_heartbeat/2, pause_monitor/1, resume_monitor/1]).
+-export([start_heartbeat_sender/3, start_heartbeat_receiver/3,
+ pause_monitor/1, resume_monitor/1]).
+
+-include("rabbit.hrl").
%%----------------------------------------------------------------------------
-ifdef(use_specs).
+-export_type([heartbeaters/0]).
+
-type(heartbeaters() :: rabbit_types:maybe({pid(), pid()})).
--spec(start_heartbeat/2 :: (rabbit_net:socket(), non_neg_integer()) ->
- heartbeaters()).
+-spec(start_heartbeat_sender/3 ::
+ (pid(), rabbit_net:socket(), non_neg_integer()) ->
+ rabbit_types:ok(pid())).
+-spec(start_heartbeat_receiver/3 ::
+ (pid(), rabbit_net:socket(), non_neg_integer()) ->
+ rabbit_types:ok(pid())).
+
-spec(pause_monitor/1 :: (heartbeaters()) -> 'ok').
-spec(resume_monitor/1 :: (heartbeaters()) -> 'ok').
@@ -48,27 +58,26 @@
%%----------------------------------------------------------------------------
-start_heartbeat(_Sock, 0) ->
- none;
-start_heartbeat(Sock, TimeoutSec) ->
- Parent = self(),
+start_heartbeat_sender(_Parent, Sock, TimeoutSec) ->
%% the 'div 2' is there so that we don't end up waiting for nearly
%% 2 * TimeoutSec before sending a heartbeat in the boundary case
%% where the last message was sent just after a heartbeat.
- Sender = heartbeater({Sock, TimeoutSec * 1000 div 2, send_oct, 0,
- fun () ->
- catch rabbit_net:send(Sock, rabbit_binary_generator:build_heartbeat_frame()),
- continue
- end}, Parent),
+ heartbeater(
+ {Sock, TimeoutSec * 1000 div 2, send_oct, 0,
+ fun () ->
+ catch rabbit_net:send(
+ Sock, rabbit_binary_generator:build_heartbeat_frame()),
+ continue
+ end}).
+
+start_heartbeat_receiver(Parent, Sock, TimeoutSec) ->
%% we check for incoming data every interval, and time out after
%% two checks with no change. As a result we will time out between
%% 2 and 3 intervals after the last data has been received.
- Receiver = heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1,
- fun () ->
- Parent ! timeout,
- stop
- end}, Parent),
- {Sender, Receiver}.
+ heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1, fun () ->
+ Parent ! timeout,
+ stop
+ end}).
pause_monitor(none) ->
ok;
@@ -84,21 +93,15 @@ resume_monitor({_Sender, Receiver}) ->
%%----------------------------------------------------------------------------
-heartbeater(Params, Parent) ->
- spawn_link(fun () -> heartbeater(Params, erlang:monitor(process, Parent),
- {0, 0})
- end).
+heartbeater(Params) ->
+ {ok, proc_lib:spawn_link(fun () -> heartbeater(Params, {0, 0}) end)}.
heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params,
- MonitorRef, {StatVal, SameCount}) ->
- Recurse = fun (V) -> heartbeater(Params, MonitorRef, V) end,
+ {StatVal, SameCount}) ->
+ Recurse = fun (V) -> heartbeater(Params, V) end,
receive
- {'DOWN', MonitorRef, process, _Object, _Info} ->
- ok;
pause ->
receive
- {'DOWN', MonitorRef, process, _Object, _Info} ->
- ok;
resume ->
Recurse({0, 0});
Other ->