summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2010-08-02 14:41:06 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2010-08-02 14:41:06 +0100
commitd2d7e8dc326e31caddd2e3829b127dcd1fef8212 (patch)
tree3d4886fe1b64860ff1bb897b1b6f85ff6bb78252
parentc63c043677c34c7ed26dfb0eebfd7ece0bec5613 (diff)
downloadrabbitmq-server-d2d7e8dc326e31caddd2e3829b127dcd1fef8212.tar.gz
fold the memory conservation status into the connection state
-rw-r--r--src/rabbit_reader.erl57
1 files changed, 33 insertions, 24 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index f9dd75ad..2efda4fb 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -58,7 +58,7 @@
%---------------------------------------------------------------------------
-record(v1, {sock, connection, callback, recv_ref, connection_state,
- queue_collector, heartbeater, conserving_memory}).
+ queue_collector, heartbeater}).
-define(INFO_KEYS,
[pid, address, port, peer_address, peer_port,
@@ -134,6 +134,11 @@
%%
%% TODO: refactor the code so that the above is obvious
+-define(IS_RUNNING(State),
+ (State#v1.connection_state =:= running orelse
+ State#v1.connection_state =:= blocking orelse
+ State#v1.connection_state =:= blocked)).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -260,8 +265,7 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
recv_ref = none,
connection_state = pre_init,
queue_collector = Collector,
- heartbeater = none,
- conserving_memory = false},
+ heartbeater = none},
handshake, 8))
catch
Ex -> (if Ex == connection_closed_abruptly ->
@@ -325,7 +329,7 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
terminate_connection ->
State;
handshake_timeout ->
- if State#v1.connection_state =:= running orelse
+ if ?IS_RUNNING(State) orelse
State#v1.connection_state =:= closing orelse
State#v1.connection_state =:= closed ->
mainloop(Parent, Deb, State);
@@ -357,7 +361,7 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
exit({unexpected_message, Other})
end.
-switch_callback(State = #v1{conserving_memory = active,
+switch_callback(State = #v1{connection_state = blocked,
heartbeater = Heartbeater}, Callback, Length) ->
ok = rabbit_heartbeat:pause_monitor(Heartbeater),
State#v1{callback = {Callback, Length}, recv_ref = none};
@@ -366,21 +370,25 @@ switch_callback(State, Callback, Length) ->
State#v1.sock, Length, infinity) end),
State#v1{callback = Callback, recv_ref = Ref}.
-terminate(Explanation, State = #v1{connection_state = running}) ->
+terminate(Explanation, State) when ?IS_RUNNING(State) ->
{normal, send_exception(State, 0,
rabbit_misc:amqp_error(
connection_forced, Explanation, [], none))};
terminate(_Explanation, State) ->
{force, State}.
-internal_conserve_memory(false, State = #v1{conserving_memory = active,
+internal_conserve_memory(true, State = #v1{connection_state = running}) ->
+ State#v1{connection_state = blocking};
+internal_conserve_memory(false, State = #v1{connection_state = blocking}) ->
+ State#v1{connection_state = running};
+internal_conserve_memory(false, State = #v1{connection_state = blocked,
heartbeater = Heartbeater,
callback = {Callback, Length},
recv_ref = none}) ->
ok = rabbit_heartbeat:resume_monitor(Heartbeater),
- switch_callback(State#v1{conserving_memory = false}, Callback, Length);
-internal_conserve_memory(Conserve, State) ->
- State#v1{conserving_memory = Conserve}.
+ switch_callback(State#v1{connection_state = running}, Callback, Length);
+internal_conserve_memory(_Conserve, State) ->
+ State.
close_connection(State = #v1{connection = #connection{
timeout_sec = TimeoutSec}}) ->
@@ -512,9 +520,9 @@ handle_frame(Type, Channel, Payload,
erase({channel, Channel}),
State;
{method, MethodName, _} ->
- case (State#v1.conserving_memory == true andalso
+ case (State#v1.connection_state == blocking andalso
Protocol:method_has_content(MethodName)) of
- true -> State#v1{conserving_memory = active};
+ true -> State#v1{connection_state = blocked};
false -> State
end;
_ ->
@@ -539,12 +547,13 @@ handle_frame(Type, Channel, Payload,
end,
State;
undefined ->
- case State#v1.connection_state of
- running -> ok = send_to_new_channel(
- Channel, AnalyzedFrame, State),
- State;
- Other -> throw({channel_frame_while_starting,
- Channel, Other, AnalyzedFrame})
+ case ?IS_RUNNING(State) of
+ true -> ok = send_to_new_channel(
+ Channel, AnalyzedFrame, State),
+ State;
+ false -> throw({channel_frame_while_starting,
+ Channel, State#v1.connection_state,
+ AnalyzedFrame})
end
end
end.
@@ -641,13 +650,14 @@ handle_method0(MethodName, FieldsBin,
Reason#amqp_error{method = MethodName};
OtherReason -> OtherReason
end,
- case State#v1.connection_state of
- running -> send_exception(State, 0, CompleteReason);
+ case ?IS_RUNNING(State) of
+ true -> send_exception(State, 0, CompleteReason);
%% We don't trust the client at this point - force
%% them to wait for a bit so they can't DOS us with
%% repeated failed logins etc.
- Other -> timer:sleep(?SILENT_CLOSE_DELAY * 1000),
- throw({channel0_error, Other, CompleteReason})
+ false -> timer:sleep(?SILENT_CLOSE_DELAY * 1000),
+ throw({channel0_error, State#v1.connection_state,
+ CompleteReason})
end
end.
@@ -703,8 +713,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
State#v1{connection_state = running,
connection = NewConnection};
-handle_method0(#'connection.close'{},
- State = #v1{connection_state = running}) ->
+handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) ->
lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()),
maybe_close(State#v1{connection_state = closing});
handle_method0(#'connection.close'{},