diff options
author | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-08-31 15:44:20 +0100 |
---|---|---|
committer | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-08-31 15:44:20 +0100 |
commit | 1d2b538b8370114595da4b522b53e1cd951530b7 (patch) | |
tree | e093d00636f75f50c19905f508665c33fa9d6c80 | |
parent | cba76a6a6227080ddc3edd1012e056b92676cddf (diff) | |
download | rabbitmq-server-1d2b538b8370114595da4b522b53e1cd951530b7.tar.gz |
get the external pending acks at the beginning
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 7 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 5 |
2 files changed, 7 insertions, 5 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index bd33e955..6cfc13c7 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -96,7 +96,7 @@ init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover, [rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- MNodes1], {ok, BQ} = application:get_env(backing_queue_module), BQS = BQ:init(Q, Recover, AsyncCallback), - ok = gm:broadcast(GM, {length, BQ:len(BQS)}), + ok = gm:broadcast(GM, {length, BQ:len(BQS), BQ:pending_ack(BQS)}), #state { gm = GM, coordinator = CPid, backing_queue = BQ, @@ -375,7 +375,7 @@ discard(Msg = #basic_message { id = MsgId }, ChPid, promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus, KS) -> Len = BQ:len(BQS), - ok = gm:broadcast(GM, {length, Len}), + ok = gm:broadcast(GM, {length, Len, BQ:pending_ack(BQS)}), #state { gm = GM, coordinator = CPid, backing_queue = BQ, @@ -406,7 +406,8 @@ length_fun() -> fun (?MODULE, State = #state { gm = GM, backing_queue = BQ, backing_queue_state = BQS }) -> - ok = gm:broadcast(GM, {length, BQ:len(BQS)}), + ok = gm:broadcast( + GM, {length, BQ:len(BQS), BQ:pending_ack(BQS)}), State end) end. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 58b9b644..38bbf59f 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -887,8 +887,9 @@ process_instruction({sender_death, ChPid}, msg_id_status = MS1, known_senders = pmon:demonitor(ChPid, KS) } end}; -process_instruction({length, Length}, State) -> - {ok, set_synchronised(Length, State)}; +process_instruction({length, Length, ExtPending}, State) -> + {ok, set_synchronised(Length, + State #state { external_pending = ExtPending })}; process_instruction({delete_and_terminate, Reason}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> |