diff options
authorMatthias Radestock <>2011-10-03 00:24:19 +0100
committerMatthias Radestock <>2011-10-03 00:24:19 +0100
commit3682f8b880e166d4e984296c4be45263110685cc (patch)
parent9da0de717faac8b460af76ab47f867b393fa6d55 (diff)
simplify delta calculation in push_betas_to_deltas
1 files changed, 47 insertions, 65 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 511c10b4..45c6a17f 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -914,6 +914,19 @@ combine_deltas(#delta { start_seq_id = StartLow,
andalso ((StartLow + Count) =< EndHigh),
#delta { start_seq_id = StartLow, count = Count, end_seq_id = EndHigh }.
+expand_delta(SeqId, Delta) ->
+ DeltaInc = #delta { start_seq_id = SeqId,
+ count = 1,
+ end_seq_id = SeqId + 1 },
+ case Delta of
+ DeltaInc;
+ #delta { start_seq_id = StartSeqId } when SeqId < StartSeqId ->
+ combine_deltas(DeltaInc, Delta);
+ #delta { end_seq_id = EndSeqId } when SeqId >= EndSeqId ->
+ combine_deltas(Delta, DeltaInc)
+ end.
update_rate(Now, Then, Count, {OThen, OCount}) ->
%% avg over the current period and the previous
{1000000.0 * (Count + OCount) / timer:now_diff(Now, OThen), {Then, Count}}.
@@ -1366,10 +1379,7 @@ queue_merge(SeqIds, Q, Front, MsgIds,
delta_merge([], Delta, MsgIds, _MsgPropsFun, State) ->
{Delta, MsgIds, State};
-delta_merge(SeqIds, #delta { start_seq_id = StartSeqId,
- count = Count,
- end_seq_id = EndSeqId} = Delta,
- MsgIds, MsgPropsFun, State) ->
+delta_merge(SeqIds, Delta, MsgIds, MsgPropsFun, State) ->
lists:foldl(fun (SeqId, {Delta0, MsgIds0, State0}) ->
{#msg_status { msg_id = MsgId,
index_on_disk = IndexOnDisk,
@@ -1379,11 +1389,7 @@ delta_merge(SeqIds, #delta { start_seq_id = StartSeqId,
{_MsgStatus, State2} =
maybe_write_to_disk(not MsgOnDisk, not IndexOnDisk,
MsgStatus, State1),
- {Delta0 #delta {
- start_seq_id = lists:min([SeqId, StartSeqId]),
- count = Count + 1,
- end_seq_id = lists:max([SeqId + 1, EndSeqId]) },
- [MsgId | MsgIds0], State2}
+ {expand_delta(SeqId, Delta0), [MsgId | MsgIds0], State2}
end, {Delta, MsgIds, State}, SeqIds).
%% Mostly opposite of record_pending_ack/2
@@ -1400,7 +1406,7 @@ beta_limit(Q) ->
{empty, _Q} -> undefined
-delta_limit(?BLANK_DELTA_PATTERN(_X)) -> undefined;
+delta_limit(?BLANK_DELTA_PATTERN(_X)) -> undefined;
delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId.
@@ -1654,74 +1660,50 @@ push_betas_to_deltas(Quota,
q3 = Q3,
index_state = IndexState,
ram_index_count = RamIndexCount }) ->
- Q2DeltaFun = fun (undefined, undefined, 0) -> ?BLANK_DELTA;
- (Low, High, Len) when High >= Low ->
- #delta { start_seq_id = Low,
- count = Len,
- end_seq_id = High + 1 }
- end,
- Q3DeltaFun = fun (undefined, undefined, 0) -> ?BLANK_DELTA;
- (High, Low, Len) when High >= Low ->
- #delta { start_seq_id = Low,
- count = Len,
- end_seq_id = High + 1 }
- end,
- {Quota1, Delta2, Q2a, RamIndexCount2, IndexState2} =
- push_betas_to_deltas(Q2DeltaFun,
- fun (Q2MinSeqId) -> Q2MinSeqId end,
- fun ?QUEUE:out/1,
- Quota, Q2, RamIndexCount, IndexState),
- {_Quota2, Delta3, Q3a, RamIndexCount3, IndexState3} =
- push_betas_to_deltas(Q3DeltaFun,
- fun rabbit_queue_index:next_segment_boundary/1,
- fun ?QUEUE:out_r/1,
- Quota1, Q3, RamIndexCount2, IndexState2),
- Delta4 = combine_deltas(Delta3, combine_deltas(Delta, Delta2)),
+ PushState = {Quota, Delta, RamIndexCount, IndexState},
+ {Q2a, PushState1} = push_betas_to_deltas(
+ fun ?QUEUE:out/1,
+ fun (Q2MinSeqId) -> Q2MinSeqId end,
+ Q2, PushState),
+ {Q3a, PushState2} = push_betas_to_deltas(
+ fun ?QUEUE:out_r/1,
+ fun rabbit_queue_index:next_segment_boundary/1,
+ Q3, PushState1),
+ {_, Delta1, RamIndexCount1, IndexState1} = PushState2,
State #vqstate { q2 = Q2a,
- delta = Delta4,
+ delta = Delta1,
q3 = Q3a,
- index_state = IndexState3,
- ram_index_count = RamIndexCount3 }.
-push_betas_to_deltas(_DeltaFun, _LimitFun, _Generator, 0, Q, RamIndexCount,
- IndexState) ->
- {0, ?BLANK_DELTA, Q, RamIndexCount, IndexState};
-push_betas_to_deltas(DeltaFun, LimitFun, Generator, Quota, Q, RamIndexCount,
- IndexState) ->
+ index_state = IndexState1,
+ ram_index_count = RamIndexCount1 }.
+push_betas_to_deltas(_Generator, _LimitFun, Q,
+ {0, _Delta, _RamIndexCount, _IndexState} = PushState) ->
+ {Q, PushState};
+push_betas_to_deltas(Generator, LimitFun, Q, PushState) ->
case ?QUEUE:is_empty(Q) of
true ->
- {Quota, ?BLANK_DELTA, Q, RamIndexCount, IndexState};
+ {Q, PushState};
false ->
{{value, #msg_status { seq_id = MinSeqId }}, _Qa} = ?QUEUE:out(Q),
{{value, #msg_status { seq_id = MaxSeqId }}, _Qb} = ?QUEUE:out_r(Q),
Limit = LimitFun(MinSeqId),
case MaxSeqId < Limit of
- true -> {Quota, ?BLANK_DELTA, Q, RamIndexCount, IndexState};
- false -> {FirstSeqId, LastSeqId, Len, Qc,
- RamIndexCount1, IndexState1} =
- push_betas_to_deltas(
- Generator, Limit, undefined, undefined, 0, Quota,
- Q, RamIndexCount, IndexState),
- {Quota - Len, DeltaFun(FirstSeqId, LastSeqId, Len),
- Qc, RamIndexCount1, IndexState1}
+ true -> {Q, PushState};
+ false -> push_betas_to_deltas1(Generator, Limit, Q, PushState)
-push_betas_to_deltas(_Generator, _Limit, FirstSeqId, LastSeqId, Count, 0, Q,
- RamIndexCount, IndexState) ->
- {FirstSeqId, LastSeqId, Count, Q, RamIndexCount, IndexState};
-push_betas_to_deltas(Generator, Limit, FirstSeqId, LastSeqId, Count, Quota, Q,
- RamIndexCount, IndexState) ->
+push_betas_to_deltas1(_Generator, _Limit, Q,
+ {0, _Delta, _RamIndexCount, _IndexState} = PushState) ->
+ {Q, PushState};
+push_betas_to_deltas1(Generator, Limit, Q,
+ {Quota, Delta, RamIndexCount, IndexState} = PushState) ->
case Generator(Q) of
{empty, _Q} ->
- {FirstSeqId, LastSeqId, Count, Q, RamIndexCount, IndexState};
+ {Q, PushState};
{{value, #msg_status { seq_id = SeqId }}, _Qa}
when SeqId < Limit ->
- {FirstSeqId, LastSeqId, Count, Q, RamIndexCount, IndexState};
- {{value, #msg_status { seq_id = SeqId }}, _Qa}
- when FirstSeqId =:= undefined ->
- push_betas_to_deltas(Generator, Limit, SeqId, SeqId, Count, Quota,
- Q, RamIndexCount, IndexState);
+ {Q, PushState};
{{value, MsgStatus = #msg_status { index_on_disk = IndexOnDisk,
seq_id = SeqId }}, Qa} ->
{Quota1, RamIndexCount1, IndexState1} =
@@ -1733,9 +1715,9 @@ push_betas_to_deltas(Generator, Limit, FirstSeqId, LastSeqId, Count, Quota, Q,
{Quota - 1, RamIndexCount - 1, IndexState2}
- push_betas_to_deltas(
- Generator, Limit, FirstSeqId, SeqId, Count + 1, Quota1, Qa,
- RamIndexCount1, IndexState1)
+ Delta1 = expand_delta(SeqId, Delta),
+ push_betas_to_deltas1(Generator, Limit, Qa,
+ {Quota1, Delta1, RamIndexCount1, IndexState1})