summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2008-12-23 15:02:32 +0000
committerMatthias Radestock <matthias@lshift.net>2008-12-23 15:02:32 +0000
commit0d160f405915259c1b7aa64a8f8c1ecf07978434 (patch)
tree738f04023d34bb9a85de835a37d0fe0f3616a5e7
parentcf1272653c67dd02824a416999f6cfb1891f829c (diff)
parent05fa30a345228631d8c7002252ad133ef38e0e9e (diff)
downloadrabbitmq-server-0d160f405915259c1b7aa64a8f8c1ecf07978434.tar.gz
merge default into bug18557
-rw-r--r--ebin/rabbit.app1
-rw-r--r--src/rabbit_amqqueue.erl17
-rw-r--r--src/rabbit_amqqueue_process.erl155
-rw-r--r--src/rabbit_channel.erl40
-rw-r--r--src/rabbit_limiter.erl145
5 files changed, 286 insertions, 72 deletions
diff --git a/ebin/rabbit.app b/ebin/rabbit.app
index 70a13208..e377a33a 100644
--- a/ebin/rabbit.app
+++ b/ebin/rabbit.app
@@ -19,6 +19,7 @@
rabbit_framing_channel,
rabbit_framing,
rabbit_heartbeat,
+ rabbit_limiter,
rabbit_load,
rabbit_log,
rabbit_memsup_linux,
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 2b9abb29..24ded98c 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -37,8 +37,9 @@
stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4]).
-export([list/1, info/1, info/2, info_all/1, info_all/2]).
-export([claim_queue/2]).
--export([basic_get/3, basic_consume/7, basic_cancel/4]).
+-export([basic_get/3, basic_consume/8, basic_cancel/4]).
-export([notify_sent/2]).
+-export([unblock/2]).
-export([commit_all/2, rollback_all/2, notify_down_all/2]).
-export([on_node_down/1]).
@@ -94,12 +95,13 @@
-spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked').
-spec(basic_get/3 :: (amqqueue(), pid(), bool()) ->
{'ok', non_neg_integer(), msg()} | 'empty').
--spec(basic_consume/7 ::
- (amqqueue(), bool(), pid(), pid(), ctag(), bool(), any()) ->
+-spec(basic_consume/8 ::
+ (amqqueue(), bool(), pid(), pid(), pid(), ctag(), bool(), any()) ->
'ok' | {'error', 'queue_owned_by_another_connection' |
'exclusive_consume_unavailable'}).
-spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
+-spec(unblock/2 :: (pid(), pid()) -> 'ok').
-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
-spec(on_node_down/1 :: (erlang_node()) -> 'ok').
-spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()).
@@ -265,10 +267,10 @@ claim_queue(#amqqueue{pid = QPid}, ReaderPid) ->
basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) ->
gen_server:call(QPid, {basic_get, ChPid, NoAck}).
-basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid,
+basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, LimiterPid,
ConsumerTag, ExclusiveConsume, OkMsg) ->
- gen_server:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid,
- ConsumerTag, ExclusiveConsume, OkMsg}).
+ gen_server:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid,
+ LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}).
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
ok = gen_server:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
@@ -276,6 +278,9 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
notify_sent(QPid, ChPid) ->
gen_server:cast(QPid, {notify_sent, ChPid}).
+unblock(QPid, ChPid) ->
+ gen_server:cast(QPid, {unblock, ChPid}).
+
internal_delete(QueueName) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 709e355e..53b569b4 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -62,9 +62,10 @@
%% These are held in our process dictionary
-record(cr, {consumers,
ch_pid,
+ limiter_pid,
monitor_ref,
unacked_messages,
- is_overload_protection_active,
+ is_limit_active,
unsent_message_count}).
-define(INFO_KEYS,
@@ -131,7 +132,7 @@ ch_record(ChPid) ->
ch_pid = ChPid,
monitor_ref = MonitorRef,
unacked_messages = dict:new(),
- is_overload_protection_active = false,
+ is_limit_active = false,
unsent_message_count = 0},
put(Key, C),
C;
@@ -144,20 +145,16 @@ store_ch_record(C = #cr{ch_pid = ChPid}) ->
all_ch_record() ->
[C || {{ch, _}, C} <- get()].
-update_store_and_maybe_block_ch(
- C = #cr{is_overload_protection_active = Active,
- unsent_message_count = Count}) ->
- {Result, NewActive} =
- if
- not(Active) and (Count > ?UNSENT_MESSAGE_LIMIT) ->
- {block_ch, true};
- Active and (Count == 0) ->
- {unblock_ch, false};
- true ->
- {ok, Active}
- end,
- store_ch_record(C#cr{is_overload_protection_active = NewActive}),
- Result.
+is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) ->
+ Limited orelse Count > ?UNSENT_MESSAGE_LIMIT.
+
+ch_record_state_transition(OldCR, NewCR) ->
+ BlockedOld = is_ch_blocked(OldCR),
+ BlockedNew = is_ch_blocked(NewCR),
+ if BlockedOld andalso not(BlockedNew) -> unblock;
+ BlockedNew andalso not(BlockedOld) -> block;
+ true -> ok
+ end.
deliver_immediately(Message, Delivered,
State = #q{q = #amqqueue{name = QName},
@@ -165,31 +162,59 @@ deliver_immediately(Message, Delivered,
next_msg_id = NextId}) ->
?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Message]),
case queue:out(RoundRobin) of
- {{value, QEntry = {ChPid, #consumer{tag = ConsumerTag,
- ack_required = AckRequired}}},
+ {{value, QEntry = {ChPid,
+ #consumer{tag = ConsumerTag,
+ ack_required = AckRequired = true}}},
RoundRobinTail} ->
- rabbit_channel:deliver(
- ChPid, ConsumerTag, AckRequired,
- {QName, self(), NextId, Delivered, Message}),
- C = #cr{unsent_message_count = Count,
- unacked_messages = UAM} = ch_record(ChPid),
- NewUAM = case AckRequired of
- true -> dict:store(NextId, Message, UAM);
- false -> UAM
- end,
- NewConsumers =
- case update_store_and_maybe_block_ch(
- C#cr{unsent_message_count = Count + 1,
- unacked_messages = NewUAM}) of
- ok -> queue:in(QEntry, RoundRobinTail);
- block_ch -> block_consumers(ChPid, RoundRobinTail)
- end,
- {offered, AckRequired, State#q{round_robin = NewConsumers,
- next_msg_id = NextId +1}};
+ % Use Qos Limits if an ack is required
+ % Query the limiter to find out if a limit has been breached
+ C = #cr{limiter_pid = LimiterPid} = ch_record(ChPid),
+ case rabbit_limiter:can_send(LimiterPid, self()) of
+ true ->
+ really_deliver(AckRequired, ChPid, ConsumerTag,
+ Delivered, Message, NextId, QName,
+ QEntry, RoundRobinTail, State);
+ false ->
+ % Have another go by cycling through the consumer
+ % queue
+ store_ch_record(C#cr{is_limit_active = true}),
+ NewConsumers = block_consumers(ChPid, RoundRobinTail),
+ deliver_immediately(Message, Delivered,
+ State#q{round_robin = NewConsumers})
+ end;
+ {{value, QEntry = {ChPid,
+ #consumer{tag = ConsumerTag,
+ ack_required = AckRequired = false}}},
+ RoundRobinTail} ->
+ really_deliver(AckRequired, ChPid, ConsumerTag,
+ Delivered, Message, NextId, QName,
+ QEntry, RoundRobinTail, State);
{empty, _} ->
- not_offered
+ {not_offered, State}
end.
+% TODO The arity of this function seems a bit large :-(
+really_deliver(AckRequired, ChPid, ConsumerTag, Delivered, Message, NextId,
+ QName, QEntry, RoundRobinTail, State) ->
+ rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired,
+ {QName, self(), NextId, Delivered, Message}),
+ C = #cr{unsent_message_count = Count,
+ unacked_messages = UAM} = ch_record(ChPid),
+ NewUAM = case AckRequired of
+ true -> dict:store(NextId, Message, UAM);
+ false -> UAM
+ end,
+ NewC = C#cr{unsent_message_count = Count + 1,
+ unacked_messages = NewUAM},
+ store_ch_record(NewC),
+ NewConsumers =
+ case ch_record_state_transition(C, NewC) of
+ ok -> queue:in(QEntry, RoundRobinTail);
+ block -> block_consumers(ChPid, RoundRobinTail)
+ end,
+ {offered, AckRequired, State#q{round_robin = NewConsumers,
+ next_msg_id = NextId +1}}.
+
attempt_delivery(none, Message, State) ->
case deliver_immediately(Message, false, State) of
{offered, false, State1} ->
@@ -198,8 +223,8 @@ attempt_delivery(none, Message, State) ->
persist_message(none, qname(State), Message),
persist_delivery(qname(State), Message, false),
{true, State1};
- not_offered ->
- {false, State}
+ {not_offered, State1} ->
+ {false, State1}
end;
attempt_delivery(Txn, Message, State) ->
persist_message(Txn, qname(State), Message),
@@ -237,16 +262,22 @@ block_consumer(ChPid, ConsumerTag, RoundRobin) ->
(CP /= ChPid) or (CT /= ConsumerTag)
end, queue:to_list(RoundRobin))).
-possibly_unblock(C = #cr{consumers = Consumers, ch_pid = ChPid},
- State = #q{round_robin = RoundRobin}) ->
- case update_store_and_maybe_block_ch(C) of
- ok ->
+possibly_unblock(State, ChPid, Update) ->
+ case lookup_ch(ChPid) of
+ not_found ->
State;
- unblock_ch ->
- run_poke_burst(State#q{round_robin =
- unblock_consumers(ChPid, Consumers, RoundRobin)})
+ C ->
+ NewC = Update(C),
+ store_ch_record(NewC),
+ case ch_record_state_transition(C, NewC) of
+ ok -> State;
+ unblock -> NewRR = unblock_consumers(ChPid,
+ NewC#cr.consumers,
+ State#q.round_robin),
+ run_poke_burst(State#q{round_robin = NewRR})
+ end
end.
-
+
check_auto_delete(State = #q{q = #amqqueue{auto_delete = false}}) ->
{continue, State};
check_auto_delete(State = #q{has_had_consumers = false}) ->
@@ -301,7 +332,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
{stop, normal, NewState}
end
end.
-
+
cancel_holder(ChPid, ConsumerTag, {ChPid, ConsumerTag}) ->
none;
cancel_holder(_ChPid, _ConsumerTag, Holder) ->
@@ -334,8 +365,8 @@ run_poke_burst(MessageBuffer, State) ->
{offered, false, NewState} ->
persist_auto_ack(qname(State), Message),
run_poke_burst(BufferTail, NewState);
- not_offered ->
- State#q{message_buffer = MessageBuffer}
+ {not_offered, NewState} ->
+ NewState#q{message_buffer = MessageBuffer}
end;
{empty, _} ->
State#q{message_buffer = MessageBuffer}
@@ -585,8 +616,8 @@ handle_call({basic_get, ChPid, NoAck}, _From,
reply(empty, State)
end;
-handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag,
- ExclusiveConsume, OkMsg},
+handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid,
+ ConsumerTag, ExclusiveConsume, OkMsg},
_From, State = #q{owner = Owner,
exclusive_consumer = ExistingHolder,
round_robin = RoundRobin}) ->
@@ -600,7 +631,8 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag,
ok ->
C = #cr{consumers = Consumers} = ch_record(ChPid),
Consumer = #consumer{tag = ConsumerTag, ack_required = not(NoAck)},
- C1 = C#cr{consumers = [Consumer | Consumers]},
+ C1 = C#cr{consumers = [Consumer | Consumers],
+ limiter_pid = LimiterPid},
store_ch_record(C1),
State1 = State#q{has_had_consumers = true,
exclusive_consumer =
@@ -729,14 +761,17 @@ handle_cast({requeue, MsgIds, ChPid}, State) ->
[{Message, true} || Message <- Messages], State))
end;
+handle_cast({unblock, ChPid}, State) ->
+ noreply(
+ possibly_unblock(State, ChPid,
+ fun (C) -> C#cr{is_limit_active = false} end));
+
handle_cast({notify_sent, ChPid}, State) ->
- case lookup_ch(ChPid) of
- not_found -> noreply(State);
- T = #cr{unsent_message_count =Count} ->
- noreply(possibly_unblock(
- T#cr{unsent_message_count = Count - 1},
- State))
- end.
+ noreply(
+ possibly_unblock(State, ChPid,
+ fun (C = #cr{unsent_message_count = Count}) ->
+ C#cr{unsent_message_count = Count - 1}
+ end)).
handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
State = #q{owner = {DownPid, MonitorRef}}) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 19104bcb..af1923a7 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -42,7 +42,7 @@
-record(ch, {state, proxy_pid, reader_pid, writer_pid,
transaction_id, tx_participants, next_tag,
uncommitted_ack_q, unacked_message_q,
- username, virtual_host,
+ username, virtual_host, limiter,
most_recently_declared_queue, consumer_mapping}).
%%----------------------------------------------------------------------------
@@ -108,6 +108,8 @@ init(ProxyPid, [ReaderPid, WriterPid, Username, VHost]) ->
username = Username,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
+ % TODO See point 3.1.1 of the design - start the limiter lazily
+ limiter = rabbit_limiter:start_link(ProxyPid),
consumer_mapping = dict:new()}.
handle_message({method, Method, Content}, State) ->
@@ -290,7 +292,8 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
Participants = ack(State#ch.proxy_pid, TxnKey, Acked),
{noreply, case TxnKey of
- none -> State#ch{unacked_message_q = Remaining};
+ none -> ok = notify_limiter(State#ch.limiter, Acked),
+ State#ch{unacked_message_q = Remaining};
_ -> NewUAQ = queue:join(State#ch.uncommitted_ack_q,
Acked),
add_tx_participants(
@@ -334,6 +337,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
nowait = NoWait},
_, State = #ch{ proxy_pid = ProxyPid,
reader_pid = ReaderPid,
+ limiter = LimiterPid,
consumer_mapping = ConsumerMapping }) ->
case dict:find(ConsumerTag, ConsumerMapping) of
error ->
@@ -351,7 +355,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
QueueName,
fun (Q) ->
rabbit_amqqueue:basic_consume(
- Q, NoAck, ReaderPid, ProxyPid,
+ Q, NoAck, ReaderPid, ProxyPid, LimiterPid,
ActualConsumerTag, ExclusiveConsume,
ok_msg(NoWait, #'basic.consume_ok'{
consumer_tag = ActualConsumerTag}))
@@ -416,8 +420,19 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
end
end;
-handle_method(#'basic.qos'{}, _, State) ->
- %% FIXME: Need to implement QOS
+handle_method(#'basic.qos'{global = Flag = true}, _, _State) ->
+ rabbit_misc:protocol_error(not_implemented,
+ "Global flag (~s) for basic.qos not implementented", [Flag]);
+
+handle_method(#'basic.qos'{prefetch_size = Size},
+ _, _State) when Size /= 0 ->
+ rabbit_misc:protocol_error(not_implemented,
+ "Pre-fetch size (~s) for basic.qos not implementented",
+ [Size]);
+
+handle_method(#'basic.qos'{prefetch_count = PrefetchCount},
+ _, State = #ch{limiter = Limiter}) ->
+ ok = rabbit_limiter:limit(Limiter, PrefetchCount),
{reply, #'basic.qos_ok'{}, State};
handle_method(#'basic.recover'{requeue = true},
@@ -764,7 +779,9 @@ internal_commit(State = #ch{transaction_id = TxnKey,
tx_participants = Participants}) ->
case rabbit_amqqueue:commit_all(sets:to_list(Participants),
TxnKey) of
- ok -> new_tx(State);
+ ok -> ok = notify_limiter(State#ch.limiter,
+ State#ch.uncommitted_ack_q),
+ new_tx(State);
{error, Errors} -> rabbit_misc:protocol_error(
internal_error, "commit failed: ~w", [Errors])
end.
@@ -815,6 +832,17 @@ notify_queues(#ch{proxy_pid = ProxyPid, consumer_mapping = Consumers}) ->
end],
ProxyPid).
+%% tell the limiter about the number of acks that have been received
+%% for messages delivered to subscribed consumers, rather than those
+%% for messages sent in a response to a basic.get
+notify_limiter(Limiter, Acked) ->
+ case lists:foldl(fun ({_, none, _}, Acc) -> Acc;
+ ({_, _, _}, Acc) -> Acc + 1
+ end, 0, queue:to_list(Acked)) of
+ 0 -> ok;
+ Count -> rabbit_limiter:ack(Limiter, Count)
+ end.
+
is_message_persistent(#content{properties = #'P_basic'{
delivery_mode = Mode}}) ->
case Mode of
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
new file mode 100644
index 00000000..12632625
--- /dev/null
+++ b/src/rabbit_limiter.erl
@@ -0,0 +1,145 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_limiter).
+
+-behaviour(gen_server).
+
+-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
+ handle_info/2]).
+-export([start_link/1]).
+-export([limit/2, can_send/2, ack/2]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(limit/2 :: (pid(), non_neg_integer()) -> 'ok').
+-spec(can_send/2 :: (pid(), pid()) -> bool()).
+-spec(ack/2 :: (pid(), non_neg_integer()) -> 'ok').
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+-record(lim, {prefetch_count = 0,
+ ch_pid,
+ queues = dict:new(),
+ in_use = 0}).
+
+%%----------------------------------------------------------------------------
+%% API
+%%----------------------------------------------------------------------------
+
+start_link(ChPid) ->
+ {ok, Pid} = gen_server:start_link(?MODULE, [ChPid], []),
+ Pid.
+
+limit(LimiterPid, PrefetchCount) ->
+ gen_server:cast(LimiterPid, {limit, PrefetchCount}).
+
+%% Ask the limiter whether the queue can deliver a message without
+%% breaching a limit
+can_send(LimiterPid, QPid) ->
+ gen_server:call(LimiterPid, {can_send, QPid}).
+
+%% Let the limiter know that the channel has received some acks from a
+%% consumer
+ack(LimiterPid, Count) ->
+ gen_server:cast(LimiterPid, {ack, Count}).
+
+%%----------------------------------------------------------------------------
+%% gen_server callbacks
+%%----------------------------------------------------------------------------
+
+init([ChPid]) ->
+ {ok, #lim{ch_pid = ChPid} }.
+
+handle_call({can_send, QPid}, _From, State = #lim{in_use = InUse}) ->
+ case limit_reached(State) of
+ true -> {reply, false, remember_queue(QPid, State)};
+ false -> {reply, true, State#lim{in_use = InUse + 1}}
+ end.
+
+handle_cast({limit, PrefetchCount}, State) ->
+ {noreply, maybe_notify(State, State#lim{prefetch_count = PrefetchCount})};
+
+handle_cast({ack, Count}, State = #lim{in_use = InUse}) ->
+ NewInUse = if InUse == 0 -> 0;
+ true -> InUse - Count
+ end,
+ {noreply, maybe_notify(State, State#lim{in_use = NewInUse})}.
+
+handle_info({'DOWN', _MonitorRef, _Type, QPid, _Info},
+ State = #lim{queues = Queues}) ->
+ {noreply, State#lim{queues = dict:erase(QPid, Queues)}}.
+
+terminate(_, _) ->
+ ok.
+
+code_change(_, State, _) ->
+ State.
+
+%%----------------------------------------------------------------------------
+%% Internal plumbing
+%%----------------------------------------------------------------------------
+
+maybe_notify(OldState, NewState) ->
+ case limit_reached(OldState) andalso not(limit_reached(NewState)) of
+ true -> forget_queues(NewState);
+ false -> NewState
+ end.
+
+limit_reached(#lim{prefetch_count = Limit, in_use = InUse}) ->
+ Limit =/= 0 andalso InUse >= Limit.
+
+remember_queue(QPid, State = #lim{queues = Queues}) ->
+ case dict:is_key(QPid, Queues) of
+ false -> MonitorRef = erlang:monitor(process, QPid),
+ State#lim{queues = dict:store(QPid, MonitorRef, Queues)};
+ true -> State
+ end.
+
+forget_queues(State = #lim{ch_pid = ChPid, queues = Queues}) ->
+ QList = dict:to_list(Queues),
+ case length(QList) of
+ 0 -> ok;
+ L ->
+ %% We randomly vary the position in which each queue
+ %% appears in the list, thus ensuring that each queue has
+ %% an equal chance of being notified first.
+ {L1, L2} = lists:split(random:uniform(L), QList),
+ [begin
+ true = erlang:demonitor(Ref),
+ ok = rabbit_amqqueue:unblock(Q, ChPid)
+ end || {Q, Ref} <- L2 ++ L1]
+ end,
+ State#lim{queues = dict:new()}.