summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-09-25 18:27:09 +0100
committerSimon MacMullen <simon@rabbitmq.com>2012-09-25 18:27:09 +0100
commit42e2c535eaaa385c3175d7b6aff37e2b83f2c721 (patch)
treec3124083f320513cb18cf93db8e02be945a7c4a2
parent6cdf3e7e0a198f969808268f396ebb0a98440068 (diff)
downloadrabbitmq-server-42e2c535eaaa385c3175d7b6aff37e2b83f2c721.tar.gz
Rename set_synchronised to update_delta. And remove that nasty little arithmetic-based hack. We should be explicit about the fact that we treat depth messages from the master differently when we don't have a delta.
-rw-r--r--src/rabbit_mirror_queue_slave.erl34
1 files changed, 19 insertions, 15 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 4d53bd1b..7c3aef7a 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -800,7 +800,7 @@ process_instruction({drop, Length, Dropped, AckRequired},
end, State, lists:duplicate(ToDrop, const)),
{ok, case AckRequired of
true -> State1;
- false -> set_synchronised(ToDrop - Dropped, State1)
+ false -> update_delta(ToDrop - Dropped, State1)
end};
process_instruction({fetch, AckRequired, MsgId, Remaining},
State = #state { backing_queue = BQ,
@@ -815,7 +815,7 @@ process_instruction({fetch, AckRequired, MsgId, Remaining},
_ when QLen =< Remaining andalso AckRequired ->
State;
_ when QLen =< Remaining ->
- set_synchronised(-1, State)
+ update_delta(-1, State)
end};
process_instruction({ack, MsgIds},
State = #state { backing_queue = BQ,
@@ -824,9 +824,9 @@ process_instruction({ack, MsgIds},
{AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA),
{MsgIds1, BQS1} = BQ:ack(AckTags, BQS),
[] = MsgIds1 -- MsgIds, %% ASSERTION
- {ok, set_synchronised(length(MsgIds1) - length(MsgIds),
- State #state { msg_id_ack = MA1,
- backing_queue_state = BQS1 })};
+ {ok, update_delta(length(MsgIds1) - length(MsgIds),
+ State #state { msg_id_ack = MA1,
+ backing_queue_state = BQS1 })};
process_instruction({requeue, MsgIds},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
@@ -853,15 +853,10 @@ process_instruction({sender_death, ChPid},
known_senders = pmon:demonitor(ChPid, KS) }
end};
process_instruction({depth, Depth},
- State = #state { depth_delta = D,
- backing_queue = BQ,
+ State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
- D1 = case D of
- undefined -> 1; %% anything but 0 will do here
- _ -> D
- end,
- {ok, set_synchronised(Depth - BQ:depth(BQS) - D1,
- State #state { depth_delta = D1 })};
+ {ok, update_delta_from_master(Depth - BQ:depth(BQS), State)};
+
process_instruction({delete_and_terminate, Reason},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
@@ -887,9 +882,18 @@ maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA,
State #state { msg_id_ack = dict:store(MsgId, {Num, AckTag}, MA),
ack_num = Num + 1 }.
-set_synchronised(_DeltaChange, State = #state { depth_delta = undefined }) ->
+update_delta_from_master(NewDelta, State = #state{depth_delta = undefined}) ->
+ case NewDelta of
+ 0 -> ok = record_synchronised(State#state.q),
+ State #state { depth_delta = 0 };
+ N when N > 0 -> State #state { depth_delta = N }
+ end;
+update_delta_from_master(DeltaChange, State) ->
+ update_delta(DeltaChange, State).
+
+update_delta(_DeltaChange, State = #state { depth_delta = undefined }) ->
State;
-set_synchronised(DeltaChange, State = #state { depth_delta = Delta }) ->
+update_delta(DeltaChange, State = #state { depth_delta = Delta }) ->
%% We intentionally leave out the head where a slave becomes
%% unsynchronised: we assert that can never happen.
case {Delta, Delta + DeltaChange} of