summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2010-08-02 11:50:08 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2010-08-02 11:50:08 +0100
commit46d48171d63b40e1cdc4650a278a9af48717e7ed (patch)
tree7f802177ce980914851af066cf83b3e653d549fe
parentbb7bc187767f89b48bb0ab464b2244eeefe7c658 (diff)
downloadrabbitmq-server-46d48171d63b40e1cdc4650a278a9af48717e7ed.tar.gz
pause/resume heartbeat monitor as required
-rw-r--r--src/rabbit_heartbeat.erl34
-rw-r--r--src/rabbit_reader.erl20
2 files changed, 43 insertions, 11 deletions
diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl
index 1989fb7b..faddffc1 100644
--- a/src/rabbit_heartbeat.erl
+++ b/src/rabbit_heartbeat.erl
@@ -31,14 +31,17 @@
-module(rabbit_heartbeat).
--export([start_heartbeat/2]).
+-export([start_heartbeat/2, pause_monitor/1, resume_monitor/1]).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
--spec(start_heartbeat/2 :: (rabbit_net:socket(), non_neg_integer()) ->
- rabbit_types:maybe({pid(), pid()})).
+-type(pids() :: rabbit_types:maybe({pid(), pid()})).
+
+-spec(start_heartbeat/2 :: (rabbit_net:socket(), non_neg_integer()) -> pids()).
+-spec(pause_monitor/1 :: (pids()) -> 'ok').
+-spec(resume_monitor/1 :: (pids()) -> 'ok').
-endif.
@@ -70,20 +73,43 @@ start_heartbeat(Sock, TimeoutSec) ->
end}, Parent) end),
{Sender, Receiver}.
+pause_monitor(none) ->
+ ok;
+pause_monitor({_Sender, Receiver}) ->
+ Receiver ! pause,
+ ok.
+
+resume_monitor(none) ->
+ ok;
+resume_monitor({_Sender, Receiver}) ->
+ Receiver ! resume,
+ ok.
+
+%%----------------------------------------------------------------------------
+
heartbeater(Params, Parent) ->
heartbeater(Params, erlang:monitor(process, Parent), {0, 0}).
heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params,
MonitorRef, {StatVal, SameCount}) ->
+ Recurse = fun (V) -> heartbeater(Params, MonitorRef, V) end,
receive
{'DOWN', MonitorRef, process, _Object, _Info} ->
ok;
+ pause ->
+ receive
+ {'DOWN', MonitorRef, process, _Object, _Info} ->
+ ok;
+ resume ->
+ Recurse({0, 0});
+ Other ->
+ exit({unexpected_message, Other})
+ end;
Other ->
exit({unexpected_message, Other})
after TimeoutMillisec ->
case rabbit_net:getstat(Sock, [StatName]) of
{ok, [{StatName, NewStatVal}]} ->
- Recurse = fun (V) -> heartbeater(Params, MonitorRef, V) end,
if NewStatVal =/= StatVal ->
Recurse({NewStatVal, 0});
SameCount < Threshold ->
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 46171aec..2c1a24c4 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -58,7 +58,7 @@
%---------------------------------------------------------------------------
-record(v1, {sock, connection, callback, recv_ref, connection_state,
- queue_collector, conserving_memory}).
+ queue_collector, heartbeater, conserving_memory}).
-define(INFO_KEYS,
[pid, address, port, peer_address, peer_port,
@@ -259,7 +259,9 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
callback = uninitialized_callback,
recv_ref = none,
connection_state = pre_init,
- queue_collector = Collector},
+ queue_collector = Collector,
+ heartbeater = none,
+ conserving_memory = false},
handshake, 8))
catch
Ex -> (if Ex == connection_closed_abruptly ->
@@ -355,8 +357,9 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
exit({unexpected_message, Other})
end.
-switch_callback(State = #v1{conserving_memory = true}, Callback, Length) ->
- %% TODO: pause heartbeat monitor
+switch_callback(State = #v1{conserving_memory = true,
+ heartbeater = Heartbeater}, Callback, Length) ->
+ ok = rabbit_heartbeat:pause_monitor(Heartbeater),
%% TODO: only do this after receiving a content-bearing method
State#v1{callback = {Callback, Length}, recv_ref = none};
switch_callback(State, Callback, Length) ->
@@ -372,9 +375,10 @@ terminate(_Explanation, State) ->
{force, State}.
internal_conserve_memory(false, State = #v1{conserving_memory = true,
+ heartbeater = Heartbeater,
callback = {Callback, Length},
recv_ref = none}) ->
- %% TODO: resume heartbeat monitor
+ ok = rabbit_heartbeat:resume_monitor(Heartbeater),
switch_callback(State#v1{conserving_memory = false}, Callback, Length);
internal_conserve_memory(Conserve, State) ->
State#v1{conserving_memory = Conserve}.
@@ -671,11 +675,13 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
not_allowed, "frame_max=~w > ~w max size",
[FrameMax, ?FRAME_MAX]);
true ->
- rabbit_heartbeat:start_heartbeat(Sock, ClientHeartbeat),
+ Heartbeater = rabbit_heartbeat:start_heartbeat(
+ Sock, ClientHeartbeat),
State#v1{connection_state = opening,
connection = Connection#connection{
timeout_sec = ClientHeartbeat,
- frame_max = FrameMax}}
+ frame_max = FrameMax},
+ heartbeater = Heartbeater}
end;
handle_method0(#'connection.open'{virtual_host = VHostPath},