diff options
author | Rob Harrop <rob@rabbitmq.com> | 2010-11-04 16:00:08 +0000 |
---|---|---|
committer | Rob Harrop <rob@rabbitmq.com> | 2010-11-04 16:00:08 +0000 |
commit | 867c225ccf18ae85529f3125c82e651e26ab6632 (patch) | |
tree | 8e9612d606c759319ae45f066430afe9d9de2c61 | |
parent | 39740b60706338a45fec7c856c40026d086e5d0f (diff) | |
download | rabbitmq-server-867c225ccf18ae85529f3125c82e651e26ab6632.tar.gz |
Added a test for the ram ack flushing. Tweaked the way RAM is shared between acks and msgs
-rw-r--r-- | src/rabbit_tests.erl | 33 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 20 |
2 files changed, 47 insertions, 6 deletions
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 71b23e01..8efcf239 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1865,9 +1865,40 @@ test_variable_queue() -> fun test_variable_queue_partial_segments_delta_thing/1, fun test_variable_queue_all_the_bits_not_covered_elsewhere1/1, fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1, - fun test_dropwhile/1]], + fun test_dropwhile/1, + fun test_variable_queue_ack_limiting/1]], passed. +test_variable_queue_ack_limiting(VQ0) -> + %% start by sending in a bunch of messages < + Len = 1024, + VQ1 = variable_queue_publish(false, Len, VQ0), + + %% squeeze and relax queue + Churn = Len div 32, + VQ2 = publish_fetch_and_ack(Churn, Len, VQ1), + + %% update stats for duration + {Duration, VQ3} = rabbit_variable_queue:ram_duration(VQ2), + + %% fetch half the messages + {VQ4, _AckTags} = variable_queue_fetch(Len div 2, false, false, Len, VQ3), + + VQ5 = check_variable_queue_status(VQ4, [{len , Len div 2}, + {ram_ack_count , Len div 2}, + {ram_msg_count , Len div 2}]), + + %% quarter the allowed duration + VQ6 = check_variable_queue_status( + rabbit_variable_queue:set_ram_duration_target(Duration / 4, VQ5), + [{len, Len div 2}, + {target_ram_msg_count, Len div 8}, + {ram_msg_count, Len div 8}, + {ram_ack_count, 0}]), + + VQ6. + + test_dropwhile(VQ0) -> Count = 10, diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 2ef4ee27..57d1344e 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1307,7 +1307,7 @@ ack(MsgStoreFun, Fun, AckTags, State) -> orddict:new(), GuidsByStore)), State1 #vqstate { index_state = IndexState1, persistent_count = PCount1, - ack_out_counter = AckOutCount - length(AckTags) }. + ack_out_counter = AckOutCount + length(AckTags) }. accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS msg_on_disk = false, @@ -1348,10 +1348,20 @@ find_persistent_count(LensByStore) -> %% conversion is needed. That in turn could cause an infinite loop. reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, AckFun, State) -> {ReduceAck, State1} = reduce_ack_memory_use(AckFun, State), - {Reduce, State2} = case chunk_size(State1 #vqstate.ram_msg_count, - State1 #vqstate.target_ram_msg_count) of - 0 -> {ReduceAck, State1}; - S1 -> {true, AlphaBetaFun(S1, State1)} + + {Reduce, State2} = case ReduceAck of + true -> + %% Don't want to reduce the number of + %% ram messages if we might yet be able + %% to reduce more acks. + {true, State1}; + false -> + case chunk_size( + State1 #vqstate.ram_msg_count, + State1 #vqstate.target_ram_msg_count) of + 0 -> {false, State1}; + S1 -> {true, AlphaBetaFun(S1, State1)} + end end, case State2 #vqstate.target_ram_msg_count of infinity -> {Reduce, State2}; |