summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2012-11-23 15:19:31 +0000
committerEmile Joubert <emile@rabbitmq.com>2012-11-23 15:19:31 +0000
commitbecc1e47cf46605a2dfb58b7b2ae3c6a3f6991de (patch)
tree02fa13ce5f7b5767b4c9ee2b3f54d9ddaa5a6feb
parent220488f1e8df41403f038394a941dd246ce5afad (diff)
parent107edf1a80b9e43d960bff62310b1ecee5157499 (diff)
downloadrabbitmq-server-becc1e47cf46605a2dfb58b7b2ae3c6a3f6991de.tar.gz
Merged bug25315
-rw-r--r--src/rabbit_amqqueue.erl26
-rw-r--r--src/rabbit_amqqueue_process.erl82
-rw-r--r--src/rabbit_backing_queue.erl4
-rw-r--r--src/rabbit_channel.erl136
-rw-r--r--src/rabbit_mirror_queue_master.erl8
-rw-r--r--src/rabbit_tests.erl39
-rw-r--r--src/rabbit_variable_queue.erl53
7 files changed, 197 insertions, 151 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 8ce1160c..be7c7867 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -34,7 +34,7 @@
-export([start_mirroring/1, stop_mirroring/1]).
%% internal
--export([internal_declare/2, internal_delete/2, run_backing_queue/3,
+-export([internal_declare/2, internal_delete/1, run_backing_queue/3,
set_ram_duration_target/2, set_maximum_since_use/2]).
-include("rabbit.hrl").
@@ -156,11 +156,11 @@
-spec(notify_sent_queue_down/1 :: (pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
-spec(flush_all/2 :: (qpids(), pid()) -> 'ok').
--spec(internal_delete/2 ::
- (name(), pid()) -> rabbit_types:ok_or_error('not_found') |
- rabbit_types:connection_exit() |
- fun (() -> rabbit_types:ok_or_error('not_found') |
- rabbit_types:connection_exit())).
+-spec(internal_delete/1 ::
+ (name()) -> rabbit_types:ok_or_error('not_found') |
+ rabbit_types:connection_exit() |
+ fun (() -> rabbit_types:ok_or_error('not_found') |
+ rabbit_types:connection_exit())).
-spec(run_backing_queue/3 ::
(pid(), atom(),
(fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok').
@@ -257,7 +257,7 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) ->
[ExistingQ = #amqqueue{pid = QPid}] ->
case rabbit_misc:is_process_alive(QPid) of
true -> rabbit_misc:const(ExistingQ);
- false -> TailFun = internal_delete(QueueName, QPid),
+ false -> TailFun = internal_delete(QueueName),
fun () -> TailFun(), ExistingQ end
end
end
@@ -563,7 +563,7 @@ internal_delete1(QueueName) ->
%% after the transaction.
rabbit_binding:remove_for_destination(QueueName).
-internal_delete(QueueName, QPid) ->
+internal_delete(QueueName) ->
rabbit_misc:execute_mnesia_tx_with_tail(
fun () ->
case mnesia:wread({rabbit_queue, QueueName}) of
@@ -573,8 +573,7 @@ internal_delete(QueueName, QPid) ->
fun() ->
ok = T(),
ok = rabbit_event:notify(queue_deleted,
- [{pid, QPid},
- {name, QueueName}])
+ [{name, QueueName}])
end
end
end).
@@ -594,7 +593,7 @@ stop_mirroring(QPid) -> ok = delegate_call(QPid, stop_mirroring).
on_node_down(Node) ->
rabbit_misc:execute_mnesia_tx_with_tail(
fun () -> QsDels =
- qlc:e(qlc:q([{{QName, Pid}, delete_queue(QName)} ||
+ qlc:e(qlc:q([{QName, delete_queue(QName)} ||
#amqqueue{name = QName, pid = Pid,
slave_pids = []}
<- mnesia:table(rabbit_queue),
@@ -607,10 +606,9 @@ on_node_down(Node) ->
fun () ->
T(),
lists:foreach(
- fun({QName, QPid}) ->
+ fun(QName) ->
ok = rabbit_event:notify(queue_deleted,
- [{pid, QPid},
- {name, QName}])
+ [{name, QName}])
end, Qs)
end
end).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index fe3ed88d..476f806d 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -85,7 +85,7 @@
%%----------------------------------------------------------------------------
-define(STATISTICS_KEYS,
- [pid,
+ [name,
policy,
exclusive_consumer_pid,
exclusive_consumer_tag,
@@ -101,16 +101,14 @@
]).
-define(CREATION_EVENT_KEYS,
- [pid,
- name,
+ [name,
durable,
auto_delete,
arguments,
owner_pid
]).
--define(INFO_KEYS,
- ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
+-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [name]).
%%----------------------------------------------------------------------------
@@ -183,7 +181,7 @@ terminate(Reason, State = #q{q = #amqqueue{name = QName},
fun (BQS) ->
BQS1 = BQ:delete_and_terminate(Reason, BQS),
%% don't care if the internal delete doesn't return 'ok'.
- rabbit_amqqueue:internal_delete(QName, self()),
+ rabbit_amqqueue:internal_delete(QName),
BQS1
end, State).
@@ -277,7 +275,8 @@ terminate_shutdown(Fun, State) ->
case BQS of
undefined -> State1;
_ -> ok = rabbit_memory_monitor:deregister(self()),
- [emit_consumer_deleted(Ch, CTag)
+ QName = qname(State),
+ [emit_consumer_deleted(Ch, CTag, QName)
|| {Ch, CTag, _} <- consumers(State1)],
State1#q{backing_queue_state = Fun(BQS)}
end.
@@ -604,9 +603,9 @@ remove_consumer(ChPid, ConsumerTag, Queue) ->
(CP /= ChPid) or (CTag /= ConsumerTag)
end, Queue).
-remove_consumers(ChPid, Queue) ->
+remove_consumers(ChPid, Queue, QName) ->
queue:filter(fun ({CP, #consumer{tag = CTag}}) when CP =:= ChPid ->
- emit_consumer_deleted(ChPid, CTag),
+ emit_consumer_deleted(ChPid, CTag, QName),
false;
(_) ->
true
@@ -647,7 +646,8 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
C = #cr{ch_pid = ChPid,
acktags = ChAckTags,
blocked_consumers = Blocked} ->
- _ = remove_consumers(ChPid, Blocked), %% for stats emission
+ QName = qname(State),
+ _ = remove_consumers(ChPid, Blocked, QName), %% for stats emission
ok = erase_ch_record(C),
State1 = State#q{
exclusive_consumer = case Holder of
@@ -655,7 +655,8 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
Other -> Other
end,
active_consumers = remove_consumers(
- ChPid, State#q.active_consumers),
+ ChPid, State#q.active_consumers,
+ QName),
senders = Senders1},
case should_auto_delete(State1) of
true -> {stop, State1};
@@ -954,19 +955,19 @@ emit_stats(State) ->
emit_stats(State, Extra) ->
rabbit_event:notify(queue_stats, Extra ++ infos(?STATISTICS_KEYS, State)).
-emit_consumer_created(ChPid, ConsumerTag, Exclusive, AckRequired) ->
+emit_consumer_created(ChPid, ConsumerTag, Exclusive, AckRequired, QName) ->
rabbit_event:notify(consumer_created,
[{consumer_tag, ConsumerTag},
{exclusive, Exclusive},
{ack_required, AckRequired},
{channel, ChPid},
- {queue, self()}]).
+ {queue, QName}]).
-emit_consumer_deleted(ChPid, ConsumerTag) ->
+emit_consumer_deleted(ChPid, ConsumerTag, QName) ->
rabbit_event:notify(consumer_deleted,
[{consumer_tag, ConsumerTag},
{channel, ChPid},
- {queue, self()}]).
+ {queue, QName}]).
%%----------------------------------------------------------------------------
@@ -1075,9 +1076,8 @@ handle_call({basic_get, ChPid, NoAck}, _From,
handle_call({basic_consume, NoAck, ChPid, Limiter,
ConsumerTag, ExclusiveConsume, OkMsg},
- _From, State = #q{exclusive_consumer = ExistingHolder}) ->
- case check_exclusive_access(ExistingHolder, ExclusiveConsume,
- State) of
+ _From, State = #q{exclusive_consumer = Holder}) ->
+ case check_exclusive_access(Holder, ExclusiveConsume, State) of
in_use ->
reply({error, exclusive_consume_unavailable}, State);
ok ->
@@ -1086,7 +1086,7 @@ handle_call({basic_consume, NoAck, ChPid, Limiter,
Consumer = #consumer{tag = ConsumerTag,
ack_required = not NoAck},
ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag};
- true -> ExistingHolder
+ true -> Holder
end,
State1 = State#q{has_had_consumers = true,
exclusive_consumer = ExclusiveConsumer},
@@ -1101,7 +1101,7 @@ handle_call({basic_consume, NoAck, ChPid, Limiter,
run_message_queue(State1#q{active_consumers = AC1})
end,
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
- not NoAck),
+ not NoAck, qname(State2)),
reply(ok, State2)
end;
@@ -1112,7 +1112,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From,
not_found ->
reply(ok, State);
C = #cr{blocked_consumers = Blocked} ->
- emit_consumer_deleted(ChPid, ConsumerTag),
+ emit_consumer_deleted(ChPid, ConsumerTag, qname(State)),
Blocked1 = remove_consumer(ChPid, ConsumerTag, Blocked),
update_consumer_count(C#cr{blocked_consumers = Blocked1}, -1),
State1 = State#q{
@@ -1122,7 +1122,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From,
end,
active_consumers = remove_consumer(
ChPid, ConsumerTag,
- State#q.active_consumers)},
+ State#q.active_consumers)},
case should_auto_delete(State1) of
false -> reply(ok, ensure_expiry_timer(State1));
true -> stop(From, ok, State1)
@@ -1173,11 +1173,13 @@ handle_call(stop_mirroring, _From, State = #q{backing_queue = BQ,
handle_call(force_event_refresh, _From,
State = #q{exclusive_consumer = Exclusive}) ->
rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)),
+ QName = qname(State),
case Exclusive of
- none -> [emit_consumer_created(Ch, CTag, false, AckRequired) ||
+ none -> [emit_consumer_created(
+ Ch, CTag, false, AckRequired, QName) ||
{Ch, CTag, AckRequired} <- consumers(State)];
{Ch, CTag} -> [{Ch, CTag, AckRequired}] = consumers(State),
- emit_consumer_created(Ch, CTag, true, AckRequired)
+ emit_consumer_created(Ch, CTag, true, AckRequired, QName)
end,
reply(ok, State).
@@ -1225,7 +1227,7 @@ handle_cast({reject, AckTags, false, ChPid}, State) ->
ChPid, AckTags, State,
fun (State1 = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
- BQS1 = BQ:fold(fun(M, A) -> DLXFun([{M, A}]) end,
+ BQS1 = BQ:foreach_ack(fun(M, A) -> DLXFun([{M, A}]) end,
BQS, AckTags),
State1#q{backing_queue_state = BQS1}
end));
@@ -1277,18 +1279,24 @@ handle_cast({set_maximum_since_use, Age}, State) ->
handle_cast({dead_letter, Msgs, Reason}, State = #q{dlx = XName}) ->
case rabbit_exchange:lookup(XName) of
{ok, X} ->
- noreply(lists:foldl(
- fun({Msg, AckTag}, State1 = #q{publish_seqno = SeqNo,
- unconfirmed = UC,
- queue_monitors = QMon}) ->
- QPids = dead_letter_publish(Msg, Reason, X,
- State1),
- UC1 = dtree:insert(SeqNo, QPids, AckTag, UC),
- QMons = pmon:monitor_all(QPids, QMon),
- State1#q{queue_monitors = QMons,
- publish_seqno = SeqNo + 1,
- unconfirmed = UC1}
- end, State, Msgs));
+ {AckImmediately, State2} =
+ lists:foldl(
+ fun({Msg, AckTag},
+ {Acks, State1 = #q{publish_seqno = SeqNo,
+ unconfirmed = UC,
+ queue_monitors = QMons}}) ->
+ case dead_letter_publish(Msg, Reason, X, State1) of
+ [] -> {[AckTag | Acks], State1};
+ QPids -> UC1 = dtree:insert(
+ SeqNo, QPids, AckTag, UC),
+ QMons1 = pmon:monitor_all(QPids, QMons),
+ {Acks,
+ State1#q{publish_seqno = SeqNo + 1,
+ unconfirmed = UC1,
+ queue_monitors = QMons1}}
+ end
+ end, {[], State}, Msgs),
+ cleanup_after_confirm(AckImmediately, State2);
{error, not_found} ->
cleanup_after_confirm([AckTag || {_, AckTag} <- Msgs], State)
end;
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 871becc5..a6e9b198 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -149,7 +149,7 @@
%% Acktags supplied are for messages which should be processed. The
%% provided callback function is called with each message.
--callback fold(msg_fun(), state(), [ack()]) -> state().
+-callback foreach_ack(msg_fun(), state(), [ack()]) -> state().
%% Reinsert messages into the queue which have already been delivered
%% and were pending acknowledgement.
@@ -216,7 +216,7 @@ behaviour_info(callbacks) ->
[{start, 1}, {stop, 0}, {init, 3}, {terminate, 2},
{delete_and_terminate, 2}, {purge, 1}, {publish, 4},
{publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1}, {dropwhile, 3},
- {fetch, 2}, {ack, 2}, {fold, 3}, {requeue, 2}, {len, 1},
+ {fetch, 2}, {ack, 2}, {foreach_ack, 3}, {requeue, 2}, {len, 1},
{is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2},
{ram_duration, 1}, {needs_timeout, 1}, {timeout, 1},
{handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2}] ;
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index b97af6d8..b1ef3b6b 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -35,8 +35,9 @@
-record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid,
conn_name, limiter, tx_status, next_tag, unacked_message_q,
uncommitted_message_q, uncommitted_acks, uncommitted_nacks, user,
- virtual_host, most_recently_declared_queue, queue_monitors,
- consumer_mapping, blocking, queue_consumers, delivering_queues,
+ virtual_host, most_recently_declared_queue,
+ queue_names, queue_monitors, consumer_mapping,
+ blocking, queue_consumers, delivering_queues,
queue_collector_pid, stats_timer, confirm_enabled, publish_seqno,
unconfirmed, confirmed, capabilities, trace_state}).
@@ -194,6 +195,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
user = User,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
+ queue_names = dict:new(),
queue_monitors = pmon:new(),
consumer_mapping = dict:new(),
blocking = sets:new(),
@@ -334,9 +336,13 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) ->
State3 = handle_consuming_queue_down(QPid, State2),
State4 = handle_delivering_queue_down(QPid, State3),
credit_flow:peer_down(QPid),
- erase_queue_stats(QPid),
- noreply(State4#ch{queue_monitors = pmon:erase(
- QPid, State4#ch.queue_monitors)});
+ #ch{queue_names = QNames, queue_monitors = QMons} = State4,
+ case dict:find(QPid, QNames) of
+ {ok, QName} -> erase_queue_stats(QName);
+ error -> ok
+ end,
+ noreply(State4#ch{queue_names = dict:erase(QPid, QNames),
+ queue_monitors = pmon:erase(QPid, QMons)});
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State}.
@@ -677,7 +683,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
QueueName, ConnPid,
fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of
{ok, MessageCount,
- Msg = {_QName, QPid, _MsgId, Redelivered,
+ Msg = {QName, QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
routing_keys = [RoutingKey | _CcRoutes],
content = Content}}} ->
@@ -689,7 +695,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
routing_key = RoutingKey,
message_count = MessageCount},
Content),
- State1 = monitor_delivering_queue(NoAck, QPid, State),
+ State1 = monitor_delivering_queue(NoAck, QPid, QName, State),
{noreply, record_sent(none, not(NoAck), Msg, State1)};
empty ->
{reply, #'basic.get_empty'{}, State}
@@ -728,10 +734,11 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
consumer_tag = ActualConsumerTag})),
Q}
end) of
- {ok, Q = #amqqueue{pid = QPid}} ->
+ {ok, Q = #amqqueue{pid = QPid, name = QName}} ->
CM1 = dict:store(ActualConsumerTag, Q, ConsumerMapping),
State1 = monitor_delivering_queue(
- NoAck, QPid, State#ch{consumer_mapping = CM1}),
+ NoAck, QPid, QName,
+ State#ch{consumer_mapping = CM1}),
{noreply,
case NoWait of
true -> consumer_monitor(ActualConsumerTag, State1);
@@ -1126,9 +1133,12 @@ consumer_monitor(ConsumerTag,
State
end.
-monitor_delivering_queue(NoAck, QPid, State = #ch{queue_monitors = QMons,
- delivering_queues = DQ}) ->
- State#ch{queue_monitors = pmon:monitor(QPid, QMons),
+monitor_delivering_queue(NoAck, QPid, QName,
+ State = #ch{queue_names = QNames,
+ queue_monitors = QMons,
+ delivering_queues = DQ}) ->
+ State#ch{queue_names = dict:store(QPid, QName, QNames),
+ queue_monitors = pmon:monitor(QPid, QMons),
delivering_queues = case NoAck of
true -> DQ;
false -> sets:add_element(QPid, DQ)
@@ -1233,18 +1243,18 @@ reject(Requeue, Acked, Limiter) ->
ok = notify_limiter(Limiter, Acked).
record_sent(ConsumerTag, AckRequired,
- Msg = {_QName, QPid, MsgId, Redelivered, _Message},
+ Msg = {QName, QPid, MsgId, Redelivered, _Message},
State = #ch{unacked_message_q = UAMQ,
next_tag = DeliveryTag,
trace_state = TraceState}) ->
- incr_stats([{queue_stats, QPid, 1}], case {ConsumerTag, AckRequired} of
- {none, true} -> get;
- {none, false} -> get_no_ack;
- {_ , true} -> deliver;
- {_ , false} -> deliver_no_ack
- end, State),
+ incr_stats([{queue_stats, QName, 1}], case {ConsumerTag, AckRequired} of
+ {none, true} -> get;
+ {none, false} -> get_no_ack;
+ {_ , true} -> deliver;
+ {_ , false} -> deliver_no_ack
+ end, State),
case Redelivered of
- true -> incr_stats([{queue_stats, QPid, 1}], redeliver, State);
+ true -> incr_stats([{queue_stats, QName, 1}], redeliver, State);
false -> ok
end,
rabbit_trace:tap_trace_out(Msg, TraceState),
@@ -1277,11 +1287,15 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) ->
precondition_failed("unknown delivery tag ~w", [DeliveryTag])
end.
-ack(Acked, State) ->
+ack(Acked, State = #ch{queue_names = QNames}) ->
Incs = fold_per_queue(
fun (QPid, MsgIds, L) ->
ok = rabbit_amqqueue:ack(QPid, MsgIds, self()),
- [{queue_stats, QPid, length(MsgIds)} | L]
+ case dict:find(QPid, QNames) of
+ {ok, QName} -> Count = length(MsgIds),
+ [{queue_stats, QName, Count} | L];
+ error -> L
+ end
end, [], Acked),
ok = notify_limiter(State#ch.limiter, Acked),
incr_stats(Incs, ack, State).
@@ -1339,23 +1353,42 @@ notify_limiter(Limiter, Acked) ->
deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
exchange_name = XName},
msg_seq_no = MsgSeqNo},
- QNames}, State) ->
- {RoutingRes, DeliveredQPids} =
- rabbit_amqqueue:deliver_flow(rabbit_amqqueue:lookup(QNames), Delivery),
- State1 = State#ch{queue_monitors =
- pmon:monitor_all(DeliveredQPids,
- State#ch.queue_monitors)},
- State2 = process_routing_result(RoutingRes, DeliveredQPids,
- XName, MsgSeqNo, Message, State1),
+ DelQNames}, State = #ch{queue_names = QNames,
+ queue_monitors = QMons}) ->
+ Qs = rabbit_amqqueue:lookup(DelQNames),
+ {RoutingRes, DeliveredQPids} = rabbit_amqqueue:deliver_flow(Qs, Delivery),
+ %% The pmon:monitor_all/2 monitors all queues to which we
+ %% delivered. But we want to monitor even queues we didn't deliver
+ %% to, since we need their 'DOWN' messages to clean
+ %% queue_names. So we also need to monitor each QPid from
+ %% queues. But that only gets the masters (which is fine for
+ %% cleaning queue_names), so we need the union of both.
+ %%
+ %% ...and we need to add even non-delivered queues to queue_names
+ %% since alternative algorithms to update queue_names less
+ %% frequently would in fact be more expensive in the common case.
+ {QNames1, QMons1} =
+ lists:foldl(fun (#amqqueue{pid = QPid, name = QName},
+ {QNames0, QMons0}) ->
+ {case dict:is_key(QPid, QNames0) of
+ true -> QNames0;
+ false -> dict:store(QPid, QName, QNames0)
+ end, pmon:monitor(QPid, QMons0)}
+ end, {QNames, pmon:monitor_all(DeliveredQPids, QMons)}, Qs),
+ State1 = process_routing_result(RoutingRes, DeliveredQPids,
+ XName, MsgSeqNo, Message,
+ State#ch{queue_names = QNames1,
+ queue_monitors = QMons1}),
incr_stats([{exchange_stats, XName, 1} |
- [{queue_exchange_stats, {QPid, XName}, 1} ||
- QPid <- DeliveredQPids]], publish, State2),
- State2.
+ [{queue_exchange_stats, {QName, XName}, 1} ||
+ QPid <- DeliveredQPids,
+ {ok, QName} <- [dict:find(QPid, QNames1)]]],
+ publish, State1),
+ State1.
process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
ok = basic_return(Msg, State, no_route),
- incr_stats([{exchange_stats, Msg#basic_message.exchange_name, 1}],
- return_unroutable, State),
+ incr_stats([{exchange_stats, XName, 1}], return_unroutable, State),
record_confirm(MsgSeqNo, XName, State);
process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
record_confirm(MsgSeqNo, XName, State);
@@ -1489,24 +1522,23 @@ emit_stats(State) ->
emit_stats(State, []).
emit_stats(State, Extra) ->
- CoarseStats = infos(?STATISTICS_KEYS, State),
+ Coarse = infos(?STATISTICS_KEYS, State),
case rabbit_event:stats_level(State, #ch.stats_timer) of
- coarse ->
- rabbit_event:notify(channel_stats, Extra ++ CoarseStats);
- fine ->
- FineStats =
- [{channel_queue_stats,
- [{QPid, Stats} || {{queue_stats, QPid}, Stats} <- get()]},
- {channel_exchange_stats,
- [{X, Stats} || {{exchange_stats, X}, Stats} <- get()]},
- {channel_queue_exchange_stats,
- [{QX, Stats} ||
- {{queue_exchange_stats, QX}, Stats} <- get()]}],
- rabbit_event:notify(channel_stats,
- Extra ++ CoarseStats ++ FineStats)
+ coarse -> rabbit_event:notify(channel_stats, Extra ++ Coarse);
+ fine -> Fine = [{channel_queue_stats,
+ [{QName, Stats} ||
+ {{queue_stats, QName}, Stats} <- get()]},
+ {channel_exchange_stats,
+ [{XName, Stats} ||
+ {{exchange_stats, XName}, Stats} <- get()]},
+ {channel_queue_exchange_stats,
+ [{QX, Stats} ||
+ {{queue_exchange_stats, QX}, Stats} <- get()]}],
+ rabbit_event:notify(channel_stats, Extra ++ Coarse ++ Fine)
end.
-erase_queue_stats(QPid) ->
- erase({queue_stats, QPid}),
+erase_queue_stats(QName) ->
+ erase({queue_stats, QName}),
[erase({queue_exchange_stats, QX}) ||
- {{queue_exchange_stats, QX = {QPid0, _}}, _} <- get(), QPid =:= QPid0].
+ {{queue_exchange_stats, QX = {QName0, _}}, _} <- get(),
+ QName0 =:= QName].
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index ac2048b7..8e00fba5 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -22,7 +22,7 @@
requeue/2, len/1, is_empty/1, depth/1, drain_confirmed/1,
dropwhile/3, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
- status/1, invoke/3, is_duplicate/2, fold/3]).
+ status/1, invoke/3, is_duplicate/2, foreach_ack/3]).
-export([start/1, stop/0]).
@@ -303,9 +303,9 @@ ack(AckTags, State = #state { gm = GM,
{MsgIds, State #state { backing_queue_state = BQS1,
ack_msg_id = AM1 }}.
-fold(MsgFun, State = #state { backing_queue = BQ,
- backing_queue_state = BQS }, AckTags) ->
- State #state { backing_queue_state = BQ:fold(MsgFun, BQS, AckTags) }.
+foreach_ack(MsgFun, State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }, AckTags) ->
+ State #state { backing_queue_state = BQ:foreach_ack(MsgFun, BQS, AckTags) }.
requeue(AckTags, State = #state { gm = GM,
backing_queue = BQ,
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 408bacd8..ca4ad49c 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1286,8 +1286,7 @@ test_statistics() ->
QName = receive #'queue.declare_ok'{queue = Q0} -> Q0
after ?TIMEOUT -> throw(failed_to_receive_queue_declare_ok)
end,
- {ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<"/">>, queue, QName)),
- QPid = Q#amqqueue.pid,
+ QRes = rabbit_misc:r(<<"/">>, queue, QName),
X = rabbit_misc:r(<<"/">>, exchange, <<"">>),
rabbit_tests_event_receiver:start(self(), [node()], [channel_stats]),
@@ -1311,9 +1310,9 @@ test_statistics() ->
length(proplists:get_value(
channel_queue_exchange_stats, E)) > 0
end),
- [{QPid,[{get,1}]}] = proplists:get_value(channel_queue_stats, Event2),
+ [{QRes, [{get,1}]}] = proplists:get_value(channel_queue_stats, Event2),
[{X,[{publish,1}]}] = proplists:get_value(channel_exchange_stats, Event2),
- [{{QPid,X},[{publish,1}]}] =
+ [{{QRes,X},[{publish,1}]}] =
proplists:get_value(channel_queue_exchange_stats, Event2),
%% Check the stats remove stuff on queue deletion
@@ -1338,31 +1337,31 @@ test_refresh_events(SecondaryNode) ->
[channel_created, queue_created]),
{_Writer, Ch} = test_spawn(),
- expect_events(Ch, channel_created),
+ expect_events(pid, Ch, channel_created),
rabbit_channel:shutdown(Ch),
{_Writer2, Ch2} = test_spawn(SecondaryNode),
- expect_events(Ch2, channel_created),
+ expect_events(pid, Ch2, channel_created),
rabbit_channel:shutdown(Ch2),
- {new, #amqqueue { pid = QPid } = Q} =
+ {new, #amqqueue{name = QName} = Q} =
rabbit_amqqueue:declare(test_queue(), false, false, [], none),
- expect_events(QPid, queue_created),
+ expect_events(name, QName, queue_created),
rabbit_amqqueue:delete(Q, false, false),
rabbit_tests_event_receiver:stop(),
passed.
-expect_events(Pid, Type) ->
- expect_event(Pid, Type),
+expect_events(Tag, Key, Type) ->
+ expect_event(Tag, Key, Type),
rabbit:force_event_refresh(),
- expect_event(Pid, Type).
+ expect_event(Tag, Key, Type).
-expect_event(Pid, Type) ->
+expect_event(Tag, Key, Type) ->
receive #event{type = Type, props = Props} ->
- case pget(pid, Props) of
- Pid -> ok;
- _ -> expect_event(Pid, Type)
+ case pget(Tag, Props) of
+ Key -> ok;
+ _ -> expect_event(Tag, Key, Type)
end
after ?TIMEOUT -> throw({failed_to_receive_event, Type})
end.
@@ -2302,6 +2301,7 @@ test_variable_queue() ->
fun test_variable_queue_all_the_bits_not_covered_elsewhere1/1,
fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1,
fun test_drop/1,
+ fun test_variable_queue_fold_msg_on_disk/1,
fun test_dropwhile/1,
fun test_dropwhile_varying_ram_duration/1,
fun test_variable_queue_ack_limiting/1,
@@ -2534,6 +2534,13 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
{empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7),
VQ8.
+test_variable_queue_fold_msg_on_disk(VQ0) ->
+ VQ1 = variable_queue_publish(true, 1, VQ0),
+ {VQ2, AckTags} = variable_queue_fetch(1, true, false, 1, VQ1),
+ VQ3 = rabbit_variable_queue:foreach_ack(fun (_M, _A) -> ok end,
+ VQ2, AckTags),
+ VQ3.
+
test_queue_recover() ->
Count = 2 * rabbit_queue_index:next_segment_boundary(0),
{new, #amqqueue { pid = QPid, name = QName } = Q} =
@@ -2559,7 +2566,7 @@ test_queue_recover() ->
rabbit_variable_queue:fetch(true, VQ1),
CountMinusOne = rabbit_variable_queue:len(VQ2),
_VQ3 = rabbit_variable_queue:delete_and_terminate(shutdown, VQ2),
- rabbit_amqqueue:internal_delete(QName, QPid1)
+ rabbit_amqqueue:internal_delete(QName)
end),
passed.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 3a025ba3..292217da 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -21,7 +21,7 @@
dropwhile/3, fetch/2, drop/2, ack/2, requeue/2, len/1, is_empty/1,
depth/1, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3,
- is_duplicate/2, multiple_routing_keys/0, fold/3]).
+ is_duplicate/2, multiple_routing_keys/0, foreach_ack/3]).
-export([start/1, stop/0]).
@@ -348,7 +348,7 @@
q4 :: ?QUEUE:?QUEUE(),
next_seq_id :: seq_id(),
pending_ack :: gb_tree(),
- ram_ack_index :: gb_tree(),
+ ram_ack_index :: gb_set(),
index_state :: any(),
msg_store_clients :: 'undefined' | {{any(), binary()},
{any(), binary()}},
@@ -647,16 +647,15 @@ ack(AckTags, State) ->
persistent_count = PCount1,
ack_out_counter = AckOutCount + length(AckTags) })}.
-fold(undefined, State, _AckTags) ->
+foreach_ack(undefined, State, _AckTags) ->
State;
-fold(MsgFun, State = #vqstate{pending_ack = PA}, AckTags) ->
- lists:foldl(
- fun(SeqId, State1) ->
- {MsgStatus, State2} =
- read_msg(gb_trees:get(SeqId, PA), State1),
- MsgFun(MsgStatus#msg_status.msg, SeqId),
- State2
- end, State, AckTags).
+foreach_ack(MsgFun, State = #vqstate{pending_ack = PA}, AckTags) ->
+ a(lists:foldl(fun(SeqId, State1) ->
+ {MsgStatus, State2} =
+ read_msg(gb_trees:get(SeqId, PA), false, State1),
+ MsgFun(MsgStatus#msg_status.msg, SeqId),
+ State2
+ end, State, AckTags)).
requeue(AckTags, #vqstate { delta = Delta,
q3 = Q3,
@@ -732,7 +731,7 @@ ram_duration(State = #vqstate {
{AvgAckIngressRate, AckIngress1} =
update_rate(Now, AckTimestamp, AckInCount, AckIngress),
- RamAckCount = gb_trees:size(RamAckIndex),
+ RamAckCount = gb_sets:size(RamAckIndex),
Duration = %% msgs+acks / (msgs+acks/sec) == sec
case (AvgEgressRate == 0 andalso AvgIngressRate == 0 andalso
@@ -811,7 +810,7 @@ status(#vqstate {
{pending_acks , gb_trees:size(PA)},
{target_ram_count , TargetRamCount},
{ram_msg_count , RamMsgCount},
- {ram_ack_count , gb_trees:size(RAI)},
+ {ram_ack_count , gb_sets:size(RAI)},
{next_seq_id , NextSeqId},
{persistent_count , PersistentCount},
{avg_ingress_rate , AvgIngressRate},
@@ -846,6 +845,7 @@ a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
true = Len >= 0,
true = PersistentCount >= 0,
true = RamMsgCount >= 0,
+ true = RamMsgCount =< Len,
State.
@@ -1015,7 +1015,7 @@ init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback,
q4 = ?QUEUE:new(),
next_seq_id = NextSeqId,
pending_ack = gb_trees:empty(),
- ram_ack_index = gb_trees:empty(),
+ ram_ack_index = gb_sets:empty(),
index_state = IndexState1,
msg_store_clients = {PersistentClient, TransientClient},
durable = IsDurable,
@@ -1071,17 +1071,19 @@ queue_out(State = #vqstate { q4 = Q4 }) ->
{{value, MsgStatus}, State #vqstate { q4 = Q4a }}
end.
+read_msg(MsgStatus, State) -> read_msg(MsgStatus, true, State).
+
read_msg(MsgStatus = #msg_status { msg = undefined,
msg_id = MsgId,
is_persistent = IsPersistent },
- State = #vqstate { ram_msg_count = RamMsgCount,
- msg_store_clients = MSCState}) ->
+ CountDiskToRam, State = #vqstate { ram_msg_count = RamMsgCount,
+ msg_store_clients = MSCState}) ->
{{ok, Msg = #basic_message {}}, MSCState1} =
msg_store_read(MSCState, IsPersistent, MsgId),
{MsgStatus #msg_status { msg = Msg },
- State #vqstate { ram_msg_count = RamMsgCount + 1,
+ State #vqstate { ram_msg_count = RamMsgCount + one_if(CountDiskToRam),
msg_store_clients = MSCState1 }};
-read_msg(MsgStatus, State) ->
+read_msg(MsgStatus, _CountDiskToRam, State) ->
{MsgStatus, State}.
internal_fetch(AckRequired, MsgStatus = #msg_status {
@@ -1230,7 +1232,6 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus,
%%----------------------------------------------------------------------------
record_pending_ack(#msg_status { seq_id = SeqId,
- msg_id = MsgId,
msg_on_disk = MsgOnDisk } = MsgStatus,
State = #vqstate { pending_ack = PA,
ram_ack_index = RAI,
@@ -1238,7 +1239,7 @@ record_pending_ack(#msg_status { seq_id = SeqId,
{AckEntry, RAI1} =
case MsgOnDisk of
true -> {m(trim_msg_status(MsgStatus)), RAI};
- false -> {MsgStatus, gb_trees:insert(SeqId, MsgId, RAI)}
+ false -> {MsgStatus, gb_sets:insert(SeqId, RAI)}
end,
State #vqstate { pending_ack = gb_trees:insert(SeqId, AckEntry, PA),
ram_ack_index = RAI1,
@@ -1248,7 +1249,7 @@ remove_pending_ack(SeqId, State = #vqstate { pending_ack = PA,
ram_ack_index = RAI }) ->
{gb_trees:get(SeqId, PA),
State #vqstate { pending_ack = gb_trees:delete(SeqId, PA),
- ram_ack_index = gb_trees:delete_any(SeqId, RAI) }}.
+ ram_ack_index = gb_sets:delete_any(SeqId, RAI) }}.
purge_pending_ack(KeepPersistent,
State = #vqstate { pending_ack = PA,
@@ -1259,7 +1260,7 @@ purge_pending_ack(KeepPersistent,
accumulate_ack(MsgStatus, Acc)
end, accumulate_ack_init(), PA),
State1 = State #vqstate { pending_ack = gb_trees:empty(),
- ram_ack_index = gb_trees:empty() },
+ ram_ack_index = gb_sets:empty() },
case KeepPersistent of
true -> case orddict:find(false, MsgIdsByStore) of
error -> State1;
@@ -1459,7 +1460,7 @@ reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun,
}) ->
{Reduce, State1 = #vqstate { q2 = Q2, q3 = Q3 }} =
- case chunk_size(RamMsgCount + gb_trees:size(RamAckIndex),
+ case chunk_size(RamMsgCount + gb_sets:size(RamAckIndex),
TargetRamCount) of
0 -> {false, State};
%% Reduce memory of pending acks and alphas. The order is
@@ -1487,12 +1488,12 @@ limit_ram_acks(0, State) ->
{0, State};
limit_ram_acks(Quota, State = #vqstate { pending_ack = PA,
ram_ack_index = RAI }) ->
- case gb_trees:is_empty(RAI) of
+ case gb_sets:is_empty(RAI) of
true ->
{Quota, State};
false ->
- {SeqId, MsgId, RAI1} = gb_trees:take_largest(RAI),
- MsgStatus = #msg_status { msg_id = MsgId, is_persistent = false} =
+ {SeqId, RAI1} = gb_sets:take_largest(RAI),
+ MsgStatus = #msg_status { is_persistent = false} =
gb_trees:get(SeqId, PA),
{MsgStatus1, State1} =
maybe_write_to_disk(true, false, MsgStatus, State),