summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-01-06 12:31:03 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2011-01-06 12:31:03 +0000
commitb7e4bb978ddac088e0151037db35c8cf2fd4667e (patch)
tree8c42286ee3befbfcfead8afedc23c4bd6609274e
parent5b673b52e76cc2880e78f0557e03dcb83c43c420 (diff)
downloadrabbitmq-server-b7e4bb978ddac088e0151037db35c8cf2fd4667e.tar.gz
refactor: extract function to handle frame and pass to channel
-rw-r--r--src/rabbit_reader.erl47
1 files changed, 24 insertions, 23 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 48bc6be2..74e5fc77 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -41,6 +41,8 @@
-export([conserve_memory/2, server_properties/0]).
+-export([process_channel_frame/5]). %% used be erlang-client
+
-export([emit_stats/1]).
-define(HANDSHAKE_TIMEOUT, 10).
@@ -551,22 +553,23 @@ handle_frame(Type, Channel, Payload,
AnalyzedFrame ->
case get({channel, Channel}) of
{ChPid, FramingState} ->
- State1 = process_channel_frame(
- AnalyzedFrame, Channel, ChPid, FramingState,
- State),
+ NewAState = process_channel_frame(
+ AnalyzedFrame, self(),
+ Channel, ChPid, FramingState),
+ put({channel, Channel}, {ChPid, NewAState}),
case AnalyzedFrame of
{method, 'channel.close', _} ->
erase({channel, Channel}),
- State1;
+ State;
{method, MethodName, _} ->
case (State#v1.connection_state =:= blocking
andalso
Protocol:method_has_content(MethodName)) of
- true -> State1#v1{connection_state = blocked};
- false -> State1
+ true -> State#v1{connection_state = blocked};
+ false -> State
end;
_ ->
- State1
+ State
end;
closing ->
%% According to the spec, after sending a
@@ -949,24 +952,22 @@ send_to_new_channel(Channel, AnalyzedFrame, State) ->
erlang:monitor(process, ChPid),
put({channel, Channel}, {ChPid, AState}),
put({ch_pid, ChPid}, Channel),
- process_channel_frame(AnalyzedFrame, Channel, ChPid, AState, State).
+ NewAState = process_channel_frame(AnalyzedFrame, self(),
+ Channel, ChPid, AState),
+ put({channel, Channel}, {ChPid, NewAState}),
+ State.
-process_channel_frame(Frame, Channel, ChPid, AState, State) ->
- UpdateAState = fun (NewAState) ->
- put({channel, Channel}, {ChPid, NewAState}),
- State
- end,
+process_channel_frame(Frame, ErrPid, Channel, ChPid, AState) ->
case rabbit_command_assembler:process(Frame, AState) of
- {ok, NewAState} ->
- UpdateAState(NewAState);
- {ok, Method, NewAState} ->
- rabbit_channel:do(ChPid, Method),
- UpdateAState(NewAState);
- {ok, Method, Content, NewAState} ->
- rabbit_channel:do(ChPid, Method, Content),
- UpdateAState(NewAState);
- {error, Reason} ->
- handle_exception(State, Channel, Reason)
+ {ok, NewAState} -> NewAState;
+ {ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method),
+ NewAState;
+ {ok, Method, Content, NewAState} -> rabbit_channel:do(ChPid,
+ Method, Content),
+ NewAState;
+ {error, Reason} -> ErrPid ! {channel_exit, Channel,
+ Reason},
+ AState
end.
log_channel_error(ConnectionState, Channel, Reason) ->