summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2010-07-29 13:27:44 +0100
committerSimon MacMullen <simon@rabbitmq.com>2010-07-29 13:27:44 +0100
commit1d801e1f4a3675644430c773a0ef745ca8b19ad4 (patch)
tree8445d107f8ffa1a0d438583ee5830e8911942364
parent916b6291d7a9ae3a6dbc5cf581696f8ec375db4c (diff)
downloadrabbitmq-server-1d801e1f4a3675644430c773a0ef745ca8b19ad4.tar.gz
Refactor: abstract some things into rabbit_event.
-rw-r--r--src/rabbit_amqqueue_process.erl40
-rw-r--r--src/rabbit_channel.erl51
-rw-r--r--src/rabbit_event.erl33
-rw-r--r--src/rabbit_reader.erl34
-rw-r--r--src/rabbit_tests.erl1
5 files changed, 91 insertions, 68 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 78697841..a69891e4 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -60,8 +60,7 @@
sync_timer_ref,
rate_timer_ref,
expiry_timer_ref,
- stats_timer_ref,
- stats_level
+ stats_timer
}).
-record(consumer, {tag, ack_required}).
@@ -110,7 +109,6 @@ init(Q) ->
?LOGDEBUG("Queue starting - ~p~n", [Q]),
process_flag(trap_exit, true),
{ok, BQ} = application:get_env(backing_queue_module),
- {ok, StatsLevel} = application:get_env(rabbit, collect_statistics),
{ok, #q{q = Q#amqqueue{pid = self()},
exclusive_consumer = none,
@@ -123,8 +121,7 @@ init(Q) ->
sync_timer_ref = undefined,
rate_timer_ref = undefined,
expiry_timer_ref = undefined,
- stats_timer_ref = undefined,
- stats_level = StatsLevel}, hibernate,
+ stats_timer = rabbit_event:init_stats_timer()}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
terminate(shutdown, State = #q{backing_queue = BQ}) ->
@@ -265,26 +262,19 @@ ensure_expiry_timer(State = #q{expires = Expires}) ->
State
end.
-ensure_stats_timer(State = #q{stats_level = none}) ->
- State;
-ensure_stats_timer(State = #q{stats_timer_ref = undefined, q = Q}) ->
- {ok, TRef} = timer:apply_interval(?STATS_INTERVAL,
- rabbit_amqqueue, emit_stats, [Q]),
- emit_stats(State),
- State#q{stats_timer_ref = TRef};
-ensure_stats_timer(State) ->
- State.
-
-stop_stats_timer(State = #q{stats_level = none}) ->
- State;
-stop_stats_timer(State = #q{stats_timer_ref = undefined}) ->
- emit_stats(State),
- State;
-stop_stats_timer(State = #q{stats_timer_ref = TRef}) ->
- {ok, cancel} = timer:cancel(TRef),
- emit_stats(State),
- State#q{stats_timer_ref = undefined}.
-
+ensure_stats_timer(State = #q{stats_timer = StatsTimer,
+ q = Q}) ->
+ StatsTimer1 = rabbit_event:ensure_stats_timer(
+ StatsTimer,
+ fun() -> emit_stats(State) end,
+ fun() -> rabbit_amqqueue:emit_stats(Q) end),
+ State#q{stats_timer = StatsTimer1}.
+
+stop_stats_timer(State = #q{stats_timer = StatsTimer}) ->
+ StatsTimer1 = rabbit_event:stop_stats_timer(
+ StatsTimer,
+ fun() -> emit_stats(State) end),
+ State#q{stats_timer = StatsTimer1}.
assert_invariant(#q{active_consumers = AC,
backing_queue = BQ, backing_queue_state = BQS}) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index c2431ccb..0b3fd380 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -50,7 +50,7 @@
uncommitted_ack_q, unacked_message_q,
username, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking, queue_collector_pid, flow,
- stats_timer_ref, stats_level}).
+ stats_timer}).
-record(flow, {server, client, pending}).
@@ -170,7 +170,6 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) ->
process_flag(trap_exit, true),
link(WriterPid),
ok = pg_local:join(rabbit_channels, self()),
- {ok, StatsLevel} = application:get_env(rabbit, collect_statistics),
State = #ch{state = starting,
channel = Channel,
reader_pid = ReaderPid,
@@ -189,8 +188,7 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) ->
queue_collector_pid = CollectorPid,
flow = #flow{server = true, client = true,
pending = none},
- stats_timer_ref = undefined,
- stats_level = StatsLevel},
+ stats_timer = rabbit_event:init_stats_timer()},
rabbit_event:notify(
channel_created,
[{Item, i(Item, State)} || Item <- [pid|?CREATION_EVENT_KEYS]]),
@@ -277,7 +275,7 @@ handle_cast({flow_timeout, _Ref}, State) ->
handle_cast(emit_stats, State) ->
internal_emit_stats(State),
- noreply(State).
+ {noreply, State}.
handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}},
State = #ch{writer_pid = WriterPid}) ->
@@ -315,25 +313,19 @@ reply(Reply, NewState) ->
noreply(NewState) ->
{noreply, ensure_stats_timer(NewState), hibernate}.
-ensure_stats_timer(State = #ch{stats_level = none}) ->
- State;
-ensure_stats_timer(State = #ch{stats_timer_ref = undefined}) ->
- internal_emit_stats(State),
- {ok, TRef} = timer:apply_interval(?STATS_INTERVAL,
- rabbit_channel, emit_stats, [self()]),
- State#ch{stats_timer_ref = TRef};
-ensure_stats_timer(State) ->
- State.
+ensure_stats_timer(State = #ch{stats_timer = StatsTimer}) ->
+ ChPid = self(),
+ StatsTimer1 = rabbit_event:ensure_stats_timer(
+ StatsTimer,
+ fun() -> internal_emit_stats(State) end,
+ fun() -> emit_stats(ChPid) end),
+ State#ch{stats_timer = StatsTimer1}.
-stop_stats_timer(State = #ch{stats_level = none}) ->
- State;
-stop_stats_timer(State = #ch{stats_timer_ref = undefined}) ->
- internal_emit_stats(State),
- State;
-stop_stats_timer(State = #ch{stats_timer_ref = TRef}) ->
- {ok, cancel} = timer:cancel(TRef),
- internal_emit_stats(State),
- State#ch{stats_timer_ref = undefined}.
+stop_stats_timer(State = #ch{stats_timer = StatsTimer}) ->
+ StatsTimer1 = rabbit_event:stop_stats_timer(
+ StatsTimer,
+ fun() -> internal_emit_stats(State) end),
+ State#ch{stats_timer = StatsTimer1}.
return_ok(State, true, _Msg) -> {noreply, State};
return_ok(State, false, Msg) -> {reply, Msg, State}.
@@ -1194,10 +1186,11 @@ i(prefetch_count, #ch{limiter_pid = LimiterPid}) ->
i(Item, _) ->
throw({bad_argument, Item}).
-maybe_incr_stats(_QXIncs, _Measure, #ch{stats_level = none}) ->
- ok;
-maybe_incr_stats(QXIncs, Measure, _State) ->
- [incr_stats(QX, Inc, Measure) || {QX, Inc} <- QXIncs].
+maybe_incr_stats(QXIncs, Measure, #ch{stats_timer = StatsTimer}) ->
+ case rabbit_event:stats_level(StatsTimer) of
+ fine -> [incr_stats(QX, Inc, Measure) || {QX, Inc} <- QXIncs];
+ _ -> ok
+ end.
incr_stats({QPid, _} = QX, Inc, Measure) ->
maybe_monitor(QPid),
@@ -1227,9 +1220,9 @@ update_measures(Type, QX, Inc, Measure) ->
put({Type, QX},
orddict:store(Measure, Cur + Inc, Measures)).
-internal_emit_stats(State = #ch{stats_level = Level}) ->
+internal_emit_stats(State = #ch{stats_timer = StatsTimer}) ->
CoarseStats = [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS],
- case Level of
+ case rabbit_event:stats_level(StatsTimer) of
coarse ->
rabbit_event:notify(channel_stats, CoarseStats);
fine ->
diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl
index 72a9f8ba..6cd48078 100644
--- a/src/rabbit_event.erl
+++ b/src/rabbit_event.erl
@@ -33,10 +33,43 @@
-include("rabbit.hrl").
+-export([init_stats_timer/0, ensure_stats_timer/3, stop_stats_timer/2]).
+-export([stats_level/1]).
-export([notify/2]).
+
+-record(state, {level, timer}).
+
%%----------------------------------------------------------------------------
+init_stats_timer() ->
+ {ok, StatsLevel} = application:get_env(rabbit, collect_statistics),
+ #state{level = StatsLevel,
+ timer = undefined}.
+
+ensure_stats_timer(State = #state{level = none}, _NowFun, _TimerFun) ->
+ State;
+ensure_stats_timer(State = #state{timer = undefined}, NowFun, TimerFun) ->
+ NowFun(),
+ {ok, TRef} = timer:apply_interval(?STATS_INTERVAL,
+ erlang, apply, [TimerFun, []]),
+ State#state{timer = TRef};
+ensure_stats_timer(State, _NowFun, _TimerFun) ->
+ State.
+
+stop_stats_timer(State = #state{level = none}, _NowFun) ->
+ State;
+stop_stats_timer(State = #state{timer = undefined}, NowFun) ->
+ NowFun(),
+ State;
+stop_stats_timer(State = #state{timer = TRef}, NowFun) ->
+ {ok, cancel} = timer:cancel(TRef),
+ NowFun(),
+ State#state{timer = undefined}.
+
+stats_level(#state{level = Level}) ->
+ Level.
+
notify(Type, Props) ->
try
gen_event:notify(rabbit_event, #event{type = Type,
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index a2d92d2b..e1e4662b 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -60,7 +60,7 @@
%---------------------------------------------------------------------------
-record(v1, {sock, connection, callback, recv_ref, connection_state,
- queue_collector, stats_timer_ref, stats_level}).
+ queue_collector, stats_timer}).
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
send_pend, state, channels]).
@@ -251,7 +251,6 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
handshake_timeout),
ProfilingValue = setup_profiling(),
{ok, Collector} = rabbit_queue_collector:start_link(),
- {ok, StatsLevel} = application:get_env(rabbit, collect_statistics),
try
mainloop(Parent, Deb, switch_callback(
#v1{sock = ClientSock,
@@ -265,8 +264,8 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
recv_ref = none,
connection_state = pre_init,
queue_collector = Collector,
- stats_timer_ref = undefined,
- stats_level = StatsLevel},
+ stats_timer =
+ rabbit_event:init_stats_timer()},
handshake, 8))
catch
Ex -> (if Ex == connection_closed_abruptly ->
@@ -354,8 +353,7 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
end),
mainloop(Parent, Deb, State);
{'$gen_cast', emit_stats} ->
- internal_emit_stats(State),
- mainloop(Parent, Deb, State#v1{stats_timer_ref = undefined});
+ mainloop(Parent, Deb, stop_stats_timer(State));
{system, From, Request} ->
sys:handle_system_msg(Request, From,
Parent, ?MODULE, Deb, State);
@@ -603,14 +601,22 @@ check_version(ClientVersion, ServerVersion) ->
(ClientMajor == ServerMajor andalso
ClientMinor >= ServerMinor).
-ensure_stats_timer(State = #v1{stats_level = none}) ->
- State;
-ensure_stats_timer(State = #v1{stats_timer_ref = undefined}) ->
- {ok, TRef} = timer:apply_after(?STATS_INTERVAL,
- rabbit_reader, emit_stats, [self()]),
- State#v1{stats_timer_ref = TRef};
-ensure_stats_timer(State) ->
- State.
+ensure_stats_timer(State = #v1{stats_timer = StatsTimer}) ->
+ ReaderPid = self(),
+ StatsTimer1 = rabbit_event:ensure_stats_timer(
+ StatsTimer,
+ %% Don't run internal_emit_stats here, in normal use
+ %% ensure_stats_timer will get invoked almost immediately
+ %% after stop_stats_timer and we'll emit double events
+ fun() -> ok end,
+ fun() -> emit_stats(ReaderPid) end),
+ State#v1{stats_timer = StatsTimer1}.
+
+stop_stats_timer(State = #v1{stats_timer = StatsTimer}) ->
+ StatsTimer1 = rabbit_event:stop_stats_timer(
+ StatsTimer,
+ fun() -> internal_emit_stats(State) end),
+ State#v1{stats_timer = StatsTimer1}.
%%--------------------------------------------------------------------------
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index d2dca4a2..ef36945e 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1251,6 +1251,7 @@ test_statistics() ->
[{X,[{publish,1}]}] = proplists:get_value(channel_exchange_stats, Event3),
[] = proplists:get_value(channel_queue_exchange_stats, Event3),
+ rabbit_channel:shutdown(Ch),
rabbit_tests_event_receiver:stop(),
passed.