diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2010-07-21 14:15:31 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2010-07-21 14:15:31 +0100 |
commit | ea28306cba88cbc4df6552840f27bfee217ff38a (patch) | |
tree | c8a6925f43361708dad0697deb5af3b594982506 | |
parent | ce04648364101332f298ffda7558496990d465b4 (diff) | |
download | rabbitmq-server-ea28306cba88cbc4df6552840f27bfee217ff38a.tar.gz |
Invoking os:timestamp and timer:now_diff all the time is too expensive, use timers instead.
-rw-r--r-- | include/rabbit.hrl | 2 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 8 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 47 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 55 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 41 |
5 files changed, 100 insertions, 53 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index ac7e3851..5df9e690 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -82,7 +82,7 @@ -define(HIBERNATE_AFTER_MIN, 1000). -define(DESIRED_HIBERNATE, 10000). --define(STATISTICS_UPDATE_INTERVAL, 5000000). %% microseconds +-define(STATS_INTERVAL, 5000). -ifdef(debug). -define(LOGDEBUG0(F), rabbit_log:debug(F)). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index f1b52768..7bc85f2e 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -41,6 +41,7 @@ check_exclusive_access/2, with_exclusive_access_or_die/3, stat/1, deliver/2, requeue/3, ack/4]). -export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). +-export([emit_stats/1]). -export([consumers/1, consumers_all/1]). -export([basic_get/3, basic_consume/7, basic_cancel/4]). -export([notify_sent/2, unblock/2, flush_all/2]). @@ -104,6 +105,7 @@ -spec(stat/1 :: (rabbit_types:amqqueue()) -> {'ok', non_neg_integer(), non_neg_integer()}). +-spec(emit_stats/1 :: (rabbit_types:amqqueue()) -> 'ok'). -spec(delete/3 :: (rabbit_types:amqqueue(), 'false', 'false') -> qlen(); @@ -305,6 +307,9 @@ consumers_all(VHostPath) -> stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat, infinity). +emit_stats(#amqqueue{pid = QPid}) -> + delegate_cast(QPid, emit_stats). + delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> delegate_call(QPid, {delete, IfUnused, IfEmpty}, infinity). @@ -449,6 +454,9 @@ safe_delegate_call_ok(H, F, Pids) -> delegate_call(Pid, Msg, Timeout) -> delegate:invoke(Pid, fun (P) -> gen_server2:call(P, Msg, Timeout) end). +delegate_cast(Pid, Msg) -> + delegate:invoke(Pid, fun (P) -> gen_server2:cast(P, Msg) end). + delegate_pcall(Pid, Pri, Msg, Timeout) -> delegate:invoke(Pid, fun (P) -> gen_server2:pcall(P, Pri, Msg, Timeout) end). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 78f3de97..e7d4817d 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -58,7 +58,7 @@ blocked_consumers, sync_timer_ref, rate_timer_ref, - last_stats_update + stats_timer_ref }). -record(consumer, {tag, ack_required}). @@ -117,7 +117,7 @@ init(Q) -> blocked_consumers = queue:new(), sync_timer_ref = undefined, rate_timer_ref = undefined, - last_stats_update = {0,0,0}}, hibernate, + stats_timer_ref = undefined}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. terminate(shutdown, State = #q{backing_queue = BQ}) -> @@ -191,7 +191,7 @@ noreply(NewState) -> next_state(State) -> State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = ensure_rate_timer(State), - State2 = maybe_emit_stats(State1), + State2 = ensure_stats_timer(State1), case BQ:needs_sync(BQS)of true -> {ensure_sync_timer(State2), 0}; false -> {stop_sync_timer(State2), hibernate} @@ -231,6 +231,23 @@ stop_rate_timer(State = #q{rate_timer_ref = TRef}) -> {ok, cancel} = timer:cancel(TRef), State#q{rate_timer_ref = undefined}. +ensure_stats_timer(State = #q{stats_timer_ref = undefined, q = Q}) -> + {ok, TRef} = timer:apply_after(?STATS_INTERVAL, + rabbit_amqqueue, emit_stats, + [Q]), + State#q{stats_timer_ref = TRef}; +ensure_stats_timer(State) -> + State. + +stop_stats_timer(State = #q{stats_timer_ref = undefined}) -> + emit_stats(State), + State; +stop_stats_timer(State = #q{stats_timer_ref = TRef}) -> + {ok, cancel} = timer:cancel(TRef), + emit_stats(State), + State#q{stats_timer_ref = undefined}. + + assert_invariant(#q{active_consumers = AC, backing_queue = BQ, backing_queue_state = BQS}) -> true = (queue:is_empty(AC) orelse BQ:is_empty(BQS)). @@ -541,19 +558,10 @@ i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> i(Item, _) -> throw({bad_argument, Item}). -%--------------------------------------------------------------------------- +emit_stats(State) -> + rabbit_event:notify(queue_stats, + [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]). -maybe_emit_stats(State = #q{last_stats_update = LastUpdate}) -> - Now = os:timestamp(), - case timer:now_diff(Now, LastUpdate) > ?STATISTICS_UPDATE_INTERVAL of - true -> - rabbit_event:notify( - queue_stats, - [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]), - State#q{last_stats_update = Now}; - _ -> - State - end. %--------------------------------------------------------------------------- handle_call({init, Recover}, From, @@ -829,7 +837,11 @@ handle_cast({set_ram_duration_target, Duration}, handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), - noreply(State). + noreply(State); + +handle_cast(emit_stats, State) -> + emit_stats(State), + noreply(State#q{stats_timer_ref = undefined}). handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State = #q{q = #amqqueue{exclusive_owner = DownPid}}) -> @@ -866,4 +878,5 @@ handle_pre_hibernate(State = #q{backing_queue = BQ, DesiredDuration = rabbit_memory_monitor:report_ram_duration(self(), infinity), BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), - {hibernate, stop_rate_timer(State#q{backing_queue_state = BQS2})}. + {hibernate, stop_stats_timer( + stop_rate_timer(State#q{backing_queue_state = BQS2}))}. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 0c878617..21d2b3de 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -38,6 +38,7 @@ -export([start_link/6, do/2, do/3, shutdown/1]). -export([send_command/2, deliver/4, conserve_memory/2, flushed/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). +-export([emit_stats/1]). -export([flow_timeout/2]). @@ -49,7 +50,7 @@ uncommitted_ack_q, unacked_message_q, username, virtual_host, most_recently_declared_queue, consumer_mapping, blocking, queue_collector_pid, flow, - queue_exchange_stats, last_stats_update}). + queue_exchange_stats, stats_timer_ref}). -record(flow, {server, client, pending}). @@ -100,6 +101,7 @@ -spec(info/2 :: (pid(), [rabbit_types:info_key()]) -> [rabbit_types:info()]). -spec(info_all/0 :: () -> [[rabbit_types:info()]]). -spec(info_all/1 :: ([rabbit_types:info_key()]) -> [[rabbit_types:info()]]). +-spec(emit_stats/1 :: (pid()) -> 'ok'). -endif. @@ -155,6 +157,9 @@ info_all() -> info_all(Items) -> rabbit_misc:filter_exit_map(fun (C) -> info(C, Items) end, list()). +emit_stats(Pid) -> + gen_server2:cast(Pid, emit_stats). + %%--------------------------------------------------------------------------- init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) -> @@ -185,7 +190,7 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) -> flow = #flow{server = true, client = true, pending = none}, queue_exchange_stats = dict:new(), - last_stats_update = {0,0,0}}, + stats_timer_ref = undefined}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -262,7 +267,11 @@ handle_cast({flow_timeout, Ref}, "timeout waiting for channel.flow_ok{active=~w}", [not Flow], none), State)}; handle_cast({flow_timeout, _Ref}, State) -> - {noreply, State}. + {noreply, State}; + +handle_cast(emit_stats, State) -> + internal_emit_stats(State), + noreply(State#ch{stats_timer_ref = undefined}). handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}}, State = #ch{writer_pid = WriterPid}) -> @@ -277,7 +286,7 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) -> handle_pre_hibernate(State) -> ok = clear_permission_cache(), - {hibernate, State}. + {hibernate, stop_stats_timer(State)}. terminate(_Reason, State = #ch{state = terminating}) -> terminate(State); @@ -296,13 +305,29 @@ code_change(_OldVsn, State, _Extra) -> %%--------------------------------------------------------------------------- reply(Reply, NewState) -> - NewState1 = maybe_emit_stats(NewState), + NewState1 = ensure_stats_timer(NewState), {reply, Reply, NewState1, hibernate}. noreply(NewState) -> - NewState1 = maybe_emit_stats(NewState), + NewState1 = ensure_stats_timer(NewState), {noreply, NewState1, hibernate}. +ensure_stats_timer(State = #ch{stats_timer_ref = undefined}) -> + {ok, TRef} = timer:apply_after(?STATS_INTERVAL, + rabbit_channel, emit_stats, + [self()]), + State#ch{stats_timer_ref = TRef}; +ensure_stats_timer(State) -> + State. + +stop_stats_timer(State = #ch{stats_timer_ref = undefined}) -> + internal_emit_stats(State), + State; +stop_stats_timer(State = #ch{stats_timer_ref = TRef}) -> + {ok, cancel} = timer:cancel(TRef), + internal_emit_stats(State), + State#ch{stats_timer_ref = undefined}. + return_ok(State, true, _Msg) -> {noreply, State}; return_ok(State, false, Msg) -> {reply, Msg, State}. @@ -1186,19 +1211,11 @@ incr_stats(QXCounts, Item, State = #ch{queue_exchange_stats = Stats}) -> end, Stats, QXCounts), State#ch{queue_exchange_stats = Stats1}. -maybe_emit_stats(State = #ch{queue_exchange_stats = QueueExchangeStats, - last_stats_update = LastUpdate}) -> - Now = os:timestamp(), - case timer:now_diff(Now, LastUpdate) > ?STATISTICS_UPDATE_INTERVAL of - true -> - rabbit_event:notify( - channel_stats, - [{queue_exchange_stats, dict:to_list(QueueExchangeStats)} | - [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]]), - State#ch{last_stats_update = Now}; - _ -> - State - end. +internal_emit_stats(State = #ch{queue_exchange_stats = QueueExchangeStats}) -> + rabbit_event:notify( + channel_stats, + [{queue_exchange_stats, dict:to_list(QueueExchangeStats)} | + [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]]). erase_queue_stats(QPid, State = #ch{queue_exchange_stats = Stats}) -> Stats1 = dict:erase(QPid, Stats), diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 4be191ae..1143794e 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -43,6 +43,8 @@ -export([analyze_frame/2]). +-export([emit_stats/1]). + -import(gen_tcp). -import(fprof). -import(inet). @@ -58,7 +60,7 @@ %--------------------------------------------------------------------------- -record(v1, {sock, connection, callback, recv_ref, connection_state, - queue_collector, last_stats_update}). + queue_collector, stats_timer_ref}). -define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt, send_pend, state, channels]). @@ -144,6 +146,7 @@ -spec(info_keys/0 :: () -> [rabbit_types:info_key()]). -spec(info/1 :: (pid()) -> [rabbit_types:info()]). -spec(info/2 :: (pid(), [rabbit_types:info_key()]) -> [rabbit_types:info()]). +-spec(emit_stats/1 :: (pid()) -> 'ok'). -spec(shutdown/2 :: (pid(), string()) -> 'ok'). -spec(server_properties/0 :: () -> rabbit_framing:amqp_table()). @@ -184,6 +187,9 @@ info(Pid, Items) -> {error, Error} -> throw(Error) end. +emit_stats(Pid) -> + gen_server2:cast(Pid, emit_stats). + setup_profiling() -> Value = rabbit_misc:get_config(profiling_enabled, false), case Value of @@ -257,7 +263,7 @@ start_connection(Parent, Deb, Sock, SockTransform) -> recv_ref = none, connection_state = pre_init, queue_collector = Collector, - last_stats_update = {0,0,0}}, + stats_timer_ref = undefined}, handshake, 8)) catch Ex -> (if Ex == connection_closed_abruptly -> @@ -282,9 +288,8 @@ start_connection(Parent, Deb, Sock, SockTransform) -> end, done. -mainloop(Parent, Deb, State_ = #v1{sock= Sock, recv_ref = Ref}) -> +mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> %%?LOGDEBUG("Reader mainloop: ~p bytes available, need ~p~n", [HaveBytes, WaitUntilNBytes]), - State = maybe_emit_stats(State_), receive {inet_async, Sock, Ref, {ok, Data}} -> {State1, Callback1, Length1} = @@ -345,6 +350,9 @@ mainloop(Parent, Deb, State_ = #v1{sock= Sock, recv_ref = Ref}) -> catch Error -> {error, Error} end), mainloop(Parent, Deb, State); + {'$gen_cast', emit_stats} -> + internal_emit_stats(State), + mainloop(Parent, Deb, State#v1{stats_timer_ref = undefined}); {system, From, Request} -> sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State); @@ -537,7 +545,8 @@ analyze_frame(_Type, _Body) -> handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) -> %%?LOGDEBUG("Got frame header: ~p/~p/~p~n", [Type, Channel, PayloadSize]), - {State, {frame_payload, Type, Channel, PayloadSize}, PayloadSize + 1}; + {ensure_stats_timer(State), {frame_payload, Type, Channel, PayloadSize}, + PayloadSize + 1}; handle_input({frame_payload, Type, Channel, PayloadSize}, PayloadAndMarker, State) -> case PayloadAndMarker of @@ -591,6 +600,14 @@ check_version(ClientVersion, ServerVersion) -> (ClientMajor == ServerMajor andalso ClientMinor >= ServerMinor). +ensure_stats_timer(State = #v1{stats_timer_ref = undefined}) -> + {ok, TRef} = timer:apply_after(?STATS_INTERVAL, + rabbit_reader, emit_stats, + [self()]), + State#v1{stats_timer_ref = TRef}; +ensure_stats_timer(State) -> + State. + %%-------------------------------------------------------------------------- handle_method0(MethodName, FieldsBin, State) -> @@ -857,14 +874,6 @@ amqp_exception_explanation(Text, Expl) -> true -> CompleteTextBin end. -maybe_emit_stats(State = #v1{last_stats_update = LastUpdate}) -> - Now = os:timestamp(), - case timer:now_diff(Now, LastUpdate) > ?STATISTICS_UPDATE_INTERVAL of - true -> - rabbit_event:notify( - connection_stats, - [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]), - State#v1{last_stats_update = Now}; - _ -> - State - end. +internal_emit_stats(State) -> + rabbit_event:notify(connection_stats, + [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]). |