summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-02-09 12:03:17 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2012-02-09 12:03:17 +0000
commite19af4d7f8036f4317b956221010491899d7302d (patch)
treecafbf473d5b42291dc0a2ef4a2784e6fae8ada7d
parent22871e540c28aab4cb2cab1184f6cdb83dfe100f (diff)
downloadrabbitmq-server-e19af4d7f8036f4317b956221010491899d7302d.tar.gz
refactor channel frame handling
...so there is just one process_channel_frame call site. Also, ensure control_throttle isn't called twice, which would happen when processing a 'channel.close_ok' frame. No harm in it, really, but unnecessary.
-rw-r--r--src/rabbit_reader.erl45
1 files changed, 21 insertions, 24 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 1602cc2b..908a279c 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -499,23 +499,21 @@ handle_frame(Type, Channel, Payload,
case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of
error -> throw({unknown_frame, Channel, Type, Payload});
heartbeat -> throw({unexpected_heartbeat_frame, Channel});
- AnalyzedFrame ->
- case get({channel, Channel}) of
- {ChPid, AState} ->
- NewAState = process_channel_frame(
- AnalyzedFrame, Channel, ChPid, AState),
- put({channel, Channel}, {ChPid, NewAState}),
- post_process_frame(AnalyzedFrame, ChPid,
- control_throttle(State));
- undefined ->
- case ?IS_RUNNING(State) of
- true -> send_to_new_channel(
- Channel, AnalyzedFrame, State);
- false -> throw({channel_frame_while_starting,
- Channel, State#v1.connection_state,
- AnalyzedFrame})
- end
- end
+ AnalyzedFrame -> process_frame(AnalyzedFrame, Channel, State)
+ end.
+
+process_frame(Frame, Channel, State) ->
+ case get({channel, Channel}) of
+ {ChPid, AState} ->
+ NewAState = process_channel_frame(Frame, Channel, ChPid, AState),
+ put({channel, Channel}, {ChPid, NewAState}),
+ post_process_frame(Frame, ChPid, State);
+ undefined when ?IS_RUNNING(State) ->
+ ok = create_channel(Channel, State),
+ process_frame(Frame, Channel, State);
+ undefined ->
+ throw({channel_frame_while_starting,
+ Channel, State#v1.connection_state, Frame})
end.
post_process_frame({method, 'channel.close_ok', _}, ChPid, State) ->
@@ -526,11 +524,11 @@ post_process_frame({method, MethodName, _}, _ChPid,
protocol = Protocol}}) ->
case Protocol:method_has_content(MethodName) of
true -> erlang:bump_reductions(2000),
- maybe_block(State);
- false -> State
+ maybe_block(control_throttle(State));
+ false -> control_throttle(State)
end;
post_process_frame(_Frame, _ChPid, State) ->
- State.
+ control_throttle(State).
handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) ->
ensure_stats_timer(
@@ -895,7 +893,7 @@ cert_info(F, Sock) ->
%%--------------------------------------------------------------------------
-send_to_new_channel(Channel, AnalyzedFrame, State) ->
+create_channel(Channel, State) ->
#v1{sock = Sock, queue_collector = Collector,
channel_sup_sup_pid = ChanSupSup,
connection = #connection{protocol = Protocol,
@@ -908,10 +906,9 @@ send_to_new_channel(Channel, AnalyzedFrame, State) ->
ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Protocol, User,
VHost, Capabilities, Collector}),
MRef = erlang:monitor(process, ChPid),
- NewAState = process_channel_frame(AnalyzedFrame, Channel, ChPid, AState),
- put({channel, Channel}, {ChPid, NewAState}),
put({ch_pid, ChPid}, {Channel, MRef}),
- State.
+ put({channel, Channel}, {ChPid, AState}),
+ ok.
process_channel_frame(Frame, Channel, ChPid, AState) ->
case rabbit_command_assembler:process(Frame, AState) of