diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2011-04-08 14:12:26 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-04-08 14:12:26 +0100 |
commit | ce51765ac7299ea27796d57c3903a15e4f4120ca (patch) | |
tree | 131df7ebfbb697ee699f55a6f07cc85d51470284 | |
parent | 644af26c9866e6af05dd58cd1cd02b39c8933647 (diff) | |
download | rabbitmq-server-bug24038.tar.gz |
Abstract out mainly timer maintanence functionsbug24038
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 54 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process_utils.erl | 99 |
2 files changed, 118 insertions, 35 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 2b0fe17e..435edc07 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -21,8 +21,6 @@ -behaviour(gen_server2). -define(UNSENT_MESSAGE_LIMIT, 100). --define(SYNC_INTERVAL, 25). %% milliseconds --define(RAM_DURATION_UPDATE_INTERVAL, 5000). -define(BASE_MESSAGE_PROPERTIES, #message_properties{expiry = undefined, needs_confirming = false}). @@ -226,37 +224,27 @@ next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> false -> {stop_sync_timer(State1), hibernate} end. -ensure_sync_timer(State = #q{sync_timer_ref = undefined}) -> - {ok, TRef} = timer:apply_after( - ?SYNC_INTERVAL, rabbit_amqqueue, sync_timeout, [self()]), - State#q{sync_timer_ref = TRef}; ensure_sync_timer(State) -> - State. + rabbit_amqqueue_process_utils:ensure_sync_timer( + fun sync_timer_getter/1, fun sync_timer_setter/2, State). + +stop_sync_timer(State) -> + rabbit_amqqueue_process_utils:stop_sync_timer( + fun sync_timer_getter/1, fun sync_timer_setter/2, State). + +sync_timer_getter(State) -> State#q.sync_timer_ref. +sync_timer_setter(Timer, State) -> State#q{sync_timer_ref = Timer}. -stop_sync_timer(State = #q{sync_timer_ref = undefined}) -> - State; -stop_sync_timer(State = #q{sync_timer_ref = TRef}) -> - {ok, cancel} = timer:cancel(TRef), - State#q{sync_timer_ref = undefined}. - -ensure_rate_timer(State = #q{rate_timer_ref = undefined}) -> - {ok, TRef} = timer:apply_after( - ?RAM_DURATION_UPDATE_INTERVAL, - rabbit_amqqueue, update_ram_duration, - [self()]), - State#q{rate_timer_ref = TRef}; -ensure_rate_timer(State = #q{rate_timer_ref = just_measured}) -> - State#q{rate_timer_ref = undefined}; ensure_rate_timer(State) -> - State. + rabbit_amqqueue_process_utils:ensure_rate_timer( + fun rate_timer_getter/1, fun rate_timer_setter/2, State). -stop_rate_timer(State = #q{rate_timer_ref = undefined}) -> - State; -stop_rate_timer(State = #q{rate_timer_ref = just_measured}) -> - State#q{rate_timer_ref = undefined}; -stop_rate_timer(State = #q{rate_timer_ref = TRef}) -> - {ok, cancel} = timer:cancel(TRef), - State#q{rate_timer_ref = undefined}. +stop_rate_timer(State) -> + rabbit_amqqueue_process_utils:stop_rate_timer( + fun rate_timer_getter/1, fun rate_timer_setter/2, State). + +rate_timer_getter(State) -> State#q.rate_timer_ref. +rate_timer_setter(Timer, State) -> State#q{rate_timer_ref = Timer}. stop_expiry_timer(State = #q{expiry_timer_ref = undefined}) -> State; @@ -1160,15 +1148,11 @@ handle_pre_hibernate(State = #q{backing_queue_state = undefined}) -> handle_pre_hibernate(State = #q{backing_queue = BQ, backing_queue_state = BQS, stats_timer = StatsTimer}) -> - {RamDuration, BQS1} = BQ:ram_duration(BQS), - DesiredDuration = - rabbit_memory_monitor:report_ram_duration(self(), RamDuration), - BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), - BQS3 = BQ:handle_pre_hibernate(BQS2), + BQS1 = rabbit_amqqueue_process_utils:backing_queue_pre_hibernate(BQ, BQS), rabbit_event:if_enabled(StatsTimer, fun () -> emit_stats(State, [{idle_since, now()}]) end), State1 = State#q{stats_timer = rabbit_event:stop_stats_timer(StatsTimer), - backing_queue_state = BQS3}, + backing_queue_state = BQS1}, {hibernate, stop_rate_timer(State1)}. diff --git a/src/rabbit_amqqueue_process_utils.erl b/src/rabbit_amqqueue_process_utils.erl new file mode 100644 index 00000000..feb2a79c --- /dev/null +++ b/src/rabbit_amqqueue_process_utils.erl @@ -0,0 +1,99 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 201-2011 VMware, Inc. All rights reserved. +%% + +-module(rabbit_amqqueue_process_utils). + +-define(SYNC_INTERVAL, 25). %% milliseconds +-define(RAM_DURATION_UPDATE_INTERVAL, 5000). + +-export([backing_queue_pre_hibernate/2, + ensure_sync_timer/3, stop_sync_timer/3, + ensure_rate_timer/3, stop_rate_timer/3]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(bq_mod() :: atom()). +-type(bq_state() :: any()). %% A good example of dialyzer's shortcomings + +-type(queue_state() :: any()). %% Another such example. +-type(getter(A) :: fun ((queue_state()) -> A)). +-type(setter(A) :: fun ((A, queue_state()) -> queue_state())). + +-type(tref() :: term()). %% Sigh. According to timer docs. + +-spec(backing_queue_pre_hibernate/2 :: (bq_mod(), bq_state()) -> bq_state()). + +-spec(ensure_sync_timer/3 :: (getter('undefined'|tref()), + setter('undefined'|tref()), + queue_state()) -> queue_state()). +-spec(stop_sync_timer/3 :: (getter('undefined'|tref()), + setter('undefined'|tref()), + queue_state()) -> queue_state()). + +-spec(ensure_rate_timer/3 :: (getter('undefined'|'just_measured'|tref()), + setter('undefined'|'just_measured'|tref()), + queue_state()) -> queue_state()). +-spec(stop_rate_timer/3 :: (getter('undefined'|'just_measured'|tref()), + setter('undefined'|'just_measured'|tref()), + queue_state()) -> queue_state()). + +-endif. + +%%---------------------------------------------------------------------------- + +backing_queue_pre_hibernate(BQ, BQS) -> + {RamDuration, BQS1} = BQ:ram_duration(BQS), + DesiredDuration = + rabbit_memory_monitor:report_ram_duration(self(), RamDuration), + BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), + BQ:handle_pre_hibernate(BQS2). + +ensure_sync_timer(Getter, Setter, State) -> + case Getter(State) of + undefined -> {ok, TRef} = timer:apply_after( + ?SYNC_INTERVAL, rabbit_amqqueue, + sync_timeout, [self()]), + Setter(TRef, State); + _TRef -> State + end. + +stop_sync_timer(Getter, Setter, State) -> + case Getter(State) of + undefined -> State; + TRef -> {ok, cancel} = timer:cancel(TRef), + Setter(undefined, State) + end. + +ensure_rate_timer(Getter, Setter, State) -> + case Getter(State) of + undefined -> {ok, TRef} = + timer:apply_after( + ?RAM_DURATION_UPDATE_INTERVAL, rabbit_amqqueue, + update_ram_duration, [self()]), + Setter(TRef, State); + just_measured -> Setter(undefined, State); + _TRef -> State + end. + +stop_rate_timer(Getter, Setter, State) -> + case Getter(State) of + undefined -> State; + just_measured -> Setter(undefined, State); + TRef -> {ok, cancel} = timer:cancel(TRef), + Setter(undefined, State) + end. |