summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRob Harrop <rob@rabbitmq.com>2010-11-04 16:00:08 +0000
committerRob Harrop <rob@rabbitmq.com>2010-11-04 16:00:08 +0000
commit867c225ccf18ae85529f3125c82e651e26ab6632 (patch)
tree8e9612d606c759319ae45f066430afe9d9de2c61
parent39740b60706338a45fec7c856c40026d086e5d0f (diff)
downloadrabbitmq-server-867c225ccf18ae85529f3125c82e651e26ab6632.tar.gz
Added a test for the ram ack flushing. Tweaked the way RAM is shared between acks and msgs
-rw-r--r--src/rabbit_tests.erl33
-rw-r--r--src/rabbit_variable_queue.erl20
2 files changed, 47 insertions, 6 deletions
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 71b23e01..8efcf239 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1865,9 +1865,40 @@ test_variable_queue() ->
fun test_variable_queue_partial_segments_delta_thing/1,
fun test_variable_queue_all_the_bits_not_covered_elsewhere1/1,
fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1,
- fun test_dropwhile/1]],
+ fun test_dropwhile/1,
+ fun test_variable_queue_ack_limiting/1]],
passed.
+test_variable_queue_ack_limiting(VQ0) ->
+ %% start by sending in a bunch of messages <
+ Len = 1024,
+ VQ1 = variable_queue_publish(false, Len, VQ0),
+
+ %% squeeze and relax queue
+ Churn = Len div 32,
+ VQ2 = publish_fetch_and_ack(Churn, Len, VQ1),
+
+ %% update stats for duration
+ {Duration, VQ3} = rabbit_variable_queue:ram_duration(VQ2),
+
+ %% fetch half the messages
+ {VQ4, _AckTags} = variable_queue_fetch(Len div 2, false, false, Len, VQ3),
+
+ VQ5 = check_variable_queue_status(VQ4, [{len , Len div 2},
+ {ram_ack_count , Len div 2},
+ {ram_msg_count , Len div 2}]),
+
+ %% quarter the allowed duration
+ VQ6 = check_variable_queue_status(
+ rabbit_variable_queue:set_ram_duration_target(Duration / 4, VQ5),
+ [{len, Len div 2},
+ {target_ram_msg_count, Len div 8},
+ {ram_msg_count, Len div 8},
+ {ram_ack_count, 0}]),
+
+ VQ6.
+
+
test_dropwhile(VQ0) ->
Count = 10,
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 2ef4ee27..57d1344e 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -1307,7 +1307,7 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
orddict:new(), GuidsByStore)),
State1 #vqstate { index_state = IndexState1,
persistent_count = PCount1,
- ack_out_counter = AckOutCount - length(AckTags) }.
+ ack_out_counter = AckOutCount + length(AckTags) }.
accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS
msg_on_disk = false,
@@ -1348,10 +1348,20 @@ find_persistent_count(LensByStore) ->
%% conversion is needed. That in turn could cause an infinite loop.
reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, AckFun, State) ->
{ReduceAck, State1} = reduce_ack_memory_use(AckFun, State),
- {Reduce, State2} = case chunk_size(State1 #vqstate.ram_msg_count,
- State1 #vqstate.target_ram_msg_count) of
- 0 -> {ReduceAck, State1};
- S1 -> {true, AlphaBetaFun(S1, State1)}
+
+ {Reduce, State2} = case ReduceAck of
+ true ->
+ %% Don't want to reduce the number of
+ %% ram messages if we might yet be able
+ %% to reduce more acks.
+ {true, State1};
+ false ->
+ case chunk_size(
+ State1 #vqstate.ram_msg_count,
+ State1 #vqstate.target_ram_msg_count) of
+ 0 -> {false, State1};
+ S1 -> {true, AlphaBetaFun(S1, State1)}
+ end
end,
case State2 #vqstate.target_ram_msg_count of
infinity -> {Reduce, State2};