diff options
author | Rob Harrop <rob@rabbitmq.com> | 2010-11-02 16:57:38 +0000 |
---|---|---|
committer | Rob Harrop <rob@rabbitmq.com> | 2010-11-02 16:57:38 +0000 |
commit | 423a27191f248d75362ba6675c401d6968cf8497 (patch) | |
tree | e2d842b2fff21322c1741ec14d6c76821086dd34 | |
parent | c8940f9ce61112880926d508a09e7595d68f8c37 (diff) | |
download | rabbitmq-server-423a27191f248d75362ba6675c401d6968cf8497.tar.gz |
Calculating RAM duration takes into consideration ack rates
-rw-r--r-- | src/rabbit_variable_queue.erl | 35 |
1 files changed, 29 insertions, 6 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 97833991..d2f79eb6 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -235,6 +235,7 @@ target_ram_ack_count, ram_msg_count, ram_msg_count_prev, + ram_ack_count_prev, ram_index_count, out_counter, in_counter, @@ -706,19 +707,34 @@ ram_duration(State = #vqstate { rates = #rates { egress = Egress, ingress = Ingress, timestamp = Timestamp } = Rates, + ack_rates = #rates { egress = AckEgress, + ingress = AckIngress } = ARates, in_counter = InCount, out_counter = OutCount, + ack_in_counter = AckInCount, + ack_out_counter = AckOutCount, ram_msg_count = RamMsgCount, - ram_msg_count_prev = RamMsgCountPrev }) -> + ram_msg_count_prev = RamMsgCountPrev, + ram_ack_index = RamAckIndex, + ram_ack_count_prev = RamAckCountPrev }) -> Now = now(), {AvgEgressRate, Egress1} = update_rate(Now, Timestamp, OutCount, Egress), {AvgIngressRate, Ingress1} = update_rate(Now, Timestamp, InCount, Ingress), - Duration = %% msgs / (msgs/sec) == sec + {AvgAckEgressRate, AckEgress1} = + update_rate(Now, Timestamp, AckOutCount, AckEgress), + {AvgAckIngressRate, AckIngress1} = + update_rate(Now, Timestamp, AckInCount, AckIngress), + + RamAckCount = gb_trees:size(RamAckIndex), + + Duration = %% msgs+acks / (msgs+acks/sec) == sec case AvgEgressRate == 0 andalso AvgIngressRate == 0 of true -> infinity; - false -> (RamMsgCountPrev + RamMsgCount) / - (2 * (AvgEgressRate + AvgIngressRate)) + false -> (RamMsgCountPrev + RamMsgCount + + RamAckCount + RamAckCountPrev) / + (2 * (AvgEgressRate + AvgIngressRate + + AvgAckEgressRate + AvgAckIngressRate)) end, {Duration, State #vqstate { @@ -728,9 +744,15 @@ ram_duration(State = #vqstate { avg_egress = AvgEgressRate, avg_ingress = AvgIngressRate, timestamp = Now }, + ack_rates = ARates #rates { + egress = AckEgress1, + ingress = AckIngress1, + avg_egress = AvgAckEgressRate, + avg_ingress = AvgAckIngressRate }, in_counter = 0, out_counter = 0, - ram_msg_count_prev = RamMsgCount }}. + ram_msg_count_prev = RamMsgCount, + ram_ack_count_prev = RamAckCount }}. needs_idle_timeout(State = #vqstate { on_sync = ?BLANK_SYNC }) -> {Res, _State} = reduce_memory_use(fun (_Quota, State1) -> State1 end, @@ -982,6 +1004,7 @@ init(IsDurable, IndexState, DeltaCount, Terms, target_ram_msg_count = infinity, ram_msg_count = 0, ram_msg_count_prev = 0, + ram_ack_count_prev = 0, ram_index_count = 0, out_counter = 0, in_counter = 0, @@ -996,7 +1019,7 @@ init(IsDurable, IndexState, DeltaCount, Terms, ingress = {Now, 0}, avg_egress = 0.0, avg_ingress = 0.0, - timestamp = Now } }, + timestamp = undefined } }, a(maybe_deltas_to_betas(State)). msg_store_callback(PersistentGuids, Pubs, AckTags, Fun, MsgPropsFun) -> |