diff options
Diffstat (limited to 'src/rabbit_command_assembler.erl')
-rw-r--r-- | src/rabbit_command_assembler.erl | 148 |
1 files changed, 148 insertions, 0 deletions
diff --git a/src/rabbit_command_assembler.erl b/src/rabbit_command_assembler.erl new file mode 100644 index 00000000..f8d3260e --- /dev/null +++ b/src/rabbit_command_assembler.erl @@ -0,0 +1,148 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_command_assembler). +-include("rabbit_framing.hrl"). +-include("rabbit.hrl"). + +-export([analyze_frame/3, init/1, process/2]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(frame_type() :: ?FRAME_METHOD | ?FRAME_HEADER | ?FRAME_BODY | + ?FRAME_OOB_METHOD | ?FRAME_OOB_HEADER | ?FRAME_OOB_BODY | + ?FRAME_TRACE | ?FRAME_HEARTBEAT). +-type(protocol() :: rabbit_framing:protocol()). +-type(method() :: rabbit_framing:amqp_method_record()). +-type(class_id() :: rabbit_framing:amqp_class_id()). +-type(weight() :: non_neg_integer()). +-type(body_size() :: non_neg_integer()). +-type(content() :: rabbit_types:undecoded_content()). + +-type(frame() :: + {'method', rabbit_framing:amqp_method_name(), binary()} | + {'content_header', class_id(), weight(), body_size(), binary()} | + {'content_body', binary()}). + +-type(state() :: + {'method', protocol()} | + {'content_header', method(), class_id(), protocol()} | + {'content_body', method(), body_size(), class_id(), protocol()}). + +-spec(analyze_frame/3 :: (frame_type(), binary(), protocol()) -> + frame() | 'heartbeat' | 'error'). + +-spec(init/1 :: (protocol()) -> {ok, state()}). +-spec(process/2 :: (frame(), state()) -> + {ok, state()} | + {ok, method(), state()} | + {ok, method(), content(), state()} | + {error, rabbit_types:amqp_error()}). + +-endif. + +%%-------------------------------------------------------------------- + +analyze_frame(?FRAME_METHOD, + <<ClassId:16, MethodId:16, MethodFields/binary>>, + Protocol) -> + MethodName = Protocol:lookup_method_name({ClassId, MethodId}), + {method, MethodName, MethodFields}; +analyze_frame(?FRAME_HEADER, + <<ClassId:16, Weight:16, BodySize:64, Properties/binary>>, + _Protocol) -> + {content_header, ClassId, Weight, BodySize, Properties}; +analyze_frame(?FRAME_BODY, Body, _Protocol) -> + {content_body, Body}; +analyze_frame(?FRAME_HEARTBEAT, <<>>, _Protocol) -> + heartbeat; +analyze_frame(_Type, _Body, _Protocol) -> + error. + +init(Protocol) -> {ok, {method, Protocol}}. + +process({method, MethodName, FieldsBin}, {method, Protocol}) -> + try + Method = Protocol:decode_method_fields(MethodName, FieldsBin), + case Protocol:method_has_content(MethodName) of + true -> {ClassId, _MethodId} = Protocol:method_id(MethodName), + {ok, {content_header, Method, ClassId, Protocol}}; + false -> {ok, Method, {method, Protocol}} + end + catch exit:#amqp_error{} = Reason -> {error, Reason} + end; +process(_Frame, {method, _Protocol}) -> + unexpected_frame("expected method frame, " + "got non method frame instead", [], none); +process({content_header, ClassId, 0, 0, PropertiesBin}, + {content_header, Method, ClassId, Protocol}) -> + Content = empty_content(ClassId, PropertiesBin, Protocol), + {ok, Method, Content, {method, Protocol}}; +process({content_header, ClassId, 0, BodySize, PropertiesBin}, + {content_header, Method, ClassId, Protocol}) -> + Content = empty_content(ClassId, PropertiesBin, Protocol), + {ok, {content_body, Method, BodySize, Content, Protocol}}; +process({content_header, HeaderClassId, 0, _BodySize, _PropertiesBin}, + {content_header, Method, ClassId, _Protocol}) -> + unexpected_frame("expected content header for class ~w, " + "got one for class ~w instead", + [ClassId, HeaderClassId], Method); +process(_Frame, {content_header, Method, ClassId, _Protocol}) -> + unexpected_frame("expected content header for class ~w, " + "got non content header frame instead", [ClassId], Method); +process({content_body, FragmentBin}, + {content_body, Method, RemainingSize, + Content = #content{payload_fragments_rev = Fragments}, Protocol}) -> + NewContent = Content#content{ + payload_fragments_rev = [FragmentBin | Fragments]}, + case RemainingSize - size(FragmentBin) of + 0 -> {ok, Method, NewContent, {method, Protocol}}; + Sz -> {ok, {content_body, Method, Sz, NewContent, Protocol}} + end; +process(_Frame, {content_body, Method, _RemainingSize, _Content, _Protocol}) -> + unexpected_frame("expected content body, " + "got non content body frame instead", [], Method). + +%%-------------------------------------------------------------------- + +empty_content(ClassId, PropertiesBin, Protocol) -> + #content{class_id = ClassId, + properties = none, + properties_bin = PropertiesBin, + protocol = Protocol, + payload_fragments_rev = []}. + +unexpected_frame(Format, Params, Method) when is_atom(Method) -> + {error, rabbit_misc:amqp_error(unexpected_frame, Format, Params, Method)}; +unexpected_frame(Format, Params, Method) -> + unexpected_frame(Format, Params, rabbit_misc:method_record_type(Method)). |