summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2010-07-30 10:04:56 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2010-07-30 10:04:56 +0100
commit16999af49edf2d184b3c4f1ef5f67e07fbeae98e (patch)
treef1fbc79a98f45a6957b34dfcfbf1de39408290fe
parentbe4a2b09d6b9867dda53461fe65f88eb50f33f26 (diff)
downloadrabbitmq-server-bug23034.tar.gz
tweaks to heartbeaterbug23034
- make it return the pids of the created processes. That gives us a useful future extension point, e.g. for pausing/resuming - add specs - get rid of the Y combinator - it actually makes the code longer
-rw-r--r--src/rabbit_heartbeat.erl116
1 files changed, 60 insertions, 56 deletions
diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl
index 45565705..1989fb7b 100644
--- a/src/rabbit_heartbeat.erl
+++ b/src/rabbit_heartbeat.erl
@@ -33,68 +33,72 @@
-export([start_heartbeat/2]).
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_heartbeat/2 :: (rabbit_net:socket(), non_neg_integer()) ->
+ rabbit_types:maybe({pid(), pid()})).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
start_heartbeat(_Sock, 0) ->
none;
start_heartbeat(Sock, TimeoutSec) ->
Parent = self(),
- %% we check for incoming data every interval, and time out after
- %% two checks with no change. As a result we will time out between
- %% 2 and 3 intervals after the last data has been received.
- spawn_link(fun () -> heartbeater(Sock, TimeoutSec * 1000,
- recv_oct, 1,
- fun () ->
- Parent ! timeout,
- stop
- end,
- erlang:monitor(process, Parent)) end),
%% the 'div 2' is there so that we don't end up waiting for nearly
%% 2 * TimeoutSec before sending a heartbeat in the boundary case
%% where the last message was sent just after a heartbeat.
- spawn_link(fun () -> heartbeater(Sock, TimeoutSec * 1000 div 2,
- send_oct, 0,
- fun () ->
- catch rabbit_net:send(Sock, rabbit_binary_generator:build_heartbeat_frame()),
- continue
- end,
- erlang:monitor(process, Parent)) end),
- ok.
+ Sender =
+ spawn_link(fun () -> heartbeater({Sock, TimeoutSec * 1000 div 2,
+ send_oct, 0,
+ fun () ->
+ catch rabbit_net:send(Sock, rabbit_binary_generator:build_heartbeat_frame()),
+ continue
+ end}, Parent) end),
+ %% we check for incoming data every interval, and time out after
+ %% two checks with no change. As a result we will time out between
+ %% 2 and 3 intervals after the last data has been received.
+ Receiver =
+ spawn_link(fun () -> heartbeater({Sock, TimeoutSec * 1000,
+ recv_oct, 1,
+ fun () ->
+ Parent ! timeout,
+ stop
+ end}, Parent) end),
+ {Sender, Receiver}.
-%% Y-combinator, posted by Vladimir Sekissov to the Erlang mailing list
-%% http://www.erlang.org/ml-archive/erlang-questions/200301/msg00053.html
-y(X) ->
- F = fun (P) -> X(fun (A) -> (P(P))(A) end) end,
- F(F).
+heartbeater(Params, Parent) ->
+ heartbeater(Params, erlang:monitor(process, Parent), {0, 0}).
-heartbeater(Sock, TimeoutMillisec, StatName, Threshold, Handler, MonitorRef) ->
- Heartbeat =
- fun (F) ->
- fun ({StatVal, SameCount}) ->
- receive
- {'DOWN', MonitorRef, process, _Object, _Info} -> ok;
- Other -> exit({unexpected_message, Other})
- after TimeoutMillisec ->
- case rabbit_net:getstat(Sock, [StatName]) of
- {ok, [{StatName, NewStatVal}]} ->
- if NewStatVal =/= StatVal ->
- F({NewStatVal, 0});
- SameCount < Threshold ->
- F({NewStatVal, SameCount + 1});
- true ->
- case Handler() of
- stop -> ok;
- continue -> F({NewStatVal, 0})
- end
- end;
- {error, einval} ->
- %% the socket is dead, most
- %% likely because the
- %% connection is being shut
- %% down -> terminate
- ok;
- {error, Reason} ->
- exit({cannot_get_socket_stats, Reason})
- end
- end
- end
- end,
- (y(Heartbeat))({0, 0}).
+heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params,
+ MonitorRef, {StatVal, SameCount}) ->
+ receive
+ {'DOWN', MonitorRef, process, _Object, _Info} ->
+ ok;
+ Other ->
+ exit({unexpected_message, Other})
+ after TimeoutMillisec ->
+ case rabbit_net:getstat(Sock, [StatName]) of
+ {ok, [{StatName, NewStatVal}]} ->
+ Recurse = fun (V) -> heartbeater(Params, MonitorRef, V) end,
+ if NewStatVal =/= StatVal ->
+ Recurse({NewStatVal, 0});
+ SameCount < Threshold ->
+ Recurse({NewStatVal, SameCount + 1});
+ true ->
+ case Handler() of
+ stop -> ok;
+ continue -> Recurse({NewStatVal, 0})
+ end
+ end;
+ {error, einval} ->
+ %% the socket is dead, most likely because the
+ %% connection is being shut down -> terminate
+ ok;
+ {error, Reason} ->
+ exit({cannot_get_socket_stats, Reason})
+ end
+ end.