diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2011-10-12 20:55:20 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-10-12 20:55:20 +0100 |
commit | 0e3b7b08aec68f9ec177d97cf7250bacdb2caa48 (patch) | |
tree | d906e0da648b084aaa57b3da2a590a75cf97351f | |
parent | 440329710e81289609f96d9997c7a4c5cc44efc1 (diff) | |
download | rabbitmq-server-0e3b7b08aec68f9ec177d97cf7250bacdb2caa48.tar.gz |
Because I've got rid of ram_index_count (i.e. betas count), and because the guard around whether or not betas_to_deltas is now based on q2 and q3 len rather than ram_index_count, we no longer need to have the separate gammas_to_deltas fun, which means we should no longer have the problem with it being called too often as we should be able to rely on the chunk stuff working out.
-rw-r--r-- | src/rabbit_variable_queue.erl | 68 |
1 files changed, 20 insertions, 48 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 9951359a..d418a10f 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -756,7 +756,6 @@ needs_timeout(State) -> fun (_Quota, State1) -> {0, State1} end, fun (_Quota, State1) -> State1 end, fun (_Quota, State1) -> {0, State1} end, - fun null_gamma_delta/1, State) of {true, _State} -> idle; {false, _State} -> false @@ -764,21 +763,6 @@ needs_timeout(State) -> true -> timed end. -null_gamma_delta(#vqstate { q2 = Q2, q3 = Q3 } = State) -> - {null_gamma_delta_msg(?QUEUE:peek(Q2), ?QUEUE:peek(Q2), - fun (SeqId) -> SeqId end) orelse - null_gamma_delta_msg(?QUEUE:peek_r(Q3), ?QUEUE:peek(Q3), - fun rabbit_queue_index:next_segment_boundary/1), - State}. - -null_gamma_delta_msg({value, #msg_status { seq_id = SeqId1, - index_on_disk = true }}, - {value, #msg_status { seq_id = SeqId2 }}, - LimitFun) -> - LimitFun =:= undefined orelse SeqId1 >= LimitFun(SeqId2); -null_gamma_delta_msg(_, _, _) -> - false. - timeout(State) -> a(reduce_memory_use(confirm_commit_index(State))). @@ -1448,10 +1432,10 @@ delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId. %% one segment's worth of messages in q3 - and thus would risk %% perpetually reporting the need for a conversion when no such %% conversion is needed. That in turn could cause an infinite loop. -reduce_memory_use(_AlphaBetaFun, _BetaDeltaFun, _AckFun, _GammaDeltaFun, +reduce_memory_use(_AlphaBetaFun, _BetaDeltaFun, _AckFun, State = #vqstate {target_ram_count = infinity}) -> {false, State}; -reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun, GammaDeltaFun, +reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun, State = #vqstate { ram_ack_index = RamAckIndex, ram_msg_count = RamMsgCount, @@ -1483,14 +1467,11 @@ reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun, GammaDeltaFun, {true, State2} end, - {Reduce1, State3} = - case chunk_size(?QUEUE:len(Q2) + ?QUEUE:len(Q3), - permitted_beta_count(State1)) of - ?IO_BATCH_SIZE = S2 -> {true, BetaDeltaFun(S2, State1)}; - _ -> {Reduce, State1} - end, - {Reduce2, State4} = GammaDeltaFun(State3), - {Reduce1 orelse Reduce2, State4}. + case chunk_size(?QUEUE:len(Q2) + ?QUEUE:len(Q3), + permitted_beta_count(State1)) of + ?IO_BATCH_SIZE = S2 -> {true, BetaDeltaFun(S2, State1)}; + _ -> {Reduce, State1} + end. limit_ram_acks(0, State) -> {0, State}; @@ -1515,7 +1496,6 @@ reduce_memory_use(State) -> {_, State1} = reduce_memory_use(fun push_alphas_to_betas/2, fun push_betas_to_deltas/2, fun limit_ram_acks/2, - fun push_gammas_to_deltas/1, State), State1. @@ -1664,31 +1644,36 @@ push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2, q3 = Q3, index_state = IndexState }) -> PushState = {Quota, Delta, IndexState}, - {Q3a, PushState1} = push_with_limit( + {Q3a, PushState1} = push_betas_to_deltas( fun ?QUEUE:out_r/1, fun rabbit_queue_index:next_segment_boundary/1, - Q3, fun push_betas_to_deltas1/4, PushState), - {Q2a, PushState2} = push_with_limit( + Q3, PushState), + {Q2a, PushState2} = push_betas_to_deltas( fun ?QUEUE:out/1, fun (Q2MinSeqId) -> Q2MinSeqId end, - Q2, fun push_betas_to_deltas1/4, PushState1), + Q2, PushState1), {_, Delta1, IndexState1} = PushState2, State #vqstate { q2 = Q2a, delta = Delta1, q3 = Q3a, index_state = IndexState1 }. -push_with_limit(Generator, LimitFun, Q, PushFun, PushState) -> +push_betas_to_deltas(Generator, LimitFun, Q, PushState) -> case ?QUEUE:is_empty(Q) of true -> {Q, PushState}; false -> - {{value, #msg_status { seq_id = MinSeqId }}, _Qa} = ?QUEUE:out(Q), - {{value, #msg_status { seq_id = MaxSeqId }}, _Qb} = ?QUEUE:out_r(Q), + {value, #msg_status { seq_id = MinSeqId }} = ?QUEUE:peek(Q), + {value, #msg_status { seq_id = MaxSeqId }} = ?QUEUE:peek_r(Q), Limit = LimitFun(MinSeqId), case MaxSeqId < Limit of true -> {Q, PushState}; - false -> PushFun(Generator, Limit, Q, PushState) + false -> {Q1, {Quota, Delta, IndexState}} = + push_betas_to_deltas1( + Generator, Limit, Q, PushState), + {Q2, Delta1} = + push_gammas_to_deltas(Generator, Limit, Q1, Delta), + {Q2, {Quota, Delta1, IndexState}} end end. @@ -1719,19 +1704,6 @@ push_betas_to_deltas1(Generator, Limit, Q, {Quota1, Delta1, IndexState1}) end. -push_gammas_to_deltas(State = #vqstate { q2 = Q2, - delta = Delta, - q3 = Q3 }) -> - {Q2a, Delta1} = push_with_limit( - fun ?QUEUE:out/1, - fun (Q2MinSeqId) -> Q2MinSeqId end, - Q2, fun push_gammas_to_deltas/4, Delta), - {Q3a, Delta2} = push_with_limit( - fun ?QUEUE:out_r/1, - fun rabbit_queue_index:next_segment_boundary/1, - Q3, fun push_gammas_to_deltas/4, Delta1), - {Delta2 =/= Delta, State #vqstate { q2 = Q2a, delta = Delta2, q3 = Q3a }}. - push_gammas_to_deltas(Generator, Limit, Q, Delta) -> case Generator(Q) of {{value, #msg_status { seq_id = SeqId, index_on_disk = true }}, Q1} |