diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2010-08-02 14:02:25 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-08-02 14:02:25 +0100 |
commit | c63c043677c34c7ed26dfb0eebfd7ece0bec5613 (patch) | |
tree | fcea781dc1a81abaa96022f133e389d0c1310693 | |
parent | 46d48171d63b40e1cdc4650a278a9af48717e7ed (diff) | |
download | rabbitmq-server-c63c043677c34c7ed26dfb0eebfd7ece0bec5613.tar.gz |
only stop reading from clients that publish while memory is tight
-rw-r--r-- | src/rabbit_reader.erl | 22 |
1 files changed, 14 insertions, 8 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 2c1a24c4..f9dd75ad 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -357,10 +357,9 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> exit({unexpected_message, Other}) end. -switch_callback(State = #v1{conserving_memory = true, +switch_callback(State = #v1{conserving_memory = active, heartbeater = Heartbeater}, Callback, Length) -> ok = rabbit_heartbeat:pause_monitor(Heartbeater), - %% TODO: only do this after receiving a content-bearing method State#v1{callback = {Callback, Length}, recv_ref = none}; switch_callback(State, Callback, Length) -> Ref = inet_op(fun () -> rabbit_net:async_recv( @@ -374,7 +373,7 @@ terminate(Explanation, State = #v1{connection_state = running}) -> terminate(_Explanation, State) -> {force, State}. -internal_conserve_memory(false, State = #v1{conserving_memory = true, +internal_conserve_memory(false, State = #v1{conserving_memory = active, heartbeater = Heartbeater, callback = {Callback, Length}, recv_ref = none}) -> @@ -507,13 +506,20 @@ handle_frame(Type, Channel, Payload, %%?LOGDEBUG("Ch ~p Frame ~p~n", [Channel, AnalyzedFrame]), case get({channel, Channel}) of {chpid, ChPid} -> + ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame), case AnalyzedFrame of {method, 'channel.close', _} -> - erase({channel, Channel}); - _ -> ok - end, - ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame), - State; + erase({channel, Channel}), + State; + {method, MethodName, _} -> + case (State#v1.conserving_memory == true andalso + Protocol:method_has_content(MethodName)) of + true -> State#v1{conserving_memory = active}; + false -> State + end; + _ -> + State + end; closing -> %% According to the spec, after sending a %% channel.close we must ignore all frames except |