summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-01-12 22:37:32 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2012-01-12 22:37:32 +0000
commitcafab603838269f6bff32148510f088aea315453 (patch)
tree897aeabf75b8ab79ed94549bb95372ca31c0afbc
parent10bf454c95b54a87d86d9fab152862d5856f1b40 (diff)
downloadrabbitmq-server-cafab603838269f6bff32148510f088aea315453.tar.gz
a little bit of refactoring on the reader
- move the 'blocked'-induced heartbeater disabling to the point at which the transition to the 'blocked' state takes place. This makes the correlation between these two events more obvious. It also prevents duplicate invocations of rabbit_heartbeat:pause_monitor/1, which hitherto was accomplished by relying on switch_callback/3 never getting called while 'blocked' - an assumption that could easily become invalid one day. - a little bit of inlining of timeout calculation code in close_connection/1. - more compact case handling in handle_dependent_exit/3 and wait_for_channel_termination/2.
-rw-r--r--src/rabbit_reader.erl58
1 files changed, 23 insertions, 35 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 045cc969..ff5eac21 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -345,10 +345,6 @@ handle_other(Other, _Deb, _State) ->
%% internal error -> something worth dying for
exit({unexpected_message, Other}).
-switch_callback(State = #v1{connection_state = blocked,
- heartbeater = Heartbeater}, Callback, Length) ->
- ok = rabbit_heartbeat:pause_monitor(Heartbeater),
- State#v1{callback = Callback, recv_len = Length};
switch_callback(State, Callback, Length) ->
State#v1{callback = Callback, recv_len = Length}.
@@ -380,28 +376,22 @@ close_connection(State = #v1{queue_collector = Collector,
rabbit_queue_collector:delete_all(Collector),
%% We terminate the connection after the specified interval, but
%% no later than ?CLOSING_TIMEOUT seconds.
- TimeoutMillisec =
- 1000 * if TimeoutSec > 0 andalso
- TimeoutSec < ?CLOSING_TIMEOUT -> TimeoutSec;
- true -> ?CLOSING_TIMEOUT
- end,
- erlang:send_after(TimeoutMillisec, self(), terminate_connection),
+ erlang:send_after((if TimeoutSec > 0 andalso
+ TimeoutSec < ?CLOSING_TIMEOUT -> TimeoutSec;
+ true -> ?CLOSING_TIMEOUT
+ end) * 1000, self(), terminate_connection),
State#v1{connection_state = closed}.
handle_dependent_exit(ChPid, Reason, State) ->
- case termination_kind(Reason) of
- controlled ->
- channel_cleanup(ChPid),
+ case {channel_cleanup(ChPid), termination_kind(Reason)} of
+ {undefined, uncontrolled} ->
+ exit({abnormal_dependent_exit, ChPid, Reason});
+ {_Channel, controlled} ->
maybe_close(State);
- uncontrolled ->
- case channel_cleanup(ChPid) of
- undefined -> exit({abnormal_dependent_exit, ChPid, Reason});
- Channel -> rabbit_log:error(
- "connection ~p, channel ~p - error:~n~p~n",
- [self(), Channel, Reason]),
- maybe_close(
- handle_exception(State, Channel, Reason))
- end
+ {Channel, uncontrolled} ->
+ rabbit_log:error("connection ~p, channel ~p - error:~n~p~n",
+ [self(), Channel, Reason]),
+ maybe_close(handle_exception(State, Channel, Reason))
end.
channel_cleanup(ChPid) ->
@@ -436,19 +426,15 @@ wait_for_channel_termination(0, TimerRef) ->
wait_for_channel_termination(N, TimerRef) ->
receive
{'DOWN', _MRef, process, ChPid, Reason} ->
- case channel_cleanup(ChPid) of
- undefined ->
+ case {channel_cleanup(ChPid), termination_kind(Reason)} of
+ {undefined, _} ->
exit({abnormal_dependent_exit, ChPid, Reason});
- Channel ->
- case termination_kind(Reason) of
- controlled ->
- ok;
- uncontrolled ->
- rabbit_log:error(
- "connection ~p, channel ~p - "
- "error while terminating:~n~p~n",
- [self(), Channel, Reason])
- end,
+ {_Channel, controlled} ->
+ wait_for_channel_termination(N-1, TimerRef);
+ {Channel, uncontrolled} ->
+ rabbit_log:error("connection ~p, channel ~p - "
+ "error while terminating:~n~p~n",
+ [self(), Channel, Reason]),
wait_for_channel_termination(N-1, TimerRef)
end;
cancel_wait ->
@@ -525,7 +511,9 @@ post_process_frame({method, MethodName, _}, _ChPid,
case Protocol:method_has_content(MethodName) of
true -> erlang:bump_reductions(2000),
case State#v1.connection_state of
- blocking -> State#v1{connection_state = blocked};
+ blocking -> ok = rabbit_heartbeat:pause_monitor(
+ State#v1.heartbeater),
+ State#v1{connection_state = blocked};
_ -> State
end;
false -> State