author    Simon MacMullen <simon@rabbitmq.com>  2013-03-26 15:20:28 +0000
committer Simon MacMullen <simon@rabbitmq.com>  2013-03-26 15:20:28 +0000
commit    2bfef9281ca14bcd6a3feacc560813fefb9b9d0b (patch)
tree      69021fdf3c5c4f2fa8052933b99159ee06167abc
parent    7045b90b2e9a2be678b6781db6815f20af120e87 (diff)
parent    810b5f620066304fe85cb3029e7544f984ffe806 (diff)
download  rabbitmq-server-2bfef9281ca14bcd6a3feacc560813fefb9b9d0b.tar.gz
merge default
-rw-r--r--  packaging/debs/apt-repository/distributions |   2
-rw-r--r--  src/file_handle_cache.erl                   |   4
-rw-r--r--  src/gen_server2.erl                         |  38
-rw-r--r--  src/gm.erl                                  |   8
-rw-r--r--  src/priority_queue.erl                      |  70
-rw-r--r--  src/rabbit_amqqueue.erl                     |  42
-rw-r--r--  src/rabbit_amqqueue_process.erl             | 218
-rw-r--r--  src/rabbit_channel.erl                      | 161
-rw-r--r--  src/rabbit_channel_sup.erl                  |   4
-rw-r--r--  src/rabbit_exchange.erl                     |  36
-rw-r--r--  src/rabbit_exchange_decorator.erl           |  12
-rw-r--r--  src/rabbit_limiter.erl                      | 409
-rw-r--r--  src/rabbit_mirror_queue_slave.erl           |  10
-rw-r--r--  src/rabbit_msg_store.erl                    |  10
-rw-r--r--  src/rabbit_msg_store_gc.erl                 |   6
-rw-r--r--  src/rabbit_reader.erl                       |  23
-rw-r--r--  src/rabbit_registry.erl                     |  39
-rw-r--r--  src/rabbit_tests.erl                        |  31
-rw-r--r--  src/rabbit_vhost.erl                        |   2
-rw-r--r--  src/worker_pool_worker.erl                  |   6
20 files changed, 733 insertions, 398 deletions
diff --git a/packaging/debs/apt-repository/distributions b/packaging/debs/apt-repository/distributions
index 61fd778a..75b9fe46 100644
--- a/packaging/debs/apt-repository/distributions
+++ b/packaging/debs/apt-repository/distributions
@@ -2,6 +2,6 @@ Origin: RabbitMQ
Label: RabbitMQ Repository for Debian / Ubuntu etc
Suite: testing
Codename: kitten
-Architectures: AVR32 alpha amd64 arm armel armhf hppa hurd-i386 i386 ia64 kfreebsd-amd64 kfreebsd-i386 m32 m68k mips mipsel netbsd-alpha netbsd-i386 powerpc s390 s390x sh sparc
+Architectures: AVR32 alpha amd64 arm armel armhf hppa hurd-i386 i386 ia64 kfreebsd-amd64 kfreebsd-i386 m32 m68k mips mipsel netbsd-alpha netbsd-i386 powerpc s390 s390x sh sparc source
Components: main
Description: RabbitMQ Repository for Debian / Ubuntu etc
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index d2d4d295..406add8a 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -152,7 +152,7 @@
-export([ulimit/0]).
-export([start_link/0, start_link/2, init/1, handle_call/3, handle_cast/2,
- handle_info/2, terminate/2, code_change/3, prioritise_cast/2]).
+ handle_info/2, terminate/2, code_change/3, prioritise_cast/3]).
-define(SERVER, ?MODULE).
-define(RESERVED_FOR_OTHERS, 100).
@@ -848,7 +848,7 @@ init([AlarmSet, AlarmClear]) ->
alarm_set = AlarmSet,
alarm_clear = AlarmClear }}.
-prioritise_cast(Msg, _State) ->
+prioritise_cast(Msg, _Len, _State) ->
case Msg of
{release, _, _} -> 5;
_ -> 0
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index c82327a2..9109febd 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -16,12 +16,15 @@
%% The original code could reorder messages when communicating with a
%% process on a remote node that was not currently connected.
%%
-%% 4) The callback module can optionally implement prioritise_call/3,
-%% prioritise_cast/2 and prioritise_info/2. These functions take
-%% Message, From and State or just Message and State and return a
-%% single integer representing the priority attached to the message.
-%% Messages with higher priorities are processed before requests with
-%% lower priorities. The default priority is 0.
+%% 4) The callback module can optionally implement prioritise_call/4,
+%% prioritise_cast/3 and prioritise_info/3. These functions take
+%% Message, From, Length and State or just Message, Length and State
+%% (where Length is the current number of messages waiting to be
+%% processed) and return a single integer representing the priority
+%% attached to the message, or 'drop' to ignore it (for
+%% prioritise_cast/3 and prioritise_info/3 only). Messages with
+%% higher priorities are processed before requests with lower
+%% priorities. The default priority is 0.
%%
%% 5) The callback module can optionally implement
%% handle_pre_hibernate/1 and handle_post_hibernate/1. These will be
@@ -649,6 +652,9 @@ in({system, _From, _Req} = Input, GS2State) ->
in(Input, GS2State = #gs2_state { prioritisers = {_, _, F} }) ->
in(Input, F(Input, GS2State), GS2State).
+in(_Input, drop, GS2State) ->
+ GS2State;
+
in(Input, Priority, GS2State = #gs2_state { queue = Queue }) ->
GS2State # gs2_state { queue = priority_queue:in(Input, Priority, Queue) }.
@@ -1148,27 +1154,33 @@ whereis_name(Name) ->
end.
find_prioritisers(GS2State = #gs2_state { mod = Mod }) ->
- PCall = function_exported_or_default(Mod, 'prioritise_call', 3,
+ PCall = function_exported_or_default(Mod, 'prioritise_call', 4,
fun (_Msg, _From, _State) -> 0 end),
- PCast = function_exported_or_default(Mod, 'prioritise_cast', 2,
+ PCast = function_exported_or_default(Mod, 'prioritise_cast', 3,
fun (_Msg, _State) -> 0 end),
- PInfo = function_exported_or_default(Mod, 'prioritise_info', 2,
+ PInfo = function_exported_or_default(Mod, 'prioritise_info', 3,
fun (_Msg, _State) -> 0 end),
GS2State #gs2_state { prioritisers = {PCall, PCast, PInfo} }.
function_exported_or_default(Mod, Fun, Arity, Default) ->
case erlang:function_exported(Mod, Fun, Arity) of
true -> case Arity of
- 2 -> fun (Msg, GS2State = #gs2_state { state = State }) ->
- case catch Mod:Fun(Msg, State) of
+ 3 -> fun (Msg, GS2State = #gs2_state { queue = Queue,
+ state = State }) ->
+ Length = priority_queue:len(Queue),
+ case catch Mod:Fun(Msg, Length, State) of
+ drop ->
+ drop;
Res when is_integer(Res) ->
Res;
Err ->
handle_common_termination(Err, Msg, GS2State)
end
end;
- 3 -> fun (Msg, From, GS2State = #gs2_state { state = State }) ->
- case catch Mod:Fun(Msg, From, State) of
+ 4 -> fun (Msg, From, GS2State = #gs2_state { queue = Queue,
+ state = State }) ->
+ Length = priority_queue:len(Queue),
+ case catch Mod:Fun(Msg, From, Length, State) of
Res when is_integer(Res) ->
Res;
Err ->
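The hunks above change the prioritise_* contract. Purely for illustration (not part of this commit), a gen_server2 callback module adopting the new arities could look like the following sketch; the module name and message shapes are invented, and only the callback signatures and the 'drop' return value come from the documentation above.

%% Hypothetical callback sketch showing the new prioritise_* arities;
%% 'drop' is only honoured for prioritise_cast/3 and prioritise_info/3.
-module(example_server).
-behaviour(gen_server2).

-export([start_link/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3,
         prioritise_call/4, prioritise_cast/3, prioritise_info/3]).

start_link() -> gen_server2:start_link(?MODULE, [], []).

init([]) -> {ok, []}.

%% answer 'info' requests ahead of everything else
prioritise_call(info, _From, _Len, _State) -> 9;
prioritise_call(_Msg, _From, _Len, _State) -> 0.

%% shed low-value casts once the mailbox backlog grows too large
prioritise_cast({sample, _}, Len, _State) when Len > 1000 -> drop;
prioritise_cast(_Msg, _Len, _State)                       -> 0.

prioritise_info(emit_stats, _Len, _State) -> 7;
prioritise_info(_Msg, _Len, _State)       -> 0.

handle_call(_Msg, _From, State) -> {reply, ok, State}.
handle_cast(_Msg, State)        -> {noreply, State}.
handle_info(_Msg, State)        -> {noreply, State}.
terminate(_Reason, _State)      -> ok.
code_change(_OldVsn, State, _E) -> {ok, State}.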
diff --git a/src/gm.erl b/src/gm.erl
index 76b535e6..3f0909e8 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -380,7 +380,7 @@
confirmed_broadcast/2, info/1, forget_group/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
- code_change/3, prioritise_info/2]).
+ code_change/3, prioritise_info/3]).
-ifndef(use_specs).
-export([behaviour_info/1]).
@@ -721,12 +721,12 @@ terminate(Reason, State = #state { module = Module,
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-prioritise_info(flush, _State) ->
+prioritise_info(flush, _Len, _State) ->
1;
-prioritise_info({'DOWN', _MRef, process, _Pid, _Reason},
+prioritise_info({'DOWN', _MRef, process, _Pid, _Reason}, _Len,
#state { members_state = MS }) when MS /= undefined ->
1;
-prioritise_info(_, _State) ->
+prioritise_info(_, _Len, _State) ->
0.
diff --git a/src/priority_queue.erl b/src/priority_queue.erl
index 02a0a1df..0dc19819 100644
--- a/src/priority_queue.erl
+++ b/src/priority_queue.erl
@@ -69,9 +69,9 @@
%%----------------------------------------------------------------------------
new() ->
- {queue, [], []}.
+ {queue, [], [], 0}.
-is_queue({queue, R, F}) when is_list(R), is_list(F) ->
+is_queue({queue, R, F, L}) when is_list(R), is_list(F), is_integer(L) ->
true;
is_queue({pqueue, Queues}) when is_list(Queues) ->
lists:all(fun ({infinity, Q}) -> is_queue(Q);
@@ -80,17 +80,17 @@ is_queue({pqueue, Queues}) when is_list(Queues) ->
is_queue(_) ->
false.
-is_empty({queue, [], []}) ->
+is_empty({queue, [], [], 0}) ->
true;
is_empty(_) ->
false.
-len({queue, R, F}) when is_list(R), is_list(F) ->
- length(R) + length(F);
+len({queue, _R, _F, L}) ->
+ L;
len({pqueue, Queues}) ->
lists:sum([len(Q) || {_, Q} <- Queues]).
-to_list({queue, In, Out}) when is_list(In), is_list(Out) ->
+to_list({queue, In, Out, _Len}) when is_list(In), is_list(Out) ->
[{0, V} || V <- Out ++ lists:reverse(In, [])];
to_list({pqueue, Queues}) ->
[{maybe_negate_priority(P), V} || {P, Q} <- Queues,
@@ -99,13 +99,13 @@ to_list({pqueue, Queues}) ->
in(Item, Q) ->
in(Item, 0, Q).
-in(X, 0, {queue, [_] = In, []}) ->
- {queue, [X], In};
-in(X, 0, {queue, In, Out}) when is_list(In), is_list(Out) ->
- {queue, [X|In], Out};
-in(X, Priority, _Q = {queue, [], []}) ->
+in(X, 0, {queue, [_] = In, [], 1}) ->
+ {queue, [X], In, 2};
+in(X, 0, {queue, In, Out, Len}) when is_list(In), is_list(Out) ->
+ {queue, [X|In], Out, Len + 1};
+in(X, Priority, _Q = {queue, [], [], 0}) ->
in(X, Priority, {pqueue, []});
-in(X, Priority, Q = {queue, _, _}) ->
+in(X, Priority, Q = {queue, _, _, _}) ->
in(X, Priority, {pqueue, [{0, Q}]});
in(X, Priority, {pqueue, Queues}) ->
P = maybe_negate_priority(Priority),
@@ -113,33 +113,33 @@ in(X, Priority, {pqueue, Queues}) ->
{value, {_, Q}} ->
lists:keyreplace(P, 1, Queues, {P, in(X, Q)});
false when P == infinity ->
- [{P, {queue, [X], []}} | Queues];
+ [{P, {queue, [X], [], 1}} | Queues];
false ->
case Queues of
[{infinity, InfQueue} | Queues1] ->
[{infinity, InfQueue} |
- lists:keysort(1, [{P, {queue, [X], []}} | Queues1])];
+ lists:keysort(1, [{P, {queue, [X], [], 1}} | Queues1])];
_ ->
- lists:keysort(1, [{P, {queue, [X], []}} | Queues])
+ lists:keysort(1, [{P, {queue, [X], [], 1}} | Queues])
end
end}.
-out({queue, [], []} = Q) ->
+out({queue, [], [], 0} = Q) ->
{empty, Q};
-out({queue, [V], []}) ->
- {{value, V}, {queue, [], []}};
-out({queue, [Y|In], []}) ->
+out({queue, [V], [], 1}) ->
+ {{value, V}, {queue, [], [], 0}};
+out({queue, [Y|In], [], Len}) ->
[V|Out] = lists:reverse(In, []),
- {{value, V}, {queue, [Y], Out}};
-out({queue, In, [V]}) when is_list(In) ->
- {{value,V}, r2f(In)};
-out({queue, In,[V|Out]}) when is_list(In) ->
- {{value, V}, {queue, In, Out}};
+ {{value, V}, {queue, [Y], Out, Len - 1}};
+out({queue, In, [V], Len}) when is_list(In) ->
+ {{value,V}, r2f(In, Len - 1)};
+out({queue, In,[V|Out], Len}) when is_list(In) ->
+ {{value, V}, {queue, In, Out, Len - 1}};
out({pqueue, [{P, Q} | Queues]}) ->
{R, Q1} = out(Q),
NewQ = case is_empty(Q1) of
true -> case Queues of
- [] -> {queue, [], []};
+ [] -> {queue, [], [], 0};
[{0, OnlyQ}] -> OnlyQ;
[_|_] -> {pqueue, Queues}
end;
@@ -147,13 +147,13 @@ out({pqueue, [{P, Q} | Queues]}) ->
end,
{R, NewQ}.
-join(A, {queue, [], []}) ->
+join(A, {queue, [], [], 0}) ->
A;
-join({queue, [], []}, B) ->
+join({queue, [], [], 0}, B) ->
B;
-join({queue, AIn, AOut}, {queue, BIn, BOut}) ->
- {queue, BIn, AOut ++ lists:reverse(AIn, BOut)};
-join(A = {queue, _, _}, {pqueue, BPQ}) ->
+join({queue, AIn, AOut, ALen}, {queue, BIn, BOut, BLen}) ->
+ {queue, BIn, AOut ++ lists:reverse(AIn, BOut), ALen + BLen};
+join(A = {queue, _, _, _}, {pqueue, BPQ}) ->
{Pre, Post} =
lists:splitwith(fun ({P, _}) -> P < 0 orelse P == infinity end, BPQ),
Post1 = case Post of
@@ -162,7 +162,7 @@ join(A = {queue, _, _}, {pqueue, BPQ}) ->
_ -> [ {0, A} | Post ]
end,
{pqueue, Pre ++ Post1};
-join({pqueue, APQ}, B = {queue, _, _}) ->
+join({pqueue, APQ}, B = {queue, _, _, _}) ->
{Pre, Post} =
lists:splitwith(fun ({P, _}) -> P < 0 orelse P == infinity end, APQ),
Post1 = case Post of
@@ -185,10 +185,10 @@ merge([{PA, A}|As], Bs = [{PB, _}|_], Acc) when PA < PB orelse PA == infinity ->
merge(As = [{_, _}|_], [{PB, B}|Bs], Acc) ->
merge(As, Bs, [ {PB, B} | Acc ]).
-r2f([]) -> {queue, [], []};
-r2f([_] = R) -> {queue, [], R};
-r2f([X,Y]) -> {queue, [X], [Y]};
-r2f([X,Y|R]) -> {queue, [X,Y], lists:reverse(R, [])}.
+r2f([], 0) -> {queue, [], [], 0};
+r2f([_] = R, 1) -> {queue, [], R, 1};
+r2f([X,Y], 2) -> {queue, [X], [Y], 2};
+r2f([X,Y|R], L) -> {queue, [X,Y], lists:reverse(R, []), L}.
maybe_negate_priority(infinity) -> infinity;
maybe_negate_priority(P) -> -P.
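With the extra tuple element the plain queue now carries its own element count, so len/1 above becomes a constant-time field read rather than two length/1 traversals. A minimal usage sketch (illustrative only; values chosen arbitrarily):

%% Illustrative only: the fourth tuple element tracks the length.
Q0 = priority_queue:new(),               %% {queue, [], [], 0}
Q1 = priority_queue:in(a, Q0),           %% {queue, [a], [], 1}
Q2 = priority_queue:in(b, 7, Q1),        %% promoted to a {pqueue, ...}
2  = priority_queue:len(Q2),             %% sums the per-priority lengths
{{value, b}, Q3} = priority_queue:out(Q2),
1  = priority_queue:len(Q3).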
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 82ac74fa..8c00c85c 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -26,9 +26,9 @@
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
-export([force_event_refresh/0, wake_up/1]).
-export([consumers/1, consumers_all/1, consumer_info_keys/0]).
--export([basic_get/3, basic_consume/7, basic_cancel/4]).
--export([notify_sent/2, notify_sent_queue_down/1, unblock/2, flush_all/2]).
--export([notify_down_all/2, limit_all/3]).
+-export([basic_get/4, basic_consume/9, basic_cancel/4]).
+-export([notify_sent/2, notify_sent_queue_down/1, resume/2, flush_all/2]).
+-export([notify_down_all/2, activate_limit_all/2, credit/5]).
-export([on_node_down/1]).
-export([update/2, store_queue/1, policy_changed/2]).
-export([start_mirroring/1, stop_mirroring/1, sync_mirrors/1,
@@ -144,19 +144,20 @@
-spec(ack/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok').
-spec(notify_down_all/2 :: (qpids(), pid()) -> ok_or_errors()).
--spec(limit_all/3 :: (qpids(), pid(), rabbit_limiter:token()) ->
- ok_or_errors()).
--spec(basic_get/3 :: (rabbit_types:amqqueue(), pid(), boolean()) ->
+-spec(activate_limit_all/2 :: (qpids(), pid()) -> ok_or_errors()).
+-spec(basic_get/4 :: (rabbit_types:amqqueue(), pid(), boolean(), pid()) ->
{'ok', non_neg_integer(), qmsg()} | 'empty').
--spec(basic_consume/7 ::
- (rabbit_types:amqqueue(), boolean(), pid(),
- rabbit_limiter:token(), rabbit_types:ctag(), boolean(), any())
+-spec(credit/5 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(),
+ non_neg_integer(), boolean()) -> 'ok').
+-spec(basic_consume/9 ::
+ (rabbit_types:amqqueue(), boolean(), pid(), pid(), boolean(),
+ rabbit_types:ctag(), boolean(), {non_neg_integer(), boolean()} | 'none', any())
-> rabbit_types:ok_or_error('exclusive_consume_unavailable')).
-spec(basic_cancel/4 ::
(rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
-spec(notify_sent_queue_down/1 :: (pid()) -> 'ok').
--spec(unblock/2 :: (pid(), pid()) -> 'ok').
+-spec(resume/2 :: (pid(), pid()) -> 'ok').
-spec(flush_all/2 :: (qpids(), pid()) -> 'ok').
-spec(internal_delete/1 ::
(name()) -> rabbit_types:ok_or_error('not_found') |
@@ -538,16 +539,19 @@ notify_down_all(QPids, ChPid) ->
Bads1 -> {error, Bads1}
end.
-limit_all(QPids, ChPid, Limiter) ->
- delegate:cast(QPids, {limit, ChPid, Limiter}).
+activate_limit_all(QPids, ChPid) ->
+ delegate:cast(QPids, {activate_limit, ChPid}).
-basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) ->
- delegate:call(QPid, {basic_get, ChPid, NoAck}).
+credit(#amqqueue{pid = QPid}, ChPid, CTag, Credit, Drain) ->
+ delegate:cast(QPid, {credit, ChPid, CTag, Credit, Drain}).
-basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, Limiter,
- ConsumerTag, ExclusiveConsume, OkMsg) ->
- delegate:call(QPid, {basic_consume, NoAck, ChPid,
- Limiter, ConsumerTag, ExclusiveConsume, OkMsg}).
+basic_get(#amqqueue{pid = QPid}, ChPid, NoAck, LimiterPid) ->
+ delegate:call(QPid, {basic_get, ChPid, NoAck, LimiterPid}).
+
+basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, LimiterPid, LimiterActive,
+ ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg) ->
+ delegate:call(QPid, {basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
+ ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg}).
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
delegate:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
@@ -569,7 +573,7 @@ notify_sent_queue_down(QPid) ->
erase({consumer_credit_to, QPid}),
ok.
-unblock(QPid, ChPid) -> delegate:cast(QPid, {unblock, ChPid}).
+resume(QPid, ChPid) -> delegate:cast(QPid, {resume, ChPid}).
flush_all(QPids, ChPid) -> delegate:cast(QPids, {flush, ChPid}).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 18b641d4..b016c4d2 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -29,8 +29,8 @@
-export([init_with_backing_queue_state/7]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
- handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
- prioritise_cast/2, prioritise_info/2, format_message_queue/2]).
+ handle_info/2, handle_pre_hibernate/1, prioritise_call/4,
+ prioritise_cast/3, prioritise_info/3, format_message_queue/2]).
%% Queue's state
-record(q, {q,
@@ -66,9 +66,12 @@
monitor_ref,
acktags,
consumer_count,
+ %% Queue of {ChPid, #consumer{}} for consumers which have
+ %% been blocked for any reason
blocked_consumers,
+ %% The limiter itself
limiter,
- is_limit_active,
+ %% Internal flow control for queue -> writer
unsent_message_count}).
%%----------------------------------------------------------------------------
@@ -165,6 +168,8 @@ terminate(Reason, State = #q{q = #amqqueue{name = QName},
fun (BQS) ->
BQS1 = BQ:delete_and_terminate(Reason, BQS),
%% don't care if the internal delete doesn't return 'ok'.
+ rabbit_event:if_enabled(State, #q.stats_timer,
+ fun() -> emit_stats(State) end),
rabbit_amqqueue:internal_delete(QName),
BQS1
end, State).
@@ -362,17 +367,17 @@ lookup_ch(ChPid) ->
C -> C
end.
-ch_record(ChPid) ->
+ch_record(ChPid, LimiterPid) ->
Key = {ch, ChPid},
case get(Key) of
undefined -> MonitorRef = erlang:monitor(process, ChPid),
+ Limiter = rabbit_limiter:client(LimiterPid),
C = #cr{ch_pid = ChPid,
monitor_ref = MonitorRef,
acktags = queue:new(),
consumer_count = 0,
blocked_consumers = queue:new(),
- is_limit_active = false,
- limiter = rabbit_limiter:make_token(),
+ limiter = Limiter,
unsent_message_count = 0},
put(Key, C),
C;
@@ -392,37 +397,32 @@ store_ch_record(C = #cr{ch_pid = ChPid}) ->
put({ch, ChPid}, C),
ok.
-erase_ch_record(#cr{ch_pid = ChPid,
- limiter = Limiter,
- monitor_ref = MonitorRef}) ->
- ok = rabbit_limiter:unregister(Limiter, self()),
+erase_ch_record(#cr{ch_pid = ChPid, monitor_ref = MonitorRef}) ->
erlang:demonitor(MonitorRef),
erase({ch, ChPid}),
ok.
-update_consumer_count(C = #cr{consumer_count = 0, limiter = Limiter}, +1) ->
- ok = rabbit_limiter:register(Limiter, self()),
- update_ch_record(C#cr{consumer_count = 1});
-update_consumer_count(C = #cr{consumer_count = 1, limiter = Limiter}, -1) ->
- ok = rabbit_limiter:unregister(Limiter, self()),
- update_ch_record(C#cr{consumer_count = 0,
- limiter = rabbit_limiter:make_token()});
-update_consumer_count(C = #cr{consumer_count = Count}, Delta) ->
- update_ch_record(C#cr{consumer_count = Count + Delta}).
-
all_ch_record() -> [C || {{ch, _}, C} <- get()].
block_consumer(C = #cr{blocked_consumers = Blocked}, QEntry) ->
update_ch_record(C#cr{blocked_consumers = queue:in(QEntry, Blocked)}).
-is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) ->
- Limited orelse Count >= ?UNSENT_MESSAGE_LIMIT.
+is_ch_blocked(#cr{unsent_message_count = Count, limiter = Limiter}) ->
+ Count >= ?UNSENT_MESSAGE_LIMIT orelse rabbit_limiter:is_suspended(Limiter).
-ch_record_state_transition(OldCR, NewCR) ->
- case {is_ch_blocked(OldCR), is_ch_blocked(NewCR)} of
- {true, false} -> unblock;
- {false, true} -> block;
- {_, _} -> ok
+maybe_send_drained(WasEmpty, State) ->
+ case (not WasEmpty) andalso is_empty(State) of
+ true -> [send_drained(C) || C <- all_ch_record()];
+ false -> ok
+ end,
+ State.
+
+send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) ->
+ case rabbit_limiter:drained(Limiter) of
+ {[], Limiter} -> ok;
+ {CTagCredit, Limiter2} -> rabbit_channel:send_drained(
+ ChPid, CTagCredit),
+ update_ch_record(C#cr{limiter = Limiter2})
end.
deliver_msgs_to_consumers(_DeliverFun, true, State) ->
@@ -440,18 +440,21 @@ deliver_msgs_to_consumers(DeliverFun, false,
end.
deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, State) ->
- C = ch_record(ChPid),
+ C = lookup_ch(ChPid),
case is_ch_blocked(C) of
true -> block_consumer(C, E),
{false, State};
- false -> case rabbit_limiter:can_send(C#cr.limiter, self(),
- Consumer#consumer.ack_required) of
- false -> block_consumer(C#cr{is_limit_active = true}, E),
- {false, State};
- true -> AC1 = queue:in(E, State#q.active_consumers),
- deliver_msg_to_consumer(
- DeliverFun, Consumer, C,
- State#q{active_consumers = AC1})
+ false -> case rabbit_limiter:can_send(C#cr.limiter,
+ Consumer#consumer.ack_required,
+ Consumer#consumer.tag) of
+ {suspend, Limiter} ->
+ block_consumer(C#cr{limiter = Limiter}, E),
+ {false, State};
+ {continue, Limiter} ->
+ AC1 = queue:in(E, State#q.active_consumers),
+ deliver_msg_to_consumer(
+ DeliverFun, Consumer, C#cr{limiter = Limiter},
+ State#q{active_consumers = AC1})
end
end.
@@ -602,14 +605,16 @@ maybe_drop_head(State = #q{max_length = MaxLen,
requeue_and_run(AckTags, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
+ WasEmpty = BQ:is_empty(BQS),
{_MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
{_Dropped, State1} = maybe_drop_head(State#q{backing_queue_state = BQS1}),
- run_message_queue(drop_expired_msgs(State1)).
+ run_message_queue(maybe_send_drained(WasEmpty, drop_expired_msgs(State1))).
fetch(AckRequired, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
{Result, BQS1} = BQ:fetch(AckRequired, BQS),
- {Result, drop_expired_msgs(State#q{backing_queue_state = BQS1})}.
+ State1 = drop_expired_msgs(State#q{backing_queue_state = BQS1}),
+ {Result, maybe_send_drained(Result =:= empty, State1)}.
ack(AckTags, ChPid, State) ->
subtract_acks(ChPid, AckTags, State,
@@ -638,20 +643,29 @@ remove_consumers(ChPid, Queue, QName) ->
possibly_unblock(State, ChPid, Update) ->
case lookup_ch(ChPid) of
- not_found ->
+ not_found -> State;
+ C -> C1 = Update(C),
+ case is_ch_blocked(C) andalso not is_ch_blocked(C1) of
+ false -> update_ch_record(C1),
+ State;
+ true -> unblock(State, C1)
+ end
+ end.
+
+unblock(State, C = #cr{limiter = Limiter}) ->
+ case lists:partition(
+ fun({_ChPid, #consumer{tag = CTag}}) ->
+ rabbit_limiter:is_consumer_blocked(Limiter, CTag)
+ end, queue:to_list(C#cr.blocked_consumers)) of
+ {_, []} ->
+ update_ch_record(C),
State;
- C ->
- C1 = Update(C),
- case ch_record_state_transition(C, C1) of
- ok -> update_ch_record(C1),
- State;
- unblock -> #cr{blocked_consumers = Consumers} = C1,
- update_ch_record(
- C1#cr{blocked_consumers = queue:new()}),
- AC1 = queue:join(State#q.active_consumers,
- Consumers),
- run_message_queue(State#q{active_consumers = AC1})
- end
+ {Blocked, Unblocked} ->
+ BlockedQ = queue:from_list(Blocked),
+ UnblockedQ = queue:from_list(Unblocked),
+ update_ch_record(C#cr{blocked_consumers = BlockedQ}),
+ AC1 = queue:join(State#q.active_consumers, UnblockedQ),
+ run_message_queue(State#q{active_consumers = AC1})
end.
should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false;
@@ -748,6 +762,11 @@ calculate_msg_expiry(#basic_message{content = Content}, TTL) ->
T -> now_micros() + T * 1000
end.
+%% Logically this function should invoke maybe_send_drained/2.
+%% However, that is expensive. Since some frequent callers of
+%% drop_expired_msgs/1, in particular deliver_or_enqueue/3, cannot
+%% possibly cause the queue to become empty, we push the
+%% responsibility to the callers. So be cautious when adding new ones.
drop_expired_msgs(State) ->
case is_empty(State) of
true -> State;
@@ -1025,7 +1044,7 @@ emit_consumer_deleted(ChPid, ConsumerTag, QName) ->
%%----------------------------------------------------------------------------
-prioritise_call(Msg, _From, _State) ->
+prioritise_call(Msg, _From, _Len, _State) ->
case Msg of
info -> 9;
{info, _Items} -> 9;
@@ -1034,7 +1053,7 @@ prioritise_call(Msg, _From, _State) ->
_ -> 0
end.
-prioritise_cast(Msg, _State) ->
+prioritise_cast(Msg, _Len, _State) ->
case Msg of
delete_immediately -> 8;
{set_ram_duration_target, _Duration} -> 8;
@@ -1043,7 +1062,7 @@ prioritise_cast(Msg, _State) ->
_ -> 0
end.
-prioritise_info(Msg, #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
+prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
case Msg of
{'DOWN', _, process, DownPid, _} -> 8;
update_ram_duration -> 8;
@@ -1108,7 +1127,7 @@ handle_call({notify_down, ChPid}, From, State) ->
{stop, State1} -> stop(From, ok, State1)
end;
-handle_call({basic_get, ChPid, NoAck}, _From,
+handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From,
State = #q{q = #amqqueue{name = QName}}) ->
AckRequired = not NoAck,
State1 = ensure_expiry_timer(State),
@@ -1118,7 +1137,8 @@ handle_call({basic_get, ChPid, NoAck}, _From,
{{Message, IsDelivered, AckTag}, State2} ->
State3 = #q{backing_queue = BQ, backing_queue_state = BQS} =
case AckRequired of
- true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid),
+ true -> C = #cr{acktags = ChAckTags} =
+ ch_record(ChPid, LimiterPid),
ChAckTags1 = queue:in(AckTag, ChAckTags),
update_ch_record(C#cr{acktags = ChAckTags1}),
State2;
@@ -1128,15 +1148,30 @@ handle_call({basic_get, ChPid, NoAck}, _From,
reply({ok, BQ:len(BQS), Msg}, State3)
end;
-handle_call({basic_consume, NoAck, ChPid, Limiter,
- ConsumerTag, ExclusiveConsume, OkMsg},
+handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
+ ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg},
_From, State = #q{exclusive_consumer = Holder}) ->
case check_exclusive_access(Holder, ExclusiveConsume, State) of
in_use ->
reply({error, exclusive_consume_unavailable}, State);
ok ->
- C = ch_record(ChPid),
- update_consumer_count(C#cr{limiter = Limiter}, +1),
+ C = #cr{consumer_count = Count,
+ limiter = Limiter} = ch_record(ChPid, LimiterPid),
+ Limiter1 = case LimiterActive of
+ true -> rabbit_limiter:activate(Limiter);
+ false -> Limiter
+ end,
+ Limiter2 = case CreditArgs of
+ none -> Limiter1;
+ {Crd, Drain} -> rabbit_limiter:credit(
+ Limiter1, ConsumerTag, Crd, Drain)
+ end,
+ C1 = update_ch_record(C#cr{consumer_count = Count + 1,
+ limiter = Limiter2}),
+ case is_empty(State) of
+ true -> send_drained(C1);
+ false -> ok
+ end,
Consumer = #consumer{tag = ConsumerTag,
ack_required = not NoAck},
ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag};
@@ -1157,10 +1192,19 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From,
case lookup_ch(ChPid) of
not_found ->
reply(ok, State);
- C = #cr{blocked_consumers = Blocked} ->
+ C = #cr{consumer_count = Count,
+ limiter = Limiter,
+ blocked_consumers = Blocked} ->
emit_consumer_deleted(ChPid, ConsumerTag, qname(State)),
Blocked1 = remove_consumer(ChPid, ConsumerTag, Blocked),
- update_consumer_count(C#cr{blocked_consumers = Blocked1}, -1),
+ Limiter1 = case Count of
+ 1 -> rabbit_limiter:deactivate(Limiter);
+ _ -> Limiter
+ end,
+ Limiter2 = rabbit_limiter:forget_consumer(Limiter1, ConsumerTag),
+ update_ch_record(C#cr{consumer_count = Count - 1,
+ limiter = Limiter2,
+ blocked_consumers = Blocked1}),
State1 = State#q{
exclusive_consumer = case Holder of
{ChPid, ConsumerTag} -> none;
@@ -1193,7 +1237,8 @@ handle_call({delete, IfUnused, IfEmpty}, From,
handle_call(purge, _From, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
{Count, BQS1} = BQ:purge(BQS),
- reply({ok, Count}, State#q{backing_queue_state = BQS1});
+ State1 = State#q{backing_queue_state = BQS1},
+ reply({ok, Count}, maybe_send_drained(Count =:= 0, State1));
handle_call({requeue, AckTags, ChPid}, From, State) ->
gen_server2:reply(From, ok),
@@ -1288,10 +1333,12 @@ handle_cast({reject, AckTags, false, ChPid}, State) ->
handle_cast(delete_immediately, State) ->
stop(State);
-handle_cast({unblock, ChPid}, State) ->
+handle_cast({resume, ChPid}, State) ->
noreply(
possibly_unblock(State, ChPid,
- fun (C) -> C#cr{is_limit_active = false} end));
+ fun (C = #cr{limiter = Limiter}) ->
+ C#cr{limiter = rabbit_limiter:resume(Limiter)}
+ end));
handle_cast({notify_sent, ChPid, Credit}, State) ->
noreply(
@@ -1300,21 +1347,12 @@ handle_cast({notify_sent, ChPid, Credit}, State) ->
C#cr{unsent_message_count = Count - Credit}
end));
-handle_cast({limit, ChPid, Limiter}, State) ->
+handle_cast({activate_limit, ChPid}, State) ->
noreply(
- possibly_unblock(
- State, ChPid,
- fun (C = #cr{consumer_count = ConsumerCount,
- limiter = OldLimiter,
- is_limit_active = OldLimited}) ->
- case (ConsumerCount =/= 0 andalso
- not rabbit_limiter:is_enabled(OldLimiter)) of
- true -> ok = rabbit_limiter:register(Limiter, self());
- false -> ok
- end,
- Limited = OldLimited andalso rabbit_limiter:is_enabled(Limiter),
- C#cr{limiter = Limiter, is_limit_active = Limited}
- end));
+ possibly_unblock(State, ChPid,
+ fun (C = #cr{limiter = Limiter}) ->
+ C#cr{limiter = rabbit_limiter:activate(Limiter)}
+ end));
handle_cast({flush, ChPid}, State) ->
ok = rabbit_channel:flushed(ChPid, self()),
@@ -1346,6 +1384,24 @@ handle_cast(stop_mirroring, State = #q{backing_queue = BQ,
noreply(State#q{backing_queue = BQ1,
backing_queue_state = BQS1});
+handle_cast({credit, ChPid, CTag, Credit, Drain},
+ State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ Len = BQ:len(BQS),
+ rabbit_channel:send_credit_reply(ChPid, Len),
+ C = #cr{limiter = Limiter} = lookup_ch(ChPid),
+ C1 = C#cr{limiter = rabbit_limiter:credit(Limiter, CTag, Credit, Drain)},
+ noreply(case Drain andalso Len == 0 of
+ true -> update_ch_record(C1),
+ send_drained(C1),
+ State;
+ false -> case is_ch_blocked(C1) of
+ true -> update_ch_record(C1),
+ State;
+ false -> unblock(State, C1)
+ end
+ end);
+
handle_cast(wake_up, State) ->
noreply(State).
@@ -1365,7 +1421,9 @@ handle_info(maybe_expire, State) ->
end;
handle_info(drop_expired, State) ->
- noreply(drop_expired_msgs(State#q{ttl_timer_ref = undefined}));
+ WasEmpty = is_empty(State),
+ State1 = drop_expired_msgs(State#q{ttl_timer_ref = undefined}),
+ noreply(maybe_send_drained(WasEmpty, State1));
handle_info(emit_stats, State) ->
emit_stats(State),
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 792a06c9..52c6140e 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -21,14 +21,15 @@
-behaviour(gen_server2).
-export([start_link/11, do/2, do/3, do_flow/3, flush/1, shutdown/1]).
--export([send_command/2, deliver/4, flushed/2]).
+-export([send_command/2, deliver/4, send_credit_reply/2, send_drained/2,
+ flushed/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
-export([refresh_config_local/0, ready_for_close/1]).
-export([force_event_refresh/0]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
- handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
- prioritise_cast/2, prioritise_info/2, format_message_queue/2]).
+ handle_info/2, handle_pre_hibernate/1, prioritise_call/4,
+ prioritise_cast/3, prioritise_info/3, format_message_queue/2]).
%% Internal
-export([list_local/0]).
@@ -81,8 +82,8 @@
-spec(start_link/11 ::
(channel_number(), pid(), pid(), pid(), string(),
rabbit_types:protocol(), rabbit_types:user(), rabbit_types:vhost(),
- rabbit_framing:amqp_table(),
- pid(), rabbit_limiter:token()) -> rabbit_types:ok_pid_or_error()).
+ rabbit_framing:amqp_table(), pid(), pid()) ->
+ rabbit_types:ok_pid_or_error()).
-spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
-spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(),
rabbit_types:maybe(rabbit_types:content())) -> 'ok').
@@ -94,6 +95,9 @@
-spec(deliver/4 ::
(pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg())
-> 'ok').
+-spec(send_credit_reply/2 :: (pid(), non_neg_integer()) -> 'ok').
+-spec(send_drained/2 :: (pid(), [{rabbit_types:ctag(), non_neg_integer()}])
+ -> 'ok').
-spec(flushed/2 :: (pid(), pid()) -> 'ok').
-spec(list/0 :: () -> [pid()]).
-spec(list_local/0 :: () -> [pid()]).
@@ -138,6 +142,12 @@ send_command(Pid, Msg) ->
deliver(Pid, ConsumerTag, AckRequired, Msg) ->
gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}).
+send_credit_reply(Pid, Len) ->
+ gen_server2:cast(Pid, {send_credit_reply, Len}).
+
+send_drained(Pid, CTagCredit) ->
+ gen_server2:cast(Pid, {send_drained, CTagCredit}).
+
flushed(Pid, QPid) ->
gen_server2:cast(Pid, {flushed, QPid}).
@@ -180,7 +190,7 @@ force_event_refresh() ->
%%---------------------------------------------------------------------------
init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
- Capabilities, CollectorPid, Limiter]) ->
+ Capabilities, CollectorPid, LimiterPid]) ->
process_flag(trap_exit, true),
ok = pg_local:join(rabbit_channels, self()),
State = #ch{state = starting,
@@ -190,7 +200,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
writer_pid = WriterPid,
conn_pid = ConnPid,
conn_name = ConnName,
- limiter = Limiter,
+ limiter = rabbit_limiter:new(LimiterPid),
tx = none,
next_tag = 1,
unacked_message_q = queue:new(),
@@ -217,20 +227,20 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
{ok, State1, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-prioritise_call(Msg, _From, _State) ->
+prioritise_call(Msg, _From, _Len, _State) ->
case Msg of
info -> 9;
{info, _Items} -> 9;
_ -> 0
end.
-prioritise_cast(Msg, _State) ->
+prioritise_cast(Msg, _Len, _State) ->
case Msg of
{confirm, _MsgSeqNos, _QPid} -> 5;
_ -> 0
end.
-prioritise_info(Msg, _State) ->
+prioritise_info(Msg, _Len, _State) ->
case Msg of
emit_stats -> 7;
_ -> 0
@@ -315,6 +325,18 @@ handle_cast({deliver, ConsumerTag, AckRequired,
Content),
noreply(record_sent(ConsumerTag, AckRequired, Msg, State));
+handle_cast({send_credit_reply, Len}, State = #ch{writer_pid = WriterPid}) ->
+ ok = rabbit_writer:send_command(
+ WriterPid, #'basic.credit_ok'{available = Len}),
+ noreply(State);
+
+handle_cast({send_drained, CTagCredit}, State = #ch{writer_pid = WriterPid}) ->
+ [ok = rabbit_writer:send_command(
+ WriterPid, #'basic.credit_drained'{consumer_tag = ConsumerTag,
+ credit_drained = CreditDrained})
+ || {ConsumerTag, CreditDrained} <- CTagCredit],
+ noreply(State);
+
handle_cast(force_event_refresh, State) ->
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
noreply(State);
@@ -372,6 +394,8 @@ terminate(Reason, State) ->
_ -> ok
end,
pg_local:leave(rabbit_channels, self()),
+ rabbit_event:if_enabled(State, #ch.stats_timer,
+ fun() -> emit_stats(State) end),
rabbit_event:notify(channel_closed, [{pid, self()}]).
code_change(_OldVsn, State, _Extra) ->
@@ -676,12 +700,15 @@ handle_method(#'basic.get'{queue = QueueNameBin,
no_ack = NoAck},
_, State = #ch{writer_pid = WriterPid,
conn_pid = ConnPid,
+ limiter = Limiter,
next_tag = DeliveryTag}) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_read_permitted(QueueName, State),
case rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ConnPid,
- fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of
+ fun (Q) -> rabbit_amqqueue:basic_get(
+ Q, self(), NoAck, rabbit_limiter:pid(Limiter))
+ end) of
{ok, MessageCount,
Msg = {QName, QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
@@ -706,7 +733,8 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
no_local = _, % FIXME: implement
no_ack = NoAck,
exclusive = ExclusiveConsume,
- nowait = NoWait},
+ nowait = NoWait,
+ arguments = Arguments},
_, State = #ch{conn_pid = ConnPid,
limiter = Limiter,
consumer_mapping = ConsumerMapping}) ->
@@ -728,8 +756,11 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
QueueName, ConnPid,
fun (Q) ->
{rabbit_amqqueue:basic_consume(
- Q, NoAck, self(), Limiter,
+ Q, NoAck, self(),
+ rabbit_limiter:pid(Limiter),
+ rabbit_limiter:is_active(Limiter),
ActualConsumerTag, ExclusiveConsume,
+ parse_credit_args(Arguments),
ok_msg(NoWait, #'basic.consume_ok'{
consumer_tag = ActualConsumerTag})),
Q}
@@ -803,19 +834,17 @@ handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 ->
rabbit_misc:protocol_error(not_implemented,
"prefetch_size!=0 (~w)", [Size]);
-handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, _,
+handle_method(#'basic.qos'{prefetch_count = 0}, _,
State = #ch{limiter = Limiter}) ->
- Limiter1 = case {rabbit_limiter:is_enabled(Limiter), PrefetchCount} of
- {false, 0} -> Limiter;
- {false, _} -> enable_limiter(State);
- {_, _} -> Limiter
- end,
- Limiter3 = case rabbit_limiter:limit(Limiter1, PrefetchCount) of
- ok -> Limiter1;
- {disabled, Limiter2} -> ok = limit_queues(Limiter2, State),
- Limiter2
- end,
- {reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter3}};
+ Limiter1 = rabbit_limiter:unlimit_prefetch(Limiter),
+ {reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter1}};
+
+handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, _,
+ State = #ch{limiter = Limiter, unacked_message_q = UAMQ}) ->
+ Limiter1 = rabbit_limiter:limit_prefetch(Limiter,
+ PrefetchCount, queue:len(UAMQ)),
+ {reply, #'basic.qos_ok'{},
+ maybe_limit_queues(Limiter, Limiter1, State#ch{limiter = Limiter1})};
handle_method(#'basic.recover_async'{requeue = true},
_, State = #ch{unacked_message_q = UAMQ,
@@ -1078,25 +1107,45 @@ handle_method(#'confirm.select'{nowait = NoWait}, _, State) ->
handle_method(#'channel.flow'{active = true}, _,
State = #ch{limiter = Limiter}) ->
- Limiter2 = case rabbit_limiter:unblock(Limiter) of
- ok -> Limiter;
- {disabled, Limiter1} -> ok = limit_queues(Limiter1, State),
- Limiter1
- end,
- {reply, #'channel.flow_ok'{active = true}, State#ch{limiter = Limiter2}};
+ Limiter1 = rabbit_limiter:unblock(Limiter),
+ {reply, #'channel.flow_ok'{active = true},
+ maybe_limit_queues(Limiter, Limiter1, State#ch{limiter = Limiter1})};
handle_method(#'channel.flow'{active = false}, _,
State = #ch{consumer_mapping = Consumers,
limiter = Limiter}) ->
- Limiter1 = case rabbit_limiter:is_enabled(Limiter) of
- true -> Limiter;
- false -> enable_limiter(State)
- end,
- State1 = State#ch{limiter = Limiter1},
- ok = rabbit_limiter:block(Limiter1),
- QPids = consumer_queues(Consumers),
- ok = rabbit_amqqueue:flush_all(QPids, self()),
- {noreply, maybe_send_flow_ok(State1#ch{blocking = sets:from_list(QPids)})};
+ case rabbit_limiter:is_blocked(Limiter) of
+ true -> {noreply, maybe_send_flow_ok(State)};
+ false -> Limiter1 = rabbit_limiter:block(Limiter),
+ State1 = maybe_limit_queues(Limiter, Limiter1,
+ State#ch{limiter = Limiter1}),
+ %% The semantics of channel.flow{active=false}
+ %% require that no messages are delivered after the
+ %% channel.flow_ok has been sent. We accomplish that
+ %% by "flushing" all messages in flight from the
+ %% consumer queues to us. To do this we tell all the
+ %% queues to invoke rabbit_channel:flushed/2, which
+ %% will send us a {flushed, ...} message that appears
+ %% *after* all the {deliver, ...} messages. We keep
+ %% track of all the QPids thus asked, and once all of
+ %% them have responded (or died) we send the
+ %% channel.flow_ok.
+ QPids = consumer_queues(Consumers),
+ ok = rabbit_amqqueue:flush_all(QPids, self()),
+ {noreply, maybe_send_flow_ok(
+ State1#ch{blocking = sets:from_list(QPids)})}
+ end;
+
+handle_method(#'basic.credit'{consumer_tag = CTag,
+ credit = Credit,
+ drain = Drain}, _,
+ State = #ch{consumer_mapping = Consumers}) ->
+ case dict:find(CTag, Consumers) of
+ {ok, Q} -> ok = rabbit_amqqueue:credit(
+ Q, self(), CTag, Credit, Drain),
+ {noreply, State};
+ error -> precondition_failed("unknown consumer tag '~s'", [CTag])
+ end;
handle_method(_MethodRecord, _Content, _State) ->
rabbit_misc:protocol_error(
@@ -1164,6 +1213,16 @@ handle_consuming_queue_down(QPid,
handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) ->
State#ch{delivering_queues = sets:del_element(QPid, DQ)}.
+parse_credit_args(Arguments) ->
+ case rabbit_misc:table_lookup(Arguments, <<"x-credit">>) of
+ {table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>),
+ rabbit_misc:table_lookup(T, <<"drain">>)} of
+ {{long, Credit}, {boolean, Drain}} -> {Credit, Drain};
+ _ -> none
+ end;
+ undefined -> none
+ end.
+
binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
RoutingKey, Arguments, ReturnMethod, NoWait,
State = #ch{virtual_host = VHostPath,
@@ -1332,14 +1391,14 @@ foreach_per_queue(F, UAL) ->
end, gb_trees:empty(), UAL),
rabbit_misc:gb_trees_foreach(F, T).
-enable_limiter(State = #ch{unacked_message_q = UAMQ,
- limiter = Limiter}) ->
- Limiter1 = rabbit_limiter:enable(Limiter, queue:len(UAMQ)),
- ok = limit_queues(Limiter1, State),
- Limiter1.
-
-limit_queues(Limiter, #ch{consumer_mapping = Consumers}) ->
- rabbit_amqqueue:limit_all(consumer_queues(Consumers), self(), Limiter).
+maybe_limit_queues(OldLimiter, NewLimiter, State) ->
+ case ((not rabbit_limiter:is_active(OldLimiter)) andalso
+ rabbit_limiter:is_active(NewLimiter)) of
+ true -> Queues = consumer_queues(State#ch.consumer_mapping),
+ rabbit_amqqueue:activate_limit_all(Queues, self());
+ false -> ok
+ end,
+ State.
consumer_queues(Consumers) ->
lists:usort([QPid ||
@@ -1350,7 +1409,9 @@ consumer_queues(Consumers) ->
%% messages sent in a response to a basic.get (identified by their
%% 'none' consumer tag)
notify_limiter(Limiter, Acked) ->
- case rabbit_limiter:is_enabled(Limiter) of
+ %% optimisation: avoid the potentially expensive 'foldl' in the
+ %% common case.
+ case rabbit_limiter:is_prefetch_limited(Limiter) of
false -> ok;
true -> case lists:foldl(fun ({_, none, _}, Acc) -> Acc;
({_, _, _}, Acc) -> Acc + 1
@@ -1515,7 +1576,7 @@ i(messages_uncommitted, #ch{}) -> 0;
i(acks_uncommitted, #ch{tx = {_Msgs, Acks}}) -> ack_len(Acks);
i(acks_uncommitted, #ch{}) -> 0;
i(prefetch_count, #ch{limiter = Limiter}) ->
- rabbit_limiter:get_limit(Limiter);
+ rabbit_limiter:get_prefetch_limit(Limiter);
i(client_flow_blocked, #ch{limiter = Limiter}) ->
rabbit_limiter:is_blocked(Limiter);
i(Item, _) ->
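parse_credit_args/1 above reads consumer credit settings from the basic.consume arguments table. As a sketch of the shape it matches on (the key names come from the code above; the type tags simply mirror the {long, Credit} / {boolean, Drain} pattern in parse_credit_args/1, and the numeric value is an arbitrary example):

%% Illustrative only: supplied via the 'arguments' field of basic.consume.
Arguments = [{<<"x-credit">>, table, [{<<"credit">>, long,    10},
                                      {<<"drain">>,  boolean, false}]}].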
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
index 8ea44a81..a0c7624b 100644
--- a/src/rabbit_channel_sup.erl
+++ b/src/rabbit_channel_sup.erl
@@ -58,7 +58,7 @@ start_link({tcp, Sock, Channel, FrameMax, ReaderPid, ConnName, Protocol, User,
{channel, {rabbit_channel, start_link,
[Channel, ReaderPid, WriterPid, ReaderPid, ConnName,
Protocol, User, VHost, Capabilities, Collector,
- rabbit_limiter:make_token(LimiterPid)]},
+ LimiterPid]},
intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}),
{ok, AState} = rabbit_command_assembler:init(Protocol),
{ok, SupPid, {ChannelPid, AState}};
@@ -72,7 +72,7 @@ start_link({direct, Channel, ClientChannelPid, ConnPid, ConnName, Protocol,
{channel, {rabbit_channel, start_link,
[Channel, ClientChannelPid, ClientChannelPid, ConnPid,
ConnName, Protocol, User, VHost, Capabilities, Collector,
- rabbit_limiter:make_token(LimiterPid)]},
+ LimiterPid]},
intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}),
{ok, SupPid, {ChannelPid, none}}.
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 5f4fb9ec..9e98448d 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -326,34 +326,34 @@ route(#exchange{name = #resource{virtual_host = VHost,
%% Optimisation
[rabbit_misc:r(VHost, queue, RK) || RK <- lists:usort(RKs)];
{Decorators, _} ->
- QNames = route1(Delivery, {[X], XName, []}),
- lists:usort(decorate_route(Decorators, X, Delivery, QNames))
+ lists:usort(route1(Delivery, Decorators, {[X], XName, []}))
end.
-decorate_route([], _X, _Delivery, QNames) ->
+route1(_, _, {[], _, QNames}) ->
QNames;
-decorate_route(Decorators, X, Delivery, QNames) ->
- QNames ++
- lists:append([Decorator:route(X, Delivery) || Decorator <- Decorators]).
-
-route1(_, {[], _, QNames}) ->
- QNames;
-route1(Delivery, {[X = #exchange{type = Type} | WorkList], SeenXs, QNames}) ->
- DstNames = process_alternate(
- X, ((type_to_module(Type)):route(X, Delivery))),
- route1(Delivery,
+route1(Delivery, Decorators,
+ {[X = #exchange{type = Type} | WorkList], SeenXs, QNames}) ->
+ ExchangeDests = (type_to_module(Type)):route(X, Delivery),
+ DecorateDests = process_decorators(X, Decorators, Delivery),
+ AlternateDests = process_alternate(X, ExchangeDests),
+ route1(Delivery, Decorators,
lists:foldl(fun process_route/2, {WorkList, SeenXs, QNames},
- DstNames)).
+ AlternateDests ++ DecorateDests ++ ExchangeDests)).
-process_alternate(#exchange{arguments = []}, Results) -> %% optimisation
- Results;
+process_alternate(#exchange{arguments = []}, _Results) -> %% optimisation
+ [];
process_alternate(#exchange{name = XName, arguments = Args}, []) ->
case rabbit_misc:r_arg(XName, exchange, Args, <<"alternate-exchange">>) of
undefined -> [];
AName -> [AName]
end;
-process_alternate(_X, Results) ->
- Results.
+process_alternate(_X, _Results) ->
+ [].
+
+process_decorators(_, [], _) -> %% optimisation
+ [];
+process_decorators(X, Decorators, Delivery) ->
+ lists:append([Decorator:route(X, Delivery) || Decorator <- Decorators]).
process_route(#resource{kind = exchange} = XName,
{_WorkList, XName, _QNames} = Acc) ->
diff --git a/src/rabbit_exchange_decorator.erl b/src/rabbit_exchange_decorator.erl
index 8f17adfc..040b55db 100644
--- a/src/rabbit_exchange_decorator.erl
+++ b/src/rabbit_exchange_decorator.erl
@@ -57,12 +57,10 @@
-callback remove_bindings(serial(), rabbit_types:exchange(),
[rabbit_types:binding()]) -> 'ok'.
-%% called after exchange routing
-%% return value is a list of queues to be added to the list of
-%% destination queues. decorators must register separately for
-%% this callback using exchange_decorator_route.
--callback route(rabbit_types:exchange(), rabbit_types:delivery()) ->
- [rabbit_amqqueue:name()].
+%% Decorators can optionally implement route/2 which allows additional
+%% destinations to be added to the routing decision.
+%% -callback route(rabbit_types:exchange(), rabbit_types:delivery()) ->
+%% [rabbit_amqqueue:name() | rabbit_exchange:name()].
-else.
@@ -70,7 +68,7 @@
behaviour_info(callbacks) ->
[{description, 0}, {serialise_events, 1}, {create, 2}, {delete, 3},
- {policy_changed, 2}, {add_binding, 3}, {remove_bindings, 3}, {route, 2}];
+ {policy_changed, 2}, {add_binding, 3}, {remove_bindings, 3}];
behaviour_info(_Other) ->
undefined.
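Since route/2 is now optional, a decorator that wants to add destinations implements it alongside the required callbacks. A hypothetical no-op decorator sketch (module name invented; callback names and arities taken from behaviour_info/1 above):

-module(example_exchange_decorator).
-behaviour(rabbit_exchange_decorator).

-export([description/0, serialise_events/1, create/2, delete/3,
         policy_changed/2, add_binding/3, remove_bindings/3, route/2]).

description()                   -> [{description, <<"example decorator">>}].
serialise_events(_X)            -> false.
create(_Serial, _X)             -> ok.
delete(_Serial, _X, _Bindings)  -> ok.
policy_changed(_X1, _X2)        -> ok.
add_binding(_Serial, _X, _B)    -> ok.
remove_bindings(_Serial, _X, _Bs) -> ok.

%% Optional: return extra queue or exchange names to add to the routing
%% decision; an empty list adds nothing.
route(_X, _Delivery) -> [].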
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 8a7d14fe..d9f1170e 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -14,42 +14,165 @@
%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
+%% The purpose of the limiter is to stem the flow of messages from
+%% queues to channels, in order to act upon various protocol-level
+%% flow control mechanisms, specifically AMQP 0-9-1's basic.qos
+%% prefetch_count and channel.flow, and AMQP 1.0's link (aka consumer)
+%% credit mechanism.
+%%
+%% Each channel has an associated limiter process, created with
+%% start_link/1, which it passes to queues on consumer creation with
+%% rabbit_amqqueue:basic_consume/9, and rabbit_amqqueue:basic_get/4.
+%% The latter isn't strictly necessary, since basic.get is not
+%% subject to limiting, but it means that whenever a queue knows about
+%% a channel, it also knows about its limiter, which is less fiddly.
+%%
+%% The limiter process holds state that is, in effect, shared between
+%% the channel and all queues from which the channel is
+%% consuming. Essentially all these queues are competing for access to
+%% a single, limited resource - the ability to deliver messages via
+%% the channel - and it is the job of the limiter process to mediate
+%% that access.
+%%
+%% The limiter process is separate from the channel process for two
+%% reasons: separation of concerns, and efficiency. Channels can get
+%% very busy, particularly if they are also dealing with publishes.
+%% With a separate limiter process all the aforementioned access
+%% mediation can take place without touching the channel.
+%%
+%% For efficiency, both the channel and the queues keep some local
+%% state, initialised from the limiter pid with new/1 and client/1,
+%% respectively. In particular this allows them to avoid any
+%% interaction with the limiter process when it is 'inactive', i.e. no
+%% protocol-level flow control is taking place.
+%%
+%% This optimisation does come at the cost of some complexity though:
+%% when a limiter becomes active, the channel needs to inform all its
+%% consumer queues of this change in status. It does this by invoking
+%% rabbit_amqqueue:activate_limit_all/2. Note that there is no inverse
+%% transition, i.e. once a queue has been told about an active
+%% limiter, it is not subsequently told when that limiter becomes
+%% inactive. In practice it is rare for that to happen, though we
+%% could optimise this case in the future.
+%%
+%% In addition, the consumer credit bookkeeping is local to queues, so
+%% it is not necessary to store information about it in the limiter
+%% process. But for abstraction we hide it from the queue behind the
+%% limiter API, and it therefore becomes part of the queue local
+%% state.
+%%
+%% The interactions with the limiter are as follows:
+%%
+%% 1. Channels tell the limiter about basic.qos prefetch counts -
+%% that's what the limit_prefetch/3, unlimit_prefetch/1,
+%% is_prefetch_limited/1, get_prefetch_limit/1 API functions are
+%% about - and channel.flow blocking - that's what block/1,
+%% unblock/1 and is_blocked/1 are for. They also tell the limiter
+%% queue state (via the queue) about consumer credit changes -
+%% that's what credit/4 is for.
+%%
+%% 2. Queues also tell the limiter queue state about the queue
+%% becoming empty (via drained/1) and consumers leaving (via
+%% forget_consumer/2).
+%%
+%% 3. Queues register with the limiter - this happens as part of
+%% activate/1.
+%%
+%% 4. The limiter process maintains an internal counter of 'messages
+%% sent but not yet acknowledged', called the 'volume'.
+%%
+%% 5. Queues ask the limiter for permission (with can_send/3) whenever
+%% they want to deliver a message to a channel. The limiter checks
+%% whether a) the channel isn't blocked by channel.flow, b) the
+%% volume has not yet reached the prefetch limit, and c) whether
+%% the consumer has enough credit. If so it increments the volume
+%% and tells the queue to proceed. Otherwise it marks the queue as
+%% requiring notification (see below) and tells the queue not to
+%% proceed.
+%%
+%% 6. A queue that has been told to proceed (by the return value of
+%% can_send/3) sends the message to the channel. Conversely, a
+%% queue that has been told not to proceed, will not attempt to
+%% deliver that message, or any future messages, to the
+%% channel. This is accomplished by can_send/3 capturing the
+%% outcome in the local state, where it can be accessed with
+%% is_suspended/1.
+%%
+%% 7. When a channel receives an ack it tells the limiter (via ack/2)
+%% how many messages were ack'ed. The limiter process decrements
+%% the volume and if it falls below the prefetch_count then it
+%% notifies (through rabbit_amqqueue:resume/2) all the queues
+%% requiring notification, i.e. all those that had a can_send/3
+%% request denied.
+%%
+%% 8. Upon receipt of such a notification, queues resume delivery to
+%% the channel, i.e. they will once again start asking limiter, as
+%% described in (5).
+%%
+%% 9. When a queue has no more consumers associated with a particular
+%% channel, it deactivates use of the limiter with deactivate/1,
+%% which alters the local state such that no further interactions
+%% with the limiter process take place until a subsequent
+%% activate/1.
+
-module(rabbit_limiter).
-behaviour(gen_server2).
+-export([start_link/0]).
+%% channel API
+-export([new/1, limit_prefetch/3, unlimit_prefetch/1, block/1, unblock/1,
+ is_prefetch_limited/1, is_blocked/1, is_active/1,
+ get_prefetch_limit/1, ack/2, pid/1]).
+%% queue API
+-export([client/1, activate/1, can_send/3, resume/1, deactivate/1,
+ is_suspended/1, is_consumer_blocked/2, credit/4, drained/1,
+ forget_consumer/2]).
+%% callbacks
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
- handle_info/2, prioritise_call/3]).
--export([start_link/0, make_token/0, make_token/1, is_enabled/1, enable/2,
- disable/1]).
--export([limit/2, can_send/3, ack/2, register/2, unregister/2]).
--export([get_limit/1, block/1, unblock/1, is_blocked/1]).
+ handle_info/2, prioritise_call/4]).
%%----------------------------------------------------------------------------
--record(token, {pid, enabled}).
+-record(lstate, {pid, prefetch_limited, blocked}).
+-record(qstate, {pid, state, credits}).
-ifdef(use_specs).
--export_type([token/0]).
-
--opaque(token() :: #token{}).
+-type(lstate() :: #lstate{pid :: pid(),
+ prefetch_limited :: boolean(),
+ blocked :: boolean()}).
+-type(qstate() :: #qstate{pid :: pid(),
+ state :: 'dormant' | 'active' | 'suspended'}).
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
--spec(make_token/0 :: () -> token()).
--spec(make_token/1 :: ('undefined' | pid()) -> token()).
--spec(is_enabled/1 :: (token()) -> boolean()).
--spec(enable/2 :: (token(), non_neg_integer()) -> token()).
--spec(disable/1 :: (token()) -> token()).
--spec(limit/2 :: (token(), non_neg_integer()) -> 'ok' | {'disabled', token()}).
--spec(can_send/3 :: (token(), pid(), boolean()) -> boolean()).
--spec(ack/2 :: (token(), non_neg_integer()) -> 'ok').
--spec(register/2 :: (token(), pid()) -> 'ok').
--spec(unregister/2 :: (token(), pid()) -> 'ok').
--spec(get_limit/1 :: (token()) -> non_neg_integer()).
--spec(block/1 :: (token()) -> 'ok').
--spec(unblock/1 :: (token()) -> 'ok' | {'disabled', token()}).
--spec(is_blocked/1 :: (token()) -> boolean()).
+-spec(new/1 :: (pid()) -> lstate()).
+
+-spec(limit_prefetch/3 :: (lstate(), non_neg_integer(), non_neg_integer())
+ -> lstate()).
+-spec(unlimit_prefetch/1 :: (lstate()) -> lstate()).
+-spec(block/1 :: (lstate()) -> lstate()).
+-spec(unblock/1 :: (lstate()) -> lstate()).
+-spec(is_prefetch_limited/1 :: (lstate()) -> boolean()).
+-spec(is_blocked/1 :: (lstate()) -> boolean()).
+-spec(is_active/1 :: (lstate()) -> boolean()).
+-spec(get_prefetch_limit/1 :: (lstate()) -> non_neg_integer()).
+-spec(ack/2 :: (lstate(), non_neg_integer()) -> 'ok').
+-spec(pid/1 :: (lstate()) -> pid()).
+
+-spec(client/1 :: (pid()) -> qstate()).
+-spec(activate/1 :: (qstate()) -> qstate()).
+-spec(can_send/3 :: (qstate(), boolean(), rabbit_types:ctag()) ->
+ {'continue' | 'suspend', qstate()}).
+-spec(resume/1 :: (qstate()) -> qstate()).
+-spec(deactivate/1 :: (qstate()) -> qstate()).
+-spec(is_suspended/1 :: (qstate()) -> boolean()).
+-spec(is_consumer_blocked/2 :: (qstate(), rabbit_types:ctag()) -> boolean()).
+-spec(credit/4 :: (qstate(), rabbit_types:ctag(), non_neg_integer(), boolean())
+ -> qstate()).
+-spec(drained/1 :: (qstate())
+ -> {[{rabbit_types:ctag(), non_neg_integer()}], qstate()}).
+-spec(forget_consumer/2 :: (qstate(), rabbit_types:ctag()) -> qstate()).
-endif.
@@ -64,120 +187,181 @@
%% notified of a change in the limit or volume that may allow it to
%% deliver more messages via the limiter's channel.
+-record(credit, {credit = 0, drain = false}).
+
%%----------------------------------------------------------------------------
%% API
%%----------------------------------------------------------------------------
start_link() -> gen_server2:start_link(?MODULE, [], []).
-make_token() -> make_token(undefined).
-make_token(Pid) -> #token{pid = Pid, enabled = false}.
+new(Pid) ->
+ %% this is a 'call' to ensure that it is invoked at most once.
+ ok = gen_server:call(Pid, {new, self()}),
+ #lstate{pid = Pid, prefetch_limited = false, blocked = false}.
-is_enabled(#token{enabled = Enabled}) -> Enabled.
+limit_prefetch(L, PrefetchCount, UnackedCount) when PrefetchCount > 0 ->
+ ok = gen_server:call(L#lstate.pid,
+ {limit_prefetch, PrefetchCount, UnackedCount}),
+ L#lstate{prefetch_limited = true}.
-enable(#token{pid = Pid} = Token, Volume) ->
- gen_server2:call(Pid, {enable, Token, self(), Volume}, infinity).
+unlimit_prefetch(L) ->
+ ok = gen_server:call(L#lstate.pid, unlimit_prefetch),
+ L#lstate{prefetch_limited = false}.
-disable(#token{pid = Pid} = Token) ->
- gen_server2:call(Pid, {disable, Token}, infinity).
+block(L) ->
+ ok = gen_server:call(L#lstate.pid, block),
+ L#lstate{blocked = true}.
-limit(Limiter, PrefetchCount) ->
- maybe_call(Limiter, {limit, PrefetchCount, Limiter}, ok).
+unblock(L) ->
+ ok = gen_server:call(L#lstate.pid, unblock),
+ L#lstate{blocked = false}.
-%% Ask the limiter whether the queue can deliver a message without
-%% breaching a limit. Note that we don't use maybe_call here in order
-%% to avoid always going through with_exit_handler/2, even when the
-%% limiter is disabled.
-can_send(#token{pid = Pid, enabled = true}, QPid, AckRequired) ->
- rabbit_misc:with_exit_handler(
- fun () -> true end,
- fun () ->
- gen_server2:call(Pid, {can_send, QPid, AckRequired}, infinity)
- end);
-can_send(_, _, _) ->
- true.
+is_prefetch_limited(#lstate{prefetch_limited = Limited}) -> Limited.
+
+is_blocked(#lstate{blocked = Blocked}) -> Blocked.
+
+is_active(L) -> is_prefetch_limited(L) orelse is_blocked(L).
+
+get_prefetch_limit(#lstate{prefetch_limited = false}) -> 0;
+get_prefetch_limit(L) -> gen_server:call(L#lstate.pid, get_prefetch_limit).
-%% Let the limiter know that the channel has received some acks from a
-%% consumer
-ack(Limiter, Count) -> maybe_cast(Limiter, {ack, Count}).
+ack(#lstate{prefetch_limited = false}, _AckCount) -> ok;
+ack(L, AckCount) -> gen_server:cast(L#lstate.pid, {ack, AckCount}).
-register(Limiter, QPid) -> maybe_cast(Limiter, {register, QPid}).
+pid(#lstate{pid = Pid}) -> Pid.
-unregister(Limiter, QPid) -> maybe_cast(Limiter, {unregister, QPid}).
+client(Pid) -> #qstate{pid = Pid, state = dormant, credits = gb_trees:empty()}.
-get_limit(Limiter) ->
+activate(L = #qstate{state = dormant}) ->
+ ok = gen_server:cast(L#qstate.pid, {register, self()}),
+ L#qstate{state = active};
+activate(L) -> L.
+
+can_send(L = #qstate{pid = Pid, state = State, credits = Credits},
+ AckRequired, CTag) ->
+ case is_consumer_blocked(L, CTag) of
+ false -> case (State =/= active orelse
+ safe_call(Pid, {can_send, self(), AckRequired}, true)) of
+ true -> {continue, L#qstate{
+ credits = record_send_q(CTag, Credits)}};
+ false -> {suspend, L#qstate{state = suspended}}
+ end;
+ true -> {suspend, L}
+ end.
+
+safe_call(Pid, Msg, ExitValue) ->
rabbit_misc:with_exit_handler(
- fun () -> 0 end,
- fun () -> maybe_call(Limiter, get_limit, 0) end).
+ fun () -> ExitValue end,
+ fun () -> gen_server2:call(Pid, Msg, infinity) end).
+
+resume(L) -> L#qstate{state = active}.
-block(Limiter) ->
- maybe_call(Limiter, block, ok).
+deactivate(L = #qstate{state = dormant}) -> L;
+deactivate(L) ->
+ ok = gen_server:cast(L#qstate.pid, {unregister, self()}),
+ L#qstate{state = dormant}.
+
+is_suspended(#qstate{state = suspended}) -> true;
+is_suspended(#qstate{}) -> false.
+
+is_consumer_blocked(#qstate{credits = Credits}, CTag) ->
+ case gb_trees:lookup(CTag, Credits) of
+ {value, #credit{credit = C}} when C > 0 -> false;
+ {value, #credit{}} -> true;
+ none -> false
+ end.
-unblock(Limiter) ->
- maybe_call(Limiter, {unblock, Limiter}, ok).
+credit(Limiter = #qstate{credits = Credits}, CTag, Credit, Drain) ->
+ Limiter#qstate{credits = update_credit(CTag, Credit, Drain, Credits)}.
-is_blocked(Limiter) ->
- maybe_call(Limiter, is_blocked, false).
+drained(Limiter = #qstate{credits = Credits}) ->
+ {CTagCredits, Credits2} =
+ rabbit_misc:gb_trees_fold(
+ fun (CTag, #credit{credit = C, drain = true}, {Acc, Creds0}) ->
+ {[{CTag, C} | Acc], update_credit(CTag, 0, false, Creds0)};
+ (_CTag, #credit{credit = _C, drain = false}, {Acc, Creds0}) ->
+ {Acc, Creds0}
+ end, {[], Credits}, Credits),
+ {CTagCredits, Limiter#qstate{credits = Credits2}}.
+
+forget_consumer(Limiter = #qstate{credits = Credits}, CTag) ->
+ Limiter#qstate{credits = gb_trees:delete_any(CTag, Credits)}.
+
+%%----------------------------------------------------------------------------
+%% Queue-local code
+%%----------------------------------------------------------------------------
+
+%% We want to do all the AMQP 1.0-ish link-level credit calculations
+%% in the queue (doing them elsewhere introduces a ton of races).
+%% However, it's a big chunk of code that conceptually belongs with
+%% the limiter. So we get the queue to hold a bit of state for us
+%% (#qstate.credits), and maintain the fiction that the limiter is
+%% making the decisions...
+
+record_send_q(CTag, Credits) ->
+ case gb_trees:lookup(CTag, Credits) of
+ {value, #credit{credit = Credit, drain = Drain}} ->
+ update_credit(CTag, Credit - 1, Drain, Credits);
+ none ->
+ Credits
+ end.
+
+update_credit(CTag, Credit, Drain, Credits) ->
+ %% Using up all credit implies no need to send a 'drained' event
+ Drain1 = Drain andalso Credit > 0,
+ gb_trees:enter(CTag, #credit{credit = Credit, drain = Drain1}, Credits).
%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------
-init([]) ->
- {ok, #lim{}}.
+init([]) -> {ok, #lim{}}.
+
+prioritise_call(get_prefetch_limit, _From, _Len, _State) -> 9;
+prioritise_call(_Msg, _From, _Len, _State) -> 0.
-prioritise_call(get_limit, _From, _State) -> 9;
-prioritise_call(_Msg, _From, _State) -> 0.
+handle_call({new, ChPid}, _From, State = #lim{ch_pid = undefined}) ->
+ {reply, ok, State#lim{ch_pid = ChPid}};
+
+handle_call({limit_prefetch, PrefetchCount, UnackedCount}, _From, State) ->
+ %% assertion
+ true = State#lim.prefetch_count == 0 orelse
+ State#lim.volume == UnackedCount,
+ {reply, ok, maybe_notify(State, State#lim{prefetch_count = PrefetchCount,
+ volume = UnackedCount})};
+
+handle_call(unlimit_prefetch, _From, State) ->
+ {reply, ok, maybe_notify(State, State#lim{prefetch_count = 0,
+ volume = 0})};
+
+handle_call(block, _From, State) ->
+ {reply, ok, State#lim{blocked = true}};
+
+handle_call(unblock, _From, State) ->
+ {reply, ok, maybe_notify(State, State#lim{blocked = false})};
+
+handle_call(get_prefetch_limit, _From,
+ State = #lim{prefetch_count = PrefetchCount}) ->
+ {reply, PrefetchCount, State};
handle_call({can_send, QPid, _AckRequired}, _From,
State = #lim{blocked = true}) ->
{reply, false, limit_queue(QPid, State)};
handle_call({can_send, QPid, AckRequired}, _From,
State = #lim{volume = Volume}) ->
- case limit_reached(State) of
+ case prefetch_limit_reached(State) of
true -> {reply, false, limit_queue(QPid, State)};
false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1;
true -> Volume
end}}
- end;
-
-handle_call(get_limit, _From, State = #lim{prefetch_count = PrefetchCount}) ->
- {reply, PrefetchCount, State};
-
-handle_call({limit, PrefetchCount, Token}, _From, State) ->
- case maybe_notify(State, State#lim{prefetch_count = PrefetchCount}) of
- {cont, State1} ->
- {reply, ok, State1};
- {stop, State1} ->
- {reply, {disabled, Token#token{enabled = false}}, State1}
- end;
-
-handle_call(block, _From, State) ->
- {reply, ok, State#lim{blocked = true}};
-
-handle_call({unblock, Token}, _From, State) ->
- case maybe_notify(State, State#lim{blocked = false}) of
- {cont, State1} ->
- {reply, ok, State1};
- {stop, State1} ->
- {reply, {disabled, Token#token{enabled = false}}, State1}
- end;
-
-handle_call(is_blocked, _From, State) ->
- {reply, blocked(State), State};
-
-handle_call({enable, Token, Channel, Volume}, _From, State) ->
- {reply, Token#token{enabled = true},
- State#lim{ch_pid = Channel, volume = Volume}};
-handle_call({disable, Token}, _From, State) ->
- {reply, Token#token{enabled = false}, State}.
+ end.
handle_cast({ack, Count}, State = #lim{volume = Volume}) ->
NewVolume = if Volume == 0 -> 0;
true -> Volume - Count
end,
- {cont, State1} = maybe_notify(State, State#lim{volume = NewVolume}),
- {noreply, State1};
+ {noreply, maybe_notify(State, State#lim{volume = NewVolume})};
handle_cast({register, QPid}, State) ->
{noreply, remember_queue(QPid, State)};
@@ -199,27 +383,13 @@ code_change(_, State, _) ->
%%----------------------------------------------------------------------------
maybe_notify(OldState, NewState) ->
- case (limit_reached(OldState) orelse blocked(OldState)) andalso
- not (limit_reached(NewState) orelse blocked(NewState)) of
- true -> NewState1 = notify_queues(NewState),
- {case NewState1#lim.prefetch_count of
- 0 -> stop;
- _ -> cont
- end, NewState1};
- false -> {cont, NewState}
+ case (prefetch_limit_reached(OldState) orelse blocked(OldState)) andalso
+ not (prefetch_limit_reached(NewState) orelse blocked(NewState)) of
+ true -> notify_queues(NewState);
+ false -> NewState
end.
-maybe_call(#token{pid = Pid, enabled = true}, Call, _Default) ->
- gen_server2:call(Pid, Call, infinity);
-maybe_call(_, _Call, Default) ->
- Default.
-
-maybe_cast(#token{pid = Pid, enabled = true}, Cast) ->
- gen_server2:cast(Pid, Cast);
-maybe_cast(_, _Call) ->
- ok.
-
-limit_reached(#lim{prefetch_count = Limit, volume = Volume}) ->
+prefetch_limit_reached(#lim{prefetch_count = Limit, volume = Volume}) ->
Limit =/= 0 andalso Volume >= Limit.
blocked(#lim{blocked = Blocked}) -> Blocked.
@@ -231,10 +401,9 @@ remember_queue(QPid, State = #lim{queues = Queues}) ->
true -> State
end.
-forget_queue(QPid, State = #lim{ch_pid = ChPid, queues = Queues}) ->
+forget_queue(QPid, State = #lim{queues = Queues}) ->
case orddict:find(QPid, Queues) of
{ok, {MRef, _}} -> true = erlang:demonitor(MRef),
- ok = rabbit_amqqueue:unblock(QPid, ChPid),
State#lim{queues = orddict:erase(QPid, Queues)};
error -> State
end.
@@ -251,13 +420,13 @@ notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) ->
end, {[], Queues}, Queues),
case length(QList) of
0 -> ok;
- 1 -> ok = rabbit_amqqueue:unblock(hd(QList), ChPid); %% common case
+ 1 -> ok = rabbit_amqqueue:resume(hd(QList), ChPid); %% common case
L ->
%% We randomly vary the position of queues in the list,
%% thus ensuring that each queue has an equal chance of
%% being notified first.
{L1, L2} = lists:split(random:uniform(L), QList),
- [[ok = rabbit_amqqueue:unblock(Q, ChPid) || Q <- L3]
+ [[ok = rabbit_amqqueue:resume(Q, ChPid) || Q <- L3]
|| L3 <- [L2, L1]],
ok
end,
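
The rewritten limiter above exposes two client-side handles: the channel-facing #lstate{} (new/1, limit_prefetch/3, block/1, unblock/1, ack/2) and the queue-facing #qstate{} (client/1, activate/1, can_send/3, credit/4, drained/1, forget_consumer/2). The sketch below exercises both from a single test process; in the broker the channel and each queue are separate processes, and the module name, the <<"ctag">> tag and the literal credit values are illustrative only.

-module(limiter_demo).
-export([demo/0]).

%% Both roles are played by one process here purely for illustration;
%% in the broker the channel owns the #lstate{} and each queue process
%% owns its own #qstate{}.
demo() ->
    {ok, Pid} = rabbit_limiter:start_link(),

    %% channel side: register with the limiter once, then impose a
    %% prefetch limit of 5 with no messages currently unacked
    ChL0 = rabbit_limiter:new(Pid),
    ChL1 = rabbit_limiter:limit_prefetch(ChL0, 5, 0),
    true = rabbit_limiter:is_active(ChL1),

    %% queue side: obtain a client handle, activate it, and grant the
    %% consumer 2 credits with drain mode switched on
    QL0 = rabbit_limiter:client(Pid),
    QL1 = rabbit_limiter:activate(QL0),
    QL2 = rabbit_limiter:credit(QL1, <<"ctag">>, 2, true),

    %% deliver one message (requiring ack): one credit is consumed and
    %% the channel-side volume rises to 1
    {continue, QL3} = rabbit_limiter:can_send(QL2, true, <<"ctag">>),

    %% the consumer is in drain mode with 1 credit left, so drained/1
    %% reports it and resets the credit to 0, blocking the consumer
    {[{<<"ctag">>, 1}], QL4} = rabbit_limiter:drained(QL3),
    true = rabbit_limiter:is_consumer_blocked(QL4, <<"ctag">>),

    %% acks eventually flow back from the channel side
    ok = rabbit_limiter:ack(ChL1, 1),
    ok.
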
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index b435e0f3..22edfcb6 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -27,8 +27,8 @@
-export([start_link/1, set_maximum_since_use/2, info/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
- code_change/3, handle_pre_hibernate/1, prioritise_call/3,
- prioritise_cast/2, prioritise_info/2, format_message_queue/2]).
+ code_change/3, handle_pre_hibernate/1, prioritise_call/4,
+ prioritise_cast/3, prioritise_info/3, format_message_queue/2]).
-export([joined/2, members_changed/3, handle_msg/3]).
@@ -323,14 +323,14 @@ handle_pre_hibernate(State = #state { backing_queue = BQ,
BQS3 = BQ:handle_pre_hibernate(BQS2),
{hibernate, stop_rate_timer(State #state { backing_queue_state = BQS3 })}.
-prioritise_call(Msg, _From, _State) ->
+prioritise_call(Msg, _From, _Len, _State) ->
case Msg of
info -> 9;
{gm_deaths, _Deaths} -> 5;
_ -> 0
end.
-prioritise_cast(Msg, _State) ->
+prioritise_cast(Msg, _Len, _State) ->
case Msg of
{set_ram_duration_target, _Duration} -> 8;
{set_maximum_since_use, _Age} -> 8;
@@ -339,7 +339,7 @@ prioritise_cast(Msg, _State) ->
_ -> 0
end.
-prioritise_info(Msg, _State) ->
+prioritise_info(Msg, _Len, _State) ->
case Msg of
update_ram_duration -> 8;
sync_timeout -> 6;
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 13b40a48..2344b1b2 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -29,8 +29,8 @@
-export([transform_dir/3, force_recovery/2]). %% upgrade
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
- code_change/3, prioritise_call/3, prioritise_cast/2,
- prioritise_info/2, format_message_queue/2]).
+ code_change/3, prioritise_call/4, prioritise_cast/3,
+ prioritise_info/3, format_message_queue/2]).
%%----------------------------------------------------------------------------
@@ -738,7 +738,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-prioritise_call(Msg, _From, _State) ->
+prioritise_call(Msg, _From, _Len, _State) ->
case Msg of
successfully_recovered_state -> 7;
{new_client_state, _Ref, _Pid, _MODC, _CloseFDsFun} -> 7;
@@ -746,7 +746,7 @@ prioritise_call(Msg, _From, _State) ->
_ -> 0
end.
-prioritise_cast(Msg, _State) ->
+prioritise_cast(Msg, _Len, _State) ->
case Msg of
{combine_files, _Source, _Destination, _Reclaimed} -> 8;
{delete_file, _File, _Reclaimed} -> 8;
@@ -755,7 +755,7 @@ prioritise_cast(Msg, _State) ->
_ -> 0
end.
-prioritise_info(Msg, _State) ->
+prioritise_info(Msg, _Len, _State) ->
case Msg of
sync -> 8;
_ -> 0
diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl
index 3881de23..0dd7a7cc 100644
--- a/src/rabbit_msg_store_gc.erl
+++ b/src/rabbit_msg_store_gc.erl
@@ -23,7 +23,7 @@
-export([set_maximum_since_use/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3, prioritise_cast/2]).
+ terminate/2, code_change/3, prioritise_cast/3]).
-record(state,
{ pending_no_readers,
@@ -79,8 +79,8 @@ init([MsgStoreState]) ->
msg_store_state = MsgStoreState }, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-prioritise_cast({set_maximum_since_use, _Age}, _State) -> 8;
-prioritise_cast(_Msg, _State) -> 0.
+prioritise_cast({set_maximum_since_use, _Age}, _Len, _State) -> 8;
+prioritise_cast(_Msg, _Len, _State) -> 0.
handle_call(stop, _From, State) ->
{stop, normal, ok, State}.
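
A recurring change throughout this commit is the extra Len argument in gen_server2's prioritise_call/4, prioritise_cast/3 and prioritise_info/3 callbacks. Len is (roughly) the number of messages gen_server2 has already buffered, so a callback can take the backlog into account. Below is a minimal sketch of a callback module using the new arities; the 'gc' message and the 1000-message threshold are made up for illustration and are not part of this patch.

-module(gs2_prio_demo).
-behaviour(gen_server2).

-export([start_link/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3, prioritise_cast/3, prioritise_info/3]).

start_link() -> gen_server2:start_link(?MODULE, [], []).

init([]) -> {ok, state}.

%% Len reflects gen_server2's internal backlog; when the server is far
%% behind, de-prioritise the (hypothetical) 'gc' message so cheaper
%% work drains first.
prioritise_cast({set_maximum_since_use, _Age}, _Len, _State) -> 8;
prioritise_cast(gc, Len, _State) when Len > 1000            -> 0;
prioritise_cast(gc, _Len, _State)                           -> 5;
prioritise_cast(_Msg, _Len, _State)                         -> 0.

prioritise_info(sync, _Len, _State) -> 8;
prioritise_info(_Msg, _Len, _State) -> 0.

handle_call(_Req, _From, State) -> {reply, ok, State}.
handle_cast(_Msg, State)        -> {noreply, State}.
handle_info(_Info, State)       -> {noreply, State}.
terminate(_Reason, _State)      -> ok.
code_change(_OldVsn, State, _)  -> {ok, State}.
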
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index aaaa179a..61fac0e2 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -306,8 +306,10 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
closed when State#v1.connection_state =:= closed ->
ok;
closed ->
+ maybe_emit_stats(State),
throw(connection_closed_abruptly);
{error, Reason} ->
+ maybe_emit_stats(State),
throw({inet_error, Reason});
{other, {system, From, Request}} ->
sys:handle_system_msg(Request, From, State#v1.parent,
@@ -338,23 +340,28 @@ handle_other({'EXIT', Parent, Reason}, State = #v1{parent = Parent}) ->
%% ordinary error case. However, since this termination is
%% initiated by our parent it is probably more important to exit
%% quickly.
+ maybe_emit_stats(State),
exit(Reason);
-handle_other({channel_exit, _Channel, E = {writer, send_failed, _E}}, _State) ->
+handle_other({channel_exit, _Channel, E = {writer, send_failed, _E}}, State) ->
+ maybe_emit_stats(State),
throw(E);
handle_other({channel_exit, Channel, Reason}, State) ->
handle_exception(State, Channel, Reason);
handle_other({'DOWN', _MRef, process, ChPid, Reason}, State) ->
handle_dependent_exit(ChPid, Reason, State);
-handle_other(terminate_connection, _State) ->
+handle_other(terminate_connection, State) ->
+ maybe_emit_stats(State),
stop;
handle_other(handshake_timeout, State)
when ?IS_RUNNING(State) orelse ?IS_STOPPING(State) ->
State;
handle_other(handshake_timeout, State) ->
+ maybe_emit_stats(State),
throw({handshake_timeout, State#v1.callback});
handle_other(heartbeat_timeout, State = #v1{connection_state = closed}) ->
State;
-handle_other(heartbeat_timeout, #v1{connection_state = S}) ->
+handle_other(heartbeat_timeout, State = #v1{connection_state = S}) ->
+ maybe_emit_stats(State),
throw({heartbeat_timeout, S});
handle_other({'$gen_call', From, {shutdown, Explanation}}, State) ->
{ForceTermination, NewState} = terminate(Explanation, State),
@@ -386,8 +393,9 @@ handle_other(emit_stats, State) ->
handle_other({bump_credit, Msg}, State) ->
credit_flow:handle_bump_msg(Msg),
control_throttle(State);
-handle_other(Other, _State) ->
+handle_other(Other, State) ->
%% internal error -> something worth dying for
+ maybe_emit_stats(State),
exit({unexpected_message, Other}).
switch_callback(State, Callback, Length) ->
@@ -850,8 +858,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
rabbit_event:notify(connection_created,
[{type, network} |
infos(?CREATION_EVENT_KEYS, State1)]),
- rabbit_event:if_enabled(State1, #v1.stats_timer,
- fun() -> emit_stats(State1) end),
+ maybe_emit_stats(State1),
State1;
handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) ->
lists:foreach(fun rabbit_channel:shutdown/1, all_channels()),
@@ -1010,6 +1017,10 @@ cert_info(F, #v1{sock = Sock}) ->
{ok, Cert} -> list_to_binary(F(Cert))
end.
+maybe_emit_stats(State) ->
+ rabbit_event:if_enabled(State, #v1.stats_timer,
+ fun() -> emit_stats(State) end).
+
emit_stats(State) ->
rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)),
rabbit_event:reset_stats_timer(State, #v1.stats_timer).
diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl
index 3514e780..acdc2cff 100644
--- a/src/rabbit_registry.erl
+++ b/src/rabbit_registry.erl
@@ -84,12 +84,34 @@ internal_binary_to_type(TypeBin) when is_binary(TypeBin) ->
internal_register(Class, TypeName, ModuleName)
when is_atom(Class), is_binary(TypeName), is_atom(ModuleName) ->
ok = sanity_check_module(class_module(Class), ModuleName),
- true = ets:insert(?ETS_NAME,
- {{Class, internal_binary_to_type(TypeName)}, ModuleName}),
+ RegArg = {{Class, internal_binary_to_type(TypeName)}, ModuleName},
+ true = ets:insert(?ETS_NAME, RegArg),
+ conditional_register(RegArg),
ok.
internal_unregister(Class, TypeName) ->
- true = ets:delete(?ETS_NAME, {Class, internal_binary_to_type(TypeName)}),
+ UnregArg = {Class, internal_binary_to_type(TypeName)},
+ conditional_unregister(UnregArg),
+ true = ets:delete(?ETS_NAME, UnregArg),
+ ok.
+
+%% register the exchange decorator's route callback only when it is
+%% implemented, in order to avoid unnecessary decorator calls on the
+%% fast publishing path
+conditional_register({{exchange_decorator, Type}, ModuleName}) ->
+ case erlang:function_exported(ModuleName, route, 2) of
+ true -> true = ets:insert(?ETS_NAME,
+ {{exchange_decorator_route, Type},
+ ModuleName});
+ false -> ok
+ end;
+conditional_register(_) ->
+ ok.
+
+conditional_unregister({exchange_decorator, Type}) ->
+ true = ets:delete(?ETS_NAME, {exchange_decorator_route, Type}),
+ ok;
+conditional_unregister(_) ->
ok.
sanity_check_module(ClassModule, Module) ->
@@ -104,12 +126,11 @@ sanity_check_module(ClassModule, Module) ->
true -> ok
end.
-class_module(exchange) -> rabbit_exchange_type;
-class_module(auth_mechanism) -> rabbit_auth_mechanism;
-class_module(runtime_parameter) -> rabbit_runtime_parameter;
-class_module(exchange_decorator) -> rabbit_exchange_decorator;
-class_module(exchange_decorator_route) -> rabbit_exchange_decorator;
-class_module(policy_validator) -> rabbit_policy_validator.
+class_module(exchange) -> rabbit_exchange_type;
+class_module(auth_mechanism) -> rabbit_auth_mechanism;
+class_module(runtime_parameter) -> rabbit_runtime_parameter;
+class_module(exchange_decorator) -> rabbit_exchange_decorator;
+class_module(policy_validator) -> rabbit_policy_validator.
%%---------------------------------------------------------------------------
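
The conditional registration above means a module only appears under the exchange_decorator_route class when it actually exports route/2, so the publishing path can iterate over a usually-empty list instead of probing every decorator. A sketch of such a lookup follows, assuming rabbit_registry:lookup_all/1 returns {Type, Module} pairs for a class and that a decorator's route/2 returns a list of destinations; decorator_routes/2 is a hypothetical helper, not part of this patch.

%% Hypothetical helper: gather extra routes from decorators that
%% registered a route/2 callback. For the common case of no routing
%% decorators the comprehension runs over an empty list.
decorator_routes(X, Delivery) ->
    lists:append(
      [Mod:route(X, Delivery)
       || {_Type, Mod} <- rabbit_registry:lookup_all(exchange_decorator_route)]).
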
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 1188c554..e7b69879 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1100,20 +1100,14 @@ test_policy_validation() ->
test_server_status() ->
%% create a few things so there is some useful information to list
- Writer = spawn(fun test_writer/0),
- {ok, Ch} = rabbit_channel:start_link(
- 1, self(), Writer, self(), "", rabbit_framing_amqp_0_9_1,
- user(<<"user">>), <<"/">>, [], self(),
- rabbit_limiter:make_token(self())),
+ {_Writer, Limiter, Ch} = test_channel(),
[Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>],
{new, Queue = #amqqueue{}} <-
[rabbit_amqqueue:declare(
rabbit_misc:r(<<"/">>, queue, Name),
false, false, [], none)]],
-
ok = rabbit_amqqueue:basic_consume(
- Q, true, Ch, rabbit_limiter:make_token(),
- <<"ctag">>, true, undefined),
+ Q, true, Ch, Limiter, false, <<"ctag">>, true, none, undefined),
%% list queues
ok = info_action(list_queues, rabbit_amqqueue:info_keys(), true),
@@ -1191,8 +1185,6 @@ find_listener() ->
N =:= node()],
{H, P}.
-test_writer() -> test_writer(none).
-
test_writer(Pid) ->
receive
{'$gen_call', From, flush} -> gen_server:reply(From, ok),
@@ -1202,13 +1194,17 @@ test_writer(Pid) ->
shutdown -> ok
end.
-test_spawn() ->
+test_channel() ->
Me = self(),
Writer = spawn(fun () -> test_writer(Me) end),
+ {ok, Limiter} = rabbit_limiter:start_link(),
{ok, Ch} = rabbit_channel:start_link(
1, Me, Writer, Me, "", rabbit_framing_amqp_0_9_1,
- user(<<"guest">>), <<"/">>, [], Me,
- rabbit_limiter:make_token(self())),
+ user(<<"guest">>), <<"/">>, [], Me, Limiter),
+ {Writer, Limiter, Ch}.
+
+test_spawn() ->
+ {Writer, _Limiter, Ch} = test_channel(),
ok = rabbit_channel:do(Ch, #'channel.open'{}),
receive #'channel.open_ok'{} -> ok
after ?TIMEOUT -> throw(failed_to_receive_channel_open_ok)
@@ -1580,7 +1576,7 @@ control_action(Command, Node, Args, Opts) ->
info_action(Command, Args, CheckVHost) ->
ok = control_action(Command, []),
- if CheckVHost -> ok = control_action(Command, []);
+ if CheckVHost -> ok = control_action(Command, [], ["-p", "/"]);
true -> ok
end,
ok = control_action(Command, lists:map(fun atom_to_list/1, Args)),
@@ -2722,12 +2718,13 @@ test_queue_recover() ->
end,
rabbit_amqqueue:stop(),
rabbit_amqqueue:start(rabbit_amqqueue:recover()),
+ {ok, Limiter} = rabbit_limiter:start_link(),
rabbit_amqqueue:with_or_die(
QName,
fun (Q1 = #amqqueue { pid = QPid1 }) ->
CountMinusOne = Count - 1,
{ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} =
- rabbit_amqqueue:basic_get(Q1, self(), false),
+ rabbit_amqqueue:basic_get(Q1, self(), false, Limiter),
exit(QPid1, shutdown),
VQ1 = variable_queue_init(Q, true),
{{_Msg1, true, _AckTag1}, VQ2} =
@@ -2748,9 +2745,11 @@ test_variable_queue_delete_msg_store_files_callback() ->
rabbit_amqqueue:set_ram_duration_target(QPid, 0),
+ {ok, Limiter} = rabbit_limiter:start_link(),
+
CountMinusOne = Count - 1,
{ok, CountMinusOne, {QName, QPid, _AckTag, false, _Msg}} =
- rabbit_amqqueue:basic_get(Q, self(), true),
+ rabbit_amqqueue:basic_get(Q, self(), true, Limiter),
{ok, CountMinusOne} = rabbit_amqqueue:purge(Q),
%% give the queue a second to receive the close_fds callback msg
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index d0f39221..2858cf58 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -70,6 +70,7 @@ add(VHostPath) ->
{<<"amq.rabbitmq.trace">>, topic}]],
ok
end),
+ rabbit_event:notify(vhost_created, info(VHostPath)),
R.
delete(VHostPath) ->
@@ -87,6 +88,7 @@ delete(VHostPath) ->
with(VHostPath, fun () ->
ok = internal_delete(VHostPath)
end)),
+ ok = rabbit_event:notify(vhost_deleted, [{name, VHostPath}]),
R.
internal_delete(VHostPath) ->
diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl
index 56e4b7b3..22b223d2 100644
--- a/src/worker_pool_worker.erl
+++ b/src/worker_pool_worker.erl
@@ -23,7 +23,7 @@
-export([set_maximum_since_use/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3, prioritise_cast/2]).
+ terminate/2, code_change/3, prioritise_cast/3]).
%%----------------------------------------------------------------------------
@@ -73,8 +73,8 @@ init([WId]) ->
{ok, WId, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-prioritise_cast({set_maximum_since_use, _Age}, _State) -> 8;
-prioritise_cast(_Msg, _State) -> 0.
+prioritise_cast({set_maximum_since_use, _Age}, _Len, _State) -> 8;
+prioritise_cast(_Msg, _Len, _State) -> 0.
handle_call({submit, Fun}, From, WId) ->
gen_server2:reply(From, run(Fun)),