diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-11 16:16:15 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-11 16:16:15 +0100 |
commit | a6a531f2147831521f5a1e2f4811de263a6a78fe (patch) | |
tree | cce8cec5d9b0d1337f05b20f6be70a2c5dab786f | |
parent | 5301419aade68db6925f44d10b50bb770fcbc207 (diff) | |
download | rabbitmq-server-a6a531f2147831521f5a1e2f4811de263a6a78fe.tar.gz |
Get the reader to link to the chan_sup instead of the framing_chan. It used to link to the framing chan, which meant that when it got the exit signal from the framing_chan, it could be sure that the channel and writer had already died. However, this is no longer the case - now the framing_chan is actually the last to start and first to exit in the chan_sup and so the reader needs to link to the chan_sup instead. This means the reader needs to track both the framing_chan and the chan_sup
-rw-r--r-- | src/rabbit_reader.erl | 63 |
1 files changed, 35 insertions, 28 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 69f6773f..313b7aaf 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -367,10 +367,10 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) -> exit(Reason); {channel_exit, _Chan, E = {writer, send_failed, _Error}} -> throw(E); - {channel_exit, Channel, Reason} -> - mainloop(Deb, handle_channel_exit(Channel, Reason, State)); - {'EXIT', Pid, Reason} -> - mainloop(Deb, handle_dependent_exit(Pid, Reason, State)); + {channel_exit, ChannelOrFrPid, Reason} -> + mainloop(Deb, handle_channel_exit(ChannelOrFrPid, Reason, State)); + {'EXIT', ChSupPid, Reason} -> + mainloop(Deb, handle_dependent_exit(ChSupPid, Reason, State)); terminate_connection -> State; handshake_timeout -> @@ -463,13 +463,13 @@ close_channel(Channel, State) -> put({channel, Channel}, closing), State. -handle_channel_exit(ChPid, Reason, State) when is_pid(ChPid) -> - {channel, Channel} = get({chpid, ChPid}), +handle_channel_exit(ChFrPid, Reason, State) when is_pid(ChFrPid) -> + {channel, Channel} = get({ch_fr_pid, ChFrPid}), handle_exception(State, Channel, Reason); handle_channel_exit(Channel, Reason, State) -> handle_exception(State, Channel, Reason). -handle_dependent_exit(Pid, Reason, State) -> +handle_dependent_exit(ChSupPid, Reason, State) -> case (case Reason of normal -> controlled; shutdown -> controlled; @@ -477,30 +477,36 @@ handle_dependent_exit(Pid, Reason, State) -> _ -> uncontrolled end) of controlled -> - erase({chpid, Pid}), + case erase({ch_sup_pid, ChSupPid}) of + undefined -> ok; + {_Channel, {ch_fr_pid, _ChFrPid} = ChFr} -> erase(ChFr) + end, maybe_close(State); uncontrolled -> - case channel_cleanup(Pid) of + case channel_cleanup(ChSupPid) of undefined -> - exit({abnormal_dependent_exit, Pid, Reason}); + exit({abnormal_dependent_exit, ChSupPid, Reason}); Channel -> maybe_close(handle_exception(State, Channel, Reason)) end end. -channel_cleanup(Pid) -> - case get({chpid, Pid}) of - undefined -> undefined; - {channel, Channel} -> erase({channel, Channel}), - erase({chpid, Pid}), - Channel +channel_cleanup(ChSupPid) -> + case get({ch_sup_pid, ChSupPid}) of + undefined -> undefined; + {{channel, Channel}, ChFr} -> erase({channel, Channel}), + erase(ChFr), + erase({ch_sup_pid, ChSupPid}), + Channel end. -all_channels() -> [Pid || {{chpid, Pid},_} <- get()]. +all_channels() -> [ChFrPid || {{ch_sup_pid, _ChSupPid}, + {_Channel, {ch_fr_pid, ChFrPid}}} <- get()]. terminate_channels() -> NChannels = - length([rabbit_framing_channel:shutdown(Pid) || Pid <- all_channels()]), + length([rabbit_framing_channel:shutdown(ChFrPid) + || ChFrPid <- all_channels()]), if NChannels > 0 -> Timeout = 1000 * ?CHANNEL_TERMINATION_TIMEOUT * NChannels, TimerRef = erlang:send_after(Timeout, self(), cancel_wait), @@ -518,10 +524,10 @@ wait_for_channel_termination(0, TimerRef) -> wait_for_channel_termination(N, TimerRef) -> receive - {'EXIT', Pid, Reason} -> - case channel_cleanup(Pid) of + {'EXIT', ChSupPid, Reason} -> + case channel_cleanup(ChSupPid) of undefined -> - exit({abnormal_dependent_exit, Pid, Reason}); + exit({abnormal_dependent_exit, ChSupPid, Reason}); Channel -> case Reason of normal -> ok; @@ -581,8 +587,8 @@ handle_frame(Type, Channel, Payload, AnalyzedFrame -> %%?LOGDEBUG("Ch ~p Frame ~p~n", [Channel, AnalyzedFrame]), case get({channel, Channel}) of - {chpid, ChPid} -> - ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame), + {ch_fr_pid, ChFrPid} -> + ok = rabbit_framing_channel:process(ChFrPid, AnalyzedFrame), case AnalyzedFrame of {method, 'channel.close', _} -> erase({channel, Channel}), @@ -888,14 +894,15 @@ send_to_new_channel(Channel, AnalyzedFrame, State) -> frame_max = FrameMax, user = #user{username = Username}, vhost = VHost}} = State, - {ok, _ChanSup, ChPid} = + {ok, ChSupPid, ChFrPid} = rabbit_channel_sup_sup:start_channel( ChanSupSup, [Protocol, Sock, Channel, FrameMax, self(), Username, VHost, Collector]), - link(ChPid), - put({channel, Channel}, {chpid, ChPid}), - put({chpid, ChPid}, {channel, Channel}), - ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame). + link(ChSupPid), + put({channel, Channel}, {ch_fr_pid, ChFrPid}), + put({ch_sup_pid, ChSupPid}, {{channel, Channel}, {ch_fr_pid, ChFrPid}}), + put({ch_fr_pid, ChFrPid}, {channel, Channel}), + ok = rabbit_framing_channel:process(ChFrPid, AnalyzedFrame). log_channel_error(ConnectionState, Channel, Reason) -> rabbit_log:error("connection ~p (~p), channel ~p - error:~n~p~n", |