summaryrefslogtreecommitdiff
path: root/src/rabbit_mirror_queue_master.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_mirror_queue_master.erl')
-rw-r--r--src/rabbit_mirror_queue_master.erl17
1 files changed, 10 insertions, 7 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 477449e3..fb9f7e34 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -18,8 +18,8 @@
-export([init/3, terminate/2, delete_and_terminate/2,
purge/1, publish/4, publish_delivered/5, fetch/2, ack/2,
- requeue/2, len/1, is_empty/1, drain_confirmed/1, dropwhile/3,
- set_ram_duration_target/2, ram_duration/1,
+ requeue/2, len/1, is_empty/1, depth/1, drain_confirmed/1,
+ dropwhile/3, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
status/1, invoke/3, is_duplicate/2, discard/3, fold/3]).
@@ -96,7 +96,7 @@ init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover,
[rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- MNodes1],
{ok, BQ} = application:get_env(backing_queue_module),
BQS = BQ:init(Q, Recover, AsyncCallback),
- ok = gm:broadcast(GM, {length, BQ:len(BQS)}),
+ ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}),
#state { gm = GM,
coordinator = CPid,
backing_queue = BQ,
@@ -145,7 +145,7 @@ monitor_wait([MRef | MRefs]) ->
purge(State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS }) ->
- ok = gm:broadcast(GM, {set_length, 0, false}),
+ ok = gm:broadcast(GM, {drop, 0, BQ:len(BQS), false}),
{Count, BQS1} = BQ:purge(BQS),
{Count, State #state { backing_queue_state = BQS1,
set_delivered = 0 }}.
@@ -187,8 +187,8 @@ dropwhile(Pred, AckRequired,
Len = BQ:len(BQS),
{Next, Msgs, BQS1} = BQ:dropwhile(Pred, AckRequired, BQS),
Len1 = BQ:len(BQS1),
- ok = gm:broadcast(GM, {set_length, Len1, AckRequired}),
Dropped = Len - Len1,
+ ok = gm:broadcast(GM, {drop, Len1, Dropped, AckRequired}),
SetDelivered1 = lists:max([0, SetDelivered - Dropped]),
{Next, Msgs, State #state { backing_queue_state = BQS1,
set_delivered = SetDelivered1 } }.
@@ -274,6 +274,9 @@ len(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
is_empty(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
BQ:is_empty(BQS).
+depth(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
+ BQ:depth(BQS).
+
set_ram_duration_target(Target, State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
State #state { backing_queue_state =
@@ -372,7 +375,7 @@ discard(Msg = #basic_message { id = MsgId }, ChPid,
promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus, KS) ->
Len = BQ:len(BQS),
- ok = gm:broadcast(GM, {length, Len}),
+ ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}),
#state { gm = GM,
coordinator = CPid,
backing_queue = BQ,
@@ -403,7 +406,7 @@ length_fun() ->
fun (?MODULE, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS }) ->
- ok = gm:broadcast(GM, {length, BQ:len(BQS)}),
+ ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}),
State
end)
end.