diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-07-25 15:01:16 +0100 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-07-25 15:01:16 +0100 |
commit | 5ab8473d3918b5cdb720889f8a4da08afc1d86e3 (patch) | |
tree | f8a521d2bb4aeec17688b49a211e2d4a7aa555b4 /src | |
parent | f37aed126d810e623b41f6a0eade002eec4c3b16 (diff) | |
download | rabbitmq-server-5ab8473d3918b5cdb720889f8a4da08afc1d86e3.tar.gz |
start limiter before channel
Also use an opaque type, a token, to invoke limiter functions.
Diffstat (limited to 'src')
-rw-r--r-- | src/rabbit_amqqueue.erl | 10 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 44 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 122 | ||||
-rw-r--r-- | src/rabbit_channel_sup.erl | 23 | ||||
-rw-r--r-- | src/rabbit_limiter.erl | 140 |
5 files changed, 190 insertions, 149 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index e9d01d12..01721582 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -119,11 +119,13 @@ -spec(ack/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok'). -spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()). --spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()). +-spec(limit_all/3 :: ([pid()], pid(), rabbit_limiter:limiter_token()) -> + ok_or_errors()). -spec(basic_get/3 :: (rabbit_types:amqqueue(), pid(), boolean()) -> {'ok', non_neg_integer(), qmsg()} | 'empty'). -spec(basic_consume/7 :: - (rabbit_types:amqqueue(), boolean(), pid(), pid() | 'undefined', + (rabbit_types:amqqueue(), boolean(), pid(), + rabbit_limiter:limiter_token(), rabbit_types:ctag(), boolean(), any()) -> rabbit_types:ok_or_error('exclusive_consume_unavailable')). -spec(basic_cancel/4 :: @@ -439,10 +441,10 @@ notify_down_all(QPids, ChPid) -> fun (QPid) -> gen_server2:call(QPid, {notify_down, ChPid}, infinity) end, QPids). -limit_all(QPids, ChPid, LimiterPid) -> +limit_all(QPids, ChPid, LimiterToken) -> delegate:invoke_no_result( QPids, fun (QPid) -> - gen_server2:cast(QPid, {limit, ChPid, LimiterPid}) + gen_server2:cast(QPid, {limit, ChPid, LimiterToken}) end). basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index fcd6cc24..e9949913 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -58,7 +58,7 @@ %% These are held in our process dictionary -record(cr, {consumer_count, ch_pid, - limiter_pid, + limiter_token, monitor_ref, acktags, is_limit_active, @@ -346,10 +346,10 @@ maybe_store_ch_record(C = #cr{consumer_count = ConsumerCount, true end. -erase_ch_record(#cr{ch_pid = ChPid, - limiter_pid = LimiterPid, - monitor_ref = MonitorRef}) -> - ok = rabbit_limiter:unregister(LimiterPid, self()), +erase_ch_record(#cr{ch_pid = ChPid, + limiter_token = LimiterToken, + monitor_ref = MonitorRef}) -> + ok = rabbit_limiter:unregister(LimiterToken, self()), erlang:demonitor(MonitorRef), erase({ch, ChPid}), ok. @@ -374,12 +374,12 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, {{value, QEntry = {ChPid, #consumer{tag = ConsumerTag, ack_required = AckRequired}}}, ActiveConsumersTail} -> - C = #cr{limiter_pid = LimiterPid, + C = #cr{limiter_token = LimiterToken, unsent_message_count = Count, acktags = ChAckTags} = ch_record(ChPid), IsMsgReady = PredFun(FunAcc, State), case (IsMsgReady andalso - rabbit_limiter:can_send( LimiterPid, self(), AckRequired )) of + rabbit_limiter:can_send( LimiterToken, self(), AckRequired )) of true -> {{Message, IsDelivered, AckTag}, FunAcc1, State1} = DeliverFun(AckRequired, FunAcc, State), @@ -904,7 +904,7 @@ handle_call({basic_get, ChPid, NoAck}, _From, reply({ok, Remaining, Msg}, State3) end; -handle_call({basic_consume, NoAck, ChPid, LimiterPid, +handle_call({basic_consume, NoAck, ChPid, LimiterToken, ConsumerTag, ExclusiveConsume, OkMsg}, _From, State = #q{exclusive_consumer = ExistingHolder}) -> case check_exclusive_access(ExistingHolder, ExclusiveConsume, @@ -915,10 +915,11 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid), Consumer = #consumer{tag = ConsumerTag, ack_required = not NoAck}, - true = maybe_store_ch_record(C#cr{consumer_count = ConsumerCount +1, - limiter_pid = LimiterPid}), + true = maybe_store_ch_record( + C#cr{consumer_count = ConsumerCount +1, + limiter_token = LimiterToken}), ok = case ConsumerCount of - 0 -> rabbit_limiter:register(LimiterPid, self()); + 0 -> rabbit_limiter:register(LimiterToken, self()); _ -> ok end, ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag}; @@ -951,12 +952,12 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, ok = maybe_send_reply(ChPid, OkMsg), reply(ok, State); C = #cr{consumer_count = ConsumerCount, - limiter_pid = LimiterPid} -> + limiter_token = LimiterToken} -> C1 = C#cr{consumer_count = ConsumerCount -1}, maybe_store_ch_record( case ConsumerCount of - 1 -> ok = rabbit_limiter:unregister(LimiterPid, self()), - C1#cr{limiter_pid = undefined}; + 1 -> ok = rabbit_limiter:unregister(LimiterToken, self()), + C1#cr{limiter_token = undefined}; _ -> C1 end), emit_consumer_deleted(ChPid, ConsumerTag), @@ -1065,20 +1066,21 @@ handle_cast({notify_sent, ChPid}, State) -> C#cr{unsent_message_count = Count - 1} end)); -handle_cast({limit, ChPid, LimiterPid}, State) -> +handle_cast({limit, ChPid, LimiterToken}, State) -> noreply( possibly_unblock( State, ChPid, fun (C = #cr{consumer_count = ConsumerCount, - limiter_pid = OldLimiterPid, + limiter_token = OldLimiterToken, is_limit_active = Limited}) -> - if ConsumerCount =/= 0 andalso OldLimiterPid == undefined -> - ok = rabbit_limiter:register(LimiterPid, self()); - true -> + if ConsumerCount =/= 0 -> + ok = rabbit_limiter:register(LimiterToken, self()); + true -> ok end, - NewLimited = Limited andalso LimiterPid =/= undefined, - C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited} + NewLimited = Limited, + C#cr{limiter_token = LimiterToken, + is_limit_active = NewLimited} end)); handle_cast({flush, ChPid}, State) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index f398fcc5..5c3e5cc9 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -30,7 +30,7 @@ prioritise_cast/2]). -record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid, - limiter_pid, start_limiter_fun, tx_status, next_tag, + limiter_token, tx_status, next_tag, unacked_message_q, uncommitted_message_q, uncommitted_ack_q, user, virtual_host, most_recently_declared_queue, consumer_mapping, blocking, consumer_monitors, queue_collector_pid, @@ -71,7 +71,7 @@ -spec(start_link/10 :: (channel_number(), pid(), pid(), pid(), rabbit_types:protocol(), rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(), - pid(), fun ((non_neg_integer()) -> rabbit_types:ok(pid()))) -> + pid(), rabbit_limiter:limiter_token()) -> rabbit_types:ok_pid_or_error()). -spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(), @@ -99,10 +99,10 @@ %%---------------------------------------------------------------------------- start_link(Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, - Capabilities, CollectorPid, StartLimiterFun) -> + Capabilities, CollectorPid, LimiterToken) -> gen_server2:start_link( ?MODULE, [Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, - VHost, Capabilities, CollectorPid, StartLimiterFun], []). + VHost, Capabilities, CollectorPid, LimiterToken], []). do(Pid, Method) -> do(Pid, Method, none). @@ -162,7 +162,7 @@ ready_for_close(Pid) -> %%--------------------------------------------------------------------------- init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, - Capabilities, CollectorPid, StartLimiterFun]) -> + Capabilities, CollectorPid, LimiterToken]) -> process_flag(trap_exit, true), ok = pg_local:join(rabbit_channels, self()), StatsTimer = rabbit_event:init_stats_timer(), @@ -172,8 +172,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, reader_pid = ReaderPid, writer_pid = WriterPid, conn_pid = ConnPid, - limiter_pid = undefined, - start_limiter_fun = StartLimiterFun, + limiter_token = LimiterToken, tx_status = none, next_tag = 1, unacked_message_q = queue:new(), @@ -705,7 +704,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, exclusive = ExclusiveConsume, nowait = NoWait}, _, State = #ch{conn_pid = ConnPid, - limiter_pid = LimiterPid, + limiter_token = LimiterToken, consumer_mapping = ConsumerMapping}) -> case dict:find(ConsumerTag, ConsumerMapping) of error -> @@ -724,7 +723,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, QueueName, ConnPid, fun (Q) -> {rabbit_amqqueue:basic_consume( - Q, NoAck, self(), LimiterPid, + Q, NoAck, self(), LimiterToken, ActualConsumerTag, ExclusiveConsume, ok_msg(NoWait, #'basic.consume_ok'{ consumer_tag = ActualConsumerTag})), @@ -798,22 +797,24 @@ handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 -> rabbit_misc:protocol_error(not_implemented, "prefetch_size!=0 (~w)", [Size]); -handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, - _, State = #ch{limiter_pid = LimiterPid}) -> - LimiterPid1 = case {LimiterPid, PrefetchCount} of - {undefined, 0} -> undefined; - {undefined, _} -> start_limiter(State); - {_, _} -> LimiterPid - end, - LimiterPid2 = case rabbit_limiter:limit(LimiterPid1, PrefetchCount) of - ok -> LimiterPid1; - stopped -> unlimit_queues(State) - end, - {reply, #'basic.qos_ok'{}, State#ch{limiter_pid = LimiterPid2}}; +handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, _, + State = #ch{limiter_token = LimiterToken}) -> + LimiterToken1 = + case {rabbit_limiter:is_enabled(LimiterToken), PrefetchCount} of + {false, 0} -> LimiterToken; + {false, _} -> enable_limiter(State); + {_, _} -> LimiterToken + end, + LimiterToken3 = case rabbit_limiter:limit(LimiterToken1, PrefetchCount) of + ok -> LimiterToken1; + {disabled, LimiterToken2} -> unlimit_queues(State), + LimiterToken2 + end, + {reply, #'basic.qos_ok'{}, State#ch{limiter_token = LimiterToken3}}; handle_method(#'basic.recover_async'{requeue = true}, _, State = #ch{unacked_message_q = UAMQ, - limiter_pid = LimiterPid}) -> + limiter_token = LimiterToken}) -> OkFun = fun () -> ok end, ok = fold_per_queue( fun (QPid, MsgIds, ok) -> @@ -827,7 +828,7 @@ handle_method(#'basic.recover_async'{requeue = true}, QPid, lists:reverse(MsgIds), self()) end) end, ok, UAMQ), - ok = notify_limiter(LimiterPid, UAMQ), + ok = notify_limiter(LimiterToken, UAMQ), %% No answer required - basic.recover is the newer, synchronous %% variant of this method {noreply, State#ch{unacked_message_q = queue:new()}}; @@ -1074,23 +1075,20 @@ handle_method(#'confirm.select'{nowait = NoWait}, _, State) -> NoWait, #'confirm.select_ok'{}); handle_method(#'channel.flow'{active = true}, _, - State = #ch{limiter_pid = LimiterPid}) -> - LimiterPid1 = case rabbit_limiter:unblock(LimiterPid) of - ok -> LimiterPid; - stopped -> unlimit_queues(State) - end, + State = #ch{limiter_token = LimiterToken}) -> + LimiterToken2 = case rabbit_limiter:unblock(LimiterToken) of + ok -> LimiterToken; + {disabled, LimiterToken1} -> unlimit_queues(State), + LimiterToken1 + end, {reply, #'channel.flow_ok'{active = true}, - State#ch{limiter_pid = LimiterPid1}}; + State#ch{limiter_token = LimiterToken2}}; handle_method(#'channel.flow'{active = false}, _, - State = #ch{limiter_pid = LimiterPid, - consumer_mapping = Consumers}) -> - LimiterPid1 = case LimiterPid of - undefined -> start_limiter(State); - Other -> Other - end, - State1 = State#ch{limiter_pid = LimiterPid1}, - ok = rabbit_limiter:block(LimiterPid1), + State = #ch{consumer_mapping = Consumers}) -> + LimiterToken1 = enable_limiter(State), + State1 = State#ch{limiter_token = LimiterToken1}, + ok = rabbit_limiter:block(LimiterToken1), case consumer_queues(Consumers) of [] -> {reply, #'channel.flow_ok'{active = false}, State1}; QPids -> Queues = [{QPid, erlang:monitor(process, QPid)} || @@ -1220,7 +1218,7 @@ reject(DeliveryTag, Requeue, Multiple, State = #ch{unacked_message_q = UAMQ}) -> fun (QPid, MsgIds, ok) -> rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self()) end, ok, Acked), - ok = notify_limiter(State#ch.limiter_pid, Acked), + ok = notify_limiter(State#ch.limiter_token, Acked), {noreply, State#ch{unacked_message_q = Remaining}}. ack_record(DeliveryTag, ConsumerTag, @@ -1257,7 +1255,7 @@ ack(Acked, State) -> [{QPid, length(MsgIds)} | L] end, [], Acked), maybe_incr_stats(QIncs, ack, State), - ok = notify_limiter(State#ch.limiter_pid, Acked), + ok = notify_limiter(State#ch.limiter_token, Acked), State. new_tx(State) -> State#ch{uncommitted_message_q = queue:new(), @@ -1281,17 +1279,19 @@ fold_per_queue(F, Acc0, UAQ) -> dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end, Acc0, D). -start_limiter(State = #ch{unacked_message_q = UAMQ, start_limiter_fun = SLF}) -> - {ok, LPid} = SLF(queue:len(UAMQ)), - ok = limit_queues(LPid, State), - LPid. +enable_limiter(State = #ch{unacked_message_q = UAMQ, + limiter_token = LimiterToken}) -> + LimiterToken1 = rabbit_limiter:enable(LimiterToken, queue:len(UAMQ)), + ok = limit_queues(LimiterToken1, State), + LimiterToken1. -unlimit_queues(State) -> - ok = limit_queues(undefined, State), - undefined. +unlimit_queues(State = #ch{limiter_token = LimiterToken}) -> + LimiterToken1 = rabbit_limiter:disable(LimiterToken), + ok = limit_queues(LimiterToken1, State), + LimiterToken1. -limit_queues(LPid, #ch{consumer_mapping = Consumers}) -> - rabbit_amqqueue:limit_all(consumer_queues(Consumers), self(), LPid). +limit_queues(Token, #ch{consumer_mapping = Consumers}) -> + rabbit_amqqueue:limit_all(consumer_queues(Consumers), self(), Token). consumer_queues(Consumers) -> lists:usort([QPid || @@ -1302,14 +1302,16 @@ consumer_queues(Consumers) -> %% for messages delivered to subscribed consumers, but not acks for %% messages sent in a response to a basic.get (identified by their %% 'none' consumer tag) -notify_limiter(undefined, _Acked) -> - ok; -notify_limiter(LimiterPid, Acked) -> - case rabbit_misc:queue_fold(fun ({_, none, _}, Acc) -> Acc; - ({_, _, _}, Acc) -> Acc + 1 - end, 0, Acked) of - 0 -> ok; - Count -> rabbit_limiter:ack(LimiterPid, Count) +notify_limiter(LimiterToken, Acked) -> + case rabbit_limiter:is_enabled(LimiterToken) of + false -> ok; + true -> + case rabbit_misc:queue_fold(fun ({_, none, _}, Acc) -> Acc; + ({_, _, _}, Acc) -> Acc + 1 + end, 0, Acked) of + 0 -> ok; + Count -> rabbit_limiter:ack(LimiterToken, Count) + end end. deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ @@ -1447,10 +1449,10 @@ i(messages_uncommitted, #ch{uncommitted_message_q = TMQ}) -> queue:len(TMQ); i(acks_uncommitted, #ch{uncommitted_ack_q = TAQ}) -> queue:len(TAQ); -i(prefetch_count, #ch{limiter_pid = LimiterPid}) -> - rabbit_limiter:get_limit(LimiterPid); -i(client_flow_blocked, #ch{limiter_pid = LimiterPid}) -> - rabbit_limiter:is_blocked(LimiterPid); +i(prefetch_count, #ch{limiter_token = LimiterToken}) -> + rabbit_limiter:get_limit(LimiterToken); +i(client_flow_blocked, #ch{limiter_token = LimiterToken}) -> + rabbit_limiter:is_blocked(LimiterToken); i(Item, _) -> throw({bad_argument, Item}). diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index 65ccca02..9868cfe0 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -54,26 +54,27 @@ start_link({tcp, Sock, Channel, FrameMax, ReaderPid, Protocol, User, VHost, {writer, {rabbit_writer, start_link, [Sock, Channel, FrameMax, Protocol, ReaderPid]}, intrinsic, ?MAX_WAIT, worker, [rabbit_writer]}), + LimiterToken = start_limiter(SupPid), {ok, ChannelPid} = supervisor2:start_child( SupPid, {channel, {rabbit_channel, start_link, [Channel, ReaderPid, WriterPid, ReaderPid, Protocol, - User, VHost, Capabilities, Collector, - start_limiter_fun(SupPid)]}, + User, VHost, Capabilities, Collector, LimiterToken]}, intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), {ok, AState} = rabbit_command_assembler:init(Protocol), {ok, SupPid, {ChannelPid, AState}}; start_link({direct, Channel, ClientChannelPid, ConnPid, Protocol, User, VHost, Capabilities, Collector}) -> {ok, SupPid} = supervisor2:start_link(?MODULE, []), + LimiterToken = start_limiter(SupPid), {ok, ChannelPid} = supervisor2:start_child( SupPid, {channel, {rabbit_channel, start_link, [Channel, ClientChannelPid, ClientChannelPid, ConnPid, Protocol, User, VHost, Capabilities, Collector, - start_limiter_fun(SupPid)]}, + LimiterToken]}, intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), {ok, SupPid, {ChannelPid, none}}. @@ -82,12 +83,10 @@ start_link({direct, Channel, ClientChannelPid, ConnPid, Protocol, User, VHost, init([]) -> {ok, {{one_for_all, 0, 1}, []}}. -start_limiter_fun(SupPid) -> - fun (UnackedCount) -> - Me = self(), - {ok, _Pid} = - supervisor2:start_child( - SupPid, - {limiter, {rabbit_limiter, start_link, [Me, UnackedCount]}, - transient, ?MAX_WAIT, worker, [rabbit_limiter]}) - end. +start_limiter(SupPid) -> + {ok, Pid} = + supervisor2:start_child( + SupPid, + {limiter, {rabbit_limiter, start_link, []}, + transient, ?MAX_WAIT, worker, [rabbit_limiter]}), + rabbit_limiter:make_new_token(Pid). diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index e79583fa..80e36721 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -20,27 +20,34 @@ -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, prioritise_call/3]). --export([start_link/2]). +-export([start_link/0, make_new_token/1, is_enabled/1, enable/2, disable/1]). +-export_type([limiter_token/0]). -export([limit/2, can_send/3, ack/2, register/2, unregister/2]). -export([get_limit/1, block/1, unblock/1, is_blocked/1]). %%---------------------------------------------------------------------------- --ifdef(use_specs). +-record(limiter_token, {pid, enabled = false}). --type(maybe_pid() :: pid() | 'undefined'). +-ifdef(use_specs). --spec(start_link/2 :: (pid(), non_neg_integer()) -> - rabbit_types:ok_pid_or_error()). --spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok' | 'stopped'). --spec(can_send/3 :: (maybe_pid(), pid(), boolean()) -> boolean()). --spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). --spec(register/2 :: (maybe_pid(), pid()) -> 'ok'). --spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok'). --spec(get_limit/1 :: (maybe_pid()) -> non_neg_integer()). --spec(block/1 :: (maybe_pid()) -> 'ok'). --spec(unblock/1 :: (maybe_pid()) -> 'ok' | 'stopped'). --spec(is_blocked/1 :: (maybe_pid()) -> boolean()). +-type(maybe_token() :: limiter_token() | 'undefined'). +-opaque(limiter_token() :: #'limiter_token'{}). + +-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). +-spec(make_new_token/1 :: (pid()) -> limiter_token()). +-spec(is_enabled/1 :: (limiter_token()) -> boolean()). +-spec(enable/2 :: (limiter_token(), non_neg_integer()) -> limiter_token()). +-spec(disable/1 :: (limiter_token()) -> limiter_token()). +-spec(limit/2 :: (maybe_token(), non_neg_integer()) -> 'ok' | 'stopped'). +-spec(can_send/3 :: (maybe_token(), pid(), boolean()) -> boolean()). +-spec(ack/2 :: (maybe_token(), non_neg_integer()) -> 'ok'). +-spec(register/2 :: (maybe_token(), pid()) -> 'ok'). +-spec(unregister/2 :: (maybe_token(), pid()) -> 'ok'). +-spec(get_limit/1 :: (maybe_token()) -> non_neg_integer()). +-spec(block/1 :: (maybe_token()) -> 'ok'). +-spec(unblock/1 :: (maybe_token()) -> 'ok' | 'stopped'). +-spec(is_blocked/1 :: (maybe_token()) -> boolean()). -endif. @@ -59,63 +66,68 @@ %% API %%---------------------------------------------------------------------------- -start_link(ChPid, UnackedMsgCount) -> - gen_server2:start_link(?MODULE, [ChPid, UnackedMsgCount], []). +start_link() -> + gen_server2:start_link(?MODULE, [], []). -limit(undefined, 0) -> - ok; -limit(LimiterPid, PrefetchCount) -> - gen_server2:call(LimiterPid, {limit, PrefetchCount}, infinity). +make_new_token(Pid) -> + #limiter_token{pid = Pid}. + +is_enabled(#limiter_token{enabled = Enabled}) -> + Enabled. + +enable(#limiter_token{pid = Pid} = Token, Volume) -> + gen_server2:call(Pid, {enable, Token, self(), Volume}). + +disable(#limiter_token{pid = Pid} = Token) -> + gen_server2:call(Pid, {disable, Token}). + +limit(LimiterToken, PrefetchCount) -> + maybe_call(LimiterToken, {limit, PrefetchCount, LimiterToken}). %% Ask the limiter whether the queue can deliver a message without %% breaching a limit can_send(undefined, _QPid, _AckRequired) -> true; -can_send(LimiterPid, QPid, AckRequired) -> +can_send(#limiter_token{enabled = false}, _QPid, _AckRequired) -> + true; +can_send(LimiterToken, QPid, AckRequired) -> rabbit_misc:with_exit_handler( fun () -> true end, - fun () -> gen_server2:call(LimiterPid, {can_send, QPid, AckRequired}, - infinity) end). + fun () -> maybe_call(LimiterToken, {can_send, QPid, AckRequired}) end). %% Let the limiter know that the channel has received some acks from a %% consumer -ack(undefined, _Count) -> ok; -ack(LimiterPid, Count) -> gen_server2:cast(LimiterPid, {ack, Count}). +ack(LimiterToken, Count) -> maybe_cast(LimiterToken, {ack, Count}). -register(undefined, _QPid) -> ok; -register(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {register, QPid}). +register(LimiterToken, QPid) -> maybe_cast(LimiterToken, {register, QPid}). -unregister(undefined, _QPid) -> ok; -unregister(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {unregister, QPid}). +unregister(LimiterToken, QPid) -> maybe_cast(LimiterToken, + {unregister, QPid}). -get_limit(undefined) -> - 0; -get_limit(Pid) -> +get_limit(LimiterToken) -> rabbit_misc:with_exit_handler( fun () -> 0 end, - fun () -> gen_server2:call(Pid, get_limit, infinity) end). + fun () -> maybe_call(LimiterToken, get_limit) end). -block(undefined) -> - ok; -block(LimiterPid) -> - gen_server2:call(LimiterPid, block, infinity). +block(LimiterToken) -> + maybe_call(LimiterToken, block). -unblock(undefined) -> - ok; -unblock(LimiterPid) -> - gen_server2:call(LimiterPid, unblock, infinity). +unblock(LimiterToken) -> + maybe_call(LimiterToken, {unblock, LimiterToken}). is_blocked(undefined) -> false; -is_blocked(LimiterPid) -> - gen_server2:call(LimiterPid, is_blocked, infinity). +is_blocked(#limiter_token{enabled = false}) -> + false; +is_blocked(LimiterToken) -> + maybe_call(LimiterToken, is_blocked). %%---------------------------------------------------------------------------- %% gen_server callbacks %%---------------------------------------------------------------------------- -init([ChPid, UnackedMsgCount]) -> - {ok, #lim{ch_pid = ChPid, volume = UnackedMsgCount}}. +init([]) -> + {ok, #lim{}}. prioritise_call(get_limit, _From, _State) -> 9; prioritise_call(_Msg, _From, _State) -> 0. @@ -135,23 +147,33 @@ handle_call({can_send, QPid, AckRequired}, _From, handle_call(get_limit, _From, State = #lim{prefetch_count = PrefetchCount}) -> {reply, PrefetchCount, State}; -handle_call({limit, PrefetchCount}, _From, State) -> +handle_call({limit, PrefetchCount, Token}, _From, State) -> case maybe_notify(State, State#lim{prefetch_count = PrefetchCount}) of - {cont, State1} -> {reply, ok, State1}; - {stop, State1} -> {stop, normal, stopped, State1} + {cont, State1} -> + {reply, ok, State1}; + {stop, State1} -> + {reply, {disabled, Token#limiter_token{enabled = false}}, State1} end; handle_call(block, _From, State) -> {reply, ok, State#lim{blocked = true}}; -handle_call(unblock, _From, State) -> +handle_call({unblock, Token}, _From, State) -> case maybe_notify(State, State#lim{blocked = false}) of - {cont, State1} -> {reply, ok, State1}; - {stop, State1} -> {stop, normal, stopped, State1} + {cont, State1} -> + {reply, ok, State1}; + {stop, State1} -> + {reply, {disabled, Token#limiter_token{enabled = false}}, State1} end; handle_call(is_blocked, _From, State) -> - {reply, blocked(State), State}. + {reply, blocked(State), State}; + +handle_call({enable, Token, Channel, Volume}, _From, State) -> + {reply, Token#limiter_token{enabled = true}, + State#lim{ch_pid = Channel, volume = Volume}}; +handle_call({disable, Token}, _From, State) -> + {reply, Token#limiter_token{enabled = false}, State}. handle_cast({ack, Count}, State = #lim{volume = Volume}) -> NewVolume = if Volume == 0 -> 0; @@ -190,6 +212,20 @@ maybe_notify(OldState, NewState) -> false -> {cont, NewState} end. +maybe_call(undefined, _Call) -> + ok; +maybe_call(#limiter_token{enabled = false}, _Call) -> + ok; +maybe_call(#limiter_token{pid = Pid}, Call) -> + gen_server2:call(Pid, Call, infinity). + +maybe_cast(undefined, _Call) -> + ok; +maybe_cast(#limiter_token{enabled = false}, _Cast) -> + ok; +maybe_cast(#limiter_token{pid = Pid}, Cast) -> + gen_server2:cast(Pid, Cast). + limit_reached(#lim{prefetch_count = Limit, volume = Volume}) -> Limit =/= 0 andalso Volume >= Limit. |