diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-10 14:02:41 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-10 14:02:41 +0100 |
commit | 601f79c7e42ee9755999a4bc4f54b6ec59dc4598 (patch) | |
tree | e43bd9a0bbbd9237943a98dc1404be825fa73ae8 | |
parent | 35253052355e30a99ff332226b9619d49fa6c820 (diff) | |
download | rabbitmq-server-601f79c7e42ee9755999a4bc4f54b6ec59dc4598.tar.gz |
Abstract out the limiter creation. The abstraction made ensures the channel never directly calls supervisor(2)?:.*, nor does it have any knowledge of the channel_sup.
-rw-r--r-- | src/rabbit_channel.erl | 33 | ||||
-rw-r--r-- | src/rabbit_channel_sup.erl | 12 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 6 |
3 files changed, 30 insertions, 21 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index c45b2cc7..050a5425 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -35,7 +35,7 @@ -behaviour(gen_server2). --export([start_link/6, do/2, do/3, shutdown/1]). +-export([start_link/7, do/2, do/3, shutdown/1]). -export([send_command/2, deliver/4, flushed/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). -export([emit_stats/1, flush/1]). @@ -43,8 +43,8 @@ -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1]). --record(ch, {state, channel, parent_pid, reader_pid, writer_pid, limiter_pid, - transaction_id, tx_participants, next_tag, +-record(ch, {state, channel, reader_pid, writer_pid, limiter_pid, + start_limiter_fun, transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, username, virtual_host, most_recently_declared_queue, consumer_mapping, blocking, queue_collector_pid, stats_timer}). @@ -76,9 +76,10 @@ -type(channel_number() :: non_neg_integer()). --spec(start_link/6 :: +-spec(start_link/7 :: (channel_number(), pid(), pid(), rabbit_access_control:username(), - rabbit_types:vhost(), pid()) -> + rabbit_types:vhost(), pid(), + fun ((non_neg_integer()) -> rabbit_types:ok(pid()))) -> 'ignore' | rabbit_types:ok_or_error2(pid(), any())). -spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(), @@ -101,9 +102,10 @@ %%---------------------------------------------------------------------------- -start_link(Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid) -> - gen_server2:start_link(?MODULE, [Channel, self(), ReaderPid, WriterPid, - Username, VHost, CollectorPid], []). +start_link(Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid, + StartLimiterFun) -> + gen_server2:start_link(?MODULE, [Channel, ReaderPid, WriterPid, Username, + VHost, CollectorPid, StartLimiterFun], []). do(Pid, Method) -> do(Pid, Method, none). @@ -151,15 +153,15 @@ flush(Pid) -> %%--------------------------------------------------------------------------- -init([Channel, ParentPid, ReaderPid, WriterPid, Username, VHost, - CollectorPid]) -> +init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid, + StartLimiterFun]) -> ok = pg_local:join(rabbit_channels, self()), State = #ch{state = starting, channel = Channel, - parent_pid = ParentPid, reader_pid = ReaderPid, writer_pid = WriterPid, limiter_pid = undefined, + start_limiter_fun = StartLimiterFun, transaction_id = none, tx_participants = sets:new(), next_tag = 1, @@ -1014,13 +1016,8 @@ fold_per_queue(F, Acc0, UAQ) -> dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end, Acc0, D). -start_limiter(State = #ch{unacked_message_q = UAMQ, parent_pid = ParentPid}) -> - Me = self(), - {ok, LPid} = - supervisor2:start_child( - ParentPid, - {limiter, {rabbit_limiter, start_link, [Me, queue:len(UAMQ)]}, - transient, ?MAX_WAIT, worker, [rabbit_limiter]}), +start_limiter(State = #ch{unacked_message_q = UAMQ, start_limiter_fun = SLF}) -> + {ok, LPid} = SLF(queue:len(UAMQ)), ok = limit_queues(LPid, State), LPid. diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index 9011db73..23058bfe 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -67,7 +67,7 @@ start_link(Protocol, Sock, Channel, FrameMax, ReaderPid, Username, VHost, SupPid, {channel, {rabbit_channel, start_link, [Channel, ReaderPid, WriterPid, Username, VHost, - Collector]}, + Collector, start_limiter_fun(SupPid)]}, intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), {ok, FramingChannelPid} = supervisor2:start_child( @@ -81,3 +81,13 @@ start_link(Protocol, Sock, Channel, FrameMax, ReaderPid, Username, VHost, init([]) -> {ok, {{one_for_all, 0, 1}, []}}. + +start_limiter_fun(SupPid) -> + fun (UnackedCount) -> + Me = self(), + {ok, _Pid} = + supervisor2:start_child( + SupPid, + {limiter, {rabbit_limiter, start_link, [Me, UnackedCount]}, + transient, ?MAX_WAIT, worker, [rabbit_limiter]}) + end. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 55897679..f861cedd 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1020,7 +1020,8 @@ test_server_status() -> %% create a few things so there is some useful information to list Writer = spawn(fun () -> receive shutdown -> ok end end), {ok, Ch} = rabbit_channel:start_link(1, self(), Writer, - <<"user">>, <<"/">>, self()), + <<"user">>, <<"/">>, self(), + fun (_) -> {ok, self()} end), [Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>], {new, Queue = #amqqueue{}} <- [rabbit_amqqueue:declare( @@ -1117,7 +1118,8 @@ test_spawn(Receiver) -> Me = self(), Writer = spawn(fun () -> Receiver(Me) end), {ok, Ch} = rabbit_channel:start_link(1, Me, Writer, - <<"guest">>, <<"/">>, self()), + <<"guest">>, <<"/">>, self(), + fun (_) -> {ok, self()} end), ok = rabbit_channel:do(Ch, #'channel.open'{}), receive #'channel.open_ok'{} -> ok after 1000 -> throw(failed_to_receive_channel_open_ok) |