summary | refs | log | tree | commit | diff
path: root/deps/amqp_client/src/amqp_connection_type_sup.erl
blob: f67dc56836878a98feaad0cc776a31905de9f029 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates.  All rights reserved.
%%

%% @private
-module(amqp_connection_type_sup).

-include("amqp_client_internal.hrl").

-behaviour(supervisor2).

-export([start_link/0, start_infrastructure_fun/3, type_module/1]).

-export([init/1]).

%%---------------------------------------------------------------------------
%% Interface
%%---------------------------------------------------------------------------

%% Starts an instance of this supervisor through supervisor2:start_link/2,
%% using this module as the callback module. No name is registered, and
%% init/1 receives an empty argument list. The result of
%% supervisor2:start_link/2 is returned unchanged.
start_link() ->
    InitArgs = [],
    supervisor2:start_link(?MODULE, InitArgs).

%% Maps a connection parameters record to {Type, Module}: the connection type
%% atom paired with the name of the module associated with that type.
%% Any argument that is neither record fails with function_clause.
type_module(#amqp_params_network{}) ->
    {network, amqp_network_connection};
type_module(#amqp_params_direct{}) ->
    {direct, amqp_direct_connection}.

%%---------------------------------------------------------------------------

%% Starts the two channel-related children under Sup, in this order:
%%   1. channel_sup_sup  - an amqp_channel_sup_sup supervisor, started with
%%                         [Type, Conn, ConnName] and restart type 'intrinsic';
%%   2. channels_manager - an amqp_channels_manager worker (transient), which
%%                         is handed the pid of the supervisor started in 1.
%% Returns the {ok, Pid} result of starting the channels manager. If either
%% child does not start with a two-element {ok, _} result, the calling
%% process fails with badmatch.
start_channels_manager(Sup, Conn, ConnName, Type) ->
    ChSupSupSpec = {channel_sup_sup,
                    {amqp_channel_sup_sup, start_link, [Type, Conn, ConnName]},
                    intrinsic, ?SUPERVISOR_WAIT, supervisor,
                    [amqp_channel_sup_sup]},
    {ok, ChSupSup} = supervisor2:start_child(Sup, ChSupSupSpec),
    %% The manager's spec can only be built once the channel sup-sup pid
    %% is known, because that pid is one of its start arguments.
    ChMgrSpec = {channels_manager,
                 {amqp_channels_manager, start_link,
                  [Conn, ConnName, ChSupSup]},
                 transient, ?WORKER_WAIT, worker, [amqp_channels_manager]},
    {ok, _ChMgr} = supervisor2:start_child(Sup, ChMgrSpec).

%% Returns a closure that, when called, starts the supporting processes of a
%% connection as children of Sup. The closure's arity depends on the type:
%%
%% network: fun(Sock, ConnName). Starts the channels manager, initialises a
%%   command assembler state, then starts a rabbit_writer and an
%%   amqp_main_reader as transient workers. The reader is made the
%%   controlling process of Sock and amqp_main_reader:post_init/1 is called.
%%   Yields {ok, ChMgr, Writer}, or {error, Reason} when the socket
%%   hand-over or post_init reports an error.
%%
%% direct: fun(ConnName). Starts the channels manager and a
%%   rabbit_queue_collector worker; yields {ok, ChMgr, Collector}.
%%
%% A child that does not start with {ok, Pid}, or a missing
%% writer_gc_threshold setting in the amqp_client application environment,
%% makes the process running the closure fail with badmatch.
start_infrastructure_fun(Sup, Conn, network) ->
    fun (Sock, ConnName) ->
            {ok, ChMgr} = start_channels_manager(Sup, Conn, ConnName, network),
            {ok, AState} = rabbit_command_assembler:init(?PROTOCOL),
            {ok, GCThreshold} =
                application:get_env(amqp_client, writer_gc_threshold),
            WriterArgs = [Sock, 0, ?FRAME_MIN_SIZE, ?PROTOCOL, Conn, ConnName,
                          false, GCThreshold],
            WriterSpec = {writer, {rabbit_writer, start_link, WriterArgs},
                          transient, ?WORKER_WAIT, worker, [rabbit_writer]},
            {ok, Writer} = supervisor2:start_child(Sup, WriterSpec),
            ReaderArgs = [Sock, Conn, ChMgr, AState, ConnName],
            ReaderSpec = {main_reader,
                          {amqp_main_reader, start_link, ReaderArgs},
                          transient, ?WORKER_WAIT, worker, [amqp_main_reader]},
            {ok, Reader} = supervisor2:start_child(Sup, ReaderSpec),
            %% The reader must own the socket before post_init/1 is called.
            case rabbit_net:controlling_process(Sock, Reader) of
                ok ->
                    case amqp_main_reader:post_init(Reader) of
                        ok ->
                            {ok, ChMgr, Writer};
                        {error, _} = PostInitError ->
                            PostInitError
                    end;
                {error, _} = HandOverError ->
                    HandOverError
            end
    end;
start_infrastructure_fun(Sup, Conn, direct) ->
    fun (ConnName) ->
            {ok, ChMgr} = start_channels_manager(Sup, Conn, ConnName, direct),
            CollectorSpec = {collector,
                             {rabbit_queue_collector, start_link, [ConnName]},
                             transient, ?WORKER_WAIT, worker,
                             [rabbit_queue_collector]},
            {ok, Collector} = supervisor2:start_child(Sup, CollectorSpec),
            {ok, ChMgr, Collector}
    end.

%%---------------------------------------------------------------------------
%% supervisor2 callbacks
%%---------------------------------------------------------------------------

%% Supervisor callback. Declares a one_for_all strategy with a restart
%% intensity of 0 restarts per 1 second and no static children; every child
%% is added later through supervisor2:start_child/2.
%% NOTE(review): with standard OTP supervisor semantics an intensity of 0
%% means a child is never restarted in place; confirm supervisor2 agrees.
init([]) ->
    RestartStrategy = {one_for_all, 0, 1},
    StaticChildren = [],
    {ok, {RestartStrategy, StaticChildren}}.