summaryrefslogtreecommitdiff
path: root/src/rabbit_reader.erl
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-01-13 17:20:45 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2012-01-13 17:20:45 +0000
commit458ef28845f9153b2424fa346c36c16b544958a3 (patch)
treebd59c737e54a1ebbf0eb24a3e47abc91a36053fc /src/rabbit_reader.erl
parent20035471ba37f0c5c48f2139ee3833a306cc77a0 (diff)
downloadrabbitmq-server-458ef28845f9153b2424fa346c36c16b544958a3.tar.gz
do not hard-wire the channel interaction into process_channel_frame
This makes that function more versatile, e.g. permitting the Erlang client to skip the credit_flow:send step.
Diffstat (limited to 'src/rabbit_reader.erl')
-rw-r--r--src/rabbit_reader.erl32
1 files changed, 20 insertions, 12 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 059f88ba..90207938 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -93,8 +93,10 @@
-spec(system_terminate/4 :: (_,_,_,_) -> none()).
-spec(process_channel_frame/5 ::
- (rabbit_command_assembler:frame(), pid(), non_neg_integer(), pid(),
- tuple()) -> tuple()).
+ (rabbit_command_assembler:frame(), pid(), non_neg_integer(),
+ fun ((rabbit_framing:amqp_method_record(),
+ rabbit_types:maybe(rabbit_types:content())) -> 'ok'), tuple()) ->
+ tuple()).
-endif.
@@ -508,10 +510,9 @@ handle_frame(Type, Channel, Payload,
heartbeat -> throw({unexpected_heartbeat_frame, Channel});
AnalyzedFrame ->
case get({channel, Channel}) of
- {ChPid, FramingState} ->
+ {ChPid, AState} ->
NewAState = process_channel_frame(
- AnalyzedFrame, self(),
- Channel, ChPid, FramingState),
+ AnalyzedFrame, Channel, ChPid, AState),
put({channel, Channel}, {ChPid, NewAState}),
post_process_frame(AnalyzedFrame, ChPid,
control_throttle(State));
@@ -916,20 +917,27 @@ send_to_new_channel(Channel, AnalyzedFrame, State) ->
ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Protocol, User,
VHost, Capabilities, Collector}),
MRef = erlang:monitor(process, ChPid),
- NewAState = process_channel_frame(AnalyzedFrame, self(),
- Channel, ChPid, AState),
+ NewAState = process_channel_frame(AnalyzedFrame, Channel, ChPid, AState),
put({channel, Channel}, {ChPid, NewAState}),
put({ch_pid, ChPid}, {Channel, MRef}),
State.
-process_channel_frame(Frame, ErrPid, Channel, ChPid, AState) ->
+process_channel_frame(Frame, Channel, ChPid, AState) ->
+ process_channel_frame(
+ Frame, self(), Channel,
+ fun (Method, none) ->
+ rabbit_channel:do(ChPid, Method);
+ (Method, Content) ->
+ credit_flow:send(ChPid),
+ rabbit_channel:do(ChPid, Method, Content)
+ end, AState).
+
+process_channel_frame(Frame, ErrPid, Channel, CommandFun, AState) ->
case rabbit_command_assembler:process(Frame, AState) of
{ok, NewAState} -> NewAState;
- {ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method),
+ {ok, Method, NewAState} -> ok = CommandFun(Method, none),
NewAState;
- {ok, Method, Content, NewAState} -> credit_flow:send(ChPid),
- rabbit_channel:do(ChPid,
- Method, Content),
+ {ok, Method, Content, NewAState} -> ok = CommandFun(Method, Content),
NewAState;
{error, Reason} -> ErrPid ! {channel_exit, Channel,
Reason},