diff options
author | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-08-31 17:49:56 +0100 |
---|---|---|
committer | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-08-31 17:49:56 +0100 |
commit | b09d33688a18af3cb6121d8a95ed70000b3635fa (patch) | |
tree | b1d4ca84e4f22ae65d3cbfb618bf2090096934be | |
parent | 4a60868b78268113c92a77a8d375afe502f314bf (diff) | |
download | rabbitmq-server-b09d33688a18af3cb6121d8a95ed70000b3635fa.tar.gz |
`set_synchronized' accepts the difference instead of separate args
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 14 |
1 files changed, 7 insertions, 7 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 4e153ca1..f11426bb 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -829,7 +829,7 @@ process_instruction({set_length, Length, Dropped, AckRequired}, StateN #state { backing_queue_state = BQSN1 }) end, State, lists:duplicate(ToDrop, const)), case AckRequired of - true -> set_synchronised(ToDrop, Dropped, Length, State1); + true -> set_synchronised(Dropped - ToDrop, Length, State1); false -> set_synchronised(Length, State1) end; false -> @@ -861,7 +861,7 @@ process_instruction({ack, MsgIds, Length}, {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA), {MsgIds1, BQS1} = BQ:ack(AckTags, BQS), [] = MsgIds1 -- MsgIds, %% ASSERTION - {ok, set_synchronised(length(AckTags), length(MsgIds), Length, + {ok, set_synchronised(length(MsgIds) - length(AckTags), Length, State #state { msg_id_ack = MA1, backing_queue_state = BQS1 })}; process_instruction({requeue, MsgIds, Length}, @@ -870,7 +870,7 @@ process_instruction({requeue, MsgIds, Length}, msg_id_ack = MA }) -> {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA), {_MsgIds, BQS1} = BQ:requeue(AckTags, BQS), - {ok, set_synchronised(length(AckTags), length(MsgIds), Length, + {ok, set_synchronised(length(MsgIds) - length(AckTags), Length, State #state { msg_id_ack = MA1, backing_queue_state = BQS1 })}; process_instruction({sender_death, ChPid}, @@ -918,13 +918,13 @@ maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA, State #state { msg_id_ack = dict:store(MsgId, {Num, AckTag}, MA), ack_num = Num + 1 }. -set_synchronised(_, _, _, State = #state { external_pending = undefined }) -> +set_synchronised(_, _, State = #state { external_pending = undefined }) -> State; -set_synchronised(LocalPending, RemotePending, Length, +set_synchronised(PendingDelta, Length, State = #state { backing_queue = BQ, backing_queue_state = BQS, external_pending = ExtPending }) -> - ExtPending1 = ExtPending - (RemotePending - LocalPending), + ExtPending1 = ExtPending - PendingDelta, State1 = State #state { external_pending = ExtPending1 }, case ExtPending1 =:= 0 andalso Length =:= BQ:len(BQS) of true -> set_synchronised1(true, State1); @@ -932,7 +932,7 @@ set_synchronised(LocalPending, RemotePending, Length, end. set_synchronised(Length, State) -> - set_synchronised(0, 0, Length, State). + set_synchronised(0, Length, State). %% We intentionally leave out the head where a slave becomes %% unsynchronised: we assert that can never happen. |