diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-09-21 13:24:07 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-09-21 13:24:07 +0100 |
commit | 347ea71a8c20a8b76ccd753c4c4ab3fd30d5fb4d (patch) | |
tree | f207f41189065d51183535a41f4dae8bea22f7d2 | |
parent | 745564638964f730fc7039a1714f04c0e1a9a57a (diff) | |
parent | 0d0d11f356e4b57baa94f61ed9363ae3c1089c22 (diff) | |
download | rabbitmq-server-347ea71a8c20a8b76ccd753c4c4ab3fd30d5fb4d.tar.gz |
Merging bug 23193 onto default
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 32 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 24 | ||||
-rw-r--r-- | src/rabbit_event.erl | 71 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 25 |
4 files changed, 90 insertions, 62 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 91877efb..d15a6eb3 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -153,7 +153,8 @@ init_expires(State = #q{q = #amqqueue{arguments = Arguments}}) -> declare(Recover, From, State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable}, - backing_queue = BQ, backing_queue_state = undefined}) -> + backing_queue = BQ, backing_queue_state = undefined, + stats_timer = StatsTimer}) -> case rabbit_amqqueue:internal_declare(Q, Recover) of not_found -> {stop, normal, not_found, State}; Q -> gen_server2:reply(From, {new, Q}), @@ -164,9 +165,12 @@ declare(Recover, From, self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}), BQS = BQ:init(QName, IsDurable, Recover), + State1 = init_expires(State#q{backing_queue_state = BQS}), rabbit_event:notify(queue_created, - infos(?CREATION_EVENT_KEYS, State)), - noreply(init_expires(State#q{backing_queue_state = BQS})); + infos(?CREATION_EVENT_KEYS, State1)), + rabbit_event:if_enabled(StatsTimer, + fun() -> emit_stats(State1) end), + noreply(State1); Q1 -> {stop, normal, {existing, Q1}, State} end. @@ -266,14 +270,8 @@ ensure_stats_timer(State = #q{stats_timer = StatsTimer, q = Q}) -> State#q{stats_timer = rabbit_event:ensure_stats_timer( StatsTimer, - fun() -> emit_stats(State) end, fun() -> rabbit_amqqueue:emit_stats(Q) end)}. -stop_stats_timer(State = #q{stats_timer = StatsTimer}) -> - State#q{stats_timer = rabbit_event:stop_stats_timer( - StatsTimer, - fun() -> emit_stats(State) end)}. - assert_invariant(#q{active_consumers = AC, backing_queue = BQ, backing_queue_state = BQS}) -> true = (queue:is_empty(AC) orelse BQ:is_empty(BQS)). @@ -910,9 +908,12 @@ handle_cast(maybe_expire, State) -> false -> noreply(ensure_expiry_timer(State)) end; -handle_cast(emit_stats, State) -> +handle_cast(emit_stats, State = #q{stats_timer = StatsTimer}) -> + %% Do not invoke noreply as it would see no timer and create a new one. emit_stats(State), - noreply(State). + State1 = State#q{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}, + assert_invariant(State1), + {noreply, State1}. handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State = #q{q = #amqqueue{exclusive_owner = DownPid}}) -> @@ -943,11 +944,14 @@ handle_info(Info, State) -> handle_pre_hibernate(State = #q{backing_queue_state = undefined}) -> {hibernate, State}; handle_pre_hibernate(State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> + backing_queue_state = BQS, + stats_timer = StatsTimer}) -> BQS1 = BQ:handle_pre_hibernate(BQS), %% no activity for a while == 0 egress and ingress rates DesiredDuration = rabbit_memory_monitor:report_ram_duration(self(), infinity), BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), - {hibernate, stop_stats_timer( - stop_rate_timer(State#q{backing_queue_state = BQS2}))}. + rabbit_event:if_enabled(StatsTimer, fun () -> emit_stats(State) end), + State1 = State#q{stats_timer = rabbit_event:stop_stats_timer(StatsTimer), + backing_queue_state = BQS2}, + {hibernate, stop_rate_timer(State1)}. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index f19f98d2..bde11f00 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -158,6 +158,7 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid, StartLimiterFun]) -> process_flag(trap_exit, true), ok = pg_local:join(rabbit_channels, self()), + StatsTimer = rabbit_event:init_stats_timer(), State = #ch{state = starting, channel = Channel, reader_pid = ReaderPid, @@ -175,8 +176,10 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid, consumer_mapping = dict:new(), blocking = dict:new(), queue_collector_pid = CollectorPid, - stats_timer = rabbit_event:init_stats_timer()}, + stats_timer = StatsTimer}, rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), + rabbit_event:if_enabled(StatsTimer, + fun() -> internal_emit_stats(State) end), {ok, State, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -251,17 +254,22 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg}, end, State), noreply(State1#ch{next_tag = DeliveryTag + 1}); -handle_cast(emit_stats, State) -> +handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) -> internal_emit_stats(State), - {noreply, State}. + {noreply, + State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}}. handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) -> erase_queue_stats(QPid), {noreply, queue_blocked(QPid, State)}. -handle_pre_hibernate(State) -> +handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> ok = clear_permission_cache(), - {hibernate, stop_stats_timer(State)}. + rabbit_event:if_enabled(StatsTimer, fun () -> + internal_emit_stats(State) + end), + {hibernate, + State#ch{stats_timer = rabbit_event:stop_stats_timer(StatsTimer)}}. terminate(_Reason, State = #ch{state = terminating}) -> terminate(State); @@ -291,14 +299,8 @@ ensure_stats_timer(State = #ch{stats_timer = StatsTimer}) -> ChPid = self(), State#ch{stats_timer = rabbit_event:ensure_stats_timer( StatsTimer, - fun() -> internal_emit_stats(State) end, fun() -> emit_stats(ChPid) end)}. -stop_stats_timer(State = #ch{stats_timer = StatsTimer}) -> - State#ch{stats_timer = rabbit_event:stop_stats_timer( - StatsTimer, - fun() -> internal_emit_stats(State) end)}. - return_ok(State, true, _Msg) -> {noreply, State}; return_ok(State, false, Msg) -> {reply, Msg, State}. diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl index 0f00537a..2b236531 100644 --- a/src/rabbit_event.erl +++ b/src/rabbit_event.erl @@ -34,9 +34,9 @@ -include("rabbit.hrl"). -export([start_link/0]). --export([init_stats_timer/0, ensure_stats_timer/3, stop_stats_timer/2]). --export([ensure_stats_timer_after/2, reset_stats_timer_after/1]). --export([stats_level/1]). +-export([init_stats_timer/0, ensure_stats_timer/2, stop_stats_timer/1]). +-export([reset_stats_timer/1]). +-export([stats_level/1, if_enabled/2]). -export([notify/2]). %%---------------------------------------------------------------------------- @@ -71,11 +71,11 @@ -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). -spec(init_stats_timer/0 :: () -> state()). --spec(ensure_stats_timer/3 :: (state(), timer_fun(), timer_fun()) -> state()). --spec(stop_stats_timer/2 :: (state(), timer_fun()) -> state()). --spec(ensure_stats_timer_after/2 :: (state(), timer_fun()) -> state()). --spec(reset_stats_timer_after/1 :: (state()) -> state()). +-spec(ensure_stats_timer/2 :: (state(), timer_fun()) -> state()). +-spec(stop_stats_timer/1 :: (state()) -> state()). +-spec(reset_stats_timer/1 :: (state()) -> state()). -spec(stats_level/1 :: (state()) -> level()). +-spec(if_enabled/2 :: (state(), timer_fun()) -> 'ok'). -spec(notify/2 :: (event_type(), event_props()) -> 'ok'). -endif. @@ -85,44 +85,61 @@ start_link() -> gen_event:start_link({local, ?MODULE}). +%% The idea is, for each stat-emitting object: +%% +%% On startup: +%% Timer = init_stats_timer() +%% notify(created event) +%% if_enabled(internal_emit_stats) - so we immediately send something +%% +%% On wakeup: +%% ensure_stats_timer(Timer, emit_stats) +%% (Note we can't emit stats immediately, the timer may have fired 1ms ago.) +%% +%% emit_stats: +%% if_enabled(internal_emit_stats) +%% reset_stats_timer(Timer) - just bookkeeping +%% +%% Pre-hibernation: +%% if_enabled(internal_emit_stats) +%% stop_stats_timer(Timer) +%% +%% internal_emit_stats: +%% notify(stats) + init_stats_timer() -> {ok, StatsLevel} = application:get_env(rabbit, collect_statistics), #state{level = StatsLevel, timer = undefined}. -ensure_stats_timer(State = #state{level = none}, _NowFun, _TimerFun) -> +ensure_stats_timer(State = #state{level = none}, _Fun) -> State; -ensure_stats_timer(State = #state{timer = undefined}, NowFun, TimerFun) -> - NowFun(), - {ok, TRef} = timer:apply_interval(?STATS_INTERVAL, - erlang, apply, [TimerFun, []]), +ensure_stats_timer(State = #state{timer = undefined}, Fun) -> + {ok, TRef} = timer:apply_after(?STATS_INTERVAL, + erlang, apply, [Fun, []]), State#state{timer = TRef}; -ensure_stats_timer(State, _NowFun, _TimerFun) -> +ensure_stats_timer(State, _Fun) -> State. -stop_stats_timer(State = #state{level = none}, _NowFun) -> +stop_stats_timer(State = #state{level = none}) -> State; -stop_stats_timer(State = #state{timer = undefined}, _NowFun) -> +stop_stats_timer(State = #state{timer = undefined}) -> State; -stop_stats_timer(State = #state{timer = TRef}, NowFun) -> +stop_stats_timer(State = #state{timer = TRef}) -> {ok, cancel} = timer:cancel(TRef), - NowFun(), State#state{timer = undefined}. -ensure_stats_timer_after(State = #state{level = none}, _TimerFun) -> - State; -ensure_stats_timer_after(State = #state{timer = undefined}, TimerFun) -> - {ok, TRef} = timer:apply_after(?STATS_INTERVAL, - erlang, apply, [TimerFun, []]), - State#state{timer = TRef}; -ensure_stats_timer_after(State, _TimerFun) -> - State. - -reset_stats_timer_after(State) -> +reset_stats_timer(State) -> State#state{timer = undefined}. stats_level(#state{level = Level}) -> Level. +if_enabled(#state{level = none}, _Fun) -> + ok; +if_enabled(_State, Fun) -> + Fun(), + ok. + notify(Type, Props) -> try %% TODO: switch to os:timestamp() when we drop support for diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 252f81a3..745e0083 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -369,10 +369,8 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) -> end), mainloop(Deb, State); {'$gen_cast', emit_stats} -> - internal_emit_stats(State), - mainloop(Deb, State#v1{stats_timer = - rabbit_event:reset_stats_timer_after( - State#v1.stats_timer)}); + State1 = internal_emit_stats(State), + mainloop(Deb, State1); {system, From, Request} -> sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State); @@ -690,11 +688,14 @@ refuse_connection(Sock, Exception) -> ok = inet_op(fun () -> rabbit_net:send(Sock, <<"AMQP",0,0,9,1>>) end), throw(Exception). -ensure_stats_timer(State = #v1{stats_timer = StatsTimer}) -> +ensure_stats_timer(State = #v1{stats_timer = StatsTimer, + connection_state = running}) -> Self = self(), - State#v1{stats_timer = rabbit_event:ensure_stats_timer_after( + State#v1{stats_timer = rabbit_event:ensure_stats_timer( StatsTimer, - fun() -> emit_stats(Self) end)}. + fun() -> emit_stats(Self) end)}; +ensure_stats_timer(State) -> + State. %%-------------------------------------------------------------------------- @@ -765,7 +766,8 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, connection = Connection = #connection{ user = User, protocol = Protocol}, - sock = Sock}) -> + sock = Sock, + stats_timer = StatsTimer}) -> ok = rabbit_access_control:check_vhost_access(User, VHostPath), NewConnection = Connection#connection{vhost = VHostPath}, ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol), @@ -775,6 +777,8 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, connection = NewConnection}), rabbit_event:notify(connection_created, infos(?CREATION_EVENT_KEYS, State1)), + rabbit_event:if_enabled(StatsTimer, + fun() -> internal_emit_stats(State1) end), State1; handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) -> lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()), @@ -938,5 +942,6 @@ amqp_exception_explanation(Text, Expl) -> true -> CompleteTextBin end. -internal_emit_stats(State) -> - rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)). +internal_emit_stats(State = #v1{stats_timer = StatsTimer}) -> + rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)), + State#v1{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}. |