diff options
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 5 | ||||
-rw-r--r-- | src/rabbit_backing_queue.erl | 5 | ||||
-rw-r--r-- | src/rabbit_backing_queue_qc.erl | 21 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 6 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 75 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 156 |
6 files changed, 136 insertions, 132 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index d415b358..1bb16edb 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1241,9 +1241,10 @@ handle_info(timeout, State) -> handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; -handle_info({bump_credit, Msg}, State) -> +handle_info({bump_credit, Msg}, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> credit_flow:handle_bump_msg(Msg), - noreply(State); + noreply(State#q{backing_queue_state = BQ:resume(BQS)}); handle_info(Info, State) -> {stop, {unhandled_info, Info}, State}. diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 3d88be7a..cae939ba 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -209,6 +209,9 @@ %% Called immediately before the queue hibernates. -callback handle_pre_hibernate(state()) -> state(). +%% Called when more credit has become available for credit_flow. +-callback resume(state()) -> state(). + %% Used to help prioritisation in rabbit_amqqueue_process. The rate of %% inbound messages and outbound messages at the moment. -callback msg_rates(state()) -> {float(), float()}. @@ -240,7 +243,7 @@ behaviour_info(callbacks) -> {fetch, 2}, {ack, 2}, {requeue, 2}, {ackfold, 4}, {fold, 3}, {len, 1}, {is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2}, {ram_duration, 1}, {needs_timeout, 1}, {timeout, 1}, - {handle_pre_hibernate, 1}, {msg_rates, 1}, {status, 1}, + {handle_pre_hibernate, 1}, {resume, 1}, {msg_rates, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2}] ; behaviour_info(_Other) -> undefined. diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl index b0545915..2adff7e5 100644 --- a/src/rabbit_backing_queue_qc.erl +++ b/src/rabbit_backing_queue_qc.erl @@ -34,7 +34,8 @@ -export([initial_state/0, command/1, precondition/2, postcondition/3, next_state/3]). --export([prop_backing_queue_test/0, publish_multiple/1, timeout/2]). +-export([prop_backing_queue_test/0, publish_multiple/1, + timeout/2, bump_credit/1]). -record(state, {bqstate, len, %% int @@ -106,6 +107,7 @@ command(S) -> {1, qc_dropwhile(S)}, {1, qc_is_empty(S)}, {1, qc_timeout(S)}, + {1, qc_bump_credit(S)}, {1, qc_purge(S)}, {1, qc_fold(S)}]). @@ -155,6 +157,9 @@ qc_is_empty(#state{bqstate = BQ}) -> qc_timeout(#state{bqstate = BQ}) -> {call, ?MODULE, timeout, [BQ, ?TIMEOUT_LIMIT]}. +qc_bump_credit(#state{bqstate = BQ}) -> + {call, ?MODULE, bump_credit, [BQ]}. + qc_purge(#state{bqstate = BQ}) -> {call, ?BQMOD, purge, [BQ]}. @@ -177,6 +182,8 @@ precondition(_S, {call, ?BQMOD, _Fun, _Arg}) -> true; precondition(_S, {call, ?MODULE, timeout, _Arg}) -> true; +precondition(_S, {call, ?MODULE, bump_credit, _Arg}) -> + true; precondition(#state{len = Len}, {call, ?MODULE, publish_multiple, _Arg}) -> Len < ?QUEUE_MAXLEN. @@ -272,6 +279,8 @@ next_state(S, _Res, {call, ?BQMOD, is_empty, _Args}) -> next_state(S, BQ, {call, ?MODULE, timeout, _Args}) -> S#state{bqstate = BQ}; +next_state(S, BQ, {call, ?MODULE, bump_credit, _Args}) -> + S#state{bqstate = BQ}; next_state(S, Res, {call, ?BQMOD, purge, _Args}) -> BQ1 = {call, erlang, element, [2, Res]}, @@ -354,6 +363,16 @@ timeout(BQ, AtMost) -> _ -> timeout(?BQMOD:timeout(BQ), AtMost - 1) end. +bump_credit(BQ) -> + case credit_flow:blocked() of + false -> BQ; + true -> receive + {bump_credit, Msg} -> + credit_flow:handle_bump_msg(Msg), + ?BQMOD:resume(BQ) + end + end. + qc_message_payload() -> ?SIZED(Size, resize(Size * Size, binary())). qc_routing_key() -> noshrink(binary(10)). diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 7bf6bd4a..b5ae1aca 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -21,7 +21,7 @@ discard/3, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, len/1, is_empty/1, depth/1, drain_confirmed/1, dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1, - needs_timeout/1, timeout/1, handle_pre_hibernate/1, + needs_timeout/1, timeout/1, handle_pre_hibernate/1, resume/1, msg_rates/1, status/1, invoke/3, is_duplicate/2]). -export([start/1, stop/0]). @@ -353,6 +353,10 @@ handle_pre_hibernate(State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> State #state { backing_queue_state = BQ:handle_pre_hibernate(BQS) }. +resume(State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + State #state { backing_queue_state = BQ:resume(BQS) }. + msg_rates(#state { backing_queue = BQ, backing_queue_state = BQS }) -> BQ:msg_rates(BQS). diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index ad4cd560..1552d1ec 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2413,18 +2413,19 @@ variable_queue_publish(IsPersistent, Count, PropFun, VQ) -> fun (_N) -> <<>> end, VQ). variable_queue_publish(IsPersistent, Start, Count, PropFun, PayloadFun, VQ) -> - lists:foldl( - fun (N, VQN) -> - rabbit_variable_queue:publish( - rabbit_basic:message( - rabbit_misc:r(<<>>, exchange, <<>>), - <<>>, #'P_basic'{delivery_mode = case IsPersistent of - true -> 2; - false -> 1 - end}, - PayloadFun(N)), - PropFun(N, #message_properties{}), false, self(), VQN) - end, VQ, lists:seq(Start, Start + Count - 1)). + variable_queue_wait_for_shuffling_end( + lists:foldl( + fun (N, VQN) -> + rabbit_variable_queue:publish( + rabbit_basic:message( + rabbit_misc:r(<<>>, exchange, <<>>), + <<>>, #'P_basic'{delivery_mode = case IsPersistent of + true -> 2; + false -> 1 + end}, + PayloadFun(N)), + PropFun(N, #message_properties{}), false, self(), VQN) + end, VQ, lists:seq(Start, Start + Count - 1))). variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) -> lists:foldl(fun (N, {VQN, AckTagsAcc}) -> @@ -2436,6 +2437,10 @@ variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) -> {VQM, [AckTagN | AckTagsAcc]} end, {VQ, []}, lists:seq(1, Count)). +variable_queue_set_ram_duration_target(Duration, VQ) -> + variable_queue_wait_for_shuffling_end( + rabbit_variable_queue:set_ram_duration_target(Duration, VQ)). + assert_prop(List, Prop, Value) -> Value = proplists:get_value(Prop, List). @@ -2550,10 +2555,10 @@ requeue_one_by_one(Acks, VQ) -> %% Create a vq with messages in q1, delta, and q3, and holes (in the %% form of pending acks) in the latter two. variable_queue_with_holes(VQ0) -> - Interval = 64, + Interval = 2048, %% should match vq:IO_BATCH_SIZE Count = rabbit_queue_index:next_segment_boundary(0)*2 + 2 * Interval, Seq = lists:seq(1, Count), - VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0), + VQ1 = variable_queue_set_ram_duration_target(0, VQ0), VQ2 = variable_queue_publish( false, 1, Count, fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ1), @@ -2567,12 +2572,12 @@ variable_queue_with_holes(VQ0) -> {_MsgIds, VQ4} = rabbit_variable_queue:requeue( Acks -- (Subset1 ++ Subset2 ++ Subset3), VQ3), VQ5 = requeue_one_by_one(Subset1, VQ4), - %% by now we have some messages (and holes) in delt + %% by now we have some messages (and holes) in delta VQ6 = requeue_one_by_one(Subset2, VQ5), - VQ7 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ6), + VQ7 = variable_queue_set_ram_duration_target(infinity, VQ6), %% add the q1 tail VQ8 = variable_queue_publish( - true, Count + 1, 64, + true, Count + 1, Interval, fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ7), %% assertions [false = case V of @@ -2581,11 +2586,11 @@ variable_queue_with_holes(VQ0) -> _ -> false end || {K, V} <- rabbit_variable_queue:status(VQ8), lists:member(K, [q1, delta, q3])], - Depth = Count + 64, + Depth = Count + Interval, Depth = rabbit_variable_queue:depth(VQ8), Len = Depth - length(Subset3), Len = rabbit_variable_queue:len(VQ8), - {Seq3, Seq -- Seq3, lists:seq(Count + 1, Count + 64), VQ8}. + {Seq3, Seq -- Seq3, lists:seq(Count + 1, Count + Interval), VQ8}. test_variable_queue_requeue(VQ0) -> {_PendingMsgs, RequeuedMsgs, FreshMsgs, VQ1} = @@ -2608,11 +2613,11 @@ test_variable_queue_requeue(VQ0) -> %% requeue from ram_pending_ack into q3, move to delta and then empty queue test_variable_queue_requeue_ram_beta(VQ0) -> Count = rabbit_queue_index:next_segment_boundary(0)*2 + 2, - VQ1 = rabbit_tests:variable_queue_publish(false, Count, VQ0), + VQ1 = variable_queue_publish(false, Count, VQ0), {VQ2, AcksR} = variable_queue_fetch(Count, false, false, Count, VQ1), {Back, Front} = lists:split(Count div 2, AcksR), {_, VQ3} = rabbit_variable_queue:requeue(erlang:tl(Back), VQ2), - VQ4 = rabbit_variable_queue:set_ram_duration_target(0, VQ3), + VQ4 = variable_queue_set_ram_duration_target(0, VQ3), {_, VQ5} = rabbit_variable_queue:requeue([erlang:hd(Back)], VQ4), VQ6 = requeue_one_by_one(Front, VQ5), {VQ7, AcksAll} = variable_queue_fetch(Count, false, true, Count, VQ6), @@ -2655,7 +2660,7 @@ test_variable_queue_ack_limiting(VQ0) -> %% ensure all acks go to disk on 0 duration target VQ6 = check_variable_queue_status( - rabbit_variable_queue:set_ram_duration_target(0, VQ5), + variable_queue_set_ram_duration_target(0, VQ5), [{len, Len div 2}, {target_ram_count, 0}, {ram_msg_count, 0}, @@ -2738,9 +2743,9 @@ test_fetchwhile_varying_ram_duration(VQ0) -> test_dropfetchwhile_varying_ram_duration(Fun, VQ0) -> VQ1 = variable_queue_publish(false, 1, VQ0), - VQ2 = rabbit_variable_queue:set_ram_duration_target(0, VQ1), + VQ2 = variable_queue_set_ram_duration_target(0, VQ1), VQ3 = Fun(VQ2), - VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3), + VQ4 = variable_queue_set_ram_duration_target(infinity, VQ3), VQ5 = variable_queue_publish(false, 1, VQ4), VQ6 = Fun(VQ5), VQ6. @@ -2761,7 +2766,7 @@ test_variable_queue_dynamic_duration_change(VQ0) -> {_Duration, VQ5} = rabbit_variable_queue:ram_duration(VQ4), io:format("~p:~n~p~n", [Duration1, rabbit_variable_queue:status(VQ5)]), - VQ6 = rabbit_variable_queue:set_ram_duration_target( + VQ6 = variable_queue_set_ram_duration_target( Duration1, VQ5), publish_fetch_and_ack(Churn, Len, VQ6) end, VQ3, [Duration / 4, 0, Duration / 4, infinity]), @@ -2789,12 +2794,12 @@ test_variable_queue_partial_segments_delta_thing(VQ0) -> VQ1 = variable_queue_publish(true, OneAndAHalfSegment, VQ0), {_Duration, VQ2} = rabbit_variable_queue:ram_duration(VQ1), VQ3 = check_variable_queue_status( - rabbit_variable_queue:set_ram_duration_target(0, VQ2), + variable_queue_set_ram_duration_target(0, VQ2), %% one segment in q3, and half a segment in delta [{delta, {delta, SegmentSize, HalfSegment, OneAndAHalfSegment}}, {q3, SegmentSize}, {len, SegmentSize + HalfSegment}]), - VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3), + VQ4 = variable_queue_set_ram_duration_target(infinity, VQ3), VQ5 = check_variable_queue_status( variable_queue_publish(true, 1, VQ4), %% one alpha, but it's in the same segment as the deltas @@ -2826,17 +2831,21 @@ check_variable_queue_status(VQ0, Props) -> VQ1. variable_queue_wait_for_shuffling_end(VQ) -> - case rabbit_variable_queue:needs_timeout(VQ) of + case credit_flow:blocked() of false -> VQ; - _ -> variable_queue_wait_for_shuffling_end( - rabbit_variable_queue:timeout(VQ)) + true -> receive + {bump_credit, Msg} -> + credit_flow:handle_bump_msg(Msg), + variable_queue_wait_for_shuffling_end( + rabbit_variable_queue:resume(VQ)) + end end. test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) -> Count = 2 * rabbit_queue_index:next_segment_boundary(0), VQ1 = variable_queue_publish(true, Count, VQ0), VQ2 = variable_queue_publish(false, Count, VQ1), - VQ3 = rabbit_variable_queue:set_ram_duration_target(0, VQ2), + VQ3 = variable_queue_set_ram_duration_target(0, VQ2), {VQ4, _AckTags} = variable_queue_fetch(Count, true, false, Count + Count, VQ3), {VQ5, _AckTags1} = variable_queue_fetch(Count, false, false, @@ -2846,13 +2855,13 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) -> {{_Msg1, true, _AckTag1}, VQ8} = rabbit_variable_queue:fetch(true, VQ7), Count1 = rabbit_variable_queue:len(VQ8), VQ9 = variable_queue_publish(false, 1, VQ8), - VQ10 = rabbit_variable_queue:set_ram_duration_target(0, VQ9), + VQ10 = variable_queue_set_ram_duration_target(0, VQ9), {VQ11, _AckTags2} = variable_queue_fetch(Count1, true, true, Count, VQ10), {VQ12, _AckTags3} = variable_queue_fetch(1, false, false, 1, VQ11), VQ12. test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) -> - VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0), + VQ1 = variable_queue_set_ram_duration_target(0, VQ0), VQ2 = variable_queue_publish(false, 4, VQ1), {VQ3, AckTags} = variable_queue_fetch(2, false, false, 4, VQ2), {_Guids, VQ4} = diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 995c7319..d0336de1 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -18,10 +18,10 @@ -export([init/3, terminate/2, delete_and_terminate/2, purge/1, purge_acks/1, publish/5, publish_delivered/4, discard/3, drain_confirmed/1, - dropwhile/2, fetchwhile/4, - fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, len/1, - is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1, - needs_timeout/1, timeout/1, handle_pre_hibernate/1, msg_rates/1, + dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/2, + ackfold/4, fold/3, len/1, is_empty/1, depth/1, + set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, + handle_pre_hibernate/1, resume/1, msg_rates/1, status/1, invoke/3, is_duplicate/2, multiple_routing_keys/0]). -export([start/1, stop/0]). @@ -156,21 +156,19 @@ %% (betas+gammas+delta)/(target_ram_count+betas+gammas+delta). I.e. as %% the target_ram_count shrinks to 0, so must betas and gammas. %% -%% The conversion of betas to gammas is done in batches of exactly +%% The conversion of betas to gammas is done in batches of at least %% ?IO_BATCH_SIZE. This value should not be too small, otherwise the %% frequent operations on the queues of q2 and q3 will not be %% effectively amortised (switching the direction of queue access -%% defeats amortisation), nor should it be too big, otherwise -%% converting a batch stalls the queue for too long. Therefore, it -%% must be just right. +%% defeats amortisation). Note that there is a natural upper bound due +%% to credit_flow limits on the alpha to beta conversion. %% -%% The conversion from alphas to betas is also chunked, but only to -%% ensure no more than ?IO_BATCH_SIZE alphas are converted to betas at -%% any one time. This further smooths the effects of changes to the -%% target_ram_count and ensures the queue remains responsive -%% even when there is a large amount of IO work to do. The -%% timeout callback is utilised to ensure that conversions are -%% done as promptly as possible whilst ensuring the queue remains +%% The conversion from alphas to betas is chunked due to the +%% credit_flow limits of the msg_store. This further smooths the +%% effects of changes to the target_ram_count and ensures the queue +%% remains responsive even when there is a large amount of IO work to +%% do. The 'resume' callback is utilised to ensure that conversions +%% are done as promptly as possible whilst ensuring the queue remains %% responsive. %% %% In the queue we keep track of both messages that are pending @@ -196,13 +194,7 @@ %% The order in which alphas are pushed to betas and pending acks %% are pushed to disk is determined dynamically. We always prefer to %% push messages for the source (alphas or acks) that is growing the -%% fastest (with growth measured as avg. ingress - avg. egress). In -%% each round of memory reduction a chunk of messages at most -%% ?IO_BATCH_SIZE in size is allocated to be pushed to disk. The -%% fastest growing source will be reduced by as much of this chunk as -%% possible. If there is any remaining allocation in the chunk after -%% the first source has been reduced to zero, the second source will -%% be reduced by as much of the remaining chunk as possible. +%% fastest (with growth measured as avg. ingress - avg. egress). %% %% Notes on Clean Shutdown %% (This documents behaviour in variable_queue, queue_index and @@ -299,13 +291,10 @@ end_seq_id %% end_seq_id is exclusive }). -%% When we discover, on publish, that we should write some indices to -%% disk for some betas, the IO_BATCH_SIZE sets the number of betas -%% that we must be due to write indices for before we do any work at -%% all. This is both a minimum and a maximum - we don't write fewer -%% than IO_BATCH_SIZE indices out in one go, and we don't write more - -%% we can always come back on the next publish to do more. --define(IO_BATCH_SIZE, 64). +%% When we discover that we should write some indices to disk for some +%% betas, the IO_BATCH_SIZE sets the number of betas that we must be +%% due to write indices for before we do any work at all. +-define(IO_BATCH_SIZE, 2048). %% next power-of-2 after ?CREDIT_DISC_BOUND -define(PERSISTENT_MSG_STORE, msg_store_persistent). -define(TRANSIENT_MSG_STORE, msg_store_transient). -define(QUEUE, lqueue). @@ -813,30 +802,21 @@ ram_duration(State) -> {Duration, State1}. -needs_timeout(State = #vqstate { index_state = IndexState, - target_ram_count = TargetRamCount }) -> +needs_timeout(#vqstate { index_state = IndexState }) -> case rabbit_queue_index:needs_sync(IndexState) of - confirms -> timed; - other -> idle; - false when TargetRamCount == infinity -> false; - false -> case reduce_memory_use( - fun (_Quota, State1) -> {0, State1} end, - fun (_Quota, State1) -> State1 end, - fun (_Quota, State1) -> {0, State1} end, - State) of - {true, _State} -> idle; - {false, _State} -> false - end + confirms -> timed; + other -> idle; + false -> false end. timeout(State = #vqstate { index_state = IndexState }) -> - IndexState1 = rabbit_queue_index:sync(IndexState), - State1 = State #vqstate { index_state = IndexState1 }, - a(reduce_memory_use(State1)). + State #vqstate { index_state = rabbit_queue_index:sync(IndexState) }. handle_pre_hibernate(State = #vqstate { index_state = IndexState }) -> State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }. +resume(State) -> a(reduce_memory_use(State)). + msg_rates(#vqstate { rates = #rates { in = AvgIngressRate, out = AvgEgressRate } }) -> {AvgIngressRate, AvgEgressRate}. @@ -1553,27 +1533,9 @@ ifold(Fun, Acc, Its, State) -> %% Phase changes %%---------------------------------------------------------------------------- -%% Determine whether a reduction in memory use is necessary, and call -%% functions to perform the required phase changes. The function can -%% also be used to just do the former, by passing in dummy phase -%% change functions. -%% -%% The function does not report on any needed beta->delta conversions, -%% though the conversion function for that is called as necessary. The -%% reason is twofold. Firstly, this is safe because the conversion is -%% only ever necessary just after a transition to a -%% target_ram_count of zero or after an incremental alpha->beta -%% conversion. In the former case the conversion is performed straight -%% away (i.e. any betas present at the time are converted to deltas), -%% and in the latter case the need for a conversion is flagged up -%% anyway. Secondly, this is necessary because we do not have a -%% precise and cheap predicate for determining whether a beta->delta -%% conversion is necessary - due to the complexities of retaining up -%% one segment's worth of messages in q3 - and thus would risk -%% perpetually reporting the need for a conversion when no such -%% conversion is needed. That in turn could cause an infinite loop. -reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun, - State = #vqstate { +reduce_memory_use(State = #vqstate { target_ram_count = infinity }) -> + State; +reduce_memory_use(State = #vqstate { ram_pending_ack = RPA, ram_msg_count = RamMsgCount, target_ram_count = TargetRamCount, @@ -1582,28 +1544,38 @@ reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun, ack_in = AvgAckIngress, ack_out = AvgAckEgress } }) -> - {Reduce, State1 = #vqstate { q2 = Q2, q3 = Q3 }} = + State1 = #vqstate { q2 = Q2, q3 = Q3 } = case chunk_size(RamMsgCount + gb_trees:size(RPA), TargetRamCount) of - 0 -> {false, State}; + 0 -> State; %% Reduce memory of pending acks and alphas. The order is %% determined based on which is growing faster. Whichever %% comes second may very well get a quota of 0 if the %% first manages to push out the max number of messages. S1 -> Funs = case ((AvgAckIngress - AvgAckEgress) > (AvgIngress - AvgEgress)) of - true -> [AckFun, AlphaBetaFun]; - false -> [AlphaBetaFun, AckFun] + true -> [fun limit_ram_acks/2, + fun push_alphas_to_betas/2]; + false -> [fun push_alphas_to_betas/2, + fun limit_ram_acks/2] end, {_, State2} = lists:foldl(fun (ReduceFun, {QuotaN, StateN}) -> ReduceFun(QuotaN, StateN) end, {S1, State}, Funs), - {true, State2} + State2 end, case chunk_size(?QUEUE:len(Q2) + ?QUEUE:len(Q3), permitted_beta_count(State1)) of - ?IO_BATCH_SIZE = S2 -> {true, BetaDeltaFun(S2, State1)}; - _ -> {Reduce, State1} + S2 when S2 >= ?IO_BATCH_SIZE -> + %% There is an implicit, but subtle, upper bound here. We + %% may shuffle a lot of messages from Q2/3 into delta, but + %% the number of these that require any disk operation, + %% namely index writing, i.e. messages that are genuine + %% betas and not gammas, is bounded by the credit_flow + %% limiting of the alpha->beta conversion above. + push_betas_to_deltas(S2, State1); + _ -> + State1 end. limit_ram_acks(0, State) -> @@ -1623,15 +1595,6 @@ limit_ram_acks(Quota, State = #vqstate { ram_pending_ack = RPA, disk_pending_ack = DPA1 }) end. -reduce_memory_use(State = #vqstate { target_ram_count = infinity }) -> - State; -reduce_memory_use(State) -> - {_, State1} = reduce_memory_use(fun push_alphas_to_betas/2, - fun push_betas_to_deltas/2, - fun limit_ram_acks/2, - State), - State1. - permitted_beta_count(#vqstate { len = 0 }) -> infinity; permitted_beta_count(#vqstate { target_ram_count = 0, q3 = Q3 }) -> @@ -1649,7 +1612,7 @@ chunk_size(Current, Permitted) when Permitted =:= infinity orelse Permitted >= Current -> 0; chunk_size(Current, Permitted) -> - lists:min([Current - Permitted, ?IO_BATCH_SIZE]). + Current - Permitted. fetch_from_q3(State = #vqstate { q1 = Q1, q2 = Q2, @@ -1755,17 +1718,22 @@ push_alphas_to_betas(_Generator, _Consumer, Quota, _Q, TargetRamCount >= RamMsgCount -> {Quota, State}; push_alphas_to_betas(Generator, Consumer, Quota, Q, State) -> - case Generator(Q) of - {empty, _Q} -> - {Quota, State}; - {{value, MsgStatus}, Qa} -> - {MsgStatus1 = #msg_status { msg_on_disk = true }, - State1 = #vqstate { ram_msg_count = RamMsgCount }} = - maybe_write_to_disk(true, false, MsgStatus, State), - MsgStatus2 = m(trim_msg_status(MsgStatus1)), - State2 = State1 #vqstate { ram_msg_count = RamMsgCount - 1 }, - push_alphas_to_betas(Generator, Consumer, Quota - 1, Qa, - Consumer(MsgStatus2, Qa, State2)) + case credit_flow:blocked() of + true -> {Quota, State}; + false -> case Generator(Q) of + {empty, _Q} -> + {Quota, State}; + {{value, MsgStatus}, Qa} -> + {MsgStatus1 = #msg_status { msg_on_disk = true }, + State1 = #vqstate { ram_msg_count = RamMsgCount }} = + maybe_write_to_disk(true, false, MsgStatus, State), + MsgStatus2 = m(trim_msg_status(MsgStatus1)), + State2 = Consumer(MsgStatus2, Qa, + State1 #vqstate { + ram_msg_count = RamMsgCount - 1 }), + push_alphas_to_betas(Generator, Consumer, Quota - 1, + Qa, State2) + end end. push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2, |