summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrancesco Mazzoli <francesco@rabbitmq.com>2012-08-31 15:44:20 +0100
committerFrancesco Mazzoli <francesco@rabbitmq.com>2012-08-31 15:44:20 +0100
commit1d2b538b8370114595da4b522b53e1cd951530b7 (patch)
treee093d00636f75f50c19905f508665c33fa9d6c80
parentcba76a6a6227080ddc3edd1012e056b92676cddf (diff)
downloadrabbitmq-server-1d2b538b8370114595da4b522b53e1cd951530b7.tar.gz
get the external pending acks at the beginning
-rw-r--r--src/rabbit_mirror_queue_master.erl7
-rw-r--r--src/rabbit_mirror_queue_slave.erl5
2 files changed, 7 insertions, 5 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index bd33e955..6cfc13c7 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -96,7 +96,7 @@ init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover,
[rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- MNodes1],
{ok, BQ} = application:get_env(backing_queue_module),
BQS = BQ:init(Q, Recover, AsyncCallback),
- ok = gm:broadcast(GM, {length, BQ:len(BQS)}),
+ ok = gm:broadcast(GM, {length, BQ:len(BQS), BQ:pending_ack(BQS)}),
#state { gm = GM,
coordinator = CPid,
backing_queue = BQ,
@@ -375,7 +375,7 @@ discard(Msg = #basic_message { id = MsgId }, ChPid,
promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus, KS) ->
Len = BQ:len(BQS),
- ok = gm:broadcast(GM, {length, Len}),
+ ok = gm:broadcast(GM, {length, Len, BQ:pending_ack(BQS)}),
#state { gm = GM,
coordinator = CPid,
backing_queue = BQ,
@@ -406,7 +406,8 @@ length_fun() ->
fun (?MODULE, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS }) ->
- ok = gm:broadcast(GM, {length, BQ:len(BQS)}),
+ ok = gm:broadcast(
+ GM, {length, BQ:len(BQS), BQ:pending_ack(BQS)}),
State
end)
end.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 58b9b644..38bbf59f 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -887,8 +887,9 @@ process_instruction({sender_death, ChPid},
msg_id_status = MS1,
known_senders = pmon:demonitor(ChPid, KS) }
end};
-process_instruction({length, Length}, State) ->
- {ok, set_synchronised(Length, State)};
+process_instruction({length, Length, ExtPending}, State) ->
+ {ok, set_synchronised(Length,
+ State #state { external_pending = ExtPending })};
process_instruction({delete_and_terminate, Reason},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->