diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-01-27 13:16:33 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-01-27 13:16:33 +0000 |
commit | 0377b484a839c94076df6ba04e1005253c1b8851 (patch) | |
tree | 8279b0ce7a283531e1cc82d0d9bbf27be5dde9f2 | |
parent | 56d820ce1406723fbc2864da52e529af48686aa3 (diff) | |
download | rabbitmq-server-0377b484a839c94076df6ba04e1005253c1b8851.tar.gz |
Unify the #rates record so we don't duplicate as much stuff.
-rw-r--r-- | src/rabbit_variable_queue.erl | 159 |
1 files changed, 80 insertions, 79 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index ddd90eae..2586b70c 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -277,11 +277,12 @@ unconfirmed, confirmed, ack_out_counter, - ack_in_counter, - ack_rates + ack_in_counter }). --record(rates, { egress, ingress, avg_egress, avg_ingress, timestamp }). +-record(rates, { in_count, out_count, ack_in_count, ack_out_count, + in_avg, out_avg, ack_in_avg, ack_out_avg, + timestamp, old_timestamp}). -record(msg_status, { seq_id, @@ -322,11 +323,16 @@ -type(timestamp() :: {non_neg_integer(), non_neg_integer(), non_neg_integer()}). -type(seq_id() :: non_neg_integer()). --type(rates() :: #rates { egress :: {timestamp(), non_neg_integer()}, - ingress :: {timestamp(), non_neg_integer()}, - avg_egress :: float(), - avg_ingress :: float(), - timestamp :: timestamp() }). +-type(rates() :: #rates { in_count :: non_neg_integer(), + out_count :: non_neg_integer(), + ack_in_count :: non_neg_integer(), + ack_out_count :: non_neg_integer(), + in_avg :: float(), + out_avg :: float(), + ack_in_avg :: float(), + ack_out_avg :: float(), + timestamp :: timestamp(), + old_timestamp :: timestamp()}). -type(delta() :: #delta { start_seq_id :: non_neg_integer(), count :: non_neg_integer(), @@ -368,8 +374,7 @@ unconfirmed :: gb_set(), confirmed :: gb_set(), ack_out_counter :: non_neg_integer(), - ack_in_counter :: non_neg_integer(), - ack_rates :: rates() }). + ack_in_counter :: non_neg_integer() }). %% Duplicated from rabbit_backing_queue -spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}). @@ -690,11 +695,11 @@ depth(State = #vqstate { ram_pending_ack = RPA, disk_pending_ack = DPA }) -> set_ram_duration_target( DurationTarget, State = #vqstate { - rates = #rates { avg_egress = AvgEgressRate, - avg_ingress = AvgIngressRate }, - ack_rates = #rates { avg_egress = AvgAckEgressRate, - avg_ingress = AvgAckIngressRate }, - target_ram_count = TargetRamCount }) -> + rates = #rates { out_avg = AvgEgressRate, + in_avg = AvgIngressRate, + ack_out_avg = AvgAckEgressRate, + ack_in_avg = AvgAckIngressRate }, + target_ram_count = TargetRamCount }) -> Rate = AvgEgressRate + AvgIngressRate + AvgAckEgressRate + AvgAckIngressRate, TargetRamCount1 = @@ -719,50 +724,45 @@ maybe_update_rates(State = #vqstate { in_counter = InCount, false -> State end. -update_rates(State = #vqstate { - rates = #rates { timestamp = Timestamp, - egress = Egress, - ingress = Ingress } = Rates, - ack_rates = #rates { timestamp = AckTimestamp, - egress = AckEgress, - ingress = AckIngress } = ARates, - in_counter = InCount, - out_counter = OutCount, - ack_in_counter = AckInCount, - ack_out_counter = AckOutCount }) -> - Now = now(), - {AvgEgressRate, Egress1} = update_rate(Now, Timestamp, OutCount, Egress), - {AvgIngressRate, Ingress1} = update_rate(Now, Timestamp, InCount, Ingress), - - {AvgAckEgressRate, AckEgress1} = - update_rate(Now, AckTimestamp, AckOutCount, AckEgress), - {AvgAckIngressRate, AckIngress1} = - update_rate(Now, AckTimestamp, AckInCount, AckIngress), - - State #vqstate { - rates = Rates #rates { - egress = Egress1, - ingress = Ingress1, - avg_egress = AvgEgressRate, - avg_ingress = AvgIngressRate, - timestamp = Now }, - ack_rates = ARates #rates { - egress = AckEgress1, - ingress = AckIngress1, - avg_egress = AvgAckEgressRate, - avg_ingress = AvgAckIngressRate, - timestamp = Now }, - in_counter = 0, - out_counter = 0, - ack_in_counter = 0, - ack_out_counter = 0 }. +update_rates(State = #vqstate{in_counter = In, + out_counter = Out, + ack_in_counter = AckIn, + ack_out_counter = AckOut, + rates = + #rates{in_count = In0, + out_count = Out0, + ack_in_count = AckIn0, + ack_out_count = AckOut0, + timestamp = TS, + old_timestamp = OldTS}}) -> + Now = erlang:now(), + + Rates = #rates { out_count = Out, + in_count = In, + ack_out_count = AckOut, + ack_in_count = AckIn, + out_avg = update_rate(Now, OldTS, Out0, Out), + in_avg = update_rate(Now, OldTS, In0, In), + ack_out_avg = update_rate(Now, OldTS, AckOut0, AckOut), + ack_in_avg = update_rate(Now, OldTS, AckIn0, AckIn), + timestamp = Now, + old_timestamp = TS }, + State#vqstate{ in_counter = 0, + out_counter = 0, + ack_in_counter = 0, + ack_out_counter = 0, + rates = Rates }. + +update_rate(Now, OldTS, Count, Count0) -> + %% avg over the current period and the previous + 1000000.0 * (Count + Count0) / timer:now_diff(Now, OldTS). ram_duration(State0) -> State = #vqstate { - rates = #rates { avg_egress = AvgEgressRate, - avg_ingress = AvgIngressRate }, - ack_rates = #rates { avg_egress = AvgAckEgressRate, - avg_ingress = AvgAckIngressRate }, + rates = #rates { out_avg = AvgEgressRate, + in_avg = AvgIngressRate, + ack_out_avg = AvgAckEgressRate, + ack_in_avg = AvgAckIngressRate }, ram_msg_count = RamMsgCount, ram_msg_count_prev = RamMsgCountPrev, ram_pending_ack = RPA, @@ -806,8 +806,8 @@ timeout(State = #vqstate { index_state = IndexState }) -> handle_pre_hibernate(State = #vqstate { index_state = IndexState }) -> State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }. -msg_rates(#vqstate { rates = #rates { avg_egress = AvgEgressRate, - avg_ingress = AvgIngressRate } }) -> +msg_rates(#vqstate { rates = #rates { out_avg = AvgEgressRate, + in_avg = AvgIngressRate } }) -> {AvgIngressRate, AvgEgressRate}. status(#vqstate { @@ -819,10 +819,11 @@ status(#vqstate { ram_msg_count = RamMsgCount, next_seq_id = NextSeqId, persistent_count = PersistentCount, - rates = #rates { avg_egress = AvgEgressRate, - avg_ingress = AvgIngressRate }, - ack_rates = #rates { avg_egress = AvgAckEgressRate, - avg_ingress = AvgAckIngressRate } }) -> + rates = #rates { in_avg = AvgIngressRate, + out_avg = AvgEgressRate, + ack_in_avg = AvgAckIngressRate, + ack_out_avg = AvgAckEgressRate }}) -> + [ {q1 , ?QUEUE:len(Q1)}, {q2 , ?QUEUE:len(Q2)}, {delta , Delta}, @@ -1012,10 +1013,6 @@ expand_delta(SeqId, #delta { count = Count, expand_delta(_SeqId, #delta { count = Count } = Delta) -> d(Delta #delta { count = Count + 1 }). -update_rate(Now, Then, Count, {OThen, OCount}) -> - %% avg over the current period and the previous - {1000000.0 * (Count + OCount) / timer:now_diff(Now, OThen), {Then, Count}}. - %%---------------------------------------------------------------------------- %% Internal major helpers for Public API %%---------------------------------------------------------------------------- @@ -1055,22 +1052,26 @@ init(IsDurable, IndexState, DeltaCount, Terms, ram_ack_count_prev = 0, out_counter = 0, in_counter = 0, - rates = blank_rate(Now, DeltaCount1), + rates = blank_rates(Now, DeltaCount1), msgs_on_disk = gb_sets:new(), msg_indices_on_disk = gb_sets:new(), unconfirmed = gb_sets:new(), confirmed = gb_sets:new(), ack_out_counter = 0, - ack_in_counter = 0, - ack_rates = blank_rate(Now, 0) }, + ack_in_counter = 0 }, a(maybe_deltas_to_betas(State)). -blank_rate(Timestamp, IngressLength) -> - #rates { egress = {Timestamp, 0}, - ingress = {Timestamp, IngressLength}, - avg_egress = 0.0, - avg_ingress = 0.0, - timestamp = Timestamp }. +blank_rates(Now, IngressLength) -> + #rates { in_count = IngressLength, + out_count = 0, + ack_in_count = 0, + ack_out_count = 0, + in_avg = 0, + out_avg = 0, + ack_in_avg = 0, + ack_out_avg = 0, + timestamp = Now, + old_timestamp = Now}. in_r(MsgStatus = #msg_status { msg = undefined }, State = #vqstate { q3 = Q3, q4 = Q4 }) -> @@ -1544,10 +1545,10 @@ reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun, ram_pending_ack = RPA, ram_msg_count = RamMsgCount, target_ram_count = TargetRamCount, - rates = #rates { avg_ingress = AvgIngress, - avg_egress = AvgEgress }, - ack_rates = #rates { avg_ingress = AvgAckIngress, - avg_egress = AvgAckEgress } + rates = #rates { in_avg = AvgIngress, + out_avg = AvgEgress, + ack_in_avg = AvgAckIngress, + ack_out_avg = AvgAckEgress } }) -> {Reduce, State1 = #vqstate { q2 = Q2, q3 = Q3 }} = |