diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2011-02-24 18:15:34 +0000 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-02-24 18:15:34 +0000 |
commit | 6fd77744201852a1fb961809f693d8b27acf7346 (patch) | |
tree | ec435583bd44f02bdef618a522ab94ea27a7cab6 | |
parent | 5b5c93a5612203ce2db313fc451f223fc2e83c61 (diff) | |
download | rabbitmq-server-6fd77744201852a1fb961809f693d8b27acf7346.tar.gz |
Make memory alarms work correctly over clusters
-rw-r--r-- | Makefile | 4 | ||||
-rw-r--r-- | src/rabbit_alarm.erl | 122 | ||||
-rw-r--r-- | src/rabbit_node_monitor.erl | 11 | ||||
-rw-r--r-- | src/vm_memory_monitor.erl | 4 |
4 files changed, 107 insertions, 34 deletions
@@ -177,11 +177,11 @@ stop-rabbit-on-node: all echo "rabbit:stop()." | $(ERL_CALL) set-memory-alarm: all - echo "alarm_handler:set_alarm({vm_memory_high_watermark, []})." | \ + echo "alarm_handler:set_alarm({{vm_memory_high_watermark, node()}, []})." | \ $(ERL_CALL) clear-memory-alarm: all - echo "alarm_handler:clear_alarm(vm_memory_high_watermark)." | \ + echo "alarm_handler:clear_alarm({vm_memory_high_watermark, node()})." | \ $(ERL_CALL) stop-node: diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index 37e40981..365a5ed2 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -18,12 +18,14 @@ -behaviour(gen_event). --export([start/0, stop/0, register/2]). +-export([start/0, stop/0, register/2, on_node/2]). -export([init/1, handle_call/2, handle_event/2, handle_info/2, terminate/2, code_change/3]). --record(alarms, {alertees, vm_memory_high_watermark = false}). +-export([remote_conserve_memory/2]). %% Internal use only + +-record(alarms, {alertees, high_watermarks}). %%---------------------------------------------------------------------------- @@ -33,6 +35,7 @@ -spec(start/0 :: () -> 'ok'). -spec(stop/0 :: () -> 'ok'). -spec(register/2 :: (pid(), mfa_tuple()) -> boolean()). +-spec(on_node/2 :: ('up'|'down', node()) -> 'ok'). -endif. @@ -56,32 +59,61 @@ register(Pid, HighMemMFA) -> {register, Pid, HighMemMFA}, infinity). +on_node(Action, Node) -> + gen_event:notify(alarm_handler, {node, Action, Node}). + +remote_conserve_memory(Pid, Conserve) -> + RemoteNode = node(Pid), + %% Can't use alarm_handler:{set,clear}_alarm because that doesn't + %% permit notifying a remote node. + case Conserve of + true -> gen_event:notify( + {alarm_handler, RemoteNode}, + {set_alarm, {{vm_memory_high_watermark, node()}, []}}); + false -> gen_event:notify( + {alarm_handler, RemoteNode}, + {clear_alarm, {vm_memory_high_watermark, node()}}) + end. + %%---------------------------------------------------------------------------- init([]) -> - {ok, #alarms{alertees = dict:new()}}. + {ok, #alarms{alertees = dict:new(), + high_watermarks = sets:new()}}. -handle_call({register, Pid, {M, F, A} = HighMemMFA}, - State = #alarms{alertees = Alertess}) -> - _MRef = erlang:monitor(process, Pid), - ok = case State#alarms.vm_memory_high_watermark of - true -> apply(M, F, A ++ [Pid, true]); - false -> ok - end, - NewAlertees = dict:store(Pid, HighMemMFA, Alertess), - {ok, State#alarms.vm_memory_high_watermark, - State#alarms{alertees = NewAlertees}}; +handle_call({register, Pid, HighMemMFA}, State) -> + {ok, 0 < sets:size(State#alarms.high_watermarks), + internal_register(Pid, HighMemMFA, State)}; handle_call(_Request, State) -> {ok, not_understood, State}. -handle_event({set_alarm, {vm_memory_high_watermark, []}}, State) -> - ok = alert(true, State#alarms.alertees), - {ok, State#alarms{vm_memory_high_watermark = true}}; - -handle_event({clear_alarm, vm_memory_high_watermark}, State) -> - ok = alert(false, State#alarms.alertees), - {ok, State#alarms{vm_memory_high_watermark = false}}; +handle_event({set_alarm, {{vm_memory_high_watermark, Node}, []}}, + State = #alarms{high_watermarks = Highs}) -> + Highs1 = sets:add_element(Node, Highs), + ok = maybe_alert(Highs, Highs1, State#alarms.alertees, Node, true), + {ok, State#alarms{high_watermarks = Highs1}}; + +handle_event({clear_alarm, {vm_memory_high_watermark, Node}}, + State = #alarms{high_watermarks = Highs}) -> + Highs1 = sets:del_element(Node, Highs), + ok = maybe_alert(Highs, Highs1, State#alarms.alertees, Node, false), + {ok, State#alarms{high_watermarks = Highs1}}; + +handle_event({node, up, Node}, State) -> + %% Must do this via notify and not call to avoid possible deadlock. + ok = gen_event:notify( + {alarm_handler, Node}, + {register, self(), {?MODULE, remote_conserve_memory, []}}), + {ok, State}; + +handle_event({node, down, Node}, State = #alarms{high_watermarks = Highs}) -> + Highs1 = sets:del_element(Node, Highs), + ok = maybe_alert(Highs, Highs1, State#alarms.alertees, Node, false), + {ok, State#alarms{high_watermarks = Highs1}}; + +handle_event({register, Pid, HighMemMFA}, State) -> + {ok, internal_register(Pid, HighMemMFA, State)}; handle_event(_Event, State) -> {ok, State}. @@ -100,10 +132,50 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. %%---------------------------------------------------------------------------- -alert(_Alert, undefined) -> - ok; -alert(Alert, Alertees) -> + +maybe_alert(Before, After, Alertees, AlarmNode, Action) + when AlarmNode =:= node() -> + %% If we have changed our alarm state, always inform the remotes. + case {sets:is_element(AlarmNode, Before), sets:is_element(AlarmNode, After), + Action} of + {false, true, true} -> alert_remote(Action, Alertees); + {true, false, false} -> alert_remote(Action, Alertees); + _ -> ok + end, + maybe_alert_local(Before, After, Alertees, Action); +maybe_alert(Before, After, Alertees, _AlarmNode, Action) -> + maybe_alert_local(Before, After, Alertees, Action). + +maybe_alert_local(Before, After, Alertees, Action) -> + %% If the overall alarm state has changed, inform the locals. + case {sets:size(Before), sets:size(After), Action} of + {0, 1, true} -> alert_local(Action, Alertees); + {1, 0, false} -> alert_local(Action, Alertees); + _ -> ok + end. + +alert_local(Alert, Alertees) -> + alert(Alert, Alertees, fun erlang:'=:='/2). + +alert_remote(Alert, Alertees) -> + alert(Alert, Alertees, fun erlang:'=/='/2). + +alert(Alert, Alertees, NodeComparator) -> + Node = node(), dict:fold(fun (Pid, {M, F, A}, Acc) -> - ok = erlang:apply(M, F, A ++ [Pid, Alert]), - Acc + case NodeComparator(Node, node(Pid)) of + true -> ok = erlang:apply(M, F, A ++ [Pid, Alert]), + Acc; + false -> Acc + end end, ok, Alertees). + +internal_register(Pid, {M, F, A} = HighMemMFA, + State = #alarms{alertees = Alertees}) -> + _MRef = erlang:monitor(process, Pid), + ok = case sets:is_element(node(), State#alarms.high_watermarks) of + true -> apply(M, F, A ++ [Pid, true]); + false -> ok + end, + NewAlertees = dict:store(Pid, HighMemMFA, Alertees), + State#alarms{alertees = NewAlertees}. diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 817abaa2..061f628d 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -69,6 +69,7 @@ handle_call(_Request, _From, State) -> handle_cast({rabbit_running_on, Node}, State) -> rabbit_log:info("node ~p up~n", [Node]), erlang:monitor(process, {rabbit, Node}), + ok = rabbit_alarm:on_node(up, Node), {noreply, State}; handle_cast(_Msg, State) -> {noreply, State}. @@ -92,10 +93,10 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- -%% TODO: This may turn out to be a performance hog when there are -%% lots of nodes. We really only need to execute this code on -%% *one* node, rather than all of them. +%% TODO: This may turn out to be a performance hog when there are lots +%% of nodes. We really only need to execute some of these statements +%% on *one* node, rather than all of them. handle_dead_rabbit(Node) -> ok = rabbit_networking:on_node_down(Node), - ok = rabbit_amqqueue:on_node_down(Node). - + ok = rabbit_amqqueue:on_node_down(Node), + ok = rabbit_alarm:on_node(down, Node). diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl index 44e1e4b5..dcc6aff5 100644 --- a/src/vm_memory_monitor.erl +++ b/src/vm_memory_monitor.erl @@ -175,10 +175,10 @@ internal_update(State = #state { memory_limit = MemLimit, case {Alarmed, NewAlarmed} of {false, true} -> emit_update_info(set, MemUsed, MemLimit), - alarm_handler:set_alarm({vm_memory_high_watermark, []}); + alarm_handler:set_alarm({{vm_memory_high_watermark, node()}, []}); {true, false} -> emit_update_info(clear, MemUsed, MemLimit), - alarm_handler:clear_alarm(vm_memory_high_watermark); + alarm_handler:clear_alarm({vm_memory_high_watermark, node()}); _ -> ok end, |