diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2012-01-12 22:37:32 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-01-12 22:37:32 +0000 |
commit | cafab603838269f6bff32148510f088aea315453 (patch) | |
tree | 897aeabf75b8ab79ed94549bb95372ca31c0afbc | |
parent | 10bf454c95b54a87d86d9fab152862d5856f1b40 (diff) | |
download | rabbitmq-server-cafab603838269f6bff32148510f088aea315453.tar.gz |
a little bit of refactoring on the reader
- move the 'blocked'-induced heartbeater disabling to the point at which
the transition to the 'blocked' state takes place. This makes the
correlation between these two events more obvious. It also prevents
duplicate invocations of rabbit_heartbeat:pause_monitor/1, which
hitherto was accomplished by relying on switch_callback/3 never
getting called while 'blocked' - an assumption that could easily
become invalid one day.
- a little bit of inlining of timeout calculation code in
close_connection/1.
- more compact case handling in handle_dependent_exit/3 and
wait_for_channel_termination/2.
-rw-r--r-- | src/rabbit_reader.erl | 58 |
1 files changed, 23 insertions, 35 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 045cc969..ff5eac21 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -345,10 +345,6 @@ handle_other(Other, _Deb, _State) -> %% internal error -> something worth dying for exit({unexpected_message, Other}). -switch_callback(State = #v1{connection_state = blocked, - heartbeater = Heartbeater}, Callback, Length) -> - ok = rabbit_heartbeat:pause_monitor(Heartbeater), - State#v1{callback = Callback, recv_len = Length}; switch_callback(State, Callback, Length) -> State#v1{callback = Callback, recv_len = Length}. @@ -380,28 +376,22 @@ close_connection(State = #v1{queue_collector = Collector, rabbit_queue_collector:delete_all(Collector), %% We terminate the connection after the specified interval, but %% no later than ?CLOSING_TIMEOUT seconds. - TimeoutMillisec = - 1000 * if TimeoutSec > 0 andalso - TimeoutSec < ?CLOSING_TIMEOUT -> TimeoutSec; - true -> ?CLOSING_TIMEOUT - end, - erlang:send_after(TimeoutMillisec, self(), terminate_connection), + erlang:send_after((if TimeoutSec > 0 andalso + TimeoutSec < ?CLOSING_TIMEOUT -> TimeoutSec; + true -> ?CLOSING_TIMEOUT + end) * 1000, self(), terminate_connection), State#v1{connection_state = closed}. handle_dependent_exit(ChPid, Reason, State) -> - case termination_kind(Reason) of - controlled -> - channel_cleanup(ChPid), + case {channel_cleanup(ChPid), termination_kind(Reason)} of + {undefined, uncontrolled} -> + exit({abnormal_dependent_exit, ChPid, Reason}); + {_Channel, controlled} -> maybe_close(State); - uncontrolled -> - case channel_cleanup(ChPid) of - undefined -> exit({abnormal_dependent_exit, ChPid, Reason}); - Channel -> rabbit_log:error( - "connection ~p, channel ~p - error:~n~p~n", - [self(), Channel, Reason]), - maybe_close( - handle_exception(State, Channel, Reason)) - end + {Channel, uncontrolled} -> + rabbit_log:error("connection ~p, channel ~p - error:~n~p~n", + [self(), Channel, Reason]), + maybe_close(handle_exception(State, Channel, Reason)) end. channel_cleanup(ChPid) -> @@ -436,19 +426,15 @@ wait_for_channel_termination(0, TimerRef) -> wait_for_channel_termination(N, TimerRef) -> receive {'DOWN', _MRef, process, ChPid, Reason} -> - case channel_cleanup(ChPid) of - undefined -> + case {channel_cleanup(ChPid), termination_kind(Reason)} of + {undefined, _} -> exit({abnormal_dependent_exit, ChPid, Reason}); - Channel -> - case termination_kind(Reason) of - controlled -> - ok; - uncontrolled -> - rabbit_log:error( - "connection ~p, channel ~p - " - "error while terminating:~n~p~n", - [self(), Channel, Reason]) - end, + {_Channel, controlled} -> + wait_for_channel_termination(N-1, TimerRef); + {Channel, uncontrolled} -> + rabbit_log:error("connection ~p, channel ~p - " + "error while terminating:~n~p~n", + [self(), Channel, Reason]), wait_for_channel_termination(N-1, TimerRef) end; cancel_wait -> @@ -525,7 +511,9 @@ post_process_frame({method, MethodName, _}, _ChPid, case Protocol:method_has_content(MethodName) of true -> erlang:bump_reductions(2000), case State#v1.connection_state of - blocking -> State#v1{connection_state = blocked}; + blocking -> ok = rabbit_heartbeat:pause_monitor( + State#v1.heartbeater), + State#v1{connection_state = blocked}; _ -> State end; false -> State |