summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTim Watson <tim@rabbitmq.com>2013-04-16 17:07:09 +0100
committerTim Watson <tim@rabbitmq.com>2013-04-16 17:07:09 +0100
commit463fbb4b0c80059d7ad7a5908e06689587f38d50 (patch)
treef3687be0e0e9a55944e0c206bef960aff604c8a0
parent0c0b55cbb63bdb3b3f3a6604c8aa5690ef23222c (diff)
parent160573b71c48e05a32965e287fe9b184b5fa8da1 (diff)
downloadrabbitmq-server-463fbb4b0c80059d7ad7a5908e06689587f38d50.tar.gz
merge default
-rw-r--r--packaging/debs/Debian/debian/control2
-rw-r--r--src/file_handle_cache.erl4
-rw-r--r--src/gen_server2.erl38
-rw-r--r--src/gm.erl8
-rw-r--r--src/priority_queue.erl70
-rw-r--r--src/rabbit_amqqueue.erl17
-rw-r--r--src/rabbit_amqqueue_process.erl251
-rw-r--r--src/rabbit_channel.erl59
-rw-r--r--src/rabbit_client_sup.erl13
-rw-r--r--src/rabbit_limiter.erl144
-rw-r--r--src/rabbit_mirror_queue_slave.erl10
-rw-r--r--src/rabbit_msg_store.erl21
-rw-r--r--src/rabbit_msg_store_gc.erl6
-rw-r--r--src/rabbit_node_monitor.erl52
-rw-r--r--src/rabbit_nodes.erl11
-rw-r--r--src/rabbit_ssl.erl29
-rw-r--r--src/rabbit_tests.erl2
-rw-r--r--src/worker_pool_worker.erl6
18 files changed, 468 insertions, 275 deletions
diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control
index d4526d87..3a15c4b6 100644
--- a/packaging/debs/Debian/debian/control
+++ b/packaging/debs/Debian/debian/control
@@ -9,7 +9,7 @@ Standards-Version: 3.9.2
Package: rabbitmq-server
Architecture: all
-Depends: erlang-nox (>= 1:12.b.3), adduser, logrotate, ${misc:Depends}
+Depends: erlang-nox (>= 1:12.b.3) | esl-erlang, adduser, logrotate, ${misc:Depends}
Description: AMQP server written in Erlang
RabbitMQ is an implementation of AMQP, the emerging standard for high
performance enterprise messaging. The RabbitMQ server is a robust and
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index d2d4d295..406add8a 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -152,7 +152,7 @@
-export([ulimit/0]).
-export([start_link/0, start_link/2, init/1, handle_call/3, handle_cast/2,
- handle_info/2, terminate/2, code_change/3, prioritise_cast/2]).
+ handle_info/2, terminate/2, code_change/3, prioritise_cast/3]).
-define(SERVER, ?MODULE).
-define(RESERVED_FOR_OTHERS, 100).
@@ -848,7 +848,7 @@ init([AlarmSet, AlarmClear]) ->
alarm_set = AlarmSet,
alarm_clear = AlarmClear }}.
-prioritise_cast(Msg, _State) ->
+prioritise_cast(Msg, _Len, _State) ->
case Msg of
{release, _, _} -> 5;
_ -> 0
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index c82327a2..9109febd 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -16,12 +16,15 @@
%% The original code could reorder messages when communicating with a
%% process on a remote node that was not currently connected.
%%
-%% 4) The callback module can optionally implement prioritise_call/3,
-%% prioritise_cast/2 and prioritise_info/2. These functions take
-%% Message, From and State or just Message and State and return a
-%% single integer representing the priority attached to the message.
-%% Messages with higher priorities are processed before requests with
-%% lower priorities. The default priority is 0.
+%% 4) The callback module can optionally implement prioritise_call/4,
+%% prioritise_cast/3 and prioritise_info/3. These functions take
+%% Message, From, Length and State or just Message, Length and State
+%% (where Length is the current number of messages waiting to be
+%% processed) and return a single integer representing the priority
+%% attached to the message, or 'drop' to ignore it (for
+%% prioritise_cast/3 and prioritise_info/3 only). Messages with
+%% higher priorities are processed before requests with lower
+%% priorities. The default priority is 0.
%%
%% 5) The callback module can optionally implement
%% handle_pre_hibernate/1 and handle_post_hibernate/1. These will be
@@ -649,6 +652,9 @@ in({system, _From, _Req} = Input, GS2State) ->
in(Input, GS2State = #gs2_state { prioritisers = {_, _, F} }) ->
in(Input, F(Input, GS2State), GS2State).
+in(_Input, drop, GS2State) ->
+ GS2State;
+
in(Input, Priority, GS2State = #gs2_state { queue = Queue }) ->
GS2State # gs2_state { queue = priority_queue:in(Input, Priority, Queue) }.
@@ -1148,27 +1154,33 @@ whereis_name(Name) ->
end.
find_prioritisers(GS2State = #gs2_state { mod = Mod }) ->
- PCall = function_exported_or_default(Mod, 'prioritise_call', 3,
+ PCall = function_exported_or_default(Mod, 'prioritise_call', 4,
fun (_Msg, _From, _State) -> 0 end),
- PCast = function_exported_or_default(Mod, 'prioritise_cast', 2,
+ PCast = function_exported_or_default(Mod, 'prioritise_cast', 3,
fun (_Msg, _State) -> 0 end),
- PInfo = function_exported_or_default(Mod, 'prioritise_info', 2,
+ PInfo = function_exported_or_default(Mod, 'prioritise_info', 3,
fun (_Msg, _State) -> 0 end),
GS2State #gs2_state { prioritisers = {PCall, PCast, PInfo} }.
function_exported_or_default(Mod, Fun, Arity, Default) ->
case erlang:function_exported(Mod, Fun, Arity) of
true -> case Arity of
- 2 -> fun (Msg, GS2State = #gs2_state { state = State }) ->
- case catch Mod:Fun(Msg, State) of
+ 3 -> fun (Msg, GS2State = #gs2_state { queue = Queue,
+ state = State }) ->
+ Length = priority_queue:len(Queue),
+ case catch Mod:Fun(Msg, Length, State) of
+ drop ->
+ drop;
Res when is_integer(Res) ->
Res;
Err ->
handle_common_termination(Err, Msg, GS2State)
end
end;
- 3 -> fun (Msg, From, GS2State = #gs2_state { state = State }) ->
- case catch Mod:Fun(Msg, From, State) of
+ 4 -> fun (Msg, From, GS2State = #gs2_state { queue = Queue,
+ state = State }) ->
+ Length = priority_queue:len(Queue),
+ case catch Mod:Fun(Msg, From, Length, State) of
Res when is_integer(Res) ->
Res;
Err ->
diff --git a/src/gm.erl b/src/gm.erl
index 76b535e6..3f0909e8 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -380,7 +380,7 @@
confirmed_broadcast/2, info/1, forget_group/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
- code_change/3, prioritise_info/2]).
+ code_change/3, prioritise_info/3]).
-ifndef(use_specs).
-export([behaviour_info/1]).
@@ -721,12 +721,12 @@ terminate(Reason, State = #state { module = Module,
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-prioritise_info(flush, _State) ->
+prioritise_info(flush, _Len, _State) ->
1;
-prioritise_info({'DOWN', _MRef, process, _Pid, _Reason},
+prioritise_info({'DOWN', _MRef, process, _Pid, _Reason}, _Len,
#state { members_state = MS }) when MS /= undefined ->
1;
-prioritise_info(_, _State) ->
+prioritise_info(_, _Len, _State) ->
0.
diff --git a/src/priority_queue.erl b/src/priority_queue.erl
index 02a0a1df..0dc19819 100644
--- a/src/priority_queue.erl
+++ b/src/priority_queue.erl
@@ -69,9 +69,9 @@
%%----------------------------------------------------------------------------
new() ->
- {queue, [], []}.
+ {queue, [], [], 0}.
-is_queue({queue, R, F}) when is_list(R), is_list(F) ->
+is_queue({queue, R, F, L}) when is_list(R), is_list(F), is_integer(L) ->
true;
is_queue({pqueue, Queues}) when is_list(Queues) ->
lists:all(fun ({infinity, Q}) -> is_queue(Q);
@@ -80,17 +80,17 @@ is_queue({pqueue, Queues}) when is_list(Queues) ->
is_queue(_) ->
false.
-is_empty({queue, [], []}) ->
+is_empty({queue, [], [], 0}) ->
true;
is_empty(_) ->
false.
-len({queue, R, F}) when is_list(R), is_list(F) ->
- length(R) + length(F);
+len({queue, _R, _F, L}) ->
+ L;
len({pqueue, Queues}) ->
lists:sum([len(Q) || {_, Q} <- Queues]).
-to_list({queue, In, Out}) when is_list(In), is_list(Out) ->
+to_list({queue, In, Out, _Len}) when is_list(In), is_list(Out) ->
[{0, V} || V <- Out ++ lists:reverse(In, [])];
to_list({pqueue, Queues}) ->
[{maybe_negate_priority(P), V} || {P, Q} <- Queues,
@@ -99,13 +99,13 @@ to_list({pqueue, Queues}) ->
in(Item, Q) ->
in(Item, 0, Q).
-in(X, 0, {queue, [_] = In, []}) ->
- {queue, [X], In};
-in(X, 0, {queue, In, Out}) when is_list(In), is_list(Out) ->
- {queue, [X|In], Out};
-in(X, Priority, _Q = {queue, [], []}) ->
+in(X, 0, {queue, [_] = In, [], 1}) ->
+ {queue, [X], In, 2};
+in(X, 0, {queue, In, Out, Len}) when is_list(In), is_list(Out) ->
+ {queue, [X|In], Out, Len + 1};
+in(X, Priority, _Q = {queue, [], [], 0}) ->
in(X, Priority, {pqueue, []});
-in(X, Priority, Q = {queue, _, _}) ->
+in(X, Priority, Q = {queue, _, _, _}) ->
in(X, Priority, {pqueue, [{0, Q}]});
in(X, Priority, {pqueue, Queues}) ->
P = maybe_negate_priority(Priority),
@@ -113,33 +113,33 @@ in(X, Priority, {pqueue, Queues}) ->
{value, {_, Q}} ->
lists:keyreplace(P, 1, Queues, {P, in(X, Q)});
false when P == infinity ->
- [{P, {queue, [X], []}} | Queues];
+ [{P, {queue, [X], [], 1}} | Queues];
false ->
case Queues of
[{infinity, InfQueue} | Queues1] ->
[{infinity, InfQueue} |
- lists:keysort(1, [{P, {queue, [X], []}} | Queues1])];
+ lists:keysort(1, [{P, {queue, [X], [], 1}} | Queues1])];
_ ->
- lists:keysort(1, [{P, {queue, [X], []}} | Queues])
+ lists:keysort(1, [{P, {queue, [X], [], 1}} | Queues])
end
end}.
-out({queue, [], []} = Q) ->
+out({queue, [], [], 0} = Q) ->
{empty, Q};
-out({queue, [V], []}) ->
- {{value, V}, {queue, [], []}};
-out({queue, [Y|In], []}) ->
+out({queue, [V], [], 1}) ->
+ {{value, V}, {queue, [], [], 0}};
+out({queue, [Y|In], [], Len}) ->
[V|Out] = lists:reverse(In, []),
- {{value, V}, {queue, [Y], Out}};
-out({queue, In, [V]}) when is_list(In) ->
- {{value,V}, r2f(In)};
-out({queue, In,[V|Out]}) when is_list(In) ->
- {{value, V}, {queue, In, Out}};
+ {{value, V}, {queue, [Y], Out}, Len - 1};
+out({queue, In, [V], Len}) when is_list(In) ->
+ {{value,V}, r2f(In, Len - 1)};
+out({queue, In,[V|Out], Len}) when is_list(In) ->
+ {{value, V}, {queue, In, Out, Len - 1}};
out({pqueue, [{P, Q} | Queues]}) ->
{R, Q1} = out(Q),
NewQ = case is_empty(Q1) of
true -> case Queues of
- [] -> {queue, [], []};
+ [] -> {queue, [], [], 0};
[{0, OnlyQ}] -> OnlyQ;
[_|_] -> {pqueue, Queues}
end;
@@ -147,13 +147,13 @@ out({pqueue, [{P, Q} | Queues]}) ->
end,
{R, NewQ}.
-join(A, {queue, [], []}) ->
+join(A, {queue, [], [], 0}) ->
A;
-join({queue, [], []}, B) ->
+join({queue, [], [], 0}, B) ->
B;
-join({queue, AIn, AOut}, {queue, BIn, BOut}) ->
- {queue, BIn, AOut ++ lists:reverse(AIn, BOut)};
-join(A = {queue, _, _}, {pqueue, BPQ}) ->
+join({queue, AIn, AOut, ALen}, {queue, BIn, BOut, BLen}) ->
+ {queue, BIn, AOut ++ lists:reverse(AIn, BOut), ALen + BLen};
+join(A = {queue, _, _, _}, {pqueue, BPQ}) ->
{Pre, Post} =
lists:splitwith(fun ({P, _}) -> P < 0 orelse P == infinity end, BPQ),
Post1 = case Post of
@@ -162,7 +162,7 @@ join(A = {queue, _, _}, {pqueue, BPQ}) ->
_ -> [ {0, A} | Post ]
end,
{pqueue, Pre ++ Post1};
-join({pqueue, APQ}, B = {queue, _, _}) ->
+join({pqueue, APQ}, B = {queue, _, _, _}) ->
{Pre, Post} =
lists:splitwith(fun ({P, _}) -> P < 0 orelse P == infinity end, APQ),
Post1 = case Post of
@@ -185,10 +185,10 @@ merge([{PA, A}|As], Bs = [{PB, _}|_], Acc) when PA < PB orelse PA == infinity ->
merge(As = [{_, _}|_], [{PB, B}|Bs], Acc) ->
merge(As, Bs, [ {PB, B} | Acc ]).
-r2f([]) -> {queue, [], []};
-r2f([_] = R) -> {queue, [], R};
-r2f([X,Y]) -> {queue, [X], [Y]};
-r2f([X,Y|R]) -> {queue, [X,Y], lists:reverse(R, [])}.
+r2f([], 0) -> {queue, [], [], 0};
+r2f([_] = R, 1) -> {queue, [], R, 1};
+r2f([X,Y], 2) -> {queue, [X], [Y], 2};
+r2f([X,Y|R], L) -> {queue, [X,Y], lists:reverse(R, []), L}.
maybe_negate_priority(infinity) -> infinity;
maybe_negate_priority(P) -> -P.
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 3f0a7f9c..8c00c85c 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -26,9 +26,9 @@
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
-export([force_event_refresh/0, wake_up/1]).
-export([consumers/1, consumers_all/1, consumer_info_keys/0]).
--export([basic_get/4, basic_consume/8, basic_cancel/4]).
+-export([basic_get/4, basic_consume/9, basic_cancel/4]).
-export([notify_sent/2, notify_sent_queue_down/1, resume/2, flush_all/2]).
--export([notify_down_all/2, activate_limit_all/2]).
+-export([notify_down_all/2, activate_limit_all/2, credit/5]).
-export([on_node_down/1]).
-export([update/2, store_queue/1, policy_changed/2]).
-export([start_mirroring/1, stop_mirroring/1, sync_mirrors/1,
@@ -147,9 +147,11 @@
-spec(activate_limit_all/2 :: (qpids(), pid()) -> ok_or_errors()).
-spec(basic_get/4 :: (rabbit_types:amqqueue(), pid(), boolean(), pid()) ->
{'ok', non_neg_integer(), qmsg()} | 'empty').
--spec(basic_consume/8 ::
+-spec(credit/5 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(),
+ non_neg_integer(), boolean()) -> 'ok').
+-spec(basic_consume/9 ::
(rabbit_types:amqqueue(), boolean(), pid(), pid(), boolean(),
- rabbit_types:ctag(), boolean(), any())
+ rabbit_types:ctag(), boolean(), {non_neg_integer(), boolean()} | 'none', any())
-> rabbit_types:ok_or_error('exclusive_consume_unavailable')).
-spec(basic_cancel/4 ::
(rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok').
@@ -540,13 +542,16 @@ notify_down_all(QPids, ChPid) ->
activate_limit_all(QPids, ChPid) ->
delegate:cast(QPids, {activate_limit, ChPid}).
+credit(#amqqueue{pid = QPid}, ChPid, CTag, Credit, Drain) ->
+ delegate:cast(QPid, {credit, ChPid, CTag, Credit, Drain}).
+
basic_get(#amqqueue{pid = QPid}, ChPid, NoAck, LimiterPid) ->
delegate:call(QPid, {basic_get, ChPid, NoAck, LimiterPid}).
basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, LimiterPid, LimiterActive,
- ConsumerTag, ExclusiveConsume, OkMsg) ->
+ ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg) ->
delegate:call(QPid, {basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
- ConsumerTag, ExclusiveConsume, OkMsg}).
+ ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg}).
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
delegate:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index efe8efc4..3712a625 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -29,8 +29,8 @@
-export([init_with_backing_queue_state/7]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
- handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
- prioritise_cast/2, prioritise_info/2, format_message_queue/2]).
+ handle_info/2, handle_pre_hibernate/1, prioritise_call/4,
+ prioritise_cast/3, prioritise_info/3, format_message_queue/2]).
%% Queue's state
-record(q, {q,
@@ -49,10 +49,6 @@
ttl_timer_ref,
ttl_timer_expiry,
senders,
- publish_seqno,
- unconfirmed,
- delayed_stop,
- queue_monitors,
dlx,
dlx_routing_key,
max_length,
@@ -66,8 +62,12 @@
monitor_ref,
acktags,
consumer_count,
+ %% Queue of {ChPid, #consumer{}} for consumers which have
+ %% been blocked for any reason
blocked_consumers,
+ %% The limiter itself
limiter,
+ %% Internal flow control for queue -> writer
unsent_message_count}).
%%----------------------------------------------------------------------------
@@ -147,9 +147,6 @@ init_state(Q) ->
has_had_consumers = false,
active_consumers = queue:new(),
senders = pmon:new(),
- publish_seqno = 1,
- unconfirmed = dtree:empty(),
- queue_monitors = pmon:new(),
msg_id_to_channel = gb_trees:empty(),
status = running},
rabbit_event:init_stats_timer(State, #q.stats_timer).
@@ -406,6 +403,21 @@ block_consumer(C = #cr{blocked_consumers = Blocked}, QEntry) ->
is_ch_blocked(#cr{unsent_message_count = Count, limiter = Limiter}) ->
Count >= ?UNSENT_MESSAGE_LIMIT orelse rabbit_limiter:is_suspended(Limiter).
+maybe_send_drained(WasEmpty, State) ->
+ case (not WasEmpty) andalso is_empty(State) of
+ true -> [send_drained(C) || C <- all_ch_record()];
+ false -> ok
+ end,
+ State.
+
+send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) ->
+ case rabbit_limiter:drained(Limiter) of
+ {[], Limiter} -> ok;
+ {CTagCredit, Limiter2} -> rabbit_channel:send_drained(
+ ChPid, CTagCredit),
+ update_ch_record(C#cr{limiter = Limiter2})
+ end.
+
deliver_msgs_to_consumers(_DeliverFun, true, State) ->
{true, State};
deliver_msgs_to_consumers(DeliverFun, false,
@@ -426,7 +438,8 @@ deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, State) ->
true -> block_consumer(C, E),
{false, State};
false -> case rabbit_limiter:can_send(C#cr.limiter,
- Consumer#consumer.ack_required) of
+ Consumer#consumer.ack_required,
+ Consumer#consumer.tag) of
{suspend, Limiter} ->
block_consumer(C#cr{limiter = Limiter}, E),
{false, State};
@@ -585,14 +598,16 @@ maybe_drop_head(State = #q{max_length = MaxLen,
requeue_and_run(AckTags, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
+ WasEmpty = BQ:is_empty(BQS),
{_MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
{_Dropped, State1} = maybe_drop_head(State#q{backing_queue_state = BQS1}),
- run_message_queue(drop_expired_msgs(State1)).
+ run_message_queue(maybe_send_drained(WasEmpty, drop_expired_msgs(State1))).
fetch(AckRequired, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
{Result, BQS1} = BQ:fetch(AckRequired, BQS),
- {Result, drop_expired_msgs(State#q{backing_queue_state = BQS1})}.
+ State1 = drop_expired_msgs(State#q{backing_queue_state = BQS1}),
+ {Result, maybe_send_drained(Result =:= empty, State1)}.
ack(AckTags, ChPid, State) ->
subtract_acks(ChPid, AckTags, State,
@@ -621,20 +636,29 @@ remove_consumers(ChPid, Queue, QName) ->
possibly_unblock(State, ChPid, Update) ->
case lookup_ch(ChPid) of
- not_found ->
+ not_found -> State;
+ C -> C1 = Update(C),
+ case is_ch_blocked(C) andalso not is_ch_blocked(C1) of
+ false -> update_ch_record(C1),
+ State;
+ true -> unblock(State, C1)
+ end
+ end.
+
+unblock(State, C = #cr{limiter = Limiter}) ->
+ case lists:partition(
+ fun({_ChPid, #consumer{tag = CTag}}) ->
+ rabbit_limiter:is_consumer_blocked(Limiter, CTag)
+ end, queue:to_list(C#cr.blocked_consumers)) of
+ {_, []} ->
+ update_ch_record(C),
State;
- C ->
- C1 = Update(C),
- case is_ch_blocked(C) andalso not is_ch_blocked(C1) of
- false -> update_ch_record(C1),
- State;
- true -> #cr{blocked_consumers = Consumers} = C1,
- update_ch_record(
- C1#cr{blocked_consumers = queue:new()}),
- AC1 = queue:join(State#q.active_consumers,
- Consumers),
- run_message_queue(State#q{active_consumers = AC1})
- end
+ {Blocked, Unblocked} ->
+ BlockedQ = queue:from_list(Blocked),
+ UnblockedQ = queue:from_list(Unblocked),
+ update_ch_record(C#cr{blocked_consumers = BlockedQ}),
+ AC1 = queue:join(State#q.active_consumers, UnblockedQ),
+ run_message_queue(State#q{active_consumers = AC1})
end.
should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false;
@@ -731,6 +755,11 @@ calculate_msg_expiry(#basic_message{content = Content}, TTL) ->
T -> now_micros() + T * 1000
end.
+%% Logically this function should invoke maybe_send_drained/2.
+%% However, that is expensive. Since some frequent callers of
+%% drop_expired_msgs/1, in particular deliver_or_enqueue/3, cannot
+%% possibly cause the queue to become empty, we push the
+%% responsibility to the callers. So be cautious when adding new ones.
drop_expired_msgs(State) ->
case is_empty(State) of
true -> State;
@@ -784,80 +813,31 @@ dead_letter_maxlen_msgs(X, Excess, State = #q{backing_queue = BQ}) ->
State1.
dead_letter_msgs(Fun, Reason, X, State = #q{dlx_routing_key = RK,
- publish_seqno = SeqNo0,
- unconfirmed = UC0,
- queue_monitors = QMons0,
backing_queue_state = BQS,
backing_queue = BQ}) ->
QName = qname(State),
- {Res, {AckImm1, SeqNo1, UC1, QMons1}, BQS1} =
- Fun(fun (Msg, AckTag, {AckImm, SeqNo, UC, QMons}) ->
- case dead_letter_publish(Msg, Reason,
- X, RK, SeqNo, QName) of
- [] -> {[AckTag | AckImm], SeqNo, UC, QMons};
- QPids -> {AckImm, SeqNo + 1,
- dtree:insert(SeqNo, QPids, AckTag, UC),
- pmon:monitor_all(QPids, QMons)}
- end
- end, {[], SeqNo0, UC0, QMons0}, BQS),
- {_Guids, BQS2} = BQ:ack(AckImm1, BQS1),
- {Res, State#q{publish_seqno = SeqNo1,
- unconfirmed = UC1,
- queue_monitors = QMons1,
- backing_queue_state = BQS2}}.
-
-dead_letter_publish(Msg, Reason, X, RK, MsgSeqNo, QName) ->
+ {Res, Acks1, BQS1} =
+ Fun(fun (Msg, AckTag, Acks) ->
+ dead_letter_publish(Msg, Reason, X, RK, QName),
+ [AckTag | Acks]
+ end, [], BQS),
+ {_Guids, BQS2} = BQ:ack(Acks1, BQS1),
+ {Res, State#q{backing_queue_state = BQS2}}.
+
+dead_letter_publish(Msg, Reason, X, RK, QName) ->
DLMsg = make_dead_letter_msg(Msg, Reason, X#exchange.name, RK, QName),
- Delivery = rabbit_basic:delivery(false, DLMsg, MsgSeqNo),
+ Delivery = rabbit_basic:delivery(false, DLMsg, undefined),
{Queues, Cycles} = detect_dead_letter_cycles(
DLMsg, rabbit_exchange:route(X, Delivery)),
lists:foreach(fun log_cycle_once/1, Cycles),
- {_, DeliveredQPids} = rabbit_amqqueue:deliver(
- rabbit_amqqueue:lookup(Queues), Delivery),
- DeliveredQPids.
-
-handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons,
- unconfirmed = UC}) ->
- case pmon:is_monitored(QPid, QMons) of
- false -> noreply(State);
- true -> case rabbit_misc:is_abnormal_exit(Reason) of
- true -> {Lost, _UC1} = dtree:take_all(QPid, UC),
- QNameS = rabbit_misc:rs(qname(State)),
- rabbit_log:warning("DLQ ~p for ~s died with "
- "~p unconfirmed messages~n",
- [QPid, QNameS, length(Lost)]);
- false -> ok
- end,
- {MsgSeqNoAckTags, UC1} = dtree:take(QPid, UC),
- cleanup_after_confirm(
- [AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags],
- State#q{queue_monitors = pmon:erase(QPid, QMons),
- unconfirmed = UC1})
- end.
+ rabbit_amqqueue:deliver( rabbit_amqqueue:lookup(Queues), Delivery),
+ ok.
-stop(State) -> stop(undefined, noreply, State).
+stop(State) -> stop(noreply, State).
-stop(From, Reply, State = #q{unconfirmed = UC}) ->
- case {dtree:is_empty(UC), Reply} of
- {true, noreply} -> {stop, normal, State};
- {true, _} -> {stop, normal, Reply, State};
- {false, _} -> noreply(State#q{delayed_stop = {From, Reply}})
- end.
+stop(noreply, State) -> {stop, normal, State};
+stop(Reply, State) -> {stop, normal, Reply, State}.
-cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS,
- unconfirmed = UC,
- backing_queue = BQ,
- backing_queue_state = BQS}) ->
- {_Guids, BQS1} = BQ:ack(AckTags, BQS),
- State1 = State#q{backing_queue_state = BQS1},
- case dtree:is_empty(UC) andalso DS =/= undefined of
- true -> case DS of
- {_, noreply} -> ok;
- {From, Reply} -> gen_server2:reply(From, Reply)
- end,
- {stop, normal, State1};
- false -> noreply(State1)
- end.
detect_dead_letter_cycles(#basic_message{content = Content}, Queues) ->
#content{properties = #'P_basic'{headers = Headers}} =
@@ -1008,7 +988,7 @@ emit_consumer_deleted(ChPid, ConsumerTag, QName) ->
%%----------------------------------------------------------------------------
-prioritise_call(Msg, _From, _State) ->
+prioritise_call(Msg, _From, _Len, _State) ->
case Msg of
info -> 9;
{info, _Items} -> 9;
@@ -1017,7 +997,7 @@ prioritise_call(Msg, _From, _State) ->
_ -> 0
end.
-prioritise_cast(Msg, _State) ->
+prioritise_cast(Msg, _Len, _State) ->
case Msg of
delete_immediately -> 8;
{set_ram_duration_target, _Duration} -> 8;
@@ -1026,7 +1006,7 @@ prioritise_cast(Msg, _State) ->
_ -> 0
end.
-prioritise_info(Msg, #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
+prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
case Msg of
{'DOWN', _, process, DownPid, _} -> 8;
update_ram_duration -> 8;
@@ -1037,9 +1017,6 @@ prioritise_info(Msg, #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
_ -> 0
end.
-handle_call(_, _, State = #q{delayed_stop = DS}) when DS =/= undefined ->
- noreply(State);
-
handle_call({init, Recover}, From,
State = #q{q = #amqqueue{exclusive_owner = none}}) ->
declare(Recover, From, State);
@@ -1079,16 +1056,15 @@ handle_call({deliver, Delivery, Delivered}, From, State) ->
gen_server2:reply(From, ok),
noreply(deliver_or_enqueue(Delivery, Delivered, State));
-handle_call({notify_down, ChPid}, From, State) ->
+handle_call({notify_down, ChPid}, _From, State) ->
%% we want to do this synchronously, so that auto_deleted queues
%% are no longer visible by the time we send a response to the
%% client. The queue is ultimately deleted in terminate/2; if we
%% return stop with a reply, terminate/2 will be called by
- %% gen_server2 *before* the reply is sent. FIXME: in case of a
- %% delayed stop the reply is sent earlier.
+ %% gen_server2 *before* the reply is sent.
case handle_ch_down(ChPid, State) of
{ok, State1} -> reply(ok, State1);
- {stop, State1} -> stop(From, ok, State1)
+ {stop, State1} -> stop(ok, State1)
end;
handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From,
@@ -1113,7 +1089,7 @@ handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From,
end;
handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
- ConsumerTag, ExclusiveConsume, OkMsg},
+ ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg},
_From, State = #q{exclusive_consumer = Holder}) ->
case check_exclusive_access(Holder, ExclusiveConsume, State) of
in_use ->
@@ -1125,8 +1101,17 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
true -> rabbit_limiter:activate(Limiter);
false -> Limiter
end,
- update_ch_record(C#cr{consumer_count = Count + 1,
- limiter = Limiter1}),
+ Limiter2 = case CreditArgs of
+ none -> Limiter1;
+ {Crd, Drain} -> rabbit_limiter:credit(
+ Limiter1, ConsumerTag, Crd, Drain)
+ end,
+ C1 = update_ch_record(C#cr{consumer_count = Count + 1,
+ limiter = Limiter2}),
+ case is_empty(State) of
+ true -> send_drained(C1);
+ false -> ok
+ end,
Consumer = #consumer{tag = ConsumerTag,
ack_required = not NoAck},
ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag};
@@ -1141,7 +1126,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
reply(ok, run_message_queue(State1#q{active_consumers = AC1}))
end;
-handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From,
+handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
State = #q{exclusive_consumer = Holder}) ->
ok = maybe_send_reply(ChPid, OkMsg),
case lookup_ch(ChPid) of
@@ -1156,8 +1141,9 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From,
1 -> rabbit_limiter:deactivate(Limiter);
_ -> Limiter
end,
+ Limiter2 = rabbit_limiter:forget_consumer(Limiter1, ConsumerTag),
update_ch_record(C#cr{consumer_count = Count - 1,
- limiter = Limiter1,
+ limiter = Limiter2,
blocked_consumers = Blocked1}),
State1 = State#q{
exclusive_consumer = case Holder of
@@ -1169,7 +1155,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From,
State#q.active_consumers)},
case should_auto_delete(State1) of
false -> reply(ok, ensure_expiry_timer(State1));
- true -> stop(From, ok, State1)
+ true -> stop(ok, State1)
end
end;
@@ -1178,20 +1164,21 @@ handle_call(stat, _From, State) ->
ensure_expiry_timer(State),
reply({ok, BQ:len(BQS), consumer_count()}, State1);
-handle_call({delete, IfUnused, IfEmpty}, From,
+handle_call({delete, IfUnused, IfEmpty}, _From,
State = #q{backing_queue_state = BQS, backing_queue = BQ}) ->
IsEmpty = BQ:is_empty(BQS),
IsUnused = is_unused(State),
if
IfEmpty and not(IsEmpty) -> reply({error, not_empty}, State);
IfUnused and not(IsUnused) -> reply({error, in_use}, State);
- true -> stop(From, {ok, BQ:len(BQS)}, State)
+ true -> stop({ok, BQ:len(BQS)}, State)
end;
handle_call(purge, _From, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
{Count, BQS1} = BQ:purge(BQS),
- reply({ok, Count}, State#q{backing_queue_state = BQS1});
+ State1 = State#q{backing_queue_state = BQS1},
+ reply({ok, Count}, maybe_send_drained(Count =:= 0, State1));
handle_call({requeue, AckTags, ChPid}, From, State) ->
gen_server2:reply(From, ok),
@@ -1239,19 +1226,6 @@ handle_call(force_event_refresh, _From,
end,
reply(ok, State).
-handle_cast({confirm, MsgSeqNos, QPid}, State = #q{unconfirmed = UC}) ->
- {MsgSeqNoAckTags, UC1} = dtree:take(MsgSeqNos, QPid, UC),
- State1 = case dtree:is_defined(QPid, UC1) of
- false -> QMons = State#q.queue_monitors,
- State#q{queue_monitors = pmon:demonitor(QPid, QMons)};
- true -> State
- end,
- cleanup_after_confirm([AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags],
- State1#q{unconfirmed = UC1});
-
-handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined ->
- noreply(State);
-
handle_cast({run_backing_queue, Mod, Fun},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
noreply(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)});
@@ -1337,18 +1311,27 @@ handle_cast(stop_mirroring, State = #q{backing_queue = BQ,
noreply(State#q{backing_queue = BQ1,
backing_queue_state = BQS1});
+handle_cast({credit, ChPid, CTag, Credit, Drain},
+ State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ Len = BQ:len(BQS),
+ rabbit_channel:send_credit_reply(ChPid, Len),
+ C = #cr{limiter = Limiter} = lookup_ch(ChPid),
+ C1 = C#cr{limiter = rabbit_limiter:credit(Limiter, CTag, Credit, Drain)},
+ noreply(case Drain andalso Len == 0 of
+ true -> update_ch_record(C1),
+ send_drained(C1),
+ State;
+ false -> case is_ch_blocked(C1) of
+ true -> update_ch_record(C1),
+ State;
+ false -> unblock(State, C1)
+ end
+ end);
+
handle_cast(wake_up, State) ->
noreply(State).
-%% We need to not ignore this as we need to remove outstanding
-%% confirms due to queue death.
-handle_info({'DOWN', _MonitorRef, process, DownPid, Reason},
- State = #q{delayed_stop = DS}) when DS =/= undefined ->
- handle_queue_down(DownPid, Reason, State);
-
-handle_info(_, State = #q{delayed_stop = DS}) when DS =/= undefined ->
- noreply(State);
-
handle_info(maybe_expire, State) ->
case is_unused(State) of
true -> stop(State);
@@ -1356,7 +1339,9 @@ handle_info(maybe_expire, State) ->
end;
handle_info(drop_expired, State) ->
- noreply(drop_expired_msgs(State#q{ttl_timer_ref = undefined}));
+ WasEmpty = is_empty(State),
+ State1 = drop_expired_msgs(State#q{ttl_timer_ref = undefined}),
+ noreply(maybe_send_drained(WasEmpty, State1));
handle_info(emit_stats, State) ->
emit_stats(State),
@@ -1375,9 +1360,9 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
%% unexpectedly.
stop(State);
-handle_info({'DOWN', _MonitorRef, process, DownPid, Reason}, State) ->
+handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
case handle_ch_down(DownPid, State) of
- {ok, State1} -> handle_queue_down(DownPid, Reason, State1);
+ {ok, State1} -> noreply(State1);
{stop, State1} -> stop(State1)
end;
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index e9f69b62..52c6140e 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -21,14 +21,15 @@
-behaviour(gen_server2).
-export([start_link/11, do/2, do/3, do_flow/3, flush/1, shutdown/1]).
--export([send_command/2, deliver/4, flushed/2]).
+-export([send_command/2, deliver/4, send_credit_reply/2, send_drained/2,
+ flushed/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
-export([refresh_config_local/0, ready_for_close/1]).
-export([force_event_refresh/0]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
- handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
- prioritise_cast/2, prioritise_info/2, format_message_queue/2]).
+ handle_info/2, handle_pre_hibernate/1, prioritise_call/4,
+ prioritise_cast/3, prioritise_info/3, format_message_queue/2]).
%% Internal
-export([list_local/0]).
@@ -94,6 +95,9 @@
-spec(deliver/4 ::
(pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg())
-> 'ok').
+-spec(send_credit_reply/2 :: (pid(), non_neg_integer()) -> 'ok').
+-spec(send_drained/2 :: (pid(), [{rabbit_types:ctag(), non_neg_integer()}])
+ -> 'ok').
-spec(flushed/2 :: (pid(), pid()) -> 'ok').
-spec(list/0 :: () -> [pid()]).
-spec(list_local/0 :: () -> [pid()]).
@@ -138,6 +142,12 @@ send_command(Pid, Msg) ->
deliver(Pid, ConsumerTag, AckRequired, Msg) ->
gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}).
+send_credit_reply(Pid, Len) ->
+ gen_server2:cast(Pid, {send_credit_reply, Len}).
+
+send_drained(Pid, CTagCredit) ->
+ gen_server2:cast(Pid, {send_drained, CTagCredit}).
+
flushed(Pid, QPid) ->
gen_server2:cast(Pid, {flushed, QPid}).
@@ -217,20 +227,20 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
{ok, State1, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-prioritise_call(Msg, _From, _State) ->
+prioritise_call(Msg, _From, _Len, _State) ->
case Msg of
info -> 9;
{info, _Items} -> 9;
_ -> 0
end.
-prioritise_cast(Msg, _State) ->
+prioritise_cast(Msg, _Len, _State) ->
case Msg of
{confirm, _MsgSeqNos, _QPid} -> 5;
_ -> 0
end.
-prioritise_info(Msg, _State) ->
+prioritise_info(Msg, _Len, _State) ->
case Msg of
emit_stats -> 7;
_ -> 0
@@ -315,6 +325,18 @@ handle_cast({deliver, ConsumerTag, AckRequired,
Content),
noreply(record_sent(ConsumerTag, AckRequired, Msg, State));
+handle_cast({send_credit_reply, Len}, State = #ch{writer_pid = WriterPid}) ->
+ ok = rabbit_writer:send_command(
+ WriterPid, #'basic.credit_ok'{available = Len}),
+ noreply(State);
+
+handle_cast({send_drained, CTagCredit}, State = #ch{writer_pid = WriterPid}) ->
+ [ok = rabbit_writer:send_command(
+ WriterPid, #'basic.credit_drained'{consumer_tag = ConsumerTag,
+ credit_drained = CreditDrained})
+ || {ConsumerTag, CreditDrained} <- CTagCredit],
+ noreply(State);
+
handle_cast(force_event_refresh, State) ->
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
noreply(State);
@@ -711,7 +733,8 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
no_local = _, % FIXME: implement
no_ack = NoAck,
exclusive = ExclusiveConsume,
- nowait = NoWait},
+ nowait = NoWait,
+ arguments = Arguments},
_, State = #ch{conn_pid = ConnPid,
limiter = Limiter,
consumer_mapping = ConsumerMapping}) ->
@@ -737,6 +760,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
rabbit_limiter:pid(Limiter),
rabbit_limiter:is_active(Limiter),
ActualConsumerTag, ExclusiveConsume,
+ parse_credit_args(Arguments),
ok_msg(NoWait, #'basic.consume_ok'{
consumer_tag = ActualConsumerTag})),
Q}
@@ -1112,6 +1136,17 @@ handle_method(#'channel.flow'{active = false}, _,
State1#ch{blocking = sets:from_list(QPids)})}
end;
+handle_method(#'basic.credit'{consumer_tag = CTag,
+ credit = Credit,
+ drain = Drain}, _,
+ State = #ch{consumer_mapping = Consumers}) ->
+ case dict:find(CTag, Consumers) of
+ {ok, Q} -> ok = rabbit_amqqueue:credit(
+ Q, self(), CTag, Credit, Drain),
+ {noreply, State};
+ error -> precondition_failed("unknown consumer tag '~s'", [CTag])
+ end;
+
handle_method(_MethodRecord, _Content, _State) ->
rabbit_misc:protocol_error(
command_invalid, "unimplemented method", []).
@@ -1178,6 +1213,16 @@ handle_consuming_queue_down(QPid,
handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) ->
State#ch{delivering_queues = sets:del_element(QPid, DQ)}.
+parse_credit_args(Arguments) ->
+ case rabbit_misc:table_lookup(Arguments, <<"x-credit">>) of
+ {table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>),
+ rabbit_misc:table_lookup(T, <<"drain">>)} of
+ {{long, Credit}, {boolean, Drain}} -> {Credit, Drain};
+ _ -> none
+ end;
+ undefined -> none
+ end.
+
binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
RoutingKey, Arguments, ReturnMethod, NoWait,
State = #ch{virtual_host = VHostPath,
diff --git a/src/rabbit_client_sup.erl b/src/rabbit_client_sup.erl
index 91798a0f..16a640ec 100644
--- a/src/rabbit_client_sup.erl
+++ b/src/rabbit_client_sup.erl
@@ -18,7 +18,7 @@
-behaviour(supervisor2).
--export([start_link/1, start_link/2]).
+-export([start_link/1, start_link/2, start_link_worker/2]).
-export([init/1]).
@@ -32,6 +32,8 @@
rabbit_types:ok_pid_or_error()).
-spec(start_link/2 :: ({'local', atom()}, rabbit_types:mfargs()) ->
rabbit_types:ok_pid_or_error()).
+-spec(start_link_worker/2 :: ({'local', atom()}, rabbit_types:mfargs()) ->
+ rabbit_types:ok_pid_or_error()).
-endif.
@@ -43,6 +45,13 @@ start_link(Callback) ->
start_link(SupName, Callback) ->
supervisor2:start_link(SupName, ?MODULE, Callback).
+start_link_worker(SupName, Callback) ->
+ supervisor2:start_link(SupName, ?MODULE, {Callback, worker}).
+
init({M,F,A}) ->
{ok, {{simple_one_for_one, 0, 1},
- [{client, {M,F,A}, temporary, infinity, supervisor, [M]}]}}.
+ [{client, {M,F,A}, temporary, infinity, supervisor, [M]}]}};
+init({{M,F,A}, worker}) ->
+ {ok, {{simple_one_for_one, 0, 1},
+ [{client, {M,F,A}, temporary, ?MAX_WAIT, worker, [M]}]}}.
+
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 430c2716..d9f1170e 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -16,17 +16,18 @@
%% The purpose of the limiter is to stem the flow of messages from
%% queues to channels, in order to act upon various protocol-level
-%% flow control mechanisms, specifically AMQP's basic.qos
-%% prefetch_count and channel.flow.
+%% flow control mechanisms, specifically AMQP 0-9-1's basic.qos
+%% prefetch_count and channel.flow, and AMQP 1.0's link (aka consumer)
+%% credit mechanism.
%%
%% Each channel has an associated limiter process, created with
%% start_link/1, which it passes to queues on consumer creation with
-%% rabbit_amqqueue:basic_consume/8, and rabbit_amqqueue:basic_get/4.
+%% rabbit_amqqueue:basic_consume/9, and rabbit_amqqueue:basic_get/4.
%% The latter isn't strictly necessary, since basic.get is not
%% subject to limiting, but it means that whenever a queue knows about
%% a channel, it also knows about its limiter, which is less fiddly.
%%
-%% Th limiter process holds state that is, in effect, shared between
+%% The limiter process holds state that is, in effect, shared between
%% the channel and all queues from which the channel is
%% consuming. Essentially all these queues are competing for access to
%% a single, limited resource - the ability to deliver messages via
@@ -54,33 +55,46 @@
%% inactive. In practice it is rare for that to happen, though we
%% could optimise this case in the future.
%%
+%% In addition, the consumer credit bookkeeping is local to queues, so
+%% it is not necessary to store information about it in the limiter
+%% process. But for abstraction we hide it from the queue behind the
+%% limiter API, and it therefore becomes part of the queue local
+%% state.
+%%
%% The interactions with the limiter are as follows:
%%
%% 1. Channels tell the limiter about basic.qos prefetch counts -
%% that's what the limit_prefetch/3, unlimit_prefetch/1,
%% is_prefetch_limited/1, get_prefetch_limit/1 API functions are
%% about - and channel.flow blocking - that's what block/1,
-%% unblock/1 and is_blocked/1 are for.
+%% unblock/1 and is_blocked/1 are for. They also tell the limiter
+%% queue state (via the queue) about consumer credit changes -
+%% that's what credit/4 is for.
+%%
+%% 2. Queues also tell the limiter queue state about the queue
+%% becoming empty (via drained/1) and consumers leaving (via
+%% forget_consumer/2).
%%
-%% 2. Queues register with the limiter - this happens as part of
+%% 3. Queues register with the limiter - this happens as part of
%% activate/1.
%%
%% 4. The limiter process maintains an internal counter of 'messages
%% sent but not yet acknowledged', called the 'volume'.
%%
-%% 5. Queues ask the limiter for permission (with can_send/2) whenever
+%% 5. Queues ask the limiter for permission (with can_send/3) whenever
%% they want to deliver a message to a channel. The limiter checks
-%% whether a) the channel isn't blocked by channel.flow, and b) the
-%% volume has not yet reached the prefetch limit. If so it
-%% increments the volume and tells the queue to proceed. Otherwise
-%% it marks the queue as requiring notification (see below) and
-%% tells the queue not to proceed.
+%% whether a) the channel isn't blocked by channel.flow, b) the
+%% volume has not yet reached the prefetch limit, and c) whether
+%% the consumer has enough credit. If so it increments the volume
+%% and tells the queue to proceed. Otherwise it marks the queue as
+%% requiring notification (see below) and tells the queue not to
+%% proceed.
%%
-%% 6. A queue that has told to proceed (by the return value of
-%% can_send/2) sends the message to the channel. Conversely, a
+%% 6. A queue that has been told to proceed (by the return value of
+%% can_send/3) sends the message to the channel. Conversely, a
%% queue that has been told not to proceed, will not attempt to
%% deliver that message, or any future messages, to the
-%% channel. This is accomplished by can_send/2 capturing the
+%% channel. This is accomplished by can_send/3 capturing the
%% outcome in the local state, where it can be accessed with
%% is_suspended/1.
%%
@@ -88,14 +102,14 @@
%% how many messages were ack'ed. The limiter process decrements
%% the volume and if it falls below the prefetch_count then it
%% notifies (through rabbit_amqqueue:resume/2) all the queues
-%% requiring notification, i.e. all those that had a can_send/2
+%% requiring notification, i.e. all those that had a can_send/3
%% request denied.
%%
%% 8. Upon receipt of such a notification, queues resume delivery to
%% the channel, i.e. they will once again start asking limiter, as
%% described in (5).
%%
-%% 9. When a queues has no more consumers associated with a particular
+%% 9. When a queue has no more consumers associated with a particular
%% channel, it deactivates use of the limiter with deactivate/1,
%% which alters the local state such that no further interactions
%% with the limiter process take place until a subsequent
@@ -111,16 +125,17 @@
is_prefetch_limited/1, is_blocked/1, is_active/1,
get_prefetch_limit/1, ack/2, pid/1]).
%% queue API
--export([client/1, activate/1, can_send/2, resume/1, deactivate/1,
- is_suspended/1]).
+-export([client/1, activate/1, can_send/3, resume/1, deactivate/1,
+ is_suspended/1, is_consumer_blocked/2, credit/4, drained/1,
+ forget_consumer/2]).
%% callbacks
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
- handle_info/2, prioritise_call/3]).
+ handle_info/2, prioritise_call/4]).
%%----------------------------------------------------------------------------
-record(lstate, {pid, prefetch_limited, blocked}).
--record(qstate, {pid, state}).
+-record(qstate, {pid, state, credits}).
-ifdef(use_specs).
@@ -147,11 +162,17 @@
-spec(client/1 :: (pid()) -> qstate()).
-spec(activate/1 :: (qstate()) -> qstate()).
--spec(can_send/2 :: (qstate(), boolean()) ->
+-spec(can_send/3 :: (qstate(), boolean(), rabbit_types:ctag()) ->
{'continue' | 'suspend', qstate()}).
-spec(resume/1 :: (qstate()) -> qstate()).
-spec(deactivate/1 :: (qstate()) -> qstate()).
-spec(is_suspended/1 :: (qstate()) -> boolean()).
+-spec(is_consumer_blocked/2 :: (qstate(), rabbit_types:ctag()) -> boolean()).
+-spec(credit/4 :: (qstate(), rabbit_types:ctag(), non_neg_integer(), boolean())
+ -> qstate()).
+-spec(drained/1 :: (qstate())
+ -> {[{rabbit_types:ctag(), non_neg_integer()}], qstate()}).
+-spec(forget_consumer/2 :: (qstate(), rabbit_types:ctag()) -> qstate()).
-endif.
@@ -166,6 +187,8 @@
%% notified of a change in the limit or volume that may allow it to
%% deliver more messages via the limiter's channel.
+-record(credit, {credit = 0, drain = false}).
+
%%----------------------------------------------------------------------------
%% API
%%----------------------------------------------------------------------------
@@ -208,23 +231,29 @@ ack(L, AckCount) -> gen_server:cast(L#lstate.pid, {ack, AckCount}).
pid(#lstate{pid = Pid}) -> Pid.
-client(Pid) -> #qstate{pid = Pid, state = dormant}.
+client(Pid) -> #qstate{pid = Pid, state = dormant, credits = gb_trees:empty()}.
activate(L = #qstate{state = dormant}) ->
ok = gen_server:cast(L#qstate.pid, {register, self()}),
L#qstate{state = active};
activate(L) -> L.
-can_send(L = #qstate{state = active}, AckRequired) ->
+can_send(L = #qstate{pid = Pid, state = State, credits = Credits},
+ AckRequired, CTag) ->
+ case is_consumer_blocked(L, CTag) of
+ false -> case (State =/= active orelse
+ safe_call(Pid, {can_send, self(), AckRequired}, true)) of
+ true -> {continue, L#qstate{
+ credits = record_send_q(CTag, Credits)}};
+ false -> {suspend, L#qstate{state = suspended}}
+ end;
+ true -> {suspend, L}
+ end.
+
+safe_call(Pid, Msg, ExitValue) ->
rabbit_misc:with_exit_handler(
- fun () -> {continue, L} end,
- fun () -> Msg = {can_send, self(), AckRequired},
- case gen_server2:call(L#qstate.pid, Msg, infinity) of
- true -> {continue, L};
- false -> {suspend, L#qstate{state = suspended}}
- end
- end);
-can_send(L, _AckRequired) -> {continue, L}.
+ fun () -> ExitValue end,
+ fun () -> gen_server2:call(Pid, Msg, infinity) end).
resume(L) -> L#qstate{state = active}.
@@ -236,14 +265,61 @@ deactivate(L) ->
is_suspended(#qstate{state = suspended}) -> true;
is_suspended(#qstate{}) -> false.
+is_consumer_blocked(#qstate{credits = Credits}, CTag) ->
+ case gb_trees:lookup(CTag, Credits) of
+ {value, #credit{credit = C}} when C > 0 -> false;
+ {value, #credit{}} -> true;
+ none -> false
+ end.
+
+credit(Limiter = #qstate{credits = Credits}, CTag, Credit, Drain) ->
+ Limiter#qstate{credits = update_credit(CTag, Credit, Drain, Credits)}.
+
+drained(Limiter = #qstate{credits = Credits}) ->
+ {CTagCredits, Credits2} =
+ rabbit_misc:gb_trees_fold(
+ fun (CTag, #credit{credit = C, drain = true}, {Acc, Creds0}) ->
+ {[{CTag, C} | Acc], update_credit(CTag, 0, false, Creds0)};
+ (_CTag, #credit{credit = _C, drain = false}, {Acc, Creds0}) ->
+ {Acc, Creds0}
+ end, {[], Credits}, Credits),
+ {CTagCredits, Limiter#qstate{credits = Credits2}}.
+
+forget_consumer(Limiter = #qstate{credits = Credits}, CTag) ->
+ Limiter#qstate{credits = gb_trees:delete_any(CTag, Credits)}.
+
+%%----------------------------------------------------------------------------
+%% Queue-local code
+%%----------------------------------------------------------------------------
+
+%% We want to do all the AMQP 1.0-ish link level credit calculations
+%% in the queue (to do them elsewhere introduces a ton of
+%% races). However, it's a big chunk of code that is conceptually very
+%% linked to the limiter concept. So we get the queue to hold a bit of
+%% state for us (#qstate.credits), and maintain a fiction that the
+%% limiter is making the decisions...
+
+record_send_q(CTag, Credits) ->
+ case gb_trees:lookup(CTag, Credits) of
+ {value, #credit{credit = Credit, drain = Drain}} ->
+ update_credit(CTag, Credit - 1, Drain, Credits);
+ none ->
+ Credits
+ end.
+
+update_credit(CTag, Credit, Drain, Credits) ->
+ %% Using up all credit implies no need to send a 'drained' event
+ Drain1 = Drain andalso Credit > 0,
+ gb_trees:enter(CTag, #credit{credit = Credit, drain = Drain1}, Credits).
+
%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------
init([]) -> {ok, #lim{}}.
-prioritise_call(get_prefetch_limit, _From, _State) -> 9;
-prioritise_call(_Msg, _From, _State) -> 0.
+prioritise_call(get_prefetch_limit, _From, _Len, _State) -> 9;
+prioritise_call(_Msg, _From, _Len, _State) -> 0.
handle_call({new, ChPid}, _From, State = #lim{ch_pid = undefined}) ->
{reply, ok, State#lim{ch_pid = ChPid}};
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index b435e0f3..22edfcb6 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -27,8 +27,8 @@
-export([start_link/1, set_maximum_since_use/2, info/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
- code_change/3, handle_pre_hibernate/1, prioritise_call/3,
- prioritise_cast/2, prioritise_info/2, format_message_queue/2]).
+ code_change/3, handle_pre_hibernate/1, prioritise_call/4,
+ prioritise_cast/3, prioritise_info/3, format_message_queue/2]).
-export([joined/2, members_changed/3, handle_msg/3]).
@@ -323,14 +323,14 @@ handle_pre_hibernate(State = #state { backing_queue = BQ,
BQS3 = BQ:handle_pre_hibernate(BQS2),
{hibernate, stop_rate_timer(State #state { backing_queue_state = BQS3 })}.
-prioritise_call(Msg, _From, _State) ->
+prioritise_call(Msg, _From, _Len, _State) ->
case Msg of
info -> 9;
{gm_deaths, _Deaths} -> 5;
_ -> 0
end.
-prioritise_cast(Msg, _State) ->
+prioritise_cast(Msg, _Len, _State) ->
case Msg of
{set_ram_duration_target, _Duration} -> 8;
{set_maximum_since_use, _Age} -> 8;
@@ -339,7 +339,7 @@ prioritise_cast(Msg, _State) ->
_ -> 0
end.
-prioritise_info(Msg, _State) ->
+prioritise_info(Msg, _Len, _State) ->
case Msg of
update_ram_duration -> 8;
sync_timeout -> 6;
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 13b40a48..c63321b5 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -29,8 +29,8 @@
-export([transform_dir/3, force_recovery/2]). %% upgrade
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
- code_change/3, prioritise_call/3, prioritise_cast/2,
- prioritise_info/2, format_message_queue/2]).
+ code_change/3, prioritise_call/4, prioritise_cast/3,
+ prioritise_info/3, format_message_queue/2]).
%%----------------------------------------------------------------------------
@@ -51,6 +51,9 @@
-define(HANDLE_CACHE_BUFFER_SIZE, 1048576). %% 1MB
+ %% i.e. two pairs, so GC does not go idle when busy
+-define(MAXIMUM_SIMULTANEOUS_GC_FILES, 4).
+
%%----------------------------------------------------------------------------
-record(msstate,
@@ -738,7 +741,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-prioritise_call(Msg, _From, _State) ->
+prioritise_call(Msg, _From, _Len, _State) ->
case Msg of
successfully_recovered_state -> 7;
{new_client_state, _Ref, _Pid, _MODC, _CloseFDsFun} -> 7;
@@ -746,7 +749,7 @@ prioritise_call(Msg, _From, _State) ->
_ -> 0
end.
-prioritise_cast(Msg, _State) ->
+prioritise_cast(Msg, _Len, _State) ->
case Msg of
{combine_files, _Source, _Destination, _Reclaimed} -> 8;
{delete_file, _File, _Reclaimed} -> 8;
@@ -755,7 +758,7 @@ prioritise_cast(Msg, _State) ->
_ -> 0
end.
-prioritise_info(Msg, _State) ->
+prioritise_info(Msg, _Len, _State) ->
case Msg of
sync -> 8;
_ -> 0
@@ -1728,10 +1731,12 @@ maybe_compact(State = #msstate { sum_valid_data = SumValid,
(SumFileSize - SumValid) / SumFileSize > ?GARBAGE_FRACTION ->
%% TODO: the algorithm here is sub-optimal - it may result in a
%% complete traversal of FileSummaryEts.
- case ets:first(FileSummaryEts) of
- '$end_of_table' ->
+ First = ets:first(FileSummaryEts),
+ case First =:= '$end_of_table' orelse
+ orddict:size(Pending) >= ?MAXIMUM_SIMULTANEOUS_GC_FILES of
+ true ->
State;
- First ->
+ false ->
case find_files_to_combine(FileSummaryEts, FileSizeLimit,
ets:lookup(FileSummaryEts, First)) of
not_found ->
diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl
index 3881de23..0dd7a7cc 100644
--- a/src/rabbit_msg_store_gc.erl
+++ b/src/rabbit_msg_store_gc.erl
@@ -23,7 +23,7 @@
-export([set_maximum_since_use/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3, prioritise_cast/2]).
+ terminate/2, code_change/3, prioritise_cast/3]).
-record(state,
{ pending_no_readers,
@@ -79,8 +79,8 @@ init([MsgStoreState]) ->
msg_store_state = MsgStoreState }, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-prioritise_cast({set_maximum_since_use, _Age}, _State) -> 8;
-prioritise_cast(_Msg, _State) -> 0.
+prioritise_cast({set_maximum_since_use, _Age}, _Len, _State) -> 8;
+prioritise_cast(_Msg, _Len, _State) -> 0.
handle_call(stop, _From, State) ->
{stop, normal, ok, State}.
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 3872f3df..fb74d4a3 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -32,8 +32,9 @@
-define(SERVER, ?MODULE).
-define(RABBIT_UP_RPC_TIMEOUT, 2000).
+-define(RABBIT_DOWN_PING_INTERVAL, 1000).
--record(state, {monitors, partitions, subscribers}).
+-record(state, {monitors, partitions, subscribers, down_ping_timer}).
%%----------------------------------------------------------------------------
@@ -272,10 +273,34 @@ handle_info({mnesia_system_event,
ordsets:add_element(Node, ordsets:from_list(Partitions))),
{noreply, State1#state{partitions = Partitions1}};
+handle_info(ping_nodes, State) ->
+ %% We ping nodes when some are down to ensure that we find out
+ %% about healed partitions quickly. We ping all nodes rather than
+ %% just the ones we know are down for simplicity; it's not expensive
+ %% to ping the nodes that are up, after all.
+ State1 = State#state{down_ping_timer = undefined},
+ Self = self(),
+ %% all_nodes_up() both pings all the nodes and tells us if we need to again.
+ %%
+ %% We ping in a separate process since in a partition it might
+ %% take some noticeable length of time and we don't want to block
+ %% the node monitor for that long.
+ spawn_link(fun () ->
+ case all_nodes_up() of
+ true -> ok;
+ false -> Self ! ping_again
+ end
+ end),
+ {noreply, State1};
+
+handle_info(ping_again, State) ->
+ {noreply, ensure_ping_timer(State)};
+
handle_info(_Info, State) ->
{noreply, State}.
-terminate(_Reason, _State) ->
+terminate(_Reason, State) ->
+ rabbit_misc:stop_timer(State, #state.down_ping_timer),
ok.
code_change(_OldVsn, State, _Extra) ->
@@ -309,17 +334,22 @@ handle_dead_rabbit(Node) ->
ok.
majority() ->
- length(alive_nodes()) / length(rabbit_mnesia:cluster_nodes(all)) > 0.5.
+ Nodes = rabbit_mnesia:cluster_nodes(all),
+ length(alive_nodes(Nodes)) / length(Nodes) > 0.5.
+
+all_nodes_up() ->
+ Nodes = rabbit_mnesia:cluster_nodes(all),
+ length(alive_nodes(Nodes)) =:= length(Nodes).
%% mnesia:system_info(db_nodes) (and hence
%% rabbit_mnesia:cluster_nodes(running)) does not give reliable results
%% when partitioned.
-alive_nodes() ->
- Nodes = rabbit_mnesia:cluster_nodes(all),
- [N || N <- Nodes, pong =:= net_adm:ping(N)].
+alive_nodes() -> alive_nodes(rabbit_mnesia:cluster_nodes(all)).
+
+alive_nodes(Nodes) -> [N || N <- Nodes, pong =:= net_adm:ping(N)].
alive_rabbit_nodes() ->
- [N || N <- alive_nodes(), rabbit_nodes:is_running(N, rabbit)].
+ [N || N <- alive_nodes(), rabbit_nodes:is_process_running(N, rabbit)].
await_cluster_recovery() ->
rabbit_log:warning("Cluster minority status detected - awaiting recovery~n",
@@ -339,7 +369,7 @@ await_cluster_recovery() ->
wait_for_cluster_recovery(Nodes) ->
case majority() of
true -> rabbit:start();
- false -> timer:sleep(1000),
+ false -> timer:sleep(?RABBIT_DOWN_PING_INTERVAL),
wait_for_cluster_recovery(Nodes)
end.
@@ -353,7 +383,11 @@ handle_dead_rabbit_state(State = #state{partitions = Partitions}) ->
[] -> [];
_ -> Partitions
end,
- State#state{partitions = Partitions1}.
+ ensure_ping_timer(State#state{partitions = Partitions1}).
+
+ensure_ping_timer(State) ->
+ rabbit_misc:ensure_timer(
+ State, #state.down_ping_timer, ?RABBIT_DOWN_PING_INTERVAL, ping_nodes).
handle_live_rabbit(Node) ->
ok = rabbit_alarm:on_node_up(Node),
diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl
index c92e5963..5640f12a 100644
--- a/src/rabbit_nodes.erl
+++ b/src/rabbit_nodes.erl
@@ -16,7 +16,8 @@
-module(rabbit_nodes).
--export([names/1, diagnostics/1, make/1, parts/1, cookie_hash/0, is_running/2]).
+-export([names/1, diagnostics/1, make/1, parts/1, cookie_hash/0,
+ is_running/2, is_process_running/2]).
-define(EPMD_TIMEOUT, 30000).
@@ -33,6 +34,7 @@
-spec(parts/1 :: (node() | string()) -> {string(), string()}).
-spec(cookie_hash/0 :: () -> string()).
-spec(is_running/2 :: (node(), atom()) -> boolean()).
+-spec(is_process_running/2 :: (node(), atom()) -> boolean()).
-endif.
@@ -98,3 +100,10 @@ is_running(Node, Application) ->
{badrpc, _} -> false;
Apps -> proplists:is_defined(Application, Apps)
end.
+
+is_process_running(Node, Process) ->
+ case rpc:call(Node, erlang, whereis, [Process]) of
+ {badrpc, _} -> false;
+ undefined -> false;
+ P when is_pid(P) -> true
+ end.
diff --git a/src/rabbit_ssl.erl b/src/rabbit_ssl.erl
index b1238623..96277b68 100644
--- a/src/rabbit_ssl.erl
+++ b/src/rabbit_ssl.erl
@@ -162,15 +162,16 @@ format_rdn(#'AttributeTypeAndValue'{type = T, value = V}) ->
{?'id-at-pseudonym' , "PSEUDONYM"},
{?'id-domainComponent' , "DC"},
{?'id-emailAddress' , "EMAILADDRESS"},
- {?'street-address' , "STREET"}],
+ {?'street-address' , "STREET"},
+ {{0,9,2342,19200300,100,1,1} , "UID"}], %% Not in public_key.hrl
case proplists:lookup(T, Fmts) of
{_, Fmt} ->
- io_lib:format(Fmt ++ "=~s", [FV]);
+ rabbit_misc:format(Fmt ++ "=~s", [FV]);
none when is_tuple(T) ->
- TypeL = [io_lib:format("~w", [X]) || X <- tuple_to_list(T)],
- io_lib:format("~s:~s", [string:join(TypeL, "."), FV]);
+ TypeL = [rabbit_misc:format("~w", [X]) || X <- tuple_to_list(T)],
+ rabbit_misc:format("~s=~s", [string:join(TypeL, "."), FV]);
none ->
- io_lib:format("~p:~s", [T, FV])
+ rabbit_misc:format("~p=~s", [T, FV])
end.
%% Escape a string as per RFC4514.
@@ -204,14 +205,26 @@ format_asn1_value({ST, S}) when ST =:= teletexString; ST =:= printableString;
format_directory_string(ST, S);
format_asn1_value({utcTime, [Y1, Y2, M1, M2, D1, D2, H1, H2,
Min1, Min2, S1, S2, $Z]}) ->
- io_lib:format("20~c~c-~c~c-~c~cT~c~c:~c~c:~c~cZ",
- [Y1, Y2, M1, M2, D1, D2, H1, H2, Min1, Min2, S1, S2]);
+ rabbit_misc:format("20~c~c-~c~c-~c~cT~c~c:~c~c:~c~cZ",
+ [Y1, Y2, M1, M2, D1, D2, H1, H2, Min1, Min2, S1, S2]);
%% We appear to get an untagged value back for an ia5string
%% (e.g. domainComponent).
format_asn1_value(V) when is_list(V) ->
V;
+format_asn1_value(V) when is_binary(V) ->
+ %% OTP does not decode some values when combined with an unknown
+ %% type. That's probably wrong, so as a last ditch effort let's
+ %% try manually decoding. 'DirectoryString' is semi-arbitrary -
+ %% but it is the type which covers the various string types we
+ %% handle below.
+ try
+ {ST, S} = public_key:der_decode('DirectoryString', V),
+ format_directory_string(ST, S)
+ catch _:_ ->
+ rabbit_misc:format("~p", [V])
+ end;
format_asn1_value(V) ->
- io_lib:format("~p", [V]).
+ rabbit_misc:format("~p", [V]).
%% DirectoryString { INTEGER : maxSize } ::= CHOICE {
%% teletexString TeletexString (SIZE (1..maxSize)),
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index b2c80364..e7b69879 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1107,7 +1107,7 @@ test_server_status() ->
rabbit_misc:r(<<"/">>, queue, Name),
false, false, [], none)]],
ok = rabbit_amqqueue:basic_consume(
- Q, true, Ch, Limiter, false, <<"ctag">>, true, undefined),
+ Q, true, Ch, Limiter, false, <<"ctag">>, true, none, undefined),
%% list queues
ok = info_action(list_queues, rabbit_amqqueue:info_keys(), true),
diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl
index 56e4b7b3..22b223d2 100644
--- a/src/worker_pool_worker.erl
+++ b/src/worker_pool_worker.erl
@@ -23,7 +23,7 @@
-export([set_maximum_since_use/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3, prioritise_cast/2]).
+ terminate/2, code_change/3, prioritise_cast/3]).
%%----------------------------------------------------------------------------
@@ -73,8 +73,8 @@ init([WId]) ->
{ok, WId, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-prioritise_cast({set_maximum_since_use, _Age}, _State) -> 8;
-prioritise_cast(_Msg, _State) -> 0.
+prioritise_cast({set_maximum_since_use, _Age}, _Len, _State) -> 8;
+prioritise_cast(_Msg, _Len, _State) -> 0.
handle_call({submit, Fun}, From, WId) ->
gen_server2:reply(From, run(Fun)),