summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRob Harrop <rob@rabbitmq.com>2010-11-03 11:29:38 +0000
committerRob Harrop <rob@rabbitmq.com>2010-11-03 11:29:38 +0000
commit3499cb05a36e41b456a807935811e097bd4a676d (patch)
treebf0165af4e0e3f96d24442b775d5fbf96148035f
parent423a27191f248d75362ba6675c401d6968cf8497 (diff)
downloadrabbitmq-server-3499cb05a36e41b456a807935811e097bd4a676d.tar.gz
Further experimentation. Still not quite working
-rw-r--r--src/rabbit_amqqueue_process.erl3
-rw-r--r--src/rabbit_variable_queue.erl42
2 files changed, 35 insertions, 10 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index fe2c975b..83b360b8 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -239,6 +239,7 @@ stop_sync_timer(State = #q{sync_timer_ref = TRef}) ->
State#q{sync_timer_ref = undefined}.
ensure_rate_timer(State = #q{rate_timer_ref = undefined}) ->
+ io:format("Ensuring rate timer~n"),
{ok, TRef} = timer:apply_after(
?RAM_DURATION_UPDATE_INTERVAL,
rabbit_amqqueue, update_ram_duration,
@@ -963,7 +964,9 @@ handle_cast({flush, ChPid}, State) ->
handle_cast(update_ram_duration, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
+ io:format("Before ram duration~n"),
{RamDuration, BQS1} = BQ:ram_duration(BQS),
+ io:format("RamDuration~p~n", [RamDuration]),
DesiredDuration =
rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index d2f79eb6..ad62248a 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -688,8 +688,13 @@ set_ram_duration_target(DurationTarget,
State = #vqstate {
rates = #rates { avg_egress = AvgEgressRate,
avg_ingress = AvgIngressRate },
+ ack_rates =
+ #rates { avg_egress = AvgAckEgressRate,
+ avg_ingress = AvgAckIngressRate },
target_ram_msg_count = TargetRamMsgCount }) ->
- Rate = AvgEgressRate + AvgIngressRate,
+ io:format("set:~p~n", [DurationTarget]),
+ Rate = AvgEgressRate + AvgIngressRate + AvgAckEgressRate
+ + AvgAckIngressRate,
TargetRamMsgCount1 =
case DurationTarget of
infinity -> infinity;
@@ -717,6 +722,7 @@ ram_duration(State = #vqstate {
ram_msg_count_prev = RamMsgCountPrev,
ram_ack_index = RamAckIndex,
ram_ack_count_prev = RamAckCountPrev }) ->
+ io:format("Setting ram duration~n"),
Now = now(),
{AvgEgressRate, Egress1} = update_rate(Now, Timestamp, OutCount, Egress),
{AvgIngressRate, Ingress1} = update_rate(Now, Timestamp, InCount, Ingress),
@@ -729,7 +735,8 @@ ram_duration(State = #vqstate {
RamAckCount = gb_trees:size(RamAckIndex),
Duration = %% msgs+acks / (msgs+acks/sec) == sec
- case AvgEgressRate == 0 andalso AvgIngressRate == 0 of
+ case AvgEgressRate == 0 andalso AvgIngressRate == 0 andalso
+ AvgAckEgressRate == 0 andalso AvgAckIngressRate == 0 of
true -> infinity;
false -> (RamMsgCountPrev + RamMsgCount +
RamAckCount + RamAckCountPrev) /
@@ -737,6 +744,7 @@ ram_duration(State = #vqstate {
AvgAckEgressRate + AvgAckIngressRate))
end,
+ io:format("Duration:~p~n", [Duration]),
{Duration, State #vqstate {
rates = Rates #rates {
egress = Egress1,
@@ -1335,14 +1343,11 @@ find_persistent_count(LensByStore) ->
%% perpetually reporting the need for a conversion when no such
%% conversion is needed. That in turn could cause an infinite loop.
reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, State) ->
- Size = gb_trees:size(State #vqstate.ram_ack_index),
- State1 = case chunk_size(Size, State #vqstate.target_ram_ack_count) of
- 0 -> State;
- S -> io:format("Limiting~n"), limit_ram_acks(S, State)
- end,
- {Reduce, State2} = case chunk_size(State #vqstate.ram_msg_count,
- State #vqstate.target_ram_msg_count) of
- 0 -> {false, State1};
+ %%io:format("Reducing mem~p~p~n", [State #vqstate.target_ram_msg_count, State #vqstate.ram_msg_count]),
+ {ReduceAck, State1} = {false, State}, %%reduce_ack_memory_use(State),
+ {Reduce, State2} = case chunk_size(State1 #vqstate.ram_msg_count,
+ State1 #vqstate.target_ram_msg_count) of
+ 0 -> {ReduceAck, State1};
S1 -> {true, AlphaBetaFun(S1, State1)}
end,
case State2 #vqstate.target_ram_msg_count of
@@ -1355,6 +1360,23 @@ reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, State) ->
end
end.
+reduce_ack_memory_use(State = #vqstate { target_ram_msg_count = infinity }) ->
+ {false, State};
+reduce_ack_memory_use(State = #vqstate {target_ram_msg_count = TargetRamMsgCount,
+ ram_msg_count = RamMsgCount,
+ ram_ack_index = RamAckIndex} ) ->
+ io:format("RAI:~p,TRMC:~p,RMC:~p~n", [gb_trees:size(RamAckIndex), TargetRamMsgCount, RamMsgCount]),
+ PermittedAckCount = case TargetRamMsgCount > RamMsgCount of
+ true -> TargetRamMsgCount - RamMsgCount;
+ false -> 0
+ end,
+ case chunk_size(gb_trees:size(RamAckIndex), PermittedAckCount) of
+ 0 -> {false, State};
+ C -> {true, limit_ram_acks(C, State)}
+ end.
+
+
+
limit_ram_acks(0, State) ->
State;
limit_ram_acks(Quota, State = #vqstate { pending_ack = PA,