diff options
-rw-r--r-- | src/rabbit_reader.erl | 47 |
1 files changed, 24 insertions, 23 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 48bc6be2..74e5fc77 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -41,6 +41,8 @@ -export([conserve_memory/2, server_properties/0]). +-export([process_channel_frame/5]). %% used be erlang-client + -export([emit_stats/1]). -define(HANDSHAKE_TIMEOUT, 10). @@ -551,22 +553,23 @@ handle_frame(Type, Channel, Payload, AnalyzedFrame -> case get({channel, Channel}) of {ChPid, FramingState} -> - State1 = process_channel_frame( - AnalyzedFrame, Channel, ChPid, FramingState, - State), + NewAState = process_channel_frame( + AnalyzedFrame, self(), + Channel, ChPid, FramingState), + put({channel, Channel}, {ChPid, NewAState}), case AnalyzedFrame of {method, 'channel.close', _} -> erase({channel, Channel}), - State1; + State; {method, MethodName, _} -> case (State#v1.connection_state =:= blocking andalso Protocol:method_has_content(MethodName)) of - true -> State1#v1{connection_state = blocked}; - false -> State1 + true -> State#v1{connection_state = blocked}; + false -> State end; _ -> - State1 + State end; closing -> %% According to the spec, after sending a @@ -949,24 +952,22 @@ send_to_new_channel(Channel, AnalyzedFrame, State) -> erlang:monitor(process, ChPid), put({channel, Channel}, {ChPid, AState}), put({ch_pid, ChPid}, Channel), - process_channel_frame(AnalyzedFrame, Channel, ChPid, AState, State). + NewAState = process_channel_frame(AnalyzedFrame, self(), + Channel, ChPid, AState), + put({channel, Channel}, {ChPid, NewAState}), + State. -process_channel_frame(Frame, Channel, ChPid, AState, State) -> - UpdateAState = fun (NewAState) -> - put({channel, Channel}, {ChPid, NewAState}), - State - end, +process_channel_frame(Frame, ErrPid, Channel, ChPid, AState) -> case rabbit_command_assembler:process(Frame, AState) of - {ok, NewAState} -> - UpdateAState(NewAState); - {ok, Method, NewAState} -> - rabbit_channel:do(ChPid, Method), - UpdateAState(NewAState); - {ok, Method, Content, NewAState} -> - rabbit_channel:do(ChPid, Method, Content), - UpdateAState(NewAState); - {error, Reason} -> - handle_exception(State, Channel, Reason) + {ok, NewAState} -> NewAState; + {ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method), + NewAState; + {ok, Method, Content, NewAState} -> rabbit_channel:do(ChPid, + Method, Content), + NewAState; + {error, Reason} -> ErrPid ! {channel_exit, Channel, + Reason}, + AState end. log_channel_error(ConnectionState, Channel, Reason) -> |