summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-01-27 13:16:33 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-01-27 13:16:33 +0000
commit0377b484a839c94076df6ba04e1005253c1b8851 (patch)
tree8279b0ce7a283531e1cc82d0d9bbf27be5dde9f2
parent56d820ce1406723fbc2864da52e529af48686aa3 (diff)
downloadrabbitmq-server-0377b484a839c94076df6ba04e1005253c1b8851.tar.gz
Unify the #rates record so we don't duplicate as much stuff.
-rw-r--r--src/rabbit_variable_queue.erl159
1 files changed, 80 insertions, 79 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index ddd90eae..2586b70c 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -277,11 +277,12 @@
unconfirmed,
confirmed,
ack_out_counter,
- ack_in_counter,
- ack_rates
+ ack_in_counter
}).
--record(rates, { egress, ingress, avg_egress, avg_ingress, timestamp }).
+-record(rates, { in_count, out_count, ack_in_count, ack_out_count,
+ in_avg, out_avg, ack_in_avg, ack_out_avg,
+ timestamp, old_timestamp}).
-record(msg_status,
{ seq_id,
@@ -322,11 +323,16 @@
-type(timestamp() :: {non_neg_integer(), non_neg_integer(), non_neg_integer()}).
-type(seq_id() :: non_neg_integer()).
--type(rates() :: #rates { egress :: {timestamp(), non_neg_integer()},
- ingress :: {timestamp(), non_neg_integer()},
- avg_egress :: float(),
- avg_ingress :: float(),
- timestamp :: timestamp() }).
+-type(rates() :: #rates { in_count :: non_neg_integer(),
+ out_count :: non_neg_integer(),
+ ack_in_count :: non_neg_integer(),
+ ack_out_count :: non_neg_integer(),
+ in_avg :: float(),
+ out_avg :: float(),
+ ack_in_avg :: float(),
+ ack_out_avg :: float(),
+ timestamp :: timestamp(),
+ old_timestamp :: timestamp()}).
-type(delta() :: #delta { start_seq_id :: non_neg_integer(),
count :: non_neg_integer(),
@@ -368,8 +374,7 @@
unconfirmed :: gb_set(),
confirmed :: gb_set(),
ack_out_counter :: non_neg_integer(),
- ack_in_counter :: non_neg_integer(),
- ack_rates :: rates() }).
+ ack_in_counter :: non_neg_integer() }).
%% Duplicated from rabbit_backing_queue
-spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}).
@@ -690,11 +695,11 @@ depth(State = #vqstate { ram_pending_ack = RPA, disk_pending_ack = DPA }) ->
set_ram_duration_target(
DurationTarget, State = #vqstate {
- rates = #rates { avg_egress = AvgEgressRate,
- avg_ingress = AvgIngressRate },
- ack_rates = #rates { avg_egress = AvgAckEgressRate,
- avg_ingress = AvgAckIngressRate },
- target_ram_count = TargetRamCount }) ->
+ rates = #rates { out_avg = AvgEgressRate,
+ in_avg = AvgIngressRate,
+ ack_out_avg = AvgAckEgressRate,
+ ack_in_avg = AvgAckIngressRate },
+ target_ram_count = TargetRamCount }) ->
Rate =
AvgEgressRate + AvgIngressRate + AvgAckEgressRate + AvgAckIngressRate,
TargetRamCount1 =
@@ -719,50 +724,45 @@ maybe_update_rates(State = #vqstate { in_counter = InCount,
false -> State
end.
-update_rates(State = #vqstate {
- rates = #rates { timestamp = Timestamp,
- egress = Egress,
- ingress = Ingress } = Rates,
- ack_rates = #rates { timestamp = AckTimestamp,
- egress = AckEgress,
- ingress = AckIngress } = ARates,
- in_counter = InCount,
- out_counter = OutCount,
- ack_in_counter = AckInCount,
- ack_out_counter = AckOutCount }) ->
- Now = now(),
- {AvgEgressRate, Egress1} = update_rate(Now, Timestamp, OutCount, Egress),
- {AvgIngressRate, Ingress1} = update_rate(Now, Timestamp, InCount, Ingress),
-
- {AvgAckEgressRate, AckEgress1} =
- update_rate(Now, AckTimestamp, AckOutCount, AckEgress),
- {AvgAckIngressRate, AckIngress1} =
- update_rate(Now, AckTimestamp, AckInCount, AckIngress),
-
- State #vqstate {
- rates = Rates #rates {
- egress = Egress1,
- ingress = Ingress1,
- avg_egress = AvgEgressRate,
- avg_ingress = AvgIngressRate,
- timestamp = Now },
- ack_rates = ARates #rates {
- egress = AckEgress1,
- ingress = AckIngress1,
- avg_egress = AvgAckEgressRate,
- avg_ingress = AvgAckIngressRate,
- timestamp = Now },
- in_counter = 0,
- out_counter = 0,
- ack_in_counter = 0,
- ack_out_counter = 0 }.
+update_rates(State = #vqstate{in_counter = In,
+ out_counter = Out,
+ ack_in_counter = AckIn,
+ ack_out_counter = AckOut,
+ rates =
+ #rates{in_count = In0,
+ out_count = Out0,
+ ack_in_count = AckIn0,
+ ack_out_count = AckOut0,
+ timestamp = TS,
+ old_timestamp = OldTS}}) ->
+ Now = erlang:now(),
+
+ Rates = #rates { out_count = Out,
+ in_count = In,
+ ack_out_count = AckOut,
+ ack_in_count = AckIn,
+ out_avg = update_rate(Now, OldTS, Out0, Out),
+ in_avg = update_rate(Now, OldTS, In0, In),
+ ack_out_avg = update_rate(Now, OldTS, AckOut0, AckOut),
+ ack_in_avg = update_rate(Now, OldTS, AckIn0, AckIn),
+ timestamp = Now,
+ old_timestamp = TS },
+ State#vqstate{ in_counter = 0,
+ out_counter = 0,
+ ack_in_counter = 0,
+ ack_out_counter = 0,
+ rates = Rates }.
+
+update_rate(Now, OldTS, Count, Count0) ->
+ %% avg over the current period and the previous
+ 1000000.0 * (Count + Count0) / timer:now_diff(Now, OldTS).
ram_duration(State0) ->
State = #vqstate {
- rates = #rates { avg_egress = AvgEgressRate,
- avg_ingress = AvgIngressRate },
- ack_rates = #rates { avg_egress = AvgAckEgressRate,
- avg_ingress = AvgAckIngressRate },
+ rates = #rates { out_avg = AvgEgressRate,
+ in_avg = AvgIngressRate,
+ ack_out_avg = AvgAckEgressRate,
+ ack_in_avg = AvgAckIngressRate },
ram_msg_count = RamMsgCount,
ram_msg_count_prev = RamMsgCountPrev,
ram_pending_ack = RPA,
@@ -806,8 +806,8 @@ timeout(State = #vqstate { index_state = IndexState }) ->
handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }.
-msg_rates(#vqstate { rates = #rates { avg_egress = AvgEgressRate,
- avg_ingress = AvgIngressRate } }) ->
+msg_rates(#vqstate { rates = #rates { out_avg = AvgEgressRate,
+ in_avg = AvgIngressRate } }) ->
{AvgIngressRate, AvgEgressRate}.
status(#vqstate {
@@ -819,10 +819,11 @@ status(#vqstate {
ram_msg_count = RamMsgCount,
next_seq_id = NextSeqId,
persistent_count = PersistentCount,
- rates = #rates { avg_egress = AvgEgressRate,
- avg_ingress = AvgIngressRate },
- ack_rates = #rates { avg_egress = AvgAckEgressRate,
- avg_ingress = AvgAckIngressRate } }) ->
+ rates = #rates { in_avg = AvgIngressRate,
+ out_avg = AvgEgressRate,
+ ack_in_avg = AvgAckIngressRate,
+ ack_out_avg = AvgAckEgressRate }}) ->
+
[ {q1 , ?QUEUE:len(Q1)},
{q2 , ?QUEUE:len(Q2)},
{delta , Delta},
@@ -1012,10 +1013,6 @@ expand_delta(SeqId, #delta { count = Count,
expand_delta(_SeqId, #delta { count = Count } = Delta) ->
d(Delta #delta { count = Count + 1 }).
-update_rate(Now, Then, Count, {OThen, OCount}) ->
- %% avg over the current period and the previous
- {1000000.0 * (Count + OCount) / timer:now_diff(Now, OThen), {Then, Count}}.
-
%%----------------------------------------------------------------------------
%% Internal major helpers for Public API
%%----------------------------------------------------------------------------
@@ -1055,22 +1052,26 @@ init(IsDurable, IndexState, DeltaCount, Terms,
ram_ack_count_prev = 0,
out_counter = 0,
in_counter = 0,
- rates = blank_rate(Now, DeltaCount1),
+ rates = blank_rates(Now, DeltaCount1),
msgs_on_disk = gb_sets:new(),
msg_indices_on_disk = gb_sets:new(),
unconfirmed = gb_sets:new(),
confirmed = gb_sets:new(),
ack_out_counter = 0,
- ack_in_counter = 0,
- ack_rates = blank_rate(Now, 0) },
+ ack_in_counter = 0 },
a(maybe_deltas_to_betas(State)).
-blank_rate(Timestamp, IngressLength) ->
- #rates { egress = {Timestamp, 0},
- ingress = {Timestamp, IngressLength},
- avg_egress = 0.0,
- avg_ingress = 0.0,
- timestamp = Timestamp }.
+blank_rates(Now, IngressLength) ->
+ #rates { in_count = IngressLength,
+ out_count = 0,
+ ack_in_count = 0,
+ ack_out_count = 0,
+ in_avg = 0,
+ out_avg = 0,
+ ack_in_avg = 0,
+ ack_out_avg = 0,
+ timestamp = Now,
+ old_timestamp = Now}.
in_r(MsgStatus = #msg_status { msg = undefined },
State = #vqstate { q3 = Q3, q4 = Q4 }) ->
@@ -1544,10 +1545,10 @@ reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun,
ram_pending_ack = RPA,
ram_msg_count = RamMsgCount,
target_ram_count = TargetRamCount,
- rates = #rates { avg_ingress = AvgIngress,
- avg_egress = AvgEgress },
- ack_rates = #rates { avg_ingress = AvgAckIngress,
- avg_egress = AvgAckEgress }
+ rates = #rates { in_avg = AvgIngress,
+ out_avg = AvgEgress,
+ ack_in_avg = AvgAckIngress,
+ ack_out_avg = AvgAckEgress }
}) ->
{Reduce, State1 = #vqstate { q2 = Q2, q3 = Q3 }} =