diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-08-02 11:47:23 +0100 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-08-02 11:47:23 +0100 |
commit | 7083ee430fb23719cb599e4ddc185fa97163c396 (patch) | |
tree | ecee31770658a8d3e1cd4dc3322328b9a6a520a3 | |
parent | c6b1df765ff968a0b5e0686fef143351143820ff (diff) | |
download | rabbitmq-server-7083ee430fb23719cb599e4ddc185fa97163c396.tar.gz |
convert amqqueue_process's sync timer
-rw-r--r-- | src/rabbit_amqqueue.erl | 7 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 23 |
2 files changed, 12 insertions, 18 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 977cd241..eae6312b 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -32,8 +32,7 @@ %% internal -export([internal_declare/2, internal_delete/1, run_backing_queue/3, - sync_timeout/1, set_ram_duration_target/2, - set_maximum_since_use/2]). + set_ram_duration_target/2, set_maximum_since_use/2]). -include("rabbit.hrl"). -include_lib("stdlib/include/qlc.hrl"). @@ -140,7 +139,6 @@ -spec(run_backing_queue/3 :: (pid(), atom(), (fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok'). --spec(sync_timeout/1 :: (pid()) -> 'ok'). -spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok'). -spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok'). -spec(on_node_down/1 :: (node()) -> 'ok'). @@ -479,9 +477,6 @@ internal_delete(QueueName) -> run_backing_queue(QPid, Mod, Fun) -> gen_server2:cast(QPid, {run_backing_queue, Mod, Fun}). -sync_timeout(QPid) -> - gen_server2:cast(QPid, sync_timeout). - set_ram_duration_target(QPid, Duration) -> gen_server2:cast(QPid, {set_ram_duration_target, Duration}). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 403c39fc..eee899b3 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -249,8 +249,7 @@ backing_queue_module(#amqqueue{arguments = Args}) -> end. ensure_sync_timer(State = #q{sync_timer_ref = undefined}) -> - {ok, TRef} = timer:apply_after( - ?SYNC_INTERVAL, rabbit_amqqueue, sync_timeout, [self()]), + TRef = erlang:send_after(?SYNC_INTERVAL, self(), sync_timeout), State#q{sync_timer_ref = TRef}; ensure_sync_timer(State) -> State. @@ -258,7 +257,7 @@ ensure_sync_timer(State) -> stop_sync_timer(State = #q{sync_timer_ref = undefined}) -> State; stop_sync_timer(State = #q{sync_timer_ref = TRef}) -> - {ok, cancel} = timer:cancel(TRef), + rabbit_misc:cancel_timer(TRef), State#q{sync_timer_ref = undefined}. ensure_rate_timer(State = #q{rate_timer_ref = undefined}) -> @@ -795,7 +794,6 @@ prioritise_cast(Msg, _State) -> {notify_sent, _ChPid} -> 7; {unblock, _ChPid} -> 7; {run_backing_queue, _Mod, _Fun} -> 6; - sync_timeout -> 6; _ -> 0 end. @@ -803,11 +801,12 @@ prioritise_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, #q{q = #amqqueue{exclusive_owner = DownPid}}) -> 8; prioritise_info(Msg, _State) -> case Msg of - update_ram_duration -> 8; - maybe_expire -> 8; - drop_expired -> 8; - emit_stats -> 7; - _ -> 0 + update_ram_duration -> 8; + maybe_expire -> 8; + drop_expired -> 8; + emit_stats -> 7; + sync_timeout -> 6; + _ -> 0 end. handle_call({init, Recover}, From, @@ -1013,9 +1012,6 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> handle_cast({run_backing_queue, Mod, Fun}, State) -> noreply(run_backing_queue(Mod, Fun, State)); -handle_cast(sync_timeout, State) -> - noreply(backing_queue_timeout(State#q{sync_timer_ref = undefined})); - handle_cast({deliver, Delivery}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. noreply(deliver_or_enqueue(Delivery, State)); @@ -1133,6 +1129,9 @@ handle_info(update_ram_duration, State = #q{backing_queue = BQ, noreply(State#q{rate_timer_ref = just_measured, backing_queue_state = BQS2}); +handle_info(sync_timeout, State) -> + noreply(backing_queue_timeout(State#q{sync_timer_ref = undefined})); + handle_info(timeout, State) -> noreply(backing_queue_timeout(State)); |