summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-09-21 13:24:07 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-09-21 13:24:07 +0100
commit347ea71a8c20a8b76ccd753c4c4ab3fd30d5fb4d (patch)
treef207f41189065d51183535a41f4dae8bea22f7d2
parent745564638964f730fc7039a1714f04c0e1a9a57a (diff)
parent0d0d11f356e4b57baa94f61ed9363ae3c1089c22 (diff)
downloadrabbitmq-server-347ea71a8c20a8b76ccd753c4c4ab3fd30d5fb4d.tar.gz
Merging bug 23193 onto default
-rw-r--r--src/rabbit_amqqueue_process.erl32
-rw-r--r--src/rabbit_channel.erl24
-rw-r--r--src/rabbit_event.erl71
-rw-r--r--src/rabbit_reader.erl25
4 files changed, 90 insertions, 62 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 91877efb..d15a6eb3 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -153,7 +153,8 @@ init_expires(State = #q{q = #amqqueue{arguments = Arguments}}) ->
declare(Recover, From,
State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable},
- backing_queue = BQ, backing_queue_state = undefined}) ->
+ backing_queue = BQ, backing_queue_state = undefined,
+ stats_timer = StatsTimer}) ->
case rabbit_amqqueue:internal_declare(Q, Recover) of
not_found -> {stop, normal, not_found, State};
Q -> gen_server2:reply(From, {new, Q}),
@@ -164,9 +165,12 @@ declare(Recover, From,
self(), {rabbit_amqqueue,
set_ram_duration_target, [self()]}),
BQS = BQ:init(QName, IsDurable, Recover),
+ State1 = init_expires(State#q{backing_queue_state = BQS}),
rabbit_event:notify(queue_created,
- infos(?CREATION_EVENT_KEYS, State)),
- noreply(init_expires(State#q{backing_queue_state = BQS}));
+ infos(?CREATION_EVENT_KEYS, State1)),
+ rabbit_event:if_enabled(StatsTimer,
+ fun() -> emit_stats(State1) end),
+ noreply(State1);
Q1 -> {stop, normal, {existing, Q1}, State}
end.
@@ -266,14 +270,8 @@ ensure_stats_timer(State = #q{stats_timer = StatsTimer,
q = Q}) ->
State#q{stats_timer = rabbit_event:ensure_stats_timer(
StatsTimer,
- fun() -> emit_stats(State) end,
fun() -> rabbit_amqqueue:emit_stats(Q) end)}.
-stop_stats_timer(State = #q{stats_timer = StatsTimer}) ->
- State#q{stats_timer = rabbit_event:stop_stats_timer(
- StatsTimer,
- fun() -> emit_stats(State) end)}.
-
assert_invariant(#q{active_consumers = AC,
backing_queue = BQ, backing_queue_state = BQS}) ->
true = (queue:is_empty(AC) orelse BQ:is_empty(BQS)).
@@ -910,9 +908,12 @@ handle_cast(maybe_expire, State) ->
false -> noreply(ensure_expiry_timer(State))
end;
-handle_cast(emit_stats, State) ->
+handle_cast(emit_stats, State = #q{stats_timer = StatsTimer}) ->
+ %% Do not invoke noreply as it would see no timer and create a new one.
emit_stats(State),
- noreply(State).
+ State1 = State#q{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)},
+ assert_invariant(State1),
+ {noreply, State1}.
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
State = #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
@@ -943,11 +944,14 @@ handle_info(Info, State) ->
handle_pre_hibernate(State = #q{backing_queue_state = undefined}) ->
{hibernate, State};
handle_pre_hibernate(State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
+ backing_queue_state = BQS,
+ stats_timer = StatsTimer}) ->
BQS1 = BQ:handle_pre_hibernate(BQS),
%% no activity for a while == 0 egress and ingress rates
DesiredDuration =
rabbit_memory_monitor:report_ram_duration(self(), infinity),
BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
- {hibernate, stop_stats_timer(
- stop_rate_timer(State#q{backing_queue_state = BQS2}))}.
+ rabbit_event:if_enabled(StatsTimer, fun () -> emit_stats(State) end),
+ State1 = State#q{stats_timer = rabbit_event:stop_stats_timer(StatsTimer),
+ backing_queue_state = BQS2},
+ {hibernate, stop_rate_timer(State1)}.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index f19f98d2..bde11f00 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -158,6 +158,7 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid,
StartLimiterFun]) ->
process_flag(trap_exit, true),
ok = pg_local:join(rabbit_channels, self()),
+ StatsTimer = rabbit_event:init_stats_timer(),
State = #ch{state = starting,
channel = Channel,
reader_pid = ReaderPid,
@@ -175,8 +176,10 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid,
consumer_mapping = dict:new(),
blocking = dict:new(),
queue_collector_pid = CollectorPid,
- stats_timer = rabbit_event:init_stats_timer()},
+ stats_timer = StatsTimer},
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
+ rabbit_event:if_enabled(StatsTimer,
+ fun() -> internal_emit_stats(State) end),
{ok, State, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -251,17 +254,22 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg},
end, State),
noreply(State1#ch{next_tag = DeliveryTag + 1});
-handle_cast(emit_stats, State) ->
+handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) ->
internal_emit_stats(State),
- {noreply, State}.
+ {noreply,
+ State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}}.
handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) ->
erase_queue_stats(QPid),
{noreply, queue_blocked(QPid, State)}.
-handle_pre_hibernate(State) ->
+handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) ->
ok = clear_permission_cache(),
- {hibernate, stop_stats_timer(State)}.
+ rabbit_event:if_enabled(StatsTimer, fun () ->
+ internal_emit_stats(State)
+ end),
+ {hibernate,
+ State#ch{stats_timer = rabbit_event:stop_stats_timer(StatsTimer)}}.
terminate(_Reason, State = #ch{state = terminating}) ->
terminate(State);
@@ -291,14 +299,8 @@ ensure_stats_timer(State = #ch{stats_timer = StatsTimer}) ->
ChPid = self(),
State#ch{stats_timer = rabbit_event:ensure_stats_timer(
StatsTimer,
- fun() -> internal_emit_stats(State) end,
fun() -> emit_stats(ChPid) end)}.
-stop_stats_timer(State = #ch{stats_timer = StatsTimer}) ->
- State#ch{stats_timer = rabbit_event:stop_stats_timer(
- StatsTimer,
- fun() -> internal_emit_stats(State) end)}.
-
return_ok(State, true, _Msg) -> {noreply, State};
return_ok(State, false, Msg) -> {reply, Msg, State}.
diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl
index 0f00537a..2b236531 100644
--- a/src/rabbit_event.erl
+++ b/src/rabbit_event.erl
@@ -34,9 +34,9 @@
-include("rabbit.hrl").
-export([start_link/0]).
--export([init_stats_timer/0, ensure_stats_timer/3, stop_stats_timer/2]).
--export([ensure_stats_timer_after/2, reset_stats_timer_after/1]).
--export([stats_level/1]).
+-export([init_stats_timer/0, ensure_stats_timer/2, stop_stats_timer/1]).
+-export([reset_stats_timer/1]).
+-export([stats_level/1, if_enabled/2]).
-export([notify/2]).
%%----------------------------------------------------------------------------
@@ -71,11 +71,11 @@
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
-spec(init_stats_timer/0 :: () -> state()).
--spec(ensure_stats_timer/3 :: (state(), timer_fun(), timer_fun()) -> state()).
--spec(stop_stats_timer/2 :: (state(), timer_fun()) -> state()).
--spec(ensure_stats_timer_after/2 :: (state(), timer_fun()) -> state()).
--spec(reset_stats_timer_after/1 :: (state()) -> state()).
+-spec(ensure_stats_timer/2 :: (state(), timer_fun()) -> state()).
+-spec(stop_stats_timer/1 :: (state()) -> state()).
+-spec(reset_stats_timer/1 :: (state()) -> state()).
-spec(stats_level/1 :: (state()) -> level()).
+-spec(if_enabled/2 :: (state(), timer_fun()) -> 'ok').
-spec(notify/2 :: (event_type(), event_props()) -> 'ok').
-endif.
@@ -85,44 +85,61 @@
start_link() ->
gen_event:start_link({local, ?MODULE}).
+%% The idea is, for each stat-emitting object:
+%%
+%% On startup:
+%% Timer = init_stats_timer()
+%% notify(created event)
+%% if_enabled(internal_emit_stats) - so we immediately send something
+%%
+%% On wakeup:
+%% ensure_stats_timer(Timer, emit_stats)
+%% (Note we can't emit stats immediately, the timer may have fired 1ms ago.)
+%%
+%% emit_stats:
+%% if_enabled(internal_emit_stats)
+%% reset_stats_timer(Timer) - just bookkeeping
+%%
+%% Pre-hibernation:
+%% if_enabled(internal_emit_stats)
+%% stop_stats_timer(Timer)
+%%
+%% internal_emit_stats:
+%% notify(stats)
+
init_stats_timer() ->
{ok, StatsLevel} = application:get_env(rabbit, collect_statistics),
#state{level = StatsLevel, timer = undefined}.
-ensure_stats_timer(State = #state{level = none}, _NowFun, _TimerFun) ->
+ensure_stats_timer(State = #state{level = none}, _Fun) ->
State;
-ensure_stats_timer(State = #state{timer = undefined}, NowFun, TimerFun) ->
- NowFun(),
- {ok, TRef} = timer:apply_interval(?STATS_INTERVAL,
- erlang, apply, [TimerFun, []]),
+ensure_stats_timer(State = #state{timer = undefined}, Fun) ->
+ {ok, TRef} = timer:apply_after(?STATS_INTERVAL,
+ erlang, apply, [Fun, []]),
State#state{timer = TRef};
-ensure_stats_timer(State, _NowFun, _TimerFun) ->
+ensure_stats_timer(State, _Fun) ->
State.
-stop_stats_timer(State = #state{level = none}, _NowFun) ->
+stop_stats_timer(State = #state{level = none}) ->
State;
-stop_stats_timer(State = #state{timer = undefined}, _NowFun) ->
+stop_stats_timer(State = #state{timer = undefined}) ->
State;
-stop_stats_timer(State = #state{timer = TRef}, NowFun) ->
+stop_stats_timer(State = #state{timer = TRef}) ->
{ok, cancel} = timer:cancel(TRef),
- NowFun(),
State#state{timer = undefined}.
-ensure_stats_timer_after(State = #state{level = none}, _TimerFun) ->
- State;
-ensure_stats_timer_after(State = #state{timer = undefined}, TimerFun) ->
- {ok, TRef} = timer:apply_after(?STATS_INTERVAL,
- erlang, apply, [TimerFun, []]),
- State#state{timer = TRef};
-ensure_stats_timer_after(State, _TimerFun) ->
- State.
-
-reset_stats_timer_after(State) ->
+reset_stats_timer(State) ->
State#state{timer = undefined}.
stats_level(#state{level = Level}) ->
Level.
+if_enabled(#state{level = none}, _Fun) ->
+ ok;
+if_enabled(_State, Fun) ->
+ Fun(),
+ ok.
+
notify(Type, Props) ->
try
%% TODO: switch to os:timestamp() when we drop support for
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 252f81a3..745e0083 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -369,10 +369,8 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) ->
end),
mainloop(Deb, State);
{'$gen_cast', emit_stats} ->
- internal_emit_stats(State),
- mainloop(Deb, State#v1{stats_timer =
- rabbit_event:reset_stats_timer_after(
- State#v1.stats_timer)});
+ State1 = internal_emit_stats(State),
+ mainloop(Deb, State1);
{system, From, Request} ->
sys:handle_system_msg(Request, From,
Parent, ?MODULE, Deb, State);
@@ -690,11 +688,14 @@ refuse_connection(Sock, Exception) ->
ok = inet_op(fun () -> rabbit_net:send(Sock, <<"AMQP",0,0,9,1>>) end),
throw(Exception).
-ensure_stats_timer(State = #v1{stats_timer = StatsTimer}) ->
+ensure_stats_timer(State = #v1{stats_timer = StatsTimer,
+ connection_state = running}) ->
Self = self(),
- State#v1{stats_timer = rabbit_event:ensure_stats_timer_after(
+ State#v1{stats_timer = rabbit_event:ensure_stats_timer(
StatsTimer,
- fun() -> emit_stats(Self) end)}.
+ fun() -> emit_stats(Self) end)};
+ensure_stats_timer(State) ->
+ State.
%%--------------------------------------------------------------------------
@@ -765,7 +766,8 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
connection = Connection = #connection{
user = User,
protocol = Protocol},
- sock = Sock}) ->
+ sock = Sock,
+ stats_timer = StatsTimer}) ->
ok = rabbit_access_control:check_vhost_access(User, VHostPath),
NewConnection = Connection#connection{vhost = VHostPath},
ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
@@ -775,6 +777,8 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
connection = NewConnection}),
rabbit_event:notify(connection_created,
infos(?CREATION_EVENT_KEYS, State1)),
+ rabbit_event:if_enabled(StatsTimer,
+ fun() -> internal_emit_stats(State1) end),
State1;
handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) ->
lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()),
@@ -938,5 +942,6 @@ amqp_exception_explanation(Text, Expl) ->
true -> CompleteTextBin
end.
-internal_emit_stats(State) ->
- rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)).
+internal_emit_stats(State = #v1{stats_timer = StatsTimer}) ->
+ rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)),
+ State#v1{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}.