diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2011-09-20 11:04:22 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-09-20 11:04:22 +0100 |
commit | 327ea2f8b5ade0c9c6d132992147b46d469b871f (patch) | |
tree | fa32dcf53ff61cd7e016c4d9f65bacef89ad2ca5 | |
parent | c21895db64a40c07ead558addc737cd0fec92f04 (diff) | |
download | rabbitmq-server-bug17174.tar.gz |
improve flow control by penalising message ingestion in the schedulerbug17174
-rw-r--r-- | src/rabbit_reader.erl | 32 |
1 files changed, 18 insertions, 14 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 3822aaeb..b4871cef 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -502,20 +502,7 @@ handle_frame(Type, Channel, Payload, AnalyzedFrame, self(), Channel, ChPid, FramingState), put({channel, Channel}, {ChPid, NewAState}), - case AnalyzedFrame of - {method, 'channel.close_ok', _} -> - channel_cleanup(ChPid), - State; - {method, MethodName, _} -> - case (State#v1.connection_state =:= blocking - andalso - Protocol:method_has_content(MethodName)) of - true -> State#v1{connection_state = blocked}; - false -> State - end; - _ -> - State - end; + post_process_frame(AnalyzedFrame, ChPid, State); undefined -> case ?IS_RUNNING(State) of true -> send_to_new_channel( @@ -527,6 +514,23 @@ handle_frame(Type, Channel, Payload, end end. +post_process_frame({method, 'channel.close_ok', _}, ChPid, State) -> + channel_cleanup(ChPid), + State; +post_process_frame({method, MethodName, _}, _ChPid, + State = #v1{connection = #connection{ + protocol = Protocol}}) -> + case Protocol:method_has_content(MethodName) of + true -> erlang:bump_reductions(2000), + case State#v1.connection_state of + blocking -> State#v1{connection_state = blocked}; + _ -> State + end; + false -> State + end; +post_process_frame(_Frame, _ChPid, State) -> + State. + handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) -> ensure_stats_timer( switch_callback(State, {frame_payload, Type, Channel, PayloadSize}, |