diff options
author | Emile Joubert <emile@rabbitmq.com> | 2012-11-23 15:19:31 +0000 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2012-11-23 15:19:31 +0000 |
commit | becc1e47cf46605a2dfb58b7b2ae3c6a3f6991de (patch) | |
tree | 02fa13ce5f7b5767b4c9ee2b3f54d9ddaa5a6feb | |
parent | 220488f1e8df41403f038394a941dd246ce5afad (diff) | |
parent | 107edf1a80b9e43d960bff62310b1ecee5157499 (diff) | |
download | rabbitmq-server-becc1e47cf46605a2dfb58b7b2ae3c6a3f6991de.tar.gz |
Merged bug25315
-rw-r--r-- | src/rabbit_amqqueue.erl | 26 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 82 | ||||
-rw-r--r-- | src/rabbit_backing_queue.erl | 4 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 136 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 8 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 39 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 53 |
7 files changed, 197 insertions, 151 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 8ce1160c..be7c7867 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -34,7 +34,7 @@ -export([start_mirroring/1, stop_mirroring/1]). %% internal --export([internal_declare/2, internal_delete/2, run_backing_queue/3, +-export([internal_declare/2, internal_delete/1, run_backing_queue/3, set_ram_duration_target/2, set_maximum_since_use/2]). -include("rabbit.hrl"). @@ -156,11 +156,11 @@ -spec(notify_sent_queue_down/1 :: (pid()) -> 'ok'). -spec(unblock/2 :: (pid(), pid()) -> 'ok'). -spec(flush_all/2 :: (qpids(), pid()) -> 'ok'). --spec(internal_delete/2 :: - (name(), pid()) -> rabbit_types:ok_or_error('not_found') | - rabbit_types:connection_exit() | - fun (() -> rabbit_types:ok_or_error('not_found') | - rabbit_types:connection_exit())). +-spec(internal_delete/1 :: + (name()) -> rabbit_types:ok_or_error('not_found') | + rabbit_types:connection_exit() | + fun (() -> rabbit_types:ok_or_error('not_found') | + rabbit_types:connection_exit())). -spec(run_backing_queue/3 :: (pid(), atom(), (fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok'). @@ -257,7 +257,7 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) -> [ExistingQ = #amqqueue{pid = QPid}] -> case rabbit_misc:is_process_alive(QPid) of true -> rabbit_misc:const(ExistingQ); - false -> TailFun = internal_delete(QueueName, QPid), + false -> TailFun = internal_delete(QueueName), fun () -> TailFun(), ExistingQ end end end @@ -563,7 +563,7 @@ internal_delete1(QueueName) -> %% after the transaction. rabbit_binding:remove_for_destination(QueueName). -internal_delete(QueueName, QPid) -> +internal_delete(QueueName) -> rabbit_misc:execute_mnesia_tx_with_tail( fun () -> case mnesia:wread({rabbit_queue, QueueName}) of @@ -573,8 +573,7 @@ internal_delete(QueueName, QPid) -> fun() -> ok = T(), ok = rabbit_event:notify(queue_deleted, - [{pid, QPid}, - {name, QueueName}]) + [{name, QueueName}]) end end end). @@ -594,7 +593,7 @@ stop_mirroring(QPid) -> ok = delegate_call(QPid, stop_mirroring). on_node_down(Node) -> rabbit_misc:execute_mnesia_tx_with_tail( fun () -> QsDels = - qlc:e(qlc:q([{{QName, Pid}, delete_queue(QName)} || + qlc:e(qlc:q([{QName, delete_queue(QName)} || #amqqueue{name = QName, pid = Pid, slave_pids = []} <- mnesia:table(rabbit_queue), @@ -607,10 +606,9 @@ on_node_down(Node) -> fun () -> T(), lists:foreach( - fun({QName, QPid}) -> + fun(QName) -> ok = rabbit_event:notify(queue_deleted, - [{pid, QPid}, - {name, QName}]) + [{name, QName}]) end, Qs) end end). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index fe3ed88d..476f806d 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -85,7 +85,7 @@ %%---------------------------------------------------------------------------- -define(STATISTICS_KEYS, - [pid, + [name, policy, exclusive_consumer_pid, exclusive_consumer_tag, @@ -101,16 +101,14 @@ ]). -define(CREATION_EVENT_KEYS, - [pid, - name, + [name, durable, auto_delete, arguments, owner_pid ]). --define(INFO_KEYS, - ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). +-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [name]). %%---------------------------------------------------------------------------- @@ -183,7 +181,7 @@ terminate(Reason, State = #q{q = #amqqueue{name = QName}, fun (BQS) -> BQS1 = BQ:delete_and_terminate(Reason, BQS), %% don't care if the internal delete doesn't return 'ok'. - rabbit_amqqueue:internal_delete(QName, self()), + rabbit_amqqueue:internal_delete(QName), BQS1 end, State). @@ -277,7 +275,8 @@ terminate_shutdown(Fun, State) -> case BQS of undefined -> State1; _ -> ok = rabbit_memory_monitor:deregister(self()), - [emit_consumer_deleted(Ch, CTag) + QName = qname(State), + [emit_consumer_deleted(Ch, CTag, QName) || {Ch, CTag, _} <- consumers(State1)], State1#q{backing_queue_state = Fun(BQS)} end. @@ -604,9 +603,9 @@ remove_consumer(ChPid, ConsumerTag, Queue) -> (CP /= ChPid) or (CTag /= ConsumerTag) end, Queue). -remove_consumers(ChPid, Queue) -> +remove_consumers(ChPid, Queue, QName) -> queue:filter(fun ({CP, #consumer{tag = CTag}}) when CP =:= ChPid -> - emit_consumer_deleted(ChPid, CTag), + emit_consumer_deleted(ChPid, CTag, QName), false; (_) -> true @@ -647,7 +646,8 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder, C = #cr{ch_pid = ChPid, acktags = ChAckTags, blocked_consumers = Blocked} -> - _ = remove_consumers(ChPid, Blocked), %% for stats emission + QName = qname(State), + _ = remove_consumers(ChPid, Blocked, QName), %% for stats emission ok = erase_ch_record(C), State1 = State#q{ exclusive_consumer = case Holder of @@ -655,7 +655,8 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder, Other -> Other end, active_consumers = remove_consumers( - ChPid, State#q.active_consumers), + ChPid, State#q.active_consumers, + QName), senders = Senders1}, case should_auto_delete(State1) of true -> {stop, State1}; @@ -954,19 +955,19 @@ emit_stats(State) -> emit_stats(State, Extra) -> rabbit_event:notify(queue_stats, Extra ++ infos(?STATISTICS_KEYS, State)). -emit_consumer_created(ChPid, ConsumerTag, Exclusive, AckRequired) -> +emit_consumer_created(ChPid, ConsumerTag, Exclusive, AckRequired, QName) -> rabbit_event:notify(consumer_created, [{consumer_tag, ConsumerTag}, {exclusive, Exclusive}, {ack_required, AckRequired}, {channel, ChPid}, - {queue, self()}]). + {queue, QName}]). -emit_consumer_deleted(ChPid, ConsumerTag) -> +emit_consumer_deleted(ChPid, ConsumerTag, QName) -> rabbit_event:notify(consumer_deleted, [{consumer_tag, ConsumerTag}, {channel, ChPid}, - {queue, self()}]). + {queue, QName}]). %%---------------------------------------------------------------------------- @@ -1075,9 +1076,8 @@ handle_call({basic_get, ChPid, NoAck}, _From, handle_call({basic_consume, NoAck, ChPid, Limiter, ConsumerTag, ExclusiveConsume, OkMsg}, - _From, State = #q{exclusive_consumer = ExistingHolder}) -> - case check_exclusive_access(ExistingHolder, ExclusiveConsume, - State) of + _From, State = #q{exclusive_consumer = Holder}) -> + case check_exclusive_access(Holder, ExclusiveConsume, State) of in_use -> reply({error, exclusive_consume_unavailable}, State); ok -> @@ -1086,7 +1086,7 @@ handle_call({basic_consume, NoAck, ChPid, Limiter, Consumer = #consumer{tag = ConsumerTag, ack_required = not NoAck}, ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag}; - true -> ExistingHolder + true -> Holder end, State1 = State#q{has_had_consumers = true, exclusive_consumer = ExclusiveConsumer}, @@ -1101,7 +1101,7 @@ handle_call({basic_consume, NoAck, ChPid, Limiter, run_message_queue(State1#q{active_consumers = AC1}) end, emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, - not NoAck), + not NoAck, qname(State2)), reply(ok, State2) end; @@ -1112,7 +1112,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From, not_found -> reply(ok, State); C = #cr{blocked_consumers = Blocked} -> - emit_consumer_deleted(ChPid, ConsumerTag), + emit_consumer_deleted(ChPid, ConsumerTag, qname(State)), Blocked1 = remove_consumer(ChPid, ConsumerTag, Blocked), update_consumer_count(C#cr{blocked_consumers = Blocked1}, -1), State1 = State#q{ @@ -1122,7 +1122,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From, end, active_consumers = remove_consumer( ChPid, ConsumerTag, - State#q.active_consumers)}, + State#q.active_consumers)}, case should_auto_delete(State1) of false -> reply(ok, ensure_expiry_timer(State1)); true -> stop(From, ok, State1) @@ -1173,11 +1173,13 @@ handle_call(stop_mirroring, _From, State = #q{backing_queue = BQ, handle_call(force_event_refresh, _From, State = #q{exclusive_consumer = Exclusive}) -> rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)), + QName = qname(State), case Exclusive of - none -> [emit_consumer_created(Ch, CTag, false, AckRequired) || + none -> [emit_consumer_created( + Ch, CTag, false, AckRequired, QName) || {Ch, CTag, AckRequired} <- consumers(State)]; {Ch, CTag} -> [{Ch, CTag, AckRequired}] = consumers(State), - emit_consumer_created(Ch, CTag, true, AckRequired) + emit_consumer_created(Ch, CTag, true, AckRequired, QName) end, reply(ok, State). @@ -1225,7 +1227,7 @@ handle_cast({reject, AckTags, false, ChPid}, State) -> ChPid, AckTags, State, fun (State1 = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - BQS1 = BQ:fold(fun(M, A) -> DLXFun([{M, A}]) end, + BQS1 = BQ:foreach_ack(fun(M, A) -> DLXFun([{M, A}]) end, BQS, AckTags), State1#q{backing_queue_state = BQS1} end)); @@ -1277,18 +1279,24 @@ handle_cast({set_maximum_since_use, Age}, State) -> handle_cast({dead_letter, Msgs, Reason}, State = #q{dlx = XName}) -> case rabbit_exchange:lookup(XName) of {ok, X} -> - noreply(lists:foldl( - fun({Msg, AckTag}, State1 = #q{publish_seqno = SeqNo, - unconfirmed = UC, - queue_monitors = QMon}) -> - QPids = dead_letter_publish(Msg, Reason, X, - State1), - UC1 = dtree:insert(SeqNo, QPids, AckTag, UC), - QMons = pmon:monitor_all(QPids, QMon), - State1#q{queue_monitors = QMons, - publish_seqno = SeqNo + 1, - unconfirmed = UC1} - end, State, Msgs)); + {AckImmediately, State2} = + lists:foldl( + fun({Msg, AckTag}, + {Acks, State1 = #q{publish_seqno = SeqNo, + unconfirmed = UC, + queue_monitors = QMons}}) -> + case dead_letter_publish(Msg, Reason, X, State1) of + [] -> {[AckTag | Acks], State1}; + QPids -> UC1 = dtree:insert( + SeqNo, QPids, AckTag, UC), + QMons1 = pmon:monitor_all(QPids, QMons), + {Acks, + State1#q{publish_seqno = SeqNo + 1, + unconfirmed = UC1, + queue_monitors = QMons1}} + end + end, {[], State}, Msgs), + cleanup_after_confirm(AckImmediately, State2); {error, not_found} -> cleanup_after_confirm([AckTag || {_, AckTag} <- Msgs], State) end; diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 871becc5..a6e9b198 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -149,7 +149,7 @@ %% Acktags supplied are for messages which should be processed. The %% provided callback function is called with each message. --callback fold(msg_fun(), state(), [ack()]) -> state(). +-callback foreach_ack(msg_fun(), state(), [ack()]) -> state(). %% Reinsert messages into the queue which have already been delivered %% and were pending acknowledgement. @@ -216,7 +216,7 @@ behaviour_info(callbacks) -> [{start, 1}, {stop, 0}, {init, 3}, {terminate, 2}, {delete_and_terminate, 2}, {purge, 1}, {publish, 4}, {publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1}, {dropwhile, 3}, - {fetch, 2}, {ack, 2}, {fold, 3}, {requeue, 2}, {len, 1}, + {fetch, 2}, {ack, 2}, {foreach_ack, 3}, {requeue, 2}, {len, 1}, {is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2}, {ram_duration, 1}, {needs_timeout, 1}, {timeout, 1}, {handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2}] ; diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index b97af6d8..b1ef3b6b 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -35,8 +35,9 @@ -record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid, conn_name, limiter, tx_status, next_tag, unacked_message_q, uncommitted_message_q, uncommitted_acks, uncommitted_nacks, user, - virtual_host, most_recently_declared_queue, queue_monitors, - consumer_mapping, blocking, queue_consumers, delivering_queues, + virtual_host, most_recently_declared_queue, + queue_names, queue_monitors, consumer_mapping, + blocking, queue_consumers, delivering_queues, queue_collector_pid, stats_timer, confirm_enabled, publish_seqno, unconfirmed, confirmed, capabilities, trace_state}). @@ -194,6 +195,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, user = User, virtual_host = VHost, most_recently_declared_queue = <<>>, + queue_names = dict:new(), queue_monitors = pmon:new(), consumer_mapping = dict:new(), blocking = sets:new(), @@ -334,9 +336,13 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) -> State3 = handle_consuming_queue_down(QPid, State2), State4 = handle_delivering_queue_down(QPid, State3), credit_flow:peer_down(QPid), - erase_queue_stats(QPid), - noreply(State4#ch{queue_monitors = pmon:erase( - QPid, State4#ch.queue_monitors)}); + #ch{queue_names = QNames, queue_monitors = QMons} = State4, + case dict:find(QPid, QNames) of + {ok, QName} -> erase_queue_stats(QName); + error -> ok + end, + noreply(State4#ch{queue_names = dict:erase(QPid, QNames), + queue_monitors = pmon:erase(QPid, QMons)}); handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}. @@ -677,7 +683,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, QueueName, ConnPid, fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of {ok, MessageCount, - Msg = {_QName, QPid, _MsgId, Redelivered, + Msg = {QName, QPid, _MsgId, Redelivered, #basic_message{exchange_name = ExchangeName, routing_keys = [RoutingKey | _CcRoutes], content = Content}}} -> @@ -689,7 +695,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, routing_key = RoutingKey, message_count = MessageCount}, Content), - State1 = monitor_delivering_queue(NoAck, QPid, State), + State1 = monitor_delivering_queue(NoAck, QPid, QName, State), {noreply, record_sent(none, not(NoAck), Msg, State1)}; empty -> {reply, #'basic.get_empty'{}, State} @@ -728,10 +734,11 @@ handle_method(#'basic.consume'{queue = QueueNameBin, consumer_tag = ActualConsumerTag})), Q} end) of - {ok, Q = #amqqueue{pid = QPid}} -> + {ok, Q = #amqqueue{pid = QPid, name = QName}} -> CM1 = dict:store(ActualConsumerTag, Q, ConsumerMapping), State1 = monitor_delivering_queue( - NoAck, QPid, State#ch{consumer_mapping = CM1}), + NoAck, QPid, QName, + State#ch{consumer_mapping = CM1}), {noreply, case NoWait of true -> consumer_monitor(ActualConsumerTag, State1); @@ -1126,9 +1133,12 @@ consumer_monitor(ConsumerTag, State end. -monitor_delivering_queue(NoAck, QPid, State = #ch{queue_monitors = QMons, - delivering_queues = DQ}) -> - State#ch{queue_monitors = pmon:monitor(QPid, QMons), +monitor_delivering_queue(NoAck, QPid, QName, + State = #ch{queue_names = QNames, + queue_monitors = QMons, + delivering_queues = DQ}) -> + State#ch{queue_names = dict:store(QPid, QName, QNames), + queue_monitors = pmon:monitor(QPid, QMons), delivering_queues = case NoAck of true -> DQ; false -> sets:add_element(QPid, DQ) @@ -1233,18 +1243,18 @@ reject(Requeue, Acked, Limiter) -> ok = notify_limiter(Limiter, Acked). record_sent(ConsumerTag, AckRequired, - Msg = {_QName, QPid, MsgId, Redelivered, _Message}, + Msg = {QName, QPid, MsgId, Redelivered, _Message}, State = #ch{unacked_message_q = UAMQ, next_tag = DeliveryTag, trace_state = TraceState}) -> - incr_stats([{queue_stats, QPid, 1}], case {ConsumerTag, AckRequired} of - {none, true} -> get; - {none, false} -> get_no_ack; - {_ , true} -> deliver; - {_ , false} -> deliver_no_ack - end, State), + incr_stats([{queue_stats, QName, 1}], case {ConsumerTag, AckRequired} of + {none, true} -> get; + {none, false} -> get_no_ack; + {_ , true} -> deliver; + {_ , false} -> deliver_no_ack + end, State), case Redelivered of - true -> incr_stats([{queue_stats, QPid, 1}], redeliver, State); + true -> incr_stats([{queue_stats, QName, 1}], redeliver, State); false -> ok end, rabbit_trace:tap_trace_out(Msg, TraceState), @@ -1277,11 +1287,15 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) -> precondition_failed("unknown delivery tag ~w", [DeliveryTag]) end. -ack(Acked, State) -> +ack(Acked, State = #ch{queue_names = QNames}) -> Incs = fold_per_queue( fun (QPid, MsgIds, L) -> ok = rabbit_amqqueue:ack(QPid, MsgIds, self()), - [{queue_stats, QPid, length(MsgIds)} | L] + case dict:find(QPid, QNames) of + {ok, QName} -> Count = length(MsgIds), + [{queue_stats, QName, Count} | L]; + error -> L + end end, [], Acked), ok = notify_limiter(State#ch.limiter, Acked), incr_stats(Incs, ack, State). @@ -1339,23 +1353,42 @@ notify_limiter(Limiter, Acked) -> deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ exchange_name = XName}, msg_seq_no = MsgSeqNo}, - QNames}, State) -> - {RoutingRes, DeliveredQPids} = - rabbit_amqqueue:deliver_flow(rabbit_amqqueue:lookup(QNames), Delivery), - State1 = State#ch{queue_monitors = - pmon:monitor_all(DeliveredQPids, - State#ch.queue_monitors)}, - State2 = process_routing_result(RoutingRes, DeliveredQPids, - XName, MsgSeqNo, Message, State1), + DelQNames}, State = #ch{queue_names = QNames, + queue_monitors = QMons}) -> + Qs = rabbit_amqqueue:lookup(DelQNames), + {RoutingRes, DeliveredQPids} = rabbit_amqqueue:deliver_flow(Qs, Delivery), + %% The pmon:monitor_all/2 monitors all queues to which we + %% delivered. But we want to monitor even queues we didn't deliver + %% to, since we need their 'DOWN' messages to clean + %% queue_names. So we also need to monitor each QPid from + %% queues. But that only gets the masters (which is fine for + %% cleaning queue_names), so we need the union of both. + %% + %% ...and we need to add even non-delivered queues to queue_names + %% since alternative algorithms to update queue_names less + %% frequently would in fact be more expensive in the common case. + {QNames1, QMons1} = + lists:foldl(fun (#amqqueue{pid = QPid, name = QName}, + {QNames0, QMons0}) -> + {case dict:is_key(QPid, QNames0) of + true -> QNames0; + false -> dict:store(QPid, QName, QNames0) + end, pmon:monitor(QPid, QMons0)} + end, {QNames, pmon:monitor_all(DeliveredQPids, QMons)}, Qs), + State1 = process_routing_result(RoutingRes, DeliveredQPids, + XName, MsgSeqNo, Message, + State#ch{queue_names = QNames1, + queue_monitors = QMons1}), incr_stats([{exchange_stats, XName, 1} | - [{queue_exchange_stats, {QPid, XName}, 1} || - QPid <- DeliveredQPids]], publish, State2), - State2. + [{queue_exchange_stats, {QName, XName}, 1} || + QPid <- DeliveredQPids, + {ok, QName} <- [dict:find(QPid, QNames1)]]], + publish, State1), + State1. process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) -> ok = basic_return(Msg, State, no_route), - incr_stats([{exchange_stats, Msg#basic_message.exchange_name, 1}], - return_unroutable, State), + incr_stats([{exchange_stats, XName, 1}], return_unroutable, State), record_confirm(MsgSeqNo, XName, State); process_routing_result(routed, [], XName, MsgSeqNo, _, State) -> record_confirm(MsgSeqNo, XName, State); @@ -1489,24 +1522,23 @@ emit_stats(State) -> emit_stats(State, []). emit_stats(State, Extra) -> - CoarseStats = infos(?STATISTICS_KEYS, State), + Coarse = infos(?STATISTICS_KEYS, State), case rabbit_event:stats_level(State, #ch.stats_timer) of - coarse -> - rabbit_event:notify(channel_stats, Extra ++ CoarseStats); - fine -> - FineStats = - [{channel_queue_stats, - [{QPid, Stats} || {{queue_stats, QPid}, Stats} <- get()]}, - {channel_exchange_stats, - [{X, Stats} || {{exchange_stats, X}, Stats} <- get()]}, - {channel_queue_exchange_stats, - [{QX, Stats} || - {{queue_exchange_stats, QX}, Stats} <- get()]}], - rabbit_event:notify(channel_stats, - Extra ++ CoarseStats ++ FineStats) + coarse -> rabbit_event:notify(channel_stats, Extra ++ Coarse); + fine -> Fine = [{channel_queue_stats, + [{QName, Stats} || + {{queue_stats, QName}, Stats} <- get()]}, + {channel_exchange_stats, + [{XName, Stats} || + {{exchange_stats, XName}, Stats} <- get()]}, + {channel_queue_exchange_stats, + [{QX, Stats} || + {{queue_exchange_stats, QX}, Stats} <- get()]}], + rabbit_event:notify(channel_stats, Extra ++ Coarse ++ Fine) end. -erase_queue_stats(QPid) -> - erase({queue_stats, QPid}), +erase_queue_stats(QName) -> + erase({queue_stats, QName}), [erase({queue_exchange_stats, QX}) || - {{queue_exchange_stats, QX = {QPid0, _}}, _} <- get(), QPid =:= QPid0]. + {{queue_exchange_stats, QX = {QName0, _}}, _} <- get(), + QName0 =:= QName]. diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index ac2048b7..8e00fba5 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -22,7 +22,7 @@ requeue/2, len/1, is_empty/1, depth/1, drain_confirmed/1, dropwhile/3, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, - status/1, invoke/3, is_duplicate/2, fold/3]). + status/1, invoke/3, is_duplicate/2, foreach_ack/3]). -export([start/1, stop/0]). @@ -303,9 +303,9 @@ ack(AckTags, State = #state { gm = GM, {MsgIds, State #state { backing_queue_state = BQS1, ack_msg_id = AM1 }}. -fold(MsgFun, State = #state { backing_queue = BQ, - backing_queue_state = BQS }, AckTags) -> - State #state { backing_queue_state = BQ:fold(MsgFun, BQS, AckTags) }. +foreach_ack(MsgFun, State = #state { backing_queue = BQ, + backing_queue_state = BQS }, AckTags) -> + State #state { backing_queue_state = BQ:foreach_ack(MsgFun, BQS, AckTags) }. requeue(AckTags, State = #state { gm = GM, backing_queue = BQ, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 408bacd8..ca4ad49c 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1286,8 +1286,7 @@ test_statistics() -> QName = receive #'queue.declare_ok'{queue = Q0} -> Q0 after ?TIMEOUT -> throw(failed_to_receive_queue_declare_ok) end, - {ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<"/">>, queue, QName)), - QPid = Q#amqqueue.pid, + QRes = rabbit_misc:r(<<"/">>, queue, QName), X = rabbit_misc:r(<<"/">>, exchange, <<"">>), rabbit_tests_event_receiver:start(self(), [node()], [channel_stats]), @@ -1311,9 +1310,9 @@ test_statistics() -> length(proplists:get_value( channel_queue_exchange_stats, E)) > 0 end), - [{QPid,[{get,1}]}] = proplists:get_value(channel_queue_stats, Event2), + [{QRes, [{get,1}]}] = proplists:get_value(channel_queue_stats, Event2), [{X,[{publish,1}]}] = proplists:get_value(channel_exchange_stats, Event2), - [{{QPid,X},[{publish,1}]}] = + [{{QRes,X},[{publish,1}]}] = proplists:get_value(channel_queue_exchange_stats, Event2), %% Check the stats remove stuff on queue deletion @@ -1338,31 +1337,31 @@ test_refresh_events(SecondaryNode) -> [channel_created, queue_created]), {_Writer, Ch} = test_spawn(), - expect_events(Ch, channel_created), + expect_events(pid, Ch, channel_created), rabbit_channel:shutdown(Ch), {_Writer2, Ch2} = test_spawn(SecondaryNode), - expect_events(Ch2, channel_created), + expect_events(pid, Ch2, channel_created), rabbit_channel:shutdown(Ch2), - {new, #amqqueue { pid = QPid } = Q} = + {new, #amqqueue{name = QName} = Q} = rabbit_amqqueue:declare(test_queue(), false, false, [], none), - expect_events(QPid, queue_created), + expect_events(name, QName, queue_created), rabbit_amqqueue:delete(Q, false, false), rabbit_tests_event_receiver:stop(), passed. -expect_events(Pid, Type) -> - expect_event(Pid, Type), +expect_events(Tag, Key, Type) -> + expect_event(Tag, Key, Type), rabbit:force_event_refresh(), - expect_event(Pid, Type). + expect_event(Tag, Key, Type). -expect_event(Pid, Type) -> +expect_event(Tag, Key, Type) -> receive #event{type = Type, props = Props} -> - case pget(pid, Props) of - Pid -> ok; - _ -> expect_event(Pid, Type) + case pget(Tag, Props) of + Key -> ok; + _ -> expect_event(Tag, Key, Type) end after ?TIMEOUT -> throw({failed_to_receive_event, Type}) end. @@ -2302,6 +2301,7 @@ test_variable_queue() -> fun test_variable_queue_all_the_bits_not_covered_elsewhere1/1, fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1, fun test_drop/1, + fun test_variable_queue_fold_msg_on_disk/1, fun test_dropwhile/1, fun test_dropwhile_varying_ram_duration/1, fun test_variable_queue_ack_limiting/1, @@ -2534,6 +2534,13 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) -> {empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7), VQ8. +test_variable_queue_fold_msg_on_disk(VQ0) -> + VQ1 = variable_queue_publish(true, 1, VQ0), + {VQ2, AckTags} = variable_queue_fetch(1, true, false, 1, VQ1), + VQ3 = rabbit_variable_queue:foreach_ack(fun (_M, _A) -> ok end, + VQ2, AckTags), + VQ3. + test_queue_recover() -> Count = 2 * rabbit_queue_index:next_segment_boundary(0), {new, #amqqueue { pid = QPid, name = QName } = Q} = @@ -2559,7 +2566,7 @@ test_queue_recover() -> rabbit_variable_queue:fetch(true, VQ1), CountMinusOne = rabbit_variable_queue:len(VQ2), _VQ3 = rabbit_variable_queue:delete_and_terminate(shutdown, VQ2), - rabbit_amqqueue:internal_delete(QName, QPid1) + rabbit_amqqueue:internal_delete(QName) end), passed. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 3a025ba3..292217da 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -21,7 +21,7 @@ dropwhile/3, fetch/2, drop/2, ack/2, requeue/2, len/1, is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, - is_duplicate/2, multiple_routing_keys/0, fold/3]). + is_duplicate/2, multiple_routing_keys/0, foreach_ack/3]). -export([start/1, stop/0]). @@ -348,7 +348,7 @@ q4 :: ?QUEUE:?QUEUE(), next_seq_id :: seq_id(), pending_ack :: gb_tree(), - ram_ack_index :: gb_tree(), + ram_ack_index :: gb_set(), index_state :: any(), msg_store_clients :: 'undefined' | {{any(), binary()}, {any(), binary()}}, @@ -647,16 +647,15 @@ ack(AckTags, State) -> persistent_count = PCount1, ack_out_counter = AckOutCount + length(AckTags) })}. -fold(undefined, State, _AckTags) -> +foreach_ack(undefined, State, _AckTags) -> State; -fold(MsgFun, State = #vqstate{pending_ack = PA}, AckTags) -> - lists:foldl( - fun(SeqId, State1) -> - {MsgStatus, State2} = - read_msg(gb_trees:get(SeqId, PA), State1), - MsgFun(MsgStatus#msg_status.msg, SeqId), - State2 - end, State, AckTags). +foreach_ack(MsgFun, State = #vqstate{pending_ack = PA}, AckTags) -> + a(lists:foldl(fun(SeqId, State1) -> + {MsgStatus, State2} = + read_msg(gb_trees:get(SeqId, PA), false, State1), + MsgFun(MsgStatus#msg_status.msg, SeqId), + State2 + end, State, AckTags)). requeue(AckTags, #vqstate { delta = Delta, q3 = Q3, @@ -732,7 +731,7 @@ ram_duration(State = #vqstate { {AvgAckIngressRate, AckIngress1} = update_rate(Now, AckTimestamp, AckInCount, AckIngress), - RamAckCount = gb_trees:size(RamAckIndex), + RamAckCount = gb_sets:size(RamAckIndex), Duration = %% msgs+acks / (msgs+acks/sec) == sec case (AvgEgressRate == 0 andalso AvgIngressRate == 0 andalso @@ -811,7 +810,7 @@ status(#vqstate { {pending_acks , gb_trees:size(PA)}, {target_ram_count , TargetRamCount}, {ram_msg_count , RamMsgCount}, - {ram_ack_count , gb_trees:size(RAI)}, + {ram_ack_count , gb_sets:size(RAI)}, {next_seq_id , NextSeqId}, {persistent_count , PersistentCount}, {avg_ingress_rate , AvgIngressRate}, @@ -846,6 +845,7 @@ a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, true = Len >= 0, true = PersistentCount >= 0, true = RamMsgCount >= 0, + true = RamMsgCount =< Len, State. @@ -1015,7 +1015,7 @@ init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback, q4 = ?QUEUE:new(), next_seq_id = NextSeqId, pending_ack = gb_trees:empty(), - ram_ack_index = gb_trees:empty(), + ram_ack_index = gb_sets:empty(), index_state = IndexState1, msg_store_clients = {PersistentClient, TransientClient}, durable = IsDurable, @@ -1071,17 +1071,19 @@ queue_out(State = #vqstate { q4 = Q4 }) -> {{value, MsgStatus}, State #vqstate { q4 = Q4a }} end. +read_msg(MsgStatus, State) -> read_msg(MsgStatus, true, State). + read_msg(MsgStatus = #msg_status { msg = undefined, msg_id = MsgId, is_persistent = IsPersistent }, - State = #vqstate { ram_msg_count = RamMsgCount, - msg_store_clients = MSCState}) -> + CountDiskToRam, State = #vqstate { ram_msg_count = RamMsgCount, + msg_store_clients = MSCState}) -> {{ok, Msg = #basic_message {}}, MSCState1} = msg_store_read(MSCState, IsPersistent, MsgId), {MsgStatus #msg_status { msg = Msg }, - State #vqstate { ram_msg_count = RamMsgCount + 1, + State #vqstate { ram_msg_count = RamMsgCount + one_if(CountDiskToRam), msg_store_clients = MSCState1 }}; -read_msg(MsgStatus, State) -> +read_msg(MsgStatus, _CountDiskToRam, State) -> {MsgStatus, State}. internal_fetch(AckRequired, MsgStatus = #msg_status { @@ -1230,7 +1232,6 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, %%---------------------------------------------------------------------------- record_pending_ack(#msg_status { seq_id = SeqId, - msg_id = MsgId, msg_on_disk = MsgOnDisk } = MsgStatus, State = #vqstate { pending_ack = PA, ram_ack_index = RAI, @@ -1238,7 +1239,7 @@ record_pending_ack(#msg_status { seq_id = SeqId, {AckEntry, RAI1} = case MsgOnDisk of true -> {m(trim_msg_status(MsgStatus)), RAI}; - false -> {MsgStatus, gb_trees:insert(SeqId, MsgId, RAI)} + false -> {MsgStatus, gb_sets:insert(SeqId, RAI)} end, State #vqstate { pending_ack = gb_trees:insert(SeqId, AckEntry, PA), ram_ack_index = RAI1, @@ -1248,7 +1249,7 @@ remove_pending_ack(SeqId, State = #vqstate { pending_ack = PA, ram_ack_index = RAI }) -> {gb_trees:get(SeqId, PA), State #vqstate { pending_ack = gb_trees:delete(SeqId, PA), - ram_ack_index = gb_trees:delete_any(SeqId, RAI) }}. + ram_ack_index = gb_sets:delete_any(SeqId, RAI) }}. purge_pending_ack(KeepPersistent, State = #vqstate { pending_ack = PA, @@ -1259,7 +1260,7 @@ purge_pending_ack(KeepPersistent, accumulate_ack(MsgStatus, Acc) end, accumulate_ack_init(), PA), State1 = State #vqstate { pending_ack = gb_trees:empty(), - ram_ack_index = gb_trees:empty() }, + ram_ack_index = gb_sets:empty() }, case KeepPersistent of true -> case orddict:find(false, MsgIdsByStore) of error -> State1; @@ -1459,7 +1460,7 @@ reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun, }) -> {Reduce, State1 = #vqstate { q2 = Q2, q3 = Q3 }} = - case chunk_size(RamMsgCount + gb_trees:size(RamAckIndex), + case chunk_size(RamMsgCount + gb_sets:size(RamAckIndex), TargetRamCount) of 0 -> {false, State}; %% Reduce memory of pending acks and alphas. The order is @@ -1487,12 +1488,12 @@ limit_ram_acks(0, State) -> {0, State}; limit_ram_acks(Quota, State = #vqstate { pending_ack = PA, ram_ack_index = RAI }) -> - case gb_trees:is_empty(RAI) of + case gb_sets:is_empty(RAI) of true -> {Quota, State}; false -> - {SeqId, MsgId, RAI1} = gb_trees:take_largest(RAI), - MsgStatus = #msg_status { msg_id = MsgId, is_persistent = false} = + {SeqId, RAI1} = gb_sets:take_largest(RAI), + MsgStatus = #msg_status { is_persistent = false} = gb_trees:get(SeqId, PA), {MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State), |