summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-12-06 10:48:30 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-12-06 10:48:30 +0000
commitc8d85b08d02b6c96419fff88eb5ea77bd0f8f3bc (patch)
treed4249d72e805f56282114c5f1d94170a759a3650
parentf2ca90882a6f4e5b67a990d023b91d45101dc8f6 (diff)
downloadrabbitmq-server-c8d85b08d02b6c96419fff88eb5ea77bd0f8f3bc.tar.gz
add stats for confirms
To recap, a published message is confirmed by the channel. A message is confirmed only after all the queues it was published to confirm it. With the current change, the emitted stats look like this: {channel_exchange_stats, [{{resource,<<"/">>,exchange,<<"direct">>}, [{confirm,545},{publish,545}]}]}, {channel_queue_exchange_stats, [{{<0.204.0>,{resource,<<"/">>,exchange,<<"direct">>}}, [{confirm,545},{publish,545}]}, {{<0.195.0>,{resource,<<"/">>,exchange,<<"direct">>}}, [{confirm,545},{publish,545}]}]}] The confirm field in channel_exchange_stats represents the number of messages sent to that exchange that have also been confirmed. If the exchanged routed the message to different queues, this number is only increased when all queues have confirmed the message. If the message was unroutable or was routed to 0 queues, this number is still increased. This is the number of basic.confirms sent back to publisher. The confirm field in channel_queue_exchange_stats represents the number of messages confirmed by that queue (but not necessarily confirmed by the channel). In channel_exchange_stats, if the number of confirms lags behind the number of publishes, one of the queues is not confirming messages in a timely fashion.
-rw-r--r--src/rabbit_channel.erl15
1 files changed, 7 insertions, 8 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index ef85c318..e0f6f0e2 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -467,29 +467,30 @@ send_or_enqueue_ack(_MsgSeqNo, _QPid, _EN, State = #ch{confirm_enabled = false})
State;
send_or_enqueue_ack(MsgSeqNo, QPid, ExchangeName,
State = #ch{confirm_multiple = false}) ->
- maybe_incr_confirm_stats(QPid, ExchangeName, State),
+ maybe_incr_confirm_queue_stats(QPid, ExchangeName, State),
do_if_unconfirmed(
MsgSeqNo, QPid,
fun(MSN, State1 = #ch{writer_pid = WriterPid}) ->
ok = rabbit_writer:send_command(
WriterPid, #'basic.ack'{delivery_tag = MSN}),
+ maybe_incr_stats([{ExchangeName, 1}], confirm, State1),
State1
end, State);
send_or_enqueue_ack(MsgSeqNo, QPid, ExchangeName,
State = #ch{confirm_multiple = true}) ->
- maybe_incr_confirm_stats(QPid, ExchangeName, State),
+ maybe_incr_confirm_queue_stats(QPid, ExchangeName, State),
do_if_unconfirmed(
MsgSeqNo, QPid,
fun(MSN, State1 = #ch{held_confirms = As}) ->
+ maybe_incr_stats([{ExchangeName, 1}], confirm, State1),
start_confirm_timer(
State1#ch{held_confirms = gb_sets:add(MSN, As)})
end, State).
-maybe_incr_confirm_stats(QPid, ExchangeName, State) ->
- maybe_incr_stats([{ExchangeName, 1}], confirm, State),
+maybe_incr_confirm_queue_stats(QPid, ExchangeName, State) ->
case QPid of
undefined -> ok;
- _ -> maybe_incr_stats({{QPid, ExchangeName}, 1}, confirm, State)
+ _ -> maybe_incr_stats([{{QPid, ExchangeName}, 1}], confirm, State)
end.
do_if_unconfirmed(MsgSeqNo, QPid, ConfirmFun,
@@ -568,7 +569,6 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
content = DecodedContent,
guid = rabbit_guid:guid(),
is_persistent = IsPersistent},
- io:format("publishing ~p to ~p (~p)~n", [MsgSeqNo, ExchangeName, IsPersistent]),
{RoutingRes, DeliveredQPids} =
rabbit_exchange:publish(
Exchange,
@@ -579,7 +579,6 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
maybe_incr_stats([{ExchangeName, 1} |
[{{QPid, ExchangeName}, 1} ||
QPid <- DeliveredQPids]], publish, State2),
- io:format("did~n"),
{noreply, case TxnKey of
none -> State2;
_ -> add_tx_participants(DeliveredQPids, State2)
@@ -1256,7 +1255,6 @@ process_routing_result(routed, QPids, XName, MsgSeqNo, _Msg,
State = #ch{queues_for_msg = QFM,
exchange_for_msg = EFM}) ->
EFM1 = dict:store(MsgSeqNo, XName, EFM),
- io:format("Msg -> X: ~p -> ~p~n", [MsgSeqNo, XName]),
QFM1 = dict:store(MsgSeqNo, sets:from_list(QPids), QFM),
[maybe_monitor(QPid) || QPid <- QPids],
State#ch{queues_for_msg = QFM1, exchange_for_msg = EFM1}.
@@ -1368,6 +1366,7 @@ internal_emit_stats(State = #ch{stats_timer = StatsTimer}, Extra) ->
{channel_queue_exchange_stats,
[{QX, Stats} ||
{{queue_exchange_stats, QX}, Stats} <- get()]}],
+ io:format("Stats: ~p~n", [Extra ++ CoarseStats ++ FineStats]),
rabbit_event:notify(channel_stats,
Extra ++ CoarseStats ++ FineStats)
end.