summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2010-07-26 13:59:12 +0100
committerSimon MacMullen <simon@rabbitmq.com>2010-07-26 13:59:12 +0100
commit4019ae31d80cff08b542d84e7edcbf446b0201fc (patch)
tree9d600605b1cd7b0a530dedfbf6ba5d6c87b14d6b
parent5006a8e57a70fb703bf9818a5f6243c75a54a326 (diff)
downloadrabbitmq-server-4019ae31d80cff08b542d84e7edcbf446b0201fc.tar.gz
Make statistics collection into an application parameter, switched off by default.
-rw-r--r--ebin/rabbit_app.in3
-rw-r--r--src/rabbit_amqqueue_process.erl11
-rw-r--r--src/rabbit_channel.erl69
-rw-r--r--src/rabbit_reader.erl8
-rw-r--r--src/rabbit_tests.erl2
5 files changed, 61 insertions, 32 deletions
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 2cd28abb..48e19ff8 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -27,4 +27,5 @@
{default_user, <<"guest">>},
{default_pass, <<"guest">>},
{default_vhost, <<"/">>},
- {default_permissions, [<<".*">>, <<".*">>, <<".*">>]}]}]}.
+ {default_permissions, [<<".*">>, <<".*">>, <<".*">>]},
+ {collect_statistics, none}]}]}.
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index c099c051..bf90ef23 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -58,7 +58,8 @@
blocked_consumers,
sync_timer_ref,
rate_timer_ref,
- stats_timer_ref
+ stats_timer_ref,
+ stats_level
}).
-record(consumer, {tag, ack_required}).
@@ -107,6 +108,7 @@ init(Q) ->
?LOGDEBUG("Queue starting - ~p~n", [Q]),
process_flag(trap_exit, true),
{ok, BQ} = application:get_env(backing_queue_module),
+ {ok, StatsLevel} = application:get_env(rabbit, collect_statistics),
{ok, #q{q = Q#amqqueue{pid = self()},
exclusive_consumer = none,
@@ -117,7 +119,8 @@ init(Q) ->
blocked_consumers = queue:new(),
sync_timer_ref = undefined,
rate_timer_ref = undefined,
- stats_timer_ref = undefined}, hibernate,
+ stats_timer_ref = undefined,
+ stats_level = StatsLevel}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
terminate(shutdown, State = #q{backing_queue = BQ}) ->
@@ -231,6 +234,8 @@ stop_rate_timer(State = #q{rate_timer_ref = TRef}) ->
{ok, cancel} = timer:cancel(TRef),
State#q{rate_timer_ref = undefined}.
+ensure_stats_timer(State = #q{stats_level = none}) ->
+ State;
ensure_stats_timer(State = #q{stats_timer_ref = undefined, q = Q}) ->
{ok, TRef} = timer:apply_after(?STATS_INTERVAL,
rabbit_amqqueue, emit_stats,
@@ -239,6 +244,8 @@ ensure_stats_timer(State = #q{stats_timer_ref = undefined, q = Q}) ->
ensure_stats_timer(State) ->
State.
+stop_stats_timer(State = #q{stats_level = none}) ->
+ State;
stop_stats_timer(State = #q{stats_timer_ref = undefined}) ->
emit_stats(State),
State;
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index c16f9026..5696d82d 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -50,7 +50,7 @@
uncommitted_ack_q, unacked_message_q,
username, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking, queue_collector_pid, flow,
- stats_timer_ref}).
+ stats_timer_ref, stats_level}).
-record(flow, {server, client, pending}).
@@ -171,6 +171,7 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) ->
{channel, Channel},
{user, Username},
{vhost, VHost}]),
+ {ok, StatsLevel} = application:get_env(rabbit, collect_statistics),
{ok, #ch{state = starting,
channel = Channel,
reader_pid = ReaderPid,
@@ -189,7 +190,8 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) ->
queue_collector_pid = CollectorPid,
flow = #flow{server = true, client = true,
pending = none},
- stats_timer_ref = undefined},
+ stats_timer_ref = undefined,
+ stats_level = StatsLevel},
hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -241,11 +243,11 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg},
State1 = lock_message(AckRequired, {DeliveryTag, ConsumerTag, Msg}, State),
ok = internal_deliver(WriterPid, true, ConsumerTag, DeliveryTag, Msg),
{_QName, QPid, _MsgId, _Redelivered, _Msg} = Msg,
- incr_stats([{QPid, 1}],
- case AckRequired of
- true -> deliver;
- false -> deliver_no_ack
- end),
+ maybe_incr_stats([{QPid, 1}],
+ case AckRequired of
+ true -> deliver;
+ false -> deliver_no_ack
+ end, State),
noreply(State1#ch{next_tag = DeliveryTag + 1});
handle_cast({conserve_memory, true}, State = #ch{state = starting}) ->
@@ -311,6 +313,9 @@ noreply(NewState) ->
NewState1 = ensure_stats_timer(NewState),
{noreply, NewState1, hibernate}.
+ensure_stats_timer(State = #ch{stats_level = none}) ->
+ State;
+
ensure_stats_timer(State = #ch{stats_timer_ref = undefined}) ->
{ok, TRef} = timer:apply_after(?STATS_INTERVAL,
rabbit_channel, emit_stats, [self()]),
@@ -318,6 +323,8 @@ ensure_stats_timer(State = #ch{stats_timer_ref = undefined}) ->
ensure_stats_timer(State) ->
State.
+stop_stats_timer(State = #ch{stats_level = none}) ->
+ State;
stop_stats_timer(State = #ch{stats_timer_ref = undefined}) ->
internal_emit_stats(State),
State;
@@ -483,9 +490,9 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
unroutable -> ok = basic_return(Message, WriterPid, no_route);
not_delivered -> ok = basic_return(Message, WriterPid, no_consumers)
end,
- incr_stats([{ExchangeName, 1} |
- [{{QPid, ExchangeName}, 1} ||
- QPid <- DeliveredQPids]], publish),
+ maybe_incr_stats([{ExchangeName, 1} |
+ [{{QPid, ExchangeName}, 1} ||
+ QPid <- DeliveredQPids]], publish, State),
{noreply, case TxnKey of
none -> State;
_ -> add_tx_participants(DeliveredQPids, State)
@@ -498,7 +505,7 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
QIncs = ack(TxnKey, Acked),
Participants = [QPid || {QPid, _} <- QIncs],
- incr_stats(QIncs, ack),
+ maybe_incr_stats(QIncs, ack, State),
{noreply, case TxnKey of
none -> ok = notify_limiter(State#ch.limiter_pid, Acked),
State#ch{unacked_message_q = Remaining};
@@ -526,11 +533,11 @@ handle_method(#'basic.get'{queue = QueueNameBin,
routing_key = RoutingKey,
content = Content}}} ->
State1 = lock_message(not(NoAck), {DeliveryTag, none, Msg}, State),
- incr_stats([{QPid, 1}],
- case NoAck of
- true -> get_no_ack;
- false -> get
- end),
+ maybe_incr_stats([{QPid, 1}],
+ case NoAck of
+ true -> get_no_ack;
+ false -> get
+ end, State),
ok = rabbit_writer:send_command(
WriterPid,
#'basic.get_ok'{delivery_tag = DeliveryTag,
@@ -1185,7 +1192,9 @@ i(prefetch_count, #ch{limiter_pid = LimiterPid}) ->
i(Item, _) ->
throw({bad_argument, Item}).
-incr_stats(QXIncs, Measure) ->
+maybe_incr_stats(_QXIncs, _Measure, #ch{stats_level = none}) ->
+ ok;
+maybe_incr_stats(QXIncs, Measure, _State) ->
[incr_stats(QX, Inc, Measure) || {QX, Inc} <- QXIncs].
incr_stats({QPid, _} = QX, Inc, Measure) ->
@@ -1218,16 +1227,22 @@ update_measures(Type, QX, Inc, Measure) ->
put({Type, QX},
orddict:store(Measure, Cur + Inc, Measures)).
-internal_emit_stats(State) ->
- rabbit_event:notify(
- channel_stats,
- [{queue_stats,
- [{QPid, Stats} || {{queue_stats, QPid}, Stats} <- get()]},
- {exchange_stats,
- [{X, Stats} || {{exchange_stats, X}, Stats} <- get()]},
- {queue_exchange_stats,
- [{QX, Stats} || {{queue_exchange_stats, QX}, Stats} <- get()]}] ++
- [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]).
+internal_emit_stats(State = #ch{stats_level = Level}) ->
+ CoarseStats = [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS],
+ case Level of
+ coarse ->
+ rabbit_event:notify(channel_stats, CoarseStats);
+ fine ->
+ FineStats =
+ [{queue_stats,
+ [{QPid, Stats} || {{queue_stats, QPid}, Stats} <- get()]},
+ {exchange_stats,
+ [{X, Stats} || {{exchange_stats, X}, Stats} <- get()]},
+ {queue_exchange_stats,
+ [{QX, Stats} ||
+ {{queue_exchange_stats, QX}, Stats} <- get()]}],
+ rabbit_event:notify(channel_stats, CoarseStats ++ FineStats)
+ end.
erase_queue_stats(QPid) ->
erase({monitoring, QPid}),
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 4cf47208..ed48e29a 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -60,7 +60,7 @@
%---------------------------------------------------------------------------
-record(v1, {sock, connection, callback, recv_ref, connection_state,
- queue_collector, stats_timer_ref}).
+ queue_collector, stats_timer_ref, stats_level}).
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
send_pend, state, channels]).
@@ -250,6 +250,7 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
handshake_timeout),
ProfilingValue = setup_profiling(),
{ok, Collector} = rabbit_queue_collector:start_link(),
+ {ok, StatsLevel} = application:get_env(rabbit, collect_statistics),
try
mainloop(Parent, Deb, switch_callback(
#v1{sock = ClientSock,
@@ -263,7 +264,8 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
recv_ref = none,
connection_state = pre_init,
queue_collector = Collector,
- stats_timer_ref = undefined},
+ stats_timer_ref = undefined,
+ stats_level = StatsLevel},
handshake, 8))
catch
Ex -> (if Ex == connection_closed_abruptly ->
@@ -600,6 +602,8 @@ check_version(ClientVersion, ServerVersion) ->
(ClientMajor == ServerMajor andalso
ClientMinor >= ServerMinor).
+ensure_stats_timer(State = #v1{stats_level = none}) ->
+ State;
ensure_stats_timer(State = #v1{stats_timer_ref = undefined}) ->
{ok, TRef} = timer:apply_after(?STATS_INTERVAL,
rabbit_reader, emit_stats, [self()]),
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index e2d59737..fc755898 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1189,6 +1189,8 @@ test_statistics_receive_event(Ch, Retries, Matcher) ->
end.
test_statistics() ->
+ application:set_env(rabbit, collect_statistics, fine),
+
%% ATM this just tests the queue / exchange stats in channels. That's
%% by far the most complex code though.