diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2010-09-14 14:02:31 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2010-09-14 14:02:31 +0100 |
commit | b4d810a150ab4a49db4d93657e2d14f39b664bb3 (patch) | |
tree | fc29dd9d29635840c9f6cf041b2f1463887faecc | |
parent | 833d65afe865c0fc54051de665b870af197357af (diff) | |
download | rabbitmq-server-b4d810a150ab4a49db4d93657e2d14f39b664bb3.tar.gz |
* Use one-shot timers
* Ensure we always send a stats event straight after creation and before hibernation
* Tidy the API a bit.
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 20 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 16 | ||||
-rw-r--r-- | src/rabbit_event.erl | 71 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 19 |
4 files changed, 63 insertions, 63 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 77df71cc..fd4d37aa 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -167,6 +167,8 @@ declare(Recover, From, State1 = init_expires(State#q{backing_queue_state = BQS}), rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State1)), + rabbit_event:maybe(StatsTimer, + fun() -> emit_stats(State1) end), noreply(State1); Q1 -> {stop, normal, {existing, Q1}, State} end. @@ -267,14 +269,8 @@ ensure_stats_timer(State = #q{stats_timer = StatsTimer, q = Q}) -> State#q{stats_timer = rabbit_event:ensure_stats_timer( StatsTimer, - fun() -> emit_stats(State) end, fun() -> rabbit_amqqueue:emit_stats(Q) end)}. -stop_stats_timer(State = #q{stats_timer = StatsTimer}) -> - State#q{stats_timer = rabbit_event:stop_stats_timer( - StatsTimer, - fun() -> emit_stats(State) end)}. - assert_invariant(#q{active_consumers = AC, backing_queue = BQ, backing_queue_state = BQS}) -> true = (queue:is_empty(AC) orelse BQ:is_empty(BQS)). @@ -586,8 +582,9 @@ i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> i(Item, _) -> throw({bad_argument, Item}). -emit_stats(State) -> - rabbit_event:notify(queue_stats, infos(?STATISTICS_KEYS, State)). +emit_stats(State = #q{stats_timer = StatsTimer}) -> + rabbit_event:notify(queue_stats, infos(?STATISTICS_KEYS, State)), + State#q{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}. %--------------------------------------------------------------------------- @@ -889,8 +886,9 @@ handle_cast(maybe_expire, State) -> end; handle_cast(emit_stats, State) -> - emit_stats(State), - noreply(State). + %% Do not invoke noreply as it would see no timer and create a new one. + State1 = emit_stats(State), + {noreply, State1}. handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State = #q{q = #amqqueue{exclusive_owner = DownPid}}) -> @@ -927,5 +925,5 @@ handle_pre_hibernate(State = #q{backing_queue = BQ, DesiredDuration = rabbit_memory_monitor:report_ram_duration(self(), infinity), BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), - {hibernate, stop_stats_timer( + {hibernate, emit_stats( stop_rate_timer(State#q{backing_queue_state = BQS2}))}. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index e5511330..637bee30 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -177,6 +177,7 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid, queue_collector_pid = CollectorPid, stats_timer = StatsTimer}, rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), + rabbit_event:maybe(StatsTimer, fun() -> internal_emit_stats(State) end), {ok, State, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -239,8 +240,8 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg}, noreply(State1#ch{next_tag = DeliveryTag + 1}); handle_cast(emit_stats, State) -> - internal_emit_stats(State), - {noreply, State}. + State1 = internal_emit_stats(State), + {noreply, State1}. handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) -> erase_queue_stats(QPid), @@ -248,7 +249,7 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) -> handle_pre_hibernate(State) -> ok = clear_permission_cache(), - {hibernate, stop_stats_timer(State)}. + {hibernate, internal_emit_stats(State)}. terminate(_Reason, State = #ch{state = terminating}) -> terminate(State); @@ -278,14 +279,8 @@ ensure_stats_timer(State = #ch{stats_timer = StatsTimer}) -> ChPid = self(), State#ch{stats_timer = rabbit_event:ensure_stats_timer( StatsTimer, - fun() -> internal_emit_stats(State) end, fun() -> emit_stats(ChPid) end)}. -stop_stats_timer(State = #ch{stats_timer = StatsTimer}) -> - State#ch{stats_timer = rabbit_event:stop_stats_timer( - StatsTimer, - fun() -> internal_emit_stats(State) end)}. - return_ok(State, true, _Msg) -> {noreply, State}; return_ok(State, false, Msg) -> {reply, Msg, State}. @@ -1164,7 +1159,8 @@ internal_emit_stats(State = #ch{stats_timer = StatsTimer}) -> [{QX, Stats} || {{queue_exchange_stats, QX}, Stats} <- get()]}], rabbit_event:notify(channel_stats, CoarseStats ++ FineStats) - end. + end, + State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}. erase_queue_stats(QPid) -> erase({monitoring, QPid}), diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl index 0f00537a..d7d3ac47 100644 --- a/src/rabbit_event.erl +++ b/src/rabbit_event.erl @@ -34,9 +34,9 @@ -include("rabbit.hrl"). -export([start_link/0]). --export([init_stats_timer/0, ensure_stats_timer/3, stop_stats_timer/2]). --export([ensure_stats_timer_after/2, reset_stats_timer_after/1]). --export([stats_level/1]). +-export([init_stats_timer/0, ensure_stats_timer/2]). +-export([reset_stats_timer/1]). +-export([stats_level/1, maybe/2]). -export([notify/2]). %%---------------------------------------------------------------------------- @@ -71,11 +71,10 @@ -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). -spec(init_stats_timer/0 :: () -> state()). --spec(ensure_stats_timer/3 :: (state(), timer_fun(), timer_fun()) -> state()). --spec(stop_stats_timer/2 :: (state(), timer_fun()) -> state()). --spec(ensure_stats_timer_after/2 :: (state(), timer_fun()) -> state()). --spec(reset_stats_timer_after/1 :: (state()) -> state()). +-spec(ensure_stats_timer/2 :: (state(), timer_fun()) -> state()). +-spec(reset_stats_timer/1 :: (state()) -> state()). -spec(stats_level/1 :: (state()) -> level()). +-spec(maybe/2 :: (state(), timer_fun()) -> 'ok'). -spec(notify/2 :: (event_type(), event_props()) -> 'ok'). -endif. @@ -85,44 +84,54 @@ start_link() -> gen_event:start_link({local, ?MODULE}). +%% The idea is, for each of channel, queue, connection: +%% +%% On startup: +%% Timer = init_stats_timer() +%% notify(created event) +%% maybe(internal_emit_stats) - so we immediately send something +%% +%% On wakeup: +%% ensure_stats_timer(Timer, emit_stats) +%% (Note we can't emit stats immediately, the timer may have fired 1ms ago.) +%% +%% emit_stats: +%% internal_emit_stats +%% reset_stats_timer(Timer) - just bookkeeping +%% +%% Pre-hibernation: +%% internal_emit_stats +%% reset_stats_timer(Timer) - just bookkeeping +%% +%% internal_emit_stats: +%% notify(stats) + init_stats_timer() -> {ok, StatsLevel} = application:get_env(rabbit, collect_statistics), #state{level = StatsLevel, timer = undefined}. -ensure_stats_timer(State = #state{level = none}, _NowFun, _TimerFun) -> - State; -ensure_stats_timer(State = #state{timer = undefined}, NowFun, TimerFun) -> - NowFun(), - {ok, TRef} = timer:apply_interval(?STATS_INTERVAL, - erlang, apply, [TimerFun, []]), - State#state{timer = TRef}; -ensure_stats_timer(State, _NowFun, _TimerFun) -> - State. - -stop_stats_timer(State = #state{level = none}, _NowFun) -> - State; -stop_stats_timer(State = #state{timer = undefined}, _NowFun) -> - State; -stop_stats_timer(State = #state{timer = TRef}, NowFun) -> - {ok, cancel} = timer:cancel(TRef), - NowFun(), - State#state{timer = undefined}. - -ensure_stats_timer_after(State = #state{level = none}, _TimerFun) -> +ensure_stats_timer(State = #state{level = none}, _Fun) -> State; -ensure_stats_timer_after(State = #state{timer = undefined}, TimerFun) -> +ensure_stats_timer(State = #state{timer = undefined}, Fun) -> {ok, TRef} = timer:apply_after(?STATS_INTERVAL, - erlang, apply, [TimerFun, []]), + erlang, apply, [Fun, []]), State#state{timer = TRef}; -ensure_stats_timer_after(State, _TimerFun) -> +ensure_stats_timer(State, _Fun) -> State. -reset_stats_timer_after(State) -> +reset_stats_timer(State) -> State#state{timer = undefined}. stats_level(#state{level = Level}) -> Level. + +maybe(#state{level = none}, _Fun) -> + ok; +maybe(_State, Fun) -> + Fun(), + ok. + notify(Type, Props) -> try %% TODO: switch to os:timestamp() when we drop support for diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index dc94a83c..c99c8874 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -369,10 +369,8 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) -> end), mainloop(Deb, State); {'$gen_cast', emit_stats} -> - internal_emit_stats(State), - mainloop(Deb, State#v1{stats_timer = - rabbit_event:reset_stats_timer_after( - State#v1.stats_timer)}); + State1 = internal_emit_stats(State), + mainloop(Deb, State1); {system, From, Request} -> sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State); @@ -693,7 +691,7 @@ refuse_connection(Sock, Exception) -> ensure_stats_timer(State = #v1{stats_timer = StatsTimer, connection_state = running}) -> Self = self(), - State#v1{stats_timer = rabbit_event:ensure_stats_timer_after( + State#v1{stats_timer = rabbit_event:ensure_stats_timer( StatsTimer, fun() -> emit_stats(Self) end)}; ensure_stats_timer(State) -> @@ -779,10 +777,8 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, connection = NewConnection}), rabbit_event:notify(connection_created, infos(?CREATION_EVENT_KEYS, State1)), - case rabbit_event:stats_level(StatsTimer) of - none -> ok; - _ -> internal_emit_stats(State1) - end, + rabbit_event:maybe(StatsTimer, + fun() -> internal_emit_stats(State1) end), State1; handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) -> lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()), @@ -945,5 +941,6 @@ amqp_exception_explanation(Text, Expl) -> true -> CompleteTextBin end. -internal_emit_stats(State) -> - rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)). +internal_emit_stats(State = #v1{stats_timer = StatsTimer}) -> + rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)), + State#v1{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}. |