summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRob Harrop <rob@rabbitmq.com>2010-11-02 16:57:38 +0000
committerRob Harrop <rob@rabbitmq.com>2010-11-02 16:57:38 +0000
commit423a27191f248d75362ba6675c401d6968cf8497 (patch)
treee2d842b2fff21322c1741ec14d6c76821086dd34
parentc8940f9ce61112880926d508a09e7595d68f8c37 (diff)
downloadrabbitmq-server-423a27191f248d75362ba6675c401d6968cf8497.tar.gz
Calculating RAM duration takes into consideration ack rates
-rw-r--r--src/rabbit_variable_queue.erl35
1 files changed, 29 insertions, 6 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 97833991..d2f79eb6 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -235,6 +235,7 @@
target_ram_ack_count,
ram_msg_count,
ram_msg_count_prev,
+ ram_ack_count_prev,
ram_index_count,
out_counter,
in_counter,
@@ -706,19 +707,34 @@ ram_duration(State = #vqstate {
rates = #rates { egress = Egress,
ingress = Ingress,
timestamp = Timestamp } = Rates,
+ ack_rates = #rates { egress = AckEgress,
+ ingress = AckIngress } = ARates,
in_counter = InCount,
out_counter = OutCount,
+ ack_in_counter = AckInCount,
+ ack_out_counter = AckOutCount,
ram_msg_count = RamMsgCount,
- ram_msg_count_prev = RamMsgCountPrev }) ->
+ ram_msg_count_prev = RamMsgCountPrev,
+ ram_ack_index = RamAckIndex,
+ ram_ack_count_prev = RamAckCountPrev }) ->
Now = now(),
{AvgEgressRate, Egress1} = update_rate(Now, Timestamp, OutCount, Egress),
{AvgIngressRate, Ingress1} = update_rate(Now, Timestamp, InCount, Ingress),
- Duration = %% msgs / (msgs/sec) == sec
+ {AvgAckEgressRate, AckEgress1} =
+ update_rate(Now, Timestamp, AckOutCount, AckEgress),
+ {AvgAckIngressRate, AckIngress1} =
+ update_rate(Now, Timestamp, AckInCount, AckIngress),
+
+ RamAckCount = gb_trees:size(RamAckIndex),
+
+ Duration = %% msgs+acks / (msgs+acks/sec) == sec
case AvgEgressRate == 0 andalso AvgIngressRate == 0 of
true -> infinity;
- false -> (RamMsgCountPrev + RamMsgCount) /
- (2 * (AvgEgressRate + AvgIngressRate))
+ false -> (RamMsgCountPrev + RamMsgCount +
+ RamAckCount + RamAckCountPrev) /
+ (2 * (AvgEgressRate + AvgIngressRate +
+ AvgAckEgressRate + AvgAckIngressRate))
end,
{Duration, State #vqstate {
@@ -728,9 +744,15 @@ ram_duration(State = #vqstate {
avg_egress = AvgEgressRate,
avg_ingress = AvgIngressRate,
timestamp = Now },
+ ack_rates = ARates #rates {
+ egress = AckEgress1,
+ ingress = AckIngress1,
+ avg_egress = AvgAckEgressRate,
+ avg_ingress = AvgAckIngressRate },
in_counter = 0,
out_counter = 0,
- ram_msg_count_prev = RamMsgCount }}.
+ ram_msg_count_prev = RamMsgCount,
+ ram_ack_count_prev = RamAckCount }}.
needs_idle_timeout(State = #vqstate { on_sync = ?BLANK_SYNC }) ->
{Res, _State} = reduce_memory_use(fun (_Quota, State1) -> State1 end,
@@ -982,6 +1004,7 @@ init(IsDurable, IndexState, DeltaCount, Terms,
target_ram_msg_count = infinity,
ram_msg_count = 0,
ram_msg_count_prev = 0,
+ ram_ack_count_prev = 0,
ram_index_count = 0,
out_counter = 0,
in_counter = 0,
@@ -996,7 +1019,7 @@ init(IsDurable, IndexState, DeltaCount, Terms,
ingress = {Now, 0},
avg_egress = 0.0,
avg_ingress = 0.0,
- timestamp = Now } },
+ timestamp = undefined } },
a(maybe_deltas_to_betas(State)).
msg_store_callback(PersistentGuids, Pubs, AckTags, Fun, MsgPropsFun) ->