diff options
author | Rob Harrop <rharrop@vmware.com> | 2010-10-06 21:09:30 +0100 |
---|---|---|
committer | Rob Harrop <rharrop@vmware.com> | 2010-10-06 21:09:30 +0100 |
commit | 1dcfb86c14f608a12e70e83cde8ddb3b748b0bab (patch) | |
tree | 7d3c2d33f0c067d6c7ef98f460abeb32021255be | |
parent | 888556965532313c67eb9be46b7997e2c489b16c (diff) | |
download | rabbitmq-server-1dcfb86c14f608a12e70e83cde8ddb3b748b0bab.tar.gz |
Reworked ensure_ttl_timer to use apply_after rather than send_after
-rw-r--r-- | src/rabbit_amqqueue.erl | 5 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 10 |
2 files changed, 10 insertions, 5 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 0077e223..25a061ac 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -35,7 +35,7 @@ -export([internal_declare/2, internal_delete/1, maybe_run_queue_via_backing_queue/2, update_ram_duration/1, set_ram_duration_target/2, - set_maximum_since_use/2, maybe_expire/1]). + set_maximum_since_use/2, maybe_expire/1, drop_expired/1]). -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, assert_equivalence/5, check_exclusive_access/2, with_exclusive_access_or_die/3, @@ -484,6 +484,9 @@ set_maximum_since_use(QPid, Age) -> maybe_expire(QPid) -> gen_server2:cast(QPid, maybe_expire). +drop_expired(QPid) -> + gen_server2:cast(QPid, drop_expired). + on_node_down(Node) -> [Hook() || Hook <- rabbit_misc:execute_mnesia_transaction( diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 4aa3eb8f..65166f32 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -617,7 +617,8 @@ ensure_ttl_timer(State = #q{backing_queue = BQ, State; false -> State#q{ttl_timer_ref = - timer:send_after(TTL, self(), drop_expired)} + timer:apply_after(TTL, rabbit_amqqueue, + drop_expired, [self()])} end; ensure_ttl_timer(State) -> State. @@ -685,6 +686,7 @@ prioritise_cast(Msg, _State) -> {set_ram_duration_target, _Duration} -> 8; {set_maximum_since_use, _Age} -> 8; maybe_expire -> 8; + drop_expired -> 8; emit_stats -> 7; {ack, _Txn, _MsgIds, _ChPid} -> 7; {reject, _MsgIds, _Requeue, _ChPid} -> 7; @@ -1008,6 +1010,9 @@ handle_cast(maybe_expire, State) -> false -> noreply(ensure_expiry_timer(State)) end; +handle_cast(drop_expired, State) -> + noreply(drop_expired_messages(State#q{ttl_timer_ref = undefined})); + handle_cast(emit_stats, State = #q{stats_timer = StatsTimer}) -> %% Do not invoke noreply as it would see no timer and create a new one. emit_stats(State), @@ -1034,9 +1039,6 @@ handle_info(timeout, State = #q{backing_queue = BQ}) -> noreply(maybe_run_queue_via_backing_queue( fun (BQS) -> BQ:idle_timeout(BQS) end, State)); -handle_info(drop_expired, State) -> - noreply(drop_expired_messages(State#q{ttl_timer_ref = undefined})); - handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; |