authorMatthew Sackman <>2011-02-03 18:12:15 +0000
committerMatthew Sackman <>2011-02-03 18:12:15 +0000
commit3bf95f76bf25d4646e7d79bc11ae983f2b434abe (patch)
parentd01883ca941aaecd6da1ff45626af7217c046a10 (diff)
Not really sure if this is getting any better. I've introduced bugs because of conflating a couple of areas, but I'm getting more familiar with it, and it might eventually get better. We shall see.bug23743
1 files changed, 105 insertions, 101 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 12f77964..881198a9 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -340,12 +340,11 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) ->
%% since this termination is initiated by our parent it is
%% probably more important to exit quickly.
- {channel_exit, _Channel, E = {writer, send_failed, _Error}} ->
- throw(E);
{channel_exit, Channel, Reason} ->
- mainloop(Deb, handle_exception(State, Channel, Reason));
+ mainloop(Deb, handle_channel_event(Channel, {died, Reason}, State));
{'DOWN', _MRef, process, ChPid, Reason} ->
- mainloop(Deb, handle_dependent_exit(ChPid, Reason, State));
+ mainloop(Deb, handle_channel_event(undefined, {down, ChPid, Reason},
+ State));
terminate_connection ->
handshake_timeout ->
@@ -448,19 +447,6 @@ close_channel(Channel, State) ->
put({channel, Channel}, closing),
-handle_dependent_exit(ChPid, Reason, State) ->
- case termination_kind(Reason) of
- controlled ->
- erase({ch_pid, ChPid}),
- maybe_close(State);
- uncontrolled ->
- case channel_cleanup(ChPid) of
- undefined -> exit({abnormal_dependent_exit, ChPid, Reason});
- Channel -> maybe_close(
- handle_exception(State, Channel, Reason))
- end
- end.
channel_cleanup(ChPid) ->
case get({ch_pid, ChPid}) of
undefined -> undefined;
@@ -471,42 +457,30 @@ channel_cleanup(ChPid) ->
all_channels() -> [ChPid || {{ch_pid, ChPid}, _Channel} <- get()].
-terminate_channels() ->
+terminate_channels(State) ->
NChannels =
length([rabbit_channel:shutdown(ChPid) || ChPid <- all_channels()]),
- if NChannels > 0 ->
- Timeout = 1000 * ?CHANNEL_TERMINATION_TIMEOUT * NChannels,
- TimerRef = erlang:send_after(Timeout, self(), cancel_wait),
- wait_for_channel_termination(NChannels, TimerRef);
- true -> ok
+ case NChannels of
+ 0 -> State;
+ _ -> Timeout = 1000 * ?CHANNEL_TERMINATION_TIMEOUT * NChannels,
+ TimerRef = erlang:send_after(Timeout, self(), cancel_wait),
+ wait_for_channel_termination(NChannels, TimerRef, State)
-wait_for_channel_termination(0, TimerRef) ->
+wait_for_channel_termination(0, TimerRef, State) ->
case erlang:cancel_timer(TimerRef) of
false -> receive
cancel_wait -> ok
_ -> ok
- end;
+ end,
+ State;
-wait_for_channel_termination(N, TimerRef) ->
+wait_for_channel_termination(N, TimerRef, State) ->
{'DOWN', _MRef, process, ChPid, Reason} ->
- case channel_cleanup(ChPid) of
- undefined ->
- exit({abnormal_dependent_exit, ChPid, Reason});
- Channel ->
- case termination_kind(Reason) of
- controlled ->
- ok;
- uncontrolled ->
- rabbit_log:error(
- "connection ~p, channel ~p - "
- "error while terminating:~n~p~n",
- [self(), Channel, Reason])
- end,
- wait_for_channel_termination(N-1, TimerRef)
- end;
+ wait_for_channel_termination(
+ N-1, TimerRef, handle_channel_down(ChPid, Reason, State));
cancel_wait ->
@@ -524,8 +498,8 @@ maybe_close(State = #v1{connection_state = closing,
maybe_close(State) ->
-termination_kind(normal) -> controlled;
-termination_kind(_) -> uncontrolled.
+termination_kind(normal) -> controlled;
+termination_kind(_) -> uncontrolled.
handle_frame(Type, 0, Payload,
State = #v1{connection_state = CS,
@@ -553,55 +527,7 @@ handle_frame(Type, Channel, Payload,
case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of
error -> throw({unknown_frame, Channel, Type, Payload});
heartbeat -> throw({unexpected_heartbeat_frame, Channel});
- AnalyzedFrame ->
- case get({channel, Channel}) of
- {ChPid, FramingState} ->
- NewAState = process_channel_frame(
- AnalyzedFrame, self(),
- Channel, ChPid, FramingState),
- put({channel, Channel}, {ChPid, NewAState}),
- case AnalyzedFrame of
- {method, 'channel.close', _} ->
- erase({channel, Channel}),
- State;
- {method, MethodName, _} ->
- case (State#v1.connection_state =:= blocking
- andalso
- Protocol:method_has_content(MethodName)) of
- true -> State#v1{connection_state = blocked};
- false -> State
- end;
- _ ->
- State
- end;
- closing ->
- %% According to the spec, after sending a
- %% channel.close we must ignore all frames except
- %% channel.close and channel.close_ok. In the
- %% event of a channel.close, we should send back a
- %% channel.close_ok.
- case AnalyzedFrame of
- {method, 'channel.close_ok', _} ->
- erase({channel, Channel});
- {method, 'channel.close', _} ->
- %% We're already closing this channel, so
- %% there's no cleanup to do (notify
- %% queues, etc.)
- ok = rabbit_writer:internal_send_command(
- State#v1.sock, Channel,
- #'channel.close_ok'{}, Protocol);
- _ -> ok
- end,
- State;
- undefined ->
- case ?IS_RUNNING(State) of
- true -> send_to_new_channel(
- Channel, AnalyzedFrame, State);
- false -> throw({channel_frame_while_starting,
- Channel, State#v1.connection_state,
- AnalyzedFrame})
- end
- end
+ AnalyzedFrame -> handle_methodN(Channel, AnalyzedFrame, State)
handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) ->
@@ -861,6 +787,92 @@ auth_phase(Response,
+handle_methodN(Channel, Frame, State) ->
+ handle_channel_event(Channel, {frame, Frame}, State).
+handle_channel_event(Channel, Event, State) ->
+ case channel_event(Channel, get({channel, Channel}), Event, State) of
+ {erase_channel, State1} ->
+ erase({channel, Channel}),
+ State1;
+ {{update_channel, Value}, State1} ->
+ put({channel, Channel}, Value),
+ State1;
+ {{send, Method}, State1 =
+ #v1{connection = #connection{protocol = Protocol}, sock = Sock}} ->
+ ok = rabbit_writer:internal_send_command(
+ Sock, Channel, Method, Protocol),
+ State1;
+ {{send_to_new_channel, Frame}, State1} ->
+ send_to_new_channel(Channel, Frame, State1);
+ {{send_exception, Reason}, State1} ->
+ send_exception(State1, Channel, Reason);
+ {maybe_close, State1} ->
+ maybe_close(State1);
+ {noop, State1} ->
+ State1
+ end.
+%% Frame
+channel_event(Channel, {ChPid, FramingState}, {frame, Frame},
+ State = #v1{connection = #connection{protocol = Protocol}}) ->
+ FramingState1 = process_channel_frame(Frame, self(),
+ Channel, ChPid, FramingState),
+ case Frame of
+ {method, 'channel.close', _} ->
+ {erase_channel, State};
+ {method, MethodName, _} ->
+ State1 = case (State#v1.connection_state =:= blocking andalso
+ Protocol:method_has_content(MethodName)) of
+ true -> State#v1{connection_state = blocked};
+ false -> State
+ end,
+ {{update_channel, {ChPid, FramingState1}}, State1};
+ _ ->
+ {{update_channel, {ChPid, FramingState1}}, State}
+ end;
+channel_event(_Channel, closing, {frame, {method, 'channel.close_ok', _}},
+ State) ->
+ {erase_channel, State};
+channel_event(_Channel, closing, {frame, {method, 'channel.close', _}},
+ State) ->
+ {{send, #'channel.close_ok'{}}, State};
+channel_event(Channel, undefined, {frame, Frame}, State) ->
+ case ?IS_RUNNING(State) of
+ true -> {{send_to_new_channel, Frame}, State};
+ false -> throw({channel_frame_while_starting,
+ Channel, State#v1.connection_state,
+ Frame})
+ end;
+%% exits and downs
+channel_event(_Channel, _ChState, {died, E = {writer, send_failed, _Error}},
+ _State) ->
+ throw(E);
+channel_event(Channel, _ChState, {died, Reason},
+ State = #v1{connection_state = closed}) ->
+ log_channel_error(closed, Channel, Reason),
+ {noop, State};
+channel_event(Channel, _ChState, {died, Reason},
+ State = #v1{connection_state = CS}) ->
+ log_channel_error(CS, Channel, Reason),
+ {{send_exception, Reason}, State};
+channel_event(undefined, undefined, {down, ChPid, Reason}, State) ->
+ {maybe_close, handle_channel_down(ChPid, Reason, State)}.
+handle_channel_down(ChPid, Reason, State) ->
+ case {termination_kind(Reason), channel_cleanup(ChPid)} of
+ {controlled, _} ->
+ State;
+ {uncontrolled, undefined} ->
+ exit({abnormal_channel_termination, ChPid, Reason});
+ {uncontrolled, Channel} ->
+ %% channel died without us forwarding it a channel.close
+ handle_channel_event(Channel, {died, Reason}, State)
+ end.
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
i(pid, #v1{}) ->
@@ -984,20 +996,12 @@ log_channel_error(ConnectionState, Channel, Reason) ->
rabbit_log:error("connection ~p (~p), channel ~p - error:~n~p~n",
[self(), ConnectionState, Channel, Reason]).
-handle_exception(State = #v1{connection_state = closed}, Channel, Reason) ->
- log_channel_error(closed, Channel, Reason),
- State;
-handle_exception(State = #v1{connection_state = CS}, Channel, Reason) ->
- log_channel_error(CS, Channel, Reason),
- send_exception(State, Channel, Reason).
send_exception(State = #v1{connection = #connection{protocol = Protocol}},
Channel, Reason) ->
{ShouldClose, CloseChannel, CloseMethod} =
rabbit_binary_generator:map_exception(Channel, Reason, Protocol),
NewState = case ShouldClose of
- true -> terminate_channels(),
- close_connection(State);
+ true -> close_connection(terminate_channels(State));
false -> close_channel(Channel, State)
ok = rabbit_writer:internal_send_command(