diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-08-06 15:32:02 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-08-06 15:32:02 +0100 |
commit | 0d789ad162e8b0974f90dd6d4fe7257be9f17f49 (patch) | |
tree | f571572edea0eee96f4d5f82cba9b79a881a4298 | |
parent | b13e397a82b45b2726fd6f9b7758accbd811496d (diff) | |
parent | 56b47f4015bc99b50608c8ca472de6a0aea68f6b (diff) | |
download | rabbitmq-server-0d789ad162e8b0974f90dd6d4fe7257be9f17f49.tar.gz |
Merge bug25191
-rw-r--r-- | .hgignore | 1 | ||||
-rw-r--r-- | src/rabbit_alarm.erl | 72 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 79 |
3 files changed, 96 insertions, 56 deletions
@@ -3,6 +3,7 @@ syntax: glob *~ *.swp *.patch +*.orig erl_crash.dump deps.mk diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index 6607c4f6..cd1d125b 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -37,7 +37,7 @@ -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). -spec(start/0 :: () -> 'ok'). -spec(stop/0 :: () -> 'ok'). --spec(register/2 :: (pid(), rabbit_types:mfargs()) -> boolean()). +-spec(register/2 :: (pid(), rabbit_types:mfargs()) -> [atom()]). -spec(set_alarm/1 :: (any()) -> 'ok'). -spec(clear_alarm/1 :: (any()) -> 'ok'). -spec(on_node_up/1 :: (node()) -> 'ok'). @@ -93,8 +93,8 @@ init([]) -> alarmed_nodes = dict:new(), alarms = []}}. -handle_call({register, Pid, AlertMFA}, State) -> - {ok, 0 < dict:size(State#alarms.alarmed_nodes), +handle_call({register, Pid, AlertMFA}, State = #alarms{alarmed_nodes = AN}) -> + {ok, lists:usort(lists:append([V || {_, V} <- dict:to_list(AN)])), internal_register(Pid, AlertMFA, State)}; handle_call(get_alarms, State = #alarms{alarms = Alarms}) -> @@ -104,11 +104,20 @@ handle_call(_Request, State) -> {ok, not_understood, State}. handle_event({set_alarm, Alarm}, State = #alarms{alarms = Alarms}) -> - handle_set_alarm(Alarm, State#alarms{alarms = [Alarm|Alarms]}); + case lists:member(Alarm, Alarms) of + true -> {ok, State}; + false -> UpdatedAlarms = lists:usort([Alarm|Alarms]), + handle_set_alarm(Alarm, State#alarms{alarms = UpdatedAlarms}) + end; handle_event({clear_alarm, Alarm}, State = #alarms{alarms = Alarms}) -> - handle_clear_alarm(Alarm, State#alarms{alarms = lists:keydelete(Alarm, 1, - Alarms)}); + case lists:keymember(Alarm, 1, Alarms) of + true -> handle_clear_alarm( + Alarm, State#alarms{alarms = lists:keydelete( + Alarm, 1, Alarms)}); + false -> {ok, State} + + end; handle_event({node_up, Node}, State) -> %% Must do this via notify and not call to avoid possible deadlock. @@ -118,7 +127,7 @@ handle_event({node_up, Node}, State) -> {ok, State}; handle_event({node_down, Node}, State) -> - {ok, maybe_alert(fun dict_unappend_all/3, Node, [], State)}; + {ok, maybe_alert(fun dict_unappend_all/3, Node, [], false, State)}; handle_event({register, Pid, AlertMFA}, State) -> {ok, internal_register(Pid, AlertMFA, State)}; @@ -141,45 +150,36 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- +dict_append(Key, Val, Dict) -> + L = case dict:find(Key, Dict) of + {ok, V} -> V; + error -> [] + end, + dict:store(Key, lists:usort([Val|L]), Dict). + dict_unappend_all(Key, _Val, Dict) -> dict:erase(Key, Dict). dict_unappend(Key, Val, Dict) -> - case lists:delete(Val, dict:fetch(Key, Dict)) of + L = case dict:find(Key, Dict) of + {ok, V} -> V; + error -> [] + end, + + case lists:delete(Val, L) of [] -> dict:erase(Key, Dict); X -> dict:store(Key, X, Dict) end. -count_dict_values(Val, Dict) -> - dict:fold(fun (_Node, List, Count) -> - Count + case lists:member(Val, List) of - true -> 1; - false -> 0 - end - end, 0, Dict). - -maybe_alert(UpdateFun, Node, Source, +maybe_alert(UpdateFun, Node, Source, Alert, State = #alarms{alarmed_nodes = AN, alertees = Alertees}) -> AN1 = UpdateFun(Node, Source, AN), - BeforeSz = count_dict_values(Source, AN), - AfterSz = count_dict_values(Source, AN1), - - %% If we have changed our alarm state, inform the remotes. - IsLocal = Node =:= node(), - if IsLocal andalso BeforeSz < AfterSz -> - ok = alert_remote(true, Alertees, Source); - IsLocal andalso BeforeSz > AfterSz -> - ok = alert_remote(false, Alertees, Source); - true -> - ok - end, - %% If the overall alarm state has changed, inform the locals. - case {dict:size(AN), dict:size(AN1)} of - {0, 1} -> ok = alert_local(true, Alertees, Source); - {1, 0} -> ok = alert_local(false, Alertees, Source); - {_, _} -> ok + case node() of + Node -> ok = alert_remote(Alert, Alertees, Source); + _ -> ok end, + ok = alert_local(Alert, Alertees, Source), State#alarms{alarmed_nodes = AN1}. alert_local(Alert, Alertees, Source) -> @@ -214,7 +214,7 @@ handle_set_alarm({{resource_limit, Source, Node}, []}, State) -> "*** Publishers will be blocked until this alarm clears ***~n" "**********************************************************~n", [Source, Node]), - {ok, maybe_alert(fun dict:append/3, Node, Source, State)}; + {ok, maybe_alert(fun dict_append/3, Node, Source, true, State)}; handle_set_alarm({file_descriptor_limit, []}, State) -> rabbit_log:warning( "file descriptor limit alarm set.~n~n" @@ -229,7 +229,7 @@ handle_set_alarm(Alarm, State) -> handle_clear_alarm({resource_limit, Source, Node}, State) -> rabbit_log:warning("~s resource limit alarm cleared on node ~p~n", [Source, Node]), - {ok, maybe_alert(fun dict_unappend/3, Node, Source, State)}; + {ok, maybe_alert(fun dict_unappend/3, Node, Source, false, State)}; handle_clear_alarm(file_descriptor_limit, State) -> rabbit_log:warning("file descriptor limit alarm cleared~n"), {ok, State}; diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 6d3ac2d9..9c902703 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -45,7 +45,8 @@ client_properties, capabilities, auth_mechanism, auth_state}). --record(throttle, {conserve_resources, last_blocked_by, last_blocked_at}). +-record(throttle, {alarmed_by, last_blocked_by, last_blocked_at, + blocked_sent}). -define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt, send_pend, state, last_blocked_by, last_blocked_age, @@ -142,8 +143,8 @@ info(Pid, Items) -> force_event_refresh(Pid) -> gen_server:cast(Pid, force_event_refresh). -conserve_resources(Pid, _Source, Conserve) -> - Pid ! {conserve_resources, Conserve}, +conserve_resources(Pid, Source, Conserve) -> + Pid ! {conserve_resources, Source, Conserve}, ok. server_properties(Protocol) -> @@ -178,7 +179,8 @@ server_capabilities(rabbit_framing_amqp_0_9_1) -> [{<<"publisher_confirms">>, bool, true}, {<<"exchange_exchange_bindings">>, bool, true}, {<<"basic.nack">>, bool, true}, - {<<"consumer_cancel_notify">>, bool, true}]; + {<<"consumer_cancel_notify">>, bool, true}, + {<<"connection.blocked">>, bool, true}]; server_capabilities(_) -> []. @@ -246,9 +248,10 @@ start_connection(Parent, ChSup3Pid, Collector, StartHeartbeatFun, Deb, buf = [], buf_len = 0, throttle = #throttle{ - conserve_resources = false, - last_blocked_by = none, - last_blocked_at = never}}, + alarmed_by = [], + last_blocked_by = none, + last_blocked_at = never, + blocked_sent = false}}, try run({?MODULE, recvloop, [Deb, switch_callback(rabbit_event:init_stats_timer( @@ -321,9 +324,14 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) -> end end. -handle_other({conserve_resources, Conserve}, - State = #v1{throttle = Throttle}) -> - Throttle1 = Throttle#throttle{conserve_resources = Conserve}, +handle_other({conserve_resources, Source, Conserve}, + State = #v1{throttle = Throttle = + #throttle{alarmed_by = CR}}) -> + CR1 = case Conserve of + true -> lists:usort([Source | CR]); + false -> CR -- [Source] + end, + Throttle1 = Throttle#throttle{alarmed_by = CR1}, control_throttle(State#v1{throttle = Throttle1}); handle_other({channel_closing, ChPid}, State) -> ok = rabbit_channel:ready_for_close(ChPid), @@ -409,30 +417,61 @@ terminate(_Explanation, State) -> {force, State}. control_throttle(State = #v1{connection_state = CS, throttle = Throttle}) -> - case {CS, (Throttle#throttle.conserve_resources orelse - credit_flow:blocked())} of + IsThrottled = ((Throttle#throttle.alarmed_by =/= []) orelse + credit_flow:blocked()), + case {CS, IsThrottled} of {running, true} -> State#v1{connection_state = blocking}; {blocking, false} -> State#v1{connection_state = running}; {blocked, false} -> ok = rabbit_heartbeat:resume_monitor( State#v1.heartbeater), - State#v1{connection_state = running}; + maybe_send_unblocked(State), + State#v1{connection_state = running, + throttle = Throttle#throttle{ + blocked_sent = false}}; {blocked, true} -> State#v1{throttle = update_last_blocked_by( Throttle)}; {_, _} -> State end. -maybe_block(State = #v1{connection_state = blocking, throttle = Throttle}) -> +maybe_block(State = #v1{connection_state = blocking, + throttle = Throttle}) -> ok = rabbit_heartbeat:pause_monitor(State#v1.heartbeater), + Sent = maybe_send_blocked(State), State#v1{connection_state = blocked, throttle = update_last_blocked_by( - Throttle#throttle{last_blocked_at = erlang:now()})}; + Throttle#throttle{last_blocked_at = erlang:now(), + blocked_sent = Sent})}; maybe_block(State) -> State. -update_last_blocked_by(Throttle = #throttle{conserve_resources = true}) -> - Throttle#throttle{last_blocked_by = resource}; -update_last_blocked_by(Throttle = #throttle{conserve_resources = false}) -> - Throttle#throttle{last_blocked_by = flow}. +maybe_send_blocked(#v1{throttle = #throttle{alarmed_by = []}}) -> + false; +maybe_send_blocked(#v1{throttle = #throttle{alarmed_by = CR}, + connection = #connection{ + protocol = Protocol, + capabilities = Capabilities}, + sock = Sock}) -> + case rabbit_misc:table_lookup(Capabilities, <<"connection.blocked">>) of + {bool, true} -> + RStr = string:join([atom_to_list(A) || A <- CR], " & "), + Reason = list_to_binary(rabbit_misc:format("low on ~s", [RStr])), + ok = send_on_channel0(Sock, #'connection.blocked'{reason = Reason}, + Protocol), + true; + _ -> + false + end. + +maybe_send_unblocked(#v1{throttle = #throttle{blocked_sent = false}}) -> + ok; +maybe_send_unblocked(#v1{connection = #connection{protocol = Protocol}, + sock = Sock}) -> + ok = send_on_channel0(Sock, #'connection.unblocked'{}, Protocol). + +update_last_blocked_by(Throttle = #throttle{alarmed_by = []}) -> + Throttle#throttle{last_blocked_by = flow}; +update_last_blocked_by(Throttle) -> + Throttle#throttle{last_blocked_by = resource}. %%-------------------------------------------------------------------------- %% error handling / termination @@ -847,7 +886,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, NewConnection = Connection#connection{vhost = VHostPath}, ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol), Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}), - Throttle1 = Throttle#throttle{conserve_resources = Conserve}, + Throttle1 = Throttle#throttle{alarmed_by = Conserve}, {ok, ChannelSupSupPid} = supervisor2:start_child( ChSup3Pid, |