summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
-rw-r--r-- src/rabbit_amqqueue_process.erl 5
-rw-r--r-- src/rabbit_backing_queue.erl 5
-rw-r--r-- src/rabbit_backing_queue_qc.erl 21
-rw-r--r-- src/rabbit_mirror_queue_master.erl 6
-rw-r--r-- src/rabbit_tests.erl 75
-rw-r--r-- src/rabbit_variable_queue.erl 156
6 files changed, 136 insertions, 132 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index d415b358..1bb16edb 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1241,9 +1241,10 @@ handle_info(timeout, State) ->
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
-handle_info({bump_credit, Msg}, State) ->
+handle_info({bump_credit, Msg}, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
credit_flow:handle_bump_msg(Msg),
- noreply(State);
+ noreply(State#q{backing_queue_state = BQ:resume(BQS)});
handle_info(Info, State) ->
{stop, {unhandled_info, Info}, State}.
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 3d88be7a..cae939ba 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -209,6 +209,9 @@
%% Called immediately before the queue hibernates.
-callback handle_pre_hibernate(state()) -> state().
+%% Called when more credit has become available for credit_flow.
+-callback resume(state()) -> state().
+
%% Used to help prioritisation in rabbit_amqqueue_process. The rate of
%% inbound messages and outbound messages at the moment.
-callback msg_rates(state()) -> {float(), float()}.
@@ -240,7 +243,7 @@ behaviour_info(callbacks) ->
{fetch, 2}, {ack, 2}, {requeue, 2}, {ackfold, 4}, {fold, 3}, {len, 1},
{is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2},
{ram_duration, 1}, {needs_timeout, 1}, {timeout, 1},
- {handle_pre_hibernate, 1}, {msg_rates, 1}, {status, 1},
+ {handle_pre_hibernate, 1}, {resume, 1}, {msg_rates, 1}, {status, 1},
{invoke, 3}, {is_duplicate, 2}] ;
behaviour_info(_Other) ->
undefined.
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl
index b0545915..2adff7e5 100644
--- a/src/rabbit_backing_queue_qc.erl
+++ b/src/rabbit_backing_queue_qc.erl
@@ -34,7 +34,8 @@
-export([initial_state/0, command/1, precondition/2, postcondition/3,
next_state/3]).
--export([prop_backing_queue_test/0, publish_multiple/1, timeout/2]).
+-export([prop_backing_queue_test/0, publish_multiple/1,
+ timeout/2, bump_credit/1]).
-record(state, {bqstate,
len, %% int
@@ -106,6 +107,7 @@ command(S) ->
{1, qc_dropwhile(S)},
{1, qc_is_empty(S)},
{1, qc_timeout(S)},
+ {1, qc_bump_credit(S)},
{1, qc_purge(S)},
{1, qc_fold(S)}]).
@@ -155,6 +157,9 @@ qc_is_empty(#state{bqstate = BQ}) ->
qc_timeout(#state{bqstate = BQ}) ->
{call, ?MODULE, timeout, [BQ, ?TIMEOUT_LIMIT]}.
+qc_bump_credit(#state{bqstate = BQ}) ->
+ {call, ?MODULE, bump_credit, [BQ]}.
+
qc_purge(#state{bqstate = BQ}) ->
{call, ?BQMOD, purge, [BQ]}.
@@ -177,6 +182,8 @@ precondition(_S, {call, ?BQMOD, _Fun, _Arg}) ->
true;
precondition(_S, {call, ?MODULE, timeout, _Arg}) ->
true;
+precondition(_S, {call, ?MODULE, bump_credit, _Arg}) ->
+ true;
precondition(#state{len = Len}, {call, ?MODULE, publish_multiple, _Arg}) ->
Len < ?QUEUE_MAXLEN.
@@ -272,6 +279,8 @@ next_state(S, _Res, {call, ?BQMOD, is_empty, _Args}) ->
next_state(S, BQ, {call, ?MODULE, timeout, _Args}) ->
S#state{bqstate = BQ};
+next_state(S, BQ, {call, ?MODULE, bump_credit, _Args}) ->
+ S#state{bqstate = BQ};
next_state(S, Res, {call, ?BQMOD, purge, _Args}) ->
BQ1 = {call, erlang, element, [2, Res]},
@@ -354,6 +363,16 @@ timeout(BQ, AtMost) ->
_ -> timeout(?BQMOD:timeout(BQ), AtMost - 1)
end.
+bump_credit(BQ) ->
+ case credit_flow:blocked() of
+ false -> BQ;
+ true -> receive
+ {bump_credit, Msg} ->
+ credit_flow:handle_bump_msg(Msg),
+ ?BQMOD:resume(BQ)
+ end
+ end.
+
qc_message_payload() -> ?SIZED(Size, resize(Size * Size, binary())).
qc_routing_key() -> noshrink(binary(10)).
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 7bf6bd4a..b5ae1aca 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -21,7 +21,7 @@
discard/3, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3,
len/1, is_empty/1, depth/1, drain_confirmed/1,
dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1,
- needs_timeout/1, timeout/1, handle_pre_hibernate/1,
+ needs_timeout/1, timeout/1, handle_pre_hibernate/1, resume/1,
msg_rates/1, status/1, invoke/3, is_duplicate/2]).
-export([start/1, stop/0]).
@@ -353,6 +353,10 @@ handle_pre_hibernate(State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
State #state { backing_queue_state = BQ:handle_pre_hibernate(BQS) }.
+resume(State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ State #state { backing_queue_state = BQ:resume(BQS) }.
+
msg_rates(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
BQ:msg_rates(BQS).
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index ad4cd560..1552d1ec 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2413,18 +2413,19 @@ variable_queue_publish(IsPersistent, Count, PropFun, VQ) ->
fun (_N) -> <<>> end, VQ).
variable_queue_publish(IsPersistent, Start, Count, PropFun, PayloadFun, VQ) ->
- lists:foldl(
- fun (N, VQN) ->
- rabbit_variable_queue:publish(
- rabbit_basic:message(
- rabbit_misc:r(<<>>, exchange, <<>>),
- <<>>, #'P_basic'{delivery_mode = case IsPersistent of
- true -> 2;
- false -> 1
- end},
- PayloadFun(N)),
- PropFun(N, #message_properties{}), false, self(), VQN)
- end, VQ, lists:seq(Start, Start + Count - 1)).
+ variable_queue_wait_for_shuffling_end(
+ lists:foldl(
+ fun (N, VQN) ->
+ rabbit_variable_queue:publish(
+ rabbit_basic:message(
+ rabbit_misc:r(<<>>, exchange, <<>>),
+ <<>>, #'P_basic'{delivery_mode = case IsPersistent of
+ true -> 2;
+ false -> 1
+ end},
+ PayloadFun(N)),
+ PropFun(N, #message_properties{}), false, self(), VQN)
+ end, VQ, lists:seq(Start, Start + Count - 1))).
variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) ->
lists:foldl(fun (N, {VQN, AckTagsAcc}) ->
@@ -2436,6 +2437,10 @@ variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) ->
{VQM, [AckTagN | AckTagsAcc]}
end, {VQ, []}, lists:seq(1, Count)).
+variable_queue_set_ram_duration_target(Duration, VQ) ->
+ variable_queue_wait_for_shuffling_end(
+ rabbit_variable_queue:set_ram_duration_target(Duration, VQ)).
+
assert_prop(List, Prop, Value) ->
Value = proplists:get_value(Prop, List).
@@ -2550,10 +2555,10 @@ requeue_one_by_one(Acks, VQ) ->
%% Create a vq with messages in q1, delta, and q3, and holes (in the
%% form of pending acks) in the latter two.
variable_queue_with_holes(VQ0) ->
- Interval = 64,
+ Interval = 2048, %% should match vq:IO_BATCH_SIZE
Count = rabbit_queue_index:next_segment_boundary(0)*2 + 2 * Interval,
Seq = lists:seq(1, Count),
- VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0),
+ VQ1 = variable_queue_set_ram_duration_target(0, VQ0),
VQ2 = variable_queue_publish(
false, 1, Count,
fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ1),
@@ -2567,12 +2572,12 @@ variable_queue_with_holes(VQ0) ->
{_MsgIds, VQ4} = rabbit_variable_queue:requeue(
Acks -- (Subset1 ++ Subset2 ++ Subset3), VQ3),
VQ5 = requeue_one_by_one(Subset1, VQ4),
- %% by now we have some messages (and holes) in delt
+ %% by now we have some messages (and holes) in delta
VQ6 = requeue_one_by_one(Subset2, VQ5),
- VQ7 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ6),
+ VQ7 = variable_queue_set_ram_duration_target(infinity, VQ6),
%% add the q1 tail
VQ8 = variable_queue_publish(
- true, Count + 1, 64,
+ true, Count + 1, Interval,
fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ7),
%% assertions
[false = case V of
@@ -2581,11 +2586,11 @@ variable_queue_with_holes(VQ0) ->
_ -> false
end || {K, V} <- rabbit_variable_queue:status(VQ8),
lists:member(K, [q1, delta, q3])],
- Depth = Count + 64,
+ Depth = Count + Interval,
Depth = rabbit_variable_queue:depth(VQ8),
Len = Depth - length(Subset3),
Len = rabbit_variable_queue:len(VQ8),
- {Seq3, Seq -- Seq3, lists:seq(Count + 1, Count + 64), VQ8}.
+ {Seq3, Seq -- Seq3, lists:seq(Count + 1, Count + Interval), VQ8}.
test_variable_queue_requeue(VQ0) ->
{_PendingMsgs, RequeuedMsgs, FreshMsgs, VQ1} =
@@ -2608,11 +2613,11 @@ test_variable_queue_requeue(VQ0) ->
%% requeue from ram_pending_ack into q3, move to delta and then empty queue
test_variable_queue_requeue_ram_beta(VQ0) ->
Count = rabbit_queue_index:next_segment_boundary(0)*2 + 2,
- VQ1 = rabbit_tests:variable_queue_publish(false, Count, VQ0),
+ VQ1 = variable_queue_publish(false, Count, VQ0),
{VQ2, AcksR} = variable_queue_fetch(Count, false, false, Count, VQ1),
{Back, Front} = lists:split(Count div 2, AcksR),
{_, VQ3} = rabbit_variable_queue:requeue(erlang:tl(Back), VQ2),
- VQ4 = rabbit_variable_queue:set_ram_duration_target(0, VQ3),
+ VQ4 = variable_queue_set_ram_duration_target(0, VQ3),
{_, VQ5} = rabbit_variable_queue:requeue([erlang:hd(Back)], VQ4),
VQ6 = requeue_one_by_one(Front, VQ5),
{VQ7, AcksAll} = variable_queue_fetch(Count, false, true, Count, VQ6),
@@ -2655,7 +2660,7 @@ test_variable_queue_ack_limiting(VQ0) ->
%% ensure all acks go to disk on 0 duration target
VQ6 = check_variable_queue_status(
- rabbit_variable_queue:set_ram_duration_target(0, VQ5),
+ variable_queue_set_ram_duration_target(0, VQ5),
[{len, Len div 2},
{target_ram_count, 0},
{ram_msg_count, 0},
@@ -2738,9 +2743,9 @@ test_fetchwhile_varying_ram_duration(VQ0) ->
test_dropfetchwhile_varying_ram_duration(Fun, VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
- VQ2 = rabbit_variable_queue:set_ram_duration_target(0, VQ1),
+ VQ2 = variable_queue_set_ram_duration_target(0, VQ1),
VQ3 = Fun(VQ2),
- VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3),
+ VQ4 = variable_queue_set_ram_duration_target(infinity, VQ3),
VQ5 = variable_queue_publish(false, 1, VQ4),
VQ6 = Fun(VQ5),
VQ6.
@@ -2761,7 +2766,7 @@ test_variable_queue_dynamic_duration_change(VQ0) ->
{_Duration, VQ5} = rabbit_variable_queue:ram_duration(VQ4),
io:format("~p:~n~p~n",
[Duration1, rabbit_variable_queue:status(VQ5)]),
- VQ6 = rabbit_variable_queue:set_ram_duration_target(
+ VQ6 = variable_queue_set_ram_duration_target(
Duration1, VQ5),
publish_fetch_and_ack(Churn, Len, VQ6)
end, VQ3, [Duration / 4, 0, Duration / 4, infinity]),
@@ -2789,12 +2794,12 @@ test_variable_queue_partial_segments_delta_thing(VQ0) ->
VQ1 = variable_queue_publish(true, OneAndAHalfSegment, VQ0),
{_Duration, VQ2} = rabbit_variable_queue:ram_duration(VQ1),
VQ3 = check_variable_queue_status(
- rabbit_variable_queue:set_ram_duration_target(0, VQ2),
+ variable_queue_set_ram_duration_target(0, VQ2),
%% one segment in q3, and half a segment in delta
[{delta, {delta, SegmentSize, HalfSegment, OneAndAHalfSegment}},
{q3, SegmentSize},
{len, SegmentSize + HalfSegment}]),
- VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3),
+ VQ4 = variable_queue_set_ram_duration_target(infinity, VQ3),
VQ5 = check_variable_queue_status(
variable_queue_publish(true, 1, VQ4),
%% one alpha, but it's in the same segment as the deltas
@@ -2826,17 +2831,21 @@ check_variable_queue_status(VQ0, Props) ->
VQ1.
variable_queue_wait_for_shuffling_end(VQ) ->
- case rabbit_variable_queue:needs_timeout(VQ) of
+ case credit_flow:blocked() of
false -> VQ;
- _ -> variable_queue_wait_for_shuffling_end(
- rabbit_variable_queue:timeout(VQ))
+ true -> receive
+ {bump_credit, Msg} ->
+ credit_flow:handle_bump_msg(Msg),
+ variable_queue_wait_for_shuffling_end(
+ rabbit_variable_queue:resume(VQ))
+ end
end.
test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) ->
Count = 2 * rabbit_queue_index:next_segment_boundary(0),
VQ1 = variable_queue_publish(true, Count, VQ0),
VQ2 = variable_queue_publish(false, Count, VQ1),
- VQ3 = rabbit_variable_queue:set_ram_duration_target(0, VQ2),
+ VQ3 = variable_queue_set_ram_duration_target(0, VQ2),
{VQ4, _AckTags} = variable_queue_fetch(Count, true, false,
Count + Count, VQ3),
{VQ5, _AckTags1} = variable_queue_fetch(Count, false, false,
@@ -2846,13 +2855,13 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) ->
{{_Msg1, true, _AckTag1}, VQ8} = rabbit_variable_queue:fetch(true, VQ7),
Count1 = rabbit_variable_queue:len(VQ8),
VQ9 = variable_queue_publish(false, 1, VQ8),
- VQ10 = rabbit_variable_queue:set_ram_duration_target(0, VQ9),
+ VQ10 = variable_queue_set_ram_duration_target(0, VQ9),
{VQ11, _AckTags2} = variable_queue_fetch(Count1, true, true, Count, VQ10),
{VQ12, _AckTags3} = variable_queue_fetch(1, false, false, 1, VQ11),
VQ12.
test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
- VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0),
+ VQ1 = variable_queue_set_ram_duration_target(0, VQ0),
VQ2 = variable_queue_publish(false, 4, VQ1),
{VQ3, AckTags} = variable_queue_fetch(2, false, false, 4, VQ2),
{_Guids, VQ4} =
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 995c7319..d0336de1 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -18,10 +18,10 @@
-export([init/3, terminate/2, delete_and_terminate/2, purge/1, purge_acks/1,
publish/5, publish_delivered/4, discard/3, drain_confirmed/1,
- dropwhile/2, fetchwhile/4,
- fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, len/1,
- is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1,
- needs_timeout/1, timeout/1, handle_pre_hibernate/1, msg_rates/1,
+ dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/2,
+ ackfold/4, fold/3, len/1, is_empty/1, depth/1,
+ set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1,
+ handle_pre_hibernate/1, resume/1, msg_rates/1,
status/1, invoke/3, is_duplicate/2, multiple_routing_keys/0]).
-export([start/1, stop/0]).
@@ -156,21 +156,19 @@
%% (betas+gammas+delta)/(target_ram_count+betas+gammas+delta). I.e. as
%% the target_ram_count shrinks to 0, so must betas and gammas.
%%
-%% The conversion of betas to gammas is done in batches of exactly
+%% The conversion of betas to gammas is done in batches of at least
%% ?IO_BATCH_SIZE. This value should not be too small, otherwise the
%% frequent operations on the queues of q2 and q3 will not be
%% effectively amortised (switching the direction of queue access
-%% defeats amortisation), nor should it be too big, otherwise
-%% converting a batch stalls the queue for too long. Therefore, it
-%% must be just right.
+%% defeats amortisation). Note that there is a natural upper bound due
+%% to credit_flow limits on the alpha to beta conversion.
%%
-%% The conversion from alphas to betas is also chunked, but only to
-%% ensure no more than ?IO_BATCH_SIZE alphas are converted to betas at
-%% any one time. This further smooths the effects of changes to the
-%% target_ram_count and ensures the queue remains responsive
-%% even when there is a large amount of IO work to do. The
-%% timeout callback is utilised to ensure that conversions are
-%% done as promptly as possible whilst ensuring the queue remains
+%% The conversion from alphas to betas is chunked due to the
+%% credit_flow limits of the msg_store. This further smooths the
+%% effects of changes to the target_ram_count and ensures the queue
+%% remains responsive even when there is a large amount of IO work to
+%% do. The 'resume' callback is utilised to ensure that conversions
+%% are done as promptly as possible whilst ensuring the queue remains
%% responsive.
%%
%% In the queue we keep track of both messages that are pending
@@ -196,13 +194,7 @@
%% The order in which alphas are pushed to betas and pending acks
%% are pushed to disk is determined dynamically. We always prefer to
%% push messages for the source (alphas or acks) that is growing the
-%% fastest (with growth measured as avg. ingress - avg. egress). In
-%% each round of memory reduction a chunk of messages at most
-%% ?IO_BATCH_SIZE in size is allocated to be pushed to disk. The
-%% fastest growing source will be reduced by as much of this chunk as
-%% possible. If there is any remaining allocation in the chunk after
-%% the first source has been reduced to zero, the second source will
-%% be reduced by as much of the remaining chunk as possible.
+%% fastest (with growth measured as avg. ingress - avg. egress).
%%
%% Notes on Clean Shutdown
%% (This documents behaviour in variable_queue, queue_index and
@@ -299,13 +291,10 @@
end_seq_id %% end_seq_id is exclusive
}).
-%% When we discover, on publish, that we should write some indices to
-%% disk for some betas, the IO_BATCH_SIZE sets the number of betas
-%% that we must be due to write indices for before we do any work at
-%% all. This is both a minimum and a maximum - we don't write fewer
-%% than IO_BATCH_SIZE indices out in one go, and we don't write more -
-%% we can always come back on the next publish to do more.
--define(IO_BATCH_SIZE, 64).
+%% When we discover that we should write some indices to disk for some
+%% betas, the IO_BATCH_SIZE sets the number of betas that we must be
+%% due to write indices for before we do any work at all.
+-define(IO_BATCH_SIZE, 2048). %% next power-of-2 after ?CREDIT_DISC_BOUND
-define(PERSISTENT_MSG_STORE, msg_store_persistent).
-define(TRANSIENT_MSG_STORE, msg_store_transient).
-define(QUEUE, lqueue).
@@ -813,30 +802,21 @@ ram_duration(State) ->
{Duration, State1}.
-needs_timeout(State = #vqstate { index_state = IndexState,
- target_ram_count = TargetRamCount }) ->
+needs_timeout(#vqstate { index_state = IndexState }) ->
case rabbit_queue_index:needs_sync(IndexState) of
- confirms -> timed;
- other -> idle;
- false when TargetRamCount == infinity -> false;
- false -> case reduce_memory_use(
- fun (_Quota, State1) -> {0, State1} end,
- fun (_Quota, State1) -> State1 end,
- fun (_Quota, State1) -> {0, State1} end,
- State) of
- {true, _State} -> idle;
- {false, _State} -> false
- end
+ confirms -> timed;
+ other -> idle;
+ false -> false
end.
timeout(State = #vqstate { index_state = IndexState }) ->
- IndexState1 = rabbit_queue_index:sync(IndexState),
- State1 = State #vqstate { index_state = IndexState1 },
- a(reduce_memory_use(State1)).
+ State #vqstate { index_state = rabbit_queue_index:sync(IndexState) }.
handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }.
+resume(State) -> a(reduce_memory_use(State)).
+
msg_rates(#vqstate { rates = #rates { in = AvgIngressRate,
out = AvgEgressRate } }) ->
{AvgIngressRate, AvgEgressRate}.
@@ -1553,27 +1533,9 @@ ifold(Fun, Acc, Its, State) ->
%% Phase changes
%%----------------------------------------------------------------------------
-%% Determine whether a reduction in memory use is necessary, and call
-%% functions to perform the required phase changes. The function can
-%% also be used to just do the former, by passing in dummy phase
-%% change functions.
-%%
-%% The function does not report on any needed beta->delta conversions,
-%% though the conversion function for that is called as necessary. The
-%% reason is twofold. Firstly, this is safe because the conversion is
-%% only ever necessary just after a transition to a
-%% target_ram_count of zero or after an incremental alpha->beta
-%% conversion. In the former case the conversion is performed straight
-%% away (i.e. any betas present at the time are converted to deltas),
-%% and in the latter case the need for a conversion is flagged up
-%% anyway. Secondly, this is necessary because we do not have a
-%% precise and cheap predicate for determining whether a beta->delta
-%% conversion is necessary - due to the complexities of retaining up
-%% one segment's worth of messages in q3 - and thus would risk
-%% perpetually reporting the need for a conversion when no such
-%% conversion is needed. That in turn could cause an infinite loop.
-reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun,
- State = #vqstate {
+reduce_memory_use(State = #vqstate { target_ram_count = infinity }) ->
+ State;
+reduce_memory_use(State = #vqstate {
ram_pending_ack = RPA,
ram_msg_count = RamMsgCount,
target_ram_count = TargetRamCount,
@@ -1582,28 +1544,38 @@ reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun,
ack_in = AvgAckIngress,
ack_out = AvgAckEgress } }) ->
- {Reduce, State1 = #vqstate { q2 = Q2, q3 = Q3 }} =
+ State1 = #vqstate { q2 = Q2, q3 = Q3 } =
case chunk_size(RamMsgCount + gb_trees:size(RPA), TargetRamCount) of
- 0 -> {false, State};
+ 0 -> State;
%% Reduce memory of pending acks and alphas. The order is
%% determined based on which is growing faster. Whichever
%% comes second may very well get a quota of 0 if the
%% first manages to push out the max number of messages.
S1 -> Funs = case ((AvgAckIngress - AvgAckEgress) >
(AvgIngress - AvgEgress)) of
- true -> [AckFun, AlphaBetaFun];
- false -> [AlphaBetaFun, AckFun]
+ true -> [fun limit_ram_acks/2,
+ fun push_alphas_to_betas/2];
+ false -> [fun push_alphas_to_betas/2,
+ fun limit_ram_acks/2]
end,
{_, State2} = lists:foldl(fun (ReduceFun, {QuotaN, StateN}) ->
ReduceFun(QuotaN, StateN)
end, {S1, State}, Funs),
- {true, State2}
+ State2
end,
case chunk_size(?QUEUE:len(Q2) + ?QUEUE:len(Q3),
permitted_beta_count(State1)) of
- ?IO_BATCH_SIZE = S2 -> {true, BetaDeltaFun(S2, State1)};
- _ -> {Reduce, State1}
+ S2 when S2 >= ?IO_BATCH_SIZE ->
+ %% There is an implicit, but subtle, upper bound here. We
+ %% may shuffle a lot of messages from Q2/3 into delta, but
+ %% the number of these that require any disk operation,
+ %% namely index writing, i.e. messages that are genuine
+ %% betas and not gammas, is bounded by the credit_flow
+ %% limiting of the alpha->beta conversion above.
+ push_betas_to_deltas(S2, State1);
+ _ ->
+ State1
end.
limit_ram_acks(0, State) ->
@@ -1623,15 +1595,6 @@ limit_ram_acks(Quota, State = #vqstate { ram_pending_ack = RPA,
disk_pending_ack = DPA1 })
end.
-reduce_memory_use(State = #vqstate { target_ram_count = infinity }) ->
- State;
-reduce_memory_use(State) ->
- {_, State1} = reduce_memory_use(fun push_alphas_to_betas/2,
- fun push_betas_to_deltas/2,
- fun limit_ram_acks/2,
- State),
- State1.
-
permitted_beta_count(#vqstate { len = 0 }) ->
infinity;
permitted_beta_count(#vqstate { target_ram_count = 0, q3 = Q3 }) ->
@@ -1649,7 +1612,7 @@ chunk_size(Current, Permitted)
when Permitted =:= infinity orelse Permitted >= Current ->
0;
chunk_size(Current, Permitted) ->
- lists:min([Current - Permitted, ?IO_BATCH_SIZE]).
+ Current - Permitted.
fetch_from_q3(State = #vqstate { q1 = Q1,
q2 = Q2,
@@ -1755,17 +1718,22 @@ push_alphas_to_betas(_Generator, _Consumer, Quota, _Q,
TargetRamCount >= RamMsgCount ->
{Quota, State};
push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
- case Generator(Q) of
- {empty, _Q} ->
- {Quota, State};
- {{value, MsgStatus}, Qa} ->
- {MsgStatus1 = #msg_status { msg_on_disk = true },
- State1 = #vqstate { ram_msg_count = RamMsgCount }} =
- maybe_write_to_disk(true, false, MsgStatus, State),
- MsgStatus2 = m(trim_msg_status(MsgStatus1)),
- State2 = State1 #vqstate { ram_msg_count = RamMsgCount - 1 },
- push_alphas_to_betas(Generator, Consumer, Quota - 1, Qa,
- Consumer(MsgStatus2, Qa, State2))
+ case credit_flow:blocked() of
+ true -> {Quota, State};
+ false -> case Generator(Q) of
+ {empty, _Q} ->
+ {Quota, State};
+ {{value, MsgStatus}, Qa} ->
+ {MsgStatus1 = #msg_status { msg_on_disk = true },
+ State1 = #vqstate { ram_msg_count = RamMsgCount }} =
+ maybe_write_to_disk(true, false, MsgStatus, State),
+ MsgStatus2 = m(trim_msg_status(MsgStatus1)),
+ State2 = Consumer(MsgStatus2, Qa,
+ State1 #vqstate {
+ ram_msg_count = RamMsgCount - 1 }),
+ push_alphas_to_betas(Generator, Consumer, Quota - 1,
+ Qa, State2)
+ end
end.
push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2,