diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2011-02-03 18:12:15 +0000 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-02-03 18:12:15 +0000 |
commit | 3bf95f76bf25d4646e7d79bc11ae983f2b434abe (patch) | |
tree | e933b1e93639b9d2e6f152e92f2f0aab3f8cd72e | |
parent | d01883ca941aaecd6da1ff45626af7217c046a10 (diff) | |
download | rabbitmq-server-bug23743.tar.gz |
Not really sure if this is getting any better. I've introduced bugs because of conflating a couple of areas, but I'm getting more familiar with it, and it might eventually get better. We shall see.bug23743
-rw-r--r-- | src/rabbit_reader.erl | 206 |
1 files changed, 105 insertions, 101 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 12f77964..881198a9 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -340,12 +340,11 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) -> %% since this termination is initiated by our parent it is %% probably more important to exit quickly. exit(Reason); - {channel_exit, _Channel, E = {writer, send_failed, _Error}} -> - throw(E); {channel_exit, Channel, Reason} -> - mainloop(Deb, handle_exception(State, Channel, Reason)); + mainloop(Deb, handle_channel_event(Channel, {died, Reason}, State)); {'DOWN', _MRef, process, ChPid, Reason} -> - mainloop(Deb, handle_dependent_exit(ChPid, Reason, State)); + mainloop(Deb, handle_channel_event(undefined, {down, ChPid, Reason}, + State)); terminate_connection -> State; handshake_timeout -> @@ -448,19 +447,6 @@ close_channel(Channel, State) -> put({channel, Channel}, closing), State. -handle_dependent_exit(ChPid, Reason, State) -> - case termination_kind(Reason) of - controlled -> - erase({ch_pid, ChPid}), - maybe_close(State); - uncontrolled -> - case channel_cleanup(ChPid) of - undefined -> exit({abnormal_dependent_exit, ChPid, Reason}); - Channel -> maybe_close( - handle_exception(State, Channel, Reason)) - end - end. - channel_cleanup(ChPid) -> case get({ch_pid, ChPid}) of undefined -> undefined; @@ -471,42 +457,30 @@ channel_cleanup(ChPid) -> all_channels() -> [ChPid || {{ch_pid, ChPid}, _Channel} <- get()]. -terminate_channels() -> +terminate_channels(State) -> NChannels = length([rabbit_channel:shutdown(ChPid) || ChPid <- all_channels()]), - if NChannels > 0 -> - Timeout = 1000 * ?CHANNEL_TERMINATION_TIMEOUT * NChannels, - TimerRef = erlang:send_after(Timeout, self(), cancel_wait), - wait_for_channel_termination(NChannels, TimerRef); - true -> ok + case NChannels of + 0 -> State; + _ -> Timeout = 1000 * ?CHANNEL_TERMINATION_TIMEOUT * NChannels, + TimerRef = erlang:send_after(Timeout, self(), cancel_wait), + wait_for_channel_termination(NChannels, TimerRef, State) end. -wait_for_channel_termination(0, TimerRef) -> +wait_for_channel_termination(0, TimerRef, State) -> case erlang:cancel_timer(TimerRef) of false -> receive cancel_wait -> ok end; _ -> ok - end; + end, + State; -wait_for_channel_termination(N, TimerRef) -> +wait_for_channel_termination(N, TimerRef, State) -> receive {'DOWN', _MRef, process, ChPid, Reason} -> - case channel_cleanup(ChPid) of - undefined -> - exit({abnormal_dependent_exit, ChPid, Reason}); - Channel -> - case termination_kind(Reason) of - controlled -> - ok; - uncontrolled -> - rabbit_log:error( - "connection ~p, channel ~p - " - "error while terminating:~n~p~n", - [self(), Channel, Reason]) - end, - wait_for_channel_termination(N-1, TimerRef) - end; + wait_for_channel_termination( + N-1, TimerRef, handle_channel_down(ChPid, Reason, State)); cancel_wait -> exit(channel_termination_timeout) end. @@ -524,8 +498,8 @@ maybe_close(State = #v1{connection_state = closing, maybe_close(State) -> State. -termination_kind(normal) -> controlled; -termination_kind(_) -> uncontrolled. +termination_kind(normal) -> controlled; +termination_kind(_) -> uncontrolled. handle_frame(Type, 0, Payload, State = #v1{connection_state = CS, @@ -553,55 +527,7 @@ handle_frame(Type, Channel, Payload, case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of error -> throw({unknown_frame, Channel, Type, Payload}); heartbeat -> throw({unexpected_heartbeat_frame, Channel}); - AnalyzedFrame -> - case get({channel, Channel}) of - {ChPid, FramingState} -> - NewAState = process_channel_frame( - AnalyzedFrame, self(), - Channel, ChPid, FramingState), - put({channel, Channel}, {ChPid, NewAState}), - case AnalyzedFrame of - {method, 'channel.close', _} -> - erase({channel, Channel}), - State; - {method, MethodName, _} -> - case (State#v1.connection_state =:= blocking - andalso - Protocol:method_has_content(MethodName)) of - true -> State#v1{connection_state = blocked}; - false -> State - end; - _ -> - State - end; - closing -> - %% According to the spec, after sending a - %% channel.close we must ignore all frames except - %% channel.close and channel.close_ok. In the - %% event of a channel.close, we should send back a - %% channel.close_ok. - case AnalyzedFrame of - {method, 'channel.close_ok', _} -> - erase({channel, Channel}); - {method, 'channel.close', _} -> - %% We're already closing this channel, so - %% there's no cleanup to do (notify - %% queues, etc.) - ok = rabbit_writer:internal_send_command( - State#v1.sock, Channel, - #'channel.close_ok'{}, Protocol); - _ -> ok - end, - State; - undefined -> - case ?IS_RUNNING(State) of - true -> send_to_new_channel( - Channel, AnalyzedFrame, State); - false -> throw({channel_frame_while_starting, - Channel, State#v1.connection_state, - AnalyzedFrame}) - end - end + AnalyzedFrame -> handle_methodN(Channel, AnalyzedFrame, State) end. handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) -> @@ -861,6 +787,92 @@ auth_phase(Response, %%-------------------------------------------------------------------------- +handle_methodN(Channel, Frame, State) -> + handle_channel_event(Channel, {frame, Frame}, State). + +handle_channel_event(Channel, Event, State) -> + case channel_event(Channel, get({channel, Channel}), Event, State) of + {erase_channel, State1} -> + erase({channel, Channel}), + State1; + {{update_channel, Value}, State1} -> + put({channel, Channel}, Value), + State1; + {{send, Method}, State1 = + #v1{connection = #connection{protocol = Protocol}, sock = Sock}} -> + ok = rabbit_writer:internal_send_command( + Sock, Channel, Method, Protocol), + State1; + {{send_to_new_channel, Frame}, State1} -> + send_to_new_channel(Channel, Frame, State1); + {{send_exception, Reason}, State1} -> + send_exception(State1, Channel, Reason); + {maybe_close, State1} -> + maybe_close(State1); + {noop, State1} -> + State1 + end. + +%% Frame +channel_event(Channel, {ChPid, FramingState}, {frame, Frame}, + State = #v1{connection = #connection{protocol = Protocol}}) -> + FramingState1 = process_channel_frame(Frame, self(), + Channel, ChPid, FramingState), + case Frame of + {method, 'channel.close', _} -> + {erase_channel, State}; + {method, MethodName, _} -> + State1 = case (State#v1.connection_state =:= blocking andalso + Protocol:method_has_content(MethodName)) of + true -> State#v1{connection_state = blocked}; + false -> State + end, + {{update_channel, {ChPid, FramingState1}}, State1}; + _ -> + {{update_channel, {ChPid, FramingState1}}, State} + end; +channel_event(_Channel, closing, {frame, {method, 'channel.close_ok', _}}, + State) -> + {erase_channel, State}; +channel_event(_Channel, closing, {frame, {method, 'channel.close', _}}, + State) -> + {{send, #'channel.close_ok'{}}, State}; +channel_event(Channel, undefined, {frame, Frame}, State) -> + case ?IS_RUNNING(State) of + true -> {{send_to_new_channel, Frame}, State}; + false -> throw({channel_frame_while_starting, + Channel, State#v1.connection_state, + Frame}) + end; + +%% exits and downs +channel_event(_Channel, _ChState, {died, E = {writer, send_failed, _Error}}, + _State) -> + throw(E); +channel_event(Channel, _ChState, {died, Reason}, + State = #v1{connection_state = closed}) -> + log_channel_error(closed, Channel, Reason), + {noop, State}; +channel_event(Channel, _ChState, {died, Reason}, + State = #v1{connection_state = CS}) -> + log_channel_error(CS, Channel, Reason), + {{send_exception, Reason}, State}; +channel_event(undefined, undefined, {down, ChPid, Reason}, State) -> + {maybe_close, handle_channel_down(ChPid, Reason, State)}. + +handle_channel_down(ChPid, Reason, State) -> + case {termination_kind(Reason), channel_cleanup(ChPid)} of + {controlled, _} -> + State; + {uncontrolled, undefined} -> + exit({abnormal_channel_termination, ChPid, Reason}); + {uncontrolled, Channel} -> + %% channel died without us forwarding it a channel.close + handle_channel_event(Channel, {died, Reason}, State) + end. + +%%-------------------------------------------------------------------------- + infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. i(pid, #v1{}) -> @@ -984,20 +996,12 @@ log_channel_error(ConnectionState, Channel, Reason) -> rabbit_log:error("connection ~p (~p), channel ~p - error:~n~p~n", [self(), ConnectionState, Channel, Reason]). -handle_exception(State = #v1{connection_state = closed}, Channel, Reason) -> - log_channel_error(closed, Channel, Reason), - State; -handle_exception(State = #v1{connection_state = CS}, Channel, Reason) -> - log_channel_error(CS, Channel, Reason), - send_exception(State, Channel, Reason). - send_exception(State = #v1{connection = #connection{protocol = Protocol}}, Channel, Reason) -> {ShouldClose, CloseChannel, CloseMethod} = rabbit_binary_generator:map_exception(Channel, Reason, Protocol), NewState = case ShouldClose of - true -> terminate_channels(), - close_connection(State); + true -> close_connection(terminate_channels(State)); false -> close_channel(Channel, State) end, ok = rabbit_writer:internal_send_command( |