diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-01-24 12:50:40 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-01-24 12:50:40 +0000 |
commit | e25426838aab44876d11845847606ee6ffee00b0 (patch) | |
tree | d1af0d4bac18550ec635d8b1fe2ae38d11e0cae1 | |
parent | 55b5f4c07614ea84f5f4499eb1417cb6844e0e14 (diff) | |
parent | dc717d13bdaff8ee0b1e78b33596491f43fe1b30 (diff) | |
download | rabbitmq-server-e25426838aab44876d11845847606ee6ffee00b0.tar.gz |
Merge in default
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 28 | ||||
-rw-r--r-- | src/rabbit_backing_queue.erl | 7 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 5 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 8 |
4 files changed, 37 insertions, 11 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 2b0882b8..b4891d99 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -22,6 +22,7 @@ -define(SYNC_INTERVAL, 25). %% milliseconds -define(RAM_DURATION_UPDATE_INTERVAL, 5000). +-define(CONSUMER_BIAS, 0.8). -export([start_link/1, info_keys/0]). @@ -829,24 +830,37 @@ emit_consumer_deleted(ChPid, ConsumerTag, QName) -> %%---------------------------------------------------------------------------- -prioritise_call(Msg, _From, _Len, _State) -> +prioritise_call(Msg, _From, _Len, State) -> case Msg of - info -> 9; - {info, _Items} -> 9; - consumers -> 9; - stat -> 7; - _ -> 0 + info -> 9; + {info, _Items} -> 9; + consumers -> 9; + stat -> 7; + {basic_consume, _, _, _, _, _, _, _, _, _} -> consumer_bias(State); + {basic_cancel, _, _, _} -> consumer_bias(State); + _ -> 0 end. -prioritise_cast(Msg, _Len, _State) -> +prioritise_cast(Msg, _Len, State) -> case Msg of delete_immediately -> 8; {set_ram_duration_target, _Duration} -> 8; {set_maximum_since_use, _Age} -> 8; {run_backing_queue, _Mod, _Fun} -> 6; + {ack, _AckTags, _ChPid} -> consumer_bias(State); + {notify_sent, _ChPid, _Credit} -> consumer_bias(State); + {resume, _ChPid} -> consumer_bias(State); _ -> 0 end. +consumer_bias(#q{backing_queue = BQ, backing_queue_state = BQS}) -> + {Ingress, Egress} = BQ:msg_rates(BQS), + case ?CONSUMER_BIAS of + B when B > 0.0 andalso Ingress >= (1.0 - B) * Egress -> +1; + B when B < 0.0 andalso Egress >= (1.0 + B) * Ingress -> -1; + _ -> 0 + end. + prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) -> case Msg of {'DOWN', _, process, DownPid, _} -> 8; diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 61b504bc..1bbd1543 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -203,6 +203,10 @@ %% Called immediately before the queue hibernates. -callback handle_pre_hibernate(state()) -> state(). +%% Used to help prioritisation in rabbit_amqqueue_process. The rate of +%% inbound messages and outbound messages at the moment. +-callback msg_rates(state()) -> {float(), float()}. + %% Exists for debugging purposes, to be able to expose state via %% rabbitmqctl list_queues backing_queue_status -callback status(state()) -> [{atom(), any()}]. @@ -230,7 +234,8 @@ behaviour_info(callbacks) -> {fetch, 2}, {ack, 2}, {requeue, 2}, {ackfold, 4}, {fold, 3}, {len, 1}, {is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2}, {ram_duration, 1}, {needs_timeout, 1}, {timeout, 1}, - {handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2}] ; + {handle_pre_hibernate, 1}, {msg_rates, 1}, {status, 1}, + {invoke, 3}, {is_duplicate, 2}] ; behaviour_info(_Other) -> undefined. diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 9ce5afcb..b272c64f 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -22,7 +22,7 @@ len/1, is_empty/1, depth/1, drain_confirmed/1, dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, - status/1, invoke/3, is_duplicate/2]). + msg_rates/1, status/1, invoke/3, is_duplicate/2]). -export([start/1, stop/0]). @@ -353,6 +353,9 @@ handle_pre_hibernate(State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> State #state { backing_queue_state = BQ:handle_pre_hibernate(BQS) }. +msg_rates(#state { backing_queue = BQ, backing_queue_state = BQS }) -> + BQ:msg_rates(BQS). + status(State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> BQ:status(BQS) ++ [ {mirror_seen, dict:size(State #state.seen_status)}, diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index ac2b9f52..aa090ed4 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -21,8 +21,8 @@ dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, len/1, is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1, - needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, - is_duplicate/2, multiple_routing_keys/0]). + needs_timeout/1, timeout/1, handle_pre_hibernate/1, msg_rates/1, + status/1, invoke/3, is_duplicate/2, multiple_routing_keys/0]). -export([start/1, stop/0]). @@ -789,6 +789,10 @@ timeout(State = #vqstate { index_state = IndexState }) -> handle_pre_hibernate(State = #vqstate { index_state = IndexState }) -> State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }. +msg_rates(#vqstate { rates = #rates { avg_egress = AvgEgressRate, + avg_ingress = AvgIngressRate } }) -> + {AvgIngressRate, AvgEgressRate}. + status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, len = Len, |