summaryrefslogtreecommitdiff
path: root/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_writer.erl
diff options
context:
space:
mode:
Diffstat (limited to 'deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_writer.erl')
-rw-r--r--deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_writer.erl292
1 files changed, 292 insertions, 0 deletions
diff --git a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_writer.erl b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_writer.erl
new file mode 100644
index 0000000000..2921f929db
--- /dev/null
+++ b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_writer.erl
@@ -0,0 +1,292 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_amqp1_0_writer).
+-include_lib("rabbit_common/include/rabbit.hrl").
+-include_lib("rabbit_common/include/rabbit_framing.hrl").
+-include("rabbit_amqp1_0.hrl").
+
+-export([start/5, start_link/5, start/6, start_link/6]).
+-export([send_command/2, send_command/3,
+ send_command_sync/2, send_command_sync/3,
+ send_command_and_notify/4, send_command_and_notify/5]).
+-export([internal_send_command/4, internal_send_command/6]).
+
+%% internal
+-export([mainloop/1, mainloop1/1]).
+
+-record(wstate, {sock, channel, frame_max, protocol, reader,
+ stats_timer, pending}).
+
+-define(HIBERNATE_AFTER, 5000).
+-define(AMQP_SASL_FRAME_TYPE, 1).
+
+%%---------------------------------------------------------------------------
+
+-spec start
+ (rabbit_net:socket(), rabbit_channel:channel_number(),
+ non_neg_integer(), rabbit_types:protocol(), pid())
+ -> rabbit_types:ok(pid()).
+-spec start_link
+ (rabbit_net:socket(), rabbit_channel:channel_number(),
+ non_neg_integer(), rabbit_types:protocol(), pid())
+ -> rabbit_types:ok(pid()).
+-spec start
+ (rabbit_net:socket(), rabbit_channel:channel_number(),
+ non_neg_integer(), rabbit_types:protocol(), pid(), boolean())
+ -> rabbit_types:ok(pid()).
+-spec start_link
+ (rabbit_net:socket(), rabbit_channel:channel_number(),
+ non_neg_integer(), rabbit_types:protocol(), pid(), boolean())
+ -> rabbit_types:ok(pid()).
+-spec send_command
+ (pid(), rabbit_framing:amqp_method_record()) -> 'ok'.
+-spec send_command
+ (pid(), rabbit_framing:amqp_method_record(), rabbit_types:content())
+ -> 'ok'.
+-spec send_command_sync
+ (pid(), rabbit_framing:amqp_method_record()) -> 'ok'.
+-spec send_command_sync
+ (pid(), rabbit_framing:amqp_method_record(), rabbit_types:content())
+ -> 'ok'.
+-spec send_command_and_notify
+ (pid(), pid(), pid(), rabbit_framing:amqp_method_record())
+ -> 'ok'.
+-spec send_command_and_notify
+ (pid(), pid(), pid(), rabbit_framing:amqp_method_record(),
+ rabbit_types:content())
+ -> 'ok'.
+-spec internal_send_command
+ (rabbit_net:socket(), rabbit_channel:channel_number(),
+ rabbit_framing:amqp_method_record(), rabbit_types:protocol())
+ -> 'ok'.
+-spec internal_send_command
+ (rabbit_net:socket(), rabbit_channel:channel_number(),
+ rabbit_framing:amqp_method_record(), rabbit_types:content(),
+ non_neg_integer(), rabbit_types:protocol())
+ -> 'ok'.
+
+%%---------------------------------------------------------------------------
+
+start(Sock, Channel, FrameMax, Protocol, ReaderPid) ->
+ start(Sock, Channel, FrameMax, Protocol, ReaderPid, false).
+
+start_link(Sock, Channel, FrameMax, Protocol, ReaderPid) ->
+ start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, false).
+
+start(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) ->
+ State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid,
+ ReaderWantsStats),
+ {ok, proc_lib:spawn(?MODULE, mainloop, [State])}.
+
+start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) ->
+ State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid,
+ ReaderWantsStats),
+ {ok, proc_lib:spawn_link(?MODULE, mainloop, [State])}.
+
+initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) ->
+ (case ReaderWantsStats of
+ true -> fun rabbit_event:init_stats_timer/2;
+ false -> fun rabbit_event:init_disabled_stats_timer/2
+ end)(#wstate{sock = Sock,
+ channel = Channel,
+ frame_max = FrameMax,
+ protocol = Protocol,
+ reader = ReaderPid,
+ pending = []},
+ #wstate.stats_timer).
+
+mainloop(State) ->
+ try
+ mainloop1(State)
+ catch
+ exit:Error -> #wstate{reader = ReaderPid, channel = Channel} = State,
+ ReaderPid ! {channel_exit, Channel, Error}
+ end,
+ done.
+
+mainloop1(State = #wstate{pending = []}) ->
+ receive
+ Message -> ?MODULE:mainloop1(handle_message(Message, State))
+ after ?HIBERNATE_AFTER ->
+ erlang:hibernate(?MODULE, mainloop, [State])
+ end;
+mainloop1(State) ->
+ receive
+ Message -> ?MODULE:mainloop1(handle_message(Message, State))
+ after 0 ->
+ ?MODULE:mainloop1(flush(State))
+ end.
+
+handle_message({send_command, MethodRecord}, State) ->
+ internal_send_command_async(MethodRecord, State);
+handle_message({send_command, MethodRecord, Content}, State) ->
+ internal_send_command_async(MethodRecord, Content, State);
+handle_message({'$gen_call', From, {send_command_sync, MethodRecord}}, State) ->
+ State1 = flush(internal_send_command_async(MethodRecord, State)),
+ gen_server:reply(From, ok),
+ State1;
+handle_message({'$gen_call', From, {send_command_sync, MethodRecord, Content}},
+ State) ->
+ State1 = flush(internal_send_command_async(MethodRecord, Content, State)),
+ gen_server:reply(From, ok),
+ State1;
+handle_message({send_command_and_notify, QPid, ChPid, MethodRecord}, State) ->
+ State1 = internal_send_command_async(MethodRecord, State),
+ rabbit_amqqueue:notify_sent(QPid, ChPid),
+ State1;
+handle_message({send_command_and_notify, QPid, ChPid, MethodRecord, Content},
+ State) ->
+ State1 = internal_send_command_async(MethodRecord, Content, State),
+ rabbit_amqqueue:notify_sent(QPid, ChPid),
+ State1;
+handle_message({'DOWN', _MRef, process, QPid, _Reason}, State) ->
+ rabbit_amqqueue:notify_sent_queue_down(QPid),
+ State;
+handle_message({inet_reply, _, ok}, State) ->
+ rabbit_event:ensure_stats_timer(State, #wstate.stats_timer, emit_stats);
+handle_message({inet_reply, _, Status}, _State) ->
+ exit({writer, send_failed, Status});
+handle_message(emit_stats, State = #wstate{reader = ReaderPid}) ->
+ ReaderPid ! ensure_stats,
+ rabbit_event:reset_stats_timer(State, #wstate.stats_timer);
+handle_message(Message, _State) ->
+ exit({writer, message_not_understood, Message}).
+
+%%---------------------------------------------------------------------------
+
+send_command(W, MethodRecord) ->
+ W ! {send_command, MethodRecord},
+ ok.
+
+send_command(W, MethodRecord, Content) ->
+ W ! {send_command, MethodRecord, Content},
+ ok.
+
+send_command_sync(W, MethodRecord) ->
+ call(W, {send_command_sync, MethodRecord}).
+
+send_command_sync(W, MethodRecord, Content) ->
+ call(W, {send_command_sync, MethodRecord, Content}).
+
+send_command_and_notify(W, Q, ChPid, MethodRecord) ->
+ W ! {send_command_and_notify, Q, ChPid, MethodRecord},
+ ok.
+
+send_command_and_notify(W, Q, ChPid, MethodRecord, Content) ->
+ W ! {send_command_and_notify, Q, ChPid, MethodRecord, Content},
+ ok.
+
+%%---------------------------------------------------------------------------
+
+call(Pid, Msg) ->
+ {ok, Res} = gen:call(Pid, '$gen_call', Msg, infinity),
+ Res.
+
+%%---------------------------------------------------------------------------
+
+%% Begin 1-0
+
+assemble_frame(Channel, Performative, amqp10_framing) ->
+ ?DEBUG("Channel ~p <-~n~p~n~n",
+ [Channel, amqp10_framing:pprint(Performative)]),
+ PerfBin = amqp10_framing:encode_bin(Performative),
+ amqp10_binary_generator:build_frame(Channel, PerfBin);
+
+assemble_frame(Channel, Performative, rabbit_amqp1_0_sasl) ->
+ ?DEBUG("Channel ~p <-~n~p~n~n",
+ [Channel, amqp10_framing:pprint(Performative)]),
+ PerfBin = amqp10_framing:encode_bin(Performative),
+ amqp10_binary_generator:build_frame(Channel,
+ ?AMQP_SASL_FRAME_TYPE, PerfBin).
+
+%% Note: a transfer record can be followed by a number of other
+%% records to make a complete frame but unlike 0-9-1 we may have many
+%% content records. However, that's already been handled for us, we're
+%% just sending a chunk, so from this perspective it's just a binary.
+
+assemble_frames(Channel, Performative, Content, _FrameMax,
+ amqp10_framing) ->
+ ?DEBUG("Channel ~p <-~n~p~n followed by ~p bytes of content~n~n",
+ [Channel, amqp10_framing:pprint(Performative),
+ iolist_size(Content)]),
+ PerfBin = amqp10_framing:encode_bin(Performative),
+ amqp10_binary_generator:build_frame(Channel, [PerfBin, Content]).
+
+%% End 1-0
+
+tcp_send(Sock, Data) ->
+ rabbit_misc:throw_on_error(inet_error,
+ fun () -> rabbit_net:send(Sock, Data) end).
+
+internal_send_command(Sock, Channel, MethodRecord, Protocol) ->
+ ok = tcp_send(Sock, assemble_frame(Channel, MethodRecord, Protocol)).
+
+internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax,
+ Protocol) ->
+ ok = lists:foldl(fun (Frame, ok) -> tcp_send(Sock, Frame);
+ (_Frame, Other) -> Other
+ end, ok, assemble_frames(Channel, MethodRecord,
+ Content, FrameMax, Protocol)).
+
+internal_send_command_async(MethodRecord,
+ State = #wstate{channel = Channel,
+ protocol = Protocol,
+ pending = Pending}) ->
+ Frame = assemble_frame(Channel, MethodRecord, Protocol),
+ maybe_flush(State#wstate{pending = [Frame | Pending]}).
+
+internal_send_command_async(MethodRecord, Content,
+ State = #wstate{channel = Channel,
+ frame_max = FrameMax,
+ protocol = Protocol,
+ pending = Pending}) ->
+ Frames = assemble_frames(Channel, MethodRecord, Content, FrameMax,
+ Protocol),
+ maybe_flush(State#wstate{pending = [Frames | Pending]}).
+
+%% This magic number is the tcp-over-ethernet MSS (1460) minus the
+%% minimum size of a AMQP basic.deliver method frame (24) plus basic
+%% content header (22). The idea is that we want to flush just before
+%% exceeding the MSS.
+-define(FLUSH_THRESHOLD, 1414).
+
+maybe_flush(State = #wstate{pending = Pending}) ->
+ case iolist_size(Pending) >= ?FLUSH_THRESHOLD of
+ true -> flush(State);
+ false -> State
+ end.
+
+flush(State = #wstate{pending = []}) ->
+ State;
+flush(State = #wstate{sock = Sock, pending = Pending}) ->
+ ok = port_cmd(Sock, lists:reverse(Pending)),
+ State#wstate{pending = []}.
+
+%% gen_tcp:send/2 does a selective receive of {inet_reply, Sock,
+%% Status} to obtain the result. That is bad when it is called from
+%% the writer since it requires scanning of the writers possibly quite
+%% large message queue.
+%%
+%% So instead we lift the code from prim_inet:send/2, which is what
+%% gen_tcp:send/2 calls, do the first half here and then just process
+%% the result code in handle_message/2 as and when it arrives.
+%%
+%% This means we may end up happily sending data down a closed/broken
+%% socket, but that's ok since a) data in the buffers will be lost in
+%% any case (so qualitatively we are no worse off than if we used
+%% gen_tcp:send/2), and b) we do detect the changed socket status
+%% eventually, i.e. when we get round to handling the result code.
+%%
+%% Also note that the port has bounded buffers and port_command blocks
+%% when these are full. So the fact that we process the result
+%% asynchronously does not impact flow control.
+port_cmd(Sock, Data) ->
+ true = try rabbit_net:port_command(Sock, Data)
+ catch error:Error -> exit({writer, send_failed, Error})
+ end,
+ ok.