summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2014-02-20 16:05:56 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2014-02-20 16:05:56 +0000
commit1ced2443128de5c1f6b97997a48ee60c02d62dce (patch)
treec3c93c1a219b839f7f7daa14bf01c39679aa8b34
parent4501bf173c7aaa78b15bdfb5cf2669fc5bc9c7f2 (diff)
downloadrabbitmq-server-bug26013.tar.gz
improve clarity and correctnessbug26013
- introduce blocked_by_alarm state predicate which makes it easier / more obvious to detect the transitions to/from - do not send 'unblocked' when the client doesn't have the capability - ditch superfluous true/false return from maybe_send_unblocked (now send_unblocked) - detect the transition into blocked_by_alarm that can result from handling conserve_resources (the original purpose of the bug)
-rw-r--r--src/rabbit_reader.erl68
1 files changed, 38 insertions, 30 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 68acebfd..f9fd4d4e 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -321,24 +321,17 @@ stop(Reason, State) -> maybe_emit_stats(State),
throw({inet_error, Reason}).
handle_other({conserve_resources, Source, Conserve},
- State = #v1{connection_state = CS,
- throttle = Throttle =
- #throttle{alarmed_by = CR}}) ->
+ State = #v1{throttle = Throttle = #throttle{alarmed_by = CR}}) ->
CR1 = case Conserve of
true -> lists:usort([Source | CR]);
false -> CR -- [Source]
end,
- Throttle1 = Throttle#throttle{alarmed_by = CR1},
- State1 = control_throttle(State#v1{throttle = Throttle1}),
- case {CS, State1#v1.connection_state, (CR =/= []), (CR1 =:= [])} of
- {blocked, running, _, _} ->
- send_unblocked(State1),
- ok;
- {blocked, blocked, true, true} ->
- send_unblocked(State1),
- ok;
- {_, _, _, _} ->
- ok
+ State1 = control_throttle(
+ State#v1{throttle = Throttle#throttle{alarmed_by = CR1}}),
+ case {blocked_by_alarm(State), blocked_by_alarm(State1)} of
+ {false, true} -> ok = send_blocked(State1);
+ {true, false} -> ok = send_unblocked(State1);
+ {_, _} -> ok
end,
State1;
handle_other({channel_closing, ChPid}, State) ->
@@ -441,34 +434,49 @@ control_throttle(State = #v1{connection_state = CS, throttle = Throttle}) ->
maybe_block(State = #v1{connection_state = blocking,
throttle = Throttle}) ->
ok = rabbit_heartbeat:pause_monitor(State#v1.heartbeater),
- maybe_send_blocked(State),
- State#v1{connection_state = blocked,
- throttle = update_last_blocked_by(
- Throttle#throttle{last_blocked_at = erlang:now()})};
+ State1 = State#v1{connection_state = blocked,
+ throttle = update_last_blocked_by(
+ Throttle#throttle{
+ last_blocked_at = erlang:now()})},
+ case {blocked_by_alarm(State), blocked_by_alarm(State1)} of
+ {false, true} -> ok = send_blocked(State1);
+ {_, _} -> ok
+ end,
+ State1;
maybe_block(State) ->
State.
-maybe_send_blocked(#v1{throttle = #throttle{alarmed_by = []}}) ->
- false;
-maybe_send_blocked(#v1{throttle = #throttle{alarmed_by = CR},
- connection = #connection{
- protocol = Protocol,
- capabilities = Capabilities},
- sock = Sock}) ->
+
+blocked_by_alarm(#v1{connection_state = blocked,
+ throttle = #throttle{alarmed_by = CR}})
+ when CR =/= [] ->
+ true;
+blocked_by_alarm(#v1{}) ->
+ false.
+
+send_blocked(#v1{throttle = #throttle{alarmed_by = CR},
+ connection = #connection{protocol = Protocol,
+ capabilities = Capabilities},
+ sock = Sock}) ->
case rabbit_misc:table_lookup(Capabilities, <<"connection.blocked">>) of
{bool, true} ->
RStr = string:join([atom_to_list(A) || A <- CR], " & "),
Reason = list_to_binary(rabbit_misc:format("low on ~s", [RStr])),
ok = send_on_channel0(Sock, #'connection.blocked'{reason = Reason},
- Protocol),
- true;
+ Protocol);
_ ->
- false
+ ok
end.
-send_unblocked(#v1{connection = #connection{protocol = Protocol},
+send_unblocked(#v1{connection = #connection{protocol = Protocol,
+ capabilities = Capabilities},
sock = Sock}) ->
- ok = send_on_channel0(Sock, #'connection.unblocked'{}, Protocol).
+ case rabbit_misc:table_lookup(Capabilities, <<"connection.blocked">>) of
+ {bool, true} ->
+ ok = send_on_channel0(Sock, #'connection.unblocked'{}, Protocol);
+ _ ->
+ ok
+ end.
update_last_blocked_by(Throttle = #throttle{alarmed_by = []}) ->
Throttle#throttle{last_blocked_by = flow};