summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_backing_queue.erl10
-rw-r--r--src/rabbit_mirror_queue_master.erl7
-rw-r--r--src/rabbit_variable_queue.erl7
3 files changed, 17 insertions, 7 deletions
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index ed5340fe..eac1db2f 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -152,6 +152,9 @@
%% Is my queue empty?
-callback is_empty(state()) -> boolean().
+%% How many pending acks do we have?
+-callback pending_ack(state()) -> non_neg_integer().
+
%% For the next three functions, the assumption is that you're
%% monitoring something like the ingress and egress rates of the
%% queue. The RAM duration is thus the length of time represented by
@@ -212,9 +215,10 @@ behaviour_info(callbacks) ->
{delete_and_terminate, 2}, {purge, 1}, {publish, 4},
{publish_delivered, 5}, {drain_confirmed, 1}, {dropwhile, 3},
{fetch, 2}, {ack, 2}, {fold, 3}, {requeue, 2}, {len, 1},
- {is_empty, 1}, {set_ram_duration_target, 2}, {ram_duration, 1},
- {needs_timeout, 1}, {timeout, 1}, {handle_pre_hibernate, 1},
- {status, 1}, {invoke, 3}, {is_duplicate, 2}, {discard, 3}];
+ {is_empty, 1}, {pending_ack, 1}, {set_ram_duration_target, 2},
+ {ram_duration, 1}, {needs_timeout, 1}, {timeout, 1},
+ {handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2},
+ {discard, 3}];
behaviour_info(_Other) ->
undefined.
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 094b83c9..bd33e955 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -18,8 +18,8 @@
-export([init/3, terminate/2, delete_and_terminate/2,
purge/1, publish/4, publish_delivered/5, fetch/2, ack/2,
- requeue/2, len/1, is_empty/1, drain_confirmed/1, dropwhile/3,
- set_ram_duration_target/2, ram_duration/1,
+ requeue/2, len/1, is_empty/1, pending_ack/1, drain_confirmed/1,
+ dropwhile/3, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
status/1, invoke/3, is_duplicate/2, discard/3, fold/3]).
@@ -274,6 +274,9 @@ len(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
is_empty(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
BQ:is_empty(BQS).
+pending_ack(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
+ BQ:pending_ack(BQS).
+
set_ram_duration_target(Target, State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
State #state { backing_queue_state =
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index bd606dfb..22829765 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -19,8 +19,8 @@
-export([init/3, terminate/2, delete_and_terminate/2, purge/1,
publish/4, publish_delivered/5, drain_confirmed/1,
dropwhile/3, fetch/2, ack/2, requeue/2, len/1, is_empty/1,
- set_ram_duration_target/2, ram_duration/1, needs_timeout/1,
- timeout/1, handle_pre_hibernate/1, status/1, invoke/3,
+ pending_ack/1, set_ram_duration_target/2, ram_duration/1,
+ needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3,
is_duplicate/2, discard/3, multiple_routing_keys/0, fold/3]).
-export([start/1, stop/0]).
@@ -681,6 +681,9 @@ len(#vqstate { len = Len }) -> Len.
is_empty(State) -> 0 == len(State).
+pending_ack(#vqstate { pending_ack = Ack }) ->
+ gb_trees:size(Ack).
+
set_ram_duration_target(
DurationTarget, State = #vqstate {
rates = #rates { avg_egress = AvgEgressRate,