diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2012-02-09 12:03:17 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-02-09 12:03:17 +0000 |
commit | e19af4d7f8036f4317b956221010491899d7302d (patch) | |
tree | cafbf473d5b42291dc0a2ef4a2784e6fae8ada7d | |
parent | 22871e540c28aab4cb2cab1184f6cdb83dfe100f (diff) | |
download | rabbitmq-server-e19af4d7f8036f4317b956221010491899d7302d.tar.gz |
refactor channel frame handling
...so there is just one process_channel_frame call site.
Also, ensure control_throttle isn't called twice, which would happen
when processing a 'channel.close_ok' frame. No harm in it, really, but
unnecessary.
-rw-r--r-- | src/rabbit_reader.erl | 45 |
1 files changed, 21 insertions, 24 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 1602cc2b..908a279c 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -499,23 +499,21 @@ handle_frame(Type, Channel, Payload, case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of error -> throw({unknown_frame, Channel, Type, Payload}); heartbeat -> throw({unexpected_heartbeat_frame, Channel}); - AnalyzedFrame -> - case get({channel, Channel}) of - {ChPid, AState} -> - NewAState = process_channel_frame( - AnalyzedFrame, Channel, ChPid, AState), - put({channel, Channel}, {ChPid, NewAState}), - post_process_frame(AnalyzedFrame, ChPid, - control_throttle(State)); - undefined -> - case ?IS_RUNNING(State) of - true -> send_to_new_channel( - Channel, AnalyzedFrame, State); - false -> throw({channel_frame_while_starting, - Channel, State#v1.connection_state, - AnalyzedFrame}) - end - end + AnalyzedFrame -> process_frame(AnalyzedFrame, Channel, State) + end. + +process_frame(Frame, Channel, State) -> + case get({channel, Channel}) of + {ChPid, AState} -> + NewAState = process_channel_frame(Frame, Channel, ChPid, AState), + put({channel, Channel}, {ChPid, NewAState}), + post_process_frame(Frame, ChPid, State); + undefined when ?IS_RUNNING(State) -> + ok = create_channel(Channel, State), + process_frame(Frame, Channel, State); + undefined -> + throw({channel_frame_while_starting, + Channel, State#v1.connection_state, Frame}) end. post_process_frame({method, 'channel.close_ok', _}, ChPid, State) -> @@ -526,11 +524,11 @@ post_process_frame({method, MethodName, _}, _ChPid, protocol = Protocol}}) -> case Protocol:method_has_content(MethodName) of true -> erlang:bump_reductions(2000), - maybe_block(State); - false -> State + maybe_block(control_throttle(State)); + false -> control_throttle(State) end; post_process_frame(_Frame, _ChPid, State) -> - State. + control_throttle(State). handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) -> ensure_stats_timer( @@ -895,7 +893,7 @@ cert_info(F, Sock) -> %%-------------------------------------------------------------------------- -send_to_new_channel(Channel, AnalyzedFrame, State) -> +create_channel(Channel, State) -> #v1{sock = Sock, queue_collector = Collector, channel_sup_sup_pid = ChanSupSup, connection = #connection{protocol = Protocol, @@ -908,10 +906,9 @@ send_to_new_channel(Channel, AnalyzedFrame, State) -> ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Protocol, User, VHost, Capabilities, Collector}), MRef = erlang:monitor(process, ChPid), - NewAState = process_channel_frame(AnalyzedFrame, Channel, ChPid, AState), - put({channel, Channel}, {ChPid, NewAState}), put({ch_pid, ChPid}, {Channel, MRef}), - State. + put({channel, Channel}, {ChPid, AState}), + ok. process_channel_frame(Frame, Channel, ChPid, AState) -> case rabbit_command_assembler:process(Frame, AState) of |