summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-02-03 17:47:22 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-02-03 17:47:22 +0000
commit0c3770b5c1817203f2a027c6fa93baf63e62dcea (patch)
treecf6449dc860a3f9d9547040064dafa78fbcb9487
parente8c8f4cd095c8d144d1e3167d1c4e124451d004a (diff)
parent9458aae390d88ba90fe9bb11c433e40c84b48bbc (diff)
downloadrabbitmq-server-0c3770b5c1817203f2a027c6fa93baf63e62dcea.tar.gz
Merge bug24408 (again)
-rw-r--r--docs/rabbitmqctl.1.xml8
-rw-r--r--include/rabbit.hrl2
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec3
-rw-r--r--packaging/debs/Debian/debian/changelog6
-rw-r--r--src/rabbit_amqqueue.erl47
-rw-r--r--src/rabbit_amqqueue_process.erl63
-rw-r--r--src/rabbit_backing_queue.erl23
-rw-r--r--src/rabbit_backing_queue_qc.erl2
-rw-r--r--src/rabbit_channel.erl109
-rw-r--r--src/rabbit_channel_interceptor.erl29
-rw-r--r--src/rabbit_exchange_type_topic.erl25
-rw-r--r--src/rabbit_limiter.erl66
-rw-r--r--src/rabbit_mirror_queue_master.erl5
-rw-r--r--src/rabbit_mirror_queue_slave.erl2
-rw-r--r--src/rabbit_misc.erl9
-rw-r--r--src/rabbit_queue_consumers.erl56
-rw-r--r--src/rabbit_queue_index.erl111
-rw-r--r--src/rabbit_reader.erl3
-rw-r--r--src/rabbit_recovery_terms.erl121
-rw-r--r--src/rabbit_tests.erl89
-rw-r--r--src/rabbit_variable_queue.erl223
21 files changed, 538 insertions, 464 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index d2a3f7c7..d19acd00 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -1559,14 +1559,6 @@
<term>prefetch_count</term>
<listitem><para>QoS prefetch count limit in force, 0 if unlimited.</para></listitem>
</varlistentry>
- <varlistentry>
- <term>client_flow_blocked</term>
- <listitem><para>True if the client issued a
- <command>channel.flow{active=false}</command>
- command, blocking the server from delivering
- messages to the channel's consumers.
- </para></listitem>
- </varlistentry>
</variablelist>
<para>
If no <command>channelinfoitem</command>s are specified then pid,
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 6d117e3d..6f6f4244 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -60,7 +60,7 @@
-record(trie_node, {exchange_name, node_id}).
-record(trie_edge, {exchange_name, node_id, word}).
--record(trie_binding, {exchange_name, node_id, destination}).
+-record(trie_binding, {exchange_name, node_id, destination, arguments}).
-record(listener, {node, protocol, host, ip_address, port}).
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index f53eea95..a96ccb35 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -130,6 +130,9 @@ done
rm -rf %{buildroot}
%changelog
+* Thu Jan 23 2014 emile@rabbitmq.com 3.2.3-1
+- New Upstream Release
+
* Tue Dec 10 2013 emile@rabbitmq.com 3.2.2-1
- New Upstream Release
diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog
index f1e1c66b..7138409c 100644
--- a/packaging/debs/Debian/debian/changelog
+++ b/packaging/debs/Debian/debian/changelog
@@ -1,3 +1,9 @@
+rabbitmq-server (3.2.3-1) unstable; urgency=low
+
+ * New Upstream Release
+
+ -- Emile Joubert <emile@rabbitmq.com> Thu, 23 Jan 2014 14:46:37 +0000
+
rabbitmq-server (3.2.2-1) unstable; urgency=low
* New Upstream Release
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index bb3a544e..eeb0e0bf 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -26,8 +26,8 @@
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
-export([force_event_refresh/0, notify_policy_changed/1]).
-export([consumers/1, consumers_all/1, consumer_info_keys/0]).
--export([basic_get/4, basic_consume/10, basic_cancel/4, notify_decorators/1]).
--export([notify_sent/2, notify_sent_queue_down/1, resume/2, flush_all/2]).
+-export([basic_get/4, basic_consume/9, basic_cancel/4, notify_decorators/1]).
+-export([notify_sent/2, notify_sent_queue_down/1, resume/2]).
-export([notify_down_all/2, activate_limit_all/2, credit/5]).
-export([on_node_down/1]).
-export([update/2, store_queue/1, policy_changed/2]).
@@ -149,9 +149,9 @@
{'ok', non_neg_integer(), qmsg()} | 'empty').
-spec(credit/5 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(),
non_neg_integer(), boolean()) -> 'ok').
--spec(basic_consume/10 ::
+-spec(basic_consume/9 ::
(rabbit_types:amqqueue(), boolean(), pid(), pid(), boolean(),
- rabbit_types:ctag(), boolean(), {non_neg_integer(), boolean()} | 'none', any(), any())
+ rabbit_types:ctag(), boolean(), rabbit_framing:amqp_table(), any())
-> rabbit_types:ok_or_error('exclusive_consume_unavailable')).
-spec(basic_cancel/4 ::
(rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok').
@@ -159,7 +159,6 @@
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
-spec(notify_sent_queue_down/1 :: (pid()) -> 'ok').
-spec(resume/2 :: (pid(), pid()) -> 'ok').
--spec(flush_all/2 :: (qpids(), pid()) -> 'ok').
-spec(internal_delete/1 ::
(name()) -> rabbit_types:ok_or_error('not_found') |
rabbit_types:connection_exit() |
@@ -193,13 +192,18 @@ recover() ->
on_node_down(node()),
DurableQueues = find_durable_queues(),
{ok, BQ} = application:get_env(rabbit, backing_queue_module),
- ok = BQ:start([QName || #amqqueue{name = QName} <- DurableQueues]),
+
+ %% We rely on BQ:start/1 returning the recovery terms in the same
+ %% order as the supplied queue names, so that we can zip them together
+ %% for further processing in recover_durable_queues.
+ {ok, OrderedRecoveryTerms} =
+ BQ:start([QName || #amqqueue{name = QName} <- DurableQueues]),
{ok,_} = supervisor:start_child(
rabbit_sup,
{rabbit_amqqueue_sup,
{rabbit_amqqueue_sup, start_link, []},
transient, infinity, supervisor, [rabbit_amqqueue_sup]}),
- recover_durable_queues(DurableQueues).
+ recover_durable_queues(lists:zip(DurableQueues, OrderedRecoveryTerms)).
stop() ->
ok = supervisor:terminate_child(rabbit_sup, rabbit_amqqueue_sup),
@@ -227,10 +231,11 @@ find_durable_queues() ->
node(Pid) == Node]))
end).
-recover_durable_queues(DurableQueues) ->
- Qs = [start_queue_process(node(), Q) || Q <- DurableQueues],
- [Q || Q = #amqqueue{pid = Pid} <- Qs,
- gen_server2:call(Pid, {init, self()}, infinity) == {new, Q}].
+recover_durable_queues(QueuesAndRecoveryTerms) ->
+ Qs = [{start_queue_process(node(), Q), Terms} ||
+ {Q, Terms} <- QueuesAndRecoveryTerms],
+ [Q || {Q = #amqqueue{ pid = Pid }, Terms} <- Qs,
+ gen_server2:call(Pid, {init, {self(), Terms}}, infinity) == {new, Q}].
declare(QueueName, Durable, AutoDelete, Args, Owner) ->
ok = check_declare_arguments(QueueName, Args),
@@ -425,7 +430,7 @@ declare_args() ->
[{<<"x-expires">>, fun check_expires_arg/2},
{<<"x-message-ttl">>, fun check_message_ttl_arg/2},
{<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2},
- {<<"x-max-length">>, fun check_max_length_arg/2}].
+ {<<"x-max-length">>, fun check_non_neg_int_arg/2}].
consume_args() -> [{<<"x-priority">>, fun check_int_arg/2}].
@@ -435,7 +440,7 @@ check_int_arg({Type, _}, _) ->
false -> {error, {unacceptable_type, Type}}
end.
-check_max_length_arg({Type, Val}, Args) ->
+check_non_neg_int_arg({Type, Val}, Args) ->
case check_int_arg({Type, Val}, Args) of
ok when Val >= 0 -> ok;
ok -> {error, {value_negative, Val}};
@@ -548,8 +553,8 @@ requeue(QPid, MsgIds, ChPid) -> delegate:call(QPid, {requeue, MsgIds, ChPid}).
ack(QPid, MsgIds, ChPid) -> delegate:cast(QPid, {ack, MsgIds, ChPid}).
-reject(QPid, MsgIds, Requeue, ChPid) ->
- delegate:cast(QPid, {reject, MsgIds, Requeue, ChPid}).
+reject(QPid, Requeue, MsgIds, ChPid) ->
+ delegate:cast(QPid, {reject, Requeue, MsgIds, ChPid}).
notify_down_all(QPids, ChPid) ->
{_, Bads} = delegate:call(QPids, {notify_down, ChPid}),
@@ -570,13 +575,11 @@ credit(#amqqueue{pid = QPid}, ChPid, CTag, Credit, Drain) ->
basic_get(#amqqueue{pid = QPid}, ChPid, NoAck, LimiterPid) ->
delegate:call(QPid, {basic_get, ChPid, NoAck, LimiterPid}).
-basic_consume(#amqqueue{pid = QPid, name = QName}, NoAck, ChPid,
- LimiterPid, LimiterActive,
- ConsumerTag, ExclusiveConsume, CreditArgs, OtherArgs, OkMsg) ->
- ok = check_consume_arguments(QName, OtherArgs),
+basic_consume(#amqqueue{pid = QPid, name = QName}, NoAck, ChPid, LimiterPid,
+ LimiterActive, ConsumerTag, ExclusiveConsume, Args, OkMsg) ->
+ ok = check_consume_arguments(QName, Args),
delegate:call(QPid, {basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
- ConsumerTag, ExclusiveConsume, CreditArgs, OtherArgs,
- OkMsg}).
+ ConsumerTag, ExclusiveConsume, Args, OkMsg}).
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
delegate:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
@@ -603,8 +606,6 @@ notify_sent_queue_down(QPid) ->
resume(QPid, ChPid) -> delegate:cast(QPid, {resume, ChPid}).
-flush_all(QPids, ChPid) -> delegate:cast(QPids, {flush, ChPid}).
-
internal_delete1(QueueName) ->
ok = mnesia:delete({rabbit_queue, QueueName}),
%% this 'guarded' delete prevents unnecessary writes to the mnesia
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index ba00ff41..21b6089b 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -22,6 +22,7 @@
-define(SYNC_INTERVAL, 25). %% milliseconds
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
+-define(CONSUMER_BIAS_RATIO, 1.1). %% i.e. consume 10% faster
-export([start_link/1, info_keys/0]).
@@ -173,9 +174,10 @@ code_change(_OldVsn, State, _Extra) ->
declare(Recover, From, State = #q{q = Q,
backing_queue = undefined,
backing_queue_state = undefined}) ->
- case rabbit_amqqueue:internal_declare(Q, Recover =/= new) of
+ {Recovery, TermsOrNew} = recovery_status(Recover),
+ case rabbit_amqqueue:internal_declare(Q, Recovery /= new) of
#amqqueue{} = Q1 ->
- case matches(Recover, Q, Q1) of
+ case matches(Recovery, Q, Q1) of
true ->
gen_server2:reply(From, {new, Q}),
ok = file_handle_cache:register_callback(
@@ -184,8 +186,8 @@ declare(Recover, From, State = #q{q = Q,
self(), {rabbit_amqqueue,
set_ram_duration_target, [self()]}),
BQ = backing_queue_module(Q1),
- BQS = bq_init(BQ, Q, Recover),
- recovery_barrier(Recover),
+ BQS = bq_init(BQ, Q, TermsOrNew),
+ recovery_barrier(Recovery),
State1 = process_args_policy(
State#q{backing_queue = BQ,
backing_queue_state = BQS}),
@@ -202,6 +204,9 @@ declare(Recover, From, State = #q{q = Q,
{stop, normal, Err, State}
end.
+recovery_status(new) -> {new, new};
+recovery_status({Recover, Terms}) -> {Recover, Terms}.
+
matches(new, Q1, Q2) ->
%% i.e. not policy
Q1#amqqueue.name =:= Q2#amqqueue.name andalso
@@ -237,7 +242,7 @@ decorator_callback(QName, F, A) ->
bq_init(BQ, Q, Recover) ->
Self = self(),
- BQ:init(Q, Recover =/= new,
+ BQ:init(Q, Recover,
fun (Mod, Fun) ->
rabbit_amqqueue:run_backing_queue(Self, Mod, Fun)
end).
@@ -507,7 +512,7 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
backing_queue_state = BQS}) ->
send_mandatory(Delivery), %% must do this before confirms
{Confirm, State1} = send_or_record_confirm(Delivery, State),
- Props = message_properties(Message, Confirm, State),
+ Props = message_properties(Message, Confirm, State1),
{IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS),
State2 = State1#q{backing_queue_state = BQS1},
case IsDuplicate orelse attempt_delivery(Delivery, Props, Delivered,
@@ -829,24 +834,36 @@ emit_consumer_deleted(ChPid, ConsumerTag, QName) ->
%%----------------------------------------------------------------------------
-prioritise_call(Msg, _From, _Len, _State) ->
+prioritise_call(Msg, _From, _Len, State) ->
case Msg of
- info -> 9;
- {info, _Items} -> 9;
- consumers -> 9;
- stat -> 7;
- _ -> 0
+ info -> 9;
+ {info, _Items} -> 9;
+ consumers -> 9;
+ stat -> 7;
+ {basic_consume, _, _, _, _, _, _, _, _, _} -> consumer_bias(State);
+ {basic_cancel, _, _, _} -> consumer_bias(State);
+ _ -> 0
end.
-prioritise_cast(Msg, _Len, _State) ->
+prioritise_cast(Msg, _Len, State) ->
case Msg of
delete_immediately -> 8;
{set_ram_duration_target, _Duration} -> 8;
{set_maximum_since_use, _Age} -> 8;
{run_backing_queue, _Mod, _Fun} -> 6;
+ {ack, _AckTags, _ChPid} -> consumer_bias(State);
+ {notify_sent, _ChPid, _Credit} -> consumer_bias(State);
+ {resume, _ChPid} -> consumer_bias(State);
_ -> 0
end.
+consumer_bias(#q{backing_queue = BQ, backing_queue_state = BQS}) ->
+ case BQ:msg_rates(BQS) of
+ {0.0, _} -> 0;
+ {Ingress, Egress} when Egress / Ingress < ?CONSUMER_BIAS_RATIO -> 1;
+ {_, _} -> 0
+ end.
+
prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
case Msg of
{'DOWN', _, process, DownPid, _} -> 8;
@@ -865,7 +882,7 @@ handle_call({init, Recover}, From,
%% You used to be able to declare an exclusive durable queue. Sadly we
%% need to still tidy up after that case, there could be the remnants
%% of one left over from an upgrade. So that's why we don't enforce
-%% Recover = false here.
+%% Recover = new here.
handle_call({init, Recover}, From,
State = #q{q = #amqqueue{exclusive_owner = Owner}}) ->
case rabbit_misc:is_process_alive(Owner) of
@@ -876,7 +893,8 @@ handle_call({init, Recover}, From,
q = Q} = State,
gen_server2:reply(From, {owner_died, Q}),
BQ = backing_queue_module(Q),
- BQS = bq_init(BQ, Q, Recover),
+ {_, Terms} = recovery_status(Recover),
+ BQS = bq_init(BQ, Q, Terms),
%% Rely on terminate to delete the queue.
{stop, {shutdown, missing_owner},
State#q{backing_queue = BQ, backing_queue_state = BQS}}
@@ -924,7 +942,7 @@ handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From,
end;
handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
- ConsumerTag, ExclusiveConsume, CreditArgs, OtherArgs, OkMsg},
+ ConsumerTag, ExclusiveConsume, Args, OkMsg},
_From, State = #q{consumers = Consumers,
exclusive_consumer = Holder}) ->
case check_exclusive_access(Holder, ExclusiveConsume, State) of
@@ -932,8 +950,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
ok -> Consumers1 = rabbit_queue_consumers:add(
ChPid, ConsumerTag, NoAck,
LimiterPid, LimiterActive,
- CreditArgs, OtherArgs,
- is_empty(State), Consumers),
+ Args, is_empty(State), Consumers),
ExclusiveConsumer =
if ExclusiveConsume -> {ChPid, ConsumerTag};
true -> Holder
@@ -943,7 +960,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
exclusive_consumer = ExclusiveConsumer},
ok = maybe_send_reply(ChPid, OkMsg),
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
- not NoAck, qname(State1), OtherArgs),
+ not NoAck, qname(State1), Args),
notify_decorators(State1),
reply(ok, run_message_queue(State1))
end;
@@ -1057,10 +1074,10 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender}, Delivered, Flow},
handle_cast({ack, AckTags, ChPid}, State) ->
noreply(ack(AckTags, ChPid, State));
-handle_cast({reject, AckTags, true, ChPid}, State) ->
+handle_cast({reject, true, AckTags, ChPid}, State) ->
noreply(requeue(AckTags, ChPid, State));
-handle_cast({reject, AckTags, false, ChPid}, State) ->
+handle_cast({reject, false, AckTags, ChPid}, State) ->
noreply(with_dlx(
State#q.dlx,
fun (X) -> subtract_acks(ChPid, AckTags, State,
@@ -1085,10 +1102,6 @@ handle_cast({activate_limit, ChPid}, State) ->
noreply(possibly_unblock(rabbit_queue_consumers:activate_limit_fun(),
ChPid, State));
-handle_cast({flush, ChPid}, State) ->
- ok = rabbit_channel:flushed(ChPid, self()),
- noreply(State);
-
handle_cast({set_ram_duration_target, Duration},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
BQS1 = BQ:set_ram_duration_target(Duration, BQS),
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 61b504bc..3d88be7a 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -27,7 +27,8 @@
('empty' | {rabbit_types:basic_message(), boolean(), Ack})).
-type(drop_result(Ack) ::
('empty' | {rabbit_types:msg_id(), Ack})).
--type(attempt_recovery() :: boolean()).
+-type(recovery_terms() :: [term()] | 'non_clean_shutdown').
+-type(recovery_info() :: 'new' | recovery_terms()).
-type(purged_msg_count() :: non_neg_integer()).
-type(async_callback() ::
fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')).
@@ -40,7 +41,10 @@
%% aren't being started at this point, but this call allows the
%% backing queue to perform any checking necessary for the consistency
%% of those queues, or initialise any other shared resources.
--callback start([rabbit_amqqueue:name()]) -> 'ok'.
+%%
+%% The list of queue recovery terms returned as {ok, Terms} must be given
+%% in the same order as the list of queue names supplied.
+-callback start([rabbit_amqqueue:name()]) -> rabbit_types:ok(recovery_terms()).
%% Called to tear down any state/resources. NB: Implementations should
%% not depend on this function being called on shutdown and instead
@@ -51,15 +55,17 @@
%%
%% Takes
%% 1. the amqqueue record
-%% 2. a boolean indicating whether the queue is an existing queue that
-%% should be recovered
+%% 2. a term indicating whether the queue is an existing queue that
+%% should be recovered or not. When 'new' is given, no recovery is
+%% taking place, otherwise a list of recovery terms is given, or
+%% the atom 'non_clean_shutdown' if no recovery terms are available.
%% 3. an asynchronous callback which accepts a function of type
%% backing-queue-state to backing-queue-state. This callback
%% function can be safely invoked from any process, which makes it
%% useful for passing messages back into the backing queue,
%% especially as the backing queue does not have control of its own
%% mailbox.
--callback init(rabbit_types:amqqueue(), attempt_recovery(),
+-callback init(rabbit_types:amqqueue(), recovery_info(),
async_callback()) -> state().
%% Called on queue shutdown when queue isn't being deleted.
@@ -203,6 +209,10 @@
%% Called immediately before the queue hibernates.
-callback handle_pre_hibernate(state()) -> state().
+%% Used to help prioritisation in rabbit_amqqueue_process. The rate of
+%% inbound messages and outbound messages at the moment.
+-callback msg_rates(state()) -> {float(), float()}.
+
%% Exists for debugging purposes, to be able to expose state via
%% rabbitmqctl list_queues backing_queue_status
-callback status(state()) -> [{atom(), any()}].
@@ -230,7 +240,8 @@ behaviour_info(callbacks) ->
{fetch, 2}, {ack, 2}, {requeue, 2}, {ackfold, 4}, {fold, 3}, {len, 1},
{is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2},
{ram_duration, 1}, {needs_timeout, 1}, {timeout, 1},
- {handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2}] ;
+ {handle_pre_hibernate, 1}, {msg_rates, 1}, {status, 1},
+ {invoke, 3}, {is_duplicate, 2}] ;
behaviour_info(_Other) ->
undefined.
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl
index e2bc3247..b0545915 100644
--- a/src/rabbit_backing_queue_qc.erl
+++ b/src/rabbit_backing_queue_qc.erl
@@ -373,7 +373,7 @@ qc_default_exchange() ->
qc_variable_queue_init(Q) ->
{call, ?BQMOD, init,
- [Q, false, function(2, ok)]}.
+ [Q, new, function(2, {ok, []})]}.
qc_test_q() -> {call, rabbit_misc, r, [<<"/">>, queue, noshrink(binary(16))]}.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index e7f8b290..3cfd7fd8 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -21,8 +21,7 @@
-behaviour(gen_server2).
-export([start_link/11, do/2, do/3, do_flow/3, flush/1, shutdown/1]).
--export([send_command/2, deliver/4, send_credit_reply/2, send_drained/2,
- flushed/2]).
+-export([send_command/2, deliver/4, send_credit_reply/2, send_drained/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
-export([refresh_config_local/0, ready_for_close/1]).
-export([force_event_refresh/0]).
@@ -37,7 +36,7 @@
conn_name, limiter, tx, next_tag, unacked_message_q, user,
virtual_host, most_recently_declared_queue,
queue_names, queue_monitors, consumer_mapping,
- blocking, queue_consumers, delivering_queues,
+ queue_consumers, delivering_queues,
queue_collector_pid, stats_timer, confirm_enabled, publish_seqno,
unconfirmed, confirmed, mandatory, capabilities, trace_state}).
@@ -53,7 +52,6 @@
messages_uncommitted,
acks_uncommitted,
prefetch_count,
- client_flow_blocked,
state]).
-define(CREATION_EVENT_KEYS,
@@ -99,7 +97,6 @@
-spec(send_credit_reply/2 :: (pid(), non_neg_integer()) -> 'ok').
-spec(send_drained/2 :: (pid(), [{rabbit_types:ctag(), non_neg_integer()}])
-> 'ok').
--spec(flushed/2 :: (pid(), pid()) -> 'ok').
-spec(list/0 :: () -> [pid()]).
-spec(list_local/0 :: () -> [pid()]).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
@@ -149,9 +146,6 @@ send_credit_reply(Pid, Len) ->
send_drained(Pid, CTagCredit) ->
gen_server2:cast(Pid, {send_drained, CTagCredit}).
-flushed(Pid, QPid) ->
- gen_server2:cast(Pid, {flushed, QPid}).
-
list() ->
rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:cluster_nodes(running),
rabbit_channel, list_local, []).
@@ -213,7 +207,6 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
queue_names = dict:new(),
queue_monitors = pmon:new(),
consumer_mapping = dict:new(),
- blocking = sets:new(),
queue_consumers = dict:new(),
delivering_queues = sets:new(),
queue_collector_pid = CollectorPid,
@@ -270,13 +263,14 @@ handle_call(_Request, _From, State) ->
noreply(State).
handle_cast({method, Method, Content, Flow},
- State = #ch{reader_pid = Reader}) ->
+ State = #ch{reader_pid = Reader,
+ virtual_host = VHost}) ->
case Flow of
flow -> credit_flow:ack(Reader);
noflow -> ok
end,
try handle_method(rabbit_channel_interceptor:intercept_method(
- expand_shortcuts(Method, State)),
+ expand_shortcuts(Method, State), VHost),
Content, State) of
{reply, Reply, NewState} ->
ok = send(Reply, NewState),
@@ -293,9 +287,6 @@ handle_cast({method, Method, Content, Flow},
{stop, {Reason, erlang:get_stacktrace()}, State}
end;
-handle_cast({flushed, QPid}, State) ->
- {noreply, queue_blocked(QPid, State), hibernate};
-
handle_cast(ready_for_close, State = #ch{state = closing,
writer_pid = WriterPid}) ->
ok = rabbit_writer:send_command_sync(WriterPid, #'channel.close_ok'{}),
@@ -373,8 +364,7 @@ handle_info(emit_stats, State) ->
handle_info({'DOWN', _MRef, process, QPid, Reason}, State) ->
State1 = handle_publishing_queue_down(QPid, Reason, State),
- State2 = queue_blocked(QPid, State1),
- State3 = handle_consuming_queue_down(QPid, State2),
+ State3 = handle_consuming_queue_down(QPid, State1),
State4 = handle_delivering_queue_down(QPid, State3),
credit_flow:peer_down(QPid),
#ch{queue_names = QNames, queue_monitors = QMons} = State4,
@@ -607,20 +597,6 @@ check_name(Kind, NameBin = <<"amq.", _/binary>>) ->
check_name(_Kind, NameBin) ->
NameBin.
-queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
- case sets:is_element(QPid, Blocking) of
- false -> State;
- true -> maybe_send_flow_ok(
- State#ch{blocking = sets:del_element(QPid, Blocking)})
- end.
-
-maybe_send_flow_ok(State = #ch{blocking = Blocking}) ->
- case sets:size(Blocking) of
- 0 -> ok = send(#'channel.flow_ok'{active = false}, State);
- _ -> ok
- end,
- State.
-
record_confirms([], State) ->
State;
record_confirms(MXs, State = #ch{confirmed = C}) ->
@@ -796,13 +772,11 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
case rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ConnPid,
fun (Q) ->
- {CreditArgs, OtherArgs} = parse_credit_args(Args),
{rabbit_amqqueue:basic_consume(
Q, NoAck, self(),
rabbit_limiter:pid(Limiter),
rabbit_limiter:is_active(Limiter),
- ActualConsumerTag, ExclusiveConsume,
- CreditArgs, OtherArgs,
+ ActualConsumerTag, ExclusiveConsume, Args,
ok_msg(NoWait, #'basic.consume_ok'{
consumer_tag = ActualConsumerTag})),
Q}
@@ -886,8 +860,13 @@ handle_method(#'basic.qos'{prefetch_count = PrefetchCount},
%% unacked messages from basic.get too. Pretty obscure though.
Limiter1 = rabbit_limiter:limit_prefetch(Limiter,
PrefetchCount, queue:len(UAMQ)),
- {reply, #'basic.qos_ok'{},
- maybe_limit_queues(Limiter, Limiter1, State#ch{limiter = Limiter1})};
+ case ((not rabbit_limiter:is_active(Limiter)) andalso
+ rabbit_limiter:is_active(Limiter1)) of
+ true -> rabbit_amqqueue:activate_limit_all(
+ consumer_queues(State#ch.consumer_mapping), self());
+ false -> ok
+ end,
+ {reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter1}};
handle_method(#'basic.recover_async'{requeue = true},
_, State = #ch{unacked_message_q = UAMQ, limiter = Limiter}) ->
@@ -1176,36 +1155,11 @@ handle_method(#'confirm.select'{nowait = NoWait}, _, State) ->
return_ok(State#ch{confirm_enabled = true},
NoWait, #'confirm.select_ok'{});
-handle_method(#'channel.flow'{active = true},
- _, State = #ch{limiter = Limiter}) ->
- Limiter1 = rabbit_limiter:unblock(Limiter),
- {reply, #'channel.flow_ok'{active = true},
- maybe_limit_queues(Limiter, Limiter1, State#ch{limiter = Limiter1})};
-
-handle_method(#'channel.flow'{active = false},
- _, State = #ch{consumer_mapping = Consumers,
- limiter = Limiter}) ->
- case rabbit_limiter:is_blocked(Limiter) of
- true -> {noreply, maybe_send_flow_ok(State)};
- false -> Limiter1 = rabbit_limiter:block(Limiter),
- State1 = maybe_limit_queues(Limiter, Limiter1,
- State#ch{limiter = Limiter1}),
- %% The semantics of channel.flow{active=false}
- %% require that no messages are delivered after the
- %% channel.flow_ok has been sent. We accomplish that
- %% by "flushing" all messages in flight from the
- %% consumer queues to us. To do this we tell all the
- %% queues to invoke rabbit_channel:flushed/2, which
- %% will send us a {flushed, ...} message that appears
- %% *after* all the {deliver, ...} messages. We keep
- %% track of all the QPids thus asked, and once all of
- %% them have responded (or died) we send the
- %% channel.flow_ok.
- QPids = consumer_queues(Consumers),
- ok = rabbit_amqqueue:flush_all(QPids, self()),
- {noreply, maybe_send_flow_ok(
- State1#ch{blocking = sets:from_list(QPids)})}
- end;
+handle_method(#'channel.flow'{active = true}, _, State) ->
+ {reply, #'channel.flow_ok'{active = true}, State};
+
+handle_method(#'channel.flow'{active = false}, _, _State) ->
+ rabbit_misc:protocol_error(not_implemented, "active=false", []);
handle_method(#'basic.credit'{consumer_tag = CTag,
credit = Credit,
@@ -1295,16 +1249,6 @@ handle_consuming_queue_down(QPid,
handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) ->
State#ch{delivering_queues = sets:del_element(QPid, DQ)}.
-parse_credit_args(Arguments) ->
- case rabbit_misc:table_lookup(Arguments, <<"x-credit">>) of
- {table, T} -> {case {rabbit_misc:table_lookup(T, <<"credit">>),
- rabbit_misc:table_lookup(T, <<"drain">>)} of
- {{long, Credit}, {bool, Drain}} -> {Credit, Drain};
- _ -> none
- end, lists:keydelete(<<"x-credit">>, 1, Arguments)};
- undefined -> {none, Arguments}
- end.
-
binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
RoutingKey, Arguments, ReturnMethod, NoWait,
State = #ch{virtual_host = VHostPath,
@@ -1371,7 +1315,7 @@ reject(DeliveryTag, Requeue, Multiple,
reject(Requeue, Acked, Limiter) ->
foreach_per_queue(
fun (QPid, MsgIds) ->
- rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self())
+ rabbit_amqqueue:reject(QPid, Requeue, MsgIds, self())
end, Acked),
ok = notify_limiter(Limiter, Acked).
@@ -1474,15 +1418,6 @@ foreach_per_queue(F, UAL) ->
end, gb_trees:empty(), UAL),
rabbit_misc:gb_trees_foreach(F, T).
-maybe_limit_queues(OldLimiter, NewLimiter, State) ->
- case ((not rabbit_limiter:is_active(OldLimiter)) andalso
- rabbit_limiter:is_active(NewLimiter)) of
- true -> Queues = consumer_queues(State#ch.consumer_mapping),
- rabbit_amqqueue:activate_limit_all(Queues, self());
- false -> ok
- end,
- State.
-
consumer_queues(Consumers) ->
lists:usort([QPid ||
{_Key, #amqqueue{pid = QPid}} <- dict:to_list(Consumers)]).
@@ -1494,7 +1429,7 @@ consumer_queues(Consumers) ->
notify_limiter(Limiter, Acked) ->
%% optimisation: avoid the potentially expensive 'foldl' in the
%% common case.
- case rabbit_limiter:is_prefetch_limited(Limiter) of
+ case rabbit_limiter:is_active(Limiter) of
false -> ok;
true -> case lists:foldl(fun ({_, none, _}, Acc) -> Acc;
({_, _, _}, Acc) -> Acc + 1
@@ -1670,8 +1605,6 @@ i(state, #ch{state = running}) -> credit_flow:state();
i(state, #ch{state = State}) -> State;
i(prefetch_count, #ch{limiter = Limiter}) ->
rabbit_limiter:get_prefetch_limit(Limiter);
-i(client_flow_blocked, #ch{limiter = Limiter}) ->
- rabbit_limiter:is_blocked(Limiter);
i(Item, _) ->
throw({bad_argument, Item}).
diff --git a/src/rabbit_channel_interceptor.erl b/src/rabbit_channel_interceptor.erl
index 5d1665e0..49f7e388 100644
--- a/src/rabbit_channel_interceptor.erl
+++ b/src/rabbit_channel_interceptor.erl
@@ -22,7 +22,7 @@
-include("rabbit_framing.hrl").
-include("rabbit.hrl").
--export([intercept_method/1]).
+-export([intercept_method/2]).
-ifdef(use_specs).
@@ -32,7 +32,7 @@
-callback description() -> [proplists:property()].
--callback intercept(original_method()) ->
+-callback intercept(original_method(), rabbit_types:vhost()) ->
rabbit_types:ok_or_error2(processed_method(), any()).
%% Whether the interceptor wishes to intercept the amqp method
@@ -43,7 +43,7 @@
-export([behaviour_info/1]).
behaviour_info(callbacks) ->
- [{description, 0}, {intercept, 1}, {applies_to, 1}];
+ [{description, 0}, {intercept, 2}, {applies_to, 1}];
behaviour_info(_Other) ->
undefined.
@@ -51,15 +51,18 @@ behaviour_info(_Other) ->
%%----------------------------------------------------------------------------
-intercept_method(#'basic.publish'{} = M) ->
- M;
-intercept_method(M) ->
- intercept_method(M, select(rabbit_misc:method_record_type(M))).
+intercept_method(#'basic.publish'{} = M, _VHost) -> M;
+intercept_method(#'basic.ack'{} = M, _VHost) -> M;
+intercept_method(#'basic.nack'{} = M, _VHost) -> M;
+intercept_method(#'basic.reject'{} = M, _VHost) -> M;
+intercept_method(#'basic.credit'{} = M, _VHost) -> M;
+intercept_method(M, VHost) ->
+ intercept_method(M, VHost, select(rabbit_misc:method_record_type(M))).
-intercept_method(M, []) ->
+intercept_method(M, _VHost, []) ->
M;
-intercept_method(M, [I]) ->
- case I:intercept(M) of
+intercept_method(M, VHost, [I]) ->
+ case I:intercept(M, VHost) of
{ok, M2} ->
case validate_method(M, M2) of
true ->
@@ -74,7 +77,7 @@ intercept_method(M, [I]) ->
internal_error("Interceptor: ~p failed with reason: ~p",
[I, Reason])
end;
-intercept_method(M, Is) ->
+intercept_method(M, _VHost, Is) ->
internal_error("More than one interceptor for method: ~p -- ~p",
[rabbit_misc:method_record_type(M), Is]).
@@ -87,5 +90,7 @@ select(Method) ->
validate_method(M, M2) ->
rabbit_misc:method_record_type(M) =:= rabbit_misc:method_record_type(M2).
+%% keep dialyzer happy
+-spec internal_error(string(), [any()]) -> no_return().
internal_error(Format, Args) ->
- rabbit_misc:protocol_error(internal_error, Format, Args). \ No newline at end of file
+ rabbit_misc:protocol_error(internal_error, Format, Args).
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index 8ba29deb..27b8d1e6 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -79,9 +79,9 @@ remove_bindings(transaction, _X, Bs) ->
[begin
Path = [{FinalNode, _} | _] =
follow_down_get_path(X, split_topic_key(K)),
- trie_remove_binding(X, FinalNode, D),
+ trie_remove_binding(X, FinalNode, D, Args),
remove_path_if_empty(X, Path)
- end || #binding{source = X, key = K, destination = D} <- Bs],
+ end || #binding{source = X, key = K, destination = D, args = Args} <- Bs],
ok;
remove_bindings(none, _X, _Bs) ->
ok.
@@ -91,9 +91,10 @@ assert_args_equivalence(X, Args) ->
%%----------------------------------------------------------------------------
-internal_add_binding(#binding{source = X, key = K, destination = D}) ->
+internal_add_binding(#binding{source = X, key = K, destination = D,
+ args = Args}) ->
FinalNode = follow_down_create(X, split_topic_key(K)),
- trie_add_binding(X, FinalNode, D),
+ trie_add_binding(X, FinalNode, D, Args),
ok.
trie_match(X, Words) ->
@@ -176,7 +177,8 @@ trie_bindings(X, Node) ->
MatchHead = #topic_trie_binding{
trie_binding = #trie_binding{exchange_name = X,
node_id = Node,
- destination = '$1'}},
+ destination = '$1',
+ arguments = '_'}},
mnesia:select(rabbit_topic_trie_binding, [{MatchHead, [], ['$1']}]).
trie_update_node_counts(X, Node, Field, Delta) ->
@@ -213,20 +215,21 @@ trie_edge_op(X, FromNode, ToNode, W, Op) ->
node_id = ToNode},
write).
-trie_add_binding(X, Node, D) ->
+trie_add_binding(X, Node, D, Args) ->
trie_update_node_counts(X, Node, #topic_trie_node.binding_count, +1),
- trie_binding_op(X, Node, D, fun mnesia:write/3).
+ trie_binding_op(X, Node, D, Args, fun mnesia:write/3).
-trie_remove_binding(X, Node, D) ->
+trie_remove_binding(X, Node, D, Args) ->
trie_update_node_counts(X, Node, #topic_trie_node.binding_count, -1),
- trie_binding_op(X, Node, D, fun mnesia:delete_object/3).
+ trie_binding_op(X, Node, D, Args, fun mnesia:delete_object/3).
-trie_binding_op(X, Node, D, Op) ->
+trie_binding_op(X, Node, D, Args, Op) ->
ok = Op(rabbit_topic_trie_binding,
#topic_trie_binding{
trie_binding = #trie_binding{exchange_name = X,
node_id = Node,
- destination = D}},
+ destination = D,
+ arguments = Args}},
write).
trie_remove_all_nodes(X) ->
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index da728033..d37b356c 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -17,8 +17,7 @@
%% The purpose of the limiter is to stem the flow of messages from
%% queues to channels, in order to act upon various protocol-level
%% flow control mechanisms, specifically AMQP 0-9-1's basic.qos
-%% prefetch_count and channel.flow, and AMQP 1.0's link (aka consumer)
-%% credit mechanism.
+%% prefetch_count and AMQP 1.0's link (aka consumer) credit mechanism.
%%
%% Each channel has an associated limiter process, created with
%% start_link/1, which it passes to queues on consumer creation with
@@ -65,11 +64,9 @@
%%
%% 1. Channels tell the limiter about basic.qos prefetch counts -
%% that's what the limit_prefetch/3, unlimit_prefetch/1,
-%% is_prefetch_limited/1, get_prefetch_limit/1 API functions are
-%% about - and channel.flow blocking - that's what block/1,
-%% unblock/1 and is_blocked/1 are for. They also tell the limiter
-%% queue state (via the queue) about consumer credit changes -
-%% that's what credit/5 is for.
+%% get_prefetch_limit/1 API functions are about. They also tell the
+%% limiter queue state (via the queue) about consumer credit
+%% changes - that's what credit/5 is for.
%%
%% 2. Queues also tell the limiter queue state about the queue
%% becoming empty (via drained/1) and consumers leaving (via
@@ -83,12 +80,11 @@
%%
%% 5. Queues ask the limiter for permission (with can_send/3) whenever
%% they want to deliver a message to a channel. The limiter checks
-%% whether a) the channel isn't blocked by channel.flow, b) the
-%% volume has not yet reached the prefetch limit, and c) whether
-%% the consumer has enough credit. If so it increments the volume
-%% and tells the queue to proceed. Otherwise it marks the queue as
-%% requiring notification (see below) and tells the queue not to
-%% proceed.
+%% whether a) the volume has not yet reached the prefetch limit,
+%% and b) whether the consumer has enough credit. If so it
+%% increments the volume and tells the queue to proceed. Otherwise
+%% it marks the queue as requiring notification (see below) and
+%% tells the queue not to proceed.
%%
%% 6. A queue that has been told to proceed (by the return value of
%% can_send/3) sends the message to the channel. Conversely, a
@@ -123,8 +119,7 @@
-export([start_link/1]).
%% channel API
--export([new/1, limit_prefetch/3, unlimit_prefetch/1, block/1, unblock/1,
- is_prefetch_limited/1, is_blocked/1, is_active/1,
+-export([new/1, limit_prefetch/3, unlimit_prefetch/1, is_active/1,
get_prefetch_limit/1, ack/2, pid/1]).
%% queue API
-export([client/1, activate/1, can_send/3, resume/1, deactivate/1,
@@ -136,14 +131,13 @@
%%----------------------------------------------------------------------------
--record(lstate, {pid, prefetch_limited, blocked}).
+-record(lstate, {pid, prefetch_limited}).
-record(qstate, {pid, state, credits}).
-ifdef(use_specs).
-type(lstate() :: #lstate{pid :: pid(),
- prefetch_limited :: boolean(),
- blocked :: boolean()}).
+ prefetch_limited :: boolean()}).
-type(qstate() :: #qstate{pid :: pid(),
state :: 'dormant' | 'active' | 'suspended'}).
@@ -154,10 +148,6 @@
-spec(limit_prefetch/3 :: (lstate(), non_neg_integer(), non_neg_integer())
-> lstate()).
-spec(unlimit_prefetch/1 :: (lstate()) -> lstate()).
--spec(block/1 :: (lstate()) -> lstate()).
--spec(unblock/1 :: (lstate()) -> lstate()).
--spec(is_prefetch_limited/1 :: (lstate()) -> boolean()).
--spec(is_blocked/1 :: (lstate()) -> boolean()).
-spec(is_active/1 :: (lstate()) -> boolean()).
-spec(get_prefetch_limit/1 :: (lstate()) -> non_neg_integer()).
-spec(ack/2 :: (lstate(), non_neg_integer()) -> 'ok').
@@ -183,7 +173,6 @@
-record(lim, {prefetch_count = 0,
ch_pid,
- blocked = false,
queues = orddict:new(), % QPid -> {MonitorRef, Notify}
volume = 0}).
%% 'Notify' is a boolean that indicates whether a queue should be
@@ -201,7 +190,7 @@ start_link(ProcName) -> gen_server2:start_link(?MODULE, [ProcName], []).
new(Pid) ->
%% this a 'call' to ensure that it is invoked at most once.
ok = gen_server:call(Pid, {new, self()}, infinity),
- #lstate{pid = Pid, prefetch_limited = false, blocked = false}.
+ #lstate{pid = Pid, prefetch_limited = false}.
limit_prefetch(L, PrefetchCount, UnackedCount) when PrefetchCount > 0 ->
ok = gen_server:call(
@@ -213,19 +202,7 @@ unlimit_prefetch(L) ->
ok = gen_server:call(L#lstate.pid, unlimit_prefetch, infinity),
L#lstate{prefetch_limited = false}.
-block(L) ->
- ok = gen_server:call(L#lstate.pid, block, infinity),
- L#lstate{blocked = true}.
-
-unblock(L) ->
- ok = gen_server:call(L#lstate.pid, unblock, infinity),
- L#lstate{blocked = false}.
-
-is_prefetch_limited(#lstate{prefetch_limited = Limited}) -> Limited.
-
-is_blocked(#lstate{blocked = Blocked}) -> Blocked.
-
-is_active(L) -> is_prefetch_limited(L) orelse is_blocked(L).
+is_active(#lstate{prefetch_limited = Limited}) -> Limited.
get_prefetch_limit(#lstate{prefetch_limited = false}) -> 0;
get_prefetch_limit(L) ->
@@ -349,19 +326,10 @@ handle_call(unlimit_prefetch, _From, State) ->
{reply, ok, maybe_notify(State, State#lim{prefetch_count = 0,
volume = 0})};
-handle_call(block, _From, State) ->
- {reply, ok, State#lim{blocked = true}};
-
-handle_call(unblock, _From, State) ->
- {reply, ok, maybe_notify(State, State#lim{blocked = false})};
-
handle_call(get_prefetch_limit, _From,
State = #lim{prefetch_count = PrefetchCount}) ->
{reply, PrefetchCount, State};
-handle_call({can_send, QPid, _AckRequired}, _From,
- State = #lim{blocked = true}) ->
- {reply, false, limit_queue(QPid, State)};
handle_call({can_send, QPid, AckRequired}, _From,
State = #lim{volume = Volume}) ->
case prefetch_limit_reached(State) of
@@ -397,8 +365,8 @@ code_change(_, State, _) ->
%%----------------------------------------------------------------------------
maybe_notify(OldState, NewState) ->
- case (prefetch_limit_reached(OldState) orelse blocked(OldState)) andalso
- not (prefetch_limit_reached(NewState) orelse blocked(NewState)) of
+ case prefetch_limit_reached(OldState) andalso
+ not prefetch_limit_reached(NewState) of
true -> notify_queues(NewState);
false -> NewState
end.
@@ -406,8 +374,6 @@ maybe_notify(OldState, NewState) ->
prefetch_limit_reached(#lim{prefetch_count = Limit, volume = Volume}) ->
Limit =/= 0 andalso Volume >= Limit.
-blocked(#lim{blocked = Blocked}) -> Blocked.
-
remember_queue(QPid, State = #lim{queues = Queues}) ->
case orddict:is_key(QPid, Queues) of
false -> MRef = erlang:monitor(process, QPid),
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 9ce5afcb..b272c64f 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -22,7 +22,7 @@
len/1, is_empty/1, depth/1, drain_confirmed/1,
dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
- status/1, invoke/3, is_duplicate/2]).
+ msg_rates/1, status/1, invoke/3, is_duplicate/2]).
-export([start/1, stop/0]).
@@ -353,6 +353,9 @@ handle_pre_hibernate(State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
State #state { backing_queue_state = BQ:handle_pre_hibernate(BQS) }.
+msg_rates(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
+ BQ:msg_rates(BQS).
+
status(State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
BQ:status(BQS) ++
[ {mirror_seen, dict:size(State #state.seen_status)},
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index d562210a..1f31b5c8 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -115,7 +115,7 @@ handle_go(Q = #amqqueue{name = QName}) ->
Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}),
{ok, BQ} = application:get_env(backing_queue_module),
Q1 = Q #amqqueue { pid = QPid },
- BQS = bq_init(BQ, Q1, false),
+ BQS = bq_init(BQ, Q1, new),
State = #state { q = Q1,
gm = GM,
backing_queue = BQ,
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 80e160d9..1c63980e 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -71,6 +71,7 @@
-export([ensure_timer/4, stop_timer/2]).
-export([get_parent/0]).
-export([store_proc_name/1, store_proc_name/2]).
+-export([moving_average/4]).
%% Horrible macro to use in guards
-define(IS_BENIGN_EXIT(R),
@@ -251,6 +252,8 @@
-spec(get_parent/0 :: () -> pid()).
-spec(store_proc_name/2 :: (atom(), rabbit_types:proc_name()) -> ok).
-spec(store_proc_name/1 :: (rabbit_types:proc_type_and_name()) -> ok).
+-spec(moving_average/4 :: (float(), float(), float(), float() | 'undefined')
+ -> float()).
-endif.
%%----------------------------------------------------------------------------
@@ -1088,6 +1091,12 @@ stop_timer(State, Idx) ->
store_proc_name(Type, ProcName) -> store_proc_name({Type, ProcName}).
store_proc_name(TypeProcName) -> put(process_name, TypeProcName).
+moving_average(_Time, _HalfLife, Next, undefined) ->
+ Next;
+moving_average(Time, HalfLife, Next, Current) ->
+ Weight = math:exp(Time * math:log(0.5) / HalfLife),
+ Next * (1 - Weight) + Current * Weight.
+
%% -------------------------------------------------------------------------
%% Begin copypasta from gen_server2.erl
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index 908e4783..c9540da8 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -17,7 +17,7 @@
-module(rabbit_queue_consumers).
-export([new/0, max_active_priority/1, inactive/1, all/1, count/0,
- unacknowledged_message_count/0, add/9, remove/3, erase_ch/2,
+ unacknowledged_message_count/0, add/8, remove/3, erase_ch/2,
send_drained/0, deliver/3, record_ack/3, subtract_acks/2,
possibly_unblock/3,
resume_fun/0, notify_sent_fun/1, activate_limit_fun/0,
@@ -27,6 +27,9 @@
-define(UNSENT_MESSAGE_LIMIT, 200).
+%% Utilisation average calculations are all in μs.
+-define(USE_AVG_HALF_LIFE, 1000000.0).
+
-record(state, {consumers, use}).
-record(consumer, {tag, ack_required, args}).
@@ -57,7 +60,6 @@
-type ch() :: pid().
-type ack() :: non_neg_integer().
-type cr_fun() :: fun ((#cr{}) -> #cr{}).
--type credit_args() :: {non_neg_integer(), boolean()} | 'none'.
-type fetch_result() :: {rabbit_types:basic_message(), boolean(), ack()}.
-spec new() -> state().
@@ -68,8 +70,7 @@
-spec count() -> non_neg_integer().
-spec unacknowledged_message_count() -> non_neg_integer().
-spec add(ch(), rabbit_types:ctag(), boolean(), pid(), boolean(),
- credit_args(), rabbit_framing:amqp_table(), boolean(),
- state()) -> state().
+ rabbit_framing:amqp_table(), boolean(), state()) -> state().
-spec remove(ch(), rabbit_types:ctag(), state()) ->
'not_found' | state().
-spec erase_ch(ch(), state()) ->
@@ -120,8 +121,8 @@ count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]).
unacknowledged_message_count() ->
lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]).
-add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, CreditArgs, OtherArgs,
- IsEmpty, State = #state{consumers = Consumers}) ->
+add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Args, IsEmpty,
+ State = #state{consumers = Consumers}) ->
C = #cr{consumer_count = Count,
limiter = Limiter} = ch_record(ChPid, LimiterPid),
Limiter1 = case LimiterActive of
@@ -129,34 +130,34 @@ add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, CreditArgs, OtherArgs,
false -> Limiter
end,
C1 = C#cr{consumer_count = Count + 1, limiter = Limiter1},
- update_ch_record(case CreditArgs of
+ update_ch_record(case parse_credit_args(Args) of
none -> C1;
{Crd, Drain} -> credit_and_drain(
- C1, ConsumerTag, Crd, Drain, IsEmpty)
+ C1, CTag, Crd, Drain, IsEmpty)
end),
- Consumer = #consumer{tag = ConsumerTag,
+ Consumer = #consumer{tag = CTag,
ack_required = not NoAck,
- args = OtherArgs},
+ args = Args},
State#state{consumers = add_consumer({ChPid, Consumer}, Consumers)}.
-remove(ChPid, ConsumerTag, State = #state{consumers = Consumers}) ->
+remove(ChPid, CTag, State = #state{consumers = Consumers}) ->
case lookup_ch(ChPid) of
not_found ->
not_found;
C = #cr{consumer_count = Count,
limiter = Limiter,
blocked_consumers = Blocked} ->
- Blocked1 = remove_consumer(ChPid, ConsumerTag, Blocked),
+ Blocked1 = remove_consumer(ChPid, CTag, Blocked),
Limiter1 = case Count of
1 -> rabbit_limiter:deactivate(Limiter);
_ -> Limiter
end,
- Limiter2 = rabbit_limiter:forget_consumer(Limiter1, ConsumerTag),
+ Limiter2 = rabbit_limiter:forget_consumer(Limiter1, CTag),
update_ch_record(C#cr{consumer_count = Count - 1,
limiter = Limiter2,
blocked_consumers = Blocked1}),
State#state{consumers =
- remove_consumer(ChPid, ConsumerTag, Consumers)}
+ remove_consumer(ChPid, CTag, Consumers)}
end.
erase_ch(ChPid, State = #state{consumers = Consumers}) ->
@@ -215,14 +216,14 @@ deliver_to_consumer(FetchFun, E = {ChPid, Consumer}, QName) ->
end.
deliver_to_consumer(FetchFun,
- #consumer{tag = ConsumerTag,
+ #consumer{tag = CTag,
ack_required = AckRequired},
C = #cr{ch_pid = ChPid,
acktags = ChAckTags,
unsent_message_count = Count},
QName) ->
{{Message, IsDelivered, AckTag}, R} = FetchFun(AckRequired),
- rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired,
+ rabbit_channel:deliver(ChPid, CTag, AckRequired,
{QName, self(), AckTag, IsDelivered, Message}),
ChAckTags1 = case AckRequired of
true -> queue:in(AckTag, ChAckTags);
@@ -324,6 +325,16 @@ utilisation(#state{use = {inactive, Since, Active, Avg}}) ->
%%----------------------------------------------------------------------------
+parse_credit_args(Args) ->
+ case rabbit_misc:table_lookup(Args, <<"x-credit">>) of
+ {table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>),
+ rabbit_misc:table_lookup(T, <<"drain">>)} of
+ {{long, Credit}, {bool, Drain}} -> {Credit, Drain};
+ _ -> none
+ end;
+ undefined -> none
+ end.
+
lookup_ch(ChPid) ->
case get({ch, ChPid}) of
undefined -> not_found;
@@ -399,9 +410,9 @@ add_consumer({ChPid, Consumer = #consumer{args = Args}}, Queue) ->
end,
priority_queue:in({ChPid, Consumer}, Priority, Queue).
-remove_consumer(ChPid, ConsumerTag, Queue) ->
- priority_queue:filter(fun ({CP, #consumer{tag = CTag}}) ->
- (CP /= ChPid) or (CTag /= ConsumerTag)
+remove_consumer(ChPid, CTag, Queue) ->
+ priority_queue:filter(fun ({CP, #consumer{tag = CT}}) ->
+ (CP /= ChPid) or (CT /= CTag)
end, Queue).
remove_consumers(ChPid, Queue) ->
@@ -422,11 +433,6 @@ update_use({inactive, Since, Active, Avg}, active) ->
use_avg(Active, Inactive, Avg) ->
Time = Inactive + Active,
- Ratio = Active / Time,
- Weight = erlang:min(1, Time / 1000000),
- case Avg of
- undefined -> Ratio;
- _ -> Ratio * Weight + Avg * (1 - Weight)
- end.
+ rabbit_misc:moving_average(Time, ?USE_AVG_HALF_LIFE, Active / Time, Avg).
now_micros() -> timer:now_diff(now(), {0,0,0}).
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index f69d8355..919b7376 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -16,12 +16,10 @@
-module(rabbit_queue_index).
--export([init/2, shutdown_terms/1, recover/5,
+-export([init/2, recover/5,
terminate/2, delete_and_terminate/1,
publish/5, deliver/2, ack/2, sync/1, needs_sync/1, flush/1,
- read/3, next_segment_boundary/1, bounds/1, recover/1]).
-
--export([scan/3]).
+ read/3, next_segment_boundary/1, bounds/1, start/1, stop/0]).
-export([add_queue_ttl/0, avoid_zeroes/0]).
@@ -196,10 +194,9 @@
-type(contains_predicate() :: fun ((rabbit_types:msg_id()) -> boolean())).
-type(walker(A) :: fun ((A) -> 'finished' |
{rabbit_types:msg_id(), non_neg_integer(), A})).
--type(shutdown_terms() :: [any()]).
+-type(shutdown_terms() :: [term()] | 'non_clean_shutdown').
-spec(init/2 :: (rabbit_amqqueue:name(), on_sync_fun()) -> qistate()).
--spec(shutdown_terms/1 :: (rabbit_amqqueue:name()) -> shutdown_terms()).
-spec(recover/5 :: (rabbit_amqqueue:name(), shutdown_terms(), boolean(),
contains_predicate(), on_sync_fun()) ->
{'undefined' | non_neg_integer(), qistate()}).
@@ -220,13 +217,7 @@
-spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()).
-spec(bounds/1 :: (qistate()) ->
{non_neg_integer(), non_neg_integer(), qistate()}).
--spec(recover/1 :: ([rabbit_amqqueue:name()]) -> {[[any()]], {walker(A), A}}).
-
--spec(scan/3 :: (file:filename(),
- fun ((seq_id(), rabbit_types:msg_id(),
- rabbit_types:message_properties(), boolean(),
- ('del' | 'no_del'), ('ack' | 'no_ack'), A) -> A),
- A) -> A).
+-spec(start/1 :: ([rabbit_amqqueue:name()]) -> {[[any()]], {walker(A), A}}).
-spec(add_queue_ttl/0 :: () -> 'ok').
@@ -242,26 +233,20 @@ init(Name, OnSyncFun) ->
false = rabbit_file:is_file(Dir), %% is_file == is file or dir
State #qistate { on_sync = OnSyncFun }.
-shutdown_terms(Name) ->
- #qistate { dir = Dir } = blank_state(Name),
- case read_shutdown_terms(Dir) of
- {error, _} -> [];
- {ok, Terms1} -> Terms1
- end.
-
recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun, OnSyncFun) ->
- State = #qistate { dir = Dir } = blank_state(Name),
+ State = blank_state(Name),
State1 = State #qistate { on_sync = OnSyncFun },
- CleanShutdown = detect_clean_shutdown(Dir),
+ CleanShutdown = Terms /= non_clean_shutdown,
case CleanShutdown andalso MsgStoreRecovered of
true -> RecoveredCounts = proplists:get_value(segments, Terms, []),
init_clean(RecoveredCounts, State1);
false -> init_dirty(CleanShutdown, ContainsCheckFun, State1)
end.
-terminate(Terms, State) ->
- {SegmentCounts, State1 = #qistate { dir = Dir }} = terminate(State),
- store_clean_shutdown([{segments, SegmentCounts} | Terms], Dir),
+terminate(Terms, State = #qistate { dir = Dir }) ->
+ {SegmentCounts, State1} = terminate(State),
+ rabbit_recovery_terms:store(filename:basename(Dir),
+ [{segments, SegmentCounts} | Terms]),
State1.
delete_and_terminate(State) ->
@@ -357,37 +342,40 @@ bounds(State = #qistate { segments = Segments }) ->
end,
{LowSeqId, NextSeqId, State}.
-recover(DurableQueues) ->
- DurableDict = dict:from_list([ {queue_name_to_dir_name(Queue), Queue} ||
- Queue <- DurableQueues ]),
- QueuesDir = queues_dir(),
- QueueDirNames = all_queue_directory_names(QueuesDir),
- DurableDirectories = sets:from_list(dict:fetch_keys(DurableDict)),
- {DurableQueueNames, DurableTerms} =
+start(DurableQueueNames) ->
+ ok = rabbit_recovery_terms:start(),
+ {DurableTerms, DurableDirectories} =
lists:foldl(
- fun (QueueDirName, {DurableAcc, TermsAcc}) ->
- QueueDirPath = filename:join(QueuesDir, QueueDirName),
- case sets:is_element(QueueDirName, DurableDirectories) of
- true ->
- TermsAcc1 =
- case read_shutdown_terms(QueueDirPath) of
- {error, _} -> TermsAcc;
- {ok, Terms} -> [Terms | TermsAcc]
- end,
- {[dict:fetch(QueueDirName, DurableDict) | DurableAcc],
- TermsAcc1};
- false ->
- ok = rabbit_file:recursive_delete([QueueDirPath]),
- {DurableAcc, TermsAcc}
- end
- end, {[], []}, QueueDirNames),
- {DurableTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}.
+ fun(QName, {RecoveryTerms, ValidDirectories}) ->
+ DirName = queue_name_to_dir_name(QName),
+ RecoveryInfo = case rabbit_recovery_terms:read(DirName) of
+ {error, _} -> non_clean_shutdown;
+ {ok, Terms} -> Terms
+ end,
+ {[RecoveryInfo | RecoveryTerms],
+ sets:add_element(DirName, ValidDirectories)}
+ end, {[], sets:new()}, DurableQueueNames),
+
+ %% Any queue directory we've not been asked to recover is considered garbage
+ QueuesDir = queues_dir(),
+ rabbit_file:recursive_delete(
+ [filename:join(QueuesDir, DirName) ||
+ DirName <- all_queue_directory_names(QueuesDir),
+ not sets:is_element(DirName, DurableDirectories)]),
+
+ rabbit_recovery_terms:clear(),
+
+ %% The backing queue interface requires that the queue recovery terms
+ %% which come back from start/1 are in the same order as DurableQueueNames
+ OrderedTerms = lists:reverse(DurableTerms),
+ {OrderedTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}.
+
+stop() -> rabbit_recovery_terms:stop().
all_queue_directory_names(Dir) ->
case rabbit_file:list_dir(Dir) of
- {ok, Entries} -> [ Entry || Entry <- Entries,
- rabbit_file:is_dir(
- filename:join(Dir, Entry)) ];
+ {ok, Entries} -> [E || E <- Entries,
+ rabbit_file:is_dir(filename:join(Dir, E))];
{error, enoent} -> []
end.
@@ -410,22 +398,6 @@ blank_state_dir(Dir) ->
on_sync = fun (_) -> ok end,
unconfirmed = gb_sets:new() }.
-clean_filename(Dir) -> filename:join(Dir, ?CLEAN_FILENAME).
-
-detect_clean_shutdown(Dir) ->
- case rabbit_file:delete(clean_filename(Dir)) of
- ok -> true;
- {error, enoent} -> false
- end.
-
-read_shutdown_terms(Dir) ->
- rabbit_file:read_term_file(clean_filename(Dir)).
-
-store_clean_shutdown(Terms, Dir) ->
- CleanFileName = clean_filename(Dir),
- ok = rabbit_file:ensure_dir(CleanFileName),
- rabbit_file:write_term_file(CleanFileName, Terms).
-
init_clean(RecoveredCounts, State) ->
%% Load the journal. Since this is a clean recovery this (almost)
%% gets us back to where we were on shutdown.
@@ -554,9 +526,6 @@ queue_index_walker_reader(QueueName, Gatherer) ->
end, ok, State),
ok = gatherer:finish(Gatherer).
-scan(Dir, Fun, Acc) ->
- scan_segments(Fun, Acc, blank_state_dir(Dir)).
-
scan_segments(Fun, Acc, State) ->
State1 = #qistate { segments = Segments, dir = Dir } =
recover_journal(State),
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 64debcab..122eb305 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -957,6 +957,9 @@ validate_negotiated_integer_value(Field, Min, ClientValue) ->
ok
end.
+%% keep dialyzer happy
+-spec fail_negotiation(atom(), 'min' | 'max', integer(), integer()) ->
+ no_return().
fail_negotiation(Field, MinOrMax, ServerValue, ClientValue) ->
{S1, S2} = case MinOrMax of
min -> {lower, minimum};
diff --git a/src/rabbit_recovery_terms.erl b/src/rabbit_recovery_terms.erl
new file mode 100644
index 00000000..efb94b81
--- /dev/null
+++ b/src/rabbit_recovery_terms.erl
@@ -0,0 +1,121 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved.
+%%
+
+%% We use a gen_server simply so that during the terminate/2 call
+%% (i.e., during shutdown), we can sync/flush the dets table to disk.
+
+-module(rabbit_recovery_terms).
+
+-behaviour(gen_server).
+
+-export([start/0, stop/0, store/2, read/1, clear/0]).
+
+-export([upgrade_recovery_terms/0, start_link/0]).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-rabbit_upgrade({upgrade_recovery_terms, local, []}).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start() -> rabbit_types:ok_or_error(term())).
+-spec(stop() -> rabbit_types:ok_or_error(term())).
+-spec(store(file:filename(), term()) -> rabbit_types:ok_or_error(term())).
+-spec(read(file:filename()) -> rabbit_types:ok_or_error2(term(), not_found)).
+-spec(clear() -> 'ok').
+
+-endif. % use_specs
+
+%%----------------------------------------------------------------------------
+
+-define(SERVER, ?MODULE).
+
+start() -> rabbit_sup:start_child(?MODULE).
+
+stop() -> rabbit_sup:stop_child(?MODULE).
+
+store(DirBaseName, Terms) -> dets:insert(?MODULE, {DirBaseName, Terms}).
+
+read(DirBaseName) ->
+ case dets:lookup(?MODULE, DirBaseName) of
+ [{_, Terms}] -> {ok, Terms};
+ _ -> {error, not_found}
+ end.
+
+clear() ->
+ dets:delete_all_objects(?MODULE),
+ flush().
+
+%%----------------------------------------------------------------------------
+
+upgrade_recovery_terms() ->
+ open_table(),
+ try
+ QueuesDir = filename:join(rabbit_mnesia:dir(), "queues"),
+ Dirs = case rabbit_file:list_dir(QueuesDir) of
+ {ok, Entries} -> Entries;
+ {error, _} -> []
+ end,
+ [begin
+ File = filename:join([QueuesDir, Dir, "clean.dot"]),
+ case rabbit_file:read_term_file(File) of
+ {ok, Terms} -> ok = store(Dir, Terms);
+ {error, _} -> ok
+ end,
+ file:delete(File)
+ end || Dir <- Dirs],
+ ok
+ after
+ close_table()
+ end.
+
+start_link() -> gen_server:start_link(?MODULE, [], []).
+
+%%----------------------------------------------------------------------------
+
+init(_) ->
+ process_flag(trap_exit, true),
+ open_table(),
+ {ok, undefined}.
+
+handle_call(Msg, _, State) -> {stop, {unexpected_call, Msg}, State}.
+
+handle_cast(Msg, State) -> {stop, {unexpected_cast, Msg}, State}.
+
+handle_info(_Info, State) -> {noreply, State}.
+
+terminate(_Reason, _State) ->
+ close_table().
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%----------------------------------------------------------------------------
+
+open_table() ->
+ File = filename:join(rabbit_mnesia:dir(), "recovery.dets"),
+ {ok, _} = dets:open_file(?MODULE, [{file, File},
+ {ram_file, true},
+ {auto_save, infinity}]).
+
+flush() -> dets:sync(?MODULE).
+
+close_table() ->
+ ok = flush(),
+ ok = dets:close(?MODULE).
+
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index cffc4cf1..74ff2adb 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -578,33 +578,38 @@ test_topic_matching() ->
key = list_to_binary(Key),
destination = #resource{virtual_host = <<"/">>,
kind = queue,
- name = list_to_binary(Q)}} ||
- {Key, Q} <- [{"a.b.c", "t1"},
- {"a.*.c", "t2"},
- {"a.#.b", "t3"},
- {"a.b.b.c", "t4"},
- {"#", "t5"},
- {"#.#", "t6"},
- {"#.b", "t7"},
- {"*.*", "t8"},
- {"a.*", "t9"},
- {"*.b.c", "t10"},
- {"a.#", "t11"},
- {"a.#.#", "t12"},
- {"b.b.c", "t13"},
- {"a.b.b", "t14"},
- {"a.b", "t15"},
- {"b.c", "t16"},
- {"", "t17"},
- {"*.*.*", "t18"},
- {"vodka.martini", "t19"},
- {"a.b.c", "t20"},
- {"*.#", "t21"},
- {"#.*.#", "t22"},
- {"*.#.#", "t23"},
- {"#.#.#", "t24"},
- {"*", "t25"},
- {"#.b.#", "t26"}]],
+ name = list_to_binary(Q)},
+ args = Args} ||
+ {Key, Q, Args} <- [{"a.b.c", "t1", []},
+ {"a.*.c", "t2", []},
+ {"a.#.b", "t3", []},
+ {"a.b.b.c", "t4", []},
+ {"#", "t5", []},
+ {"#.#", "t6", []},
+ {"#.b", "t7", []},
+ {"*.*", "t8", []},
+ {"a.*", "t9", []},
+ {"*.b.c", "t10", []},
+ {"a.#", "t11", []},
+ {"a.#.#", "t12", []},
+ {"b.b.c", "t13", []},
+ {"a.b.b", "t14", []},
+ {"a.b", "t15", []},
+ {"b.c", "t16", []},
+ {"", "t17", []},
+ {"*.*.*", "t18", []},
+ {"vodka.martini", "t19", []},
+ {"a.b.c", "t20", []},
+ {"*.#", "t21", []},
+ {"#.*.#", "t22", []},
+ {"*.#.#", "t23", []},
+ {"#.#.#", "t24", []},
+ {"*", "t25", []},
+ {"#.b.#", "t26", []},
+ {"args-test", "t27",
+ [{<<"foo">>, longstr, <<"bar">>}]},
+ {"args-test", "t27", %% Note aliasing
+ [{<<"foo">>, longstr, <<"baz">>}]}]],
lists:foreach(fun (B) -> exchange_op_callback(X, add_binding, [B]) end,
Bindings),
@@ -631,12 +636,13 @@ test_topic_matching() ->
"t22", "t23", "t24", "t26"]},
{"nothing.here.at.all", ["t5", "t6", "t21", "t22", "t23", "t24"]},
{"oneword", ["t5", "t6", "t21", "t22", "t23", "t24",
- "t25"]}]),
-
+ "t25"]},
+ {"args-test", ["t5", "t6", "t21", "t22", "t23", "t24",
+ "t25", "t27"]}]),
%% remove some bindings
RemovedBindings = [lists:nth(1, Bindings), lists:nth(5, Bindings),
lists:nth(11, Bindings), lists:nth(19, Bindings),
- lists:nth(21, Bindings)],
+ lists:nth(21, Bindings), lists:nth(28, Bindings)],
exchange_op_callback(X, remove_bindings, [RemovedBindings]),
RemainingBindings = ordsets:to_list(
ordsets:subtract(ordsets:from_list(Bindings),
@@ -659,7 +665,8 @@ test_topic_matching() ->
{"b.b.c", ["t6", "t10", "t13", "t18", "t22", "t23",
"t24", "t26"]},
{"nothing.here.at.all", ["t6", "t22", "t23", "t24"]},
- {"oneword", ["t6", "t22", "t23", "t24", "t25"]}]),
+ {"oneword", ["t6", "t22", "t23", "t24", "t25"]},
+ {"args-test", ["t6", "t22", "t23", "t24", "t25", "t27"]}]),
%% remove the entire exchange
exchange_op_callback(X, delete, [RemainingBindings]),
@@ -1172,7 +1179,7 @@ test_server_status() ->
rabbit_misc:r(<<"/">>, queue, Name),
false, false, [], none)]],
ok = rabbit_amqqueue:basic_consume(
- Q, true, Ch, Limiter, false, <<"ctag">>, true, none, [], undefined),
+ Q, true, Ch, Limiter, false, <<"ctag">>, true, [], undefined),
%% list queues
ok = info_action(list_queues, rabbit_amqqueue:info_keys(), true),
@@ -2129,11 +2136,10 @@ test_queue() ->
init_test_queue() ->
TestQueue = test_queue(),
- Terms = rabbit_queue_index:shutdown_terms(TestQueue),
- PRef = proplists:get_value(persistent_ref, Terms, rabbit_guid:gen()),
+ PRef = rabbit_guid:gen(),
PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef),
Res = rabbit_queue_index:recover(
- TestQueue, Terms, false,
+ TestQueue, [], false,
fun (MsgId) ->
rabbit_msg_store:contains(MsgId, PersistentClient)
end,
@@ -2144,12 +2150,12 @@ init_test_queue() ->
restart_test_queue(Qi) ->
_ = rabbit_queue_index:terminate([], Qi),
ok = rabbit_variable_queue:stop(),
- ok = rabbit_variable_queue:start([test_queue()]),
+ {ok, _} = rabbit_variable_queue:start([test_queue()]),
init_test_queue().
empty_test_queue() ->
ok = rabbit_variable_queue:stop(),
- ok = rabbit_variable_queue:start([]),
+ {ok, _} = rabbit_variable_queue:start([]),
{0, Qi} = init_test_queue(),
_ = rabbit_queue_index:delete_and_terminate(Qi),
ok.
@@ -2205,7 +2211,7 @@ test_queue_index_props() ->
end),
ok = rabbit_variable_queue:stop(),
- ok = rabbit_variable_queue:start([]),
+ {ok, _} = rabbit_variable_queue:start([]),
passed.
@@ -2329,13 +2335,16 @@ test_queue_index() ->
end),
ok = rabbit_variable_queue:stop(),
- ok = rabbit_variable_queue:start([]),
+ {ok, _} = rabbit_variable_queue:start([]),
passed.
variable_queue_init(Q, Recover) ->
rabbit_variable_queue:init(
- Q, Recover, fun nop/2, fun nop/2, fun nop/1).
+ Q, case Recover of
+ true -> non_clean_shutdown;
+ false -> new
+ end, fun nop/2, fun nop/2, fun nop/1).
variable_queue_publish(IsPersistent, Count, VQ) ->
variable_queue_publish(IsPersistent, Count, fun (_N, P) -> P end, VQ).
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index ac2b9f52..020b5b33 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -21,8 +21,8 @@
dropwhile/2, fetchwhile/4,
fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, len/1,
is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1,
- needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3,
- is_duplicate/2, multiple_routing_keys/0]).
+ needs_timeout/1, timeout/1, handle_pre_hibernate/1, msg_rates/1,
+ status/1, invoke/3, is_duplicate/2, multiple_routing_keys/0]).
-export([start/1, stop/0]).
@@ -277,11 +277,10 @@
unconfirmed,
confirmed,
ack_out_counter,
- ack_in_counter,
- ack_rates
+ ack_in_counter
}).
--record(rates, { egress, ingress, avg_egress, avg_ingress, timestamp }).
+-record(rates, { in, out, ack_in, ack_out, timestamp }).
-record(msg_status,
{ seq_id,
@@ -322,11 +321,11 @@
-type(timestamp() :: {non_neg_integer(), non_neg_integer(), non_neg_integer()}).
-type(seq_id() :: non_neg_integer()).
--type(rates() :: #rates { egress :: {timestamp(), non_neg_integer()},
- ingress :: {timestamp(), non_neg_integer()},
- avg_egress :: float(),
- avg_ingress :: float(),
- timestamp :: timestamp() }).
+-type(rates() :: #rates { in :: float(),
+ out :: float(),
+ ack_in :: float(),
+ ack_out :: float(),
+ timestamp :: timestamp()}).
-type(delta() :: #delta { start_seq_id :: non_neg_integer(),
count :: non_neg_integer(),
@@ -368,8 +367,7 @@
unconfirmed :: gb_set(),
confirmed :: gb_set(),
ack_out_counter :: non_neg_integer(),
- ack_in_counter :: non_neg_integer(),
- ack_rates :: rates() }).
+ ack_in_counter :: non_neg_integer() }).
%% Duplicated from rabbit_backing_queue
-spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}).
@@ -384,21 +382,37 @@
count = 0,
end_seq_id = Z }).
+-define(MICROS_PER_SECOND, 1000000.0).
+
+%% We're sampling every 5s for RAM duration; a half life that is of
+%% the same order of magnitude is probably about right.
+-define(RATE_AVG_HALF_LIFE, 5.0).
+
+%% We will recalculate the #rates{} every time we get asked for our
+%% RAM duration, or every N messages published, whichever is
+%% sooner. We do this since the priority calculations in
+%% rabbit_amqqueue_process need fairly fresh rates.
+-define(MSGS_PER_RATE_CALC, 100).
+
%%----------------------------------------------------------------------------
%% Public API
%%----------------------------------------------------------------------------
start(DurableQueues) ->
- {AllTerms, StartFunState} = rabbit_queue_index:recover(DurableQueues),
+ {AllTerms, StartFunState} = rabbit_queue_index:start(DurableQueues),
start_msg_store(
[Ref || Terms <- AllTerms,
+ Terms /= non_clean_shutdown,
begin
Ref = proplists:get_value(persistent_ref, Terms),
Ref =/= undefined
end],
- StartFunState).
+ StartFunState),
+ {ok, AllTerms}.
-stop() -> stop_msg_store().
+stop() ->
+ ok = stop_msg_store(),
+ ok = rabbit_queue_index:stop().
start_msg_store(Refs, StartFunState) ->
ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE, rabbit_msg_store,
@@ -419,7 +433,7 @@ init(Queue, Recover, AsyncCallback) ->
end,
fun (MsgIds) -> msg_indices_written_to_disk(AsyncCallback, MsgIds) end).
-init(#amqqueue { name = QueueName, durable = IsDurable }, false,
+init(#amqqueue { name = QueueName, durable = IsDurable }, new,
AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) ->
IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun),
init(IsDurable, IndexState, 0, [],
@@ -430,29 +444,32 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, false,
end,
msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback));
-init(#amqqueue { name = QueueName, durable = true }, true,
+init(#amqqueue { name = QueueName, durable = true }, Terms,
AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) ->
- Terms = rabbit_queue_index:shutdown_terms(QueueName),
- {PRef, Terms1} =
- case proplists:get_value(persistent_ref, Terms) of
- undefined -> {rabbit_guid:gen(), []};
- PRef1 -> {PRef1, Terms}
- end,
+ {PRef, RecoveryTerms} = process_recovery_terms(Terms),
PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef,
MsgOnDiskFun, AsyncCallback),
TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE,
undefined, AsyncCallback),
{DeltaCount, IndexState} =
rabbit_queue_index:recover(
- QueueName, Terms1,
+ QueueName, RecoveryTerms,
rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE),
fun (MsgId) ->
rabbit_msg_store:contains(MsgId, PersistentClient)
end,
MsgIdxOnDiskFun),
- init(true, IndexState, DeltaCount, Terms1,
+ init(true, IndexState, DeltaCount, RecoveryTerms,
PersistentClient, TransientClient).
+process_recovery_terms(Terms=non_clean_shutdown) ->
+ {rabbit_guid:gen(), Terms};
+process_recovery_terms(Terms) ->
+ case proplists:get_value(persistent_ref, Terms) of
+ undefined -> {rabbit_guid:gen(), []};
+ PRef -> {PRef, Terms}
+ end.
+
terminate(_Reason, State) ->
State1 = #vqstate { persistent_count = PCount,
index_state = IndexState,
@@ -533,14 +550,18 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
false -> State1 #vqstate { q1 = ?QUEUE:in(m(MsgStatus1), Q1) };
true -> State1 #vqstate { q4 = ?QUEUE:in(m(MsgStatus1), Q4) }
end,
- PCount1 = PCount + one_if(IsPersistent1),
+ InCount1 = InCount + 1,
+ PCount1 = PCount + one_if(IsPersistent1),
UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
- a(reduce_memory_use(
- inc_ram_msg_count(State2 #vqstate { next_seq_id = SeqId + 1,
- len = Len + 1,
- in_counter = InCount + 1,
- persistent_count = PCount1,
- unconfirmed = UC1 }))).
+ State3 = inc_ram_msg_count(State2 #vqstate { next_seq_id = SeqId + 1,
+ len = Len + 1,
+ in_counter = InCount1,
+ persistent_count = PCount1,
+ unconfirmed = UC1 }),
+ a(reduce_memory_use(case InCount1 > ?MSGS_PER_RATE_CALC of
+ true -> update_rates(State3);
+ false -> State3
+ end)).
publish_delivered(Msg = #basic_message { is_persistent = IsPersistent,
id = MsgId },
@@ -689,10 +710,10 @@ depth(State = #vqstate { ram_pending_ack = RPA, disk_pending_ack = DPA }) ->
set_ram_duration_target(
DurationTarget, State = #vqstate {
- rates = #rates { avg_egress = AvgEgressRate,
- avg_ingress = AvgIngressRate },
- ack_rates = #rates { avg_egress = AvgAckEgressRate,
- avg_ingress = AvgAckIngressRate },
+ rates = #rates { in = AvgIngressRate,
+ out = AvgEgressRate,
+ ack_in = AvgAckIngressRate,
+ ack_out = AvgAckEgressRate },
target_ram_count = TargetRamCount }) ->
Rate =
AvgEgressRate + AvgIngressRate + AvgAckEgressRate + AvgAckIngressRate,
@@ -709,29 +730,43 @@ set_ram_duration_target(
false -> reduce_memory_use(State1)
end).
-ram_duration(State = #vqstate {
- rates = #rates { timestamp = Timestamp,
- egress = Egress,
- ingress = Ingress } = Rates,
- ack_rates = #rates { timestamp = AckTimestamp,
- egress = AckEgress,
- ingress = AckIngress } = ARates,
- in_counter = InCount,
- out_counter = OutCount,
- ack_in_counter = AckInCount,
- ack_out_counter = AckOutCount,
- ram_msg_count = RamMsgCount,
- ram_msg_count_prev = RamMsgCountPrev,
- ram_pending_ack = RPA,
- ram_ack_count_prev = RamAckCountPrev }) ->
- Now = now(),
- {AvgEgressRate, Egress1} = update_rate(Now, Timestamp, OutCount, Egress),
- {AvgIngressRate, Ingress1} = update_rate(Now, Timestamp, InCount, Ingress),
-
- {AvgAckEgressRate, AckEgress1} =
- update_rate(Now, AckTimestamp, AckOutCount, AckEgress),
- {AvgAckIngressRate, AckIngress1} =
- update_rate(Now, AckTimestamp, AckInCount, AckIngress),
+update_rates(State = #vqstate{ in_counter = InCount,
+ out_counter = OutCount,
+ ack_in_counter = AckInCount,
+ ack_out_counter = AckOutCount,
+ rates = #rates{ in = InRate,
+ out = OutRate,
+ ack_in = AckInRate,
+ ack_out = AckOutRate,
+ timestamp = TS }}) ->
+ Now = erlang:now(),
+
+ Rates = #rates { in = update_rate(Now, TS, InCount, InRate),
+ out = update_rate(Now, TS, OutCount, OutRate),
+ ack_in = update_rate(Now, TS, AckInCount, AckInRate),
+ ack_out = update_rate(Now, TS, AckOutCount, AckOutRate),
+ timestamp = Now },
+
+ State#vqstate{ in_counter = 0,
+ out_counter = 0,
+ ack_in_counter = 0,
+ ack_out_counter = 0,
+ rates = Rates }.
+
+update_rate(Now, TS, Count, Rate) ->
+ Time = timer:now_diff(Now, TS) / ?MICROS_PER_SECOND,
+ rabbit_misc:moving_average(Time, ?RATE_AVG_HALF_LIFE, Count / Time, Rate).
+
+ram_duration(State) ->
+ State1 = #vqstate { rates = #rates { in = AvgIngressRate,
+ out = AvgEgressRate,
+ ack_in = AvgAckIngressRate,
+ ack_out = AvgAckEgressRate },
+ ram_msg_count = RamMsgCount,
+ ram_msg_count_prev = RamMsgCountPrev,
+ ram_pending_ack = RPA,
+ ram_ack_count_prev = RamAckCountPrev } =
+ update_rates(State),
RamAckCount = gb_trees:size(RPA),
@@ -745,25 +780,7 @@ ram_duration(State = #vqstate {
AvgAckEgressRate + AvgAckIngressRate))
end,
- {Duration, State #vqstate {
- rates = Rates #rates {
- egress = Egress1,
- ingress = Ingress1,
- avg_egress = AvgEgressRate,
- avg_ingress = AvgIngressRate,
- timestamp = Now },
- ack_rates = ARates #rates {
- egress = AckEgress1,
- ingress = AckIngress1,
- avg_egress = AvgAckEgressRate,
- avg_ingress = AvgAckIngressRate,
- timestamp = Now },
- in_counter = 0,
- out_counter = 0,
- ack_in_counter = 0,
- ack_out_counter = 0,
- ram_msg_count_prev = RamMsgCount,
- ram_ack_count_prev = RamAckCount }}.
+ {Duration, State1}.
needs_timeout(State = #vqstate { index_state = IndexState,
target_ram_count = TargetRamCount }) ->
@@ -789,6 +806,10 @@ timeout(State = #vqstate { index_state = IndexState }) ->
handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }.
+msg_rates(#vqstate { rates = #rates { in = AvgIngressRate,
+ out = AvgEgressRate } }) ->
+ {AvgIngressRate, AvgEgressRate}.
+
status(#vqstate {
q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
len = Len,
@@ -798,10 +819,11 @@ status(#vqstate {
ram_msg_count = RamMsgCount,
next_seq_id = NextSeqId,
persistent_count = PersistentCount,
- rates = #rates { avg_egress = AvgEgressRate,
- avg_ingress = AvgIngressRate },
- ack_rates = #rates { avg_egress = AvgAckEgressRate,
- avg_ingress = AvgAckIngressRate } }) ->
+ rates = #rates { in = AvgIngressRate,
+ out = AvgEgressRate,
+ ack_in = AvgAckIngressRate,
+ ack_out = AvgAckEgressRate }}) ->
+
[ {q1 , ?QUEUE:len(Q1)},
{q2 , ?QUEUE:len(Q2)},
{delta , Delta},
@@ -991,10 +1013,6 @@ expand_delta(SeqId, #delta { count = Count,
expand_delta(_SeqId, #delta { count = Count } = Delta) ->
d(Delta #delta { count = Count + 1 }).
-update_rate(Now, Then, Count, {OThen, OCount}) ->
- %% avg over the current period and the previous
- {1000000.0 * (Count + OCount) / timer:now_diff(Now, OThen), {Then, Count}}.
-
%%----------------------------------------------------------------------------
%% Internal major helpers for Public API
%%----------------------------------------------------------------------------
@@ -1003,7 +1021,12 @@ init(IsDurable, IndexState, DeltaCount, Terms,
PersistentClient, TransientClient) ->
{LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState),
- DeltaCount1 = proplists:get_value(persistent_count, Terms, DeltaCount),
+ DeltaCount1 =
+ case Terms of
+ non_clean_shutdown -> DeltaCount;
+ _ -> proplists:get_value(persistent_count,
+ Terms, DeltaCount)
+ end,
Delta = case DeltaCount1 == 0 andalso DeltaCount /= undefined of
true -> ?BLANK_DELTA;
false -> d(#delta { start_seq_id = LowSeqId,
@@ -1034,22 +1057,21 @@ init(IsDurable, IndexState, DeltaCount, Terms,
ram_ack_count_prev = 0,
out_counter = 0,
in_counter = 0,
- rates = blank_rate(Now, DeltaCount1),
+ rates = blank_rates(Now),
msgs_on_disk = gb_sets:new(),
msg_indices_on_disk = gb_sets:new(),
unconfirmed = gb_sets:new(),
confirmed = gb_sets:new(),
ack_out_counter = 0,
- ack_in_counter = 0,
- ack_rates = blank_rate(Now, 0) },
+ ack_in_counter = 0 },
a(maybe_deltas_to_betas(State)).
-blank_rate(Timestamp, IngressLength) ->
- #rates { egress = {Timestamp, 0},
- ingress = {Timestamp, IngressLength},
- avg_egress = 0.0,
- avg_ingress = 0.0,
- timestamp = Timestamp }.
+blank_rates(Now) ->
+ #rates { in = 0.0,
+ out = 0.0,
+ ack_in = 0.0,
+ ack_out = 0.0,
+ timestamp = Now}.
in_r(MsgStatus = #msg_status { msg = undefined },
State = #vqstate { q3 = Q3, q4 = Q4 }) ->
@@ -1523,11 +1545,10 @@ reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun,
ram_pending_ack = RPA,
ram_msg_count = RamMsgCount,
target_ram_count = TargetRamCount,
- rates = #rates { avg_ingress = AvgIngress,
- avg_egress = AvgEgress },
- ack_rates = #rates { avg_ingress = AvgAckIngress,
- avg_egress = AvgAckEgress }
- }) ->
+ rates = #rates { in = AvgIngress,
+ out = AvgEgress,
+ ack_in = AvgAckIngress,
+ ack_out = AvgAckEgress } }) ->
{Reduce, State1 = #vqstate { q2 = Q2, q3 = Q3 }} =
case chunk_size(RamMsgCount + gb_trees:size(RPA), TargetRamCount) of