diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2010-07-26 13:59:12 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2010-07-26 13:59:12 +0100 |
commit | 4019ae31d80cff08b542d84e7edcbf446b0201fc (patch) | |
tree | 9d600605b1cd7b0a530dedfbf6ba5d6c87b14d6b | |
parent | 5006a8e57a70fb703bf9818a5f6243c75a54a326 (diff) | |
download | rabbitmq-server-4019ae31d80cff08b542d84e7edcbf446b0201fc.tar.gz |
Make statistics collection into an application parameter, switched off by default.
-rw-r--r-- | ebin/rabbit_app.in | 3 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 11 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 69 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 8 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 2 |
5 files changed, 61 insertions, 32 deletions
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index 2cd28abb..48e19ff8 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -27,4 +27,5 @@ {default_user, <<"guest">>}, {default_pass, <<"guest">>}, {default_vhost, <<"/">>}, - {default_permissions, [<<".*">>, <<".*">>, <<".*">>]}]}]}. + {default_permissions, [<<".*">>, <<".*">>, <<".*">>]}, + {collect_statistics, none}]}]}. diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index c099c051..bf90ef23 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -58,7 +58,8 @@ blocked_consumers, sync_timer_ref, rate_timer_ref, - stats_timer_ref + stats_timer_ref, + stats_level }). -record(consumer, {tag, ack_required}). @@ -107,6 +108,7 @@ init(Q) -> ?LOGDEBUG("Queue starting - ~p~n", [Q]), process_flag(trap_exit, true), {ok, BQ} = application:get_env(backing_queue_module), + {ok, StatsLevel} = application:get_env(rabbit, collect_statistics), {ok, #q{q = Q#amqqueue{pid = self()}, exclusive_consumer = none, @@ -117,7 +119,8 @@ init(Q) -> blocked_consumers = queue:new(), sync_timer_ref = undefined, rate_timer_ref = undefined, - stats_timer_ref = undefined}, hibernate, + stats_timer_ref = undefined, + stats_level = StatsLevel}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. terminate(shutdown, State = #q{backing_queue = BQ}) -> @@ -231,6 +234,8 @@ stop_rate_timer(State = #q{rate_timer_ref = TRef}) -> {ok, cancel} = timer:cancel(TRef), State#q{rate_timer_ref = undefined}. +ensure_stats_timer(State = #q{stats_level = none}) -> + State; ensure_stats_timer(State = #q{stats_timer_ref = undefined, q = Q}) -> {ok, TRef} = timer:apply_after(?STATS_INTERVAL, rabbit_amqqueue, emit_stats, @@ -239,6 +244,8 @@ ensure_stats_timer(State = #q{stats_timer_ref = undefined, q = Q}) -> ensure_stats_timer(State) -> State. +stop_stats_timer(State = #q{stats_level = none}) -> + State; stop_stats_timer(State = #q{stats_timer_ref = undefined}) -> emit_stats(State), State; diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index c16f9026..5696d82d 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -50,7 +50,7 @@ uncommitted_ack_q, unacked_message_q, username, virtual_host, most_recently_declared_queue, consumer_mapping, blocking, queue_collector_pid, flow, - stats_timer_ref}). + stats_timer_ref, stats_level}). -record(flow, {server, client, pending}). @@ -171,6 +171,7 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) -> {channel, Channel}, {user, Username}, {vhost, VHost}]), + {ok, StatsLevel} = application:get_env(rabbit, collect_statistics), {ok, #ch{state = starting, channel = Channel, reader_pid = ReaderPid, @@ -189,7 +190,8 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) -> queue_collector_pid = CollectorPid, flow = #flow{server = true, client = true, pending = none}, - stats_timer_ref = undefined}, + stats_timer_ref = undefined, + stats_level = StatsLevel}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -241,11 +243,11 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg}, State1 = lock_message(AckRequired, {DeliveryTag, ConsumerTag, Msg}, State), ok = internal_deliver(WriterPid, true, ConsumerTag, DeliveryTag, Msg), {_QName, QPid, _MsgId, _Redelivered, _Msg} = Msg, - incr_stats([{QPid, 1}], - case AckRequired of - true -> deliver; - false -> deliver_no_ack - end), + maybe_incr_stats([{QPid, 1}], + case AckRequired of + true -> deliver; + false -> deliver_no_ack + end, State), noreply(State1#ch{next_tag = DeliveryTag + 1}); handle_cast({conserve_memory, true}, State = #ch{state = starting}) -> @@ -311,6 +313,9 @@ noreply(NewState) -> NewState1 = ensure_stats_timer(NewState), {noreply, NewState1, hibernate}. +ensure_stats_timer(State = #ch{stats_level = none}) -> + State; + ensure_stats_timer(State = #ch{stats_timer_ref = undefined}) -> {ok, TRef} = timer:apply_after(?STATS_INTERVAL, rabbit_channel, emit_stats, [self()]), @@ -318,6 +323,8 @@ ensure_stats_timer(State = #ch{stats_timer_ref = undefined}) -> ensure_stats_timer(State) -> State. +stop_stats_timer(State = #ch{stats_level = none}) -> + State; stop_stats_timer(State = #ch{stats_timer_ref = undefined}) -> internal_emit_stats(State), State; @@ -483,9 +490,9 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, unroutable -> ok = basic_return(Message, WriterPid, no_route); not_delivered -> ok = basic_return(Message, WriterPid, no_consumers) end, - incr_stats([{ExchangeName, 1} | - [{{QPid, ExchangeName}, 1} || - QPid <- DeliveredQPids]], publish), + maybe_incr_stats([{ExchangeName, 1} | + [{{QPid, ExchangeName}, 1} || + QPid <- DeliveredQPids]], publish, State), {noreply, case TxnKey of none -> State; _ -> add_tx_participants(DeliveredQPids, State) @@ -498,7 +505,7 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), QIncs = ack(TxnKey, Acked), Participants = [QPid || {QPid, _} <- QIncs], - incr_stats(QIncs, ack), + maybe_incr_stats(QIncs, ack, State), {noreply, case TxnKey of none -> ok = notify_limiter(State#ch.limiter_pid, Acked), State#ch{unacked_message_q = Remaining}; @@ -526,11 +533,11 @@ handle_method(#'basic.get'{queue = QueueNameBin, routing_key = RoutingKey, content = Content}}} -> State1 = lock_message(not(NoAck), {DeliveryTag, none, Msg}, State), - incr_stats([{QPid, 1}], - case NoAck of - true -> get_no_ack; - false -> get - end), + maybe_incr_stats([{QPid, 1}], + case NoAck of + true -> get_no_ack; + false -> get + end, State), ok = rabbit_writer:send_command( WriterPid, #'basic.get_ok'{delivery_tag = DeliveryTag, @@ -1185,7 +1192,9 @@ i(prefetch_count, #ch{limiter_pid = LimiterPid}) -> i(Item, _) -> throw({bad_argument, Item}). -incr_stats(QXIncs, Measure) -> +maybe_incr_stats(_QXIncs, _Measure, #ch{stats_level = none}) -> + ok; +maybe_incr_stats(QXIncs, Measure, _State) -> [incr_stats(QX, Inc, Measure) || {QX, Inc} <- QXIncs]. incr_stats({QPid, _} = QX, Inc, Measure) -> @@ -1218,16 +1227,22 @@ update_measures(Type, QX, Inc, Measure) -> put({Type, QX}, orddict:store(Measure, Cur + Inc, Measures)). -internal_emit_stats(State) -> - rabbit_event:notify( - channel_stats, - [{queue_stats, - [{QPid, Stats} || {{queue_stats, QPid}, Stats} <- get()]}, - {exchange_stats, - [{X, Stats} || {{exchange_stats, X}, Stats} <- get()]}, - {queue_exchange_stats, - [{QX, Stats} || {{queue_exchange_stats, QX}, Stats} <- get()]}] ++ - [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]). +internal_emit_stats(State = #ch{stats_level = Level}) -> + CoarseStats = [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS], + case Level of + coarse -> + rabbit_event:notify(channel_stats, CoarseStats); + fine -> + FineStats = + [{queue_stats, + [{QPid, Stats} || {{queue_stats, QPid}, Stats} <- get()]}, + {exchange_stats, + [{X, Stats} || {{exchange_stats, X}, Stats} <- get()]}, + {queue_exchange_stats, + [{QX, Stats} || + {{queue_exchange_stats, QX}, Stats} <- get()]}], + rabbit_event:notify(channel_stats, CoarseStats ++ FineStats) + end. erase_queue_stats(QPid) -> erase({monitoring, QPid}), diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 4cf47208..ed48e29a 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -60,7 +60,7 @@ %--------------------------------------------------------------------------- -record(v1, {sock, connection, callback, recv_ref, connection_state, - queue_collector, stats_timer_ref}). + queue_collector, stats_timer_ref, stats_level}). -define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt, send_pend, state, channels]). @@ -250,6 +250,7 @@ start_connection(Parent, Deb, Sock, SockTransform) -> handshake_timeout), ProfilingValue = setup_profiling(), {ok, Collector} = rabbit_queue_collector:start_link(), + {ok, StatsLevel} = application:get_env(rabbit, collect_statistics), try mainloop(Parent, Deb, switch_callback( #v1{sock = ClientSock, @@ -263,7 +264,8 @@ start_connection(Parent, Deb, Sock, SockTransform) -> recv_ref = none, connection_state = pre_init, queue_collector = Collector, - stats_timer_ref = undefined}, + stats_timer_ref = undefined, + stats_level = StatsLevel}, handshake, 8)) catch Ex -> (if Ex == connection_closed_abruptly -> @@ -600,6 +602,8 @@ check_version(ClientVersion, ServerVersion) -> (ClientMajor == ServerMajor andalso ClientMinor >= ServerMinor). +ensure_stats_timer(State = #v1{stats_level = none}) -> + State; ensure_stats_timer(State = #v1{stats_timer_ref = undefined}) -> {ok, TRef} = timer:apply_after(?STATS_INTERVAL, rabbit_reader, emit_stats, [self()]), diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index e2d59737..fc755898 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1189,6 +1189,8 @@ test_statistics_receive_event(Ch, Retries, Matcher) -> end. test_statistics() -> + application:set_env(rabbit, collect_statistics, fine), + %% ATM this just tests the queue / exchange stats in channels. That's %% by far the most complex code though. |