summaryrefslogtreecommitdiff
path: root/deps/amqp_client/src
diff options
context:
space:
mode:
authordcorbacho <dparracorbacho@piotal.io>2020-11-18 14:27:41 +0000
committerdcorbacho <dparracorbacho@piotal.io>2020-11-18 14:27:41 +0000
commitf23a51261d9502ec39df0f8db47ba6b22aa7659f (patch)
tree53dcdf46e7dc2c14e81ee960bce8793879b488d3 /deps/amqp_client/src
parentafa2c2bf6c7e0e9b63f4fb53dc931c70388e1c82 (diff)
parent9f6d64ec4a4b1eeac24d7846c5c64fd96798d892 (diff)
downloadrabbitmq-server-git-f23a51261d9502ec39df0f8db47ba6b22aa7659f.tar.gz
Merge remote-tracking branch 'origin/master' into stream-timestamp-offsetstream-timestamp-offset
Diffstat (limited to 'deps/amqp_client/src')
-rw-r--r--deps/amqp_client/src/amqp_auth_mechanisms.erl44
-rw-r--r--deps/amqp_client/src/amqp_channel.erl1010
-rw-r--r--deps/amqp_client/src/amqp_channel_sup.erl73
-rw-r--r--deps/amqp_client/src/amqp_channel_sup_sup.erl36
-rw-r--r--deps/amqp_client/src/amqp_channels_manager.erl249
-rw-r--r--deps/amqp_client/src/amqp_client.erl37
-rw-r--r--deps/amqp_client/src/amqp_connection.erl395
-rw-r--r--deps/amqp_client/src/amqp_connection_sup.erl41
-rw-r--r--deps/amqp_client/src/amqp_connection_type_sup.erl91
-rw-r--r--deps/amqp_client/src/amqp_direct_connection.erl232
-rw-r--r--deps/amqp_client/src/amqp_direct_consumer.erl103
-rw-r--r--deps/amqp_client/src/amqp_gen_connection.erl387
-rw-r--r--deps/amqp_client/src/amqp_gen_consumer.erl284
-rw-r--r--deps/amqp_client/src/amqp_main_reader.erl179
-rw-r--r--deps/amqp_client/src/amqp_network_connection.erl380
-rw-r--r--deps/amqp_client/src/amqp_rpc_client.erl176
-rw-r--r--deps/amqp_client/src/amqp_rpc_server.erl138
-rw-r--r--deps/amqp_client/src/amqp_selective_consumer.erl265
-rw-r--r--deps/amqp_client/src/amqp_ssl.erl113
-rw-r--r--deps/amqp_client/src/amqp_sup.erl38
-rw-r--r--deps/amqp_client/src/amqp_uri.erl273
-rw-r--r--deps/amqp_client/src/amqp_util.erl17
-rw-r--r--deps/amqp_client/src/overview.edoc.in27
-rw-r--r--deps/amqp_client/src/rabbit_routing_util.erl222
-rw-r--r--deps/amqp_client/src/uri_parser.erl125
25 files changed, 4935 insertions, 0 deletions
diff --git a/deps/amqp_client/src/amqp_auth_mechanisms.erl b/deps/amqp_client/src/amqp_auth_mechanisms.erl
new file mode 100644
index 0000000000..549cd17376
--- /dev/null
+++ b/deps/amqp_client/src/amqp_auth_mechanisms.erl
@@ -0,0 +1,44 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+%% @private
+-module(amqp_auth_mechanisms).
+
+-include("amqp_client.hrl").
+
+-export([plain/3, amqplain/3, external/3, crdemo/3]).
+
+%%---------------------------------------------------------------------------
+
+plain(none, _, init) ->
+ {<<"PLAIN">>, []};
+plain(none, #amqp_params_network{username = Username,
+ password = Password}, _State) ->
+ DecryptedPassword = credentials_obfuscation:decrypt(Password),
+ {<<0, Username/binary, 0, DecryptedPassword/binary>>, _State}.
+
+amqplain(none, _, init) ->
+ {<<"AMQPLAIN">>, []};
+amqplain(none, #amqp_params_network{username = Username,
+ password = Password}, _State) ->
+ LoginTable = [{<<"LOGIN">>, longstr, Username},
+ {<<"PASSWORD">>, longstr, credentials_obfuscation:decrypt(Password)}],
+ {rabbit_binary_generator:generate_table(LoginTable), _State}.
+
+external(none, _, init) ->
+ {<<"EXTERNAL">>, []};
+external(none, _, _State) ->
+ {<<"">>, _State}.
+
+crdemo(none, _, init) ->
+ {<<"RABBIT-CR-DEMO">>, 0};
+crdemo(none, #amqp_params_network{username = Username}, 0) ->
+ {Username, 1};
+crdemo(<<"Please tell me your password">>,
+ #amqp_params_network{password = Password}, 1) ->
+ DecryptedPassword = credentials_obfuscation:decrypt(Password),
+ {<<"My password is ", DecryptedPassword/binary>>, 2}.
diff --git a/deps/amqp_client/src/amqp_channel.erl b/deps/amqp_client/src/amqp_channel.erl
new file mode 100644
index 0000000000..9e95df4fe3
--- /dev/null
+++ b/deps/amqp_client/src/amqp_channel.erl
@@ -0,0 +1,1010 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+%% @type close_reason(Type) = {shutdown, amqp_reason(Type)}.
+%% @type amqp_reason(Type) = {Type, Code, Text}
+%% Code = non_neg_integer()
+%% Text = binary().
+%% @doc This module encapsulates the client's view of an AMQP
+%% channel. Each server side channel is represented by an amqp_channel
+%% process on the client side. Channel processes are created using the
+%% {@link amqp_connection} module. Channel processes are supervised
+%% under amqp_client's supervision tree.<br/>
+%% <br/>
+%% In case of a failure or an AMQP error, the channel process exits with a
+%% meaningful exit reason:<br/>
+%% <br/>
+%% <table>
+%% <tr>
+%% <td><strong>Cause</strong></td>
+%% <td><strong>Exit reason</strong></td>
+%% </tr>
+%% <tr>
+%% <td>Any reason, where Code would have been 200 otherwise</td>
+%% <td>```normal'''</td>
+%% </tr>
+%% <tr>
+%% <td>User application calls amqp_channel:close/3</td>
+%% <td>```close_reason(app_initiated_close)'''</td>
+%% </tr>
+%% <tr>
+%% <td>Server closes channel (soft error)</td>
+%% <td>```close_reason(server_initiated_close)'''</td>
+%% </tr>
+%% <tr>
+%% <td>Server misbehaved (did not follow protocol)</td>
+%% <td>```close_reason(server_misbehaved)'''</td>
+%% </tr>
+%% <tr>
+%% <td>Connection is closing (causing all channels to cleanup and
+%% close)</td>
+%% <td>```{shutdown, {connection_closing, amqp_reason(atom())}}'''</td>
+%% </tr>
+%% <tr>
+%% <td>Other error</td>
+%% <td>(various error reasons, causing more detailed logging)</td>
+%% </tr>
+%% </table>
+%% <br/>
+%% See type definitions below.
+-module(amqp_channel).
+
+-include("amqp_client_internal.hrl").
+
+-behaviour(gen_server).
+
+-export([call/2, call/3, cast/2, cast/3, cast_flow/3]).
+-export([close/1, close/3]).
+-export([register_return_handler/2, unregister_return_handler/1,
+ register_flow_handler/2, unregister_flow_handler/1,
+ register_confirm_handler/2, unregister_confirm_handler/1]).
+-export([call_consumer/2, subscribe/3]).
+-export([next_publish_seqno/1, wait_for_confirms/1, wait_for_confirms/2,
+ wait_for_confirms_or_die/1, wait_for_confirms_or_die/2]).
+-export([start_link/5, set_writer/2, connection_closing/3, open/1,
+ enable_delivery_flow_control/1, notify_received/1]).
+
+-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
+ handle_info/2]).
+
+-define(TIMEOUT_FLUSH, 60000).
+
+-record(state, {number,
+ connection,
+ consumer,
+ driver,
+ rpc_requests = queue:new(),
+ closing = false, %% false |
+ %% {just_channel, Reason} |
+ %% {connection, Reason}
+ writer,
+ return_handler = none,
+ confirm_handler = none,
+ next_pub_seqno = 0,
+ flow_active = true,
+ flow_handler = none,
+ unconfirmed_set = gb_sets:new(),
+ waiting_set = gb_trees:empty(),
+ only_acks_received = true,
+
+ %% true | false, only relevant in the direct
+ %% client case.
+ %% when true, consumers will manually notify
+ %% queue pids using rabbit_amqqueue_common:notify_sent/2
+ %% to prevent the queue from overwhelming slow
+ %% consumers that use automatic acknowledgement
+ %% mode.
+ delivery_flow_control = false
+ }).
+
+%%---------------------------------------------------------------------------
+%% Type Definitions
+%%---------------------------------------------------------------------------
+
+%% @type amqp_method().
+%% This abstract datatype represents the set of methods that comprise
+%% the AMQP execution model. As indicated in the overview, the
+%% attributes of each method in the execution model are described in
+%% the protocol documentation. The Erlang record definitions are
+%% autogenerated from a parseable version of the specification. Most
+%% fields in the generated records have sensible default values that
+%% you need not worry in the case of a simple usage of the client
+%% library.
+
+%% @type amqp_msg() = #amqp_msg{}.
+%% This is the content encapsulated in content-bearing AMQP methods. It
+%% contains the following fields:
+%% <ul>
+%% <li>props :: class_property() - A class property record, defaults to
+%% #'P_basic'{}</li>
+%% <li>payload :: binary() - The arbitrary data payload</li>
+%% </ul>
+
+%%---------------------------------------------------------------------------
+%% AMQP Channel API methods
+%%---------------------------------------------------------------------------
+
+%% @spec (Channel, Method) -> Result
+%% @doc This is equivalent to amqp_channel:call(Channel, Method, none).
+call(Channel, Method) ->
+ gen_server:call(Channel, {call, Method, none, self()}, amqp_util:call_timeout()).
+
+%% @spec (Channel, Method, Content) -> Result
+%% where
+%% Channel = pid()
+%% Method = amqp_method()
+%% Content = amqp_msg() | none
+%% Result = amqp_method() | ok | blocked | closing
+%% @doc This sends an AMQP method on the channel.
+%% For content bearing methods, Content has to be an amqp_msg(), whereas
+%% for non-content bearing methods, it needs to be the atom 'none'.<br/>
+%% In the case of synchronous methods, this function blocks until the
+%% corresponding reply comes back from the server and returns it.
+%% In the case of asynchronous methods, the function blocks until the method
+%% gets sent on the wire and returns the atom 'ok' on success.<br/>
+%% This will return the atom 'blocked' if the server has
+%% throttled the client for flow control reasons. This will return the
+%% atom 'closing' if the channel is in the process of shutting down.<br/>
+%% Note that for asynchronous methods, the synchronicity implied by
+%% 'call' only means that the client has transmitted the method to
+%% the broker. It does not necessarily imply that the broker has
+%% accepted responsibility for the message.
+call(Channel, Method, Content) ->
+ gen_server:call(Channel, {call, Method, Content, self()}, amqp_util:call_timeout()).
+
+%% @spec (Channel, Method) -> ok
+%% @doc This is equivalent to amqp_channel:cast(Channel, Method, none).
+cast(Channel, Method) ->
+ gen_server:cast(Channel, {cast, Method, none, self(), noflow}).
+
+%% @spec (Channel, Method, Content) -> ok
+%% where
+%% Channel = pid()
+%% Method = amqp_method()
+%% Content = amqp_msg() | none
+%% @doc This function is the same as {@link call/3}, except that it returns
+%% immediately with the atom 'ok', without blocking the caller process.
+%% This function is not recommended with synchronous methods, since there is no
+%% way to verify that the server has received the method.
+cast(Channel, Method, Content) ->
+ gen_server:cast(Channel, {cast, Method, Content, self(), noflow}).
+
+%% @spec (Channel, Method, Content) -> ok
+%% where
+%% Channel = pid()
+%% Method = amqp_method()
+%% Content = amqp_msg() | none
+%% @doc Like cast/3, with flow control.
+cast_flow(Channel, Method, Content) ->
+ credit_flow:send(Channel),
+ gen_server:cast(Channel, {cast, Method, Content, self(), flow}).
+
+%% @spec (Channel) -> ok | closing
+%% where
+%% Channel = pid()
+%% @doc Closes the channel, invokes
+%% close(Channel, 200, &lt;&lt;"Goodbye"&gt;&gt;).
+close(Channel) ->
+ close(Channel, 200, <<"Goodbye">>).
+
+%% @spec (Channel, Code, Text) -> ok | closing
+%% where
+%% Channel = pid()
+%% Code = integer()
+%% Text = binary()
+%% @doc Closes the channel, allowing the caller to supply a reply code and
+%% text. If the channel is already closing, the atom 'closing' is returned.
+close(Channel, Code, Text) ->
+ gen_server:call(Channel, {close, Code, Text}, amqp_util:call_timeout()).
+
+%% @spec (Channel) -> integer()
+%% where
+%% Channel = pid()
+%% @doc When in confirm mode, returns the sequence number of the next
+%% message to be published.
+next_publish_seqno(Channel) ->
+ gen_server:call(Channel, next_publish_seqno, amqp_util:call_timeout()).
+
+%% @spec (Channel) -> boolean() | 'timeout'
+%% where
+%% Channel = pid()
+%% @doc Wait until all messages published since the last call have
+%% been either ack'd or nack'd by the broker. Note, when called on a
+%% non-Confirm channel, waitForConfirms returns an error.
+%% @param Channel: the channel on which to wait.
+%% @end
+wait_for_confirms(Channel) ->
+ wait_for_confirms(Channel, ?WAIT_FOR_CONFIRMS_TIMEOUT).
+
+%% @spec (Channel, Timeout) -> boolean() | 'timeout'
+%% where
+%% Channel = pid()
+%% Timeout = non_neg_integer() | {non_neg_integer(), second | millisecond} | 'infinity'
+%% @doc Wait until all messages published since the last call have
+%% been either ack'd or nack'd by the broker or the timeout expires.
+%% Note, when called on a non-Confirm channel, waitForConfirms throws
+%% an exception.
+%% @param Channel: the channel on which to wait.
+%% @param Timeout: the wait timeout in seconds.
+%% @end
+wait_for_confirms(Channel, {Timeout, second}) ->
+ do_wait_for_confirms(Channel, second_to_millisecond(Timeout));
+wait_for_confirms(Channel, {Timeout, millisecond}) ->
+ do_wait_for_confirms(Channel, Timeout);
+wait_for_confirms(Channel, Timeout) ->
+ do_wait_for_confirms(Channel, second_to_millisecond(Timeout)).
+
+%% @spec (Channel) -> true
+%% where
+%% Channel = pid()
+%% @doc Behaves the same as wait_for_confirms/1, but if a nack is
+%% received, the calling process is immediately sent an
+%% exit(nack_received).
+%% @param Channel: the channel on which to wait.
+%% @end
+wait_for_confirms_or_die(Channel) ->
+ wait_for_confirms_or_die(Channel, ?WAIT_FOR_CONFIRMS_TIMEOUT).
+
+%% @spec (Channel, Timeout) -> true
+%% where
+%% Channel = pid()
+%% Timeout = non_neg_integer() | {non_neg_integer(), second | millisecond} | 'infinity'
+%% @doc Behaves the same as wait_for_confirms/1, but if a nack is
+%% received, the calling process is immediately sent an
+%% exit(nack_received). If the timeout expires, the calling process is
+%% sent an exit(timeout).
+%% @param Channel: the channel on which to wait.
+%% @param Timeout: the wait timeout in seconds.
+%% @end
+wait_for_confirms_or_die(Channel, Timeout) ->
+ case wait_for_confirms(Channel, Timeout) of
+ timeout -> close(Channel, 200, <<"Confirm Timeout">>),
+ exit(timeout);
+ false -> close(Channel, 200, <<"Nacks Received">>),
+ exit(nacks_received);
+ true -> true
+ end.
+
+%% @spec (Channel, ReturnHandler) -> ok
+%% where
+%% Channel = pid()
+%% ReturnHandler = pid()
+%% @doc This registers a handler to deal with returned messages. The
+%% registered process will receive #basic.return{} records.
+register_return_handler(Channel, ReturnHandler) ->
+ gen_server:cast(Channel, {register_return_handler, ReturnHandler} ).
+
+%% @spec (Channel) -> ok
+%% where
+%% Channel = pid()
+%% @doc Removes the return handler, if it exists. Does nothing if there is no
+%% such handler.
+unregister_return_handler(Channel) ->
+ gen_server:cast(Channel, unregister_return_handler).
+
+%% @spec (Channel, ConfirmHandler) -> ok
+%% where
+%% Channel = pid()
+%% ConfirmHandler = pid()
+
+%% @doc This registers a handler to deal with confirm-related
+%% messages. The registered process will receive #basic.ack{} and
+%% #basic.nack{} commands.
+register_confirm_handler(Channel, ConfirmHandler) ->
+ gen_server:cast(Channel, {register_confirm_handler, ConfirmHandler} ).
+
+%% @spec (Channel) -> ok
+%% where
+%% Channel = pid()
+%% @doc Removes the confirm handler, if it exists. Does nothing if there is no
+%% such handler.
+unregister_confirm_handler(Channel) ->
+ gen_server:cast(Channel, unregister_confirm_handler).
+
+%% @spec (Channel, FlowHandler) -> ok
+%% where
+%% Channel = pid()
+%% FlowHandler = pid()
+%% @doc This registers a handler to deal with channel flow notifications.
+%% The registered process will receive #channel.flow{} records.
+register_flow_handler(Channel, FlowHandler) ->
+ gen_server:cast(Channel, {register_flow_handler, FlowHandler} ).
+
+%% @spec (Channel) -> ok
+%% where
+%% Channel = pid()
+%% @doc Removes the flow handler, if it exists. Does nothing if there is no
+%% such handler.
+unregister_flow_handler(Channel) ->
+ gen_server:cast(Channel, unregister_flow_handler).
+
+%% @spec (Channel, Msg) -> ok
+%% where
+%% Channel = pid()
+%% Msg = any()
+%% @doc This causes the channel to invoke Consumer:handle_call/2,
+%% where Consumer is the amqp_gen_consumer implementation registered with
+%% the channel.
+call_consumer(Channel, Msg) ->
+ gen_server:call(Channel, {call_consumer, Msg}, amqp_util:call_timeout()).
+
+%% @spec (Channel, BasicConsume, Subscriber) -> ok
+%% where
+%% Channel = pid()
+%% BasicConsume = amqp_method()
+%% Subscriber = pid()
+%% @doc Subscribe the given pid to a queue using the specified
+%% basic.consume method.
+subscribe(Channel, BasicConsume = #'basic.consume'{}, Subscriber) ->
+ gen_server:call(Channel, {subscribe, BasicConsume, Subscriber}, amqp_util:call_timeout()).
+
+%%---------------------------------------------------------------------------
+%% Internal interface
+%%---------------------------------------------------------------------------
+
+%% @private
+start_link(Driver, Connection, ChannelNumber, Consumer, Identity) ->
+ gen_server:start_link(
+ ?MODULE, [Driver, Connection, ChannelNumber, Consumer, Identity], []).
+
+set_writer(Pid, Writer) ->
+ gen_server:cast(Pid, {set_writer, Writer}).
+
+enable_delivery_flow_control(Pid) ->
+ gen_server:cast(Pid, enable_delivery_flow_control).
+
+notify_received({Pid, QPid, ServerChPid}) ->
+ gen_server:cast(Pid, {send_notify, {QPid, ServerChPid}}).
+
+%% @private
+connection_closing(Pid, ChannelCloseType, Reason) ->
+ gen_server:cast(Pid, {connection_closing, ChannelCloseType, Reason}).
+
+%% @private
+open(Pid) ->
+ gen_server:call(Pid, open, amqp_util:call_timeout()).
+
+%%---------------------------------------------------------------------------
+%% gen_server callbacks
+%%---------------------------------------------------------------------------
+
+%% @private
+init([Driver, Connection, ChannelNumber, Consumer, Identity]) ->
+ ?store_proc_name(Identity),
+ {ok, #state{connection = Connection,
+ driver = Driver,
+ number = ChannelNumber,
+ consumer = Consumer}}.
+
+%% @private
+handle_call(open, From, State) ->
+ {noreply, rpc_top_half(#'channel.open'{}, none, From, none, noflow, State)};
+%% @private
+handle_call({close, Code, Text}, From, State) ->
+ handle_close(Code, Text, From, State);
+%% @private
+handle_call({call, Method, AmqpMsg, Sender}, From, State) ->
+ handle_method_to_server(Method, AmqpMsg, From, Sender, noflow, State);
+%% Handles the delivery of messages from a direct channel
+%% @private
+handle_call({send_command_sync, Method, Content}, From, State) ->
+ Ret = handle_method_from_server(Method, Content, State),
+ gen_server:reply(From, ok),
+ Ret;
+%% Handles the delivery of messages from a direct channel
+%% @private
+handle_call({send_command_sync, Method}, From, State) ->
+ Ret = handle_method_from_server(Method, none, State),
+ gen_server:reply(From, ok),
+ Ret;
+%% @private
+handle_call(next_publish_seqno, _From,
+ State = #state{next_pub_seqno = SeqNo}) ->
+ {reply, SeqNo, State};
+handle_call({wait_for_confirms, Timeout}, From, State) ->
+ handle_wait_for_confirms(From, Timeout, State);
+%% @private
+handle_call({call_consumer, Msg}, _From,
+ State = #state{consumer = Consumer}) ->
+ {reply, amqp_gen_consumer:call_consumer(Consumer, Msg), State};
+%% @private
+handle_call({subscribe, BasicConsume, Subscriber}, From, State) ->
+ handle_method_to_server(BasicConsume, none, From, Subscriber, noflow,
+ State).
+
+%% @private
+handle_cast({set_writer, Writer}, State = #state{driver = direct}) ->
+ link(Writer),
+ {noreply, State#state{writer = Writer}};
+handle_cast({set_writer, Writer}, State) ->
+ {noreply, State#state{writer = Writer}};
+%% @private
+handle_cast(enable_delivery_flow_control, State) ->
+ {noreply, State#state{delivery_flow_control = true}};
+%% @private
+handle_cast({send_notify, {QPid, ChPid}}, State) ->
+ rabbit_amqqueue_common:notify_sent(QPid, ChPid),
+ {noreply, State};
+%% @private
+handle_cast({cast, Method, AmqpMsg, Sender, noflow}, State) ->
+ handle_method_to_server(Method, AmqpMsg, none, Sender, noflow, State);
+handle_cast({cast, Method, AmqpMsg, Sender, flow}, State) ->
+ credit_flow:ack(Sender),
+ handle_method_to_server(Method, AmqpMsg, none, Sender, flow, State);
+%% @private
+handle_cast({register_return_handler, ReturnHandler}, State) ->
+ Ref = erlang:monitor(process, ReturnHandler),
+ {noreply, State#state{return_handler = {ReturnHandler, Ref}}};
+%% @private
+handle_cast(unregister_return_handler,
+ State = #state{return_handler = {_ReturnHandler, Ref}}) ->
+ erlang:demonitor(Ref),
+ {noreply, State#state{return_handler = none}};
+%% @private
+handle_cast({register_confirm_handler, ConfirmHandler}, State) ->
+ Ref = erlang:monitor(process, ConfirmHandler),
+ {noreply, State#state{confirm_handler = {ConfirmHandler, Ref}}};
+%% @private
+handle_cast(unregister_confirm_handler,
+ State = #state{confirm_handler = {_ConfirmHandler, Ref}}) ->
+ erlang:demonitor(Ref),
+ {noreply, State#state{confirm_handler = none}};
+%% @private
+handle_cast({register_flow_handler, FlowHandler}, State) ->
+ Ref = erlang:monitor(process, FlowHandler),
+ {noreply, State#state{flow_handler = {FlowHandler, Ref}}};
+%% @private
+handle_cast(unregister_flow_handler,
+ State = #state{flow_handler = {_FlowHandler, Ref}}) ->
+ erlang:demonitor(Ref),
+ {noreply, State#state{flow_handler = none}};
+%% Received from channels manager
+%% @private
+handle_cast({method, Method, Content, noflow}, State) ->
+ handle_method_from_server(Method, Content, State);
+%% Handles the situation when the connection closes without closing the channel
+%% beforehand. The channel must block all further RPCs,
+%% flush the RPC queue (optional), and terminate
+%% @private
+handle_cast({connection_closing, CloseType, Reason}, State) ->
+ handle_connection_closing(CloseType, Reason, State);
+%% @private
+handle_cast({shutdown, Shutdown}, State) ->
+ handle_shutdown(Shutdown, State).
+
+%% Received from rabbit_channel in the direct case
+%% @private
+handle_info({send_command, Method}, State) ->
+ handle_method_from_server(Method, none, State);
+%% Received from rabbit_channel in the direct case
+%% @private
+handle_info({send_command, Method, Content}, State) ->
+ handle_method_from_server(Method, Content, State);
+%% Received from rabbit_channel in the direct case
+%% @private
+handle_info({send_command_and_notify, QPid, ChPid,
+ Method = #'basic.deliver'{}, Content},
+ State = #state{delivery_flow_control = MFC}) ->
+ case MFC of
+ false -> handle_method_from_server(Method, Content, State),
+ rabbit_amqqueue_common:notify_sent(QPid, ChPid);
+ true -> handle_method_from_server(Method, Content,
+ {self(), QPid, ChPid}, State)
+ end,
+ {noreply, State};
+%% This comes from the writer or rabbit_channel
+%% @private
+handle_info({channel_exit, _ChNumber, Reason}, State) ->
+ handle_channel_exit(Reason, State);
+%% This comes from rabbit_channel in the direct case
+handle_info({channel_closing, ChPid}, State) ->
+ ok = rabbit_channel_common:ready_for_close(ChPid),
+ {noreply, State};
+%% @private
+handle_info({bump_credit, Msg}, State) ->
+ credit_flow:handle_bump_msg(Msg),
+ {noreply, State};
+%% @private
+handle_info(timed_out_flushing_channel, State) ->
+ ?LOG_WARN("Channel (~p) closing: timed out flushing while "
+ "connection closing~n", [self()]),
+ {stop, timed_out_flushing_channel, State};
+%% @private
+handle_info({'DOWN', _, process, ReturnHandler, shutdown},
+ State = #state{return_handler = {ReturnHandler, _Ref}}) ->
+ {noreply, State#state{return_handler = none}};
+handle_info({'DOWN', _, process, ReturnHandler, Reason},
+ State = #state{return_handler = {ReturnHandler, _Ref}}) ->
+ ?LOG_WARN("Channel (~p): Unregistering return handler ~p because it died. "
+ "Reason: ~p~n", [self(), ReturnHandler, Reason]),
+ {noreply, State#state{return_handler = none}};
+%% @private
+handle_info({'DOWN', _, process, ConfirmHandler, shutdown},
+ State = #state{confirm_handler = {ConfirmHandler, _Ref}}) ->
+ {noreply, State#state{confirm_handler = none}};
+handle_info({'DOWN', _, process, ConfirmHandler, Reason},
+ State = #state{confirm_handler = {ConfirmHandler, _Ref}}) ->
+ ?LOG_WARN("Channel (~p): Unregistering confirm handler ~p because it died. "
+ "Reason: ~p~n", [self(), ConfirmHandler, Reason]),
+ {noreply, State#state{confirm_handler = none}};
+%% @private
+handle_info({'DOWN', _, process, FlowHandler, shutdown},
+ State = #state{flow_handler = {FlowHandler, _Ref}}) ->
+ {noreply, State#state{flow_handler = none}};
+handle_info({'DOWN', _, process, FlowHandler, Reason},
+ State = #state{flow_handler = {FlowHandler, _Ref}}) ->
+ ?LOG_WARN("Channel (~p): Unregistering flow handler ~p because it died. "
+ "Reason: ~p~n", [self(), FlowHandler, Reason]),
+ {noreply, State#state{flow_handler = none}};
+handle_info({'DOWN', _, process, QPid, _Reason}, State) ->
+ rabbit_amqqueue_common:notify_sent_queue_down(QPid),
+ {noreply, State};
+handle_info({confirm_timeout, From}, State = #state{waiting_set = WSet}) ->
+ case gb_trees:lookup(From, WSet) of
+ none ->
+ {noreply, State};
+ {value, _} ->
+ gen_server:reply(From, timeout),
+ {noreply, State#state{waiting_set = gb_trees:delete(From, WSet)}}
+ end.
+
+%% @private
+terminate(_Reason, State) ->
+ flush_writer(State),
+ State.
+
+%% @private
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%---------------------------------------------------------------------------
+%% RPC mechanism
+%%---------------------------------------------------------------------------
+
+handle_method_to_server(Method, AmqpMsg, From, Sender, Flow,
+ State = #state{unconfirmed_set = USet}) ->
+ case {check_invalid_method(Method), From,
+ check_block(Method, AmqpMsg, State)} of
+ {ok, _, ok} ->
+ State1 = case {Method, State#state.next_pub_seqno} of
+ {#'confirm.select'{}, 0} ->
+ %% The confirm seqno is set to 1 on the
+ %% first confirm.select only.
+ State#state{next_pub_seqno = 1};
+ {#'basic.publish'{}, 0} ->
+ State;
+ {#'basic.publish'{}, SeqNo} ->
+ State#state{unconfirmed_set =
+ gb_sets:add(SeqNo, USet),
+ next_pub_seqno = SeqNo + 1};
+ _ ->
+ State
+ end,
+ {noreply, rpc_top_half(Method, build_content(AmqpMsg),
+ From, Sender, Flow, State1)};
+ {ok, none, BlockReply} ->
+ ?LOG_WARN("Channel (~p): discarding method ~p in cast.~n"
+ "Reason: ~p~n", [self(), Method, BlockReply]),
+ {noreply, State};
+ {ok, _, BlockReply} ->
+ {reply, BlockReply, State};
+ {{_, InvalidMethodMessage}, none, _} ->
+ ?LOG_WARN("Channel (~p): ignoring cast of ~p method. " ++
+ InvalidMethodMessage ++ "~n", [self(), Method]),
+ {noreply, State};
+ {{InvalidMethodReply, _}, _, _} ->
+ {reply, {error, InvalidMethodReply}, State}
+ end.
+
+handle_close(Code, Text, From, State) ->
+ Close = #'channel.close'{reply_code = Code,
+ reply_text = Text,
+ class_id = 0,
+ method_id = 0},
+ case check_block(Close, none, State) of
+ ok -> {noreply, rpc_top_half(Close, none, From, none, noflow,
+ State)};
+ BlockReply -> {reply, BlockReply, State}
+ end.
+
+rpc_top_half(Method, Content, From, Sender, Flow,
+ State0 = #state{rpc_requests = RequestQueue}) ->
+ State1 = State0#state{
+ rpc_requests = queue:in({From, Sender, Method, Content, Flow},
+ RequestQueue)},
+ IsFirstElement = queue:is_empty(RequestQueue),
+ if IsFirstElement -> do_rpc(State1);
+ true -> State1
+ end.
+
+rpc_bottom_half(Reply, State = #state{rpc_requests = RequestQueue}) ->
+ {{value, {From, _Sender, _Method, _Content, _Flow}}, RequestQueue1} =
+ queue:out(RequestQueue),
+ case From of
+ none -> ok;
+ _ -> gen_server:reply(From, Reply)
+ end,
+ do_rpc(State#state{rpc_requests = RequestQueue1}).
+
+do_rpc(State = #state{rpc_requests = Q,
+ closing = Closing}) ->
+ case queue:out(Q) of
+ {{value, {From, Sender, Method, Content, Flow}}, NewQ} ->
+ State1 = pre_do(Method, Content, Sender, State),
+ DoRet = do(Method, Content, Flow, State1),
+ case ?PROTOCOL:is_method_synchronous(Method) of
+ true -> State1;
+ false -> case {From, DoRet} of
+ {none, _} -> ok;
+ {_, ok} -> gen_server:reply(From, ok);
+ _ -> ok
+ %% Do not reply if error in do. Expecting
+ %% {channel_exit, _, _}
+ end,
+ do_rpc(State1#state{rpc_requests = NewQ})
+ end;
+ {empty, NewQ} ->
+ case Closing of
+ {connection, Reason} ->
+ gen_server:cast(self(),
+ {shutdown, {connection_closing, Reason}});
+ _ ->
+ ok
+ end,
+ State#state{rpc_requests = NewQ}
+ end.
+
+pending_rpc_method(#state{rpc_requests = Q}) ->
+ {value, {_From, _Sender, Method, _Content, _Flow}} = queue:peek(Q),
+ Method.
+
+pre_do(#'channel.close'{reply_code = Code, reply_text = Text}, none,
+ _Sender, State) ->
+ State#state{closing = {just_channel, {app_initiated_close, Code, Text}}};
+pre_do(#'basic.consume'{} = Method, none, Sender, State) ->
+ ok = call_to_consumer(Method, Sender, State),
+ State;
+pre_do(#'basic.cancel'{} = Method, none, Sender, State) ->
+ ok = call_to_consumer(Method, Sender, State),
+ State;
+pre_do(_, _, _, State) ->
+ State.
+
+%%---------------------------------------------------------------------------
+%% Handling of methods from the server
+%%---------------------------------------------------------------------------
+
+safely_handle_method_from_server(Method, Content,
+ Continuation,
+ State = #state{closing = Closing}) ->
+ case is_connection_method(Method) of
+ true -> server_misbehaved(
+ #amqp_error{name = command_invalid,
+ explanation = "connection method on "
+ "non-zero channel",
+ method = element(1, Method)},
+ State);
+ false -> Drop = case {Closing, Method} of
+ {{just_channel, _}, #'channel.close'{}} -> false;
+ {{just_channel, _}, #'channel.close_ok'{}} -> false;
+ {{just_channel, _}, _} -> true;
+ _ -> false
+ end,
+ if Drop -> ?LOG_INFO("Channel (~p): dropping method ~p from "
+ "server because channel is closing~n",
+ [self(), {Method, Content}]),
+ {noreply, State};
+ true ->
+ Continuation()
+ end
+ end.
+
+handle_method_from_server(Method, Content, State) ->
+ Fun = fun () ->
+ handle_method_from_server1(Method,
+ amqp_msg(Content), State)
+ end,
+ safely_handle_method_from_server(Method, Content, Fun, State).
+
+handle_method_from_server(Method = #'basic.deliver'{},
+ Content, DeliveryCtx, State) ->
+ Fun = fun () ->
+ handle_method_from_server1(Method,
+ amqp_msg(Content),
+ DeliveryCtx,
+ State)
+ end,
+ safely_handle_method_from_server(Method, Content, Fun, State).
+
+handle_method_from_server1(#'channel.open_ok'{}, none, State) ->
+ {noreply, rpc_bottom_half(ok, State)};
+handle_method_from_server1(#'channel.close'{reply_code = Code,
+ reply_text = Text},
+ none,
+ State = #state{closing = {just_channel, _}}) ->
+ %% Both client and server sent close at the same time. Don't shutdown yet,
+ %% wait for close_ok.
+ do(#'channel.close_ok'{}, none, noflow, State),
+ {noreply,
+ State#state{
+ closing = {just_channel, {server_initiated_close, Code, Text}}}};
+handle_method_from_server1(#'channel.close'{reply_code = Code,
+ reply_text = Text}, none, State) ->
+ do(#'channel.close_ok'{}, none, noflow, State),
+ handle_shutdown({server_initiated_close, Code, Text}, State);
+handle_method_from_server1(#'channel.close_ok'{}, none,
+ State = #state{closing = Closing}) ->
+ case Closing of
+ {just_channel, {app_initiated_close, _, _} = Reason} ->
+ handle_shutdown(Reason, rpc_bottom_half(ok, State));
+ {just_channel, {server_initiated_close, _, _} = Reason} ->
+ handle_shutdown(Reason,
+ rpc_bottom_half(closing, State));
+ {connection, Reason} ->
+ handle_shutdown({connection_closing, Reason}, State)
+ end;
+handle_method_from_server1(#'basic.consume_ok'{} = ConsumeOk, none, State) ->
+ Consume = #'basic.consume'{} = pending_rpc_method(State),
+ ok = call_to_consumer(ConsumeOk, Consume, State),
+ {noreply, rpc_bottom_half(ConsumeOk, State)};
+handle_method_from_server1(#'basic.cancel_ok'{} = CancelOk, none, State) ->
+ Cancel = #'basic.cancel'{} = pending_rpc_method(State),
+ ok = call_to_consumer(CancelOk, Cancel, State),
+ {noreply, rpc_bottom_half(CancelOk, State)};
+handle_method_from_server1(#'basic.cancel'{} = Cancel, none, State) ->
+ ok = call_to_consumer(Cancel, none, State),
+ {noreply, State};
+handle_method_from_server1(#'basic.deliver'{} = Deliver, AmqpMsg, State) ->
+ ok = call_to_consumer(Deliver, AmqpMsg, State),
+ {noreply, State};
+handle_method_from_server1(#'channel.flow'{active = Active} = Flow, none,
+ State = #state{flow_handler = FlowHandler}) ->
+ case FlowHandler of none -> ok;
+ {Pid, _Ref} -> Pid ! Flow
+ end,
+ %% Putting the flow_ok in the queue so that the RPC queue can be
+ %% flushed beforehand. Methods that made it to the queue are not
+ %% blocked in any circumstance.
+ {noreply, rpc_top_half(#'channel.flow_ok'{active = Active}, none, none,
+ none, noflow, State#state{flow_active = Active})};
+handle_method_from_server1(
+ #'basic.return'{} = BasicReturn, AmqpMsg,
+ State = #state{return_handler = ReturnHandler}) ->
+ case ReturnHandler of
+ none -> ?LOG_WARN("Channel (~p): received {~p, ~p} but there is "
+ "no return handler registered~n",
+ [self(), BasicReturn, AmqpMsg]);
+ {Pid, _Ref} -> Pid ! {BasicReturn, AmqpMsg}
+ end,
+ {noreply, State};
+handle_method_from_server1(#'basic.ack'{} = BasicAck, none,
+ #state{confirm_handler = none} = State) ->
+ {noreply, update_confirm_set(BasicAck, State)};
+handle_method_from_server1(#'basic.ack'{} = BasicAck, none,
+ #state{confirm_handler = {CH, _Ref}} = State) ->
+ CH ! BasicAck,
+ {noreply, update_confirm_set(BasicAck, State)};
+handle_method_from_server1(#'basic.nack'{} = BasicNack, none,
+ #state{confirm_handler = none} = State) ->
+ ?LOG_WARN("Channel (~p): received ~p but there is no "
+ "confirm handler registered~n", [self(), BasicNack]),
+ {noreply, update_confirm_set(BasicNack, State)};
+handle_method_from_server1(#'basic.nack'{} = BasicNack, none,
+ #state{confirm_handler = {CH, _Ref}} = State) ->
+ CH ! BasicNack,
+ {noreply, update_confirm_set(BasicNack, State)};
+
+handle_method_from_server1(#'basic.credit_drained'{} = CreditDrained, none,
+ #state{consumer = Consumer} = State) ->
+ Consumer ! CreditDrained,
+ {noreply, State};
+handle_method_from_server1(Method, none, State) ->
+ {noreply, rpc_bottom_half(Method, State)};
+handle_method_from_server1(Method, Content, State) ->
+ {noreply, rpc_bottom_half({Method, Content}, State)}.
+
+%% only used with manual consumer-to-queue flow control
+handle_method_from_server1(#'basic.deliver'{} = Deliver, AmqpMsg,
+ DeliveryCtx, State) ->
+ ok = call_to_consumer(Deliver, AmqpMsg, DeliveryCtx, State),
+ {noreply, State}.
+
+%%---------------------------------------------------------------------------
+%% Other handle_* functions
+%%---------------------------------------------------------------------------
+
+handle_connection_closing(CloseType, Reason,
+ State = #state{rpc_requests = RpcQueue,
+ closing = Closing}) ->
+ NewState = State#state{closing = {connection, Reason}},
+ case {CloseType, Closing, queue:is_empty(RpcQueue)} of
+ {flush, false, false} ->
+ erlang:send_after(?TIMEOUT_FLUSH, self(),
+ timed_out_flushing_channel),
+ {noreply, NewState};
+ {flush, {just_channel, _}, false} ->
+ {noreply, NewState};
+ _ ->
+ handle_shutdown({connection_closing, Reason}, NewState)
+ end.
+
+handle_channel_exit(Reason = #amqp_error{name = ErrorName, explanation = Expl},
+ State = #state{connection = Connection, number = Number}) ->
+ %% Sent by rabbit_channel for hard errors in the direct case
+ ?LOG_ERR("connection ~p, channel ~p - error:~n~p~n",
+ [Connection, Number, Reason]),
+ {true, Code, _} = ?PROTOCOL:lookup_amqp_exception(ErrorName),
+ ReportedReason = {server_initiated_close, Code, Expl},
+ amqp_gen_connection:hard_error_in_channel(
+ Connection, self(), ReportedReason),
+ handle_shutdown({connection_closing, ReportedReason}, State);
+handle_channel_exit(Reason, State) ->
+ %% Unexpected death of a channel infrastructure process
+ {stop, {infrastructure_died, Reason}, State}.
+
+handle_shutdown({_, 200, _}, State) ->
+ {stop, normal, State};
+handle_shutdown({connection_closing, {_, 200, _}}, State) ->
+ {stop, normal, State};
+handle_shutdown({connection_closing, normal}, State) ->
+ {stop, normal, State};
+handle_shutdown(Reason, State) ->
+ {stop, {shutdown, Reason}, State}.
+
+%%---------------------------------------------------------------------------
+%% Internal plumbing
+%%---------------------------------------------------------------------------
+
+do(Method, Content, Flow, #state{driver = network, writer = W}) ->
+ %% Catching because it expects the {channel_exit, _, _} message on error
+ catch case {Content, Flow} of
+ {none, _} -> rabbit_writer:send_command(W, Method);
+ {_, flow} -> rabbit_writer:send_command_flow(W, Method,
+ Content);
+ {_, noflow} -> rabbit_writer:send_command(W, Method, Content)
+ end;
+do(Method, Content, Flow, #state{driver = direct, writer = W}) ->
+ %% ditto catching because...
+ catch case {Content, Flow} of
+ {none, _} -> rabbit_channel_common:do(W, Method);
+ {_, flow} -> rabbit_channel_common:do_flow(W, Method, Content);
+ {_, noflow} -> rabbit_channel_common:do(W, Method, Content)
+ end.
+
+
+flush_writer(#state{driver = network, writer = Writer}) ->
+ try
+ rabbit_writer:flush(Writer)
+ catch
+ exit:noproc -> ok
+ end;
+flush_writer(#state{driver = direct}) ->
+ ok.
+amqp_msg(none) ->
+ none;
+amqp_msg(Content) ->
+ {Props, Payload} = rabbit_basic_common:from_content(Content),
+ #amqp_msg{props = Props, payload = Payload}.
+
+build_content(none) ->
+ none;
+build_content(#amqp_msg{props = Props, payload = Payload}) ->
+ rabbit_basic_common:build_content(Props, Payload).
+
+check_block(_Method, _AmqpMsg, #state{closing = {just_channel, _}}) ->
+ closing;
+check_block(_Method, _AmqpMsg, #state{closing = {connection, _}}) ->
+ closing;
+check_block(_Method, none, #state{}) ->
+ ok;
+check_block(_Method, #amqp_msg{}, #state{flow_active = false}) ->
+ blocked;
+check_block(_Method, _AmqpMsg, #state{}) ->
+ ok.
+
+check_invalid_method(#'channel.open'{}) ->
+ {use_amqp_connection_module,
+ "Use amqp_connection:open_channel/{1,2} instead"};
+check_invalid_method(#'channel.close'{}) ->
+ {use_close_function, "Use close/{1,3} instead"};
+check_invalid_method(Method) ->
+ case is_connection_method(Method) of
+ true -> {connection_methods_not_allowed,
+ "Sending connection methods is not allowed"};
+ false -> ok
+ end.
+
+is_connection_method(Method) ->
+ {ClassId, _} = ?PROTOCOL:method_id(element(1, Method)),
+ ?PROTOCOL:lookup_class_name(ClassId) == connection.
+
+server_misbehaved(#amqp_error{} = AmqpError, State = #state{number = Number}) ->
+ case rabbit_binary_generator:map_exception(Number, AmqpError, ?PROTOCOL) of
+ {0, _} ->
+ handle_shutdown({server_misbehaved, AmqpError}, State);
+ {_, Close} ->
+ ?LOG_WARN("Channel (~p) flushing and closing due to soft "
+ "error caused by the server ~p~n", [self(), AmqpError]),
+ Self = self(),
+ spawn(fun () -> call(Self, Close) end),
+ {noreply, State}
+ end.
+
+update_confirm_set(#'basic.ack'{delivery_tag = SeqNo,
+ multiple = Multiple},
+ State = #state{unconfirmed_set = USet}) ->
+ maybe_notify_waiters(
+ State#state{unconfirmed_set =
+ update_unconfirmed(SeqNo, Multiple, USet)});
+update_confirm_set(#'basic.nack'{delivery_tag = SeqNo,
+ multiple = Multiple},
+ State = #state{unconfirmed_set = USet}) ->
+ maybe_notify_waiters(
+ State#state{unconfirmed_set = update_unconfirmed(SeqNo, Multiple, USet),
+ only_acks_received = false}).
+
+update_unconfirmed(SeqNo, false, USet) ->
+ gb_sets:del_element(SeqNo, USet);
+update_unconfirmed(SeqNo, true, USet) ->
+ case gb_sets:is_empty(USet) of
+ true -> USet;
+ false -> {S, USet1} = gb_sets:take_smallest(USet),
+ case S > SeqNo of
+ true -> USet;
+ false -> update_unconfirmed(SeqNo, true, USet1)
+ end
+ end.
+
+maybe_notify_waiters(State = #state{unconfirmed_set = USet}) ->
+ case gb_sets:is_empty(USet) of
+ false -> State;
+ true -> notify_confirm_waiters(State)
+ end.
+
+notify_confirm_waiters(State = #state{waiting_set = WSet,
+ only_acks_received = OAR}) ->
+ [begin
+ safe_cancel_timer(TRef),
+ gen_server:reply(From, OAR)
+ end || {From, TRef} <- gb_trees:to_list(WSet)],
+ State#state{waiting_set = gb_trees:empty(),
+ only_acks_received = true}.
+
+do_wait_for_confirms(Channel, Timeout) when is_integer(Timeout) ->
+ case gen_server:call(Channel, {wait_for_confirms, Timeout}, amqp_util:call_timeout()) of
+ {error, Reason} -> throw(Reason);
+ Other -> Other
+ end.
+
+handle_wait_for_confirms(_From, _Timeout, State = #state{next_pub_seqno = 0}) ->
+ {reply, {error, not_in_confirm_mode}, State};
+handle_wait_for_confirms(From, Timeout,
+ State = #state{unconfirmed_set = USet,
+ waiting_set = WSet}) ->
+ case gb_sets:is_empty(USet) of
+ true -> {reply, true, State};
+ false -> TRef = case Timeout of
+ infinity -> undefined;
+ _ -> erlang:send_after(
+ Timeout, self(),
+ {confirm_timeout, From})
+ end,
+ {noreply,
+ State#state{waiting_set = gb_trees:insert(From, TRef, WSet)}}
+ end.
+
+call_to_consumer(Method, Args, #state{consumer = Consumer}) ->
+ amqp_gen_consumer:call_consumer(Consumer, Method, Args).
+
+call_to_consumer(Method, Args, DeliveryCtx, #state{consumer = Consumer}) ->
+ amqp_gen_consumer:call_consumer(Consumer, Method, Args, DeliveryCtx).
+
+safe_cancel_timer(undefined) -> ok;
+safe_cancel_timer(TRef) -> erlang:cancel_timer(TRef).
+
+second_to_millisecond(Timeout) ->
+ Timeout * 1000.
diff --git a/deps/amqp_client/src/amqp_channel_sup.erl b/deps/amqp_client/src/amqp_channel_sup.erl
new file mode 100644
index 0000000000..9bd85ce946
--- /dev/null
+++ b/deps/amqp_client/src/amqp_channel_sup.erl
@@ -0,0 +1,73 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+%% @private
+-module(amqp_channel_sup).
+
+-include("amqp_client_internal.hrl").
+
+-behaviour(supervisor2).
+
+-export([start_link/6]).
+-export([init/1]).
+
+%%---------------------------------------------------------------------------
+%% Interface
+%%---------------------------------------------------------------------------
+
+start_link(Type, Connection, ConnName, InfraArgs, ChNumber,
+ Consumer = {_, _}) ->
+ Identity = {ConnName, ChNumber},
+ {ok, Sup} = supervisor2:start_link(?MODULE, [Consumer, Identity]),
+ [{gen_consumer, ConsumerPid, _, _}] = supervisor2:which_children(Sup),
+ {ok, ChPid} = supervisor2:start_child(
+ Sup, {channel,
+ {amqp_channel, start_link,
+ [Type, Connection, ChNumber, ConsumerPid, Identity]},
+ intrinsic, ?WORKER_WAIT, worker, [amqp_channel]}),
+ case start_writer(Sup, Type, InfraArgs, ConnName, ChNumber, ChPid) of
+ {ok, Writer} ->
+ amqp_channel:set_writer(ChPid, Writer),
+ {ok, AState} = init_command_assembler(Type),
+ {ok, Sup, {ChPid, AState}};
+ {error, _}=Error ->
+ Error
+ end.
+
+%%---------------------------------------------------------------------------
+%% Internal plumbing
+%%---------------------------------------------------------------------------
+
+%% 1GB
+-define(DEFAULT_GC_THRESHOLD, 1000000000).
+
+start_writer(_Sup, direct, [ConnPid, Node, User, VHost, Collector, AmqpParams],
+ ConnName, ChNumber, ChPid) ->
+ rpc:call(Node, rabbit_direct, start_channel,
+ [ChNumber, ChPid, ConnPid, ConnName, ?PROTOCOL, User,
+ VHost, ?CLIENT_CAPABILITIES, Collector, AmqpParams]);
+start_writer(Sup, network, [Sock, FrameMax], ConnName, ChNumber, ChPid) ->
+ GCThreshold = application:get_env(amqp_client, writer_gc_threshold, ?DEFAULT_GC_THRESHOLD),
+ supervisor2:start_child(
+ Sup,
+ {writer, {rabbit_writer, start_link,
+ [Sock, ChNumber, FrameMax, ?PROTOCOL, ChPid,
+ {ConnName, ChNumber}, false, GCThreshold]},
+ transient, ?WORKER_WAIT, worker, [rabbit_writer]}).
+
+init_command_assembler(direct) -> {ok, none};
+init_command_assembler(network) -> rabbit_command_assembler:init(?PROTOCOL).
+
+%%---------------------------------------------------------------------------
+%% supervisor2 callbacks
+%%---------------------------------------------------------------------------
+
+init([{ConsumerModule, ConsumerArgs}, Identity]) ->
+ {ok, {{one_for_all, 0, 1},
+ [{gen_consumer, {amqp_gen_consumer, start_link,
+ [ConsumerModule, ConsumerArgs, Identity]},
+ intrinsic, ?WORKER_WAIT, worker, [amqp_gen_consumer]}]}}.
diff --git a/deps/amqp_client/src/amqp_channel_sup_sup.erl b/deps/amqp_client/src/amqp_channel_sup_sup.erl
new file mode 100644
index 0000000000..720b0e5726
--- /dev/null
+++ b/deps/amqp_client/src/amqp_channel_sup_sup.erl
@@ -0,0 +1,36 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+%% @private
+-module(amqp_channel_sup_sup).
+
+-include("amqp_client.hrl").
+
+-behaviour(supervisor2).
+
+-export([start_link/3, start_channel_sup/4]).
+-export([init/1]).
+
+%%---------------------------------------------------------------------------
+%% Interface
+%%---------------------------------------------------------------------------
+
+start_link(Type, Connection, ConnName) ->
+ supervisor2:start_link(?MODULE, [Type, Connection, ConnName]).
+
+start_channel_sup(Sup, InfraArgs, ChannelNumber, Consumer) ->
+ supervisor2:start_child(Sup, [InfraArgs, ChannelNumber, Consumer]).
+
+%%---------------------------------------------------------------------------
+%% supervisor2 callbacks
+%%---------------------------------------------------------------------------
+
+init([Type, Connection, ConnName]) ->
+ {ok, {{simple_one_for_one, 0, 1},
+ [{channel_sup,
+ {amqp_channel_sup, start_link, [Type, Connection, ConnName]},
+ temporary, infinity, supervisor, [amqp_channel_sup]}]}}.
diff --git a/deps/amqp_client/src/amqp_channels_manager.erl b/deps/amqp_client/src/amqp_channels_manager.erl
new file mode 100644
index 0000000000..2a8d427dc4
--- /dev/null
+++ b/deps/amqp_client/src/amqp_channels_manager.erl
@@ -0,0 +1,249 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+%% @private
+-module(amqp_channels_manager).
+
+-include("amqp_client_internal.hrl").
+
+-behaviour(gen_server).
+
+-export([start_link/3, open_channel/4, set_channel_max/2, is_empty/1,
+ num_channels/1, pass_frame/3, signal_connection_closing/3,
+ process_channel_frame/4]).
+-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
+ handle_info/2]).
+
+-record(state, {connection,
+ channel_sup_sup,
+ map_num_pa = gb_trees:empty(), %% Number -> {Pid, AState}
+ map_pid_num = #{}, %% Pid -> Number
+ channel_max = ?MAX_CHANNEL_NUMBER,
+ closing = false}).
+
+%%---------------------------------------------------------------------------
+%% Interface
+%%---------------------------------------------------------------------------
+
+start_link(Connection, ConnName, ChSupSup) ->
+ gen_server:start_link(?MODULE, [Connection, ConnName, ChSupSup], []).
+
+open_channel(ChMgr, ProposedNumber, Consumer, InfraArgs) ->
+ gen_server:call(ChMgr, {open_channel, ProposedNumber, Consumer, InfraArgs},
+ amqp_util:call_timeout()).
+
+set_channel_max(ChMgr, ChannelMax) ->
+ gen_server:cast(ChMgr, {set_channel_max, ChannelMax}).
+
+is_empty(ChMgr) ->
+ gen_server:call(ChMgr, is_empty, amqp_util:call_timeout()).
+
+num_channels(ChMgr) ->
+ gen_server:call(ChMgr, num_channels, amqp_util:call_timeout()).
+
+pass_frame(ChMgr, ChNumber, Frame) ->
+ gen_server:cast(ChMgr, {pass_frame, ChNumber, Frame}).
+
+signal_connection_closing(ChMgr, ChannelCloseType, Reason) ->
+ gen_server:cast(ChMgr, {connection_closing, ChannelCloseType, Reason}).
+
+process_channel_frame(Frame, Channel, ChPid, AState) ->
+ case rabbit_command_assembler:process(Frame, AState) of
+ {ok, NewAState} -> NewAState;
+ {ok, Method, NewAState} -> rabbit_channel_common:do(ChPid, Method),
+ NewAState;
+ {ok, Method, Content, NewAState} -> rabbit_channel_common:do(ChPid, Method,
+ Content),
+ NewAState;
+ {error, Reason} -> ChPid ! {channel_exit, Channel,
+ Reason},
+ AState
+ end.
+
+%%---------------------------------------------------------------------------
+%% gen_server callbacks
+%%---------------------------------------------------------------------------
+
+init([Connection, ConnName, ChSupSup]) ->
+ ?store_proc_name(ConnName),
+ {ok, #state{connection = Connection, channel_sup_sup = ChSupSup}}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+handle_call({open_channel, ProposedNumber, Consumer, InfraArgs}, _,
+ State = #state{closing = false}) ->
+ handle_open_channel(ProposedNumber, Consumer, InfraArgs, State);
+handle_call(is_empty, _, State) ->
+ {reply, internal_is_empty(State), State};
+handle_call(num_channels, _, State) ->
+ {reply, internal_num_channels(State), State}.
+
+handle_cast({set_channel_max, ChannelMax}, State) ->
+ {noreply, State#state{channel_max = ChannelMax}};
+handle_cast({pass_frame, ChNumber, Frame}, State) ->
+ {noreply, internal_pass_frame(ChNumber, Frame, State)};
+handle_cast({connection_closing, ChannelCloseType, Reason}, State) ->
+ handle_connection_closing(ChannelCloseType, Reason, State).
+
+handle_info({'DOWN', _, process, Pid, Reason}, State) ->
+ handle_down(Pid, Reason, State).
+
+%%---------------------------------------------------------------------------
+%% Internal plumbing
+%%---------------------------------------------------------------------------
+
+handle_open_channel(ProposedNumber, Consumer, InfraArgs,
+ State = #state{channel_sup_sup = ChSupSup}) ->
+ case new_number(ProposedNumber, State) of
+ {ok, Number} ->
+ {ok, _ChSup, {Ch, AState}} =
+ amqp_channel_sup_sup:start_channel_sup(ChSupSup, InfraArgs,
+ Number, Consumer),
+ NewState = internal_register(Number, Ch, AState, State),
+ erlang:monitor(process, Ch),
+ {reply, {ok, Ch}, NewState};
+ {error, _} = Error ->
+ {reply, Error, State}
+ end.
+
+new_number(none, #state{channel_max = ChannelMax, map_num_pa = MapNPA}) ->
+ case gb_trees:is_empty(MapNPA) of
+ true -> {ok, 1};
+ false -> {Smallest, _} = gb_trees:smallest(MapNPA),
+ if Smallest > 1 ->
+ {ok, Smallest - 1};
+ true ->
+ {Largest, _} = gb_trees:largest(MapNPA),
+ if Largest < ChannelMax -> {ok, Largest + 1};
+ true -> find_free(MapNPA)
+ end
+ end
+ end;
+new_number(Proposed, State = #state{channel_max = ChannelMax,
+ map_num_pa = MapNPA}) ->
+ IsValid = Proposed > 0 andalso Proposed =< ChannelMax andalso
+ not gb_trees:is_defined(Proposed, MapNPA),
+ case IsValid of true -> {ok, Proposed};
+ false -> new_number(none, State)
+ end.
+
+find_free(MapNPA) ->
+ find_free(gb_trees:iterator(MapNPA), 1).
+
+find_free(It, Candidate) ->
+ case gb_trees:next(It) of
+ {Number, _, It1} -> if Number > Candidate ->
+ {ok, Number - 1};
+ Number =:= Candidate ->
+ find_free(It1, Candidate + 1)
+ end;
+ none -> {error, out_of_channel_numbers}
+ end.
+
+handle_down(Pid, Reason, State) ->
+ case internal_lookup_pn(Pid, State) of
+ undefined -> {stop, {error, unexpected_down}, State};
+ Number -> handle_channel_down(Pid, Number, Reason, State)
+ end.
+
+handle_channel_down(Pid, Number, Reason, State) ->
+ maybe_report_down(Pid, case Reason of {shutdown, R} -> R;
+ _ -> Reason
+ end,
+ State),
+ NewState = internal_unregister(Number, Pid, State),
+ check_all_channels_terminated(NewState),
+ {noreply, NewState}.
+
+maybe_report_down(_Pid, normal, _State) ->
+ ok;
+maybe_report_down(_Pid, shutdown, _State) ->
+ ok;
+maybe_report_down(_Pid, {app_initiated_close, _, _}, _State) ->
+ ok;
+maybe_report_down(_Pid, {server_initiated_close, _, _}, _State) ->
+ ok;
+maybe_report_down(_Pid, {connection_closing, _}, _State) ->
+ ok;
+maybe_report_down(_Pid, {server_misbehaved, AmqpError},
+ #state{connection = Connection}) ->
+ amqp_gen_connection:server_misbehaved(Connection, AmqpError);
+maybe_report_down(Pid, Other, #state{connection = Connection}) ->
+ amqp_gen_connection:channel_internal_error(Connection, Pid, Other).
+
+check_all_channels_terminated(#state{closing = false}) ->
+ ok;
+check_all_channels_terminated(State = #state{closing = true,
+ connection = Connection}) ->
+ case internal_is_empty(State) of
+ true -> amqp_gen_connection:channels_terminated(Connection);
+ false -> ok
+ end.
+
+handle_connection_closing(ChannelCloseType, Reason,
+ State = #state{connection = Connection}) ->
+ case internal_is_empty(State) of
+ true -> amqp_gen_connection:channels_terminated(Connection);
+ false -> signal_channels_connection_closing(ChannelCloseType, Reason,
+ State)
+ end,
+ {noreply, State#state{closing = true}}.
+
+%%---------------------------------------------------------------------------
+
+internal_pass_frame(Number, Frame, State) ->
+ case internal_lookup_npa(Number, State) of
+ undefined ->
+ ?LOG_INFO("Dropping frame ~p for invalid or closed "
+ "channel number ~p~n", [Frame, Number]),
+ State;
+ {ChPid, AState} ->
+ NewAState = process_channel_frame(Frame, Number, ChPid, AState),
+ internal_update_npa(Number, ChPid, NewAState, State)
+ end.
+
+internal_register(Number, Pid, AState,
+ State = #state{map_num_pa = MapNPA, map_pid_num = MapPN}) ->
+ MapNPA1 = gb_trees:enter(Number, {Pid, AState}, MapNPA),
+ MapPN1 = maps:put(Pid, Number, MapPN),
+ State#state{map_num_pa = MapNPA1,
+ map_pid_num = MapPN1}.
+
+internal_unregister(Number, Pid,
+ State = #state{map_num_pa = MapNPA, map_pid_num = MapPN}) ->
+ MapNPA1 = gb_trees:delete(Number, MapNPA),
+ MapPN1 = maps:remove(Pid, MapPN),
+ State#state{map_num_pa = MapNPA1,
+ map_pid_num = MapPN1}.
+
+internal_is_empty(#state{map_num_pa = MapNPA}) ->
+ gb_trees:is_empty(MapNPA).
+
+internal_num_channels(#state{map_num_pa = MapNPA}) ->
+ gb_trees:size(MapNPA).
+
+internal_lookup_npa(Number, #state{map_num_pa = MapNPA}) ->
+ case gb_trees:lookup(Number, MapNPA) of {value, PA} -> PA;
+ none -> undefined
+ end.
+
+internal_lookup_pn(Pid, #state{map_pid_num = MapPN}) ->
+ case maps:find(Pid, MapPN) of {ok, Number} -> Number;
+ error -> undefined
+ end.
+
+internal_update_npa(Number, Pid, AState, State = #state{map_num_pa = MapNPA}) ->
+ State#state{map_num_pa = gb_trees:update(Number, {Pid, AState}, MapNPA)}.
+
+signal_channels_connection_closing(ChannelCloseType, Reason,
+ #state{map_pid_num = MapPN}) ->
+ [amqp_channel:connection_closing(Pid, ChannelCloseType, Reason)
+ || Pid <- maps:keys(MapPN)].
diff --git a/deps/amqp_client/src/amqp_client.erl b/deps/amqp_client/src/amqp_client.erl
new file mode 100644
index 0000000000..cf85c1b04c
--- /dev/null
+++ b/deps/amqp_client/src/amqp_client.erl
@@ -0,0 +1,37 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+%% @private
+-module(amqp_client).
+
+-behaviour(application).
+
+-export([start/0]).
+-export([start/2, stop/1]).
+
+%%---------------------------------------------------------------------------
+%% Interface
+%%---------------------------------------------------------------------------
+
+start() ->
+ %% rabbit_common needs compiler and syntax_tools, see
+ %%
+ %% * https://github.com/rabbitmq/rabbitmq-erlang-client/issues/72
+ %% * https://github.com/rabbitmq/rabbitmq-common/pull/149
+ application:ensure_all_started(rabbit_common),
+ {ok, _} = application:ensure_all_started(amqp_client),
+ ok.
+
+%%---------------------------------------------------------------------------
+%% application callbacks
+%%---------------------------------------------------------------------------
+
+start(_StartType, _StartArgs) ->
+ amqp_sup:start_link().
+
+stop(_State) ->
+ ok.
diff --git a/deps/amqp_client/src/amqp_connection.erl b/deps/amqp_client/src/amqp_connection.erl
new file mode 100644
index 0000000000..6800a44a3e
--- /dev/null
+++ b/deps/amqp_client/src/amqp_connection.erl
@@ -0,0 +1,395 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+%% @type close_reason(Type) = {shutdown, amqp_reason(Type)}.
+%% @type amqp_reason(Type) = {Type, Code, Text}
+%% Code = non_neg_integer()
+%% Text = binary().
+%% @doc This module is responsible for maintaining a connection to an AMQP
+%% broker and manages channels within the connection. This module is used to
+%% open and close connections to the broker as well as creating new channels
+%% within a connection.<br/>
+%% The connections and channels created by this module are supervised under
+%% amqp_client's supervision tree. Please note that connections and channels
+%% do not get restarted automatically by the supervision tree in the case of a
+%% failure. If you need robust connections and channels, we recommend you use
+%% Erlang monitors on the returned connection and channel PIDs.<br/>
+%% <br/>
+%% In case of a failure or an AMQP error, the connection process exits with a
+%% meaningful exit reason:<br/>
+%% <br/>
+%% <table>
+%% <tr>
+%% <td><strong>Cause</strong></td>
+%% <td><strong>Exit reason</strong></td>
+%% </tr>
+%% <tr>
+%% <td>Any reason, where Code would have been 200 otherwise</td>
+%% <td>```normal'''</td>
+%% </tr>
+%% <tr>
+%% <td>User application calls amqp_connection:close/3</td>
+%% <td>```close_reason(app_initiated_close)'''</td>
+%% </tr>
+%% <tr>
+%% <td>Server closes connection (hard error)</td>
+%% <td>```close_reason(server_initiated_close)'''</td>
+%% </tr>
+%% <tr>
+%% <td>Server misbehaved (did not follow protocol)</td>
+%% <td>```close_reason(server_misbehaved)'''</td>
+%% </tr>
+%% <tr>
+%% <td>AMQP client internal error - usually caused by a channel exiting
+%% with an unusual reason. This is usually accompanied by a more
+%% detailed error log from the channel</td>
+%% <td>```close_reason(internal_error)'''</td>
+%% </tr>
+%% <tr>
+%% <td>Other error</td>
+%% <td>(various error reasons, causing more detailed logging)</td>
+%% </tr>
+%% </table>
+%% <br/>
+%% See type definitions below.
+-module(amqp_connection).
+
+-include("amqp_client_internal.hrl").
+
+-export([open_channel/1, open_channel/2, open_channel/3, register_blocked_handler/2]).
+-export([start/1, start/2, close/1, close/2, close/3, close/4]).
+-export([error_atom/1]).
+-export([info/2, info_keys/1, info_keys/0]).
+-export([connection_name/1, update_secret/3]).
+-export([socket_adapter_info/2]).
+
+-define(DEFAULT_CONSUMER, {amqp_selective_consumer, []}).
+
+-define(PROTOCOL_SSL_PORT, (?PROTOCOL_PORT - 1)).
+
+%%---------------------------------------------------------------------------
+%% Type Definitions
+%%---------------------------------------------------------------------------
+
+%% @type amqp_adapter_info() = #amqp_adapter_info{}.
+%% @type amqp_params_direct() = #amqp_params_direct{}.
+%% As defined in amqp_client.hrl. It contains the following fields:
+%% <ul>
+%% <li>username :: binary() - The name of a user registered with the broker,
+%% defaults to &lt;&lt;guest"&gt;&gt;</li>
+%% <li>password :: binary() - The password of user, defaults to 'none'</li>
+%% <li>virtual_host :: binary() - The name of a virtual host in the broker,
+%% defaults to &lt;&lt;"/"&gt;&gt;</li>
+%% <li>node :: atom() - The node the broker runs on (direct only)</li>
+%% <li>adapter_info :: amqp_adapter_info() - Extra management information for if
+%% this connection represents a non-AMQP network connection.</li>
+%% <li>client_properties :: [{binary(), atom(), binary()}] - A list of extra
+%% client properties to be sent to the server, defaults to []</li>
+%% </ul>
+%%
+%% @type amqp_params_network() = #amqp_params_network{}.
+%% As defined in amqp_client.hrl. It contains the following fields:
+%% <ul>
+%% <li>username :: binary() - The name of a user registered with the broker,
+%% defaults to &lt;&lt;guest"&gt;&gt;</li>
+%% <li>password :: binary() - The user's password, defaults to
+%% &lt;&lt;"guest"&gt;&gt;</li>
+%% <li>virtual_host :: binary() - The name of a virtual host in the broker,
+%% defaults to &lt;&lt;"/"&gt;&gt;</li>
+%% <li>host :: string() - The hostname of the broker,
+%% defaults to "localhost" (network only)</li>
+%% <li>port :: integer() - The port the broker is listening on,
+%% defaults to 5672 (network only)</li>
+%% <li>channel_max :: non_neg_integer() - The channel_max handshake parameter,
+%% defaults to 0</li>
+%% <li>frame_max :: non_neg_integer() - The frame_max handshake parameter,
+%% defaults to 0 (network only)</li>
+%% <li>heartbeat :: non_neg_integer() - The heartbeat interval in seconds,
+%% defaults to 0 (turned off) (network only)</li>
+%% <li>connection_timeout :: non_neg_integer() | 'infinity'
+%% - The connection timeout in milliseconds,
+%% defaults to 30000 (network only)</li>
+%% <li>ssl_options :: term() - The second parameter to be used with the
+%% ssl:connect/2 function, defaults to 'none' (network only)</li>
+%% <li>client_properties :: [{binary(), atom(), binary()}] - A list of extra
+%% client properties to be sent to the server, defaults to []</li>
+%% <li>socket_options :: [any()] - Extra socket options. These are
+%% appended to the default options. See
+%% <a href="https://www.erlang.org/doc/man/inet.html#setopts-2">inet:setopts/2</a>
+%% and <a href="https://www.erlang.org/doc/man/gen_tcp.html#connect-4">
+%% gen_tcp:connect/4</a> for descriptions of the available options.</li>
+%% </ul>
+
+
+%%---------------------------------------------------------------------------
+%% Starting a connection
+%%---------------------------------------------------------------------------
+
+%% @spec (Params) -> {ok, Connection} | {error, Error}
+%% where
+%% Params = amqp_params_network() | amqp_params_direct()
+%% Connection = pid()
+%% @doc same as {@link amqp_connection:start/2. start(Params, undefined)}
+start(AmqpParams) ->
+ start(AmqpParams, undefined).
+
+%% @spec (Params, ConnectionName) -> {ok, Connection} | {error, Error}
+%% where
+%% Params = amqp_params_network() | amqp_params_direct()
+%% ConnectionName = undefined | binary()
+%% Connection = pid()
+%% @doc Starts a connection to an AMQP server. Use network params to
+%% connect to a remote AMQP server or direct params for a direct
+%% connection to a RabbitMQ server, assuming that the server is
+%% running in the same process space. If the port is set to 'undefined',
+%% the default ports will be selected depending on whether this is a
+%% normal or an SSL connection.
+%% If ConnectionName is binary - it will be added to client_properties as
+%% user specified connection name.
+start(AmqpParams, ConnName) when ConnName == undefined; is_binary(ConnName) ->
+ ensure_started(),
+ AmqpParams0 =
+ case AmqpParams of
+ #amqp_params_direct{password = Password} ->
+ AmqpParams#amqp_params_direct{password = credentials_obfuscation:encrypt(Password)};
+ #amqp_params_network{password = Password} ->
+ AmqpParams#amqp_params_network{password = credentials_obfuscation:encrypt(Password)}
+ end,
+ AmqpParams1 =
+ case AmqpParams0 of
+ #amqp_params_network{port = undefined, ssl_options = none} ->
+ AmqpParams0#amqp_params_network{port = ?PROTOCOL_PORT};
+ #amqp_params_network{port = undefined, ssl_options = _} ->
+ AmqpParams0#amqp_params_network{port = ?PROTOCOL_SSL_PORT};
+ _ ->
+ AmqpParams0
+ end,
+ AmqpParams2 = set_connection_name(ConnName, AmqpParams1),
+ AmqpParams3 = amqp_ssl:maybe_enhance_ssl_options(AmqpParams2),
+ {ok, _Sup, Connection} = amqp_sup:start_connection_sup(AmqpParams3),
+ amqp_gen_connection:connect(Connection).
+
+set_connection_name(undefined, Params) -> Params;
+set_connection_name(ConnName,
+ #amqp_params_network{client_properties = Props} = Params) ->
+ Params#amqp_params_network{
+ client_properties = [
+ {<<"connection_name">>, longstr, ConnName} | Props
+ ]};
+set_connection_name(ConnName,
+ #amqp_params_direct{client_properties = Props} = Params) ->
+ Params#amqp_params_direct{
+ client_properties = [
+ {<<"connection_name">>, longstr, ConnName} | Props
+ ]}.
+
+%% Usually the amqp_client application will already be running. We
+%% check whether that is the case by invoking an undocumented function
+%% which does not require a synchronous call to the application
+%% controller. That way we don't risk a dead-lock if, say, the
+%% application controller is in the process of shutting down the very
+%% application which is making this call.
+ensure_started() ->
+ [ensure_started(App) || App <- [syntax_tools, compiler, xmerl,
+ rabbit_common, amqp_client, credentials_obfuscation]].
+
+ensure_started(App) ->
+ case is_pid(application_controller:get_master(App)) andalso amqp_sup:is_ready() of
+ true -> ok;
+ false -> case application:ensure_all_started(App) of
+ {ok, _} -> ok;
+ {error, _} = E -> throw(E)
+ end
+ end.
+
+%%---------------------------------------------------------------------------
+%% Commands
+%%---------------------------------------------------------------------------
+
+%% @doc Invokes open_channel(ConnectionPid, none,
+%% {amqp_selective_consumer, []}). Opens a channel without having to
+%% specify a channel number. This uses the default consumer
+%% implementation.
+open_channel(ConnectionPid) ->
+ open_channel(ConnectionPid, none, ?DEFAULT_CONSUMER).
+
+%% @doc Invokes open_channel(ConnectionPid, none, Consumer).
+%% Opens a channel without having to specify a channel number.
+open_channel(ConnectionPid, {_, _} = Consumer) ->
+ open_channel(ConnectionPid, none, Consumer);
+
+%% @doc Invokes open_channel(ConnectionPid, ChannelNumber,
+%% {amqp_selective_consumer, []}). Opens a channel, using the default
+%% consumer implementation.
+open_channel(ConnectionPid, ChannelNumber)
+ when is_number(ChannelNumber) orelse ChannelNumber =:= none ->
+ open_channel(ConnectionPid, ChannelNumber, ?DEFAULT_CONSUMER).
+
+%% @spec (ConnectionPid, ChannelNumber, Consumer) -> Result
+%% where
+%% ConnectionPid = pid()
+%% ChannelNumber = pos_integer() | 'none'
+%% Consumer = {ConsumerModule, ConsumerArgs}
+%% ConsumerModule = atom()
+%% ConsumerArgs = [any()]
+%% Result = {ok, ChannelPid} | {error, Error}
+%% ChannelPid = pid()
+%% @doc Opens an AMQP channel.<br/>
+%% Opens a channel, using a proposed channel number and a specific consumer
+%% implementation.<br/>
+%% ConsumerModule must implement the amqp_gen_consumer behaviour. ConsumerArgs
+%% is passed as parameter to ConsumerModule:init/1.<br/>
+%% This function assumes that an AMQP connection (networked or direct)
+%% has already been successfully established.<br/>
+%% ChannelNumber must be less than or equal to the negotiated
+%% max_channel value, or less than or equal to ?MAX_CHANNEL_NUMBER
+%% (65535) if the negotiated max_channel value is 0.<br/>
+%% In the direct connection, max_channel is always 0.
+open_channel(ConnectionPid, ChannelNumber,
+ {_ConsumerModule, _ConsumerArgs} = Consumer) ->
+ amqp_gen_connection:open_channel(ConnectionPid, ChannelNumber, Consumer).
+
+%% @spec (ConnectionPid) -> ok | Error
+%% where
+%% ConnectionPid = pid()
+%% @doc Closes the channel, invokes
+%% close(Channel, 200, &lt;&lt;"Goodbye"&gt;&gt;).
+close(ConnectionPid) ->
+ close(ConnectionPid, 200, <<"Goodbye">>).
+
+%% @spec (ConnectionPid, Timeout) -> ok | Error
+%% where
+%% ConnectionPid = pid()
+%% Timeout = integer()
+%% @doc Closes the channel, using the supplied Timeout value.
+close(ConnectionPid, Timeout) ->
+ close(ConnectionPid, 200, <<"Goodbye">>, Timeout).
+
+%% @spec (ConnectionPid, Code, Text) -> ok | closing
+%% where
+%% ConnectionPid = pid()
+%% Code = integer()
+%% Text = binary()
+%% @doc Closes the AMQP connection, allowing the caller to set the reply
+%% code and text.
+close(ConnectionPid, Code, Text) ->
+ close(ConnectionPid, Code, Text, amqp_util:call_timeout()).
+
+%% @spec (ConnectionPid, Code, Text, Timeout) -> ok | closing
+%% where
+%% ConnectionPid = pid()
+%% Code = integer()
+%% Text = binary()
+%% Timeout = integer()
+%% @doc Closes the AMQP connection, allowing the caller to set the reply
+%% code and text, as well as a timeout for the operation, after which the
+%% connection will be abruptly terminated.
+close(ConnectionPid, Code, Text, Timeout) ->
+ Close = #'connection.close'{reply_text = Text,
+ reply_code = Code,
+ class_id = 0,
+ method_id = 0},
+ amqp_gen_connection:close(ConnectionPid, Close, Timeout).
+
+register_blocked_handler(ConnectionPid, BlockHandler) ->
+ amqp_gen_connection:register_blocked_handler(ConnectionPid, BlockHandler).
+
+-spec update_secret(pid(), term(), binary()) ->
+ {'ok', rabbit_types:auth_user()} |
+ {'refused', string(), [any()]} |
+ {'error', any()}.
+
+update_secret(ConnectionPid, NewSecret, Reason) ->
+ Update = #'connection.update_secret'{new_secret = NewSecret,
+ reason = Reason},
+ amqp_gen_connection:update_secret(ConnectionPid, Update).
+
+%%---------------------------------------------------------------------------
+%% Other functions
+%%---------------------------------------------------------------------------
+
+%% @spec (Code) -> atom()
+%% where
+%% Code = integer()
+%% @doc Returns a descriptive atom corresponding to the given AMQP
+%% error code.
+error_atom(Code) -> ?PROTOCOL:amqp_exception(Code).
+
+%% @spec (ConnectionPid, Items) -> ResultList
+%% where
+%% ConnectionPid = pid()
+%% Items = [Item]
+%% ResultList = [{Item, Result}]
+%% Item = atom()
+%% Result = term()
+%% @doc Returns information about the connection, as specified by the Items
+%% list. Item may be any atom returned by info_keys/1:
+%%<ul>
+%%<li>type - returns the type of the connection (network or direct)</li>
+%%<li>server_properties - returns the server_properties fields sent by the
+%% server while establishing the connection</li>
+%%<li>is_closing - returns true if the connection is in the process of closing
+%% and false otherwise</li>
+%%<li>amqp_params - returns the #amqp_params{} structure used to start the
+%% connection</li>
+%%<li>num_channels - returns the number of channels currently open under the
+%% connection (excluding channel 0)</li>
+%%<li>channel_max - returns the channel_max value negotiated with the
+%% server</li>
+%%<li>heartbeat - returns the heartbeat value negotiated with the server
+%% (only for the network connection)</li>
+%%<li>frame_max - returns the frame_max value negotiated with the
+%% server (only for the network connection)</li>
+%%<li>sock - returns the socket for the network connection (for use with
+%% e.g. inet:sockname/1) (only for the network connection)</li>
+%%<li>any other value - throws an exception</li>
+%%</ul>
+info(ConnectionPid, Items) ->
+ amqp_gen_connection:info(ConnectionPid, Items).
+
+%% @spec (ConnectionPid) -> Items
+%% where
+%% ConnectionPid = pid()
+%% Items = [Item]
+%% Item = atom()
+%% @doc Returns a list of atoms that can be used in conjunction with info/2.
+%% Note that the list differs from a type of connection to another (network vs.
+%% direct). Use info_keys/0 to get a list of info keys that can be used for
+%% any connection.
+info_keys(ConnectionPid) ->
+ amqp_gen_connection:info_keys(ConnectionPid).
+
+%% @spec () -> Items
+%% where
+%% Items = [Item]
+%% Item = atom()
+%% @doc Returns a list of atoms that can be used in conjunction with info/2.
+%% These are general info keys, which can be used in any type of connection.
+%% Other info keys may exist for a specific type. To get the full list of
+%% atoms that can be used for a certain connection, use info_keys/1.
+info_keys() ->
+ amqp_gen_connection:info_keys().
+
+%% @doc Takes a socket and a protocol, returns an #amqp_adapter_info{}
+%% based on the socket for the protocol given.
+socket_adapter_info(Sock, Protocol) ->
+ amqp_direct_connection:socket_adapter_info(Sock, Protocol).
+
+%% @spec (ConnectionPid) -> ConnectionName
+%% where
+%% ConnectionPid = pid()
+%% ConnectionName = binary()
+%% @doc Returns user specified connection name from client properties
+connection_name(ConnectionPid) ->
+ ClientProperties = case info(ConnectionPid, [amqp_params]) of
+ [{_, #amqp_params_network{client_properties = Props}}] -> Props;
+ [{_, #amqp_params_direct{client_properties = Props}}] -> Props
+ end,
+ case lists:keyfind(<<"connection_name">>, 1, ClientProperties) of
+ {<<"connection_name">>, _, ConnName} -> ConnName;
+ false -> undefined
+ end.
diff --git a/deps/amqp_client/src/amqp_connection_sup.erl b/deps/amqp_client/src/amqp_connection_sup.erl
new file mode 100644
index 0000000000..b71fb54fd4
--- /dev/null
+++ b/deps/amqp_client/src/amqp_connection_sup.erl
@@ -0,0 +1,41 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+%% @private
+-module(amqp_connection_sup).
+
+-include("amqp_client.hrl").
+
+-behaviour(supervisor2).
+
+-export([start_link/1]).
+-export([init/1]).
+
+%%---------------------------------------------------------------------------
+%% Interface
+%%---------------------------------------------------------------------------
+
+start_link(AMQPParams) ->
+ {ok, Sup} = supervisor2:start_link(?MODULE, []),
+ {ok, TypeSup} = supervisor2:start_child(
+ Sup, {connection_type_sup,
+ {amqp_connection_type_sup, start_link, []},
+ transient, ?SUPERVISOR_WAIT, supervisor,
+ [amqp_connection_type_sup]}),
+ {ok, Connection} = supervisor2:start_child(
+ Sup, {connection, {amqp_gen_connection, start_link,
+ [TypeSup, AMQPParams]},
+ intrinsic, brutal_kill, worker,
+ [amqp_gen_connection]}),
+ {ok, Sup, Connection}.
+
+%%---------------------------------------------------------------------------
+%% supervisor2 callbacks
+%%---------------------------------------------------------------------------
+
+init([]) ->
+ {ok, {{one_for_all, 0, 1}, []}}.
diff --git a/deps/amqp_client/src/amqp_connection_type_sup.erl b/deps/amqp_client/src/amqp_connection_type_sup.erl
new file mode 100644
index 0000000000..f67dc56836
--- /dev/null
+++ b/deps/amqp_client/src/amqp_connection_type_sup.erl
@@ -0,0 +1,91 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+%% @private
+-module(amqp_connection_type_sup).
+
+-include("amqp_client_internal.hrl").
+
+-behaviour(supervisor2).
+
+-export([start_link/0, start_infrastructure_fun/3, type_module/1]).
+
+-export([init/1]).
+
+%%---------------------------------------------------------------------------
+%% Interface
+%%---------------------------------------------------------------------------
+
+start_link() ->
+ supervisor2:start_link(?MODULE, []).
+
+type_module(#amqp_params_direct{}) -> {direct, amqp_direct_connection};
+type_module(#amqp_params_network{}) -> {network, amqp_network_connection}.
+
+%%---------------------------------------------------------------------------
+
+start_channels_manager(Sup, Conn, ConnName, Type) ->
+ {ok, ChSupSup} = supervisor2:start_child(
+ Sup,
+ {channel_sup_sup, {amqp_channel_sup_sup, start_link,
+ [Type, Conn, ConnName]},
+ intrinsic, ?SUPERVISOR_WAIT, supervisor,
+ [amqp_channel_sup_sup]}),
+ {ok, _} = supervisor2:start_child(
+ Sup,
+ {channels_manager, {amqp_channels_manager, start_link,
+ [Conn, ConnName, ChSupSup]},
+ transient, ?WORKER_WAIT, worker, [amqp_channels_manager]}).
+
+start_infrastructure_fun(Sup, Conn, network) ->
+ fun (Sock, ConnName) ->
+ {ok, ChMgr} = start_channels_manager(Sup, Conn, ConnName, network),
+ {ok, AState} = rabbit_command_assembler:init(?PROTOCOL),
+ {ok, GCThreshold} = application:get_env(amqp_client, writer_gc_threshold),
+ {ok, Writer} =
+ supervisor2:start_child(
+ Sup,
+ {writer,
+ {rabbit_writer, start_link,
+ [Sock, 0, ?FRAME_MIN_SIZE, ?PROTOCOL, Conn, ConnName,
+ false, GCThreshold]},
+ transient, ?WORKER_WAIT, worker, [rabbit_writer]}),
+ {ok, Reader} =
+ supervisor2:start_child(
+ Sup,
+ {main_reader, {amqp_main_reader, start_link,
+ [Sock, Conn, ChMgr, AState, ConnName]},
+ transient, ?WORKER_WAIT, worker, [amqp_main_reader]}),
+ case rabbit_net:controlling_process(Sock, Reader) of
+ ok ->
+ case amqp_main_reader:post_init(Reader) of
+ ok ->
+ {ok, ChMgr, Writer};
+ {error, Reason} ->
+ {error, Reason}
+ end;
+ {error, Reason} ->
+ {error, Reason}
+ end
+ end;
+start_infrastructure_fun(Sup, Conn, direct) ->
+ fun (ConnName) ->
+ {ok, ChMgr} = start_channels_manager(Sup, Conn, ConnName, direct),
+ {ok, Collector} =
+ supervisor2:start_child(
+ Sup,
+ {collector, {rabbit_queue_collector, start_link, [ConnName]},
+ transient, ?WORKER_WAIT, worker, [rabbit_queue_collector]}),
+ {ok, ChMgr, Collector}
+ end.
+
+%%---------------------------------------------------------------------------
+%% supervisor2 callbacks
+%%---------------------------------------------------------------------------
+
+init([]) ->
+ {ok, {{one_for_all, 0, 1}, []}}.
diff --git a/deps/amqp_client/src/amqp_direct_connection.erl b/deps/amqp_client/src/amqp_direct_connection.erl
new file mode 100644
index 0000000000..a07c67074e
--- /dev/null
+++ b/deps/amqp_client/src/amqp_direct_connection.erl
@@ -0,0 +1,232 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+%% @private
+-module(amqp_direct_connection).
+
+-include("amqp_client_internal.hrl").
+
+-behaviour(amqp_gen_connection).
+
+-export([server_close/3]).
+
+-export([init/0, terminate/2, connect/4, do/2, open_channel_args/1, i/2,
+ info_keys/0, handle_message/2, closing/3, channels_terminated/1]).
+
+-export([socket_adapter_info/2]).
+
+-record(state, {node,
+ user,
+ vhost,
+ params,
+ adapter_info,
+ collector,
+ closing_reason, %% undefined | Reason
+ connected_at
+ }).
+
+-define(INFO_KEYS, [type]).
+
+-define(CREATION_EVENT_KEYS, [pid, protocol, host, port, name,
+ peer_host, peer_port,
+ user, vhost, client_properties, type,
+ connected_at, node, user_who_performed_action]).
+
+%%---------------------------------------------------------------------------
+
+%% amqp_connection:close() logically closes from the client end. We may
+%% want to close from the server end.
+server_close(ConnectionPid, Code, Text) ->
+ Close = #'connection.close'{reply_text = Text,
+ reply_code = Code,
+ class_id = 0,
+ method_id = 0},
+ amqp_gen_connection:server_close(ConnectionPid, Close).
+
+init() ->
+ {ok, #state{}}.
+
+open_channel_args(#state{node = Node,
+ user = User,
+ vhost = VHost,
+ collector = Collector,
+ params = Params}) ->
+ [self(), Node, User, VHost, Collector, Params].
+
+do(_Method, _State) ->
+ ok.
+
+handle_message({force_event_refresh, Ref}, State = #state{node = Node}) ->
+ rpc:call(Node, rabbit_event, notify,
+ [connection_created, connection_info(State), Ref]),
+ {ok, State};
+handle_message(closing_timeout, State = #state{closing_reason = Reason}) ->
+ {stop, {closing_timeout, Reason}, State};
+handle_message({'DOWN', _MRef, process, _ConnSup, shutdown}, State) ->
+ {stop, {shutdown, node_down}, State};
+handle_message({'DOWN', _MRef, process, _ConnSup, Reason}, State) ->
+ {stop, {remote_node_down, Reason}, State};
+handle_message({'EXIT', Pid, Reason}, State) ->
+ {stop, rabbit_misc:format("stopping because dependent process ~p died: ~p", [Pid, Reason]), State};
+handle_message(Msg, State) ->
+ {stop, {unexpected_msg, Msg}, State}.
+
+closing(_ChannelCloseType, Reason, State) ->
+ {ok, State#state{closing_reason = Reason}}.
+
+channels_terminated(State = #state{closing_reason = Reason,
+ collector = Collector}) ->
+ rabbit_queue_collector:delete_all(Collector),
+ {stop, {shutdown, Reason}, State}.
+
+terminate(_Reason, #state{node = Node} = State) ->
+ rpc:call(Node, rabbit_direct, disconnect,
+ [self(), [{pid, self()},
+ {node, Node},
+ {name, i(name, State)}]]),
+ ok.
+
+i(type, _State) -> direct;
+i(pid, _State) -> self();
+
+%% Mandatory connection parameters
+
+i(node, #state{node = N}) -> N;
+i(user, #state{params = P}) -> P#amqp_params_direct.username;
+i(user_who_performed_action, St) -> i(user, St);
+i(vhost, #state{params = P}) -> P#amqp_params_direct.virtual_host;
+i(client_properties, #state{params = P}) ->
+ P#amqp_params_direct.client_properties;
+i(connected_at, #state{connected_at = T}) -> T;
+
+%%
+%% Optional adapter info
+%%
+
+%% adapter_info can be undefined e.g. when we were
+%% not granted access to a vhost
+i(_Key, #state{adapter_info = undefined}) -> unknown;
+i(protocol, #state{adapter_info = I}) -> I#amqp_adapter_info.protocol;
+i(host, #state{adapter_info = I}) -> I#amqp_adapter_info.host;
+i(port, #state{adapter_info = I}) -> I#amqp_adapter_info.port;
+i(peer_host, #state{adapter_info = I}) -> I#amqp_adapter_info.peer_host;
+i(peer_port, #state{adapter_info = I}) -> I#amqp_adapter_info.peer_port;
+i(name, #state{adapter_info = I}) -> I#amqp_adapter_info.name;
+i(internal_user, #state{user = U}) -> U;
+i(Item, _State) -> throw({bad_argument, Item}).
+
+info_keys() ->
+ ?INFO_KEYS.
+
+infos(Items, State) ->
+ [{Item, i(Item, State)} || Item <- Items].
+
+connection_info(State = #state{adapter_info = I}) ->
+ infos(?CREATION_EVENT_KEYS, State) ++ I#amqp_adapter_info.additional_info.
+
+connect(Params = #amqp_params_direct{username = Username,
+ password = Password,
+ node = Node,
+ adapter_info = Info,
+ virtual_host = VHost},
+ SIF, _TypeSup, State) ->
+ State1 = State#state{node = Node,
+ vhost = VHost,
+ params = Params,
+ adapter_info = ensure_adapter_info(Info),
+ connected_at =
+ os:system_time(milli_seconds)},
+ DecryptedPassword = credentials_obfuscation:decrypt(Password),
+ case rpc:call(Node, rabbit_direct, connect,
+ [{Username, DecryptedPassword}, VHost, ?PROTOCOL, self(),
+ connection_info(State1)]) of
+ {ok, {User, ServerProperties}} ->
+ {ok, ChMgr, Collector} = SIF(i(name, State1)),
+ State2 = State1#state{user = User,
+ collector = Collector},
+ %% There's no real connection-level process on the remote
+ %% node for us to monitor or link to, but we want to
+ %% detect connection death if the remote node goes down
+ %% when there are no channels. So we monitor the
+ %% supervisor; that way we find out if the node goes down
+ %% or the rabbit app stops.
+ erlang:monitor(process, {rabbit_direct_client_sup, Node}),
+ {ok, {ServerProperties, 0, ChMgr, State2}};
+ {error, _} = E ->
+ E;
+ {badrpc, nodedown} ->
+ {error, {nodedown, Node}}
+ end.
+
+ensure_adapter_info(none) ->
+ ensure_adapter_info(#amqp_adapter_info{});
+
+ensure_adapter_info(A = #amqp_adapter_info{protocol = unknown}) ->
+ ensure_adapter_info(A#amqp_adapter_info{
+ protocol = {'Direct', ?PROTOCOL:version()}});
+
+ensure_adapter_info(A = #amqp_adapter_info{name = unknown}) ->
+ Name = list_to_binary(rabbit_misc:pid_to_string(self())),
+ ensure_adapter_info(A#amqp_adapter_info{name = Name});
+
+ensure_adapter_info(Info) -> Info.
+
+socket_adapter_info(Sock, Protocol) ->
+ {PeerHost, PeerPort, Host, Port} =
+ case rabbit_net:socket_ends(Sock, inbound) of
+ {ok, Res} -> Res;
+ _ -> {unknown, unknown, unknown, unknown}
+ end,
+ Name = case rabbit_net:connection_string(Sock, inbound) of
+ {ok, Res1} -> Res1;
+ _Error -> "(unknown)"
+ end,
+ #amqp_adapter_info{protocol = Protocol,
+ name = list_to_binary(Name),
+ host = Host,
+ port = Port,
+ peer_host = PeerHost,
+ peer_port = PeerPort,
+ additional_info = maybe_ssl_info(Sock)}.
+
+maybe_ssl_info(Sock) ->
+ RealSocket = rabbit_net:unwrap_socket(Sock),
+ case rabbit_net:is_ssl(RealSocket) of
+ true -> [{ssl, true}] ++ ssl_info(RealSocket) ++ ssl_cert_info(RealSocket);
+ false -> [{ssl, false}]
+ end.
+
+ssl_info(Sock) ->
+ {Protocol, KeyExchange, Cipher, Hash} =
+ case rabbit_net:ssl_info(Sock) of
+ {ok, Infos} ->
+ {_, P} = lists:keyfind(protocol, 1, Infos),
+ #{cipher := C,
+ key_exchange := K,
+ mac := H} = proplists:get_value(
+ selected_cipher_suite, Infos),
+ {P, K, C, H};
+ _ ->
+ {unknown, unknown, unknown, unknown}
+ end,
+ [{ssl_protocol, Protocol},
+ {ssl_key_exchange, KeyExchange},
+ {ssl_cipher, Cipher},
+ {ssl_hash, Hash}].
+
+ssl_cert_info(Sock) ->
+ case rabbit_net:peercert(Sock) of
+ {ok, Cert} ->
+ [{peer_cert_issuer, list_to_binary(
+ rabbit_cert_info:issuer(Cert))},
+ {peer_cert_subject, list_to_binary(
+ rabbit_cert_info:subject(Cert))},
+ {peer_cert_validity, list_to_binary(
+ rabbit_cert_info:validity(Cert))}];
+ _ ->
+ []
+ end.
diff --git a/deps/amqp_client/src/amqp_direct_consumer.erl b/deps/amqp_client/src/amqp_direct_consumer.erl
new file mode 100644
index 0000000000..74517a03c8
--- /dev/null
+++ b/deps/amqp_client/src/amqp_direct_consumer.erl
@@ -0,0 +1,103 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2011-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+%% @doc This module is an implementation of the amqp_gen_consumer
+%% behaviour and can be used as part of the Consumer parameter when
+%% opening AMQP channels.
+%% <br/>
+%% <br/>
+%% The Consumer parameter for this implementation is {{@module},
+%% [ConsumerPid]@}, where ConsumerPid is a process that will receive
+%% queue subscription-related messages.<br/>
+%% <br/>
+%% This consumer implementation causes the channel to send to the
+%% ConsumerPid all basic.consume, basic.consume_ok, basic.cancel,
+%% basic.cancel_ok and basic.deliver messages received from the
+%% server.
+%% <br/>
+%% <br/>
+%% In addition, this consumer implementation monitors the ConsumerPid
+%% and exits with the same shutdown reason when it dies. 'DOWN'
+%% messages from other sources are passed to ConsumerPid.
+%% <br/>
+%% Warning! It is not recommended to rely on a consumer on killing off the
+%% channel (through the exit signal). That may cause messages to get lost.
+%% Always use amqp_channel:close/{1,3} for a clean shut down.<br/>
+%% <br/>
+%% This module has no public functions.
+-module(amqp_direct_consumer).
+
+-include("amqp_gen_consumer_spec.hrl").
+
+-behaviour(amqp_gen_consumer).
+
+-export([init/1, handle_consume_ok/3, handle_consume/3, handle_cancel_ok/3,
+ handle_cancel/2, handle_server_cancel/2,
+ handle_deliver/3, handle_deliver/4,
+ handle_info/2, handle_call/3, terminate/2]).
+
+%%---------------------------------------------------------------------------
+%% amqp_gen_consumer callbacks
+%%---------------------------------------------------------------------------
+
+%% @private
+init([ConsumerPid]) ->
+ erlang:monitor(process, ConsumerPid),
+ {ok, ConsumerPid}.
+
+%% @private
+handle_consume(M, A, C) ->
+ C ! {M, A},
+ {ok, C}.
+
+%% @private
+handle_consume_ok(M, _, C) ->
+ C ! M,
+ {ok, C}.
+
+%% @private
+handle_cancel(M, C) ->
+ C ! M,
+ {ok, C}.
+
+%% @private
+handle_cancel_ok(M, _, C) ->
+ C ! M,
+ {ok, C}.
+
+%% @private
+handle_server_cancel(M, C) ->
+ C ! {server_cancel, M},
+ {ok, C}.
+
+%% @private
+handle_deliver(M, A, C) ->
+ C ! {M, A},
+ {ok, C}.
+handle_deliver(M, A, DeliveryCtx, C) ->
+ C ! {M, A, DeliveryCtx},
+ {ok, C}.
+
+
+%% @private
+handle_info({'DOWN', _MRef, process, C, normal}, C) ->
+ %% The channel was closed.
+ {ok, C};
+handle_info({'DOWN', _MRef, process, C, Info}, C) ->
+ {error, {consumer_died, Info}, C};
+handle_info({'DOWN', MRef, process, Pid, Info}, C) ->
+ C ! {'DOWN', MRef, process, Pid, Info},
+ {ok, C}.
+
+%% @private
+handle_call(M, A, C) ->
+ C ! {M, A},
+ {reply, ok, C}.
+
+%% @private
+terminate(_Reason, C) ->
+ C.
diff --git a/deps/amqp_client/src/amqp_gen_connection.erl b/deps/amqp_client/src/amqp_gen_connection.erl
new file mode 100644
index 0000000000..5c826a5b5f
--- /dev/null
+++ b/deps/amqp_client/src/amqp_gen_connection.erl
@@ -0,0 +1,387 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+%% @private
+-module(amqp_gen_connection).
+
+-include("amqp_client_internal.hrl").
+
+-behaviour(gen_server).
+
+-export([start_link/2, connect/1, open_channel/3, hard_error_in_channel/3,
+ channel_internal_error/3, server_misbehaved/2, channels_terminated/1,
+ close/3, server_close/2, info/2, info_keys/0, info_keys/1,
+ register_blocked_handler/2, update_secret/2]).
+-export([behaviour_info/1]).
+-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
+ handle_info/2]).
+
+-define(INFO_KEYS, [server_properties, is_closing, amqp_params, num_channels,
+ channel_max]).
+
+-record(state, {module,
+ module_state,
+ channels_manager,
+ amqp_params,
+ channel_max,
+ server_properties,
+ %% connection.block, connection.unblock handler
+ block_handler,
+ closing = false %% #closing{} | false
+ }).
+
+-record(closing, {reason,
+ close,
+ from = none}).
+
+%%---------------------------------------------------------------------------
+%% Interface
+%%---------------------------------------------------------------------------
+
+start_link(TypeSup, AMQPParams) ->
+ gen_server:start_link(?MODULE, {TypeSup, AMQPParams}, []).
+
+connect(Pid) ->
+ gen_server:call(Pid, connect, amqp_util:call_timeout()).
+
+open_channel(Pid, ProposedNumber, Consumer) ->
+ case gen_server:call(Pid,
+ {command, {open_channel, ProposedNumber, Consumer}},
+ amqp_util:call_timeout()) of
+ {ok, ChannelPid} -> ok = amqp_channel:open(ChannelPid),
+ {ok, ChannelPid};
+ Error -> Error
+ end.
+
+hard_error_in_channel(Pid, ChannelPid, Reason) ->
+ gen_server:cast(Pid, {hard_error_in_channel, ChannelPid, Reason}).
+
+channel_internal_error(Pid, ChannelPid, Reason) ->
+ gen_server:cast(Pid, {channel_internal_error, ChannelPid, Reason}).
+
+server_misbehaved(Pid, AmqpError) ->
+ gen_server:cast(Pid, {server_misbehaved, AmqpError}).
+
+channels_terminated(Pid) ->
+ gen_server:cast(Pid, channels_terminated).
+
+close(Pid, Close, Timeout) ->
+ gen_server:call(Pid, {command, {close, Close, Timeout}}, amqp_util:call_timeout()).
+
+server_close(Pid, Close) ->
+ gen_server:cast(Pid, {server_close, Close}).
+
+update_secret(Pid, Method) ->
+ gen_server:call(Pid, {command, {update_secret, Method}}, amqp_util:call_timeout()).
+
+info(Pid, Items) ->
+ gen_server:call(Pid, {info, Items}, amqp_util:call_timeout()).
+
+info_keys() ->
+ ?INFO_KEYS.
+
+info_keys(Pid) ->
+ gen_server:call(Pid, info_keys, amqp_util:call_timeout()).
+
+%%---------------------------------------------------------------------------
+%% Behaviour
+%%---------------------------------------------------------------------------
+
+behaviour_info(callbacks) ->
+ [
+ %% init() -> {ok, InitialState}
+ {init, 0},
+
+ %% terminate(Reason, FinalState) -> Ignored
+ {terminate, 2},
+
+ %% connect(AmqpParams, SIF, TypeSup, State) ->
+ %% {ok, ConnectParams} | {closing, ConnectParams, AmqpError, Reply} |
+ %% {error, Error}
+ %% where
+ %% ConnectParams = {ServerProperties, ChannelMax, ChMgr, NewState}
+ {connect, 4},
+
+ %% do(Method, State) -> Ignored
+ {do, 2},
+
+ %% open_channel_args(State) -> OpenChannelArgs
+ {open_channel_args, 1},
+
+ %% i(InfoItem, State) -> Info
+ {i, 2},
+
+ %% info_keys() -> [InfoItem]
+ {info_keys, 0},
+
+ %% CallbackReply = {ok, NewState} | {stop, Reason, FinalState}
+
+ %% handle_message(Message, State) -> CallbackReply
+ {handle_message, 2},
+
+ %% closing(flush|abrupt, Reason, State) -> CallbackReply
+ {closing, 3},
+
+ %% channels_terminated(State) -> CallbackReply
+ {channels_terminated, 1}
+ ];
+behaviour_info(_Other) ->
+ undefined.
+
+callback(Function, Params, State = #state{module = Mod,
+ module_state = MState}) ->
+ case erlang:apply(Mod, Function, Params ++ [MState]) of
+ {ok, NewMState} -> {noreply,
+ State#state{module_state = NewMState}};
+ {stop, Reason, NewMState} -> {stop, Reason,
+ State#state{module_state = NewMState}}
+ end.
+
+%%---------------------------------------------------------------------------
+%% gen_server callbacks
+%%---------------------------------------------------------------------------
+
+init({TypeSup, AMQPParams}) ->
+ %% Trapping exits since we need to make sure that the `terminate/2' is
+ %% called in the case of direct connection (it does not matter for a network
+ %% connection). See bug25116.
+ process_flag(trap_exit, true),
+ %% connect() has to be called first, so we can use a special state here
+ {ok, {TypeSup, AMQPParams}}.
+
+handle_call(connect, _From, {TypeSup, AMQPParams}) ->
+ {Type, Mod} = amqp_connection_type_sup:type_module(AMQPParams),
+ {ok, MState} = Mod:init(),
+ SIF = amqp_connection_type_sup:start_infrastructure_fun(
+ TypeSup, self(), Type),
+ State = #state{module = Mod,
+ module_state = MState,
+ amqp_params = AMQPParams,
+ block_handler = none},
+ case Mod:connect(AMQPParams, SIF, TypeSup, MState) of
+ {ok, Params} ->
+ {reply, {ok, self()}, after_connect(Params, State)};
+ {closing, #amqp_error{name = access_refused} = AmqpError, Error} ->
+ {stop, {shutdown, AmqpError}, Error, State};
+ {closing, Params, #amqp_error{} = AmqpError, Error} ->
+ server_misbehaved(self(), AmqpError),
+ {reply, Error, after_connect(Params, State)};
+ {error, _} = Error ->
+ {stop, {shutdown, Error}, Error, State}
+ end;
+handle_call({command, Command}, From, State = #state{closing = false}) ->
+ handle_command(Command, From, State);
+handle_call({command, _Command}, _From, State) ->
+ {reply, closing, State};
+handle_call({info, Items}, _From, State) ->
+ {reply, [{Item, i(Item, State)} || Item <- Items], State};
+handle_call(info_keys, _From, State = #state{module = Mod}) ->
+ {reply, ?INFO_KEYS ++ Mod:info_keys(), State}.
+
+after_connect({ServerProperties, ChannelMax, ChMgr, NewMState}, State) ->
+ case ChannelMax of
+ 0 -> ok;
+ _ -> amqp_channels_manager:set_channel_max(ChMgr, ChannelMax)
+ end,
+ State1 = State#state{server_properties = ServerProperties,
+ channel_max = ChannelMax,
+ channels_manager = ChMgr,
+ module_state = NewMState},
+ rabbit_misc:store_proc_name(?MODULE, i(name, State1)),
+ State1.
+
+handle_cast({method, Method, none, noflow}, State) ->
+ handle_method(Method, State);
+handle_cast(channels_terminated, State) ->
+ handle_channels_terminated(State);
+handle_cast({hard_error_in_channel, _Pid, Reason}, State) ->
+ server_initiated_close(Reason, State);
+handle_cast({channel_internal_error, Pid, Reason}, State) ->
+ ?LOG_WARN("Connection (~p) closing: internal error in channel (~p): ~p~n",
+ [self(), Pid, Reason]),
+ internal_error(Pid, Reason, State);
+handle_cast({server_misbehaved, AmqpError}, State) ->
+ server_misbehaved_close(AmqpError, State);
+handle_cast({server_close, #'connection.close'{} = Close}, State) ->
+ server_initiated_close(Close, State);
+handle_cast({register_blocked_handler, HandlerPid}, State) ->
+ Ref = erlang:monitor(process, HandlerPid),
+ {noreply, State#state{block_handler = {HandlerPid, Ref}}}.
+
+%% @private
+handle_info({'DOWN', _, process, BlockHandler, Reason},
+ State = #state{block_handler = {BlockHandler, _Ref}}) ->
+ ?LOG_WARN("Connection (~p): Unregistering connection.{blocked,unblocked} handler ~p because it died. "
+ "Reason: ~p~n", [self(), BlockHandler, Reason]),
+ {noreply, State#state{block_handler = none}};
+handle_info({'EXIT', BlockHandler, Reason},
+ State = #state{block_handler = {BlockHandler, Ref}}) ->
+ ?LOG_WARN("Connection (~p): Unregistering connection.{blocked,unblocked} handler ~p because it died. "
+ "Reason: ~p~n", [self(), BlockHandler, Reason]),
+ erlang:demonitor(Ref, [flush]),
+ {noreply, State#state{block_handler = none}};
+%% propagate the exit to the module that will stop with a sensible reason logged
+handle_info({'EXIT', _Pid, _Reason} = Info, State) ->
+ callback(handle_message, [Info], State);
+handle_info(Info, State) ->
+ callback(handle_message, [Info], State).
+
+terminate(Reason, #state{module = Mod, module_state = MState}) ->
+ Mod:terminate(Reason, MState).
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%---------------------------------------------------------------------------
+%% Infos
+%%---------------------------------------------------------------------------
+
+i(server_properties, State) -> State#state.server_properties;
+i(is_closing, State) -> State#state.closing =/= false;
+i(amqp_params, State) -> State#state.amqp_params;
+i(channel_max, State) -> State#state.channel_max;
+i(num_channels, State) -> amqp_channels_manager:num_channels(
+ State#state.channels_manager);
+i(Item, #state{module = Mod, module_state = MState}) -> Mod:i(Item, MState).
+
+%%---------------------------------------------------------------------------
+%% connection.blocked, connection.unblocked
+%%---------------------------------------------------------------------------
+
+register_blocked_handler(Pid, HandlerPid) ->
+ gen_server:cast(Pid, {register_blocked_handler, HandlerPid}).
+
+%%---------------------------------------------------------------------------
+%% Command handling
+%%---------------------------------------------------------------------------
+
+handle_command({open_channel, ProposedNumber, Consumer}, _From,
+ State = #state{channels_manager = ChMgr,
+ module = Mod,
+ module_state = MState}) ->
+ {reply, amqp_channels_manager:open_channel(ChMgr, ProposedNumber, Consumer,
+ Mod:open_channel_args(MState)),
+ State};
+handle_command({close, #'connection.close'{} = Close, Timeout}, From, State) ->
+ app_initiated_close(Close, From, Timeout, State);
+handle_command({update_secret, #'connection.update_secret'{} = Method}, _From,
+ State = #state{module = Mod,
+ module_state = MState}) ->
+ {reply, Mod:do(Method, MState), State}.
+
+%%---------------------------------------------------------------------------
+%% Handling methods from broker
+%%---------------------------------------------------------------------------
+
+handle_method(#'connection.close'{} = Close, State) ->
+ server_initiated_close(Close, State);
+handle_method(#'connection.close_ok'{}, State = #state{closing = Closing}) ->
+ case Closing of #closing{from = none} -> ok;
+ #closing{from = From} -> gen_server:reply(From, ok)
+ end,
+ {stop, {shutdown, closing_to_reason(Closing)}, State};
+handle_method(#'connection.blocked'{} = Blocked, State = #state{block_handler = BlockHandler}) ->
+ case BlockHandler of none -> ok;
+ {Pid, _Ref} -> Pid ! Blocked
+ end,
+ {noreply, State};
+handle_method(#'connection.unblocked'{} = Unblocked, State = #state{block_handler = BlockHandler}) ->
+ case BlockHandler of none -> ok;
+ {Pid, _Ref} -> Pid ! Unblocked
+ end,
+ {noreply, State};
+handle_method(#'connection.update_secret_ok'{} = _Method, State) ->
+ {noreply, State};
+handle_method(Other, State) ->
+ server_misbehaved_close(#amqp_error{name = command_invalid,
+ explanation = "unexpected method on "
+ "channel 0",
+ method = element(1, Other)},
+ State).
+
+%%---------------------------------------------------------------------------
+%% Closing
+%%---------------------------------------------------------------------------
+
+app_initiated_close(Close, From, Timeout, State) ->
+ case Timeout of
+ infinity -> ok;
+ _ -> erlang:send_after(Timeout, self(), closing_timeout)
+ end,
+ set_closing_state(flush, #closing{reason = app_initiated_close,
+ close = Close,
+ from = From}, State).
+
+internal_error(Pid, Reason, State) ->
+ Str = list_to_binary(rabbit_misc:format("~p:~p", [Pid, Reason])),
+ Close = #'connection.close'{reply_text = Str,
+ reply_code = ?INTERNAL_ERROR,
+ class_id = 0,
+ method_id = 0},
+ set_closing_state(abrupt, #closing{reason = internal_error, close = Close},
+ State).
+
+server_initiated_close(Close, State) ->
+ ?LOG_WARN("Connection (~p) closing: received hard error ~p "
+ "from server~n", [self(), Close]),
+ set_closing_state(abrupt, #closing{reason = server_initiated_close,
+ close = Close}, State).
+
+server_misbehaved_close(AmqpError, State) ->
+ ?LOG_WARN("Connection (~p) closing: server misbehaved: ~p~n",
+ [self(), AmqpError]),
+ {0, Close} = rabbit_binary_generator:map_exception(0, AmqpError, ?PROTOCOL),
+ set_closing_state(abrupt, #closing{reason = server_misbehaved,
+ close = Close}, State).
+
+set_closing_state(ChannelCloseType, NewClosing,
+ State = #state{channels_manager = ChMgr,
+ closing = CurClosing}) ->
+ ResClosing =
+ case closing_priority(NewClosing) =< closing_priority(CurClosing) of
+ true -> NewClosing;
+ false -> CurClosing
+ end,
+ ClosingReason = closing_to_reason(ResClosing),
+ amqp_channels_manager:signal_connection_closing(ChMgr, ChannelCloseType,
+ ClosingReason),
+ callback(closing, [ChannelCloseType, ClosingReason],
+ State#state{closing = ResClosing}).
+
+closing_priority(false) -> 99;
+closing_priority(#closing{reason = app_initiated_close}) -> 4;
+closing_priority(#closing{reason = internal_error}) -> 3;
+closing_priority(#closing{reason = server_misbehaved}) -> 2;
+closing_priority(#closing{reason = server_initiated_close}) -> 1.
+
+closing_to_reason(#closing{close = #'connection.close'{reply_code = 200}}) ->
+ normal;
+closing_to_reason(#closing{reason = Reason,
+ close = #'connection.close'{reply_code = Code,
+ reply_text = Text}}) ->
+ {Reason, Code, Text};
+closing_to_reason(#closing{reason = Reason,
+ close = {Reason, _Code, _Text} = Close}) ->
+ Close.
+
+handle_channels_terminated(State = #state{closing = Closing,
+ module = Mod,
+ module_state = MState}) ->
+ #closing{reason = Reason, close = Close, from = From} = Closing,
+ case Reason of
+ server_initiated_close ->
+ Mod:do(#'connection.close_ok'{}, MState);
+ _ ->
+ Mod:do(Close, MState)
+ end,
+ case callback(channels_terminated, [], State) of
+ {stop, _, _} = Stop -> case From of none -> ok;
+ _ -> gen_server:reply(From, ok)
+ end,
+ Stop;
+ Other -> Other
+ end.
diff --git a/deps/amqp_client/src/amqp_gen_consumer.erl b/deps/amqp_client/src/amqp_gen_consumer.erl
new file mode 100644
index 0000000000..9caea78d8a
--- /dev/null
+++ b/deps/amqp_client/src/amqp_gen_consumer.erl
@@ -0,0 +1,284 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2011-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+%% @doc A behaviour module for implementing consumers for
+%% amqp_channel. To specify a consumer implementation for a channel,
+%% use amqp_connection:open_channel/{2,3}.
+%% <br/>
+%% All callbacks are called within the gen_consumer process. <br/>
+%% <br/>
+%% See comments in amqp_gen_consumer.erl source file for documentation
+%% on the callback functions.
+%% <br/>
+%% Note that making calls to the channel from the callback module will
+%% result in deadlock.
+-module(amqp_gen_consumer).
+
+-include("amqp_client.hrl").
+
+-behaviour(gen_server2).
+
+-export([start_link/3, call_consumer/2, call_consumer/3, call_consumer/4]).
+-export([behaviour_info/1]).
+-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
+ handle_info/2, prioritise_info/3]).
+
+-record(state, {module,
+ module_state}).
+
+%%---------------------------------------------------------------------------
+%% Interface
+%%---------------------------------------------------------------------------
+
+%% @type ok_error() = {ok, state()} | {error, reason(), state()}.
+%% Denotes a successful or an error return from a consumer module call.
+
+start_link(ConsumerModule, ExtraParams, Identity) ->
+ gen_server2:start_link(
+ ?MODULE, [ConsumerModule, ExtraParams, Identity], []).
+
+%% @spec (Consumer, Msg) -> ok
+%% where
+%% Consumer = pid()
+%% Msg = any()
+%%
+%% @doc This function is used to perform arbitrary calls into the
+%% consumer module.
+call_consumer(Pid, Msg) ->
+ gen_server2:call(Pid, {consumer_call, Msg}, amqp_util:call_timeout()).
+
+%% @spec (Consumer, Method, Args) -> ok
+%% where
+%% Consumer = pid()
+%% Method = amqp_method()
+%% Args = any()
+%%
+%% @doc This function is used by amqp_channel to forward received
+%% methods and deliveries to the consumer module.
+call_consumer(Pid, Method, Args) ->
+ gen_server2:call(Pid, {consumer_call, Method, Args}, amqp_util:call_timeout()).
+
+call_consumer(Pid, Method, Args, DeliveryCtx) ->
+ gen_server2:call(Pid, {consumer_call, Method, Args, DeliveryCtx}, amqp_util:call_timeout()).
+
+%%---------------------------------------------------------------------------
+%% Behaviour
+%%---------------------------------------------------------------------------
+
+%% @private
+behaviour_info(callbacks) ->
+ [
+ %% init(Args) -> {ok, InitialState} | {stop, Reason} | ignore
+ %% where
+ %% Args = [any()]
+ %% InitialState = state()
+ %% Reason = term()
+ %%
+ %% This callback is invoked by the channel, when it starts
+ %% up. Use it to initialize the state of the consumer. In case of
+ %% an error, return {stop, Reason} or ignore.
+ {init, 1},
+
+ %% handle_consume(Consume, Sender, State) -> ok_error()
+ %% where
+ %% Consume = #'basic.consume'{}
+ %% Sender = pid()
+ %% State = state()
+ %%
+ %% This callback is invoked by the channel before a basic.consume
+ %% is sent to the server.
+ {handle_consume, 3},
+
+ %% handle_consume_ok(ConsumeOk, Consume, State) -> ok_error()
+ %% where
+ %% ConsumeOk = #'basic.consume_ok'{}
+ %% Consume = #'basic.consume'{}
+ %% State = state()
+ %%
+ %% This callback is invoked by the channel every time a
+ %% basic.consume_ok is received from the server. Consume is the original
+ %% method sent out to the server - it can be used to associate the
+ %% call with the response.
+ {handle_consume_ok, 3},
+
+ %% handle_cancel(Cancel, State) -> ok_error()
+ %% where
+ %% Cancel = #'basic.cancel'{}
+ %% State = state()
+ %%
+ %% This callback is invoked by the channel every time a basic.cancel
+ %% is sent to the server.
+ {handle_cancel, 2},
+
+ %% handle_cancel_ok(CancelOk, Cancel, State) -> ok_error()
+ %% where
+ %% CancelOk = #'basic.cancel_ok'{}
+ %% Cancel = #'basic.cancel'{}
+ %% State = state()
+ %%
+ %% This callback is invoked by the channel every time a basic.cancel_ok
+ %% is received from the server.
+ {handle_cancel_ok, 3},
+
+ %% handle_server_cancel(Cancel, State) -> ok_error()
+ %% where
+ %% Cancel = #'basic.cancel'{}
+ %% State = state()
+ %%
+ %% This callback is invoked by the channel every time a basic.cancel
+ %% is received from the server.
+ {handle_server_cancel, 2},
+
+ %% handle_deliver(Deliver, Message, State) -> ok_error()
+ %% where
+ %% Deliver = #'basic.deliver'{}
+ %% Message = #amqp_msg{}
+ %% State = state()
+ %%
+ %% This callback is invoked by the channel every time a basic.deliver
+ %% is received from the server.
+ {handle_deliver, 3},
+
+ %% handle_deliver(Deliver, Message,
+ %% DeliveryCtx, State) -> ok_error()
+ %% where
+ %% Deliver = #'basic.deliver'{}
+ %% Message = #amqp_msg{}
+ %% DeliveryCtx = {pid(), pid(), pid()}
+ %% State = state()
+ %%
+ %% This callback is invoked by the channel every time a basic.deliver
+ %% is received from the server. Only relevant for channels that use
+ %% direct client connection and manual flow control.
+ {handle_deliver, 4},
+
+ %% handle_info(Info, State) -> ok_error()
+ %% where
+ %% Info = any()
+ %% State = state()
+ %%
+ %% This callback is invoked the consumer process receives a
+ %% message.
+ {handle_info, 2},
+
+ %% handle_call(Msg, From, State) -> {reply, Reply, NewState} |
+ %% {noreply, NewState} |
+ %% {error, Reason, NewState}
+ %% where
+ %% Msg = any()
+ %% From = any()
+ %% Reply = any()
+ %% State = state()
+ %% NewState = state()
+ %%
+ %% This callback is invoked by the channel when calling
+ %% amqp_channel:call_consumer/2. Reply is the term that
+ %% amqp_channel:call_consumer/2 will return. If the callback
+ %% returns {noreply, _}, then the caller to
+ %% amqp_channel:call_consumer/2 and the channel remain blocked
+ %% until gen_server2:reply/2 is used with the provided From as
+ %% the first argument.
+ {handle_call, 3},
+
+ %% terminate(Reason, State) -> any()
+ %% where
+ %% Reason = any()
+ %% State = state()
+ %%
+ %% This callback is invoked by the channel after it has shut down and
+ %% just before its process exits.
+ {terminate, 2}
+ ];
+behaviour_info(_Other) ->
+ undefined.
+
+%%---------------------------------------------------------------------------
+%% gen_server2 callbacks
+%%---------------------------------------------------------------------------
+
+init([ConsumerModule, ExtraParams, Identity]) ->
+ ?store_proc_name(Identity),
+ case ConsumerModule:init(ExtraParams) of
+ {ok, MState} ->
+ {ok, #state{module = ConsumerModule, module_state = MState}};
+ {stop, Reason} ->
+ {stop, Reason};
+ ignore ->
+ ignore
+ end.
+
+prioritise_info({'DOWN', _MRef, process, _Pid, _Info}, _Len, _State) -> 1;
+prioritise_info(_, _Len, _State) -> 0.
+
+consumer_call_reply(Return, State) ->
+ case Return of
+ {ok, NewMState} ->
+ {reply, ok, State#state{module_state = NewMState}};
+ {error, Reason, NewMState} ->
+ {stop, {error, Reason}, {error, Reason},
+ State#state{module_state = NewMState}}
+ end.
+
+handle_call({consumer_call, Msg}, From,
+ State = #state{module = ConsumerModule,
+ module_state = MState}) ->
+ case ConsumerModule:handle_call(Msg, From, MState) of
+ {noreply, NewMState} ->
+ {noreply, State#state{module_state = NewMState}};
+ {reply, Reply, NewMState} ->
+ {reply, Reply, State#state{module_state = NewMState}};
+ {error, Reason, NewMState} ->
+ {stop, {error, Reason}, {error, Reason},
+ State#state{module_state = NewMState}}
+ end;
+handle_call({consumer_call, Method, Args}, _From,
+ State = #state{module = ConsumerModule,
+ module_state = MState}) ->
+ Return =
+ case Method of
+ #'basic.consume'{} ->
+ ConsumerModule:handle_consume(Method, Args, MState);
+ #'basic.consume_ok'{} ->
+ ConsumerModule:handle_consume_ok(Method, Args, MState);
+ #'basic.cancel'{} ->
+ case Args of
+ none -> %% server-sent
+ ConsumerModule:handle_server_cancel(Method, MState);
+ Pid when is_pid(Pid) -> %% client-sent
+ ConsumerModule:handle_cancel(Method, MState)
+ end;
+ #'basic.cancel_ok'{} ->
+ ConsumerModule:handle_cancel_ok(Method, Args, MState);
+ #'basic.deliver'{} ->
+ ConsumerModule:handle_deliver(Method, Args, MState)
+ end,
+ consumer_call_reply(Return, State);
+
+%% only supposed to be used with basic.deliver
+handle_call({consumer_call, Method = #'basic.deliver'{}, Args, DeliveryCtx}, _From,
+ State = #state{module = ConsumerModule,
+ module_state = MState}) ->
+ Return = ConsumerModule:handle_deliver(Method, Args, DeliveryCtx, MState),
+ consumer_call_reply(Return, State).
+
+handle_cast(_What, State) ->
+ {noreply, State}.
+
+handle_info(Info, State = #state{module_state = MState,
+ module = ConsumerModule}) ->
+ case ConsumerModule:handle_info(Info, MState) of
+ {ok, NewMState} ->
+ {noreply, State#state{module_state = NewMState}};
+ {error, Reason, NewMState} ->
+ {stop, {error, Reason}, State#state{module_state = NewMState}}
+ end.
+
+terminate(Reason, #state{module = ConsumerModule, module_state = MState}) ->
+ ConsumerModule:terminate(Reason, MState).
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
diff --git a/deps/amqp_client/src/amqp_main_reader.erl b/deps/amqp_client/src/amqp_main_reader.erl
new file mode 100644
index 0000000000..60cd93d03b
--- /dev/null
+++ b/deps/amqp_client/src/amqp_main_reader.erl
@@ -0,0 +1,179 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+%% @private
+-module(amqp_main_reader).
+
+-include("amqp_client_internal.hrl").
+
+-behaviour(gen_server).
+
+-export([start_link/5, post_init/1]).
+-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
+ handle_info/2]).
+
+-record(state, {sock,
+ timer,
+ connection,
+ channels_manager,
+ astate,
+ message = none %% none | {Type, Channel, Length}
+ }).
+
+%%---------------------------------------------------------------------------
+%% Interface
+%%---------------------------------------------------------------------------
+
+start_link(Sock, Connection, ChMgr, AState, ConnName) ->
+ gen_server:start_link(
+ ?MODULE, [Sock, Connection, ConnName, ChMgr, AState], []).
+
+post_init(Reader) ->
+ try
+ gen_server:call(Reader, post_init)
+ catch
+ exit:{timeout, Timeout} ->
+ {error, {timeout, Timeout}}
+ end.
+
+%%---------------------------------------------------------------------------
+%% gen_server callbacks
+%%---------------------------------------------------------------------------
+
+init([Sock, Connection, ConnName, ChMgr, AState]) ->
+ ?store_proc_name(ConnName),
+ State = #state{sock = Sock,
+ connection = Connection,
+ channels_manager = ChMgr,
+ astate = AState,
+ message = none},
+ {ok, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%% We need to use a call because we are not controlling the socket yet.
+handle_call(post_init, _From, State = #state{sock = Sock}) ->
+ case rabbit_net:setopts(Sock, [{active, once}]) of
+ ok -> {reply, ok, set_timeout(State)};
+ {error, Reason} -> handle_error(Reason, State)
+ end;
+handle_call(Call, From, State) ->
+ {stop, {unexpected_call, Call, From}, State}.
+
+handle_cast(Cast, State) ->
+ {stop, {unexpected_cast, Cast}, State}.
+
+handle_info({Tag, Sock, Data}, State = #state{sock = Sock})
+ when Tag =:= tcp; Tag =:= ssl ->
+ %% Latency hiding: Request next packet first, then process data
+ case rabbit_net:setopts(Sock, [{active, once}]) of
+ ok -> handle_data(Data, set_timeout(State));
+ {error, Reason} -> handle_error(Reason, State)
+ end;
+handle_info({Tag, Sock}, State = #state{sock = Sock})
+ when Tag =:= tcp_closed; Tag =:= ssl_closed ->
+ handle_error(closed, State);
+handle_info({Tag, Sock, Reason}, State = #state{sock = Sock})
+ when Tag =:= tcp_error; Tag =:= ssl_error ->
+ handle_error(Reason, State);
+handle_info({timeout, _TimerRef, idle_timeout}, State) ->
+ handle_error(timeout, State).
+
+handle_data(<<Type:8, Channel:16, Length:32, Payload:Length/binary, ?FRAME_END,
+ More/binary>>,
+ #state{message = none} = State) when
+ Type =:= ?FRAME_METHOD; Type =:= ?FRAME_HEADER;
+ Type =:= ?FRAME_BODY; Type =:= ?FRAME_HEARTBEAT ->
+ %% Optimisation for the direct match
+ handle_data(
+ More, process_frame(Type, Channel, Payload, State#state{message = none}));
+handle_data(<<Type:8, Channel:16, Length:32, Data/binary>>,
+ #state{message = none} = State) when
+ Type =:= ?FRAME_METHOD; Type =:= ?FRAME_HEADER;
+ Type =:= ?FRAME_BODY; Type =:= ?FRAME_HEARTBEAT ->
+ {noreply, State#state{message = {Type, Channel, Length, Data}}};
+handle_data(<<"AMQP", A, B, C>>, #state{sock = Sock, message = none} = State) ->
+ {ok, <<D>>} = rabbit_net:sync_recv(Sock, 1),
+ handle_error({refused, {A, B, C, D}}, State);
+handle_data(<<Malformed:7/binary, _Rest/binary>>,
+ #state{message = none} = State) ->
+ handle_error({malformed_header, Malformed}, State);
+handle_data(<<Data/binary>>, #state{message = none} = State) ->
+ {noreply, State#state{message = {expecting_header, Data}}};
+handle_data(Data, #state{message = {Type, Channel, L, OldData}} = State) ->
+ case <<OldData/binary, Data/binary>> of
+ <<Payload:L/binary, ?FRAME_END, More/binary>> ->
+ handle_data(More,
+ process_frame(Type, Channel, Payload,
+ State#state{message = none}));
+ NotEnough ->
+ %% Read in more data from the socket
+ {noreply, State#state{message = {Type, Channel, L, NotEnough}}}
+ end;
+handle_data(Data,
+ #state{message = {expecting_header, Old}} = State) ->
+ handle_data(<<Old/binary, Data/binary>>, State#state{message = none});
+handle_data(<<>>, State) ->
+ {noreply, State}.
+
+%%---------------------------------------------------------------------------
+%% Internal plumbing
+%%---------------------------------------------------------------------------
+
+set_timeout(State0) ->
+ State = cancel_timeout(State0),
+ TimerRef = case amqp_util:call_timeout() of
+ infinity -> undefined;
+ Timeout -> erlang:start_timer(Timeout, self(), idle_timeout)
+ end,
+ State#state{timer=TimerRef}.
+
+cancel_timeout(State=#state{timer=TimerRef}) ->
+ ok = case TimerRef of
+ undefined -> ok;
+ _ -> erlang:cancel_timer(TimerRef, [{async, true}, {info, false}])
+ end,
+ State#state{timer=undefined}.
+
+process_frame(Type, ChNumber, Payload,
+ State = #state{connection = Connection,
+ channels_manager = ChMgr,
+ astate = AState}) ->
+ case rabbit_command_assembler:analyze_frame(Type, Payload, ?PROTOCOL) of
+ heartbeat when ChNumber /= 0 ->
+ amqp_gen_connection:server_misbehaved(
+ Connection,
+ #amqp_error{name = command_invalid,
+ explanation = "heartbeat on non-zero channel"}),
+ State;
+ %% Match heartbeats but don't do anything with them
+ heartbeat ->
+ State;
+ AnalyzedFrame when ChNumber /= 0 ->
+ amqp_channels_manager:pass_frame(ChMgr, ChNumber, AnalyzedFrame),
+ State;
+ AnalyzedFrame ->
+ State#state{astate = amqp_channels_manager:process_channel_frame(
+ AnalyzedFrame, 0, Connection, AState)}
+ end.
+
+handle_error(closed, State = #state{connection = Conn}) ->
+ Conn ! socket_closed,
+ {noreply, State};
+handle_error({refused, Version}, State = #state{connection = Conn}) ->
+ Conn ! {refused, Version},
+ {noreply, State};
+handle_error({malformed_header, Version}, State = #state{connection = Conn}) ->
+ Conn ! {malformed_header, Version},
+ {noreply, State};
+handle_error(Reason, State = #state{connection = Conn}) ->
+ Conn ! {socket_error, Reason},
+ {stop, {socket_error, Reason}, State}.
diff --git a/deps/amqp_client/src/amqp_network_connection.erl b/deps/amqp_client/src/amqp_network_connection.erl
new file mode 100644
index 0000000000..975ea591da
--- /dev/null
+++ b/deps/amqp_client/src/amqp_network_connection.erl
@@ -0,0 +1,380 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+%% @private
+-module(amqp_network_connection).
+
+-include("amqp_client_internal.hrl").
+
+-behaviour(amqp_gen_connection).
+-export([init/0, terminate/2, connect/4, do/2, open_channel_args/1, i/2,
+ info_keys/0, handle_message/2, closing/3, channels_terminated/1]).
+
+-define(RABBIT_TCP_OPTS, [binary, {packet, 0}, {active,false}, {nodelay, true}]).
+-define(SOCKET_CLOSING_TIMEOUT, 1000).
+-define(HANDSHAKE_RECEIVE_TIMEOUT, 60000).
+-define(TCP_MAX_PACKET_SIZE, (16#4000000 + ?EMPTY_FRAME_SIZE - 1)).
+
+-record(state, {sock,
+ name,
+ heartbeat,
+ writer0,
+ frame_max,
+ type_sup,
+ closing_reason, %% undefined | Reason
+ waiting_socket_close = false}).
+
+-define(INFO_KEYS, [type, heartbeat, frame_max, sock, name]).
+
+%%---------------------------------------------------------------------------
+
+init() ->
+ {ok, #state{}}.
+
+open_channel_args(#state{sock = Sock, frame_max = FrameMax}) ->
+ [Sock, FrameMax].
+
+do(#'connection.close_ok'{} = CloseOk, State) ->
+ erlang:send_after(?SOCKET_CLOSING_TIMEOUT, self(), socket_closing_timeout),
+ do2(CloseOk, State);
+do(Method, State) ->
+ do2(Method, State).
+
+do2(Method, #state{writer0 = Writer}) ->
+ %% Catching because it expects the {channel_exit, _, _} message on error
+ catch rabbit_writer:send_command_sync(Writer, Method).
+
+handle_message(socket_closing_timeout,
+ State = #state{closing_reason = Reason}) ->
+ {stop, {socket_closing_timeout, Reason}, State};
+handle_message(socket_closed, State = #state{waiting_socket_close = true,
+ closing_reason = Reason}) ->
+ {stop, {shutdown, Reason}, State};
+handle_message(socket_closed, State = #state{waiting_socket_close = false}) ->
+ {stop, socket_closed_unexpectedly, State};
+handle_message({socket_error, _} = SocketError, State) ->
+ {stop, SocketError, State};
+handle_message({channel_exit, 0, Reason}, State) ->
+ {stop, {channel0_died, Reason}, State};
+handle_message(heartbeat_timeout, State) ->
+ {stop, heartbeat_timeout, State};
+handle_message(closing_timeout, State = #state{closing_reason = Reason}) ->
+ {stop, Reason, State};
+handle_message({'EXIT', Pid, Reason}, State) ->
+ {stop, rabbit_misc:format("stopping because dependent process ~p died: ~p", [Pid, Reason]), State};
+%% see http://erlang.org/pipermail/erlang-bugs/2012-June/002933.html
+handle_message({Ref, {error, Reason}},
+ State = #state{waiting_socket_close = Waiting,
+ closing_reason = CloseReason})
+ when is_reference(Ref) ->
+ {stop, case {Reason, Waiting} of
+ {closed, true} -> {shutdown, CloseReason};
+ {closed, false} -> socket_closed_unexpectedly;
+ {_, _} -> {socket_error, Reason}
+ end, State}.
+
+closing(_ChannelCloseType, {server_initiated_close, _, _} = Reason, State) ->
+ {ok, State#state{waiting_socket_close = true,
+ closing_reason = Reason}};
+closing(_ChannelCloseType, Reason, State) ->
+ {ok, State#state{closing_reason = Reason}}.
+
+channels_terminated(State = #state{closing_reason =
+ {server_initiated_close, _, _}}) ->
+ {ok, State#state{waiting_socket_close = true}};
+channels_terminated(State) ->
+ {ok, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+i(type, _State) -> network;
+i(heartbeat, State) -> State#state.heartbeat;
+i(frame_max, State) -> State#state.frame_max;
+i(sock, State) -> State#state.sock;
+i(name, State) -> State#state.name;
+i(Item, _State) -> throw({bad_argument, Item}).
+
+info_keys() ->
+ ?INFO_KEYS.
+
+%%---------------------------------------------------------------------------
+%% Handshake
+%%---------------------------------------------------------------------------
+
+connect(AmqpParams = #amqp_params_network{host = Host}, SIF, TypeSup, State) ->
+ case gethostaddr(Host) of
+ [] -> {error, unknown_host};
+ [AF|_] -> do_connect(
+ AF, AmqpParams, SIF, State#state{type_sup = TypeSup})
+ end.
+
+do_connect({Addr, Family},
+ AmqpParams = #amqp_params_network{ssl_options = none,
+ port = Port,
+ connection_timeout = Timeout,
+ socket_options = ExtraOpts},
+ SIF, State) ->
+ ok = obtain(),
+ case gen_tcp:connect(Addr, Port,
+ [Family | ?RABBIT_TCP_OPTS] ++ ExtraOpts,
+ Timeout) of
+ {ok, Sock} -> try_handshake(AmqpParams, SIF,
+ State#state{sock = Sock});
+ {error, _} = E -> E
+ end;
+do_connect({Addr, Family},
+ AmqpParams = #amqp_params_network{ssl_options = SslOpts0,
+ port = Port,
+ connection_timeout = Timeout,
+ socket_options = ExtraOpts},
+ SIF, State) ->
+ {ok, GlobalSslOpts} = application:get_env(amqp_client, ssl_options),
+ app_utils:start_applications([asn1, crypto, public_key, ssl]),
+ ok = obtain(),
+ case gen_tcp:connect(Addr, Port,
+ [Family | ?RABBIT_TCP_OPTS] ++ ExtraOpts,
+ Timeout) of
+ {ok, Sock} ->
+ SslOpts = rabbit_ssl_options:fix(
+ orddict:to_list(
+ orddict:merge(fun (_, _A, B) -> B end,
+ orddict:from_list(GlobalSslOpts),
+ orddict:from_list(SslOpts0)))),
+ case ssl:connect(Sock, SslOpts, Timeout) of
+ {ok, SslSock} ->
+ try_handshake(AmqpParams, SIF,
+ State#state{sock = SslSock});
+ {error, _} = E ->
+ E
+ end;
+ {error, _} = E ->
+ E
+ end.
+
+inet_address_preference() ->
+ case application:get_env(amqp_client, prefer_ipv6) of
+ {ok, true} -> [inet6, inet];
+ {ok, false} -> [inet, inet6]
+ end.
+
+gethostaddr(Host) ->
+ Lookups = [{Family, inet:getaddr(Host, Family)}
+ || Family <- inet_address_preference()],
+ [{IP, Family} || {Family, {ok, IP}} <- Lookups].
+
+try_handshake(AmqpParams, SIF, State = #state{sock = Sock}) ->
+ Name = case rabbit_net:connection_string(Sock, outbound) of
+ {ok, Str} -> list_to_binary(Str);
+ {error, _} -> <<"unknown">>
+ end,
+ try handshake(AmqpParams, SIF,
+ State#state{name = <<"client ", Name/binary>>}) of
+ Return -> Return
+ catch exit:Reason -> {error, Reason}
+ end.
+
+handshake(AmqpParams, SIF, State0 = #state{sock = Sock}) ->
+ ok = rabbit_net:send(Sock, ?PROTOCOL_HEADER),
+ case start_infrastructure(SIF, State0) of
+ {ok, ChMgr, State1} ->
+ network_handshake(AmqpParams, {ChMgr, State1});
+ {error, Reason} ->
+ {error, Reason}
+ end.
+
+start_infrastructure(SIF, State = #state{sock = Sock, name = Name}) ->
+ case SIF(Sock, Name) of
+ {ok, ChMgr, Writer} ->
+ {ok, ChMgr, State#state{writer0 = Writer}};
+ {error, Reason} ->
+ {error, Reason}
+ end.
+
+network_handshake(AmqpParams = #amqp_params_network{virtual_host = VHost},
+ {ChMgr, State0}) ->
+ Start = #'connection.start'{server_properties = ServerProperties,
+ mechanisms = Mechanisms} =
+ handshake_recv('connection.start'),
+ ok = check_version(Start),
+ case login(AmqpParams, Mechanisms, State0) of
+ {closing, #amqp_error{}, _Error} = Err ->
+ do(#'connection.close_ok'{}, State0),
+ Err;
+ Tune ->
+ {TuneOk, ChannelMax, State1} = tune(Tune, AmqpParams, State0),
+ do2(TuneOk, State1),
+ do2(#'connection.open'{virtual_host = VHost}, State1),
+ Params = {ServerProperties, ChannelMax, ChMgr, State1},
+ case handshake_recv('connection.open_ok') of
+ #'connection.open_ok'{} -> {ok, Params};
+ {closing, #amqp_error{} = AmqpError, Error} -> {closing, Params,
+ AmqpError, Error}
+ end
+ end.
+
+check_version(#'connection.start'{version_major = ?PROTOCOL_VERSION_MAJOR,
+ version_minor = ?PROTOCOL_VERSION_MINOR}) ->
+ ok;
+check_version(#'connection.start'{version_major = 8,
+ version_minor = 0}) ->
+ exit({protocol_version_mismatch, 0, 8});
+check_version(#'connection.start'{version_major = Major,
+ version_minor = Minor}) ->
+ exit({protocol_version_mismatch, Major, Minor}).
+
+tune(#'connection.tune'{channel_max = ServerChannelMax,
+ frame_max = ServerFrameMax,
+ heartbeat = ServerHeartbeat},
+ #amqp_params_network{channel_max = ClientChannelMax,
+ frame_max = ClientFrameMax,
+ heartbeat = ClientHeartbeat}, State) ->
+ [ChannelMax, Heartbeat, FrameMax] =
+ lists:zipwith(fun (Client, Server) when Client =:= 0; Server =:= 0 ->
+ lists:max([Client, Server]);
+ (Client, Server) ->
+ lists:min([Client, Server])
+ end,
+ [ClientChannelMax, ClientHeartbeat, ClientFrameMax],
+ [ServerChannelMax, ServerHeartbeat, ServerFrameMax]),
+ %% If we attempt to recv > 64Mb, inet_drv will return enomem, so
+ %% we cap the max negotiated frame size accordingly. Note that
+ %% since we receive the frame header separately, we can actually
+ %% cope with frame sizes of 64M + ?EMPTY_FRAME_SIZE - 1.
+ CappedFrameMax = case FrameMax of
+ 0 -> ?TCP_MAX_PACKET_SIZE;
+ _ -> lists:min([FrameMax, ?TCP_MAX_PACKET_SIZE])
+ end,
+ NewState = State#state{heartbeat = Heartbeat, frame_max = CappedFrameMax},
+ start_heartbeat(NewState),
+ {#'connection.tune_ok'{channel_max = ChannelMax,
+ frame_max = CappedFrameMax,
+ heartbeat = Heartbeat}, ChannelMax, NewState}.
+
+start_heartbeat(#state{sock = Sock,
+ name = Name,
+ heartbeat = Heartbeat,
+ type_sup = Sup}) ->
+ Frame = rabbit_binary_generator:build_heartbeat_frame(),
+ SendFun = fun () -> catch rabbit_net:send(Sock, Frame) end,
+ Connection = self(),
+ ReceiveFun = fun () -> Connection ! heartbeat_timeout end,
+ rabbit_heartbeat:start(
+ Sup, Sock, Name, Heartbeat, SendFun, Heartbeat, ReceiveFun).
+
+login(Params = #amqp_params_network{auth_mechanisms = ClientMechanisms,
+ client_properties = UserProps},
+ ServerMechanismsStr, State) ->
+ ServerMechanisms = string:tokens(binary_to_list(ServerMechanismsStr), " "),
+ case [{N, S, F} || F <- ClientMechanisms,
+ {N, S} <- [F(none, Params, init)],
+ lists:member(binary_to_list(N), ServerMechanisms)] of
+ [{Name, MState0, Mech}|_] ->
+ {Resp, MState1} = Mech(none, Params, MState0),
+ StartOk = #'connection.start_ok'{
+ client_properties = client_properties(UserProps),
+ mechanism = Name,
+ response = Resp},
+ do2(StartOk, State),
+ login_loop(Mech, MState1, Params, State);
+ [] ->
+ exit({no_suitable_auth_mechanism, ServerMechanisms})
+ end.
+
+login_loop(Mech, MState0, Params, State) ->
+ case handshake_recv('connection.tune') of
+ Tune = #'connection.tune'{} ->
+ Tune;
+ #'connection.secure'{challenge = Challenge} ->
+ {Resp, MState1} = Mech(Challenge, Params, MState0),
+ do2(#'connection.secure_ok'{response = Resp}, State),
+ login_loop(Mech, MState1, Params, State);
+ #'connection.close'{reply_code = ?ACCESS_REFUSED,
+ reply_text = ExplanationBin} ->
+ Explanation = binary_to_list(ExplanationBin),
+ {closing,
+ #amqp_error{name = access_refused,
+ explanation = Explanation},
+ {error, {auth_failure, Explanation}}}
+ end.
+
+client_properties(UserProperties) ->
+ {ok, Vsn} = application:get_key(amqp_client, vsn),
+ Default = [{<<"product">>, longstr, <<"RabbitMQ">>},
+ {<<"version">>, longstr, list_to_binary(Vsn)},
+ {<<"platform">>, longstr, <<"Erlang">>},
+ {<<"copyright">>, longstr,
+ <<"Copyright (c) 2007-2020 VMware, Inc. or its affiliates.">>},
+ {<<"information">>, longstr,
+ <<"Licensed under the MPL. "
+ "See https://www.rabbitmq.com/">>},
+ {<<"capabilities">>, table, ?CLIENT_CAPABILITIES}],
+ lists:foldl(fun({K, _, _} = Tuple, Acc) ->
+ lists:keystore(K, 1, Acc, Tuple)
+ end, Default, UserProperties).
+
+handshake_recv(Expecting) ->
+ receive
+ {'$gen_cast', {method, Method, none, noflow}} ->
+ case {Expecting, element(1, Method)} of
+ {E, M} when E =:= M ->
+ Method;
+ {'connection.tune', 'connection.secure'} ->
+ Method;
+ {'connection.tune', 'connection.close'} ->
+ Method;
+ {'connection.open_ok', 'connection.close'} ->
+ exit(get_reason(Method));
+ {'connection.open_ok', _} ->
+ {closing,
+ #amqp_error{name = command_invalid,
+ explanation = "was expecting "
+ "connection.open_ok"},
+ {error, {unexpected_method, Method,
+ {expecting, Expecting}}}};
+ _ ->
+ throw({unexpected_method, Method,
+ {expecting, Expecting}})
+ end;
+ socket_closed ->
+ case Expecting of
+ 'connection.tune' -> exit({auth_failure, "Disconnected"});
+ 'connection.open_ok' -> exit(access_refused);
+ _ -> exit({socket_closed_unexpectedly,
+ Expecting})
+ end;
+ {socket_error, _} = SocketError ->
+ exit({SocketError, {expecting, Expecting}});
+ {refused, Version} ->
+ exit({server_refused_connection, Version});
+ {malformed_header, All} ->
+ exit({server_sent_malformed_header, All});
+ heartbeat_timeout ->
+ exit(heartbeat_timeout);
+ Other ->
+ throw({handshake_recv_unexpected_message, Other})
+ after ?HANDSHAKE_RECEIVE_TIMEOUT ->
+ case Expecting of
+ 'connection.open_ok' ->
+ {closing,
+ #amqp_error{name = internal_error,
+ explanation = "handshake timed out waiting "
+ "connection.open_ok"},
+ {error, handshake_receive_timed_out}};
+ _ ->
+ exit(handshake_receive_timed_out)
+ end
+ end.
+
+obtain() ->
+ case code:is_loaded(file_handle_cache) of
+ false -> ok;
+ _ -> file_handle_cache:obtain()
+ end.
+
+get_reason(#'connection.close'{reply_code = ErrCode}) ->
+ ?PROTOCOL:amqp_exception(ErrCode).
diff --git a/deps/amqp_client/src/amqp_rpc_client.erl b/deps/amqp_client/src/amqp_rpc_client.erl
new file mode 100644
index 0000000000..3fd9a34650
--- /dev/null
+++ b/deps/amqp_client/src/amqp_rpc_client.erl
@@ -0,0 +1,176 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+%% @doc This module allows the simple execution of an asynchronous RPC over
+%% AMQP. It frees a client programmer of the necessary having to AMQP
+%% plumbing. Note that the this module does not handle any data encoding,
+%% so it is up to the caller to marshall and unmarshall message payloads
+%% accordingly.
+-module(amqp_rpc_client).
+
+-include("amqp_client.hrl").
+
+-behaviour(gen_server).
+
+-export([start/2, start_link/2, stop/1]).
+-export([call/2]).
+-export([init/1, terminate/2, code_change/3, handle_call/3,
+ handle_cast/2, handle_info/2]).
+
+-record(state, {channel,
+ reply_queue,
+ exchange,
+ routing_key,
+ continuations = #{},
+ correlation_id = 0}).
+
+%%--------------------------------------------------------------------------
+%% API
+%%--------------------------------------------------------------------------
+
+%% @spec (Connection, Queue) -> RpcClient
+%% where
+%% Connection = pid()
+%% Queue = binary()
+%% RpcClient = pid()
+%% @doc Starts a new RPC client instance that sends requests to a
+%% specified queue. This function returns the pid of the RPC client process
+%% that can be used to invoke RPCs and stop the client.
+start(Connection, Queue) ->
+ {ok, Pid} = gen_server:start(?MODULE, [Connection, Queue], []),
+ Pid.
+
+%% @spec (Connection, Queue) -> RpcClient
+%% where
+%% Connection = pid()
+%% Queue = binary()
+%% RpcClient = pid()
+%% @doc Starts, and links to, a new RPC client instance that sends requests
+%% to a specified queue. This function returns the pid of the RPC client
+%% process that can be used to invoke RPCs and stop the client.
+start_link(Connection, Queue) ->
+ {ok, Pid} = gen_server:start_link(?MODULE, [Connection, Queue], []),
+ Pid.
+
+%% @spec (RpcClient) -> ok
+%% where
+%% RpcClient = pid()
+%% @doc Stops an existing RPC client.
+stop(Pid) ->
+ gen_server:call(Pid, stop, amqp_util:call_timeout()).
+
+%% @spec (RpcClient, Payload) -> ok
+%% where
+%% RpcClient = pid()
+%% Payload = binary()
+%% @doc Invokes an RPC. Note the caller of this function is responsible for
+%% encoding the request and decoding the response.
+call(RpcClient, Payload) ->
+ gen_server:call(RpcClient, {call, Payload}, amqp_util:call_timeout()).
+
+%%--------------------------------------------------------------------------
+%% Plumbing
+%%--------------------------------------------------------------------------
+
+%% Sets up a reply queue for this client to listen on
+setup_reply_queue(State = #state{channel = Channel}) ->
+ #'queue.declare_ok'{queue = Q} =
+ amqp_channel:call(Channel, #'queue.declare'{exclusive = true,
+ auto_delete = true}),
+ State#state{reply_queue = Q}.
+
+%% Registers this RPC client instance as a consumer to handle rpc responses
+setup_consumer(#state{channel = Channel, reply_queue = Q}) ->
+ amqp_channel:call(Channel, #'basic.consume'{queue = Q}).
+
+%% Publishes to the broker, stores the From address against
+%% the correlation id and increments the correlationid for
+%% the next request
+publish(Payload, From,
+ State = #state{channel = Channel,
+ reply_queue = Q,
+ exchange = X,
+ routing_key = RoutingKey,
+ correlation_id = CorrelationId,
+ continuations = Continuations}) ->
+ EncodedCorrelationId = base64:encode(<<CorrelationId:64>>),
+ Props = #'P_basic'{correlation_id = EncodedCorrelationId,
+ content_type = <<"application/octet-stream">>,
+ reply_to = Q},
+ Publish = #'basic.publish'{exchange = X,
+ routing_key = RoutingKey,
+ mandatory = true},
+ amqp_channel:call(Channel, Publish, #amqp_msg{props = Props,
+ payload = Payload}),
+ State#state{correlation_id = CorrelationId + 1,
+ continuations = maps:put(EncodedCorrelationId, From, Continuations)}.
+
+%%--------------------------------------------------------------------------
+%% gen_server callbacks
+%%--------------------------------------------------------------------------
+
+%% Sets up a reply queue and consumer within an existing channel
+%% @private
+init([Connection, RoutingKey]) ->
+ {ok, Channel} = amqp_connection:open_channel(
+ Connection, {amqp_direct_consumer, [self()]}),
+ InitialState = #state{channel = Channel,
+ exchange = <<>>,
+ routing_key = RoutingKey},
+ State = setup_reply_queue(InitialState),
+ setup_consumer(State),
+ {ok, State}.
+
+%% Closes the channel this gen_server instance started
+%% @private
+terminate(_Reason, #state{channel = Channel}) ->
+ amqp_channel:close(Channel),
+ ok.
+
+%% Handle the application initiated stop by just stopping this gen server
+%% @private
+handle_call(stop, _From, State) ->
+ {stop, normal, ok, State};
+
+%% @private
+handle_call({call, Payload}, From, State) ->
+ NewState = publish(Payload, From, State),
+ {noreply, NewState}.
+
+%% @private
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+%% @private
+handle_info({#'basic.consume'{}, _Pid}, State) ->
+ {noreply, State};
+
+%% @private
+handle_info(#'basic.consume_ok'{}, State) ->
+ {noreply, State};
+
+%% @private
+handle_info(#'basic.cancel'{}, State) ->
+ {noreply, State};
+
+%% @private
+handle_info(#'basic.cancel_ok'{}, State) ->
+ {stop, normal, State};
+
+%% @private
+handle_info({#'basic.deliver'{delivery_tag = DeliveryTag},
+ #amqp_msg{props = #'P_basic'{correlation_id = Id},
+ payload = Payload}},
+ State = #state{continuations = Conts, channel = Channel}) ->
+ From = maps:get(Id, Conts),
+ gen_server:reply(From, Payload),
+ amqp_channel:call(Channel, #'basic.ack'{delivery_tag = DeliveryTag}),
+ {noreply, State#state{continuations = maps:remove(Id, Conts) }}.
+
+%% @private
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
diff --git a/deps/amqp_client/src/amqp_rpc_server.erl b/deps/amqp_client/src/amqp_rpc_server.erl
new file mode 100644
index 0000000000..44e5113a94
--- /dev/null
+++ b/deps/amqp_client/src/amqp_rpc_server.erl
@@ -0,0 +1,138 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+%% @doc This is a utility module that is used to expose an arbitrary function
+%% via an asynchronous RPC over AMQP mechanism. It frees the implementor of
+%% a simple function from having to plumb this into AMQP. Note that the
+%% RPC server does not handle any data encoding, so it is up to the callback
+%% function to marshall and unmarshall message payloads accordingly.
+-module(amqp_rpc_server).
+
+-behaviour(gen_server).
+
+-include("amqp_client.hrl").
+
+-export([init/1, terminate/2, code_change/3, handle_call/3,
+ handle_cast/2, handle_info/2]).
+-export([start/3, start_link/3]).
+-export([stop/1]).
+
+-record(state, {channel,
+ handler}).
+
+%%--------------------------------------------------------------------------
+%% API
+%%--------------------------------------------------------------------------
+
+%% @spec (Connection, Queue, RpcHandler) -> RpcServer
+%% where
+%% Connection = pid()
+%% Queue = binary()
+%% RpcHandler = function()
+%% RpcServer = pid()
+%% @doc Starts a new RPC server instance that receives requests via a
+%% specified queue and dispatches them to a specified handler function. This
+%% function returns the pid of the RPC server that can be used to stop the
+%% server.
+start(Connection, Queue, Fun) ->
+ {ok, Pid} = gen_server:start(?MODULE, [Connection, Queue, Fun], []),
+ Pid.
+
+%% @spec (Connection, Queue, RpcHandler) -> RpcServer
+%% where
+%% Connection = pid()
+%% Queue = binary()
+%% RpcHandler = function()
+%% RpcServer = pid()
+%% @doc Starts, and links to, a new RPC server instance that receives
+%% requests via a specified queue and dispatches them to a specified
+%% handler function. This function returns the pid of the RPC server that
+%% can be used to stop the server.
+start_link(Connection, Queue, Fun) ->
+ {ok, Pid} = gen_server:start_link(?MODULE, [Connection, Queue, Fun], []),
+ Pid.
+
+%% @spec (RpcServer) -> ok
+%% where
+%% RpcServer = pid()
+%% @doc Stops an existing RPC server.
+stop(Pid) ->
+ gen_server:call(Pid, stop, amqp_util:call_timeout()).
+
+%%--------------------------------------------------------------------------
+%% gen_server callbacks
+%%--------------------------------------------------------------------------
+
+%% @private
+init([Connection, Q, Fun]) ->
+ {ok, Channel} = amqp_connection:open_channel(
+ Connection, {amqp_direct_consumer, [self()]}),
+ amqp_channel:call(Channel, #'queue.declare'{queue = Q}),
+ amqp_channel:call(Channel, #'basic.consume'{queue = Q}),
+ {ok, #state{channel = Channel, handler = Fun} }.
+
+%% @private
+handle_info(shutdown, State) ->
+ {stop, normal, State};
+
+%% @private
+handle_info({#'basic.consume'{}, _}, State) ->
+ {noreply, State};
+
+%% @private
+handle_info(#'basic.consume_ok'{}, State) ->
+ {noreply, State};
+
+%% @private
+handle_info(#'basic.cancel'{}, State) ->
+ {noreply, State};
+
+%% @private
+handle_info(#'basic.cancel_ok'{}, State) ->
+ {stop, normal, State};
+
+%% @private
+handle_info({#'basic.deliver'{delivery_tag = DeliveryTag},
+ #amqp_msg{props = Props, payload = Payload}},
+ State = #state{handler = Fun, channel = Channel}) ->
+ #'P_basic'{correlation_id = CorrelationId,
+ reply_to = Q} = Props,
+ Response = Fun(Payload),
+ Properties = #'P_basic'{correlation_id = CorrelationId},
+ Publish = #'basic.publish'{exchange = <<>>,
+ routing_key = Q,
+ mandatory = true},
+ amqp_channel:call(Channel, Publish, #amqp_msg{props = Properties,
+ payload = Response}),
+ amqp_channel:call(Channel, #'basic.ack'{delivery_tag = DeliveryTag}),
+ {noreply, State};
+
+%% @private
+handle_info({'DOWN', _MRef, process, _Pid, _Info}, State) ->
+ {noreply, State}.
+
+%% @private
+handle_call(stop, _From, State) ->
+ {stop, normal, ok, State}.
+
+%%--------------------------------------------------------------------------
+%% Rest of the gen_server callbacks
+%%--------------------------------------------------------------------------
+
+%% @private
+handle_cast(_Message, State) ->
+ {noreply, State}.
+
+%% Closes the channel this gen_server instance started
+%% @private
+terminate(_Reason, #state{channel = Channel}) ->
+ amqp_channel:close(Channel),
+ ok.
+
+%% @private
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
diff --git a/deps/amqp_client/src/amqp_selective_consumer.erl b/deps/amqp_client/src/amqp_selective_consumer.erl
new file mode 100644
index 0000000000..da1138654e
--- /dev/null
+++ b/deps/amqp_client/src/amqp_selective_consumer.erl
@@ -0,0 +1,265 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2011-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+%% @doc This module is an implementation of the amqp_gen_consumer
+%% behaviour and can be used as part of the Consumer parameter when
+%% opening AMQP channels. This is the default implementation selected
+%% by channel. <br/>
+%% <br/>
+%% The Consumer parameter for this implementation is {{@module}, []@}<br/>
+%% This consumer implementation keeps track of consumer tags and sends
+%% the subscription-relevant messages to the registered consumers, according
+%% to an internal tag dictionary.<br/>
+%% <br/>
+%% Send a #basic.consume{} message to the channel to subscribe a
+%% consumer to a queue and send a #basic.cancel{} message to cancel a
+%% subscription.<br/>
+%% <br/>
+%% The channel will send to the relevant registered consumers the
+%% basic.consume_ok, basic.cancel_ok, basic.cancel and basic.deliver messages
+%% received from the server.<br/>
+%% <br/>
+%% If a consumer is not registered for a given consumer tag, the message
+%% is sent to the default consumer registered with
+%% {@module}:register_default_consumer. If there is no default consumer
+%% registered in this case, an exception occurs and the channel is abruptly
+%% terminated.<br/>
+-module(amqp_selective_consumer).
+
+-include("amqp_gen_consumer_spec.hrl").
+
+-behaviour(amqp_gen_consumer).
+
+-export([register_default_consumer/2]).
+-export([init/1, handle_consume_ok/3, handle_consume/3, handle_cancel_ok/3,
+ handle_cancel/2, handle_server_cancel/2,
+ handle_deliver/3, handle_deliver/4,
+ handle_info/2, handle_call/3, terminate/2]).
+
+-record(state, {consumers = #{}, %% Tag -> ConsumerPid
+ unassigned = undefined, %% Pid
+ monitors = #{}, %% Pid -> {Count, MRef}
+ default_consumer = none}).
+
+%%---------------------------------------------------------------------------
+%% Interface
+%%---------------------------------------------------------------------------
+
+%% @spec (ChannelPid, ConsumerPid) -> ok
+%% where
+%% ChannelPid = pid()
+%% ConsumerPid = pid()
+%% @doc This function registers a default consumer with the channel. A
+%% default consumer is used when a subscription is made via
+%% amqp_channel:call(ChannelPid, #'basic.consume'{}) (rather than
+%% {@module}:subscribe/3) and hence there is no consumer pid
+%% registered with the consumer tag. In this case, the relevant
+%% deliveries will be sent to the default consumer.
+register_default_consumer(ChannelPid, ConsumerPid) ->
+ amqp_channel:call_consumer(ChannelPid,
+ {register_default_consumer, ConsumerPid}).
+
+%%---------------------------------------------------------------------------
+%% amqp_gen_consumer callbacks
+%%---------------------------------------------------------------------------
+
+%% @private
+init([]) ->
+ {ok, #state{}}.
+
+%% @private
+handle_consume(#'basic.consume'{consumer_tag = Tag,
+ nowait = NoWait},
+ Pid, State = #state{consumers = Consumers,
+ monitors = Monitors}) ->
+ Result = case NoWait of
+ true when Tag =:= undefined orelse size(Tag) == 0 ->
+ no_consumer_tag_specified;
+ _ when is_binary(Tag) andalso size(Tag) >= 0 ->
+ case resolve_consumer(Tag, State) of
+ {consumer, _} -> consumer_tag_in_use;
+ _ -> ok
+ end;
+ _ ->
+ ok
+ end,
+ case {Result, NoWait} of
+ {ok, true} ->
+ {ok, State#state
+ {consumers = maps:put(Tag, Pid, Consumers),
+ monitors = add_to_monitor_dict(Pid, Monitors)}};
+ {ok, false} ->
+ {ok, State#state{unassigned = Pid}};
+ {Err, true} ->
+ {error, Err, State};
+ {_Err, false} ->
+ %% Don't do anything (don't override existing
+ %% consumers), the server will close the channel with an error.
+ {ok, State}
+ end.
+
+%% @private
+handle_consume_ok(BasicConsumeOk, _BasicConsume,
+ State = #state{unassigned = Pid,
+ consumers = Consumers,
+ monitors = Monitors})
+ when is_pid(Pid) ->
+ State1 =
+ State#state{
+ consumers = maps:put(tag(BasicConsumeOk), Pid, Consumers),
+ monitors = add_to_monitor_dict(Pid, Monitors),
+ unassigned = undefined},
+ deliver(BasicConsumeOk, State1),
+ {ok, State1}.
+
+%% @private
+%% We sent a basic.cancel.
+handle_cancel(#'basic.cancel'{nowait = true},
+ #state{default_consumer = none}) ->
+ exit(cancel_nowait_requires_default_consumer);
+
+handle_cancel(Cancel = #'basic.cancel'{nowait = NoWait}, State) ->
+ State1 = case NoWait of
+ true -> do_cancel(Cancel, State);
+ false -> State
+ end,
+ {ok, State1}.
+
+%% @private
+%% We sent a basic.cancel and now receive the ok.
+handle_cancel_ok(CancelOk, _Cancel, State) ->
+ State1 = do_cancel(CancelOk, State),
+ %% Use old state
+ deliver(CancelOk, State),
+ {ok, State1}.
+
+%% @private
+%% The server sent a basic.cancel.
+handle_server_cancel(Cancel = #'basic.cancel'{nowait = true}, State) ->
+ State1 = do_cancel(Cancel, State),
+ %% Use old state
+ deliver(Cancel, State),
+ {ok, State1}.
+
+%% @private
+handle_deliver(Method, Message, State) ->
+ deliver(Method, Message, State),
+ {ok, State}.
+
+%% @private
+handle_deliver(Method, Message, DeliveryCtx, State) ->
+ deliver(Method, Message, DeliveryCtx, State),
+ {ok, State}.
+
+%% @private
+handle_info({'DOWN', _MRef, process, Pid, _Info},
+ State = #state{monitors = Monitors,
+ consumers = Consumers,
+ default_consumer = DConsumer }) ->
+ case maps:find(Pid, Monitors) of
+ {ok, _CountMRef} ->
+ {ok, State#state{monitors = maps:remove(Pid, Monitors),
+ consumers =
+ maps:filter(
+ fun (_, Pid1) when Pid1 =:= Pid -> false;
+ (_, _) -> true
+ end, Consumers)}};
+ error ->
+ case Pid of
+ DConsumer -> {ok, State#state{
+ monitors = maps:remove(Pid, Monitors),
+ default_consumer = none}};
+ _ -> {ok, State} %% unnamed consumer went down
+ %% before receiving consume_ok
+ end
+ end;
+handle_info(#'basic.credit_drained'{} = Method, State) ->
+ deliver_to_consumer_or_die(Method, Method, State),
+ {ok, State}.
+
+%% @private
+handle_call({register_default_consumer, Pid}, _From,
+ State = #state{default_consumer = PrevPid,
+ monitors = Monitors}) ->
+ Monitors1 = case PrevPid of
+ none -> Monitors;
+ _ -> remove_from_monitor_dict(PrevPid, Monitors)
+ end,
+ {reply, ok,
+ State#state{default_consumer = Pid,
+ monitors = add_to_monitor_dict(Pid, Monitors1)}}.
+
+%% @private
+terminate(_Reason, State) ->
+ State.
+
+%%---------------------------------------------------------------------------
+%% Internal plumbing
+%%---------------------------------------------------------------------------
+
+deliver_to_consumer_or_die(Method, Msg, State) ->
+ case resolve_consumer(tag(Method), State) of
+ {consumer, Pid} -> Pid ! Msg;
+ {default, Pid} -> Pid ! Msg;
+ error -> exit(unexpected_delivery_and_no_default_consumer)
+ end.
+
+deliver(Method, State) ->
+ deliver(Method, undefined, State).
+deliver(Method, Message, State) ->
+ Combined = if Message =:= undefined -> Method;
+ true -> {Method, Message}
+ end,
+ deliver_to_consumer_or_die(Method, Combined, State).
+deliver(Method, Message, DeliveryCtx, State) ->
+ Combined = if Message =:= undefined -> Method;
+ true -> {Method, Message, DeliveryCtx}
+ end,
+ deliver_to_consumer_or_die(Method, Combined, State).
+
+do_cancel(Cancel, State = #state{consumers = Consumers,
+ monitors = Monitors}) ->
+ Tag = tag(Cancel),
+ case maps:find(Tag, Consumers) of
+ {ok, Pid} -> State#state{
+ consumers = maps:remove(Tag, Consumers),
+ monitors = remove_from_monitor_dict(Pid, Monitors)};
+ error -> %% Untracked consumer. Do nothing.
+ State
+ end.
+
+resolve_consumer(Tag, #state{consumers = Consumers,
+ default_consumer = DefaultConsumer}) ->
+ case maps:find(Tag, Consumers) of
+ {ok, ConsumerPid} -> {consumer, ConsumerPid};
+ error -> case DefaultConsumer of
+ none -> error;
+ _ -> {default, DefaultConsumer}
+ end
+ end.
+
+tag(#'basic.consume'{consumer_tag = Tag}) -> Tag;
+tag(#'basic.consume_ok'{consumer_tag = Tag}) -> Tag;
+tag(#'basic.cancel'{consumer_tag = Tag}) -> Tag;
+tag(#'basic.cancel_ok'{consumer_tag = Tag}) -> Tag;
+tag(#'basic.deliver'{consumer_tag = Tag}) -> Tag;
+tag(#'basic.credit_drained'{consumer_tag = Tag}) -> Tag.
+
+add_to_monitor_dict(Pid, Monitors) ->
+ case maps:find(Pid, Monitors) of
+ error -> maps:put(Pid,
+ {1, erlang:monitor(process, Pid)},
+ Monitors);
+ {ok, {Count, MRef}} -> maps:put(Pid, {Count + 1, MRef}, Monitors)
+ end.
+
+remove_from_monitor_dict(Pid, Monitors) ->
+ case maps:get(Pid, Monitors) of
+ {1, MRef} -> erlang:demonitor(MRef),
+ maps:remove(Pid, Monitors);
+ {Count, MRef} -> maps:put(Pid, {Count - 1, MRef}, Monitors)
+ end.
diff --git a/deps/amqp_client/src/amqp_ssl.erl b/deps/amqp_client/src/amqp_ssl.erl
new file mode 100644
index 0000000000..ff2bddd55a
--- /dev/null
+++ b/deps/amqp_client/src/amqp_ssl.erl
@@ -0,0 +1,113 @@
+-module(amqp_ssl).
+
+-include("amqp_client_internal.hrl").
+
+-include_lib("public_key/include/public_key.hrl").
+
+-export([maybe_enhance_ssl_options/1,
+ add_verify_fun_to_opts/2,
+ verify_fun/3]).
+
+maybe_enhance_ssl_options(Params = #amqp_params_network{ssl_options = none}) ->
+ Params;
+maybe_enhance_ssl_options(Params = #amqp_params_network{host = Host, ssl_options = Opts0}) ->
+ Opts1 = maybe_add_sni(Host, Opts0),
+ Opts2 = maybe_add_verify(Opts1),
+ Params#amqp_params_network{ssl_options = Opts2};
+maybe_enhance_ssl_options(Params) ->
+ Params.
+
+% https://github.com/erlang/otp/blob/master/lib/inets/src/http_client/httpc_handler.erl
+maybe_add_sni(Host, Options) ->
+ maybe_add_sni_0(lists:keyfind(server_name_indication, 1, Options), Host, Options).
+
+maybe_add_sni_0(false, Host, Options) ->
+ % NB: this is the case where the user did not specify
+ % server_name_indication at all. If Host is a DNS host name,
+ % we will specify server_name_indication via code
+ maybe_add_sni_1(inet_parse:domain(Host), Host, Options);
+maybe_add_sni_0({server_name_indication, disable}, _Host, Options) ->
+ % NB: this is the case where the user explicitly disabled
+ % server_name_indication
+ Options;
+maybe_add_sni_0({server_name_indication, SniHost}, _Host, Options) ->
+ % NB: this is the case where the user explicitly specified
+ % an SNI host name. We may need to add verify_fun for OTP 19
+ maybe_add_verify_fun(lists:keymember(verify_fun, 1, Options), SniHost, Options).
+
+maybe_add_sni_1(false, _Host, Options) ->
+ % NB: host is not a DNS host name, so nothing to add
+ Options;
+maybe_add_sni_1(true, Host, Options) ->
+ Opts1 = [{server_name_indication, Host} | Options],
+ maybe_add_verify_fun(lists:keymember(verify_fun, 1, Opts1), Host, Opts1).
+
+maybe_add_verify_fun(true, _Host, Options) ->
+ % NB: verify_fun already present, don't add twice
+ Options;
+maybe_add_verify_fun(false, Host, Options) ->
+ add_verify_fun_to_opts(lists:keyfind(verify, 1, Options), Host, Options).
+
+maybe_add_verify(Options) ->
+ case lists:keymember(verify, 1, Options) of
+ true ->
+ % NB: user has explicitly set 'verify'
+ Options;
+ _ ->
+ ?LOG_WARN("Connection (~p): Certificate chain verification is not enabled for this TLS connection. "
+ "Please see https://rabbitmq.com/ssl.html for more information.~n", [self()]),
+ Options
+ end.
+ % TODO FUTURE 3.8.x
+ % verify_peer will become the default in RabbitMQ 3.8.0
+ % false ->
+ % [{verify, verify_peer} | Options]
+ % end.
+
+add_verify_fun_to_opts(Host, Options) ->
+ add_verify_fun_to_opts(false, Host, Options).
+
+add_verify_fun_to_opts({verify, verify_none}, _Host, Options) ->
+ % NB: this is the case where the user explicitly disabled
+ % certificate chain verification so there's not much sense
+ % in adding verify_fun
+ Options;
+add_verify_fun_to_opts(_, Host, Options) ->
+ % NB: this is the case where the user either did not
+ % set the verify option or set it to verify_peer
+ case erlang:system_info(otp_release) of
+ "19" ->
+ F = fun ?MODULE:verify_fun/3,
+ [{verify_fun, {F, Host}} | Options];
+ _ -> Options
+ end.
+
+-type hostname() :: nonempty_string() | binary().
+
+-spec verify_fun(Cert :: #'OTPCertificate'{},
+ Event :: {bad_cert, Reason :: atom() | {revoked, atom()}} |
+ {extension, #'Extension'{}}, InitialUserState :: term()) ->
+ {valid, UserState :: term()} | {valid_peer, UserState :: hostname()} |
+ {fail, Reason :: term()} | {unknown, UserState :: term()}.
+verify_fun(_, {bad_cert, _} = Reason, _) ->
+ {fail, Reason};
+verify_fun(_, {extension, _}, UserState) ->
+ {unknown, UserState};
+verify_fun(_, valid, UserState) ->
+ {valid, UserState};
+% NOTE:
+% The user state is the hostname to verify as configured in
+% amqp_ssl:make_verify_fun
+verify_fun(Cert, valid_peer, Hostname) when Hostname =/= disable ->
+ verify_hostname(Cert, Hostname);
+verify_fun(_, valid_peer, UserState) ->
+ {valid, UserState}.
+
+% https://github.com/erlang/otp/blob/master/lib/ssl/src/ssl_certificate.erl
+verify_hostname(Cert, Hostname) ->
+ case public_key:pkix_verify_hostname(Cert, [{dns_id, Hostname}]) of
+ true ->
+ {valid, Hostname};
+ false ->
+ {fail, {bad_cert, hostname_check_failed}}
+ end.
diff --git a/deps/amqp_client/src/amqp_sup.erl b/deps/amqp_client/src/amqp_sup.erl
new file mode 100644
index 0000000000..05bc8e4185
--- /dev/null
+++ b/deps/amqp_client/src/amqp_sup.erl
@@ -0,0 +1,38 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+%% @private
+-module(amqp_sup).
+
+-include("amqp_client.hrl").
+
+-behaviour(supervisor2).
+
+-export([start_link/0, is_ready/0, start_connection_sup/1]).
+-export([init/1]).
+
+%%---------------------------------------------------------------------------
+%% Interface
+%%---------------------------------------------------------------------------
+
+start_link() ->
+ supervisor2:start_link({local, amqp_sup}, ?MODULE, []).
+
+is_ready() ->
+ whereis(amqp_sup) =/= undefined.
+
+start_connection_sup(AmqpParams) ->
+ supervisor2:start_child(amqp_sup, [AmqpParams]).
+
+%%---------------------------------------------------------------------------
+%% supervisor2 callbacks
+%%---------------------------------------------------------------------------
+
+init([]) ->
+ {ok, {{simple_one_for_one, 0, 1},
+ [{connection_sup, {amqp_connection_sup, start_link, []},
+ temporary, ?SUPERVISOR_WAIT, supervisor, [amqp_connection_sup]}]}}.
diff --git a/deps/amqp_client/src/amqp_uri.erl b/deps/amqp_client/src/amqp_uri.erl
new file mode 100644
index 0000000000..ff545a48b2
--- /dev/null
+++ b/deps/amqp_client/src/amqp_uri.erl
@@ -0,0 +1,273 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(amqp_uri).
+
+-include("amqp_client.hrl").
+
+-export([parse/1, parse/2, remove_credentials/1]).
+
+%%---------------------------------------------------------------------------
+%% AMQP URI Parsing
+%%---------------------------------------------------------------------------
+
+%% Reformat a URI to remove authentication secrets from it (before we
+%% log it or display it anywhere).
+-spec remove_credentials(URI :: string() | binary()) -> string().
+remove_credentials(URI) ->
+ UriString = rabbit_data_coercion:to_list(URI),
+ Props = uri_parser:parse(UriString,
+ [{host, undefined}, {path, undefined},
+ {port, undefined}, {'query', []}]),
+ PortPart = case proplists:get_value(port, Props) of
+ undefined -> "";
+ Port -> rabbit_misc:format(":~B", [Port])
+ end,
+ PGet = fun(K, P) -> case proplists:get_value(K, P) of
+ undefined -> "";
+ R -> R
+ end
+ end,
+ rabbit_misc:format(
+ "~s://~s~s~s", [proplists:get_value(scheme, Props), PGet(host, Props),
+ PortPart, PGet(path, Props)]).
+
+%% @spec (Uri) -> {ok, #amqp_params_network{} | #amqp_params_direct{}} |
+%% {error, {Info, Uri}}
+%% where
+%% Uri = string()
+%% Info = any()
+%%
+%% @doc Parses an AMQP URI. If any of the URI parts are missing, the
+%% default values are used. If the hostname is zero-length, an
+%% #amqp_params_direct{} record is returned; otherwise, an
+%% #amqp_params_network{} record is returned. Extra parameters may be
+%% specified via the query string
+%% (e.g. "?heartbeat=5&amp;auth_mechanism=external"). In case of failure,
+%% an {error, {Info, Uri}} tuple is returned.
+%%
+%% The extra parameters that may be specified are channel_max,
+%% frame_max, heartbeat and auth_mechanism (the latter can appear more
+%% than once). The extra parameters that may be specified for an SSL
+%% connection are cacertfile, certfile, keyfile, verify,
+%% fail_if_no_peer_cert, password, and depth.
+-type parse_result() :: {ok, #amqp_params_network{}} |
+ {ok, #amqp_params_direct{}} |
+ {error, {any(), string()}}.
+
+-spec parse(Uri :: string() | binary()) -> parse_result().
+parse(Uri) -> parse(Uri, <<"/">>).
+
+-spec parse(Uri :: string() | binary(), DefaultVHost :: binary()) -> parse_result().
+parse(Uri, DefaultVHost) ->
+ try return(parse1(Uri, DefaultVHost))
+ catch throw:Err -> {error, {Err, Uri}};
+ error:Err -> {error, {Err, Uri}}
+ end.
+
+parse1(Uri, DefaultVHost) when is_list(Uri); is_binary(Uri) ->
+ UriString = rabbit_data_coercion:to_list(Uri),
+ case uri_parser:parse(UriString, [{host, undefined}, {path, undefined},
+ {port, undefined}, {'query', []}]) of
+ {error, Err} ->
+ throw({unable_to_parse_uri, Err});
+ Parsed ->
+ Endpoint =
+ case string:to_lower(proplists:get_value(scheme, Parsed)) of
+ "amqp" -> build_broker(Parsed, DefaultVHost);
+ "amqps" -> build_ssl_broker(Parsed, DefaultVHost);
+ Scheme -> fail({unexpected_uri_scheme, Scheme})
+ end,
+ return({ok, broker_add_query(Endpoint, Parsed)})
+ end;
+parse1(_, _DefaultVHost) ->
+ fail(expected_string_uri).
+
+unescape_string(Atom) when is_atom(Atom) ->
+ Atom;
+unescape_string(Integer) when is_integer(Integer) ->
+ Integer;
+unescape_string([]) ->
+ [];
+unescape_string([$%, N1, N2 | Rest]) ->
+ try
+ [erlang:list_to_integer([N1, N2], 16) | unescape_string(Rest)]
+ catch
+ error:badarg -> throw({invalid_entitiy, ['%', N1, N2]})
+ end;
+unescape_string([$% | Rest]) ->
+ fail({unterminated_entity, ['%' | Rest]});
+unescape_string([C | Rest]) ->
+ [C | unescape_string(Rest)].
+
+build_broker(ParsedUri, DefaultVHost) ->
+ [Host, Port, Path] =
+ [proplists:get_value(F, ParsedUri) || F <- [host, port, path]],
+ case Port =:= undefined orelse (0 < Port andalso Port =< 65535) of
+ true -> ok;
+ false -> fail({port_out_of_range, Port})
+ end,
+ VHost = case Path of
+ undefined -> DefaultVHost;
+ [$/|Rest] -> case string:chr(Rest, $/) of
+ 0 -> list_to_binary(unescape_string(Rest));
+ _ -> fail({invalid_vhost, Rest})
+ end
+ end,
+ UserInfo = proplists:get_value(userinfo, ParsedUri),
+ Record = case {unescape_string(Host), Port} of
+ {undefined, undefined} ->
+ #amqp_params_direct{virtual_host = VHost};
+ {undefined, _Port} ->
+ fail(port_requires_host);
+ {Host1, Port1} ->
+ Mech = mechanisms(ParsedUri),
+ #amqp_params_network{host = Host1,
+ port = Port1,
+ virtual_host = VHost,
+ auth_mechanisms = Mech}
+ end,
+ set_user_info(Record, UserInfo).
+
+set_user_info(Ps, UserInfo) ->
+ case UserInfo of
+ [U, P | _] -> set([{username, list_to_binary(unescape_string(U))},
+ {password, list_to_binary(unescape_string(P))}], Ps);
+
+ [U] -> set([{username, list_to_binary(unescape_string(U))}], Ps);
+ [] -> Ps
+ end.
+
+set(KVs, Ps = #amqp_params_direct{}) ->
+ set(KVs, Ps, record_info(fields, amqp_params_direct));
+set(KVs, Ps = #amqp_params_network{}) ->
+ set(KVs, Ps, record_info(fields, amqp_params_network)).
+
+set(KVs, Ps, Fields) ->
+ {Ps1, _Ix} = lists:foldl(fun (Field, {PsN, Ix}) ->
+ {case lists:keyfind(Field, 1, KVs) of
+ false -> PsN;
+ {_, V} -> setelement(Ix, PsN, V)
+ end, Ix + 1}
+ end, {Ps, 2}, Fields),
+ Ps1.
+
+build_ssl_broker(ParsedUri, DefaultVHost) ->
+ Params0 = build_broker(ParsedUri, DefaultVHost),
+ Query = proplists:get_value('query', ParsedUri),
+ SSLOptions =
+ run_state_monad(
+ [fun (L) -> KeyString = atom_to_list(Key),
+ case lists:keysearch(KeyString, 1, Query) of
+ {value, {_, Value}} ->
+ try return([{Key, unescape_string(Fun(Value))} | L])
+ catch throw:Reason ->
+ fail({invalid_ssl_parameter,
+ Key, Value, Query, Reason})
+ end;
+ false ->
+ L
+ end
+ end || {Fun, Key} <-
+ [{fun find_path_parameter/1, cacertfile},
+ {fun find_path_parameter/1, certfile},
+ {fun find_path_parameter/1, keyfile},
+ {fun find_atom_parameter/1, verify},
+ {fun find_boolean_parameter/1, fail_if_no_peer_cert},
+ {fun find_identity_parameter/1, password},
+ {fun find_sni_parameter/1, server_name_indication},
+ {fun find_integer_parameter/1, depth}]],
+ []),
+ Params1 = Params0#amqp_params_network{ssl_options = SSLOptions},
+ amqp_ssl:maybe_enhance_ssl_options(Params1).
+
+broker_add_query(Params = #amqp_params_direct{}, Uri) ->
+ broker_add_query(Params, Uri, record_info(fields, amqp_params_direct));
+broker_add_query(Params = #amqp_params_network{}, Uri) ->
+ broker_add_query(Params, Uri, record_info(fields, amqp_params_network)).
+
+broker_add_query(Params, ParsedUri, Fields) ->
+ Query = proplists:get_value('query', ParsedUri),
+ {Params1, _Pos} =
+ run_state_monad(
+ [fun ({ParamsN, Pos}) ->
+ Pos1 = Pos + 1,
+ KeyString = atom_to_list(Field),
+ case proplists:get_value(KeyString, Query) of
+ undefined ->
+ return({ParamsN, Pos1});
+ true -> %% proplists short form, not permitted
+ return({ParamsN, Pos1});
+ Value ->
+ try
+ ValueParsed = parse_amqp_param(Field, Value),
+ return(
+ {setelement(Pos, ParamsN, ValueParsed), Pos1})
+ catch throw:Reason ->
+ fail({invalid_amqp_params_parameter,
+ Field, Value, Query, Reason})
+ end
+ end
+ end || Field <- Fields], {Params, 2}),
+ Params1.
+
+parse_amqp_param(Field, String) when Field =:= channel_max orelse
+ Field =:= frame_max orelse
+ Field =:= heartbeat orelse
+ Field =:= connection_timeout orelse
+ Field =:= depth ->
+ find_integer_parameter(String);
+parse_amqp_param(Field, String) when Field =:= password ->
+ find_identity_parameter(String);
+parse_amqp_param(Field, String) ->
+ fail({parameter_unconfigurable_in_query, Field, String}).
+
+find_path_parameter(Value) ->
+ find_identity_parameter(Value).
+
+find_sni_parameter("disable") ->
+ disable;
+find_sni_parameter(Value) ->
+ find_identity_parameter(Value).
+
+find_identity_parameter(Value) -> return(Value).
+
+find_integer_parameter(Value) ->
+ try return(list_to_integer(Value))
+ catch error:badarg -> fail({not_an_integer, Value})
+ end.
+
+find_boolean_parameter(Value) ->
+ Bool = list_to_atom(Value),
+ case is_boolean(Bool) of
+ true -> return(Bool);
+ false -> fail({require_boolean, Bool})
+ end.
+
+find_atom_parameter(Value) -> return(list_to_atom(Value)).
+
+mechanisms(ParsedUri) ->
+ Query = proplists:get_value('query', ParsedUri),
+ Mechanisms = case proplists:get_all_values("auth_mechanism", Query) of
+ [] -> ["plain", "amqplain"];
+ Mechs -> Mechs
+ end,
+ [case [list_to_atom(T) || T <- string:tokens(Mech, ":")] of
+ [F] -> fun (R, P, S) -> amqp_auth_mechanisms:F(R, P, S) end;
+ [M, F] -> fun (R, P, S) -> M:F(R, P, S) end;
+ L -> throw({not_mechanism, L})
+ end || Mech <- Mechanisms].
+
+%% --=: Plain state monad implementation start :=--
+run_state_monad(FunList, State) ->
+ lists:foldl(fun (Fun, StateN) -> Fun(StateN) end, State, FunList).
+
+return(V) -> V.
+
+-spec fail(_) -> no_return().
+fail(Reason) -> throw(Reason).
+%% --=: end :=--
diff --git a/deps/amqp_client/src/amqp_util.erl b/deps/amqp_client/src/amqp_util.erl
new file mode 100644
index 0000000000..df7ce30662
--- /dev/null
+++ b/deps/amqp_client/src/amqp_util.erl
@@ -0,0 +1,17 @@
+-module(amqp_util).
+
+-include("amqp_client_internal.hrl").
+
+-export([call_timeout/0]).
+
+call_timeout() ->
+ case get(gen_server_call_timeout) of
+ undefined ->
+ Timeout = rabbit_misc:get_env(amqp_client,
+ gen_server_call_timeout,
+ 60000),
+ put(gen_server_call_timeout, Timeout),
+ Timeout;
+ Timeout ->
+ Timeout
+ end.
diff --git a/deps/amqp_client/src/overview.edoc.in b/deps/amqp_client/src/overview.edoc.in
new file mode 100644
index 0000000000..799b293239
--- /dev/null
+++ b/deps/amqp_client/src/overview.edoc.in
@@ -0,0 +1,27 @@
+@title AMQP Client for Erlang
+@author GoPivotal Inc. <support@rabbitmq.com>
+@copyright 2007-2020 VMware, Inc. or its affiliates.
+
+@version %%VERSION%%
+
+@reference <a href="https://www.rabbitmq.com/protocol.html" target="_top">AMQP documentation</a> on the RabbitMQ website.
+
+@doc
+
+== Overview ==
+
+This application provides an Erlang library to interact with an AMQP 0-9-1 compliant message broker. The module documentation assumes that the programmer has some basic familiarity with the execution model defined in the AMQP specification.
+
+The main components are {@link amqp_connection} and {@link amqp_channel}. The {@link amqp_connection} module is used to open and close connections to an AMQP broker as well as creating channels. The {@link amqp_channel} module is used to send and receive commands and messages to and from a broker within the context of a channel.
+
+== AMQP Record Definitions ==
+
+Many of the API functions take structured records as arguments. These records represent the commands defined in the AMQP execution model. The definitions for these records are automatically generated by the rabbitmq-codegen project. rabbitmq-codegen parses a machine readable view of the specification and generates a header file that includes the entire command set of AMQP. Each command in AMQP has an identically named record. The protocol documentation serves as a reference for the attributes of each command.
+
+== Programming Model ==
+
+For more information, refer to the Erlang AMQP client <a href="https://www.rabbitmq.com/erlang-client-user-guide.html">developer's guide</a> on the RabbitMQ website.
+
+== RPC Components ==
+
+The {@link amqp_rpc_server} module provides a generic building block to expose Erlang functions via an RPC over AMQP mechanism. The {@link amqp_rpc_client} provides a simple client utility to submit RPC requests to a server via AMQP.
diff --git a/deps/amqp_client/src/rabbit_routing_util.erl b/deps/amqp_client/src/rabbit_routing_util.erl
new file mode 100644
index 0000000000..9d64a1468e
--- /dev/null
+++ b/deps/amqp_client/src/rabbit_routing_util.erl
@@ -0,0 +1,222 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2013-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_routing_util).
+
+-export([init_state/0, dest_prefixes/0, all_dest_prefixes/0]).
+-export([ensure_endpoint/4, ensure_endpoint/5, ensure_binding/3]).
+-export([parse_endpoint/1, parse_endpoint/2]).
+-export([parse_routing/1, dest_temp_queue/1]).
+
+-include("amqp_client.hrl").
+-include("rabbit_routing_prefixes.hrl").
+
+%%----------------------------------------------------------------------------
+
+init_state() -> sets:new().
+
+dest_prefixes() -> [?EXCHANGE_PREFIX, ?TOPIC_PREFIX, ?QUEUE_PREFIX,
+ ?AMQQUEUE_PREFIX, ?REPLY_QUEUE_PREFIX].
+
+all_dest_prefixes() -> [?TEMP_QUEUE_PREFIX | dest_prefixes()].
+
+%% --------------------------------------------------------------------------
+
+parse_endpoint(Destination) ->
+ parse_endpoint(Destination, false).
+
+parse_endpoint(undefined, AllowAnonymousQueue) ->
+ parse_endpoint("/queue", AllowAnonymousQueue);
+
+parse_endpoint(Destination, AllowAnonymousQueue) when is_binary(Destination) ->
+ parse_endpoint(unicode:characters_to_list(Destination),
+ AllowAnonymousQueue);
+parse_endpoint(Destination, AllowAnonymousQueue) when is_list(Destination) ->
+ case re:split(Destination, "/", [{return, list}]) of
+ [Name] ->
+ {ok, {queue, unescape(Name)}};
+ ["", Type | Rest]
+ when Type =:= "exchange" orelse Type =:= "queue" orelse
+ Type =:= "topic" orelse Type =:= "temp-queue" ->
+ parse_endpoint0(atomise(Type), Rest, AllowAnonymousQueue);
+ ["", "amq", "queue" | Rest] ->
+ parse_endpoint0(amqqueue, Rest, AllowAnonymousQueue);
+ ["", "reply-queue" = Prefix | [_|_]] ->
+ parse_endpoint0(reply_queue,
+ [lists:nthtail(2 + length(Prefix), Destination)],
+ AllowAnonymousQueue);
+ _ ->
+ {error, {unknown_destination, Destination}}
+ end.
+
+parse_endpoint0(exchange, ["" | _] = Rest, _) ->
+ {error, {invalid_destination, exchange, to_url(Rest)}};
+parse_endpoint0(exchange, [Name], _) ->
+ {ok, {exchange, {unescape(Name), undefined}}};
+parse_endpoint0(exchange, [Name, Pattern], _) ->
+ {ok, {exchange, {unescape(Name), unescape(Pattern)}}};
+parse_endpoint0(queue, [], false) ->
+ {error, {invalid_destination, queue, []}};
+parse_endpoint0(queue, [], true) ->
+ {ok, {queue, undefined}};
+parse_endpoint0(Type, [[_|_]] = [Name], _) ->
+ {ok, {Type, unescape(Name)}};
+parse_endpoint0(Type, Rest, _) ->
+ {error, {invalid_destination, Type, to_url(Rest)}}.
+
+%% --------------------------------------------------------------------------
+
+ensure_endpoint(Dir, Channel, Endpoint, State) ->
+ ensure_endpoint(Dir, Channel, Endpoint, [], State).
+
+ensure_endpoint(source, Channel, {exchange, {Name, _}}, Params, State) ->
+ check_exchange(Name, Channel,
+ proplists:get_value(check_exchange, Params, false)),
+ Method = queue_declare_method(#'queue.declare'{}, exchange, Params),
+ #'queue.declare_ok'{queue = Queue} = amqp_channel:call(Channel, Method),
+ {ok, Queue, State};
+
+ensure_endpoint(source, Channel, {topic, _}, Params, State) ->
+ Method = queue_declare_method(#'queue.declare'{}, topic, Params),
+ #'queue.declare_ok'{queue = Queue} = amqp_channel:call(Channel, Method),
+ {ok, Queue, State};
+
+ensure_endpoint(_Dir, _Channel, {queue, undefined}, _Params, State) ->
+ {ok, undefined, State};
+
+ensure_endpoint(_, Channel, {queue, Name}, Params, State) ->
+ Params1 = rabbit_misc:pmerge(durable, true, Params),
+ Queue = list_to_binary(Name),
+ State1 = case sets:is_element(Queue, State) of
+ true -> State;
+ _ -> Method = queue_declare_method(
+ #'queue.declare'{queue = Queue,
+ nowait = true},
+ queue, Params1),
+ case Method#'queue.declare'.nowait of
+ true -> amqp_channel:cast(Channel, Method);
+ false -> amqp_channel:call(Channel, Method)
+ end,
+ sets:add_element(Queue, State)
+ end,
+ {ok, Queue, State1};
+
+ensure_endpoint(dest, Channel, {exchange, {Name, _}}, Params, State) ->
+ check_exchange(Name, Channel,
+ proplists:get_value(check_exchange, Params, false)),
+ {ok, undefined, State};
+
+ensure_endpoint(dest, _Ch, {topic, _}, _Params, State) ->
+ {ok, undefined, State};
+
+ensure_endpoint(_, _Ch, {amqqueue, Name}, _Params, State) ->
+ {ok, list_to_binary(Name), State};
+
+ensure_endpoint(_, _Ch, {reply_queue, Name}, _Params, State) ->
+ {ok, list_to_binary(Name), State};
+
+ensure_endpoint(_Direction, _Ch, _Endpoint, _Params, _State) ->
+ {error, invalid_endpoint}.
+
+%% --------------------------------------------------------------------------
+
+ensure_binding(QueueBin, {"", Queue}, _Channel) ->
+ %% i.e., we should only be asked to bind to the default exchange a
+ %% queue with its own name
+ QueueBin = list_to_binary(Queue),
+ ok;
+ensure_binding(Queue, {Exchange, RoutingKey}, Channel) ->
+ #'queue.bind_ok'{} =
+ amqp_channel:call(Channel,
+ #'queue.bind'{
+ queue = Queue,
+ exchange = list_to_binary(Exchange),
+ routing_key = list_to_binary(RoutingKey)}),
+ ok.
+
+%% --------------------------------------------------------------------------
+
+parse_routing({exchange, {Name, undefined}}) ->
+ {Name, ""};
+parse_routing({exchange, {Name, Pattern}}) ->
+ {Name, Pattern};
+parse_routing({topic, Name}) ->
+ {"amq.topic", Name};
+parse_routing({Type, Name})
+ when Type =:= queue orelse Type =:= reply_queue orelse Type =:= amqqueue ->
+ {"", Name}.
+
+dest_temp_queue({temp_queue, Name}) -> Name;
+dest_temp_queue(_) -> none.
+
+%% --------------------------------------------------------------------------
+
+check_exchange(_, _, false) ->
+ ok;
+check_exchange(ExchangeName, Channel, true) ->
+ XDecl = #'exchange.declare'{ exchange = list_to_binary(ExchangeName),
+ passive = true },
+ #'exchange.declare_ok'{} = amqp_channel:call(Channel, XDecl),
+ ok.
+
+update_queue_declare_arguments(Method, Params) ->
+ Method#'queue.declare'{arguments =
+ proplists:get_value(arguments, Params, [])}.
+
+update_queue_declare_exclusive(Method, Params) ->
+ case proplists:get_value(exclusive, Params) of
+ undefined -> Method;
+ Val -> Method#'queue.declare'{exclusive = Val}
+ end.
+
+update_queue_declare_auto_delete(Method, Params) ->
+ case proplists:get_value(auto_delete, Params) of
+ undefined -> Method;
+ Val -> Method#'queue.declare'{auto_delete = Val}
+ end.
+
+update_queue_declare_nowait(Method, Params) ->
+ case proplists:get_value(nowait, Params) of
+ undefined -> Method;
+ Val -> Method#'queue.declare'{nowait = Val}
+ end.
+
+queue_declare_method(#'queue.declare'{} = Method, Type, Params) ->
+ %% defaults
+ Method1 = case proplists:get_value(durable, Params, false) of
+ true -> Method#'queue.declare'{durable = true};
+ false -> Method#'queue.declare'{auto_delete = true,
+ exclusive = true}
+ end,
+ %% set the rest of queue.declare fields from Params
+ Method2 = lists:foldl(fun (F, Acc) -> F(Acc, Params) end,
+ Method1, [fun update_queue_declare_arguments/2,
+ fun update_queue_declare_exclusive/2,
+ fun update_queue_declare_auto_delete/2,
+ fun update_queue_declare_nowait/2]),
+ case {Type, proplists:get_value(subscription_queue_name_gen, Params)} of
+ {topic, SQNG} when is_function(SQNG) ->
+ Method2#'queue.declare'{queue = SQNG()};
+ {exchange, SQNG} when is_function(SQNG) ->
+ Method2#'queue.declare'{queue = SQNG()};
+ _ ->
+ Method2
+ end.
+
+%% --------------------------------------------------------------------------
+
+to_url([]) -> [];
+to_url(Lol) -> "/" ++ string:join(Lol, "/").
+
+atomise(Name) when is_list(Name) ->
+ list_to_atom(re:replace(Name, "-", "_", [{return,list}, global])).
+
+unescape(Str) -> unescape(Str, []).
+
+unescape("%2F" ++ Str, Acc) -> unescape(Str, [$/ | Acc]);
+unescape([C | Str], Acc) -> unescape(Str, [C | Acc]);
+unescape([], Acc) -> lists:reverse(Acc).
diff --git a/deps/amqp_client/src/uri_parser.erl b/deps/amqp_client/src/uri_parser.erl
new file mode 100644
index 0000000000..a2bb98dad7
--- /dev/null
+++ b/deps/amqp_client/src/uri_parser.erl
@@ -0,0 +1,125 @@
+%% This file is a copy of http_uri.erl from the R13B-1 Erlang/OTP
+%% distribution with several modifications.
+
+%% All modifications are Copyright (c) 2009-2020 VMware, Inc. or its affiliates.
+
+%% ``The contents of this file are subject to the Erlang Public License,
+%% Version 1.1, (the "License"); you may not use this file except in
+%% compliance with the License. You should have received a copy of the
+%% Erlang Public License along with this software. If not, it can be
+%% retrieved via the world wide web at https://www.erlang.org/.
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
+%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
+%% AB. All Rights Reserved.''
+
+%% See https://tools.ietf.org/html/rfc3986
+
+-module(uri_parser).
+
+-export([parse/2]).
+
+%%%=========================================================================
+%%% API
+%%%=========================================================================
+
+%% Returns a key list of elements extracted from the URI. Note that
+%% only 'scheme' is guaranteed to exist. Key-Value pairs from the
+%% Defaults list will be used absence of a non-empty value extracted
+%% from the URI. The values extracted are strings, except for 'port'
+%% which is an integer, 'userinfo' which is a list of strings (split
+%% on $:), and 'query' which is a list of strings where no $= char
+%% found, or a {key,value} pair where a $= char is found (initial
+%% split on $& and subsequent optional split on $=). Possible keys
+%% are: 'scheme', 'userinfo', 'host', 'port', 'path', 'query',
+%% 'fragment'.
+
+-spec parse(AbsURI, Defaults :: list())
+ -> [{atom(), string()}] | {error, no_scheme | {malformed_uri, AbsURI, any()}}
+ when AbsURI :: string() | binary().
+parse(AbsURI, Defaults) ->
+ AbsUriString = rabbit_data_coercion:to_list(AbsURI),
+ case parse_scheme(AbsUriString) of
+ {error, Reason} ->
+ {error, Reason};
+ {Scheme, Rest} ->
+ case (catch parse_uri_rest(Rest, true)) of
+ [_|_] = List ->
+ merge_keylists([{scheme, Scheme} | List], Defaults);
+ E ->
+ {error, {malformed_uri, AbsURI, E}}
+ end
+ end.
+
+%%%========================================================================
+%%% Internal functions
+%%%========================================================================
+parse_scheme(AbsURI) ->
+ split_uri(AbsURI, ":", {error, no_scheme}).
+
+parse_uri_rest("//" ++ URIPart, true) ->
+ %% we have an authority
+ {Authority, PathQueryFrag} =
+ split_uri(URIPart, "/|\\?|#", {URIPart, ""}, 1, 0),
+ AuthorityParts = parse_authority(Authority),
+ parse_uri_rest(PathQueryFrag, false) ++ AuthorityParts;
+parse_uri_rest(PathQueryFrag, _Bool) ->
+ %% no authority, just a path and maybe query
+ {PathQuery, Frag} = split_uri(PathQueryFrag, "#", {PathQueryFrag, ""}),
+ {Path, QueryString} = split_uri(PathQuery, "\\?", {PathQuery, ""}),
+ QueryPropList = split_query(QueryString),
+ [{path, Path}, {'query', QueryPropList}, {fragment, Frag}].
+
+parse_authority(Authority) ->
+ {UserInfo, HostPort} = split_uri(Authority, "@", {"", Authority}),
+ UserInfoSplit = case re:split(UserInfo, ":", [{return, list}]) of
+ [""] -> [];
+ UIS -> UIS
+ end,
+ [{userinfo, UserInfoSplit} | parse_host_port(HostPort)].
+
+parse_host_port("[" ++ HostPort) -> %ipv6
+ {Host, ColonPort} = split_uri(HostPort, "\\]", {HostPort, ""}),
+ [{host, Host} | case split_uri(ColonPort, ":", not_found, 0, 1) of
+ not_found -> case ColonPort of
+ [] -> [];
+ _ -> throw({invalid_port, ColonPort})
+ end;
+ {_, Port} -> [{port, list_to_integer(Port)}]
+ end];
+
+parse_host_port(HostPort) ->
+ {Host, Port} = split_uri(HostPort, ":", {HostPort, not_found}),
+ [{host, Host} | case Port of
+ not_found -> [];
+ _ -> [{port, list_to_integer(Port)}]
+ end].
+
+split_query(Query) ->
+ case re:split(Query, "&", [{return, list}]) of
+ [""] -> [];
+ QParams -> [split_uri(Param, "=", Param) || Param <- QParams]
+ end.
+
+split_uri(UriPart, SplitChar, NoMatchResult) ->
+ split_uri(UriPart, SplitChar, NoMatchResult, 1, 1).
+
+split_uri(UriPart, SplitChar, NoMatchResult, SkipLeft, SkipRight) ->
+ case re:run(UriPart, SplitChar) of
+ {match, [{Match, _}]} ->
+ {string:substr(UriPart, 1, Match + 1 - SkipLeft),
+ string:substr(UriPart, Match + 1 + SkipRight, length(UriPart))};
+ nomatch ->
+ NoMatchResult
+ end.
+
+merge_keylists(A, B) ->
+ {AEmpty, ANonEmpty} = lists:partition(fun ({_Key, V}) -> V =:= [] end, A),
+ [AEmptyS, ANonEmptyS, BS] =
+ [lists:ukeysort(1, X) || X <- [AEmpty, ANonEmpty, B]],
+ lists:ukeymerge(1, lists:ukeymerge(1, ANonEmptyS, BS), AEmptyS).