summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-01-24 13:55:54 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-01-24 13:55:54 +0000
commit56d820ce1406723fbc2864da52e529af48686aa3 (patch)
tree2adfe602e9f38450fa29b157a62c92134dc223f0
parentdb010117d6bf5d5e2118cdcb13cd9bf18b72131f (diff)
downloadrabbitmq-server-56d820ce1406723fbc2864da52e529af48686aa3.tar.gz
Update rates every 1000 messages or at the ram duration timeout, whichever comes first.
-rw-r--r--src/rabbit_variable_queue.erl95
1 files changed, 56 insertions, 39 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index aa090ed4..ddd90eae 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -536,11 +536,12 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
PCount1 = PCount + one_if(IsPersistent1),
UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
a(reduce_memory_use(
- inc_ram_msg_count(State2 #vqstate { next_seq_id = SeqId + 1,
- len = Len + 1,
- in_counter = InCount + 1,
- persistent_count = PCount1,
- unconfirmed = UC1 }))).
+ maybe_update_rates(
+ inc_ram_msg_count(State2 #vqstate { next_seq_id = SeqId + 1,
+ len = Len + 1,
+ in_counter = InCount + 1,
+ persistent_count = PCount1,
+ unconfirmed = UC1 })))).
publish_delivered(Msg = #basic_message { is_persistent = IsPersistent,
id = MsgId },
@@ -709,21 +710,26 @@ set_ram_duration_target(
false -> reduce_memory_use(State1)
end).
-ram_duration(State = #vqstate {
- rates = #rates { timestamp = Timestamp,
- egress = Egress,
- ingress = Ingress } = Rates,
- ack_rates = #rates { timestamp = AckTimestamp,
- egress = AckEgress,
- ingress = AckIngress } = ARates,
- in_counter = InCount,
- out_counter = OutCount,
- ack_in_counter = AckInCount,
- ack_out_counter = AckOutCount,
- ram_msg_count = RamMsgCount,
- ram_msg_count_prev = RamMsgCountPrev,
- ram_pending_ack = RPA,
- ram_ack_count_prev = RamAckCountPrev }) ->
+maybe_update_rates(State = #vqstate { in_counter = InCount,
+ out_counter = OutCount,
+ ack_in_counter = AckInCount,
+ ack_out_counter = AckOutCount }) ->
+ case InCount + OutCount + AckInCount + AckOutCount > 1000 of
+ true -> update_rates(State);
+ false -> State
+ end.
+
+update_rates(State = #vqstate {
+ rates = #rates { timestamp = Timestamp,
+ egress = Egress,
+ ingress = Ingress } = Rates,
+ ack_rates = #rates { timestamp = AckTimestamp,
+ egress = AckEgress,
+ ingress = AckIngress } = ARates,
+ in_counter = InCount,
+ out_counter = OutCount,
+ ack_in_counter = AckInCount,
+ ack_out_counter = AckOutCount }) ->
Now = now(),
{AvgEgressRate, Egress1} = update_rate(Now, Timestamp, OutCount, Egress),
{AvgIngressRate, Ingress1} = update_rate(Now, Timestamp, InCount, Ingress),
@@ -733,6 +739,35 @@ ram_duration(State = #vqstate {
{AvgAckIngressRate, AckIngress1} =
update_rate(Now, AckTimestamp, AckInCount, AckIngress),
+ State #vqstate {
+ rates = Rates #rates {
+ egress = Egress1,
+ ingress = Ingress1,
+ avg_egress = AvgEgressRate,
+ avg_ingress = AvgIngressRate,
+ timestamp = Now },
+ ack_rates = ARates #rates {
+ egress = AckEgress1,
+ ingress = AckIngress1,
+ avg_egress = AvgAckEgressRate,
+ avg_ingress = AvgAckIngressRate,
+ timestamp = Now },
+ in_counter = 0,
+ out_counter = 0,
+ ack_in_counter = 0,
+ ack_out_counter = 0 }.
+
+ram_duration(State0) ->
+ State = #vqstate {
+ rates = #rates { avg_egress = AvgEgressRate,
+ avg_ingress = AvgIngressRate },
+ ack_rates = #rates { avg_egress = AvgAckEgressRate,
+ avg_ingress = AvgAckIngressRate },
+ ram_msg_count = RamMsgCount,
+ ram_msg_count_prev = RamMsgCountPrev,
+ ram_pending_ack = RPA,
+ ram_ack_count_prev = RamAckCountPrev } = update_rates(State0),
+
RamAckCount = gb_trees:size(RPA),
Duration = %% msgs+acks / (msgs+acks/sec) == sec
@@ -745,25 +780,7 @@ ram_duration(State = #vqstate {
AvgAckEgressRate + AvgAckIngressRate))
end,
- {Duration, State #vqstate {
- rates = Rates #rates {
- egress = Egress1,
- ingress = Ingress1,
- avg_egress = AvgEgressRate,
- avg_ingress = AvgIngressRate,
- timestamp = Now },
- ack_rates = ARates #rates {
- egress = AckEgress1,
- ingress = AckIngress1,
- avg_egress = AvgAckEgressRate,
- avg_ingress = AvgAckIngressRate,
- timestamp = Now },
- in_counter = 0,
- out_counter = 0,
- ack_in_counter = 0,
- ack_out_counter = 0,
- ram_msg_count_prev = RamMsgCount,
- ram_ack_count_prev = RamAckCount }}.
+ {Duration, State}.
needs_timeout(State = #vqstate { index_state = IndexState,
target_ram_count = TargetRamCount }) ->