diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-01-24 13:55:54 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-01-24 13:55:54 +0000 |
commit | 56d820ce1406723fbc2864da52e529af48686aa3 (patch) | |
tree | 2adfe602e9f38450fa29b157a62c92134dc223f0 | |
parent | db010117d6bf5d5e2118cdcb13cd9bf18b72131f (diff) | |
download | rabbitmq-server-56d820ce1406723fbc2864da52e529af48686aa3.tar.gz |
Update rates every 1000 messages or at the ram duration timeout, whichever comes first.
-rw-r--r-- | src/rabbit_variable_queue.erl | 95 |
1 files changed, 56 insertions, 39 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index aa090ed4..ddd90eae 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -536,11 +536,12 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, PCount1 = PCount + one_if(IsPersistent1), UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), a(reduce_memory_use( - inc_ram_msg_count(State2 #vqstate { next_seq_id = SeqId + 1, - len = Len + 1, - in_counter = InCount + 1, - persistent_count = PCount1, - unconfirmed = UC1 }))). + maybe_update_rates( + inc_ram_msg_count(State2 #vqstate { next_seq_id = SeqId + 1, + len = Len + 1, + in_counter = InCount + 1, + persistent_count = PCount1, + unconfirmed = UC1 })))). publish_delivered(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, @@ -709,21 +710,26 @@ set_ram_duration_target( false -> reduce_memory_use(State1) end). -ram_duration(State = #vqstate { - rates = #rates { timestamp = Timestamp, - egress = Egress, - ingress = Ingress } = Rates, - ack_rates = #rates { timestamp = AckTimestamp, - egress = AckEgress, - ingress = AckIngress } = ARates, - in_counter = InCount, - out_counter = OutCount, - ack_in_counter = AckInCount, - ack_out_counter = AckOutCount, - ram_msg_count = RamMsgCount, - ram_msg_count_prev = RamMsgCountPrev, - ram_pending_ack = RPA, - ram_ack_count_prev = RamAckCountPrev }) -> +maybe_update_rates(State = #vqstate { in_counter = InCount, + out_counter = OutCount, + ack_in_counter = AckInCount, + ack_out_counter = AckOutCount }) -> + case InCount + OutCount + AckInCount + AckOutCount > 1000 of + true -> update_rates(State); + false -> State + end. + +update_rates(State = #vqstate { + rates = #rates { timestamp = Timestamp, + egress = Egress, + ingress = Ingress } = Rates, + ack_rates = #rates { timestamp = AckTimestamp, + egress = AckEgress, + ingress = AckIngress } = ARates, + in_counter = InCount, + out_counter = OutCount, + ack_in_counter = AckInCount, + ack_out_counter = AckOutCount }) -> Now = now(), {AvgEgressRate, Egress1} = update_rate(Now, Timestamp, OutCount, Egress), {AvgIngressRate, Ingress1} = update_rate(Now, Timestamp, InCount, Ingress), @@ -733,6 +739,35 @@ ram_duration(State = #vqstate { {AvgAckIngressRate, AckIngress1} = update_rate(Now, AckTimestamp, AckInCount, AckIngress), + State #vqstate { + rates = Rates #rates { + egress = Egress1, + ingress = Ingress1, + avg_egress = AvgEgressRate, + avg_ingress = AvgIngressRate, + timestamp = Now }, + ack_rates = ARates #rates { + egress = AckEgress1, + ingress = AckIngress1, + avg_egress = AvgAckEgressRate, + avg_ingress = AvgAckIngressRate, + timestamp = Now }, + in_counter = 0, + out_counter = 0, + ack_in_counter = 0, + ack_out_counter = 0 }. + +ram_duration(State0) -> + State = #vqstate { + rates = #rates { avg_egress = AvgEgressRate, + avg_ingress = AvgIngressRate }, + ack_rates = #rates { avg_egress = AvgAckEgressRate, + avg_ingress = AvgAckIngressRate }, + ram_msg_count = RamMsgCount, + ram_msg_count_prev = RamMsgCountPrev, + ram_pending_ack = RPA, + ram_ack_count_prev = RamAckCountPrev } = update_rates(State0), + RamAckCount = gb_trees:size(RPA), Duration = %% msgs+acks / (msgs+acks/sec) == sec @@ -745,25 +780,7 @@ ram_duration(State = #vqstate { AvgAckEgressRate + AvgAckIngressRate)) end, - {Duration, State #vqstate { - rates = Rates #rates { - egress = Egress1, - ingress = Ingress1, - avg_egress = AvgEgressRate, - avg_ingress = AvgIngressRate, - timestamp = Now }, - ack_rates = ARates #rates { - egress = AckEgress1, - ingress = AckIngress1, - avg_egress = AvgAckEgressRate, - avg_ingress = AvgAckIngressRate, - timestamp = Now }, - in_counter = 0, - out_counter = 0, - ack_in_counter = 0, - ack_out_counter = 0, - ram_msg_count_prev = RamMsgCount, - ram_ack_count_prev = RamAckCount }}. + {Duration, State}. needs_timeout(State = #vqstate { index_state = IndexState, target_ram_count = TargetRamCount }) -> |