diff options
author | Rob Harrop <rob@rabbitmq.com> | 2010-11-03 14:30:39 +0000 |
---|---|---|
committer | Rob Harrop <rob@rabbitmq.com> | 2010-11-03 14:30:39 +0000 |
commit | a14752efdaac19e4f771673e06c26c7b2037db39 (patch) | |
tree | 4d4f40e5e48fe5b2766f91ddf161b82c28b463e2 | |
parent | 3499cb05a36e41b456a807935811e097bd4a676d (diff) | |
download | rabbitmq-server-a14752efdaac19e4f771673e06c26c7b2037db39.tar.gz |
Roughly working ack shedding
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 5 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 37 |
2 files changed, 27 insertions, 15 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 83b360b8..9699ac32 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -969,6 +969,7 @@ handle_cast(update_ram_duration, State = #q{backing_queue = BQ, io:format("RamDuration~p~n", [RamDuration]), DesiredDuration = rabbit_memory_monitor:report_ram_duration(self(), RamDuration), + io:format("Desired duration:~p~n", [DesiredDuration]), BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), noreply(State#q{rate_timer_ref = just_measured, backing_queue_state = BQS2}); @@ -1025,11 +1026,15 @@ handle_info(Info, State) -> ?LOGDEBUG("Info in queue: ~p~n", [Info]), {stop, {unhandled_info, Info}, State}. +handle_post_hibernate(_) -> + io:format("hello~n"). + handle_pre_hibernate(State = #q{backing_queue_state = undefined}) -> {hibernate, State}; handle_pre_hibernate(State = #q{backing_queue = BQ, backing_queue_state = BQS, stats_timer = StatsTimer}) -> + io:format("Hibernating~p~n", [self()]), BQS1 = BQ:handle_pre_hibernate(BQS), %% no activity for a while == 0 egress and ingress rates DesiredDuration = diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index ad62248a..491d4e38 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -722,7 +722,7 @@ ram_duration(State = #vqstate { ram_msg_count_prev = RamMsgCountPrev, ram_ack_index = RamAckIndex, ram_ack_count_prev = RamAckCountPrev }) -> - io:format("Setting ram duration~n"), + io:format("Ram duration~n"), Now = now(), {AvgEgressRate, Egress1} = update_rate(Now, Timestamp, OutCount, Egress), {AvgIngressRate, Ingress1} = update_rate(Now, Timestamp, InCount, Ingress), @@ -766,6 +766,7 @@ needs_idle_timeout(State = #vqstate { on_sync = ?BLANK_SYNC }) -> {Res, _State} = reduce_memory_use(fun (_Quota, State1) -> State1 end, fun (_Quota, State1) -> State1 end, fun (State1) -> State1 end, + fun (_Quota, State1) -> State1 end, State), Res; needs_idle_timeout(_State) -> @@ -779,6 +780,7 @@ handle_pre_hibernate(State = #vqstate { index_state = IndexState }) -> status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, len = Len, pending_ack = PA, + ram_ack_index = RAI, on_sync = #sync { funs = From }, target_ram_msg_count = TargetRamMsgCount, ram_msg_count = RamMsgCount, @@ -787,7 +789,10 @@ status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, persistent_count = PersistentCount, rates = #rates { avg_egress = AvgEgressRate, - avg_ingress = AvgIngressRate } }) -> + avg_ingress = AvgIngressRate }, + ack_rates = #rates { + avg_egress = AvgAckEgressRate, + avg_ingress = AvgAckIngressRate } }) -> [ {q1 , queue:len(Q1)}, {q2 , bpqueue:len(Q2)}, {delta , Delta}, @@ -795,6 +800,7 @@ status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, {q4 , queue:len(Q4)}, {len , Len}, {pending_acks , dict:size(PA)}, + {ram_ack_count , gb_trees:size(RAI)}, {outstanding_txns , length(From)}, {target_ram_msg_count , TargetRamMsgCount}, {ram_msg_count , RamMsgCount}, @@ -802,7 +808,9 @@ status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, {next_seq_id , NextSeqId}, {persistent_count , PersistentCount}, {avg_egress_rate , AvgEgressRate}, - {avg_ingress_rate , AvgIngressRate} ]. + {avg_ingress_rate , AvgIngressRate}, + {avg_ack_egress_rate , AvgAckEgressRate}, + {avg_ack_ingress_rate , AvgAckIngressRate}]. %%---------------------------------------------------------------------------- %% Minor helpers @@ -1342,9 +1350,8 @@ find_persistent_count(LensByStore) -> %% one segment's worth of messages in q3 - and thus would risk %% perpetually reporting the need for a conversion when no such %% conversion is needed. That in turn could cause an infinite loop. -reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, State) -> - %%io:format("Reducing mem~p~p~n", [State #vqstate.target_ram_msg_count, State #vqstate.ram_msg_count]), - {ReduceAck, State1} = {false, State}, %%reduce_ack_memory_use(State), +reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, AckFun, State) -> + {ReduceAck, State1} = reduce_ack_memory_use(AckFun, State), {Reduce, State2} = case chunk_size(State1 #vqstate.ram_msg_count, State1 #vqstate.target_ram_msg_count) of 0 -> {ReduceAck, State1}; @@ -1360,9 +1367,9 @@ reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, State) -> end end. -reduce_ack_memory_use(State = #vqstate { target_ram_msg_count = infinity }) -> +reduce_ack_memory_use(_AckFun, State = #vqstate { target_ram_msg_count = infinity }) -> {false, State}; -reduce_ack_memory_use(State = #vqstate {target_ram_msg_count = TargetRamMsgCount, +reduce_ack_memory_use(AckFun, State = #vqstate {target_ram_msg_count = TargetRamMsgCount, ram_msg_count = RamMsgCount, ram_ack_index = RamAckIndex} ) -> io:format("RAI:~p,TRMC:~p,RMC:~p~n", [gb_trees:size(RamAckIndex), TargetRamMsgCount, RamMsgCount]), @@ -1372,7 +1379,7 @@ reduce_ack_memory_use(State = #vqstate {target_ram_msg_count = TargetRamMsgCount end, case chunk_size(gb_trees:size(RamAckIndex), PermittedAckCount) of 0 -> {false, State}; - C -> {true, limit_ram_acks(C, State)} + C -> {true, AckFun(C, State)} end. @@ -1381,20 +1388,19 @@ limit_ram_acks(0, State) -> State; limit_ram_acks(Quota, State = #vqstate { pending_ack = PA, ram_ack_index = RAI }) -> - io:format("Limiting acks~p~n", [Quota]), case gb_trees:is_empty(RAI) of true -> State; false -> {SeqId, Guid, RAI1} = gb_trees:take_largest(RAI), - io:format("Largest~p~n", [SeqId]), - MsgStatus = dict:fetch(SeqId, PA), - State1 = maybe_write_to_disk(true, false, MsgStatus, State), - io:format("Wrote~n"), + MsgStatus = #msg_status { + guid = Guid, %% ASSERTION + msg_props = MsgProps } = dict:fetch(SeqId, PA), + {_, State1} = maybe_write_to_disk(true, false, MsgStatus, State), limit_ram_acks(Quota - 1, State1 #vqstate { pending_ack = - dict:update(SeqId, {false, Guid}, PA), + dict:store(SeqId, {false, Guid, MsgProps}, PA), ram_ack_index = RAI1 }) end. @@ -1403,6 +1409,7 @@ reduce_memory_use(State) -> {_, State1} = reduce_memory_use(fun push_alphas_to_betas/2, fun limit_ram_index/2, fun push_betas_to_deltas/1, + fun limit_ram_acks/2, State), State1. |