summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-10-11 16:47:12 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2011-10-11 16:47:12 +0100
commit00096f34cd13fd52a8870d2b329b1eea3aaaebc3 (patch)
treea9c25437687107a955f3d776c1a07c49e14411cc
parentd2b1d95293fab98b23a64aa7434c870398fee968 (diff)
downloadrabbitmq-server-00096f34cd13fd52a8870d2b329b1eea3aaaebc3.tar.gz
When under memory pressure, we want to limit the size of q2 and q3, not just of ?s. Simplify calculation of permitted length. Simplify (and correct) expansion of ?. We can now completely drop ram_index_count as we never care about just the number of ?s.
-rw-r--r--src/rabbit_tests.erl7
-rw-r--r--src/rabbit_variable_queue.erl286
2 files changed, 132 insertions, 161 deletions
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 606c4fe8..44c9b499 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2189,12 +2189,7 @@ test_variable_queue_requeue(VQ0) ->
Seq = lists:seq(1, Count),
VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0),
VQ2 = variable_queue_publish(false, Count, VQ1),
- {VQ3, Acks} = lists:foldl(
- fun (_N, {VQN, AckTags}) ->
- {{#basic_message{}, false, AckTag, _}, VQM} =
- rabbit_variable_queue:fetch(true, VQN),
- {VQM, [AckTag | AckTags]}
- end, {VQ2, []}, Seq),
+ {VQ3, Acks} = variable_queue_fetch(Count, false, false, Count, VQ2),
Subset = lists:foldl(fun ({Ack, N}, Acc) when N rem Interval == 0 ->
[Ack | Acc];
(_, Acc) ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 03004e10..60c3dfd2 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -248,7 +248,6 @@
ram_msg_count,
ram_msg_count_prev,
ram_ack_count_prev,
- ram_index_count,
out_counter,
in_counter,
rates,
@@ -336,7 +335,6 @@
target_ram_count :: non_neg_integer() | 'infinity',
ram_msg_count :: non_neg_integer(),
ram_msg_count_prev :: non_neg_integer(),
- ram_index_count :: non_neg_integer(),
out_counter :: non_neg_integer(),
in_counter :: non_neg_integer(),
rates :: rates(),
@@ -490,7 +488,6 @@ purge(State = #vqstate { q4 = Q4,
index_state = IndexState3,
len = 0,
ram_msg_count = 0,
- ram_index_count = 0,
persistent_count = PCount1 })}.
publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
@@ -718,6 +715,7 @@ needs_timeout(State) ->
fun (_Quota, State1) -> {0, State1} end,
fun (_Quota, State1) -> State1 end,
fun (_Quota, State1) -> {0, State1} end,
+ fun null_gamma_delta/1,
State) of
{true, _State} -> idle;
{false, _State} -> false
@@ -725,6 +723,21 @@ needs_timeout(State) ->
true -> timed
end.
+null_gamma_delta(#vqstate { q2 = Q2, q3 = Q3 } = State) ->
+ {null_gamma_delta_msg(?QUEUE:out(Q2), ?QUEUE:out(Q2),
+ fun (SeqId) -> SeqId end) orelse
+ null_gamma_delta_msg(?QUEUE:out_r(Q3), ?QUEUE:out(Q3),
+ fun rabbit_queue_index:next_segment_boundary/1),
+ State}.
+
+null_gamma_delta_msg({{value, #msg_status { seq_id = SeqId1,
+ index_on_disk = true }}, _Q},
+ {{value, #msg_status { seq_id = SeqId2 }}, _Q2},
+ LimitFun) ->
+ SeqId1 >= LimitFun(SeqId2);
+null_gamma_delta_msg(_, _, _) ->
+ false.
+
timeout(State) ->
a(reduce_memory_use(confirm_commit_index(State))).
@@ -738,7 +751,6 @@ status(#vqstate {
ram_ack_index = RAI,
target_ram_count = TargetRamCount,
ram_msg_count = RamMsgCount,
- ram_index_count = RamIndexCount,
next_seq_id = NextSeqId,
persistent_count = PersistentCount,
rates = #rates { avg_egress = AvgEgressRate,
@@ -755,7 +767,6 @@ status(#vqstate {
{target_ram_count , TargetRamCount},
{ram_msg_count , RamMsgCount},
{ram_ack_count , gb_trees:size(RAI)},
- {ram_index_count , RamIndexCount},
{next_seq_id , NextSeqId},
{persistent_count , PersistentCount},
{avg_ingress_rate , AvgIngressRate},
@@ -774,10 +785,9 @@ discard(_Msg, _ChPid, State) -> State.
%%----------------------------------------------------------------------------
a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
- len = Len,
- persistent_count = PersistentCount,
- ram_msg_count = RamMsgCount,
- ram_index_count = RamIndexCount }) ->
+ len = Len,
+ persistent_count = PersistentCount,
+ ram_msg_count = RamMsgCount }) ->
E1 = ?QUEUE:is_empty(Q1),
E2 = ?QUEUE:is_empty(Q2),
ED = Delta#delta.count == 0,
@@ -793,7 +803,6 @@ a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
true = Len >= 0,
true = PersistentCount >= 0,
true = RamMsgCount >= 0,
- true = RamIndexCount >= 0,
State.
@@ -910,44 +919,25 @@ betas_from_index_entries(List, TransientThreshold, PA, IndexState) ->
{Filtered, rabbit_queue_index:ack(
Acks, rabbit_queue_index:deliver(Delivers, IndexState))}.
-%% the first arg is the older delta
-combine_deltas(?BLANK_DELTA_PATTERN(X), ?BLANK_DELTA_PATTERN(Y)) ->
- ?BLANK_DELTA;
-combine_deltas(?BLANK_DELTA_PATTERN(X), #delta { start_seq_id = Start,
- count = Count,
- end_seq_id = End } = B) ->
- true = Start + Count =< End, %% ASSERTION
- B;
-combine_deltas(#delta { start_seq_id = Start,
- count = Count,
- end_seq_id = End } = A, ?BLANK_DELTA_PATTERN(Y)) ->
- true = Start + Count =< End, %% ASSERTION
- A;
-combine_deltas(#delta { start_seq_id = StartLow,
- count = CountLow,
- end_seq_id = EndLow },
- #delta { start_seq_id = StartHigh,
- count = CountHigh,
- end_seq_id = EndHigh }) ->
- Count = CountLow + CountHigh,
- true = (StartLow =< StartHigh) %% ASSERTIONS
- andalso ((StartLow + CountLow) =< EndLow)
- andalso ((StartHigh + CountHigh) =< EndHigh)
- andalso ((StartLow + Count) =< EndHigh),
- #delta { start_seq_id = StartLow, count = Count, end_seq_id = EndHigh }.
-
-expand_delta(SeqId, Delta) ->
- DeltaInc = #delta { start_seq_id = SeqId,
- count = 1,
- end_seq_id = SeqId + 1 },
- case Delta of
- ?BLANK_DELTA ->
- DeltaInc;
- #delta { start_seq_id = StartSeqId } when SeqId < StartSeqId ->
- combine_deltas(DeltaInc, Delta);
- #delta { end_seq_id = EndSeqId } when SeqId >= EndSeqId ->
- combine_deltas(Delta, DeltaInc)
- end.
+expand_delta(SeqId, ?BLANK_DELTA_PATTERN(X)) ->
+ #delta { start_seq_id = SeqId, count = 1, end_seq_id = SeqId + 1 };
+expand_delta(SeqId, #delta { start_seq_id = StartSeqId,
+ count = Count,
+ end_seq_id = EndSeqId } = Delta)
+ when SeqId < StartSeqId ->
+ true = StartSeqId + Count =< EndSeqId, %% ASSERTION
+ Delta #delta { start_seq_id = SeqId, count = Count + 1 };
+expand_delta(SeqId, #delta { start_seq_id = StartSeqId,
+ count = Count,
+ end_seq_id = EndSeqId } = Delta)
+ when SeqId >= EndSeqId ->
+ true = StartSeqId + Count =< EndSeqId, %% ASSERTION
+ Delta #delta { count = Count + 1, end_seq_id = SeqId + 1 };
+expand_delta(_SeqId, #delta { start_seq_id = StartSeqId,
+ count = Count,
+ end_seq_id = EndSeqId } = Delta) ->
+ true = StartSeqId + Count + 1 =< EndSeqId, %% ASSERTION
+ Delta #delta { count = Count + 1 }.
update_rate(Now, Then, Count, {OThen, OCount}) ->
%% avg over the current period and the previous
@@ -992,7 +982,6 @@ init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback,
ram_msg_count = 0,
ram_msg_count_prev = 0,
ram_ack_count_prev = 0,
- ram_index_count = 0,
out_counter = 0,
in_counter = 0,
rates = blank_rate(Now, DeltaCount1),
@@ -1012,12 +1001,10 @@ blank_rate(Timestamp, IngressLength) ->
avg_ingress = 0.0,
timestamp = Timestamp }.
-in_r(MsgStatus = #msg_status { msg = undefined, index_on_disk = IndexOnDisk },
- State = #vqstate { q3 = Q3, q4 = Q4, ram_index_count = RamIndexCount }) ->
+in_r(MsgStatus = #msg_status { msg = undefined },
+ State = #vqstate { q3 = Q3, q4 = Q4 }) ->
case ?QUEUE:is_empty(Q4) of
- true -> State #vqstate {
- q3 = ?QUEUE:in_r(MsgStatus, Q3),
- ram_index_count = RamIndexCount + one_if(not IndexOnDisk) };
+ true -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) };
false -> {MsgStatus1, State1 = #vqstate { q4 = Q4a }} =
read_msg(MsgStatus, State),
State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus1, Q4a) }
@@ -1335,14 +1322,12 @@ publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) ->
publish_alpha(MsgStatus, #vqstate {ram_msg_count = RamMsgCount } = State) ->
{MsgStatus, State #vqstate { ram_msg_count = RamMsgCount + 1 }}.
-publish_beta(#msg_status { msg_on_disk = MsgOnDisk } = MsgStatus, State) ->
- {#msg_status { index_on_disk = IndexOnDisk, msg = Msg} = MsgStatus1,
- #vqstate { ram_index_count = RamIndexCount,
- ram_msg_count = RamMsgCount } = State1} =
- maybe_write_to_disk(not MsgOnDisk, false, MsgStatus, State),
+publish_beta(MsgStatus, State) ->
+ {#msg_status { msg = Msg} = MsgStatus1,
+ #vqstate { ram_msg_count = RamMsgCount } = State1} =
+ maybe_write_to_disk(true, false, MsgStatus, State),
{MsgStatus1, State1 #vqstate {
- ram_msg_count = RamMsgCount + one_if(Msg =/= undefined),
- ram_index_count = RamIndexCount + one_if(not IndexOnDisk) }}.
+ ram_msg_count = RamMsgCount + one_if(Msg =/= undefined) }}.
%% Rebuild queue, inserting sequence ids to maintain ordering
queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, MsgPropsFun, State) ->
@@ -1375,14 +1360,10 @@ delta_merge([], Delta, MsgIds, _MsgPropsFun, State) ->
{Delta, MsgIds, State};
delta_merge(SeqIds, Delta, MsgIds, MsgPropsFun, State) ->
lists:foldl(fun (SeqId, {Delta0, MsgIds0, State0}) ->
- {#msg_status { msg_id = MsgId,
- index_on_disk = IndexOnDisk,
- msg_on_disk = MsgOnDisk} = MsgStatus,
- State1} =
+ {#msg_status { msg_id = MsgId } = MsgStatus, State1} =
msg_from_pending_ack(SeqId, MsgPropsFun, State0),
{_MsgStatus, State2} =
- maybe_write_to_disk(not MsgOnDisk, not IndexOnDisk,
- MsgStatus, State1),
+ maybe_write_to_disk(true, true, MsgStatus, State1),
{expand_delta(SeqId, Delta0), [MsgId | MsgIds0], State2}
end, {Delta, MsgIds, State}, SeqIds).
@@ -1426,10 +1407,10 @@ delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId.
%% one segment's worth of messages in q3 - and thus would risk
%% perpetually reporting the need for a conversion when no such
%% conversion is needed. That in turn could cause an infinite loop.
-reduce_memory_use(_AlphaBetaFun, _BetaDeltaFun, _AckFun,
+reduce_memory_use(_AlphaBetaFun, _BetaDeltaFun, _AckFun, _GammaDeltaFun,
State = #vqstate {target_ram_count = infinity}) ->
{false, State};
-reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun,
+reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun, GammaDeltaFun,
State = #vqstate {
ram_ack_index = RamAckIndex,
ram_msg_count = RamMsgCount,
@@ -1440,7 +1421,7 @@ reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun,
avg_egress = AvgAckEgress }
}) ->
- {Reduce, State1} =
+ {Reduce, State1 = #vqstate { q2 = Q2, q3 = Q3 }} =
case chunk_size(RamMsgCount + gb_trees:size(RamAckIndex),
TargetRamCount) of
0 -> {false, State};
@@ -1461,15 +1442,14 @@ reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun,
{true, State2}
end,
- %% AlphaBetaFun may have produced gammas that are bordering
- %% delta. We must ensure that we push these into delta, which is
- %% largely a no-op. This is why we call BetaDeltaFun even with a
- %% quota of 0.
- case chunk_size(State1 #vqstate.ram_index_count,
- permitted_beta_count(State1)) of
- ?IO_BATCH_SIZE = S2 -> {true, BetaDeltaFun(S2, State1)};
- _ -> {Reduce, BetaDeltaFun(0, State1)}
- end.
+ {Reduce1, State3} =
+ case chunk_size(?QUEUE:len(Q2) + ?QUEUE:len(Q3),
+ permitted_beta_count(State1)) of
+ ?IO_BATCH_SIZE = S2 -> {true, BetaDeltaFun(S2, State1)};
+ _ -> {Reduce, State1}
+ end,
+ {Reduce2, State4} = GammaDeltaFun(State3),
+ {Reduce1 orelse Reduce2, State4}.
limit_ram_acks(0, State) ->
{0, State};
@@ -1494,23 +1474,20 @@ reduce_memory_use(State) ->
{_, State1} = reduce_memory_use(fun push_alphas_to_betas/2,
fun push_betas_to_deltas/2,
fun limit_ram_acks/2,
+ fun push_gammas_to_deltas/1,
State),
State1.
permitted_beta_count(#vqstate { len = 0 }) ->
infinity;
+permitted_beta_count(#vqstate { target_ram_count = 0 }) ->
+ rabbit_queue_index:next_segment_boundary(0);
permitted_beta_count(#vqstate { len = Len,
q1 = Q1,
- q3 = Q3,
q4 = Q4 }) ->
BetaDeltaLen = Len - ?QUEUE:len(Q1) - ?QUEUE:len(Q4),
- Permitted = BetaDeltaLen - trunc(BetaDeltaLen * BetaDeltaLen / Len),
- case ?QUEUE:out(Q3) of
- {empty, _Q3} -> Permitted;
- {{value, #msg_status { seq_id = MinSeqId }}, _Q3} ->
- lists:max([Permitted, rabbit_queue_index:next_segment_boundary(
- MinSeqId) - MinSeqId])
- end.
+ lists:max([BetaDeltaLen - ((BetaDeltaLen * BetaDeltaLen) div Len),
+ rabbit_queue_index:next_segment_boundary(0)]).
chunk_size(Current, Permitted)
when Permitted =:= infinity orelse Permitted >= Current ->
@@ -1518,41 +1495,35 @@ chunk_size(Current, Permitted)
chunk_size(Current, Permitted) ->
lists:min([Current - Permitted, ?IO_BATCH_SIZE]).
-fetch_from_q3(State = #vqstate {
- q1 = Q1,
- q2 = Q2,
- delta = #delta { count = DeltaCount },
- q3 = Q3,
- q4 = Q4,
- ram_index_count = RamIndexCount}) ->
+fetch_from_q3(State = #vqstate { q1 = Q1,
+ q2 = Q2,
+ delta = #delta { count = DeltaCount },
+ q3 = Q3,
+ q4 = Q4 }) ->
case ?QUEUE:out(Q3) of
{empty, _Q3} ->
{empty, State};
- {{value, MsgStatus = #msg_status { index_on_disk = IndexOnDisk }}, Q3a} ->
- RamIndexCount1 = RamIndexCount - one_if(not IndexOnDisk),
- true = RamIndexCount1 >= 0, %% ASSERTION
- State1 = State #vqstate { q3 = Q3a,
- ram_index_count = RamIndexCount1 },
- State2 =
- case {?QUEUE:is_empty(Q3a), 0 == DeltaCount} of
- {true, true} ->
- %% q3 is now empty, it wasn't before; delta is
- %% still empty. So q2 must be empty, and we
- %% know q4 is empty otherwise we wouldn't be
- %% loading from q3. As such, we can just set
- %% q4 to Q1.
- true = ?QUEUE:is_empty(Q2), %% ASSERTION
- true = ?QUEUE:is_empty(Q4), %% ASSERTION
- State1 #vqstate { q1 = ?QUEUE:new(),
- q4 = Q1 };
- {true, false} ->
- maybe_deltas_to_betas(State1);
- {false, _} ->
- %% q3 still isn't empty, we've not touched
- %% delta, so the invariants between q1, q2,
- %% delta and q3 are maintained
- State1
- end,
+ {{value, MsgStatus}, Q3a} ->
+ State1 = State #vqstate { q3 = Q3a },
+ State2 = case {?QUEUE:is_empty(Q3a), 0 == DeltaCount} of
+ {true, true} ->
+ %% q3 is now empty, it wasn't before;
+ %% delta is still empty. So q2 must be
+ %% empty, and we know q4 is empty
+ %% otherwise we wouldn't be loading from
+ %% q3. As such, we can just set q4 to Q1.
+ true = ?QUEUE:is_empty(Q2), %% ASSERTION
+ true = ?QUEUE:is_empty(Q4), %% ASSERTION
+ State1 #vqstate { q1 = ?QUEUE:new(), q4 = Q1 };
+ {true, false} ->
+ maybe_deltas_to_betas(State1);
+ {false, _} ->
+ %% q3 still isn't empty, we've not
+ %% touched delta, so the invariants
+ %% between q1, q2, delta and q3 are
+ %% maintained
+ State1
+ end,
{loaded, {MsgStatus, State2}}
end.
@@ -1639,42 +1610,35 @@ maybe_push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
{empty, _Q} ->
{Quota, State};
{{value, MsgStatus}, Qa} ->
- {MsgStatus1 = #msg_status { msg_on_disk = true,
- index_on_disk = IndexOnDisk },
- State1 = #vqstate { ram_msg_count = RamMsgCount,
- ram_index_count = RamIndexCount }} =
+ {MsgStatus1 = #msg_status { msg_on_disk = true },
+ State1 = #vqstate { ram_msg_count = RamMsgCount }} =
maybe_write_to_disk(true, false, MsgStatus, State),
MsgStatus2 = m(trim_msg_status(MsgStatus1)),
- RamIndexCount1 = RamIndexCount + one_if(not IndexOnDisk),
- State2 = State1 #vqstate { ram_msg_count = RamMsgCount - 1,
- ram_index_count = RamIndexCount1 },
+ State2 = State1 #vqstate { ram_msg_count = RamMsgCount - 1 },
maybe_push_alphas_to_betas(Generator, Consumer, Quota - 1, Qa,
Consumer(MsgStatus2, Qa, State2))
end.
-push_betas_to_deltas(Quota,
- State = #vqstate { q2 = Q2,
- delta = Delta,
- q3 = Q3,
- index_state = IndexState,
- ram_index_count = RamIndexCount }) ->
- PushState = {Quota, Delta, RamIndexCount, IndexState},
- {Q2a, PushState1} = push_betas_to_deltas(
+push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2,
+ delta = Delta,
+ q3 = Q3,
+ index_state = IndexState }) ->
+ PushState = {Quota, Delta, IndexState},
+ {Q2a, PushState1} = push_with_limit(
fun ?QUEUE:out/1,
fun (Q2MinSeqId) -> Q2MinSeqId end,
- Q2, PushState),
- {Q3a, PushState2} = push_betas_to_deltas(
+ Q2, fun push_betas_to_deltas1/4, PushState),
+ {Q3a, PushState2} = push_with_limit(
fun ?QUEUE:out_r/1,
fun rabbit_queue_index:next_segment_boundary/1,
- Q3, PushState1),
- {_, Delta1, RamIndexCount1, IndexState1} = PushState2,
- State #vqstate { q2 = Q2a,
- delta = Delta1,
- q3 = Q3a,
- index_state = IndexState1,
- ram_index_count = RamIndexCount1 }.
-
-push_betas_to_deltas(Generator, LimitFun, Q, PushState) ->
+ Q3, fun push_betas_to_deltas1/4, PushState1),
+ {_, Delta1, IndexState1} = PushState2,
+ State #vqstate { q2 = Q2a,
+ delta = Delta1,
+ q3 = Q3a,
+ index_state = IndexState1 }.
+
+push_with_limit(Generator, LimitFun, Q, PushFun, PushState) ->
case ?QUEUE:is_empty(Q) of
true ->
{Q, PushState};
@@ -1684,16 +1648,15 @@ push_betas_to_deltas(Generator, LimitFun, Q, PushState) ->
Limit = LimitFun(MinSeqId),
case MaxSeqId < Limit of
true -> {Q, PushState};
- false -> push_betas_to_deltas1(Generator, Limit, Q, PushState)
+ false -> PushFun(Generator, Limit, Q, PushState)
end
end.
+push_betas_to_deltas1(_Generator, _Limit, Q,
+ {0, _Delta, _IndexState} = PushState) ->
+ {Q, PushState};
push_betas_to_deltas1(Generator, Limit, Q,
- {0, Delta, RamIndexCount, IndexState}) ->
- {Qb, Delta1} = push_gammas_to_deltas(Generator, Limit, Q, Delta),
- {Qb, {0, Delta1, RamIndexCount, IndexState}};
-push_betas_to_deltas1(Generator, Limit, Q,
- {Quota, Delta, RamIndexCount, IndexState} = PushState) ->
+ {Quota, Delta, IndexState} = PushState) ->
case Generator(Q) of
{empty, _Q} ->
{Q, PushState};
@@ -1702,20 +1665,33 @@ push_betas_to_deltas1(Generator, Limit, Q,
{Q, PushState};
{{value, MsgStatus = #msg_status { index_on_disk = IndexOnDisk,
seq_id = SeqId }}, Qa} ->
- {Quota1, RamIndexCount1, IndexState1} =
+ {Quota1, IndexState1} =
case IndexOnDisk of
- true -> {Quota, RamIndexCount, IndexState};
+ true -> {Quota, IndexState};
false -> {#msg_status { index_on_disk = true },
IndexState2} =
maybe_write_index_to_disk(true, MsgStatus,
IndexState),
- {Quota - 1, RamIndexCount - 1, IndexState2}
+ {Quota - 1, IndexState2}
end,
Delta1 = expand_delta(SeqId, Delta),
push_betas_to_deltas1(Generator, Limit, Qa,
- {Quota1, Delta1, RamIndexCount1, IndexState1})
+ {Quota1, Delta1, IndexState1})
end.
+push_gammas_to_deltas(State = #vqstate { q2 = Q2,
+ delta = Delta,
+ q3 = Q3 }) ->
+ {Q2a, Delta1} = push_with_limit(
+ fun ?QUEUE:out/1,
+ fun (Q2MinSeqId) -> Q2MinSeqId end,
+ Q2, fun push_gammas_to_deltas/4, Delta),
+ {Q3a, Delta2} = push_with_limit(
+ fun ?QUEUE:out_r/1,
+ fun rabbit_queue_index:next_segment_boundary/1,
+ Q3, fun push_gammas_to_deltas/4, Delta1),
+ {Delta2 =/= Delta, State #vqstate { q2 = Q2a, delta = Delta2, q3 = Q3a }}.
+
push_gammas_to_deltas(Generator, Limit, Q, Delta) ->
case Generator(Q) of
{{value, #msg_status { seq_id = SeqId, index_on_disk = true }}, Q1}