diff options
author | Michael Klishin <michael@rabbitmq.com> | 2013-06-17 17:39:26 +0400 |
---|---|---|
committer | Michael Klishin <michael@rabbitmq.com> | 2013-06-17 17:39:26 +0400 |
commit | 5286cf1aceff6967dbb0dfe34d7a7aa387c4106f (patch) | |
tree | 3c694b882e0c462699ff781e425a7f0a7a2c437b | |
parent | f793ed30b070f5210822b04ce203cec6d49463c5 (diff) | |
parent | e0d2c7e2c7a2010dda1352bbf9482d7b1ee1bc71 (diff) | |
download | rabbitmq-server-5286cf1aceff6967dbb0dfe34d7a7aa387c4106f.tar.gz |
merge default into bug25191
-rw-r--r-- | src/rabbit_alarm.erl | 6 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 71 |
2 files changed, 57 insertions, 20 deletions
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index 6d24d130..17f1edcf 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -37,7 +37,7 @@ -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). -spec(start/0 :: () -> 'ok'). -spec(stop/0 :: () -> 'ok'). --spec(register/2 :: (pid(), rabbit_types:mfargs()) -> boolean()). +-spec(register/2 :: (pid(), rabbit_types:mfargs()) -> [atom()]). -spec(set_alarm/1 :: (any()) -> 'ok'). -spec(clear_alarm/1 :: (any()) -> 'ok'). -spec(on_node_up/1 :: (node()) -> 'ok'). @@ -93,8 +93,8 @@ init([]) -> alarmed_nodes = dict:new(), alarms = []}}. -handle_call({register, Pid, AlertMFA}, State) -> - {ok, 0 < dict:size(State#alarms.alarmed_nodes), +handle_call({register, Pid, AlertMFA}, State = #alarms{alarmed_nodes = AN}) -> + {ok, lists:usort(lists:append([V || {_, V} <- dict:to_list(AN)])), internal_register(Pid, AlertMFA, State)}; handle_call(get_alarms, State = #alarms{alarms = Alarms}) -> diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 61fac0e2..a2727067 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -45,7 +45,8 @@ client_properties, capabilities, auth_mechanism, auth_state}). --record(throttle, {conserve_resources, last_blocked_by, last_blocked_at}). +-record(throttle, {conserve_resources, last_blocked_by, last_blocked_at, + blocked_sent}). -define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt, send_pend, state, last_blocked_by, last_blocked_age, @@ -142,8 +143,8 @@ info(Pid, Items) -> force_event_refresh(Pid) -> gen_server:cast(Pid, force_event_refresh). -conserve_resources(Pid, _Source, Conserve) -> - Pid ! {conserve_resources, Conserve}, +conserve_resources(Pid, Source, Conserve) -> + Pid ! {conserve_resources, Source, Conserve}, ok. server_properties(Protocol) -> @@ -246,9 +247,10 @@ start_connection(Parent, ConnSupPid, Collector, StartHeartbeatFun, Deb, buf = [], buf_len = 0, throttle = #throttle{ - conserve_resources = false, - last_blocked_by = none, - last_blocked_at = never}}, + conserve_resources = [], + last_blocked_by = none, + last_blocked_at = never, + blocked_sent = false}}, try run({?MODULE, recvloop, [Deb, switch_callback(rabbit_event:init_stats_timer( @@ -321,9 +323,14 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) -> end end. -handle_other({conserve_resources, Conserve}, - State = #v1{throttle = Throttle}) -> - Throttle1 = Throttle#throttle{conserve_resources = Conserve}, +handle_other({conserve_resources, Source, Conserve}, + State = #v1{throttle = Throttle = + #throttle{conserve_resources = CR}}) -> + CR1 = case Conserve of + true -> lists:usort([Source | CR]); + false -> CR -- [Source] + end, + Throttle1 = Throttle#throttle{conserve_resources = CR1}, control_throttle(State#v1{throttle = Throttle1}); handle_other({channel_closing, ChPid}, State) -> ok = rabbit_channel:ready_for_close(ChPid), @@ -409,30 +416,60 @@ terminate(_Explanation, State) -> {force, State}. control_throttle(State = #v1{connection_state = CS, throttle = Throttle}) -> - case {CS, (Throttle#throttle.conserve_resources orelse + case {CS, ((Throttle#throttle.conserve_resources =/= []) orelse credit_flow:blocked())} of {running, true} -> State#v1{connection_state = blocking}; {blocking, false} -> State#v1{connection_state = running}; {blocked, false} -> ok = rabbit_heartbeat:resume_monitor( State#v1.heartbeater), - State#v1{connection_state = running}; + maybe_send_unblocked(State), + State#v1{connection_state = running, + throttle = Throttle#throttle{ + blocked_sent = false}}; {blocked, true} -> State#v1{throttle = update_last_blocked_by( Throttle)}; {_, _} -> State end. -maybe_block(State = #v1{connection_state = blocking, throttle = Throttle}) -> +maybe_block(State = #v1{connection_state = blocking, + throttle = Throttle}) -> ok = rabbit_heartbeat:pause_monitor(State#v1.heartbeater), + Sent = maybe_send_blocked(State), State#v1{connection_state = blocked, throttle = update_last_blocked_by( - Throttle#throttle{last_blocked_at = erlang:now()})}; + Throttle#throttle{last_blocked_at = erlang:now(), + blocked_sent = Sent})}; maybe_block(State) -> State. -update_last_blocked_by(Throttle = #throttle{conserve_resources = true}) -> - Throttle#throttle{last_blocked_by = resource}; -update_last_blocked_by(Throttle = #throttle{conserve_resources = false}) -> - Throttle#throttle{last_blocked_by = flow}. +maybe_send_blocked(#v1{throttle = #throttle{conserve_resources = []}}) -> + false; +maybe_send_blocked(#v1{throttle = #throttle{conserve_resources = CR}, + connection = #connection{ + protocol = Protocol, + capabilities = Capabilities}, + sock = Sock}) -> + case rabbit_misc:table_lookup(Capabilities, <<"connection.blocked">>) of + {bool, true} -> + RStr = string:join([atom_to_list(A) || A <- CR], " & "), + Reason = list_to_binary(rabbit_misc:format("low on ~s", [RStr])), + ok = send_on_channel0(Sock, #'connection.blocked'{reason = Reason}, + Protocol), + true; + _ -> + false + end. + +maybe_send_unblocked(#v1{throttle = #throttle{blocked_sent = false}}) -> + ok; +maybe_send_unblocked(#v1{connection = #connection{protocol = Protocol}, + sock = Sock}) -> + ok = send_on_channel0(Sock, #'connection.unblocked'{}, Protocol). + +update_last_blocked_by(Throttle = #throttle{conserve_resources = []}) -> + Throttle#throttle{last_blocked_by = flow}; +update_last_blocked_by(Throttle) -> + Throttle#throttle{last_blocked_by = resource}. %%-------------------------------------------------------------------------- %% error handling / termination |