diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2010-07-30 10:04:56 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-07-30 10:04:56 +0100 |
commit | 16999af49edf2d184b3c4f1ef5f67e07fbeae98e (patch) | |
tree | f1fbc79a98f45a6957b34dfcfbf1de39408290fe | |
parent | be4a2b09d6b9867dda53461fe65f88eb50f33f26 (diff) | |
download | rabbitmq-server-bug23034.tar.gz |
tweaks to heartbeaterbug23034
- make it return the pids of the created processes. That gives us a useful
future extension point, e.g. for pausing/resuming
- add specs
- get rid of the Y combinator - it actually makes the code longer
-rw-r--r-- | src/rabbit_heartbeat.erl | 116 |
1 files changed, 60 insertions, 56 deletions
diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl index 45565705..1989fb7b 100644 --- a/src/rabbit_heartbeat.erl +++ b/src/rabbit_heartbeat.erl @@ -33,68 +33,72 @@ -export([start_heartbeat/2]). +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_heartbeat/2 :: (rabbit_net:socket(), non_neg_integer()) -> + rabbit_types:maybe({pid(), pid()})). + +-endif. + +%%---------------------------------------------------------------------------- + start_heartbeat(_Sock, 0) -> none; start_heartbeat(Sock, TimeoutSec) -> Parent = self(), - %% we check for incoming data every interval, and time out after - %% two checks with no change. As a result we will time out between - %% 2 and 3 intervals after the last data has been received. - spawn_link(fun () -> heartbeater(Sock, TimeoutSec * 1000, - recv_oct, 1, - fun () -> - Parent ! timeout, - stop - end, - erlang:monitor(process, Parent)) end), %% the 'div 2' is there so that we don't end up waiting for nearly %% 2 * TimeoutSec before sending a heartbeat in the boundary case %% where the last message was sent just after a heartbeat. - spawn_link(fun () -> heartbeater(Sock, TimeoutSec * 1000 div 2, - send_oct, 0, - fun () -> - catch rabbit_net:send(Sock, rabbit_binary_generator:build_heartbeat_frame()), - continue - end, - erlang:monitor(process, Parent)) end), - ok. + Sender = + spawn_link(fun () -> heartbeater({Sock, TimeoutSec * 1000 div 2, + send_oct, 0, + fun () -> + catch rabbit_net:send(Sock, rabbit_binary_generator:build_heartbeat_frame()), + continue + end}, Parent) end), + %% we check for incoming data every interval, and time out after + %% two checks with no change. As a result we will time out between + %% 2 and 3 intervals after the last data has been received. + Receiver = + spawn_link(fun () -> heartbeater({Sock, TimeoutSec * 1000, + recv_oct, 1, + fun () -> + Parent ! timeout, + stop + end}, Parent) end), + {Sender, Receiver}. -%% Y-combinator, posted by Vladimir Sekissov to the Erlang mailing list -%% http://www.erlang.org/ml-archive/erlang-questions/200301/msg00053.html -y(X) -> - F = fun (P) -> X(fun (A) -> (P(P))(A) end) end, - F(F). +heartbeater(Params, Parent) -> + heartbeater(Params, erlang:monitor(process, Parent), {0, 0}). -heartbeater(Sock, TimeoutMillisec, StatName, Threshold, Handler, MonitorRef) -> - Heartbeat = - fun (F) -> - fun ({StatVal, SameCount}) -> - receive - {'DOWN', MonitorRef, process, _Object, _Info} -> ok; - Other -> exit({unexpected_message, Other}) - after TimeoutMillisec -> - case rabbit_net:getstat(Sock, [StatName]) of - {ok, [{StatName, NewStatVal}]} -> - if NewStatVal =/= StatVal -> - F({NewStatVal, 0}); - SameCount < Threshold -> - F({NewStatVal, SameCount + 1}); - true -> - case Handler() of - stop -> ok; - continue -> F({NewStatVal, 0}) - end - end; - {error, einval} -> - %% the socket is dead, most - %% likely because the - %% connection is being shut - %% down -> terminate - ok; - {error, Reason} -> - exit({cannot_get_socket_stats, Reason}) - end - end - end - end, - (y(Heartbeat))({0, 0}). +heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params, + MonitorRef, {StatVal, SameCount}) -> + receive + {'DOWN', MonitorRef, process, _Object, _Info} -> + ok; + Other -> + exit({unexpected_message, Other}) + after TimeoutMillisec -> + case rabbit_net:getstat(Sock, [StatName]) of + {ok, [{StatName, NewStatVal}]} -> + Recurse = fun (V) -> heartbeater(Params, MonitorRef, V) end, + if NewStatVal =/= StatVal -> + Recurse({NewStatVal, 0}); + SameCount < Threshold -> + Recurse({NewStatVal, SameCount + 1}); + true -> + case Handler() of + stop -> ok; + continue -> Recurse({NewStatVal, 0}) + end + end; + {error, einval} -> + %% the socket is dead, most likely because the + %% connection is being shut down -> terminate + ok; + {error, Reason} -> + exit({cannot_get_socket_stats, Reason}) + end + end. |