summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-05-20 14:22:56 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2011-05-20 14:22:56 +0100
commit7effbff31ee2cee9b2a543ab10e210e74e80129d (patch)
treed7e431a690c492a84807fe528a7d97b71fbc7d31
parentbcd37ba7a6b103cb37686dc65e945a17add6b123 (diff)
parent9fd568ecdbc48e5d8caae5ef400b139a4848c5e8 (diff)
downloadrabbitmq-server-7effbff31ee2cee9b2a543ab10e210e74e80129d.tar.gz
merge bug24118 into default
-rw-r--r--include/rabbit_backing_queue_spec.hrl4
-rw-r--r--src/rabbit_amqqueue_process.erl15
-rw-r--r--src/rabbit_backing_queue.erl12
-rw-r--r--src/rabbit_tests.erl10
-rw-r--r--src/rabbit_variable_queue.erl49
5 files changed, 47 insertions, 43 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
index d9296bf6..1c2b94e2 100644
--- a/include/rabbit_backing_queue_spec.hrl
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -66,8 +66,8 @@
-spec(set_ram_duration_target/2 ::
(('undefined' | 'infinity' | number()), state()) -> state()).
-spec(ram_duration/1 :: (state()) -> {number(), state()}).
--spec(needs_idle_timeout/1 :: (state()) -> boolean()).
--spec(idle_timeout/1 :: (state()) -> state()).
+-spec(needs_timeout/1 :: (state()) -> 'false' | 'timed' | 'idle').
+-spec(timeout/1 :: (state()) -> state()).
-spec(handle_pre_hibernate/1 :: (state()) -> state()).
-spec(status/1 :: (state()) -> [{atom(), any()}]).
-spec(invoke/3 :: (atom(), fun ((atom(), A) -> A), state()) -> state()).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 110817a9..8091e2c2 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -220,9 +220,10 @@ next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
ensure_rate_timer(
confirm_messages(MsgIds, State#q{
backing_queue_state = BQS1}))),
- case BQ:needs_idle_timeout(BQS1) of
- true -> {ensure_sync_timer(State1), 0};
- false -> {stop_sync_timer(State1), hibernate}
+ case BQ:needs_timeout(BQS1) of
+ false -> {stop_sync_timer(State1), hibernate};
+ idle -> {stop_sync_timer(State1), 0 };
+ timed -> {ensure_sync_timer(State1), 0 }
end.
ensure_sync_timer(State = #q{sync_timer_ref = undefined}) ->
@@ -661,8 +662,8 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
qname(#q{q = #amqqueue{name = QName}}) -> QName.
-backing_queue_idle_timeout(State = #q{backing_queue = BQ}) ->
- run_backing_queue(BQ, fun (M, BQS) -> M:idle_timeout(BQS) end, State).
+backing_queue_timeout(State = #q{backing_queue = BQ}) ->
+ run_backing_queue(BQ, fun (M, BQS) -> M:timeout(BQS) end, State).
run_backing_queue(Mod, Fun, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
@@ -1046,7 +1047,7 @@ handle_cast({run_backing_queue, Mod, Fun}, State) ->
noreply(run_backing_queue(Mod, Fun, State));
handle_cast(sync_timeout, State) ->
- noreply(backing_queue_idle_timeout(State#q{sync_timer_ref = undefined}));
+ noreply(backing_queue_timeout(State#q{sync_timer_ref = undefined}));
handle_cast({deliver, Delivery}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
@@ -1180,7 +1181,7 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
end;
handle_info(timeout, State) ->
- noreply(backing_queue_idle_timeout(State));
+ noreply(backing_queue_timeout(State));
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 0955a080..addaabc5 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -149,15 +149,15 @@ behaviour_info(callbacks) ->
%% queue.
{ram_duration, 1},
- %% Should 'idle_timeout' be called as soon as the queue process
+ %% Should 'timeout' be called as soon as the queue process
%% can manage (either on an empty mailbox, or when a timer
%% fires)?
- {needs_idle_timeout, 1},
+ {needs_timeout, 1},
- %% Called (eventually) after needs_idle_timeout returns
- %% 'true'. Note this may be called more than once for each 'true'
- %% returned from needs_idle_timeout.
- {idle_timeout, 1},
+ %% Called (eventually) after needs_timeout returns 'idle' or
+ %% 'timed'. Note this may be called more than once for each
+ %% 'idle' or 'timed' returned from needs_timeout.
+ {timeout, 1},
%% Called immediately before the queue hibernates.
{handle_pre_hibernate, 1},
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 3726420d..1a37cdff 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2269,10 +2269,10 @@ check_variable_queue_status(VQ0, Props) ->
VQ1.
variable_queue_wait_for_shuffling_end(VQ) ->
- case rabbit_variable_queue:needs_idle_timeout(VQ) of
- true -> variable_queue_wait_for_shuffling_end(
- rabbit_variable_queue:idle_timeout(VQ));
- false -> VQ
+ case rabbit_variable_queue:needs_timeout(VQ) of
+ false -> VQ;
+ _ -> variable_queue_wait_for_shuffling_end(
+ rabbit_variable_queue:timeout(VQ))
end.
test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) ->
@@ -2300,7 +2300,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
{VQ3, AckTags} = variable_queue_fetch(2, false, false, 4, VQ2),
{_Guids, VQ4} =
rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3),
- VQ5 = rabbit_variable_queue:idle_timeout(VQ4),
+ VQ5 = rabbit_variable_queue:timeout(VQ4),
_VQ6 = rabbit_variable_queue:terminate(VQ5),
VQ7 = variable_queue_init(test_amqqueue(true), true),
{empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 7a3c17a2..8ac3ad43 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -21,7 +21,7 @@
fetch/2, ack/2, tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4,
requeue/3, len/1, is_empty/1, dropwhile/2,
set_ram_duration_target/2, ram_duration/1,
- needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1,
+ needs_timeout/1, timeout/1, handle_pre_hibernate/1,
status/1, invoke/3, is_duplicate/3, discard/3,
multiple_routing_keys/0]).
@@ -146,7 +146,7 @@
%% any one time. This further smooths the effects of changes to the
%% target_ram_count and ensures the queue remains responsive
%% even when there is a large amount of IO work to do. The
-%% idle_timeout callback is utilised to ensure that conversions are
+%% timeout callback is utilised to ensure that conversions are
%% done as promptly as possible whilst ensuring the queue remains
%% responsive.
%%
@@ -567,20 +567,22 @@ dropwhile1(Pred, State) ->
internal_queue_out(
fun(MsgStatus = #msg_status { msg_props = MsgProps }, State1) ->
case Pred(MsgProps) of
- true ->
- {_, State2} = internal_fetch(false, MsgStatus, State1),
- dropwhile1(Pred, State2);
- false ->
- %% message needs to go back into Q4 (or maybe go
- %% in for the first time if it was loaded from
- %% Q3). Also the msg contents might not be in
- %% RAM, so read them in now
- {MsgStatus1, State2 = #vqstate { q4 = Q4 }} =
- read_msg(MsgStatus, State1),
- {ok, State2 #vqstate {q4 = queue:in_r(MsgStatus1, Q4) }}
+ true -> {_, State2} = internal_fetch(false, MsgStatus,
+ State1),
+ dropwhile1(Pred, State2);
+ false -> {ok, in_r(MsgStatus, State1)}
end
end, State).
+in_r(MsgStatus = #msg_status { msg = undefined, index_on_disk = IndexOnDisk },
+ State = #vqstate { q3 = Q3, q4 = Q4, ram_index_count = RamIndexCount }) ->
+ true = queue:is_empty(Q4), %% ASSERTION
+ State #vqstate {
+ q3 = bpqueue:in_r(IndexOnDisk, MsgStatus, Q3),
+ ram_index_count = RamIndexCount + one_if(not IndexOnDisk) };
+in_r(MsgStatus, State = #vqstate { q4 = Q4 }) ->
+ State #vqstate { q4 = queue:in_r(MsgStatus, Q4) }.
+
fetch(AckRequired, State) ->
internal_queue_out(
fun(MsgStatus, State1) ->
@@ -830,21 +832,22 @@ ram_duration(State = #vqstate {
ram_msg_count_prev = RamMsgCount,
ram_ack_count_prev = RamAckCount }}.
-needs_idle_timeout(State = #vqstate { on_sync = OnSync }) ->
+needs_timeout(State = #vqstate { on_sync = OnSync }) ->
case {OnSync, needs_index_sync(State)} of
{?BLANK_SYNC, false} ->
- {Res, _State} = reduce_memory_use(
- fun (_Quota, State1) -> {0, State1} end,
- fun (_Quota, State1) -> State1 end,
- fun (State1) -> State1 end,
- fun (_Quota, State1) -> {0, State1} end,
- State),
- Res;
+ case reduce_memory_use(fun (_Quota, State1) -> {0, State1} end,
+ fun (_Quota, State1) -> State1 end,
+ fun (State1) -> State1 end,
+ fun (_Quota, State1) -> {0, State1} end,
+ State) of
+ {true, _State} -> idle;
+ {false, _State} -> false
+ end;
_ ->
- true
+ timed
end.
-idle_timeout(State) ->
+timeout(State) ->
a(reduce_memory_use(confirm_commit_index(tx_commit_index(State)))).
handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->