diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2011-08-17 12:48:23 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-08-17 12:48:23 +0100 |
commit | fad96e13a66375d7dd12914b5f83f23eefcb3792 (patch) | |
tree | 57fd96798307603198c86821c46be7ca7cfff1d5 | |
parent | 1a7940bbc28618ed34ed97eda70d6573631dd98d (diff) | |
parent | f6f05a7b83e32a1d8069fff0127545dd1e8a317a (diff) | |
download | rabbitmq-server-fad96e13a66375d7dd12914b5f83f23eefcb3792.tar.gz |
Merging default to bug24285
-rw-r--r-- | src/rabbit_amqqueue.erl | 15 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 47 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 128 | ||||
-rw-r--r-- | src/rabbit_channel_sup.erl | 41 | ||||
-rw-r--r-- | src/rabbit_limiter.erl | 143 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 9 |
6 files changed, 210 insertions, 173 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index fbea763c..07079375 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -119,12 +119,13 @@ -spec(ack/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok'). -spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()). --spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()). +-spec(limit_all/3 :: ([pid()], pid(), rabbit_limiter:token()) -> + ok_or_errors()). -spec(basic_get/3 :: (rabbit_types:amqqueue(), pid(), boolean()) -> {'ok', non_neg_integer(), qmsg()} | 'empty'). -spec(basic_consume/7 :: - (rabbit_types:amqqueue(), boolean(), pid(), pid() | 'undefined', - rabbit_types:ctag(), boolean(), any()) + (rabbit_types:amqqueue(), boolean(), pid(), + rabbit_limiter:token(), rabbit_types:ctag(), boolean(), any()) -> rabbit_types:ok_or_error('exclusive_consume_unavailable')). -spec(basic_cancel/4 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok'). @@ -440,19 +441,19 @@ notify_down_all(QPids, ChPid) -> fun (QPid) -> gen_server2:call(QPid, {notify_down, ChPid}, infinity) end, QPids). -limit_all(QPids, ChPid, LimiterPid) -> +limit_all(QPids, ChPid, Limiter) -> delegate:invoke_no_result( QPids, fun (QPid) -> - gen_server2:cast(QPid, {limit, ChPid, LimiterPid}) + gen_server2:cast(QPid, {limit, ChPid, Limiter}) end). basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) -> delegate_call(QPid, {basic_get, ChPid, NoAck}). -basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, LimiterPid, +basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, Limiter, ConsumerTag, ExclusiveConsume, OkMsg) -> delegate_call(QPid, {basic_consume, NoAck, ChPid, - LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}). + Limiter, ConsumerTag, ExclusiveConsume, OkMsg}). basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> ok = delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 1d63b351..ac40bd44 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -58,7 +58,7 @@ %% These are held in our process dictionary -record(cr, {consumer_count, ch_pid, - limiter_pid, + limiter, monitor_ref, acktags, is_limit_active, @@ -326,6 +326,7 @@ ch_record(ChPid) -> monitor_ref = MonitorRef, acktags = sets:new(), is_limit_active = false, + limiter = rabbit_limiter:make_token(), unsent_message_count = 0}, put(Key, C), C; @@ -346,9 +347,9 @@ maybe_store_ch_record(C = #cr{consumer_count = ConsumerCount, end. erase_ch_record(#cr{ch_pid = ChPid, - limiter_pid = LimiterPid, + limiter = Limiter, monitor_ref = MonitorRef}) -> - ok = rabbit_limiter:unregister(LimiterPid, self()), + ok = rabbit_limiter:unregister(Limiter, self()), erlang:demonitor(MonitorRef), erase({ch, ChPid}), ok. @@ -373,12 +374,12 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, {{value, QEntry = {ChPid, #consumer{tag = ConsumerTag, ack_required = AckRequired}}}, ActiveConsumersTail} -> - C = #cr{limiter_pid = LimiterPid, + C = #cr{limiter = Limiter, unsent_message_count = Count, acktags = ChAckTags} = ch_record(ChPid), IsMsgReady = PredFun(FunAcc, State), case (IsMsgReady andalso - rabbit_limiter:can_send( LimiterPid, self(), AckRequired )) of + rabbit_limiter:can_send(Limiter, self(), AckRequired)) of true -> {{Message, IsDelivered, AckTag}, FunAcc1, State1} = DeliverFun(AckRequired, FunAcc, State), @@ -938,7 +939,7 @@ handle_call({basic_get, ChPid, NoAck}, _From, reply({ok, Remaining, Msg}, State3) end; -handle_call({basic_consume, NoAck, ChPid, LimiterPid, +handle_call({basic_consume, NoAck, ChPid, Limiter, ConsumerTag, ExclusiveConsume, OkMsg}, _From, State = #q{exclusive_consumer = ExistingHolder}) -> case check_exclusive_access(ExistingHolder, ExclusiveConsume, @@ -949,10 +950,11 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid), Consumer = #consumer{tag = ConsumerTag, ack_required = not NoAck}, - true = maybe_store_ch_record(C#cr{consumer_count = ConsumerCount +1, - limiter_pid = LimiterPid}), + true = maybe_store_ch_record( + C#cr{consumer_count = ConsumerCount +1, + limiter = Limiter}), ok = case ConsumerCount of - 0 -> rabbit_limiter:register(LimiterPid, self()); + 0 -> rabbit_limiter:register(Limiter, self()); _ -> ok end, ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag}; @@ -985,12 +987,12 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, ok = maybe_send_reply(ChPid, OkMsg), reply(ok, State); C = #cr{consumer_count = ConsumerCount, - limiter_pid = LimiterPid} -> + limiter = Limiter} -> C1 = C#cr{consumer_count = ConsumerCount -1}, maybe_store_ch_record( case ConsumerCount of - 1 -> ok = rabbit_limiter:unregister(LimiterPid, self()), - C1#cr{limiter_pid = undefined}; + 1 -> ok = rabbit_limiter:unregister(Limiter, self()), + C1#cr{limiter = rabbit_limiter:make_token()}; _ -> C1 end), emit_consumer_deleted(ChPid, ConsumerTag), @@ -1096,20 +1098,23 @@ handle_cast({notify_sent, ChPid}, State) -> C#cr{unsent_message_count = Count - 1} end)); -handle_cast({limit, ChPid, LimiterPid}, State) -> +handle_cast({limit, ChPid, Limiter}, State) -> noreply( possibly_unblock( State, ChPid, - fun (C = #cr{consumer_count = ConsumerCount, - limiter_pid = OldLimiterPid, - is_limit_active = Limited}) -> - if ConsumerCount =/= 0 andalso OldLimiterPid == undefined -> - ok = rabbit_limiter:register(LimiterPid, self()); - true -> + fun (C = #cr{consumer_count = ConsumerCount, + limiter = OldLimiter, + is_limit_active = OldLimited}) -> + case (ConsumerCount =/= 0 andalso + not rabbit_limiter:is_enabled(OldLimiter)) of + true -> + ok = rabbit_limiter:register(Limiter, self()); + false -> ok end, - NewLimited = Limited andalso LimiterPid =/= undefined, - C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited} + Limited = + OldLimited andalso rabbit_limiter:is_enabled(Limiter), + C#cr{limiter = Limiter, is_limit_active = Limited} end)); handle_cast({flush, ChPid}, State) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 93bff62b..151a9801 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -33,7 +33,7 @@ -export([list_local/0]). -record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid, - limiter_pid, start_limiter_fun, tx_status, next_tag, + limiter, tx_status, next_tag, unacked_message_q, uncommitted_message_q, uncommitted_ack_q, user, virtual_host, most_recently_declared_queue, consumer_mapping, blocking, consumer_monitors, queue_collector_pid, @@ -74,8 +74,7 @@ -spec(start_link/10 :: (channel_number(), pid(), pid(), pid(), rabbit_types:protocol(), rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(), - pid(), fun ((non_neg_integer()) -> rabbit_types:ok(pid()))) -> - rabbit_types:ok_pid_or_error()). + pid(), rabbit_limiter:token()) -> rabbit_types:ok_pid_or_error()). -spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(), rabbit_types:maybe(rabbit_types:content())) -> 'ok'). @@ -103,10 +102,10 @@ %%---------------------------------------------------------------------------- start_link(Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, - Capabilities, CollectorPid, StartLimiterFun) -> + Capabilities, CollectorPid, Limiter) -> gen_server2:start_link( ?MODULE, [Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, - VHost, Capabilities, CollectorPid, StartLimiterFun], []). + VHost, Capabilities, CollectorPid, Limiter], []). do(Pid, Method) -> do(Pid, Method, none). @@ -171,7 +170,7 @@ force_event_refresh() -> %%--------------------------------------------------------------------------- init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, - Capabilities, CollectorPid, StartLimiterFun]) -> + Capabilities, CollectorPid, Limiter]) -> process_flag(trap_exit, true), ok = pg_local:join(rabbit_channels, self()), StatsTimer = rabbit_event:init_stats_timer(), @@ -181,8 +180,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, reader_pid = ReaderPid, writer_pid = WriterPid, conn_pid = ConnPid, - limiter_pid = undefined, - start_limiter_fun = StartLimiterFun, + limiter = Limiter, tx_status = none, next_tag = 1, unacked_message_q = queue:new(), @@ -721,7 +719,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, exclusive = ExclusiveConsume, nowait = NoWait}, _, State = #ch{conn_pid = ConnPid, - limiter_pid = LimiterPid, + limiter = Limiter, consumer_mapping = ConsumerMapping}) -> case dict:find(ConsumerTag, ConsumerMapping) of error -> @@ -740,7 +738,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, QueueName, ConnPid, fun (Q) -> {rabbit_amqqueue:basic_consume( - Q, NoAck, self(), LimiterPid, + Q, NoAck, self(), Limiter, ActualConsumerTag, ExclusiveConsume, ok_msg(NoWait, #'basic.consume_ok'{ consumer_tag = ActualConsumerTag})), @@ -814,22 +812,26 @@ handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 -> rabbit_misc:protocol_error(not_implemented, "prefetch_size!=0 (~w)", [Size]); -handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, - _, State = #ch{limiter_pid = LimiterPid}) -> - LimiterPid1 = case {LimiterPid, PrefetchCount} of - {undefined, 0} -> undefined; - {undefined, _} -> start_limiter(State); - {_, _} -> LimiterPid - end, - LimiterPid2 = case rabbit_limiter:limit(LimiterPid1, PrefetchCount) of - ok -> LimiterPid1; - stopped -> unlimit_queues(State) - end, - {reply, #'basic.qos_ok'{}, State#ch{limiter_pid = LimiterPid2}}; +handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, _, + State = #ch{limiter = Limiter}) -> + Limiter1 = + case {rabbit_limiter:is_enabled(Limiter), PrefetchCount} of + {false, 0} -> Limiter; + {false, _} -> enable_limiter(State); + {_, _} -> Limiter + end, + Limiter3 = case rabbit_limiter:limit(Limiter1, PrefetchCount) of + ok -> + Limiter1; + {disabled, Limiter2} -> + ok = limit_queues(Limiter2, State), + Limiter2 + end, + {reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter3}}; handle_method(#'basic.recover_async'{requeue = true}, _, State = #ch{unacked_message_q = UAMQ, - limiter_pid = LimiterPid}) -> + limiter = Limiter}) -> OkFun = fun () -> ok end, ok = fold_per_queue( fun (QPid, MsgIds, ok) -> @@ -843,7 +845,7 @@ handle_method(#'basic.recover_async'{requeue = true}, QPid, lists:reverse(MsgIds), self()) end) end, ok, UAMQ), - ok = notify_limiter(LimiterPid, UAMQ), + ok = notify_limiter(Limiter, UAMQ), %% No answer required - basic.recover is the newer, synchronous %% variant of this method {noreply, State#ch{unacked_message_q = queue:new()}}; @@ -1090,23 +1092,26 @@ handle_method(#'confirm.select'{nowait = NoWait}, _, State) -> NoWait, #'confirm.select_ok'{}); handle_method(#'channel.flow'{active = true}, _, - State = #ch{limiter_pid = LimiterPid}) -> - LimiterPid1 = case rabbit_limiter:unblock(LimiterPid) of - ok -> LimiterPid; - stopped -> unlimit_queues(State) - end, + State = #ch{limiter = Limiter}) -> + Limiter2 = case rabbit_limiter:unblock(Limiter) of + ok -> + Limiter; + {disabled, Limiter1} -> + ok = limit_queues(Limiter1, State), + Limiter1 + end, {reply, #'channel.flow_ok'{active = true}, - State#ch{limiter_pid = LimiterPid1}}; + State#ch{limiter = Limiter2}}; handle_method(#'channel.flow'{active = false}, _, - State = #ch{limiter_pid = LimiterPid, - consumer_mapping = Consumers}) -> - LimiterPid1 = case LimiterPid of - undefined -> start_limiter(State); - Other -> Other - end, - State1 = State#ch{limiter_pid = LimiterPid1}, - ok = rabbit_limiter:block(LimiterPid1), + State = #ch{consumer_mapping = Consumers, + limiter = Limiter}) -> + Limiter1 = case rabbit_limiter:is_enabled(Limiter) of + true -> Limiter; + false -> enable_limiter(State) + end, + State1 = State#ch{limiter = Limiter1}, + ok = rabbit_limiter:block(Limiter1), case consumer_queues(Consumers) of [] -> {reply, #'channel.flow_ok'{active = false}, State1}; QPids -> Queues = [{QPid, erlang:monitor(process, QPid)} || @@ -1236,7 +1241,7 @@ reject(DeliveryTag, Requeue, Multiple, State = #ch{unacked_message_q = UAMQ}) -> fun (QPid, MsgIds, ok) -> rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self()) end, ok, Acked), - ok = notify_limiter(State#ch.limiter_pid, Acked), + ok = notify_limiter(State#ch.limiter, Acked), {noreply, State#ch{unacked_message_q = Remaining}}. ack_record(DeliveryTag, ConsumerTag, @@ -1273,7 +1278,7 @@ ack(Acked, State) -> [{QPid, length(MsgIds)} | L] end, [], Acked), maybe_incr_stats(QIncs, ack, State), - ok = notify_limiter(State#ch.limiter_pid, Acked), + ok = notify_limiter(State#ch.limiter, Acked), State. new_tx(State) -> State#ch{uncommitted_message_q = queue:new(), @@ -1297,17 +1302,14 @@ fold_per_queue(F, Acc0, UAQ) -> dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end, Acc0, D). -start_limiter(State = #ch{unacked_message_q = UAMQ, start_limiter_fun = SLF}) -> - {ok, LPid} = SLF(queue:len(UAMQ)), - ok = limit_queues(LPid, State), - LPid. - -unlimit_queues(State) -> - ok = limit_queues(undefined, State), - undefined. +enable_limiter(State = #ch{unacked_message_q = UAMQ, + limiter = Limiter}) -> + Limiter1 = rabbit_limiter:enable(Limiter, queue:len(UAMQ)), + ok = limit_queues(Limiter1, State), + Limiter1. -limit_queues(LPid, #ch{consumer_mapping = Consumers}) -> - rabbit_amqqueue:limit_all(consumer_queues(Consumers), self(), LPid). +limit_queues(Limiter, #ch{consumer_mapping = Consumers}) -> + rabbit_amqqueue:limit_all(consumer_queues(Consumers), self(), Limiter). consumer_queues(Consumers) -> lists:usort([QPid || @@ -1318,14 +1320,16 @@ consumer_queues(Consumers) -> %% for messages delivered to subscribed consumers, but not acks for %% messages sent in a response to a basic.get (identified by their %% 'none' consumer tag) -notify_limiter(undefined, _Acked) -> - ok; -notify_limiter(LimiterPid, Acked) -> - case rabbit_misc:queue_fold(fun ({_, none, _}, Acc) -> Acc; - ({_, _, _}, Acc) -> Acc + 1 - end, 0, Acked) of - 0 -> ok; - Count -> rabbit_limiter:ack(LimiterPid, Count) +notify_limiter(Limiter, Acked) -> + case rabbit_limiter:is_enabled(Limiter) of + false -> ok; + true -> case rabbit_misc:queue_fold( + fun ({_, none, _}, Acc) -> Acc; + ({_, _, _}, Acc) -> Acc + 1 + end, 0, Acked) of + 0 -> ok; + Count -> rabbit_limiter:ack(Limiter, Count) + end end. deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ @@ -1463,10 +1467,10 @@ i(messages_uncommitted, #ch{uncommitted_message_q = TMQ}) -> queue:len(TMQ); i(acks_uncommitted, #ch{uncommitted_ack_q = TAQ}) -> queue:len(TAQ); -i(prefetch_count, #ch{limiter_pid = LimiterPid}) -> - rabbit_limiter:get_limit(LimiterPid); -i(client_flow_blocked, #ch{limiter_pid = LimiterPid}) -> - rabbit_limiter:is_blocked(LimiterPid); +i(prefetch_count, #ch{limiter = Limiter}) -> + rabbit_limiter:get_limit(Limiter); +i(client_flow_blocked, #ch{limiter = Limiter}) -> + rabbit_limiter:is_blocked(Limiter); i(Item, _) -> throw({bad_argument, Item}). diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index 65ccca02..a19b6bfd 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -47,47 +47,44 @@ start_link({tcp, Sock, Channel, FrameMax, ReaderPid, Protocol, User, VHost, Capabilities, Collector}) -> - {ok, SupPid} = supervisor2:start_link(?MODULE, []), - {ok, WriterPid} = - supervisor2:start_child( - SupPid, - {writer, {rabbit_writer, start_link, - [Sock, Channel, FrameMax, Protocol, ReaderPid]}, - intrinsic, ?MAX_WAIT, worker, [rabbit_writer]}), + {ok, SupPid} = supervisor2:start_link(?MODULE, + {tcp, Sock, Channel, FrameMax, + ReaderPid, Protocol}), + [LimiterPid] = supervisor2:find_child(SupPid, limiter), + [WriterPid] = supervisor2:find_child(SupPid, writer), {ok, ChannelPid} = supervisor2:start_child( SupPid, {channel, {rabbit_channel, start_link, [Channel, ReaderPid, WriterPid, ReaderPid, Protocol, User, VHost, Capabilities, Collector, - start_limiter_fun(SupPid)]}, + rabbit_limiter:make_token(LimiterPid)]}, intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), {ok, AState} = rabbit_command_assembler:init(Protocol), {ok, SupPid, {ChannelPid, AState}}; start_link({direct, Channel, ClientChannelPid, ConnPid, Protocol, User, VHost, Capabilities, Collector}) -> - {ok, SupPid} = supervisor2:start_link(?MODULE, []), + {ok, SupPid} = supervisor2:start_link(?MODULE, direct), + [LimiterPid] = supervisor2:find_child(SupPid, limiter), {ok, ChannelPid} = supervisor2:start_child( SupPid, {channel, {rabbit_channel, start_link, [Channel, ClientChannelPid, ClientChannelPid, ConnPid, Protocol, User, VHost, Capabilities, Collector, - start_limiter_fun(SupPid)]}, + rabbit_limiter:make_token(LimiterPid)]}, intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), {ok, SupPid, {ChannelPid, none}}. %%---------------------------------------------------------------------------- -init([]) -> - {ok, {{one_for_all, 0, 1}, []}}. - -start_limiter_fun(SupPid) -> - fun (UnackedCount) -> - Me = self(), - {ok, _Pid} = - supervisor2:start_child( - SupPid, - {limiter, {rabbit_limiter, start_link, [Me, UnackedCount]}, - transient, ?MAX_WAIT, worker, [rabbit_limiter]}) - end. +init(Type) -> + {ok, {{one_for_all, 0, 1}, child_specs(Type)}}. + +child_specs({tcp, Sock, Channel, FrameMax, ReaderPid, Protocol}) -> + [{writer, {rabbit_writer, start_link, + [Sock, Channel, FrameMax, Protocol, ReaderPid]}, + intrinsic, ?MAX_WAIT, worker, [rabbit_writer]} | child_specs(direct)]; +child_specs(direct) -> + [{limiter, {rabbit_limiter, start_link, []}, + transient, ?MAX_WAIT, worker, [rabbit_limiter]}]. diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 8f9ab032..2d367707 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -20,27 +20,36 @@ -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, prioritise_call/3]). --export([start_link/2]). +-export([start_link/0, make_token/0, make_token/1, is_enabled/1, enable/2, + disable/1]). -export([limit/2, can_send/3, ack/2, register/2, unregister/2]). -export([get_limit/1, block/1, unblock/1, is_blocked/1]). %%---------------------------------------------------------------------------- --ifdef(use_specs). +-record(token, {pid, enabled}). --type(maybe_pid() :: pid() | 'undefined'). +-ifdef(use_specs). --spec(start_link/2 :: (pid(), non_neg_integer()) -> - rabbit_types:ok_pid_or_error()). --spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok' | 'stopped'). --spec(can_send/3 :: (maybe_pid(), pid(), boolean()) -> boolean()). --spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). --spec(register/2 :: (maybe_pid(), pid()) -> 'ok'). --spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok'). --spec(get_limit/1 :: (maybe_pid()) -> non_neg_integer()). --spec(block/1 :: (maybe_pid()) -> 'ok'). --spec(unblock/1 :: (maybe_pid()) -> 'ok' | 'stopped'). --spec(is_blocked/1 :: (maybe_pid()) -> boolean()). +-export_type([token/0]). + +-opaque(token() :: #token{}). + +-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). +-spec(make_token/0 :: () -> token()). +-spec(make_token/1 :: (undefined | pid()) -> token()). +-spec(is_enabled/1 :: (token()) -> boolean()). +-spec(enable/2 :: (token(), non_neg_integer()) -> token()). +-spec(disable/1 :: (token()) -> token()). +-spec(limit/2 :: (token(), non_neg_integer()) -> 'ok' | {'disabled', token()}). +-spec(can_send/3 :: (token(), pid(), boolean()) -> boolean()). +-spec(ack/2 :: (token(), non_neg_integer()) -> 'ok'). +-spec(register/2 :: (token(), pid()) -> 'ok'). +-spec(unregister/2 :: (token(), pid()) -> 'ok'). +-spec(get_limit/1 :: (token()) -> non_neg_integer()). +-spec(block/1 :: (token()) -> 'ok'). +-spec(unblock/1 :: (token()) -> 'ok' | {'disabled', token()}). +-spec(is_blocked/1 :: (token()) -> boolean()). -endif. @@ -59,63 +68,63 @@ %% API %%---------------------------------------------------------------------------- -start_link(ChPid, UnackedMsgCount) -> - gen_server2:start_link(?MODULE, [ChPid, UnackedMsgCount], []). +start_link() -> gen_server2:start_link(?MODULE, [], []). + +make_token() -> make_token(undefined). +make_token(Pid) -> #token{pid = Pid, enabled = false}. + +is_enabled(#token{enabled = Enabled}) -> Enabled. + +enable(#token{pid = Pid} = Token, Volume) -> + gen_server2:call(Pid, {enable, Token, self(), Volume}). -limit(undefined, 0) -> - ok; -limit(LimiterPid, PrefetchCount) -> - gen_server2:call(LimiterPid, {limit, PrefetchCount}, infinity). +disable(#token{pid = Pid} = Token) -> + gen_server2:call(Pid, {disable, Token}). + +limit(Limiter, PrefetchCount) -> + maybe_call(Limiter, {limit, PrefetchCount, Limiter}, ok). %% Ask the limiter whether the queue can deliver a message without -%% breaching a limit -can_send(undefined, _QPid, _AckRequired) -> - true; -can_send(LimiterPid, QPid, AckRequired) -> +%% breaching a limit. Note that we don't use maybe_call here in order +%% to avoid always going through with_exit_handler/2, even when the +%% limiter is disabled. +can_send(#token{pid = Pid, enabled = true}, QPid, AckRequired) -> rabbit_misc:with_exit_handler( fun () -> true end, - fun () -> gen_server2:call(LimiterPid, {can_send, QPid, AckRequired}, - infinity) end). + fun () -> + gen_server2:call(Pid, {can_send, QPid, AckRequired}, infinity) + end); +can_send(_, _, _) -> + true. %% Let the limiter know that the channel has received some acks from a %% consumer -ack(undefined, _Count) -> ok; -ack(LimiterPid, Count) -> gen_server2:cast(LimiterPid, {ack, Count}). +ack(Limiter, Count) -> maybe_cast(Limiter, {ack, Count}). -register(undefined, _QPid) -> ok; -register(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {register, QPid}). +register(Limiter, QPid) -> maybe_cast(Limiter, {register, QPid}). -unregister(undefined, _QPid) -> ok; -unregister(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {unregister, QPid}). +unregister(Limiter, QPid) -> maybe_cast(Limiter, {unregister, QPid}). -get_limit(undefined) -> - 0; -get_limit(Pid) -> +get_limit(Limiter) -> rabbit_misc:with_exit_handler( fun () -> 0 end, - fun () -> gen_server2:call(Pid, get_limit, infinity) end). + fun () -> maybe_call(Limiter, get_limit, ok) end). -block(undefined) -> - ok; -block(LimiterPid) -> - gen_server2:call(LimiterPid, block, infinity). +block(Limiter) -> + maybe_call(Limiter, block, ok). -unblock(undefined) -> - ok; -unblock(LimiterPid) -> - gen_server2:call(LimiterPid, unblock, infinity). +unblock(Limiter) -> + maybe_call(Limiter, {unblock, Limiter}, ok). -is_blocked(undefined) -> - false; -is_blocked(LimiterPid) -> - gen_server2:call(LimiterPid, is_blocked, infinity). +is_blocked(Limiter) -> + maybe_call(Limiter, is_blocked, false). %%---------------------------------------------------------------------------- %% gen_server callbacks %%---------------------------------------------------------------------------- -init([ChPid, UnackedMsgCount]) -> - {ok, #lim{ch_pid = ChPid, volume = UnackedMsgCount}}. +init([]) -> + {ok, #lim{}}. prioritise_call(get_limit, _From, _State) -> 9; prioritise_call(_Msg, _From, _State) -> 0. @@ -135,23 +144,33 @@ handle_call({can_send, QPid, AckRequired}, _From, handle_call(get_limit, _From, State = #lim{prefetch_count = PrefetchCount}) -> {reply, PrefetchCount, State}; -handle_call({limit, PrefetchCount}, _From, State) -> +handle_call({limit, PrefetchCount, Token}, _From, State) -> case maybe_notify(State, State#lim{prefetch_count = PrefetchCount}) of - {cont, State1} -> {reply, ok, State1}; - {stop, State1} -> {stop, normal, stopped, State1} + {cont, State1} -> + {reply, ok, State1}; + {stop, State1} -> + {reply, {disabled, Token#token{enabled = false}}, State1} end; handle_call(block, _From, State) -> {reply, ok, State#lim{blocked = true}}; -handle_call(unblock, _From, State) -> +handle_call({unblock, Token}, _From, State) -> case maybe_notify(State, State#lim{blocked = false}) of - {cont, State1} -> {reply, ok, State1}; - {stop, State1} -> {stop, normal, stopped, State1} + {cont, State1} -> + {reply, ok, State1}; + {stop, State1} -> + {reply, {disabled, Token#token{enabled = false}}, State1} end; handle_call(is_blocked, _From, State) -> - {reply, blocked(State), State}. + {reply, blocked(State), State}; + +handle_call({enable, Token, Channel, Volume}, _From, State) -> + {reply, Token#token{enabled = true}, + State#lim{ch_pid = Channel, volume = Volume}}; +handle_call({disable, Token}, _From, State) -> + {reply, Token#token{enabled = false}, State}. handle_cast({ack, Count}, State = #lim{volume = Volume}) -> NewVolume = if Volume == 0 -> 0; @@ -190,6 +209,16 @@ maybe_notify(OldState, NewState) -> false -> {cont, NewState} end. +maybe_call(#token{pid = Pid, enabled = true}, Call, _Default) -> + gen_server2:call(Pid, Call, infinity); +maybe_call(_, _Call, Default) -> + Default. + +maybe_cast(#token{pid = Pid, enabled = true}, Cast) -> + gen_server2:cast(Pid, Cast); +maybe_cast(_, _Call) -> + ok. + limit_reached(#lim{prefetch_count = Limit, volume = Volume}) -> Limit =/= 0 andalso Volume >= Limit. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index a068efe5..699cfbce 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1203,15 +1203,16 @@ test_server_status() -> {ok, Ch} = rabbit_channel:start_link( 1, self(), Writer, self(), rabbit_framing_amqp_0_9_1, user(<<"user">>), <<"/">>, [], self(), - fun (_) -> {ok, self()} end), + rabbit_limiter:make_token(self())), [Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>], {new, Queue = #amqqueue{}} <- [rabbit_amqqueue:declare( rabbit_misc:r(<<"/">>, queue, Name), false, false, [], none)]], - ok = rabbit_amqqueue:basic_consume(Q, true, Ch, undefined, - <<"ctag">>, true, undefined), + ok = rabbit_amqqueue:basic_consume( + Q, true, Ch, rabbit_limiter:make_token(), + <<"ctag">>, true, undefined), %% list queues ok = info_action(list_queues, rabbit_amqqueue:info_keys(), true), @@ -1270,7 +1271,7 @@ test_spawn() -> {ok, Ch} = rabbit_channel:start_link( 1, Me, Writer, Me, rabbit_framing_amqp_0_9_1, user(<<"guest">>), <<"/">>, [], Me, - fun (_) -> {ok, Me} end), + rabbit_limiter:make_token(self())), ok = rabbit_channel:do(Ch, #'channel.open'{}), receive #'channel.open_ok'{} -> ok after 1000 -> throw(failed_to_receive_channel_open_ok) |