diff options
author | Vlad Alexandru Ionescu <vlad@rabbitmq.com> | 2011-01-19 21:26:34 +0000 |
---|---|---|
committer | Vlad Alexandru Ionescu <vlad@rabbitmq.com> | 2011-01-19 21:26:34 +0000 |
commit | f15f44d12f8f0c0f9bd9bfa899bd29aca2ed4692 (patch) | |
tree | 2507e1bf6f11d668a676dd6dbf14ddd2ba3fff7e | |
parent | 1161a528c4917db800492aa9cf241701aa0134b2 (diff) | |
download | rabbitmq-server-f15f44d12f8f0c0f9bd9bfa899bd29aca2ed4692.tar.gz |
adding supervision support for direct connection channels
-rw-r--r-- | ebin/rabbit_app.in | 3 | ||||
-rw-r--r-- | src/rabbit.erl | 8 | ||||
-rw-r--r-- | src/rabbit_channel_sup.erl | 21 | ||||
-rw-r--r-- | src/rabbit_channel_sup_sup.erl | 11 | ||||
-rw-r--r-- | src/rabbit_client_sup.erl (renamed from src/tcp_client_sup.erl) | 7 | ||||
-rw-r--r-- | src/rabbit_direct.erl | 52 | ||||
-rw-r--r-- | src/rabbit_networking.erl | 6 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 4 |
8 files changed, 92 insertions, 20 deletions
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index 5ed872b6..bb0fbcd3 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -8,7 +8,8 @@ rabbit_node_monitor, rabbit_router, rabbit_sup, - rabbit_tcp_client_sup]}, + rabbit_tcp_client_sup, + rabbit_direct_client_sup]}, {applications, [kernel, stdlib, sasl, mnesia, os_mon]}, %% we also depend on crypto, public_key and ssl but they shouldn't be %% in here as we don't actually want to start it diff --git a/src/rabbit.erl b/src/rabbit.erl index 954e289b..60604eed 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -160,11 +160,19 @@ {requires, routing_ready}, {enables, networking}]}). +-rabbit_boot_step({direct_client, + [{mfa, {rabbit_direct, boot, []}}, + {requires, log_relay}, + {enables, direct_listening}]}). + -rabbit_boot_step({networking, [{mfa, {rabbit_networking, boot, []}}, {requires, log_relay}, {enables, networking_listening}]}). +-rabbit_boot_step({direct_listening, + [{description, "direct connection listeners available"}]}). + -rabbit_boot_step({networking_listening, [{description, "network listeners available"}]}). diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index 9f50176d..f67a44df 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -33,7 +33,7 @@ -behaviour(supervisor2). --export([start_link/1]). +-export([start_link/2]). -export([init/1]). @@ -50,14 +50,15 @@ rabbit_channel:channel_number(), non_neg_integer(), pid(), rabbit_types:user(), rabbit_types:vhost(), pid()}). --spec(start_link/1 :: (start_link_args()) -> {'ok', pid(), {pid(), any()}}). +-spec(start_link/2 :: (atom(), start_link_args()) -> + {'ok', pid(), {pid(), any()}}). -endif. %%---------------------------------------------------------------------------- -start_link({Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost, - Collector}) -> +start_link(tcp, {Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost, + Collector}) -> {ok, SupPid} = supervisor2:start_link(?MODULE, []), {ok, WriterPid} = supervisor2:start_child( @@ -73,7 +74,17 @@ start_link({Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost, Collector, start_limiter_fun(SupPid)]}, intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), {ok, AState} = rabbit_command_assembler:init(Protocol), - {ok, SupPid, {ChannelPid, AState}}. + {ok, SupPid, {ChannelPid, AState}}; +start_link(direct, {Channel, ClientChannelPid, User, VHost, Collector}) -> + {ok, SupPid} = supervisor2:start_link(?MODULE, []), + {ok, ChannelPid} = + supervisor2:start_child( + SupPid, + {channel, {rabbit_channel, start_link, + [Channel, ClientChannelPid, ClientChannelPid, + User, VHost, Collector, start_limiter_fun(SupPid)]}, + intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), + {ok, SupPid, {ChannelPid, none}}. %%---------------------------------------------------------------------------- diff --git a/src/rabbit_channel_sup_sup.erl b/src/rabbit_channel_sup_sup.erl index fd99af56..e8c25796 100644 --- a/src/rabbit_channel_sup_sup.erl +++ b/src/rabbit_channel_sup_sup.erl @@ -33,7 +33,7 @@ -behaviour(supervisor2). --export([start_link/0, start_channel/2]). +-export([start_link/0, start_channel/3]). -export([init/1]). @@ -42,8 +42,9 @@ -ifdef(use_specs). -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). --spec(start_channel/2 :: (pid(), rabbit_channel_sup:start_link_args()) -> - {'ok', pid(), {pid(), any()}}). +-spec(start_channel/3 :: (pid(), atom(), + rabbit_channel_sup:start_link_args()) -> + {'ok', pid(), {pid(), any()}}). -endif. @@ -52,8 +53,8 @@ start_link() -> supervisor2:start_link(?MODULE, []). -start_channel(Pid, Args) -> - supervisor2:start_child(Pid, [Args]). +start_channel(Pid, Type, Args) -> + supervisor2:start_child(Pid, [Type, Args]). %%---------------------------------------------------------------------------- diff --git a/src/tcp_client_sup.erl b/src/rabbit_client_sup.erl index 02d7e0e4..d37ab942 100644 --- a/src/tcp_client_sup.erl +++ b/src/rabbit_client_sup.erl @@ -29,7 +29,7 @@ %% Contributor(s): ______________________________________. %% --module(tcp_client_sup). +-module(rabbit_client_sup). -behaviour(supervisor2). @@ -44,6 +44,5 @@ start_link(SupName, Callback) -> supervisor2:start_link(SupName, ?MODULE, Callback). init({M,F,A}) -> - {ok, {{simple_one_for_one_terminate, 10, 10}, - [{tcp_client, {M,F,A}, - temporary, infinity, supervisor, [M]}]}}. + {ok, {{simple_one_for_one_terminate, 0, 1}, + [{client, {M,F,A}, temporary, infinity, supervisor, [M]}]}}. diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl new file mode 100644 index 00000000..b1c5f415 --- /dev/null +++ b/src/rabbit_direct.erl @@ -0,0 +1,52 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_direct). + +-export([boot/0, start_channel/1]). + +%%---------------------------------------------------------------------------- + +boot() -> + {ok, _} = + supervisor2:start_child( + rabbit_sup, + {rabbit_direct_client_sup, + {rabbit_client_sup, start_link, + [{local, rabbit_direct_client_sup}, + {rabbit_channel_sup, start_link, [direct]}]}, + transient, infinity, supervisor, [rabbit_client_sup]}), + ok. + +start_channel(Args) -> + {ok, _, {ChannelPid, _}} = + supervisor2:start_child(rabbit_direct_client_sup, [Args]), + {ok, ChannelPid}. diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index d5a9d73c..b80596c0 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -123,13 +123,13 @@ boot_ssl() -> end. start() -> - {ok,_} = supervisor:start_child( + {ok,_} = supervisor2:start_child( rabbit_sup, {rabbit_tcp_client_sup, - {tcp_client_sup, start_link, + {rabbit_client_sup, start_link, [{local, rabbit_tcp_client_sup}, {rabbit_connection_sup,start_link,[]}]}, - transient, infinity, supervisor, [tcp_client_sup]}), + transient, infinity, supervisor, [rabbit_client_sup]}), ok. getaddr(Host) -> diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 08bc18ba..8528ac40 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -955,8 +955,8 @@ send_to_new_channel(Channel, AnalyzedFrame, State) -> vhost = VHost}} = State, {ok, _ChSupPid, {ChPid, AState}} = rabbit_channel_sup_sup:start_channel( - ChanSupSup, {Protocol, Sock, Channel, FrameMax, - self(), User, VHost, Collector}), + ChanSupSup, tcp, {Protocol, Sock, Channel, FrameMax, + self(), User, VHost, Collector}), erlang:monitor(process, ChPid), NewAState = process_channel_frame(AnalyzedFrame, self(), Channel, ChPid, AState), |