diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2012-07-15 16:58:43 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-07-15 16:58:43 +0100 |
commit | 5d878a6666046c4ae92efa65d86fd8859c295161 (patch) | |
tree | 95c58e5bf63ffa28927113b6fb6a59d8ec7bf5a2 | |
parent | 3206dd9d996afa4e728e80beba6bd926b309bf21 (diff) | |
download | rabbitmq-server-5d878a6666046c4ae92efa65d86fd8859c295161.tar.gz |
refactor: move things around in the reader
...for better grouping of functions
-rw-r--r-- | src/rabbit_reader.erl | 121 |
1 files changed, 65 insertions, 56 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 86255790..ee3bf29a 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -173,6 +173,8 @@ server_capabilities(rabbit_framing_amqp_0_9_1) -> server_capabilities(_) -> []. +%%-------------------------------------------------------------------------- + log(Level, Fmt, Args) -> rabbit_log:log(connection, Level, Fmt, Args). inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). @@ -383,6 +385,9 @@ update_last_blocked_by(State = #v1{conserve_resources = true}) -> update_last_blocked_by(State = #v1{conserve_resources = false}) -> State#v1{last_blocked_by = flow}. +%%-------------------------------------------------------------------------- +%% error handling / termination + close_connection(State = #v1{queue_collector = Collector, connection = #connection{ timeout_sec = TimeoutSec}}) -> @@ -412,18 +417,6 @@ handle_dependent_exit(ChPid, Reason, State) -> Channel, Reason)) end. -channel_cleanup(ChPid) -> - case get({ch_pid, ChPid}) of - undefined -> undefined; - {Channel, MRef} -> credit_flow:peer_down(ChPid), - erase({channel, Channel}), - erase({ch_pid, ChPid}), - erlang:demonitor(MRef, [flush]), - Channel - end. - -all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()]. - terminate_channels() -> NChannels = length([rabbit_channel:shutdown(ChPid) || ChPid <- all_channels()]), @@ -477,6 +470,53 @@ maybe_close(State) -> termination_kind(normal) -> controlled; termination_kind(_) -> uncontrolled. +handle_exception(State = #v1{connection_state = closed}, _Channel, _Reason) -> + State; +handle_exception(State, Channel, Reason) -> + send_exception(State, Channel, Reason). + +send_exception(State = #v1{connection = #connection{protocol = Protocol}}, + Channel, Reason) -> + {0, CloseMethod} = + rabbit_binary_generator:map_exception(Channel, Reason, Protocol), + terminate_channels(), + State1 = close_connection(State), + ok = send_on_channel0(State1#v1.sock, CloseMethod, Protocol), + State1. + +%%-------------------------------------------------------------------------- + +create_channel(Channel, State) -> + #v1{sock = Sock, queue_collector = Collector, + channel_sup_sup_pid = ChanSupSup, + connection = #connection{protocol = Protocol, + frame_max = FrameMax, + user = User, + vhost = VHost, + capabilities = Capabilities}} = State, + {ok, _ChSupPid, {ChPid, AState}} = + rabbit_channel_sup_sup:start_channel( + ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), name(Sock), + Protocol, User, VHost, Capabilities, Collector}), + MRef = erlang:monitor(process, ChPid), + put({ch_pid, ChPid}, {Channel, MRef}), + put({channel, Channel}, {ChPid, AState}), + ok. + +channel_cleanup(ChPid) -> + case get({ch_pid, ChPid}) of + undefined -> undefined; + {Channel, MRef} -> credit_flow:peer_down(ChPid), + erase({channel, Channel}), + erase({ch_pid, ChPid}), + erlang:demonitor(MRef, [flush]), + Channel + end. + +all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()]. + +%%-------------------------------------------------------------------------- + handle_frame(Type, 0, Payload, State = #v1{connection_state = CS, connection = #connection{protocol = Protocol}}) @@ -522,6 +562,17 @@ process_frame(Frame, Channel, State) -> Channel, State#v1.connection_state, Frame}) end. +process_channel_frame(Frame, ChPid, AState) -> + case rabbit_command_assembler:process(Frame, AState) of + {ok, NewAState} -> {ok, NewAState}; + {ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method), + {ok, NewAState}; + {ok, Method, Content, NewAState} -> rabbit_channel:do_flow( + ChPid, Method, Content), + {ok, NewAState}; + {error, Reason} -> {error, Reason} + end. + post_process_frame({method, 'channel.close_ok', _}, ChPid, State) -> channel_cleanup(ChPid), control_throttle(State); @@ -536,6 +587,8 @@ post_process_frame({method, MethodName, _}, _ChPid, post_process_frame(_Frame, _ChPid, State) -> control_throttle(State). +%%-------------------------------------------------------------------------- + handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) -> ensure_stats_timer( switch_callback(State, {frame_payload, Type, Channel, PayloadSize}, @@ -897,50 +950,6 @@ cert_info(F, Sock) -> {ok, Cert} -> list_to_binary(F(Cert)) end. -%%-------------------------------------------------------------------------- - -create_channel(Channel, State) -> - #v1{sock = Sock, queue_collector = Collector, - channel_sup_sup_pid = ChanSupSup, - connection = #connection{protocol = Protocol, - frame_max = FrameMax, - user = User, - vhost = VHost, - capabilities = Capabilities}} = State, - {ok, _ChSupPid, {ChPid, AState}} = - rabbit_channel_sup_sup:start_channel( - ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), name(Sock), - Protocol, User, VHost, Capabilities, Collector}), - MRef = erlang:monitor(process, ChPid), - put({ch_pid, ChPid}, {Channel, MRef}), - put({channel, Channel}, {ChPid, AState}), - ok. - -process_channel_frame(Frame, ChPid, AState) -> - case rabbit_command_assembler:process(Frame, AState) of - {ok, NewAState} -> {ok, NewAState}; - {ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method), - {ok, NewAState}; - {ok, Method, Content, NewAState} -> rabbit_channel:do_flow( - ChPid, Method, Content), - {ok, NewAState}; - {error, Reason} -> {error, Reason} - end. - -handle_exception(State = #v1{connection_state = closed}, _Channel, _Reason) -> - State; -handle_exception(State, Channel, Reason) -> - send_exception(State, Channel, Reason). - -send_exception(State = #v1{connection = #connection{protocol = Protocol}}, - Channel, Reason) -> - {0, CloseMethod} = - rabbit_binary_generator:map_exception(Channel, Reason, Protocol), - terminate_channels(), - State1 = close_connection(State), - ok = send_on_channel0(State1#v1.sock, CloseMethod, Protocol), - State1. - emit_stats(State) -> rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)), rabbit_event:reset_stats_timer(State, #v1.stats_timer). |