summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2010-09-14 14:02:31 +0100
committerSimon MacMullen <simon@rabbitmq.com>2010-09-14 14:02:31 +0100
commitb4d810a150ab4a49db4d93657e2d14f39b664bb3 (patch)
treefc29dd9d29635840c9f6cf041b2f1463887faecc
parent833d65afe865c0fc54051de665b870af197357af (diff)
downloadrabbitmq-server-b4d810a150ab4a49db4d93657e2d14f39b664bb3.tar.gz
* Use one-shot timers
* Ensure we always send a stats event straight after creation and before hibernation * Tidy the API a bit.
-rw-r--r--src/rabbit_amqqueue_process.erl20
-rw-r--r--src/rabbit_channel.erl16
-rw-r--r--src/rabbit_event.erl71
-rw-r--r--src/rabbit_reader.erl19
4 files changed, 63 insertions, 63 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 77df71cc..fd4d37aa 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -167,6 +167,8 @@ declare(Recover, From,
State1 = init_expires(State#q{backing_queue_state = BQS}),
rabbit_event:notify(queue_created,
infos(?CREATION_EVENT_KEYS, State1)),
+ rabbit_event:maybe(StatsTimer,
+ fun() -> emit_stats(State1) end),
noreply(State1);
Q1 -> {stop, normal, {existing, Q1}, State}
end.
@@ -267,14 +269,8 @@ ensure_stats_timer(State = #q{stats_timer = StatsTimer,
q = Q}) ->
State#q{stats_timer = rabbit_event:ensure_stats_timer(
StatsTimer,
- fun() -> emit_stats(State) end,
fun() -> rabbit_amqqueue:emit_stats(Q) end)}.
-stop_stats_timer(State = #q{stats_timer = StatsTimer}) ->
- State#q{stats_timer = rabbit_event:stop_stats_timer(
- StatsTimer,
- fun() -> emit_stats(State) end)}.
-
assert_invariant(#q{active_consumers = AC,
backing_queue = BQ, backing_queue_state = BQS}) ->
true = (queue:is_empty(AC) orelse BQ:is_empty(BQS)).
@@ -586,8 +582,9 @@ i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
i(Item, _) ->
throw({bad_argument, Item}).
-emit_stats(State) ->
- rabbit_event:notify(queue_stats, infos(?STATISTICS_KEYS, State)).
+emit_stats(State = #q{stats_timer = StatsTimer}) ->
+ rabbit_event:notify(queue_stats, infos(?STATISTICS_KEYS, State)),
+ State#q{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}.
%---------------------------------------------------------------------------
@@ -889,8 +886,9 @@ handle_cast(maybe_expire, State) ->
end;
handle_cast(emit_stats, State) ->
- emit_stats(State),
- noreply(State).
+ %% Do not invoke noreply as it would see no timer and create a new one.
+ State1 = emit_stats(State),
+ {noreply, State1}.
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
State = #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
@@ -927,5 +925,5 @@ handle_pre_hibernate(State = #q{backing_queue = BQ,
DesiredDuration =
rabbit_memory_monitor:report_ram_duration(self(), infinity),
BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
- {hibernate, stop_stats_timer(
+ {hibernate, emit_stats(
stop_rate_timer(State#q{backing_queue_state = BQS2}))}.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index e5511330..637bee30 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -177,6 +177,7 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid,
queue_collector_pid = CollectorPid,
stats_timer = StatsTimer},
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
+ rabbit_event:maybe(StatsTimer, fun() -> internal_emit_stats(State) end),
{ok, State, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -239,8 +240,8 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg},
noreply(State1#ch{next_tag = DeliveryTag + 1});
handle_cast(emit_stats, State) ->
- internal_emit_stats(State),
- {noreply, State}.
+ State1 = internal_emit_stats(State),
+ {noreply, State1}.
handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) ->
erase_queue_stats(QPid),
@@ -248,7 +249,7 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) ->
handle_pre_hibernate(State) ->
ok = clear_permission_cache(),
- {hibernate, stop_stats_timer(State)}.
+ {hibernate, internal_emit_stats(State)}.
terminate(_Reason, State = #ch{state = terminating}) ->
terminate(State);
@@ -278,14 +279,8 @@ ensure_stats_timer(State = #ch{stats_timer = StatsTimer}) ->
ChPid = self(),
State#ch{stats_timer = rabbit_event:ensure_stats_timer(
StatsTimer,
- fun() -> internal_emit_stats(State) end,
fun() -> emit_stats(ChPid) end)}.
-stop_stats_timer(State = #ch{stats_timer = StatsTimer}) ->
- State#ch{stats_timer = rabbit_event:stop_stats_timer(
- StatsTimer,
- fun() -> internal_emit_stats(State) end)}.
-
return_ok(State, true, _Msg) -> {noreply, State};
return_ok(State, false, Msg) -> {reply, Msg, State}.
@@ -1164,7 +1159,8 @@ internal_emit_stats(State = #ch{stats_timer = StatsTimer}) ->
[{QX, Stats} ||
{{queue_exchange_stats, QX}, Stats} <- get()]}],
rabbit_event:notify(channel_stats, CoarseStats ++ FineStats)
- end.
+ end,
+ State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}.
erase_queue_stats(QPid) ->
erase({monitoring, QPid}),
diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl
index 0f00537a..d7d3ac47 100644
--- a/src/rabbit_event.erl
+++ b/src/rabbit_event.erl
@@ -34,9 +34,9 @@
-include("rabbit.hrl").
-export([start_link/0]).
--export([init_stats_timer/0, ensure_stats_timer/3, stop_stats_timer/2]).
--export([ensure_stats_timer_after/2, reset_stats_timer_after/1]).
--export([stats_level/1]).
+-export([init_stats_timer/0, ensure_stats_timer/2]).
+-export([reset_stats_timer/1]).
+-export([stats_level/1, maybe/2]).
-export([notify/2]).
%%----------------------------------------------------------------------------
@@ -71,11 +71,10 @@
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
-spec(init_stats_timer/0 :: () -> state()).
--spec(ensure_stats_timer/3 :: (state(), timer_fun(), timer_fun()) -> state()).
--spec(stop_stats_timer/2 :: (state(), timer_fun()) -> state()).
--spec(ensure_stats_timer_after/2 :: (state(), timer_fun()) -> state()).
--spec(reset_stats_timer_after/1 :: (state()) -> state()).
+-spec(ensure_stats_timer/2 :: (state(), timer_fun()) -> state()).
+-spec(reset_stats_timer/1 :: (state()) -> state()).
-spec(stats_level/1 :: (state()) -> level()).
+-spec(maybe/2 :: (state(), timer_fun()) -> 'ok').
-spec(notify/2 :: (event_type(), event_props()) -> 'ok').
-endif.
@@ -85,44 +84,54 @@
start_link() ->
gen_event:start_link({local, ?MODULE}).
+%% The idea is, for each of channel, queue, connection:
+%%
+%% On startup:
+%% Timer = init_stats_timer()
+%% notify(created event)
+%% maybe(internal_emit_stats) - so we immediately send something
+%%
+%% On wakeup:
+%% ensure_stats_timer(Timer, emit_stats)
+%% (Note we can't emit stats immediately, the timer may have fired 1ms ago.)
+%%
+%% emit_stats:
+%% internal_emit_stats
+%% reset_stats_timer(Timer) - just bookkeeping
+%%
+%% Pre-hibernation:
+%% internal_emit_stats
+%% reset_stats_timer(Timer) - just bookkeeping
+%%
+%% internal_emit_stats:
+%% notify(stats)
+
init_stats_timer() ->
{ok, StatsLevel} = application:get_env(rabbit, collect_statistics),
#state{level = StatsLevel, timer = undefined}.
-ensure_stats_timer(State = #state{level = none}, _NowFun, _TimerFun) ->
- State;
-ensure_stats_timer(State = #state{timer = undefined}, NowFun, TimerFun) ->
- NowFun(),
- {ok, TRef} = timer:apply_interval(?STATS_INTERVAL,
- erlang, apply, [TimerFun, []]),
- State#state{timer = TRef};
-ensure_stats_timer(State, _NowFun, _TimerFun) ->
- State.
-
-stop_stats_timer(State = #state{level = none}, _NowFun) ->
- State;
-stop_stats_timer(State = #state{timer = undefined}, _NowFun) ->
- State;
-stop_stats_timer(State = #state{timer = TRef}, NowFun) ->
- {ok, cancel} = timer:cancel(TRef),
- NowFun(),
- State#state{timer = undefined}.
-
-ensure_stats_timer_after(State = #state{level = none}, _TimerFun) ->
+ensure_stats_timer(State = #state{level = none}, _Fun) ->
State;
-ensure_stats_timer_after(State = #state{timer = undefined}, TimerFun) ->
+ensure_stats_timer(State = #state{timer = undefined}, Fun) ->
{ok, TRef} = timer:apply_after(?STATS_INTERVAL,
- erlang, apply, [TimerFun, []]),
+ erlang, apply, [Fun, []]),
State#state{timer = TRef};
-ensure_stats_timer_after(State, _TimerFun) ->
+ensure_stats_timer(State, _Fun) ->
State.
-reset_stats_timer_after(State) ->
+reset_stats_timer(State) ->
State#state{timer = undefined}.
stats_level(#state{level = Level}) ->
Level.
+
+maybe(#state{level = none}, _Fun) ->
+ ok;
+maybe(_State, Fun) ->
+ Fun(),
+ ok.
+
notify(Type, Props) ->
try
%% TODO: switch to os:timestamp() when we drop support for
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index dc94a83c..c99c8874 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -369,10 +369,8 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) ->
end),
mainloop(Deb, State);
{'$gen_cast', emit_stats} ->
- internal_emit_stats(State),
- mainloop(Deb, State#v1{stats_timer =
- rabbit_event:reset_stats_timer_after(
- State#v1.stats_timer)});
+ State1 = internal_emit_stats(State),
+ mainloop(Deb, State1);
{system, From, Request} ->
sys:handle_system_msg(Request, From,
Parent, ?MODULE, Deb, State);
@@ -693,7 +691,7 @@ refuse_connection(Sock, Exception) ->
ensure_stats_timer(State = #v1{stats_timer = StatsTimer,
connection_state = running}) ->
Self = self(),
- State#v1{stats_timer = rabbit_event:ensure_stats_timer_after(
+ State#v1{stats_timer = rabbit_event:ensure_stats_timer(
StatsTimer,
fun() -> emit_stats(Self) end)};
ensure_stats_timer(State) ->
@@ -779,10 +777,8 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
connection = NewConnection}),
rabbit_event:notify(connection_created,
infos(?CREATION_EVENT_KEYS, State1)),
- case rabbit_event:stats_level(StatsTimer) of
- none -> ok;
- _ -> internal_emit_stats(State1)
- end,
+ rabbit_event:maybe(StatsTimer,
+ fun() -> internal_emit_stats(State1) end),
State1;
handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) ->
lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()),
@@ -945,5 +941,6 @@ amqp_exception_explanation(Text, Expl) ->
true -> CompleteTextBin
end.
-internal_emit_stats(State) ->
- rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)).
+internal_emit_stats(State = #v1{stats_timer = StatsTimer}) ->
+ rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)),
+ State#v1{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}.