diff options
author | Rob Harrop <rob@rabbitmq.com> | 2010-11-03 11:29:38 +0000 |
---|---|---|
committer | Rob Harrop <rob@rabbitmq.com> | 2010-11-03 11:29:38 +0000 |
commit | 3499cb05a36e41b456a807935811e097bd4a676d (patch) | |
tree | bf0165af4e0e3f96d24442b775d5fbf96148035f | |
parent | 423a27191f248d75362ba6675c401d6968cf8497 (diff) | |
download | rabbitmq-server-3499cb05a36e41b456a807935811e097bd4a676d.tar.gz |
Further experimentation. Still not quite working
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 3 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 42 |
2 files changed, 35 insertions, 10 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index fe2c975b..83b360b8 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -239,6 +239,7 @@ stop_sync_timer(State = #q{sync_timer_ref = TRef}) -> State#q{sync_timer_ref = undefined}. ensure_rate_timer(State = #q{rate_timer_ref = undefined}) -> + io:format("Ensuring rate timer~n"), {ok, TRef} = timer:apply_after( ?RAM_DURATION_UPDATE_INTERVAL, rabbit_amqqueue, update_ram_duration, @@ -963,7 +964,9 @@ handle_cast({flush, ChPid}, State) -> handle_cast(update_ram_duration, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> + io:format("Before ram duration~n"), {RamDuration, BQS1} = BQ:ram_duration(BQS), + io:format("RamDuration~p~n", [RamDuration]), DesiredDuration = rabbit_memory_monitor:report_ram_duration(self(), RamDuration), BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index d2f79eb6..ad62248a 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -688,8 +688,13 @@ set_ram_duration_target(DurationTarget, State = #vqstate { rates = #rates { avg_egress = AvgEgressRate, avg_ingress = AvgIngressRate }, + ack_rates = + #rates { avg_egress = AvgAckEgressRate, + avg_ingress = AvgAckIngressRate }, target_ram_msg_count = TargetRamMsgCount }) -> - Rate = AvgEgressRate + AvgIngressRate, + io:format("set:~p~n", [DurationTarget]), + Rate = AvgEgressRate + AvgIngressRate + AvgAckEgressRate + + AvgAckIngressRate, TargetRamMsgCount1 = case DurationTarget of infinity -> infinity; @@ -717,6 +722,7 @@ ram_duration(State = #vqstate { ram_msg_count_prev = RamMsgCountPrev, ram_ack_index = RamAckIndex, ram_ack_count_prev = RamAckCountPrev }) -> + io:format("Setting ram duration~n"), Now = now(), {AvgEgressRate, Egress1} = update_rate(Now, Timestamp, OutCount, Egress), {AvgIngressRate, Ingress1} = update_rate(Now, Timestamp, InCount, Ingress), @@ -729,7 +735,8 @@ ram_duration(State = #vqstate { RamAckCount = gb_trees:size(RamAckIndex), Duration = %% msgs+acks / (msgs+acks/sec) == sec - case AvgEgressRate == 0 andalso AvgIngressRate == 0 of + case AvgEgressRate == 0 andalso AvgIngressRate == 0 andalso + AvgAckEgressRate == 0 andalso AvgAckIngressRate == 0 of true -> infinity; false -> (RamMsgCountPrev + RamMsgCount + RamAckCount + RamAckCountPrev) / @@ -737,6 +744,7 @@ ram_duration(State = #vqstate { AvgAckEgressRate + AvgAckIngressRate)) end, + io:format("Duration:~p~n", [Duration]), {Duration, State #vqstate { rates = Rates #rates { egress = Egress1, @@ -1335,14 +1343,11 @@ find_persistent_count(LensByStore) -> %% perpetually reporting the need for a conversion when no such %% conversion is needed. That in turn could cause an infinite loop. reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, State) -> - Size = gb_trees:size(State #vqstate.ram_ack_index), - State1 = case chunk_size(Size, State #vqstate.target_ram_ack_count) of - 0 -> State; - S -> io:format("Limiting~n"), limit_ram_acks(S, State) - end, - {Reduce, State2} = case chunk_size(State #vqstate.ram_msg_count, - State #vqstate.target_ram_msg_count) of - 0 -> {false, State1}; + %%io:format("Reducing mem~p~p~n", [State #vqstate.target_ram_msg_count, State #vqstate.ram_msg_count]), + {ReduceAck, State1} = {false, State}, %%reduce_ack_memory_use(State), + {Reduce, State2} = case chunk_size(State1 #vqstate.ram_msg_count, + State1 #vqstate.target_ram_msg_count) of + 0 -> {ReduceAck, State1}; S1 -> {true, AlphaBetaFun(S1, State1)} end, case State2 #vqstate.target_ram_msg_count of @@ -1355,6 +1360,23 @@ reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, State) -> end end. +reduce_ack_memory_use(State = #vqstate { target_ram_msg_count = infinity }) -> + {false, State}; +reduce_ack_memory_use(State = #vqstate {target_ram_msg_count = TargetRamMsgCount, + ram_msg_count = RamMsgCount, + ram_ack_index = RamAckIndex} ) -> + io:format("RAI:~p,TRMC:~p,RMC:~p~n", [gb_trees:size(RamAckIndex), TargetRamMsgCount, RamMsgCount]), + PermittedAckCount = case TargetRamMsgCount > RamMsgCount of + true -> TargetRamMsgCount - RamMsgCount; + false -> 0 + end, + case chunk_size(gb_trees:size(RamAckIndex), PermittedAckCount) of + 0 -> {false, State}; + C -> {true, limit_ram_acks(C, State)} + end. + + + limit_ram_acks(0, State) -> State; limit_ram_acks(Quota, State = #vqstate { pending_ack = PA, |