summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-02-18 18:34:20 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2013-02-18 18:34:20 +0000
commitb1ddc98ab0ddcdfa065abcc8088722243b946d4e (patch)
tree07c3179d27a33e9802d84fae8b3364f94980b0bf
parentbe5107d83082bcfd2292b307dc4903fa2c78fa6f (diff)
downloadrabbitmq-server-b1ddc98ab0ddcdfa065abcc8088722243b946d4e.tar.gz
Ensure that we resume socket reading in all edge casesbug25457
Specifically in all the cases where handle_other might have changed the connection_state. This is most straightforward and obvious to guarantee by always invoking recvloop after handle_other, unless we are stopping. This does expose an inconsistency in the various non-throw/exit termination cases: two of them were returning State, the other ok. Let's go with the latter; it's easiest. We also take the opportunity to eliminate 'Deb' from the handle_other signature. This is only needed in the {system, ...} message case, which we now handle specially.
-rw-r--r--src/rabbit_reader.erl102
1 files changed, 54 insertions, 48 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 3a517677..0b5155da 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -277,24 +277,33 @@ recvloop(Deb, State = #v1{recv_len = RecvLen, buf = Buf, buf_len = BufLen}) ->
mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
case rabbit_net:recv(Sock) of
- {data, Data} -> recvloop(Deb, State#v1{buf = [Data | Buf],
- buf_len = BufLen + size(Data),
- pending_recv = false});
- closed -> case State#v1.connection_state of
- closed -> State;
- _ -> throw(connection_closed_abruptly)
- end;
- {error, Reason} -> throw({inet_error, Reason});
- {other, Other} -> handle_other(Other, Deb, State)
+ {data, Data} ->
+ recvloop(Deb, State#v1{buf = [Data | Buf],
+ buf_len = BufLen + size(Data),
+ pending_recv = false});
+ closed when State#v1.connection_state =:= closed ->
+ ok;
+ closed ->
+ throw(connection_closed_abruptly);
+ {error, Reason} ->
+ throw({inet_error, Reason});
+ {other, {system, From, Request}} ->
+ sys:handle_system_msg(Request, From, State#v1.parent,
+ ?MODULE, Deb, State);
+ {other, Other} ->
+ case handle_other(Other, State) of
+ stop -> ok;
+ NewState -> recvloop(Deb, NewState)
+ end
end.
-handle_other({conserve_resources, Conserve}, Deb, State) ->
- recvloop(Deb, control_throttle(State#v1{conserve_resources = Conserve}));
-handle_other({channel_closing, ChPid}, Deb, State) ->
+handle_other({conserve_resources, Conserve}, State) ->
+ control_throttle(State#v1{conserve_resources = Conserve});
+handle_other({channel_closing, ChPid}, State) ->
ok = rabbit_channel:ready_for_close(ChPid),
channel_cleanup(ChPid),
- mainloop(Deb, maybe_close(control_throttle(State)));
-handle_other({'EXIT', Parent, Reason}, _Deb, State = #v1{parent = Parent}) ->
+ maybe_close(control_throttle(State));
+handle_other({'EXIT', Parent, Reason}, State = #v1{parent = Parent}) ->
terminate(io_lib:format("broker forced connection closure "
"with reason '~w'", [Reason]), State),
%% this is what we are expected to do according to
@@ -306,59 +315,56 @@ handle_other({'EXIT', Parent, Reason}, _Deb, State = #v1{parent = Parent}) ->
%% initiated by our parent it is probably more important to exit
%% quickly.
exit(Reason);
-handle_other({channel_exit, _Channel, E = {writer, send_failed, _Error}},
- _Deb, _State) ->
+handle_other({channel_exit, _Channel, E = {writer, send_failed, _E}}, _State) ->
throw(E);
-handle_other({channel_exit, Channel, Reason}, Deb, State) ->
- mainloop(Deb, handle_exception(State, Channel, Reason));
-handle_other({'DOWN', _MRef, process, ChPid, Reason}, Deb, State) ->
- mainloop(Deb, handle_dependent_exit(ChPid, Reason, State));
-handle_other(terminate_connection, _Deb, State) ->
- State;
-handle_other(handshake_timeout, Deb, State)
+handle_other({channel_exit, Channel, Reason}, State) ->
+ handle_exception(State, Channel, Reason);
+handle_other({'DOWN', _MRef, process, ChPid, Reason}, State) ->
+ handle_dependent_exit(ChPid, Reason, State);
+handle_other(terminate_connection, _State) ->
+ stop;
+handle_other(handshake_timeout, State)
when ?IS_RUNNING(State) orelse
State#v1.connection_state =:= closing orelse
State#v1.connection_state =:= closed ->
- mainloop(Deb, State);
-handle_other(handshake_timeout, _Deb, State) ->
+ State;
+handle_other(handshake_timeout, State) ->
throw({handshake_timeout, State#v1.callback});
-handle_other(heartbeat_timeout, Deb, State = #v1{connection_state = closed}) ->
- mainloop(Deb, State);
-handle_other(heartbeat_timeout, _Deb, #v1{connection_state = S}) ->
+handle_other(heartbeat_timeout, State = #v1{connection_state = closed}) ->
+ State;
+handle_other(heartbeat_timeout, #v1{connection_state = S}) ->
throw({heartbeat_timeout, S});
-handle_other({'$gen_call', From, {shutdown, Explanation}}, Deb, State) ->
+handle_other({'$gen_call', From, {shutdown, Explanation}}, State) ->
{ForceTermination, NewState} = terminate(Explanation, State),
gen_server:reply(From, ok),
case ForceTermination of
- force -> ok;
- normal -> mainloop(Deb, NewState)
+ force -> stop;
+ normal -> NewState
end;
-handle_other({'$gen_call', From, info}, Deb, State) ->
+handle_other({'$gen_call', From, info}, State) ->
gen_server:reply(From, infos(?INFO_KEYS, State)),
- mainloop(Deb, State);
-handle_other({'$gen_call', From, {info, Items}}, Deb, State) ->
+ State;
+handle_other({'$gen_call', From, {info, Items}}, State) ->
gen_server:reply(From, try {ok, infos(Items, State)}
catch Error -> {error, Error}
end),
- mainloop(Deb, State);
-handle_other({'$gen_cast', force_event_refresh}, Deb, State)
+ State;
+handle_other({'$gen_cast', force_event_refresh}, State)
when ?IS_RUNNING(State) ->
rabbit_event:notify(connection_created,
[{type, network} | infos(?CREATION_EVENT_KEYS, State)]),
- mainloop(Deb, State);
-handle_other({'$gen_cast', force_event_refresh}, Deb, State) ->
+ State;
+handle_other({'$gen_cast', force_event_refresh}, State) ->
%% Ignore, we will emit a created event once we start running.
- mainloop(Deb, State);
-handle_other(ensure_stats, Deb, State) ->
- mainloop(Deb, ensure_stats_timer(State));
-handle_other(emit_stats, Deb, State) ->
- mainloop(Deb, emit_stats(State));
-handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) ->
- sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State);
-handle_other({bump_credit, Msg}, Deb, State) ->
+ State;
+handle_other(ensure_stats, State) ->
+ ensure_stats_timer(State);
+handle_other(emit_stats, State) ->
+ emit_stats(State);
+handle_other({bump_credit, Msg}, State) ->
credit_flow:handle_bump_msg(Msg),
- recvloop(Deb, control_throttle(State));
-handle_other(Other, _Deb, _State) ->
+ control_throttle(State);
+handle_other(Other, _State) ->
%% internal error -> something worth dying for
exit({unexpected_message, Other}).