summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2011-08-02 11:47:23 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2011-08-02 11:47:23 +0100
commit7083ee430fb23719cb599e4ddc185fa97163c396 (patch)
treeecee31770658a8d3e1cd4dc3322328b9a6a520a3
parentc6b1df765ff968a0b5e0686fef143351143820ff (diff)
downloadrabbitmq-server-7083ee430fb23719cb599e4ddc185fa97163c396.tar.gz
convert amqqueue_process's sync timer
-rw-r--r--src/rabbit_amqqueue.erl7
-rw-r--r--src/rabbit_amqqueue_process.erl23
2 files changed, 12 insertions, 18 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 977cd241..eae6312b 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -32,8 +32,7 @@
%% internal
-export([internal_declare/2, internal_delete/1, run_backing_queue/3,
- sync_timeout/1, set_ram_duration_target/2,
- set_maximum_since_use/2]).
+ set_ram_duration_target/2, set_maximum_since_use/2]).
-include("rabbit.hrl").
-include_lib("stdlib/include/qlc.hrl").
@@ -140,7 +139,6 @@
-spec(run_backing_queue/3 ::
(pid(), atom(),
(fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok').
--spec(sync_timeout/1 :: (pid()) -> 'ok').
-spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok').
-spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok').
-spec(on_node_down/1 :: (node()) -> 'ok').
@@ -479,9 +477,6 @@ internal_delete(QueueName) ->
run_backing_queue(QPid, Mod, Fun) ->
gen_server2:cast(QPid, {run_backing_queue, Mod, Fun}).
-sync_timeout(QPid) ->
- gen_server2:cast(QPid, sync_timeout).
-
set_ram_duration_target(QPid, Duration) ->
gen_server2:cast(QPid, {set_ram_duration_target, Duration}).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 403c39fc..eee899b3 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -249,8 +249,7 @@ backing_queue_module(#amqqueue{arguments = Args}) ->
end.
ensure_sync_timer(State = #q{sync_timer_ref = undefined}) ->
- {ok, TRef} = timer:apply_after(
- ?SYNC_INTERVAL, rabbit_amqqueue, sync_timeout, [self()]),
+ TRef = erlang:send_after(?SYNC_INTERVAL, self(), sync_timeout),
State#q{sync_timer_ref = TRef};
ensure_sync_timer(State) ->
State.
@@ -258,7 +257,7 @@ ensure_sync_timer(State) ->
stop_sync_timer(State = #q{sync_timer_ref = undefined}) ->
State;
stop_sync_timer(State = #q{sync_timer_ref = TRef}) ->
- {ok, cancel} = timer:cancel(TRef),
+ rabbit_misc:cancel_timer(TRef),
State#q{sync_timer_ref = undefined}.
ensure_rate_timer(State = #q{rate_timer_ref = undefined}) ->
@@ -795,7 +794,6 @@ prioritise_cast(Msg, _State) ->
{notify_sent, _ChPid} -> 7;
{unblock, _ChPid} -> 7;
{run_backing_queue, _Mod, _Fun} -> 6;
- sync_timeout -> 6;
_ -> 0
end.
@@ -803,11 +801,12 @@ prioritise_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
#q{q = #amqqueue{exclusive_owner = DownPid}}) -> 8;
prioritise_info(Msg, _State) ->
case Msg of
- update_ram_duration -> 8;
- maybe_expire -> 8;
- drop_expired -> 8;
- emit_stats -> 7;
- _ -> 0
+ update_ram_duration -> 8;
+ maybe_expire -> 8;
+ drop_expired -> 8;
+ emit_stats -> 7;
+ sync_timeout -> 6;
+ _ -> 0
end.
handle_call({init, Recover}, From,
@@ -1013,9 +1012,6 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
handle_cast({run_backing_queue, Mod, Fun}, State) ->
noreply(run_backing_queue(Mod, Fun, State));
-handle_cast(sync_timeout, State) ->
- noreply(backing_queue_timeout(State#q{sync_timer_ref = undefined}));
-
handle_cast({deliver, Delivery}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
noreply(deliver_or_enqueue(Delivery, State));
@@ -1133,6 +1129,9 @@ handle_info(update_ram_duration, State = #q{backing_queue = BQ,
noreply(State#q{rate_timer_ref = just_measured,
backing_queue_state = BQS2});
+handle_info(sync_timeout, State) ->
+ noreply(backing_queue_timeout(State#q{sync_timer_ref = undefined}));
+
handle_info(timeout, State) ->
noreply(backing_queue_timeout(State));