summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2014-01-30 16:33:01 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2014-01-30 16:33:01 +0000
commit95173fe26aeb4e18653d751ecf9a99e170b20986 (patch)
tree5d10aa2084473ddd99f16144e2e455d6104455c8
parenta51739f17601cea3e6e7760d146b51798a64bc94 (diff)
parentba6c5bec1693585c7ee67ce8d1a4ab76f77fbec5 (diff)
downloadrabbitmq-server-95173fe26aeb4e18653d751ecf9a99e170b20986.tar.gz
merge bug25888 into default
-rw-r--r--src/rabbit_amqqueue_process.erl27
-rw-r--r--src/rabbit_backing_queue.erl7
-rw-r--r--src/rabbit_mirror_queue_master.erl5
-rw-r--r--src/rabbit_misc.erl9
-rw-r--r--src/rabbit_queue_consumers.erl10
-rw-r--r--src/rabbit_variable_queue.erl183
6 files changed, 139 insertions, 102 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index da8c0607..9bc5a775 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -22,6 +22,7 @@
-define(SYNC_INTERVAL, 25). %% milliseconds
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
+-define(CONSUMER_BIAS_RATIO, 1.1). %% i.e. consume 10% faster
-export([start_link/1, info_keys/0]).
@@ -833,24 +834,36 @@ emit_consumer_deleted(ChPid, ConsumerTag, QName) ->
%%----------------------------------------------------------------------------
-prioritise_call(Msg, _From, _Len, _State) ->
+prioritise_call(Msg, _From, _Len, State) ->
case Msg of
- info -> 9;
- {info, _Items} -> 9;
- consumers -> 9;
- stat -> 7;
- _ -> 0
+ info -> 9;
+ {info, _Items} -> 9;
+ consumers -> 9;
+ stat -> 7;
+ {basic_consume, _, _, _, _, _, _, _, _, _} -> consumer_bias(State);
+ {basic_cancel, _, _, _} -> consumer_bias(State);
+ _ -> 0
end.
-prioritise_cast(Msg, _Len, _State) ->
+prioritise_cast(Msg, _Len, State) ->
case Msg of
delete_immediately -> 8;
{set_ram_duration_target, _Duration} -> 8;
{set_maximum_since_use, _Age} -> 8;
{run_backing_queue, _Mod, _Fun} -> 6;
+ {ack, _AckTags, _ChPid} -> consumer_bias(State);
+ {notify_sent, _ChPid, _Credit} -> consumer_bias(State);
+ {resume, _ChPid} -> consumer_bias(State);
_ -> 0
end.
+consumer_bias(#q{backing_queue = BQ, backing_queue_state = BQS}) ->
+ case BQ:msg_rates(BQS) of
+ {0.0, _} -> 0;
+ {Ingress, Egress} when Egress / Ingress < ?CONSUMER_BIAS_RATIO -> 1;
+ {_, _} -> 0
+ end.
+
prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
case Msg of
{'DOWN', _, process, DownPid, _} -> 8;
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 2b561900..3d88be7a 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -209,6 +209,10 @@
%% Called immediately before the queue hibernates.
-callback handle_pre_hibernate(state()) -> state().
+%% Used to help prioritisation in rabbit_amqqueue_process. The rate of
+%% inbound messages and outbound messages at the moment.
+-callback msg_rates(state()) -> {float(), float()}.
+
%% Exists for debugging purposes, to be able to expose state via
%% rabbitmqctl list_queues backing_queue_status
-callback status(state()) -> [{atom(), any()}].
@@ -236,7 +240,8 @@ behaviour_info(callbacks) ->
{fetch, 2}, {ack, 2}, {requeue, 2}, {ackfold, 4}, {fold, 3}, {len, 1},
{is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2},
{ram_duration, 1}, {needs_timeout, 1}, {timeout, 1},
- {handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2}] ;
+ {handle_pre_hibernate, 1}, {msg_rates, 1}, {status, 1},
+ {invoke, 3}, {is_duplicate, 2}] ;
behaviour_info(_Other) ->
undefined.
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 9ce5afcb..b272c64f 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -22,7 +22,7 @@
len/1, is_empty/1, depth/1, drain_confirmed/1,
dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
- status/1, invoke/3, is_duplicate/2]).
+ msg_rates/1, status/1, invoke/3, is_duplicate/2]).
-export([start/1, stop/0]).
@@ -353,6 +353,9 @@ handle_pre_hibernate(State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
State #state { backing_queue_state = BQ:handle_pre_hibernate(BQS) }.
+msg_rates(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
+ BQ:msg_rates(BQS).
+
status(State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
BQ:status(BQS) ++
[ {mirror_seen, dict:size(State #state.seen_status)},
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 80e160d9..1c63980e 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -71,6 +71,7 @@
-export([ensure_timer/4, stop_timer/2]).
-export([get_parent/0]).
-export([store_proc_name/1, store_proc_name/2]).
+-export([moving_average/4]).
%% Horrible macro to use in guards
-define(IS_BENIGN_EXIT(R),
@@ -251,6 +252,8 @@
-spec(get_parent/0 :: () -> pid()).
-spec(store_proc_name/2 :: (atom(), rabbit_types:proc_name()) -> ok).
-spec(store_proc_name/1 :: (rabbit_types:proc_type_and_name()) -> ok).
+-spec(moving_average/4 :: (float(), float(), float(), float() | 'undefined')
+ -> float()).
-endif.
%%----------------------------------------------------------------------------
@@ -1088,6 +1091,12 @@ stop_timer(State, Idx) ->
store_proc_name(Type, ProcName) -> store_proc_name({Type, ProcName}).
store_proc_name(TypeProcName) -> put(process_name, TypeProcName).
+moving_average(_Time, _HalfLife, Next, undefined) ->
+ Next;
+moving_average(Time, HalfLife, Next, Current) ->
+ Weight = math:exp(Time * math:log(0.5) / HalfLife),
+ Next * (1 - Weight) + Current * Weight.
+
%% -------------------------------------------------------------------------
%% Begin copypasta from gen_server2.erl
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index bea7e0d0..c9540da8 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -27,6 +27,9 @@
-define(UNSENT_MESSAGE_LIMIT, 200).
+%% Utilisation average calculations are all in μs.
+-define(USE_AVG_HALF_LIFE, 1000000.0).
+
-record(state, {consumers, use}).
-record(consumer, {tag, ack_required, args}).
@@ -430,11 +433,6 @@ update_use({inactive, Since, Active, Avg}, active) ->
use_avg(Active, Inactive, Avg) ->
Time = Inactive + Active,
- Ratio = Active / Time,
- Weight = erlang:min(1, Time / 1000000),
- case Avg of
- undefined -> Ratio;
- _ -> Ratio * Weight + Avg * (1 - Weight)
- end.
+ rabbit_misc:moving_average(Time, ?USE_AVG_HALF_LIFE, Active / Time, Avg).
now_micros() -> timer:now_diff(now(), {0,0,0}).
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 8711d139..020b5b33 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -21,8 +21,8 @@
dropwhile/2, fetchwhile/4,
fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, len/1,
is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1,
- needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3,
- is_duplicate/2, multiple_routing_keys/0]).
+ needs_timeout/1, timeout/1, handle_pre_hibernate/1, msg_rates/1,
+ status/1, invoke/3, is_duplicate/2, multiple_routing_keys/0]).
-export([start/1, stop/0]).
@@ -277,11 +277,10 @@
unconfirmed,
confirmed,
ack_out_counter,
- ack_in_counter,
- ack_rates
+ ack_in_counter
}).
--record(rates, { egress, ingress, avg_egress, avg_ingress, timestamp }).
+-record(rates, { in, out, ack_in, ack_out, timestamp }).
-record(msg_status,
{ seq_id,
@@ -322,11 +321,11 @@
-type(timestamp() :: {non_neg_integer(), non_neg_integer(), non_neg_integer()}).
-type(seq_id() :: non_neg_integer()).
--type(rates() :: #rates { egress :: {timestamp(), non_neg_integer()},
- ingress :: {timestamp(), non_neg_integer()},
- avg_egress :: float(),
- avg_ingress :: float(),
- timestamp :: timestamp() }).
+-type(rates() :: #rates { in :: float(),
+ out :: float(),
+ ack_in :: float(),
+ ack_out :: float(),
+ timestamp :: timestamp()}).
-type(delta() :: #delta { start_seq_id :: non_neg_integer(),
count :: non_neg_integer(),
@@ -368,8 +367,7 @@
unconfirmed :: gb_set(),
confirmed :: gb_set(),
ack_out_counter :: non_neg_integer(),
- ack_in_counter :: non_neg_integer(),
- ack_rates :: rates() }).
+ ack_in_counter :: non_neg_integer() }).
%% Duplicated from rabbit_backing_queue
-spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}).
@@ -384,6 +382,18 @@
count = 0,
end_seq_id = Z }).
+-define(MICROS_PER_SECOND, 1000000.0).
+
+%% We're sampling every 5s for RAM duration; a half life that is of
+%% the same order of magnitude is probably about right.
+-define(RATE_AVG_HALF_LIFE, 5.0).
+
+%% We will recalculate the #rates{} every time we get asked for our
+%% RAM duration, or every N messages published, whichever is
+%% sooner. We do this since the priority calculations in
+%% rabbit_amqqueue_process need fairly fresh rates.
+-define(MSGS_PER_RATE_CALC, 100).
+
%%----------------------------------------------------------------------------
%% Public API
%%----------------------------------------------------------------------------
@@ -540,14 +550,18 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
false -> State1 #vqstate { q1 = ?QUEUE:in(m(MsgStatus1), Q1) };
true -> State1 #vqstate { q4 = ?QUEUE:in(m(MsgStatus1), Q4) }
end,
- PCount1 = PCount + one_if(IsPersistent1),
+ InCount1 = InCount + 1,
+ PCount1 = PCount + one_if(IsPersistent1),
UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
- a(reduce_memory_use(
- inc_ram_msg_count(State2 #vqstate { next_seq_id = SeqId + 1,
- len = Len + 1,
- in_counter = InCount + 1,
- persistent_count = PCount1,
- unconfirmed = UC1 }))).
+ State3 = inc_ram_msg_count(State2 #vqstate { next_seq_id = SeqId + 1,
+ len = Len + 1,
+ in_counter = InCount1,
+ persistent_count = PCount1,
+ unconfirmed = UC1 }),
+ a(reduce_memory_use(case InCount1 > ?MSGS_PER_RATE_CALC of
+ true -> update_rates(State3);
+ false -> State3
+ end)).
publish_delivered(Msg = #basic_message { is_persistent = IsPersistent,
id = MsgId },
@@ -696,10 +710,10 @@ depth(State = #vqstate { ram_pending_ack = RPA, disk_pending_ack = DPA }) ->
set_ram_duration_target(
DurationTarget, State = #vqstate {
- rates = #rates { avg_egress = AvgEgressRate,
- avg_ingress = AvgIngressRate },
- ack_rates = #rates { avg_egress = AvgAckEgressRate,
- avg_ingress = AvgAckIngressRate },
+ rates = #rates { in = AvgIngressRate,
+ out = AvgEgressRate,
+ ack_in = AvgAckIngressRate,
+ ack_out = AvgAckEgressRate },
target_ram_count = TargetRamCount }) ->
Rate =
AvgEgressRate + AvgIngressRate + AvgAckEgressRate + AvgAckIngressRate,
@@ -716,29 +730,43 @@ set_ram_duration_target(
false -> reduce_memory_use(State1)
end).
-ram_duration(State = #vqstate {
- rates = #rates { timestamp = Timestamp,
- egress = Egress,
- ingress = Ingress } = Rates,
- ack_rates = #rates { timestamp = AckTimestamp,
- egress = AckEgress,
- ingress = AckIngress } = ARates,
- in_counter = InCount,
- out_counter = OutCount,
- ack_in_counter = AckInCount,
- ack_out_counter = AckOutCount,
- ram_msg_count = RamMsgCount,
- ram_msg_count_prev = RamMsgCountPrev,
- ram_pending_ack = RPA,
- ram_ack_count_prev = RamAckCountPrev }) ->
- Now = now(),
- {AvgEgressRate, Egress1} = update_rate(Now, Timestamp, OutCount, Egress),
- {AvgIngressRate, Ingress1} = update_rate(Now, Timestamp, InCount, Ingress),
-
- {AvgAckEgressRate, AckEgress1} =
- update_rate(Now, AckTimestamp, AckOutCount, AckEgress),
- {AvgAckIngressRate, AckIngress1} =
- update_rate(Now, AckTimestamp, AckInCount, AckIngress),
+update_rates(State = #vqstate{ in_counter = InCount,
+ out_counter = OutCount,
+ ack_in_counter = AckInCount,
+ ack_out_counter = AckOutCount,
+ rates = #rates{ in = InRate,
+ out = OutRate,
+ ack_in = AckInRate,
+ ack_out = AckOutRate,
+ timestamp = TS }}) ->
+ Now = erlang:now(),
+
+ Rates = #rates { in = update_rate(Now, TS, InCount, InRate),
+ out = update_rate(Now, TS, OutCount, OutRate),
+ ack_in = update_rate(Now, TS, AckInCount, AckInRate),
+ ack_out = update_rate(Now, TS, AckOutCount, AckOutRate),
+ timestamp = Now },
+
+ State#vqstate{ in_counter = 0,
+ out_counter = 0,
+ ack_in_counter = 0,
+ ack_out_counter = 0,
+ rates = Rates }.
+
+update_rate(Now, TS, Count, Rate) ->
+ Time = timer:now_diff(Now, TS) / ?MICROS_PER_SECOND,
+ rabbit_misc:moving_average(Time, ?RATE_AVG_HALF_LIFE, Count / Time, Rate).
+
+ram_duration(State) ->
+ State1 = #vqstate { rates = #rates { in = AvgIngressRate,
+ out = AvgEgressRate,
+ ack_in = AvgAckIngressRate,
+ ack_out = AvgAckEgressRate },
+ ram_msg_count = RamMsgCount,
+ ram_msg_count_prev = RamMsgCountPrev,
+ ram_pending_ack = RPA,
+ ram_ack_count_prev = RamAckCountPrev } =
+ update_rates(State),
RamAckCount = gb_trees:size(RPA),
@@ -752,25 +780,7 @@ ram_duration(State = #vqstate {
AvgAckEgressRate + AvgAckIngressRate))
end,
- {Duration, State #vqstate {
- rates = Rates #rates {
- egress = Egress1,
- ingress = Ingress1,
- avg_egress = AvgEgressRate,
- avg_ingress = AvgIngressRate,
- timestamp = Now },
- ack_rates = ARates #rates {
- egress = AckEgress1,
- ingress = AckIngress1,
- avg_egress = AvgAckEgressRate,
- avg_ingress = AvgAckIngressRate,
- timestamp = Now },
- in_counter = 0,
- out_counter = 0,
- ack_in_counter = 0,
- ack_out_counter = 0,
- ram_msg_count_prev = RamMsgCount,
- ram_ack_count_prev = RamAckCount }}.
+ {Duration, State1}.
needs_timeout(State = #vqstate { index_state = IndexState,
target_ram_count = TargetRamCount }) ->
@@ -796,6 +806,10 @@ timeout(State = #vqstate { index_state = IndexState }) ->
handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }.
+msg_rates(#vqstate { rates = #rates { in = AvgIngressRate,
+ out = AvgEgressRate } }) ->
+ {AvgIngressRate, AvgEgressRate}.
+
status(#vqstate {
q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
len = Len,
@@ -805,10 +819,11 @@ status(#vqstate {
ram_msg_count = RamMsgCount,
next_seq_id = NextSeqId,
persistent_count = PersistentCount,
- rates = #rates { avg_egress = AvgEgressRate,
- avg_ingress = AvgIngressRate },
- ack_rates = #rates { avg_egress = AvgAckEgressRate,
- avg_ingress = AvgAckIngressRate } }) ->
+ rates = #rates { in = AvgIngressRate,
+ out = AvgEgressRate,
+ ack_in = AvgAckIngressRate,
+ ack_out = AvgAckEgressRate }}) ->
+
[ {q1 , ?QUEUE:len(Q1)},
{q2 , ?QUEUE:len(Q2)},
{delta , Delta},
@@ -998,10 +1013,6 @@ expand_delta(SeqId, #delta { count = Count,
expand_delta(_SeqId, #delta { count = Count } = Delta) ->
d(Delta #delta { count = Count + 1 }).
-update_rate(Now, Then, Count, {OThen, OCount}) ->
- %% avg over the current period and the previous
- {1000000.0 * (Count + OCount) / timer:now_diff(Now, OThen), {Then, Count}}.
-
%%----------------------------------------------------------------------------
%% Internal major helpers for Public API
%%----------------------------------------------------------------------------
@@ -1046,22 +1057,21 @@ init(IsDurable, IndexState, DeltaCount, Terms,
ram_ack_count_prev = 0,
out_counter = 0,
in_counter = 0,
- rates = blank_rate(Now, DeltaCount1),
+ rates = blank_rates(Now),
msgs_on_disk = gb_sets:new(),
msg_indices_on_disk = gb_sets:new(),
unconfirmed = gb_sets:new(),
confirmed = gb_sets:new(),
ack_out_counter = 0,
- ack_in_counter = 0,
- ack_rates = blank_rate(Now, 0) },
+ ack_in_counter = 0 },
a(maybe_deltas_to_betas(State)).
-blank_rate(Timestamp, IngressLength) ->
- #rates { egress = {Timestamp, 0},
- ingress = {Timestamp, IngressLength},
- avg_egress = 0.0,
- avg_ingress = 0.0,
- timestamp = Timestamp }.
+blank_rates(Now) ->
+ #rates { in = 0.0,
+ out = 0.0,
+ ack_in = 0.0,
+ ack_out = 0.0,
+ timestamp = Now}.
in_r(MsgStatus = #msg_status { msg = undefined },
State = #vqstate { q3 = Q3, q4 = Q4 }) ->
@@ -1535,11 +1545,10 @@ reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun,
ram_pending_ack = RPA,
ram_msg_count = RamMsgCount,
target_ram_count = TargetRamCount,
- rates = #rates { avg_ingress = AvgIngress,
- avg_egress = AvgEgress },
- ack_rates = #rates { avg_ingress = AvgAckIngress,
- avg_egress = AvgAckEgress }
- }) ->
+ rates = #rates { in = AvgIngress,
+ out = AvgEgress,
+ ack_in = AvgAckIngress,
+ ack_out = AvgAckEgress } }) ->
{Reduce, State1 = #vqstate { q2 = Q2, q3 = Q3 }} =
case chunk_size(RamMsgCount + gb_trees:size(RPA), TargetRamCount) of