diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2012-11-16 16:53:56 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-11-16 16:53:56 +0000 |
commit | 97491563e8d8fb1cac3b6f01b5471b78307bc11e (patch) | |
tree | c6cb792c381833f7fd6161cae9d124a83c5b830a | |
parent | e2f5beaf5d2f9db139fc8cfa4e61d4d5f06bbbff (diff) | |
download | rabbitmq-server-97491563e8d8fb1cac3b6f01b5471b78307bc11e.tar.gz |
identify queues by name rather than pid in channel stats
- in deliver_to_queues, take the result of rabbit_amqqueue:lookup and
use it to a) add entries to a QPid -> QName mapping in the state
for all master pids, and b) setup monitors for all master pids
- for the stats creation in deliver_to_queues, map the DeliveredQPids
to the associated QNames, via the mapping in the State. Note that
this will ignore slave pids, hence we only get one stat per QName
(which is good). Also, in the event that the master died between
lookup and delivery (and the delivery was 'mandatory'), we will not
record any stats at all.
- in monitor_delivering_queue, which is called by basic.{consume,get}
we add to the mapping
- in ack/2 we use the mapping to obtain QNames from QPids, and use
that in stats. Since a queue may have vanished prior to the
ack/reject arriving, we need to handle the case of no entry being
present in the mapping for the given QPid.
- in record_sent we have the QName anyway, so can just record stats
against that instead of the QPid.
- in the 'DOWN' handler we use the mapping to determine the QName from
the pid, and pass that to erase_queue_stats, which can now remove
entries based on the QName. We then remove the entry from the
mapping.
-rw-r--r-- | src/rabbit_channel.erl | 124 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 29 |
2 files changed, 86 insertions, 67 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index b97af6d8..f8f099f6 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -35,8 +35,9 @@ -record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid, conn_name, limiter, tx_status, next_tag, unacked_message_q, uncommitted_message_q, uncommitted_acks, uncommitted_nacks, user, - virtual_host, most_recently_declared_queue, queue_monitors, - consumer_mapping, blocking, queue_consumers, delivering_queues, + virtual_host, most_recently_declared_queue, + queue_names, queue_monitors, consumer_mapping, + blocking, queue_consumers, delivering_queues, queue_collector_pid, stats_timer, confirm_enabled, publish_seqno, unconfirmed, confirmed, capabilities, trace_state}). @@ -194,6 +195,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, user = User, virtual_host = VHost, most_recently_declared_queue = <<>>, + queue_names = dict:new(), queue_monitors = pmon:new(), consumer_mapping = dict:new(), blocking = sets:new(), @@ -334,9 +336,13 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) -> State3 = handle_consuming_queue_down(QPid, State2), State4 = handle_delivering_queue_down(QPid, State3), credit_flow:peer_down(QPid), - erase_queue_stats(QPid), - noreply(State4#ch{queue_monitors = pmon:erase( - QPid, State4#ch.queue_monitors)}); + #ch{queue_names = QNames, queue_monitors = QMons} = State4, + case dict:find(QPid, QNames) of + {ok, QName} -> erase_queue_stats(QName); + error -> ok + end, + noreply(State4#ch{queue_names = dict:erase(QPid, QNames), + queue_monitors = pmon:erase(QPid, QMons)}); handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}. @@ -677,7 +683,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, QueueName, ConnPid, fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of {ok, MessageCount, - Msg = {_QName, QPid, _MsgId, Redelivered, + Msg = {QName, QPid, _MsgId, Redelivered, #basic_message{exchange_name = ExchangeName, routing_keys = [RoutingKey | _CcRoutes], content = Content}}} -> @@ -689,7 +695,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, routing_key = RoutingKey, message_count = MessageCount}, Content), - State1 = monitor_delivering_queue(NoAck, QPid, State), + State1 = monitor_delivering_queue(NoAck, QPid, QName, State), {noreply, record_sent(none, not(NoAck), Msg, State1)}; empty -> {reply, #'basic.get_empty'{}, State} @@ -728,10 +734,11 @@ handle_method(#'basic.consume'{queue = QueueNameBin, consumer_tag = ActualConsumerTag})), Q} end) of - {ok, Q = #amqqueue{pid = QPid}} -> + {ok, Q = #amqqueue{pid = QPid, name = QName}} -> CM1 = dict:store(ActualConsumerTag, Q, ConsumerMapping), State1 = monitor_delivering_queue( - NoAck, QPid, State#ch{consumer_mapping = CM1}), + NoAck, QPid, QName, + State#ch{consumer_mapping = CM1}), {noreply, case NoWait of true -> consumer_monitor(ActualConsumerTag, State1); @@ -1126,9 +1133,12 @@ consumer_monitor(ConsumerTag, State end. -monitor_delivering_queue(NoAck, QPid, State = #ch{queue_monitors = QMons, - delivering_queues = DQ}) -> - State#ch{queue_monitors = pmon:monitor(QPid, QMons), +monitor_delivering_queue(NoAck, QPid, QName, + State = #ch{queue_names = QNames, + queue_monitors = QMons, + delivering_queues = DQ}) -> + State#ch{queue_names = dict:store(QPid, QName, QNames), + queue_monitors = pmon:monitor(QPid, QMons), delivering_queues = case NoAck of true -> DQ; false -> sets:add_element(QPid, DQ) @@ -1233,18 +1243,18 @@ reject(Requeue, Acked, Limiter) -> ok = notify_limiter(Limiter, Acked). record_sent(ConsumerTag, AckRequired, - Msg = {_QName, QPid, MsgId, Redelivered, _Message}, + Msg = {QName, QPid, MsgId, Redelivered, _Message}, State = #ch{unacked_message_q = UAMQ, next_tag = DeliveryTag, trace_state = TraceState}) -> - incr_stats([{queue_stats, QPid, 1}], case {ConsumerTag, AckRequired} of - {none, true} -> get; - {none, false} -> get_no_ack; - {_ , true} -> deliver; - {_ , false} -> deliver_no_ack - end, State), + incr_stats([{queue_stats, QName, 1}], case {ConsumerTag, AckRequired} of + {none, true} -> get; + {none, false} -> get_no_ack; + {_ , true} -> deliver; + {_ , false} -> deliver_no_ack + end, State), case Redelivered of - true -> incr_stats([{queue_stats, QPid, 1}], redeliver, State); + true -> incr_stats([{queue_stats, QName, 1}], redeliver, State); false -> ok end, rabbit_trace:tap_trace_out(Msg, TraceState), @@ -1277,11 +1287,15 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) -> precondition_failed("unknown delivery tag ~w", [DeliveryTag]) end. -ack(Acked, State) -> +ack(Acked, State = #ch{queue_names = QNames}) -> Incs = fold_per_queue( fun (QPid, MsgIds, L) -> ok = rabbit_amqqueue:ack(QPid, MsgIds, self()), - [{queue_stats, QPid, length(MsgIds)} | L] + case dict:find(QPid, QNames) of + {ok, QName} -> Count = length(MsgIds), + [{queue_stats, QName, Count} | L]; + error -> L + end end, [], Acked), ok = notify_limiter(State#ch.limiter, Acked), incr_stats(Incs, ack, State). @@ -1339,23 +1353,30 @@ notify_limiter(Limiter, Acked) -> deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ exchange_name = XName}, msg_seq_no = MsgSeqNo}, - QNames}, State) -> - {RoutingRes, DeliveredQPids} = - rabbit_amqqueue:deliver_flow(rabbit_amqqueue:lookup(QNames), Delivery), - State1 = State#ch{queue_monitors = - pmon:monitor_all(DeliveredQPids, - State#ch.queue_monitors)}, - State2 = process_routing_result(RoutingRes, DeliveredQPids, - XName, MsgSeqNo, Message, State1), + DelQNames}, State = #ch{queue_names = QNames, + queue_monitors = QMons}) -> + Qs = rabbit_amqqueue:lookup(DelQNames), + {RoutingRes, DeliveredQPids} = rabbit_amqqueue:deliver_flow(Qs, Delivery), + {QNames1, QMons1} = + lists:foldl(fun (#amqqueue{pid = QPid, name = QName}, + {QNames0, QMons0}) -> + {dict:store(QPid, QName, QNames0), + pmon:monitor(QPid, QMons0)} + end, {QNames, pmon:monitor_all(DeliveredQPids, QMons)}, Qs), + State1 = process_routing_result(RoutingRes, DeliveredQPids, + XName, MsgSeqNo, Message, + State#ch{queue_names = QNames1, + queue_monitors = QMons1}), incr_stats([{exchange_stats, XName, 1} | - [{queue_exchange_stats, {QPid, XName}, 1} || - QPid <- DeliveredQPids]], publish, State2), - State2. + [{queue_exchange_stats, {QName, XName}, 1} || + QPid <- DeliveredQPids, + {ok, QName} <- [dict:find(QPid, QNames1)]]], + publish, State1), + State1. process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) -> ok = basic_return(Msg, State, no_route), - incr_stats([{exchange_stats, Msg#basic_message.exchange_name, 1}], - return_unroutable, State), + incr_stats([{exchange_stats, XName, 1}], return_unroutable, State), record_confirm(MsgSeqNo, XName, State); process_routing_result(routed, [], XName, MsgSeqNo, _, State) -> record_confirm(MsgSeqNo, XName, State); @@ -1489,24 +1510,23 @@ emit_stats(State) -> emit_stats(State, []). emit_stats(State, Extra) -> - CoarseStats = infos(?STATISTICS_KEYS, State), + Coarse = infos(?STATISTICS_KEYS, State), case rabbit_event:stats_level(State, #ch.stats_timer) of - coarse -> - rabbit_event:notify(channel_stats, Extra ++ CoarseStats); - fine -> - FineStats = - [{channel_queue_stats, - [{QPid, Stats} || {{queue_stats, QPid}, Stats} <- get()]}, - {channel_exchange_stats, - [{X, Stats} || {{exchange_stats, X}, Stats} <- get()]}, - {channel_queue_exchange_stats, - [{QX, Stats} || - {{queue_exchange_stats, QX}, Stats} <- get()]}], - rabbit_event:notify(channel_stats, - Extra ++ CoarseStats ++ FineStats) + coarse -> rabbit_event:notify(channel_stats, Extra ++ Coarse); + fine -> Fine = [{channel_queue_stats, + [{QName, Stats} || + {{queue_stats, QName}, Stats} <- get()]}, + {channel_exchange_stats, + [{XName, Stats} || + {{exchange_stats, XName}, Stats} <- get()]}, + {channel_queue_exchange_stats, + [{QX, Stats} || + {{queue_exchange_stats, QX}, Stats} <- get()]}], + rabbit_event:notify(channel_stats, Extra ++ Coarse ++ Fine) end. -erase_queue_stats(QPid) -> - erase({queue_stats, QPid}), +erase_queue_stats(QName) -> + erase({queue_stats, QName}), [erase({queue_exchange_stats, QX}) || - {{queue_exchange_stats, QX = {QPid0, _}}, _} <- get(), QPid =:= QPid0]. + {{queue_exchange_stats, QX = {QName0, _}}, _} <- get(), + QName0 =:= QName]. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index e25843c3..8b2f3b3a 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1286,8 +1286,7 @@ test_statistics() -> QName = receive #'queue.declare_ok'{queue = Q0} -> Q0 after ?TIMEOUT -> throw(failed_to_receive_queue_declare_ok) end, - {ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<"/">>, queue, QName)), - QPid = Q#amqqueue.pid, + QRes = rabbit_misc:r(<<"/">>, queue, QName), X = rabbit_misc:r(<<"/">>, exchange, <<"">>), rabbit_tests_event_receiver:start(self(), [node()], [channel_stats]), @@ -1311,9 +1310,9 @@ test_statistics() -> length(proplists:get_value( channel_queue_exchange_stats, E)) > 0 end), - [{QPid,[{get,1}]}] = proplists:get_value(channel_queue_stats, Event2), + [{QRes, [{get,1}]}] = proplists:get_value(channel_queue_stats, Event2), [{X,[{publish,1}]}] = proplists:get_value(channel_exchange_stats, Event2), - [{{QPid,X},[{publish,1}]}] = + [{{QRes,X},[{publish,1}]}] = proplists:get_value(channel_queue_exchange_stats, Event2), %% Check the stats remove stuff on queue deletion @@ -1338,31 +1337,31 @@ test_refresh_events(SecondaryNode) -> [channel_created, queue_created]), {_Writer, Ch} = test_spawn(), - expect_events(Ch, channel_created), + expect_events(pid, Ch, channel_created), rabbit_channel:shutdown(Ch), {_Writer2, Ch2} = test_spawn(SecondaryNode), - expect_events(Ch2, channel_created), + expect_events(pid, Ch2, channel_created), rabbit_channel:shutdown(Ch2), - {new, #amqqueue { pid = QPid } = Q} = + {new, #amqqueue{name = QName} = Q} = rabbit_amqqueue:declare(test_queue(), false, false, [], none), - expect_events(QPid, queue_created), + expect_events(name, QName, queue_created), rabbit_amqqueue:delete(Q, false, false), rabbit_tests_event_receiver:stop(), passed. -expect_events(Pid, Type) -> - expect_event(Pid, Type), +expect_events(Tag, Key, Type) -> + expect_event(Tag, Key, Type), rabbit:force_event_refresh(), - expect_event(Pid, Type). + expect_event(Tag, Key, Type). -expect_event(Pid, Type) -> +expect_event(Tag, Key, Type) -> receive #event{type = Type, props = Props} -> - case pget(pid, Props) of - Pid -> ok; - _ -> expect_event(Pid, Type) + case pget(Tag, Props) of + Key -> ok; + _ -> expect_event(Tag, Key, Type) end after ?TIMEOUT -> throw({failed_to_receive_event, Type}) end. |