summaryrefslogtreecommitdiff
path: root/deps/amqp_client/src/amqp_channel_sup.erl
diff options
context:
space:
mode:
Diffstat (limited to 'deps/amqp_client/src/amqp_channel_sup.erl')
-rw-r--r--deps/amqp_client/src/amqp_channel_sup.erl73
1 files changed, 73 insertions, 0 deletions
diff --git a/deps/amqp_client/src/amqp_channel_sup.erl b/deps/amqp_client/src/amqp_channel_sup.erl
new file mode 100644
index 0000000000..9bd85ce946
--- /dev/null
+++ b/deps/amqp_client/src/amqp_channel_sup.erl
@@ -0,0 +1,73 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+%% @private
+-module(amqp_channel_sup).
+
+-include("amqp_client_internal.hrl").
+
+-behaviour(supervisor2).
+
+-export([start_link/6]).
+-export([init/1]).
+
+%%---------------------------------------------------------------------------
+%% Interface
+%%---------------------------------------------------------------------------
+
+start_link(Type, Connection, ConnName, InfraArgs, ChNumber,
+ Consumer = {_, _}) ->
+ Identity = {ConnName, ChNumber},
+ {ok, Sup} = supervisor2:start_link(?MODULE, [Consumer, Identity]),
+ [{gen_consumer, ConsumerPid, _, _}] = supervisor2:which_children(Sup),
+ {ok, ChPid} = supervisor2:start_child(
+ Sup, {channel,
+ {amqp_channel, start_link,
+ [Type, Connection, ChNumber, ConsumerPid, Identity]},
+ intrinsic, ?WORKER_WAIT, worker, [amqp_channel]}),
+ case start_writer(Sup, Type, InfraArgs, ConnName, ChNumber, ChPid) of
+ {ok, Writer} ->
+ amqp_channel:set_writer(ChPid, Writer),
+ {ok, AState} = init_command_assembler(Type),
+ {ok, Sup, {ChPid, AState}};
+ {error, _}=Error ->
+ Error
+ end.
+
+%%---------------------------------------------------------------------------
+%% Internal plumbing
+%%---------------------------------------------------------------------------
+
+%% 1GB
+-define(DEFAULT_GC_THRESHOLD, 1000000000).
+
+start_writer(_Sup, direct, [ConnPid, Node, User, VHost, Collector, AmqpParams],
+ ConnName, ChNumber, ChPid) ->
+ rpc:call(Node, rabbit_direct, start_channel,
+ [ChNumber, ChPid, ConnPid, ConnName, ?PROTOCOL, User,
+ VHost, ?CLIENT_CAPABILITIES, Collector, AmqpParams]);
+start_writer(Sup, network, [Sock, FrameMax], ConnName, ChNumber, ChPid) ->
+ GCThreshold = application:get_env(amqp_client, writer_gc_threshold, ?DEFAULT_GC_THRESHOLD),
+ supervisor2:start_child(
+ Sup,
+ {writer, {rabbit_writer, start_link,
+ [Sock, ChNumber, FrameMax, ?PROTOCOL, ChPid,
+ {ConnName, ChNumber}, false, GCThreshold]},
+ transient, ?WORKER_WAIT, worker, [rabbit_writer]}).
+
+init_command_assembler(direct) -> {ok, none};
+init_command_assembler(network) -> rabbit_command_assembler:init(?PROTOCOL).
+
+%%---------------------------------------------------------------------------
+%% supervisor2 callbacks
+%%---------------------------------------------------------------------------
+
+init([{ConsumerModule, ConsumerArgs}, Identity]) ->
+ {ok, {{one_for_all, 0, 1},
+ [{gen_consumer, {amqp_gen_consumer, start_link,
+ [ConsumerModule, ConsumerArgs, Identity]},
+ intrinsic, ?WORKER_WAIT, worker, [amqp_gen_consumer]}]}}.