summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-10-29 11:38:27 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-10-29 11:38:27 +0100
commit65efb0a3887f32fb77c3b6f60d0dfe7d51b6e1fd (patch)
tree41e798f1f22aab64f46e0217f9e94946ef6e5f48
parentb3efbd8b78d4a115a6443def25f5fa29304999e6 (diff)
parent2a25a2be165e457c21d8ac085c1b08ca721a13ed (diff)
downloadrabbitmq-server-65efb0a3887f32fb77c3b6f60d0dfe7d51b6e1fd.tar.gz
merge from bug20284
-rw-r--r--src/rabbit_channel.erl16
1 files changed, 16 insertions, 0 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 7c45b52d..08fcd768 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -57,8 +57,10 @@
-define(STATISTICS_KEYS,
[pid,
transactional,
+ confirm,
consumer_count,
messages_unacknowledged,
+ unconfirmed,
acks_uncommitted,
prefetch_count,
client_flow_blocked]).
@@ -465,6 +467,7 @@ send_or_enqueue_ack(undefined, _QPid, State) ->
send_or_enqueue_ack(_MsgSeqNo, _QPid, State = #ch{confirm_enabled = false}) ->
State;
send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = false}) ->
+ maybe_incr_stats([{channel_stats, 1}], confirm, State),
do_if_unconfirmed(
MsgSeqNo, QPid,
fun(MSN, State1 = #ch{writer_pid = WriterPid}) ->
@@ -473,6 +476,7 @@ send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = false}) ->
State1
end, State);
send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = true}) ->
+ maybe_incr_stats([{channel_stats, 1}], confirm, State),
do_if_unconfirmed(
MsgSeqNo, QPid,
fun(MSN, State1 = #ch{held_confirms = As}) ->
@@ -1297,8 +1301,16 @@ i(number, #ch{channel = Channel}) -> Channel;
i(user, #ch{username = Username}) -> Username;
i(vhost, #ch{virtual_host = VHost}) -> VHost;
i(transactional, #ch{transaction_id = TxnKey}) -> TxnKey =/= none;
+i(confirm, #ch{confirm_enabled = CE,
+ confirm_multiple = CM}) -> case {CE, CM} of
+ {false, _} -> none;
+ {_, false} -> single;
+ {_, true} -> multiple
+ end;
i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) ->
dict:size(ConsumerMapping);
+i(unconfirmed, #ch{unconfirmed = UC}) ->
+ gb_sets:size(UC);
i(messages_unacknowledged, #ch{unacked_message_q = UAMQ,
uncommitted_ack_q = UAQ}) ->
queue:len(UAMQ) + queue:len(UAQ);
@@ -1323,6 +1335,8 @@ incr_stats({QPid, _} = QX, Inc, Measure) ->
incr_stats(QPid, Inc, Measure) when is_pid(QPid) ->
maybe_monitor(QPid),
update_measures(queue_stats, QPid, Inc, Measure);
+incr_stats(channel_stats, Inc, Measure) ->
+ update_measures(channel_stats, self(), Inc, Measure);
incr_stats(X, Inc, Measure) ->
update_measures(exchange_stats, X, Inc, Measure).
@@ -1356,6 +1370,8 @@ internal_emit_stats(State = #ch{stats_timer = StatsTimer}) ->
[{QPid, Stats} || {{queue_stats, QPid}, Stats} <- get()]},
{channel_exchange_stats,
[{X, Stats} || {{exchange_stats, X}, Stats} <- get()]},
+ {channel_channel_stats,
+ [Stats || {{channel_stats, _}, Stats} <- get()]},
{channel_queue_exchange_stats,
[{QX, Stats} ||
{{queue_exchange_stats, QX}, Stats} <- get()]}],