summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <michael@rabbitmq.com>2013-06-17 17:39:26 +0400
committerMichael Klishin <michael@rabbitmq.com>2013-06-17 17:39:26 +0400
commit5286cf1aceff6967dbb0dfe34d7a7aa387c4106f (patch)
tree3c694b882e0c462699ff781e425a7f0a7a2c437b
parentf793ed30b070f5210822b04ce203cec6d49463c5 (diff)
parente0d2c7e2c7a2010dda1352bbf9482d7b1ee1bc71 (diff)
downloadrabbitmq-server-5286cf1aceff6967dbb0dfe34d7a7aa387c4106f.tar.gz
merge default into bug25191
-rw-r--r--src/rabbit_alarm.erl6
-rw-r--r--src/rabbit_reader.erl71
2 files changed, 57 insertions, 20 deletions
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index 6d24d130..17f1edcf 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -37,7 +37,7 @@
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
-spec(start/0 :: () -> 'ok').
-spec(stop/0 :: () -> 'ok').
--spec(register/2 :: (pid(), rabbit_types:mfargs()) -> boolean()).
+-spec(register/2 :: (pid(), rabbit_types:mfargs()) -> [atom()]).
-spec(set_alarm/1 :: (any()) -> 'ok').
-spec(clear_alarm/1 :: (any()) -> 'ok').
-spec(on_node_up/1 :: (node()) -> 'ok').
@@ -93,8 +93,8 @@ init([]) ->
alarmed_nodes = dict:new(),
alarms = []}}.
-handle_call({register, Pid, AlertMFA}, State) ->
- {ok, 0 < dict:size(State#alarms.alarmed_nodes),
+handle_call({register, Pid, AlertMFA}, State = #alarms{alarmed_nodes = AN}) ->
+ {ok, lists:usort(lists:append([V || {_, V} <- dict:to_list(AN)])),
internal_register(Pid, AlertMFA, State)};
handle_call(get_alarms, State = #alarms{alarms = Alarms}) ->
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 61fac0e2..a2727067 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -45,7 +45,8 @@
client_properties, capabilities,
auth_mechanism, auth_state}).
--record(throttle, {conserve_resources, last_blocked_by, last_blocked_at}).
+-record(throttle, {conserve_resources, last_blocked_by, last_blocked_at,
+ blocked_sent}).
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
send_pend, state, last_blocked_by, last_blocked_age,
@@ -142,8 +143,8 @@ info(Pid, Items) ->
force_event_refresh(Pid) ->
gen_server:cast(Pid, force_event_refresh).
-conserve_resources(Pid, _Source, Conserve) ->
- Pid ! {conserve_resources, Conserve},
+conserve_resources(Pid, Source, Conserve) ->
+ Pid ! {conserve_resources, Source, Conserve},
ok.
server_properties(Protocol) ->
@@ -246,9 +247,10 @@ start_connection(Parent, ConnSupPid, Collector, StartHeartbeatFun, Deb,
buf = [],
buf_len = 0,
throttle = #throttle{
- conserve_resources = false,
- last_blocked_by = none,
- last_blocked_at = never}},
+ conserve_resources = [],
+ last_blocked_by = none,
+ last_blocked_at = never,
+ blocked_sent = false}},
try
run({?MODULE, recvloop,
[Deb, switch_callback(rabbit_event:init_stats_timer(
@@ -321,9 +323,14 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
end
end.
-handle_other({conserve_resources, Conserve},
- State = #v1{throttle = Throttle}) ->
- Throttle1 = Throttle#throttle{conserve_resources = Conserve},
+handle_other({conserve_resources, Source, Conserve},
+ State = #v1{throttle = Throttle =
+ #throttle{conserve_resources = CR}}) ->
+ CR1 = case Conserve of
+ true -> lists:usort([Source | CR]);
+ false -> CR -- [Source]
+ end,
+ Throttle1 = Throttle#throttle{conserve_resources = CR1},
control_throttle(State#v1{throttle = Throttle1});
handle_other({channel_closing, ChPid}, State) ->
ok = rabbit_channel:ready_for_close(ChPid),
@@ -409,30 +416,60 @@ terminate(_Explanation, State) ->
{force, State}.
control_throttle(State = #v1{connection_state = CS, throttle = Throttle}) ->
- case {CS, (Throttle#throttle.conserve_resources orelse
+ case {CS, ((Throttle#throttle.conserve_resources =/= []) orelse
credit_flow:blocked())} of
{running, true} -> State#v1{connection_state = blocking};
{blocking, false} -> State#v1{connection_state = running};
{blocked, false} -> ok = rabbit_heartbeat:resume_monitor(
State#v1.heartbeater),
- State#v1{connection_state = running};
+ maybe_send_unblocked(State),
+ State#v1{connection_state = running,
+ throttle = Throttle#throttle{
+ blocked_sent = false}};
{blocked, true} -> State#v1{throttle = update_last_blocked_by(
Throttle)};
{_, _} -> State
end.
-maybe_block(State = #v1{connection_state = blocking, throttle = Throttle}) ->
+maybe_block(State = #v1{connection_state = blocking,
+ throttle = Throttle}) ->
ok = rabbit_heartbeat:pause_monitor(State#v1.heartbeater),
+ Sent = maybe_send_blocked(State),
State#v1{connection_state = blocked,
throttle = update_last_blocked_by(
- Throttle#throttle{last_blocked_at = erlang:now()})};
+ Throttle#throttle{last_blocked_at = erlang:now(),
+ blocked_sent = Sent})};
maybe_block(State) ->
State.
-update_last_blocked_by(Throttle = #throttle{conserve_resources = true}) ->
- Throttle#throttle{last_blocked_by = resource};
-update_last_blocked_by(Throttle = #throttle{conserve_resources = false}) ->
- Throttle#throttle{last_blocked_by = flow}.
+maybe_send_blocked(#v1{throttle = #throttle{conserve_resources = []}}) ->
+ false;
+maybe_send_blocked(#v1{throttle = #throttle{conserve_resources = CR},
+ connection = #connection{
+ protocol = Protocol,
+ capabilities = Capabilities},
+ sock = Sock}) ->
+ case rabbit_misc:table_lookup(Capabilities, <<"connection.blocked">>) of
+ {bool, true} ->
+ RStr = string:join([atom_to_list(A) || A <- CR], " & "),
+ Reason = list_to_binary(rabbit_misc:format("low on ~s", [RStr])),
+ ok = send_on_channel0(Sock, #'connection.blocked'{reason = Reason},
+ Protocol),
+ true;
+ _ ->
+ false
+ end.
+
+maybe_send_unblocked(#v1{throttle = #throttle{blocked_sent = false}}) ->
+ ok;
+maybe_send_unblocked(#v1{connection = #connection{protocol = Protocol},
+ sock = Sock}) ->
+ ok = send_on_channel0(Sock, #'connection.unblocked'{}, Protocol).
+
+update_last_blocked_by(Throttle = #throttle{conserve_resources = []}) ->
+ Throttle#throttle{last_blocked_by = flow};
+update_last_blocked_by(Throttle) ->
+ Throttle#throttle{last_blocked_by = resource}.
%%--------------------------------------------------------------------------
%% error handling / termination