summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_alarm.erl34
-rw-r--r--src/rabbit_reader.erl5
2 files changed, 15 insertions, 24 deletions
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index 16b69af9..40be1951 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -133,7 +133,7 @@ handle_event({node_up, Node}, State) ->
{ok, State};
handle_event({node_down, Node}, State) ->
- {ok, maybe_alert(fun dict_unappend_all/3, Node, [], State)};
+ {ok, maybe_alert(fun dict_unappend_all/3, Node, [], clear, State)};
handle_event({register, Pid, AlertMFA}, State) ->
{ok, internal_register(Pid, AlertMFA, State)};
@@ -177,35 +177,25 @@ dict_unappend(Key, Val, Dict) ->
X -> dict:store(Key, X, Dict)
end.
-count_dict_values(Val, Dict) ->
- dict:fold(fun (_Node, List, Count) ->
- Count + case lists:member(Val, List) of
- true -> 1;
- false -> 0
- end
- end, 0, Dict).
-
-maybe_alert(UpdateFun, Node, Source,
+maybe_alert(UpdateFun, Node, Source, Event,
State = #alarms{alarmed_nodes = AN,
alertees = Alertees}) ->
AN1 = UpdateFun(Node, Source, AN),
- BeforeSz = count_dict_values(Source, AN),
- AfterSz = count_dict_values(Source, AN1),
%% If we have changed our alarm state, inform the remotes.
IsLocal = Node =:= node(),
- if IsLocal andalso BeforeSz < AfterSz ->
+ if IsLocal andalso Event =:= set ->
ok = alert_remote(true, Alertees, Source);
- IsLocal andalso BeforeSz > AfterSz ->
+ IsLocal andalso Event =:= clear ->
ok = alert_remote(false, Alertees, Source);
- true ->
+ true ->
ok
end,
- %% If the overall alarm state has changed, inform the locals.
- case {dict:size(AN), dict:size(AN1)} of
- {0, 1} -> ok = alert_local(true, Alertees, Source);
- {1, 0} -> ok = alert_local(false, Alertees, Source);
- {_, _} -> ok
+ case Event of
+ clear ->
+ ok = alert_local(false, Alertees, Source);
+ set ->
+ ok = alert_local(true, Alertees, Source)
end,
State#alarms{alarmed_nodes = AN1}.
@@ -241,7 +231,7 @@ handle_set_alarm({{resource_limit, Source, Node}, []}, State) ->
"*** Publishers will be blocked until this alarm clears ***~n"
"**********************************************************~n",
[Source, Node]),
- {ok, maybe_alert(fun dict_append/3, Node, Source, State)};
+ {ok, maybe_alert(fun dict_append/3, Node, Source, set, State)};
handle_set_alarm({file_descriptor_limit, []}, State) ->
rabbit_log:warning(
"file descriptor limit alarm set.~n~n"
@@ -256,7 +246,7 @@ handle_set_alarm(Alarm, State) ->
handle_clear_alarm({resource_limit, Source, Node}, State) ->
rabbit_log:warning("~s resource limit alarm cleared on node ~p~n",
[Source, Node]),
- {ok, maybe_alert(fun dict_unappend/3, Node, Source, State)};
+ {ok, maybe_alert(fun dict_unappend/3, Node, Source, clear, State)};
handle_clear_alarm(file_descriptor_limit, State) ->
rabbit_log:warning("file descriptor limit alarm cleared~n"),
{ok, State};
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index a2727067..31403ab8 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -416,8 +416,9 @@ terminate(_Explanation, State) ->
{force, State}.
control_throttle(State = #v1{connection_state = CS, throttle = Throttle}) ->
- case {CS, ((Throttle#throttle.conserve_resources =/= []) orelse
- credit_flow:blocked())} of
+ IsThrottled = ((Throttle#throttle.conserve_resources =/= []) orelse
+ credit_flow:blocked()),
+ case {CS, IsThrottled} of
{running, true} -> State#v1{connection_state = blocking};
{blocking, false} -> State#v1{connection_state = running};
{blocked, false} -> ok = rabbit_heartbeat:resume_monitor(