diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2011-10-02 14:53:31 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-10-02 14:53:31 +0100 |
commit | 364b3898e14f9c801397639ce3df9d5f92588cb0 (patch) | |
tree | 6238b9cc4550f1d8a0aa2a9de65d7ebf3d729488 | |
parent | eb65a2e9e4e31379067859f5c544354d4b002905 (diff) | |
download | rabbitmq-server-364b3898e14f9c801397639ce3df9d5f92588cb0.tar.gz |
Avoid lots of combine_delta calls
-rw-r--r-- | src/lqueue.erl | 42 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 90 |
2 files changed, 87 insertions, 45 deletions
diff --git a/src/lqueue.erl b/src/lqueue.erl index 8bbd52d9..ea54ffd4 100644 --- a/src/lqueue.erl +++ b/src/lqueue.erl @@ -1,17 +1,33 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2011-2011 VMware, Inc. All rights reserved. +%% + -module(lqueue). --compile([export_all]). +-export([new/0, is_empty/1, len/1, in/2, in_r/2, out/1, out_r/1, join/2, + foldl/3, foldr/3, from_list/1, to_list/1]). -define(QUEUE, queue). new() -> {0, ?QUEUE:new()}. -is_empty({0, _Q}) -> - true; -is_empty(_) -> - false. +is_empty({0, _Q}) -> true; +is_empty(_) -> false. in(V, {L, Q}) -> {L+1, ?QUEUE:in(V, Q)}. + in_r(V, {L, Q}) -> {L+1, ?QUEUE:in_r(V, Q)}. out({0, _Q} = Q) -> @@ -29,16 +45,20 @@ out_r({L, Q}) -> join({L1, Q1}, {L2, Q2}) -> {L1 + L2, ?QUEUE:join(Q1, Q2)}. -to_list({_L, Q}) -> - ?QUEUE:to_list(Q). +to_list({_L, Q}) -> ?QUEUE:to_list(Q). -from_list(L) -> - {length(L), ?QUEUE:from_list(L)}. +from_list(L) -> {length(L), ?QUEUE:from_list(L)}. -queue_fold(Fun, Init, Q) -> +foldl(Fun, Init, Q) -> case out(Q) of {empty, _Q} -> Init; - {{value, V}, Q1} -> queue_fold(Fun, Fun(V, Init), Q1) + {{value, V}, Q1} -> foldl(Fun, Fun(V, Init), Q1) + end. + +foldr(Fun, Init, Q) -> + case out_r(Q) of + {empty, _Q} -> Init; + {{value, V}, Q1} -> foldr(Fun, Fun(V, Init), Q1) end. len({L, _Q}) -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 347749e7..310d9357 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -476,7 +476,7 @@ purge(State = #vqstate { q4 = Q4, %% we could simply wipe the qi instead of issuing delivers and %% acks for all the messages. {LensByStore, IndexState1} = remove_queue_entries( - fun ?QUEUE:queue_fold/3, Q4, + fun ?QUEUE:foldl/3, Q4, orddict:new(), IndexState, MSCState), {LensByStore1, State1 = #vqstate { q1 = Q1, index_state = IndexState2, @@ -485,7 +485,7 @@ purge(State = #vqstate { q4 = Q4, State #vqstate { q4 = ?QUEUE:new(), index_state = IndexState1 }), {LensByStore2, IndexState3} = remove_queue_entries( - fun ?QUEUE:queue_fold/3, Q1, + fun ?QUEUE:foldl/3, Q1, LensByStore1, IndexState2, MSCState1), PCount1 = PCount - find_persistent_count(LensByStore2), {Len, a(State1 #vqstate { q1 = ?QUEUE:new(), @@ -869,20 +869,22 @@ betas_from_index_entries(List, TransientThreshold, PA, IndexState) -> cons_if(not IsDelivered, SeqId, Delivers1), [SeqId | Acks1]}; false -> case gb_trees:is_defined(SeqId, PA) of - false -> {?QUEUE:in_r( - m(#msg_status { - seq_id = SeqId, - msg_id = MsgId, - msg = undefined, - is_persistent = IsPersistent, - is_delivered = IsDelivered, - msg_on_disk = true, - index_on_disk = true, - msg_props = MsgProps - }), Filtered1), + false -> + {?QUEUE:in_r( + m(#msg_status { + seq_id = SeqId, + msg_id = MsgId, + msg = undefined, + is_persistent = IsPersistent, + is_delivered = IsDelivered, + msg_on_disk = true, + index_on_disk = true, + msg_props = MsgProps + }), Filtered1), Delivers1, Acks1}; - true -> Acc + true -> + Acc end end end, {?QUEUE:new(), [], []}, List), @@ -916,9 +918,7 @@ combine_deltas(#delta { start_seq_id = StartLow, andalso ((StartLow + Count) =< EndHigh), #delta { start_seq_id = StartLow, count = Count, end_seq_id = EndHigh }. -combine_deltas_flip(A, B) -> combine_deltas(B, A). - -beta_fold(Fun, Init, Q) -> ?QUEUE:queue_fold(Fun, Init, Q). +beta_fold(Fun, Init, Q) -> ?QUEUE:foldl(Fun, Init, Q). update_rate(Now, Then, Count, {OThen, OCount}) -> %% avg over the current period and the previous @@ -1683,13 +1683,26 @@ push_betas_to_deltas(Quota, q3 = Q3, index_state = IndexState, ram_index_count = RamIndexCount }) -> + Q2DeltaFun = fun (undefined, undefined, 0) -> ?BLANK_DELTA; + (Low, High, Len) when High >= Low -> + #delta { start_seq_id = Low, + count = Len, + end_seq_id = High + 1 } + end, + Q3DeltaFun = fun (undefined, undefined, 0) -> ?BLANK_DELTA; + (High, Low, Len) when High >= Low -> + #delta { start_seq_id = Low, + count = Len, + end_seq_id = High + 1 } + end, {Quota1, Delta2, Q2a, RamIndexCount2, IndexState2} = - push_betas_to_deltas(fun (Q2MinSeqId) -> Q2MinSeqId end, - fun ?QUEUE:out/1, fun combine_deltas/2, Quota, Q2, + push_betas_to_deltas(Q2DeltaFun, fun (Q2MinSeqId) -> Q2MinSeqId end, + fun ?QUEUE:out/1, Quota, Q2, RamIndexCount, IndexState), {_Quota2, Delta3, Q3a, RamIndexCount3, IndexState3} = - push_betas_to_deltas(fun rabbit_queue_index:next_segment_boundary/1, - fun ?QUEUE:out_r/1, fun combine_deltas_flip/2, Quota1, Q3, + push_betas_to_deltas(Q3DeltaFun, + fun rabbit_queue_index:next_segment_boundary/1, + fun ?QUEUE:out_r/1, Quota1, Q3, RamIndexCount2, IndexState2), Delta4 = combine_deltas(Delta3, combine_deltas(Delta, Delta2)), State #vqstate { q2 = Q2a, @@ -1698,7 +1711,11 @@ push_betas_to_deltas(Quota, index_state = IndexState3, ram_index_count = RamIndexCount3 }. -push_betas_to_deltas(LimitFun, Generator, Combine, Quota, Q, RamIndexCount, IndexState) -> +push_betas_to_deltas(_DeltaFun, _LimitFun, _Generator, 0, Q, RamIndexCount, + IndexState) -> + {0, ?BLANK_DELTA, Q, RamIndexCount, IndexState}; +push_betas_to_deltas(DeltaFun, LimitFun, Generator, Quota, Q, RamIndexCount, + IndexState) -> case ?QUEUE:out(Q) of {empty, _Q} -> {Quota, ?BLANK_DELTA, Q, RamIndexCount, IndexState}; @@ -1707,24 +1724,31 @@ push_betas_to_deltas(LimitFun, Generator, Combine, Quota, Q, RamIndexCount, Inde Limit = LimitFun(MinSeqId), case MaxSeqId < Limit of true -> {Quota, ?BLANK_DELTA, Q, RamIndexCount, IndexState}; - false -> {Delta, Qc, RamIndexCount1, IndexState1} = + false -> {FirstSeqId, LastSeqId, Len, Qc, + RamIndexCount1, IndexState1} = push_betas_to_deltas( - Generator, Combine, Limit, Quota, ?BLANK_DELTA, + Generator, Limit, undefined, undefined, 0, Quota, Q, RamIndexCount, IndexState), - {Quota - Delta #delta.count, Delta, + {Quota - Len, DeltaFun(FirstSeqId, LastSeqId, Len), Qc, RamIndexCount1, IndexState1} end end. -push_betas_to_deltas(_Generator, _Combine, _Limit, 0, Delta, Q, RamIndexCount, IndexState) -> - {Delta, Q, RamIndexCount, IndexState}; -push_betas_to_deltas(Generator, Combine, Limit, Quota, Delta, Q, RamIndexCount, IndexState) -> +push_betas_to_deltas(_Generator, _Limit, FirstSeqId, LastSeqId, Count, 0, Q, + RamIndexCount, IndexState) -> + {FirstSeqId, LastSeqId, Count, Q, RamIndexCount, IndexState}; +push_betas_to_deltas(Generator, Limit, FirstSeqId, LastSeqId, Count, Quota, Q, + RamIndexCount, IndexState) -> case Generator(Q) of {empty, _Q} -> - {Delta, Q, RamIndexCount, IndexState}; + {FirstSeqId, LastSeqId, Count, Q, RamIndexCount, IndexState}; {{value, #msg_status { seq_id = SeqId }}, _Qa} when SeqId < Limit -> - {Delta, Q, RamIndexCount, IndexState}; + {FirstSeqId, LastSeqId, Count, Q, RamIndexCount, IndexState}; + {{value, #msg_status { seq_id = SeqId }}, _Qa} + when FirstSeqId =:= undefined -> + push_betas_to_deltas(Generator, Limit, SeqId, SeqId, Count, Quota, + Q, RamIndexCount, IndexState); {{value, MsgStatus = #msg_status { index_on_disk = IndexOnDisk, seq_id = SeqId }}, Qa} -> {Quota1, RamIndexCount1, IndexState1} = @@ -1736,11 +1760,9 @@ push_betas_to_deltas(Generator, Combine, Limit, Quota, Delta, Q, RamIndexCount, IndexState), {Quota - 1, RamIndexCount - 1, IndexState2} end, - Delta1 = Combine(Delta, #delta { start_seq_id = SeqId, - count = 1, - end_seq_id = SeqId + 1 }), push_betas_to_deltas( - Generator, Combine, Limit, Quota1, Delta1, Qa, RamIndexCount1, IndexState1) + Generator, Limit, FirstSeqId, SeqId, Count + 1, Quota1, Qa, + RamIndexCount1, IndexState1) end. %%---------------------------------------------------------------------------- |