diff options
author | John DeTreville <jdetreville@vmware.com> | 2011-02-07 15:43:11 -0800 |
---|---|---|
committer | John DeTreville <jdetreville@vmware.com> | 2011-02-07 15:43:11 -0800 |
commit | e8344dabf107d074162aa786af539b9d60c09378 (patch) | |
tree | 2bd4fa139239439ed298ab19559803dbce2cb40f | |
parent | 985dac102d3e4bc2b155fec579a7986c2eebc023 (diff) | |
download | rabbitmq-server-e8344dabf107d074162aa786af539b9d60c09378.tar.gz |
Updated rabbit_amqqueue_process. Ready to test.
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 66 |
1 files changed, 32 insertions, 34 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 7c7e28fe..f707e3e1 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -223,10 +223,8 @@ stop_sync_timer(State = #q{sync_timer_ref = TRef}) -> State#q{sync_timer_ref = undefined}. ensure_rate_timer(State = #q{rate_timer_ref = undefined}) -> - {ok, TRef} = timer:apply_after( - ?RAM_DURATION_UPDATE_INTERVAL, - rabbit_amqqueue, update_ram_duration, - [self()]), + TRef = erlang:send_after( + ?RAM_DURATION_UPDATE_INTERVAL, self(), update_ram_duration), State#q{rate_timer_ref = TRef}; ensure_rate_timer(State = #q{rate_timer_ref = just_measured}) -> State#q{rate_timer_ref = undefined}; @@ -238,13 +236,13 @@ stop_rate_timer(State = #q{rate_timer_ref = undefined}) -> stop_rate_timer(State = #q{rate_timer_ref = just_measured}) -> State#q{rate_timer_ref = undefined}; stop_rate_timer(State = #q{rate_timer_ref = TRef}) -> - {ok, cancel} = timer:cancel(TRef), + erlang:cancel_timer(TRef), State#q{rate_timer_ref = undefined}. stop_expiry_timer(State = #q{expiry_timer_ref = undefined}) -> State; stop_expiry_timer(State = #q{expiry_timer_ref = TRef}) -> - {ok, cancel} = timer:cancel(TRef), + erlang:cancel_timer(TRef), State#q{expiry_timer_ref = undefined}. %% We wish to expire only when there are no consumers *and* the expiry @@ -256,18 +254,16 @@ ensure_expiry_timer(State = #q{expires = Expires}) -> case is_unused(State) of true -> NewState = stop_expiry_timer(State), - {ok, TRef} = timer:apply_after( - Expires, rabbit_amqqueue, maybe_expire, [self()]), + TRef = erlang:send_after(Expires, self(), maybe_expire), NewState#q{expiry_timer_ref = TRef}; false -> State end. -ensure_stats_timer(State = #q{stats_timer = StatsTimer, - q = Q}) -> - State#q{stats_timer = rabbit_event:ensure_stats_timer( - StatsTimer, - fun() -> rabbit_amqqueue:emit_stats(Q) end)}. +ensure_stats_timer(State = #q { stats_timer = StatsTimer, + q = #amqqueue { pid = QPid }}) -> + State #q { stats_timer = rabbit_event:ensure_stats_timer( + StatsTimer, QPid, emit_stats) }. assert_invariant(#q{active_consumers = AC, backing_queue = BQ, backing_queue_state = BQS}) -> @@ -677,8 +673,7 @@ ensure_ttl_timer(State = #q{backing_queue = BQ, when TTL =/= undefined -> case BQ:is_empty(BQS) of true -> State; - false -> TRef = timer:apply_after(TTL, rabbit_amqqueue, drop_expired, - [self()]), + false -> TRef = erlang:send_after(TTL, self(), drop_expired), State#q{ttl_timer_ref = TRef} end; ensure_ttl_timer(State) -> @@ -764,13 +759,9 @@ prioritise_call(Msg, _From, _State) -> prioritise_cast(Msg, _State) -> case Msg of - update_ram_duration -> 8; delete_immediately -> 8; {set_ram_duration_target, _Duration} -> 8; {set_maximum_since_use, _Age} -> 8; - maybe_expire -> 8; - drop_expired -> 8; - emit_stats -> 7; {ack, _Txn, _MsgIds, _ChPid} -> 7; {reject, _MsgIds, _Requeue, _ChPid} -> 7; {notify_sent, _ChPid} -> 7; @@ -782,7 +773,14 @@ prioritise_cast(Msg, _State) -> prioritise_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, #q{q = #amqqueue{exclusive_owner = DownPid}}) -> 8; -prioritise_info(_Msg, _State) -> 0. +prioritise_info(Msg, _State) -> + case Msg of + update_ram_duration -> 8; + maybe_expire -> 8; + drop_expired -> 8; + emit_stats -> 7; + _ -> 0 + end. handle_call({init, Recover}, From, State = #q{q = #amqqueue{exclusive_owner = none}}) -> @@ -1085,15 +1083,6 @@ handle_cast({flush, ChPid}, State) -> ok = rabbit_channel:flushed(ChPid, self()), noreply(State); -handle_cast(update_ram_duration, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - {RamDuration, BQS1} = BQ:ram_duration(BQS), - DesiredDuration = - rabbit_memory_monitor:report_ram_duration(self(), RamDuration), - BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), - noreply(State#q{rate_timer_ref = just_measured, - backing_queue_state = BQS2}); - handle_cast({set_ram_duration_target, Duration}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> BQS1 = BQ:set_ram_duration_target(Duration, BQS), @@ -1101,24 +1090,33 @@ handle_cast({set_ram_duration_target, Duration}, handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), - noreply(State); + noreply(State). + +handle_info(update_ram_duration, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + {RamDuration, BQS1} = BQ:ram_duration(BQS), + DesiredDuration = + rabbit_memory_monitor:report_ram_duration(self(), RamDuration), + BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), + noreply(State#q{rate_timer_ref = just_measured, + backing_queue_state = BQS2}); -handle_cast(maybe_expire, State) -> +handle_info(maybe_expire, State) -> case is_unused(State) of true -> ?LOGDEBUG("Queue lease expired for ~p~n", [State#q.q]), {stop, normal, State}; false -> noreply(ensure_expiry_timer(State)) end; -handle_cast(drop_expired, State) -> +handle_info(drop_expired, State) -> noreply(drop_expired_messages(State#q{ttl_timer_ref = undefined})); -handle_cast(emit_stats, State = #q{stats_timer = StatsTimer}) -> +handle_info(emit_stats, State = #q{stats_timer = StatsTimer}) -> %% Do not invoke noreply as it would see no timer and create a new one. emit_stats(State), State1 = State#q{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}, assert_invariant(State1), - {noreply, State1, hibernate}. + {noreply, State1, hibernate}; handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State = #q{q = #amqqueue{exclusive_owner = DownPid}}) -> |