summaryrefslogtreecommitdiff
path: root/deps/rabbit/src/rabbit_channel_sup.erl
diff options
context:
space:
mode:
authordcorbacho <dparracorbacho@piotal.io>2020-11-18 14:27:41 +0000
committerdcorbacho <dparracorbacho@piotal.io>2020-11-18 14:27:41 +0000
commitf23a51261d9502ec39df0f8db47ba6b22aa7659f (patch)
tree53dcdf46e7dc2c14e81ee960bce8793879b488d3 /deps/rabbit/src/rabbit_channel_sup.erl
parentafa2c2bf6c7e0e9b63f4fb53dc931c70388e1c82 (diff)
parent9f6d64ec4a4b1eeac24d7846c5c64fd96798d892 (diff)
downloadrabbitmq-server-git-stream-timestamp-offset.tar.gz
Merge remote-tracking branch 'origin/master' into stream-timestamp-offsetstream-timestamp-offset
Diffstat (limited to 'deps/rabbit/src/rabbit_channel_sup.erl')
-rw-r--r--deps/rabbit/src/rabbit_channel_sup.erl92
1 files changed, 92 insertions, 0 deletions
diff --git a/deps/rabbit/src/rabbit_channel_sup.erl b/deps/rabbit/src/rabbit_channel_sup.erl
new file mode 100644
index 0000000000..0d405ad3a7
--- /dev/null
+++ b/deps/rabbit/src/rabbit_channel_sup.erl
@@ -0,0 +1,92 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_channel_sup).
+
+%% Supervises processes that implement AMQP 0-9-1 channels:
+%%
+%% * Channel process itself
+%% * Network writer (for network connections)
+%% * Limiter (handles channel QoS and flow control)
+%%
+%% Every rabbit_channel_sup is supervised by rabbit_channel_sup_sup.
+%%
+%% See also rabbit_channel, rabbit_writer, rabbit_limiter.
+
+-behaviour(supervisor2).
+
+-export([start_link/1]).
+
+-export([init/1]).
+
+-include("rabbit.hrl").
+
+%%----------------------------------------------------------------------------
+
+-export_type([start_link_args/0]).
+
+-type start_link_args() ::
+ {'tcp', rabbit_net:socket(), rabbit_channel:channel_number(),
+ non_neg_integer(), pid(), string(), rabbit_types:protocol(),
+ rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(),
+ pid()} |
+ {'direct', rabbit_channel:channel_number(), pid(), string(),
+ rabbit_types:protocol(), rabbit_types:user(), rabbit_types:vhost(),
+ rabbit_framing:amqp_table(), pid()}.
+
+-define(FAIR_WAIT, 70000).
+
+%%----------------------------------------------------------------------------
+
+-spec start_link(start_link_args()) -> {'ok', pid(), {pid(), any()}}.
+
+start_link({tcp, Sock, Channel, FrameMax, ReaderPid, ConnName, Protocol, User,
+ VHost, Capabilities, Collector}) ->
+ {ok, SupPid} = supervisor2:start_link(
+ ?MODULE, {tcp, Sock, Channel, FrameMax,
+ ReaderPid, Protocol, {ConnName, Channel}}),
+ [LimiterPid] = supervisor2:find_child(SupPid, limiter),
+ [WriterPid] = supervisor2:find_child(SupPid, writer),
+ {ok, ChannelPid} =
+ supervisor2:start_child(
+ SupPid,
+ {channel, {rabbit_channel, start_link,
+ [Channel, ReaderPid, WriterPid, ReaderPid, ConnName,
+ Protocol, User, VHost, Capabilities, Collector,
+ LimiterPid]},
+ intrinsic, ?FAIR_WAIT, worker, [rabbit_channel]}),
+ {ok, AState} = rabbit_command_assembler:init(Protocol),
+ {ok, SupPid, {ChannelPid, AState}};
+start_link({direct, Channel, ClientChannelPid, ConnPid, ConnName, Protocol,
+ User, VHost, Capabilities, Collector, AmqpParams}) ->
+ {ok, SupPid} = supervisor2:start_link(
+ ?MODULE, {direct, {ConnName, Channel}}),
+ [LimiterPid] = supervisor2:find_child(SupPid, limiter),
+ {ok, ChannelPid} =
+ supervisor2:start_child(
+ SupPid,
+ {channel, {rabbit_channel, start_link,
+ [Channel, ClientChannelPid, ClientChannelPid, ConnPid,
+ ConnName, Protocol, User, VHost, Capabilities, Collector,
+ LimiterPid, AmqpParams]},
+ intrinsic, ?FAIR_WAIT, worker, [rabbit_channel]}),
+ {ok, SupPid, {ChannelPid, none}}.
+
+%%----------------------------------------------------------------------------
+
+init(Type) ->
+ ?LG_PROCESS_TYPE(channel_sup),
+ {ok, {{one_for_all, 0, 1}, child_specs(Type)}}.
+
+child_specs({tcp, Sock, Channel, FrameMax, ReaderPid, Protocol, Identity}) ->
+ [{writer, {rabbit_writer, start_link,
+ [Sock, Channel, FrameMax, Protocol, ReaderPid, Identity, true]},
+ intrinsic, ?FAIR_WAIT, worker, [rabbit_writer]}
+ | child_specs({direct, Identity})];
+child_specs({direct, Identity}) ->
+ [{limiter, {rabbit_limiter, start_link, [Identity]},
+ transient, ?FAIR_WAIT, worker, [rabbit_limiter]}].