diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-12-06 10:48:30 +0000 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-12-06 10:48:30 +0000 |
commit | c8d85b08d02b6c96419fff88eb5ea77bd0f8f3bc (patch) | |
tree | d4249d72e805f56282114c5f1d94170a759a3650 | |
parent | f2ca90882a6f4e5b67a990d023b91d45101dc8f6 (diff) | |
download | rabbitmq-server-c8d85b08d02b6c96419fff88eb5ea77bd0f8f3bc.tar.gz |
add stats for confirms
To recap, a published message is confirmed by the channel. A message
is confirmed only after all the queues it was published to confirm it.
With the current change, the emitted stats look like this:
{channel_exchange_stats,
[{{resource,<<"/">>,exchange,<<"direct">>},
[{confirm,545},{publish,545}]}]},
{channel_queue_exchange_stats,
[{{<0.204.0>,{resource,<<"/">>,exchange,<<"direct">>}},
[{confirm,545},{publish,545}]},
{{<0.195.0>,{resource,<<"/">>,exchange,<<"direct">>}},
[{confirm,545},{publish,545}]}]}]
The confirm field in channel_exchange_stats represents the number of
messages sent to that exchange that have also been confirmed. If the
exchanged routed the message to different queues, this number is only
increased when all queues have confirmed the message. If the message
was unroutable or was routed to 0 queues, this number is still
increased. This is the number of basic.confirms sent back to
publisher.
The confirm field in channel_queue_exchange_stats represents the
number of messages confirmed by that queue (but not necessarily
confirmed by the channel).
In channel_exchange_stats, if the number of confirms lags behind the
number of publishes, one of the queues is not confirming messages in a
timely fashion.
-rw-r--r-- | src/rabbit_channel.erl | 15 |
1 files changed, 7 insertions, 8 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index ef85c318..e0f6f0e2 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -467,29 +467,30 @@ send_or_enqueue_ack(_MsgSeqNo, _QPid, _EN, State = #ch{confirm_enabled = false}) State; send_or_enqueue_ack(MsgSeqNo, QPid, ExchangeName, State = #ch{confirm_multiple = false}) -> - maybe_incr_confirm_stats(QPid, ExchangeName, State), + maybe_incr_confirm_queue_stats(QPid, ExchangeName, State), do_if_unconfirmed( MsgSeqNo, QPid, fun(MSN, State1 = #ch{writer_pid = WriterPid}) -> ok = rabbit_writer:send_command( WriterPid, #'basic.ack'{delivery_tag = MSN}), + maybe_incr_stats([{ExchangeName, 1}], confirm, State1), State1 end, State); send_or_enqueue_ack(MsgSeqNo, QPid, ExchangeName, State = #ch{confirm_multiple = true}) -> - maybe_incr_confirm_stats(QPid, ExchangeName, State), + maybe_incr_confirm_queue_stats(QPid, ExchangeName, State), do_if_unconfirmed( MsgSeqNo, QPid, fun(MSN, State1 = #ch{held_confirms = As}) -> + maybe_incr_stats([{ExchangeName, 1}], confirm, State1), start_confirm_timer( State1#ch{held_confirms = gb_sets:add(MSN, As)}) end, State). -maybe_incr_confirm_stats(QPid, ExchangeName, State) -> - maybe_incr_stats([{ExchangeName, 1}], confirm, State), +maybe_incr_confirm_queue_stats(QPid, ExchangeName, State) -> case QPid of undefined -> ok; - _ -> maybe_incr_stats({{QPid, ExchangeName}, 1}, confirm, State) + _ -> maybe_incr_stats([{{QPid, ExchangeName}, 1}], confirm, State) end. do_if_unconfirmed(MsgSeqNo, QPid, ConfirmFun, @@ -568,7 +569,6 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, content = DecodedContent, guid = rabbit_guid:guid(), is_persistent = IsPersistent}, - io:format("publishing ~p to ~p (~p)~n", [MsgSeqNo, ExchangeName, IsPersistent]), {RoutingRes, DeliveredQPids} = rabbit_exchange:publish( Exchange, @@ -579,7 +579,6 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, maybe_incr_stats([{ExchangeName, 1} | [{{QPid, ExchangeName}, 1} || QPid <- DeliveredQPids]], publish, State2), - io:format("did~n"), {noreply, case TxnKey of none -> State2; _ -> add_tx_participants(DeliveredQPids, State2) @@ -1256,7 +1255,6 @@ process_routing_result(routed, QPids, XName, MsgSeqNo, _Msg, State = #ch{queues_for_msg = QFM, exchange_for_msg = EFM}) -> EFM1 = dict:store(MsgSeqNo, XName, EFM), - io:format("Msg -> X: ~p -> ~p~n", [MsgSeqNo, XName]), QFM1 = dict:store(MsgSeqNo, sets:from_list(QPids), QFM), [maybe_monitor(QPid) || QPid <- QPids], State#ch{queues_for_msg = QFM1, exchange_for_msg = EFM1}. @@ -1368,6 +1366,7 @@ internal_emit_stats(State = #ch{stats_timer = StatsTimer}, Extra) -> {channel_queue_exchange_stats, [{QX, Stats} || {{queue_exchange_stats, QX}, Stats} <- get()]}], + io:format("Stats: ~p~n", [Extra ++ CoarseStats ++ FineStats]), rabbit_event:notify(channel_stats, Extra ++ CoarseStats ++ FineStats) end. |