author     Rob Harrop <rob@rabbitmq.com>  2010-11-05 15:18:26 +0000
committer  Rob Harrop <rob@rabbitmq.com>  2010-11-05 15:18:26 +0000
commit     8fbc7073556c0718562a70245c65644cea01e3f9 (patch)
tree       22641892d7914272e94c45c8d90ec83ba49c2ba8
parent     7ed10703eeea9379736e32cec751f23c3393aa4c (diff)
parent     3ed9259814c38bc0b65317ab306740dacdea7c21 (diff)
download   rabbitmq-server-8fbc7073556c0718562a70245c65644cea01e3f9.tar.gz
Merge with default
-rw-r--r--  src/rabbit_tests.erl          |  32
-rw-r--r--  src/rabbit_variable_queue.erl | 273
2 files changed, 244 insertions(+), 61 deletions(-)
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 71b23e01..4f543704 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1865,9 +1865,39 @@ test_variable_queue() ->
              fun test_variable_queue_partial_segments_delta_thing/1,
              fun test_variable_queue_all_the_bits_not_covered_elsewhere1/1,
              fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1,
-             fun test_dropwhile/1]],
+             fun test_dropwhile/1,
+             fun test_variable_queue_ack_limiting/1]],
     passed.
 
+test_variable_queue_ack_limiting(VQ0) ->
+    %% start by sending in a bunch of messages
+    Len = 1024,
+    VQ1 = variable_queue_publish(false, Len, VQ0),
+
+    %% squeeze and relax queue
+    Churn = Len div 32,
+    VQ2 = publish_fetch_and_ack(Churn, Len, VQ1),
+
+    %% update stats for duration
+    {_Duration, VQ3} = rabbit_variable_queue:ram_duration(VQ2),
+
+    %% fetch half the messages
+    {VQ4, _AckTags} = variable_queue_fetch(Len div 2, false, false, Len, VQ3),
+
+    VQ5 = check_variable_queue_status(VQ4, [{len          , Len div 2},
+                                            {ram_ack_count, Len div 2},
+                                            {ram_msg_count, Len div 2}]),
+
+    %% quarter the allowed duration
+    VQ6 = check_variable_queue_status(
+            rabbit_variable_queue:set_ram_duration_target(0, VQ5),
+            [{len, Len div 2},
+             {target_ram_msg_count, 0},
+             {ram_msg_count, 0},
+             {ram_ack_count, 0}]),
+
+    VQ6.
+
 test_dropwhile(VQ0) ->
     Count = 10,
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index f88e49c2..9c25540e 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -89,12 +89,14 @@
 %%
 %% The duration indicated to us by the memory_monitor is used to
 %% calculate, given our current ingress and egress rates, how many
-%% messages we should hold in RAM. When we need to push alphas to
-%% betas or betas to gammas, we favour writing out messages that are
-%% further from the head of the queue. This minimises writes to disk,
-%% as the messages closer to the tail of the queue stay in the queue
-%% for longer, thus do not need to be replaced as quickly by sending
-%% other messages to disk.
+%% messages we should hold in RAM. We track the ingress and egress
+%% rates for both messages and pending acks and rates for both are
+%% considered when calculating the number of messages to hold in
+%% RAM. When we need to push alphas to betas or betas to gammas, we
+%% favour writing out messages that are further from the head of the
+%% queue. This minimises writes to disk, as the messages closer to the
+%% tail of the queue stay in the queue for longer, thus do not need to
+%% be replaced as quickly by sending other messages to disk.
 %%
 %% Whilst messages are pushed to disk and forgotten from RAM as soon
 %% as requested by a new setting of the queue RAM duration, the
@@ -168,6 +170,30 @@
 %% the latter) are both cheap and do not require any scanning through qi
 %% segments.
 %%
+%% Pending acks are recorded in memory either as the tuple {SeqId,
+%% Guid, MsgProps} (tuple form) or as the message itself (message
+%% form). Acks for persistent messages are always stored in the tuple
+%% form. Acks for transient messages are also stored in tuple form if
+%% the message has been forgotten to disk as part of the memory
+%% reduction process. For transient messages that haven't already been
+%% written to disk, acks are stored in message form to avoid the
+%% overhead of writing to disk.
+%%
+%% During memory reduction, acks stored in message form are converted
+%% to tuple form, and the corresponding messages are pushed out to
+%% disk.
+%%
+%% The order in which alphas are pushed to betas and message form acks
+%% are pushed to disk is determined dynamically. We always prefer to
+%% push messages for the source (alphas or acks) that is growing the
+%% fastest (with growth measured as avg. ingress - avg. egress). In
+%% each round of memory reduction a chunk of messages at most
+%% ?IO_BATCH_SIZE in size is allocated to be pushed to disk. The
+%% fastest growing source will be reduced by as much of this chunk as
+%% possible. If there is any remaining allocation in the chunk after
+%% the first source has been reduced to zero, the second source will
+%% be reduced by as much of the remaining chunk as possible.
+%%
 %% Notes on Clean Shutdown
 %% (This documents behaviour in variable_queue, queue_index and
 %% msg_store.)
@@ -220,6 +246,8 @@
           q4,
           next_seq_id,
           pending_ack,
+          pending_ack_index,
+          ram_ack_index,
           index_state,
           msg_store_clients,
           on_sync,
@@ -230,12 +258,17 @@
           persistent_count,
           target_ram_msg_count,
+          target_ram_ack_count,
           ram_msg_count,
           ram_msg_count_prev,
+          ram_ack_count_prev,
           ram_index_count,
           out_counter,
           in_counter,
-          rates
+          ack_out_counter,
+          ack_in_counter,
+          rates,
+          ack_rates
         }).
 
 -record(rates, { egress, ingress, avg_egress, avg_ingress, timestamp }).
@@ -306,6 +339,7 @@
              q4                   :: queue(),
              next_seq_id          :: seq_id(),
              pending_ack          :: dict:dictionary(),
+             ram_ack_index        :: gb_tree(),
             index_state          :: any(),
              msg_store_clients    :: 'undefined' | {{any(), binary()},
                                                     {any(), binary()}},
@@ -317,12 +351,16 @@
              transient_threshold  :: non_neg_integer(),
              target_ram_msg_count :: non_neg_integer() | 'infinity',
+             target_ram_ack_count :: non_neg_integer() | 'infinity',
             ram_msg_count        :: non_neg_integer(),
              ram_msg_count_prev   :: non_neg_integer(),
              ram_index_count      :: non_neg_integer(),
              out_counter          :: non_neg_integer(),
              in_counter           :: non_neg_integer(),
-             rates                :: rates() }).
+             ack_out_counter      :: non_neg_integer(),
+             ack_in_counter       :: non_neg_integer(),
+             rates                :: rates(),
+             ack_rates            :: rates() }).
 
 -include("rabbit_backing_queue_spec.hrl").
 
@@ -479,19 +517,17 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent },
                                out_counter      = OutCount,
                                in_counter       = InCount,
                                persistent_count = PCount,
-                               pending_ack      = PA,
                                durable          = IsDurable }) ->
     IsPersistent1 = IsDurable andalso IsPersistent,
     MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps))
                   #msg_status { is_delivered = true },
     {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
-    PA1 = record_pending_ack(m(MsgStatus1), PA),
+    State2 = record_pending_ack(m(MsgStatus1), State1),
     PCount1 = PCount + one_if(IsPersistent1),
-    {SeqId, a(State1 #vqstate { next_seq_id      = SeqId    + 1,
+    {SeqId, a(State2 #vqstate { next_seq_id      = SeqId    + 1,
                                 out_counter      = OutCount + 1,
                                 in_counter       = InCount  + 1,
-                                persistent_count = PCount1,
-                                pending_ack      = PA1 })}.
+                                persistent_count = PCount1 })}.
 
 dropwhile(Pred, State) ->
     {_OkOrEmpty, State1} = dropwhile1(Pred, State),
@@ -561,8 +597,7 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
                             index_state       = IndexState,
                             msg_store_clients = MSCState,
                             len               = Len,
-                            persistent_count  = PCount,
-                            pending_ack       = PA }) ->
+                            persistent_count  = PCount }) ->
     %% 1. Mark it delivered if necessary
     IndexState1 = maybe_write_delivered(
                     IndexOnDisk andalso not IsDelivered,
@@ -582,12 +617,12 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
         end,
 
     %% 3. If an ack is required, add something sensible to PA
-    {AckTag, PA1} = case AckRequired of
-                        true  -> PA2 = record_pending_ack(
-                                         MsgStatus #msg_status {
-                                           is_delivered = true }, PA),
-                                 {SeqId, PA2};
-                        false -> {blank_ack, PA}
+    {AckTag, State1} = case AckRequired of
+                           true  -> StateN = record_pending_ack(
+                                               MsgStatus #msg_status {
+                                                 is_delivered = true }, State),
+                                    {SeqId, StateN};
+                           false -> {blank_ack, State}
                     end,
 
     PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
@@ -595,12 +630,11 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
     RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined),
 
     {{Msg, IsDelivered, AckTag, Len1},
-     a(State #vqstate { ram_msg_count    = RamMsgCount1,
-                        out_counter      = OutCount + 1,
-                        index_state      = IndexState2,
-                        len              = Len1,
-                        persistent_count = PCount1,
-                        pending_ack      = PA1 })}.
+     a(State1 #vqstate { ram_msg_count    = RamMsgCount1,
+                         out_counter      = OutCount + 1,
+                         index_state      = IndexState2,
+                         len              = Len1,
+                         persistent_count = PCount1 })}.
 
 ack(AckTags, State) ->
     a(ack(fun msg_store_remove/3,
@@ -680,8 +714,12 @@ set_ram_duration_target(DurationTarget,
                         State = #vqstate {
                           rates = #rates { avg_egress  = AvgEgressRate,
                                            avg_ingress = AvgIngressRate },
+                          ack_rates =
+                              #rates { avg_egress  = AvgAckEgressRate,
+                                       avg_ingress = AvgAckIngressRate },
                           target_ram_msg_count = TargetRamMsgCount }) ->
-    Rate = AvgEgressRate + AvgIngressRate,
+    Rate = AvgEgressRate + AvgIngressRate + AvgAckEgressRate +
+        AvgAckIngressRate,
     TargetRamMsgCount1 =
         case DurationTarget of
             infinity  -> infinity;
@@ -699,19 +737,35 @@ ram_duration(State = #vqstate {
                        rates = #rates { egress    = Egress,
                                         ingress   = Ingress,
                                         timestamp = Timestamp } = Rates,
+                       ack_rates = #rates { egress  = AckEgress,
+                                            ingress = AckIngress } = ARates,
                        in_counter         = InCount,
                        out_counter        = OutCount,
+                       ack_in_counter     = AckInCount,
+                       ack_out_counter    = AckOutCount,
                        ram_msg_count      = RamMsgCount,
-                       ram_msg_count_prev = RamMsgCountPrev }) ->
+                       ram_msg_count_prev = RamMsgCountPrev,
+                       ram_ack_index      = RamAckIndex,
+                       ram_ack_count_prev = RamAckCountPrev }) ->
     Now = now(),
     {AvgEgressRate,  Egress1}  = update_rate(Now, Timestamp, OutCount, Egress),
     {AvgIngressRate, Ingress1} = update_rate(Now, Timestamp, InCount, Ingress),
 
-    Duration = %% msgs / (msgs/sec) == sec
-        case AvgEgressRate == 0 andalso AvgIngressRate == 0 of
+    {AvgAckEgressRate, AckEgress1} =
+        update_rate(Now, Timestamp, AckOutCount, AckEgress),
+    {AvgAckIngressRate, AckIngress1} =
+        update_rate(Now, Timestamp, AckInCount, AckIngress),
+
+    RamAckCount = gb_trees:size(RamAckIndex),
+
+    Duration = %% msgs+acks / (msgs+acks/sec) == sec
+        case AvgEgressRate == 0 andalso AvgIngressRate == 0 andalso
+            AvgAckEgressRate == 0 andalso AvgAckIngressRate == 0 of
             true  -> infinity;
-            false -> (RamMsgCountPrev + RamMsgCount) /
-                         (2 * (AvgEgressRate + AvgIngressRate))
+            false -> (RamMsgCountPrev + RamMsgCount +
+                          RamAckCount + RamAckCountPrev) /
+                         (2 * (AvgEgressRate + AvgIngressRate +
+                                   AvgAckEgressRate + AvgAckIngressRate))
         end,
 
     {Duration, State #vqstate {
@@ -721,14 +775,21 @@ ram_duration(State = #vqstate {
                  avg_egress  = AvgEgressRate,
                  avg_ingress = AvgIngressRate,
                  timestamp   = Now },
+                ack_rates = ARates #rates {
+                  egress      = AckEgress1,
+                  ingress     = AckIngress1,
+                  avg_egress  = AvgAckEgressRate,
+                  avg_ingress = AvgAckIngressRate },
                 in_counter         = 0,
                 out_counter        = 0,
-                ram_msg_count_prev = RamMsgCount }}.
+                ram_msg_count_prev = RamMsgCount,
+                ram_ack_count_prev = RamAckCount }}.
 
 needs_idle_timeout(State = #vqstate { on_sync = ?BLANK_SYNC }) ->
-    {Res, _State} = reduce_memory_use(fun (_Quota, State1) -> State1 end,
+    {Res, _State} = reduce_memory_use(fun (_Quota, State1) -> {0, State1} end,
                                       fun (_Quota, State1) -> State1 end,
                                       fun (State1) -> State1 end,
+                                      fun (_Quota, State1) -> {0, State1} end,
                                       State),
     Res;
 needs_idle_timeout(_State) ->
@@ -742,6 +803,7 @@ handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
 
 status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
                   len = Len, pending_ack = PA,
+                  ram_ack_index = RAI,
                   on_sync = #sync { funs = From },
                   target_ram_msg_count = TargetRamMsgCount,
                   ram_msg_count = RamMsgCount,
@@ -750,7 +812,10 @@ status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
                   persistent_count = PersistentCount,
                   rates = #rates {
                     avg_egress  = AvgEgressRate,
-                    avg_ingress = AvgIngressRate } }) ->
+                    avg_ingress = AvgIngressRate },
+                  ack_rates = #rates {
+                    avg_egress  = AvgAckEgressRate,
+                    avg_ingress = AvgAckIngressRate } }) ->
     [ {q1                    , queue:len(Q1)},
       {q2                    , bpqueue:len(Q2)},
       {delta                 , Delta},
@@ -758,6 +823,7 @@ status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
       {q4                    , queue:len(Q4)},
      {len                   , Len},
       {pending_acks          , dict:size(PA)},
+      {ram_ack_count         , gb_trees:size(RAI)},
       {outstanding_txns      , length(From)},
       {target_ram_msg_count  , TargetRamMsgCount},
       {ram_msg_count         , RamMsgCount},
@@ -765,7 +831,9 @@ status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
       {next_seq_id           , NextSeqId},
       {persistent_count      , PersistentCount},
       {avg_egress_rate       , AvgEgressRate},
-      {avg_ingress_rate      , AvgIngressRate} ].
+      {avg_ingress_rate      , AvgIngressRate},
+      {avg_ack_egress_rate   , AvgAckEgressRate},
+      {avg_ack_ingress_rate  , AvgAckIngressRate}].
 
 %%----------------------------------------------------------------------------
 %% Minor helpers
@@ -962,6 +1030,7 @@ init(IsDurable, IndexState, DeltaCount, Terms,
             q4                   = queue:new(),
             next_seq_id          = NextSeqId,
             pending_ack          = dict:new(),
+            ram_ack_index        = gb_trees:empty(),
             index_state          = IndexState1,
             msg_store_clients    = {PersistentClient, TransientClient},
             on_sync              = ?BLANK_SYNC,
@@ -974,14 +1043,22 @@ init(IsDurable, IndexState, DeltaCount, Terms,
             target_ram_msg_count = infinity,
             ram_msg_count        = 0,
             ram_msg_count_prev   = 0,
+            ram_ack_count_prev   = 0,
             ram_index_count      = 0,
             out_counter          = 0,
             in_counter           = 0,
+            ack_out_counter      = 0,
+            ack_in_counter       = 0,
             rates = #rates { egress      = {Now, 0},
                              ingress     = {Now, DeltaCount1},
                              avg_egress  = 0.0,
                              avg_ingress = 0.0,
-                             timestamp   = Now } },
+                             timestamp   = Now },
+            ack_rates = #rates { egress      = {Now, 0},
+                                 ingress     = {Now, 0},
+                                 avg_egress  = 0.0,
+                                 avg_ingress = 0.0,
+                                 timestamp   = undefined } },
     a(maybe_deltas_to_betas(State)).
 
 msg_store_callback(PersistentGuids, Pubs, AckTags, Fun, MsgPropsFun) ->
@@ -1191,12 +1268,21 @@ record_pending_ack(#msg_status { seq_id        = SeqId,
                                  guid          = Guid,
                                  is_persistent = IsPersistent,
                                  msg_on_disk   = MsgOnDisk,
-                                 msg_props     = MsgProps } = MsgStatus, PA) ->
-    AckEntry = case MsgOnDisk of
-                   true  -> {IsPersistent, Guid, MsgProps};
-                   false -> MsgStatus
-               end,
-    dict:store(SeqId, AckEntry, PA).
+                                 msg_props     = MsgProps } = MsgStatus,
+                   State = #vqstate { pending_ack    = PA,
+                                      ram_ack_index  = RAI,
+                                      ack_in_counter = AckInCount}) ->
+    {AckEntry, RAI1} =
+        case MsgOnDisk of
+            true ->
+                {{IsPersistent, Guid, MsgProps}, RAI};
+            false ->
+                {MsgStatus, gb_trees:insert(SeqId, Guid, RAI)}
+        end,
+    PA1 = dict:store(SeqId, AckEntry, PA),
+    State #vqstate { pending_ack    = PA1,
+                     ram_ack_index  = RAI1,
+                     ack_in_counter = AckInCount + 1}.
 
 remove_pending_ack(KeepPersistent,
                    State = #vqstate { pending_ack       = PA,
@@ -1204,7 +1290,8 @@ remove_pending_ack(KeepPersistent,
                                       msg_store_clients = MSCState }) ->
     {SeqIds, GuidsByStore} = dict:fold(fun accumulate_ack/3,
                                        {[], orddict:new()}, PA),
-    State1 = State #vqstate { pending_ack = dict:new() },
+    State1 = State #vqstate { pending_ack   = dict:new(),
+                              ram_ack_index = gb_trees:empty() },
     case KeepPersistent of
         true  -> case orddict:find(false, GuidsByStore) of
                      error        -> State1;
@@ -1226,13 +1313,17 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
     {{SeqIds, GuidsByStore},
      State1 = #vqstate { index_state       = IndexState,
                          msg_store_clients = MSCState,
-                         persistent_count  = PCount }} =
+                         persistent_count  = PCount,
+                         ack_out_counter   = AckOutCount }} =
         lists:foldl(
-          fun (SeqId, {Acc, State2 = #vqstate { pending_ack = PA }}) ->
+          fun (SeqId, {Acc, State2 = #vqstate { pending_ack   = PA,
+                                                ram_ack_index = RAI}}) ->
                   AckEntry = dict:fetch(SeqId, PA),
                  {accumulate_ack(SeqId, AckEntry, Acc),
                   Fun(AckEntry, State2 #vqstate {
-                                  pending_ack = dict:erase(SeqId, PA) })}
+                                  pending_ack   = dict:erase(SeqId, PA),
+                                  ram_ack_index =
+                                      gb_trees:delete_any(SeqId, RAI)})}
          end, {{[], orddict:new()}, State}, AckTags),
     IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState),
     ok = orddict:fold(fun (IsPersistent, Guids, ok) ->
@@ -1241,7 +1332,8 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
     PCount1 = PCount - find_persistent_count(sum_guids_by_store_to_len(
                                                orddict:new(), GuidsByStore)),
     State1 #vqstate { index_state      = IndexState1,
-                      persistent_count = PCount1 }.
+                      persistent_count = PCount1,
+                      ack_out_counter  = AckOutCount + length(AckTags) }.
 
 accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS
                                      msg_on_disk   = false,
@@ -1280,14 +1372,52 @@ find_persistent_count(LensByStore) ->
 %% one segment's worth of messages in q3 - and thus would risk
 %% perpetually reporting the need for a conversion when no such
 %% conversion is needed. That in turn could cause an infinite loop.
-reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, State) ->
-    {Reduce, State1} = case chunk_size(State #vqstate.ram_msg_count,
-                                       State #vqstate.target_ram_msg_count) of
-                           0  -> {false, State};
-                           S1 -> {true, AlphaBetaFun(S1, State)}
-                       end,
+reduce_memory_use(_AlphaBetaFun, _BetaGammaFun, _BetaDeltaFun, _AckFun,
+                  State = #vqstate {target_ram_msg_count = infinity}) ->
+    {false, State};
+reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, AckFun,
+                  State = #vqstate {
+                    ram_ack_index        = RamAckIndex,
+                    ram_msg_count        = RamMsgCount,
+                    target_ram_msg_count = TargetRamMsgCount,
+                    rates = #rates {
+                      avg_ingress = AvgIngress,
+                      avg_egress  = AvgEgress },
+                    ack_rates = #rates {
+                      avg_ingress = AvgAckIngress,
+                      avg_egress  = AvgAckEgress } }) ->
+
+    {Reduce, State1} =
+        case chunk_size(RamMsgCount + gb_trees:size(RamAckIndex),
+                        TargetRamMsgCount) of
+            0  ->
+                {false, State};
+            S1 ->
+                ReduceFuns =
+                    case (AvgAckIngress - AvgAckEgress) >
+                        (AvgIngress - AvgEgress) of
+                        true ->
+                            %% ACKs are growing faster than the queue,
+                            %% push messages from there first.
+                            [AckFun, AlphaBetaFun];
+                        false ->
+                            %% The queue is growing faster than the
+                            %% acks, push queue messages first.
+                            [AlphaBetaFun, AckFun]
+                    end,
+                {_, StateOut} =
+                    %% Both reduce functions get a chance to reduce
+                    %% memory. The second may very well get a quota of
+                    %% 0 if the first function managed to push out the
+                    %% maximum number of messages.
+                    lists:foldl(
+                      fun(ReduceFun, {QuotaN, StateN}) ->
+                              ReduceFun(QuotaN, StateN)
+                      end, {S1, State}, ReduceFuns),
+                {true, StateOut}
+        end,
+
     case State1 #vqstate.target_ram_msg_count of
-        infinity -> {Reduce, State1};
         0        -> {Reduce, BetaDeltaFun(State1)};
         _        -> case chunk_size(State1 #vqstate.ram_index_count,
                                     permitted_ram_index_count(State1)) of
@@ -1296,10 +1426,33 @@ reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, State) ->
         end
     end.
 
+
+limit_ram_acks(0, State) ->
+    {0, State};
+limit_ram_acks(Quota, State = #vqstate { pending_ack   = PA,
+                                         ram_ack_index = RAI }) ->
+    case gb_trees:is_empty(RAI) of
+        true ->
+            {Quota, State};
+        false ->
+            {SeqId, Guid, RAI1} = gb_trees:take_largest(RAI),
+            MsgStatus = #msg_status {
+              guid      = Guid, %% ASSERTION
+              msg_props = MsgProps } = dict:fetch(SeqId, PA),
+            {_, State1} = maybe_write_to_disk(true, false, MsgStatus, State),
+            limit_ram_acks(Quota - 1,
+                           State1 #vqstate {
+                             pending_ack =
+                                 dict:store(SeqId, {false, Guid, MsgProps}, PA),
+                             ram_ack_index = RAI1 })
+    end.
+
+
 reduce_memory_use(State) ->
     {_, State1} = reduce_memory_use(fun push_alphas_to_betas/2,
                                     fun limit_ram_index/2,
                                     fun push_betas_to_deltas/1,
+                                    fun limit_ram_acks/2,
                                     State),
     State1.
 
@@ -1344,7 +1497,7 @@ permitted_ram_index_count(#vqstate { len         = Len,
     BetaLen - trunc(BetaLen * BetaLen / (Len - DeltaCount)).
 
 chunk_size(Current, Permitted)
-  when Permitted =:= infinity orelse Permitted >= Current ->
+  when Permitted >= Current ->
     0;
 chunk_size(Current, Permitted) ->
     lists:min([Current - Permitted, ?IO_BATCH_SIZE]).
@@ -1432,9 +1585,9 @@ maybe_deltas_to_betas(State = #vqstate {
     end.
 
 push_alphas_to_betas(Quota, State) ->
-    { Quota1, State1} = maybe_push_q1_to_betas(Quota, State),
-    {_Quota2, State2} = maybe_push_q4_to_betas(Quota1, State1),
-    State2.
+    {Quota1, State1} = maybe_push_q1_to_betas(Quota, State),
+    {Quota2, State2} = maybe_push_q4_to_betas(Quota1, State1),
+    {Quota2, State2}.
 
 maybe_push_q1_to_betas(Quota, State = #vqstate { q1 = Q1 }) ->
     maybe_push_alphas_to_betas(
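A few standalone sketches follow; none of this code is part of the commit, and all module and function names are illustrative. The new documentation block distinguishes two pending-ack representations. A minimal sketch of that distinction, assuming a simplified #msg_status{} record (the real record has more fields):

```erlang
-module(vq_ack_form_sketch).
-export([ack_entry/1]).

%% Simplified stand-in for the real #msg_status{} record.
-record(msg_status, { seq_id, guid, is_persistent, msg_on_disk, msg_props }).

%% Tuple form once the message is safely on disk; message form (the
%% #msg_status{} itself) while a transient message exists only in RAM,
%% avoiding a disk write purely for the sake of the pending ack.
ack_entry(#msg_status { msg_on_disk   = true,
                        is_persistent = IsPersistent,
                        guid          = Guid,
                        msg_props     = MsgProps }) ->
    {IsPersistent, Guid, MsgProps};
ack_entry(MsgStatus = #msg_status { msg_on_disk = false }) ->
    MsgStatus.
```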
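ram_duration/1 now sizes the queue by messages and acks together. A self-contained sketch of just the arithmetic (names are illustrative; the real function also rolls the #rates{} records forward):

```erlang
-module(vq_duration_sketch).
-export([duration/3]).

%% msgs+acks / (msgs+acks per second) == seconds. RamCounts is
%% [RamMsgCount, RamMsgCountPrev, RamAckCount, RamAckCountPrev]; the
%% rate pairs are {AvgIngress, AvgEgress} for messages and for acks.
duration(RamCounts, {MsgIn, MsgOut}, {AckIn, AckOut}) ->
    TotalRate = MsgIn + MsgOut + AckIn + AckOut,
    case TotalRate == 0 of
        true  -> infinity;
        false -> lists:sum(RamCounts) / (2 * TotalRate)
    end.
```

For example, `vq_duration_sketch:duration([500, 500, 500, 500], {10.0, 10.0}, {2.5, 2.5})` evaluates to 2000 / 50 = 40.0 seconds, which set_ram_duration_target/2 would then turn back into a target RAM count.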
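The ordering logic in the new reduce_memory_use/5 can be isolated as below. The key property is that both reduce functions share one ?IO_BATCH_SIZE-bounded quota: each returns {RemainingQuota, State}, so the slower-growing source only sees what the faster one left over, possibly zero. Names are again illustrative:

```erlang
-module(vq_reduce_sketch).
-export([reduce_in_order/4]).

%% Growth is measured as avg. ingress - avg. egress for each source.
%% AckFun and AlphaBetaFun both have the shape
%% fun((Quota, State) -> {RemainingQuota, State1}).
reduce_in_order(Quota, State, {AckGrowth, AckFun}, {MsgGrowth, AlphaBetaFun}) ->
    ReduceFuns = case AckGrowth > MsgGrowth of
                     true  -> [AckFun, AlphaBetaFun]; %% acks growing fastest
                     false -> [AlphaBetaFun, AckFun]  %% queue growing fastest
                 end,
    lists:foldl(fun (ReduceFun, {QuotaN, StateN}) ->
                        ReduceFun(QuotaN, StateN)
                end, {Quota, State}, ReduceFuns).
```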
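Finally, limit_ram_acks/2 relies on ram_ack_index being a gb_tree keyed by SeqId: gb_trees:take_largest/1 always evicts the pending ack furthest from the head of the queue, the same policy used when pushing alphas to betas. A sketch with a caller-supplied write callback standing in for maybe_write_to_disk/4:

```erlang
-module(vq_ack_evict_sketch).
-export([evict_ram_acks/3]).

%% Walk the gb_tree from the highest SeqId down, writing each evicted
%% ack's message to disk, until the quota or the tree is exhausted.
evict_ram_acks(0, RAI, _WriteToDisk) ->
    {0, RAI};
evict_ram_acks(Quota, RAI, WriteToDisk) ->
    case gb_trees:is_empty(RAI) of
        true  -> {Quota, RAI};
        false -> {SeqId, Guid, RAI1} = gb_trees:take_largest(RAI),
                 ok = WriteToDisk(SeqId, Guid),
                 evict_ram_acks(Quota - 1, RAI1, WriteToDisk)
    end.
```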