summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-10-11 13:37:21 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2011-10-11 13:37:21 +0100
commit78979a2ea9b563d8355f5287e2d6ff7ed8d03a72 (patch)
tree66b1d6f6f1c844dc7ae01f8935654b3015390da6
parente7c03cc33e7ccde59a8fcf9a150c4c8d9d710ee9 (diff)
downloadrabbitmq-server-78979a2ea9b563d8355f5287e2d6ff7ed8d03a72.tar.gz
Ensure that we push ?-bordering ?s into ? eagerly.
-rw-r--r--src/rabbit_tests.erl6
-rw-r--r--src/rabbit_variable_queue.erl26
2 files changed, 22 insertions, 10 deletions
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index eb456e8c..606c4fe8 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2174,7 +2174,7 @@ wait_for_confirms(Unconfirmed) ->
test_variable_queue() ->
[passed = with_fresh_variable_queue(F) ||
F <- [fun test_variable_queue_dynamic_duration_change/1,
- %%fun test_variable_queue_partial_segments_delta_thing/1,
+ fun test_variable_queue_partial_segments_delta_thing/1,
fun test_variable_queue_all_the_bits_not_covered_elsewhere1/1,
fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1,
fun test_dropwhile/1,
@@ -2322,7 +2322,7 @@ test_variable_queue_partial_segments_delta_thing(VQ0) ->
{_Duration, VQ2} = rabbit_variable_queue:ram_duration(VQ1),
VQ3 = check_variable_queue_status(
rabbit_variable_queue:set_ram_duration_target(0, VQ2),
- %% one segment in q3 as betas, and half a segment in delta
+ %% one segment in q3, and half a segment in delta
[{delta, {delta, SegmentSize, HalfSegment, OneAndAHalfSegment}},
{q3, SegmentSize},
{len, SegmentSize + HalfSegment}]),
@@ -2338,7 +2338,7 @@ test_variable_queue_partial_segments_delta_thing(VQ0) ->
SegmentSize + HalfSegment + 1, VQ5),
VQ7 = check_variable_queue_status(
VQ6,
- %% the half segment should now be in q3 as betas
+ %% the half segment should now be in q3
[{q1, 1},
{delta, {delta, undefined, 0, undefined}},
{q3, HalfSegment},
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 45c6a17f..8151d37a 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -1467,10 +1467,14 @@ reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun,
{true, State2}
end,
+ %% AlphaBetaFun may have produced gammas that are bordering
+ %% delta. We must ensure that we push these into delta, which is
+ %% largely a no-op. This is why we call BetaDeltaFun even with a
+ %% quota of 0.
case chunk_size(State1 #vqstate.ram_index_count,
permitted_beta_count(State1)) of
?IO_BATCH_SIZE = S2 -> {true, BetaDeltaFun(S2, State1)};
- _ -> {Reduce, State1}
+ _ -> {Reduce, BetaDeltaFun(0, State1)}
end.
limit_ram_acks(0, State) ->
@@ -1676,9 +1680,6 @@ push_betas_to_deltas(Quota,
index_state = IndexState1,
ram_index_count = RamIndexCount1 }.
-push_betas_to_deltas(_Generator, _LimitFun, Q,
- {0, _Delta, _RamIndexCount, _IndexState} = PushState) ->
- {Q, PushState};
push_betas_to_deltas(Generator, LimitFun, Q, PushState) ->
case ?QUEUE:is_empty(Q) of
true ->
@@ -1693,9 +1694,10 @@ push_betas_to_deltas(Generator, LimitFun, Q, PushState) ->
end
end.
-push_betas_to_deltas1(_Generator, _Limit, Q,
- {0, _Delta, _RamIndexCount, _IndexState} = PushState) ->
- {Q, PushState};
+push_betas_to_deltas1(Generator, Limit, Q,
+ {0, Delta, RamIndexCount, IndexState}) ->
+ {Qb, Delta1} = push_gammas_to_deltas(Generator, Limit, Q, Delta),
+ {Qb, {0, Delta1, RamIndexCount, IndexState}};
push_betas_to_deltas1(Generator, Limit, Q,
{Quota, Delta, RamIndexCount, IndexState} = PushState) ->
case Generator(Q) of
@@ -1720,6 +1722,16 @@ push_betas_to_deltas1(Generator, Limit, Q,
{Quota1, Delta1, RamIndexCount1, IndexState1})
end.
+push_gammas_to_deltas(Generator, Limit, Q, Delta) ->
+ case Generator(Q) of
+ {{value, #msg_status { seq_id = SeqId, index_on_disk = true }}, Q1}
+ when SeqId >= Limit ->
+ push_gammas_to_deltas(Generator, Limit, Q1,
+ expand_delta(SeqId, Delta));
+ {_, _Q} ->
+ {Q, Delta}
+ end.
+
%%----------------------------------------------------------------------------
%% Upgrading
%%----------------------------------------------------------------------------