From b1ddc98ab0ddcdfa065abcc8088722243b946d4e Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Mon, 18 Feb 2013 18:34:20 +0000 Subject: Ensure that we resume socket reading in all edge cases Specifically in all the cases where handle_other might have changed the connection_state. This is most straightforward and obvious to guarantee by always invoking recvloop after handle_other, unless we are stopping. This does expose an inconsistency in the various non-throw/exit termination cases: two of them were returning State, the other ok. Let's go with the latter; it's easiest. We also take the opportunity to eliminate 'Deb' from the handle_other signature. This is only needed in the {system, ...} message case, which we now handle specially. --- src/rabbit_reader.erl | 102 ++++++++++++++++++++++++++------------------------ 1 file changed, 54 insertions(+), 48 deletions(-) diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 3a517677..0b5155da 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -277,24 +277,33 @@ recvloop(Deb, State = #v1{recv_len = RecvLen, buf = Buf, buf_len = BufLen}) -> mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) -> case rabbit_net:recv(Sock) of - {data, Data} -> recvloop(Deb, State#v1{buf = [Data | Buf], - buf_len = BufLen + size(Data), - pending_recv = false}); - closed -> case State#v1.connection_state of - closed -> State; - _ -> throw(connection_closed_abruptly) - end; - {error, Reason} -> throw({inet_error, Reason}); - {other, Other} -> handle_other(Other, Deb, State) + {data, Data} -> + recvloop(Deb, State#v1{buf = [Data | Buf], + buf_len = BufLen + size(Data), + pending_recv = false}); + closed when State#v1.connection_state =:= closed -> + ok; + closed -> + throw(connection_closed_abruptly); + {error, Reason} -> + throw({inet_error, Reason}); + {other, {system, From, Request}} -> + sys:handle_system_msg(Request, From, State#v1.parent, + ?MODULE, Deb, State); + {other, Other} -> + case handle_other(Other, State) of + stop -> ok; + NewState -> recvloop(Deb, NewState) + end end. -handle_other({conserve_resources, Conserve}, Deb, State) -> - recvloop(Deb, control_throttle(State#v1{conserve_resources = Conserve})); -handle_other({channel_closing, ChPid}, Deb, State) -> +handle_other({conserve_resources, Conserve}, State) -> + control_throttle(State#v1{conserve_resources = Conserve}); +handle_other({channel_closing, ChPid}, State) -> ok = rabbit_channel:ready_for_close(ChPid), channel_cleanup(ChPid), - mainloop(Deb, maybe_close(control_throttle(State))); -handle_other({'EXIT', Parent, Reason}, _Deb, State = #v1{parent = Parent}) -> + maybe_close(control_throttle(State)); +handle_other({'EXIT', Parent, Reason}, State = #v1{parent = Parent}) -> terminate(io_lib:format("broker forced connection closure " "with reason '~w'", [Reason]), State), %% this is what we are expected to do according to @@ -306,59 +315,56 @@ handle_other({'EXIT', Parent, Reason}, _Deb, State = #v1{parent = Parent}) -> %% initiated by our parent it is probably more important to exit %% quickly. exit(Reason); -handle_other({channel_exit, _Channel, E = {writer, send_failed, _Error}}, - _Deb, _State) -> +handle_other({channel_exit, _Channel, E = {writer, send_failed, _E}}, _State) -> throw(E); -handle_other({channel_exit, Channel, Reason}, Deb, State) -> - mainloop(Deb, handle_exception(State, Channel, Reason)); -handle_other({'DOWN', _MRef, process, ChPid, Reason}, Deb, State) -> - mainloop(Deb, handle_dependent_exit(ChPid, Reason, State)); -handle_other(terminate_connection, _Deb, State) -> - State; -handle_other(handshake_timeout, Deb, State) +handle_other({channel_exit, Channel, Reason}, State) -> + handle_exception(State, Channel, Reason); +handle_other({'DOWN', _MRef, process, ChPid, Reason}, State) -> + handle_dependent_exit(ChPid, Reason, State); +handle_other(terminate_connection, _State) -> + stop; +handle_other(handshake_timeout, State) when ?IS_RUNNING(State) orelse State#v1.connection_state =:= closing orelse State#v1.connection_state =:= closed -> - mainloop(Deb, State); -handle_other(handshake_timeout, _Deb, State) -> + State; +handle_other(handshake_timeout, State) -> throw({handshake_timeout, State#v1.callback}); -handle_other(heartbeat_timeout, Deb, State = #v1{connection_state = closed}) -> - mainloop(Deb, State); -handle_other(heartbeat_timeout, _Deb, #v1{connection_state = S}) -> +handle_other(heartbeat_timeout, State = #v1{connection_state = closed}) -> + State; +handle_other(heartbeat_timeout, #v1{connection_state = S}) -> throw({heartbeat_timeout, S}); -handle_other({'$gen_call', From, {shutdown, Explanation}}, Deb, State) -> +handle_other({'$gen_call', From, {shutdown, Explanation}}, State) -> {ForceTermination, NewState} = terminate(Explanation, State), gen_server:reply(From, ok), case ForceTermination of - force -> ok; - normal -> mainloop(Deb, NewState) + force -> stop; + normal -> NewState end; -handle_other({'$gen_call', From, info}, Deb, State) -> +handle_other({'$gen_call', From, info}, State) -> gen_server:reply(From, infos(?INFO_KEYS, State)), - mainloop(Deb, State); -handle_other({'$gen_call', From, {info, Items}}, Deb, State) -> + State; +handle_other({'$gen_call', From, {info, Items}}, State) -> gen_server:reply(From, try {ok, infos(Items, State)} catch Error -> {error, Error} end), - mainloop(Deb, State); -handle_other({'$gen_cast', force_event_refresh}, Deb, State) + State; +handle_other({'$gen_cast', force_event_refresh}, State) when ?IS_RUNNING(State) -> rabbit_event:notify(connection_created, [{type, network} | infos(?CREATION_EVENT_KEYS, State)]), - mainloop(Deb, State); -handle_other({'$gen_cast', force_event_refresh}, Deb, State) -> + State; +handle_other({'$gen_cast', force_event_refresh}, State) -> %% Ignore, we will emit a created event once we start running. - mainloop(Deb, State); -handle_other(ensure_stats, Deb, State) -> - mainloop(Deb, ensure_stats_timer(State)); -handle_other(emit_stats, Deb, State) -> - mainloop(Deb, emit_stats(State)); -handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) -> - sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State); -handle_other({bump_credit, Msg}, Deb, State) -> + State; +handle_other(ensure_stats, State) -> + ensure_stats_timer(State); +handle_other(emit_stats, State) -> + emit_stats(State); +handle_other({bump_credit, Msg}, State) -> credit_flow:handle_bump_msg(Msg), - recvloop(Deb, control_throttle(State)); -handle_other(Other, _Deb, _State) -> + control_throttle(State); +handle_other(Other, _State) -> %% internal error -> something worth dying for exit({unexpected_message, Other}). -- cgit v1.2.1