summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-09-20 11:04:22 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2011-09-20 11:04:22 +0100
commit327ea2f8b5ade0c9c6d132992147b46d469b871f (patch)
treefa32dcf53ff61cd7e016c4d9f65bacef89ad2ca5
parentc21895db64a40c07ead558addc737cd0fec92f04 (diff)
downloadrabbitmq-server-bug17174.tar.gz
improve flow control by penalising message ingestion in the schedulerbug17174
-rw-r--r--src/rabbit_reader.erl32
1 files changed, 18 insertions, 14 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 3822aaeb..b4871cef 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -502,20 +502,7 @@ handle_frame(Type, Channel, Payload,
AnalyzedFrame, self(),
Channel, ChPid, FramingState),
put({channel, Channel}, {ChPid, NewAState}),
- case AnalyzedFrame of
- {method, 'channel.close_ok', _} ->
- channel_cleanup(ChPid),
- State;
- {method, MethodName, _} ->
- case (State#v1.connection_state =:= blocking
- andalso
- Protocol:method_has_content(MethodName)) of
- true -> State#v1{connection_state = blocked};
- false -> State
- end;
- _ ->
- State
- end;
+ post_process_frame(AnalyzedFrame, ChPid, State);
undefined ->
case ?IS_RUNNING(State) of
true -> send_to_new_channel(
@@ -527,6 +514,23 @@ handle_frame(Type, Channel, Payload,
end
end.
+post_process_frame({method, 'channel.close_ok', _}, ChPid, State) ->
+ channel_cleanup(ChPid),
+ State;
+post_process_frame({method, MethodName, _}, _ChPid,
+ State = #v1{connection = #connection{
+ protocol = Protocol}}) ->
+ case Protocol:method_has_content(MethodName) of
+ true -> erlang:bump_reductions(2000),
+ case State#v1.connection_state of
+ blocking -> State#v1{connection_state = blocked};
+ _ -> State
+ end;
+ false -> State
+ end;
+post_process_frame(_Frame, _ChPid, State) ->
+ State.
+
handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) ->
ensure_stats_timer(
switch_callback(State, {frame_payload, Type, Channel, PayloadSize},