diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2010-08-02 11:50:08 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-08-02 11:50:08 +0100 |
commit | 46d48171d63b40e1cdc4650a278a9af48717e7ed (patch) | |
tree | 7f802177ce980914851af066cf83b3e653d549fe | |
parent | bb7bc187767f89b48bb0ab464b2244eeefe7c658 (diff) | |
download | rabbitmq-server-46d48171d63b40e1cdc4650a278a9af48717e7ed.tar.gz |
pause/resume heartbeat monitor as required
-rw-r--r-- | src/rabbit_heartbeat.erl | 34 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 20 |
2 files changed, 43 insertions, 11 deletions
diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl index 1989fb7b..faddffc1 100644 --- a/src/rabbit_heartbeat.erl +++ b/src/rabbit_heartbeat.erl @@ -31,14 +31,17 @@ -module(rabbit_heartbeat). --export([start_heartbeat/2]). +-export([start_heartbeat/2, pause_monitor/1, resume_monitor/1]). %%---------------------------------------------------------------------------- -ifdef(use_specs). --spec(start_heartbeat/2 :: (rabbit_net:socket(), non_neg_integer()) -> - rabbit_types:maybe({pid(), pid()})). +-type(pids() :: rabbit_types:maybe({pid(), pid()})). + +-spec(start_heartbeat/2 :: (rabbit_net:socket(), non_neg_integer()) -> pids()). +-spec(pause_monitor/1 :: (pids()) -> 'ok'). +-spec(resume_monitor/1 :: (pids()) -> 'ok'). -endif. @@ -70,20 +73,43 @@ start_heartbeat(Sock, TimeoutSec) -> end}, Parent) end), {Sender, Receiver}. +pause_monitor(none) -> + ok; +pause_monitor({_Sender, Receiver}) -> + Receiver ! pause, + ok. + +resume_monitor(none) -> + ok; +resume_monitor({_Sender, Receiver}) -> + Receiver ! resume, + ok. + +%%---------------------------------------------------------------------------- + heartbeater(Params, Parent) -> heartbeater(Params, erlang:monitor(process, Parent), {0, 0}). heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params, MonitorRef, {StatVal, SameCount}) -> + Recurse = fun (V) -> heartbeater(Params, MonitorRef, V) end, receive {'DOWN', MonitorRef, process, _Object, _Info} -> ok; + pause -> + receive + {'DOWN', MonitorRef, process, _Object, _Info} -> + ok; + resume -> + Recurse({0, 0}); + Other -> + exit({unexpected_message, Other}) + end; Other -> exit({unexpected_message, Other}) after TimeoutMillisec -> case rabbit_net:getstat(Sock, [StatName]) of {ok, [{StatName, NewStatVal}]} -> - Recurse = fun (V) -> heartbeater(Params, MonitorRef, V) end, if NewStatVal =/= StatVal -> Recurse({NewStatVal, 0}); SameCount < Threshold -> diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 46171aec..2c1a24c4 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -58,7 +58,7 @@ %--------------------------------------------------------------------------- -record(v1, {sock, connection, callback, recv_ref, connection_state, - queue_collector, conserving_memory}). + queue_collector, heartbeater, conserving_memory}). -define(INFO_KEYS, [pid, address, port, peer_address, peer_port, @@ -259,7 +259,9 @@ start_connection(Parent, Deb, Sock, SockTransform) -> callback = uninitialized_callback, recv_ref = none, connection_state = pre_init, - queue_collector = Collector}, + queue_collector = Collector, + heartbeater = none, + conserving_memory = false}, handshake, 8)) catch Ex -> (if Ex == connection_closed_abruptly -> @@ -355,8 +357,9 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> exit({unexpected_message, Other}) end. -switch_callback(State = #v1{conserving_memory = true}, Callback, Length) -> - %% TODO: pause heartbeat monitor +switch_callback(State = #v1{conserving_memory = true, + heartbeater = Heartbeater}, Callback, Length) -> + ok = rabbit_heartbeat:pause_monitor(Heartbeater), %% TODO: only do this after receiving a content-bearing method State#v1{callback = {Callback, Length}, recv_ref = none}; switch_callback(State, Callback, Length) -> @@ -372,9 +375,10 @@ terminate(_Explanation, State) -> {force, State}. internal_conserve_memory(false, State = #v1{conserving_memory = true, + heartbeater = Heartbeater, callback = {Callback, Length}, recv_ref = none}) -> - %% TODO: resume heartbeat monitor + ok = rabbit_heartbeat:resume_monitor(Heartbeater), switch_callback(State#v1{conserving_memory = false}, Callback, Length); internal_conserve_memory(Conserve, State) -> State#v1{conserving_memory = Conserve}. @@ -671,11 +675,13 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, not_allowed, "frame_max=~w > ~w max size", [FrameMax, ?FRAME_MAX]); true -> - rabbit_heartbeat:start_heartbeat(Sock, ClientHeartbeat), + Heartbeater = rabbit_heartbeat:start_heartbeat( + Sock, ClientHeartbeat), State#v1{connection_state = opening, connection = Connection#connection{ timeout_sec = ClientHeartbeat, - frame_max = FrameMax}} + frame_max = FrameMax}, + heartbeater = Heartbeater} end; handle_method0(#'connection.open'{virtual_host = VHostPath}, |