summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-10-12 20:55:20 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2011-10-12 20:55:20 +0100
commit0e3b7b08aec68f9ec177d97cf7250bacdb2caa48 (patch)
treed906e0da648b084aaa57b3da2a590a75cf97351f
parent440329710e81289609f96d9997c7a4c5cc44efc1 (diff)
downloadrabbitmq-server-0e3b7b08aec68f9ec177d97cf7250bacdb2caa48.tar.gz
Because I've got rid of ram_index_count (i.e. betas count), and because the guard around whether or not betas_to_deltas is now based on q2 and q3 len rather than ram_index_count, we no longer need to have the separate gammas_to_deltas fun, which means we should no longer have the problem with it being called too often as we should be able to rely on the chunk stuff working out.
-rw-r--r--src/rabbit_variable_queue.erl68
1 files changed, 20 insertions, 48 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 9951359a..d418a10f 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -756,7 +756,6 @@ needs_timeout(State) ->
fun (_Quota, State1) -> {0, State1} end,
fun (_Quota, State1) -> State1 end,
fun (_Quota, State1) -> {0, State1} end,
- fun null_gamma_delta/1,
State) of
{true, _State} -> idle;
{false, _State} -> false
@@ -764,21 +763,6 @@ needs_timeout(State) ->
true -> timed
end.
-null_gamma_delta(#vqstate { q2 = Q2, q3 = Q3 } = State) ->
- {null_gamma_delta_msg(?QUEUE:peek(Q2), ?QUEUE:peek(Q2),
- fun (SeqId) -> SeqId end) orelse
- null_gamma_delta_msg(?QUEUE:peek_r(Q3), ?QUEUE:peek(Q3),
- fun rabbit_queue_index:next_segment_boundary/1),
- State}.
-
-null_gamma_delta_msg({value, #msg_status { seq_id = SeqId1,
- index_on_disk = true }},
- {value, #msg_status { seq_id = SeqId2 }},
- LimitFun) ->
- LimitFun =:= undefined orelse SeqId1 >= LimitFun(SeqId2);
-null_gamma_delta_msg(_, _, _) ->
- false.
-
timeout(State) ->
a(reduce_memory_use(confirm_commit_index(State))).
@@ -1448,10 +1432,10 @@ delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId.
%% one segment's worth of messages in q3 - and thus would risk
%% perpetually reporting the need for a conversion when no such
%% conversion is needed. That in turn could cause an infinite loop.
-reduce_memory_use(_AlphaBetaFun, _BetaDeltaFun, _AckFun, _GammaDeltaFun,
+reduce_memory_use(_AlphaBetaFun, _BetaDeltaFun, _AckFun,
State = #vqstate {target_ram_count = infinity}) ->
{false, State};
-reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun, GammaDeltaFun,
+reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun,
State = #vqstate {
ram_ack_index = RamAckIndex,
ram_msg_count = RamMsgCount,
@@ -1483,14 +1467,11 @@ reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun, GammaDeltaFun,
{true, State2}
end,
- {Reduce1, State3} =
- case chunk_size(?QUEUE:len(Q2) + ?QUEUE:len(Q3),
- permitted_beta_count(State1)) of
- ?IO_BATCH_SIZE = S2 -> {true, BetaDeltaFun(S2, State1)};
- _ -> {Reduce, State1}
- end,
- {Reduce2, State4} = GammaDeltaFun(State3),
- {Reduce1 orelse Reduce2, State4}.
+ case chunk_size(?QUEUE:len(Q2) + ?QUEUE:len(Q3),
+ permitted_beta_count(State1)) of
+ ?IO_BATCH_SIZE = S2 -> {true, BetaDeltaFun(S2, State1)};
+ _ -> {Reduce, State1}
+ end.
limit_ram_acks(0, State) ->
{0, State};
@@ -1515,7 +1496,6 @@ reduce_memory_use(State) ->
{_, State1} = reduce_memory_use(fun push_alphas_to_betas/2,
fun push_betas_to_deltas/2,
fun limit_ram_acks/2,
- fun push_gammas_to_deltas/1,
State),
State1.
@@ -1664,31 +1644,36 @@ push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2,
q3 = Q3,
index_state = IndexState }) ->
PushState = {Quota, Delta, IndexState},
- {Q3a, PushState1} = push_with_limit(
+ {Q3a, PushState1} = push_betas_to_deltas(
fun ?QUEUE:out_r/1,
fun rabbit_queue_index:next_segment_boundary/1,
- Q3, fun push_betas_to_deltas1/4, PushState),
- {Q2a, PushState2} = push_with_limit(
+ Q3, PushState),
+ {Q2a, PushState2} = push_betas_to_deltas(
fun ?QUEUE:out/1,
fun (Q2MinSeqId) -> Q2MinSeqId end,
- Q2, fun push_betas_to_deltas1/4, PushState1),
+ Q2, PushState1),
{_, Delta1, IndexState1} = PushState2,
State #vqstate { q2 = Q2a,
delta = Delta1,
q3 = Q3a,
index_state = IndexState1 }.
-push_with_limit(Generator, LimitFun, Q, PushFun, PushState) ->
+push_betas_to_deltas(Generator, LimitFun, Q, PushState) ->
case ?QUEUE:is_empty(Q) of
true ->
{Q, PushState};
false ->
- {{value, #msg_status { seq_id = MinSeqId }}, _Qa} = ?QUEUE:out(Q),
- {{value, #msg_status { seq_id = MaxSeqId }}, _Qb} = ?QUEUE:out_r(Q),
+ {value, #msg_status { seq_id = MinSeqId }} = ?QUEUE:peek(Q),
+ {value, #msg_status { seq_id = MaxSeqId }} = ?QUEUE:peek_r(Q),
Limit = LimitFun(MinSeqId),
case MaxSeqId < Limit of
true -> {Q, PushState};
- false -> PushFun(Generator, Limit, Q, PushState)
+ false -> {Q1, {Quota, Delta, IndexState}} =
+ push_betas_to_deltas1(
+ Generator, Limit, Q, PushState),
+ {Q2, Delta1} =
+ push_gammas_to_deltas(Generator, Limit, Q1, Delta),
+ {Q2, {Quota, Delta1, IndexState}}
end
end.
@@ -1719,19 +1704,6 @@ push_betas_to_deltas1(Generator, Limit, Q,
{Quota1, Delta1, IndexState1})
end.
-push_gammas_to_deltas(State = #vqstate { q2 = Q2,
- delta = Delta,
- q3 = Q3 }) ->
- {Q2a, Delta1} = push_with_limit(
- fun ?QUEUE:out/1,
- fun (Q2MinSeqId) -> Q2MinSeqId end,
- Q2, fun push_gammas_to_deltas/4, Delta),
- {Q3a, Delta2} = push_with_limit(
- fun ?QUEUE:out_r/1,
- fun rabbit_queue_index:next_segment_boundary/1,
- Q3, fun push_gammas_to_deltas/4, Delta1),
- {Delta2 =/= Delta, State #vqstate { q2 = Q2a, delta = Delta2, q3 = Q3a }}.
-
push_gammas_to_deltas(Generator, Limit, Q, Delta) ->
case Generator(Q) of
{{value, #msg_status { seq_id = SeqId, index_on_disk = true }}, Q1}