summaryrefslogtreecommitdiff
path: root/deps/rabbit/src/rabbit_alarm.erl
diff options
context:
space:
mode:
Diffstat (limited to 'deps/rabbit/src/rabbit_alarm.erl')
-rw-r--r--deps/rabbit/src/rabbit_alarm.erl365
1 files changed, 365 insertions, 0 deletions
diff --git a/deps/rabbit/src/rabbit_alarm.erl b/deps/rabbit/src/rabbit_alarm.erl
new file mode 100644
index 0000000000..3f1ab7ae62
--- /dev/null
+++ b/deps/rabbit/src/rabbit_alarm.erl
@@ -0,0 +1,365 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+%% There are two types of alarms handled by this module:
+%%
+%% * per-node resource (disk, memory) alarms for the whole cluster. If any node
+%% has an alarm, then all publishing should be disabled across the
+%% cluster until all alarms clear. When a node sets such an alarm,
+%% this information is automatically propagated throughout the cluster.
+%% `#alarms.alarmed_nodes' is being used to track this type of alarms.
+%% * limits local to this node (file_descriptor_limit). Used for information
+%% purposes only: logging and getting node status. This information is not propagated
+%% throughout the cluster. `#alarms.alarms' is being used to track this type of alarms.
+%% @end
+
+-module(rabbit_alarm).
+
+-behaviour(gen_event).
+
+-export([start_link/0, start/0, stop/0, register/2, set_alarm/1,
+ clear_alarm/1, get_alarms/0, get_alarms/1, get_local_alarms/0, get_local_alarms/1, on_node_up/1, on_node_down/1,
+ format_as_map/1, format_as_maps/1, is_local/1]).
+
+-export([init/1, handle_call/2, handle_event/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-export([remote_conserve_resources/3]). %% Internal use only
+
+-define(SERVER, ?MODULE).
+
+-define(FILE_DESCRIPTOR_RESOURCE, <<"file descriptors">>).
+-define(MEMORY_RESOURCE, <<"memory">>).
+-define(DISK_SPACE_RESOURCE, <<"disk space">>).
+
+%%----------------------------------------------------------------------------
+
+-record(alarms, {alertees :: dict:dict(pid(), rabbit_types:mfargs()),
+ alarmed_nodes :: dict:dict(node(), [resource_alarm_source()]),
+ alarms :: [alarm()]}).
+
+-type local_alarm() :: 'file_descriptor_limit'.
+-type resource_alarm_source() :: 'disk' | 'memory'.
+-type resource_alarm() :: {resource_limit, resource_alarm_source(), node()}.
+-type alarm() :: local_alarm() | resource_alarm().
+
+%%----------------------------------------------------------------------------
+
+-spec start_link() -> rabbit_types:ok_pid_or_error().
+
+start_link() ->
+ gen_event:start_link({local, ?SERVER}).
+
+-spec start() -> 'ok'.
+
+start() ->
+ ok = rabbit_sup:start_restartable_child(?MODULE),
+ ok = gen_event:add_handler(?SERVER, ?MODULE, []),
+ {ok, MemoryWatermark} = application:get_env(vm_memory_high_watermark),
+
+ rabbit_sup:start_restartable_child(
+ vm_memory_monitor, [MemoryWatermark,
+ fun (Alarm) ->
+ background_gc:run(),
+ set_alarm(Alarm)
+ end,
+ fun clear_alarm/1]),
+ {ok, DiskLimit} = application:get_env(disk_free_limit),
+ rabbit_sup:start_delayed_restartable_child(
+ rabbit_disk_monitor, [DiskLimit]),
+ ok.
+
+-spec stop() -> 'ok'.
+
+stop() -> ok.
+
+%% Registers a handler that should be called on every resource alarm change.
+%% Given a call rabbit_alarm:register(Pid, {M, F, A}), the handler would be
+%% called like this: `apply(M, F, A ++ [Pid, Source, Alert])', where `Source'
+%% has the type of resource_alarm_source() and `Alert' has the type of resource_alert().
+
+-spec register(pid(), rabbit_types:mfargs()) -> [atom()].
+
+register(Pid, AlertMFA) ->
+ gen_event:call(?SERVER, ?MODULE, {register, Pid, AlertMFA}, infinity).
+
+-spec set_alarm({alarm(), []}) -> 'ok'.
+
+set_alarm(Alarm) -> gen_event:notify(?SERVER, {set_alarm, Alarm}).
+
+-spec clear_alarm(alarm()) -> 'ok'.
+
+clear_alarm(Alarm) -> gen_event:notify(?SERVER, {clear_alarm, Alarm}).
+
+-spec get_alarms() -> [{alarm(), []}].
+get_alarms() -> gen_event:call(?SERVER, ?MODULE, get_alarms, infinity).
+
+-spec get_alarms(timeout()) -> [{alarm(), []}].
+get_alarms(Timeout) -> gen_event:call(?SERVER, ?MODULE, get_alarms, Timeout).
+
+-spec get_local_alarms() -> [alarm()].
+get_local_alarms() -> gen_event:call(?SERVER, ?MODULE, get_local_alarms, infinity).
+
+-spec get_local_alarms(timeout()) -> [alarm()].
+get_local_alarms(Timeout) -> gen_event:call(?SERVER, ?MODULE, get_local_alarms, Timeout).
+
+-spec filter_local_alarms([alarm()]) -> [alarm()].
+filter_local_alarms(Alarms) ->
+ lists:filter(fun is_local/1, Alarms).
+
+-spec is_local({alarm(), any()}) -> boolean().
+is_local({file_descriptor_limit, _}) -> true;
+is_local({{resource_limit, _Resource, Node}, _}) when Node =:= node() -> true;
+is_local({{resource_limit, _Resource, Node}, _}) when Node =/= node() -> false.
+
+-spec format_as_map(alarm()) -> #{binary() => term()}.
+format_as_map(file_descriptor_limit) ->
+ #{
+ <<"resource">> => ?FILE_DESCRIPTOR_RESOURCE,
+ <<"node">> => node()
+ };
+format_as_map({resource_limit, disk, Node}) ->
+ #{
+ <<"resource">> => ?DISK_SPACE_RESOURCE,
+ <<"node">> => Node
+ };
+format_as_map({resource_limit, memory, Node}) ->
+ #{
+ <<"resource">> => ?MEMORY_RESOURCE,
+ <<"node">> => Node
+ };
+format_as_map({resource_limit, Limit, Node}) ->
+ #{
+ <<"resource">> => rabbit_data_coercion:to_binary(Limit),
+ <<"node">> => Node
+ }.
+
+-spec format_as_maps([{alarm(), []}]) -> [#{any() => term()}].
+format_as_maps(Alarms) when is_list(Alarms) ->
+ %% get_alarms/0 returns
+ %%
+ %% [
+ %% {file_descriptor_limit, []},
+ %% {{resource_limit, disk, rabbit@warp10}, []},
+ %% {{resource_limit, memory, rabbit@warp10}, []}
+ %% ]
+ lists:map(fun({Resource, _}) -> format_as_map(Resource);
+ (Resource) -> format_as_map(Resource)
+ end, Alarms).
+
+
+-spec on_node_up(node()) -> 'ok'.
+on_node_up(Node) -> gen_event:notify(?SERVER, {node_up, Node}).
+
+-spec on_node_down(node()) -> 'ok'.
+on_node_down(Node) -> gen_event:notify(?SERVER, {node_down, Node}).
+
+remote_conserve_resources(Pid, Source, {true, _, _}) ->
+ gen_event:notify({?SERVER, node(Pid)},
+ {set_alarm, {{resource_limit, Source, node()}, []}});
+remote_conserve_resources(Pid, Source, {false, _, _}) ->
+ gen_event:notify({?SERVER, node(Pid)},
+ {clear_alarm, {resource_limit, Source, node()}}).
+
+
+%%----------------------------------------------------------------------------
+
+init([]) ->
+ {ok, #alarms{alertees = dict:new(),
+ alarmed_nodes = dict:new(),
+ alarms = []}}.
+
+handle_call({register, Pid, AlertMFA}, State = #alarms{alarmed_nodes = AN}) ->
+ {ok, lists:usort(lists:append([V || {_, V} <- dict:to_list(AN)])),
+ internal_register(Pid, AlertMFA, State)};
+
+handle_call(get_alarms, State) ->
+ {ok, compute_alarms(State), State};
+
+handle_call(get_local_alarms, State) ->
+ {ok, filter_local_alarms(compute_alarms(State)), State};
+
+handle_call(_Request, State) ->
+ {ok, not_understood, State}.
+
+handle_event({set_alarm, {{resource_limit, Source, Node}, []}}, State) ->
+ case is_node_alarmed(Source, Node, State) of
+ true ->
+ {ok, State};
+ false ->
+ rabbit_event:notify(alarm_set, [{source, Source},
+ {node, Node}]),
+ handle_set_resource_alarm(Source, Node, State)
+ end;
+handle_event({set_alarm, Alarm}, State = #alarms{alarms = Alarms}) ->
+ case lists:member(Alarm, Alarms) of
+ true -> {ok, State};
+ false -> UpdatedAlarms = lists:usort([Alarm|Alarms]),
+ handle_set_alarm(Alarm, State#alarms{alarms = UpdatedAlarms})
+ end;
+
+handle_event({clear_alarm, {resource_limit, Source, Node}}, State) ->
+ case is_node_alarmed(Source, Node, State) of
+ true ->
+ rabbit_event:notify(alarm_cleared, [{source, Source},
+ {node, Node}]),
+ handle_clear_resource_alarm(Source, Node, State);
+ false ->
+ {ok, State}
+ end;
+handle_event({clear_alarm, Alarm}, State = #alarms{alarms = Alarms}) ->
+ case lists:keymember(Alarm, 1, Alarms) of
+ true -> handle_clear_alarm(
+ Alarm, State#alarms{alarms = lists:keydelete(
+ Alarm, 1, Alarms)});
+ false -> {ok, State}
+
+ end;
+
+handle_event({node_up, Node}, State) ->
+ %% Must do this via notify and not call to avoid possible deadlock.
+ ok = gen_event:notify(
+ {?SERVER, Node},
+ {register, self(), {?MODULE, remote_conserve_resources, []}}),
+ {ok, State};
+
+handle_event({node_down, Node}, #alarms{alarmed_nodes = AN} = State) ->
+ AlarmsForDeadNode = case dict:find(Node, AN) of
+ {ok, V} -> V;
+ error -> []
+ end,
+ {ok, lists:foldr(fun(Source, AccState) ->
+ rabbit_log:warning("~s resource limit alarm cleared for dead node ~p~n",
+ [Source, Node]),
+ maybe_alert(fun dict_unappend/3, Node, Source, false, AccState)
+ end, State, AlarmsForDeadNode)};
+
+handle_event({register, Pid, AlertMFA}, State) ->
+ {ok, internal_register(Pid, AlertMFA, State)};
+
+handle_event(_Event, State) ->
+ {ok, State}.
+
+handle_info({'DOWN', _MRef, process, Pid, _Reason},
+ State = #alarms{alertees = Alertees}) ->
+ {ok, State#alarms{alertees = dict:erase(Pid, Alertees)}};
+
+handle_info(_Info, State) ->
+ {ok, State}.
+
+terminate(_Arg, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%----------------------------------------------------------------------------
+
+dict_append(Key, Val, Dict) ->
+ L = case dict:find(Key, Dict) of
+ {ok, V} -> V;
+ error -> []
+ end,
+ dict:store(Key, lists:usort([Val|L]), Dict).
+
+dict_unappend(Key, Val, Dict) ->
+ L = case dict:find(Key, Dict) of
+ {ok, V} -> V;
+ error -> []
+ end,
+
+ case lists:delete(Val, L) of
+ [] -> dict:erase(Key, Dict);
+ X -> dict:store(Key, X, Dict)
+ end.
+
+maybe_alert(UpdateFun, Node, Source, WasAlertAdded,
+ State = #alarms{alarmed_nodes = AN,
+ alertees = Alertees}) ->
+ AN1 = UpdateFun(Node, Source, AN),
+ %% Is alarm for Source still set on any node?
+ StillHasAlerts = lists:any(fun ({_Node, NodeAlerts}) -> lists:member(Source, NodeAlerts) end, dict:to_list(AN1)),
+ case StillHasAlerts of
+ true -> ok;
+ false -> rabbit_log:warning("~s resource limit alarm cleared across the cluster~n", [Source])
+ end,
+ Alert = {WasAlertAdded, StillHasAlerts, Node},
+ case node() of
+ Node -> ok = alert_remote(Alert, Alertees, Source);
+ _ -> ok
+ end,
+ ok = alert_local(Alert, Alertees, Source),
+ State#alarms{alarmed_nodes = AN1}.
+
+alert_local(Alert, Alertees, Source) ->
+ alert(Alertees, Source, Alert, fun erlang:'=:='/2).
+
+alert_remote(Alert, Alertees, Source) ->
+ alert(Alertees, Source, Alert, fun erlang:'=/='/2).
+
+alert(Alertees, Source, Alert, NodeComparator) ->
+ Node = node(),
+ dict:fold(fun (Pid, {M, F, A}, ok) ->
+ case NodeComparator(Node, node(Pid)) of
+ true -> apply(M, F, A ++ [Pid, Source, Alert]);
+ false -> ok
+ end
+ end, ok, Alertees).
+
+internal_register(Pid, {M, F, A} = AlertMFA,
+ State = #alarms{alertees = Alertees}) ->
+ _MRef = erlang:monitor(process, Pid),
+ case dict:find(node(), State#alarms.alarmed_nodes) of
+ {ok, Sources} -> [apply(M, F, A ++ [Pid, R, {true, true, node()}]) || R <- Sources];
+ error -> ok
+ end,
+ NewAlertees = dict:store(Pid, AlertMFA, Alertees),
+ State#alarms{alertees = NewAlertees}.
+
+handle_set_resource_alarm(Source, Node, State) ->
+ rabbit_log:warning(
+ "~s resource limit alarm set on node ~p.~n~n"
+ "**********************************************************~n"
+ "*** Publishers will be blocked until this alarm clears ***~n"
+ "**********************************************************~n",
+ [Source, Node]),
+ {ok, maybe_alert(fun dict_append/3, Node, Source, true, State)}.
+
+handle_set_alarm({file_descriptor_limit, []}, State) ->
+ rabbit_log:warning(
+ "file descriptor limit alarm set.~n~n"
+ "********************************************************************~n"
+ "*** New connections will not be accepted until this alarm clears ***~n"
+ "********************************************************************~n"),
+ {ok, State};
+handle_set_alarm(Alarm, State) ->
+ rabbit_log:warning("alarm '~p' set~n", [Alarm]),
+ {ok, State}.
+
+handle_clear_resource_alarm(Source, Node, State) ->
+ rabbit_log:warning("~s resource limit alarm cleared on node ~p~n",
+ [Source, Node]),
+ {ok, maybe_alert(fun dict_unappend/3, Node, Source, false, State)}.
+
+handle_clear_alarm(file_descriptor_limit, State) ->
+ rabbit_log:warning("file descriptor limit alarm cleared~n"),
+ {ok, State};
+handle_clear_alarm(Alarm, State) ->
+ rabbit_log:warning("alarm '~p' cleared~n", [Alarm]),
+ {ok, State}.
+
+is_node_alarmed(Source, Node, #alarms{alarmed_nodes = AN}) ->
+ case dict:find(Node, AN) of
+ {ok, Sources} ->
+ lists:member(Source, Sources);
+ error ->
+ false
+ end.
+
+compute_alarms(#alarms{alarms = Alarms,
+ alarmed_nodes = AN}) ->
+ Alarms ++ [ {{resource_limit, Source, Node}, []}
+ || {Node, Sources} <- dict:to_list(AN), Source <- Sources ].