summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrancesco Mazzoli <francesco@rabbitmq.com>2012-08-31 17:49:56 +0100
committerFrancesco Mazzoli <francesco@rabbitmq.com>2012-08-31 17:49:56 +0100
commitb09d33688a18af3cb6121d8a95ed70000b3635fa (patch)
treeb1d4ca84e4f22ae65d3cbfb618bf2090096934be
parent4a60868b78268113c92a77a8d375afe502f314bf (diff)
downloadrabbitmq-server-b09d33688a18af3cb6121d8a95ed70000b3635fa.tar.gz
`set_synchronized' accepts the difference instead of separate args
-rw-r--r--src/rabbit_mirror_queue_slave.erl14
1 files changed, 7 insertions, 7 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 4e153ca1..f11426bb 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -829,7 +829,7 @@ process_instruction({set_length, Length, Dropped, AckRequired},
StateN #state { backing_queue_state = BQSN1 })
end, State, lists:duplicate(ToDrop, const)),
case AckRequired of
- true -> set_synchronised(ToDrop, Dropped, Length, State1);
+ true -> set_synchronised(Dropped - ToDrop, Length, State1);
false -> set_synchronised(Length, State1)
end;
false ->
@@ -861,7 +861,7 @@ process_instruction({ack, MsgIds, Length},
{AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA),
{MsgIds1, BQS1} = BQ:ack(AckTags, BQS),
[] = MsgIds1 -- MsgIds, %% ASSERTION
- {ok, set_synchronised(length(AckTags), length(MsgIds), Length,
+ {ok, set_synchronised(length(MsgIds) - length(AckTags), Length,
State #state { msg_id_ack = MA1,
backing_queue_state = BQS1 })};
process_instruction({requeue, MsgIds, Length},
@@ -870,7 +870,7 @@ process_instruction({requeue, MsgIds, Length},
msg_id_ack = MA }) ->
{AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA),
{_MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
- {ok, set_synchronised(length(AckTags), length(MsgIds), Length,
+ {ok, set_synchronised(length(MsgIds) - length(AckTags), Length,
State #state { msg_id_ack = MA1,
backing_queue_state = BQS1 })};
process_instruction({sender_death, ChPid},
@@ -918,13 +918,13 @@ maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA,
State #state { msg_id_ack = dict:store(MsgId, {Num, AckTag}, MA),
ack_num = Num + 1 }.
-set_synchronised(_, _, _, State = #state { external_pending = undefined }) ->
+set_synchronised(_, _, State = #state { external_pending = undefined }) ->
State;
-set_synchronised(LocalPending, RemotePending, Length,
+set_synchronised(PendingDelta, Length,
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
external_pending = ExtPending }) ->
- ExtPending1 = ExtPending - (RemotePending - LocalPending),
+ ExtPending1 = ExtPending - PendingDelta,
State1 = State #state { external_pending = ExtPending1 },
case ExtPending1 =:= 0 andalso Length =:= BQ:len(BQS) of
true -> set_synchronised1(true, State1);
@@ -932,7 +932,7 @@ set_synchronised(LocalPending, RemotePending, Length,
end.
set_synchronised(Length, State) ->
- set_synchronised(0, 0, Length, State).
+ set_synchronised(0, Length, State).
%% We intentionally leave out the head where a slave becomes
%% unsynchronised: we assert that can never happen.