summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <michael@rabbitmq.com>2013-06-24 17:43:46 +0400
committerMichael Klishin <michael@rabbitmq.com>2013-06-24 17:43:46 +0400
commit7b88b47870f487bef9ccdab25883582943613c10 (patch)
treefc3b96fd9bb5161a3322a06d94ef482eb36dac6d
parent9d8c38f818676f0bb0615e3a9fce4ad8f4cab126 (diff)
downloadrabbitmq-server-7b88b47870f487bef9ccdab25883582943613c10.tar.gz
Deliver all alarm notifications to handle overlapping alarms
connection.blocked requires us to track the resources we are conserving. This means the old logic of determining edge state transitions for alarms does not work any more. Instead of using the old strategy of comparing alarmed node collection sizes, we now pass around what event the notification is for and simply deliver it to the relevant nodes. This requires that rabbit_alarm event consumers handle duplicate notifications. They already do so after earlier changes on branch bug25191. This makes connection unblocking work correctly in the following sequence of events: * memory alarm set for node A * disk alarm set for node A * memory alarm cleared for node A * disk alarm cleared for node A as well as in other similar scenarios with overlapping alarms. This slightly increases internode and intranode message traffic for alarm notifications. Since alarms occur rarely in well-monitored systems, this is a reasonable trade-off.
-rw-r--r--src/rabbit_alarm.erl34
-rw-r--r--src/rabbit_reader.erl5
2 files changed, 15 insertions, 24 deletions
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index 16b69af9..40be1951 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -133,7 +133,7 @@ handle_event({node_up, Node}, State) ->
{ok, State};
handle_event({node_down, Node}, State) ->
- {ok, maybe_alert(fun dict_unappend_all/3, Node, [], State)};
+ {ok, maybe_alert(fun dict_unappend_all/3, Node, [], clear, State)};
handle_event({register, Pid, AlertMFA}, State) ->
{ok, internal_register(Pid, AlertMFA, State)};
@@ -177,35 +177,25 @@ dict_unappend(Key, Val, Dict) ->
X -> dict:store(Key, X, Dict)
end.
-count_dict_values(Val, Dict) ->
- dict:fold(fun (_Node, List, Count) ->
- Count + case lists:member(Val, List) of
- true -> 1;
- false -> 0
- end
- end, 0, Dict).
-
-maybe_alert(UpdateFun, Node, Source,
+maybe_alert(UpdateFun, Node, Source, Event,
State = #alarms{alarmed_nodes = AN,
alertees = Alertees}) ->
AN1 = UpdateFun(Node, Source, AN),
- BeforeSz = count_dict_values(Source, AN),
- AfterSz = count_dict_values(Source, AN1),
%% If we have changed our alarm state, inform the remotes.
IsLocal = Node =:= node(),
- if IsLocal andalso BeforeSz < AfterSz ->
+ if IsLocal andalso Event =:= set ->
ok = alert_remote(true, Alertees, Source);
- IsLocal andalso BeforeSz > AfterSz ->
+ IsLocal andalso Event =:= clear ->
ok = alert_remote(false, Alertees, Source);
- true ->
+ true ->
ok
end,
- %% If the overall alarm state has changed, inform the locals.
- case {dict:size(AN), dict:size(AN1)} of
- {0, 1} -> ok = alert_local(true, Alertees, Source);
- {1, 0} -> ok = alert_local(false, Alertees, Source);
- {_, _} -> ok
+ case Event of
+ clear ->
+ ok = alert_local(false, Alertees, Source);
+ set ->
+ ok = alert_local(true, Alertees, Source)
end,
State#alarms{alarmed_nodes = AN1}.
@@ -241,7 +231,7 @@ handle_set_alarm({{resource_limit, Source, Node}, []}, State) ->
"*** Publishers will be blocked until this alarm clears ***~n"
"**********************************************************~n",
[Source, Node]),
- {ok, maybe_alert(fun dict_append/3, Node, Source, State)};
+ {ok, maybe_alert(fun dict_append/3, Node, Source, set, State)};
handle_set_alarm({file_descriptor_limit, []}, State) ->
rabbit_log:warning(
"file descriptor limit alarm set.~n~n"
@@ -256,7 +246,7 @@ handle_set_alarm(Alarm, State) ->
handle_clear_alarm({resource_limit, Source, Node}, State) ->
rabbit_log:warning("~s resource limit alarm cleared on node ~p~n",
[Source, Node]),
- {ok, maybe_alert(fun dict_unappend/3, Node, Source, State)};
+ {ok, maybe_alert(fun dict_unappend/3, Node, Source, clear, State)};
handle_clear_alarm(file_descriptor_limit, State) ->
rabbit_log:warning("file descriptor limit alarm cleared~n"),
{ok, State};
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index a2727067..31403ab8 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -416,8 +416,9 @@ terminate(_Explanation, State) ->
{force, State}.
control_throttle(State = #v1{connection_state = CS, throttle = Throttle}) ->
- case {CS, ((Throttle#throttle.conserve_resources =/= []) orelse
- credit_flow:blocked())} of
+ IsThrottled = ((Throttle#throttle.conserve_resources =/= []) orelse
+ credit_flow:blocked()),
+ case {CS, IsThrottled} of
{running, true} -> State#v1{connection_state = blocking};
{blocking, false} -> State#v1{connection_state = running};
{blocked, false} -> ok = rabbit_heartbeat:resume_monitor(