diff options
-rw-r--r-- | src/rabbit_channel_sup.erl | 2 | ||||
-rw-r--r-- | src/rabbit_command_assembler.erl (renamed from src/rabbit_framing_channel.erl) | 20 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 26 |
3 files changed, 24 insertions, 24 deletions
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index 83e91f6c..9882156d 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -72,7 +72,7 @@ start_link({Protocol, Sock, Channel, FrameMax, ReaderPid, Username, VHost, [Channel, ReaderPid, WriterPid, Username, VHost, Collector, start_limiter_fun(SupPid)]}, intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), - {ok, FramingState} = rabbit_framing_channel:init(Protocol), + {ok, FramingState} = rabbit_command_assembler:init(Protocol), {ok, SupPid, {ChannelPid, FramingState}}. %%---------------------------------------------------------------------------- diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_command_assembler.erl index 57089fc2..b3fb6561 100644 --- a/src/rabbit_framing_channel.erl +++ b/src/rabbit_command_assembler.erl @@ -29,42 +29,42 @@ %% Contributor(s): ______________________________________. %% --module(rabbit_framing_channel). +-module(rabbit_command_assembler). -include("rabbit.hrl"). --export([init/1, collect/2]). +-export([init/1, process/2]). %%-------------------------------------------------------------------- init(Protocol) -> {ok, {method, Protocol}}. -collect({method, MethodName, FieldsBin}, {method, Protocol}) -> +process({method, MethodName, FieldsBin}, {method, Protocol}) -> Method = Protocol:decode_method_fields(MethodName, FieldsBin), case Protocol:method_has_content(MethodName) of true -> {ClassId, _MethodId} = Protocol:method_id(MethodName), {ok, {content_header, Method, ClassId, Protocol}}; false -> {ok, Method, {method, Protocol}} end; -collect(_Frame, {method, _Protocol}) -> +process(_Frame, {method, _Protocol}) -> unexpected_frame("expected method frame, " "got non method frame instead", [], none); -collect({content_header, ClassId, 0, 0, PropertiesBin}, +process({content_header, ClassId, 0, 0, PropertiesBin}, {content_header, Method, ClassId, Protocol}) -> Content = empty_content(ClassId, PropertiesBin, Protocol), {ok, Method, Content, {method, Protocol}}; -collect({content_header, ClassId, 0, BodySize, PropertiesBin}, +process({content_header, ClassId, 0, BodySize, PropertiesBin}, {content_header, Method, ClassId, Protocol}) -> Content = empty_content(ClassId, PropertiesBin, Protocol), {ok, {content_body, Method, BodySize, Content, Protocol}}; -collect({content_header, HeaderClassId, 0, _BodySize, _PropertiesBin}, +process({content_header, HeaderClassId, 0, _BodySize, _PropertiesBin}, {content_header, Method, ClassId, _Protocol}) -> unexpected_frame("expected content header for class ~w, " "got one for class ~w instead", [ClassId, HeaderClassId], Method); -collect(_Frame, {content_header, Method, ClassId, _Protocol}) -> +process(_Frame, {content_header, Method, ClassId, _Protocol}) -> unexpected_frame("expected content header for class ~w, " "got non content header frame instead", [ClassId], Method); -collect({content_body, FragmentBin}, +process({content_body, FragmentBin}, {content_body, Method, RemainingSize, Content = #content{payload_fragments_rev = Fragments}, Protocol}) -> NewContent = Content#content{ @@ -73,7 +73,7 @@ collect({content_body, FragmentBin}, 0 -> {ok, Method, NewContent, {method, Protocol}}; Sz -> {ok, {content_body, Method, Sz, NewContent, Protocol}} end; -collect(_Frame, {content_body, Method, _RemainingSize, _Content, _Protocol}) -> +process(_Frame, {content_body, Method, _RemainingSize, _Content, _Protocol}) -> unexpected_frame("expected content body, " "got non content body frame instead", [], Method). diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 9b304018..91dd42dd 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -960,29 +960,29 @@ send_to_new_channel(Channel, AnalyzedFrame, State) -> frame_max = FrameMax, user = #user{username = Username}, vhost = VHost}} = State, - {ok, _ChSupPid, {ChPid, ChFrSt}} = + {ok, _ChSupPid, {ChPid, AState}} = rabbit_channel_sup_sup:start_channel( ChanSupSup, {Protocol, Sock, Channel, FrameMax, self(), Username, VHost, Collector}), erlang:monitor(process, ChPid), - put({channel, Channel}, {ChPid, ChFrSt}), + put({channel, Channel}, {ChPid, AState}), put({ch_pid, ChPid}, Channel), - process_channel_frame(AnalyzedFrame, Channel, ChPid, ChFrSt, State). + process_channel_frame(AnalyzedFrame, Channel, ChPid, AState, State). -process_channel_frame(Frame, Channel, ChPid, ChFrSt, State) -> - UpdateFramingState = fun (NewChFrSt) -> - put({channel, Channel}, {ChPid, NewChFrSt}), +process_channel_frame(Frame, Channel, ChPid, AState, State) -> + UpdateFramingState = fun (NewAState) -> + put({channel, Channel}, {ChPid, NewAState}), State end, - case rabbit_framing_channel:collect(Frame, ChFrSt) of - {ok, NewChFrSt} -> - UpdateFramingState(NewChFrSt); - {ok, Method, NewChFrSt} -> + case rabbit_command_assembler:process(Frame, AState) of + {ok, NewAState} -> + UpdateFramingState(NewAState); + {ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method), - UpdateFramingState(NewChFrSt); - {ok, Method, Content, NewChFrSt} -> + UpdateFramingState(NewAState); + {ok, Method, Content, NewAState} -> rabbit_channel:do(ChPid, Method, Content), - UpdateFramingState(NewChFrSt); + UpdateFramingState(NewAState); {error, Reason} -> handle_exception(State, Channel, Reason) end. |