summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-01-24 12:50:40 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-01-24 12:50:40 +0000
commite25426838aab44876d11845847606ee6ffee00b0 (patch)
treed1af0d4bac18550ec635d8b1fe2ae38d11e0cae1
parent55b5f4c07614ea84f5f4499eb1417cb6844e0e14 (diff)
parentdc717d13bdaff8ee0b1e78b33596491f43fe1b30 (diff)
downloadrabbitmq-server-e25426838aab44876d11845847606ee6ffee00b0.tar.gz
Merge in default
-rw-r--r--src/rabbit_amqqueue_process.erl28
-rw-r--r--src/rabbit_backing_queue.erl7
-rw-r--r--src/rabbit_mirror_queue_master.erl5
-rw-r--r--src/rabbit_variable_queue.erl8
4 files changed, 37 insertions, 11 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 2b0882b8..b4891d99 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -22,6 +22,7 @@
-define(SYNC_INTERVAL, 25). %% milliseconds
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
+-define(CONSUMER_BIAS, 0.8).
-export([start_link/1, info_keys/0]).
@@ -829,24 +830,37 @@ emit_consumer_deleted(ChPid, ConsumerTag, QName) ->
%%----------------------------------------------------------------------------
-prioritise_call(Msg, _From, _Len, _State) ->
+prioritise_call(Msg, _From, _Len, State) ->
case Msg of
- info -> 9;
- {info, _Items} -> 9;
- consumers -> 9;
- stat -> 7;
- _ -> 0
+ info -> 9;
+ {info, _Items} -> 9;
+ consumers -> 9;
+ stat -> 7;
+ {basic_consume, _, _, _, _, _, _, _, _, _} -> consumer_bias(State);
+ {basic_cancel, _, _, _} -> consumer_bias(State);
+ _ -> 0
end.
-prioritise_cast(Msg, _Len, _State) ->
+prioritise_cast(Msg, _Len, State) ->
case Msg of
delete_immediately -> 8;
{set_ram_duration_target, _Duration} -> 8;
{set_maximum_since_use, _Age} -> 8;
{run_backing_queue, _Mod, _Fun} -> 6;
+ {ack, _AckTags, _ChPid} -> consumer_bias(State);
+ {notify_sent, _ChPid, _Credit} -> consumer_bias(State);
+ {resume, _ChPid} -> consumer_bias(State);
_ -> 0
end.
+consumer_bias(#q{backing_queue = BQ, backing_queue_state = BQS}) ->
+ {Ingress, Egress} = BQ:msg_rates(BQS),
+ case ?CONSUMER_BIAS of
+ B when B > 0.0 andalso Ingress >= (1.0 - B) * Egress -> +1;
+ B when B < 0.0 andalso Egress >= (1.0 + B) * Ingress -> -1;
+ _ -> 0
+ end.
+
prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
case Msg of
{'DOWN', _, process, DownPid, _} -> 8;
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 61b504bc..1bbd1543 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -203,6 +203,10 @@
%% Called immediately before the queue hibernates.
-callback handle_pre_hibernate(state()) -> state().
+%% Used to help prioritisation in rabbit_amqqueue_process. The rate of
+%% inbound messages and outbound messages at the moment.
+-callback msg_rates(state()) -> {float(), float()}.
+
%% Exists for debugging purposes, to be able to expose state via
%% rabbitmqctl list_queues backing_queue_status
-callback status(state()) -> [{atom(), any()}].
@@ -230,7 +234,8 @@ behaviour_info(callbacks) ->
{fetch, 2}, {ack, 2}, {requeue, 2}, {ackfold, 4}, {fold, 3}, {len, 1},
{is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2},
{ram_duration, 1}, {needs_timeout, 1}, {timeout, 1},
- {handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2}] ;
+ {handle_pre_hibernate, 1}, {msg_rates, 1}, {status, 1},
+ {invoke, 3}, {is_duplicate, 2}] ;
behaviour_info(_Other) ->
undefined.
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 9ce5afcb..b272c64f 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -22,7 +22,7 @@
len/1, is_empty/1, depth/1, drain_confirmed/1,
dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
- status/1, invoke/3, is_duplicate/2]).
+ msg_rates/1, status/1, invoke/3, is_duplicate/2]).
-export([start/1, stop/0]).
@@ -353,6 +353,9 @@ handle_pre_hibernate(State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
State #state { backing_queue_state = BQ:handle_pre_hibernate(BQS) }.
+msg_rates(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
+ BQ:msg_rates(BQS).
+
status(State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
BQ:status(BQS) ++
[ {mirror_seen, dict:size(State #state.seen_status)},
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index ac2b9f52..aa090ed4 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -21,8 +21,8 @@
dropwhile/2, fetchwhile/4,
fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, len/1,
is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1,
- needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3,
- is_duplicate/2, multiple_routing_keys/0]).
+ needs_timeout/1, timeout/1, handle_pre_hibernate/1, msg_rates/1,
+ status/1, invoke/3, is_duplicate/2, multiple_routing_keys/0]).
-export([start/1, stop/0]).
@@ -789,6 +789,10 @@ timeout(State = #vqstate { index_state = IndexState }) ->
handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }.
+msg_rates(#vqstate { rates = #rates { avg_egress = AvgEgressRate,
+ avg_ingress = AvgIngressRate } }) ->
+ {AvgIngressRate, AvgEgressRate}.
+
status(#vqstate {
q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
len = Len,