diff options
author | Rob Harrop <rob@rabbitmq.com> | 2010-11-02 16:34:35 +0000 |
---|---|---|
committer | Rob Harrop <rob@rabbitmq.com> | 2010-11-02 16:34:35 +0000 |
commit | c8940f9ce61112880926d508a09e7595d68f8c37 (patch) | |
tree | 75ec59756f7db51a825999dde3c4751e129b62ef | |
parent | f525c97b059b62cf68361cd70e346339aa088582 (diff) | |
parent | 950866c469f6f46edba59e3b30cb47c1ba711e1b (diff) | |
download | rabbitmq-server-c8940f9ce61112880926d508a09e7595d68f8c37.tar.gz |
Starting tracking ack in/out rates
-rw-r--r-- | src/rabbit_variable_queue.erl | 144 |
1 files changed, 101 insertions, 43 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index f88e49c2..97833991 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -220,6 +220,8 @@ q4, next_seq_id, pending_ack, + pending_ack_index, + ram_ack_index, index_state, msg_store_clients, on_sync, @@ -230,12 +232,16 @@ persistent_count, target_ram_msg_count, + target_ram_ack_count, ram_msg_count, ram_msg_count_prev, ram_index_count, out_counter, in_counter, - rates + ack_out_counter, + ack_in_counter, + rates, + ack_rates }). -record(rates, { egress, ingress, avg_egress, avg_ingress, timestamp }). @@ -306,6 +312,7 @@ q4 :: queue(), next_seq_id :: seq_id(), pending_ack :: dict:dictionary(), + ram_ack_index :: gb_tree(), index_state :: any(), msg_store_clients :: 'undefined' | {{any(), binary()}, {any(), binary()}}, @@ -317,12 +324,16 @@ transient_threshold :: non_neg_integer(), target_ram_msg_count :: non_neg_integer() | 'infinity', + target_ram_ack_count :: non_neg_integer() | 'infinity', ram_msg_count :: non_neg_integer(), ram_msg_count_prev :: non_neg_integer(), ram_index_count :: non_neg_integer(), out_counter :: non_neg_integer(), in_counter :: non_neg_integer(), - rates :: rates() }). + ack_out_counter :: non_neg_integer(), + ack_in_counter :: non_neg_integer(), + rates :: rates(), + ack_rates :: rates() }). -include("rabbit_backing_queue_spec.hrl"). @@ -479,19 +490,17 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent }, out_counter = OutCount, in_counter = InCount, persistent_count = PCount, - pending_ack = PA, durable = IsDurable }) -> IsPersistent1 = IsDurable andalso IsPersistent, MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps)) #msg_status { is_delivered = true }, {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), - PA1 = record_pending_ack(m(MsgStatus1), PA), + State2 = record_pending_ack(m(MsgStatus1), State1), PCount1 = PCount + one_if(IsPersistent1), - {SeqId, a(State1 #vqstate { next_seq_id = SeqId + 1, + {SeqId, a(State2 #vqstate { next_seq_id = SeqId + 1, out_counter = OutCount + 1, in_counter = InCount + 1, - persistent_count = PCount1, - pending_ack = PA1 })}. + persistent_count = PCount1 })}. dropwhile(Pred, State) -> {_OkOrEmpty, State1} = dropwhile1(Pred, State), @@ -561,8 +570,7 @@ internal_fetch(AckRequired, MsgStatus = #msg_status { index_state = IndexState, msg_store_clients = MSCState, len = Len, - persistent_count = PCount, - pending_ack = PA }) -> + persistent_count = PCount }) -> %% 1. Mark it delivered if necessary IndexState1 = maybe_write_delivered( IndexOnDisk andalso not IsDelivered, @@ -582,12 +590,12 @@ internal_fetch(AckRequired, MsgStatus = #msg_status { end, %% 3. If an ack is required, add something sensible to PA - {AckTag, PA1} = case AckRequired of - true -> PA2 = record_pending_ack( - MsgStatus #msg_status { - is_delivered = true }, PA), - {SeqId, PA2}; - false -> {blank_ack, PA} + {AckTag, State1} = case AckRequired of + true -> StateN = record_pending_ack( + MsgStatus #msg_status { + is_delivered = true }, State), + {SeqId, StateN}; + false -> {blank_ack, State} end, PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), @@ -595,12 +603,11 @@ internal_fetch(AckRequired, MsgStatus = #msg_status { RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined), {{Msg, IsDelivered, AckTag, Len1}, - a(State #vqstate { ram_msg_count = RamMsgCount1, - out_counter = OutCount + 1, - index_state = IndexState2, - len = Len1, - persistent_count = PCount1, - pending_ack = PA1 })}. + a(State1 #vqstate { ram_msg_count = RamMsgCount1, + out_counter = OutCount + 1, + index_state = IndexState2, + len = Len1, + persistent_count = PCount1 })}. ack(AckTags, State) -> a(ack(fun msg_store_remove/3, @@ -692,7 +699,7 @@ set_ram_duration_target(DurationTarget, (TargetRamMsgCount =/= infinity andalso TargetRamMsgCount1 >= TargetRamMsgCount) of true -> State1; - false -> reduce_memory_use(State1) + false -> io:format("Reducing~n"), reduce_memory_use(State1) end). ram_duration(State = #vqstate { @@ -962,6 +969,7 @@ init(IsDurable, IndexState, DeltaCount, Terms, q4 = queue:new(), next_seq_id = NextSeqId, pending_ack = dict:new(), + ram_ack_index = gb_trees:empty(), index_state = IndexState1, msg_store_clients = {PersistentClient, TransientClient}, on_sync = ?BLANK_SYNC, @@ -977,10 +985,17 @@ init(IsDurable, IndexState, DeltaCount, Terms, ram_index_count = 0, out_counter = 0, in_counter = 0, + ack_out_counter = 0, + ack_in_counter = 0, rates = #rates { egress = {Now, 0}, ingress = {Now, DeltaCount1}, avg_egress = 0.0, avg_ingress = 0.0, + timestamp = Now }, + ack_rates = #rates { egress = {Now, 0}, + ingress = {Now, 0}, + avg_egress = 0.0, + avg_ingress = 0.0, timestamp = Now } }, a(maybe_deltas_to_betas(State)). @@ -1191,20 +1206,31 @@ record_pending_ack(#msg_status { seq_id = SeqId, guid = Guid, is_persistent = IsPersistent, msg_on_disk = MsgOnDisk, - msg_props = MsgProps } = MsgStatus, PA) -> - AckEntry = case MsgOnDisk of - true -> {IsPersistent, Guid, MsgProps}; - false -> MsgStatus - end, - dict:store(SeqId, AckEntry, PA). + msg_props = MsgProps } = MsgStatus, + State = #vqstate { pending_ack = PA, + ram_ack_index = RAI, + ack_in_counter = AckInCount}) -> + {AckEntry, RAI1} = + case MsgOnDisk of + true -> + {{IsPersistent, Guid, MsgProps}, RAI}; + false -> + {MsgStatus, gb_trees:insert(SeqId, Guid, RAI)} + end, + PA1 = dict:store(SeqId, AckEntry, PA), + State #vqstate { pending_ack = PA1, + ram_ack_index = RAI1, + ack_in_counter = AckInCount + 1}. +%% TODO: On remove, need to prevent any seqids that remove_pending_ack(KeepPersistent, State = #vqstate { pending_ack = PA, index_state = IndexState, msg_store_clients = MSCState }) -> {SeqIds, GuidsByStore} = dict:fold(fun accumulate_ack/3, {[], orddict:new()}, PA), - State1 = State #vqstate { pending_ack = dict:new() }, + State1 = State #vqstate { pending_ack = dict:new(), + ram_ack_index = gb_trees:empty() }, case KeepPersistent of true -> case orddict:find(false, GuidsByStore) of error -> State1; @@ -1226,13 +1252,17 @@ ack(MsgStoreFun, Fun, AckTags, State) -> {{SeqIds, GuidsByStore}, State1 = #vqstate { index_state = IndexState, msg_store_clients = MSCState, - persistent_count = PCount }} = + persistent_count = PCount, + ack_out_counter = AckOutCount }} = lists:foldl( - fun (SeqId, {Acc, State2 = #vqstate { pending_ack = PA }}) -> + fun (SeqId, {Acc, State2 = #vqstate { pending_ack = PA, + ram_ack_index = RAI}}) -> AckEntry = dict:fetch(SeqId, PA), {accumulate_ack(SeqId, AckEntry, Acc), Fun(AckEntry, State2 #vqstate { - pending_ack = dict:erase(SeqId, PA) })} + pending_ack = dict:erase(SeqId, PA), + ram_ack_index = + gb_trees:delete_any(SeqId, RAI)})} end, {{[], orddict:new()}, State}, AckTags), IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState), ok = orddict:fold(fun (IsPersistent, Guids, ok) -> @@ -1241,7 +1271,8 @@ ack(MsgStoreFun, Fun, AckTags, State) -> PCount1 = PCount - find_persistent_count(sum_guids_by_store_to_len( orddict:new(), GuidsByStore)), State1 #vqstate { index_state = IndexState1, - persistent_count = PCount1 }. + persistent_count = PCount1, + ack_out_counter = AckOutCount - length(AckTags) }. accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS msg_on_disk = false, @@ -1281,21 +1312,48 @@ find_persistent_count(LensByStore) -> %% perpetually reporting the need for a conversion when no such %% conversion is needed. That in turn could cause an infinite loop. reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, State) -> - {Reduce, State1} = case chunk_size(State #vqstate.ram_msg_count, + Size = gb_trees:size(State #vqstate.ram_ack_index), + State1 = case chunk_size(Size, State #vqstate.target_ram_ack_count) of + 0 -> State; + S -> io:format("Limiting~n"), limit_ram_acks(S, State) + end, + {Reduce, State2} = case chunk_size(State #vqstate.ram_msg_count, State #vqstate.target_ram_msg_count) of - 0 -> {false, State}; - S1 -> {true, AlphaBetaFun(S1, State)} + 0 -> {false, State1}; + S1 -> {true, AlphaBetaFun(S1, State1)} end, - case State1 #vqstate.target_ram_msg_count of - infinity -> {Reduce, State1}; - 0 -> {Reduce, BetaDeltaFun(State1)}; - _ -> case chunk_size(State1 #vqstate.ram_index_count, - permitted_ram_index_count(State1)) of - ?IO_BATCH_SIZE = S2 -> {true, BetaGammaFun(S2, State1)}; - _ -> {Reduce, State1} + case State2 #vqstate.target_ram_msg_count of + infinity -> {Reduce, State2}; + 0 -> {Reduce, BetaDeltaFun(State2)}; + _ -> case chunk_size(State2 #vqstate.ram_index_count, + permitted_ram_index_count(State2)) of + ?IO_BATCH_SIZE = S2 -> {true, BetaGammaFun(S2, State2)}; + _ -> {Reduce, State2} end end. +limit_ram_acks(0, State) -> + State; +limit_ram_acks(Quota, State = #vqstate { pending_ack = PA, + ram_ack_index = RAI }) -> + io:format("Limiting acks~p~n", [Quota]), + case gb_trees:is_empty(RAI) of + true -> + State; + false -> + {SeqId, Guid, RAI1} = gb_trees:take_largest(RAI), + io:format("Largest~p~n", [SeqId]), + MsgStatus = dict:fetch(SeqId, PA), + State1 = maybe_write_to_disk(true, false, MsgStatus, State), + io:format("Wrote~n"), + limit_ram_acks(Quota - 1, + State1 #vqstate { + pending_ack = + dict:update(SeqId, {false, Guid}, PA), + ram_ack_index = RAI1 }) + end. + + reduce_memory_use(State) -> {_, State1} = reduce_memory_use(fun push_alphas_to_betas/2, fun limit_ram_index/2, |