diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2014-02-24 11:17:25 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2014-02-24 11:17:25 +0000 |
commit | c1012385819d70b91ceefbf38a940b86f89c128f (patch) | |
tree | 8ac6f1eeabc4091053bb1df922cfe7e0b2070696 | |
parent | 1393e28472dceec62b5baa563474963c2d1754c1 (diff) | |
parent | 1ced2443128de5c1f6b97997a48ee60c02d62dce (diff) | |
download | rabbitmq-server-c1012385819d70b91ceefbf38a940b86f89c128f.tar.gz |
merge bug26013 into stable
-rw-r--r-- | src/rabbit_reader.erl | 76 |
1 files changed, 44 insertions, 32 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 1ae9bacf..f9fd4d4e 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -44,8 +44,7 @@ client_properties, capabilities, auth_mechanism, auth_state}). --record(throttle, {alarmed_by, last_blocked_by, last_blocked_at, - blocked_sent}). +-record(throttle, {alarmed_by, last_blocked_by, last_blocked_at}). -define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt, send_pend, state, last_blocked_by, last_blocked_age, @@ -243,8 +242,7 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) -> throttle = #throttle{ alarmed_by = [], last_blocked_by = none, - last_blocked_at = never, - blocked_sent = false}}, + last_blocked_at = never}}, try run({?MODULE, recvloop, [Deb, switch_callback(rabbit_event:init_stats_timer( @@ -323,14 +321,19 @@ stop(Reason, State) -> maybe_emit_stats(State), throw({inet_error, Reason}). handle_other({conserve_resources, Source, Conserve}, - State = #v1{throttle = Throttle = - #throttle{alarmed_by = CR}}) -> + State = #v1{throttle = Throttle = #throttle{alarmed_by = CR}}) -> CR1 = case Conserve of true -> lists:usort([Source | CR]); false -> CR -- [Source] end, - Throttle1 = Throttle#throttle{alarmed_by = CR1}, - control_throttle(State#v1{throttle = Throttle1}); + State1 = control_throttle( + State#v1{throttle = Throttle#throttle{alarmed_by = CR1}}), + case {blocked_by_alarm(State), blocked_by_alarm(State1)} of + {false, true} -> ok = send_blocked(State1); + {true, false} -> ok = send_unblocked(State1); + {_, _} -> ok + end, + State1; handle_other({channel_closing, ChPid}, State) -> ok = rabbit_channel:ready_for_close(ChPid), channel_cleanup(ChPid), @@ -422,10 +425,7 @@ control_throttle(State = #v1{connection_state = CS, throttle = Throttle}) -> {blocking, false} -> State#v1{connection_state = running}; {blocked, false} -> ok = rabbit_heartbeat:resume_monitor( State#v1.heartbeater), - maybe_send_unblocked(State), - State#v1{connection_state = running, - throttle = Throttle#throttle{ - blocked_sent = false}}; + State#v1{connection_state = running}; {blocked, true} -> State#v1{throttle = update_last_blocked_by( Throttle)}; {_, _} -> State @@ -434,37 +434,49 @@ control_throttle(State = #v1{connection_state = CS, throttle = Throttle}) -> maybe_block(State = #v1{connection_state = blocking, throttle = Throttle}) -> ok = rabbit_heartbeat:pause_monitor(State#v1.heartbeater), - Sent = maybe_send_blocked(State), - State#v1{connection_state = blocked, - throttle = update_last_blocked_by( - Throttle#throttle{last_blocked_at = erlang:now(), - blocked_sent = Sent})}; + State1 = State#v1{connection_state = blocked, + throttle = update_last_blocked_by( + Throttle#throttle{ + last_blocked_at = erlang:now()})}, + case {blocked_by_alarm(State), blocked_by_alarm(State1)} of + {false, true} -> ok = send_blocked(State1); + {_, _} -> ok + end, + State1; maybe_block(State) -> State. -maybe_send_blocked(#v1{throttle = #throttle{alarmed_by = []}}) -> - false; -maybe_send_blocked(#v1{throttle = #throttle{alarmed_by = CR}, - connection = #connection{ - protocol = Protocol, - capabilities = Capabilities}, - sock = Sock}) -> + +blocked_by_alarm(#v1{connection_state = blocked, + throttle = #throttle{alarmed_by = CR}}) + when CR =/= [] -> + true; +blocked_by_alarm(#v1{}) -> + false. + +send_blocked(#v1{throttle = #throttle{alarmed_by = CR}, + connection = #connection{protocol = Protocol, + capabilities = Capabilities}, + sock = Sock}) -> case rabbit_misc:table_lookup(Capabilities, <<"connection.blocked">>) of {bool, true} -> RStr = string:join([atom_to_list(A) || A <- CR], " & "), Reason = list_to_binary(rabbit_misc:format("low on ~s", [RStr])), ok = send_on_channel0(Sock, #'connection.blocked'{reason = Reason}, - Protocol), - true; + Protocol); _ -> - false + ok end. -maybe_send_unblocked(#v1{throttle = #throttle{blocked_sent = false}}) -> - ok; -maybe_send_unblocked(#v1{connection = #connection{protocol = Protocol}, - sock = Sock}) -> - ok = send_on_channel0(Sock, #'connection.unblocked'{}, Protocol). +send_unblocked(#v1{connection = #connection{protocol = Protocol, + capabilities = Capabilities}, + sock = Sock}) -> + case rabbit_misc:table_lookup(Capabilities, <<"connection.blocked">>) of + {bool, true} -> + ok = send_on_channel0(Sock, #'connection.unblocked'{}, Protocol); + _ -> + ok + end. update_last_blocked_by(Throttle = #throttle{alarmed_by = []}) -> Throttle#throttle{last_blocked_by = flow}; |