summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-10-02 14:53:31 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2011-10-02 14:53:31 +0100
commit364b3898e14f9c801397639ce3df9d5f92588cb0 (patch)
tree6238b9cc4550f1d8a0aa2a9de65d7ebf3d729488
parenteb65a2e9e4e31379067859f5c544354d4b002905 (diff)
downloadrabbitmq-server-364b3898e14f9c801397639ce3df9d5f92588cb0.tar.gz
Avoid lots of combine_delta calls
-rw-r--r--src/lqueue.erl42
-rw-r--r--src/rabbit_variable_queue.erl90
2 files changed, 87 insertions, 45 deletions
diff --git a/src/lqueue.erl b/src/lqueue.erl
index 8bbd52d9..ea54ffd4 100644
--- a/src/lqueue.erl
+++ b/src/lqueue.erl
@@ -1,17 +1,33 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2011-2011 VMware, Inc. All rights reserved.
+%%
+
-module(lqueue).
--compile([export_all]).
+-export([new/0, is_empty/1, len/1, in/2, in_r/2, out/1, out_r/1, join/2,
+ foldl/3, foldr/3, from_list/1, to_list/1]).
-define(QUEUE, queue).
new() -> {0, ?QUEUE:new()}.
-is_empty({0, _Q}) ->
- true;
-is_empty(_) ->
- false.
+is_empty({0, _Q}) -> true;
+is_empty(_) -> false.
in(V, {L, Q}) -> {L+1, ?QUEUE:in(V, Q)}.
+
in_r(V, {L, Q}) -> {L+1, ?QUEUE:in_r(V, Q)}.
out({0, _Q} = Q) ->
@@ -29,16 +45,20 @@ out_r({L, Q}) ->
join({L1, Q1}, {L2, Q2}) ->
{L1 + L2, ?QUEUE:join(Q1, Q2)}.
-to_list({_L, Q}) ->
- ?QUEUE:to_list(Q).
+to_list({_L, Q}) -> ?QUEUE:to_list(Q).
-from_list(L) ->
- {length(L), ?QUEUE:from_list(L)}.
+from_list(L) -> {length(L), ?QUEUE:from_list(L)}.
-queue_fold(Fun, Init, Q) ->
+foldl(Fun, Init, Q) ->
case out(Q) of
{empty, _Q} -> Init;
- {{value, V}, Q1} -> queue_fold(Fun, Fun(V, Init), Q1)
+ {{value, V}, Q1} -> foldl(Fun, Fun(V, Init), Q1)
+ end.
+
+foldr(Fun, Init, Q) ->
+ case out_r(Q) of
+ {empty, _Q} -> Init;
+ {{value, V}, Q1} -> foldr(Fun, Fun(V, Init), Q1)
end.
len({L, _Q}) ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 347749e7..310d9357 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -476,7 +476,7 @@ purge(State = #vqstate { q4 = Q4,
%% we could simply wipe the qi instead of issuing delivers and
%% acks for all the messages.
{LensByStore, IndexState1} = remove_queue_entries(
- fun ?QUEUE:queue_fold/3, Q4,
+ fun ?QUEUE:foldl/3, Q4,
orddict:new(), IndexState, MSCState),
{LensByStore1, State1 = #vqstate { q1 = Q1,
index_state = IndexState2,
@@ -485,7 +485,7 @@ purge(State = #vqstate { q4 = Q4,
State #vqstate { q4 = ?QUEUE:new(),
index_state = IndexState1 }),
{LensByStore2, IndexState3} = remove_queue_entries(
- fun ?QUEUE:queue_fold/3, Q1,
+ fun ?QUEUE:foldl/3, Q1,
LensByStore1, IndexState2, MSCState1),
PCount1 = PCount - find_persistent_count(LensByStore2),
{Len, a(State1 #vqstate { q1 = ?QUEUE:new(),
@@ -869,20 +869,22 @@ betas_from_index_entries(List, TransientThreshold, PA, IndexState) ->
cons_if(not IsDelivered, SeqId, Delivers1),
[SeqId | Acks1]};
false -> case gb_trees:is_defined(SeqId, PA) of
- false -> {?QUEUE:in_r(
- m(#msg_status {
- seq_id = SeqId,
- msg_id = MsgId,
- msg = undefined,
- is_persistent = IsPersistent,
- is_delivered = IsDelivered,
- msg_on_disk = true,
- index_on_disk = true,
- msg_props = MsgProps
- }), Filtered1),
+ false ->
+ {?QUEUE:in_r(
+ m(#msg_status {
+ seq_id = SeqId,
+ msg_id = MsgId,
+ msg = undefined,
+ is_persistent = IsPersistent,
+ is_delivered = IsDelivered,
+ msg_on_disk = true,
+ index_on_disk = true,
+ msg_props = MsgProps
+ }), Filtered1),
Delivers1,
Acks1};
- true -> Acc
+ true ->
+ Acc
end
end
end, {?QUEUE:new(), [], []}, List),
@@ -916,9 +918,7 @@ combine_deltas(#delta { start_seq_id = StartLow,
andalso ((StartLow + Count) =< EndHigh),
#delta { start_seq_id = StartLow, count = Count, end_seq_id = EndHigh }.
-combine_deltas_flip(A, B) -> combine_deltas(B, A).
-
-beta_fold(Fun, Init, Q) -> ?QUEUE:queue_fold(Fun, Init, Q).
+beta_fold(Fun, Init, Q) -> ?QUEUE:foldl(Fun, Init, Q).
update_rate(Now, Then, Count, {OThen, OCount}) ->
%% avg over the current period and the previous
@@ -1683,13 +1683,26 @@ push_betas_to_deltas(Quota,
q3 = Q3,
index_state = IndexState,
ram_index_count = RamIndexCount }) ->
+ Q2DeltaFun = fun (undefined, undefined, 0) -> ?BLANK_DELTA;
+ (Low, High, Len) when High >= Low ->
+ #delta { start_seq_id = Low,
+ count = Len,
+ end_seq_id = High + 1 }
+ end,
+ Q3DeltaFun = fun (undefined, undefined, 0) -> ?BLANK_DELTA;
+ (High, Low, Len) when High >= Low ->
+ #delta { start_seq_id = Low,
+ count = Len,
+ end_seq_id = High + 1 }
+ end,
{Quota1, Delta2, Q2a, RamIndexCount2, IndexState2} =
- push_betas_to_deltas(fun (Q2MinSeqId) -> Q2MinSeqId end,
- fun ?QUEUE:out/1, fun combine_deltas/2, Quota, Q2,
+ push_betas_to_deltas(Q2DeltaFun, fun (Q2MinSeqId) -> Q2MinSeqId end,
+ fun ?QUEUE:out/1, Quota, Q2,
RamIndexCount, IndexState),
{_Quota2, Delta3, Q3a, RamIndexCount3, IndexState3} =
- push_betas_to_deltas(fun rabbit_queue_index:next_segment_boundary/1,
- fun ?QUEUE:out_r/1, fun combine_deltas_flip/2, Quota1, Q3,
+ push_betas_to_deltas(Q3DeltaFun,
+ fun rabbit_queue_index:next_segment_boundary/1,
+ fun ?QUEUE:out_r/1, Quota1, Q3,
RamIndexCount2, IndexState2),
Delta4 = combine_deltas(Delta3, combine_deltas(Delta, Delta2)),
State #vqstate { q2 = Q2a,
@@ -1698,7 +1711,11 @@ push_betas_to_deltas(Quota,
index_state = IndexState3,
ram_index_count = RamIndexCount3 }.
-push_betas_to_deltas(LimitFun, Generator, Combine, Quota, Q, RamIndexCount, IndexState) ->
+push_betas_to_deltas(_DeltaFun, _LimitFun, _Generator, 0, Q, RamIndexCount,
+ IndexState) ->
+ {0, ?BLANK_DELTA, Q, RamIndexCount, IndexState};
+push_betas_to_deltas(DeltaFun, LimitFun, Generator, Quota, Q, RamIndexCount,
+ IndexState) ->
case ?QUEUE:out(Q) of
{empty, _Q} ->
{Quota, ?BLANK_DELTA, Q, RamIndexCount, IndexState};
@@ -1707,24 +1724,31 @@ push_betas_to_deltas(LimitFun, Generator, Combine, Quota, Q, RamIndexCount, Inde
Limit = LimitFun(MinSeqId),
case MaxSeqId < Limit of
true -> {Quota, ?BLANK_DELTA, Q, RamIndexCount, IndexState};
- false -> {Delta, Qc, RamIndexCount1, IndexState1} =
+ false -> {FirstSeqId, LastSeqId, Len, Qc,
+ RamIndexCount1, IndexState1} =
push_betas_to_deltas(
- Generator, Combine, Limit, Quota, ?BLANK_DELTA,
+ Generator, Limit, undefined, undefined, 0, Quota,
Q, RamIndexCount, IndexState),
- {Quota - Delta #delta.count, Delta,
+ {Quota - Len, DeltaFun(FirstSeqId, LastSeqId, Len),
Qc, RamIndexCount1, IndexState1}
end
end.
-push_betas_to_deltas(_Generator, _Combine, _Limit, 0, Delta, Q, RamIndexCount, IndexState) ->
- {Delta, Q, RamIndexCount, IndexState};
-push_betas_to_deltas(Generator, Combine, Limit, Quota, Delta, Q, RamIndexCount, IndexState) ->
+push_betas_to_deltas(_Generator, _Limit, FirstSeqId, LastSeqId, Count, 0, Q,
+ RamIndexCount, IndexState) ->
+ {FirstSeqId, LastSeqId, Count, Q, RamIndexCount, IndexState};
+push_betas_to_deltas(Generator, Limit, FirstSeqId, LastSeqId, Count, Quota, Q,
+ RamIndexCount, IndexState) ->
case Generator(Q) of
{empty, _Q} ->
- {Delta, Q, RamIndexCount, IndexState};
+ {FirstSeqId, LastSeqId, Count, Q, RamIndexCount, IndexState};
{{value, #msg_status { seq_id = SeqId }}, _Qa}
when SeqId < Limit ->
- {Delta, Q, RamIndexCount, IndexState};
+ {FirstSeqId, LastSeqId, Count, Q, RamIndexCount, IndexState};
+ {{value, #msg_status { seq_id = SeqId }}, _Qa}
+ when FirstSeqId =:= undefined ->
+ push_betas_to_deltas(Generator, Limit, SeqId, SeqId, Count, Quota,
+ Q, RamIndexCount, IndexState);
{{value, MsgStatus = #msg_status { index_on_disk = IndexOnDisk,
seq_id = SeqId }}, Qa} ->
{Quota1, RamIndexCount1, IndexState1} =
@@ -1736,11 +1760,9 @@ push_betas_to_deltas(Generator, Combine, Limit, Quota, Delta, Q, RamIndexCount,
IndexState),
{Quota - 1, RamIndexCount - 1, IndexState2}
end,
- Delta1 = Combine(Delta, #delta { start_seq_id = SeqId,
- count = 1,
- end_seq_id = SeqId + 1 }),
push_betas_to_deltas(
- Generator, Combine, Limit, Quota1, Delta1, Qa, RamIndexCount1, IndexState1)
+ Generator, Limit, FirstSeqId, SeqId, Count + 1, Quota1, Qa,
+ RamIndexCount1, IndexState1)
end.
%%----------------------------------------------------------------------------