summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2010-08-02 14:02:25 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2010-08-02 14:02:25 +0100
commitc63c043677c34c7ed26dfb0eebfd7ece0bec5613 (patch)
treefcea781dc1a81abaa96022f133e389d0c1310693
parent46d48171d63b40e1cdc4650a278a9af48717e7ed (diff)
downloadrabbitmq-server-c63c043677c34c7ed26dfb0eebfd7ece0bec5613.tar.gz
only stop reading from clients that publish while memory is tight
-rw-r--r--src/rabbit_reader.erl22
1 files changed, 14 insertions, 8 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 2c1a24c4..f9dd75ad 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -357,10 +357,9 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
exit({unexpected_message, Other})
end.
-switch_callback(State = #v1{conserving_memory = true,
+switch_callback(State = #v1{conserving_memory = active,
heartbeater = Heartbeater}, Callback, Length) ->
ok = rabbit_heartbeat:pause_monitor(Heartbeater),
- %% TODO: only do this after receiving a content-bearing method
State#v1{callback = {Callback, Length}, recv_ref = none};
switch_callback(State, Callback, Length) ->
Ref = inet_op(fun () -> rabbit_net:async_recv(
@@ -374,7 +373,7 @@ terminate(Explanation, State = #v1{connection_state = running}) ->
terminate(_Explanation, State) ->
{force, State}.
-internal_conserve_memory(false, State = #v1{conserving_memory = true,
+internal_conserve_memory(false, State = #v1{conserving_memory = active,
heartbeater = Heartbeater,
callback = {Callback, Length},
recv_ref = none}) ->
@@ -507,13 +506,20 @@ handle_frame(Type, Channel, Payload,
%%?LOGDEBUG("Ch ~p Frame ~p~n", [Channel, AnalyzedFrame]),
case get({channel, Channel}) of
{chpid, ChPid} ->
+ ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame),
case AnalyzedFrame of
{method, 'channel.close', _} ->
- erase({channel, Channel});
- _ -> ok
- end,
- ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame),
- State;
+ erase({channel, Channel}),
+ State;
+ {method, MethodName, _} ->
+ case (State#v1.conserving_memory == true andalso
+ Protocol:method_has_content(MethodName)) of
+ true -> State#v1{conserving_memory = active};
+ false -> State
+ end;
+ _ ->
+ State
+ end;
closing ->
%% According to the spec, after sending a
%% channel.close we must ignore all frames except