diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2014-01-30 16:33:01 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2014-01-30 16:33:01 +0000 |
commit | 95173fe26aeb4e18653d751ecf9a99e170b20986 (patch) | |
tree | 5d10aa2084473ddd99f16144e2e455d6104455c8 | |
parent | a51739f17601cea3e6e7760d146b51798a64bc94 (diff) | |
parent | ba6c5bec1693585c7ee67ce8d1a4ab76f77fbec5 (diff) | |
download | rabbitmq-server-95173fe26aeb4e18653d751ecf9a99e170b20986.tar.gz |
merge bug25888 into default
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 27 | ||||
-rw-r--r-- | src/rabbit_backing_queue.erl | 7 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 5 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 9 | ||||
-rw-r--r-- | src/rabbit_queue_consumers.erl | 10 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 183 |
6 files changed, 139 insertions, 102 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index da8c0607..9bc5a775 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -22,6 +22,7 @@ -define(SYNC_INTERVAL, 25). %% milliseconds -define(RAM_DURATION_UPDATE_INTERVAL, 5000). +-define(CONSUMER_BIAS_RATIO, 1.1). %% i.e. consume 10% faster -export([start_link/1, info_keys/0]). @@ -833,24 +834,36 @@ emit_consumer_deleted(ChPid, ConsumerTag, QName) -> %%---------------------------------------------------------------------------- -prioritise_call(Msg, _From, _Len, _State) -> +prioritise_call(Msg, _From, _Len, State) -> case Msg of - info -> 9; - {info, _Items} -> 9; - consumers -> 9; - stat -> 7; - _ -> 0 + info -> 9; + {info, _Items} -> 9; + consumers -> 9; + stat -> 7; + {basic_consume, _, _, _, _, _, _, _, _, _} -> consumer_bias(State); + {basic_cancel, _, _, _} -> consumer_bias(State); + _ -> 0 end. -prioritise_cast(Msg, _Len, _State) -> +prioritise_cast(Msg, _Len, State) -> case Msg of delete_immediately -> 8; {set_ram_duration_target, _Duration} -> 8; {set_maximum_since_use, _Age} -> 8; {run_backing_queue, _Mod, _Fun} -> 6; + {ack, _AckTags, _ChPid} -> consumer_bias(State); + {notify_sent, _ChPid, _Credit} -> consumer_bias(State); + {resume, _ChPid} -> consumer_bias(State); _ -> 0 end. +consumer_bias(#q{backing_queue = BQ, backing_queue_state = BQS}) -> + case BQ:msg_rates(BQS) of + {0.0, _} -> 0; + {Ingress, Egress} when Egress / Ingress < ?CONSUMER_BIAS_RATIO -> 1; + {_, _} -> 0 + end. + prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) -> case Msg of {'DOWN', _, process, DownPid, _} -> 8; diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 2b561900..3d88be7a 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -209,6 +209,10 @@ %% Called immediately before the queue hibernates. -callback handle_pre_hibernate(state()) -> state(). +%% Used to help prioritisation in rabbit_amqqueue_process. The rate of +%% inbound messages and outbound messages at the moment. +-callback msg_rates(state()) -> {float(), float()}. + %% Exists for debugging purposes, to be able to expose state via %% rabbitmqctl list_queues backing_queue_status -callback status(state()) -> [{atom(), any()}]. @@ -236,7 +240,8 @@ behaviour_info(callbacks) -> {fetch, 2}, {ack, 2}, {requeue, 2}, {ackfold, 4}, {fold, 3}, {len, 1}, {is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2}, {ram_duration, 1}, {needs_timeout, 1}, {timeout, 1}, - {handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2}] ; + {handle_pre_hibernate, 1}, {msg_rates, 1}, {status, 1}, + {invoke, 3}, {is_duplicate, 2}] ; behaviour_info(_Other) -> undefined. diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 9ce5afcb..b272c64f 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -22,7 +22,7 @@ len/1, is_empty/1, depth/1, drain_confirmed/1, dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, - status/1, invoke/3, is_duplicate/2]). + msg_rates/1, status/1, invoke/3, is_duplicate/2]). -export([start/1, stop/0]). @@ -353,6 +353,9 @@ handle_pre_hibernate(State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> State #state { backing_queue_state = BQ:handle_pre_hibernate(BQS) }. +msg_rates(#state { backing_queue = BQ, backing_queue_state = BQS }) -> + BQ:msg_rates(BQS). + status(State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> BQ:status(BQS) ++ [ {mirror_seen, dict:size(State #state.seen_status)}, diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 80e160d9..1c63980e 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -71,6 +71,7 @@ -export([ensure_timer/4, stop_timer/2]). -export([get_parent/0]). -export([store_proc_name/1, store_proc_name/2]). +-export([moving_average/4]). %% Horrible macro to use in guards -define(IS_BENIGN_EXIT(R), @@ -251,6 +252,8 @@ -spec(get_parent/0 :: () -> pid()). -spec(store_proc_name/2 :: (atom(), rabbit_types:proc_name()) -> ok). -spec(store_proc_name/1 :: (rabbit_types:proc_type_and_name()) -> ok). +-spec(moving_average/4 :: (float(), float(), float(), float() | 'undefined') + -> float()). -endif. %%---------------------------------------------------------------------------- @@ -1088,6 +1091,12 @@ stop_timer(State, Idx) -> store_proc_name(Type, ProcName) -> store_proc_name({Type, ProcName}). store_proc_name(TypeProcName) -> put(process_name, TypeProcName). +moving_average(_Time, _HalfLife, Next, undefined) -> + Next; +moving_average(Time, HalfLife, Next, Current) -> + Weight = math:exp(Time * math:log(0.5) / HalfLife), + Next * (1 - Weight) + Current * Weight. + %% ------------------------------------------------------------------------- %% Begin copypasta from gen_server2.erl diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index bea7e0d0..c9540da8 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -27,6 +27,9 @@ -define(UNSENT_MESSAGE_LIMIT, 200). +%% Utilisation average calculations are all in μs. +-define(USE_AVG_HALF_LIFE, 1000000.0). + -record(state, {consumers, use}). -record(consumer, {tag, ack_required, args}). @@ -430,11 +433,6 @@ update_use({inactive, Since, Active, Avg}, active) -> use_avg(Active, Inactive, Avg) -> Time = Inactive + Active, - Ratio = Active / Time, - Weight = erlang:min(1, Time / 1000000), - case Avg of - undefined -> Ratio; - _ -> Ratio * Weight + Avg * (1 - Weight) - end. + rabbit_misc:moving_average(Time, ?USE_AVG_HALF_LIFE, Active / Time, Avg). now_micros() -> timer:now_diff(now(), {0,0,0}). diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 8711d139..020b5b33 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -21,8 +21,8 @@ dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, len/1, is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1, - needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, - is_duplicate/2, multiple_routing_keys/0]). + needs_timeout/1, timeout/1, handle_pre_hibernate/1, msg_rates/1, + status/1, invoke/3, is_duplicate/2, multiple_routing_keys/0]). -export([start/1, stop/0]). @@ -277,11 +277,10 @@ unconfirmed, confirmed, ack_out_counter, - ack_in_counter, - ack_rates + ack_in_counter }). --record(rates, { egress, ingress, avg_egress, avg_ingress, timestamp }). +-record(rates, { in, out, ack_in, ack_out, timestamp }). -record(msg_status, { seq_id, @@ -322,11 +321,11 @@ -type(timestamp() :: {non_neg_integer(), non_neg_integer(), non_neg_integer()}). -type(seq_id() :: non_neg_integer()). --type(rates() :: #rates { egress :: {timestamp(), non_neg_integer()}, - ingress :: {timestamp(), non_neg_integer()}, - avg_egress :: float(), - avg_ingress :: float(), - timestamp :: timestamp() }). +-type(rates() :: #rates { in :: float(), + out :: float(), + ack_in :: float(), + ack_out :: float(), + timestamp :: timestamp()}). -type(delta() :: #delta { start_seq_id :: non_neg_integer(), count :: non_neg_integer(), @@ -368,8 +367,7 @@ unconfirmed :: gb_set(), confirmed :: gb_set(), ack_out_counter :: non_neg_integer(), - ack_in_counter :: non_neg_integer(), - ack_rates :: rates() }). + ack_in_counter :: non_neg_integer() }). %% Duplicated from rabbit_backing_queue -spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}). @@ -384,6 +382,18 @@ count = 0, end_seq_id = Z }). +-define(MICROS_PER_SECOND, 1000000.0). + +%% We're sampling every 5s for RAM duration; a half life that is of +%% the same order of magnitude is probably about right. +-define(RATE_AVG_HALF_LIFE, 5.0). + +%% We will recalculate the #rates{} every time we get asked for our +%% RAM duration, or every N messages published, whichever is +%% sooner. We do this since the priority calculations in +%% rabbit_amqqueue_process need fairly fresh rates. +-define(MSGS_PER_RATE_CALC, 100). + %%---------------------------------------------------------------------------- %% Public API %%---------------------------------------------------------------------------- @@ -540,14 +550,18 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, false -> State1 #vqstate { q1 = ?QUEUE:in(m(MsgStatus1), Q1) }; true -> State1 #vqstate { q4 = ?QUEUE:in(m(MsgStatus1), Q4) } end, - PCount1 = PCount + one_if(IsPersistent1), + InCount1 = InCount + 1, + PCount1 = PCount + one_if(IsPersistent1), UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), - a(reduce_memory_use( - inc_ram_msg_count(State2 #vqstate { next_seq_id = SeqId + 1, - len = Len + 1, - in_counter = InCount + 1, - persistent_count = PCount1, - unconfirmed = UC1 }))). + State3 = inc_ram_msg_count(State2 #vqstate { next_seq_id = SeqId + 1, + len = Len + 1, + in_counter = InCount1, + persistent_count = PCount1, + unconfirmed = UC1 }), + a(reduce_memory_use(case InCount1 > ?MSGS_PER_RATE_CALC of + true -> update_rates(State3); + false -> State3 + end)). publish_delivered(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, @@ -696,10 +710,10 @@ depth(State = #vqstate { ram_pending_ack = RPA, disk_pending_ack = DPA }) -> set_ram_duration_target( DurationTarget, State = #vqstate { - rates = #rates { avg_egress = AvgEgressRate, - avg_ingress = AvgIngressRate }, - ack_rates = #rates { avg_egress = AvgAckEgressRate, - avg_ingress = AvgAckIngressRate }, + rates = #rates { in = AvgIngressRate, + out = AvgEgressRate, + ack_in = AvgAckIngressRate, + ack_out = AvgAckEgressRate }, target_ram_count = TargetRamCount }) -> Rate = AvgEgressRate + AvgIngressRate + AvgAckEgressRate + AvgAckIngressRate, @@ -716,29 +730,43 @@ set_ram_duration_target( false -> reduce_memory_use(State1) end). -ram_duration(State = #vqstate { - rates = #rates { timestamp = Timestamp, - egress = Egress, - ingress = Ingress } = Rates, - ack_rates = #rates { timestamp = AckTimestamp, - egress = AckEgress, - ingress = AckIngress } = ARates, - in_counter = InCount, - out_counter = OutCount, - ack_in_counter = AckInCount, - ack_out_counter = AckOutCount, - ram_msg_count = RamMsgCount, - ram_msg_count_prev = RamMsgCountPrev, - ram_pending_ack = RPA, - ram_ack_count_prev = RamAckCountPrev }) -> - Now = now(), - {AvgEgressRate, Egress1} = update_rate(Now, Timestamp, OutCount, Egress), - {AvgIngressRate, Ingress1} = update_rate(Now, Timestamp, InCount, Ingress), - - {AvgAckEgressRate, AckEgress1} = - update_rate(Now, AckTimestamp, AckOutCount, AckEgress), - {AvgAckIngressRate, AckIngress1} = - update_rate(Now, AckTimestamp, AckInCount, AckIngress), +update_rates(State = #vqstate{ in_counter = InCount, + out_counter = OutCount, + ack_in_counter = AckInCount, + ack_out_counter = AckOutCount, + rates = #rates{ in = InRate, + out = OutRate, + ack_in = AckInRate, + ack_out = AckOutRate, + timestamp = TS }}) -> + Now = erlang:now(), + + Rates = #rates { in = update_rate(Now, TS, InCount, InRate), + out = update_rate(Now, TS, OutCount, OutRate), + ack_in = update_rate(Now, TS, AckInCount, AckInRate), + ack_out = update_rate(Now, TS, AckOutCount, AckOutRate), + timestamp = Now }, + + State#vqstate{ in_counter = 0, + out_counter = 0, + ack_in_counter = 0, + ack_out_counter = 0, + rates = Rates }. + +update_rate(Now, TS, Count, Rate) -> + Time = timer:now_diff(Now, TS) / ?MICROS_PER_SECOND, + rabbit_misc:moving_average(Time, ?RATE_AVG_HALF_LIFE, Count / Time, Rate). + +ram_duration(State) -> + State1 = #vqstate { rates = #rates { in = AvgIngressRate, + out = AvgEgressRate, + ack_in = AvgAckIngressRate, + ack_out = AvgAckEgressRate }, + ram_msg_count = RamMsgCount, + ram_msg_count_prev = RamMsgCountPrev, + ram_pending_ack = RPA, + ram_ack_count_prev = RamAckCountPrev } = + update_rates(State), RamAckCount = gb_trees:size(RPA), @@ -752,25 +780,7 @@ ram_duration(State = #vqstate { AvgAckEgressRate + AvgAckIngressRate)) end, - {Duration, State #vqstate { - rates = Rates #rates { - egress = Egress1, - ingress = Ingress1, - avg_egress = AvgEgressRate, - avg_ingress = AvgIngressRate, - timestamp = Now }, - ack_rates = ARates #rates { - egress = AckEgress1, - ingress = AckIngress1, - avg_egress = AvgAckEgressRate, - avg_ingress = AvgAckIngressRate, - timestamp = Now }, - in_counter = 0, - out_counter = 0, - ack_in_counter = 0, - ack_out_counter = 0, - ram_msg_count_prev = RamMsgCount, - ram_ack_count_prev = RamAckCount }}. + {Duration, State1}. needs_timeout(State = #vqstate { index_state = IndexState, target_ram_count = TargetRamCount }) -> @@ -796,6 +806,10 @@ timeout(State = #vqstate { index_state = IndexState }) -> handle_pre_hibernate(State = #vqstate { index_state = IndexState }) -> State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }. +msg_rates(#vqstate { rates = #rates { in = AvgIngressRate, + out = AvgEgressRate } }) -> + {AvgIngressRate, AvgEgressRate}. + status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, len = Len, @@ -805,10 +819,11 @@ status(#vqstate { ram_msg_count = RamMsgCount, next_seq_id = NextSeqId, persistent_count = PersistentCount, - rates = #rates { avg_egress = AvgEgressRate, - avg_ingress = AvgIngressRate }, - ack_rates = #rates { avg_egress = AvgAckEgressRate, - avg_ingress = AvgAckIngressRate } }) -> + rates = #rates { in = AvgIngressRate, + out = AvgEgressRate, + ack_in = AvgAckIngressRate, + ack_out = AvgAckEgressRate }}) -> + [ {q1 , ?QUEUE:len(Q1)}, {q2 , ?QUEUE:len(Q2)}, {delta , Delta}, @@ -998,10 +1013,6 @@ expand_delta(SeqId, #delta { count = Count, expand_delta(_SeqId, #delta { count = Count } = Delta) -> d(Delta #delta { count = Count + 1 }). -update_rate(Now, Then, Count, {OThen, OCount}) -> - %% avg over the current period and the previous - {1000000.0 * (Count + OCount) / timer:now_diff(Now, OThen), {Then, Count}}. - %%---------------------------------------------------------------------------- %% Internal major helpers for Public API %%---------------------------------------------------------------------------- @@ -1046,22 +1057,21 @@ init(IsDurable, IndexState, DeltaCount, Terms, ram_ack_count_prev = 0, out_counter = 0, in_counter = 0, - rates = blank_rate(Now, DeltaCount1), + rates = blank_rates(Now), msgs_on_disk = gb_sets:new(), msg_indices_on_disk = gb_sets:new(), unconfirmed = gb_sets:new(), confirmed = gb_sets:new(), ack_out_counter = 0, - ack_in_counter = 0, - ack_rates = blank_rate(Now, 0) }, + ack_in_counter = 0 }, a(maybe_deltas_to_betas(State)). -blank_rate(Timestamp, IngressLength) -> - #rates { egress = {Timestamp, 0}, - ingress = {Timestamp, IngressLength}, - avg_egress = 0.0, - avg_ingress = 0.0, - timestamp = Timestamp }. +blank_rates(Now) -> + #rates { in = 0.0, + out = 0.0, + ack_in = 0.0, + ack_out = 0.0, + timestamp = Now}. in_r(MsgStatus = #msg_status { msg = undefined }, State = #vqstate { q3 = Q3, q4 = Q4 }) -> @@ -1535,11 +1545,10 @@ reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun, ram_pending_ack = RPA, ram_msg_count = RamMsgCount, target_ram_count = TargetRamCount, - rates = #rates { avg_ingress = AvgIngress, - avg_egress = AvgEgress }, - ack_rates = #rates { avg_ingress = AvgAckIngress, - avg_egress = AvgAckEgress } - }) -> + rates = #rates { in = AvgIngress, + out = AvgEgress, + ack_in = AvgAckIngress, + ack_out = AvgAckEgress } }) -> {Reduce, State1 = #vqstate { q2 = Q2, q3 = Q3 }} = case chunk_size(RamMsgCount + gb_trees:size(RPA), TargetRamCount) of |