summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-02-24 18:15:34 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2011-02-24 18:15:34 +0000
commit6fd77744201852a1fb961809f693d8b27acf7346 (patch)
treeec435583bd44f02bdef618a522ab94ea27a7cab6
parent5b5c93a5612203ce2db313fc451f223fc2e83c61 (diff)
downloadrabbitmq-server-6fd77744201852a1fb961809f693d8b27acf7346.tar.gz
Make memory alarms work correctly over clusters
-rw-r--r--Makefile4
-rw-r--r--src/rabbit_alarm.erl122
-rw-r--r--src/rabbit_node_monitor.erl11
-rw-r--r--src/vm_memory_monitor.erl4
4 files changed, 107 insertions, 34 deletions
diff --git a/Makefile b/Makefile
index 00c7809d..cdb86aad 100644
--- a/Makefile
+++ b/Makefile
@@ -177,11 +177,11 @@ stop-rabbit-on-node: all
echo "rabbit:stop()." | $(ERL_CALL)
set-memory-alarm: all
- echo "alarm_handler:set_alarm({vm_memory_high_watermark, []})." | \
+ echo "alarm_handler:set_alarm({{vm_memory_high_watermark, node()}, []})." | \
$(ERL_CALL)
clear-memory-alarm: all
- echo "alarm_handler:clear_alarm(vm_memory_high_watermark)." | \
+ echo "alarm_handler:clear_alarm({vm_memory_high_watermark, node()})." | \
$(ERL_CALL)
stop-node:
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index 37e40981..365a5ed2 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -18,12 +18,14 @@
-behaviour(gen_event).
--export([start/0, stop/0, register/2]).
+-export([start/0, stop/0, register/2, on_node/2]).
-export([init/1, handle_call/2, handle_event/2, handle_info/2,
terminate/2, code_change/3]).
--record(alarms, {alertees, vm_memory_high_watermark = false}).
+-export([remote_conserve_memory/2]). %% Internal use only
+
+-record(alarms, {alertees, high_watermarks}).
%%----------------------------------------------------------------------------
@@ -33,6 +35,7 @@
-spec(start/0 :: () -> 'ok').
-spec(stop/0 :: () -> 'ok').
-spec(register/2 :: (pid(), mfa_tuple()) -> boolean()).
+-spec(on_node/2 :: ('up'|'down', node()) -> 'ok').
-endif.
@@ -56,32 +59,61 @@ register(Pid, HighMemMFA) ->
{register, Pid, HighMemMFA},
infinity).
+on_node(Action, Node) ->
+ gen_event:notify(alarm_handler, {node, Action, Node}).
+
+remote_conserve_memory(Pid, Conserve) ->
+ RemoteNode = node(Pid),
+ %% Can't use alarm_handler:{set,clear}_alarm because that doesn't
+ %% permit notifying a remote node.
+ case Conserve of
+ true -> gen_event:notify(
+ {alarm_handler, RemoteNode},
+ {set_alarm, {{vm_memory_high_watermark, node()}, []}});
+ false -> gen_event:notify(
+ {alarm_handler, RemoteNode},
+ {clear_alarm, {vm_memory_high_watermark, node()}})
+ end.
+
%%----------------------------------------------------------------------------
init([]) ->
- {ok, #alarms{alertees = dict:new()}}.
+ {ok, #alarms{alertees = dict:new(),
+ high_watermarks = sets:new()}}.
-handle_call({register, Pid, {M, F, A} = HighMemMFA},
- State = #alarms{alertees = Alertess}) ->
- _MRef = erlang:monitor(process, Pid),
- ok = case State#alarms.vm_memory_high_watermark of
- true -> apply(M, F, A ++ [Pid, true]);
- false -> ok
- end,
- NewAlertees = dict:store(Pid, HighMemMFA, Alertess),
- {ok, State#alarms.vm_memory_high_watermark,
- State#alarms{alertees = NewAlertees}};
+handle_call({register, Pid, HighMemMFA}, State) ->
+ {ok, 0 < sets:size(State#alarms.high_watermarks),
+ internal_register(Pid, HighMemMFA, State)};
handle_call(_Request, State) ->
{ok, not_understood, State}.
-handle_event({set_alarm, {vm_memory_high_watermark, []}}, State) ->
- ok = alert(true, State#alarms.alertees),
- {ok, State#alarms{vm_memory_high_watermark = true}};
-
-handle_event({clear_alarm, vm_memory_high_watermark}, State) ->
- ok = alert(false, State#alarms.alertees),
- {ok, State#alarms{vm_memory_high_watermark = false}};
+handle_event({set_alarm, {{vm_memory_high_watermark, Node}, []}},
+ State = #alarms{high_watermarks = Highs}) ->
+ Highs1 = sets:add_element(Node, Highs),
+ ok = maybe_alert(Highs, Highs1, State#alarms.alertees, Node, true),
+ {ok, State#alarms{high_watermarks = Highs1}};
+
+handle_event({clear_alarm, {vm_memory_high_watermark, Node}},
+ State = #alarms{high_watermarks = Highs}) ->
+ Highs1 = sets:del_element(Node, Highs),
+ ok = maybe_alert(Highs, Highs1, State#alarms.alertees, Node, false),
+ {ok, State#alarms{high_watermarks = Highs1}};
+
+handle_event({node, up, Node}, State) ->
+ %% Must do this via notify and not call to avoid possible deadlock.
+ ok = gen_event:notify(
+ {alarm_handler, Node},
+ {register, self(), {?MODULE, remote_conserve_memory, []}}),
+ {ok, State};
+
+handle_event({node, down, Node}, State = #alarms{high_watermarks = Highs}) ->
+ Highs1 = sets:del_element(Node, Highs),
+ ok = maybe_alert(Highs, Highs1, State#alarms.alertees, Node, false),
+ {ok, State#alarms{high_watermarks = Highs1}};
+
+handle_event({register, Pid, HighMemMFA}, State) ->
+ {ok, internal_register(Pid, HighMemMFA, State)};
handle_event(_Event, State) ->
{ok, State}.
@@ -100,10 +132,50 @@ code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%----------------------------------------------------------------------------
-alert(_Alert, undefined) ->
- ok;
-alert(Alert, Alertees) ->
+
+maybe_alert(Before, After, Alertees, AlarmNode, Action)
+ when AlarmNode =:= node() ->
+ %% If we have changed our alarm state, always inform the remotes.
+ case {sets:is_element(AlarmNode, Before), sets:is_element(AlarmNode, After),
+ Action} of
+ {false, true, true} -> alert_remote(Action, Alertees);
+ {true, false, false} -> alert_remote(Action, Alertees);
+ _ -> ok
+ end,
+ maybe_alert_local(Before, After, Alertees, Action);
+maybe_alert(Before, After, Alertees, _AlarmNode, Action) ->
+ maybe_alert_local(Before, After, Alertees, Action).
+
+maybe_alert_local(Before, After, Alertees, Action) ->
+ %% If the overall alarm state has changed, inform the locals.
+ case {sets:size(Before), sets:size(After), Action} of
+ {0, 1, true} -> alert_local(Action, Alertees);
+ {1, 0, false} -> alert_local(Action, Alertees);
+ _ -> ok
+ end.
+
+alert_local(Alert, Alertees) ->
+ alert(Alert, Alertees, fun erlang:'=:='/2).
+
+alert_remote(Alert, Alertees) ->
+ alert(Alert, Alertees, fun erlang:'=/='/2).
+
+alert(Alert, Alertees, NodeComparator) ->
+ Node = node(),
dict:fold(fun (Pid, {M, F, A}, Acc) ->
- ok = erlang:apply(M, F, A ++ [Pid, Alert]),
- Acc
+ case NodeComparator(Node, node(Pid)) of
+ true -> ok = erlang:apply(M, F, A ++ [Pid, Alert]),
+ Acc;
+ false -> Acc
+ end
end, ok, Alertees).
+
+internal_register(Pid, {M, F, A} = HighMemMFA,
+ State = #alarms{alertees = Alertees}) ->
+ _MRef = erlang:monitor(process, Pid),
+ ok = case sets:is_element(node(), State#alarms.high_watermarks) of
+ true -> apply(M, F, A ++ [Pid, true]);
+ false -> ok
+ end,
+ NewAlertees = dict:store(Pid, HighMemMFA, Alertees),
+ State#alarms{alertees = NewAlertees}.
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 817abaa2..061f628d 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -69,6 +69,7 @@ handle_call(_Request, _From, State) ->
handle_cast({rabbit_running_on, Node}, State) ->
rabbit_log:info("node ~p up~n", [Node]),
erlang:monitor(process, {rabbit, Node}),
+ ok = rabbit_alarm:on_node(up, Node),
{noreply, State};
handle_cast(_Msg, State) ->
{noreply, State}.
@@ -92,10 +93,10 @@ code_change(_OldVsn, State, _Extra) ->
%%--------------------------------------------------------------------
-%% TODO: This may turn out to be a performance hog when there are
-%% lots of nodes. We really only need to execute this code on
-%% *one* node, rather than all of them.
+%% TODO: This may turn out to be a performance hog when there are lots
+%% of nodes. We really only need to execute some of these statements
+%% on *one* node, rather than all of them.
handle_dead_rabbit(Node) ->
ok = rabbit_networking:on_node_down(Node),
- ok = rabbit_amqqueue:on_node_down(Node).
-
+ ok = rabbit_amqqueue:on_node_down(Node),
+ ok = rabbit_alarm:on_node(down, Node).
diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl
index 44e1e4b5..dcc6aff5 100644
--- a/src/vm_memory_monitor.erl
+++ b/src/vm_memory_monitor.erl
@@ -175,10 +175,10 @@ internal_update(State = #state { memory_limit = MemLimit,
case {Alarmed, NewAlarmed} of
{false, true} ->
emit_update_info(set, MemUsed, MemLimit),
- alarm_handler:set_alarm({vm_memory_high_watermark, []});
+ alarm_handler:set_alarm({{vm_memory_high_watermark, node()}, []});
{true, false} ->
emit_update_info(clear, MemUsed, MemLimit),
- alarm_handler:clear_alarm(vm_memory_high_watermark);
+ alarm_handler:clear_alarm({vm_memory_high_watermark, node()});
_ ->
ok
end,