summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-08-06 15:32:02 +0100
committerSimon MacMullen <simon@rabbitmq.com>2013-08-06 15:32:02 +0100
commit0d789ad162e8b0974f90dd6d4fe7257be9f17f49 (patch)
treef571572edea0eee96f4d5f82cba9b79a881a4298
parentb13e397a82b45b2726fd6f9b7758accbd811496d (diff)
parent56b47f4015bc99b50608c8ca472de6a0aea68f6b (diff)
downloadrabbitmq-server-0d789ad162e8b0974f90dd6d4fe7257be9f17f49.tar.gz
Merge bug25191
-rw-r--r--.hgignore1
-rw-r--r--src/rabbit_alarm.erl72
-rw-r--r--src/rabbit_reader.erl79
3 files changed, 96 insertions, 56 deletions
diff --git a/.hgignore b/.hgignore
index 912b4a56..cd017298 100644
--- a/.hgignore
+++ b/.hgignore
@@ -3,6 +3,7 @@ syntax: glob
*~
*.swp
*.patch
+*.orig
erl_crash.dump
deps.mk
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index 6607c4f6..cd1d125b 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -37,7 +37,7 @@
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
-spec(start/0 :: () -> 'ok').
-spec(stop/0 :: () -> 'ok').
--spec(register/2 :: (pid(), rabbit_types:mfargs()) -> boolean()).
+-spec(register/2 :: (pid(), rabbit_types:mfargs()) -> [atom()]).
-spec(set_alarm/1 :: (any()) -> 'ok').
-spec(clear_alarm/1 :: (any()) -> 'ok').
-spec(on_node_up/1 :: (node()) -> 'ok').
@@ -93,8 +93,8 @@ init([]) ->
alarmed_nodes = dict:new(),
alarms = []}}.
-handle_call({register, Pid, AlertMFA}, State) ->
- {ok, 0 < dict:size(State#alarms.alarmed_nodes),
+handle_call({register, Pid, AlertMFA}, State = #alarms{alarmed_nodes = AN}) ->
+ {ok, lists:usort(lists:append([V || {_, V} <- dict:to_list(AN)])),
internal_register(Pid, AlertMFA, State)};
handle_call(get_alarms, State = #alarms{alarms = Alarms}) ->
@@ -104,11 +104,20 @@ handle_call(_Request, State) ->
{ok, not_understood, State}.
handle_event({set_alarm, Alarm}, State = #alarms{alarms = Alarms}) ->
- handle_set_alarm(Alarm, State#alarms{alarms = [Alarm|Alarms]});
+ case lists:member(Alarm, Alarms) of
+ true -> {ok, State};
+ false -> UpdatedAlarms = lists:usort([Alarm|Alarms]),
+ handle_set_alarm(Alarm, State#alarms{alarms = UpdatedAlarms})
+ end;
handle_event({clear_alarm, Alarm}, State = #alarms{alarms = Alarms}) ->
- handle_clear_alarm(Alarm, State#alarms{alarms = lists:keydelete(Alarm, 1,
- Alarms)});
+ case lists:keymember(Alarm, 1, Alarms) of
+ true -> handle_clear_alarm(
+ Alarm, State#alarms{alarms = lists:keydelete(
+ Alarm, 1, Alarms)});
+ false -> {ok, State}
+
+ end;
handle_event({node_up, Node}, State) ->
%% Must do this via notify and not call to avoid possible deadlock.
@@ -118,7 +127,7 @@ handle_event({node_up, Node}, State) ->
{ok, State};
handle_event({node_down, Node}, State) ->
- {ok, maybe_alert(fun dict_unappend_all/3, Node, [], State)};
+ {ok, maybe_alert(fun dict_unappend_all/3, Node, [], false, State)};
handle_event({register, Pid, AlertMFA}, State) ->
{ok, internal_register(Pid, AlertMFA, State)};
@@ -141,45 +150,36 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
+dict_append(Key, Val, Dict) ->
+ L = case dict:find(Key, Dict) of
+ {ok, V} -> V;
+ error -> []
+ end,
+ dict:store(Key, lists:usort([Val|L]), Dict).
+
dict_unappend_all(Key, _Val, Dict) ->
dict:erase(Key, Dict).
dict_unappend(Key, Val, Dict) ->
- case lists:delete(Val, dict:fetch(Key, Dict)) of
+ L = case dict:find(Key, Dict) of
+ {ok, V} -> V;
+ error -> []
+ end,
+
+ case lists:delete(Val, L) of
[] -> dict:erase(Key, Dict);
X -> dict:store(Key, X, Dict)
end.
-count_dict_values(Val, Dict) ->
- dict:fold(fun (_Node, List, Count) ->
- Count + case lists:member(Val, List) of
- true -> 1;
- false -> 0
- end
- end, 0, Dict).
-
-maybe_alert(UpdateFun, Node, Source,
+maybe_alert(UpdateFun, Node, Source, Alert,
State = #alarms{alarmed_nodes = AN,
alertees = Alertees}) ->
AN1 = UpdateFun(Node, Source, AN),
- BeforeSz = count_dict_values(Source, AN),
- AfterSz = count_dict_values(Source, AN1),
-
- %% If we have changed our alarm state, inform the remotes.
- IsLocal = Node =:= node(),
- if IsLocal andalso BeforeSz < AfterSz ->
- ok = alert_remote(true, Alertees, Source);
- IsLocal andalso BeforeSz > AfterSz ->
- ok = alert_remote(false, Alertees, Source);
- true ->
- ok
- end,
- %% If the overall alarm state has changed, inform the locals.
- case {dict:size(AN), dict:size(AN1)} of
- {0, 1} -> ok = alert_local(true, Alertees, Source);
- {1, 0} -> ok = alert_local(false, Alertees, Source);
- {_, _} -> ok
+ case node() of
+ Node -> ok = alert_remote(Alert, Alertees, Source);
+ _ -> ok
end,
+ ok = alert_local(Alert, Alertees, Source),
State#alarms{alarmed_nodes = AN1}.
alert_local(Alert, Alertees, Source) ->
@@ -214,7 +214,7 @@ handle_set_alarm({{resource_limit, Source, Node}, []}, State) ->
"*** Publishers will be blocked until this alarm clears ***~n"
"**********************************************************~n",
[Source, Node]),
- {ok, maybe_alert(fun dict:append/3, Node, Source, State)};
+ {ok, maybe_alert(fun dict_append/3, Node, Source, true, State)};
handle_set_alarm({file_descriptor_limit, []}, State) ->
rabbit_log:warning(
"file descriptor limit alarm set.~n~n"
@@ -229,7 +229,7 @@ handle_set_alarm(Alarm, State) ->
handle_clear_alarm({resource_limit, Source, Node}, State) ->
rabbit_log:warning("~s resource limit alarm cleared on node ~p~n",
[Source, Node]),
- {ok, maybe_alert(fun dict_unappend/3, Node, Source, State)};
+ {ok, maybe_alert(fun dict_unappend/3, Node, Source, false, State)};
handle_clear_alarm(file_descriptor_limit, State) ->
rabbit_log:warning("file descriptor limit alarm cleared~n"),
{ok, State};
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 6d3ac2d9..9c902703 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -45,7 +45,8 @@
client_properties, capabilities,
auth_mechanism, auth_state}).
--record(throttle, {conserve_resources, last_blocked_by, last_blocked_at}).
+-record(throttle, {alarmed_by, last_blocked_by, last_blocked_at,
+ blocked_sent}).
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
send_pend, state, last_blocked_by, last_blocked_age,
@@ -142,8 +143,8 @@ info(Pid, Items) ->
force_event_refresh(Pid) ->
gen_server:cast(Pid, force_event_refresh).
-conserve_resources(Pid, _Source, Conserve) ->
- Pid ! {conserve_resources, Conserve},
+conserve_resources(Pid, Source, Conserve) ->
+ Pid ! {conserve_resources, Source, Conserve},
ok.
server_properties(Protocol) ->
@@ -178,7 +179,8 @@ server_capabilities(rabbit_framing_amqp_0_9_1) ->
[{<<"publisher_confirms">>, bool, true},
{<<"exchange_exchange_bindings">>, bool, true},
{<<"basic.nack">>, bool, true},
- {<<"consumer_cancel_notify">>, bool, true}];
+ {<<"consumer_cancel_notify">>, bool, true},
+ {<<"connection.blocked">>, bool, true}];
server_capabilities(_) ->
[].
@@ -246,9 +248,10 @@ start_connection(Parent, ChSup3Pid, Collector, StartHeartbeatFun, Deb,
buf = [],
buf_len = 0,
throttle = #throttle{
- conserve_resources = false,
- last_blocked_by = none,
- last_blocked_at = never}},
+ alarmed_by = [],
+ last_blocked_by = none,
+ last_blocked_at = never,
+ blocked_sent = false}},
try
run({?MODULE, recvloop,
[Deb, switch_callback(rabbit_event:init_stats_timer(
@@ -321,9 +324,14 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
end
end.
-handle_other({conserve_resources, Conserve},
- State = #v1{throttle = Throttle}) ->
- Throttle1 = Throttle#throttle{conserve_resources = Conserve},
+handle_other({conserve_resources, Source, Conserve},
+ State = #v1{throttle = Throttle =
+ #throttle{alarmed_by = CR}}) ->
+ CR1 = case Conserve of
+ true -> lists:usort([Source | CR]);
+ false -> CR -- [Source]
+ end,
+ Throttle1 = Throttle#throttle{alarmed_by = CR1},
control_throttle(State#v1{throttle = Throttle1});
handle_other({channel_closing, ChPid}, State) ->
ok = rabbit_channel:ready_for_close(ChPid),
@@ -409,30 +417,61 @@ terminate(_Explanation, State) ->
{force, State}.
control_throttle(State = #v1{connection_state = CS, throttle = Throttle}) ->
- case {CS, (Throttle#throttle.conserve_resources orelse
- credit_flow:blocked())} of
+ IsThrottled = ((Throttle#throttle.alarmed_by =/= []) orelse
+ credit_flow:blocked()),
+ case {CS, IsThrottled} of
{running, true} -> State#v1{connection_state = blocking};
{blocking, false} -> State#v1{connection_state = running};
{blocked, false} -> ok = rabbit_heartbeat:resume_monitor(
State#v1.heartbeater),
- State#v1{connection_state = running};
+ maybe_send_unblocked(State),
+ State#v1{connection_state = running,
+ throttle = Throttle#throttle{
+ blocked_sent = false}};
{blocked, true} -> State#v1{throttle = update_last_blocked_by(
Throttle)};
{_, _} -> State
end.
-maybe_block(State = #v1{connection_state = blocking, throttle = Throttle}) ->
+maybe_block(State = #v1{connection_state = blocking,
+ throttle = Throttle}) ->
ok = rabbit_heartbeat:pause_monitor(State#v1.heartbeater),
+ Sent = maybe_send_blocked(State),
State#v1{connection_state = blocked,
throttle = update_last_blocked_by(
- Throttle#throttle{last_blocked_at = erlang:now()})};
+ Throttle#throttle{last_blocked_at = erlang:now(),
+ blocked_sent = Sent})};
maybe_block(State) ->
State.
-update_last_blocked_by(Throttle = #throttle{conserve_resources = true}) ->
- Throttle#throttle{last_blocked_by = resource};
-update_last_blocked_by(Throttle = #throttle{conserve_resources = false}) ->
- Throttle#throttle{last_blocked_by = flow}.
+maybe_send_blocked(#v1{throttle = #throttle{alarmed_by = []}}) ->
+ false;
+maybe_send_blocked(#v1{throttle = #throttle{alarmed_by = CR},
+ connection = #connection{
+ protocol = Protocol,
+ capabilities = Capabilities},
+ sock = Sock}) ->
+ case rabbit_misc:table_lookup(Capabilities, <<"connection.blocked">>) of
+ {bool, true} ->
+ RStr = string:join([atom_to_list(A) || A <- CR], " & "),
+ Reason = list_to_binary(rabbit_misc:format("low on ~s", [RStr])),
+ ok = send_on_channel0(Sock, #'connection.blocked'{reason = Reason},
+ Protocol),
+ true;
+ _ ->
+ false
+ end.
+
+maybe_send_unblocked(#v1{throttle = #throttle{blocked_sent = false}}) ->
+ ok;
+maybe_send_unblocked(#v1{connection = #connection{protocol = Protocol},
+ sock = Sock}) ->
+ ok = send_on_channel0(Sock, #'connection.unblocked'{}, Protocol).
+
+update_last_blocked_by(Throttle = #throttle{alarmed_by = []}) ->
+ Throttle#throttle{last_blocked_by = flow};
+update_last_blocked_by(Throttle) ->
+ Throttle#throttle{last_blocked_by = resource}.
%%--------------------------------------------------------------------------
%% error handling / termination
@@ -847,7 +886,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
NewConnection = Connection#connection{vhost = VHostPath},
ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
- Throttle1 = Throttle#throttle{conserve_resources = Conserve},
+ Throttle1 = Throttle#throttle{alarmed_by = Conserve},
{ok, ChannelSupSupPid} =
supervisor2:start_child(
ChSup3Pid,