summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn DeTreville <jdetreville@vmware.com>2011-02-07 15:43:11 -0800
committerJohn DeTreville <jdetreville@vmware.com>2011-02-07 15:43:11 -0800
commite8344dabf107d074162aa786af539b9d60c09378 (patch)
tree2bd4fa139239439ed298ab19559803dbce2cb40f
parent985dac102d3e4bc2b155fec579a7986c2eebc023 (diff)
downloadrabbitmq-server-e8344dabf107d074162aa786af539b9d60c09378.tar.gz
Updated rabbit_amqqueue_process. Ready to test.
-rw-r--r--src/rabbit_amqqueue_process.erl66
1 files changed, 32 insertions, 34 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 7c7e28fe..f707e3e1 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -223,10 +223,8 @@ stop_sync_timer(State = #q{sync_timer_ref = TRef}) ->
State#q{sync_timer_ref = undefined}.
ensure_rate_timer(State = #q{rate_timer_ref = undefined}) ->
- {ok, TRef} = timer:apply_after(
- ?RAM_DURATION_UPDATE_INTERVAL,
- rabbit_amqqueue, update_ram_duration,
- [self()]),
+ TRef = erlang:send_after(
+ ?RAM_DURATION_UPDATE_INTERVAL, self(), update_ram_duration),
State#q{rate_timer_ref = TRef};
ensure_rate_timer(State = #q{rate_timer_ref = just_measured}) ->
State#q{rate_timer_ref = undefined};
@@ -238,13 +236,13 @@ stop_rate_timer(State = #q{rate_timer_ref = undefined}) ->
stop_rate_timer(State = #q{rate_timer_ref = just_measured}) ->
State#q{rate_timer_ref = undefined};
stop_rate_timer(State = #q{rate_timer_ref = TRef}) ->
- {ok, cancel} = timer:cancel(TRef),
+ erlang:cancel_timer(TRef),
State#q{rate_timer_ref = undefined}.
stop_expiry_timer(State = #q{expiry_timer_ref = undefined}) ->
State;
stop_expiry_timer(State = #q{expiry_timer_ref = TRef}) ->
- {ok, cancel} = timer:cancel(TRef),
+ erlang:cancel_timer(TRef),
State#q{expiry_timer_ref = undefined}.
%% We wish to expire only when there are no consumers *and* the expiry
@@ -256,18 +254,16 @@ ensure_expiry_timer(State = #q{expires = Expires}) ->
case is_unused(State) of
true ->
NewState = stop_expiry_timer(State),
- {ok, TRef} = timer:apply_after(
- Expires, rabbit_amqqueue, maybe_expire, [self()]),
+ TRef = erlang:send_after(Expires, self(), maybe_expire),
NewState#q{expiry_timer_ref = TRef};
false ->
State
end.
-ensure_stats_timer(State = #q{stats_timer = StatsTimer,
- q = Q}) ->
- State#q{stats_timer = rabbit_event:ensure_stats_timer(
- StatsTimer,
- fun() -> rabbit_amqqueue:emit_stats(Q) end)}.
+ensure_stats_timer(State = #q { stats_timer = StatsTimer,
+ q = #amqqueue { pid = QPid }}) ->
+ State #q { stats_timer = rabbit_event:ensure_stats_timer(
+ StatsTimer, QPid, emit_stats) }.
assert_invariant(#q{active_consumers = AC,
backing_queue = BQ, backing_queue_state = BQS}) ->
@@ -677,8 +673,7 @@ ensure_ttl_timer(State = #q{backing_queue = BQ,
when TTL =/= undefined ->
case BQ:is_empty(BQS) of
true -> State;
- false -> TRef = timer:apply_after(TTL, rabbit_amqqueue, drop_expired,
- [self()]),
+ false -> TRef = erlang:send_after(TTL, self(), drop_expired),
State#q{ttl_timer_ref = TRef}
end;
ensure_ttl_timer(State) ->
@@ -764,13 +759,9 @@ prioritise_call(Msg, _From, _State) ->
prioritise_cast(Msg, _State) ->
case Msg of
- update_ram_duration -> 8;
delete_immediately -> 8;
{set_ram_duration_target, _Duration} -> 8;
{set_maximum_since_use, _Age} -> 8;
- maybe_expire -> 8;
- drop_expired -> 8;
- emit_stats -> 7;
{ack, _Txn, _MsgIds, _ChPid} -> 7;
{reject, _MsgIds, _Requeue, _ChPid} -> 7;
{notify_sent, _ChPid} -> 7;
@@ -782,7 +773,14 @@ prioritise_cast(Msg, _State) ->
prioritise_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
#q{q = #amqqueue{exclusive_owner = DownPid}}) -> 8;
-prioritise_info(_Msg, _State) -> 0.
+prioritise_info(Msg, _State) ->
+ case Msg of
+ update_ram_duration -> 8;
+ maybe_expire -> 8;
+ drop_expired -> 8;
+ emit_stats -> 7;
+ _ -> 0
+ end.
handle_call({init, Recover}, From,
State = #q{q = #amqqueue{exclusive_owner = none}}) ->
@@ -1085,15 +1083,6 @@ handle_cast({flush, ChPid}, State) ->
ok = rabbit_channel:flushed(ChPid, self()),
noreply(State);
-handle_cast(update_ram_duration, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- {RamDuration, BQS1} = BQ:ram_duration(BQS),
- DesiredDuration =
- rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
- BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
- noreply(State#q{rate_timer_ref = just_measured,
- backing_queue_state = BQS2});
-
handle_cast({set_ram_duration_target, Duration},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
BQS1 = BQ:set_ram_duration_target(Duration, BQS),
@@ -1101,24 +1090,33 @@ handle_cast({set_ram_duration_target, Duration},
handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
- noreply(State);
+ noreply(State).
+
+handle_info(update_ram_duration, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ {RamDuration, BQS1} = BQ:ram_duration(BQS),
+ DesiredDuration =
+ rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
+ BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
+ noreply(State#q{rate_timer_ref = just_measured,
+ backing_queue_state = BQS2});
-handle_cast(maybe_expire, State) ->
+handle_info(maybe_expire, State) ->
case is_unused(State) of
true -> ?LOGDEBUG("Queue lease expired for ~p~n", [State#q.q]),
{stop, normal, State};
false -> noreply(ensure_expiry_timer(State))
end;
-handle_cast(drop_expired, State) ->
+handle_info(drop_expired, State) ->
noreply(drop_expired_messages(State#q{ttl_timer_ref = undefined}));
-handle_cast(emit_stats, State = #q{stats_timer = StatsTimer}) ->
+handle_info(emit_stats, State = #q{stats_timer = StatsTimer}) ->
%% Do not invoke noreply as it would see no timer and create a new one.
emit_stats(State),
State1 = State#q{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)},
assert_invariant(State1),
- {noreply, State1, hibernate}.
+ {noreply, State1, hibernate};
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
State = #q{q = #amqqueue{exclusive_owner = DownPid}}) ->