diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-10-29 11:38:27 +0100 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-10-29 11:38:27 +0100 |
commit | 65efb0a3887f32fb77c3b6f60d0dfe7d51b6e1fd (patch) | |
tree | 41e798f1f22aab64f46e0217f9e94946ef6e5f48 | |
parent | b3efbd8b78d4a115a6443def25f5fa29304999e6 (diff) | |
parent | 2a25a2be165e457c21d8ac085c1b08ca721a13ed (diff) | |
download | rabbitmq-server-65efb0a3887f32fb77c3b6f60d0dfe7d51b6e1fd.tar.gz |
merge from bug20284
-rw-r--r-- | src/rabbit_channel.erl | 16 |
1 files changed, 16 insertions, 0 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 7c45b52d..08fcd768 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -57,8 +57,10 @@ -define(STATISTICS_KEYS, [pid, transactional, + confirm, consumer_count, messages_unacknowledged, + unconfirmed, acks_uncommitted, prefetch_count, client_flow_blocked]). @@ -465,6 +467,7 @@ send_or_enqueue_ack(undefined, _QPid, State) -> send_or_enqueue_ack(_MsgSeqNo, _QPid, State = #ch{confirm_enabled = false}) -> State; send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = false}) -> + maybe_incr_stats([{channel_stats, 1}], confirm, State), do_if_unconfirmed( MsgSeqNo, QPid, fun(MSN, State1 = #ch{writer_pid = WriterPid}) -> @@ -473,6 +476,7 @@ send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = false}) -> State1 end, State); send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = true}) -> + maybe_incr_stats([{channel_stats, 1}], confirm, State), do_if_unconfirmed( MsgSeqNo, QPid, fun(MSN, State1 = #ch{held_confirms = As}) -> @@ -1297,8 +1301,16 @@ i(number, #ch{channel = Channel}) -> Channel; i(user, #ch{username = Username}) -> Username; i(vhost, #ch{virtual_host = VHost}) -> VHost; i(transactional, #ch{transaction_id = TxnKey}) -> TxnKey =/= none; +i(confirm, #ch{confirm_enabled = CE, + confirm_multiple = CM}) -> case {CE, CM} of + {false, _} -> none; + {_, false} -> single; + {_, true} -> multiple + end; i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) -> dict:size(ConsumerMapping); +i(unconfirmed, #ch{unconfirmed = UC}) -> + gb_sets:size(UC); i(messages_unacknowledged, #ch{unacked_message_q = UAMQ, uncommitted_ack_q = UAQ}) -> queue:len(UAMQ) + queue:len(UAQ); @@ -1323,6 +1335,8 @@ incr_stats({QPid, _} = QX, Inc, Measure) -> incr_stats(QPid, Inc, Measure) when is_pid(QPid) -> maybe_monitor(QPid), update_measures(queue_stats, QPid, Inc, Measure); +incr_stats(channel_stats, Inc, Measure) -> + update_measures(channel_stats, self(), Inc, Measure); incr_stats(X, Inc, Measure) -> update_measures(exchange_stats, X, Inc, Measure). @@ -1356,6 +1370,8 @@ internal_emit_stats(State = #ch{stats_timer = StatsTimer}) -> [{QPid, Stats} || {{queue_stats, QPid}, Stats} <- get()]}, {channel_exchange_stats, [{X, Stats} || {{exchange_stats, X}, Stats} <- get()]}, + {channel_channel_stats, + [Stats || {{channel_stats, _}, Stats} <- get()]}, {channel_queue_exchange_stats, [{QX, Stats} || {{queue_exchange_stats, QX}, Stats} <- get()]}], |