diff options
Diffstat (limited to 'src/rabbit_heartbeat.erl')
-rw-r--r-- | src/rabbit_heartbeat.erl | 70 |
1 files changed, 39 insertions, 31 deletions
diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl index 80b4e768..fac74edb 100644 --- a/src/rabbit_heartbeat.erl +++ b/src/rabbit_heartbeat.erl @@ -10,8 +10,8 @@ %% %% The Original Code is RabbitMQ. %% -%% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved. %% -module(rabbit_heartbeat). @@ -19,6 +19,8 @@ -export([start_heartbeat_sender/3, start_heartbeat_receiver/3, start_heartbeat_fun/1, pause_monitor/1, resume_monitor/1]). +-export([system_continue/3, system_terminate/4, system_code_change/4]). + -include("rabbit.hrl"). %%---------------------------------------------------------------------------- @@ -51,6 +53,10 @@ -spec(pause_monitor/1 :: (heartbeaters()) -> 'ok'). -spec(resume_monitor/1 :: (heartbeaters()) -> 'ok'). +-spec(system_code_change/4 :: (_,_,_,_) -> {'ok',_}). +-spec(system_continue/3 :: (_,_,{_, _}) -> any()). +-spec(system_terminate/4 :: (_,_,_,_) -> none()). + -endif. %%---------------------------------------------------------------------------- @@ -59,21 +65,15 @@ start_heartbeat_sender(Sock, TimeoutSec, SendFun) -> %% the 'div 2' is there so that we don't end up waiting for nearly %% 2 * TimeoutSec before sending a heartbeat in the boundary case %% where the last message was sent just after a heartbeat. - heartbeater( - {Sock, TimeoutSec * 1000 div 2, send_oct, 0, - fun () -> - SendFun(), - continue - end}). + heartbeater({Sock, TimeoutSec * 1000 div 2, send_oct, 0, + fun () -> SendFun(), continue end}). start_heartbeat_receiver(Sock, TimeoutSec, ReceiveFun) -> %% we check for incoming data every interval, and time out after %% two checks with no change. As a result we will time out between %% 2 and 3 intervals after the last data has been received. - heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1, fun () -> - ReceiveFun(), - stop - end}). + heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1, + fun () -> ReceiveFun(), stop end}). start_heartbeat_fun(SupPid) -> fun (Sock, SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) -> @@ -88,17 +88,20 @@ start_heartbeat_fun(SupPid) -> {Sender, Receiver} end. -pause_monitor({_Sender, none}) -> - ok; -pause_monitor({_Sender, Receiver}) -> - Receiver ! pause, - ok. +pause_monitor({_Sender, none}) -> ok; +pause_monitor({_Sender, Receiver}) -> Receiver ! pause, ok. + +resume_monitor({_Sender, none}) -> ok; +resume_monitor({_Sender, Receiver}) -> Receiver ! resume, ok. + +system_continue(_Parent, Deb, {Params, State}) -> + heartbeater(Params, Deb, State). + +system_terminate(Reason, _Parent, _Deb, _State) -> + exit(Reason). -resume_monitor({_Sender, none}) -> - ok; -resume_monitor({_Sender, Receiver}) -> - Receiver ! resume, - ok. +system_code_change(Misc, _Module, _OldVsn, _Extra) -> + {ok, Misc}. %%---------------------------------------------------------------------------- start_heartbeater(0, _SupPid, _Sock, _TimeoutFun, _Name, _Callback) -> @@ -106,24 +109,29 @@ start_heartbeater(0, _SupPid, _Sock, _TimeoutFun, _Name, _Callback) -> start_heartbeater(TimeoutSec, SupPid, Sock, TimeoutFun, Name, Callback) -> supervisor2:start_child( SupPid, {Name, - {rabbit_heartbeat, Callback, - [Sock, TimeoutSec, TimeoutFun]}, + {rabbit_heartbeat, Callback, [Sock, TimeoutSec, TimeoutFun]}, transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}). heartbeater(Params) -> - {ok, proc_lib:spawn_link(fun () -> heartbeater(Params, {0, 0}) end)}. + Deb = sys:debug_options([]), + {ok, proc_lib:spawn_link(fun () -> heartbeater(Params, Deb, {0, 0}) end)}. heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params, - {StatVal, SameCount}) -> - Recurse = fun (V) -> heartbeater(Params, V) end, + Deb, {StatVal, SameCount} = State) -> + Recurse = fun (State1) -> heartbeater(Params, Deb, State1) end, + System = fun (From, Req) -> + sys:handle_system_msg( + Req, From, self(), ?MODULE, Deb, {Params, State}) + end, receive pause -> receive - resume -> - Recurse({0, 0}); - Other -> - exit({unexpected_message, Other}) + resume -> Recurse({0, 0}); + {system, From, Req} -> System(From, Req); + Other -> exit({unexpected_message, Other}) end; + {system, From, Req} -> + System(From, Req); Other -> exit({unexpected_message, Other}) after TimeoutMillisec -> |