diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-08-02 11:11:14 +0100 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-08-02 11:11:14 +0100 |
commit | 5d2fba42902b1a09eb911dc77fd1a0b5c7c8f969 (patch) | |
tree | 915bcc49e7b1e23fcf05c3f607d5927776dcfad7 | |
parent | 62b6ce3e78a42a69a336049e5872bf4686a8543a (diff) | |
download | rabbitmq-server-5d2fba42902b1a09eb911dc77fd1a0b5c7c8f969.tar.gz |
refactor
-rw-r--r-- | src/rabbit_amqqueue.erl | 15 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 25 |
2 files changed, 13 insertions, 27 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 6024db65..977cd241 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -32,8 +32,8 @@ %% internal -export([internal_declare/2, internal_delete/1, run_backing_queue/3, - sync_timeout/1, update_ram_duration/1, set_ram_duration_target/2, - set_maximum_since_use/2, maybe_expire/1, drop_expired/1]). + sync_timeout/1, set_ram_duration_target/2, + set_maximum_since_use/2]). -include("rabbit.hrl"). -include_lib("stdlib/include/qlc.hrl"). @@ -141,10 +141,8 @@ (pid(), atom(), (fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok'). -spec(sync_timeout/1 :: (pid()) -> 'ok'). --spec(update_ram_duration/1 :: (pid()) -> 'ok'). -spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok'). -spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok'). --spec(maybe_expire/1 :: (pid()) -> 'ok'). -spec(on_node_down/1 :: (node()) -> 'ok'). -spec(pseudo_queue/2 :: (name(), pid()) -> rabbit_types:amqqueue()). @@ -484,21 +482,12 @@ run_backing_queue(QPid, Mod, Fun) -> sync_timeout(QPid) -> gen_server2:cast(QPid, sync_timeout). -update_ram_duration(QPid) -> - gen_server2:cast(QPid, update_ram_duration). - set_ram_duration_target(QPid, Duration) -> gen_server2:cast(QPid, {set_ram_duration_target, Duration}). set_maximum_since_use(QPid, Age) -> gen_server2:cast(QPid, {set_maximum_since_use, Age}). -maybe_expire(QPid) -> - gen_server2:cast(QPid, maybe_expire). - -drop_expired(QPid) -> - gen_server2:cast(QPid, drop_expired). - on_node_down(Node) -> rabbit_misc:execute_mnesia_tx_with_tail( fun () -> Dels = qlc:e(qlc:q([delete_queue(QueueName) || diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index e787fa84..c7b9eaab 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -275,13 +275,13 @@ stop_rate_timer(State = #q{rate_timer_ref = undefined}) -> stop_rate_timer(State = #q{rate_timer_ref = just_measured}) -> State#q{rate_timer_ref = undefined}; stop_rate_timer(State = #q{rate_timer_ref = TRef}) -> - _ = erlang:cancel_timer(TRef), + erlang:cancel_timer(TRef), State#q{rate_timer_ref = undefined}. stop_expiry_timer(State = #q{expiry_timer_ref = undefined}) -> State; stop_expiry_timer(State = #q{expiry_timer_ref = TRef}) -> - _ = erlang:cancel_timer(TRef), + erlang:cancel_timer(TRef), State#q{expiry_timer_ref = undefined}. %% We wish to expire only when there are no consumers *and* the expiry @@ -787,12 +787,9 @@ prioritise_call(Msg, _From, _State) -> prioritise_cast(Msg, _State) -> case Msg of - update_ram_duration -> 8; delete_immediately -> 8; {set_ram_duration_target, _Duration} -> 8; {set_maximum_since_use, _Age} -> 8; - maybe_expire -> 8; - drop_expired -> 8; emit_stats -> 7; {ack, _AckTags, _ChPid} -> 7; {reject, _AckTags, _Requeue, _ChPid} -> 7; @@ -1096,15 +1093,6 @@ handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), noreply(State). -handle_info(update_ram_duration, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - {RamDuration, BQS1} = BQ:ram_duration(BQS), - DesiredDuration = - rabbit_memory_monitor:report_ram_duration(self(), RamDuration), - BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), - noreply(State#q{rate_timer_ref = just_measured, - backing_queue_state = BQS2}); - handle_info(maybe_expire, State) -> case is_unused(State) of true -> ?LOGDEBUG("Queue lease expired for ~p~n", [State#q.q]), @@ -1137,6 +1125,15 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> {stop, NewState} -> {stop, normal, NewState} end; +handle_info(update_ram_duration, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + {RamDuration, BQS1} = BQ:ram_duration(BQS), + DesiredDuration = + rabbit_memory_monitor:report_ram_duration(self(), RamDuration), + BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), + noreply(State#q{rate_timer_ref = just_measured, + backing_queue_state = BQS2}); + handle_info(timeout, State) -> noreply(backing_queue_timeout(State)); |