summaryrefslogtreecommitdiff
path: root/src/rabbit_command_assembler.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_command_assembler.erl')
-rw-r--r--src/rabbit_command_assembler.erl148
1 files changed, 148 insertions, 0 deletions
diff --git a/src/rabbit_command_assembler.erl b/src/rabbit_command_assembler.erl
new file mode 100644
index 00000000..f8d3260e
--- /dev/null
+++ b/src/rabbit_command_assembler.erl
@@ -0,0 +1,148 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_command_assembler).
+-include("rabbit_framing.hrl").
+-include("rabbit.hrl").
+
+-export([analyze_frame/3, init/1, process/2]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-type(frame_type() :: ?FRAME_METHOD | ?FRAME_HEADER | ?FRAME_BODY |
+ ?FRAME_OOB_METHOD | ?FRAME_OOB_HEADER | ?FRAME_OOB_BODY |
+ ?FRAME_TRACE | ?FRAME_HEARTBEAT).
+-type(protocol() :: rabbit_framing:protocol()).
+-type(method() :: rabbit_framing:amqp_method_record()).
+-type(class_id() :: rabbit_framing:amqp_class_id()).
+-type(weight() :: non_neg_integer()).
+-type(body_size() :: non_neg_integer()).
+-type(content() :: rabbit_types:undecoded_content()).
+
+-type(frame() ::
+ {'method', rabbit_framing:amqp_method_name(), binary()} |
+ {'content_header', class_id(), weight(), body_size(), binary()} |
+ {'content_body', binary()}).
+
+-type(state() ::
+ {'method', protocol()} |
+ {'content_header', method(), class_id(), protocol()} |
+ {'content_body', method(), body_size(), class_id(), protocol()}).
+
+-spec(analyze_frame/3 :: (frame_type(), binary(), protocol()) ->
+ frame() | 'heartbeat' | 'error').
+
+-spec(init/1 :: (protocol()) -> {ok, state()}).
+-spec(process/2 :: (frame(), state()) ->
+ {ok, state()} |
+ {ok, method(), state()} |
+ {ok, method(), content(), state()} |
+ {error, rabbit_types:amqp_error()}).
+
+-endif.
+
+%%--------------------------------------------------------------------
+
+analyze_frame(?FRAME_METHOD,
+ <<ClassId:16, MethodId:16, MethodFields/binary>>,
+ Protocol) ->
+ MethodName = Protocol:lookup_method_name({ClassId, MethodId}),
+ {method, MethodName, MethodFields};
+analyze_frame(?FRAME_HEADER,
+ <<ClassId:16, Weight:16, BodySize:64, Properties/binary>>,
+ _Protocol) ->
+ {content_header, ClassId, Weight, BodySize, Properties};
+analyze_frame(?FRAME_BODY, Body, _Protocol) ->
+ {content_body, Body};
+analyze_frame(?FRAME_HEARTBEAT, <<>>, _Protocol) ->
+ heartbeat;
+analyze_frame(_Type, _Body, _Protocol) ->
+ error.
+
+init(Protocol) -> {ok, {method, Protocol}}.
+
+process({method, MethodName, FieldsBin}, {method, Protocol}) ->
+ try
+ Method = Protocol:decode_method_fields(MethodName, FieldsBin),
+ case Protocol:method_has_content(MethodName) of
+ true -> {ClassId, _MethodId} = Protocol:method_id(MethodName),
+ {ok, {content_header, Method, ClassId, Protocol}};
+ false -> {ok, Method, {method, Protocol}}
+ end
+ catch exit:#amqp_error{} = Reason -> {error, Reason}
+ end;
+process(_Frame, {method, _Protocol}) ->
+ unexpected_frame("expected method frame, "
+ "got non method frame instead", [], none);
+process({content_header, ClassId, 0, 0, PropertiesBin},
+ {content_header, Method, ClassId, Protocol}) ->
+ Content = empty_content(ClassId, PropertiesBin, Protocol),
+ {ok, Method, Content, {method, Protocol}};
+process({content_header, ClassId, 0, BodySize, PropertiesBin},
+ {content_header, Method, ClassId, Protocol}) ->
+ Content = empty_content(ClassId, PropertiesBin, Protocol),
+ {ok, {content_body, Method, BodySize, Content, Protocol}};
+process({content_header, HeaderClassId, 0, _BodySize, _PropertiesBin},
+ {content_header, Method, ClassId, _Protocol}) ->
+ unexpected_frame("expected content header for class ~w, "
+ "got one for class ~w instead",
+ [ClassId, HeaderClassId], Method);
+process(_Frame, {content_header, Method, ClassId, _Protocol}) ->
+ unexpected_frame("expected content header for class ~w, "
+ "got non content header frame instead", [ClassId], Method);
+process({content_body, FragmentBin},
+ {content_body, Method, RemainingSize,
+ Content = #content{payload_fragments_rev = Fragments}, Protocol}) ->
+ NewContent = Content#content{
+ payload_fragments_rev = [FragmentBin | Fragments]},
+ case RemainingSize - size(FragmentBin) of
+ 0 -> {ok, Method, NewContent, {method, Protocol}};
+ Sz -> {ok, {content_body, Method, Sz, NewContent, Protocol}}
+ end;
+process(_Frame, {content_body, Method, _RemainingSize, _Content, _Protocol}) ->
+ unexpected_frame("expected content body, "
+ "got non content body frame instead", [], Method).
+
+%%--------------------------------------------------------------------
+
+empty_content(ClassId, PropertiesBin, Protocol) ->
+ #content{class_id = ClassId,
+ properties = none,
+ properties_bin = PropertiesBin,
+ protocol = Protocol,
+ payload_fragments_rev = []}.
+
+unexpected_frame(Format, Params, Method) when is_atom(Method) ->
+ {error, rabbit_misc:amqp_error(unexpected_frame, Format, Params, Method)};
+unexpected_frame(Format, Params, Method) ->
+ unexpected_frame(Format, Params, rabbit_misc:method_record_type(Method)).