summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-08-17 14:08:02 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2011-08-17 14:08:02 +0100
commit8b3ff283afea04bd8ae1a53e941e10b759e955ee (patch)
tree217d990770ad17c32cf1f94258caa5eccd5cca86
parent70fcd8fc6acc17a51f668821a3581b972dcd7d79 (diff)
parentda22e75f58ec28d6597bc721e7eedba5ecdcc317 (diff)
downloadrabbitmq-server-8b3ff283afea04bd8ae1a53e941e10b759e955ee.tar.gz
Merging bug24285 to default
-rw-r--r--src/rabbit_amqqueue.erl17
-rw-r--r--src/rabbit_amqqueue_process.erl46
-rw-r--r--src/rabbit_channel.erl123
-rw-r--r--src/rabbit_channel_sup.erl41
-rw-r--r--src/rabbit_limiter.erl143
-rw-r--r--src/rabbit_tests.erl9
6 files changed, 202 insertions, 177 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index fbea763c..88ff26cc 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -119,12 +119,13 @@
-spec(ack/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok').
-spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()).
--spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()).
+-spec(limit_all/3 :: ([pid()], pid(), rabbit_limiter:token()) ->
+ ok_or_errors()).
-spec(basic_get/3 :: (rabbit_types:amqqueue(), pid(), boolean()) ->
{'ok', non_neg_integer(), qmsg()} | 'empty').
-spec(basic_consume/7 ::
- (rabbit_types:amqqueue(), boolean(), pid(), pid() | 'undefined',
- rabbit_types:ctag(), boolean(), any())
+ (rabbit_types:amqqueue(), boolean(), pid(),
+ rabbit_limiter:token(), rabbit_types:ctag(), boolean(), any())
-> rabbit_types:ok_or_error('exclusive_consume_unavailable')).
-spec(basic_cancel/4 ::
(rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok').
@@ -440,19 +441,17 @@ notify_down_all(QPids, ChPid) ->
fun (QPid) -> gen_server2:call(QPid, {notify_down, ChPid}, infinity) end,
QPids).
-limit_all(QPids, ChPid, LimiterPid) ->
+limit_all(QPids, ChPid, Limiter) ->
delegate:invoke_no_result(
- QPids, fun (QPid) ->
- gen_server2:cast(QPid, {limit, ChPid, LimiterPid})
- end).
+ QPids, fun (QPid) -> gen_server2:cast(QPid, {limit, ChPid, Limiter}) end).
basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) ->
delegate_call(QPid, {basic_get, ChPid, NoAck}).
-basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, LimiterPid,
+basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, Limiter,
ConsumerTag, ExclusiveConsume, OkMsg) ->
delegate_call(QPid, {basic_consume, NoAck, ChPid,
- LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}).
+ Limiter, ConsumerTag, ExclusiveConsume, OkMsg}).
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
ok = delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 1d63b351..11a95a62 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -58,7 +58,7 @@
%% These are held in our process dictionary
-record(cr, {consumer_count,
ch_pid,
- limiter_pid,
+ limiter,
monitor_ref,
acktags,
is_limit_active,
@@ -326,6 +326,7 @@ ch_record(ChPid) ->
monitor_ref = MonitorRef,
acktags = sets:new(),
is_limit_active = false,
+ limiter = rabbit_limiter:make_token(),
unsent_message_count = 0},
put(Key, C),
C;
@@ -346,9 +347,9 @@ maybe_store_ch_record(C = #cr{consumer_count = ConsumerCount,
end.
erase_ch_record(#cr{ch_pid = ChPid,
- limiter_pid = LimiterPid,
+ limiter = Limiter,
monitor_ref = MonitorRef}) ->
- ok = rabbit_limiter:unregister(LimiterPid, self()),
+ ok = rabbit_limiter:unregister(Limiter, self()),
erlang:demonitor(MonitorRef),
erase({ch, ChPid}),
ok.
@@ -373,12 +374,12 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
{{value, QEntry = {ChPid, #consumer{tag = ConsumerTag,
ack_required = AckRequired}}},
ActiveConsumersTail} ->
- C = #cr{limiter_pid = LimiterPid,
+ C = #cr{limiter = Limiter,
unsent_message_count = Count,
acktags = ChAckTags} = ch_record(ChPid),
IsMsgReady = PredFun(FunAcc, State),
case (IsMsgReady andalso
- rabbit_limiter:can_send( LimiterPid, self(), AckRequired )) of
+ rabbit_limiter:can_send(Limiter, self(), AckRequired)) of
true ->
{{Message, IsDelivered, AckTag}, FunAcc1, State1} =
DeliverFun(AckRequired, FunAcc, State),
@@ -938,7 +939,7 @@ handle_call({basic_get, ChPid, NoAck}, _From,
reply({ok, Remaining, Msg}, State3)
end;
-handle_call({basic_consume, NoAck, ChPid, LimiterPid,
+handle_call({basic_consume, NoAck, ChPid, Limiter,
ConsumerTag, ExclusiveConsume, OkMsg},
_From, State = #q{exclusive_consumer = ExistingHolder}) ->
case check_exclusive_access(ExistingHolder, ExclusiveConsume,
@@ -949,10 +950,11 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid,
C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid),
Consumer = #consumer{tag = ConsumerTag,
ack_required = not NoAck},
- true = maybe_store_ch_record(C#cr{consumer_count = ConsumerCount +1,
- limiter_pid = LimiterPid}),
+ true = maybe_store_ch_record(
+ C#cr{consumer_count = ConsumerCount +1,
+ limiter = Limiter}),
ok = case ConsumerCount of
- 0 -> rabbit_limiter:register(LimiterPid, self());
+ 0 -> rabbit_limiter:register(Limiter, self());
_ -> ok
end,
ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag};
@@ -985,12 +987,12 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
ok = maybe_send_reply(ChPid, OkMsg),
reply(ok, State);
C = #cr{consumer_count = ConsumerCount,
- limiter_pid = LimiterPid} ->
+ limiter = Limiter} ->
C1 = C#cr{consumer_count = ConsumerCount -1},
maybe_store_ch_record(
case ConsumerCount of
- 1 -> ok = rabbit_limiter:unregister(LimiterPid, self()),
- C1#cr{limiter_pid = undefined};
+ 1 -> ok = rabbit_limiter:unregister(Limiter, self()),
+ C1#cr{limiter = rabbit_limiter:make_token()};
_ -> C1
end),
emit_consumer_deleted(ChPid, ConsumerTag),
@@ -1096,20 +1098,20 @@ handle_cast({notify_sent, ChPid}, State) ->
C#cr{unsent_message_count = Count - 1}
end));
-handle_cast({limit, ChPid, LimiterPid}, State) ->
+handle_cast({limit, ChPid, Limiter}, State) ->
noreply(
possibly_unblock(
State, ChPid,
- fun (C = #cr{consumer_count = ConsumerCount,
- limiter_pid = OldLimiterPid,
- is_limit_active = Limited}) ->
- if ConsumerCount =/= 0 andalso OldLimiterPid == undefined ->
- ok = rabbit_limiter:register(LimiterPid, self());
- true ->
- ok
+ fun (C = #cr{consumer_count = ConsumerCount,
+ limiter = OldLimiter,
+ is_limit_active = OldLimited}) ->
+ case (ConsumerCount =/= 0 andalso
+ not rabbit_limiter:is_enabled(OldLimiter)) of
+ true -> ok = rabbit_limiter:register(Limiter, self());
+ false -> ok
end,
- NewLimited = Limited andalso LimiterPid =/= undefined,
- C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited}
+ Limited = OldLimited andalso rabbit_limiter:is_enabled(Limiter),
+ C#cr{limiter = Limiter, is_limit_active = Limited}
end));
handle_cast({flush, ChPid}, State) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 93bff62b..dfe84644 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -33,7 +33,7 @@
-export([list_local/0]).
-record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid,
- limiter_pid, start_limiter_fun, tx_status, next_tag,
+ limiter, tx_status, next_tag,
unacked_message_q, uncommitted_message_q, uncommitted_ack_q,
user, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking, consumer_monitors, queue_collector_pid,
@@ -74,8 +74,7 @@
-spec(start_link/10 ::
(channel_number(), pid(), pid(), pid(), rabbit_types:protocol(),
rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(),
- pid(), fun ((non_neg_integer()) -> rabbit_types:ok(pid()))) ->
- rabbit_types:ok_pid_or_error()).
+ pid(), rabbit_limiter:token()) -> rabbit_types:ok_pid_or_error()).
-spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
-spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(),
rabbit_types:maybe(rabbit_types:content())) -> 'ok').
@@ -103,10 +102,10 @@
%%----------------------------------------------------------------------------
start_link(Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
- Capabilities, CollectorPid, StartLimiterFun) ->
+ Capabilities, CollectorPid, Limiter) ->
gen_server2:start_link(
?MODULE, [Channel, ReaderPid, WriterPid, ConnPid, Protocol, User,
- VHost, Capabilities, CollectorPid, StartLimiterFun], []).
+ VHost, Capabilities, CollectorPid, Limiter], []).
do(Pid, Method) ->
do(Pid, Method, none).
@@ -171,7 +170,7 @@ force_event_refresh() ->
%%---------------------------------------------------------------------------
init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
- Capabilities, CollectorPid, StartLimiterFun]) ->
+ Capabilities, CollectorPid, Limiter]) ->
process_flag(trap_exit, true),
ok = pg_local:join(rabbit_channels, self()),
StatsTimer = rabbit_event:init_stats_timer(),
@@ -181,8 +180,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
reader_pid = ReaderPid,
writer_pid = WriterPid,
conn_pid = ConnPid,
- limiter_pid = undefined,
- start_limiter_fun = StartLimiterFun,
+ limiter = Limiter,
tx_status = none,
next_tag = 1,
unacked_message_q = queue:new(),
@@ -721,7 +719,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
exclusive = ExclusiveConsume,
nowait = NoWait},
_, State = #ch{conn_pid = ConnPid,
- limiter_pid = LimiterPid,
+ limiter = Limiter,
consumer_mapping = ConsumerMapping}) ->
case dict:find(ConsumerTag, ConsumerMapping) of
error ->
@@ -740,7 +738,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
QueueName, ConnPid,
fun (Q) ->
{rabbit_amqqueue:basic_consume(
- Q, NoAck, self(), LimiterPid,
+ Q, NoAck, self(), Limiter,
ActualConsumerTag, ExclusiveConsume,
ok_msg(NoWait, #'basic.consume_ok'{
consumer_tag = ActualConsumerTag})),
@@ -814,22 +812,23 @@ handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 ->
rabbit_misc:protocol_error(not_implemented,
"prefetch_size!=0 (~w)", [Size]);
-handle_method(#'basic.qos'{prefetch_count = PrefetchCount},
- _, State = #ch{limiter_pid = LimiterPid}) ->
- LimiterPid1 = case {LimiterPid, PrefetchCount} of
- {undefined, 0} -> undefined;
- {undefined, _} -> start_limiter(State);
- {_, _} -> LimiterPid
- end,
- LimiterPid2 = case rabbit_limiter:limit(LimiterPid1, PrefetchCount) of
- ok -> LimiterPid1;
- stopped -> unlimit_queues(State)
- end,
- {reply, #'basic.qos_ok'{}, State#ch{limiter_pid = LimiterPid2}};
+handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, _,
+ State = #ch{limiter = Limiter}) ->
+ Limiter1 = case {rabbit_limiter:is_enabled(Limiter), PrefetchCount} of
+ {false, 0} -> Limiter;
+ {false, _} -> enable_limiter(State);
+ {_, _} -> Limiter
+ end,
+ Limiter3 = case rabbit_limiter:limit(Limiter1, PrefetchCount) of
+ ok -> Limiter1;
+ {disabled, Limiter2} -> ok = limit_queues(Limiter2, State),
+ Limiter2
+ end,
+ {reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter3}};
handle_method(#'basic.recover_async'{requeue = true},
_, State = #ch{unacked_message_q = UAMQ,
- limiter_pid = LimiterPid}) ->
+ limiter = Limiter}) ->
OkFun = fun () -> ok end,
ok = fold_per_queue(
fun (QPid, MsgIds, ok) ->
@@ -843,7 +842,7 @@ handle_method(#'basic.recover_async'{requeue = true},
QPid, lists:reverse(MsgIds), self())
end)
end, ok, UAMQ),
- ok = notify_limiter(LimiterPid, UAMQ),
+ ok = notify_limiter(Limiter, UAMQ),
%% No answer required - basic.recover is the newer, synchronous
%% variant of this method
{noreply, State#ch{unacked_message_q = queue:new()}};
@@ -1090,23 +1089,23 @@ handle_method(#'confirm.select'{nowait = NoWait}, _, State) ->
NoWait, #'confirm.select_ok'{});
handle_method(#'channel.flow'{active = true}, _,
- State = #ch{limiter_pid = LimiterPid}) ->
- LimiterPid1 = case rabbit_limiter:unblock(LimiterPid) of
- ok -> LimiterPid;
- stopped -> unlimit_queues(State)
- end,
- {reply, #'channel.flow_ok'{active = true},
- State#ch{limiter_pid = LimiterPid1}};
+ State = #ch{limiter = Limiter}) ->
+ Limiter2 = case rabbit_limiter:unblock(Limiter) of
+ ok -> Limiter;
+ {disabled, Limiter1} -> ok = limit_queues(Limiter1, State),
+ Limiter1
+ end,
+ {reply, #'channel.flow_ok'{active = true}, State#ch{limiter = Limiter2}};
handle_method(#'channel.flow'{active = false}, _,
- State = #ch{limiter_pid = LimiterPid,
- consumer_mapping = Consumers}) ->
- LimiterPid1 = case LimiterPid of
- undefined -> start_limiter(State);
- Other -> Other
- end,
- State1 = State#ch{limiter_pid = LimiterPid1},
- ok = rabbit_limiter:block(LimiterPid1),
+ State = #ch{consumer_mapping = Consumers,
+ limiter = Limiter}) ->
+ Limiter1 = case rabbit_limiter:is_enabled(Limiter) of
+ true -> Limiter;
+ false -> enable_limiter(State)
+ end,
+ State1 = State#ch{limiter = Limiter1},
+ ok = rabbit_limiter:block(Limiter1),
case consumer_queues(Consumers) of
[] -> {reply, #'channel.flow_ok'{active = false}, State1};
QPids -> Queues = [{QPid, erlang:monitor(process, QPid)} ||
@@ -1236,7 +1235,7 @@ reject(DeliveryTag, Requeue, Multiple, State = #ch{unacked_message_q = UAMQ}) ->
fun (QPid, MsgIds, ok) ->
rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self())
end, ok, Acked),
- ok = notify_limiter(State#ch.limiter_pid, Acked),
+ ok = notify_limiter(State#ch.limiter, Acked),
{noreply, State#ch{unacked_message_q = Remaining}}.
ack_record(DeliveryTag, ConsumerTag,
@@ -1273,7 +1272,7 @@ ack(Acked, State) ->
[{QPid, length(MsgIds)} | L]
end, [], Acked),
maybe_incr_stats(QIncs, ack, State),
- ok = notify_limiter(State#ch.limiter_pid, Acked),
+ ok = notify_limiter(State#ch.limiter, Acked),
State.
new_tx(State) -> State#ch{uncommitted_message_q = queue:new(),
@@ -1297,17 +1296,14 @@ fold_per_queue(F, Acc0, UAQ) ->
dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end,
Acc0, D).
-start_limiter(State = #ch{unacked_message_q = UAMQ, start_limiter_fun = SLF}) ->
- {ok, LPid} = SLF(queue:len(UAMQ)),
- ok = limit_queues(LPid, State),
- LPid.
-
-unlimit_queues(State) ->
- ok = limit_queues(undefined, State),
- undefined.
+enable_limiter(State = #ch{unacked_message_q = UAMQ,
+ limiter = Limiter}) ->
+ Limiter1 = rabbit_limiter:enable(Limiter, queue:len(UAMQ)),
+ ok = limit_queues(Limiter1, State),
+ Limiter1.
-limit_queues(LPid, #ch{consumer_mapping = Consumers}) ->
- rabbit_amqqueue:limit_all(consumer_queues(Consumers), self(), LPid).
+limit_queues(Limiter, #ch{consumer_mapping = Consumers}) ->
+ rabbit_amqqueue:limit_all(consumer_queues(Consumers), self(), Limiter).
consumer_queues(Consumers) ->
lists:usort([QPid ||
@@ -1318,14 +1314,15 @@ consumer_queues(Consumers) ->
%% for messages delivered to subscribed consumers, but not acks for
%% messages sent in a response to a basic.get (identified by their
%% 'none' consumer tag)
-notify_limiter(undefined, _Acked) ->
- ok;
-notify_limiter(LimiterPid, Acked) ->
- case rabbit_misc:queue_fold(fun ({_, none, _}, Acc) -> Acc;
- ({_, _, _}, Acc) -> Acc + 1
- end, 0, Acked) of
- 0 -> ok;
- Count -> rabbit_limiter:ack(LimiterPid, Count)
+notify_limiter(Limiter, Acked) ->
+ case rabbit_limiter:is_enabled(Limiter) of
+ false -> ok;
+ true -> case rabbit_misc:queue_fold(fun ({_, none, _}, Acc) -> Acc;
+ ({_, _, _}, Acc) -> Acc + 1
+ end, 0, Acked) of
+ 0 -> ok;
+ Count -> rabbit_limiter:ack(Limiter, Count)
+ end
end.
deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
@@ -1463,10 +1460,10 @@ i(messages_uncommitted, #ch{uncommitted_message_q = TMQ}) ->
queue:len(TMQ);
i(acks_uncommitted, #ch{uncommitted_ack_q = TAQ}) ->
queue:len(TAQ);
-i(prefetch_count, #ch{limiter_pid = LimiterPid}) ->
- rabbit_limiter:get_limit(LimiterPid);
-i(client_flow_blocked, #ch{limiter_pid = LimiterPid}) ->
- rabbit_limiter:is_blocked(LimiterPid);
+i(prefetch_count, #ch{limiter = Limiter}) ->
+ rabbit_limiter:get_limit(Limiter);
+i(client_flow_blocked, #ch{limiter = Limiter}) ->
+ rabbit_limiter:is_blocked(Limiter);
i(Item, _) ->
throw({bad_argument, Item}).
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
index 65ccca02..a19b6bfd 100644
--- a/src/rabbit_channel_sup.erl
+++ b/src/rabbit_channel_sup.erl
@@ -47,47 +47,44 @@
start_link({tcp, Sock, Channel, FrameMax, ReaderPid, Protocol, User, VHost,
Capabilities, Collector}) ->
- {ok, SupPid} = supervisor2:start_link(?MODULE, []),
- {ok, WriterPid} =
- supervisor2:start_child(
- SupPid,
- {writer, {rabbit_writer, start_link,
- [Sock, Channel, FrameMax, Protocol, ReaderPid]},
- intrinsic, ?MAX_WAIT, worker, [rabbit_writer]}),
+ {ok, SupPid} = supervisor2:start_link(?MODULE,
+ {tcp, Sock, Channel, FrameMax,
+ ReaderPid, Protocol}),
+ [LimiterPid] = supervisor2:find_child(SupPid, limiter),
+ [WriterPid] = supervisor2:find_child(SupPid, writer),
{ok, ChannelPid} =
supervisor2:start_child(
SupPid,
{channel, {rabbit_channel, start_link,
[Channel, ReaderPid, WriterPid, ReaderPid, Protocol,
User, VHost, Capabilities, Collector,
- start_limiter_fun(SupPid)]},
+ rabbit_limiter:make_token(LimiterPid)]},
intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}),
{ok, AState} = rabbit_command_assembler:init(Protocol),
{ok, SupPid, {ChannelPid, AState}};
start_link({direct, Channel, ClientChannelPid, ConnPid, Protocol, User, VHost,
Capabilities, Collector}) ->
- {ok, SupPid} = supervisor2:start_link(?MODULE, []),
+ {ok, SupPid} = supervisor2:start_link(?MODULE, direct),
+ [LimiterPid] = supervisor2:find_child(SupPid, limiter),
{ok, ChannelPid} =
supervisor2:start_child(
SupPid,
{channel, {rabbit_channel, start_link,
[Channel, ClientChannelPid, ClientChannelPid, ConnPid,
Protocol, User, VHost, Capabilities, Collector,
- start_limiter_fun(SupPid)]},
+ rabbit_limiter:make_token(LimiterPid)]},
intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}),
{ok, SupPid, {ChannelPid, none}}.
%%----------------------------------------------------------------------------
-init([]) ->
- {ok, {{one_for_all, 0, 1}, []}}.
-
-start_limiter_fun(SupPid) ->
- fun (UnackedCount) ->
- Me = self(),
- {ok, _Pid} =
- supervisor2:start_child(
- SupPid,
- {limiter, {rabbit_limiter, start_link, [Me, UnackedCount]},
- transient, ?MAX_WAIT, worker, [rabbit_limiter]})
- end.
+init(Type) ->
+ {ok, {{one_for_all, 0, 1}, child_specs(Type)}}.
+
+child_specs({tcp, Sock, Channel, FrameMax, ReaderPid, Protocol}) ->
+ [{writer, {rabbit_writer, start_link,
+ [Sock, Channel, FrameMax, Protocol, ReaderPid]},
+ intrinsic, ?MAX_WAIT, worker, [rabbit_writer]} | child_specs(direct)];
+child_specs(direct) ->
+ [{limiter, {rabbit_limiter, start_link, []},
+ transient, ?MAX_WAIT, worker, [rabbit_limiter]}].
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index d43ec1fc..66a4f89b 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -20,27 +20,36 @@
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, prioritise_call/3]).
--export([start_link/2]).
+-export([start_link/0, make_token/0, make_token/1, is_enabled/1, enable/2,
+ disable/1]).
-export([limit/2, can_send/3, ack/2, register/2, unregister/2]).
-export([get_limit/1, block/1, unblock/1, is_blocked/1]).
%%----------------------------------------------------------------------------
--ifdef(use_specs).
+-record(token, {pid, enabled}).
--type(maybe_pid() :: pid() | 'undefined').
+-ifdef(use_specs).
--spec(start_link/2 :: (pid(), non_neg_integer()) ->
- rabbit_types:ok_pid_or_error()).
--spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok' | 'stopped').
--spec(can_send/3 :: (maybe_pid(), pid(), boolean()) -> boolean()).
--spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok').
--spec(register/2 :: (maybe_pid(), pid()) -> 'ok').
--spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok').
--spec(get_limit/1 :: (maybe_pid()) -> non_neg_integer()).
--spec(block/1 :: (maybe_pid()) -> 'ok').
--spec(unblock/1 :: (maybe_pid()) -> 'ok' | 'stopped').
--spec(is_blocked/1 :: (maybe_pid()) -> boolean()).
+-export_type([token/0]).
+
+-opaque(token() :: #token{}).
+
+-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
+-spec(make_token/0 :: () -> token()).
+-spec(make_token/1 :: ('undefined' | pid()) -> token()).
+-spec(is_enabled/1 :: (token()) -> boolean()).
+-spec(enable/2 :: (token(), non_neg_integer()) -> token()).
+-spec(disable/1 :: (token()) -> token()).
+-spec(limit/2 :: (token(), non_neg_integer()) -> 'ok' | {'disabled', token()}).
+-spec(can_send/3 :: (token(), pid(), boolean()) -> boolean()).
+-spec(ack/2 :: (token(), non_neg_integer()) -> 'ok').
+-spec(register/2 :: (token(), pid()) -> 'ok').
+-spec(unregister/2 :: (token(), pid()) -> 'ok').
+-spec(get_limit/1 :: (token()) -> non_neg_integer()).
+-spec(block/1 :: (token()) -> 'ok').
+-spec(unblock/1 :: (token()) -> 'ok' | {'disabled', token()}).
+-spec(is_blocked/1 :: (token()) -> boolean()).
-endif.
@@ -59,63 +68,63 @@
%% API
%%----------------------------------------------------------------------------
-start_link(ChPid, UnackedMsgCount) ->
- gen_server2:start_link(?MODULE, [ChPid, UnackedMsgCount], []).
+start_link() -> gen_server2:start_link(?MODULE, [], []).
+
+make_token() -> make_token(undefined).
+make_token(Pid) -> #token{pid = Pid, enabled = false}.
+
+is_enabled(#token{enabled = Enabled}) -> Enabled.
+
+enable(#token{pid = Pid} = Token, Volume) ->
+ gen_server2:call(Pid, {enable, Token, self(), Volume}, infinity).
-limit(undefined, 0) ->
- ok;
-limit(LimiterPid, PrefetchCount) ->
- gen_server2:call(LimiterPid, {limit, PrefetchCount}, infinity).
+disable(#token{pid = Pid} = Token) ->
+ gen_server2:call(Pid, {disable, Token}, infinity).
+
+limit(Limiter, PrefetchCount) ->
+ maybe_call(Limiter, {limit, PrefetchCount, Limiter}, ok).
%% Ask the limiter whether the queue can deliver a message without
-%% breaching a limit
-can_send(undefined, _QPid, _AckRequired) ->
- true;
-can_send(LimiterPid, QPid, AckRequired) ->
+%% breaching a limit. Note that we don't use maybe_call here in order
+%% to avoid always going through with_exit_handler/2, even when the
+%% limiter is disabled.
+can_send(#token{pid = Pid, enabled = true}, QPid, AckRequired) ->
rabbit_misc:with_exit_handler(
fun () -> true end,
- fun () -> gen_server2:call(LimiterPid, {can_send, QPid, AckRequired},
- infinity) end).
+ fun () ->
+ gen_server2:call(Pid, {can_send, QPid, AckRequired}, infinity)
+ end);
+can_send(_, _, _) ->
+ true.
%% Let the limiter know that the channel has received some acks from a
%% consumer
-ack(undefined, _Count) -> ok;
-ack(LimiterPid, Count) -> gen_server2:cast(LimiterPid, {ack, Count}).
+ack(Limiter, Count) -> maybe_cast(Limiter, {ack, Count}).
-register(undefined, _QPid) -> ok;
-register(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {register, QPid}).
+register(Limiter, QPid) -> maybe_cast(Limiter, {register, QPid}).
-unregister(undefined, _QPid) -> ok;
-unregister(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {unregister, QPid}).
+unregister(Limiter, QPid) -> maybe_cast(Limiter, {unregister, QPid}).
-get_limit(undefined) ->
- 0;
-get_limit(Pid) ->
+get_limit(Limiter) ->
rabbit_misc:with_exit_handler(
fun () -> 0 end,
- fun () -> gen_server2:call(Pid, get_limit, infinity) end).
+ fun () -> maybe_call(Limiter, get_limit, ok) end).
-block(undefined) ->
- ok;
-block(LimiterPid) ->
- gen_server2:call(LimiterPid, block, infinity).
+block(Limiter) ->
+ maybe_call(Limiter, block, ok).
-unblock(undefined) ->
- ok;
-unblock(LimiterPid) ->
- gen_server2:call(LimiterPid, unblock, infinity).
+unblock(Limiter) ->
+ maybe_call(Limiter, {unblock, Limiter}, ok).
-is_blocked(undefined) ->
- false;
-is_blocked(LimiterPid) ->
- gen_server2:call(LimiterPid, is_blocked, infinity).
+is_blocked(Limiter) ->
+ maybe_call(Limiter, is_blocked, false).
%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------
-init([ChPid, UnackedMsgCount]) ->
- {ok, #lim{ch_pid = ChPid, volume = UnackedMsgCount}}.
+init([]) ->
+ {ok, #lim{}}.
prioritise_call(get_limit, _From, _State) -> 9;
prioritise_call(_Msg, _From, _State) -> 0.
@@ -135,23 +144,33 @@ handle_call({can_send, QPid, AckRequired}, _From,
handle_call(get_limit, _From, State = #lim{prefetch_count = PrefetchCount}) ->
{reply, PrefetchCount, State};
-handle_call({limit, PrefetchCount}, _From, State) ->
+handle_call({limit, PrefetchCount, Token}, _From, State) ->
case maybe_notify(State, State#lim{prefetch_count = PrefetchCount}) of
- {cont, State1} -> {reply, ok, State1};
- {stop, State1} -> {stop, normal, stopped, State1}
+ {cont, State1} ->
+ {reply, ok, State1};
+ {stop, State1} ->
+ {reply, {disabled, Token#token{enabled = false}}, State1}
end;
handle_call(block, _From, State) ->
{reply, ok, State#lim{blocked = true}};
-handle_call(unblock, _From, State) ->
+handle_call({unblock, Token}, _From, State) ->
case maybe_notify(State, State#lim{blocked = false}) of
- {cont, State1} -> {reply, ok, State1};
- {stop, State1} -> {stop, normal, stopped, State1}
+ {cont, State1} ->
+ {reply, ok, State1};
+ {stop, State1} ->
+ {reply, {disabled, Token#token{enabled = false}}, State1}
end;
handle_call(is_blocked, _From, State) ->
- {reply, blocked(State), State}.
+ {reply, blocked(State), State};
+
+handle_call({enable, Token, Channel, Volume}, _From, State) ->
+ {reply, Token#token{enabled = true},
+ State#lim{ch_pid = Channel, volume = Volume}};
+handle_call({disable, Token}, _From, State) ->
+ {reply, Token#token{enabled = false}, State}.
handle_cast({ack, Count}, State = #lim{volume = Volume}) ->
NewVolume = if Volume == 0 -> 0;
@@ -190,6 +209,16 @@ maybe_notify(OldState, NewState) ->
false -> {cont, NewState}
end.
+maybe_call(#token{pid = Pid, enabled = true}, Call, _Default) ->
+ gen_server2:call(Pid, Call, infinity);
+maybe_call(_, _Call, Default) ->
+ Default.
+
+maybe_cast(#token{pid = Pid, enabled = true}, Cast) ->
+ gen_server2:cast(Pid, Cast);
+maybe_cast(_, _Call) ->
+ ok.
+
limit_reached(#lim{prefetch_count = Limit, volume = Volume}) ->
Limit =/= 0 andalso Volume >= Limit.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index a068efe5..699cfbce 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1203,15 +1203,16 @@ test_server_status() ->
{ok, Ch} = rabbit_channel:start_link(
1, self(), Writer, self(), rabbit_framing_amqp_0_9_1,
user(<<"user">>), <<"/">>, [], self(),
- fun (_) -> {ok, self()} end),
+ rabbit_limiter:make_token(self())),
[Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>],
{new, Queue = #amqqueue{}} <-
[rabbit_amqqueue:declare(
rabbit_misc:r(<<"/">>, queue, Name),
false, false, [], none)]],
- ok = rabbit_amqqueue:basic_consume(Q, true, Ch, undefined,
- <<"ctag">>, true, undefined),
+ ok = rabbit_amqqueue:basic_consume(
+ Q, true, Ch, rabbit_limiter:make_token(),
+ <<"ctag">>, true, undefined),
%% list queues
ok = info_action(list_queues, rabbit_amqqueue:info_keys(), true),
@@ -1270,7 +1271,7 @@ test_spawn() ->
{ok, Ch} = rabbit_channel:start_link(
1, Me, Writer, Me, rabbit_framing_amqp_0_9_1,
user(<<"guest">>), <<"/">>, [], Me,
- fun (_) -> {ok, Me} end),
+ rabbit_limiter:make_token(self())),
ok = rabbit_channel:do(Ch, #'channel.open'{}),
receive #'channel.open_ok'{} -> ok
after 1000 -> throw(failed_to_receive_channel_open_ok)