diff options
-rw-r--r-- | src/rabbit_backing_queue.erl | 10 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 7 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 7 |
3 files changed, 17 insertions, 7 deletions
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index ed5340fe..eac1db2f 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -152,6 +152,9 @@ %% Is my queue empty? -callback is_empty(state()) -> boolean(). +%% How many pending acks do we have? +-callback pending_ack(state()) -> non_neg_integer(). + %% For the next three functions, the assumption is that you're %% monitoring something like the ingress and egress rates of the %% queue. The RAM duration is thus the length of time represented by @@ -212,9 +215,10 @@ behaviour_info(callbacks) -> {delete_and_terminate, 2}, {purge, 1}, {publish, 4}, {publish_delivered, 5}, {drain_confirmed, 1}, {dropwhile, 3}, {fetch, 2}, {ack, 2}, {fold, 3}, {requeue, 2}, {len, 1}, - {is_empty, 1}, {set_ram_duration_target, 2}, {ram_duration, 1}, - {needs_timeout, 1}, {timeout, 1}, {handle_pre_hibernate, 1}, - {status, 1}, {invoke, 3}, {is_duplicate, 2}, {discard, 3}]; + {is_empty, 1}, {pending_ack, 1}, {set_ram_duration_target, 2}, + {ram_duration, 1}, {needs_timeout, 1}, {timeout, 1}, + {handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2}, + {discard, 3}]; behaviour_info(_Other) -> undefined. diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 094b83c9..bd33e955 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -18,8 +18,8 @@ -export([init/3, terminate/2, delete_and_terminate/2, purge/1, publish/4, publish_delivered/5, fetch/2, ack/2, - requeue/2, len/1, is_empty/1, drain_confirmed/1, dropwhile/3, - set_ram_duration_target/2, ram_duration/1, + requeue/2, len/1, is_empty/1, pending_ack/1, drain_confirmed/1, + dropwhile/3, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, is_duplicate/2, discard/3, fold/3]). @@ -274,6 +274,9 @@ len(#state { backing_queue = BQ, backing_queue_state = BQS }) -> is_empty(#state { backing_queue = BQ, backing_queue_state = BQS }) -> BQ:is_empty(BQS). +pending_ack(#state { backing_queue = BQ, backing_queue_state = BQS }) -> + BQ:pending_ack(BQS). + set_ram_duration_target(Target, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> State #state { backing_queue_state = diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index bd606dfb..22829765 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -19,8 +19,8 @@ -export([init/3, terminate/2, delete_and_terminate/2, purge/1, publish/4, publish_delivered/5, drain_confirmed/1, dropwhile/3, fetch/2, ack/2, requeue/2, len/1, is_empty/1, - set_ram_duration_target/2, ram_duration/1, needs_timeout/1, - timeout/1, handle_pre_hibernate/1, status/1, invoke/3, + pending_ack/1, set_ram_duration_target/2, ram_duration/1, + needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, is_duplicate/2, discard/3, multiple_routing_keys/0, fold/3]). -export([start/1, stop/0]). @@ -681,6 +681,9 @@ len(#vqstate { len = Len }) -> Len. is_empty(State) -> 0 == len(State). +pending_ack(#vqstate { pending_ack = Ack }) -> + gb_trees:size(Ack). + set_ram_duration_target( DurationTarget, State = #vqstate { rates = #rates { avg_egress = AvgEgressRate, |