diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2011-10-11 16:47:12 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-10-11 16:47:12 +0100 |
commit | 00096f34cd13fd52a8870d2b329b1eea3aaaebc3 (patch) | |
tree | a9c25437687107a955f3d776c1a07c49e14411cc | |
parent | d2b1d95293fab98b23a64aa7434c870398fee968 (diff) | |
download | rabbitmq-server-00096f34cd13fd52a8870d2b329b1eea3aaaebc3.tar.gz |
When under memory pressure, we want to limit the size of q2 and q3, not just of ?s. Simplify calculation of permitted length. Simplify (and correct) expansion of ?. We can now completely drop ram_index_count as we never care about just the number of ?s.
-rw-r--r-- | src/rabbit_tests.erl | 7 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 286 |
2 files changed, 132 insertions, 161 deletions
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 606c4fe8..44c9b499 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2189,12 +2189,7 @@ test_variable_queue_requeue(VQ0) -> Seq = lists:seq(1, Count), VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0), VQ2 = variable_queue_publish(false, Count, VQ1), - {VQ3, Acks} = lists:foldl( - fun (_N, {VQN, AckTags}) -> - {{#basic_message{}, false, AckTag, _}, VQM} = - rabbit_variable_queue:fetch(true, VQN), - {VQM, [AckTag | AckTags]} - end, {VQ2, []}, Seq), + {VQ3, Acks} = variable_queue_fetch(Count, false, false, Count, VQ2), Subset = lists:foldl(fun ({Ack, N}, Acc) when N rem Interval == 0 -> [Ack | Acc]; (_, Acc) -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 03004e10..60c3dfd2 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -248,7 +248,6 @@ ram_msg_count, ram_msg_count_prev, ram_ack_count_prev, - ram_index_count, out_counter, in_counter, rates, @@ -336,7 +335,6 @@ target_ram_count :: non_neg_integer() | 'infinity', ram_msg_count :: non_neg_integer(), ram_msg_count_prev :: non_neg_integer(), - ram_index_count :: non_neg_integer(), out_counter :: non_neg_integer(), in_counter :: non_neg_integer(), rates :: rates(), @@ -490,7 +488,6 @@ purge(State = #vqstate { q4 = Q4, index_state = IndexState3, len = 0, ram_msg_count = 0, - ram_index_count = 0, persistent_count = PCount1 })}. publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, @@ -718,6 +715,7 @@ needs_timeout(State) -> fun (_Quota, State1) -> {0, State1} end, fun (_Quota, State1) -> State1 end, fun (_Quota, State1) -> {0, State1} end, + fun null_gamma_delta/1, State) of {true, _State} -> idle; {false, _State} -> false @@ -725,6 +723,21 @@ needs_timeout(State) -> true -> timed end. +null_gamma_delta(#vqstate { q2 = Q2, q3 = Q3 } = State) -> + {null_gamma_delta_msg(?QUEUE:out(Q2), ?QUEUE:out(Q2), + fun (SeqId) -> SeqId end) orelse + null_gamma_delta_msg(?QUEUE:out_r(Q3), ?QUEUE:out(Q3), + fun rabbit_queue_index:next_segment_boundary/1), + State}. + +null_gamma_delta_msg({{value, #msg_status { seq_id = SeqId1, + index_on_disk = true }}, _Q}, + {{value, #msg_status { seq_id = SeqId2 }}, _Q2}, + LimitFun) -> + SeqId1 >= LimitFun(SeqId2); +null_gamma_delta_msg(_, _, _) -> + false. + timeout(State) -> a(reduce_memory_use(confirm_commit_index(State))). @@ -738,7 +751,6 @@ status(#vqstate { ram_ack_index = RAI, target_ram_count = TargetRamCount, ram_msg_count = RamMsgCount, - ram_index_count = RamIndexCount, next_seq_id = NextSeqId, persistent_count = PersistentCount, rates = #rates { avg_egress = AvgEgressRate, @@ -755,7 +767,6 @@ status(#vqstate { {target_ram_count , TargetRamCount}, {ram_msg_count , RamMsgCount}, {ram_ack_count , gb_trees:size(RAI)}, - {ram_index_count , RamIndexCount}, {next_seq_id , NextSeqId}, {persistent_count , PersistentCount}, {avg_ingress_rate , AvgIngressRate}, @@ -774,10 +785,9 @@ discard(_Msg, _ChPid, State) -> State. %%---------------------------------------------------------------------------- a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, - len = Len, - persistent_count = PersistentCount, - ram_msg_count = RamMsgCount, - ram_index_count = RamIndexCount }) -> + len = Len, + persistent_count = PersistentCount, + ram_msg_count = RamMsgCount }) -> E1 = ?QUEUE:is_empty(Q1), E2 = ?QUEUE:is_empty(Q2), ED = Delta#delta.count == 0, @@ -793,7 +803,6 @@ a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, true = Len >= 0, true = PersistentCount >= 0, true = RamMsgCount >= 0, - true = RamIndexCount >= 0, State. @@ -910,44 +919,25 @@ betas_from_index_entries(List, TransientThreshold, PA, IndexState) -> {Filtered, rabbit_queue_index:ack( Acks, rabbit_queue_index:deliver(Delivers, IndexState))}. -%% the first arg is the older delta -combine_deltas(?BLANK_DELTA_PATTERN(X), ?BLANK_DELTA_PATTERN(Y)) -> - ?BLANK_DELTA; -combine_deltas(?BLANK_DELTA_PATTERN(X), #delta { start_seq_id = Start, - count = Count, - end_seq_id = End } = B) -> - true = Start + Count =< End, %% ASSERTION - B; -combine_deltas(#delta { start_seq_id = Start, - count = Count, - end_seq_id = End } = A, ?BLANK_DELTA_PATTERN(Y)) -> - true = Start + Count =< End, %% ASSERTION - A; -combine_deltas(#delta { start_seq_id = StartLow, - count = CountLow, - end_seq_id = EndLow }, - #delta { start_seq_id = StartHigh, - count = CountHigh, - end_seq_id = EndHigh }) -> - Count = CountLow + CountHigh, - true = (StartLow =< StartHigh) %% ASSERTIONS - andalso ((StartLow + CountLow) =< EndLow) - andalso ((StartHigh + CountHigh) =< EndHigh) - andalso ((StartLow + Count) =< EndHigh), - #delta { start_seq_id = StartLow, count = Count, end_seq_id = EndHigh }. - -expand_delta(SeqId, Delta) -> - DeltaInc = #delta { start_seq_id = SeqId, - count = 1, - end_seq_id = SeqId + 1 }, - case Delta of - ?BLANK_DELTA -> - DeltaInc; - #delta { start_seq_id = StartSeqId } when SeqId < StartSeqId -> - combine_deltas(DeltaInc, Delta); - #delta { end_seq_id = EndSeqId } when SeqId >= EndSeqId -> - combine_deltas(Delta, DeltaInc) - end. +expand_delta(SeqId, ?BLANK_DELTA_PATTERN(X)) -> + #delta { start_seq_id = SeqId, count = 1, end_seq_id = SeqId + 1 }; +expand_delta(SeqId, #delta { start_seq_id = StartSeqId, + count = Count, + end_seq_id = EndSeqId } = Delta) + when SeqId < StartSeqId -> + true = StartSeqId + Count =< EndSeqId, %% ASSERTION + Delta #delta { start_seq_id = SeqId, count = Count + 1 }; +expand_delta(SeqId, #delta { start_seq_id = StartSeqId, + count = Count, + end_seq_id = EndSeqId } = Delta) + when SeqId >= EndSeqId -> + true = StartSeqId + Count =< EndSeqId, %% ASSERTION + Delta #delta { count = Count + 1, end_seq_id = SeqId + 1 }; +expand_delta(_SeqId, #delta { start_seq_id = StartSeqId, + count = Count, + end_seq_id = EndSeqId } = Delta) -> + true = StartSeqId + Count + 1 =< EndSeqId, %% ASSERTION + Delta #delta { count = Count + 1 }. update_rate(Now, Then, Count, {OThen, OCount}) -> %% avg over the current period and the previous @@ -992,7 +982,6 @@ init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback, ram_msg_count = 0, ram_msg_count_prev = 0, ram_ack_count_prev = 0, - ram_index_count = 0, out_counter = 0, in_counter = 0, rates = blank_rate(Now, DeltaCount1), @@ -1012,12 +1001,10 @@ blank_rate(Timestamp, IngressLength) -> avg_ingress = 0.0, timestamp = Timestamp }. -in_r(MsgStatus = #msg_status { msg = undefined, index_on_disk = IndexOnDisk }, - State = #vqstate { q3 = Q3, q4 = Q4, ram_index_count = RamIndexCount }) -> +in_r(MsgStatus = #msg_status { msg = undefined }, + State = #vqstate { q3 = Q3, q4 = Q4 }) -> case ?QUEUE:is_empty(Q4) of - true -> State #vqstate { - q3 = ?QUEUE:in_r(MsgStatus, Q3), - ram_index_count = RamIndexCount + one_if(not IndexOnDisk) }; + true -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) }; false -> {MsgStatus1, State1 = #vqstate { q4 = Q4a }} = read_msg(MsgStatus, State), State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus1, Q4a) } @@ -1335,14 +1322,12 @@ publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) -> publish_alpha(MsgStatus, #vqstate {ram_msg_count = RamMsgCount } = State) -> {MsgStatus, State #vqstate { ram_msg_count = RamMsgCount + 1 }}. -publish_beta(#msg_status { msg_on_disk = MsgOnDisk } = MsgStatus, State) -> - {#msg_status { index_on_disk = IndexOnDisk, msg = Msg} = MsgStatus1, - #vqstate { ram_index_count = RamIndexCount, - ram_msg_count = RamMsgCount } = State1} = - maybe_write_to_disk(not MsgOnDisk, false, MsgStatus, State), +publish_beta(MsgStatus, State) -> + {#msg_status { msg = Msg} = MsgStatus1, + #vqstate { ram_msg_count = RamMsgCount } = State1} = + maybe_write_to_disk(true, false, MsgStatus, State), {MsgStatus1, State1 #vqstate { - ram_msg_count = RamMsgCount + one_if(Msg =/= undefined), - ram_index_count = RamIndexCount + one_if(not IndexOnDisk) }}. + ram_msg_count = RamMsgCount + one_if(Msg =/= undefined) }}. %% Rebuild queue, inserting sequence ids to maintain ordering queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, MsgPropsFun, State) -> @@ -1375,14 +1360,10 @@ delta_merge([], Delta, MsgIds, _MsgPropsFun, State) -> {Delta, MsgIds, State}; delta_merge(SeqIds, Delta, MsgIds, MsgPropsFun, State) -> lists:foldl(fun (SeqId, {Delta0, MsgIds0, State0}) -> - {#msg_status { msg_id = MsgId, - index_on_disk = IndexOnDisk, - msg_on_disk = MsgOnDisk} = MsgStatus, - State1} = + {#msg_status { msg_id = MsgId } = MsgStatus, State1} = msg_from_pending_ack(SeqId, MsgPropsFun, State0), {_MsgStatus, State2} = - maybe_write_to_disk(not MsgOnDisk, not IndexOnDisk, - MsgStatus, State1), + maybe_write_to_disk(true, true, MsgStatus, State1), {expand_delta(SeqId, Delta0), [MsgId | MsgIds0], State2} end, {Delta, MsgIds, State}, SeqIds). @@ -1426,10 +1407,10 @@ delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId. %% one segment's worth of messages in q3 - and thus would risk %% perpetually reporting the need for a conversion when no such %% conversion is needed. That in turn could cause an infinite loop. -reduce_memory_use(_AlphaBetaFun, _BetaDeltaFun, _AckFun, +reduce_memory_use(_AlphaBetaFun, _BetaDeltaFun, _AckFun, _GammaDeltaFun, State = #vqstate {target_ram_count = infinity}) -> {false, State}; -reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun, +reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun, GammaDeltaFun, State = #vqstate { ram_ack_index = RamAckIndex, ram_msg_count = RamMsgCount, @@ -1440,7 +1421,7 @@ reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun, avg_egress = AvgAckEgress } }) -> - {Reduce, State1} = + {Reduce, State1 = #vqstate { q2 = Q2, q3 = Q3 }} = case chunk_size(RamMsgCount + gb_trees:size(RamAckIndex), TargetRamCount) of 0 -> {false, State}; @@ -1461,15 +1442,14 @@ reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun, {true, State2} end, - %% AlphaBetaFun may have produced gammas that are bordering - %% delta. We must ensure that we push these into delta, which is - %% largely a no-op. This is why we call BetaDeltaFun even with a - %% quota of 0. - case chunk_size(State1 #vqstate.ram_index_count, - permitted_beta_count(State1)) of - ?IO_BATCH_SIZE = S2 -> {true, BetaDeltaFun(S2, State1)}; - _ -> {Reduce, BetaDeltaFun(0, State1)} - end. + {Reduce1, State3} = + case chunk_size(?QUEUE:len(Q2) + ?QUEUE:len(Q3), + permitted_beta_count(State1)) of + ?IO_BATCH_SIZE = S2 -> {true, BetaDeltaFun(S2, State1)}; + _ -> {Reduce, State1} + end, + {Reduce2, State4} = GammaDeltaFun(State3), + {Reduce1 orelse Reduce2, State4}. limit_ram_acks(0, State) -> {0, State}; @@ -1494,23 +1474,20 @@ reduce_memory_use(State) -> {_, State1} = reduce_memory_use(fun push_alphas_to_betas/2, fun push_betas_to_deltas/2, fun limit_ram_acks/2, + fun push_gammas_to_deltas/1, State), State1. permitted_beta_count(#vqstate { len = 0 }) -> infinity; +permitted_beta_count(#vqstate { target_ram_count = 0 }) -> + rabbit_queue_index:next_segment_boundary(0); permitted_beta_count(#vqstate { len = Len, q1 = Q1, - q3 = Q3, q4 = Q4 }) -> BetaDeltaLen = Len - ?QUEUE:len(Q1) - ?QUEUE:len(Q4), - Permitted = BetaDeltaLen - trunc(BetaDeltaLen * BetaDeltaLen / Len), - case ?QUEUE:out(Q3) of - {empty, _Q3} -> Permitted; - {{value, #msg_status { seq_id = MinSeqId }}, _Q3} -> - lists:max([Permitted, rabbit_queue_index:next_segment_boundary( - MinSeqId) - MinSeqId]) - end. + lists:max([BetaDeltaLen - ((BetaDeltaLen * BetaDeltaLen) div Len), + rabbit_queue_index:next_segment_boundary(0)]). chunk_size(Current, Permitted) when Permitted =:= infinity orelse Permitted >= Current -> @@ -1518,41 +1495,35 @@ chunk_size(Current, Permitted) chunk_size(Current, Permitted) -> lists:min([Current - Permitted, ?IO_BATCH_SIZE]). -fetch_from_q3(State = #vqstate { - q1 = Q1, - q2 = Q2, - delta = #delta { count = DeltaCount }, - q3 = Q3, - q4 = Q4, - ram_index_count = RamIndexCount}) -> +fetch_from_q3(State = #vqstate { q1 = Q1, + q2 = Q2, + delta = #delta { count = DeltaCount }, + q3 = Q3, + q4 = Q4 }) -> case ?QUEUE:out(Q3) of {empty, _Q3} -> {empty, State}; - {{value, MsgStatus = #msg_status { index_on_disk = IndexOnDisk }}, Q3a} -> - RamIndexCount1 = RamIndexCount - one_if(not IndexOnDisk), - true = RamIndexCount1 >= 0, %% ASSERTION - State1 = State #vqstate { q3 = Q3a, - ram_index_count = RamIndexCount1 }, - State2 = - case {?QUEUE:is_empty(Q3a), 0 == DeltaCount} of - {true, true} -> - %% q3 is now empty, it wasn't before; delta is - %% still empty. So q2 must be empty, and we - %% know q4 is empty otherwise we wouldn't be - %% loading from q3. As such, we can just set - %% q4 to Q1. - true = ?QUEUE:is_empty(Q2), %% ASSERTION - true = ?QUEUE:is_empty(Q4), %% ASSERTION - State1 #vqstate { q1 = ?QUEUE:new(), - q4 = Q1 }; - {true, false} -> - maybe_deltas_to_betas(State1); - {false, _} -> - %% q3 still isn't empty, we've not touched - %% delta, so the invariants between q1, q2, - %% delta and q3 are maintained - State1 - end, + {{value, MsgStatus}, Q3a} -> + State1 = State #vqstate { q3 = Q3a }, + State2 = case {?QUEUE:is_empty(Q3a), 0 == DeltaCount} of + {true, true} -> + %% q3 is now empty, it wasn't before; + %% delta is still empty. So q2 must be + %% empty, and we know q4 is empty + %% otherwise we wouldn't be loading from + %% q3. As such, we can just set q4 to Q1. + true = ?QUEUE:is_empty(Q2), %% ASSERTION + true = ?QUEUE:is_empty(Q4), %% ASSERTION + State1 #vqstate { q1 = ?QUEUE:new(), q4 = Q1 }; + {true, false} -> + maybe_deltas_to_betas(State1); + {false, _} -> + %% q3 still isn't empty, we've not + %% touched delta, so the invariants + %% between q1, q2, delta and q3 are + %% maintained + State1 + end, {loaded, {MsgStatus, State2}} end. @@ -1639,42 +1610,35 @@ maybe_push_alphas_to_betas(Generator, Consumer, Quota, Q, State) -> {empty, _Q} -> {Quota, State}; {{value, MsgStatus}, Qa} -> - {MsgStatus1 = #msg_status { msg_on_disk = true, - index_on_disk = IndexOnDisk }, - State1 = #vqstate { ram_msg_count = RamMsgCount, - ram_index_count = RamIndexCount }} = + {MsgStatus1 = #msg_status { msg_on_disk = true }, + State1 = #vqstate { ram_msg_count = RamMsgCount }} = maybe_write_to_disk(true, false, MsgStatus, State), MsgStatus2 = m(trim_msg_status(MsgStatus1)), - RamIndexCount1 = RamIndexCount + one_if(not IndexOnDisk), - State2 = State1 #vqstate { ram_msg_count = RamMsgCount - 1, - ram_index_count = RamIndexCount1 }, + State2 = State1 #vqstate { ram_msg_count = RamMsgCount - 1 }, maybe_push_alphas_to_betas(Generator, Consumer, Quota - 1, Qa, Consumer(MsgStatus2, Qa, State2)) end. -push_betas_to_deltas(Quota, - State = #vqstate { q2 = Q2, - delta = Delta, - q3 = Q3, - index_state = IndexState, - ram_index_count = RamIndexCount }) -> - PushState = {Quota, Delta, RamIndexCount, IndexState}, - {Q2a, PushState1} = push_betas_to_deltas( +push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2, + delta = Delta, + q3 = Q3, + index_state = IndexState }) -> + PushState = {Quota, Delta, IndexState}, + {Q2a, PushState1} = push_with_limit( fun ?QUEUE:out/1, fun (Q2MinSeqId) -> Q2MinSeqId end, - Q2, PushState), - {Q3a, PushState2} = push_betas_to_deltas( + Q2, fun push_betas_to_deltas1/4, PushState), + {Q3a, PushState2} = push_with_limit( fun ?QUEUE:out_r/1, fun rabbit_queue_index:next_segment_boundary/1, - Q3, PushState1), - {_, Delta1, RamIndexCount1, IndexState1} = PushState2, - State #vqstate { q2 = Q2a, - delta = Delta1, - q3 = Q3a, - index_state = IndexState1, - ram_index_count = RamIndexCount1 }. - -push_betas_to_deltas(Generator, LimitFun, Q, PushState) -> + Q3, fun push_betas_to_deltas1/4, PushState1), + {_, Delta1, IndexState1} = PushState2, + State #vqstate { q2 = Q2a, + delta = Delta1, + q3 = Q3a, + index_state = IndexState1 }. + +push_with_limit(Generator, LimitFun, Q, PushFun, PushState) -> case ?QUEUE:is_empty(Q) of true -> {Q, PushState}; @@ -1684,16 +1648,15 @@ push_betas_to_deltas(Generator, LimitFun, Q, PushState) -> Limit = LimitFun(MinSeqId), case MaxSeqId < Limit of true -> {Q, PushState}; - false -> push_betas_to_deltas1(Generator, Limit, Q, PushState) + false -> PushFun(Generator, Limit, Q, PushState) end end. +push_betas_to_deltas1(_Generator, _Limit, Q, + {0, _Delta, _IndexState} = PushState) -> + {Q, PushState}; push_betas_to_deltas1(Generator, Limit, Q, - {0, Delta, RamIndexCount, IndexState}) -> - {Qb, Delta1} = push_gammas_to_deltas(Generator, Limit, Q, Delta), - {Qb, {0, Delta1, RamIndexCount, IndexState}}; -push_betas_to_deltas1(Generator, Limit, Q, - {Quota, Delta, RamIndexCount, IndexState} = PushState) -> + {Quota, Delta, IndexState} = PushState) -> case Generator(Q) of {empty, _Q} -> {Q, PushState}; @@ -1702,20 +1665,33 @@ push_betas_to_deltas1(Generator, Limit, Q, {Q, PushState}; {{value, MsgStatus = #msg_status { index_on_disk = IndexOnDisk, seq_id = SeqId }}, Qa} -> - {Quota1, RamIndexCount1, IndexState1} = + {Quota1, IndexState1} = case IndexOnDisk of - true -> {Quota, RamIndexCount, IndexState}; + true -> {Quota, IndexState}; false -> {#msg_status { index_on_disk = true }, IndexState2} = maybe_write_index_to_disk(true, MsgStatus, IndexState), - {Quota - 1, RamIndexCount - 1, IndexState2} + {Quota - 1, IndexState2} end, Delta1 = expand_delta(SeqId, Delta), push_betas_to_deltas1(Generator, Limit, Qa, - {Quota1, Delta1, RamIndexCount1, IndexState1}) + {Quota1, Delta1, IndexState1}) end. +push_gammas_to_deltas(State = #vqstate { q2 = Q2, + delta = Delta, + q3 = Q3 }) -> + {Q2a, Delta1} = push_with_limit( + fun ?QUEUE:out/1, + fun (Q2MinSeqId) -> Q2MinSeqId end, + Q2, fun push_gammas_to_deltas/4, Delta), + {Q3a, Delta2} = push_with_limit( + fun ?QUEUE:out_r/1, + fun rabbit_queue_index:next_segment_boundary/1, + Q3, fun push_gammas_to_deltas/4, Delta1), + {Delta2 =/= Delta, State #vqstate { q2 = Q2a, delta = Delta2, q3 = Q3a }}. + push_gammas_to_deltas(Generator, Limit, Q, Delta) -> case Generator(Q) of {{value, #msg_status { seq_id = SeqId, index_on_disk = true }}, Q1} |