summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRob Harrop <rharrop@vmware.com>2010-10-06 21:09:30 +0100
committerRob Harrop <rharrop@vmware.com>2010-10-06 21:09:30 +0100
commit1dcfb86c14f608a12e70e83cde8ddb3b748b0bab (patch)
tree7d3c2d33f0c067d6c7ef98f460abeb32021255be
parent888556965532313c67eb9be46b7997e2c489b16c (diff)
downloadrabbitmq-server-1dcfb86c14f608a12e70e83cde8ddb3b748b0bab.tar.gz
Reworked ensure_ttl_timer to use apply_after rather than send_after
-rw-r--r--src/rabbit_amqqueue.erl5
-rw-r--r--src/rabbit_amqqueue_process.erl10
2 files changed, 10 insertions, 5 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 0077e223..25a061ac 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -35,7 +35,7 @@
-export([internal_declare/2, internal_delete/1,
maybe_run_queue_via_backing_queue/2,
update_ram_duration/1, set_ram_duration_target/2,
- set_maximum_since_use/2, maybe_expire/1]).
+ set_maximum_since_use/2, maybe_expire/1, drop_expired/1]).
-export([pseudo_queue/2]).
-export([lookup/1, with/2, with_or_die/2, assert_equivalence/5,
check_exclusive_access/2, with_exclusive_access_or_die/3,
@@ -484,6 +484,9 @@ set_maximum_since_use(QPid, Age) ->
maybe_expire(QPid) ->
gen_server2:cast(QPid, maybe_expire).
+drop_expired(QPid) ->
+ gen_server2:cast(QPid, drop_expired).
+
on_node_down(Node) ->
[Hook() ||
Hook <- rabbit_misc:execute_mnesia_transaction(
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 4aa3eb8f..65166f32 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -617,7 +617,8 @@ ensure_ttl_timer(State = #q{backing_queue = BQ,
State;
false ->
State#q{ttl_timer_ref =
- timer:send_after(TTL, self(), drop_expired)}
+ timer:apply_after(TTL, rabbit_amqqueue,
+ drop_expired, [self()])}
end;
ensure_ttl_timer(State) ->
State.
@@ -685,6 +686,7 @@ prioritise_cast(Msg, _State) ->
{set_ram_duration_target, _Duration} -> 8;
{set_maximum_since_use, _Age} -> 8;
maybe_expire -> 8;
+ drop_expired -> 8;
emit_stats -> 7;
{ack, _Txn, _MsgIds, _ChPid} -> 7;
{reject, _MsgIds, _Requeue, _ChPid} -> 7;
@@ -1008,6 +1010,9 @@ handle_cast(maybe_expire, State) ->
false -> noreply(ensure_expiry_timer(State))
end;
+handle_cast(drop_expired, State) ->
+ noreply(drop_expired_messages(State#q{ttl_timer_ref = undefined}));
+
handle_cast(emit_stats, State = #q{stats_timer = StatsTimer}) ->
%% Do not invoke noreply as it would see no timer and create a new one.
emit_stats(State),
@@ -1034,9 +1039,6 @@ handle_info(timeout, State = #q{backing_queue = BQ}) ->
noreply(maybe_run_queue_via_backing_queue(
fun (BQS) -> BQ:idle_timeout(BQS) end, State));
-handle_info(drop_expired, State) ->
- noreply(drop_expired_messages(State#q{ttl_timer_ref = undefined}));
-
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};