diff options
-rw-r--r-- | src/rabbit_channel_sup.erl | 17 | ||||
-rw-r--r-- | src/rabbit_channel_sup_sup.erl | 20 | ||||
-rw-r--r-- | src/rabbit_connection_sup.erl | 19 | ||||
-rw-r--r-- | src/rabbit_limiter.erl | 12 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 4 |
5 files changed, 47 insertions, 25 deletions
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index 23058bfe..02199a65 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -33,7 +33,7 @@ -behaviour(supervisor2). --export([start_link/8]). +-export([start_link/1]). -export([init/1]). @@ -43,18 +43,21 @@ -ifdef(use_specs). --spec(start_link/8 :: - (rabbit_types:protocol(), rabbit_net:socket(), +-export_type([start_link_args/0]). + +-type(start_link_args() :: + {rabbit_types:protocol(), rabbit_net:socket(), rabbit_channel:channel_number(), non_neg_integer(), pid(), - rabbit_access_control:username(), rabbit_types:vhost(), pid()) -> - {'ok', pid(), pid()}). + rabbit_access_control:username(), rabbit_types:vhost(), pid()}). + +-spec(start_link/1 :: (start_link_args()) -> {'ok', pid(), pid()}). -endif. %%---------------------------------------------------------------------------- -start_link(Protocol, Sock, Channel, FrameMax, ReaderPid, Username, VHost, - Collector) -> +start_link({Protocol, Sock, Channel, FrameMax, ReaderPid, Username, VHost, + Collector}) -> {ok, SupPid} = supervisor2:start_link(?MODULE, []), {ok, WriterPid} = supervisor2:start_child( diff --git a/src/rabbit_channel_sup_sup.erl b/src/rabbit_channel_sup_sup.erl index 2fab8678..98ad5fc0 100644 --- a/src/rabbit_channel_sup_sup.erl +++ b/src/rabbit_channel_sup_sup.erl @@ -37,13 +37,27 @@ -export([init/1]). +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). +-spec(start_channel/2 :: (pid(), rabbit_channel_sup:start_link_args()) -> + {'ok', pid(), pid()} | {'error', any()}). + +-endif. + +%%---------------------------------------------------------------------------- + start_link() -> supervisor2:start_link(?MODULE, []). +start_channel(Pid, Args) -> + supervisor2:start_child(Pid, [Args]). + +%%---------------------------------------------------------------------------- + init([]) -> {ok, {{simple_one_for_one_terminate, 0, 1}, [{channel_sup, {rabbit_channel_sup, start_link, []}, temporary, infinity, supervisor, [rabbit_channel_sup]}]}}. - -start_channel(Pid, Args) -> - supervisor2:start_child(Pid, Args). diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl index 354540c1..2606210b 100644 --- a/src/rabbit_connection_sup.erl +++ b/src/rabbit_connection_sup.erl @@ -39,6 +39,17 @@ -include("rabbit.hrl"). +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/0 :: () -> rabbit_types:ok(pid())). +-spec(reader/1 :: (pid()) -> pid()). + +-endif. + +%%-------------------------------------------------------------------------- + start_link() -> {ok, SupPid} = supervisor2:start_link(?MODULE, []), {ok, ChannelSupSupPid} = @@ -59,12 +70,14 @@ start_link() -> intrinsic, ?MAX_WAIT, worker, [rabbit_reader]}), {ok, SupPid}. -init([]) -> - {ok, {{one_for_all, 0, 1}, []}}. - reader(Pid) -> hd(supervisor2:find_child(Pid, reader)). +%%-------------------------------------------------------------------------- + +init([]) -> + {ok, {{one_for_all, 0, 1}, []}}. + start_heartbeat_fun(SupPid) -> fun (_Sock, 0) -> none; diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 5b7dd707..da7078f1 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -79,8 +79,7 @@ start_link(ChPid, UnackedMsgCount) -> limit(undefined, 0) -> ok; limit(LimiterPid, PrefetchCount) -> - unlink_on_stopped(LimiterPid, - gen_server2:call(LimiterPid, {limit, PrefetchCount})). + gen_server2:call(LimiterPid, {limit, PrefetchCount}). %% Ask the limiter whether the queue can deliver a message without %% breaching a limit @@ -118,8 +117,7 @@ block(LimiterPid) -> unblock(undefined) -> ok; unblock(LimiterPid) -> - unlink_on_stopped(LimiterPid, - gen_server2:call(LimiterPid, unblock, infinity)). + gen_server2:call(LimiterPid, unblock, infinity). %%---------------------------------------------------------------------------- %% gen_server callbacks @@ -237,9 +235,3 @@ notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) -> ok end, State#lim{queues = NewQueues}. - -unlink_on_stopped(LimiterPid, stopped) -> - ok = rabbit_misc:unlink_and_capture_exit(LimiterPid), - stopped; -unlink_on_stopped(_LimiterPid, Result) -> - Result. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 1d2dd166..685dd83e 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -895,8 +895,8 @@ send_to_new_channel(Channel, AnalyzedFrame, State) -> vhost = VHost}} = State, {ok, ChSupPid, ChFrPid} = rabbit_channel_sup_sup:start_channel( - ChanSupSup, [Protocol, Sock, Channel, FrameMax, - self(), Username, VHost, Collector]), + ChanSupSup, {Protocol, Sock, Channel, FrameMax, + self(), Username, VHost, Collector}), link(ChSupPid), put({channel, Channel}, {ch_fr_pid, ChFrPid}), put({ch_sup_pid, ChSupPid}, {{channel, Channel}, {ch_fr_pid, ChFrPid}}), |