summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-12-11 21:55:00 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2012-12-11 21:55:00 +0000
commit4d90b2208ca460ad285f1c38b45f0749c88d7223 (patch)
tree73b45def08ef8e9cda28f3189a862e96a4470802
parent0d6becf0d292553430d0c5c93dd2ec0984e0ca27 (diff)
downloadrabbitmq-server-4d90b2208ca460ad285f1c38b45f0749c88d7223.tar.gz
make sure the stats and rate timers don't keep each other alive
-rw-r--r--src/rabbit_amqqueue_process.erl28
1 files changed, 10 insertions, 18 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index f4459e45..9381774d 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -284,21 +284,17 @@ terminate_shutdown(Fun, State) ->
end.
reply(Reply, NewState) ->
- assert_invariant(NewState),
{NewState1, Timeout} = next_state(NewState),
- {reply, Reply, NewState1, Timeout}.
+ {reply, Reply, ensure_stats_timer(ensure_rate_timer(NewState1)), Timeout}.
noreply(NewState) ->
- assert_invariant(NewState),
{NewState1, Timeout} = next_state(NewState),
- {noreply, NewState1, Timeout}.
+ {noreply, ensure_stats_timer(ensure_rate_timer(NewState1)), Timeout}.
next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
+ assert_invariant(State),
{MsgIds, BQS1} = BQ:drain_confirmed(BQS),
- State1 = ensure_stats_timer(
- ensure_rate_timer(
- confirm_messages(MsgIds, State#q{
- backing_queue_state = BQS1}))),
+ State1 = confirm_messages(MsgIds, State#q{backing_queue_state = BQS1}),
case BQ:needs_timeout(BQS1) of
false -> {stop_sync_timer(State1), hibernate };
idle -> {stop_sync_timer(State1), ?SYNC_INTERVAL};
@@ -328,15 +324,11 @@ ensure_rate_timer(State = #q{rate_timer_ref = undefined}) ->
TRef = erlang:send_after(
?RAM_DURATION_UPDATE_INTERVAL, self(), update_ram_duration),
State#q{rate_timer_ref = TRef};
-ensure_rate_timer(State = #q{rate_timer_ref = just_measured}) ->
- State#q{rate_timer_ref = undefined};
ensure_rate_timer(State) ->
State.
stop_rate_timer(State = #q{rate_timer_ref = undefined}) ->
State;
-stop_rate_timer(State = #q{rate_timer_ref = just_measured}) ->
- State#q{rate_timer_ref = undefined};
stop_rate_timer(State = #q{rate_timer_ref = TRef}) ->
erlang:cancel_timer(TRef),
State#q{rate_timer_ref = undefined}.
@@ -1322,10 +1314,9 @@ handle_info(drop_expired, State) ->
handle_info(emit_stats, State) ->
emit_stats(State),
- {noreply, State1, Timeout} = noreply(State),
- %% Need to reset *after* we've been through noreply/1 so we do not
- %% just create another timer always and therefore never hibernate
- {noreply, rabbit_event:reset_stats_timer(State1, #q.stats_timer), Timeout};
+ {State1, Timeout} = next_state(rabbit_event:reset_stats_timer(
+ State, #q.stats_timer)),
+ {noreply, State1, Timeout};
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
State = #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
@@ -1349,8 +1340,9 @@ handle_info(update_ram_duration, State = #q{backing_queue = BQ,
DesiredDuration =
rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
- noreply(State#q{rate_timer_ref = just_measured,
- backing_queue_state = BQS2});
+ {State1, Timeout} = next_state(State#q{rate_timer_ref = undefined,
+ backing_queue_state = BQS2}),
+ {noreply, State1, Timeout};
handle_info(sync_timeout, State) ->
noreply(backing_queue_timeout(State#q{sync_timer_ref = undefined}));