diff options
-rw-r--r-- | src/rabbit_alarm.erl | 34 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 5 |
2 files changed, 15 insertions, 24 deletions
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index 16b69af9..40be1951 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -133,7 +133,7 @@ handle_event({node_up, Node}, State) -> {ok, State}; handle_event({node_down, Node}, State) -> - {ok, maybe_alert(fun dict_unappend_all/3, Node, [], State)}; + {ok, maybe_alert(fun dict_unappend_all/3, Node, [], clear, State)}; handle_event({register, Pid, AlertMFA}, State) -> {ok, internal_register(Pid, AlertMFA, State)}; @@ -177,35 +177,25 @@ dict_unappend(Key, Val, Dict) -> X -> dict:store(Key, X, Dict) end. -count_dict_values(Val, Dict) -> - dict:fold(fun (_Node, List, Count) -> - Count + case lists:member(Val, List) of - true -> 1; - false -> 0 - end - end, 0, Dict). - -maybe_alert(UpdateFun, Node, Source, +maybe_alert(UpdateFun, Node, Source, Event, State = #alarms{alarmed_nodes = AN, alertees = Alertees}) -> AN1 = UpdateFun(Node, Source, AN), - BeforeSz = count_dict_values(Source, AN), - AfterSz = count_dict_values(Source, AN1), %% If we have changed our alarm state, inform the remotes. IsLocal = Node =:= node(), - if IsLocal andalso BeforeSz < AfterSz -> + if IsLocal andalso Event =:= set -> ok = alert_remote(true, Alertees, Source); - IsLocal andalso BeforeSz > AfterSz -> + IsLocal andalso Event =:= clear -> ok = alert_remote(false, Alertees, Source); - true -> + true -> ok end, - %% If the overall alarm state has changed, inform the locals. - case {dict:size(AN), dict:size(AN1)} of - {0, 1} -> ok = alert_local(true, Alertees, Source); - {1, 0} -> ok = alert_local(false, Alertees, Source); - {_, _} -> ok + case Event of + clear -> + ok = alert_local(false, Alertees, Source); + set -> + ok = alert_local(true, Alertees, Source) end, State#alarms{alarmed_nodes = AN1}. @@ -241,7 +231,7 @@ handle_set_alarm({{resource_limit, Source, Node}, []}, State) -> "*** Publishers will be blocked until this alarm clears ***~n" "**********************************************************~n", [Source, Node]), - {ok, maybe_alert(fun dict_append/3, Node, Source, State)}; + {ok, maybe_alert(fun dict_append/3, Node, Source, set, State)}; handle_set_alarm({file_descriptor_limit, []}, State) -> rabbit_log:warning( "file descriptor limit alarm set.~n~n" @@ -256,7 +246,7 @@ handle_set_alarm(Alarm, State) -> handle_clear_alarm({resource_limit, Source, Node}, State) -> rabbit_log:warning("~s resource limit alarm cleared on node ~p~n", [Source, Node]), - {ok, maybe_alert(fun dict_unappend/3, Node, Source, State)}; + {ok, maybe_alert(fun dict_unappend/3, Node, Source, clear, State)}; handle_clear_alarm(file_descriptor_limit, State) -> rabbit_log:warning("file descriptor limit alarm cleared~n"), {ok, State}; diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index a2727067..31403ab8 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -416,8 +416,9 @@ terminate(_Explanation, State) -> {force, State}. control_throttle(State = #v1{connection_state = CS, throttle = Throttle}) -> - case {CS, ((Throttle#throttle.conserve_resources =/= []) orelse - credit_flow:blocked())} of + IsThrottled = ((Throttle#throttle.conserve_resources =/= []) orelse + credit_flow:blocked()), + case {CS, IsThrottled} of {running, true} -> State#v1{connection_state = blocking}; {blocking, false} -> State#v1{connection_state = running}; {blocked, false} -> ok = rabbit_heartbeat:resume_monitor( |