diff options
Diffstat (limited to 'src/rabbit_reader.erl')
-rw-r--r-- | src/rabbit_reader.erl | 32 |
1 files changed, 20 insertions, 12 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 059f88ba..90207938 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -93,8 +93,10 @@ -spec(system_terminate/4 :: (_,_,_,_) -> none()). -spec(process_channel_frame/5 :: - (rabbit_command_assembler:frame(), pid(), non_neg_integer(), pid(), - tuple()) -> tuple()). + (rabbit_command_assembler:frame(), pid(), non_neg_integer(), + fun ((rabbit_framing:amqp_method_record(), + rabbit_types:maybe(rabbit_types:content())) -> 'ok'), tuple()) -> + tuple()). -endif. @@ -508,10 +510,9 @@ handle_frame(Type, Channel, Payload, heartbeat -> throw({unexpected_heartbeat_frame, Channel}); AnalyzedFrame -> case get({channel, Channel}) of - {ChPid, FramingState} -> + {ChPid, AState} -> NewAState = process_channel_frame( - AnalyzedFrame, self(), - Channel, ChPid, FramingState), + AnalyzedFrame, Channel, ChPid, AState), put({channel, Channel}, {ChPid, NewAState}), post_process_frame(AnalyzedFrame, ChPid, control_throttle(State)); @@ -916,20 +917,27 @@ send_to_new_channel(Channel, AnalyzedFrame, State) -> ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Protocol, User, VHost, Capabilities, Collector}), MRef = erlang:monitor(process, ChPid), - NewAState = process_channel_frame(AnalyzedFrame, self(), - Channel, ChPid, AState), + NewAState = process_channel_frame(AnalyzedFrame, Channel, ChPid, AState), put({channel, Channel}, {ChPid, NewAState}), put({ch_pid, ChPid}, {Channel, MRef}), State. -process_channel_frame(Frame, ErrPid, Channel, ChPid, AState) -> +process_channel_frame(Frame, Channel, ChPid, AState) -> + process_channel_frame( + Frame, self(), Channel, + fun (Method, none) -> + rabbit_channel:do(ChPid, Method); + (Method, Content) -> + credit_flow:send(ChPid), + rabbit_channel:do(ChPid, Method, Content) + end, AState). + +process_channel_frame(Frame, ErrPid, Channel, CommandFun, AState) -> case rabbit_command_assembler:process(Frame, AState) of {ok, NewAState} -> NewAState; - {ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method), + {ok, Method, NewAState} -> ok = CommandFun(Method, none), NewAState; - {ok, Method, Content, NewAState} -> credit_flow:send(ChPid), - rabbit_channel:do(ChPid, - Method, Content), + {ok, Method, Content, NewAState} -> ok = CommandFun(Method, Content), NewAState; {error, Reason} -> ErrPid ! {channel_exit, Channel, Reason}, |