diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2011-10-03 00:24:19 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-10-03 00:24:19 +0100 |
commit | 3682f8b880e166d4e984296c4be45263110685cc (patch) | |
tree | d58116f4018f7abe70f4c62d6a78816faab4814f | |
parent | 9da0de717faac8b460af76ab47f867b393fa6d55 (diff) | |
download | rabbitmq-server-3682f8b880e166d4e984296c4be45263110685cc.tar.gz |
simplify delta calculation in push_betas_to_deltas
-rw-r--r-- | src/rabbit_variable_queue.erl | 112 |
1 files changed, 47 insertions, 65 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 511c10b4..45c6a17f 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -914,6 +914,19 @@ combine_deltas(#delta { start_seq_id = StartLow, andalso ((StartLow + Count) =< EndHigh), #delta { start_seq_id = StartLow, count = Count, end_seq_id = EndHigh }. +expand_delta(SeqId, Delta) -> + DeltaInc = #delta { start_seq_id = SeqId, + count = 1, + end_seq_id = SeqId + 1 }, + case Delta of + ?BLANK_DELTA -> + DeltaInc; + #delta { start_seq_id = StartSeqId } when SeqId < StartSeqId -> + combine_deltas(DeltaInc, Delta); + #delta { end_seq_id = EndSeqId } when SeqId >= EndSeqId -> + combine_deltas(Delta, DeltaInc) + end. + update_rate(Now, Then, Count, {OThen, OCount}) -> %% avg over the current period and the previous {1000000.0 * (Count + OCount) / timer:now_diff(Now, OThen), {Then, Count}}. @@ -1366,10 +1379,7 @@ queue_merge(SeqIds, Q, Front, MsgIds, delta_merge([], Delta, MsgIds, _MsgPropsFun, State) -> {Delta, MsgIds, State}; -delta_merge(SeqIds, #delta { start_seq_id = StartSeqId, - count = Count, - end_seq_id = EndSeqId} = Delta, - MsgIds, MsgPropsFun, State) -> +delta_merge(SeqIds, Delta, MsgIds, MsgPropsFun, State) -> lists:foldl(fun (SeqId, {Delta0, MsgIds0, State0}) -> {#msg_status { msg_id = MsgId, index_on_disk = IndexOnDisk, @@ -1379,11 +1389,7 @@ delta_merge(SeqIds, #delta { start_seq_id = StartSeqId, {_MsgStatus, State2} = maybe_write_to_disk(not MsgOnDisk, not IndexOnDisk, MsgStatus, State1), - {Delta0 #delta { - start_seq_id = lists:min([SeqId, StartSeqId]), - count = Count + 1, - end_seq_id = lists:max([SeqId + 1, EndSeqId]) }, - [MsgId | MsgIds0], State2} + {expand_delta(SeqId, Delta0), [MsgId | MsgIds0], State2} end, {Delta, MsgIds, State}, SeqIds). %% Mostly opposite of record_pending_ack/2 @@ -1400,7 +1406,7 @@ beta_limit(Q) -> {empty, _Q} -> undefined end. -delta_limit(?BLANK_DELTA_PATTERN(_X)) -> undefined; +delta_limit(?BLANK_DELTA_PATTERN(_X)) -> undefined; delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId. %%---------------------------------------------------------------------------- @@ -1654,74 +1660,50 @@ push_betas_to_deltas(Quota, q3 = Q3, index_state = IndexState, ram_index_count = RamIndexCount }) -> - Q2DeltaFun = fun (undefined, undefined, 0) -> ?BLANK_DELTA; - (Low, High, Len) when High >= Low -> - #delta { start_seq_id = Low, - count = Len, - end_seq_id = High + 1 } - end, - Q3DeltaFun = fun (undefined, undefined, 0) -> ?BLANK_DELTA; - (High, Low, Len) when High >= Low -> - #delta { start_seq_id = Low, - count = Len, - end_seq_id = High + 1 } - end, - {Quota1, Delta2, Q2a, RamIndexCount2, IndexState2} = - push_betas_to_deltas(Q2DeltaFun, - fun (Q2MinSeqId) -> Q2MinSeqId end, - fun ?QUEUE:out/1, - Quota, Q2, RamIndexCount, IndexState), - {_Quota2, Delta3, Q3a, RamIndexCount3, IndexState3} = - push_betas_to_deltas(Q3DeltaFun, - fun rabbit_queue_index:next_segment_boundary/1, - fun ?QUEUE:out_r/1, - Quota1, Q3, RamIndexCount2, IndexState2), - Delta4 = combine_deltas(Delta3, combine_deltas(Delta, Delta2)), + PushState = {Quota, Delta, RamIndexCount, IndexState}, + {Q2a, PushState1} = push_betas_to_deltas( + fun ?QUEUE:out/1, + fun (Q2MinSeqId) -> Q2MinSeqId end, + Q2, PushState), + {Q3a, PushState2} = push_betas_to_deltas( + fun ?QUEUE:out_r/1, + fun rabbit_queue_index:next_segment_boundary/1, + Q3, PushState1), + {_, Delta1, RamIndexCount1, IndexState1} = PushState2, State #vqstate { q2 = Q2a, - delta = Delta4, + delta = Delta1, q3 = Q3a, - index_state = IndexState3, - ram_index_count = RamIndexCount3 }. - -push_betas_to_deltas(_DeltaFun, _LimitFun, _Generator, 0, Q, RamIndexCount, - IndexState) -> - {0, ?BLANK_DELTA, Q, RamIndexCount, IndexState}; -push_betas_to_deltas(DeltaFun, LimitFun, Generator, Quota, Q, RamIndexCount, - IndexState) -> + index_state = IndexState1, + ram_index_count = RamIndexCount1 }. + +push_betas_to_deltas(_Generator, _LimitFun, Q, + {0, _Delta, _RamIndexCount, _IndexState} = PushState) -> + {Q, PushState}; +push_betas_to_deltas(Generator, LimitFun, Q, PushState) -> case ?QUEUE:is_empty(Q) of true -> - {Quota, ?BLANK_DELTA, Q, RamIndexCount, IndexState}; + {Q, PushState}; false -> {{value, #msg_status { seq_id = MinSeqId }}, _Qa} = ?QUEUE:out(Q), {{value, #msg_status { seq_id = MaxSeqId }}, _Qb} = ?QUEUE:out_r(Q), Limit = LimitFun(MinSeqId), case MaxSeqId < Limit of - true -> {Quota, ?BLANK_DELTA, Q, RamIndexCount, IndexState}; - false -> {FirstSeqId, LastSeqId, Len, Qc, - RamIndexCount1, IndexState1} = - push_betas_to_deltas( - Generator, Limit, undefined, undefined, 0, Quota, - Q, RamIndexCount, IndexState), - {Quota - Len, DeltaFun(FirstSeqId, LastSeqId, Len), - Qc, RamIndexCount1, IndexState1} + true -> {Q, PushState}; + false -> push_betas_to_deltas1(Generator, Limit, Q, PushState) end end. -push_betas_to_deltas(_Generator, _Limit, FirstSeqId, LastSeqId, Count, 0, Q, - RamIndexCount, IndexState) -> - {FirstSeqId, LastSeqId, Count, Q, RamIndexCount, IndexState}; -push_betas_to_deltas(Generator, Limit, FirstSeqId, LastSeqId, Count, Quota, Q, - RamIndexCount, IndexState) -> +push_betas_to_deltas1(_Generator, _Limit, Q, + {0, _Delta, _RamIndexCount, _IndexState} = PushState) -> + {Q, PushState}; +push_betas_to_deltas1(Generator, Limit, Q, + {Quota, Delta, RamIndexCount, IndexState} = PushState) -> case Generator(Q) of {empty, _Q} -> - {FirstSeqId, LastSeqId, Count, Q, RamIndexCount, IndexState}; + {Q, PushState}; {{value, #msg_status { seq_id = SeqId }}, _Qa} when SeqId < Limit -> - {FirstSeqId, LastSeqId, Count, Q, RamIndexCount, IndexState}; - {{value, #msg_status { seq_id = SeqId }}, _Qa} - when FirstSeqId =:= undefined -> - push_betas_to_deltas(Generator, Limit, SeqId, SeqId, Count, Quota, - Q, RamIndexCount, IndexState); + {Q, PushState}; {{value, MsgStatus = #msg_status { index_on_disk = IndexOnDisk, seq_id = SeqId }}, Qa} -> {Quota1, RamIndexCount1, IndexState1} = @@ -1733,9 +1715,9 @@ push_betas_to_deltas(Generator, Limit, FirstSeqId, LastSeqId, Count, Quota, Q, IndexState), {Quota - 1, RamIndexCount - 1, IndexState2} end, - push_betas_to_deltas( - Generator, Limit, FirstSeqId, SeqId, Count + 1, Quota1, Qa, - RamIndexCount1, IndexState1) + Delta1 = expand_delta(SeqId, Delta), + push_betas_to_deltas1(Generator, Limit, Qa, + {Quota1, Delta1, RamIndexCount1, IndexState1}) end. %%---------------------------------------------------------------------------- |