diff options
authorMatthias Radestock <>2012-07-15 16:58:43 +0100
committerMatthias Radestock <>2012-07-15 16:58:43 +0100
commit5d878a6666046c4ae92efa65d86fd8859c295161 (patch)
parent3206dd9d996afa4e728e80beba6bd926b309bf21 (diff)
refactor: move things around in the reader
...for better grouping of functions
1 files changed, 65 insertions, 56 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 86255790..ee3bf29a 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -173,6 +173,8 @@ server_capabilities(rabbit_framing_amqp_0_9_1) ->
server_capabilities(_) ->
log(Level, Fmt, Args) -> rabbit_log:log(connection, Level, Fmt, Args).
inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
@@ -383,6 +385,9 @@ update_last_blocked_by(State = #v1{conserve_resources = true}) ->
update_last_blocked_by(State = #v1{conserve_resources = false}) ->
State#v1{last_blocked_by = flow}.
+%% error handling / termination
close_connection(State = #v1{queue_collector = Collector,
connection = #connection{
timeout_sec = TimeoutSec}}) ->
@@ -412,18 +417,6 @@ handle_dependent_exit(ChPid, Reason, State) ->
Channel, Reason))
-channel_cleanup(ChPid) ->
- case get({ch_pid, ChPid}) of
- undefined -> undefined;
- {Channel, MRef} -> credit_flow:peer_down(ChPid),
- erase({channel, Channel}),
- erase({ch_pid, ChPid}),
- erlang:demonitor(MRef, [flush]),
- Channel
- end.
-all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()].
terminate_channels() ->
NChannels =
length([rabbit_channel:shutdown(ChPid) || ChPid <- all_channels()]),
@@ -477,6 +470,53 @@ maybe_close(State) ->
termination_kind(normal) -> controlled;
termination_kind(_) -> uncontrolled.
+handle_exception(State = #v1{connection_state = closed}, _Channel, _Reason) ->
+ State;
+handle_exception(State, Channel, Reason) ->
+ send_exception(State, Channel, Reason).
+send_exception(State = #v1{connection = #connection{protocol = Protocol}},
+ Channel, Reason) ->
+ {0, CloseMethod} =
+ rabbit_binary_generator:map_exception(Channel, Reason, Protocol),
+ terminate_channels(),
+ State1 = close_connection(State),
+ ok = send_on_channel0(State1#v1.sock, CloseMethod, Protocol),
+ State1.
+create_channel(Channel, State) ->
+ #v1{sock = Sock, queue_collector = Collector,
+ channel_sup_sup_pid = ChanSupSup,
+ connection = #connection{protocol = Protocol,
+ frame_max = FrameMax,
+ user = User,
+ vhost = VHost,
+ capabilities = Capabilities}} = State,
+ {ok, _ChSupPid, {ChPid, AState}} =
+ rabbit_channel_sup_sup:start_channel(
+ ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), name(Sock),
+ Protocol, User, VHost, Capabilities, Collector}),
+ MRef = erlang:monitor(process, ChPid),
+ put({ch_pid, ChPid}, {Channel, MRef}),
+ put({channel, Channel}, {ChPid, AState}),
+ ok.
+channel_cleanup(ChPid) ->
+ case get({ch_pid, ChPid}) of
+ undefined -> undefined;
+ {Channel, MRef} -> credit_flow:peer_down(ChPid),
+ erase({channel, Channel}),
+ erase({ch_pid, ChPid}),
+ erlang:demonitor(MRef, [flush]),
+ Channel
+ end.
+all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()].
handle_frame(Type, 0, Payload,
State = #v1{connection_state = CS,
connection = #connection{protocol = Protocol}})
@@ -522,6 +562,17 @@ process_frame(Frame, Channel, State) ->
Channel, State#v1.connection_state, Frame})
+process_channel_frame(Frame, ChPid, AState) ->
+ case rabbit_command_assembler:process(Frame, AState) of
+ {ok, NewAState} -> {ok, NewAState};
+ {ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method),
+ {ok, NewAState};
+ {ok, Method, Content, NewAState} -> rabbit_channel:do_flow(
+ ChPid, Method, Content),
+ {ok, NewAState};
+ {error, Reason} -> {error, Reason}
+ end.
post_process_frame({method, 'channel.close_ok', _}, ChPid, State) ->
@@ -536,6 +587,8 @@ post_process_frame({method, MethodName, _}, _ChPid,
post_process_frame(_Frame, _ChPid, State) ->
handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) ->
switch_callback(State, {frame_payload, Type, Channel, PayloadSize},
@@ -897,50 +950,6 @@ cert_info(F, Sock) ->
{ok, Cert} -> list_to_binary(F(Cert))
-create_channel(Channel, State) ->
- #v1{sock = Sock, queue_collector = Collector,
- channel_sup_sup_pid = ChanSupSup,
- connection = #connection{protocol = Protocol,
- frame_max = FrameMax,
- user = User,
- vhost = VHost,
- capabilities = Capabilities}} = State,
- {ok, _ChSupPid, {ChPid, AState}} =
- rabbit_channel_sup_sup:start_channel(
- ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), name(Sock),
- Protocol, User, VHost, Capabilities, Collector}),
- MRef = erlang:monitor(process, ChPid),
- put({ch_pid, ChPid}, {Channel, MRef}),
- put({channel, Channel}, {ChPid, AState}),
- ok.
-process_channel_frame(Frame, ChPid, AState) ->
- case rabbit_command_assembler:process(Frame, AState) of
- {ok, NewAState} -> {ok, NewAState};
- {ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method),
- {ok, NewAState};
- {ok, Method, Content, NewAState} -> rabbit_channel:do_flow(
- ChPid, Method, Content),
- {ok, NewAState};
- {error, Reason} -> {error, Reason}
- end.
-handle_exception(State = #v1{connection_state = closed}, _Channel, _Reason) ->
- State;
-handle_exception(State, Channel, Reason) ->
- send_exception(State, Channel, Reason).
-send_exception(State = #v1{connection = #connection{protocol = Protocol}},
- Channel, Reason) ->
- {0, CloseMethod} =
- rabbit_binary_generator:map_exception(Channel, Reason, Protocol),
- terminate_channels(),
- State1 = close_connection(State),
- ok = send_on_channel0(State1#v1.sock, CloseMethod, Protocol),
- State1.
emit_stats(State) ->
rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)),
rabbit_event:reset_stats_timer(State, #v1.stats_timer).