diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2011-10-11 13:37:21 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-10-11 13:37:21 +0100 |
commit | 78979a2ea9b563d8355f5287e2d6ff7ed8d03a72 (patch) | |
tree | 66b1d6f6f1c844dc7ae01f8935654b3015390da6 | |
parent | e7c03cc33e7ccde59a8fcf9a150c4c8d9d710ee9 (diff) | |
download | rabbitmq-server-78979a2ea9b563d8355f5287e2d6ff7ed8d03a72.tar.gz |
Ensure that we push ?-bordering ?s into ? eagerly.
-rw-r--r-- | src/rabbit_tests.erl | 6 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 26 |
2 files changed, 22 insertions, 10 deletions
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index eb456e8c..606c4fe8 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2174,7 +2174,7 @@ wait_for_confirms(Unconfirmed) -> test_variable_queue() -> [passed = with_fresh_variable_queue(F) || F <- [fun test_variable_queue_dynamic_duration_change/1, - %%fun test_variable_queue_partial_segments_delta_thing/1, + fun test_variable_queue_partial_segments_delta_thing/1, fun test_variable_queue_all_the_bits_not_covered_elsewhere1/1, fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1, fun test_dropwhile/1, @@ -2322,7 +2322,7 @@ test_variable_queue_partial_segments_delta_thing(VQ0) -> {_Duration, VQ2} = rabbit_variable_queue:ram_duration(VQ1), VQ3 = check_variable_queue_status( rabbit_variable_queue:set_ram_duration_target(0, VQ2), - %% one segment in q3 as betas, and half a segment in delta + %% one segment in q3, and half a segment in delta [{delta, {delta, SegmentSize, HalfSegment, OneAndAHalfSegment}}, {q3, SegmentSize}, {len, SegmentSize + HalfSegment}]), @@ -2338,7 +2338,7 @@ test_variable_queue_partial_segments_delta_thing(VQ0) -> SegmentSize + HalfSegment + 1, VQ5), VQ7 = check_variable_queue_status( VQ6, - %% the half segment should now be in q3 as betas + %% the half segment should now be in q3 [{q1, 1}, {delta, {delta, undefined, 0, undefined}}, {q3, HalfSegment}, diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 45c6a17f..8151d37a 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1467,10 +1467,14 @@ reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun, {true, State2} end, + %% AlphaBetaFun may have produced gammas that are bordering + %% delta. We must ensure that we push these into delta, which is + %% largely a no-op. This is why we call BetaDeltaFun even with a + %% quota of 0. case chunk_size(State1 #vqstate.ram_index_count, permitted_beta_count(State1)) of ?IO_BATCH_SIZE = S2 -> {true, BetaDeltaFun(S2, State1)}; - _ -> {Reduce, State1} + _ -> {Reduce, BetaDeltaFun(0, State1)} end. limit_ram_acks(0, State) -> @@ -1676,9 +1680,6 @@ push_betas_to_deltas(Quota, index_state = IndexState1, ram_index_count = RamIndexCount1 }. -push_betas_to_deltas(_Generator, _LimitFun, Q, - {0, _Delta, _RamIndexCount, _IndexState} = PushState) -> - {Q, PushState}; push_betas_to_deltas(Generator, LimitFun, Q, PushState) -> case ?QUEUE:is_empty(Q) of true -> @@ -1693,9 +1694,10 @@ push_betas_to_deltas(Generator, LimitFun, Q, PushState) -> end end. -push_betas_to_deltas1(_Generator, _Limit, Q, - {0, _Delta, _RamIndexCount, _IndexState} = PushState) -> - {Q, PushState}; +push_betas_to_deltas1(Generator, Limit, Q, + {0, Delta, RamIndexCount, IndexState}) -> + {Qb, Delta1} = push_gammas_to_deltas(Generator, Limit, Q, Delta), + {Qb, {0, Delta1, RamIndexCount, IndexState}}; push_betas_to_deltas1(Generator, Limit, Q, {Quota, Delta, RamIndexCount, IndexState} = PushState) -> case Generator(Q) of @@ -1720,6 +1722,16 @@ push_betas_to_deltas1(Generator, Limit, Q, {Quota1, Delta1, RamIndexCount1, IndexState1}) end. +push_gammas_to_deltas(Generator, Limit, Q, Delta) -> + case Generator(Q) of + {{value, #msg_status { seq_id = SeqId, index_on_disk = true }}, Q1} + when SeqId >= Limit -> + push_gammas_to_deltas(Generator, Limit, Q1, + expand_delta(SeqId, Delta)); + {_, _Q} -> + {Q, Delta} + end. + %%---------------------------------------------------------------------------- %% Upgrading %%---------------------------------------------------------------------------- |