summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2011-08-02 11:11:14 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2011-08-02 11:11:14 +0100
commit5d2fba42902b1a09eb911dc77fd1a0b5c7c8f969 (patch)
tree915bcc49e7b1e23fcf05c3f607d5927776dcfad7
parent62b6ce3e78a42a69a336049e5872bf4686a8543a (diff)
downloadrabbitmq-server-5d2fba42902b1a09eb911dc77fd1a0b5c7c8f969.tar.gz
refactor
-rw-r--r--src/rabbit_amqqueue.erl15
-rw-r--r--src/rabbit_amqqueue_process.erl25
2 files changed, 13 insertions, 27 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 6024db65..977cd241 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -32,8 +32,8 @@
%% internal
-export([internal_declare/2, internal_delete/1, run_backing_queue/3,
- sync_timeout/1, update_ram_duration/1, set_ram_duration_target/2,
- set_maximum_since_use/2, maybe_expire/1, drop_expired/1]).
+ sync_timeout/1, set_ram_duration_target/2,
+ set_maximum_since_use/2]).
-include("rabbit.hrl").
-include_lib("stdlib/include/qlc.hrl").
@@ -141,10 +141,8 @@
(pid(), atom(),
(fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok').
-spec(sync_timeout/1 :: (pid()) -> 'ok').
--spec(update_ram_duration/1 :: (pid()) -> 'ok').
-spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok').
-spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok').
--spec(maybe_expire/1 :: (pid()) -> 'ok').
-spec(on_node_down/1 :: (node()) -> 'ok').
-spec(pseudo_queue/2 :: (name(), pid()) -> rabbit_types:amqqueue()).
@@ -484,21 +482,12 @@ run_backing_queue(QPid, Mod, Fun) ->
sync_timeout(QPid) ->
gen_server2:cast(QPid, sync_timeout).
-update_ram_duration(QPid) ->
- gen_server2:cast(QPid, update_ram_duration).
-
set_ram_duration_target(QPid, Duration) ->
gen_server2:cast(QPid, {set_ram_duration_target, Duration}).
set_maximum_since_use(QPid, Age) ->
gen_server2:cast(QPid, {set_maximum_since_use, Age}).
-maybe_expire(QPid) ->
- gen_server2:cast(QPid, maybe_expire).
-
-drop_expired(QPid) ->
- gen_server2:cast(QPid, drop_expired).
-
on_node_down(Node) ->
rabbit_misc:execute_mnesia_tx_with_tail(
fun () -> Dels = qlc:e(qlc:q([delete_queue(QueueName) ||
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index e787fa84..c7b9eaab 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -275,13 +275,13 @@ stop_rate_timer(State = #q{rate_timer_ref = undefined}) ->
stop_rate_timer(State = #q{rate_timer_ref = just_measured}) ->
State#q{rate_timer_ref = undefined};
stop_rate_timer(State = #q{rate_timer_ref = TRef}) ->
- _ = erlang:cancel_timer(TRef),
+ erlang:cancel_timer(TRef),
State#q{rate_timer_ref = undefined}.
stop_expiry_timer(State = #q{expiry_timer_ref = undefined}) ->
State;
stop_expiry_timer(State = #q{expiry_timer_ref = TRef}) ->
- _ = erlang:cancel_timer(TRef),
+ erlang:cancel_timer(TRef),
State#q{expiry_timer_ref = undefined}.
%% We wish to expire only when there are no consumers *and* the expiry
@@ -787,12 +787,9 @@ prioritise_call(Msg, _From, _State) ->
prioritise_cast(Msg, _State) ->
case Msg of
- update_ram_duration -> 8;
delete_immediately -> 8;
{set_ram_duration_target, _Duration} -> 8;
{set_maximum_since_use, _Age} -> 8;
- maybe_expire -> 8;
- drop_expired -> 8;
emit_stats -> 7;
{ack, _AckTags, _ChPid} -> 7;
{reject, _AckTags, _Requeue, _ChPid} -> 7;
@@ -1096,15 +1093,6 @@ handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
noreply(State).
-handle_info(update_ram_duration, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- {RamDuration, BQS1} = BQ:ram_duration(BQS),
- DesiredDuration =
- rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
- BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
- noreply(State#q{rate_timer_ref = just_measured,
- backing_queue_state = BQS2});
-
handle_info(maybe_expire, State) ->
case is_unused(State) of
true -> ?LOGDEBUG("Queue lease expired for ~p~n", [State#q.q]),
@@ -1137,6 +1125,15 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
{stop, NewState} -> {stop, normal, NewState}
end;
+handle_info(update_ram_duration, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ {RamDuration, BQS1} = BQ:ram_duration(BQS),
+ DesiredDuration =
+ rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
+ BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
+ noreply(State#q{rate_timer_ref = just_measured,
+ backing_queue_state = BQS2});
+
handle_info(timeout, State) ->
noreply(backing_queue_timeout(State));