summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2010-07-06 17:29:02 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2010-07-06 17:29:02 +0100
commitba2c06a682f5b4e98f9fa66df094f98d52724246 (patch)
tree5bde2d8749f18f84dbd00a1d32cf348bbd644399
parent7dab5238a12bffca25dcc6d6e932876de8127a28 (diff)
parent53c18464e9f0355f1b7e96ca9ecbfb5edf51cf96 (diff)
downloadrabbitmq-server-ba2c06a682f5b4e98f9fa66df094f98d52724246.tar.gz
merge bug21673 into bug22896
-rw-r--r--include/rabbit_backing_queue_spec.hrl4
-rw-r--r--src/rabbit_amqqueue_process.erl6
-rw-r--r--src/rabbit_backing_queue.erl17
-rw-r--r--src/rabbit_invariable_queue.erl8
-rw-r--r--src/rabbit_tests.erl15
-rw-r--r--src/rabbit_variable_queue.erl60
6 files changed, 66 insertions, 44 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
index 55cd126e..47748bdb 100644
--- a/include/rabbit_backing_queue_spec.hrl
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -57,7 +57,7 @@
-spec(set_ram_duration_target/2 ::
(('undefined' | 'infinity' | number()), state()) -> state()).
-spec(ram_duration/1 :: (state()) -> {number(), state()}).
--spec(needs_sync/1 :: (state()) -> boolean()).
--spec(sync/1 :: (state()) -> state()).
+-spec(needs_idle_timeout/1 :: (state()) -> boolean()).
+-spec(idle_timeout/1 :: (state()) -> state()).
-spec(handle_pre_hibernate/1 :: (state()) -> state()).
-spec(status/1 :: (state()) -> [{atom(), any()}]).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 3bf48b4c..ec59095d 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -179,7 +179,7 @@ noreply(NewState) ->
next_state(State) ->
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
ensure_rate_timer(State),
- case BQ:needs_sync(BQS)of
+ case BQ:needs_idle_timeout(BQS)of
true -> {ensure_sync_timer(State1), 0};
false -> {stop_sync_timer(State1), hibernate}
end.
@@ -188,7 +188,7 @@ ensure_sync_timer(State = #q{sync_timer_ref = undefined, backing_queue = BQ}) ->
{ok, TRef} = timer:apply_after(
?SYNC_INTERVAL,
rabbit_amqqueue, maybe_run_queue_via_backing_queue,
- [self(), fun (BQS) -> BQ:sync(BQS) end]),
+ [self(), fun (BQS) -> BQ:idle_timeout(BQS) end]),
State#q{sync_timer_ref = TRef};
ensure_sync_timer(State) ->
State.
@@ -822,7 +822,7 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
handle_info(timeout, State = #q{backing_queue = BQ}) ->
noreply(maybe_run_queue_via_backing_queue(
- fun (BQS) -> BQ:sync(BQS) end, State));
+ fun (BQS) -> BQ:idle_timeout(BQS) end, State));
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 432d6290..b76ae11e 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -113,14 +113,15 @@ behaviour_info(callbacks) ->
%% queue.
{ram_duration, 1},
- %% Should 'sync' be called as soon as the queue process can
- %% manage (either on an empty mailbox, or when a timer fires)?
- {needs_sync, 1},
-
- %% Called (eventually) after needs_sync returns 'true'. Note this
- %% may be called more than once for each 'true' returned from
- %% needs_sync.
- {sync, 1},
+ %% Should 'idle_timeout' be called as soon as the queue process
+ %% can manage (either on an empty mailbox, or when a timer
+ %% fires)?
+ {needs_idle_timeout, 1},
+
+ %% Called (eventually) after needs_idle_timeout returns
+ %% 'true'. Note this may be called more than once for each 'true'
+ %% returned from needs_idle_timeout.
+ {idle_timeout, 1},
%% Called immediately before the queue hibernates.
{handle_pre_hibernate, 1},
diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl
index a7ca20c8..e6bd11e3 100644
--- a/src/rabbit_invariable_queue.erl
+++ b/src/rabbit_invariable_queue.erl
@@ -34,8 +34,8 @@
-export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/2,
publish_delivered/3, fetch/2, ack/2, tx_publish/3, tx_ack/3,
tx_rollback/2, tx_commit/3, requeue/2, len/1, is_empty/1,
- set_ram_duration_target/2, ram_duration/1, needs_sync/1, sync/1,
- handle_pre_hibernate/1, status/1]).
+ set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1,
+ idle_timeout/1, handle_pre_hibernate/1, status/1]).
-export([start/1]).
@@ -197,9 +197,9 @@ set_ram_duration_target(_DurationTarget, State) -> State.
ram_duration(State) -> {0, State}.
-needs_sync(_State) -> false.
+needs_idle_timeout(_State) -> false.
-sync(State) -> State.
+idle_timeout(State) -> State.
handle_pre_hibernate(State) -> State.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index b28dd839..dd6a9089 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1845,7 +1845,8 @@ test_variable_queue_partial_segments_delta_thing() ->
VQ0 = fresh_variable_queue(),
VQ1 = variable_queue_publish(true, SegmentSize + HalfSegment, VQ0),
{_Duration, VQ2} = rabbit_variable_queue:ram_duration(VQ1),
- VQ3 = rabbit_variable_queue:set_ram_duration_target(0, VQ2),
+ VQ3 = variable_queue_wait_for_shuffling_end(
+ rabbit_variable_queue:set_ram_duration_target(0, VQ2)),
%% one segment in q3 as betas, and half a segment in delta
S3 = rabbit_variable_queue:status(VQ3),
io:format("~p~n", [S3]),
@@ -1854,7 +1855,8 @@ test_variable_queue_partial_segments_delta_thing() ->
assert_prop(S3, q3, SegmentSize),
assert_prop(S3, len, SegmentSize + HalfSegment),
VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3),
- VQ5 = variable_queue_publish(true, 1, VQ4),
+ VQ5 = variable_queue_wait_for_shuffling_end(
+ variable_queue_publish(true, 1, VQ4)),
%% should have 1 alpha, but it's in the same segment as the deltas
S5 = rabbit_variable_queue:status(VQ5),
io:format("~p~n", [S5]),
@@ -1881,6 +1883,13 @@ test_variable_queue_partial_segments_delta_thing() ->
passed.
+variable_queue_wait_for_shuffling_end(VQ) ->
+ case rabbit_variable_queue:needs_idle_timeout(VQ) of
+ true -> variable_queue_wait_for_shuffling_end(
+ rabbit_variable_queue:idle_timeout(VQ));
+ false -> VQ
+ end.
+
test_queue_recover() ->
Count = 2*rabbit_queue_index:next_segment_boundary(0),
TxID = rabbit_guid:guid(),
@@ -1939,7 +1948,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere() ->
VQa2 = variable_queue_publish(false, 4, VQa1),
{VQa3, AckTags} = variable_queue_fetch(2, false, false, 4, VQa2),
VQa4 = rabbit_variable_queue:requeue(AckTags, VQa3),
- VQa5 = rabbit_variable_queue:sync(VQa4),
+ VQa5 = rabbit_variable_queue:idle_timeout(VQa4),
_VQa6 = rabbit_variable_queue:terminate(VQa5),
VQa7 = rabbit_variable_queue:init(test_queue(), true, true),
{empty, VQa8} = rabbit_variable_queue:fetch(false, VQa7),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 8b9d17e7..5893385a 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -36,7 +36,8 @@
tx_publish/3, tx_ack/3, tx_rollback/2, tx_commit/3,
requeue/2, len/1, is_empty/1,
set_ram_duration_target/2, ram_duration/1,
- needs_sync/1, sync/1, handle_pre_hibernate/1, status/1]).
+ needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1,
+ status/1]).
-export([start/1]).
@@ -221,7 +222,7 @@
%% fewer than RAM_INDEX_BATCH_SIZE indices out in one go, and we don't
%% write more - we can always come back on the next publish to do
%% more.
--define(RAM_INDEX_BATCH_SIZE, 64).
+-define(IO_BATCH_SIZE, 64).
-define(PERSISTENT_MSG_STORE, msg_store_persistent).
-define(TRANSIENT_MSG_STORE, msg_store_transient).
@@ -633,10 +634,16 @@ ram_duration(State = #vqstate { egress_rate = Egress,
out_counter = 0,
ram_msg_count_prev = RamMsgCount })}.
-needs_sync(#vqstate { on_sync = {_, _, []} }) -> false;
-needs_sync(_) -> true.
+needs_idle_timeout(#vqstate { on_sync = {_, _, SFuns},
+ target_ram_msg_count = TargetRamMsgCount,
+ ram_msg_count = RamMsgCount })
+ when SFuns =/= [] orelse RamMsgCount > TargetRamMsgCount ->
+ true;
+needs_idle_timeout(State = #vqstate { ram_index_count = RamIndexCount }) ->
+ Permitted = permitted_ram_index_count(State),
+ Permitted =/= infinity andalso RamIndexCount > Permitted.
-sync(State) -> a(tx_commit_index(State)).
+idle_timeout(State) -> a(reduce_memory_use(tx_commit_index(State))).
handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }.
@@ -671,7 +678,6 @@ status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
len = Len,
persistent_count = PersistentCount,
- target_ram_msg_count = TargetRamMsgCount,
ram_msg_count = RamMsgCount,
ram_index_count = RamIndexCount }) ->
E1 = queue:is_empty(Q1),
@@ -679,13 +685,11 @@ a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
ED = Delta#delta.count == 0,
E3 = bpqueue:is_empty(Q3),
E4 = queue:is_empty(Q4),
- TZ = TargetRamMsgCount == 0,
LZ = Len == 0,
true = E1 or not E3,
true = E2 or not ED,
true = ED or not E3,
- true = (E1 and E2 and E4) or not TZ,
true = LZ == (E3 and E4),
true = Len >= 0,
@@ -1117,19 +1121,21 @@ reduce_memory_use(State = #vqstate {
when TargetRamMsgCount >= RamMsgCount ->
limit_ram_index(State);
reduce_memory_use(State = #vqstate {
+ ram_msg_count = RamMsgCount,
target_ram_msg_count = TargetRamMsgCount }) ->
- State1 = maybe_push_q4_to_betas(maybe_push_q1_to_betas(State)),
+ Reduction = lists:min([RamMsgCount - TargetRamMsgCount, ?IO_BATCH_SIZE]),
+ { Reduction1, State1} = maybe_push_q1_to_betas(Reduction, State),
+ {_Reduction2, State2} = maybe_push_q4_to_betas(Reduction1, State1),
case TargetRamMsgCount of
- 0 -> push_betas_to_deltas(State1);
- _ -> limit_ram_index(State1)
+ 0 -> push_betas_to_deltas(State2);
+ _ -> limit_ram_index(State2)
end.
limit_ram_index(State = #vqstate { ram_index_count = RamIndexCount }) ->
Permitted = permitted_ram_index_count(State),
if Permitted =/= infinity andalso RamIndexCount > Permitted ->
- Reduction = lists:min([RamIndexCount - Permitted,
- ?RAM_INDEX_BATCH_SIZE]),
- case Reduction < ?RAM_INDEX_BATCH_SIZE of
+ Reduction = lists:min([RamIndexCount - Permitted, ?IO_BATCH_SIZE]),
+ case Reduction < ?IO_BATCH_SIZE of
true -> State;
false -> #vqstate { q2 = Q2, q3 = Q3,
index_state = IndexState } = State,
@@ -1227,7 +1233,9 @@ maybe_deltas_to_betas(State = #vqstate {
end
end.
-maybe_push_q1_to_betas(State = #vqstate { q1 = Q1 }) ->
+maybe_push_q1_to_betas(0, State) ->
+ {0, State};
+maybe_push_q1_to_betas(Quota, State = #vqstate { q1 = Q1 }) ->
maybe_push_alphas_to_betas(
fun queue:out/1,
fun (MsgStatus = #msg_status { index_on_disk = IndexOnDisk },
@@ -1238,26 +1246,30 @@ maybe_push_q1_to_betas(State = #vqstate { q1 = Q1 }) ->
Q1a, State1 = #vqstate { q2 = Q2 }) ->
State1 #vqstate { q1 = Q1a,
q2 = bpqueue:in(IndexOnDisk, MsgStatus, Q2) }
- end, Q1, State).
+ end, Quota, Q1, State).
-maybe_push_q4_to_betas(State = #vqstate { q4 = Q4 }) ->
+maybe_push_q4_to_betas(0, State) ->
+ {0, State};
+maybe_push_q4_to_betas(Quota, State = #vqstate { q4 = Q4 }) ->
maybe_push_alphas_to_betas(
fun queue:out_r/1,
fun (MsgStatus = #msg_status { index_on_disk = IndexOnDisk },
Q4a, State1 = #vqstate { q3 = Q3 }) ->
State1 #vqstate { q3 = bpqueue:in_r(IndexOnDisk, MsgStatus, Q3),
q4 = Q4a }
- end, Q4, State).
+ end, Quota, Q4, State).
-maybe_push_alphas_to_betas(_Generator, _Consumer, _Q,
+maybe_push_alphas_to_betas(_Generator, _Consumer, Quota, _Q,
State = #vqstate {
ram_msg_count = RamMsgCount,
target_ram_msg_count = TargetRamMsgCount })
- when TargetRamMsgCount =:= infinity orelse TargetRamMsgCount >= RamMsgCount ->
- State;
-maybe_push_alphas_to_betas(Generator, Consumer, Q, State) ->
+ when Quota =:= 0 orelse
+ TargetRamMsgCount =:= infinity orelse TargetRamMsgCount >= RamMsgCount ->
+ {Quota, State};
+maybe_push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
case Generator(Q) of
- {empty, _Q} -> State;
+ {empty, _Q} ->
+ {Quota, State};
{{value, MsgStatus}, Qa} ->
{MsgStatus1 = #msg_status { msg_on_disk = true,
index_on_disk = IndexOnDisk },
@@ -1268,7 +1280,7 @@ maybe_push_alphas_to_betas(Generator, Consumer, Q, State) ->
RamIndexCount1 = RamIndexCount + one_if(not IndexOnDisk),
State2 = State1 #vqstate { ram_msg_count = RamMsgCount - 1,
ram_index_count = RamIndexCount1 },
- maybe_push_alphas_to_betas(Generator, Consumer, Qa,
+ maybe_push_alphas_to_betas(Generator, Consumer, Quota - 1, Qa,
Consumer(MsgStatus2, Qa, State2))
end.