diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2010-07-29 13:27:44 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2010-07-29 13:27:44 +0100 |
commit | 1d801e1f4a3675644430c773a0ef745ca8b19ad4 (patch) | |
tree | 8445d107f8ffa1a0d438583ee5830e8911942364 | |
parent | 916b6291d7a9ae3a6dbc5cf581696f8ec375db4c (diff) | |
download | rabbitmq-server-1d801e1f4a3675644430c773a0ef745ca8b19ad4.tar.gz |
Refactor: abstract some things into rabbit_event.
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 40 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 51 | ||||
-rw-r--r-- | src/rabbit_event.erl | 33 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 34 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 1 |
5 files changed, 91 insertions, 68 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 78697841..a69891e4 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -60,8 +60,7 @@ sync_timer_ref, rate_timer_ref, expiry_timer_ref, - stats_timer_ref, - stats_level + stats_timer }). -record(consumer, {tag, ack_required}). @@ -110,7 +109,6 @@ init(Q) -> ?LOGDEBUG("Queue starting - ~p~n", [Q]), process_flag(trap_exit, true), {ok, BQ} = application:get_env(backing_queue_module), - {ok, StatsLevel} = application:get_env(rabbit, collect_statistics), {ok, #q{q = Q#amqqueue{pid = self()}, exclusive_consumer = none, @@ -123,8 +121,7 @@ init(Q) -> sync_timer_ref = undefined, rate_timer_ref = undefined, expiry_timer_ref = undefined, - stats_timer_ref = undefined, - stats_level = StatsLevel}, hibernate, + stats_timer = rabbit_event:init_stats_timer()}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. terminate(shutdown, State = #q{backing_queue = BQ}) -> @@ -265,26 +262,19 @@ ensure_expiry_timer(State = #q{expires = Expires}) -> State end. -ensure_stats_timer(State = #q{stats_level = none}) -> - State; -ensure_stats_timer(State = #q{stats_timer_ref = undefined, q = Q}) -> - {ok, TRef} = timer:apply_interval(?STATS_INTERVAL, - rabbit_amqqueue, emit_stats, [Q]), - emit_stats(State), - State#q{stats_timer_ref = TRef}; -ensure_stats_timer(State) -> - State. - -stop_stats_timer(State = #q{stats_level = none}) -> - State; -stop_stats_timer(State = #q{stats_timer_ref = undefined}) -> - emit_stats(State), - State; -stop_stats_timer(State = #q{stats_timer_ref = TRef}) -> - {ok, cancel} = timer:cancel(TRef), - emit_stats(State), - State#q{stats_timer_ref = undefined}. - +ensure_stats_timer(State = #q{stats_timer = StatsTimer, + q = Q}) -> + StatsTimer1 = rabbit_event:ensure_stats_timer( + StatsTimer, + fun() -> emit_stats(State) end, + fun() -> rabbit_amqqueue:emit_stats(Q) end), + State#q{stats_timer = StatsTimer1}. + +stop_stats_timer(State = #q{stats_timer = StatsTimer}) -> + StatsTimer1 = rabbit_event:stop_stats_timer( + StatsTimer, + fun() -> emit_stats(State) end), + State#q{stats_timer = StatsTimer1}. assert_invariant(#q{active_consumers = AC, backing_queue = BQ, backing_queue_state = BQS}) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index c2431ccb..0b3fd380 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -50,7 +50,7 @@ uncommitted_ack_q, unacked_message_q, username, virtual_host, most_recently_declared_queue, consumer_mapping, blocking, queue_collector_pid, flow, - stats_timer_ref, stats_level}). + stats_timer}). -record(flow, {server, client, pending}). @@ -170,7 +170,6 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) -> process_flag(trap_exit, true), link(WriterPid), ok = pg_local:join(rabbit_channels, self()), - {ok, StatsLevel} = application:get_env(rabbit, collect_statistics), State = #ch{state = starting, channel = Channel, reader_pid = ReaderPid, @@ -189,8 +188,7 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) -> queue_collector_pid = CollectorPid, flow = #flow{server = true, client = true, pending = none}, - stats_timer_ref = undefined, - stats_level = StatsLevel}, + stats_timer = rabbit_event:init_stats_timer()}, rabbit_event:notify( channel_created, [{Item, i(Item, State)} || Item <- [pid|?CREATION_EVENT_KEYS]]), @@ -277,7 +275,7 @@ handle_cast({flow_timeout, _Ref}, State) -> handle_cast(emit_stats, State) -> internal_emit_stats(State), - noreply(State). + {noreply, State}. handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}}, State = #ch{writer_pid = WriterPid}) -> @@ -315,25 +313,19 @@ reply(Reply, NewState) -> noreply(NewState) -> {noreply, ensure_stats_timer(NewState), hibernate}. -ensure_stats_timer(State = #ch{stats_level = none}) -> - State; -ensure_stats_timer(State = #ch{stats_timer_ref = undefined}) -> - internal_emit_stats(State), - {ok, TRef} = timer:apply_interval(?STATS_INTERVAL, - rabbit_channel, emit_stats, [self()]), - State#ch{stats_timer_ref = TRef}; -ensure_stats_timer(State) -> - State. +ensure_stats_timer(State = #ch{stats_timer = StatsTimer}) -> + ChPid = self(), + StatsTimer1 = rabbit_event:ensure_stats_timer( + StatsTimer, + fun() -> internal_emit_stats(State) end, + fun() -> emit_stats(ChPid) end), + State#ch{stats_timer = StatsTimer1}. -stop_stats_timer(State = #ch{stats_level = none}) -> - State; -stop_stats_timer(State = #ch{stats_timer_ref = undefined}) -> - internal_emit_stats(State), - State; -stop_stats_timer(State = #ch{stats_timer_ref = TRef}) -> - {ok, cancel} = timer:cancel(TRef), - internal_emit_stats(State), - State#ch{stats_timer_ref = undefined}. +stop_stats_timer(State = #ch{stats_timer = StatsTimer}) -> + StatsTimer1 = rabbit_event:stop_stats_timer( + StatsTimer, + fun() -> internal_emit_stats(State) end), + State#ch{stats_timer = StatsTimer1}. return_ok(State, true, _Msg) -> {noreply, State}; return_ok(State, false, Msg) -> {reply, Msg, State}. @@ -1194,10 +1186,11 @@ i(prefetch_count, #ch{limiter_pid = LimiterPid}) -> i(Item, _) -> throw({bad_argument, Item}). -maybe_incr_stats(_QXIncs, _Measure, #ch{stats_level = none}) -> - ok; -maybe_incr_stats(QXIncs, Measure, _State) -> - [incr_stats(QX, Inc, Measure) || {QX, Inc} <- QXIncs]. +maybe_incr_stats(QXIncs, Measure, #ch{stats_timer = StatsTimer}) -> + case rabbit_event:stats_level(StatsTimer) of + fine -> [incr_stats(QX, Inc, Measure) || {QX, Inc} <- QXIncs]; + _ -> ok + end. incr_stats({QPid, _} = QX, Inc, Measure) -> maybe_monitor(QPid), @@ -1227,9 +1220,9 @@ update_measures(Type, QX, Inc, Measure) -> put({Type, QX}, orddict:store(Measure, Cur + Inc, Measures)). -internal_emit_stats(State = #ch{stats_level = Level}) -> +internal_emit_stats(State = #ch{stats_timer = StatsTimer}) -> CoarseStats = [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS], - case Level of + case rabbit_event:stats_level(StatsTimer) of coarse -> rabbit_event:notify(channel_stats, CoarseStats); fine -> diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl index 72a9f8ba..6cd48078 100644 --- a/src/rabbit_event.erl +++ b/src/rabbit_event.erl @@ -33,10 +33,43 @@ -include("rabbit.hrl"). +-export([init_stats_timer/0, ensure_stats_timer/3, stop_stats_timer/2]). +-export([stats_level/1]). -export([notify/2]). + +-record(state, {level, timer}). + %%---------------------------------------------------------------------------- +init_stats_timer() -> + {ok, StatsLevel} = application:get_env(rabbit, collect_statistics), + #state{level = StatsLevel, + timer = undefined}. + +ensure_stats_timer(State = #state{level = none}, _NowFun, _TimerFun) -> + State; +ensure_stats_timer(State = #state{timer = undefined}, NowFun, TimerFun) -> + NowFun(), + {ok, TRef} = timer:apply_interval(?STATS_INTERVAL, + erlang, apply, [TimerFun, []]), + State#state{timer = TRef}; +ensure_stats_timer(State, _NowFun, _TimerFun) -> + State. + +stop_stats_timer(State = #state{level = none}, _NowFun) -> + State; +stop_stats_timer(State = #state{timer = undefined}, NowFun) -> + NowFun(), + State; +stop_stats_timer(State = #state{timer = TRef}, NowFun) -> + {ok, cancel} = timer:cancel(TRef), + NowFun(), + State#state{timer = undefined}. + +stats_level(#state{level = Level}) -> + Level. + notify(Type, Props) -> try gen_event:notify(rabbit_event, #event{type = Type, diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index a2d92d2b..e1e4662b 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -60,7 +60,7 @@ %--------------------------------------------------------------------------- -record(v1, {sock, connection, callback, recv_ref, connection_state, - queue_collector, stats_timer_ref, stats_level}). + queue_collector, stats_timer}). -define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt, send_pend, state, channels]). @@ -251,7 +251,6 @@ start_connection(Parent, Deb, Sock, SockTransform) -> handshake_timeout), ProfilingValue = setup_profiling(), {ok, Collector} = rabbit_queue_collector:start_link(), - {ok, StatsLevel} = application:get_env(rabbit, collect_statistics), try mainloop(Parent, Deb, switch_callback( #v1{sock = ClientSock, @@ -265,8 +264,8 @@ start_connection(Parent, Deb, Sock, SockTransform) -> recv_ref = none, connection_state = pre_init, queue_collector = Collector, - stats_timer_ref = undefined, - stats_level = StatsLevel}, + stats_timer = + rabbit_event:init_stats_timer()}, handshake, 8)) catch Ex -> (if Ex == connection_closed_abruptly -> @@ -354,8 +353,7 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> end), mainloop(Parent, Deb, State); {'$gen_cast', emit_stats} -> - internal_emit_stats(State), - mainloop(Parent, Deb, State#v1{stats_timer_ref = undefined}); + mainloop(Parent, Deb, stop_stats_timer(State)); {system, From, Request} -> sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State); @@ -603,14 +601,22 @@ check_version(ClientVersion, ServerVersion) -> (ClientMajor == ServerMajor andalso ClientMinor >= ServerMinor). -ensure_stats_timer(State = #v1{stats_level = none}) -> - State; -ensure_stats_timer(State = #v1{stats_timer_ref = undefined}) -> - {ok, TRef} = timer:apply_after(?STATS_INTERVAL, - rabbit_reader, emit_stats, [self()]), - State#v1{stats_timer_ref = TRef}; -ensure_stats_timer(State) -> - State. +ensure_stats_timer(State = #v1{stats_timer = StatsTimer}) -> + ReaderPid = self(), + StatsTimer1 = rabbit_event:ensure_stats_timer( + StatsTimer, + %% Don't run internal_emit_stats here, in normal use + %% ensure_stats_timer will get invoked almost immediately + %% after stop_stats_timer and we'll emit double events + fun() -> ok end, + fun() -> emit_stats(ReaderPid) end), + State#v1{stats_timer = StatsTimer1}. + +stop_stats_timer(State = #v1{stats_timer = StatsTimer}) -> + StatsTimer1 = rabbit_event:stop_stats_timer( + StatsTimer, + fun() -> internal_emit_stats(State) end), + State#v1{stats_timer = StatsTimer1}. %%-------------------------------------------------------------------------- diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index d2dca4a2..ef36945e 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1251,6 +1251,7 @@ test_statistics() -> [{X,[{publish,1}]}] = proplists:get_value(channel_exchange_stats, Event3), [] = proplists:get_value(channel_queue_exchange_stats, Event3), + rabbit_channel:shutdown(Ch), rabbit_tests_event_receiver:stop(), passed. |