summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-08-10 14:02:41 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-08-10 14:02:41 +0100
commit601f79c7e42ee9755999a4bc4f54b6ec59dc4598 (patch)
treee43bd9a0bbbd9237943a98dc1404be825fa73ae8
parent35253052355e30a99ff332226b9619d49fa6c820 (diff)
downloadrabbitmq-server-601f79c7e42ee9755999a4bc4f54b6ec59dc4598.tar.gz
Abstract out the limiter creation. The abstraction made ensures the channel never directly calls supervisor(2)?:.*, nor does it have any knowledge of the channel_sup.
-rw-r--r--src/rabbit_channel.erl33
-rw-r--r--src/rabbit_channel_sup.erl12
-rw-r--r--src/rabbit_tests.erl6
3 files changed, 30 insertions, 21 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index c45b2cc7..050a5425 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -35,7 +35,7 @@
-behaviour(gen_server2).
--export([start_link/6, do/2, do/3, shutdown/1]).
+-export([start_link/7, do/2, do/3, shutdown/1]).
-export([send_command/2, deliver/4, flushed/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
-export([emit_stats/1, flush/1]).
@@ -43,8 +43,8 @@
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1]).
--record(ch, {state, channel, parent_pid, reader_pid, writer_pid, limiter_pid,
- transaction_id, tx_participants, next_tag,
+-record(ch, {state, channel, reader_pid, writer_pid, limiter_pid,
+ start_limiter_fun, transaction_id, tx_participants, next_tag,
uncommitted_ack_q, unacked_message_q,
username, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking, queue_collector_pid, stats_timer}).
@@ -76,9 +76,10 @@
-type(channel_number() :: non_neg_integer()).
--spec(start_link/6 ::
+-spec(start_link/7 ::
(channel_number(), pid(), pid(), rabbit_access_control:username(),
- rabbit_types:vhost(), pid()) ->
+ rabbit_types:vhost(), pid(),
+ fun ((non_neg_integer()) -> rabbit_types:ok(pid()))) ->
'ignore' | rabbit_types:ok_or_error2(pid(), any())).
-spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
-spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(),
@@ -101,9 +102,10 @@
%%----------------------------------------------------------------------------
-start_link(Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid) ->
- gen_server2:start_link(?MODULE, [Channel, self(), ReaderPid, WriterPid,
- Username, VHost, CollectorPid], []).
+start_link(Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid,
+ StartLimiterFun) ->
+ gen_server2:start_link(?MODULE, [Channel, ReaderPid, WriterPid, Username,
+ VHost, CollectorPid, StartLimiterFun], []).
do(Pid, Method) ->
do(Pid, Method, none).
@@ -151,15 +153,15 @@ flush(Pid) ->
%%---------------------------------------------------------------------------
-init([Channel, ParentPid, ReaderPid, WriterPid, Username, VHost,
- CollectorPid]) ->
+init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid,
+ StartLimiterFun]) ->
ok = pg_local:join(rabbit_channels, self()),
State = #ch{state = starting,
channel = Channel,
- parent_pid = ParentPid,
reader_pid = ReaderPid,
writer_pid = WriterPid,
limiter_pid = undefined,
+ start_limiter_fun = StartLimiterFun,
transaction_id = none,
tx_participants = sets:new(),
next_tag = 1,
@@ -1014,13 +1016,8 @@ fold_per_queue(F, Acc0, UAQ) ->
dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end,
Acc0, D).
-start_limiter(State = #ch{unacked_message_q = UAMQ, parent_pid = ParentPid}) ->
- Me = self(),
- {ok, LPid} =
- supervisor2:start_child(
- ParentPid,
- {limiter, {rabbit_limiter, start_link, [Me, queue:len(UAMQ)]},
- transient, ?MAX_WAIT, worker, [rabbit_limiter]}),
+start_limiter(State = #ch{unacked_message_q = UAMQ, start_limiter_fun = SLF}) ->
+ {ok, LPid} = SLF(queue:len(UAMQ)),
ok = limit_queues(LPid, State),
LPid.
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
index 9011db73..23058bfe 100644
--- a/src/rabbit_channel_sup.erl
+++ b/src/rabbit_channel_sup.erl
@@ -67,7 +67,7 @@ start_link(Protocol, Sock, Channel, FrameMax, ReaderPid, Username, VHost,
SupPid,
{channel, {rabbit_channel, start_link,
[Channel, ReaderPid, WriterPid, Username, VHost,
- Collector]},
+ Collector, start_limiter_fun(SupPid)]},
intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}),
{ok, FramingChannelPid} =
supervisor2:start_child(
@@ -81,3 +81,13 @@ start_link(Protocol, Sock, Channel, FrameMax, ReaderPid, Username, VHost,
init([]) ->
{ok, {{one_for_all, 0, 1}, []}}.
+
+start_limiter_fun(SupPid) ->
+ fun (UnackedCount) ->
+ Me = self(),
+ {ok, _Pid} =
+ supervisor2:start_child(
+ SupPid,
+ {limiter, {rabbit_limiter, start_link, [Me, UnackedCount]},
+ transient, ?MAX_WAIT, worker, [rabbit_limiter]})
+ end.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 55897679..f861cedd 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1020,7 +1020,8 @@ test_server_status() ->
%% create a few things so there is some useful information to list
Writer = spawn(fun () -> receive shutdown -> ok end end),
{ok, Ch} = rabbit_channel:start_link(1, self(), Writer,
- <<"user">>, <<"/">>, self()),
+ <<"user">>, <<"/">>, self(),
+ fun (_) -> {ok, self()} end),
[Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>],
{new, Queue = #amqqueue{}} <-
[rabbit_amqqueue:declare(
@@ -1117,7 +1118,8 @@ test_spawn(Receiver) ->
Me = self(),
Writer = spawn(fun () -> Receiver(Me) end),
{ok, Ch} = rabbit_channel:start_link(1, Me, Writer,
- <<"guest">>, <<"/">>, self()),
+ <<"guest">>, <<"/">>, self(),
+ fun (_) -> {ok, self()} end),
ok = rabbit_channel:do(Ch, #'channel.open'{}),
receive #'channel.open_ok'{} -> ok
after 1000 -> throw(failed_to_receive_channel_open_ok)