summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-11-18 13:21:26 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2010-11-18 13:21:26 +0000
commitef3a945414a2df5779e183f4c7d599e708ff107f (patch)
tree45242a676ec6970b33ef9ac81b160694b0a9e111
parent18acdc0126fa7e36c1611e424d551ba39e70f139 (diff)
downloadrabbitmq-server-ef3a945414a2df5779e183f4c7d599e708ff107f.tar.gz
Minor refactorings, and a couple of fixes
-rw-r--r--src/rabbit_tests.erl10
-rw-r--r--src/rabbit_variable_queue.erl96
2 files changed, 54 insertions, 52 deletions
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 3130bca3..27e4d925 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1870,7 +1870,7 @@ test_variable_queue() ->
passed.
test_variable_queue_ack_limiting(VQ0) ->
- %% start by sending in a bunch of messages <
+ %% start by sending in a bunch of messages
Len = 1024,
VQ1 = variable_queue_publish(false, Len, VQ0),
@@ -1884,11 +1884,11 @@ test_variable_queue_ack_limiting(VQ0) ->
%% fetch half the messages
{VQ4, _AckTags} = variable_queue_fetch(Len div 2, false, false, Len, VQ3),
- VQ5 = check_variable_queue_status(VQ4, [{len , Len div 2},
- {ram_ack_count , Len div 2},
- {ram_msg_count , Len div 2}]),
+ VQ5 = check_variable_queue_status(VQ4, [{len , Len div 2},
+ {ram_ack_count, Len div 2},
+ {ram_msg_count, Len div 2}]),
- %% quarter the allowed duration
+ %% ensure all acks go to disk on 0 duration target
VQ6 = check_variable_queue_status(
rabbit_variable_queue:set_ram_duration_target(0, VQ5),
[{len, Len div 2},
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 48f621e1..e9910a56 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -171,19 +171,18 @@
%% segments.
%%
%% Pending acks are recorded in memory either as the tuple {SeqId,
-%% Guid, MsgProps} (tuple form) or as the message itself (message
-%% form). Acks for persistent messages are always stored in the tuple
-%% form. Acks for transient messages are also stored in tuple form if
-%% the message has been forgotten to disk as part of the memory
-%% reduction process. For transient messages that haven't already been
-%% written to disk, acks are stored in message form to avoid the
-%% overhead of writing to disk.
+%% Guid, MsgProps} (tuple-form) or as the message itself (message-
+%% form). Acks for persistent messages are always stored in the tuple-
+%% form. Acks for transient messages are also stored in tuple-form if
+%% the message has been sent to disk as part of the memory reduction
+%% process. For transient messages that haven't already been written
+%% to disk, acks are stored in message-form.
%%
-%% During memory reduction, acks stored in message form are converted
-%% to tuple form, and the corresponding messages are pushed out to
+%% During memory reduction, acks stored in message-form are converted
+%% to tuple-form, and the corresponding messages are pushed out to
%% disk.
%%
-%% The order in which alphas are pushed to betas and message form acks
+%% The order in which alphas are pushed to betas and message-form acks
%% are pushed to disk is determined dynamically. We always prefer to
%% push messages for the source (alphas or acks) that is growing the
%% fastest (with growth measured as avg. ingress - avg. egress). In
@@ -710,14 +709,15 @@ is_empty(State) -> 0 == len(State).
set_ram_duration_target(DurationTarget,
State = #vqstate {
- rates = #rates { avg_egress = AvgEgressRate,
- avg_ingress = AvgIngressRate },
+ rates =
+ #rates { avg_egress = AvgEgressRate,
+ avg_ingress = AvgIngressRate },
ack_rates =
#rates { avg_egress = AvgAckEgressRate,
avg_ingress = AvgAckIngressRate },
target_ram_item_count = TargetRamItemCount }) ->
- Rate = AvgEgressRate + AvgIngressRate + AvgAckEgressRate
- + AvgAckIngressRate,
+ Rate =
+ AvgEgressRate + AvgIngressRate + AvgAckEgressRate + AvgAckIngressRate,
TargetRamItemCount1 =
case DurationTarget of
infinity -> infinity;
@@ -732,10 +732,11 @@ set_ram_duration_target(DurationTarget,
end).
ram_duration(State = #vqstate {
- rates = #rates { egress = Egress,
- ingress = Ingress,
- timestamp = Timestamp } = Rates,
- ack_rates = #rates { egress = AckEgress,
+ rates = #rates { timestamp = Timestamp,
+ egress = Egress,
+ ingress = Ingress } = Rates,
+ ack_rates = #rates { timestamp = AckTimestamp,
+ egress = AckEgress,
ingress = AckIngress } = ARates,
in_counter = InCount,
out_counter = OutCount,
@@ -749,10 +750,10 @@ ram_duration(State = #vqstate {
{AvgEgressRate, Egress1} = update_rate(Now, Timestamp, OutCount, Egress),
{AvgIngressRate, Ingress1} = update_rate(Now, Timestamp, InCount, Ingress),
- {AvgAckEgressRate, AckEgress1} =
- update_rate(Now, Timestamp, AckOutCount, AckEgress),
+ {AvgAckEgressRate, AckEgress1} =
+ update_rate(Now, AckTimestamp, AckOutCount, AckEgress),
{AvgAckIngressRate, AckIngress1} =
- update_rate(Now, Timestamp, AckInCount, AckIngress),
+ update_rate(Now, AckTimestamp, AckInCount, AckIngress),
RamAckCount = gb_trees:size(RamAckIndex),
@@ -762,7 +763,7 @@ ram_duration(State = #vqstate {
true -> infinity;
false -> (RamMsgCountPrev + RamMsgCount +
RamAckCount + RamAckCountPrev) /
- (2 * (AvgEgressRate + AvgIngressRate +
+ (4 * (AvgEgressRate + AvgIngressRate +
AvgAckEgressRate + AvgAckIngressRate))
end,
@@ -774,12 +775,15 @@ ram_duration(State = #vqstate {
avg_ingress = AvgIngressRate,
timestamp = Now },
ack_rates = ARates #rates {
- egress = AckEgress1,
- ingress = AckIngress1,
- avg_egress = AvgAckEgressRate,
- avg_ingress = AvgAckIngressRate },
+ egress = AckEgress1,
+ ingress = AckIngress1,
+ avg_egress = AvgAckEgressRate,
+ avg_ingress = AvgAckIngressRate,
+ timestamp = Now },
in_counter = 0,
out_counter = 0,
+ ack_in_counter = 0,
+ ack_out_counter = 0,
ram_msg_count_prev = RamMsgCount,
ram_ack_count_prev = RamAckCount }}.
@@ -1047,18 +1051,17 @@ init(IsDurable, IndexState, DeltaCount, Terms,
in_counter = 0,
ack_out_counter = 0,
ack_in_counter = 0,
- rates = #rates { egress = {Now, 0},
- ingress = {Now, DeltaCount1},
- avg_egress = 0.0,
- avg_ingress = 0.0,
- timestamp = Now },
- ack_rates = #rates { egress = {Now, 0},
- ingress = {Now, 0},
- avg_egress = 0.0,
- avg_ingress = 0.0,
- timestamp = undefined } },
+ rates = blank_rate(Now, DeltaCount1),
+ ack_rates = blank_rate(Now, 0) },
a(maybe_deltas_to_betas(State)).
+blank_rate(Timestamp, IngressLength) ->
+ #rates { egress = {Timestamp, 0},
+ ingress = {Timestamp, IngressLength},
+ avg_egress = 0.0,
+ avg_ingress = 0.0,
+ timestamp = Timestamp }.
+
msg_store_callback(PersistentGuids, Pubs, AckTags, Fun, MsgPropsFun) ->
Self = self(),
F = fun () -> rabbit_amqqueue:maybe_run_queue_via_backing_queue(
@@ -1315,7 +1318,7 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
ack_out_counter = AckOutCount }} =
lists:foldl(
fun (SeqId, {Acc, State2 = #vqstate { pending_ack = PA,
- ram_ack_index = RAI}}) ->
+ ram_ack_index = RAI }}) ->
AckEntry = dict:fetch(SeqId, PA),
{accumulate_ack(SeqId, AckEntry, Acc),
Fun(AckEntry, State2 #vqstate {
@@ -1403,28 +1406,27 @@ reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, AckFun,
%% acks, push queue messages first.
[AlphaBetaFun, AckFun]
end,
- {_, StateOut} =
+ {_, State2} =
%% Both reduce functions get a chance to reduce
%% memory. The second may very well get a quota of
%% 0 if the first function managed to push out the
%% maximum number of messages.
lists:foldl(
- fun(ReduceFun, {QuotaN, StateN}) ->
+ fun (ReduceFun, {QuotaN, StateN}) ->
ReduceFun(QuotaN, StateN)
end, {S1, State}, ReduceFuns),
- {true, StateOut}
+ {true, State2}
end,
case State1 #vqstate.target_ram_item_count of
- 0 -> {Reduce, BetaDeltaFun(State1)};
- _ -> case chunk_size(State1 #vqstate.ram_index_count,
- permitted_ram_index_count(State1)) of
- ?IO_BATCH_SIZE = S2 -> {true, BetaGammaFun(S2, State1)};
- _ -> {Reduce, State1}
- end
+ 0 -> {Reduce, BetaDeltaFun(State1)};
+ _ -> case chunk_size(State1 #vqstate.ram_index_count,
+ permitted_ram_index_count(State1)) of
+ ?IO_BATCH_SIZE = S2 -> {true, BetaGammaFun(S2, State1)};
+ _ -> {Reduce, State1}
+ end
end.
-
limit_ram_acks(0, State) ->
{0, State};
limit_ram_acks(Quota, State = #vqstate { pending_ack = PA,