diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2010-08-02 14:41:06 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-08-02 14:41:06 +0100 |
commit | d2d7e8dc326e31caddd2e3829b127dcd1fef8212 (patch) | |
tree | 3d4886fe1b64860ff1bb897b1b6f85ff6bb78252 /src | |
parent | c63c043677c34c7ed26dfb0eebfd7ece0bec5613 (diff) | |
download | rabbitmq-server-d2d7e8dc326e31caddd2e3829b127dcd1fef8212.tar.gz |
fold the memory conservation status into the connection state
Diffstat (limited to 'src')
-rw-r--r-- | src/rabbit_reader.erl | 57 |
1 files changed, 33 insertions, 24 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index f9dd75ad..2efda4fb 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -58,7 +58,7 @@ %--------------------------------------------------------------------------- -record(v1, {sock, connection, callback, recv_ref, connection_state, - queue_collector, heartbeater, conserving_memory}). + queue_collector, heartbeater}). -define(INFO_KEYS, [pid, address, port, peer_address, peer_port, @@ -134,6 +134,11 @@ %% %% TODO: refactor the code so that the above is obvious +-define(IS_RUNNING(State), + (State#v1.connection_state =:= running orelse + State#v1.connection_state =:= blocking orelse + State#v1.connection_state =:= blocked)). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -260,8 +265,7 @@ start_connection(Parent, Deb, Sock, SockTransform) -> recv_ref = none, connection_state = pre_init, queue_collector = Collector, - heartbeater = none, - conserving_memory = false}, + heartbeater = none}, handshake, 8)) catch Ex -> (if Ex == connection_closed_abruptly -> @@ -325,7 +329,7 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> terminate_connection -> State; handshake_timeout -> - if State#v1.connection_state =:= running orelse + if ?IS_RUNNING(State) orelse State#v1.connection_state =:= closing orelse State#v1.connection_state =:= closed -> mainloop(Parent, Deb, State); @@ -357,7 +361,7 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> exit({unexpected_message, Other}) end. -switch_callback(State = #v1{conserving_memory = active, +switch_callback(State = #v1{connection_state = blocked, heartbeater = Heartbeater}, Callback, Length) -> ok = rabbit_heartbeat:pause_monitor(Heartbeater), State#v1{callback = {Callback, Length}, recv_ref = none}; @@ -366,21 +370,25 @@ switch_callback(State, Callback, Length) -> State#v1.sock, Length, infinity) end), State#v1{callback = Callback, recv_ref = Ref}. -terminate(Explanation, State = #v1{connection_state = running}) -> +terminate(Explanation, State) when ?IS_RUNNING(State) -> {normal, send_exception(State, 0, rabbit_misc:amqp_error( connection_forced, Explanation, [], none))}; terminate(_Explanation, State) -> {force, State}. -internal_conserve_memory(false, State = #v1{conserving_memory = active, +internal_conserve_memory(true, State = #v1{connection_state = running}) -> + State#v1{connection_state = blocking}; +internal_conserve_memory(false, State = #v1{connection_state = blocking}) -> + State#v1{connection_state = running}; +internal_conserve_memory(false, State = #v1{connection_state = blocked, heartbeater = Heartbeater, callback = {Callback, Length}, recv_ref = none}) -> ok = rabbit_heartbeat:resume_monitor(Heartbeater), - switch_callback(State#v1{conserving_memory = false}, Callback, Length); -internal_conserve_memory(Conserve, State) -> - State#v1{conserving_memory = Conserve}. + switch_callback(State#v1{connection_state = running}, Callback, Length); +internal_conserve_memory(_Conserve, State) -> + State. close_connection(State = #v1{connection = #connection{ timeout_sec = TimeoutSec}}) -> @@ -512,9 +520,9 @@ handle_frame(Type, Channel, Payload, erase({channel, Channel}), State; {method, MethodName, _} -> - case (State#v1.conserving_memory == true andalso + case (State#v1.connection_state == blocking andalso Protocol:method_has_content(MethodName)) of - true -> State#v1{conserving_memory = active}; + true -> State#v1{connection_state = blocked}; false -> State end; _ -> @@ -539,12 +547,13 @@ handle_frame(Type, Channel, Payload, end, State; undefined -> - case State#v1.connection_state of - running -> ok = send_to_new_channel( - Channel, AnalyzedFrame, State), - State; - Other -> throw({channel_frame_while_starting, - Channel, Other, AnalyzedFrame}) + case ?IS_RUNNING(State) of + true -> ok = send_to_new_channel( + Channel, AnalyzedFrame, State), + State; + false -> throw({channel_frame_while_starting, + Channel, State#v1.connection_state, + AnalyzedFrame}) end end end. @@ -641,13 +650,14 @@ handle_method0(MethodName, FieldsBin, Reason#amqp_error{method = MethodName}; OtherReason -> OtherReason end, - case State#v1.connection_state of - running -> send_exception(State, 0, CompleteReason); + case ?IS_RUNNING(State) of + true -> send_exception(State, 0, CompleteReason); %% We don't trust the client at this point - force %% them to wait for a bit so they can't DOS us with %% repeated failed logins etc. - Other -> timer:sleep(?SILENT_CLOSE_DELAY * 1000), - throw({channel0_error, Other, CompleteReason}) + false -> timer:sleep(?SILENT_CLOSE_DELAY * 1000), + throw({channel0_error, State#v1.connection_state, + CompleteReason}) end end. @@ -703,8 +713,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), State#v1{connection_state = running, connection = NewConnection}; -handle_method0(#'connection.close'{}, - State = #v1{connection_state = running}) -> +handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) -> lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()), maybe_close(State#v1{connection_state = closing}); handle_method0(#'connection.close'{}, |